JPPF, java, parallel computing, distributed computing, grid computing, parallel, distributed, cluster, grid, cloud, open source, android, .net
JPPF, java, parallel computing, distributed computing, grid computing, parallel, distributed, cluster, grid, cloud, open source, android, .net
JPPF

The open source
grid computing
solution

 Home   About   Features   Download   Documentation   Forums 

Node management

From JPPF 6.2 Documentation

Jump to: navigation, search

Contents

Main Page > Management and monitoring > Node management


Out of the box in JPPF, each node provides 2 MBeans that can be accessed remotely using a JMXMP remote connector with the JMX URL “service:jmx:jmxmp://host:port”, where host is the host name or IP address of the machine where the node is running (value of “jppf.management.host” in the node configuration file), and port is the value of the property “jppf.management.port” specified in the node's configuration file.

1 Node-level management and monitoring MBean

MBean name: “org.jppf:name=admin,type=node

This is also the value of the constant JPPFNodeAdminMBean.MBEAN_NAME.

This MBean's role is to perform management and monitoring at the node level, however we will see that it also has (for historical reasons) some task-level management functions. It exposes the JPPFNodeAdminMBean interface, which provides the functionalities described hereafter.

1.1 Getting a snapshot of the node's state

This is done by invoking the following method on the MBean:

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Get the latest state information from the node
  public JPPFNodeState state() throws Exception;
}

This method returns a JPPFNodeState object, which provides the following information on the node:

public class JPPFNodeState implements Serializable {
  // the status of the connection with the server
  public String getConnectionStatus()

  // the current task execution status
  public String getExecutionStatus()

  // the cpu time consumed by the node's execution threads
  // this includes the tasks cpu time and some JPPF processing overhead
  public long getCpuTime()

  // the total number of tasks executed
  public int getNbTasksExecuted()

  // the current size of the pool of threads used for tasks execution
  public int getThreadPoolSize()

  // the current priority assigned to the execution threads
  public int getThreadPriority()
}

1.2 Updating the execution thread pool properties

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Set the size of the node's execution thread pool
  public void updateThreadPoolSize(Integer size) throws Exception;

  // Update the priority of all execution threads
  public void updateThreadsPriority(Integer newPriority) throws Exception;
}

1.3 Shutting down and restarting the node

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Restart the node immediately
  public void restart() throws Exception;

  // Restart the node once it is idle
  public void restart(Boolean interruptIfRunning) throws Exception;

  // Shutdown the node immediately
  public void shutdown() throws Exception;

  // Shutdown the node once it is idle
  public void shutdown(Boolean interruptIfRunning) throws Exception;

  // Determine wether a deffered shutdown or restart was requested and not yet performed
  public NodePendingAction pendingAction();

  // Cancel a previous deferred shutdown or restart request, if any
  public void cancelPendingAction();
}

These methods should be used with precautions. Please note that, once shutdown() has been invoked, it is not possible anymore to restart the node remotely.

Calling restart() or shutdown() is equivalent to calling restart(true) or shutdown(true), respectively. When any of these methods is invoked without the interruptIfRunning flag, or when the flag's value is true, the tasks that were being executed, if any, are automatically resubmitted to the server queue.

When the interruptIfRunning parameter is false, the node will wait until no more tasks are being executed before restarting or sutting down. Thus, interruptIfRunning = false indicates a deferred operation request.

It is possible to query whether a deferred action has been requested with the pendingAction() method, which returns a PendingNodeAction enum element, defined as follows:

public enum NodePendingAction {
  // There is no pending action
  NONE,
  // A deferred shutdown was requested
  SHUTDOWN,
  // A deferred restart was requested
  RESTART;
}

Finally, a deferred action can be cancelled with cancelPendingAction(), provided the action hasn't yet started.

