001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2014-2016 ForgeRock AS. 015 */ 016package org.opends.server.replication.server.changelog.file; 017 018import java.util.concurrent.atomic.AtomicReference; 019 020import org.opends.server.replication.common.CSN; 021import org.opends.server.replication.protocol.ReplicaOfflineMsg; 022import org.opends.server.replication.protocol.UpdateMsg; 023import org.opends.server.replication.server.changelog.api.ChangelogException; 024import org.opends.server.replication.server.changelog.api.DBCursor; 025import org.opends.server.replication.server.changelog.api.ReplicaId; 026import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 027 028/** 029 * {@link DBCursor} over a replica returning {@link UpdateMsg}s. 030 * <p> 031 * It decorates an existing {@link DBCursor} on a replicaDB and can possibly 032 * return replica offline messages when the decorated DBCursor is exhausted and 033 * the offline CSN is newer than the last returned update CSN. 034 */ 035public class ReplicaCursor implements DBCursor<UpdateMsg> 036{ 037 /** @NonNull */ 038 private final DBCursor<UpdateMsg> cursor; 039 private final AtomicReference<ReplicaOfflineMsg> replicaOfflineMsg = new AtomicReference<>(); 040 private UpdateMsg currentRecord; 041 042 private final ReplicaId replicaId; 043 private final ReplicationDomainDB domainDB; 044 045 /** 046 * Creates a ReplicaCursor object with a cursor to decorate 047 * and an offlineCSN to return as part of a ReplicaOfflineMsg. 048 * 049 * @param cursor 050 * the non-null underlying cursor that needs to be exhausted before 051 * we return a ReplicaOfflineMsg 052 * @param offlineCSN 053 * the offline CSN from which to builder the 054 * {@link ReplicaOfflineMsg} to return 055 * @param replicaId 056 * the replica identifier 057 * @param domainDB 058 * the DB for the provided replication domain 059 */ 060 public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN, ReplicaId replicaId, ReplicationDomainDB domainDB) 061 { 062 this.cursor = cursor; 063 this.replicaId = replicaId; 064 this.domainDB = domainDB; 065 setOfflineCSN(offlineCSN); 066 } 067 068 /** 069 * Sets the offline CSN to be returned by this cursor. 070 * 071 * @param offlineCSN 072 * The offline CSN to be returned by this cursor. 073 * If null, it will unset any previous offlineCSN and never return a ReplicaOfflineMsg 074 */ 075 public void setOfflineCSN(CSN offlineCSN) 076 { 077 if (offlineCSN != null) 078 { 079 ReplicaOfflineMsg prevOfflineMsg = this.replicaOfflineMsg.get(); 080 if (prevOfflineMsg == null || prevOfflineMsg.getCSN().isOlderThan(offlineCSN)) 081 { 082 // Do not spin if the the message for this replica has been changed. Either a newer 083 // message has arrived or the next cursor iteration will pick it up. 084 this.replicaOfflineMsg.compareAndSet(prevOfflineMsg, new ReplicaOfflineMsg(offlineCSN)); 085 } 086 } 087 else 088 { 089 this.replicaOfflineMsg.set(null); 090 } 091 } 092 093 /** {@inheritDoc} */ 094 @Override 095 public UpdateMsg getRecord() 096 { 097 return currentRecord; 098 } 099 100 /** 101 * Returns the replica identifier that this cursor is associated to. 102 * 103 * @return the replica identifier that this cursor is associated to 104 */ 105 public ReplicaId getReplicaId() 106 { 107 return replicaId; 108 } 109 110 /** {@inheritDoc} */ 111 @Override 112 public boolean next() throws ChangelogException 113 { 114 final ReplicaOfflineMsg offlineMsg1 = replicaOfflineMsg.get(); 115 if (isReplicaOfflineMsgOutdated(offlineMsg1, currentRecord)) 116 { 117 replicaOfflineMsg.compareAndSet(offlineMsg1, null); 118 } 119 120 // now verify if new changes have been added to the DB 121 // (cursors are automatically restarted) 122 final UpdateMsg lastUpdate = cursor.getRecord(); 123 final boolean hasNext = cursor.next(); 124 if (hasNext) 125 { 126 currentRecord = cursor.getRecord(); 127 return true; 128 } 129 130 // replicaDB just happened to be exhausted now 131 final ReplicaOfflineMsg offlineMsg2 = replicaOfflineMsg.get(); 132 if (isReplicaOfflineMsgOutdated(offlineMsg2, lastUpdate)) 133 { 134 replicaOfflineMsg.compareAndSet(offlineMsg2, null); 135 currentRecord = null; 136 return false; 137 } 138 currentRecord = offlineMsg2; 139 return currentRecord != null; 140 } 141 142 /** It could also mean that the replica offline message has already been consumed. */ 143 private boolean isReplicaOfflineMsgOutdated( 144 final ReplicaOfflineMsg offlineMsg, final UpdateMsg updateMsg) 145 { 146 return offlineMsg != null 147 && updateMsg != null 148 && offlineMsg.getCSN().isOlderThanOrEqualTo(updateMsg.getCSN()); 149 } 150 151 /** {@inheritDoc} */ 152 @Override 153 public void close() 154 { 155 cursor.close(); 156 domainDB.unregisterCursor(this); 157 } 158 159 /** {@inheritDoc} */ 160 @Override 161 public String toString() 162 { 163 final ReplicaOfflineMsg msg = replicaOfflineMsg.get(); 164 return getClass().getSimpleName() 165 + " currentRecord=" + currentRecord 166 + " offlineCSN=" + (msg != null ? msg.getCSN().toStringUI() : null) 167 + " cursor=" + cursor.toString().split("", 2)[1]; 168 } 169 170}