001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2017 ForgeRock AS.
016 */
017package org.opends.server.replication.service;
018
019import java.io.IOException;
020import java.math.BigDecimal;
021import java.math.MathContext;
022import java.math.RoundingMode;
023import java.net.*;
024import java.util.*;
025import java.util.Map.Entry;
026import java.util.concurrent.ConcurrentSkipListMap;
027import java.util.concurrent.Semaphore;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.atomic.AtomicReference;
031
032import net.jcip.annotations.GuardedBy;
033import net.jcip.annotations.Immutable;
034
035import org.forgerock.i18n.LocalizableMessage;
036import org.forgerock.i18n.slf4j.LocalizedLogger;
037import org.forgerock.opendj.ldap.DN;
038import org.forgerock.util.Utils;
039import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
040import org.opends.server.core.DirectoryServer;
041import org.opends.server.replication.common.*;
042import org.opends.server.replication.plugin.MultimasterReplication;
043import org.opends.server.replication.plugin.MultimasterReplication.UnreachableReplicationServers;
044import org.opends.server.replication.protocol.ChangeStatusMsg;
045import org.opends.server.replication.protocol.MonitorMsg;
046import org.opends.server.replication.protocol.MonitorRequestMsg;
047import org.opends.server.replication.protocol.ProtocolVersion;
048import org.opends.server.replication.protocol.ReplServerStartDSMsg;
049import org.opends.server.replication.protocol.ReplServerStartMsg;
050import org.opends.server.replication.protocol.ReplSessionSecurity;
051import org.opends.server.replication.protocol.ReplicaOfflineMsg;
052import org.opends.server.replication.protocol.ReplicationMsg;
053import org.opends.server.replication.protocol.ServerStartMsg;
054import org.opends.server.replication.protocol.Session;
055import org.opends.server.replication.protocol.StartMsg;
056import org.opends.server.replication.protocol.StartSessionMsg;
057import org.opends.server.replication.protocol.StopMsg;
058import org.opends.server.replication.protocol.TopologyMsg;
059import org.opends.server.replication.protocol.UpdateMsg;
060import org.opends.server.replication.protocol.WindowMsg;
061import org.opends.server.replication.protocol.WindowProbeMsg;
062import org.opends.server.types.HostPort;
063
064import static org.opends.messages.ReplicationMessages.*;
065import static org.opends.server.replication.plugin.MultimasterReplication.getUnreachableReplicationServers;
066import static org.opends.server.replication.protocol.ProtocolVersion.*;
067import static org.opends.server.replication.server.ReplicationServer.*;
068import static org.opends.server.util.StaticUtils.*;
069
070/**
071 * The broker for Multi-master Replication.
072 */
073public class ReplicationBroker
074{
075
076  /**
077   * Immutable class containing information about whether the broker is
078   * connected to an RS and data associated to this connected RS.
079   */
080  @Immutable
081  private static final class ConnectedRS
082  {
083
084    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
085        NO_CONNECTED_SERVER);
086
087    /** The info of the RS we are connected to. */
088    private final ReplicationServerInfo rsInfo;
089    /** Contains a connected session to the RS if any exist, null otherwise. */
090    private final Session session;
091    private final String replicationServer;
092
093    private ConnectedRS(String replicationServer)
094    {
095      this.rsInfo = null;
096      this.session = null;
097      this.replicationServer = replicationServer;
098    }
099
100    private ConnectedRS(ReplicationServerInfo rsInfo, Session session)
101    {
102      this.rsInfo = rsInfo;
103      this.session = session;
104      this.replicationServer = session != null ?
105          session.getReadableRemoteAddress()
106          : NO_CONNECTED_SERVER;
107    }
108
109    private static ConnectedRS stopped()
110    {
111      return new ConnectedRS("stopped");
112    }
113
114    private static ConnectedRS noConnectedRS()
115    {
116      return NO_CONNECTED_RS;
117    }
118
119    public int getServerId()
120    {
121      return rsInfo != null ? rsInfo.getServerId() : -1;
122    }
123
124    private byte getGroupId()
125    {
126      return rsInfo != null ? rsInfo.getGroupId() : -1;
127    }
128
129    private boolean isConnected()
130    {
131      return session != null;
132    }
133
134    /** {@inheritDoc} */
135    @Override
136    public String toString()
137    {
138      final StringBuilder sb = new StringBuilder();
139      toString(sb);
140      return sb.toString();
141    }
142
143    public void toString(StringBuilder sb)
144    {
145      sb.append("connected=").append(isConnected()).append(", ");
146      if (!isConnected())
147      {
148        sb.append("no connectedRS");
149      }
150      else
151      {
152        sb.append("connectedRS(serverId=").append(rsInfo.getServerId())
153          .append(", serverUrl=").append(rsInfo.getServerURL())
154          .append(", groupId=").append(rsInfo.getGroupId())
155          .append(")");
156      }
157    }
158
159  }
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /**
   * Whether the broker is stopped. Set to true by the constructor and reset to
   * false by {@link #start()}, which does nothing unless it is true.
   */
  private volatile boolean shutdown;
  /** Lock serializing {@link #start()} (presumably also the stop path, not visible here). */
  private final Object startStopLock = new Object();
  /**
   * The replication domain configuration: source of the server id, base DN,
   * group id and replication server URLs. NOTE(review): volatile, presumably
   * because it can be replaced on configuration change — confirm.
   */
  private volatile ReplicationDomainCfg config;
  /**
   * String reported as the replication server in monitoring data (presumably
   * under cn=monitor) when there is no connected RS.
   */
  static final String NO_CONNECTED_SERVER = "Not connected";
  /** The ServerState used by this broker when negotiating sessions with replication servers. */
  private final ServerState state;
  /** Send flow-control window, recreated with {@code maxSendWindow} permits on each connection. */
  private Semaphore sendWindow;
  /** Send window size advertised by the RS this broker connected to. */
  private int maxSendWindow;
  /**
   * Current receive window. The initial value is overwritten by the
   * constructor, then reset to the configured maximum on start and on every
   * connection (presumably decremented as updates arrive — not visible here).
   */
  private int rcvWindow = 100;
  /**
   * Half of the configured receive window, computed in the constructor.
   * NOTE(review): presumably the threshold for sending a WindowMsg — confirm.
   */
  private int halfRcvWindow = rcvWindow / 2;
  /** NOTE(review): not used in the visible code; presumably a session timeout — confirm units. */
  private int timeout;
  /** The session security configuration passed to the constructor. */
  private final ReplSessionSecurity replSessionSecurity;
  /**
   * The RS this DS is currently connected to.
   * <p>
   * Always use {@link #setConnectedRS(ConnectedRS)} to set a new
   * connected RS.
   */
  // @NotNull // for the reference
  private final AtomicReference<ConnectedRS> connectedRS = new AtomicReference<>(ConnectedRS.noConnectedRS());
  /** Our replication domain. */
  private final ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final AtomicBoolean monitorResponse = new AtomicBoolean(false);
  /**
   * A Map containing the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled or the last
   * time it published monitoring information.
   */
  private Map<Integer, ServerState> replicaStates = new HashMap<>();
  /** A thread to monitor heartbeats on the session. */
  private HeartbeatMonitor heartbeatMonitor;
  /** The number of times the connection was lost. */
  private int numLostConnections;
  /**
   * When the broker cannot connect to any replication server, it logs an
   * error and keeps retrying every second.
   * This flag is set when the first failure happens. It is used to avoid
   * repeating the error message on further connection failures, and to know
   * that a new message must be logged when the broker finally succeeds in
   * connecting.
   */
  private volatile boolean connectionError;
  /**
   * Lock held by connectAsDataServer() for the whole connection phase. It is
   * notified once a connection is established, or when the first connection
   * failure in a row is reported.
   */
  private final Object connectPhaseLock = new Object();
  /**
   * The thread that publishes messages to the RS containing the current
   * change time of this DS.
   */
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
  /*
   * Properties for the last topology info received from the network.
   */
  /** Contains the last known state of the replication topology. */
  private final AtomicReference<Topology> topology = new AtomicReference<>(new Topology());
  /** NOTE(review): presumably a count of processed updates; guarded by this instance's monitor. */
  @GuardedBy("this")
  private volatile int updateDoneCount;
  /** NOTE(review): presumably forces a recovery step on the next connect — confirm in code not shown. */
  private volatile boolean connectRequiresRecovery;

  /**
   * This integer defines when the best replication server checking algorithm
   * should be engaged.
   * Every time a monitoring message (each monitoring publisher period) is
   * received, it is incremented. When it reaches 2, we run the checking
   * algorithm to see if we must reconnect to another best replication server.
   * Then we reset the value to 0. But when a topology message is received, the
   * integer is reset to 0. This ensures that we wait at least one monitoring
   * publisher period before running the algorithm, but also that we wait at
   * least for a monitoring period after the last received topology message
   * (topology stabilization).
   */
  private int mustRunBestServerCheckingAlgorithm;

  /**
   * The monitor provider for this replication domain.
   * <p>
   * The name of the monitor includes the local address and must therefore be
   * re-registered every time the session is re-established or destroyed. The
   * monitor provider can only be created (i.e. non-null) if there is a
   * replication domain, which is not the case in unit tests.
   */
  private final ReplicationMonitor monitor;
