001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2007-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2015 ForgeRock AS.
016 */
017package org.opends.server.replication.protocol;
018
019import java.util.*;
020import java.util.zip.DataFormatException;
021
022import org.opends.server.replication.common.AssuredMode;
023import org.opends.server.replication.common.DSInfo;
024import org.opends.server.replication.common.RSInfo;
025import org.opends.server.replication.common.ServerStatus;
026
027import static org.opends.server.replication.protocol.ProtocolVersion.*;
028
029/**
030 * This class defines a message that is sent:
031 * - By a RS to the other RSs in the topology, containing:
032 *   - the DSs directly connected to the RS in the DS infos
033 *   - only this RS in the RS infos
034 * - By a RS to his connected DSs, containing every DSs and RSs he knows.
035 * In that case the message contains:
036 *   - every DSs the RS knows except the destinator DS in the DS infos
037 *   - every connected RSs (including the sending RS) in the RS infos
038 *
039 * Exchanging these messages allows to have each RS or DS take
040 * appropriate decisions according to the current topology:
041 * - a RS can route a message to a DS
042 * - a DS can decide towards which peer DS send referrals
043 * ...
044 */
045public class TopologyMsg extends ReplicationMsg
046{
047  /** Information for the DSs (aka replicas) known in the topology. */
048  private final Map<Integer, DSInfo> replicaInfos;
049  /** Information for the RSs known in the topology. */
050  private final List<RSInfo> rsInfos;
051
052  /**
053   * Creates a new changelogInfo message from its encoded form.
054   *
055   * @param in The byte array containing the encoded form of the message.
056   * @param version The protocol version to use to decode the msg.
057   * @throws java.util.zip.DataFormatException If the byte array does not
058   * contain a valid encoded form of the message.
059   */
060  TopologyMsg(byte[] in, short version) throws DataFormatException
061  {
062    final ByteArrayScanner scanner = new ByteArrayScanner(in);
063    final byte msgType = scanner.nextByte();
064    if (msgType != MSG_TYPE_TOPOLOGY)
065    {
066      throw new DataFormatException("Input is not a valid "
067          + getClass().getCanonicalName());
068    }
069
070    // Read the DS info entries, first read number of them
071    int nDsInfo = scanner.nextByte();
072    final Map<Integer, DSInfo> replicaInfos = new HashMap<>(Math.max(0, nDsInfo));
073    while (nDsInfo > 0 && !scanner.isEmpty())
074    {
075      final DSInfo dsInfo = nextDSInfo(scanner, version);
076      replicaInfos.put(dsInfo.getDsId(), dsInfo);
077      nDsInfo--;
078    }
079
080    // Read the RS info entries
081    int nRsInfo = scanner.nextByte();
082    final List<RSInfo> rsInfos = new ArrayList<>(Math.max(0, nRsInfo));
083    while (nRsInfo > 0 && !scanner.isEmpty())
084    {
085      rsInfos.add(nextRSInfo(scanner, version));
086      nRsInfo--;
087    }
088
089    this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
090    this.rsInfos = Collections.unmodifiableList(rsInfos);
091  }
092
093  private DSInfo nextDSInfo(ByteArrayScanner scanner, short version)
094      throws DataFormatException
095  {
096    final int dsId = scanner.nextIntUTF8();
097    final String dsUrl =
098        version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString();
099    final int rsId = scanner.nextIntUTF8();
100    final long generationId = scanner.nextLongUTF8();
101    final ServerStatus status = ServerStatus.valueOf(scanner.nextByte());
102    final boolean assuredFlag = scanner.nextBoolean();
103    final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte());
104    final byte safeDataLevel = scanner.nextByte();
105    final byte groupId = scanner.nextByte();
106
107    final List<String> refUrls = new ArrayList<>();
108    scanner.nextStrings(refUrls);
109
110    final Set<String> attrs = new HashSet<>();
111    final Set<String> delattrs = new HashSet<>();
112    short protocolVersion = -1;
113    if (version >= REPLICATION_PROTOCOL_V4)
114    {
115      scanner.nextStrings(attrs);
116
117      if (version >= REPLICATION_PROTOCOL_V5)
118      {
119        scanner.nextStrings(delattrs);
120      }
121      else
122      {
123        // Default to using the same set of attributes for deletes.
124        delattrs.addAll(attrs);
125      }
126
127      protocolVersion = scanner.nextByte();
128    }
129
130    return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag,
131        assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs,
132        protocolVersion);
133  }
134
135  private RSInfo nextRSInfo(ByteArrayScanner scanner, short version)
136      throws DataFormatException
137  {
138    final int rsId = scanner.nextIntUTF8();
139    final long generationId = scanner.nextLongUTF8();
140    final byte groupId = scanner.nextByte();
141
142    int weight = 1;
143    String serverUrl = null;
144    if (version >= REPLICATION_PROTOCOL_V4)
145    {
146      serverUrl = scanner.nextString();
147      weight = scanner.nextIntUTF8();
148    }
149
150    return new RSInfo(rsId, serverUrl, generationId, groupId, weight);
151  }
152
153  /**
154   * Creates a new message of the currently connected servers.
155   *
156   * @param dsInfos The collection of currently connected DS servers ID.
157   * @param rsInfos The list of currently connected RS servers ID.
158   */
159  public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos)
160  {
161    if (dsInfos == null || dsInfos.isEmpty())
162    {
163      this.replicaInfos = Collections.emptyMap();
164    }
165    else
166    {
167      Map<Integer, DSInfo> replicas = new HashMap<>();
168      for (DSInfo dsInfo : dsInfos)
169      {
170        replicas.put(dsInfo.getDsId(), dsInfo);
171      }
172      this.replicaInfos = Collections.unmodifiableMap(replicas);
173    }
174
175    if (rsInfos == null || rsInfos.isEmpty())
176    {
177      this.rsInfos = Collections.emptyList();
178    }
179    else
180    {
181      this.rsInfos =
182          Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos));
183    }
184  }
185
186  // ============
187  // Msg encoding
188  // ============
189
190  /** {@inheritDoc} */
191  @Override
192  public byte[] getBytes(short version)
193  {
194    /**
195     * Message has the following form:
196     * <pdu type><number of following DSInfo entries>[<DSInfo>]*
197     * <number of following RSInfo entries>[<RSInfo>]*
198     */
199    final ByteArrayBuilder builder = new ByteArrayBuilder();
200    builder.appendByte(MSG_TYPE_TOPOLOGY);
201
202    // Put DS infos
203    builder.appendByte(replicaInfos.size());
204    for (DSInfo dsInfo : replicaInfos.values())
205    {
206      builder.appendIntUTF8(dsInfo.getDsId());
207      if (version >= REPLICATION_PROTOCOL_V6)
208      {
209        builder.appendString(dsInfo.getDsUrl());
210      }
211      builder.appendIntUTF8(dsInfo.getRsId());
212      builder.appendLongUTF8(dsInfo.getGenerationId());
213      builder.appendByte(dsInfo.getStatus().getValue());
214      builder.appendBoolean(dsInfo.isAssured());
215      builder.appendByte(dsInfo.getAssuredMode().getValue());
216      builder.appendByte(dsInfo.getSafeDataLevel());
217      builder.appendByte(dsInfo.getGroupId());
218
219      builder.appendStrings(dsInfo.getRefUrls());
220
221      if (version >= REPLICATION_PROTOCOL_V4)
222      {
223        builder.appendStrings(dsInfo.getEclIncludes());
224        if (version >= REPLICATION_PROTOCOL_V5)
225        {
226          builder.appendStrings(dsInfo.getEclIncludesForDeletes());
227        }
228        builder.appendByte(dsInfo.getProtocolVersion());
229      }
230    }
231
232    // Put RS infos
233    builder.appendByte(rsInfos.size());
234    for (RSInfo rsInfo : rsInfos)
235    {
236      builder.appendIntUTF8(rsInfo.getId());
237      builder.appendLongUTF8(rsInfo.getGenerationId());
238      builder.appendByte(rsInfo.getGroupId());
239
240      if (version >= REPLICATION_PROTOCOL_V4)
241      {
242        builder.appendString(rsInfo.getServerUrl());
243        builder.appendIntUTF8(rsInfo.getWeight());
244      }
245    }
246
247    return builder.toByteArray();
248  }
249
250  /** {@inheritDoc} */
251  @Override
252  public String toString()
253  {
254    String dsStr = "";
255    for (DSInfo dsInfo : replicaInfos.values())
256    {
257      dsStr += dsInfo + "\n----------------------------\n";
258    }
259
260    String rsStr = "";
261    for (RSInfo rsInfo : rsInfos)
262    {
263      rsStr += rsInfo + "\n----------------------------\n";
264    }
265
266    return "TopologyMsg content:"
267      + "\n----------------------------"
268      + "\nCONNECTED DS SERVERS:"
269      + "\n--------------------\n"
270      + dsStr
271      + "CONNECTED RS SERVERS:"
272      + "\n--------------------\n"
273      + rsStr
274      + ("".equals(rsStr) ? "----------------------------\n" : "");
275  }
276
277  /**
278   * Get the DS infos.
279   *
280   * @return The DS infos
281   */
282  public Map<Integer, DSInfo> getReplicaInfos()
283  {
284    return replicaInfos;
285  }
286
287  /**
288   * Get the RS infos.
289   *
290   * @return The RS infos
291   */
292  public List<RSInfo> getRsInfos()
293  {
294    return rsInfos;
295  }
296}