001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2017 ForgeRock AS.
016 */
017package org.opends.server.replication.plugin;
018
019import static org.opends.messages.ReplicationMessages.*;
020import static org.opends.server.replication.plugin.EntryHistorical.HISTORICAL_ATTACHMENT_NAME;
021import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
022import static org.opends.server.util.ServerConstants.*;
023import static org.opends.server.util.StaticUtils.*;
024
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.concurrent.BlockingQueue;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.CopyOnWriteArraySet;
035import java.util.concurrent.LinkedBlockingQueue;
036import java.util.concurrent.atomic.AtomicReference;
037import java.util.concurrent.locks.ReentrantLock;
038
039import org.forgerock.i18n.LocalizableMessage;
040import org.forgerock.i18n.slf4j.LocalizedLogger;
041import org.forgerock.opendj.config.server.ConfigChangeResult;
042import org.forgerock.opendj.config.server.ConfigException;
043import org.forgerock.opendj.config.server.ConfigurationAddListener;
044import org.forgerock.opendj.config.server.ConfigurationChangeListener;
045import org.forgerock.opendj.config.server.ConfigurationDeleteListener;
046import org.forgerock.opendj.ldap.DN;
047import org.forgerock.opendj.ldap.ResultCode;
048import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
049import org.forgerock.opendj.server.config.server.ReplicationSynchronizationProviderCfg;
050import org.opends.server.api.Backend;
051import org.opends.server.api.BackupTaskListener;
052import org.opends.server.api.ExportTaskListener;
053import org.opends.server.api.ImportTaskListener;
054import org.opends.server.api.RestoreTaskListener;
055import org.opends.server.api.SynchronizationProvider;
056import org.opends.server.core.DirectoryServer;
057import org.opends.server.replication.service.DSRSShutdownSync;
058import org.opends.server.types.BackupConfig;
059import org.opends.server.types.Control;
060import org.opends.server.types.DirectoryException;
061import org.opends.server.types.Entry;
062import org.opends.server.types.HostPort;
063import org.opends.server.types.LDIFExportConfig;
064import org.opends.server.types.LDIFImportConfig;
065import org.opends.server.types.Modification;
066import org.opends.server.types.Operation;
067import org.opends.server.types.RestoreConfig;
068import org.opends.server.types.SynchronizationProviderResult;
069import org.opends.server.types.operation.PluginOperation;
070import org.opends.server.types.operation.PostOperationAddOperation;
071import org.opends.server.types.operation.PostOperationDeleteOperation;
072import org.opends.server.types.operation.PostOperationModifyDNOperation;
073import org.opends.server.types.operation.PostOperationModifyOperation;
074import org.opends.server.types.operation.PostOperationOperation;
075import org.opends.server.types.operation.PreOperationAddOperation;
076import org.opends.server.types.operation.PreOperationDeleteOperation;
077import org.opends.server.types.operation.PreOperationModifyDNOperation;
078import org.opends.server.types.operation.PreOperationModifyOperation;
079import org.opends.server.util.Platform;
080import org.opends.server.util.TimeThread;
081
/**
 * This class is used to load the replication code inside the JVM
 * and to trigger the initialization of replication.
 * <p>
 * It also extends the SynchronizationProvider class so that replication
 * code runs during operation processing: in the pre-operation,
 * conflict-resolution and post-operation phases.
 */
