Dealing with jobs

From JPPF 6.2 Documentation



A job is a grouping of tasks with a common set of characteristics and a common SLA. These characteristics include:

  • common data shared between tasks (data provider)
  • a common Service Level Agreement (SLA) comprising:
    • the job priority
    • the maximum number of nodes a job can be executed on
    • an optional execution policy describing which nodes it can run on
    • a suspended indicator, which allows submitting a job in a suspended state, where it waits for an external command to resume or start its execution
    • an execution start date and time
    • an expiration (timeout) date and time
    • an indicator specifying what the server should do when the application is disconnected
  • a blocking/non-blocking indicator, specifying whether the job execution is synchronous or asynchronous from the application's point of view
  • a listener to receive notifications of completed tasks when running in non-blocking mode
  • the ability to receive notifications when the job execution starts and completes
  • a persistence manager, to store the job state during execution, and recover its latest saved state on demand, in particular after an application crash

In the JPPF API, a job is represented by the class JPPFJob. In addition to accessors and mutators for the attributes listed above, JPPFJob provides methods to add tasks and a set of constructors that make the creation of jobs easier.

1 Creating a job

To create a job, the JPPFJob class has a single no-arg constructor, which generates a universally unique identifier (UUID) for the job:

public class JPPFJob extends AbstractJPPFJob
                     implements Iterable<Task<?>>, Future<List<Task<?>>> {
  // creates a job with default values for its attributes
  public JPPFJob()

  // get the UUID of this job
  public String getUuid()
}

The job UUID is automatically generated as a pseudo-random string of hexadecimal characters in the standard 8-4-4-4-12 format. It can then be obtained with the job's getUuid() method.

Important note: the role of the job UUID is critical, since it is used to distinguish the job from potentially many others in all JPPF grid topologies. It is also used in most job management and monitoring operations.

Each job also has a name, which can be used to identify a job in a human-readable way. When a job is created, its name is set to the job UUID. It can later be changed or obtained with the following accessors:

public class JPPFJob extends AbstractJPPFJob
                     implements Iterable<Task<?>>, Future<List<Task<?>>> {
  // get the name of this job
  public String getName()

  // assign a name to this job
  public void setName(String name)
}

Note that the job's name is displayed in the "job data" view of the JPPF graphical administration console.

2 Adding tasks to a job

As we have seen in section 3.1 about the various forms of tasks that we can use in JPPF, JPPFJob provides the following methods to add tasks to a job.

2.1 Adding a standard JPPF task

public Task<?> add(Task<?> task) throws JPPFException

The return value is the task provided as a parameter. If the task has already been added to the job, then adding it a second time has no effect.

If the task is an instance of TaskNode (that is, if it has dependencies), then all the tasks in its graph of dependencies will also be added if they are not already present. This ensures that the job will not be submitted with an incomplete graph that would prevent its completion.
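The behavior described above can be pictured with a minimal sketch. It does not use the JPPF API: SketchTask and SketchJob are hypothetical stand-ins, and the traversal shown is only one plausible way to add a dependency graph exactly once, not the actual JPPFJob implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a task with dependencies; not the JPPF TaskNode class. */
final class SketchTask {
  final String id;
  final List<SketchTask> dependencies = new ArrayList<>();

  SketchTask(String id) { this.id = id; }

  /** Declare the tasks this task depends on and return this task for chaining. */
  SketchTask dependsOn(SketchTask... tasks) {
    Collections.addAll(dependencies, tasks);
    return this;
  }
}

/** Hypothetical stand-in for a job; it only models the add() behavior. */
final class SketchJob {
  // a set keeps each task once, so adding the same task twice has no effect
  private final Set<SketchTask> tasks = new LinkedHashSet<>();

  /** Add a task and, the first time it is seen, its whole graph of dependencies. */
  SketchTask add(SketchTask task) {
    if (tasks.add(task)) {
      for (SketchTask dependency : task.dependencies) add(dependency);
    }
    return task;
  }

  int taskCount() { return tasks.size(); }
}
```

Because the recursion only continues for tasks that were not yet in the set, a task shared by several dependents is added once and the traversal terminates.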

2.2 Adding an annotated, Runnable or Callable task

public Task<?> add(Object taskObject, Object...args) throws JPPFException

The taskObject parameter can be one of the following:

  • an instance of Task
  • an instance of a class with a non-static public method annotated with @JPPFRunnable
  • a Class object representing a class that has a public static method or a constructor annotated with @JPPFRunnable
  • an instance of a Runnable class
  • an instance of a Callable class

The args parameter is optional and is only used to pass the arguments of a method or constructor annotated with @JPPFRunnable. It is ignored for all other forms of tasks.

The return value is an instance of Task, regardless of the type of task that is added. In the case of an annotated, Runnable or Callable task, the original task object, wrapped by this Task, can be retrieved using the method Task.getTaskObject(), as in the following example:

Task<?> task = job.add(new MyRunnableTask());
MyRunnableTask runnableTask = (MyRunnableTask) task.getTaskObject();

Since JPPF uses reflection to wrap the task, an exception may be thrown during this step. If so, it is wrapped in a JPPFException.
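To show why the original object remains reachable through getTaskObject(), here is a minimal sketch of a wrapper for Runnable and Callable objects. WrapperSketch and its run(), getResult() and getThrowable() methods are hypothetical names chosen for this illustration. The wrapper classes JPPF actually uses are not shown in this section.

```java
import java.util.concurrent.Callable;

/** Hypothetical wrapper; not a JPPF class. */
final class WrapperSketch {
  private final Object taskObject;
  private Object result;
  private Throwable throwable;

  WrapperSketch(Object taskObject) {
    if (!(taskObject instanceof Runnable) && !(taskObject instanceof Callable)) {
      throw new IllegalArgumentException("expected a Runnable or a Callable");
    }
    this.taskObject = taskObject;
  }

  /** The object that was wrapped, returned as is so the caller can cast it back. */
  Object getTaskObject() { return taskObject; }

  /** Delegate to the wrapped object and record either its result or its failure. */
  void run() {
    try {
      if (taskObject instanceof Callable) result = ((Callable<?>) taskObject).call();
      else ((Runnable) taskObject).run();
    } catch (Exception e) {
      throwable = e;
    }
  }

  Object getResult() { return result; }

  Throwable getThrowable() { return throwable; }
}
```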

2.3 Adding a POJO task

public Task<?> add(String method, Object taskObject, Object...args)
  throws JPPFException

The method parameter is the name of the method or of the constructor to execute as the entry point of the task. In the case of a constructor, it must be the same as the name of the class.

The taskObject parameter can be one of the following:

  • an instance of the POJO class if the entry point is a non-static method
  • a Class object representing a POJO class that has a public static method or a constructor as entry point

The args parameter is optional and is used to pass the arguments of a method or constructor defined as the task's entry point.

As for the other form of this method, the return value is a Task, and the original task object can be retrieved using the method Task.getTaskObject(), as in the following example:

Task<?> task = job.add("myMethod", new MyPOJO(), 3, "string");
MyPOJO pojo = (MyPOJO) task.getTaskObject();
// we can also set a timeout on the wrapper
task.setTimeoutSchedule(new JPPFSchedule(5000L));

Since JPPF uses reflection to wrap the task, an exception may be thrown during this step. If so, it is wrapped in a JPPFException.
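The following sketch illustrates the general idea of a named entry point resolved by reflection, with any reflection failure wrapped in a single checked exception. It is not JPPF code: PojoInvokerSketch, MyPojoSketch and WrappingException (standing in for JPPFException) are hypothetical, the lookup by name and argument count is a simplification, and constructors as entry points are not handled.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical POJO used as a task object; not part of JPPF. */
final class MyPojoSketch {
  public String myMethod(int count, String text) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < count; i++) sb.append(text);
    return sb.toString();
  }

  public static int twice(int value) { return 2 * value; }
}

/** Hypothetical checked exception standing in for JPPFException. */
final class WrappingException extends Exception {
  WrappingException(String message, Throwable cause) { super(message, cause); }
}

final class PojoInvokerSketch {
  /**
   * Invoke the first public method with the given name and number of arguments.
   * If taskObject is a Class, only static methods of that class are considered.
   */
  static Object invoke(String methodName, Object taskObject, Object... args)
      throws WrappingException {
    boolean staticOnly = taskObject instanceof Class;
    Class<?> type = staticOnly ? (Class<?>) taskObject : taskObject.getClass();
    try {
      for (Method m : type.getMethods()) {
        if (!m.getName().equals(methodName) || m.getParameterCount() != args.length) continue;
        if (staticOnly && !Modifier.isStatic(m.getModifiers())) continue;
        return m.invoke(staticOnly ? null : taskObject, args);
      }
      throw new NoSuchMethodException(methodName);
    } catch (ReflectiveOperationException | IllegalArgumentException e) {
      // a single exception type reaches the caller, whatever went wrong
      throw new WrappingException("could not invoke " + methodName, e);
    }
  }
}
```

With these definitions, PojoInvokerSketch.invoke("myMethod", new MyPojoSketch(), 3, "ab") calls the instance method, while PojoInvokerSketch.invoke("twice", MyPojoSketch.class, 21) calls the static one.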

3 Inspecting the tasks of a job

JPPFJob provides two ways to get and inspect its tasks: one way is to call the method getJobTasks() to obtain the list of tasks, the other is to take advantage of JPPFJob implementing Iterable<Task<?>>.

For example, the following two ways to iterate over the tasks in a job are equivalent:

JPPFJob myJob = ...;
 
// get the list of tasks in the job and iterate over them
for (Task<?> task: myJob.getJobTasks()) {
  // do something ...
}

// iterate over the job directly
for (Task<?> task: myJob) {
  // do something ...
}

4 Job submission

Jobs are submitted with the JPPFClient API, which we will explore in more detail later in this manual. Job submission can be performed either synchronously, using the JPPFClient.submit(JPPFJob) method, or asynchronously, using the JPPFClient.submitAsync(JPPFJob) method.

JPPFClient client = ...;
// a new job is blocking by default
JPPFJob blockingJob = new JPPFJob();
blockingJob.add(new MyTask());
// blocks until the job has completed
List<Task<?>> results = client.submit(blockingJob);

JPPFJob nonBlockingJob = new JPPFJob();
nonBlockingJob.add(new MyTask());
// returns immediately, without blocking the current thread
client.submitAsync(nonBlockingJob);
// ... later on, collect the results
List<Task<?>> results2 = nonBlockingJob.awaitResults();

5 Job execution results

JPPFJob provides the following methods to explore and obtain the execution results of its tasks:

public class JPPFJob extends AbstractJPPFJob
                     implements Iterable<Task<?>>, Future<List<Task<?>>> {
  // Get the count of the tasks in this job that have completed
  public int executedTaskCount()
  // Get the count of the tasks in this job that haven't yet been executed
  public int unexecutedTaskCount()
  // Wait until all execution results of the tasks in this job have been collected
  public List<Task<?>> awaitResults()
  // Wait until all execution results of the tasks in this job have been collected
  // or the timeout expires, whichever happens first
  public List<Task<?>> awaitResults(final long timeout)
  // Get the list of currently available task execution results
  public List<Task<?>> getAllResults()
  // Get the execution status of this job
  public JobStatus getStatus()
  // determine whether this job was cancelled
  public boolean isCancelled()
  // determine whether this job has completed normally or was cancelled
  public boolean isDone()
  // wait until the job is done
  public List<Task<?>> get() throws InterruptedException, ExecutionException
  // wait until the job is done or the timeout expires, whichever happens first
  public List<Task<?>> get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException
}

Note that the awaitResults() methods will block until the job has completed, or the timeout expires if any is specified. If the timeout expires, an incomplete list of results will be returned. By contrast, getAllResults() will return immediately with a partial list of task execution results, possibly empty if no result was received yet.

The getStatus() method returns an indication of the job's completion status, as one of the values defined in the JobStatus enum:

public enum JobStatus {
  // The job was just submitted
  SUBMITTED,
  // The job is currently in the submission queue (on the client side)
  PENDING,
  // The job is being executed
  EXECUTING,
  // The job execution is complete
  COMPLETE,
  // The job execution has failed
  FAILED
}

A notable difference between the awaitResults(long) and get(long, TimeUnit) methods is that the get(...) method will throw a TimeoutException whenever the specified timeout expires before the job completes. Other than that, awaitResults(timeout) is equivalent to get(timeout, TimeUnit.MILLISECONDS).
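The difference in timeout handling can be sketched without the JPPF API. ResultsSketch below is a hypothetical result holder whose awaitResults(long) returns whatever has arrived when the timeout expires, while its get(long, TimeUnit) turns the same situation into a TimeoutException. The class, its resultReceived() method and the use of String results are assumptions made for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical result holder; it only models the timeout behavior. */
final class ResultsSketch {
  private final int expected;
  private final List<String> results = new ArrayList<>();

  ResultsSketch(int expected) { this.expected = expected; }

  /** Called when a task result arrives; wakes up any waiting thread. */
  synchronized void resultReceived(String result) {
    results.add(result);
    notifyAll();
  }

  /** On expiry, return the results received so far, possibly an incomplete list. */
  synchronized List<String> awaitResults(long timeoutMillis) throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (results.size() < expected) {
      long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
      if (remainingMillis <= 0) break;
      wait(remainingMillis);
    }
    return new ArrayList<>(results);
  }

  /** Same wait, but an incomplete list is reported as a TimeoutException. */
  List<String> get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
    List<String> list = awaitResults(unit.toMillis(timeout));
    if (list.size() < expected) throw new TimeoutException("job not complete");
    return list;
  }
}
```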

6 Cancelling a job

Cancelling a job can be performed with the cancel() and cancel(boolean mayInterruptIfRunning) methods of JPPFJob. The mayInterruptIfRunning flag specifies whether the job can be cancelled while it is being executed: if the flag is false and the job is executing, then it will not be cancelled and the cancel(...) method will return false.

Note that the return value of isCancelled() will reflect the cancelled state of the job, but only if it was cancelled within the scope of the JPPF client application: with the JPPFClient API, or JPPFJob.cancel(boolean), or as the result of setting an expiration schedule in the job's client SLA.
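The sketch below models the return value of cancel(boolean) on a hypothetical CancelSketch class that does not use the JPPF API. An executing job is only cancelled when mayInterruptIfRunning is true. Returning false for a job that is already done follows the general java.util.concurrent.Future contract and is an assumption here, as are all the names.

```java
/** Hypothetical job model; it only illustrates the outcome of cancel(boolean). */
final class CancelSketch {
  enum State { PENDING, EXECUTING, DONE }

  private State state = State.PENDING;
  private boolean cancelled;

  /** Simulate a state change that would normally come from the grid. */
  void setState(State state) { this.state = state; }

  boolean cancel(boolean mayInterruptIfRunning) {
    // assumption: nothing is left to cancel once the job is done
    if (state == State.DONE) return false;
    // an executing job is left alone unless interruption is allowed
    if (state == State.EXECUTING && !mayInterruptIfRunning) return false;
    cancelled = true;
    state = State.DONE;
    return true;
  }

  boolean isCancelled() { return cancelled; }

  /** True if the job completed normally or was cancelled. */
  boolean isDone() { return state == State.DONE; }
}
```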

Example usage:

try (JPPFClient client = new JPPFClient()) {
  // create and populate the job
  JPPFJob job = new JPPFJob();
  job.setName("myJob");
  job.add(new MyTask());

  // submit the job asynchronously
  client.submitAsync(job);
  
  ... 

  // cancel the job
  job.cancel(true);  

  // get and process the job results
  List<Task<?>> results = job.awaitResults();
  ...
}

JPPF Copyright © 2005-2020 JPPF.org