001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client.monitoring.jobs;
020
021import static org.jppf.client.monitoring.jobs.JobMonitoringEvent.Type.*;
022
023import java.util.*;
024import java.util.concurrent.CopyOnWriteArrayList;
025
026import org.jppf.client.monitoring.topology.*;
027import org.jppf.job.JobInformation;
028import org.jppf.utils.ExceptionUtils;
029import org.jppf.utils.collections.*;
030import org.slf4j.*;
031
032/**
033 * This class provides a representation of the jobs and corresponding node dispatches executing in a JPPF grid.
034 * @author Laurent Cohen
035 * @since 5.1
036 */
037public class JobMonitor extends TopologyListenerAdapter implements JobMonitoringHandler {
038  /**
039   * Logger for this class.
040   */
041  private static Logger log = LoggerFactory.getLogger(JobMonitor.class);
042  /**
043   * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
044   */
045  private static boolean debugEnabled = log.isDebugEnabled();
046  /**
047   * The topology manager.
048   */
049  private final TopologyManager topologyManager;
050  /**
051   * Mapping of job drivers to their uuid.
052   */
053  private final Map<String, JobDriver> driverMap = new HashMap<>();
054  /**
055   * Mapping of each job to all the drivers it was submitted to.
056   */
057  private final CollectionMap<String, JobDriver> jobDriverMap = new SetHashMap<>();
058  /**
059   * For synchronized access and modifications.
060   */
061  private final Object lock = new Object();
062  /**
063   * The list of {@link JobMonitoringListener}s.
064   */
065  private final List<JobMonitoringListener> listeners = new CopyOnWriteArrayList<>();
066  /**
067   * The object that receives information on the jobs and publishes it a job monitoring events.
068   */
069  final AutoCloseable refreshHandler;
070  /**
071   * Initialize this job manager with the specified topology manager in {@link JobMonitorUpdateMode#IMMEDIATE_NOTIFICATIONS IMMEDIATE_NOTFICATIONS} mode.
072   * @param topologyManager the topology manager to use.
073   * @param listeners optional listeners that can be registered immediately with this job monitor.
074   */
075  public JobMonitor(final TopologyManager topologyManager, final JobMonitoringListener...listeners) {
076    this(JobMonitorUpdateMode.IMMEDIATE_NOTIFICATIONS, 0L, topologyManager, listeners);
077  }
078
079  /**
080   * Initialize this job manager with the specified topology manager and event mode.
081   * @param updateMode the update mode which determines how job monitoring events are generated.
082   * @param period the interval between publications of updates. This is not used in {@link JobMonitorUpdateMode#IMMEDIATE_NOTIFICATIONS IMMEDIATE_NOTFICATIONS} mode.
083   * @param topologyManager the topology manager to use.
084   * @param listeners optional listeners that can be registered immediately with this job monitor.
085   */
086  public JobMonitor(final JobMonitorUpdateMode updateMode, final long period, final TopologyManager topologyManager, final JobMonitoringListener...listeners) {
087    if (debugEnabled) log.debug("initializing job monitor in {} mode with period = {}", updateMode, period);
088    this.topologyManager = topologyManager;
089    if (listeners != null) {
090      for (JobMonitoringListener listener: listeners) addJobMonitoringListener(listener);
091    }
092    for (TopologyDriver driver: topologyManager.getDrivers()) driverAdded(new JobDriver(driver));
093    topologyManager.addTopologyListener(this);
094    switch(updateMode) {
095      case POLLING:
096        refreshHandler = new JobPollingHandler(this, "JobRefreshHandler", period);
097        break;
098      case DEFERRED_NOTIFICATIONS:
099        refreshHandler = new DeferredJobNotificationsHandler(this, "JobRefreshHandler", period);
100        break;
101      case IMMEDIATE_NOTIFICATIONS:
102      default:
103        refreshHandler = new ImmediateJobNotificationsHandler(this);
104        break;
105    }
106  }
107
108  /**
109   * Get the topology manager associated with this job monitor.
110   * @return a {@link TopologyManager} object.
111   */
112  public TopologyManager getTopologyManager() {
113    return topologyManager;
114  }
115
116  /**
117   * Get the driver with the specified uuid.
118   * @param driverUuid the uuid of the driver to retrieve.
119   * @return a {@link JobDriver} object, or {@code null} if there si no such driver.
120   */
121  public JobDriver getJobDriver(final String driverUuid) {
122    synchronized(driverMap) {
123      return driverMap.get(driverUuid);
124    }
125  }
126
127  /**
128   * Get the drivers monitored by this job monitor.
129   * @return a list of {@link JobDriver} objects.
130   */
131  public List<JobDriver> getJobDrivers() {
132    synchronized(driverMap) {
133      return new ArrayList<>(driverMap.values());
134    }
135  }
136
137  /**
138   * Get the drivers to which a job was submitted, either in parallel from the same client, or from one driver to another in a multi-driver topology.
139   * @param jobUuid the uuid of the job for which to find the drivers.
140   * @return a list of {@link JobDriver} instances, possibly empty.
141   */
142  public List<JobDriver> getDriversForJob(final String jobUuid) {
143    final List<JobDriver> result = new ArrayList<>();
144    synchronized(lock) {
145      final Collection<JobDriver> drivers = jobDriverMap.getValues(jobUuid);
146      if (drivers != null) result.addAll(drivers);
147    }
148    return result;
149  }
150
151  /**
152   * Get the dispatches of the specified job accrosss the entire topology. This will return a list of dispatches where the nodes may be attached to more than one server.
153   * @param jobUuid the uuid of the job for which to find the dispatches.
154   * @return a list of {@link JobDispatch} instances, possibly empty.
155   */
156  public List<JobDispatch> getAllJobDispatches(final String jobUuid) {
157    final List<JobDispatch> result = new ArrayList<>();
158    synchronized(lock) {
159      final Collection<JobDriver> drivers = jobDriverMap.getValues(jobUuid);
160      if (drivers != null) {
161        for (final JobDriver driver: drivers) {
162          final Job job = driver.getJob(jobUuid);
163          if (job != null) result.addAll(job.getJobDispatches());
164        }
165      }
166    }
167    return result;
168  }
169
170  /**
171   * Add a listener to the events emitted by this job monitor.
172   * @param listener the listener to add.
173   */
174  public void addJobMonitoringListener(final JobMonitoringListener listener) {
175    if (listener != null) listeners.add(listener);
176  }
177
178  /**
179   * Remove a listener to the events emitted by this job monitor.
180   * @param listener the listener to remove.
181   */
182  public void removeJobMonitoringListener(final JobMonitoringListener listener) {
183    if (listener != null) listeners.remove(listener);
184  }
185
186  /**
187   * {@inheritDoc}
188   * @exclude
189   */
190  @Override
191  public void driverAdded(final TopologyEvent event) {
192    driverAdded(new JobDriver(event.getDriver()));
193  }
194
195  /**
196   * Called when a driver is added.
197   * @param driver the driver to add.
198   */
199  void driverAdded(final JobDriver driver) {
200    if (debugEnabled) log.debug("driver {} added", driver.getDisplayName());
201    synchronized(lock) {
202      if (driverMap.get(driver.getUuid()) != null) return;
203      driverMap.put(driver.getUuid(), driver);
204    }
205    dispatchEvent(DRIVER_ADDED, new JobMonitoringEvent(this, driver, null, null));
206  }
207
208  /**
209   * {@inheritDoc}
210   * @exclude
211   */
212  @Override
213  public void driverRemoved(final TopologyEvent event) {
214    final String uuid = event.getDriver().getUuid();
215    JobDriver driver = null;
216    synchronized(lock) {
217      driver = driverMap.get(uuid);
218    }
219    if (driver != null) driverRemoved(driver);
220  }
221
222  /**
223   * Called when a driver is removed.
224   * @param driver the driver to remove.
225   */
226  void driverRemoved(final JobDriver driver) {
227    if (debugEnabled) log.debug("driver {} removed", driver.getDisplayName());
228    synchronized(lock) {
229      driverMap.remove(driver.getUuid());
230    }
231    dispatchEvent(DRIVER_REMOVED, new JobMonitoringEvent(this, driver, null, null));
232  }
233
234  /**
235   * Called when a job is added to the specified driver.
236   * @param driver the driver where the job was received.
237   * @param job the job to add.
238   */
239  void jobAdded(final JobDriver driver, final Job job) {
240    if (debugEnabled) log.debug("job '{}' added to driver {}", job.getDisplayName(), driver.getDisplayName());
241    driver.add(job);
242    synchronized(lock) {
243      jobDriverMap.putValue(job.getUuid(), driver);
244    }
245    dispatchEvent(JOB_ADDED, new JobMonitoringEvent(this, driver, job, null));
246  }
247
248  /**
249   * Called when a job is removed from the specified driver.
250   * @param driver the driver where the job was received.
251   * @param job the job to remove.
252   */
253  void jobRemoved(final JobDriver driver, final Job job) {
254    if (debugEnabled) log.debug("job '{}' removed from driver {}", job.getDisplayName(), driver.getDisplayName());
255    if (job != null) {
256      driver.remove(job);
257      synchronized(lock) {
258        jobDriverMap.removeValue(job.getUuid(), driver);
259      }
260      dispatchEvent(JOB_REMOVED, new JobMonitoringEvent(this, driver, job, null));
261    }
262  }
263
264  /**
265   * Called when a job in the specified driver is updated.
266   * @param driver the driver where the job was received.
267   * @param job the job to update.
268   */
269  void jobUpdated(final JobDriver driver, final Job job) {
270    if (debugEnabled) log.debug("job '{}' updated in driver {}", job.getDisplayName(), driver.getDisplayName());
271    dispatchEvent(JOB_UPDATED, new JobMonitoringEvent(this, driver, job, null));
272  }
273
274  /**
275   * Called when a job in the specified driver is dispatched to a node.
276   * @param driver the driver where the job was received.
277   * @param job the job to update.
278   * @param dispatch the job dispatch to add.
279   */
280  void dispatchAdded(final JobDriver driver, final Job job, final JobDispatch dispatch) {
281    if (debugEnabled) log.debug("adding dispatch {} to job {}", dispatch, job);
282    if (job != null) {
283      job.add(dispatch);
284      dispatchEvent(DISPATCH_ADDED, new JobMonitoringEvent(this, driver, job, dispatch));
285    }
286  }
287
288  /**
289   * Called when a job in the specified driver returns from a node.
290   * @param driver the driver where the job was received.
291   * @param job the job to update.
292   * @param dispatch the job dispatch to remove.
293   */
294  void dispatchRemoved(final JobDriver driver, final Job job, final JobDispatch dispatch) {
295    if (debugEnabled) log.debug("removing dispatch {} from job '{}'", (dispatch == null ? "null" : dispatch.getDisplayName()), job.getDisplayName());
296    if (dispatch != null) {
297      job.remove(dispatch);
298      dispatchEvent(DISPATCH_REMOVED, new JobMonitoringEvent(this, driver, job, dispatch));
299    }
300  }
301
302  /**
303   * Dispatch the specified event to all registered listeners.
304   * @param type the type of event to dispatch.
305   * @param event the event to dispatch.
306   */
307  void dispatchEvent(final JobMonitoringEvent.Type type, final JobMonitoringEvent event) {
308    try {
309      switch(type) {
310        case DRIVER_ADDED:
311          for (JobMonitoringListener listener: listeners) listener.driverAdded(event);
312          break;
313  
314        case DRIVER_REMOVED:
315          for (JobMonitoringListener listener: listeners) listener.driverRemoved(event);
316          break;
317  
318        case JOB_ADDED:
319          for (JobMonitoringListener listener: listeners) listener.jobAdded(event);
320          break;
321  
322        case JOB_REMOVED:
323          for (JobMonitoringListener listener: listeners) listener.jobRemoved(event);
324          break;
325  
326        case JOB_UPDATED:
327          for (JobMonitoringListener listener: listeners) listener.jobUpdated(event);
328          break;
329  
330        case DISPATCH_ADDED:
331          for (JobMonitoringListener listener: listeners) listener.jobDispatchAdded(event);
332          break;
333  
334        case DISPATCH_REMOVED:
335          for (JobMonitoringListener listener: listeners) listener.jobDispatchRemoved(event);
336          break;
337  
338        default:
339          break;
340      }
341    } catch(final Exception e) {
342      log.error("error dispatching event of type {}, event={}, exception: {}", type, event, ExceptionUtils.getStackTrace(e));
343    }
344  }
345
346  /**
347   * Detemrine whether a job was updated by comapring its current and latest informtion.
348   * @param oldJob the current job information.
349   * @param newJob the latest job information received from the driver.
350   * @return {@code true} if any of the job information changed, {@code false} otherwise.
351   */
352  boolean isJobUpdated(final JobInformation oldJob, final JobInformation newJob) {
353    return (oldJob.getTaskCount() != newJob.getTaskCount()) || (oldJob.getMaxNodes() != newJob.getMaxNodes()) ||
354        (oldJob.getPriority() != newJob.getPriority()) || (oldJob.isSuspended() ^ newJob.isSuspended()) || (oldJob.isPending() ^ newJob.isPending());
355  }
356
357  @Override
358  public void close() {
359    try {
360      listeners.clear();
361      refreshHandler.close();
362    } catch (final Exception e) {
363      log.error(e.getMessage(), e);
364    }
365  }
366}