1.4 Updating the executed tasks counter

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Reset the node's executed tasks counter to zero
  public void resetTaskCounter() throws Exception;

  // Reset the node's executed tasks counter to the specified value
  public void setTaskCounter(Integer n) throws Exception;
}

Please note that resetTaskCounter() is equivalent to setTaskCounter(0).

1.5 Getting information about the node's host

public interface JPPFAdminMBean extends Serializable {
  // Get detailed information about the node's JVM properties, environment variables,
  // memory usage, available processors andavailable storage space
  JPPFSystemInformation systemInformation() throws Exception;
}

This method returns an object of type JPPFSystemInformation, which is a snapshot of the environment of the JPPF node, the JVM and the host they run on. The properties defined in this object are also those used by execution policies, as we have seen in section 3.4.1 of this manual.

JPPFSystemInformation provides information about 6 different aspects of the environment:

public class JPPFSystemInformation implements Serializable {
  // get the system properties
  public TypedProperties getSystem()
  // get runtime information about JVM memory and available processors
  public TypedProperties getRuntime()
  // get the host environment variables
  public TypedProperties getEnv()
  // get IPV4 and IPV6 addresses assigned to the host
  public TypedProperties getNetwork()
  // get the JPPF configuration properties
  public TypedProperties getJppf()
  // get information on available disk storage
  public TypedProperties getStorage()
}

We encourage the reader to follow the links to the above methods' Javadoc, to obtain details on each set of information, and how the information is formatted and named.

Each of the methods in JPPFSystemInformation returns a TypedProperties object. TypedProperties is a subclass of the standard java.util.Properties that provides convenience methods to read property values as primitive types other than String.

1.6 Canceling a job

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Cancel the job with the specified uuid. The requeue parameter determines
  // whether the job should be requeued on the server side or not
  public void cancelJob(String jobUuid, Boolean requeue) throws Exception;
}

This MBean method is used to cancel a job currently running in the node. The job is identified by its uuid. The requeue parameter is used to notify the server that the canceled job should be requeued on the server and executed again, possibly on an other node. If requeue is false, the job is simply terminated and any remaining task will not be executed.

This method should normally only be used by the JPPF server, in the case where a user requested that the server terminates a job. In effect, a job can contain several tasks, with each task potentially executed concurrently on a separate node. When the server receives a job termination request, it will handle the termination of “sub-jobs” (i.e. subsets of the tasks in the job) by notifying each corresponding node.

1.7 Updating the node's configuration properties

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Update the configuration properties of the node and optionally restart the node.
  void updateConfiguration(Map<String, String> config, Boolean restart) throws Exception;
  void updateConfiguration(Map<String, String> config, Boolean restart,
     Boolean interrupIfRunning) throws Exception;
}

These methods send a set of configuration properties to the node, that will override those defined in the node's configuration file. The restart parameter will allow the node to take the changes into account, especially in the case where the server connection or discovery properties have been changed, for instance to force the node to connect to another server, or when you need to restart the node with a different JVM by specifying the jppf.java.path property.

Note that, when the restart parameter is true, the updateConfiguration() method will call restart() or restart(interruptIfRunning) after storing the configuration updates, depending on which method overload is invoked initially.

Finally, the interruptIfRunning parameter specifies whether the node should be restarted initially, or wait until it no longer has any task to execute.

2 Task-level monitoring

MBean name : “org.jppf:name=task.monitor,type=node”.

This is also the value of the constant JPPFNodeTaskMonitorMBean.MBEAN_NAME

This MBean monitors the task activity within a node. It exposes the interface JPPFNodeTaskMonitorMBean and also emits JMX notifications of type TaskExecutionNotification.

2.1 Snapshot of the tasks activity

The interface JPPFNodeTaskMonitorMBean provides access to aggregated statistics on the tasks executed within a node:

public interface JPPFNodeTaskMonitorMBean extends NotificationEmitter {
  // The total number of tasks executed by the node
  Integer getTotalTasksExecuted();

