001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.utils.concurrent;
020
021import java.util.*;
022import java.util.concurrent.*;
023import java.util.concurrent.atomic.*;
024import java.util.concurrent.locks.*;
025
026import org.jppf.utils.*;
027import org.slf4j.Logger;
028
029/**
030 * A thread pool based on an ubounded queue which supports a core and maximum number of threads, along with a TTL for non-core threads.
031 * <p>Core threads are always live and are always prefered when available for new tasks.
032 * Non-core threads are created up to the maximum number of threads, after which tasks are simpy put into the queue.
033 * @author Laurent Cohen
034 */
035public class JPPFThreadPool extends AbstractExecutorService {
036  /**
037   * Logger for this class.
038   */
039  private static final Logger log = LoggingUtils.getLogger(JPPFThreadPool.class, false);
040  /**
041   * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
042   */
043  private static final boolean traceEnabled = log.isTraceEnabled();
044  /**
045   * The tasks queue.
046   */
047  private final BlockingQueue<Runnable> queue;
048  /**
049   * The thread factory.
050   */
051  private final ThreadFactory threadFactory;
052  /**
053   * The number of core threads.
054   */
055  private final int coreThreads;
056  /**
057   * The maximum number of threads.
058   */
059  private volatile int maxThreads = Integer.MAX_VALUE;
060  /**
061   * The thread time-to-live.
062   */
063  private volatile long ttl = Long.MAX_VALUE;
064  /**
065   * The live workers.
066   */
067  private final Map<Worker, Boolean> workers = new HashMap<>();
068  /**
069   * Synchronizes access to the workers map.
070   */
071  private final Lock workersLock = new ReentrantLock();
072  /**
073   * Synchronizes access to the workers map.
074   */
075  private final Lock mainLock = new ReentrantLock();
076  /**
077   * Generates sequential worker ids.
078   */
079  private final AtomicInteger workerIdSequence = new AtomicInteger(0);
080  /**
081   * Peak count of threads.
082   */
083  private final SynchronizedInteger peakThreadCount = new SynchronizedInteger(0);
084  /**
085   * Whether this executor has been shutdown.
086   */
087  private final AtomicBoolean shutdown = new AtomicBoolean(false);
088  /**
089   * Whether this executor has been shutdown immediately.
090   */
091  private volatile boolean immediateShutdown;
092  /**
093   * Whether this executor is terminated.
094   */
095  private final AtomicBoolean terminated = new AtomicBoolean(false);
096  /**
097   *
098   */
099  private final Stats stats = new Stats();
100  /**
101   * Current number of idle workers.
102   */
103  private int idleWorkers;
104  /**
105   * Current number of busy workers.
106   */
107  private int busyWorkers;
108
109  /**
110   * Create a fixed size thread pool with the specified number of threads, infinite thread time-to-live and a {@link Executors#defaultThreadFactory() default thread factory}.
111   * @param coreThreads the number of threads in the pool.
112   */
113  public JPPFThreadPool(final int coreThreads) {
114    this(coreThreads, coreThreads, Long.MAX_VALUE, Executors.defaultThreadFactory());
115  }
116
117  /**
118   * Initialize with the specified number of core threads and thread factory.
119   * @param coreThreads the number of core threads.
120   * @param threadFactory the thread factory.
121   */
122  public JPPFThreadPool(final int coreThreads, final ThreadFactory threadFactory) {
123    this(coreThreads, coreThreads, -1L, threadFactory);
124  }
125
126  /**
127   * Initialize with the specified number of core threads, maximum number of threads, thread ttl and a {@link Executors#defaultThreadFactory() default thread factory}.
128   * @param coreThreads the number of core threads.
129   * @param maxThreads the maximum number of threads.
130   * @param ttl the thread time-to-live in milliseconds.
131   */
132  public JPPFThreadPool(final int coreThreads, final int maxThreads, final long ttl) {
133    this(coreThreads, maxThreads, ttl, Executors.defaultThreadFactory());
134  }
135
136  /**
137   * Initialize with the specified number of core threads, maximum number of threads, ttl and thread factory.
138   * @param coreThreads the number of core threads.
139   * @param maxThreads the maximum number of threads.
140   * @param ttl the thread time-to-live in milliseconds.
141   * @param threadFactory the thread factory.
142   */
143  public JPPFThreadPool(final int coreThreads, final int maxThreads, final long ttl, final ThreadFactory threadFactory) {
144    this(coreThreads, maxThreads, ttl, threadFactory, new LinkedBlockingQueue<Runnable>());
145  }
146
147  /**
148   * Initialize with the specified number of core threads, maximum number of threads, ttl and thread factory.
149   * @param coreThreads the number of core threads.
150   * @param maxThreads the maximum number of threads.
151   * @param ttl the thread time-to-live in milliseconds.
152   * @param threadFactory the thread factory.
153   * @param queue the queue to use.
154   */
155  public JPPFThreadPool(final int coreThreads, final int maxThreads, final long ttl, final ThreadFactory threadFactory, final BlockingQueue<Runnable> queue) {
156    this.queue = queue;
157    this.coreThreads = coreThreads;
158    this.maxThreads = maxThreads;
159    this.ttl = ((ttl <= 0L) || (ttl == Long.MAX_VALUE) ? -1L : ttl);
160    this.threadFactory = threadFactory;
161    //for (int i=0; i<coreThreads; i++) new Worker(null);
162  }
163
164  /**
165   * @return the maximum number of threads.
166   */
167  public int getMaxThreads() {
168    return maxThreads;
169  }
170
171  /**
172   * Set the maximum number of threads.
173   * @param maxThreads the new maximum number of threads.
174   */
175  public void setMaxThreads(final int maxThreads) {
176    this.maxThreads = maxThreads;
177  }
178
179  /**
180   * @return the non-threads' time-to-live.
181   */
182  public long getTtl() {
183    return ttl;
184  }
185
186  /**
187   * Set the non-core threads' time-to-live.
188   * @param ttl the ttl in millis.
189   */
190  public void setTtl(final long ttl) {
191    this.ttl = ttl;
192  }
193
194  /**
195   * @return the peak thread count for this thread pool.
196   */
197  public int getPeakThreadCount() {
198    return peakThreadCount.get();
199  }
200
201  /**
202   * Execute the specified task some time in the future.
203   * @param task the task to execute.
204   */
205  @Override
206  public void execute(final Runnable task) {
207    if (task == null) throw new NullPointerException("the task cannot be null");
208    int count = 0;
209    for (;;) {
210      if (!queue.offer(task)) {
211        if (addWorker() || (++count >= 10)) {
212          new Worker(task);
213          break;
214        }
215      } else {
216        stats.queued.incrementAndGet();
217        break;
218      }
219    }
220    /*
221    if (addWorker()) new Worker(task);
222    else if (!queue.offer(task)) new Worker(task);
223    else stats.queued.incrementAndGet();
224    */
225    stats.submitted.incrementAndGet();
226    if (traceEnabled) log.trace("adding task {} to queue", task);
227  }
228
229  /**
230   * @return whether to start a new worker.
231   */
232  private boolean addWorker() {
233    final int idle, busy;
234    synchronized(mainLock) {
235      idle = idleWorkers;
236      busy = busyWorkers;
237    }
238    return (idle + busy < coreThreads) || ((idle <= 0) && (busy < maxThreads));
239  }
240
241  @Override
242  public void shutdown() {
243    if (shutdown.compareAndSet(false, true)) {
244      notifyWorkers(false);
245      if (traceEnabled) log.trace(String.format("shutdown requested: queue size = %,d; worker count = %,d", queue.size(), idleWorkers + busyWorkers));
246    }
247  }
248
249  @Override
250  public List<Runnable> shutdownNow() {
251    if (shutdown.compareAndSet(false, true)) {
252      immediateShutdown = true;
253      notifyWorkers(true);
254      if (traceEnabled) log.trace("immediate shutdown requested");
255      final List<Runnable> remainingTasks = new ArrayList<>(queue.size());
256      queue.drainTo(remainingTasks);
257      return remainingTasks;
258    }
259    return null;
260  }
261
262  /**
263   * Interrupt the remaining live workers.
264   * @param interrupt whether to interrupt the workers.
265   */
266  private void notifyWorkers(final boolean interrupt) {
267    synchronized(workersLock) {
268      for (final Map.Entry<Worker, Boolean> workerEntry: workers.entrySet()) {
269        final Worker worker = workerEntry.getKey();
270        worker.workerShutdown = true;
271        if (interrupt || worker.idle) worker.interrupt();
272      }
273    }
274  }
275
276  @Override
277  public boolean isShutdown() {
278    return shutdown.get();
279  }
280
281  @Override
282  public boolean isTerminated() {
283    if (!shutdown.get()) return false;
284    if (terminated.get()) return true;
285    final int size;
286    synchronized(workersLock) {
287      size = workers.size();
288    }
289    final boolean b = (size <= 0) && queue.isEmpty();
290    terminated.set(b);
291    return b;
292  }
293
294  @Override
295  public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
296    if (isTerminated()) return true;
297    final long millis = unit.toMillis(timeout);
298    final ConcurrentUtils.Condition condition = new ConcurrentUtils.Condition() {
299      @Override
300      public boolean evaluate() {
301        return isTerminated();
302      }
303    };
304    return ConcurrentUtils.awaitCondition(new ThreadSynchronization(), condition, millis, 10L);
305  }
306
307  @Override
308  public String toString() {
309    return new StringBuilder(getClass().getSimpleName()).append('[')
310      .append("coreThreads=").append(coreThreads)
311      .append(", maxThreads=").append(maxThreads)
312      .append(", ttl=").append(ttl)
313      .append(", peakThreads=").append(peakThreadCount.get())
314      .append(", stats={").append(stats).append('}')
315      .append(']').toString();
316  }
317
318  /**
319   * Instances of this class represent the worker threads in the pool.
320   */
321  private final class Worker implements Runnable, Comparable<Worker> {
322    /**
323     * The associated thread.
324     */
325    private final Thread thread;
326    /**
327     * This worker's id.
328     */
329    private final int id;
330    /**
331     * The state of this worker.
332     */
333    private boolean idle;
334    /**
335     * First task assigned to this worker at construction time, if any.
336     */
337    private Runnable firstTask;
338    /**
339     * Whether this worker was notified of an executor shutdown.
340     */
341    volatile boolean workerShutdown;
342    /**
343     * 
344     */
345    private boolean hasBusy;
346
347    /**
348     * Initialize this worker with its first task to execute.
349     * An associated thread is also created, using the thread pool's {@link ThreadFactory}.
350     * @param firstTask .
351     */
352    private Worker(final Runnable firstTask) {
353      id = workerIdSequence.incrementAndGet();
354      this.thread = threadFactory.newThread(this);
355      this.firstTask = firstTask;
356      this.idle = (firstTask == null);
357      final int size;
358      synchronized(workersLock) {
359        workers.put(this, Boolean.TRUE);
360        size = workers.size();
361      }
362      peakThreadCount.compareAndSet(Operator.LESS_THAN, size);
363      synchronized(mainLock) {
364        if (idle) idleWorkers++;
365        else busyWorkers++;
366        hasBusy = busyWorkers > 0;
367      }
368      thread.start();
369    }
370
371    @Override
372    public void run() {
373      Runnable task = firstTask;
374      firstTask = null;
375      while (!shouldStop()) {
376        if (task == null) {
377          try {
378            setIdle(true);
379            if (traceEnabled) log.trace("{} entering IDLE state", this);
380            final long timeout = ttl;
381            task = (timeout <= 0L) ? queue.take() : queue.poll(timeout, TimeUnit.MILLISECONDS);
382            if ((task == null) && (id > coreThreads)) break;
383          } catch (final InterruptedException e) {
384            if (traceEnabled) log.trace("terminating {} due to interrupt: {}", this, ExceptionUtils.getStackTrace(e));
385            break;
386          }
387        }
388        if (task != null) {
389          setIdle(false);
390          if (traceEnabled) log.trace("{} executing task {}", this, task);
391          try {
392            task.run();
393          } catch (final Exception e) {
394            if (traceEnabled) log.trace(String.format("%s caught exception while executing task %s:%n%s", this, task, ExceptionUtils.getStackTrace(e)));
395          } finally {
396            task = null;
397            stats.completed.incrementAndGet();
398          }
399        }
400      }
401      synchronized(workersLock) {
402        workers.remove(this);
403      }
404      synchronized(mainLock) {
405        if (idle) idleWorkers--;
406        else busyWorkers--;
407        hasBusy = busyWorkers > 0;
408      }
409      if (traceEnabled) log.trace("terminating {}", this);
410    }
411
412    /**
413     * @return whether this worker should stop processing tasks.
414     */
415    private boolean shouldStop() {
416      return immediateShutdown || (this.workerShutdown && queue.isEmpty() && hasBusy);
417      //return this.workerShutdown && (immediateShutdown || (queue.isEmpty() && !hasBusy));
418    }
419
420    /**
421     * Interrupt this worker.
422     */
423    private void interrupt() {
424      if ((thread != null) && thread.isAlive() && !thread.isInterrupted()) thread.interrupt();
425    }
426
427    /**
428     * Set the state of this worker.
429     * @param idle the new state.
430     */
431    private void setIdle(final boolean idle) {
432      if (this.idle == idle) return;
433      if (traceEnabled) log.trace(String.format("%s transitioning from %s to %s", this, this.idle, idle));
434      this.idle = idle;
435      updateWorkerCounts(this.idle ? -1 : 1);
436    }
437
438    /**
439     * Called when a worker changes state.
440     * @param update .
441     */
442    private void updateWorkerCounts(final int update) {
443      synchronized(mainLock) {
444        idleWorkers -= update;
445        busyWorkers += update;
446        hasBusy = busyWorkers > 0;
447      }
448    }
449
450    @Override
451    public String toString() {
452      return getClass().getSimpleName() + "[id=" + id + ']';
453    }
454
455    @Override
456    public int compareTo(final Worker other) {
457      //return id < other.id ? -1 : (id > other.id ? 1 : 0);
458      if (idle) return other.idle ? compare(id, other.id) : -1;
459      return other.idle ? 1 : compare(id, other.id); 
460    }
461
462    /**
463     * 
464     * @param i1 .
465     * @param i2 .
466     * @return .
467     */
468    private int compare(final int i1, final int i2) {
469      return i1 < i2 ? -1 : (i1 > i2 ? 1 : 0);
470    }
471  }
472
473  /**
474   * Statistic for the executor.
475   */
476  private static class Stats {
477    /**
478     * 
479     */
480    private AtomicInteger submitted = new AtomicInteger(), queued = new AtomicInteger(), completed = new AtomicInteger();
481
482    @Override
483    public String toString() {
484      return String.format("submitted: %,d, queued=%,d, completed: %,d", submitted.get(), queued.get(), completed.get());
485    }
486  }
487}