001/*
002 * JPPF.
003 * Copyright (C) 2005-2016 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.jppf.process;
019
020import java.io.*;
021import java.util.List;
022import java.util.concurrent.CopyOnWriteArrayList;
023
024/**
025 * Wrapper around an external process started with {@link java.lang.ProcessBuilder ProcessBuilder}.
026 * Instances of this class read the output ands error streams generated by the process and provide
027 * a notification mechanism with separate events for the respective streams.
028 * @author Laurent Cohen
029 */
030public final class ProcessWrapper {
031  /**
032   * The process to handle.
033   */
034  private Process process = null;
035  /**
036   * The list of registered listeners.
037   */
038  protected final List<ProcessWrapperEventListener> eventListeners = new CopyOnWriteArrayList<>();
039
040  /**
041   * Initialize this process handler.
042   */
043  public ProcessWrapper() {
044  }
045
046  /**
047   * Initialize this process handler with the specified process.
048   * @param process the process to handle.
049   */
050  public ProcessWrapper(final Process process) {
051    setProcess(process);
052  }
053
054  /**
055   * Get the process to handle.
056   * @return a <code>Process</code> instance.
057   */
058  public Process getProcess() {
059    return process;
060  }
061
062  /**
063   * Set the process to handle.
064   * If the process has already been set through this setter or the corresponding constructor, this method does nothing.
065   * @param process - a <code>Process</code> instance.
066   */
067  public void setProcess(final Process process) {
068    if (this.process == null) {
069      this.process = process;
070      new StreamHandler(process.getInputStream(), true).start();
071      new StreamHandler(process.getErrorStream(), false).start();
072    }
073  }
074
075  /**
076   * Add a listener to the list of listeners.
077   * @param listener the listener to add to the list.
078   */
079  public void addListener(final ProcessWrapperEventListener listener) {
080    eventListeners.add(listener);
081  }
082
083  /**
084   * Remove a listener from the list of listeners.
085   * @param listener the listener to remove from the list.
086   */
087  public void removeListener(final ProcessWrapperEventListener listener) {
088    eventListeners.remove(listener);
089  }
090
091  /**
092   * Notify all listeners that a stream event has occurred.
093   * @param output true if the event is for the output stream, false if it is for the error stream.
094   * @param content the text that written to the stream.
095   */
096  protected synchronized void fireStreamEvent(final boolean output, final String content) {
097    ProcessWrapperEvent event = new ProcessWrapperEvent(content);
098    for (ProcessWrapperEventListener listener: eventListeners) {
099      if (output) listener.outputStreamAltered(event);
100      else listener.errorStreamAltered(event);
101    }
102  }
103
104  /**
105   * Used to empty the standard or error output of a process, so as not to block the process.
106   */
107  private class StreamHandler extends Thread {
108    /**
109     * Flag to determine whether the event is for output or error stream.
110     */
111    private boolean output = true;
112    /**
113     * The stream to get the output from.
114     */
115    private InputStream is = null;
116
117    /**
118     * Initialize this handler with the specified stream and buffer receiving its content.
119     * @param is the stream where output is taken from.
120     * @param output true if this event is for an output stream, false for an error stream.
121     */
122    public StreamHandler(final InputStream is, final boolean output) {
123      this.is = is;
124      this.output = output;
125    }
126
127    /**
128     * Monitor the stream for available data and write that data to the buffer.
129     */
130    @Override
131    public void run() {
132      try {
133        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
134        boolean end = false;
135        int bufferSize = 8*1024;
136        StringBuilder sb = new StringBuilder(bufferSize);
137        int count = 0;
138        while (!end) {
139          int c = reader.read();
140          if (c == -1) break;      // end of file (the process has exited)
141          if (c == '\r') continue; // skip the line feed
142          count++;
143          sb.append((char) c);
144          if ((sb.length() >= bufferSize) || (c == '\n')) {
145            fireStreamEvent(output, sb.toString());
146            sb = new StringBuilder(bufferSize);
147          }
148        }
149        Thread.sleep(1);
150      } catch(IOException ignore) {
151      } catch(Throwable t) {
152        t.printStackTrace();
153      }
154    }
155  }
156}