001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client.concurrent;
020
021import java.util.*;
022import java.util.concurrent.*;
023import java.util.concurrent.atomic.*;
024
025import org.jppf.client.*;
026import org.jppf.client.event.*;
027import org.jppf.node.protocol.Task;
028import org.jppf.utils.*;
029import org.jppf.utils.concurrent.ThreadUtils;
030import org.slf4j.*;
031
032/**
033 * Implementation of an {@link ExecutorService} wrapper around a {@link JPPFClient}.
034 * <p>This executor has two modes in which it functions:
035 * <p>1) Standard mode: in this mode each task or set of tasks submitted via one of the
036 * <code>invokeXXX()</code> or <code>submit()</code> methods is sent immediately to the server in its own JPPF job.
037 * <p>2) Batch mode: the <code>JPPFExecutorService</code> can be configured to only send tasks to the server
038 * when a number of tasks, submitted via one of the <code>invokeXXX()</code> or <code>submit()</code> methods,
039 * has been reached, or when a timeout specified in milliseconds has expired, or a combination of both.<br/>
040 * This facility is designed to optimize the task execution throughput, especially when many individual tasks are submitted
041 * using one of the <code>submit()</code> methods. This way, the tasks are sent to the server as a single job,
042 * instead of one job per task, and the execution will fully benefit from the parallel features of the JPPF server, including
043 * scheduling, load-balancing and parallel I/O.
044 * <p>In batch mode, the following behavior is to be noted:
045 * <ul>
046 * <li>If both size-based and time-based batching are used, tasks will be sent whenever one of the two thresholds is reached.
047 * Whenever this happens, both counters are reset. For instance, if the size-based threshold is reached, then the time-based counter
048 * will be reset as well, and the timeout counting will start from 0 again</li>
049 * <li>When a collection of tasks is submitted via one of the <code>invokeXXX()</code> methods, they are guaranteed
050 * to be all sent together in the same JPPF job. This is the one exception to the batch size threshold.</li>
051 * <li>If one of the threshold is changed while tasks are still pending execution, the behavior is unspecified</li>
052 * </ul>
053 * @see org.jppf.client.concurrent.JPPFExecutorService#setBatchSize(int)
054 * @see org.jppf.client.concurrent.JPPFExecutorService#setBatchTimeout(long)
055 * @author Laurent Cohen
056 */
057public class JPPFExecutorService extends JobListenerAdapter implements ExecutorService {
058  /**
059   * Logger for this class.
060   */
061  private static Logger log = LoggerFactory.getLogger(JPPFExecutorService.class);
062  /**
063   * Determines whether debug-level logging is enabled.
064   */
065  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
066  /**
067   * Count of instances of this class, use to name the batch handler thread.
068   */
069  private static final AtomicInteger instanceCount = new AtomicInteger(0);
070  /**
071   * The {@link JPPFClient} to which tasks executions are delegated.
072   */
073  JPPFClient client = null;
074  /**
075   * Maintains a list of the jobs submitted by this executor.
076   */
077  private final Map<String, JPPFJob> jobMap = new Hashtable<>();
078  /**
079   * Determines whether a shutdown has been requested.
080   */
081  private AtomicBoolean shuttingDown = new AtomicBoolean(false);
082  /**
083   * Determines whether this executor has been terminated.
084   */
085  private AtomicBoolean terminated = new AtomicBoolean(false);
086  /**
087   * Handles the batching of tasks.
088   */
089  private BatchHandler batchHandler = null;
090
091  /**
092   * Initialize this executor service with the specified JPPF client.
093   * @param client the {@link JPPFClient} to use for job submission.
094   */
095  public JPPFExecutorService(final JPPFClient client) {
096    this(client, 0, 0L);
097  }
098
099  /**
100   * Initialize this executor service with the specified JPPF client, batch size and batch tiemout.
101   * @param client the {@link JPPFClient} to use for job submission.
102   * @param batchSize the minimum number of tasks that must be submitted before they are sent to the server.
103   * @param batchTimeout the maximum time to wait before the next batch of tasks is to be sent for execution.
104   */
105  public JPPFExecutorService(final JPPFClient client, final int batchSize, final long batchTimeout) {
106    if (debugEnabled) log.debug("new {} with batchSize={}, batchTimeout={}, client={}", getClass().getSimpleName(), batchSize, batchTimeout, client);
107    this.client = client;
108    batchHandler = new BatchHandler(this, batchSize, batchTimeout);
109    ThreadUtils.startThread(batchHandler, "BatchHandler-" + instanceCount.incrementAndGet());
110  }
111
112  /**
113   * Executes the given tasks, returning a list of Futures holding their status and results when all complete.
114   * @param <T> the type of results returned by the tasks.
115   * @param tasks the tasks to execute.
116   * @return a list of Futures representing the tasks, in the same sequential order as produced by the
117   * iterator for the given task list, each of which has completed.
118   * @throws InterruptedException if interrupted while waiting, in which case unfinished tasks are cancelled.
119   * @throws NullPointerException if tasks or any of its elements are null.
120   * @throws RejectedExecutionException if any task cannot be scheduled for execution.
121   * @see java.util.concurrent.ExecutorService#invokeAll(java.util.Collection)
122   */
123  @Override
124  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException {
125    return invokeAll(tasks, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
126  }
127
128  /**
129   * Executes the given tasks, returning a list of Futures holding their status and results
130   * when all complete or the timeout expires, whichever happens first.
131   * @param <T> the type of results returned by the tasks.
132   * @param tasks the tasks to execute.
133   * @param timeout the maximum time to wait.
134   * @param unit the time unit of the timeout argument.
135   * @return a list of Futures representing the tasks, in the same sequential order as produced by the
136   * iterator for the given task list, each of which has completed.
137   * @throws InterruptedException if interrupted while waiting, in which case unfinished tasks are cancelled.
138   * @throws NullPointerException if tasks or any of its elements are null.
139   * @throws RejectedExecutionException if any task cannot be scheduled for execution.
140   * @see java.util.concurrent.ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit)
141   */
142  @Override
143  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException {
144    if (shuttingDown.get()) throw new RejectedExecutionException("Shutdown has already been requested");
145    if (timeout < 0) throw new IllegalArgumentException("timeout cannot be negative");
146    final long start = System.nanoTime();
147    final long millis = TimeUnit.MILLISECONDS.equals(unit) ? timeout : DateTimeUtils.toMillis(timeout, unit);
148    if (debugEnabled) log.debug("timeout in millis: " + millis);
149    final Pair<JPPFJob, Integer> pair = batchHandler.addTasks(tasks);
150    final JPPFJob job = pair.first();
151    int position = pair.second();
152    final List<Future<T>> futureList = new ArrayList<>(tasks.size());
153    for (final Callable<T> task : tasks) {
154      if (task == null) throw new NullPointerException("a task cannot be null");
155      final JPPFTaskFuture<T> future = new JPPFTaskFuture<>(job, position);
156      futureList.add(future);
157      final long elapsed = (System.nanoTime() - start) / 1_000_000L;
158      try {
159        future.getResult(millis - elapsed);
160      } catch (@SuppressWarnings("unused") final TimeoutException ignore) {
161      }
162      position++;
163    }
164    return futureList;
165  }
166
167  /**
168   * Ensure that all futures in the specified list that have not completed are marked as cancelled.
169   * @param <T> the type of results held by each future.
170   * @param futureList the list of futures to handle.
171   */
172  private static <T> void handleFutureList(final List<Future<T>> futureList) {
173    for (final Future<T> f : futureList) {
174      if (!f.isDone()) {
175        final JPPFTaskFuture<T> future = (JPPFTaskFuture<T>) f;
176        future.setDone();
177        future.setCancelled();
178      }
179    }
180  }
181
182  /**
183   * Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do.
184   * Upon normal or exceptional return, tasks that have not completed are cancelled.
185   * @param <T> the type of results returned by the tasks.
186   * @param tasks the tasks to execute.
187   * @return the result returned by one of the tasks.
188   * @throws InterruptedException if interrupted while waiting.
189   * @throws NullPointerException if tasks or any of its elements are null.
190   * @throws IllegalArgumentException if tasks empty.
191   * @throws ExecutionException if no task successfully completes.
192   * @throws RejectedExecutionException if tasks cannot be scheduled for execution.
193   * @see java.util.concurrent.ExecutorService#invokeAny(java.util.Collection)
194   */
195  @Override
196  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
197    try {
198      return invokeAny(tasks, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
199    } catch (@SuppressWarnings("unused") final TimeoutException e) {
200      return null;
201    }
202  }
203
204  /**
205   * Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception),
206   * if any do before the given timeout elapses. Upon normal or exceptional return, tasks that have not completed are cancelled.
207   * @param <T> the type of results returned by the tasks.
208   * @param tasks the tasks to execute.
209   * @param timeout the maximum time to wait.
210   * @param unit the time unit of the timeout argument.
211   * @return the result returned by one of the tasks.
212   * @throws InterruptedException if interrupted while waiting.
213   * @throws NullPointerException if tasks or any of its elements are null.
214   * @throws IllegalArgumentException if tasks empty.
215   * @throws ExecutionException if no task successfully completes.
216   * @throws RejectedExecutionException if tasks cannot be scheduled for execution.
217   * @throws TimeoutException if the given timeout elapses before any task successfully completes.
218   * @see java.util.concurrent.ExecutorService#invokeAny(java.util.Collection)
219   * @see java.util.concurrent.ExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit)
220   */
221  @Override
222  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
223    final List<Future<T>> futureList = invokeAll(tasks, timeout, unit);
224    handleFutureList(futureList);
225    for (final Future<T> f : futureList) {
226      if (f.isDone() && !f.isCancelled()) return f.get();
227    }
228    return null;
229  }
230
231  /**
232   * Submit a value-returning task for execution and returns a Future representing the pending results of the task.
233   * @param <T> the type of result returned by the task.
234   * @param task the task to execute.
235   * @return a Future representing pending completion of the task.
236   * @see java.util.concurrent.ExecutorService#submit(java.util.concurrent.Callable)
237   */
238  @Override
239  public <T> Future<T> submit(final Callable<T> task) {
240    if (shuttingDown.get()) throw new RejectedExecutionException("Shutdown has already been requested");
241    if (task instanceof Task<?>) return batchHandler.addTask((Task<?>) task, (T) null);
242    return batchHandler.addTask(task);
243  }
244
245  /**
246   * Submits a Runnable task for execution and returns a Future representing that task.
247   * @param task the task to execute.
248   * @return a future representing the status of the task completion.
249   * @see java.util.concurrent.ExecutorService#submit(java.lang.Runnable)
250   */
251  @Override
252  public Future<?> submit(final Runnable task) {
253    if (shuttingDown.get()) throw new RejectedExecutionException("Shutdown has already been requested");
254    if (task instanceof Task<?>) return batchHandler.addTask((Task<?>) task, (Object) null);
255    return batchHandler.addTask(task, (Object) null);
256  }
257
258  /**
259   * Submits a Runnable task for execution and returns a Future representing that task that will upon completion return the given result.
260   * @param <T> the type of result returned by the task.
261   * @param task the task to execute.
262   * @param result the result to return .
263   * @return a Future representing pending completion of the task, and whose get() method will return the given result upon completion.
264   * @see java.util.concurrent.ExecutorService#submit(java.lang.Runnable, java.lang.Object)
265   */
266  @Override
267  public <T> Future<T> submit(final Runnable task, final T result) {
268    if (shuttingDown.get()) throw new RejectedExecutionException("Shutdown has already been requested");
269    if (task instanceof Task<?>) return batchHandler.addTask((Task<?>) task, result);
270    return batchHandler.addTask(task, result);
271  }
272
273  /**
274   * Executes the given command at some time in the future.
275   * The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.
276   * @param command the command to execute.
277   * @see java.util.concurrent.Executor#execute(java.lang.Runnable)
278   */
279  @Override
280  public void execute(final Runnable command) {
281    submit(command);
282  }
283
284  /**
285   * Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs,
286   * or the current thread is interrupted, whichever happens first.
287   * @param timeout the maximum time to wait.
288   * @param unit the time unit of the timeout argument.
289   * @return true if this executor terminated and false if the timeout elapsed before termination.
290   * @throws InterruptedException if interrupted while waiting.
291   * @see java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
292   */
293  @Override
294  public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
295    final long millis = DateTimeUtils.toMillis(timeout, unit);
296    waitForTerminated(millis);
297    return isTerminated();
298  }
299
300  /**
301   * Determine whether this executor has been shut down.
302   * @return true if this executor has been shut down, false otherwise.
303   * @see java.util.concurrent.ExecutorService#isShutdown()
304   */
305  @Override
306  public boolean isShutdown() {
307    return shuttingDown.get();
308  }
309
310  /**
311   * Determine whether all tasks have completed following shut down.
312   * Note that isTerminated is never true unless either shutdown or shutdownNow was called first.
313   * @return true if all tasks have completed following shut down.
314   * @see java.util.concurrent.ExecutorService#isTerminated()
315   */
316  @Override
317  public boolean isTerminated() {
318    return terminated.get();
319  }
320
321  /**
322   * Set the terminated status for this executor.
323   * @see java.util.concurrent.ExecutorService#isTerminated()
324   */
325  private void setTerminated() {
326    terminated.set(true);
327    synchronized (this) {
328      notifyAll();
329    }
330  }
331
332  /**
333   * Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
334   * @see java.util.concurrent.ExecutorService#shutdown()
335   */
336  @Override
337  public void shutdown() {
338    shuttingDown.set(true);
339    synchronized (jobMap) {
340      if (debugEnabled) log.debug("normal shutdown requested, " + jobMap.size() + " jobs pending");
341      terminated.compareAndSet(false, jobMap.isEmpty());
342    }
343    batchHandler.close();
344  }
345
346  /**
347   * Attempts to stop all actively executing tasks, halts the processing of waiting tasks,
348   * and returns a list of the tasks that were awaiting execution.<br>
349   * This implementation simply waits for all submitted tasks to terminate, due to the complexity of stopping remote tasks.
350   * @return a list of tasks that never commenced execution.
351   * @see java.util.concurrent.ExecutorService#shutdownNow()
352   */
353  @Override
354  public List<Runnable> shutdownNow() {
355    shuttingDown.set(true);
356    synchronized (jobMap) {
357      if (debugEnabled) log.debug("immediate shutdown requested, " + jobMap.size() + " jobs pending");
358      jobMap.clear();
359    }
360    setTerminated();
361    batchHandler.close();
362    waitForTerminated(Long.MAX_VALUE);
363    return null;
364  }
365
366  /**
367   * Submit the specified job for execution on the grid.
368   * @param job the job to submit.
369   * @throws Exception if any error occurs.
370   */
371  void submitJob(final JPPFJob job) throws Exception {
372    if (debugEnabled) log.debug("submitting job '" + job.getName() + "' with " + job.getJobTasks().size() + " tasks");
373    client.submitJob(job);
374    synchronized (jobMap) {
375      jobMap.put(job.getUuid(), job);
376    }
377  }
378
379  /**
380   * Wait until this executor has terminated, or the specified timeout has expired, whichever happens first.
381   * @param timeout the maximum time to wait, zero means indefinite time.
382   */
383  private void waitForTerminated(final long timeout) {
384    long elapsed = 0L;
385    final long maxWait = timeout <= 0L ? Long.MAX_VALUE : timeout;
386    final long start = System.nanoTime();
387    while (!isTerminated() && (elapsed < maxWait)) {
388      synchronized (this) {
389        try {
390          wait(timeout - elapsed);
391        } catch (final InterruptedException e) {
392          log.error(e.getMessage(), e);
393        }
394        elapsed = (System.nanoTime() - start) / 1_000_000L;
395      }
396    }
397  }
398
399  /**
400   * Called when all results from a job have been received.
401   * @param event the event object.
402   * @see org.jppf.client.concurrent.FutureResultCollectorListener#resultsComplete(org.jppf.client.concurrent.FutureResultCollectorEvent)
403   * @exclude
404   */
405  @Override
406  public void jobReturned(final JobEvent event) {
407    final String jobUuid = event.getJob().getUuid();
408    synchronized (jobMap) {
409      jobMap.remove(jobUuid);
410      if (isShutdown() && jobMap.isEmpty()) setTerminated();
411    }
412  }
413
414  /**
415   * Get the minimum number of tasks that must be submitted before they are sent to the server.
416   * @return the batch size as an int.
417   */
418  public int getBatchSize() {
419    return batchHandler.getBatchSize();
420  }
421
422  /**
423   * Set the minimum number of tasks that must be submitted before they are sent to the server.
424   * @param batchSize the batch size as an int.
425   * @return this executor service, for method chaining.
426   */
427  public JPPFExecutorService setBatchSize(final int batchSize) {
428    if (debugEnabled) log.debug("setting batchSize = {}", batchSize);
429    batchHandler.setBatchSize(batchSize);
430    return this;
431  }
432
433  /**
434   * Get the maximum time to wait before the next batch of tasks is to be sent for execution.
435   * @return the timeout as a long.
436   */
437  public long getBatchTimeout() {
438    return batchHandler.getBatchTimeout();
439  }
440
441  /**
442   * Set the maximum time to wait before the next batch of tasks is to be sent for execution.
443   * @param batchTimeout the timeout as a long.
444   * @return this executor service, for method chaining.
445   */
446  public JPPFExecutorService setBatchTimeout(final long batchTimeout) {
447    if (debugEnabled) log.debug("setting batchTimeout = {}", batchTimeout);
448    batchHandler.setBatchTimeout(batchTimeout);
449    return this;
450  }
451
452  /**
453   * Get the configuration for this executor service.
454   * @return an {@link ExecutorServiceConfiguration} instance.
455   */
456  public ExecutorServiceConfiguration getConfiguration() {
457    return batchHandler.getConfig();
458  }
459
460  /**
461   * Reset the configuration for this executor service to a blank state.
462   * @return an {@link ExecutorServiceConfiguration} instance.
463   */
464  public ExecutorServiceConfiguration resetConfiguration() {
465    return batchHandler.resetConfig();
466  }
467}