001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.management;
020
021import java.net.*;
022import java.util.*;
023import java.util.concurrent.CopyOnWriteArrayList;
024import java.util.concurrent.atomic.*;
025
026import javax.management.*;
027import javax.management.remote.*;
028import javax.management.remote.generic.GenericConnector;
029
030import org.jppf.jmx.*;
031import org.jppf.ssl.SSLHelper;
032import org.jppf.utils.*;
033import org.jppf.utils.concurrent.ThreadSynchronization;
034import org.jppf.utils.configuration.JPPFProperties;
035import org.slf4j.*;
036
037/**
038 * Wrapper around a JMX connection, providing a thread-safe way of handling disconnections and recovery.
039 * @author Laurent Cohen
040 */
041public abstract class AbstractJMXConnectionWrapper extends ThreadSynchronization implements JPPFAdminMBean, AutoCloseable {
042  /**
043   * Explicit serialVersionUID.
044   */
045  private static final long serialVersionUID = 1L;
046  /** Logger for this class. */
047  private static Logger log = LoggerFactory.getLogger(AbstractJMXConnectionWrapper.class);
048  /** Determines whether debug log statements are enabled. */
049  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
050  /** Prefix for the name given to the connection thread. */
051  public static String CONNECTION_NAME_PREFIX = "jmx@";
052  /** The timeout in millis for JMX connection attempts. A value of 0 or less means no timeout. */
053  static final long CONNECTION_TIMEOUT = JPPFConfiguration.get(JPPFProperties.MANAGEMENT_CONNECTION_TIMEOUT);
054  /** URL of the MBean server, in a JMX-compliant format. */
055  protected JMXServiceURL url;
056  /** The JMX client. */
057  protected JMXConnector jmxc;
058  /** A connection to the MBean server. */
059  protected AtomicReference<MBeanServerConnection> mbeanConnection = new AtomicReference<>(null);
060  /** The host the server is running on. */
061  protected String host;
062  /** The RMI port used by the server. */
063  protected int port;
064  /** The connection thread that performs the connection to the management server. */
065  protected AtomicReference<JMXConnectionThread> connectionThread = new AtomicReference<>(null);
066  /** A string representing this connection, used for logging purposes. */
067  protected String idString;
068  /** A string representing this connection, used for displaying in the admin conosle. */
069  protected String displayName;
070  /** Determines whether the connection to the JMX server has been established. */
071  protected AtomicBoolean connected = new AtomicBoolean(false);
072  /** Determines whether this connection has been closed by a all to the {@link #close()} method. */
073  protected AtomicBoolean closed = new AtomicBoolean(false);
074  /** Determines whether the connection to the JMX server has been established. */
075  protected boolean local;
076  /** JMX properties used for establishing the connection. */
077  protected Map<String, Object> env = new HashMap<>();
078  /** Determines whether the JMX connection should be secure or not. */
079  protected boolean sslEnabled;
080  /** Used to synchronize during the connection process. */
081  final Object connectionLock = new Object();
082  /** The list of listeners to this connection wrapper. */
083  final List<JMXWrapperListener> listeners = new CopyOnWriteArrayList<>();
084  /** The time at which connection attempts started. */
085  long connectionStart;
086  /** Whether to try to reconnect upon error. */
087  boolean reconnectOnError = true;
088  /**
089   * The JMX remote protocol.
090   */
091  private final String protocol;
092
093  /**
094   * Initialize a local connection (same JVM) to the MBean server.
095   */
096  public AbstractJMXConnectionWrapper() {
097    local = true;
098    idString = displayName = "local";
099    host = "local";
100    this.protocol = JMXHelper.LOCAL_PROTOCOL;
101  }
102
103  /**
104   * Initialize the connection to the remote MBean server.
105   * @param host the host the server is running on.
106   * @param port the port used by the server.
107   * @param sslEnabled specifies whether the jmx connection should be secure or not.
108   */
109  public AbstractJMXConnectionWrapper(final String host, final int port, final boolean sslEnabled) {
110    this(JPPFConfiguration.get(JPPFProperties.JMX_REMOTE_PROTOCOL), host, port, sslEnabled);
111  }
112
113  /**
114   * Initialize the connection to the remote MBean server.
115   * @param protocol the JMX remote protocol to use.
116   * @param host the host the server is running on.
117   * @param port the port used by the server.
118   * @param sslEnabled specifies whether the jmx connection should be secure or not.
119   */
120  public AbstractJMXConnectionWrapper(final String protocol, final String host, final int port, final boolean sslEnabled) {
121    this.protocol = protocol;
122    try {
123      this.host = (NetworkUtils.isIPv6Address(host)) ? "[" + host + "]" : host;
124      this.port = port;
125      this.sslEnabled = sslEnabled;
126      idString = this.host + ':' + this.port;
127      this.displayName = this.idString;
128      //url = new JMXServiceURL("service:jmx:jmxmp://" + idString);
129      url = new JMXServiceURL(protocol, host, port);
130      if (sslEnabled) SSLHelper.configureJMXProperties(protocol, env);
131      if (JMXHelper.JMXMP_PROTOCOL.equals(protocol)) initJMXMP();
132      else initJPPF();
133      ClassLoader cl = Thread.currentThread().getContextClassLoader();
134      if (cl == null) cl = getClass().getClassLoader();
135      env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_CLASS_LOADER, cl);
136      env.put(JMXConnectorFactory.DEFAULT_CLASS_LOADER, cl);
137      if (debugEnabled) log.debug("created {} with sslEnabled={}, url={}, env={}", getClass().getSimpleName(), this.sslEnabled, url, env);
138    } catch(final Exception e) {
139      log.error(e.getMessage(), e);
140    }
141    local = false;
142  }
143
144  /**
145   * Initialize the environment for the JMXMP protocol.
146   * @throws Exception if any error occcurs.
147   */
148  private void initJMXMP() throws Exception {
149    env.put(GenericConnector.OBJECT_WRAPPING, JMXMPServer.newObjectWrapping());
150    env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES, "com.sun.jmx.remote.protocol");
151    env.put("jmx.remote.x.server.max.threads", 1);
152    env.put("jmx.remote.x.client.connection.check.period", 0);
153    env.put("jmx.remote.x.request.timeout", JPPFConfiguration.get(JPPFProperties.JMX_REMOTE_REQUEST_TIMEOUT));
154  }
155
156  /**
157   * Initialize the environment for the JPPF JMX remote protocol.
158   * @throws Exception if any error occcurs.
159   */
160  private void initJPPF() throws Exception {
161    env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES, "org.jppf.jmxremote.protocol");
162    env.put(JPPFJMXProperties.REQUEST_TIMEOUT.getName(), JPPFConfiguration.get(JPPFJMXProperties.REQUEST_TIMEOUT));
163    env.put(JPPFJMXProperties.TLS_ENABLED.getName(), Boolean.valueOf(sslEnabled).toString());
164  }
165
166  /**
167   * Initialize the connection to the remote MBean server.
168   */
169  public abstract void connect();
170
171  /**
172   * Initiate the connection and wait until the connection is established or the timeout has expired, whichever comes first.
173   * @param timeout the maximum time to wait for, a value of zero means no timeout and
174   * this method just waits until the connection is established.
175   * @return {@code true} if the connection was established in the specified time, {@code false} otherwise.
176   */
177  public abstract boolean connectAndWait(final long timeout);
178
179  /**
180   * Initialize the connection to the remote MBean server.
181   * @throws Exception if the connection could not be established.
182   */
183  void performConnection() throws Exception {
184    connected.set(false);
185    final long elapsed;
186    synchronized(this) {
187      elapsed = (System.nanoTime() - connectionStart) / 1_000_000L;
188    }
189    if ((CONNECTION_TIMEOUT > 0L) && (elapsed >= CONNECTION_TIMEOUT)) {
190      fireTimeout();
191      close();
192      return;
193    }
194    synchronized(connectionLock) {
195      if (jmxc == null) {
196        jmxc = JMXConnectorFactory.newJMXConnector(url, env);
197        jmxc.addConnectionNotificationListener(new NotificationListener() {
198          @Override
199          public void handleNotification(final Notification notification, final Object handback) {
200            if (JMXConnectionNotification.FAILED.equals(notification.getType())) reset();
201          }
202        }, null, null);
203      }
204      jmxc.connect();
205      //connectionThread.get().close();
206      connectionThread.get().setStopped(true);
207      connectionThread.set(null);
208    }
209    synchronized(this) {
210      mbeanConnection.set(jmxc.getMBeanServerConnection());
211      try {
212        setHost(InetAddress.getByName(host).getHostName());
213      } catch (@SuppressWarnings("unused") final UnknownHostException e) {
214      }
215    }
216    connected.set(true);
217    wakeUp();
218    fireConnected();
219    if (debugEnabled) log.debug(getId() + " JMX connection successfully established");
220  }
221
222  /**
223   * Reset the JMX connection and attempt to reconnect.
224   */
225  void reset() {
226    connected.set(false);
227    if (jmxc != null) {
228      try {
229        jmxc.close();
230      } catch(final Exception e2) {
231        if (debugEnabled) log.debug(e2.getMessage(), e2);
232      }
233      jmxc = null;
234    }
235    if (isReconnectOnError()) connect();
236  }
237
238  /**
239   * Get the host the server is running on.
240   * @return the host as a string.
241   */
242  public String getHost() {
243    return host;
244  }
245
246  /**
247   * Get the host the server is running on.
248   * @param host the host as a string.
249   */
250  public void setHost(final String host) {
251    this.host = host;
252    this.displayName = this.host + ':' + this.port;
253  }
254
255  /**
256   * Get a string describing this connection.
257   * @return a string in the format host:port.
258   */
259  public String getId() {
260    return idString;
261  }
262
263  /**
264   * Get the string representing this connection, used for displaying in the admin conosle.
265   * @return the display name as a string.
266   */
267  public String getDisplayName() {
268    return displayName;
269  }
270
271  @Override
272  public String toString() {
273    return  new StringBuilder(getClass().getSimpleName()).append('[').append("url=").append(url).append(", connected=").append(connected)
274      .append(", local=").append(local).append(", secure=").append(sslEnabled).append(']').toString();
275  }
276
277  /**
278   * Add a listener to this connection wrapper.
279   * @param listener the listener to add.
280   */
281  public void addJMXWrapperListener(final JMXWrapperListener listener) {
282    listeners.add(listener);
283  }
284
285  /**
286   * Remove a listener from this connection wrapper.
287   * @param listener the listener to add.
288   */
289  public void removeJMXWrapperListener(final JMXWrapperListener listener) {
290    listeners.remove(listener);
291  }
292
293  /**
294   * Notify all listeners that the connection was successful.
295   */
296  protected void fireConnected() {
297    final JMXWrapperEvent event = new JMXWrapperEvent(this);
298    final Runnable r = new Runnable() {
299      @Override
300      public void run() {
301        for (JMXWrapperListener listener: listeners) listener.jmxWrapperConnected(event);
302      }
303    };
304    //new Thread(r, getDisplayName() + " connection notifier").start();
305    r.run();
306  }
307
308  /**
309   * Notify all listeners that the connection could not be established before reaching the timeout.
310   */
311  protected void fireTimeout() {
312    final JMXWrapperEvent event = new JMXWrapperEvent(this);
313    for (final JMXWrapperListener listener: listeners) listener.jmxWrapperTimeout(event);
314  }
315
316  /**
317   * @return Whether this connection wrapper reconnects on error.
318   * @exclude
319   */
320  public synchronized boolean isReconnectOnError() {
321    return reconnectOnError;
322  }
323
324  /**
325   * Specifiy whether this connection wrapper reconnects on error.
326   * @param reconnectOnError {@code true} to reconnect, {@code false} otherwise.
327   * @exclude
328   */
329  public synchronized void setReconnectOnError(final boolean reconnectOnError) {
330    this.reconnectOnError = reconnectOnError;
331  }
332
333  /**
334   * Get the JMX remote protocol used.
335   * @return the JMX remote protocol string.
336   */
337  public String getProtocol() {
338    return protocol;
339  }
340}