001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.load.balancer.persistence;
020
021import java.io.*;
022import java.nio.file.*;
023import java.util.*;
024
025import org.jppf.persistence.AbstractFilePersistence;
026import org.jppf.serialization.JPPFSerializationHelper;
027import org.jppf.utils.FileUtils;
028import org.slf4j.*;
029
030/**
031 * File-based persistence implementation for the state of the JPPF load-balancers.
032 * It relies on a directory structure is as follows:
033 * <p><pre class="jppf_pre">
034 * persistence_root
035 * |_channel_identifier<sub>1</sub>
036 * | |_algorithm<sub>1</sub>.data
037 * | |_...
038 * | |_algorithm<sub>P1</sub>.data
039 * |_...
040 * |_channel_identifier<sub>n</sub>
041 *   |_algorithm<sub>1</sub>.data
042 *   |_...
043 *   |_algorithm<sub>Pn</sub>.data</pre>
044 *
045 * <p>Where:
046 * <ul style="margin-top: 0px">
047 *   <li>Each <code>channel_identifier<sub>i</sub></code> represents a hash of a string concatenated from various properties of the channel.
048 *     A channel represents a connection between a node and a driver for server-side load-balancing, or between a driver and a client for client-side load-balancing.
049 *     This id is unique for each channel
050 *     and resilient over restarts of both related peers, contrary to their uuids, which are recreated each time a component starts. Using a hash also ensures that it
051 *     can be used a s a valid folder name in a file system</li>
052 *   <li>Each <code>algorithm<sub>i</sub></code> prefix is the hash of the related load-balancing algorithm name. Again, it ensures it can be used to form a valid file name</li>
053 *   <li>Each <code>algorithm<sub>i</sub></code>.data file represents the serialized state of the related load-balancer</li>
054 * </ul>
055 * @author Laurent Cohen
056 * @since 6.0
057 */
058public class FileLoadBalancerPersistence extends AbstractFilePersistence<LoadBalancerPersistenceInfo, LoadBalancerPersistenceException> implements LoadBalancerPersistence {
059  /**
060   * Logger for this class.
061   */
062  private static Logger log = LoggerFactory.getLogger(FileLoadBalancerPersistence.class);
063  /**
064   * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
065   */
066  private static boolean debugEnabled = log.isDebugEnabled();
067  /**
068   * Determines whether the trace level is enabled in the log configuration, without the cost of a method call.
069   */
070  private static boolean traceEnabled = log.isTraceEnabled();
071  /**
072   * The default root path if none is specified.
073   */
074  public static final String DEFAULT_ROOT = "lb_persistence";
075
076  /**
077   * Initialize this persistence with the root path {@link #DEFAULT_ROOT} under the current user directory.
078   */
079  public FileLoadBalancerPersistence() {
080    this(DEFAULT_ROOT);
081  }
082
083  /**
084   * Initialize this persistence with the specified path as root directory.
085   * @param paths the root directory for this persistence.
086   */
087  public FileLoadBalancerPersistence(final String... paths) {
088    super(paths.length > 0 ? paths : new String[] { DEFAULT_ROOT });
089  }
090
091  @Override
092  public void store(final LoadBalancerPersistenceInfo info) throws LoadBalancerPersistenceException {
093    try {
094      if (debugEnabled) log.debug("storing {}", info);
095      final Path nodeDir = getSubDir(info.getChannelID());
096      checkDirectory(nodeDir);
097      final Path path = getBundlerPath(nodeDir, info.getAlgorithmID(), false);
098      final Path tmpPath = getBundlerPath(nodeDir, info.getAlgorithmID(), true);
099      try (final BufferedOutputStream out = new BufferedOutputStream(Files.newOutputStream(tmpPath, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))) {
100        info.serializeToStream(out);
101      }
102      Files.move(tmpPath, path, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
103    } catch (final Exception e) {
104      throw new LoadBalancerPersistenceException(e);
105    }
106  }
107
108  @Override
109  public Object load(final LoadBalancerPersistenceInfo info) throws LoadBalancerPersistenceException {
110    try {
111      if (debugEnabled) log.debug("loading {}", info);
112      final Path nodeDir = getSubDir(info.getChannelID());
113      if (!Files.exists(nodeDir)) return null;
114      final Path path = getBundlerPath(nodeDir, info.getAlgorithmID(), false);
115      if (!Files.exists(path)) return null;
116      try (final InputStream is = new BufferedInputStream(Files.newInputStream(path, StandardOpenOption.READ))) {
117        return JPPFSerializationHelper.deserialize(is);
118      }
119    } catch (final Exception e) {
120      throw new LoadBalancerPersistenceException(e);
121    }
122  }
123
124  @Override
125  public void delete(final LoadBalancerPersistenceInfo info) throws LoadBalancerPersistenceException {
126    try {
127      if (debugEnabled) log.debug("deleting bundlers for {}", info);
128      if ((info == null) || ((info.getChannelID() == null) && (info.getAlgorithmID() == null))) {
129        if (Files.exists(rootPath)) Files.walkFileTree(rootPath, new FileUtils.DeleteFileVisitor());
130      } else if (info.getAlgorithmID() == null) {
131        final Path channelDir = getSubDir(info.getChannelID());
132        if (Files.exists(channelDir)) Files.walkFileTree(channelDir, new FileUtils.DeleteFileVisitor());
133      } else if (info.getChannelID() == null) {
134        final String filename = info.getAlgorithmID() + DEFAULT_EXTENSION;
135        final List<Path> channelsToDelete = new ArrayList<>();
136        Files.walkFileTree(rootPath, new FileUtils.DeleteFileVisitor(new PathMatcher() {
137          @Override
138          public boolean matches(final Path path) {
139            final boolean b = filename.equals(path.getFileName().toString());
140            if (b) channelsToDelete.add(path.getParent());
141            return b;
142          }
143        }));
144        for (Path channelPath: channelsToDelete) deleteIfEmpty(channelPath);
145      } else {
146        final Path path = getBundlerPath(getSubDir(info.getChannelID()), info.getAlgorithmID(), false);
147        Files.deleteIfExists(path);
148        deleteIfEmpty(getSubDir(info.getChannelID()));
149      }
150    } catch (final Exception e) {
151      throw new LoadBalancerPersistenceException(e);
152    }
153  }
154
155  @Override
156  public List<String> list(final LoadBalancerPersistenceInfo info) throws LoadBalancerPersistenceException {
157    try {
158      final List<String> result = new ArrayList<>();
159      if ((info == null) || ((info.getChannelID() == null) && (info.getAlgorithmID() == null))) {
160        if (Files.exists(rootPath)) {
161          try (final DirectoryStream<Path> nodeDS = Files.newDirectoryStream(rootPath, new DirectoryFilter())) {
162            for (final Path path : nodeDS) {
163              if (path != null) result.add(path.getFileName().toString());
164            }
165          }
166        }
167      } else if (info.getAlgorithmID() == null) {
168        if (traceEnabled) log.trace("listing algos for request={}", info);
169        if (Files.exists(rootPath)) {
170          final Path channelDir = getSubDir(info.getChannelID());
171          if (traceEnabled) log.trace("listing algos in {}", channelDir);
172          if (Files.exists(channelDir)) {
173            if (traceEnabled) log.trace("listing algos in existing {}", channelDir);
174            try (final DirectoryStream<Path> channelDS = Files.newDirectoryStream(channelDir, new DirectoryStream.Filter<Path>() {
175              @Override
176              public boolean accept(final Path entry) throws IOException {
177                if (traceEnabled) log.trace("filter checking {}", entry);
178                return !Files.isDirectory(entry) && pathname(entry).endsWith(DEFAULT_EXTENSION);
179              }
180            })) {
181              for (final Path path : channelDS) {
182                if (path != null) {
183                  final String name = pathname(path.getFileName());
184                  final String algo = name.substring(0, name.length() - DEFAULT_EXTENSION.length());
185                  if (traceEnabled) log.trace("algo is {} for path={}", algo, path);
186                  result.add(algo);
187                }
188              }
189            }
190          }
191        }
192      } else if (info.getChannelID() == null) {
193        try (final DirectoryStream<Path> channelDS = Files.newDirectoryStream(rootPath, new DirectoryFilter())) {
194          for (final Path channel : channelDS) {
195            if (channel != null) {
196              final Path algoPath = getBundlerPath(channel, info.getAlgorithmID(), false);
197              if (Files.exists(algoPath)) result.add(pathname(channel.getFileName()));
198            }
199          }
200        }
201      } else {
202        final Path path = getBundlerPath(getSubDir(info.getChannelID()), info.getAlgorithmID(), false);
203        if (Files.exists(path)) result.add(info.getAlgorithmID());
204      }
205      if (debugEnabled) log.debug("identifiers of persisted bundler states: {} for request={}", result, info);
206      return result;
207    } catch (final Exception e) {
208      throw new LoadBalancerPersistenceException(e);
209    }
210  }
211
212  /**
213   * Get the path to the file containing a bundler's data.
214   * @param channelDir the direcory of the node for which to get a path.
215   * @param algorithm the load balancer algorithm name.
216   * @param temp whether to return a temporary file path.
217   * @return a {@link Path} instance.
218   */
219  private Path getBundlerPath(final Path channelDir, final String algorithm, final boolean temp) {
220    return Paths.get(pathname(channelDir), algorithm + (temp ? TEMP_EXTENSION : DEFAULT_EXTENSION));
221  }
222
223  /** @exclude */
224  @Override
225  protected LoadBalancerPersistenceException convertException(final Exception e) {
226    return (e instanceof LoadBalancerPersistenceException) ? (LoadBalancerPersistenceException) e : new LoadBalancerPersistenceException(e);
227  }
228}