001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2016 ForgeRock AS. 016 */ 017package org.forgerock.opendj.ldif; 018 019import java.util.NoSuchElementException; 020import java.util.concurrent.BlockingQueue; 021import java.util.concurrent.LinkedBlockingQueue; 022import java.util.concurrent.TimeUnit; 023 024import org.forgerock.opendj.ldap.Connection; 025import org.forgerock.opendj.ldap.LdapException; 026import org.forgerock.opendj.ldap.LdapPromise; 027import org.forgerock.opendj.ldap.ResultCode; 028import org.forgerock.opendj.ldap.LdapResultHandler; 029import org.forgerock.opendj.ldap.SearchResultHandler; 030import org.forgerock.opendj.ldap.SearchResultReferenceIOException; 031import org.forgerock.opendj.ldap.requests.SearchRequest; 032import org.forgerock.opendj.ldap.responses.Response; 033import org.forgerock.opendj.ldap.responses.Responses; 034import org.forgerock.opendj.ldap.responses.Result; 035import org.forgerock.opendj.ldap.responses.SearchResultEntry; 036import org.forgerock.opendj.ldap.responses.SearchResultReference; 037import org.forgerock.util.Reject; 038 039import static org.forgerock.opendj.ldap.LdapException.*; 040 041/** 042 * A {@code ConnectionEntryReader} is a bridge from {@code Connection}s to 043 * {@code EntryReader}s. A connection entry reader allows applications to 044 * iterate over search results as they are returned from the server during a 045 * search operation. 046 * <p> 047 * The Search operation is performed synchronously, blocking until a search 048 * result entry is received. If a search result indicates that the search 049 * operation has failed for some reason then the error result is propagated to 050 * the caller using an {@code LdapException}. If a search result 051 * reference is returned then it is propagated to the caller using a 052 * {@code SearchResultReferenceIOException}. 053 * <p> 054 * The following code illustrates how a {@code ConnectionEntryReader} may be 055 * used: 056 * 057 * <pre> 058 * Connection connection = ...; 059 * ConnectionEntryReader reader = connection.search("dc=example,dc=com", 060 * SearchScope.WHOLE_SUBTREE, "(objectClass=person)"); 061 * try 062 * { 063 * while (reader.hasNext()) 064 * { 065 * if (reader.isEntry()) 066 * { 067 * SearchResultEntry entry = reader.readEntry(); 068 * 069 * // Handle entry... 070 * } 071 * else 072 * { 073 * SearchResultReference ref = reader.readReference(); 074 * 075 * // Handle continuation reference... 076 * } 077 * } 078 * 079 * Result result = reader.readResult(); 080 * // Handle controls included with the search result... 081 * } 082 * catch (IOException e) 083 * { 084 * // Handle exceptions... 085 * } 086 * finally 087 * { 088 * reader.close(); 089 * } 090 * </pre> 091 * 092 * <b>NOTE:</b> although this class is non-final, sub-classing is not supported 093 * except when creating mock objects for unit tests. This class has been 094 * selected specifically because it is the only aspect of the {@code Connection} 095 * interface which is not mockable. 096 */ 097public class ConnectionEntryReader implements EntryReader { 098 /* See OPENDJ-1124 for more discussion about why this class is non-final. */ 099 100 /** Result handler that places all responses in a queue. */ 101 private static final class BufferHandler implements SearchResultHandler, LdapResultHandler<Result> { 102 private final BlockingQueue<Response> responses; 103 private volatile boolean isInterrupted; 104 105 private BufferHandler(final BlockingQueue<Response> responses) { 106 this.responses = responses; 107 } 108 109 @Override 110 public boolean handleEntry(final SearchResultEntry entry) { 111 try { 112 responses.put(entry); 113 return true; 114 } catch (final InterruptedException e) { 115 // Prevent the reader from waiting for a result that will never 116 // arrive. 117 isInterrupted = true; 118 Thread.currentThread().interrupt(); 119 return false; 120 } 121 } 122 123 @Override 124 public void handleException(final LdapException error) { 125 try { 126 responses.put(error.getResult()); 127 } catch (final InterruptedException e) { 128 // Prevent the reader from waiting for a result that will never 129 // arrive. 130 isInterrupted = true; 131 Thread.currentThread().interrupt(); 132 } 133 } 134 135 @Override 136 public boolean handleReference(final SearchResultReference reference) { 137 try { 138 responses.put(reference); 139 return true; 140 } catch (final InterruptedException e) { 141 // Prevent the reader from waiting for a result that will never 142 // arrive. 143 isInterrupted = true; 144 Thread.currentThread().interrupt(); 145 return false; 146 } 147 } 148 149 @Override 150 public void handleResult(final Result result) { 151 try { 152 responses.put(result); 153 } catch (final InterruptedException e) { 154 // Prevent the reader from waiting for a result that will never 155 // arrive. 156 isInterrupted = true; 157 Thread.currentThread().interrupt(); 158 } 159 } 160 } 161 162 private final BufferHandler buffer; 163 private final LdapPromise<Result> promise; 164 private Response nextResponse; 165 166 /** 167 * Creates a new connection entry reader whose destination is the provided 168 * connection using an unbounded {@code LinkedBlockingQueue}. 169 * 170 * @param connection 171 * The connection to use. 172 * @param searchRequest 173 * The search request to retrieve entries with. 174 * @throws NullPointerException 175 * If {@code connection} was {@code null}. 176 */ 177 public ConnectionEntryReader(final Connection connection, final SearchRequest searchRequest) { 178 this(connection, searchRequest, new LinkedBlockingQueue<Response>()); 179 } 180 181 /** 182 * Creates a new connection entry reader whose destination is the provided 183 * connection. 184 * 185 * @param connection 186 * The connection to use. 187 * @param searchRequest 188 * The search request to retrieve entries with. 189 * @param entries 190 * The {@code BlockingQueue} implementation to use when queuing 191 * the returned entries. 192 * @throws NullPointerException 193 * If {@code connection} was {@code null}. 194 */ 195 public ConnectionEntryReader(final Connection connection, final SearchRequest searchRequest, 196 final BlockingQueue<Response> entries) { 197 Reject.ifNull(connection); 198 buffer = new BufferHandler(entries); 199 promise = connection.searchAsync(searchRequest, buffer).thenOnResult(buffer).thenOnException(buffer); 200 } 201 202 /** Closes this connection entry reader, canceling the search request if it is still active. */ 203 @Override 204 public void close() { 205 // Cancel the search if it is still running. 206 promise.cancel(true); 207 } 208 209 @Override 210 public boolean hasNext() throws LdapException { 211 // Poll for the next response if needed. 212 final Response r = getNextResponse(); 213 if (!(r instanceof Result)) { 214 // Entry or reference. 215 return true; 216 } 217 218 // Final result. 219 final Result result = (Result) r; 220 if (result.isSuccess()) { 221 return false; 222 } 223 224 throw newLdapException(result); 225 } 226 227 /** 228 * Waits for the next search result entry or reference to become available 229 * and returns {@code true} if it is an entry, or {@code false} if it is a 230 * reference. 231 * 232 * @return {@code true} if the next search result is an entry, or 233 * {@code false} if it is a reference. 234 * @throws LdapException 235 * If there are no more search result entries or references and 236 * the search result code indicates that the search operation 237 * failed for some reason. 238 * @throws NoSuchElementException 239 * If there are no more search result entries or references and 240 * the search result code indicates that the search operation 241 * succeeded. 242 */ 243 public boolean isEntry() throws LdapException { 244 // Throws LdapException if search returned error. 245 if (!hasNext()) { 246 // Search has completed successfully. 247 throw new NoSuchElementException(); 248 } 249 250 // Entry or reference? 251 final Response r = nextResponse; 252 if (r instanceof SearchResultEntry) { 253 return true; 254 } else if (r instanceof SearchResultReference) { 255 return false; 256 } else { 257 throw new RuntimeException("Unexpected response type: " + r.getClass()); 258 } 259 } 260 261 /** 262 * Waits for the next search result entry or reference to become available 263 * and returns {@code true} if it is a reference, or {@code false} if it is 264 * an entry. 265 * 266 * @return {@code true} if the next search result is a reference, or 267 * {@code false} if it is an entry. 268 * @throws LdapException 269 * If there are no more search result entries or references and 270 * the search result code indicates that the search operation 271 * failed for some reason. 272 * @throws NoSuchElementException 273 * If there are no more search result entries or references and 274 * the search result code indicates that the search operation 275 * succeeded. 276 */ 277 public boolean isReference() throws LdapException { 278 return !isEntry(); 279 } 280 281 /** 282 * Waits for the next search result entry or reference to become available 283 * and, if it is an entry, returns it as a {@code SearchResultEntry}. If the 284 * next search response is a reference then this method will throw a 285 * {@code SearchResultReferenceIOException}. 286 * 287 * @return The next search result entry. 288 * @throws SearchResultReferenceIOException 289 * If the next search response was a search result reference. 290 * This connection entry reader may still contain remaining 291 * search results and references which can be retrieved using 292 * additional calls to this method. 293 * @throws LdapException 294 * If there are no more search result entries or references and 295 * the search result code indicates that the search operation 296 * failed for some reason. 297 * @throws NoSuchElementException 298 * If there are no more search result entries or references and 299 * the search result code indicates that the search operation 300 * succeeded. 301 */ 302 @Override 303 public SearchResultEntry readEntry() throws SearchResultReferenceIOException, LdapException { 304 if (isEntry()) { 305 final SearchResultEntry entry = (SearchResultEntry) nextResponse; 306 nextResponse = null; 307 return entry; 308 } else { 309 final SearchResultReference reference = (SearchResultReference) nextResponse; 310 nextResponse = null; 311 throw new SearchResultReferenceIOException(reference); 312 } 313 } 314 315 /** 316 * Waits for the next search result entry or reference to become available 317 * and, if it is a reference, returns it as a {@code SearchResultReference}. 318 * If the next search response is an entry then this method will return 319 * {@code null}. 320 * 321 * @return The next search result reference, or {@code null} if the next 322 * response was a search result entry. 323 * @throws LdapException 324 * If there are no more search result entries or references and 325 * the search result code indicates that the search operation 326 * failed for some reason. 327 * @throws NoSuchElementException 328 * If there are no more search result entries or references and 329 * the search result code indicates that the search operation 330 * succeeded. 331 */ 332 public SearchResultReference readReference() throws LdapException { 333 if (isReference()) { 334 final SearchResultReference reference = (SearchResultReference) nextResponse; 335 nextResponse = null; 336 return reference; 337 } else { 338 return null; 339 } 340 } 341 342 /** 343 * Waits for the next search response to become available and returns it if 344 * it is a search result indicating that the search completed successfully. 345 * If the search result indicates that the search failed then an 346 * {@link LdapException} is thrown. Otherwise, if the search 347 * response represents an entry or reference then an 348 * {@code IllegalStateException} is thrown. 349 * <p> 350 * This method should only be called if {@link #hasNext()} has, or will, 351 * return {@code false}. 352 * <p> 353 * It is not necessary to call this method once all search result entries 354 * have been processed, but it may be useful to do so in order to inspect 355 * any controls which were included with the result. For example, this 356 * method may be called in order to obtain the next paged results cookie 357 * once the current page of results has been processed. 358 * 359 * @return The search result indicating success. 360 * @throws LdapException 361 * If the search result indicates that the search operation 362 * failed for some reason. 363 * @throws IllegalStateException 364 * If there are remaining search result entries or references to 365 * be processed. In other words, if {@link #hasNext()} would 366 * return {@code true}. 367 */ 368 public Result readResult() throws LdapException { 369 if (hasNext()) { 370 throw new IllegalStateException(); 371 } else { 372 return (Result) nextResponse; 373 } 374 } 375 376 private Response getNextResponse() throws LdapException { 377 while (nextResponse == null) { 378 try { 379 nextResponse = buffer.responses.poll(50, TimeUnit.MILLISECONDS); 380 } catch (final InterruptedException e) { 381 throw newLdapException(ResultCode.CLIENT_SIDE_USER_CANCELLED, e); 382 } 383 384 if (nextResponse == null && buffer.isInterrupted) { 385 // The worker thread processing the result was interrupted so no 386 // result will ever arrive. We don't want to hang this thread 387 // forever while we wait, so terminate now. 388 nextResponse = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR); 389 break; 390 } 391 } 392 return nextResponse; 393 } 394}