001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2016 ForgeRock AS.
016 */
017package org.opends.server.replication.server;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.Timer;
028import java.util.TimerTask;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicReference;
032import java.util.concurrent.locks.ReentrantLock;
033
034import net.jcip.annotations.GuardedBy;
035
036import org.forgerock.i18n.LocalizableMessage;
037import org.forgerock.i18n.LocalizableMessageBuilder;
038import org.forgerock.i18n.slf4j.LocalizedLogger;
039import org.forgerock.opendj.ldap.ResultCode;
040import org.opends.server.api.MonitorData;
041import org.forgerock.opendj.server.config.server.MonitorProviderCfg;
042import org.opends.server.api.MonitorProvider;
043import org.opends.server.core.DirectoryServer;
044import org.opends.server.replication.common.CSN;
045import org.opends.server.replication.common.DSInfo;
046import org.opends.server.replication.common.RSInfo;
047import org.opends.server.replication.common.ServerState;
048import org.opends.server.replication.common.ServerStatus;
049import org.opends.server.replication.common.StatusMachineEvent;
050import org.opends.server.replication.protocol.AckMsg;
051import org.opends.server.replication.protocol.ChangeStatusMsg;
052import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
053import org.opends.server.replication.protocol.ErrorMsg;
054import org.opends.server.replication.protocol.MonitorMsg;
055import org.opends.server.replication.protocol.MonitorRequestMsg;
056import org.opends.server.replication.protocol.ReplicaOfflineMsg;
057import org.opends.server.replication.protocol.ResetGenerationIdMsg;
058import org.opends.server.replication.protocol.RoutableMsg;
059import org.opends.server.replication.protocol.TopologyMsg;
060import org.opends.server.replication.protocol.UpdateMsg;
061import org.opends.server.replication.server.changelog.api.ChangelogException;
062import org.opends.server.replication.server.changelog.api.DBCursor;
063import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
064import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
065import org.forgerock.opendj.ldap.DN;
066import org.opends.server.types.DirectoryException;
067import org.opends.server.types.HostPort;
068
069import static org.opends.messages.ReplicationMessages.*;
070import static org.opends.server.replication.common.ServerStatus.*;
071import static org.opends.server.replication.common.StatusMachineEvent.*;
072import static org.opends.server.replication.protocol.ProtocolVersion.*;
073import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
074import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
075import static org.opends.server.util.CollectionUtils.*;
076import static org.opends.server.util.StaticUtils.*;
077
/**
 * This class defines an in-memory cache that is used to store the messages
 * that have been received from an LDAP server or from another replication
 * server and that should be forwarded to other servers.
 * <p>
 * The size of the cache is set by configuration. If the cache becomes bigger
 * than the configured size, the older messages are removed and, should they be
 * needed again, must be read from the backing file.
 * <p>
 * It runs a thread that is responsible for saving the received messages to
 * disk and for trimming them. The decision to trim can be based on disk space
 * or on the age of the message.
 */
