
Job Service Level Agreement

From JPPF 4.2 Documentation


A job service level agreement (SLA) defines the terms and conditions under which a job will be processed. A job carries two distinct SLAs: one defines a contract between the job and the JPPF server, the other defines a separate contract between the job and the JPPF client.


Server and client SLAs have common attributes, which specify:

  • the characteristics of the nodes it can run on (server side), or of the channels it can be sent through (client side): the job execution policy
  • the time at which a job is scheduled to start
  • an expiration date for the job


The attributes specific to the server side SLA are:

  • the priority of a job
  • whether it is submitted in suspended state
  • the maximum number of nodes it can run on
  • whether the job is a standard or broadcast job
  • whether the server should immediately cancel the job, if the client that submitted it is disconnected


The attributes specific to the client side SLA are:

  • the maximum number of channels it can be sent through


A job SLA is represented by the interface JobSLA for the server side SLA, and by the interface JobClientSLA for the client side SLA. It can be accessed from a job using the related getters and setters:

public class JPPFJob implements Serializable, JPPFDistributedJob {
  // The job's server-side SLA
  public JobSLA getSLA()
  public void setSLA(final JobSLA jobSLA)

  // The job's client-side SLA
  public JobClientSLA getClientSLA()
  public void setClientSLA(final JobClientSLA jobClientSLA)
}

Example usage:

JPPFJob myJob = new JPPFJob();
myJob.getClientSLA().setMaxChannels(2);
myJob.getSLA().setPriority(1000);

Also note that both interfaces extend the common interface JobCommonSLA. We will go into the details of these interfaces in the following sections.

Attributes common to server and client side SLAs

As seen previously, the common attributes for server and client side SLAs are defined by the JobCommonSLA interface:

public interface JobCommonSLA extends Serializable {
  // The execution policy
  ExecutionPolicy getExecutionPolicy();
  void setExecutionPolicy(ExecutionPolicy executionPolicy);

  // The job start schedule
  JPPFSchedule getJobSchedule();
  void setJobSchedule(JPPFSchedule jobSchedule);

  // The job expiration schedule
  JPPFSchedule getJobExpirationSchedule();
  void setJobExpirationSchedule(JPPFSchedule jobExpirationSchedule);
}

Execution policy

An execution policy is an object that determines whether a particular set of JPPF tasks can be executed on a JPPF node (for the server-side SLA) or if it can be sent via a communication channel (for the client-side). It does so by applying the set of rules (or tests) it is made of, against a set of properties associated with the node or channel.

For a fully detailed description of how to create and use execution policies, please read the Execution policies section of this development guide.

Example usage:

// define a non-trivial server-side execution policy:
// execute on nodes that have at least 2 threads and whose IPv4 address
// is in the 192.168.1.nnn subnet
ExecutionPolicy serverPolicy = new AtLeast("processing.threads", 2).and(
  new Contains("ipv4.addresses", true, "192.168.1."));
// define a client-side execution policy:
// submit to the client local executor or to drivers whose IPv4 address
// is in the 192.168.1.nnn subnet
ExecutionPolicy clientPolicy = new Equal("jppf.channel.local", true).or(
  new Contains("ipv4.addresses", true, "192.168.1."));
JPPFJob job = new JPPFJob();
// set the server-side policy
job.getSLA().setExecutionPolicy(serverPolicy);
// set the client-side policy
job.getClientSLA().setExecutionPolicy(clientPolicy);
// print an XML representation of the server-side policy
System.out.println("server policy is:\n" + job.getSLA().getExecutionPolicy());
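To make the idea of rules applied against a set of properties more concrete, here is a minimal sketch that models a policy as a plain predicate over a property map. It uses only the JDK: PolicySketch, atLeast() and contains() are hypothetical stand-ins and not the JPPF classes, the property value format is made up, and reading the boolean argument of Contains as an "ignore case" flag is an assumption.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for an execution policy: a predicate over node or channel properties.
class PolicySketch {
  // true if the numeric property 'key' is present and its value is >= min
  static Predicate<Map<String, String>> atLeast(String key, double min) {
    return props -> {
      String value = props.get(key);
      if (value == null) return false;
      try {
        return Double.parseDouble(value) >= min;
      } catch (NumberFormatException e) {
        return false; // a non-numeric value never satisfies the rule
      }
    };
  }

  // true if the property 'key' is present and its value contains 'fragment'
  // (the ignoreCase parameter is an assumption about the rule's semantics)
  static Predicate<Map<String, String>> contains(String key, boolean ignoreCase, String fragment) {
    return props -> {
      String value = props.get(key);
      if (value == null) return false;
      return ignoreCase
        ? value.toLowerCase().contains(fragment.toLowerCase())
        : value.contains(fragment);
    };
  }

  public static void main(String[] args) {
    // same shape as the server-side policy above: two rules combined with "and"
    Predicate<Map<String, String>> serverPolicy =
      atLeast("processing.threads", 2).and(contains("ipv4.addresses", true, "192.168.1."));
    // made-up node properties, for illustration only
    Map<String, String> node = Map.of("processing.threads", "4", "ipv4.addresses", "192.168.1.12");
    System.out.println("node accepted: " + serverPolicy.test(node));
  }
}
```

The point of the sketch is only that a policy is a tree of small tests combined with logical operators, and that the same tree can be evaluated against any node or channel.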

Job start and expiration scheduling

It is possible to schedule a job for a later start, and also to set a job for expiration at a specified date/time. The job SLA allows this by providing the following methods:

// job start schedule
public JPPFSchedule getJobSchedule()
public void setJobSchedule(JPPFSchedule schedule)

// job expiration schedule
public JPPFSchedule getJobExpirationSchedule()
public void setJobExpirationSchedule(JPPFSchedule schedule)

As we can see, this is all about getting and setting an instance of JPPFSchedule. A schedule is normally defined through one of its constructors:

As a fixed length of time

public JPPFSchedule(long duration)

The semantics is that the job will start duration milliseconds after the job is received by the server. Here is an example:

JPPFJob myJob = new JPPFJob();
// set the job to start 5 seconds after being received
JPPFSchedule mySchedule = new JPPFSchedule(5000L);
myJob.getSLA().setJobSchedule(mySchedule);

As a specific date/time

public JPPFSchedule(String date, String dateFormat)

Here the date format is specified as a pattern for a SimpleDateFormat instance.

Here is an example use of this constructor:

JPPFJob myJob = new JPPFJob();
String dateFormat = "MM/dd/yyyy hh:mm a z";
// set the job to expire on September 30, 2010 at 12:08 PM in the CEST time zone
JPPFSchedule schedule = new JPPFSchedule("09/30/2010 12:08 PM CEST", dateFormat);
myJob.getSLA().setJobExpirationSchedule(schedule);
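Since the date is given as a string plus a SimpleDateFormat pattern, it can be useful to check that the two agree before building the schedule. The following is a small sketch using only the JDK. ScheduleDateSketch and its parse() helper are hypothetical names, and the use of Locale.US and of the GMT zone in the sample string are assumptions made for the sake of a predictable example.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Hypothetical helper: checks that a date string matches a SimpleDateFormat pattern.
class ScheduleDateSketch {
  // parse 'date' with 'pattern', or fail with an unchecked exception
  static Date parse(String date, String pattern) {
    // Locale.US so that "AM"/"PM" markers are recognized regardless of the default locale
    SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.US);
    try {
      return format.parse(date);
    } catch (ParseException e) {
      throw new IllegalArgumentException("'" + date + "' does not match '" + pattern + "'", e);
    }
  }

  public static void main(String[] args) {
    String dateFormat = "MM/dd/yyyy hh:mm a z";
    Date expiration = parse("09/30/2010 12:08 PM GMT", dateFormat);
    System.out.println("expiration in epoch millis: " + expiration.getTime());
  }
}
```

A string that fails this check would presumably not yield a usable schedule either, so validating it on the client side gives an earlier and clearer error.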

Server side SLA attributes

A server-side SLA is described by the JobSLA interface, defined as:

public interface JobSLA extends JobCommonSLA {
  // Job priority
  int getPriority();
  void setPriority(int priority);

  // Maximum number of nodes the job can run on
  int getMaxNodes();
  void setMaxNodes(int maxNodes);

  // Whether the job is initially suspended
  boolean isSuspended();
  void setSuspended(boolean suspended);

  // Whether the job is a broadcast job
  boolean isBroadcastJob();
  void setBroadcastJob(boolean broadcastJob);

  // Determine whether the job should be canceled by the server
  // if the client gets disconnected
  boolean isCancelUponClientDisconnect();
  void setCancelUponClientDisconnect(boolean cancelUponClientDisconnect);

  // expiration schedule for any subset of the job dispatched to a node
  JPPFSchedule getDispatchExpirationSchedule();
  void setDispatchExpirationSchedule(JPPFSchedule schedule);

  // number of times a dispatched task can expire before it is finally cancelled
  int getMaxDispatchExpirations();
  void setMaxDispatchExpirations(int max);

  // class path associated with the job
  ClassPath getClassPath();
  void setClassPath(ClassPath classpath);
}

Job priority

The priority of a job determines the order in which the job will be executed by the server. It can be any integer value, such that if jobA.getPriority() > jobB.getPriority() then jobA will be executed before jobB. There are situations where both jobs may be executed at the same time, for instance if there remain any available nodes for jobB after jobA has been dispatched. Two jobs with the same priority will have an equal share (as much as is possible) of the available grid nodes.

The priority attribute is also manageable, which means that it can be dynamically updated, while the job is still executing, using the JPPF administration console or the related management APIs. The default priority is zero.

Example usage:

// create a job with a non-default priority
JPPFJob job1 = new JPPFJob();
job1.getSLA().setPriority(10);
// create a second job with a slightly higher priority
JPPFJob job2 = new JPPFJob();
job2.getSLA().setPriority(job1.getSLA().getPriority() + 1);
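The ordering rule can be illustrated with a plain priority queue. This is only a sketch of the semantics described above, not the server's actual queue implementation: PriorityOrderSketch and QueuedJob are hypothetical names, and the sketch ignores the fact that jobs with different priorities may run concurrently when nodes remain available.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustration of "higher priority is executed first", using JDK classes only.
class PriorityOrderSketch {
  // hypothetical stand-in for a queued job: a name and its SLA priority
  static final class QueuedJob {
    final String name;
    final int priority;

    QueuedJob(String name, int priority) {
      this.name = name;
      this.priority = priority;
    }
  }

  // return the job names in the order they would be taken from the queue
  static List<String> dispatchOrder(List<QueuedJob> jobs) {
    // highest priority first
    PriorityQueue<QueuedJob> queue = new PriorityQueue<>(
      Comparator.comparingInt((QueuedJob job) -> job.priority).reversed());
    queue.addAll(jobs);
    List<String> order = new ArrayList<>();
    while (!queue.isEmpty()) order.add(queue.poll().name);
    return order;
  }

  public static void main(String[] args) {
    List<QueuedJob> jobs = new ArrayList<>();
    jobs.add(new QueuedJob("job1", 10));
    jobs.add(new QueuedJob("job2", 11));
    jobs.add(new QueuedJob("job3", 0)); // default priority
    System.out.println(dispatchOrder(jobs));
  }
}
```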

Maximum number of nodes

The maximum number of nodes attribute determines how many grid nodes a job can run on at any given time. This is an upper bound: it does not guarantee that this number of nodes will always be used, only that no more than this number of nodes will be assigned to the job. This attribute is also non-distinctive, in that it does not specify which nodes the job will run on. The default value of this attribute is Integer.MAX_VALUE, i.e. 2^31-1.

The resulting assignment of nodes to the job is influenced by other attributes, especially the job priority and the execution policy, if any.

The maximum number of nodes is a manageable attribute, which means it can be dynamically updated, while the job is still executing, using the JPPF administration console or the related management APIs.

Example usage:

JPPFJob job = new JPPFJob();
// this job will execute on a maximum of 10 nodes
job.getSLA().setMaxNodes(10);

Initial suspended state

A job can be initially suspended. In this case, it will remain in the server's queue until it is explicitly resumed or canceled, or until it expires (if a timeout was set), whichever happens first. A job can be resumed and suspended again any number of times via the JPPF administration console or the related management APIs.

Example usage:

JPPFJob job = new JPPFJob();
// this job will be submitted to the server and will remain suspended until
// it is resumed or cancelled via the admin console or management APIs
job.getSLA().setSuspended(true);

Broadcast jobs

A broadcast job is a specific type of job, for which each task will be executed on all the nodes currently present in the grid. This opens new possibilities for grid applications, such as performing maintenance operations on the nodes or drastically reducing the size of a job that performs identical tasks on each node.

With regard to the job SLA, a job is set in broadcast mode via a boolean indicator, for which the interface JobSLA provides the following accessors:

public boolean isBroadcastJob()
public void setBroadcastJob(boolean broadcastJob)

To set a job in broadcast mode:

JPPFJob myJob = new JPPFJob();
myJob.getSLA().setBroadcastJob(true);

With respect to the dynamic aspect of a JPPF grid, the following behavior is enforced:

  • a broadcast job is executed on all the nodes connected to the driver, at the time the job is received by the JPPF driver. This includes nodes that are executing another job at that time
  • if a node dies or disconnects while the job is executing on it, the job is canceled for this node
  • if a new node connects while the job is executing, the broadcast job will not execute on it
  • a broadcast job does not return any results, i.e. it returns the tasks in the same state as they were submitted

Additionally, if local execution of jobs is enabled for the JPPF client, a broadcast job will not be executed locally. In other words, a broadcast job is only executed on remote nodes.
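The rules about nodes joining and leaving amount to working on a snapshot of the connected nodes. The sketch below expresses this with plain sets. It is an illustration of the rules listed above and not of the driver's internals, and BroadcastSketch, targets() and stillRunning() are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Illustration of the broadcast rules with node identifiers held in sets.
class BroadcastSketch {
  // the job targets a snapshot of the nodes connected when the driver receives it
  static Set<String> targets(Set<String> connectedAtReception) {
    return new LinkedHashSet<>(connectedAtReception);
  }

  // nodes on which the job is still executing, given the nodes connected now:
  // nodes that left are dropped, nodes that joined later are never added
  static Set<String> stillRunning(Set<String> targets, Set<String> connectedNow) {
    Set<String> result = new LinkedHashSet<>(targets);
    result.retainAll(connectedNow);
    return result;
  }

  public static void main(String[] args) {
    Set<String> targets = targets(new LinkedHashSet<>(Arrays.asList("node1", "node2")));
    // node1 disconnects and node3 connects while the job is executing
    Set<String> connectedNow = new LinkedHashSet<>(Arrays.asList("node2", "node3"));
    System.out.println(stillRunning(targets, connectedNow));
  }
}
```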

Canceling a job upon client disconnection

By default, if the JPPF client is disconnected from the server while a job is executing, the server will automatically attempt to cancel the job's execution on all nodes it was dispatched to, and remove the job from the server queue. You may disable this behavior on a per-job basis, for example if you want to let the job execute until completion but do not need the execution results.

This property is set once for each job, and cannot be changed once the job has been submitted to the server, i.e. it is not dynamically manageable.

Example usage:

JPPFJob myJob = new JPPFJob();
// let the job run to completion even if the client gets disconnected
myJob.getSLA().setCancelUponClientDisconnect(false);

Expiration of job dispatches

Definition: a job dispatch is the whole or part of a job that is dispatched by the server to a node.

The server-side job SLA enables specifying whether a job dispatch will expire, along with the behavior upon expiration. This is done with a combination of two attributes: a dispatch expiration schedule, which specifies when the dispatch will expire, and a maximum number of expirations after which the tasks in the dispatch will be cancelled instead of resubmitted. By default, a job dispatch does not expire and the maximum number of expirations is zero, so if an expiration schedule is set, tasks are cancelled upon the first expiration.

One possible use for this mechanism is to prevent resource-intensive tasks from bogging down slow nodes, without having to cancel the whole job or set timeouts on individual tasks.

Example usage:

JPPFJob job = new JPPFJob();
// job dispatches will expire if they execute for more than 5 seconds
job.getSLA().setDispatchExpirationSchedule(new JPPFSchedule(5000L));
// dispatched tasks will be resubmitted at most 2 times before they are cancelled
job.getSLA().setMaxDispatchExpirations(2);
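The interplay between the two attributes can be summarized as a single decision taken each time a dispatch expires. The sketch below encodes one reading of the description above: with a maximum of N expirations, the first N expirations lead to a resubmission and the next one to a cancellation. DispatchExpirationSketch, Outcome and onExpiration() are hypothetical names, not part of the JPPF API.

```java
// Illustration of the resubmit-or-cancel decision taken when a job dispatch expires.
class DispatchExpirationSketch {
  enum Outcome { RESUBMIT, CANCEL }

  // expirationCount is the number of expirations so far, including the current one
  // (1 for the first expiration of the dispatched tasks)
  static Outcome onExpiration(int expirationCount, int maxDispatchExpirations) {
    return expirationCount <= maxDispatchExpirations ? Outcome.RESUBMIT : Outcome.CANCEL;
  }

  public static void main(String[] args) {
    int maxDispatchExpirations = 2; // same value as in the example above
    for (int count = 1; count <= 3; count++) {
      System.out.println("expiration #" + count + ": " + onExpiration(count, maxDispatchExpirations));
    }
  }
}
```

With the default maximum of zero, the same function yields a cancellation on the very first expiration, which matches the default behavior stated above.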

Setting a class path onto the job

The classpath attribute of the job SLA allows sending library files along with the job and its tasks. Out of the box, this attribute is only used by offline nodes, to work around the fact that offline nodes do not have remote class loading capabilities. The class path attribute, by default empty but not null, is accessed with the following methods:

public interface JobSLA extends JobCommonSLA {
  // get / set the class path associated with the job
  ClassPath getClassPath();
  void setClassPath(ClassPath classpath);
}

We can see that a class path is represented by the ClassPath interface, defined as follows:

public interface ClassPath extends Serializable, Iterable<ClassPathElement> {
  // add an element to this classpath
  ClassPath add(ClassPathElement element);
  ClassPath add(String name, Location<?> location);
  ClassPath add(String name, Location<?> localLocation, Location<?> remoteLocation);

  // remove an element from this classpath
  ClassPath remove(ClassPathElement element);
  ClassPath remove(String name);

  // get an element with the specified name
  ClassPathElement element(String name);

  // get all the elements in this classpath
  Collection<ClassPathElement> allElements();

  // empty this classpath (remove all elements)
  ClassPath clear();

  // is this classpath empty?
  boolean isEmpty();

  // should the node force a reset of the class loader before executing the tasks?
  boolean isForceClassLoaderReset();
  void setForceClassLoaderReset(boolean forceReset);
}

Note that one of the add(...) methods uses a ClassPathElement as parameter, while the others use a name with one or two Location objects (see the Location API section). These methods are equivalent. For the last two, JPPF will internally create instances of a default implementation of ClassPathElement (class ClassPathElementImpl). It is preferable to use these last two methods and avoid creating ClassPathElement instances explicitly, as this makes the code less cumbersome and independent of any specific implementation.

Also note that ClassPath implements Iterable<ClassPathElement>, so that it can be used in for loops:

for (ClassPathElement elt: myJob.getSLA().getClassPath()) ...;

The ClassPathElement interface is defined as follows:

public interface ClassPathElement extends Serializable {
  // get the name of this classpath element
  String getName();
 
  // get the local (to the client) location of this element
  Location<?> getLocalLocation();
 
  // get the remote (local to the node) location of this element, if any
  Location<?> getRemoteLocation();
 
  // perform a validation of this classpath element
  boolean validate();
}

JPPF provides a default implementation ClassPathElementImpl which does not perform any validation, that is, its validate() method always returns true.

Finally, here is an example of how this can all be put together:

JPPFJob myJob = new JPPFJob();
ClassPath classpath = myJob.getSLA().getClassPath();
// wrap a jar file into a FileLocation object
Location jarLocation = new FileLocation("libs/MyLib.jar");
// copy the jar file into memory
Location location = jarLocation.copyTo(new MemoryLocation(jarLocation.size()));
// or another way to do this:
location = new MemoryLocation(jarLocation.toByteArray());
// add it as classpath element
classpath.add("myLib", location);
// the following is functionally equivalent:
classpath.add(new ClassPathElementImpl("myLib", location));
// tell the node to reset the tasks classloader with this new class path
classpath.setForceClassLoaderReset(true);

Maximum number of task resubmits

As we have seen in the "resubmitting a task" section, tasks have the ability to schedule themselves for resubmission by the server. The job server-side SLA allows you to set the maximum number of times this can occur, with the following accessors:

public interface JobSLA extends JobCommonSLA {
  // get the maximum number of times a task can resubmit itself
  // via AbstractTask.setResubmit(boolean)
  int getMaxTaskResubmits();

  // set the maximum number of times a task can resubmit itself
  void setMaxTaskResubmits(int maxResubmits);
 
  // Determine whether the max resubmits limit for tasks is also applied
  // when tasks are resubmitted due to a node error
  boolean isApplyMaxResubmitsUponNodeError();

  // Specify whether the max resubmits limit for tasks should also be applied
  // when tasks are resubmitted due to a node error
  void setApplyMaxResubmitsUponNodeError(boolean applyMaxResubmitsUponNodeError);
}

The default value for the maxTaskResubmits attribute is 1, which means that by default a task can resubmit itself at most once. Additionally, this attribute can be overridden by setting the maxResubmits attribute of individual tasks.

The applyMaxResubmitsUponNodeError flag is set to false by default. This means that, when the tasks are resubmitted due to a node connection error, the resubmit does not count toward the limit. To change this behavior, setApplyMaxResubmitsUponNodeError(true) must be called explicitly.

Example usage:

public class MyTask extends AbstractTask<String> {
  @Override public void run() {
    // unconditional resubmit could lead to an infinite loop
    setResubmit(true);
    // the result will only be kept after the max number of resubmits is reached
    setResult("success");
  }
}

JPPFJob job = new JPPFJob();
job.add(new MyTask());
// tasks can be resubmitted 4 times, meaning they can execute up to 5 times total
job.getSLA().setMaxTaskResubmits(4);
// resubmits due to node errors are also counted
job.getSLA().setApplyMaxResubmitsUponNodeError(true);
// ... submit the job and get the results ...
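The relation between the resubmit limit and the total number of executions can be checked with a small simulation. This is a sketch of the counting rule only. ResubmitSketch and executions() are hypothetical names, and the predicate stands for the task's decision to call setResubmit(true) at the end of a given run.

```java
import java.util.function.IntPredicate;

// Simulation of how many times a task executes for a given resubmit limit.
class ResubmitSketch {
  // wantsResubmit receives the run number (1 for the first run) and tells
  // whether the task asks to be resubmitted at the end of that run
  static int executions(int maxTaskResubmits, IntPredicate wantsResubmit) {
    int runs = 0;
    int resubmits = 0;
    while (true) {
      runs++;
      boolean requested = wantsResubmit.test(runs);
      // stop when the task is satisfied or when the limit is reached
      if (!requested || resubmits >= maxTaskResubmits) return runs;
      resubmits++;
    }
  }

  public static void main(String[] args) {
    // a task that always asks for a resubmit, like MyTask above, with a limit of 4
    System.out.println("runs with unconditional resubmit: " + executions(4, run -> true));
    // a task that only asks for a resubmit after its first two runs
    System.out.println("runs with conditional resubmit: " + executions(4, run -> run < 3));
  }
}
```

The unconditional case shows why the limit matters: without it, a task that always requests a resubmit would never complete.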

Disabling remote class loading during job execution

Jobs can specify whether remote class loader lookups are enabled during their execution in a remote node. When remote class loading is disabled, lookups are only performed in the local classpath of each class loader in the class loader hierarchy, and no remote resource requests are sent to the server or client. This is done with the following accessors:

public interface JobSLA extends JobCommonSLA {
  // Determine whether remote class loading is enabled for the job. Default to true
  boolean isRemoteClassLoadingEnabled();

  // Specify whether remote class loading is enabled for the job
  void setRemoteClassLoadingEnabled(boolean enabled);
}

Note 1: when remote class loading is disabled, the classes that the JPPF node normally loads from the server cannot be loaded remotely either. It is thus required to have these classes in the node's local classpath, which is usually done by adding the "jppf-server.jar" and "jppf-common.jar" files to the node's classpath.

Note 2: if a class is not found while remote class loading is disabled, it will remain not found, even if the next job specifies that remote class loading is enabled. This is due to the fact that the JPPF class loaders maintain a cache of classes not found to avoid unnecessary remote lookups. To avoid this behavior, the task class loader should be reset before the next job is executed.
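The effect described in Note 2 can be reproduced with a very small model of a cache of classes not found. This is a simplified sketch and not the JPPF class loader: NotFoundCacheSketch, find() and reset() are hypothetical names, the lookup in the local classpath is left out, and the predicate stands for a remote class loading request.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Simplified model of a class loader that remembers which classes were not found.
class NotFoundCacheSketch {
  private final Set<String> notFound = new HashSet<>();

  // remoteLookup stands for a remote class loading request, only used when enabled
  boolean find(String className, boolean remoteEnabled, Predicate<String> remoteLookup) {
    // a cached miss short-circuits everything, including the remote lookup
    if (notFound.contains(className)) return false;
    boolean found = remoteEnabled && remoteLookup.test(className);
    if (!found) notFound.add(className);
    return found;
  }

  // models a reset of the task class loader: the cache starts empty again
  void reset() {
    notFound.clear();
  }

  public static void main(String[] args) {
    NotFoundCacheSketch loader = new NotFoundCacheSketch();
    Predicate<String> remote = name -> true; // the class is available remotely
    System.out.println("job 1, remote disabled: " + loader.find("com.example.MyClass", false, remote));
    System.out.println("job 2, remote enabled: " + loader.find("com.example.MyClass", true, remote));
    loader.reset();
    System.out.println("job 3, after reset: " + loader.find("com.example.MyClass", true, remote));
  }
}
```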

Client side SLA attributes

A client-side SLA is described by the interface JobClientSLA, defined as:

public interface JobClientSLA extends JobCommonSLA {
  // The maximum number of channels the job can be sent through,
  // including the local executor if any is configured
  int getMaxChannels();
  void setMaxChannels(int maxChannels);
}

Note: since JPPF clients do not have a management interface, none of the client-side SLA attributes are manageable.

Maximum number of execution channels

The maximum number of channels attribute determines how many server connections a job can be sent through, at any given time. This is an upper bound limit, and does not guarantee that this number of channels will always be used. This attribute is also non-specific, since it does not specify which channels will be used.

Using more than one channel for a job enables faster I/O between the client and the server, since the job can be split in multiple chunks and sent to the server via multiple channels in parallel.

Note 1: when the JPPF client is configured with a single server connection, this attribute has no effect.

Note 2: when local execution is enabled in the JPPF client, the local executor counts as one (additional) channel.

Note 3: the resulting assignment of channels to the job is influenced by other attributes, especially the execution policy.

Example usage:

JPPFJob job = new JPPFJob();
// use 2 channels to send the job and receive the results
job.getClientSLA().setMaxChannels(2);
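
To illustrate how a job can be split into chunks sent over several channels, here is a sketch that distributes tasks round-robin over at most maxChannels channels. The round-robin split is an assumption chosen for simplicity and does not claim to reflect how the JPPF client actually sizes the chunks. ChannelSplitSketch and split() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration of splitting the tasks of a job over a bounded number of channels.
class ChannelSplitSketch {
  // returns one chunk of tasks per channel actually used
  static <T> List<List<T>> split(List<T> tasks, int maxChannels, int availableChannels) {
    // never use more channels than allowed, than available, or than there are tasks
    int channels = Math.max(1, Math.min(maxChannels, Math.min(availableChannels, tasks.size())));
    List<List<T>> chunks = new ArrayList<>();
    for (int i = 0; i < channels; i++) chunks.add(new ArrayList<>());
    // round-robin assignment (an assumption made for this sketch)
    for (int i = 0; i < tasks.size(); i++) chunks.get(i % channels).add(tasks.get(i));
    return chunks;
  }

  public static void main(String[] args) {
    List<String> tasks = Arrays.asList("task1", "task2", "task3", "task4", "task5");
    // 3 connections are available but the client SLA allows 2 channels
    System.out.println(split(tasks, 2, 3));
    // with a single available connection, the limit has no effect
    System.out.println(split(tasks, 2, 1));
  }
}
```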

Copyright © 2005-2016 JPPF.org