DBRunner.java

/*
 * JPPF.
 * Copyright (C) 2005-2019 JPPF Team.
 * http://www.jppf.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jppf.example.nodelifecycle.client;

import java.sql.*;
import java.util.Collection;

import org.jppf.client.*;
import org.jppf.client.event.*;
import org.jppf.management.*;
import org.jppf.node.protocol.Task;
import org.jppf.utils.*;
import org.jppf.utils.Operator;
import org.slf4j.*;

/**
 * Runner class for the square matrix multiplication demo.
 * @author Laurent Cohen
 */
public class DBRunner {
  /**
   * Logger for this class.
   */
  private static Logger log = LoggerFactory.getLogger(DBRunner.class);
  /**
   * JPPF client used to submit execution requests.
   */
  private static JPPFClient jppfClient = null;
  /**
   * A JMX connection to one of the nodes.
   */
  private static JMXNodeConnectionWrapper jmxNode = null;

  /***
   * Send a job with a number of tasks that insert a row in a database table.
   * We use the management APIs to kill a node before the job execution is complete,
   * so we can demonstrate the transaction recovery mechanism (implemented in Atomikos)
   * on the node side. Once the job is complete, we display all the rows in the table.
   * @param args the first argument, if any, will be used as the JPPF client's uuid.
   */
  public static void main(final String... args) {
    try {
      final TypedProperties config = JPPFConfiguration.getProperties();
      final int nbTasks = config.getInt("job.nbtasks", 20);
      final long taskSleepTime = config.getLong("task.sleep.time", 2000L);
      final long timeBeforeRestartNode = config.getLong("time.before.restart.node", 6000L);
      if ((args != null) && (args.length > 0)) jppfClient = new JPPFClient(args[0]);
      else jppfClient = new JPPFClient();
      // Initialize the JMX connection to the node
      getNode();
      // Create a job with the specified number of tasks
      final JPPFJob job = new JPPFJob();
      job.setName("NodeLifeCycle demo job");
      for (int i = 1; i <= nbTasks; i++) {
        final DBTask task = new DBTask(taskSleepTime);
        task.setId("" + i);
        job.add(task);
      }
      // customize the result listener to display a message each time a task result is received
      final JobListener jobListener = new JobListenerAdapter() {
        @Override
        public synchronized void jobReturned(final JobEvent event) {
          for (Task<?> task : event.getJobTasks()) {
            if (task.getThrowable() != null) output("task " + task.getId() + " error: " + task.getThrowable().getMessage());
            else output("task " + task.getId() + " result: " + task.getResult());
          }
        }
      };
      job.addJobListener(jobListener);
      jppfClient.submitAsync(job);
      Thread.sleep(timeBeforeRestartNode);
      // restart the node to demonstrate the transaction recovery
      output("restarting node");
      restartNode();
      // wait for the job completion
      job.awaitResults();
      // display the list of rows in the DB table
      if (config.getBoolean("display.db.content", false)) displayDBContent();
      output("demo ended");
    } catch (final Exception e) {
      e.printStackTrace();
    } finally {
      if (jppfClient != null) jppfClient.close();
    }
  }

  /**
   * Get a JMX connection to one of the nodes.
   * @return the node connection as a {@link JMXNodeConnectionWrapper} instance.
   * @throws Exception if the node connection could not be established.
   */
  private static JMXNodeConnectionWrapper getNode() throws Exception {
    if (jmxNode == null) {
      final JMXDriverConnectionWrapper jmxDriver = jppfClient.awaitActiveConnectionPool().awaitJMXConnections(Operator.AT_LEAST, 1, true).get(0);
      final Collection<JPPFManagementInfo> nodesInfo = jmxDriver.nodesInformation();
      final JPPFManagementInfo info = nodesInfo.iterator().next();
      jmxNode = new JMXNodeConnectionWrapper(info.getHost(), info.getPort());
      jmxNode.connect();
    }
    return jmxNode;
  }

  /**
   * Kill the node.
   */
  private static void restartNode() {
    try {
      final JMXNodeConnectionWrapper jmxNode = getNode();
      jmxNode.restart();
    } catch (final Exception e) {
      output("Could not restart a node:\n" + ExceptionUtils.getStackTrace(e));
    }
  }

  /**
   * List all the rows in the TASK_RESULT table.
   * @throws Exception if any error occurs.
   */
  private static void displayDBContent() throws Exception {
    Class.forName("org.h2.Driver");
    final Connection c = DriverManager.getConnection("jdbc:h2:tcp://localhost:9092/./jppf_samples;SCHEMA=PUBLIC", "jppf", "jppf");
    //Connection c = DriverManager.getConnection("jdbc:postgresql://localhost:5432/jppf_samples", "jppf", "jppf");
    final String sql = "SELECT * FROM task_result";
    final Statement stmt = c.createStatement();
    final ResultSet rs = stmt.executeQuery(sql);
    int count = 1;
    output("\n***** displaying the DB table content *****");
    while (rs.next()) {
      final StringBuilder sb = new StringBuilder();
      sb.append("row ").append(count).append(": ");
      sb.append("id=").append(rs.getObject("id"));
      sb.append(", task_id=").append(rs.getObject("task_id"));
      sb.append(", message=").append(rs.getObject("message"));
      output(sb.toString());
      count++;
    }
    output("***** end of DB table content *****");
    rs.close();
    stmt.close();
    c.close();
  }

  /**
   * Print a message to the console and/or log file.
   * @param message the message to print.
   */
  private static void output(final String message) {
    System.out.println(message);
    log.info(message);
  }
}