/*
 * JPPF.
 * Copyright (C) 2005-2018 JPPF Team.
 * http://www.jppf.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.jppf.client;
019
020import java.util.*;
021
022import org.jppf.client.balancer.JobManagerClient;
023import org.jppf.client.debug.Debug;
024import org.jppf.client.event.ConnectionPoolListener;
025import org.jppf.discovery.ClientDriverDiscovery;
026import org.jppf.load.balancer.LoadBalancingInformation;
027import org.jppf.load.balancer.spi.JPPFBundlerFactory;
028import org.jppf.node.policy.ExecutionPolicy;
029import org.jppf.node.protocol.Task;
030import org.jppf.utils.*;
031import org.jppf.utils.Operator;
032import org.jppf.utils.concurrent.*;
033import org.slf4j.*;
034
/**
 * This class provides an API to submit execution requests and administration commands,
 * and request server information data.<br>
 * It has its own unique identifier, used by the nodes, to determine whether classes from
 * the submitting application should be dynamically reloaded or not, depending on whether
 * the uuid has changed or not.
 * @author Laurent Cohen
 */
043public class JPPFClient extends AbstractGenericClient {
044  /**
045   * Logger for this class.
046   */
047  private static Logger log = LoggerFactory.getLogger(JPPFClient.class);
048  /**
049   * Determines whether debug-level logging is enabled.
050   */
051  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
052
053  /**
054   * Initialize this client with an automatically generated application UUID.
055   */
056  public JPPFClient() {
057    this(null, JPPFConfiguration.getProperties());
058  }
059
060  /**
061   * Initialize this client with the specified application UUID.
062   * @param uuid the unique identifier for this local client.
063   */
064  public JPPFClient(final String uuid) {
065    this(uuid, JPPFConfiguration.getProperties());
066  }
067
068  /**
069   * Initialize this client with an automatically generated application UUID.
070   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
071   */
072  public JPPFClient(final ConnectionPoolListener... listeners) {
073    this(null, JPPFConfiguration.getProperties(), listeners);
074  }
075
076  /**
077   * Initialize this client with the specified application UUID and new connection listeners.
078   * @param uuid the unique identifier for this local client.
079   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
080   */
081  public JPPFClient(final String uuid, final ConnectionPoolListener... listeners) {
082    this(uuid, JPPFConfiguration.getProperties(), listeners);
083  }
084
085  /**
086   * Initialize this client with the specified configuration and connection listeners.
087   * @param config the JPPF configuration to use for this client.
088   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
089   * @exclude
090   */
091  public JPPFClient(final TypedProperties config, final ConnectionPoolListener... listeners) {
092    this(null, config, listeners);
093  }
094
095  /**
096   * Initialize this client with the specified application UUID and new connection listeners.
097   * @param uuid the unique identifier for this local client.
098   * @param config the JPPF configuration to use for this client.
099   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
100   * @exclude
101   */
102  public JPPFClient(final String uuid, final TypedProperties config, final ConnectionPoolListener... listeners) {
103    super(uuid, config, listeners);
104    Debug.register(this);
105  }
106
107  @Override
108  AbstractJPPFClientConnection createConnection(final String name, final JPPFConnectionPool pool) {
109    return new JPPFClientConnectionImpl(this, name, pool);
110  }
111
112  @Override
113  public List<Task<?>> submitJob(final JPPFJob job) {
114    if (isClosed()) throw new IllegalStateException("this client is closed");
115    if (job == null) throw new IllegalArgumentException("job cannot be null");
116    if (job.getJobTasks().isEmpty()) throw new IllegalStateException("job cannot be empty");
117    if (job.client != null) {
118      if (!job.isDone()) throw new IllegalStateException("this job is already submitted");
119      job.cancelled.set(false);
120      job.getResults().clear();
121    }
122    job.client = this;
123    if (debugEnabled) log.debug("submitting job {}", job);
124    ExecutionPolicy defaultPolicy = getDefaultPolicy();
125    ExecutionPolicy jobPolicy = job.getSLA().getExecutionPolicy();
126    if ((jobPolicy == null) && (defaultPolicy != null)) job.getSLA().setExecutionPolicy(defaultPolicy);
127    defaultPolicy = getDefaultClientPolicy();
128    jobPolicy = job.getClientSLA().getExecutionPolicy();
129    if ((jobPolicy == null) && (defaultPolicy != null)) job.getClientSLA().setExecutionPolicy(defaultPolicy);
130    if (log.isTraceEnabled()) {
131      for (Task<?> t: job) {
132        log.trace("task {}, position={}, taskObject={}, taskObject class={}", t, t.getPosition(), t.getTaskObject(), (t.getTaskObject() != null) ? t.getTaskObject().getClass() : null);
133      }
134    }
135    getJobManager().submitJob(job);
136    if (job.isBlocking()) return job.awaitResults();
137    return null;
138  }
139
140  /**
141   * @exclude
142   */
143  @Override
144  protected JobManager createJobManager() {
145    JobManager jobManager = null;
146    try {
147      jobManager = new JobManagerClient(this, bundlerFactory);
148    } catch (final Exception e) {
149      log.error("Can't initialize job Manager", e);
150    }
151    return jobManager;
152  }
153
154  /**
155   * Reset this client, that is, close it if necessary, reload its configuration, then open it again.
156   * If the client is already closed or reseeting, this method has no effect.
157   * @see #reset(TypedProperties)
158   * @since 4.0
159   */
160  public void reset() {
161    if (isClosed()) return;
162    if (debugEnabled) log.debug("resetting client");
163    if (resetting.compareAndSet(false, true)) {
164      close(true);
165      JPPFConfiguration.reset();
166      init(JPPFConfiguration.getProperties());
167    }
168  }
169
170  /**
171   * Reset this client, that is, close it if necessary, then open it again, using the specified confguration.
172   * If the client is already closed or reseeting, this method has no effect.
173   * @param configuration the configuration to initialize this client with.
174   * @see #reset()
175   * @since 4.0
176   */
177  public void reset(final TypedProperties configuration) {
178    if (isClosed()) return;
179    if (debugEnabled) log.debug("resetting client");
180    if (resetting.compareAndSet(false, true)) {
181      close(true);
182      init(configuration);
183    }
184  }
185
186  /**
187   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE} status.
188   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE)}.
189   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
190   * @since 5.0
191   */
192  public JPPFConnectionPool awaitActiveConnectionPool() {
193    return awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE);
194  }
195
196  /**
197   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
198   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status.
199   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
200   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
201   * @since 5.0
202   */
203  public JPPFConnectionPool awaitWorkingConnectionPool() {
204    return awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
205  }
206
207  /**
208   * Wait until there is at least one connection pool with at least one connection in one of the specified statuses.
209   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, statuses)}.
210   * @param statuses the possible statuses of the connections in the pools to wait for.
211   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
212   * @since 5.0
213   */
214  public JPPFConnectionPool awaitConnectionPool(final JPPFClientConnectionStatus...statuses) {
215    return awaitConnectionPool(Long.MAX_VALUE, statuses);
216  }
217
218  /**
219   * Wait until at least one connection pool with at least one connection in one of the specified statuses,
220   * or until the specified timeout to expire, whichever happens first.
221   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
222   * @param statuses the possible statuses of the connections in the pools to wait for.
223   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
224   * @since 5.0
225   */
226  public JPPFConnectionPool awaitConnectionPool(final long timeout, final JPPFClientConnectionStatus...statuses) {
227    final List<JPPFConnectionPool> list = awaitConnectionPools(timeout, statuses);
228    return list.isEmpty() ? null : list.get(0);
229  }
230
231  /**
232   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
233   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status.
234   * This is a shorthand for {@code awaitConnectionPools(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
235   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
236   * @since 5.1
237   */
238  public List<JPPFConnectionPool> awaitWorkingConnectionPools() {
239    return awaitConnectionPools(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
240  }
241
242  /**
243   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
244   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status, or the specified tiemoput expires, whichever happens first.
245   * This is a shorthand for {@code awaitConnectionPools(tiemout, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
246   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
247   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
248   * @since 5.1
249   */
250  public List<JPPFConnectionPool> awaitWorkingConnectionPools(final long timeout) {
251    return awaitConnectionPools(timeout, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
252  }
253
254  /**
255   * Wait until at least one connection pool with at least one connection in one of the specified statuses,
256   * or until the specified timeout to expire, whichever happens first.
257   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
258   * @param statuses the possible statuses of the connections in the pools to wait for.
259   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
260   * @since 5.0
261   */
262  public List<JPPFConnectionPool> awaitConnectionPools(final long timeout, final JPPFClientConnectionStatus...statuses) {
263    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
264    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
265      @Override public boolean evaluate() {
266        return !ref.setSynchronized(findConnectionPools(statuses), pools).isEmpty();
267      }
268    }, timeout);
269    return ref.get();
270  }
271
272  /**
273   * Wait until there is at least one connection pool where the number of connections with the specified statuses
274   * satisfy the specified condition, or until the specified timeout expires, whichever happens first.
275   * @param operator the condition on the number of connections to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
276   * @param expectedConnections the expected number of connections to wait for.
277   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
278   * @param statuses the possible statuses of the connections in the pools to wait for.
279   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
280   * @since 5.0
281   */
282  public List<JPPFConnectionPool> awaitConnectionPools(final ComparisonOperator operator, final int expectedConnections, final long timeout, final JPPFClientConnectionStatus...statuses) {
283    return awaitConnectionPools(Operator.AT_LEAST, 1, operator, expectedConnections, timeout, statuses);
284  }
285
286  /**
287   * Wait until at least the specified expected connection pools satisfy the condition where the number of connections with the specified statuses
288   * satisfy the specified connection operator, or until the specified timeout expires, whichever happens first.
289   * <p>As an example, to wait for at least 2 pools having each at least one ACTIVE connection, with a timeout of 5 seconds, one would use:
290   * <pre>
291   * JPPFClient client = new JPPFClient();
292   * client.awaitConnectionPools(Operator.AT_LEAST, 2, Operator.AT_LEAST, 1,
293   *   5000L, JPPFClientConnectionStatus.ACTIVE);
294   * </pre>
295   * @param poolOperator the condition on the number of expected pools to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
296   * @param expectedPools the expected number of pools to wait for.
297   * @param connectionOperator the condition on the number of connections to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
298   * @param expectedConnections the expected number of connections to wait for.
299   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
300   * @param statuses the possible statuses of the connections in the pools to wait for.
301   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
302   * @since 6.0
303   */
304  public List<JPPFConnectionPool> awaitConnectionPools(final ComparisonOperator poolOperator, final int expectedPools, final ComparisonOperator connectionOperator, final int expectedConnections,
305    final long timeout, final JPPFClientConnectionStatus...statuses) {
306    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
307    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
308      @Override public boolean evaluate() {
309        final List<JPPFConnectionPool> result = new ArrayList<>();
310        final List<JPPFConnectionPool> temp = findConnectionPools(statuses);
311        for (final JPPFConnectionPool pool: temp) {
312          final List<JPPFClientConnection> list = pool.getConnections(statuses);
313          if (connectionOperator.evaluate(list.size(), expectedConnections)) result.add(pool);
314        }
315        ref.setSynchronized(result, pools);
316        return poolOperator.evaluate(result.size(), expectedPools);
317      }
318    }, timeout);
319    return ref.get();
320  }
321
322  /**
323   * Wait until there is at least one connection pool where at least one connections passes the specified filter,
324   * or until the specified timeout expires, whichever happens first.
325   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
326   * @param filter an implementation of the {@link ConnectionPoolFilter} interface. A {@code null} value is interpreted as no filter (all pools are accepted).
327   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
328   * @since 5.0
329   */
330  public List<JPPFConnectionPool> awaitConnectionPools(final long timeout, final ConnectionPoolFilter<JPPFConnectionPool> filter) {
331    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
332    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
333      @Override public boolean evaluate() {
334        final List<JPPFConnectionPool> result = new ArrayList<>();
335        final List<JPPFConnectionPool> temp = getConnectionPools();
336        for (final JPPFConnectionPool pool: temp) {
337          if (filter.accepts(pool)) result.add(pool);
338        }
339        return !ref.setSynchronized(result, pools).isEmpty();
340      }
341    }, timeout);
342    return ref.get();
343  }
344
345  @Override
346  public void close() {
347    log.info("closing {}", this);
348    Debug.unregister(this);
349    super.close();
350  }
351
352  /**
353   * Remove a custom driver discovery mechanism from those already registered.
354   * @param discovery the driver discovery to remove.
355   */
356  public void removeDriverDiscovery(final ClientDriverDiscovery discovery) {
357    discoveryHandler.removeDiscovery(discovery);
358  }
359
360  /**
361   * Get the current load-balancer settings.
362   * @return a {@link LoadBalancingInformation} instance, which encapsulates a load-balancing alfgorithm name, along with its parameters.
363   * @since 5.2.7
364   */
365  public LoadBalancingInformation getLoadBalancerSettings() {
366    final JobManager manager = getJobManager();
367    return (manager == null) ? null : manager.getLoadBalancerSettings();
368  }
369
370  /**
371   * Change the load balancer settings.
372   * @param algorithm the name of load-balancing alogrithm to use.
373   * @param parameters the algorithm's parameters, if any. The parmeter names are assumed no to be prefixed.
374   * @throws Exception if any error occurs or if the algorithm name is {@code null} or not known.
375   * @since 5.2.7
376   */
377  public void setLoadBalancerSettings(final String algorithm, final Properties parameters) throws Exception {
378    final JobManager manager = getJobManager();
379    if (manager != null) manager.setLoadBalancerSettings(algorithm, parameters);
380  }
381
382  /**
383   * Get the factory that creates load-balancer instances.
384   * @return an istance of {@link JPPFBundlerFactory}.
385   * @exclude
386   */
387  public JPPFBundlerFactory getBundlerFactory() {
388    return bundlerFactory;
389  }
390
391  /**
392   * @return the number of idle connections in this client.
393   * @exclude
394   */
395  public int nbIdleCOnnections() {
396    final JobManagerClient manager = (JobManagerClient) getJobManager();
397    return (manager == null) ? -1 : manager.nbAvailableConnections();
398  }
399
400  /**
401   * Get the default server-side job execution policy.
402   * @return an {@link ExecutionPolicy}, or {@code null} if none was specified.
403   */
404  public ExecutionPolicy getDefaultPolicy() {
405    defaultPolicyLock.lock();
406    try {
407      return defaultPolicy;
408    } finally {
409      defaultPolicyLock.unlock();
410    }
411  }
412
413  /**
414   * Set the default server-side job execution policy.
415   * @param defaultPolicy the execution policy to set as default, may be {@code null}.
416   */
417  public void setDefaultPolicy(final ExecutionPolicy defaultPolicy) {
418    defaultPolicyLock.lock();
419    try {
420      this.defaultPolicy = defaultPolicy;
421    } finally {
422      defaultPolicyLock.unlock();
423    }
424  }
425
426  /**
427   * Get the default client-side job execution policy.
428   * @return an {@link ExecutionPolicy}, or {@code null} if none was specified.
429   */
430  public ExecutionPolicy getDefaultClientPolicy() {
431    defaultPolicyLock.lock();
432    try {
433      return defaultClientPolicy;
434    } finally {
435      defaultPolicyLock.unlock();
436    }
437  }
438
439  /**
440   * Set the default client-side job execution policy.
441   * @param defaultClientPolicy the execution policy to set as default, may be {@code null}.
442   */
443  public void setDefaultClientPolicy(final ExecutionPolicy defaultClientPolicy) {
444    defaultPolicyLock.lock();
445    try {
446      this.defaultClientPolicy = defaultClientPolicy;
447    } finally {
448      defaultPolicyLock.unlock();
449    }
450  }
451}