001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.node.connection;
020
021import java.util.*;
022import java.util.concurrent.LinkedBlockingQueue;
023
024import org.jppf.utils.RegexUtils;
025import org.slf4j.*;
026
027/**
028 * Common abstact super class for connection strategies that read connection from groups of comma-separated values.
029 * @author Laurent Cohen
030 * @since 6.0
031 */
032public abstract class AbstractCsvConnectionStrategy implements DriverConnectionStrategy {
033  /**
034   * Logger for this class.
035   */
036  private static Logger log = LoggerFactory.getLogger(AbstractCsvConnectionStrategy.class);
037  /**
038   * The queue in which {@code DriverConnectionInfo} objects are stored.
039   */
040  final Queue<DriverConnectionInfo> queue = new LinkedBlockingQueue<>();
041  /**
042   * The fallback strategy to use in case the CSV file is not found or none of the driver defintions it contains is valid. 
043   */
044  final DriverConnectionStrategy fallbackStrategy;
045
046  /**
047   * Find and read the CSV data.
048   */
049  public AbstractCsvConnectionStrategy() {
050    readAllConnectionInfo();
051    fallbackStrategy = queue.isEmpty() ? new JPPFDefaultConnectionStrategy() : null;
052    if (log.isDebugEnabled()) {
053      if (queue.isEmpty()) log.debug("no valid driver definition found, falling back to default strategy");
054      else {
055        final StringBuilder sb = new StringBuilder("driver definitions:");
056        for (final DriverConnectionInfo info: queue) sb.append('\n').append(info);
057        log.debug(sb.toString());
058      }
059    }
060  }
061
062  @Override
063  public DriverConnectionInfo nextConnectionInfo(final DriverConnectionInfo currentInfo, final ConnectionContext context) {
064    if (fallbackStrategy != null) return fallbackStrategy.nextConnectionInfo(currentInfo, context);
065    if (log.isDebugEnabled()) log.debug("new connection request with prevInfo={} and context={}", currentInfo, context);
066    if ((currentInfo != null) && (context.getReason() == ConnectionReason.MANAGEMENT_REQUEST)) {
067      return currentInfo;
068    }
069    final DriverConnectionInfo info = queue.poll();
070    queue.offer(info);
071    return info;
072  }
073
074  /**
075   * Parse the CSV file specified in the configuration and convert each line
076   * into a {@link DriverConnectionInfo} which is then added to the queue.
077   */
078  void readAllConnectionInfo() {
079    try {
080      final List<String> lines = getConnectionInfoAsLines();
081      for (final String line: lines) {
082        final DriverConnectionInfo info = parseLine(line.trim());
083        if (info != null) {
084          if (log.isDebugEnabled()) log.debug("got connection info: {}", info);
085          queue.offer(info);
086        }
087      }
088    } catch (final Exception e) {
089      log.error(e.getMessage(), e);
090    }
091  }
092
093  /**
094   * @return a list of csv-fromatted lines.
095   */
096  abstract List<String> getConnectionInfoAsLines();
097
098  /**
099   * Parse a CSV line to generate a driver connection info.
100   * @param csv the csv line to parse.
101   * @return a {@link DriverConnectionInfo} instance, or {@code null} if the CSV is not valid or a comment.
102   */
103  static DriverConnectionInfo parseLine(final String csv) {
104    if (csv.startsWith("#")) return null;
105    final String[] tokens = RegexUtils.COMMA_PATTERN.split(csv);
106    if ((tokens != null) && (tokens.length == 4)) {
107      for (int i=0; i<tokens.length; i++) tokens[i] = tokens[i].trim();
108      final boolean secure = "true".equalsIgnoreCase(tokens[0]);
109      final String host = tokens[1];
110      final int port;
111      boolean recoveryEnabled = false;
112      try {
113        port = Integer.valueOf(tokens[2]);
114      } catch(@SuppressWarnings("unused") final Exception e) {
115        return null;
116      }
117      try {
118        final int recoveryPort = Integer.valueOf(tokens[3]);
119        recoveryEnabled = recoveryPort > 0;
120      } catch(@SuppressWarnings("unused") final Exception e) {
121        recoveryEnabled = Boolean.valueOf(tokens[3]);
122      }
123      return new JPPFDriverConnectionInfo(secure, host, port, recoveryEnabled);
124    }
125    return null;
126  }
127}