001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2017 ForgeRock AS.
016 */
017package org.opends.server.replication.server;
018
019import static org.opends.messages.ReplicationMessages.*;
020
021import java.io.IOException;
022import java.util.Random;
023import java.util.concurrent.Semaphore;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import org.forgerock.i18n.LocalizableMessage;
028import org.forgerock.i18n.slf4j.LocalizedLogger;
029import org.forgerock.opendj.config.server.ConfigException;
030import org.forgerock.opendj.ldap.ResultCode;
031import org.opends.server.api.MonitorData;
032import org.forgerock.opendj.server.config.server.MonitorProviderCfg;
033import org.opends.server.core.DirectoryServer;
034import org.opends.server.replication.common.AssuredMode;
035import org.opends.server.replication.common.CSN;
036import org.opends.server.replication.common.RSInfo;
037import org.opends.server.replication.common.ServerStatus;
038import org.opends.server.replication.protocol.AckMsg;
039import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
040import org.opends.server.replication.protocol.HeartbeatThread;
041import org.opends.server.replication.protocol.MonitorMsg;
042import org.opends.server.replication.protocol.MonitorRequestMsg;
043import org.opends.server.replication.protocol.ProtocolVersion;
044import org.opends.server.replication.protocol.ReplServerStartMsg;
045import org.opends.server.replication.protocol.ReplicaOfflineMsg;
046import org.opends.server.replication.protocol.ReplicationMsg;
047import org.opends.server.replication.protocol.ResetGenerationIdMsg;
048import org.opends.server.replication.protocol.RoutableMsg;
049import org.opends.server.replication.protocol.Session;
050import org.opends.server.replication.protocol.StartMsg;
051import org.opends.server.replication.protocol.StartSessionMsg;
052import org.opends.server.replication.protocol.TopologyMsg;
053import org.opends.server.replication.protocol.UpdateMsg;
054import org.opends.server.replication.protocol.WindowMsg;
055import org.opends.server.replication.server.changelog.api.ChangelogException;
056import org.opends.server.types.DirectoryException;
057import org.opends.server.types.InitializationException;
058
/**
 * This class defines a server handler:
 * - it is a MessageHandler (see that class for more details),
 * - it handles all interaction with a peer server (RS or DS).
 */
