001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.utils.stats;
019
020import java.io.*;
021import java.util.*;
022import java.util.concurrent.*;
023
024import org.jppf.utils.concurrent.JPPFThreadFactory;
025
026/**
027 * Instances of this class hold statistics snapshots.
028 * To create and add a new snapshot, <code>createSnapshot(String)</code> should be called first,
029 * before any call to one of the <code>addXXX()</code> methods, in order to respect thread safety.
030 * @author Laurent Cohen
031 */
032public class JPPFStatistics implements Serializable, Iterable<JPPFSnapshot> {
033  /**
034   * Explicit serialVersionUID.
035   */
036  private static final long serialVersionUID = 1L;
037  /**
038   * A filter which accepts all snapshots.
039   */
040  public static final Filter NOOP_FILTER = new Filter() {
041    @Override
042    public boolean accept(final JPPFSnapshot snapshot) {
043      return true;
044    }
045  };
046  /**
047   * Contains all snapshots currently handled.
048   */
049  private final ConcurrentHashMap<String, JPPFSnapshot> snapshots = new ConcurrentHashMap<>();
050  /**
051   * The list of listeners.
052   */
053  private transient List<ListenerInfo> listeners = new CopyOnWriteArrayList<>();
054  /**
055   * 
056   */
057  private final transient ExecutorService executor = Executors.newSingleThreadExecutor(new JPPFThreadFactory("StatsEventDispatcher"));
058
059  /**
060   * Default constructor.
061   * @exclude
062   */
063  public JPPFStatistics() {
064  }
065
066  /**
067   * Copy constructor.
068   * @param map the statistics map to copy.
069   * @exclude
070   */
071  private JPPFStatistics(final Map<String, JPPFSnapshot> map) {
072    for (Map.Entry<String, JPPFSnapshot> entry: map.entrySet()) snapshots.put(entry.getKey(), entry.getValue().copy());
073  }
074
075  /**
076   * Get a snapshot specified by its label.
077   * @param label the label of the snapshot to look up.
078   * @return a {@link JPPFSnapshot} instance, or <code>null</code> if none could be found with the specified label.
079   */
080  public JPPFSnapshot getSnapshot(final String label) {
081    return snapshots.get(label);
082  }
083
084  /**
085   * Create a snapshot with the specified label if it doesn't exist.
086   * If a snapshot with this label already exists, it is returned.
087   * @param label the label of the snapshot to create.
088   * @return a {@link JPPFSnapshot} instance representing the newly created snapshot or the exsting one.
089   * @exclude
090   */
091  public JPPFSnapshot createSnapshot(final String label) {
092    return createSnapshot(false, label);
093  }
094
095  /**
096   * Create a snapshot with the specified label if it doesn't exist.
097   * If a snapshot with this label already exists, it is returned.
098   * @param label the label of the snapshot to create.
099   * @param cumulative determines whether updates are accumulated instead of simply stored as latest value.
100   * @return a {@link JPPFSnapshot} instance representing the newly created snapshot or the exsting one.
101   * @exclude
102   */
103  public JPPFSnapshot createSnapshot(final boolean cumulative, final String label) {
104    final JPPFSnapshot newSnapshot = cumulative ? new CumulativeSnapshot(label) : new NonCumulativeSnapshot(label);
105    final JPPFSnapshot oldSnapshot = snapshots.putIfAbsent(label, newSnapshot);
106    final JPPFSnapshot snapshot = oldSnapshot == null ? newSnapshot : oldSnapshot;
107    if (!listeners.isEmpty()) fireEvent(snapshot, EventType.ADDED);
108    return snapshot;
109  }
110
111  /**
112   * Create a single value snapshot with the specified label if it doesn't exist.
113   * If a snapshot with this label already exists, it is returned.
114   * @param label the label of the snapshot to create.
115   * @return a {@link JPPFSnapshot} instance representing the newly created snapshot or the exsting one.
116   * @exclude
117   */
118  public JPPFSnapshot createSingleValueSnapshot(final String label) {
119    final JPPFSnapshot newSnapshot = new SingleValueSnapshot(label);
120    final JPPFSnapshot oldSnapshot = snapshots.putIfAbsent(label, newSnapshot);
121    final JPPFSnapshot snapshot = oldSnapshot == null ? newSnapshot : oldSnapshot;
122    if (!listeners.isEmpty()) fireEvent(snapshot, EventType.ADDED);
123    return snapshot;
124  }
125
126  /**
127   * Create an array of snapshots with the specified labels, if it doesn't exist.
128   * If one of the snapshots already exists, it is returned.
129   * @param labels the label of the snapshot to create.
130   * @return an array of {@link JPPFSnapshot} instances representing the newly created or exsting snapshots, in the same order as the input labels.
131   * @exclude
132   */
133  public JPPFSnapshot[] createSnapshots(final String...labels) {
134    return createSnapshots(false, labels);
135  }
136
137  /**
138   * Create an array of snapshots with the specified labels, if they don't exist.
139   * If any of the snapshots already exists, it is returned.
140   * @param labels the label of the snapshot to create.
141   * @param cumulative determines whether updates are accumulated instead of simply stored as latest value.
142   * @return an array of {@link JPPFSnapshot} instances representing the newly created or exsting snapshots, in the same order as the input labels.
143   * @exclude
144   */
145  public JPPFSnapshot[] createSnapshots(final boolean cumulative, final String...labels) {
146    final JPPFSnapshot[] snapshots = new JPPFSnapshot[labels.length];
147    for (int i=0; i<labels.length; i++) snapshots[i] = createSnapshot(cumulative, labels[i]);
148    return snapshots;
149  }
150
151  /**
152   * Create an array of single value snapshots with the specified labels, if they don't exist.
153   * If any of the snapshots already exists, it is returned.
154   * @param labels the label of the snapshot to create.
155   * @return an array of {@link JPPFSnapshot} instances representing the newly created or exsting snapshots, in the same order as the input labels.
156   * @exclude
157   */
158  public JPPFSnapshot[] createSingleValueSnapshots(final String...labels) {
159    final JPPFSnapshot[] snapshots = new JPPFSnapshot[labels.length];
160    for (int i=0; i<labels.length; i++) snapshots[i] = createSingleValueSnapshot(labels[i]);
161    return snapshots;
162  }
163
164  /**
165   * Remove the snapshot with the specified label.
166   * If a snapshot with this label already exists, it is returned.
167   * @param label the label of the snapshot to create.
168   * @return a {@link JPPFSnapshot} instance representing the removed snapshot or <code>null</code> if the label is not in the map.
169   * @exclude
170   */
171  public JPPFSnapshot removeSnapshot(final String label) {
172    final JPPFSnapshot snapshot = snapshots.remove(label);
173    if (!listeners.isEmpty()) fireEvent(snapshot, EventType.REMOVED);
174    return snapshot;
175  }
176
177  /**
178   * Add the specified value to the snapshot with the specified label.
179   * @param label the label of the snapshot to add the value to.
180   * @param value the accumulated sum of the values to add.
181   * @return a reference to the updated {@link JPPFSnapshot} object.
182   * @throws IllegalStateException if the snapshot does not exist.
183   * @exclude
184   */
185  public JPPFSnapshot addValue(final String label, final double value) {
186    return addValues(label, value, 1L);
187  }
188
189  /**
190   * Add the specified values to the snapshot with the specified label.
191   * @param label the label of the snapshot to add the values to.
192   * @param accumulatedValues the accumulated sum of the values to add.
193   * @param count the number of values in the accumalated values.
194   * @return a reference to the updated {@link JPPFSnapshot} object.
195   * @throws IllegalStateException if the snapshot does not exist.
196   * @exclude
197   */
198  public JPPFSnapshot addValues(final String label, final double accumulatedValues, final long count) {
199    final JPPFSnapshot snapshot = snapshots.get(label);
200    if (snapshot == null) throw new IllegalStateException("snapshot '" + label + "' was either not created or removed!");
201    snapshot.addValues(accumulatedValues, count);
202    if (!listeners.isEmpty()) fireEvent(snapshot, EventType.UPDATED);
203    return snapshot;
204  }
205
206  /**
207   * Build a copy of this stats object.
208   * @return a new <code>JPPFStats</code> instance, populated with the current values
209   * of the fields in this stats object.
210   * @exclude
211   */
212  public JPPFStatistics copy() {
213    return new JPPFStatistics(snapshots);
214  }
215
216  @Override
217  public String toString() {
218    final StringBuilder sb = new StringBuilder();
219    sb.append("JPPF statistics:");
220    for (final Map.Entry<String, JPPFSnapshot> entry: snapshots.entrySet()) sb.append('\n').append(entry.getValue());
221    return sb.toString();
222  }
223
224  /**
225   * Reset all contained snapshots to their initial values.
226   * @exclude
227   */
228  public void reset() {
229    reset(null);
230  }
231
232  /**
233   * Reset all contained snapshots to their initial values.
234   * @param filter determines which snapshots will be reset.
235   * @exclude
236   */
237  public void reset(final JPPFStatistics.Filter filter) {
238    for (final Map.Entry<String, JPPFSnapshot> entry: snapshots.entrySet()) {
239      final JPPFSnapshot snapshot = entry.getValue();
240      if ((filter == null) || filter.accept(snapshot)) snapshot.reset();
241    }
242  }
243
244  /**
245   * Remove all snapshots from this object.
246   * @exclude
247   */
248  public void clear() {
249    snapshots.clear();
250  }
251
252  /**
253   * Get all the snapshots in this object.
254   * @return a collection of {@link JPPFSnapshot} instances.
255   */
256  public Collection<JPPFSnapshot> getSnapshots() {
257    return new ArrayList<>(snapshots.values());
258  }
259
260  /**
261   * Get the snapshots which satisfy the specified filter.
262   * @param filter determines which snapshots will be part of the returned collection.
263   * @return a collection of {@link JPPFSnapshot} instances, possibly empty but never null.
264   */
265  public Collection<JPPFSnapshot> getSnapshots(final Filter filter) {
266    final List<JPPFSnapshot> list = new ArrayList<>(snapshots.size());
267    for (final Map.Entry<String, JPPFSnapshot> entry: snapshots.entrySet()) {
268      final JPPFSnapshot snapshot = entry.getValue();
269      if ((filter == null) || filter.accept(snapshot)) list.add(snapshot);
270    }
271    return list;
272  }
273
274  /**
275   * Add a listener to the list of listeners.
276   * This is equivalent to calling {@link #addListener(JPPFStatisticsListener, Filter) addListener(listener, null)}.
277   * @param listener the listener to add.
278   */
279  public void addListener(final JPPFStatisticsListener listener) {
280    addListener(listener, null);
281  }
282
283  /**
284   * Add a filtered listener to the list of listeners.
285   * @param listener the listener to add.
286   * @param filter the filter to apply to the listener. If {@code null}, then no filter is applied.
287   */
288  public void addListener(final JPPFStatisticsListener listener, final Filter filter) {
289    if (listener != null) listeners.add(new ListenerInfo(listener, filter));
290  }
291
292  /**
293   * Remove a listener to the list of listeners.
294   * @param listener the listener to remove.
295   */
296  public void removeListener(final JPPFStatisticsListener listener) {
297    if (listener != null) {
298      ListenerInfo toDelete = null;
299      for (final ListenerInfo info: listeners) {
300        if (listener.equals(info.listener) && (info.filter == NOOP_FILTER)) {
301          toDelete = info;
302          break;
303        }
304      }
305      if (toDelete != null) listeners.remove(toDelete);
306    }
307  }
308
309  /**
310   * Remove a listener to the list of listeners.
311   * @param listener the listener to remove.
312   * @param filter the filter associated ith the listener to remove.
313   */
314  public void removeListener(final JPPFStatisticsListener listener, final Filter filter ) {
315    if (listener != null) listeners.remove(new ListenerInfo(listener, filter));
316  }
317
318  /**
319   * Notify all listeners that a snapshot was created.
320   * @param snapshot the snapshot for which an event occurs.
321   * @param type the type of event: created, removd, update.
322   */
323  private void fireEvent(final JPPFSnapshot snapshot, final EventType type) {
324    executor.execute(new Runnable() {
325      @Override
326      public void run() {
327        final JPPFStatisticsEvent event = new JPPFStatisticsEvent(JPPFStatistics.this, snapshot);
328        final List<ListenerInfo> accepted = new ArrayList<>(listeners.size());
329        for (final ListenerInfo info: listeners) {
330          if (info.filter.accept(snapshot)) accepted.add(info);
331        }
332        switch(type) {
333          case ADDED:   for (ListenerInfo info: accepted) info.listener.snapshotAdded(event); break;
334          case REMOVED: for (ListenerInfo info: accepted) info.listener.snapshotRemoved(event); break;
335          case UPDATED: for (ListenerInfo info: accepted) info.listener.snapshotUpdated(event); break;
336        }
337      }
338    });
339  }
340
341  @Override
342  public Iterator<JPPFSnapshot> iterator() {
343    return snapshots.values().iterator();
344  }
345
346  /**
347   * A filter interface for snapshots.
348   */
349  public interface Filter {
350    /**
351     * Determines whether the specified snapshot is accepted by this filter.
352     * @param snapshot the snapshot to check.
353     * @return <code>true</code> if the snapshot is accepted, <code>false</code> otherwise.
354     */
355    boolean accept(JPPFSnapshot snapshot);
356  }
357
358  /**
359   * The possible types of events.
360   */
361  static enum EventType {
362    /**
363     * A snapshot was added.
364     */
365    ADDED,
366    /**
367     * A snapshot was removed.
368     */
369    REMOVED,
370    /**
371     * A snapshot was updated.
372     */
373    UPDATED
374  }
375
376  /**
377   * Association of a listener and filter.
378   */
379  private static class ListenerInfo {
380    /**
381     * The listener to filter.
382     */
383    public final JPPFStatisticsListener listener;
384    /**
385     * The filter to apply to the listener.
386     */
387    public final Filter filter;
388
389    /**
390     * Initialize this object.
391     * @param listener the listener to filter
392     * @param filter the filter to apply to the listener.
393     */
394    public ListenerInfo(final JPPFStatisticsListener listener, final Filter filter) {
395      if (listener == null) throw new IllegalArgumentException("the listener can never be null");
396      this.listener = listener;
397      this.filter = (filter == null) ? NOOP_FILTER : filter;
398    }
399
400    @Override
401    public int hashCode() {
402      int result = 31 + filter.hashCode();
403      result = 31 * result + listener.hashCode();
404      return result;
405    }
406
407    @Override
408    public boolean equals(final Object obj) {
409      if (this == obj) return true;
410      if (obj == null) return false;
411      if (getClass() != obj.getClass()) return false;
412      final ListenerInfo other = (ListenerInfo) obj;
413      return listener.equals(other.listener) && filter.equals(other.filter);
414    }
415  }
416
417  /**
418   * Saves the state of this object to a stream.
419   * @param oos the stream to write to.
420   * @throws IOException if an I/O error occurs.
421   */
422  @SuppressWarnings("static-method")
423  private void writeObject(final ObjectOutputStream oos) throws IOException {
424    oos.defaultWriteObject();
425  }
426
427  /**
428   * Restore the state of this object from a stream.
429   * @param ois the stream to read from.
430   * @throws IOException if an I/O error occurs.
431   * @throws ClassNotFoundException if a class cannot be found or initialized during deserialization.
432   */
433  private void readObject(final ObjectInputStream ois) throws IOException, ClassNotFoundException {
434    listeners = new CopyOnWriteArrayList<>();
435    ois.defaultReadObject();
436  }
437}