001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2017 ForgeRock AS.
016 */
017package org.opends.server.replication.plugin;
018
019import static org.forgerock.opendj.ldap.ResultCode.*;
020import static org.opends.messages.ReplicationMessages.*;
021import static org.opends.messages.ToolMessages.*;
022import static org.opends.server.protocols.internal.InternalClientConnection.*;
023import static org.opends.server.protocols.internal.Requests.*;
024import static org.opends.server.replication.plugin.EntryHistorical.*;
025import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
026import static org.opends.server.schema.SchemaConstants.EXTMR_HISTORICAL_CSN_RANGE_OID;
027import static org.opends.server.util.CollectionUtils.*;
028import static org.opends.server.util.ServerConstants.*;
029import static org.opends.server.util.StaticUtils.*;
030
031import java.io.File;
032import java.io.InputStream;
033import java.io.OutputStream;
034import java.io.StringReader;
035import java.util.ArrayList;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.Date;
039import java.util.HashMap;
040import java.util.HashSet;
041import java.util.Iterator;
042import java.util.LinkedHashMap;
043import java.util.LinkedHashSet;
044import java.util.LinkedList;
045import java.util.List;
046import java.util.Map;
047import java.util.NoSuchElementException;
048import java.util.Set;
049import java.util.SortedMap;
050import java.util.StringTokenizer;
051import java.util.TreeMap;
052import java.util.concurrent.BlockingQueue;
053import java.util.concurrent.TimeUnit;
054import java.util.concurrent.TimeoutException;
055import java.util.concurrent.atomic.AtomicBoolean;
056import java.util.concurrent.atomic.AtomicInteger;
057import java.util.concurrent.atomic.AtomicReference;
058import java.util.zip.DataFormatException;
059
060import org.forgerock.i18n.LocalizableMessage;
061import org.forgerock.i18n.LocalizedIllegalArgumentException;
062import org.forgerock.i18n.slf4j.LocalizedLogger;
063import org.forgerock.opendj.adapter.server3x.Converters;
064import org.forgerock.opendj.config.server.ConfigChangeResult;
065import org.forgerock.opendj.config.server.ConfigException;
066import org.forgerock.opendj.config.server.ConfigurationChangeListener;
067import org.forgerock.opendj.ldap.AVA;
068import org.forgerock.opendj.ldap.AttributeDescription;
069import org.forgerock.opendj.ldap.ByteString;
070import org.forgerock.opendj.ldap.DN;
071import org.forgerock.opendj.ldap.DecodeException;
072import org.forgerock.opendj.ldap.ModificationType;
073import org.forgerock.opendj.ldap.RDN;
074import org.forgerock.opendj.ldap.ResultCode;
075import org.forgerock.opendj.ldap.SearchScope;
076import org.forgerock.opendj.ldap.schema.AttributeType;
077import org.forgerock.opendj.ldap.schema.ObjectClass;
078import org.forgerock.opendj.server.config.meta.ReplicationDomainCfgDefn.IsolationPolicy;
079import org.forgerock.opendj.server.config.server.ExternalChangelogDomainCfg;
080import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
081import org.opends.server.api.AlertGenerator;
082import org.opends.server.api.Backend;
083import org.opends.server.api.Backend.BackendOperation;
084import org.opends.server.api.BackendInitializationListener;
085import org.opends.server.api.DirectoryThread;
086import org.opends.server.api.MonitorData;
087import org.opends.server.api.ServerShutdownListener;
088import org.opends.server.api.SynchronizationProvider;
089import org.opends.server.backends.task.Task;
090import org.opends.server.config.ConfigConstants;
091import org.opends.server.config.ConfigurationHandler;
092import org.opends.server.controls.PagedResultsControl;
093import org.opends.server.core.AddOperation;
094import org.opends.server.core.DeleteOperation;
095import org.opends.server.core.DirectoryServer;
096import org.opends.server.core.LockFileManager;
097import org.opends.server.core.ModifyDNOperation;
098import org.opends.server.core.ModifyDNOperationBasis;
099import org.opends.server.core.ModifyOperation;
100import org.opends.server.core.ModifyOperationBasis;
101import org.opends.server.protocols.internal.InternalClientConnection;
102import org.opends.server.protocols.internal.InternalSearchListener;
103import org.opends.server.protocols.internal.InternalSearchOperation;
104import org.opends.server.protocols.internal.Requests;
105import org.opends.server.protocols.internal.SearchRequest;
106import org.opends.server.protocols.ldap.LDAPAttribute;
107import org.opends.server.protocols.ldap.LDAPControl;
108import org.opends.server.protocols.ldap.LDAPFilter;
109import org.opends.server.protocols.ldap.LDAPModification;
110import org.opends.server.replication.common.CSN;
111import org.opends.server.replication.common.ServerState;
112import org.opends.server.replication.common.ServerStatus;
113import org.opends.server.replication.common.StatusMachineEvent;
114import org.opends.server.replication.protocol.AddContext;
115import org.opends.server.replication.protocol.AddMsg;
116import org.opends.server.replication.protocol.DeleteContext;
117import org.opends.server.replication.protocol.DeleteMsg;
118import org.opends.server.replication.protocol.LDAPUpdateMsg;
119import org.opends.server.replication.protocol.ModifyContext;
120import org.opends.server.replication.protocol.ModifyDNMsg;
121import org.opends.server.replication.protocol.ModifyDnContext;
122import org.opends.server.replication.protocol.ModifyMsg;
123import org.opends.server.replication.protocol.OperationContext;
124import org.opends.server.replication.protocol.RoutableMsg;
125import org.opends.server.replication.protocol.UpdateMsg;
126import org.opends.server.replication.service.DSRSShutdownSync;
127import org.opends.server.replication.service.ReplicationBroker;
128import org.opends.server.replication.service.ReplicationDomain;
129import org.opends.server.tasks.PurgeConflictsHistoricalTask;
130import org.opends.server.tasks.TaskUtils;
131import org.opends.server.types.AdditionalLogItem;
132import org.opends.server.types.Attribute;
133import org.opends.server.types.AttributeBuilder;
134import org.opends.server.types.Attributes;
135import org.opends.server.types.Control;
136import org.opends.server.types.DirectoryException;
137import org.opends.server.types.Entry;
138import org.opends.server.types.ExistingFileBehavior;
139import org.opends.server.types.LDAPException;
140import org.opends.server.types.LDIFExportConfig;
141import org.opends.server.types.LDIFImportConfig;
142import org.opends.server.types.Modification;
143import org.opends.server.types.Operation;
144import org.opends.server.types.OperationType;
145import org.opends.server.types.RawModification;
146import org.opends.server.types.Schema;
147import org.opends.server.types.SearchFilter;
148import org.opends.server.types.SearchResultEntry;
149import org.opends.server.types.SearchResultReference;
150import org.opends.server.types.SynchronizationProviderResult;
151import org.opends.server.types.operation.PluginOperation;
152import org.opends.server.types.operation.PostOperationAddOperation;
153import org.opends.server.types.operation.PostOperationDeleteOperation;
154import org.opends.server.types.operation.PostOperationModifyDNOperation;
155import org.opends.server.types.operation.PostOperationModifyOperation;
156import org.opends.server.types.operation.PostOperationOperation;
157import org.opends.server.types.operation.PreOperationAddOperation;
158import org.opends.server.types.operation.PreOperationDeleteOperation;
159import org.opends.server.types.operation.PreOperationModifyDNOperation;
160import org.opends.server.types.operation.PreOperationModifyOperation;
161import org.opends.server.util.LDIFReader;
162import org.opends.server.util.TimeThread;
163import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation;
164
165/**
166 *  This class implements the bulk part of the Directory Server side
167 *  of the replication code.
168 *  It contains the root method for publishing a change,
169 *  processing a change received from the replicationServer service,
170 *  handle conflict resolution,
171 *  handle protocol messages from the replicationServer.
172 * <p>
173 * FIXME Move this class to org.opends.server.replication.service
174 * or the equivalent package once this code is moved to a maven module.
175 */
176public final class LDAPReplicationDomain extends ReplicationDomain
177       implements ConfigurationChangeListener<ReplicationDomainCfg>,
178                  AlertGenerator, BackendInitializationListener, ServerShutdownListener
179{
180  /**
181   * Set of attributes that will return all the user attributes and the
182   * replication related operational attributes when used in a search operation.
183   */
184  private static final Set<String> USER_AND_REPL_OPERATIONAL_ATTRS =
185      newHashSet(HISTORICAL_ATTRIBUTE_NAME, ENTRYUUID_ATTRIBUTE_NAME, "*");
186
187  /**
188   * Initializing replication for the domain initiates backend finalization/initialization
189   * This flag prevents the Replication Domain to disable/enable itself when
190   * it is the event initiator
191   */
192  private boolean ignoreBackendInitializationEvent;
193
194  private volatile boolean  serverShutdownRequested;
195
196  @Override
197  public String getShutdownListenerName() {
198    return "LDAPReplicationDomain " + getBaseDN();
199  }
200
201  @Override
202  public void processServerShutdown(LocalizableMessage reason) {
203    serverShutdownRequested = true;
204  }
205
206
207  /**
208   * This class is used in the session establishment phase
209   * when no Replication Server with all the local changes has been found
210   * and we therefore need to recover them.
211   * A search is then performed on the database using this
212   * internalSearchListener.
213   */
214  private class ScanSearchListener implements InternalSearchListener
215  {
216    private final CSN startCSN;
217    private final CSN endCSN;
218
219    public ScanSearchListener(CSN startCSN, CSN endCSN)
220    {
221      this.startCSN = startCSN;
222      this.endCSN = endCSN;
223    }
224
225    @Override
226    public void handleInternalSearchEntry(
227        InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
228        throws DirectoryException
229    {
230      // Build the list of Operations that happened on this entry after startCSN
231      // and before endCSN and add them to the replayOperations list
232      Iterable<FakeOperation> updates = generateFakeOperations(searchEntry, getServerId());
233
234      for (FakeOperation op : updates)
235      {
236        CSN csn = op.getCSN();
237        if (csn.isNewerThan(startCSN) && csn.isOlderThan(endCSN))
238        {
239          synchronized (replayOperations)
240          {
241            replayOperations.put(csn, op);
242          }
243        }
244      }
245    }
246
247    @Override
248    public void handleInternalSearchReference(
249        InternalSearchOperation searchOperation,
250        SearchResultReference searchReference) throws DirectoryException
251    {
252       // Nothing to do.
253    }
254  }
255
256  @Override
257  public void performBackendPreInitializationProcessing(Backend<?> backend) {
258    // Nothing to do
259  }
260
261  @Override
262  public void performBackendPostFinalizationProcessing(Backend<?> backend) {
263    // Nothing to do
264  }
265
266  @Override
267  public void performBackendPostInitializationProcessing(Backend<?> backend) {
268    if (!ignoreBackendInitializationEvent
269            && getBackend().getBackendID().equals(backend.getBackendID())) {
270      enable();
271    }
272  }
273
274  @Override
275  public void performBackendPreFinalizationProcessing(Backend<?> backend) {
276    // Do not disable itself during a shutdown
277    // And ignore the event if this replica is the event trigger (e.g. importing).
278    if (!ignoreBackendInitializationEvent
279            && !serverShutdownRequested
280            && getBackend().getBackendID().equals(backend.getBackendID())) {
281      disable();
282    }
283  }
284
285  /** The fully-qualified name of this class. */
286  private static final String CLASS_NAME = LDAPReplicationDomain.class.getName();
287
288  /**
289   * The attribute used to mark conflicting entries.
290   * The value of this attribute should be the dn that this entry was
291   * supposed to have when it was marked as conflicting.
292   */
293  public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
294  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
295
296  private final DSRSShutdownSync dsrsShutdownSync;
297  /**
298   * The update to replay message queue where the listener thread is going to
299   * push incoming update messages.
300   */
301  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
302  /** The number of naming conflicts successfully resolved. */
303  private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
304  /** The number of modify conflicts successfully resolved. */
305  private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
306  /** The number of unresolved naming conflicts. */
307  private final AtomicInteger numUnresolvedNamingConflicts =
308      new AtomicInteger();
309  /** The number of updates replayed successfully by the replication. */
310  private final AtomicInteger numReplayedPostOpCalled = new AtomicInteger();
311
312  private final PersistentServerState state;
313  private volatile boolean generationIdSavedStatus;
314
315  /**
316   * This object is used to store the list of update currently being done on the local database.
317   * It is useful to make sure that the local operations are sent in a correct order to the
318   * replication server and that the ServerState is not updated too early.
319   */
320  private final PendingChanges pendingChanges;
321  private final AtomicReference<RSUpdater> rsUpdater = new AtomicReference<>(null);
322
323  /**
324   * It contain the updates that were done on other servers, transmitted by the
325   * replication server and that are currently replayed.
326   * <p>
327   * It is useful to make sure that dependencies between operations are
328   * correctly fulfilled and to make sure that the ServerState is not updated
329   * too early.
330   */
331  private final RemotePendingChanges remotePendingChanges;
332  private boolean solveConflictFlag = true;
333
334  private final InternalClientConnection conn = getRootConnection();
335  private final AtomicBoolean shutdown = new AtomicBoolean();
336  private volatile boolean disabled;
337
338  /**
339   * This list is used to temporary store operations that needs to be replayed
340   * at session establishment time.
341   */
342  private final SortedMap<CSN, FakeOperation> replayOperations = new TreeMap<>();
343
344  private ExternalChangelogDomain eclDomain;
345
346  /** A boolean indicating if the thread used to save the persistentServerState is terminated. */
347  private volatile boolean done = true;
348
349  private final ServerStateFlush flushThread;
350
351  /** The attribute name used to store the generation id in the backend. */
352  private static final String REPLICATION_GENERATION_ID = "ds-sync-generation-id";
353  /** The attribute name used to store the fractional include configuration in the backend. */
354  static final String REPLICATION_FRACTIONAL_INCLUDE = "ds-sync-fractional-include";
355  /** The attribute name used to store the fractional exclude configuration in the backend. */
356  static final String REPLICATION_FRACTIONAL_EXCLUDE = "ds-sync-fractional-exclude";
357
358  /**
359   * Fractional replication variables.
360   */
361
362  /** Holds the fractional configuration for this domain, if any. */
363  private final FractionalConfig fractionalConfig;
364
365  /** The list of attributes that cannot be used in fractional replication configuration. */
366  private static final String[] FRACTIONAL_PROHIBITED_ATTRIBUTES = new String[]
367  {
368    "objectClass",
369    "2.5.4.0" // objectClass OID
370  };
371
372  /**
373   * When true, this flag is used to force the domain status to be put in bad
374   * data set just after the connection to the replication server.
375   * This must be used when fractional replication is enabled with a
376   * configuration different from the previous one (or at the very first
377   * fractional usage time) : after connection, a ChangeStatusMsg is sent
378   * requesting the bad data set status. Then none of the update messages
379   * received from the replication server are taken into account until the
380   * backend is synchronized with brand new data set compliant with the new
381   * fractional configuration (i.e with compliant fractional configuration in
382   * domain root entry).
383   */
384  private boolean forceBadDataSet;
385
386  /**
387   * The message id to be used when an import is stopped with error by
388   * the fractional replication ldif import plugin.
389   */
390  private int importErrorMessageId = -1;
391  /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE. */
392  static final int IMPORT_ERROR_MESSAGE_BAD_REMOTE = 1;
393  /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL. */
394  static final int IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL = 2;
395
396  /*
397   * Definitions for the return codes of the
398   * fractionalFilterOperation(PreOperationModifyOperation
399   *  modifyOperation, boolean performFiltering) method
400   */
401  /**
402   * The operation contains attributes subject to fractional filtering according
403   * to the fractional configuration.
404   */
405  private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1;
406  /**
407   * The operation contains no attributes subject to fractional filtering
408   * according to the fractional configuration.
409   */
410  private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2;
411  /** The operation should become a no-op. */
412  private static final int FRACTIONAL_BECOME_NO_OP = 3;
413
414  /**
415   * The last CSN purged in this domain. Allows to have a continuous purging
416   * process from one purge processing (task run) to the next one. Values 0 when
417   * the server starts.
418   */
419  private CSN lastCSNPurgedFromHist = CSN.MIN_VALUE;
420
421  /**
422   * The thread that periodically saves the ServerState of this
423   * LDAPReplicationDomain in the database.
424   */
425  private class ServerStateFlush extends DirectoryThread
426  {
427    protected ServerStateFlush()
428    {
429      super("Replica DS(" + getServerId() + ") state checkpointer for domain \"" + getBaseDN() + "\"");
430    }
431
432    @Override
433    public void run()
434    {
435      done = false;
436
437      while (!isShutdownInitiated())
438      {
439        try
440        {
441          synchronized (this)
442          {
443            wait(1000);
444            if (!disabled && !ieRunning())
445            {
446              state.save();
447            }
448          }
449        }
450        catch (InterruptedException e)
451        {
452          // Thread interrupted: check for shutdown.
453          Thread.currentThread().interrupt();
454        }
455      }
456      state.save();
457
458      done = true;
459    }
460  }
461
462  /**
463   * The thread that is responsible to update the RS to which this domain is
464   * connected in case it is late and there is no RS which is up to date.
465   */
466  private class RSUpdater extends DirectoryThread
467  {
468    private final CSN startCSN;
469
470    protected RSUpdater(CSN replServerMaxCSN)
471    {
472      super("Replica DS(" + getServerId() + ") missing change publisher for domain \"" + getBaseDN() + "\"");
473      this.startCSN = replServerMaxCSN;
474    }
475
476    @Override
477    public void run()
478    {
479      // Replication server is missing some of our changes:
480      // let's send them to him.
481      logger.trace(DEBUG_GOING_TO_SEARCH_FOR_CHANGES);
482
483      /*
484       * Get all the changes that have not been seen by this
485       * replication server and publish them.
486       */
487      try
488      {
489        if (buildAndPublishMissingChanges(startCSN, broker))
490        {
491          logger.trace(DEBUG_CHANGES_SENT);
492          synchronized(replayOperations)
493          {
494            replayOperations.clear();
495          }
496        }
497        else
498        {
499          /*
500           * An error happened trying to search for the updates
501           * This server will start accepting again new updates but
502           * some inconsistencies will stay between servers.
503           * Log an error for the repair tool
504           * that will need to re-synchronize the servers.
505           */
506          logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN());
507        }
508      }
509      catch (Exception e)
510      {
511        /*
512         * An error happened trying to search for the updates
513         * This server will start accepting again new updates but
514         * some inconsistencies will stay between servers.
515         * Log an error for the repair tool
516         * that will need to re-synchronize the servers.
517         */
518        logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN());
519      }
520      finally
521      {
522        broker.setRecoveryRequired(false);
523        // RSUpdater thread has finished its work, let's remove it from memory
524        // so another RSUpdater thread can be started if needed.
525        rsUpdater.compareAndSet(this, null);
526      }
527    }
528  }
529
530  /**
531   * Creates a new ReplicationDomain using configuration from configEntry.
532   *
533   * @param configuration    The configuration of this ReplicationDomain.
534   * @param updateToReplayQueue The queue for update messages to replay.
535   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
536   * @throws ConfigException In case of invalid configuration.
537   */
538  LDAPReplicationDomain(ReplicationDomainCfg configuration,
539      BlockingQueue<UpdateToReplay> updateToReplayQueue,
540      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
541  {
542    super(configuration, -1);
543
544    this.updateToReplayQueue = updateToReplayQueue;
545    this.dsrsShutdownSync = dsrsShutdownSync;
546
547    // Get assured configuration
548    readAssuredConfig(configuration, false);
549
550    // Get fractional configuration
551    fractionalConfig = new FractionalConfig(getBaseDN());
552    readFractionalConfig(configuration, false);
553    storeECLConfiguration(configuration);
554    solveConflictFlag = isSolveConflict(configuration);
555
556    Backend<?> backend = getBackend();
557    if (backend == null)
558    {
559      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(getBaseDN()));
560    }
561
562    try
563    {
564      generationId = loadGenerationId();
565    }
566    catch (DirectoryException e)
567    {
568      logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e));
569    }
570
571    /*
572     * Create a new Persistent Server State that will be used to store
573     * the last CSN seen from all LDAP servers in the topology.
574     */
575    state = new PersistentServerState(getBaseDN(), getServerId(),
576        getServerState());
577    flushThread = new ServerStateFlush();
578
579    /*
580     * CSNGenerator is used to create new unique CSNs for each operation done on
581     * this replication domain.
582     *
583     * The generator time is adjusted to the time of the last CSN received from
584     * remote other servers.
585     */
586    pendingChanges = new PendingChanges(getGenerator(), this);
587    remotePendingChanges = new RemotePendingChanges(getServerState());
588
589    // listen for changes on the configuration
590    configuration.addChangeListener(this);
591
592    // register as an AlertGenerator
593    DirectoryServer.registerAlertGenerator(this);
594
595    DirectoryServer.registerBackendInitializationListener(this);
596    DirectoryServer.registerShutdownListener(this);
597
598    startPublishService();
599  }
600
601  /**
602   * Modify conflicts are solved for all suffixes but the schema suffix because
603   * we don't want to store extra information in the schema ldif files. This has
604   * no negative impact because the changes on schema should not produce
605   * conflicts.
606   */
607  private boolean isSolveConflict(ReplicationDomainCfg cfg)
608  {
609    return !getBaseDN().equals(DirectoryServer.getSchemaDN())
610        && cfg.isSolveConflicts();
611  }
612
613  /**
614   * Sets the error message id to be used when online import is stopped with
615   * error by the fractional replication ldif import plugin.
616   * @param importErrorMessageId The message to use.
617   */
618  void setImportErrorMessageId(int importErrorMessageId)
619  {
620    this.importErrorMessageId = importErrorMessageId;
621  }
622
623  /**
624   * This flag is used by the fractional replication ldif import plugin to stop
625   * the (online) import process if a fractional configuration inconsistency is
626   * detected by it.
627   *
628   * @return true if the online import currently in progress should continue,
629   *         false otherwise.
630   */
631  private boolean isFollowImport()
632  {
633    return importErrorMessageId == -1;
634  }
635
636  /**
637   * Gets and stores the fractional replication configuration parameters.
638   * @param configuration The configuration object
639   * @param allowReconnection Tells if one must reconnect if significant changes
640   *        occurred
641   */
642  private void readFractionalConfig(ReplicationDomainCfg configuration,
643    boolean allowReconnection)
644  {
645    // Read the configuration entry
646    FractionalConfig newFractionalConfig;
647    try
648    {
649      newFractionalConfig = FractionalConfig.toFractionalConfig(configuration);
650    }
651    catch(ConfigException e)
652    {
653      // Should not happen as normally already called without problem in
654      // isConfigurationChangeAcceptable or isConfigurationAcceptable
655      // if we come up to this method
656      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
657      return;
658    }
659
660    /**
661     * Is there any change in fractional configuration ?
662     */
663
664    // Compute current configuration
665    boolean needReconnection;
666     try
667    {
668      needReconnection = !FractionalConfig.
669        isFractionalConfigEquivalent(fractionalConfig, newFractionalConfig);
670    }
671    catch  (ConfigException e)
672    {
673      // Should not happen
674      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
675      return;
676    }
677
678    // Disable service if configuration changed
679    final boolean needRestart = needReconnection && allowReconnection;
680    if (needRestart)
681    {
682      disableService();
683    }
684    // Set new configuration
685    int newFractionalMode = newFractionalConfig.fractionalConfigToInt();
686    fractionalConfig.setFractional(newFractionalMode !=
687      FractionalConfig.NOT_FRACTIONAL);
688    if (fractionalConfig.isFractional())
689    {
690      // Set new fractional configuration values
691      fractionalConfig.setFractionalExclusive(
692          newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
693      fractionalConfig.setFractionalSpecificClassesAttributes(
694        newFractionalConfig.getFractionalSpecificClassesAttributes());
695      fractionalConfig.setFractionalAllClassesAttributes(
696        newFractionalConfig.fractionalAllClassesAttributes);
697    } else
698    {
699      // Reset default values
700      fractionalConfig.setFractionalExclusive(true);
701      fractionalConfig.setFractionalSpecificClassesAttributes(
702        new HashMap<String, Set<String>>());
703      fractionalConfig.setFractionalAllClassesAttributes(new HashSet<String>());
704    }
705
706    // Reconnect if required
707    if (needRestart)
708    {
709      enableService();
710    }
711  }
712
713  /**
714   * Return true if the fractional configuration stored in the domain root
715   * entry of the backend is equivalent to the fractional configuration stored
716   * in the local variables.
717   */
718  private boolean isBackendFractionalConfigConsistent()
719  {
720    // Read config stored in domain root entry
721    if (logger.isTraceEnabled())
722    {
723      logger.trace("Attempt to read the potential fractional config in domain root entry " + getBaseDN());
724    }
725
726    // Search the domain root entry that is used to save the generation id
727    SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT)
728        .addAttribute(REPLICATION_GENERATION_ID, REPLICATION_FRACTIONAL_EXCLUDE, REPLICATION_FRACTIONAL_INCLUDE);
729    InternalSearchOperation search = conn.processSearch(request);
730
731    if (search.getResultCode() != ResultCode.SUCCESS
732        && search.getResultCode() != ResultCode.NO_SUCH_OBJECT)
733    {
734      String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage();
735      logger.error(ERR_SEARCHING_GENERATION_ID, getBaseDN(), errorMsg);
736      return false;
737    }
738
739    SearchResultEntry resultEntry = findReplicationSearchResultEntry(search);
740    if (resultEntry == null)
741    {
742      /*
743       * The backend is probably empty: if there is some fractional
744       * configuration in memory, we do not let the domain being connected,
745       * otherwise, it's ok
746       */
747      return !fractionalConfig.isFractional();
748    }
749
750    // Now extract fractional configuration if any
751    Iterator<ByteString> exclIt = getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_EXCLUDE);
752    Iterator<ByteString> inclIt = getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_INCLUDE);
753
754    // Compare backend and local fractional configuration
755    return isFractionalConfigConsistent(fractionalConfig, exclIt, inclIt);
756  }
757
758  private SearchResultEntry findReplicationSearchResultEntry(
759      InternalSearchOperation searchOperation)
760  {
761    final SearchResultEntry resultEntry = getFirstResult(searchOperation);
762    if (resultEntry != null)
763    {
764      AttributeType synchronizationGenIDType = DirectoryServer.getSchema().getAttributeType(REPLICATION_GENERATION_ID);
765      List<Attribute> attrs = resultEntry.getAttribute(synchronizationGenIDType);
766      if (!attrs.isEmpty())
767      {
768        Attribute attr = attrs.get(0);
769        if (attr.size() == 1)
770        {
771          return resultEntry;
772        }
773        if (attr.size() > 1)
774        {
775          String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString();
776          logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg);
777        }
778      }
779    }
780    return null;
781  }
782
783  private Iterator<ByteString> getAttributeValueIterator(SearchResultEntry resultEntry, String attrName)
784  {
785    AttributeType attrType = DirectoryServer.getSchema().getAttributeType(attrName);
786    List<Attribute> exclAttrs = resultEntry.getAttribute(attrType);
787    if (!exclAttrs.isEmpty())
788    {
789      Attribute exclAttr = exclAttrs.get(0);
790      if (exclAttr != null)
791      {
792        return exclAttr.iterator();
793      }
794    }
795    return null;
796  }
797
798  /**
799   * Return true if the fractional configuration passed as fractional
800   * configuration attribute values is equivalent to the fractional
801   * configuration stored in the local variables.
802   * @param fractionalConfig The local fractional configuration
803   * @param exclIt Fractional exclude mode configuration attribute values to
804   * analyze.
805   * @param inclIt Fractional include mode configuration attribute values to
806   * analyze.
807   * @return True if the fractional configuration passed as fractional
808   * configuration attribute values is equivalent to the fractional
809   * configuration stored in the local variables.
810   */
811  static boolean isFractionalConfigConsistent(
812      FractionalConfig fractionalConfig, Iterator<ByteString> exclIt, Iterator<ByteString> inclIt)
813  {
814    // Parse fractional configuration stored in passed fractional configuration attributes values
815    Map<String, Set<String>> storedFractionalSpecificClassesAttributes = new HashMap<>();
816    Set<String> storedFractionalAllClassesAttributes = new HashSet<>();
817
818    int storedFractionalMode;
819    try
820    {
821      storedFractionalMode = FractionalConfig.parseFractionalConfig(exclIt,
822        inclIt, storedFractionalSpecificClassesAttributes,
823        storedFractionalAllClassesAttributes);
824    } catch (ConfigException e)
825    {
826      // Should not happen as configuration in domain root entry is flushed
827      // from valid configuration in local variables
828      logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e));
829      return false;
830    }
831
832    FractionalConfig storedFractionalConfig = new FractionalConfig(
833      fractionalConfig.getBaseDn());
834    storedFractionalConfig.setFractional(storedFractionalMode !=
835      FractionalConfig.NOT_FRACTIONAL);
836    // Set stored fractional configuration values
837    if (storedFractionalConfig.isFractional())
838    {
839      storedFractionalConfig.setFractionalExclusive(
840          storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
841    }
842    storedFractionalConfig.setFractionalSpecificClassesAttributes(
843      storedFractionalSpecificClassesAttributes);
844    storedFractionalConfig.setFractionalAllClassesAttributes(
845      storedFractionalAllClassesAttributes);
846
847    /*
848     * Compare configuration stored in passed fractional configuration
849     * attributes with local variable one
850     */
851    try
852    {
853      return FractionalConfig.
854        isFractionalConfigEquivalent(fractionalConfig, storedFractionalConfig);
855    } catch (ConfigException e)
856    {
857      // Should not happen as configuration in domain root entry is flushed
858      // from valid configuration in local variables so both should have already
859      // been checked
860      logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e));
861      return false;
862    }
863  }
864
865  /**
866   * Compare 2 attribute collections and returns true if they are equivalent.
867   *
868   * @param attributes1
869   *          First attribute collection to compare.
870   * @param attributes2
871   *          Second attribute collection to compare.
872   * @return True if both attribute collection are equivalent.
873   * @throws ConfigException
874   *           If some attributes could not be retrieved from the schema.
875   */
876  private static boolean areAttributesEquivalent(
877      Collection<String> attributes1, Collection<String> attributes2)
878      throws ConfigException
879  {
880    // Compare all classes attributes
881    if (attributes1.size() != attributes2.size())
882    {
883      return false;
884    }
885
886    // Check consistency of all classes attributes
887    Schema schema = DirectoryServer.getSchema();
888    /*
889     * For each attribute in attributes1, check there is the matching
890     * one in attributes2.
891     */
892    for (String attrName1 : attributes1)
893    {
894      // Get attribute from attributes1
895      AttributeType attributeType1 = schema.getAttributeType(attrName1);
896      if (attributeType1.isPlaceHolder())
897      {
898        throw new ConfigException(
899          NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName1));
900      }
901      // Look for matching one in attributes2
902      boolean foundAttribute = false;
903      for (String attrName2 : attributes2)
904      {
905        AttributeType attributeType2 = schema.getAttributeType(attrName2);
906        if (attributeType2.isPlaceHolder())
907        {
908          throw new ConfigException(
909            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName2));
910        }
911        if (attributeType1.equals(attributeType2))
912        {
913          foundAttribute = true;
914          break;
915        }
916      }
917      // Found matching attribute ?
918      if (!foundAttribute)
919      {
920        return false;
921      }
922    }
923
924    return true;
925  }
926
927  /**
928   * Check that the passed fractional configuration is acceptable
929   * regarding configuration syntax, schema constraints...
930   * Throws an exception if the configuration is not acceptable.
931   * @param configuration The configuration to analyze.
932   * @throws org.opends.server.config.ConfigException if the configuration is
933   * not acceptable.
934   */
935  private static void isFractionalConfigAcceptable(
936    ReplicationDomainCfg configuration) throws ConfigException
937  {
938    /*
939     * Parse fractional configuration
940     */
941
942    // Read the configuration entry
943    FractionalConfig newFractionalConfig = FractionalConfig.toFractionalConfig(
944        configuration);
945
946    if (!newFractionalConfig.isFractional())
947    {
948      // Nothing to check
949      return;
950    }
951
952    // Prepare variables to be filled with config
953    Map<String, Set<String>> newFractionalSpecificClassesAttributes =
954      newFractionalConfig.getFractionalSpecificClassesAttributes();
955    Set<String> newFractionalAllClassesAttributes =
956      newFractionalConfig.getFractionalAllClassesAttributes();
957
958    /*
959     * Check attributes consistency : we only allow to filter MAY (optional)
960     * attributes of a class : to be compliant with the schema, no MUST
961     * (mandatory) attribute can be filtered by fractional replication.
962     */
963
964    // Check consistency of specific classes attributes
965    Schema schema = DirectoryServer.getSchema();
966    int fractionalMode = newFractionalConfig.fractionalConfigToInt();
967    for (String className : newFractionalSpecificClassesAttributes.keySet())
968    {
969      // Does the class exist ?
970      ObjectClass fractionalClass = schema.getObjectClass(className);
971      if (fractionalClass.isPlaceHolder())
972      {
973        throw new ConfigException(
974          NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className));
975      }
976
977      boolean isExtensibleObjectClass = fractionalClass.isExtensible();
978
979      Set<String> attributes =
980        newFractionalSpecificClassesAttributes.get(className);
981
982      for (String attrName : attributes)
983      {
984        // Not a prohibited attribute ?
985        if (isFractionalProhibitedAttr(attrName))
986        {
987          throw new ConfigException(
988            NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName));
989        }
990
991        // Does the attribute exist ?
992        AttributeType attributeType = schema.getAttributeType(attrName);
993        if (!attributeType.isPlaceHolder())
994        {
995          // No more checking for the extensibleObject class
996          if (!isExtensibleObjectClass
997              && fractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL
998              // Exclusive mode : the attribute must be optional
999              && !fractionalClass.isOptional(attributeType))
1000          {
1001            throw new ConfigException(
1002                NOTE_ERR_FRACTIONAL_CONFIG_NOT_OPTIONAL_ATTRIBUTE.get(attrName,
1003                    className));
1004          }
1005        }
1006        else
1007        {
1008          throw new ConfigException(
1009            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName));
1010        }
1011      }
1012    }
1013
1014    // Check consistency of all classes attributes
1015    for (String attrName : newFractionalAllClassesAttributes)
1016    {
1017      // Not a prohibited attribute ?
1018      if (isFractionalProhibitedAttr(attrName))
1019      {
1020        throw new ConfigException(
1021          NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName));
1022      }
1023
1024      // Does the attribute exist ?
1025      if (schema.getAttributeType(attrName) == null)
1026      {
1027        throw new ConfigException(
1028          NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName));
1029      }
1030    }
1031  }
1032
1033  /**
1034   * Test if the passed attribute is not allowed to be used in configuration of
1035   * fractional replication.
1036   * @param attr Attribute to test.
1037   * @return true if the attribute is prohibited.
1038   */
1039  private static boolean isFractionalProhibitedAttr(String attr)
1040  {
1041    for (String forbiddenAttr : FRACTIONAL_PROHIBITED_ATTRIBUTES)
1042    {
1043      if (forbiddenAttr.equalsIgnoreCase(attr))
1044      {
1045        return true;
1046      }
1047    }
1048    return false;
1049  }
1050
1051  /**
1052   * If fractional replication is enabled, this analyzes the operation and
1053   * suppresses the forbidden attributes in it so that they are not added in
1054   * the local backend.
1055   *
1056   * @param addOperation The operation to modify based on fractional
1057   * replication configuration
1058   * @param performFiltering Tells if the effective attribute filtering should
1059   * be performed or if the call is just to analyze if there are some
1060   * attributes filtered by fractional configuration
1061   * @return true if the operation contains some attributes subject to filtering
1062   * by the fractional configuration
1063   */
1064  private boolean fractionalFilterOperation(
1065    PreOperationAddOperation addOperation, boolean performFiltering)
1066  {
1067    return fractionalRemoveAttributesFromEntry(fractionalConfig,
1068      addOperation.getEntryDN().rdn(), addOperation.getObjectClasses(),
1069      addOperation.getUserAttributes(), performFiltering);
1070  }
1071
1072  /**
1073   * If fractional replication is enabled, this analyzes the operation and
1074   * suppresses the forbidden attributes in it so that they are not added in
1075   * the local backend.
1076   *
1077   * @param modifyDNOperation The operation to modify based on fractional
1078   * replication configuration
1079   * @param performFiltering Tells if the effective modifications should
1080   * be performed or if the call is just to analyze if there are some
1081   * inconsistency with fractional configuration
1082   * @return true if the operation is inconsistent with fractional
1083   * configuration
1084   */
1085  private boolean fractionalFilterOperation(
1086    PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering)
1087  {
1088    // Quick exit if not called for analyze and
1089    if (performFiltering && modifyDNOperation.deleteOldRDN())
1090    {
1091      // The core will remove any occurrence of attribute that was part of the
1092      // old RDN, nothing more to do.
1093      return true; // Will not be used as analyze was not requested
1094    }
1095
1096    // Create a list of filtered attributes for this entry
1097    Entry concernedEntry = modifyDNOperation.getOriginalEntry();
1098    Set<AttributeType> fractionalConcernedAttributes =
1099      createFractionalConcernedAttrList(fractionalConfig,
1100      concernedEntry.getObjectClasses().keySet());
1101
1102    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
1103    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
1104    {
1105      // No attributes to filter
1106      return false;
1107    }
1108
1109    /*
1110     * Analyze the old and new rdn to see if they are some attributes to be
1111     * removed: if the oldRDN contains some forbidden attributes (for instance
1112     * it is possible if the entry was created with an add operation and the
1113     * RDN used contains a forbidden attribute: in this case the attribute value
1114     * has been kept to be consistent with the dn of the entry.) that are no
1115     * more part of the new RDN, we must remove any attribute of this type by
1116     * putting a modification to delete the attribute.
1117     */
1118
1119    boolean inconsistentOperation = false;
1120    RDN rdn = modifyDNOperation.getEntryDN().rdn();
1121    RDN newRdn = modifyDNOperation.getNewRDN();
1122
1123    // Go through each attribute of the old RDN
1124    for (AVA ava : rdn)
1125    {
1126      AttributeType attributeType = ava.getAttributeType();
1127      // Is it present in the fractional attributes established list ?
1128      boolean foundAttribute =
1129          fractionalConcernedAttributes.contains(attributeType);
1130      if (canRemoveAttribute(fractionalExclusive, foundAttribute)
1131          && !newRdn.hasAttributeType(attributeType)
1132          && !modifyDNOperation.deleteOldRDN())
1133      {
1134        /*
1135         * A forbidden attribute is in the old RDN and no more in the new RDN,
1136         * and it has not been requested to remove attributes from old RDN:
1137         * let's remove the attribute from the entry to stay consistent with
1138         * fractional configuration
1139         */
1140        Modification modification = new Modification(ModificationType.DELETE,
1141          Attributes.empty(attributeType));
1142        modifyDNOperation.addModification(modification);
1143        inconsistentOperation = true;
1144      }
1145    }
1146
1147    return inconsistentOperation;
1148  }
1149
1150  /**
1151   * Remove attributes from an entry, according to the passed fractional
1152   * configuration. The entry is represented by the 2 passed parameters.
1153   * The attributes to be removed are removed using the remove method on the
1154   * passed iterator for the attributes in the entry.
1155   * @param fractionalConfig The fractional configuration to use
1156   * @param entryRdn The rdn of the entry to add
1157   * @param classes The object classes representing the entry to modify
1158   * @param attributesMap The map of attributes/values to be potentially removed
1159   * from the entry.
1160   * @param performFiltering Tells if the effective attribute filtering should
1161   * be performed or if the call is just an analyze to see if there are some
1162   * attributes filtered by fractional configuration
1163   * @return true if the operation contains some attributes subject to filtering
1164   * by the fractional configuration
1165   */
1166   static boolean fractionalRemoveAttributesFromEntry(
1167    FractionalConfig fractionalConfig, RDN entryRdn,
1168    Map<ObjectClass,String> classes, Map<AttributeType,
1169    List<Attribute>> attributesMap, boolean performFiltering)
1170  {
1171    boolean hasSomeAttributesToFilter = false;
1172    /*
1173     * Prepare a list of attributes to be included/excluded according to the
1174     * fractional replication configuration
1175     */
1176
1177    Set<AttributeType> fractionalConcernedAttributes =
1178      createFractionalConcernedAttrList(fractionalConfig, classes.keySet());
1179    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
1180    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
1181    {
1182      return false; // No attributes to filter
1183    }
1184
1185    // Prepare list of object classes of the added entry
1186    Set<ObjectClass> entryClasses = classes.keySet();
1187
1188    /*
1189     * Go through the user attributes and remove those that match filtered one
1190     * - exclude mode : remove only attributes that are in
1191     * fractionalConcernedAttributes
1192     * - include mode : remove any attribute that is not in
1193     * fractionalConcernedAttributes
1194     */
1195    List<List<Attribute>> newRdnAttrLists = new ArrayList<>();
1196    List<AttributeType> rdnAttrTypes = new ArrayList<>();
1197    final Set<AttributeType> attrTypes = attributesMap.keySet();
1198    for (Iterator<AttributeType> iter = attrTypes.iterator(); iter.hasNext();)
1199    {
1200      AttributeType attributeType = iter.next();
1201
1202      // Only optional attributes may be removed
1203      if (isMandatoryAttribute(entryClasses, attributeType)
1204      // Do not remove an attribute if it is a prohibited one
1205          || isFractionalProhibited(attributeType)
1206          || !canRemoveAttribute(attributeType, fractionalExclusive, fractionalConcernedAttributes))
1207      {
1208        continue;
1209      }
1210
1211      if (!performFiltering)
1212      {
1213        // The call was just to check : at least one attribute to filter
1214        // found, return immediately the answer;
1215        return true;
1216      }
1217
1218      // Do not remove an attribute/value that is part of the RDN of the
1219      // entry as it is forbidden
1220      if (entryRdn.hasAttributeType(attributeType))
1221      {
1222        /*
1223        We must remove all values of the attributes map for this
1224        attribute type but the one that has the value which is in the RDN
1225        of the entry. In fact the (underlying )attribute list does not
1226        support remove so we have to create a new list, keeping only the
1227        attribute value which is the same as in the RDN
1228        */
1229        ByteString rdnAttributeValue =
1230          entryRdn.getAttributeValue(attributeType);
1231        List<Attribute> attrList = attributesMap.get(attributeType);
1232        ByteString sameAttrValue = null;
1233        // Locate the attribute value identical to the one in the RDN
1234        for (Attribute attr : attrList)
1235        {
1236          if (attr.contains(rdnAttributeValue))
1237          {
1238            for (ByteString attrValue : attr) {
1239              if (rdnAttributeValue.equals(attrValue)) {
1240                // Keep the value we want
1241                sameAttrValue = attrValue;
1242              } else {
1243                hasSomeAttributesToFilter = true;
1244              }
1245            }
1246          }
1247          else
1248          {
1249            hasSomeAttributesToFilter = true;
1250          }
1251        }
1252        //    Recreate the attribute list with only the RDN attribute value
1253        if (sameAttrValue != null)
1254          // Paranoia check: should never be the case as we should always
1255          // find the attribute/value pair matching the pair in the RDN
1256        {
1257          // Construct and store new attribute list
1258          newRdnAttrLists.add(Attributes.createAsList(attributeType, sameAttrValue));
1259          /*
1260          Store matching attribute type
1261          The mapping will be done using object from rdnAttrTypes as key
1262          and object from newRdnAttrLists (at same index) as value in
1263          the user attribute map to be modified
1264          */
1265          rdnAttrTypes.add(attributeType);
1266        }
1267      }
1268      else
1269      {
1270        // Found an attribute to remove, remove it from the list.
1271        iter.remove();
1272        hasSomeAttributesToFilter = true;
1273      }
1274    }
1275    // Now overwrite the attribute values for the attribute types present in the
1276    // RDN, if there are some filtered attributes in the RDN
1277    for (int index = 0 ; index < rdnAttrTypes.size() ; index++)
1278    {
1279      attributesMap.put(rdnAttrTypes.get(index), newRdnAttrLists.get(index));
1280    }
1281    return hasSomeAttributesToFilter;
1282  }
1283
1284   private static boolean isMandatoryAttribute(Set<ObjectClass> entryClasses, AttributeType attributeType)
1285   {
1286     for (ObjectClass objectClass : entryClasses)
1287     {
1288       if (objectClass.isRequired(attributeType))
1289       {
1290         return true;
1291       }
1292     }
1293     return false;
1294   }
1295
1296   private static boolean isFractionalProhibited(AttributeType attrType)
1297   {
1298     String attributeName = attrType.getNameOrOID();
1299     return (attributeName != null && isFractionalProhibitedAttr(attributeName))
1300         || isFractionalProhibitedAttr(attrType.getOID());
1301   }
1302
1303  private static boolean canRemoveAttribute(AttributeType attributeType,
1304      boolean fractionalExclusive, Set<AttributeType> fractionalConcernedAttributes)
1305  {
1306    // Now remove the attribute or modification if:
1307    // - exclusive mode and attribute is in configuration
1308    // - inclusive mode and attribute is not in configuration
1309    return canRemoveAttribute(fractionalExclusive,
1310        fractionalConcernedAttributes.contains(attributeType));
1311  }
1312
1313  private static boolean canRemoveAttribute(boolean fractionalExclusive, boolean foundAttribute)
1314  {
1315    return (foundAttribute && fractionalExclusive)
1316        || (!foundAttribute && !fractionalExclusive);
1317  }
1318
1319  /**
1320   * Prepares a list of attributes of interest for the fractional feature.
1321   * @param fractionalConfig The fractional configuration to use
1322   * @param entryObjectClasses The object classes of an entry on which an
1323   * operation is going to be performed.
1324   * @return The list of attributes of the entry to be excluded/included
1325   * when the operation will be performed.
1326   */
1327  private static Set<AttributeType> createFractionalConcernedAttrList(
1328    FractionalConfig fractionalConfig, Set<ObjectClass> entryObjectClasses)
1329  {
1330    /*
1331     * Is the concerned entry of a type concerned by fractional replication configuration ?
1332     * If yes, add the matching attribute names to a set of attributes
1333     * to take into account for filtering (inclusive or exclusive mode).
1334     * Using a Set to avoid duplicate attributes (from 2 inheriting classes for instance)
1335     */
1336    Set<String> fractionalConcernedAttributes = new HashSet<>();
1337
1338    // Get object classes the entry matches
1339    Set<String> fractionalAllClassesAttributes =
1340      fractionalConfig.getFractionalAllClassesAttributes();
1341    Map<String, Set<String>> fractionalSpecificClassesAttributes =
1342      fractionalConfig.getFractionalSpecificClassesAttributes();
1343
1344    Set<String> fractionalClasses =
1345        fractionalSpecificClassesAttributes.keySet();
1346    for (ObjectClass entryObjectClass : entryObjectClasses)
1347    {
1348      for(String fractionalClass : fractionalClasses)
1349      {
1350        if (entryObjectClass.hasNameOrOID(fractionalClass.toLowerCase()))
1351        {
1352          fractionalConcernedAttributes.addAll(
1353              fractionalSpecificClassesAttributes.get(fractionalClass));
1354        }
1355      }
1356    }
1357
1358    // Add to the set any attribute which is class independent
1359    fractionalConcernedAttributes.addAll(fractionalAllClassesAttributes);
1360
1361    Set<AttributeType> results = new HashSet<>();
1362    for (String attrName : fractionalConcernedAttributes)
1363    {
1364      results.add(DirectoryServer.getSchema().getAttributeType(attrName));
1365    }
1366    return results;
1367  }
1368
1369  /**
1370   * If fractional replication is enabled, this analyzes the operation and
1371   * suppresses the forbidden attributes in it so that they are not added/
1372   * deleted/modified in the local backend.
1373   *
1374   * @param modifyOperation The operation to modify based on fractional
1375   * replication configuration
1376   * @param performFiltering Tells if the effective attribute filtering should
1377   * be performed or if the call is just to analyze if there are some
1378   * attributes filtered by fractional configuration
1379   * @return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES,
1380   * FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES or FRACTIONAL_BECOME_NO_OP
1381   */
1382  private int fractionalFilterOperation(PreOperationModifyOperation
1383    modifyOperation, boolean performFiltering)
1384  {
1385    /*
1386     * Prepare a list of attributes to be included/excluded according to the
1387     * fractional replication configuration
1388     */
1389
1390    Entry modifiedEntry = modifyOperation.getCurrentEntry();
1391    Set<AttributeType> fractionalConcernedAttributes =
1392        createFractionalConcernedAttrList(fractionalConfig, modifiedEntry.getObjectClasses().keySet());
1393    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
1394    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
1395    {
1396      // No attributes to filter
1397      return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
1398    }
1399
1400    // Prepare list of object classes of the modified entry
1401    DN entryToModifyDn = modifyOperation.getEntryDN();
1402    Entry entryToModify;
1403    try
1404    {
1405      entryToModify = DirectoryServer.getEntry(entryToModifyDn);
1406    }
1407    catch(DirectoryException e)
1408    {
1409      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
1410      return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
1411    }
1412    Set<ObjectClass> entryClasses = entryToModify.getObjectClasses().keySet();
1413
1414    /*
1415     * Now go through the attribute modifications and filter the mods according
1416     * to the fractional configuration (using the just established concerned
1417     * attributes list):
1418     * - delete attributes: remove them if regarding a filtered attribute
1419     * - add attributes: remove them if regarding a filtered attribute
1420     * - modify attributes: remove them if regarding a filtered attribute
1421     */
1422
1423    int result = FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
1424    List<Modification> mods = modifyOperation.getModifications();
1425    Iterator<Modification> modsIt = mods.iterator();
1426    while (modsIt.hasNext())
1427    {
1428      Modification mod = modsIt.next();
1429      Attribute attr = mod.getAttribute();
1430      AttributeType attrType = attr.getAttributeDescription().getAttributeType();
1431      // Fractional replication ignores operational attributes
1432      if (attrType.isOperational()
1433          || isMandatoryAttribute(entryClasses, attrType)
1434          || isFractionalProhibited(attrType)
1435          || !canRemoveAttribute(attrType, fractionalExclusive,
1436              fractionalConcernedAttributes))
1437      {
1438        continue;
1439      }
1440
1441      if (!performFiltering)
1442      {
1443        // The call was just to check : at least one attribute to filter
1444        // found, return immediately the answer;
1445        return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
1446      }
1447
1448      // Found a modification to remove, remove it from the list.
1449      modsIt.remove();
1450      result = FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
1451      if (mods.isEmpty())
1452      {
1453        // This operation must become a no-op as no more modification in it
1454        return FRACTIONAL_BECOME_NO_OP;
1455      }
1456    }
1457
1458    return result;
1459  }
1460
1461  /**
1462   * This is overwritten to allow stopping the (online) import process by the
1463   * fractional ldif import plugin when it detects that the (imported) remote
1464   * data set is not consistent with the local fractional configuration.
1465   * {@inheritDoc}
1466   */
1467  @Override
1468  protected byte[] receiveEntryBytes()
1469  {
1470    if (isFollowImport())
1471    {
1472      // Ok, next entry is allowed to be received
1473      return super.receiveEntryBytes();
1474    }
1475
1476    // Fractional ldif import plugin detected inconsistency between local and
1477    // remote server fractional configuration and is stopping the import
1478    // process:
1479    // This is an error termination during the import
1480    // The error is stored and the import is ended by returning null
1481    final ImportExportContext ieCtx = getImportExportContext();
1482    LocalizableMessage msg = null;
1483    switch (importErrorMessageId)
1484    {
1485    case IMPORT_ERROR_MESSAGE_BAD_REMOTE:
1486      msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE.get(getBaseDN(), ieCtx.getImportSource());
1487      break;
1488    case IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL:
1489      msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL.get(getBaseDN(), ieCtx.getImportSource());
1490      break;
1491    }
1492    ieCtx.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg));
1493    return null;
1494  }
1495
1496  /**
1497   * This is overwritten to allow stopping the (online) export process if the
1498   * local domain is fractional and the destination is all other servers:
1499   * This make no sense to have only fractional servers in a replicated
1500   * topology. This prevents from administrator manipulation error that would
1501   * lead to whole topology data corruption.
1502   * {@inheritDoc}
1503   */
1504  @Override
1505  protected void initializeRemote(int target, int requestorID,
1506    Task initTask, int initWindow) throws DirectoryException
1507  {
1508    if (target == RoutableMsg.ALL_SERVERS && fractionalConfig.isFractional())
1509    {
1510      LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_FULL_UPDATE_FRACTIONAL.get(getBaseDN(), getServerId());
1511      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg);
1512    }
1513
1514    super.initializeRemote(target, requestorID, initTask, initWindow);
1515  }
1516
1517  /**
1518   * Implement the  handleConflictResolution phase of the deleteOperation.
1519   *
1520   * @param deleteOperation The deleteOperation.
1521   * @return A SynchronizationProviderResult indicating if the operation
1522   *         can continue.
1523   */
1524  SynchronizationProviderResult handleConflictResolution(
1525         PreOperationDeleteOperation deleteOperation)
1526  {
1527    if (!deleteOperation.isSynchronizationOperation() && !brokerIsConnected())
1528    {
1529      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1530      return new SynchronizationProviderResult.StopProcessing(
1531          ResultCode.UNWILLING_TO_PERFORM, msg);
1532    }
1533
1534    DeleteContext ctx =
1535      (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT);
1536    Entry deletedEntry = deleteOperation.getEntryToDelete();
1537
1538    if (ctx != null)
1539    {
1540      /*
1541       * This is a replication operation
1542       * Check that the modified entry has the same entryuuid
1543       * as it was in the original message.
1544       */
1545      String operationEntryUUID = ctx.getEntryUUID();
1546      String deletedEntryUUID = getEntryUUID(deletedEntry);
1547      if (!operationEntryUUID.equals(deletedEntryUUID))
1548      {
1549        /*
1550         * The changes entry is not the same entry as the one on
1551         * the original change was performed.
1552         * Probably the original entry was renamed and replaced with
1553         * another entry.
1554         * We must not let the change proceed, return a negative
1555         * result and set the result code to NO_SUCH_OBJECT.
1556         * When the operation will return, the thread that started the operation
1557         * will try to find the correct entry and restart a new operation.
1558         */
1559        return new SynchronizationProviderResult.StopProcessing(
1560            ResultCode.NO_SUCH_OBJECT, null);
1561      }
1562    }
1563    else
1564    {
1565      // There is no replication context attached to the operation
1566      // so this is not a replication operation.
1567      CSN csn = generateCSN(deleteOperation);
1568      String modifiedEntryUUID = getEntryUUID(deletedEntry);
1569      ctx = new DeleteContext(csn, modifiedEntryUUID);
1570      deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
1571
1572      synchronized (replayOperations)
1573      {
1574        int size = replayOperations.size();
1575        if (size >= 10000)
1576        {
1577          replayOperations.remove(replayOperations.firstKey());
1578        }
1579        FakeOperation op = new FakeDelOperation(
1580            deleteOperation.getEntryDN(), csn, modifiedEntryUUID);
1581        replayOperations.put(csn, op);
1582      }
1583    }
1584
1585    return new SynchronizationProviderResult.ContinueProcessing();
1586  }
1587
1588  /**
1589   * Implement the  handleConflictResolution phase of the addOperation.
1590   *
1591   * @param addOperation The AddOperation.
1592   * @return A SynchronizationProviderResult indicating if the operation
1593   *         can continue.
1594   */
1595  SynchronizationProviderResult handleConflictResolution(
1596      PreOperationAddOperation addOperation)
1597  {
1598    if (!addOperation.isSynchronizationOperation() && !brokerIsConnected())
1599    {
1600      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1601      return new SynchronizationProviderResult.StopProcessing(
1602          ResultCode.UNWILLING_TO_PERFORM, msg);
1603    }
1604
1605    if (fractionalConfig.isFractional())
1606    {
1607      if (addOperation.isSynchronizationOperation())
1608      {
1609        /*
1610         * Filter attributes here for fractional replication. If fractional
1611         * replication is enabled, we analyze the operation to suppress the
1612         * forbidden attributes in it so that they are not added in the local
1613         * backend. This must be called before any other plugin is called, to
1614         * keep coherency across plugin calls.
1615         */
1616        fractionalFilterOperation(addOperation, true);
1617      }
1618      else
1619      {
1620        /*
1621         * Direct access from an LDAP client : if some attributes are to be
1622         * removed according to the fractional configuration, simply forbid
1623         * the operation
1624         */
1625        if (fractionalFilterOperation(addOperation, false))
1626        {
1627          LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), addOperation);
1628          return new SynchronizationProviderResult.StopProcessing(
1629            ResultCode.UNWILLING_TO_PERFORM, msg);
1630        }
1631      }
1632    }
1633
1634    if (addOperation.isSynchronizationOperation())
1635    {
1636      AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT);
1637      /*
1638       * If an entry with the same entry uniqueID already exist then
1639       * this operation has already been replayed in the past.
1640       */
1641      String uuid = ctx.getEntryUUID();
1642      if (findEntryDN(uuid) != null)
1643      {
1644        return new SynchronizationProviderResult.StopProcessing(
1645            ResultCode.NO_OPERATION, null);
1646      }
1647
1648      /* The parent entry may have been renamed here since the change was done
1649       * on the first server, and another entry have taken the former dn
1650       * of the parent entry
1651       */
1652
1653      String parentEntryUUID = ctx.getParentEntryUUID();
1654      // root entry have no parent, there is no need to check for it.
1655      if (parentEntryUUID != null)
1656      {
1657        // There is a potential of perfs improvement here
1658        // if we could avoid the following parent entry retrieval
1659        DN parentDnFromCtx = findEntryDN(ctx.getParentEntryUUID());
1660        if (parentDnFromCtx == null)
1661        {
1662          // The parent does not exist with the specified unique id
1663          // stop the operation with NO_SUCH_OBJECT and let the
1664          // conflict resolution or the dependency resolution solve this.
1665          return new SynchronizationProviderResult.StopProcessing(
1666              ResultCode.NO_SUCH_OBJECT, null);
1667        }
1668
1669        DN entryDN = addOperation.getEntryDN();
1670        DN parentDnFromEntryDn = DirectoryServer.getParentDNInSuffix(entryDN);
1671        if (parentDnFromEntryDn != null
1672            && !parentDnFromCtx.equals(parentDnFromEntryDn))
1673        {
1674          // parentEntry has been renamed
1675          // replication name conflict resolution is expected to fix that
1676          // later in the flow
1677          return new SynchronizationProviderResult.StopProcessing(
1678              ResultCode.NO_SUCH_OBJECT, null);
1679        }
1680      }
1681    }
1682    return new SynchronizationProviderResult.ContinueProcessing();
1683  }
1684
1685  /**
1686   * Check that the broker associated to this ReplicationDomain has found
1687   * a Replication Server and that this LDAP server is therefore able to
1688   * process operations.
1689   * If not, set the ResultCode, the response message,
1690   * interrupt the operation, and return false
1691   *
1692   * @return  true when it OK to process the Operation, false otherwise.
1693   *          When false is returned the resultCode and the response message
1694   *          is also set in the Operation.
1695   */
1696  private boolean brokerIsConnected()
1697  {
1698    final IsolationPolicy isolationPolicy = config.getIsolationPolicy();
1699    if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
1700    {
1701      // this policy imply that we always accept updates.
1702      return true;
1703    }
1704    if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
1705    {
1706      // this isolation policy specifies that the updates are denied
1707      // when the broker had problems during the connection phase
1708      // Updates are still accepted if the broker is currently connecting..
1709      return !hasConnectionError();
1710    }
1711    // we should never get there as the only possible policies are
1712    // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
1713    return true;
1714  }
1715
1716  /**
1717   * Implement the  handleConflictResolution phase of the ModifyDNOperation.
1718   *
1719   * @param modifyDNOperation The ModifyDNOperation.
1720   * @return A SynchronizationProviderResult indicating if the operation
1721   *         can continue.
1722   */
1723  SynchronizationProviderResult handleConflictResolution(
1724      PreOperationModifyDNOperation modifyDNOperation)
1725  {
1726    if (!modifyDNOperation.isSynchronizationOperation() && !brokerIsConnected())
1727    {
1728      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1729      return new SynchronizationProviderResult.StopProcessing(
1730          ResultCode.UNWILLING_TO_PERFORM, msg);
1731    }
1732
1733    if (fractionalConfig.isFractional())
1734    {
1735      if (modifyDNOperation.isSynchronizationOperation())
1736      {
1737        /*
1738         * Filter operation here for fractional replication. If fractional
1739         * replication is enabled, we analyze the operation and modify it if
1740         * necessary to stay consistent with what is defined in fractional
1741         * configuration.
1742         */
1743        fractionalFilterOperation(modifyDNOperation, true);
1744      }
1745      else
1746      {
1747        /*
1748         * Direct access from an LDAP client : something is inconsistent with
1749         * the fractional configuration, forbid the operation.
1750         */
1751        if (fractionalFilterOperation(modifyDNOperation, false))
1752        {
1753          LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyDNOperation);
1754          return new SynchronizationProviderResult.StopProcessing(
1755            ResultCode.UNWILLING_TO_PERFORM, msg);
1756        }
1757      }
1758    }
1759
1760    ModifyDnContext ctx =
1761      (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT);
1762    if (ctx != null)
1763    {
1764      /*
1765       * This is a replication operation
1766       * Check that the modified entry has the same entryuuid
1767       * as was in the original message.
1768       */
1769      final String modifiedEntryUUID =
1770          getEntryUUID(modifyDNOperation.getOriginalEntry());
1771      if (!modifiedEntryUUID.equals(ctx.getEntryUUID()))
1772      {
1773        /*
1774         * The modified entry is not the same entry as the one on
1775         * the original change was performed.
1776         * Probably the original entry was renamed and replaced with
1777         * another entry.
1778         * We must not let the change proceed, return a negative
1779         * result and set the result code to NO_SUCH_OBJECT.
1780         * When the operation will return, the thread that started the operation
1781         * will try to find the correct entry and restart a new operation.
1782         */
1783        return new SynchronizationProviderResult.StopProcessing(
1784            ResultCode.NO_SUCH_OBJECT, null);
1785      }
1786
1787      if (modifyDNOperation.getNewSuperior() != null)
1788      {
1789        /*
1790         * Also check that the current id of the
1791         * parent is the same as when the operation was performed.
1792         */
1793        String newParentId = findEntryUUID(modifyDNOperation.getNewSuperior());
1794        if (newParentId != null && ctx.getNewSuperiorEntryUUID() != null
1795            && !newParentId.equals(ctx.getNewSuperiorEntryUUID()))
1796        {
1797        return new SynchronizationProviderResult.StopProcessing(
1798            ResultCode.NO_SUCH_OBJECT, null);
1799        }
1800      }
1801
1802      /*
1803       * If the object has been renamed more recently than this
1804       * operation, cancel the operation.
1805       */
1806      EntryHistorical hist = newInstanceFromEntry(modifyDNOperation.getOriginalEntry());
1807      if (hist.addedOrRenamedAfter(ctx.getCSN()))
1808      {
1809        return new SynchronizationProviderResult.StopProcessing(
1810            ResultCode.NO_OPERATION, null);
1811      }
1812    }
1813    else
1814    {
1815      // There is no replication context attached to the operation
1816      // so this is not a replication operation.
1817      CSN csn = generateCSN(modifyDNOperation);
1818      String newParentId = null;
1819      if (modifyDNOperation.getNewSuperior() != null)
1820      {
1821        newParentId = findEntryUUID(modifyDNOperation.getNewSuperior());
1822      }
1823
1824      Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
1825      String modifiedEntryUUID = getEntryUUID(modifiedEntry);
1826      ctx = new ModifyDnContext(csn, modifiedEntryUUID, newParentId);
1827      modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
1828    }
1829    return new SynchronizationProviderResult.ContinueProcessing();
1830  }
1831
1832  /**
1833   * Handle the conflict resolution.
1834   * Called by the core server after locking the entry and before
1835   * starting the actual modification.
1836   * @param modifyOperation the operation
1837   * @return code indicating is operation must proceed
1838   */
1839  SynchronizationProviderResult handleConflictResolution(
1840         PreOperationModifyOperation modifyOperation)
1841  {
1842    if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected())
1843    {
1844      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1845      return new SynchronizationProviderResult.StopProcessing(
1846          ResultCode.UNWILLING_TO_PERFORM, msg);
1847    }
1848
1849    if (fractionalConfig.isFractional())
1850    {
1851      if  (modifyOperation.isSynchronizationOperation())
1852      {
1853        /*
1854         * Filter attributes here for fractional replication. If fractional
1855         * replication is enabled, we analyze the operation and modify it so
1856         * that no forbidden attribute is added/modified/deleted in the local
1857         * backend. This must be called before any other plugin is called, to
1858         * keep coherency across plugin calls.
1859         */
1860        if (fractionalFilterOperation(modifyOperation, true) ==
1861          FRACTIONAL_BECOME_NO_OP)
1862        {
1863          // Every modifications filtered in this operation: the operation
1864          // becomes a no-op
1865          return new SynchronizationProviderResult.StopProcessing(
1866            ResultCode.NO_OPERATION, null);
1867        }
1868      }
1869      else
1870      {
1871        /*
1872         * Direct access from an LDAP client : if some attributes are to be
1873         * removed according to the fractional configuration, simply forbid
1874         * the operation
1875         */
1876        switch(fractionalFilterOperation(modifyOperation, false))
1877        {
1878          case FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES:
1879            // Ok, let the operation happen
1880            break;
1881          case FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES:
1882            // Some attributes not compliant with fractional configuration :
1883            // forbid the operation
1884            LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyOperation);
1885            return new SynchronizationProviderResult.StopProcessing(
1886              ResultCode.UNWILLING_TO_PERFORM, msg);
1887        }
1888      }
1889    }
1890
1891    ModifyContext ctx =
1892      (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT);
1893
1894    Entry modifiedEntry = modifyOperation.getModifiedEntry();
1895    if (ctx == null)
1896    {
1897      // No replication ctx attached => not a replicated operation
1898      // - create a ctx with : CSN, entryUUID
1899      // - attach the context to the op
1900
1901      CSN csn = generateCSN(modifyOperation);
1902      ctx = new ModifyContext(csn, getEntryUUID(modifiedEntry));
1903
1904      modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
1905    }
1906    else
1907    {
1908      // Replication ctx attached => this is a replicated operation being
1909      // replayed here, it is necessary to
1910      // - check if the entry has been renamed
1911      // - check for conflicts
1912      String modifiedEntryUUID = ctx.getEntryUUID();
1913      String currentEntryUUID = getEntryUUID(modifiedEntry);
1914      if (currentEntryUUID != null
1915          && !currentEntryUUID.equals(modifiedEntryUUID))
1916      {
1917        /*
1918         * The current modified entry is not the same entry as the one on
1919         * the original modification was performed.
1920         * Probably the original entry was renamed and replaced with
1921         * another entry.
1922         * We must not let the modification proceed, return a negative
1923         * result and set the result code to NO_SUCH_OBJECT.
1924         * When the operation will return, the thread that started the
1925         * operation will try to find the correct entry and restart a new
1926         * operation.
1927         */
1928         return new SynchronizationProviderResult.StopProcessing(
1929              ResultCode.NO_SUCH_OBJECT, null);
1930      }
1931
1932      // Solve the conflicts between modify operations
1933      EntryHistorical historicalInformation = newInstanceFromEntry(modifiedEntry);
1934      modifyOperation.setAttachment(HISTORICAL_ATTACHMENT_NAME, historicalInformation);
1935
1936      if (historicalInformation.replayOperation(modifyOperation, modifiedEntry))
1937      {
1938        numResolvedModifyConflicts.incrementAndGet();
1939      }
1940    }
1941    return new SynchronizationProviderResult.ContinueProcessing();
1942  }
1943
1944  /**
1945   * The preOperation phase for the add Operation.
1946   * Its job is to generate the replication context associated to the
1947   * operation. It is necessary to do it in this phase because contrary to
1948   * the other operations, the entry UUID is not set when the handleConflict
1949   * phase is called.
1950   *
1951   * @param addOperation The Add Operation.
1952   */
1953  void doPreOperation(PreOperationAddOperation addOperation)
1954  {
1955    final CSN csn = generateCSN(addOperation);
1956    final String entryUUID = getEntryUUID(addOperation);
1957    final AddContext ctx = new AddContext(csn, entryUUID,
1958        findEntryUUID(DirectoryServer.getParentDNInSuffix(addOperation.getEntryDN())));
1959    addOperation.setAttachment(SYNCHROCONTEXT, ctx);
1960  }
1961
1962  @Override
1963  public void publishReplicaOfflineMsg()
1964  {
1965    pendingChanges.putReplicaOfflineMsg();
1966    dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN());
1967  }
1968
1969  /**
1970   * Check if an operation must be synchronized.
1971   * Also update the list of pending changes and the server RUV
1972   * @param op the operation
1973   */
1974  void synchronize(PostOperationOperation op)
1975  {
1976    ResultCode result = op.getResultCode();
1977    // Note that a failed non-replication operation might not have a change
1978    // number.
1979    CSN curCSN = OperationContext.getCSN(op);
1980    if (curCSN != null && config.isLogChangenumber())
1981    {
1982      op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(),
1983          "replicationCSN", curCSN));
1984    }
1985
1986    if (result == ResultCode.SUCCESS)
1987    {
1988      if (op.isSynchronizationOperation())
1989      { // Replaying a sync operation
1990        numReplayedPostOpCalled.incrementAndGet();
1991        try
1992        {
1993          remotePendingChanges.commit(curCSN);
1994        }
1995        catch (NoSuchElementException e)
1996        {
1997          logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN);
1998          return;
1999        }
2000      }
2001      else
2002      {
2003        // Generate a replication message for a successful non-replication
2004        // operation.
2005        LDAPUpdateMsg msg = LDAPUpdateMsg.generateMsg(op);
2006
2007        if (msg == null)
2008        {
2009          /*
2010          * This is an operation type that we do not know about
2011          * It should never happen.
2012          */
2013          pendingChanges.remove(curCSN);
2014          logger.error(ERR_UNKNOWN_TYPE, op.getOperationType());
2015          return;
2016        }
2017
2018        addEntryAttributesForCL(msg,op);
2019
2020        // If assured replication is configured, this will prepare blocking
2021        // mechanism. If assured replication is disabled, this returns
2022        // immediately
2023        prepareWaitForAckIfAssuredEnabled(msg);
2024        try
2025        {
2026          msg.encode();
2027          pendingChanges.commitAndPushCommittedChanges(curCSN, msg);
2028        }
2029        catch (NoSuchElementException e)
2030        {
2031          logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN);
2032          return;
2033        }
2034        // If assured replication is enabled, this will wait for the matching
2035        // ack or time out. If assured replication is disabled, this returns
2036        // immediately
2037        try
2038        {
2039          waitForAckIfAssuredEnabled(msg);
2040        } catch (TimeoutException ex)
2041        {
2042          // This exception may only be raised if assured replication is enabled
2043          logger.info(NOTE_DS_ACK_TIMEOUT, getBaseDN(), getAssuredTimeout(), msg);
2044        }
2045      }
2046
2047      /*
2048       * If the operation is a DELETE on the base entry of the suffix
2049       * that is replicated, the generation is now lost because the
2050       * DB is empty. We need to save it again the next time we add an entry.
2051       */
2052      if (OperationType.DELETE.equals(op.getOperationType())
2053          && ((PostOperationDeleteOperation) op)
2054                .getEntryDN().equals(getBaseDN()))
2055      {
2056        generationIdSavedStatus = false;
2057      }
2058
2059      if (!generationIdSavedStatus)
2060      {
2061        saveGenerationId(generationId);
2062      }
2063    }
2064    else if (!op.isSynchronizationOperation() && curCSN != null)
2065    {
2066      // Remove an unsuccessful non-replication operation from the pending
2067      // changes list.
2068      pendingChanges.remove(curCSN);
2069      pendingChanges.pushCommittedChanges();
2070    }
2071
2072    checkForClearedConflict(op);
2073  }
2074
2075  /**
2076   * Check if the operation that just happened has cleared a conflict :
2077   * Clearing a conflict happens if the operation has free a DN that
2078   * for which an other entry was in conflict.
2079   * Steps:
2080   * - get the DN freed by a DELETE or MODRDN op
2081   * - search for entries put in the conflict space (dn=entryUUID'+'....)
2082   *   because the expected DN was not available (ds-sync-conflict=expected DN)
2083   * - retain the entry with the oldest conflict
2084   * - rename this entry with the freedDN as it was expected originally
2085   */
2086   private void checkForClearedConflict(PostOperationOperation op)
2087   {
2088     OperationType type = op.getOperationType();
2089     if (op.getResultCode() != ResultCode.SUCCESS)
2090     {
2091       // those operations cannot have cleared a conflict
2092       return;
2093     }
2094
2095     DN freedDN;
2096     if (type == OperationType.DELETE)
2097     {
2098       freedDN = ((PostOperationDeleteOperation) op).getEntryDN();
2099     }
2100     else if (type == OperationType.MODIFY_DN)
2101     {
2102       freedDN = ((PostOperationModifyDNOperation) op).getEntryDN();
2103     }
2104     else
2105     {
2106       return;
2107     }
2108
2109    SearchFilter filter;
2110    try
2111    {
2112      filter = LDAPFilter.createEqualityFilter(DS_SYNC_CONFLICT,
2113          ByteString.valueOfUtf8(freedDN.toString())).toSearchFilter();
2114    }
2115    catch (DirectoryException e)
2116    {
2117      // can not happen?
2118      logger.traceException(e);
2119      return;
2120    }
2121
2122    SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
2123        .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
2124    InternalSearchOperation searchOp =  conn.processSearch(request);
2125
2126     Entry entryToRename = null;
2127     CSN entryToRenameCSN = null;
2128     for (SearchResultEntry entry : searchOp.getSearchEntries())
2129     {
2130       EntryHistorical history = newInstanceFromEntry(entry);
2131       if (entryToRename == null)
2132       {
2133         entryToRename = entry;
2134         entryToRenameCSN = history.getDNDate();
2135       }
2136       else if (!history.addedOrRenamedAfter(entryToRenameCSN))
2137       {
2138         // this conflict is older than the previous, keep it.
2139         entryToRename = entry;
2140         entryToRenameCSN = history.getDNDate();
2141       }
2142     }
2143
2144     if (entryToRename != null)
2145     {
2146       DN entryDN = entryToRename.getName();
2147       ModifyDNOperation newOp = renameEntry(
2148           entryDN, freedDN.rdn(), freedDN.parent(), false);
2149
2150       ResultCode res = newOp.getResultCode();
2151       if (res != ResultCode.SUCCESS)
2152       {
2153        logger.error(ERR_COULD_NOT_SOLVE_CONFLICT, entryDN, res);
2154       }
2155     }
2156   }
2157
2158  /**
2159   * Rename an Entry Using a synchronization, non-replicated operation.
2160   * This method should be used instead of the InternalConnection methods
2161   * when the operation that need to be run must be local only and therefore
2162   * not replicated to the RS.
2163   *
2164   * @param targetDN     The DN of the entry to rename.
2165   * @param newRDN       The new RDN to be used.
2166   * @param parentDN     The parentDN to be used.
2167   * @param markConflict A boolean indicating is this entry should be marked
2168   *                     as a conflicting entry. In such case the
2169   *                     DS_SYNC_CONFLICT attribute will be added to the entry
2170   *                     with the value of its original DN.
2171   *                     If false, the DS_SYNC_CONFLICT attribute will be
2172   *                     cleared.
2173   *
2174   * @return The operation that was run to rename the entry.
2175   */
2176  private ModifyDNOperation renameEntry(DN targetDN, RDN newRDN, DN parentDN,
2177      boolean markConflict)
2178  {
2179    ModifyDNOperation newOp = new ModifyDNOperationBasis(
2180        conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
2181        targetDN, newRDN, false, parentDN);
2182
2183    if (markConflict)
2184    {
2185      Attribute attr = Attributes.create(DS_SYNC_CONFLICT, targetDN.toString());
2186      newOp.addModification(new Modification(ModificationType.REPLACE, attr));
2187    }
2188    else
2189    {
2190      Attribute attr = Attributes.empty(DS_SYNC_CONFLICT);
2191      newOp.addModification(new Modification(ModificationType.DELETE, attr));
2192    }
2193
2194    runAsSynchronizedOperation(newOp);
2195    return newOp;
2196  }
2197
2198  private void runAsSynchronizedOperation(Operation op)
2199  {
2200    op.setInternalOperation(true);
2201    op.setSynchronizationOperation(true);
2202    op.setDontSynchronize(true);
2203    op.run();
2204  }
2205
2206  /** Delete this ReplicationDomain. */
2207  void delete()
2208  {
2209    shutdown();
2210    removeECLDomainCfg();
2211  }
2212
2213  /** Shutdown this ReplicationDomain. */
2214  public void shutdown()
2215  {
2216    if (shutdown.compareAndSet(false, true))
2217    {
2218      final RSUpdater rsUpdater = this.rsUpdater.get();
2219      if (rsUpdater != null)
2220      {
2221        rsUpdater.initiateShutdown();
2222      }
2223
2224      // stop the thread in charge of flushing the ServerState.
2225      if (flushThread != null)
2226      {
2227        flushThread.initiateShutdown();
2228        synchronized (flushThread)
2229        {
2230          flushThread.notify();
2231        }
2232      }
2233
2234      DirectoryServer.deregisterAlertGenerator(this);
2235      DirectoryServer.deregisterBackendInitializationListener(this);
2236      DirectoryServer.deregisterShutdownListener(this);
2237
2238      // stop the ReplicationDomain
2239      disableService();
2240    }
2241
2242    // wait for completion of the ServerStateFlush thread.
2243    try
2244    {
2245      while (!done)
2246      {
2247        Thread.sleep(50);
2248      }
2249    } catch (InterruptedException e)
2250    {
2251      Thread.currentThread().interrupt();
2252    }
2253  }
2254
2255  /**
2256   * Marks the specified message as the one currently processed by a replay thread.
2257   * @param msg the message being processed
2258   */
2259  void markInProgress(LDAPUpdateMsg msg)
2260  {
2261    remotePendingChanges.markInProgress(msg);
2262  }
2263
2264  /**
2265   * Create and replay a synchronized Operation from an UpdateMsg.
2266   *
2267   * @param msg
2268   *          The UpdateMsg to be replayed.
2269   * @param shutdown
2270   *          whether the server initiated shutdown
2271   */
2272  void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown)
2273  {
2274    // Try replay the operation, then flush (replaying) any pending operation
2275    // whose dependency has been replayed until no more left.
2276    do
2277    {
2278      Operation op = null; // the last operation on which replay was attempted
2279      boolean dependency = false;
2280      String replayErrorMsg = null;
2281      CSN csn = null;
2282      try
2283      {
2284        // The next operation for which to attempt replay.
2285        // This local variable allow to keep error messages in the "op" local
2286        // variable until the next loop iteration starts.
2287        // "op" is already initialized to the next Operation because of the
2288        // error handling paths.
2289        Operation nextOp = op = msg.createOperation(conn);
2290        dependency = remotePendingChanges.checkDependencies(op, msg);
2291        boolean replayDone = false;
2292        int retryCount = 10;
2293        while (!dependency && !replayDone && retryCount-- > 0)
2294        {
2295          if (shutdown.get())
2296          {
2297            // shutdown initiated, let's leave
2298            return;
2299          }
2300          // Try replay the operation
2301          op = nextOp;
2302          op.setInternalOperation(true);
2303          op.setSynchronizationOperation(true);
2304
2305          // Always add the ManageDSAIT control so that updates to referrals
2306          // are processed locally.
2307          op.addRequestControl(new LDAPControl(OID_MANAGE_DSAIT_CONTROL));
2308
2309          // Add the permissive modify control in order to handle updates to values which are already present
2310          // in the entry based on equality matching, but whose raw content differs. e.g., changing the case
2311          // or a case-insensitive value. See OPENDJ-2792
2312          if (op instanceof ModifyOperation)
2313          {
2314            op.addRequestControl(new LDAPControl(OID_PERMISSIVE_MODIFY_CONTROL));
2315          }
2316
2317          csn = OperationContext.getCSN(op);
2318          op.run();
2319
2320          ResultCode result = op.getResultCode();
2321
2322          if (result != ResultCode.SUCCESS)
2323          {
2324            if (result == ResultCode.NO_OPERATION)
2325            {
2326              // Pre-operation conflict resolution detected that the operation
2327              // was a no-op. For example, an add which has already been
2328              // replayed, or a modify DN operation on an entry which has been
2329              // renamed by a more recent modify DN.
2330              replayDone = true;
2331            }
2332            else if (result == ResultCode.BUSY)
2333            {
2334              /*
2335               * We probably could not get a lock (OPENDJ-885). Give the server
2336               * another chance to process this operation immediately.
2337               */
2338              Thread.yield();
2339              continue;
2340            }
2341            else if (result == ResultCode.UNAVAILABLE)
2342            {
2343              /*
2344               * It can happen when a rebuild is performed or the backend is
2345               * offline (OPENDJ-49). Give the server another chance to process
2346               * this operation after some time.
2347               */
2348              Thread.sleep(50);
2349              continue;
2350            }
2351            else if (op instanceof ModifyOperation)
2352            {
2353              ModifyOperation castOp = (ModifyOperation) op;
2354              dependency = remotePendingChanges.checkDependencies(castOp);
2355              ModifyMsg modifyMsg = (ModifyMsg) msg;
2356              replayDone = !dependency && solveNamingConflict(castOp, modifyMsg);
2357            }
2358            else if (op instanceof DeleteOperation)
2359            {
2360              DeleteOperation castOp = (DeleteOperation) op;
2361              dependency = remotePendingChanges.checkDependencies(castOp);
2362              replayDone = !dependency && solveNamingConflict(castOp, msg);
2363            }
2364            else if (op instanceof AddOperation)
2365            {
2366              AddOperation castOp = (AddOperation) op;
2367              AddMsg addMsg = (AddMsg) msg;
2368              dependency = remotePendingChanges.checkDependencies(castOp);
2369              replayDone = !dependency && solveNamingConflict(castOp, addMsg);
2370            }
2371            else if (op instanceof ModifyDNOperation)
2372            {
2373              ModifyDNOperation castOp = (ModifyDNOperation) op;
2374              ModifyDNMsg modifyDNMsg = (ModifyDNMsg) msg;
2375              dependency = remotePendingChanges.checkDependencies(modifyDNMsg);
2376              replayDone = !dependency && solveNamingConflict(castOp, modifyDNMsg);
2377            }
2378            else
2379            {
2380              replayDone = true; // unknown type of operation ?!
2381            }
2382
2383            if (replayDone)
2384            {
2385              // the update became a dummy update and the result
2386              // of the conflict resolution phase is to do nothing.
2387              // however we still need to push this change to the serverState
2388              updateError(csn);
2389            }
2390            else
2391            {
2392              /*
2393               * Create a new operation reflecting the new state of the UpdateMsg after conflict resolution
2394               * modified it and try replaying it again. Dependencies might have been replayed by now.
2395               *  Note: When msg is a DeleteMsg, the DeleteOperation is properly
2396               *  created with subtreeDelete request control when needed.
2397               */
2398              nextOp = msg.createOperation(conn);
2399            }
2400          }
2401          else
2402          {
2403            replayDone = true;
2404          }
2405        }
2406
2407        if (!replayDone && !dependency)
2408        {
2409          // Continue with the next change but the servers could now become
2410          // inconsistent.
2411          // Let the repair tool know about this.
2412          final LocalizableMessage message = ERR_LOOP_REPLAYING_OPERATION.get(
2413              op, op.getErrorMessage());
2414          logger.error(message);
2415          numUnresolvedNamingConflicts.incrementAndGet();
2416          replayErrorMsg = message.toString();
2417          updateError(csn);
2418        }
2419      } catch (DecodeException | LDAPException | DataFormatException e)
2420      {
2421        replayErrorMsg = logDecodingOperationError(msg, e);
2422      } catch (Exception e)
2423      {
2424        if (csn != null)
2425        {
2426          /*
2427           * An Exception happened during the replay process.
2428           * Continue with the next change but the servers will now start
2429           * to be inconsistent.
2430           * Let the repair tool know about this.
2431           */
2432          LocalizableMessage message =
2433              ERR_EXCEPTION_REPLAYING_OPERATION.get(
2434                  stackTraceToSingleLineString(e), op);
2435          logger.error(message);
2436          replayErrorMsg = message.toString();
2437          updateError(csn);
2438        } else
2439        {
2440          replayErrorMsg = logDecodingOperationError(msg, e);
2441        }
2442      } finally
2443      {
2444        if (!dependency)
2445        {
2446          processUpdateDone(msg, replayErrorMsg);
2447        }
2448      }
2449
2450      // Now replay any pending update that had a dependency and whose
2451      // dependency has been replayed, do that until no more updates of that
2452      // type left...
2453      msg = remotePendingChanges.getNextUpdate();
2454    } while (msg != null);
2455  }
2456
2457  private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e)
2458  {
2459    LocalizableMessage message =
2460        ERR_EXCEPTION_DECODING_OPERATION.get(msg + " " + stackTraceToSingleLineString(e));
2461    logger.error(message);
2462    return message.toString();
2463  }
2464
2465  /**
2466   * This method is called when an error happens while replaying
2467   * an operation.
2468   * It is necessary because the postOperation does not always get
2469   * called when error or Exceptions happen during the operation replay.
2470   *
2471   * @param csn the CSN of the operation with error.
2472   */
2473  private void updateError(CSN csn)
2474  {
2475    try
2476    {
2477      remotePendingChanges.commit(csn);
2478    }
2479    catch (NoSuchElementException e)
2480    {
2481      // A failure occurred after the change had been removed from the pending
2482      // changes table.
2483      if (logger.isTraceEnabled())
2484      {
2485        logger.trace(
2486            "LDAPReplicationDomain.updateError: Unable to find remote "
2487                + "pending change for CSN %s", csn);
2488      }
2489    }
2490  }
2491
2492  /**
2493   * Generate a new CSN and insert it in the pending list.
2494   *
2495   * @param operation
2496   *          The operation for which the CSN must be generated.
2497   * @return The new CSN.
2498   */
2499  private CSN generateCSN(PluginOperation operation)
2500  {
2501    return pendingChanges.putLocalOperation(operation);
2502  }
2503
2504  /**
2505   * Find the Unique Id of the entry with the provided DN by doing a
2506   * search of the entry and extracting its entryUUID from its attributes.
2507   *
2508   * @param dn The dn of the entry for which the unique Id is searched.
2509   *
2510   * @return The unique Id of the entry with the provided DN.
2511   */
2512  static String findEntryUUID(DN dn)
2513  {
2514    if (dn == null)
2515    {
2516      return null;
2517    }
2518    final SearchRequest request = newSearchRequest(dn, SearchScope.BASE_OBJECT)
2519        .addAttribute(ENTRYUUID_ATTRIBUTE_NAME);
2520    final InternalSearchOperation search = getRootConnection().processSearch(request);
2521    final SearchResultEntry resultEntry = getFirstResult(search);
2522    if (resultEntry != null)
2523    {
2524      return getEntryUUID(resultEntry);
2525    }
2526    return null;
2527  }
2528
2529  private static SearchResultEntry getFirstResult(InternalSearchOperation search)
2530  {
2531    if (search.getResultCode() == ResultCode.SUCCESS)
2532    {
2533      final LinkedList<SearchResultEntry> results = search.getSearchEntries();
2534      if (!results.isEmpty())
2535      {
2536        return results.getFirst();
2537      }
2538    }
2539    return null;
2540  }
2541
2542  /**
2543   * Find the current DN of an entry from its entry UUID.
2544   *
2545   * @param uuid the Entry Unique ID.
2546   * @return The current DN of the entry or null if there is no entry with
2547   *         the specified UUID.
2548   */
2549  private DN findEntryDN(String uuid)
2550  {
2551    try
2552    {
2553      final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, "entryuuid=" + uuid);
2554      InternalSearchOperation search = conn.processSearch(request);
2555      final SearchResultEntry resultEntry = getFirstResult(search);
2556      if (resultEntry != null)
2557      {
2558        return resultEntry.getName();
2559      }
2560    }
2561    catch (DirectoryException e)
2562    {
2563      // never happens because the filter is always valid.
2564    }
2565    return null;
2566  }
2567
2568  /**
2569   * Solve a conflict detected when replaying a modify operation.
2570   *
2571   * @param op The operation that triggered the conflict detection.
2572   * @param msg The operation that triggered the conflict detection.
2573   * @return true if the process is completed, false if it must continue..
2574   */
2575  private boolean solveNamingConflict(ModifyOperation op, ModifyMsg msg)
2576  {
2577    ResultCode result = op.getResultCode();
2578    ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
2579    String entryUUID = ctx.getEntryUUID();
2580
2581    if (result == ResultCode.NO_SUCH_OBJECT)
2582    {
2583      /*
2584       * The operation is a modification but
2585       * the entry has been renamed on a different master in the same time.
2586       * search if the entry has been renamed, and return the new dn
2587       * of the entry.
2588       */
2589      DN newDN = findEntryDN(entryUUID);
2590      if (newDN != null)
2591      {
2592        // There is an entry with the same unique id as this modify operation
2593        // replay the modify using the current dn of this entry.
2594        msg.setDN(newDN);
2595        numResolvedNamingConflicts.incrementAndGet();
2596        return false;
2597      }
2598      else
2599      {
2600        // This entry does not exist anymore.
2601        // It has probably been deleted, stop the processing of this operation
2602        numResolvedNamingConflicts.incrementAndGet();
2603        return true;
2604      }
2605    }
2606    else if (result == ResultCode.NOT_ALLOWED_ON_RDN)
2607    {
2608      DN currentDN = findEntryDN(entryUUID);
2609      RDN currentRDN;
2610      if (currentDN != null)
2611      {
2612        currentRDN = currentDN.rdn();
2613      }
2614      else
2615      {
2616        // The entry does not exist anymore.
2617        numResolvedNamingConflicts.incrementAndGet();
2618        return true;
2619      }
2620
2621      // The modify operation is trying to delete the value that is
2622      // currently used in the RDN. We need to alter the modify so that it does
2623      // not remove the current RDN value(s).
2624
2625      List<Modification> mods = op.getModifications();
2626      for (Modification mod : mods)
2627      {
2628        AttributeType modAttrType = mod.getAttribute().getAttributeDescription().getAttributeType();
2629        if ((mod.getModificationType() == ModificationType.DELETE
2630              || mod.getModificationType() == ModificationType.REPLACE)
2631            && currentRDN.hasAttributeType(modAttrType))
2632        {
2633          // the attribute can't be deleted because it is used in the RDN,
2634          // turn this operation is a replace with the current RDN value(s);
2635          mod.setModificationType(ModificationType.REPLACE);
2636          Attribute newAttribute = mod.getAttribute();
2637          AttributeBuilder attrBuilder = new AttributeBuilder(newAttribute);
2638          attrBuilder.add(currentRDN.getAttributeValue(modAttrType));
2639          mod.setAttribute(attrBuilder.toAttribute());
2640        }
2641      }
2642      msg.setMods(mods);
2643      numResolvedNamingConflicts.incrementAndGet();
2644      return false;
2645    }
2646    else
2647    {
2648      // The other type of errors can not be caused by naming conflicts.
2649      // Log a message for the repair tool.
2650      logger.error(ERR_ERROR_REPLAYING_OPERATION,
2651          op, ctx.getCSN(), result, op.getErrorMessage());
2652      return true;
2653    }
2654  }
2655
2656 /**
2657  * Solve a conflict detected when replaying a delete operation.
2658  *
2659  * @param op The operation that triggered the conflict detection.
2660  * @param msg The operation that triggered the conflict detection.
2661  * @return true if the process is completed, false if it must continue..
2662  */
2663 private boolean solveNamingConflict(DeleteOperation op, LDAPUpdateMsg msg)
2664 {
2665   ResultCode result = op.getResultCode();
2666   DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
2667   String entryUUID = ctx.getEntryUUID();
2668
2669   if (result == ResultCode.NO_SUCH_OBJECT)
2670   {
2671     /* Find if the entry is still in the database. */
2672     DN currentDN = findEntryDN(entryUUID);
2673     if (currentDN == null)
2674     {
2675       /*
2676        * The entry has already been deleted, either because this delete
2677        * has already been replayed or because another concurrent delete
2678        * has already done the job.
2679        * In any case, there is nothing more to do.
2680        */
2681       numResolvedNamingConflicts.incrementAndGet();
2682       return true;
2683     }
2684     else
2685     {
2686       // This entry has been renamed, replay the delete using its new DN.
2687       msg.setDN(currentDN);
2688       numResolvedNamingConflicts.incrementAndGet();
2689       return false;
2690     }
2691   }
2692   else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
2693   {
2694     /*
2695      * This may happen when we replay a DELETE done on a master
2696      * but children of this entry have been added on another master.
2697      *
2698      * Rename all the children by adding entryuuid in dn and delete this entry.
2699      *
2700      * The action taken here must be consistent with the actions
2701      * done in the solveNamingConflict(AddOperation) method
2702      * when we are adding an entry whose parent entry has already been deleted.
2703      */
2704     if (findAndRenameChild(op.getEntryDN(), op))
2705     {
2706       numUnresolvedNamingConflicts.incrementAndGet();
2707     }
2708
2709     return false;
2710   }
2711   else
2712   {
2713     // The other type of errors can not be caused by naming conflicts.
2714     // Log a message for the repair tool.
2715     logger.error(ERR_ERROR_REPLAYING_OPERATION,
2716         op, ctx.getCSN(), result, op.getErrorMessage());
2717     return true;
2718   }
2719 }
2720
2721/**
2722 * Solve a conflict detected when replaying a Modify DN operation.
2723 *
2724 * @param op The operation that triggered the conflict detection.
2725 * @param msg The operation that triggered the conflict detection.
2726 * @return true if the process is completed, false if it must continue.
2727 * @throws Exception When the operation is not valid.
2728 */
2729private boolean solveNamingConflict(ModifyDNOperation op, LDAPUpdateMsg msg)
2730    throws Exception
2731{
2732  ResultCode result = op.getResultCode();
2733  ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
2734  String entryUUID = ctx.getEntryUUID();
2735  String newSuperiorID = ctx.getNewSuperiorEntryUUID();
2736
2737  /*
2738   * four possible cases :
2739   * - the modified entry has been renamed
2740   * - the new parent has been renamed
2741   * - the operation is replayed for the second time.
2742   * - the entry has been deleted
2743   * action :
2744   *  - change the target dn and the new parent dn and
2745   *        restart the operation,
2746   *  - don't do anything if the operation is replayed.
2747   */
2748
2749  // get the current DN of this entry in the database.
2750  DN currentDN = findEntryDN(entryUUID);
2751
2752  // Construct the new DN to use for the entry.
2753  DN entryDN = op.getEntryDN();
2754  DN newSuperior;
2755  RDN newRDN = op.getNewRDN();
2756
2757  if (newSuperiorID != null)
2758  {
2759    newSuperior = findEntryDN(newSuperiorID);
2760  }
2761  else
2762  {
2763    newSuperior = entryDN.parent();
2764  }
2765
2766  //If we could not find the new parent entry, we missed this entry
2767  // earlier or it has disappeared from the database
2768  // Log this information for the repair tool and mark the entry
2769  // as conflicting.
2770  // stop the processing.
2771  if (newSuperior == null)
2772  {
2773    markConflictEntry(op, currentDN, currentDN.parent().child(newRDN));
2774    numUnresolvedNamingConflicts.incrementAndGet();
2775    return true;
2776  }
2777
2778  DN newDN = newSuperior.child(newRDN);
2779
2780  if (currentDN == null)
2781  {
2782    // The entry targeted by the Modify DN is not in the database
2783    // anymore.
2784    // This is a conflict between a delete and this modify DN.
2785    // The entry has been deleted, we can safely assume
2786    // that the operation is completed.
2787    numResolvedNamingConflicts.incrementAndGet();
2788    return true;
2789  }
2790
2791  // if the newDN and the current DN match then the operation
2792  // is a no-op (this was probably a second replay)
2793  // don't do anything.
2794  if (newDN.equals(currentDN))
2795  {
2796    numResolvedNamingConflicts.incrementAndGet();
2797    return true;
2798  }
2799
2800  if (result == ResultCode.NO_SUCH_OBJECT
2801      || result == ResultCode.UNWILLING_TO_PERFORM
2802      || result == ResultCode.OBJECTCLASS_VIOLATION)
2803  {
2804    /*
2805     * The entry or it's new parent has not been found
2806     * reconstruct the operation with the DN we just built
2807     */
2808    ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
2809    modifyDnMsg.setDN(currentDN);
2810    modifyDnMsg.setNewSuperior(newSuperior.toString());
2811    numResolvedNamingConflicts.incrementAndGet();
2812    return false;
2813  }
2814  else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
2815  {
2816    /*
2817     * This may happen when two modifyDn operation
2818     * are done on different servers but with the same target DN
2819     * add the conflict object class to the entry
2820     * and rename it using its entryuuid.
2821     */
2822    ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
2823    markConflictEntry(op, op.getEntryDN(), newDN);
2824    modifyDnMsg.setNewRDN(generateConflictRDN(entryUUID,
2825                          modifyDnMsg.getNewRDN()));
2826    modifyDnMsg.setNewSuperior(newSuperior.toString());
2827    numUnresolvedNamingConflicts.incrementAndGet();
2828    return false;
2829  }
2830  else
2831  {
2832    // The other type of errors can not be caused by naming conflicts.
2833    // Log a message for the repair tool.
2834    logger.error(ERR_ERROR_REPLAYING_OPERATION,
2835        op, ctx.getCSN(), result, op.getErrorMessage());
2836    return true;
2837  }
2838}
2839
2840  /**
2841   * Solve a conflict detected when replaying a ADD operation.
2842   *
2843   * @param op The operation that triggered the conflict detection.
2844   * @param msg The message that triggered the conflict detection.
2845   * @return true if the process is completed, false if it must continue.
2846   * @throws Exception When the operation is not valid.
2847   */
2848  private boolean solveNamingConflict(AddOperation op, AddMsg msg)
2849      throws Exception
2850  {
2851    ResultCode result = op.getResultCode();
2852    AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
2853    String entryUUID = ctx.getEntryUUID();
2854    String parentUniqueId = ctx.getParentEntryUUID();
2855
2856    if (result == ResultCode.NO_SUCH_OBJECT)
2857    {
2858      /*
2859       * This can happen if the parent has been renamed or deleted
2860       * find the parent dn and calculate a new dn for the entry
2861       */
2862      if (parentUniqueId == null)
2863      {
2864        /*
2865         * This entry is the base dn of the backend.
2866         * It is quite surprising that the operation result be NO_SUCH_OBJECT.
2867         * There is nothing more we can do except log a
2868         * message for the repair tool to look at this problem.
2869         * TODO : Log the message
2870         */
2871        return true;
2872      }
2873      DN parentDn = findEntryDN(parentUniqueId);
2874      if (parentDn == null)
2875      {
2876        /*
2877         * The parent has been deleted
2878         * rename the entry as a conflicting entry.
2879         * The action taken here must be consistent with the actions
2880         * done when in the solveNamingConflict(DeleteOperation) method
2881         * when we are deleting an entry that have some child entries.
2882         */
2883        addConflict(msg);
2884
2885        String conflictRDN =
2886            generateConflictRDN(entryUUID, op.getEntryDN().rdn().toString());
2887        msg.setDN(DN.valueOf(conflictRDN + "," + getBaseDN()));
2888        // reset the parent entryUUID so that the check done is the
2889        // handleConflict phase does not fail.
2890        msg.setParentEntryUUID(null);
2891        numUnresolvedNamingConflicts.incrementAndGet();
2892      }
2893      else
2894      {
2895        msg.setDN(DN.valueOf(msg.getDN().rdn() + "," + parentDn));
2896        numResolvedNamingConflicts.incrementAndGet();
2897      }
2898      return false;
2899    }
2900    else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
2901    {
2902      /*
2903       * This can happen if
2904       *  - two adds are done on different servers but with the
2905       *    same target DN.
2906       *  - the same ADD is being replayed for the second time on this server.
2907       * if the entryUUID already exist, assume this is a replay and
2908       *        don't do anything
2909       * if the entry unique id do not exist, generate conflict.
2910       */
2911      if (findEntryDN(entryUUID) != null)
2912      {
2913        // entry already exist : this is a replay
2914        return true;
2915      }
2916      else
2917      {
2918        addConflict(msg);
2919        String conflictRDN =
2920            generateConflictRDN(entryUUID, msg.getDN().toString());
2921        msg.setDN(DN.valueOf(conflictRDN));
2922        numUnresolvedNamingConflicts.incrementAndGet();
2923        return false;
2924      }
2925    }
2926    else
2927    {
2928      // The other type of errors can not be caused by naming conflicts.
2929      // log a message for the repair tool.
2930      logger.error(ERR_ERROR_REPLAYING_OPERATION,
2931          op, ctx.getCSN(), result, op.getErrorMessage());
2932      return true;
2933    }
2934  }
2935
2936  /**
2937   * Find all the entries below the provided DN and rename them
2938   * so that they stay below the baseDN of this replicationDomain and
2939   * use the conflicting name and attribute.
2940   *
2941   * @param entryDN    The DN of the entry whose child must be renamed.
2942   * @param conflictOp The Operation that generated the conflict.
2943   */
2944  private boolean findAndRenameChild(DN entryDN, Operation conflictOp)
2945  {
2946    /*
2947     * TODO JNR Ludo thinks that: "Ideally, the operation should verify that the
2948     * entryUUID has not changed or try to use the entryUUID rather than the
2949     * DN.". entryUUID can be obtained from the caller of the current method.
2950     */
2951    boolean conflict = false;
2952
2953    // Find and rename child entries.
2954    final SearchRequest request = newSearchRequest(entryDN, SearchScope.SINGLE_LEVEL)
2955        .addAttribute(ENTRYUUID_ATTRIBUTE_NAME, HISTORICAL_ATTRIBUTE_NAME);
2956    InternalSearchOperation op = conn.processSearch(request);
2957    if (op.getResultCode() == ResultCode.SUCCESS)
2958    {
2959      for (SearchResultEntry entry : op.getSearchEntries())
2960      {
2961        /*
2962         * Check the ADD and ModRDN date of the child entry
2963         * (All of them, not only the one that are newer than the DEL op)
2964         * and keep the entry as a conflicting entry.
2965         */
2966        conflict = true;
2967        renameConflictEntry(conflictOp, entry.getName(), getEntryUUID(entry));
2968      }
2969    }
2970    else
2971    {
2972      // log error and information for the REPAIR tool.
2973      logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY, entryDN, conflictOp, op.getResultCode());
2974    }
2975
2976    return conflict;
2977  }
2978
2979  /**
2980   * Rename an entry that was conflicting so that it stays below the
2981   * baseDN of the replicationDomain.
2982   *
2983   * @param conflictOp The Operation that caused the conflict.
2984   * @param dn         The DN of the entry to be renamed.
2985   * @param entryUUID        The uniqueID of the entry to be renamed.
2986   */
2987  private void renameConflictEntry(Operation conflictOp, DN dn,
2988      String entryUUID)
2989  {
2990    LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(dn);
2991    DirectoryServer.sendAlertNotification(this,
2992        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
2993
2994    RDN newRDN = generateDeleteConflictDn(entryUUID, dn);
2995    ModifyDNOperation newOp = renameEntry(dn, newRDN, getBaseDN(), true);
2996
2997    if (newOp.getResultCode() != ResultCode.SUCCESS)
2998    {
2999      // log information for the repair tool.
3000      logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY,
3001          dn, conflictOp, newOp.getResultCode());
3002    }
3003  }
3004
3005  /**
3006   * Generate a modification to add the conflict attribute to an entry
3007   * whose Dn is now conflicting with another entry.
3008   *
3009   * @param op        The operation causing the conflict.
3010   * @param currentDN The current DN of the operation to mark as conflicting.
3011   * @param conflictDN     The newDn on which the conflict happened.
3012   */
3013  private void markConflictEntry(Operation op, DN currentDN, DN conflictDN)
3014  {
3015    // create new internal modify operation and run it.
3016    Attribute attr = Attributes.create(DS_SYNC_CONFLICT, conflictDN.toString());
3017    List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
3018
3019    ModifyOperation newOp = new ModifyOperationBasis(
3020          conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
3021          currentDN, mods);
3022    runAsSynchronizedOperation(newOp);
3023
3024    if (newOp.getResultCode() != ResultCode.SUCCESS)
3025    {
3026      // Log information for the repair tool.
3027      logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, op, newOp.getResultCode());
3028    }
3029
3030    // Generate an alert to let the administration know that some
3031    // conflict could not be solved.
3032    LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN);
3033    DirectoryServer.sendAlertNotification(this,
3034        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
3035  }
3036
3037  /**
3038   * Add the conflict attribute to an entry that could
3039   * not be added because it is conflicting with another entry.
3040   *
3041   * @param msg            The conflicting Add Operation.
3042   *
3043   * @throws DecodeException When an encoding error happened manipulating the
3044   *                       msg.
3045   */
3046  private void addConflict(AddMsg msg) throws DecodeException
3047  {
3048    String normalizedDN = msg.getDN().toString();
3049
3050    // Generate an alert to let the administrator know that some
3051    // conflict could not be solved.
3052    LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(normalizedDN);
3053    DirectoryServer.sendAlertNotification(this,
3054        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
3055
3056    // Add the conflict attribute
3057    msg.addAttribute(DS_SYNC_CONFLICT, normalizedDN);
3058  }
3059
3060  /**
3061   * Generate the Dn to use for a conflicting entry.
3062   *
3063   * @param entryUUID The unique identifier of the entry involved in the
3064   * conflict.
3065   * @param rdn Original rdn.
3066   * @return The generated RDN for a conflicting entry.
3067   */
3068  private String generateConflictRDN(String entryUUID, String rdn)
3069  {
3070    return "entryuuid=" + entryUUID + "+" + rdn;
3071  }
3072
3073  /**
3074   * Generate the RDN to use for a conflicting entry whose father was deleted.
3075   *
3076   * @param entryUUID The unique identifier of the entry involved in the
3077   *                 conflict.
3078   * @param dn       The original DN of the entry.
3079   *
3080   * @return The generated RDN for a conflicting entry.
3081   */
3082  private RDN generateDeleteConflictDn(String entryUUID, DN dn)
3083  {
3084    String newRDN =  "entryuuid=" + entryUUID + "+" + dn.rdn();
3085    try
3086    {
3087      return RDN.valueOf(newRDN);
3088    }
3089    catch (LocalizedIllegalArgumentException e)
3090    {
3091      // cannot happen
3092      return null;
3093    }
3094  }
3095
3096  /**
3097   * Check if the domain solve conflicts.
3098   *
3099   * @return a boolean indicating if the domain should solve conflicts.
3100   */
3101  boolean solveConflict()
3102  {
3103    return solveConflictFlag;
3104  }
3105
3106  /**
3107   * Disable the replication on this domain.
3108   * The session to the replication server will be stopped.
3109   * The domain will not be destroyed but call to the pre-operation
3110   * methods will result in failure.
3111   * The listener thread will be destroyed.
3112   * The monitor informations will still be accessible.
3113   */
3114  public void disable()
3115  {
3116    state.save();
3117    state.clearInMemory();
3118    disabled = true;
3119    disableService(); // This will cut the session and wake up the listener
3120  }
3121
3122  /**
3123   * Do what necessary when the data have changed : load state, load
3124   * generation Id.
3125   * If there is no such information check if there is a
3126   * ReplicaUpdateVector entry and translate it into a state
3127   * and generationId.
3128   * @exception DirectoryException Thrown when an error occurs.
3129   */
3130  private void loadDataState() throws DirectoryException
3131  {
3132    state.clearInMemory();
3133    state.loadState();
3134    getGenerator().adjust(state.getMaxCSN(getServerId()));
3135
3136    // Retrieves the generation ID associated with the data imported
3137    generationId = loadGenerationId();
3138  }
3139
3140  /**
3141   * Enable back the domain after a previous disable.
3142   * The domain will connect back to a replication Server and
3143   * will recreate threads to listen for messages from the Synchronization
3144   * server.
3145   * The generationId will be retrieved or computed if necessary.
3146   * The ServerState will also be read again from the local database.
3147   */
3148  public void enable()
3149  {
3150    try
3151    {
3152      loadDataState();
3153    }
3154    catch (Exception e)
3155    {
3156      /* TODO should mark that replicationServer service is
3157       * not available, log an error and retry upon timeout
3158       * should we stop the modifications ?
3159       */
3160      logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e));
3161      return;
3162    }
3163
3164    enableService();
3165
3166    disabled = false;
3167  }
3168
3169  /**
3170   * Compute the data generationId associated with the current data present
3171   * in the backend for this domain.
3172   * @return The computed generationId.
3173   * @throws DirectoryException When an error occurs.
3174   */
3175  private long computeGenerationId() throws DirectoryException
3176  {
3177    final long genId = exportBackend(null, true);
3178    if (logger.isTraceEnabled())
3179    {
3180      logger.trace("Computed generationId: generationId=" + genId);
3181    }
3182    return genId;
3183  }
3184
3185  /**
3186   * Run a modify operation to update the entry whose DN is given as
3187   * a parameter with the generationID information.
3188   *
3189   * @param entryDN       The DN of the entry to be updated.
3190   * @param generationId  The value of the generationID to be saved.
3191   *
3192   * @return A ResultCode indicating if the operation was successful.
3193   */
3194  private ResultCode runSaveGenerationId(DN entryDN, long generationId)
3195  {
3196    // The generationId is stored in the root entry of the domain.
3197    final ByteString asn1BaseDn = ByteString.valueOfUtf8(entryDN.toString());
3198
3199    LDAPAttribute attr = new LDAPAttribute(REPLICATION_GENERATION_ID, Long.toString(generationId));
3200    List<RawModification> mods = new ArrayList<>(1);
3201    mods.add(new LDAPModification(ModificationType.REPLACE, attr));
3202
3203    ModifyOperation op = new ModifyOperationBasis(
3204          conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
3205          asn1BaseDn, mods);
3206    runAsSynchronizedOperation(op);
3207    return op.getResultCode();
3208  }
3209
3210  /**
3211   * Stores the value of the generationId.
3212   * @param generationId The value of the generationId.
3213   * @return a ResultCode indicating if the method was successful.
3214   */
3215  private ResultCode saveGenerationId(long generationId)
3216  {
3217    ResultCode result = runSaveGenerationId(getBaseDN(), generationId);
3218    if (result != ResultCode.SUCCESS)
3219    {
3220      generationIdSavedStatus = false;
3221      if (result == ResultCode.NO_SUCH_OBJECT)
3222      {
3223        // If the base entry does not exist, save the generation
3224        // ID in the config entry
3225        result = runSaveGenerationId(config.dn(), generationId);
3226      }
3227
3228      if (result != ResultCode.SUCCESS)
3229      {
3230        logger.error(ERR_UPDATING_GENERATION_ID, getBaseDN(), result.getName());
3231      }
3232    }
3233    else
3234    {
3235      generationIdSavedStatus = true;
3236    }
3237    return result;
3238  }
3239
3240  /**
3241   * Load the GenerationId from the root entry of the domain
3242   * from the REPLICATION_GENERATION_ID attribute in database
3243   * to memory, or compute it if not found.
3244   *
3245   * @return generationId The retrieved value of generationId
3246   * @throws DirectoryException When an error occurs.
3247   */
3248  private long loadGenerationId() throws DirectoryException
3249  {
3250    if (logger.isTraceEnabled())
3251    {
3252      logger.trace("Attempt to read generation ID from DB " + getBaseDN());
3253    }
3254
3255    // Search the database entry that is used to periodically save the generation id
3256    final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT)
3257        .addAttribute(REPLICATION_GENERATION_ID);
3258    InternalSearchOperation search = conn.processSearch(request);
3259    if (search.getResultCode() == ResultCode.NO_SUCH_OBJECT)
3260    {
3261      // if the base entry does not exist look for the generationID
3262      // in the config entry.
3263      request.setName(config.dn());
3264      search = conn.processSearch(request);
3265    }
3266
3267    boolean found = false;
3268    long aGenerationId = -1;
3269    if (search.getResultCode() != ResultCode.SUCCESS)
3270    {
3271      if (search.getResultCode() != ResultCode.NO_SUCH_OBJECT)
3272      {
3273        String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage();
3274        logger.error(ERR_SEARCHING_GENERATION_ID, getBaseDN(), errorMsg);
3275      }
3276    }
3277    else
3278    {
3279      List<SearchResultEntry> result = search.getSearchEntries();
3280      SearchResultEntry resultEntry = result.get(0);
3281      if (resultEntry != null)
3282      {
3283        List<Attribute> attrs = resultEntry.getAttribute(REPLICATION_GENERATION_ID);
3284        if (!attrs.isEmpty())
3285        {
3286          Attribute attr = attrs.get(0);
3287          if (attr.size()>1)
3288          {
3289            String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString();
3290            logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg);
3291          }
3292          else if (attr.size() == 1)
3293          {
3294            found = true;
3295            try
3296            {
3297              aGenerationId = Long.decode(attr.iterator().next().toString());
3298            }
3299            catch(Exception e)
3300            {
3301              logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e));
3302            }
3303          }
3304        }
3305      }
3306    }
3307
3308    if (!found)
3309    {
3310      aGenerationId = computeGenerationId();
3311      saveGenerationId(aGenerationId);
3312
3313      if (logger.isTraceEnabled())
3314      {
3315        logger.trace("Generation ID created for domain baseDN=" + getBaseDN() + " generationId=" + aGenerationId);
3316      }
3317    }
3318    else
3319    {
3320      generationIdSavedStatus = true;
3321      if (logger.isTraceEnabled())
3322      {
3323        logger.trace("Generation ID successfully read from domain baseDN=" + getBaseDN()
3324            + " generationId=" + aGenerationId);
3325      }
3326    }
3327    return aGenerationId;
3328  }
3329
3330  /**
3331   * Do whatever is needed when a backup is started.
3332   * We need to make sure that the serverState is correctly save.
3333   */
3334  void backupStart()
3335  {
3336    state.save();
3337  }
3338
3339  /** Do whatever is needed when a backup is finished. */
3340  void backupEnd()
3341  {
3342    // Nothing is needed at the moment
3343  }
3344
3345  /*
3346   * Total Update >>
3347   */
3348
3349  /**
3350   * This method trigger an export of the replicated data.
3351   *
3352   * @param output               The OutputStream where the export should
3353   *                             be produced.
3354   * @throws DirectoryException  When needed.
3355   */
3356  @Override
3357  protected void exportBackend(OutputStream output) throws DirectoryException
3358  {
3359    exportBackend(output, false);
3360  }
3361
3362  /**
3363   * Export the entries from the backend and/or compute the generation ID.
3364   * The ieContext must have been set before calling.
3365   *
3366   * @param output              The OutputStream where the export should
3367   *                            be produced.
3368   * @param checksumOutput      A boolean indicating if this export is
3369   *                            invoked to perform a checksum only
3370   *
3371   * @return The computed       GenerationID.
3372   *
3373   * @throws DirectoryException when an error occurred
3374   */
3375  private long exportBackend(OutputStream output, boolean checksumOutput)
3376      throws DirectoryException
3377  {
3378    Backend<?> backend = getBackend();
3379
3380    //  Acquire a shared lock for the backend.
3381    try
3382    {
3383      String lockFile = LockFileManager.getBackendLockFileName(backend);
3384      StringBuilder failureReason = new StringBuilder();
3385      if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
3386      {
3387        LocalizableMessage message =
3388            ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason);
3389        logger.error(message);
3390        throw new DirectoryException(ResultCode.OTHER, message);
3391      }
3392    }
3393    catch (Exception e)
3394    {
3395      LocalizableMessage message =
3396          ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(),
3397              stackTraceToSingleLineString(e));
3398      logger.error(message);
3399      throw new DirectoryException(ResultCode.OTHER, message);
3400    }
3401
3402    long numberOfEntries = backend.getNumberOfEntriesInBaseDN(getBaseDN());
3403    long entryCount = Math.min(numberOfEntries, 1000);
3404    OutputStream os;
3405    ReplLDIFOutputStream ros = null;
3406    if (checksumOutput)
3407    {
3408      ros = new ReplLDIFOutputStream(entryCount);
3409      os = ros;
3410      try
3411      {
3412        os.write(Long.toString(numberOfEntries).getBytes());
3413      }
3414      catch(Exception e)
3415      {
3416        // Should never happen
3417      }
3418    }
3419    else
3420    {
3421      os = output;
3422    }
3423
3424    // baseDN branch is the only one included in the export
3425    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
3426    exportConfig.setIncludeBranches(newArrayList(getBaseDN()));
3427
3428    // For the checksum computing mode, only consider the 'stable' attributes
3429    if (checksumOutput)
3430    {
3431      String includeAttributeStrings[] = { "objectclass", "sn", "cn", "entryuuid" };
3432      Set<AttributeType> includeAttributes = new HashSet<>();
3433      for (String attrName : includeAttributeStrings)
3434      {
3435        includeAttributes.add(DirectoryServer.getSchema().getAttributeType(attrName));
3436      }
3437      exportConfig.setIncludeAttributes(includeAttributes);
3438    }
3439
3440    //  Launch the export.
3441    long genID = 0;
3442    try
3443    {
3444      backend.exportLDIF(exportConfig);
3445    }
3446    catch (DirectoryException de)
3447    {
3448      if (ros == null || ros.getNumExportedEntries() < entryCount)
3449      {
3450        LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
3451        logger.error(message);
3452        throw new DirectoryException(ResultCode.OTHER, message);
3453      }
3454    }
3455    catch (Exception e)
3456    {
3457      LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(stackTraceToSingleLineString(e));
3458      logger.error(message);
3459      throw new DirectoryException(ResultCode.OTHER, message);
3460    }
3461    finally
3462    {
3463      // Clean up after the export by closing the export config.
3464      // Will also flush the export and export the remaining entries.
3465      exportConfig.close();
3466
3467      if (checksumOutput)
3468      {
3469        genID = ros.getChecksumValue();
3470      }
3471
3472      //  Release the shared lock on the backend.
3473      try
3474      {
3475        String lockFile = LockFileManager.getBackendLockFileName(backend);
3476        StringBuilder failureReason = new StringBuilder();
3477        if (! LockFileManager.releaseLock(lockFile, failureReason))
3478        {
3479          LocalizableMessage message =
3480              WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason);
3481          logger.warn(message);
3482          throw new DirectoryException(ResultCode.OTHER, message);
3483        }
3484      }
3485      catch (Exception e)
3486      {
3487        LocalizableMessage message =
3488            WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(),
3489                stackTraceToSingleLineString(e));
3490        logger.warn(message);
3491        throw new DirectoryException(ResultCode.OTHER, message);
3492      }
3493    }
3494    return genID;
3495  }
3496
3497  /**
3498   * Process backend before import.
3499   *
3500   * @param backend
3501   *          The backend.
3502   * @throws DirectoryException
3503   *           If the backend could not be disabled or locked exclusively.
3504   */
3505  private void preBackendImport(Backend<?> backend) throws DirectoryException
3506  {
3507    // Prevent the processing of the backend finalisation event as the import will disable the attached backend
3508    ignoreBackendInitializationEvent = true;
3509
3510    // FIXME setBackendEnabled should be part of TaskUtils ?
3511    TaskUtils.disableBackend(backend.getBackendID());
3512
3513    // Acquire an exclusive lock for the backend.
3514    String lockFile = LockFileManager.getBackendLockFileName(backend);
3515    StringBuilder failureReason = new StringBuilder();
3516    if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
3517    {
3518      LocalizableMessage message = ERR_INIT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason);
3519      logger.error(message);
3520      throw new DirectoryException(ResultCode.OTHER, message);
3521    }
3522  }
3523
3524  /**
3525   * This method triggers an import of the replicated data.
3526   *
3527   * @param input                The InputStream from which the data are read.
3528   * @throws DirectoryException  When needed.
3529   */
3530  @Override
3531  protected void importBackend(InputStream input) throws DirectoryException
3532  {
3533    Backend<?> backend = getBackend();
3534
3535    LDIFImportConfig importConfig = null;
3536    ImportExportContext ieCtx = getImportExportContext();
3537    try
3538    {
3539      if (!backend.supports(BackendOperation.LDIF_IMPORT))
3540      {
3541        ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER,
3542            ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID())));
3543        return;
3544      }
3545
3546      importConfig = new LDIFImportConfig(input);
3547      importConfig.setIncludeBranches(newLinkedHashSet(getBaseDN()));
3548      // We should not validate schema for replication
3549      importConfig.setValidateSchema(false);
3550      // Allow fractional replication ldif import plugin to be called
3551      importConfig.setInvokeImportPlugins(true);
3552      // Reset the follow import flag and message before starting the import
3553      importErrorMessageId = -1;
3554
3555      // TODO How to deal with rejected entries during the import
3556      File rejectsFile =
3557          getFileForPath("logs" + File.separator + "replInitRejectedEntries");
3558      importConfig.writeRejectedEntries(rejectsFile.getAbsolutePath(),
3559          ExistingFileBehavior.OVERWRITE);
3560
3561      // Process import
3562      preBackendImport(backend);
3563      backend.importLDIF(importConfig, DirectoryServer.getInstance().getServerContext());
3564    }
3565    catch(Exception e)
3566    {
3567      ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER,
3568          ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(e))));
3569    }
3570    finally
3571    {
3572      try
3573      {
3574        // Cleanup
3575        if (importConfig != null)
3576        {
3577          importConfig.close();
3578          closeBackendImport(backend); // Re-enable backend
3579          backend = getBackend();
3580        }
3581
3582        loadDataState();
3583
3584        if (ieCtx.getException() != null)
3585        {
3586          // When an error occurred during an import, most of times
3587          // the generationId coming in the root entry of the imported data,
3588          // is not valid anymore (partial data in the backend).
3589          generationId = computeGenerationId();
3590          saveGenerationId(generationId);
3591        }
3592      }
3593      catch (DirectoryException fe)
3594      {
3595        // If we already catch an Exception it's quite possible
3596        // that the loadDataState() and setGenerationId() fail
3597        // so we don't bother about the new Exception.
3598        // However if there was no Exception before we want
3599        // to return this Exception to the task creator.
3600        ieCtx.setExceptionIfNoneSet(new DirectoryException(
3601            ResultCode.OTHER,
3602            ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(fe))));
3603      }
3604    }
3605
3606    if (ieCtx.getException() != null)
3607    {
3608      throw ieCtx.getException();
3609    }
3610  }
3611
3612  /**
3613   * Make post import operations.
3614   * @param backend The backend implied in the import.
3615   * @exception DirectoryException Thrown when an error occurs.
3616   */
3617  private void closeBackendImport(Backend<?> backend) throws DirectoryException
3618  {
3619    String lockFile = LockFileManager.getBackendLockFileName(backend);
3620    StringBuilder failureReason = new StringBuilder();
3621
3622    // Release lock
3623    if (!LockFileManager.releaseLock(lockFile, failureReason))
3624    {
3625      LocalizableMessage message =
3626          WARN_LDIFIMPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason);
3627      logger.warn(message);
3628      throw new DirectoryException(ResultCode.OTHER, message);
3629    }
3630
3631    TaskUtils.enableBackend(backend.getBackendID());
3632
3633    // Restore the processing of backend finalization events.
3634    ignoreBackendInitializationEvent = false;
3635
3636  }
3637
3638  /**
3639   * Retrieves a replication domain based on the baseDN.
3640   *
3641   * @param baseDN The baseDN of the domain to retrieve
3642   * @return The domain retrieved
3643   * @throws DirectoryException When an error occurred or no domain
3644   * match the provided baseDN.
3645   */
3646  public static LDAPReplicationDomain retrievesReplicationDomain(DN baseDN)
3647      throws DirectoryException
3648  {
3649    LDAPReplicationDomain replicationDomain = null;
3650
3651    // Retrieves the domain
3652    for (SynchronizationProvider<?> provider :
3653      DirectoryServer.getSynchronizationProviders())
3654    {
3655      if (!(provider instanceof MultimasterReplication))
3656      {
3657        LocalizableMessage message = ERR_INVALID_PROVIDER.get();
3658        throw new DirectoryException(ResultCode.OTHER, message);
3659      }
3660
3661      // From the domainDN retrieves the replication domain
3662      LDAPReplicationDomain domain =
3663        MultimasterReplication.findDomain(baseDN, null);
3664      if (domain == null)
3665      {
3666        break;
3667      }
3668      if (replicationDomain != null)
3669      {
3670        // Should never happen
3671        LocalizableMessage message = ERR_MULTIPLE_MATCHING_DOMAIN.get();
3672        throw new DirectoryException(ResultCode.OTHER, message);
3673      }
3674      replicationDomain = domain;
3675    }
3676
3677    if (replicationDomain == null)
3678    {
3679      throw new DirectoryException(ResultCode.OTHER, ERR_NO_MATCHING_DOMAIN.get(baseDN));
3680    }
3681    return replicationDomain;
3682  }
3683
3684  /**
3685   * Returns the backend associated to this domain.
3686   * @return The associated backend.
3687   */
3688  private Backend<?> getBackend()
3689  {
3690    return DirectoryServer.getBackend(getBaseDN());
3691  }
3692
3693  /*
3694   * <<Total Update
3695   */
3696
3697  /**
3698   * Push the schema modifications contained in the given parameter as a
3699   * modification that would happen on a local server. The modifications are not
3700   * applied to the local schema backend and historical information is not
3701   * updated; but a CSN is generated and the ServerState associated to the
3702   * schema domain is updated.
3703   *
3704   * @param modifications
3705   *          The schema modifications to push
3706   */
3707  void synchronizeSchemaModifications(List<Modification> modifications)
3708  {
3709    ModifyOperation op = new ModifyOperationBasis(
3710        conn, nextOperationID(), nextMessageID(), null,
3711        DirectoryServer.getSchemaDN(), modifications);
3712
3713    final Entry schema;
3714    try
3715    {
3716      schema = DirectoryServer.getEntry(DirectoryServer.getSchemaDN());
3717    }
3718    catch (DirectoryException e)
3719    {
3720      logger.traceException(e);
3721      logger.error(ERR_BACKEND_SEARCH_ENTRY.get(DirectoryServer.getSchemaDN().toString(),
3722              stackTraceToSingleLineString(e)));
3723      return;
3724    }
3725
3726    LocalBackendModifyOperation localOp = new LocalBackendModifyOperation(op);
3727    CSN csn = generateCSN(localOp);
3728    OperationContext ctx = new ModifyContext(csn, getEntryUUID(schema));
3729    localOp.setAttachment(SYNCHROCONTEXT, ctx);
3730    localOp.setResultCode(ResultCode.SUCCESS);
3731    synchronize(localOp);
3732  }
3733
3734  /**
3735   * Check if the provided configuration is acceptable for add.
3736   *
3737   * @param configuration The configuration to check.
3738   * @param unacceptableReasons When the configuration is not acceptable, this
3739   *                            table is use to return the reasons why this
3740   *                            configuration is not acceptable.
3741   *
3742   * @return true if the configuration is acceptable, false other wise.
3743   */
3744  static boolean isConfigurationAcceptable(ReplicationDomainCfg configuration,
3745      List<LocalizableMessage> unacceptableReasons)
3746  {
3747    // Check that there is not already a domain with the same DN
3748    final DN dn = configuration.getBaseDN();
3749    LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
3750    if (domain != null && domain.getBaseDN().equals(dn))
3751    {
3752      unacceptableReasons.add(ERR_SYNC_INVALID_DN.get());
3753      return false;
3754    }
3755
3756    // Check that the base DN is configured as a base-dn of the directory server
3757    if (DirectoryServer.getBackend(dn) == null)
3758    {
3759      unacceptableReasons.add(ERR_UNKNOWN_DN.get(dn));
3760      return false;
3761    }
3762
3763    // Check fractional configuration
3764    try
3765    {
3766      isFractionalConfigAcceptable(configuration);
3767    } catch (ConfigException e)
3768    {
3769      unacceptableReasons.add(e.getMessageObject());
3770      return false;
3771    }
3772
3773    return true;
3774  }
3775
3776  @Override
3777  public ConfigChangeResult applyConfigurationChange(
3778         ReplicationDomainCfg configuration)
3779  {
3780    this.config = configuration;
3781    changeConfig(configuration);
3782
3783    // Read assured + fractional configuration and each time reconnect if needed
3784    readAssuredConfig(configuration, true);
3785    readFractionalConfig(configuration, true);
3786
3787    solveConflictFlag = isSolveConflict(configuration);
3788
3789    final ConfigChangeResult ccr = new ConfigChangeResult();
3790    try
3791    {
3792      storeECLConfiguration(configuration);
3793    }
3794    catch(Exception e)
3795    {
3796      ccr.setResultCode(ResultCode.OTHER);
3797    }
3798    return ccr;
3799  }
3800
3801  @Override
3802  public boolean isConfigurationChangeAcceptable(
3803         ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons)
3804  {
3805    // Check that a import/export is not in progress
3806    if (ieRunning())
3807    {
3808      unacceptableReasons.add(
3809          NOTE_ERR_CANNOT_CHANGE_CONFIG_DURING_TOTAL_UPDATE.get());
3810      return false;
3811    }
3812
3813    // Check fractional configuration
3814    try
3815    {
3816      isFractionalConfigAcceptable(configuration);
3817      return true;
3818    }
3819    catch (ConfigException e)
3820    {
3821      unacceptableReasons.add(e.getMessageObject());
3822      return false;
3823    }
3824  }
3825
3826  @Override
3827  public Map<String, String> getAlerts()
3828  {
3829    Map<String, String> alerts = new LinkedHashMap<>();
3830
3831    alerts.put(ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT,
3832               ALERT_DESCRIPTION_REPLICATION_UNRESOLVED_CONFLICT);
3833    return alerts;
3834  }
3835
3836  @Override
3837  public String getClassName()
3838  {
3839    return CLASS_NAME;
3840  }
3841
3842  @Override
3843  public DN getComponentEntryDN()
3844  {
3845    return config.dn();
3846  }
3847
3848  /** Starts the Replication Domain. */
3849  public void start()
3850  {
3851    // Create the ServerStateFlush thread
3852    flushThread.start();
3853
3854    startListenService();
3855  }
3856
3857  /** Remove the configuration of the external changelog from this domain configuration. */
3858  private void removeECLDomainCfg()
3859  {
3860    try
3861    {
3862      DN eclConfigEntryDN = DN.valueOf("cn=external changeLog," + config.dn());
3863      if (DirectoryServer.getConfigurationHandler().hasEntry(eclConfigEntryDN))
3864      {
3865        DirectoryServer.getConfigurationHandler().deleteEntry(eclConfigEntryDN);
3866      }
3867    }
3868    catch(Exception e)
3869    {
3870      logger.traceException(e);
3871      logger.error(ERR_CHECK_CREATE_REPL_BACKEND_FAILED, stackTraceToSingleLineString(e));
3872    }
3873  }
3874
3875  /**
3876   * Store the provided ECL configuration for the domain.
3877   * @param  domCfg       The provided configuration.
3878   * @throws ConfigException When an error occurred.
3879   */
3880  private void storeECLConfiguration(ReplicationDomainCfg domCfg)
3881      throws ConfigException
3882  {
3883    ExternalChangelogDomainCfg eclDomCfg = null;
3884    // create the ecl config if it does not exist
3885    // There may not be any config entry related to this domain in some
3886    // unit test cases
3887    try
3888    {
3889      DN configDn = config.dn();
3890      ConfigurationHandler configHandler = DirectoryServer.getConfigurationHandler();
3891      if (configHandler.hasEntry(config.dn()))
3892      {
3893        try
3894        { eclDomCfg = domCfg.getExternalChangelogDomain();
3895        } catch(Exception e) { /* do nothing */ }
3896        // domain with no config entry only when running unit tests
3897        if (eclDomCfg == null)
3898        {
3899          // no ECL config provided hence create a default one
3900          // create the default one
3901          DN eclConfigEntryDN = DN.valueOf("cn=external changelog," + configDn);
3902          if (!configHandler.hasEntry(eclConfigEntryDN))
3903          {
3904            // no entry exist yet for the ECL config for this domain
3905            // create it
3906            String ldif = makeLdif(
3907                "dn: cn=external changelog," + configDn,
3908                "objectClass: top",
3909                "objectClass: ds-cfg-external-changelog-domain",
3910                "cn: external changelog",
3911                "ds-cfg-enabled: " + !getBackend().isPrivateBackend());
3912            LDIFImportConfig ldifImportConfig = new LDIFImportConfig(
3913                new StringReader(ldif));
3914            // No need to validate schema in replication
3915            ldifImportConfig.setValidateSchema(false);
3916            LDIFReader reader = new LDIFReader(ldifImportConfig);
3917            Entry eclEntry = reader.readEntry();
3918            configHandler.addEntry(Converters.from(eclEntry));
3919            ldifImportConfig.close();
3920          }
3921        }
3922      }
3923      eclDomCfg = domCfg.getExternalChangelogDomain();
3924      if (eclDomain != null)
3925      {
3926        eclDomain.applyConfigurationChange(eclDomCfg);
3927      }
3928      else
3929      {
3930        // Create the ECL domain object
3931        eclDomain = new ExternalChangelogDomain(this, eclDomCfg);
3932      }
3933    }
3934    catch (Exception e)
3935    {
3936      throw new ConfigException(NOTE_ERR_UNABLE_TO_ENABLE_ECL.get(
3937          "Replication Domain on " + getBaseDN(), stackTraceToSingleLineString(e)), e);
3938    }
3939  }
3940
3941  private static String makeLdif(String... lines)
3942  {
3943    final StringBuilder buffer = new StringBuilder();
3944    for (String line : lines) {
3945      buffer.append(line).append(EOL);
3946    }
3947    // Append an extra line so we can append LDIF Strings.
3948    buffer.append(EOL);
3949    return buffer.toString();
3950  }
3951
3952  @Override
3953  public void sessionInitiated(ServerStatus initStatus, ServerState rsState)
3954  {
3955    // Check domain fractional configuration consistency with local
3956    // configuration variables
3957    forceBadDataSet = !isBackendFractionalConfigConsistent();
3958
3959    super.sessionInitiated(initStatus, rsState);
3960
3961    // Now for bad data set status if needed
3962    if (forceBadDataSet)
3963    {
3964      signalNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
3965      logger.info(NOTE_FRACTIONAL_BAD_DATA_SET_NEED_RESYNC, getBaseDN());
3966      return; // Do not send changes to the replication server
3967    }
3968
3969    try
3970    {
3971      /*
3972       * We must not publish changes to a replicationServer that has
3973       * not seen all our previous changes because this could cause
3974       * some other ldap servers to miss those changes.
3975       * Check that the ReplicationServer has seen all our previous
3976       * changes.
3977       */
3978      CSN replServerMaxCSN = rsState.getCSN(getServerId());
3979
3980      // we don't want to update from here (a DS) an empty RS because
3981      // normally the RS should have been updated by other RSes except for
3982      // very last changes lost if the local connection was broken
3983      // ... hence the RS we are connected to should not be empty
3984      // ... or if it is empty, it is due to a voluntary reset
3985      // and we don't want to update it with our changes that could be huge.
3986      if (replServerMaxCSN != null && replServerMaxCSN.getSeqnum() != 0)
3987      {
3988        CSN ourMaxCSN = state.getMaxCSN(getServerId());
3989        if (ourMaxCSN != null
3990            && !ourMaxCSN.isOlderThanOrEqualTo(replServerMaxCSN))
3991        {
3992          pendingChanges.setRecovering(true);
3993          broker.setRecoveryRequired(true);
3994          final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN);
3995          if (this.rsUpdater.compareAndSet(null, rsUpdater))
3996          {
3997            rsUpdater.start();
3998          }
3999        }
4000      }
4001    } catch (Exception e)
4002    {
4003      logger.error(ERR_PUBLISHING_FAKE_OPS, getBaseDN(), stackTraceToSingleLineString(e));
4004    }
4005  }
4006
4007  /**
4008   * Build the list of changes that have been processed by this server after the
4009   * CSN given as a parameter and publish them using the given session.
4010   *
4011   * @param startCSN
4012   *          The CSN where we need to start the search
4013   * @param session
4014   *          The session to use to publish the changes
4015   * @return A boolean indicating he success of the operation.
4016   * @throws Exception
4017   *           if an Exception happens during the search.
4018   */
4019  boolean buildAndPublishMissingChanges(CSN startCSN, ReplicationBroker session)
4020      throws Exception
4021  {
4022    // Trim the changes in replayOperations that are older than the startCSN.
4023    synchronized (replayOperations)
4024    {
4025      Iterator<CSN> it = replayOperations.keySet().iterator();
4026      while (it.hasNext())
4027      {
4028        if (shutdown.get())
4029        {
4030          return false;
4031        }
4032        if (it.next().isNewerThan(startCSN))
4033        {
4034          break;
4035        }
4036        it.remove();
4037      }
4038    }
4039
4040    CSN lastRetrievedChange;
4041    InternalSearchOperation op;
4042    CSN currentStartCSN = startCSN;
4043    do
4044    {
4045      if (shutdown.get())
4046      {
4047        return false;
4048      }
4049
4050      lastRetrievedChange = null;
4051      // We can't do the search in one go because we need to store the results
4052      // so that we are sure we send the operations in order and because the
4053      // list might be large.
4054      // So we search by interval of 10 seconds and store the results in the
4055      // replayOperations list so that they are sorted before sending them.
4056      long missingChangesDelta = currentStartCSN.getTime() + 10000;
4057      CSN endCSN = new CSN(missingChangesDelta, Integer.MAX_VALUE, getServerId());
4058
4059      ScanSearchListener listener =
4060        new ScanSearchListener(currentStartCSN, endCSN);
4061      op = searchForChangedEntries(getBaseDN(), currentStartCSN, endCSN,
4062              listener);
4063
4064      // Publish and remove all the changes from the replayOperations list
4065      // that are older than the endCSN.
4066      final List<FakeOperation> opsToSend = new LinkedList<>();
4067      synchronized (replayOperations)
4068      {
4069        Iterator<FakeOperation> itOp = replayOperations.values().iterator();
4070        while (itOp.hasNext())
4071        {
4072          if (shutdown.get())
4073          {
4074            return false;
4075          }
4076          FakeOperation fakeOp = itOp.next();
4077          if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check
4078              || !state.cover(fakeOp.getCSN())
4079              // do not look for replay operations in the future
4080              || currentStartCSN.isNewerThan(now()))
4081          {
4082            break;
4083          }
4084
4085          lastRetrievedChange = fakeOp.getCSN();
4086          opsToSend.add(fakeOp);
4087          itOp.remove();
4088        }
4089      }
4090
4091      for (FakeOperation opToSend : opsToSend)
4092      {
4093        if (shutdown.get())
4094        {
4095          return false;
4096        }
4097        session.publishRecovery(opToSend.generateMessage());
4098      }
4099
4100      if (lastRetrievedChange != null)
4101      {
4102        if (logger.isDebugEnabled())
4103        {
4104          logger.debug(LocalizableMessage.raw("publish loop"
4105                  + " >=" + currentStartCSN + " <=" + endCSN
4106                  + " nentries=" + op.getEntriesSent()
4107                  + " result=" + op.getResultCode()
4108                  + " lastRetrievedChange=" + lastRetrievedChange));
4109        }
4110        currentStartCSN = lastRetrievedChange;
4111      }
4112      else
4113      {
4114        if (logger.isDebugEnabled())
4115        {
4116          logger.debug(LocalizableMessage.raw("publish loop"
4117                  + " >=" + currentStartCSN + " <=" + endCSN
4118                  + " nentries=" + op.getEntriesSent()
4119                  + " result=" + op.getResultCode()
4120                  + " no changes"));
4121        }
4122        currentStartCSN = endCSN;
4123      }
4124    } while (pendingChanges.recoveryUntil(currentStartCSN)
4125          && op.getResultCode().equals(ResultCode.SUCCESS));
4126
4127    return op.getResultCode().equals(ResultCode.SUCCESS);
4128  }
4129
4130  private static CSN now()
4131  {
4132    // ensure now() will always come last with isNewerThan() test,
4133    // even though the timestamp, or the timestamp and seqnum would be the same
4134    return new CSN(TimeThread.getTime(), Integer.MAX_VALUE, Integer.MAX_VALUE);
4135  }
4136
4137  /**
4138   * Search for the changes that happened since fromCSN based on the historical
4139   * attribute. The only changes that will be send will be the one generated on
4140   * the serverId provided in fromCSN.
4141   *
4142   * @param baseDN
4143   *          the base DN
4144   * @param startCsnInclusive
4145   *          The CSN from which we want the changes
4146   * @param endCsnInclusive
4147   *          The max CSN that the search should return
4148   * @param resultListener
4149   *          The listener that will process the entries returned
4150   * @return the internal search operation
4151   * @throws Exception
4152   *           when raised.
4153   */
4154  private static InternalSearchOperation searchForChangedEntries(DN baseDN,
4155      CSN startCsnInclusive, CSN endCsnInclusive, InternalSearchListener resultListener)
4156      throws Exception
4157  {
4158    final String assertionValue = endCsnInclusive == null
4159        ? ">= " + startCsnInclusive
4160        : ">= " + startCsnInclusive + ", <= " + endCsnInclusive;
4161    final SearchFilter filter =
4162        SearchFilter.createExtensibleMatchFilter(AttributeDescription.valueOf(HISTORICAL_ATTRIBUTE_NAME)
4163                                                     .getAttributeType(),
4164                                                 ByteString.valueOfUtf8(assertionValue),
4165                                                 EXTMR_HISTORICAL_CSN_RANGE_OID, false);
4166    SearchRequest request = Requests.newSearchRequest(baseDN, SearchScope.WHOLE_SUBTREE, filter)
4167        .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
4168    return getRootConnection().processSearch(request, resultListener);
4169  }
4170
4171  /**
4172   * Search for the changes that happened since fromCSN based on the historical
4173   * attribute. The only changes that will be send will be the one generated on
4174   * the serverId provided in fromCSN.
4175   *
4176   * @param baseDN
4177   *          the base DN
4178   * @param fromCSN
4179   *          The CSN from which we want the changes
4180   * @param resultListener
4181   *          that will process the entries returned.
4182   * @return the internal search operation
4183   * @throws Exception
4184   *           when raised.
4185   */
4186  static InternalSearchOperation searchForChangedEntries(DN baseDN,
4187      CSN fromCSN, InternalSearchListener resultListener) throws Exception
4188  {
4189    return searchForChangedEntries(baseDN, fromCSN, null, resultListener);
4190  }
4191
4192  /**
4193   * This method should return the total number of objects in the
4194   * replicated domain.
4195   * This count will be used for reporting.
4196   *
4197   * @throws DirectoryException when needed.
4198   *
4199   * @return The number of objects in the replication domain.
4200   */
4201  @Override
4202  public long countEntries() throws DirectoryException
4203  {
4204    Backend<?> backend = getBackend();
4205    if (!backend.supports(BackendOperation.LDIF_EXPORT))
4206    {
4207      LocalizableMessage msg = ERR_INIT_EXPORT_NOT_SUPPORTED.get(backend.getBackendID());
4208      logger.error(msg);
4209      throw new DirectoryException(ResultCode.OTHER, msg);
4210    }
4211
4212    return backend.getNumberOfEntriesInBaseDN(getBaseDN());
4213  }
4214
4215  @Override
4216  public boolean processUpdate(UpdateMsg updateMsg)
4217  {
4218    // Ignore message if fractional configuration is inconsistent and
4219    // we have been passed into bad data set status
4220    if (forceBadDataSet)
4221    {
4222      return false;
4223    }
4224
4225    if (updateMsg instanceof LDAPUpdateMsg)
4226    {
4227      LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg;
4228
4229      // Put the UpdateMsg in the RemotePendingChanges list.
4230      if (!remotePendingChanges.putRemoteUpdate(msg))
4231      {
4232        /*
4233         * Already received this change so ignore it. This may happen if there
4234         * are uncommitted changes in the queue and session failover occurs
4235         * causing a recovery of all changes since the current committed server
4236         * state. See OPENDJ-1115.
4237         */
4238        if (logger.isTraceEnabled())
4239        {
4240          logger.trace(
4241                  "LDAPReplicationDomain.processUpdate: ignoring "
4242                  + "duplicate change %s", msg.getCSN());
4243        }
4244        return true;
4245      }
4246
4247      // Put update message into the replay queue
4248      // (block until some place in the queue is available)
4249      final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
4250      while (!isListenerShuttingDown())
4251      {
4252        // loop until we can offer to the queue or shutdown was initiated
4253        try
4254        {
4255          if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS))
4256          {
4257            // successful offer to the queue, let's exit the loop
4258            break;
4259          }
4260        }
4261        catch (InterruptedException e)
4262        {
4263          // Thread interrupted: check for shutdown.
4264          Thread.currentThread().interrupt();
4265        }
4266      }
4267
4268      return false;
4269    }
4270
4271    // unknown message type, this should not happen, just ignore it.
4272    return true;
4273  }
4274
4275  @Override
4276  public void addAdditionalMonitoring(MonitorData attributes)
4277  {
4278    attributes.add("pending-updates", pendingChanges.size());
4279    attributes.add("replayed-updates-ok", numReplayedPostOpCalled);
4280    attributes.add("resolved-modify-conflicts", numResolvedModifyConflicts);
4281    attributes.add("resolved-naming-conflicts", numResolvedNamingConflicts);
4282    attributes.add("unresolved-naming-conflicts", numUnresolvedNamingConflicts);
4283    attributes.add("remote-pending-changes-size", remotePendingChanges.getQueueSize());
4284    attributes.add("dependent-changes-size", remotePendingChanges.getDependentChangesSize());
4285    attributes.add("changes-in-progress-size", remotePendingChanges.changesInProgressSize());
4286  }
4287
4288  /**
4289   * Verifies that the given string represents a valid source
4290   * from which this server can be initialized.
4291   * @param sourceString The string representing the source
4292   * @return The source as a integer value
4293   * @throws DirectoryException if the string is not valid
4294   */
4295  public int decodeSource(String sourceString) throws DirectoryException
4296  {
4297    int source = 0;
4298    try
4299    {
4300      source = Integer.decode(sourceString);
4301      if (source >= -1 && source != getServerId())
4302      {
4303        // TODO Verifies serverID is in the domain
4304        // We should check here that this is a server implied
4305        // in the current domain.
4306        return source;
4307      }
4308    }
4309    catch (Exception e)
4310    {
4311      LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get(
4312          getBaseDN(), getServerId(), sourceString, stackTraceToSingleLineString(e));
4313      throw new DirectoryException(ResultCode.OTHER, message, e);
4314    }
4315
4316    LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get(getBaseDN(), getServerId(), source, "");
4317    throw new DirectoryException(ResultCode.OTHER, message);
4318  }
4319
4320  /**
4321   * Called by synchronize post op plugin in order to add the entry historical
4322   * attributes to the UpdateMsg.
4323   * @param msg an replication update message
4324   * @param op  the operation in progress
4325   */
4326  private void addEntryAttributesForCL(UpdateMsg msg,
4327      PostOperationOperation op)
4328  {
4329    if (op instanceof PostOperationDeleteOperation)
4330    {
4331      PostOperationDeleteOperation delOp = (PostOperationDeleteOperation) op;
4332      final Set<String> names = getEclIncludesForDeletes();
4333      Entry entry = delOp.getEntryToDelete();
4334      final DeleteMsg deleteMsg = (DeleteMsg) msg;
4335      deleteMsg.setEclIncludes(getIncludedAttributes(entry, names));
4336
4337      // For delete only, add the Authorized DN since it's required in the
4338      // ECL entry but is not part of rest of the message.
4339      DN deleterDN = delOp.getAuthorizationDN();
4340      if (deleterDN != null)
4341      {
4342        deleteMsg.setInitiatorsName(deleterDN.toString());
4343      }
4344    }
4345    else if (op instanceof PostOperationModifyOperation)
4346    {
4347      PostOperationModifyOperation modOp = (PostOperationModifyOperation) op;
4348      Set<String> names = getEclIncludes();
4349      Entry entry = modOp.getCurrentEntry();
4350      ((ModifyMsg) msg).setEclIncludes(getIncludedAttributes(entry, names));
4351    }
4352    else if (op instanceof PostOperationModifyDNOperation)
4353    {
4354      PostOperationModifyDNOperation modDNOp =
4355        (PostOperationModifyDNOperation) op;
4356      Set<String> names = getEclIncludes();
4357      Entry entry = modDNOp.getOriginalEntry();
4358      ((ModifyDNMsg) msg).setEclIncludes(getIncludedAttributes(entry, names));
4359    }
4360    else if (op instanceof PostOperationAddOperation)
4361    {
4362      PostOperationAddOperation addOp = (PostOperationAddOperation) op;
4363      Set<String> names = getEclIncludes();
4364      Entry entry = addOp.getEntryToAdd();
4365      ((AddMsg) msg).setEclIncludes(getIncludedAttributes(entry, names));
4366    }
4367  }
4368
4369  private Collection<Attribute> getIncludedAttributes(Entry entry,
4370      Set<String> names)
4371  {
4372    if (names.isEmpty())
4373    {
4374      // Fast-path.
4375      return Collections.emptySet();
4376    }
4377    else if (names.size() == 1 && names.contains("*"))
4378    {
4379      // Potential fast-path for delete operations.
4380      List<Attribute> attributes = new LinkedList<>();
4381      for (List<Attribute> attributeList : entry.getUserAttributes().values())
4382      {
4383        attributes.addAll(attributeList);
4384      }
4385      Attribute objectClassAttribute = entry.getObjectClassAttribute();
4386      if (objectClassAttribute != null)
4387      {
4388        attributes.add(objectClassAttribute);
4389      }
4390      return attributes;
4391    }
4392    else
4393    {
4394      // Expand @objectclass references in attribute list if needed.
4395      // We do this now in order to take into account dynamic schema changes.
4396      final Set<String> expandedNames = getExpandedNames(names);
4397      final Entry filteredEntry =
4398          entry.filterEntry(expandedNames, false, false, false);
4399      return filteredEntry.getAttributes();
4400    }
4401  }
4402
4403  private Set<String> getExpandedNames(Set<String> names)
4404  {
4405    // Only rebuild the attribute set if necessary.
4406    if (!needsExpanding(names))
4407    {
4408      return names;
4409    }
4410
4411    final Set<String> expandedNames = new HashSet<>(names.size());
4412    for (String name : names)
4413    {
4414      if (name.startsWith("@"))
4415      {
4416        String ocName = name.substring(1);
4417        ObjectClass objectClass = DirectoryServer.getSchema().getObjectClass(ocName);
4418        if (!objectClass.isPlaceHolder())
4419        {
4420          for (AttributeType at : objectClass.getRequiredAttributes())
4421          {
4422            expandedNames.add(at.getNameOrOID());
4423          }
4424          for (AttributeType at : objectClass.getOptionalAttributes())
4425          {
4426            expandedNames.add(at.getNameOrOID());
4427          }
4428        }
4429      }
4430      else
4431      {
4432        expandedNames.add(name);
4433      }
4434    }
4435    return expandedNames;
4436  }
4437
4438  private boolean needsExpanding(Set<String> names)
4439  {
4440    for (String name : names)
4441    {
4442      if (name.startsWith("@"))
4443      {
4444        return true;
4445      }
4446    }
4447    return false;
4448  }
4449
4450  /**
4451   * Gets the fractional configuration of this domain.
4452   * @return The fractional configuration of this domain.
4453   */
4454  FractionalConfig getFractionalConfig()
4455  {
4456    return fractionalConfig;
4457  }
4458
4459  /**
4460   * This bean is a utility class used for holding the parsing
4461   * result of a fractional configuration. It also contains some facility
4462   * methods like fractional configuration comparison...
4463   */
4464  static class FractionalConfig
4465  {
4466    /**
4467     * Tells if fractional replication is enabled or not (some fractional
4468     * constraints have been put in place). If this is true then
4469     * fractionalExclusive explains the configuration mode and either
4470     * fractionalSpecificClassesAttributes or fractionalAllClassesAttributes or
4471     * both should be filled with something.
4472     */
4473    private boolean fractional;
4474
4475    /**
4476     * - If true, tells that the configured fractional replication is exclusive:
4477     * Every attributes contained in fractionalSpecificClassesAttributes and
4478     * fractionalAllClassesAttributes should be ignored when replaying operation
4479     * in local backend.
4480     * - If false, tells that the configured fractional replication is
4481     * inclusive:
4482     * Only attributes contained in fractionalSpecificClassesAttributes and
4483     * fractionalAllClassesAttributes should be taken into account in local
4484     * backend.
4485     */
4486    private boolean fractionalExclusive = true;
4487
4488    /**
4489     * Used in fractional replication: holds attributes of a specific object class.
4490     * - key = object class (name or OID of the class)
4491     * - value = the attributes of that class that should be taken into account
4492     * (inclusive or exclusive fractional replication) (name or OID of the
4493     * attribute)
4494     * When an operation coming from the network is to be locally replayed, if
4495     * the concerned entry has an objectClass attribute equals to 'key':
4496     * - inclusive mode: only the attributes in 'value' will be added/deleted/modified
4497     * - exclusive mode: the attributes in 'value' will not be added/deleted/modified
4498     */
4499    private Map<String, Set<String>> fractionalSpecificClassesAttributes = new HashMap<>();
4500
4501    /**
4502     * Used in fractional replication: holds attributes of any object class.
4503     * When an operation coming from the network is to be locally replayed:
4504     * - inclusive mode: only attributes of the matching entry not present in
4505     * fractionalAllClassesAttributes will be added/deleted/modified
4506     * - exclusive mode: attributes of the matching entry present in
4507     * fractionalAllClassesAttributes will not be added/deleted/modified
4508     * The attributes may be in human readable form of OID form.
4509     */
4510    private Set<String> fractionalAllClassesAttributes = new HashSet<>();
4511
4512    /** Base DN the fractional configuration is for. */
4513    private final DN baseDN;
4514
4515    /**
4516     * Constructs a new fractional configuration object.
4517     * @param baseDN The base DN the object is for.
4518     */
4519    private FractionalConfig(DN baseDN)
4520    {
4521      this.baseDN = baseDN;
4522    }
4523
4524    /**
4525     * Getter for fractional.
4526     * @return True if the configuration has fractional enabled
4527     */
4528    boolean isFractional()
4529    {
4530      return fractional;
4531    }
4532
4533    /**
4534     * Set the fractional parameter.
4535     * @param fractional The fractional parameter
4536     */
4537    private void setFractional(boolean fractional)
4538    {
4539      this.fractional = fractional;
4540    }
4541
4542    /**
4543     * Getter for fractionalExclusive.
4544     * @return True if the configuration has fractional exclusive enabled
4545     */
4546    boolean isFractionalExclusive()
4547    {
4548      return fractionalExclusive;
4549    }
4550
4551    /**
4552     * Set the fractionalExclusive parameter.
4553     * @param fractionalExclusive The fractionalExclusive parameter
4554     */
4555    private void setFractionalExclusive(boolean fractionalExclusive)
4556    {
4557      this.fractionalExclusive = fractionalExclusive;
4558    }
4559
4560    /**
4561     * Getter for fractionalSpecificClassesAttributes attribute.
4562     * @return The fractionalSpecificClassesAttributes attribute.
4563     */
4564    Map<String, Set<String>> getFractionalSpecificClassesAttributes()
4565    {
4566      return fractionalSpecificClassesAttributes;
4567    }
4568
4569    /**
4570     * Set the fractionalSpecificClassesAttributes parameter.
4571     * @param fractionalSpecificClassesAttributes The
4572     * fractionalSpecificClassesAttributes parameter to set.
4573     */
4574    private void setFractionalSpecificClassesAttributes(
4575        Map<String, Set<String>> fractionalSpecificClassesAttributes)
4576    {
4577      this.fractionalSpecificClassesAttributes =
4578        fractionalSpecificClassesAttributes;
4579    }
4580
4581    /**
4582     * Getter for fractionalSpecificClassesAttributes attribute.
4583     * @return The fractionalSpecificClassesAttributes attribute.
4584     */
4585    Set<String> getFractionalAllClassesAttributes()
4586    {
4587      return fractionalAllClassesAttributes;
4588    }
4589
4590    /**
4591     * Set the fractionalAllClassesAttributes parameter.
4592     * @param fractionalAllClassesAttributes The
4593     * fractionalSpecificClassesAttributes parameter to set.
4594     */
4595    private void setFractionalAllClassesAttributes(
4596        Set<String> fractionalAllClassesAttributes)
4597    {
4598      this.fractionalAllClassesAttributes = fractionalAllClassesAttributes;
4599    }
4600
4601    /**
4602     * Getter for the base baseDN.
4603     * @return The baseDN attribute.
4604     */
4605    DN getBaseDn()
4606    {
4607      return baseDN;
4608    }
4609
4610    /**
4611     * Extract the fractional configuration from the passed domain configuration
4612     * entry.
4613     * @param configuration The configuration object
4614     * @return The fractional replication configuration.
4615     * @throws ConfigException If an error occurred.
4616     */
4617    static FractionalConfig toFractionalConfig(
4618      ReplicationDomainCfg configuration) throws ConfigException
4619    {
4620      // Prepare fractional configuration variables to parse
4621      Iterator<String> exclIt = configuration.getFractionalExclude().iterator();
4622      Iterator<String> inclIt = configuration.getFractionalInclude().iterator();
4623
4624      // Get potentially new fractional configuration
4625      Map<String, Set<String>> newFractionalSpecificClassesAttributes = new HashMap<>();
4626      Set<String> newFractionalAllClassesAttributes = new HashSet<>();
4627
4628      int newFractionalMode = parseFractionalConfig(exclIt, inclIt,
4629        newFractionalSpecificClassesAttributes,
4630        newFractionalAllClassesAttributes);
4631
4632      // Create matching parsed config object
4633      FractionalConfig result = new FractionalConfig(configuration.getBaseDN());
4634      switch (newFractionalMode)
4635      {
4636        case NOT_FRACTIONAL:
4637          result.setFractional(false);
4638          result.setFractionalExclusive(true);
4639          break;
4640        case EXCLUSIVE_FRACTIONAL:
4641        case INCLUSIVE_FRACTIONAL:
4642          result.setFractional(true);
4643          result.setFractionalExclusive(
4644              newFractionalMode == EXCLUSIVE_FRACTIONAL);
4645          break;
4646      }
4647      result.setFractionalSpecificClassesAttributes(
4648        newFractionalSpecificClassesAttributes);
4649      result.setFractionalAllClassesAttributes(
4650        newFractionalAllClassesAttributes);
4651      return result;
4652    }
4653
4654    /**
4655     * Parses a fractional replication configuration, filling the empty passed
4656     * variables and returning the used fractional mode. The 2 passed variables
4657     * to fill should be initialized (not null) and empty.
4658     * @param exclIt The list of fractional exclude configuration values (may be
4659     *               null)
4660     * @param inclIt The list of fractional include configuration values (may be
4661     *               null)
4662     * @param fractionalSpecificClassesAttributes An empty map to be filled with
4663     *        what is read from the fractional configuration properties.
4664     * @param fractionalAllClassesAttributes An empty list to be filled with
4665     *        what is read from the fractional configuration properties.
4666     * @return the fractional mode deduced from the passed configuration:
4667     *         not fractional, exclusive fractional or inclusive fractional
4668     *         modes
4669     */
4670    private static int parseFractionalConfig(
4671      Iterator<?> exclIt, Iterator<?> inclIt,
4672      Map<String, Set<String>> fractionalSpecificClassesAttributes,
4673      Set<String> fractionalAllClassesAttributes) throws ConfigException
4674    {
4675      // Determine if fractional-exclude or fractional-include property is used:
4676      // only one of them is allowed
4677      int fractionalMode;
4678      Iterator<?> iterator;
4679      if (exclIt != null && exclIt.hasNext())
4680      {
4681        if (inclIt != null && inclIt.hasNext())
4682        {
4683          throw new ConfigException(
4684            NOTE_ERR_FRACTIONAL_CONFIG_BOTH_MODES.get());
4685        }
4686
4687        fractionalMode = EXCLUSIVE_FRACTIONAL;
4688        iterator = exclIt;
4689      }
4690      else
4691      {
4692        if (inclIt != null && inclIt.hasNext())
4693        {
4694          fractionalMode = INCLUSIVE_FRACTIONAL;
4695          iterator = inclIt;
4696        }
4697        else
4698        {
4699          return NOT_FRACTIONAL;
4700        }
4701      }
4702
4703      while (iterator.hasNext())
4704      {
4705        // Parse a value with the form class:attr1,attr2...
4706        // or *:attr1,attr2...
4707        String fractCfgStr = iterator.next().toString();
4708        StringTokenizer st = new StringTokenizer(fractCfgStr, ":");
4709        int nTokens = st.countTokens();
4710        if (nTokens < 2)
4711        {
4712          throw new ConfigException(NOTE_ERR_FRACTIONAL_CONFIG_WRONG_FORMAT.get(fractCfgStr));
4713        }
4714        // Get the class name
4715        String classNameLower = st.nextToken().toLowerCase();
4716        boolean allClasses = "*".equals(classNameLower);
4717        // Get the attributes
4718        String attributes = st.nextToken();
4719        st = new StringTokenizer(attributes, ",");
4720        while (st.hasMoreTokens())
4721        {
4722          String attrNameLower = st.nextToken().toLowerCase();
4723          // Store attribute in the appropriate variable
4724          if (allClasses)
4725          {
4726            fractionalAllClassesAttributes.add(attrNameLower);
4727          }
4728          else
4729          {
4730            Set<String> attrList = fractionalSpecificClassesAttributes.get(classNameLower);
4731            if (attrList == null)
4732            {
4733              attrList = new LinkedHashSet<>();
4734              fractionalSpecificClassesAttributes.put(classNameLower, attrList);
4735            }
4736            attrList.add(attrNameLower);
4737          }
4738        }
4739      }
4740      return fractionalMode;
4741    }
4742
4743    /** Return type of the parseFractionalConfig method. */
4744    private static final int NOT_FRACTIONAL = 0;
4745    private static final int EXCLUSIVE_FRACTIONAL = 1;
4746    private static final int INCLUSIVE_FRACTIONAL = 2;
4747
4748    /**
4749     * Get an integer representation of the domain fractional configuration.
4750     * @return An integer representation of the domain fractional configuration.
4751     */
4752    private int fractionalConfigToInt()
4753    {
4754      if (!fractional)
4755      {
4756        return NOT_FRACTIONAL;
4757      }
4758      else if (fractionalExclusive)
4759      {
4760        return EXCLUSIVE_FRACTIONAL;
4761      }
4762      return INCLUSIVE_FRACTIONAL;
4763    }
4764
4765    /**
4766     * Compare 2 fractional replication configurations and returns true if they
4767     * are equivalent.
4768     * @param cfg1 First fractional configuration
4769     * @param cfg2 Second fractional configuration
4770     * @return True if both configurations are equivalent.
4771     * @throws ConfigException If some classes or attributes could not be
4772     * retrieved from the schema.
4773     */
4774    private static boolean isFractionalConfigEquivalent(FractionalConfig cfg1,
4775        FractionalConfig cfg2) throws ConfigException
4776    {
4777      // Compare base DNs just to be consistent
4778      if (!cfg1.getBaseDn().equals(cfg2.getBaseDn()))
4779      {
4780        return false;
4781      }
4782
4783      // Compare modes
4784      if (cfg1.isFractional() != cfg2.isFractional()
4785          || cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive())
4786      {
4787        return false;
4788      }
4789
4790      // Compare all classes attributes
4791      Set<String> allClassesAttrs1 = cfg1.getFractionalAllClassesAttributes();
4792      Set<String> allClassesAttrs2 = cfg2.getFractionalAllClassesAttributes();
4793      if (!areAttributesEquivalent(allClassesAttrs1, allClassesAttrs2))
4794      {
4795        return false;
4796      }
4797
4798      // Compare specific classes attributes
4799      Map<String, Set<String>> specificClassesAttrs1 =
4800          cfg1.getFractionalSpecificClassesAttributes();
4801      Map<String, Set<String>> specificClassesAttrs2 =
4802          cfg2.getFractionalSpecificClassesAttributes();
4803      if (specificClassesAttrs1.size() != specificClassesAttrs2.size())
4804      {
4805        return false;
4806      }
4807
4808      /*
4809       * Check consistency of specific classes attributes
4810       *
4811       * For each class in specificClassesAttributes1, check that the attribute
4812       * list is equivalent to specificClassesAttributes2 attribute list
4813       */
4814      Schema schema = DirectoryServer.getSchema();
4815      for (String className1 : specificClassesAttrs1.keySet())
4816      {
4817        // Get class from specificClassesAttributes1
4818        ObjectClass objectClass1 = schema.getObjectClass(className1);
4819        if (objectClass1.isPlaceHolder())
4820        {
4821          throw new ConfigException(
4822            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className1));
4823        }
4824
4825        // Look for matching one in specificClassesAttributes2
4826        boolean foundClass = false;
4827        for (String className2 : specificClassesAttrs2.keySet())
4828        {
4829          ObjectClass objectClass2 = schema.getObjectClass(className2);
4830          if (objectClass2.isPlaceHolder())
4831          {
4832            throw new ConfigException(
4833              NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className2));
4834          }
4835          if (objectClass1.equals(objectClass2))
4836          {
4837            foundClass = true;
4838            // Now compare the 2 attribute lists
4839            Set<String> attributes1 = specificClassesAttrs1.get(className1);
4840            Set<String> attributes2 = specificClassesAttrs2.get(className2);
4841            if (!areAttributesEquivalent(attributes1, attributes2))
4842            {
4843              return false;
4844            }
4845            break;
4846          }
4847        }
4848        // Found matching class ?
4849        if (!foundClass)
4850        {
4851          return false;
4852        }
4853      }
4854
4855      return true;
4856    }
4857  }
4858
4859  /**
4860   * Specifies whether this domain is enabled/disabled regarding the ECL.
4861   * @return enabled/disabled for the ECL.
4862   */
4863  boolean isECLEnabled()
4864  {
4865    return this.eclDomain.isEnabled();
4866  }
4867
4868  /**
4869   * Return the minimum time (in ms) that the domain keeps the historical
4870   * information necessary to solve conflicts.
4871   *
4872   * @return the purge delay.
4873   */
4874  long getHistoricalPurgeDelay()
4875  {
4876    return config.getConflictsHistoricalPurgeDelay() * 60 * 1000;
4877  }
4878
4879  /**
4880   * Check and purge the historical attribute on all eligible entries under this domain.
4881   *
4882   * The purging logic is the same applied to individual entries during modify operations. This
4883   * task may be useful in scenarios where a large number of changes are made as a one-off occurrence.
4884   * Running a purge-historical after the 'ds-cfg-conflicts-historical-purge-delay' period has elapsed
4885   * would clear out obsolete historical data from all the modified entries reducing the overall
4886   * database size.
4887   *
4888   * @param task
4889   *          the task raising this purge.
4890   * @param endDate
4891   *          the date to stop this task whether the job is done or not.
4892   * @throws DirectoryException
4893   *           when an exception happens.
4894   */
4895  public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task,
4896      long endDate) throws DirectoryException
4897  {
4898    logger.trace("[PURGE] purgeConflictsHistorical "
4899         + "on domain: " + getBaseDN()
4900         + "endDate:" + new Date(endDate)
4901         + "lastCSNPurgedFromHist: "
4902         + lastCSNPurgedFromHist.toStringUI());
4903
4904
4905    // It would be nice to have an upper bound on this filter to eliminate results that don't have a purgeable
4906    // csn in them. However, historicalCsnOrderingMatch keys start with serverid rather than timestamp so this
4907    // isn't possible.
4908    String filter = "(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + lastCSNPurgedFromHist + ")";
4909
4910    int count = 0;
4911    boolean finished = false;
4912    ByteString pagingCookie = null;
4913
4914    while(!finished)
4915    {
4916      if (task != null)
4917      {
4918        task.setProgressStats(lastCSNPurgedFromHist, count);
4919      }
4920
4921      finished = true;
4922
4923
4924      SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
4925          .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS)
4926          .addControl(new PagedResultsControl(false, ConfigConstants.DEFAULT_SIZE_LIMIT, pagingCookie))
4927          .setSizeLimit(ConfigConstants.DEFAULT_SIZE_LIMIT + 1);
4928
4929      InternalSearchOperation searchOp = conn.processSearch(request);
4930
4931      for (Control c : searchOp.getResponseControls())
4932      {
4933        if (c.getOID().equals(OID_PAGED_RESULTS_CONTROL))
4934        {
4935          ByteString newPagingCookie = ((PagedResultsControl)c).getCookie();
4936
4937          if( newPagingCookie != null &&
4938              newPagingCookie.length() > 0 &&
4939              !newPagingCookie.equals(pagingCookie))
4940          {
4941            pagingCookie = newPagingCookie;
4942            finished = false;
4943          }
4944        }
4945      }
4946
4947      for (SearchResultEntry entry : searchOp.getSearchEntries())
4948      {
4949        long maxTimeToRun = endDate - TimeThread.getTime();
4950        if (maxTimeToRun < 0) {
4951          throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED,
4952              LocalizableMessage.raw(" end date reached"));
4953        }
4954
4955        EntryHistorical entryHist = newInstanceFromEntry(entry);
4956
4957        CSN latestOldCSN = entryHist.getOldestCSN();
4958        entryHist.setPurgeDelay(getHistoricalPurgeDelay());
4959        Attribute attr = entryHist.encodeAndPurge();
4960
4961        if(entryHist.getLastPurgedValuesCount() > 0)
4962        {
4963          lastCSNPurgedFromHist = latestOldCSN;
4964          List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
4965          count += entryHist.getLastPurgedValuesCount();
4966          ModifyOperation newOp = new ModifyOperationBasis(
4967              conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
4968              entry.getName(), mods);
4969          runAsSynchronizedOperation(newOp);
4970
4971          if (newOp.getResultCode() != ResultCode.SUCCESS)
4972          {
4973            // Log information for the repair tool.
4974            logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, newOp, newOp.getResultCode());
4975          }
4976          else if (task != null)
4977          {
4978            task.setProgressStats(lastCSNPurgedFromHist, count);
4979          }
4980        }
4981      }
4982    }
4983    // If a full sweep was completed, the lastCSNPurgedFromHist must be reset so that the next
4984    // run-through starts from the beginning. Otherwise, subsequent runs of the task would only
4985    // pick up purgeable changes for the last server id.
4986    lastCSNPurgedFromHist = CSN.MIN_VALUE;
4987  }
4988}