001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the License. 004 * 005 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 006 * specific language governing permission and limitations under the License. 007 * 008 * When distributing Covered Software, include this CDDL Header Notice in each file and include 009 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 010 * Header, with the fields enclosed by brackets [] replaced by your own identifying 011 * information: "Portions copyright [year] [name of copyright owner]". 012 * 013 * Copyright 2015 ForgeRock AS. 014 */ 015package org.forgerock.util.promise; 016 017import java.util.Queue; 018import java.util.concurrent.ConcurrentLinkedQueue; 019import java.util.concurrent.ExecutionException; 020import java.util.concurrent.TimeUnit; 021import java.util.concurrent.TimeoutException; 022 023import org.forgerock.util.AsyncFunction; 024import org.forgerock.util.Function; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028/** 029 * An implementation of {@link Promise} which can be used as is, or as the basis 030 * for more complex asynchronous behavior. A {@code PromiseImpl} must be 031 * completed by invoking one of: 032 * <ul> 033 * <li>{@link #handleResult} - marks the promise as having succeeded with the 034 * provide result 035 * <li>{@link #handleException} - marks the promise as having failed with the 036 * provided exception 037 * <li>{@link #cancel} - requests cancellation of the asynchronous task 038 * represented by the promise. Cancellation is only supported if the 039 * {@link #tryCancel(boolean)} is overridden and returns an exception. 040 * </ul> 041 * 042 * @param <V> 043 * The type of the task's result, or {@link Void} if the task does 044 * not return anything (i.e. it only has side-effects). 045 * @param <E> 046 * The type of the exception thrown by the task if it fails, or 047 * {@link NeverThrowsException} if the task cannot fail. 048 * @see Promise 049 * @see Promises 050 */ 051public class PromiseImpl<V, E extends Exception> implements Promise<V, E>, ResultHandler<V>, 052 ExceptionHandler<E>, RuntimeExceptionHandler { 053 // TODO: Is using monitor based sync better than AQS? 054 055 private static final Logger LOGGER = LoggerFactory.getLogger(PromiseImpl.class); 056 057 private interface StateListener<V, E extends Exception> { 058 void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException); 059 } 060 061 /** 062 * State value indicating that this promise has not completed. 063 */ 064 private static final int PENDING = 0; 065 066 /** 067 * State value indicating that this promise has completed successfully 068 * (result set). 069 */ 070 private static final int HAS_RESULT = 1; 071 072 /** 073 * State value indicating that this promise has failed (exception set). 074 */ 075 private static final int HAS_EXCEPTION = 2; 076 077 /** 078 * State value indicating that this promise has been cancelled (exception set). 079 */ 080 private static final int CANCELLED = 3; 081 082 /** 083 * State value indicating that this promise has failed with a runtime exception. 084 */ 085 private static final int HAS_RUNTIME_EXCEPTION = 4; 086 087 /** 088 * Creates a new pending {@link Promise} implementation. 089 * 090 * @param <V> 091 * The type of the task's result, or {@link Void} if the task 092 * does not return anything (i.e. it only has side-effects). 093 * @param <E> 094 * The type of the exception thrown by the task if it fails, or 095 * {@link NeverThrowsException} if the task cannot fail. 096 * @return A new pending {@link Promise} implementation. 097 */ 098 public static <V, E extends Exception> PromiseImpl<V, E> create() { 099 return new PromiseImpl<>(); 100 } 101 102 private volatile int state = PENDING; 103 private V result = null; 104 private E exception = null; 105 private RuntimeException runtimeException = null; 106 107 private final Queue<StateListener<V, E>> listeners = 108 new ConcurrentLinkedQueue<>(); 109 110 /** 111 * Creates a new pending {@link Promise} implementation. This constructor is 112 * protected to allow for sub-classing. 113 */ 114 protected PromiseImpl() { 115 // No implementation. 116 } 117 118 @Override 119 public final boolean cancel(final boolean mayInterruptIfRunning) { 120 if (isDone()) { 121 // Fail-fast. 122 return false; 123 } 124 final E exception = tryCancel(mayInterruptIfRunning); 125 return exception != null && setState(CANCELLED, null, exception, null); 126 } 127 128 @Override 129 public final V get() throws InterruptedException, ExecutionException { 130 await(); // Publishes. 131 return get0(); 132 } 133 134 @Override 135 public final V get(final long timeout, final TimeUnit unit) throws InterruptedException, 136 ExecutionException, TimeoutException { 137 await(timeout, unit, false); // Publishes. 138 return get0(); 139 } 140 141 @Override 142 public final V getOrThrow() throws InterruptedException, E { 143 await(); // Publishes. 144 return getOrThrow0(); 145 } 146 147 @Override 148 public final V getOrThrow(final long timeout, final TimeUnit unit) throws InterruptedException, 149 E, TimeoutException { 150 await(timeout, unit, false); // Publishes. 151 return getOrThrow0(); 152 } 153 154 @Override 155 public final V getOrThrowUninterruptibly() throws E { 156 boolean wasInterrupted = false; 157 try { 158 while (true) { 159 try { 160 return getOrThrow(); 161 } catch (final InterruptedException e) { 162 wasInterrupted = true; 163 } 164 } 165 } finally { 166 if (wasInterrupted) { 167 Thread.currentThread().interrupt(); 168 } 169 } 170 } 171 172 @Override 173 public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E, 174 TimeoutException { 175 try { 176 await(timeout, unit, true); // Publishes. 177 } catch (InterruptedException ignored) { 178 // Will never occur since interrupts are ignored. 179 } 180 return getOrThrow0(); 181 } 182 183 /** 184 * Signals that the asynchronous task represented by this promise has 185 * failed. If the task has already completed (i.e. {@code isDone() == true}) 186 * then calling this method has no effect and the provided result will be 187 * discarded. 188 * 189 * @param exception 190 * The exception indicating why the task failed. 191 * @see #tryHandleException(Exception) 192 */ 193 @Override 194 public final void handleException(final E exception) { 195 tryHandleException(exception); 196 } 197 198 @Override 199 public void handleRuntimeException(RuntimeException exception) { 200 setState(HAS_RUNTIME_EXCEPTION, null, null, exception); 201 } 202 203 /** 204 * Signals that the asynchronous task represented by this promise has 205 * succeeded. If the task has already completed (i.e. 206 * {@code isDone() == true}) then calling this method has no effect and the 207 * provided result will be discarded. 208 * 209 * @param result 210 * The result of the asynchronous task (may be {@code null}). 211 * @see #tryHandleResult(Object) 212 */ 213 @Override 214 public final void handleResult(final V result) { 215 tryHandleResult(result); 216 } 217 218 /** 219 * Attempts to signal that the asynchronous task represented by this promise 220 * has failed. If the task has already completed (i.e. 221 * {@code isDone() == true}) then calling this method has no effect and 222 * {@code false} is returned. 223 * <p> 224 * This method should be used in cases where multiple threads may 225 * concurrently attempt to complete a promise and need to release resources 226 * if the completion attempt fails. For example, an asynchronous TCP connect 227 * attempt may complete after a timeout has expired. In this case the 228 * connection should be immediately closed because it is never going to be 229 * used. 230 * 231 * @param exception 232 * The exception indicating why the task failed. 233 * @return {@code false} if this promise has already been completed, either 234 * due to normal termination, an exception, or cancellation (i.e. 235 * {@code isDone() == true}). 236 * @see #handleException(Exception) 237 * @see #isDone() 238 */ 239 public final boolean tryHandleException(final E exception) { 240 return setState(HAS_EXCEPTION, null, exception, null); 241 } 242 243 /** 244 * Attempts to signal that the asynchronous task represented by this promise 245 * has succeeded. If the task has already completed (i.e. 246 * {@code isDone() == true}) then calling this method has no effect and 247 * {@code false} is returned. 248 * <p> 249 * This method should be used in cases where multiple threads may 250 * concurrently attempt to complete a promise and need to release resources 251 * if the completion attempt fails. For example, an asynchronous TCP connect 252 * attempt may complete after a timeout has expired. In this case the 253 * connection should be immediately closed because it is never going to be 254 * used. 255 * 256 * @param result 257 * The result of the asynchronous task (may be {@code null}). 258 * @return {@code false} if this promise has already been completed, either 259 * due to normal termination, an exception, or cancellation (i.e. 260 * {@code isDone() == true}). 261 * @see #handleResult(Object) 262 * @see #isDone() 263 */ 264 public final boolean tryHandleResult(final V result) { 265 return setState(HAS_RESULT, result, null, null); 266 } 267 268 @Override 269 public final boolean isCancelled() { 270 return state == CANCELLED; 271 } 272 273 @Override 274 public final boolean isDone() { 275 return state != PENDING; 276 } 277 278 @Override 279 public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) { 280 addOrFireListener(new StateListener<V, E>() { 281 @Override 282 public void handleStateChange(final int newState, final V result, final E exception, 283 final RuntimeException runtimeException) { 284 if (newState == HAS_EXCEPTION || newState == CANCELLED) { 285 try { 286 onException.handleException(exception); 287 } catch (RuntimeException e) { 288 LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e); 289 } 290 } 291 } 292 }); 293 return this; 294 } 295 296 @Override 297 public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) { 298 addOrFireListener(new StateListener<V, E>() { 299 @Override 300 public void handleStateChange(final int newState, final V result, final E exception, 301 final RuntimeException runtimeException) { 302 if (newState == HAS_RESULT) { 303 try { 304 onResult.handleResult(result); 305 } catch (RuntimeException e) { 306 LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e); 307 } 308 } 309 } 310 }); 311 return this; 312 } 313 314 @Override 315 public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult, 316 final ExceptionHandler<? super E> onException) { 317 addOrFireListener(new StateListener<V, E>() { 318 @Override 319 public void handleStateChange(final int newState, final V result, final E exception, 320 final RuntimeException runtimeException) { 321 if (newState == HAS_RESULT) { 322 try { 323 onResult.handleResult(result); 324 } catch (RuntimeException e) { 325 LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e); 326 } 327 } else if (newState == HAS_EXCEPTION || newState == CANCELLED) { 328 try { 329 onException.handleException(exception); 330 } catch (RuntimeException e) { 331 LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e); 332 } 333 } 334 } 335 }); 336 return this; 337 } 338 339 @Override 340 public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) { 341 addOrFireListener(new StateListener<V, E>() { 342 @Override 343 public void handleStateChange(final int newState, final V result, final E exception, 344 final RuntimeException runtimeException) { 345 if (newState != HAS_RUNTIME_EXCEPTION) { 346 try { 347 onResultOrException.run(); 348 } catch (RuntimeException e) { 349 LOGGER.error("Ignored unexpected exception thrown by Runnable", e); 350 } 351 } 352 } 353 }); 354 return this; 355 } 356 357 @Override 358 public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) { 359 return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction()); 360 } 361 362 @Override 363 public <EOUT extends Exception> Promise<V, EOUT> thenCatch(final Function<? super E, V, EOUT> onException) { 364 return then(Promises.<V, EOUT>resultIdempotentFunction(), onException); 365 } 366 367 @Override 368 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then( 369 final Function<? super V, VOUT, EOUT> onResult, final Function<? super E, VOUT, EOUT> onException) { 370 final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>(); 371 addOrFireListener(new StateListener<V, E>() { 372 @Override 373 public void handleStateChange(final int newState, final V result, final E exception, 374 final RuntimeException runtimeException) { 375 try { 376 if (newState == HAS_RESULT) { 377 chained.handleResult(onResult.apply(result)); 378 } else if (newState == HAS_EXCEPTION || newState == CANCELLED) { 379 chained.handleResult(onException.apply(exception)); 380 } else { 381 tryHandlingRuntimeException(runtimeException, chained); 382 } 383 } catch (final RuntimeException e) { 384 tryHandlingRuntimeException(e, chained); 385 } catch (final Exception e) { 386 chained.handleException((EOUT) e); 387 } 388 } 389 }); 390 return chained; 391 } 392 393 private <VOUT, EOUT extends Exception> void tryHandlingRuntimeException(final RuntimeException runtimeException, 394 final PromiseImpl<VOUT, EOUT> chained) { 395 try { 396 chained.handleRuntimeException(runtimeException); 397 } catch (Exception ignored) { 398 LOGGER.error("Runtime exception handler threw a RuntimeException which cannot be handled!", ignored); 399 } 400 } 401 402 @Override 403 public final Promise<V, E> thenAlways(final Runnable always) { 404 addOrFireListener(new StateListener<V, E>() { 405 @Override 406 public void handleStateChange(final int newState, final V result, final E exception, 407 final RuntimeException runtimeException) { 408 try { 409 always.run(); 410 } catch (RuntimeException e) { 411 LOGGER.error("Ignored unexpected exception thrown by Runnable", e); 412 } 413 } 414 }); 415 return this; 416 } 417 418 @Override 419 public final Promise<V, E> thenFinally(final Runnable onFinally) { 420 return thenAlways(onFinally); 421 } 422 423 @Override 424 public final <VOUT> Promise<VOUT, E> thenAsync(final AsyncFunction<? super V, VOUT, E> onResult) { 425 return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction()); 426 } 427 428 @Override 429 public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync(AsyncFunction<? super E, V, EOUT> onException) { 430 return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException); 431 } 432 433 @Override 434 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync( 435 final AsyncFunction<? super V, VOUT, EOUT> onResult, 436 final AsyncFunction<? super E, VOUT, EOUT> onException) { 437 final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>(); 438 addOrFireListener(new StateListener<V, E>() { 439 @Override 440 @SuppressWarnings("unchecked") 441 public void handleStateChange(final int newState, final V result, final E exception, 442 final RuntimeException runtimeException) { 443 try { 444 if (newState == HAS_RESULT) { 445 callNestedPromise(onResult.apply(result)); 446 } else if (newState == HAS_EXCEPTION || newState == CANCELLED) { 447 callNestedPromise(onException.apply(exception)); 448 } else { 449 tryHandlingRuntimeException(runtimeException, chained); 450 } 451 } catch (final RuntimeException e) { 452 tryHandlingRuntimeException(e, chained); 453 } catch (final Exception e) { 454 chained.handleException((EOUT) e); 455 } 456 } 457 458 private void callNestedPromise(Promise<? extends VOUT, ? extends EOUT> nestedPromise) { 459 nestedPromise 460 .thenOnResult(chained) 461 .thenOnException(chained) 462 .thenOnRuntimeException(chained); 463 } 464 }); 465 return chained; 466 } 467 468 @Override 469 public final Promise<V, E> thenOnRuntimeException(final RuntimeExceptionHandler onRuntimeException) { 470 addOrFireListener(new StateListener<V, E>() { 471 @Override 472 public void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException) { 473 if (newState == HAS_RUNTIME_EXCEPTION) { 474 try { 475 onRuntimeException.handleRuntimeException(runtimeException); 476 } catch (RuntimeException e) { 477 LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e); 478 } 479 } 480 } 481 }); 482 return this; 483 } 484 485 /** 486 * Invoked when the client attempts to cancel the asynchronous task 487 * represented by this promise. Implementations which support cancellation 488 * should override this method to cancel the asynchronous task and, if 489 * successful, return an appropriate exception which can be used to signal 490 * that the task has failed. 491 * <p> 492 * By default cancellation is not supported and this method returns 493 * {@code null}. 494 * 495 * @param mayInterruptIfRunning 496 * {@code true} if the thread executing this task should be 497 * interrupted; otherwise, in-progress tasks are allowed to 498 * complete. 499 * @return {@code null} if cancellation was not supported or not possible, 500 * otherwise an appropriate exception. 501 */ 502 protected E tryCancel(final boolean mayInterruptIfRunning) { 503 return null; 504 } 505 506 private void addOrFireListener(final StateListener<V, E> listener) { 507 final int stateBefore = state; 508 if (stateBefore != PENDING) { 509 handleCompletion(listener, stateBefore); 510 } else { 511 listeners.add(listener); 512 final int stateAfter = state; 513 if (stateAfter != PENDING && listeners.remove(listener)) { 514 handleCompletion(listener, stateAfter); 515 } 516 } 517 } 518 519 private void handleCompletion(final StateListener<V, E> listener, final int completedState) { 520 try { 521 listener.handleStateChange(completedState, result, exception, runtimeException); 522 } catch (RuntimeException ignored) { 523 LOGGER.error("State change listener threw a RuntimeException which cannot be handled!", ignored); 524 } 525 } 526 527 private V get0() throws ExecutionException { 528 if (runtimeException != null) { 529 throw new ExecutionException(runtimeException); 530 } else if (exception != null) { 531 throw new ExecutionException(exception); 532 } else { 533 return result; 534 } 535 } 536 537 private V getOrThrow0() throws E { 538 if (runtimeException != null) { 539 throw runtimeException; 540 } else if (exception != null) { 541 throw exception; 542 } else { 543 return result; 544 } 545 } 546 547 private boolean setState(final int newState, final V result, final E exception, 548 final RuntimeException runtimeException) { 549 synchronized (this) { 550 if (state != PENDING) { 551 // Already completed. 552 return false; 553 } 554 this.result = result; 555 this.exception = exception; 556 this.runtimeException = runtimeException; 557 state = newState; // Publishes. 558 notifyAll(); // Wake up any blocked threads. 559 } 560 StateListener<V, E> listener; 561 while ((listener = listeners.poll()) != null) { 562 handleCompletion(listener, newState); 563 } 564 return true; 565 } 566 567 private void await() throws InterruptedException { 568 // Use double-check for fast-path. 569 if (state == PENDING) { 570 synchronized (this) { 571 while (state == PENDING) { 572 wait(); 573 } 574 } 575 } 576 } 577 578 private void await(final long timeout, final TimeUnit unit, final boolean isUninterruptibly) 579 throws InterruptedException, TimeoutException { 580 // Use double-check for fast-path. 581 if (state == PENDING) { 582 final long timeoutMS = unit.toMillis(timeout); 583 final long endTimeMS = System.currentTimeMillis() + timeoutMS; 584 boolean wasInterrupted = false; 585 try { 586 synchronized (this) { 587 while (state == PENDING) { 588 final long remainingTimeMS = endTimeMS - System.currentTimeMillis(); 589 if (remainingTimeMS <= 0) { 590 throw new TimeoutException(); 591 } 592 try { 593 wait(remainingTimeMS); 594 } catch (final InterruptedException e) { 595 if (isUninterruptibly) { 596 wasInterrupted = true; 597 } else { 598 throw e; 599 } 600 } 601 } 602 } 603 } finally { 604 if (wasInterrupted) { 605 Thread.currentThread().interrupt(); 606 } 607 } 608 } 609 } 610}