  // The total number of tasks that ended in error
  Integer getTotalTasksInError();

  // The total number of tasks that executed sucessfully
  Integer getTotalTasksSucessfull();

  // The total cpu time used by the tasks in milliseconds
  Long getTotalTaskCpuTime();

  // The total elapsed time used by the tasks in milliseconds
  Long getTotalTaskElapsedTime();
}

2.2 Notification of tasks execution

Each time a task completes its execution in a node, the task monitor MBean will emit a JMX notification of type TaskExecutionNotification defined as follows:

public class TaskExecutionNotification extends Notification {
  // Get the object encapsulating information about the task
  public TaskInformation getTaskInformation();

  // Whether this is a user-defined notification sent from a task
  public boolean isUerNotification();
}

This notification essentially encapsulates an object of type TaskInformation, which provides the following information about each executed task:

public class TaskInformation implements Serializable {
  // Get the task id
  public String getId()

  // Get the uuid of the job this task belongs to
  public String getJobId()

  // Get the cpu time used by the task
  public long getCpuTime()

  // Get the wall clock time used by the task
  public long getElapsedTime()

  // Determines whether the task had an exception
  public boolean hasError()

  // Get the timestamp for the task completion
  // Caution: this value is related to the node's system time,
  // not to the time of the notification receiver
  public long getTimestamp()

  // Get the position of the task in the job to which it belongs
  public int getJobPosition()
}

TaskExecutionNotification also inherits the method getUserData(), which returns the object specified by the user code when calling Task.fireNotification(Object, boolean) with the second parameter set to true.

Additionally, the method isUserNotification() allows you to unambiguously distinguish between user-defined notifications, sent via Task.fireNotification(Object, boolean), and task completion notifications automatcially sent by the JPPF nodes.

3 Node maintenance

MBean name : "org.jppf:name=node.maintenance,type=node".
This is also the value of the constant JPPFNodeMaintenanceMBean.MBEAN_NAME

This MBean provides operations for the maintenance of a node. It exposes the interface JPPFNodeMaintenanceMBean defined as follows:

public interface JPPFNodeMaintenanceMBean extends Serializable {
  // object name for this MBean
  String MBEAN_NAME = "org.jppf:name=node.maintenance,type=node";
 
  // request a reset of the resource caches of all the JPPF class loaders
  // maintained by the node
  void requestResourceCacheReset() throws Exception;
}

Please note that the method requestResourceCacheReset() does not perform the reset immediately. Instead, it sets an internal flag, and the reset will take place when it is safe to do so, as part of the node's life cycle. The outcome of the reset operation is that the temporary files created by the JPPF class loaders wil be deleted, freeing space in the temporary files folder.

4 Node provisioning

Any JPPF node has the ability to start new nodes on the same physical or virtual machine, and stop and monitor these nodes afterwards. This provides a node provisioning facility, which allows to dynamically grow or shrink a JPPF grid based on the workload requirements.

This provisioning ability establishes a master/slave relationship between a standard node (master) and the nodes that it starts (slaves). Please note that a slave node cannot be in turn used as a master. Apart from this restriction, slave nodes can be managed and monitored as any oher node.

4.1 Node provisioning MBean

Node provisioning is implemented with a dedicated Mbean, which exposes the JPPFNodeProvisioningMBean interface:

MBean name: "org.jppf:name=provisioning,type=node"
This is also the value of the constant JPPFNodeProvisioningMBean.MBEAN_NAME.

JPPFNodeProvisioningMBean is defined as follows:

public interface JPPFNodeProvisioningMBean {
  // The object name of this MBean
  String MBEAN_NAME = "org.jppf:name=provisioning,type=node";
  // Constant for notifications that a slave node has started
  String SLAVE_STARTED_NOTIFICATION_TYPE = "slave_started";
  // Constant for notifications that a slave node has stopped
  String SLAVE_STOPPED_NOTIFICATION_TYPE = "slave_stopped";

