001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2014-2017 ForgeRock AS.
015 */
016package org.opends.server.replication.server.changelog.file;
017
018import java.util.Iterator;
019import java.util.Map.Entry;
020import java.util.concurrent.ConcurrentSkipListMap;
021
022import net.jcip.annotations.NotThreadSafe;
023
024import org.opends.server.replication.common.CSN;
025import org.opends.server.replication.protocol.UpdateMsg;
026import org.opends.server.replication.server.changelog.api.ChangelogException;
027import org.opends.server.replication.server.changelog.api.DBCursor;
028import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
029import org.forgerock.opendj.ldap.DN;
030
031/** Cursor iterating over a replication domain's replica DBs. */
032@NotThreadSafe
033public class DomainDBCursor extends CompositeDBCursor<Void>
034{
035  private final DN baseDN;
036  private final ReplicationDomainDB domainDB;
037  private final ConcurrentSkipListMap<Integer, CSN> newReplicas = new ConcurrentSkipListMap<>();
038  private final CursorOptions options;
039
040  /**
041   * Builds a DomainDBCursor instance.
042   *
043   * @param baseDN
044   *          the replication domain baseDN of this cursor
045   * @param domainDB
046   *          the DB for the provided replication domain
047   * @param options The cursor options
048   */
049  public DomainDBCursor(final DN baseDN, final ReplicationDomainDB domainDB, CursorOptions options)
050  {
051    this.baseDN = baseDN;
052    this.domainDB = domainDB;
053    this.options = options;
054  }
055
056  /**
057   * Returns the replication domain baseDN of this cursor.
058   *
059   * @return the replication domain baseDN of this cursor.
060   */
061  public DN getBaseDN()
062  {
063    return baseDN;
064  }
065
066  /**
067   * Adds a replicaDB for this cursor to iterate over. Added cursors will be
068   * created and iterated over on the next call to {@link #next()}.
069   *
070   * @param serverId
071   *          the serverId of the replica
072   * @param startCSN
073   *          the CSN to use as a starting point
074   */
075  public void addReplicaDB(int serverId, CSN startCSN)
076  {
077    // only keep the oldest CSN that will be the new cursor's starting point
078    newReplicas.putIfAbsent(serverId, startCSN != null ? startCSN : CSN.MIN_VALUE);
079  }
080
081  @Override
082  protected void incorporateNewCursors() throws ChangelogException
083  {
084    for (Iterator<Entry<Integer, CSN>> iter = newReplicas.entrySet().iterator(); iter.hasNext();)
085    {
086      final Entry<Integer, CSN> pair = iter.next();
087      final int serverId = pair.getKey();
088      final CSN csn = pair.getValue();
089      final CSN startCSN = !CSN.MIN_VALUE.equals(csn) ? csn : null;
090      final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startCSN, options);
091      addCursor(cursor, null);
092      iter.remove();
093    }
094  }
095
096  @Override
097  public void close()
098  {
099    super.close();
100    domainDB.unregisterCursor(this);
101    newReplicas.clear();
102  }
103
104}