/*
 * JPPF.
 * Copyright (C) 2005-2019 JPPF Team.
 * http://www.jppf.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.jppf.client;
019
020import java.util.*;
021
022import org.jppf.client.balancer.JobManagerClient;
023import org.jppf.client.balancer.queue.JPPFPriorityQueue;
024import org.jppf.client.debug.Debug;
025import org.jppf.client.event.ConnectionPoolListener;
026import org.jppf.discovery.ClientDriverDiscovery;
027import org.jppf.job.*;
028import org.jppf.load.balancer.LoadBalancingInformation;
029import org.jppf.load.balancer.spi.JPPFBundlerFactory;
030import org.jppf.node.policy.ExecutionPolicy;
031import org.jppf.node.protocol.Task;
032import org.jppf.utils.*;
033import org.jppf.utils.concurrent.*;
034import org.slf4j.*;
035
036/**
037 * This class provides an API to submit execution requests and administration commands,
038 * and request server information data.<br>
039 * It has its own unique identifier, used by the nodes, to determine whether classes from
040 * the submitting application should be dynamically reloaded or not, depending on whether
041 * the uuid has changed or not.
042 * @author Laurent Cohen
043 */
044public class JPPFClient extends AbstractGenericClient {
045  /**
046   * Logger for this class.
047   */
048  private static Logger log = LoggerFactory.getLogger(JPPFClient.class);
049  /**
050   * Determines whether debug-level logging is enabled.
051   */
052  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
053
054  /**
055   * Initialize this client with an automatically generated application UUID.
056   */
057  public JPPFClient() {
058    this(null, JPPFConfiguration.getProperties());
059  }
060
061  /**
062   * Initialize this client with the specified application UUID.
063   * @param uuid the unique identifier for this local client.
064   */
065  public JPPFClient(final String uuid) {
066    this(uuid, JPPFConfiguration.getProperties());
067  }
068
069  /**
070   * Initialize this client with an automatically generated application UUID.
071   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
072   */
073  public JPPFClient(final ConnectionPoolListener... listeners) {
074    this(null, JPPFConfiguration.getProperties(), listeners);
075  }
076
077  /**
078   * Initialize this client with the specified application UUID and new connection listeners.
079   * @param uuid the unique identifier for this local client.
080   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
081   */
082  public JPPFClient(final String uuid, final ConnectionPoolListener... listeners) {
083    this(uuid, JPPFConfiguration.getProperties(), listeners);
084  }
085
086  /**
087   * Initialize this client with the specified configuration and connection listeners.
088   * @param config the JPPF configuration to use for this client.
089   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
090   * @exclude
091   */
092  public JPPFClient(final TypedProperties config, final ConnectionPoolListener... listeners) {
093    this(null, config, listeners);
094  }
095
096  /**
097   * Initialize this client with the specified application UUID and new connection listeners.
098   * @param uuid the unique identifier for this local client.
099   * @param config the JPPF configuration to use for this client.
100   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
101   * @exclude
102   */
103  public JPPFClient(final String uuid, final TypedProperties config, final ConnectionPoolListener... listeners) {
104    super(uuid, config, listeners);
105    Debug.register(this);
106  }
107
108  @Override
109  AbstractJPPFClientConnection createConnection(final String name, final JPPFConnectionPool pool) {
110    return new JPPFClientConnectionImpl(this, name, pool);
111  }
112
113  /**
114   * Submit the specified job for execution.
115   * @param job the job to submit and execute.
116   * @return the job's results as a list of {@link Task tasks} if the job is {@link AbstractJPPFJob#isBlocking() blocking}, or {@code null} if it is non-blocking.
117   * @deprecated a job should be submittable either synchronously or asynchronously, regardless of its state.
118   * The way it is submitted is the user's choice at the time of submission, using one of {@link JPPFClient#submit(JPPFJob)} or {@link JPPFClient#submitAsync(JPPFJob)}. 
119   */
120  public List<Task<?>> submitJob(final JPPFJob job) {
121    if (job.isBlocking()) return submit(job);
122    submitAsync(job);
123    return null;
124  }
125
126  /**
127   * Submit the specified job for execution and wait for its completion.
128   * @param job the job to submit and execute.
129   * @return the job's results as a list of {@link Task tasks}.
130   * @since 6.1
131   */
132  public List<Task<?>> submit(final JPPFJob job) {
133    preSubmit(job);
134    getJobManager().submitJob(job);
135    return job.awaitResults();
136  }
137
138  /**
139   * Submit the specified job asynchronously for execution, without waiting for the job to complete.
140   * @param job the job to submit and execute.
141   * @return the submitted job.
142   * @since 6.1
143   */
144  public JPPFJob submitAsync(final JPPFJob job) {
145    preSubmit(job);
146    getJobManager().submitJob(job);
147    return job;
148  }
149
150  /**
151   * Perform pre-submission job checks and adjustments.
152   * @param job the jkob to check.
153   */
154  private void preSubmit(final JPPFJob job) {
155    if (isClosed()) throw new IllegalStateException("this client is closed");
156    if (job == null) throw new IllegalArgumentException("job cannot be null");
157    if (job.getJobTasks().isEmpty()) throw new IllegalStateException("job cannot be empty");
158    if (job.client != null) {
159      if (!job.isDone()) throw new IllegalStateException("this job is already submitted");
160      job.cancelled.set(false);
161      job.getResults().clear();
162    }
163    job.client = this;
164    if (debugEnabled) log.debug("submitting job {}", job);
165    ExecutionPolicy defaultPolicy = getDefaultPolicy();
166    ExecutionPolicy jobPolicy = job.getSLA().getExecutionPolicy();
167    if ((jobPolicy == null) && (defaultPolicy != null)) job.getSLA().setExecutionPolicy(defaultPolicy);
168    defaultPolicy = getDefaultClientPolicy();
169    jobPolicy = job.getClientSLA().getExecutionPolicy();
170    if ((jobPolicy == null) && (defaultPolicy != null)) job.getClientSLA().setExecutionPolicy(defaultPolicy);
171    if (log.isTraceEnabled()) {
172      job.forEach(t -> log.trace("task {}, pos={}, taskObject={}, taskObject class={}", t, t.getPosition(), t.getTaskObject(), (t.getTaskObject() != null) ? t.getTaskObject().getClass() : null));
173    }
174  }
175
176  /**
177   * {@inheritDoc}
178   * @exclude
179   */
180  @Override
181  protected JobManager createJobManager() {
182    JobManager jobManager = null;
183    try {
184      jobManager = new JobManagerClient(this, bundlerFactory);
185    } catch (final Exception e) {
186      log.error("Can't initialize job Manager", e);
187    }
188    return jobManager;
189  }
190
191  /**
192   * Reset this client, that is, close it if necessary, reload its configuration, then open it again.
193   * If the client is already closed or reseeting, this method has no effect.
194   * @see #reset(TypedProperties)
195   * @since 4.0
196   */
197  public void reset() {
198    if (isClosed()) return;
199    if (debugEnabled) log.debug("resetting client");
200    if (resetting.compareAndSet(false, true)) {
201      close(true);
202      JPPFConfiguration.reset();
203      init(JPPFConfiguration.getProperties());
204    }
205  }
206
207  /**
208   * Reset this client, that is, close it if necessary, then open it again, using the specified confguration.
209   * If the client is already closed or reseeting, this method has no effect.
210   * @param configuration the configuration to initialize this client with.
211   * @see #reset()
212   * @since 4.0
213   */
214  public void reset(final TypedProperties configuration) {
215    if (isClosed()) return;
216    if (debugEnabled) log.debug("resetting client");
217    if (resetting.compareAndSet(false, true)) {
218      close(true);
219      init(configuration);
220    }
221  }
222
223  /**
224   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE} status.
225   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE)}.
226   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
227   * @since 5.0
228   */
229  public JPPFConnectionPool awaitActiveConnectionPool() {
230    return awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE);
231  }
232
233  /**
234   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
235   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status.
236   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
237   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
238   * @since 5.0
239   */
240  public JPPFConnectionPool awaitWorkingConnectionPool() {
241    return awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
242  }
243
244  /**
245   * Wait until there is at least one connection pool with at least one connection in one of the specified statuses.
246   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, statuses)}.
247   * @param statuses the possible statuses of the connections in the pools to wait for.
248   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
249   * @since 5.0
250   */
251  public JPPFConnectionPool awaitConnectionPool(final JPPFClientConnectionStatus...statuses) {
252    return awaitConnectionPool(Long.MAX_VALUE, statuses);
253  }
254
255  /**
256   * Wait until at least one connection pool with at least one connection in one of the specified statuses,
257   * or until the specified timeout to expire, whichever happens first.
258   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
259   * @param statuses the possible statuses of the connections in the pools to wait for.
260   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
261   * @since 5.0
262   */
263  public JPPFConnectionPool awaitConnectionPool(final long timeout, final JPPFClientConnectionStatus...statuses) {
264    final List<JPPFConnectionPool> list = awaitConnectionPools(timeout, statuses);
265    return list.isEmpty() ? null : list.get(0);
266  }
267
268  /**
269   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
270   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status.
271   * This is a shorthand for {@code awaitConnectionPools(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
272   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
273   * @since 5.1
274   */
275  public List<JPPFConnectionPool> awaitWorkingConnectionPools() {
276    return awaitConnectionPools(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
277  }
278
279  /**
280   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
281   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status, or the specified tiemoput expires, whichever happens first.
282   * This is a shorthand for {@code awaitConnectionPools(tiemout, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
283   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
284   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
285   * @since 5.1
286   */
287  public List<JPPFConnectionPool> awaitWorkingConnectionPools(final long timeout) {
288    return awaitConnectionPools(timeout, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
289  }
290
291  /**
292   * Wait until at least one connection pool with at least one connection in one of the specified statuses,
293   * or until the specified timeout to expire, whichever happens first.
294   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
295   * @param statuses the possible statuses of the connections in the pools to wait for.
296   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
297   * @since 5.0
298   */
299  public List<JPPFConnectionPool> awaitConnectionPools(final long timeout, final JPPFClientConnectionStatus...statuses) {
300    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
301    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
302      @Override public boolean evaluate() {
303        return !ref.setSynchronized(findConnectionPools(statuses), pools).isEmpty();
304      }
305    }, timeout);
306    return ref.get();
307  }
308
309  /**
310   * Wait until there is at least one connection pool where the number of connections with the specified statuses
311   * satisfy the specified condition, or until the specified timeout expires, whichever happens first.
312   * @param operator the condition on the number of connections to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
313   * @param expectedConnections the expected number of connections to wait for.
314   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
315   * @param statuses the possible statuses of the connections in the pools to wait for.
316   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
317   * @since 5.0
318   */
319  public List<JPPFConnectionPool> awaitConnectionPools(final ComparisonOperator operator, final int expectedConnections, final long timeout, final JPPFClientConnectionStatus...statuses) {
320    return awaitConnectionPools(Operator.AT_LEAST, 1, operator, expectedConnections, timeout, statuses);
321  }
322
323  /**
324   * Wait until at least the specified expected connection pools satisfy the condition where the number of connections with the specified statuses
325   * satisfy the specified connection operator, or until the specified timeout expires, whichever happens first.
326   * <p>As an example, to wait for at least 2 pools having each at least one ACTIVE connection, with a timeout of 5 seconds, one would use:
327   * <pre>
328   * JPPFClient client = new JPPFClient();
329   * client.awaitConnectionPools(Operator.AT_LEAST, 2, Operator.AT_LEAST, 1,
330   *   5000L, JPPFClientConnectionStatus.ACTIVE);
331   * </pre>
332   * @param poolOperator the condition on the number of expected pools to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
333   * @param expectedPools the expected number of pools to wait for.
334   * @param connectionOperator the condition on the number of connections to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
335   * @param expectedConnections the expected number of connections to wait for.
336   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
337   * @param statuses the possible statuses of the connections in the pools to wait for.
338   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
339   * @since 6.0
340   */
341  public List<JPPFConnectionPool> awaitConnectionPools(final ComparisonOperator poolOperator, final int expectedPools, final ComparisonOperator connectionOperator, final int expectedConnections,
342    final long timeout, final JPPFClientConnectionStatus...statuses) {
343    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
344    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
345      @Override public boolean evaluate() {
346        final List<JPPFConnectionPool> result = new ArrayList<>();
347        final List<JPPFConnectionPool> temp = findConnectionPools(statuses);
348        for (final JPPFConnectionPool pool: temp) {
349          final List<JPPFClientConnection> list = pool.getConnections(statuses);
350          if (connectionOperator.evaluate(list.size(), expectedConnections)) result.add(pool);
351        }
352        ref.setSynchronized(result, pools);
353        return poolOperator.evaluate(result.size(), expectedPools);
354      }
355    }, timeout);
356    return ref.get();
357  }
358
359  /**
360   * Wait until there is at least one connection pool where at least one connections passes the specified filter,
361   * or until the specified timeout expires, whichever happens first.
362   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
363   * @param filter an implementation of the {@link ConnectionPoolFilter} interface. A {@code null} value is interpreted as no filter (all pools are accepted).
364   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
365   * @since 5.0
366   */
367  public List<JPPFConnectionPool> awaitConnectionPools(final long timeout, final ConnectionPoolFilter<JPPFConnectionPool> filter) {
368    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
369    ConcurrentUtils.awaitCondition(new ConcurrentUtils.Condition() {
370      @Override public boolean evaluate() {
371        final List<JPPFConnectionPool> result = new ArrayList<>();
372        final List<JPPFConnectionPool> temp = getConnectionPools();
373        for (final JPPFConnectionPool pool: temp) {
374          if (filter.accepts(pool)) result.add(pool);
375        }
376        return !ref.setSynchronized(result, pools).isEmpty();
377      }
378    }, timeout);
379    return ref.get();
380  }
381
382  @Override
383  public void close() {
384    log.info("closing {}", this);
385    Debug.unregister(this);
386    super.close();
387  }
388
389  /**
390   * Remove a custom driver discovery mechanism from those already registered.
391   * @param discovery the driver discovery to remove.
392   */
393  public void removeDriverDiscovery(final ClientDriverDiscovery discovery) {
394    discoveryHandler.removeDiscovery(discovery);
395  }
396
397  /**
398   * Get the current load-balancer settings.
399   * @return a {@link LoadBalancingInformation} instance, which encapsulates a load-balancing alfgorithm name, along with its parameters.
400   * @since 5.2.7
401   */
402  public LoadBalancingInformation getLoadBalancerSettings() {
403    final JobManager manager = getJobManager();
404    return (manager == null) ? null : manager.getLoadBalancerSettings();
405  }
406
407  /**
408   * Change the load balancer settings.
409   * @param algorithm the name of load-balancing alogrithm to use.
410   * @param parameters the algorithm's parameters, if any. The parmeter names are assumed no to be prefixed.
411   * @throws Exception if any error occurs or if the algorithm name is {@code null} or not known.
412   * @since 5.2.7
413   */
414  public void setLoadBalancerSettings(final String algorithm, final Properties parameters) throws Exception {
415    final JobManager manager = getJobManager();
416    if (manager != null) manager.setLoadBalancerSettings(algorithm, parameters);
417  }
418
419  /**
420   * Get the factory that creates load-balancer instances.
421   * @return an istance of {@link JPPFBundlerFactory}.
422   * @exclude
423   */
424  public JPPFBundlerFactory getBundlerFactory() {
425    return bundlerFactory;
426  }
427
428  /**
429   * @return the number of idle connections in this client.
430   * @exclude
431   */
432  public int nbIdleCOnnections() {
433    final JobManagerClient manager = (JobManagerClient) getJobManager();
434    return (manager == null) ? -1 : manager.nbAvailableConnections();
435  }
436
437  /**
438   * Get the default server-side job execution policy.
439   * @return an {@link ExecutionPolicy}, or {@code null} if none was specified.
440   */
441  public ExecutionPolicy getDefaultPolicy() {
442    defaultPolicyLock.lock();
443    try {
444      return defaultPolicy;
445    } finally {
446      defaultPolicyLock.unlock();
447    }
448  }
449
450  /**
451   * Set the default server-side job execution policy.
452   * @param defaultPolicy the execution policy to set as default, may be {@code null}.
453   */
454  public void setDefaultPolicy(final ExecutionPolicy defaultPolicy) {
455    defaultPolicyLock.lock();
456    try {
457      this.defaultPolicy = defaultPolicy;
458    } finally {
459      defaultPolicyLock.unlock();
460    }
461  }
462
463  /**
464   * Get the default client-side job execution policy.
465   * @return an {@link ExecutionPolicy}, or {@code null} if none was specified.
466   */
467  public ExecutionPolicy getDefaultClientPolicy() {
468    defaultPolicyLock.lock();
469    try {
470      return defaultClientPolicy;
471    } finally {
472      defaultPolicyLock.unlock();
473    }
474  }
475
476  /**
477   * Set the default client-side job execution policy.
478   * @param defaultClientPolicy the execution policy to set as default, may be {@code null}.
479   */
480  public void setDefaultClientPolicy(final ExecutionPolicy defaultClientPolicy) {
481    defaultPolicyLock.lock();
482    try {
483      this.defaultClientPolicy = defaultClientPolicy;
484    } finally {
485      defaultPolicyLock.unlock();
486    }
487  }
488
489  /**
490   * Get the list of currently queued jobs.
491   * @return a list of {@link JPPFJob} instances, possibly empty.
492   */
493  public List<JPPFJob> getQueuedJobs() {
494    return getQueuedJobs(null);
495  }
496 
497  /**
498   * Get a list of currently queued jobs, filtered by a {@link JobSelector}.
499   * @param selector a job filter to apply. May be {@code null}, in which case all queued jobs are returned.
500   * @return a list of {@link JPPFJob} instances that satisfy the provided job selector, possibly empty.
501   */
502  public List<JPPFJob> getQueuedJobs(final JobSelector selector) {
503    final JPPFPriorityQueue queue = ((JobManagerClient) getJobManager()).getQueue();
504    final List<JPPFJob> allJobs = queue.getJPPFJobs();
505    if ((selector == null) || (selector instanceof AllJobsSelector)) return allJobs;
506    final List<JPPFJob> result = new ArrayList<>(allJobs.size() < 10 ? 10 : allJobs.size());
507    for (final JPPFJob job: allJobs) {
508      if (selector.accepts(job)) result.add(job);
509    }
510    return result;
511  }
512
513  /**
514   * Get the current number of queued jobs.
515   * @return the current queued jobs count as an {@code int}.
516   */
517  public int getQueuedJobsCount() {
518    return getQueuedJobsCount(null);
519  }
520 
521  /**
522   * Get the current number of jobs that satisfy a job selector.
523   * @param selector a job filter to apply. May be {@code null}, in which case the count of all queued jobs is returned.
524   * @return the number of queued jobs that satisfy the filter.
525   */
526  public int getQueuedJobsCount(final JobSelector selector) {
527    final JPPFPriorityQueue queue = ((JobManagerClient) getJobManager()).getQueue();
528    if ((selector == null) || (selector instanceof AllJobsSelector)) return queue.getQueueSize();
529    final List<JPPFJob> allJobs = queue.getJPPFJobs();
530    int result = 0;
531    for (final JPPFJob job: allJobs) {
532      if (selector.accepts(job)) result++;
533    }
534    return result;
535  }
536}