001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.job.persistence.impl;
020
021import java.io.InputStream;
022import java.util.*;
023import java.util.concurrent.*;
024
025import org.jppf.job.persistence.*;
026import org.jppf.utils.*;
027import org.jppf.utils.concurrent.JPPFThreadFactory;
028import org.jppf.utils.configuration.JPPFProperties;
029import org.slf4j.*;
030
031/**
032 * An asynchronous wrapper for any other job persistence implementation. The methods of {@link JobPersistence} that do not return a result
033 * ({@code void} return type) are non-blocking and return immediately. All other methods will block until they are executed and their result is available.
034 * The execution of the interface's methods are delegated to a thread pool, whose size can be defined in the configuration or defaults to {@link Runtime#availableProcessors() Runtime.getRuntime().availableProcessors()}.
035 * <p>This asynchronous persistence can be configured in two forms:
036 * <pre class="jppf_pre">
037 * <span style="color: green"># shorten the configuration value for clarity</span>
038 * wrapper = org.jppf.job.persistence.impl.AsynchronousPersistence
039 * <span style="color: green"># asynchronous persistence with default thread pool size</span>
040 * jppf.job.persistence = ${wrapper} &lt;actual_persistence&gt; &lt;param1&gt; ... &lt;paramN&gt;
041 * <span style="color: green"># asynchronous persistence with a specified thread pool size</span>
042 * jppf.job.persistence = ${wrapper} &lt;pool_size&gt; &lt;actual_persistence&gt; &lt;param1&gt; ... &lt;paramN&gt;</pre>
043 * <p>Here is an example configuration for an asynchronous database persistence:
044 * <pre class="jppf_pre">
045 * pkg = org.jppf.job.persistence.impl
046 * <span style="color: green"># asynchronous database persistence with pool of 4 threads,</span>
047 * <span style="color: green"># a table named 'JPPF_TEST' and datasource named 'JobDS'</span>
048 * jppf.job.persistence = ${pkg}.AsynchronousPersistence 4 ${pkg}.DefaultDatabasePersistence JPPF_TEST JobDS</pre>
049 * @author Laurent Cohen
050 */
051public class AsynchronousPersistence implements JobPersistence {
052  /**
053   * Logger for this class.
054   */
055  private static Logger log = LoggerFactory.getLogger(AsynchronousPersistence.class);
056  /**
057   * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
058   */
059  private static boolean debugEnabled = log.isDebugEnabled();
060  /**
061   * The actual persistence implementation to which operations are delegated.
062   */
063  private final JobPersistence delegate;
064  /**
065   * Performs asycnhronous operations.
066   */
067  private final ExecutorService executor;
068  /**
069   * Percentage of used heap above which asynchronous mode is switched off, to prevent out of memory conditions.
070   * When the used heap passes below this threshold, the asynchronous mode resumes.
071   */
072  private static final double MEMORY_THRESHOLD = JPPFConfiguration.get(JPPFProperties.JOB_PERSISTENCE_MEMORY_THRESHOLD);
073
074  /**
075   * Initialize this persistence with the specified parameters.
076   * @param params if the first parameter is a number, then it represents the number of threads that perform the asynchronous processing, and the remaining parameters
077   * represent the wrapped persistence implementation. Otherwise it represents the wrapped persistence and the remaining parameters are those of the wrapped persistence. 
078   * @throws JobPersistenceException if any error occurs.
079   */
080  public AsynchronousPersistence(final String...params) throws JobPersistenceException {
081    if ((params == null) || (params.length < 1) || (params[0] == null)) throw new JobPersistenceException("too few parameters");
082    int n = 1;
083    String[] forwardParams = null;
084    try {
085      n = Integer.valueOf(params[0]);
086      forwardParams = new String[params.length - 1];
087      System.arraycopy(params, 1, forwardParams, 0, params.length - 1);
088    } catch (@SuppressWarnings("unused") final NumberFormatException e) {
089      forwardParams = params;
090    }
091    if (n < 1) n = 1;
092    this.delegate = ReflectionHelper.invokeDefaultOrStringArrayConstructor(JobPersistence.class, getClass().getSimpleName(), forwardParams);
093    if (delegate == null) throw new JobPersistenceException("could not create job persistence " + Arrays.asList(params));
094    executor = createExecutor(n);
095  }
096
097  @Override
098  public void store(final Collection<PersistenceInfo> infos) throws JobPersistenceException {
099    if (debugEnabled) log.debug("storing {}", infos);
100    if (SystemUtils.heapUsagePct() >= MEMORY_THRESHOLD) delegate.store(infos);
101    else {
102      execute(new PersistenceTask<Void>(false) {
103        @Override
104        public Void execute() throws JobPersistenceException {
105          delegate.store(infos);
106          return null;
107        }
108      });
109    }
110  }
111
112  @Override
113  public List<InputStream> load(final Collection<PersistenceInfo> infos) throws JobPersistenceException {
114    if (SystemUtils.heapUsagePct() >= MEMORY_THRESHOLD) return delegate.load(infos);
115    return submit(new PersistenceTask<List<InputStream>>(true) {
116      @Override
117      public List<InputStream> execute() throws JobPersistenceException {
118        return delegate.load(infos);
119      }
120    });
121  }
122
123  @Override
124  public List<String> getPersistedJobUuids() throws JobPersistenceException {
125    if (SystemUtils.heapUsagePct() >= MEMORY_THRESHOLD) return delegate.getPersistedJobUuids();
126    return submit(new PersistenceTask<List<String>>(true) {
127      @Override
128      public List<String> execute() throws JobPersistenceException {
129        return delegate.getPersistedJobUuids();
130      }
131    });
132  }
133
134  @Override
135  public int[] getTaskPositions(final String jobUuid) throws JobPersistenceException {
136    return getPositions(jobUuid, PersistenceObjectType.TASK);
137  }
138
139  @Override
140  public int[] getTaskResultPositions(final String jobUuid) throws JobPersistenceException {
141    return getPositions(jobUuid, PersistenceObjectType.TASK_RESULT);
142  }
143
144  /**
145   * Get the  positions for all the objects of the specified type in the specified job.
146   * @param jobUuid the uuid of the job for which to get the positions.
147   * @param type the type of object for which to get the positions, one of {@link PersistenceObjectType#TASK TASK} or {@link PersistenceObjectType#TASK_RESULT TASK_RESULT}.
148   * @return an array of int holding the positions.
149   * @throws JobPersistenceException if any error occurs.
150   */
151  private int[] getPositions(final String jobUuid, final PersistenceObjectType type) throws JobPersistenceException {
152    if (SystemUtils.heapUsagePct() >= MEMORY_THRESHOLD) return (type == PersistenceObjectType.TASK) ? delegate.getTaskPositions(jobUuid) : delegate.getTaskResultPositions(jobUuid);
153    return submit(new PersistenceTask<int[]>(true) {
154      @Override
155      public int[] execute() throws JobPersistenceException {
156        return (type == PersistenceObjectType.TASK) ? delegate.getTaskPositions(jobUuid) : delegate.getTaskResultPositions(jobUuid);
157      }
158    });
159  }
160
161  @Override
162  public void deleteJob(final String jobUuid) throws JobPersistenceException {
163    if (SystemUtils.heapUsagePct() >= MEMORY_THRESHOLD) delegate.deleteJob(jobUuid);
164    else execute(new PersistenceTask<Void>(false) {
165      @Override
166      public Void execute() throws JobPersistenceException {
167        delegate.deleteJob(jobUuid);
168        return null;
169      }
170    });
171  }
172
173  @Override
174  public boolean isJobPersisted(final String jobUuid) throws JobPersistenceException {
175    if (SystemUtils.heapUsagePct() >= MEMORY_THRESHOLD) return delegate.isJobPersisted(jobUuid);
176    return submit(new PersistenceTask<Boolean>(true) {
177      @Override
178      public Boolean execute() throws JobPersistenceException {
179        return delegate.isJobPersisted(jobUuid);
180      }
181    });
182  }
183
184  /**
185   * @param max the maximum thread pool size.
186   * @return an {@link ExecutorService}.
187   */
188  private static ExecutorService createExecutor(final int max) {
189    final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
190    return new ThreadPoolExecutor(1, max, 0L, TimeUnit.MILLISECONDS, queue, new JPPFThreadFactory("AsyncPersistence"));
191  }
192
193  /**
194   * Submit the specified persistence task for asynchronous execution and return the results once they are available.
195   * @param task the task to execute.
196   * @param <T> the type of result to return.
197   * @return the expected result form the task.
198   * @throws JobPersistenceException if any error occurs.
199   */
200  private <T> T submit(final PersistenceTask<T> task) throws JobPersistenceException {
201    try {
202      final Future<PersistenceTask<T>> f = executor.submit(task, task);
203      final PersistenceTask<T> t = f.get();
204      if (t.exception != null) throw t.exception;
205      if (debugEnabled) log.debug("got result = {}", t.result);
206      return t.result;
207    } catch (final ClassCastException e) {
208      log.error(e.getMessage(), e);
209      throw new JobPersistenceException(e);
210    } catch (final InterruptedException | ExecutionException e) {
211      throw new JobPersistenceException(e);
212    }
213  }
214
215  /**
216   * Submit the specified persistence task for execution some time in the future.
217   * @param task the task to execute.
218   * @param <T> the type of result the task returns.
219   */
220  private <T> void execute(final PersistenceTask<T> task) {
221    executor.execute(task);
222  }
223
224  /**
225   * A Runnable task that performs asynchronous delegation of a single operation
226   * of a concrete, sequential synchronous persistence implementation.
227   * @param <T> the type of result this task returns.
228   */
229  private static abstract class PersistenceTask<T> implements Runnable {
230    /**
231     * Logger for this class.
232     */
233    private static Logger logger = LoggerFactory.getLogger(AsynchronousPersistence.PersistenceTask.class);
234    /**
235     * The optional result of this task's execution.
236     */
237    T result;
238    /**
239     * An exception that may result from this task's execution.
240     */
241    JobPersistenceException exception;
242    /**
243     * Whether this task is expected to have a result.
244     */
245    private final boolean hasResult;
246
247    /**
248     * @param hasResult whether this task is expected to have a result.
249     */
250    private PersistenceTask(final boolean hasResult) {
251      this.hasResult = hasResult;
252    }
253
254    @Override
255    public void run() {
256      try {
257        result = execute();
258      } catch (final JobPersistenceException e) {
259        exception = e;
260        if (!hasResult) logger.error(e.getMessage(), e);
261      }
262    }
263
264    /**
265     * Execute the task.
266     * @return the execution result.
267     * @throws JobPersistenceException if any error occurs.
268     */
269    abstract T execute() throws JobPersistenceException;
270  }
271}