001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.client;
019
020import java.util.*;
021
022import org.jppf.client.balancer.JobManagerClient;
023import org.jppf.client.balancer.queue.JPPFPriorityQueue;
024import org.jppf.client.debug.Debug;
025import org.jppf.client.event.ConnectionPoolListener;
026import org.jppf.discovery.ClientDriverDiscovery;
027import org.jppf.job.*;
028import org.jppf.load.balancer.LoadBalancingInformation;
029import org.jppf.load.balancer.spi.JPPFBundlerFactory;
030import org.jppf.node.policy.ExecutionPolicy;
031import org.jppf.node.protocol.Task;
032import org.jppf.utils.*;
033import org.jppf.utils.Operator;
034import org.jppf.utils.concurrent.*;
035import org.slf4j.*;
036
037/**
038 * This class provides an API to submit execution requests and administration commands,
039 * and request server information data.<br>
040 * It has its own unique identifier, used by the nodes, to determine whether classes from
041 * the submitting application should be dynamically reloaded or not, depending on whether
042 * the uuid has changed or not.
043 * @author Laurent Cohen
044 */
045public class JPPFClient extends AbstractGenericClient {
046  /**
047   * Logger for this class.
048   */
049  private static Logger log = LoggerFactory.getLogger(JPPFClient.class);
050  /**
051   * Determines whether debug-level logging is enabled.
052   */
053  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
054
055  /**
056   * Initialize this client with an automatically generated application UUID.
057   */
058  public JPPFClient() {
059    this(null, JPPFConfiguration.getProperties());
060  }
061
062  /**
063   * Initialize this client with the specified application UUID.
064   * @param uuid the unique identifier for this local client.
065   */
066  public JPPFClient(final String uuid) {
067    this(uuid, JPPFConfiguration.getProperties());
068  }
069
070  /**
071   * Initialize this client with an automatically generated application UUID.
072   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
073   */
074  public JPPFClient(final ConnectionPoolListener... listeners) {
075    this(null, JPPFConfiguration.getProperties(), listeners);
076  }
077
078  /**
079   * Initialize this client with the specified application UUID and new connection listeners.
080   * @param uuid the unique identifier for this local client.
081   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
082   */
083  public JPPFClient(final String uuid, final ConnectionPoolListener... listeners) {
084    this(uuid, JPPFConfiguration.getProperties(), listeners);
085  }
086
087  /**
088   * Initialize this client with the specified configuration and connection listeners.
089   * @param config the JPPF configuration to use for this client.
090   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
091   * @exclude
092   */
093  public JPPFClient(final TypedProperties config, final ConnectionPoolListener... listeners) {
094    this(null, config, listeners);
095  }
096
097  /**
098   * Initialize this client with the specified application UUID and new connection listeners.
099   * @param uuid the unique identifier for this local client.
100   * @param config the JPPF configuration to use for this client.
101   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
102   * @exclude
103   */
104  public JPPFClient(final String uuid, final TypedProperties config, final ConnectionPoolListener... listeners) {
105    super(uuid, config, listeners);
106    Debug.register(this);
107  }
108
109  @Override
110  AbstractJPPFClientConnection createConnection(final String name, final JPPFConnectionPool pool) {
111    return new JPPFClientConnectionImpl(this, name, pool);
112  }
113
114  @Override
115  public List<Task<?>> submitJob(final JPPFJob job) {
116    if (isClosed()) throw new IllegalStateException("this client is closed");
117    if (job == null) throw new IllegalArgumentException("job cannot be null");
118    if (job.getJobTasks().isEmpty()) throw new IllegalStateException("job cannot be empty");
119    if (job.client != null) {
120      if (!job.isDone()) throw new IllegalStateException("this job is already submitted");
121      job.cancelled.set(false);
122      job.getResults().clear();
123    }
124    job.client = this;
125    if (debugEnabled) log.debug("submitting job {}", job);
126    ExecutionPolicy defaultPolicy = getDefaultPolicy();
127    ExecutionPolicy jobPolicy = job.getSLA().getExecutionPolicy();
128    if ((jobPolicy == null) && (defaultPolicy != null)) job.getSLA().setExecutionPolicy(defaultPolicy);
129    defaultPolicy = getDefaultClientPolicy();
130    jobPolicy = job.getClientSLA().getExecutionPolicy();
131    if ((jobPolicy == null) && (defaultPolicy != null)) job.getClientSLA().setExecutionPolicy(defaultPolicy);
132    if (log.isTraceEnabled()) {
133      for (Task<?> t: job) {
134        log.trace("task {}, position={}, taskObject={}, taskObject class={}", t, t.getPosition(), t.getTaskObject(), (t.getTaskObject() != null) ? t.getTaskObject().getClass() : null);
135      }
136    }
137    getJobManager().submitJob(job);
138    if (job.isBlocking()) return job.awaitResults();
139    return null;
140  }
141
142  /**
143   * @exclude
144   */
145  @Override
146  protected JobManager createJobManager() {
147    JobManager jobManager = null;
148    try {
149      jobManager = new JobManagerClient(this, bundlerFactory);
150    } catch (final Exception e) {
151      log.error("Can't initialize job Manager", e);
152    }
153    return jobManager;
154  }
155
156  /**
157   * Reset this client, that is, close it if necessary, reload its configuration, then open it again.
158   * If the client is already closed or reseeting, this method has no effect.
159   * @see #reset(TypedProperties)
160   * @since 4.0
161   */
162  public void reset() {
163    if (isClosed()) return;
164    if (debugEnabled) log.debug("resetting client");
165    if (resetting.compareAndSet(false, true)) {
166      close(true);
167      JPPFConfiguration.reset();
168      init(JPPFConfiguration.getProperties());
169    }
170  }
171
172  /**
173   * Reset this client, that is, close it if necessary, then open it again, using the specified confguration.
174   * If the client is already closed or reseeting, this method has no effect.
175   * @param configuration the configuration to initialize this client with.
176   * @see #reset()
177   * @since 4.0
178   */
179  public void reset(final TypedProperties configuration) {
180    if (isClosed()) return;
181    if (debugEnabled) log.debug("resetting client");
182    if (resetting.compareAndSet(false, true)) {
183      close(true);
184      init(configuration);
185    }
186  }
187
188  /**
189   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE} status.
190   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE)}.
191   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
192   * @since 5.0
193   */
194  public JPPFConnectionPool awaitActiveConnectionPool() {
195    return awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE);
196  }
197
198  /**
199   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
200   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status.
201   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
202   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
203   * @since 5.0
204   */
205  public JPPFConnectionPool awaitWorkingConnectionPool() {
206    return awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
207  }
208
209  /**
210   * Wait until there is at least one connection pool with at least one connection in one of the specified statuses.
211   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, statuses)}.
212   * @param statuses the possible statuses of the connections in the pools to wait for.
213   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
214   * @since 5.0
215   */
216  public JPPFConnectionPool awaitConnectionPool(final JPPFClientConnectionStatus...statuses) {
217    return awaitConnectionPool(Long.MAX_VALUE, statuses);
218  }
219
220  /**
221   * Wait until at least one connection pool with at least one connection in one of the specified statuses,
222   * or until the specified timeout to expire, whichever happens first.
223   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
224   * @param statuses the possible statuses of the connections in the pools to wait for.
225   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
226   * @since 5.0
227   */
228  public JPPFConnectionPool awaitConnectionPool(final long timeout, final JPPFClientConnectionStatus...statuses) {
229    final List<JPPFConnectionPool> list = awaitConnectionPools(timeout, statuses);
230    return list.isEmpty() ? null : list.get(0);
231  }
232
233  /**
234   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
235   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status.
236   * This is a shorthand for {@code awaitConnectionPools(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
237   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
238   * @since 5.1
239   */
240  public List<JPPFConnectionPool> awaitWorkingConnectionPools() {
241    return awaitConnectionPools(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
242  }
243
244  /**
245   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
246   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status, or the specified tiemoput expires, whichever happens first.
247   * This is a shorthand for {@code awaitConnectionPools(tiemout, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
248   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
249   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
250   * @since 5.1
251   */
252  public List<JPPFConnectionPool> awaitWorkingConnectionPools(final long timeout) {
253    return awaitConnectionPools(timeout, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
254  }
255
256  /**
257   * Wait until at least one connection pool with at least one connection in one of the specified statuses,
258   * or until the specified timeout to expire, whichever happens first.
259   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
260   * @param statuses the possible statuses of the connections in the pools to wait for.
261   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
262   * @since 5.0
263   */
264  public List<JPPFConnectionPool> awaitConnectionPools(final long timeout, final JPPFClientConnectionStatus...statuses) {
265    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
266    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
267      @Override public boolean evaluate() {
268        return !ref.setSynchronized(findConnectionPools(statuses), pools).isEmpty();
269      }
270    }, timeout);
271    return ref.get();
272  }
273
274  /**
275   * Wait until there is at least one connection pool where the number of connections with the specified statuses
276   * satisfy the specified condition, or until the specified timeout expires, whichever happens first.
277   * @param operator the condition on the number of connections to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
278   * @param expectedConnections the expected number of connections to wait for.
279   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
280   * @param statuses the possible statuses of the connections in the pools to wait for.
281   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
282   * @since 5.0
283   */
284  public List<JPPFConnectionPool> awaitConnectionPools(final ComparisonOperator operator, final int expectedConnections, final long timeout, final JPPFClientConnectionStatus...statuses) {
285    return awaitConnectionPools(Operator.AT_LEAST, 1, operator, expectedConnections, timeout, statuses);
286  }
287
288  /**
289   * Wait until at least the specified expected connection pools satisfy the condition where the number of connections with the specified statuses
290   * satisfy the specified connection operator, or until the specified timeout expires, whichever happens first.
291   * <p>As an example, to wait for at least 2 pools having each at least one ACTIVE connection, with a timeout of 5 seconds, one would use:
292   * <pre>
293   * JPPFClient client = new JPPFClient();
294   * client.awaitConnectionPools(Operator.AT_LEAST, 2, Operator.AT_LEAST, 1,
295   *   5000L, JPPFClientConnectionStatus.ACTIVE);
296   * </pre>
297   * @param poolOperator the condition on the number of expected pools to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
298   * @param expectedPools the expected number of pools to wait for.
299   * @param connectionOperator the condition on the number of connections to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
300   * @param expectedConnections the expected number of connections to wait for.
301   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
302   * @param statuses the possible statuses of the connections in the pools to wait for.
303   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
304   * @since 6.0
305   */
306  public List<JPPFConnectionPool> awaitConnectionPools(final ComparisonOperator poolOperator, final int expectedPools, final ComparisonOperator connectionOperator, final int expectedConnections,
307    final long timeout, final JPPFClientConnectionStatus...statuses) {
308    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
309    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
310      @Override public boolean evaluate() {
311        final List<JPPFConnectionPool> result = new ArrayList<>();
312        final List<JPPFConnectionPool> temp = findConnectionPools(statuses);
313        for (final JPPFConnectionPool pool: temp) {
314          final List<JPPFClientConnection> list = pool.getConnections(statuses);
315          if (connectionOperator.evaluate(list.size(), expectedConnections)) result.add(pool);
316        }
317        ref.setSynchronized(result, pools);
318        return poolOperator.evaluate(result.size(), expectedPools);
319      }
320    }, timeout);
321    return ref.get();
322  }
323
324  /**
325   * Wait until there is at least one connection pool where at least one connections passes the specified filter,
326   * or until the specified timeout expires, whichever happens first.
327   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
328   * @param filter an implementation of the {@link ConnectionPoolFilter} interface. A {@code null} value is interpreted as no filter (all pools are accepted).
329   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
330   * @since 5.0
331   */
332  public List<JPPFConnectionPool> awaitConnectionPools(final long timeout, final ConnectionPoolFilter<JPPFConnectionPool> filter) {
333    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
334    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
335      @Override public boolean evaluate() {
336        final List<JPPFConnectionPool> result = new ArrayList<>();
337        final List<JPPFConnectionPool> temp = getConnectionPools();
338        for (final JPPFConnectionPool pool: temp) {
339          if (filter.accepts(pool)) result.add(pool);
340        }
341        return !ref.setSynchronized(result, pools).isEmpty();
342      }
343    }, timeout);
344    return ref.get();
345  }
346
347  @Override
348  public void close() {
349    log.info("closing {}", this);
350    Debug.unregister(this);
351    super.close();
352  }
353
354  /**
355   * Remove a custom driver discovery mechanism from those already registered.
356   * @param discovery the driver discovery to remove.
357   */
358  public void removeDriverDiscovery(final ClientDriverDiscovery discovery) {
359    discoveryHandler.removeDiscovery(discovery);
360  }
361
362  /**
363   * Get the current load-balancer settings.
364   * @return a {@link LoadBalancingInformation} instance, which encapsulates a load-balancing alfgorithm name, along with its parameters.
365   * @since 5.2.7
366   */
367  public LoadBalancingInformation getLoadBalancerSettings() {
368    final JobManager manager = getJobManager();
369    return (manager == null) ? null : manager.getLoadBalancerSettings();
370  }
371
372  /**
373   * Change the load balancer settings.
374   * @param algorithm the name of load-balancing alogrithm to use.
375   * @param parameters the algorithm's parameters, if any. The parmeter names are assumed no to be prefixed.
376   * @throws Exception if any error occurs or if the algorithm name is {@code null} or not known.
377   * @since 5.2.7
378   */
379  public void setLoadBalancerSettings(final String algorithm, final Properties parameters) throws Exception {
380    final JobManager manager = getJobManager();
381    if (manager != null) manager.setLoadBalancerSettings(algorithm, parameters);
382  }
383
384  /**
385   * Get the factory that creates load-balancer instances.
386   * @return an istance of {@link JPPFBundlerFactory}.
387   * @exclude
388   */
389  public JPPFBundlerFactory getBundlerFactory() {
390    return bundlerFactory;
391  }
392
393  /**
394   * @return the number of idle connections in this client.
395   * @exclude
396   */
397  public int nbIdleCOnnections() {
398    final JobManagerClient manager = (JobManagerClient) getJobManager();
399    return (manager == null) ? -1 : manager.nbAvailableConnections();
400  }
401
402  /**
403   * Get the default server-side job execution policy.
404   * @return an {@link ExecutionPolicy}, or {@code null} if none was specified.
405   */
406  public ExecutionPolicy getDefaultPolicy() {
407    defaultPolicyLock.lock();
408    try {
409      return defaultPolicy;
410    } finally {
411      defaultPolicyLock.unlock();
412    }
413  }
414
415  /**
416   * Set the default server-side job execution policy.
417   * @param defaultPolicy the execution policy to set as default, may be {@code null}.
418   */
419  public void setDefaultPolicy(final ExecutionPolicy defaultPolicy) {
420    defaultPolicyLock.lock();
421    try {
422      this.defaultPolicy = defaultPolicy;
423    } finally {
424      defaultPolicyLock.unlock();
425    }
426  }
427
428  /**
429   * Get the default client-side job execution policy.
430   * @return an {@link ExecutionPolicy}, or {@code null} if none was specified.
431   */
432  public ExecutionPolicy getDefaultClientPolicy() {
433    defaultPolicyLock.lock();
434    try {
435      return defaultClientPolicy;
436    } finally {
437      defaultPolicyLock.unlock();
438    }
439  }
440
441  /**
442   * Set the default client-side job execution policy.
443   * @param defaultClientPolicy the execution policy to set as default, may be {@code null}.
444   */
445  public void setDefaultClientPolicy(final ExecutionPolicy defaultClientPolicy) {
446    defaultPolicyLock.lock();
447    try {
448      this.defaultClientPolicy = defaultClientPolicy;
449    } finally {
450      defaultPolicyLock.unlock();
451    }
452  }
453
454  /**
455   * Get the list of currently queued jobs.
456   * @return a list of {@link JPPFJob} instances, possibly empty.
457   */
458  public List<JPPFJob> getQueuedJobs() {
459    return getQueuedJobs(null);
460  }
461 
462  /**
463   * Get a list of currently queued jobs, filtered by a {@link JobSelector}.
464   * @param selector a job filter to apply. May be {@code null}, in which case all queued jobs are returned.
465   * @return a list of {@link JPPFJob} instances that satisfy the provided job selector, possibly empty.
466   */
467  public List<JPPFJob> getQueuedJobs(final JobSelector selector) {
468    final JPPFPriorityQueue queue = ((JobManagerClient) getJobManager()).getQueue();
469    final List<JPPFJob> allJobs = queue.getJPPFJobs();
470    if ((selector == null) || (selector instanceof AllJobsSelector)) return allJobs;
471    final List<JPPFJob> result = new ArrayList<>(allJobs.size() < 10 ? 10 : allJobs.size());
472    for (final JPPFJob job: allJobs) {
473      if (selector.accepts(job)) result.add(job);
474    }
475    return result;
476  }
477 
478  /**
479   * Get the current number of queued jobs.
480   * @return the current queued jobs count as an {@code int}.
481   */
482  public int getQueuedJobsCount() {
483    return getQueuedJobsCount(null);
484  }
485 
486  /**
487   * Get the current number of jobs that satisfy a job selector.
488   * @param selector a job filter to apply. May be {@code null}, in which case the count of all queued jobs is returned.
489   * @return the number of queued jobs that satisfy the filter.
490   */
491  public int getQueuedJobsCount(final JobSelector selector) {
492    final JPPFPriorityQueue queue = ((JobManagerClient) getJobManager()).getQueue();
493    if ((selector == null) || (selector instanceof AllJobsSelector)) return queue.getQueueSize();
494    final List<JPPFJob> allJobs = queue.getJPPFJobs();
495    int result = 0;
496    for (final JPPFJob job: allJobs) {
497      if (selector.accepts(job)) result++;
498    }
499    return result;
500  }
501}