001/*
002 * JPPF.
003 * Copyright (C) 2005-2019 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client.concurrent;
020
021import java.util.*;
022import java.util.concurrent.*;
023import java.util.concurrent.atomic.*;
024
025import org.jppf.client.*;
026import org.jppf.client.event.*;
027import org.jppf.node.protocol.Task;
028import org.jppf.utils.*;
029import org.jppf.utils.concurrent.ThreadUtils;
030import org.slf4j.*;
031
032/**
033 * Implementation of an {@link ExecutorService} wrapper around a {@link JPPFClient}.
034 * <p>This executor has two modes in which it functions:
035 * <p>1) Standard mode: in this mode each task or set of tasks submitted via one of the
036 * <code>invokeXXX()</code> or <code>submit()</code> methods is sent immediately to the server in its own JPPF job.
037 * <p>2) Batch mode: the <code>JPPFExecutorService</code> can be configured to only send tasks to the server
038 * when a number of tasks, submitted via one of the <code>invokeXXX()</code> or <code>submit()</code> methods,
039 * has been reached, or when a timeout specified in milliseconds has expired, or a combination of both.<br/>
040 * This facility is designed to optimize the task execution throughput, especially when many individual tasks are submitted
041 * using one of the <code>submit()</code> methods. This way, the tasks are sent to the server as a single job,
042 * instead of one job per task, and the execution will fully benefit from the parallel features of the JPPF server, including
043 * scheduling, load-balancing and parallel I/O.
044 * <p>In batch mode, the following behavior is to be noted:
045 * <ul>
046 * <li>If both size-based and time-based batching are used, tasks will be sent whenever one of the two thresholds is reached.
047 * Whenever this happens, both counters are reset. For instance, if the size-based threshold is reached, then the time-based counter
048 * will be reset as well, and the timeout counting will start from 0 again</li>
049 * <li>When a collection of tasks is submitted via one of the <code>invokeXXX()</code> methods, they are guaranteed
050 * to be all sent together in the same JPPF job. This is the one exception to the batch size threshold.</li>
051 * <li>If one of the threshold is changed while tasks are still pending execution, the behavior is unspecified</li>
052 * </ul>
053 * @see org.jppf.client.concurrent.JPPFExecutorService#setBatchSize(int)
054 * @see org.jppf.client.concurrent.JPPFExecutorService#setBatchTimeout(long)
055 * @author Laurent Cohen
056 */
057public class JPPFExecutorService extends JobListenerAdapter implements ExecutorService {
058  /**
059   * Logger for this class.
060   */
061  private static Logger log = LoggerFactory.getLogger(JPPFExecutorService.class);
062  /**
063   * Determines whether debug-level logging is enabled.
064   */
065  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
066  /**
067   * Count of instances of this class, use to name the batch handler thread.
068   */
069  private static final AtomicInteger instanceCount = new AtomicInteger(0);
070  /**
071   * The {@link JPPFClient} to which tasks executions are delegated.
072   */
073  JPPFClient client;
074  /**
075   * Maintains a list of the jobs submitted by this executor.
076   */
077  private final Map<String, JPPFJob> jobMap = new Hashtable<>();
078  /**
079   * Determines whether a shutdown has been requested.
080   */
081  private AtomicBoolean shuttingDown = new AtomicBoolean(false);
082  /**
083   * Determines whether this executor has been terminated.
084   */
085  private AtomicBoolean terminated = new AtomicBoolean(false);
086  /**
087   * Handles the batching of tasks.
088   */
089  private BatchHandler batchHandler;
090
091  /**
092   * Initialize this executor service with the specified JPPF client.
093   * @param client the {@link JPPFClient} to use for job submission.
094   */
095  public JPPFExecutorService(final JPPFClient client) {
096    this(client, 0, 0L);
097  }
098
099  /**
100   * Initialize this executor service with the specified JPPF client, batch size and batch tiemout.
101   * @param client the {@link JPPFClient} to use for job submission.
102   * @param batchSize the minimum number of tasks that must be submitted before they are sent to the server.
103   * @param batchTimeout the maximum time to wait before the next batch of tasks is to be sent for execution.
104   */
105  public JPPFExecutorService(final JPPFClient client, final int batchSize, final long batchTimeout) {
106    if (debugEnabled) log.debug("new {} with batchSize={}, batchTimeout={}, client={}", getClass().getSimpleName(), batchSize, batchTimeout, client);
107    this.client = client;
108    batchHandler = new BatchHandler(this, batchSize, batchTimeout);
109    ThreadUtils.startThread(batchHandler, "BatchHandler-" + instanceCount.incrementAndGet());
110  }
111
112  /**
113   * Executes the given tasks, returning a list of Futures holding their status and results when all complete.
114   * @param <T> the type of results returned by the tasks.
115   * @param tasks the tasks to execute.
116   * @return a list of Futures representing the tasks, in the same sequential order as produced by the
117   * iterator for the given task list, each of which has completed.
118   * @throws InterruptedException if interrupted while waiting, in which case unfinished tasks are cancelled.
119   * @throws NullPointerException if tasks or any of its elements are null.
120   * @throws RejectedExecutionException if any task cannot be scheduled for execution.
121   */
122  @Override
123  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException {
124    return invokeAll(tasks, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
125  }
126
127  /**
128   * Executes the given tasks, returning a list of Futures holding their status and results
129   * when all complete or the timeout expires, whichever happens first.
130   * @param <T> the type of results returned by the tasks.
131   * @param tasks the tasks to execute.
132   * @param timeout the maximum time to wait.
133   * @param unit the time unit of the timeout argument.
134   * @return a list of Futures representing the tasks, in the same sequential order as produced by the
135   * iterator for the given task list, each of which has completed.
136   * @throws InterruptedException if interrupted while waiting, in which case unfinished tasks are cancelled.
137   * @throws NullPointerException if tasks or any of its elements are null.
138   * @throws RejectedExecutionException if any task cannot be scheduled for execution.
139   */
140  @Override
141  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException {
142    if (shuttingDown.get()) throw new RejectedExecutionException("Shutdown has already been requested");
143    if (timeout < 0) throw new IllegalArgumentException("timeout cannot be negative");
144    final long start = System.nanoTime();
145    final long millis = TimeUnit.MILLISECONDS.equals(unit) ? timeout : DateTimeUtils.toMillis(timeout, unit);
146    if (debugEnabled) log.debug("timeout in millis: " + millis);
147    final Pair<JPPFJob, Integer> pair = batchHandler.addTasks(tasks);
148    final JPPFJob job = pair.first();
149    int position = pair.second();
150    final List<Future<T>> futureList = new ArrayList<>(tasks.size());
151    for (final Callable<T> task : tasks) {
152      if (task == null) throw new NullPointerException("a task cannot be null");
153      final JPPFTaskFuture<T> future = new JPPFTaskFuture<>(job, position);
154      futureList.add(future);
155      final long elapsed = (System.nanoTime() - start) / 1_000_000L;
156      try {
157        future.getResult(millis - elapsed);
158      } catch (@SuppressWarnings("unused") final TimeoutException ignore) {
159      }
160      position++;
161    }
162    return futureList;
163  }
164
165  /**
166   * Ensure that all futures in the specified list that have not completed are marked as cancelled.
167   * @param <T> the type of results held by each future.
168   * @param futureList the list of futures to handle.
169   */
170  private static <T> void handleFutureList(final List<Future<T>> futureList) {
171    for (final Future<T> f : futureList) {
172      if (!f.isDone()) {
173        final JPPFTaskFuture<T> future = (JPPFTaskFuture<T>) f;
174        future.setDone();
175        future.setCancelled();
176      }
177    }
178  }
179
180  /**
181   * Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do.
182   * Upon normal or exceptional return, tasks that have not completed are cancelled.
183   * @param <T> the type of results returned by the tasks.
184   * @param tasks the tasks to execute.
185   * @return the result returned by one of the tasks.
186   * @throws InterruptedException if interrupted while waiting.
187   * @throws NullPointerException if tasks or any of its elements are null.
188   * @throws IllegalArgumentException if tasks empty.
189   * @throws ExecutionException if no task successfully completes.
190   * @throws RejectedExecutionException if tasks cannot be scheduled for execution.
191   */
192  @Override
193  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
194    try {
195      return invokeAny(tasks, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
196    } catch (@SuppressWarnings("unused") final TimeoutException e) {
197      return null;
198    }
199  }
200
201  /**
202   * Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception),
203   * if any do before the given timeout elapses. Upon normal or exceptional return, tasks that have not completed are cancelled.
204   * @param <T> the type of results returned by the tasks.
205   * @param tasks the tasks to execute.
206   * @param timeout the maximum time to wait.
207   * @param unit the time unit of the timeout argument.
208   * @return the result returned by one of the tasks.
209   * @throws InterruptedException if interrupted while waiting.
210   * @throws NullPointerException if tasks or any of its elements are null.
211   * @throws IllegalArgumentException if tasks empty.
212   * @throws ExecutionException if no task successfully completes.
213   * @throws RejectedExecutionException if tasks cannot be scheduled for execution.
214   * @throws TimeoutException if the given timeout elapses before any task successfully completes.
215   */
216  @Override
217  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
218    final List<Future<T>> futureList = invokeAll(tasks, timeout, unit);
219    handleFutureList(futureList);
220    for (final Future<T> f : futureList) {
221      if (f.isDone() && !f.isCancelled()) return f.get();
222    }
223    return null;
224  }
225
226  /**
227   * Submit a value-returning task for execution and returns a Future representing the pending results of the task.
228   * @param <T> the type of result returned by the task.
229   * @param task the task to execute.
230   * @return a Future representing pending completion of the task.
231   */
232  @Override
233  public <T> Future<T> submit(final Callable<T> task) {
234    if (shuttingDown.get()) throw new RejectedExecutionException("Shutdown has already been requested");
235    if (task instanceof Task<?>) return batchHandler.addTask((Task<?>) task, (T) null);
236    return batchHandler.addTask(task);
237  }
238
239  /**
240   * Submits a Runnable task for execution and returns a Future representing that task.
241   * @param task the task to execute.
242   * @see java.util.concurrent.ExecutorService#submit(java.lang.Runnable)
243   */
244  @Override
245  public Future<?> submit(final Runnable task) {
246    if (shuttingDown.get()) throw new RejectedExecutionException("Shutdown has already been requested");
247    if (task instanceof Task<?>) return batchHandler.addTask((Task<?>) task, (Object) null);
248    return batchHandler.addTask(task, (Object) null);
249  }
250
251  /**
252   * Submits a Runnable task for execution and returns a Future representing that task that will upon completion return the given result.
253   * @param <T> the type of result returned by the task.
254   * @param task the task to execute.
255   * @param result the result to return .
256   * @return a Future representing pending completion of the task, and whose get() method will return the given result upon completion.
257   */
258  @Override
259  public <T> Future<T> submit(final Runnable task, final T result) {
260    if (shuttingDown.get()) throw new RejectedExecutionException("Shutdown has already been requested");
261    if (task instanceof Task<?>) return batchHandler.addTask((Task<?>) task, result);
262    return batchHandler.addTask(task, result);
263  }
264
265  /**
266   * Executes the given command at some time in the future.
267   * The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.
268   * @param command the command to execute.
269   * @see java.util.concurrent.Executor#execute(java.lang.Runnable)
270   */
271  @Override
272  public void execute(final Runnable command) {
273    submit(command);
274  }
275
276  /**
277   * Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs,
278   * or the current thread is interrupted, whichever happens first.
279   * @param timeout the maximum time to wait.
280   * @param unit the time unit of the timeout argument.
281   * @return true if this executor terminated and false if the timeout elapsed before termination.
282   * @throws InterruptedException if interrupted while waiting.
283   */
284  @Override
285  public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
286    final long millis = DateTimeUtils.toMillis(timeout, unit);
287    waitForTerminated(millis);
288    return isTerminated();
289  }
290
291  /**
292   * Determine whether this executor has been shut down.
293   * @return true if this executor has been shut down, false otherwise.
294   */
295  @Override
296  public boolean isShutdown() {
297    return shuttingDown.get();
298  }
299
300  /**
301   * Determine whether all tasks have completed following shut down.
302   * Note that isTerminated is never true unless either shutdown or shutdownNow was called first.
303   * @return true if all tasks have completed following shut down.
304   */
305  @Override
306  public boolean isTerminated() {
307    return terminated.get();
308  }
309
310  /**
311   * Set the terminated status for this executor.
312   */
313  private void setTerminated() {
314    terminated.set(true);
315    synchronized (this) {
316      notifyAll();
317    }
318  }
319
320  /**
321   * Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
322   */
323  @Override
324  public void shutdown() {
325    shuttingDown.set(true);
326    synchronized (jobMap) {
327      if (debugEnabled) log.debug("normal shutdown requested, " + jobMap.size() + " jobs pending");
328      terminated.compareAndSet(false, jobMap.isEmpty());
329    }
330    batchHandler.close();
331  }
332
333  /**
334   * Attempts to stop all actively executing tasks, halts the processing of waiting tasks,
335   * and returns a list of the tasks that were awaiting execution.<br>
336   * This implementation simply waits for all submitted tasks to terminate, due to the complexity of stopping remote tasks.
337   * @return a list of tasks that never commenced execution.
338   */
339  @Override
340  public List<Runnable> shutdownNow() {
341    shuttingDown.set(true);
342    synchronized (jobMap) {
343      if (debugEnabled) log.debug("immediate shutdown requested, " + jobMap.size() + " jobs pending");
344      jobMap.clear();
345    }
346    setTerminated();
347    batchHandler.close();
348    waitForTerminated(Long.MAX_VALUE);
349    return null;
350  }
351
352  /**
353   * Submit the specified job for execution on the grid.
354   * @param job the job to submit.
355   * @throws Exception if any error occurs.
356   */
357  void submitJob(final JPPFJob job) throws Exception {
358    if (debugEnabled) log.debug("submitting job '" + job.getName() + "' with " + job.getJobTasks().size() + " tasks");
359    client.submitAsync(job);
360    synchronized (jobMap) {
361      jobMap.put(job.getUuid(), job);
362    }
363  }
364
365  /**
366   * Wait until this executor has terminated, or the specified timeout has expired, whichever happens first.
367   * @param timeout the maximum time to wait, zero means indefinite time.
368   */
369  private void waitForTerminated(final long timeout) {
370    long elapsed = 0L;
371    final long maxWait = timeout <= 0L ? Long.MAX_VALUE : timeout;
372    final long start = System.nanoTime();
373    while (!isTerminated() && (elapsed < maxWait)) {
374      synchronized (this) {
375        try {
376          wait(timeout - elapsed);
377        } catch (final InterruptedException e) {
378          log.error(e.getMessage(), e);
379        }
380        elapsed = (System.nanoTime() - start) / 1_000_000L;
381      }
382    }
383  }
384
385  /**
386   * Called when all results from a job have been received.
387   * @param event the event object.
388   * @exclude
389   */
390  @Override
391  public void jobReturned(final JobEvent event) {
392    final String jobUuid = event.getJob().getUuid();
393    synchronized (jobMap) {
394      jobMap.remove(jobUuid);
395      if (isShutdown() && jobMap.isEmpty()) setTerminated();
396    }
397  }
398
399  /**
400   * Get the minimum number of tasks that must be submitted before they are sent to the server.
401   * @return the batch size as an int.
402   */
403  public int getBatchSize() {
404    return batchHandler.getBatchSize();
405  }
406
407  /**
408   * Set the minimum number of tasks that must be submitted before they are sent to the server.
409   * @param batchSize the batch size as an int.
410   * @return this executor service, for method chaining.
411   */
412  public JPPFExecutorService setBatchSize(final int batchSize) {
413    if (debugEnabled) log.debug("setting batchSize = {}", batchSize);
414    batchHandler.setBatchSize(batchSize);
415    return this;
416  }
417
418  /**
419   * Get the maximum time to wait before the next batch of tasks is to be sent for execution.
420   * @return the timeout as a long.
421   */
422  public long getBatchTimeout() {
423    return batchHandler.getBatchTimeout();
424  }
425
426  /**
427   * Set the maximum time to wait before the next batch of tasks is to be sent for execution.
428   * @param batchTimeout the timeout as a long.
429   * @return this executor service, for method chaining.
430   */
431  public JPPFExecutorService setBatchTimeout(final long batchTimeout) {
432    if (debugEnabled) log.debug("setting batchTimeout = {}", batchTimeout);
433    batchHandler.setBatchTimeout(batchTimeout);
434    return this;
435  }
436
437  /**
438   * Get the configuration for this executor service.
439   * @return an {@link ExecutorServiceConfiguration} instance.
440   */
441  public ExecutorServiceConfiguration getConfiguration() {
442    return batchHandler.getConfig();
443  }
444
445  /**
446   * Reset the configuration for this executor service to a blank state.
447   * @return an {@link ExecutorServiceConfiguration} instance.
448   */
449  public ExecutorServiceConfiguration resetConfiguration() {
450    return batchHandler.resetConfig();
451  }
452}