001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2009-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2016 ForgeRock AS. 016 */ 017package org.forgerock.opendj.ldap; 018 019import static com.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT; 020import static com.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT; 021import static com.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_UNEXPECTED; 022import static com.forgerock.opendj.ldap.CoreMessages.LDAP_CONNECTION_CONNECT_TIMEOUT; 023import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER; 024import static java.util.concurrent.TimeUnit.*; 025 026import static org.forgerock.opendj.ldap.LdapException.newLdapException; 027import static org.forgerock.opendj.ldap.requests.Requests.newSearchRequest; 028import static org.forgerock.opendj.ldap.requests.Requests.newStartTLSExtendedRequest; 029import static org.forgerock.opendj.ldap.requests.Requests.unmodifiableSearchRequest; 030import static org.forgerock.opendj.ldap.responses.Responses.newBindResult; 031import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult; 032import static org.forgerock.opendj.ldap.responses.Responses.newResult; 033import static 
org.forgerock.opendj.ldap.spi.LdapPromiseImpl.newLdapPromiseImpl; 034import static org.forgerock.opendj.ldap.spi.LdapPromises.newFailedLdapPromise; 035import static org.forgerock.util.Utils.closeSilently; 036import static org.forgerock.util.promise.Promises.newResultPromise; 037import static org.forgerock.util.time.Duration.*; 038 039import java.util.Collections; 040import java.util.LinkedList; 041import java.util.List; 042import java.util.Queue; 043import java.util.concurrent.ConcurrentLinkedQueue; 044import java.util.concurrent.ScheduledExecutorService; 045import java.util.concurrent.ScheduledFuture; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicInteger; 049import java.util.concurrent.atomic.AtomicReference; 050import java.util.concurrent.locks.AbstractQueuedSynchronizer; 051 052import javax.net.ssl.SSLContext; 053 054import org.forgerock.i18n.LocalizableMessage; 055import org.forgerock.i18n.slf4j.LocalizedLogger; 056import org.forgerock.opendj.ldap.requests.AbandonRequest; 057import org.forgerock.opendj.ldap.requests.AddRequest; 058import org.forgerock.opendj.ldap.requests.BindRequest; 059import org.forgerock.opendj.ldap.requests.CompareRequest; 060import org.forgerock.opendj.ldap.requests.DeleteRequest; 061import org.forgerock.opendj.ldap.requests.ExtendedRequest; 062import org.forgerock.opendj.ldap.requests.ModifyDNRequest; 063import org.forgerock.opendj.ldap.requests.ModifyRequest; 064import org.forgerock.opendj.ldap.requests.SearchRequest; 065import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest; 066import org.forgerock.opendj.ldap.requests.UnbindRequest; 067import org.forgerock.opendj.ldap.responses.BindResult; 068import org.forgerock.opendj.ldap.responses.CompareResult; 069import org.forgerock.opendj.ldap.responses.ExtendedResult; 070import org.forgerock.opendj.ldap.responses.Result; 071import org.forgerock.opendj.ldap.responses.SearchResultEntry; 
072import org.forgerock.opendj.ldap.responses.SearchResultReference; 073import org.forgerock.opendj.ldap.spi.ConnectionState; 074import org.forgerock.opendj.ldap.spi.LDAPConnectionFactoryImpl; 075import org.forgerock.opendj.ldap.spi.LDAPConnectionImpl; 076import org.forgerock.opendj.ldap.spi.LdapPromiseImpl; 077import org.forgerock.opendj.ldap.spi.TransportProvider; 078import org.forgerock.util.AsyncFunction; 079import org.forgerock.util.Function; 080import org.forgerock.util.Option; 081import org.forgerock.util.Options; 082import org.forgerock.util.Reject; 083import org.forgerock.util.promise.ExceptionHandler; 084import org.forgerock.util.promise.Promise; 085import org.forgerock.util.promise.PromiseImpl; 086import org.forgerock.util.promise.ResultHandler; 087import org.forgerock.util.time.Duration; 088import org.forgerock.util.time.TimeService; 089 090import com.forgerock.opendj.util.ReferenceCountedObject; 091 092/** 093 * A factory class which can be used to obtain connections to an LDAP Directory Server. A connection attempt comprises 094 * of the following steps: 095 * <ul> 096 * <li>first of all a TCP connection to the remote LDAP server is obtained. The attempt will fail if a connection is 097 * not obtained within the configured {@link #CONNECT_TIMEOUT connect timeout} 098 * <li>if LDAPS (not StartTLS) is requested then an SSL handshake is performed. LDAPS is enabled by specifying the 099 * {@link #SSL_CONTEXT} option along with {@link #SSL_USE_STARTTLS} set to {@code false} 100 * <li>if StartTLS is requested then a StartTLS request is sent and then an SSL handshake performed once the response 101 * has been received. 
StartTLS is enabled by specifying the {@link #SSL_CONTEXT} option along with 102 * {@link #SSL_USE_STARTTLS} set to {@code true} 103 * <li>an initial authentication request is sent if the {@link #AUTHN_BIND_REQUEST} option is specified 104 * <li>if heart-beat support is enabled via the {@link #HEARTBEAT_ENABLED} option, and none of steps 2-4 were performed, 105 * then an initial heart-beat is sent in order to determine whether the directory service is available. 106 * <li>the connect attempt will fail if it does not complete within the configured connection timeout. If the SSL 107 * handshake, StartTLS request, initial bind request, or initial heart-beat fail for any reason then the connection 108 * attempt will be deemed to have failed and an appropriate error returned. 109 * </ul> 110 * Once a connection has been established heart-beats will be sent periodically on the connection based on the 111 * configured heart-beat interval. If the heart-beat times out then the server is assumed to be down and an appropriate 112 * {@link ConnectionException} generated and published to any registered {@link ConnectionEventListener}s. Note 113 * however, that heart-beats will only be sent when the connection is determined to be reasonably idle: there is no 114 * point in sending heart-beats if the connection has recently received a response. A connection is deemed to be idle 115 * if no response has been received during a period equivalent to half the heart-beat interval. 116 * <p> 117 * The LDAP protocol specifically precludes clients from performing operations while bind or startTLS requests are being 118 * performed. Likewise, a bind or startTLS request will cause active operations to be aborted. This factory coordinates 119 * heart-beats with bind or startTLS requests, ensuring that they are not performed concurrently. 
Specifically, bind and 120 * startTLS requests are queued up while a heart-beat is pending, and heart-beats are not sent at all while there are 121 * pending bind or startTLS requests. 122 */ 123public final class LDAPConnectionFactory extends CommonLDAPOptions implements ConnectionFactory { 124 private static final String CONNECT_TIMEOUT_PROPERTY = "org.forgerock.opendj.io.connectTimeout"; 125 private static final String REQUEST_TIMEOUT_PROPERTY = "org.forgerock.opendj.io.requestTimeout"; 126 private static final String TIMEOUT_PROPERTY = "org.forgerock.opendj.io.timeout"; 127 128 /** 129 * Configures the connection factory to return pre-authenticated connections using the specified {@link 130 * BindRequest}. The connections returned by the connection factory will support all operations with the exception 131 * of Bind requests. Attempts to perform a Bind will result in an {@code UnsupportedOperationException}. 132 * <p> 133 * If the Bind request fails for some reason (e.g. invalid credentials), then the connection attempt will fail and 134 * an {@link LdapException} will be thrown. 135 */ 136 public static final Option<BindRequest> AUTHN_BIND_REQUEST = Option.of(BindRequest.class, null); 137 138 /** 139 * Specifies the connect timeout spcified. If a connection is not established within the timeout period (incl. SSL 140 * negotiation, initial bind request, and/or heart-beat), then a {@link TimeoutResultException} error result will be 141 * returned. 142 * <p> 143 * The default operation timeout is 10 seconds and may be configured using the {@code 144 * org.forgerock.opendj.io.connectTimeout} property. A timeout setting of 0 causes the OS connect timeout to be 145 * used. 
146 */ 147 public static final Option<Duration> CONNECT_TIMEOUT = 148 Option.withDefault(duration(getIntProperty(CONNECT_TIMEOUT_PROPERTY, 10000), MILLISECONDS)); 149 150 /** 151 * Configures the connection factory to periodically send "heart-beat" or "keep-alive" requests to the Directory 152 * Server. This feature allows client applications to proactively detect network problems or unresponsive 153 * servers. In addition, frequent heartbeat requests may also prevent load-balancers or Directory Servers from 154 * closing otherwise idle connections. 155 * <p> 156 * Before returning new connections to the application the factory will first send an initial heart-beat request in 157 * order to determine that the remote server is responsive. If the heart-beat request fails or is too slow to 158 * respond then the connection is closed immediately and an error returned to the client. 159 * <p> 160 * Once a connection has been established successfully (including the initial heart-beat request), the connection 161 * factory will periodically send heart-beat requests on the connection based on the configured heart-beat interval. 162 * If the Directory Server is too slow to respond to the heart-beat then the server is assumed to be down and an 163 * appropriate {@link ConnectionException} generated and published to any registered 164 * {@link ConnectionEventListener}s. Note however, that heart-beat requests will only be sent when the connection 165 * is determined to be reasonably idle: there is no point in sending heart-beats if the connection has recently 166 * received a response. A connection is deemed to be idle if no response has been received during a period 167 * equivalent to half the heart-beat interval. 168 * <p> 169 * The LDAP protocol specifically precludes clients from performing operations while bind or startTLS requests are 170 * being performed. Likewise, a bind or startTLS request will cause active operations to be aborted. 
The LDAP 171 * connection factory coordinates heart-beats with bind or startTLS requests, ensuring that they are not performed 172 * concurrently. Specifically, bind and startTLS requests are queued up while a heart-beat is pending, and 173 * heart-beats are not sent at all while there are pending bind or startTLS requests. 174 */ 175 public static final Option<Boolean> HEARTBEAT_ENABLED = Option.withDefault(false); 176 177 /** 178 * Specifies the time between successive heart-beat requests (default interval is 10 seconds). Heart-beats will only 179 * be sent if {@link #HEARTBEAT_ENABLED} is set to {@code true}. 180 * 181 * @see #HEARTBEAT_ENABLED 182 */ 183 public static final Option<Duration> HEARTBEAT_INTERVAL = Option.withDefault(duration(10, SECONDS)); 184 185 /** 186 * Specifies the scheduler which will be used for periodically sending heart-beat requests. A system-wide scheduler 187 * will be used by default. Heart-beats will only be sent if {@link #HEARTBEAT_ENABLED} is set to {@code true}. 188 * 189 * @see #HEARTBEAT_ENABLED 190 */ 191 public static final Option<ScheduledExecutorService> HEARTBEAT_SCHEDULER = 192 Option.of(ScheduledExecutorService.class, null); 193 194 /** 195 * Specifies the timeout for heart-beat requests, after which the remote Directory Server will be deemed to be 196 * unavailable (default timeout is 3 seconds). Heart-beats will only be sent if {@link #HEARTBEAT_ENABLED} is set to 197 * {@code true}. If a {@link #REQUEST_TIMEOUT request timeout} is also set then the lower of the two will be used 198 * for sending heart-beats. 199 * 200 * @see #HEARTBEAT_ENABLED 201 */ 202 public static final Option<Duration> HEARTBEAT_TIMEOUT = Option.withDefault(duration(3, SECONDS)); 203 204 /** 205 * Specifies the operation timeout. If a response is not received from the Directory Server within the timeout 206 * period, then the operation will be abandoned and a {@link TimeoutResultException} error result returned. 
A 207 * timeout setting of 0 disables operation timeout limits. 208 * <p> 209 * The default operation timeout is 0 (no timeout) and may be configured using the {@code 210 * org.forgerock.opendj.io.requestTimeout} property or the deprecated {@code org.forgerock.opendj.io.timeout} 211 * property. 212 */ 213 public static final Option<Duration> REQUEST_TIMEOUT = 214 Option.withDefault(duration(getIntProperty(REQUEST_TIMEOUT_PROPERTY, 215 getIntProperty(TIMEOUT_PROPERTY, 0)), 216 MILLISECONDS)); 217 218 /** 219 * Specifies the SSL context which will be used when initiating connections with the Directory Server. 220 * <p> 221 * By default no SSL context will be used, indicating that connections will not be secured. If an SSL context is set 222 * then connections will be secured using either SSL or StartTLS depending on {@link #SSL_USE_STARTTLS}. 223 */ 224 public static final Option<SSLContext> SSL_CONTEXT = Option.of(SSLContext.class, null); 225 226 /** 227 * Specifies the cipher suites enabled for secure connections with the Directory Server. 228 * <p> 229 * The suites must be supported by the SSLContext specified by option {@link #SSL_CONTEXT}. Only the suites listed 230 * in the parameter are enabled for use. 231 */ 232 @SuppressWarnings({ "unchecked", "rawtypes" }) 233 public static final Option<List<String>> SSL_ENABLED_CIPHER_SUITES = 234 (Option) Option.of(List.class, Collections.<String>emptyList()); 235 236 /** 237 * Specifies the protocol versions enabled for secure connections with the Directory Server. 238 * <p> 239 * The protocols must be supported by the SSLContext specified by option {@link #SSL_CONTEXT}. Only the protocols 240 * listed in the parameter are enabled for use. 
241 */ 242 @SuppressWarnings({ "unchecked", "rawtypes" }) 243 public static final Option<List<String>> SSL_ENABLED_PROTOCOLS = 244 (Option) Option.of(List.class, Collections.<String>emptyList()); 245 246 /** 247 * Specifies whether SSL or StartTLS should be used for securing connections when an SSL context is specified. 248 * <p> 249 * By default SSL will be used in preference to StartTLS. 250 */ 251 public static final Option<Boolean> SSL_USE_STARTTLS = Option.withDefault(false); 252 253 /** Default heart-beat which will target the root DSE but not return any results. */ 254 private static final SearchRequest DEFAULT_HEARTBEAT = 255 unmodifiableSearchRequest(newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1")); 256 257 /** 258 * Specifies the parameters of the search request that will be used for heart-beats. The default heart-beat search 259 * request is a base object search against the root DSE requesting no attributes. Heart-beats will only be sent if 260 * {@link #HEARTBEAT_ENABLED} is set to {@code true}. 261 * 262 * @see #HEARTBEAT_ENABLED 263 */ 264 public static final Option<SearchRequest> HEARTBEAT_SEARCH_REQUEST = 265 Option.of(SearchRequest.class, DEFAULT_HEARTBEAT); 266 267 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 268 269 /** The overall timeout to use when establishing connections, including SSL, bind, and heart-beat. */ 270 private final long connectTimeoutMS; 271 272 /** 273 * The minimum amount of time the connection should remain idle (no responses) before starting to send heartbeats. 274 */ 275 private final long heartBeatDelayMS; 276 277 /** Indicates whether heartbeats should be performed. */ 278 private final Boolean heartBeatEnabled; 279 280 /** The heartbeat search request. */ 281 private final SearchRequest heartBeatRequest; 282 283 /** 284 * The heartbeat timeout in milli-seconds. 
The connection will be marked as failed if no heartbeat response is 285 * received within the timeout. 286 */ 287 private final long heartBeatTimeoutMS; 288 289 /** The interval between successive heartbeats. */ 290 private final long heartBeatintervalMS; 291 292 /** The factory responsible for handling the low-level network communication with the Directory Server. */ 293 private final LDAPConnectionFactoryImpl impl; 294 295 /** The optional bind request which will be used as the initial heartbeat if specified. */ 296 private final BindRequest initialBindRequest; 297 298 /** Flag which indicates whether this factory has been closed. */ 299 private final AtomicBoolean isClosed = new AtomicBoolean(); 300 301 /** A copy of the original options. This is only useful for debugging. */ 302 private final Options options; 303 304 /** Transport provider that provides the implementation of this factory. */ 305 private final TransportProvider provider; 306 307 /** 308 * Prevents the scheduler being released when there are remaining references (this factory or any connections). It 309 * is initially set to 1 because this factory has a reference. 310 */ 311 private final AtomicInteger referenceCount = new AtomicInteger(1); 312 313 /** The heartbeat scheduler. */ 314 private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; 315 316 /** Non-null if SSL or StartTLS should be used when creating new connections. */ 317 private final SSLContext sslContext; 318 319 /** The list of permitted SSL ciphers for SSL negotiation. */ 320 private final List<String> sslEnabledCipherSuites; 321 322 /** The list of permitted SSL protocols for SSL negotiation. */ 323 private final List<String> sslEnabledProtocols; 324 325 /** Indicates whether a StartTLS request should be sent immediately after connecting. */ 326 private final boolean sslUseStartTLS; 327 328 /** List of valid connections to which heartbeats will be sent. 
*/ 329 private final List<ConnectionImpl> validConnections = new LinkedList<>(); 330 331 /** This is package private in order to allow unit tests to inject fake time stamps. */ 332 TimeService timeService = TimeService.SYSTEM; 333 334 /** Scheduled task which sends heart beats for all valid idle connections. */ 335 private final Runnable sendHeartBeatRunnable = new Runnable() { 336 @Override 337 public void run() { 338 boolean heartBeatSent = false; 339 for (final ConnectionImpl connection : getValidConnections()) { 340 heartBeatSent |= connection.sendHeartBeat(); 341 } 342 if (heartBeatSent) { 343 scheduler.get().schedule(checkHeartBeatRunnable, heartBeatTimeoutMS, MILLISECONDS); 344 } 345 } 346 }; 347 348 /** Scheduled task which checks that all heart beats have been received within the timeout period. */ 349 private final Runnable checkHeartBeatRunnable = new Runnable() { 350 @Override 351 public void run() { 352 for (final ConnectionImpl connection : getValidConnections()) { 353 connection.checkForHeartBeat(); 354 } 355 } 356 }; 357 358 /** The heartbeat scheduled future - which may be null if heartbeats are not being sent (no valid connections). */ 359 private ScheduledFuture<?> heartBeatFuture; 360 361 /** 362 * Creates a new LDAP connection factory which can be used to create LDAP connections to the Directory Server at the 363 * provided host and port number. 364 * 365 * @param host 366 * The host name. 367 * @param port 368 * The port number. 369 * @throws NullPointerException 370 * If {@code host} was {@code null}. 371 * @throws ProviderNotFoundException 372 * if no provider is available or if the provider requested using options is not found. 373 */ 374 public LDAPConnectionFactory(final String host, final int port) { 375 this(host, port, Options.defaultOptions()); 376 } 377 378 /** 379 * Creates a new LDAP connection factory which can be used to create LDAP connections to the Directory Server at the 380 * provided host and port number. 
381 * 382 * @param host 383 * The host name. 384 * @param port 385 * The port number. 386 * @param options 387 * The LDAP options to use when creating connections. 388 * @throws NullPointerException 389 * If {@code host} or {@code options} was {@code null}. 390 * @throws ProviderNotFoundException 391 * if no provider is available or if the provider requested using options is not found. 392 */ 393 public LDAPConnectionFactory(final String host, final int port, final Options options) { 394 Reject.ifNull(host, options); 395 396 this.connectTimeoutMS = options.get(CONNECT_TIMEOUT).to(TimeUnit.MILLISECONDS); 397 Reject.ifTrue(connectTimeoutMS < 0, "connect timeout must be >= 0"); 398 Reject.ifTrue(options.get(REQUEST_TIMEOUT).getValue() < 0, "request timeout must be >= 0"); 399 400 this.heartBeatEnabled = options.get(HEARTBEAT_ENABLED); 401 this.heartBeatintervalMS = options.get(HEARTBEAT_INTERVAL).to(TimeUnit.MILLISECONDS); 402 this.heartBeatTimeoutMS = options.get(HEARTBEAT_TIMEOUT).to(TimeUnit.MILLISECONDS); 403 this.heartBeatDelayMS = heartBeatintervalMS / 2; 404 this.heartBeatRequest = options.get(HEARTBEAT_SEARCH_REQUEST); 405 if (heartBeatEnabled) { 406 Reject.ifTrue(heartBeatintervalMS <= 0, "heart-beat interval must be positive"); 407 Reject.ifTrue(heartBeatTimeoutMS <= 0, "heart-beat timeout must be positive"); 408 } 409 410 this.provider = getTransportProvider(options); 411 this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(options.get(HEARTBEAT_SCHEDULER)); 412 this.impl = provider.getLDAPConnectionFactory(host, port, options); 413 this.initialBindRequest = options.get(AUTHN_BIND_REQUEST); 414 this.sslContext = options.get(SSL_CONTEXT); 415 this.sslUseStartTLS = options.get(SSL_USE_STARTTLS); 416 this.sslEnabledProtocols = options.get(SSL_ENABLED_PROTOCOLS); 417 this.sslEnabledCipherSuites = options.get(SSL_ENABLED_CIPHER_SUITES); 418 419 this.options = Options.copyOf(options); 420 } 421 422 @Override 423 public void close() { 424 if 
(isClosed.compareAndSet(false, true)) { 425 synchronized (validConnections) { 426 if (!validConnections.isEmpty()) { 427 logger.debug(LocalizableMessage.raw( 428 "HeartbeatConnectionFactory '%s' is closing while %d active connections remain", 429 this, 430 validConnections.size())); 431 } 432 } 433 releaseScheduler(); 434 impl.close(); 435 } 436 } 437 438 @Override 439 public Connection getConnection() throws LdapException { 440 return getConnectionAsync().getOrThrowUninterruptibly(); 441 } 442 443 @Override 444 public Promise<Connection, LdapException> getConnectionAsync() { 445 acquireScheduler(); // Protect scheduler. 446 447 // Register the connect timeout timer. 448 final PromiseImpl<Connection, LdapException> promise = PromiseImpl.create(); 449 final AtomicReference<LDAPConnectionImpl> connectionHolder = new AtomicReference<>(); 450 final ScheduledFuture<?> timeoutFuture; 451 if (connectTimeoutMS > 0) { 452 timeoutFuture = scheduler.get().schedule(new Runnable() { 453 @Override 454 public void run() { 455 if (promise.tryHandleException(newConnectTimeoutError())) { 456 closeSilently(connectionHolder.get()); 457 releaseScheduler(); 458 } 459 } 460 }, connectTimeoutMS, MILLISECONDS); 461 } else { 462 timeoutFuture = null; 463 } 464 465 // Now connect, negotiate SSL, etc. 466 impl.getConnectionAsync() 467 // Save the connection. 
468 .then(new Function<LDAPConnectionImpl, LDAPConnectionImpl, LdapException>() { 469 @Override 470 public LDAPConnectionImpl apply(final LDAPConnectionImpl connection) throws LdapException { 471 connectionHolder.set(connection); 472 return connection; 473 } 474 }) 475 .thenAsync(performStartTLSIfNeeded()) 476 .thenAsync(performSSLHandShakeIfNeeded(connectionHolder)) 477 .thenAsync(performInitialBindIfNeeded(connectionHolder)) 478 .thenAsync(performInitialHeartBeatIfNeeded(connectionHolder)) 479 .thenOnResult(new ResultHandler<Result>() { 480 @Override 481 public void handleResult(Result result) { 482 if (timeoutFuture != null) { 483 timeoutFuture.cancel(false); 484 } 485 final LDAPConnectionImpl connection = connectionHolder.get(); 486 final ConnectionImpl connectionImpl = new ConnectionImpl(connection); 487 if (!promise.tryHandleResult(registerConnection(connectionImpl))) { 488 connectionImpl.close(); 489 } 490 } 491 }) 492 .thenOnException(new ExceptionHandler<LdapException>() { 493 @Override 494 public void handleException(final LdapException e) { 495 if (timeoutFuture != null) { 496 timeoutFuture.cancel(false); 497 } 498 final LdapException connectException; 499 if (e instanceof ConnectionException || e instanceof AuthenticationException) { 500 connectException = e; 501 } else if (e instanceof TimeoutResultException) { 502 connectException = newHeartBeatTimeoutError(); 503 } else { 504 connectException = newLdapException(ResultCode.CLIENT_SIDE_SERVER_DOWN, 505 ERR_CONNECTION_UNEXPECTED.get(e), 506 e); 507 } 508 if (promise.tryHandleException(connectException)) { 509 closeSilently(connectionHolder.get()); 510 releaseScheduler(); 511 } 512 } 513 }); 514 515 return promise; 516 } 517 518 /** 519 * Returns the host name of the Directory Server. The returned host name is the same host name that was provided 520 * during construction and may be an IP address. More specifically, this method will not perform a reverse DNS 521 * lookup. 
522 * 523 * @return The host name of the Directory Server. 524 */ 525 public String getHostName() { 526 return impl.getHostName(); 527 } 528 529 /** 530 * Returns the port of the Directory Server. 531 * 532 * @return The port of the Directory Server. 533 */ 534 public int getPort() { 535 return impl.getPort(); 536 } 537 538 /** 539 * Returns the name of the transport provider, which provides the implementation of this factory. 540 * 541 * @return The name of actual transport provider. 542 */ 543 public String getProviderName() { 544 return provider.getName(); 545 } 546 547 @Override 548 public String toString() { 549 return "LDAPConnectionFactory(provider=`" + getProviderName() + ", host='" + getHostName() + "', port=" 550 + getPort() + ", options=" + options + ")"; 551 } 552 553 private void acquireScheduler() { 554 /* 555 * If the factory is not closed then we need to prevent the scheduler from being released while the 556 * connection attempt is in progress. 557 */ 558 referenceCount.incrementAndGet(); 559 if (isClosed.get()) { 560 releaseScheduler(); 561 throw new IllegalStateException("Attempted to get a connection on closed factory"); 562 } 563 } 564 565 private ConnectionImpl[] getValidConnections() { 566 synchronized (validConnections) { 567 return validConnections.toArray(new ConnectionImpl[validConnections.size()]); 568 } 569 } 570 571 private LdapException newConnectTimeoutError() { 572 final LocalizableMessage msg = LDAP_CONNECTION_CONNECT_TIMEOUT.get(impl.getSocketAddress(), connectTimeoutMS); 573 return newLdapException(ResultCode.CLIENT_SIDE_CONNECT_ERROR, msg.toString()); 574 } 575 576 private LdapException newHeartBeatTimeoutError() { 577 return newLdapException(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_TIMEOUT.get(heartBeatTimeoutMS)); 578 } 579 580 private AsyncFunction<Void, BindResult, LdapException> performInitialBindIfNeeded( 581 final AtomicReference<LDAPConnectionImpl> connectionHolder) { 582 return new AsyncFunction<Void, 
BindResult, LdapException>() { 583 @Override 584 public Promise<BindResult, LdapException> apply(final Void ignored) throws LdapException { 585 if (initialBindRequest != null) { 586 return connectionHolder.get().bindAsync(initialBindRequest, null); 587 } else { 588 return newResultPromise(newBindResult(ResultCode.SUCCESS)); 589 } 590 } 591 }; 592 } 593 594 private AsyncFunction<BindResult, Result, LdapException> performInitialHeartBeatIfNeeded( 595 final AtomicReference<LDAPConnectionImpl> connectionHolder) { 596 return new AsyncFunction<BindResult, Result, LdapException>() { 597 @Override 598 public Promise<Result, LdapException> apply(final BindResult ignored) throws LdapException { 599 // Only send an initial heartbeat if we haven't already interacted with the server. 600 if (heartBeatEnabled && sslContext == null && initialBindRequest == null) { 601 return connectionHolder.get().searchAsync(heartBeatRequest, null, null); 602 } else { 603 return newResultPromise(newResult(ResultCode.SUCCESS)); 604 } 605 } 606 }; 607 } 608 609 private AsyncFunction<ExtendedResult, Void, LdapException> performSSLHandShakeIfNeeded( 610 final AtomicReference<LDAPConnectionImpl> connectionHolder) { 611 return new AsyncFunction<ExtendedResult, Void, LdapException>() { 612 @Override 613 public Promise<Void, LdapException> apply(final ExtendedResult extendedResult) throws LdapException { 614 if (sslContext != null && !sslUseStartTLS) { 615 return connectionHolder.get().enableTLS(sslContext, sslEnabledProtocols, sslEnabledCipherSuites); 616 } else { 617 return newResultPromise(null); 618 } 619 } 620 }; 621 } 622 623 private AsyncFunction<LDAPConnectionImpl, ExtendedResult, LdapException> performStartTLSIfNeeded() { 624 return new AsyncFunction<LDAPConnectionImpl, ExtendedResult, LdapException>() { 625 @Override 626 public Promise<ExtendedResult, LdapException> apply(final LDAPConnectionImpl connection) 627 throws LdapException { 628 if (sslContext != null && sslUseStartTLS) { 629 final 
StartTLSExtendedRequest startTLS = newStartTLSExtendedRequest(sslContext)
                            .addEnabledCipherSuite(sslEnabledCipherSuites)
                            .addEnabledProtocol(sslEnabledProtocols);
                    return connection.extendedRequestAsync(startTLS, null);
                } else {
                    return newResultPromise((ExtendedResult) newGenericExtendedResult(ResultCode.SUCCESS));
                }
            }
        };
    }

    /**
     * Adds the provided connection to the set of valid connections, scheduling the periodic heart beat task when
     * heart beats are enabled and this is the first registered connection.
     *
     * @param heartBeatConnection
     *            The connection to be registered.
     * @return The same connection instance which was passed in.
     */
    private Connection registerConnection(final ConnectionImpl heartBeatConnection) {
        synchronized (validConnections) {
            if (heartBeatEnabled && validConnections.isEmpty()) {
                // This is the first active connection, so start the heart beat.
                heartBeatFuture = scheduler.get()
                        .scheduleWithFixedDelay(sendHeartBeatRunnable,
                                                0,
                                                heartBeatintervalMS,
                                                TimeUnit.MILLISECONDS);
            }
            validConnections.add(heartBeatConnection);
        }
        return heartBeatConnection;
    }

    /** Decrements the scheduler reference count and releases the scheduler once the count reaches zero. */
    private void releaseScheduler() {
        if (referenceCount.decrementAndGet() == 0) {
            scheduler.release();
        }
    }

    /**
     * This synchronizer prevents Bind or StartTLS operations from being processed concurrently with heart-beats. This
     * is required because the LDAP protocol specifically states that servers receiving a Bind operation should either
     * wait for existing operations to complete or abandon them. The same presumably applies to StartTLS operations.
     * Note that concurrent bind/StartTLS operations are not permitted.
     * <p>
     * This connection factory only coordinates Bind and StartTLS requests with heart-beats. It does not attempt to
     * prevent or control attempts to send multiple concurrent Bind or StartTLS operations, etc.
     * <p>
     * This synchronizer can be thought of as a cross between a read-write lock and a semaphore. Unlike a read-write
     * lock there is no requirement that a thread releasing a lock must hold it. In addition, this synchronizer does
     * not support reentrancy. A thread attempting to acquire exclusively more than once will deadlock, and a thread
     * attempting to acquire shared more than once will succeed and be required to release an equivalent number of
     * times.
     * <p>
     * The synchronizer has three states:
     * <ul>
     * <li> UNLOCKED(0) - the synchronizer may be acquired shared or exclusively
     * <li> LOCKED_EXCLUSIVELY(-1) - the synchronizer is held exclusively and cannot be acquired shared or
     * exclusively. An exclusive lock is held while a heart beat is in progress
     * <li> LOCKED_SHARED(>0) - the synchronizer is held shared and cannot be acquired exclusively. N shared locks
     * are held while N Bind or StartTLS operations are in progress.
     * </ul>
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        /** Lock states. Positive values indicate that the shared lock is taken. */
        private static final int LOCKED_EXCLUSIVELY = -1;
        private static final int UNLOCKED = 0; // initial state

        /** Keep compiler quiet. */
        private static final long serialVersionUID = -3590428415442668336L;

        /** Returns {@code true} if the state is non-zero, i.e. the lock is held either shared or exclusively. */
        boolean isHeld() {
            return getState() != 0;
        }

        /** Acquires one shared lock, waiting while the synchronizer is held exclusively. */
        void lockShared() {
            acquireShared(1);
        }

        /** Attempts to take the exclusive lock without waiting; succeeds only from the unlocked state. */
        boolean tryLockExclusively() {
            return tryAcquire(0 /* unused */);
        }

        /** Attempts to take one shared lock without waiting; fails if the exclusive lock is held. */
        boolean tryLockShared() {
            return tryAcquireShared(1) > 0;
        }

        /** Releases the exclusive lock; {@link #tryRelease(int)} throws if it is not currently held. */
        void unlockExclusively() {
            release(0 /* unused */);
        }

        /** Releases one shared lock; {@link #tryReleaseShared(int)} throws if no shared lock is held. */
        void unlockShared() {
            releaseShared(0 /* unused */);
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == LOCKED_EXCLUSIVELY;
        }

        /** Single CAS attempt from UNLOCKED to LOCKED_EXCLUSIVELY; the argument is not used. */
        @Override
        protected boolean tryAcquire(final int ignored) {
            if (compareAndSetState(UNLOCKED, LOCKED_EXCLUSIVELY)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * Adds {@code readers} to the shared count using a CAS retry loop. Returns a negative value (failure) when
         * the exclusive lock is held, otherwise the new, positive, shared count.
         */
        @Override
        protected int tryAcquireShared(final int readers) {
            for (;;) {
                final int state = getState();
                if (state == LOCKED_EXCLUSIVELY) {
                    return LOCKED_EXCLUSIVELY; // failed
                }
                final int newState = state + readers;
                if (compareAndSetState(state, newState)) {
                    return newState; // succeeded + more readers allowed
                }
            }
        }

        /**
         * Resets the state to UNLOCKED. The argument is not used.
         *
         * @throws IllegalMonitorStateException
         *             If the synchronizer is not currently locked exclusively.
         */
        @Override
        protected boolean tryRelease(final int ignored) {
            if (getState() != LOCKED_EXCLUSIVELY) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(UNLOCKED);
            return true;
        }

        /**
         * Decrements the shared count by exactly one (the argument is not used) using a CAS retry loop.
         *
         * @return {@code true} only when the last shared lock has been released.
         * @throws IllegalMonitorStateException
         *             If the synchronizer is unlocked or locked exclusively.
         */
        @Override
        protected boolean tryReleaseShared(final int ignored) {
            for (;;) {
                final int state = getState();
                if (state == UNLOCKED || state == LOCKED_EXCLUSIVELY) {
                    throw new IllegalMonitorStateException();
                }
                final int newState = state - 1;
                if (compareAndSetState(state, newState)) {
                    /*
                     * We could always return true here, but since there cannot be waiting readers we can specialize
                     * for waiting writers.
                     */
                    return newState == UNLOCKED;
                }
            }
        }

    }

    /** A connection that sends heart beats and supports all operations. */
    private final class ConnectionImpl extends AbstractAsynchronousConnection implements ConnectionEventListener {
        /** The wrapped connection. */
        private final LDAPConnectionImpl connectionImpl;

        /** List of pending Bind or StartTLS requests which must be invoked once the current heart beat completes. */
        private final Queue<Runnable> pendingBindOrStartTLSRequests = new ConcurrentLinkedQueue<>();

        /**
         * List of pending responses for all active operations. These will be signaled if no heart beat is detected
         * within the permitted timeout period.
         */
        private final Queue<LdapResultHandler<?>> pendingResults = new ConcurrentLinkedQueue<>();

        /** Internal connection state. */
        private final ConnectionState state = new ConnectionState();

        /** Coordinates heart-beats with Bind and StartTLS requests. */
        private final Sync sync = new Sync();

        /** Timestamp of last response received (any response, not just heart beats). */
        private volatile long lastResponseTimestamp = timeService.now();

        /**
         * Wraps the provided connection and registers this object as one of its connection event listeners.
         *
         * @param connectionImpl
         *            The connection to be wrapped.
         */
        private ConnectionImpl(final LDAPConnectionImpl connectionImpl) {
            this.connectionImpl = connectionImpl;
            connectionImpl.addConnectionEventListener(this);
        }

        /** Delegates directly to the wrapped connection; abandon requests are not tracked or time stamped. */
        @Override
        public LdapPromise<Void> abandonAsync(final AbandonRequest request) {
            return connectionImpl.abandonAsync(request);
        }

        /** Fails fast if a connection error has been recorded, otherwise forwards the request and tracks it. */
        @Override
        public LdapPromise<Result> addAsync(
                final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            if (hasConnectionErrorOccurred()) {
                return newConnectionErrorPromise();
            }
            return timestampPromise(connectionImpl.addAsync(request, intermediateResponseHandler));
        }

        @Override
        public void addConnectionEventListener(final ConnectionEventListener listener) {
            state.addConnectionEventListener(listener);
        }

        /**
         * Sends a bind request, coordinating it with heart beats. If the shared lock can be taken immediately the
         * request is sent straight away, otherwise it is queued and dispatched by
         * {@link #flushPendingBindOrStartTLSRequests()}. In both cases the shared lock is released once the
         * returned promise completes.
         */
        @Override
        public LdapPromise<BindResult> bindAsync(
                final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            if (hasConnectionErrorOccurred()) {
                return newConnectionErrorPromise();
            }
            if (sync.tryLockShared()) {
                // Fast path
                return timestampBindOrStartTLSPromise(connectionImpl.bindAsync(request, intermediateResponseHandler));
            }
            return enqueueBindOrStartTLSPromise(new AsyncFunction<Void, BindResult, LdapException>() {
                @Override
                public Promise<BindResult, LdapException> apply(Void value) throws LdapException {
                    return timestampBindOrStartTLSPromise(connectionImpl.bindAsync(request,
                            intermediateResponseHandler));
                }
            });
        }

        /** Performs local close handling first, then closes the wrapped connection. */
        @Override
        public void close() {
            handleConnectionClosed();
            connectionImpl.close();
        }

        /** Performs local close handling first, then closes the wrapped connection with the provided request. */
        @Override
        public void close(final UnbindRequest request, final String reason) {
            handleConnectionClosed();
            connectionImpl.close(request, reason);
        }

        /** Fails fast if a connection error has been recorded, otherwise forwards the request and tracks it. */
        @Override
        public LdapPromise<CompareResult> compareAsync(
                final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            if (hasConnectionErrorOccurred()) {
                return newConnectionErrorPromise();
            }
            return timestampPromise(connectionImpl.compareAsync(request, intermediateResponseHandler));
        }

        /** Fails fast if a connection error has been recorded, otherwise forwards the request and tracks it. */
        @Override
        public LdapPromise<Result> deleteAsync(
                final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            if (hasConnectionErrorOccurred()) {
                return newConnectionErrorPromise();
            }
            return timestampPromise(connectionImpl.deleteAsync(request, intermediateResponseHandler));
        }

        /**
         * Sends an extended request. Only StartTLS requests are coordinated with heart beats (same fast path /
         * queued path as {@link #bindAsync}); every other extended request is simply forwarded and tracked.
         */
        @Override
        public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(
                final ExtendedRequest<R> request, final IntermediateResponseHandler intermediateResponseHandler) {
            if (hasConnectionErrorOccurred()) {
                return newConnectionErrorPromise();
            }
            if (!isStartTLSRequest(request)) {
                return timestampPromise(connectionImpl.extendedRequestAsync(request, intermediateResponseHandler));
            }
            if (sync.tryLockShared()) {
                // Fast path
                return timestampBindOrStartTLSPromise(
                        connectionImpl.extendedRequestAsync(request, intermediateResponseHandler));
            }
            return enqueueBindOrStartTLSPromise(new AsyncFunction<Void, R, LdapException>() {
                @Override
                public Promise<R, LdapException> apply(Void value) throws LdapException {
                    return timestampBindOrStartTLSPromise(
                            connectionImpl.extendedRequestAsync(request, intermediateResponseHandler));
                }
            });
        }

        /**
         * Handles closure of this connection. The body only runs when the state object reports that this call
         * performed the transition to closed: pending results are failed, this connection is unregistered, the
         * heart beat task is cancelled if no valid connections remain, and the scheduler reference is released.
         */
        @Override
        public void handleConnectionClosed() {
            if (state.notifyConnectionClosed()) {
                failPendingResults(newLdapException(ResultCode.CLIENT_SIDE_USER_CANCELLED,
                        HBCF_CONNECTION_CLOSED_BY_CLIENT.get()));
                synchronized (validConnections) {
                    connectionImpl.removeConnectionEventListener(this);
                    validConnections.remove(this);
                    if (heartBeatEnabled && validConnections.isEmpty()) {
                        // This is the last active connection, so stop the heartbeat.
                        heartBeatFuture.cancel(false);
                    }
                }
                releaseScheduler();
            }
        }

        /** Records the error in the connection state and, if the state accepted it, fails all pending results. */
        @Override
        public void handleConnectionError(final boolean isDisconnectNotification, final LdapException error) {
            if (state.notifyConnectionError(isDisconnectNotification, error)) {
                failPendingResults(error);
            }
        }

        /** Treats the notification as connection activity before forwarding it to registered listeners. */
        @Override
        public void handleUnsolicitedNotification(final ExtendedResult notification) {
            timestamp(notification);
            state.notifyUnsolicitedNotification(notification);
        }

        @Override
        public boolean isClosed() {
            return state.isClosed();
        }

        /** Valid only if both the local state and the wrapped connection report that they are valid. */
        @Override
        public boolean isValid() {
            return state.isValid() && connectionImpl.isValid();
        }

        /** Fails fast if a connection error has been recorded, otherwise forwards the request and tracks it. */
        @Override
        public LdapPromise<Result> modifyAsync(
                final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            if (hasConnectionErrorOccurred()) {
                return newConnectionErrorPromise();
            }
            return timestampPromise(connectionImpl.modifyAsync(request, intermediateResponseHandler));
        }

        /** Fails fast if a connection error has been recorded, otherwise forwards the request and tracks it. */
        @Override
        public LdapPromise<Result> modifyDNAsync(
                final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            if (hasConnectionErrorOccurred()) {
                return newConnectionErrorPromise();
            }
            return timestampPromise(connectionImpl.modifyDNAsync(request, intermediateResponseHandler));
        }

        @Override
        public void removeConnectionEventListener(final ConnectionEventListener listener) {
            state.removeConnectionEventListener(listener);
        }

        /**
         * Sends a search request, time stamping every entry and reference received. Entries and references are only
         * passed on to the caller's handler (which may be {@code null}) until the search promise has completed.
         */
        @Override
        public LdapPromise<Result> searchAsync(
                final SearchRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final SearchResultHandler searchHandler) {
            if (hasConnectionErrorOccurred()) {
                return newConnectionErrorPromise();
            }

            // Set once the wrapped search promise completes; later entries/references are then dropped.
            final AtomicBoolean searchDone = new AtomicBoolean();
            /*
             * NOTE(review): the boolean returned by the caller's handler is discarded and this wrapper always
             * returns true, so a caller returning false does not stop delivery here - confirm this is intended.
             */
            final SearchResultHandler entryHandler = new SearchResultHandler() {
                @Override
                public synchronized boolean handleEntry(SearchResultEntry entry) {
                    if (!searchDone.get()) {
                        timestamp(entry);
                        if (searchHandler != null) {
                            searchHandler.handleEntry(entry);
                        }
                    }
                    return true;
                }

                @Override
                public synchronized boolean handleReference(SearchResultReference reference) {
                    if (!searchDone.get()) {
                        timestamp(reference);
                        if (searchHandler != null) {
                            searchHandler.handleReference(reference);
                        }
                    }
                    return true;
                }
            };
            return timestampPromise(connectionImpl.searchAsync(request, intermediateResponseHandler, entryHandler)
                    .thenOnResultOrException(new Runnable() {
                        @Override
                        public void run() {
                            searchDone.getAndSet(true);
                        }
                    }));
        }

        @Override
        public String toString() {
            return connectionImpl.toString();
        }

        /**
         * Flags the connection as failed with a heart beat timeout error if the synchronizer is still held and no
         * response of any kind has been received within the heart beat timeout.
         */
        private void checkForHeartBeat() {
            if (sync.isHeld()) {
                /*
                 * A heart beat or bind/startTLS is still in progress, but it should have completed by now. Let's
                 * avoid aggressively terminating the connection, because the heart beat may simply have been delayed
                 * by a sudden surge of activity. Therefore, only flag the connection as failed if no activity has been
                 * seen on the connection since the heart beat was sent.
                 */
                final long currentTimeMillis = timeService.now();
                if (lastResponseTimestamp < (currentTimeMillis - heartBeatTimeoutMS)) {
                    logger.warn(LocalizableMessage.raw("No heartbeat detected for connection '%s'", connectionImpl));
                    handleConnectionError(false, newHeartBeatTimeoutError());
                }
            }
        }

        /** Returns {@code true} if the connection state holds a connection error. */
        private boolean hasConnectionErrorOccurred() {
            return state.getConnectionError() != null;
        }

        /**
         * Queues a Bind or StartTLS request for dispatch once the shared lock becomes available.
         *
         * @param doRequest
         *            The function which sends the request; it is chained onto an internal promise which is completed
         *            when the queued task runs.
         * @return A promise representing the result of the deferred request.
         */
        private <R extends Result> LdapPromise<R> enqueueBindOrStartTLSPromise(
                AsyncFunction<Void, R, LdapException> doRequest) {
            /*
             * A heart beat must be in progress so create a runnable task which will be executed when the heart beat
             * completes.
             */
            final LdapPromiseImpl<Void> promise = newLdapPromiseImpl();
            final LdapPromise<R> result = promise.thenAsync(doRequest);

            // Enqueue and flush if the heart beat has completed in the mean time.
            pendingBindOrStartTLSRequests.offer(new Runnable() {
                @Override
                public void run() {
                    // FIXME: Handle cancel chaining.
                    if (!result.isCancelled()) {
                        sync.lockShared(); // Will not block.
                        promise.handleResult(null);
                    }
                }
            });
            flushPendingBindOrStartTLSRequests();
            return result;
        }

        /**
         * Fails every pending result with the provided error.
         *
         * @param error
         *            The error to pass to each pending result handler.
         */
        private void failPendingResults(final LdapException error) {
            // Peek instead of poll because notification is responsible for removing the element from the queue.
            // NOTE(review): this loop only terminates if handleException() removes the head element (see the
            // removal callback registered in timestampPromise) - confirm that holds for every queued handler.
            LdapResultHandler<?> pendingResult;
            while ((pendingResult = pendingResults.peek()) != null) {
                pendingResult.handleException(error);
            }
        }

        /** Runs all queued Bind/StartTLS tasks if the shared lock can be taken; otherwise does nothing. */
        private void flushPendingBindOrStartTLSRequests() {
            if (!pendingBindOrStartTLSRequests.isEmpty()) {
                /*
                 * The pending requests will acquire the shared lock, but we take it here anyway to ensure that
                 * pending requests do not get blocked.
                 */
                if (sync.tryLockShared()) {
                    try {
                        Runnable pendingRequest;
                        while ((pendingRequest = pendingBindOrStartTLSRequests.poll()) != null) {
                            // Dispatch the waiting request. This will not block.
                            pendingRequest.run();
                        }
                    } finally {
                        sync.unlockShared();
                    }
                }
            }
        }

        /** Returns {@code true} if the request's OID equals the StartTLS extended request OID. */
        private boolean isStartTLSRequest(final ExtendedRequest<?> request) {
            return request.getOID().equals(StartTLSExtendedRequest.OID);
        }

        /** Returns a failed promise carrying the connection error currently held by the connection state. */
        private <R> LdapPromise<R> newConnectionErrorPromise() {
            return newFailedLdapPromise(state.getConnectionError());
        }

        /** Releases one shared lock taken for a Bind or StartTLS request. */
        private void releaseBindOrStartTLSLock() {
            sync.unlockShared();
        }

        /** Releases the exclusive heart beat lock, then dispatches any Bind/StartTLS requests queued meanwhile. */
        private void releaseHeartBeatLock() {
            sync.unlockExclusively();
            flushPendingBindOrStartTLSRequests();
        }

        /**
         * Sends a heart beat on this connection if required to do so.
         *
         * @return {@code true} if a heart beat was sent, otherwise {@code false}.
         */
        private boolean sendHeartBeat() {
            // Don't attempt to send a heart beat if the connection has already failed.
            if (!state.isValid()) {
                return false;
            }

            // Only send the heart beat if the connection has been idle for some time.
            final long currentTimeMillis = timeService.now();
            if (currentTimeMillis < (lastResponseTimestamp + heartBeatDelayMS)) {
                return false;
            }

            /* Don't send a heart beat if there is already a heart beat, bind, or startTLS in progress. Note that the
             * bind/startTLS response will update the lastResponseTimestamp as if it were a heart beat.
             */
            if (sync.tryLockExclusively()) {
                try {
                    connectionImpl.searchAsync(heartBeatRequest, null, new SearchResultHandler() {
                        @Override
                        public boolean handleEntry(final SearchResultEntry entry) {
                            timestamp(entry);
                            return true;
                        }

                        @Override
                        public boolean handleReference(final SearchResultReference reference) {
                            timestamp(reference);
                            return true;
                        }
                    }).thenOnResult(new org.forgerock.util.promise.ResultHandler<Result>() {
                        @Override
                        public void handleResult(Result result) {
                            timestamp(result);
                            releaseHeartBeatLock();
                        }
                    }).thenOnException(new ExceptionHandler<LdapException>() {
                        @Override
                        public void handleException(LdapException exception) {
                            /*
                             * Connection failure will be handled by connection event listener. Ignore cancellation
                             * errors since these indicate that the heart beat was aborted by a client-side close.
                             */
                            if (!(exception instanceof CancelledResultException)) {
                                /*
                                 * Log at debug level to avoid polluting the logs with benign password policy related
                                 * errors. See OPENDJ-1168 and OPENDJ-1167.
                                 */
                                // NOTE(review): the format string has a single '%s' but two arguments are supplied,
                                // so the exception does not appear to be rendered in the message - confirm.
                                logger.debug(LocalizableMessage.raw("Heartbeat failed for connection factory '%s'",
                                        LDAPConnectionFactory.this,
                                        exception));
                                timestamp(exception);
                            }
                            releaseHeartBeatLock();
                        }
                    });
                } catch (final IllegalStateException e) {
                    /*
                     * This may happen when we attempt to send the heart beat just after the connection is closed but
                     * before we are notified. Release the lock because we're never going to get a response.
                     */
                    releaseHeartBeatLock();
                }
            }
            /*
             * Indicate that the heartbeat should be checked even if a bind/startTLS is in progress, since these
             * operations will effectively act as the heartbeat.
             */
            return true;
        }

        /**
         * Records the current time as the time of the last response, unless the response is a
         * {@code ConnectionException}.
         *
         * @param response
         *            The response (or exception) which was received.
         * @return The response which was passed in.
         */
        private <R> R timestamp(final R response) {
            if (!(response instanceof ConnectionException)) {
                lastResponseTimestamp = timeService.now();
            }
            return response;
        }

        /**
         * Tracks the promise like {@link #timestampPromise(LdapPromise)} and additionally releases one shared
         * Bind/StartTLS lock when the tracked promise completes, whether successfully or not.
         */
        private <R extends Result> LdapPromise<R> timestampBindOrStartTLSPromise(LdapPromise<R> wrappedPromise) {
            return timestampPromise(wrappedPromise).thenOnResultOrException(new Runnable() {
                @Override
                public void run() {
                    releaseBindOrStartTLSLock();
                }
            });
        }

        /**
         * Wraps the provided promise in an outer promise which is registered in {@code pendingResults} so that it
         * can be failed if the connection fails. Results and exceptions from the wrapped promise are forwarded to
         * the outer promise and then time stamped; the outer promise removes itself from {@code pendingResults} on
         * completion.
         *
         * @param wrappedPromise
         *            The promise returned by the wrapped connection.
         * @return The outer promise to hand back to the caller.
         */
        private <R extends Result> LdapPromise<R> timestampPromise(LdapPromise<R> wrappedPromise) {
            final LdapPromiseImpl<R> outerPromise = new LdapPromiseImplWrapper<>(wrappedPromise);
            pendingResults.add(outerPromise);
            wrappedPromise.thenOnResult(new ResultHandler<R>() {
                @Override
                public void handleResult(R result) {
                    outerPromise.handleResult(result);
                    timestamp(result);
                }
            }).thenOnException(new ExceptionHandler<LdapException>() {
                @Override
                public void handleException(LdapException exception) {
                    outerPromise.handleException(exception);
                    timestamp(exception);
                }
            });
            outerPromise.thenOnResultOrException(new Runnable() {
                @Override
                public void run() {
                    pendingResults.remove(outerPromise);
                }
            });
            // Re-check after registration: an error recorded in the meantime fails the outer promise right away.
            if (hasConnectionErrorOccurred()) {
                outerPromise.handleException(state.getConnectionError());
            }
            return outerPromise;
        }

        /** Outer promise whose cancellation is forwarded to the wrapped promise and which shares its request ID. */
        private class LdapPromiseImplWrapper<R> extends LdapPromiseImpl<R> {
            protected LdapPromiseImplWrapper(final LdapPromise<R> wrappedPromise) {
                super(new PromiseImpl<R, LdapException>() {
                    @Override
                    protected LdapException tryCancel(boolean mayInterruptIfRunning) {
                        /*
                         * FIXME: if the inner cancel succeeds then this promise will be completed and we can never
                         * indicate that this cancel request has succeeded.
                         */
                        wrappedPromise.cancel(mayInterruptIfRunning);
                        return null;
                    }
                }, wrappedPromise.getRequestID());
            }
        }
    }
}