001/*
002 * JPPF.
003 * Copyright (C) 2005-2019 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.client;
019
020import java.util.*;
021
022import org.jppf.client.balancer.JobManagerClient;
023import org.jppf.client.balancer.queue.JPPFPriorityQueue;
024import org.jppf.client.debug.Debug;
025import org.jppf.client.event.ConnectionPoolListener;
026import org.jppf.discovery.ClientDriverDiscovery;
027import org.jppf.job.*;
028import org.jppf.load.balancer.LoadBalancingInformation;
029import org.jppf.load.balancer.spi.JPPFBundlerFactory;
030import org.jppf.node.policy.ExecutionPolicy;
031import org.jppf.node.protocol.Task;
032import org.jppf.utils.*;
033import org.jppf.utils.concurrent.*;
034import org.slf4j.*;
035
036/**
037 * This class provides an API to submit execution requests and administration commands,
038 * and request server information data.<br>
039 * It has its own unique identifier, used by the nodes, to determine whether classes from
040 * the submitting application should be dynamically reloaded or not, depending on whether
041 * the uuid has changed or not.
042 * @author Laurent Cohen
043 */
044public class JPPFClient extends AbstractGenericClient {
045  /**
046   * Logger for this class.
047   */
048  private static Logger log = LoggerFactory.getLogger(JPPFClient.class);
049  /**
050   * Determines whether debug-level logging is enabled.
051   */
052  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
053
054  /**
055   * Initialize this client with an automatically generated application UUID.
056   */
057  public JPPFClient() {
058    this(null, JPPFConfiguration.getProperties());
059  }
060
061  /**
062   * Initialize this client with the specified application UUID.
063   * @param uuid the unique identifier for this local client.
064   */
065  public JPPFClient(final String uuid) {
066    this(uuid, JPPFConfiguration.getProperties());
067  }
068
069  /**
070   * Initialize this client with an automatically generated application UUID.
071   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
072   */
073  public JPPFClient(final ConnectionPoolListener... listeners) {
074    this(null, JPPFConfiguration.getProperties(), listeners);
075  }
076
077  /**
078   * Initialize this client with the specified application UUID and new connection listeners.
079   * @param uuid the unique identifier for this local client.
080   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
081   */
082  public JPPFClient(final String uuid, final ConnectionPoolListener... listeners) {
083    this(uuid, JPPFConfiguration.getProperties(), listeners);
084  }
085
086  /**
087   * Initialize this client with the specified configuration and connection listeners.
088   * @param config the JPPF configuration to use for this client.
089   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
090   */
091  public JPPFClient(final TypedProperties config, final ConnectionPoolListener... listeners) {
092    this(null, config, listeners);
093  }
094
095  /**
096   * Initialize this client with the specified application UUID and new connection listeners.
097   * @param uuid the unique identifier for this local client.
098   * @param config the JPPF configuration to use for this client.
099   * @param listeners the optional listeners to add to this JPPF client to receive notifications of new connections.
100   */
101  public JPPFClient(final String uuid, final TypedProperties config, final ConnectionPoolListener... listeners) {
102    super(uuid, config, listeners);
103    Debug.register(this);
104  }
105
106  @Override
107  JPPFClientConnectionImpl createConnection(final String name, final JPPFConnectionPool pool) {
108    return new JPPFClientConnectionImpl(this, name, pool);
109  }
110
111  /**
112   * Submit the specified job for execution.
113   * @param job the job to submit and execute.
114   * @return the job's results as a list of {@link Task tasks} if the job is {@link AbstractJPPFJob#isBlocking() blocking}, or {@code null} if it is non-blocking.
115   * @deprecated a job should be submittable either synchronously or asynchronously, regardless of its state.
116   * The way it is submitted is the user's choice at the time of submission, using one of {@link JPPFClient#submit(JPPFJob)} or {@link JPPFClient#submitAsync(JPPFJob)}. 
117   */
118  public List<Task<?>> submitJob(final JPPFJob job) {
119    if (job.isBlocking()) return submit(job);
120    submitAsync(job);
121    return null;
122  }
123
124  /**
125   * Submit the specified job for execution and wait for its completion.
126   * @param job the job to submit and execute.
127   * @return the job's results as a list of {@link Task tasks}.
128   * @since 6.1
129   */
130  public List<Task<?>> submit(final JPPFJob job) {
131    preSubmit(job);
132    getJobManager().submitJob(job);
133    return job.awaitResults();
134  }
135
136  /**
137   * Submit the specified job asynchronously for execution, without waiting for the job to complete.
138   * @param job the job to submit and execute.
139   * @return the submitted job.
140   * @since 6.1
141   */
142  public JPPFJob submitAsync(final JPPFJob job) {
143    preSubmit(job);
144    getJobManager().submitJob(job);
145    return job;
146  }
147
148  /**
149   * Perform pre-submission job checks and adjustments.
150   * @param job the jkob to check.
151   */
152  private void preSubmit(final JPPFJob job) {
153    if (isClosed()) throw new IllegalStateException("this client is closed");
154    if (job == null) throw new IllegalArgumentException("job cannot be null");
155    if (job.getJobTasks().isEmpty()) throw new IllegalStateException("job cannot be empty");
156    if (job.client != null) {
157      if (!job.isDone()) throw new IllegalStateException("this job is already submitted");
158      job.cancelled.set(false);
159      job.getResults().clear();
160    }
161    job.client = this;
162    if (debugEnabled) log.debug("submitting job {}", job);
163    ExecutionPolicy defaultPolicy = getDefaultPolicy();
164    ExecutionPolicy jobPolicy = job.getSLA().getExecutionPolicy();
165    if ((jobPolicy == null) && (defaultPolicy != null)) job.getSLA().setExecutionPolicy(defaultPolicy);
166    defaultPolicy = getDefaultClientPolicy();
167    jobPolicy = job.getClientSLA().getExecutionPolicy();
168    if ((jobPolicy == null) && (defaultPolicy != null)) job.getClientSLA().setExecutionPolicy(defaultPolicy);
169    if (log.isTraceEnabled()) {
170      job.forEach(t -> log.trace("task {}, pos={}, taskObject={}, taskObject class={}", t, t.getPosition(), t.getTaskObject(), (t.getTaskObject() != null) ? t.getTaskObject().getClass() : null));
171    }
172  }
173
174  /**
175   * {@inheritDoc}
176   * @exclude
177   */
178  @Override
179  protected JobManager createJobManager() {
180    JobManager jobManager = null;
181    try {
182      jobManager = new JobManagerClient(this, bundlerFactory);
183    } catch (final Exception e) {
184      log.error("Can't initialize job Manager", e);
185    }
186    return jobManager;
187  }
188
189  /**
190   * Reset this client, that is, close it if necessary, reload its configuration, then open it again.
191   * If the client is already closed or reseeting, this method has no effect.
192   * @see #reset(TypedProperties)
193   * @since 4.0
194   */
195  public void reset() {
196    if (isClosed()) return;
197    if (debugEnabled) log.debug("resetting client");
198    if (resetting.compareAndSet(false, true)) {
199      close(true);
200      JPPFConfiguration.reset();
201      init(JPPFConfiguration.getProperties());
202    }
203  }
204
205  /**
206   * Reset this client, that is, close it if necessary, then open it again, using the specified confguration.
207   * If the client is already closed or reseeting, this method has no effect.
208   * @param configuration the configuration to initialize this client with.
209   * @see #reset()
210   * @since 4.0
211   */
212  public void reset(final TypedProperties configuration) {
213    if (isClosed()) return;
214    if (debugEnabled) log.debug("resetting client");
215    if (resetting.compareAndSet(false, true)) {
216      close(true);
217      init(configuration);
218    }
219  }
220
221  /**
222   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE} status.
223   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE)}.
224   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
225   * @since 5.0
226   */
227  public JPPFConnectionPool awaitActiveConnectionPool() {
228    return awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE);
229  }
230
231  /**
232   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
233   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status.
234   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
235   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
236   * @since 5.0
237   */
238  public JPPFConnectionPool awaitWorkingConnectionPool() {
239    return awaitConnectionPool(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
240  }
241
242  /**
243   * Wait until there is at least one connection pool with at least one connection in one of the specified statuses.
244   * This is a shorthand for {@code awaitConnectionPool(Long.MAX_VALUE, statuses)}.
245   * @param statuses the possible statuses of the connections in the pools to wait for.
246   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
247   * @since 5.0
248   */
249  public JPPFConnectionPool awaitConnectionPool(final JPPFClientConnectionStatus...statuses) {
250    return awaitConnectionPool(Long.MAX_VALUE, statuses);
251  }
252
253  /**
254   * Wait until at least one connection pool with at least one connection in one of the specified statuses,
255   * or until the specified timeout to expire, whichever happens first.
256   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
257   * @param statuses the possible statuses of the connections in the pools to wait for.
258   * @return a {@link JPPFConnectionPool} instance, or null if no pool has a connection in the one of the desird statuses.
259   * @since 5.0
260   */
261  public JPPFConnectionPool awaitConnectionPool(final long timeout, final JPPFClientConnectionStatus...statuses) {
262    final List<JPPFConnectionPool> list = awaitConnectionPools(timeout, statuses);
263    return list.isEmpty() ? null : list.get(0);
264  }
265
266  /**
267   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
268   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status.
269   * This is a shorthand for {@code awaitConnectionPools(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
270   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
271   * @since 5.1
272   */
273  public List<JPPFConnectionPool> awaitWorkingConnectionPools() {
274    return awaitConnectionPools(Long.MAX_VALUE, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
275  }
276
277  /**
278   * Wait until there is at least one connection pool with at least one connection in the {@link JPPFClientConnectionStatus#ACTIVE ACTIVE}
279   * or {@link JPPFClientConnectionStatus#EXECUTING EXECUTING} status, or the specified tiemoput expires, whichever happens first.
280   * This is a shorthand for {@code awaitConnectionPools(tiemout, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING)}.
281   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
282   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
283   * @since 5.1
284   */
285  public List<JPPFConnectionPool> awaitWorkingConnectionPools(final long timeout) {
286    return awaitConnectionPools(timeout, JPPFClientConnectionStatus.ACTIVE, JPPFClientConnectionStatus.EXECUTING);
287  }
288
289  /**
290   * Wait until at least one connection pool with at least one connection in one of the specified statuses,
291   * or until the specified timeout to expire, whichever happens first.
292   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
293   * @param statuses the possible statuses of the connections in the pools to wait for.
294   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
295   * @since 5.0
296   */
297  public List<JPPFConnectionPool> awaitConnectionPools(final long timeout, final JPPFClientConnectionStatus...statuses) {
298    final MutableReference<List<JPPFConnectionPool>> ref = new MutableReference<>();
299    ConcurrentUtils.awaitCondition(() -> !ref.setSynchronized(findConnectionPools(statuses), pools).isEmpty(), timeout, 10L, false);
300    return ref.get();
301  }
302
303  /**
304   * Wait until there is at least one connection pool where the number of connections with the specified statuses
305   * satisfy the specified condition, or until the specified timeout expires, whichever happens first.
306   * @param operator the condition on the number of connections to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
307   * @param expectedConnections the expected number of connections to wait for.
308   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
309   * @param statuses the possible statuses of the connections in the pools to wait for.
310   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
311   * @since 5.0
312   */
313  public List<JPPFConnectionPool> awaitConnectionPools(final ComparisonOperator operator, final int expectedConnections, final long timeout, final JPPFClientConnectionStatus...statuses) {
314    return awaitConnectionPools(Operator.AT_LEAST, 1, operator, expectedConnections, timeout, statuses);
315  }
316
317  /**
318   * Wait until at least the specified expected connection pools satisfy the condition where the number of connections with the specified statuses
319   * satisfy the specified connection operator, or until the specified timeout expires, whichever happens first.
320   * <p>As an example, to wait for at least 2 pools having each at least one ACTIVE connection, with a timeout of 5 seconds, one would use:
321   * <pre>
322   * JPPFClient client = new JPPFClient();
323   * client.awaitConnectionPools(Operator.AT_LEAST, 2, Operator.AT_LEAST, 1,
324   *   5000L, JPPFClientConnectionStatus.ACTIVE);
325   * </pre>
326   * @param poolOperator the condition on the number of expected pools to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
327   * @param expectedPools the expected number of pools to wait for.
328   * @param connectionOperator the condition on the number of connections to wait for. If {@code null}, it is assumed to be {@link Operator#EQUAL}.
329   * @param expectedConnections the expected number of connections to wait for.
330   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
331   * @param statuses the possible statuses of the connections in the pools to wait for.
332   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
333   * @since 6.0
334   */
335  public List<JPPFConnectionPool> awaitConnectionPools(final ComparisonOperator poolOperator, final int expectedPools, final ComparisonOperator connectionOperator, final int expectedConnections,
336    final long timeout, final JPPFClientConnectionStatus...statuses) {
337    final List<JPPFConnectionPool> result = new ArrayList<>();
338    ConcurrentUtils.awaitCondition(() -> {
339      if (!result.isEmpty()) result.clear();
340      final List<JPPFConnectionPool> temp = findConnectionPools(statuses);
341      for (final JPPFConnectionPool pool: temp) {
342        final List<JPPFClientConnection> list = pool.getConnections(statuses);
343        if (connectionOperator.evaluate(list.size(), expectedConnections)) result.add(pool);
344      }
345      return poolOperator.evaluate(result.size(), expectedPools);
346    }, timeout, 10L, false);
347    return result;
348  }
349
350  /**
351   * Wait until there is at least one connection pool where at least one connections passes the specified filter,
352   * or until the specified timeout expires, whichever happens first.
353   * @param timeout the maximum time to wait, in milliseconds. A value of zero means an infinite timeout.
354   * @param filter an implementation of the {@link ConnectionPoolFilter} interface. A {@code null} value is interpreted as no filter (all pools are accepted).
355   * @return a list of {@link JPPFConnectionPool} instances, possibly empty but never null.
356   * @since 5.0
357   */
358  public List<JPPFConnectionPool> awaitConnectionPools(final long timeout, final ConnectionPoolFilter<JPPFConnectionPool> filter) {
359    final List<JPPFConnectionPool> result = new ArrayList<>();
360    ConcurrentUtils.awaitCondition(() -> {
361      final List<JPPFConnectionPool> temp = getConnectionPools();
362      for (final JPPFConnectionPool pool: temp) {
363        if ((filter == null) || filter.accepts(pool)) result.add(pool);
364      }
365      return !result.isEmpty();
366    }, timeout, 10L, false);
367    return result;
368  }
369
370  @Override
371  public void close() {
372    log.info("closing {}", this);
373    Debug.unregister(this);
374    super.close();
375  }
376
377  /**
378   * Remove a custom driver discovery mechanism from those already registered.
379   * @param discovery the driver discovery to remove.
380   */
381  public void removeDriverDiscovery(final ClientDriverDiscovery discovery) {
382    discoveryHandler.removeDiscovery(discovery);
383  }
384
385  /**
386   * Get the current load-balancer settings.
387   * @return a {@link LoadBalancingInformation} instance, which encapsulates a load-balancing alfgorithm name, along with its parameters.
388   * @since 5.2.7
389   */
390  public LoadBalancingInformation getLoadBalancerSettings() {
391    final JobManager manager = getJobManager();
392    return (manager == null) ? null : manager.getLoadBalancerSettings();
393  }
394
395  /**
396   * Change the load balancer settings.
397   * @param algorithm the name of load-balancing alogrithm to use.
398   * @param parameters the algorithm's parameters, if any. The parmeter names are assumed no to be prefixed.
399   * @throws Exception if any error occurs or if the algorithm name is {@code null} or not known.
400   * @since 5.2.7
401   */
402  public void setLoadBalancerSettings(final String algorithm, final Properties parameters) throws Exception {
403    final JobManager manager = getJobManager();
404    if (manager != null) manager.setLoadBalancerSettings(algorithm, parameters);
405  }
406
407  /**
408   * Get the factory that creates load-balancer instances.
409   * @return an istance of {@link JPPFBundlerFactory}.
410   * @exclude
411   */
412  public JPPFBundlerFactory getBundlerFactory() {
413    return bundlerFactory;
414  }
415
416  /**
417   * @return the number of idle connections in this client.
418   * @exclude
419   */
420  public int nbIdleCOnnections() {
421    final JobManagerClient manager = (JobManagerClient) getJobManager();
422    return (manager == null) ? -1 : manager.nbAvailableConnections();
423  }
424
425  /**
426   * Get the default server-side job execution policy.
427   * @return an {@link ExecutionPolicy}, or {@code null} if none was specified.
428   */
429  public ExecutionPolicy getDefaultPolicy() {
430    return defaultPolicy.get();
431  }
432
433  /**
434   * Set the default server-side job execution policy.
435   * @param defaultPolicy the execution policy to set as default, may be {@code null}.
436   */
437  public void setDefaultPolicy(final ExecutionPolicy defaultPolicy) {
438    this.defaultPolicy.set(defaultPolicy);
439  }
440
441  /**
442   * Get the default client-side job execution policy.
443   * @return an {@link ExecutionPolicy}, or {@code null} if none was specified.
444   */
445  public ExecutionPolicy getDefaultClientPolicy() {
446    return defaultClientPolicy.get();
447  }
448
449  /**
450   * Set the default client-side job execution policy.
451   * @param defaultClientPolicy the execution policy to set as default, may be {@code null}.
452   */
453  public void setDefaultClientPolicy(final ExecutionPolicy defaultClientPolicy) {
454    this.defaultClientPolicy.set(defaultClientPolicy);
455  }
456
457  /**
458   * Get the list of currently queued jobs.
459   * @return a list of {@link JPPFJob} instances, possibly empty.
460   */
461  public List<JPPFJob> getQueuedJobs() {
462    return getQueuedJobs(null);
463  }
464 
465  /**
466   * Get a list of currently queued jobs, filtered by a {@link JobSelector}.
467   * @param selector a job filter to apply. May be {@code null}, in which case all queued jobs are returned.
468   * @return a list of {@link JPPFJob} instances that satisfy the provided job selector, possibly empty.
469   */
470  public List<JPPFJob> getQueuedJobs(final JobSelector selector) {
471    final JPPFPriorityQueue queue = ((JobManagerClient) getJobManager()).getQueue();
472    final List<JPPFJob> allJobs = queue.getJPPFJobs();
473    if ((selector == null) || (selector instanceof AllJobsSelector)) return allJobs;
474    final List<JPPFJob> result = new ArrayList<>(allJobs.size() < 10 ? 10 : allJobs.size());
475    for (final JPPFJob job: allJobs) {
476      if (selector.accepts(job)) result.add(job);
477    }
478    return result;
479  }
480
481  /**
482   * Get the current number of queued jobs.
483   * @return the current queued jobs count as an {@code int}.
484   */
485  public int getQueuedJobsCount() {
486    return getQueuedJobsCount(null);
487  }
488 
489  /**
490   * Get the current number of jobs that satisfy a job selector.
491   * @param selector a job filter to apply. May be {@code null}, in which case the count of all queued jobs is returned.
492   * @return the number of queued jobs that satisfy the filter.
493   */
494  public int getQueuedJobsCount(final JobSelector selector) {
495    final JPPFPriorityQueue queue = ((JobManagerClient) getJobManager()).getQueue();
496    if ((selector == null) || (selector instanceof AllJobsSelector)) return queue.getQueueSize();
497    final List<JPPFJob> allJobs = queue.getJPPFJobs();
498    int result = 0;
499    for (final JPPFJob job: allJobs) {
500      if (selector.accepts(job)) result++;
501    }
502    return result;
503  }
504}