001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client;
020
021import java.io.*;
022import java.util.*;
023import java.util.concurrent.CopyOnWriteArrayList;
024import java.util.concurrent.atomic.*;
025
026import org.jppf.client.event.*;
027import org.jppf.client.persistence.JobPersistence;
028import org.jppf.node.protocol.*;
029import org.jppf.utils.JPPFUuid;
030import org.slf4j.*;
031
032/**
033 * Instances of this class represent a JPPF job and hold all the required elements:
034 * tasks, execution policy, task listener, data provider, priority, blocking indicator.<br>
035 * @author Laurent Cohen
036 */
037public abstract class AbstractJPPFJob implements Serializable, JPPFDistributedJob, JobStatusHandler {
038  /**
039   * Logger for this class.
040   */
041  private static Logger log = LoggerFactory.getLogger(AbstractJPPFJob.class);
042  /**
043   * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
044   */
045  private static boolean debugEnabled = log.isDebugEnabled();
046  /**
047   * Explicit serialVersionUID.
048   */
049  private static final long serialVersionUID = 1L;
050  /**
051   * The list of tasks to execute.
052   */
053  final List<Task<?>> tasks = new ArrayList<>();
054  /**
055   * The container for data shared between tasks.
056   * The data provider should be considered read-only, i.e. no modification will be returned back to the client application.
057   */
058  DataProvider dataProvider = null;
059  /**
060   * Determines whether the execution of this job is blocking on the client side.
061   */
062  boolean blocking = true;
063  /**
064   * The user-defined display name for this job.
065   */
066  String name = null;
067  /**
068   * The universal unique id for this job.
069   */
070  final String uuid;
071  /**
072   * The service level agreement between the job and the server.
073   */
074  JobSLA jobSLA = new JobSLA();
075  /**
076   * The service level agreement on the client side.
077   */
078  JobClientSLA jobClientSLA = new JobClientSLA();
079  /**
080   * The user-defined metadata associated with this job.
081   */
082  JobMetadata jobMetadata = new JPPFJobMetadata();
083  /**
084   * The object that holds the results of executed tasks.
085   */
086  final JobResults results = new JobResults();
087  /**
088   * The list of listeners registered with this job.
089   */
090  transient List<JobListener> listeners = new CopyOnWriteArrayList<>();
091  /**
092   * The persistence manager that enables saving and restoring the state of this job.
093   */
094  transient JobPersistence<?> persistenceManager = null;
095  /**
096   * The client that submitted this job.
097   */
098  transient JPPFClient client;
099  /**
100   * Whether this job has been cancelled.
101   */
102  transient AtomicBoolean cancelled = new AtomicBoolean(false);
103  /**
104   * Whether this job is being cancelled.
105   */
106  transient AtomicBoolean cancelling = new AtomicBoolean(false);
107  /**
108   * The status of this job.
109   */
110  private AtomicReference<JobStatus> status = new AtomicReference<>(JobStatus.SUBMITTED);
111  /**
112   * List of listeners registered to receive this job's status change notifications.
113   */
114  private transient List<JobStatusListener> statusListeners = new ArrayList<>();
115
116  /**
117   * Default constructor, creates a blocking job with no data provider, default SLA values and a priority of 0.
118   * This constructor generates a pseudo-random id as a string of 32 hexadecimal characters.
119   */
120  public AbstractJPPFJob() {
121    this(JPPFUuid.normalUUID());
122  }
123
124  /**
125   * Default constructor, creates a blocking job with no data provider, default SLA values and a priority of 0.
126   * This constructor generates a pseudo-random id as a string of 32 hexadecimal characters.
127   * @param jobUuid the uuid to assign to this job.
128   */
129  public AbstractJPPFJob(final String jobUuid) {
130    this.uuid = (jobUuid == null) ? JPPFUuid.normalUUID() : jobUuid;
131    name = this.uuid;
132  }
133
134  @Override
135  public String getUuid() {
136    return uuid;
137  }
138
139  @Override
140  public String getName() {
141    return name;
142  }
143
144  /**
145   * Set the user-defined display name for this job.
146   * @param name the display name as a string.
147   * @return this job, for method chaining.
148   */
149  public AbstractJPPFJob setName(final String name) {
150    this.name = name;
151    return this;
152  }
153
154  @Override
155  public int getTaskCount() {
156    return tasks.size();
157  }
158
159  @Override
160  public int hashCode() {
161    return 31 + (uuid == null ? 0 : uuid.hashCode());
162  }
163
164  @Override
165  public boolean equals(final Object obj) {
166    if (this == obj) return true;
167    if (!(obj instanceof AbstractJPPFJob)) return false;
168    final AbstractJPPFJob other = (AbstractJPPFJob) obj;
169    return (uuid == null) ? other.uuid == null : uuid.equals(other.uuid);
170  }
171
172  /**
173   * Get the object that holds the results of executed tasks.
174   * @return a {@link JobResults} instance.
175   */
176  public JobResults getResults() {
177    return results;
178  }
179
180  /**
181   * Get the count of the tasks in this job that haven completed.
182   * @return the number of executed tasks in this job.
183   * @since 4.2
184   */
185  public int executedTaskCount() {
186    return results.size();
187  }
188
189  /**
190   * Get the count of the tasks in this job that haven't yet been executed.
191   * @return the number of unexecuted tasks in this job.
192   * @since 4.2
193   */
194  public int unexecutedTaskCount() {
195    return tasks.size() - results.size();
196  }
197
198  /**
199   * Get the container for data shared between tasks.
200   * @return a <code>DataProvider</code> instance.
201   */
202  public DataProvider getDataProvider() {
203    return dataProvider;
204  }
205
206  /**
207   * Set the container for data shared between tasks.
208   * @param dataProvider a <code>DataProvider</code> instance.
209   * @return this job, for method chaining.
210   */
211  public AbstractJPPFJob setDataProvider(final DataProvider dataProvider) {
212    this.dataProvider = dataProvider;
213    return this;
214  }
215
216  /**
217   * Determine whether the execution of this job is blocking on the client side.
218   * @return true if the execution is blocking, false otherwise.
219   */
220  public boolean isBlocking() {
221    return blocking;
222  }
223
224  /**
225   * Specify whether the execution of this job is blocking on the client side.
226   * @param blocking true if the execution is blocking, false otherwise.
227   * @return this job, for method chaining.
228   */
229  public AbstractJPPFJob setBlocking(final boolean blocking) {
230    this.blocking = blocking;
231    return this;
232  }
233
234  @Override
235  public JobSLA getSLA() {
236    return jobSLA;
237  }
238
239  /**
240   * Get the job SLA for the client side.
241   * @return an instance of <code>JobSLA</code>.
242   */
243  public JobClientSLA getClientSLA() {
244    return jobClientSLA;
245  }
246
247  @Override
248  public JobMetadata getMetadata() {
249    return jobMetadata;
250  }
251
252  /**
253   * Get the service level agreement between the job and the server.
254   * @param jobSLA an instance of <code>JobSLA</code>.
255   * @exclude
256   */
257  public void setSLA(final JobSLA jobSLA) {
258    this.jobSLA = jobSLA;
259  }
260
261  /**
262   * Get the service level agreement between the job and the server.
263   * @param jobClientSLA an instance of <code>JobSLA</code>.
264   * @exclude
265   */
266  public void setClientSLA(final JobClientSLA jobClientSLA) {
267    this.jobClientSLA = jobClientSLA;
268  }
269
270  /**
271   * Set this job's metadata.
272   * @param jobMetadata a {@link JobMetadata} instance.
273   * @exclude
274   */
275  public void setMetadata(final JobMetadata jobMetadata) {
276    this.jobMetadata = jobMetadata;
277  }
278
279  /**
280   * Resolve this instance after deserialization.
281   * @return an instance of {@link Object}.
282   * @exclude
283   */
284  protected Object readResolve() {
285    listeners = new LinkedList<>();
286    return this;
287  }
288
289  @Override
290  public String toString() {
291    final StringBuilder sb = new StringBuilder();
292    sb.append(getClass().getSimpleName()).append('[');
293    sb.append("name=").append(name);
294    sb.append(", uuid=").append(uuid);
295    sb.append(", blocking=").append(blocking);
296    sb.append(", nbTasks=").append(tasks.size());
297    sb.append(", nbResults=").append(results.size());
298    sb.append(", jobSLA=").append(jobSLA);
299    sb.append(']');
300    return sb.toString();
301  }
302
303  /**
304   * Get the flag that determines whether this job has been cancelled.
305   * @return an {@code AtomicBoolean} instance.
306   * @exclude
307   */
308  public AtomicBoolean getCancelledFlag() {
309    return cancelled;
310  }
311
312
313  @Override
314  public JobStatus getStatus() {
315    return status.get();
316  }
317
318  @Override
319  public void setStatus(final JobStatus newStatus) {
320    if (status.get() != newStatus) {
321      if (debugEnabled) log.debug("job [" + uuid + "] status changing from '" + this.status + "' to '" + newStatus + "'");
322      this.status.set(newStatus);
323      fireStatusChangeEvent(newStatus);
324    }
325  }
326
327  /**
328   * Add a listener to the list of status listeners.
329   * @param listener the listener to add.
330   * @exclude
331   */
332  public void addJobStatusListener(final JobStatusListener listener) {
333    synchronized(statusListeners) {
334      if (debugEnabled) log.debug("job [" + uuid + "] adding status listener " + listener);
335      if (listener != null) statusListeners.add(listener);
336    }
337  }
338
339  /**
340   * Remove a listener from the list of status listeners.
341   * @param listener the listener to remove.
342   * @exclude
343   */
344  public void removeJobStatusListener(final JobStatusListener listener) {
345    synchronized(statusListeners) {
346      if (debugEnabled) log.debug("job [" + uuid + "] removing status listener " + listener);
347      if (listener != null) statusListeners.remove(listener);
348    }
349  }
350
351  /**
352   * Notify all listeners of a change of status for this job.
353   * @param newStatus the status for job event.
354   * @exclude
355   */
356  protected void fireStatusChangeEvent(final JobStatus newStatus) {
357    synchronized(statusListeners) {
358      if (debugEnabled) log.debug("job [" + uuid + "] fire status changed event for '" + newStatus + "'");
359      if (!statusListeners.isEmpty()) {
360        final JobStatusEvent event = new JobStatusEvent(uuid, newStatus);
361        for (final JobStatusListener listener: statusListeners) listener.jobStatusChanged(event);
362      }
363    }
364    results.wakeUp();
365  }
366
367  /**
368   * Get the flag that determines whether this job is being cancelled.
369   * @return an {@code AtomicBoolean} instance.
370   * @exclude
371   */
372  public AtomicBoolean getCancellingFlag() {
373    return cancelling;
374  }
375
376  /**
377   * Save the state of the {@code AbstractJPPFJob} instance to a stream (i.e.,serialize it).
378   * @param out the output stream to which to write the job. 
379   * @throws IOException if any I/O error occurs.
380   */
381  @SuppressWarnings("static-method")
382  private void writeObject(final ObjectOutputStream out) throws IOException {
383    out.defaultWriteObject();
384  }
385
386  /**
387   * Reconstitute the {@code AbstractJPPFJob} instance from a stream (i.e., deserialize it).
388   * @param in the input stream from which to read the job. 
389   * @throws IOException if any I/O error occurs.
390   * @throws ClassNotFoundException if the class of an object in the object graph can not be found.
391   */
392  private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
393    in.defaultReadObject();
394    statusListeners = new ArrayList<>();
395    listeners = new CopyOnWriteArrayList<>();
396    cancelled = new AtomicBoolean(false);
397    cancelling = new AtomicBoolean(false);
398  }
399}