  // Get the number of slave nodes started by this MBean
  int getNbSlaves();

  // Start or stop the required number of slaves to reach the specified number,
  // with an interrupt flag set to true
  void provisionSlaveNodes(int nbNodes);
  // Same action, explicitely specifiying the interrupt flag
  void provisionSlaveNodes(int nbNodes, boolean interruptIfRunning);

  // Start or stop the required number of slaves to reach the specified number,
  // using the specified configuration overrides and an interrupt flag set to true
  void provisionSlaveNodes(int nbNodes, TypedProperties configOverrides);
  // Same action, explicitely specifiying the interrupt flag
  void provisionSlaveNodes(int nbNodes, boolean interruptIfRunning, TypedProperties configOverrides);
}

The method provisionSlaveNodes(int) will start or stop a number of slave nodes, according to how many slaves are already started.For instance, if 4 slaves are already running and provisionSlaveNodes(2) is invoked, then 2 slaves will be stopped. Inversely, if no slave is running, then 2 slave nodes will be started.

The method provisionSlaveNodes(int, TypedProperties) behaves differently, unless the second argument is null. Since the argument of type TypedProperties specifies overrides of the slaves configuration, this means that all already running slaves must be restarted to take these configuration changes into account. Thus, this method will first stop all running slaves, then start the specified number of slaves, after applying the configuration overrides.

When the interruptIfRunning flag is set to false, it will cause slave nodes to stop only once they are idle, that is they will not stop immediately if they are executing tasks at the time the provisioning request is made. This flag is true by default, in the provisionSlaveNodes() methods that do not specify it.

Therefore, provisionSlaveNodes(n), provisionSlaveNodes(n, null), provisionSlaveNodes(n, true) and provisiongSlaveNodes(n, true, null) all have the same effect.

The following example shows how to start new slave nodes with a single processing thread each:

// connect to the node's JMX server
JMXNodeConnectionWrapper jmxNode = new JMXNodeConnectionWrapper(host, port, false);
// create a provisioning proxy instance
String mbeanName = JPPFNodeProvisioningMBean.MBEAN_NAME;
JPPFNodeProvisioningMBean provisioning = jmxNode.getProxy(mbeanName, JPPFNodeProvisioningMBean.class);
// set the configuration with a single processing thread
TypedProperties overrides = new TypedProperties().setInt("jppf.processing.threads", 1);
// start 2 slaves with the config overrides
provisioning.provisionSlaveNodes(2, overrides);
// check the number of slave nodes
int nbSlaves = provisioning.getNbSlaves();
// or, using jmxNode directly get it as a JMX attribute
nbSlaves = (Integer) jmxNode.getAttribute(mbeanName, "NbSlaves");

4.2 Provisioning notifications

The node provisioning MBean also emits notfications when a slave node is started or stopped. These notifications are standard JMX Notification instances, with the following characteristics:

Note that getExitCode() always returns -1 for slave started notifications.

In the following example, we define a NotificationListener which receives provisioning notifications and prints out the relevant information for each slave node that is started or stopped:

public class MyProvisioningListener implements NotificationListener {
  @Override
  public void handleNotification(Notification notif, Object handback) {
    switch(notif.getType()) {
      // case when a slave node is started
      case JPPFNodeProvisioningMBean.SLAVE_STARTED_NOTIFICATION_TYPE:
        System.out.println("slave node started: " + notif.getUserData());
        break;

      // case when a slave node is stopped
      case JPPFNodeProvisioningMBean.SLAVE_STOPPED_NOTIFICATION_TYPE:
        System.out.println("slave node stopped: " + notif.getUserData());
        break;
    }
  }
}

The notification listener can then be used as follows:

