001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2016 ForgeRock AS.
016 */
017package org.opends.server.replication.server;
018
019import static org.opends.messages.ReplicationMessages.*;
020import static org.opends.server.replication.protocol.ProtocolVersion.*;
021import static org.opends.server.util.StaticUtils.*;
022
023import java.io.IOException;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028
029import org.forgerock.i18n.LocalizableMessage;
030import org.forgerock.i18n.slf4j.LocalizedLogger;
031import org.forgerock.opendj.ldap.DN;
032import org.forgerock.opendj.ldap.ResultCode;
033import org.opends.server.api.MonitorData;
034import org.opends.server.replication.common.DSInfo;
035import org.opends.server.replication.common.RSInfo;
036import org.opends.server.replication.common.ServerState;
037import org.opends.server.replication.common.ServerStatus;
038import org.opends.server.replication.protocol.ProtocolVersion;
039import org.opends.server.replication.protocol.ReplServerStartMsg;
040import org.opends.server.replication.protocol.ReplicationMsg;
041import org.opends.server.replication.protocol.Session;
042import org.opends.server.replication.protocol.StopMsg;
043import org.opends.server.replication.protocol.TopologyMsg;
044import org.opends.server.types.DirectoryException;
045import org.opends.server.types.HostPort;
046
047/**
048 * This class defines a server handler, which handles all interaction with a
049 * peer replication server.
050 */
051public class ReplicationServerHandler extends ServerHandler
052{
053  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
054
055  /** Properties filled only if remote server is a RS. */
056  private String serverAddressURL;
057  /**
058   * This collection will contain as many elements as there are
059   * LDAP servers connected to the remote replication server.
060   */
061  private final Map<Integer, LightweightServerHandler> remoteDirectoryServers = new ConcurrentHashMap<>();
062
063  /**
064   * Starts this handler based on a start message received from remote server.
065   * @param inReplServerStartMsg The start msg provided by the remote server.
066   * @return Whether the remote server requires encryption or not.
067   * @throws DirectoryException When a problem occurs.
068   */
069  private boolean processStartFromRemote(
070        ReplServerStartMsg inReplServerStartMsg)
071        throws DirectoryException
072  {
073    try
074    {
075      short protocolVersion = getCompatibleVersion(inReplServerStartMsg
076          .getVersion());
077      session.setProtocolVersion(protocolVersion);
078      generationId = inReplServerStartMsg.getGenerationId();
079      serverId = inReplServerStartMsg.getServerId();
080      serverURL = inReplServerStartMsg.getServerURL();
081      serverAddressURL = toServerAddressURL(serverURL);
082      setBaseDNAndDomain(inReplServerStartMsg.getBaseDN(), false);
083      setInitialServerState(inReplServerStartMsg.getServerState());
084      setSendWindowSize(inReplServerStartMsg.getWindowSize());
085      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
086      {
087        // We support connection from a V1 RS
088        // Only V2 protocol has the group id in repl server start message
089        this.groupId = inReplServerStartMsg.getGroupId();
090      }
091
092      oldGenerationId = -100;
093    }
094    catch(Exception e)
095    {
096      LocalizableMessage message = LocalizableMessage.raw(e.getLocalizedMessage());
097      throw new DirectoryException(ResultCode.OTHER, message);
098    }
099    return inReplServerStartMsg.getSSLEncryption();
100  }
101
102  private String toServerAddressURL(String serverURL)
103  {
104    final int port = HostPort.valueOf(serverURL).getPort();
105    // Ensure correct formatting of IPv6 addresses by using a HostPort instance.
106    return new HostPort(session.getRemoteAddress(), port).toString();
107  }
108
109  /**
110   * Sends a start message to the remote RS.
111   *
112   * @return The ReplServerStartMsg sent.
113   * @throws IOException
114   *           When an exception occurs.
115   */
116  private ReplServerStartMsg sendStartToRemote() throws IOException
117  {
118    ReplServerStartMsg outReplServerStartMsg = createReplServerStartMsg();
119    send(outReplServerStartMsg);
120    return outReplServerStartMsg;
121  }
122
123  /**
124   * Creates a new handler object to a remote replication server.
125   * @param session The session with the remote RS.
126   * @param queueSize The queue size to manage updates to that RS.
127   * @param replicationServer The hosting local RS object.
128   * @param rcvWindowSize The receiving window size.
129   */
130  public ReplicationServerHandler(
131      Session session,
132      int queueSize,
133      ReplicationServer replicationServer,
134      int rcvWindowSize)
135  {
136    super(session, queueSize, replicationServer, rcvWindowSize);
137  }
138
139  /**
140   * Connect the hosting RS to the RS represented by THIS handler
141   * on an outgoing connection.
142   * @param baseDN The baseDN
143   * @param sslEncryption The sslEncryption requested to the remote RS.
144   * @throws DirectoryException when an error occurs.
145   */
146  public void connect(DN baseDN, boolean sslEncryption)
147      throws DirectoryException
148  {
149    // we are the initiator and decides of the encryption
150    this.sslEncryption = sslEncryption;
151
152    setBaseDNAndDomain(baseDN, false);
153
154    localGenerationId = replicationServerDomain.getGenerationId();
155    oldGenerationId = localGenerationId;
156
157    try
158    {
159      lockDomainNoTimeout();
160
161      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
162
163      // Wait answer
164      ReplicationMsg msg = session.receive();
165
166      // Reject bad responses
167      if (!(msg instanceof ReplServerStartMsg))
168      {
169        if (msg instanceof StopMsg)
170        {
171          // Remote replication server is probably shutting down or simultaneous
172          // cross-connect detected.
173          abortStart(null);
174        }
175        else
176        {
177          LocalizableMessage message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg
178              .getClass().getCanonicalName(), "ReplServerStartMsg");
179          abortStart(message);
180        }
181        return;
182      }
183
184      processStartFromRemote((ReplServerStartMsg) msg);
185
186      if (replicationServerDomain.isAlreadyConnectedToRS(this))
187      {
188        // Simultaneous cross connect.
189        abortStart(null);
190        return;
191      }
192
193      /*
194      Since we are going to send the topology message before having received
195      one, we need to set the generation ID as soon as possible if it is
196      currently uninitialized. See OpenDJ-121.
197      */
198      if (localGenerationId < 0 && generationId > 0)
199      {
200        oldGenerationId =
201            replicationServerDomain.changeGenerationId(generationId);
202      }
203
204      logStartHandshakeSNDandRCV(outReplServerStartMsg,(ReplServerStartMsg)msg);
205
206      // Until here session is encrypted then it depends on the negotiation
207      // The session initiator decides whether to use SSL.
208      if (!this.sslEncryption)
209      {
210        session.stopEncryption();
211      }
212
213      if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
214      {
215        /*
216        Only protocol version above V1 has a phase 2 handshake
217        NOW PROCEED WITH SECOND PHASE OF HANDSHAKE:
218        TopologyMsg then TopologyMsg (with a RS)
219
220        Send our own TopologyMsg to remote RS
221        */
222        TopologyMsg outTopoMsg =
223            replicationServerDomain.createTopologyMsgForRS();
224        sendTopoInfo(outTopoMsg);
225
226        // wait and process Topo from remote RS
227        TopologyMsg inTopoMsg = waitAndProcessTopoFromRemoteRS();
228        if (inTopoMsg == null)
229        {
230          // Simultaneous cross connect.
231          abortStart(null);
232          return;
233        }
234
235        logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);
236
237        /*
238        FIXME: i think this should be done for all protocol version !!
239        not only those > V1
240        */
241        replicationServerDomain.register(this);
242
243        /*
244        Process TopologyMsg sent by remote RS: store matching new info
245        (this will also warn our connected DSs of the new received info)
246        */
247        replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false);
248      }
249
250      logger.debug(INFO_REPLICATION_SERVER_CONNECTION_TO_RS, getReplicationServerId(), getServerId(),
251          replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress());
252
253      super.finalizeStart();
254    }
255    catch (IOException e)
256    {
257      logger.traceException(e);
258      LocalizableMessage errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(
259          getReplicationServerId(), session.getReadableRemoteAddress(), getExceptionMessage(e));
260      abortStart(errMessage);
261    }
262    catch (DirectoryException e)
263    {
264      logger.traceException(e);
265      abortStart(e.getMessageObject());
266    }
267    catch (Exception e)
268    {
269      logger.traceException(e);
270      abortStart(LocalizableMessage.raw(e.getLocalizedMessage()));
271    }
272    finally
273    {
274      releaseDomainLock();
275    }
276  }
277
278  /**
279   * Starts the handler from a remote ReplServerStart message received from
280   * the remote replication server.
281   * @param inReplServerStartMsg The provided ReplServerStart message received.
282   */
283  public void startFromRemoteRS(ReplServerStartMsg inReplServerStartMsg)
284  {
285    localGenerationId = -1;
286    oldGenerationId = -100;
287    try
288    {
289      // The initiator decides if the session is encrypted
290      sslEncryption = processStartFromRemote(inReplServerStartMsg);
291
292      lockDomainWithTimeout();
293
294      if (replicationServerDomain.isAlreadyConnectedToRS(this))
295      {
296        abortStart(null);
297        return;
298      }
299
300      this.localGenerationId = replicationServerDomain.getGenerationId();
301      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
302
303      logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
304
305      /*
306      until here session is encrypted then it depends on the negotiation
307      The session initiator decides whether to use SSL.
308      */
309      if (!sslEncryption)
310      {
311        session.stopEncryption();
312      }
313
314      TopologyMsg inTopoMsg = null;
315      if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
316      {
317        /*
318        Only protocol version above V1 has a phase 2 handshake
319        NOW PROCEED WITH SECOND PHASE OF HANDSHAKE:
320        TopologyMsg then TopologyMsg (with a RS)
321        wait and process Topo from remote RS
322        */
323        inTopoMsg = waitAndProcessTopoFromRemoteRS();
324        if (inTopoMsg == null)
325        {
326          // Simultaneous cross connect.
327          abortStart(null);
328          return;
329        }
330
331        // send our own TopologyMsg to remote RS
332        TopologyMsg outTopoMsg = replicationServerDomain
333            .createTopologyMsgForRS();
334        sendTopoInfo(outTopoMsg);
335
336        logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg);
337      }
338      else
339      {
340        // Terminate connection from a V1 RS
341
342        // if the remote RS and the local RS have the same genID
343        // then it's ok and nothing else to do
344        if (generationId == localGenerationId)
345        {
346          if (logger.isTraceEnabled())
347          {
348            logger.trace("In " + replicationServer.getMonitorInstanceName()
349                + " " + this + " RS V1 with serverID=" + serverId
350                + " is connected with the right generation ID");
351          }
352        } else
353        {
354          checkGenerationId();
355        }
356        /*
357        Note: the supported scenario for V1->V2 upgrade is to upgrade 1 by 1
358        all the servers of the topology. We prefer not not send a TopologyMsg
359        for giving partial/false information to the V2 servers as for
360        instance we don't have the connected DS of the V1 RS...When the V1
361        RS will be upgraded in his turn, topo info will be sent and accurate.
362        That way, there is  no risk to have false/incomplete information in
363        other servers.
364        */
365      }
366
367      replicationServerDomain.register(this);
368
369      // Process TopologyMsg sent by remote RS: store matching new info
370      // (this will also warn our connected DSs of the new received info)
371      if (inTopoMsg!=null)
372      {
373        replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false);
374      }
375
376      logger.debug(INFO_REPLICATION_SERVER_CONNECTION_FROM_RS, getReplicationServerId(), getServerId(),
377          replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress());
378
379      super.finalizeStart();
380    }
381    catch (IOException e)
382    {
383      logger.traceException(e);
384      abortStart(ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(
385          inReplServerStartMsg.getServerId(), replicationServer.getServerId(), getExceptionMessage(e)));
386    }
387    catch (DirectoryException e)
388    {
389      logger.traceException(e);
390      abortStart(e.getMessageObject());
391    }
392    catch (Exception e)
393    {
394      logger.traceException(e);
395      abortStart(LocalizableMessage.raw(e.getLocalizedMessage()));
396    }
397    finally
398    {
399      releaseDomainLock();
400    }
401  }
402
403  /**
404   * Wait receiving the TopologyMsg from the remote RS and process it.
405   * @return the topologyMsg received or {@code null} if stop was received.
406   * @throws DirectoryException
407   */
408  private TopologyMsg waitAndProcessTopoFromRemoteRS()
409      throws DirectoryException
410  {
411    ReplicationMsg msg;
412    try
413    {
414      msg = session.receive();
415    }
416    catch(Exception e)
417    {
418      LocalizableMessage message = LocalizableMessage.raw(e.getLocalizedMessage());
419      throw new DirectoryException(ResultCode.OTHER, message);
420    }
421
422    if (!(msg instanceof TopologyMsg))
423    {
424      if (msg instanceof StopMsg)
425      {
426        // Remote replication server is probably shutting down, or cross
427        // connection attempt.
428        return null;
429      }
430
431      LocalizableMessage message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(
432          msg.getClass().getCanonicalName(), "TopologyMsg");
433      throw new DirectoryException(ResultCode.OTHER, message);
434    }
435
436    // Remote RS sent his topo msg
437    TopologyMsg inTopoMsg = (TopologyMsg) msg;
438
439    /* Store remote RS weight if it has one.
440     * For protocol version < 4, use default value of 1 for weight
441     */
442    if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
443    {
444      // List should only contain RS info for sender
445      RSInfo rsInfo = inTopoMsg.getRsInfos().get(0);
446      weight = rsInfo.getWeight();
447    }
448
449    /*
450    if the remote RS and the local RS have the same genID
451    then it's ok and nothing else to do
452    */
453    if (generationId == localGenerationId)
454    {
455      if (logger.isTraceEnabled())
456      {
457        logger.trace("In " + replicationServer.getMonitorInstanceName()
458            + " RS with serverID=" + serverId
459            + " is connected with the right generation ID, same as local ="
460            + generationId);
461      }
462    }
463    else
464    {
465      checkGenerationId();
466    }
467
468    return inTopoMsg;
469  }
470
471  /**
472   * Checks local generation ID against the remote RS one,
473   * and logs Warning messages if needed.
474   */
475  private void checkGenerationId()
476  {
477    if (localGenerationId <= 0)
478    {
479      // The local RS is not initialized - take the one received
480      // WARNING: Must be done before computing topo message to send to peer
481      // server as topo message must embed valid generation id for our server
482      oldGenerationId =
483          replicationServerDomain.changeGenerationId(generationId);
484      return;
485    }
486
487    // the local RS is initialized
488    if (generationId > 0
489        // the remote RS is initialized. If not, there's nothing to do anyway.
490        && generationId != localGenerationId)
491    {
492      /* Either:
493       *
494       * 1) The 2 RS have different generationID
495       * replicationServerDomain.getGenerationIdSavedStatus() == true
496       *
497       * if the present RS has received changes regarding its gen ID and so will
498       * not change without a reset then we are just degrading the peer.
499       *
500       * 2) This RS has never received any changes for the current gen ID.
501       *
502       * Example case:
503       * - we are in RS1
504       * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
505       * - RS1 has genId1 from LS1 /genId1 comes from data in suffix
506       * - we are in RS1 and we receive a START msg from RS2
507       * - Each RS keeps its genID / is degraded and when LS2
508       * will be populated from LS1 everything will become ok.
509       *
510       * Issue:
511       * FIXME : Would it be a good idea in some cases to just set the gen ID
512       * received from the peer RS specially if the peer has a non null state
513       * and we have a null state ?
514       * replicationServerDomain.setGenerationId(generationId, false);
515       */
516      logger.warn(WARN_BAD_GENERATION_ID_FROM_RS, serverId, session.getReadableRemoteAddress(), generationId,
517          getBaseDN(), getReplicationServerId(), localGenerationId);
518    }
519  }
520
521  /** {@inheritDoc} */
522  @Override
523  public boolean isDataServer()
524  {
525    return false;
526  }
527
528  /**
529   * Add the DSinfos of the connected Directory Servers
530   * to the List of DSInfo provided as a parameter.
531   *
532   * @param dsInfos The List of DSInfo that should be updated
533   *                with the DSInfo for the remoteDirectoryServers
534   *                connected to this ServerHandler.
535   */
536  public void addDSInfos(List<DSInfo> dsInfos)
537  {
538    synchronized (remoteDirectoryServers)
539    {
540      for (LightweightServerHandler ls : remoteDirectoryServers.values())
541      {
542        dsInfos.add(ls.toDSInfo());
543      }
544    }
545  }
546
547  /**
548   * Shutdown This ServerHandler.
549   */
550  @Override
551  public void shutdown()
552  {
553    super.shutdown();
554    clearRemoteLSHandlers();
555  }
556
557  private void clearRemoteLSHandlers()
558  {
559    synchronized (remoteDirectoryServers)
560    {
561      for (LightweightServerHandler lsh : remoteDirectoryServers.values())
562      {
563        lsh.stopHandler();
564      }
565      remoteDirectoryServers.clear();
566    }
567  }
568
569  /**
570   * Stores topology information received from a peer RS and that must be kept
571   * in RS handler.
572   *
573   * @param topoMsg The received topology message
574   */
575  public void processTopoInfoFromRS(TopologyMsg topoMsg)
576  {
577    // List should only contain RS info for sender
578    final RSInfo rsInfo = topoMsg.getRsInfos().get(0);
579    generationId = rsInfo.getGenerationId();
580    groupId = rsInfo.getGroupId();
581    weight = rsInfo.getWeight();
582
583    synchronized (remoteDirectoryServers)
584    {
585      clearRemoteLSHandlers();
586
587      // Creates the new structure according to the message received.
588      for (DSInfo dsInfo : topoMsg.getReplicaInfos().values())
589      {
590        // For each DS connected to the peer RS
591        DSInfo clonedDSInfo = dsInfo.cloneWithReplicationServerId(serverId);
592        LightweightServerHandler lsh =
593            new LightweightServerHandler(this, clonedDSInfo);
594        lsh.startHandler();
595        remoteDirectoryServers.put(lsh.getServerId(), lsh);
596      }
597    }
598  }
599
600  /**
601   * When this handler is connected to a replication server, specifies if
602   * a wanted server is connected to this replication server.
603   *
604   * @param serverId The server we want to know if it is connected
605   * to the replication server represented by this handler.
606   * @return boolean True is the wanted server is connected to the server
607   * represented by this handler.
608   */
609  public boolean isRemoteLDAPServer(int serverId)
610  {
611    synchronized (remoteDirectoryServers)
612    {
613      for (LightweightServerHandler server : remoteDirectoryServers.values())
614      {
615        if (serverId == server.getServerId())
616        {
617          return true;
618        }
619      }
620      return false;
621    }
622  }
623
624  /**
625   * When the handler is connected to a replication server, specifies the
626   * replication server has remote LDAP servers connected to it.
627   *
628   * @return boolean True is the replication server has remote LDAP servers
629   * connected to it.
630   */
631  public boolean hasRemoteLDAPServers()
632  {
633    return !remoteDirectoryServers.isEmpty();
634  }
635
636  /**
637   * Return a Set containing the servers known by this replicationServer.
638   * @return a set containing the servers known by this replicationServer.
639   */
640  public Set<Integer> getConnectedDirectoryServerIds()
641  {
642    return remoteDirectoryServers.keySet();
643  }
644
645  /** {@inheritDoc} */
646  @Override
647  public String getMonitorInstanceName()
648  {
649    return "Connected replication server RS(" + serverId + ") " + serverURL
650        + ",cn=" + replicationServerDomain.getMonitorInstanceName();
651  }
652
653  @Override
654  public MonitorData getMonitorData()
655  {
656    MonitorData attributes = super.getMonitorData();
657
658    ReplicationDomainMonitorData md = replicationServerDomain.getDomainMonitorData();
659    attributes.add("Replication-Server", serverURL);
660    attributes.add("missing-changes", md.getMissingChangesRS(serverId));
661
662    ServerState state = md.getRSStates(serverId);
663    if (state != null)
664    {
665      attributes.add("server-state", state.toStringSet());
666    }
667
668    return attributes;
669  }
670
671  /** {@inheritDoc} */
672  @Override
673  public String toString()
674  {
675    if (serverId != 0)
676    {
677      return "Replication server RS(" + serverId + ") for domain \""
678          + replicationServerDomain.getBaseDN() + "\"";
679    }
680    return "Unknown server";
681  }
682
683  /**
684   * Gets the status of the connected DS.
685   * @return The status of the connected DS.
686   */
687  @Override
688  public ServerStatus getStatus()
689  {
690    return ServerStatus.INVALID_STATUS;
691  }
692
693  /**
694   * Retrieves the Address URL for this server handler.
695   *
696   * @return  The Address URL for this server handler,
697   *          in the form of an IP address and port separated by a colon.
698   */
699  public String getServerAddressURL()
700  {
701    return serverAddressURL;
702  }
703
704  /**
705   * Receives a topology msg.
706   * @param topoMsg The message received.
707   * @throws DirectoryException when it occurs.
708   * @throws IOException when it occurs.
709   */
710  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
711  throws DirectoryException, IOException
712  {
713    replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
714  }
715
716}