001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the License.
004 *
005 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
006 * specific language governing permission and limitations under the License.
007 *
008 * When distributing Covered Software, include this CDDL Header Notice in each file and include
009 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
010 * Header, with the fields enclosed by brackets [] replaced by your own identifying
011 * information: "Portions copyright [year] [name of copyright owner]".
012 *
013 * Copyright 2015 ForgeRock AS.
014 */
015package org.forgerock.util.promise;
016
017import java.util.Queue;
018import java.util.concurrent.ConcurrentLinkedQueue;
019import java.util.concurrent.ExecutionException;
020import java.util.concurrent.TimeUnit;
021import java.util.concurrent.TimeoutException;
022
023import org.forgerock.util.AsyncFunction;
024import org.forgerock.util.Function;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028/**
029 * An implementation of {@link Promise} which can be used as is, or as the basis
030 * for more complex asynchronous behavior. A {@code PromiseImpl} must be
031 * completed by invoking one of:
032 * <ul>
033 * <li>{@link #handleResult} - marks the promise as having succeeded with the
034 * provide result
035 * <li>{@link #handleException} - marks the promise as having failed with the
036 * provided exception
037 * <li>{@link #cancel} - requests cancellation of the asynchronous task
038 * represented by the promise. Cancellation is only supported if the
039 * {@link #tryCancel(boolean)} is overridden and returns an exception.
040 * </ul>
041 *
042 * @param <V>
043 *            The type of the task's result, or {@link Void} if the task does
044 *            not return anything (i.e. it only has side-effects).
045 * @param <E>
046 *            The type of the exception thrown by the task if it fails, or
047 *            {@link NeverThrowsException} if the task cannot fail.
048 * @see Promise
049 * @see Promises
050 */
051public class PromiseImpl<V, E extends Exception> implements Promise<V, E>, ResultHandler<V>,
052        ExceptionHandler<E>, RuntimeExceptionHandler {
053    // TODO: Is using monitor based sync better than AQS?
054
055    private static final Logger LOGGER = LoggerFactory.getLogger(PromiseImpl.class);
056
057    private interface StateListener<V, E extends Exception> {
058        void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException);
059    }
060
061    /**
062     * State value indicating that this promise has not completed.
063     */
064    private static final int PENDING = 0;
065
066    /**
067     * State value indicating that this promise has completed successfully
068     * (result set).
069     */
070    private static final int HAS_RESULT = 1;
071
072    /**
073     * State value indicating that this promise has failed (exception set).
074     */
075    private static final int HAS_EXCEPTION = 2;
076
077    /**
078     * State value indicating that this promise has been cancelled (exception set).
079     */
080    private static final int CANCELLED = 3;
081
082    /**
083     * State value indicating that this promise has failed with a runtime exception.
084     */
085    private static final int HAS_RUNTIME_EXCEPTION = 4;
086
087    /**
088     * Creates a new pending {@link Promise} implementation.
089     *
090     * @param <V>
091     *            The type of the task's result, or {@link Void} if the task
092     *            does not return anything (i.e. it only has side-effects).
093     * @param <E>
094     *            The type of the exception thrown by the task if it fails, or
095     *            {@link NeverThrowsException} if the task cannot fail.
096     * @return A new pending {@link Promise} implementation.
097     */
098    public static <V, E extends Exception> PromiseImpl<V, E> create() {
099        return new PromiseImpl<>();
100    }
101
102    private volatile int state = PENDING;
103    private V result = null;
104    private E exception = null;
105    private RuntimeException runtimeException = null;
106
107    private final Queue<StateListener<V, E>> listeners =
108            new ConcurrentLinkedQueue<>();
109
110    /**
111     * Creates a new pending {@link Promise} implementation. This constructor is
112     * protected to allow for sub-classing.
113     */
114    protected PromiseImpl() {
115        // No implementation.
116    }
117
118    @Override
119    public final boolean cancel(final boolean mayInterruptIfRunning) {
120        if (isDone()) {
121            // Fail-fast.
122            return false;
123        }
124        final E exception = tryCancel(mayInterruptIfRunning);
125        return exception != null && setState(CANCELLED, null, exception, null);
126    }
127
128    @Override
129    public final V get() throws InterruptedException, ExecutionException {
130        await(); // Publishes.
131        return get0();
132    }
133
134    @Override
135    public final V get(final long timeout, final TimeUnit unit) throws InterruptedException,
136            ExecutionException, TimeoutException {
137        await(timeout, unit, false); // Publishes.
138        return get0();
139    }
140
141    @Override
142    public final V getOrThrow() throws InterruptedException, E {
143        await(); // Publishes.
144        return getOrThrow0();
145    }
146
147    @Override
148    public final V getOrThrow(final long timeout, final TimeUnit unit) throws InterruptedException,
149            E, TimeoutException {
150        await(timeout, unit, false); // Publishes.
151        return getOrThrow0();
152    }
153
154    @Override
155    public final V getOrThrowUninterruptibly() throws E {
156        boolean wasInterrupted = false;
157        try {
158            while (true) {
159                try {
160                    return getOrThrow();
161                } catch (final InterruptedException e) {
162                    wasInterrupted = true;
163                }
164            }
165        } finally {
166            if (wasInterrupted) {
167                Thread.currentThread().interrupt();
168            }
169        }
170    }
171
172    @Override
173    public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E,
174            TimeoutException {
175        try {
176            await(timeout, unit, true); // Publishes.
177        } catch (InterruptedException ignored) {
178            // Will never occur since interrupts are ignored.
179        }
180        return getOrThrow0();
181    }
182
183    /**
184     * Signals that the asynchronous task represented by this promise has
185     * failed. If the task has already completed (i.e. {@code isDone() == true})
186     * then calling this method has no effect and the provided result will be
187     * discarded.
188     *
189     * @param exception
190     *            The exception indicating why the task failed.
191     * @see #tryHandleException(Exception)
192     */
193    @Override
194    public final void handleException(final E exception) {
195        tryHandleException(exception);
196    }
197
198    @Override
199    public void handleRuntimeException(RuntimeException exception) {
200        setState(HAS_RUNTIME_EXCEPTION, null, null, exception);
201    }
202
203    /**
204     * Signals that the asynchronous task represented by this promise has
205     * succeeded. If the task has already completed (i.e.
206     * {@code isDone() == true}) then calling this method has no effect and the
207     * provided result will be discarded.
208     *
209     * @param result
210     *            The result of the asynchronous task (may be {@code null}).
211     * @see #tryHandleResult(Object)
212     */
213    @Override
214    public final void handleResult(final V result) {
215        tryHandleResult(result);
216    }
217
218    /**
219     * Attempts to signal that the asynchronous task represented by this promise
220     * has failed. If the task has already completed (i.e.
221     * {@code isDone() == true}) then calling this method has no effect and
222     * {@code false} is returned.
223     * <p>
224     * This method should be used in cases where multiple threads may
225     * concurrently attempt to complete a promise and need to release resources
226     * if the completion attempt fails. For example, an asynchronous TCP connect
227     * attempt may complete after a timeout has expired. In this case the
228     * connection should be immediately closed because it is never going to be
229     * used.
230     *
231     * @param exception
232     *            The exception indicating why the task failed.
233     * @return {@code false} if this promise has already been completed, either
234     *         due to normal termination, an exception, or cancellation (i.e.
235     *         {@code isDone() == true}).
236     * @see #handleException(Exception)
237     * @see #isDone()
238     */
239    public final boolean tryHandleException(final E exception) {
240        return setState(HAS_EXCEPTION, null, exception, null);
241    }
242
243    /**
244     * Attempts to signal that the asynchronous task represented by this promise
245     * has succeeded. If the task has already completed (i.e.
246     * {@code isDone() == true}) then calling this method has no effect and
247     * {@code false} is returned.
248     * <p>
249     * This method should be used in cases where multiple threads may
250     * concurrently attempt to complete a promise and need to release resources
251     * if the completion attempt fails. For example, an asynchronous TCP connect
252     * attempt may complete after a timeout has expired. In this case the
253     * connection should be immediately closed because it is never going to be
254     * used.
255     *
256     * @param result
257     *            The result of the asynchronous task (may be {@code null}).
258     * @return {@code false} if this promise has already been completed, either
259     *         due to normal termination, an exception, or cancellation (i.e.
260     *         {@code isDone() == true}).
261     * @see #handleResult(Object)
262     * @see #isDone()
263     */
264    public final boolean tryHandleResult(final V result) {
265        return setState(HAS_RESULT, result, null, null);
266    }
267
268    @Override
269    public final boolean isCancelled() {
270        return state == CANCELLED;
271    }
272
273    @Override
274    public final boolean isDone() {
275        return state != PENDING;
276    }
277
278    @Override
279    public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) {
280        addOrFireListener(new StateListener<V, E>() {
281            @Override
282            public void handleStateChange(final int newState, final V result, final E exception,
283                    final RuntimeException runtimeException) {
284                if (newState == HAS_EXCEPTION || newState == CANCELLED) {
285                    try {
286                        onException.handleException(exception);
287                    } catch (RuntimeException e) {
288                        LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
289                    }
290                }
291            }
292        });
293        return this;
294    }
295
296    @Override
297    public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) {
298        addOrFireListener(new StateListener<V, E>() {
299            @Override
300            public void handleStateChange(final int newState, final V result, final E exception,
301                    final RuntimeException runtimeException) {
302                if (newState == HAS_RESULT) {
303                    try {
304                        onResult.handleResult(result);
305                    } catch (RuntimeException e) {
306                        LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
307                    }
308                }
309            }
310        });
311        return this;
312    }
313
314    @Override
315    public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult,
316            final ExceptionHandler<? super E> onException) {
317        addOrFireListener(new StateListener<V, E>() {
318            @Override
319            public void handleStateChange(final int newState, final V result, final E exception,
320                    final RuntimeException runtimeException) {
321                if (newState == HAS_RESULT) {
322                    try {
323                        onResult.handleResult(result);
324                    } catch (RuntimeException e) {
325                        LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
326                    }
327                } else if (newState == HAS_EXCEPTION || newState == CANCELLED) {
328                    try {
329                        onException.handleException(exception);
330                    } catch (RuntimeException e) {
331                        LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
332                    }
333                }
334            }
335        });
336        return this;
337    }
338
339    @Override
340    public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) {
341        addOrFireListener(new StateListener<V, E>() {
342            @Override
343            public void handleStateChange(final int newState, final V result, final E exception,
344                    final RuntimeException runtimeException) {
345                if (newState != HAS_RUNTIME_EXCEPTION) {
346                    try {
347                        onResultOrException.run();
348                    } catch (RuntimeException e) {
349                        LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
350                    }
351                }
352            }
353        });
354        return this;
355    }
356
357    @Override
358    public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) {
359        return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction());
360    }
361
362    @Override
363    public <EOUT extends Exception> Promise<V, EOUT> thenCatch(final Function<? super E, V, EOUT> onException) {
364        return then(Promises.<V, EOUT>resultIdempotentFunction(), onException);
365    }
366
367    @Override
368    public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
369        final Function<? super V, VOUT, EOUT> onResult, final Function<? super E, VOUT, EOUT> onException) {
370        final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>();
371        addOrFireListener(new StateListener<V, E>() {
372            @Override
373            public void handleStateChange(final int newState, final V result, final E exception,
374                    final RuntimeException runtimeException) {
375                try {
376                    if (newState == HAS_RESULT) {
377                        chained.handleResult(onResult.apply(result));
378                    } else if (newState == HAS_EXCEPTION || newState == CANCELLED) {
379                        chained.handleResult(onException.apply(exception));
380                    } else {
381                        tryHandlingRuntimeException(runtimeException, chained);
382                    }
383                } catch (final RuntimeException e) {
384                    tryHandlingRuntimeException(e, chained);
385                } catch (final Exception e) {
386                    chained.handleException((EOUT) e);
387                }
388            }
389        });
390        return chained;
391    }
392
393    private <VOUT, EOUT extends Exception> void tryHandlingRuntimeException(final RuntimeException runtimeException,
394            final PromiseImpl<VOUT, EOUT> chained) {
395        try {
396            chained.handleRuntimeException(runtimeException);
397        } catch (Exception ignored) {
398            LOGGER.error("Runtime exception handler threw a RuntimeException which cannot be handled!", ignored);
399        }
400    }
401
402    @Override
403    public final Promise<V, E> thenAlways(final Runnable always) {
404        addOrFireListener(new StateListener<V, E>() {
405            @Override
406            public void handleStateChange(final int newState, final V result, final E exception,
407                    final RuntimeException runtimeException) {
408                try {
409                    always.run();
410                } catch (RuntimeException e) {
411                    LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
412                }
413            }
414        });
415        return this;
416    }
417
418    @Override
419    public final Promise<V, E> thenFinally(final Runnable onFinally) {
420        return thenAlways(onFinally);
421    }
422
423    @Override
424    public final <VOUT> Promise<VOUT, E> thenAsync(final AsyncFunction<? super V, VOUT, E> onResult) {
425        return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction());
426    }
427
428    @Override
429    public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync(AsyncFunction<? super E, V, EOUT> onException) {
430        return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException);
431    }
432
433    @Override
434    public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
435            final AsyncFunction<? super V, VOUT, EOUT> onResult,
436            final AsyncFunction<? super E, VOUT, EOUT> onException) {
437        final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>();
438        addOrFireListener(new StateListener<V, E>() {
439            @Override
440            @SuppressWarnings("unchecked")
441            public void handleStateChange(final int newState, final V result, final E exception,
442                    final RuntimeException runtimeException) {
443                try {
444                    if (newState == HAS_RESULT) {
445                        callNestedPromise(onResult.apply(result));
446                    } else if (newState == HAS_EXCEPTION || newState == CANCELLED) {
447                        callNestedPromise(onException.apply(exception));
448                    } else {
449                        tryHandlingRuntimeException(runtimeException, chained);
450                    }
451                } catch (final RuntimeException e) {
452                    tryHandlingRuntimeException(e, chained);
453                } catch (final Exception e) {
454                    chained.handleException((EOUT) e);
455                }
456            }
457
458            private void callNestedPromise(Promise<? extends VOUT, ? extends EOUT> nestedPromise) {
459                nestedPromise
460                        .thenOnResult(chained)
461                        .thenOnException(chained)
462                        .thenOnRuntimeException(chained);
463            }
464        });
465        return chained;
466    }
467
468    @Override
469    public final Promise<V, E> thenOnRuntimeException(final RuntimeExceptionHandler onRuntimeException) {
470        addOrFireListener(new StateListener<V, E>() {
471            @Override
472            public void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException) {
473                if (newState == HAS_RUNTIME_EXCEPTION) {
474                    try {
475                        onRuntimeException.handleRuntimeException(runtimeException);
476                    } catch (RuntimeException e) {
477                        LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e);
478                    }
479                }
480            }
481        });
482        return this;
483    }
484
485    /**
486     * Invoked when the client attempts to cancel the asynchronous task
487     * represented by this promise. Implementations which support cancellation
488     * should override this method to cancel the asynchronous task and, if
489     * successful, return an appropriate exception which can be used to signal
490     * that the task has failed.
491     * <p>
492     * By default cancellation is not supported and this method returns
493     * {@code null}.
494     *
495     * @param mayInterruptIfRunning
496     *            {@code true} if the thread executing this task should be
497     *            interrupted; otherwise, in-progress tasks are allowed to
498     *            complete.
499     * @return {@code null} if cancellation was not supported or not possible,
500     *         otherwise an appropriate exception.
501     */
502    protected E tryCancel(final boolean mayInterruptIfRunning) {
503        return null;
504    }
505
506    private void addOrFireListener(final StateListener<V, E> listener) {
507        final int stateBefore = state;
508        if (stateBefore != PENDING) {
509            handleCompletion(listener, stateBefore);
510        } else {
511            listeners.add(listener);
512            final int stateAfter = state;
513            if (stateAfter != PENDING && listeners.remove(listener)) {
514                handleCompletion(listener, stateAfter);
515            }
516        }
517    }
518
519    private void handleCompletion(final StateListener<V, E> listener, final int completedState) {
520        try {
521            listener.handleStateChange(completedState, result, exception, runtimeException);
522        } catch (RuntimeException ignored) {
523            LOGGER.error("State change listener threw a RuntimeException which cannot be handled!", ignored);
524        }
525    }
526
527    private V get0() throws ExecutionException {
528        if (runtimeException != null) {
529            throw new ExecutionException(runtimeException);
530        } else if (exception != null) {
531            throw new ExecutionException(exception);
532        } else {
533            return result;
534        }
535    }
536
537    private V getOrThrow0() throws E {
538        if (runtimeException != null) {
539            throw runtimeException;
540        } else if (exception != null) {
541            throw exception;
542        } else {
543            return result;
544        }
545    }
546
547    private boolean setState(final int newState, final V result, final E exception,
548            final RuntimeException runtimeException) {
549        synchronized (this) {
550            if (state != PENDING) {
551                // Already completed.
552                return false;
553            }
554            this.result = result;
555            this.exception = exception;
556            this.runtimeException = runtimeException;
557            state = newState; // Publishes.
558            notifyAll(); // Wake up any blocked threads.
559        }
560        StateListener<V, E> listener;
561        while ((listener = listeners.poll()) != null) {
562            handleCompletion(listener, newState);
563        }
564        return true;
565    }
566
567    private void await() throws InterruptedException {
568        // Use double-check for fast-path.
569        if (state == PENDING) {
570            synchronized (this) {
571                while (state == PENDING) {
572                    wait();
573                }
574            }
575        }
576    }
577
578    private void await(final long timeout, final TimeUnit unit, final boolean isUninterruptibly)
579            throws InterruptedException, TimeoutException {
580        // Use double-check for fast-path.
581        if (state == PENDING) {
582            final long timeoutMS = unit.toMillis(timeout);
583            final long endTimeMS = System.currentTimeMillis() + timeoutMS;
584            boolean wasInterrupted = false;
585            try {
586                synchronized (this) {
587                    while (state == PENDING) {
588                        final long remainingTimeMS = endTimeMS - System.currentTimeMillis();
589                        if (remainingTimeMS <= 0) {
590                            throw new TimeoutException();
591                        }
592                        try {
593                            wait(remainingTimeMS);
594                        } catch (final InterruptedException e) {
595                            if (isUninterruptibly) {
596                                wasInterrupted = true;
597                            } else {
598                                throw e;
599                            }
600                        }
601                    }
602                }
603            } finally {
604                if (wasInterrupted) {
605                    Thread.currentThread().interrupt();
606                }
607            }
608        }
609    }
610}