001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.job.persistence.impl;
020
021import java.io.*;
022import java.sql.*;
023import java.util.*;
024
025import org.jppf.job.persistence.*;
026import org.jppf.persistence.AbstractDatabasePersistence;
027import org.jppf.utils.*;
028import org.jppf.utils.configuration.JPPFProperties;
029import org.slf4j.*;
030
031/**
032 * A job persistence implementation which stores jobs in a single database table. The table has the following structure:<br>
033 *
034 * <pre class="jppf_pre">
035 * CREATE TABLE &lt;table_name&gt; (
036 *   UUID varchar(250) NOT NULL,
037 *   TYPE varchar(20) NOT NULL,
038 *   POSITION int NOT NULL,
039 *   CONTENT blob NOT NULL,
040 *   PRIMARY KEY (UUID, TYPE, POSITION)
041 * );</pre>
042 *
043 * <p>Where:
044 * <ul style="margin-top: 0px">
045 *   <li>the UUID column represents the job uuid</li>
046 *   <li>the TYPE column represents the type of object, taken from the {@link PersistenceObjectType} enum</li>
047 *   <li>the POSITION column represents the object's position in the job if {@code TYPE} is 'task' or 'task_result', otherwise -1</li>
048 *   <li>the CONTENT column represents the serialized object</li>
049 * </ul>
050 *
051 * <p>The table name is specified in the JPPF configuration like this:<br>
052 * {@code jppf.job.persistence = org.jppf.job.persistence.impl.DefaultDatabasePersistence <table_name>}<br>
053 * If unspecified, it defaults to the {@linkplain #DEFAULT_TABLE default table name} 'JOB_PERSISTENCE'.
054 * If the table does not exist, JPPF will attempt to create it. If this fails for any reason, for instance if the user does not have sufficient privileges,
055 * then persistence will be disabled.
056 *
057 * <p>This database persistence implementation uses a <a href="https://github.com/brettwooldridge/HikariCP">HikariCP</a> connection pool and datasource.
058 * The datasource is specified by name in the configuration:<br>
059 * {@code jppf.job.persistence = org.jppf.job.persistence.impl.DefaultDatabasePersistence <table_name> <datasource_name>}<br>
060 * If unspecified, it defaults to 'job_persistence'. The datasource properties <b>must</b> be defined in the JPPF configuration like so:
061 *
062 * <pre class="jppf_pre">
063 * jppf.datasource.&lt;configId&gt;.name = &lt;datasource_name&gt;
064 * jppf.datasource.&lt;configId&gt;.&lt;hikaricp_property&gt; = &lt;value&gt;</pre>
065 *
066 * <p>Where:
067 *   <ul style="margin-top: 0px">
068 *   <li>{@code configId} is used to distinguish the datasource properties when multiple datasources are defined</li>
069 *   <li>the datasource {@code name} is mandatory and is used to store and retrieve the datasource in a custom registry.
070 *       It is also the datasource name used in the configuration of this job persistence implementation</li>
 *   <li>{@code hikaricp_property} designates any valid <a href="https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby">HikariCP configuration property</a>.
072 *       Properties not supported by HikariCP are simply ignored</li>
073 * </ul>
074 *
075 * <p>Here is a full example configuration:
076 * <pre class="jppf_pre">
077 * <span style="color: green"># persistence definition</span>
078 * jppf.job.persistence = org.jppf.job.persistence.impl.DefaultDatabasePersistence MY_TABLE <b>jobDS</b>
079 *
080 * <span style="color: green"># datasource definition</span>
081 * jppf.datasource.jobs.name = <b>jobDS</b>
082 * jppf.datasource.jobs.driverClassName = com.mysql.jdbc.Driver
083 * jppf.datasource.jobs.jdbcUrl = jdbc:mysql://localhost:3306/testjppf
084 * jppf.datasource.jobs.username = testjppf
085 * jppf.datasource.jobs.password = testjppf
086 * jppf.datasource.jobs.minimumIdle = 5
087 * jppf.datasource.jobs.maximumPoolSize = 10
088 * jppf.datasource.jobs.connectionTimeout = 30000
089 * jppf.datasource.jobs.idleTimeout = 600000</pre>
090 * @author Laurent Cohen
091 */
092public class DefaultDatabasePersistence extends AbstractDatabasePersistence<PersistenceInfo> implements JobPersistence {
093  /**
094   * Logger for this class.
095   */
096  private static Logger log = LoggerFactory.getLogger(DefaultDatabasePersistence.class);
097  /**
098   * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
099   */
100  private static boolean debugEnabled = log.isDebugEnabled();
101  /**
102   * The default persistence table name.
103   */
104  protected static final String DEFAULT_TABLE = "JOB_PERSISTENCE";
105  /**
106   * The default persistence table name.
107   */
108  protected static final String DEFAULT_DATASOURCE = "job_persistence";
109  /**
110   * Whether to wrap input streams into buffered input streams.
111   */
112  private final boolean bufferStreams = JPPFConfiguration.getProperties().getBoolean("jppf.job.persistence.bufferStreams", true);
113
114  /**
115   * Intialize this persistence with the {@linkplain #DEFAULT_TABLE default table name}.
116   * @throws Exception if any error occurs.
117   */
118  public DefaultDatabasePersistence() throws Exception {
119    this(DEFAULT_TABLE, DEFAULT_DATASOURCE);
120  }
121
122  /**
123   * Intialize this persistence with a table name specified in the first string parmater.
124   * @param params if parameters are provided, they have this meaning:
125   * <ul style="margin-top: 0px">
126   * <li>params[0] is the table name, which defaults to 'JOB_PERSISTENCE'</li>
127   * <li>params[1] is the name of a datasource defined in the configuration, and defaults to 'job_persistence'</li>
128   * </ul>
129   * @throws Exception if any error occurs.
130   */
131  public DefaultDatabasePersistence(final String...params) throws Exception {
132    super(DEFAULT_TABLE, DEFAULT_DATASOURCE, JPPFProperties.JOB_PERSISTENCE_DDL_LOCATION, params);
133  }
134
135  @Override
136  public void store(final Collection<PersistenceInfo> infos) throws JobPersistenceException {
137    if (debugEnabled) log.debug("storing {}", infos);
138    try (Connection connection = dataSource.getConnection()) {
139      final boolean autocommit = connection.getAutoCommit();
140      final int isolation  = connection.getTransactionIsolation();
141      connection.setAutoCommit(false);
142      try {
143        connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
144        for (PersistenceInfo info: infos) storeElement(connection, info, null);
145        connection.commit();
146        if (debugEnabled) log.debug("commit done");
147      } catch(final Exception e) {
148        connection.rollback();
149        throw new JobPersistenceException(e);
150      } finally {
151        connection.setAutoCommit(autocommit);
152        connection.setTransactionIsolation(isolation);
153      }
154    } catch(final JobPersistenceException e) {
155      throw e;
156    } catch(final Exception e) {
157      throw new JobPersistenceException(e);
158    }
159  }
160
161  @Override
162  public List<InputStream> load(final Collection<PersistenceInfo> infos) throws JobPersistenceException {
163    if (debugEnabled) log.debug("loading {}", infos);
164    try (Connection connection = dataSource.getConnection()) {
165      final boolean autocommit = connection.getAutoCommit();
166      connection.setAutoCommit(false);
167      try {
168        final List<InputStream> result = new ArrayList<>(infos.size());
169        for (PersistenceInfo info: infos) {
170          try (PreparedStatement ps = prepareLoadStatement(connection, info)) {
171            try (ResultSet rs= ps.executeQuery()) {
172              if (rs.next()) result.add(getInputStream(rs.getBinaryStream(1)));
173            }
174          }
175        }
176        connection.commit();
177        return result;
178      } catch(final Exception e) {
179        connection.rollback();
180        throw e;
181      } finally {
182        connection.setAutoCommit(autocommit);
183      }
184    } catch(final JobPersistenceException e) {
185      throw e;
186    } catch(final Exception e) {
187      throw new JobPersistenceException(e);
188    }
189  }
190
191  @Override
192  public List<String> getPersistedJobUuids() throws JobPersistenceException {
193    try (Connection connection = dataSource.getConnection()) {
194      try (PreparedStatement ps = prepareGetAllUUidsStatement(connection)) {
195        try (ResultSet rs= ps.executeQuery()) {
196          final List<String> uuids = new ArrayList<>();
197          while (rs.next()) uuids.add(rs.getString(1));
198          if (debugEnabled) log.debug("uuids of persisted jobs: {}", uuids);
199          return uuids;
200        }
201      }
202    } catch(final JobPersistenceException e) {
203      throw e;
204    } catch(final Exception e) {
205      throw new JobPersistenceException(e);
206    }
207  }
208
209  @Override
210  public int[] getTaskPositions(final String jobUuid) throws JobPersistenceException {
211    return getPositions(jobUuid, PersistenceObjectType.TASK);
212  }
213
214  @Override
215  public int[] getTaskResultPositions(final String jobUuid) throws JobPersistenceException {
216    return getPositions(jobUuid, PersistenceObjectType.TASK_RESULT);
217  }
218
219  /**
220   * Get the  positions for all the objects of the specified type in the specified job.
221   * @param jobUuid the uuid of the job for which to get the positions.
222   * @param type the type of object for which to get the positions, one of {@link PersistenceObjectType#TASK TASK} or {@link PersistenceObjectType#TASK_RESULT TASK_RESULT}.
223   * @return an array of int holding the positions.
224   * @throws JobPersistenceException if any error occurs.
225   */
226  private int[] getPositions(final String jobUuid, final PersistenceObjectType type) throws JobPersistenceException {
227    try (Connection connection = dataSource.getConnection()) {
228      try (PreparedStatement ps = prepareGetPositionsStatement(connection, jobUuid, type)) {
229        try (ResultSet rs= ps.executeQuery()) {
230          final List<Integer> positions = new ArrayList<>();
231          while (rs.next()) positions.add(rs.getInt(1));
232          final int[] result = new int[positions.size()];
233          int i = 0;
234          for (Integer n: positions) result[i++] = n;
235          if (debugEnabled) log.debug("positions of {} for job uuid={} : {}", type, jobUuid, StringUtils.buildString(", ", "{", "}", result));
236          return result;
237        }
238      }
239    } catch(final JobPersistenceException e) {
240      throw e;
241    } catch(final Exception e) {
242      throw new JobPersistenceException(e);
243    }
244  }
245
246  @Override
247  public void deleteJob(final String jobUuid) throws JobPersistenceException {
248    if (debugEnabled) log.debug("deleting job with uuid = {}", jobUuid);
249    try (Connection connection = dataSource.getConnection();
250      PreparedStatement ps = prepareDeleteJobStatement(connection, jobUuid)) {
251      ps.executeUpdate();
252    } catch(final JobPersistenceException e) {
253      throw e;
254    } catch(final Exception e) {
255      throw new JobPersistenceException(e);
256    }
257  }
258
259  @Override
260  public boolean isJobPersisted(final String jobUuid) throws JobPersistenceException {
261    try (Connection connection = dataSource.getConnection()) {
262      try (final PreparedStatement ps = prepareJobHeaderCountStatement(connection, jobUuid)) {
263        try (final ResultSet rs= ps.executeQuery()) {
264          if (rs.next()) {
265            final int n = rs.getInt(1);
266            return n > 0;
267          }
268        }
269      }
270    } catch(final JobPersistenceException e) {
271      throw e;
272    } catch(final Exception e) {
273      throw new JobPersistenceException(e);
274    }
275    return false;
276  }
277
278  /** @exclude */
279  @Override
280  protected boolean lockForUpdate(final Connection connection, final PersistenceInfo info) throws Exception {
281    try (final PreparedStatement ps = connection.prepareStatement(getSQL("store.select.for.update"))) {
282      ps.setString(1, info.getJobUuid());
283      ps.setString(2, info.getType().name());
284      ps.setInt(3, info.getPosition());
285      try (ResultSet rs = ps.executeQuery()) {
286        return rs.next();
287      }
288    }
289  }
290
291  /** @exclude */
292  @Override
293  protected void insertElement(final Connection connection, final PersistenceInfo info, final byte[] bytes) throws Exception {
294    try (final PreparedStatement ps = connection.prepareStatement(getSQL("store.insert.sql"))) {
295      final InputStream is = getInputStream(info.getInputStream());
296      ps.setString(1, info.getJobUuid());
297      ps.setString(2, info.getType().name());
298      ps.setInt(3, info.getPosition());
299      ps.setBlob(4, is);
300      ps.executeUpdate();
301    }
302  }
303
304  /** @exclude */
305  @Override
306  protected void updateElement(final Connection connection, final PersistenceInfo info, final byte[] bytes) throws Exception {
307    try (PreparedStatement ps2 = connection.prepareStatement(getSQL("store.update.sql"))) {
308      final InputStream is = getInputStream(info.getInputStream());
309      ps2.setBlob(1, is);
310      ps2.setString(2, info.getJobUuid());
311      ps2.setString(3, info.getType().name());
312      ps2.setInt(4, info.getPosition());
313      ps2.executeUpdate();
314    }
315  }
316
317  /**
318   * Create a prepared statement which will insert or update an object n the database.
319   * @param connection the JDBC connection with which to create an dexecute the statement.
320   * @param info the information on the object to persist.
321   * @return a {@link PreparedStatement}.
322   * @throws Exception if any error occurs.
323   */
324  private PreparedStatement prepareLoadStatement(final Connection connection, final PersistenceInfo info) throws Exception {
325    final PreparedStatement ps = connection.prepareStatement(getSQL("load.sql"));
326    ps.setString(1, info.getJobUuid());
327    ps.setString(2, info.getType().name());
328    ps.setInt(3, info.getPosition());
329    return ps;
330  }
331
332  /**
333   * Create a prepared statement which obtain the positions of all tasks or task results for the specified job.
334   * @param connection the JDBC connection with which to create and execute the query.
335   * @param uuid the uuid of the job for which to get the positions.
336   * @param type the type of object for which to get the positions.
337   * @return a {@link PreparedStatement}.
338   * @throws Exception if any error occurs.
339   */
340  private PreparedStatement prepareGetPositionsStatement(final Connection connection, final String uuid, final PersistenceObjectType type) throws Exception {
341    final PreparedStatement ps = connection.prepareStatement(getSQL("get.positions.sql"));
342    ps.setString(1, uuid);
343    ps.setString(2, type.name());
344    return ps;
345  }
346
347  /**
348   * Create a prepared statement which obtain the positions of all tasks or task results for the specified job.
349   * @param connection the JDBC connection with which to create and execute the query.
350   * @return a {@link PreparedStatement}.
351   * @throws Exception if any error occurs.
352   */
353  private PreparedStatement prepareGetAllUUidsStatement(final Connection connection) throws Exception {
354    return connection.prepareStatement(getSQL("get.all.uuids.sql"));
355  }
356
357  /**
358   * Create a prepared statement which obtain the positions of all tasks or task results for the specified job.
359   * @param connection the JDBC connection with which to create and execute the statement.
360   * @param uuid the uuid of the job to delete.
361   * @return a {@link PreparedStatement}.
362   * @throws Exception if any error occurs.
363   */
364  private PreparedStatement prepareDeleteJobStatement(final Connection connection, final String uuid) throws Exception {
365    final PreparedStatement ps = connection.prepareStatement(getSQL("delete.job.sql"));
366    ps.setString(1, uuid);
367    return ps;
368  }
369
370  /**
371   * Create a prepared statement which counts the headers of the job with the specified uuid.
372   * @param connection the JDBC connection with which to create and execute the statement.
373   * @param uuid the uuid of the job to delete.
374   * @return a {@link PreparedStatement}.
375   * @throws Exception if any error occurs.
376   */
377  private PreparedStatement prepareJobHeaderCountStatement(final Connection connection, final String uuid) throws Exception {
378    final PreparedStatement ps = connection.prepareStatement(getSQL("exists.job.sql"));
379    ps.setString(1, uuid);
380    ps.setString(2, PersistenceObjectType.JOB_HEADER.name());
381    return ps;
382  }
383
384  /**
385   * Optionally wrap the specified stream into a buffered input stream, if {@link #bufferStreams} is {@link true}.
386   * @param is the stream to write.
387   * @return either {@code is} if {@link #bufferStreams} is {@link false}, or a {@link BufferedInputStream} wrapping it otherwise.
388   * @throws Exception if any error occurs.
389   */
390  private InputStream getInputStream(final InputStream is) throws Exception {
391    return !bufferStreams || (is instanceof BufferedInputStream) ? is : new BufferedInputStream(is);
392  }
393}