001/*
002 * JPPF.
003 * Copyright (C) 2005-2016 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.client;
019
020import java.util.*;
021
022import org.jppf.client.balancer.JobManagerClient;
023import org.jppf.client.debug.Debug;
024import org.jppf.client.event.*;
025import org.jppf.discovery.ClientDriverDiscovery;
026import org.jppf.load.balancer.LoadBalancingInformation;
027import org.jppf.node.protocol.Task;
028import org.jppf.utils.*;
029import org.slf4j.*;
030
031/**
032 * This class provides an API to submit execution requests and administration commands,
033 * and request server information data.<br>
034 * It has its own unique identifier, used by the nodes, to determine whether classes from
035 * the submitting application should be dynamically reloaded or not, depending on whether
036 * the uuid has changed or not.
037 * @author Laurent Cohen
038 */
039public class JPPFClient extends AbstractGenericClient {
040  /**
041   * Logger for this class.
042   */
043  private static Logger log = LoggerFactory.getLogger(JPPFClient.class);
044  /**
045   * Determines whether debug-level logging is enabled.
046   */
047  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
048
049  /**
050   * Initialize this client with an automatically generated application UUID.
051   */
052  public JPPFClient() {
053    super(null, JPPFConfiguration.getProperties());
054    Debug.register(this);
055  }
056
057  /**
058   * Initialize this client with the specified application UUID.
059   * @param uuid the unique identifier for this local client.
060   */
061  public JPPFClient(final String uuid) {
062    super(uuid, JPPFConfiguration.getProperties());
063    Debug.register(this);
064  }
065
066  /**
067   * Initialize this client with an automatically generated application UUID.
068   * @param listeners the listeners to add to this JPPF client to receive notifications of new connections.
069   * @deprecated use {@link #JPPFClient(ConnectionPoolListener[])} instead.
070   */
071  public JPPFClient(final ClientListener... listeners) {
072    super(null, JPPFConfiguration.getProperties(), toDelegation(listeners));
073    Debug.register(this);
074  }
075
076  /**
077   * Initialize this client with the specified application UUID and new connection listeners.
078   * @param uuid the unique identifier for this local client.
079   * @param listeners the listeners to add to this JPPF client to receive notifications of new connections.
080   * @deprecated use {@link #JPPFClient(String, ConnectionPoolListener[])} instead.
081   */
082  public JPPFClient(final String uuid, final ClientListener... listeners) {
083    super(uuid, JPPFConfiguration.getProperties(), toDelegation(listeners));
084    Debug.register(this);
085  }
086
087  /**
088   * Initialize this client with the specified application UUID and new connection listeners.
089   * @param uuid the unique identifier for this local client.
090   * @param config the JPPF configuration to use for this client.
091   * @param listeners the listeners to add to this JPPF client to receive notifications of new connections.
092   * @deprecated use {@link #JPPFClient(String, TypedProperties, ConnectionPoolListener[])} instead.
093   */
094  public JPPFClient(final String uuid, final TypedProperties config, final ClientListener... listeners) {
095    super(uuid, config, toDelegation(listeners));
096    Debug.register(this);
097  }
098
099  /**
100   * Initialize this client with an automatically generated application UUID.
101   * @param listeners the listeners to add to this JPPF client to receive notifications of new connections.
102   */
103  public JPPFClient(final ConnectionPoolListener... listeners) {
104    super(null, JPPFConfiguration.getProperties(), listeners);
105    Debug.register(this);
106  }
107
108  /**
109   * Initialize this client with the specified application UUID and new connection listeners.
110   * @param uuid the unique identifier for this local client.
111   * @param listeners the listeners to add to this JPPF client to receive notifications of new connections.
112   */
113  public JPPFClient(final String uuid, final ConnectionPoolListener... listeners) {
114    super(uuid, JPPFConfiguration.getProperties(), listeners);
115    Debug.register(this);
116  }
117
118  /**
119   * Initialize this client with the specified application UUID and new connection listeners.
120   * @param uuid the unique identifier for this local client.
121   * @param config the JPPF configuration to use for this client.
122   * @param listeners the listeners to add to this JPPF client to receive notifications of new connections.
123   * @exclude
124   */
125  public JPPFClient(final String uuid, final TypedProperties config, final ConnectionPoolListener... listeners) {
126    super(uuid, config, listeners);
127    Debug.register(this);
128  }
129
130  @Override
131  AbstractJPPFClientConnection createConnection(final String name, final JPPFConnectionPool pool) {
132    return new JPPFClientConnectionImpl(this, name, pool);
133  }
134
135  @Override
136  public List<Task<?>> submitJob(final JPPFJob job) {
137    if (isClosed()) throw new IllegalStateException("this client is closed");
138    if (job == null) throw new IllegalArgumentException("job cannot be null");
139    if (job.getJobTasks().isEmpty()) throw new IllegalStateException("job cannot be empty");
140    if (job.client != null) {
141      if (!job.isDone()) throw new IllegalStateException("this job is already submitted");
142      job.cancelled.set(false);
143      job.getResults().clear();
144    }
145    job.client = this;
146    if (debugEnabled) log.debug("submitting job {}", job);
147    if (log.isTraceEnabled()) {
148      for (Task<?> task: job) {
149        log.trace(String.format("task %s, position=%d, taskObject=%s, taskObject class=%s", task, task.getPosition(), task.getTaskObject(),
150          (task.getTaskObject() != null) ? task.getTaskObject().getClass() : null));
151      }
152    }
153    getJobManager().submitJob(job);
154    if (job.isBlocking()) return job.awaitResults();
155    return null;
156  }
157
158  /**
159   * {@inheritDoc}
160   * @exclude
161   */
162  @Override
163  protected JobManager createJobManager() {
164    JobManager jobManager = null;
165    try {
166      jobManager = new JobManagerClient(this);
167    } catch (Exception e) {
168      log.error("Can't initialize job Manager", e);
169    }
170    return jobManager;
171  }
172
173  /**
174   * Reset this client, that is, close it if necessary, reload its configuration, then open it again.
175   * If the client is already closed or reseeting, this method has no effect.
176   * @see #reset(TypedProperties)
177   * @since 4.0
178   */
179  public void reset() {
180    if (isClosed()) return;
181    if (debugEnabled) log.debug("resetting client");
182    if (resetting.compareAndSet(false, true)) {
183      close(true);
184      JPPFConfiguration.reset();
185      init(JPPFConfiguration.getProperties());
186    }
187  }
188
189  /**
190   * Reset this client, that is, close it if necessary, then open it again, using the specified confguration.
191   * If the client is already closed or reseeting, this method has no effect.
192   * @param configuration the configuration to initialize this client with.
193   * @see #reset()
194   * @since 4.0
195   */
196  public void reset(final TypedProperties configuration) {
197    if (isClosed()) return;
198    if (debugEnabled) log.debug("resetting client");
199    if (resetting.compareAndSet(false, true)) {
200      close(true);
201      init(configuration);
202    }
203  }
204
205  /**
206   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE} status.
207   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE)}.
208   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
209   * @since 5.0
210   */
211  public JPPFConnectionPool awaitActiveConnectionPool() {
212    return awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE);
213  }
214
215  /**
216   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
217   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status.
218   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
219   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
220   * @since 5.0
221   */
222  public JPPFConnectionPool awaitWorkingConnectionPool() {
223    return awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
224  }
225
226  /**
227   * Wait until there is at least one connection pool with at least one connection in one of the specified statuses.
228   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, statuses)}.
229   * @param statuses the possible statuses of the connections in the pools to wait for.
230   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
231   * @since 5.0
232   */
233  public JPPFConnectionPool awaitConnectionPool(final JPPFClientConnectionStatus...statuses) {
234    return awaitConnectionPool(Long.MAX_VALUE, statuses);
235  }
236
237  /**
238   * Wait until at least one connection pool with at least one connection in one of the specified statuses,
239   * or until the specified timeout to expire, whichever happens first.
240   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
241   * @param statuses the possible statuses of the connections in the pools to wait for.
242   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
243   * @since 5.0
244   */
245  public JPPFConnectionPool awaitConnectionPool(final long timeout, final JPPFClientConnectionStatus...statuses) {
246    List<JPPFConnectionPool> list = awaitConnectionPools(timeout, statuses);
247    return list.isEmpty() ? null : list.get(0);
248  }
249
250  /**
251   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
252   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status.
253   * This is a shorthand for {@code awaitConnectionPools(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
254   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
255   * @since 5.1
256   */
257  public List<JPPFConnectionPool> awaitWorkingConnectionPools() {
258    return awaitConnectionPools(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
259  }
260
261  /**
262   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
263   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status, or the specified timeout expires, whichever happens first.
264   * This is a shorthand for {@code awaitConnectionPools(tiemout, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
265   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
266   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
267   * @since 5.1
268   */
269  public List<JPPFConnectionPool> awaitWorkingConnectionPools(final long timeout) {
270    return awaitConnectionPools(timeout, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
271  }
272
273  /**
274   * Wait until at least one connection pool with at least one connection in one of the specified statuses,
275   * or until the specified timeout to expire, whichever happens first.
276   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
277   * @param statuses the possible statuses of the connections in the pools to wait for.
278   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
279   * @since 5.0
280   */
281  public List<JPPFConnectionPool> awaitConnectionPools(final long timeout, final JPPFClientConnectionStatus...statuses) {
282    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
283    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
284      @Override public boolean evaluate() {
285        return !ref.setSynchronized(findConnectionPools(statuses), pools).isEmpty();
286      }
287    }, timeout);
288    return ref.get();
289  }
290
291  /**
292   * Wait until there is at least one connection pool where the number of connections with the specified statuses
293   * satisfy the specified condition, or until the specified timeout expires, whichever happens first.
294   * @param operator the condition on the number of connections to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
295   * @param expectedConnections the expected number of connections to wait for.
296   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
297   * @param statuses the possible statuses of the connections in the pools to wait for.
298   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
299   * @since 5.0
300   */
301  public List<JPPFConnectionPool> awaitConnectionPools(final Operator operator, final int expectedConnections, final long timeout, final JPPFClientConnectionStatus...statuses) {
302    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
303    final ConnectionPoolFilter<JPPFConnectionPool> filter = new ConnectionPoolFilter<JPPFConnectionPool>() {
304      @Override
305      public boolean accepts(final JPPFConnectionPool pool) {
306        List<JPPFClientConnection> list = pool.getConnections(statuses);
307        return operator.evaluate(list.size(), expectedConnections);
308      }
309    };
310    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
311      @Override public boolean evaluate() {
312        List<JPPFConnectionPool> result = new ArrayList<>();
313        List<JPPFConnectionPool> temp = findConnectionPools(statuses);
314        for (JPPFConnectionPool pool: temp) {
315          if (filter.accepts(pool)) result.add(pool);
316        }
317        boolean empty = ref.setSynchronized(result, pools).isEmpty();
318        return !empty || (empty && (expectedConnections <= 0));
319      }
320    }, timeout);
321    return ref.get();
322  }
323
324  /**
325   * Wait until there is at least one connection pool where at least one connections passes the specified filter,
326   * or until the specified timeout expires, whichever happens first.
327   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
328   * @param filter an implementation of the {@link ConnectionPoolFilter} interface. A {@code null} value is interpreted as no filter (all pools are accepted).
329   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
330   * @since 5.0
331   */
332  public List<JPPFConnectionPool> awaitConnectionPools(final long timeout, final ConnectionPoolFilter filter) {
333    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
334    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
335      @Override public boolean evaluate() {
336        List<JPPFConnectionPool> result = new ArrayList<>();
337        List<JPPFConnectionPool> temp = getConnectionPools();
338        for (JPPFConnectionPool pool: temp) {
339          if (filter.accepts(pool)) result.add(pool);
340        }
341        return !ref.setSynchronized(result, pools).isEmpty();
342      }
343    }, timeout);
344    return ref.get();
345  }
346
347  @Override
348  public void close() {
349    Debug.unregister(this);
350    super.close();
351  }
352
353  /**
354   * Convert the specified client listeners into {@link ClientListenerDelegation} instances.
355   * @param listeners the array of listners to convert.
356   * @return an array of {@link ClientListenerDelegation} instances, possibly empty but never null.
357   */
358  @SuppressWarnings("deprecation")
359  private static ConnectionPoolListener[] toDelegation(final ClientListener[] listeners) {
360    if ((listeners == null) || (listeners.length <= 0)) return new ConnectionPoolListener[0];
361    ConnectionPoolListener[] clds = new ConnectionPoolListener[listeners.length];
362    for (int i=0; i<listeners.length; i++) clds[i] = new ClientListenerDelegation(listeners[i]);
363    return clds;
364  }
365
366  /**
367   * Add a custom driver discovery mechanism to those already registered, if any.
368   * @param discovery the driver discovery to add.
369   */
370  public void addDriverDiscovery(final ClientDriverDiscovery discovery) {
371    discoveryHandler.addDiscovery(discovery);
372  }
373
374  /**
375   * Remove a custom driver discovery mechanism from those already registered.
376   * @param discovery the driver discovery to remove.
377   */
378  public void removeDriverDiscovery(final ClientDriverDiscovery discovery) {
379    discoveryHandler.removeDiscovery(discovery);
380  }
381
382  /**
383   * Get the current load-balancer settings.
384   * @return a {@link LoadBalancingInformation} instance, which encapsulates a load-balancing alfgorithm name, along with its parameters.
385   * @since 5.2.7
386   */
387  public LoadBalancingInformation getLoadBalancerSettings() {
388    JobManager manager = getJobManager();
389    return (manager == null) ? null : manager.getLoadBalancerSettings();
390  }
391
392  /**
393   * Change the load balancer settings.
394   * @param algorithm the name of load-balancing alogrithm to use.
395   * @param parameters the algorithm's parameters, if any. The parmeter names are assumed no to be prefixed.
396   * @throws Exception if any error occurs or if the algorithm name is {@code null} or not known.
397   * @since 5.2.7
398   */
399  public void setLoadBalancerSettings(final String algorithm, final Properties parameters) throws Exception {
400    JobManager manager = getJobManager();
401    if (manager != null) manager.setLoadBalancerSettings(algorithm, parameters);
402  }
403}