001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2009 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2015 ForgeRock AS.
016 */
017package org.opends.server.replication.protocol;
018
019import static org.opends.server.util.StaticUtils.*;
020
021import java.io.*;
022import java.net.Socket;
023import java.net.SocketException;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.locks.Lock;
029import java.util.concurrent.locks.ReentrantLock;
030import java.util.zip.DataFormatException;
031
032import javax.net.ssl.SSLSocket;
033
034import org.opends.server.api.DirectoryThread;
035import org.forgerock.i18n.slf4j.LocalizedLogger;
036import org.opends.server.util.StaticUtils;
037
038/**
039 * This class defines a replication session using TLS.
040 */
041public final class Session extends DirectoryThread implements Closeable
042{
043  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
044
045  private final Socket plainSocket;
046  private final SSLSocket secureSocket;
047  private final InputStream plainInput;
048  private final OutputStream plainOutput;
049  private final byte[] rcvLengthBuf = new byte[8];
050  private final String readableRemoteAddress;
051  private final String remoteAddress;
052  private final String localUrl;
053
054  /**
055   * The time the last message published to this session.
056   */
057  private volatile long lastPublishTime;
058
059  /**
060   * The time the last message was received on this session.
061   */
062  private volatile long lastReceiveTime;
063
064  /**
065   * Close and error guarded by stateLock: use a different lock to publish since
066   * publishing can block, and we don't want to block while closing failed
067   * connections.
068   */
069  private final Object stateLock = new Object();
070  private volatile boolean closeInitiated;
071  private Throwable sessionError;
072
073  /**
074   * Publish guarded by publishLock: use a full lock here so that we can
075   * optionally publish StopMsg during close.
076   */
077  private final Lock publishLock = new ReentrantLock();
078
079  /**
080   * These do not need synchronization because they are only modified during the
081   * initial single threaded handshake.
082   */
083  private short protocolVersion = ProtocolVersion.getCurrentVersion();
084  /** Initially encrypted. */
085  private boolean isEncrypted = true;
086
087  /**
088   * Use a buffered input stream to avoid too many system calls.
089   */
090  private BufferedInputStream input;
091
092  /**
093   * Use a buffered output stream in order to combine message length and content
094   * into a single TCP packet if possible.
095   */
096  private BufferedOutputStream output;
097
098  private final LinkedBlockingQueue<byte[]> sendQueue = new LinkedBlockingQueue<>(4000);
099  private AtomicBoolean isRunning = new AtomicBoolean(false);
100  private final CountDownLatch latch = new CountDownLatch(1);
101
102  /**
103   * Creates a new Session.
104   *
105   * @param socket
106   *          The regular Socket on which the SocketSession will be based.
107   * @param secureSocket
108   *          The secure Socket on which the SocketSession will be based.
109   * @throws IOException
110   *           When an IException happens on the socket.
111   */
112  public Session(final Socket socket,
113                 final SSLSocket secureSocket) throws IOException
114  {
115    super("Replication Session from "+ socket.getLocalSocketAddress() +
116        " to " + socket.getRemoteSocketAddress());
117    if (logger.isTraceEnabled())
118    {
119      logger.trace(
120          "Creating Session from %s to %s in %s",
121          socket.getLocalSocketAddress(),
122          socket.getRemoteSocketAddress(),
123          stackTraceToSingleLineString(new Exception()));
124    }
125
126    this.plainSocket = socket;
127    this.secureSocket = secureSocket;
128    this.plainInput = plainSocket.getInputStream();
129    this.plainOutput = plainSocket.getOutputStream();
130    this.input = new BufferedInputStream(secureSocket.getInputStream());
131    this.output = new BufferedOutputStream(secureSocket.getOutputStream());
132    this.readableRemoteAddress = plainSocket.getRemoteSocketAddress()
133        .toString();
134    this.remoteAddress = plainSocket.getInetAddress().getHostAddress();
135    this.localUrl = plainSocket.getLocalAddress().getHostName() + ":"
136        + plainSocket.getLocalPort();
137  }
138
139
140
141  /**
142   * This method is called when the session with the remote must be closed.
143   * This object won't be used anymore after this method is called.
144   */
145  @Override
146  public void close()
147  {
148    Throwable localSessionError;
149
150    synchronized (stateLock)
151    {
152      if (closeInitiated)
153      {
154        return;
155      }
156
157      localSessionError = sessionError;
158      closeInitiated = true;
159    }
160
161    try {
162      interrupt();
163      join();
164    }
165    catch (InterruptedException e) {
166      Thread.currentThread().interrupt();
167    }
168
169    // Perform close outside of critical section.
170    if (logger.isTraceEnabled())
171    {
172      if (localSessionError == null)
173      {
174        logger.trace(
175            "Closing Session from %s to %s in %s",
176            plainSocket.getLocalSocketAddress(),
177            plainSocket.getRemoteSocketAddress(),
178            stackTraceToSingleLineString(new Exception()));
179      }
180      else
181      {
182        logger.traceException(localSessionError,
183            "Aborting Session from %s to %s in %s due to the following error",
184            plainSocket.getLocalSocketAddress(),
185            plainSocket.getRemoteSocketAddress(),
186            stackTraceToSingleLineString(new Exception()));
187      }
188    }
189
190    // V4 protocol introduces a StopMsg to properly end communications.
191    if (localSessionError == null
192        && protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
193    {
194      try
195      {
196        publish(new StopMsg());
197      }
198      catch (final IOException ignored)
199      {
200        // Ignore errors on close.
201      }
202    }
203
204    StaticUtils.close(plainSocket, secureSocket);
205  }
206
207
208
209  /**
210   * This methods allows to determine if the session close was initiated
211   * on this Session.
212   *
213   * @return A boolean allowing to determine if the session close was initiated
214   * on this Session.
215   */
216  public boolean closeInitiated()
217  {
218    synchronized (stateLock)
219    {
220      return closeInitiated;
221    }
222  }
223
224
225
226  /**
227   * Gets the time the last replication message was published on this
228   * session.
229   * @return The timestamp in milliseconds of the last message published.
230   */
231  public long getLastPublishTime()
232  {
233    return lastPublishTime;
234  }
235
236
237
238  /**
239   * Gets the time the last replication message was received on this
240   * session.
241   * @return The timestamp in milliseconds of the last message received.
242   */
243  public long getLastReceiveTime()
244  {
245    if (lastReceiveTime == 0)
246    {
247      return System.currentTimeMillis();
248    }
249    return lastReceiveTime;
250  }
251
252
253
254  /**
255   * Retrieve the local URL in the form host:port.
256   *
257   * @return The local URL.
258   */
259  public String getLocalUrl()
260  {
261    return localUrl;
262  }
263
264
265
266  /**
267   * Retrieve the human readable address of the remote server.
268   *
269   * @return The human readable address of the remote server.
270   */
271  public String getReadableRemoteAddress()
272  {
273    return readableRemoteAddress;
274  }
275
276
277
278  /**
279   * Retrieve the IP address of the remote server.
280   *
281   * @return The IP address of the remote server.
282   */
283  public String getRemoteAddress()
284  {
285    return remoteAddress;
286  }
287
288
289
290  /**
291   * Determine whether the session is using a security layer.
292   * @return true if the connection is encrypted, false otherwise.
293   */
294  public boolean isEncrypted()
295  {
296    return isEncrypted;
297  }
298
299
300
301  /**
302   * Sends a replication message to the remote peer.
303   *
304   * @param msg
305   *          The message to be sent.
306   * @throws IOException
307   *           If an IO error occurred.
308   */
309  public void publish(final ReplicationMsg msg) throws IOException
310  {
311    final byte[] buffer = msg.getBytes(protocolVersion);
312    if (buffer == null)
313    {
314      // skip anything that cannot be encoded for this peer.
315      return;
316    }
317    if (isRunning.get())
318    {
319      while (!closeInitiated)
320      {
321        try
322        {
323          // Avoid blocking forever so that we can check for session closure.
324          if (sendQueue.offer(buffer, 100, TimeUnit.MILLISECONDS))
325          {
326            return;
327          }
328        }
329        catch (final InterruptedException e)
330        {
331          setSessionError(e);
332          throw new IOException(e.getMessage());
333        }
334      }
335    }
336    else
337    {
338      send(buffer);
339    }
340  }
341
342  /** Sends a replication message already encoded to the socket.
343   *
344   * @param buffer
345   *          the encoded buffer
346   * @throws IOException if the message could not be sent
347   */
348  private void send(final byte[] buffer) throws IOException
349  {
350    final String str = String.format("%08x", buffer.length);
351    final byte[] sendLengthBuf = str.getBytes();
352
353    publishLock.lock();
354    try
355    {
356      /*
357       * The buffered output stream ensures that the message is usually sent as
358       * a single TCP packet.
359       */
360      output.write(sendLengthBuf);
361      output.write(buffer);
362      output.flush();
363    } catch (final IOException e) {
364      setSessionError(e);
365      throw e;
366    }
367    finally
368    {
369      publishLock.unlock();
370    }
371
372    lastPublishTime = System.currentTimeMillis();
373  }
374
375
376
377  /**
378   * Attempt to receive a ReplicationMsg.
379   * This method should block the calling thread until a
380   * ReplicationMsg is available or until an error condition.
381   *
382   * This method can only be called by a single thread and therefore does not
383   * need to implement any replication.
384   *
385   * @return The ReplicationMsg that was received.
386   * @throws IOException When error happened during IO process.
387   * @throws DataFormatException When the data received is not formatted as a
388   *         ReplicationMsg.
389   * @throws NotSupportedOldVersionPDUException If the received PDU is part of
390   * an old protocol version and we do not support it.
391   */
392  public ReplicationMsg receive() throws IOException,
393      DataFormatException, NotSupportedOldVersionPDUException
394  {
395    try
396    {
397      /*
398       * Let's start the stop-watch before waiting on read for the heartbeat
399       * check to be operational.
400       */
401      lastReceiveTime = System.currentTimeMillis();
402
403      // Read the first 8 bytes containing the packet length.
404      read(rcvLengthBuf);
405      final int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
406
407      try
408      {
409        final byte[] buffer = new byte[totalLength];
410        read(buffer);
411
412        /*
413         * We do not want the heartbeat to close the session when we are
414         * processing a message even a time consuming one.
415         */
416        lastReceiveTime = 0;
417        return ReplicationMsg.generateMsg(buffer, protocolVersion);
418      }
419      catch (final OutOfMemoryError e)
420      {
421        throw new IOException("Packet too large, can't allocate "
422            + totalLength + " bytes.");
423      }
424    }
425    catch (final IOException | DataFormatException | NotSupportedOldVersionPDUException | RuntimeException e)
426    {
427      setSessionError(e);
428      throw e;
429    }
430  }
431
432  private void read(byte[] buffer) throws IOException
433  {
434    final int totalLength = buffer.length;
435    int length = 0;
436    while (length < totalLength)
437    {
438      final int read = input.read(buffer, length, totalLength - length);
439      if (read == -1)
440      {
441        lastReceiveTime = 0;
442        throw new IOException("no more data");
443      }
444      length += read;
445    }
446  }
447
448  /**
449   * This method is called at the establishment of the session and can
450   * be used to record the version of the protocol that is currently used.
451   *
452   * @param version The version of the protocol that is currently used.
453   */
454  public void setProtocolVersion(final short version)
455  {
456    protocolVersion = version;
457  }
458
459
460  /**
461   * Returns the version of the protocol that is currently used.
462   *
463   * @return The version of the protocol that is currently used.
464   */
465  public short getProtocolVersion()
466  {
467    return protocolVersion;
468  }
469
470
471
472  /**
473   * Set a timeout value.
474   * With this option set to a non-zero value, calls to the receive() method
475   * block for only this amount of time after which a
476   * java.net.SocketTimeoutException is raised.
477   * The Broker is valid and usable even after such an Exception is raised.
478   *
479   * @param timeout the specified timeout, in milliseconds.
480   * @throws SocketException if there is an error in the underlying protocol,
481   *         such as a TCP error.
482   */
483  public void setSoTimeout(final int timeout) throws SocketException
484  {
485    plainSocket.setSoTimeout(timeout);
486  }
487
488
489
490  /**
491   * Stop using the security layer, if there is any.
492   */
493  public void stopEncryption()
494  {
495    /*
496     * The secure socket has been configured not to auto close the underlying
497     * plain socket. We should close it here and properly tear down the SSL
498     * session, but this is not compatible with the existing protocol.
499     */
500    if (false)
501    {
502      StaticUtils.close(secureSocket);
503    }
504
505    input = new BufferedInputStream(plainInput);
506    output = new BufferedOutputStream(plainOutput);
507    isEncrypted = false;
508  }
509
510
511
512  private void setSessionError(final Exception e)
513  {
514    synchronized (stateLock)
515    {
516      if (sessionError == null)
517      {
518        sessionError = e;
519      }
520    }
521  }
522
523  /**
524   * Run method for the Session.
525   * Loops waiting for buffers from the queue and sends them when available.
526   */
527  @Override
528  public void run()
529  {
530    isRunning.set(true);
531    latch.countDown();
532    if (logger.isTraceEnabled())
533    {
534      logger.trace(getName() + " starting.");
535    }
536    boolean needClosing = false;
537    while (!closeInitiated)
538    {
539      byte[] buffer;
540      try
541      {
542        buffer = sendQueue.take();
543      }
544      catch (InterruptedException ie)
545      {
546        break;
547      }
548      try
549      {
550        send(buffer);
551      }
552      catch (IOException e)
553      {
554        setSessionError(e);
555        needClosing = true;
556      }
557    }
558    isRunning.set(false);
559    if (needClosing)
560    {
561      close();
562    }
563    if (logger.isTraceEnabled())
564    {
565      logger.trace(getName() + " stopped.");
566    }
567  }
568
569  /**
570   * This method can be called to wait until the session thread is
571   * properly started.
572   * @throws InterruptedException when interrupted
573   */
574  public void waitForStartup() throws InterruptedException
575  {
576    latch.await();
577  }
578}