093public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
094{
095  private final DN baseDN;
096
097  /**
098   * Periodically verifies whether the connected DSs are late and publishes any
099   * pending status messages.
100   */
101  private final StatusAnalyzer statusAnalyzer;
102
103  /**
104   * The monitoring publisher that periodically sends monitoring messages to the
105   * topology. Using an AtomicReference to avoid leaking references to costly
106   * threads.
107   */
108  private final AtomicReference<MonitoringPublisher> monitoringPublisher = new AtomicReference<>();
109  /** Maintains monitor data for the current domain. */
110  private final ReplicationDomainMonitor domainMonitor = new ReplicationDomainMonitor(this);
111
  /**
   * The following map contains one balanced tree for each replica ID to which
   * we are currently publishing; the first update in the balanced tree is the
   * next change that we must push to this particular server.
   */
117  private final Map<Integer, DataServerHandler> connectedDSs = new ConcurrentHashMap<>();
118
  /**
   * This map contains one ServerHandler for each replication server with which
   * we are connected (so normally all the replication servers); the first
   * update in the balanced tree is the next change that we must push to this
   * particular server.
   */
125  private final Map<Integer, ReplicationServerHandler> connectedRSs = new ConcurrentHashMap<>();
126
127  private final ReplicationDomainDB domainDB;
128  /** The ReplicationServer that created the current instance. */
129  private final ReplicationServer localReplicationServer;
130
131  /**
132   * The generationId of the current replication domain. The generationId is
133   * computed by hashing the first 1000 entries in the DB.
134   */
135  private volatile long generationId = -1;
136  /**
137   * JNR, this is legacy code, hard to follow logic. I think what this field
138   * tries to say is: "is the generationId in use anywhere?", i.e. is there a
139   * replication topology in place? As soon as an answer to any of these
140   * question comes true, then it is set to true.
141   * <p>
142   * It looks like the only use of this field is to prevent the
143   * {@link #generationId} from being reset by
144   * {@link #resetGenerationIdIfPossible()}.
145   */
146  private volatile boolean generationIdSavedStatus;
147
148  /** The tracer object for the debug logger. */
149  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
150
151  /**
152   * The needed info for each received assured update message we are waiting
153   * acks for.
154   * <p>
155   * Key: a CSN matching a received update message which requested
156   * assured mode usage (either safe read or safe data mode)
157   * <p>
158   * Value: The object holding every info needed about the already received acks
159   * as well as the acks to be received.
160   *
161   * @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its sub
162   *      classes javadoc.
163   */
164  private final Map<CSN, ExpectedAcksInfo> waitingAcks = new ConcurrentHashMap<>();
165
166  /**
167   * The timer used to run the timeout code (timer tasks) for the assured update
168   * messages we are waiting acks for.
169   */
170  private final Timer assuredTimeoutTimer;
171  /**
172   * Counter used to purge the timer tasks references in assuredTimeoutTimer,
173   * every n number of treated assured messages.
174   */
175  private int assuredTimeoutTimerPurgeCounter;
176
177
178
179  /**
180   * Stores pending status messages such as DS change time heartbeats for future
181   * forwarding to the rest of the topology. This class is required in order to
182   * decouple inbound IO processing from outbound IO processing and avoid
183   * potential inter-process deadlocks. In particular, the {@code ServerReader}
184   * thread must not send messages.
185   */
186  private static class PendingStatusMessages
187  {
188    private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats = new HashMap<>(1);
189    private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs = new HashMap<>(1);
190    private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs = new HashMap<>(1);
191    private boolean sendRSTopologyMsg;
192    private boolean sendDSTopologyMsg;
193    private int excludedDSForTopologyMsg = -1;
194
195    /**
196     * Enqueues a TopologyMsg for all the connected directory servers in order
197     * to let them know the topology (every known DSs and RSs).
198     *
199     * @param excludedDS
200     *          If not null, the topology message will not be sent to this DS.
201     */
202    private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
203    {
204      int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1;
205      if (sendDSTopologyMsg)
206      {
207        if (excludedServerId != excludedDSForTopologyMsg)
208        {
209          excludedDSForTopologyMsg = -1;
210        }
211      }
212      else
213      {
214        sendDSTopologyMsg = true;
215        excludedDSForTopologyMsg = excludedServerId;
216      }
217    }
218
219    /**
220     * Enqueues a TopologyMsg for all the connected replication servers in order
221     * to let them know our connected LDAP servers.
222     */
223    private void enqueueTopoInfoToAllRSs()
224    {
225      sendRSTopologyMsg = true;
226    }
227
228    /**
229     * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
230     * all other RS instances.
231     *
232     * @param msg
233     *          The heartbeat message.
234     */
235    private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
236    {
237      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
238    }
239
240    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
241    {
242      pendingDSMonitorMsgs.put(dsServerId, msg);
243    }
244
245    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
246    {
247      pendingRSMonitorMsgs.put(rsServerId, msg);
248    }
249
250    /** {@inheritDoc} */
251    @Override
252    public String toString()
253    {
254      return getClass().getSimpleName()
255          + " pendingHeartbeats=" + pendingHeartbeats
256          + ", pendingDSMonitorMsgs=" + pendingDSMonitorMsgs
257          + ", pendingRSMonitorMsgs=" + pendingRSMonitorMsgs
258          + ", sendRSTopologyMsg=" + sendRSTopologyMsg
259          + ", sendDSTopologyMsg=" + sendDSTopologyMsg
260          + ", excludedDSForTopologyMsg=" + excludedDSForTopologyMsg;
261    }
262  }
263
264  private final Object pendingStatusMessagesLock = new Object();
265
266  @GuardedBy("pendingStatusMessagesLock")
267  private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages();
268
269  /**
270   * Creates a new ReplicationServerDomain associated to the baseDN.
271   *
272   * @param baseDN
273   *          The baseDN associated to the ReplicationServerDomain.
274   * @param localReplicationServer
275   *          the ReplicationServer that created this instance.
276   */
277  public ReplicationServerDomain(DN baseDN,
278      ReplicationServer localReplicationServer)
279  {
280    this.baseDN = baseDN;
281    this.localReplicationServer = localReplicationServer;
282    this.assuredTimeoutTimer = new Timer("Replication server RS("
283        + localReplicationServer.getServerId()
284        + ") assured timer for domain \"" + baseDN + "\"", true);
285    this.domainDB =
286        localReplicationServer.getChangelogDB().getReplicationDomainDB();
287    this.statusAnalyzer = new StatusAnalyzer(this);
288    this.statusAnalyzer.start();
289    DirectoryServer.registerMonitorProvider(this);
290  }
291
292  /**
293   * Add an update that has been received to the list of
294   * updates that must be forwarded to all other servers.
295   *
296   * @param updateMsg  The update that has been received.
297   * @param sourceHandler The ServerHandler for the server from which the
298   *        update was received
299   * @throws IOException When an IO exception happens during the update
300   *         processing.
301   */
302  public void put(UpdateMsg updateMsg, ServerHandler sourceHandler) throws IOException
303  {
304    sourceHandler.updateServerState(updateMsg);
305    sourceHandler.incrementInCount();
306    setGenerationIdIfUnset(sourceHandler.getGenerationId());
307
308    /**
309     * If this is an assured message (a message requesting ack), we must
310     * construct the ExpectedAcksInfo object with the right number of expected
311     * acks before posting message to the writers. Otherwise some writers may
312     * have time to post, receive the ack and increment received ack counter
313     * (kept in ExpectedAcksInfo object) and we could think the acknowledgment
314     * is fully processed although it may be not (some other acks from other
315     * servers are not yet arrived). So for that purpose we do a pre-loop
316     * to determine to who we will post an assured message.
317     * Whether the assured mode is safe read or safe data, we anyway do not
318     * support the assured replication feature across topologies with different
319     * group ids. The assured feature insures assured replication based on the
320     * same locality (group id). For instance in double data center deployment
321     * (2 group id usage) with assured replication enabled, an assured message
322     * sent from data center 1 (group id = 1) will be sent to servers of both
323     * data centers, but one will request and wait acks only from servers of the
324     * data center 1.
325     */
326    final PreparedAssuredInfo preparedAssuredInfo = getPreparedAssuredInfo(updateMsg, sourceHandler);
327
328    if (!publishUpdateMsg(updateMsg))
329    {
330      return;
331    }
332
333    final List<Integer> assuredServers = getAssuredServers(updateMsg, preparedAssuredInfo);
334
335    /**
336     * The update message equivalent to the originally received update message,
337     * but with assured flag disabled. This message is the one that should be
338     * sent to non eligible servers for assured mode.
339     * We need a clone like of the original message with assured flag off, to be
340     * posted to servers we don't want to wait the ack from (not normal status
341     * servers or servers with different group id). This must be done because
342     * the posted message is a reference so each writer queue gets the same
343     * reference, thus, changing the assured flag of an object is done for every
344     * references posted on every writer queues. That is why we need a message
345     * version with assured flag on and another one with assured flag off.
346     */
347    final NotAssuredUpdateMsg notAssuredUpdateMsg =
348        preparedAssuredInfo != null ? new NotAssuredUpdateMsg(updateMsg) : null;
349
350    // Push the message to the replication servers
351    if (sourceHandler.isDataServer())
352    {
353      for (ReplicationServerHandler rsHandler : connectedRSs.values())
354      {
355        /**
356         * Ignore updates to RS with bad gen id
357         * (no system managed status for a RS)
358         */
359        if (!isDifferentGenerationId(rsHandler, updateMsg))
360        {
361          addUpdate(rsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
362        }
363      }
364    }
365
366    // Push the message to the LDAP servers
367    for (DataServerHandler dsHandler : connectedDSs.values())
368    {
369      // Do not forward the change to the server that just sent it
370      if (dsHandler != sourceHandler
371          && !isUpdateMsgFiltered(updateMsg, dsHandler))
372      {
373        addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
374      }
375    }
376  }
377
378  private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
379      UpdateMsg updateMsg)
380  {
381    final boolean isDifferent = isDifferentGenerationId(rsHandler.getGenerationId());
382    if (isDifferent && logger.isTraceEnabled())
383    {
384      debug("updateMsg " + updateMsg.getCSN()
385          + " will not be sent to replication server "
386          + rsHandler.getServerId() + " with generation id "
387          + rsHandler.getGenerationId() + " different from local "
388          + "generation id " + generationId);
389    }
390    return isDifferent;
391  }
392
393  /**
394   * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS.
395   * <p>
396   * The RSD lock should not be taken here as it is acceptable to have a delay
397   * between the time the server has a wrong status and the fact we detect it:
398   * the updates that succeed to pass during this time will have no impact on
399   * remote server. But it is interesting to not saturate uselessly the network
400   * if the updates are not necessary so this check to stop sending updates is
401   * interesting anyway. Not taking the RSD lock allows to have better
402   * performances in normal mode (most of the time).
403   */
404  private boolean isUpdateMsgFiltered(UpdateMsg updateMsg, DataServerHandler dsHandler)
405  {
406    final ServerStatus dsStatus = dsHandler.getStatus();
407    if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
408    {
409      if (logger.isTraceEnabled())
410      {
411        debug("updateMsg " + updateMsg.getCSN()
412            + " will not be sent to directory server "
413            + dsHandler.getServerId() + " with generation id "
414            + dsHandler.getGenerationId() + " different from local "
415            + "generation id " + generationId);
416      }
417      return true;
418    }
419    else if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
420    {
421      if (logger.isTraceEnabled())
422      {
423        debug("updateMsg " + updateMsg.getCSN()
424            + " will not be sent to directory server "
425            + dsHandler.getServerId() + " as it is in full update");
426      }
427      return true;
428    }
429    // Replica offline messages should not get to a connected DS, they are meant to be
430    // exchanged only between RSes
431    return updateMsg instanceof ReplicaOfflineMsg;
432  }
433
434  private PreparedAssuredInfo getPreparedAssuredInfo(UpdateMsg updateMsg,
435      ServerHandler sourceHandler) throws IOException
436  {
437    // Assured feature is supported starting from replication protocol V2
438    if (!updateMsg.isAssured()
439        || sourceHandler.getProtocolVersion() < REPLICATION_PROTOCOL_V2)
440    {
441      return null;
442    }
443
444    // According to assured sub-mode, prepare structures to keep track of
445    // the acks we are interested in.
446    switch (updateMsg.getAssuredMode())
447    {
448    case SAFE_DATA_MODE:
449      sourceHandler.incrementAssuredSdReceivedUpdates();
450      return processSafeDataUpdateMsg(updateMsg, sourceHandler);
451
452    case SAFE_READ_MODE:
453      sourceHandler.incrementAssuredSrReceivedUpdates();
454      return processSafeReadUpdateMsg(updateMsg, sourceHandler);
455
456    default:
457      // Unknown assured mode: should never happen
458      logger.error(ERR_RS_UNKNOWN_ASSURED_MODE,
459          localReplicationServer.getServerId(), updateMsg.getAssuredMode(), baseDN, updateMsg);
460      return null;
461    }
462  }
463
464  private List<Integer> getAssuredServers(UpdateMsg updateMsg, PreparedAssuredInfo preparedAssuredInfo)
465  {
466    List<Integer> expectedServers = null;
467    if (preparedAssuredInfo != null && preparedAssuredInfo.expectedServers != null)
468    {
469      expectedServers = preparedAssuredInfo.expectedServers;
470      // Store the expected acks info into the global map.
471      // The code for processing reception of acks for this update will update
472      // info kept in this object and if enough acks received, it will send
473      // back the final ack to the requester and remove the object from this map
474      // OR
475      // The following timer will time out and send an timeout ack to the
476      // requester if the acks are not received in time. The timer will also
477      // remove the object from this map.
478      final CSN csn = updateMsg.getCSN();
479      waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
480
481      // Arm timer for this assured update message (wait for acks until it times out)
482      final AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
483      assuredTimeoutTimer.schedule(assuredTimeoutTask, localReplicationServer.getAssuredTimeout());
484      // Purge timer every 100 treated messages
485      assuredTimeoutTimerPurgeCounter++;
486      if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
487      {
488        assuredTimeoutTimer.purge();
489      }
490    }
491
492    return expectedServers != null ? expectedServers : Collections.<Integer> emptyList();
493  }
494
495  private boolean publishUpdateMsg(UpdateMsg updateMsg)
496  {
497    try
498    {
499      if (updateMsg instanceof ReplicaOfflineMsg)
500      {
501        final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
502        this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN());
503        return true;
504      }
505
506      if (this.domainDB.publishUpdateMsg(baseDN, updateMsg))
507      {
508        /*
509         * JNR: Matt and I had a hard time figuring out where to put this
510         * synchronized block. We elected to put it here, but without a strong
511         * conviction.
512         */
513        synchronized (generationIDLock)
514        {
515          /*
516           * JNR: I think the generationIdSavedStatus is set to true because
517           * method above created a ReplicaDB which assumes the generationId was
518           * communicated to another server. Hence setting true on this field
519           * prevent the generationId from being reset.
520           */
521          generationIdSavedStatus = true;
522        }
523      }
524      return true;
525    }
526    catch (ChangelogException e)
527    {
528      /*
529       * Because of database problem we can't save any more changes from at
530       * least one LDAP server. This replicationServer therefore can't do it's
531       * job properly anymore and needs to close all its connections and
532       * shutdown itself.
533       */
534      logger.error(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR, stackTraceToSingleLineString(e));
535      localReplicationServer.shutdown();
536      return false;
537    }
538  }
539
540  private void addUpdate(ServerHandler sHandler, UpdateMsg updateMsg,
541      NotAssuredUpdateMsg notAssuredUpdateMsg, List<Integer> assuredServers)
542  {
543    // Assured mode: post an assured or not assured matching update message
544    // according to what has been computed for the destination server
545    if (notAssuredUpdateMsg != null
546        && !assuredServers.contains(sHandler.getServerId()))
547    {
548      sHandler.add(notAssuredUpdateMsg);
549    }
550    else
551    {
552      sHandler.add(updateMsg);
553    }
554  }
555
556  /**
557   * Helper class to be the return type of a method that processes a just
558   * received assured update message:
559   * - processSafeReadUpdateMsg
560   * - processSafeDataUpdateMsg
561   * This is a facility to pack many interesting returned object.
562   */
563  private class PreparedAssuredInfo
564  {
565      /**
566       * The list of servers identified as servers we are interested in
567       * receiving acks from. If this list is not null, then expectedAcksInfo
568       * should be not null.
569       * Servers that are not in this list are servers not eligible for an ack
570       * request.
571       */
572      public List<Integer> expectedServers;
573
574      /**
575       * The constructed ExpectedAcksInfo object to be used when acks will be
576       * received. Null if expectedServers is null.
577       */
578      public ExpectedAcksInfo expectedAcksInfo;
579  }
580
581  /**
582   * Process a just received assured update message in Safe Read mode. If the
583   * ack can be sent immediately, it is done here. This will also determine to
584   * which suitable servers an ack should be requested from, and which ones are
585   * not eligible for an ack request.
586   * This method is an helper method for the put method. Have a look at the put
587   * method for a better understanding.
588   * @param update The just received assured update to process.
589   * @param sourceHandler The ServerHandler for the server from which the
590   *        update was received
591   * @return A suitable PreparedAssuredInfo object that contains every needed
592   * info to proceed with post to server writers.
593   * @throws IOException When an IO exception happens during the update
594   *         processing.
595   */
596  private PreparedAssuredInfo processSafeReadUpdateMsg(
597    UpdateMsg update, ServerHandler sourceHandler) throws IOException
598  {
599    CSN csn = update.getCSN();
600    byte groupId = localReplicationServer.getGroupId();
601    byte sourceGroupId = sourceHandler.getGroupId();
602    List<Integer> expectedServers = new ArrayList<>();
603    List<Integer> wrongStatusServers = new ArrayList<>();
604
605    if (sourceGroupId == groupId)
606      // Assured feature does not cross different group ids
607    {
608      if (sourceHandler.isDataServer())
609      {
610        collectRSsEligibleForAssuredReplication(groupId, expectedServers);
611      }
612
613      // Look for DS eligible for assured
614      for (DataServerHandler dsHandler : connectedDSs.values())
615      {
616        // Don't forward the change to the server that just sent it
617        if (dsHandler == sourceHandler)
618        {
619          continue;
620        }
621        if (dsHandler.getGroupId() == groupId)
622          // No ack expected from a DS with different group id
623        {
624          ServerStatus serverStatus = dsHandler.getStatus();
625          if (serverStatus == ServerStatus.NORMAL_STATUS)
626          {
627            expectedServers.add(dsHandler.getServerId());
628          } else if (serverStatus == ServerStatus.DEGRADED_STATUS) {
629            // No ack expected from a DS with wrong status
630            wrongStatusServers.add(dsHandler.getServerId());
631          }
632          /*
633           * else
634           * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
635           * We do not want this to be reported as an error to the update
636           * maker -> no pollution or potential misunderstanding when
637           * reading logs or monitoring and it was just administration (for
638           * instance new server is being configured in topo: it goes in bad
639           * gen then full update).
640           */
641        }
642      }
643    }
644
645    // Return computed structures
646    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
647    if (!expectedServers.isEmpty())
648    {
649      // Some other acks to wait for
650      preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(csn,
651        sourceHandler, expectedServers, wrongStatusServers);
652      preparedAssuredInfo.expectedServers = expectedServers;
653    }
654
655    if (preparedAssuredInfo.expectedServers == null)
656    {
657      // No eligible servers found, send the ack immediately
658      sourceHandler.send(new AckMsg(csn));
659    }
660
661    return preparedAssuredInfo;
662  }
663
664  /**
665   * Process a just received assured update message in Safe Data mode. If the
666   * ack can be sent immediately, it is done here. This will also determine to
667   * which suitable servers an ack should be requested from, and which ones are
668   * not eligible for an ack request.
669   * This method is an helper method for the put method. Have a look at the put
670   * method for a better understanding.
671   * @param update The just received assured update to process.
672   * @param sourceHandler The ServerHandler for the server from which the
673   *        update was received
674   * @return A suitable PreparedAssuredInfo object that contains every needed
675   * info to proceed with post to server writers.
676   * @throws IOException When an IO exception happens during the update
677   *         processing.
678   */
679  private PreparedAssuredInfo processSafeDataUpdateMsg(
680    UpdateMsg update, ServerHandler sourceHandler) throws IOException
681  {
682    CSN csn = update.getCSN();
683    boolean interestedInAcks = false;
684    byte safeDataLevel = update.getSafeDataLevel();
685    byte groupId = localReplicationServer.getGroupId();
686    byte sourceGroupId = sourceHandler.getGroupId();
687    if (safeDataLevel < (byte) 1)
688    {
689      // Should never happen
690      logger.error(ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL,
691          localReplicationServer.getServerId(), safeDataLevel, baseDN, update);
692    } else if (sourceGroupId == groupId
693    // Assured feature does not cross different group IDS
694        && isSameGenerationId(sourceHandler.getGenerationId()))
695    // Ignore assured updates from wrong generationId servers
696    {
697        if (sourceHandler.isDataServer())
698        {
699          if (safeDataLevel == (byte) 1)
700          {
701            /**
702             * Immediately return the ack for an assured message in safe data
703             * mode with safe data level 1, coming from a DS. No need to wait
704             * for more acks
705             */
706            sourceHandler.send(new AckMsg(csn));
707          } else
708          {
709            /**
710             * level > 1 : We need further acks
711             * The message will be posted in assured mode to eligible
712             * servers. The embedded safe data level is not changed, and his
713             * value will be used by a remote RS to determine if he must send
714             * an ack (level > 1) or not (level = 1)
715             */
716            interestedInAcks = true;
717          }
718        } else
719        { // A RS sent us the safe data message, for sure no further ack to wait
720          /**
721           * Level 1 has already been reached so no further acks to wait.
722           * Just deal with level > 1
723           */
724          if (safeDataLevel > (byte) 1)
725          {
726            sourceHandler.send(new AckMsg(csn));
727          }
728        }
729    }
730
731    List<Integer> expectedServers = new ArrayList<>();
732    if (interestedInAcks && sourceHandler.isDataServer())
733    {
734      collectRSsEligibleForAssuredReplication(groupId, expectedServers);
735    }
736
737    // Return computed structures
738    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
739    int nExpectedServers = expectedServers.size();
740    if (interestedInAcks) // interestedInAcks so level > 1
741    {
742      if (nExpectedServers > 0)
743      {
744        // Some other acks to wait for
745        int sdl = update.getSafeDataLevel();
746        int neededAdditionalServers = sdl - 1;
747        // Change the number of expected acks if not enough available eligible
748        // servers: the level is a best effort thing, we do not want to timeout
749        // at every assured SD update for instance if a RS has had his gen id
750        // reseted
751        byte finalSdl = (nExpectedServers >= neededAdditionalServers) ?
752          (byte)sdl : // Keep level as it was
753          (byte)(nExpectedServers+1); // Change level to match what's available
754        preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(csn,
755          sourceHandler, finalSdl, expectedServers);
756        preparedAssuredInfo.expectedServers = expectedServers;
757      } else
758      {
759        // level > 1 and source is a DS but no eligible servers found, send the
760        // ack immediately
761        sourceHandler.send(new AckMsg(csn));
762      }
763    }
764
765    return preparedAssuredInfo;
766  }
767
768  private void collectRSsEligibleForAssuredReplication(byte groupId,
769      List<Integer> expectedServers)
770  {
771    for (ReplicationServerHandler rsHandler : connectedRSs.values())
772    {
773      if (rsHandler.getGroupId() == groupId
774      // No ack expected from a RS with different group id
775            && isSameGenerationId(rsHandler.getGenerationId())
776        // No ack expected from a RS with bad gen id
777        )
778      {
779        expectedServers.add(rsHandler.getServerId());
780      }
781    }
782  }
783
784  private boolean isSameGenerationId(long generationId)
785  {
786    return this.generationId > 0 && this.generationId == generationId;
787  }
788
789  private boolean isDifferentGenerationId(long generationId)
790  {
791    return this.generationId > 0 && this.generationId != generationId;
792  }
793
794  /**
795   * Process an ack received from a given server.
796   *
797   * @param ack The ack message received.
798   * @param ackingServer The server handler of the server that sent the ack.
799   */
800  void processAck(AckMsg ack, ServerHandler ackingServer)
801  {
802    // Retrieve the expected acks info for the update matching the original
803    // sent update.
804    CSN csn = ack.getCSN();
805    ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn);
806
807    if (expectedAcksInfo != null)
808    {
809      // Prevent concurrent access from processAck() or AssuredTimeoutTask.run()
810      synchronized (expectedAcksInfo)
811      {
812        if (expectedAcksInfo.isCompleted())
813        {
814          // Timeout code is sending a timeout ack, do nothing and let him
815          // remove object from the map
816          return;
817        }
818        /**
819         *
820         * If this is the last ack we were waiting from, immediately create and
821         * send the final ack to the original server
822         */
823        if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
824        {
825          // Remove the object from the map as no more needed
826          waitingAcks.remove(csn);
827          AckMsg finalAck = expectedAcksInfo.createAck(false);
828          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
829          try
830          {
831            origServer.send(finalAck);
832          } catch (IOException e)
833          {
834            /**
835             * An error happened trying the send back an ack to the server.
836             * Log an error and close the connection to this server.
837             */
838            LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
839            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
840                localReplicationServer.getServerId(), origServer.getServerId(), csn, baseDN));
841            mb.append(" ");
842            mb.append(stackTraceToSingleLineString(e));
843            logger.error(mb.toMessage());
844            stopServer(origServer, false);
845          }
846          // Mark the ack info object as completed to prevent potential timeout
847          // code parallel run
848          expectedAcksInfo.completed();
849        }
850      }
851    }
852    /* Else the timeout occurred for the update matching this CSN
853     * and the ack with timeout error has probably already been sent.
854     */
855  }
856
857  /**
858   * The code run when the timeout occurs while waiting for acks of the
859   * eligible servers. This basically sends a timeout ack (with any additional
860   * error info) to the original server that sent an assured update message.
861   */
862  private class AssuredTimeoutTask extends TimerTask
863  {
864    private CSN csn;
865
866    /**
867     * Constructor for the timer task.
868     * @param csn The CSN of the assured update we are waiting acks for
869     */
870    public AssuredTimeoutTask(CSN csn)
871    {
872      this.csn = csn;
873    }
874
875    /**
876     * Run when the assured timeout for an assured update message we are waiting
877     * acks for occurs.
878     */
879    @Override
880    public void run()
881    {
882      ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn);
883
884      if (expectedAcksInfo != null)
885      {
886        synchronized (expectedAcksInfo)
887        {
888          if (expectedAcksInfo.isCompleted())
889          {
890            // processAck() code is sending the ack, do nothing and let him
891            // remove object from the map
892            return;
893          }
894          // Remove the object from the map as no more needed
895          waitingAcks.remove(csn);
896          // Create the timeout ack and send him to the server the assured
897          // update message came from
898          AckMsg finalAck = expectedAcksInfo.createAck(true);
899          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
900          if (logger.isTraceEnabled())
901          {
902            debug("sending timeout for assured update with CSN " + csn
903                + " to serverId=" + origServer.getServerId());
904          }
905          try
906          {
907            origServer.send(finalAck);
908          } catch (IOException e)
909          {
910            /**
911             * An error happened trying the send back an ack to the server.
912             * Log an error and close the connection to this server.
913             */
914            LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
915            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
916                localReplicationServer.getServerId(), origServer.getServerId(), csn, baseDN));
917            mb.append(" ");
918            mb.append(stackTraceToSingleLineString(e));
919            logger.error(mb.toMessage());
920            stopServer(origServer, false);
921          }
922          // Increment assured counters
923          boolean safeRead =
924              expectedAcksInfo instanceof SafeReadExpectedAcksInfo;
925          if (safeRead)
926          {
927            origServer.incrementAssuredSrReceivedUpdatesTimeout();
928          } else
929          {
930            if (origServer.isDataServer())
931            {
932              origServer.incrementAssuredSdReceivedUpdatesTimeout();
933            }
934          }
935          //   retrieve expected servers in timeout to increment their counter
936          List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers();
937          for (Integer serverId : serversInTimeout)
938          {
939            ServerHandler expectedDSInTimeout = connectedDSs.get(serverId);
940            ServerHandler expectedRSInTimeout = connectedRSs.get(serverId);
941            if (expectedDSInTimeout != null)
942            {
943              if (safeRead)
944              {
945                expectedDSInTimeout.incrementAssuredSrSentUpdatesTimeout();
946              } // else no SD update sent to a DS (meaningless)
947            } else if (expectedRSInTimeout != null)
948            {
949              if (safeRead)
950              {
951                expectedRSInTimeout.incrementAssuredSrSentUpdatesTimeout();
952              }
953              else
954              {
955                expectedRSInTimeout.incrementAssuredSdSentUpdatesTimeout();
956              }
957            }
958            // else server disappeared ? Let's forget about it.
959          }
960          // Mark the ack info object as completed to prevent potential
961          // processAck() code parallel run
962          expectedAcksInfo.completed();
963        }
964      }
965    }
966  }
967
968
969  /**
970   * Stop operations with a list of replication servers.
971   *
972   * @param serversToDisconnect
973   *          the replication servers addresses for which we want to stop
974   *          operations
975   */
976  public void stopReplicationServers(Collection<HostPort> serversToDisconnect)
977  {
978    for (ReplicationServerHandler rsHandler : connectedRSs.values())
979    {
980      if (serversToDisconnect.contains(
981            HostPort.valueOf(rsHandler.getServerAddressURL())))
982      {
983        stopServer(rsHandler, false);
984      }
985    }
986  }
987
988  /**
989   * Stop operations with all servers this domain is connected with (RS and DS).
990   *
991   * @param shutdown A boolean indicating if the stop is due to a
992   *                 shutdown condition.
993   */
994  public void stopAllServers(boolean shutdown)
995  {
996    for (ReplicationServerHandler rsHandler : connectedRSs.values())
997    {
998      stopServer(rsHandler, shutdown);
999    }
1000
1001    for (DataServerHandler dsHandler : connectedDSs.values())
1002    {
1003      stopServer(dsHandler, shutdown);
1004    }
1005  }
1006
1007  /**
1008   * Checks whether it is already connected to a DS with same id.
1009   *
1010   * @param dsHandler
1011   *          the DS we want to check
1012   * @return true if this DS is already connected to the current server
1013   */
1014  public boolean isAlreadyConnectedToDS(DataServerHandler dsHandler)
1015  {
1016    if (connectedDSs.containsKey(dsHandler.getServerId()))
1017    {
1018      // looks like two connected LDAP servers have the same serverId
1019      logger.error(ERR_DUPLICATE_SERVER_ID, localReplicationServer.getMonitorInstanceName(),
1020          connectedDSs.get(dsHandler.getServerId()), dsHandler, dsHandler.getServerId());
1021      return true;
1022    }
1023    return false;
1024  }
1025
  /**
   * Stop operations with a given server.
   * <p>
   * The work is only performed when {@code sHandler.engageShutdown()} returns
   * false; otherwise this method does nothing beyond optional trace logging.
   * Unless {@code shutdown} is true, the domain lock is acquired before the
   * handler is unregistered and released afterwards. Any exception raised
   * while unregistering is logged and not propagated.
   *
   * @param sHandler the server for which we want to stop operations.
   * @param shutdown A boolean indicating if the stop is due to a
   *                 shutdown condition.
   */
  public void stopServer(ServerHandler sHandler, boolean shutdown)
  {
    // TODO JNR merge with stopServer(MessageHandler)
    if (logger.isTraceEnabled())
    {
      debug("stopServer() on the server handler " + sHandler);
    }
    /*
     * We must prevent deadlock on replication server domain lock, when for
     * instance this code is called from dying ServerReader but also dying
     * ServerWriter at the same time, or from a thread that wants to shut down
     * the handler. So use a thread safe flag to know if the job must be done
     * or not (is already being processed or not).
     */
    // NOTE(review): presumably engageShutdown() returns false only for the
    // first caller - confirm against ServerHandler.
    if (!sHandler.engageShutdown())
      // Only do this once (prevent other thread to enter here again)
    {
      if (!shutdown)
      {
        try
        {
          // Acquire lock on domain (see more details in comment of start()
          // method of ServerHandler)
          lock();
        }
        catch (InterruptedException ex)
        {
          // We can't deal with this here, so re-interrupt thread so that it is
          // caught during subsequent IO.
          Thread.currentThread().interrupt();
          return;
        }
      }

      try
      {
        // Stop useless monitoring publisher if no more RS or DS in domain
        // (the count is taken before the handler is removed from its map,
        // hence the comparison with 1 rather than 0)
        if ( (connectedDSs.size() + connectedRSs.size() )== 1)
        {
          if (logger.isTraceEnabled())
          {
            debug("remote server " + sHandler
                + " is the last RS/DS to be stopped:"
                + " stopping monitoring publisher");
          }
          stopMonitoringPublisher();
        }

        // The third argument tells the unregistration whether the handler
        // was found among the directory servers (true) or the replication
        // servers (false). A handler found in neither map is left alone.
        if (connectedRSs.containsKey(sHandler.getServerId()))
        {
          unregisterServerHandler(sHandler, shutdown, false);
        }
        else if (connectedDSs.containsKey(sHandler.getServerId()))
        {
          unregisterServerHandler(sHandler, shutdown, true);
        }
      }
      catch(Exception e)
      {
        logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
      }
      finally
      {
        // Mirror of the acquisition above: the lock was only taken when not
        // shutting down
        if (!shutdown)
        {
          release();
        }
      }
    }
  }
