001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2014-2016 ForgeRock AS.
015 */
016package org.opends.server.replication.server.changelog.file;
017
018import java.util.concurrent.atomic.AtomicReference;
019
020import org.opends.server.replication.common.CSN;
021import org.opends.server.replication.protocol.ReplicaOfflineMsg;
022import org.opends.server.replication.protocol.UpdateMsg;
023import org.opends.server.replication.server.changelog.api.ChangelogException;
024import org.opends.server.replication.server.changelog.api.DBCursor;
025import org.opends.server.replication.server.changelog.api.ReplicaId;
026import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
027
028/**
029 * {@link DBCursor} over a replica returning {@link UpdateMsg}s.
030 * <p>
031 * It decorates an existing {@link DBCursor} on a replicaDB and can possibly
032 * return replica offline messages when the decorated DBCursor is exhausted and
033 * the offline CSN is newer than the last returned update CSN.
034 */
035public class ReplicaCursor implements DBCursor<UpdateMsg>
036{
037  /** @NonNull */
038  private final DBCursor<UpdateMsg> cursor;
039  private final AtomicReference<ReplicaOfflineMsg> replicaOfflineMsg = new AtomicReference<>();
040  private UpdateMsg currentRecord;
041
042  private final ReplicaId replicaId;
043  private final ReplicationDomainDB domainDB;
044
045  /**
046   * Creates a ReplicaCursor object with a cursor to decorate
047   * and an offlineCSN to return as part of a ReplicaOfflineMsg.
048   *
049   * @param cursor
050   *          the non-null underlying cursor that needs to be exhausted before
051   *          we return a ReplicaOfflineMsg
052   * @param offlineCSN
053   *          the offline CSN from which to builder the
054   *          {@link ReplicaOfflineMsg} to return
055   * @param replicaId
056   *          the replica identifier
057   * @param domainDB
058   *          the DB for the provided replication domain
059   */
060  public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN, ReplicaId replicaId, ReplicationDomainDB domainDB)
061  {
062    this.cursor = cursor;
063    this.replicaId = replicaId;
064    this.domainDB = domainDB;
065    setOfflineCSN(offlineCSN);
066  }
067
068  /**
069   * Sets the offline CSN to be returned by this cursor.
070   *
071   * @param offlineCSN
072   *          The offline CSN to be returned by this cursor.
073   *          If null, it will unset any previous offlineCSN and never return a ReplicaOfflineMsg
074   */
075  public void setOfflineCSN(CSN offlineCSN)
076  {
077    if (offlineCSN != null)
078    {
079      ReplicaOfflineMsg prevOfflineMsg = this.replicaOfflineMsg.get();
080      if (prevOfflineMsg == null || prevOfflineMsg.getCSN().isOlderThan(offlineCSN))
081      {
082        // Do not spin if the the message for this replica has been changed. Either a newer
083        // message has arrived or the next cursor iteration will pick it up.
084        this.replicaOfflineMsg.compareAndSet(prevOfflineMsg, new ReplicaOfflineMsg(offlineCSN));
085      }
086    }
087    else
088    {
089      this.replicaOfflineMsg.set(null);
090    }
091  }
092
093  /** {@inheritDoc} */
094  @Override
095  public UpdateMsg getRecord()
096  {
097    return currentRecord;
098  }
099
100  /**
101   * Returns the replica identifier that this cursor is associated to.
102   *
103   * @return the replica identifier that this cursor is associated to
104   */
105  public ReplicaId getReplicaId()
106  {
107    return replicaId;
108  }
109
110  /** {@inheritDoc} */
111  @Override
112  public boolean next() throws ChangelogException
113  {
114    final ReplicaOfflineMsg offlineMsg1 = replicaOfflineMsg.get();
115    if (isReplicaOfflineMsgOutdated(offlineMsg1, currentRecord))
116    {
117      replicaOfflineMsg.compareAndSet(offlineMsg1, null);
118    }
119
120    // now verify if new changes have been added to the DB
121    // (cursors are automatically restarted)
122    final UpdateMsg lastUpdate = cursor.getRecord();
123    final boolean hasNext = cursor.next();
124    if (hasNext)
125    {
126      currentRecord = cursor.getRecord();
127      return true;
128    }
129
130    // replicaDB just happened to be exhausted now
131    final ReplicaOfflineMsg offlineMsg2 = replicaOfflineMsg.get();
132    if (isReplicaOfflineMsgOutdated(offlineMsg2, lastUpdate))
133    {
134      replicaOfflineMsg.compareAndSet(offlineMsg2, null);
135      currentRecord = null;
136      return false;
137    }
138    currentRecord = offlineMsg2;
139    return currentRecord != null;
140  }
141
142  /** It could also mean that the replica offline message has already been consumed. */
143  private boolean isReplicaOfflineMsgOutdated(
144      final ReplicaOfflineMsg offlineMsg, final UpdateMsg updateMsg)
145  {
146    return offlineMsg != null
147        && updateMsg != null
148        && offlineMsg.getCSN().isOlderThanOrEqualTo(updateMsg.getCSN());
149  }
150
151  /** {@inheritDoc} */
152  @Override
153  public void close()
154  {
155    cursor.close();
156    domainDB.unregisterCursor(this);
157  }
158
159  /** {@inheritDoc} */
160  @Override
161  public String toString()
162  {
163    final ReplicaOfflineMsg msg = replicaOfflineMsg.get();
164    return getClass().getSimpleName()
165        + " currentRecord=" + currentRecord
166        + " offlineCSN=" + (msg != null ? msg.getCSN().toStringUI() : null)
167        + " cursor=" + cursor.toString().split("", 2)[1];
168  }
169
170}