001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2012-2016 ForgeRock AS. 016 */ 017package org.opends.server.replication.server; 018 019import static org.opends.messages.ReplicationMessages.*; 020 021import java.util.TreeMap; 022 023import net.jcip.annotations.ThreadSafe; 024 025import org.forgerock.i18n.slf4j.LocalizedLogger; 026import org.opends.server.replication.common.CSN; 027import org.opends.server.replication.protocol.UpdateMsg; 028 029/** 030 * This class is used to build ordered lists of UpdateMsg. 031 * The order is defined by the order of the CSN of the UpdateMsg. 032 */ 033@ThreadSafe 034public class MsgQueue 035{ 036 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 037 038 private TreeMap<CSN, UpdateMsg> map = new TreeMap<>(); 039 /** 040 * FIXME JNR to be investigated: 041 * I strongly suspect that we could replace this field 042 * by using the synchronized keyword on each method. 043 * However, MessageHandler is weirdly synchronizing on msgQueue field 044 * even though it is touching the lateQueue field (?!?). 045 */ 046 private final Object lock = new Object(); 047 048 /** The total number of bytes for all the message in the queue. */ 049 private int bytesCount; 050 051 /** 052 * Return the first UpdateMsg in the MsgQueue. 053 * 054 * @return The first UpdateMsg in the MsgQueue. 055 */ 056 public UpdateMsg first() 057 { 058 synchronized (lock) 059 { 060 return map.get(map.firstKey()); 061 } 062 } 063 064 /** 065 * Returns the number of elements in this MsgQueue. 066 * 067 * @return The number of elements in this MsgQueue. 068 */ 069 public int count() 070 { 071 synchronized (lock) 072 { 073 return map.size(); 074 } 075 } 076 077 /** 078 * Returns the number of bytes in this MsgQueue. 079 * 080 * @return The number of bytes in this MsgQueue. 081 */ 082 public int bytesCount() 083 { 084 synchronized (lock) 085 { 086 return bytesCount; 087 } 088 } 089 090 /** 091 * Returns <tt>true</tt> if this MsgQueue contains no UpdateMsg. 092 * 093 * @return <tt>true</tt> if this MsgQueue contains no UpdateMsg. 094 */ 095 public boolean isEmpty() 096 { 097 synchronized (lock) 098 { 099 return map.isEmpty(); 100 } 101 } 102 103 /** 104 * Add an UpdateMsg to this MessageQueue. 105 * 106 * @param update The UpdateMsg to add to this MessageQueue. 107 */ 108 public void add(UpdateMsg update) 109 { 110 synchronized (lock) 111 { 112 final UpdateMsg msgSameCSN = map.put(update.getCSN(), update); 113 if (msgSameCSN != null) 114 { 115 try 116 { 117 if (msgSameCSN.getBytes().length != update.getBytes().length 118 || msgSameCSN.isAssured() != update.isAssured() 119 || msgSameCSN.getVersion() != update.getVersion()) 120 { 121 // Adding 2 msgs with the same CSN is ok only when the 2 msgs are the same 122 bytesCount += update.size() - msgSameCSN.size(); 123 logger.error(ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CSN, msgSameCSN.getCSN(), msgSameCSN, update); 124 } 125 } 126 catch (Exception e) 127 { 128 logger.traceException(e); 129 } 130 } 131 else 132 { 133 // it is really an ADD 134 bytesCount += update.size(); 135 } 136 } 137 } 138 139 /** 140 * Get and remove the first UpdateMsg in this MessageQueue. 141 * 142 * @return The first UpdateMsg in this MessageQueue. 143 */ 144 public UpdateMsg removeFirst() 145 { 146 synchronized (lock) 147 { 148 // FIXME JNR replace next 2 lines with just that one: 149 // final UpdateMsg update = map.pollFirstEntry().getValue(); 150 final UpdateMsg update = map.get(map.firstKey()); 151 map.remove(update.getCSN()); 152 bytesCount -= update.size(); 153 if (map.isEmpty() && bytesCount != 0) 154 { 155 // should never happen 156 logger.error(ERR_BYTE_COUNT, bytesCount); 157 bytesCount = 0; 158 } 159 return update; 160 } 161 } 162 163 /** 164 * Returns <tt>true</tt> if this map contains an UpdateMsg 165 * with the same CSN as the given UpdateMsg. 166 * 167 * @param msg UpdateMsg whose presence in this queue is to be tested. 168 * 169 * @return <tt>true</tt> if this map contains an UpdateMsg 170 * with the same CSN as the given UpdateMsg. 171 */ 172 public boolean contains(UpdateMsg msg) 173 { 174 synchronized (lock) 175 { 176 return map.containsKey(msg.getCSN()); 177 } 178 } 179 180 /** Removes all UpdateMsg form this queue. */ 181 public void clear() 182 { 183 synchronized (lock) 184 { 185 map.clear(); 186 bytesCount = 0; 187 } 188 } 189 190 /** 191 * Consumes all the messages in this queue up to and including the passed in 192 * message. If the passed in message is not contained in the current queue, 193 * then all messages will be removed from it. 194 * 195 * @param finalMsg 196 * the final message to reach when consuming messages from this queue 197 */ 198 public void consumeUpTo(UpdateMsg finalMsg) 199 { 200 // FIXME this code could be more efficient if the msgQueue could call the 201 // following code (to be tested): 202 // if (!map.containsKey(finalMsg.getCSN())) { 203 // map.clear(); 204 // } else { 205 // map.headMap(finalMsg.getCSN(), true).clear(); 206 // } 207 208 final CSN finalCSN = finalMsg.getCSN(); 209 UpdateMsg msg; 210 do 211 { 212 msg = removeFirst(); 213 } 214 while (!finalCSN.equals(msg.getCSN())); 215 } 216 217 @Override 218 public String toString() 219 { 220 return getClass().getSimpleName() + " bytesCount=" + bytesCount + " queue=" + map.values(); 221 } 222}