
JPPF Executor Services

From JPPF 6.2 Documentation



1 Basic usage

JPPF 2.2 introduced an API that serves as an ExecutorService facade to the JPPF client API. This API consists of a single class, JPPFExecutorService, which implements the interface java.util.concurrent.ExecutorService. A JPPFExecutorService is obtained via its constructor, to which a JPPFClient must be passed:

JPPFClient jppfClient = new JPPFClient();
ExecutorService executor = new JPPFExecutorService(jppfClient);

The behavior of the resulting executor will depend largely on the configuration of the JPPFClient and on which ExecutorService method you invoke to submit tasks. In effect, each time you invoke an invokeAll(...), invokeAny(...), submit(...) or execute(...) method of the executor, a new JPPFJob will be created and sent for execution on the grid. This means that, if the executor method you invoke only takes a single task, then a job with only one task will be sent to the JPPF server.

Here is an example use:

JPPFClient jppfClient = new JPPFClient();
ExecutorService executor = new JPPFExecutorService(jppfClient);

try {
  // submit a single task
  Runnable myTask = new MyRunnable(0);
  Future<?> future = executor.submit(myTask);
  // wait for the results
  future.get();
  // process the results
  ...

  // submit a list of tasks: invokeAll() only accepts Callable tasks
  List<MyCallable> myTaskList = new ArrayList<>();
  for (int i=0; i<10; i++) myTaskList.add(new MyCallable(i));
  List<Future<String>> futureList = executor.invokeAll(myTaskList);
  // wait for the results
  for (Future<String> f: futureList) f.get();
  // process the results for the list of tasks
  ...
} finally {
  // clean up after use
  executor.shutdown();
  jppfClient.close();
}

// !!! it is important that this task is Serializable !!!
public static class MyRunnable implements Runnable, Serializable {
  private int id = 0;

  public MyRunnable(int id) {
    this.id = id;
  }

  public void run() {
    System.out.println("Running task id " + id);
  }
}

// a Callable variant of the task, also Serializable, for use with invokeAll()
public static class MyCallable implements Callable<String>, Serializable {
  private int id = 0;

  public MyCallable(int id) {
    this.id = id;
  }

  public String call() {
    return "Result of task id " + id;
  }
}
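Because tasks are sent to remote nodes, a task that fails to serialize cannot be executed on the grid. The following is a minimal sketch of a local check, assuming the default Java serialization is in use. SerializationCheck and roundTrip() are hypothetical names that are not part of the JPPF API; only JDK classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper, not part of JPPF: checks that a task survives
// a round trip through standard Java serialization.
final class SerializationCheck {
  // Same shape as the MyRunnable task of the example above.
  static class MyRunnable implements Runnable, Serializable {
    private static final long serialVersionUID = 1L;
    private final int id;

    MyRunnable(int id) { this.id = id; }

    int getId() { return id; }

    @Override
    public void run() { System.out.println("Running task id " + id); }
  }

  // Serialize the object to a byte array, then read a copy back from it.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T object) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(object);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      // a NotSerializableException ends up here
      throw new IllegalStateException("object is not serializable", e);
    }
  }

  public static void main(String[] args) {
    MyRunnable copy = roundTrip(new MyRunnable(42));
    copy.run(); // the copy carries the same id as the original
  }
}
```

Running such a check in a unit test can catch a non-serializable field before the task is ever submitted to an executor.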

2 Batch modes

The executor's behavior can be modified by using one of the batch modes of the JPPFExecutorService. By batch mode, we mean the ability to group tasks into batches, in several different ways. This enables tasks to be sent together in the same job, even if they are submitted individually, and allows them to benefit from the parallel features inherent to JPPF. It can also significantly improve the throughput of individual tasks sent via an executor service.


Using a batch size: specifying a batch size via the method JPPFExecutorService.setBatchSize(int limit) causes the executor to send tasks only when at least that number of tasks have been submitted. When using this mode, be careful about how many tasks you send via the executor: if you send fewer than the batch limit, these tasks will remain pending and unexecuted. Sometimes the executor will send more than the specified number of tasks in the same batch: this happens when one of the JPPFExecutorService.invokeXXX() methods is called with n tasks, such that current batch size + n > limit. In that case, all the tasks included in the invokeXXX() call are sent together.

Here is an example:

JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
// the executor will send jobs with at least 5 tasks each
executor.setBatchSize(5);
List<Future<?>> futures = new ArrayList<Future<?>>();
// we submit 10 = 2 * 5 tasks, this will cause the client to send 2 jobs
for (int i=0; i<10; i++) futures.add(executor.submit(new MyTask(i)));
for (Future<?> f: futures) f.get();


Using a batch timeout: this is done via the method JPPFExecutorService.setBatchTimeout(long timeout) and causes the executor to send the tasks at regular intervals, specified as the timeout. The timeout value is expressed in milliseconds. Once the timeout has expired, the counter is reset to zero. If no task has been submitted between two timeout expirations, then nothing happens.

Example:

JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
// the executor will send a job every second (if any task is submitted)
executor.setBatchTimeout(1000L);
List<Future<?>> futures = new ArrayList<Future<?>>();
// we submit 5 tasks
for (int i=0; i<5; i++) futures.add(executor.submit(new MyTask(i)));
// we wait 1.5 seconds, during that time a job with 5 tasks will be submitted
Thread.sleep(1500L);
// we submit 6 more tasks, they will be sent in a different job
for (int i=5; i<11; i++) futures.add(executor.submit(new MyTask(i)));
// here we get the results for tasks sent in 2 different jobs!
for (Future<?> f: futures) f.get();


Using both batch size and timeout: it is possible to use a combination of batch size and timeout. In this case, a job will be sent whenever the batch limit is reached or the timeout expires, whichever happens first. In any case, the timeout counter will be reset each time a job is sent. Using a timeout is also an efficient way to deal with the possible blocking behavior of the batch size mode: just use a timeout that is sufficiently large for your needs.

Example:

JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
executor.setBatchTimeout(1000L);
executor.setBatchSize(5);
List<Future<?>> futures = new ArrayList<Future<?>>();
// we submit 3 tasks
for (int i=0; i<3; i++) futures.add(executor.submit(new MyTask(i)));
// we wait 1.5 seconds, during that time a job with 3 tasks will be submitted,
// even though the batch size is set to 5
Thread.sleep(1500L);
for (Future<?> f: futures) f.get();
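The three batch modes can be summarized with a toy model. The sketch below is not JPPF code: BatchModeSketch and its methods are hypothetical, it only counts tasks instead of running them, and it replaces the real timer with an explicit advance() call so that the outcome is deterministic. It also assumes that tasks already pending are sent in the same job as the tasks of an invokeXXX() call.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the batching rules (hypothetical, not JPPF code). It only counts
// tasks and records the size of each job that would be sent to the grid.
final class BatchModeSketch {
  private final int batchSize;      // 0 means no batch size
  private final long batchTimeout;  // in milliseconds, 0 means no timeout
  private long elapsed = 0L;        // time since the timeout counter was reset
  private int pending = 0;          // tasks submitted but not yet sent
  private final List<Integer> sentJobs = new ArrayList<>();

  BatchModeSketch(int batchSize, long batchTimeout) {
    this.batchSize = batchSize;
    this.batchTimeout = batchTimeout;
  }

  // Models submit(...) or execute(...): a single task joins the current batch.
  void submit() { add(1); }

  // Models invokeAll(...) or invokeAny(...): the n tasks are never split.
  void invoke(int n) { add(n); }

  private void add(int n) {
    pending += n;
    boolean noBatching = batchSize <= 0 && batchTimeout <= 0;
    if (noBatching || (batchSize > 0 && pending >= batchSize)) send();
  }

  // Models the passage of time, in place of a real timer.
  void advance(long millis) {
    if (batchTimeout <= 0) return;
    long remaining = millis;
    while (elapsed + remaining >= batchTimeout) {
      remaining -= batchTimeout - elapsed;
      send(); // the timeout expired: send whatever is pending, if anything
    }
    elapsed += remaining;
  }

  // Sends the pending tasks as one job and resets the timeout counter.
  private void send() {
    if (pending > 0) sentJobs.add(pending);
    pending = 0;
    elapsed = 0L;
  }

  int pendingTasks() { return pending; }

  List<Integer> sentJobs() { return new ArrayList<>(sentJobs); }

  public static void main(String[] args) {
    // batch size only: 10 tasks with a limit of 5
    BatchModeSketch bySize = new BatchModeSketch(5, 0L);
    for (int i = 0; i < 10; i++) bySize.submit();
    System.out.println(bySize.sentJobs()); // prints [5, 5]

    // timeout only: 5 tasks, 1.5 s pause, 6 more tasks, 1 s pause
    BatchModeSketch byTimeout = new BatchModeSketch(0, 1000L);
    for (int i = 0; i < 5; i++) byTimeout.submit();
    byTimeout.advance(1500L);
    for (int i = 0; i < 6; i++) byTimeout.submit();
    byTimeout.advance(1000L);
    System.out.println(byTimeout.sentJobs()); // prints [5, 6]

    // both: 3 tasks never reach the limit of 5, the timeout sends them anyway
    BatchModeSketch both = new BatchModeSketch(5, 1000L);
    for (int i = 0; i < 3; i++) both.submit();
    both.advance(1500L);
    System.out.println(both.sentJobs()); // prints [3]
  }
}
```

The model also shows the blocking risk of the batch size mode: with a limit of 5 and no timeout, 3 submitted tasks stay pending until more tasks arrive.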

3 Configuring jobs and tasks

JPPFExecutorService has a limitation: if you use only the ExecutorService interface which it implements, it does not provide a way to use JPPF-specific features, such as job SLA, metadata or persistence, or task timeout, onTimeout() and onCancel().

To overcome this limitation without breaking the semantics of ExecutorService, JPPFExecutorService provides a way to specify the configuration of the jobs and tasks that will be submitted subsequently.

This can be done via the ExecutorServiceConfiguration interface, which can be accessed from a JPPFExecutorService instance via the following accessor methods:

// Get the configuration for this executor service
public ExecutorServiceConfiguration getConfiguration();

// Reset the configuration for this executor service to a blank state
public ExecutorServiceConfiguration resetConfiguration();

ExecutorServiceConfiguration provides the following API:

// Get the configuration to use for the jobs submitted by the executor service
JobConfiguration getJobConfiguration();

// Get the configuration to use for the tasks submitted by the executor service
TaskConfiguration getTaskConfiguration();

3.1 Job configuration

The JobConfiguration interface is defined as follows:

public interface JobConfiguration {
  // Get the service level agreement between the jobs and the server
  JobSLA getSLA();

  // Get the service level agreement between the jobs and the client
  JobClientSLA getClientSLA();

  // Get the user-defined metadata associated with the jobs
  JobMetadata getMetadata();

  // Get/set the persistence manager which enables saving and
  // restoring the state of the jobs
  <T> JobPersistence<T> getPersistenceManager();
  <T> void setPersistenceManager(final JobPersistence<T> persistenceManager);

  // Get/set the job's data provider
  DataProvider getDataProvider();
  void setDataProvider(DataProvider dataProvider);

  // Add or remove a listener to/from the list of job listeners
  void addJobListener(JobListener listener);
  void removeJobListener(JobListener listener);

  // get all the class loaders added to this job configuration
  List<ClassLoader> getClassLoaders();
}

As we can see, this provides a way to set the properties normally available to JPPFJob instances, even though the jobs submitted by a JPPFExecutorService are not visible to the calling code. Any change to the JobConfiguration will apply to the next job submitted by the executor and to all subsequent jobs.

Here is an example usage:

JPPFExecutorService executor = ...;
// get the executor configuration
ExecutorServiceConfiguration config = executor.getConfiguration();
// get the job configuration
JobConfiguration jobConfig = config.getJobConfiguration();
// set all jobs to expire after 5 seconds
jobConfig.getSLA().setJobExpirationSchedule(new JPPFSchedule(5000L));
// add a class loader that cannot be computed from the tasks
jobConfig.getClassLoaders().add(myClassLoader);

3.2 Task configuration

The TaskConfiguration interface can be used to set JPPF-specific properties onto executor service tasks that do not implement Task. It is defined as follows:

public interface TaskConfiguration {
  // Get the delegate for the onCancel() method
  JPPFTaskCallback getOnCancelCallback();

  // Set the delegate for the onCancel() method
  void setOnCancelCallback(final JPPFTaskCallback cancelCallback);

  // Get the delegate for the onTimeout() method
  JPPFTaskCallback getOnTimeoutCallback();

  // Set the delegate for the onTimeout() method
  void setOnTimeoutCallback(final JPPFTaskCallback timeoutCallback);

  // Get the task timeout schedule
  JPPFSchedule getTimeoutSchedule();

  // Set the task timeout schedule
  void setTimeoutSchedule(final JPPFSchedule timeoutSchedule);
}

This API introduces the concept of a callback delegate, which is used in lieu of the “standard” Task callback methods, Task.onCancel() and Task.onTimeout(). This is done by providing a subclass of JPPFTaskCallback, which is defined as follows:

public abstract class JPPFTaskCallback implements Runnable, Serializable {
  // Get the task this callback is associated with
  public final JPPFTask getTask();
}

Here is a task configuration usage example:

JPPFExecutorService executor = ...;
// get the executor configuration
ExecutorServiceConfiguration config = executor.getConfiguration();
// get the task configuration
TaskConfiguration taskConfig = config.getTaskConfiguration();
// set the task to timeout after 5 seconds
taskConfig.setTimeoutSchedule(new JPPFSchedule(5000L));
// set the onTimeout() callback
taskConfig.setOnTimeoutCallback(new MyTaskCallback());

// A callback that sets a timeout message as the task result
static class MyTaskCallback extends JPPFTaskCallback {
  @Override
  public void run() {
    getTask().setResult("this task has timed out");
  }
}
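The delegate pattern itself can be illustrated without the JPPF classes. In the sketch below, SketchTask and SketchCallback are hypothetical stand-ins for a JPPF task and for JPPFTaskCallback, and simulateTimeout() is only an assumption about the order of operations (bind the callback to its task, then run it). It is not a description of the JPPF internals.

```java
import java.io.Serializable;

// Hypothetical stand-ins for the JPPF types, to show the delegate pattern only.
final class CallbackDelegateSketch {
  // Minimal task that holds a result, loosely playing the role of a JPPF task.
  static class SketchTask implements Serializable {
    private static final long serialVersionUID = 1L;
    private Object result;

    Object getResult() { return result; }

    void setResult(Object result) { this.result = result; }
  }

  // Plays the role of JPPFTaskCallback: a serializable Runnable that knows its task.
  abstract static class SketchCallback implements Runnable, Serializable {
    private static final long serialVersionUID = 1L;
    private SketchTask task;

    final SketchTask getTask() { return task; }

    final void setTask(SketchTask task) { this.task = task; }
  }

  // Same logic as the MyTaskCallback example above.
  static class TimeoutMessageCallback extends SketchCallback {
    private static final long serialVersionUID = 1L;

    @Override
    public void run() { getTask().setResult("this task has timed out"); }
  }

  // Assumed sequence when a timeout fires: bind the callback to the task, run it,
  // then read the result that the callback stored in the task.
  static Object simulateTimeout(SketchCallback callback) {
    SketchTask task = new SketchTask();
    callback.setTask(task);
    callback.run();
    return task.getResult();
  }

  public static void main(String[] args) {
    // prints "this task has timed out"
    System.out.println(simulateTimeout(new TimeoutMessageCallback()));
  }
}
```

Since the callback is a separate object, the same callback class can be applied to plain Runnable or Callable tasks that have no onTimeout() or onCancel() method of their own.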

4 JPPFCompletionService

The JDK package java.util.concurrent provides the interface CompletionService, which represents “a service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks”. The JDK also provides a concrete implementation with the class ExecutorCompletionService. Unfortunately, this class does not work with a JPPFExecutorService, as it was not designed with distributed execution in mind.

As a convenience, the JPPF API provides a specific implementation of CompletionService with the class JPPFCompletionService, which respects the contract and semantics defined by the CompletionService interface and which can be used as follows:

JPPFExecutorService executor = ...;
JPPFCompletionService<String> completionService =
  new JPPFCompletionService<String>(executor);
MyCallable<String> task = new MyCallable<String>();
Future<String> future = completionService.submit(task);

// ... later on ...
// block until a result is available
future = completionService.take();
String result = future.get();
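The CompletionService contract can be tried out locally before moving to the grid. The sketch below uses only the JDK, with ExecutorCompletionService on top of a local thread pool, so it illustrates the submit()/take() semantics that JPPFCompletionService is described as respecting, not the JPPF class itself. CompletionServiceSketch and runTasks() are hypothetical names. The lambda tasks are fine for a local pool, but tasks sent through JPPF would need to be Serializable classes, as noted earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Local illustration of the CompletionService contract, using JDK classes only.
final class CompletionServiceSketch {
  // Submits n tasks, then collects their results in completion order.
  static List<String> runTasks(int n) {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      CompletionService<String> completionService =
        new ExecutorCompletionService<>(executor);
      // production side: submit the tasks without waiting for any of them
      for (int i = 0; i < n; i++) {
        final int id = i;
        completionService.submit(() -> "result of task " + id);
      }
      // consumption side: take() blocks until the next completed task is available
      List<String> results = new ArrayList<>();
      for (int i = 0; i < n; i++) {
        results.add(completionService.take().get());
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    // the order of the lines may differ from the submission order
    for (String result : runTasks(5)) System.out.println(result);
  }
}
```

The results come back in the order in which the tasks complete, which is why the consumer loop does not need to keep track of individual Future objects.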



JPPF Copyright © 2005-2020 JPPF.org