001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2012-2016 ForgeRock AS.
016 */
017package org.opends.server.replication.server;
018
019import static org.opends.messages.ReplicationMessages.*;
020
021import java.util.TreeMap;
022
023import net.jcip.annotations.ThreadSafe;
024
025import org.forgerock.i18n.slf4j.LocalizedLogger;
026import org.opends.server.replication.common.CSN;
027import org.opends.server.replication.protocol.UpdateMsg;
028
029/**
030 * This class is used to build ordered lists of UpdateMsg.
031 * The order is defined by the order of the CSN of the UpdateMsg.
032 */
033@ThreadSafe
034public class MsgQueue
035{
036  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
037
038  private TreeMap<CSN, UpdateMsg> map = new TreeMap<>();
039  /**
040   * FIXME JNR to be investigated:
041   * I strongly suspect that we could replace this field
042   * by using the synchronized keyword on each method.
043   * However, MessageHandler is weirdly synchronizing on msgQueue field
044   * even though it is touching the lateQueue field (?!?).
045   */
046  private final Object lock = new Object();
047
048  /** The total number of bytes for all the message in the queue. */
049  private int bytesCount;
050
051  /**
052   * Return the first UpdateMsg in the MsgQueue.
053   *
054   * @return The first UpdateMsg in the MsgQueue.
055   */
056  public UpdateMsg first()
057  {
058    synchronized (lock)
059    {
060      return map.get(map.firstKey());
061    }
062  }
063
064  /**
065   * Returns the number of elements in this MsgQueue.
066   *
067   * @return The number of elements in this MsgQueue.
068   */
069  public int count()
070  {
071    synchronized (lock)
072    {
073      return map.size();
074    }
075  }
076
077  /**
078   * Returns the number of bytes in this MsgQueue.
079   *
080   * @return The number of bytes in this MsgQueue.
081   */
082  public int bytesCount()
083  {
084    synchronized (lock)
085    {
086      return bytesCount;
087    }
088  }
089
090  /**
091   * Returns <tt>true</tt> if this MsgQueue contains no UpdateMsg.
092   *
093   * @return <tt>true</tt> if this MsgQueue contains no UpdateMsg.
094   */
095  public boolean isEmpty()
096  {
097    synchronized (lock)
098    {
099      return map.isEmpty();
100    }
101  }
102
103  /**
104   * Add an UpdateMsg to this MessageQueue.
105   *
106   * @param update The UpdateMsg to add to this MessageQueue.
107   */
108  public void add(UpdateMsg update)
109  {
110    synchronized (lock)
111    {
112      final UpdateMsg msgSameCSN = map.put(update.getCSN(), update);
113      if (msgSameCSN != null)
114      {
115        try
116        {
117          if (msgSameCSN.getBytes().length != update.getBytes().length
118              || msgSameCSN.isAssured() != update.isAssured()
119              || msgSameCSN.getVersion() != update.getVersion())
120          {
121            // Adding 2 msgs with the same CSN is ok only when the 2 msgs are the same
122            bytesCount += update.size() - msgSameCSN.size();
123            logger.error(ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CSN, msgSameCSN.getCSN(), msgSameCSN, update);
124          }
125        }
126        catch (Exception e)
127        {
128          logger.traceException(e);
129        }
130      }
131      else
132      {
133        // it is really an ADD
134        bytesCount += update.size();
135      }
136    }
137  }
138
139  /**
140   * Get and remove the first UpdateMsg in this MessageQueue.
141   *
142   * @return The first UpdateMsg in this MessageQueue.
143   */
144  public UpdateMsg removeFirst()
145  {
146    synchronized (lock)
147    {
148      // FIXME JNR replace next 2 lines with just that one:
149      // final UpdateMsg update = map.pollFirstEntry().getValue();
150      final UpdateMsg update = map.get(map.firstKey());
151      map.remove(update.getCSN());
152      bytesCount -= update.size();
153      if (map.isEmpty() && bytesCount != 0)
154      {
155        // should never happen
156        logger.error(ERR_BYTE_COUNT, bytesCount);
157        bytesCount = 0;
158      }
159      return update;
160    }
161  }
162
163  /**
164   * Returns <tt>true</tt> if this map contains an UpdateMsg
165   * with the same CSN as the given UpdateMsg.
166   *
167   * @param msg UpdateMsg whose presence in this queue is to be tested.
168   *
169   * @return <tt>true</tt> if this map contains an UpdateMsg
170   *         with the same CSN as the given UpdateMsg.
171   */
172  public boolean contains(UpdateMsg msg)
173  {
174    synchronized (lock)
175    {
176      return map.containsKey(msg.getCSN());
177    }
178  }
179
180  /** Removes all UpdateMsg form this queue. */
181  public void clear()
182  {
183    synchronized (lock)
184    {
185      map.clear();
186      bytesCount = 0;
187    }
188  }
189
190  /**
191   * Consumes all the messages in this queue up to and including the passed in
192   * message. If the passed in message is not contained in the current queue,
193   * then all messages will be removed from it.
194   *
195   * @param finalMsg
196   *          the final message to reach when consuming messages from this queue
197   */
198  public void consumeUpTo(UpdateMsg finalMsg)
199  {
200    // FIXME this code could be more efficient if the msgQueue could call the
201    // following code (to be tested):
202    // if (!map.containsKey(finalMsg.getCSN())) {
203    // map.clear();
204    // } else {
205    // map.headMap(finalMsg.getCSN(), true).clear();
206    // }
207
208    final CSN finalCSN = finalMsg.getCSN();
209    UpdateMsg msg;
210    do
211    {
212      msg = removeFirst();
213    }
214    while (!finalCSN.equals(msg.getCSN()));
215  }
216
217  @Override
218  public String toString()
219  {
220    return getClass().getSimpleName() + " bytesCount=" + bytesCount + " queue=" + map.values();
221  }
222}