001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2015 ForgeRock AS.
016 */
017package org.opends.server.replication.server;
018
019import java.net.SocketException;
020
021import org.forgerock.i18n.LocalizableMessage;
022import org.opends.server.api.DirectoryThread;
023import org.forgerock.i18n.slf4j.LocalizedLogger;
024import org.opends.server.replication.common.ServerStatus;
025import org.opends.server.replication.protocol.*;
026
027import static org.opends.messages.ReplicationMessages.*;
028import static org.opends.server.replication.common.ServerStatus.*;
029import static org.opends.server.util.StaticUtils.*;
030
031/**
032 * This class implement the part of the replicationServer that is reading
033 * the connection from the LDAP servers to get all the updates that
034 * were done on this replica and forward them to other servers.
035 *
036 * A single thread is dedicated to this work.
037 * It waits in a blocking mode on the connection from the LDAP server
038 * and upon receiving an update puts in into the replicationServer cache
039 * from where the other servers will grab it.
040 */
041public class ServerReader extends DirectoryThread
042{
043  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
044  private final Session session;
045  private final ServerHandler handler;
046
047  /**
048   * Constructor for the LDAP server reader part of the replicationServer.
049   *
050   * @param session
051   *          The Session from which to read the data.
052   * @param handler
053   *          The server handler for this server reader.
054   */
055  public ServerReader(Session session, ServerHandler handler)
056  {
057    super("Replication server RS(" + handler.getReplicationServerId()
058        + ") reading from " + handler + " at "
059        + session.getReadableRemoteAddress());
060    this.session = session;
061    this.handler = handler;
062  }
063
064  /**
065   * Create a loop that reads changes and hands them off to be processed.
066   */
067  @Override
068  public void run()
069  {
070    if (logger.isTraceEnabled())
071    {
072      logger.trace(getName() + " starting");
073    }
074    /*
075     * wait on input stream
076     * grab all incoming messages and publish them to the
077     * replicationServerDomain
078     */
079    LocalizableMessage errMessage = null;
080    try
081    {
082      while (true)
083      {
084        try
085        {
086          final ReplicationMsg msg = session.receive();
087
088          if (logger.isTraceEnabled())
089          {
090            logger.trace("In " + getName() + " receives " + msg);
091          }
092
093          if (msg instanceof AckMsg)
094          {
095            handler.checkWindow();
096            handler.processAck((AckMsg) msg);
097          }
098          else if (msg instanceof UpdateMsg)
099          {
100            final UpdateMsg updateMsg = (UpdateMsg) msg;
101            if (!isUpdateMsgFiltered(updateMsg))
102            {
103              handler.put(updateMsg);
104            }
105          }
106          else if (msg instanceof WindowMsg)
107          {
108            handler.updateWindow((WindowMsg) msg);
109          }
110          else if (msg instanceof MonitorRequestMsg)
111          {
112            handler.processMonitorRequestMsg((MonitorRequestMsg) msg);
113          }
114          else if (msg instanceof MonitorMsg)
115          {
116            handler.processMonitorMsg((MonitorMsg) msg);
117          }
118          else if (msg instanceof RoutableMsg)
119          {
120            /*
121             * Note that we handle monitor messages separately since they in
122             * fact never need "routing" and are instead sent directly between
123             * connected peers. Doing so allows us to more clearly decouple
124             * write IO from the reader thread (see OPENDJ-1354).
125             */
126            handler.process((RoutableMsg) msg);
127          }
128          else if (msg instanceof ResetGenerationIdMsg)
129          {
130            handler.processResetGenId((ResetGenerationIdMsg) msg);
131          }
132          else if (msg instanceof WindowProbeMsg)
133          {
134            handler.replyToWindowProbe();
135          }
136          else if (msg instanceof TopologyMsg)
137          {
138            ReplicationServerHandler rsh = (ReplicationServerHandler) handler;
139            rsh.receiveTopoInfoFromRS((TopologyMsg) msg);
140          }
141          else if (msg instanceof ChangeStatusMsg)
142          {
143            ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
144            try
145            {
146              DataServerHandler dsh = (DataServerHandler) handler;
147              dsh.receiveNewStatus(csMsg);
148            }
149            catch (Exception e)
150            {
151              errMessage = ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(
152                  handler.getBaseDN(), handler.getServerId(), csMsg);
153              logger.error(errMessage);
154            }
155          }
156          else if (msg instanceof ChangeTimeHeartbeatMsg)
157          {
158            handler.process((ChangeTimeHeartbeatMsg) msg);
159          }
160          else if (msg instanceof StopMsg)
161          {
162            /*
163             * Peer server is properly disconnecting: go out of here to properly
164             * close the server handler going to finally block.
165             */
166            if (logger.isTraceEnabled())
167            {
168              logger.trace(handler
169                  + " has properly disconnected from this replication server "
170                  + handler.getReplicationServerId());
171            }
172            return;
173          }
174          else if (msg == null)
175          {
176            /*
177             * The remote server has sent an unknown message, close the
178             * connection.
179             */
180            errMessage = NOTE_READER_NULL_MSG.get(handler);
181            logger.info(errMessage);
182            return;
183          }
184        }
185        catch (NotSupportedOldVersionPDUException e)
186        {
187          /*
188           * Received a V1 PDU we do not need to support: we just trash the
189           * message and log the event for debug purpose, then continue
190           * receiving messages.
191           */
192          logException(e);
193        }
194      }
195    }
196    catch (SocketException e)
197    {
198      /*
199       * The connection has been broken Log a message and exit from this loop So
200       * that this handler is stopped.
201       */
202      logException(e);
203      if (!handler.shuttingDown())
204      {
205        errMessage = handler.getBadlyDisconnectedErrorMessage();
206        logger.error(errMessage);
207      }
208    }
209    catch (Exception e)
210    {
211      /*
212       * The remote server has sent an unknown message, close the connection.
213       */
214      errMessage = NOTE_READER_EXCEPTION.get(handler,
215          stackTraceToSingleLineString(e));
216      logger.info(errMessage);
217    }
218    finally
219    {
220      /*
221       * The thread only exits the loop above if some error condition happen.
222       * Attempt to close the socket and stop the server handler.
223       */
224      if (logger.isTraceEnabled())
225      {
226        logger.trace("In " + getName() + " closing the session");
227      }
228      session.close();
229      handler.doStop();
230      if (logger.isTraceEnabled())
231      {
232        logger.trace(getName() + " stopped: " + errMessage);
233      }
234    }
235  }
236
237  /**
238   * Returns whether the update message is filtered in one of those cases:
239   * <ul>
240   * <li>Ignore updates from DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS</li>
241   * <li>Ignore updates from RS with bad gen id</li>
242   * </ul>
243   */
244  private boolean isUpdateMsgFiltered(UpdateMsg updateMsg)
245  {
246    if (handler.isDataServer())
247    {
248      /**
249       * Ignore updates from DS in bad BAD_GENID_STATUS or
250       * FULL_UPDATE_STATUS
251       *
252       * The RSD lock should not be taken here as it is acceptable to
253       * have a delay between the time the server has a wrong status and
254       * the fact we detect it: the updates that succeed to pass during
255       * this time will have no impact on remote server. But it is
256       * interesting to not saturate uselessly the network if the
257       * updates are not necessary so this check to stop sending updates
258       * is interesting anyway. Not taking the RSD lock allows to have
259       * better performances in normal mode (most of the time).
260       */
261      final ServerStatus dsStatus = handler.getStatus();
262      if (dsStatus == BAD_GEN_ID_STATUS)
263      {
264        logger.warn(WARN_IGNORING_UPDATE_FROM_DS_BADGENID,
265            handler.getReplicationServerId(), updateMsg.getCSN(),
266            handler.getBaseDN(), handler.getServerId(),
267            session.getReadableRemoteAddress(),
268            handler.getGenerationId(), handler.getReferenceGenId());
269        return true;
270      }
271      else if (dsStatus == FULL_UPDATE_STATUS)
272      {
273        logger.warn(WARN_IGNORING_UPDATE_FROM_DS_FULLUP,
274            handler.getReplicationServerId(), updateMsg.getCSN(),
275            handler.getBaseDN(), handler.getServerId(),
276            session.getReadableRemoteAddress());
277        return true;
278      }
279    }
280    else
281    {
282      /**
283       * Ignore updates from RS with bad gen id
284       * (no system managed status for a RS)
285       */
286      long referenceGenerationId = handler.getReferenceGenId();
287      if (referenceGenerationId > 0
288          && referenceGenerationId != handler.getGenerationId())
289      {
290        logger.error(WARN_IGNORING_UPDATE_FROM_RS,
291            handler.getReplicationServerId(), updateMsg.getCSN(),
292            handler.getBaseDN(), handler.getServerId(),
293            session.getReadableRemoteAddress(),
294            handler.getGenerationId(), referenceGenerationId);
295        return true;
296      }
297    }
298    return false;
299  }
300
301  private void logException(Exception e)
302  {
303    if (logger.isTraceEnabled())
304    {
305      logger.trace("In " + getName() + " " + stackTraceToSingleLineString(e));
306    }
307  }
308}