Fork/Join thread pool in the nodes
From JPPF 6.2 Documentation
Main Page > Customizing JPPF > Fork/Join thread pool
By default, JPPF nodes use a “standard” thread pool for executing tasks. This add-on allows the use of a fork/join thread pool instead of the standard one. This enables JPPF tasks to spawn ForkJoinTask instances (or instances of any of its subclasses) locally in the node and have them processed as expected by a ForkJoinPool.
To use this add-on, you will need to deploy the jar file "ThreadManagerForkJoin.jar" to either the JPPF server's or node's classpath. If deployed in the server's classpath, it will be available to all nodes.
The next step is to configure each node for use of the fork/join thread pool. This is achieved by adding the following property to the node's configuration:
jppf.thread.manager.class = org.jppf.server.node.fj.ThreadManagerForkJoin
Here is an example usage, which computes the number of occurrences of each word in a set of documents:
public class WordCountTask extends JPPFTask {
  // a list of documents to process
  private final List<String> documents;

  public WordCountTask(final List<String> documents) {
    this.documents = documents;
  }

  @Override
  public void run() {
    List<Map<String, Integer>> results = new ArrayList<>();
    // compute word counts in each document
    if (ForkJoinTask.inForkJoinPool()) {
      List<ForkJoinTask<Map<String, Integer>>> tasks = new ArrayList<>();
      // fork one new task per document
      for (String doc: documents) tasks.add(new MyForkJoinTask(doc).fork());
      // wait until all forked tasks have completed (i.e. join)
      for (ForkJoinTask<Map<String, Integer>> task: tasks) results.add(task.join());
    } else {
      // if not in a fork/join pool, process the documents sequentially
      for (String doc: documents) results.add(new MyForkJoinTask(doc).compute());
    }
    // merge the results of all documents
    Map<String, Integer> globalResult = new HashMap<>();
    for (Map<String, Integer> map: results) {
      for (Map.Entry<String, Integer> entry: map.entrySet()) {
        Integer n = globalResult.get(entry.getKey());
        if (n == null) globalResult.put(entry.getKey(), entry.getValue());
        else globalResult.put(entry.getKey(), n + entry.getValue());
      }
    }
    // set the merged word counts as this task's result
    this.setResult(globalResult);
  }
}
We can see here that the execution strategy depends on the result of calling ForkJoinTask.inForkJoinPool(): if the current thread is executing within a fork/join pool, then a new task is forked for each document, and thus executed asynchronously. The execution is then synchronized by joining each forked task. Otherwise, the documents are processed sequentially.
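The effect of this check can be demonstrated with the standard java.util.concurrent API alone, independent of JPPF. The class name InPoolCheck below is purely illustrative: the same task reports true when invoked through a ForkJoinPool and false when its compute() method is called directly on the caller's thread, which is exactly the condition the WordCountTask branches on.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// illustrative task: reports whether it actually ran inside a fork/join pool
public class InPoolCheck extends RecursiveTask<Boolean> {
  @Override
  public Boolean compute() {
    // true only when the current thread is a fork/join pool worker thread
    return ForkJoinTask.inForkJoinPool();
  }

  public static void main(String[] args) {
    // invoked through a pool: the task runs on a worker thread
    boolean inPool = new ForkJoinPool().invoke(new InPoolCheck());
    // called directly: the task runs on the caller's (non-worker) thread
    boolean direct = new InPoolCheck().compute();
    System.out.println(inPool + " " + direct); // prints "true false"
  }
}
```

This is why the sequential fallback in WordCountTask is safe: calling compute() directly simply runs the work synchronously on the current thread.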
In this example, our fork/join task is defined as follows:
public class MyForkJoinTask extends RecursiveTask<Map<String, Integer>> {
  // matches spaces and non-word characters, used to split a document into words
  private static final Pattern pattern = Pattern.compile("\\s|\\W");
  private final String document;

  public MyForkJoinTask(final String document) {
    this.document = document;
  }

  @Override
  // return a mapping of each word to its number of occurrences
  public Map<String, Integer> compute() {
    Map<String, Integer> result = new HashMap<>();
    // split the document into individual words
    String[] words = pattern.split(document);
    // count the number of occurrences of each word in the document
    for (String word: words) {
      Integer n = result.get(word);
      result.put(word, (n == null) ? 1 : n + 1);
    }
    return result;
  }
}
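The fork/join/merge pattern from the two classes above can be exercised locally in a plain ForkJoinPool, without a JPPF node. The following self-contained sketch reproduces the same logic; the names WordCountDemo, CountTask and countAll are illustrative and not part of the JPPF API, and the sample documents are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.regex.Pattern;

public class WordCountDemo {
  // per-document word count, equivalent to MyForkJoinTask
  static class CountTask extends RecursiveTask<Map<String, Integer>> {
    private static final Pattern SPLIT = Pattern.compile("\\s|\\W");
    private final String document;
    CountTask(final String document) { this.document = document; }

    @Override
    public Map<String, Integer> compute() {
      Map<String, Integer> result = new HashMap<>();
      for (String word: SPLIT.split(document)) {
        if (word.isEmpty()) continue; // the split may produce empty strings
        result.merge(word, 1, Integer::sum);
      }
      return result;
    }
  }

  // fork one task per document, join them all, then merge the partial counts
  public static Map<String, Integer> countAll(final List<String> documents) {
    return new ForkJoinPool().invoke(new RecursiveTask<Map<String, Integer>>() {
      @Override
      protected Map<String, Integer> compute() {
        List<ForkJoinTask<Map<String, Integer>>> tasks = new ArrayList<>();
        for (String doc: documents) tasks.add(new CountTask(doc).fork());
        Map<String, Integer> global = new HashMap<>();
        for (ForkJoinTask<Map<String, Integer>> task: tasks)
          task.join().forEach((word, n) -> global.merge(word, n, Integer::sum));
        return global;
      }
    });
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = countAll(List.of("the cat", "the dog"));
    System.out.println(counts.get("the")); // prints 2
  }
}
```

Inside a JPPF node configured with the fork/join thread manager, the WordCountTask shown earlier behaves the same way, except that the surrounding pool is provided by the node itself.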
Related sample: the fork/join thread pool add-on of the JPPF distribution provides a more sophisticated example, taking full advantage of the fork/join features in Java 7.