Submitting multiple jobs concurrently
From JPPF 6.2 Documentation
This section presents several ways to design an application so that it can execute multiple jobs concurrently using a single JPPFClient instance. They are meant as common, reusable patterns that cover the most frequent use cases where multiple jobs must be submitted and processed in parallel.
All the patterns presented here assume that jobs are submitted through a single instance of JPPFClient. This is the recommended way to work with JPPF, since it makes the most of the built-in features of the JPPF client:
- thread safety
- ability to connect to multiple remote drivers, to the same driver multiple times, or any combination of these
- load-balancing between available connections
- ability to submit a job over multiple connections for increased performance
- fine-grained filtering of eligible connections for each job, via the job's client-side execution policy
- connection failover strategies defined via the connection pools' priorities
1 Job submissions from multiple threads
In this pattern, multiple threads submit jobs concurrently through the same JPPFClient instance. Each job is submitted with the blocking JPPFClient.submit() method: since every submission runs in its own thread, we can afford to block that thread until the job completes:
public void multipleThreadsBlockingJobs() {
  // a pool of threads that will submit the jobs and retrieve their results
  ExecutorService executor = Executors.newFixedThreadPool(4);
  try (JPPFClient client = new JPPFClient()) {
    // handles for later retrieval of the job submissions results
    List<Future<List<Task<?>>>> futures = new ArrayList<>();
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      // ... set attributes and add tasks ...
      // submit the job as a Callable in a separate thread; the Callable captures
      // this job and blocks its thread until the job completes
      futures.add(executor.submit(() -> client.submit(job)));
    }
    futures.forEach(future -> {
      try {
        // wait until each job has completed and retrieve its results
        List<Task<?>> results = future.get();
        // ... process the job results ...
        processResults(results);
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  } finally {
    // release the submission threads, even if an error occurred
    executor.shutdown();
  }
}
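The same thread-per-submission structure can be tried without a JPPF grid. The following is a minimal sketch, assuming a hypothetical BlockingSubmitter interface that stands in for a blocking call such as JPPFClient.submit(job); the "jobs" are plain lists of integers, and the simulated execution squares each value. It also shows why each submission is wrapped in a lambda that captures its job: Callable.call() takes no argument, so the job must be bound when the Callable is created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPerJobSketch {
  // Hypothetical stand-in for a blocking submission such as JPPFClient.submit(job):
  // it blocks the calling thread until the job's results are available.
  public interface BlockingSubmitter<J, R> {
    R submit(J job) throws Exception;
  }

  public static <J, R> List<R> submitAll(List<J> jobs, BlockingSubmitter<J, R> submitter, int nbThreads)
      throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(nbThreads);
    try {
      List<Future<R>> futures = new ArrayList<>();
      for (J job : jobs) {
        // bind the job when the Callable is created: Callable.call() takes no argument
        Callable<R> submission = () -> submitter.submit(job);
        futures.add(executor.submit(submission));
      }
      List<R> results = new ArrayList<>();
      for (Future<R> future : futures) {
        results.add(future.get()); // blocks until this particular job has completed
      }
      return results;
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    // simulated "execution": each task squares its input value
    BlockingSubmitter<List<Integer>, List<Integer>> squarer = job -> {
      List<Integer> out = new ArrayList<>();
      for (int x : job) out.add(x * x);
      return out;
    };
    List<List<Integer>> jobs = List.of(List.of(1, 2), List.of(3, 4), List.of(5, 6), List.of(7, 8));
    System.out.println(submitAll(jobs, squarer, 4)); // prints [[1, 4], [9, 16], [25, 36], [49, 64]]
  }
}
```

Because the futures are read back in submission order, the results come out in the same order as the jobs, even if the jobs complete in a different order.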
2 Multiple non-blocking jobs from a single thread
Here, we take advantage of non-blocking job submissions with JPPFClient.submitAsync() to write a much less cumbersome version of the previous pattern, without the need for a separate pool of threads:
public void singleThreadNonBlockingJobs() {
  try (JPPFClient client = new JPPFClient()) {
    // holds the submitted jobs for later retrieval of their results
    List<JPPFJob> jobs = new ArrayList<>();
    // submit the jobs without blocking the current thread
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      // ... set attributes and add tasks ...
      jobs.add(job);
      client.submitAsync(job); // non-blocking operation
    }
    // get and process the jobs' results
    jobs.forEach(job -> {
      // synchronize on each job's completion: this is a blocking operation
      List<Task<?>> results = job.awaitResults();
      processResults(results); // process the job results
    });
  } catch(Exception e) {
    e.printStackTrace();
  }
}
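A local analogue of this pattern can be sketched with CompletableFuture. In this minimal sketch, submitAsync() is a hypothetical stand-in for JPPFClient.submitAsync(job), a local thread pool plays the role of the grid, and join() plays the role of job.awaitResults(); each "job" simply sums its integer tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NonBlockingSketch {
  // Hypothetical stand-in for JPPFClient.submitAsync(job): returns immediately while
  // a local thread pool (playing the grid) computes the job's result, here the sum of its tasks.
  static CompletableFuture<Integer> submitAsync(List<Integer> job, ExecutorService grid) {
    return CompletableFuture.supplyAsync(() -> job.stream().mapToInt(Integer::intValue).sum(), grid);
  }

  public static List<Integer> runJobs(List<List<Integer>> jobs) {
    ExecutorService grid = Executors.newFixedThreadPool(2);
    try {
      // submit every job from the current thread, without blocking it
      List<CompletableFuture<Integer>> submitted = new ArrayList<>();
      for (List<Integer> job : jobs) {
        submitted.add(submitAsync(job, grid));
      }
      // then block on each job in turn, as job.awaitResults() does
      List<Integer> results = new ArrayList<>();
      for (CompletableFuture<Integer> future : submitted) {
        results.add(future.join());
      }
      return results;
    } finally {
      grid.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runJobs(List.of(List.of(1, 2, 3), List.of(4, 5), List.of(6)))); // prints [6, 9, 6]
  }
}
```

Only one thread is involved on the submitting side: all jobs are in flight before the first join() call, so waiting on the first job does not delay the execution of the others.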
3 Fully asynchronous processing
Here, we use a JobListener to retrieve and process the results of each job via job life cycle notifications. The only synchronization occurs in the submitting method, which waits for the completion of all the jobs:
public void asynchronousNonBlockingJobs() {
  try (JPPFClient client = new JPPFClient()) {
    int nbJobs = 4;
    // synchronization helper that tells us when all jobs have completed
    final CountDownLatch countDown = new CountDownLatch(nbJobs);
    for (int i=0; i<nbJobs; i++) {
      JPPFJob job = new JPPFJob();
      // results will be processed asynchronously in the job listener's jobEnded() notifications
      job.addJobListener(new JobListenerAdapter() {
        @Override
        public void jobEnded(JobEvent event) {
          try {
            List<Task<?>> results = event.getJob().getAllResults();
            processResults(results); // process the job results
          } finally {
            // decrease the jobs count down, even if processing failed;
            // when it reaches 0, countDown.await() will exit
            countDown.countDown();
          }
        }
      });
      // ... set other attributes and add tasks ...
      // submit the job without blocking
      client.submitAsync(job);
    }
    // wait until all jobs are complete, i.e. until the count down reaches 0
    countDown.await();
  } catch(Exception e) {
    e.printStackTrace();
  }
}
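The listener-plus-latch structure can be sketched in plain Java as follows. This is a minimal sketch under stated assumptions: submitAsync() is a hypothetical stand-in for a non-blocking submission, and its onEnded callback stands in for JobListener.jobEnded(); jobs are lists of integers whose "result" is their sum. Counting down in a finally block ensures that an exception thrown while processing one job's results cannot leave the waiting thread blocked forever.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntConsumer;

public class CallbackLatchSketch {
  // Hypothetical stand-in for submitAsync() plus a job listener: 'onEnded' plays the
  // role of jobEnded() and is invoked on a pool thread once the "job" has been executed.
  static void submitAsync(List<Integer> tasks, ExecutorService grid, IntConsumer onEnded) {
    grid.execute(() -> onEnded.accept(tasks.stream().mapToInt(Integer::intValue).sum()));
  }

  public static Map<Integer, Integer> runJobs(List<List<Integer>> jobs) throws InterruptedException {
    ExecutorService grid = Executors.newFixedThreadPool(3);
    Map<Integer, Integer> results = new ConcurrentHashMap<>(); // written from pool threads
    CountDownLatch countDown = new CountDownLatch(jobs.size());
    try {
      for (int i = 0; i < jobs.size(); i++) {
        int jobId = i;
        submitAsync(jobs.get(i), grid, sum -> {
          try {
            results.put(jobId, sum); // "process the job results"
          } finally {
            countDown.countDown();   // always count down, even if processing fails
          }
        });
      }
      countDown.await(); // the only blocking call: returns once every job has ended
      return results;
    } finally {
      grid.shutdown();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runJobs(List.of(List.of(1, 1), List.of(2, 3), List.of(4))));
  }
}
```

Since each countDown() happens-before the return from await(), the results written by the callbacks are visible to the submitting thread once await() returns.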
4 Job streaming
Job streaming occurs when an application continuously creates and executes jobs based on a potentially infinite source of data. The main problem to overcome in this use case is that jobs may be created much faster than they are executed, potentially filling the memory until an OutOfMemoryError occurs. A possible solution is to build a job provider with a concurrency limit, which determines the maximum number of jobs that can be running at any given time.
Additionally, the Iterator interface fits the streaming pattern particularly well, so our job provider will implement both Iterator and Iterable, acting as its own iterator:
public class JobProvider extends JobListenerAdapter implements Iterable<JPPFJob>, Iterator<JPPFJob> {
  private final int concurrencyLimit; // limit to the maximum number of concurrent jobs
  private int currentNbJobs = 0;      // current count of concurrent jobs
  public JobProvider(int concurrencyLimit) {
    this.concurrencyLimit = concurrencyLimit;
  }
  // implementation of Iterator<JPPFJob>
  @Override
  public synchronized boolean hasNext() {
    boolean hasMoreJobs = false;
    // ... compute hasMoreJobs, e.g. check if there is any more data to read
    return hasMoreJobs;
  }
  @Override
  public synchronized JPPFJob next() {
    // wait until the number of running jobs is less than the concurrency limit
    while (currentNbJobs >= concurrencyLimit) {
      try {
        wait(); // releases the lock until jobEnded() calls notifyAll()
      } catch (InterruptedException e) {
        // restore the interrupt status and stop the stream
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for a job to complete", e);
      }
    }
    return buildJob();
  }
  @Override
  public void remove() {
    throw new UnsupportedOperationException("remove() is not supported");
  }
  private synchronized JPPFJob buildJob() {
    JPPFJob job = new JPPFJob();
    // ... build the tasks by reading data from a file, a database, etc...
    // ... add the tasks to the job ...
    // add a listener to update the concurrent jobs count when the job ends
    job.addJobListener(this);
    // increase the count of concurrently running jobs
    currentNbJobs++;
    return job;
  }
  // implementation of JobListener
  @Override
  public synchronized void jobEnded(JobEvent event) {
    processResults(event.getJob().getAllResults()); // process the job results
    // decrease the count of concurrently running jobs
    currentNbJobs--;
    // wake up the threads waiting in next()
    notifyAll();
  }
  // determine whether any submitted job is still running
  public synchronized boolean hasPendingJob() {
    return currentNbJobs > 0;
  }
  // implementation of Iterable<JPPFJob>
  @Override
  public Iterator<JPPFJob> iterator() {
    return this;
  }
  private void processResults(List<Task<?>> results) {
    // ...
  }
}
Note how the provider registers itself as a JobListener on each job it builds: its jobEnded() notification decrements the count of running jobs and wakes up any thread waiting in next(), so that the provider can create new jobs from its data source. The same notification is also used to process the job results asynchronously.
Now that we have a job provider, we can use it to submit the jobs it creates to a JPPF grid:
public void jobStreaming() {
  try (JPPFClient client = new JPPFClient()) {
    // create the job provider with a limiting concurrency factor
    JobProvider jobProvider = new JobProvider(4);
    // build and submit the provided jobs until no more is available
    jobProvider.forEach(client::submitAsync);
    // forEach() returns as soon as the last job is submitted:
    // wait for the remaining jobs to complete before the client is closed
    while (jobProvider.hasPendingJob()) Thread.sleep(10L);
  } catch(Exception e) {
    e.printStackTrace();
  }
}
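The wait()/notifyAll() bookkeeping of such a job provider can also be expressed with a java.util.concurrent.Semaphore. This is a swapped-in technique, not the one used by JPPF: the producer acquires one permit before creating each job, and the completion callback releases it. The sketch below assumes simulated jobs (a short sleep on a local thread pool) instead of JPPF jobs, and the stream() method and its parameters are hypothetical. Acquiring all limit permits at the end is a simple way to wait for the jobs that are still running before releasing any resources.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreStreamSketch {
  // Streams nbJobs simulated jobs, with at most 'limit' of them running at any time.
  // Each completed job id is added to 'processed', which must be thread-safe.
  // Returns the highest number of concurrently running jobs that was observed.
  public static int stream(int nbJobs, int limit, List<Integer> processed) throws InterruptedException {
    ExecutorService grid = Executors.newFixedThreadPool(8); // stands in for the JPPF grid
    Semaphore permits = new Semaphore(limit);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    try {
      for (int i = 0; i < nbJobs; i++) { // plays the role of hasNext()
        permits.acquire();                // blocks while 'limit' jobs are running, like wait() in next()
        int jobId = i;                    // plays the role of buildJob()
        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
        CompletableFuture.runAsync(() -> {
          try {
            Thread.sleep(5);              // simulated job execution
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }, grid).whenComplete((ignored, error) -> { // plays the role of jobEnded()
          processed.add(jobId);           // "process the job results"
          running.decrementAndGet();
          permits.release();              // like decrementing the job count, then notifyAll()
        });
      }
      permits.acquire(limit); // returns only once every job has released its permit
      return peak.get();
    } finally {
      grid.shutdown();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
    int peak = stream(20, 3, processed);
    System.out.println("jobs processed: " + processed.size() + ", peak concurrency: " + peak);
  }
}
```

Because the running count is incremented only after a permit is acquired and decremented before it is released, the observed peak can never exceed the limit.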
4.1 The AbstractJPPFJobStream helper class
Given the potential complexity of the job streaming pattern, we found it useful to provide a helper class that relieves developers of all the wiring and internal state transitions, so that they can focus solely on the specifics of the jobs they want to submit. The abstract class AbstractJPPFJobStream serves this purpose. It is defined as follows:
public abstract class AbstractJPPFJobStream extends JobListenerAdapter
  implements Iterable<JPPFJob>, Iterator<JPPFJob>, AutoCloseable {
  // Initialize this job provider with a concurrency limit
  public AbstractJPPFJobStream(final int concurrencyLimit)
  // Determine whether there is at least one more job in the stream
  // This method must be overridden in subclasses
  public abstract boolean hasNext()
  // Get the next job in the stream
  public synchronized JPPFJob next() throws NoSuchElementException
  // Create the next job in the stream, along with its tasks
  // This method must be overridden in subclasses and is called from next()
  protected abstract JPPFJob createNextJob()
  // This operation is not supported
  public void remove() throws UnsupportedOperationException
  // Update the state of this job stream and process the results of a job asynchronously
  public void jobEnded(final JobEvent event)
  // Callback invoked from jobEnded() when a job is complete
  // This method must be overridden in subclasses
  protected abstract void processResults(JPPFJob job)
  // implementation of Iterable<JPPFJob>
  public Iterator<JPPFJob> iterator()
  // Close this stream and release the underlying resources it uses
  // This method must be overridden in subclasses
  public abstract void close() throws Exception
  // Determine whether any job is still being executed
  public synchronized boolean hasPendingJob()
  // Get the number of executed jobs
  public synchronized int getJobCount()
  // Get the number of executed tasks
  public synchronized int getTaskCount()
}
This class is designed to be subclassed: the comments above identify the four abstract methods, hasNext(), createNextJob(), processResults() and close(), that every subclass must implement, while the concurrency limit, the job listener wiring and the job and task counts are handled by the base class. Let's re-implement the previous example by subclassing AbstractJPPFJobStream:
public class JobProvider extends AbstractJPPFJobStream {
  public JobProvider(int concurrencyLimit) {
    super(concurrencyLimit);
  }
  @Override
  public synchronized boolean hasNext() {
    boolean hasMoreJobs = false;
    // ... compute hasMoreJobs, e.g. check if there is any more data to read
    return hasMoreJobs;
  }
  @Override
  protected JPPFJob createNextJob() {
    JPPFJob job = new JPPFJob();
    // ... build the tasks by reading data from a file, a database, etc...
    // ... add the tasks to the job and return it ...
    return job;
  }
  // must match the abstract signature processResults(JPPFJob) of the base class
  @Override
  protected void processResults(JPPFJob job) {
    // ... process the results, e.g. from job.getAllResults() ...
  }
  @Override
  public void close() throws Exception {
    // close a file, database connection, etc...
  }
}
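To see how the template methods of such a stream interact, here is a plain-Java analogue that runs without a JPPF grid. It is a sketch, not the actual AbstractJPPFJobStream implementation: AbstractBoundedStream, ChunkStream, awaitPendingJobs() and sumWithStream() are hypothetical names, jobs are chunks of integers, and a local thread pool stands in for the grid, calling jobEnded() when each job completes. The subclass only implements hasNext(), createNextJob() and processResults(), while the base class enforces the concurrency limit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JobStreamSketch {
  // Hypothetical analogue of AbstractJPPFJobStream; J is the "job" type and
  // jobEnded() must be called by whatever executes the jobs.
  public abstract static class AbstractBoundedStream<J> implements Iterable<J>, Iterator<J> {
    private final int concurrencyLimit;
    private int pending = 0; // jobs created but not yet ended
    protected AbstractBoundedStream(int concurrencyLimit) { this.concurrencyLimit = concurrencyLimit; }
    @Override public abstract boolean hasNext();
    protected abstract J createNextJob();
    protected abstract void processResults(J job);
    @Override public synchronized J next() {
      if (!hasNext()) throw new NoSuchElementException();
      while (pending >= concurrencyLimit) {
        try {
          wait(); // releases the lock until jobEnded() calls notifyAll()
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting for a free slot", e);
        }
      }
      pending++;
      return createNextJob();
    }
    public synchronized void jobEnded(J job) {
      processResults(job);
      pending--;
      notifyAll(); // wakes up next() and awaitPendingJobs()
    }
    public synchronized void awaitPendingJobs() throws InterruptedException {
      while (pending > 0) wait();
    }
    @Override public Iterator<J> iterator() { return this; }
  }

  // Example subclass: each "job" is a chunk of the data; processing adds it to a total.
  static class ChunkStream extends AbstractBoundedStream<List<Integer>> {
    private final List<Integer> data;
    private final int chunkSize;
    private int position = 0;
    private long total = 0;
    ChunkStream(List<Integer> data, int chunkSize, int limit) {
      super(limit);
      this.data = data;
      this.chunkSize = chunkSize;
    }
    @Override public synchronized boolean hasNext() { return position < data.size(); }
    @Override protected List<Integer> createNextJob() { // called with the lock held, from next()
      List<Integer> chunk = data.subList(position, Math.min(position + chunkSize, data.size()));
      position += chunk.size();
      return chunk;
    }
    @Override protected void processResults(List<Integer> job) { // called with the lock held
      for (int x : job) total += x;
    }
    synchronized long getTotal() { return total; }
  }

  public static long sumWithStream(List<Integer> data, int chunkSize, int limit) throws InterruptedException {
    ExecutorService grid = Executors.newFixedThreadPool(4); // stands in for the grid
    ChunkStream stream = new ChunkStream(data, chunkSize, limit);
    try {
      // like provider.forEach(client::submitAsync): next() blocks while 'limit' jobs are pending
      stream.forEach(job -> CompletableFuture.runAsync(() -> { /* simulated execution */ }, grid)
          .whenComplete((ignored, error) -> stream.jobEnded(job)));
      stream.awaitPendingJobs(); // do not tear down while jobs are still running
      return stream.getTotal();
    } finally {
      grid.shutdown();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<Integer> data = new ArrayList<>();
    for (int i = 1; i <= 100; i++) data.add(i);
    System.out.println(sumWithStream(data, 7, 3)); // prints 5050
  }
}
```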
5 Dedicated sample
For a fully working and documented example of the patterns seen in the previous sections, you are invited to explore the dedicated Concurrent Jobs demo.