247
248  /**
249   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
250   *
251   * @param replicationDomain The replication domain that is creating us.
252   * @param state The ServerState that should be used by this broker
253   *        when negotiating the session with the replicationServer.
254   * @param config The configuration to use.
255   * @param replSessionSecurity The session security configuration.
256   */
257  public ReplicationBroker(ReplicationDomain replicationDomain,
258      ServerState state, ReplicationDomainCfg config,
259      ReplSessionSecurity replSessionSecurity)
260  {
261    this.domain = replicationDomain;
262    this.state = state;
263    this.config = config;
264    this.replSessionSecurity = replSessionSecurity;
265    this.rcvWindow = getMaxRcvWindow();
266    this.halfRcvWindow = rcvWindow / 2;
267    this.shutdown = true;
268
269    /*
270     * Only create a monitor if there is a replication domain (this is not the
271     * case in some unit tests).
272     */
273    this.monitor = replicationDomain != null ? new ReplicationMonitor(
274        replicationDomain) : null;
275    registerReplicationMonitor();
276  }
277
278  /**
279   * Start the ReplicationBroker.
280   */
281  public void start()
282  {
283    synchronized (startStopLock)
284    {
285      if (!shutdown)
286      {
287        return;
288      }
289      shutdown = false;
290      this.rcvWindow = getMaxRcvWindow();
291      connectAsDataServer();
292    }
293  }
294
295  /**
296   * Gets the group id of the RS we are connected to.
297   * @return The group id of the RS we are connected to
298   */
299  public byte getRsGroupId()
300  {
301    return connectedRS.get().getGroupId();
302  }
303
304  /**
305   * Gets the server id of the RS we are connected to.
306   * @return The server id of the RS we are connected to
307   */
308  public int getRsServerId()
309  {
310    return connectedRS.get().getServerId();
311  }
312
313  /**
314   * Gets the server id.
315   * @return The server id
316   */
317  public int getServerId()
318  {
319    return config.getServerId();
320  }
321
322  private DN getBaseDN()
323  {
324    return config.getBaseDN();
325  }
326
327  private Set<String> getReplicationServerUrls()
328  {
329    return config.getReplicationServer();
330  }
331
332  private byte getGroupId()
333  {
334    return (byte) config.getGroupId();
335  }
336
337  /**
338   * Gets the server id.
339   * @return The server id
340   */
341  private long getGenerationID()
342  {
343    return domain.getGenerationID();
344  }
345
346  /**
347   * Set the generation id - for test purpose.
348   * @param generationID The generation id
349   */
350  public void setGenerationID(long generationID)
351  {
352    domain.setGenerationID(generationID);
353  }
354
355  /**
356   * Compares 2 replication servers addresses and returns true if they both
357   * represent the same replication server instance.
358   * @param rs1Url Replication server 1 address
359   * @param rs2Url Replication server 2 address
360   * @return True if both replication server addresses represent the same
361   * replication server instance, false otherwise.
362   */
363  private static boolean isSameReplicationServerUrl(String rs1Url,
364      String rs2Url)
365  {
366    try
367    {
368      final HostPort hp1 = HostPort.valueOf(rs1Url);
369      final HostPort hp2 = HostPort.valueOf(rs2Url);
370      return hp1.isEquivalentTo(hp2);
371    }
372    catch (RuntimeException ex)
373    {
374      // Not a RS url or not a valid port number: should not happen
375      return false;
376    }
377  }
378
379  /**
380   * Bag class for keeping info we get from a replication server in order to
381   * compute the best one to connect to. This is in fact a wrapper to a
382   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be
383   * updated with a info coming from received topology messages or monitoring
384   * messages.
385   */
386  static class ReplicationServerInfo
387  {
388    private RSInfo rsInfo;
389    private final short protocolVersion;
390    private final DN baseDN;
391    private final int windowSize;
392    // @NotNull
393    private final ServerState serverState;
394    private final boolean sslEncryption;
395    private final int degradedStatusThreshold;
396    /** Keeps the 0 value if created with a ReplServerStartMsg. */
397    private int connectedDSNumber;
398    // @NotNull
399    private Set<Integer> connectedDSs;
400    /**
401     * Is this RS locally configured? (the RS is recognized as a usable server).
402     */
403    private boolean locallyConfigured = true;
404
405    /**
406     * Create a new instance of ReplicationServerInfo wrapping the passed
407     * message.
408     * @param msg LocalizableMessage to wrap.
409     * @param newServerURL Override serverURL.
410     * @return The new instance wrapping the passed message.
411     * @throws IllegalArgumentException If the passed message has an unexpected
412     *                                  type.
413     */
414    private static ReplicationServerInfo newInstance(
415      ReplicationMsg msg, String newServerURL) throws IllegalArgumentException
416    {
417      final ReplicationServerInfo rsInfo = newInstance(msg);
418      rsInfo.setServerURL(newServerURL);
419      return rsInfo;
420    }
421
422    /**
423     * Create a new instance of ReplicationServerInfo wrapping the passed
424     * message.
425     * @param msg LocalizableMessage to wrap.
426     * @return The new instance wrapping the passed message.
427     * @throws IllegalArgumentException If the passed message has an unexpected
428     *                                  type.
429     */
430    static ReplicationServerInfo newInstance(ReplicationMsg msg)
431        throws IllegalArgumentException
432    {
433      if (msg instanceof ReplServerStartMsg)
434      {
435        // RS uses protocol V3 or lower
436        return new ReplicationServerInfo((ReplServerStartMsg) msg);
437      }
438      else if (msg instanceof ReplServerStartDSMsg)
439      {
440        // RS uses protocol V4 or higher
441        return new ReplicationServerInfo((ReplServerStartDSMsg) msg);
442      }
443
444      // Unsupported message type: should not happen
445      throw new IllegalArgumentException("Unexpected PDU type: "
446          + msg.getClass().getName() + ":\n" + msg);
447    }
448
449    /**
450     * Constructs a ReplicationServerInfo object wrapping a
451     * {@link ReplServerStartMsg}.
452     *
453     * @param msg
454     *          The {@link ReplServerStartMsg} this object will wrap.
455     */
456    private ReplicationServerInfo(ReplServerStartMsg msg)
457    {
458      this.protocolVersion = msg.getVersion();
459      this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
460          msg.getGenerationId(), msg.getGroupId(), 1);
461      this.baseDN = msg.getBaseDN();
462      this.windowSize = msg.getWindowSize();
463      final ServerState ss = msg.getServerState();
464      this.serverState = ss != null ? ss : new ServerState();
465      this.sslEncryption = msg.getSSLEncryption();
466      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
467    }
468
469    /**
470     * Constructs a ReplicationServerInfo object wrapping a
471     * {@link ReplServerStartDSMsg}.
472     *
473     * @param msg
474     *          The {@link ReplServerStartDSMsg} this object will wrap.
475     */
476    private ReplicationServerInfo(ReplServerStartDSMsg msg)
477    {
478      this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
479          msg.getGenerationId(), msg.getGroupId(), msg.getWeight());
480      this.protocolVersion = msg.getVersion();
481      this.baseDN = msg.getBaseDN();
482      this.windowSize = msg.getWindowSize();
483      final ServerState ss = msg.getServerState();
484      this.serverState = ss != null ? ss : new ServerState();
485      this.sslEncryption = msg.getSSLEncryption();
486      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
487      this.connectedDSNumber = msg.getConnectedDSNumber();
488    }
489
490    /**
491     * Constructs a new replication server info with the passed RSInfo internal
492     * values and the passed connected DSs.
493     *
494     * @param rsInfo
495     *          The RSinfo to use for the update
496     * @param connectedDSs
497     *          The new connected DSs
498     */
499    ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs)
500    {
501      this.rsInfo =
502          new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(), rsInfo
503              .getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
504      this.protocolVersion = 0;
505      this.baseDN = null;
506      this.windowSize = 0;
507      this.connectedDSs = connectedDSs;
508      this.connectedDSNumber = connectedDSs.size();
509      this.sslEncryption = false;
510      this.degradedStatusThreshold = -1;
511      this.serverState = new ServerState();
512    }
513
514    /**
515     * Get the server state.
516     * @return The server state
517     */
518    public ServerState getServerState()
519    {
520      return serverState;
521    }
522
523    /**
524     * Get the group id.
525     * @return The group id
526     */
527    public byte getGroupId()
528    {
529      return rsInfo.getGroupId();
530    }
531
532    /**
533     * Get the server protocol version.
534     * @return the protocolVersion
535     */
536    public short getProtocolVersion()
537    {
538      return protocolVersion;
539    }
540
541    /**
542     * Get the generation id.
543     * @return the generationId
544     */
545    public long getGenerationId()
546    {
547      return rsInfo.getGenerationId();
548    }
549
550    /**
551     * Get the server id.
552     * @return the serverId
553     */
554    public int getServerId()
555    {
556      return rsInfo.getId();
557    }
558
559    /**
560     * Get the server URL.
561     * @return the serverURL
562     */
563    public String getServerURL()
564    {
565      return rsInfo.getServerUrl();
566    }
567
568    /**
569     * Get the base DN.
570     *
571     * @return the base DN
572     */
573    public DN getBaseDN()
574    {
575      return baseDN;
576    }
577
578    /**
579     * Get the window size.
580     * @return the windowSize
581     */
582    public int getWindowSize()
583    {
584      return windowSize;
585    }
586
587    /**
588     * Get the ssl encryption.
589     * @return the sslEncryption
590     */
591    public boolean isSslEncryption()
592    {
593      return sslEncryption;
594    }
595
596    /**
597     * Get the degraded status threshold.
598     * @return the degradedStatusThreshold
599     */
600    public int getDegradedStatusThreshold()
601    {
602      return degradedStatusThreshold;
603    }
604
605    /**
606     * Get the weight.
607     * @return the weight. Null if this object is a wrapper for
608     * a ReplServerStartMsg.
609     */
610    public int getWeight()
611    {
612      return rsInfo.getWeight();
613    }
614
615    /**
616     * Get the connected DS number.
617     * @return the connectedDSNumber. Null if this object is a wrapper for
618     * a ReplServerStartMsg.
619     */
620    public int getConnectedDSNumber()
621    {
622      return connectedDSNumber;
623    }
624
625    /**
626     * Converts the object to a RSInfo object.
627     * @return The RSInfo object matching this object.
628     */
629    RSInfo toRSInfo()
630    {
631      return rsInfo;
632    }
633
634    /**
635     * Updates replication server info with the passed RSInfo internal values
636     * and the passed connected DSs.
637     * @param rsInfo The RSinfo to use for the update
638     * @param connectedDSs The new connected DSs
639     */
640    private void update(RSInfo rsInfo, Set<Integer> connectedDSs)
641    {
642      this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(),
643          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
644      this.connectedDSs = connectedDSs;
645      this.connectedDSNumber = connectedDSs.size();
646    }
647
648    private void setServerURL(String newServerURL)
649    {
650      rsInfo = new RSInfo(rsInfo.getId(), newServerURL,
651          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
652    }
653
654    /**
655     * Updates replication server info with the passed server state.
656     * @param serverState The ServerState to use for the update
657     */
658    private void update(ServerState serverState)
659    {
660      this.serverState.update(serverState);
661    }
662
663    /**
664     * Get the getConnectedDSs.
665     * @return the getConnectedDSs
666     */
667    public Set<Integer> getConnectedDSs()
668    {
669      return connectedDSs;
670    }
671
672    /**
673     * Gets the locally configured status for this RS.
674     * @return the locallyConfigured
675     */
676    public boolean isLocallyConfigured()
677    {
678      return locallyConfigured;
679    }
680
681    /**
682     * Sets the locally configured status for this RS.
683     * @param locallyConfigured the locallyConfigured to set
684     */
685    public void setLocallyConfigured(boolean locallyConfigured)
686    {
687      this.locallyConfigured = locallyConfigured;
688    }
689
690    /**
691     * Returns a string representation of this object.
692     * @return A string representation of this object.
693     */
694    @Override
695    public String toString()
696    {
697      return "ReplServerInfo Url:" + getServerURL()
698          + " ServerId:" + getServerId()
699          + " GroupId:" + getGroupId()
700          + " connectedDSs:" + connectedDSs;
701    }
702  }
703
704  /**
705   * Contacts all replication servers to get information from them and being
706   * able to choose the more suitable.
707   * @return the collected information.
708   */
709  private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
710  {
711    final Map<Integer, ReplicationServerInfo> rsInfos = new ConcurrentSkipListMap<>();
712
713    final UnreachableReplicationServers unreachableReplicationServers = getUnreachableReplicationServers();
714    for (String serverUrl : getReplicationServerUrls())
715    {
716      if (unreachableReplicationServers.isUnreachable(HostPort.valueOf(serverUrl)))
717      {
718        continue;
719      }
720      // Connect to server + get and store info about it
721      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false);
722      final ReplicationServerInfo rsInfo = rs.rsInfo;
723      if (rsInfo != null)
724      {
725        rsInfos.put(rsInfo.getServerId(), rsInfo);
726      }
727    }
728
729    return rsInfos;
730  }
731
  /**
   * Connects to the most suitable ReplicationServer.
   *
   * The handshake sequence between a DS and an RS is divided into 2 logical
   * consecutive phases (phase 1 and phase 2). The DS always initiates the
   * connection and always sends the first message:
   *
   * DS<->RS:
   * -------
   *
   * phase 1:
   * DS --- ServerStartMsg ---> RS
   * DS <--- ReplServerStartDSMsg --- RS
   * phase 2:
   * DS --- StartSessionMsg ---> RS
   * DS <--- TopologyMsg --- RS
   *
   * Before performing a full handshake sequence, the DS searches for the most
   * suitable RS by performing only the phase 1 handshake with every RS it
   * knows, then closing the connection. This gathers information on the
   * available RSs, used to decide with which RS the full handshake (phase 1
   * then phase 2) is finally performed.
   *
   * Afterwards, the outcome is logged. connectPhaseLock is notified when a
   * connection is established, or on the first failure in a row.
   *
   * @throws NumberFormatException address was invalid. NOTE(review): not
   *           thrown directly by the visible code; presumably it comes from
   *           parsing a configured RS address — confirm.
   */
  private void connectAsDataServer()
  {
    /*
     * We go through here on the first connection and after a connection
     * failure. Force the status machine to NOT_CONNECTED_STATUS so that
     * monitoring can see that we are not connected.
     */
    domain.toNotConnectedStatus();

    /*
    Stop any existing heartbeat monitor and changeTime publisher
    from a previous session.
    */
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    mustRunBestServerCheckingAlgorithm = 0;

    // The whole connection phase runs under connectPhaseLock; the notify()
    // calls below wake one thread waiting on it, if any.
    synchronized (connectPhaseLock)
    {
      final int serverId = getServerId();
      final DN baseDN = getBaseDN();

      /*
       * Connect to each replication server and get their ServerState then find
       * out which one is the best to connect to.
       */
      if (logger.isTraceEnabled())
      {
        debugInfo("phase 1 : will perform PhaseOneH with each RS in order to elect the preferred one");
      }

      // Get info from every available replication server, and refresh the
      // known topology with it (even when no server answered).
      Map<Integer, ReplicationServerInfo> rsInfos = collectReplicationServersInfo();
      computeNewTopology(toRSInfos(rsInfos));

      if (rsInfos.isEmpty())
      {
        setConnectedRS(ConnectedRS.noConnectedRS());
      }
      else
      {
        // At least one server answered, find the best one.
        // NOTE(review): presumably true = first connection and -1 = no
        // current RS server id — confirm against computeBestReplicationServer.
        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
            rsInfos, serverId, getGroupId(), getGenerationID());

        // Best found, now initialize connection to this one (handshake phase 1).
        // This second phase 1 exchange (true = keep the session, presumably)
        // also refreshes the elected RS data collected earlier.
        if (logger.isTraceEnabled())
        {
          debugInfo("phase 2 : will perform PhaseOneH with the preferred RS=" + evals.getBestRS());
        }

        final ConnectedRS electedRS = performPhaseOneHandshake(
            evals.getBestRS().getServerURL(), true);
        final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
        if (electedRsInfo != null)
        {
          /*
          Update replication server info with potentially more up to date
          data (server state for instance may have changed)
          */
          rsInfos.put(electedRsInfo.getServerId(), electedRsInfo);

          // Handshake phase 1 exchange went well

          // Compute in which status we are starting the session to tell the RS
          final ServerStatus initStatus = computeInitialServerStatus(
              electedRsInfo.getGenerationId(), electedRsInfo.getServerState(),
              electedRsInfo.getDegradedStatusThreshold(), getGenerationID());

          // Perform session start (handshake phase 2); a null result means it failed
          final TopologyMsg topologyMsg =
              performPhaseTwoHandshake(electedRS, initStatus);

          if (topologyMsg != null) // Handshake phase 2 exchange went well
          {
            connectToReplicationServer(electedRS, initStatus, topologyMsg);
          } // Could perform handshake phase 2 with best
        } // Could perform handshake phase 1 with best
      }

      // connectedRS has been updated by calls above, reload it
      final ConnectedRS rs = connectedRS.get();
      if (rs.isConnected())
      {
        connectPhaseLock.notify();

        // Report whether the RS generation id matches ours (-1 is accepted too)
        final long rsGenId = rs.rsInfo.getGenerationId();
        final int rsServerId = rs.rsInfo.getServerId();
        if (rsGenId == getGenerationID() || rsGenId == -1)
        {
          logger.info(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG, serverId, rsServerId, baseDN,
              rs.replicationServer, getGenerationID());
        }
        else
        {
          logger.warn(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG, serverId, rsServerId, baseDN,
              rs.replicationServer, getGenerationID(), rsGenId);
        }
      }
      else
      {
        // This server could not find any replicationServer.
        // It's going to start in degraded mode. Log a message, but only for
        // the first failure in a row: connectionError is reset by
        // connectToReplicationServer() on a later successful connection.
        if (!connectionError)
        {
          connectionError = true;
          connectPhaseLock.notify();

          // Servers answered phase 1 but no session could be established,
          // versus no server answered at all.
          if (!rsInfos.isEmpty())
          {
            logger.warn(WARN_COULD_NOT_FIND_CHANGELOG, serverId, baseDN,
                Utils.joinAsString(", ", rsInfos.keySet()));
          }
          else
          {
            logger.warn(WARN_NO_AVAILABLE_CHANGELOGS, serverId, baseDN);
          }
        }
      }
    }
  }
