001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2014-2017 ForgeRock AS. 015 */ 016package org.opends.server.replication.server.changelog.file; 017 018import java.util.Iterator; 019import java.util.Map.Entry; 020import java.util.concurrent.ConcurrentSkipListMap; 021 022import net.jcip.annotations.NotThreadSafe; 023 024import org.opends.server.replication.common.CSN; 025import org.opends.server.replication.protocol.UpdateMsg; 026import org.opends.server.replication.server.changelog.api.ChangelogException; 027import org.opends.server.replication.server.changelog.api.DBCursor; 028import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 029import org.forgerock.opendj.ldap.DN; 030 031/** Cursor iterating over a replication domain's replica DBs. */ 032@NotThreadSafe 033public class DomainDBCursor extends CompositeDBCursor<Void> 034{ 035 private final DN baseDN; 036 private final ReplicationDomainDB domainDB; 037 private final ConcurrentSkipListMap<Integer, CSN> newReplicas = new ConcurrentSkipListMap<>(); 038 private final CursorOptions options; 039 040 /** 041 * Builds a DomainDBCursor instance. 042 * 043 * @param baseDN 044 * the replication domain baseDN of this cursor 045 * @param domainDB 046 * the DB for the provided replication domain 047 * @param options The cursor options 048 */ 049 public DomainDBCursor(final DN baseDN, final ReplicationDomainDB domainDB, CursorOptions options) 050 { 051 this.baseDN = baseDN; 052 this.domainDB = domainDB; 053 this.options = options; 054 } 055 056 /** 057 * Returns the replication domain baseDN of this cursor. 058 * 059 * @return the replication domain baseDN of this cursor. 060 */ 061 public DN getBaseDN() 062 { 063 return baseDN; 064 } 065 066 /** 067 * Adds a replicaDB for this cursor to iterate over. Added cursors will be 068 * created and iterated over on the next call to {@link #next()}. 069 * 070 * @param serverId 071 * the serverId of the replica 072 * @param startCSN 073 * the CSN to use as a starting point 074 */ 075 public void addReplicaDB(int serverId, CSN startCSN) 076 { 077 // only keep the oldest CSN that will be the new cursor's starting point 078 newReplicas.putIfAbsent(serverId, startCSN != null ? startCSN : CSN.MIN_VALUE); 079 } 080 081 @Override 082 protected void incorporateNewCursors() throws ChangelogException 083 { 084 for (Iterator<Entry<Integer, CSN>> iter = newReplicas.entrySet().iterator(); iter.hasNext();) 085 { 086 final Entry<Integer, CSN> pair = iter.next(); 087 final int serverId = pair.getKey(); 088 final CSN csn = pair.getValue(); 089 final CSN startCSN = !CSN.MIN_VALUE.equals(csn) ? csn : null; 090 final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startCSN, options); 091 addCursor(cursor, null); 092 iter.remove(); 093 } 094 } 095 096 @Override 097 public void close() 098 { 099 super.close(); 100 domainDB.unregisterCursor(this); 101 newReplicas.clear(); 102 } 103 104}