001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the License.
004 *
005 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
006 * specific language governing permission and limitations under the License.
007 *
008 * When distributing Covered Software, include this CDDL Header Notice in each file and include
009 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
010 * Header, with the fields enclosed by brackets [] replaced by your own identifying
011 * information: "Portions copyright [year] [name of copyright owner]".
012 *
013 * Copyright 2015 ForgeRock AS.
014 */
015
016package org.forgerock.util.promise;
017
018import java.util.ArrayList;
019import java.util.Arrays;
020import java.util.List;
021import java.util.concurrent.ExecutionException;
022import java.util.concurrent.TimeUnit;
023import java.util.concurrent.atomic.AtomicInteger;
024
025import org.forgerock.util.AsyncFunction;
026import org.forgerock.util.Function;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * Utility methods for creating and composing {@link Promise}s.
032 */
033public final class Promises {
034    // TODO: n-of, etc.
035
036    /**
037     * These completed promise implementations provide an optimization for
038     * synchronous processing, which have been found to provide small but
039     * significant benefit:
040     *
041     * <ul>
042     *     <li>A small reduction in GC overhead (less frequent GCs = more
043     *     deterministic response times)</li>
044     *     <li>A reduction of the cost associated with calling then() on a
045     *     pre-completed promise versus the implementation in
046     *     org.forgerock.util.promise.PromiseImpl#then(Function, Function).
047     *     Specifically, a reduction in two volatile accesses as well as memory
048     *     again.</li>
049     * </ul>
050     *
051     * @param <V> {@inheritDoc}
052     * @param <E> {@inheritDoc}
053     */
054    private static abstract class CompletedPromise<V, E extends Exception> implements Promise<V, E> {
055
056        private static final Logger LOGGER = LoggerFactory.getLogger(CompletedPromise.class);
057
058        @Override
059        public final boolean cancel(final boolean mayInterruptIfRunning) {
060            return false;
061        }
062
063        @Override
064        public final V get() throws ExecutionException {
065            if (hasResult()) {
066                return getResult();
067            } else if (hasException()) {
068                throw new ExecutionException(getException());
069            } else {
070                throw new ExecutionException(getRuntimeException());
071            }
072        }
073
074        @Override
075        public final V get(final long timeout, final TimeUnit unit) throws ExecutionException {
076            return get();
077        }
078
079        @Override
080        public final V getOrThrow() throws E {
081            if (hasResult()) {
082                return getResult();
083            } else if (hasException()) {
084                throw getException();
085            } else {
086                throw getRuntimeException();
087            }
088        }
089
090        @Override
091        public final V getOrThrow(final long timeout, final TimeUnit unit) throws E {
092            return getOrThrow();
093        }
094
095        @Override
096        public final V getOrThrowUninterruptibly() throws E {
097            return getOrThrow();
098        }
099
100        @Override
101        public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E {
102            return getOrThrow();
103        }
104
105        @Override
106        public final boolean isCancelled() {
107            return false;
108        }
109
110        @Override
111        public final boolean isDone() {
112            return true;
113        }
114
115        @Override
116        public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) {
117            if (hasException()) {
118                try {
119                    onException.handleException(getException());
120                } catch (RuntimeException e) {
121                    LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
122                }
123            }
124            return this;
125        }
126
127        @Override
128        public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) {
129            if (hasResult()) {
130                try {
131                    onResult.handleResult(getResult());
132                } catch (RuntimeException e) {
133                    LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
134                }
135            }
136            return this;
137        }
138
139        @Override
140        public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult,
141                final ExceptionHandler<? super E> onException) {
142            return thenOnResult(onResult).thenOnException(onException);
143        }
144
145        @Override
146        public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) {
147            if (hasResult() || hasException()) {
148                try {
149                    onResultOrException.run();
150                } catch (RuntimeException e) {
151                    LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
152                }
153            }
154            return this;
155        }
156
157        @Override
158        public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) {
159            return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction());
160        }
161
162        @Override
163        public <EOUT extends Exception> Promise<V, EOUT> thenCatch(Function<? super E, V, EOUT> onException) {
164            return then(Promises.<V, EOUT>resultIdempotentFunction(), onException);
165        }
166
167        @Override
168        @SuppressWarnings("unchecked")
169        public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
170                final Function<? super V, VOUT, EOUT> onResult,
171                final Function<? super E, VOUT, EOUT> onException) {
172            try {
173                if (hasResult()) {
174                    return newResultPromise(onResult.apply(getResult()));
175                } else if (hasException()) {
176                    return newResultPromise(onException.apply(getException()));
177                } else {
178                    return new RuntimeExceptionPromise<>(getRuntimeException());
179                }
180            } catch (final RuntimeException e) {
181                return new RuntimeExceptionPromise<>(e);
182            } catch (final Exception e) {
183                return newExceptionPromise((EOUT) e);
184            }
185        }
186
187        @Override
188        public final Promise<V, E> thenAlways(final Runnable always) {
189            try {
190                always.run();
191            } catch (RuntimeException e) {
192                LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
193            }
194            return this;
195        }
196
197        @Override
198        public Promise<V, E> thenFinally(Runnable onFinally) {
199            return thenAlways(onFinally);
200        }
201
202        @Override
203        public final <VOUT> Promise<VOUT, E> thenAsync(
204                final AsyncFunction<? super V, VOUT, E> onResult) {
205            return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction());
206        }
207
208        @Override
209        public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync(
210                final AsyncFunction<? super E, V, EOUT> onException) {
211            return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException);
212        }
213
214        @Override
215        @SuppressWarnings("unchecked")
216        public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
217                final AsyncFunction<? super V, VOUT, EOUT> onResult,
218                final AsyncFunction<? super E, VOUT, EOUT> onException) {
219            try {
220                if (hasResult()) {
221                    return (Promise<VOUT, EOUT>) onResult.apply(getResult());
222                } else if (hasException()) {
223                    return (Promise<VOUT, EOUT>) onException.apply(getException());
224                } else {
225                    return new RuntimeExceptionPromise<>(getRuntimeException());
226                }
227            } catch (final RuntimeException e) {
228                return new RuntimeExceptionPromise<>(e);
229            } catch (final Exception e) {
230                return newExceptionPromise((EOUT) e);
231            }
232        }
233
234        @Override
235        public Promise<V, E> thenOnRuntimeException(RuntimeExceptionHandler onRuntimeException) {
236            if (getRuntimeException() != null) {
237                try {
238                    onRuntimeException.handleRuntimeException(getRuntimeException());
239                } catch (RuntimeException e) {
240                    LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e);
241                }
242            }
243            return this;
244        }
245
246        abstract RuntimeException getRuntimeException();
247
248        abstract E getException();
249
250        abstract V getResult();
251
252        abstract boolean hasException();
253
254        abstract boolean hasResult();
255    }
256
257    private static final class RuntimeExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> {
258        private final RuntimeException runtimeException;
259
260        private RuntimeExceptionPromise(RuntimeException runtimeException) {
261            this.runtimeException = runtimeException;
262        }
263
264        @Override
265        RuntimeException getRuntimeException() {
266            return runtimeException;
267        }
268
269        @Override
270        E getException() {
271            return null;
272        }
273
274        @Override
275        V getResult() {
276            return null;
277        }
278
279        @Override
280        boolean hasException() {
281            return false;
282        }
283
284        @Override
285        boolean hasResult() {
286            return false;
287        }
288    }
289
290    private static final class ExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> {
291        private final E exception;
292
293        private ExceptionPromise(final E exception) {
294            this.exception = exception;
295        }
296
297        @Override
298        RuntimeException getRuntimeException() {
299            return null;
300        }
301
302        @Override
303        E getException() {
304            return exception;
305        }
306
307        @Override
308        V getResult() {
309            throw new IllegalStateException();
310        }
311
312        @Override
313        boolean hasException() {
314            return true;
315        }
316
317        @Override
318        boolean hasResult() {
319            return false;
320        }
321    }
322
323    private static final class ResultPromise<V, E extends Exception> extends
324            CompletedPromise<V, E> {
325        private final V value;
326
327        private ResultPromise(final V value) {
328            this.value = value;
329        }
330
331        @Override
332        RuntimeException getRuntimeException() {
333            return null;
334        }
335
336        @Override
337        E getException() {
338            throw new IllegalStateException();
339        }
340
341        @Override
342        V getResult() {
343            return value;
344        }
345
346        @Override
347        boolean hasException() {
348            return false;
349        }
350
351        @Override
352        boolean hasResult() {
353            return true;
354        }
355    }
356
357    private static final AsyncFunction<Exception, Object, Exception> EXCEPTION_IDEM_ASYNC_FUNC =
358            new AsyncFunction<Exception, Object, Exception>() {
359                @Override
360                public Promise<Object, Exception> apply(final Exception exception) throws Exception {
361                    return newExceptionPromise(exception);
362                }
363            };
364
365    private static final Function<Exception, Object, Exception> EXCEPTION_IDEM_FUNC =
366            new Function<Exception, Object, Exception>() {
367                @Override
368                public Object apply(final Exception exception) throws Exception {
369                    throw exception;
370                }
371            };
372
373    private static final AsyncFunction<Object, Object, Exception> RESULT_IDEM_ASYNC_FUNC =
374            new AsyncFunction<Object, Object, Exception>() {
375                @Override
376                public Promise<Object, Exception> apply(final Object object) throws Exception {
377                    return newResultPromise(object);
378                }
379            };
380
381    private static final Function<Object, Object, Exception> RESULT_IDEM_FUNC =
382            new Function<Object, Object, Exception>() {
383                @Override
384                public Object apply(final Object value) throws Exception {
385                    return value;
386                }
387            };
388
389    /**
390     * Returns a {@link Promise} representing an asynchronous task which has
391     * already failed with the provided exception. Attempts to get the result will
392     * immediately fail, and any listeners registered against the returned
393     * promise will be immediately invoked in the same thread as the caller.
394     *
395     * @param <V>
396     *            The type of the task's result, or {@link Void} if the task
397     *            does not return anything (i.e. it only has side-effects).
398     * @param <E>
399     *            The type of the exception thrown by the task if it fails, or
400     *            {@link NeverThrowsException} if the task cannot fail.
401     * @param exception
402     *            The exception indicating why the asynchronous task has failed.
403     * @return A {@link Promise} representing an asynchronous task which has
404     *         already failed with the provided exception.
405     */
406    public static <V, E extends Exception> Promise<V, E> newExceptionPromise(final E exception) {
407        return new ExceptionPromise<>(exception);
408    }
409
410    /**
411     * Returns a {@link Promise} representing an asynchronous task which has
412     * already succeeded with the provided result. Attempts to get the result
413     * will immediately return the result, and any listeners registered against
414     * the returned promise will be immediately invoked in the same thread as
415     * the caller.
416     *
417     * @param <V>
418     *            The type of the task's result, or {@link Void} if the task
419     *            does not return anything (i.e. it only has side-effects).
420     * @param <E>
421     *            The type of the exception thrown by the task if it fails, or
422     *            {@link NeverThrowsException} if the task cannot fail.
423     * @param result
424     *            The result of the asynchronous task.
425     * @return A {@link Promise} representing an asynchronous task which has
426     *         already succeeded with the provided result.
427     */
428    public static <V, E extends Exception> Promise<V, E> newResultPromise(final V result) {
429        return new ResultPromise<>(result);
430    }
431
432    /**
433     * Returns a {@link Promise} which will be completed once all of the
434     * provided promises have succeeded, or as soon as one of them fails.
435     *
436     * @param <V>
437     *            The type of the tasks' result, or {@link Void} if the tasks do
438     *            not return anything (i.e. they only has side-effects).
439     * @param <E>
440     *            The type of the exception thrown by the tasks if they fail, or
441     *            {@link NeverThrowsException} if the tasks cannot fail.
442     * @param promises
443     *            The list of tasks to be combined.
444     * @return A {@link Promise} which will be completed once all of the
445     *         provided promises have succeeded, or as soon as one of them
446     *         fails.
447     */
448    public static <V, E extends Exception> Promise<List<V>, E> when(
449            final List<Promise<V, E>> promises) {
450        final int size = promises.size();
451        final AtomicInteger remaining = new AtomicInteger(size);
452        final List<V> results = new ArrayList<>(size);
453        final PromiseImpl<List<V>, E> composite = PromiseImpl.create();
454        for (final Promise<V, E> promise : promises) {
455            promise.thenOnResult(new ResultHandler<V>() {
456                @Override
457                public void handleResult(final V value) {
458                    synchronized (results) {
459                        results.add(value);
460                    }
461                    if (remaining.decrementAndGet() == 0) {
462                        composite.handleResult(results);
463                    }
464                }
465            }).thenOnException(new ExceptionHandler<E>() {
466                @Override
467                public void handleException(final E exception) {
468                    composite.handleException(exception);
469                }
470            }).thenOnRuntimeException(new RuntimeExceptionHandler() {
471                @Override
472                public void handleRuntimeException(RuntimeException exception) {
473                    composite.handleRuntimeException(exception);
474                }
475            });
476        }
477        if (promises.isEmpty()) {
478            composite.handleResult(results);
479        }
480        return composite;
481    }
482
483    /**
484     * Returns a {@link Promise} which will be completed once all of the
485     * provided promises have succeeded, or as soon as one of them fails.
486     *
487     * @param <V>
488     *            The type of the tasks' result, or {@link Void} if the tasks do
489     *            not return anything (i.e. they only has side-effects).
490     * @param <E>
491     *            The type of the exception thrown by the tasks if they fail, or
492     *            {@link NeverThrowsException} if the tasks cannot fail.
493     * @param promises
494     *            The list of tasks to be combined.
495     * @return A {@link Promise} which will be completed once all of the
496     *         provided promises have succeeded, or as soon as one of them
497     *         has thrown an exception.
498     */
499    @SafeVarargs
500    public static <V, E extends Exception> Promise<List<V>, E> when(final Promise<V, E>... promises) {
501        return when(Arrays.asList(promises));
502    }
503
504    @SuppressWarnings("unchecked")
505    static <VOUT, E extends Exception> AsyncFunction<E, VOUT, E> exceptionIdempotentAsyncFunction() {
506        return (AsyncFunction<E, VOUT, E>) EXCEPTION_IDEM_ASYNC_FUNC;
507    }
508
509    @SuppressWarnings("unchecked")
510    static <VOUT, E extends Exception> Function<E, VOUT, E> exceptionIdempotentFunction() {
511        return (Function<E, VOUT, E>) EXCEPTION_IDEM_FUNC;
512    }
513
514    @SuppressWarnings("unchecked")
515    static <V, E extends Exception> AsyncFunction<V, V, E> resultIdempotentAsyncFunction() {
516        return (AsyncFunction<V, V, E>) RESULT_IDEM_ASYNC_FUNC;
517    }
518
519    @SuppressWarnings("unchecked")
520    static <V, E extends Exception> Function<V, V, E> resultIdempotentFunction() {
521        return (Function<V, V, E>) RESULT_IDEM_FUNC;
522    }
523
/** Not instantiable: this class only exposes static members. */
private Promises() {
    // Prevent instantiation.
}
527}