001/*
002 * JPPF.
003 * Copyright (C) 2005-2015 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client;
020
021import java.io.*;
022import java.util.*;
023import java.util.concurrent.*;
024
025import org.jppf.JPPFException;
026import org.jppf.client.event.*;
027import org.jppf.client.persistence.JobPersistence;
028import org.jppf.client.taskwrapper.JPPFAnnotatedTask;
029import org.jppf.execute.ExecutorChannel;
030import org.jppf.node.protocol.*;
031import org.jppf.utils.*;
032import org.slf4j.*;
033
034/**
035 * Instances of this class represent a JPPF job and hold all the required elements:
036 * tasks, execution policy, task listener, data provider, priority, blocking indicator.<br>
037 * <p>This class also provides the API for handling JPPF-annotated tasks and POJO tasks.
038 * <p>All jobs have a name. It can be specified by calling {@link #setName(java.lang.String) setName(String name)}.
039 * If left unspecified, JPPF will automatically assign a uuid as its value.
040 * @author Laurent Cohen
041 */
042public class JPPFJob extends AbstractJPPFJob implements Iterable<Task<?>>, Future<List<Task<?>>> {
043  /**
044   * Logger for this class.
045   */
046  private static Logger log = LoggerFactory.getLogger(JPPFJob.class);
047  /**
048   * Explicit serialVersionUID.
049   */
050  private static final long serialVersionUID = 1L;
051  /**
052   * The listener that receives notifications of completed tasks.
053   */
054  transient JPPFResultCollector resultCollector;
055
056  /**
057   * Default constructor, creates a blocking job with no data provider, default SLA values and a priority of 0.
058   * This constructor generates a pseudo-random id as a string of 32 hexadecimal characters.
059   */
060  public JPPFJob() {
061    this(JPPFUuid.normalUUID());
062  }
063
064  /**
065   * Default constructor, creates a blocking job with no data provider, default SLA values and a priority of 0.
066   * This constructor generates a pseudo-random id as a string of 32 hexadecimal characters.
067   * @param jobUuid the uuid to assign to this job.
068   */
069  public JPPFJob(final String jobUuid) {
070    super(jobUuid);
071    resultCollector = new JPPFResultCollector(this);
072  }
073
074  /**
075   * Get the listener that receives notifications of completed tasks.
076   * @return a <code>TaskCompletionListener</code> instance.
077   * @exclude
078   */
079  public JPPFResultCollector getResultCollector() {
080    return resultCollector;
081  }
082
083  @Override
084  public String getUuid() {
085    return uuid;
086  }
087
088  @Override
089  public String getName() {
090    return name;
091  }
092
093  /**
094   * Set the user-defined display name for this job.
095   * @param name the display name as a string.
096   */
097  public void setName(final String name) {
098    this.name = name;
099  }
100
101  /**
102   * Get the list of tasks to execute.
103   * @return a list of objects.
104   */
105  public List<Task<?>> getJobTasks() {
106    return tasks;
107  }
108
109  /**
110   * Add a task to this job. This method is for adding a task that is either an instance of {@link org.jppf.node.protocol.Task Task},
111   * annotated with {@link org.jppf.node.protocol.JPPFRunnable JPPFRunnable}, or an instance of {@link java.lang.Runnable Runnable} or {@link java.util.concurrent.Callable Callable}.
112   * @param taskObject the task to add to this job.
113   * @param args arguments to use with a JPPF-annotated class.
114   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
115   * or a wrapper around the input object in the other cases.
116   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
117   */
118  public Task<?> add(final Object taskObject, final Object...args) throws JPPFException {
119    if (taskObject == null) throw new JPPFException("null tasks are not accepted");
120    Task<?> jppfTask = null;
121    if (taskObject instanceof Task) jppfTask = (Task) taskObject;
122    else jppfTask = new JPPFAnnotatedTask(taskObject, args);
123    tasks.add(jppfTask);
124    jppfTask.setPosition(tasks.size()-1);
125    return jppfTask;
126  }
127
128  /**
129   * Add a POJO task to this job. The POJO task is identified as a method name associated with either an object for a non-static method,
130   * or a class for a static method or for a constructor.
131   * @param taskObject the task to add to this job.
132   * @param method the name of the method to execute.
133   * @param args arguments to use with a JPPF-annotated class.
134   * @return an instance of <code>Task</code> that is a wrapper around the input task object.
135   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
136   */
137  public Task<?> add(final String method, final Object taskObject, final Object...args) throws JPPFException {
138    if (taskObject == null) throw new JPPFException("null tasks are not accepted");
139    Task <?>jppfTask = new JPPFAnnotatedTask(taskObject, method, args);
140    tasks.add(jppfTask);
141    jppfTask.setPosition(tasks.size()-1);
142    return jppfTask;
143  }
144
145  /**
146   * Add a {@link Task} to this job.
147   * @param task the task to add to this job.
148   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
149   * or a wrapper around the input object in the other cases.
150   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
151   * @since 5.0
152   */
153  public Task<?> add(final Task<?> task) throws JPPFException {
154    return add(task, (Object[]) null);
155  }
156
157  /**
158   * Add a {@link Runnable} task to this job.
159   * @param runnable the runnable task to add to this job.
160   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
161   * or a wrapper around the input object in the other cases.
162   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
163   * @since 5.0
164   */
165  public Task<?> add(final Runnable runnable) throws JPPFException {
166    return add(runnable, (Object[]) null);
167  }
168
169  /**
170   * Add a {@link Callable} task to this job.
171   * @param callable the callable task to add to this job.
172   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
173   * or a wrapper around the input object in the other cases.
174   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
175   * @since 5.0
176   */
177  public Task<?> add(final Callable<?> callable) throws JPPFException {
178    return add(callable, (Object[]) null);
179  }
180
181  /**
182   * Get the container for data shared between tasks.
183   * @return a <code>DataProvider</code> instance.
184   */
185  public DataProvider getDataProvider() {
186    return dataProvider;
187  }
188
189  /**
190   * Set the container for data shared between tasks.
191   * @param dataProvider a <code>DataProvider</code> instance.
192   */
193  public void setDataProvider(final DataProvider dataProvider) {
194    this.dataProvider = dataProvider;
195  }
196
197  /**
198   * Determine whether the execution of this job is blocking on the client side.
199   * @return true if the execution is blocking, false otherwise.
200   */
201  public boolean isBlocking() {
202    return blocking;
203  }
204
205  /**
206   * Specify whether the execution of this job is blocking on the client side.
207   * @param blocking true if the execution is blocking, false otherwise.
208   */
209  public void setBlocking(final boolean blocking) {
210    this.blocking = blocking;
211  }
212
213  @Override
214  public JobSLA getSLA() {
215    return jobSLA;
216  }
217
218  /**
219   * Get the job SLA for the client side.
220   * @return an instance of <code>JobSLA</code>.
221   */
222  public JobClientSLA getClientSLA() {
223    return jobClientSLA;
224  }
225
226  @Override
227  public JobMetadata getMetadata() {
228    return jobMetadata;
229  }
230
231  /**
232   * Add a listener to the list of job listeners.
233   * @param listener a {@link JobListener} instance.
234   */
235  public void addJobListener(final JobListener listener) {
236    listeners.add(listener);
237  }
238
239  /**
240   * Remove a listener from the list of job listeners.
241   * @param listener a {@link JobListener} instance.
242   */
243  public void removeJobListener(final JobListener listener) {
244    listeners.remove(listener);
245  }
246
247  /**
248   * Notify all listeners of the specified event type.
249   * @param type the type of the event.
250   * @param channel the channel to which a job is dispatched or from which it is returned.
251   * @param tasks the tasks that were dispatched or returned.
252   * @exclude
253   */
254  public void fireJobEvent(final JobEvent.Type type, final ExecutorChannel channel, final List<Task<?>> tasks) {
255    if (log.isDebugEnabled()) log.debug(String.format("firing %s event with %d tasks for %s", type, (tasks == null ? 0 : tasks.size()), this));
256    JobEvent event = new JobEvent(this, channel, tasks);
257    switch(type) {
258      case JOB_START: for (JobListener listener: listeners) listener.jobStarted(event);
259      break;
260      case JOB_END: for (JobListener listener: listeners) listener.jobEnded(event);
261      break;
262      case JOB_DISPATCH: for (JobListener listener: listeners) listener.jobDispatched(event);
263      break;
264      case JOB_RETURN: for (JobListener listener: listeners) listener.jobReturned(event);
265      break;
266    }
267  }
268
269  /**
270   * Get the persistence manager that enables saving and restoring the state of this job.
271   * @return a {@link JobPersistence} instance.
272   * @param <T> the type of the keys used by the persistence manager.
273   */
274  @SuppressWarnings("unchecked")
275  public <T> JobPersistence<T> getPersistenceManager() {
276    return (JobPersistence<T>) persistenceManager;
277  }
278
279  /**
280   * Set the persistence manager that enables saving and restoring the state of this job.
281   * @param persistenceManager a {@link JobPersistence} instance.
282   * @param <T> the type of the keys used by the persistence manager.
283   */
284  public <T> void setPersistenceManager(final JobPersistence<T> persistenceManager) {
285    this.persistenceManager = persistenceManager;
286  }
287
288  @Override
289  public Iterator<Task<?>> iterator() {
290    return tasks.iterator();
291  }
292
293  /**
294   * Get the count of the tasks in this job that haven completed.
295   * @return the number of executed tasks in this job.
296   * @since 4.2
297   */
298  public int executedTaskCount() {
299    return results.size();
300  }
301
302  /**
303   * Get the count of the tasks in this job that haven't yet been executed.
304   * @return the number of unexecuted tasks in this job.
305   * @since 4.2
306   */
307  public int unexecutedTaskCount() {
308    return tasks.size() - results.size();
309  }
310
311  /**
312   * Wait until all execution results of the tasks in this job have been collected.
313   * This method is equivalent to {@code get()}, except that it doesn't raise an exception.
314   * @return the list of resulting tasks.
315   * @since 4.2
316   */
317  public List<Task<?>> awaitResults() {
318    return awaitResults(Long.MAX_VALUE);
319  }
320
321  /**
322   * Wait until all execution results of the tasks in this job have been collected, or the timeout expires, whichever happens first.
323   * This method is equivalent to {@code get(timeout, TimeUnit.MILLISECONDS)}, except that it doesn't raise an exception.
324   * @param timeout the maximum time to wait in milliseconds, zero or less meaning an infinite wait.
325   * @return the list of resulting tasks, or {@code null} if the timeout expired before all results were received.
326   * @since 4.2
327   */
328  public List<Task<?>> awaitResults(final long timeout) {
329    try {
330      await(timeout, false);
331    } catch (TimeoutException ignore) {
332    }
333    return results.getResultsList();
334  }
335
336  /**
337   * Get the list of currently available task execution results.
338   * This method is a shortcut for {@code getResults().getResultsList()}.
339   * @return a list of {@link Task} instances, possibly empty.
340   * @since 4.2
341   */
342  public List<Task<?>> getAllResults() {
343    return results.getResultsList();
344  }
345
346  /**
347   * Get the execution status of this job.
348   * @return a {@link JobStatus} enum value, or {@code null} isd the status could not be determined.
349   * @since 4.2
350   */
351  public JobStatus getStatus() {
352    if (resultCollector instanceof JobStatusHandler) return ((JobStatusHandler) resultCollector).getStatus();
353    return null;
354  }
355
356  /**
357   * Cancel this job unconditionally.
358   * This method is equivalent to calling {@code cancel(true)}.
359   * @return {@code false} if the job could not be cancelled, typically because it has already completed normally; {@code true} otherwise.
360   * @since 4.2
361   */
362  public boolean cancel() {
363    return cancel(true);
364  }
365
366  /**
367   * {@inheritDoc}
368   * @since 4.2
369   */
370  @Override
371  public boolean cancel(final boolean mayInterruptIfRunning) {
372    if (log.isDebugEnabled()) log.debug("request to cancel {}, client={}", this, client);
373    if (mayInterruptIfRunning || (getStatus() != JobStatus.EXECUTING)) {
374      try {
375        if (client != null) return client.cancelJob(uuid);
376      } catch(Exception e) {
377        log.error("error cancelling job {} : {}", this, ExceptionUtils.getStackTrace(e));
378      }
379    }
380    return false;
381  }
382
383  /**
384   * {@inheritDoc}
385   * @since 4.2
386   */
387  @Override
388  public boolean isCancelled() {
389    return cancelled.get();
390  }
391
392  /**
393   * {@inheritDoc}
394   * @since 4.2
395   */
396  @Override
397  public boolean isDone() {
398    return cancelled.get() || (unexecutedTaskCount() <= 0);
399  }
400
401  /**
402   * {@inheritDoc}
403   * @since 4.2
404   */
405  @Override
406  public List<Task<?>> get() throws InterruptedException, ExecutionException {
407    return awaitResults(Long.MAX_VALUE);
408  }
409
410  /**
411   * {@inheritDoc}
412   * @since 4.2
413   */
414  @Override
415  public List<Task<?>> get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
416    await(DateTimeUtils.toMillis(timeout, unit), true);
417    return results.getResultsList();
418  }
419
420  /**
421   * Wait until the job is complete or the timeout expires, whichever happens first.
422   * @param timeout the maximum time to wait for the job completion.
423   * @param raiseTimeoutException whether to raise a {@link TimeoutException} when the timeout expires.
424   * @throws TimeoutException if the tiemout expired and {@code raiseTimeoutException == true}.
425   */
426  void await(final long timeout, final boolean raiseTimeoutException) throws TimeoutException {
427    boolean fullfilled = ConcurrentUtils.awaitCondition(results, new ConcurrentUtils.Condition() {
428      @Override public boolean evaluate() {
429        JobStatus status = getStatus();
430        return (results.size() >= tasks.size()) && ((status == JobStatus.FAILED) || (status == JobStatus.COMPLETE));
431      }
432    }, timeout);
433    if (!fullfilled && raiseTimeoutException) throw new TimeoutException("timeout expired");
434  }
435
436  /**
437   * Save the state of the {@code JPPFJob} instance to a stream (i.e.,serialize it).
438   * @param out the output stream to which to write the job. 
439   * @throws IOException if any I/O error occurs.
440   * @since 5.0
441   */
442  private void writeObject(final ObjectOutputStream out) throws IOException {
443    out.defaultWriteObject();
444  }
445
446  /**
447   * Reconstitute the {@code TreeMap} instance from a stream (i.e., deserialize it).
448   * @param in the input stream from which to read the job. 
449   * @throws IOException if any I/O error occurs.
450   * @throws ClassNotFoundException if the class of an object in the object graph can not be found.
451   * @since 5.0
452   */
453  private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
454    in.defaultReadObject();
455    resultCollector = new JPPFResultCollector(this);
456  }
457
458  @Override
459  public int getTaskCount() {
460    return tasks.size();
461  }
462}