001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client;
020
021import java.io.Serializable;
022import java.util.*;
023
024import org.jppf.node.protocol.Task;
025import org.jppf.utils.*;
026import org.jppf.utils.concurrent.ThreadSynchronization;
027import org.slf4j.*;
028
029/**
030 * Instances of this class hold and manage the results of a job.
031 * @author Laurent Cohen
032 */
033public class JobResults extends ThreadSynchronization implements Serializable {
034  /**
035   * Explicit serialVersionUID.
036   */
037  private static final long serialVersionUID = 1L;
038  /**
039   * Logger for this class.
040   */
041  private static Logger log = LoggerFactory.getLogger(JobResults.class);
042  /**
043   * Determines whether debug-level logging is enabled.
044   */
045  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
046  /**
047   * Determines whether trace-level logging is enabled.
048   */
049  private static boolean traceEnabled = log.isTraceEnabled();
050  /**
051   * A map containing the tasks that have been successfully executed,
052   * ordered by ascending position in the submitted list of tasks.
053   */
054  private final SortedMap<Integer, Task<?>> resultMap = new TreeMap<>();
055
056  /**
057   * Get the current number of received results.
058   * @return the number of results as an int.
059   */
060  public synchronized int size() {
061    return resultMap.size();
062  }
063
064  /**
065   * Determine whether this job received a result for the task at the specified position.
066   * @param position the task position to check.
067   * @return <code>true</code> if a result was received, <code>false</code> otherwise.
068   */
069  public synchronized boolean hasResult(final int position) {
070    return resultMap.containsKey(position);
071  }
072
073  /**
074   * Get the result for the task at the specified position.
075   * @param position the position of the task to get.
076   * @return a <code>Task</code> instance, or null if no result was received for a task at this position.
077   */
078  public synchronized Task<?> getResultTask(final int position) {
079    return resultMap.get(position);
080  }
081
082  /**
083   * Add the specified results to this job.
084   * @param tasks the list of tasks for which results were received.
085   */
086  public synchronized void addResults(final List<Task<?>> tasks) {
087    if (debugEnabled) log.debug("adding {} results", tasks.size());
088    for (final Task<?> task : tasks) {
089      final int pos = task.getPosition();
090      if (traceEnabled) log.debug("adding result at positon {}", pos);
091      if (hasResult(pos)) log.warn("position {} (out of {}) already has a result", pos, tasks.size());
092      resultMap.put(pos, task);
093    }
094  }
095
096  /**
097   * Get all the tasks received as results for this job.
098   * @return a collection of {@link Task} instances.
099   */
100  public synchronized Collection<Task<?>> getAllResults() {
101    return Collections.unmodifiableCollection(resultMap.values());
102  }
103
104  /**
105   * Get all the tasks received as results for this job.
106   * @return a collection of {@link Task} instances.
107   */
108  public synchronized List<Task<?>> getResultsList() {
109    return new ArrayList<>(resultMap.values());
110  }
111
112  @Override
113  public String toString() {
114    final StringBuilder sb = new StringBuilder();
115    sb.append(getClass().getSimpleName()).append('[');
116    sb.append("size=").append(size());
117    synchronized (this) {
118      sb.append(", positions=").append(resultMap.keySet());
119    }
120    sb.append(']');
121    return sb.toString();
122  }
123
124  /**
125   * Wait for the execution results of the specified task to be received.
126   * @param position the position of the task in the job it is a part of.
127   * @return the task whose results were received, or null if the timeout expired before it was received.
128   */
129  public synchronized Task<?> waitForTask(final int position) {
130    return waitForTask(position, Long.MAX_VALUE);
131  }
132
133  /**
134   * Wait for the execution results of the specified task to be received.
135   * @param position the position of the task in the job it is a part of.
136   * @param timeout maximum number of milliseconds to wait.
137   * @return the task whose results were received, or null if the timeout expired before it was received.
138   */
139  public synchronized Task<?> waitForTask(final int position, final long timeout) {
140    final long start = System.nanoTime();
141    while (((System.nanoTime() - start) / 1_000_000L < timeout) && !hasResult(position)) goToSleep(1L);
142    return getResultTask(position);
143  }
144
145  /**
146   * Clear all results in case the job is manually resubmitted.
147   */
148  public synchronized void clear() {
149    resultMap.clear();
150  }
151}