001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2015-2016 ForgeRock AS.
016 */
017package org.opends.server.replication.plugin;
018
019
020import java.io.IOException;
021import java.io.OutputStream;
022
023import org.opends.server.util.ServerConstants;
024
025/**
026 * This class creates an output stream that can be used to export entries
027 * to a synchronization domain.
028 */
029public class ReplLDIFOutputStream
030       extends OutputStream
031{
032  /** The number of entries to be exported. */
033  private final long numEntries;
034
035  /** The current number of entries exported. */
036  private long numExportedEntries;
037  private String entryBuffer = "";
038
039  /** The checksum for computing the generation id. */
040  private final GenerationIdChecksum checkSum = new GenerationIdChecksum();
041
042  /**
043   * Creates a new ReplLDIFOutputStream related to a replication
044   * domain.
045   *
046   * @param numEntries The max number of entry to process.
047   */
048  public ReplLDIFOutputStream(long numEntries)
049  {
050    this.numEntries = numEntries;
051  }
052
053  @Override
054  public void write(int i) throws IOException
055  {
056    throw new IOException("Invalid call");
057  }
058
059  /**
060   * Get the value of the underlying checksum.
061   * @return The value of the underlying checksum
062   */
063  public long getChecksumValue()
064  {
065    return checkSum.getValue();
066  }
067
068  @Override
069  public void write(byte b[], int off, int len) throws IOException
070  {
071    String ebytes = entryBuffer;
072    entryBuffer = "";
073
074    ebytes = ebytes + new String(b, off, len);
075    int endIndex = ebytes.length();
076
077    while (true)
078    {
079      // if we have the bytes for an entry, let's make an entry and send it
080      int endOfEntryIndex = ebytes.indexOf(ServerConstants.EOL + ServerConstants.EOL);
081      if (endOfEntryIndex < 0)
082      {
083        // a next call to us will provide more bytes to make an entry
084        entryBuffer = entryBuffer.concat(ebytes);
085        break;
086      }
087
088      endOfEntryIndex += 2;
089      entryBuffer = ebytes.substring(0, endOfEntryIndex);
090
091      // Send the entry
092      if (numEntries > 0 && getNumExportedEntries() > numEntries)
093      {
094        // This outputstream has reached the total number
095        // of entries to export.
096        throw new IOException();
097      }
098
099      // Add the entry bytes to the checksum
100      byte[] entryBytes = entryBuffer.getBytes();
101      checkSum.update(entryBytes, 0, entryBytes.length);
102
103      numExportedEntries++;
104      entryBuffer = "";
105
106      if (endIndex == endOfEntryIndex)
107      {
108        // no more data to process
109        break;
110      }
111      // loop to the data of the next entry
112      ebytes = ebytes.substring(endOfEntryIndex, endIndex);
113      endIndex = ebytes.length();
114    }
115  }
116
117  /**
118   * Return the number of exported entries.
119   * @return the numExportedEntries
120   */
121  public long getNumExportedEntries() {
122    return numExportedEntries;
123  }
124}