001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2009 Sun Microsystems, Inc.
015 * Portions Copyright 2013-2015 ForgeRock AS.
016 */
017package org.opends.server.replication.protocol;
018
019import java.io.IOException;
020import java.util.HashMap;
021import java.util.Iterator;
022import java.util.Map;
023import java.util.Map.Entry;
024import java.util.zip.DataFormatException;
025
026import org.forgerock.opendj.io.ASN1Reader;
027import org.forgerock.opendj.io.ASN1Writer;
028import org.opends.server.replication.common.CSN;
029import org.opends.server.replication.common.ServerState;
030
031/**
032 * This message is part of the replication protocol.
033 * RS1 sends a MonitorRequestMessage to RS2 to requests its monitoring
034 * information.
035 * When RS2 receives a MonitorRequestMessage from RS1, RS2 responds with a
036 * MonitorMsg.
037 */
038public class MonitorMsg extends ReplicationMsg
039{
040  /**
041   * The destination server or servers of this message.
042   */
043  private final int destination;
044
045  /**
046   * The serverID of the server that sends this message.
047   */
048  private final int senderID;
049
050
051
052  /**
053   * Data structure to manage the state and the approximation of the data of the
054   * first missing change for each LDAP server connected to a Replication
055   * Server.
056   */
057  private static class ServerData
058  {
059    private ServerState state;
060    private long approxFirstMissingDate;
061  }
062
063  /**
064   * Data structure to manage the state of this replication server
065   * and the state information for the servers connected to it.
066   */
067  private static class SubTopoMonitorData
068  {
069    /** This replication server DbState. */
070    private ServerState replServerDbState;
071    /** The data related to the LDAP servers connected to this RS. */
072    private final Map<Integer, ServerData> ldapStates = new HashMap<>();
073    /** The data related to the RS servers connected to this RS. */
074    private final Map<Integer, ServerData> rsStates = new HashMap<>();
075  }
076
077  private final SubTopoMonitorData data = new SubTopoMonitorData();
078
079  /**
080   * Creates a new MonitorMsg.
081   *
082   * @param sender The sender of this message.
083   * @param destination The destination of this message.
084   */
085  public MonitorMsg(int sender, int destination)
086  {
087    this.senderID = sender;
088    this.destination = destination;
089  }
090
091  /**
092   * Sets the state of the replication server.
093   * @param state The state.
094   */
095  public void setReplServerDbState(ServerState state)
096  {
097    data.replServerDbState = state;
098  }
099
100  /**
101   * Sets the information of an LDAP server.
102   * @param serverId The serverID.
103   * @param state The server state.
104   * @param approxFirstMissingDate  The approximation of the date
105   * of the older missing change. null when none.
106   * @param isLDAPServer Specifies whether the server is a DS or a RS
107   */
108  public void setServerState(int serverId, ServerState state,
109      long approxFirstMissingDate, boolean isLDAPServer)
110  {
111    final ServerData sd = new ServerData();
112    sd.state = state;
113    sd.approxFirstMissingDate = approxFirstMissingDate;
114    if (isLDAPServer)
115    {
116      data.ldapStates.put(serverId, sd);
117    }
118    else
119    {
120      data.rsStates.put(serverId, sd);
121    }
122  }
123
124  /**
125   * Get the server state for the LDAP server with the provided serverId.
126   * @param serverId The provided serverId.
127   * @return The state.
128   */
129  public ServerState getLDAPServerState(int serverId)
130  {
131    return data.ldapStates.get(serverId).state;
132  }
133
134  /**
135   * Get the server state for the RS server with the provided serverId.
136   * @param serverId The provided serverId.
137   * @return The state.
138   */
139  public ServerState getRSServerState(int serverId)
140  {
141    return data.rsStates.get(serverId).state;
142  }
143
144  /**
145   * Get the approximation of the date of the older missing change for the
146   * LDAP Server with the provided server Id.
147   * @param serverId The provided serverId.
148   * @return The approximated state.
149   */
150  public long getLDAPApproxFirstMissingDate(int serverId)
151  {
152    return data.ldapStates.get(serverId).approxFirstMissingDate;
153  }
154
155  /**
156   * Get the approximation of the date of the older missing change for the
157   * RS Server with the provided server Id.
158   * @param serverId The provided serverId.
159   * @return The approximated state.
160   */
161  public long getRSApproxFirstMissingDate(int serverId)
162  {
163    return data.rsStates.get(serverId).approxFirstMissingDate;
164  }
165
166  /**
167   * Creates a new EntryMessage from its encoded form.
168   *
169   * @param in       The byte array containing the encoded form of the message.
170   * @param version  The version of the protocol to use to decode the msg.
171   * @throws DataFormatException If the byte array does not contain a valid
172   *                             encoded form of the ServerStartMessage.
173   */
174  MonitorMsg(byte[] in, short version) throws DataFormatException
175  {
176    final ByteArrayScanner scanner = new ByteArrayScanner(in);
177    if (scanner.nextByte() != MSG_TYPE_REPL_SERVER_MONITOR)
178    {
179      throw new DataFormatException("input is not a valid "
180          + getClass().getCanonicalName());
181    }
182
183    if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
184    {
185      this.senderID = scanner.nextIntUTF8();
186      this.destination = scanner.nextIntUTF8();
187    }
188    else if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
189    {
190      this.senderID = scanner.nextShort();
191      this.destination = scanner.nextShort();
192    }
193    else
194    {
195      this.senderID = scanner.nextInt();
196      this.destination = scanner.nextInt();
197    }
198
199    ASN1Reader asn1Reader = scanner.getASN1Reader();
200    try
201    {
202      asn1Reader.readStartSequence();
203      // loop on the servers
204      while(asn1Reader.hasNextElement())
205      {
206        ServerState newState = new ServerState();
207        int serverId = 0;
208        long outime = 0;
209        boolean isLDAPServer = false;
210
211        asn1Reader.readStartSequence();
212        // loop on the list of CSN of the state
213        while(asn1Reader.hasNextElement())
214        {
215          CSN csn;
216          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
217          {
218            csn = CSN.valueOf(asn1Reader.readOctetString());
219          }
220          else
221          {
222            csn = CSN.valueOf(asn1Reader.readOctetStringAsString());
223          }
224
225          if (data.replServerDbState != null && serverId == 0)
226          {
227            // we are on the first CSN that is a fake CSN to store the serverId
228            // and the older update time
229            serverId = csn.getServerId();
230            outime = csn.getTime();
231            isLDAPServer = csn.getSeqnum() > 0;
232          }
233          else
234          {
235            // we are on a normal CSN
236            newState.update(csn);
237          }
238        }
239        asn1Reader.readEndSequence();
240
241        if (data.replServerDbState == null)
242        {
243          // the first state is the replication state
244          data.replServerDbState = newState;
245        }
246        else
247        {
248          // the next states are the server states
249          setServerState(serverId, newState, outime, isLDAPServer);
250        }
251      }
252      asn1Reader.readEndSequence();
253    } catch(Exception e)
254    { /* do nothing */
255    }
256  }
257
258  /** {@inheritDoc} */
259  @Override
260  public byte[] getBytes(short protocolVersion)
261  {
262    try
263    {
264      final ByteArrayBuilder builder = new ByteArrayBuilder();
265      builder.appendByte(MSG_TYPE_REPL_SERVER_MONITOR);
266      append(builder, senderID, protocolVersion);
267      append(builder, destination, protocolVersion);
268
269      /* Put the serverStates ... */
270      ASN1Writer writer = builder.getASN1Writer();
271      writer.writeStartSequence();
272      {
273        /* first put the Replication Server state */
274        writer.writeStartSequence();
275        {
276          data.replServerDbState.writeTo(writer, protocolVersion);
277        }
278        writer.writeEndSequence();
279
280        // then the DS + RS server data
281        writeServerStates(protocolVersion, writer, false /* DS */);
282        writeServerStates(protocolVersion, writer, true /* RS */);
283      }
284      writer.writeEndSequence();
285
286      if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
287      {
288        // legacy coding mistake
289        builder.appendByte(0);
290      }
291      return builder.toByteArray();
292    }
293    catch (Exception e)
294    {
295      return null;
296    }
297  }
298
299  private void append(final ByteArrayBuilder builder, int data,
300      short protocolVersion)
301  {
302    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
303    {
304      builder.appendIntUTF8(data);
305    }
306    else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
307    {
308      builder.appendShort(data);
309    }
310    else // protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4
311    {
312      builder.appendInt(data);
313    }
314  }
315
316  private void writeServerStates(short protocolVersion, ASN1Writer writer,
317      boolean writeRSStates) throws IOException
318  {
319    final Map<Integer, ServerData> servers =
320        writeRSStates ? data.rsStates : data.ldapStates;
321    final int seqNum = writeRSStates ? 0 : 1;
322    for (Map.Entry<Integer, ServerData> server : servers.entrySet())
323    {
324      writer.writeStartSequence();
325      {
326        /*
327         * A fake CSN helps storing the LDAP server ID. The sequence number will
328         * be used to differentiate between an LDAP server (1) or an RS (0).
329         */
330        CSN csn = new CSN(
331            server.getValue().approxFirstMissingDate, seqNum,
332            server.getKey());
333        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
334        {
335          writer.writeOctetString(csn.toByteString());
336        }
337        else
338        {
339          writer.writeOctetString(csn.toString());
340        }
341
342        // the CSNs that make the state
343        server.getValue().state.writeTo(writer, protocolVersion);
344      }
345      writer.writeEndSequence();
346    }
347  }
348
349  /**
350   * Get the state of the replication server that sent this message.
351   * @return The state.
352   */
353  public ServerState getReplServerDbState()
354  {
355    return data.replServerDbState;
356  }
357
358  /**
359   * Returns an iterator on the serverId of the connected LDAP servers.
360   * @return The iterator.
361   */
362  public Iterator<Integer> ldapIterator()
363  {
364    return data.ldapStates.keySet().iterator();
365  }
366
367  /**
368   * Returns an iterator on the serverId of the connected RS servers.
369   * @return The iterator.
370   */
371  public Iterator<Integer> rsIterator()
372  {
373    return data.rsStates.keySet().iterator();
374  }
375
376  /**
377   * Get the destination.
378   *
379   * @return the destination
380   */
381  public int getDestination()
382  {
383    return destination;
384  }
385
386  /**
387   * Get the server ID of the server that sent this message.
388   *
389   * @return the server id
390   */
391  public int getSenderID()
392  {
393    return senderID;
394  }
395
396  /** {@inheritDoc} */
397  @Override
398  public String toString()
399  {
400    final StringBuilder stateS = new StringBuilder("\nRState:[");
401    stateS.append(data.replServerDbState);
402    stateS.append("]");
403
404    stateS.append("\nLDAPStates:[");
405    for (Entry<Integer, ServerData> entry : data.ldapStates.entrySet())
406    {
407      ServerData sd = entry.getValue();
408      stateS.append("\n[LSstate(").append(entry.getKey()).append(")=")
409            .append(sd.state).append("]").append(" afmd=")
410            .append(sd.approxFirstMissingDate).append("]");
411    }
412
413    stateS.append("\nRSStates:[");
414    for (Entry<Integer, ServerData> entry : data.rsStates.entrySet())
415    {
416      final ServerData sd = entry.getValue();
417      stateS.append("\n[RSState(").append(entry.getKey()).append(")=")
418            .append(sd.state).append("]").append(" afmd=")
419            .append(sd.approxFirstMissingDate).append("]");
420    }
421    return getClass().getCanonicalName() +
422    "[ sender=" + this.senderID +
423    " destination=" + this.destination +
424    " data=[" + stateS + "]" +
425    "]";
426  }
427}