001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2009 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2015 ForgeRock AS. 016 */ 017package org.opends.server.replication.protocol; 018 019import static org.opends.server.util.StaticUtils.*; 020 021import java.io.*; 022import java.net.Socket; 023import java.net.SocketException; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicBoolean; 028import java.util.concurrent.locks.Lock; 029import java.util.concurrent.locks.ReentrantLock; 030import java.util.zip.DataFormatException; 031 032import javax.net.ssl.SSLSocket; 033 034import org.opends.server.api.DirectoryThread; 035import org.forgerock.i18n.slf4j.LocalizedLogger; 036import org.opends.server.util.StaticUtils; 037 038/** 039 * This class defines a replication session using TLS. 040 */ 041public final class Session extends DirectoryThread implements Closeable 042{ 043 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 044 045 private final Socket plainSocket; 046 private final SSLSocket secureSocket; 047 private final InputStream plainInput; 048 private final OutputStream plainOutput; 049 private final byte[] rcvLengthBuf = new byte[8]; 050 private final String readableRemoteAddress; 051 private final String remoteAddress; 052 private final String localUrl; 053 054 /** 055 * The time the last message published to this session. 056 */ 057 private volatile long lastPublishTime; 058 059 /** 060 * The time the last message was received on this session. 061 */ 062 private volatile long lastReceiveTime; 063 064 /** 065 * Close and error guarded by stateLock: use a different lock to publish since 066 * publishing can block, and we don't want to block while closing failed 067 * connections. 068 */ 069 private final Object stateLock = new Object(); 070 private volatile boolean closeInitiated; 071 private Throwable sessionError; 072 073 /** 074 * Publish guarded by publishLock: use a full lock here so that we can 075 * optionally publish StopMsg during close. 076 */ 077 private final Lock publishLock = new ReentrantLock(); 078 079 /** 080 * These do not need synchronization because they are only modified during the 081 * initial single threaded handshake. 082 */ 083 private short protocolVersion = ProtocolVersion.getCurrentVersion(); 084 /** Initially encrypted. */ 085 private boolean isEncrypted = true; 086 087 /** 088 * Use a buffered input stream to avoid too many system calls. 089 */ 090 private BufferedInputStream input; 091 092 /** 093 * Use a buffered output stream in order to combine message length and content 094 * into a single TCP packet if possible. 095 */ 096 private BufferedOutputStream output; 097 098 private final LinkedBlockingQueue<byte[]> sendQueue = new LinkedBlockingQueue<>(4000); 099 private AtomicBoolean isRunning = new AtomicBoolean(false); 100 private final CountDownLatch latch = new CountDownLatch(1); 101 102 /** 103 * Creates a new Session. 104 * 105 * @param socket 106 * The regular Socket on which the SocketSession will be based. 107 * @param secureSocket 108 * The secure Socket on which the SocketSession will be based. 109 * @throws IOException 110 * When an IException happens on the socket. 111 */ 112 public Session(final Socket socket, 113 final SSLSocket secureSocket) throws IOException 114 { 115 super("Replication Session from "+ socket.getLocalSocketAddress() + 116 " to " + socket.getRemoteSocketAddress()); 117 if (logger.isTraceEnabled()) 118 { 119 logger.trace( 120 "Creating Session from %s to %s in %s", 121 socket.getLocalSocketAddress(), 122 socket.getRemoteSocketAddress(), 123 stackTraceToSingleLineString(new Exception())); 124 } 125 126 this.plainSocket = socket; 127 this.secureSocket = secureSocket; 128 this.plainInput = plainSocket.getInputStream(); 129 this.plainOutput = plainSocket.getOutputStream(); 130 this.input = new BufferedInputStream(secureSocket.getInputStream()); 131 this.output = new BufferedOutputStream(secureSocket.getOutputStream()); 132 this.readableRemoteAddress = plainSocket.getRemoteSocketAddress() 133 .toString(); 134 this.remoteAddress = plainSocket.getInetAddress().getHostAddress(); 135 this.localUrl = plainSocket.getLocalAddress().getHostName() + ":" 136 + plainSocket.getLocalPort(); 137 } 138 139 140 141 /** 142 * This method is called when the session with the remote must be closed. 143 * This object won't be used anymore after this method is called. 144 */ 145 @Override 146 public void close() 147 { 148 Throwable localSessionError; 149 150 synchronized (stateLock) 151 { 152 if (closeInitiated) 153 { 154 return; 155 } 156 157 localSessionError = sessionError; 158 closeInitiated = true; 159 } 160 161 try { 162 interrupt(); 163 join(); 164 } 165 catch (InterruptedException e) { 166 Thread.currentThread().interrupt(); 167 } 168 169 // Perform close outside of critical section. 170 if (logger.isTraceEnabled()) 171 { 172 if (localSessionError == null) 173 { 174 logger.trace( 175 "Closing Session from %s to %s in %s", 176 plainSocket.getLocalSocketAddress(), 177 plainSocket.getRemoteSocketAddress(), 178 stackTraceToSingleLineString(new Exception())); 179 } 180 else 181 { 182 logger.traceException(localSessionError, 183 "Aborting Session from %s to %s in %s due to the following error", 184 plainSocket.getLocalSocketAddress(), 185 plainSocket.getRemoteSocketAddress(), 186 stackTraceToSingleLineString(new Exception())); 187 } 188 } 189 190 // V4 protocol introduces a StopMsg to properly end communications. 191 if (localSessionError == null 192 && protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) 193 { 194 try 195 { 196 publish(new StopMsg()); 197 } 198 catch (final IOException ignored) 199 { 200 // Ignore errors on close. 201 } 202 } 203 204 StaticUtils.close(plainSocket, secureSocket); 205 } 206 207 208 209 /** 210 * This methods allows to determine if the session close was initiated 211 * on this Session. 212 * 213 * @return A boolean allowing to determine if the session close was initiated 214 * on this Session. 215 */ 216 public boolean closeInitiated() 217 { 218 synchronized (stateLock) 219 { 220 return closeInitiated; 221 } 222 } 223 224 225 226 /** 227 * Gets the time the last replication message was published on this 228 * session. 229 * @return The timestamp in milliseconds of the last message published. 230 */ 231 public long getLastPublishTime() 232 { 233 return lastPublishTime; 234 } 235 236 237 238 /** 239 * Gets the time the last replication message was received on this 240 * session. 241 * @return The timestamp in milliseconds of the last message received. 242 */ 243 public long getLastReceiveTime() 244 { 245 if (lastReceiveTime == 0) 246 { 247 return System.currentTimeMillis(); 248 } 249 return lastReceiveTime; 250 } 251 252 253 254 /** 255 * Retrieve the local URL in the form host:port. 256 * 257 * @return The local URL. 258 */ 259 public String getLocalUrl() 260 { 261 return localUrl; 262 } 263 264 265 266 /** 267 * Retrieve the human readable address of the remote server. 268 * 269 * @return The human readable address of the remote server. 270 */ 271 public String getReadableRemoteAddress() 272 { 273 return readableRemoteAddress; 274 } 275 276 277 278 /** 279 * Retrieve the IP address of the remote server. 280 * 281 * @return The IP address of the remote server. 282 */ 283 public String getRemoteAddress() 284 { 285 return remoteAddress; 286 } 287 288 289 290 /** 291 * Determine whether the session is using a security layer. 292 * @return true if the connection is encrypted, false otherwise. 293 */ 294 public boolean isEncrypted() 295 { 296 return isEncrypted; 297 } 298 299 300 301 /** 302 * Sends a replication message to the remote peer. 303 * 304 * @param msg 305 * The message to be sent. 306 * @throws IOException 307 * If an IO error occurred. 308 */ 309 public void publish(final ReplicationMsg msg) throws IOException 310 { 311 final byte[] buffer = msg.getBytes(protocolVersion); 312 if (buffer == null) 313 { 314 // skip anything that cannot be encoded for this peer. 315 return; 316 } 317 if (isRunning.get()) 318 { 319 while (!closeInitiated) 320 { 321 try 322 { 323 // Avoid blocking forever so that we can check for session closure. 324 if (sendQueue.offer(buffer, 100, TimeUnit.MILLISECONDS)) 325 { 326 return; 327 } 328 } 329 catch (final InterruptedException e) 330 { 331 setSessionError(e); 332 throw new IOException(e.getMessage()); 333 } 334 } 335 } 336 else 337 { 338 send(buffer); 339 } 340 } 341 342 /** Sends a replication message already encoded to the socket. 343 * 344 * @param buffer 345 * the encoded buffer 346 * @throws IOException if the message could not be sent 347 */ 348 private void send(final byte[] buffer) throws IOException 349 { 350 final String str = String.format("%08x", buffer.length); 351 final byte[] sendLengthBuf = str.getBytes(); 352 353 publishLock.lock(); 354 try 355 { 356 /* 357 * The buffered output stream ensures that the message is usually sent as 358 * a single TCP packet. 359 */ 360 output.write(sendLengthBuf); 361 output.write(buffer); 362 output.flush(); 363 } catch (final IOException e) { 364 setSessionError(e); 365 throw e; 366 } 367 finally 368 { 369 publishLock.unlock(); 370 } 371 372 lastPublishTime = System.currentTimeMillis(); 373 } 374 375 376 377 /** 378 * Attempt to receive a ReplicationMsg. 379 * This method should block the calling thread until a 380 * ReplicationMsg is available or until an error condition. 381 * 382 * This method can only be called by a single thread and therefore does not 383 * need to implement any replication. 384 * 385 * @return The ReplicationMsg that was received. 386 * @throws IOException When error happened during IO process. 387 * @throws DataFormatException When the data received is not formatted as a 388 * ReplicationMsg. 389 * @throws NotSupportedOldVersionPDUException If the received PDU is part of 390 * an old protocol version and we do not support it. 391 */ 392 public ReplicationMsg receive() throws IOException, 393 DataFormatException, NotSupportedOldVersionPDUException 394 { 395 try 396 { 397 /* 398 * Let's start the stop-watch before waiting on read for the heartbeat 399 * check to be operational. 400 */ 401 lastReceiveTime = System.currentTimeMillis(); 402 403 // Read the first 8 bytes containing the packet length. 404 read(rcvLengthBuf); 405 final int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16); 406 407 try 408 { 409 final byte[] buffer = new byte[totalLength]; 410 read(buffer); 411 412 /* 413 * We do not want the heartbeat to close the session when we are 414 * processing a message even a time consuming one. 415 */ 416 lastReceiveTime = 0; 417 return ReplicationMsg.generateMsg(buffer, protocolVersion); 418 } 419 catch (final OutOfMemoryError e) 420 { 421 throw new IOException("Packet too large, can't allocate " 422 + totalLength + " bytes."); 423 } 424 } 425 catch (final IOException | DataFormatException | NotSupportedOldVersionPDUException | RuntimeException e) 426 { 427 setSessionError(e); 428 throw e; 429 } 430 } 431 432 private void read(byte[] buffer) throws IOException 433 { 434 final int totalLength = buffer.length; 435 int length = 0; 436 while (length < totalLength) 437 { 438 final int read = input.read(buffer, length, totalLength - length); 439 if (read == -1) 440 { 441 lastReceiveTime = 0; 442 throw new IOException("no more data"); 443 } 444 length += read; 445 } 446 } 447 448 /** 449 * This method is called at the establishment of the session and can 450 * be used to record the version of the protocol that is currently used. 451 * 452 * @param version The version of the protocol that is currently used. 453 */ 454 public void setProtocolVersion(final short version) 455 { 456 protocolVersion = version; 457 } 458 459 460 /** 461 * Returns the version of the protocol that is currently used. 462 * 463 * @return The version of the protocol that is currently used. 464 */ 465 public short getProtocolVersion() 466 { 467 return protocolVersion; 468 } 469 470 471 472 /** 473 * Set a timeout value. 474 * With this option set to a non-zero value, calls to the receive() method 475 * block for only this amount of time after which a 476 * java.net.SocketTimeoutException is raised. 477 * The Broker is valid and usable even after such an Exception is raised. 478 * 479 * @param timeout the specified timeout, in milliseconds. 480 * @throws SocketException if there is an error in the underlying protocol, 481 * such as a TCP error. 482 */ 483 public void setSoTimeout(final int timeout) throws SocketException 484 { 485 plainSocket.setSoTimeout(timeout); 486 } 487 488 489 490 /** 491 * Stop using the security layer, if there is any. 492 */ 493 public void stopEncryption() 494 { 495 /* 496 * The secure socket has been configured not to auto close the underlying 497 * plain socket. We should close it here and properly tear down the SSL 498 * session, but this is not compatible with the existing protocol. 499 */ 500 if (false) 501 { 502 StaticUtils.close(secureSocket); 503 } 504 505 input = new BufferedInputStream(plainInput); 506 output = new BufferedOutputStream(plainOutput); 507 isEncrypted = false; 508 } 509 510 511 512 private void setSessionError(final Exception e) 513 { 514 synchronized (stateLock) 515 { 516 if (sessionError == null) 517 { 518 sessionError = e; 519 } 520 } 521 } 522 523 /** 524 * Run method for the Session. 525 * Loops waiting for buffers from the queue and sends them when available. 526 */ 527 @Override 528 public void run() 529 { 530 isRunning.set(true); 531 latch.countDown(); 532 if (logger.isTraceEnabled()) 533 { 534 logger.trace(getName() + " starting."); 535 } 536 boolean needClosing = false; 537 while (!closeInitiated) 538 { 539 byte[] buffer; 540 try 541 { 542 buffer = sendQueue.take(); 543 } 544 catch (InterruptedException ie) 545 { 546 break; 547 } 548 try 549 { 550 send(buffer); 551 } 552 catch (IOException e) 553 { 554 setSessionError(e); 555 needClosing = true; 556 } 557 } 558 isRunning.set(false); 559 if (needClosing) 560 { 561 close(); 562 } 563 if (logger.isTraceEnabled()) 564 { 565 logger.trace(getName() + " stopped."); 566 } 567 } 568 569 /** 570 * This method can be called to wait until the session thread is 571 * properly started. 572 * @throws InterruptedException when interrupted 573 */ 574 public void waitForStartup() throws InterruptedException 575 { 576 latch.await(); 577 } 578}