001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2016 ForgeRock AS.
016 */
017package org.opends.server.replication.server;
018
019import static org.opends.messages.ReplicationMessages.*;
020import static org.opends.server.replication.common.ServerStatus.*;
021import static org.opends.server.replication.common.StatusMachine.*;
022import static org.opends.server.replication.protocol.ProtocolVersion.*;
023import static org.opends.server.util.StaticUtils.*;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Date;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Set;
031
032import org.forgerock.i18n.LocalizableMessage;
033import org.forgerock.i18n.slf4j.LocalizedLogger;
034import org.forgerock.opendj.ldap.ResultCode;
035import org.opends.server.api.MonitorData;
036import org.opends.server.replication.common.AssuredMode;
037import org.opends.server.replication.common.DSInfo;
038import org.opends.server.replication.common.ServerState;
039import org.opends.server.replication.common.ServerStatus;
040import org.opends.server.replication.common.StatusMachine;
041import org.opends.server.replication.common.StatusMachineEvent;
042import org.opends.server.replication.protocol.ChangeStatusMsg;
043import org.opends.server.replication.protocol.ProtocolVersion;
044import org.opends.server.replication.protocol.ReplServerStartDSMsg;
045import org.opends.server.replication.protocol.ReplicationMsg;
046import org.opends.server.replication.protocol.ServerStartMsg;
047import org.opends.server.replication.protocol.Session;
048import org.opends.server.replication.protocol.StartMsg;
049import org.opends.server.replication.protocol.StartSessionMsg;
050import org.opends.server.replication.protocol.StopMsg;
051import org.opends.server.replication.protocol.TopologyMsg;
052import org.opends.server.types.DirectoryException;
053
054/**
055 * This class defines a server handler, which handles all interaction with a
056 * peer server (RS or DS).
057 */
058public class DataServerHandler extends ServerHandler
059{
060
061  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
062
063  /**
064   * Temporary generationId received in handshake/phase1, and used after
065   * handshake/phase2.
066   */
067  private long tmpGenerationId;
068
069  /** Status of this DS (only used if this server handler represents a DS). */
070  private ServerStatus status = ServerStatus.INVALID_STATUS;
071
072  /** Referrals URLs this DS is exporting. */
073  private List<String> refUrls = new ArrayList<>();
074  /** Assured replication enabled on DS or not. */
075  private boolean assuredFlag;
076  /** DS assured mode (relevant if assured replication enabled). */
077  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
078  /** DS safe data level (relevant if assured mode is safe data). */
079  private byte safeDataLevel = -1;
080  private Set<String> eclIncludes = new HashSet<>();
081  private Set<String> eclIncludesForDeletes = new HashSet<>();
082
083  /**
084   * Creates a new data server handler.
085   * @param session The session opened with the remote data server.
086   * @param queueSize The queue size.
087   * @param replicationServer The hosting RS.
088   * @param rcvWindowSize The receiving window size.
089   */
090  public DataServerHandler(
091      Session session,
092      int queueSize,
093      ReplicationServer replicationServer,
094      int rcvWindowSize)
095  {
096    super(session, queueSize, replicationServer, rcvWindowSize);
097  }
098
099  /**
100   * Order the peer DS server to change his status or close the connection
101   * according to the requested new generation id.
102   * @param newGenId The new generation id to take into account
103   * @throws IOException If IO error occurred.
104   */
105  public void changeStatusForResetGenId(long newGenId) throws IOException
106  {
107    StatusMachineEvent event = getStatusMachineEvent(newGenId);
108    if (event == null)
109    {
110      return;
111    }
112
113    if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
114        && status == ServerStatus.FULL_UPDATE_STATUS)
115    {
116      // Prevent useless error message (full update status cannot lead to bad gen status)
117      logger.info(NOTE_BAD_GEN_ID_IN_FULL_UPDATE, replicationServer.getServerId(),
118          getBaseDN(), serverId, generationId, newGenId);
119      return;
120    }
121
122    changeStatus(event, "for reset gen id");
123  }
124
125  private StatusMachineEvent getStatusMachineEvent(long newGenId)
126  {
127    if (newGenId == -1)
128    {
129      // The generation id is being made invalid, let's put the DS
130      // into BAD_GEN_ID_STATUS
131      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
132    }
133    if (newGenId != generationId)
134    {
135      // This server has a bad generation id compared to new reference one,
136      // let's put it into BAD_GEN_ID_STATUS
137      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
138    }
139
140    if (status != ServerStatus.BAD_GEN_ID_STATUS)
141    {
142      if (logger.isTraceEnabled())
143      {
144        logger.trace("In RS " + replicationServer.getServerId()
145            + ", DS " + getServerId() + " for baseDN=" + getBaseDN()
146            + " has already generation id " + newGenId
147            + " so no ChangeStatusMsg sent to him.");
148      }
149      return null;
150    }
151
152    // This server has the good new reference generation id.
153    // Close connection with him to force his reconnection: DS will
154    // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
155
156    if (logger.isTraceEnabled())
157    {
158      logger.trace("In RS " + replicationServer.getServerId()
159          + ", closing connection to DS " + getServerId() + " for baseDN=" + getBaseDN()
160          + " to force reconnection as new local generationId"
161          + " and remote one match and DS is in bad gen id: " + newGenId);
162    }
163
164    // Connection closure must not be done calling RSD.stopHandler() as it
165    // would rewait the RSD lock that we already must have entering this
166    // method. This would lead to a reentrant lock which we do not want.
167    // So simply close the session, this will make the hang up appear
168    // after the reader thread that took the RSD lock releases it.
169    if (session != null
170        // V4 protocol introduced a StopMsg to properly close the
171        // connection between servers
172        && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
173    {
174      try
175      {
176        session.publish(new StopMsg());
177      }
178      catch (IOException ioe)
179      {
180        // Anyway, going to close session, so nothing to do
181      }
182    }
183
184    // NOT_CONNECTED_STATUS is the last one in RS session life: handler
185    // will soon disappear after this method call...
186    status = ServerStatus.NOT_CONNECTED_STATUS;
187    return null;
188  }
189
190  /**
191   * Change the status according to the event.
192   *
193   * @param event
194   *          The event to be used for new status computation
195   * @return The new status of the DS
196   * @throws IOException
197   *           When raised by the underlying session
198   */
199  public ServerStatus changeStatus(StatusMachineEvent event) throws IOException
200  {
201    return changeStatus(event, "from status analyzer");
202  }
203
204  private ServerStatus changeStatus(StatusMachineEvent event, String origin)
205      throws IOException
206  {
207    // Check state machine allows this new status (Sanity check)
208    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
209    if (newStatus == ServerStatus.INVALID_STATUS)
210    {
211      logger.error(ERR_RS_CANNOT_CHANGE_STATUS, getBaseDN(), serverId, status, event);
212      // Only change allowed is from NORMAL_STATUS to DEGRADED_STATUS and vice
213      // versa. We may be trying to change the status while another status has
214      // just been entered: e.g a full update has just been engaged.
215      // In that case, just ignore attempt to change the status
216      return newStatus;
217    }
218
219    // Send message requesting to change the DS status
220    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, INVALID_STATUS);
221
222    if (logger.isTraceEnabled())
223    {
224      logger.trace("In RS " + replicationServer.getServerId()
225          + " Sending change status " + origin + " to " + getServerId()
226          + " for baseDN=" + getBaseDN() + ":\n" + csMsg);
227    }
228
229    session.publish(csMsg);
230
231    status = newStatus;
232
233    return newStatus;
234  }
235
236  @Override
237  public MonitorData getMonitorData()
238  {
239    MonitorData attributes = super.getMonitorData();
240
241    // Add the specific DS ones
242    attributes.add("replica", serverURL);
243    attributes.add("connected-to", replicationServer.getMonitorInstanceName());
244
245    ReplicationDomainMonitorData md = replicationServerDomain.getDomainMonitorData();
246
247    // Oldest missing update
248    long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
249    if (approxFirstMissingDate > 0)
250    {
251      attributes.add("approx-older-change-not-synchronized", new Date(approxFirstMissingDate));
252      attributes.add("approx-older-change-not-synchronized-millis", approxFirstMissingDate);
253    }
254
255    attributes.add("missing-changes", md.getMissingChanges(serverId));
256    // Replication delay
257    attributes.add("approximate-delay", md.getApproxDelay(serverId));
258
259    ServerState state = md.getLDAPServerState(serverId);
260    if (state != null)
261    {
262      attributes.add("server-state", state.toStringSet());
263    }
264
265    return attributes;
266  }
267
268  @Override
269  public String getMonitorInstanceName()
270  {
271    return "Connected directory server DS(" + serverId + ") " + serverURL
272        + ",cn=" + replicationServerDomain.getMonitorInstanceName();
273  }
274
275  /**
276   * Gets the status of the connected DS.
277   * @return The status of the connected DS.
278   */
279  @Override
280  public ServerStatus getStatus()
281  {
282    return status;
283  }
284
285  @Override
286  public boolean isDataServer()
287  {
288    return true;
289  }
290
291  /**
292   * Process message of a remote server changing his status.
293   * @param csMsg The message containing the new status
294   * @return The new server status of the DS
295   */
296  public ServerStatus processNewStatus(ChangeStatusMsg csMsg)
297  {
298    // Get the status the DS just entered
299    ServerStatus reqStatus = csMsg.getNewStatus();
300    // Translate new status to a state machine event
301    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
302    if (event == StatusMachineEvent.INVALID_EVENT)
303    {
304      logger.error(ERR_RS_INVALID_NEW_STATUS, reqStatus, getBaseDN(), serverId);
305      return ServerStatus.INVALID_STATUS;
306    }
307
308    // Check state machine allows this new status
309    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
310    if (newStatus == ServerStatus.INVALID_STATUS)
311    {
312      logger.error(ERR_RS_CANNOT_CHANGE_STATUS, getBaseDN(), serverId, status, event);
313      return ServerStatus.INVALID_STATUS;
314    }
315
316    status = newStatus;
317    return status;
318  }
319
320  /**
321   * Processes a start message received from a remote data server.
322   * @param serverStartMsg The provided start message received.
323   * @return flag specifying whether the remote server requests encryption.
324   * @throws DirectoryException raised when an error occurs.
325   */
326  public boolean processStartFromRemote(ServerStartMsg serverStartMsg)
327  throws DirectoryException
328  {
329    session
330        .setProtocolVersion(getCompatibleVersion(serverStartMsg.getVersion()));
331    tmpGenerationId = serverStartMsg.getGenerationId();
332    serverId = serverStartMsg.getServerId();
333    serverURL = serverStartMsg.getServerURL();
334    groupId = serverStartMsg.getGroupId();
335    heartbeatInterval = serverStartMsg.getHeartbeatInterval();
336
337    // generic stuff
338    setBaseDNAndDomain(serverStartMsg.getBaseDN(), true);
339    setInitialServerState(serverStartMsg.getServerState());
340    setSendWindowSize(serverStartMsg.getWindowSize());
341
342    if (heartbeatInterval < 0)
343    {
344      heartbeatInterval = 0;
345    }
346    return serverStartMsg.getSSLEncryption();
347  }
348
349  /** Send our own TopologyMsg to DS. */
350  private TopologyMsg sendTopoToRemoteDS() throws IOException
351  {
352    TopologyMsg outTopoMsg = replicationServerDomain
353        .createTopologyMsgForDS(this.serverId);
354    sendTopoInfo(outTopoMsg);
355    return outTopoMsg;
356  }
357
358  /**
359   * Starts the handler from a remote ServerStart message received from
360   * the remote data server.
361   * @param inServerStartMsg The provided ServerStart message received.
362   */
363  public void startFromRemoteDS(ServerStartMsg inServerStartMsg)
364  {
365    try
366    {
367      // initializations
368      localGenerationId = -1;
369      oldGenerationId = -100;
370
371      // processes the ServerStart message received
372      boolean sessionInitiatorSSLEncryption =
373        processStartFromRemote(inServerStartMsg);
374
375      /**
376       * Hack to be sure that if a server disconnects and reconnect, we
377       * let the reader thread see the closure and cleanup any reference
378       * to old connection. This must be done before taking the domain lock so
379       * that the reader thread has a chance to stop the handler.
380       *
381       * TODO: This hack should be removed and disconnection/reconnection
382       * properly dealt with.
383       */
384      if (replicationServerDomain.getConnectedDSs()
385          .containsKey(inServerStartMsg.getServerId()))
386      {
387        try {
388          Thread.sleep(100);
389        }
390        catch(Exception e){
391          abortStart(null);
392          return;
393        }
394      }
395
396      lockDomainNoTimeout();
397
398      localGenerationId = replicationServerDomain.getGenerationId();
399      oldGenerationId = localGenerationId;
400
401      if (replicationServerDomain.isAlreadyConnectedToDS(this))
402      {
403        abortStart(null);
404        return;
405      }
406
407      try
408      {
409        StartMsg outStartMsg = sendStartToRemote();
410
411        logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
412
413        // The session initiator decides whether to use SSL.
414        // Until here session is encrypted then it depends on the negotiation
415        if (!sessionInitiatorSSLEncryption)
416        {
417          session.stopEncryption();
418        }
419
420        // wait and process StartSessionMsg from remote RS
421        StartSessionMsg inStartSessionMsg =
422          waitAndProcessStartSessionFromRemoteDS();
423        if (inStartSessionMsg == null)
424        {
425          // DS wants to properly close the connection (DS sent a StopMsg)
426          logStopReceived();
427          abortStart(null);
428          return;
429        }
430
431        // Send our own TopologyMsg to remote DS
432        TopologyMsg outTopoMsg = sendTopoToRemoteDS();
433
434        logStartSessionHandshake(inStartSessionMsg, outTopoMsg);
435      }
436      catch (IOException e)
437      {
438        logger.traceException(e);
439        LocalizableMessage errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
440            inServerStartMsg.getServerId(), replicationServer.getServerId(), getExceptionMessage(e));
441        abortStart(errMessage);
442        return;
443      }
444      catch (Exception e)
445      {
446        logger.traceException(e);
447        // We do not need to support DS V1 connection, we just accept RS V1 connection:
448        // We just trash the message, log the event for debug purpose and close the connection
449        abortStart(null);
450        return;
451      }
452
453      replicationServerDomain.register(this);
454
455      logger.debug(INFO_REPLICATION_SERVER_CONNECTION_FROM_DS, getReplicationServerId(), getServerId(),
456              replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress());
457
458      super.finalizeStart();
459    }
460    catch(DirectoryException de)
461    {
462      logger.traceException(de);
463      abortStart(de.getMessageObject());
464    }
465    catch(Exception e)
466    {
467      logger.traceException(e);
468      abortStart(null);
469    }
470    finally
471    {
472      releaseDomainLock();
473    }
474  }
475
476  /**
477   * Sends a start message to the remote DS.
478   *
479   * @return The StartMsg sent.
480   * @throws IOException
481   *           When an exception occurs.
482   */
483  private StartMsg sendStartToRemote() throws IOException
484  {
485    final StartMsg startMsg;
486
487    // Before V4 protocol, we sent a ReplServerStartMsg
488    if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
489    {
490      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
491      startMsg = createReplServerStartMsg();
492    }
493    else
494    {
495      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
496      startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
497          getReplicationServerURL(), getBaseDN(), maxRcvWindow,
498          replicationServerDomain.getLatestServerState(),
499          localGenerationId, sslEncryption, getLocalGroupId(),
500          replicationServer.getDegradedStatusThreshold(),
501          replicationServer.getWeight(),
502          replicationServerDomain.getConnectedDSs().size());
503    }
504
505    send(startMsg);
506    return startMsg;
507  }
508
509  /**
510   * Creates a DSInfo structure representing this remote DS.
511   * @return The DSInfo structure representing this remote DS
512   */
513  public DSInfo toDSInfo()
514  {
515    return new DSInfo(serverId, serverURL, getReplicationServerId(),
516        generationId, status, assuredFlag, assuredMode, safeDataLevel, groupId,
517        refUrls, eclIncludes, eclIncludesForDeletes, getProtocolVersion());
518  }
519
520  @Override
521  public String toString()
522  {
523    if (serverId != 0)
524    {
525      return "Replica DS(" + serverId + ") for domain \""
526          + replicationServerDomain.getBaseDN() + "\"";
527    }
528    return "Unknown server";
529  }
530
531  /**
532   * Wait receiving the StartSessionMsg from the remote DS and process it, or
533   * receiving a StopMsg to properly stop the handshake procedure.
534   * @return the startSessionMsg received or null DS sent a stop message to
535   *         not finish the handshake.
536   * @throws Exception
537   */
538  private StartSessionMsg waitAndProcessStartSessionFromRemoteDS()
539      throws Exception
540  {
541    ReplicationMsg msg = session.receive();
542
543    if (msg instanceof StopMsg)
544    {
545      // DS wants to stop handshake (was just for handshake phase one for RS
546      // choice). Return null to make the session be terminated.
547      return null;
548    } else if (!(msg instanceof StartSessionMsg))
549    {
550      LocalizableMessage message = LocalizableMessage.raw(
551          "Protocol error: StartSessionMsg required." + msg + " received.");
552      abortStart(message);
553      return null;
554    }
555
556    // Process StartSessionMsg sent by remote DS
557    StartSessionMsg startSessionMsg = (StartSessionMsg) msg;
558
559    this.status = startSessionMsg.getStatus();
560    // Sanity check: is it a valid initial status?
561    if (!isValidInitialStatus(this.status))
562    {
563      throw new DirectoryException(ResultCode.OTHER,
564          ERR_RS_INVALID_INIT_STATUS.get( this.status, getBaseDN(), serverId));
565    }
566
567    this.refUrls = startSessionMsg.getReferralsURLs();
568    this.assuredFlag = startSessionMsg.isAssured();
569    this.assuredMode = startSessionMsg.getAssuredMode();
570    this.safeDataLevel = startSessionMsg.getSafeDataLevel();
571    this.eclIncludes = startSessionMsg.getEclIncludes();
572    this.eclIncludesForDeletes = startSessionMsg.getEclIncludesForDeletes();
573
574    /*
575     * If we have already a generationID set for the domain
576     * then
577     *   if the connecting replica has not the same
578     *   then it is degraded locally and notified by an error message
579     * else
580     *   we set the generationID from the one received
581     *   (unsaved yet on disk . will be set with the 1rst change
582     * received)
583     */
584    generationId = tmpGenerationId;
585    if (localGenerationId > 0)
586    {
587      if (generationId != localGenerationId)
588      {
589        logger.warn(WARN_BAD_GENERATION_ID_FROM_DS, serverId, session.getReadableRemoteAddress(),
590            generationId, getBaseDN(), getReplicationServerId(), localGenerationId);
591      }
592    }
593    else
594    {
595      // We are an empty ReplicationServer
596      if (generationId > 0 && !getServerState().isEmpty())
597      {
598        // If the LDAP server has already sent changes
599        // it is not expected to connect to an empty RS
600        logger.warn(WARN_BAD_GENERATION_ID_FROM_DS, serverId, session.getReadableRemoteAddress(),
601            generationId, getBaseDN(), getReplicationServerId(), localGenerationId);
602      }
603      else
604      {
605        // The local RS is not initialized - take the one received
606        // WARNING: Must be done before computing topo message to send
607        // to peer server as topo message must embed valid generation id
608        // for our server
609        oldGenerationId = replicationServerDomain.changeGenerationId(generationId);
610      }
611    }
612    return startSessionMsg;
613  }
614
615  /**
616   * Process message of a remote server changing his status.
617   * @param csMsg The message containing the new status
618   */
619  public void receiveNewStatus(ChangeStatusMsg csMsg)
620  {
621    replicationServerDomain.processNewStatus(this, csMsg);
622  }
623}