001/*
002 * JPPF.
003 * Copyright (C) 2005-2016 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client;
020
021import java.io.*;
022import java.util.*;
023import java.util.concurrent.*;
024
025import org.jppf.JPPFException;
026import org.jppf.client.event.*;
027import org.jppf.client.persistence.JobPersistence;
028import org.jppf.client.taskwrapper.JPPFAnnotatedTask;
029import org.jppf.execute.ExecutorChannel;
030import org.jppf.node.protocol.*;
031import org.jppf.utils.*;
032import org.slf4j.*;
033
034/**
035 * Instances of this class represent a JPPF job and hold all the required elements:
036 * tasks, execution policy, task listener, data provider, priority, blocking indicator.<br>
037 * <p>This class also provides the API for handling JPPF-annotated tasks and POJO tasks.
038 * <p>All jobs have a name. It can be specified by calling {@link #setName(java.lang.String) setName(String name)}.
039 * If left unspecified, JPPF will automatically assign a uuid as its value.
040 * @author Laurent Cohen
041 */
042public class JPPFJob extends AbstractJPPFJob implements Iterable<Task<?>>, Future<List<Task<?>>> {
043  /**
044   * Logger for this class.
045   */
046  private static Logger log = LoggerFactory.getLogger(JPPFJob.class);
047  /**
048   * Explicit serialVersionUID.
049   */
050  private static final long serialVersionUID = 1L;
051  /**
052   * The listener that receives notifications of completed tasks.
053   */
054  transient JPPFResultCollector resultCollector;
055
056  /**
057   * Default constructor, creates a blocking job with no data provider, default SLA values and a priority of 0.
058   * This constructor generates a pseudo-random id as a string of 32 hexadecimal characters.
059   */
060  public JPPFJob() {
061    this(JPPFUuid.normalUUID());
062  }
063
064  /**
065   * Default constructor, creates a blocking job with no data provider, default SLA values and a priority of 0.
066   * This constructor generates a pseudo-random id as a string of 32 hexadecimal characters.
067   * @param jobUuid the uuid to assign to this job.
068   */
069  public JPPFJob(final String jobUuid) {
070    super(jobUuid);
071    resultCollector = new JPPFResultCollector(this);
072  }
073
074  /**
075   * Get the listener that receives notifications of completed tasks.
076   * @return a <code>TaskCompletionListener</code> instance.
077   * @exclude
078   */
079  public JPPFResultCollector getResultCollector() {
080    return resultCollector;
081  }
082
083  @Override
084  public String getUuid() {
085    return uuid;
086  }
087
088  @Override
089  public String getName() {
090    return name;
091  }
092
093  /**
094   * Set the user-defined display name for this job.
095   * @param name the display name as a string.
096   */
097  public void setName(final String name) {
098    this.name = name;
099  }
100
101  /**
102   * Get the list of tasks to execute.
103   * @return a list of objects.
104   */
105  public List<Task<?>> getJobTasks() {
106    return tasks;
107  }
108
109  /**
110   * Add a task to this job. This method is for adding a task that is either an instance of {@link org.jppf.node.protocol.Task Task},
111   * annotated with {@link org.jppf.node.protocol.JPPFRunnable JPPFRunnable}, or an instance of {@link java.lang.Runnable Runnable} or {@link java.util.concurrent.Callable Callable}.
112   * @param taskObject the task to add to this job.
113   * @param args arguments to use with a JPPF-annotated class.
114   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
115   * or a wrapper around the input object in the other cases.
116   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
117   */
118  public Task<?> add(final Object taskObject, final Object...args) throws JPPFException {
119    if (taskObject == null) throw new JPPFException("null tasks are not accepted");
120    Task<?> jppfTask = null;
121    if (taskObject instanceof Task) jppfTask = (Task) taskObject;
122    else jppfTask = new JPPFAnnotatedTask(taskObject, args);
123    tasks.add(jppfTask);
124    jppfTask.setPosition(tasks.size()-1);
125    return jppfTask;
126  }
127
128  /**
129   * Add a POJO task to this job. The POJO task is identified as a method name associated with either an object for a non-static method,
130   * or a class for a static method or for a constructor.
131   * @param taskObject the task to add to this job.
132   * @param method the name of the method to execute.
133   * @param args arguments to use with a JPPF-annotated class.
134   * @return an instance of <code>Task</code> that is a wrapper around the input task object.
135   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
136   */
137  public Task<?> add(final String method, final Object taskObject, final Object...args) throws JPPFException {
138    if (taskObject == null) throw new JPPFException("null tasks are not accepted");
139    Task <?>jppfTask = new JPPFAnnotatedTask(taskObject, method, args);
140    tasks.add(jppfTask);
141    jppfTask.setPosition(tasks.size()-1);
142    return jppfTask;
143  }
144
145  /**
146   * Add a {@link Task} to this job.
147   * @param task the task to add to this job.
148   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
149   * or a wrapper around the input object in the other cases.
150   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
151   * @since 5.0
152   */
153  public Task<?> add(final Task<?> task) throws JPPFException {
154    return add(task, (Object[]) null);
155  }
156
157  /**
158   * Add a {@link Runnable} task to this job.
159   * @param runnable the runnable task to add to this job.
160   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
161   * or a wrapper around the input object in the other cases.
162   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
163   * @since 5.0
164   */
165  public Task<?> add(final Runnable runnable) throws JPPFException {
166    return add(runnable, (Object[]) null);
167  }
168
169  /**
170   * Add a {@link Callable} task to this job.
171   * @param callable the callable task to add to this job.
172   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
173   * or a wrapper around the input object in the other cases.
174   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
175   * @since 5.0
176   */
177  public Task<?> add(final Callable<?> callable) throws JPPFException {
178    return add(callable, (Object[]) null);
179  }
180
181  /**
182   * Get the container for data shared between tasks.
183   * @return a <code>DataProvider</code> instance.
184   */
185  public DataProvider getDataProvider() {
186    return dataProvider;
187  }
188
189  /**
190   * Set the container for data shared between tasks.
191   * @param dataProvider a <code>DataProvider</code> instance.
192   */
193  public void setDataProvider(final DataProvider dataProvider) {
194    this.dataProvider = dataProvider;
195  }
196
197  /**
198   * Determine whether the execution of this job is blocking on the client side.
199   * @return true if the execution is blocking, false otherwise.
200   */
201  public boolean isBlocking() {
202    return blocking;
203  }
204
205  /**
206   * Specify whether the execution of this job is blocking on the client side.
207   * @param blocking true if the execution is blocking, false otherwise.
208   */
209  public void setBlocking(final boolean blocking) {
210    this.blocking = blocking;
211  }
212
213  @Override
214  public JobSLA getSLA() {
215    return jobSLA;
216  }
217
218  /**
219   * Get the job SLA for the client side.
220   * @return an instance of <code>JobSLA</code>.
221   */
222  public JobClientSLA getClientSLA() {
223    return jobClientSLA;
224  }
225
226  @Override
227  public JobMetadata getMetadata() {
228    return jobMetadata;
229  }
230
231  /**
232   * Add a listener to the list of job listeners.
233   * @param listener a {@link JobListener} instance.
234   */
235  public void addJobListener(final JobListener listener) {
236    listeners.add(listener);
237  }
238
239  /**
240   * Remove a listener from the list of job listeners.
241   * @param listener a {@link JobListener} instance.
242   */
243  public void removeJobListener(final JobListener listener) {
244    listeners.remove(listener);
245  }
246
247  /**
248   * Notify all listeners of the specified event type.
249   * @param type the type of the event.
250   * @param channel the channel to which a job is dispatched or from which it is returned.
251   * @param tasks the tasks that were dispatched or returned.
252   * @exclude
253   */
254  public void fireJobEvent(final JobEvent.Type type, final ExecutorChannel channel, final List<Task<?>> tasks) {
255    if (log.isDebugEnabled()) log.debug(String.format("firing %s event with %d tasks for %s", type, (tasks == null ? 0 : tasks.size()), this));
256    JobEvent event = new JobEvent(this, channel, tasks);
257    switch(type) {
258      case JOB_START: for (JobListener listener: listeners) listener.jobStarted(event);
259      break;
260      case JOB_END: for (JobListener listener: listeners) listener.jobEnded(event);
261      break;
262      case JOB_DISPATCH: for (JobListener listener: listeners) listener.jobDispatched(event);
263      break;
264      case JOB_RETURN: for (JobListener listener: listeners) listener.jobReturned(event);
265      break;
266    }
267  }
268
269  /**
270   * Get the persistence manager that enables saving and restoring the state of this job.
271   * @return a {@link JobPersistence} instance.
272   * @param <T> the type of the keys used by the persistence manager.
273   */
274  @SuppressWarnings("unchecked")
275  public <T> JobPersistence<T> getPersistenceManager() {
276    return (JobPersistence<T>) persistenceManager;
277  }
278
279  /**
280   * Set the persistence manager that enables saving and restoring the state of this job.
281   * @param persistenceManager a {@link JobPersistence} instance.
282   * @param <T> the type of the keys used by the persistence manager.
283   */
284  public <T> void setPersistenceManager(final JobPersistence<T> persistenceManager) {
285    this.persistenceManager = persistenceManager;
286  }
287
288  @Override
289  public Iterator<Task<?>> iterator() {
290    return tasks.iterator();
291  }
292
293  /**
294   * Get the count of the tasks in this job that haven completed.
295   * @return the number of executed tasks in this job.
296   * @since 4.2
297   */
298  public int executedTaskCount() {
299    return results.size();
300  }
301
302  /**
303   * Get the count of the tasks in this job that haven't yet been executed.
304   * @return the number of unexecuted tasks in this job.
305   * @since 4.2
306   */
307  public int unexecutedTaskCount() {
308    return tasks.size() - results.size();
309  }
310
311  /**
312   * Wait until all execution results of the tasks in this job have been collected.
313   * This method is equivalent to {@code get()}, except that it doesn't raise an exception.
314   * @return the list of resulting tasks.
315   * @since 4.2
316   */
317  public List<Task<?>> awaitResults() {
318    return awaitResults(Long.MAX_VALUE);
319  }
320
321  /**
322   * Wait until all execution results of the tasks in this job have been collected, or the timeout expires, whichever happens first.
323   * This method is equivalent to {@code get(timeout, TimeUnit.MILLISECONDS)}, except that it doesn't raise an exception.
324   * @param timeout the maximum time to wait in milliseconds, zero or less meaning an infinite wait.
325   * @return the list of resulting tasks, or {@code null} if the timeout expired before all results were received.
326   * @since 4.2
327   */
328  public List<Task<?>> awaitResults(final long timeout) {
329    try {
330      await(timeout, false);
331    } catch (TimeoutException ignore) {
332      return null;
333    }
334    return results.getResultsList();
335  }
336
337  /**
338   * Get the list of currently available task execution results.
339   * This method is a shortcut for {@code getResults().getResultsList()}.
340   * @return a list of {@link Task} instances, possibly empty.
341   * @since 4.2
342   */
343  public List<Task<?>> getAllResults() {
344    return results.getResultsList();
345  }
346
347  /**
348   * Get the execution status of this job.
349   * @return a {@link JobStatus} enum value, or {@code null} isd the status could not be determined.
350   * @since 4.2
351   */
352  public JobStatus getStatus() {
353    if (resultCollector instanceof JobStatusHandler) return ((JobStatusHandler) resultCollector).getStatus();
354    return null;
355  }
356
357  /**
358   * Cancel this job unconditionally.
359   * This method is equivalent to calling {@code cancel(true)}.
360   * @return {@code false} if the job could not be cancelled, typically because it has already completed normally; {@code true} otherwise.
361   * @since 4.2
362   */
363  public boolean cancel() {
364    return cancel(true);
365  }
366
367  /**
368   * {@inheritDoc}
369   * @since 4.2
370   */
371  @Override
372  public boolean cancel(final boolean mayInterruptIfRunning) {
373    if (log.isDebugEnabled()) log.debug("request to cancel {}, client={}", this, client);
374    if (getCancellingFlag().compareAndSet(false, true)) {
375      try {
376        if (mayInterruptIfRunning || (getStatus() != JobStatus.EXECUTING)) {
377          try {
378            if (client != null) return client.cancelJob(uuid);
379          } catch(Exception e) {
380            log.error("error cancelling job {} : {}", this, ExceptionUtils.getStackTrace(e));
381          }
382        }
383      } finally {
384        getCancellingFlag().set(false);
385      }
386    }
387    return false;
388  }
389
390  /**
391   * {@inheritDoc}
392   * @since 4.2
393   */
394  @Override
395  public boolean isCancelled() {
396    return cancelled.get();
397  }
398
399  /**
400   * {@inheritDoc}
401   * @since 4.2
402   */
403  @Override
404  public boolean isDone() {
405    return cancelled.get() || (unexecutedTaskCount() <= 0);
406  }
407
408  /**
409   * {@inheritDoc}
410   * @since 4.2
411   */
412  @Override
413  public List<Task<?>> get() throws InterruptedException, ExecutionException {
414    return awaitResults(Long.MAX_VALUE);
415  }
416
417  /**
418   * {@inheritDoc}
419   * @since 4.2
420   */
421  @Override
422  public List<Task<?>> get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
423    await(DateTimeUtils.toMillis(timeout, unit), true);
424    return results.getResultsList();
425  }
426
427  /**
428   * Wait until the job is complete or the timeout expires, whichever happens first.
429   * @param timeout the maximum time to wait for the job completion.
430   * @param raiseTimeoutException whether to raise a {@link TimeoutException} when the timeout expires.
431   * @throws TimeoutException if the tiemout expired and {@code raiseTimeoutException == true}.
432   */
433  void await(final long timeout, final boolean raiseTimeoutException) throws TimeoutException {
434    boolean fullfilled = ConcurrentUtils.awaitCondition(results, new ConcurrentUtils.Condition() {
435      @Override public boolean evaluate() {
436        JobStatus status = getStatus();
437        return (results.size() >= tasks.size()) && ((status == JobStatus.FAILED) || (status == JobStatus.COMPLETE));
438      }
439    }, timeout);
440    if (!fullfilled && raiseTimeoutException) throw new TimeoutException("timeout expired");
441  }
442
443  /**
444   * Save the state of the {@code JPPFJob} instance to a stream (i.e.,serialize it).
445   * @param out the output stream to which to write the job. 
446   * @throws IOException if any I/O error occurs.
447   * @since 5.0
448   */
449  private void writeObject(final ObjectOutputStream out) throws IOException {
450    out.defaultWriteObject();
451  }
452
453  /**
454   * Reconstitute the {@code TreeMap} instance from a stream (i.e., deserialize it).
455   * @param in the input stream from which to read the job. 
456   * @throws IOException if any I/O error occurs.
457   * @throws ClassNotFoundException if the class of an object in the object graph can not be found.
458   * @since 5.0
459   */
460  private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
461    in.defaultReadObject();
462    resultCollector = new JPPFResultCollector(this);
463  }
464
465  @Override
466  public int getTaskCount() {
467    return tasks.size();
468  }
469}