001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the License. 004 * 005 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 006 * specific language governing permission and limitations under the License. 007 * 008 * When distributing Covered Software, include this CDDL Header Notice in each file and include 009 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 010 * Header, with the fields enclosed by brackets [] replaced by your own identifying 011 * information: "Portions copyright [year] [name of copyright owner]". 012 * 013 * Copyright 2015 ForgeRock AS. 014 */ 015 016package org.forgerock.util.promise; 017 018import java.util.ArrayList; 019import java.util.Arrays; 020import java.util.List; 021import java.util.concurrent.ExecutionException; 022import java.util.concurrent.TimeUnit; 023import java.util.concurrent.atomic.AtomicInteger; 024 025import org.forgerock.util.AsyncFunction; 026import org.forgerock.util.Function; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * Utility methods for creating and composing {@link Promise}s. 032 */ 033public final class Promises { 034 // TODO: n-of, etc. 035 036 /** 037 * These completed promise implementations provide an optimization for 038 * synchronous processing, which have been found to provide small but 039 * significant benefit: 040 * 041 * <ul> 042 * <li>A small reduction in GC overhead (less frequent GCs = more 043 * deterministic response times)</li> 044 * <li>A reduction of the cost associated with calling then() on a 045 * pre-completed promise versus the implementation in 046 * org.forgerock.util.promise.PromiseImpl#then(Function, Function). 047 * Specifically, a reduction in two volatile accesses as well as memory 048 * again.</li> 049 * </ul> 050 * 051 * @param <V> {@inheritDoc} 052 * @param <E> {@inheritDoc} 053 */ 054 private static abstract class CompletedPromise<V, E extends Exception> implements Promise<V, E> { 055 056 private static final Logger LOGGER = LoggerFactory.getLogger(CompletedPromise.class); 057 058 @Override 059 public final boolean cancel(final boolean mayInterruptIfRunning) { 060 return false; 061 } 062 063 @Override 064 public final V get() throws ExecutionException { 065 if (hasResult()) { 066 return getResult(); 067 } else if (hasException()) { 068 throw new ExecutionException(getException()); 069 } else { 070 throw new ExecutionException(getRuntimeException()); 071 } 072 } 073 074 @Override 075 public final V get(final long timeout, final TimeUnit unit) throws ExecutionException { 076 return get(); 077 } 078 079 @Override 080 public final V getOrThrow() throws E { 081 if (hasResult()) { 082 return getResult(); 083 } else if (hasException()) { 084 throw getException(); 085 } else { 086 throw getRuntimeException(); 087 } 088 } 089 090 @Override 091 public final V getOrThrow(final long timeout, final TimeUnit unit) throws E { 092 return getOrThrow(); 093 } 094 095 @Override 096 public final V getOrThrowUninterruptibly() throws E { 097 return getOrThrow(); 098 } 099 100 @Override 101 public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E { 102 return getOrThrow(); 103 } 104 105 @Override 106 public final boolean isCancelled() { 107 return false; 108 } 109 110 @Override 111 public final boolean isDone() { 112 return true; 113 } 114 115 @Override 116 public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) { 117 if (hasException()) { 118 try { 119 onException.handleException(getException()); 120 } catch (RuntimeException e) { 121 LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e); 122 } 123 } 124 return this; 125 } 126 127 @Override 128 public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) { 129 if (hasResult()) { 130 try { 131 onResult.handleResult(getResult()); 132 } catch (RuntimeException e) { 133 LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e); 134 } 135 } 136 return this; 137 } 138 139 @Override 140 public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult, 141 final ExceptionHandler<? super E> onException) { 142 return thenOnResult(onResult).thenOnException(onException); 143 } 144 145 @Override 146 public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) { 147 if (hasResult() || hasException()) { 148 try { 149 onResultOrException.run(); 150 } catch (RuntimeException e) { 151 LOGGER.error("Ignored unexpected exception thrown by Runnable", e); 152 } 153 } 154 return this; 155 } 156 157 @Override 158 public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) { 159 return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction()); 160 } 161 162 @Override 163 public <EOUT extends Exception> Promise<V, EOUT> thenCatch(Function<? super E, V, EOUT> onException) { 164 return then(Promises.<V, EOUT>resultIdempotentFunction(), onException); 165 } 166 167 @Override 168 @SuppressWarnings("unchecked") 169 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then( 170 final Function<? super V, VOUT, EOUT> onResult, 171 final Function<? super E, VOUT, EOUT> onException) { 172 try { 173 if (hasResult()) { 174 return newResultPromise(onResult.apply(getResult())); 175 } else if (hasException()) { 176 return newResultPromise(onException.apply(getException())); 177 } else { 178 return new RuntimeExceptionPromise<>(getRuntimeException()); 179 } 180 } catch (final RuntimeException e) { 181 return new RuntimeExceptionPromise<>(e); 182 } catch (final Exception e) { 183 return newExceptionPromise((EOUT) e); 184 } 185 } 186 187 @Override 188 public final Promise<V, E> thenAlways(final Runnable always) { 189 try { 190 always.run(); 191 } catch (RuntimeException e) { 192 LOGGER.error("Ignored unexpected exception thrown by Runnable", e); 193 } 194 return this; 195 } 196 197 @Override 198 public Promise<V, E> thenFinally(Runnable onFinally) { 199 return thenAlways(onFinally); 200 } 201 202 @Override 203 public final <VOUT> Promise<VOUT, E> thenAsync( 204 final AsyncFunction<? super V, VOUT, E> onResult) { 205 return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction()); 206 } 207 208 @Override 209 public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync( 210 final AsyncFunction<? super E, V, EOUT> onException) { 211 return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException); 212 } 213 214 @Override 215 @SuppressWarnings("unchecked") 216 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync( 217 final AsyncFunction<? super V, VOUT, EOUT> onResult, 218 final AsyncFunction<? super E, VOUT, EOUT> onException) { 219 try { 220 if (hasResult()) { 221 return (Promise<VOUT, EOUT>) onResult.apply(getResult()); 222 } else if (hasException()) { 223 return (Promise<VOUT, EOUT>) onException.apply(getException()); 224 } else { 225 return new RuntimeExceptionPromise<>(getRuntimeException()); 226 } 227 } catch (final RuntimeException e) { 228 return new RuntimeExceptionPromise<>(e); 229 } catch (final Exception e) { 230 return newExceptionPromise((EOUT) e); 231 } 232 } 233 234 @Override 235 public Promise<V, E> thenOnRuntimeException(RuntimeExceptionHandler onRuntimeException) { 236 if (getRuntimeException() != null) { 237 try { 238 onRuntimeException.handleRuntimeException(getRuntimeException()); 239 } catch (RuntimeException e) { 240 LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e); 241 } 242 } 243 return this; 244 } 245 246 abstract RuntimeException getRuntimeException(); 247 248 abstract E getException(); 249 250 abstract V getResult(); 251 252 abstract boolean hasException(); 253 254 abstract boolean hasResult(); 255 } 256 257 private static final class RuntimeExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> { 258 private final RuntimeException runtimeException; 259 260 private RuntimeExceptionPromise(RuntimeException runtimeException) { 261 this.runtimeException = runtimeException; 262 } 263 264 @Override 265 RuntimeException getRuntimeException() { 266 return runtimeException; 267 } 268 269 @Override 270 E getException() { 271 return null; 272 } 273 274 @Override 275 V getResult() { 276 return null; 277 } 278 279 @Override 280 boolean hasException() { 281 return false; 282 } 283 284 @Override 285 boolean hasResult() { 286 return false; 287 } 288 } 289 290 private static final class ExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> { 291 private final E exception; 292 293 private ExceptionPromise(final E exception) { 294 this.exception = exception; 295 } 296 297 @Override 298 RuntimeException getRuntimeException() { 299 return null; 300 } 301 302 @Override 303 E getException() { 304 return exception; 305 } 306 307 @Override 308 V getResult() { 309 throw new IllegalStateException(); 310 } 311 312 @Override 313 boolean hasException() { 314 return true; 315 } 316 317 @Override 318 boolean hasResult() { 319 return false; 320 } 321 } 322 323 private static final class ResultPromise<V, E extends Exception> extends 324 CompletedPromise<V, E> { 325 private final V value; 326 327 private ResultPromise(final V value) { 328 this.value = value; 329 } 330 331 @Override 332 RuntimeException getRuntimeException() { 333 return null; 334 } 335 336 @Override 337 E getException() { 338 throw new IllegalStateException(); 339 } 340 341 @Override 342 V getResult() { 343 return value; 344 } 345 346 @Override 347 boolean hasException() { 348 return false; 349 } 350 351 @Override 352 boolean hasResult() { 353 return true; 354 } 355 } 356 357 private static final AsyncFunction<Exception, Object, Exception> EXCEPTION_IDEM_ASYNC_FUNC = 358 new AsyncFunction<Exception, Object, Exception>() { 359 @Override 360 public Promise<Object, Exception> apply(final Exception exception) throws Exception { 361 return newExceptionPromise(exception); 362 } 363 }; 364 365 private static final Function<Exception, Object, Exception> EXCEPTION_IDEM_FUNC = 366 new Function<Exception, Object, Exception>() { 367 @Override 368 public Object apply(final Exception exception) throws Exception { 369 throw exception; 370 } 371 }; 372 373 private static final AsyncFunction<Object, Object, Exception> RESULT_IDEM_ASYNC_FUNC = 374 new AsyncFunction<Object, Object, Exception>() { 375 @Override 376 public Promise<Object, Exception> apply(final Object object) throws Exception { 377 return newResultPromise(object); 378 } 379 }; 380 381 private static final Function<Object, Object, Exception> RESULT_IDEM_FUNC = 382 new Function<Object, Object, Exception>() { 383 @Override 384 public Object apply(final Object value) throws Exception { 385 return value; 386 } 387 }; 388 389 /** 390 * Returns a {@link Promise} representing an asynchronous task which has 391 * already failed with the provided exception. Attempts to get the result will 392 * immediately fail, and any listeners registered against the returned 393 * promise will be immediately invoked in the same thread as the caller. 394 * 395 * @param <V> 396 * The type of the task's result, or {@link Void} if the task 397 * does not return anything (i.e. it only has side-effects). 398 * @param <E> 399 * The type of the exception thrown by the task if it fails, or 400 * {@link NeverThrowsException} if the task cannot fail. 401 * @param exception 402 * The exception indicating why the asynchronous task has failed. 403 * @return A {@link Promise} representing an asynchronous task which has 404 * already failed with the provided exception. 405 */ 406 public static <V, E extends Exception> Promise<V, E> newExceptionPromise(final E exception) { 407 return new ExceptionPromise<>(exception); 408 } 409 410 /** 411 * Returns a {@link Promise} representing an asynchronous task which has 412 * already succeeded with the provided result. Attempts to get the result 413 * will immediately return the result, and any listeners registered against 414 * the returned promise will be immediately invoked in the same thread as 415 * the caller. 416 * 417 * @param <V> 418 * The type of the task's result, or {@link Void} if the task 419 * does not return anything (i.e. it only has side-effects). 420 * @param <E> 421 * The type of the exception thrown by the task if it fails, or 422 * {@link NeverThrowsException} if the task cannot fail. 423 * @param result 424 * The result of the asynchronous task. 425 * @return A {@link Promise} representing an asynchronous task which has 426 * already succeeded with the provided result. 427 */ 428 public static <V, E extends Exception> Promise<V, E> newResultPromise(final V result) { 429 return new ResultPromise<>(result); 430 } 431 432 /** 433 * Returns a {@link Promise} which will be completed once all of the 434 * provided promises have succeeded, or as soon as one of them fails. 435 * 436 * @param <V> 437 * The type of the tasks' result, or {@link Void} if the tasks do 438 * not return anything (i.e. they only has side-effects). 439 * @param <E> 440 * The type of the exception thrown by the tasks if they fail, or 441 * {@link NeverThrowsException} if the tasks cannot fail. 442 * @param promises 443 * The list of tasks to be combined. 444 * @return A {@link Promise} which will be completed once all of the 445 * provided promises have succeeded, or as soon as one of them 446 * fails. 447 */ 448 public static <V, E extends Exception> Promise<List<V>, E> when( 449 final List<Promise<V, E>> promises) { 450 final int size = promises.size(); 451 final AtomicInteger remaining = new AtomicInteger(size); 452 final List<V> results = new ArrayList<>(size); 453 final PromiseImpl<List<V>, E> composite = PromiseImpl.create(); 454 for (final Promise<V, E> promise : promises) { 455 promise.thenOnResult(new ResultHandler<V>() { 456 @Override 457 public void handleResult(final V value) { 458 synchronized (results) { 459 results.add(value); 460 } 461 if (remaining.decrementAndGet() == 0) { 462 composite.handleResult(results); 463 } 464 } 465 }).thenOnException(new ExceptionHandler<E>() { 466 @Override 467 public void handleException(final E exception) { 468 composite.handleException(exception); 469 } 470 }).thenOnRuntimeException(new RuntimeExceptionHandler() { 471 @Override 472 public void handleRuntimeException(RuntimeException exception) { 473 composite.handleRuntimeException(exception); 474 } 475 }); 476 } 477 if (promises.isEmpty()) { 478 composite.handleResult(results); 479 } 480 return composite; 481 } 482 483 /** 484 * Returns a {@link Promise} which will be completed once all of the 485 * provided promises have succeeded, or as soon as one of them fails. 486 * 487 * @param <V> 488 * The type of the tasks' result, or {@link Void} if the tasks do 489 * not return anything (i.e. they only has side-effects). 490 * @param <E> 491 * The type of the exception thrown by the tasks if they fail, or 492 * {@link NeverThrowsException} if the tasks cannot fail. 493 * @param promises 494 * The list of tasks to be combined. 495 * @return A {@link Promise} which will be completed once all of the 496 * provided promises have succeeded, or as soon as one of them 497 * has thrown an exception. 498 */ 499 @SafeVarargs 500 public static <V, E extends Exception> Promise<List<V>, E> when(final Promise<V, E>... promises) { 501 return when(Arrays.asList(promises)); 502 } 503 504 @SuppressWarnings("unchecked") 505 static <VOUT, E extends Exception> AsyncFunction<E, VOUT, E> exceptionIdempotentAsyncFunction() { 506 return (AsyncFunction<E, VOUT, E>) EXCEPTION_IDEM_ASYNC_FUNC; 507 } 508 509 @SuppressWarnings("unchecked") 510 static <VOUT, E extends Exception> Function<E, VOUT, E> exceptionIdempotentFunction() { 511 return (Function<E, VOUT, E>) EXCEPTION_IDEM_FUNC; 512 } 513 514 @SuppressWarnings("unchecked") 515 static <V, E extends Exception> AsyncFunction<V, V, E> resultIdempotentAsyncFunction() { 516 return (AsyncFunction<V, V, E>) RESULT_IDEM_ASYNC_FUNC; 517 } 518 519 @SuppressWarnings("unchecked") 520 static <V, E extends Exception> Function<V, V, E> resultIdempotentFunction() { 521 return (Function<V, V, E>) RESULT_IDEM_FUNC; 522 } 523 524 private Promises() { 525 // Prevent instantiation. 526 } 527}