001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.server;
019
020import static org.jppf.utils.stats.JPPFStatisticsHelper.createServerStatistics;
021
022import java.util.*;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.jppf.*;
026import org.jppf.classloader.*;
027import org.jppf.comm.discovery.JPPFConnectionInformation;
028import org.jppf.discovery.PeerDriverDiscovery;
029import org.jppf.job.JobTasksListenerManager;
030import org.jppf.logging.jmx.JmxMessageNotifier;
031import org.jppf.management.*;
032import org.jppf.nio.*;
033import org.jppf.nio.acceptor.AcceptorNioServer;
034import org.jppf.node.initialization.OutputRedirectHook;
035import org.jppf.node.protocol.JPPFDistributedJob;
036import org.jppf.process.LauncherListener;
037import org.jppf.serialization.ObjectSerializer;
038import org.jppf.server.job.JPPFJobManager;
039import org.jppf.server.nio.classloader.LocalClassContext;
040import org.jppf.server.nio.classloader.client.ClientClassNioServer;
041import org.jppf.server.nio.classloader.node.NodeClassNioServer;
042import org.jppf.server.nio.client.ClientNioServer;
043import org.jppf.server.nio.heartbeat.HeartbeatNioServer;
044import org.jppf.server.nio.nodeserver.*;
045import org.jppf.server.node.JPPFNode;
046import org.jppf.server.node.local.*;
047import org.jppf.server.protocol.ServerJob;
048import org.jppf.server.queue.JPPFPriorityQueue;
049import org.jppf.startup.JPPFDriverStartupSPI;
050import org.jppf.utils.*;
051import org.jppf.utils.concurrent.ThreadUtils;
052import org.jppf.utils.configuration.JPPFProperties;
053import org.jppf.utils.hooks.HookFactory;
054import org.jppf.utils.stats.JPPFStatistics;
055import org.slf4j.*;
056
057/**
058 * This class serves as an initializer for the entire JPPF server. It follows the singleton pattern and provides access,
059 * across the JVM, to the tasks execution queue.
060 * <p>It also holds a server for incoming client connections, a server for incoming node connections, along with a class server
061 * to handle requests to and from remote class loaders.
062 * @author Laurent Cohen
063 * @author Lane Schwartz (dynamically allocated server port) 
064 */
065public class JPPFDriver {
066  // this static block must be the first thing executed when this class is loaded
067  static {
068    JPPFInitializer.init();
069  }
070  /**
071   * Logger for this class.
072   */
073  static Logger log = LoggerFactory.getLogger(JPPFDriver.class);
074  /**
075   * Determines whether debug-level logging is enabled.
076   */
077  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
078  /**
079   * Flag indicating whether collection of debug information is available via JMX.
080   * @exclude
081   */
082  public static final boolean JPPF_DEBUG = JPPFConfiguration.get(JPPFProperties.DEBUG_ENABLED);
083  /**
084   * Singleton instance of the JPPFDriver.
085   */
086  private static JPPFDriver instance;
087  /**
088   * Used for serialization / deserialization.
089   */
090  private static final ObjectSerializer serializer = new ObjectSerializerImpl();
091  /**
092   * Reference to the local node if it is enabled.
093   */
094  private JPPFNode localNode;
095  /**
096   * The queue that handles the tasks to execute. Objects are added to, and removed from, this queue, asynchronously and by multiple threads.
097   */
098  private final JPPFPriorityQueue taskQueue;
099  /**
100   * Serves the execution requests coming from client applications.
101   */
102  private ClientNioServer clientNioServer;
103  /**
104   * Serves the JPPF nodes.
105   */
106  private NodeNioServer nodeNioServer;
107  /**
108   * Serves class loading requests from the JPPF nodes.
109   */
110  private ClientClassNioServer clientClassServer;
111  /**
112   * Serves class loading requests from the JPPF nodes.
113   */
114  private NodeClassNioServer nodeClassServer;
115  /**
116   * Handles the initial handshake and peer channel identification.
117   */
118  private AcceptorNioServer acceptorServer;
119  /**
120   * Handles the heartbeat messages with the nodes.
121   */
122  private HeartbeatNioServer nodeHeartbeatServer;
123  /**
124   * Handles the heartbeat messages with the clients.
125   */
126  private HeartbeatNioServer clientHeartbeatServer;
127  /**
128   * Determines whether this server has scheduled a shutdown.
129   */
130  private final AtomicBoolean shutdownSchduled = new AtomicBoolean(false);
131  /**
132   * Determines whether this server has initiated a shutdown, in which case it does not accept connections anymore.
133   */
134  final AtomicBoolean shuttingDown = new AtomicBoolean(false);
135  /**
136   * Holds the statistics monitors.
137   */
138  private final JPPFStatistics statistics;
139  /**
140   * Manages and monitors the jobs throughout their processing within this driver.
141   */
142  private JPPFJobManager jobManager;
143  /**
144   * Uuid for this driver.
145   */
146  private final String uuid;
147  /**
148   * Performs initialization of the driver's components.
149   */
150  private DriverInitializer initializer;
151  /**
152   * Configuration for this driver.
153   */
154  private final TypedProperties config;
155  /**
156   * System ibnformation for this driver.
157   */
158  private JPPFSystemInformation systemInformation;
159
160  /**
161   * Initialize this JPPFDriver.
162   * @exclude
163   */
164  protected JPPFDriver() {
165    config = JPPFConfiguration.getProperties();
166    final String s;
167    this.uuid = (s = config.getString("jppf.driver.uuid", null)) == null ? JPPFUuid.normalUUID() : s;
168    new JmxMessageNotifier(); // initialize the jmx logger
169    Thread.setDefaultUncaughtExceptionHandler(new JPPFDefaultUncaughtExceptionHandler());
170    new OutputRedirectHook().initializing(new UnmodifiableTypedProperties(config));
171    VersionUtils.logVersionInformation("driver", uuid);
172    SystemUtils.printPidAndUuid("driver", uuid);
173    statistics = createServerStatistics();
174    systemInformation = new JPPFSystemInformation(uuid, false, true, statistics);
175    statistics.addListener(new StatsSystemInformationUpdater(systemInformation));
176    initializer = new DriverInitializer(this, config);
177    initializer.initDatasources();
178    jobManager = new JPPFJobManager();
179    taskQueue = new JPPFPriorityQueue(this, jobManager);
180  }
181
182  /**
183   * Initialize and start this driver.
184   * @throws Exception if the initialization fails.
185   * @exclude
186   */
187  public void run() throws Exception {
188    if (debugEnabled) log.debug("starting JPPF driver");
189    final JPPFConnectionInformation info = initializer.getConnectionInformation();
190    initializer.handleDebugActions();
191
192    final int[] sslPorts = extractValidPorts(info.sslServerPorts);
193    final boolean useSSL = (sslPorts != null) && (sslPorts.length > 0);
194    if (debugEnabled) log.debug("starting nio servers");
195    if (JPPFConfiguration.get(JPPFProperties.RECOVERY_ENABLED)) {
196      nodeHeartbeatServer = initHeartbeatServer(JPPFIdentifiers.NODE_HEARTBEAT_CHANNEL, useSSL);
197      clientHeartbeatServer = initHeartbeatServer(JPPFIdentifiers.CLIENT_HEARTBEAT_CHANNEL, useSSL);
198    }
199    NioHelper.putServer(JPPFIdentifiers.CLIENT_CLASSLOADER_CHANNEL, clientClassServer = startServer(new ClientClassNioServer(this, useSSL)));
200    NioHelper.putServer(JPPFIdentifiers.NODE_CLASSLOADER_CHANNEL, nodeClassServer = startServer(new NodeClassNioServer(this, useSSL)));
201    NioHelper.putServer(JPPFIdentifiers.CLIENT_JOB_DATA_CHANNEL, clientNioServer = startServer(new ClientNioServer(this, useSSL)));
202    NioHelper.putServer(JPPFIdentifiers.NODE_JOB_DATA_CHANNEL, nodeNioServer = startServer(new NodeNioServer(this, taskQueue, useSSL)));
203    NioHelper.putServer(JPPFIdentifiers.ACCEPTOR_CHANNEL, acceptorServer = new AcceptorNioServer(extractValidPorts(info.serverPorts), sslPorts, statistics));
204    jobManager.loadTaskReturnListeners();
205    if (isManagementEnabled(config)) initializer.registerProviderMBeans();
206    initializer.initJmxServer();
207    HookFactory.registerSPIMultipleHook(JPPFDriverStartupSPI.class, null, null).invoke("run");
208    initializer.getNodeConnectionEventHandler().loadListeners();
209
210    startServer(acceptorServer);
211
212    if (config.get(JPPFProperties.LOCAL_NODE_ENABLED)) {
213      final LocalClassLoaderChannel localClassChannel = new LocalClassLoaderChannel(new LocalClassContext());
214      localClassChannel.getContext().setChannel(localClassChannel);
215      final LocalNodeChannel localNodeChannel = new LocalNodeChannel(new LocalNodeContext(nodeNioServer.getTransitionManager()));
216      localNodeChannel.getContext().setChannel(localNodeChannel);
217      final boolean offline = JPPFConfiguration.get(JPPFProperties.NODE_OFFLINE);
218      localNode = new JPPFLocalNode(new LocalNodeConnection(localNodeChannel), offline  ? null : new LocalClassLoaderConnection(localClassChannel));
219      nodeClassServer.initLocalChannel(localClassChannel);
220      nodeNioServer.initLocalChannel(localNodeChannel);
221      ThreadUtils.startDaemonThread(localNode, "Local node");
222    }
223    initializer.initBroadcaster();
224    initializer.initPeers();
225    taskQueue.getPersistenceHandler().loadPersistedJobs();
226    if (debugEnabled) log.debug("JPPF Driver initialization complete");
227    System.out.println("JPPF Driver initialization complete");
228  }
229
230  /**
231   * Get the singleton instance of the JPPFDriver.
232   * @return a <code>JPPFDriver</code> instance.
233   */
234  public static JPPFDriver getInstance() {
235    return instance;
236  }
237
238  /**
239   * Get the queue that handles the tasks to execute.
240   * @return a JPPFQueue instance.
241   * @exclude
242   */
243  public JPPFPriorityQueue getQueue() {
244    return taskQueue;
245  }
246
247  /**
248   * Get the JPPF client server.
249   * @return a <code>ClientNioServer</code> instance.
250   * @exclude
251   */
252  public ClientNioServer getClientNioServer() {
253    return clientNioServer;
254  }
255
256  /**
257   * Get the JPPF class server.
258   * @return a <code>ClassNioServer</code> instance.
259   * @exclude
260   */
261  public ClientClassNioServer getClientClassServer() {
262    return clientClassServer;
263  }
264
265  /**
266   * Get the JPPF class server.
267   * @return a <code>ClassNioServer</code> instance.
268   * @exclude
269   */
270  public NodeClassNioServer getNodeClassServer() {
271    return nodeClassServer;
272  }
273
274  /**
275   * Get the JPPF nodes server.
276   * @return a <code>NodeNioServer</code> instance.
277   * @exclude
278   */
279  public NodeNioServer getNodeNioServer() {
280    return nodeNioServer;
281  }
282
283  /**
284   * Get the server which handles the initial handshake and peer channel identification.
285   * @return a {@link AcceptorNioServer} instance.
286   * @exclude
287   */
288  public AcceptorNioServer getAcceptorServer() {
289    return acceptorServer;
290  }
291
292  /**
293   * Get this driver's unique identifier.
294   * @return the uuid as a string.
295   */
296  public String getUuid() {
297    return uuid;
298  }
299
300  /**
301   * Get a server-side representation of a job from its uuid.
302   * @param uuid the uuid of the job to lookup.
303   * @return a {@link JPPFDistributedJob} instance, or {@code null} if there is no job with the specified uuid.
304   */
305  public JPPFDistributedJob getJob(final String uuid) {
306    final ServerJob serverJob = this.getQueue().getJob(uuid);
307    return (serverJob == null) ? null : serverJob.getJob();
308  }
309
310  /**
311   * Initialize this task with the specified parameters.<br>
312   * The shutdown is initiated after the specified shutdown delay has expired.<br>
313   * If the restart parameter is set to false then the JVM exits after the shutdown is complete.
314   * @param shutdownDelay delay, in milliseconds, after which the server shutdown is initiated. A value of 0 or less
315   * means an immediate shutdown.
316   * @param restart determines whether the server should restart after shutdown is complete.
317   * If set to false, then the JVM will exit.
318   * @param restartDelay delay, starting from shutdown completion, after which the server is restarted.
319   * A value of 0 or less means the server is restarted immediately after the shutdown is complete.
320   * @exclude
321   */
322  public void initiateShutdownRestart(final long shutdownDelay, final boolean restart, final long restartDelay) {
323    if (shutdownSchduled.compareAndSet(false, true)) {
324      log.info("Scheduling server shutdown in " + shutdownDelay + " ms");
325      final Timer timer = new Timer();
326      final ShutdownRestartTask task = new ShutdownRestartTask(restart, restartDelay, this);
327      timer.schedule(task, (shutdownDelay <= 0L) ? 0L : shutdownDelay);
328    } else {
329      log.info("shutdown/restart request ignored because a previous request is already scheduled");
330    }
331  }
332
333  /**
334   * Shutdown this server and all its components.
335   * @exclude
336   */
337  public void shutdown() {
338    log.info("Shutting down");
339    if (acceptorServer != null) acceptorServer.shutdown();
340    if (nodeHeartbeatServer != null) nodeHeartbeatServer.shutdown();
341    if (clientHeartbeatServer != null) clientHeartbeatServer.shutdown();
342    if (clientClassServer != null) clientClassServer.shutdown();
343    if (nodeClassServer != null) nodeClassServer.shutdown();
344    if (nodeNioServer != null) nodeNioServer.shutdown();
345    if (clientNioServer != null) clientNioServer.shutdown();
346    NioHelper.shutdown(true);
347    initializer.stopBroadcaster();
348    initializer.stopPeerDiscoveryThread();
349    initializer.stopJmxServer();
350    jobManager.close();
351  }
352
353  /**
354   * Get the object that manages and monitors the jobs throughout their processing within this driver.
355   * @return an instance of <code>JPPFJobManager</code>.
356   * @exclude
357   */
358  public JPPFJobManager getJobManager() {
359    return jobManager;
360  }
361
362  /**
363   * Get the object which manages the registration and unregistration of job
364   * dispatch listeners and notifies these listeners of job dispatch events.
365   * @return an instance of {@link JobTasksListenerManager}.
366   */
367  public JobTasksListenerManager getJobTasksListenerManager() {
368    return jobManager;
369  }
370
371  /**
372   * Get this driver's initializer.
373   * @return a <code>DriverInitializer</code> instance.
374   * @exclude
375   */
376  public DriverInitializer getInitializer() {
377    return initializer;
378  }
379
380  /**
381   * Start the JPPF server.
382   * @param args not used.
383   * @exclude
384   */
385  public static void main(final String...args) {
386    try {
387      if (debugEnabled) log.debug("starting the JPPF driver");
388      if ((args == null) || (args.length <= 0))
389        throw new JPPFException("The driver should be run with an argument representing a valid TCP port or 'noLauncher'");
390      if (!"noLauncher".equals(args[0])) {
391        final int port = Integer.parseInt(args[0]);
392        new LauncherListener(port).start();
393      }
394      instance = new JPPFDriver();
395      if (debugEnabled) log.debug("Driver system properties: {}", SystemUtils.printSystemProperties());
396      instance.run();
397      final Object lock = new Object();
398      synchronized(lock) {
399        try {
400          while(true) lock.wait();
401        } catch (@SuppressWarnings("unused") final Exception e) {
402        }
403      }
404    } catch(final Exception e) {
405      e.printStackTrace();
406      log.error(e.getMessage(), e);
407      if (JPPFConfiguration.get(JPPFProperties.SERVER_EXIT_ON_SHUTDOWN)) System.exit(1);
408    }
409  }
410
411  /**
412   * Start a heartbeat server with the specified channel identifier.
413   * @param identifier the channel identifier for the server connections.
414   * @param useSSL whether to use SSL connectivity.
415   * @return the created server.
416   * @throws Exception if any error occurs.
417   */
418  private static HeartbeatNioServer initHeartbeatServer(final int identifier, final boolean useSSL) throws Exception {
419    final HeartbeatNioServer server = startServer(new HeartbeatNioServer(identifier, useSSL));
420    NioHelper.putServer(identifier, server);
421    return server;
422  }
423
424  /**
425   * Start server, register it to recovery server if requested and print initialization message.
426   * @param <T> the type of the server to start.
427   * @param nioServer the nio server to start.
428   * @return started nioServer
429   */
430  private static <T extends NioServer<?, ?>> T startServer(final T nioServer) {
431    if (nioServer == null) throw new IllegalArgumentException("nioServer is null");
432    if (debugEnabled) log.debug("starting nio server {}", nioServer);
433    nioServer.start();
434    printInitializedMessage(nioServer.getPorts(), nioServer.getSSLPorts(), nioServer.getName());
435    return nioServer;
436  }
437
438  /**
439   * Print a message to the console to signify that the initialization of a server was successful.
440   * @param ports the ports on which the server is listening.
441   * @param sslPorts SSL ports for initialization message.
442   * @param name the name to use for the server.
443   */
444  private static void printInitializedMessage(final int[] ports, final int[] sslPorts, final String name) {
445    final StringBuilder sb = new StringBuilder();
446    if (name != null) {
447      sb.append(name);
448      sb.append(" initialized");
449    }
450    if ((ports != null) && (ports.length > 0)) {
451      sb.append("\n-  accepting plain connections on port");
452      if (ports.length > 1) sb.append('s');
453      for (int n: ports) sb.append(' ').append(n);
454    }
455    if ((sslPorts != null) && (sslPorts.length > 0)) {
456      sb.append("\n- accepting secure connections on port");
457      if (sslPorts.length > 1) sb.append('s');
458      for (int n: sslPorts) sb.append(' ').append(n);
459    }
460    System.out.println(sb.toString());
461    if (debugEnabled) log.debug(sb.toString());
462  }
463
464  /**
465   * Determine whether management is enabled and if there is an active remote connector server.
466   * @return <code>true</code> if management is enabled, <code>false</code> otherwise.
467   * @param config the configuration to test whether management is enabled.
468   */
469  private static boolean isManagementEnabled(final TypedProperties config) {
470    return config.get(JPPFProperties.MANAGEMENT_ENABLED);
471  }
472
473  /**
474   * Get the system ibnformation for this driver.
475   * @return a {@link JPPFSystemInformation} instance.
476   */
477  public JPPFSystemInformation getSystemInformation() {
478    return systemInformation;
479  }
480
481  /**
482   * Extract only th valid ports from the input array.
483   * @param ports the array of port numbers to check.
484   * @return an array, possibly of length 0, containing all the valid port numbers in the input array.
485   */
486  private static int[] extractValidPorts(final int[] ports) {
487    if ((ports == null) || (ports.length == 0)) return ports;
488    final List<Integer> list = new ArrayList<>();
489    for (int port: ports) {
490      if (port >= 0) list.add(port);
491    }
492    final int[] result = new int[list.size()];
493    for (int i=0; i<result.length; i++) result[i] = list.get(i);
494    return result;
495  }
496
497  /**
498   * Get the object holding the statistics monitors.
499   * @return a {@link JPPFStatistics} instance.
500   */
501  public JPPFStatistics getStatistics() {
502    return statistics;
503  }
504
505  /**
506   * Get the object used for serialization / deserialization.
507   * @return an {@link ObjectSerializer} instance.
508   * @exclude
509   */
510  public static ObjectSerializer getSerializer() {
511    return serializer;
512  }
513
514  /**
515   * Get the jmx server used to manage and monitor this driver.
516   * @param secure specifies whether to get the ssl-based connector server. 
517   * @return a {@link JMXServer} instance.
518   */
519  public JMXServer getJMXServer(final boolean secure) {
520    return initializer.getJmxServer(secure);
521  }
522
523  /**
524   * Add a custom peer driver discovery mechanism to those already registered, if any.
525   * @param discovery the driver discovery to add.
526   */
527  public void addDriverDiscovery(final PeerDriverDiscovery discovery) {
528    initializer.discoveryHandler.addDiscovery(discovery);
529  }
530
531  /**
532   * Remove a custom peer driver discovery mechanism from those already registered.
533   * @param discovery the driver discovery to remove.
534   */
535  public void removeDriverDiscovery(final PeerDriverDiscovery discovery) {
536    initializer.discoveryHandler.removeDiscovery(discovery);
537  }
538
539  /**
540   * Determine whether this server has initiated a shutdown, in which case it does not accept connections anymore.
541   * @return {@code true} if a shutdown is initiated, {@code false} otherwise.
542   */
543  public boolean isShuttingDown() {
544    return shuttingDown.get();
545  }
546}