1103
1104  private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
1105      boolean isDirectoryServer)
1106  {
1107    unregisterServerHandler(sHandler);
1108    sHandler.shutdown();
1109
1110    resetGenerationIdIfPossible();
1111    if (!shutdown)
1112    {
1113      synchronized (pendingStatusMessagesLock)
1114      {
1115        if (isDirectoryServer)
1116        {
1117          // Update the remote replication servers with our list
1118          // of connected LDAP servers
1119          pendingStatusMessages.enqueueTopoInfoToAllRSs();
1120        }
1121        // Warn our DSs that a RS or DS has quit (does not use this
1122        // handler as already removed from list)
1123        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
1124      }
1125      statusAnalyzer.notifyPendingStatusMessage();
1126    }
1127  }
1128
1129  /**
1130   * Unregister this handler from the list of handlers registered to this
1131   * domain.
1132   * @param sHandler the provided handler to unregister.
1133   */
1134  private void unregisterServerHandler(ServerHandler sHandler)
1135  {
1136    if (sHandler.isReplicationServer())
1137    {
1138      connectedRSs.remove(sHandler.getServerId());
1139    }
1140    else
1141    {
1142      connectedDSs.remove(sHandler.getServerId());
1143    }
1144  }
1145
1146  /**
1147   * This method resets the generationId for this domain if there is no LDAP
1148   * server currently connected in the whole topology on this domain and if the
1149   * generationId has never been saved.
1150   * <ul>
1151   * <li>test emptiness of {@link #connectedDSs} list</li>
1152   * <li>traverse {@link #connectedRSs} list and test for each if DS are
1153   * connected</li>
1154   * </ul>
1155   * So it strongly relies on the {@link #connectedDSs} list
1156   */
1157  private void resetGenerationIdIfPossible()
1158  {
1159    if (logger.isTraceEnabled())
1160    {
1161      debug("mayResetGenerationId generationIdSavedStatus="
1162          + generationIdSavedStatus);
1163    }
1164
1165    // If there is no more any LDAP server connected to this domain in the
1166    // topology and the generationId has never been saved, then we can reset
1167    // it and the next LDAP server to connect will become the new reference.
1168    boolean ldapServersConnectedInTheTopology = false;
1169    if (connectedDSs.isEmpty())
1170    {
1171      for (ReplicationServerHandler rsHandler : connectedRSs.values())
1172      {
1173        if (generationId != rsHandler.getGenerationId())
1174        {
1175          if (logger.isTraceEnabled())
1176          {
1177            debug("mayResetGenerationId skip RS " + rsHandler
1178                + " that has different genId");
1179          }
1180        }
1181        else if (rsHandler.hasRemoteLDAPServers())
1182        {
1183          ldapServersConnectedInTheTopology = true;
1184
1185          if (logger.isTraceEnabled())
1186          {
1187            debug("mayResetGenerationId RS " + rsHandler
1188                + " has ldap servers connected to it"
1189                + " - will not reset generationId");
1190          }
1191          break;
1192        }
1193      }
1194    }
1195    else
1196    {
1197      ldapServersConnectedInTheTopology = true;
1198
1199      if (logger.isTraceEnabled())
1200      {
1201        debug("has ldap servers connected to it - will not reset generationId");
1202      }
1203    }
1204
1205    if (!ldapServersConnectedInTheTopology
1206        && !generationIdSavedStatus
1207        && generationId != -1)
1208    {
1209      changeGenerationId(-1);
1210    }
1211  }
1212
1213  /**
1214   * Checks whether a remote RS is already connected to this hosting RS.
1215   *
1216   * @param rsHandler
1217   *          The handler for the remote RS.
1218   * @return flag specifying whether the remote RS is already connected.
1219   * @throws DirectoryException
1220   *           when a problem occurs.
1221   */
1222  public boolean isAlreadyConnectedToRS(ReplicationServerHandler rsHandler)
1223      throws DirectoryException
1224  {
1225    ReplicationServerHandler oldRsHandler =
1226        connectedRSs.get(rsHandler.getServerId());
1227    if (oldRsHandler == null)
1228    {
1229      return false;
1230    }
1231
1232    if (oldRsHandler.getServerAddressURL().equals(
1233        rsHandler.getServerAddressURL()))
1234    {
1235      // this is the same server, this means that our ServerStart messages
1236      // have been sent at about the same time and 2 connections
1237      // have been established.
1238      // Silently drop this connection.
1239      return true;
1240    }
1241
1242    // looks like two replication servers have the same serverId
1243    // log an error message and drop this connection.
1244    LocalizableMessage message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
1245        localReplicationServer.getMonitorInstanceName(),
1246        oldRsHandler.getServerAddressURL(), rsHandler.getServerAddressURL(),
1247        rsHandler.getServerId());
1248    throw new DirectoryException(ResultCode.OTHER, message);
1249  }
1250
1251  /**
1252   * Creates and returns a cursor across this replication domain.
1253   * <p>
1254   * Client code must call {@link DBCursor#next()} to advance the cursor to the
1255   * next available record.
1256   * <p>
1257   * When the cursor is not used anymore, client code MUST call the
1258   * {@link DBCursor#close()} method to free the resources and locks used by the
1259   * cursor.
1260   *
1261   * @param startAfterServerState
1262   *          Starting point for the replicaDB cursors. If null, start from the
1263   *          oldest CSN
1264   * @return a non null {@link DBCursor} going from oldest to newest CSN
1265   * @throws ChangelogException
1266   *           If a database problem happened
1267   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, CursorOptions)
1268   */
1269  public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState)
1270      throws ChangelogException
1271  {
1272    CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
1273    return domainDB.getCursorFrom(baseDN, startAfterServerState, options);
1274  }
1275
1276  /**
1277   * Get the baseDN.
1278   *
1279   * @return Returns the baseDN.
1280   */
1281  public DN getBaseDN()
1282  {
1283    return baseDN;
1284  }
1285
1286  /**
1287   * Retrieves the destination handlers for a routable message.
1288   *
1289   * @param msg The message to route.
1290   * @param senderHandler The handler of the server that published this message.
1291   * @return The list of destination handlers.
1292   */
1293  private List<ServerHandler> getDestinationServers(RoutableMsg msg,
1294    ServerHandler senderHandler)
1295  {
1296    List<ServerHandler> servers = new ArrayList<>();
1297
1298    if (msg.getDestination() == RoutableMsg.THE_CLOSEST_SERVER)
1299    {
1300      // TODO Import from the "closest server" to be implemented
1301    } else if (msg.getDestination() == RoutableMsg.ALL_SERVERS)
1302    {
1303      if (!senderHandler.isReplicationServer())
1304      {
1305        // Send to all replication servers with a least one remote
1306        // server connected
1307        for (ReplicationServerHandler rsh : connectedRSs.values())
1308        {
1309          if (rsh.hasRemoteLDAPServers())
1310          {
1311            servers.add(rsh);
1312          }
1313        }
1314      }
1315
1316      // Sends to all connected LDAP servers
1317      for (DataServerHandler destinationHandler : connectedDSs.values())
1318      {
1319        // Don't loop on the sender
1320        if (destinationHandler == senderHandler)
1321        {
1322          continue;
1323        }
1324        servers.add(destinationHandler);
1325      }
1326    } else
1327    {
1328      // Destination is one server
1329      DataServerHandler destinationHandler =
1330        connectedDSs.get(msg.getDestination());
1331      if (destinationHandler != null)
1332      {
1333        servers.add(destinationHandler);
1334      } else
1335      {
1336        // the targeted server is NOT connected
1337        // Let's search for the replication server that MAY
1338        // have the targeted server connected.
1339        if (senderHandler.isDataServer())
1340        {
1341          for (ReplicationServerHandler rsHandler : connectedRSs.values())
1342          {
1343            // Send to all replication servers with a least one remote
1344            // server connected
1345            if (rsHandler.isRemoteLDAPServer(msg.getDestination()))
1346            {
1347              servers.add(rsHandler);
1348            }
1349          }
1350        }
1351      }
1352    }
1353    return servers;
1354  }
1355
1356
1357
1358  /**
1359   * Processes a message coming from one server in the topology and potentially
1360   * forwards it to one or all other servers.
1361   *
1362   * @param msg
1363   *          The message received and to be processed.
1364   * @param sender
1365   *          The server handler of the server that sent the message.
1366   */
1367  void process(RoutableMsg msg, ServerHandler sender)
1368  {
1369    if (msg.getDestination() == localReplicationServer.getServerId())
1370    {
1371      // Handle routable messages targeted at this RS.
1372      if (msg instanceof ErrorMsg)
1373      {
1374        ErrorMsg errorMsg = (ErrorMsg) msg;
1375        logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
1376      }
1377      else
1378      {
1379        replyWithUnroutableMsgType(sender, msg);
1380      }
1381    }
1382    else
1383    {
1384      // Forward message not destined for this RS.
1385      List<ServerHandler> servers = getDestinationServers(msg, sender);
1386      if (!servers.isEmpty())
1387      {
1388        forwardMsgToAllServers(msg, servers, sender);
1389      }
1390      else
1391      {
1392        replyWithUnreachablePeerMsg(sender, msg);
1393      }
1394    }
1395  }
1396
  /**
   * Handles a monitor request message by handing it, together with its
   * sender, to {@code enqueueMonitorMsg}.
   *
   * @param msg
   *          The monitor request message.
   * @param sender
   *          The DS/RS which sent the monitor request.
   */
  void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    enqueueMonitorMsg(msg, sender);
  }