try (JMXNodeConnectionWrapper nodeJmx = new JMXNodeConnectionWrapper("localhost", 12001)) {
  if (nodeJmx.connectAndWait(5000L)) {
    JPPFNodeProvisioningMBean provisioning = nodeJmx.getJPPFNodeProvisioningProxy();
    // register a provisioning notification listener
    provisioning.addNotificationListener(new MyProvisioningListener(), null, null);
    // start 2 slave nodes and wait for the output of the notification listener
    provisioning.provisionSlaveNodes(2);
  }
}

5 Accessing and using the node MBeans

JPPF provides an API that simplifies access to the JMX-based management features of a node, by abstracting most of the complexities of JMX programming. This API is represented by the class JMXNodeConnectionWrapper, which provides a simplified way of connecting to the node's MBean server, along with a set of convenience methods to easily access the MBeans' exposed methods and attributes.

5.1 Connecting to an MBean server

Connection to to a node MBean server is done in two steps:

a. Create an instance of JMXNodeConnectionWrapper

To connect to a local (same JVM, no network connection involved) MBean server, use the no-arg constructor:

JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper();

To connect to a remote MBean server, use the constructor specifying the management host, port and secure flag:

JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);

Here host and port represent the node's configuration properties "jppf.management.host" and "jppf.management.port", and secure is a boolean flag indicating whether network transport is secured via SSL/TLS.

b. Initiate the connection to the MBean server and wait until it is established

There are two ways to do this:

Synchronously:

// connect and wait for the connection to be established
// choose a reasonable value for the timeout, or 0 for no timeout
wrapper.connectAndWait(timeout);

Asynchronously:

// initiate the connection; this method returns immediately
wrapper.connect()

// ... do something else ...

// check if we are connected
if (wrapper.isConnected()) ...;
else ...;

5.2 Direct use of the JMX wrapper

JMXNodeConnectionWrapper implements directly the interface JPPFNodeAdminMBean. This means that all the methods of this interface can be used directly from the JMX wrapper. For example:

JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
wrapper.connectAndWait(timeout);

// get the number of tasks executed since the last reset
int nbTasks = wrapper.state().getNbTasksExecuted();
// stop the node
wrapper.shutdown();

5.3 Use of the JMX wrapper's invoke() method

JMXConnectionWrapper.invoke() is a generic method that allows invoking any exposed method of an MBean.

Here is an example:

JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
wrapper.connectAndWait(timeout);

// equivalent to JPPFNodeState state = wrapper.state();
JPPFNodeState state = (JPPFNodeState) wrapper.invoke(
  JPPFNodeAdminMBean.MBEAN_NAME, "state", (Object[]) null, (String[]) null);
int nbTasks = state.getNbTasksExecuted();
// get the total CPU time used
long cpuTime = (Long) wrapper.invoke(JPPFNodeTaskMonitorMBean.MBEAN_NAME,
  "getTotalTaskCpuTime", (Object[]) null, (String[]) null);

5.4 Use of an MBean proxy

A proxy is a dynamically created object that implements an interface specified at runtime.

The standard JMX API provides a way to create a proxy to a remote or local MBeans. This is done as follows:

JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
wrapper.connectAndWait(timeout);

// create the proxy instance
JPPFNodeTaskMonitorMBean proxy = wrapper.getProxy(
  JPPFNodeTaskMonitorMBean.MBEAN_NAME, JPPFNodeTaskMonitorMBean.class);

// get the total CPU time used
long cpuTime = proxy.getTotalTaskCpuTime();

5.5 Subscribing to MBean notifications

We have seen that the task monitoring MBean represented by the JPPFNodeTaskMonitorMBean interface is able to emit notifications of type TaskExecutionNotification. There are 2 ways to subscribe to these notifications:

a. Using a proxy to the MBean

JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
wrapper.connectAndWait(timeout);
JPPFNodeTaskMonitorMBean proxy = wrapper.getProxy(
  JPPFNodeTaskMonitorMBean.MBEAN_NAME, JPPFNodeTaskMonitorMBean.class);