064public abstract class ServerHandler extends MessageHandler
065{
066
  /** Logger used for the trace and error messages emitted by this class. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /**
   * Maximum time, in milliseconds, that {@link #shutdown()} waits for the
   * writer thread, and then for the reader thread, to terminate.
   */
  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;

  /**
   * The serverId of the remote server.
   */
  protected int serverId;
  /**
   * The session opened with the remote server.
   */
  protected final Session session;

  /**
   * The serverURL of the remote server.
   */
  protected String serverURL;
  /**
   * Number of updates received from the server in assured safe read mode.
   * <p>
   * NOTE(review): plain int incremented with {@code ++} and no
   * synchronization, unlike the AtomicInteger timeout counters below;
   * confirm it is only updated from a single thread.
   */
  private int assuredSrReceivedUpdates;
  /**
   * Number of updates received from the server in assured safe read mode that
   * timed out.
   */
  private final AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates sent to the server in assured safe read mode.
   * Incremented while taking the next update to send (see take()).
   */
  private int assuredSrSentUpdates;
  /**
   * Number of updates sent to the server in assured safe read mode that timed
   * out.
   */
  private final AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates received from the server in assured safe data mode.
   */
  private int assuredSdReceivedUpdates;
  /**
   * Number of updates received from the server in assured safe data mode that
   * timed out.
   */
  private final AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates sent to the server in assured safe data mode.
   * Only incremented when the remote server is not a data server.
   */
  private int assuredSdSentUpdates;

  /**
   * Number of updates sent to the server in assured safe data mode that timed out.
   */
  private final AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();

  /**
   * The associated ServerWriter that sends messages to the remote server.
   * Created by finalizeStart(), and only when a session exists.
   */
  private ServerWriter writer;

  /**
   * The associated ServerReader that receives messages from the remote server.
   * Created by finalizeStart(), and only when a session exists.
   */
  private ServerReader reader;

  /**
   * Current receive window. It starts at the window size given to the
   * constructor, is decremented by decAndCheckWindow() and is credited by
   * checkWindow() once it falls below half of the window size.
   */
  private int rcvWindow;
  /**
   * Half of the receive window size given to the constructor. checkWindow()
   * uses it both as the refill threshold and as the credit carried by the
   * WindowMsg it publishes.
   */
  private final int rcvWindowSizeHalf;

  /** The size of the receiving window. */
  protected final int maxRcvWindow;
  /**
   * Semaphore that the writer uses to control the flow to the remote server.
   * Created by finalizeStart() with sendWindowSize permits; permits are given
   * back by updateWindow().
   */
  private Semaphore sendWindow;
  /** The initial size of the sending window. */
  private int sendWindowSize;
  /** Remote generation id. */
  protected long generationId = -1;
  /** The generation id of the hosting RS. */
  protected long localGenerationId = -1;
  /** The generation id before processing a new start handshake. */
  protected long oldGenerationId = -1;
  /** Group id of this remote server. */
  protected byte groupId = -1;
  /** The SSL encryption after the negotiation with the peer. */
  protected boolean sslEncryption;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  protected long heartbeatInterval;

  /**
   * The thread that will send heartbeats. Only created by finalizeStart()
   * when heartbeatInterval is strictly positive.
   */
  private HeartbeatThread heartbeatThread;

  /**
   * Set to true by shutdown(). The send window acquisition loop checks it so
   * that it stops waiting for a permit once the handler is shutting down.
   */
  private volatile boolean shutdownWriter;

  /** Weight of this remote server. */
  protected int weight = 1;
169
170  /**
171   * Creates a new server handler instance with the provided socket.
172   *
173   * @param session The Session used by the ServerHandler to
174   *                 communicate with the remote entity.
175   * @param queueSize The maximum number of update that will be kept
176   *                  in memory by this ServerHandler.
177   * @param replicationServer The hosting replication server.
178   * @param rcvWindowSize The window size to receive from the remote server.
179   */
180  public ServerHandler(
181      Session session,
182      int queueSize,
183      ReplicationServer replicationServer,
184      int rcvWindowSize)
185  {
186    super(queueSize, replicationServer);
187    this.session = session;
188    this.rcvWindowSizeHalf = rcvWindowSize / 2;
189    this.maxRcvWindow = rcvWindowSize;
190    this.rcvWindow = rcvWindowSize;
191  }
192
193  /**
194   * Abort a start procedure currently establishing.
195   * @param reason The provided reason.
196   */
197  protected void abortStart(LocalizableMessage reason)
198  {
199    // We did not recognize the message, close session as what can happen after
200    // is undetermined and we do not want the server to be disturbed
201    Session localSession = session;
202    if (localSession != null)
203    {
204      if (reason != null)
205      {
206        if (logger.isTraceEnabled())
207        {
208         logger.trace("In " + this + " closing session with err=" + reason);
209        }
210        logger.error(reason);
211      }
212
213      // This method is only called when aborting a failing handshake and
214      // not StopMsg should be sent in such situation. StopMsg are only
215      // expected when full handshake has been performed, or at end of
216      // handshake phase 1, when DS was just gathering available RS info
217      localSession.close();
218    }
219
220    releaseDomainLock();
221
222    // If generation id of domain was changed, set it back to old value
223    // We may have changed it as it was -1 and we received a value >0 from peer
224    // server and the last topo message sent may have failed being sent: in that
225    // case retrieve old value of generation id for replication server domain
226    if (oldGenerationId != -100)
227    {
228      replicationServerDomain.changeGenerationId(oldGenerationId);
229    }
230  }
231
232  /**
233   * Releases the lock on the replication server domain if it was held.
234   */
235  protected void releaseDomainLock()
236  {
237    if (replicationServerDomain.hasLock())
238    {
239      replicationServerDomain.release();
240    }
241  }
242
243  /**
244   * Check the protocol window and send WindowMsg if necessary.
245   *
246   * @throws IOException when the session becomes unavailable.
247   */
248  public synchronized void checkWindow() throws IOException
249  {
250    if (rcvWindow < rcvWindowSizeHalf)
251    {
252      WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
253      session.publish(msg);
254      rcvWindow += rcvWindowSizeHalf;
255    }
256  }
257
258  /**
259   * Decrement the protocol window, then check if it is necessary
260   * to send a WindowMsg and send it.
261   *
262   * @throws IOException when the session becomes unavailable.
263   */
264  private synchronized void decAndCheckWindow() throws IOException
265  {
266    rcvWindow--;
267    checkWindow();
268  }
269
270  /**
271   * Finalize the initialization, create reader, writer, heartbeat system
272   * and monitoring system.
273   * @throws DirectoryException When an exception is raised.
274   */
275  protected void finalizeStart() throws DirectoryException
276  {
277    // FIXME:ECL We should refactor so that a SH always have a session
278    if (session != null)
279    {
280      try
281      {
282        // Disable timeout for next communications
283        session.setSoTimeout(0);
284      }
285      catch(Exception e)
286      { /* do nothing */
287      }
288
289      // sendWindow MUST be created before starting the writer
290      sendWindow = new Semaphore(sendWindowSize);
291
292      writer = new ServerWriter(session, this, replicationServerDomain,
293          replicationServer.getDSRSShutdownSync());
294      reader = new ServerReader(session, this);
295
296      session.setName("Replication server RS(" + getReplicationServerId()
297          + ") session thread to " + this + " at "
298          + session.getReadableRemoteAddress());
299      session.start();
300      try
301      {
302        session.waitForStartup();
303      }
304      catch (InterruptedException e)
305      {
306        final LocalizableMessage message =
307            ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName());
308        throw new DirectoryException(ResultCode.OTHER, message, e);
309      }
310      reader.start();
311      writer.start();
312
313      // Create a thread to send heartbeat messages.
314      if (heartbeatInterval > 0)
315      {
316        String threadName = "Replication server RS(" + getReplicationServerId()
317            + ") heartbeat publisher to " + this + " at "
318            + session.getReadableRemoteAddress();
319        heartbeatThread = new HeartbeatThread(threadName, session,
320            heartbeatInterval / 3);
321        heartbeatThread.start();
322      }
323    }
324
325    DirectoryServer.deregisterMonitorProvider(this);
326    DirectoryServer.registerMonitorProvider(this);
327  }
328
329  /**
330   * Sends a message.
331   *
332   * @param msg
333   *          The message to be sent.
334   * @throws IOException
335   *           When it occurs while sending the message,
336   */
337  public void send(ReplicationMsg msg) throws IOException
338  {
339    // avoid logging anything for unit tests that include a null domain.
340    if (logger.isTraceEnabled())
341    {
342      logger.trace("In "
343          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
344          + this + " publishes message:\n" + msg);
345    }
346    session.publish(msg);
347  }
348
349  /**
350   * Get the age of the older change that has not yet been replicated
351   * to the server handled by this ServerHandler.
352   * @return The age if the older change has not yet been replicated
353   *         to the server handled by this ServerHandler.
354   */
355  public long getApproxFirstMissingDate()
356  {
357    // Get the older CSN received
358    CSN olderUpdateCSN = getOlderUpdateCSN();
359    if (olderUpdateCSN != null)
360    {
361      // If not present in the local RS db,
362      // then approximate with the older update time
363      return olderUpdateCSN.getTime();
364    }
365    return 0;
366  }
367
368  /**
369   * Get the number of updates received from the server in assured safe data
370   * mode.
371   * @return The number of updates received from the server in assured safe data
372   * mode
373   */
374  public int getAssuredSdReceivedUpdates()
375  {
376    return assuredSdReceivedUpdates;
377  }
378
379  /**
380   * Get the number of updates received from the server in assured safe data
381   * mode that timed out.
382   * @return The number of updates received from the server in assured safe data
383   * mode that timed out.
384   */
385  public AtomicInteger getAssuredSdReceivedUpdatesTimeout()
386  {
387    return assuredSdReceivedUpdatesTimeout;
388  }
389
390  /**
391   * Get the number of updates sent to the server in assured safe data mode.
392   * @return The number of updates sent to the server in assured safe data mode
393   */
394  public int getAssuredSdSentUpdates()
395  {
396    return assuredSdSentUpdates;
397  }
398
399  /**
400   * Get the number of updates sent to the server in assured safe data mode that
401   * timed out.
402   * @return The number of updates sent to the server in assured safe data mode
403   * that timed out.
404   */
405  public AtomicInteger getAssuredSdSentUpdatesTimeout()
406  {
407    return assuredSdSentUpdatesTimeout;
408  }
409
410  /**
411   * Get the number of updates received from the server in assured safe read
412   * mode.
413   * @return The number of updates received from the server in assured safe read
414   * mode
415   */
416  public int getAssuredSrReceivedUpdates()
417  {
418    return assuredSrReceivedUpdates;
419  }
420
421  /**
422   * Get the number of updates received from the server in assured safe read
423   * mode that timed out.
424   * @return The number of updates received from the server in assured safe read
425   * mode that timed out.
426   */
427  public AtomicInteger getAssuredSrReceivedUpdatesTimeout()
428  {
429    return assuredSrReceivedUpdatesTimeout;
430  }
431
432  /**
433   * Get the number of updates sent to the server in assured safe read mode.
434   * @return The number of updates sent to the server in assured safe read mode
435   */
436  public int getAssuredSrSentUpdates()
437  {
438    return assuredSrSentUpdates;
439  }
440
441  /**
442   * Get the number of updates sent to the server in assured safe read mode that
443   * timed out.
444   * @return The number of updates sent to the server in assured safe read mode
445   * that timed out.
446   */
447  public AtomicInteger getAssuredSrSentUpdatesTimeout()
448  {
449    return assuredSrSentUpdatesTimeout;
450  }
451
452  /**
453   * Returns the Replication Server Domain to which belongs this server handler.
454   *
455   * @return The replication server domain.
456   */
457  public ReplicationServerDomain getDomain()
458  {
459    return replicationServerDomain;
460  }
461
462  /**
463   * Returns the value of generationId for that handler.
464   * @return The value of the generationId.
465   */
466  public long getGenerationId()
467  {
468    return generationId;
469  }
470
471  /**
472   * Gets the group id of the server represented by this object.
473   * @return The group id of the server represented by this object.
474   */
475  public byte getGroupId()
476  {
477    return groupId;
478  }
479
480  /**
481   * Get our heartbeat interval.
482   * @return Our heartbeat interval.
483   */
484  public long getHeartbeatInterval()
485  {
486    return heartbeatInterval;
487  }
488
489  @Override
490  public MonitorData getMonitorData()
491  {
492    // Get the generic ones
493    MonitorData attributes = super.getMonitorData();
494
495    attributes.add("server-id", serverId);
496    attributes.add("domain-name", getBaseDN());
497
498    // Deprecated
499    attributes.add("max-waiting-changes", maxQueueSize);
500    attributes.add("sent-updates", getOutCount());
501    attributes.add("received-updates", getInCount());
502
503    // Assured counters
504    attributes.add("assured-sr-received-updates", getAssuredSrReceivedUpdates());
505    attributes.add("assured-sr-received-updates-timeout", getAssuredSrReceivedUpdatesTimeout());
506    attributes.add("assured-sr-sent-updates", getAssuredSrSentUpdates());
507    attributes.add("assured-sr-sent-updates-timeout", getAssuredSrSentUpdatesTimeout());
508    attributes.add("assured-sd-received-updates", getAssuredSdReceivedUpdates());
509    if (!isDataServer())
510    {
511      attributes.add("assured-sd-sent-updates", getAssuredSdSentUpdates());
512      attributes.add("assured-sd-sent-updates-timeout", getAssuredSdSentUpdatesTimeout());
513    } else
514    {
515      attributes.add("assured-sd-received-updates-timeout", getAssuredSdReceivedUpdatesTimeout());
516    }
517
518    // Window stats
519    attributes.add("max-send-window", sendWindowSize);
520    attributes.add("current-send-window", sendWindow.availablePermits());
521    attributes.add("max-rcv-window", maxRcvWindow);
522    attributes.add("current-rcv-window", rcvWindow);
523
524    // Encryption
525    attributes.add("ssl-encryption", session.isEncrypted());
526
527    // Data generation
528    attributes.add("generation-id", generationId);
529
530    return attributes;
531  }
532
  /**
   * Retrieves the name of this monitor provider.  It should be unique among all
   * monitor providers, including all instances of the same monitor provider.
   * <p>
   * Declared abstract here: each concrete server handler supplies its own
   * name.
   *
   * @return  The name of this monitor provider.
   */
  @Override
  public abstract String getMonitorInstanceName();
