001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.client;
019
020import java.util.*;
021import java.util.concurrent.*;
022import java.util.concurrent.locks.*;
023
024import javax.sql.DataSource;
025
026import org.jppf.client.balancer.*;
027import org.jppf.client.balancer.queue.JPPFPriorityQueue;
028import org.jppf.client.event.*;
029import org.jppf.discovery.*;
030import org.jppf.load.balancer.persistence.*;
031import org.jppf.load.balancer.spi.JPPFBundlerFactory;
032import org.jppf.node.policy.*;
033import org.jppf.persistence.JPPFDatasourceFactory;
034import org.jppf.queue.*;
035import org.jppf.startup.JPPFClientStartupSPI;
036import org.jppf.utils.*;
037import org.jppf.utils.concurrent.*;
038import org.jppf.utils.configuration.*;
039import org.jppf.utils.hooks.HookFactory;
040import org.slf4j.*;
041
042/**
043 * This class provides an API to submit execution requests and administration commands,
044 * and request server information data.<br>
045 * It has its own unique identifier, used by the nodes, to determine whether classes from
046 * the submitting application should be dynamically reloaded or not, depending on whether
047 * the uuid has changed or not.
048 * @author Laurent Cohen
049 */
050public abstract class AbstractGenericClient extends AbstractJPPFClient implements QueueListener<ClientJob, ClientJob, ClientTaskBundle> {
051  /**
052   * Logger for this class.
053   */
054  private static Logger log = LoggerFactory.getLogger(AbstractGenericClient.class);
055  /**
056   * Determines whether debug-level logging is enabled.
057   */
058  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
059  /**
060   * Constant for JPPF automatic connection discovery.
061   */
062  static final String VALUE_JPPF_DISCOVERY = "jppf_discovery";
063  /**
064   * The pool of threads used for submitting execution requests.
065   */
066  private ThreadPoolExecutor executor = null;
067  /**
068   * Performs server discovery.
069   */
070  private JPPFMulticastReceiverThread receiverThread = null;
071  /**
072   * The job manager.
073   */
074  private JobManager jobManager;
075  /**
076   * Handles the class loaders used for inbound class loading requests from the servers.
077   */
078  private final ClassLoaderRegistrationHandler classLoaderRegistrationHandler;
079  /**
080   * The list of listeners on the queue associated with this client.
081   */
082  private final List<ClientQueueListener> queueListeners = new CopyOnWriteArrayList<>();
083  /**
084   * Supports built-in and custom discovery mechanisms.
085   */
086  @SuppressWarnings({ "unchecked", "rawtypes" })
087  final DriverDiscoveryHandler<ClientConnectionPoolInfo> discoveryHandler = new DriverDiscoveryHandler(ClientDriverDiscovery.class);
088  /**
089   * Listens to new connection pool notifications from {@link DriverDiscovery} instances.
090   */
091  private ClientDriverDiscoveryListener discoveryListener;
092  /**
093   * The factory that creates load-balancer instances.
094   */
095  JPPFBundlerFactory bundlerFactory;
096  /**
097   * Manages the persisted states of the load-balancers.
098   */
099  LoadBalancerPersistenceManager loadBalancerPersistenceManager;
100  /**
101   * Synchronizes access to the default client and server side job sla execution policy. 
102   */
103  final Lock defaultPolicyLock = new ReentrantLock();
104  /**
105   * The default server-side job sla execution policy.
106   */
107  ExecutionPolicy defaultPolicy;
108  /**
109   * The default client-side job sla execution policy.
110   */
111  ExecutionPolicy defaultClientPolicy;
112
113  /**
114   * Initialize this client with a specified application UUID.
115   * @param uuid the unique identifier for this local client.
116   * @param configuration the object holding the JPPF configuration.
117   * @param listeners the listeners to add to this JPPF client to receive notifications of new connections.
118   */
119  public AbstractGenericClient(final String uuid, final TypedProperties configuration, final ConnectionPoolListener... listeners) {
120    super(uuid);
121    this.classLoaderRegistrationHandler = new ClassLoaderRegistrationHandler();
122    if ((listeners != null) && (listeners.length > 0)) {
123      for (ConnectionPoolListener listener: listeners) {
124        if (listener != null) addConnectionPoolListener(listener);
125      }
126    }
127    discoveryListener = new ClientDriverDiscoveryListener(this);
128    init(configuration);
129  }
130
131  /**
132   * Initialize this client with the specified configuration.
133   * @param configuration the configuration to use with this client.
134   */
135  protected void init(final TypedProperties configuration) {
136    if (debugEnabled) log.debug("initializing client");
137    closed.set(false);
138    resetting.set(false);
139    this.config = initConfig(configuration);
140    try {
141      final Map<String, DataSource> result = JPPFDatasourceFactory.getInstance().createDataSources(config);
142      log.info("created client-side datasources: {}", result.keySet());
143    } catch (final Exception e) {
144      log.error(e.getMessage(), e);
145    }
146    this.bundlerFactory = new JPPFBundlerFactory(JPPFBundlerFactory.Defaults.CLIENT, config);
147    this.loadBalancerPersistenceManager = new LoadBalancerPersistenceManager(this.bundlerFactory);
148    try {
149      HookFactory.registerSPIMultipleHook(JPPFClientStartupSPI.class, null, null).invoke("run");
150    } catch (final Exception e) {
151      log.error(e.getMessage(), e);
152    }
153    final int coreThreads = Runtime.getRuntime().availableProcessors();
154    final BlockingQueue<Runnable> queue = new SynchronousQueue<>();
155    executor = new ThreadPoolExecutor(coreThreads, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, queue, new JPPFThreadFactory("JPPF Client"));
156    executor.allowCoreThreadTimeOut(true);
157    if (jobManager == null) jobManager = createJobManager();
158    defaultPolicyLock.lock();
159    try {
160      defaultPolicy = retrieveDefaultPolicy(JPPFProperties.JOB_SLA_DEFAULT_POLICY);
161      defaultClientPolicy = retrieveDefaultPolicy(JPPFProperties.JOB_CLIENT_SLA_DEFAULT_POLICY);
162    } finally {
163      defaultPolicyLock.unlock();
164    }
165    final Runnable r = new Runnable() {
166      @Override
167      public void run() {
168        initPools(config);
169      }
170    };
171    ThreadUtils.startThread(r, "InitPools");
172  }
173
174  /**
175   * Retrieve and parse an execution àpolicy from the specified configuration property.
176   * @param prop the configuration propoerty from which to retrieve the execution policy.
177   * @return the retrieved policy, or {@code null} if no policy was specified or if the parsing failed.
178   */
179  private ExecutionPolicy retrieveDefaultPolicy(final JPPFProperty<String> prop) {
180    final String policyXML = PolicyUtils.resolvePolicy(config, prop.getName());
181    if (policyXML != null) {
182      try {
183        return PolicyParser.parsePolicy(policyXML.trim());
184      } catch (final Exception e) {
185        log.warn("failed to parse execution policy for {}, with content = {}\n{}", prop.getName(), policyXML, ExceptionUtils.getStackTrace(e));
186      }
187    }
188    return null;
189  }
190
191  /**
192   * Initialize this client's configuration.
193   * @param configuration an object holding the JPPF configuration.
194   * @return <code>TypedProperties</code> instance holding JPPF configuration. Never be <code>null</code>.
195   * @exclude
196   */
197  protected TypedProperties initConfig(final Object configuration) {
198    if (configuration instanceof TypedProperties) return (TypedProperties) configuration;
199    return JPPFConfiguration.getProperties();
200  }
201
202  /**
203   * @exclude
204   */
205  @Override
206  protected void initPools(final TypedProperties config) {
207    if (debugEnabled) log.debug("initializing connections");
208    if (config.get(JPPFProperties.LOCAL_EXECUTION_ENABLED)) setLocalExecutionEnabled(true);
209    discoveryHandler.register(discoveryListener.open()).start();
210    if (config.get(JPPFProperties.REMOTE_EXECUTION_ENABLED)) addDriverDiscovery(new ClientConfigDriverDiscovery(config));
211  }
212
213  /**
214   * Called when a new connection pool is read from the configuration.
215   * @param info the information required for the connection to connect to the driver.
216   */
217  void newConnectionPool(final ClientConnectionPoolInfo info) {
218    if (debugEnabled) log.debug("new connection pool: {}", info.getName());
219    final int size = info.getPoolSize() > 0 ? info.getPoolSize() : 1;
220    final Runnable r = new Runnable() {
221      @Override public void run() {
222        final JPPFConnectionPool pool = new JPPFConnectionPool((JPPFClient) AbstractGenericClient.this, poolSequence.incrementAndGet(), info);
223        pool.setDriverPort(info.getPort());
224        synchronized(pools) {
225          pools.putValue(info.getPriority(), pool);
226        }
227        HostIP hostIP = new HostIP(info.getHost(), info.getHost());
228        if (getConfig().get(JPPFProperties.RESOLVE_ADDRESSES)) hostIP = NetworkUtils.getHostIP(info.getHost());
229        if (debugEnabled) log.debug("'{}' was resolved into '{}'", info.getHost(), hostIP.hostName());
230        pool.setDriverHostIP(hostIP);
231        fireConnectionPoolAdded(pool);
232        for (int i=1; i<=size; i++) {
233          if (isClosed()) return;
234          submitNewConnection(pool);
235        }
236        pool.initHeartbeat();
237      }
238    };
239    executor.execute(r);
240  }
241
242  /**
243   * Called to submit the initialization of a new connection.
244   * @param pool thez connection pool to which the connection belongs.
245   * @exclude
246   */
247  protected void submitNewConnection(final JPPFConnectionPool pool) {
248    final AbstractJPPFClientConnection c = createConnection(pool.getName() + "-" + pool.nextSequence(), pool);
249    newConnection(c);
250  }
251
252  /**
253   * Create a new driver connection based on the specified parameters.
254   * @param name the name of the connection.
255   * @param pool id of the connection pool the connection belongs to.
256   * @return an instance of a subclass of {@link AbstractJPPFClientConnection}.
257   */
258  abstract AbstractJPPFClientConnection createConnection(String name, final JPPFConnectionPool pool);
259
260  @Override
261  void newConnection(final AbstractJPPFClientConnection c) {
262    if (isClosed()) return;
263    log.info("connection [" + c.getName() + "] created");
264    c.addClientConnectionStatusListener(this);
265    c.submitInitialization();
266    fireConnectionAdded(c);
267    if (debugEnabled) log.debug("end of of newConnection({})", c.getName());
268  }
269
270  /**
271   * Invoked when the status of a connection has changed to <code>JPPFClientConnectionStatus.FAILED</code>.
272   * @param connection the connection that failed.
273   * @exclude
274   */
275  @Override
276  protected void connectionFailed(final JPPFClientConnection connection) {
277    if (debugEnabled) log.debug("Connection [{}] {}", connection.getName(), connection.getStatus());
278    final JPPFConnectionPool pool = connection.getConnectionPool();
279    connection.close();
280    final boolean poolRemoved = removeClientConnection(connection);
281    fireConnectionRemoved(connection);
282    if (poolRemoved) {
283      fireConnectionPoolRemoved(pool);
284      if (receiverThread != null) receiverThread.removeConnectionInformation(connection.getDriverUuid());
285      final ClientConnectionPoolInfo info = pool.getDiscoveryInfo();
286      if (info != null) {
287        final boolean b = discoveryListener.onPoolRemoved(info);
288        if (debugEnabled) log.debug("removal of {} = {}", info, b);
289      }
290    }
291  }
292
293  @Override
294  public void close() {
295    close(false);
296  }
297
298  /**
299   * Close this client.
300   * @param reset if <code>true</code>, then this client is left in a state where it can be reopened.
301   * @exclude
302   */
303  protected void close(final boolean reset) {
304    try {
305      log.info("closing JPPF client with uuid={}, PID={}", getUuid(), SystemUtils.getPID());
306      if (!closed.compareAndSet(false, true)) return;
307      if (debugEnabled) log.debug("closing discovery handler");
308      discoveryListener.close();
309      discoveryHandler.stop();
310      if (debugEnabled) log.debug("closing broadcast receiver");
311      if (receiverThread != null) {
312        receiverThread.close();
313        receiverThread = null;
314      }
315      if (debugEnabled) log.debug("unregistering startup classes");
316      HookFactory.unregister(JPPFClientStartupSPI.class);
317      if (jobManager != null) {
318        if (reset) {
319          if (debugEnabled) log.debug("resetting job manager");
320          jobManager.reset();
321        } else {
322          if (debugEnabled) log.debug("closing job manager");
323          jobManager.close();
324          jobManager = null;
325        }
326      }
327      if (debugEnabled) log.debug("closing executor");
328      if (executor != null) {
329        executor.shutdownNow();
330        executor = null;
331      }
332      if (debugEnabled) log.debug("clearing registered class loaders");
333      classLoaderRegistrationHandler.close();
334      super.close();
335    } catch(final Throwable t) {
336      log.error(t.getMessage(), t);
337    }
338  }
339
340  /**
341   * Determine whether local execution is enabled on this client.
342   * @return <code>true</code> if local execution is enabled, <code>false</code> otherwise.
343   */
344  public boolean isLocalExecutionEnabled() {
345    final JobManager jobManager = getJobManager();
346    return (jobManager != null) && jobManager.isLocalExecutionEnabled();
347  }
348
349  /**
350   * Specify whether local execution is enabled on this client.
351   * @param localExecutionEnabled <code>true</code> to enable local execution, <code>false</code> otherwise
352   */
353  public void setLocalExecutionEnabled(final boolean localExecutionEnabled) {
354    final JobManager jobManager = getJobManager();
355    if (jobManager != null) jobManager.setLocalExecutionEnabled(localExecutionEnabled);
356  }
357
358  /**
359   * Determine whether there is a client connection available for execution.
360   * @return true if at least one connection is available, false otherwise.
361   */
362  public boolean hasAvailableConnection() {
363    final JobManager jobManager = getJobManager();
364    return (jobManager != null) && jobManager.hasAvailableConnection();
365  }
366
367  /**
368   * @exclude
369   */
370  @Override
371  public void statusChanged(final ClientConnectionStatusEvent event) {
372    super.statusChanged(event);
373    final JobManager jobManager = getJobManager();
374    if(jobManager != null) {
375      final ClientConnectionStatusListener listener = jobManager.getClientConnectionStatusListener();
376      if (listener != null) listener.statusChanged(event);
377    }
378  }
379
380  /**
381   * Get the pool of threads used for submitting execution requests.
382   * @return a {@link ThreadPoolExecutor} instance.
383   * @exclude
384   */
385  public ThreadPoolExecutor getExecutor() {
386    return executor;
387  }
388
389  /**
390   * Get the job manager for this JPPF client.
391   * @return a <code>JobManager</code> instance.
392   * @exclude
393   */
394  public JobManager getJobManager() {
395    return jobManager;
396  }
397
398  /**
399   * Create the job manager for this JPPF client.
400   * @return a <code>JobManager</code> instance.
401   */
402  abstract JobManager createJobManager();
403
404  /**
405   * Cancel the job with the specified id.
406   * @param jobId the id of the job to cancel.
407   * @throws Exception if any error occurs.
408   * @see org.jppf.server.job.management.DriverJobManagementMBean#cancelJob(java.lang.String)
409   * @return a <code>true</code> when cancel was successful <code>false</code> otherwise.
410   */
411  public boolean cancelJob(final String jobId) throws Exception {
412    if (jobId == null || jobId.isEmpty()) throw new IllegalArgumentException("jobUUID is blank");
413    if (debugEnabled) log.debug("request to cancel job with uuid=" + jobId);
414    return getJobManager().cancelJob(jobId);
415  }
416
417  /**
418   * Get a class loader associated with a job.
419   * @param uuid unique id assigned to classLoader. Added as temporary fix for problems hanging jobs.
420   * @return a {@code Collection} of {@code RegisteredClassLoader} instances.
421   * @exclude
422   */
423  public Collection<ClassLoader> getRegisteredClassLoaders(final String uuid) {
424    return classLoaderRegistrationHandler.getRegisteredClassLoaders(uuid);
425  }
426
427  /**
428   * Register a class loader for the specified job uuid.
429   * @param cl the <code>ClassLoader</code> instance to register.
430   * @param uuid the uuid of the job for which the class loader is registered.
431   * @return a <code>RegisteredClassLoader</code> instance.
432   */
433  public ClassLoader registerClassLoader(final ClassLoader cl, final String uuid) {
434    return classLoaderRegistrationHandler.registerClassLoader(cl, uuid);
435  }
436
437  /**
438   * Unregisters the class loader associated with the specified job uuid.
439   * @param uuid the uuid of the job the class loaders are associated with.
440   * @exclude
441   */
442  public void unregisterClassLoaders(final String uuid) {
443    classLoaderRegistrationHandler.unregister(uuid);
444  }
445
446  /**
447   * Register the specified listener to receive client queue event notifications.
448   * @param listener the listener to register.
449   * @since 4.1
450   */
451  public void addClientQueueListener(final ClientQueueListener listener) {
452    queueListeners.add(listener);
453  }
454
455  /**
456   * Unregister the specified listener.
457   * @param listener the listener to unregister.
458   * @since 4.1
459   */
460  public void removeClientQueueListener(final ClientQueueListener listener) {
461    queueListeners.remove(listener);
462  }
463
464  /**
465   * Notify all client queue listeners that a queue event has occurred.
466   * @param qEvent the actual event which occurred in the queue.
467   * @param jobAdded {@code true} for a job added event, {@code false} for a job removed event.
468   * @exclude
469   */
470  protected void fireQueueEvent(final QueueEvent<ClientJob, ClientJob, ClientTaskBundle> qEvent, final boolean jobAdded) {
471    final ClientQueueEvent event = new ClientQueueEvent((JPPFClient) this, qEvent.getJob().getJob(), (JPPFPriorityQueue) qEvent.getQueue());
472    if (jobAdded) {
473      for (final ClientQueueListener listener: queueListeners) listener.jobAdded(event);
474    } else {
475      for (final ClientQueueListener listener: queueListeners) listener.jobRemoved(event);
476    }
477  }
478
479  /**
480   * @exclude
481   */
482  @Override
483  public void bundleAdded(final QueueEvent<ClientJob, ClientJob, ClientTaskBundle> event) {
484    fireQueueEvent(event, true);
485  }
486
487  /**
488   * @exclude
489   */
490  @Override
491  public void bundleRemoved(final QueueEvent<ClientJob, ClientJob, ClientTaskBundle> event) {
492    fireQueueEvent(event, false);
493  }
494
495  /**
496   * Get the object that manages the persisted states of the load-balancers.
497   * @return an instance of {@link LoadBalancerPersistenceManagement}.
498   * @since 6.0
499   */
500  public LoadBalancerPersistenceManagement getLoadBalancerPersistenceManagement() {
501    return loadBalancerPersistenceManager;
502  }
503
504  /**
505   * Add a custom driver discovery mechanism to those already registered, if any.
506   * @param discovery the driver discovery to add.
507   */
508  public void addDriverDiscovery(final ClientDriverDiscovery discovery) {
509    discoveryHandler.addDiscovery(discovery);
510  }
511}