001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2008-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2016 ForgeRock AS.
016 */
017package org.opends.server.replication.service;
018
019import static org.opends.messages.ReplicationMessages.*;
020import static org.opends.server.replication.common.AssuredMode.*;
021import static org.opends.server.replication.common.StatusMachine.*;
022import static org.opends.server.util.CollectionUtils.*;
023
024import java.io.BufferedOutputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.net.SocketTimeoutException;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.Date;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.concurrent.atomic.AtomicReference;
042
043import net.jcip.annotations.Immutable;
044
045import org.forgerock.i18n.LocalizableMessage;
046import org.forgerock.i18n.slf4j.LocalizedLogger;
047import org.forgerock.opendj.config.server.ConfigException;
048import org.forgerock.opendj.ldap.ResultCode;
049import org.forgerock.opendj.server.config.meta.ReplicationDomainCfgDefn.AssuredType;
050import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
051import org.opends.server.api.DirectoryThread;
052import org.opends.server.api.MonitorData;
053import org.opends.server.backends.task.Task;
054import org.opends.server.replication.common.AssuredMode;
055import org.opends.server.replication.common.CSN;
056import org.opends.server.replication.common.CSNGenerator;
057import org.opends.server.replication.common.DSInfo;
058import org.opends.server.replication.common.RSInfo;
059import org.opends.server.replication.common.ServerState;
060import org.opends.server.replication.common.ServerStatus;
061import org.opends.server.replication.common.StatusMachine;
062import org.opends.server.replication.common.StatusMachineEvent;
063import org.opends.server.replication.protocol.AckMsg;
064import org.opends.server.replication.protocol.ChangeStatusMsg;
065import org.opends.server.replication.protocol.DoneMsg;
066import org.opends.server.replication.protocol.EntryMsg;
067import org.opends.server.replication.protocol.ErrorMsg;
068import org.opends.server.replication.protocol.HeartbeatMsg;
069import org.opends.server.replication.protocol.InitializeRcvAckMsg;
070import org.opends.server.replication.protocol.InitializeRequestMsg;
071import org.opends.server.replication.protocol.InitializeTargetMsg;
072import org.opends.server.replication.protocol.ProtocolVersion;
073import org.opends.server.replication.protocol.ReplSessionSecurity;
074import org.opends.server.replication.protocol.ReplicationMsg;
075import org.opends.server.replication.protocol.ResetGenerationIdMsg;
076import org.opends.server.replication.protocol.RoutableMsg;
077import org.opends.server.replication.protocol.TopologyMsg;
078import org.opends.server.replication.protocol.UpdateMsg;
079import org.opends.server.tasks.InitializeTargetTask;
080import org.opends.server.tasks.InitializeTask;
081import org.forgerock.opendj.ldap.DN;
082import org.opends.server.types.DirectoryException;
083
/**
 * This class should be used as a base for Replication implementations.
 * <p>
 * Developers who need a replication mechanism are expected to subclass this
 * class with their own implementation.
 * <p>
 *   During its startup phase, the ReplicationDomain subclass
 *   should read the list of replication servers from the configuration,
 *   instantiate a {@link ServerState}, then start the publish service
 *   by calling {@link #startPublishService()}.
 *   From that point on it can call the {@link #publish(UpdateMsg)}
 *   method if needed.
 * <p>
 *   When the startup phase reaches the point where the subclass is ready
 *   to handle updates, the Replication Domain implementation should call the
 *   {@link #startListenService()} method.
 *   A Listener thread is then created on the Replication Service, and
 *   updates can start being received.
 * <p>
 *   When updates are received, the Replication Service calls the
 *   {@link #processUpdate(UpdateMsg)} method.
 *   ReplicationDomain implementations should implement the appropriate code
 *   for replaying the update on the local repository.
 *   When fully done, the subclass must call the
 *   {@link #processUpdateDone(UpdateMsg, String)} method.
 *   This allows the update to be processed asynchronously if necessary.
 * <p>
 *   To propagate changes to other replicas, a ReplicationDomain implementation
 *   must use the {@link #publish(UpdateMsg)} method.
 * <p>
 *   If the Full Initialization process is needed, then implementations
 *   of {@code importBackend(InputStream)} and
 *   {@code exportBackend(OutputStream)} must be
 *   provided.
 * <p>
 *   Full Initialization of a replica can be triggered by LDAP clients
 *   by creating an InitializeTask or an InitializeTargetTask.
 *   Full initialization can also be triggered from the ReplicationDomain
 *   implementation using the methods {@link #initializeRemote(int, Task)}
 *   or {@link #initializeFromRemote(int, Task)}.
 * <p>
 *   At shutdown time, the {@link #disableService()} method should be called to
 *   cleanly stop the replication service.
 */