878
879  private void computeNewTopology(List<RSInfo> newRSInfos)
880  {
881    final int rsServerId = getRsServerId();
882
883    Topology oldTopo;
884    Topology newTopo;
885    do
886    {
887      oldTopo = topology.get();
888      newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(),
889          rsServerId, getReplicationServerUrls(), oldTopo.rsInfos);
890    }
891    while (!topology.compareAndSet(oldTopo, newTopo));
892
893    if (logger.isTraceEnabled())
894    {
895      debugInfo(topologyChange(rsServerId, oldTopo, newTopo));
896    }
897  }
898
899  private StringBuilder topologyChange(int rsServerId, Topology oldTopo,
900      Topology newTopo)
901  {
902    final StringBuilder sb = new StringBuilder();
903    sb.append("rsServerId=").append(rsServerId);
904    if (newTopo.equals(oldTopo))
905    {
906      sb.append(", unchangedTopology=").append(newTopo);
907    }
908    else
909    {
910      sb.append(", oldTopology=").append(oldTopo);
911      sb.append(", newTopology=").append(newTopo);
912    }
913    return sb;
914  }
915
916  /**
917   * Connects to a replication server.
918   *
919   * @param rs
920   *          the Replication Server to connect to
921   * @param initStatus
922   *          The status to enter the state machine with
923   * @param topologyMsg
924   *          the message containing the topology information
925   */
926  private void connectToReplicationServer(ConnectedRS rs,
927      ServerStatus initStatus, TopologyMsg topologyMsg)
928  {
929    final DN baseDN = getBaseDN();
930    final ReplicationServerInfo rsInfo = rs.rsInfo;
931
932    boolean connectCompleted = false;
933    try
934    {
935      maxSendWindow = rsInfo.getWindowSize();
936
937      receiveTopo(topologyMsg, rs.getServerId());
938
939      /*
940      Log a message to let the administrator know that the failure was resolved.
941      Wake up all the thread that were waiting on the window
942      on the previous connection.
943      */
944      connectionError = false;
945      if (sendWindow != null)
946      {
947        /*
948         * Fix (hack) for OPENDJ-401: we want to ensure that no threads holding
949         * this semaphore will get blocked when they acquire it. However, we
950         * also need to make sure that we don't overflow the semaphore by
951         * releasing too many permits.
952         */
953        final int MAX_PERMITS = Integer.MAX_VALUE >>> 2;
954        if (sendWindow.availablePermits() < MAX_PERMITS)
955        {
956          /*
957           * At least 2^29 acquisitions would need to occur for this to be
958           * insufficient. In addition, at least 2^30 releases would need to
959           * occur for this to potentially overflow. Hopefully this is unlikely
960           * to happen.
961           */
962          sendWindow.release(MAX_PERMITS);
963        }
964      }
965      sendWindow = new Semaphore(maxSendWindow);
966      rcvWindow = getMaxRcvWindow();
967
968      domain.sessionInitiated(initStatus, rsInfo.getServerState());
969
970      final byte groupId = getGroupId();
971      if (rs.getGroupId() != groupId)
972      {
973        /*
974        Connected to replication server with wrong group id:
975        warn user and start heartbeat monitor to recover when a server
976        with the right group id shows up.
977        */
978        logger.warn(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID,
979            groupId, rs.getServerId(), rsInfo.getServerURL(), rs.getGroupId(), baseDN, getServerId());
980      }
981      startRSHeartBeatMonitoring(rs);
982      if (rsInfo.getProtocolVersion() >=
983        ProtocolVersion.REPLICATION_PROTOCOL_V3)
984      {
985        startChangeTimeHeartBeatPublishing(rs);
986      }
987      connectCompleted = true;
988    }
989    catch (Exception e)
990    {
991      logger.error(ERR_COMPUTING_FAKE_OPS, baseDN, rsInfo.getServerURL(),
992          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
993    }
994    finally
995    {
996      if (!connectCompleted)
997      {
998        setConnectedRS(ConnectedRS.noConnectedRS());
999      }
1000    }
1001  }
1002
1003  /**
1004   * Determines the status we are starting with according to our state and the
1005   * RS state.
1006   *
1007   * @param rsGenId The generation id of the RS
1008   * @param rsState The server state of the RS
1009   * @param degradedStatusThreshold The degraded status threshold of the RS
1010   * @param dsGenId The local generation id
1011   * @return The initial status
1012   */
1013  private ServerStatus computeInitialServerStatus(long rsGenId,
1014    ServerState rsState, int degradedStatusThreshold, long dsGenId)
1015  {
1016    if (rsGenId == -1)
1017    {
1018      // RS has no generation id
1019      return ServerStatus.NORMAL_STATUS;
1020    }
1021    else if (rsGenId != dsGenId)
1022    {
1023      // DS and RS do not have same generation id
1024      return ServerStatus.BAD_GEN_ID_STATUS;
1025    }
1026    else
1027    {
1028      /*
1029      DS and RS have same generation id
1030
1031      Determine if we are late or not to replay changes. RS uses a
1032      threshold value for pending changes to be replayed by a DS to
1033      determine if the DS is in normal status or in degraded status.
1034      Let's compare the local and remote server state using  this threshold
1035      value to determine if we are late or not
1036      */
1037
1038      int nChanges = ServerState.diffChanges(rsState, state);
1039      if (logger.isTraceEnabled())
1040      {
1041        debugInfo("computed " + nChanges + " changes late.");
1042      }
1043
1044      /*
1045      Check status to know if it is relevant to change the status. Do not
1046      take RSD lock to test. If we attempt to change the status whereas
1047      we are in a status that do not allows that, this will be noticed by
1048      the changeStatusFromStatusAnalyzer method. This allows to take the
1049      lock roughly only when needed versus every sleep time timeout.
1050      */
1051      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
1052      {
1053        return ServerStatus.DEGRADED_STATUS;
1054      }
1055      // degradedStatusThreshold value of '0' means no degrading system used
1056      // (no threshold): force normal status
1057      return ServerStatus.NORMAL_STATUS;
1058    }
1059  }
1060
1061
1062
1063  /**
1064   * Connect to the provided server performing the first phase handshake (start
1065   * messages exchange) and return the reply message from the replication
1066   * server, wrapped in a ReplicationServerInfo object.
1067   *
1068   * @param serverURL
1069   *          Server to connect to.
1070   * @param keepSession
1071   *          Do we keep session opened or not after handshake. Use true if want
1072   *          to perform handshake phase 2 with the same session and keep the
1073   *          session to create as the current one.
1074   * @return The answer from the server . Null if could not get an answer.
1075   */
1076  private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
1077  {
1078    Session newSession = null;
1079    Socket socket = null;
1080    boolean hasConnected = false;
1081    LocalizableMessage errorMessage = null;
1082
1083    try
1084    {
1085      // Open a socket connection to the next candidate.
1086      socket = new Socket();
1087      socket.setReceiveBufferSize(1000000);
1088      socket.setTcpNoDelay(true);
1089      if (config.getSourceAddress() != null)
1090      {
1091        InetSocketAddress local = new InetSocketAddress(config.getSourceAddress(), 0);
1092        socket.bind(local);
1093      }
1094      int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
1095      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS);
1096      newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
1097      boolean isSslEncryption = replSessionSecurity.isSslEncryption();
1098
1099      // Send our ServerStartMsg.
1100      final HostPort hp = new HostPort(
1101          socket.getLocalAddress().getHostName(), socket.getLocalPort());
1102      final String url = hp.toString();
1103      final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
1104          getMaxRcvWindow(), config.getHeartbeatInterval(), state,
1105          getGenerationID(), isSslEncryption, getGroupId());
1106      newSession.publish(serverStartMsg);
1107
1108      // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
1109      // come back.
1110      ReplicationMsg msg = newSession.receive();
1111      if (logger.isTraceEnabled())
1112      {
1113        debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
1114            + msg);
1115      }
1116
1117      // Wrap received message in a server info object
1118      final ReplicationServerInfo replServerInfo =
1119          ReplicationServerInfo.newInstance(msg, serverURL);
1120
1121      // Sanity check
1122      final DN repDN = replServerInfo.getBaseDN();
1123      if (!getBaseDN().equals(repDN))
1124      {
1125        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDN, getBaseDN());
1126        return setConnectedRS(ConnectedRS.noConnectedRS());
1127      }
1128
1129      /*
1130       * We have sent our own protocol version to the replication server. The
1131       * replication server will use the same one (or an older one if it is an
1132       * old replication server).
1133       */
1134      newSession.setProtocolVersion(
1135          getCompatibleVersion(replServerInfo.getProtocolVersion()));
1136
1137      if (!isSslEncryption)
1138      {
1139        newSession.stopEncryption();
1140      }
1141
1142      hasConnected = true;
1143
1144      if (keepSession)
1145      {
1146        // cannot store it yet,
1147        // only store after a successful phase two handshake
1148        return new ConnectedRS(replServerInfo, newSession);
1149      }
1150      return new ConnectedRS(replServerInfo, null);
1151    }
1152    catch (ConnectException e)
1153    {
1154      logger.traceException(e);
1155      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(), serverURL, getBaseDN());
1156    }
1157    catch (SocketTimeoutException e)
1158    {
1159      logger.traceException(e);
1160      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(), serverURL, getBaseDN());
1161    }
1162    catch (Exception e)
1163    {
1164      logger.traceException(e);
1165      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
1166          getServerId(), serverURL, getBaseDN(), stackTraceToSingleLineString(e));
1167    }
1168    finally
1169    {
1170      if (!hasConnected || !keepSession)
1171      {
1172        close(newSession);
1173        close(socket);
1174      }
1175
1176      if (!hasConnected && errorMessage != null && !connectionError)
1177      {
1178        // There was no server waiting on this host:port
1179        // Log a notice and will try the next replicationServer in the list
1180        if (keepSession) // Log error message only for final connection
1181        {
1182          // log the error message only once to avoid overflowing the error log
1183          logger.error(errorMessage);
1184        }
1185
1186        logger.trace(errorMessage);
1187      }
1188    }
1189    return setConnectedRS(ConnectedRS.noConnectedRS());
1190  }
1191
1192  /**
1193   * Performs the second phase handshake (send StartSessionMsg and receive
1194   * TopologyMsg messages exchange) and return the reply message from the
1195   * replication server.
1196   *
1197   * @param electedRS Server we are connecting with.
1198   * @param initStatus The status we are starting with
1199   * @return The ReplServerStartMsg the server replied. Null if could not
1200   *         get an answer.
1201   */
1202  private TopologyMsg performPhaseTwoHandshake(ConnectedRS electedRS,
1203    ServerStatus initStatus)
1204  {
1205    try
1206    {
1207      // Send our StartSessionMsg.
1208      final StartSessionMsg startSessionMsg;
1209      startSessionMsg = new StartSessionMsg(
1210          initStatus,
1211          domain.getRefUrls(),
1212          domain.isAssured(),
1213          domain.getAssuredMode(),
1214          domain.getAssuredSdLevel());
1215      startSessionMsg.setEclIncludes(
1216          domain.getEclIncludes(domain.getServerId()),
1217          domain.getEclIncludesForDeletes(domain.getServerId()));
1218      final Session session = electedRS.session;
1219      session.publish(startSessionMsg);
1220
1221      // Read the TopologyMsg that should come back.
1222      final TopologyMsg topologyMsg = (TopologyMsg) session.receive();
1223
1224      if (logger.isTraceEnabled())
1225      {
1226        debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg
1227            + "\nAND RECEIVED:\n" + topologyMsg);
1228      }
1229
1230      // Alright set the timeout to the desired value
1231      session.setSoTimeout(timeout);
1232      setConnectedRS(electedRS);
1233      return topologyMsg;
1234    }
1235    catch (Exception e)
1236    {
1237      logger.error(WARN_EXCEPTION_STARTING_SESSION_PHASE,
1238          getServerId(), electedRS.rsInfo.getServerURL(), getBaseDN(), stackTraceToSingleLineString(e));
1239
1240      setConnectedRS(ConnectedRS.noConnectedRS());
1241      return null;
1242    }
1243  }
1244
1245  /**
1246   * Class holding evaluation results for electing the best replication server
1247   * for the local directory server.
1248   */
1249  static class RSEvaluations
1250  {
1251    private final int localServerId;
1252    private Map<Integer, ReplicationServerInfo> bestRSs;
1253    private final Map<Integer, LocalizableMessage> rsEvals = new HashMap<>();
1254
1255    /**
1256     * Ctor.
1257     *
1258     * @param localServerId
1259     *          the serverId for the local directory server
1260     * @param rsInfos
1261     *          a Map of serverId => {@link ReplicationServerInfo} with all the
1262     *          candidate replication servers
1263     */
1264    RSEvaluations(int localServerId,
1265        Map<Integer, ReplicationServerInfo> rsInfos)
1266    {
1267      this.localServerId = localServerId;
1268      this.bestRSs = rsInfos;
1269    }
1270
1271    private boolean keepBest(LocalEvaluation eval)
1272    {
1273      if (eval.hasAcceptedAny())
1274      {
1275        bestRSs = eval.getAccepted();
1276        rsEvals.putAll(eval.getRejected());
1277        return true;
1278      }
1279      return false;
1280    }
1281
1282    /**
1283     * Sets the elected best replication server, rejecting all the other
1284     * replication servers with the supplied evaluation.
1285     *
1286     * @param bestRsId
1287     *          the serverId of the elected replication server
1288     * @param rejectedRSsEval
1289     *          the evaluation for all the rejected replication servers
1290     */
1291    private void setBestRS(int bestRsId, LocalizableMessage rejectedRSsEval)
1292    {
1293      for (Iterator<Entry<Integer, ReplicationServerInfo>> it =
1294          this.bestRSs.entrySet().iterator(); it.hasNext();)
1295      {
1296        final Entry<Integer, ReplicationServerInfo> entry = it.next();
1297        final Integer rsId = entry.getKey();
1298        final ReplicationServerInfo rsInfo = entry.getValue();
1299        if (rsInfo.getServerId() != bestRsId)
1300        {
1301          it.remove();
1302        }
1303        rsEvals.put(rsId, rejectedRSsEval);
1304      }
1305    }
1306
1307    private void discardAll(LocalizableMessage eval)
1308    {
1309      for (Integer rsId : bestRSs.keySet())
1310      {
1311        rsEvals.put(rsId, eval);
1312      }
1313    }
1314
1315    private boolean foundBestRS()
1316    {
1317      return bestRSs.size() == 1;
1318    }
1319
1320    /**
1321     * Returns the {@link ReplicationServerInfo} for the best replication
1322     * server.
1323     *
1324     * @return the {@link ReplicationServerInfo} for the best replication server
1325     */
1326    ReplicationServerInfo getBestRS()
1327    {
1328      if (foundBestRS())
1329      {
1330        return bestRSs.values().iterator().next();
1331      }
1332      return null;
1333    }
1334
1335    /**
1336     * Returns the evaluations for all the candidate replication servers.
1337     *
1338     * @return a Map of serverId => LocalizableMessage containing the evaluation for each
1339     *         candidate replication servers.
1340     */
1341    Map<Integer, LocalizableMessage> getEvaluations()
1342    {
1343      if (foundBestRS())
1344      {
1345        final Integer bestRSServerId = getBestRS().getServerId();
1346        if (rsEvals.get(bestRSServerId) == null)
1347        {
1348          final LocalizableMessage eval = NOTE_BEST_RS.get(bestRSServerId, localServerId);
1349          rsEvals.put(bestRSServerId, eval);
1350        }
1351      }
1352      return Collections.unmodifiableMap(rsEvals);
1353    }
1354
1355    /**
1356     * Returns the evaluation for the supplied replication server Id.
1357     * <p>
1358     * Note: "unknown RS" message is returned if the supplied replication server
1359     * was not part of the candidate replication servers.
1360     *
1361     * @param rsServerId
1362     *          the supplied replication server Id
1363     * @return the evaluation {@link LocalizableMessage} for the supplied replication
1364     *         server Id
1365     */
1366    private LocalizableMessage getEvaluation(int rsServerId)
1367    {
1368      final LocalizableMessage evaluation = getEvaluations().get(rsServerId);
1369      if (evaluation != null)
1370      {
1371        return evaluation;
1372      }
1373      return NOTE_UNKNOWN_RS.get(rsServerId, localServerId);
1374    }
1375
1376    /** {@inheritDoc} */
1377    @Override
1378    public String toString()
1379    {
1380      return "Current best replication server Ids: " + bestRSs.keySet()
1381          + ", Evaluation of connected replication servers"
1382          + " (ServerId => Evaluation): " + rsEvals.keySet()
1383          + ", Any replication server not appearing here"
1384          + " could not be contacted.";
1385    }
1386  }
1387
1388  /**
1389   * Evaluation local to one filter.
1390   */
1391  private static class LocalEvaluation
1392  {
1393    private final Map<Integer, ReplicationServerInfo> accepted = new HashMap<>();
1394    private final Map<ReplicationServerInfo, LocalizableMessage> rsEvals = new HashMap<>();
1395
1396    private void accept(Integer rsId, ReplicationServerInfo rsInfo)
1397    {
1398      // forget previous eval, including undoing reject
1399      this.rsEvals.remove(rsInfo);
1400      this.accepted.put(rsId, rsInfo);
1401    }
1402
1403    private void reject(ReplicationServerInfo rsInfo, LocalizableMessage reason)
1404    {
1405      this.accepted.remove(rsInfo.getServerId()); // undo accept
1406      this.rsEvals.put(rsInfo, reason);
1407    }
1408
1409    private Map<Integer, ReplicationServerInfo> getAccepted()
1410    {
1411      return accepted;
1412    }
1413
1414    private ReplicationServerInfo[] getAcceptedRSInfos()
1415    {
1416      return accepted.values().toArray(
1417          new ReplicationServerInfo[accepted.size()]);
1418    }
1419
1420    public Map<Integer, LocalizableMessage> getRejected()
1421    {
1422      final Map<Integer, LocalizableMessage> result = new HashMap<>();
1423      for (Entry<ReplicationServerInfo, LocalizableMessage> entry : rsEvals.entrySet())
1424      {
1425        result.put(entry.getKey().getServerId(), entry.getValue());
1426      }
1427      return result;
1428    }
1429
1430    private boolean hasAcceptedAny()
1431    {
1432      return !accepted.isEmpty();
1433    }
1434
1435  }
1436
1437  /**
1438   * Returns the replication server that best fits our need so that we can
1439   * connect to it or determine if we must disconnect from current one to
1440   * re-connect to best server.
1441   * <p>
1442   * Note: this method is static for test purpose (access from unit tests)
1443   *
1444   * @param firstConnection True if we run this method for the very first
1445   * connection of the broker. False if we run this method to determine if the
1446   * replication server we are currently connected to is still the best or not.
1447   * @param rsServerId The id of the replication server we are currently
1448   * connected to. Only used when firstConnection is false.
1449   * @param myState The local server state.
1450   * @param rsInfos The list of available replication servers and their
1451   * associated information (choice will be made among them).
1452   * @param localServerId The server id for the suffix we are working for.
1453   * @param groupId The groupId we prefer being connected to if possible
1454   * @param generationId The generation id we are using
1455   * @return The computed best replication server. If the returned value is
1456   * null, the best replication server is undetermined but the local server must
1457   * disconnect (so the best replication server is another one than the current
1458   * one). Null can only be returned when firstConnection is false.
1459   */
1460  static RSEvaluations computeBestReplicationServer(
1461      boolean firstConnection, int rsServerId, ServerState myState,
1462      Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
1463      byte groupId, long generationId)
1464  {
1465    final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos);
1466    // Shortcut, if only one server, this is the best
1467    if (evals.foundBestRS())
1468    {
1469      return evals;
1470    }
1471
1472    /**
1473     * Apply some filtering criteria to determine the best servers list from
1474     * the available ones. The ordered list of criteria is (from more important
1475     * to less important):
1476     * - replication server has the same group id as the local DS one
1477     * - replication server has the same generation id as the local DS one
1478     * - replication server is up to date regarding changes generated by the
1479     *   local DS
1480     * - replication server in the same VM as local DS one
1481     */
1482    /*
1483    The list of best replication servers is filtered with each criteria. At
1484    each criteria, the list is replaced with the filtered one if there
1485    are some servers from the filtering, otherwise, the list is left as is
1486    and the new filtering for the next criteria is applied and so on.
1487
1488    Use only servers locally configured: those are servers declared in
1489    the local configuration. When the current method is called, for
1490    sure, at least one server from the list is locally configured
1491    */
1492    filterServersLocallyConfigured(evals, localServerId);
1493    // Some servers with same group id ?
1494    filterServersWithSameGroupId(evals, localServerId, groupId);
1495    // Some servers with same generation id ?
1496    final boolean rssWithSameGenerationIdExist =
1497        filterServersWithSameGenerationId(evals, localServerId, generationId);
1498    if (rssWithSameGenerationIdExist)
1499    {
1500      // If some servers with the right generation id this is useful to
1501      // run the local DS change criteria
1502      filterServersWithAllLocalDSChanges(evals, myState, localServerId);
1503    }
1504    // Some servers in the local VM or local host?
1505    filterServersOnSameHost(evals, localServerId);
1506
1507    if (evals.foundBestRS())
1508    {
1509      return evals;
1510    }
1511
1512    /**
1513     * Now apply the choice based on the weight to the best servers list
1514     */
1515    if (firstConnection)
1516    {
1517      // We are not connected to a server yet
1518      computeBestServerForWeight(evals, -1, -1);
1519    }
1520    else
1521    {
1522      /*
1523       * We are already connected to a RS: compute the best RS as far as the
1524       * weights is concerned. If this is another one, some DS must disconnect.
1525       */
1526      computeBestServerForWeight(evals, rsServerId, localServerId);
1527    }
1528    return evals;
1529  }
1530
1531  /**
1532   * Creates a new list that contains only replication servers that are locally
1533   * configured.
1534   * @param evals The evaluation object
1535   */
1536  private static void filterServersLocallyConfigured(RSEvaluations evals,
1537      int localServerId)
1538  {
1539    final LocalEvaluation eval = new LocalEvaluation();
1540    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1541    {
1542      final Integer rsId = entry.getKey();
1543      final ReplicationServerInfo rsInfo = entry.getValue();
1544      if (rsInfo.isLocallyConfigured())
1545      {
1546        eval.accept(rsId, rsInfo);
1547      }
1548      else
1549      {
1550        eval.reject(rsInfo,
1551            NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId));
1552      }
1553    }
1554    evals.keepBest(eval);
1555  }
1556
1557  /**
1558   * Creates a new list that contains only replication servers that have the
1559   * passed group id, from a passed replication server list.
1560   * @param evals The evaluation object
1561   * @param groupId The group id that must match
1562   */
1563  private static void filterServersWithSameGroupId(RSEvaluations evals,
1564      int localServerId, byte groupId)
1565  {
1566    final LocalEvaluation eval = new LocalEvaluation();
1567    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1568    {
1569      final Integer rsId = entry.getKey();
1570      final ReplicationServerInfo rsInfo = entry.getValue();
1571      if (rsInfo.getGroupId() == groupId)
1572      {
1573        eval.accept(rsId, rsInfo);
1574      }
1575      else
1576      {
1577        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get(
1578            rsId, rsInfo.getGroupId(), localServerId, groupId));
1579      }
1580    }
1581    evals.keepBest(eval);
1582  }
1583
1584  /**
1585   * Creates a new list that contains only replication servers that have the
1586   * provided generation id, from a provided replication server list.
1587   * When the selected replication servers have no change (empty serverState)
1588   * then the 'empty'(generationId==-1) replication servers are also included
1589   * in the result list.
1590   *
1591   * @param evals The evaluation object
1592   * @param generationId The generation id that must match
1593   * @return whether some replication server passed the filter
1594   */
1595  private static boolean filterServersWithSameGenerationId(
1596      RSEvaluations evals, long localServerId, long generationId)
1597  {
1598    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
1599    final LocalEvaluation eval = new LocalEvaluation();
1600    boolean emptyState = true;
1601
1602    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
1603    {
1604      final Integer rsId = entry.getKey();
1605      final ReplicationServerInfo rsInfo = entry.getValue();
1606      if (rsInfo.getGenerationId() == generationId)
1607      {
1608        eval.accept(rsId, rsInfo);
1609        if (!rsInfo.serverState.isEmpty())
1610        {
1611          emptyState = false;
1612        }
1613      }
1614      else if (rsInfo.getGenerationId() == -1)
1615      {
1616        eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId,
1617            generationId, localServerId));
1618      }
1619      else
1620      {
1621        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get(
1622            rsId, rsInfo.getGenerationId(), localServerId, generationId));
1623      }
1624    }
1625
1626    if (emptyState)
1627    {
1628      // If the RS with a generationId have all an empty state,
1629      // then the 'empty'(genId=-1) RSes are also candidate
1630      for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
1631      {
1632        ReplicationServerInfo rsInfo = entry.getValue();
1633        if (rsInfo.getGenerationId() == -1)
1634        {
1635          // will undo the reject of previously rejected RSs
1636          eval.accept(entry.getKey(), rsInfo);
1637        }
1638      }
1639    }
1640
1641    return evals.keepBest(eval);
1642  }
1643
1644  /**
1645   * Creates a new list that contains only replication servers that have the
1646   * latest changes from the passed DS, from a passed replication server list.
1647   * @param evals The evaluation object
1648   * @param localState The state of the local DS
1649   * @param localServerId The server id to consider for the changes
1650   */
1651  private static void filterServersWithAllLocalDSChanges(
1652      RSEvaluations evals, ServerState localState, int localServerId)
1653  {
1654    // Extract the CSN of the latest change generated by the local server
1655    final CSN localCSN = getCSN(localState, localServerId);
1656
1657    /**
1658     * Find replication servers that are up to date (or more up to date than us,
1659     * if for instance we failed and restarted, having sent some changes to the
1660     * RS but without having time to store our own state) regarding our own
1661     * server id. If some servers are more up to date, prefer this list but take
1662     * only the latest CSN.
1663     */
1664    final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
1665    boolean foundRSMoreUpToDateThanLocalDS = false;
1666    CSN latestRsCSN = null;
1667    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1668    {
1669      final Integer rsId = entry.getKey();
1670      final ReplicationServerInfo rsInfo = entry.getValue();
1671      final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);
1672
1673      // Has this replication server the latest local change ?
1674      if (rsCSN.isOlderThan(localCSN))
1675      {
1676        mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
1677            rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
1678      }
1679      else if (rsCSN.equals(localCSN))
1680      {
1681        // This replication server has exactly the latest change from the
1682        // local server
1683        if (!foundRSMoreUpToDateThanLocalDS)
1684        {
1685          mostUpToDateEval.accept(rsId, rsInfo);
1686        }
1687        else
1688        {
1689          mostUpToDateEval.reject(rsInfo,
1690            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
1691              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
1692        }
1693      }
1694      else if (rsCSN.isNewerThan(localCSN))
1695      {
1696        // This replication server is even more up to date than the local server
1697        if (latestRsCSN == null)
1698        {
1699          foundRSMoreUpToDateThanLocalDS = true;
1700          // all previous results are now outdated, reject them all
1701          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
1702              localCSN);
1703          // Initialize the latest CSN
1704          latestRsCSN = rsCSN;
1705        }
1706
1707        if (rsCSN.equals(latestRsCSN))
1708        {
1709          mostUpToDateEval.accept(rsId, rsInfo);
1710        }
1711        else if (rsCSN.isNewerThan(latestRsCSN))
1712        {
1713          // This RS is even more up to date, reject all previously accepted RSs
1714          // and store this new RS
1715          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
1716              localCSN);
1717          mostUpToDateEval.accept(rsId, rsInfo);
1718          latestRsCSN = rsCSN;
1719        }
1720        else
1721        {
1722          mostUpToDateEval.reject(rsInfo,
1723            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
1724              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
1725        }
1726      }
1727    }
1728    evals.keepBest(mostUpToDateEval);
1729  }
1730
1731  private static CSN getCSN(ServerState state, int serverId)
1732  {
1733    final CSN csn = state.getCSN(serverId);
1734    if (csn != null)
1735    {
1736      return csn;
1737    }
1738    return new CSN(0, 0, serverId);
1739  }
1740
1741  private static void rejectAllWithRSIsLaterThanBestRS(
1742      final LocalEvaluation eval, int localServerId, CSN localCSN)
1743  {
1744    for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos())
1745    {
1746      final String rsCSN =
1747          getCSN(rsInfo.getServerState(), localServerId).toStringUI();
1748      final LocalizableMessage reason =
1749          NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
1750            rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI());
1751      eval.reject(rsInfo, reason);
1752    }
1753  }
1754
1755  /**
1756   * Creates a new list that contains only replication servers that are on the
1757   * same host as the local DS, from a passed replication server list. This
1758   * method will gives priority to any replication server which is in the same
1759   * VM as this DS.
1760   *
1761   * @param evals The evaluation object
1762   */
1763  private static void filterServersOnSameHost(RSEvaluations evals,
1764      int localServerId)
1765  {
1766    /*
1767     * Initially look for all servers on the same host. If we find one in the
1768     * same VM, then narrow the search.
1769     */
1770    boolean foundRSInSameVM = false;
1771    final LocalEvaluation eval = new LocalEvaluation();
1772    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1773    {
1774      final Integer rsId = entry.getKey();
1775      final ReplicationServerInfo rsInfo = entry.getValue();
1776      final HostPort hp = HostPort.valueOf(rsInfo.getServerURL());
1777      if (hp.isLocalAddress())
1778      {
1779        if (isLocalReplicationServerPort(hp.getPort()))
1780        {
1781          if (!foundRSInSameVM)
1782          {
1783            // An RS in the same VM will always have priority.
1784            // Narrow the search to only include servers in this VM.
1785            rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
1786            foundRSInSameVM = true;
1787          }
1788          eval.accept(rsId, rsInfo);
1789        }
1790        else if (!foundRSInSameVM)
1791        {
1792          // OK, accept RSs on the same machine because we have not found an RS
1793          // in the same VM yet
1794          eval.accept(rsId, rsInfo);
1795        }
1796        else
1797        {
1798          // Skip: we have found some RSs in the same VM, but this RS is not.
1799          eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
1800              localServerId));
1801        }
1802      }
1803      else
1804      {
1805        eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
1806            localServerId));
1807      }
1808    }
1809    evals.keepBest(eval);
1810  }
1811
1812  private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
1813      int localServerId)
1814  {
1815    for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos())
1816    {
1817      eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
1818          rsInfo.getServerId(), localServerId));
1819    }
1820  }
1821
1822  /**
1823   * Computes the best replication server the local server should be connected
1824   * to so that the load is correctly spread across the topology, following the
1825   * weights guidance.
1826   * Warning: This method is expected to be called with at least 2 servers in
1827   * bestServers
1828   * Note: this method is static for test purpose (access from unit tests)
1829   * @param evals The evaluation object
1830   * @param currentRsServerId The replication server the local server is
1831   *        currently connected to. -1 if the local server is not yet connected
1832   *        to any replication server.
1833   * @param localServerId The server id of the local server. This is not used
1834   *        when it is not connected to a replication server
1835   *        (currentRsServerId = -1)
1836   */
1837  static void computeBestServerForWeight(RSEvaluations evals,
1838      int currentRsServerId, int localServerId)
1839  {
1840    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
1841    /*
1842     * - Compute the load goal of each RS, deducing it from the weights affected
1843     * to them.
1844     * - Compute the current load of each RS, deducing it from the DSs
1845     * currently connected to them.
1846     * - Compute the differences between the load goals and the current loads of
1847     * the RSs.
1848     */
1849    // Sum of the weights
1850    int sumOfWeights = 0;
1851    // Sum of the connected DSs
1852    int sumOfConnectedDSs = 0;
1853    for (ReplicationServerInfo rsInfo : bestServers.values())
1854    {
1855      sumOfWeights += rsInfo.getWeight();
1856      sumOfConnectedDSs += rsInfo.getConnectedDSNumber();
1857    }
1858
1859    // Distance (difference) of the current loads to the load goals of each RS:
1860    // key:server id, value: distance
1861    Map<Integer, BigDecimal> loadDistances = new HashMap<>();
1862    // Precision for the operations (number of digits after the dot)
1863    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
1864    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
1865    {
1866      final Integer rsId = entry.getKey();
1867      final ReplicationServerInfo rsInfo = entry.getValue();
1868
1869      //  load goal = rs weight / sum of weights
1870      BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide(
1871          BigDecimal.valueOf(sumOfWeights), mathContext);
1872      BigDecimal currentLoadBd = BigDecimal.ZERO;
1873      if (sumOfConnectedDSs != 0)
1874      {
1875        // current load = number of connected DSs / total number of DSs
1876        int connectedDSs = rsInfo.getConnectedDSNumber();
1877        currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
1878            BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
1879      }
1880      // load distance = load goal - current load
1881      BigDecimal loadDistanceBd =
1882        loadGoalBd.subtract(currentLoadBd, mathContext);
1883      loadDistances.put(rsId, loadDistanceBd);
1884    }
1885
1886    if (currentRsServerId == -1)
1887    {
1888      // The local server is not connected yet, find best server to connect to,
1889      // taking the weights into account.
1890      computeBestServerWhenNotConnected(evals, loadDistances, localServerId);
1891    }
1892    else
1893    {
1894      // The local server is currently connected to a RS, let's see if it must
1895      // disconnect or not, taking the weights into account.
1896      computeBestServerWhenConnected(evals, loadDistances, localServerId,
1897          currentRsServerId, sumOfWeights, sumOfConnectedDSs);
1898    }
1899  }
1900
1901  private static void computeBestServerWhenNotConnected(RSEvaluations evals,
1902      Map<Integer, BigDecimal> loadDistances, int localServerId)
1903  {
1904    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
1905    /*
1906     * Find the server with the current highest distance to its load goal and
1907     * choose it. Make an exception if every server is correctly balanced,
1908     * that is every current load distances are equal to 0, in that case,
1909     * choose the server with the highest weight
1910     */
1911    int bestRsId = 0; // If all server equal, return the first one
1912    float highestDistance = Float.NEGATIVE_INFINITY;
1913    boolean allRsWithZeroDistance = true;
1914    int highestWeightRsId = -1;
1915    int highestWeight = -1;
1916    for (Integer rsId : bestServers.keySet())
1917    {
1918      float loadDistance = loadDistances.get(rsId).floatValue();
1919      if (loadDistance > highestDistance)
1920      {
1921        // This server is far more from its balance point
1922        bestRsId = rsId;
1923        highestDistance = loadDistance;
1924      }
1925      if (loadDistance != 0)
1926      {
1927        allRsWithZeroDistance = false;
1928      }
1929      int weight = bestServers.get(rsId).getWeight();
1930      if (weight > highestWeight)
1931      {
1932        // This server has a higher weight
1933        highestWeightRsId = rsId;
1934        highestWeight = weight;
1935      }
1936    }
1937    // All servers with a 0 distance ?
1938    if (allRsWithZeroDistance)
1939    {
1940      // Choose server with the highest weight
1941      bestRsId = highestWeightRsId;
1942    }
1943    evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
1944        bestRsId));
1945  }
1946
  /**
   * Decides whether the local DS, currently connected to the RS
   * {@code currentRsServerId}, should stay connected to it or disconnect so
   * that DSs are better spread according to the RS weights. The decision is
   * recorded in {@code evals}.
   * <p>
   * The DS stays connected ({@code evals.setBestRS(currentRsServerId, ...)})
   * in any of these cases:
   * <ul>
   * <li>the current RS is not overloaded (its load distance is not negative);</li>
   * <li>the sum of the other RSs' load distances is not positive;</li>
   * <li>exactly one DS would have to move and moving it would only mirror the
   * imbalance (yoyo effect);</li>
   * <li>the local DS is not among the DSs selected to leave.</li>
   * </ul>
   * Otherwise {@code evals.discardAll(...)} is called with a
   * "disconnect DS from overloaded RS" reason.
   *
   * @param evals the evaluation object; its {@code bestRSs} are the candidates
   * @param loadDistances load goal minus current load, keyed by RS server id;
   *        assumed to hold an entry for every candidate, including
   *        {@code currentRsServerId}
   * @param localServerId the server id of the local DS
   * @param currentRsServerId the server id of the RS the local DS is connected
   *        to
   * @param sumOfWeights the sum of the weights of all candidate RSs
   * @param sumOfConnectedDSs the total number of DSs connected to the
   *        candidate RSs
   */
  private static void computeBestServerWhenConnected(RSEvaluations evals,
      Map<Integer, BigDecimal> loadDistances, int localServerId,
      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    // 32 significant digits, rounding half up
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    float currentLoadDistance =
      loadDistances.get(currentRsServerId).floatValue();
    if (currentLoadDistance < 0)
    {
      /*
      Too many DSs connected to the current RS, compared with its load
      goal:
      Determine the potential number of DSs to disconnect from the current
      RS and see if the local DS is part of them: the DSs that must
      disconnect are those with the lowest server id.
      Compute the sum of the distances of the load goals of the other RSs
      */
      BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
      for (Integer rsId : bestServers.keySet())
      {
        // rsId is unboxed here, so this is a numeric comparison
        if (rsId != currentRsServerId)
        {
          sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
            loadDistances.get(rsId), mathContext);
        }
      }

      if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
      {
        /*
        The average distance of the other RSs shows a lack of DSs.
        Compute the number of DSs to disconnect from the current RS,
        rounding to the nearest integer number. Do only this if there is
        no risk of yoyo effect: when the exact balance cannot be
        established due to the current number of DSs connected, do not
        disconnect a DS. A simple example where the balance cannot be
        reached is:
        - RS1 has weight 1 and 2 DSs
        - RS2 has weight 1 and 1 DS
        => disconnecting a DS from RS1 to reconnect it to RS2 would make no
        sense as this would lead to the reverse situation. In that case,
        the perfect balance cannot be reached and we must stick to the
        current situation, otherwise the DS would keep moving between the 2
        RSs
        */
        // Number of DSs to move = (sum of the other RSs' load distances)
        // * (total number of connected DSs), rounded to the nearest integer.
        float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
          multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
              .floatValue();
        int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);

        // Avoid yoyo effect
        if (overloadingDSsNumber == 1)
        {
          // What would be the new load distance for the current RS if
          // we disconnect some DSs ?
          ReplicationServerInfo currentReplicationServerInfo =
            bestServers.get(currentRsServerId);

          int currentRsWeight = currentReplicationServerInfo.getWeight();
          BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
          BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
          BigDecimal currentRsLoadGoalBd =
            currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
          BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
          if (sumOfConnectedDSs != 0)
          {
            int connectedDSs = currentReplicationServerInfo.
              getConnectedDSNumber();
            BigDecimal potentialNewConnectedDSsBd =
                BigDecimal.valueOf(connectedDSs - 1);
            BigDecimal sumOfConnectedDSsBd =
                BigDecimal.valueOf(sumOfConnectedDSs);
            potentialCurrentRsNewLoadBd =
              potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
                mathContext);
          }
          BigDecimal potentialCurrentRsNewLoadDistanceBd =
            currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
              mathContext);

          // What would be the new load distance for the other RSs ?
          // Moving one DS adds 1 / sumOfConnectedDSs to their combined load.
          BigDecimal additionalDsLoadBd =
              BigDecimal.ONE.divide(
                  BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
            sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
                  mathContext);

          /*
          Now compare both values: we must not disconnect the DS if this
          is for going in a situation where the load distance of the other
          RSs is the opposite of the future load distance of the local RS
          or we would evaluate that we should disconnect just after being
          arrived on the new RS. But we should disconnect if we reach the
          perfect balance (both values are 0).
          */
          if (mustAvoidYoyoEffect(potentialCurrentRsNewLoadDistanceBd,
              potentialNewSumOfLoadDistancesOfOtherRSsBd))
          {
            // Avoid the yoyo effect, and keep the local DS connected to its
            // current RS
            evals.setBestRS(currentRsServerId,
                NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
            return;
          }
        }

        ReplicationServerInfo currentRsInfo =
            bestServers.get(currentRsServerId);
        if (isServerOverloadingRS(localServerId, currentRsInfo,
            overloadingDSsNumber))
        {
          // The local server is part of the DSs to disconnect
          evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
              localServerId, currentRsServerId));
        }
        else
        {
          // The local server is not part of the servers to disconnect from the
          // current RS.
          evals.setBestRS(currentRsServerId,
              NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
                  currentRsServerId));
        }
      } else {
        // The average distance of the other RSs does not show a lack of DSs:
        // no need to disconnect any DS from the current RS.
        evals.setBestRS(currentRsServerId,
            NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
                currentRsServerId));
      }
    } else {
      // The RS load goal is reached or there are not enough DSs connected to
      // it to reach it: do not disconnect from this RS and return rsInfo for
      // this RS
      evals.setBestRS(currentRsServerId,
          NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
              currentRsServerId));
    }
  }
