001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client.monitoring.topology;
020
021import java.util.*;
022import java.util.concurrent.*;
023
024import org.jppf.client.*;
025import org.jppf.client.event.*;
026import org.jppf.management.NodeSelector;
027import org.jppf.utils.*;
028import org.jppf.utils.concurrent.JPPFThreadFactory;
029import org.jppf.utils.configuration.JPPFProperties;
030import org.slf4j.*;
031
032/**
033 * Instances of this class discover and maintain a representation of a JPPF grid topology.
034 * @author Laurent Cohen
035 * @since 5.0
036 */
037public class TopologyManager extends ConnectionPoolListenerAdapter implements AutoCloseable {
038  /**
039   * Logger for this class.
040   */
041  static Logger log = LoggerFactory.getLogger(TopologyManager.class);
042  /**
043   * Determines whether debug log statements are enabled.
044   */
045  static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
046  /**
047   * Mapping of driver uuids to the corresponding {@link TopologyDriver} objects.
048   */
049  private final Map<String, TopologyDriver> driverMap = new Hashtable<>();
050  /**
051   * Synchronization lock.
052   */
053  private final Object driversLock = new Object();
054  /**
055   * Mapping of peer driver uuids to the corresponding {@link TopologyPeer} objects.
056   */
057  private final Map<String, TopologyPeer> peerMap = new Hashtable<>();
058  /**
059   * Mapping of node uuids to the corresponding {@link TopologyNode} objects.
060   */
061  private final Map<String, TopologyNode> nodeMap = new Hashtable<>();
062  /**
063   * Mapping of the driver connections to their assocated status listener.
064   */
065  private final Map<JPPFClientConnection, ClientConnectionStatusListener> statusListenerMap = new Hashtable<>();
066  /**
067   * List of listeners to the changes in the topology.
068   */
069  private final List<TopologyListener> listeners = new CopyOnWriteArrayList<>();
070  /**
071   * Separate thread used to sequentialize events emitted by this topology manager.
072   */
073  private ExecutorService executor = Executors.newSingleThreadExecutor(new JPPFThreadFactory("TopologyEvents"));
074  /**
075   * The JPPF client.
076   */
077  private final JPPFClient client;
078  /**
079   * Refreshes the states of the nodes at rehular intervals.
080   */
081  final NodeRefreshHandler refreshHandler;
082  /**
083   * Refreshes the latests JVM health snapshots of the drivers and nodes at rehular intervals.
084   */
085  final JVMHealthRefreshHandler jvmHealthRefreshHandler;
086  /**
087   * Use to filter the nodes and associated events.
088   * @since 5.2
089   */
090  private NodeSelector nodeFilter;
091
092  /**
093   * Initialize this topology manager with a new {@link JPPFClient} and the specified listeners.
094   * The refresh intervals are determined from the configuration, or take a default value of 1000L if they are not configured.
095   * @param listeners a set of listeners to subscribe immediately for topology events.
096   */
097  public TopologyManager(final TopologyListener...listeners) {
098    this(null, listeners);
099  }
100
101  /**
102   * Initialize this topology manager with a new {@link JPPFClient} and the specified listeners.
103   * @param topologyRefreshInterval the interval in millis between refreshes of the topology.
104   * @param jvmHealthRefreshInterval the interval in millis between refreshes of the JVM health data.
105   * @param listeners a set of listeners to subscribe immediately for topology events.
106   */
107  public TopologyManager(final long topologyRefreshInterval, final long jvmHealthRefreshInterval, final TopologyListener...listeners) {
108    this(topologyRefreshInterval, jvmHealthRefreshInterval, null, false, listeners);
109  }
110
111  /**
112   * Initialize this topology manager with the specified {@link JPPFClient} and listeners.
113   * The refresh intervals are determined from the configuration, or take a default value of 1000L if they are not configured.
114   * @param client the JPPF client used to discover and monitor the grid topology.
115   * @param listeners a set of listeners to subscribe immediately for topology events.
116   */
117  public TopologyManager(final JPPFClient client, final TopologyListener...listeners) {
118    this(client == null ? -1 : client.getConfig().get(JPPFProperties.ADMIN_REFRESH_INTERVAL_TOPOLOGY),
119      client == null ? -1 : client.getConfig().get(JPPFProperties.ADMIN_REFRESH_INTERVAL_HEALTH), client, false, listeners);
120  }
121
122  /**
123   * Initialize this topology manager with the specified {@link JPPFClient} and listeners.
124   * @param topologyRefreshInterval the interval in millis between refreshes of the topology.
125   * @param jvmHealthRefreshInterval the interval in millis between refreshes of the JVM health data.
126   * @param client the JPPF client used to discover and monitor the grid topology.
127   * @param listeners a set of listeners to subscribe immediately for topology events.
128   */
129  public TopologyManager(final long topologyRefreshInterval, final long jvmHealthRefreshInterval, final JPPFClient client, final TopologyListener...listeners) {
130    this(topologyRefreshInterval, jvmHealthRefreshInterval, client, false, listeners);
131  }
132
133  /**
134   * Initialize this topology manager with the specified {@link JPPFClient} and listeners.
135   * @param topologyRefreshInterval the interval in millis between refreshes of the topology.
136   * @param jvmHealthRefreshInterval the interval in millis between refreshes of the JVM health data.
137   * @param client the JPPF client used to discover and monitor the grid topology.
138   * @param listeners a set of listeners to subscribe immediately for topology events.
139   * @param loadSystemInfo whether the system info of the nodes should be loaded.
140   */
141  public TopologyManager(final long topologyRefreshInterval, final long jvmHealthRefreshInterval, final JPPFClient client, final boolean loadSystemInfo, final TopologyListener...listeners) {
142    long n1 = 0, n2 = 0;
143    if (client == null) {
144      this.client = new JPPFClient(this);
145      n1 = this.client.getConfig().get(JPPFProperties.ADMIN_REFRESH_INTERVAL_TOPOLOGY);
146      n2 = this.client.getConfig().get(JPPFProperties.ADMIN_REFRESH_INTERVAL_HEALTH);
147    } else {
148      this.client = client;
149      n1 = topologyRefreshInterval;
150      n2 = jvmHealthRefreshInterval;
151    }
152    this.refreshHandler = new NodeRefreshHandler(this, n1, loadSystemInfo);
153    this.jvmHealthRefreshHandler = new JVMHealthRefreshHandler(this, n2);
154    if (client != null) client.addConnectionPoolListener(this);
155    if (listeners != null) for (TopologyListener listener: listeners) addTopologyListener(listener);
156    init();
157  }
158
159  /**
160   * Initialize the topology tree.
161   */
162  private void init() {
163    for (final JPPFConnectionPool pool: client.getConnectionPools()) {
164      List<JPPFClientConnection> list = pool.getConnections(JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
165      if (list.isEmpty()) list = pool.getConnections();
166      if (!list.isEmpty()) {
167        final JPPFClientConnection c = list.get(0);
168        connectionAdded(new ConnectionPoolEvent(pool, c));
169      }
170    }
171  }
172
173  /**
174   * Get the drivers currently handled.
175   * @return a list of {@link TopologyDriver} instances.
176   */
177  public List<TopologyDriver> getDrivers() {
178    synchronized(driversLock) {
179      return new ArrayList<>(driverMap.values());
180    }
181  }
182
183  /**
184   * Get the driver with the specified uuid.
185   * @param uuid the uuid of the driver to lookup.
186   * @return a {@link TopologyDriver} instance.
187   */
188  public TopologyDriver getDriver(final String uuid) {
189    synchronized(driversLock) {
190      return driverMap.get(uuid);
191    }
192  }
193
194  /**
195   * Get the nodes currently handled.
196   * @return a list of {@link TopologyNode} instances.
197   */
198  public List<TopologyNode> getNodes() {
199    synchronized(nodeMap) {
200      return new ArrayList<>(nodeMap.values());
201    }
202  }
203
204  /**
205   * Get the nodes that are slaves of the specified master node.
206   * @param masterNodeUuid the UUID of the master node whose slaves to lookup.
207   * @return a list of {@link TopologyNode} instances, possibly empty but never {@code null}.
208   * @since 6.0
209   */
210  public List<TopologyNode> getSlaveNodes(final String masterNodeUuid) {
211    final List<TopologyNode> result = new ArrayList<>(getNodeCount());
212    if (masterNodeUuid != null) {
213      synchronized(nodeMap) {
214        for (final Map.Entry<String, TopologyNode> entry: nodeMap.entrySet()) {
215          final TopologyNode node = entry.getValue();
216          final String uuid = node.getMasterUuid();
217          if ((uuid != null) && uuid.equals(masterNodeUuid)) result.add(node);
218        }
219      }
220    }
221    return result;
222  }
223
224  /**
225   * Get the node with the specified uuid.
226   * @param uuid the uuid of the driver to lookup.
227   * @return a {@link TopologyNode} instance.
228   */
229  public TopologyNode getNode(final String uuid) {
230    synchronized(nodeMap) {
231      return nodeMap.get(uuid);
232    }
233  }
234
235  /**
236   * Get the peers currently handled.
237   * @return a list of {@link TopologyPeer} instances.
238   */
239  public List<TopologyPeer> getPeers() {
240    synchronized(peerMap) {
241      return new ArrayList<>(peerMap.values());
242    }
243  }
244
245  /**
246   * Get the peer with the specified uuid.
247   * @param uuid the uuid of the driver to lookup.
248   * @return a {@link TopologyPeer} instance.
249   */
250  public TopologyPeer getPeer(final String uuid) {
251    synchronized(peerMap) {
252      return peerMap.get(uuid);
253    }
254  }
255
256  /**
257   * Get the number of drivers currently handled.
258   * @return the number of drivers.
259   */
260  public int getDriverCount() {
261    synchronized(driversLock) {
262      return driverMap.size();
263    }
264  }
265
266  /**
267   * Get the number of nodes currently handled.
268   * @return the number of nodes.
269   */
270  public int getNodeCount() {
271    synchronized(nodeMap) {
272      return nodeMap.size();
273    }
274  }
275
276  /**
277   * Get the number of peers currently handled.
278   * @return the number of peers.
279   */
280  public int getPeerCount() {
281    synchronized(peerMap) {
282      return peerMap.size();
283    }
284  }
285
286  /**
287   * Get the node with the psecified uuid.
288   * @param uuid the uuid of the node to lookup.
289   * @return a {@link TopologyNode} instance.
290   */
291  public TopologyNode getNodeOrPeer(final String uuid) {
292    TopologyNode node = getNode(uuid);
293    if (node == null) node = getPeer(uuid);
294    return node;
295  }
296
297  /**
298   * {@inheritDoc}}
299   * @exclude
300   */
301  @Override
302  public void connectionAdded(final ConnectionPoolEvent event) {
303    final JPPFClientConnection c = event.getConnection();
304    final StatusListener listener = new StatusListener();
305    if (c.getStatus().isWorkingStatus()) {
306      final TopologyDriver driver = new TopologyDriver(c);
307      if (debugEnabled) log.debug("before adding driver {}", driver);
308      driverAdded(driver);
309    }
310    statusListenerMap.put(c, listener);
311    c.addClientConnectionStatusListener(listener);
312  }
313
314  /**
315   * {@inheritDoc}}
316   * @exclude
317   */
318  @Override
319  public void connectionRemoved(final ConnectionPoolEvent event) {
320    final JPPFClientConnection c = event.getConnection();
321    final String uuid = c.getDriverUuid();
322    if (uuid != null) {
323      final TopologyDriver driver = driverMap.remove(uuid);
324      if (driver != null) driverRemoved(driver);
325    }
326    final StatusListener listener = (StatusListener) statusListenerMap.remove(c);
327    if (listener != null) c.removeClientConnectionStatusListener(listener);
328  }
329
330  /**
331   * Add a topology change listener.
332   * @param listener the listener to add.
333   */
334  public void addTopologyListener(final TopologyListener listener) {
335    if (listener == null) throw new IllegalArgumentException("cannot add a null listener");
336    listeners.add(listener);
337  }
338
339  /**
340   * Remove a topology change listener.
341   * @param listener the listener to remove.
342   */
343  public void removeTopologyListener(final TopologyListener listener) {
344    if (listener == null) throw new IllegalArgumentException("cannot remove a null listener");
345    listeners.remove(listener);
346  }
347
348  /**
349   * Notify all listeners that a driver was added.
350   * @param driver the driver to add.
351   */
352  void driverAdded(final TopologyDriver driver) {
353    if (debugEnabled) log.debug("adding driver {}, uuid={}", driver, driver.getUuid());
354    TopologyDriver other = null;
355    synchronized(driversLock) {
356      other = driverMap.get(driver.getUuid());
357      if (debugEnabled && (other != null)) log.debug("driver already exists with same uuid: {}", other);
358      if (other == null) {
359        other = driverMap.get(driver.getManagementInfo().toDisplayString());
360        if (debugEnabled && (other != null)) log.debug("driver already exists with same jmx id: {}", other);
361      }
362    }
363    if (other != null) {
364      driverRemoved(other);
365    }
366    synchronized(driversLock) {
367      driverMap.put(driver.getUuid(), driver);
368    }
369    final TopologyEvent event = new TopologyEvent(this, driver, null, TopologyEvent.UpdateType.TOPOLOGY);
370    dispatchEvent(event, TopologyEvent.Type.DRIVER_ADDED);
371  }
372
373  /**
374   * Notify all listeners that a driver was added.
375   * @param driver the driver that was removed.
376   */
377  void driverRemoved(final TopologyDriver driver) {
378    if (debugEnabled) log.debug("removing driver {}", driver);
379    final JPPFClientConnection c = driver.getConnection();
380    final ClientConnectionStatusListener listener = statusListenerMap.remove(c);
381    if (listener != null) c.removeClientConnectionStatusListener(listener);
382    for (final AbstractTopologyComponent child: driver.getChildren()) nodeRemoved(driver, (TopologyNode) child);
383    synchronized(driversLock) {
384      driverMap.remove(driver.getUuid());
385    }
386    final TopologyEvent event = new TopologyEvent(this, driver, null, TopologyEvent.UpdateType.TOPOLOGY);
387    dispatchEvent(event, TopologyEvent.Type.DRIVER_REMOVED);
388  }
389
390  /**
391   * Notify all listeners that the state of a driver has changed.
392   * @param driver the driver to add.
393   * @param updateType the type of update.
394   */
395  void driverUpdated(final TopologyDriver driver, final TopologyEvent.UpdateType updateType) {
396    final TopologyEvent event = new TopologyEvent(this, driver, null, updateType);
397    dispatchEvent(event, TopologyEvent.Type.DRIVER_UPDATED);
398  }
399
400  /**
401   * Notify all listeners that a driver was added.
402   * @param driver the driver to which the node is attached.
403   * @param node the node that was added.
404   */
405  void nodeAdded(final TopologyDriver driver, final TopologyNode node) {
406    if (debugEnabled) log.debug("adding {} {} to driver {}", node.isPeer() ? "peer" : "node", node, driver);
407    if (node.isNode()) {
408      final TopologyNode other = getNodeOrPeer(node.getUuid());
409      if (other != null) nodeRemoved((TopologyDriver) other.getParent(), other);
410    }
411    driver.add(node);
412    if (node.isNode()) {
413      synchronized(nodeMap) {
414        nodeMap.put(node.getUuid(), node);
415      }
416    } else {
417      synchronized(peerMap) {
418        peerMap.put(node.getUuid(), (TopologyPeer) node);
419      }
420    }
421    final TopologyEvent event = new TopologyEvent(this, driver, node, TopologyEvent.UpdateType.TOPOLOGY);
422    dispatchEvent(event, TopologyEvent.Type.NODE_ADDED);
423  }
424
425  /**
426   * Notify all listeners that a node was removed.
427   * @param driver the driver to which the node is attached.
428   * @param node the node that was removed.
429   */
430  void nodeRemoved(final TopologyDriver driver, final TopologyNode node) {
431    if (debugEnabled) log.debug("removing {} {} from driver {}", (node.isNode() ? "node" : "peer"), node, driver);
432    driver.remove(node);
433    if (node.isNode()) {
434      synchronized(nodeMap) {
435        nodeMap.remove(node.getUuid());
436      }
437    } else {
438      synchronized(peerMap) {
439        peerMap.remove(node.getUuid());
440      }
441    }
442    final TopologyEvent event = new TopologyEvent(this, driver, node, TopologyEvent.UpdateType.TOPOLOGY);
443    dispatchEvent(event, TopologyEvent.Type.NODE_REMOVED);
444  }
445
446  /**
447   * Notify all listeners that a driver was added.
448   * @param driverData the driver that was updated or to which the updated node is attached.
449   * @param node the node that was updated, or <code>null</code> if it is a driver that was updated.
450   * @param updateType the type of update.
451   */
452  void nodeUpdated(final TopologyDriver driverData, final TopologyNode node, final TopologyEvent.UpdateType updateType) {
453    final TopologyEvent event = new TopologyEvent(this, driverData, node, updateType);
454    dispatchEvent(event, TopologyEvent.Type.NODE_UPDATED);
455  }
456
457  /**
458   * Dispatch the specified event to all listeners.
459   * @param event the event to dispatch.
460   * @param type the type of event.
461   */
462  private void dispatchEvent(final TopologyEvent event, final TopologyEvent.Type type) {
463    final Runnable dispatchTask = new Runnable() {
464      @Override public void run() {
465        if (log.isTraceEnabled()) log.trace("dispatching event type={} : {}", type, event);
466        switch (type) {
467          case DRIVER_ADDED: for (TopologyListener listener: listeners) listener.driverAdded(event);
468          break;
469          case DRIVER_REMOVED: for (TopologyListener listener: listeners) listener.driverRemoved(event);
470          break;
471          case DRIVER_UPDATED: for (TopologyListener listener: listeners) listener.driverUpdated(event);
472          break;
473          case NODE_ADDED: for (TopologyListener listener: listeners) listener.nodeAdded(event);
474          break;
475          case NODE_REMOVED: for (TopologyListener listener: listeners) listener.nodeRemoved(event);
476          break;
477          case NODE_UPDATED: for (TopologyListener listener: listeners) listener.nodeUpdated(event);
478          break;
479        }
480      }
481    };
482    executor.execute(dispatchTask);
483  }
484
485  /**
486   * Get the JPPF client.
487   * @return a {@link JPPFClient} object.
488   */
489  public JPPFClient getJPPFClient() {
490    return client;
491  }
492
493  /**
494   * Listens for the status of a driver connection and updates the tree accordingly.
495   */
496  private class StatusListener implements ClientConnectionStatusListener {
497    @Override
498    public void statusChanged(final ClientConnectionStatusEvent event) {
499      final JPPFClientConnection c = (JPPFClientConnection) event.getClientConnectionStatusHandler();
500      final JPPFClientConnectionStatus newStatus = c.getStatus();
501      final JPPFClientConnectionStatus oldStatus = event.getOldStatus();
502      if (newStatus.isWorkingStatus() && !oldStatus.isWorkingStatus()) {
503        final TopologyDriver driver = new TopologyDriver(c);
504        if (debugEnabled) log.debug("before adding driver {}", driver);
505        driverAdded(driver);
506      } else if (!newStatus.isWorkingStatus() && (c.getDriverUuid() != null)) {
507        final TopologyDriver driver = getDriver(c.getDriverUuid());
508        if (driver != null) {
509          if (oldStatus.isWorkingStatus()) {
510            for (final AbstractTopologyComponent child: driver.getChildren()) nodeRemoved(driver, (TopologyNode) child);
511          }
512          if (newStatus.isTerminatedStatus() && !oldStatus.isTerminatedStatus()) driverRemoved(driver);
513        }
514      }
515    }
516  }
517
518  /**
519   * Get the node selector used to filter the nodes and associated events.
520   * @return a {@link NodeSelector} instance.
521   * @since 5.2
522   */
523  public synchronized NodeSelector getNodeFilter() {
524    return nodeFilter;
525  }
526
527  /**
528   * Set the node selector used to filter the nodes and associated events.
529   * @param nodeFilter a {@link NodeSelector} instance.
530   * @since 5.2
531   */
532  public synchronized void setNodeFilter(final NodeSelector nodeFilter) {
533    this.nodeFilter = nodeFilter;
534  }
535
536  @Override
537  public void close() {
538    refreshHandler.stopRefreshTimer();;
539    jvmHealthRefreshHandler.stopRefreshTimer();;
540    listeners.clear();
541    client.removeConnectionPoolListener(this);
542  }
543}