001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.load.balancer.impl;
019
020import java.io.Serializable;
021import java.util.*;
022import java.util.concurrent.locks.*;
023
024import org.jppf.load.balancer.*;
025import org.jppf.load.balancer.persistence.PersistentState;
026import org.slf4j.*;
027
028/**
029 * This class implements a self tuned bundle size algorithm. It starts using the
030 * bundle size defined in property file and starts changing it to find a better
031 * performance. The algorithm starts making The algorithm waits for some
032 * execution to get a mean execution time, and them make a change in bundle size
033 * Each time the change is done, it is done over a smaller range randomly
034 * selected (like Monte Carlo algorithm).
035 * 
036 * @author Domingos Creado
037 * @author Laurent Cohen
038 */
039public class AutoTunedBundler extends AbstractAdaptiveBundler<AnnealingTuneProfile> implements PersistentState {
040  /**
041   * Logger for this class.
042   */
043  private static Logger log = LoggerFactory.getLogger(AutoTunedBundler.class);
044  /**
045   * Determines whether trace level is set for logging.
046   */
047  private static boolean traceEnabled = log.isTraceEnabled();
048  /**
049   * Used to compute a pseudo-random increment to the bundle size, as part of a Monte Carlo random walk
050   * towards a good solution.
051   */
052  private Random rnd = new Random(System.nanoTime());
053  /**
054   * The state of this undler.
055   */
056  private final BundlerState state;
057  /**
058   * Lock used to synchronize access to the load-balancer state.
059   */
060  private final Lock lock = new ReentrantLock();
061
062  /**
063   * Creates a new instance with the initial size of bundle as the start size.
064   * @param profile the parameters of the auto-tuning algorithm,
065   * grouped as a performance analysis profile.
066   */
067  public AutoTunedBundler(final AnnealingTuneProfile profile) {
068    super(profile);
069    this.state = new BundlerState();
070    state.bundleSize = profile.size;
071    if (state.bundleSize < 1) state.bundleSize = 1;
072  }
073
074  @Override
075  public int getBundleSize() {
076    lock.lock();
077    try {
078      return state.bundleSize;
079    } finally {
080      lock.unlock();
081    }
082  }
083
084  @Override
085  public void dispose() {
086    lock.lock();
087    try {
088      state.samplesMap.clear();
089    } finally {
090      lock.unlock();
091    }
092  }
093
094  /**
095   * This method performs the actual bundle size computation, based on current and past
096   * performance data.<br>
097   * Depending on the the performance samples and profile parameters, the following actions
098   * may be triggered in this method:
099   * <ul>
100   * <li>samples collection (unconditional)</li>
101   * <li>detection of performance profile changes, if not currently being done</li>
102   * <li>when a performance profile change is detected, recompute the bundle size.</li>
103   * </ul>
104   * @param bundleSize bundle size of the new performance sample.
105   * @param time total execution time of the new sample.
106   */
107  @Override
108  public void feedback(final int bundleSize, final double time) {
109    if (traceEnabled) {
110      log.trace("Bundler#" + bundlerNumber + ": Got sample with bundleSize=" + bundleSize + " and totalTime=" + time);
111    }
112    // retrieving the record of the bundle size
113    PerformanceSample bundleSample;
114    lock.lock();
115    try {
116      bundleSample = state.samplesMap.get(bundleSize);
117      if (bundleSample == null) {
118        bundleSample = new PerformanceSample();
119        state.samplesMap.put(bundleSize, bundleSample);
120      }
121      final long samples = bundleSample.samples + bundleSize;
122      bundleSample.mean = (time + bundleSample.samples * bundleSample.mean) / samples;
123      bundleSample.samples = samples;
124      if (samples > profile.getMinSamplesToAnalyse()) {
125        performAnalysis();
126        if (traceEnabled) log.trace("Bundler#" + bundlerNumber + ": bundle size = " + bundleSize);
127      }
128    } finally {
129      lock.unlock();
130    }
131  }
132
133  /**
134   * Recompute the bundle size after a performance profile change has been detected.
135   */
136  private void performAnalysis() {
137    double stableMean = 0;
138    int bestSize = searchBestSize();
139    final int max = maxSize();
140    if ((max > 0) && (bestSize > max)) bestSize = max;
141    int counter = 0;
142    while (counter < profile.getMaxGuessToStable()) {
143      int diff = profile.createDiff(bestSize, state.samplesMap.size(), rnd);
144      if (diff < bestSize) {
145        // the second part is there to ensure the size is > 0
146        if (rnd.nextBoolean()) diff = -diff;
147      }
148      state.bundleSize = bestSize + diff;
149      if (state.samplesMap.get(state.bundleSize) == null) {
150        if (traceEnabled) log.trace("Bundler#" + bundlerNumber + ": The next bundle size that will be used is " + state.bundleSize);
151        return;
152      }
153      counter++;
154    }
155
156    state.bundleSize = Math.max(1, bestSize);
157    final PerformanceSample sample = state.samplesMap.get(state.bundleSize);
158    if (sample != null) {
159      stableMean = sample.mean;
160      state.samplesMap.clear();
161      state.samplesMap.put(state.bundleSize, sample);
162    }
163    if (traceEnabled) log.trace("Bundler#" + bundlerNumber + ": The bundle size converged to " + state.bundleSize + " with the mean execution of " + stableMean);
164  }
165
166  /**
167   * Lookup the best bundle size in the current samples map.
168   * @return the best bundle size as an int value.
169   */
170  private int searchBestSize() {
171    int bestSize = 0;
172    double minorMean = Double.POSITIVE_INFINITY;
173    for (final Integer size : state.samplesMap.keySet()) {
174      final PerformanceSample sample = state.samplesMap.get(size);
175      if (sample.mean < minorMean) {
176        bestSize = size;
177        minorMean = sample.mean;
178      }
179    }
180    if (traceEnabled) log.trace("Bundler#" + bundlerNumber + ": best size found = " + bestSize);
181    return bestSize;
182  }
183
184  @Override
185  public Object getState() {
186    lock.lock();
187    try {
188      return state;
189    } finally {
190      lock.unlock();
191    }
192  }
193
194  @Override
195  public void setState(final Object persistedState) {
196    final BundlerState other = (BundlerState) persistedState;
197    lock.lock();
198    try {
199      state.bundleSize = other.bundleSize;
200      state.samplesMap = other.samplesMap;
201    } finally {
202      lock.unlock();
203    }
204  }
205
206  @Override
207  public Lock getStateLock() {
208    return lock;
209  }
210
211  /**
212   * Holds the state of this bundler for persistence.
213   */
214  private static class BundlerState implements Serializable {
215    /**
216     * Explicit serialVersionUID.
217     */
218    private static final long serialVersionUID = 1L;
219    /**
220     * The current bundle size.
221     */
222    private int bundleSize = 1;
223    /**
224     * A map of performance samples, sorted by increasing bundle size.
225     */
226    private Map<Integer, PerformanceSample> samplesMap = new HashMap<>();
227  }
228}