1409
  /**
   * Handles a monitor message by passing it to the domain monitor as a
   * monitor data response, along with the server id of its sender.
   *
   * @param msg
   *          The monitor message
   * @param sender
   *          The DS/RS which sent the monitor.
   */
  void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
  {
    domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
  }
1422
1423  private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
1424      RoutableMsg msg)
1425  {
1426    String msgClassname = msg.getClass().getCanonicalName();
1427    logger.info(NOTE_ERR_ROUTING_TO_SERVER, msgClassname);
1428
1429    LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
1430    mb.append(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname));
1431    mb.append("serverID:").append(msg.getDestination());
1432    ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb.toMessage());
1433    try
1434    {
1435      msgEmitter.send(errMsg);
1436    }
1437    catch (IOException ignored)
1438    {
1439      // an error happened on the sender session trying to recover
1440      // from an error on the receiver session.
1441      // Not much more we can do at this point.
1442    }
1443  }
1444
1445  private void replyWithUnreachablePeerMsg(ServerHandler msgEmitter,
1446      RoutableMsg msg)
1447  {
1448    LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
1449    mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDN, msg.getDestination()));
1450    mb.append(" In Replication Server=").append(
1451      this.localReplicationServer.getMonitorInstanceName());
1452    mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
1453    mb.append(" Details:routing table is empty");
1454    final LocalizableMessage message = mb.toMessage();
1455    logger.error(message);
1456
1457    ErrorMsg errMsg = new ErrorMsg(this.localReplicationServer.getServerId(),
1458        msg.getSenderID(), message);
1459    try
1460    {
1461      msgEmitter.send(errMsg);
1462    }
1463    catch (IOException ignored)
1464    {
1465      // TODO Handle error properly (sender timeout in addition)
1466      /*
1467       * An error happened trying to send an error msg to this server.
1468       * Log an error and close the connection to this server.
1469       */
1470      logger.error(ERR_CHANGELOG_ERROR_SENDING_ERROR, this, ignored);
1471      stopServer(msgEmitter, false);
1472    }
1473  }
1474
1475  private void forwardMsgToAllServers(RoutableMsg msg,
1476      List<ServerHandler> servers, ServerHandler sender)
1477  {
1478    for (ServerHandler targetHandler : servers)
1479    {
1480      try
1481      {
1482        targetHandler.send(msg);
1483      } catch (IOException ioe)
1484      {
1485        /*
1486         * An error happened trying to send a routable message to its
1487         * destination server.
1488         * Send back an error to the originator of the message.
1489         */
1490        LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
1491        mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDN, msg.getDestination()));
1492        mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
1493        mb.append(" Details: ").append(ioe.getLocalizedMessage());
1494        final LocalizableMessage message = mb.toMessage();
1495        logger.error(message);
1496
1497        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
1498        try
1499        {
1500          sender.send(errMsg);
1501        } catch (IOException ioe1)
1502        {
1503          // an error happened on the sender session trying to recover
1504          // from an error on the receiver session.
1505          // We don't have much solution left beside closing the sessions.
1506          stopServer(sender, false);
1507          stopServer(targetHandler, false);
1508        }
1509      // TODO Handle error properly (sender timeout in addition)
1510      }
1511    }
1512  }
1513
1514  /**
1515   * Creates a new monitor message including monitoring information for the
1516   * whole topology.
1517   *
1518   * @param sender
1519   *          The sender of this message.
1520   * @param destination
1521   *          The destination of this message.
1522   * @return The newly created and filled MonitorMsg. Null if a problem occurred
1523   *         during message creation.
1524   * @throws InterruptedException
1525   *           if this thread is interrupted while waiting for a response
1526   */
1527  public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
1528      throws InterruptedException
1529  {
1530    return createGlobalTopologyMonitorMsg(sender, destination,
1531        domainMonitor.recomputeMonitorData());
1532  }
1533
1534  private MonitorMsg createGlobalTopologyMonitorMsg(int sender,
1535      int destination, ReplicationDomainMonitorData monitorData)
1536  {
1537    final MonitorMsg returnMsg = new MonitorMsg(sender, destination);
1538    returnMsg.setReplServerDbState(getLatestServerState());
1539
1540    // Add the server state for each DS and RS currently in the topology.
1541    for (int replicaId : toIterable(monitorData.ldapIterator()))
1542    {
1543      returnMsg.setServerState(replicaId,
1544          monitorData.getLDAPServerState(replicaId),
1545          monitorData.getApproxFirstMissingDate(replicaId), true);
1546    }
1547
1548    for (int replicaId : toIterable(monitorData.rsIterator()))
1549    {
1550      returnMsg.setServerState(replicaId,
1551          monitorData.getRSStates(replicaId),
1552          monitorData.getRSApproxFirstMissingDate(replicaId), false);
1553    }
1554
1555    return returnMsg;
1556  }
1557
1558
1559
1560  /**
1561   * Creates a new monitor message including monitoring information for the
1562   * topology directly connected to this RS. This includes information for: -
1563   * local RS - all direct DSs - all direct RSs
1564   *
1565   * @param sender
1566   *          The sender of this message.
1567   * @param destination
1568   *          The destination of this message.
1569   * @return The newly created and filled MonitorMsg. Null if the current thread
1570   *         was interrupted while attempting to get the domain lock.
1571   */
1572  private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
1573  {
1574    final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
1575    monitorMsg.setReplServerDbState(getLatestServerState());
1576
1577    // Add the server state for each connected DS and RS.
1578    for (DataServerHandler dsHandler : this.connectedDSs.values())
1579    {
1580      monitorMsg.setServerState(dsHandler.getServerId(),
1581          dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
1582          true);
1583    }
1584
1585    for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
1586    {
1587      monitorMsg.setServerState(rsHandler.getServerId(),
1588          rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
1589          false);
1590    }
1591    return monitorMsg;
1592  }
1593
  /**
   * Shutdown this ReplicationServerDomain.
   * <p>
   * In order: deregisters this domain as a monitor provider, cancels the
   * assured timeout timer, stops operations with all connected servers and
   * shuts down the status analyzer.
   */
  public void shutdown()
  {
    DirectoryServer.deregisterMonitorProvider(this);

    // Terminate the assured timer
    assuredTimeoutTimer.cancel();

    // Stop every connected RS and DS, flagging the stop as a shutdown
    stopAllServers(true);
    statusAnalyzer.shutdown();
  }
