Submitting multiple jobs concurrently

From JPPF 5.1 Documentation


In this section, we present several ways to design an application so that it can execute multiple jobs concurrently using a single JPPFClient instance. These can be seen as common, reusable patterns that cover the most frequent use cases where multiple jobs must be submitted and processed in parallel.

Base requirement: multiple connections

For a JPPF client to process multiple jobs in parallel, it must hold multiple connections, whether to a single server, to multiple servers, or any combination of these. The number of connections determines how many jobs can be sent concurrently to a server. If only one connection is available, then only one job at a time will actually be processed by the JPPF grid, and other jobs submitted by the same client will remain in the client queue until the first job has completed.

Multiple connections can be obtained either statically from the JPPF client configuration, or dynamically using the connection pool APIs. In the rest of this section, we will simply assume this is done appropriately.
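
The effect of the number of connections can be illustrated without a grid. The following is a minimal sketch using only the standard Java library, not the JPPF API: each thread of a fixed thread pool plays the role of one connection, and the class and method names (ConnectionLimitDemo, maxObservedConcurrency) are hypothetical. It measures how many simulated jobs actually run at the same time:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a client: each pool thread plays the role of one connection
class ConnectionLimitDemo {
  // Runs nbJobs simulated jobs over nbConnections "connections" and returns
  // the highest number of jobs that were observed running at the same time
  static int maxObservedConcurrency(int nbConnections, int nbJobs) {
    ExecutorService connections = Executors.newFixedThreadPool(nbConnections);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger max = new AtomicInteger();
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < nbJobs; i++) {
        futures.add(connections.submit(() -> {
          // record how many jobs are running right now
          max.accumulateAndGet(running.incrementAndGet(), Math::max);
          try {
            Thread.sleep(50L); // simulated job execution
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            running.decrementAndGet();
          }
        }));
      }
      // jobs that found no free "connection" waited in the pool's queue
      for (Future<?> future : futures) future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      connections.shutdown();
    }
    return max.get();
  }

  public static void main(String[] args) {
    System.out.println("1 connection:  max concurrent jobs = " + maxObservedConcurrency(1, 4));
    System.out.println("4 connections: max concurrent jobs = " + maxObservedConcurrency(4, 4));
  }
}
```

With a single "connection", the jobs are necessarily processed one after the other, whatever the number of jobs submitted. With several, the observed concurrency can never exceed the number of connections.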

Job submissions from multiple threads

This pattern shows how concurrent jobs can be submitted from multiple threads through the same JPPFClient instance. It uses blocking jobs: since each job is submitted in its own thread, we can afford to block that thread until the job completes:

public void multipleThreadsBlockingJobs() {
  // a pool of threads that will submit the jobs and retrieve their results
  ExecutorService executor = Executors.newFixedThreadPool(4);
  try (JPPFClient client = new JPPFClient()) {
    // handles for later retrieval of the job submissions results
    List<Future<List<Task<?>>>> futures = new ArrayList<>();
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      // ... set attributes and add tasks ...
      // submit the job in a separate thread
      futures.add(executor.submit(new JobSubmitter(client, job)));
    }
    for (Future<List<Task<?>>> future: futures) {
      try {
        // wait until each job has completed and retrieve its results
        List<Task<?>> results = future.get();
        // ... process the job results ...
        processResults(results);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
  executor.shutdown();
}

The class JobSubmitter is defined as follows:

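import java.util.List;
import java.util.concurrent.Callable;

import org.jppf.client.JPPFClient;
import org.jppf.client.JPPFJob;
import org.jppf.node.protocol.Task;

public class JobSubmitter implements Callable<List<Task<?>>> {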
  private final JPPFClient client;
  private final JPPFJob job;

  public JobSubmitter(JPPFClient client, JPPFJob job) {
    this.client = client;
    this.job = job;
  }

  @Override public List<Task<?>> call() throws Exception {
    // just submit the job
    return client.submitJob(job);
  }
}

Multiple non-blocking jobs from a single thread

Here, we take advantage of the asynchronous nature of non-blocking jobs to write a much less cumbersome version of the previous pattern:

public void singleThreadNonBlockingJobs() {
  try (final JPPFClient client = new JPPFClient()) {
    // holds the submitted jobs for later retrieval of their results
    List<JPPFJob> jobs = new ArrayList<>();
    // submit the jobs without blocking the current thread
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      job.setBlocking(false);
      // ... set other attributes and add tasks ...
      jobs.add(job);
      client.submitJob(job); // non-blocking operation
    }
    // get and process the jobs results
    for (JPPFJob job: jobs) {
      // synchronize on each job's completion: this is a blocking operation
      List<Task<?>> results = job.awaitResults();
      processResults(results); // process the job results
    }
  } catch(Exception e) {
    e.printStackTrace();
  }
}

Fully asynchronous processing

Here, we use a JobListener to retrieve and process the results of the jobs. The only synchronization occurs in the submitting thread, which awaits the global completion of all jobs:

public void asynchronousNonBlockingJobs() {
  try (final JPPFClient client = new JPPFClient()) {
    int nbJobs = 4;
    // synchronization helper that tells us when all jobs have completed
    final CountDownLatch countDown = new CountDownLatch(nbJobs);
    for (int i=0; i<nbJobs; i++) {
      JPPFJob job = new JPPFJob();
      job.setBlocking(false);
      // results will be processed asynchronously within
      // the job listener's jobEnded() notifications
      job.addJobListener(new JobListenerAdapter() {
        @Override public void jobEnded(JobEvent event) {
          List<Task<?>> results = event.getJob().getAllResults();
          processResults(results); // process the job results
          // decrease the jobs count down
          // when the count reaches 0, countDown.await() will exit immediately
          countDown.countDown();
        }
      });
      // ... set other attributes, add tasks, submit the job ...
      client.submitJob(job);
    }
    // wait until all jobs are complete, i.e. until the count down reaches 0
    countDown.await();
  } catch(Exception e) {
    e.printStackTrace();
  }
}
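
One point deserves attention in this pattern: if processResults() throws an exception inside jobEnded(), countDown.countDown() is never reached and countDown.await() may then block forever. The following is a minimal sketch of a more defensive variant, using only the standard Java library. The thread pool is a hypothetical stand-in for the grid, and LatchCallbackDemo, runAll() and the simulated failure are assumptions made for illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchCallbackDemo {
  // Runs nbJobs simulated jobs whose result processing fails for every odd index.
  // Returns true if all completion callbacks were accounted for within the timeout.
  static boolean runAll(int nbJobs, long timeoutMillis) {
    // hypothetical stand-in for the grid: each task acts as a jobEnded() notification
    ExecutorService grid = Executors.newFixedThreadPool(nbJobs);
    CountDownLatch countDown = new CountDownLatch(nbJobs);
    try {
      for (int i = 0; i < nbJobs; i++) {
        final int jobIndex = i;
        grid.execute(() -> {
          try {
            processResults(jobIndex); // may throw
          } catch (RuntimeException e) {
            System.err.println("job " + jobIndex + ": " + e.getMessage());
          } finally {
            // always decrease the count, even when the processing failed
            countDown.countDown();
          }
        });
      }
      // a bounded wait avoids hanging if a notification is never delivered
      return countDown.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      grid.shutdown();
    }
  }

  // simulated result processing that fails for odd job indices
  static void processResults(int jobIndex) {
    if (jobIndex % 2 == 1) throw new IllegalStateException("simulated processing failure");
  }

  public static void main(String[] args) {
    System.out.println("all jobs accounted for: " + runAll(4, 5000L));
  }
}
```

The same try/finally structure can be applied within a JobListener's jobEnded() method, so that a failure while processing one job's results does not prevent the application from detecting that all jobs have ended.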

Job streaming

Job streaming occurs when an application is continuously creating and executing jobs, based on a potentially infinite source of data. The main problem to overcome in this use case arises when jobs are created much faster than they are executed, which can fill the memory until an OutOfMemoryError occurs. A possible solution is to build a job provider with a limiting factor, which determines the maximum number of jobs that can be running at any given time.

Additionally, an Iterator is a Java data structure that fits the streaming pattern particularly well, thus our job provider will implement both the Iterable and Iterator interfaces:

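import java.util.Iterator;
import java.util.List;

import org.jppf.client.JPPFJob;
import org.jppf.client.event.JobEvent;
import org.jppf.client.event.JobListenerAdapter;
import org.jppf.node.protocol.Task;

public class JobProvider extends JobListenerAdapter
   implements Iterable<JPPFJob>, Iterator<JPPFJob> {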
  private int concurrencyLimit;  // limit to the maximum number of concurrent jobs
  private int currentNbJobs = 0; // current count of concurrent jobs

  public JobProvider(int concurrencyLimit) {
    this.concurrencyLimit = concurrencyLimit;
  }

  // implementation of Iterator<JPPFJob>
  @Override public synchronized boolean hasNext() {
    boolean hasMoreJobs = false;
    // ... compute hasMoreJobs, e.g. check if there is any more data to read
    return hasMoreJobs;
  }

  @Override public synchronized JPPFJob next() {
    // wait until the number of running jobs is less than the concurrency limit
    while (currentNbJobs >= concurrencyLimit) {
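      try {
        wait();
      } catch (InterruptedException e) {
        // restore the interrupt status and stop waiting
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for a job to end", e);
      }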
    }
    return buildJob();
  }

  @Override public void remove() {
    throw new UnsupportedOperationException("remove() is not supported");
  }

  private synchronized JPPFJob buildJob() {
    JPPFJob job = new JPPFJob();
    // ... build the tasks by reading data from a file, a database, etc...
    // ... add the tasks to the job ...
    job.setBlocking(false);
    // add a listener to update the concurrent jobs count when the job ends
    job.addJobListener(this);
    // increase the count of concurrently running jobs
    currentNbJobs++;
    return job;
  }

  // implementation of JobListener
  @Override public synchronized void jobEnded(JobEvent event) {
    processResults(event.getJob().getAllResults()); // process the job results
    // decrease the count of concurrently running jobs
    currentNbJobs--;
    // wake up the threads waiting in next()
    notifyAll();
  }

  // implementation of Iterable<JPPFJob>
  @Override public Iterator<JPPFJob> iterator() { return this; }

  private void processResults(List<Task<?>> results) { /* ... */ }
}

Note the use of a JobListener to ensure the current count of jobs is properly updated, so that the provider can create new jobs from its data source. It is also used to process the job results asynchronously.

Now that we have a job provider, we can use it to submit the jobs it creates to a JPPF grid:

public void jobStreaming() {
  try (JPPFClient client = new JPPFClient()) {
    // create the job provider with a limiting concurrency factor
    JobProvider jobProvider = new JobProvider(4);
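    // build and submit the provided jobs until no more are available
    for (JPPFJob job: jobProvider) {
      client.submitJob(job);
    }
    // caution: the last submitted jobs may still be executing at this point,
    // the application should wait for them to end before the client is closed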
  } catch(Exception e) {
    e.printStackTrace();
  }
}
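
The limiting factor can also be expressed with a java.util.concurrent.Semaphore instead of wait() and notifyAll(). The following is a minimal sketch of that alternative, using only the standard Java library: the thread pool is a hypothetical stand-in for the grid, the release of a permit plays the role of the jobEnded() notification, and SemaphoreStreamDemo and stream() are names made up for illustration. It also shows one way to wait for the last jobs to end before releasing resources:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreStreamDemo {
  // Streams nbJobs simulated jobs with at most concurrencyLimit of them in flight.
  // Returns { number of completed jobs, highest number of jobs in flight }.
  static int[] stream(int nbJobs, int concurrencyLimit) {
    ExecutorService grid = Executors.newCachedThreadPool(); // stand-in for the grid
    Semaphore permits = new Semaphore(concurrencyLimit);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger maxInFlight = new AtomicInteger();
    AtomicInteger completed = new AtomicInteger();
    try {
      for (int i = 0; i < nbJobs; i++) {
        // blocks while concurrencyLimit jobs are already running
        permits.acquire();
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        grid.execute(() -> {
          try {
            Thread.sleep(20L); // simulated job execution
            completed.incrementAndGet();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            inFlight.decrementAndGet();
            permits.release(); // plays the role of jobEnded()
          }
        });
      }
      // drain: once all permits are acquired again, no job is still running
      permits.acquire(concurrencyLimit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      grid.shutdown();
    }
    return new int[] { completed.get(), maxInFlight.get() };
  }

  public static void main(String[] args) {
    int[] result = stream(10, 3);
    System.out.println("completed = " + result[0] + ", max in flight = " + result[1]);
  }
}
```

Because a permit is acquired before a job is counted as in flight and released only after it is no longer counted, the number of jobs in flight can never exceed the concurrency limit.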

The AbstractJPPFJobStream helper class

Given the potential complexity of the job streaming pattern, we found it useful to provide a helper class that implements all the wiring and internal state transitions, so that developers can focus solely on the specifics of the jobs they want to submit. The abstract class AbstractJPPFJobStream serves this purpose. It is defined as follows:

public abstract class AbstractJPPFJobStream extends JobListenerAdapter
  implements Iterable<JPPFJob>, Iterator<JPPFJob>, AutoCloseable {

  // Initialize this job provider with a concurrency limit
  public AbstractJPPFJobStream(final int concurrencyLimit)

  // Determine whether there is at least one more job in the stream
  // This method must be overridden in subclasses
  public abstract boolean hasNext()
  // Get the next job in the stream
  public synchronized JPPFJob next() throws NoSuchElementException
  // Create the next job in the stream, along with its tasks
  // This method must be overridden in subclasses and is called from next()
  protected abstract JPPFJob createNextJob()
  // This operation is not supported
  public void remove() throws UnsupportedOperationException

  // Update the state of this job stream and process the results of a job asynchronously
  public void jobEnded(final JobEvent event)
  // Callback invoked from jobEnded() when a job is complete
  // This method must be overridden in subclasses
  protected abstract void processResults(JPPFJob job)

  // implementation of Iterable<JPPFJob>
  public Iterator<JPPFJob> iterator()

  // Close this stream and release the underlying resources it uses
  // This method must be overridden in subclasses
  public abstract void close() throws Exception

  // Determine whether any job is still being executed
  public synchronized boolean hasPendingJob()
  // Get the number of executed jobs
  public synchronized int getJobCount()
  // Get the number of executed tasks
  public synchronized int getTaskCount()
}

This class is designed to be subclassed and, to this effect, we have outlined the four abstract methods that must be overridden in any subclass. We can see how this will simplify the work of any implementation. Let's re-implement the previous example by subclassing AbstractJPPFJobStream:

public class JobProvider extends AbstractJPPFJobStream {
  public JobProvider(int concurrencyLimit) {
    super(concurrencyLimit);
  }

  @Override public synchronized boolean hasNext() {
    boolean hasMoreJobs = false;
    // ... compute hasMoreJobs, e.g. check if there is any more data to read
    return hasMoreJobs;
  }

  @Override protected JPPFJob createNextJob() {
    JPPFJob job = new JPPFJob();
    // ... build the tasks by reading data from a file, a database, etc...
    // ... add the tasks to the job and return it ...
    return job;
  }

  @Override protected void processResults(JPPFJob job) { /* ... */ }

  @Override public void close() throws Exception {
    // close a file, database connection, etc...
  }
}
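
To give an idea of the kind of wiring such a base class can take care of, here is a simplified and purely hypothetical model written with the standard Java library only. It is not the code of AbstractJPPFJobStream: BoundedStream, NumberStream and runDemo() are names made up for this sketch, the "jobs" are plain integers, and a thread pool stands in for the grid. It requires Java 8 or later, where Iterator.remove() has a default implementation:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified model of a job stream base class (not the JPPF code)
abstract class BoundedStream<J> implements Iterable<J>, Iterator<J> {
  private final int concurrencyLimit;
  private int pending = 0;  // jobs handed out by next() that have not ended yet
  private int jobCount = 0; // total number of jobs that have ended

  BoundedStream(int concurrencyLimit) { this.concurrencyLimit = concurrencyLimit; }

  // along with hasNext(), the only parts a subclass has to provide
  protected abstract J createNextJob();
  protected abstract void processResults(J job);

  @Override public synchronized J next() {
    if (!hasNext()) throw new NoSuchElementException();
    // wait until the number of pending jobs is below the concurrency limit
    while (pending >= concurrencyLimit) {
      try {
        wait();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for a free slot", e);
      }
    }
    pending++;
    return createNextJob();
  }

  // invoked by whatever executes the job, once the job is complete
  public void jobEnded(J job) {
    try {
      processResults(job);
    } finally {
      synchronized (this) {
        pending--;
        jobCount++;
        notifyAll(); // wake up the thread waiting in next()
      }
    }
  }

  @Override public Iterator<J> iterator() { return this; }
  public synchronized boolean hasPendingJob() { return pending > 0; }
  public synchronized int getJobCount() { return jobCount; }
}

// Example subclass: streams the integers 0 .. total-1 as "jobs"
class NumberStream extends BoundedStream<Integer> {
  private final int total;
  private int created = 0;

  NumberStream(int concurrencyLimit, int total) {
    super(concurrencyLimit);
    this.total = total;
  }

  @Override public synchronized boolean hasNext() { return created < total; }
  @Override protected Integer createNextJob() { return created++; }
  @Override protected void processResults(Integer job) { /* e.g. store or log the result */ }

  // Drives the stream with a thread pool standing in for the grid
  // and returns the number of jobs that have ended
  static int runDemo(int concurrencyLimit, int total) {
    ExecutorService grid = Executors.newCachedThreadPool();
    NumberStream stream = new NumberStream(concurrencyLimit, total);
    try {
      for (Integer job : stream) grid.execute(() -> stream.jobEnded(job));
      // wait until the last jobs have ended
      while (stream.hasPendingJob()) Thread.sleep(5L);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      grid.shutdown();
    }
    return stream.getJobCount();
  }
}
```

In this model the subclass only describes where the jobs come from and what to do with their results, while the base class owns the counters and the waiting logic. This is the same division of responsibilities as between JobProvider and AbstractJPPFJobStream above.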

Dedicated sample

For a fully working and documented example of the patterns seen in the previous sections, you are invited to explore the dedicated Concurrent Jobs demo.

JPPF Copyright © 2005-2017 JPPF.org