Receiving the status of tasks dispatched to or returned from the nodes

From JPPF 6.2 Documentation


1 Implementation

Each time a set of tasks is dispatched to, or returns from, a node, the driver emits a notification that encapsulates information about the set of tasks, along with detailed information on each task it contains. To receive these notifications, you must write a listener that implements the JobTasksListener interface, defined as:

public interface JobTasksListener extends EventListener {
  // Called when tasks from a job are dispatched to a node
  void tasksDispatched(JobTasksEvent event);
  // Called when tasks from a job return from the node
  void tasksReturned(JobTasksEvent event);
  // Called when task results are about to be sent back to the client
  void resultsReceived(JobTasksEvent event);
}

Note that the methods of JobTasksListener are called even if the client that submitted the job is disconnected, for any reason. Combined with the ability to process the task results, this enables a callback implemented on the server side, which can be used with a "submit and forget" strategy on the client side.


As we can see, all the methods receive events of type JobTasksEvent, defined as follows:

public class JobTasksEvent extends EventObject {
  // Get the uuid of the job to which the tasks belong
  public String getJobUuid()
  // Get the name of the job to which the tasks belong
  public String getJobName()
  // Get the list of tasks that were dispatched or returned
  public List<ServerTaskInformation> getTasks()
  // Get the job's server-side SLA
  public JobSLA getJobSLA()
  // Get the job metadata
  public JobMetadata getJobMetadata()
  // Get the reason why the set of tasks was returned by a node
  public JobReturnReason getReturnReason()
  // Get the information on the node where the tasks were dispatched or returned
  public JPPFManagementInfo getNodeInfo()
}

The method getReturnReason() provides a high-level indication of why the tasks were returned, among the possible reasons defined in the JobReturnReason enum:

public enum JobReturnReason {
  // The tasks were normally processed by the node
  RESULTS_RECEIVED,
  // The processing of the tasks took longer than the specified dispatch timeout
  DISPATCH_TIMEOUT,
  // An error occurred in the node which prevented the normal execution of the tasks
  NODE_PROCESSING_ERROR,
  // An error occurred in the driver while processing the results returned by the node
  DRIVER_PROCESSING_ERROR,
  // The connection between node and server was severed before results could be returned
  NODE_CHANNEL_ERROR
}

The job return reason is only available for tasksReturned() and resultsReceived() notifications. Therefore, for a tasksDispatched() notification, getReturnReason() always returns null. Also please note that for a resultsReceived() notification the return reason is always RESULTS_RECEIVED.
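
As a rough illustration of how a listener might act on the return reason, the sketch below maps a reason to a coarse outcome and treats null as "dispatched". To stay self-contained, it declares a local ReturnReason enum that mirrors the constants of JobReturnReason listed above. The Outcome enum and the classify() helper are hypothetical names, not part of JPPF. A real listener would pass event.getReturnReason() instead.

```java
final class ReturnReasonSketch {
  // Local stand-in mirroring the constants of JobReturnReason (assumption: for illustration only)
  enum ReturnReason {
    RESULTS_RECEIVED, DISPATCH_TIMEOUT, NODE_PROCESSING_ERROR, DRIVER_PROCESSING_ERROR, NODE_CHANNEL_ERROR
  }

  // Hypothetical coarse classification, not a JPPF type
  enum Outcome { DISPATCHED, COMPLETED, TIMED_OUT, FAILED }

  // Maps a return reason to an outcome; a null reason means the event is a dispatch notification
  static Outcome classify(ReturnReason reason) {
    if (reason == null) return Outcome.DISPATCHED;
    switch (reason) {
      case RESULTS_RECEIVED: return Outcome.COMPLETED;
      case DISPATCH_TIMEOUT: return Outcome.TIMED_OUT;
      default: return Outcome.FAILED; // the three error reasons
    }
  }

  public static void main(String[] args) {
    System.out.println(classify(null));
    for (ReturnReason reason : ReturnReason.values()) {
      System.out.println(reason + " -> " + classify(reason));
    }
  }
}
```

Handling null explicitly avoids a NullPointerException when the same helper is shared by tasksDispatched() and the two other callbacks.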


The method getTasks() returns a list of ServerTaskInformation objects, providing details on individual tasks:

public class ServerTaskInformation implements Serializable {
  // Get the position of this task within the job submitted by the client
  public int getJobPosition()
  // Get the throwable raised during the processing of the task
  public Throwable getThrowable()
  // Get the number of times a dispatch of the task has expired
  public int getExpirationCount()
  // Get the maximum number of times the task can be resubmitted
  public int getMaxResubmits()
  // Get the number of times the task was resubmitted
  public int getResubmitCount()
  // Get the task result as binary data
  public InputStream getResultAsStream() throws Exception
  // Get the task result as a JPPF Task object
  public Task<?> getResultAsTask() throws Exception
}

Important: When using the getResultAsTask() method, you must ensure that the code of the task and all its dependencies are in the driver's classpath. This is because this method actually deserializes the task from the binary format in which it is stored in the driver.
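
The per-task accessors can be combined into simple summaries. The following sketch counts the tasks that raised a throwable and checks whether a task still has resubmissions left. It is a minimal illustration under stated assumptions: TaskInfo is a hypothetical stand-in that exposes three of the ServerTaskInformation accessors so the example compiles without the JPPF jars, and reading "resubmit count lower than max resubmits" as "can still be resubmitted" is an interpretation of the accessor descriptions above, not a documented rule.

```java
import java.util.Arrays;
import java.util.List;

final class TaskSummarySketch {
  // Hypothetical stand-in exposing three of the ServerTaskInformation accessors
  interface TaskInfo {
    Throwable getThrowable();
    int getMaxResubmits();
    int getResubmitCount();
  }

  // Counts the tasks for which a throwable was recorded
  static long countFailed(List<? extends TaskInfo> tasks) {
    long failed = 0;
    for (TaskInfo task : tasks) {
      if (task.getThrowable() != null) failed++;
    }
    return failed;
  }

  // Assumption: a task can be resubmitted while its resubmit count is below the maximum
  static boolean canBeResubmitted(TaskInfo task) {
    return task.getResubmitCount() < task.getMaxResubmits();
  }

  // Builds a fixed-value TaskInfo for demonstration purposes
  static TaskInfo info(final Throwable throwable, final int maxResubmits, final int resubmitCount) {
    return new TaskInfo() {
      @Override public Throwable getThrowable() { return throwable; }
      @Override public int getMaxResubmits() { return maxResubmits; }
      @Override public int getResubmitCount() { return resubmitCount; }
    };
  }

  public static void main(String[] args) {
    List<TaskInfo> tasks = Arrays.asList(
      info(null, 1, 0),
      info(new IllegalStateException("boom"), 1, 1));
    System.out.println("failed tasks: " + countFailed(tasks));
    System.out.println("second task can be resubmitted: " + canBeResubmitted(tasks.get(1)));
  }
}
```

In an actual listener, the same loops would run over event.getTasks() and use ServerTaskInformation directly.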


Deserializing a task from a stream: in scenarios where, instead of deserializing the task directly, you wish to copy its binary form for later processing by a separate process, you will need a way to deserialize it that takes into account the configured serialization scheme and any composite serialization. This can be achieved with the JPPFSerialization.Factory API, as in this example:

InputStream taskInputStream = ...;
JPPFSerialization serialization = JPPFSerialization.Factory.getSerialization();
Task<?> task = (Task<?>) serialization.deserialize(taskInputStream);
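
The copying step itself needs only the standard library. The sketch below, which is an illustration rather than part of JPPF, writes the raw bytes of a stream to a file with java.nio.file.Files.copy(). The copyToFile() helper and the temporary file are hypothetical. In a listener, the stream would come from getResultAsStream(), and the separate process would later open the file and pass its stream to serialization.deserialize() as shown above.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class TaskBinaryCopySketch {
  // Copies the serialized form to the target file, closes the stream,
  // and returns the number of bytes written
  static long copyToFile(InputStream taskInputStream, Path target) throws IOException {
    try (InputStream in = taskInputStream) {
      return Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
    }
  }

  public static void main(String[] args) throws IOException {
    // A byte array stands in for the task's binary form
    InputStream fakeTaskStream = new ByteArrayInputStream(new byte[] {1, 2, 3});
    Path target = Files.createTempFile("task-", ".bin");
    long written = copyToFile(fakeTaskStream, target);
    System.out.println(written + " bytes copied to " + target);
    Files.delete(target);
  }
}
```

No deserialization happens during the copy, so the task classes do not need to be in the classpath of the code that performs it.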

The following code is an example listener which prints out its events to the console:

import org.jppf.job.JobTasksEvent;
import org.jppf.job.JobTasksListener;

public class MyJobTasksListener implements JobTasksListener {
  public MyJobTasksListener() {
    System.out.println("in MyJobTasksListener()"); // displayed at driver startup time
  }

  @Override
  public void tasksDispatched(JobTasksEvent event) {
    printEvent(event);
  }

  @Override
  public void tasksReturned(JobTasksEvent event) {
    printEvent(event);
  }

  @Override
  public void resultsReceived(JobTasksEvent event) {
    printEvent(event);
  }

  private void printEvent(JobTasksEvent event) {
    System.out.printf("listener event: name=%s, uuid=%s, reason=%s, node=%s, %d tasks%n",
      event.getJobName(), event.getJobUuid(), event.getReturnReason(),
      event.getNodeInfo(), event.getTasks().size());
  }
}

Here is an example of the output:

listener event: name=my job, uuid=job_uuid, reason=RESULTS_RECEIVED,
node=JPPFManagementInfo[192.168.1.24:12001, type=node|MASTER, local=false, secure=false,
uuid=node_uuid], 5 tasks

2 Deployment / Integration

There are two ways to integrate a JobTasksListener with the JPPF driver:

2.1 Using the JPPFDriver API

This can be done by calling the method getJobTasksListenerManager() on the JPPFDriver instance provided by another driver plugin, add-on or extension. For instance, as in this driver startup class:

public class MyDriverStartup implements JPPFDriverStartupSPI {
  JPPFDriver driver;

  @Override
  public void run() {
    // get the object which manages the job tasks listeners
    JobTasksListenerManager manager = this.driver.getJobTasksListenerManager();
    // register a new JobTasksListener
    manager.addJobTasksListener(new JobTasksListener() {
      @Override
      public void tasksDispatched(JobTasksEvent event) { ... }
      @Override
      public void tasksReturned(JobTasksEvent event) { ... }
      @Override
      public void resultsReceived(JobTasksEvent event) { ... }
    });
  }

  // JPPF will call this method before run()
  public void setDriver(JPPFDriver driver) {
    this.driver = driver;
  }
}

Important note: you will need to have the jppf-server-x.y.z.jar file in your build path for this code to compile.

2.2 Using the Service Provider Interface (SPI)

To register the JobTasksListener implementation as a standalone service, create a service definition file named "org.jppf.job.JobTasksListener" in the "META-INF/services" directory. In this file, add one line per listener implementation, containing the fully qualified name of the implementation class. For instance, if we defined two implementations MyJobTasksListener1 and MyJobTasksListener2 in the test1 and test2 packages, respectively, then the service definition file should contain:

test1.MyJobTasksListener1
test2.MyJobTasksListener2

Note that, to work with the SPI, each implementation must have a no-args constructor, whether implicit or explicit.
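
The constructor requirement can be checked ahead of deployment. The sketch below assumes the lookup follows the standard java.util.ServiceLoader convention, under which providers are instantiated through a public no-args constructor. The hasPublicNoArgsConstructor() helper and the two sample classes are hypothetical and serve only to illustrate the check.

```java
final class NoArgsConstructorCheck {
  // Returns true if the class declares (explicitly or implicitly) a public no-args constructor
  static boolean hasPublicNoArgsConstructor(Class<?> type) {
    try {
      type.getConstructor(); // looks up public constructors only
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  // Hypothetical listener-like class with an implicit public no-args constructor
  public static class WithImplicitConstructor { }

  // Hypothetical class that could not be instantiated by the service lookup
  public static class WithArgumentOnly {
    public WithArgumentOnly(String name) { }
  }

  public static void main(String[] args) {
    System.out.println(hasPublicNoArgsConstructor(WithImplicitConstructor.class));
    System.out.println(hasPublicNoArgsConstructor(WithArgumentOnly.class));
  }
}
```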



JPPF Copyright © 2005-2020 JPPF.org