001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.load.balancer.persistence;
020
021import java.io.*;
022import java.sql.*;
023import java.util.*;
024
025import org.jppf.persistence.AbstractDatabasePersistence;
026import org.jppf.serialization.JPPFSerializationHelper;
027import org.jppf.utils.configuration.JPPFProperties;
028import org.slf4j.*;
029
030/**
031 * A job persistence implementation which stores jobs in a single database table. The table has the following structure:<br>
032 *
033 * <pre class="jppf_pre">
034 * CREATE TABLE &lt;table_name&gt; (
035 *   NODEID varchar(250) NOT NULL,
036 *   ALGORITHMID varchar(250) NOT NULL,
037 *   STATE blob NOT NULL,
038 *   PRIMARY KEY (NODEID, ALGORITHMID)
039 * );</pre>
040 *
041 * <p>Where:
042 * <ul style="margin-top: 0px">
043 *   <li>the NODEID column represents a hash of a string concatenated from various properties of the node. This id is unique for each node
044 *   and resilient over node restarts, contrary to the node uuid, which is recreated each time a node starts</li>
045 *   <li>the ALGORITHMID column is a hash of the load-balancer's algorithm name.</li>
046 *   <li>the STATE column represents the serialized state of the load-balancer, such as provided by {@link PersistentState#getState()}</li>
047 * </ul>
048 *
049 * <p>The table name is specified in the JPPF configuration like this:<br>
050 * <pre class="jppf_pre">
051 * lb.pkg = org.jppf.load.balancer.persistence
052 * jppf.load.balancing.persistence = ${lb.pkg}.DatabaseLoadBalancerPersistence &lt;table_name&gt;</pre>
053 * <br>If unspecified, it defaults to the {@linkplain #DEFAULT_TABLE default table name} 'load_balancer'.
054 * If the table does not exist, JPPF will attempt to create it. If this fails for any reason, for instance if the user does not have sufficient privileges,
055 * then persistence will be disabled.
056 *
057 * <p>This database persistence implementation uses a <a href="https://github.com/brettwooldridge/HikariCP">HikariCP</a> connection pool and datasource.
058 * The datasource is specified by name in the configuration:<br>
059 * {@code jppf.load.balancing.persistence = org.jppf.load.balancer.persistence.DatabaseLoadBalancerPersistence <table_name> <datasource_name>}<br>
060 * If unspecified, it defaults to 'loadBalancerDS'. The datasource properties <b>must</b> be defined in the JPPF configuration like so:
061 *
062 * <pre style="padding: 5px 5px 5px 0px; display: inline-block; margin: 0px; background-color: #E0E0F0">
063 * jppf.datasource.&lt;configId&gt;.name = &lt;datasource_name&gt;
064 * jppf.datasource.&lt;configId&gt;.&lt;hikaricp_property&gt; = &lt;value&gt;</pre>
065 *
066 * <p>Where:
067 *   <ul style="margin-top: 0px">
068 *   <li>{@code configId} is used to distinguish the datasource properties when multiple datasources are defined</li>
069 *   <li>the datasource {@code name} is mandatory and is used to store and retrieve the datasource in a custom registry.
070 *       It is also the datasource name used in the configuration of this job persistence implementation</li>
071 *   <li>{@code hikaricp_property} desginates any valid <a href="https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby">HikariCP configuration property</a>.
072 *       Properties not supported by HikariCP are simply ignored</li>
073 * </ul>
074 *
075 * <p>Here is a full example configuration:
076 * <pre class="jppf_pre">
077 * <span style="color: green"># persistence definition</span>
078 * lb.pkg = org.jppf.load.balancer.persistence
079 * jppf.load.balancing.persistence = ${lb.pkg}.DatabaseLoadBalancerPersistence MY_TABLE <b>loadBalancerDS</b>
080 *
081 * <span style="color: green"># datasource definition</span>
082 * jppf.datasource.lb.name = <b>loadBalancerDS</b>
083 * jppf.datasource.lb.driverClassName = com.mysql.jdbc.Driver
084 * jppf.datasource.lb.jdbcUrl = jdbc:mysql://localhost:3306/testjppf
085 * jppf.datasource.lb.username = testjppf
086 * jppf.datasource.lb.password = testjppf
087 * jppf.datasource.lb.minimumIdle = 5
088 * jppf.datasource.lb.maximumPoolSize = 10
089 * jppf.datasource.lb.connectionTimeout = 30000
090 * jppf.datasource.lb.idleTimeout = 600000</pre>
091 *
092 * @author Laurent Cohen
093 * @since 6.0
094 */
095public class DatabaseLoadBalancerPersistence extends AbstractDatabasePersistence<LoadBalancerPersistenceInfo> implements LoadBalancerPersistence {
096  /**
097   * Logger for this class.
098   */
099  private static Logger log = LoggerFactory.getLogger(DatabaseLoadBalancerPersistence.class);
100  /**
101   * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
102   */
103  private static boolean debugEnabled = log.isDebugEnabled();
104  /**
105   * The default persistence table name.
106   */
107  public static final String DEFAULT_TABLE = "load_balancer";
108  /**
109   * The default persistence datasource name.
110   */
111  public static final String DEFAULT_DATASOURCE = "loadBalancerDS";
112
113  /**
114   * Intialize this persistence with the {@linkplain #DEFAULT_TABLE default table name}.
115   * @throws Exception if any error occurs.
116   */
117  public DatabaseLoadBalancerPersistence() throws Exception {
118    this(DEFAULT_TABLE, DEFAULT_DATASOURCE);
119  }
120
121  /**
122   * Intialize this persistence with a table name specified in the first string parameter.
123   * @param params if parameters are provided, they have this meaning:
124   * <ul style="margin-top: 0px">
125   * <li>params[0] is the table name, which defaults to 'JOB_PERSISTENCE'</li>
126   * <li>params[1] is the name of a datasource defined in the configuration, and defaults to 'job_persistence'</li>
127   * </ul>
128   * @throws Exception if any error occurs.
129   */
130  public DatabaseLoadBalancerPersistence(final String...params) throws Exception {
131    super(DEFAULT_TABLE, DEFAULT_DATASOURCE, JPPFProperties.LOAD_BALANCING_PERSISTENCE_DDL_LOCATION, params);
132  }
133
134  @Override
135  public Object load(final LoadBalancerPersistenceInfo info) throws LoadBalancerPersistenceException {
136    if (debugEnabled) log.debug("loading {}", info);
137    try (final Connection connection = dataSource.getConnection()) {
138      final String sql = getSQL("load.sql");
139      try (PreparedStatement ps = connection.prepareStatement(sql)) {
140        ps.setString(1, info.getChannelID());
141        ps.setString(2, info.getAlgorithmID());
142        try (ResultSet rs = ps.executeQuery()) {
143          if (rs.next()) {
144            return JPPFSerializationHelper.deserialize(rs.getBinaryStream(1));
145          }
146        }
147      }
148    } catch(final Exception e) {
149      throw new LoadBalancerPersistenceException(e);
150    }
151    return null;
152  }
153
154  @Override
155  public void store(final LoadBalancerPersistenceInfo info) throws LoadBalancerPersistenceException {
156    if (debugEnabled) log.debug("storing {}", info);
157    try (Connection connection = dataSource.getConnection()) {
158      final boolean autocommit = connection.getAutoCommit();
159      final int isolation  = connection.getTransactionIsolation();
160      connection.setAutoCommit(false);
161      try {
162        connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
163        storeElement(connection, info, info.getStateAsBytes());
164        connection.commit();
165      } catch(final Exception e) {
166        connection.rollback();
167        throw new LoadBalancerPersistenceException(e);
168      } finally {
169        connection.setAutoCommit(autocommit);
170        connection.setTransactionIsolation(isolation);
171      }
172    } catch(final Exception e) {
173      throw new LoadBalancerPersistenceException(e);
174    }
175  }
176
177  @Override
178  public void delete(final LoadBalancerPersistenceInfo info) throws LoadBalancerPersistenceException {
179    if (debugEnabled) log.debug("deleting {}", info);
180    String sql = null;
181    String[] args = EMPTY_STRINGS;
182    if ((info == null) || ((info.getChannelID() == null) && (info.getAlgorithmID() == null))) {
183      sql = getSQL("delete.sql");
184    } else if (info.getAlgorithmID() == null) {
185      sql = getSQL("delete.node.sql");
186      args = new String[] { info.getChannelID() };
187    } else if (info.getChannelID() == null) {
188      sql = getSQL("delete.algo.all.nodes.sql");
189      args = new String[] { info.getAlgorithmID() };
190    } else {
191      sql = getSQL("delete.algo.sql");
192      args = new String[] { info.getChannelID(), info.getAlgorithmID() };
193    }
194    try (final Connection connection = dataSource.getConnection()) {
195      try (final PreparedStatement ps = connection.prepareStatement(sql)) {
196        for (int i=0; i<args.length; i++) ps.setString(i + 1, args[i]);
197        ps.executeUpdate();
198      }
199    } catch(final Exception e) {
200      throw new LoadBalancerPersistenceException(e);
201    }
202  }
203
204  @Override
205  public List<String> list(final LoadBalancerPersistenceInfo info) throws LoadBalancerPersistenceException {
206    String sql = null;
207    String[] args = EMPTY_STRINGS;
208    if ((info == null) || ((info.getChannelID() == null) && (info.getAlgorithmID() == null))) {
209      sql = getSQL("get.all.nodes.sql");
210    } else if (info.getAlgorithmID() == null) {
211      sql = getSQL("get.all.algos.for.node.sql");
212      args = new String[] { info.getChannelID() };
213    } else if (info.getChannelID() == null) {
214      sql = getSQL("get.all.nodes.with.algo.sql");
215      args = new String[] { info.getAlgorithmID() };
216    } else {
217      sql = getSQL("get.node.with.algo.sql");
218      args = new String[] { info.getChannelID(), info.getAlgorithmID() };
219    }
220    final List<String> result = new ArrayList<>();
221    try (final Connection connection = dataSource.getConnection()) {
222      try (final PreparedStatement ps = connection.prepareStatement(sql)) {
223        for (int i=0; i<args.length; i++) ps.setString(i + 1, args[i]);
224        try (final ResultSet rs = ps.executeQuery()) {
225          while (rs.next()) result.add(rs.getString(1));
226        }
227      }
228    } catch(final Exception e) {
229      throw new LoadBalancerPersistenceException(e);
230    }
231    if (debugEnabled) log.debug("result for {} is {}", info, result);
232    return result;
233  }
234
235  /** @exclude */
236  @Override
237  protected boolean lockForUpdate(final Connection connection, final LoadBalancerPersistenceInfo info) throws Exception {
238    try (final PreparedStatement ps = connection.prepareStatement(getSQL("select.for.update.sql"))) {
239      ps.setString(1, info.getChannelID());
240      ps.setString(2, info.getAlgorithmID());
241      try (final ResultSet rs = ps.executeQuery()) {
242        return rs.next();
243      }
244    }
245  }
246
247  /** @exclude */
248  @Override
249  protected void insertElement(final Connection connection, final LoadBalancerPersistenceInfo info, final byte[] bytes) throws Exception {
250    try (final PreparedStatement ps = connection.prepareStatement(getSQL("insert.sql"))) {
251      try (final InputStream is = new ByteArrayInputStream(bytes)) {
252        ps.setString(1, info.getChannelID());
253        ps.setString(2, info.getAlgorithmID());
254        ps.setBlob(3, is);
255        ps.executeUpdate();
256      }
257    }
258  }
259
260  /** @exclude */
261  @Override
262  protected void updateElement(final Connection connection, final LoadBalancerPersistenceInfo info, final byte[] bytes) throws Exception {
263    try (PreparedStatement ps2 = connection.prepareStatement(getSQL("update.sql"))) {
264      try (InputStream is = new ByteArrayInputStream(bytes)) {
265        ps2.setBlob(1, is);
266        ps2.setString(2, info.getChannelID());
267        ps2.setString(3, info.getAlgorithmID());
268        ps2.executeUpdate();
269      }
270    }
271  }
272}