001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client;
020
021import java.util.*;
022import java.util.concurrent.*;
023
024import org.jppf.JPPFException;
025import org.jppf.client.balancer.ClientTaskBundle;
026import org.jppf.client.event.*;
027import org.jppf.client.event.JobEvent.Type;
028import org.jppf.client.persistence.*;
029import org.jppf.client.taskwrapper.JPPFAnnotatedTask;
030import org.jppf.execute.ExecutorChannel;
031import org.jppf.node.protocol.Task;
032import org.jppf.utils.*;
033import org.jppf.utils.concurrent.ConcurrentUtils;
034import org.slf4j.*;
035
036/**
037 * Instances of this class represent a JPPF job and hold all the required elements:
038 * tasks, execution policy, task listener, data provider, priority, blocking indicator.<br>
039 * <p>This class also provides the API for handling JPPF-annotated tasks and POJO tasks.
040 * <p>All jobs have a name. It can be specified by calling {@link #setName(java.lang.String) setName(String name)}.
041 * If left unspecified, JPPF will automatically assign a uuid as its value.
042 * @author Laurent Cohen
043 */
044public class JPPFJob extends AbstractJPPFJob implements Iterable<Task<?>>, Future<List<Task<?>>> {
045  /**
046   * Logger for this class.
047   */
048  private static Logger log = LoggerFactory.getLogger(JPPFJob.class);
049  /**
050   * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
051   */
052  private static boolean debugEnabled = log.isDebugEnabled();
053  /**
054   * Explicit serialVersionUID.
055   */
056  private static final long serialVersionUID = 1L;
057
058  /**
059   * Default constructor, creates a blocking job with no data provider, default SLA values and a priority of 0.
060   * This constructor generates a pseudo-random id as a string of 32 hexadecimal characters.
061   */
062  public JPPFJob() {
063    this(JPPFUuid.normalUUID());
064  }
065
066  /**
067   * Default constructor, creates a blocking job with no data provider, default SLA values and a priority of 0.
068   * This constructor generates a pseudo-random id as a string of 32 hexadecimal characters.
069   * @param jobUuid the uuid to assign to this job.
070   */
071  public JPPFJob(final String jobUuid) {
072    super(jobUuid);
073  }
074
075  /**
076   * Get the list of tasks to execute.
077   * @return a list of objects.
078   */
079  public List<Task<?>> getJobTasks() {
080    return tasks;
081  }
082
083  /**
084   * Add the specified tasks to this job in a bulk operation.
085   * @param tasks the list of tasks to add.
086   * @throws JPPFException if any error occurs.
087   * @since 6.0
088   */
089  public void addAll(final List<Task<?>> tasks)  throws JPPFException {
090    int pos = this.tasks.size();
091    for (Task<?> task: tasks) task.setPosition(pos++);
092    this.tasks.addAll(tasks);
093  }
094
095  /**
096   * Add a task to this job. This method is for adding a task that is either an instance of {@link org.jppf.node.protocol.Task Task},
097   * annotated with {@link org.jppf.node.protocol.JPPFRunnable JPPFRunnable}, or an instance of {@link java.lang.Runnable Runnable} or {@link java.util.concurrent.Callable Callable}.
098   * @param taskObject the task to add to this job.
099   * @param args arguments to use with a JPPF-annotated class.
100   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
101   * or a wrapper around the input object in the other cases.
102   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
103   */
104  public Task<?> add(final Object taskObject, final Object...args) throws JPPFException {
105    if (taskObject == null) throw new JPPFException("null tasks are not accepted");
106    Task<?> jppfTask = null;
107    if (taskObject instanceof Task) jppfTask = (Task<?>) taskObject;
108    else jppfTask = new JPPFAnnotatedTask(taskObject, args);
109    tasks.add(jppfTask);
110    jppfTask.setPosition(tasks.size()-1);
111    return jppfTask;
112  }
113
114  /**
115   * Add a POJO task to this job. The POJO task is identified as a method name associated with either an object for a non-static method,
116   * or a class for a static method or for a constructor.
117   * @param method the name of the method to execute. For a constructor, this should be identical to the simple name of the class as per {@code Class.getSimpleName()}.
118   * @param taskObject the task to add to this job.
119   * @param args arguments to use with a JPPF-annotated class.
120   * @return an instance of <code>Task</code> that is a wrapper around the input task object.
121   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
122   */
123  public Task<?> add(final String method, final Object taskObject, final Object...args) throws JPPFException {
124    if (taskObject == null) throw new JPPFException("null tasks are not accepted");
125    final Task <?>jppfTask = new JPPFAnnotatedTask(taskObject, method, args);
126    tasks.add(jppfTask);
127    jppfTask.setPosition(tasks.size()-1);
128    return jppfTask;
129  }
130
131  /**
132   * Add a {@link Task} to this job.
133   * @param task the task to add to this job.
134   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
135   * or a wrapper around the input object in the other cases.
136   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
137   * @since 5.0
138   */
139  public Task<?> add(final Task<?> task) throws JPPFException {
140    return add(task, (Object[]) null);
141  }
142
143  /**
144   * Add a {@link Runnable} task to this job.
145   * @param runnable the runnable task to add to this job.
146   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
147   * or a wrapper around the input object in the other cases.
148   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
149   * @since 5.0
150   */
151  public Task<?> add(final Runnable runnable) throws JPPFException {
152    return add(runnable, (Object[]) null);
153  }
154
155  /**
156   * Add a {@link Callable} task to this job.
157   * @param callable the callable task to add to this job.
158   * @return an instance of <code>Task</code> that is either the same as the input if the input is a subclass of <code>JPPFTask</code>,
159   * or a wrapper around the input object in the other cases.
160   * @throws JPPFException if one of the tasks is neither a <code>Task</code> or a JPPF-annotated class.
161   * @since 5.0
162   */
163  public Task<?> add(final Callable<?> callable) throws JPPFException {
164    return add(callable, (Object[]) null);
165  }
166
167  /**
168   * Add a listener to the list of job listeners.
169   * @param listener a {@link JobListener} instance.
170   */
171  public void addJobListener(final JobListener listener) {
172    listeners.add(listener);
173  }
174
175  /**
176   * Remove a listener from the list of job listeners.
177   * @param listener a {@link JobListener} instance.
178   */
179  public void removeJobListener(final JobListener listener) {
180    listeners.remove(listener);
181  }
182
183  /**
184   * Notify all listeners of the specified event type.
185   * @param type the type of the event.
186   * @param channel the channel to which a job is dispatched or from which it is returned.
187   * @param tasks the tasks that were dispatched or returned.
188   * @exclude
189   */
190  public void fireJobEvent(final JobEvent.Type type, final ExecutorChannel<ClientTaskBundle> channel, final List<Task<?>> tasks) {
191    if (log.isDebugEnabled()) log.debug("firing {} event with {} tasks for {}, connection = {}", type, (tasks == null ? 0 : tasks.size()), this, channel);
192    final JobEvent event = new JobEvent(this, channel, tasks);
193    switch(type) {
194      case JOB_START: for (JobListener listener: listeners) listener.jobStarted(event);
195      break;
196      case JOB_END: for (JobListener listener: listeners) listener.jobEnded(event);
197      break;
198      case JOB_DISPATCH: for (JobListener listener: listeners) listener.jobDispatched(event);
199      break;
200      case JOB_RETURN: for (JobListener listener: listeners) listener.jobReturned(event);
201      break;
202    }
203  }
204
205  /**
206   * Get the persistence manager that enables saving and restoring the state of this job.
207   * @return a {@link JobPersistence} instance.
208   * @param <T> the type of the keys used by the persistence manager.
209   */
210  @SuppressWarnings("unchecked")
211  public <T> JobPersistence<T> getPersistenceManager() {
212    return (JobPersistence<T>) persistenceManager;
213  }
214
215  /**
216   * Set the persistence manager that enables saving and restoring the state of this job.
217   * @param persistenceManager a {@link JobPersistence} instance.
218   * @param <T> the type of the keys used by the persistence manager.
219   * @return this job, for method chaining.
220   */
221  public <T> JPPFJob setPersistenceManager(final JobPersistence<T> persistenceManager) {
222    this.persistenceManager = persistenceManager;
223    return this;
224  }
225
226  @Override
227  public Iterator<Task<?>> iterator() {
228    return tasks.iterator();
229  }
230
231  /**
232   * Wait until all execution results of the tasks in this job have been collected.
233   * This method is equivalent to {@code get()}, except that it doesn't raise an exception.
234   * @return the list of resulting tasks.
235   * @since 4.2
236   */
237  public List<Task<?>> awaitResults() {
238    return awaitResults(Long.MAX_VALUE);
239  }
240
241  /**
242   * Wait until all execution results of the tasks in this job have been collected, or the timeout expires, whichever happens first.
243   * This method is equivalent to {@code get(timeout, TimeUnit.MILLISECONDS)}, except that it doesn't raise an exception.
244   * @param timeout the maximum time to wait in milliseconds, zero or less meaning an infinite wait.
245   * @return the list of resulting tasks, or {@code null} if the timeout expired before all results were received.
246   * @since 4.2
247   */
248  public List<Task<?>> awaitResults(final long timeout) {
249    try {
250      await(timeout, false);
251    } catch (@SuppressWarnings("unused") final TimeoutException ignore) {
252    }
253    return results.getResultsList();
254  }
255
256  /**
257   * Get the list of currently available task execution results.
258   * This method is a shortcut for {@code getResults().getResultsList()}.
259   * @return a list of {@link Task} instances, possibly empty.
260   * @since 4.2
261   */
262  public List<Task<?>> getAllResults() {
263    return results.getResultsList();
264  }
265
266  /**
267   * Cancel this job unconditionally.
268   * This method is equivalent to calling {@code cancel(true)}.
269   * @return {@code false} if the job could not be cancelled, typically because it has already completed normally; {@code true} otherwise.
270   * @since 4.2
271   */
272  public boolean cancel() {
273    return cancel(true);
274  }
275
276  /**
277   * {@inheritDoc}
278   * @since 4.2
279   */
280  @Override
281  public boolean cancel(final boolean mayInterruptIfRunning) {
282    if (log.isDebugEnabled()) log.debug("request to cancel {}, client={}", this, client);
283    if (getCancellingFlag().compareAndSet(false, true)) {
284      try {
285        if (mayInterruptIfRunning || (getStatus() != JobStatus.EXECUTING)) {
286          try {
287            if (client != null) return client.cancelJob(uuid);
288          } catch(final Exception e) {
289            log.error("error cancelling job {} : {}", this, ExceptionUtils.getStackTrace(e));
290          }
291        }
292      } finally {
293        getCancellingFlag().set(false);
294      }
295    }
296    return false;
297  }
298
299  /**
300   * {@inheritDoc}
301   * @since 4.2
302   */
303  @Override
304  public boolean isCancelled() {
305    return cancelled.get();
306  }
307
308  /**
309   * {@inheritDoc}
310   * @since 4.2
311   */
312  @Override
313  public boolean isDone() {
314    return cancelled.get() || (unexecutedTaskCount() <= 0);
315  }
316
317  /**
318   * {@inheritDoc}
319   * @since 4.2
320   */
321  @Override
322  public List<Task<?>> get() throws InterruptedException, ExecutionException {
323    return awaitResults(Long.MAX_VALUE);
324  }
325
326  /**
327   * {@inheritDoc}
328   * @since 4.2
329   */
330  @Override
331  public List<Task<?>> get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
332    await(DateTimeUtils.toMillis(timeout, unit), true);
333    return results.getResultsList();
334  }
335
336  /**
337   * Wait until the job is complete or the timeout expires, whichever happens first.
338   * @param timeout the maximum time to wait for the job completion.
339   * @param raiseTimeoutException whether to raise a {@link TimeoutException} when the timeout expires.
340   * @throws TimeoutException if the tiemout expired and {@code raiseTimeoutException == true}.
341   */
342  void await(final long timeout, final boolean raiseTimeoutException) throws TimeoutException {
343    final boolean fullfilled = ConcurrentUtils.awaitCondition(results, new ConcurrentUtils.Condition() {
344      @Override public boolean evaluate() {
345        final JobStatus status = getStatus();
346        return (results.size() >= tasks.size()) && ((status == JobStatus.FAILED) || (status == JobStatus.COMPLETE));
347      }
348    }, timeout, 1000L);
349    if (!fullfilled && raiseTimeoutException) throw new TimeoutException("timeout expired");
350  }
351
352  /**
353   * Called to notify that the results of a number of tasks have been received from the server.
354   * @param tasks the list of tasks whose results have been received from the server.
355   * @param throwable the throwable that was raised while receiving the results.
356   * @param sendJobEvent whether to emit a {@link org.jppf.client.event.JobEvent JobEvent} notification.
357   * @exclude
358   */
359  public void resultsReceived(final List<Task<?>> tasks, final Throwable throwable, final boolean sendJobEvent) {
360    synchronized(results) {
361      int unexecutedTaskCount = 0;
362      if (tasks != null) {
363        results.addResults(tasks);
364        unexecutedTaskCount = this.unexecutedTaskCount();
365        if (debugEnabled) log.debug("Received results for {} tasks, pendingCount={}, count={}, jobResults={}", tasks.size(), unexecutedTaskCount, tasks.size(), results);
366        if (persistenceManager != null) {
367          try {
368            @SuppressWarnings("unchecked")
369            final JobPersistence<Object> pm = (JobPersistence<Object>) persistenceManager;
370            pm.storeJob(pm.computeKey(this), this, tasks);
371          } catch (final JobPersistenceException e) {
372            log.error(e.getMessage(), e);
373          }
374        }
375      } else {
376        if (debugEnabled) log.debug("received throwable '{}'", ExceptionUtils.getMessage(throwable));
377      }
378      if (sendJobEvent) {
379        fireJobEvent(JobEvent.Type.JOB_RETURN, null, tasks);
380        if (unexecutedTaskCount <= 0) fireJobEvent(Type.JOB_END, null, tasks);
381      }
382      client.unregisterClassLoaders(uuid);
383    }
384    results.wakeUp();
385  }
386}