541
542  /**
543   * Gets the protocol version used with this remote server.
544   * @return The protocol version used with this remote server.
545   */
546  public short getProtocolVersion()
547  {
548    return session.getProtocolVersion();
549  }
550
551  /**
552   * Get the Server Id.
553   *
554   * @return the ID of the server to which this object is linked
555   */
556  public int getServerId()
557  {
558    return serverId;
559  }
560
561  /**
562   * Retrieves the URL for this server handler.
563   *
564   * @return  The URL for this server handler, in the form of an address and
565   *          port separated by a colon.
566   */
567  public String getServerURL()
568  {
569    return serverURL;
570  }
571
  /**
   * Returns the status of the remote server. Declared abstract here: each
   * concrete server handler provides the status.
   *
   * @return The server status.
   */
  protected abstract ServerStatus getStatus();
577
578  /**
579   * Increment the number of updates received from the server in assured safe
580   * data mode.
581   */
582  public void incrementAssuredSdReceivedUpdates()
583  {
584    assuredSdReceivedUpdates++;
585  }
586
587  /**
588   * Increment the number of updates received from the server in assured safe
589   * data mode that timed out.
590   */
591  public void incrementAssuredSdReceivedUpdatesTimeout()
592  {
593    assuredSdReceivedUpdatesTimeout.incrementAndGet();
594  }
595
596  /**
597   * Increment the number of updates sent to the server in assured safe data
598   * mode that timed out.
599   */
600  public void incrementAssuredSdSentUpdatesTimeout()
601  {
602    assuredSdSentUpdatesTimeout.incrementAndGet();
603  }
604
605  /**
606   * Increment the number of updates received from the server in assured safe
607   * read mode.
608   */
609  public void incrementAssuredSrReceivedUpdates()
610  {
611    assuredSrReceivedUpdates++;
612  }
613
614  /**
615   * Increment the number of updates received from the server in assured safe
616   * read mode that timed out.
617   */
618  public void incrementAssuredSrReceivedUpdatesTimeout()
619  {
620    assuredSrReceivedUpdatesTimeout.incrementAndGet();
621  }
622
623  /**
624   * Increment the number of updates sent to the server in assured safe read
625   * mode that timed out.
626   */
627  public void incrementAssuredSrSentUpdatesTimeout()
628  {
629    assuredSrSentUpdatesTimeout.incrementAndGet();
630  }
631
  /**
   * {@inheritDoc}
   * <p>
   * This implementation is intentionally empty: the provided configuration
   * is not used.
   */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
  throws ConfigException, InitializationException
  {
    // Nothing to do for now
  }