2088
2089  private static boolean mustAvoidYoyoEffect(BigDecimal rsNewLoadDistance,
2090      BigDecimal otherRSsNewSumOfLoadDistances)
2091  {
2092    final MathContext roundCtx = new MathContext(6, RoundingMode.DOWN);
2093    final BigDecimal rsLoadDistance = rsNewLoadDistance.round(roundCtx);
2094    final BigDecimal otherRSsSumOfLoadDistances =
2095        otherRSsNewSumOfLoadDistances.round(roundCtx);
2096
2097    return rsLoadDistance.compareTo(BigDecimal.ZERO) != 0
2098        && rsLoadDistance.compareTo(otherRSsSumOfLoadDistances.negate()) == 0;
2099  }
2100
2101  /**
2102   * Returns whether the local DS is overloading the RS.
2103   * <p>
2104   * There are an "overloadingDSsNumber" of DS overloading the RS. The list of
2105   * DSs connected to this RS is ordered by serverId to use a consistent
2106   * ordering across all nodes in the topology. The serverIds which index in the
2107   * List are lower than "overloadingDSsNumber" will be evicted first.
2108   * <p>
2109   * This ordering is unfair since nodes with the lower serverIds will be
2110   * evicted more often than nodes with higher serverIds. However, it is a
2111   * consistent and reliable ordering applicable anywhere in the topology.
2112   */
2113  private static boolean isServerOverloadingRS(int localServerId,
2114      ReplicationServerInfo currentRsInfo, int overloadingDSsNumber)
2115  {
2116    List<Integer> serversConnectedToCurrentRS = new ArrayList<>(currentRsInfo.getConnectedDSs());
2117    Collections.sort(serversConnectedToCurrentRS);
2118
2119    final int idx = serversConnectedToCurrentRS.indexOf(localServerId);
2120    return idx != -1 && idx < overloadingDSsNumber;
2121  }
2122
2123  /**
2124   * Start the heartbeat monitor thread.
2125   */
2126  private void startRSHeartBeatMonitoring(ConnectedRS rs)
2127  {
2128    final long heartbeatInterval = config.getHeartbeatInterval();
2129    if (heartbeatInterval > 0)
2130    {
2131      heartbeatMonitor = new HeartbeatMonitor(getServerId(), rs.getServerId(),
2132          getBaseDN().toString(), rs.session, heartbeatInterval);
2133      heartbeatMonitor.start();
2134    }
2135  }
2136
2137  /**
2138   * Stop the heartbeat monitor thread.
2139   */
2140  private synchronized void stopRSHeartBeatMonitoring()
2141  {
2142    if (heartbeatMonitor != null)
2143    {
2144      heartbeatMonitor.shutdown();
2145      heartbeatMonitor = null;
2146    }
2147  }
2148
2149  /**
2150   * Restart the ReplicationBroker.
2151   * @param infiniteTry the socket which failed
2152   */
2153  public void reStart(boolean infiniteTry)
2154  {
2155    reStart(connectedRS.get().session, infiniteTry);
2156  }
2157
2158  /**
2159   * Restart the ReplicationServer broker after a failure.
2160   *
2161   * @param failingSession the socket which failed
2162   * @param infiniteTry the socket which failed
2163   */
2164  private void reStart(Session failingSession, boolean infiniteTry)
2165  {
2166    if (failingSession != null)
2167    {
2168      failingSession.close();
2169      numLostConnections++;
2170    }
2171
2172    ConnectedRS rs = connectedRS.get();
2173    if (failingSession == rs.session && !rs.equals(ConnectedRS.noConnectedRS()))
2174    {
2175      rs = setConnectedRS(ConnectedRS.noConnectedRS());
2176    }
2177
2178    while (true)
2179    {
2180      // Synchronize inside the loop in order to allow shutdown.
2181      synchronized (startStopLock)
2182      {
2183        if (rs.isConnected() || shutdown)
2184        {
2185          break;
2186        }
2187
2188        try
2189        {
2190          connectAsDataServer();
2191          rs = connectedRS.get();
2192        }
2193        catch (Exception e)
2194        {
2195          logger.error(NOTE_EXCEPTION_RESTARTING_SESSION,
2196              getBaseDN(), e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
2197        }
2198
2199        if (rs.isConnected() || !infiniteTry)
2200        {
2201          break;
2202        }
2203      }
2204      try
2205      {
2206          Thread.sleep(500);
2207      }
2208      catch (InterruptedException ignored)
2209      {
2210        // ignore
2211      }
2212    }
2213
2214    if (logger.isTraceEnabled())
2215    {
2216      debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
2217          + rs.getServerId() + ") genId=" + getGenerationID());
2218    }
2219  }
2220
2221  /**
2222   * Publish a message to the other servers.
2223   * @param msg the message to publish
2224   */
2225  public void publish(ReplicationMsg msg)
2226  {
2227    publish(msg, false, true);
2228  }
2229
2230  /**
2231   * Publish a message to the other servers.
2232   * @param msg            The message to publish.
2233   * @param retryOnFailure Whether reconnect should automatically be done.
2234   * @return               Whether publish succeeded.
2235   */
2236  boolean publish(ReplicationMsg msg, boolean retryOnFailure)
2237  {
2238    return publish(msg, false, retryOnFailure);
2239  }
2240
2241  /**
2242   * Publish a recovery message to the other servers.
2243   * @param msg the message to publish
2244   */
2245  public void publishRecovery(ReplicationMsg msg)
2246  {
2247    publish(msg, true, true);
2248  }
2249
2250  /**
2251   * Publish a message to the other servers.
2252   * @param msg the message to publish
2253   * @param recoveryMsg the message is a recovery LocalizableMessage
2254   * @param retryOnFailure whether retry should be done on failure
2255   * @return whether the message was successfully sent.
2256   */
2257  private boolean publish(ReplicationMsg msg, boolean recoveryMsg,
2258      boolean retryOnFailure)
2259  {
2260    boolean done = false;
2261
2262    while (!done && !shutdown)
2263    {
2264      if (connectionError)
2265      {
2266        /*
2267        It was not possible to connect to any replication server.
2268        Since the operation was already processed, we have no other
2269        choice than to return without sending the ReplicationMsg
2270        and relying on the resend procedure of the connect phase to
2271        fix the problem when we finally connect.
2272        */
2273
2274        if (logger.isTraceEnabled())
2275        {
2276          debugInfo("publish(): Publishing a message is not possible due to"
2277              + " existing connection error.");
2278        }
2279
2280        return false;
2281      }
2282
2283      try
2284      {
2285        /*
2286        save the session at the time when we acquire the
2287        sendwindow credit so that we can make sure later
2288        that the session did not change in between.
2289        This is necessary to make sure that we don't publish a message
2290        on a session with a credit that was acquired from a previous
2291        session.
2292        */
2293        Session currentSession;
2294        Semaphore currentWindowSemaphore;
2295        synchronized (connectPhaseLock)
2296        {
2297          currentSession = connectedRS.get().session;
2298          currentWindowSemaphore = sendWindow;
2299        }
2300
2301        /*
2302        If the Replication domain has decided that there is a need to
2303        recover some changes then it is not allowed to send this
2304        change but it will be the responsibility of the recovery thread to
2305        do it.
2306        */
2307        if (!recoveryMsg & connectRequiresRecovery)
2308        {
2309          return false;
2310        }
2311
2312        boolean credit;
2313        if (msg instanceof UpdateMsg)
2314        {
2315          /*
2316          Acquiring the window credit must be done outside of the
2317          connectPhaseLock because it can be blocking and we don't
2318          want to hold off reconnection in case the connection dropped.
2319          */
2320          credit =
2321            currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
2322        }
2323        else
2324        {
2325          credit = true;
2326        }
2327
2328        if (credit)
2329        {
2330          synchronized (connectPhaseLock)
2331          {
2332            /*
2333            session may have been set to null in the connection phase
2334            when restarting the broker for example.
2335            Check the session. If it has changed, some disconnection or
2336            reconnection happened and we need to restart from scratch.
2337            */
2338            final Session session = connectedRS.get().session;
2339            if (session != null && session == currentSession)
2340            {
2341              session.publish(msg);
2342              done = true;
2343            }
2344          }
2345        }
2346        if (!credit && currentWindowSemaphore.availablePermits() == 0)
2347        {
2348          synchronized (connectPhaseLock)
2349          {
2350            /*
2351            the window is still closed.
2352            Send a WindowProbeMsg message to wake up the receiver in case the
2353            window update message was lost somehow...
2354            then loop to check again if connection was closed.
2355            */
2356            Session session = connectedRS.get().session;
2357            if (session != null)
2358            {
2359              session.publish(new WindowProbeMsg());
2360            }
2361          }
2362        }
2363      }
2364      catch (IOException e)
2365      {
2366        if (logger.isTraceEnabled())
2367        {
2368          debugInfo("publish(): IOException caught: "
2369              + stackTraceToSingleLineString(e));
2370        }
2371        if (!retryOnFailure)
2372        {
2373          return false;
2374        }
2375
2376        // The receive threads should handle reconnection or
2377        // mark this broker in error. Just retry.
2378        synchronized (connectPhaseLock)
2379        {
2380          try
2381          {
2382            connectPhaseLock.wait(100);
2383          }
2384          catch (InterruptedException ignored)
2385          {
2386            if (logger.isTraceEnabled())
2387            {
2388              debugInfo("publish(): InterruptedException caught 1: "
2389                  + stackTraceToSingleLineString(ignored));
2390            }
2391          }
2392        }
2393      }
2394      catch (InterruptedException ignored)
2395      {
2396        // just loop.
2397        if (logger.isTraceEnabled())
2398        {
2399          debugInfo("publish(): InterruptedException caught 2: "
2400              + stackTraceToSingleLineString(ignored));
2401        }
2402      }
2403    }
2404    return true;
2405  }
2406
2407  /**
2408   * Receive a message.
2409   * This method is not thread-safe and should either always be
2410   * called in a single thread or protected by a locking mechanism
2411   * before being called. This is a wrapper to the method with a boolean version
2412   * so that we do not have to modify existing tests.
2413   *
2414   * @return the received message
2415   * @throws SocketTimeoutException if the timeout set by setSoTimeout
2416   *         has expired
2417   */
2418  public ReplicationMsg receive() throws SocketTimeoutException
2419  {
2420    return receive(false, true, false);
2421  }
2422
2423  /**
2424   * Receive a message.
2425   * This method is not thread-safe and should either always be
2426   * called in a single thread or protected by a locking mechanism
2427   * before being called.
2428   *
2429   * @param reconnectToTheBestRS Whether broker will automatically switch
2430   *                             to the best suitable RS.
2431   * @param reconnectOnFailure   Whether broker will automatically reconnect
2432   *                             on failure.
2433   * @param returnOnTopoChange   Whether broker should return TopologyMsg
2434   *                             received.
2435   * @return the received message
2436   *
2437   * @throws SocketTimeoutException if the timeout set by setSoTimeout
2438   *         has expired
2439   */
2440  ReplicationMsg receive(boolean reconnectToTheBestRS,
2441      boolean reconnectOnFailure, boolean returnOnTopoChange)
2442    throws SocketTimeoutException
2443  {
2444    while (!shutdown)
2445    {
2446      ConnectedRS rs = connectedRS.get();
2447      if (reconnectOnFailure && !rs.isConnected())
2448      {
2449        // infinite try to reconnect
2450        reStart(null, true);
2451        continue;
2452      }
2453
2454      // Save session information for later in case we need it for log messages
2455      // after the session has been closed and/or failed.
2456      if (rs.session == null)
2457      {
2458        // Must be shutting down.
2459        break;
2460      }
2461
2462      final int serverId = getServerId();
2463      final DN baseDN = getBaseDN();
2464      final int previousRsServerID = rs.getServerId();
2465      try
2466      {
2467        ReplicationMsg msg = rs.session.receive();
2468        if (msg instanceof UpdateMsg && !(msg instanceof ReplicaOfflineMsg))
2469        {
2470          synchronized (this)
2471          {
2472            rcvWindow--;
2473          }
2474        }
2475        if (msg instanceof WindowMsg)
2476        {
2477          final WindowMsg windowMsg = (WindowMsg) msg;
2478          sendWindow.release(windowMsg.getNumAck());
2479        }
2480        else if (msg instanceof TopologyMsg)
2481        {
2482          final TopologyMsg topoMsg = (TopologyMsg) msg;
2483          receiveTopo(topoMsg, getRsServerId());
2484          if (reconnectToTheBestRS)
2485          {
2486            // Reset wait time before next computation of best server
2487            mustRunBestServerCheckingAlgorithm = 0;
2488          }
2489
2490          // Caller wants to check what's changed
2491          if (returnOnTopoChange)
2492          {
2493            return msg;
2494          }
2495        }
2496        else if (msg instanceof StopMsg)
2497        {
2498          // RS performs a proper disconnection
2499          logger.warn(WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED, previousRsServerID, rs.replicationServer,
2500              serverId, baseDN);
2501
2502          // Try to find a suitable RS
2503          reStart(rs.session, true);
2504        }
2505        else if (msg instanceof MonitorMsg)
2506        {
2507          // This is the response to a MonitorRequest that was sent earlier or
2508          // the regular message of the monitoring publisher of the RS.
2509          MonitorMsg monitorMsg = (MonitorMsg) msg;
2510
2511          // Extract and store replicas ServerStates
2512          final Map<Integer, ServerState> newReplicaStates = new HashMap<>();
2513          for (int srvId : toIterable(monitorMsg.ldapIterator()))
2514          {
2515            newReplicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
2516          }
2517          replicaStates = newReplicaStates;
2518
2519          // Notify the sender that the response was received.
2520          synchronized (monitorResponse)
2521          {
2522            monitorResponse.set(true);
2523            monitorResponse.notify();
2524          }
2525
2526          // Update the replication servers ServerStates with new received info
2527          Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos;
2528          for (int srvId : toIterable(monitorMsg.rsIterator()))
2529          {
2530            final ReplicationServerInfo rsInfo = rsInfos.get(srvId);
2531            if (rsInfo != null)
2532            {
2533              rsInfo.update(monitorMsg.getRSServerState(srvId));
2534            }
2535          }
2536
2537          /*
2538          Now if it is allowed, compute the best replication server to see if
2539          it is still the one we are currently connected to. If not,
2540          disconnect properly and let the connection algorithm re-connect to
2541          best replication server
2542          */
2543          if (reconnectToTheBestRS)
2544          {
2545            mustRunBestServerCheckingAlgorithm++;
2546            if (mustRunBestServerCheckingAlgorithm == 2)
2547            {
2548              // Stable topology (no topo msg since few seconds): proceed with
2549              // best server checking.
2550              final RSEvaluations evals = computeBestReplicationServer(
2551                  false, previousRsServerID, state,
2552                  rsInfos, serverId, getGroupId(), getGenerationID());
2553              final ReplicationServerInfo bestServerInfo = evals.getBestRS();
2554              if (previousRsServerID != -1
2555                  && (bestServerInfo == null
2556                      || bestServerInfo.getServerId() != previousRsServerID))
2557              {
2558                // The best replication server is no more the one we are
2559                // currently using. Disconnect properly then reconnect.
2560                LocalizableMessage message;
2561                if (bestServerInfo == null)
2562                {
2563                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
2564                      serverId, previousRsServerID, rs.replicationServer, baseDN);
2565                }
2566                else
2567                {
2568                  final int bestRsServerId = bestServerInfo.getServerId();
2569                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
2570                      serverId, previousRsServerID, rs.replicationServer, bestRsServerId, baseDN,
2571                      evals.getEvaluation(previousRsServerID),
2572                      evals.getEvaluation(bestRsServerId));
2573                }
2574                logger.info(message);
2575                if (logger.isTraceEnabled())
2576                {
2577                  debugInfo("best replication servers evaluation results: " + evals);
2578                }
2579                reStart(true);
2580              }
2581
2582              // Reset wait time before next computation of best server
2583              mustRunBestServerCheckingAlgorithm = 0;
2584            }
2585          }
2586        }
2587        else
2588        {
2589          return msg;
2590        }
2591      }
2592      catch (SocketTimeoutException e)
2593      {
2594        throw e;
2595      }
2596      catch (Exception e)
2597      {
2598        logger.traceException(e);
2599
2600        if (!shutdown)
2601        {
2602          if (rs.session == null || !rs.session.closeInitiated())
2603          {
2604            // We did not initiate the close on our side, log an error message.
2605            logger.error(WARN_REPLICATION_SERVER_BADLY_DISCONNECTED,
2606                serverId, baseDN, previousRsServerID, rs.replicationServer);
2607          }
2608
2609          if (!reconnectOnFailure)
2610          {
2611            break; // does not seem necessary to explicitly disconnect ..
2612          }
2613
2614          reStart(rs.session, true);
2615        }
2616      }
2617    } // while !shutdown
2618    return null;
2619  }
2620
2621  /**
2622   * Gets the States of all the Replicas currently in the Topology. When this
2623   * method is called, a Monitoring message will be sent to the Replication
2624   * Server to which this domain is currently connected so that it computes a
2625   * table containing information about all Directory Servers in the topology.
2626   * This Computation involves communications will all the servers currently
2627   * connected and
2628   *
2629   * @return The States of all Replicas in the topology (except us)
2630   */
2631  public Map<Integer, ServerState> getReplicaStates()
2632  {
2633    monitorResponse.set(false);
2634
2635    // publish Monitor Request LocalizableMessage to the Replication Server
2636    publish(new MonitorRequestMsg(getServerId(), getRsServerId()));
2637
2638    // wait for Response up to 10 seconds.
2639    try
2640    {
2641      synchronized (monitorResponse)
2642      {
2643        if (!monitorResponse.get())
2644        {
2645          monitorResponse.wait(10000);
2646        }
2647      }
2648    } catch (InterruptedException e)
2649    {
2650      Thread.currentThread().interrupt();
2651    }
2652    return replicaStates;
2653  }
2654
2655  /**
2656   * This method allows to do the necessary computing for the window
2657   * management after treatment by the worker threads.
2658   *
2659   * This should be called once the replay thread have done their job
2660   * and the window can be open again.
2661   */
2662  public synchronized void updateWindowAfterReplay()
2663  {
2664    try
2665    {
2666      updateDoneCount++;
2667      final Session session = connectedRS.get().session;
2668      if (updateDoneCount >= halfRcvWindow && session != null)
2669      {
2670        session.publish(new WindowMsg(updateDoneCount));
2671        rcvWindow += updateDoneCount;
2672        updateDoneCount = 0;
2673      }
2674    } catch (IOException e)
2675    {
2676      // Any error on the socket will be handled by the thread calling receive()
2677      // just ignore.
2678    }
2679  }
2680
2681  /** Stop the server. */
2682  public void stop()
2683  {
2684    if (logger.isTraceEnabled() && !shutdown)
2685    {
2686      debugInfo("is stopping and will close the connection to RS(" + getRsServerId() + ")");
2687    }
2688
2689    synchronized (startStopLock)
2690    {
2691      if (shutdown)
2692      {
2693        return;
2694      }
2695      domain.publishReplicaOfflineMsg();
2696      shutdown = true;
2697      setConnectedRS(ConnectedRS.stopped());
2698      stopRSHeartBeatMonitoring();
2699      stopChangeTimeHeartBeatPublishing();
2700      deregisterReplicationMonitor();
2701    }
2702  }
2703
2704  /**
2705   * Set a timeout value.
2706   * With this option set to a non-zero value, calls to the receive() method
2707   * block for only this amount of time after which a
2708   * java.net.SocketTimeoutException is raised.
2709   * The Broker is valid and usable even after such an Exception is raised.
2710   *
2711   * @param timeout the specified timeout, in milliseconds.
2712   * @throws SocketException if there is an error in the underlying protocol,
2713   *         such as a TCP error.
2714   */
2715  public void setSoTimeout(int timeout) throws SocketException
2716  {
2717    this.timeout = timeout;
2718    final Session session = connectedRS.get().session;
2719    if (session != null)
2720    {
2721      session.setSoTimeout(timeout);
2722    }
2723  }
2724
2725  /**
2726   * Get the name of the replicationServer to which this broker is currently
2727   * connected.
2728   *
2729   * @return the name of the replicationServer to which this domain
2730   *         is currently connected.
2731   */
2732  public String getReplicationServer()
2733  {
2734    return connectedRS.get().replicationServer;
2735  }
2736
2737  /**
2738   * Get the maximum receive window size.
2739   *
2740   * @return The maximum receive window size.
2741   */
2742  public int getMaxRcvWindow()
2743  {
2744    return config.getWindowSize();
2745  }
2746
2747  /**
2748   * Get the current receive window size.
2749   *
2750   * @return The current receive window size.
2751   */
2752  public int getCurrentRcvWindow()
2753  {
2754    return rcvWindow;
2755  }
2756
2757  /**
2758   * Get the maximum send window size.
2759   *
2760   * @return The maximum send window size.
2761   */
2762  public int getMaxSendWindow()
2763  {
2764    return maxSendWindow;
2765  }
2766
2767  /**
2768   * Get the current send window size.
2769   *
2770   * @return The current send window size.
2771   */
2772  public int getCurrentSendWindow()
2773  {
2774    if (isConnected())
2775    {
2776      return sendWindow.availablePermits();
2777    }
2778    return 0;
2779  }
2780
2781  /**
2782   * Get the number of times the connection was lost.
2783   * @return The number of times the connection was lost.
2784   */
2785  public int getNumLostConnections()
2786  {
2787    return numLostConnections;
2788  }
2789
2790  /**
2791   * Change some configuration parameters.
2792   *
2793   * @param newConfig  The new config to use.
2794   * @return                    A boolean indicating if the changes
2795   *                            requires to restart the service.
2796   */
2797  boolean changeConfig(ReplicationDomainCfg newConfig)
2798  {
2799    // These parameters needs to be renegotiated with the ReplicationServer
2800    // so if they have changed, that requires restarting the session with
2801    // the ReplicationServer.
2802    // A new session is necessary only when information regarding
2803    // the connection is modified
2804    boolean needToRestartSession =
2805        !newConfig.getReplicationServer().equals(config.getReplicationServer())
2806        || newConfig.getWindowSize() != config.getWindowSize()
2807        || newConfig.getHeartbeatInterval() != config.getHeartbeatInterval()
2808        || newConfig.getGroupId() != config.getGroupId();
2809
2810    this.config = newConfig;
2811    this.rcvWindow = newConfig.getWindowSize();
2812    this.halfRcvWindow = this.rcvWindow / 2;
2813
2814    return needToRestartSession;
2815  }
2816
2817  /**
2818   * Get the version of the replication protocol.
2819   * @return The version of the replication protocol.
2820   */
2821  public short getProtocolVersion()
2822  {
2823    final Session session = connectedRS.get().session;
2824    if (session != null)
2825    {
2826      return session.getProtocolVersion();
2827    }
2828    return ProtocolVersion.getCurrentVersion();
2829  }
2830
2831  /**
2832   * Check if the broker is connected to a ReplicationServer and therefore
2833   * ready to received and send Replication Messages.
2834   *
2835   * @return true if the server is connected, false if not.
2836   */
2837  public boolean isConnected()
2838  {
2839    return connectedRS.get().isConnected();
2840  }
2841
2842  /**
2843   * Determine whether the connection to the replication server is encrypted.
2844   * @return true if the connection is encrypted, false otherwise.
2845   */
2846  public boolean isSessionEncrypted()
2847  {
2848    final Session session = connectedRS.get().session;
2849    return session != null ? session.isEncrypted() : false;
2850  }
2851
2852  /**
2853   * Signals the RS we just entered a new status.
2854   * @param newStatus The status the local DS just entered
2855   */
2856  public void signalStatusChange(ServerStatus newStatus)
2857  {
2858    try
2859    {
2860      connectedRS.get().session.publish(
2861          new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus));
2862    } catch (IOException ex)
2863    {
2864      logger.error(ERR_EXCEPTION_SENDING_CS, getBaseDN(), getServerId(),
2865          ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex));
2866    }
2867  }
2868
2869  /**
2870   * Gets the info for DSs in the topology (except us).
2871   * @return The info for DSs in the topology (except us)
2872   */
2873  public Map<Integer, DSInfo> getReplicaInfos()
2874  {
2875    return topology.get().replicaInfos;
2876  }
2877
2878  /**
2879   * Gets the info for RSs in the topology (except the one we are connected
2880   * to).
2881   * @return The info for RSs in the topology (except the one we are connected
2882   * to)
2883   */
2884  public List<RSInfo> getRsInfos()
2885  {
2886    return toRSInfos(topology.get().rsInfos);
2887  }
2888
2889  private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos)
2890  {
2891    final List<RSInfo> result = new ArrayList<>();
2892    for (ReplicationServerInfo rsInfo : rsInfos.values())
2893    {
2894      result.add(rsInfo.toRSInfo());
2895    }
2896    return result;
2897  }
2898
2899  /**
2900   * Processes an incoming TopologyMsg.
2901   * Updates the structures for the local view of the topology.
2902   *
2903   * @param topoMsg
2904   *          The topology information received from RS.
2905   * @param rsServerId
2906   *          the serverId to use for the connectedDS
2907   */
2908  private void receiveTopo(TopologyMsg topoMsg, int rsServerId)
2909  {
2910    final Topology newTopo = computeNewTopology(topoMsg, rsServerId);
2911    for (DSInfo dsInfo : newTopo.replicaInfos.values())
2912    {
2913      domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo
2914          .getEclIncludesForDeletes());
2915    }
2916  }
2917
2918  private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId)
2919  {
2920    Topology oldTopo;
2921    Topology newTopo;
2922    do
2923    {
2924      oldTopo = topology.get();
2925      newTopo = new Topology(topoMsg, getServerId(), rsServerId,
2926              getReplicationServerUrls(), oldTopo.rsInfos);
2927    }
2928    while (!topology.compareAndSet(oldTopo, newTopo));
2929
2930    if (logger.isTraceEnabled())
2931    {
2932      final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo);
2933      sb.append(" received TopologyMsg=").append(topoMsg);
2934      debugInfo(sb);
2935    }
2936    return newTopo;
2937  }
2938
2939  /**
2940   * Contains the last known state of the replication topology.
2941   */
2942  static final class Topology
2943  {
2944
2945    /**
2946     * The RS's serverId that this DS was connected to when this topology state
2947     * was computed.
2948     */
2949    private final int rsServerId;
2950    /**
2951     * Info for other DSs.
2952     * <p>
2953     * Warning: does not contain info for us (for our server id)
2954     */
2955    final Map<Integer, DSInfo> replicaInfos;
2956    /**
2957     * The map of replication server info initialized at connection time and
2958     * regularly updated. This is used to decide to which best suitable
2959     * replication server one wants to connect. Key: replication server id
2960     * Value: replication server info for the matching replication server id
2961     */
2962    final Map<Integer, ReplicationServerInfo> rsInfos;
2963
2964    private Topology()
2965    {
2966      this.rsServerId = -1;
2967      this.replicaInfos = Collections.emptyMap();
2968      this.rsInfos = Collections.emptyMap();
2969    }
2970
2971    /**
2972     * Constructor to use when only the RSInfos need to be recomputed.
2973     *
2974     * @param dsInfosToKeep
2975     *          the DSInfos that will be stored as is
2976     * @param newRSInfos
2977     *          the new RSInfos from which to compute the new topology
2978     * @param dsServerId
2979     *          the DS serverId
2980     * @param rsServerId
2981     *          the current connected RS serverId
2982     * @param configuredReplicationServerUrls
2983     *          the configured replication server URLs
2984     * @param previousRsInfos
2985     *          the RSInfos computed in the previous Topology object
2986     */
2987    Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos,
2988        int dsServerId, int rsServerId,
2989        Set<String> configuredReplicationServerUrls,
2990        Map<Integer, ReplicationServerInfo> previousRsInfos)
2991    {
2992      this.rsServerId = rsServerId;
2993      this.replicaInfos = dsInfosToKeep == null
2994          ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep;
2995      this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
2996          previousRsInfos, configuredReplicationServerUrls);
2997    }
2998
2999    /**
3000     * Constructor to use when a new TopologyMsg has been received.
3001     *
3002     * @param topoMsg
3003     *          the topology message containing the new DSInfos and RSInfos from
3004     *          which to compute the new topology
3005     * @param dsServerId
3006     *          the DS serverId
3007     * @param rsServerId
3008     *          the current connected RS serverId
3009     * @param configuredReplicationServerUrls
3010     *          the configured replication server URLs
3011     * @param previousRsInfos
3012     *          the RSInfos computed in the previous Topology object
3013     */
3014    Topology(TopologyMsg topoMsg, int dsServerId,
3015        int rsServerId, Set<String> configuredReplicationServerUrls,
3016        Map<Integer, ReplicationServerInfo> previousRsInfos)
3017    {
3018      this.rsServerId = rsServerId;
3019      this.replicaInfos = removeThisDs(topoMsg.getReplicaInfos(), dsServerId);
3020      this.rsInfos = computeRSInfos(dsServerId, topoMsg.getRsInfos(),
3021          previousRsInfos, configuredReplicationServerUrls);
3022    }
3023
3024    private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos,
3025        int dsServerId)
3026    {
3027      final Map<Integer, DSInfo> copy = new HashMap<>(dsInfos);
3028      copy.remove(dsServerId);
3029      return Collections.unmodifiableMap(copy);
3030    }
3031
3032    private Map<Integer, ReplicationServerInfo> computeRSInfos(
3033        int dsServerId, List<RSInfo> newRsInfos,
3034        Map<Integer, ReplicationServerInfo> previousRsInfos,
3035        Set<String> configuredReplicationServerUrls)
3036    {
3037      final Map<Integer, ReplicationServerInfo> results = new HashMap<>(previousRsInfos);
3038
3039      // Update replication server info list with the received topology info
3040      final Set<Integer> rssToKeep = new HashSet<>();
3041      for (RSInfo newRSInfo : newRsInfos)
3042      {
3043        final int rsId = newRSInfo.getId();
3044        rssToKeep.add(rsId); // Mark this server as still existing
3045        Set<Integer> connectedDSs =
3046            computeDSsConnectedTo(rsId, dsServerId);
3047        ReplicationServerInfo rsInfo = results.get(rsId);
3048        if (rsInfo == null)
3049        {
3050          // New replication server, create info for it add it to the list
3051          rsInfo = new ReplicationServerInfo(newRSInfo, connectedDSs);
3052          setLocallyConfiguredFlag(rsInfo, configuredReplicationServerUrls);
3053          results.put(rsId, rsInfo);
3054        }
3055        else
3056        {
3057          // Update the existing info for the replication server
3058          rsInfo.update(newRSInfo, connectedDSs);
3059        }
3060      }
3061
3062      // Remove any replication server that may have disappeared from the
3063      // topology
3064      results.keySet().retainAll(rssToKeep);
3065
3066      return Collections.unmodifiableMap(results);
3067    }
3068
3069    /** Computes the list of DSs connected to a particular RS. */
3070    private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId)
3071    {
3072      final Set<Integer> connectedDSs = new HashSet<>();
3073      if (rsServerId == rsId)
3074      {
3075        /*
3076         * If we are computing connected DSs for the RS we are connected to, we
3077         * should count the local DS as the DSInfo of the local DS is not sent
3078         * by the replication server in the topology message. We must count
3079         * ourselves as a connected server.
3080         */
3081        connectedDSs.add(dsServerId);
3082      }
3083
3084      for (DSInfo dsInfo : replicaInfos.values())
3085      {
3086        if (dsInfo.getRsId() == rsId)
3087        {
3088          connectedDSs.add(dsInfo.getDsId());
3089        }
3090      }
3091
3092      return connectedDSs;
3093    }
3094
3095    /**
3096     * Sets the locally configured flag for the passed ReplicationServerInfo
3097     * object, analyzing the local configuration.
3098     *
3099     * @param rsInfo
3100     *          the Replication server to check and update
3101     * @param configuredReplicationServerUrls
3102     */
3103    private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo,
3104        Set<String> configuredReplicationServerUrls)
3105    {
3106      // Determine if the passed ReplicationServerInfo has a URL that is present
3107      // in the locally configured replication servers
3108      String rsUrl = rsInfo.getServerURL();
3109      if (rsUrl == null)
3110      {
3111        // The ReplicationServerInfo has been generated from a server with
3112        // no URL in TopologyMsg (i.e: with replication protocol version < 4):
3113        // ignore this server as we do not know how to connect to it
3114        rsInfo.setLocallyConfigured(false);
3115        return;
3116      }
3117      for (String serverUrl : configuredReplicationServerUrls)
3118      {
3119        if (isSameReplicationServerUrl(serverUrl, rsUrl))
3120        {
3121          // This RS is locally configured, mark this
3122          rsInfo.setLocallyConfigured(true);
3123          rsInfo.setServerURL(serverUrl);
3124          return;
3125        }
3126      }
3127      rsInfo.setLocallyConfigured(false);
3128    }
3129
3130    /** {@inheritDoc} */
3131    @Override
3132    public boolean equals(Object obj)
3133    {
3134      if (this == obj)
3135      {
3136        return true;
3137      }
3138      if (obj == null || getClass() != obj.getClass())
3139      {
3140        return false;
3141      }
3142      final Topology other = (Topology) obj;
3143      return rsServerId == other.rsServerId
3144          && Objects.equals(replicaInfos, other.replicaInfos)
3145          && Objects.equals(rsInfos, other.rsInfos)
3146          && urlsEqual1(replicaInfos, other.replicaInfos)
3147          && urlsEqual2(rsInfos, other.rsInfos);
3148    }
3149
3150    private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1,
3151        Map<Integer, DSInfo> replicaInfos2)
3152    {
3153      for (Entry<Integer, DSInfo> entry : replicaInfos1.entrySet())
3154      {
3155        DSInfo dsInfo = replicaInfos2.get(entry.getKey());
3156        if (!Objects.equals(entry.getValue().getDsUrl(), dsInfo.getDsUrl()))
3157        {
3158          return false;
3159        }
3160      }
3161      return true;
3162    }
3163
3164    private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1,
3165        Map<Integer, ReplicationServerInfo> rsInfos2)
3166    {
3167      for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet())
3168      {
3169        ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey());
3170        if (!Objects.equals(entry.getValue().getServerURL(), rsInfo.getServerURL()))
3171        {
3172          return false;
3173        }
3174      }
3175      return true;
3176    }
3177
3178    /** {@inheritDoc} */
3179    @Override
3180    public int hashCode()
3181    {
3182      final int prime = 31;
3183      int result = 1;
3184      result = prime * result + rsServerId;
3185      result = prime * result
3186          + (replicaInfos == null ? 0 : replicaInfos.hashCode());
3187      result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode());
3188      return result;
3189    }
3190
3191    /** {@inheritDoc} */
3192    @Override
3193    public String toString()
3194    {
3195      return getClass().getSimpleName()
3196          + " rsServerId=" + rsServerId
3197          + ", replicaInfos=" + replicaInfos.values()
3198          + ", rsInfos=" + rsInfos.values();
3199    }
3200  }
3201
3202  /**
3203   * Check if the broker could not find any Replication Server and therefore
3204   * connection attempt failed.
3205   *
3206   * @return true if the server could not connect to any Replication Server.
3207   */
3208  boolean hasConnectionError()
3209  {
3210    return connectionError;
3211  }
3212
3213  /**
3214   * Starts publishing to the RS the current timestamp used in this server.
3215   */
3216  private void startChangeTimeHeartBeatPublishing(ConnectedRS rs)
3217  {
3218    // Start a CSN heartbeat thread.
3219    long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval();
3220    if (changeTimeHeartbeatInterval > 0)
3221    {
3222      final String threadName = "Replica DS(" + getServerId()
3223              + ") change time heartbeat publisher for domain \"" + getBaseDN()
3224              + "\" to RS(" + rs.getServerId() + ") at " + rs.replicationServer;
3225
3226      ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
3227          threadName, rs.session, changeTimeHeartbeatInterval, getServerId());
3228      ctHeartbeatPublisherThread.start();
3229    }
3230    else
3231    {
3232      if (logger.isTraceEnabled())
3233      {
3234        debugInfo("is not configured to send CSN heartbeat interval");
3235      }
3236    }
3237  }
3238
3239  /**
3240   * Stops publishing to the RS the current timestamp used in this server.
3241   */
3242  private synchronized void stopChangeTimeHeartBeatPublishing()
3243  {
3244    if (ctHeartbeatPublisherThread != null)
3245    {
3246      ctHeartbeatPublisherThread.shutdown();
3247      ctHeartbeatPublisherThread = null;
3248    }
3249  }
3250
3251  /**
3252   * Set the connectRequiresRecovery to the provided value.
3253   * This flag is used to indicate if a recovery of Update is necessary
3254   * after a reconnection to a RS.
3255   * It is the responsibility of the ReplicationDomain to set it during the
3256   * sessionInitiated phase.
3257   *
3258   * @param b the new value of the connectRequiresRecovery.
3259   */
3260  public void setRecoveryRequired(boolean b)
3261  {
3262    connectRequiresRecovery = b;
3263  }
3264
3265  /**
3266   * Returns whether the broker is shutting down.
3267   * @return whether the broker is shutting down.
3268   */
3269  boolean shuttingDown()
3270  {
3271    return shutdown;
3272  }
3273
3274  /**
3275   * Returns the local address of this replication domain, or the empty string
3276   * if it is not yet connected.
3277   *
3278   * @return The local address.
3279   */
3280  String getLocalUrl()
3281  {
3282    final Session session = connectedRS.get().session;
3283    return session != null ? session.getLocalUrl() : "";
3284  }
3285
3286  /**
3287   * Returns the replication monitor instance name associated with this broker.
3288   *
3289   * @return The replication monitor instance name.
3290   */
3291  String getReplicationMonitorInstanceName()
3292  {
3293    // Only invoked by replication domain so always non-null.
3294    return monitor.getMonitorInstanceName();
3295  }
3296
3297  private ConnectedRS setConnectedRS(final ConnectedRS newRS)
3298  {
3299    final ConnectedRS oldRS = connectedRS.getAndSet(newRS);
3300    if (!oldRS.equals(newRS) && oldRS.session != null)
3301    {
3302      // monitor name is changing, deregister before registering again
3303      deregisterReplicationMonitor();
3304      oldRS.session.close();
3305      registerReplicationMonitor();
3306    }
3307    return newRS;
3308  }
3309
3310  /**
3311   * Must be invoked each time the session changes because, the monitor name is
3312   * dynamically created with the session name, while monitor registration is
3313   * static.
3314   *
3315   * @see #monitor
3316   */
3317  private void registerReplicationMonitor()
3318  {
3319    // The monitor should not be registered if this is a unit test
3320    // because the replication domain is null.
3321    if (monitor != null)
3322    {
3323      DirectoryServer.registerMonitorProvider(monitor);
3324    }
3325  }
3326
3327  private void deregisterReplicationMonitor()
3328  {
3329    // The monitor should not be deregistered if this is a unit test
3330    // because the replication domain is null.
3331    if (monitor != null)
3332    {
3333      DirectoryServer.deregisterMonitorProvider(monitor);
3334    }
3335  }
3336
3337  /** {@inheritDoc} */
3338  @Override
3339  public String toString()
3340  {
3341    final StringBuilder sb = new StringBuilder();
3342    sb.append(getClass().getSimpleName())
3343      .append(" \"").append(getBaseDN()).append(" ")
3344      .append(getServerId()).append("\",")
3345      .append(" groupId=").append(getGroupId())
3346      .append(", genId=").append(getGenerationID())
3347      .append(", ");
3348    connectedRS.get().toString(sb);
3349    return sb.toString();
3350  }
3351
3352  private void debugInfo(CharSequence message)
3353  {
3354    logger.trace(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
3355        + " and serverId=" + getServerId() + ": " + message);
3356  }
3357}