001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.utils.streams;
020
021import java.io.*;
022import java.nio.ByteBuffer;
023import java.nio.channels.*;
024
025import org.jppf.utils.pooling.DirectBufferPool;
026
027/**
028 * An {@link InputStream} implementation that reads from an underlying {@link ReadableByteChannel} which is assumed to be in <b>blocking mode</b>.
029 * @author Laurent Cohen
030 */
031public class ChannelInputStream extends InputStream {
032  /**
033   * The backing {@link ReadableByteChannel}.
034   */
035  private final ReadableByteChannel channel;
036
037  /**
038   * Initialize this output stream with the specified writeable channel.
039   * @param channel the channel to write to.
040   */
041  public ChannelInputStream(final ReadableByteChannel channel) {
042    this.channel = channel;
043  }
044
045  @Override
046  public int read() throws IOException {
047    final byte[] buf = new byte[1];
048    read(buf, 0, 1);
049    return buf[0] & 0xff;
050  }
051
052  @Override
053  public int read(final byte[] data) throws IOException {
054    return read(data, 0, data.length);
055  }
056
057  @Override
058  public int read(final byte[] buffer, final int offset, final int len) throws IOException {
059    final ByteBuffer data = ByteBuffer.wrap(buffer, offset, len);
060    ByteBuffer tmpBuffer = null;
061    try {
062      tmpBuffer = DirectBufferPool.provideBuffer();
063      final int remaining = data.remaining();
064      int count = 0;
065      while (count < remaining) {
066        if (data.remaining() < tmpBuffer.remaining()) tmpBuffer.limit(data.remaining());
067        final int n = channel.read(tmpBuffer);
068        if (n < 0) throw new EOFException();
069        else if (n > 0) {
070          count += n;
071          tmpBuffer.flip();
072          data.put(tmpBuffer);
073          tmpBuffer.clear();
074        }
075      }
076      return count;
077    } finally {
078      if (tmpBuffer != null) DirectBufferPool.releaseBuffer(tmpBuffer);
079    }
080  }
081}