639
  /**
   * Check if the server associated to this ServerHandler is a data server
   * in the topology. Declared abstract here; {@link #isReplicationServer()}
   * is defined as the negation of this method.
   *
   * @return true if the server is a data server.
   */
  public abstract boolean isDataServer();
646
647  /**
648   * Check if the server associated to this ServerHandler is a replication
649   * server.
650   * @return true if the server is a replication server.
651   */
652  public boolean isReplicationServer()
653  {
654    return !isDataServer();
655  }
656
  // The handshake phase must be done while blocking any access to the
  // structures keeping info on connected servers, so that one can safely
  // check for the pre-existence of a server, send a coherent snapshot of the
  // known topology to peers, update the local view of the topology...
  //
  // For instance, one kind of problem could be that while we connect with a
  // peer RS, a DS is connecting at the same time and we could publish the
  // connected DSs to the peer RS, forgetting this last DS in the TopologyMsg.
  //
  // The locking methods below, and every other method that needs to read or
  // change the structures holding the topology for the domain, should:
  // - call ReplicationServerDomain.lock()
  // - read/modify the structures
  // - call ReplicationServerDomain.release()
  //
  // More information is provided in the comment of
  // ReplicationServerDomain.lock()
673
674  /**
675   * Lock the domain without a timeout.
676   * <p>
677   * If domain already exists, lock it until handshake is finished otherwise it
678   * will be created and locked later in the method
679   *
680   * @throws DirectoryException
681   *           When an exception occurs.
682   * @throws InterruptedException
683   *           If the current thread was interrupted while waiting for the lock.
684   */
685  public void lockDomainNoTimeout() throws DirectoryException,
686      InterruptedException
687  {
688    if (!replicationServerDomain.hasLock())
689    {
690      replicationServerDomain.lock();
691    }
692  }
693
694  /**
695   * Lock the domain with a timeout.
696   * <p>
697   * Take the lock on the domain. WARNING: Here we try to acquire the lock with
698   * a timeout. This is for preventing a deadlock that may happen if there are
699   * cross connection attempts (for same domain) from this replication server
700   * and from a peer one.
701   * <p>
702   * Here is the scenario:
703   * <ol>
704   * <li>RS1 connect thread takes the domain lock and starts connection to RS2
705   * </li>
706   * <li>at the same time RS2 connect thread takes his domain lock and start
707   * connection to RS2</li>
708   * <li>RS2 listen thread starts processing received ReplServerStartMsg from
709   * RS1 and wants to acquire the lock on the domain (here) but cannot as RS2
710   * connect thread already has it</li>
711   * <li>RS1 listen thread starts processing received ReplServerStartMsg from
712   * RS2 and wants to acquire the lock on the domain (here) but cannot as RS1
713   * connect thread already has it</li>
714   * </ol>
715   * => Deadlock: 4 threads are locked.
716   * <p>
717   * To prevent threads locking in such situation, the listen threads here will
718   * both timeout trying to acquire the lock. The random time for the timeout
719   * should allow on connection attempt to be aborted whereas the other one
720   * should have time to finish in the same time.
721   * <p>
722   * Warning: the minimum time (3s) should be big enough to allow normal
723   * situation connections to terminate. The added random time should represent
724   * a big enough range so that the chance to have one listen thread timing out
725   * a lot before the peer one is great. When the first listen thread times out,
726   * the remote connect thread should release the lock and allow the peer listen
727   * thread to take the lock it was waiting for and process the connection
728   * attempt.
729   *
730   * @throws DirectoryException
731   *           When an exception occurs.
732   * @throws InterruptedException
733   *           If the current thread was interrupted while waiting for the lock.
734   */
735  public void lockDomainWithTimeout() throws DirectoryException,
736      InterruptedException
737  {
738    final Random random = new Random();
739    final int randomTime = random.nextInt(6); // Random from 0 to 5
740    // Wait at least 3 seconds + (0 to 5 seconds)
741    final long timeout = 3000 + randomTime * 1000;
742    final boolean lockAcquired = replicationServerDomain.tryLock(timeout);
743    if (!lockAcquired)
744    {
745      LocalizableMessage message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
746          getBaseDN(), serverId, session.getReadableRemoteAddress(), getReplicationServerId());
747      throw new DirectoryException(ResultCode.OTHER, message);
748    }
749  }
750
751  /**
752   * Processes a routable message.
753   *
754   * @param msg The message to be processed.
755   */
756  void process(RoutableMsg msg)
757  {
758    if (logger.isTraceEnabled())
759    {
760      logger.trace("In "
761          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
762          + this + " processes routable msg received:" + msg);
763    }
764    replicationServerDomain.process(msg, this);
765  }
766
767  /**
768   * Responds to a monitor request message.
769   *
770   * @param msg
771   *          The monitor request message.
772   */
773  void processMonitorRequestMsg(MonitorRequestMsg msg)
774  {
775    replicationServerDomain.processMonitorRequestMsg(msg, this);
776  }
777
778  /**
779   * Responds to a monitor message.
780   *
781   * @param msg
782   *          The monitor message.
783   */
784  void processMonitorMsg(MonitorMsg msg)
785  {
786    replicationServerDomain.processMonitorMsg(msg, this);
787  }
788
789  /**
790   * Processes a change time heartbeat msg.
791   *
792   * @param msg
793   *          The message to be processed.
794   * @throws DirectoryException
795   *           When an exception is raised.
796   */
797  void process(ChangeTimeHeartbeatMsg msg) throws DirectoryException
798  {
799    if (logger.isTraceEnabled())
800    {
801      logger.trace("In "
802          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
803          + this + " processes received msg:\n" + msg);
804    }
805    replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg);
806  }
807
808  /**
809   * Process the reception of a WindowProbeMsg message.
810   *
811   * @throws IOException
812   *           When the session becomes unavailable.
813   */
814  public void replyToWindowProbe() throws IOException
815  {
816    if (rcvWindow > 0)
817    {
818      // The LDAP server believes that its window is closed while it is not,
819      // this means that some problem happened in the window exchange procedure!
820      // lets update the LDAP server with out current window size and hope
821      // that everything will work better in the future.
822      // TODO also log an error message.
823      session.publish(new WindowMsg(rcvWindow));
824    }
825    else
826    {
827      // Both the LDAP server and the replication server believes that the
828      // window is closed. Lets check the flowcontrol in case we
829      // can now resume operations and send a windowMessage if necessary.
830      checkWindow();
831    }
832  }
833
834  /**
835   * Sends the provided TopologyMsg to the peer server.
836   *
837   * @param topoMsg
838   *          The TopologyMsg message to be sent.
839   * @throws IOException
840   *           When it occurs while sending the message,
841   */
842  public void sendTopoInfo(TopologyMsg topoMsg) throws IOException
843  {
844    // V1 Rs do not support the TopologyMsg
845    if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
846    {
847      send(topoMsg);
848    }
849  }
850
  /**
   * Sets the generation id recorded for the remote server of this handler.
   * Only the field of this handler is updated.
   *
   * @param generationId The new generation ID
   */
  public void setGenerationId(long generationId)
  {
    this.generationId = generationId;
  }
