
Submitting multiple jobs concurrently

From JPPF 4.2 Documentation




In this section, we present several ways to design an application so that it can execute multiple jobs concurrently, using a single JPPFClient instance. These can be seen as common, reusable patterns covering the most frequent use cases where multiple jobs need to be submitted and processed in parallel.

Base requirement: multiple connections

For a JPPF client to be able to process multiple jobs in parallel, it must hold multiple connections, whether to a single server, to multiple servers, or to any combination of these. The number of connections determines how many jobs can be sent concurrently to a server. If only one connection is available, then only one job at a time will actually be processed by the JPPF grid; other jobs submitted by the same client will remain in the client queue until the first job has completed.

Multiple connections can be obtained either statically from the JPPF client configuration, or dynamically using the connection pool APIs. In the rest of this section, we will simply assume this is done appropriately.
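
As an illustration of the static approach, here is a minimal sketch of a client configuration that opens four connections to a single, manually defined driver. The host, port and pool size values are placeholders to adapt to your own environment:

# JPPF client configuration - example values only
jppf.discovery.enabled = false
jppf.drivers = driver1
driver1.jppf.server.host = localhost
driver1.jppf.server.port = 11111
# number of connections to this driver, i.e. how many jobs can be sent to it concurrently
driver1.jppf.pool.size = 4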

Job submissions from multiple threads

This pattern shows how concurrent jobs can be submitted from multiple threads using the same JPPFClient instance. Here we use blocking jobs: since each job is submitted in its own thread, we can afford to block that thread until the job completes:

public void multipleThreadsBlockingJobs() {
  // a pool of threads that will submit the jobs and execute their results
  ExecutorService executor = Executors.newFixedThreadPool(4);
  try (JPPFClient client = new JPPFClient()) {
    // handles for later retrieval of the job submissions results
    List<Future<List<Task<?>>>> futures = new ArrayList<>();
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      // ... set attributes and add tasks ...
      // submit the job in a separate thread
      futures.add(executor.submit(new JobSubmitter(client, job)));
    }
    for (Future<List<Task<?>>> future: futures) {
      try {
        // wait until each job has completed and retrieve its results
        List<Task<?>> results = future.get();
        // ... process the job results ...
        processResults(results);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
  executor.shutdown();
}

The class JobSubmitter is defined as follows:

public class JobSubmitter implements Callable<List<Task<?>>> {
  private final JPPFClient client;
  private final JPPFJob job;

  public JobSubmitter(JPPFClient client, JPPFJob job) {
    this.client = client;
    this.job = job;
  }

  @Override public List<Task<?>> call() throws Exception {
    // just submit the job
    return client.submitJob(job);
  }
}
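
The examples in this section delegate the handling of execution results to a processResults() method which is not shown, since it is application-specific. A minimal sketch, using only the standard Task accessors getId(), getThrowable() and getResult(), could look like this:

public void processResults(List<Task<?>> results) {
  for (Task<?> task: results) {
    if (task.getThrowable() != null) {
      // the task raised a throwable while executing on a node
      System.err.println("task " + task.getId() + " failed with: " + task.getThrowable());
    } else {
      // the task completed normally; use whatever result it produced
      System.out.println("task " + task.getId() + " result: " + task.getResult());
    }
  }
}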

Multiple non-blocking jobs from a single thread

Here, we take advantage of the asynchronous nature of non-blocking jobs to write a much less cumbersome version of the previous pattern:

public void singleThreadNonBlockingJobs() {
  try (final JPPFClient client = new JPPFClient()) {
    // holds the submitted jobs for later retrieval of their results
    List<JPPFJob> jobs = new ArrayList<>();
    // submit the jobs without blocking the current thread
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      job.setBlocking(false);
      // ... set other attributes and add tasks ...
      jobs.add(job);
      client.submitJob(job); // non-blocking operation
    }
    // get and process the jobs results
    for (JPPFJob job: jobs) {
      // synchronize on each job's completion: this is a blocking operation
      List<Task<?>> results = job.awaitResults();
      processResults(results); // process the job results
    }
  } catch(Exception e) {
    e.printStackTrace();
  }
}

Fully asynchronous processing

Here, we use a JobListener to retrieve and process the results of the jobs. The only synchronization occurs in the enclosing method, which waits for the global completion of all the jobs:

public void asynchronousNonBlockingJobs() {
  try (final JPPFClient client = new JPPFClient()) {
    int nbJobs = 4;
    // synchronization helper that tells us when all jobs have completed
    final CountDownLatch countDown = new CountDownLatch(nbJobs);
    for (int i=0; i<nbJobs; i++) {
      JPPFJob job = new JPPFJob();
      job.setBlocking(false);
      // results will be processed asynchronously within
      // the job listener's jobEnded() notifications
      job.addJobListener(new JobListenerAdapter() {
        @Override public void jobEnded(JobEvent event) {
          List<Task<?>> results = event.getJob().getAllResults();
          processResults(results); // process the job results
          // decrease the jobs count down
          // when the count reaches 0, countDown.await() will exit immediately
          countDown.countDown();
        }
      });
      // ... set other attributes, add tasks, submit the job ...
      client.submitJob(job);
    }
    // wait until all jobs are complete, i.e. until the count down reaches 0
    countDown.await();
  } catch(Exception e) {
    e.printStackTrace();
  }
}

Job streaming

Job streaming occurs when an application continuously creates and executes jobs, based on a potentially infinite source of data. The main problem to overcome in this use case is that jobs may be created much faster than they are executed, eventually filling up the memory until an OutOfMemoryError occurs. A possible solution is to build a job provider with a limiting factor, which determines the maximum number of jobs that can be running at any given time.

Additionally, a Java Iterator fits the streaming pattern particularly well, thus our job provider will implement both the Iterable and Iterator interfaces:

public class JobProvider extends JobListenerAdapter
   implements Iterable<JPPFJob>, Iterator<JPPFJob> {
  private int concurrencyLimit;  // limit to the maximum number of concurrent jobs
  private int currentNbJobs = 0; // current count of concurrent jobs

  public JobProvider(int concurrencyLimit) {
    this.concurrencyLimit = concurrencyLimit;
  }

  // implementation of Iterator<JPPFJob>
  @Override public synchronized boolean hasNext() {
    boolean hasMoreJobs = false;
    // ... compute hasMoreJobs, e.g. check if there is any more data to read
    return hasMoreJobs;
  }

  @Override public synchronized JPPFJob next() {
    // wait until the number of running jobs is less than the concurrency limit
    while (currentNbJobs >= concurrencyLimit) {
      try {
        wait();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    return buildJob();
  }

  @Override public void remove() {
    throw new UnsupportedOperationException("remove() is not supported");
  }

  private synchronized JPPFJob buildJob() {
    JPPFJob job = new JPPFJob();
    // ... build the tasks by reading data from a file, a database, etc...
    // ... add the tasks to the job ...
    job.setBlocking(false);
    // add a listener to update the concurrent jobs count when the job ends
    job.addJobListener(this);
    // increase the count of concurrently running jobs
    currentNbJobs++;
    return job;
  }

  // implementation of JobListener
  @Override public synchronized void jobEnded(JobEvent event) {
    processResults(event.getJob().getAllResults()); // process the job results
    // decrease the count of concurrently running jobs
    currentNbJobs--;
    // wake up the threads waiting in next()
    notifyAll();
  }

  // implementation of Iterable<JPPFJob>
  @Override public Iterator<JPPFJob> iterator() { return this; }

  private void processResults(List<Task<?>> results) { /* ... process the job results ... */ }
}

Note the use of a JobListener to ensure the current count of jobs is properly updated, so that the provider can create new jobs from its data source. It is also used to process the job results asynchronously.

Now that we have a job provider, we can use it to submit the jobs it creates to a JPPF grid:

public void jobStreaming() {
  try (JPPFClient client = new JPPFClient()) {
    // create the job provider with a limiting concurrency factor
    JobProvider jobProvider = new JobProvider(4);
    // build and submit the provided jobs until no more are available
    for (JPPFJob job: jobProvider) {
      client.submitJob(job);
    }
  } catch(Exception e) {
    e.printStackTrace();
  }
}
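
Note that when the for loop exits, the jobs submitted last may still be executing, while the try-with-resources block closes the client immediately afterwards. One way to handle this, shown here as a hypothetical addition that is not part of the JPPF sample, is to give JobProvider a method that waits until the currentNbJobs counter drops back to zero, relying on the notifyAll() already performed in jobEnded():

  // hypothetical helper: block until all submitted jobs have ended
  public synchronized void awaitAllJobsComplete() throws InterruptedException {
    while (currentNbJobs > 0) wait();
  }

In jobStreaming(), this method would be called right after the for loop, before the client is closed.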

Dedicated sample

For a fully working and documented example of the patterns seen in the previous sections, you are invited to explore the dedicated Concurrent Jobs demo.

