JPPF Executor Services
From JPPF 6.2 Documentation
1 Basic usage
JPPF 2.2 introduced an API that serves as an ExecutorService facade to the JPPF client API. This API consists of a single class, JPPFExecutorService, which implements the interface java.util.concurrent.ExecutorService. A JPPFExecutorService is obtained via its constructor, to which a JPPFClient must be passed:
  JPPFClient jppfClient = new JPPFClient();
  ExecutorService executor = new JPPFExecutorService(jppfClient);
The behavior of the resulting executor will depend largely on the configuration of the JPPFClient and on which ExecutorService method you invoke to submit tasks. In effect, each time you invoke an invokeAll(...), invokeAny(...), submit(...) or execute(...) method of the executor, a new JPPFJob will be created and sent for execution on the grid. This means that, if the executor method you invoke only takes a single task, then a job with only one task will be sent to the JPPF server.
Here is an example use:
  JPPFClient jppfClient = new JPPFClient();
  ExecutorService executor = new JPPFExecutorService(jppfClient);
  try {
    // submit a single task
    Runnable myTask = new MyRunnable(0);
    Future<?> future = executor.submit(myTask);
    // wait for the results
    future.get();
    // process the results
    ...
    // submit a list of tasks: invokeAll() only accepts Callable instances
    List<Callable<String>> myTaskList = new ArrayList<>();
    for (int i=0; i<10; i++) myTaskList.add(new MyCallableTask(i));
    List<Future<String>> futureList = executor.invokeAll(myTaskList);
    // wait for the results
    for (Future<String> f: futureList) f.get();
    // process the results for the list of tasks
    ...
  } finally {
    // clean up after use
    executor.shutdown();
    jppfClient.close();
  }

  // !!! it is important that this task is Serializable !!!
  public static class MyRunnable implements Runnable, Serializable {
    private int id = 0;
    public MyRunnable(int id) { this.id = id; }
    public void run() { System.out.println("Running task id " + id); }
  }

  // Callable counterpart used with invokeAll(), also Serializable
  public static class MyCallableTask implements Callable<String>, Serializable {
    private int id = 0;
    public MyCallableTask(int id) { this.id = id; }
    public String call() { return "Result of task id " + id; }
  }
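Because the tasks are sent to remote nodes, a task that cannot be serialized will fail once it leaves the client. The following is a minimal sketch of a local pre-flight check, assuming the default Java serialization is in use. It relies only on the JDK; the class and method names (SerializableCheck, isSerializable, GoodTask, BadTask) are hypothetical and not part of the JPPF API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper, not part of JPPF: checks that a task survives Java serialization.
public class SerializableCheck {
  /** Returns true if the object can be written with standard Java serialization. */
  public static boolean isSerializable(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (IOException e) {
      // NotSerializableException is a subclass of IOException
      return false;
    }
  }

  /** A task that only holds serializable state. */
  public static class GoodTask implements Runnable, Serializable {
    private final int id;
    public GoodTask(int id) { this.id = id; }
    @Override public void run() { System.out.println("Running task id " + id); }
  }

  /** Declares Serializable but holds a non-serializable field, so writing it fails. */
  public static class BadTask implements Runnable, Serializable {
    private final Thread owner = Thread.currentThread(); // Thread is not Serializable
    @Override public void run() { }
  }

  public static void main(String[] args) {
    System.out.println(isSerializable(new GoodTask(1))); // true
    System.out.println(isSerializable(new BadTask()));   // false
    Runnable lambda = () -> { };                         // plain lambdas are not Serializable
    System.out.println(isSerializable(lambda));          // false
  }
}
```

Running such a check in a unit test can surface a non-serializable field before the task is ever submitted to the grid.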
2 Batch modes
The executor's behavior can be modified by using one of the batch modes of the JPPFExecutorService. By batch mode, we mean the ability to group tasks into batches, in several different ways. This enables tasks to be sent together, even if they are submitted individually, and allows them to benefit from the parallel features inherent to JPPF. Since fewer jobs are sent for the same number of tasks, this can also substantially improve the throughput of individual tasks sent via an executor service.
Using a batch size: specifying a batch size via the method JPPFExecutorService.setBatchSize(int limit) causes the executor to only send tasks when at least that number of tasks have been submitted.
When using this mode, you must be cautious as to how many tasks you send via the executor: if you send fewer than the batch limit, these tasks will remain pending and un-executed.
Sometimes, the executor will send more than the specified number of tasks in the same batch: this will happen when one of the JPPFExecutorService.invokeXXX() methods is called with n tasks, such that current batch size + n > limit.
The behavior is to send all tasks included in the invokeXXX() call together.
Here is an example:
  JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
  // the executor will send jobs with at least 5 tasks each
  executor.setBatchSize(5);
  List<Future<?>> futures = new ArrayList<Future<?>>();
  // we submit 10 = 2 * 5 tasks, this will cause the client to send 2 jobs
  for (int i=0; i<10; i++) futures.add(executor.submit(new MyTask(i)));
  for (Future<?> f: futures) f.get();
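The batch size rules described above can be modeled with a small, JDK-only sketch. This is an illustration of the documented behavior, not JPPF's actual implementation: the class SizeBatcher and its methods are hypothetical, and "sending a job" is simulated by recording the batch in a list. The case where an invokeXXX() call leaves the total below the limit is not covered by the text above, so the sketch simply leaves those tasks pending.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the batch size rule; it only records what would be sent.
public class SizeBatcher<T> {
  private final int limit;
  private final List<T> pending = new ArrayList<>();
  private final List<List<T>> sentBatches = new ArrayList<>();

  public SizeBatcher(int limit) { this.limit = limit; }

  /** Models submit()/execute(): add one task, send once the limit is reached. */
  public synchronized void submit(T task) {
    pending.add(task);
    flushIfFull();
  }

  /** Models invokeXXX(): the tasks of a single call are never split across batches. */
  public synchronized void invokeAll(List<T> tasks) {
    pending.addAll(tasks);
    flushIfFull();
  }

  // Sends everything that is pending as one batch when the limit is reached or exceeded.
  private void flushIfFull() {
    if (pending.size() >= limit) {
      sentBatches.add(new ArrayList<>(pending));
      pending.clear();
    }
  }

  public synchronized int pendingCount() { return pending.size(); }

  public synchronized List<List<T>> sentBatches() { return new ArrayList<>(sentBatches); }

  public static void main(String[] args) {
    SizeBatcher<Integer> batcher = new SizeBatcher<>(5);
    for (int i = 0; i < 10; i++) batcher.submit(i);
    System.out.println(batcher.sentBatches().size());        // 2
    for (int i = 0; i < 3; i++) batcher.submit(i);
    System.out.println(batcher.pendingCount());              // 3
    batcher.invokeAll(Arrays.asList(1, 2, 3, 4));
    System.out.println(batcher.sentBatches().get(2).size()); // 7
  }
}
```

The last call shows the overflow case: 3 pending tasks plus 4 new ones exceed the limit of 5, and all 7 go out in the same batch.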
Using a batch timeout: this is done via the method JPPFExecutorService.setBatchTimeout(long timeout) and causes the executor to send the tasks at regular intervals, specified as the timeout. The timeout value is expressed in milliseconds.
Once the timeout has expired, the counter is reset to zero. If no task has been submitted between two timeout expirations, then nothing happens.
Example:
  JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
  // the executor will send a job every second (if any task is submitted)
  executor.setBatchTimeout(1000L);
  List<Future<?>> futures = new ArrayList<Future<?>>();
  // we submit 5 tasks
  for (int i=0; i<5; i++) futures.add(executor.submit(new MyTask(i)));
  // we wait 1.5 seconds, during that time a job with 5 tasks will be submitted
  Thread.sleep(1500L);
  // we submit 6 more tasks, they will be sent in a different job
  for (int i=5; i<11; i++) futures.add(executor.submit(new MyTask(i)));
  // here we get the results for tasks sent in 2 different jobs!
  for (Future<?> f: futures) f.get();
Using both batch size and timeout: it is possible to use a combination of batch size and timeout. In this case, a job will be sent whenever the batch limit is reached or the timeout expires, whichever happens first.
In any case, the timeout counter will be reset each time a job is sent. Using a timeout is also an efficient way to deal with the possible blocking behavior of the batch size mode. In this case, just use a timeout that is sufficiently large for your needs.
Example:
  JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
  executor.setBatchTimeout(1000L);
  executor.setBatchSize(5);
  List<Future<?>> futures = new ArrayList<Future<?>>();
  // we submit 3 tasks
  for (int i=0; i<3; i++) futures.add(executor.submit(new MyTask(i)));
  // we wait 1.5 seconds, during that time a job with 3 tasks will be submitted,
  // even though the batch size is set to 5
  Thread.sleep(1500L);
  for (Future<?> f: futures) f.get();
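When only a batch size is set, a plain Future.get() on a task stuck in an incomplete batch would wait indefinitely. As a complementary safeguard on the caller side, one option is the timed variant Future.get(long, TimeUnit), which is part of the standard Future contract. The sketch below uses only the JDK; the class BoundedWait and the method getOrDefault are hypothetical names, and a never-completed CompletableFuture stands in for a task that is still pending.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of JPPF: bounded wait on a Future.
public class BoundedWait {
  /** Waits at most timeoutMillis for the result and returns fallback if it is not ready. */
  public static <T> T getOrDefault(Future<T> future, long timeoutMillis, T fallback) {
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // the task has not completed yet, for instance because its batch was never sent
      return fallback;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
      return fallback;
    } catch (ExecutionException e) {
      throw new IllegalStateException("task failed", e.getCause());
    }
  }

  public static void main(String[] args) {
    // stand-in for a task that stays pending in an incomplete batch
    Future<String> pending = new CompletableFuture<>();
    System.out.println(getOrDefault(pending, 100L, "still pending")); // still pending
    Future<String> done = CompletableFuture.completedFuture("result");
    System.out.println(getOrDefault(done, 100L, "still pending"));    // result
  }
}
```

This does not cause the pending tasks to be sent; it only keeps the calling thread from blocking forever, which is why a batch timeout remains the more direct remedy.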
3 Configuring jobs and tasks
The JPPFExecutorService has a limitation: if you use only the ExecutorService interface, which it implements, there is no way to use JPPF-specific features such as job SLA, metadata or persistence, or task timeout, onTimeout() and onCancel().
To overcome this limitation without breaking the semantics of ExecutorService, JPPFExecutorService provides a way to specify the configuration of the jobs and tasks that will be submitted subsequently.
This can be done via the ExecutorServiceConfiguration interface, which can be accessed from a JPPFExecutorService instance via the following accessor methods:
  // Get the configuration for this executor service
  public ExecutorServiceConfiguration getConfiguration();
  // Reset the configuration for this executor service to a blank state
  public ExecutorServiceConfiguration resetConfiguration();
ExecutorServiceConfiguration provides the following API:
  // Get the configuration to use for the jobs submitted by the executor service
  JobConfiguration getJobConfiguration();
  // Get the configuration to use for the tasks submitted by the executor service
  TaskConfiguration getTaskConfiguration();
3.1 Job configuration
The JobConfiguration interface is defined as follows:
  public interface JobConfiguration {
    // Get the service level agreement between the jobs and the server
    JobSLA getSLA();
    // Get the service level agreement between the jobs and the client
    JobClientSLA getClientSLA();
    // Get the user-defined metadata associated with the jobs
    JobMetadata getMetadata();
    // Get/set the persistence manager which enables saving and restoring the state of the jobs
    <T> JobPersistence<T> getPersistenceManager();
    <T> void setPersistenceManager(final JobPersistence<T> persistenceManager);
    // Get/set the job's data provider
    DataProvider getDataProvider();
    void setDataProvider(DataProvider dataProvider);
    // Add or remove a listener to/from the list of job listeners
    void addJobListener(JobListener listener);
    void removeJobListener(JobListener listener);
    // get all the class loaders added to this job configuration
    List<ClassLoader> getClassLoaders();
  }
As we can see, this provides a way to set the properties normally available to JPPFJob instances, even though the jobs submitted by a JPPFExecutorService are not visible. Any change to the JobConfiguration will apply to the next job that will be submitted by the executor and all subsequent jobs.
Here is an example usage:
  JPPFExecutorService executor = ...;
  // get the executor configuration
  ExecutorServiceConfiguration config = executor.getConfiguration();
  // get the job configuration
  JobConfiguration jobConfig = config.getJobConfiguration();
  // set all jobs to expire after 5 seconds
  jobConfig.getSLA().setJobExpirationSchedule(new JPPFSchedule(5000L));
  // add a class loader that cannot be computed from the tasks
  jobConfig.getClassLoaders().add(myClassLoader);
3.2 Task configuration
The TaskConfiguration interface can be used to set JPPF-specific properties onto executor service tasks that do not implement Task. It is defined as follows:
  public interface TaskConfiguration {
    // Get the delegate for the onCancel() method
    JPPFTaskCallback getOnCancelCallback();
    // Set the delegate for the onCancel() method
    void setOnCancelCallback(final JPPFTaskCallback cancelCallback);
    // Get the delegate for the onTimeout() method
    JPPFTaskCallback getOnTimeoutCallback();
    // Set the delegate for the onTimeout() method
    void setOnTimeoutCallback(final JPPFTaskCallback timeoutCallback);
    // Get the task timeout schedule
    JPPFSchedule getTimeoutSchedule();
    // Set the task timeout schedule
    void setTimeoutSchedule(final JPPFSchedule timeoutSchedule);
  }
This API introduces the concept of a callback delegate, which is used in lieu of the “standard” Task callback methods, Task.onCancel() and Task.onTimeout(). This is done by providing a subclass of JPPFTaskCallback, which is defined as follows:
  public abstract class JPPFTaskCallback implements Runnable, Serializable {
    // Get the task this callback is associated with
    public final JPPFTask getTask();
  }
Here is a task configuration usage example:
  JPPFExecutorService executor = ...;
  // get the executor configuration
  ExecutorServiceConfiguration config = executor.getConfiguration();
  // get the task configuration
  TaskConfiguration taskConfig = config.getTaskConfiguration();
  // set the task to timeout after 5 seconds
  taskConfig.setTimeoutSchedule(new JPPFSchedule(5000L));
  // set the onTimeout() callback
  taskConfig.setOnTimeoutCallback(new MyTaskCallback());

  // A callback that sets a timeout message as the task result
  static class MyTaskCallback extends JPPFTaskCallback {
    @Override
    public void run() {
      getTask().setResult("this task has timed out");
    }
  }
4 JPPFCompletionService
The JDK package java.util.concurrent provides the interface CompletionService, which represents “a service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks”. The JDK also provides a concrete implementation with the class ExecutorCompletionService. Unfortunately, this class does not work with a JPPFExecutorService, as it was not designed with distributed execution in mind.
As a convenience, the JPPF API provides a specific implementation of CompletionService with the class JPPFCompletionService, which respects the contract and semantics defined by the CompletionService interface and which can be used as follows:
  JPPFExecutorService executor = ...;
  JPPFCompletionService<String> completionService = new JPPFCompletionService<String>(executor);
  MyCallable<String> task = new MyCallable<String>();
  Future<String> future = completionService.submit(task);
  // ... later on ...
  // block until a result is available
  future = completionService.take();
  String result = future.get();
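The usual CompletionService pattern is to submit a known number of tasks and then call take() that many times, handling results in the order in which they complete. The sketch below shows this pattern with the JDK's ExecutorCompletionService over a local thread pool, used here only as a runnable stand-in. Assuming JPPFCompletionService follows the same contract as stated above, the submit/take loop would look the same with it. The class CompletionDemo and the method squares are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only illustration of the submit/take pattern; a local pool replaces the grid.
public class CompletionDemo {
  /** Computes the squares of 1..n in parallel and returns them in completion order. */
  public static List<Integer> squares(int n) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
      // producer side: submit all the tasks without waiting for any of them
      for (int i = 1; i <= n; i++) {
        final int value = i;
        completion.submit(() -> value * value);
      }
      // consumer side: take() blocks until the next completed task is available
      List<Integer> results = new ArrayList<>();
      for (int i = 0; i < n; i++) results.add(completion.take().get());
      return results;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("could not collect the results", e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    // the order of the values depends on which tasks complete first
    System.out.println(squares(5));
  }
}
```

Since results arrive in completion order rather than submission order, a task that needs to be matched with its input should carry an identifier in its result.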