129public abstract class ReplicationDomain
130{
131
132  /** Contains all the attributes included for the ECL (External Changelog). */
133  @Immutable
134  private static final class ECLIncludes
135  {
136    final Map<Integer, Set<String>> includedAttrsByServer;
137    final Set<String> includedAttrsAllServers;
138
139    final Map<Integer, Set<String>> includedAttrsForDeletesByServer;
140    final Set<String> includedAttrsForDeletesAllServers;
141
142    private ECLIncludes(
143        Map<Integer, Set<String>> includedAttrsByServer,
144        Set<String> includedAttrsAllServers,
145        Map<Integer, Set<String>> includedAttrsForDeletesByServer,
146        Set<String> includedAttrsForDeletesAllServers)
147    {
148      this.includedAttrsByServer = includedAttrsByServer;
149      this.includedAttrsAllServers = includedAttrsAllServers;
150
151      this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer;
152      this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers;
153    }
154
155    @SuppressWarnings("unchecked")
156    public ECLIncludes()
157    {
158      this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP,
159          Collections.EMPTY_SET);
160    }
161
162    /**
163     * Add attributes to be included in the ECL.
164     *
165     * @param serverId
166     *          Server where these attributes are configured.
167     * @param includeAttributes
168     *          Attributes to be included with all change records, may include
169     *          wild-cards.
170     * @param includeAttributesForDeletes
171     *          Additional attributes to be included with delete change records,
172     *          may include wild-cards.
173     * @return a new {@link ECLIncludes} object if included attributes have
174     *         changed, or the current object otherwise.
175     */
176    public ECLIncludes addIncludedAttributes(int serverId,
177        Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
178    {
179      boolean configurationChanged = false;
180
181      Set<String> s1 = new HashSet<>(includeAttributes);
182
183      // Combine all+delete attributes.
184      Set<String> s2 = new HashSet<>(s1);
185      s2.addAll(includeAttributesForDeletes);
186
187      Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer;
188      if (!s1.equals(this.includedAttrsByServer.get(serverId)))
189      {
190        configurationChanged = true;
191        eclIncludesByServer = new HashMap<>(this.includedAttrsByServer);
192        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
193      }
194
195      Map<Integer, Set<String>> eclIncludesForDeletesByServer = this.includedAttrsForDeletesByServer;
196      if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId)))
197      {
198        configurationChanged = true;
199        eclIncludesForDeletesByServer = new HashMap<>(this.includedAttrsForDeletesByServer);
200        eclIncludesForDeletesByServer.put(serverId, Collections.unmodifiableSet(s2));
201      }
202
203      if (!configurationChanged)
204      {
205        return this;
206      }
207
208      // and rebuild the global list to be ready for usage
209      Set<String> eclIncludesAllServer = new HashSet<>();
210      for (Set<String> attributes : eclIncludesByServer.values())
211      {
212        eclIncludesAllServer.addAll(attributes);
213      }
214
215      Set<String> eclIncludesForDeletesAllServer = new HashSet<>();
216      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
217      {
218        eclIncludesForDeletesAllServer.addAll(attributes);
219      }
220      return new ECLIncludes(eclIncludesByServer,
221          Collections.unmodifiableSet(eclIncludesAllServer),
222          eclIncludesForDeletesByServer,
223          Collections.unmodifiableSet(eclIncludesForDeletesAllServer));
224    }
225  }
226
227  /**
228   * Current status for this replicated domain.
229   */
230  private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
231  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
232
233  /** The configuration of the replication domain. */
234  protected volatile ReplicationDomainCfg config;
235  /**
236   * The assured configuration of the replication domain. It is a duplicate of
237   * {@link #config} because of its update model.
238   *
239   * @see #readAssuredConfig(ReplicationDomainCfg, boolean)
240   */
241  private volatile ReplicationDomainCfg assuredConfig;
242
243  /**
244   * The ReplicationBroker that is used by this ReplicationDomain to
245   * connect to the ReplicationService.
246   */
247  protected ReplicationBroker broker;
248
249  /**
250   * This Map is used to store all outgoing assured messages in order
251   * to be able to correlate all the coming back acks to the original
252   * operation.
253   */
254  private final Map<CSN, UpdateMsg> waitingAckMsgs = new ConcurrentHashMap<>();
255  /**
256   * The context related to an import or export being processed
257   * Null when none is being processed.
258   */
259  private final AtomicReference<ImportExportContext> importExportContext = new AtomicReference<>();
260
261  /**
262   * The Thread waiting for incoming update messages for this domain and pushing
263   * them to the global incoming update message queue for later processing by
264   * replay threads.
265   */
266  private volatile DirectoryThread listenerThread;
267
268  /** A set of counters used for Monitoring. */
269  private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
270  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
271  private AtomicInteger numSentUpdates = new AtomicInteger(0);
272
273  /** Assured replication monitoring counters. */
274
275  /** Number of updates sent in Assured Mode, Safe Read. */
276  private AtomicInteger assuredSrSentUpdates = new AtomicInteger(0);
277  /**
278   * Number of updates sent in Assured Mode, Safe Read, that have been
279   * successfully acknowledged.
280   */
281  private AtomicInteger assuredSrAcknowledgedUpdates = new AtomicInteger(0);
282  /**
283   * Number of updates sent in Assured Mode, Safe Read, that have not been
284   * successfully acknowledged (either because of timeout, wrong status or error
285   * at replay).
286   */
287  private AtomicInteger assuredSrNotAcknowledgedUpdates = new AtomicInteger(0);
288  /**
289   * Number of updates sent in Assured Mode, Safe Read, that have not been
290   * successfully acknowledged because of timeout.
291   */
292  private AtomicInteger assuredSrTimeoutUpdates = new AtomicInteger(0);
293  /**
294   * Number of updates sent in Assured Mode, Safe Read, that have not been
295   * successfully acknowledged because of wrong status.
296   */
297  private AtomicInteger assuredSrWrongStatusUpdates = new AtomicInteger(0);
298  /**
299   * Number of updates sent in Assured Mode, Safe Read, that have not been
300   * successfully acknowledged because of replay error.
301   */
302  private AtomicInteger assuredSrReplayErrorUpdates = new AtomicInteger(0);
303  /**
304   * Multiple values allowed: number of updates sent in Assured Mode, Safe Read,
305   * that have not been successfully acknowledged (either because of timeout,
306   * wrong status or error at replay) for a particular server (DS or RS).
307   * <p>
308   * String format: &lt;server id&gt;:&lt;number of failed updates&gt;
309   */
310  private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = new HashMap<>();
311  /** Number of updates received in Assured Mode, Safe Read request. */
312  private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
313  /**
314   * Number of updates received in Assured Mode, Safe Read request that we have
315   * acked without errors.
316   */
317  private AtomicInteger assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
318  /**
319   * Number of updates received in Assured Mode, Safe Read request that we have
320   * acked with errors.
321   */
322  private AtomicInteger assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
323  /** Number of updates sent in Assured Mode, Safe Data. */
324  private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
325  /**
326   * Number of updates sent in Assured Mode, Safe Data, that have been
327   * successfully acknowledged.
328   */
329  private AtomicInteger assuredSdAcknowledgedUpdates = new AtomicInteger(0);
330  /**
331   * Number of updates sent in Assured Mode, Safe Data, that have not been
332   * successfully acknowledged because of timeout.
333   */
334  private AtomicInteger assuredSdTimeoutUpdates = new AtomicInteger(0);
335  /**
336   * Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
337   * that have not been successfully acknowledged because of timeout for a
338   * particular RS.
339   * <p>
340   * String format: &lt;server id&gt;:&lt;number of failed updates&gt;
341   */
342  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = new HashMap<>();
343
344  /* Status related monitoring fields */
345
346  /**
347   * Indicates the date when the status changed. This may be used to indicate
348   * the date the session with the current replication server started (when
349   * status is NORMAL for instance). All the above assured monitoring fields
350   * are also reset each time the status is changed
351   */
352  private Date lastStatusChangeDate = new Date();
353
354  /**
355   * The state maintained by the Concrete Class.
356   */
357  private final ServerState state;
358
359  /**
360   * The generator that will be used to generate {@link CSN}
361   * for this domain.
362   */
363  private final CSNGenerator generator;
364
365  private final AtomicReference<ECLIncludes> eclIncludes = new AtomicReference<>(new ECLIncludes());
366
367  /**
368   * An object used to protect the initialization of the underlying broker
369   * session of this ReplicationDomain.
370   */
371  private final Object sessionLock = new Object();
372
373  /**
374   * The generationId for this replication domain. It is made of a hash of the
375   * 1000 first entries for this domain.
376   */
377  protected volatile long generationId;
378
379  /**
380   * Returns the {@link CSNGenerator} that will be used to
381   * generate {@link CSN} for this domain.
382   *
383   * @return The {@link CSNGenerator} that will be used to
384   *         generate {@link CSN} for this domain.
385   */
386  public CSNGenerator getGenerator()
387  {
388    return generator;
389  }
390
391  /**
392   * Creates a ReplicationDomain with the provided parameters.
393   *
394   * @param config
395   *          The configuration object for this ReplicationDomain
396   * @param generationId
397   *          the generation of this ReplicationDomain
398   */
399  public ReplicationDomain(ReplicationDomainCfg config, long generationId)
400  {
401    this(config, generationId, new ServerState());
402  }
403
404  /**
405   * Creates a ReplicationDomain with the provided parameters. (for unit test
406   * purpose only)
407   *
408   * @param config
409   *          The configuration object for this ReplicationDomain
410   * @param generationId
411   *          the generation of this ReplicationDomain
412   * @param serverState
413   *          The serverState to use
414   */
415  public ReplicationDomain(ReplicationDomainCfg config, long generationId,
416      ServerState serverState)
417  {
418    this.config = config;
419    this.assuredConfig = config;
420    this.generationId = generationId;
421    this.state = serverState;
422    this.generator = new CSNGenerator(getServerId(), state);
423  }
424
425  /**
426   * Set the initial status of the domain and perform necessary initializations.
427   * This method will be called by the Broker each time the ReplicationBroker
428   * establish a new session to a Replication Server.
429   *
430   * Implementations may override this method when they need to perform
431   * additional computing after session establishment.
432   * The default implementation should be sufficient for ReplicationDomains
433   * that don't need to perform additional computing.
434   *
435   * @param initStatus              The status to enter the state machine with.
436   * @param rsState                 The ServerState of the ReplicationServer
437   *                                with which the session was established.
438   */
439  public void sessionInitiated(ServerStatus initStatus, ServerState rsState)
440  {
441    // Sanity check: is it a valid initial status?
442    if (!isValidInitialStatus(initStatus))
443    {
444      logger.error(ERR_DS_INVALID_INIT_STATUS, initStatus, getBaseDN(), getServerId());
445    }
446    else
447    {
448      status = initStatus;
449    }
450    generator.adjust(state);
451    generator.adjust(rsState);
452  }
453
454  /**
455   * Processes an incoming ChangeStatusMsg. Compute new status according to
456   * given order. Then update domain for being compliant with new status
457   * definition.
458   * @param csMsg The received status message
459   */
460  private void receiveChangeStatus(ChangeStatusMsg csMsg)
461  {
462    if (logger.isTraceEnabled())
463    {
464      logger.trace("Replication domain " + getBaseDN() +
465        " received change status message:\n" + csMsg);
466    }
467
468    ServerStatus reqStatus = csMsg.getRequestedStatus();
469
470    // Translate requested status to a state machine event
471    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
472    if (event == StatusMachineEvent.INVALID_EVENT)
473    {
474      logger.error(ERR_DS_INVALID_REQUESTED_STATUS, reqStatus, getBaseDN(), getServerId());
475      return;
476    }
477
478    // Set the new status to the requested one
479    setNewStatus(event);
480  }
481
482  /**
483   * Called when first connection or disconnection detected.
484   */
485  void toNotConnectedStatus()
486  {
487    // Go into not connected status
488    setNewStatus(StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT);
489  }
490
491  /**
492   * Perform whatever actions are needed to apply properties for being
493   * compliant with new status. Must be called in synchronized section for
494   * status. The new status is already set in status variable.
495   */
496  private void updateDomainForNewStatus()
497  {
498    switch (status)
499    {
500      case FULL_UPDATE_STATUS:
501        // Signal RS we just entered the full update status
502        broker.signalStatusChange(status);
503        break;
504      case NOT_CONNECTED_STATUS:
505      case NORMAL_STATUS:
506      case DEGRADED_STATUS:
507      case BAD_GEN_ID_STATUS:
508        break;
509      default:
510        if (logger.isTraceEnabled())
511        {
512          logger.trace("updateDomainForNewStatus: unexpected status: " + status);
513        }
514    }
515  }
516
517  /**
518   * Gets the status for this domain.
519   * @return The status for this domain.
520   */
521  public ServerStatus getStatus()
522  {
523    return status;
524  }
525
526  /**
527   * Returns the base DN of this ReplicationDomain. All Replication Domain using
528   * this baseDN will be connected through the Replication Service.
529   *
530   * @return The base DN of this ReplicationDomain
531   */
532  public DN getBaseDN()
533  {
534    return config.getBaseDN();
535  }
536
537  /**
538   * Get the server ID. The identifier of this Replication Domain inside the
539   * Replication Service. Each Domain must use a unique ServerID.
540   *
541   * @return The server ID.
542   */
543  public int getServerId()
544  {
545    return config.getServerId();
546  }
547
548  /**
549   * Window size used during initialization .. between - the
550   * initializer/exporter DS that listens/waits acknowledges and that slows down
551   * data msg publishing based on the slowest server - and each
552   * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg
553   * received.
554   *
555   * @return the initWindow
556   */
557  protected int getInitWindow()
558  {
559    return config.getInitializationWindowSize();
560  }
561
562  /**
563   * Tells if assured replication is enabled for this domain.
564   * @return True if assured replication is enabled for this domain.
565   */
566  public boolean isAssured()
567  {
568    return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType())
569        || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType());
570  }
571
572  /**
573   * Gives the mode for the assured replication of the domain. Only used when
574   * assured is true).
575   *
576   * @return The mode for the assured replication of the domain.
577   */
578  public AssuredMode getAssuredMode()
579  {
580    switch (assuredConfig.getAssuredType())
581    {
582    case SAFE_DATA:
583    case NOT_ASSURED: // The assured mode will be ignored in that case anyway
584      return AssuredMode.SAFE_DATA_MODE;
585    case SAFE_READ:
586      return AssuredMode.SAFE_READ_MODE;
587    }
588    return null; // should never happen
589  }
590
591  /**
592   * Gives the assured Safe Data level of the replication of the domain. (used
593   * when assuredMode is SAFE_DATA).
594   *
595   * @return The assured level of the replication of the domain.
596   */
597  public byte getAssuredSdLevel()
598  {
599    return (byte) assuredConfig.getAssuredSdLevel();
600  }
601
602  /**
603   * Gives the assured timeout of the replication of the domain (in ms).
604   * @return The assured timeout of the replication of the domain.
605   */
606  public long getAssuredTimeout()
607  {
608    return assuredConfig.getAssuredTimeout();
609  }
610
611  /**
612   * Gets the group id for this domain.
613   * @return The group id for this domain.
614   */
615  public byte getGroupId()
616  {
617    return (byte) config.getGroupId();
618  }
619
620  /**
621   * Gets the referrals URLs this domain publishes. Referrals urls to be
622   * published to other servers of the topology.
623   * <p>
624   * TODO: fill that with all currently opened urls if no urls configured
625   *
626   * @return The referrals URLs this domain publishes.
627   */
628  public Set<String> getRefUrls()
629  {
630    return config.getReferralsUrl();
631  }
632
633  /**
634   * Gets the info for Replicas in the topology (except us).
635   * @return The info for Replicas in the topology (except us)
636   */
637  public Map<Integer, DSInfo> getReplicaInfos()
638  {
639    return broker.getReplicaInfos();
640  }
641
642  /**
643   * Returns information about the DS server related to the provided serverId.
644   * based on the TopologyMsg we received when the remote replica connected or
645   * disconnected. Return null when no server with the provided serverId is
646   * connected.
647   *
648   * @param  dsId The provided serverId of the remote replica
649   * @return the info related to this remote server if it is connected,
650   *                  null is the server is NOT connected.
651   */
652  private DSInfo getConnectedRemoteDS(int dsId)
653  {
654    return getReplicaInfos().get(dsId);
655  }
656
657  /**
658   * Gets the States of all the Replicas currently in the
659   * Topology.
660   * When this method is called, a Monitoring message will be sent
661   * to the Replication Server to which this domain is currently connected
662   * so that it computes a table containing information about
663   * all Directory Servers in the topology.
664   * This Computation involves communications will all the servers
665   * currently connected and
666   *
667   * @return The States of all Replicas in the topology (except us)
668   */
669  public Map<Integer, ServerState> getReplicaStates()
670  {
671    return broker.getReplicaStates();
672  }
673
674  /**
675   * Gets the info for RSs in the topology (except the one we are connected
676   * to).
677   * @return The info for RSs in the topology (except the one we are connected
678   * to)
679   */
680  public List<RSInfo> getRsInfos()
681  {
682    return broker.getRsInfos();
683  }
684
685
686  /**
687   * Gets the server ID of the Replication Server to which the domain
688   * is currently connected.
689   *
690   * @return The server ID of the Replication Server to which the domain
691   *         is currently connected.
692   */
693  public int getRsServerId()
694  {
695    return broker.getRsServerId();
696  }
697
698  /**
699   * Increment the number of processed updates.
700   */
701  private void incProcessedUpdates()
702  {
703    numProcessedUpdates.incrementAndGet();
704  }
705
706  /**
707   * Get the number of updates replayed by the replication.
708   *
709   * @return The number of updates replayed by the replication
710   */
711  int getNumProcessedUpdates()
712  {
713    if (numProcessedUpdates != null)
714    {
715      return numProcessedUpdates.get();
716    }
717    return 0;
718  }
719
720  /**
721   * Get the number of updates received by the replication plugin.
722   *
723   * @return the number of updates received
724   */
725  int getNumRcvdUpdates()
726  {
727    if (numRcvdUpdates != null)
728    {
729      return numRcvdUpdates.get();
730    }
731    return 0;
732  }
733
734  /**
735   * Get the number of updates sent by the replication plugin.
736   *
737   * @return the number of updates sent
738   */
739  int getNumSentUpdates()
740  {
741    if (numSentUpdates != null)
742    {
743      return numSentUpdates.get();
744    }
745    return 0;
746  }
747
748  /**
749   * Receives an update message from the replicationServer.
750   * The other types of messages are processed in an opaque way for the caller.
751   * Also responsible for updating the list of pending changes
752   * @return the received message - null if none
753   */
754  private UpdateMsg receive()
755  {
756    UpdateMsg update = null;
757
758    while (update == null)
759    {
760      InitializeRequestMsg initReqMsg = null;
761      ReplicationMsg msg;
762      try
763      {
764        msg = broker.receive(true, true, false);
765        if (msg == null)
766        {
767          // The server is in the shutdown process
768          return null;
769        }
770
771        if (logger.isTraceEnabled() && !(msg instanceof HeartbeatMsg))
772        {
773          logger.trace("LocalizableMessage received <" + msg + ">");
774        }
775
776        if (msg instanceof AckMsg)
777        {
778          AckMsg ack = (AckMsg) msg;
779          receiveAck(ack);
780        }
781        else if (msg instanceof InitializeRequestMsg)
782        {
783          // Another server requests us to provide entries
784          // for a total update
785          initReqMsg = (InitializeRequestMsg)msg;
786        }
787        else if (msg instanceof InitializeTargetMsg)
788        {
789          // Another server is exporting its entries to us
790          InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
791
792          /*
793          This must be done while we are still holding the broker lock
794          because we are now going to receive a bunch of entries from the
795          remote server and we want the import thread to catch them and
796          not the ListenerThread.
797          */
798          initialize(initTargetMsg, initTargetMsg.getSenderID());
799        }
800        else if (msg instanceof ErrorMsg)
801        {
802          ErrorMsg errorMsg = (ErrorMsg)msg;
803          ImportExportContext ieCtx = importExportContext.get();
804          if (ieCtx != null)
805          {
806            /*
807            This is an error termination for the 2 following cases :
808            - either during an export
809            - or before an import really started
810            For example, when we publish a request and the
811            replicationServer did not find the import source.
812
813            A remote error during the import will be received in the
814            receiveEntryBytes() method.
815            */
816            if (logger.isTraceEnabled())
817            {
818              logger.trace(
819                  "[IE] processErrorMsg:" + getServerId() +
820                  " baseDN: " + getBaseDN() +
821                  " Error Msg received: " + errorMsg);
822            }
823
824            if (errorMsg.getCreationTime() > ieCtx.startTime)
825            {
826              // consider only ErrorMsg that relate to the current import/export
827              processErrorMsg(errorMsg, ieCtx);
828            }
829            else
830            {
831              /*
832              Simply log - happen when the ErrorMsg relates to a previous
833              attempt of initialization while we have started a new one
834              on this side.
835              */
836              logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
837            }
838          }
839          else
840          {
841            // Simply log - happen if import/export has been terminated
842            // on our side before receiving this ErrorMsg.
843            logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
844          }
845        }
846        else if (msg instanceof ChangeStatusMsg)
847        {
848          ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
849          receiveChangeStatus(csMsg);
850        }
851        else if (msg instanceof UpdateMsg)
852        {
853          update = (UpdateMsg) msg;
854          generator.adjust(update.getCSN());
855        }
856        else if (msg instanceof InitializeRcvAckMsg)
857        {
858          ImportExportContext ieCtx = importExportContext.get();
859          if (ieCtx != null)
860          {
861            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
862            ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
863          }
864          // Trash this msg When no input/export is running/should never happen
865        }
866      }
867      catch (SocketTimeoutException e)
868      {
869        // just retry
870      }
871      /*
872      Test if we have received and export request message and
873      if that's the case handle it now.
874      This must be done outside of the portion of code protected
875      by the broker lock so that we keep receiving update
876      when we are doing and export and so that a possible
877      closure of the socket happening when we are publishing the
878      entries to the remote can be handled by the other
879      replay thread when they call this method and therefore the
880      broker.receive() method.
881      */
882      if (initReqMsg != null)
883      {
884        // Do this work in a thread to allow replay thread continue working
885        ExportThread exportThread = new ExportThread(
886            initReqMsg.getSenderID(), initReqMsg.getInitWindow());
887        exportThread.start();
888      }
889    }
890
891    numRcvdUpdates.incrementAndGet();
892    if (update.isAssured()
893        && broker.getRsGroupId() == getGroupId()
894        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
895    {
896      assuredSrReceivedUpdates.incrementAndGet();
897    }
898    return update;
899  }
900
901  /**
902   * Updates the passed monitoring list of errors received for assured messages
903   * (safe data or safe read, depending of the passed list to update) for a
904   * particular server in the list. This increments the counter of error for the
905   * passed server, or creates an initial value of 1 error for it if the server
906   * is not yet present in the map.
907   * @param errorsByServer map of number of errors per serverID
908   * @param sid the ID of the server which produced an error
909   */
910  private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer,
911    Integer sid)
912  {
913    synchronized (errorsByServer)
914    {
915      Integer serverErrCount = errorsByServer.get(sid);
916      if (serverErrCount == null)
917      {
918        // Server not present in list, create an entry with an
919        // initial number of errors set to 1
920        errorsByServer.put(sid, 1);
921      } else
922      {
923        // Server already present in list, just increment number of
924        // errors for the server
925        int val = serverErrCount;
926        val++;
927        errorsByServer.put(sid, val);
928      }
929    }
930  }
931
932  /**
933   * Do the necessary processing when an AckMsg is received.
934   *
935   * @param ack The AckMsg that was received.
936   */
937  private void receiveAck(AckMsg ack)
938  {
939    CSN csn = ack.getCSN();
940
941    // Remove the message for pending ack list (this may already make the thread
942    // that is waiting for the ack be aware of its reception)
943    UpdateMsg update = waitingAckMsgs.remove(csn);
944
945    // Signal waiting thread ack has been received
946    if (update != null)
947    {
948      synchronized (update)
949      {
950        update.notify();
951      }
952
953      // Analyze status of embedded in the ack to see if everything went well
954      boolean hasTimeout = ack.hasTimeout();
955      boolean hasReplayErrors = ack.hasReplayError();
956      boolean hasWrongStatus = ack.hasWrongStatus();
957
958      AssuredMode updateAssuredMode = update.getAssuredMode();
959
960      if ( hasTimeout || hasReplayErrors || hasWrongStatus)
961      {
962        /*
963        Some problems detected: message did not correctly reach every
964        requested servers. Log problem
965        */
966        logger.info(NOTE_DS_RECEIVED_ACK_ERROR, getBaseDN(), getServerId(), update, ack.errorsToString());
967
968        List<Integer> failedServers = ack.getFailedServers();
969
970        // Increment assured replication monitoring counters
971        switch (updateAssuredMode)
972        {
973          case SAFE_READ_MODE:
974            assuredSrNotAcknowledgedUpdates.incrementAndGet();
975            if (hasTimeout)
976            {
977              assuredSrTimeoutUpdates.incrementAndGet();
978            }
979            if (hasReplayErrors)
980            {
981              assuredSrReplayErrorUpdates.incrementAndGet();
982            }
983            if (hasWrongStatus)
984            {
985              assuredSrWrongStatusUpdates.incrementAndGet();
986            }
987            if (failedServers != null) // This should always be the case !
988            {
989              for(Integer sid : failedServers)
990              {
991                updateAssuredErrorsByServer(
992                  assuredSrServerNotAcknowledgedUpdates, sid);
993              }
994            }
995            break;
996          case SAFE_DATA_MODE:
997            // The only possible cause of ack error in safe data mode is timeout
998            if (hasTimeout) // So should always be the case
999            {
1000              assuredSdTimeoutUpdates.incrementAndGet();
1001            }
1002            if (failedServers != null) // This should always be the case !
1003            {
1004              for(Integer sid : failedServers)
1005              {
1006                updateAssuredErrorsByServer(
1007                  assuredSdServerTimeoutUpdates, sid);
1008              }
1009            }
1010            break;
1011          default:
1012          // Should not happen
1013        }
1014      } else
1015      {
1016        // Update has been acknowledged without errors
1017        // Increment assured replication monitoring counters
1018        switch (updateAssuredMode)
1019        {
1020          case SAFE_READ_MODE:
1021            assuredSrAcknowledgedUpdates.incrementAndGet();
1022            break;
1023          case SAFE_DATA_MODE:
1024            assuredSdAcknowledgedUpdates.incrementAndGet();
1025            break;
1026          default:
1027          // Should not happen
1028        }
1029      }
1030    }
1031  }
1032
1033
1034  /*
1035   * After this point the code is related to the Total Update.
1036   */
1037
1038  /**
1039   * This thread is launched when we want to export data to another server.
1040   *
1041   * When a task is created locally (so this local server is the initiator)
1042   * of the export (Example: dsreplication initialize-all),
1043   * this thread is NOT used but the task thread is running the export instead).
1044   */
1045  private class ExportThread extends DirectoryThread
1046  {
1047    /** Id of server that will be initialized. */
1048    private final int serverIdToInitialize;
1049    private final int initWindow;
1050
1051
1052
1053    /**
1054     * Constructor for the ExportThread.
1055     *
1056     * @param serverIdToInitialize
1057     *          serverId of server that will receive entries
1058     * @param initWindow
1059     *          The value of the initialization window for flow control between
1060     *          the importer and the exporter.
1061     */
1062    public ExportThread(int serverIdToInitialize, int initWindow)
1063    {
1064      super("Export thread from serverId=" + getServerId() + " to serverId="
1065          + serverIdToInitialize);
1066      this.serverIdToInitialize = serverIdToInitialize;
1067      this.initWindow = initWindow;
1068    }
1069
1070
1071
1072    /**
1073     * Run method for this class.
1074     */
1075    @Override
1076    public void run()
1077    {
1078      if (logger.isTraceEnabled())
1079      {
1080        logger.trace("[IE] starting " + getName());
1081      }
1082      try
1083      {
1084        initializeRemote(serverIdToInitialize, serverIdToInitialize, null,
1085            initWindow);
1086      } catch (DirectoryException de)
1087      {
1088        /*
1089        An error message has been sent to the peer
1090        This server is not the initiator of the export so there is
1091        nothing more to do locally.
1092        */
1093      }
1094
1095      if (logger.isTraceEnabled())
1096      {
1097        logger.trace("[IE] ending " + getName());
1098      }
1099    }
1100  }
1101
1102  /**
1103   * This class contains the context related to an import or export launched on
1104   * the domain.
1105   */
1106  protected class ImportExportContext
1107  {
1108    /** The private task that initiated the operation. */
1109    private Task initializeTask;
1110    /** The destination in the case of an export. */
1111    private int exportTarget = RoutableMsg.UNKNOWN_SERVER;
1112    /** The source in the case of an import. */
1113    private int importSource = RoutableMsg.UNKNOWN_SERVER;
1114
1115    /** The total entry count expected to be processed. */
1116    private long entryCount;
1117    /** The count for the entry not yet processed. */
1118    private long entryLeftCount;
1119
1120    /** Exception raised during the initialization. */
1121    private DirectoryException exception;
1122
1123    /** Whether the context is related to an import or an export. */
1124    private final boolean importInProgress;
1125
1126    /** Current counter of messages exchanged during the initialization. */
1127    private int msgCnt;
1128
1129    /**
1130     * Number of connections lost when we start the initialization. Will help
1131     * counting connections lost during initialization,
1132     */
1133    private int initNumLostConnections;
1134
1135    /**
1136     * Request message sent when this server has the initializeFromRemote task.
1137     */
1138    private InitializeRequestMsg initReqMsgSent;
1139
1140    /**
1141     * Start time of the initialization process. ErrorMsg timestamped before
1142     * this startTime will be ignored.
1143     */
1144    private final long startTime;
1145
1146    /** List for replicas (DS) connected to the topology when initialization started. */
1147    private final Set<Integer> startList = new HashSet<>(0);
1148
1149    /**
1150     * List for replicas (DS) with a failure (disconnected from the topology)
1151     * since the initialization started.
1152     */
1153    private final Set<Integer> failureList = new HashSet<>(0);
1154
1155    /**
1156     * Flow control during initialization: for each remote server, counter of
1157     * messages received.
1158     */
1159    private final Map<Integer, Integer> ackVals = new HashMap<>();
1160    /** ServerId of the slowest server (the one with the smallest non null counter). */
1161    private int slowestServerId = -1;
1162
1163    private short exporterProtocolVersion = -1;
1164
1165    /** Window used during this initialization. */
1166    private int initWindow;
1167
1168    /** Number of attempt already done for this initialization. */
1169    private short attemptCnt;
1170
1171    /**
1172     * Creates a new IEContext.
1173     *
1174     * @param importInProgress true if the IEContext will be used
1175     *                         for and import, false if the IEContext
1176     *                         will be used for and export.
1177     */
1178    private ImportExportContext(boolean importInProgress)
1179    {
1180      this.importInProgress = importInProgress;
1181      this.startTime = System.currentTimeMillis();
1182      this.attemptCnt = 0;
1183    }
1184
1185    /**
1186     * Returns a boolean indicating if a total update import is currently in
1187     * Progress.
1188     *
1189     * @return A boolean indicating if a total update import is currently in
1190     *         Progress.
1191     */
1192    boolean importInProgress()
1193    {
1194      return importInProgress;
1195    }
1196
1197    /**
1198     * Returns the total number of entries to be processed when a total update
1199     * is in progress.
1200     *
1201     * @return The total number of entries to be processed when a total update
1202     *         is in progress.
1203     */
1204    long getTotalEntryCount()
1205    {
1206      return entryCount;
1207    }
1208
1209    /**
1210     * Returns the number of entries still to be processed when a total update
1211     * is in progress.
1212     *
1213     * @return The number of entries still to be processed when a total update
1214     *         is in progress.
1215     */
1216    long getLeftEntryCount()
1217    {
1218      return entryLeftCount;
1219    }
1220
1221    /**
1222     * Initializes the import/export counters with the provider value.
1223     * @param total Total number of entries to be processed.
1224     * @throws DirectoryException if an error occurred.
1225     */
1226    private void initializeCounters(long total) throws DirectoryException
1227    {
1228      entryCount = total;
1229      entryLeftCount = total;
1230
1231      if (initializeTask instanceof InitializeTask)
1232      {
1233        final InitializeTask task = (InitializeTask) initializeTask;
1234        task.setTotal(entryCount);
1235        task.setLeft(entryCount);
1236      }
1237      else if (initializeTask instanceof InitializeTargetTask)
1238      {
1239        final InitializeTargetTask task = (InitializeTargetTask) initializeTask;
1240        task.setTotal(entryCount);
1241        task.setLeft(entryCount);
1242      }
1243    }
1244
1245    /**
1246     * Update the counters of the task for each entry processed during
1247     * an import or export.
1248     *
1249     * @param  entriesDone The number of entries that were processed
1250     *                     since the last time this method was called.
1251     *
1252     * @throws DirectoryException if an error occurred.
1253     */
1254    private void updateCounters(int entriesDone) throws DirectoryException
1255    {
1256      entryLeftCount -= entriesDone;
1257
1258      if (initializeTask != null)
1259      {
1260        if (initializeTask instanceof InitializeTask)
1261        {
1262          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
1263        }
1264        else if (initializeTask instanceof InitializeTargetTask)
1265        {
1266          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
1267        }
1268      }
1269    }
1270
1271    /** {@inheritDoc} */
1272    @Override
1273    public String toString()
1274    {
1275      return "[Entry count=" + this.entryCount +
1276             ", Entry left count=" + this.entryLeftCount + "]";
1277    }
1278
1279    /**
1280     * Gets the server id of the exporting server.
1281     * @return the server id of the exporting server.
1282     */
1283    public int getExportTarget()
1284    {
1285      return exportTarget;
1286    }
1287
1288    /**
1289     * Gets the server id of the importing server.
1290     * @return the server id of the importing server.
1291     */
1292    public int getImportSource()
1293    {
1294      return importSource;
1295    }
1296
1297    /**
1298     * Get the exception that occurred during the import/export.
1299     * @return the exception that occurred during the import/export.
1300     */
1301    public DirectoryException getException()
1302    {
1303      return exception;
1304    }
1305
1306    /**
1307     * Set the exception that occurred during the import/export.
1308     * @param exception the exception that occurred during the import/export.
1309     */
1310    public void setException(DirectoryException exception)
1311    {
1312      this.exception = exception;
1313    }
1314
1315    /**
1316     * Only sets the exception that occurred during the import/export if none
1317     * was already set on this object.
1318     *
1319     * @param exception the exception that occurred during the import/export.
1320     */
1321    public void setExceptionIfNoneSet(DirectoryException exception)
1322    {
1323      if (exception == null)
1324      {
1325        this.exception = exception;
1326      }
1327    }
1328
1329    /**
1330     * Set the id of the EntryMsg acknowledged from a receiver (importer)server.
1331     * (updated via the listener thread)
1332     * @param serverId serverId of the acknowledger/receiver/importer server.
1333     * @param numAck   id of the message received.
1334     */
1335    private void setAckVal(int serverId, int numAck)
1336    {
1337      if (logger.isTraceEnabled())
1338      {
1339        logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
1340      }
1341
1342      this.ackVals.put(serverId, numAck);
1343
1344      // Recompute the server with the minAck returned,means the slowest server.
1345      slowestServerId = serverId;
1346      for (Integer sid : importExportContext.get().ackVals.keySet())
1347      {
1348        if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
1349        {
1350          slowestServerId = sid;
1351        }
1352      }
1353    }
1354
1355    /**
1356     * Returns the serverId of the server that acknowledged the smallest
1357     * EntryMsg id.
1358     * @return serverId of the server with latest acknowledge.
1359     *                  0 when no ack has been received yet.
1360     */
1361    public int getSlowestServer()
1362    {
1363      if (logger.isTraceEnabled())
1364      {
1365        logger.trace("[IE] getSlowestServer" + slowestServerId
1366            + " " + this.ackVals.get(slowestServerId));
1367      }
1368
1369      return this.slowestServerId;
1370    }
1371
1372  }
1373
1374  /**
1375   * Verifies that the given string represents a valid source
1376   * from which this server can be initialized.
1377   *
1378   * @param targetString The string representing the source
1379   * @return The source as a integer value
1380   * @throws DirectoryException if the string is not valid
1381   */
1382  public int decodeTarget(String targetString) throws DirectoryException
1383  {
1384    if ("all".equalsIgnoreCase(targetString))
1385    {
1386      return RoutableMsg.ALL_SERVERS;
1387    }
1388
1389    // So should be a serverID
1390    try
1391    {
1392      int target = Integer.decode(targetString);
1393      if (target >= 0)
1394      {
1395        // FIXME Could we check now that it is a know server in the domain ?
1396        // JNR: Yes please
1397      }
1398      return target;
1399    }
1400    catch (Exception e)
1401    {
1402      ResultCode resultCode = ResultCode.OTHER;
1403      LocalizableMessage message = ERR_INVALID_EXPORT_TARGET.get();
1404      throw new DirectoryException(resultCode, message, e);
1405    }
1406  }
1407
1408  /**
1409   * Initializes a remote server from this server.
1410   * <p>
1411   * The {@code exportBackend(OutputStream)} will therefore be called
1412   * on this server, and the {@code importBackend(InputStream)}
1413   * will be called on the remote server.
1414   * <p>
1415   * The InputStream and OutputStream given as a parameter to those
1416   * methods will be connected through the replication protocol.
1417   *
1418   * @param target   The server-id of the server that should be initialized.
1419   *                 The target can be discovered using the
1420   *                 {@link #getReplicaInfos()} method.
1421   * @param initTask The task that triggers this initialization and that should
1422   *                 be updated with its progress.
1423   *
1424   * @throws DirectoryException If it was not possible to publish the
1425   *                            Initialization message to the Topology.
1426   */
1427  public void initializeRemote(int target, Task initTask)
1428  throws DirectoryException
1429  {
1430    initializeRemote(target, getServerId(), initTask, getInitWindow());
1431  }
1432
1433  /**
1434   * Process the initialization of some other server or servers in the topology
1435   * specified by the target argument when this initialization specifying the
1436   * server that requests the initialization.
1437   *
1438   * @param serverToInitialize The target server that should be initialized.
1439   * @param serverRunningTheTask The server that initiated the export. It can
1440   * be the serverID of this server, or the serverID of a remote server.
1441   * @param initTask The task in this server that triggers this initialization
1442   * and that should be updated with its progress. Null when the export is done
1443   * following a request coming from a remote server (task is remote).
1444   * @param initWindow The value of the initialization window for flow control
1445   * between the importer and the exporter.
1446   *
1447   * @exception DirectoryException When an error occurs. No exception raised
1448   * means success.
1449   */
1450  protected void initializeRemote(int serverToInitialize,
1451      int serverRunningTheTask, Task initTask, int initWindow)
1452  throws DirectoryException
1453  {
1454    final ImportExportContext ieCtx = acquireIEContext(false);
1455
1456    /*
1457    We manage the list of servers to initialize in order :
1458    - to test at the end that all expected servers have reconnected
1459    after their import and with the right genId
1460    - to update the task with the server(s) where this test failed
1461    */
1462
1463    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
1464    {
1465      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
1466          countEntries(), getBaseDN(), getServerId());
1467
1468      ieCtx.startList.addAll(getReplicaInfos().keySet());
1469
1470      // We manage the list of servers with which a flow control can be enabled
1471      for (DSInfo dsi : getReplicaInfos().values())
1472      {
1473        if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
1474        {
1475          ieCtx.setAckVal(dsi.getDsId(), 0);
1476        }
1477      }
1478    }
1479    else
1480    {
1481      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START, countEntries(),
1482          getBaseDN(), getServerId(), serverToInitialize);
1483
1484      ieCtx.startList.add(serverToInitialize);
1485
1486      // We manage the list of servers with which a flow control can be enabled
1487      for (DSInfo dsi : getReplicaInfos().values())
1488      {
1489        if (dsi.getDsId() == serverToInitialize &&
1490            dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
1491        {
1492          ieCtx.setAckVal(dsi.getDsId(), 0);
1493        }
1494      }
1495    }
1496
1497    DirectoryException exportRootException = null;
1498
1499    // loop for the case where the exporter is the initiator
1500    int attempt = 0;
1501    boolean done = false;
1502    while (!done && ++attempt < 2) // attempt loop
1503    {
1504      try
1505      {
1506        ieCtx.exportTarget = serverToInitialize;
1507        if (initTask != null)
1508        {
1509          ieCtx.initializeTask = initTask;
1510        }
1511        ieCtx.initializeCounters(countEntries());
1512        ieCtx.msgCnt = 0;
1513        ieCtx.initNumLostConnections = broker.getNumLostConnections();
1514        ieCtx.initWindow = initWindow;
1515
1516        // Send start message to the peer
1517        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
1518            getBaseDN(), getServerId(), serverToInitialize,
1519            serverRunningTheTask, ieCtx.entryCount, initWindow);
1520
1521        broker.publish(initTargetMsg);
1522
1523        // Wait for all servers to be ok
1524        waitForRemoteStartOfInit(ieCtx);
1525
1526        // Servers that left in the list are those for which we could not test
1527        // that they have been successfully initialized.
1528        if (!ieCtx.failureList.isEmpty())
1529        {
1530          throw new DirectoryException(
1531              ResultCode.OTHER,
1532              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDN(), ieCtx.failureList));
1533        }
1534
1535        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
1536
1537        // Notify the peer of the success
1538        broker.publish(
1539            new DoneMsg(getServerId(), initTargetMsg.getDestination()));
1540      }
1541      catch(DirectoryException exportException)
1542      {
1543        // Give priority to the first exception raised - stored in the context
1544        final DirectoryException ieEx = ieCtx.exception;
1545        exportRootException = ieEx != null ? ieEx : exportException;
1546      }
1547
1548      if (logger.isTraceEnabled())
1549      {
1550        logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
1551            + " export ends with connected=" + broker.isConnected()
1552            + " exportRootException=" + exportRootException);
1553      }
1554
1555      if (exportRootException != null)
1556      {
1557        try
1558        {
1559          /*
1560          Handling the errors during export
1561
1562          Note: we could have lost the connection and another thread
1563          the listener one) has already managed to reconnect.
1564          So we MUST rely on the test broker.isConnected()
1565          ONLY to do 'wait to be reconnected by another thread'
1566          (if not yet reconnected already).
1567          */
1568          if (!broker.isConnected())
1569          {
1570            // We are still disconnected, so we wait for the listener thread
1571            // to reconnect - wait 10s
1572            if (logger.isTraceEnabled())
1573            {
1574              logger.trace("[IE] Exporter wait for reconnection by the listener thread");
1575            }
1576            int att=0;
1577            while (!broker.shuttingDown()
1578                && !broker.isConnected()
1579                && ++att < 100)
1580            {
1581              try { Thread.sleep(100); }
1582              catch(Exception e){ /* do nothing */ }
1583            }
1584          }
1585
1586          if (initTask != null
1587              && broker.isConnected()
1588              && serverToInitialize != RoutableMsg.ALL_SERVERS)
1589          {
1590            /*
1591            NewAttempt case : In the case where
1592            - it's not an InitializeAll
1593            - AND the previous export attempt failed
1594            - AND we are (now) connected
1595            - and we own the task and this task is not an InitializeAll
1596            Let's :
1597            - sleep to let time to the other peer to reconnect if needed
1598            - and launch another attempt
1599            */
1600            try { Thread.sleep(1000); }
1601            catch(Exception e){ /* do nothing */ }
1602
1603            logger.info(NOTE_RESENDING_INIT_TARGET, exportRootException.getLocalizedMessage());
1604            continue;
1605          }
1606
1607          broker.publish(new ErrorMsg(
1608              serverToInitialize, exportRootException.getMessageObject()));
1609        }
1610        catch(Exception e)
1611        {
1612          // Ignore the failure raised while proceeding the root failure
1613        }
1614      }
1615
1616      // We are always done for this export ...
1617      // ... except in the NewAttempt case (see above)
1618      done = true;
1619
1620    } // attempt loop
1621
1622    // Wait for all servers to be ok, and build the failure list
1623    waitForRemoteEndOfInit(ieCtx);
1624
1625    // Servers that left in the list are those for which we could not test
1626    // that they have been successfully initialized.
1627    if (!ieCtx.failureList.isEmpty() && exportRootException == null)
1628    {
1629      exportRootException = new DirectoryException(ResultCode.OTHER,
1630              ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(getGenerationID(), ieCtx.failureList));
1631    }
1632
1633    // Don't forget to release IEcontext acquired at beginning.
1634    releaseIEContext(); // FIXME should not this be in a finally?
1635
1636    final String cause = exportRootException == null ? ""
1637        : exportRootException.getLocalizedMessage();
1638    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
1639    {
1640      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL,
1641          getBaseDN(), getServerId(), cause);
1642    }
1643    else
1644    {
1645      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END,
1646          getBaseDN(), getServerId(), serverToInitialize, cause);
1647    }
1648
1649
1650    if (exportRootException != null)
1651    {
1652      throw exportRootException;
1653    }
1654  }
1655
1656  /**
1657   * For all remote servers in the start list:
1658   * - wait it has finished the import and present the expected generationID,
1659   * - build the failureList.
1660   */
1661  private void waitForRemoteStartOfInit(ImportExportContext ieCtx)
1662  {
1663    final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList);
1664
1665    if (logger.isTraceEnabled())
1666    {
1667      logger.trace("[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
1668    }
1669
1670    int waitResultAttempt = 0;
1671    boolean done;
1672    do
1673    {
1674      done = true;
1675      for (DSInfo dsi : getReplicaInfos().values())
1676      {
1677        if (logger.isTraceEnabled())
1678        {
1679          logger.trace(
1680            "[IE] wait for start dsId " + dsi.getDsId()
1681            + " " + dsi.getStatus()
1682            + " " + dsi.getGenerationId()
1683            + " " + getGenerationID());
1684        }
1685        if (ieCtx.startList.contains(dsi.getDsId()))
1686        {
1687          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
1688          {
1689            // this one is still not doing the Full Update ... retry later
1690            done = false;
1691            try { Thread.sleep(100);
1692            }
1693            catch (InterruptedException e) {
1694              Thread.currentThread().interrupt();
1695            }
1696            waitResultAttempt++;
1697            break;
1698          }
1699          else
1700          {
1701            // this one is ok
1702            replicasWeAreWaitingFor.remove(dsi.getDsId());
1703          }
1704        }
1705      }
1706    }
1707    while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());
1708
1709    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
1710
1711    if (logger.isTraceEnabled())
1712    {
1713      logger.trace("[IE] wait for start ends with " + ieCtx.failureList);
1714    }
1715  }
1716
1717  /**
1718   * For all remote servers in the start list:
1719   * - wait it has finished the import and present the expected generationID,
1720   * - build the failureList.
1721   */
1722  private void waitForRemoteEndOfInit(ImportExportContext ieCtx)
1723  {
1724    final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList);
1725
1726    if (logger.isTraceEnabled())
1727    {
1728      logger.trace("[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
1729    }
1730
1731    /*
1732    In case some new servers appear during the init, we want them to be
1733    considered in the processing of sorting the successfully initialized
1734    and the others
1735    */
1736    replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet());
1737
1738    boolean done;
1739    do
1740    {
1741      done = true;
1742      int reconnectMaxDelayInSec = 10;
1743      int reconnectWait = 0;
1744      Iterator<Integer> it = replicasWeAreWaitingFor.iterator();
1745      while (it.hasNext())
1746      {
1747        int serverId = it.next();
1748        if (ieCtx.failureList.contains(serverId))
1749        {
1750          /*
1751          this server has already been in error during initialization
1752          don't wait for it
1753          */
1754          continue;
1755        }
1756
1757        DSInfo dsInfo = getConnectedRemoteDS(serverId);
1758        if (dsInfo == null)
1759        {
1760          /*
1761          this server is disconnected
1762          may be for a long time if it crashed or had been stopped
1763          may be just the time to reconnect after import : should be short
1764          */
1765          if (++reconnectWait<reconnectMaxDelayInSec)
1766          {
1767            // let's still wait to give a chance to this server to reconnect
1768            done = false;
1769          }
1770          // Else we left enough time to the servers to reconnect
1771        }
1772        else
1773        {
1774          // this server is connected
1775          if (dsInfo.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
1776          {
1777            // this one is still doing the Full Update ... retry later
1778            done = false;
1779            break;
1780          }
1781
1782          if (dsInfo.getGenerationId() == getGenerationID())
1783          { // and with the expected generationId
1784            // We're done with this server
1785            it.remove();
1786          }
1787        }
1788      }
1789
1790      // loop and wait
1791      if (!done)
1792      {
1793        try { Thread.sleep(1000); }
1794        catch (InterruptedException e) {
1795          Thread.currentThread().interrupt();
1796        } // 1sec
1797      }
1798    }
1799    while (!done && !broker.shuttingDown()); // infinite wait
1800
1801    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
1802
1803    if (logger.isTraceEnabled())
1804    {
1805      logger.trace("[IE] wait for end ends with " + ieCtx.failureList);
1806    }
1807  }
1808
1809  /**
1810   * Get the ServerState maintained by the Concrete class.
1811   *
1812   * @return the ServerState maintained by the Concrete class.
1813   */
1814  public ServerState getServerState()
1815  {
1816    return state;
1817  }
1818
1819  /**
1820   * Acquire and initialize the import/export context, verifying no other
1821   * import/export is in progress.
1822   */
1823  private ImportExportContext acquireIEContext(boolean importInProgress)
1824      throws DirectoryException
1825  {
1826    final ImportExportContext ieCtx = new ImportExportContext(importInProgress);
1827    if (!importExportContext.compareAndSet(null, ieCtx))
1828    {
1829      // Rejects 2 simultaneous exports
1830      LocalizableMessage message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
1831      throw new DirectoryException(ResultCode.OTHER, message);
1832    }
1833    return ieCtx;
1834  }
1835
1836  private void releaseIEContext()
1837  {
1838    importExportContext.set(null);
1839  }
1840
1841  /**
1842   * Processes an error message received while an export is
1843   * on going, or an import will start.
1844   *
1845   * @param errorMsg The error message received.
1846   */
1847  private void processErrorMsg(ErrorMsg errorMsg, ImportExportContext ieCtx)
1848  {
1849    //Exporting must not be stopped on the first error, if we run initialize-all
1850    if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
1851    {
1852      // The ErrorMsg is received while we have started an initialization
1853      ieCtx.setExceptionIfNoneSet(new DirectoryException(
1854          ResultCode.OTHER, errorMsg.getDetails()));
1855
1856      /*
1857       * This can happen :
1858       * - on the first InitReqMsg sent when source in not known for example
1859       * - on the next attempt when source crashed and did not reconnect
1860       *   even after the nextInitAttemptDelay
1861       * During the import, the ErrorMsg will be received by receiveEntryBytes
1862       */
1863      if (ieCtx.initializeTask instanceof InitializeTask)
1864      {
1865        // Update the task that initiated the import
1866        ((InitializeTask) ieCtx.initializeTask)
1867            .updateTaskCompletionState(ieCtx.getException());
1868
1869        releaseIEContext();
1870      }
1871    }
1872  }
1873
1874  /**
1875   * Receives bytes related to an entry in the context of an import to
1876   * initialize the domain (called by ReplLDIFInputStream).
1877   *
1878   * @return The bytes. Null when the Done or Err message has been received
1879   */
1880  protected byte[] receiveEntryBytes()
1881  {
1882    ReplicationMsg msg;
1883    while (true)
1884    {
1885      ImportExportContext ieCtx = importExportContext.get();
1886      try
1887      {
1888        // In the context of the total update, we don't want any automatic
1889        // re-connection done transparently by the broker because of a better
1890        // RS or because of a connection failure.
1891        // We want to be notified of topology change in order to track a
1892        // potential disconnection of the exporter.
1893        msg = broker.receive(false, false, true);
1894
1895        if (logger.isTraceEnabled())
1896        {
1897          logger.trace("[IE] In "
1898              + broker.getReplicationMonitorInstanceName()
1899              + ", receiveEntryBytes " + msg);
1900        }
1901
1902        if (msg == null)
1903        {
1904          if (broker.shuttingDown())
1905          {
1906            // The server is in the shutdown process
1907            return null;
1908          }
1909          else
1910          {
1911            // Handle connection issues
1912            ieCtx.setExceptionIfNoneSet(new DirectoryException(
1913                ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT
1914                    .get(broker.getReplicationServer())));
1915            return null;
1916          }
1917        }
1918
1919        // Check good ordering of msg received
1920        if (msg instanceof EntryMsg)
1921        {
1922          EntryMsg entryMsg = (EntryMsg)msg;
1923          byte[] entryBytes = entryMsg.getEntryBytes();
1924          ieCtx.updateCounters(countEntryLimits(entryBytes));
1925
1926          if (ieCtx.exporterProtocolVersion >=
1927            ProtocolVersion.REPLICATION_PROTOCOL_V4)
1928          {
1929            // check the msgCnt of the msg received to check ordering
1930            if (++ieCtx.msgCnt != entryMsg.getMsgId())
1931            {
1932              ieCtx.setExceptionIfNoneSet(new DirectoryException(
1933                  ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(ieCtx.msgCnt, entryMsg.getMsgId())));
1934              return null;
1935            }
1936
1937            // send the ack of flow control mgmt
1938            if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0)
1939            {
1940              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
1941                  getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt);
1942              broker.publish(amsg, false);
1943              if (logger.isTraceEnabled())
1944              {
1945                logger.trace("[IE] In "
1946                    + broker.getReplicationMonitorInstanceName()
1947                    + ", publish InitializeRcvAckMsg" + amsg);
1948              }
1949            }
1950          }
1951          return entryBytes;
1952        }
1953        else if (msg instanceof DoneMsg)
1954        {
1955          /*
1956          This is the normal termination of the import
1957          No error is stored and the import is ended by returning null
1958          */
1959          return null;
1960        }
1961        else if (msg instanceof ErrorMsg)
1962        {
1963          /*
1964          This is an error termination during the import
1965          The error is stored and the import is ended by returning null
1966          */
1967          if (ieCtx.getException() == null)
1968          {
1969            ErrorMsg errMsg = (ErrorMsg)msg;
1970            if (errMsg.getCreationTime() > ieCtx.startTime)
1971            {
1972              ieCtx.setException(
1973                  new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
1974              return null;
1975            }
1976          }
1977        }
1978        else
1979        {
1980          // Other messages received during an import are trashed except
1981          // the topologyMsg.
1982          if (msg instanceof TopologyMsg
1983              && getConnectedRemoteDS(ieCtx.importSource) == null)
1984          {
1985            LocalizableMessage errMsg = ERR_INIT_EXPORTER_DISCONNECTION.get(
1986                getBaseDN(), getServerId(), ieCtx.importSource);
1987            ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER, errMsg));
1988            return null;
1989          }
1990        }
1991      }
1992      catch(Exception e)
1993      {
1994        ieCtx.setExceptionIfNoneSet(new DirectoryException(
1995            ResultCode.OTHER,
1996            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
1997      }
1998    }
1999  }
2000
2001  /**
2002   * Count the number of entries in the provided byte[].
2003   * This is based on the hypothesis that the entries are separated
2004   * by a "\n\n" String.
2005   *
2006   * @param   entryBytes the set of bytes containing one or more entries.
2007   * @return  The number of entries in the provided byte[].
2008   */
2009  private int countEntryLimits(byte[] entryBytes)
2010  {
2011    return countEntryLimits(entryBytes, 0, entryBytes.length);
2012  }
2013
2014  /**
2015   * Count the number of entries in the provided byte[].
2016   * This is based on the hypothesis that the entries are separated
2017   * by a "\n\n" String.
2018   *
2019   * @param   entryBytes the set of bytes containing one or more entries.
2020   * @return  The number of entries in the provided byte[].
2021   */
2022  private int countEntryLimits(byte[] entryBytes, int pos, int length)
2023  {
2024    int entryCount = 0;
2025    int count = 0;
2026    while (count<=length-2)
2027    {
2028      if (entryBytes[pos+count] == '\n' && entryBytes[pos+count+1] == '\n')
2029      {
2030        entryCount++;
2031        count++;
2032      }
2033      count++;
2034    }
2035    return entryCount;
2036  }
2037
2038  /**
2039   * Exports an entry in LDIF format.
2040   *
2041   * @param lDIFEntry The entry to be exported in byte[] form.
2042   * @param pos       The starting Position in the array.
2043   * @param length    Number of array elements to be copied.
2044   *
2045   * @throws IOException when an error occurred.
2046   */
2047  void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
2048      throws IOException
2049  {
2050    if (logger.isTraceEnabled())
2051    {
2052      logger.trace("[IE] Entering exportLDIFEntry entry=" + Arrays.toString(lDIFEntry));
2053    }
2054
2055    // build the message
2056    ImportExportContext ieCtx = importExportContext.get();
2057    EntryMsg entryMessage = new EntryMsg(
2058        getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
2059        ++ieCtx.msgCnt);
2060
2061    // Waiting the slowest loop
2062    while (!broker.shuttingDown())
2063    {
2064      /*
2065      If an error was raised - like receiving an ErrorMsg from a remote
2066      server that have been stored by the listener thread in the ieContext,
2067      we just abandon the export by throwing an exception.
2068      */
2069      if (ieCtx.getException() != null)
2070      {
2071        throw new IOException(ieCtx.getException().getMessage());
2072      }
2073
2074      int slowestServerId = ieCtx.getSlowestServer();
2075      if (getConnectedRemoteDS(slowestServerId) == null)
2076      {
2077        ieCtx.setException(new DirectoryException(ResultCode.OTHER,
2078            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(ieCtx.getSlowestServer())));
2079
2080        throw new IOException("IOException with nested DirectoryException",
2081            ieCtx.getException());
2082      }
2083
2084      int ourLastExportedCnt = ieCtx.msgCnt;
2085      int slowestCnt = ieCtx.ackVals.get(slowestServerId);
2086
2087      if (logger.isTraceEnabled())
2088      {
2089        logger.trace("[IE] Entering exportLDIFEntry waiting " +
2090            " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
2091      }
2092
2093      if (ourLastExportedCnt - slowestCnt > ieCtx.initWindow)
2094      {
2095        if (logger.isTraceEnabled())
2096        {
2097          logger.trace("[IE] Entering exportLDIFEntry waiting");
2098        }
2099
2100        // our export is too far beyond the slowest importer - let's wait
2101        try { Thread.sleep(100); }
2102        catch(Exception e) { /* do nothing */ }
2103
2104        // process any connection error
2105        if (broker.hasConnectionError()
2106          || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
2107        {
2108          // publish failed - store the error in the ieContext ...
2109          DirectoryException de = new DirectoryException(ResultCode.OTHER,
2110              ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(broker.getRsServerId()));
2111          ieCtx.setExceptionIfNoneSet(de);
2112          // .. and abandon the export by throwing an exception.
2113          throw new IOException(de.getMessage());
2114        }
2115      }
2116      else
2117      {
2118        if (logger.isTraceEnabled())
2119        {
2120          logger.trace("[IE] slowest got to us => stop waiting");
2121        }
2122        break;
2123      }
2124    } // Waiting the slowest loop
2125
2126    if (logger.isTraceEnabled())
2127    {
2128      logger.trace("[IE] Entering exportLDIFEntry pub entry=" + Arrays.toString(lDIFEntry));
2129    }
2130
2131    boolean sent = broker.publish(entryMessage, false);
2132
2133    // process any publish error
2134    if (!sent
2135        || broker.hasConnectionError()
2136        || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
2137    {
2138      // publish failed - store the error in the ieContext ...
2139      DirectoryException de = new DirectoryException(ResultCode.OTHER,
2140          ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(broker.getRsServerId()));
2141      ieCtx.setExceptionIfNoneSet(de);
2142      // .. and abandon the export by throwing an exception.
2143      throw new IOException(de.getMessage());
2144    }
2145
2146    // publish succeeded
2147    try
2148    {
2149      ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length));
2150    }
2151    catch (DirectoryException de)
2152    {
2153      ieCtx.setExceptionIfNoneSet(de);
2154      // .. and abandon the export by throwing an exception.
2155      throw new IOException(de.getMessage());
2156    }
2157  }
2158
2159  /**
2160   * Initializes asynchronously this domain from a remote source server.
2161   * Before returning from this call, for the provided task :
2162   * - the progressing counters are updated during the initialization using
2163   *   setTotal() and setLeft().
2164   * - the end of the initialization using updateTaskCompletionState().
2165   * <p>
2166   * When this method is called, a request for initialization is sent to the
2167   * remote source server requesting initialization.
2168   * <p>
2169   *
2170   * @param source   The server-id of the source from which to initialize.
2171   *                 The source can be discovered using the
2172   *                 {@link #getReplicaInfos()} method.
2173   *
2174   * @param initTask The task that launched the initialization
2175   *                 and should be updated of its progress.
2176   *
2177   * @throws DirectoryException If it was not possible to publish the
2178   *                            Initialization message to the Topology.
2179   *                            The task state is updated.
2180   */
2181  public void initializeFromRemote(int source, Task initTask)
2182  throws DirectoryException
2183  {
2184    if (logger.isTraceEnabled())
2185    {
2186      logger.trace("[IE] Entering initializeFromRemote for " + this);
2187    }
2188
2189    LocalizableMessage errMsg = !broker.isConnected()
2190        ? ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDN())
2191        : null;
2192
2193    /*
2194    We must not test here whether the remote source is connected to
2195    the topology by testing if it stands in the replicas list since.
2196    In the case of a re-attempt of initialization, the listener thread is
2197    running this method directly coming from initialize() method and did
2198    not processed any topology message in between the failure and the
2199    new attempt.
2200    */
2201    try
2202    {
2203      /*
2204      We must immediately acquire a context to store the task inside
2205      The context will be used when we (the listener thread) will receive
2206      the InitializeTargetMsg, process the import, and at the end
2207      update the task.
2208      */
2209
2210      final ImportExportContext ieCtx = acquireIEContext(true);
2211      ieCtx.initializeTask = initTask;
2212      ieCtx.attemptCnt = 0;
2213      ieCtx.initReqMsgSent = new InitializeRequestMsg(
2214          getBaseDN(), getServerId(), source, getInitWindow());
2215      broker.publish(ieCtx.initReqMsgSent);
2216
2217      /*
2218      The normal success processing is now to receive InitTargetMsg then
2219      entries from the remote server.
2220      The error cases are :
2221      - either local error immediately caught below
2222      - a remote error we will receive as an ErrorMsg
2223      */
2224    }
2225    catch(DirectoryException de)
2226    {
2227      errMsg = de.getMessageObject();
2228    }
2229    catch(Exception e)
2230    {
2231      // Should not happen
2232      errMsg = LocalizableMessage.raw(e.getLocalizedMessage());
2233      logger.error(errMsg);
2234    }
2235
2236    // When error, update the task and raise the error to the caller
2237    if (errMsg != null)
2238    {
2239      // No need to call here updateTaskCompletionState - will be done
2240      // by the caller
2241      releaseIEContext();
2242      throw new DirectoryException(ResultCode.OTHER, errMsg);
2243    }
2244  }
2245
2246  /**
2247   * Processes an InitializeTargetMsg received from a remote server
2248   * meaning processes an initialization from the entries expected to be
2249   * received now.
2250   *
2251   * @param initTargetMsgReceived The message received from the remote server.
2252   *
2253   * @param requesterServerId The serverId of the server that requested the
2254   *                          initialization meaning the server where the
2255   *                          task has initially been created (this server,
2256   *                          or the remote server).
2257   */
2258  private void initialize(InitializeTargetMsg initTargetMsgReceived, int requesterServerId)
2259  {
2260    if (logger.isTraceEnabled())
2261    {
2262      logger.trace("[IE] Entering initialize - domain=" + this);
2263    }
2264
2265    InitializeTask initFromTask = null;
2266    int source = initTargetMsgReceived.getSenderID();
2267    ImportExportContext ieCtx = importExportContext.get();
2268    try
2269    {
2270      // Log starting
2271      logger.info(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START, getBaseDN(),
2272          initTargetMsgReceived.getSenderID(), getServerId());
2273
2274      // Go into full update status
2275      setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
2276
2277      // Acquire an import context if no already done (and initialize).
2278      if (initTargetMsgReceived.getInitiatorID() != getServerId())
2279      {
2280        /*
2281        The initTargetMsgReceived is for an import initiated by the remote server.
2282        Test and set if no import already in progress
2283        */
2284        ieCtx = acquireIEContext(true);
2285      }
2286
2287      // Initialize stuff
2288      ieCtx.importSource = source;
2289      ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
2290      ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
2291      ieCtx.exporterProtocolVersion = getProtocolVersion(source);
2292      initFromTask = (InitializeTask) ieCtx.initializeTask;
2293
2294      // Launch the import
2295      importBackend(new ReplInputStream(this));
2296    }
2297    catch (DirectoryException e)
2298    {
2299      /*
2300      Store the exception raised. It will be considered if no other exception
2301      has been previously stored in  the context
2302      */
2303      ieCtx.setExceptionIfNoneSet(e);
2304    }
2305    finally
2306    {
2307      if (logger.isTraceEnabled())
2308      {
2309        logger.trace("[IE] Domain=" + this
2310          + " ends import with exception=" + ieCtx.getException()
2311          + " connected=" + broker.isConnected());
2312      }
2313
2314      /*
2315      It is necessary to restart (reconnect to RS) for different reasons
2316      - when everything went well, reconnect in order to exchange
2317      new state, new generation ID
2318      - when we have connection failure, reconnect to retry a new import
2319      right here, right now
2320      we never want retryOnFailure if we fails reconnecting in the restart.
2321      */
2322      broker.reStart(false);
2323
2324      if (ieCtx.getException() != null
2325          && broker.isConnected()
2326          && initFromTask != null
2327          && ++ieCtx.attemptCnt < 2)
2328      {
2329          /*
2330          Worth a new attempt
2331          since initFromTask is in this server, connection is ok
2332          */
2333          try
2334          {
2335            /*
2336            Wait for the exporter to stabilize - eventually reconnect as
2337            well if it was connected to the same RS than the one we lost ...
2338            */
2339            Thread.sleep(1000);
2340
2341            /*
2342            Restart the whole import protocol exchange by sending again
2343            the request
2344            */
2345            logger.info(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST,
2346                ieCtx.getException().getLocalizedMessage());
2347
2348            broker.publish(ieCtx.initReqMsgSent);
2349
2350            ieCtx.initializeCounters(0);
2351            ieCtx.exception = null;
2352            ieCtx.msgCnt = 0;
2353
2354            // Processing of the received initTargetMsgReceived is done
2355            // let's wait for the next one
2356            return;
2357          }
2358          catch(Exception e)
2359          {
2360            /*
2361            An error occurs when sending a new request for a new import.
2362            This error is not stored, preferring to keep the initial one.
2363            */
2364            logger.error(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST,
2365                e.getLocalizedMessage(), ieCtx.getException().getLocalizedMessage());
2366          }
2367      }
2368
2369      // ===================
2370      // No new attempt case
2371
2372      if (logger.isTraceEnabled())
2373      {
2374        logger.trace("[IE] Domain=" + this
2375          + " ends initialization with exception=" + ieCtx.getException()
2376          + " connected=" + broker.isConnected()
2377          + " task=" + initFromTask
2378          + " attempt=" + ieCtx.attemptCnt);
2379      }
2380
2381      try
2382      {
2383        if (broker.isConnected() && ieCtx.getException() != null)
2384        {
2385          // Let's notify the exporter
2386          ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
2387              ieCtx.getException().getMessageObject());
2388          broker.publish(errorMsg);
2389        }
2390        /*
2391        Update the task that initiated the import must be the last thing.
2392        Particularly, broker.restart() after import success must be done
2393        before some other operations/tasks to be launched,
2394        like resetting the generation ID.
2395        */
2396        if (initFromTask != null)
2397        {
2398          initFromTask.updateTaskCompletionState(ieCtx.getException());
2399        }
2400      }
2401      finally
2402      {
2403        String errorMsg = ieCtx.getException() != null ? ieCtx.getException().getLocalizedMessage() : "";
2404        logger.info(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END,
2405            getBaseDN(), initTargetMsgReceived.getSenderID(), getServerId(), errorMsg);
2406        releaseIEContext();
2407      } // finally
2408    } // finally
2409  }
2410
2411  /**
2412   * Return the protocol version of the DS related to the provided serverId.
2413   * Returns -1 when the protocol version is not known.
2414   * @param dsServerId The provided serverId.
2415   * @return The protocol version.
2416   */
2417  private short getProtocolVersion(int dsServerId)
2418  {
2419    final DSInfo dsInfo = getReplicaInfos().get(dsServerId);
2420    if (dsInfo != null)
2421    {
2422      return dsInfo.getProtocolVersion();
2423    }
2424    return -1;
2425  }
2426
2427  /**
2428   * Sets the status to a new value depending of the passed status machine
2429   * event.
2430   * @param event The event that may make the status be changed
2431   */
2432  protected void signalNewStatus(StatusMachineEvent event)
2433  {
2434    setNewStatus(event);
2435    broker.signalStatusChange(status);
2436  }
2437
2438  private void setNewStatus(StatusMachineEvent event)
2439  {
2440    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
2441    if (newStatus == ServerStatus.INVALID_STATUS)
2442    {
2443      logger.error(ERR_DS_CANNOT_CHANGE_STATUS, getBaseDN(), getServerId(), status, event);
2444      return;
2445    }
2446
2447    if (newStatus != status)
2448    {
2449      // Reset status date
2450      lastStatusChangeDate = new Date();
2451      // Reset monitoring counters if reconnection
2452      if (newStatus == ServerStatus.NOT_CONNECTED_STATUS)
2453      {
2454        resetMonitoringCounters();
2455      }
2456
2457      status = newStatus;
2458      if (logger.isTraceEnabled())
2459      {
2460        logger.trace("Replication domain " + getBaseDN()
2461            + " new status is: " + status);
2462      }
2463
2464      // Perform whatever actions are needed to apply properties for being
2465      // compliant with new status
2466      updateDomainForNewStatus();
2467    }
2468  }
2469
2470  /**
2471   * Returns a boolean indicating if an import or export is currently
2472   * processed.
2473   *
2474   * @return The status
2475   */
2476  public boolean ieRunning()
2477  {
2478    return importExportContext.get() != null;
2479  }
2480
2481  /**
2482   * Check the value of the Replication Servers generation ID.
2483   *
2484   * @param generationID        The expected value of the generation ID.
2485   *
2486   * @throws DirectoryException When the generation ID of the Replication
2487   *                            Servers is not the expected value.
2488   */
2489  private void checkGenerationID(long generationID) throws DirectoryException
2490  {
2491    boolean allSet = true;
2492
2493    for (int i = 0; i< 50; i++)
2494    {
2495      allSet = true;
2496      for (RSInfo rsInfo : getRsInfos())
2497      {
2498        // the 'empty' RSes (generationId==-1) are considered as good citizens
2499        if (rsInfo.getGenerationId() != -1 &&
2500            rsInfo.getGenerationId() != generationID)
2501        {
2502          try
2503          {
2504            Thread.sleep(i*100);
2505          } catch (InterruptedException e)
2506          {
2507            Thread.currentThread().interrupt();
2508          }
2509          allSet = false;
2510          break;
2511        }
2512      }
2513      if (allSet)
2514      {
2515        break;
2516      }
2517    }
2518    if (!allSet)
2519    {
2520      LocalizableMessage message = ERR_RESET_GENERATION_ID_FAILED.get(getBaseDN());
2521      throw new DirectoryException(ResultCode.OTHER, message);
2522    }
2523  }
2524
2525  /**
2526   * Reset the Replication Log.
2527   * Calling this method will remove all the Replication information that
2528   * was kept on all the Replication Servers currently connected in the
2529   * topology.
2530   *
2531   * @throws DirectoryException If this ReplicationDomain is not currently
2532   *                           connected to a Replication Server or it
2533   *                           was not possible to contact it.
2534   */
2535  void resetReplicationLog() throws DirectoryException
2536  {
2537    // Reset the Generation ID to -1 to clean the ReplicationServers.
2538    resetGenerationId(-1L);
2539
2540    // check that at least one ReplicationServer did change its generation-id
2541    checkGenerationID(-1);
2542
2543    // Reconnect to the Replication Server so that it adopts our GenerationID.
2544    restartService();
2545
2546    // wait for the domain to reconnect.
2547    int count = 0;
2548    while (!isConnected() && count < 10)
2549    {
2550      try
2551      {
2552        Thread.sleep(100);
2553      } catch (InterruptedException e)
2554      {
2555        Thread.currentThread().interrupt();
2556      }
2557    }
2558
2559    resetGenerationId(getGenerationID());
2560
2561    // check that at least one ReplicationServer did change its generation-id
2562    checkGenerationID(getGenerationID());
2563  }
2564
2565  /**
2566   * Reset the generationId of this domain in the whole topology.
2567   * A message is sent to the Replication Servers for them to reset
2568   * their change dbs.
2569   *
2570   * @param generationIdNewValue  The new value of the generation Id.
2571   * @throws DirectoryException   When an error occurs
2572   */
2573  public void resetGenerationId(Long generationIdNewValue)
2574      throws DirectoryException
2575  {
2576    if (logger.isTraceEnabled())
2577    {
2578      logger.trace("Server id " + getServerId() + " and domain "
2579          + getBaseDN() + " resetGenerationId " + generationIdNewValue);
2580    }
2581
2582    ResetGenerationIdMsg genIdMessage =
2583        new ResetGenerationIdMsg(getGenId(generationIdNewValue));
2584
2585    if (!isConnected())
2586    {
2587      LocalizableMessage message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDN(),
2588          getServerId(), genIdMessage.getGenerationId());
2589      throw new DirectoryException(ResultCode.OTHER, message);
2590    }
2591    broker.publish(genIdMessage);
2592
2593    // check that at least one ReplicationServer did change its generation-id
2594    checkGenerationID(getGenId(generationIdNewValue));
2595  }
2596
2597  private long getGenId(Long generationIdNewValue)
2598  {
2599    if (generationIdNewValue != null)
2600    {
2601      return generationIdNewValue;
2602    }
2603    return getGenerationID();
2604  }
2605
2606
2607  /*
2608   ******** End of The total Update code *********
2609   */
2610
2611  /*
2612   ******* Start of Monitoring Code **********
2613   */
2614
2615  /**
2616   * Get the maximum receive window size.
2617   *
2618   * @return The maximum receive window size.
2619   */
2620  int getMaxRcvWindow()
2621  {
2622    if (broker != null)
2623    {
2624      return broker.getMaxRcvWindow();
2625    }
2626    return 0;
2627  }
2628
2629  /**
2630   * Get the current receive window size.
2631   *
2632   * @return The current receive window size.
2633   */
2634  int getCurrentRcvWindow()
2635  {
2636    if (broker != null)
2637    {
2638      return broker.getCurrentRcvWindow();
2639    }
2640    return 0;
2641  }
2642
2643  /**
2644   * Get the maximum send window size.
2645   *
2646   * @return The maximum send window size.
2647   */
2648  int getMaxSendWindow()
2649  {
2650    if (broker != null)
2651    {
2652      return broker.getMaxSendWindow();
2653    }
2654    return 0;
2655  }
2656
2657  /**
2658   * Get the current send window size.
2659   *
2660   * @return The current send window size.
2661   */
2662  int getCurrentSendWindow()
2663  {
2664    if (broker != null)
2665    {
2666      return broker.getCurrentSendWindow();
2667    }
2668    return 0;
2669  }
2670
2671  /**
2672   * Get the number of times the replication connection was lost.
2673   * @return The number of times the replication connection was lost.
2674   */
2675  int getNumLostConnections()
2676  {
2677    if (broker != null)
2678    {
2679      return broker.getNumLostConnections();
2680    }
2681    return 0;
2682  }
2683
2684  /**
2685   * Determine whether the connection to the replication server is encrypted.
2686   * @return true if the connection is encrypted, false otherwise.
2687   */
2688  boolean isSessionEncrypted()
2689  {
2690    return broker != null && broker.isSessionEncrypted();
2691  }
2692
2693  /**
2694   * Check if the domain is connected to a ReplicationServer.
2695   *
2696   * @return true if the server is connected, false if not.
2697   */
2698  public boolean isConnected()
2699  {
2700    return broker != null && broker.isConnected();
2701  }
2702
2703  /**
2704   * Check if the domain has a connection error.
2705   * A Connection error happens when the broker could not be created
2706   * or when the broker could not find any ReplicationServer to connect to.
2707   *
2708   * @return true if the domain has a connection error.
2709   */
2710  public boolean hasConnectionError()
2711  {
2712    return broker == null || broker.hasConnectionError();
2713  }
2714
2715  /**
2716   * Get the name of the replicationServer to which this domain is currently
2717   * connected.
2718   *
2719   * @return the name of the replicationServer to which this domain
2720   *         is currently connected.
2721   */
2722  public String getReplicationServer()
2723  {
2724    if (broker != null)
2725    {
2726      return broker.getReplicationServer();
2727    }
2728    return ReplicationBroker.NO_CONNECTED_SERVER;
2729  }
2730
2731  /**
2732   * Gets the number of updates sent in assured safe read mode.
2733   * @return The number of updates sent in assured safe read mode.
2734   */
2735  public int getAssuredSrSentUpdates()
2736  {
2737    return assuredSrSentUpdates.get();
2738  }
2739
2740  /**
2741   * Gets the number of updates sent in assured safe read mode that have been
2742   * acknowledged without errors.
2743   * @return The number of updates sent in assured safe read mode that have been
2744   * acknowledged without errors.
2745   */
2746  public int getAssuredSrAcknowledgedUpdates()
2747  {
2748    return assuredSrAcknowledgedUpdates.get();
2749  }
2750
2751  /**
2752   * Gets the number of updates sent in assured safe read mode that have not
2753   * been acknowledged.
2754   * @return The number of updates sent in assured safe read mode that have not
2755   * been acknowledged.
2756   */
2757  public int getAssuredSrNotAcknowledgedUpdates()
2758  {
2759    return assuredSrNotAcknowledgedUpdates.get();
2760  }
2761
2762  /**
2763   * Gets the number of updates sent in assured safe read mode that have not
2764   * been acknowledged due to timeout error.
2765   * @return The number of updates sent in assured safe read mode that have not
2766   * been acknowledged due to timeout error.
2767   */
2768  public int getAssuredSrTimeoutUpdates()
2769  {
2770    return assuredSrTimeoutUpdates.get();
2771  }
2772
2773  /**
2774   * Gets the number of updates sent in assured safe read mode that have not
2775   * been acknowledged due to wrong status error.
2776   * @return The number of updates sent in assured safe read mode that have not
2777   * been acknowledged due to wrong status error.
2778   */
2779  public int getAssuredSrWrongStatusUpdates()
2780  {
2781    return assuredSrWrongStatusUpdates.get();
2782  }
2783
2784  /**
2785   * Gets the number of updates sent in assured safe read mode that have not
2786   * been acknowledged due to replay error.
2787   * @return The number of updates sent in assured safe read mode that have not
2788   * been acknowledged due to replay error.
2789   */
2790  public int getAssuredSrReplayErrorUpdates()
2791  {
2792    return assuredSrReplayErrorUpdates.get();
2793  }
2794
2795  /**
2796   * Gets the number of updates sent in assured safe read mode that have not
2797   * been acknowledged per server.
2798   * @return A copy of the map that contains the number of updates sent in
2799   * assured safe read mode that have not been acknowledged per server.
2800   */
2801  public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates()
2802  {
2803    synchronized(assuredSrServerNotAcknowledgedUpdates)
2804    {
2805      return new HashMap<>(assuredSrServerNotAcknowledgedUpdates);
2806    }
2807  }
2808
2809  /**
2810   * Gets the number of updates received in assured safe read mode request.
2811   * @return The number of updates received in assured safe read mode request.
2812   */
2813  public int getAssuredSrReceivedUpdates()
2814  {
2815    return assuredSrReceivedUpdates.get();
2816  }
2817
2818  /**
2819   * Gets the number of updates received in assured safe read mode that we acked
2820   * without error (no replay error).
2821   * @return The number of updates received in assured safe read mode that we
2822   * acked without error (no replay error).
2823   */
2824  public int getAssuredSrReceivedUpdatesAcked()
2825  {
2826    return this.assuredSrReceivedUpdatesAcked.get();
2827  }
2828
2829  /**
2830   * Gets the number of updates received in assured safe read mode that we did
2831   * not ack due to error (replay error).
2832   * @return The number of updates received in assured safe read mode that we
2833   * did not ack due to error (replay error).
2834   */
2835  public int getAssuredSrReceivedUpdatesNotAcked()
2836  {
2837    return this.assuredSrReceivedUpdatesNotAcked.get();
2838  }
2839
2840  /**
2841   * Gets the number of updates sent in assured safe data mode.
2842   * @return The number of updates sent in assured safe data mode.
2843   */
2844  public int getAssuredSdSentUpdates()
2845  {
2846    return assuredSdSentUpdates.get();
2847  }
2848
2849  /**
2850   * Gets the number of updates sent in assured safe data mode that have been
2851   * acknowledged without errors.
2852   * @return The number of updates sent in assured safe data mode that have been
2853   * acknowledged without errors.
2854   */
2855  public int getAssuredSdAcknowledgedUpdates()
2856  {
2857    return assuredSdAcknowledgedUpdates.get();
2858  }
2859
2860  /**
2861   * Gets the number of updates sent in assured safe data mode that have not
2862   * been acknowledged due to timeout error.
2863   * @return The number of updates sent in assured safe data mode that have not
2864   * been acknowledged due to timeout error.
2865   */
2866  public int getAssuredSdTimeoutUpdates()
2867  {
2868    return assuredSdTimeoutUpdates.get();
2869  }
2870
2871  /**
2872   * Gets the number of updates sent in assured safe data mode that have not
2873   * been acknowledged due to timeout error per server.
2874   * @return A copy of the map that contains the number of updates sent in
2875   * assured safe data mode that have not been acknowledged due to timeout
2876   * error per server.
2877   */
2878  public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates()
2879  {
2880    synchronized(assuredSdServerTimeoutUpdates)
2881    {
2882      return new HashMap<>(assuredSdServerTimeoutUpdates);
2883    }
2884  }
2885
2886  /**
2887   * Gets the date of the last status change.
2888   * @return The date of the last status change.
2889   */
2890  public Date getLastStatusChangeDate()
2891  {
2892    return lastStatusChangeDate;
2893  }
2894
2895  /**
2896   * Resets the values of the monitoring counters.
2897   */
2898  private void resetMonitoringCounters()
2899  {
2900    numProcessedUpdates = new AtomicInteger(0);
2901    numRcvdUpdates = new AtomicInteger(0);
2902    numSentUpdates = new AtomicInteger(0);
2903
2904    assuredSrSentUpdates = new AtomicInteger(0);
2905    assuredSrAcknowledgedUpdates = new AtomicInteger(0);
2906    assuredSrNotAcknowledgedUpdates = new AtomicInteger(0);
2907    assuredSrTimeoutUpdates = new AtomicInteger(0);
2908    assuredSrWrongStatusUpdates = new AtomicInteger(0);
2909    assuredSrReplayErrorUpdates = new AtomicInteger(0);
2910    synchronized (assuredSrServerNotAcknowledgedUpdates)
2911    {
2912      assuredSrServerNotAcknowledgedUpdates.clear();
2913    }
2914    assuredSrReceivedUpdates = new AtomicInteger(0);
2915    assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
2916    assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
2917    assuredSdSentUpdates = new AtomicInteger(0);
2918    assuredSdAcknowledgedUpdates = new AtomicInteger(0);
2919    assuredSdTimeoutUpdates = new AtomicInteger(0);
2920    synchronized (assuredSdServerTimeoutUpdates)
2921    {
2922      assuredSdServerTimeoutUpdates.clear();
2923    }
2924  }
2925
2926  /*
2927   ********** End of Monitoring Code **************
2928   */
2929
2930  /**
2931   * Start the publish mechanism of the Replication Service. After this method
2932   * has been called, the publish service can be used by calling the
2933   * {@link #publish(UpdateMsg)} method.
2934   *
2935   * @throws ConfigException
2936   *           If the DirectoryServer configuration was incorrect.
2937   */
2938  public void startPublishService() throws ConfigException
2939  {
2940    synchronized (sessionLock)
2941    {
2942      if (broker == null)
2943      {
2944        // create the broker object used to publish and receive changes
2945        broker = new ReplicationBroker(
2946            this, state, config, new ReplSessionSecurity());
2947        broker.start();
2948      }
2949    }
2950  }
2951
2952  /**
2953   * Starts the receiver side of the Replication Service.
2954   * <p>
2955   * After this method has been called, the Replication Service will start
2956   * calling the {@link #processUpdate(UpdateMsg)}.
2957   * <p>
2958   * This method must be called once and must be called after the
2959   * {@link #startPublishService()}.
2960   */
2961  public void startListenService()
2962  {
2963    synchronized (sessionLock)
2964    {
2965      if (listenerThread != null)
2966      {
2967        return;
2968      }
2969
2970      final String threadName = "Replica DS(" + getServerId() + ") listener for domain \"" + getBaseDN() + "\"";
2971
2972      listenerThread = new DirectoryThread(new Runnable()
2973      {
2974        @Override
2975        public void run()
2976        {
2977          if (logger.isTraceEnabled())
2978          {
2979            logger.trace("Replication Listener thread starting.");
2980          }
2981
2982          // Loop processing any incoming update messages.
2983          while (!listenerThread.isShutdownInitiated())
2984          {
2985            final UpdateMsg updateMsg = receive();
2986            if (updateMsg == null)
2987            {
2988              // The server is shutting down.
2989              listenerThread.initiateShutdown();
2990            }
2991            else if (processUpdate(updateMsg)
2992                && updateMsg.contributesToDomainState())
2993            {
2994              /*
2995               * Warning: in synchronous mode, no way to tell the replay of an
2996               * update went wrong Just put null in processUpdateDone so that if
2997               * assured replication is used the ack is sent without error at
2998               * replay flag.
2999               */
3000              processUpdateDone(updateMsg, null);
3001              state.update(updateMsg.getCSN());
3002            }
3003          }
3004
3005          if (logger.isTraceEnabled())
3006          {
3007            logger.trace("Replication Listener thread stopping.");
3008          }
3009        }
3010      }, threadName);
3011
3012      listenerThread.start();
3013    }
3014  }
3015
3016  /**
3017   * Temporarily disable the Replication Service.
3018   * The Replication Service can be enabled again using
3019   * {@link #enableService()}.
3020   * <p>
3021   * It can be useful to disable the Replication Service when the
3022   * repository where the replicated information is stored becomes
3023   * temporarily unavailable and replicated updates can therefore not
3024   * be replayed during a while. This method is not MT safe.
3025   */
3026  public void disableService()
3027  {
3028    synchronized (sessionLock)
3029    {
3030      /*
3031      Stop the broker first in order to prevent the listener from
3032      reconnecting - see OPENDJ-457.
3033      */
3034      if (broker != null)
3035      {
3036        broker.stop();
3037      }
3038
3039      // Stop the listener thread
3040      if (listenerThread != null)
3041      {
3042        listenerThread.initiateShutdown();
3043        try
3044        {
3045          listenerThread.join();
3046        }
3047        catch (InterruptedException e)
3048        {
3049          // Give up waiting.
3050        }
3051        listenerThread = null;
3052      }
3053    }
3054  }
3055
3056  /**
3057   * Returns {@code true} if the listener thread is shutting down or has
3058   * shutdown.
3059   *
3060   * @return {@code true} if the listener thread is shutting down or has
3061   *         shutdown.
3062   */
3063  protected final boolean isListenerShuttingDown()
3064  {
3065    final DirectoryThread tmp = listenerThread;
3066    return tmp == null || tmp.isShutdownInitiated();
3067  }
3068
3069  /**
3070   * Restart the Replication service after a {@link #disableService()}.
3071   * <p>
3072   * The Replication Service will restart from the point indicated by the
3073   * {@link ServerState} that was given as a parameter to the
3074   * {@link #startPublishService()} at startup time.
3075   * <p>
3076   * If some data have changed in the repository during the period of time when
3077   * the Replication Service was disabled, this {@link ServerState} should
3078   * therefore be updated by the Replication Domain subclass before calling this
3079   * method. This method is not MT safe.
3080   */
3081  public void enableService()
3082  {
3083    synchronized (sessionLock)
3084    {
3085      broker.start();
3086      startListenService();
3087    }
3088  }
3089
3090  /**
3091   * Change some ReplicationDomain parameters.
3092   *
3093   * @param config
3094   *          The new configuration that this domain should now use.
3095   */
3096  protected void changeConfig(ReplicationDomainCfg config)
3097  {
3098    if (broker != null && broker.changeConfig(config))
3099    {
3100      restartService();
3101    }
3102  }
3103
3104  /**
3105   * Applies a configuration change to the attributes which should be included
3106   * in the ECL.
3107   *
3108   * @param includeAttributes
3109   *          attributes to be included with all change records.
3110   * @param includeAttributesForDeletes
3111   *          additional attributes to be included with delete change records.
3112   */
3113  public void changeConfig(Set<String> includeAttributes,
3114      Set<String> includeAttributesForDeletes)
3115  {
3116    final boolean attrsModified = setEclIncludes(
3117        getServerId(), includeAttributes, includeAttributesForDeletes);
3118    if (attrsModified && broker != null)
3119    {
3120      restartService();
3121    }
3122  }
3123
3124  private void restartService()
3125  {
3126    disableService();
3127    enableService();
3128  }
3129
3130  /**
3131   * This method should trigger an export of the replicated data.
3132   * to the provided outputStream.
3133   * When finished the outputStream should be flushed and closed.
3134   *
3135   * @param output               The OutputStream where the export should
3136   *                             be produced.
3137   * @throws DirectoryException  When needed.
3138   */
3139  protected abstract void exportBackend(OutputStream output)
3140           throws DirectoryException;
3141
3142  /**
3143   * This method should trigger an import of the replicated data.
3144   *
3145   * @param input                The InputStream from which
3146   *                             the import should be reading entries.
3147   *
3148   * @throws DirectoryException  When needed.
3149   */
3150  protected abstract void importBackend(InputStream input)
3151           throws DirectoryException;
3152
3153  /**
3154   * This method should return the total number of objects in the
3155   * replicated domain.
3156   * This count will be used for reporting.
3157   *
3158   * @throws DirectoryException when needed.
3159   *
3160   * @return The number of objects in the replication domain.
3161   */
3162  public abstract long countEntries() throws DirectoryException;
3163
3164
3165
3166  /**
3167   * This method should handle the processing of {@link UpdateMsg} receive from
3168   * remote replication entities.
3169   * <p>
3170   * This method will be called by a single thread and should therefore should
3171   * not be blocking.
3172   *
3173   * @param updateMsg
3174   *          The {@link UpdateMsg} that was received.
3175   * @return A boolean indicating if the processing is completed at return time.
3176   *         If <code> true </code> is returned, no further processing is
3177   *         necessary. If <code> false </code> is returned, the subclass should
3178   *         call the method {@link #processUpdateDone(UpdateMsg, String)} and
3179   *         update the ServerState When this processing is complete.
3180   */
3181  public abstract boolean processUpdate(UpdateMsg updateMsg);
3182
3183  /**
3184   * This method must be called after each call to
3185   * {@link #processUpdate(UpdateMsg)} when the processing of the
3186   * update is completed.
3187   * <p>
3188   * It is useful for implementation needing to process the update in an
3189   * asynchronous way or using several threads, but must be called even by
3190   * implementation doing it in a synchronous, single-threaded way.
3191   *
3192   * @param msg
3193   *          The UpdateMsg whose processing was completed.
3194   * @param replayErrorMsg
3195   *          if not null, this means an error occurred during the replay of
3196   *          this update, and this is the matching human readable message
3197   *          describing the problem.
3198   */
3199  protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
3200  {
3201    broker.updateWindowAfterReplay();
3202
3203    /*
3204    Send an ack if it was requested and the group id is the same of the RS
3205    one. Only Safe Read mode makes sense in DS for returning an ack.
3206    */
3207    // Assured feature is supported starting from replication protocol V2
3208    if (msg.isAssured()
3209      && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
3210    {
3211      if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
3212      {
3213        if (broker.getRsGroupId() == getGroupId())
3214        {
3215          // Send the ack
3216          AckMsg ackMsg = new AckMsg(msg.getCSN());
3217          if (replayErrorMsg != null)
3218          {
3219            // Mark the error in the ack
3220            //   -> replay error occurred
3221            ackMsg.setHasReplayError(true);
3222            //   -> replay error occurred in our server
3223            ackMsg.setFailedServers(newArrayList(getServerId()));
3224          }
3225          broker.publish(ackMsg);
3226          if (replayErrorMsg != null)
3227          {
3228            assuredSrReceivedUpdatesNotAcked.incrementAndGet();
3229          }
3230          else
3231          {
3232            assuredSrReceivedUpdatesAcked.incrementAndGet();
3233          }
3234        }
3235      }
3236      else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE)
3237      {
3238        logger.error(ERR_DS_UNKNOWN_ASSURED_MODE, getServerId(), msg.getAssuredMode(), getBaseDN(), msg);
3239      }
3240        // Nothing to do in Assured safe data mode, only RS ack updates.
3241    }
3242
3243    incProcessedUpdates();
3244  }
3245
3246  /**
3247   * Prepare a message if it is to be sent in assured mode.
3248   * If the assured mode is enabled, this method should be called before
3249   * publish(UpdateMsg msg) method. This will configure the update accordingly
3250   * before it is sent and will prepare the mechanism that will block until the
3251   * matching ack is received. To wait for the ack after publish call, use
3252   * the waitForAckIfAssuredEnabled() method.
3253   * The expected typical usage in a service inheriting from this class is
3254   * the following sequence:
3255   * UpdateMsg msg = xxx;
3256   * prepareWaitForAckIfAssuredEnabled(msg);
3257   * publish(msg);
3258   * waitForAckIfAssuredEnabled(msg);
3259   *
3260   * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have
3261   * no effect if assured replication is disabled.
3262   * Note: this mechanism should not be used if using publish(byte[] msg)
3263   * version as usage of these methods is already hidden inside.
3264   *
3265   * @param msg The update message to be sent soon.
3266   */
3267  protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
3268  {
3269    /*
3270     * If assured configured, set message accordingly to request an ack in the
3271     * right assured mode.
3272     * No ack requested for a RS with a different group id.
3273     * Assured replication supported for the same locality,
3274     * i.e: a topology working in the same geographical location).
3275     * If we are connected to a RS which is not in our locality,
3276     * no need to ask for an ack.
3277     */
3278    if (needsAck())
3279    {
3280      msg.setAssured(true);
3281      msg.setAssuredMode(getAssuredMode());
3282      if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)
3283      {
3284        msg.setSafeDataLevel(getAssuredSdLevel());
3285      }
3286
3287      // Add the assured message to the list of update that are waiting for acks
3288      waitingAckMsgs.put(msg.getCSN(), msg);
3289    }
3290  }
3291
3292  private boolean needsAck()
3293  {
3294    return isAssured() && broker.getRsGroupId() == getGroupId();
3295  }
3296
3297  /**
3298   * Wait for the processing of an assured message after it has been sent, if
3299   * assured replication is configured, otherwise, do nothing.
3300   * The prepareWaitForAckIfAssuredEnabled method should have been called
3301   * before, see its comment for the full picture.
3302   *
3303   * @param msg The UpdateMsg for which we are waiting for an ack.
3304   * @throws TimeoutException When the configured timeout occurs waiting for the
3305   * ack.
3306   */
3307  protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
3308    throws TimeoutException
3309  {
3310    if (needsAck())
3311    {
3312      // Increment assured replication monitoring counters
3313      switch (getAssuredMode())
3314      {
3315        case SAFE_READ_MODE:
3316          assuredSrSentUpdates.incrementAndGet();
3317          break;
3318        case SAFE_DATA_MODE:
3319          assuredSdSentUpdates.incrementAndGet();
3320          break;
3321        default:
3322        // Should not happen
3323      }
3324    } else
3325    {
3326      // Not assured or bad group id, return immediately
3327      return;
3328    }
3329
3330    // Wait for the ack to be received, timing out if necessary
3331    long startTime = System.currentTimeMillis();
3332    synchronized (msg)
3333    {
3334      CSN csn = msg.getCSN();
3335      while (waitingAckMsgs.containsKey(csn))
3336      {
3337        try
3338        {
3339          /*
3340          WARNING: this timeout may be difficult to optimize: too low, it
3341          may use too much CPU, too high, it may penalize performance...
3342          */
3343          msg.wait(10);
3344        } catch (InterruptedException e)
3345        {
3346          if (logger.isTraceEnabled())
3347          {
3348            logger.trace("waitForAck method interrupted for replication " +
3349              "baseDN: " + getBaseDN());
3350          }
3351          break;
3352        }
3353        // Timeout ?
3354        if (System.currentTimeMillis() - startTime >= getAssuredTimeout())
3355        {
3356          /*
3357          Timeout occurred, be sure that ack is not being received and if so,
3358          remove the update from the wait list, log the timeout error and
3359          also update assured monitoring counters
3360          */
3361          final UpdateMsg update = waitingAckMsgs.remove(csn);
3362          if (update == null)
3363          {
3364            // Ack received just before timeout limit: we can exit
3365            break;
3366          }
3367
3368          // No luck, this is a real timeout
3369          // Increment assured replication monitoring counters
3370          switch (msg.getAssuredMode())
3371          {
3372          case SAFE_READ_MODE:
3373            assuredSrNotAcknowledgedUpdates.incrementAndGet();
3374            assuredSrTimeoutUpdates.incrementAndGet();
3375            // Increment number of errors for our RS
3376            updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates,
3377                broker.getRsServerId());
3378            break;
3379          case SAFE_DATA_MODE:
3380            assuredSdTimeoutUpdates.incrementAndGet();
3381            // Increment number of errors for our RS
3382            updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
3383                broker.getRsServerId());
3384            break;
3385          default:
3386            // Should not happen
3387          }
3388
3389          throw new TimeoutException("No ack received for message csn: " + csn
3390              + " and replication domain: " + getBaseDN() + " after "
3391              + getAssuredTimeout() + " ms.");
3392        }
3393      }
3394    }
3395  }
3396
3397  /**
3398   * Publish an {@link UpdateMsg} to the Replication Service.
3399   * <p>
3400   * The Replication Service will handle the delivery of this {@link UpdateMsg}
3401   * to all the participants of this Replication Domain. These members will be
3402   * receive this {@link UpdateMsg} through a call of the
3403   * {@link #processUpdate(UpdateMsg)} message.
3404   *
3405   * @param msg The UpdateMsg that should be published.
3406   */
3407  public void publish(UpdateMsg msg)
3408  {
3409    broker.publish(msg);
3410    if (msg.contributesToDomainState())
3411    {
3412      state.update(msg.getCSN());
3413    }
3414    numSentUpdates.incrementAndGet();
3415  }
3416
3417  /**
3418   * Publishes a replica offline message if all pending changes for current
3419   * replica have been sent out.
3420   */
3421  public void publishReplicaOfflineMsg()
3422  {
3423    // Here to be overridden
3424  }
3425
3426  /**
3427   * This method should return the generationID to use for this
3428   * ReplicationDomain.
3429   * This method can be called at any time after the ReplicationDomain
3430   * has been started.
3431   *
3432   * @return The GenerationID.
3433   */
3434  public long getGenerationID()
3435  {
3436    return generationId;
3437  }
3438
3439  /**
3440   * Sets the generationId for this replication domain.
3441   *
3442   * @param generationId
3443   *          the generationId to set
3444   */
3445  public void setGenerationID(long generationId)
3446  {
3447    this.generationId = generationId;
3448  }
3449
3450  /**
3451   * Subclasses should use this method to add additional monitoring information
3452   * in the ReplicationDomain.
3453   *
3454   * @param monitorData where to additional monitoring attributes
3455   */
3456  public void addAdditionalMonitoring(MonitorData monitorData)
3457  {
3458  }
3459
3460  /**
3461   * Returns the Import/Export context associated to this ReplicationDomain.
3462   *
3463   * @return the Import/Export context associated to this ReplicationDomain
3464   */
3465  protected ImportExportContext getImportExportContext()
3466  {
3467    return importExportContext.get();
3468  }
3469
3470  /**
3471   * Returns the local address of this replication domain, or the empty string
3472   * if it is not yet connected.
3473   *
3474   * @return The local address.
3475   */
3476  String getLocalUrl()
3477  {
3478    final ReplicationBroker tmp = broker;
3479    return tmp != null ? tmp.getLocalUrl() : "";
3480  }
3481
3482  /**
3483   * Set the attributes configured on a server to be included in the ECL.
3484   *
3485   * @param serverId
3486   *          Server where these attributes are configured.
3487   * @param includeAttributes
3488   *          Attributes to be included with all change records, may include
3489   *          wild-cards.
3490   * @param includeAttributesForDeletes
3491   *          Additional attributes to be included with delete change records,
3492   *          may include wild-cards.
3493   * @return {@code true} if the set of attributes was modified.
3494   */
3495  public boolean setEclIncludes(int serverId,
3496      Set<String> includeAttributes,
3497      Set<String> includeAttributesForDeletes)
3498  {
3499    ECLIncludes current;
3500    ECLIncludes updated;
3501    do
3502    {
3503      current = this.eclIncludes.get();
3504      updated = current.addIncludedAttributes(
3505          serverId, includeAttributes, includeAttributesForDeletes);
3506    }
3507    while (!this.eclIncludes.compareAndSet(current, updated));
3508    return current != updated;
3509  }
3510
3511
3512
3513  /**
3514   * Get the attributes to include in each change for the ECL.
3515   *
3516   * @return The attributes to include in each change for the ECL.
3517   */
3518  public Set<String> getEclIncludes()
3519  {
3520    return eclIncludes.get().includedAttrsAllServers;
3521  }
3522
3523
3524
3525  /**
3526   * Get the attributes to include in each delete change for the ECL.
3527   *
3528   * @return The attributes to include in each delete change for the ECL.
3529   */
3530  public Set<String> getEclIncludesForDeletes()
3531  {
3532    return eclIncludes.get().includedAttrsForDeletesAllServers;
3533  }
3534
3535
3536
3537  /**
3538   * Get the attributes to include in each change for the ECL for a given
3539   * serverId.
3540   *
3541   * @param serverId
3542   *          The serverId for which we want the include attributes.
3543   * @return The attributes.
3544   */
3545  Set<String> getEclIncludes(int serverId)
3546  {
3547    return eclIncludes.get().includedAttrsByServer.get(serverId);
3548  }
3549
3550
3551
3552  /**
3553   * Get the attributes to include in each change for the ECL for a given
3554   * serverId.
3555   *
3556   * @param serverId
3557   *          The serverId for which we want the include attributes.
3558   * @return The attributes.
3559   */
3560  Set<String> getEclIncludesForDeletes(int serverId)
3561  {
3562    return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId);
3563  }
3564
3565  /**
3566   * Returns the CSN of the last Change that was fully processed by this
3567   * ReplicationDomain.
3568   *
3569   * @return The CSN of the last Change that was fully processed by this
3570   *         ReplicationDomain.
3571   */
3572  public CSN getLastLocalChange()
3573  {
3574    return state.getCSN(getServerId());
3575  }
3576
3577  /**
3578   * Gets and stores the assured replication configuration parameters. Returns a
3579   * boolean indicating if the passed configuration has changed compared to
3580   * previous values and the changes require a reconnection.
3581   *
3582   * @param config
3583   *          The configuration object
3584   * @param allowReconnection
3585   *          Tells if one must reconnect if significant changes occurred
3586   */
3587  protected void readAssuredConfig(ReplicationDomainCfg config,
3588      boolean allowReconnection)
3589  {
3590    // Disconnect if required: changing configuration values before
3591    // disconnection would make assured replication used immediately and
3592    // disconnection could cause some timeouts error.
3593    if (needReconnection(config) && allowReconnection)
3594    {
3595      disableService();
3596
3597      assuredConfig = config;
3598
3599      enableService();
3600    }
3601  }
3602
3603  private boolean needReconnection(ReplicationDomainCfg cfg)
3604  {
3605    final AssuredMode assuredMode = getAssuredMode();
3606    switch (cfg.getAssuredType())
3607    {
3608    case NOT_ASSURED:
3609      if (isAssured())
3610      {
3611        return true;
3612      }
3613      break;
3614    case SAFE_DATA:
3615      if (!isAssured() || assuredMode == SAFE_READ_MODE)
3616      {
3617        return true;
3618      }
3619      break;
3620    case SAFE_READ:
3621      if (!isAssured() || assuredMode == SAFE_DATA_MODE)
3622      {
3623        return true;
3624      }
3625      break;
3626    }
3627
3628    return isAssured()
3629        && assuredMode == SAFE_DATA_MODE
3630        && cfg.getAssuredSdLevel() != getAssuredSdLevel();
3631  }
3632
3633  /** {@inheritDoc} */
3634  @Override
3635  public String toString()
3636  {
3637    return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId();
3638  }
3639}