// subscribe to all notifications from the MBean
proxy.addNotificationListener(myNotificationListener, null, null);

b. Using the MBeanServerConnection API

JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
wrapper.connectAndWait(timeout);
MBeanServerConnection mbsc = wrapper.getMbeanConnection();
ObjectName objectName = new ObjectName(JPPFNodeTaskMonitorMBean.MBEAN_NAME);

// subscribe to all notifications from the MBean
mbsc.addNotificationListener(objectName, myNotificationListener, null, null);

Here is an example notification listener implementing the NotificationListener interface:

// this class counts the number of tasks executed, along with
// the total cpu time and wall clock time used by the node
public class MyNotificationListener implements NotificationListener {
  AtomicInteger taskCount = new AtomicInteger(0);
  AtomicLong cpuTime = new AtomicLong(0L);
  AtomicLong elapsedTime = new AtomicLong(0L);

  // Handle an MBean notification
  public void handleNotification(Notification notification, Object handback) {
    TaskExecutionNotification jppfNotif = (TaskExecutionNotification) notification;
    TaskInformation info = jppfNotif.getTaskInformation();
    int n = taskCount.incrementAndGet();
    long cpu = cpuTime.addAndGet(info.getCpuTime());
    long elapsed = elapsedTime.addAndGet(info.getElapsedTime());
    // display the statistics for every 50 tasks executed
    if (n % 50 == 0) {
      System.out.println("nb tasks = " + n + ", cpu time = " + cpu
        + " ms, elapsed time = " + elapsed +" ms");
    }
  }
};

NotificationListener myNotificationListener = new MyNotificationListener();

6 Remote logging

It is possible to receive logging messages from a node as JMX notifications. Specific implementation are available for Log4j and JDK logging.

To configure Log4j for emitting JMX notifications, edit the log4j configuration files of the node and add the following:

### direct messages to the JMX Logger ###
log4j.appender.JMX = org.jppf.logging.log4j.JmxAppender
log4j.appender.JMX.layout = org.apache.log4j.PatternLayout
log4j.appender.JMX.layout.ConversionPattern = %d [%-5p][%c.%M(%L)]: %m\n

### set log levels - for more verbose logging change 'info' to 'debug' ###
log4j.rootLogger = INFO, JPPF, JMX

To configure the JDK logging to send JMX notifications, edit the JDK logging configuration file of the node and add the following:

# list of handlers
handlers = java.util.logging.FileHandler, org.jppf.logging.jdk.JmxHandler

# Write log messages as JMX notifications
org.jppf.logging.jdk.JmxHandler.level = FINEST
org.jppf.logging.jdk.JmxHandler.formatter = org.jppf.logging.jdk.JPPFLogFormatter


To receive the logging notifications from a remote application, you can use the following code:

// get a JMX connection to the node MBean server
JMXNodeConnectionWrapper jmxNode = new JMXNodeConnectionWrapper(host, port, secure);
jmxNode.connectAndWait(5000L);
// get a proxy to the MBean
JmxLogger nodeProxy = jmxNode.getProxy(JmxLogger.DEFAULT_MBEAN_NAME, JmxLogger.class);

// use a handback object so we know where the log messages come from
String source = "node   " + jmxNode.getHost() + ":" + jmxNode.getPort();
// subbscribe to all notifications from the MBean
NotificationListener listener = new MyLoggingHandler();
nodeProxy.addNotificationListener(listener, null, source);

// Logging notification listener that prints remote log messages
// to the console
public class MyLoggingHandler implements NotificationListener {
  // handle the logging notifications
  public void handleNotification(Notification notification, final Object handback) {
    String message = notification.getMessage();
    String toDisplay = handback.toString() + ": " + message;
    System.out.println(toDisplay);
  }
}
Main Page > Management and monitoring > Node management



JPPF Copyright © 2005-2019 JPPF.org Powered by MediaWiki