001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2008-2009 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2015 ForgeRock AS.
016 */
017package org.opends.server.replication.protocol;
018
019import java.util.*;
020import java.util.zip.DataFormatException;
021
022import org.forgerock.opendj.io.ASN1;
023import org.forgerock.opendj.io.ASN1Reader;
024import org.forgerock.opendj.io.ASN1Writer;
025import org.opends.server.replication.common.AssuredMode;
026import org.opends.server.replication.common.ServerStatus;
027import org.forgerock.opendj.ldap.ByteSequenceReader;
028import org.forgerock.opendj.ldap.ByteString;
029import org.forgerock.opendj.ldap.ByteStringBuilder;
030import org.forgerock.util.Utils;
031
032/**
033 * This message is used by DS to confirm a RS he wants to connect to him (open
034 * a session):
035 * Handshake sequence between DS and RS is like this:
036 * DS --- ServerStartMsg ---> RS
037 * DS <--- ReplServerStartMsg --- RS
038 * DS --- StartSessionMsg ---> RS
039 * DS <--- TopologyMsg --- RS
040 *
041 * This message contains:
042 * - status: the status we are entering the topology with
043 * - referrals URLs: the referrals URLs we allow peer DSs to use to refer to
044 * our domain when needed.
045 */
046public class StartSessionMsg extends ReplicationMsg
047{
048  /** The list of referrals URLs to the sending DS. */
049  private final List<String> referralsURLs = new ArrayList<>();
050  /** The initial status the DS starts with. */
051  private ServerStatus status = ServerStatus.INVALID_STATUS;
052  /** Assured replication enabled on DS or not. */
053  private boolean assuredFlag;
054  /** DS assured mode (relevant if assured replication enabled). */
055  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
056  /** DS safe data level (relevant if assured mode is safe data). */
057  private byte safeDataLevel = 1;
058
059  private Set<String> eclIncludes = new HashSet<>();
060  private Set<String> eclIncludesForDeletes = new HashSet<>();
061
062  /**
063   * Creates a new StartSessionMsg message from its encoded form.
064   *
065   * @param in The byte array containing the encoded form of the message.
066   * @param version The protocol version to use to decode the msg.
067   * @throws java.util.zip.DataFormatException If the byte array does not
068   * contain a valid encoded form of the message.
069   */
070  StartSessionMsg(byte[] in, short version) throws DataFormatException
071  {
072    if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
073    {
074      decode_V23(in);
075    }
076    else
077    {
078      decode_V45(in, version);
079    }
080  }
081
082  /**
083   * Creates a new  message with the given required parameters.
084   * @param status Status we are starting with
085   * @param referralsURLs Referrals URLs to be used by peer DSs
086   * @param assuredFlag If assured mode is enabled or not
087   * @param assuredMode Assured type
088   * @param safeDataLevel Assured mode safe data level
089   */
090  public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs,
091    boolean assuredFlag, AssuredMode assuredMode, byte safeDataLevel)
092  {
093    this.referralsURLs.addAll(referralsURLs);
094    this.status = status;
095    this.assuredFlag = assuredFlag;
096    this.assuredMode = assuredMode;
097    this.safeDataLevel = safeDataLevel;
098  }
099
100  // ============
101  // Msg encoding
102  // ============
103
104  /** {@inheritDoc} */
105  @Override
106  public byte[] getBytes(short protocolVersion)
107  {
108    if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
109    {
110      return getBytes_V23();
111    }
112    else
113    {
114      return getBytes_V45(protocolVersion);
115    }
116  }
117
118  private byte[] getBytes_V45(short version)
119  {
120    try
121    {
122      ByteStringBuilder byteBuilder = new ByteStringBuilder();
123      ASN1Writer writer = ASN1.getWriter(byteBuilder);
124
125      byteBuilder.appendByte(MSG_TYPE_START_SESSION);
126      byteBuilder.appendByte(status.getValue());
127      byteBuilder.appendByte(assuredFlag ? 1 : 0);
128      byteBuilder.appendByte(assuredMode.getValue());
129      byteBuilder.appendByte(safeDataLevel);
130
131      writer.writeStartSequence();
132      for (String url : referralsURLs)
133      {
134        writer.writeOctetString(url);
135      }
136      writer.writeEndSequence();
137
138      writer.writeStartSequence();
139      for (String attrDef : eclIncludes)
140      {
141        writer.writeOctetString(attrDef);
142      }
143      writer.writeEndSequence();
144
145      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
146      {
147        writer.writeStartSequence();
148        for (String attrDef : eclIncludesForDeletes)
149        {
150          writer.writeOctetString(attrDef);
151        }
152        writer.writeEndSequence();
153      }
154
155      return byteBuilder.toByteArray();
156    }
157    catch (Exception e)
158    {
159      throw new RuntimeException(e);
160    }
161  }
162
163  private byte[] getBytes_V23()
164  {
165    /*
166     * The message is stored in the form:
167     * <message type><status><assured flag><assured mode><safe data level>
168     * <list of referrals urls>
169     * (each referral url terminates with 0)
170     */
171    final ByteArrayBuilder builder = new ByteArrayBuilder();
172    builder.appendByte(MSG_TYPE_START_SESSION);
173    builder.appendByte(status.getValue());
174    builder.appendBoolean(assuredFlag);
175    builder.appendByte(assuredMode.getValue());
176    builder.appendByte(safeDataLevel);
177
178    if (referralsURLs.size() >= 1)
179    {
180      for (String url : referralsURLs)
181      {
182        builder.appendString(url);
183      }
184    }
185    return builder.toByteArray();
186  }
187
188  // ============
189  // Msg decoding
190  // ============
191
192  private void decode_V45(byte[] in, short version) throws DataFormatException
193  {
194    ByteSequenceReader reader = ByteString.wrap(in).asReader();
195    try
196    {
197      if (reader.readByte() != MSG_TYPE_START_SESSION)
198      {
199        throw new DataFormatException("input is not a valid "
200            + getClass().getCanonicalName());
201      }
202
203      /*
204      status = ServerStatus.valueOf(asn1Reader.readOctetString().byteAt(0));
205      assuredFlag = (asn1Reader.readOctetString().byteAt(0) == 1);
206      assuredMode=AssuredMode.valueOf((asn1Reader.readOctetString().byteAt(0)));
207      safeDataLevel = asn1Reader.readOctetString().byteAt(0);
208      */
209      status = ServerStatus.valueOf(reader.readByte());
210      assuredFlag = reader.readByte() == 1;
211      assuredMode = AssuredMode.valueOf(reader.readByte());
212      safeDataLevel = reader.readByte();
213
214      ASN1Reader asn1Reader = ASN1.getReader(reader);
215
216      asn1Reader.readStartSequence();
217      while(asn1Reader.hasNextElement())
218      {
219        String s = asn1Reader.readOctetStringAsString();
220        this.referralsURLs.add(s);
221      }
222      asn1Reader.readEndSequence();
223
224      asn1Reader.readStartSequence();
225      while(asn1Reader.hasNextElement())
226      {
227        String s = asn1Reader.readOctetStringAsString();
228        this.eclIncludes.add(s);
229      }
230      asn1Reader.readEndSequence();
231
232      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
233      {
234        asn1Reader.readStartSequence();
235        while (asn1Reader.hasNextElement())
236        {
237          this.eclIncludesForDeletes.add(asn1Reader.readOctetStringAsString());
238        }
239        asn1Reader.readEndSequence();
240      }
241      else
242      {
243        // Default to using the same set of attributes for deletes.
244        this.eclIncludesForDeletes.addAll(eclIncludes);
245      }
246    }
247    catch (Exception e)
248    {
249      throw new RuntimeException(e);
250    }
251  }
252
253  private void decode_V23(byte[] in) throws DataFormatException
254  {
255    /*
256     * The message is stored in the form:
257     * <message type><status><assured flag><assured mode><safe data level>
258     * <list of referrals urls>
259     * (each referral url terminates with 0)
260     */
261    final ByteArrayScanner scanner = new ByteArrayScanner(in);
262    final byte msgType = scanner.nextByte();
263    if (msgType != MSG_TYPE_START_SESSION)
264    {
265      throw new DataFormatException(
266          "Input is not a valid " + getClass().getCanonicalName());
267    }
268
269    status = ServerStatus.valueOf(scanner.nextByte());
270    assuredFlag = scanner.nextBoolean();
271    assuredMode = AssuredMode.valueOf(scanner.nextByte());
272    safeDataLevel = scanner.nextByte();
273
274    while (!scanner.isEmpty())
275    {
276      referralsURLs.add(scanner.nextString());
277    }
278  }
279
280  /**
281   * Get the list of referrals URLs.
282   *
283   * @return The list of referrals URLs.
284   */
285  public List<String> getReferralsURLs()
286  {
287    return referralsURLs;
288  }
289
290  /**
291   * Get the status from this message.
292   * @return The status.
293   */
294  public ServerStatus getStatus()
295  {
296    return status;
297  }
298
299  /** {@inheritDoc} */
300  @Override
301  public String toString()
302  {
303    String urls = Utils.joinAsString(" | ", referralsURLs);
304    return "StartSessionMsg content:\nstatus: " + status +
305      "\nassuredFlag: " + assuredFlag +
306      "\nassuredMode: " + assuredMode +
307      "\nsafeDataLevel: " + safeDataLevel +
308      "\nreferralsURLs: " + urls +
309      "\nEclIncludes " + eclIncludes +
310      "\nEclIncludeForDeletes: " + eclIncludesForDeletes;
311  }
312
313  /**
314   * Returns true if assured mode is enabled.
315   * @return true if assured mode is enabled.
316   */
317  public boolean isAssured()
318  {
319    return assuredFlag;
320  }
321
322  /**
323   * Get the assured mode.
324   * @return the assured mode.
325   */
326  public AssuredMode getAssuredMode()
327  {
328    return assuredMode;
329  }
330
331  /**
332   * Get the safe data level.
333   * @return the safe data level.
334   */
335  public byte getSafeDataLevel()
336  {
337    return safeDataLevel;
338  }
339
340  /**
341   * Set the attributes configured on a server to be included in the ECL.
342   *
343   * @param includeAttributes
344   *          attributes to be included with all change records.
345   * @param includeAttributesForDeletes
346   *          additional attributes to be included with delete change records.
347   */
348  public void setEclIncludes(
349      Set<String> includeAttributes,
350      Set<String> includeAttributesForDeletes)
351  {
352    if (includeAttributes != null)
353    {
354      eclIncludes = includeAttributes;
355    }
356
357    if (includeAttributesForDeletes != null)
358    {
359      eclIncludesForDeletes = includeAttributesForDeletes;
360    }
361  }
362
363  /**
364   * Get the attributes to include in each change for the ECL.
365   *
366   * @return The attributes to include in each change for the ECL.
367   */
368  public Set<String> getEclIncludes()
369  {
370    return eclIncludes;
371  }
372
373
374
375  /**
376   * Get the attributes to include in each delete change for the ECL.
377   *
378   * @return The attributes to include in each delete change for the ECL.
379   */
380  public Set<String> getEclIncludesForDeletes()
381  {
382    return eclIncludesForDeletes;
383  }
384
385}