JPPF

The open source
grid computing
solution

Node management and monitoring

From JPPF 6.2 Documentation


1 Node state

The management and monitoring of the state and life cycle of a JPPF node is performed with an MBean named “org.jppf:name=admin,type=node”, which is also the value of the constant JPPFNodeAdminMBean.MBEAN_NAME. This MBean exposes the JPPFNodeAdminMBean interface, which provides the functionalities described hereafter.

Reminder: JMXNodeConnectionWrapper also implements JPPFNodeAdminMBean.
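
Since the node admin MBean is registered under a regular JMX object name, it can in principle also be reached through the standard javax.management API. The following is a minimal sketch that uses only JDK classes: NodeAdminStubMBean is a hypothetical stand-in that mirrors two of the JPPFNodeAdminMBean methods documented below, and a local MBean server replaces the node's remote JMX server so that the example runs on its own. With JPPF on the classpath, the real interface and a remote MBeanServerConnection would take their place.

```java
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

class NodeAdminProxyDemo {
  // Object name given in this section for the node admin MBean
  static final String ADMIN_MBEAN_NAME = "org.jppf:name=admin,type=node";

  // Hypothetical stand-in for JPPFNodeAdminMBean, limited to two documented methods
  public interface NodeAdminStubMBean {
    void updateThreadPoolSize(Integer size) throws Exception;
    void resetTaskCounter() throws Exception;
  }

  // Hypothetical in-memory implementation, present only to make the sketch runnable
  public static class NodeAdminStub implements NodeAdminStubMBean {
    volatile int poolSize = 1;
    volatile int taskCounter = 42;
    @Override public void updateThreadPoolSize(Integer size) { poolSize = size; }
    @Override public void resetTaskCounter() { taskCounter = 0; }
  }

  // Build a typed proxy to the MBean registered under the admin object name
  static NodeAdminStubMBean adminProxy(MBeanServerConnection connection) throws Exception {
    ObjectName name = new ObjectName(ADMIN_MBEAN_NAME);
    return JMX.newMBeanProxy(connection, name, NodeAdminStubMBean.class);
  }

  // Register the stub, drive it through the proxy and return its final state
  static int[] demo() throws Exception {
    MBeanServer server = MBeanServerFactory.newMBeanServer();
    NodeAdminStub stub = new NodeAdminStub();
    server.registerMBean(stub, new ObjectName(ADMIN_MBEAN_NAME));
    NodeAdminStubMBean proxy = adminProxy(server);
    proxy.updateThreadPoolSize(4);
    proxy.resetTaskCounter();
    return new int[] { stub.poolSize, stub.taskCounter };
  }

  public static void main(String[] args) throws Exception {
    int[] state = demo();
    System.out.println("pool size = " + state[0] + ", task counter = " + state[1]);
  }
}
```

The proxy translates each interface call into a JMX invoke() on the named MBean, which is the same mechanism a typed client wrapper relies on.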

1.1 Getting a snapshot of the node's state

This is done by invoking the following method on the MBean:

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Get the latest state information from the node
  public JPPFNodeState state() throws Exception;
}

This method returns a JPPFNodeState object, which provides the following information on the node:

public class JPPFNodeState implements Serializable {
  // the status of the connection with the server
  public String getConnectionStatus()

  // the current tasks execution status
  public String getExecutionStatus()

  // the cpu time consumed by the node's execution threads
  public long getCpuTime()

  // the total number of tasks executed
  public int getNbTasksExecuted()

  // the current size of the pool of threads used for tasks execution
  public int getThreadPoolSize()

  // the current priority assigned to the execution threads
  public int getThreadPriority()
}

Here is an example that retrieves and prints information about a node's state:

try (JMXNodeConnectionWrapper node = new JMXNodeConnectionWrapper("mynode.org", 12001)) {
  if (node.connectAndWait(5000L)) {
    // retrieve the state from the remote node
    JPPFNodeState state = node.state();
    // print the execution status
    switch (state.getExecutionStatus()) {
      case EXECUTING:
        System.out.println("the node is currently executing tasks");
        break;

      case IDLE:
        System.out.println("the node is currently idle");
        break;
    }
    // print the total number of executed tasks
    System.out.println("the node has already executed " + state.getNbTasksExecuted() + " tasks");
  }
}

1.2 Shutting down and restarting the node

A node can be shut down or restarted with the following methods in JPPFNodeAdminMBean:

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Restart the node immediately
  public void restart() throws Exception;
  // Restart the node immediately if interruptIfRunning is true, otherwise once it is idle
  public void restart(Boolean interruptIfRunning) throws Exception;
  // Shut down the node immediately
  public void shutdown() throws Exception;
  // Shut down the node immediately if interruptIfRunning is true, otherwise once it is idle
  public void shutdown(Boolean interruptIfRunning) throws Exception;
  // Determine whether a deferred shutdown or restart was requested and not yet performed
  public NodePendingAction pendingAction();
  // Cancel a previous deferred shutdown or restart request, if any
  public void cancelPendingAction();
}

These methods should be used with caution. Please note that, once shutdown() has been invoked, it is no longer possible to restart the node remotely.

Calling restart() or shutdown() is equivalent to calling restart(true) or shutdown(true), respectively. When any of these methods is invoked without the interruptIfRunning flag, or when the flag's value is true, the tasks that were being executed, if any, are automatically resubmitted to the server queue.

When the interruptIfRunning parameter is false, the node will wait until no more tasks are being executed before restarting or shutting down. Thus, interruptIfRunning = false indicates a deferred operation request.

It is possible to query whether a deferred action has been requested with the pendingAction() method, which returns a NodePendingAction enum element, defined as follows:

public enum NodePendingAction {
  // There is no pending action
  NONE,
  // A deferred shutdown was requested
  SHUTDOWN,
  // A deferred restart was requested
  RESTART;
}

Finally, a deferred action can be cancelled with cancelPendingAction(), provided the action hasn't yet started.
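
The deferred-action workflow can also be exercised with generic JMX invoke() calls that refer to the operations by name. In the sketch below, the operation names and the Boolean parameter type are taken from the interface above; DeferredStub and StubPendingAction are hypothetical stand-ins for a real node, registered in a local MBean server so that the example runs with the JDK alone. Against a remote node, reading the returned enum would presumably require its class on the client classpath, which is why the helper only relies on its string form.

```java
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

class DeferredActionDemo {
  static final String ADMIN_MBEAN_NAME = "org.jppf:name=admin,type=node";
  static final String[] BOOLEAN_SIGNATURE = { Boolean.class.getName() };
  static final Object[] NO_PARAMS = new Object[0];
  static final String[] NO_SIGNATURE = new String[0];

  // Hypothetical mirror of NodePendingAction
  public enum StubPendingAction { NONE, SHUTDOWN, RESTART }

  // Hypothetical stand-in for the subset of JPPFNodeAdminMBean used here
  public interface DeferredStubMBean {
    void restart(Boolean interruptIfRunning) throws Exception;
    StubPendingAction pendingAction();
    void cancelPendingAction();
  }

  // Hypothetical node that is always busy, so a deferred request stays pending
  public static class DeferredStub implements DeferredStubMBean {
    private volatile StubPendingAction pending = StubPendingAction.NONE;
    @Override public void restart(Boolean interruptIfRunning) {
      // an immediate restart (true) leaves nothing pending in this stub
      if (!interruptIfRunning) pending = StubPendingAction.RESTART;
    }
    @Override public StubPendingAction pendingAction() { return pending; }
    @Override public void cancelPendingAction() { pending = StubPendingAction.NONE; }
  }

  // Request a restart that waits until the node is idle (interruptIfRunning = false)
  static void requestDeferredRestart(MBeanServerConnection c, ObjectName name) throws Exception {
    c.invoke(name, "restart", new Object[] { Boolean.FALSE }, BOOLEAN_SIGNATURE);
  }

  // Return the pending action as text, for instance "NONE" or "RESTART"
  static String pendingAction(MBeanServerConnection c, ObjectName name) throws Exception {
    return String.valueOf(c.invoke(name, "pendingAction", NO_PARAMS, NO_SIGNATURE));
  }

  // Cancel a deferred request that has not started yet
  static void cancelPendingAction(MBeanServerConnection c, ObjectName name) throws Exception {
    c.invoke(name, "cancelPendingAction", NO_PARAMS, NO_SIGNATURE);
  }

  // Pending action before the request, after the request, and after cancelling it
  static String[] demo() throws Exception {
    MBeanServer server = MBeanServerFactory.newMBeanServer();
    ObjectName name = new ObjectName(ADMIN_MBEAN_NAME);
    server.registerMBean(new DeferredStub(), name);
    String before = pendingAction(server, name);
    requestDeferredRestart(server, name);
    String during = pendingAction(server, name);
    cancelPendingAction(server, name);
    String after = pendingAction(server, name);
    return new String[] { before, during, after };
  }

  public static void main(String[] args) throws Exception {
    System.out.println(String.join(" -> ", demo()));
  }
}
```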

1.3 Updating the execution thread pool properties

Each JPPF node maintains a pool of threads for executing the tasks it receives. The size of this pool, along with the priority of the threads it contains, can be dynamically updated with the following methods:

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Set the size of the node's execution thread pool.
  public void updateThreadPoolSize(Integer size) throws Exception;
  // Update the priority of all execution threads.
  public void updateThreadsPriority(Integer newPriority) throws Exception;
}

Example usage:

JMXNodeConnectionWrapper node = ...;
JPPFNodeState state = node.state();
// if the thread pool has 2 or fewer threads, increase its size
if (state.getThreadPoolSize() <= 2)
  node.updateThreadPoolSize(4);

1.4 Updating the executed tasks counter

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Reset the node's executed tasks counter to zero.
  public void resetTaskCounter() throws Exception;
  // Reset the node's executed tasks counter to the specified value.
  public void setTaskCounter(Integer n) throws Exception;
}

Please note that resetTaskCounter() is equivalent to setTaskCounter(0).

1.5 Getting information about the node's host and JVM

public interface JPPFAdminMBean extends Serializable {
  // Get detailed information about the node's JVM properties, environment variables,
  // memory usage, available processors and available storage space.
  JPPFSystemInformation systemInformation() throws Exception;
}

This method returns an object of type JPPFSystemInformation, which is a snapshot of the environment of the JPPF node, the JVM and the host they run on. The properties defined in this object are also those used by execution policies, and are fully detailed in the execution policy properties reference.

JPPFSystemInformation provides information about different aspects of the environment:

public class JPPFSystemInformation implements Serializable {
  // get the system properties
  public TypedProperties getSystem()
  // get runtime information about JVM memory and available processors
  public TypedProperties getRuntime()
  // get the host environment variables
  public TypedProperties getEnv()
  // get IPV4 and IPV6 addresses assigned to the host
  public TypedProperties getNetwork()
  // get the JPPF configuration properties
  public TypedProperties getJppf()
  // get information on available disk storage
  public TypedProperties getStorage()
}

Each of the methods in JPPFSystemInformation returns a TypedProperties object, which is a subclass of the standard java.util.Properties that provides convenience methods to read property values as primitive types other than String.

The following example retrieves the number of processors available to the node and sets its thread pool size accordingly:

JMXNodeConnectionWrapper node = ...;
JPPFSystemInformation systemInfo = node.systemInformation();
int nbProcessors = systemInfo.getRuntime().getInt("availableProcessors");
node.updateThreadPoolSize(nbProcessors);

1.6 Cancelling a job

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Cancel the job with the specified uuid. The requeue parameter determines
  // whether the job should be requeued on the server side or not.
  public void cancelJob(String jobUuid, Boolean requeue) throws Exception;
}

This MBean method is used to cancel a job currently running in the node. The job is identified by its UUID. The requeue parameter is used to notify the server that the cancelled job should be requeued on the driver and executed again, possibly on another node. If requeue is false, the job execution in the node is simply terminated and any remaining tasks will not be executed.

This method should normally only be used by the JPPF driver, in the case where a user requested that the server terminate a job. Indeed, a job can contain several tasks, with each task potentially executed concurrently on a separate node. When the server receives a job termination request, it will handle the termination of “job dispatches” (i.e. subsets of the tasks in the job) by notifying each corresponding node.

1.7 Updating the node's configuration properties

public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Update the configuration properties of the node and optionally restart the node.
  void updateConfiguration(Map<String, String> config, Boolean restart) throws Exception;
  void updateConfiguration(Map<String, String> config, Boolean restart,
     Boolean interruptIfRunning) throws Exception;
}

These methods send a set of configuration properties to the node, which override those defined in the node's configuration file. The restart parameter allows the node to take the changes into account, especially when the server connection or discovery properties have been changed, for instance to force the node to connect to another server, or when you need to restart the node with a different JVM by specifying the jppf.java.path property.

Note that, when the restart parameter is true, the updateConfiguration() method will call restart() or restart(interruptIfRunning) after storing the configuration updates, depending on which method overload is invoked initially.

Finally, the interruptIfRunning parameter specifies whether the node should be restarted immediately, or wait until it no longer has any task to execute.

The following example updates the node's JVM options if its maximum heap size is less than 2 GB:

JMXNodeConnectionWrapper node = ...;
// 2 gigabytes
long _2GB = 2L * 1024L * 1024L * 1024L;
JPPFSystemInformation systemInfo = node.systemInformation();
// retrieve the max heap size
long heapSize = systemInfo.getRuntime().getLong("maxMemory");
// if the heap size is less than 2 GB, restart the node with new JVM options
if (heapSize < _2GB) {
  TypedProperties configUpdate = new TypedProperties()
    .set(JPPFProperties.JVM_OPTIONS, "-Xmx2g -Xms512m -Dconfig.updated=true");
  // restart the node with the configuration updates,
  // but wait until all tasks currently executing have completed
  node.updateConfiguration(configUpdate, true, false);
}
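
The updateConfiguration() declarations above take a Map<String, String>, whereas configuration values are often held in a java.util.Properties object or one of its subclasses. Assuming the Map<String, String> form applies to your JPPF version, the JDK-only sketch below shows one possible conversion. The class and method names are illustrative, the property names are taken from this page, and the values are placeholders.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class ConfigMapDemo {
  // Copy every string-valued property into a Map<String, String>, sorted by key
  static Map<String, String> toStringMap(Properties props) {
    Map<String, String> map = new TreeMap<>();
    for (String key : props.stringPropertyNames()) {
      map.put(key, props.getProperty(key));
    }
    return map;
  }

  public static void main(String[] args) {
    Properties update = new Properties();
    // property names come from this page, the values are placeholders
    update.setProperty("jppf.processing.threads", "4");
    update.setProperty("jppf.java.path", "/opt/java/bin/java");
    Map<String, String> config = toStringMap(update);
    // with a connected node, this map could then be passed to updateConfiguration()
    System.out.println(config);
  }
}
```

Note that stringPropertyNames() skips entries whose key or value is not a String, so non-string values have to be converted before the copy.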

2 Tasks execution management and monitoring

The management and monitoring of the execution of the tasks in a node is performed with an MBean named “org.jppf:name=task.monitor,type=node”, which is also the value of the JPPFNodeTaskMonitorMBean.MBEAN_NAME constant. This MBean monitors the task activity within a node. It exposes the interface JPPFNodeTaskMonitorMBean and also emits JMX notifications of type TaskExecutionNotification.

2.1 Snapshot of the tasks activity

The interface JPPFNodeTaskMonitorMBean provides access to aggregated statistics on the tasks executed in a node:

public interface JPPFNodeTaskMonitorMBean extends NotificationEmitter {
  // The total number of tasks executed by the node
  Integer getTotalTasksExecuted();
  // The total number of tasks that ended in error
  Integer getTotalTasksInError();
  // The total number of tasks that executed successfully
  Integer getTotalTasksSucessfull();
  // The total cpu time used by the tasks in milliseconds
  Long getTotalTaskCpuTime();
  // The total elapsed time used by the tasks in milliseconds
  Long getTotalTaskElapsedTime();
}
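
Under the usual JMX convention for getters, these statistics should also be readable as MBean attributes named after the getters without their get prefix, for instance TotalTasksExecuted. That mapping is an assumption based on standard MBean rules. The sketch below uses only JDK classes: the hypothetical TaskMonitorStub, registered in a local MBean server, replaces a real node and returns fixed values, from which the average elapsed time per task is derived.

```java
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

class TaskStatsDemo {
  // Object name given in this section for the task monitor MBean
  static final String TASK_MONITOR_NAME = "org.jppf:name=task.monitor,type=node";

  // Hypothetical stand-in exposing two of the getters listed above
  public interface TaskMonitorStubMBean {
    Integer getTotalTasksExecuted();
    Long getTotalTaskElapsedTime();
  }

  // Hypothetical implementation with fixed values: 8 tasks, 2000 ms in total
  public static class TaskMonitorStub implements TaskMonitorStubMBean {
    @Override public Integer getTotalTasksExecuted() { return 8; }
    @Override public Long getTotalTaskElapsedTime() { return 2000L; }
  }

  // Average elapsed time per task in milliseconds, or 0 when no task has run yet
  static double averageElapsedMillis(MBeanServerConnection c, ObjectName name) throws Exception {
    int executed = ((Number) c.getAttribute(name, "TotalTasksExecuted")).intValue();
    long elapsed = ((Number) c.getAttribute(name, "TotalTaskElapsedTime")).longValue();
    return executed == 0 ? 0d : (double) elapsed / executed;
  }

  // Register the stub locally and compute the average from its attributes
  static double demo() throws Exception {
    MBeanServer server = MBeanServerFactory.newMBeanServer();
    ObjectName name = new ObjectName(TASK_MONITOR_NAME);
    server.registerMBean(new TaskMonitorStub(), name);
    return averageElapsedMillis(server, name);
  }

  public static void main(String[] args) throws Exception {
    System.out.println("average elapsed time per task: " + demo() + " ms");
  }
}
```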

2.2 Notification of tasks execution

JPPFNodeTaskMonitorMBean emits JMX notifications of type TaskExecutionNotification in two situations:

  • each time a task completes its execution in a node, the task monitor MBean automatically notifies listeners of the task completion;
  • any number of user notifications can also be emitted from a task, by invoking the Task.fireNotification(Object, boolean) method.

TaskExecutionNotification is defined as follows:

public class TaskExecutionNotification extends Notification {
  // Get the object encapsulating information about the task
  public TaskInformation getTaskInformation();
  // Whether this is a user-defined notification sent from a task
  public boolean isUserNotification();
}

This notification encapsulates an object of type TaskInformation, which provides the following information about each executed task:

public class TaskInformation implements Serializable {
  // Get the task id
  public String getId()
  // Get the uuid of the job this task belongs to
  public String getJobId()
  // Get the cpu time used by the task
  public long getCpuTime()
  // Get the wall clock time used by the task
  public long getElapsedTime()
  // Determines whether the task had an exception
  public boolean hasError()
  // Get the timestamp for the task completion. Caution: this value is related
  // to the node's system time, not to the time of the notification receiver
  public long getTimestamp()
  // Get the position of the task in the job to which it belongs
  public int getJobPosition()
}

TaskExecutionNotification also inherits the method getUserData(), which returns the object specified by the user code when calling Task.fireNotification(Object, boolean) with the second parameter set to true.

Additionally, the method isUserNotification() allows you to unambiguously distinguish between user-defined notifications, sent via Task.fireNotification(Object, boolean), and task completion notifications automatically sent by the JPPF nodes.

For example, consider this task, which emits a JMX notification as soon as it starts executing:

public static class MyTask extends AbstractTask<String> {
  @Override
  public void run() {
    fireNotification("starting MyTask", true);
    setResult("Hello, world");
  }
}

The following code registers a notification listener that will receive and process the notifications emitted from the task:

JMXNodeConnectionWrapper node = ...;
// get a proxy to the remote JPPFNodeTaskMonitorMBean
JPPFNodeTaskMonitorMBean taskMonitor = node.getNodeTaskMonitor();
// define and register a task notification listener
NotificationListener listener = (notification, handback) -> {
  TaskExecutionNotification taskNotif = (TaskExecutionNotification) notification;
  // print a message based on whether the notification is sent by the user or by JPPF
  if (taskNotif.isUserNotification()) {
    System.out.println("task started: " + taskNotif.getUserData());
  } else {
    System.out.println("task completed: " + taskNotif.getTaskInformation().toString());
  }
};
taskMonitor.addNotificationListener(listener, null, null);

3 Node maintenance

The node maintenance MBean is named “org.jppf:name=node.maintenance,type=node”, which is also the value of the constant JPPFNodeMaintenanceMBean.MBEAN_NAME.

This MBean provides operations for the maintenance of a node. It exposes the interface JPPFNodeMaintenanceMBean defined as follows:

public interface JPPFNodeMaintenanceMBean extends Serializable {
  // object name for this MBean
  String MBEAN_NAME = "org.jppf:name=node.maintenance,type=node";
 
  // request a reset of the resource caches of all the JPPF class loaders
  // maintained by the node
  void requestResourceCacheReset() throws Exception;
}

Please note that requestResourceCacheReset() does not perform the reset immediately. It sets an internal flag, and the reset will take place when it is safe to do so, as part of the node's life cycle. The outcome of the reset operation is that the temporary files created by the JPPF class loaders will be deleted, freeing space in the temporary files folder.

4 Node provisioning

Any JPPF node has the ability to start new nodes on the same physical or virtual machine, and to stop and monitor these nodes afterwards. This provides a node provisioning facility, which makes it possible to dynamically grow or shrink a JPPF grid based on the workload requirements.

This provisioning ability establishes a master/slave relationship between a standard node (master) and the nodes that it starts (slaves). Please note that a slave node cannot in turn be used as a master. Apart from this restriction, slave nodes can be managed and monitored like any other node.

See also: the documentation section dedicated to node provisioning.

4.1 Node provisioning MBean

Node provisioning is implemented with a dedicated MBean, which exposes the JPPFNodeProvisioningMBean interface. The MBean is named “org.jppf:name=provisioning,type=node”, which is also the value of the constant JPPFNodeProvisioningMBean.MBEAN_NAME.

JPPFNodeProvisioningMBean is defined as follows:

public interface JPPFNodeProvisioningMBean extends Serializable, NotificationEmitter {
  // The object name of this MBean
  String MBEAN_NAME = "org.jppf:name=provisioning,type=node";
  // Constant for notifications that a slave node has started
  String SLAVE_STARTED_NOTIFICATION_TYPE = "slave_started";
  // Constant for notifications that a slave node has stopped
  String SLAVE_STOPPED_NOTIFICATION_TYPE = "slave_stopped";

  // Get the number of slave nodes started by this MBean
  int getNbSlaves();
 
  // Start or stop the required number of slaves to reach the specified number,
  // with an interrupt flag set to true
  void provisionSlaveNodes(int nbNodes);
  // Same action, explicitly specifying the interrupt flag
  void provisionSlaveNodes(int nbNodes, boolean interruptIfRunning);
  // Start or stop the required number of slaves to reach the specified number,
  // using the specified configuration overrides and an interrupt flag set to true
  void provisionSlaveNodes(int nbNodes, TypedProperties configOverrides);
  // Same action, explicitly specifying the 'interrupt if running' flag
  void provisionSlaveNodes(int nbNodes, boolean interrupt, TypedProperties configOverrides);
}

The method provisionSlaveNodes(int) will start or stop a number of slave nodes, according to how many slaves are already started. For instance, if 4 slaves are already running and provisionSlaveNodes(2) is invoked, then 2 slaves will be stopped. Conversely, if no slave is running, then 2 slave nodes will be started.

The method provisionSlaveNodes(int, TypedProperties) behaves differently, unless the second argument is null. Since the argument of type TypedProperties specifies overrides of the slaves configuration, this means that all already running slaves must be restarted to take these configuration changes into account. Thus, this method will first stop all running slaves, then start the specified number of slaves, after applying the configuration overrides.

When the interruptIfRunning flag is set to false, slave nodes stop only once they are idle, that is, they will not stop immediately if they are executing tasks at the time the provisioning request is made. This flag is true by default in the provisionSlaveNodes() methods that do not specify it. Therefore, the provisionSlaveNodes(n), provisionSlaveNodes(n, null), provisionSlaveNodes(n, true) and provisionSlaveNodes(n, true, null) methods all have the same effect.
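
The start and stop rules described above can be summarized as a small calculation. The following is a toy model written for illustration only, not JPPF code: SlavePlan and plan() are hypothetical names, and the model ignores the timing effects of the interruptIfRunning flag.

```java
class ProvisioningPlanDemo {
  // Number of slaves to stop and to start for one provisioning request
  static final class SlavePlan {
    final int toStop;
    final int toStart;

    SlavePlan(int toStop, int toStart) {
      this.toStop = toStop;
      this.toStart = toStart;
    }

    @Override public String toString() {
      return "stop " + toStop + ", start " + toStart;
    }
  }

  // Compute what a request for 'requested' slaves implies when 'running' slaves exist
  static SlavePlan plan(int running, int requested, boolean hasConfigOverrides) {
    if (running < 0 || requested < 0) {
      throw new IllegalArgumentException("counts must not be negative");
    }
    // with non-null overrides, all running slaves are stopped, then the requested number is started
    if (hasConfigOverrides) return new SlavePlan(running, requested);
    // without overrides, only the difference is started or stopped
    int delta = requested - running;
    return delta >= 0 ? new SlavePlan(0, delta) : new SlavePlan(-delta, 0);
  }

  public static void main(String[] args) {
    System.out.println(plan(4, 2, false)); // 4 running, 2 requested, no overrides
    System.out.println(plan(0, 2, false)); // none running, 2 requested, no overrides
    System.out.println(plan(4, 2, true));  // 4 running, 2 requested, with overrides
  }
}
```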

The following example shows how to start new slave nodes with a single processing thread each:

// connect to the node's JMX server
JMXNodeConnectionWrapper node = new JMXNodeConnectionWrapper(host, port, false);
// create a provisioning proxy instance
JPPFNodeProvisioningMBean provisioner = node.getNodeProvisioner();
// set the configuration with a single task processing thread
// and start 2 slaves with this configuration override
provisioner.provisionSlaveNodes(2, new TypedProperties().setInt("jppf.processing.threads", 1));
// check the number of slave nodes
int nbSlaves = provisioner.getNbSlaves();
// or, using the connection wrapper directly, get it as a JMX attribute
nbSlaves = (Integer) node.getAttribute(JPPFNodeProvisioningMBean.MBEAN_NAME, "NbSlaves");

4.2 Provisioning notifications

The node provisioning MBean also emits notifications when a slave node is started or stopped. These notifications are standard JMX Notification instances, with the following characteristics:

- the notification type, obtained with Notification.getType(), is one of these constants in JPPFNodeProvisioningMBean: SLAVE_STARTED_NOTIFICATION_TYPE or SLAVE_STOPPED_NOTIFICATION_TYPE

- the notification's user data, obtained with Notification.getUserData(), is a JPPFProvisioningInfo object, defined as:

public class JPPFProvisioningInfo implements Serializable {
  // Get the uuid of the master node that launched the slave
  public String getMasterUuid()
  // Get the id of the slave, relevant only to the master
  public int getSlaveId()
  // Get the slave node process exit code
  public int getExitCode()
  // Get the command line used to start the slave node process
  public List<String> getLaunchCommand()
}

Note that getExitCode() always returns -1 for slave started notifications.

In the following example, we define a NotificationListener which receives provisioning notifications and prints out the relevant information for each slave node that is started or stopped:

public class MyProvisioningListener implements NotificationListener {
  @Override
  public void handleNotification(Notification notif, Object handback) {
    switch(notif.getType()) {
      // case when a slave node is started
      case JPPFNodeProvisioningMBean.SLAVE_STARTED_NOTIFICATION_TYPE:
        System.out.println("slave node started: " + notif.getUserData());
        break;

      // case when a slave node is stopped
      case JPPFNodeProvisioningMBean.SLAVE_STOPPED_NOTIFICATION_TYPE:
        System.out.println("slave node stopped: " + notif.getUserData());
        break;
    }
  }
}

The notification listener can then be used as follows:

try (JMXNodeConnectionWrapper nodeJmx = new JMXNodeConnectionWrapper("localhost", 12001)) {
  if (nodeJmx.connectAndWait(5000L)) {
    JPPFNodeProvisioningMBean provisioning = nodeJmx.getJPPFNodeProvisioningProxy();
    // register a provisioning notification listener
    provisioning.addNotificationListener(new MyProvisioningListener(), null, null);
    // start 2 slave nodes and wait for the output of the notification listener
    provisioning.provisionSlaveNodes(2);
  }
}
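
When only one kind of event matters, the selection can be delegated to JMX rather than coded inside the listener. The sketch below uses the standard NotificationFilterSupport class with the slave_started type string shown in the interface above. A plain NotificationBroadcasterSupport stands in for the provisioning MBean so that the example runs without a node, and the notification payloads are placeholder strings rather than JPPFProvisioningInfo objects.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;

class StartedOnlyFilterDemo {
  // Values of the SLAVE_*_NOTIFICATION_TYPE constants documented above
  static final String SLAVE_STARTED = "slave_started";
  static final String SLAVE_STOPPED = "slave_stopped";

  // Build a notification with a placeholder source and the given user data
  static Notification notification(String type, long sequence, Object userData) {
    Notification notif = new Notification(type, "stand-in emitter", sequence);
    notif.setUserData(userData);
    return notif;
  }

  // Emit three notifications and return the user data of those that passed the filter
  static List<Object> demo() {
    List<Object> received = new CopyOnWriteArrayList<>();
    NotificationListener listener = (notif, handback) -> received.add(notif.getUserData());

    // only let notifications whose type starts with "slave_started" through
    NotificationFilterSupport filter = new NotificationFilterSupport();
    filter.enableType(SLAVE_STARTED);

    // stand-in for the provisioning MBean, delivers notifications in the calling thread
    NotificationBroadcasterSupport emitter = new NotificationBroadcasterSupport();
    emitter.addNotificationListener(listener, filter, null);
    emitter.sendNotification(notification(SLAVE_STARTED, 1L, "slave 1"));
    emitter.sendNotification(notification(SLAVE_STOPPED, 2L, "slave 2"));
    emitter.sendNotification(notification(SLAVE_STARTED, 3L, "slave 3"));
    return received;
  }

  public static void main(String[] args) {
    System.out.println("started slaves seen by the listener: " + demo());
  }
}
```

The same filter object could be passed as the second argument of addNotificationListener() in the listing above, in place of null.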

JPPF Copyright © 2005-2020 JPPF.org