001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2009 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2014 ForgeRock AS.
016 */
017package org.opends.server.replication.common;
018
019import org.opends.server.util.TimeThread;
020
021/**
022 * This class defines a structure that is used for storing the last {@link CSN}s
023 * generated on this server or received from other servers and generating new
024 * {@link CSN}s that are guaranteed to be larger than all the previously seen or
025 * generated CSNs.
026 */
027public class CSNGenerator
028{
029  private long lastTime;
030  /**
031   * The sequence number allows to distinguish changes that have been done in
032   * the same millisecond.
033   *
034   * @see #lastTime
035   */
036  private int seqnum;
037  private final int serverId;
038
039  /**
040   * Create a new {@link CSNGenerator}.
041   *
042   * @param serverId
043   *          id to use when creating {@link CSN}s.
044   * @param timestamp
045   *          time to start with.
046   */
047  public CSNGenerator(int serverId, long timestamp)
048  {
049    this.lastTime = timestamp;
050    this.serverId = serverId;
051    this.seqnum = 0;
052  }
053
054  /**
055  * Create a new {@link CSNGenerator}.
056  *
057  * @param serverId serverId to use when creating {@link CSN}s.
058  * @param state This generator will be created in a way that makes sure that
059  *              all {@link CSN}s generated will be larger than all the
060  *              {@link CSN}s currently in state.
061  */
062  public CSNGenerator(int serverId, ServerState state)
063  {
064    this.lastTime = TimeThread.getTime();
065    for (CSN csn : state)
066    {
067      if (this.lastTime < csn.getTime())
068      {
069        this.lastTime = csn.getTime();
070      }
071      if (csn.getServerId() == serverId)
072      {
073        this.seqnum = csn.getSeqnum();
074      }
075    }
076    this.serverId = serverId;
077  }
078
079  /**
080   * Generate a new {@link CSN}.
081   *
082   * @return the generated {@link CSN}
083   */
084  public CSN newCSN()
085  {
086    long curTime = TimeThread.getTime();
087    int mySeqnum;
088    long myTime;
089
090    synchronized(this)
091    {
092      if (curTime > lastTime)
093      {
094        lastTime = curTime;
095      }
096
097      if (++seqnum <= 0) // check no underflow happened
098      {
099        seqnum = 0;
100        lastTime++;
101      }
102      mySeqnum = seqnum;
103      myTime = lastTime;
104    }
105
106    return new CSN(myTime, mySeqnum, serverId);
107  }
108
109  /**
110   * Adjust the lastTime of this {@link CSNGenerator} with a {@link CSN} that we
111   * have received from another server.
112   * <p>
113   * This is necessary because we need that the {@link CSN} generated after
114   * processing an update received from other hosts to be larger than the
115   * received {@link CSN}
116   *
117   * @param number
118   *          the {@link CSN} to adjust with
119   */
120  public void adjust(CSN number)
121  {
122    if (number==null)
123    {
124      synchronized(this)
125      {
126        lastTime = TimeThread.getTime();
127        seqnum = 0;
128      }
129      return;
130    }
131
132    long rcvdTime = number.getTime();
133
134    int changeServerId = number.getServerId();
135    int changeSeqNum = number.getSeqnum();
136
137    /*
138     * need to synchronize with newCSN method so that we protect writing
139     * lastTime fields
140     */
141    synchronized(this)
142    {
143      if (lastTime <= rcvdTime)
144      {
145        lastTime = ++rcvdTime;
146      }
147
148      if (serverId == changeServerId && seqnum < changeSeqNum)
149      {
150        seqnum = changeSeqNum;
151      }
152    }
153  }
154
155  /**
156   * Adjust utility method that takes ServerState as a parameter.
157   * @param state the ServerState to adjust with
158   */
159  public void adjust(ServerState state)
160  {
161    for (CSN csn : state)
162    {
163      adjust(csn);
164    }
165  }
166}