001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2013-2016 ForgeRock AS.
015 */
016package org.opends.server.replication.server.changelog.file;
017
018import java.util.Map.Entry;
019import java.util.Set;
020import java.util.concurrent.ConcurrentSkipListSet;
021
022import org.forgerock.i18n.slf4j.LocalizedLogger;
023import org.opends.server.api.DirectoryThread;
024import org.opends.server.backends.ChangelogBackend;
025import org.opends.server.replication.common.CSN;
026import org.opends.server.replication.common.MultiDomainServerState;
027import org.opends.server.replication.common.ServerState;
028import org.opends.server.replication.protocol.ReplicaOfflineMsg;
029import org.opends.server.replication.protocol.UpdateMsg;
030import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
031import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
032import org.opends.server.replication.server.changelog.api.ChangelogDB;
033import org.opends.server.replication.server.changelog.api.ChangelogException;
034import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
035import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
036import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
037import org.forgerock.opendj.ldap.DN;
038
039import static org.opends.messages.ReplicationMessages.*;
040import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
041import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
042import static org.opends.server.util.StaticUtils.*;
043
044/**
045 * Thread responsible for inserting replicated changes into the ChangeNumber
046 * Index DB (CNIndexDB for short).
047 * <p>
048 * Only changes older than the medium consistency point are inserted in the
049 * CNIndexDB. As a consequence this class is also responsible for maintaining
050 * the medium consistency point (indirectly through an
051 * {@link ECLMultiDomainDBCursor}).
052 */
053public class ChangeNumberIndexer extends DirectoryThread
054{
055  /** The tracer object for the debug logger. */
056  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
057
058  /**
059   * If it contains nothing, then the run method executes normally.
060   * Otherwise, the {@link #run()} method must clear its state
061   * for the supplied domain baseDNs. If a supplied domain is
062   * {@link DN#rootDN()}, then all domains will be cleared.
063   */
064  private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>();
065  private final ChangelogDB changelogDB;
066  private final ChangelogStateProvider changelogStateProvider;
067  private final ECLEnabledDomainPredicate predicate;
068
069  /*
070   * The following MultiDomainServerState fields must be thread safe, because
071   * 1) initialization can happen while the replication server starts receiving
072   * updates
073   * 2) many updates can happen concurrently.
074   */
075  /**
076   * Holds the last time each replica was seen alive, whether via updates or
077   * heartbeat notifications, or offline notifications. Data is held for each
078   * serverId cross domain.
079   * <p>
080   * Updates are persistent and stored in the replicaDBs, heartbeats are
081   * transient and are easily constructed on normal operations.
082   * <p>
083   * Note: This object is updated by both heartbeats and changes/updates.
084   */
085  private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();
086
087  /** Note: This object is updated by replica offline messages. */
088  private final MultiDomainServerState replicasOffline = new MultiDomainServerState();
089
090  /**
091   * Cursor across all the replicaDBs for all the replication domains. It is
092   * positioned on the next change that needs to be inserted in the CNIndexDB.
093   * <p>
094   * Note: it is only accessed from the {@link #run()} method.
095   *
096   * @NonNull
097   */
098  private ECLMultiDomainDBCursor nextChangeForInsertDBCursor;
099  private MultiDomainServerState cookie = new MultiDomainServerState();
100
101  /**
102   * Builds a ChangeNumberIndexer object.
103   *  @param changelogDB
104   *          the changelogDB
105   * @param changelogStateProvider
106   *          the replication environment information for access to changelog state
107   */
108  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider)
109  {
110    this(changelogDB, changelogStateProvider, new ECLEnabledDomainPredicate());
111  }
112
113  /**
114   * Builds a ChangeNumberIndexer object.
115   * @param changelogDB
116   *          the changelogDB
117   * @param changelogStateProvider
118   *          the changelog state used for initialization
119   * @param predicate
120   */
121  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider,
122      ECLEnabledDomainPredicate predicate)
123  {
124    super("Change number indexer");
125    this.changelogDB = changelogDB;
126    this.changelogStateProvider = changelogStateProvider;
127    this.predicate = predicate;
128  }
129
130  /**
131   * Ensures the medium consistency point is updated by heartbeats.
132   *
133   * @param baseDN
134   *          the baseDN of the domain for which the heartbeat is published
135   * @param heartbeatCSN
136   *          the CSN coming from the heartbeat
137   */
138  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
139  {
140    if (!predicate.isECLEnabledDomain(baseDN))
141    {
142      return;
143    }
144
145    final CSN oldestCSNBefore = getOldestLastAliveCSN();
146    lastAliveCSNs.update(baseDN, heartbeatCSN);
147    tryNotify(oldestCSNBefore);
148  }
149
150  /**
151   * Indicates if the replica corresponding to provided domain DN and server id
152   * is offline.
153   *
154   * @param domainDN
155   *          base DN of the replica
156   * @param serverId
157   *          server id of the replica
158   * @return {@code true} if replica is offline, {@code false} otherwise
159   */
160  public boolean isReplicaOffline(DN domainDN, int serverId)
161  {
162    return replicasOffline.getCSN(domainDN, serverId) != null;
163  }
164
165  /**
166   * Ensures the medium consistency point is updated by UpdateMsg.
167   *
168   * @param baseDN
169   *          the baseDN of the domain for which the heartbeat is published
170   * @param updateMsg
171   *          the updateMsg that will update the medium consistency point
172   * @throws ChangelogException
173   *           If a database problem happened
174   */
175  public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
176      throws ChangelogException
177  {
178    if (!predicate.isECLEnabledDomain(baseDN))
179    {
180      return;
181    }
182
183    final CSN oldestCSNBefore = getOldestLastAliveCSN();
184    lastAliveCSNs.update(baseDN, updateMsg.getCSN());
185    tryNotify(oldestCSNBefore);
186  }
187
188  /**
189   * Signals a replica went offline.
190   *
191   * @param baseDN
192   *          the replica's replication domain
193   * @param offlineCSN
194   *          the serverId and time of the replica that went offline
195   */
196  public void replicaOffline(DN baseDN, CSN offlineCSN)
197  {
198    if (!predicate.isECLEnabledDomain(baseDN))
199    {
200      return;
201    }
202
203    replicasOffline.update(baseDN, offlineCSN);
204    final CSN oldestCSNBefore = getOldestLastAliveCSN();
205    lastAliveCSNs.update(baseDN, offlineCSN);
206    tryNotify(oldestCSNBefore);
207  }
208
209  private CSN getOldestLastAliveCSN()
210  {
211    return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond();
212  }
213
214  /**
215   * Notifies the Change number indexer thread if it will be able to do some
216   * work.
217   */
218  private void tryNotify(final CSN oldestCSNBefore)
219  {
220    if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore))
221    {
222      synchronized (this)
223      {
224        notify();
225      }
226    }
227  }
228
229  /**
230   * Used for waking up the {@link ChangeNumberIndexer} thread because it might
231   * have some work to do.
232   */
233  private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore)
234  {
235    final CSN oldestCSNAfter = getOldestLastAliveCSN();
236    // ensure that all initial replicas alive information have been updated
237    // with CSNs that are acceptable for moving the medium consistency forward
238    return allInitialReplicasAreOfflineOrAlive()
239        && oldestCSNBefore != null // then oldestCSNAfter cannot be null
240        // has the oldest CSN changed?
241        && oldestCSNBefore.isOlderThan(oldestCSNAfter);
242  }
243
244  /**
245   * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN
246   * must be persisted to the change number index DB.
247   */
248  private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist)
249  {
250    // ensure that all initial replicas alive information have been updated
251    // with CSNs that are acceptable for moving the medium consistency forward
252    return allInitialReplicasAreOfflineOrAlive()
253        // can we persist the next CSN?
254        && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN());
255  }
256
257  /**
258   * Returns true only if the initial replicas known from the changelog state DB
259   * are either:
260   * <ul>
261   * <li>offline, so do not wait for them in order to compute medium consistency
262   * </li>
263   * <li>alive, because we received heartbeats or changes (so their last alive
264   * CSN has been updated to something past the oldest possible CSN), we have
265   * enough info to compute medium consistency</li>
266   * </ul>
267   * In both cases, we have enough information to compute medium consistency
268   * without waiting any further.
269   */
270  private boolean allInitialReplicasAreOfflineOrAlive()
271  {
272    for (DN baseDN : lastAliveCSNs)
273    {
274      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
275      {
276        if (csn.getTime() == 0
277            && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
278        {
279          // this is the oldest possible CSN, but the replica is not offline
280          // we must wait for more up to date information from this replica
281          return false;
282        }
283      }
284    }
285    return true;
286  }
287
288  /**
289   * Restores in memory data needed to build the CNIndexDB. In particular,
290   * initializes the changes cursor to the medium consistency point.
291   */
292  private void initialize() throws ChangelogException
293  {
294    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
295
296    initializeLastAliveCSNs(domainDB);
297    initializeNextChangeCursor(domainDB);
298    initializeOfflineReplicas();
299  }
300
301  private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
302  {
303    // Initialize the multi domain cursor only from the change number index record.
304    // The cookie is always empty at this stage.
305    final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
306    final CSN newestCsn = newestRecord != null ? newestRecord.getCSN() : null;
307    final CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, newestCsn);
308    final MultiDomainServerState unused = new MultiDomainServerState();
309    MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = domainDB.getCursorFrom(unused, options);
310
311    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
312    ChangelogBackend.updateCookieToMediumConsistencyPoint(cookie, nextChangeForInsertDBCursor, newestRecord);
313  }
314
315  private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
316  {
317    for (Entry<DN, Set<Integer>> entry : changelogStateProvider.getChangelogState().getDomainToServerIds().entrySet())
318    {
319      final DN baseDN = entry.getKey();
320      if (predicate.isECLEnabledDomain(baseDN))
321      {
322        for (Integer serverId : entry.getValue())
323        {
324          /*
325           * initialize with the oldest possible CSN in order for medium
326           * consistency to wait for all replicas to be alive before moving forward
327           */
328          lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
329        }
330
331        final ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
332        lastAliveCSNs.update(baseDN, latestKnownState);
333      }
334    }
335  }
336
337  private void initializeOfflineReplicas()
338  {
339    final MultiDomainServerState offlineReplicas = changelogStateProvider.getChangelogState().getOfflineReplicas();
340    for (DN baseDN : offlineReplicas)
341    {
342      for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
343      {
344        if (predicate.isECLEnabledDomain(baseDN))
345        {
346          replicasOffline.update(baseDN, offlineCSN);
347          // a replica offline message could also be the very last time
348          // we heard from this replica :)
349          lastAliveCSNs.update(baseDN, offlineCSN);
350        }
351      }
352    }
353  }
354
355  private CSN oldestPossibleCSN(int serverId)
356  {
357    return new CSN(0, 0, serverId);
358  }
359
360  /** {@inheritDoc} */
361  @Override
362  public void initiateShutdown()
363  {
364    super.initiateShutdown();
365    synchronized (this)
366    {
367      notify();
368    }
369  }
370
371  /** {@inheritDoc} */
372  @Override
373  public void run()
374  {
375    try
376    {
377      /*
378       * initialize here to allow fast application start up and avoid errors due
379       * cursors being created in a different thread to the one where they are used.
380       */
381      initialize();
382
383      while (!isShutdownInitiated())
384      {
385        try
386        {
387          while (!domainsToClear.isEmpty())
388          {
389            final DN baseDNToClear = domainsToClear.first();
390            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
391            // Only release the waiting thread
392            // once this domain's state has been cleared.
393            domainsToClear.remove(baseDNToClear);
394          }
395          if (nextChangeForInsertDBCursor.shouldReInitialize())
396          {
397            nextChangeForInsertDBCursor.close();
398            initialize();
399          }
400          // Do not call DBCursor.next() here
401          // because we might not have consumed the last record,
402          // for example if we could not move the MCP forward
403          final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
404          if (msg == null)
405          {
406            synchronized (this)
407            {
408              if (isShutdownInitiated())
409              {
410                continue;
411              }
412              wait();
413            }
414            // check whether new changes have been added to the ReplicaDBs
415            moveToNextChange();
416            continue;
417          }
418          else if (msg instanceof ReplicaOfflineMsg)
419          {
420            moveToNextChange();
421            continue;
422          }
423
424          final CSN csn = msg.getCSN();
425          final DN baseDN = nextChangeForInsertDBCursor.getData();
426          // FIXME problem: what if the serverId is not part of the ServerState?
427          // right now, change number will be blocked
428          if (!canMoveForwardMediumConsistencyPoint(csn))
429          {
430            // the oldest record to insert is newer than the medium consistency
431            // point. Let's wait for a change that can be published.
432            synchronized (this)
433            {
434              // double check to protect against a missed call to notify()
435              if (!canMoveForwardMediumConsistencyPoint(csn))
436              {
437                if (isShutdownInitiated())
438                {
439                  return;
440                }
441                wait();
442                // loop to check if changes older than the medium consistency
443                // point have been added to the ReplicaDBs
444                continue;
445              }
446            }
447          }
448
449          // OK, the oldest change is older than the medium consistency point
450          // let's publish it to the CNIndexDB.
451          final long changeNumber = changelogDB.getChangeNumberIndexDB()
452              .addRecord(new ChangeNumberIndexRecord(baseDN, csn));
453          if (!cookie.update(baseDN, csn))
454          {
455            throw new IllegalStateException("It was expected that change (baseDN=" + baseDN + ", csn=" + csn
456                + ") would have updated the cookie=" + cookie + ", but it did not");
457          }
458          notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
459          moveForwardMediumConsistencyPoint(csn, baseDN);
460        }
461        catch (InterruptedException ignored)
462        {
463          // was shutdown called? loop to figure it out.
464          Thread.currentThread().interrupt();
465        }
466      }
467    }
468    catch (RuntimeException e)
469    {
470      logUnexpectedException(e);
471      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
472      throw e;
473    }
474    catch (Exception e)
475    {
476      logUnexpectedException(e);
477      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
478      throw new RuntimeException(e);
479    }
480    finally
481    {
482      nextChangeForInsertDBCursor.close();
483      nextChangeForInsertDBCursor = null;
484    }
485  }
486
487  private void moveToNextChange() throws ChangelogException
488  {
489    try
490    {
491      nextChangeForInsertDBCursor.next();
492    }
493    catch (AbortedChangelogCursorException e) {
494      if (domainsToClear.isEmpty())
495      {
496        // There is no domain to clear, thus it is
497        // not expected that a cursor is aborted
498        throw e;
499      }
500      // else assumes the aborted cursor is part of a domain
501      // that will be removed on the next iteration
502      logger.trace("Cursor was aborted: %s, but continuing because domainsToClear has size %s",
503          e, domainsToClear.size());
504    }
505  }
506
507  /**
508   * Notifies the {@link ChangelogBackend} that a new entry has been added.
509   *
510   * @param baseDN
511   *          the baseDN of the newly added entry.
512   * @param changeNumber
513   *          the change number of the newly added entry. It will be greater
514   *          than zero for entries added to the change number index and less
515   *          than or equal to zero for entries added to any replica DB
516   * @param cookie
517   *          the cookie of the newly added entry. This is only meaningful for
518   *          entries added to the change number index
519   * @param msg
520   *          the update message of the newly added entry
521   * @throws ChangelogException
522   *           If a problem occurs while notifying of the newly added entry.
523   */
524  protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
525      MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException
526  {
527    ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookie.toString(), msg);
528  }
529
530  /**
531   * Nothing can be done about it.
532   * <p>
533   * Rely on the DirectoryThread uncaught exceptions handler for logging error +
534   * alert.
535   * <p>
536   * Message logged here gives corrective information to the administrator.
537   */
538  private void logUnexpectedException(Exception e)
539  {
540    logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
541        getClass().getSimpleName(), stackTraceToSingleLineString(e));
542  }
543
544  private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException
545  {
546    final int mcServerId = mcCSN.getServerId();
547    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
548    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
549    if (offlineCSN != null)
550    {
551      if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN))
552      {
553        // replica is back online, we can forget the last time it was offline
554        replicasOffline.removeCSN(mcBaseDN, offlineCSN);
555      }
556      else if (offlineCSN.isOlderThan(mcCSN))
557      {
558        /*
559         * replica is not back online, Medium consistency point has gone past
560         * its last offline time, and there are no more changes after the
561         * offline CSN in the cursor: remove everything known about it
562         * (offlineCSN from lastAliveCSN and remove all knowledge of this replica
563         * from the medium consistency RUV).
564         */
565        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
566      }
567    }
568
569    // advance the cursor we just read from,
570    // success/failure will be checked later
571    nextChangeForInsertDBCursor.next();
572  }
573
574  /**
575   * Asks the current thread to clear its state for the specified domain.
576   * <p>
577   * Note: This method blocks the current thread until state is cleared.
578   *
579   * @param baseDN
580   *          the baseDN to be cleared from this thread's state. {@code null} and
581   *          {@link DN#rootDN()} mean "clear all domains".
582   */
583  public void clear(DN baseDN)
584  {
585    final DN baseDNToClear = baseDN != null ? baseDN : DN.rootDN();
586    domainsToClear.add(baseDNToClear);
587    while (domainsToClear.contains(baseDNToClear)
588        && !State.TERMINATED.equals(getState()))
589    {
590      // wait until clear() has been done by thread, always waking it up
591      synchronized (this)
592      {
593        notify();
594      }
595      // ensures thread wait that this thread's state is cleaned up
596      Thread.yield();
597    }
598  }
599}