001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2016 ForgeRock AS.
016 */
017package org.forgerock.opendj.ldif;
018
019import java.util.NoSuchElementException;
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.LinkedBlockingQueue;
022import java.util.concurrent.TimeUnit;
023
024import org.forgerock.opendj.ldap.Connection;
025import org.forgerock.opendj.ldap.LdapException;
026import org.forgerock.opendj.ldap.LdapPromise;
027import org.forgerock.opendj.ldap.ResultCode;
028import org.forgerock.opendj.ldap.LdapResultHandler;
029import org.forgerock.opendj.ldap.SearchResultHandler;
030import org.forgerock.opendj.ldap.SearchResultReferenceIOException;
031import org.forgerock.opendj.ldap.requests.SearchRequest;
032import org.forgerock.opendj.ldap.responses.Response;
033import org.forgerock.opendj.ldap.responses.Responses;
034import org.forgerock.opendj.ldap.responses.Result;
035import org.forgerock.opendj.ldap.responses.SearchResultEntry;
036import org.forgerock.opendj.ldap.responses.SearchResultReference;
037import org.forgerock.util.Reject;
038
039import static org.forgerock.opendj.ldap.LdapException.*;
040
041/**
042 * A {@code ConnectionEntryReader} is a bridge from {@code Connection}s to
043 * {@code EntryReader}s. A connection entry reader allows applications to
044 * iterate over search results as they are returned from the server during a
045 * search operation.
046 * <p>
047 * The Search operation is performed synchronously, blocking until a search
048 * result entry is received. If a search result indicates that the search
049 * operation has failed for some reason then the error result is propagated to
050 * the caller using an {@code LdapException}. If a search result
051 * reference is returned then it is propagated to the caller using a
052 * {@code SearchResultReferenceIOException}.
053 * <p>
054 * The following code illustrates how a {@code ConnectionEntryReader} may be
055 * used:
056 *
057 * <pre>
058 * Connection connection = ...;
059 * ConnectionEntryReader reader = connection.search(&quot;dc=example,dc=com&quot;,
060 *     SearchScope.WHOLE_SUBTREE, &quot;(objectClass=person)&quot;);
061 * try
062 * {
063 *   while (reader.hasNext())
064 *   {
065 *     if (reader.isEntry())
066 *     {
067 *       SearchResultEntry entry = reader.readEntry();
068 *
069 *       // Handle entry...
070 *     }
071 *     else
072 *     {
073 *       SearchResultReference ref = reader.readReference();
074 *
075 *       // Handle continuation reference...
076 *     }
077 *   }
078 *
079 *   Result result = reader.readResult();
080 *   // Handle controls included with the search result...
081 * }
082 * catch (IOException e)
083 * {
084 *   // Handle exceptions...
085 * }
086 * finally
087 * {
088 *   reader.close();
089 * }
090 * </pre>
091 *
092 * <b>NOTE:</b> although this class is non-final, sub-classing is not supported
093 * except when creating mock objects for unit tests. This class has been
094 * selected specifically because it is the only aspect of the {@code Connection}
095 * interface which is not mockable.
096 */
097public class ConnectionEntryReader implements EntryReader {
098    /* See OPENDJ-1124 for more discussion about why this class is non-final. */
099
100    /** Result handler that places all responses in a queue. */
101    private static final class BufferHandler implements SearchResultHandler, LdapResultHandler<Result> {
102        private final BlockingQueue<Response> responses;
103        private volatile boolean isInterrupted;
104
105        private BufferHandler(final BlockingQueue<Response> responses) {
106            this.responses = responses;
107        }
108
109        @Override
110        public boolean handleEntry(final SearchResultEntry entry) {
111            try {
112                responses.put(entry);
113                return true;
114            } catch (final InterruptedException e) {
115                // Prevent the reader from waiting for a result that will never
116                // arrive.
117                isInterrupted = true;
118                Thread.currentThread().interrupt();
119                return false;
120            }
121        }
122
123        @Override
124        public void handleException(final LdapException error) {
125            try {
126                responses.put(error.getResult());
127            } catch (final InterruptedException e) {
128                // Prevent the reader from waiting for a result that will never
129                // arrive.
130                isInterrupted = true;
131                Thread.currentThread().interrupt();
132            }
133        }
134
135        @Override
136        public boolean handleReference(final SearchResultReference reference) {
137            try {
138                responses.put(reference);
139                return true;
140            } catch (final InterruptedException e) {
141                // Prevent the reader from waiting for a result that will never
142                // arrive.
143                isInterrupted = true;
144                Thread.currentThread().interrupt();
145                return false;
146            }
147        }
148
149        @Override
150        public void handleResult(final Result result) {
151            try {
152                responses.put(result);
153            } catch (final InterruptedException e) {
154                // Prevent the reader from waiting for a result that will never
155                // arrive.
156                isInterrupted = true;
157                Thread.currentThread().interrupt();
158            }
159        }
160    }
161
162    private final BufferHandler buffer;
163    private final LdapPromise<Result> promise;
164    private Response nextResponse;
165
166    /**
167     * Creates a new connection entry reader whose destination is the provided
168     * connection using an unbounded {@code LinkedBlockingQueue}.
169     *
170     * @param connection
171     *            The connection to use.
172     * @param searchRequest
173     *            The search request to retrieve entries with.
174     * @throws NullPointerException
175     *             If {@code connection} was {@code null}.
176     */
177    public ConnectionEntryReader(final Connection connection, final SearchRequest searchRequest) {
178        this(connection, searchRequest, new LinkedBlockingQueue<Response>());
179    }
180
181    /**
182     * Creates a new connection entry reader whose destination is the provided
183     * connection.
184     *
185     * @param connection
186     *            The connection to use.
187     * @param searchRequest
188     *            The search request to retrieve entries with.
189     * @param entries
190     *            The {@code BlockingQueue} implementation to use when queuing
191     *            the returned entries.
192     * @throws NullPointerException
193     *             If {@code connection} was {@code null}.
194     */
195    public ConnectionEntryReader(final Connection connection, final SearchRequest searchRequest,
196        final BlockingQueue<Response> entries) {
197        Reject.ifNull(connection);
198        buffer = new BufferHandler(entries);
199        promise = connection.searchAsync(searchRequest, buffer).thenOnResult(buffer).thenOnException(buffer);
200    }
201
202    /** Closes this connection entry reader, canceling the search request if it is still active. */
203    @Override
204    public void close() {
205        // Cancel the search if it is still running.
206        promise.cancel(true);
207    }
208
209    @Override
210    public boolean hasNext() throws LdapException {
211        // Poll for the next response if needed.
212        final Response r = getNextResponse();
213        if (!(r instanceof Result)) {
214            // Entry or reference.
215            return true;
216        }
217
218        // Final result.
219        final Result result = (Result) r;
220        if (result.isSuccess()) {
221            return false;
222        }
223
224        throw newLdapException(result);
225    }
226
227    /**
228     * Waits for the next search result entry or reference to become available
229     * and returns {@code true} if it is an entry, or {@code false} if it is a
230     * reference.
231     *
232     * @return {@code true} if the next search result is an entry, or
233     *         {@code false} if it is a reference.
234     * @throws LdapException
235     *             If there are no more search result entries or references and
236     *             the search result code indicates that the search operation
237     *             failed for some reason.
238     * @throws NoSuchElementException
239     *             If there are no more search result entries or references and
240     *             the search result code indicates that the search operation
241     *             succeeded.
242     */
243    public boolean isEntry() throws LdapException {
244        // Throws LdapException if search returned error.
245        if (!hasNext()) {
246            // Search has completed successfully.
247            throw new NoSuchElementException();
248        }
249
250        // Entry or reference?
251        final Response r = nextResponse;
252        if (r instanceof SearchResultEntry) {
253            return true;
254        } else if (r instanceof SearchResultReference) {
255            return false;
256        } else {
257            throw new RuntimeException("Unexpected response type: " + r.getClass());
258        }
259    }
260
261    /**
262     * Waits for the next search result entry or reference to become available
263     * and returns {@code true} if it is a reference, or {@code false} if it is
264     * an entry.
265     *
266     * @return {@code true} if the next search result is a reference, or
267     *         {@code false} if it is an entry.
268     * @throws LdapException
269     *             If there are no more search result entries or references and
270     *             the search result code indicates that the search operation
271     *             failed for some reason.
272     * @throws NoSuchElementException
273     *             If there are no more search result entries or references and
274     *             the search result code indicates that the search operation
275     *             succeeded.
276     */
277    public boolean isReference() throws LdapException {
278        return !isEntry();
279    }
280
281    /**
282     * Waits for the next search result entry or reference to become available
283     * and, if it is an entry, returns it as a {@code SearchResultEntry}. If the
284     * next search response is a reference then this method will throw a
285     * {@code SearchResultReferenceIOException}.
286     *
287     * @return The next search result entry.
288     * @throws SearchResultReferenceIOException
289     *             If the next search response was a search result reference.
290     *             This connection entry reader may still contain remaining
291     *             search results and references which can be retrieved using
292     *             additional calls to this method.
293     * @throws LdapException
294     *             If there are no more search result entries or references and
295     *             the search result code indicates that the search operation
296     *             failed for some reason.
297     * @throws NoSuchElementException
298     *             If there are no more search result entries or references and
299     *             the search result code indicates that the search operation
300     *             succeeded.
301     */
302    @Override
303    public SearchResultEntry readEntry() throws SearchResultReferenceIOException, LdapException {
304        if (isEntry()) {
305            final SearchResultEntry entry = (SearchResultEntry) nextResponse;
306            nextResponse = null;
307            return entry;
308        } else {
309            final SearchResultReference reference = (SearchResultReference) nextResponse;
310            nextResponse = null;
311            throw new SearchResultReferenceIOException(reference);
312        }
313    }
314
315    /**
316     * Waits for the next search result entry or reference to become available
317     * and, if it is a reference, returns it as a {@code SearchResultReference}.
318     * If the next search response is an entry then this method will return
319     * {@code null}.
320     *
321     * @return The next search result reference, or {@code null} if the next
322     *         response was a search result entry.
323     * @throws LdapException
324     *             If there are no more search result entries or references and
325     *             the search result code indicates that the search operation
326     *             failed for some reason.
327     * @throws NoSuchElementException
328     *             If there are no more search result entries or references and
329     *             the search result code indicates that the search operation
330     *             succeeded.
331     */
332    public SearchResultReference readReference() throws LdapException {
333        if (isReference()) {
334            final SearchResultReference reference = (SearchResultReference) nextResponse;
335            nextResponse = null;
336            return reference;
337        } else {
338            return null;
339        }
340    }
341
342    /**
343     * Waits for the next search response to become available and returns it if
344     * it is a search result indicating that the search completed successfully.
345     * If the search result indicates that the search failed then an
346     * {@link LdapException} is thrown. Otherwise, if the search
347     * response represents an entry or reference then an
348     * {@code IllegalStateException} is thrown.
349     * <p>
350     * This method should only be called if {@link #hasNext()} has, or will,
351     * return {@code false}.
352     * <p>
353     * It is not necessary to call this method once all search result entries
354     * have been processed, but it may be useful to do so in order to inspect
355     * any controls which were included with the result. For example, this
356     * method may be called in order to obtain the next paged results cookie
357     * once the current page of results has been processed.
358     *
359     * @return The search result indicating success.
360     * @throws LdapException
361     *             If the search result indicates that the search operation
362     *             failed for some reason.
363     * @throws IllegalStateException
364     *             If there are remaining search result entries or references to
365     *             be processed. In other words, if {@link #hasNext()} would
366     *             return {@code true}.
367     */
368    public Result readResult() throws LdapException {
369        if (hasNext()) {
370            throw new IllegalStateException();
371        } else {
372            return (Result) nextResponse;
373        }
374    }
375
376    private Response getNextResponse() throws LdapException {
377        while (nextResponse == null) {
378            try {
379                nextResponse = buffer.responses.poll(50, TimeUnit.MILLISECONDS);
380            } catch (final InterruptedException e) {
381                throw newLdapException(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
382            }
383
384            if (nextResponse == null && buffer.isInterrupted) {
385                // The worker thread processing the result was interrupted so no
386                // result will ever arrive. We don't want to hang this thread
387                // forever while we wait, so terminate now.
388                nextResponse = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR);
389                break;
390            }
391        }
392        return nextResponse;
393    }
394}