001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.load.balancer.impl;
019
020import org.jppf.load.balancer.*;
021import org.jppf.management.JPPFSystemInformation;
022import org.jppf.utils.TypedProperties;
023import org.jppf.utils.configuration.*;
024import org.slf4j.*;
025
026/**
027 * This implementation of a load-balancing algorithm illustrates the use of
028 * the {@link ChannelAwareness} APIs, by sending to each node at most <code>m * n</code> tasks,
029 * where <i>n</i> is the number of processing threads in the node, and <i>m</i> is a
030 * user-defined parameter which defaults to one.
031 * @author Laurent Cohen
032 */
033public class NodeThreadsBundler extends AbstractBundler<NodeThreadsProfile> implements ChannelAwareness {
034  /**
035   * Logger for this class.
036   */
037  private static Logger log = LoggerFactory.getLogger(NodeThreadsBundler.class);
038  /**
039   * Holds information about the node's environment and configuration.
040   */
041  private JPPFSystemInformation channelConfiguration = null;
042  /**
043   * The current number of tasks to send to the node.
044   */
045  private int bundleSize = 1;
046
047  /**
048   * Creates a new instance with the specified parameters profile.
049   * @param profile the parameters of the load-balancing algorithm.
050   */
051  public NodeThreadsBundler(final NodeThreadsProfile profile) {
052    super(profile);
053    if (log.isDebugEnabled()) log.debug("creating " + this.getClass().getSimpleName() + " #" + this.bundlerNumber);
054  }
055
056  /**
057   * Get the current number of tasks to send to the node.
058   * @return the bundle size as an int value.
059   */
060  @Override
061  public int getBundleSize() {
062    return bundleSize;
063  }
064
065  @Override
066  public JPPFSystemInformation getChannelConfiguration() {
067    return channelConfiguration;
068  }
069
070  @Override
071  public void setChannelConfiguration(final JPPFSystemInformation channelConfiguration) {
072    this.channelConfiguration = channelConfiguration;
073    computeBundleSize();
074    if (log.isDebugEnabled()) log.debug("setting node configuration on bundler #" + bundlerNumber + ": " + channelConfiguration);
075  }
076
077  /**
078   * Compute the number of tasks to send to the node. This is the actual algorithm implementation.
079   */
080  private void computeBundleSize() {
081    final JPPFSystemInformation nodeConfig = getChannelConfiguration();
082    if (nodeConfig == null) bundleSize = 1;
083    else {
084      // get the number of processing threads in the node
085      final TypedProperties jppf = getChannelConfiguration().getJppf();
086      final boolean isPeer = jppf.getBoolean("jppf.peer.driver", false);
087      final JPPFProperty<Integer> prop = isPeer ? JPPFProperties.PEER_PROCESSING_THREADS : JPPFProperties.PROCESSING_THREADS;
088      int nbThreads = jppf.getInt(prop.getName(), -1);
089      if (log.isDebugEnabled()) log.debug("bundler #" + this.bundlerNumber + " nb threads from config = " + nbThreads);
090      // if number of threads is not defined, we assume it is the number of available processors
091      if (nbThreads <= 0) nbThreads = getChannelConfiguration().getRuntime().getInt("availableProcessors");
092      if (nbThreads <= 0) nbThreads = 1;
093      int multiplicator = profile.getMultiplicator();
094      if (multiplicator <= 0) multiplicator = 1;
095      bundleSize = nbThreads * multiplicator;
096    }
097    // log the new bundle size
098    if (log.isDebugEnabled()) log.debug("bundler #" + this.bundlerNumber + " computed new bundle size = " + bundleSize);
099  }
100
101  /**
102   * Release the resources used by this bundler.
103   */
104  @Override
105  public void dispose() {
106    if (log.isDebugEnabled()) log.debug("disposing bundler #" + this.bundlerNumber);
107    this.channelConfiguration = null;
108  }
109}