/*
 * JPPF.
 * Copyright (C) 2005-2018 JPPF Team.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jppf.example.loadbalancer.client;

import java.util.List;

import org.jppf.client.*;
import org.jppf.example.loadbalancer.common.MyCustomPolicy;
import org.jppf.node.policy.*;
import org.jppf.node.protocol.*;

/**
 * This is a fully commented job runner for the Custom Load Balancer sample.
 * @author Laurent Cohen
 */
public class CustomLoadBalancerRunner {
  /**
   * The JPPF client, handles all communications with the server.
   * It is recommended to only use one JPPF client per JVM, so it
   * should generally be created and used as a singleton.
   */
  private static JPPFClient jppfClient = null;
   * Value representing one kilobyte.
  private static int KB = 1024;
   * Value representing one megabyte.
  private static int MB = 1024 * KB;

  /**
   * The entry point for this application runner to be run from a Java command line.
   * @param args by default, we do not use the command line arguments, however nothing prevents us from using them if need be.
   */
  public static void main(final String... args) {
    try {
      // create the JPPFClient. This constructor call causes JPPF to read the configuration file
      // and connect with one or multiple JPPF drivers.
      jppfClient = new JPPFClient();

      // create a runner instance.
      final CustomLoadBalancerRunner runner = new CustomLoadBalancerRunner();

      // Create a "heavy" job
      // The is the maximum memory footprint of a task in the job
      final int taskFootprint = 20 * MB;
      // We want 40 MB available for each heavy task running concurrently.
      // This is not easily doable with a standard execution policy, so we create a custom one.
      // We use double the task footprint because it will take approximately twice the memory footprint
      // when each task is serialized or deserialized in the node (serialized data + the object itself).
      final ExecutionPolicy heavyPolicy = new MyCustomPolicy("" + (2 * taskFootprint));
      // Tasks in the job will have 20 MB data size, will last at most 5 seconds,
      // and the maximum execution time for a set of tasks will be no more than 60 seconds.
      // With 4 tasks and the node's heap set to 64 MB, the load-balancer will be forced to split the job in 2 at least.
      final JPPFJob heavyJob = runner.createJob("Heavy Job", 4, taskFootprint, 5L * 1000L, 60L * 1000L, heavyPolicy);
      // This job has long-running tasks, so we can submit it and create the second job while it is executing.

      // Create a "light" job
      // We want at least 2 light tasks executing concurrently in a node, to mitigate the network overhead.
      final ExecutionPolicy lightPolicy = new AtLeast("jppf.processing.threads", 2);
      // Tasks in the job will have 10 KB data size, will last at most 80 milliseconds,
      // and the maximum execution time for a set of tasks will be no more than 3 seconds.
      // Here the allowed time will be the limiting factor for the number of tasks that can be sent to a node,
      // so the if the node has 4 threads, the job should be split in 2: one set of 150 tasks, then one set of 50 (approximately).
      // (total time = 200 * 80 / 4)
      final JPPFJob lightJob = runner.createJob("Light Job", 200, 10 * KB, 80L, 3L * 1000L, lightPolicy);
      // Submit the light job.

      // now we obtain and process the results - this will cause a lot of logging to the console!
    } catch (final Exception e) {
    } finally {
      if (jppfClient != null) jppfClient.close();

  /**
   * Create a JPPF job that can be submitted for execution.
   * @param jobName the name (or id) assigned to the job.
   * @param nbTasks the number of tasks to add to the job.
   * @param size the data size of each task, in bytes.
   * @param duration the duration of each task, in milliseconds.
   * @param allowedTime the maximum execution time for a set of tasks on a node.
   * @param policy execution policy assigned to the job.
   * @return an instance of the {@link JPPFJob} class.
   * @throws Exception if an error occurs while creating the job or adding tasks.
   */
  public JPPFJob createJob(final String jobName, final int nbTasks, final int size, final long duration, final long allowedTime, final ExecutionPolicy policy) throws Exception {
    // Create a JPPF job.
    final JPPFJob job = new JPPFJob();

    // Give this job a readable unique id that we can use to monitor and manage it.

    // Specify the job metadata.
    final JobMetadata metadata = job.getMetadata();
    metadata.setParameter("task.memory", "" + size);
    metadata.setParameter("task.time", "" + duration);
    metadata.setParameter("allowed.time", "" + allowedTime);
    metadata.setParameter("id", jobName);

    // Add the tasks to the job.
    for (int i = 1; i <= nbTasks; i++) {
      // create a task with the specified data size and duration
      final Task<?> task = new CustomLoadBalancerTask(size, duration);
      // task id contains the job name for easier identification
      task.setId(jobName + " - task " + i);

    // Assign an execution policy to the job.

    // Set the job in non-blocking (asynchronous) mode.

    return job;

  /**
   * Collect and process the execution results of a job.
   * @param job the JPPF job to process.
   * @throws Exception if an error occurs while processing the results.
   */
  public void processJobResults(final JPPFJob job) throws Exception {
    // Print a banner to visually separate the results for each job.
    System.out.println("\n********** Results for job : " + job.getName() + " **********\n");

    // We are now ready to get the results of the job execution.
    // We use JPPFJob.awaitResults() for this. This method returns immediately with the
    // results if the job has completed, otherwise it waits until the job execution is complete.
    final List<Task<?>> results = job.awaitResults();

    // Process the results
    for (Task<?> task : results) {
      // If the task execution resulted in an exception, display the stack trace.
      if (task.getThrowable() != null) {
        // process the exception here ...
        final StringWriter sw = new StringWriter();
        task.getThrowable().printStackTrace(new PrintWriter(sw));
        System.out.println("Exception occurred while executing task " + task.getId() + ":");
      } else {
        // Display the task result.
        System.out.println("Result for task " + task.getId() + " : " + task.getResult());