001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2010–2011 ApexIdentity Inc.
015 * Portions Copyright 2011-2014 ForgeRock AS.
016 */
017
018package org.forgerock.openig.io;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.util.ArrayList;
023import java.util.List;
024
025import org.forgerock.util.Factory;
026
027/**
028 * Wraps an standard input stream with a stream that can branch to perform divergent reads.
029 * All divergence between branches is maintained in a temporary buffer.
030 * <p>
031 * <strong>Note:</strong> This stream and any branches it creates are not safe for use by
032 * multiple concurrent threads.
033 */
034public class BranchingStreamWrapper extends BranchingInputStream {
035
036    /** A shared object by all branches of the same input stream. */
037    private Trunk trunk;
038
039    /** Points to this branch's parent. */
040    private final BranchingStreamWrapper parent;
041
042    /** This branch's position relative to the trunk buffer. */
043    private int position;
044
045    /**
046     * Constructs a new branching input stream to wrap another input stream.
047     * <p>
048     * If the stream being wrapped is a branching input stream, this constructor will simply
049     * branch off of that existing stream rather than wrapping it with another branching
050     * input stream.
051     *
052     * @param in the stream to be wrapped.
053     * @param bufferFactory an object that can create new temporary buffers (e.g. @link TemporaryStorage}).
054     */
055    public BranchingStreamWrapper(InputStream in, Factory<Buffer> bufferFactory) {
056        if (in instanceof BranchingStreamWrapper) {
057            // branch off of existing trunk
058            BranchingStreamWrapper bsw = (BranchingStreamWrapper) in;
059            parent = bsw;
060            trunk = bsw.trunk;
061            position = bsw.position;
062        } else {
063            // wrapping a non-wrapping stream; sprout a new trunk
064            parent = null;
065            trunk = new Trunk(in, bufferFactory);
066        }
067        trunk.branches.add(this);
068    }
069
070    @Override
071    public BranchingStreamWrapper branch() throws IOException {
072        notClosed();
073        // constructor will branch
074        return new BranchingStreamWrapper(this, null);
075    }
076
077    boolean isClosed() {
078        return trunk == null;
079    }
080
081    @Override
082    public BranchingStreamWrapper parent() {
083        return parent;
084    }
085
086    /**
087     * Reads the next byte of data from the input stream.
088     *
089     * @return the next byte of data, or {@code -1} if the end of the stream is reached.
090     * @throws IOException if an I/O exception occurs.
091     */
092    @Override
093    public int read() throws IOException {
094        byte[] b = new byte[1];
095        return (read(b, 0, 1) > 0 ? (b[0] & 0xff) : -1);
096    }
097
098    /**
099     * Reads some number of bytes from the input stream and stores them into the buffer
100     * array {@code b}.
101     *
102     * @param b the buffer into which the data is read.
103     * @return the total number of bytes read into the buffer, or {@code -1} is there is no more data because the
104     * end of the stream has been reached.
105     * @throws IOException if an I/O exception occurs.
106     */
107    @Override
108    public int read(byte[] b) throws IOException {
109        return read(b, 0, b.length);
110    }
111
112    /**
113     * Reads up to {@code len} bytes of data from the input stream into an array of bytes.
114     *
115     * @param b the buffer into which the data is read.
116     * @param off the start offset in array {@code b} at which the data is written.
117     * @param len the maximum number of bytes to read.
118     * @return the total number of bytes read into the buffer, or {@code -1} if there is no more data because the
119     * end of the stream has been reached.
120     * @throws IOException if an I/O exception occurs.
121     */
122    @Override
123    public int read(byte[] b, int off, int len) throws IOException {
124        if (off < 0 || len < 0 || len > b.length - off) {
125            throw new IndexOutOfBoundsException();
126        }
127        notClosed();
128        int n;
129        // try reading from buffer first
130        if ((n = readBuffer(b, off, len)) == 0) {
131            // not buffered; cascade the call
132            if ((n = trunk.in.read(b, off, len)) >= 0) {
133                // write result to buffer if necessary
134                writeBuffer(b, off, n);
135            }
136        }
137        return n;
138    }
139
140    /**
141     * Skips over and discards {@code n} bytes of data from this input stream.
142     *
143     * @param n the number of bytes to be skipped.
144     * @return the actual number of bytes skipped.
145     * @throws IOException if an I/O exception occurs.
146     */
147    @Override
148    public long skip(long n) throws IOException {
149        if (n < 0) {
150            return 0;
151        }
152        notClosed();
153        if (trunk.buffer == null && trunk.branches.size() == 1) {
154            // not buffering; safely cascade call
155            return trunk.in.skip(n);
156        }
157        // stream nowhere, just to buffer (or unbuffer) the result skipped
158        return Streamer.stream(this, new NullOutputStream(), (int) Math.min(Integer.MAX_VALUE, n));
159    }
160
161    /**
162     * Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream without
163     * blocking by the next invocation of a method for this input stream.
164     *
165     * @return an estimate of the number of bytes that can be read (or skipped over) from this input stream.
166     * @throws IOException
167     *             if an I/O exception occurs.
168     */
169    @Override
170    public int available() throws IOException {
171        notClosed();
172        if (trunk.buffer != null) {
173            int length = trunk.buffer.length();
174            if (position < length) {
175                // this branch is still reading from buffer
176                // report buffer availability
177                return length - position;
178            }
179        }
180        return trunk.in.available();
181    }
182
183    @Override
184    public void close() throws IOException {
185        // multiple calls to close are harmless
186        if (trunk != null) {
187            try {
188                closeBranches();
189                trunk.branches.remove(this);
190                // close buffer if applicable
191                reviewBuffer();
192                if (trunk.branches.size() == 0) {
193                    // last one out turn off the lights
194                    trunk.in.close();
195                }
196            } finally {
197                // if all else fails, this branch thinks it's closed
198                trunk = null;
199            }
200        }
201    }
202
203    private void closeBranches() throws IOException {
204        // multiple calls are harmless
205        if (trunk != null) {
206            ArrayList<BranchingStreamWrapper> branches = new ArrayList<BranchingStreamWrapper>(trunk.branches);
207            for (BranchingStreamWrapper branch : branches) {
208                if (branch.parent == this) {
209                    // recursively closes its children
210                    branch.close();
211                }
212            }
213        }
214    }
215
216    /**
217     * Closes this branching stream and all of the branches created from it.
218     *
219     * @throws Throwable
220     *         may be raised by super.finalize().
221     */
222    @Override
223    public void finalize() throws Throwable {
224        try {
225            close();
226        } catch (IOException ioe) {
227            // inappropriate to throw an exception when object is being collected
228        }
229        super.finalize();
230    }
231
232    /**
233     * Closes the trunk buffer if there is no divergence between branches and all remaining
234     * branch positions are outside the buffer.
235     *
236     * @throws IOException if an I/O exception occurs.
237     */
238    private void reviewBuffer() throws IOException {
239        if (trunk.buffer == null) {
240            // no buffer to review
241            return;
242        }
243        int length = trunk.buffer.length();
244        for (BranchingStreamWrapper branch : trunk.branches) {
245            if (branch.position < length) {
246                // branch is still using buffer; leave it alone
247                return;
248            }
249        }
250        // any remaining branches are non-divergent and outside buffer
251        trunk.buffer.close();
252        trunk.buffer = null;
253    }
254
255    /**
256     * Throws an {@link IOException} if the stream is closed.
257     */
258    private void notClosed() throws IOException {
259        if (trunk == null) {
260            throw new IOException("stream is closed");
261        }
262    }
263
264    private int readBuffer(byte[] b, int off, int len) throws IOException {
265        int n = 0;
266        if (trunk.buffer != null && trunk.buffer.length() > position) {
267            n = trunk.buffer.read(position, b, off, len);
268        }
269        position += n;
270        // see if the buffer can be closed after this operation
271        reviewBuffer();
272        return n;
273    }
274
275    private void writeBuffer(byte[] b, int off, int len) throws IOException {
276        if (trunk.buffer == null && trunk.branches.size() > 1) {
277            // diverging branches; allocate new buffer
278            trunk.buffer = trunk.bufferFactory.newInstance();
279            for (BranchingStreamWrapper branch : trunk.branches) {
280                // set each branch position to beginning of new buffer
281                branch.position = 0;
282            }
283        }
284        if (trunk.buffer != null) {
285            trunk.buffer.append(b, off, len);
286            position += len;
287        }
288    }
289
290    /** Object shared by all branches. */
291    private final class Trunk {
292        /** Keeps track of all branches on this trunk. */
293        private final List<BranchingStreamWrapper> branches =
294                new ArrayList<BranchingStreamWrapper>();
295        /** The input stream being wrapped by the branches. */
296        private final InputStream in;
297        /** An object that creates new temporary buffers. */
298        private final Factory<Buffer> bufferFactory;
299        /** A buffer to track diverging streams. Is {@code null} if there is no divergence. */
300        private Buffer buffer;
301
302        private Trunk(InputStream in, Factory<Buffer> factory) {
303            this.in = in;
304            this.bufferFactory = factory;
305        }
306    }
307}