001/*
002 * JPPF.
003 * Copyright (C) 2005-2016 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client.monitoring.topology;
020
021import java.util.*;
022import java.util.concurrent.*;
023
024import org.jppf.client.*;
025import org.jppf.client.event.*;
026import org.jppf.management.NodeSelector;
027import org.jppf.utils.*;
028import org.jppf.utils.configuration.JPPFProperties;
029import org.slf4j.*;
030
031/**
032 * Instances of this class discover and maintain a representation of a JPPF grid topology.
033 * @author Laurent Cohen
034 * @since 5.0
035 */
036public class TopologyManager extends ConnectionPoolListenerAdapter {
037  /**
038   * Logger for this class.
039   */
040  static Logger log = LoggerFactory.getLogger(TopologyManager.class);
041  /**
042   * Determines whether debug log statements are enabled.
043   */
044  static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
045  /**
046   * The JPPF configuration.
047   */
048  private static TypedProperties config = JPPFConfiguration.getProperties();
049  /**
050   * Mapping of driver uuids to the corresponding {@link TopologyDriver} objects.
051   */
052  private final Map<String, TopologyDriver> driverMap = new Hashtable<>();
053  /**
054   * Synchronizationlock.
055   */
056  private final Object driversLock = new Object();
057  /**
058   * Mapping of peer driver uuids to the corresponding {@link TopologyPeer} objects.
059   */
060  private final Map<String, TopologyPeer> peerMap = new Hashtable<>();
061  /**
062   * Mapping of node uuids to the corresponding {@link TopologyNode} objects.
063   */
064  private final Map<String, TopologyNode> nodeMap = new Hashtable<>();
065  /**
066   *
067   */
068  private final Map<JPPFClientConnection, ClientConnectionStatusListener> statusListenerMap = new Hashtable<>();
069  /**
070   * List of listeners to the changes in the topology.
071   */
072  private final List<TopologyListener> listeners = new CopyOnWriteArrayList<>();
073  /**
074   * Separate thread used to sequentialize events emitted by this topology manager.
075   */
076  private ExecutorService executor = Executors.newSingleThreadExecutor(new JPPFThreadFactory("TopologyEvents"));
077  /**
078   * The JPPF client.
079   */
080  private final JPPFClient client;
081  /**
082   * Refreshes the states of the nodes at rehular intervals.
083   */
084  final NodeRefreshHandler refreshHandler;
085  /**
086   * Refreshes the latests JVM health snapshots of the drivers and nodes at rehular intervals.
087   */
088  final JVMHealthRefreshHandler jvmHealthRefreshHandler;
089  /**
090   * Use to filter the nodes and associated events.
091   * @since 5.2
092   */
093  private NodeSelector nodeFilter;
094
095  /**
096   * Initialize this topology manager with a new {@link JPPFClient} and the specified listeners.
097   * The refresh intervals are determined from the configuration, or take a default value of 1000L if they are not configured.
098   * @param listeners a set of listeners to subscribe immediately for topology events.
099   */
100  public TopologyManager(final TopologyListener...listeners) {
101    this(null, listeners);
102  }
103
104  /**
105   * Initialize this topology manager with a new {@link JPPFClient} and the specified listeners.
106   * @param topologyRefreshInterval the interval in millis between refreshes of the topology.
107   * @param jvmHealthRefreshInterval the interval in millis between refreshes of the JVM health data.
108   * @param listeners a set of listeners to subscribe immediately for topology events.
109   */
110  public TopologyManager(final long topologyRefreshInterval, final long jvmHealthRefreshInterval, final TopologyListener...listeners) {
111    this(topologyRefreshInterval, jvmHealthRefreshInterval, null, listeners);
112  }
113
114  /**
115   * Initialize this topology manager with the specified {@link JPPFClient} and listeners.
116   * The refresh intervals are determined from the configuration, or take a default value of 1000L if they are not configured.
117   * @param client the JPPF client used to discover and monitor the grid topology.
118   * @param listeners a set of listeners to subscribe immediately for topology events.
119   */
120  public TopologyManager(final JPPFClient client, final TopologyListener...listeners) {
121    this(config.get(JPPFProperties.ADMIN_REFRESH_INTERVAL_TOPOLOGY), config.get(JPPFProperties.ADMIN_REFRESH_INTERVAL_HEALTH), client, listeners);
122  }
123
124  /**
125   * Initialize this topology manager with the specified {@link JPPFClient} and listeners.
126   * @param topologyRefreshInterval the interval in millis between refreshes of the topology.
127   * @param jvmHealthRefreshInterval the interval in millis between refreshes of the JVM health data.
128   * @param client the JPPF client used to discover and monitor the grid topology.
129   * @param listeners a set of listeners to subscribe immediately for topology events.
130   */
131  public TopologyManager(final long topologyRefreshInterval, final long jvmHealthRefreshInterval, final JPPFClient client, final TopologyListener...listeners) {
132    this.refreshHandler = new NodeRefreshHandler(this, topologyRefreshInterval);
133    this.jvmHealthRefreshHandler = new JVMHealthRefreshHandler(this, jvmHealthRefreshInterval);
134    this.client = (client == null) ? new JPPFClient(this) : client;
135    if (client != null) client.addConnectionPoolListener(this);
136    if (listeners != null) for (TopologyListener listener: listeners) addTopologyListener(listener);
137    init();
138  }
139
140  /**
141   * Intialize the topology tree.
142   */
143  private void init() {
144    for (JPPFConnectionPool pool: client.getConnectionPools()) {
145      List<JPPFClientConnection> list = pool.getConnections(JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
146      if (list.isEmpty()) list = pool.getConnections();
147      if (!list.isEmpty()) {
148        JPPFClientConnection c = list.get(0);
149        connectionAdded(new ConnectionPoolEvent(pool, c));
150      }
151    }
152  }
153
154  /**
155   * Get the drivers currently handled.
156   * @return a list of {@link TopologyDriver} instances.
157   */
158  public List<TopologyDriver> getDrivers() {
159    synchronized(driverMap) {
160      return new ArrayList<>(driverMap.values());
161    }
162  }
163
164  /**
165   * Get the driver with the specified uuid.
166   * @param uuid the uuid of the driver to lookup.
167   * @return a {@link TopologyDriver} instance.
168   */
169  public TopologyDriver getDriver(final String uuid) {
170    return driverMap.get(uuid);
171  }
172
173  /**
174   * Get the drivers currently handled.
175   * @return a list of {@link TopologyNode} instances.
176   */
177  public List<TopologyNode> getNodes() {
178    synchronized(nodeMap) {
179      return new ArrayList<>(nodeMap.values());
180    }
181  }
182
183  /**
184   * Get the driver with the specified uuid.
185   * @param uuid the uuid of the driver to lookup.
186   * @return a {@link TopologyNode} instance.
187   */
188  public TopologyNode getNode(final String uuid) {
189    return nodeMap.get(uuid);
190  }
191
192  /**
193   * Get the peers currently handled.
194   * @return a list of {@link TopologyPeer} instances.
195   */
196  public List<TopologyPeer> getPeers() {
197    synchronized(peerMap) {
198      return new ArrayList<>(peerMap.values());
199    }
200  }
201
202  /**
203   * Get the peer with the specified uuid.
204   * @param uuid the uuid of the driver to lookup.
205   * @return a {@link TopologyPeer} instance.
206   */
207  public TopologyPeer getPeer(final String uuid) {
208    return peerMap.get(uuid);
209  }
210
211  /**
212   * Get the number of drivers currently handled.
213   * @return the number of drivers.
214   */
215  public int getDriverCount() {
216    return driverMap.size();
217  }
218
219  /**
220   * Get the number of nodes currently handled.
221   * @return the number of nodes.
222   */
223  public int getNodeCount() {
224    return nodeMap.size();
225  }
226
227  /**
228   * Get the number of peers currently handled.
229   * @return the number of peers.
230   */
231  public int getPeerCount() {
232    return peerMap.size();
233  }
234
235  /**
236   * Get the node with the psecified uuid.
237   * @param uuid the uuid of the node to lookup.
238   * @return a {@link TopologyNode} instance.
239   */
240  public TopologyNode getNodeOrPeer(final String uuid) {
241    TopologyNode node = nodeMap.get(uuid);
242    if (node == null) node = peerMap.get(uuid);
243    return node;
244  }
245
246  /**
247   * {@inheritDoc}}
248   * @exclude
249   */
250  @Override
251  public void connectionAdded(final ConnectionPoolEvent event) {
252    final JPPFClientConnection c = event.getConnection();
253    StatusListener listener = new StatusListener();
254    if (c.getStatus().isWorkingStatus()) {
255      TopologyDriver driver = new TopologyDriver(c);
256      if (debugEnabled) log.debug("before adding driver {}", driver);
257      driverAdded(driver);
258    }
259    statusListenerMap.put(c, listener);
260    c.addClientConnectionStatusListener(listener);
261  }
262
263  /**
264   * {@inheritDoc}}
265   * @exclude
266   */
267  @Override
268  public void connectionRemoved(final ConnectionPoolEvent event) {
269    final JPPFClientConnection c = event.getConnection();
270    String uuid = c.getDriverUuid();
271    if (uuid != null) {
272      TopologyDriver driver = driverMap.remove(uuid);
273      if (driver != null) driverRemoved(driver);
274    }
275    StatusListener listener = (StatusListener) statusListenerMap.remove(c);
276    if (listener != null) c.removeClientConnectionStatusListener(listener);
277  }
278
279  /**
280   * Add a topology change listener.
281   * @param listener the listener to add.
282   */
283  public void addTopologyListener(final TopologyListener listener) {
284    if (listener == null) throw new IllegalArgumentException("cannot add a null listener");
285    listeners.add(listener);
286  }
287
288  /**
289   * Remove a topology change listener.
290   * @param listener the listener to remove.
291   */
292  public void removeTopologyListener(final TopologyListener listener) {
293    if (listener == null) throw new IllegalArgumentException("cannot remove a null listener");
294    listeners.remove(listener);
295  }
296
297  /**
298   * Notify all listeners that a driver was added.
299   * @param driver the driver to add.
300   */
301  void driverAdded(final TopologyDriver driver) {
302    if (debugEnabled) log.debug("adding driver {}, uuid={}", driver, driver.getUuid());
303    TopologyDriver other = null;
304    synchronized(driversLock) {
305      other = driverMap.get(driver.getUuid());
306      if (debugEnabled && (other != null)) log.debug("driver already exists with same uuid: {}", other);
307      if (other == null) {
308        other = driverMap.get(driver.getManagementInfo().toDisplayString());
309        if (debugEnabled && (other != null)) log.debug("driver already exists with same jmx id: {}", other);
310      }
311    }
312    if (other != null) {
313      driverRemoved(other);
314    }
315    synchronized(driversLock) {
316      driverMap.put(driver.getUuid(), driver);
317    }
318    TopologyEvent event = new TopologyEvent(this, driver, null, TopologyEvent.UpdateType.TOPOLOGY);
319    dispatchEvent(event, TopologyEvent.Type.DRIVER_ADDED);
320  }
321
322  /**
323   * Notify all listeners that a driver was added.
324   * @param driver the driver that was removed.
325   */
326  void driverRemoved(final TopologyDriver driver) {
327    if (debugEnabled) log.debug("removing driver {}", driver);
328    //if (driverMap.get(driver.getUuid()) == null) return;
329    JPPFClientConnection c = driver.getConnection();
330    ClientConnectionStatusListener listener = statusListenerMap.remove(c);
331    if (listener != null) c.removeClientConnectionStatusListener(listener);
332    for (AbstractTopologyComponent child: driver.getChildren()) nodeRemoved(driver, (TopologyNode) child);
333    synchronized(driversLock) {
334      driverMap.remove(driver.getUuid());
335    }
336    TopologyEvent event = new TopologyEvent(this, driver, null, TopologyEvent.UpdateType.TOPOLOGY);
337    dispatchEvent(event, TopologyEvent.Type.DRIVER_REMOVED);
338  }
339
340  /**
341   * Notify all listeners that the state of a driver has changed.
342   * @param driver the driver to add.
343   * @param updateType the type of update.
344   */
345  void driverUpdated(final TopologyDriver driver, final TopologyEvent.UpdateType updateType) {
346    TopologyEvent event = new TopologyEvent(this, driver, null, updateType);
347    dispatchEvent(event, TopologyEvent.Type.DRIVER_UPDATED);
348  }
349
350  /**
351   * Notify all listeners that a driver was added.
352   * @param driver the driver to which the node is attached.
353   * @param node the node that was added.
354   */
355  void nodeAdded(final TopologyDriver driver, final TopologyNode node) {
356    if (debugEnabled) log.debug(String.format("adding %s %s to driver %s", node.isPeer() ? "peer" : "node", node, driver));
357    if (node.isNode()) {
358      TopologyNode other = getNodeOrPeer(node.getUuid());
359      if (other != null) nodeRemoved((TopologyDriver) other.getParent(), other);
360    }
361    driver.add(node);
362    if (node.isNode()) nodeMap.put(node.getUuid(), node);
363    else peerMap.put(node.getUuid(), (TopologyPeer) node);
364    TopologyEvent event = new TopologyEvent(this, driver, node, TopologyEvent.UpdateType.TOPOLOGY);
365    dispatchEvent(event, TopologyEvent.Type.NODE_ADDED);
366  }
367
368  /**
369   * Notify all listeners that a node was removed.
370   * @param driver the driver to which the node is attached.
371   * @param node the node that was removed.
372   */
373  void nodeRemoved(final TopologyDriver driver, final TopologyNode node) {
374    if (debugEnabled) log.debug(String.format("removing %s %s from driver %s", (node.isNode() ? "node" : "peer"), node, driver));
375    driver.remove(node);
376    TopologyEvent event = null;
377    if (node.isNode()) nodeMap.remove(node.getUuid());
378    else peerMap.remove(node.getUuid());
379    event = new TopologyEvent(this, driver, node, TopologyEvent.UpdateType.TOPOLOGY);
380    dispatchEvent(event, TopologyEvent.Type.NODE_REMOVED);
381  }
382
383  /**
384   * Notify all listeners that a driver was added.
385   * @param driverData the driver that was updated or to which the updated node is attached.
386   * @param node the node that was updated, or <code>null</code> if it is a driver that was updated.
387   * @param updateType the type of update.
388   */
389  void nodeUpdated(final TopologyDriver driverData, final TopologyNode node, final TopologyEvent.UpdateType updateType) {
390    TopologyEvent event = new TopologyEvent(this, driverData, node, updateType);
391    dispatchEvent(event, TopologyEvent.Type.NODE_UPDATED);
392  }
393
394  /**
395   * Dispatch the specified event to all listeners.
396   * @param event the event to dispatch.
397   * @param type the type of event.
398   */
399  private void dispatchEvent(final TopologyEvent event, final TopologyEvent.Type type) {
400    Runnable dispatchTask = new Runnable() {
401      @Override public void run() {
402        if (log.isTraceEnabled()) log.trace("dispatching event type={} : {}", type, event);
403        switch (type) {
404          case DRIVER_ADDED: for (TopologyListener listener: listeners) listener.driverAdded(event);
405          break;
406          case DRIVER_REMOVED: for (TopologyListener listener: listeners) listener.driverRemoved(event);
407          break;
408          case DRIVER_UPDATED: for (TopologyListener listener: listeners) listener.driverUpdated(event);
409          break;
410          case NODE_ADDED: for (TopologyListener listener: listeners) listener.nodeAdded(event);
411          break;
412          case NODE_REMOVED: for (TopologyListener listener: listeners) listener.nodeRemoved(event);
413          break;
414          case NODE_UPDATED: for (TopologyListener listener: listeners) listener.nodeUpdated(event);
415          break;
416        }
417      }
418    };
419    executor.submit(dispatchTask);
420  }
421
422  /**
423   * Get the JPPF client.
424   * @return a {@link JPPFClient} object.
425   */
426  public JPPFClient getJPPFClient() {
427    return client;
428  }
429
430  /**
431   * Listens for the status of a driver connection and updates the tree accordingly.
432   */
433  private class StatusListener implements ClientConnectionStatusListener {
434    @Override
435    public void statusChanged(final ClientConnectionStatusEvent event) {
436      JPPFClientConnection c = (JPPFClientConnection) event.getClientConnectionStatusHandler();
437      JPPFClientConnectionStatus newStatus = c.getStatus();
438      JPPFClientConnectionStatus oldStatus = event.getOldStatus();
439      if (newStatus.isWorkingStatus() && !oldStatus.isWorkingStatus()) {
440        TopologyDriver driver = new TopologyDriver(c);
441        if (debugEnabled) log.debug("before adding driver {}", driver);
442        driverAdded(driver);
443      } else if (!newStatus.isWorkingStatus() && oldStatus.isWorkingStatus()) {
444        String uuid = c.getDriverUuid();
445        TopologyDriver driver = getDriver(uuid);
446        if (driver != null) {
447          for (AbstractTopologyComponent child: driver.getChildren()) nodeRemoved(driver, (TopologyNode) child);
448          //driverRemoved(driver);
449        }
450      }
451    }
452  }
453
454  /**
455   * Get the node selector used to filter the nodes and associated events.
456   * @return a {@link NodeSelector} instance.
457   * @since 5.2
458   */
459  public synchronized NodeSelector getNodeFilter() {
460    return nodeFilter;
461  }
462
463  /**
464   * Set the node selector used to filter the nodes and associated events.
465   * @param nodeFilter a {@link NodeSelector} instance.
466   * @since 5.2
467   */
468  public synchronized void setNodeFilter(final NodeSelector nodeFilter) {
469    this.nodeFilter = nodeFilter;
470  }
471}