1607
1608  /**
1609   * Returns the latest most current ServerState describing the newest CSNs for
1610   * each server in this domain.
1611   *
1612   * @return The ServerState describing the newest CSNs for each server in in
1613   *         this domain.
1614   */
1615  public ServerState getLatestServerState()
1616  {
1617    return domainDB.getDomainNewestCSNs(baseDN);
1618  }
1619
1620  /** {@inheritDoc} */
1621  @Override
1622  public String toString()
1623  {
1624    return "ReplicationServerDomain " + baseDN;
1625  }
1626
1627
1628
1629  /**
1630   * Creates a TopologyMsg filled with information to be sent to a remote RS.
1631   * We send remote RS the info of every DS that are directly connected to us
1632   * plus our own info as RS.
1633   * @return A suitable TopologyMsg PDU to be sent to a peer RS
1634   */
1635  public TopologyMsg createTopologyMsgForRS()
1636  {
1637    List<DSInfo> dsInfos = new ArrayList<>();
1638    for (DataServerHandler dsHandler : connectedDSs.values())
1639    {
1640      dsInfos.add(dsHandler.toDSInfo());
1641    }
1642
1643    // Create info for the local RS
1644    List<RSInfo> rsInfos = newArrayList(toRSInfo(localReplicationServer, generationId));
1645
1646    return new TopologyMsg(dsInfos, rsInfos);
1647  }
1648
1649  /**
1650   * Creates a TopologyMsg filled with information to be sent to a DS.
1651   * We send remote DS the info of every known DS and RS in the topology (our
1652   * directly connected DSs plus the DSs connected to other RSs) except himself.
1653   * Also put info related to local RS.
1654   *
1655   * @param destDsId The id of the DS the TopologyMsg PDU is to be sent to and
1656   * that we must not include in the DS list.
1657   * @return A suitable TopologyMsg PDU to be sent to a peer DS
1658   */
1659  public TopologyMsg createTopologyMsgForDS(int destDsId)
1660  {
1661    // Go through every DSs (except recipient of msg)
1662    List<DSInfo> dsInfos = new ArrayList<>();
1663    for (DataServerHandler dsHandler : connectedDSs.values())
1664    {
1665      if (dsHandler.getServerId() == destDsId)
1666      {
1667        continue;
1668      }
1669      dsInfos.add(dsHandler.toDSInfo());
1670    }
1671
1672
1673    List<RSInfo> rsInfos = new ArrayList<>();
1674    // Add our own info (local RS)
1675    rsInfos.add(toRSInfo(localReplicationServer, generationId));
1676
1677    // Go through every peer RSs (and get their connected DSs), also add info
1678    // for RSs
1679    for (ReplicationServerHandler rsHandler : connectedRSs.values())
1680    {
1681      rsInfos.add(rsHandler.toRSInfo());
1682
1683      rsHandler.addDSInfos(dsInfos);
1684    }
1685
1686    return new TopologyMsg(dsInfos, rsInfos);
1687  }
1688
1689  private RSInfo toRSInfo(ReplicationServer rs, long generationId)
1690  {
1691    return new RSInfo(rs.getServerId(), rs.getServerURL(), generationId,
1692        rs.getGroupId(), rs.getWeight());
1693  }
1694
1695  /**
1696   * Get the generationId associated to this domain.
1697   *
1698   * @return The generationId
1699   */
1700  public long getGenerationId()
1701  {
1702    return generationId;
1703  }
1704
1705  /**
1706   * Initialize the value of the generationID for this ReplicationServerDomain.
1707   * This method is intended to be used for initialization at startup and
1708   * simply stores the new value without any additional processing.
1709   * For example it does not clear the change-log DBs
1710   *
1711   * @param generationId The new value of generationId.
1712   */
1713  public void initGenerationID(long generationId)
1714  {
1715    synchronized (generationIDLock)
1716    {
1717      this.generationId = generationId;
1718      this.generationIdSavedStatus = true;
1719    }
1720  }
1721
1722  /**
1723   * Sets the provided value as the new in memory generationId.
1724   * Also clear the changelog databases.
1725   *
1726   * @param generationId The new value of generationId.
1727   * @return The old generation id
1728   */
1729  public long changeGenerationId(long generationId)
1730  {
1731    synchronized (generationIDLock)
1732    {
1733      long oldGenerationId = this.generationId;
1734
1735      if (this.generationId != generationId)
1736      {
1737        clearDbs();
1738
1739        this.generationId = generationId;
1740        this.generationIdSavedStatus = false;
1741      }
1742      return oldGenerationId;
1743    }
1744  }
1745
1746  /**
1747   * Resets the generationID.
1748   *
1749   * @param senderHandler The handler associated to the server
1750   *        that requested to reset the generationId.
1751   * @param genIdMsg The reset generation ID msg received.
1752   */
1753  public void resetGenerationId(ServerHandler senderHandler,
1754    ResetGenerationIdMsg genIdMsg)
1755  {
1756    if (logger.isTraceEnabled())
1757    {
1758      debug("Receiving ResetGenerationIdMsg from "
1759          + senderHandler.getServerId() + ":\n" + genIdMsg);
1760    }
1761
1762    try
1763    {
1764      // Acquire lock on domain (see more details in comment of start() method
1765      // of ServerHandler)
1766      lock();
1767    }
1768    catch (InterruptedException ex)
1769    {
1770      // We can't deal with this here, so re-interrupt thread so that it is
1771      // caught during subsequent IO.
1772      Thread.currentThread().interrupt();
1773      return;
1774    }
1775
1776    try
1777    {
1778      final long newGenId = genIdMsg.getGenerationId();
1779      if (newGenId != this.generationId)
1780      {
1781        changeGenerationId(newGenId);
1782      }
1783      else
1784      {
1785        // Order to take a gen id we already have, just ignore
1786        if (logger.isTraceEnabled())
1787        {
1788          debug("Reset generation id requested but generationId was already "
1789              + this.generationId + ":\n" + genIdMsg);
1790        }
1791      }
1792
1793      // If we are the first replication server warned,
1794      // then forwards the reset message to the remote replication servers
1795      for (ServerHandler rsHandler : connectedRSs.values())
1796      {
1797        try
1798        {
1799          // After we'll have sent the message , the remote RS will adopt
1800          // the new genId
1801          rsHandler.setGenerationId(newGenId);
1802          if (senderHandler.isDataServer())
1803          {
1804            rsHandler.send(genIdMsg);
1805          }
1806        } catch (IOException e)
1807        {
1808          logger.error(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID, baseDN, e.getMessage());
1809        }
1810      }
1811
1812      // Change status of the connected DSs according to the requested new
1813      // reference generation id
1814      for (DataServerHandler dsHandler : connectedDSs.values())
1815      {
1816        try
1817        {
1818          dsHandler.changeStatusForResetGenId(newGenId);
1819        } catch (IOException e)
1820        {
1821          logger.error(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID, baseDN,
1822              dsHandler.getServerId(), e.getMessage());
1823        }
1824      }
1825
1826      // Update every peers (RS/DS) with potential topology changes (status
1827      // change). Rather than doing that each time a DS has a status change
1828      // (consecutive to reset gen id message), we prefer advertising once for
1829      // all after changes (less packet sent), here at the end of the reset msg
1830      // treatment.
1831      sendTopoInfoToAll();
1832
1833      logger.info(NOTE_RESET_GENERATION_ID, baseDN, newGenId);
1834    }
1835    catch(Exception e)
1836    {
1837      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
1838    }
1839    finally
1840    {
1841      release();
1842    }
1843  }
1844
1845  /**
1846   * Process message of a remote server changing his status.
1847   * @param senderHandler The handler associated to the server
1848   *        that changed his status.
1849   * @param csMsg The message containing the new status
1850   */
1851  public void processNewStatus(DataServerHandler senderHandler,
1852    ChangeStatusMsg csMsg)
1853  {
1854    if (logger.isTraceEnabled())
1855    {
1856      debug("receiving ChangeStatusMsg from " + senderHandler.getServerId()
1857          + ":\n" + csMsg);
1858    }
1859
1860    try
1861    {
1862      // Acquire lock on domain (see more details in comment of start() method
1863      // of ServerHandler)
1864      lock();
1865    }
1866    catch (InterruptedException ex)
1867    {
1868      // We can't deal with this here, so re-interrupt thread so that it is
1869      // caught during subsequent IO.
1870      Thread.currentThread().interrupt();
1871      return;
1872    }
1873
1874    try
1875    {
1876      ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
1877      if (newStatus == ServerStatus.INVALID_STATUS)
1878      {
1879        // Already logged an error in processNewStatus()
1880        // just return not to forward a bad status to topology
1881        return;
1882      }
1883
1884      enqueueTopoInfoToAllExcept(senderHandler);
1885
1886      logger.info(NOTE_DIRECTORY_SERVER_CHANGED_STATUS,
1887          senderHandler.getServerId(), baseDN, newStatus);
1888    }
1889    catch(Exception e)
1890    {
1891      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
1892    }
1893    finally
1894    {
1895      release();
1896    }
1897  }
1898
1899  /**
1900   * Change the status of a directory server according to the event generated
1901   * from the status analyzer.
1902   * @param dsHandler The handler of the directory server to update
1903   * @param event The event to be used for new status computation
1904   * @return True if we have been interrupted (must stop), false otherwise
1905   */
1906  private boolean changeStatus(DataServerHandler dsHandler,
1907      StatusMachineEvent event)
1908  {
1909    try
1910    {
1911      // Acquire lock on domain (see ServerHandler#start() for more details)
1912      lock();
1913    }
1914    catch (InterruptedException ex)
1915    {
1916      // We have been interrupted for dying, from stopStatusAnalyzer
1917      // to prevent deadlock in this situation:
1918      // RS is being shutdown, and stopServer will call stopStatusAnalyzer.
1919      // Domain lock is taken by shutdown thread while status analyzer thread
1920      // is willing to change the status of a server at the same time so is
1921      // waiting for the domain lock at the same time. As shutdown thread is
1922      // waiting for analyzer thread death, a deadlock occurs. So we force
1923      // interruption of the status analyzer thread death after 2 seconds if
1924      // it has not finished (see StatusAnalyzer.waitForShutdown). This allows
1925      // to have the analyzer thread taking the domain lock only when the
1926      // status of a DS has to be changed. See more comments in run method of
1927      // StatusAnalyzer.
1928      if (logger.isTraceEnabled())
1929      {
1930        logger.trace("Status analyzer for domain " + baseDN
1931            + " has been interrupted when"
1932            + " trying to acquire domain lock for changing the status of DS "
1933            + dsHandler.getServerId());
1934      }
1935      return true;
1936    }
1937
1938    try
1939    {
1940      ServerStatus newStatus = ServerStatus.INVALID_STATUS;
1941      ServerStatus oldStatus = dsHandler.getStatus();
1942      try
1943      {
1944        newStatus = dsHandler.changeStatus(event);
1945      }
1946      catch (IOException e)
1947      {
1948        logger.error(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER,
1949            baseDN, dsHandler.getServerId(), e.getMessage());
1950      }
1951
1952      if (newStatus == ServerStatus.INVALID_STATUS || newStatus == oldStatus)
1953      {
1954        // Change was impossible or already occurred (see StatusAnalyzer
1955        // comments)
1956        return false;
1957      }
1958
1959      enqueueTopoInfoToAllExcept(dsHandler);
1960    }
1961    catch (Exception e)
1962    {
1963      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
1964    }
1965    finally
1966    {
1967      release();
1968    }
1969
1970    return false;
1971  }
1972
1973  /**
1974   * Update every peers (RS/DS) with topology changes.
1975   */
1976  public void sendTopoInfoToAll()
1977  {
1978    enqueueTopoInfoToAllExcept(null);
1979  }
1980
1981  /**
1982   * Update every peers (RS/DS) with topology changes but one DS.
1983   *
1984   * @param dsHandler
1985   *          if not null, the topology message will not be sent to this DS
1986   */
1987  private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
1988  {
1989    synchronized (pendingStatusMessagesLock)
1990    {
1991      pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
1992      pendingStatusMessages.enqueueTopoInfoToAllRSs();
1993    }
1994    statusAnalyzer.notifyPendingStatusMessage();
1995  }
1996
1997  /**
1998   * Clears the Db associated with that domain.
1999   */
2000  private void clearDbs()
2001  {
2002    try
2003    {
2004      domainDB.removeDomain(baseDN);
2005    }
2006    catch (ChangelogException e)
2007    {
2008      logger.error(ERR_ERROR_CLEARING_DB, baseDN, e.getMessage(), e);
2009    }
2010  }
2011
2012  /**
2013   * Returns whether the provided server is in degraded
2014   * state due to the fact that the peer server has an invalid
2015   * generationId for this domain.
2016   *
2017   * @param serverId The serverId for which we want to know the
2018   *                 the state.
2019   * @return Whether it is degraded or not.
2020   */
2021  public boolean isDegradedDueToGenerationId(int serverId)
2022  {
2023    if (logger.isTraceEnabled())
2024    {
2025      debug("isDegraded serverId=" + serverId + " given local generation Id="
2026          + this.generationId);
2027    }
2028
2029    ServerHandler sHandler = connectedRSs.get(serverId);
2030    if (sHandler == null)
2031    {
2032      sHandler = connectedDSs.get(serverId);
2033      if (sHandler == null)
2034      {
2035        return false;
2036      }
2037    }
2038
2039    if (logger.isTraceEnabled())
2040    {
2041      debug("Compute degradation of serverId=" + serverId
2042          + " LS server generation Id=" + sHandler.getGenerationId());
2043    }
2044    return sHandler.getGenerationId() != this.generationId;
2045  }
2046
2047  /**
2048   * Process topology information received from a peer RS.
2049   * @param topoMsg The just received topo message from remote RS
2050   * @param rsHandler The handler that received the message.
2051   * @param allowResetGenId True for allowing to reset the generation id (
2052   * when called after initial handshake)
2053   * @throws IOException If an error occurred.
2054   * @throws DirectoryException If an error occurred.
2055   */
2056  public void receiveTopoInfoFromRS(TopologyMsg topoMsg,
2057      ReplicationServerHandler rsHandler, boolean allowResetGenId)
2058      throws IOException, DirectoryException
2059  {
2060    if (logger.isTraceEnabled())
2061    {
2062      debug("receiving TopologyMsg from serverId=" + rsHandler.getServerId()
2063          + ":\n" + topoMsg);
2064    }
2065
2066    try
2067    {
2068      // Acquire lock on domain (see more details in comment of start() method
2069      // of ServerHandler)
2070      lock();
2071    }
2072    catch (InterruptedException ex)
2073    {
2074      // We can't deal with this here, so re-interrupt thread so that it is
2075      // caught during subsequent IO.
2076      Thread.currentThread().interrupt();
2077      return;
2078    }
2079
2080    try
2081    {
2082      // Store DS connected to remote RS & update information about the peer RS
2083      rsHandler.processTopoInfoFromRS(topoMsg);
2084
2085      // Handle generation id
2086      if (allowResetGenId)
2087      {
2088        resetGenerationIdIfPossible();
2089        setGenerationIdIfUnset(rsHandler.getGenerationId());
2090      }
2091
2092      if (isDifferentGenerationId(rsHandler.getGenerationId()))
2093      {
2094        LocalizableMessage message = WARN_BAD_GENERATION_ID_FROM_RS.get(rsHandler.getServerId(),
2095            rsHandler.session.getReadableRemoteAddress(), rsHandler.getGenerationId(),
2096            baseDN, getLocalRSServerId(), generationId);
2097        logger.warn(message);
2098
2099        ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(),
2100            rsHandler.getServerId(), message);
2101        rsHandler.send(errorMsg);
2102      }
2103
2104      /*
2105       * Sends the currently known topology information to every connected
2106       * DS we have.
2107       */
2108      synchronized (pendingStatusMessagesLock)
2109      {
2110        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
2111      }
2112      statusAnalyzer.notifyPendingStatusMessage();
2113    }
2114    catch(Exception e)
2115    {
2116      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
2117    }
2118    finally
2119    {
2120      release();
2121    }
2122  }
2123
2124  private void setGenerationIdIfUnset(long generationId)
2125  {
2126    if (this.generationId < 0)
2127    {
2128      this.generationId = generationId;
2129    }
2130  }
2131
2132  /**
2133   * Returns the latest monitor data available for this replication server
2134   * domain.
2135   *
2136   * @return The latest monitor data available for this replication server
2137   *         domain, which is never {@code null}.
2138   */
2139  ReplicationDomainMonitorData getDomainMonitorData()
2140  {
2141    return domainMonitor.getMonitorData();
2142  }
2143
2144  /**
2145   * Get the map of connected DSs.
2146   * @return The map of connected DSs
2147   */
2148  public Map<Integer, DataServerHandler> getConnectedDSs()
2149  {
2150    return Collections.unmodifiableMap(connectedDSs);
2151  }
2152
2153  /**
2154   * Get the map of connected RSs.
2155   * @return The map of connected RSs
2156   */
2157  public Map<Integer, ReplicationServerHandler> getConnectedRSs()
2158  {
2159    return Collections.unmodifiableMap(connectedRSs);
2160  }
2161
2162
2163  /**
   * A synchronization mechanism is created to ensure exclusive access to the
2165   * domain. The goal is to have a consistent view of the topology by locking
2166   * the structures holding the topology view of the domain:
2167   * {@link #connectedDSs} and {@link #connectedRSs}. When a connection is
2168   * established with a peer DS or RS, the lock should be taken before updating
2169   * these structures, then released. The same mechanism should be used when
2170   * updating any data related to the view of the topology: for instance if the
2171   * status of a DS is changed, the lock should be taken before updating the
2172   * matching server handler and sending the topology messages to peers and
2173   * released after.... This allows every member of the topology to have a
2174   * consistent view of the topology and to be sure it will not miss some
2175   * information.
2176   * <p>
2177   * So the locking system must be called (not exhaustive list):
2178   * <ul>
2179   * <li>when connection established with a DS or RS</li>
2180   * <li>when connection ended with a DS or RS</li>
2181   * <li>when receiving a TopologyMsg and updating structures</li>
2182   * <li>when creating and sending a TopologyMsg</li>
2183   * <li>when a DS status is changing (ChangeStatusMsg received or sent)...</li>
2184   * </ul>
2185   */
2186  private final ReentrantLock lock = new ReentrantLock();
2187
2188  /**
2189   * This lock is used to protect the generationId variable.
2190   */
2191  private final Object generationIDLock = new Object();
2192
2193  /**
2194   * Tests if the current thread has the lock on this domain.
2195   * @return True if the current thread has the lock.
2196   */
2197  public boolean hasLock()
2198  {
2199    return lock.getHoldCount() > 0;
2200  }
2201
2202  /**
2203   * Takes the lock on this domain (blocking until lock can be acquired) or
2204   * calling thread is interrupted.
2205   * @throws java.lang.InterruptedException If interrupted.
2206   */
2207  public void lock() throws InterruptedException
2208  {
2209    lock.lockInterruptibly();
2210  }
2211
2212  /**
2213   * Releases the lock on this domain.
2214   */
2215  public void release()
2216  {
2217    lock.unlock();
2218  }
2219
2220  /**
2221   * Tries to acquire the lock on the domain within a given amount of time.
2222   * @param timeout The amount of milliseconds to wait for acquiring the lock.
2223   * @return True if the lock was acquired, false if timeout occurred.
2224   * @throws java.lang.InterruptedException When call was interrupted.
2225   */
2226  public boolean tryLock(long timeout) throws InterruptedException
2227  {
2228    return lock.tryLock(timeout, TimeUnit.MILLISECONDS);
2229  }
2230
2231  /**
2232   * Starts the monitoring publisher for the domain if not already started.
2233   */
2234  private void startMonitoringPublisher()
2235  {
2236    long period = localReplicationServer.getMonitoringPublisherPeriod();
2237    if (period > 0) // 0 means no monitoring publisher
2238    {
2239      final MonitoringPublisher thread = new MonitoringPublisher(this, period);
2240      if (monitoringPublisher.compareAndSet(null, thread))
2241      {
2242        thread.start();
2243      }
2244    }
2245  }
2246
2247  /**
2248   * Stops the monitoring publisher for the domain.
2249   */
2250  private void stopMonitoringPublisher()
2251  {
2252    final MonitoringPublisher thread = monitoringPublisher.get();
2253    if (thread != null && monitoringPublisher.compareAndSet(thread, null))
2254    {
2255      thread.shutdown();
2256      thread.waitForShutdown();
2257    }
2258  }
2259
2260  /** {@inheritDoc} */
2261  @Override
2262  public void initializeMonitorProvider(MonitorProviderCfg configuraiton)
2263  {
2264    // Nothing to do for now
2265  }
2266
2267  /** {@inheritDoc} */
2268  @Override
2269  public String getMonitorInstanceName()
2270  {
2271    return "Replication server RS(" + localReplicationServer.getServerId()
2272        + ") " + localReplicationServer.getServerURL() + ",cn="
2273        + baseDN.toString().replace(',', '_').replace('=', '_')
2274        + ",cn=Replication";
2275  }
2276
2277  @Override
2278  public MonitorData getMonitorData()
2279  {
2280    int serverId = localReplicationServer.getServerId();
2281
2282    final MonitorData attributes = new MonitorData(5);
2283    attributes.add("replication-server-id", serverId);
2284    attributes.add("replication-server-port", localReplicationServer.getReplicationPort());
2285    attributes.add("domain-name", baseDN);
2286    attributes.add("generation-id", baseDN + " " + generationId);
2287    attributes.add("missing-changes", getDomainMonitorData().getMissingChangesRS(serverId));
2288    return attributes;
2289  }
2290
2291  /**
2292   * Returns the oldest known state for the domain, made of the oldest CSN
2293   * stored for each serverId.
2294   * <p>
2295   * Note: Because the replication changelogDB trimming always keep one change
2296   * whatever its date, the CSN contained in the returned state can be very old.
2297   *
2298   * @return the start state of the domain.
2299   */
2300  public ServerState getOldestState()
2301  {
2302    return domainDB.getDomainOldestCSNs(baseDN);
2303  }
2304
2305  private void sendTopologyMsg(String type, ServerHandler handler, TopologyMsg msg)
2306  {
2307    for (int i = 1; i <= 2; i++)
2308    {
2309      if (!handler.shuttingDown()
2310          && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
2311      {
2312        try
2313        {
2314          handler.sendTopoInfo(msg);
2315          break;
2316        }
2317        catch (IOException e)
2318        {
2319          if (i == 2)
2320          {
2321            logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO,
2322                baseDN, type, handler.getServerId(), e.getMessage());
2323          }
2324        }
2325      }
2326      sleep(100);
2327    }
2328  }
2329
2330
2331
2332  /**
2333   * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
2334   * value received, and forwarding the message to the other RSes.
2335   * @param senderHandler The handler for the server that sent the heartbeat.
2336   * @param msg The message to process.
2337   * @throws DirectoryException
2338   *           if a problem occurs
2339   */
2340  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
2341      ChangeTimeHeartbeatMsg msg) throws DirectoryException
2342  {
2343    try
2344    {
2345      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
2346    }
2347    catch (ChangelogException e)
2348    {
2349      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e
2350          .getMessageObject(), e);
2351    }
2352
2353    if (senderHandler.isDataServer())
2354    {
2355      /*
2356       * If we are the first replication server warned, then forward the message
2357       * to the remote replication servers.
2358       */
2359      synchronized (pendingStatusMessagesLock)
2360      {
2361        pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
2362      }
2363      statusAnalyzer.notifyPendingStatusMessage();
2364    }
2365  }
2366
2367  /**
2368   * Return the monitor instance name of the ReplicationServer that created the
2369   * current instance.
2370   *
2371   * @return the monitor instance name of the ReplicationServer that created the
2372   *         current instance.
2373   */
2374  String getLocalRSMonitorInstanceName()
2375  {
2376    return this.localReplicationServer.getMonitorInstanceName();
2377  }
2378
2379  /**
2380   * Return the serverId of the ReplicationServer that created the current
2381   * instance.
2382   *
2383   * @return the serverId of the ReplicationServer that created the current
2384   *         instance.
2385   */
2386  int getLocalRSServerId()
2387  {
2388    return this.localReplicationServer.getServerId();
2389  }
2390
2391  /**
2392   * Update the monitoring publisher with the new period value.
2393   *
2394   * @param period
2395   *          The new period value.
2396   */
2397  void updateMonitoringPeriod(long period)
2398  {
2399    if (period == 0)
2400    {
2401      // Requested to stop monitoring publishers
2402      stopMonitoringPublisher();
2403      return;
2404    }
2405
2406    final MonitoringPublisher mpThread = monitoringPublisher.get();
2407    if (mpThread != null) // it is running
2408    {
2409      mpThread.setPeriod(period);
2410    }
2411    else if (!connectedDSs.isEmpty() || !connectedRSs.isEmpty())
2412    {
2413      // Requested to start monitoring publishers with provided period value
2414      startMonitoringPublisher();
2415    }
2416  }
2417
2418  /**
2419   * Registers a DS handler into this domain and notifies the domain about the
2420   * new DS.
2421   *
2422   * @param dsHandler
2423   *          The Directory Server Handler to register
2424   */
2425  public void register(DataServerHandler dsHandler)
2426  {
2427    startMonitoringPublisher();
2428
2429    // connected with new DS: store handler.
2430    connectedDSs.put(dsHandler.getServerId(), dsHandler);
2431
2432    // Tell peer RSs and DSs a new DS just connected to us
2433    // No need to re-send TopologyMsg to this just new DS
2434    enqueueTopoInfoToAllExcept(dsHandler);
2435  }
2436
2437  /**
2438   * Registers the RS handler into this domain and notifies the domain.
2439   *
2440   * @param rsHandler
2441   *          The Replication Server Handler to register
2442   */
2443  public void register(ReplicationServerHandler rsHandler)
2444  {
2445    startMonitoringPublisher();
2446
2447    // connected with new RS (either outgoing or incoming
2448    // connection): store handler.
2449    connectedRSs.put(rsHandler.getServerId(), rsHandler);
2450  }
2451
2452  private void debug(String message)
2453  {
2454    logger.trace("In ReplicationServerDomain serverId="
2455        + localReplicationServer.getServerId() + " for baseDN=" + baseDN
2456        + " and port=" + localReplicationServer.getReplicationPort()
2457        + ": " + message);
2458  }
2459
2460
2461
2462  /**
2463   * Go through each connected DS, get the number of pending changes we have for
2464   * it and change status accordingly if threshold value is crossed/uncrossed.
2465   */
2466  void checkDSDegradedStatus()
2467  {
2468    final int degradedStatusThreshold = localReplicationServer
2469        .getDegradedStatusThreshold();
2470    // Threshold value = 0 means no status analyzer (no degrading system)
2471    // we should not have that as the status analyzer thread should not be
2472    // created if this is the case, but for sanity purpose, we add this
2473    // test
2474    if (degradedStatusThreshold > 0)
2475    {
2476      for (DataServerHandler serverHandler : connectedDSs.values())
2477      {
2478        // Get number of pending changes for this server
2479        final int nChanges = serverHandler.getRcvMsgQueueSize();
2480        if (logger.isTraceEnabled())
2481        {
2482          logger.trace("In RS " + getLocalRSServerId() + ", for baseDN="
2483              + getBaseDN() + ": " + "Status analyzer: DS "
2484              + serverHandler.getServerId() + " has " + nChanges
2485              + " message(s) in writer queue.");
2486        }
2487
2488        // Check status to know if it is relevant to change the status. Do not
2489        // take RSD lock to test. If we attempt to change the status whereas
2490        // the current status does allow it, this will be noticed by
2491        // the changeStatusFromStatusAnalyzer() method. This allows to take the
2492        // lock roughly only when needed versus every sleep time timeout.
2493        if (nChanges >= degradedStatusThreshold)
2494        {
2495          if (serverHandler.getStatus() == NORMAL_STATUS
2496              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
2497          {
2498            break; // Interrupted.
2499          }
2500        }
2501        else
2502        {
2503          if (serverHandler.getStatus() == DEGRADED_STATUS
2504              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
2505          {
2506            break; // Interrupted.
2507          }
2508        }
2509      }
2510    }
2511  }
2512
2513
2514
2515  /**
2516   * Sends any enqueued status messages to the rest of the topology.
2517   */
2518  void sendPendingStatusMessages()
2519  {
2520    /*
2521     * Take a snapshot of pending status notifications in order to avoid holding
2522     * the broadcast lock for too long. In addition, clear the notifications so
2523     * that they are not resent the next time.
2524     */
2525    final PendingStatusMessages savedState;
2526    synchronized (pendingStatusMessagesLock)
2527    {
2528      savedState = pendingStatusMessages;
2529      pendingStatusMessages = new PendingStatusMessages();
2530    }
2531    sendPendingChangeTimeHeartbeatMsgs(savedState);
2532    sendPendingTopologyMsgs(savedState);
2533    sendPendingMonitorMsgs(savedState);
2534  }
2535
2536
2537
2538  private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
2539  {
2540    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
2541        .entrySet())
2542    {
2543      ServerHandler ds = connectedDSs.get(msg.getKey());
2544      if (ds != null)
2545      {
2546        try
2547        {
2548          ds.send(msg.getValue());
2549        }
2550        catch (IOException e)
2551        {
2552          // Ignore: connection closed.
2553        }
2554      }
2555    }
2556    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
2557        .entrySet())
2558    {
2559      ServerHandler rs = connectedRSs.get(msg.getKey());
2560      if (rs != null)
2561      {
2562        try
2563        {
2564          rs.send(msg.getValue());
2565        }
2566        catch (IOException e)
2567        {
2568          // We log the error. The requestor will detect a timeout or
2569          // any other failure on the connection.
2570
2571          // FIXME: why do we log for RSs but not DSs?
2572          logger.traceException(e);
2573          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getValue().getDestination());
2574        }
2575      }
2576    }
2577  }
2578
2579
2580
2581  private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs)
2582  {
2583    for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats.values())
2584    {
2585      for (ReplicationServerHandler rsHandler : connectedRSs.values())
2586      {
2587        try
2588        {
2589          if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
2590          {
2591            rsHandler.send(pendingHeartbeat);
2592          }
2593        }
2594        catch (IOException e)
2595        {
2596          logger.traceException(e);
2597          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, "Replication Server "
2598              + localReplicationServer.getReplicationPort() + " " + baseDN
2599              + " " + localReplicationServer.getServerId());
2600          stopServer(rsHandler, false);
2601        }
2602      }
2603    }
2604  }
2605
2606
2607
2608  private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
2609  {
2610    if (pendingMsgs.sendDSTopologyMsg)
2611    {
2612      for (ServerHandler handler : connectedDSs.values())
2613      {
2614        if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
2615        {
2616          final TopologyMsg topoMsg = createTopologyMsgForDS(handler
2617              .getServerId());
2618          sendTopologyMsg("directory", handler, topoMsg);
2619        }
2620      }
2621    }
2622
2623    if (pendingMsgs.sendRSTopologyMsg && !connectedRSs.isEmpty())
2624    {
2625      final TopologyMsg topoMsg = createTopologyMsgForRS();
2626      for (ServerHandler handler : connectedRSs.values())
2627      {
2628        sendTopologyMsg("replication", handler, topoMsg);
2629      }
2630    }
2631  }
2632
2633
2634
2635  private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
2636  {
2637    /*
2638     * If the request comes from a Directory Server we need to build the full
2639     * list of all servers in the topology and send back a MonitorMsg with the
2640     * full list of all the servers in the topology.
2641     */
2642    if (sender.isDataServer())
2643    {
2644      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
2645          msg.getDestination(), msg.getSenderID(),
2646          domainMonitor.getMonitorData());
2647      synchronized (pendingStatusMessagesLock)
2648      {
2649        pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
2650            monitorMsg);
2651      }
2652    }
2653    else
2654    {
2655      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
2656          msg.getDestination(), msg.getSenderID());
2657      synchronized (pendingStatusMessagesLock)
2658      {
2659        pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
2660            monitorMsg);
2661      }
2662    }
2663    statusAnalyzer.notifyPendingStatusMessage();
2664  }
2665}