861
862  /**
863   * Sets the window size when used when sending to the remote.
864   * @param size The provided window size.
865   */
866  protected void setSendWindowSize(int size)
867  {
868    this.sendWindowSize = size;
869  }
870
871  /**
872   * Shutdown This ServerHandler.
873   */
874  @Override
875  public void shutdown()
876  {
877    shutdownWriter = true;
878    setConsumerActive(false);
879    super.shutdown();
880
881    if (session != null)
882    {
883      session.close();
884    }
885    if (heartbeatThread != null)
886    {
887      heartbeatThread.shutdown();
888    }
889
890    DirectoryServer.deregisterMonitorProvider(this);
891
892    /*
893     * Be sure to wait for ServerWriter and ServerReader death
894     * It does not matter if we try to stop a thread which is us (reader
895     * or writer), but we must not wait for our own thread death.
896     */
897    try
898    {
899      if (writer != null && !Thread.currentThread().equals(writer))
900      {
901        writer.join(SHUTDOWN_JOIN_TIMEOUT);
902      }
903      if (reader != null && !Thread.currentThread().equals(reader))
904      {
905        reader.join(SHUTDOWN_JOIN_TIMEOUT);
906      }
907    } catch (InterruptedException e)
908    {
909      // don't try anymore to join and return.
910    }
911    if (logger.isTraceEnabled())
912    {
913      logger.trace("SH.shutdowned(" + this + ")");
914    }
915  }
916
917  /**
918   * Select the next update that must be sent to the server managed by this
919   * ServerHandler.
920   *
921   * @return the next update that must be sent to the server managed by this
922   *         ServerHandler.
923   * @throws ChangelogException
924   *            If a problem occurs when reading the changelog
925   */
926  public UpdateMsg take() throws ChangelogException
927  {
928    final UpdateMsg msg = getNextMessage();
929
930    if (!(msg instanceof ReplicaOfflineMsg))
931    {
932      acquirePermitInSendWindow();
933    }
934
935    if (msg != null)
936    {
937      incrementOutCount();
938      if (msg.isAssured())
939      {
940        incrementAssuredStats(msg);
941      }
942      return msg;
943    }
944    return null;
945  }
946
947  private void acquirePermitInSendWindow()
948  {
949    boolean acquired = false;
950    boolean interrupted = true;
951    do
952    {
953      try
954      {
955        acquired = sendWindow.tryAcquire(500, TimeUnit.MILLISECONDS);
956        interrupted = false;
957      } catch (InterruptedException e)
958      {
959        // loop until not interrupted
960      }
961    } while ((interrupted || !acquired) && !shutdownWriter);
962  }
963
964  private void incrementAssuredStats(final UpdateMsg msg)
965  {
966    if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
967    {
968      assuredSrSentUpdates++;
969    }
970    else if (!isDataServer())
971    {
972      assuredSdSentUpdates++;
973    }
974  }
975
976  /**
977   * Creates a RSInfo structure representing this remote RS.
978   * @return The RSInfo structure representing this remote RS
979   */
980  public RSInfo toRSInfo()
981  {
982    return new RSInfo(serverId, serverURL, generationId, groupId, weight);
983  }
984
985  /**
986   * Update the send window size based on the credit specified in the
987   * given window message.
988   *
989   * @param windowMsg The Window LocalizableMessage containing the information
990   *                  necessary for updating the window size.
991   */
992  public void updateWindow(WindowMsg windowMsg)
993  {
994    sendWindow.release(windowMsg.getNumAck());
995  }
996
997  /**
998   * Log the messages involved in the start handshake.
999   * @param inStartMsg The message received first.
1000   * @param outStartMsg The message sent in response.
1001   */
1002  protected void logStartHandshakeRCVandSND(
1003      StartMsg inStartMsg,
1004      StartMsg outStartMsg)
1005  {
1006    if (logger.isTraceEnabled())
1007    {
1008      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1009          + ", " + getClass().getSimpleName() + " " + this + ":"
1010          + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg
1011          + "\nAND REPLIED:\n" + outStartMsg);
1012    }
1013  }
1014
1015  /**
1016   * Log the messages involved in the start handshake.
1017   * @param outStartMsg The message sent first.
1018   * @param inStartMsg The message received in response.
1019   */
1020  protected void logStartHandshakeSNDandRCV(
1021      StartMsg outStartMsg,
1022      StartMsg inStartMsg)
1023  {
1024    if (logger.isTraceEnabled())
1025    {
1026      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1027          + ", " + getClass().getSimpleName() + " " + this + ":"
1028          + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n"
1029          + inStartMsg);
1030    }
1031  }
1032
1033  /**
1034   * Log the messages involved in the Topology handshake.
1035   * @param inTopoMsg The message received first.
1036   * @param outTopoMsg The message sent in response.
1037   */
1038  protected void logTopoHandshakeRCVandSND(
1039      TopologyMsg inTopoMsg,
1040      TopologyMsg outTopoMsg)
1041  {
1042    if (logger.isTraceEnabled())
1043    {
1044      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1045          + ", " + getClass().getSimpleName() + " " + this + ":"
1046          + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n"
1047          + outTopoMsg);
1048    }
1049  }
1050
1051  /**
1052   * Log the messages involved in the Topology handshake.
1053   * @param outTopoMsg The message sent first.
1054   * @param inTopoMsg The message received in response.
1055   */
1056  protected void logTopoHandshakeSNDandRCV(
1057      TopologyMsg outTopoMsg,
1058      TopologyMsg inTopoMsg)
1059  {
1060    if (logger.isTraceEnabled())
1061    {
1062      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1063          + ", " + getClass().getSimpleName() + " " + this + ":"
1064          + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n"
1065          + inTopoMsg);
1066    }
1067  }
1068
1069  /**
1070   * Log the messages involved in the Topology/StartSession handshake.
1071   * @param inStartSessionMsg The message received first.
1072   * @param outTopoMsg The message sent in response.
1073   */
1074  protected void logStartSessionHandshake(
1075      StartSessionMsg inStartSessionMsg,
1076      TopologyMsg outTopoMsg)
1077  {
1078    if (logger.isTraceEnabled())
1079    {
1080      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1081          + ", " + getClass().getSimpleName() + " " + this + " :"
1082          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg
1083          + "\nAND REPLIED:\n" + outTopoMsg);
1084    }
1085  }
1086
1087  /**
1088   * Log stop message has been received.
1089   */
1090  protected void logStopReceived()
1091  {
1092    if (logger.isTraceEnabled())
1093    {
1094      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1095          + ", " + getClass().getSimpleName() + " " + this + " :"
1096          + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
1097    }
1098  }
1099
1100  /**
1101   * Process a Ack message received.
1102   * @param ack the message received.
1103   */
1104  void processAck(AckMsg ack)
1105  {
1106    replicationServerDomain.processAck(ack, this);
1107  }
1108
1109  /**
1110   * Get the reference generation id (associated with the changes in the db).
1111   * @return the reference generation id.
1112   */
1113  public long getReferenceGenId()
1114  {
1115    return replicationServerDomain.getGenerationId();
1116  }
1117
1118  /**
1119   * Process a ResetGenerationIdMsg message received.
1120   * @param msg the message received.
1121   */
1122  void processResetGenId(ResetGenerationIdMsg msg)
1123  {
1124    replicationServerDomain.resetGenerationId(this, msg);
1125  }
1126
1127  /**
1128   * Put a new update message received.
1129   * @param update the update message received.
1130   * @throws IOException when it occurs.
1131   */
1132  public void put(UpdateMsg update) throws IOException
1133  {
1134    if (!(update instanceof ReplicaOfflineMsg))
1135    {
1136      decAndCheckWindow();
1137    }
1138    replicationServerDomain.put(update, this);
1139  }
1140
1141  /**
1142   * Stop this handler.
1143   */
1144  public void doStop()
1145  {
1146    replicationServerDomain.stopServer(this, false);
1147  }
1148
1149  /**
1150   * Creates a ReplServerStartMsg for the current ServerHandler.
1151   *
1152   * @return a new ReplServerStartMsg for the current ServerHandler.
1153   */
1154  protected ReplServerStartMsg createReplServerStartMsg()
1155  {
1156    return new ReplServerStartMsg(getReplicationServerId(),
1157        getReplicationServerURL(), getBaseDN(), maxRcvWindow,
1158        replicationServerDomain.getLatestServerState(), localGenerationId,
1159        sslEncryption, getLocalGroupId(),
1160        replicationServer.getDegradedStatusThreshold());
1161  }
1162
1163  /**
1164   * Returns a "badly disconnected" error message for this server handler.
1165   *
1166   * @return a "badly disconnected" error message for this server handler
1167   */
1168  public LocalizableMessage getBadlyDisconnectedErrorMessage()
1169  {
1170    if (isDataServer())
1171    {
1172      return ERR_DS_BADLY_DISCONNECTED.get(getReplicationServerId(),
1173          getServerId(), session.getReadableRemoteAddress(), getBaseDN());
1174    }
1175    return ERR_RS_BADLY_DISCONNECTED.get(getReplicationServerId(),
1176        getServerId(), session.getReadableRemoteAddress(), getBaseDN());
1177  }
1178}