001/*
002 * JPPF.
003 * Copyright (C) 2005-2019 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client.concurrent;
020
021import java.util.*;
022import java.util.concurrent.*;
023
024import org.jppf.client.JPPFJob;
025import org.jppf.client.event.*;
026import org.jppf.node.protocol.Task;
027import org.jppf.utils.LoggingUtils;
028import org.slf4j.*;
029
030/**
031 * A {@link CompletionService} which works specifically with {@link JPPFExecutorService}s.
032 * @param <V> the type of results returned by the submitted tasks.
033 * @author Laurent Cohen
034 */
035public class JPPFCompletionService<V> implements CompletionService<V> {
036  /**
037   * Logger for this class.
038   */
039  private static Logger log = LoggerFactory.getLogger(JPPFCompletionService.class);
040  /**
041   * Determines whether debug-level logging is enabled.
042   */
043  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
044  /**
045   * The executor to which tasks are submitted.
046   */
047  private final JPPFExecutorService executor;
048  /**
049   * Maps all the futures for the tasks submitted via this completion service.
050   */
051  private final Map<String, Map<Integer, JPPFTaskFuture<V>>> futureMap = new HashMap<>();
052  /**
053   * Listens to notifications of results received for the tasks submitted via this completion service. 
054   */
055  private final ResultCollectorListener listener = new ResultCollectorListener();
056  /**
057   * Holds the queue of futures that can be obtained from this completion service.
058   */
059  private final BlockingQueue<Future<V>> queue = new LinkedBlockingDeque<>();
060
061  /**
062   * Initialize this completion service with the specified executor.
063   * @param executor the executor to which tasks are submitted.
064   */
065  public JPPFCompletionService(final JPPFExecutorService executor) {
066    this.executor = executor;
067  }
068
069  @Override
070  public Future<V> submit(final Callable<V> task) {
071    return processFuture((JPPFTaskFuture<V>) executor.submit(task));
072  }
073
074  @Override
075  public Future<V> submit(final Runnable task, final V result) {
076    return processFuture((JPPFTaskFuture<V>) executor.submit(task, result));
077  }
078
079  @Override
080  public Future<V> take() throws InterruptedException {
081    return queue.take();
082  }
083
084  @Override
085  public Future<V> poll() {
086    return queue.poll();
087  }
088
089  @Override
090  public Future<V> poll(final long timeout, final TimeUnit unit) throws InterruptedException {
091    return queue.poll(timeout, unit);
092  }
093
094  /**
095   * Process the future of a submitted task.
096   * @param future the future to process.
097   * @return the process future.
098   */
099  private JPPFTaskFuture<V> processFuture(final JPPFTaskFuture<V> future) {
100    final JPPFJob job = future.getJob();
101    final String uuid = job.getUuid();
102    synchronized(futureMap) {
103      Map<Integer, JPPFTaskFuture<V>> map = futureMap.get(uuid);
104      if (map == null) {
105        job.addJobListener(listener);
106        map = new HashMap<>();
107        futureMap.put(uuid, map);
108      }
109      map.put(future.getPosition(), future);
110    }
111    return future;
112  }
113
114  /**
115   * Process the completion of a task future.
116   * @param future the future to process.
117   */
118  private void processFutureCompletion(final JPPFTaskFuture<V> future) {
119    if (future == null) throw new IllegalArgumentException("future should not be null");
120    try {
121      future.getResult(0L);
122    } catch (final TimeoutException e) {
123      log.error(e.getMessage(), e);
124    }
125    queue.offer(future);
126  }
127
128  /**
129   * Listens to notifications from the <code>FutureResultCollector</code> associated to
130   * each job submitted by the {@link JPPFExecutorService}, and updates the queue according
131   * to the tasks that are completed.
132   */
133  private class ResultCollectorListener extends JobListenerAdapter {
134    @Override
135    public void jobReturned(final JobEvent event) {
136      final List<Task<?>> tasks = event.getJobTasks();
137      if (tasks != null) {
138        final JPPFJob job = event.getJob();
139        final String uuid = job.getUuid();
140        Map<Integer, JPPFTaskFuture<V>> map = null;
141        synchronized(futureMap) {
142          map = futureMap.get(uuid);
143        }
144        if (map == null) return;
145        for (Task<?> task: tasks) {
146          JPPFTaskFuture<V> future = null;
147          synchronized(futureMap) {
148            future = map.remove(task.getPosition());
149          }
150          if (future != null) processFutureCompletion(future);
151          if (debugEnabled) log.debug("added future[job uuid=" + uuid + ", position=" + task.getPosition() + "] to the queue");
152        }
153        synchronized(futureMap) {
154          if (map.isEmpty()) futureMap.remove(uuid);
155        }
156      }
157    }
158
159    @Override
160    public void jobEnded(final JobEvent event) {
161      final JPPFJob job = event.getJob();
162      final String uuid = job.getUuid();
163      Map<Integer, JPPFTaskFuture<V>> map = null;
164      synchronized(futureMap) {
165        map = futureMap.remove(uuid);
166      }
167      if (map != null) {
168        for (final Map.Entry<Integer, JPPFTaskFuture<V>> entry: map.entrySet()) {
169          final JPPFTaskFuture<V> future = entry.getValue();
170          processFutureCompletion(future);
171          if (debugEnabled) log.debug("added future[job uuid=" + uuid + ", position=" + future.getPosition() + "] to the queue");
172        }
173        synchronized(futureMap) {
174          futureMap.remove(uuid);
175        }
176      }
177    }
178  }
179}