001/*
002 * JPPF.
003 * Copyright (C) 2005-2016 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.node;
019
020import java.lang.reflect.Constructor;
021import java.security.*;
022import java.util.*;
023import java.util.concurrent.*;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import org.jppf.*;
027import org.jppf.classloader.*;
028import org.jppf.logging.jmx.JmxMessageNotifier;
029import org.jppf.node.connection.*;
030import org.jppf.node.initialization.InitializationHook;
031import org.jppf.process.LauncherListener;
032import org.jppf.security.JPPFPolicy;
033import org.jppf.server.node.JPPFNode;
034import org.jppf.utils.*;
035import org.jppf.utils.configuration.*;
036import org.jppf.utils.hooks.HookFactory;
037import org.slf4j.*;
038
039/**
040 * Bootstrap class for launching a JPPF node. The node class is dynamically loaded from a remote server.
041 * @author Laurent Cohen
042 */
043public class NodeRunner {
044  // this static block must be the first thing executed when this class is loaded
045  static {
046    JPPFInitializer.init();
047  }
048  /**
049   * Logger for this class.
050   */
051  private static Logger log = LoggerFactory.getLogger(NodeRunner.class);
052  /**
053   * Determines whether debug-level logging is enabled.
054   */
055  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
056  /**
057   * The ClassLoader used for loading the classes of the framework.
058   */
059  private static AbstractJPPFClassLoader classLoader = null;
060  /**
061   * Determine whether a security manager has already been set.
062   */
063  private static boolean securityManagerSet = false;
064  /**
065   * Container for data stored at the JVM level.
066   */
067  private static Hashtable<Object, Object> persistentData = new Hashtable<>();
068  /**
069   * Used to executed a JVM termination task;
070   */
071  private static ExecutorService executor = Executors.newFixedThreadPool(1);
072  /**
073   * The JPPF node.
074   */
075  private static JPPFNode node = null;
076  /**
077   * Used to synchronize start and stop methods when the node is run as a service.
078   */
079  private static ThreadSynchronization serviceLock = new ThreadSynchronization();
080  /**
081   * This node's universal identifier.
082   */
083  private static String uuid = JPPFConfiguration.getProperties().getString("jppf.node.uuid", JPPFUuid.normalUUID());
084  /**
085   * The offline node flag.
086   */
087  private static boolean offline = JPPFConfiguration.get(JPPFProperties.NODE_OFFLINE);
088  /**
089   * The initial configuration, such as read from the config file.
090   * The JPPF config is modified by the discovery mechanism, so we want to store the initial values somewhere.
091   */
092  private static TypedProperties initialConfig = null;
093  /**
094   * Determines whether this node is currently shutting down.
095   */
096  private static AtomicBoolean shuttingDown = new AtomicBoolean(false);
097  /**
098   * The current server connection information.
099   */
100  private static DriverConnectionInfo currentConnectionInfo = null;
101  /**
102   * 
103   */
104  private static LauncherListener launcherListener = null;
105  /**
106   * 
107   */
108  private static boolean ANDROID = JPPFConfiguration.get(JPPFProperties.NODE_ANDROID);
109
110  /**
111   * Run a node as a standalone application.
112   * @param args not used.
113   */
114  public static void main(final String...args) {
115    node = null;
116    try {
117      TypedProperties config = JPPFConfiguration.getProperties();
118      TypedProperties overrides = new ConfigurationOverridesHandler().load(true);
119      if (overrides != null) JPPFConfiguration.getProperties().putAll(overrides);
120      if (!ANDROID) new JmxMessageNotifier(); // initialize the jmx logger
121      Thread.setDefaultUncaughtExceptionHandler(new JPPFDefaultUncaughtExceptionHandler());
122      if (debugEnabled) log.debug("launching the JPPF node");
123      VersionUtils.logVersionInformation("node", uuid);
124      HookFactory.registerSPIMultipleHook(InitializationHook.class, null, null);
125      HookFactory.registerConfigSingleHook(JPPFProperties.SERVER_CONNECTION_STRATEGY, DriverConnectionStrategy.class, new JPPFDefaultConnectionStrategy(), null);
126      if ((args == null) || (args.length <= 0))
127        throw new JPPFException("The node should be run with an argument representing a valid TCP port or 'noLauncher'");
128      if (!"noLauncher".equals(args[0])) {
129        int port = Integer.parseInt(args[0]);
130        (launcherListener = new LauncherListener(port)).start();
131      }
132    } catch(Exception e) {
133      log.error(e.getMessage(), e);
134      System.exit(1);
135    }
136    try {
137      ConnectionContext context = new ConnectionContext("Initial connection", null, ConnectionReason.INITIAL_CONNECTION_REQUEST);
138      while (!getShuttingDown().get()) {
139        try {
140          if (initialConfig == null) initialConfig = new TypedProperties(JPPFConfiguration.getProperties());
141          else restoreInitialConfig();
142          node = createNode(context);
143          if (launcherListener != null) launcherListener.setActionHandler(new ShutdownRestartNodeProtocolHandler(node));
144          node.run();
145        } catch(JPPFNodeReconnectionNotification e) {
146          if (debugEnabled) log.debug("received reconnection notification : {}", ExceptionUtils.getStackTrace(e));
147          context = new ConnectionContext(e.getMessage(), e.getCause(), e.getReason());
148          if (classLoader != null) classLoader.close();
149          classLoader = null;
150          if (node != null) node.stopNode();
151          unsetSecurity();
152        }
153      }
154    } catch(Exception e) {
155      e.printStackTrace();
156    }
157  }
158
159  /**
160   * Run a node as a standalone application.
161   * @param args not used.
162   * @exclude
163   */
164  public static void start(final String...args) {
165    main(args);
166    serviceLock.goToSleep();
167  }
168
169  /**
170   * Run a node as a standalone application.
171   * @param args not used.
172   * @exclude
173   */
174  public static void stop(final String...args) {
175    serviceLock.wakeUp();
176    System.exit(0);
177  }
178
179  /**
180   * Start the node.
181   * @param connectionContext provides context information on the new connection request to the driver.
182   * @return the node that was started, as a <code>JPPFNode</code> instance.
183   * @throws Exception if the node failed to run or couldn't connect to the server.
184   * @exclude
185   */
186  public static JPPFNode createNode(final ConnectionContext connectionContext) throws Exception {
187    HookFactory.invokeHook(InitializationHook.class, "initializing", new UnmodifiableTypedProperties(initialConfig));
188    SystemUtils.printPidAndUuid("node", uuid);
189    currentConnectionInfo = (DriverConnectionInfo) HookFactory.invokeHook(DriverConnectionStrategy.class, "nextConnectionInfo", currentConnectionInfo, connectionContext)[0];
190    setSecurity();
191    //String className = "org.jppf.server.node.remote.JPPFRemoteNode";
192    String className = JPPFConfiguration.get(JPPFProperties.NODE_CLASS);
193    Class<?> clazz = getJPPFClassLoader().loadClass(className);
194    Constructor c = clazz.getConstructor(DriverConnectionInfo.class);
195    JPPFNode node = (JPPFNode) c.newInstance(currentConnectionInfo);
196    if (debugEnabled) log.debug("Created new node instance: " + node);
197    return node;
198  }
199
200  /**
201   * Restore the configuration from the snapshot taken at startup time.
202   */
203  public static void restoreInitialConfig() {
204    TypedProperties config = JPPFConfiguration.getProperties();
205    for (Map.Entry<Object, Object> entry: initialConfig.entrySet()) {
206      if ((entry.getKey() instanceof String) && (entry.getValue() instanceof String)) {
207        config.setProperty((String) entry.getKey(), (String) entry.getValue());
208      }
209    }
210  }
211
212  /**
213   * Set the security manager with the permission granted in the policy file.
214   * @throws Exception if the security could not be set.
215   */
216  private static void setSecurity() throws Exception {
217    if (!securityManagerSet) {
218      String s = JPPFConfiguration.get(JPPFProperties.POLICY_FILE);
219      if (s != null) {
220        if (debugEnabled) log.debug("setting security");
221        Policy.setPolicy(new JPPFPolicy(getJPPFClassLoader()));
222        System.setSecurityManager(new SecurityManager());
223        securityManagerSet = true;
224      }
225    }
226  }
227
228  /**
229   * Set the security manager with the permissions granted in the policy file.
230   */
231  private static void unsetSecurity() {
232    if (securityManagerSet) {
233      if (debugEnabled) log.debug("un-setting security");
234      PrivilegedAction<Object> pa = new PrivilegedAction<Object>() {
235        @Override
236        public Object run() {
237          System.setSecurityManager(null);
238          return null;
239        }
240      };
241      AccessController.doPrivileged(pa);
242      securityManagerSet = false;
243    }
244  }
245
246  /**
247   * Get the main classloader for the node. This method performs a lazy initialization of the classloader.
248   * @return a <code>AbstractJPPFClassLoader</code> used for loading the classes of the framework.
249   * @exclude
250   */
251  public static AbstractJPPFClassLoader getJPPFClassLoader() {
252    synchronized(JPPFClassLoader.class) {
253      if (classLoader == null) {
254        PrivilegedAction<JPPFClassLoader> pa = new PrivilegedAction<JPPFClassLoader>() {
255          @Override
256          public JPPFClassLoader run() {
257            return new JPPFClassLoader(offline ? null : new RemoteClassLoaderConnection(currentConnectionInfo), NodeRunner.class.getClassLoader());
258          }
259        };
260        classLoader = AccessController.doPrivileged(pa);
261        Thread.currentThread().setContextClassLoader(classLoader);
262      }
263      return classLoader;
264    }
265  }
266
267  /**
268   * Set a persistent object with the specified key.
269   * @param key the key associated with the object's value.
270   * @param value the object to persist.
271   */
272  public static void setPersistentData(final Object key, final Object value) {
273    persistentData.put(key, value);
274  }
275
276  /**
277   * Get a persistent object given its key.
278   * @param key the key used to retrieve the persistent object.
279   * @return the value associated with the key.
280   */
281  public static Object getPersistentData(final Object key) {
282    return persistentData.get(key);
283  }
284
285  /**
286   * Remove a persistent object.
287   * @param key the key associated with the object to remove.
288   * @return the value associated with the key, or null if the key was not found.
289   */
290  public static Object removePersistentData(final Object key) {
291    return persistentData.remove(key);
292  }
293
294  /**
295   * Get the JPPF node.
296   * @return a <code>Node</code> instance.
297   * @exclude
298   */
299  public static Node getNode() {
300    return node;
301  }
302
303  /**
304   * Shutdown and eventually restart the node.
305   * @param node the node to shutdown or restart.
306   * @param restart determines whether this node should be restarted by the node launcher.
307   * @exclude
308   */
309  public static void shutdown(final NodeInternal node, final boolean restart) {
310    //executor.submit(new ShutdownOrRestart(restart));
311    new ShutdownOrRestart(restart, node).run();
312  }
313
314  /**
315   * Stop the JMX server.
316   */
317  private static void stopJmxServer() {
318    try {
319      node.stopJmxServer();
320      Runnable r = new Runnable() {
321        @Override
322        public void run() {
323          try {
324            node.stopJmxServer();
325          } catch (Exception ignore) {
326          }
327        }
328      };
329      Future<?> f = executor.submit(r);
330      // we don't want to wait forever for the connection to close
331      try {
332        f.get(1000L, TimeUnit.MILLISECONDS);
333      } catch (Exception ignore) {
334      }
335    } catch (Exception ignore) {
336    }
337  }
338
339  /**
340   * Task used to terminate the JVM.
341   * @exclude
342   */
343  public static class ShutdownOrRestart implements Runnable {
344    /**
345     * True if the node is to be restarted, false to only shut it down.
346     */
347    private boolean restart = false;
348    /**
349     * True if the node is to be restarted, false to only shut it down.
350     */
351    private final NodeInternal node;
352
353    /**
354     * Initialize this task.
355     * @param restart true if the node is to be restarted, false to only shut it down.
356     * @param node this node.
357     */
358    public ShutdownOrRestart(final boolean restart, final NodeInternal node) {
359      this.restart = restart;
360      this.node = node;
361    }
362
363    @Override
364    public void run() {
365      AccessController.doPrivileged(new PrivilegedAction<Object>() {
366        @Override
367        public Object run() {
368          node.stopNode();
369          // close the JMX server connection to avoid request being sent again by the client.
370          stopJmxServer();
371          try {
372            Thread.sleep(500L);
373          } catch(Exception ignore) {
374          }
375          System.exit(restart ? 2 : 0);
376          return null;
377        }
378      });
379    }
380  }
381
382  /**
383   * This node's universal identifier.
384   * @return a uuid as a string.
385   */
386  public static String getUuid() {
387    return uuid;
388  }
389
390  /**
391   * Determine whether this node is currently shutting down.
392   * @return an {@link AtomicBoolean} instance whose value is <code>true</code> if the node is shutting down, <code>false</code> otherwise.
393   */
394  public static AtomicBoolean getShuttingDown() {
395    return shuttingDown;
396  }
397
398  /**
399   * Get the offline node flag.
400   * @return <code>true</code> if the node is offline, <code>false</code> otherwise.
401   */
402  public static boolean isOffline() {
403    return offline;
404  }
405}