001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2009 Sun Microsystems, Inc. 015 * Portions Copyright 2015-2016 ForgeRock AS. 016 */ 017package org.opends.server.tools.makeldif; 018 019 020 021import java.io.ByteArrayOutputStream; 022import java.io.InputStream; 023import java.io.IOException; 024import java.nio.ByteBuffer; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.TimeUnit; 027 028import org.opends.server.types.LDIFExportConfig; 029import org.opends.server.util.LDIFException; 030import org.opends.server.util.LDIFWriter; 031 032 033 034/** 035 * This class creates an input stream that can be used to read entries generated 036 * by MakeLDIF as if they were being read from another source like a file. It 037 * has a fixed-size queue that dictates how many entries may be held in memory 038 * at any given time. 039 */ 040public class MakeLDIFInputStream 041 extends InputStream 042 implements EntryWriter 043{ 044 /** Indicates whether all of the entries have been generated. */ 045 private boolean allGenerated; 046 047 /** Indicates whether this input stream has been closed. */ 048 private boolean closed; 049 050 /** 051 * The byte array output stream that will be used to convert entries to byte 052 * arrays with their LDIF representations. 053 */ 054 private ByteArrayOutputStream entryOutputStream; 055 056 /** The byte array that will hold the LDIF representation of the next entry to be read. */ 057 private ByteBuffer entryBytes; 058 059 /** The IOException that should be thrown the next time a read is requested. */ 060 private IOException ioException; 061 062 /** The LDIF writer that will be used to write the entries to LDIF. */ 063 private LDIFWriter ldifWriter; 064 065 /** The queue used to hold generated entries until they can be read. */ 066 private LinkedBlockingQueue<TemplateEntry> entryQueue; 067 068 /** The background thread being used to actually generate the entries. */ 069 private MakeLDIFInputStreamThread generatorThread; 070 071 /** The template file to use to generate the entries. */ 072 private TemplateFile templateFile; 073 074 075 076 /** 077 * Creates a new MakeLDIF input stream that will generate entries based on the 078 * provided template file. 079 * 080 * @param templateFile The template file to use to generate the entries. 081 */ 082 public MakeLDIFInputStream(TemplateFile templateFile) 083 { 084 this.templateFile = templateFile; 085 086 allGenerated = false; 087 closed = false; 088 entryQueue = new LinkedBlockingQueue<>(10); 089 ioException = null; 090 entryBytes = null; 091 092 entryOutputStream = new ByteArrayOutputStream(8192); 093 LDIFExportConfig exportConfig = new LDIFExportConfig(entryOutputStream); 094 095 try 096 { 097 ldifWriter = new LDIFWriter(exportConfig); 098 } 099 catch (IOException ioe) 100 { 101 // This should never happen. 102 ioException = ioe; 103 } 104 105 generatorThread = new MakeLDIFInputStreamThread(this, templateFile); 106 generatorThread.start(); 107 } 108 109 110 111 /** Closes this input stream so that no more data may be read from it. */ 112 @Override 113 public void close() 114 { 115 closed = true; 116 ioException = null; 117 } 118 119 120 121 /** 122 * Reads a single byte of data from this input stream. 123 * 124 * @return The byte read from the input stream, or -1 if the end of the 125 * stream has been reached. 126 * 127 * @throws IOException If a problem has occurred while generating data for 128 * use by this input stream. 129 */ 130 @Override 131 public int read() 132 throws IOException 133 { 134 if (closed) 135 { 136 return -1; 137 } 138 else if (ioException != null) 139 { 140 throw ioException; 141 } 142 143 if ((entryBytes == null || !entryBytes.hasRemaining()) 144 && !getNextEntry()) 145 { 146 closed = true; 147 return -1; 148 } 149 150 return 0xFF & entryBytes.get(); 151 } 152 153 154 155 /** 156 * Reads data from this input stream. 157 * 158 * @param b The array into which the data should be read. 159 * @param off The position in the array at which point the data read may be 160 * placed. 161 * @param len The maximum number of bytes that may be read into the 162 * provided array. 163 * 164 * @return The number of bytes read from the input stream into the provided 165 * array, or -1 if the end of the stream has been reached. 166 * 167 * @throws IOException If a problem has occurred while generating data for 168 * use by this input stream. 169 */ 170 @Override 171 public int read(byte[] b, int off, int len) 172 throws IOException 173 { 174 if (closed) 175 { 176 return -1; 177 } 178 else if (ioException != null) 179 { 180 throw ioException; 181 } 182 183 if ((entryBytes == null || !entryBytes.hasRemaining()) 184 && !getNextEntry()) 185 { 186 closed = true; 187 return -1; 188 } 189 190 int bytesRead = Math.min(len, entryBytes.remaining()); 191 entryBytes.get(b, off, bytesRead); 192 return bytesRead; 193 } 194 195 196 197 @Override 198 public boolean writeEntry(TemplateEntry entry) 199 throws IOException, MakeLDIFException 200 { 201 while (! closed) 202 { 203 try 204 { 205 if (entryQueue.offer(entry, 500, TimeUnit.MILLISECONDS)) 206 { 207 return true; 208 } 209 } catch (InterruptedException ie) {} 210 } 211 212 return false; 213 } 214 215 216 217 @Override 218 public void closeEntryWriter() 219 { 220 allGenerated = true; 221 } 222 223 224 225 /** 226 * Sets the I/O exception that should be thrown on any subsequent calls to 227 * <CODE>available</CODE> or <CODE>read</CODE>. 228 * 229 * @param ioException The I/O exception that should be thrown. 230 */ 231 void setIOException(IOException ioException) 232 { 233 this.ioException = ioException; 234 } 235 236 237 238 /** 239 * Retrieves the next entry and puts it in the entry byte buffer. 240 * 241 * @return <CODE>true</CODE> if the next entry is available, or 242 * <CODE>false</CODE> if there are no more entries or if the input 243 * stream has been closed. 244 */ 245 private boolean getNextEntry() 246 { 247 TemplateEntry entry = entryQueue.poll(); 248 while (entry == null) 249 { 250 if (closed) 251 { 252 return false; 253 } 254 else if (allGenerated) 255 { 256 entry = entryQueue.poll(); 257 if (entry == null) 258 { 259 return false; 260 } 261 } 262 else 263 { 264 try 265 { 266 entry = entryQueue.poll(500, TimeUnit.MILLISECONDS); 267 } catch (InterruptedException ie) {} 268 } 269 } 270 271 try 272 { 273 entryOutputStream.reset(); 274 ldifWriter.writeTemplateEntry(entry); 275 ldifWriter.flush(); 276 entryBytes = ByteBuffer.wrap(entryOutputStream.toByteArray()); 277 } 278 catch (LDIFException le) 279 { 280 // This should never happen. 281 ioException = new IOException(le.getMessage()); 282 return false; 283 } 284 catch (IOException ioe) 285 { 286 // Neither should this. 287 ioException = ioe; 288 return false; 289 } 290 291 return true; 292 } 293} 294