JPPF, java, parallel computing, distributed computing, grid computing, parallel, distributed, cluster, grid, cloud, open source, android, .net
JPPF, java, parallel computing, distributed computing, grid computing, parallel, distributed, cluster, grid, cloud, open source, android, .net
JPPF

The open source
grid computing
solution

 Home   About   Features   Download   Documentation   On Github   Forums 

Fork/Join thread pool in the nodes

From JPPF 6.2 Documentation

Jump to: navigation, search
Main Page > Customizing JPPF > Fork/Join thread pool


By default, JPPF nodes use a “standard” thread pool for executing tasks. This add-on allows the use of a fork/join thread pool instead of the standard one. This enables JPPF tasks to locally (in the node) spawn ForkJoinTask (or any of its subclasses) instances and have them processed as expected for a ForkJoinPool.

To use this add-on, you will need to deploy the jar file "ThreadManagerForkJoin.jar" to either the JPPF server's or node's classpath. If deployed in the server's classpath, it will be available to all nodes.

The next step is to configure each node for use of the fork/join thread pool. This is achieved by adding the following property to the node's configuration:

jppf.thread.manager.class = org.jppf.server.node.fj.ThreadManagerForkJoin

Here is an example usage, which computes the number of occurrences of each word in a set of documents:

public class WordCountTask extends JPPFTask {
  // a list of docuuments to process
  private final List<String> documents;

  public WordCountTask(final List<String> documents) {
    this.documents = documents;
  }

  @Override
  public void run() {
    List<Map<String, Integer>> results = new ArrayList<>();
    // compute word counts in each document
    if (ForkJoinTask.inForkJoinPool()) {
      List<ForkJoinTask<Map<String, Integer>>> tasks = new ArrayList<>();
      // fork one new task per document
      for (String doc: documents) tasks.add(new MyForkJoinTask(doc).fork());
      // wait until all forked tasks have completed (i.e. join)
      for (ForkJoinTask<Map<String, Integer>> task: tasks) results.add(task.join());
    } else {
      // if not in FJ pool, process documents sequentially
      for (String doc: documents) results.add(new MyForkJoinTask(doc).compute());
    }
    // merge the results of all documents
    Map<String, Integer> globalResult = new HashMap<>();
    for (Map<String, Integer> map: results) {
      for (Map.Entry<String, Integer> entry: map.entrySet()) {
        Integer n = globalResult.get(entry.getKey());
        if (n == null) globalResult.put(entry.getKey(), entry.getValue());
        else globalResult.put(entry.getKey(), n + entry.getValue());
      }
    }
    // set the merged word counts as this task's result
    this.setResult(globalResult);
  }
}

We can see here that the execution strategy depends on the result of calling ForkJoinTask.inForkJoinPool(): if we determine that a fork/join pool is available, then a new task is forked for each document, and thus executed asynchronously. The execution is then synchronized by joining each forked task. Otherwise, the documents are processed sequentially.

In this example, our fork/join task is defined as follows:

public class MyForkJoinTask extends RecursiveTask<Map<String, Integer>> {
  // remove spaces and non-word characters
  private static Pattern pattern = Pattern.compile("\\s|\\W");
  private final String document;

  public MyForkJoinTask(final String document) {
    this.document = document;
  }

  @Override
  // return a mapping of each word to its number of occurrences
  public Map<String, Integer> compute() {
    Map<String, Integer> result = new HashMap<>();
    // split the document into individual words
    String[] words = pattern.split(document);
    // count the number of occurrences of each word in the document
    for (String word: words) {
      Integer n = result.get(w);
      result.put(word, (n == null) ? 1 : n+1);
    }
    return result;
  }
}

Related sample: the fork/join thread pool add-on of the JPPF distribution provides a more sophisticated example, taking full advantage of the fork/join features in Java 7.


Main Page > Customizing JPPF > Fork/Join thread pool



JPPF Copyright © 2005-2020 JPPF.org Powered by MediaWiki