001/*
002 * JPPF.
003 * Copyright (C) 2005-2016 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.utils;
020
021import java.util.concurrent.*;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import org.jppf.JPPFTimeoutException;
025import org.slf4j.*;
026
027/**
028 * A set of utility methods to facilitate concurrent and multithreaded rpogramming.
029 * @author Laurent Cohen
030 * @since 5.0
031 */
032public final class ConcurrentUtils {
033  /**
034   * Logger for this class.
035   */
036  private static Logger log = LoggerFactory.getLogger(ConcurrentUtils.class);
037  /**
038   * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
039   */
040  private static boolean debugEnabled = log.isDebugEnabled();
041
042  /**
043   * Instantiation is not permitted.
044   */
045  private ConcurrentUtils() {
046  }
047
048  /**
049   * Wait until the specified condition is fulfilled, or the timeout expires, whichever happens first.
050   * The specified monitor may be notified at any time during the execution of this method, at which time it will check the condition again.
051   * @param monitor the monitor to wait for.
052   * @param condition the condition to check.
053   * @param millis the milliseconds part of the timeout. A value of zero means an infinite timeout.
054   * @return true if the condition is {@code null} or was fulfilled before the timeout expired, {@code false} otherwise.
055   * @throws IllegalArgumentException if the millis or nanos are negative, or if the nanos are greater than 999999.
056   */
057  public static boolean awaitCondition(final ThreadSynchronization monitor, final Condition condition, final long millis) throws IllegalArgumentException {
058    if (monitor == null) throw new IllegalArgumentException("monitor cannot be null");
059    if (condition == null) return true;
060    if (millis < 0L) throw new IllegalArgumentException("millis cannot be negative");
061    long timeout = (millis > 0L) ? millis : Long.MAX_VALUE;
062    boolean fulfilled = false;
063    long elapsed = 0L;
064    final long start = System.nanoTime();
065    while (!(fulfilled = condition.evaluate()) && ((elapsed = (System.nanoTime() - start) / 1_000_000L) < timeout)) {
066      monitor.goToSleep(1L);
067    }
068    return fulfilled;
069  }
070
071  /**
072   * Wait until the specified condition is fulfilled, or the timeout expires, whichever happens first.
073   * This method waits for 1 millisecond each time the condition check fails and until the condition is fulfilled or the timeout expires.
074   * @param condition the condition to check.
075   * @param millis the milliseconds part of the timeout. A value of zero means an infinite timeout.
076   * @return true if the condition is {@code null} or was fulfilled before the timeout expired, {@code false} otherwise.
077   * @throws IllegalArgumentException if the millis are negative.
078   */
079  public static boolean awaitCondition(final Condition condition, final long millis) throws IllegalArgumentException {
080    return awaitCondition(condition, millis, false);
081  }
082
083  /**
084   * Wait until the specified condition is fulfilled, or the timeout expires, whichever happens first.
085   * This method waits for 1 millisecond each time the condition check fails and until the condition is fulfilled or the timeout expires.
086   * @param condition the condition to check.
087   * @param millis the milliseconds part of the timeout. A value of zero means an infinite timeout.
088   * @param throwExceptionOnTImeout whether to raise an exception if the timeout expires.
089   * @return true if the condition is {@code null} or was fulfilled before the timeout expired, {@code false} otherwise.
090   * @throws IllegalArgumentException if the millis are negative.
091   * @throws JPPFTimeoutException if the timeout expires.
092   */
093  public static boolean awaitCondition(final Condition condition, final long millis, final boolean throwExceptionOnTImeout) throws IllegalArgumentException, JPPFTimeoutException {
094    if (condition == null) return true;
095    if (millis < 0L) throw new IllegalArgumentException("millis cannot be negative");
096    long timeout = millis > 0L ? millis : Long.MAX_VALUE;
097    long start = System.nanoTime();
098    ThreadSynchronization monitor = new ThreadSynchronization() { };
099    boolean fulfilled = false;
100    long elapsed = 0L;
101    while (!(fulfilled = condition.evaluate()) && ((elapsed = (System.nanoTime() - start) / 1_000_000L) < timeout)) {
102      monitor.goToSleep(1L);
103    }
104    if ((elapsed > timeout) && throwExceptionOnTImeout) throw new JPPFTimeoutException(String.format("exceeded timeout of %,d", timeout));
105    return fulfilled;
106  }
107
108  /**
109   * Wait until the specified condition is fulfilled, or the timeout expires, whichever happens first.
110   * This method waits for 1 millisecond each time the condition check fails and until the condition is fulfilled or the timeout expires.
111   * @param condition the condition to check.
112   * @param millis the milliseconds part of the timeout. A value of zero means an infinite timeout.
113   * @param throwExceptionOnTImeout whether to raise an exception if the timeout expires.
114   * @return true if the condition is {@code null} or was fulfilled before the timeout expired, {@code false} otherwise.
115   * @throws IllegalArgumentException if the millis are negative.
116   * @throws JPPFTimeoutException if the timeout expires.
117   */
118  public static boolean awaitInterruptibleCondition(final Condition condition, final long millis, final boolean throwExceptionOnTImeout)
119    throws IllegalArgumentException, JPPFTimeoutException {
120    if (condition == null) return true;
121    if (millis < 0L) throw new IllegalArgumentException("millis cannot be negative");
122    final long timeout = millis > 0L ? millis : Long.MAX_VALUE;
123    final CountDownLatch countDown = new CountDownLatch(1);
124    final ThreadSynchronization monitor = new ThreadSynchronization() { };
125    final AtomicBoolean fulfilled = new AtomicBoolean(false);
126    Runnable r = new Runnable() {
127      @Override
128      public void run() {
129        boolean ok = false;
130        synchronized(monitor) {
131          try {
132            while (!(ok = condition.evaluate())) monitor.wait(1L);
133          } catch (Exception e) {
134            ok = false;
135          }
136          fulfilled.set(ok);
137          countDown.countDown();
138        }
139      }
140    };
141    Thread thread = new Thread(r);
142    thread.start();
143    try {
144      countDown.await(timeout, TimeUnit.MILLISECONDS);
145    } catch (InterruptedException e) {
146      thread.interrupt();
147      countDown.countDown();
148    }
149    if (fulfilled.get()) return true;
150    if (throwExceptionOnTImeout) throw new JPPFTimeoutException(String.format("exceeded timeout of %,d", timeout));
151    return false;
152  }
153
154  /**
155   * Wait until the specified condition is fulfilled.
156   * @param condition the condition to check.
157   * @return true whenever the condition is {@code null} or gets fulfilled, {@code false} otherwise.
158   */
159  public static boolean awaitCondition(final Condition condition) {
160    return awaitCondition(condition, 0L);
161  }
162
163  /**
164   * Execute the specified {@code Callable} in a retry loop.
165   * @param <T> the return type of the {@code Callable}.
166   * @param maxTries the maximum number of attempts to execute the {@code Callable}.
167   * @param retryDelay the delay between two attempts.
168   * @param callable the {@code Callable} to execute.
169   * @return the return value of the first successful execution of the {@code Callable}.
170   * @throws Exception if after maxRetries the {@code Callable} threw an exception.
171   */
172  public static <T> T runWithRetry(final int maxTries, final long retryDelay, final Callable<T> callable) throws Exception {
173    if (maxTries <= 0) throw new IllegalArgumentException("maxTries bust be > 0");
174    if (retryDelay <= 0L) throw new IllegalArgumentException("retryDelay bust be > 0");
175    if (callable == null) return null;
176    int tryCount = 0;
177    Exception lastException = null;
178    while (tryCount < maxTries) {
179      try {
180        return callable.call();
181      } catch (Exception e) {
182        tryCount++;
183        lastException = e;
184        if (tryCount < maxTries) {
185          if (debugEnabled) log.debug(String.format("Got exception at attempt %d/%d, retrying in %d ms: %s", tryCount, maxTries, retryDelay, ExceptionUtils.getMessage(e)));
186          Thread.sleep(retryDelay);
187        }
188      }
189    }
190    throw lastException;
191  }
192
193  /**
194   * This interface represents a condition to evaluate to either {@code true} or {@code false}.
195   */
196  public static interface Condition {
197    /**
198     * Evaluate this condition.
199     * @return {@code true} if the condition is fulfilled, {@code false} otherwise.
200     */
201    boolean evaluate();
202  }
203}