001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2010–2011 ApexIdentity Inc. 015 * Portions Copyright 2011-2014 ForgeRock AS. 016 */ 017 018package org.forgerock.openig.io; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.util.ArrayList; 023import java.util.List; 024 025import org.forgerock.util.Factory; 026 027/** 028 * Wraps an standard input stream with a stream that can branch to perform divergent reads. 029 * All divergence between branches is maintained in a temporary buffer. 030 * <p> 031 * <strong>Note:</strong> This stream and any branches it creates are not safe for use by 032 * multiple concurrent threads. 033 */ 034public class BranchingStreamWrapper extends BranchingInputStream { 035 036 /** A shared object by all branches of the same input stream. */ 037 private Trunk trunk; 038 039 /** Points to this branch's parent. */ 040 private final BranchingStreamWrapper parent; 041 042 /** This branch's position relative to the trunk buffer. */ 043 private int position; 044 045 /** 046 * Constructs a new branching input stream to wrap another input stream. 047 * <p> 048 * If the stream being wrapped is a branching input stream, this constructor will simply 049 * branch off of that existing stream rather than wrapping it with another branching 050 * input stream. 051 * 052 * @param in the stream to be wrapped. 053 * @param bufferFactory an object that can create new temporary buffers (e.g. @link TemporaryStorage}). 054 */ 055 public BranchingStreamWrapper(InputStream in, Factory<Buffer> bufferFactory) { 056 if (in instanceof BranchingStreamWrapper) { 057 // branch off of existing trunk 058 BranchingStreamWrapper bsw = (BranchingStreamWrapper) in; 059 parent = bsw; 060 trunk = bsw.trunk; 061 position = bsw.position; 062 } else { 063 // wrapping a non-wrapping stream; sprout a new trunk 064 parent = null; 065 trunk = new Trunk(in, bufferFactory); 066 } 067 trunk.branches.add(this); 068 } 069 070 @Override 071 public BranchingStreamWrapper branch() throws IOException { 072 notClosed(); 073 // constructor will branch 074 return new BranchingStreamWrapper(this, null); 075 } 076 077 boolean isClosed() { 078 return trunk == null; 079 } 080 081 @Override 082 public BranchingStreamWrapper parent() { 083 return parent; 084 } 085 086 /** 087 * Reads the next byte of data from the input stream. 088 * 089 * @return the next byte of data, or {@code -1} if the end of the stream is reached. 090 * @throws IOException if an I/O exception occurs. 091 */ 092 @Override 093 public int read() throws IOException { 094 byte[] b = new byte[1]; 095 return (read(b, 0, 1) > 0 ? (b[0] & 0xff) : -1); 096 } 097 098 /** 099 * Reads some number of bytes from the input stream and stores them into the buffer 100 * array {@code b}. 101 * 102 * @param b the buffer into which the data is read. 103 * @return the total number of bytes read into the buffer, or {@code -1} is there is no more data because the 104 * end of the stream has been reached. 105 * @throws IOException if an I/O exception occurs. 106 */ 107 @Override 108 public int read(byte[] b) throws IOException { 109 return read(b, 0, b.length); 110 } 111 112 /** 113 * Reads up to {@code len} bytes of data from the input stream into an array of bytes. 114 * 115 * @param b the buffer into which the data is read. 116 * @param off the start offset in array {@code b} at which the data is written. 117 * @param len the maximum number of bytes to read. 118 * @return the total number of bytes read into the buffer, or {@code -1} if there is no more data because the 119 * end of the stream has been reached. 120 * @throws IOException if an I/O exception occurs. 121 */ 122 @Override 123 public int read(byte[] b, int off, int len) throws IOException { 124 if (off < 0 || len < 0 || len > b.length - off) { 125 throw new IndexOutOfBoundsException(); 126 } 127 notClosed(); 128 int n; 129 // try reading from buffer first 130 if ((n = readBuffer(b, off, len)) == 0) { 131 // not buffered; cascade the call 132 if ((n = trunk.in.read(b, off, len)) >= 0) { 133 // write result to buffer if necessary 134 writeBuffer(b, off, n); 135 } 136 } 137 return n; 138 } 139 140 /** 141 * Skips over and discards {@code n} bytes of data from this input stream. 142 * 143 * @param n the number of bytes to be skipped. 144 * @return the actual number of bytes skipped. 145 * @throws IOException if an I/O exception occurs. 146 */ 147 @Override 148 public long skip(long n) throws IOException { 149 if (n < 0) { 150 return 0; 151 } 152 notClosed(); 153 if (trunk.buffer == null && trunk.branches.size() == 1) { 154 // not buffering; safely cascade call 155 return trunk.in.skip(n); 156 } 157 // stream nowhere, just to buffer (or unbuffer) the result skipped 158 return Streamer.stream(this, new NullOutputStream(), (int) Math.min(Integer.MAX_VALUE, n)); 159 } 160 161 /** 162 * Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream without 163 * blocking by the next invocation of a method for this input stream. 164 * 165 * @return an estimate of the number of bytes that can be read (or skipped over) from this input stream. 166 * @throws IOException 167 * if an I/O exception occurs. 168 */ 169 @Override 170 public int available() throws IOException { 171 notClosed(); 172 if (trunk.buffer != null) { 173 int length = trunk.buffer.length(); 174 if (position < length) { 175 // this branch is still reading from buffer 176 // report buffer availability 177 return length - position; 178 } 179 } 180 return trunk.in.available(); 181 } 182 183 @Override 184 public void close() throws IOException { 185 // multiple calls to close are harmless 186 if (trunk != null) { 187 try { 188 closeBranches(); 189 trunk.branches.remove(this); 190 // close buffer if applicable 191 reviewBuffer(); 192 if (trunk.branches.size() == 0) { 193 // last one out turn off the lights 194 trunk.in.close(); 195 } 196 } finally { 197 // if all else fails, this branch thinks it's closed 198 trunk = null; 199 } 200 } 201 } 202 203 private void closeBranches() throws IOException { 204 // multiple calls are harmless 205 if (trunk != null) { 206 ArrayList<BranchingStreamWrapper> branches = new ArrayList<BranchingStreamWrapper>(trunk.branches); 207 for (BranchingStreamWrapper branch : branches) { 208 if (branch.parent == this) { 209 // recursively closes its children 210 branch.close(); 211 } 212 } 213 } 214 } 215 216 /** 217 * Closes this branching stream and all of the branches created from it. 218 * 219 * @throws Throwable 220 * may be raised by super.finalize(). 221 */ 222 @Override 223 public void finalize() throws Throwable { 224 try { 225 close(); 226 } catch (IOException ioe) { 227 // inappropriate to throw an exception when object is being collected 228 } 229 super.finalize(); 230 } 231 232 /** 233 * Closes the trunk buffer if there is no divergence between branches and all remaining 234 * branch positions are outside the buffer. 235 * 236 * @throws IOException if an I/O exception occurs. 237 */ 238 private void reviewBuffer() throws IOException { 239 if (trunk.buffer == null) { 240 // no buffer to review 241 return; 242 } 243 int length = trunk.buffer.length(); 244 for (BranchingStreamWrapper branch : trunk.branches) { 245 if (branch.position < length) { 246 // branch is still using buffer; leave it alone 247 return; 248 } 249 } 250 // any remaining branches are non-divergent and outside buffer 251 trunk.buffer.close(); 252 trunk.buffer = null; 253 } 254 255 /** 256 * Throws an {@link IOException} if the stream is closed. 257 */ 258 private void notClosed() throws IOException { 259 if (trunk == null) { 260 throw new IOException("stream is closed"); 261 } 262 } 263 264 private int readBuffer(byte[] b, int off, int len) throws IOException { 265 int n = 0; 266 if (trunk.buffer != null && trunk.buffer.length() > position) { 267 n = trunk.buffer.read(position, b, off, len); 268 } 269 position += n; 270 // see if the buffer can be closed after this operation 271 reviewBuffer(); 272 return n; 273 } 274 275 private void writeBuffer(byte[] b, int off, int len) throws IOException { 276 if (trunk.buffer == null && trunk.branches.size() > 1) { 277 // diverging branches; allocate new buffer 278 trunk.buffer = trunk.bufferFactory.newInstance(); 279 for (BranchingStreamWrapper branch : trunk.branches) { 280 // set each branch position to beginning of new buffer 281 branch.position = 0; 282 } 283 } 284 if (trunk.buffer != null) { 285 trunk.buffer.append(b, off, len); 286 position += len; 287 } 288 } 289 290 /** Object shared by all branches. */ 291 private final class Trunk { 292 /** Keeps track of all branches on this trunk. */ 293 private final List<BranchingStreamWrapper> branches = 294 new ArrayList<BranchingStreamWrapper>(); 295 /** The input stream being wrapped by the branches. */ 296 private final InputStream in; 297 /** An object that creates new temporary buffers. */ 298 private final Factory<Buffer> bufferFactory; 299 /** A buffer to track diverging streams. Is {@code null} if there is no divergence. */ 300 private Buffer buffer; 301 302 private Trunk(InputStream in, Factory<Buffer> factory) { 303 this.in = in; 304 this.bufferFactory = factory; 305 } 306 } 307}