090public class MultimasterReplication
091       extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
092       implements ConfigurationAddListener<ReplicationDomainCfg>,
093                  ConfigurationDeleteListener<ReplicationDomainCfg>,
094                  ConfigurationChangeListener
095                  <ReplicationSynchronizationProviderCfg>,
096                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
097                  ExportTaskListener
098{
099  /** Enum that symbolizes the state of the multimaster replication. */
100  private enum State
101  {
102    STARTING, RUNNING, STOPPING
103  }
104
105  /**
106   * Keeps information on temporarily unreachable replication unreachableServers.
107   * RSes in the list will be deleted after an interval, depending on the number of configured replication
108   * unreachableServers, from the time the last server has been added. It guarantees the replication server, if any,
109   * will not timeout twice on the same RS while connecting to other RSes, blocking new incoming connections
110   * for too long (see {@link ReplicationServer#runConnect()} and how it updates {@code domainTicket}).
111   * A Data Server will use this information but not update it, since it can simply try the next in its list;
112   * once it is connected, it will recalculate the best server and reconnect if necessary.
113   */
114  public static class UnreachableReplicationServers
115  {
116    private final Collection<HostPort> unreachableServers = new CopyOnWriteArraySet<>();
117    private volatile long lastAddTime = TimeThread.getTime();
118    private volatile int maxLifetimeMS = getConnectionTimeoutMS() * 3; // myself, my peer and one client timeout
119
120    Collection<HostPort> getUnreachableServers()
121    {
122      if (!unreachableServers.isEmpty() && TimeThread.getTime() - lastAddTime > maxLifetimeMS)
123      {
124        unreachableServers.clear();
125      }
126      return unreachableServers;
127    }
128
129    void updateReplicationServersCount(int maxServers)
130    {
131      maxLifetimeMS = getConnectionTimeoutMS() * (maxServers + 1);
132    }
133
134    /**
135     * Add a server to the set of temporarily unreachable unreachableServers.
136     *
137     * @param server the server to add
138     */
139    public void addServer(HostPort server)
140    {
141      lastAddTime = TimeThread.getTime();
142      unreachableServers.add(server);
143    }
144
145    /**
146     * Returns {@code true} if the provided server is temporarily unreachable.
147     *
148     * @param server the server to check
149     * @return {@code true} if the provided server is temporarily unreachable
150     */
151    public boolean isUnreachable(HostPort server)
152    {
153      return getUnreachableServers().contains(server);
154    }
155  }
156
157  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
158
159  private ReplicationServerListener replicationServerListener;
160  private static final Map<DN, LDAPReplicationDomain> domains = new ConcurrentHashMap<>(4);
161  private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
162  /** The queue of received update messages, to be treated by the ReplayThread threads. */
163  private static final BlockingQueue<UpdateToReplay> updateToReplayQueue = new LinkedBlockingQueue<>(10000);
164  /** The list of ReplayThread threads. */
165  private static final List<ReplayThread> replayThreads = new ArrayList<>();
166  /** The configurable number of replay threads. */
167  private static int replayThreadNumber = 10;
168  /** Set of Replication Servers that could not be contacted. */
169  private static final UnreachableReplicationServers unreachableRSes = new UnreachableReplicationServers();
170
171  private static final AtomicReference<State> state = new AtomicReference<>(State.STARTING);
172
173  /** The configurable connection/handshake timeout. */
174  private static volatile int connectionTimeoutMS = 5000;
175
176  /**
177   * Finds the domain for a given DN.
178   *
179   * @param dn         The DN for which the domain must be returned.
180   * @param pluginOp   An optional operation for which the check is done.
181   *                   Can be null is the request has no associated operation.
182   * @return           The domain for this DN.
183   */
184  public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
185  {
186    /*
187     * Don't run the special replication code on Operation that are
188     * specifically marked as don't synchronize.
189     */
190    if (pluginOp instanceof Operation)
191    {
192        final Operation op = (Operation) pluginOp;
193        if (op.dontSynchronize())
194        {
195          return null;
196        }
197
198        /*
199         * Check if the provided operation is a repair operation and set the
200         * synchronization flags if necessary.
201         * The repair operations are tagged as synchronization operations so
202         * that the core server let the operation modify the entryuuid and
203         * ds-sync-hist attributes.
204         * They are also tagged as dontSynchronize so that the replication code
205         * running later do not generate CSN, solve conflicts and forward the
206         * operation to the replication server.
207         */
208        for (Iterator<Control> it = op.getRequestControls().iterator(); it.hasNext();)
209        {
210          Control c = it.next();
211          if (OID_REPLICATION_REPAIR_CONTROL.equals(c.getOID()))
212          {
213            op.setSynchronizationOperation(true);
214            op.setDontSynchronize(true);
215            /*
216            remove this control from the list of controls since it has now been
217            processed and the local backend will fail if it finds a control that
218            it does not know about and that is marked as critical.
219            */
220            it.remove();
221            return null;
222          }
223        }
224    }
225
226    LDAPReplicationDomain domain = null;
227    DN temp = dn;
228    while (domain == null && temp != null)
229    {
230      domain = domains.get(temp);
231      temp = DirectoryServer.getParentDNInSuffix(temp);
232    }
233
234    return domain;
235  }
236
237  /**
238   * Creates a new domain from its configEntry, do the
239   * necessary initialization and starts it so that it is
240   * fully operational when this method returns.
241   * @param configuration The entry with the configuration of this domain.
242   * @return The domain created.
243   * @throws ConfigException When the configuration is not valid.
244   */
245  public static LDAPReplicationDomain createNewDomain(
246      ReplicationDomainCfg configuration)
247      throws ConfigException
248  {
249    try
250    {
251      final LDAPReplicationDomain domain = new LDAPReplicationDomain(
252          configuration, updateToReplayQueue, dsrsShutdownSync);
253      if (domains.isEmpty())
254      {
255        // Create the threads that will process incoming update messages
256        createReplayThreads();
257      }
258
259      domains.put(domain.getBaseDN(), domain);
260      return domain;
261    }
262    catch (ConfigException e)
263    {
264      logger.error(ERR_COULD_NOT_START_REPLICATION, configuration.dn(),
265          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
266    }
267    return null;
268  }
269
270  /**
271   * Creates a new domain from its configEntry, do the necessary initialization
272   * and starts it so that it is fully operational when this method returns. It
273   * is only used for tests so far.
274   *
275   * @param configuration The entry with the configuration of this domain.
276   * @param queue         The BlockingQueue that this domain will use.
277   *
278   * @return              The domain created.
279   *
280   * @throws ConfigException When the configuration is not valid.
281   */
282  static LDAPReplicationDomain createNewDomain(
283      ReplicationDomainCfg configuration,
284      BlockingQueue<UpdateToReplay> queue)
285      throws ConfigException
286  {
287    final LDAPReplicationDomain domain =
288        new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
289    domains.put(domain.getBaseDN(), domain);
290    return domain;
291  }
292
293  /**
294   * Deletes a domain.
295   * @param dn : the base DN of the domain to delete.
296   */
297  public static void deleteDomain(DN dn)
298  {
299    LDAPReplicationDomain domain = domains.remove(dn);
300    if (domain != null)
301    {
302      domain.delete();
303    }
304
305    // No replay threads running if no replication need
306    if (domains.isEmpty()) {
307      stopReplayThreads();
308    }
309  }
310
311  @Override
312  public void initializeSynchronizationProvider(
313      ReplicationSynchronizationProviderCfg cfg) throws ConfigException
314  {
315    domains.clear();
316    replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
317
318    // Register as an add and delete listener with the root configuration so we
319    // can be notified if Multimaster domain entries are added or removed.
320    cfg.addReplicationDomainAddListener(this);
321    cfg.addReplicationDomainDeleteListener(this);
322
323    // Register as a root configuration listener so that we can be notified if
324    // number of replay threads is changed and apply changes.
325    cfg.addReplicationChangeListener(this);
326
327    replayThreadNumber = getNumberOfReplayThreadsOrDefault(cfg);
328    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
329
330    //  Create the list of domains that are already defined.
331    for (String name : cfg.listReplicationDomains())
332    {
333      createNewDomain(cfg.getReplicationDomain(name));
334    }
335
336    // If any schema changes were made with the server offline, then handle them now.
337    List<Modification> offlineSchemaChanges =
338         DirectoryServer.getOfflineSchemaChanges();
339    if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
340    {
341      processSchemaChange(offlineSchemaChanges);
342    }
343
344    DirectoryServer.registerBackupTaskListener(this);
345    DirectoryServer.registerRestoreTaskListener(this);
346    DirectoryServer.registerExportTaskListener(this);
347    DirectoryServer.registerImportTaskListener(this);
348
349    DirectoryServer.registerSupportedControl(
350        ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL);
351  }
352
353  private int getNumberOfReplayThreadsOrDefault(ReplicationSynchronizationProviderCfg cfg)
354  {
355    Integer value = cfg.getNumUpdateReplayThreads();
356    return value == null ? Platform.computeNumberOfThreads(16, 2.0f) : value;
357  }
358
359  /** Create the threads that will wait for incoming update messages. */
360  private static synchronized void createReplayThreads()
361  {
362    replayThreads.clear();
363
364    ReentrantLock switchQueueLock = new ReentrantLock();
365    for (int i = 0; i < replayThreadNumber; i++)
366    {
367      ReplayThread replayThread = new ReplayThread(updateToReplayQueue, switchQueueLock);
368      replayThread.start();
369      replayThreads.add(replayThread);
370    }
371  }
372
373  /** Stop the threads that are waiting for incoming update messages. */
374  private static synchronized void stopReplayThreads()
375  {
376    //  stop the replay threads
377    for (ReplayThread replayThread : replayThreads)
378    {
379      replayThread.shutdown();
380    }
381
382    for (ReplayThread replayThread : replayThreads)
383    {
384      try
385      {
386        replayThread.join();
387      }
388      catch(InterruptedException e)
389      {
390        Thread.currentThread().interrupt();
391      }
392    }
393    replayThreads.clear();
394  }
395
396  @Override
397  public boolean isConfigurationAddAcceptable(
398      ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons)
399  {
400    return LDAPReplicationDomain.isConfigurationAcceptable(
401      configuration, unacceptableReasons);
402  }
403
404  @Override
405  public ConfigChangeResult applyConfigurationAdd(
406     ReplicationDomainCfg configuration)
407  {
408    ConfigChangeResult ccr = new ConfigChangeResult();
409    try
410    {
411      LDAPReplicationDomain rd = createNewDomain(configuration);
412      if (State.RUNNING.equals(state.get()))
413      {
414        rd.start();
415        if (State.STOPPING.equals(state.get())) {
416          rd.shutdown();
417        }
418      }
419    } catch (ConfigException e)
420    {
421      // we should never get to this point because the configEntry has
422      // already been validated in isConfigurationAddAcceptable()
423      ccr.setResultCode(ResultCode.CONSTRAINT_VIOLATION);
424    }
425    return ccr;
426  }
427
428  @Override
429  public void doPostOperation(PostOperationAddOperation addOperation)
430  {
431    DN dn = addOperation.getEntryDN();
432    genericPostOperation(addOperation, dn);
433  }
434
435  @Override
436  public void doPostOperation(PostOperationDeleteOperation deleteOperation)
437  {
438    DN dn = deleteOperation.getEntryDN();
439    genericPostOperation(deleteOperation, dn);
440  }
441
442  @Override
443  public void doPostOperation(PostOperationModifyDNOperation modifyDNOperation)
444  {
445    DN dn = modifyDNOperation.getEntryDN();
446    genericPostOperation(modifyDNOperation, dn);
447  }
448
449  @Override
450  public void doPostOperation(PostOperationModifyOperation modifyOperation)
451  {
452    DN dn = modifyOperation.getEntryDN();
453    genericPostOperation(modifyOperation, dn);
454  }
455
456  @Override
457  public SynchronizationProviderResult handleConflictResolution(
458      PreOperationModifyOperation modifyOperation)
459  {
460    LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
461    if (domain != null)
462    {
463      return domain.handleConflictResolution(modifyOperation);
464    }
465    return new SynchronizationProviderResult.ContinueProcessing();
466  }
467
468  @Override
469  public SynchronizationProviderResult handleConflictResolution(
470      PreOperationAddOperation addOperation) throws DirectoryException
471  {
472    LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
473    if (domain != null)
474    {
475      return domain.handleConflictResolution(addOperation);
476    }
477    return new SynchronizationProviderResult.ContinueProcessing();
478  }
479
480  @Override
481  public SynchronizationProviderResult handleConflictResolution(
482      PreOperationDeleteOperation deleteOperation) throws DirectoryException
483  {
484    LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
485    if (domain != null)
486    {
487      return domain.handleConflictResolution(deleteOperation);
488    }
489    return new SynchronizationProviderResult.ContinueProcessing();
490  }
491
492  @Override
493  public SynchronizationProviderResult handleConflictResolution(
494      PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
495  {
496    LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
497    if (domain != null)
498    {
499      return domain.handleConflictResolution(modifyDNOperation);
500    }
501    return new SynchronizationProviderResult.ContinueProcessing();
502  }
503
504  @Override
505  public SynchronizationProviderResult
506         doPreOperation(PreOperationModifyOperation modifyOperation)
507  {
508    DN operationDN = modifyOperation.getEntryDN();
509    LDAPReplicationDomain domain = findDomain(operationDN, modifyOperation);
510
511    if (domain == null || !domain.solveConflict())
512    {
513      return new SynchronizationProviderResult.ContinueProcessing();
514    }
515
516    EntryHistorical historicalInformation =
517      (EntryHistorical) modifyOperation.getAttachment(HISTORICAL_ATTACHMENT_NAME);
518    if (historicalInformation == null)
519    {
520      Entry entry = modifyOperation.getModifiedEntry();
521      historicalInformation = EntryHistorical.newInstanceFromEntry(entry);
522      modifyOperation.setAttachment(HISTORICAL_ATTACHMENT_NAME, historicalInformation);
523    }
524    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
525    historicalInformation.setHistoricalAttrToOperation(modifyOperation);
526
527    if (modifyOperation.getModifications().isEmpty())
528    {
529      /*
530       * This operation becomes a no-op due to conflict resolution
531       * stop the processing and send an OK result
532       */
533      return new SynchronizationProviderResult.StopProcessing(
534          ResultCode.SUCCESS, null);
535    }
536
537    return new SynchronizationProviderResult.ContinueProcessing();
538  }
539
540  @Override
541  public SynchronizationProviderResult doPreOperation(
542         PreOperationDeleteOperation deleteOperation) throws DirectoryException
543  {
544    return new SynchronizationProviderResult.ContinueProcessing();
545  }
546
547  @Override
548  public SynchronizationProviderResult doPreOperation(
549         PreOperationModifyDNOperation modifyDNOperation)
550         throws DirectoryException
551  {
552    DN operationDN = modifyDNOperation.getEntryDN();
553    LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
554
555    if (domain == null || !domain.solveConflict())
556    {
557      return new SynchronizationProviderResult.ContinueProcessing();
558    }
559
560    // The historical object is retrieved from the attachment created
561    // in the HandleConflictResolution phase.
562    EntryHistorical historicalInformation = (EntryHistorical)
563    modifyDNOperation.getAttachment(HISTORICAL_ATTACHMENT_NAME);
564    if (historicalInformation == null)
565    {
566      // When no Historical attached, create once by loading from the entry
567      // and attach it to the operation
568      Entry entry = modifyDNOperation.getUpdatedEntry();
569      historicalInformation = EntryHistorical.newInstanceFromEntry(entry);
570      modifyDNOperation.setAttachment(HISTORICAL_ATTACHMENT_NAME, historicalInformation);
571    }
572    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
573
574    // Add to the operation the historical attribute : "dn:changeNumber:moddn"
575    historicalInformation.setHistoricalAttrToOperation(modifyDNOperation);
576
577    return new SynchronizationProviderResult.ContinueProcessing();
578  }
579
580  @Override
581  public SynchronizationProviderResult doPreOperation(
582         PreOperationAddOperation addOperation)
583  {
584    // Check replication domain
585    LDAPReplicationDomain domain =
586      findDomain(addOperation.getEntryDN(), addOperation);
587    if (domain == null)
588    {
589      return new SynchronizationProviderResult.ContinueProcessing();
590    }
591
592    // For LOCAL op only, generate CSN and attach Context
593    if (!addOperation.isSynchronizationOperation())
594    {
595      domain.doPreOperation(addOperation);
596    }
597
598    // Add to the operation the historical attribute : "dn:changeNumber:add"
599    EntryHistorical.setHistoricalAttrToOperation(addOperation);
600
601    return new SynchronizationProviderResult.ContinueProcessing();
602  }
603
604  @Override
605  public void finalizeSynchronizationProvider()
606  {
607    setState(State.STOPPING);
608
609    for (LDAPReplicationDomain domain : domains.values())
610    {
611      domain.shutdown();
612    }
613    domains.clear();
614
615    stopReplayThreads();
616
617    if (replicationServerListener != null)
618    {
619      replicationServerListener.shutdown();
620    }
621
622    DirectoryServer.deregisterBackupTaskListener(this);
623    DirectoryServer.deregisterRestoreTaskListener(this);
624    DirectoryServer.deregisterExportTaskListener(this);
625    DirectoryServer.deregisterImportTaskListener(this);
626  }
627
628  /**
629   * This method is called whenever the server detects a modification
630   * of the schema done by directly modifying the backing files
631   * of the schema backend.
632   * Call the schema Domain if it exists.
633   *
634   * @param  modifications  The list of modifications that was
635   *                                      applied to the schema.
636   */
637  @Override
638  public void processSchemaChange(List<Modification> modifications)
639  {
640    LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
641    if (domain != null)
642    {
643      domain.synchronizeSchemaModifications(modifications);
644    }
645  }
646
647  @Override
648  public void processBackupBegin(Backend<?> backend, BackupConfig config)
649  {
650    for (DN dn : backend.getBaseDNs())
651    {
652      LDAPReplicationDomain domain = findDomain(dn, null);
653      if (domain != null)
654      {
655        domain.backupStart();
656      }
657    }
658  }
659
660  @Override
661  public void processBackupEnd(Backend<?> backend, BackupConfig config, boolean successful)
662  {
663    for (DN dn : backend.getBaseDNs())
664    {
665      LDAPReplicationDomain domain = findDomain(dn, null);
666      if (domain != null)
667      {
668        domain.backupEnd();
669      }
670    }
671  }
672
673  @Override
674  public void processRestoreBegin(Backend<?> backend, RestoreConfig config)
675  {
676    for (DN dn : backend.getBaseDNs())
677    {
678      LDAPReplicationDomain domain = findDomain(dn, null);
679      if (domain != null)
680      {
681        domain.disable();
682      }
683    }
684  }
685
686  @Override
687  public void processRestoreEnd(Backend<?> backend, RestoreConfig config, boolean successful)
688  {
689    for (DN dn : backend.getBaseDNs())
690    {
691      LDAPReplicationDomain domain = findDomain(dn, null);
692      if (domain != null)
693      {
694        domain.enable();
695      }
696    }
697  }
698
699  @Override
700  public void processImportBegin(Backend<?> backend, LDIFImportConfig config)
701  {
702    for (DN dn : backend.getBaseDNs())
703    {
704      LDAPReplicationDomain domain = findDomain(dn, null);
705      if (domain != null)
706      {
707        domain.disable();
708      }
709    }
710  }
711
712  @Override
713  public void processImportEnd(Backend<?> backend, LDIFImportConfig config, boolean successful)
714  {
715    for (DN dn : backend.getBaseDNs())
716    {
717      LDAPReplicationDomain domain = findDomain(dn, null);
718      if (domain != null)
719      {
720        domain.enable();
721      }
722    }
723  }
724
725  @Override
726  public void processExportBegin(Backend<?> backend, LDIFExportConfig config)
727  {
728    for (DN dn : backend.getBaseDNs())
729    {
730      LDAPReplicationDomain domain = findDomain(dn, null);
731      if (domain != null)
732      {
733        domain.backupStart();
734      }
735    }
736  }
737
738  @Override
739  public void processExportEnd(Backend<?> backend, LDIFExportConfig config, boolean successful)
740  {
741    for (DN dn : backend.getBaseDNs())
742    {
743      LDAPReplicationDomain domain = findDomain(dn, null);
744      if (domain != null)
745      {
746        domain.backupEnd();
747      }
748    }
749  }
750
751  @Override
752  public ConfigChangeResult applyConfigurationDelete(
753      ReplicationDomainCfg configuration)
754  {
755    deleteDomain(configuration.getBaseDN());
756
757    return new ConfigChangeResult();
758  }
759
760  @Override
761  public boolean isConfigurationDeleteAcceptable(
762      ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons)
763  {
764    return true;
765  }
766
767  /**
768   * Generic code for all the postOperation entry point.
769   *
770   * @param operation The Operation for which the post-operation is called.
771   * @param dn The Dn for which the post-operation is called.
772   */
773  private void genericPostOperation(PostOperationOperation operation, DN dn)
774  {
775    LDAPReplicationDomain domain = findDomain(dn, operation);
776    if (domain != null) {
777      domain.synchronize(operation);
778    }
779  }
780
781  /**
782   * Returns the replication server listener associated to that Multimaster
783   * Replication.
784   * @return the listener.
785   */
786  public ReplicationServerListener getReplicationServerListener()
787  {
788    return replicationServerListener;
789  }
790
791  @Override
792  public boolean isConfigurationChangeAcceptable(
793      ReplicationSynchronizationProviderCfg configuration,
794      List<LocalizableMessage> unacceptableReasons)
795  {
796    return true;
797  }
798
799  @Override
800  public ConfigChangeResult applyConfigurationChange(ReplicationSynchronizationProviderCfg configuration)
801  {
802    // Stop threads then restart new number of threads
803    stopReplayThreads();
804    replayThreadNumber = getNumberOfReplayThreadsOrDefault(configuration);
805    if (!domains.isEmpty())
806    {
807      createReplayThreads();
808    }
809
810    connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
811        Integer.MAX_VALUE);
812
813    return new ConfigChangeResult();
814  }
815
816  @Override
817  public void completeSynchronizationProvider()
818  {
819    for (LDAPReplicationDomain domain : domains.values())
820    {
821      domain.start();
822    }
823    setState(State.RUNNING);
824  }
825
826  private void setState(State newState)
827  {
828    state.set(newState);
829    synchronized (state)
830    {
831      state.notifyAll();
832    }
833  }
834
835  /**
836   * Gets the number of handled domain objects.
837   * @return The number of handled domain objects
838   */
839  public static int getNumberOfDomains()
840  {
841    return domains.size();
842  }
843
844  /**
845   * Gets the Set of domain baseDN which are disabled for the external changelog.
846   *
847   * @return The Set of domain baseDNs which are disabled for the external changelog.
848   * @throws DirectoryException
849   *            if a problem occurs
850   */
851  public static Set<DN> getExcludedChangelogDomains() throws DirectoryException
852  {
853    final Set<DN> disabledBaseDNs = new HashSet<>(domains.size() + 1);
854    disabledBaseDNs.add(DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT));
855    for (LDAPReplicationDomain domain : domains.values())
856    {
857      if (!domain.isECLEnabled())
858      {
859        disabledBaseDNs.add(domain.getBaseDN());
860      }
861    }
862    return disabledBaseDNs;
863  }
864
865  /**
866   * Returns whether the provided baseDN represents a replication domain enabled
867   * for the external changelog.
868   *
869   * @param baseDN
870   *          the replication domain to check
871   * @return true if the provided baseDN is enabled for the external changelog,
872   *         false if the provided baseDN is disabled for the external changelog
873   *         or unknown to multimaster replication.
874   */
875  public static boolean isECLEnabledDomain(DN baseDN)
876  {
877    waitForStartup();
878    // if state is STOPPING, then we need to return from this method
879    final LDAPReplicationDomain domain = domains.get(baseDN);
880    return domain != null && domain.isECLEnabled();
881  }
882
883  /**
884   * Returns whether the external change-log contains data from at least a domain.
885   * @return whether the external change-log contains data from at least a domain
886   */
887  public static boolean isECLEnabled()
888  {
889    waitForStartup();
890    for (LDAPReplicationDomain domain : domains.values())
891    {
892      if (domain.isECLEnabled())
893      {
894        return true;
895      }
896    }
897    return false;
898  }
899
900  private static void waitForStartup()
901  {
902    if (State.STARTING.equals(state.get()))
903    {
904      synchronized (state)
905      {
906        while (State.STARTING.equals(state.get()))
907        {
908          try
909          {
910            state.wait();
911          }
912          catch (InterruptedException ignored)
913          {
914            // loop and check state again
915          }
916        }
917      }
918    }
919  }
920
921  /**
922   * Returns the connection timeout in milli-seconds.
923   *
924   * @return The connection timeout in milli-seconds.
925   */
926  public static int getConnectionTimeoutMS()
927  {
928    return connectionTimeoutMS;
929  }
930
931  static void updateReplicationServersCount(int numberOfRSes)
932  {
933    unreachableRSes.updateReplicationServersCount(numberOfRSes);
934  }
935
936  /**
937   * Returns temporarily unreachable Replication Servers.
938   *
939   * @return temporarily unreachable Replication Servers
940   */
941  public static UnreachableReplicationServers getUnreachableReplicationServers()
942  {
943    return unreachableRSes;
944  }
945}