001/*
002 * JPPF.
003 * Copyright (C) 2005-2016 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.server;
019
020import static org.jppf.utils.stats.JPPFStatisticsHelper.createServerStatistics;
021
022import java.util.*;
023
024import org.jppf.*;
025import org.jppf.classloader.*;
026import org.jppf.comm.discovery.JPPFConnectionInformation;
027import org.jppf.comm.recovery.*;
028import org.jppf.discovery.*;
029import org.jppf.job.*;
030import org.jppf.logging.jmx.JmxMessageNotifier;
031import org.jppf.management.*;
032import org.jppf.nio.*;
033import org.jppf.node.initialization.OutputRedirectHook;
034import org.jppf.node.protocol.JPPFDistributedJob;
035import org.jppf.process.LauncherListener;
036import org.jppf.serialization.ObjectSerializer;
037import org.jppf.server.job.JPPFJobManager;
038import org.jppf.server.nio.acceptor.AcceptorNioServer;
039import org.jppf.server.nio.classloader.LocalClassContext;
040import org.jppf.server.nio.classloader.client.ClientClassNioServer;
041import org.jppf.server.nio.classloader.node.NodeClassNioServer;
042import org.jppf.server.nio.client.ClientNioServer;
043import org.jppf.server.nio.nodeserver.*;
044import org.jppf.server.node.JPPFNode;
045import org.jppf.server.node.local.*;
046import org.jppf.server.protocol.ServerJob;
047import org.jppf.server.queue.JPPFPriorityQueue;
048import org.jppf.startup.JPPFDriverStartupSPI;
049import org.jppf.utils.*;
050import org.jppf.utils.configuration.JPPFProperties;
051import org.jppf.utils.hooks.HookFactory;
052import org.jppf.utils.stats.JPPFStatistics;
053import org.slf4j.*;
054
055/**
056 * This class serves as an initializer for the entire JPPF server. It follows the singleton pattern and provides access,
057 * across the JVM, to the tasks execution queue.
058 * <p>It also holds a server for incoming client connections, a server for incoming node connections, along with a class server
059 * to handle requests to and from remote class loaders.
060 * @author Laurent Cohen
061 * @author Lane Schwartz (dynamically allocated server port) 
062 */
063public class JPPFDriver {
064  // this static block must be the first thing executed when this class is loaded
065  static {
066    JPPFInitializer.init();
067  }
068  /**
069   * Logger for this class.
070   */
071  static Logger log = LoggerFactory.getLogger(JPPFDriver.class);
072  /**
073   * Determines whether debug-level logging is enabled.
074   */
075  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
076  /**
077   * Flag indicating whether collection of debug information is available via JMX.
078   * @exclude
079   */
080  public static final boolean JPPF_DEBUG = JPPFConfiguration.get(JPPFProperties.DEBUG_ENABLED);
081  /**
082   * Singleton instance of the JPPFDriver.
083   */
084  private static JPPFDriver instance = null;
085  /**
086   * Used for serialization / deserialization.
087   */
088  private static final ObjectSerializer serializer = new ObjectSerializerImpl();
089  /**
090   * Reference to the local node if it is enabled.
091   */
092  private JPPFNode localNode = null;
093  /**
094   * The queue that handles the tasks to execute. Objects are added to, and removed from, this queue, asynchronously and by multiple threads.
095   */
096  private final JPPFPriorityQueue taskQueue;
097  /**
098   * Serves the execution requests coming from client applications.
099   */
100  private ClientNioServer clientNioServer = null;
101  /**
102   * Serves the JPPF nodes.
103   */
104  private NodeNioServer nodeNioServer = null;
105  /**
106   * Serves class loading requests from the JPPF nodes.
107   */
108  private ClientClassNioServer clientClassServer = null;
109  /**
110   * Serves class loading requests from the JPPF nodes.
111   */
112  private NodeClassNioServer nodeClassServer = null;
113  /**
114   * Handles the initial handshake and peer channel identification.
115   */
116  private AcceptorNioServer acceptorServer = null;
117  /**
118   * Determines whether this server has initiated a shutdown, in which case it does not accept connections anymore.
119   */
120  private boolean shuttingDown = false;
121  /**
122   * Holds the statistics monitors.
123   */
124  private final JPPFStatistics statistics;
125  /**
126   * Manages and monitors the jobs throughout their processing within this driver.
127   */
128  private JPPFJobManager jobManager = null;
129  /**
130   * Uuid for this driver.log4j.logger.org.jppf.comm.interceptor=INFO
131
132   */
133  private final String uuid;
134  /**
135   * Performs initialization of the driver's components.
136   */
137  private DriverInitializer initializer = null;
138  /**
139   * Configuration for this driver.
140   */
141  private final TypedProperties config;
142  /**
143   * System ibnformation for this driver.
144   */
145  private JPPFSystemInformation systemInformation = null;
146
147  /**
148   * Initialize this JPPFDriver.
149   * @exclude
150   */
151  protected JPPFDriver() {
152    config = JPPFConfiguration.getProperties();
153    String s;
154    this.uuid = (s = config.getString("jppf.driver.uuid", null)) == null ? JPPFUuid.normalUUID() : s;
155    new JmxMessageNotifier(); // initialize the jmx logger
156    Thread.setDefaultUncaughtExceptionHandler(new JPPFDefaultUncaughtExceptionHandler());
157    new OutputRedirectHook().initializing(new UnmodifiableTypedProperties(config));
158    VersionUtils.logVersionInformation("driver", uuid);
159    SystemUtils.printPidAndUuid("driver", uuid);
160    statistics = createServerStatistics();
161    systemInformation = new JPPFSystemInformation(uuid, false, true, statistics);
162    statistics.addListener(new StatsSystemInformationUpdater(systemInformation));
163    jobManager = new JPPFJobManager();
164    taskQueue = new JPPFPriorityQueue(this, jobManager);
165    initializer = new DriverInitializer(this, config);
166  }
167
168  /**
169   * Initialize and start this driver.
170   * @throws Exception if the initialization fails.
171   * @exclude
172   */
173  @SuppressWarnings("unchecked")
174  public void run() throws Exception {
175    if (debugEnabled) log.debug("starting JPPF driver");
176    JPPFConnectionInformation info = initializer.getConnectionInformation();
177    initializer.registerDebugMBean();
178    initializer.initRecoveryServer();
179
180    RecoveryServer recoveryServer = initializer.getRecoveryServer();
181    int[] sslPorts = extractValidPorts(info.sslServerPorts);
182    boolean useSSL = (sslPorts != null) && (sslPorts.length > 0);
183    if (debugEnabled) log.debug("starting nio servers");
184    clientClassServer = startServer(recoveryServer, new ClientClassNioServer(this, useSSL));
185    nodeClassServer = startServer(recoveryServer, new NodeClassNioServer(this, useSSL));
186    clientNioServer = startServer(recoveryServer, new ClientNioServer(this, useSSL));
187    nodeNioServer = startServer(recoveryServer, new NodeNioServer(this, taskQueue, useSSL));
188    jobManager.loadTaskReturnListeners();
189    if (isManagementEnabled(config)) initializer.registerProviderMBeans();
190    initializer.initJmxServer();
191    HookFactory.registerSPIMultipleHook(JPPFDriverStartupSPI.class, null, null).invoke("run");
192    initializer.getNodeConnectionEventHandler().loadListeners();
193
194    acceptorServer = startServer(recoveryServer, new AcceptorNioServer(extractValidPorts(info.serverPorts), sslPorts));
195
196    if (config.get(JPPFProperties.LOCAL_NODE_ENABLED)) {
197      LocalClassLoaderChannel localClassChannel = new LocalClassLoaderChannel(new LocalClassContext());
198      localClassChannel.getContext().setChannel(localClassChannel);
199      LocalNodeChannel localNodeChannel = new LocalNodeChannel(new LocalNodeContext(nodeNioServer.getTransitionManager()));
200      localNodeChannel.getContext().setChannel(localNodeChannel);
201      final boolean offline = JPPFConfiguration.get(JPPFProperties.NODE_OFFLINE);
202      localNode = new JPPFLocalNode(new LocalNodeConnection(localNodeChannel), offline  ? null : new LocalClassLoaderConnection(localClassChannel));
203      nodeClassServer.initLocalChannel(localClassChannel);
204      nodeNioServer.initLocalChannel(localNodeChannel);
205      new Thread(localNode, "Local node").start();
206    }
207    initializer.initBroadcaster();
208    initializer.initPeers(clientClassServer);
209    if (debugEnabled) log.debug("JPPF Driver initialization complete");
210    System.out.println("JPPF Driver initialization complete");
211  }
212
213  /**
214   * Get the singleton instance of the JPPFDriver.
215   * @return a <code>JPPFDriver</code> instance.
216   */
217  public static JPPFDriver getInstance() {
218    return instance;
219  }
220
221  /**
222   * Get the queue that handles the tasks to execute.
   * @return a {@link JPPFPriorityQueue} instance.
224   * @exclude
225   */
226  public JPPFPriorityQueue getQueue() {
227    return taskQueue;
228  }
229
230  /**
231   * Get the JPPF client server.
232   * @return a <code>ClientNioServer</code> instance.
233   * @exclude
234   */
235  public ClientNioServer getClientNioServer() {
236    return clientNioServer;
237  }
238
239  /**
   * Get the JPPF class server for the client side.
   * @return a {@link ClientClassNioServer} instance.
242   * @exclude
243   */
244  public ClientClassNioServer getClientClassServer() {
245    return clientClassServer;
246  }
247
248  /**
   * Get the JPPF class server for the node side.
   * @return a {@link NodeClassNioServer} instance.
251   * @exclude
252   */
253  public NodeClassNioServer getNodeClassServer() {
254    return nodeClassServer;
255  }
256
257  /**
258   * Get the JPPF nodes server.
259   * @return a <code>NodeNioServer</code> instance.
260   * @exclude
261   */
262  public NodeNioServer getNodeNioServer() {
263    return nodeNioServer;
264  }
265
266  /**
267   * Get the server which handles the initial handshake and peer channel identification.
268   * @return a {@link AcceptorNioServer} instance.
269   * @exclude
270   */
271  public AcceptorNioServer getAcceptorServer() {
272    return acceptorServer;
273  }
274
275  /**
276   * Determines whether this server has initiated a shutdown, in which case it does not accept connections anymore.
277   * @return true if a shutdown is initiated, false otherwise.
278   * @exclude
279   */
280  public boolean isShuttingDown() {
281    return shuttingDown;
282  }
283
284  /**
285   * Get this driver's unique identifier.
286   * @return the uuid as a string.
287   */
288  public String getUuid() {
289    return uuid;
290  }
291
292  /**
293   * Get a server-side representation of a job from its uuid.
294   * @param uuid the uuid of the job to lookup.
295   * @return a {@link JPPFDistributedJob} instance, or {@code null} if there is no job with the specified uuid.
296   */
297  public JPPFDistributedJob getJob(final String uuid) {
298    ServerJob serverJob = this.getQueue().getJob(uuid);
299    return (serverJob == null) ? null : serverJob.getJob();
300  }
301
302  /**
   * Schedule the shutdown of this server, optionally followed by a restart.<br>
304   * The shutdown is initiated after the specified shutdown delay has expired.<br>
305   * If the restart parameter is set to false then the JVM exits after the shutdown is complete.
306   * @param shutdownDelay delay, in milliseconds, after which the server shutdown is initiated. A value of 0 or less
307   * means an immediate shutdown.
308   * @param restart determines whether the server should restart after shutdown is complete.
309   * If set to false, then the JVM will exit.
310   * @param restartDelay delay, starting from shutdown completion, after which the server is restarted.
311   * A value of 0 or less means the server is restarted immediately after the shutdown is complete.
312   * @exclude
313   */
314  public void initiateShutdownRestart(final long shutdownDelay, final boolean restart, final long restartDelay) {
315    log.info("Scheduling server shutdown in " + shutdownDelay + " ms");
316    shuttingDown = true;
317    Timer timer = new Timer();
318    ShutdownRestartTask task = new ShutdownRestartTask(timer, restart, restartDelay, this);
319    timer.schedule(task, (shutdownDelay <= 0L) ? 0L : shutdownDelay);
320  }
321
322  /**
323   * Shutdown this server and all its components.
324   * @exclude
325   */
326  public void shutdown() {
327    log.info("Shutting down");
328    if (acceptorServer != null) acceptorServer.shutdown();
329    if (clientClassServer != null) clientClassServer.shutdown();
330    if (nodeClassServer != null) nodeClassServer.shutdown();
331    if (nodeNioServer != null) nodeNioServer.shutdown();
332    if (clientNioServer != null) clientNioServer.shutdown();
333    StateTransitionManager.shutdown(true);
334    initializer.stopBroadcaster();
335    initializer.stopPeerDiscoveryThread();
336    initializer.stopJmxServer();
337    jobManager.close();
338    initializer.stopRecoveryServer();
339  }
340
341  /**
342   * Get the object that manages and monitors the jobs throughout their processing within this driver.
343   * @return an instance of <code>JPPFJobManager</code>.
344   * @exclude
345   */
346  public JPPFJobManager getJobManager() {
347    return jobManager;
348  }
349
350  /**
351   * Get the object which manages the registration and unregistration of job
352   * dispatch listeners and notifies these listeners of job dispatch events.
353   * @return an instance of {@link TaskReturnManager}.
354   * @deprecated use {@link #getJobTasksListenerManager()} instead.
355   */
356  public TaskReturnManager getTaskReturnManager() {
357    return jobManager;
358  }
359
360  /**
361   * Get the object which manages the registration and unregistration of job
362   * dispatch listeners and notifies these listeners of job dispatch events.
363   * @return an instance of {@link JobTasksListenerManager}.
364   */
365  public JobTasksListenerManager getJobTasksListenerManager() {
366    return jobManager;
367  }
368
369  /**
370   * Get this driver's initializer.
371   * @return a <code>DriverInitializer</code> instance.
372   * @exclude
373   */
374  public DriverInitializer getInitializer() {
375    return initializer;
376  }
377
378  /**
379   * Start the JPPF server.
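   * @param args the first argument is required and must be either a valid TCP port number,
   * used to start a {@link LauncherListener}, or the string 'noLauncher'.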
381   * @exclude
382   */
383  public static void main(final String...args) {
384    try {
385      if (debugEnabled) log.debug("starting the JPPF driver");
386      if ((args == null) || (args.length <= 0))
387        throw new JPPFException("The driver should be run with an argument representing a valid TCP port or 'noLauncher'");
388      if (!"noLauncher".equals(args[0])) {
389        int port = Integer.parseInt(args[0]);
390        new LauncherListener(port).start();
391      }
392      (instance = new JPPFDriver()).run();
393    } catch(Exception e) {
394      e.printStackTrace();
395      log.error(e.getMessage(), e);
396      if (JPPFConfiguration.get(JPPFProperties.SERVER_EXIT_ON_SHUTDOWN)) System.exit(1);
397    }
398  }
399
400  /**
401   * Start server, register it to recovery server if requested and print initialization message.
402   * @param recoveryServer Recovery server for nioServers that implements ReaperListener
403   * @param nioServer starting nio server
404   * @param <T> the type of the server to start
405   * @return started nioServer
406   */
407  private static <T extends NioServer> T startServer(final RecoveryServer recoveryServer, final T nioServer) {
408    if(nioServer == null) throw new IllegalArgumentException("nioServer is null");
409    if (debugEnabled) log.debug("starting nio server {}", nioServer);
410    if(recoveryServer != null && nioServer instanceof ReaperListener) {
411      Reaper reaper = recoveryServer.getReaper();
412      reaper.addReaperListener((ReaperListener) nioServer);
413    }
414    nioServer.start();
415    printInitializedMessage(nioServer.getPorts(), nioServer.getSSLPorts(), nioServer.getName());
416    return nioServer;
417  }
418
419  /**
420   * Print a message to the console to signify that the initialization of a server was successful.
421   * @param ports the ports on which the server is listening.
422   * @param sslPorts SSL ports for initialization message.
423   * @param name the name to use for the server.
424   */
425  private static void printInitializedMessage(final int[] ports, final int[] sslPorts, final String name) {
426    StringBuilder sb = new StringBuilder();
427    if (name != null) {
428      sb.append(name);
429      sb.append(" initialized");
430    }
431    if ((ports != null) && (ports.length > 0)) {
432      sb.append("\n-  accepting plain connections on port");
433      if (ports.length > 1) sb.append('s');
434      for (int n: ports) sb.append(' ').append(n);
435    }
436    if ((sslPorts != null) && (sslPorts.length > 0)) {
437      sb.append("\n- accepting secure connections on port");
438      if (sslPorts.length > 1) sb.append('s');
439      for (int n: sslPorts) sb.append(' ').append(n);
440    }
441    System.out.println(sb.toString());
442    if (debugEnabled) log.debug(sb.toString());
443  }
444
445  /**
   * Determine whether at least one of the plain or SSL management flags is enabled in the specified configuration.
   * @return <code>true</code> if either flag is enabled, <code>false</code> otherwise.
448   * @param config the configuration to test whether management is enabled.
449   */
450  private static boolean isManagementEnabled(final TypedProperties config) {
451    return config.get(JPPFProperties.MANAGEMENT_ENABLED) || config.get(JPPFProperties.MANAGEMENT_SSL_ENABLED);
452  }
453
454  /**
   * Get the system information for this driver.
456   * @return a {@link JPPFSystemInformation} instance.
457   */
458  public JPPFSystemInformation getSystemInformation() {
459    return systemInformation;
460  }
461
462  /**
   * Extract only the valid ports, that is those greater than or equal to 0, from the input array.
   * @param ports the array of port numbers to check.
   * @return the input array itself if it is null or empty, otherwise a new array, possibly of length 0, containing all the valid port numbers in the input array.
466   */
467  private int[] extractValidPorts(final int[] ports) {
468    if ((ports == null) || (ports.length == 0)) return ports;
469    List<Integer> list = new ArrayList<>();
470    for (int port: ports) {
471      if (port >= 0) list.add(port);
472    }
473    int[] result = new int[list.size()];
474    for (int i=0; i<result.length; i++) result[i] = list.get(i);
475    return result;
476  }
477
478  /**
479   * Get the object holding the statistics monitors.
480   * @return a {@link JPPFStatistics} instance.
481   */
482  public JPPFStatistics getStatistics() {
483    return statistics;
484  }
485
486  /**
487   * Get the object used for serialization / deserialization.
488   * @return an {@link ObjectSerializer} instance.
489   * @exclude
490   */
491  public static ObjectSerializer getSerializer() {
492    return serializer;
493  }
494
495  /**
496   * Get the jmx server used to manage and monitor this driver.
497   * @param secure specifies whether to get the ssl-based connector server. 
498   * @return a {@link JMXServer} instance.
499   */
500  public JMXServer getJMXServer(final boolean secure) {
501    return initializer.getJmxServer(secure);
502  }
503
504  /**
505   * Add a custom peer driver discovery mechanism to those already registered, if any.
506   * @param discovery the driver discovery to add.
507   */
508  public void addDriverDiscovery(final PeerDriverDiscovery discovery) {
509    initializer.discoveryHandler.addDiscovery(discovery);
510  }
511
512  /**
513   * Remove a custom peer driver discovery mechanism from those already registered.
514   * @param discovery the driver discovery to remove.
515   */
516  public void removeDriverDiscovery(final PeerDriverDiscovery discovery) {
517    initializer.discoveryHandler.removeDiscovery(discovery);
518  }
519}