001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2009 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2015 ForgeRock AS.
016 */
017package org.opends.server.replication.server;
018
019import java.net.SocketException;
020
021import org.forgerock.i18n.LocalizableMessage;
022import org.opends.server.api.DirectoryThread;
023import org.forgerock.i18n.slf4j.LocalizedLogger;
024import org.opends.server.replication.common.ServerStatus;
025import org.opends.server.replication.protocol.ReplicaOfflineMsg;
026import org.opends.server.replication.protocol.Session;
027import org.opends.server.replication.protocol.UpdateMsg;
028import org.opends.server.replication.service.DSRSShutdownSync;
029
030import static org.opends.messages.ReplicationMessages.*;
031import static org.opends.server.replication.common.ServerStatus.*;
032import static org.opends.server.util.StaticUtils.*;
033
034/**
035 * This class defines a server writer, which is used to send changes to a
036 * directory server.
037 */
038public class ServerWriter extends DirectoryThread
039{
040  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
041
042  private final Session session;
043  private final ServerHandler handler;
044  private final ReplicationServerDomain replicationServerDomain;
045  private final DSRSShutdownSync dsrsShutdownSync;
046
047  /**
048   * Create a ServerWriter. Then ServerWriter then waits on the ServerHandler
049   * for new updates and forward them to the server
050   *
051   * @param session
052   *          the Session that will be used to send updates.
053   * @param handler
054   *          handler for which the ServerWriter is created.
055   * @param replicationServerDomain
056   *          The ReplicationServerDomain of this ServerWriter.
057   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
058   */
059  public ServerWriter(Session session, ServerHandler handler,
060      ReplicationServerDomain replicationServerDomain,
061      DSRSShutdownSync dsrsShutdownSync)
062  {
063    // Session may be null for ECLServerWriter.
064    super("Replication server RS(" + handler.getReplicationServerId()
065        + ") writing to " + handler + " at "
066        + (session != null ? session.getReadableRemoteAddress() : "unknown"));
067
068    this.session = session;
069    this.handler = handler;
070    this.replicationServerDomain = replicationServerDomain;
071    this.dsrsShutdownSync = dsrsShutdownSync;
072  }
073
074  /**
075   * Run method for the ServerWriter.
076   * Loops waiting for changes from the ReplicationServerDomain and forward them
077   * to the other servers
078   */
079  @Override
080  public void run()
081  {
082    if (logger.isTraceEnabled())
083    {
084      logger.trace(getName() + " starting");
085    }
086
087    LocalizableMessage errMessage = null;
088    try
089    {
090      boolean shutdown = false;
091      while (!shutdown
092          || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
093      {
094        final UpdateMsg updateMsg = this.handler.take();
095        if (updateMsg == null)
096        {
097          // this connection is closing
098          errMessage = LocalizableMessage.raw(
099           "Connection closure: null update returned by domain.");
100          shutdown = true;
101        }
102        else if (!isUpdateMsgFiltered(updateMsg))
103        {
104          // Publish the update to the remote server using a protocol version it supports
105          session.publish(updateMsg);
106          if (updateMsg instanceof ReplicaOfflineMsg)
107          {
108            dsrsShutdownSync.replicaOfflineMsgForwarded(replicationServerDomain.getBaseDN());
109          }
110        }
111      }
112    }
113    catch (SocketException e)
114    {
115      /*
116       * The remote host has disconnected and this particular Tree is going to
117       * be removed, just ignore the exception and let the thread die as well
118       */
119      errMessage = handler.getBadlyDisconnectedErrorMessage();
120      logger.error(errMessage);
121    }
122    catch (Exception e)
123    {
124      /*
125       * An unexpected error happened.
126       * Log an error and close the connection.
127       */
128      errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler +
129                        " " +  stackTraceToSingleLineString(e));
130      logger.error(errMessage);
131    }
132    finally {
133      session.close();
134      replicationServerDomain.stopServer(handler, false);
135      if (logger.isTraceEnabled())
136      {
137        logger.trace(getName() + " stopped " + errMessage);
138      }
139    }
140  }
141
142  private boolean isUpdateMsgFiltered(UpdateMsg updateMsg)
143  {
144    if (handler.isDataServer())
145    {
146      /**
147       * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS
148       *
149       * The RSD lock should not be taken here as it is acceptable to have a delay
150       * between the time the server has a wrong status and the fact we detect it:
151       * the updates that succeed to pass during this time will have no impact on remote server.
152       * But it is interesting to not saturate uselessly the network
153       * if the updates are not necessary so this check to stop sending updates is interesting anyway.
154       * Not taking the RSD lock allows to have better performances in normal mode (most of the time).
155       */
156      final ServerStatus dsStatus = handler.getStatus();
157      if (dsStatus == BAD_GEN_ID_STATUS)
158      {
159        logger.warn(WARN_IGNORING_UPDATE_TO_DS_BADGENID, handler.getReplicationServerId(),
160            updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(),
161            session.getReadableRemoteAddress(),
162            handler.getGenerationId(),
163            replicationServerDomain.getGenerationId());
164        return true;
165      }
166      else if (dsStatus == FULL_UPDATE_STATUS)
167      {
168        logger.warn(WARN_IGNORING_UPDATE_TO_DS_FULLUP, handler.getReplicationServerId(),
169            updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(),
170            session.getReadableRemoteAddress());
171        return true;
172      }
173    }
174    else
175    {
176      /**
177       * Ignore updates to RS with bad gen id
178       * (no system managed status for a RS)
179       */
180      final long referenceGenerationId = replicationServerDomain.getGenerationId();
181      if (referenceGenerationId != handler.getGenerationId()
182          || referenceGenerationId == -1 || handler.getGenerationId() == -1)
183      {
184        logger.error(WARN_IGNORING_UPDATE_TO_RS,
185            handler.getReplicationServerId(),
186            updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(),
187            session.getReadableRemoteAddress(),
188            handler.getGenerationId(),
189            referenceGenerationId);
190        return true;
191      }
192    }
193    return false;
194  }
195}