001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2014-2016 ForgeRock AS.
015 */
016package org.opends.server.replication.server.changelog.file;
017
018import static org.opends.messages.ReplicationMessages.*;
019import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
020import static org.opends.server.util.StaticUtils.*;
021
022import java.io.File;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030import java.util.concurrent.ConcurrentSkipListMap;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicReference;
034
035import net.jcip.annotations.GuardedBy;
036
037import org.forgerock.i18n.LocalizableMessageBuilder;
038import org.forgerock.i18n.slf4j.LocalizedLogger;
039import org.forgerock.opendj.config.DurationUnit;
040import org.forgerock.opendj.config.server.ConfigException;
041import org.forgerock.util.Pair;
042import org.forgerock.util.time.TimeService;
043import org.opends.server.api.DirectoryThread;
044import org.opends.server.backends.ChangelogBackend;
045import org.opends.server.crypto.CryptoSuite;
046import org.opends.server.replication.common.CSN;
047import org.opends.server.replication.common.MultiDomainServerState;
048import org.opends.server.replication.common.ServerState;
049import org.opends.server.replication.protocol.UpdateMsg;
050import org.opends.server.replication.server.ChangelogState;
051import org.opends.server.replication.server.ReplicationServer;
052import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
053import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
054import org.opends.server.replication.server.changelog.api.ChangelogDB;
055import org.opends.server.replication.server.changelog.api.ChangelogException;
056import org.opends.server.replication.server.changelog.api.DBCursor;
057import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
058import org.opends.server.replication.server.changelog.api.ReplicaId;
059import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
060import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
061import org.forgerock.opendj.ldap.DN;
062import org.opends.server.util.StaticUtils;
063import org.opends.server.util.TimeThread;
064
065/** Log file implementation of the ChangelogDB interface. */
066public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
067{
  /** Logger used for the trace and error messages emitted by this class. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /**
   * Maps each replication domain (base DN) to the replica DBs of that domain,
   * themselves keyed by server id.
   * <p>
   * When removing a domainMap, the code must:
   * <ol>
   * <li>first get the domainMap</li>
   * <li>synchronize on the domainMap</li>
   * <li>remove the domainMap</li>
   * <li>then check it's not null</li>
   * <li>then close all inside</li>
   * </ol>
   * When creating a replicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<>();
  /** Domain cursors currently registered, per base DN; they are told about newly created replica DBs. */
  private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
      new ConcurrentSkipListMap<>();
  /** Multi-domain cursors currently registered; they are told about newly created domains. */
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<>();
  /** Replica cursors currently registered, per replica; they are updated with the replica offline CSN. */
  private final ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<>();
  /** The replication environment; assigned by {@link #initializeDB()}, hence null before that call. */
  private ReplicationEnvironment replicationEnv;
  /** The directory where the changelog files reside. */
  private final File dbDirectory;

  /**
   * The handler of the changelog database, the database stores the relation
   * between a change number and the associated cookie.
   */
  @GuardedBy("cnIndexDBLock")
  private FileChangeNumberIndexDB cnIndexDB;
  /** Holds the running change number indexer thread, or null when no indexer is running. */
  private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<>();

  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();

  /**
   * The purge delay (in milliseconds). Records in the changelog DB that are
   * older than this delay might be removed.
   */
  private volatile long purgeDelayInMillis;
  /** Holds the running purger thread, or null when no purger is running. */
  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<>();

  /** The local replication server. */
  private final ReplicationServer replicationServer;
  /** Set to true once {@link #shutdownDB()} has been initiated. */
  private final AtomicBoolean shutdown = new AtomicBoolean();

  /** Empty log cursor backing {@link #EMPTY_CURSOR_REPLICA_DB}. */
  private static final RepositionableCursor<CSN, UpdateMsg> EMPTY_CURSOR = Log.getEmptyCursor();
  /** Cursor returned when a cursor is requested on a replica DB that does not exist. */
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new FileReplicaDBCursor(EMPTY_CURSOR, null, AFTER_MATCHING_KEY);

  /** The crypto suite handed over to every replica DB created by this changelog DB. */
  private final CryptoSuite cryptoSuite;
121  /**
122   * Creates a new changelog DB.
123   *
124   * @param replicationServer
125   *          the local replication server.
126   * @param dbDirectoryPath
127   *          the path where the changelog files reside.
128   * @param cryptoSuite
129   *          the cryptosuite to use for encryption
130   * @throws ConfigException
131   *           if a problem occurs opening the supplied directory
132   */
133  public FileChangelogDB(final ReplicationServer replicationServer, String dbDirectoryPath, CryptoSuite cryptoSuite)
134      throws ConfigException
135  {
136    this.replicationServer = replicationServer;
137    this.dbDirectory = makeDir(dbDirectoryPath);
138    this.cryptoSuite = cryptoSuite;
139  }
140
141  private File makeDir(final String dbDirName) throws ConfigException
142  {
143    // Check that this path exists or create it.
144    final File dbDirectory = getFileForPath(dbDirName);
145    try
146    {
147      if (!dbDirectory.exists())
148      {
149        dbDirectory.mkdir();
150      }
151      return dbDirectory;
152    }
153    catch (Exception e)
154    {
155      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
156          e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
157      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
158    }
159  }
160
161  private Map<Integer, FileReplicaDB> getDomainMap(final DN baseDN)
162  {
163    final Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
164    if (domainMap != null)
165    {
166      return domainMap;
167    }
168    return Collections.emptyMap();
169  }
170
171  private FileReplicaDB getReplicaDB(final DN baseDN, final int serverId)
172  {
173    return getDomainMap(baseDN).get(serverId);
174  }
175
176  /**
177   * Returns a {@link FileReplicaDB}, possibly creating it.
178   *
179   * @param baseDN
180   *          the baseDN for which to create a ReplicaDB
181   * @param serverId
182   *          the serverId for which to create a ReplicaDB
183   * @param server
184   *          the ReplicationServer
185   * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
186   * @throws ChangelogException
187   *           if a problem occurred with the database
188   */
189  Pair<FileReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
190      final ReplicationServer server) throws ChangelogException
191  {
192    while (!shutdown.get())
193    {
194      final ConcurrentMap<Integer, FileReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
195      final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
196      if (result != null)
197      {
198        final Boolean dbWasCreated = result.getSecond();
199        if (dbWasCreated)
200        { // new replicaDB => update all cursors with it
201          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
202          if (cursors != null && !cursors.isEmpty())
203          {
204            for (DomainDBCursor cursor : cursors)
205            {
206              cursor.addReplicaDB(serverId, null);
207            }
208          }
209        }
210
211        return result;
212      }
213    }
214    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
215  }
216
217  private ConcurrentMap<Integer, FileReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
218  {
219    // happy path: the domainMap already exists
220    final ConcurrentMap<Integer, FileReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
221    if (currentValue != null)
222    {
223      return currentValue;
224    }
225
226    // unlucky, the domainMap does not exist: take the hit and create the
227    // newValue, even though the same could be done concurrently by another thread
228    final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<>();
229    final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
230    if (previousValue != null)
231    {
232      // there was already a value associated to the key, let's use it
233      return previousValue;
234    }
235
236    // we just created a new domain => update all cursors
237    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
238    {
239      cursor.addDomain(baseDN, null);
240    }
241    return newValue;
242  }
243
244  private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
245      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
246  {
247    // happy path: the replicaDB already exists
248    FileReplicaDB currentValue = domainMap.get(serverId);
249    if (currentValue != null)
250    {
251      return Pair.of(currentValue, false);
252    }
253
254    // unlucky, the replicaDB does not exist: take the hit and synchronize
255    // on the domainMap to create a new ReplicaDB
256    synchronized (domainMap)
257    {
258      // double-check
259      currentValue = domainMap.get(serverId);
260      if (currentValue != null)
261      {
262        return Pair.of(currentValue, false);
263      }
264
265      if (domainToReplicaDBs.get(baseDN) != domainMap)
266      {
267        // The domainMap could have been concurrently removed because
268        // 1) a shutdown was initiated or 2) an initialize was called.
269        // Return will allow the code to:
270        // 1) shutdown properly or 2) lazily recreate the replicaDB
271        return null;
272      }
273
274      final FileReplicaDB newDB = new FileReplicaDB(serverId, baseDN, server, cryptoSuite, replicationEnv);
275      domainMap.put(serverId, newDB);
276      return Pair.of(newDB, true);
277    }
278  }
279
280  @Override
281  public void initializeDB()
282  {
283    try
284    {
285      replicationEnv = new ReplicationEnvironment(dbDirectory.getAbsolutePath(), replicationServer, TimeService.SYSTEM);
286      final ChangelogState changelogState = replicationEnv.getChangelogState();
287      initializeToChangelogState(changelogState);
288      if (replicationServer.isChangeNumberEnabled())
289      {
290        startIndexer();
291      }
292      setPurgeDelay(replicationServer.getPurgeDelay());
293    }
294    catch (ChangelogException e)
295    {
296      logger.traceException(e);
297      logger.error(ERR_COULD_NOT_READ_DB, this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage());
298    }
299  }
300
301  private void initializeToChangelogState(final ChangelogState changelogState)
302      throws ChangelogException
303  {
304    for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
305    {
306      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
307    }
308    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
309    {
310      for (int serverId : entry.getValue())
311      {
312        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
313      }
314    }
315  }
316
317  private void shutdownChangeNumberIndexDB() throws ChangelogException
318  {
319    synchronized (cnIndexDBLock)
320    {
321      if (cnIndexDB != null)
322      {
323        cnIndexDB.shutdown();
324      }
325    }
326  }
327
328  @Override
329  public void shutdownDB() throws ChangelogException
330  {
331    if (!this.shutdown.compareAndSet(false, true))
332    { // shutdown has already been initiated
333      return;
334    }
335
336    shutdownCNIndexerAndPurger();
337
338    // Remember the first exception because :
339    // - we want to try to remove everything we want to remove
340    // - then throw the first encountered exception
341    ChangelogException firstException = null;
342
343    // now we can safely shutdown all DBs
344    try
345    {
346      shutdownChangeNumberIndexDB();
347    }
348    catch (ChangelogException e)
349    {
350      firstException = e;
351    }
352
353    for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it =
354        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
355    {
356      final ConcurrentMap<Integer, FileReplicaDB> domainMap = it.next();
357      synchronized (domainMap)
358      {
359        it.remove();
360        for (FileReplicaDB replicaDB : domainMap.values())
361        {
362          replicaDB.shutdown();
363        }
364      }
365    }
366    if (replicationEnv != null)
367    {
368      replicationEnv.shutdown();
369    }
370
371    if (firstException != null)
372    {
373      throw firstException;
374    }
375  }
376
377  private void shutdownCNIndexerAndPurger()
378  {
379    final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
380    if (indexer != null)
381    {
382      indexer.initiateShutdown();
383    }
384    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
385    if (purger != null)
386    {
387      purger.initiateShutdown();
388    }
389
390    // wait for shutdown of the threads holding cursors
391    try
392    {
393      if (indexer != null)
394      {
395        indexer.join();
396      }
397      if (purger != null)
398      {
399        purger.join();
400      }
401    }
402    catch (InterruptedException e)
403    {
404      // do nothing: we are already shutting down
405    }
406  }
407
408  /**
409   * Clears all records from the changelog (does not remove the changelog itself).
410   *
411   * @throws ChangelogException
412   *           If an error occurs when clearing the changelog.
413   */
414  public void clearDB() throws ChangelogException
415  {
416    if (!dbDirectory.exists())
417    {
418      return;
419    }
420
421    // Remember the first exception because :
422    // - we want to try to remove everything we want to remove
423    // - then throw the first encountered exception
424    ChangelogException firstException = null;
425
426    for (DN baseDN : this.domainToReplicaDBs.keySet())
427    {
428      removeDomain(baseDN);
429    }
430
431    synchronized (cnIndexDBLock)
432    {
433      if (cnIndexDB != null)
434      {
435        try
436        {
437          cnIndexDB.clear();
438        }
439        catch (ChangelogException e)
440        {
441          firstException = e;
442        }
443
444        try
445        {
446          shutdownChangeNumberIndexDB();
447        }
448        catch (ChangelogException e)
449        {
450          if (firstException == null)
451          {
452            firstException = e;
453          }
454          else
455          {
456            logger.traceException(e);
457          }
458        }
459
460        cnIndexDB = null;
461      }
462    }
463
464    if (firstException != null)
465    {
466      throw firstException;
467    }
468  }
469
470  @Override
471  public void removeDB() throws ChangelogException
472  {
473    shutdownDB();
474    StaticUtils.recursiveDelete(dbDirectory);
475  }
476
477  @Override
478  public ServerState getDomainOldestCSNs(DN baseDN)
479  {
480    final ServerState result = new ServerState();
481    for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
482    {
483      result.update(replicaDB.getOldestCSN());
484    }
485    return result;
486  }
487
488  @Override
489  public ServerState getDomainNewestCSNs(DN baseDN)
490  {
491    final ServerState result = new ServerState();
492    for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
493    {
494      result.update(replicaDB.getNewestCSN());
495    }
496    return result;
497  }
498
499  @Override
500  public void removeDomain(DN baseDN) throws ChangelogException
501  {
502    // Remember the first exception because :
503    // - we want to try to remove everything we want to remove
504    // - then throw the first encountered exception
505    ChangelogException firstException = null;
506
507    // 1- clear the replica DBs
508    Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
509    if (domainMap != null)
510    {
511      final ChangeNumberIndexer indexer = this.cnIndexer.get();
512      if (indexer != null)
513      {
514        indexer.clear(baseDN);
515      }
516      synchronized (domainMap)
517      {
518        domainMap = domainToReplicaDBs.remove(baseDN);
519        for (FileReplicaDB replicaDB : domainMap.values())
520        {
521          try
522          {
523            replicaDB.clear();
524          }
525          catch (ChangelogException e)
526          {
527            firstException = e;
528          }
529          replicaDB.shutdown();
530        }
531      }
532    }
533
534
535    // 2- clear the changelogstate DB
536    try
537    {
538      replicationEnv.clearGenerationId(baseDN);
539    }
540    catch (ChangelogException e)
541    {
542      if (firstException == null)
543      {
544        firstException = e;
545      }
546      else
547      {
548        logger.traceException(e);
549      }
550    }
551
552    if (firstException != null)
553    {
554      throw firstException;
555    }
556  }
557
558  @Override
559  public void setPurgeDelay(final long purgeDelayInMillis)
560  {
561    this.purgeDelayInMillis = purgeDelayInMillis;
562
563    // Rotation time interval for CN Index DB log file
564    // needs to be a fraction of the purge delay
565    // to ensure there is at least one file to purge
566    replicationEnv.setCNIndexDBRotationInterval(purgeDelayInMillis / 2);
567
568    if (purgeDelayInMillis > 0)
569    {
570      startCNPurger();
571    }
572    else
573    {
574      final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null);
575      if (purgerToStop != null)
576      { // stop this purger
577        purgerToStop.initiateShutdown();
578      }
579    }
580  }
581
582  private void startCNPurger()
583  {
584    final ChangelogDBPurger newPurger = new ChangelogDBPurger();
585    if (cnPurger.compareAndSet(null, newPurger))
586    { // no purger was running, run this new one
587      newPurger.start();
588    }
589    else
590    { // a purger was already running, just wake that one up
591      // to verify if some entries can be purged
592      final ChangelogDBPurger currentPurger = cnPurger.get();
593      synchronized (currentPurger)
594      {
595        currentPurger.notify();
596      }
597    }
598  }
599
600  @Override
601  public void setComputeChangeNumber(final boolean computeChangeNumber)
602      throws ChangelogException
603  {
604    if (computeChangeNumber)
605    {
606      startIndexer();
607    }
608    else
609    {
610      final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
611      if (indexer != null)
612      {
613        indexer.initiateShutdown();
614      }
615    }
616  }
617
618  void resetChangeNumberIndex(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException
619  {
620    if (!replicationServer.isChangeNumberEnabled())
621    {
622      throw new ChangelogException(ERR_REPLICATION_CHANGE_NUMBER_DISABLED.get(baseDN));
623    }
624    if (!getDomainNewestCSNs(baseDN).cover(newFirstCSN))
625    {
626      throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_CHANGE_NOT_PRESENT.get(newFirstCN, baseDN,
627          newFirstCSN));
628    }
629    if (getDomainOldestCSNs(baseDN).getCSN(newFirstCSN.getServerId()).isNewerThan(newFirstCSN))
630    {
631      throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD.get(newFirstCN, newFirstCSN));
632    }
633
634    shutdownCNIndexerAndPurger();
635    synchronized (cnIndexDBLock)
636    {
637      cnIndexDB.clearAndSetChangeNumber(newFirstCN);
638      cnIndexDB.addRecord(new ChangeNumberIndexRecord(newFirstCN, baseDN, newFirstCSN));
639    }
640    startIndexer();
641    if (purgeDelayInMillis > 0)
642    {
643      startCNPurger();
644    }
645  }
646
647  private void startIndexer()
648  {
649    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv);
650    if (cnIndexer.compareAndSet(null, indexer))
651    {
652      indexer.start();
653    }
654  }
655
656  @Override
657  public ChangeNumberIndexDB getChangeNumberIndexDB()
658  {
659    synchronized (cnIndexDBLock)
660    {
661      if (cnIndexDB == null)
662      {
663        try
664        {
665          cnIndexDB = new FileChangeNumberIndexDB(this, replicationEnv);
666        }
667        catch (Exception e)
668        {
669          logger.traceException(e);
670          logger.error(ERR_CHANGENUMBER_DATABASE, e.getLocalizedMessage());
671        }
672      }
673      return cnIndexDB;
674    }
675  }
676
677  @Override
678  public ReplicationDomainDB getReplicationDomainDB()
679  {
680    return this;
681  }
682
683  @Override
684  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, CursorOptions options)
685      throws ChangelogException
686  {
687    final Set<DN> excludedDomainDns = Collections.emptySet();
688    return getCursorFrom(startState, options, excludedDomainDns);
689  }
690
691  @Override
692  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
693      CursorOptions options, final Set<DN> excludedDomainDns) throws ChangelogException
694  {
695    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, options);
696    registeredMultiDomainCursors.add(cursor);
697    for (DN baseDN : domainToReplicaDBs.keySet())
698    {
699      if (!excludedDomainDns.contains(baseDN))
700      {
701        cursor.addDomain(baseDN, startState.getServerState(baseDN));
702      }
703    }
704    return cursor;
705  }
706
707  @Override
708  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, CursorOptions options)
709      throws ChangelogException
710  {
711    final DomainDBCursor cursor = newDomainDBCursor(baseDN, options);
712    for (int serverId : getDomainMap(baseDN).keySet())
713    {
714      // get the last already sent CSN from that server to get a cursor
715      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
716      cursor.addReplicaDB(serverId, lastCSN);
717    }
718    return cursor;
719  }
720
721  private DomainDBCursor newDomainDBCursor(final DN baseDN, final CursorOptions options)
722  {
723    final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, options);
724    putCursor(registeredDomainCursors, baseDN, cursor);
725    return cursor;
726  }
727
728  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
729  {
730    final MultiDomainServerState offlineReplicas =
731        replicationEnv.getChangelogState().getOfflineReplicas();
732    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
733    if (offlineCSN != null
734        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
735    {
736      return offlineCSN;
737    }
738    return null;
739  }
740
741  @Override
742  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
743      CursorOptions options) throws ChangelogException
744  {
745    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
746    if (replicaDB != null)
747    {
748      final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
749      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
750          actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
751      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
752      final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
753      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
754
755      putCursor(replicaCursors, replicaId, replicaCursor);
756
757      return replicaCursor;
758    }
759    return EMPTY_CURSOR_REPLICA_DB;
760  }
761
762  private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
763  {
764    CopyOnWriteArrayList<V> cursors = map.get(key);
765    if (cursors == null)
766    {
767      cursors = new CopyOnWriteArrayList<>();
768      CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
769      if (previousValue != null)
770      {
771        cursors = previousValue;
772      }
773    }
774    cursors.add(cursor);
775  }
776
777  @Override
778  public void unregisterCursor(final DBCursor<?> cursor)
779  {
780    if (cursor instanceof MultiDomainDBCursor)
781    {
782      registeredMultiDomainCursors.remove(cursor);
783    }
784    else if (cursor instanceof DomainDBCursor)
785    {
786      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
787      final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
788      if (cursors != null)
789      {
790        cursors.remove(cursor);
791      }
792    }
793    else if (cursor instanceof ReplicaCursor)
794    {
795      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
796      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId());
797      if (cursors != null)
798      {
799        cursors.remove(cursor);
800      }
801    }
802  }
803
804  @Override
805  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
806  {
807    final CSN csn = updateMsg.getCSN();
808    final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
809        csn.getServerId(), replicationServer);
810    final FileReplicaDB replicaDB = pair.getFirst();
811    replicaDB.add(updateMsg);
812
813    ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg);
814
815    final ChangeNumberIndexer indexer = cnIndexer.get();
816    if (indexer != null)
817    {
818      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
819      indexer.publishUpdateMsg(baseDN, updateMsg);
820    }
821    return pair.getSecond(); // replica DB was created
822  }
823
824  @Override
825  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
826  {
827    final ChangeNumberIndexer indexer = cnIndexer.get();
828    if (indexer != null)
829    {
830      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
831      indexer.publishHeartbeat(baseDN, heartbeatCSN);
832    }
833  }
834
835  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
836      throws ChangelogException
837  {
838    if (indexer.isReplicaOffline(baseDN, serverId))
839    {
840      replicationEnv.notifyReplicaOnline(baseDN, serverId);
841    }
842    updateCursorsWithOfflineCSN(baseDN, serverId, null);
843  }
844
845  @Override
846  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
847  {
848    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
849    final ChangeNumberIndexer indexer = cnIndexer.get();
850    if (indexer != null)
851    {
852      indexer.replicaOffline(baseDN, offlineCSN);
853    }
854    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
855  }
856
857  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
858  {
859    final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId));
860    if (cursors != null)
861    {
862      for (ReplicaCursor cursor : cursors)
863      {
864        cursor.setOfflineCSN(offlineCSN);
865      }
866    }
867  }
868
869  /**
870   * The thread purging the changelogDB on a regular interval. Records are
871   * purged from the changelogDB if they are older than a delay specified in
872   * seconds. The purge process works in two steps:
873   * <ol>
874   * <li>first purge the changeNumberIndexDB and retrieve information to drive
875   * replicaDBs purging</li>
876   * <li>proceed to purge each replicaDBs based on the information collected
877   * when purging the changeNumberIndexDB</li>
878   * </ol>
879   */
880  private final class ChangelogDBPurger extends DirectoryThread
881  {
882    private static final int DEFAULT_SLEEP = 500;
883
884    protected ChangelogDBPurger()
885    {
886      super("Changelog DB purger");
887    }
888
889    @Override
890    public void run()
891    {
892      // initialize CNIndexDB
893      getChangeNumberIndexDB();
894      boolean canDisplayNothingToPurgeMsg = true;
895      while (!isShutdownInitiated())
896      {
897        try
898        {
899          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
900          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
901          final CSN oldestNotPurgedCSN;
902
903          if (!replicationServer.isChangeNumberEnabled() || !replicationServer.isECLEnabled())
904          {
905            oldestNotPurgedCSN = purgeCSN;
906          }
907          else
908          {
909            final FileChangeNumberIndexDB localCNIndexDB = cnIndexDB;
910            if (localCNIndexDB == null)
911            { // shutdown has been initiated
912              return;
913            }
914
915            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
916            if (oldestNotPurgedCSN == null)
917            { // shutdown may have been initiated...
918              // ... or change number index DB determined there is nothing to purge,
919              // wait for new changes to come in.
920
921              // Note we cannot sleep for as long as the purge delay
922              // (3 days default), because we might receive late updates
923              // that will have to be purged before the purge delay elapses.
924              // This can particularly happen in case of network partitions.
925              if (!isShutdownInitiated())
926              {
927                synchronized (this)
928                {
929                  if (!isShutdownInitiated())
930                  {
931                    if (canDisplayNothingToPurgeMsg)
932                    {
933                      logger.trace("Nothing to purge, waiting for new changes");
934                      canDisplayNothingToPurgeMsg = false;
935                    }
936                    wait(DEFAULT_SLEEP);
937                  }
938                }
939              }
940              continue;
941            }
942          }
943
944          for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values())
945          {
946            for (final FileReplicaDB replicaDB : domainMap.values())
947            {
948              replicaDB.purgeUpTo(oldestNotPurgedCSN);
949            }
950          }
951
952          if (!isShutdownInitiated())
953          {
954            synchronized (this)
955            {
956              if (!isShutdownInitiated())
957              {
958                final long sleepTime = computeSleepTimeUntilNextPurge(oldestNotPurgedCSN);
959                if (logger.isTraceEnabled())
960                {
961                  tracePurgeDetails(purgeCSN, oldestNotPurgedCSN, sleepTime);
962                  canDisplayNothingToPurgeMsg = true;
963                }
964                wait(sleepTime);
965              }
966            }
967          }
968        }
969        catch (InterruptedException e)
970        {
971          // shutdown initiated?
972        }
973        catch (Exception e)
974        {
975          logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e));
976          if (replicationServer != null)
977          {
978            replicationServer.shutdown();
979          }
980        }
981      }
982    }
983
984    private void tracePurgeDetails(final CSN purgeCSN, final CSN oldestNotPurgedCSN, final long sleepTime)
985    {
986      if (purgeCSN.equals(oldestNotPurgedCSN.toStringUI()))
987      {
988        logger.trace("Purged up to %s. "
989            + "now sleeping until next purge during %s",
990            purgeCSN.toStringUI(), DurationUnit.toString(sleepTime));
991      }
992      else
993      {
994        logger.trace("Asked to purge up to %s, actually purged up to %s (not included). "
995            + "now sleeping until next purge during %s",
996            purgeCSN.toStringUI(), oldestNotPurgedCSN.toStringUI(), DurationUnit.toString(sleepTime));
997      }
998    }
999
1000    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
1001    {
1002      final long nextPurgeTime = notPurgedCSN.getTime();
1003      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
1004      if (currentPurgeTime < nextPurgeTime)
1005      {
1006        // sleep till the next CSN to purge,
1007        return nextPurgeTime - currentPurgeTime;
1008      }
1009      // wait a bit before purging more
1010      return DEFAULT_SLEEP;
1011    }
1012
1013    @Override
1014    public void initiateShutdown()
1015    {
1016      super.initiateShutdown();
1017      synchronized (this)
1018      {
1019        notify(); // wake up the purger thread for faster shutdown
1020      }
1021    }
1022  }
1023}