001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2017 ForgeRock AS.
016 */
017package org.opends.server.replication.server;
018
019import static org.opends.messages.ConfigMessages.*;
020import static org.opends.messages.ReplicationMessages.*;
021import static org.opends.server.replication.plugin.MultimasterReplication.getUnreachableReplicationServers;
022import static org.opends.server.util.StaticUtils.*;
023
024import java.io.File;
025import java.io.IOException;
026import java.net.InetAddress;
027import java.net.InetSocketAddress;
028import java.net.ServerSocket;
029import java.net.Socket;
030import java.net.SocketTimeoutException;
031import java.net.UnknownHostException;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.HashMap;
036import java.util.HashSet;
037import java.util.Iterator;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.CopyOnWriteArrayList;
042import java.util.concurrent.CopyOnWriteArraySet;
043import java.util.concurrent.atomic.AtomicBoolean;
044
045import org.forgerock.i18n.LocalizableMessage;
046import org.forgerock.i18n.slf4j.LocalizedLogger;
047import org.forgerock.opendj.config.server.ConfigChangeResult;
048import org.forgerock.opendj.config.server.ConfigException;
049import org.forgerock.opendj.config.server.ConfigurationChangeListener;
050import org.forgerock.opendj.ldap.DN;
051import org.forgerock.opendj.ldap.ResultCode;
052import org.forgerock.opendj.ldap.SearchScope;
053import org.forgerock.opendj.ldap.schema.AttributeType;
054import org.forgerock.opendj.server.config.meta.VirtualAttributeCfgDefn.ConflictBehavior;
055import org.forgerock.opendj.server.config.server.ReplicationServerCfg;
056import org.forgerock.opendj.server.config.server.UserDefinedVirtualAttributeCfg;
057import org.opends.server.api.DiskSpaceMonitorHandler;
058import org.opends.server.api.VirtualAttributeProvider;
059import org.opends.server.backends.ChangelogBackend;
060import org.opends.server.core.DirectoryServer;
061import org.opends.server.crypto.CryptoSuite;
062import org.opends.server.extensions.DiskSpaceMonitor;
063import org.opends.server.replication.common.CSN;
064import org.opends.server.replication.common.MultiDomainServerState;
065import org.opends.server.replication.common.ServerState;
066import org.opends.server.replication.plugin.MultimasterReplication;
067import org.opends.server.replication.plugin.MultimasterReplication.UnreachableReplicationServers;
068import org.opends.server.replication.protocol.ReplServerStartMsg;
069import org.opends.server.replication.protocol.ReplSessionSecurity;
070import org.opends.server.replication.protocol.ReplicationMsg;
071import org.opends.server.replication.protocol.ServerStartMsg;
072import org.opends.server.replication.protocol.Session;
073import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
074import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
075import org.opends.server.replication.server.changelog.api.ChangelogDB;
076import org.opends.server.replication.server.changelog.api.ChangelogException;
077import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate;
078import org.opends.server.replication.server.changelog.file.FileChangelogDB;
079import org.opends.server.replication.service.DSRSShutdownSync;
080import org.opends.server.types.DirectoryException;
081import org.opends.server.types.HostPort;
082import org.opends.server.types.SearchFilter;
083import org.opends.server.types.VirtualAttributeRule;
084
085import net.jcip.annotations.GuardedBy;
086
087/**
088 * ReplicationServer Listener. This singleton is the main object of the
089 * replication server. It waits for the incoming connections and create listener
090 * and publisher objects for connection with LDAP servers and with replication
091 * servers It is responsible for creating the replication server
092 * replicationServerDomain and managing it
093 */
094public class ReplicationServer
095  implements ConfigurationChangeListener<ReplicationServerCfg>, DiskSpaceMonitorHandler
096{
097  private String serverURL;
098
099  private ServerSocket listenSocket;
100  @GuardedBy("threadsLock")
101  private Thread listenThread;
102  @GuardedBy("threadsLock")
103  private Thread connectThread;
104  private final Object threadsLock = new Object();
105
106  /** The current configuration of this replication server. */
107  private ReplicationServerCfg config;
108  private final DSRSShutdownSync dsrsShutdownSync;
109
110  /** This table is used to store the list of dn for which we are currently handling servers. */
111  private final Map<DN, ReplicationServerDomain> baseDNs = new HashMap<>();
112
113  /** The database storing the changes. */
114  private final ChangelogDB changelogDB;
115
116  /** The backend that allow to search the changes (external changelog). */
117  private ChangelogBackend changelogBackend;
118
119  private final AtomicBoolean shutdown = new AtomicBoolean();
120  private volatile boolean stopListen;
121  private final ReplSessionSecurity replSessionSecurity;
122
123  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
124
125  /** To know whether a domain is enabled for the external changelog. */
126  private final ECLEnabledDomainPredicate domainPredicate;
127
128  /**
129   * This is required for unit testing, so that we can keep track of all the
130   * replication servers which are running in the VM.
131   */
132  private static final Set<Integer> localPorts = new CopyOnWriteArraySet<>();
133
134  /** Monitors for synchronizing domain creation with the connect thread. */
135  private final Object domainTicketLock = new Object();
136  private final Object connectThreadLock = new Object();
137  private long domainTicket;
138
139  /**
140   * Holds the list of all replication servers instantiated in this VM.
141   * This allows to perform clean up of the RS databases in unit tests.
142   */
143  private static final List<ReplicationServer> allInstances = new CopyOnWriteArrayList<>();
144
145  private final CryptoSuite cryptoSuite;
146
147  private final DiskSpaceMonitor diskMonitor;
148
149  /**
150   * Creates a new Replication server using the provided configuration entry.
151   *
152   * @param cfg The configuration of this replication server.
153   * @throws ConfigException When Configuration is invalid.
154   */
155  public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
156  {
157    this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
158  }
159
160  /**
161   * Creates a new Replication server using the provided configuration entry and shutdown
162   * synchronization object.
163   *
164   * @param cfg The configuration of this replication server.
165   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
166   * @throws ConfigException When Configuration is invalid.
167   */
168  public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException
169  {
170    this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate());
171  }
172
173  /**
174   * Creates a new Replication server using the provided configuration entry, shutdown
175   * synchronization object and domain predicate.
176   *
177   * @param cfg The configuration of this replication server.
178   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
179   * @param predicate Indicates whether a domain is enabled for the external changelog.
180   * @throws ConfigException When Configuration is invalid.
181   */
182  public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync,
183      final ECLEnabledDomainPredicate predicate) throws ConfigException
184  {
185    this.config = cfg;
186    this.dsrsShutdownSync = dsrsShutdownSync;
187    this.domainPredicate = predicate;
188
189    enableExternalChangeLog();
190    cryptoSuite = DirectoryServer.getInstance().getServerContext().getCryptoManager().
191        newCryptoSuite(cfg.getCipherTransformation(), cfg.getCipherKeyLength(), cfg.isConfidentialityEnabled());
192
193    this.changelogDB = new FileChangelogDB(this, config.getReplicationDBDirectory(), cryptoSuite);
194
195    diskMonitor = DirectoryServer.getInstance().getServerContext().getDiskSpaceMonitor();
196    registerDiskMonitor(cfg);
197
198    replSessionSecurity = new ReplSessionSecurity();
199    initialize();
200    cfg.addChangeListener(this);
201
202    localPorts.add(getReplicationPort());
203
204    // Keep track of this new instance
205    allInstances.add(this);
206  }
207
208  private void registerDiskMonitor(final ReplicationServerCfg cfg)
209  {
210    diskMonitor.registerMonitoredDirectory("Replication Changelog disk space monitor",
211                                           getFileForPath(cfg.getReplicationDBDirectory()),
212                                           cfg.getDiskLowThreshold(),
213                                           cfg.getDiskFullThreshold(),
214                                           this);
215  }
216
217  private void deregisterDiskMonitor(final ReplicationServerCfg cfg)
218  {
219    diskMonitor.deregisterMonitoredDirectory(getFileForPath(cfg.getReplicationDBDirectory()), this);
220  }
221
222  private Set<HostPort> getConfiguredRSAddresses()
223  {
224    final Set<HostPort> results = new HashSet<>();
225    for (String serverAddress : this.config.getReplicationServer())
226    {
227      results.add(HostPort.valueOf(serverAddress));
228    }
229    return results;
230  }
231
232  /**
233   * Get the list of every replication servers instantiated in the current VM.
234   * @return The list of every replication servers instantiated in the current
235   * VM.
236   */
237  public static List<ReplicationServer> getAllInstances()
238  {
239    return allInstances;
240  }
241
242  /**
243   * The run method for the Listen thread.
244   * This thread accept incoming connections on the replication server
245   * ports from other replication servers or from LDAP servers
246   * and spawn further thread responsible for handling those connections
247   */
248  void runListen()
249  {
250    logger.info(NOTE_REPLICATION_SERVER_LISTENING,
251        getServerId(),
252        listenSocket.getInetAddress().getHostAddress(),
253        listenSocket.getLocalPort());
254
255    while (!shutdown.get() && !stopListen)
256    {
257      // Wait on the replicationServer port.
258      // Read incoming messages and create LDAP or ReplicationServer listener
259      // and Publisher.
260      try
261      {
262        Session session;
263        Socket newSocket = null;
264        try
265        {
266          newSocket = listenSocket.accept();
267          newSocket.setTcpNoDelay(true);
268          newSocket.setKeepAlive(true);
269          int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
270          session = replSessionSecurity.createServerSession(newSocket, timeoutMS);
271          if (session == null) // Error, go back to accept
272          {
273            continue;
274          }
275        }
276        catch (Exception e)
277        {
278          // If problems happen during the SSL handshake, it is necessary
279          // to close the socket to free the associated resources.
280          if (newSocket != null)
281          {
282            newSocket.close();
283          }
284          continue;
285        }
286
287        ReplicationMsg msg = session.receive();
288
289        final int queueSize = this.config.getQueueSize();
290        final int rcvWindow = this.config.getWindowSize();
291        if (msg instanceof ServerStartMsg)
292        {
293          DataServerHandler dsHandler = new DataServerHandler(
294              session, queueSize, this, rcvWindow);
295          dsHandler.startFromRemoteDS((ServerStartMsg) msg);
296        }
297        else if (msg instanceof ReplServerStartMsg)
298        {
299          ReplicationServerHandler rsHandler = new ReplicationServerHandler(
300              session, queueSize, this, rcvWindow);
301          rsHandler.startFromRemoteRS((ReplServerStartMsg) msg);
302        }
303        else
304        {
305          // We did not recognize the message, close session as what
306          // can happen after is undetermined and we do not want the server to
307          // be disturbed
308          logger.error(ERR_REPLICATION_UNEXPECTED_MESSAGE,
309                  session.getRemoteAddress().toString(),
310                  (msg == null) ? "(null)" : msg.getClass().getSimpleName());
311          session.close();
312        }
313      }
314      catch (Exception e)
315      {
316        // The socket has probably been closed as part of the
317        // shutdown or changing the port number process.
318        // Just log debug information and loop.
319        // Do not log the message during shutdown.
320        logger.traceException(e);
321        if (!shutdown.get())
322        {
323          logger.error(ERR_EXCEPTION_LISTENING, e.getLocalizedMessage());
324        }
325      }
326    }
327  }
328
329  /**
330   * This method manages the connection with the other replication servers.
331   * It periodically checks that this replication server is indeed connected
332   * to all the other replication servers and if not attempts to
333   * make the connection.
334   */
335  void runConnect()
336  {
337    synchronized (connectThreadLock)
338    {
339      final UnreachableReplicationServers unreachableRSes = getUnreachableReplicationServers();
340      while (!shutdown.get())
341      {
342        HostPort localAddress = HostPort.localAddress(getReplicationPort());
343        for (ReplicationServerDomain domain : getReplicationServerDomains())
344        {
345          /*
346           * If there are N RSs configured then we will usually be connected to
347           * N-1 of them, since one of them is usually this RS. However, we
348           * cannot guarantee this since the configuration may not contain this
349           * RS.
350           */
351          final Set<HostPort> connectedRSAddresses = getConnectedRSAddresses(domain);
352          for (HostPort rsAddress : getConfiguredRSAddresses())
353          {
354            if (connectedRSAddresses.contains(rsAddress)
355                || unreachableRSes.isUnreachable(rsAddress)
356                // FIXME: this will need changing if we ever support listening on specific addresses.
357                || rsAddress.equals(localAddress))
358            {
359              continue;
360            }
361
362            try
363            {
364              connect(rsAddress, domain.getBaseDN());
365            }
366            catch (SocketTimeoutException ignored)
367            {
368              unreachableRSes.addServer(rsAddress);
369            }
370          }
371        }
372
373        // Notify any threads waiting with domain tickets after each iteration.
374        synchronized (domainTicketLock)
375        {
376          domainTicket++;
377          domainTicketLock.notifyAll();
378        }
379
380        // Retry each second.
381        final int randomizer = (int) (Math.random() * 100);
382        try
383        {
384          // Releases lock, allows threads to get domain ticket.
385          connectThreadLock.wait(1000 + randomizer);
386        }
387        catch (InterruptedException e)
388        {
389          // Signaled to shutdown.
390          return;
391        }
392      }
393    }
394  }
395
396  private Set<HostPort> getConnectedRSAddresses(ReplicationServerDomain domain)
397  {
398    Set<HostPort> results = new HashSet<>();
399    for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values())
400    {
401      results.add(HostPort.valueOf(rsHandler.getServerAddressURL()));
402    }
403    return results;
404  }
405
406  /**
407   * Establish a connection to the server with the address and port.
408   *
409   * @param remoteServerAddress
410   *          The address and port for the server
411   * @param baseDN
412   *          The baseDN of the connection
413   */
414  private void connect(HostPort remoteServerAddress, DN baseDN) throws SocketTimeoutException
415  {
416    boolean sslEncryption = replSessionSecurity.isSslEncryption();
417
418    logger.trace("RS " + getMonitorInstanceName() + " connects to " + remoteServerAddress);
419
420    Socket socket = new Socket();
421    Session session = null;
422    try
423    {
424      socket.setTcpNoDelay(true);
425      if (config.getSourceAddress() != null)
426      {
427        InetSocketAddress local = new InetSocketAddress(config.getSourceAddress(), 0);
428        socket.bind(local);
429      }
430      int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
431      socket.connect(remoteServerAddress.toInetSocketAddress(), timeoutMS);
432      session = replSessionSecurity.createClientSession(socket, timeoutMS);
433
434      ReplicationServerHandler rsHandler = new ReplicationServerHandler(
435          session, config.getQueueSize(), this, config.getWindowSize());
436      rsHandler.connect(baseDN, sslEncryption);
437    }
438    catch (SocketTimeoutException te)
439    {
440      logger.traceException(te);
441      close(socket);
442      throw te;
443    }
444    catch (Exception e)
445    {
446      logger.traceException(e);
447      close(session);
448      close(socket);
449    }
450  }
451
452  /** Initialization function for the replicationServer. */
453  private void initialize()
454  {
455    shutdown.set(false);
456
457    try
458    {
459      this.changelogDB.initializeDB();
460
461      setServerURL();
462
463      startConnectionThreads();
464
465      if (logger.isTraceEnabled())
466      {
467        logger.trace("RS " + getMonitorInstanceName() + " successfully initialized");
468      }
469    } catch (UnknownHostException e)
470    {
471      logger.error(ERR_UNKNOWN_HOSTNAME);
472    } catch (IOException e)
473    {
474      logger.error(ERR_COULD_NOT_BIND_CHANGELOG, getReplicationPort(), e.getMessage());
475    }
476  }
477
478  private void startConnectionThreads() throws IOException
479  {
480    shutdown.set(false);
481    synchronized (threadsLock)
482    {
483      listenSocket = new ServerSocket();
484      listenSocket.bind(new InetSocketAddress(getReplicationPort()));
485
486      // creates working threads: we must first connect, then start to listen.
487      if (logger.isTraceEnabled())
488      {
489        logger.trace("RS " + getMonitorInstanceName() + " creates connect thread");
490      }
491      connectThread = new ReplicationServerConnectThread(this);
492      connectThread.start();
493
494      if (logger.isTraceEnabled())
495      {
496        logger.trace("RS " + getMonitorInstanceName() + " creates listen thread");
497      }
498
499      listenThread = new ReplicationServerListenThread(this);
500      listenThread.start();
501    }
502  }
503  /**
504   * Enable the external changelog if it is not already enabled.
505   * <p>
506   * The external changelog is provided by the changelog backend.
507   *
508   * @throws ConfigException
509   *            If an error occurs.
510   */
511  private void enableExternalChangeLog() throws ConfigException
512  {
513    if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID))
514    {
515      // Backend has already been created and initialized
516      // This can occurs in tests
517      return;
518    }
519    try
520    {
521      changelogBackend = new ChangelogBackend(this, domainPredicate);
522      changelogBackend.openBackend();
523      try
524      {
525        DirectoryServer.registerBackend(changelogBackend);
526      }
527      catch (Exception e)
528      {
529        logger.error(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(),
530            getExceptionMessage(e)));
531      }
532
533      registerVirtualAttributeRules();
534    }
535    catch (Exception e)
536    {
537      // TODO : I18N with correct message + what kind of exception should we really throw ?
538      // (Directory/Initialization/Config Exception)
539      throw new ConfigException(LocalizableMessage.raw("Error when enabling external changelog"), e);
540    }
541  }
542
543  private void shutdownExternalChangelog()
544  {
545    if (changelogBackend != null)
546    {
547      DirectoryServer.deregisterBackend(changelogBackend);
548      changelogBackend.finalizeBackend();
549      changelogBackend = null;
550    }
551    deregisterVirtualAttributeRules();
552  }
553
554  private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException
555  {
556    final List<VirtualAttributeRule> rules = new ArrayList<>();
557    rules.add(buildVirtualAttributeRule("lastexternalchangelogcookie", new LastCookieVirtualProvider(this)));
558    rules.add(buildVirtualAttributeRule("firstchangenumber", new FirstChangeNumberVirtualAttributeProvider(this)));
559    rules.add(buildVirtualAttributeRule("lastchangenumber", new LastChangeNumberVirtualAttributeProvider(this)));
560    rules.add(buildVirtualAttributeRule("changelog", new ChangelogBaseDNVirtualAttributeProvider()));
561    return rules;
562  }
563
564  private void registerVirtualAttributeRules() throws DirectoryException {
565    for (VirtualAttributeRule rule : getVirtualAttributesRules())
566    {
567      DirectoryServer.registerVirtualAttribute(rule);
568    }
569  }
570
571  private void deregisterVirtualAttributeRules()
572  {
573    try
574    {
575      for (VirtualAttributeRule rule : getVirtualAttributesRules())
576      {
577        DirectoryServer.deregisterVirtualAttribute(rule);
578      }
579    }
580    catch (DirectoryException e)
581    {
582      // Should never happen
583      throw new RuntimeException(e);
584    }
585  }
586
587  private static VirtualAttributeRule buildVirtualAttributeRule(String attrName,
588      VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> provider)
589      throws DirectoryException
590  {
591    ConflictBehavior conflictBehavior = ConflictBehavior.VIRTUAL_OVERRIDES_REAL;
592
593    try
594    {
595      Set<DN> baseDNs = Collections.singleton(DN.valueOf(""));
596      Set<DN> groupDNs = Collections.emptySet();
597      Set<SearchFilter> filters = Collections.singleton(SearchFilter.objectClassPresent());
598
599      // To avoid the configuration in cn=config just
600      // create a rule and register it into the DirectoryServer
601      provider.initializeVirtualAttributeProvider(null);
602
603      AttributeType attributeType = DirectoryServer.getSchema().getAttributeType(attrName);
604      return new VirtualAttributeRule(attributeType, provider,
605            baseDNs, SearchScope.BASE_OBJECT,
606            groupDNs, filters, conflictBehavior);
607    }
608    catch (Exception e)
609    {
610      LocalizableMessage message =
611        NOTE_ERR_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR.get(attrName, e);
612      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, message, e);
613    }
614  }
615
616  /**
617   * Get the ReplicationServerDomain associated to the base DN given in
618   * parameter.
619   *
620   * @param baseDN
621   *          The base DN for which the ReplicationServerDomain must be
622   *          returned.
623   * @return The ReplicationServerDomain associated to the base DN given in
624   *         parameter.
625   */
626  public ReplicationServerDomain getReplicationServerDomain(DN baseDN)
627  {
628    return getReplicationServerDomain(baseDN, false);
629  }
630
631  /** Returns the replicated domain DNs minus the provided set of excluded DNs. */
632  private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException
633  {
634    Set<DN> domains = null;
635    synchronized (baseDNs)
636    {
637      domains = new HashSet<>(baseDNs.keySet());
638    }
639    domains.removeAll(excludedBaseDNs);
640    return domains;
641  }
642
643  /**
644   * Validate that provided cookie is coherent with this replication server,
645   * when ignoring the provided set of DNs.
646   * <p>
647   * The cookie is coherent if and only if it exactly has the set of DNs corresponding to
648   * the replication domains, and the states in the cookie are not older than oldest states
649   * in the server.
650   *
651   * @param cookie
652   *            The multi domain state (cookie) to validate.
653   * @param ignoredBaseDNs
654   *            The set of DNs to ignore when validating
655   * @throws DirectoryException
656   *            If the cookie is not valid
657   */
658  public void validateCookie(MultiDomainServerState cookie, Set<DN> ignoredBaseDNs) throws DirectoryException
659  {
660    final Set<DN> activeDomains = getDNsOfActiveDomainsInServer(ignoredBaseDNs);
661    final Set<DN> cookieDomains = getDNsOfCookie(cookie);
662
663    checkNoUnknownDomainIsProvidedInCookie(cookie, activeDomains, cookieDomains);
664    checkCookieIsNotOutdated(cookie, activeDomains);
665  }
666
667  private Set<DN> getDNsOfCookie(MultiDomainServerState cookie)
668  {
669    final Set<DN> cookieDomains = new HashSet<>();
670    for (final DN dn : cookie)
671    {
672      cookieDomains.add(dn);
673    }
674    return cookieDomains;
675  }
676
677  private Set<DN> getDNsOfActiveDomainsInServer(final Set<DN> ignoredBaseDNs) throws DirectoryException
678  {
679    final Set<DN> activeDomains = new HashSet<>();
680    for (final DN dn : getDomainDNs(ignoredBaseDNs))
681    {
682      final ServerState lastServerState = getReplicationServerDomain(dn).getLatestServerState();
683      if (!lastServerState.isEmpty())
684      {
685         activeDomains.add(dn);
686      }
687    }
688    return activeDomains;
689  }
690
691  private void checkNoUnknownDomainIsProvidedInCookie(final MultiDomainServerState cookie, final Set<DN> activeDomains,
692      final Set<DN> cookieDomains) throws DirectoryException
693  {
694    if (!activeDomains.containsAll(cookieDomains))
695    {
696      final Set<DN> unknownCookieDomains = new HashSet<>(cookieDomains);
697      unknownCookieDomains.removeAll(activeDomains);
698      final StringBuilder currentStartingCookie = new StringBuilder();
699      for (DN domainDN : activeDomains) {
700        currentStartingCookie.append(domainDN).append(":").append(cookie.getServerState(domainDN)).append(";");
701      }
702      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
703          ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
704              unknownCookieDomains.toString(), currentStartingCookie));
705    }
706  }
707
708  private void checkCookieIsNotOutdated(final MultiDomainServerState cookie, final Set<DN> activeDomains)
709      throws DirectoryException
710  {
711    for (DN dn : activeDomains)
712    {
713      if (isCookieOutdatedForDomain(cookie, dn))
714      {
715        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
716            ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(dn.toString()));
717      }
718    }
719  }
720
721  /** Check that provided cookie is not outdated compared to the oldest state of a domain. */
722  private boolean isCookieOutdatedForDomain(MultiDomainServerState cookie, DN domainDN)
723  {
724    final ServerState providedState = cookie.getServerState(domainDN);
725    if (providedState == null)
726    {
727      // missing domains do not invalidate a cookie.
728      // results will include all the changes of the missing domains
729      return false;
730    }
731    final ServerState domainOldestState = getReplicationServerDomain(domainDN).getOldestState();
732    for (final CSN oldestCsn : domainOldestState)
733    {
734      final CSN providedCsn = providedState.getCSN(oldestCsn.getServerId());
735      if (providedCsn != null && providedCsn.isOlderThan(oldestCsn))
736      {
737        return true;
738      }
739    }
740    return false;
741  }
742
743  /**
744   * Get the ReplicationServerDomain associated to the base DN given in
745   * parameter.
746   *
747   * @param baseDN The base DN for which the ReplicationServerDomain must be
748   * returned.
749   * @param create Specifies whether to create the ReplicationServerDomain if
750   *        it does not already exist.
751   * @return The ReplicationServerDomain associated to the base DN given in
752   *         parameter.
753   */
754  public ReplicationServerDomain getReplicationServerDomain(DN baseDN,
755      boolean create)
756  {
757    synchronized (baseDNs)
758    {
759      ReplicationServerDomain domain = baseDNs.get(baseDN);
760      if (domain == null && create) {
761        domain = new ReplicationServerDomain(baseDN, this);
762        baseDNs.put(baseDN, domain);
763      }
764      return domain;
765    }
766  }
767
768  /** Waits for connections to this ReplicationServer. */
769  void waitConnections()
770  {
771    // Acquire a domain ticket and wait for a complete cycle of the connect
772    // thread.
773    final long myDomainTicket;
774    synchronized (connectThreadLock)
775    {
776      // Connect thread must be waiting.
777      synchronized (domainTicketLock)
778      {
779        // Determine the ticket which will be used in the next connect thread
780        // iteration.
781        myDomainTicket = domainTicket + 1;
782      }
783
784      // Wake up connect thread.
785      connectThreadLock.notify();
786    }
787
788    // Wait until the connect thread has processed next connect phase.
789    synchronized (domainTicketLock)
790    {
791      while (myDomainTicket > domainTicket && !shutdown.get())
792      {
793        try
794        {
795          // Wait with timeout so that we detect shutdown.
796          domainTicketLock.wait(500);
797        }
798        catch (InterruptedException e)
799        {
800          // Can't do anything with this.
801          Thread.currentThread().interrupt();
802        }
803      }
804    }
805  }
806
807  /** Shutdown the Replication Server service and all its connections. */
808  public void shutdown()
809  {
810    deregisterDiskMonitor(config);
811
812    shutdownDomainsAndConnectionThreads();
813    shutdownExternalChangelog();
814
815    try
816    {
817      this.changelogDB.shutdownDB();
818    }
819    catch (ChangelogException ignored)
820    {
821      logger.traceException(ignored);
822    }
823  }
824
825  private void shutdownDomainsAndConnectionThreads()
826  {
827    if (!shutdown.compareAndSet(false, true))
828    {
829      return;
830    }
831
832    localPorts.remove(getReplicationPort());
833    synchronized (threadsLock)
834    {
835      // shutdown the connect thread
836      if (connectThread != null)
837      {
838        connectThread.interrupt();
839      }
840
841      // shutdown the listener thread
842      close(listenSocket);
843      if (listenThread != null)
844      {
845        listenThread.interrupt();
846      }
847      listenThread = null;
848
849      // shutdown all the replication domains
850      for (ReplicationServerDomain domain : getReplicationServerDomains())
851      {
852        domain.shutdown();
853      }
854    }
855    // Remove this instance from the global instance list
856    allInstances.remove(this);
857  }
858
859  /**
860   * Retrieves the time after which changes must be deleted from the
861   * persistent storage (in milliseconds).
862   *
863   * @return  The time after which changes must be deleted from the
864   *          persistent storage (in milliseconds).
865   */
866  public long getPurgeDelay()
867  {
868    return this.config.getReplicationPurgeDelay() * 1000;
869  }
870
871  /**
872   * Check if the provided configuration is acceptable for add.
873   *
874   * @param configuration The configuration to check.
875   * @param unacceptableReasons When the configuration is not acceptable, this
876   *                            table is use to return the reasons why this
877   *                            configuration is not acceptable.
878   *
879   * @return true if the configuration is acceptable, false other wise.
880   */
881  public static boolean isConfigurationAcceptable(
882      ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
883  {
884    int port = configuration.getReplicationPort();
885
886    try
887    {
888      ServerSocket tmpSocket = new ServerSocket();
889      tmpSocket.bind(new InetSocketAddress(port));
890      tmpSocket.close();
891      return true;
892    }
893    catch (Exception e)
894    {
895      LocalizableMessage message = ERR_COULD_NOT_BIND_CHANGELOG.get(port, e.getMessage());
896      unacceptableReasons.add(message);
897      return false;
898    }
899  }
900
901  @Override
902  public ConfigChangeResult applyConfigurationChange(
903      ReplicationServerCfg configuration)
904  {
905    final ConfigChangeResult ccr = new ConfigChangeResult();
906
907    // Some of those properties change don't need specific code.
908    // They will be applied for next connections. Some others have immediate effect
909    final Set<HostPort> oldRSAddresses = getConfiguredRSAddresses();
910
911    final ReplicationServerCfg oldConfig = this.config;
912    this.config = configuration;
913
914    disconnectRemovedReplicationServers(oldRSAddresses);
915
916    final long newPurgeDelay = config.getReplicationPurgeDelay();
917    if (newPurgeDelay != oldConfig.getReplicationPurgeDelay())
918    {
919      this.changelogDB.setPurgeDelay(getPurgeDelay());
920    }
921    final boolean computeCN = config.isComputeChangeNumber();
922    if (computeCN != oldConfig.isComputeChangeNumber())
923    {
924      try
925      {
926        this.changelogDB.setComputeChangeNumber(computeCN);
927      }
928      catch (ChangelogException e)
929      {
930        logger.traceException(e);
931        ccr.setResultCode(ResultCode.OPERATIONS_ERROR);
932      }
933    }
934
935    cryptoSuite.newParameters(config.getCipherTransformation(), config.getCipherKeyLength(),
936        config.isConfidentialityEnabled());
937
938    // changing the listen port requires to stop the listen thread and restart it.
939    synchronized (threadsLock)
940    {
941      if (getReplicationPort() != oldConfig.getReplicationPort() && listenThread != null)
942      {
943        stopListen = true;
944        try
945        {
946          close(listenSocket);
947          listenThread.join();
948          stopListen = false;
949
950          setServerURL();
951          listenSocket = new ServerSocket();
952          listenSocket.bind(new InetSocketAddress(getReplicationPort()));
953
954          listenThread = new ReplicationServerListenThread(this);
955          listenThread.start();
956        }
957        catch (IOException e)
958        {
959          logger.traceException(e);
960          logger.error(ERR_COULD_NOT_CLOSE_THE_SOCKET, e);
961        }
962        catch (InterruptedException e)
963        {
964          logger.traceException(e);
965          logger.error(ERR_COULD_NOT_STOP_LISTEN_THREAD, e);
966        }
967      }
968    }
969
970    // Update period value for monitoring publishers
971    if (oldConfig.getMonitoringPeriod() != config.getMonitoringPeriod())
972    {
973      for (ReplicationServerDomain domain : getReplicationServerDomains())
974      {
975        domain.updateMonitoringPeriod(config.getMonitoringPeriod());
976      }
977    }
978
979    // Changed the group id ?
980    if (config.getGroupId() != oldConfig.getGroupId())
981    {
982      // Have a new group id: Disconnect every servers.
983      for (ReplicationServerDomain domain : getReplicationServerDomains())
984      {
985        domain.stopAllServers(true);
986      }
987    }
988
989    // Set a potential new weight
990    if (oldConfig.getWeight() != config.getWeight())
991    {
992      // Broadcast the new weight the the whole topology. This will make some
993      // DSs reconnect (if needed) to other RSs according to the new weight of
994      // this RS.
995      broadcastConfigChange();
996    }
997
998    final String newDir = config.getReplicationDBDirectory();
999    if (newDir != null && !newDir.equals(oldConfig.getReplicationDBDirectory()))
1000    {
1001      ccr.setAdminActionRequired(true);
1002    }
1003    else if (oldConfig.getDiskFullThreshold() != configuration.getDiskFullThreshold()
1004            || oldConfig.getDiskLowThreshold() != configuration.getDiskLowThreshold())
1005    {
1006      deregisterDiskMonitor(oldConfig);
1007      registerDiskMonitor(config);
1008    }
1009    return ccr;
1010  }
1011
1012  /**
1013   * Try and set a sensible URL for this replication server. Since we are
1014   * listening on all addresses there are a couple of potential candidates:
1015   * <ol>
1016   * <li>a matching server URL in the replication server's configuration,</li>
1017   * <li>hostname local address.</li>
1018   * </ol>
1019   */
1020  private void setServerURL() throws UnknownHostException
1021  {
1022    /*
1023     * First try the set of configured replication servers to see if one of them
1024     * is this replication server (this should always be the case).
1025     */
1026    for (HostPort rsAddress : getConfiguredRSAddresses())
1027    {
1028      /* No need validate the string format because the admin framework has already done it. */
1029      if (rsAddress.getPort() == getReplicationPort()
1030          && rsAddress.isLocalAddress())
1031      {
1032        serverURL = rsAddress.toString();
1033        return;
1034      }
1035    }
1036
1037    // Fall-back to the machine hostname.
1038    final String host = InetAddress.getLocalHost().getHostName();
1039    // Ensure correct formatting of IPv6 addresses by using a HostPort instance.
1040    serverURL = new HostPort(host, getReplicationPort()).toString();
1041  }
1042
1043  /**
1044   * Broadcast a configuration change that just happened to the whole topology
1045   * by sending a TopologyMsg to every entity in the topology.
1046   */
1047  private void broadcastConfigChange()
1048  {
1049    for (ReplicationServerDomain domain : getReplicationServerDomains())
1050    {
1051      domain.sendTopoInfoToAll();
1052    }
1053  }
1054
1055  @Override
1056  public boolean isConfigurationChangeAcceptable(
1057      ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
1058  {
1059    return true;
1060  }
1061
1062  /**
1063   * Get the value of generationId for the replication replicationServerDomain
1064   * associated with the provided baseDN.
1065   *
1066   * @param baseDN The baseDN of the replicationServerDomain.
1067   * @return The value of the generationID.
1068   */
1069  public long getGenerationId(DN baseDN)
1070  {
1071    final ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
1072    return rsd != null ? rsd.getGenerationId() : -1;
1073  }
1074
1075  /**
1076   * Get the serverId for this replication server.
1077   *
1078   * @return The value of the serverId.
1079   */
1080  public int getServerId()
1081  {
1082    return this.config.getReplicationServerId();
1083  }
1084
1085  /**
1086   * Do what needed when the config object related to this replication server
1087   * is deleted from the server configuration.
1088   */
1089  public void remove()
1090  {
1091    if (logger.isTraceEnabled())
1092    {
1093      logger.trace("RS " + getMonitorInstanceName() + " starts removing");
1094    }
1095    shutdown();
1096  }
1097
1098  /**
1099   * Returns an iterator on the list of replicationServerDomain.
1100   * Returns null if none.
1101   * @return the iterator.
1102   */
1103  public Iterator<ReplicationServerDomain> getDomainIterator()
1104  {
1105    return getReplicationServerDomains().iterator();
1106  }
1107
1108  /**
1109   * Get the assured mode timeout.
1110   * <p>
1111   * It is the Timeout (in milliseconds) when waiting for acknowledgments.
1112   *
1113   * @return The assured mode timeout.
1114   */
1115  public long getAssuredTimeout()
1116  {
1117    return this.config.getAssuredTimeout();
1118  }
1119
1120  /**
1121   * Get The replication server group id.
1122   * @return The replication server group id.
1123   */
1124  public byte getGroupId()
1125  {
1126    return (byte) this.config.getGroupId();
1127  }
1128
1129  /**
1130   * Get the degraded status threshold value for status analyzer.
1131   * <p>
1132   * The degraded status threshold is the number of pending changes for a DS,
1133   * considered as threshold value to put the DS in DEGRADED_STATUS. If value is
1134   * 0, status analyzer is disabled.
1135   *
1136   * @return The degraded status threshold value for status analyzer.
1137   */
1138  public int getDegradedStatusThreshold()
1139  {
1140    return this.config.getDegradedStatusThreshold();
1141  }
1142
1143  /**
1144   * Get the monitoring publisher period value.
1145   * <p>
1146   * It is the number of milliseconds to wait before sending new monitoring
1147   * messages. If value is 0, monitoring publisher is disabled.
1148   *
1149   * @return the monitoring publisher period value.
1150   */
1151  public long getMonitoringPublisherPeriod()
1152  {
1153    return this.config.getMonitoringPeriod();
1154  }
1155
1156  /**
1157   * Compute the list of replication servers that are not any more connected to
1158   * this Replication Server and stop the corresponding handlers.
1159   *
1160   * @param oldRSAddresses
1161   *          the old list of configured replication servers addresses.
1162   */
1163  private void disconnectRemovedReplicationServers(Set<HostPort> oldRSAddresses)
1164  {
1165    final Collection<HostPort> serversToDisconnect = new ArrayList<>();
1166
1167    final Set<HostPort> newRSAddresses = getConfiguredRSAddresses();
1168    for (HostPort oldRSAddress : oldRSAddresses)
1169    {
1170      if (!newRSAddresses.contains(oldRSAddress))
1171      {
1172        serversToDisconnect.add(oldRSAddress);
1173      }
1174    }
1175
1176    if (serversToDisconnect.isEmpty())
1177    {
1178      return;
1179    }
1180
1181    for (ReplicationServerDomain domain: getReplicationServerDomains())
1182    {
1183      domain.stopReplicationServers(serversToDisconnect);
1184    }
1185  }
1186
1187  /**
1188   * Retrieves a printable name for this Replication Server Instance.
1189   *
1190   * @return A printable name for this Replication Server Instance.
1191   */
1192  public String getMonitorInstanceName()
1193  {
1194    return "Replication Server " + getReplicationPort() + " " + getServerId();
1195  }
1196
1197  /**
1198   * Retrieves the port used by this ReplicationServer.
1199   *
1200   * @return The port used by this ReplicationServer.
1201   */
1202  public int getReplicationPort()
1203  {
1204    return config.getReplicationPort();
1205  }
1206
1207  /**
1208   * Getter on the server URL.
1209   * @return the server URL.
1210   */
1211  public String getServerURL()
1212  {
1213    return this.serverURL;
1214  }
1215
1216  /**
1217   * WARNING : only use this methods for tests purpose.
1218   *
1219   * Add the Replication Server given as a parameter in the list
1220   * of local replication servers.
1221   *
1222   * @param server The server to be added.
1223   */
1224  public static void onlyForTestsAddlocalReplicationServer(String server)
1225  {
1226    localPorts.add(HostPort.valueOf(server).getPort());
1227  }
1228
1229  /**
1230   * WARNING : only use this methods for tests purpose.
1231   *
1232   * Clear the list of local Replication Servers
1233   */
1234  public static void onlyForTestsClearLocalReplicationServerList()
1235  {
1236    localPorts.clear();
1237  }
1238
1239  /**
1240   * Returns {@code true} if the provided port is one of the ports that this
1241   * replication server is listening on.
1242   *
1243   * @param port
1244   *          The port to be checked.
1245   * @return {@code true} if the provided port is one of the ports that this
1246   *         replication server is listening on.
1247   */
1248  public static boolean isLocalReplicationServerPort(int port)
1249  {
1250    return localPorts.contains(port);
1251  }
1252
1253  /**
1254   * Get (or create) a handler on the {@link ChangeNumberIndexDB} for external
1255   * changelog.
1256   *
1257   * @return the handler.
1258   */
1259  ChangeNumberIndexDB getChangeNumberIndexDB()
1260  {
1261    return this.changelogDB.getChangeNumberIndexDB();
1262  }
1263
1264  /**
1265   * Returns the oldest change number in the change number index DB.
1266   *
1267   * @return the oldest change number in the change number index DB
1268   * @throws DirectoryException
1269   *           When a problem happens
1270   */
1271  public long getOldestChangeNumber() throws DirectoryException
1272  {
1273    try
1274    {
1275      final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
1276      final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
1277      if (oldestRecord != null)
1278      {
1279        return oldestRecord.getChangeNumber();
1280      }
1281      // database is empty
1282      return cnIndexDB.getLastGeneratedChangeNumber();
1283    }
1284    catch (ChangelogException e)
1285    {
1286      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e);
1287    }
1288  }
1289
1290  /**
1291   * Returns the newest change number in the change number index DB.
1292   *
1293   * @return the newest change number in the change number index DB
1294   * @throws DirectoryException
1295   *           When a problem happens
1296   */
1297  public long getNewestChangeNumber() throws DirectoryException
1298  {
1299    try
1300    {
1301      final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
1302      final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord();
1303      if (newestRecord != null)
1304      {
1305        return newestRecord.getChangeNumber();
1306      }
1307      // database is empty
1308      return cnIndexDB.getLastGeneratedChangeNumber();
1309    }
1310    catch (ChangelogException e)
1311    {
1312      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e);
1313    }
1314  }
1315
1316  /**
1317   * Returns the newest cookie value.
1318   *
1319   * @param excludedBaseDNs
1320   *          The set of baseDNs excluded from ECL.
1321   * @return the newest cookie value.
1322   */
1323  public MultiDomainServerState getNewestECLCookie(Set<DN> excludedBaseDNs)
1324  {
1325    // Initialize start state for all running domains with empty state
1326    final MultiDomainServerState result = new MultiDomainServerState();
1327    for (ReplicationServerDomain rsDomain : getReplicationServerDomains())
1328    {
1329      if (!excludedBaseDNs.contains(rsDomain.getBaseDN()))
1330      {
1331        final ServerState latestDBServerState = rsDomain.getLatestServerState();
1332        if (!latestDBServerState.isEmpty())
1333        {
1334          result.replace(rsDomain.getBaseDN(), latestDBServerState);
1335        }
1336      }
1337    }
1338    return result;
1339  }
1340
1341  @Override
1342  public void diskLowThresholdReached(final File directory, final long thresholdInBytes)
1343  {
1344    logger.warn(WARN_DISK_LOW_CHANGELOG_DIRECTORY);
1345  }
1346
1347  @Override
1348  public void diskFullThresholdReached(final File directory, final long thresholdInBytes)
1349  {
1350    logger.error(ERR_DISK_FULL_CHANGELOG_DIRECTORY);
1351    shutdownDomainsAndConnectionThreads();
1352  }
1353
1354  @Override
1355  public void diskSpaceRestored(final File directory, final long lowThresholdInBytes,
1356                                final long fullThresholdInBytes)
1357  {
1358    try
1359    {
1360      startConnectionThreads();
1361      localPorts.add(getReplicationPort());
1362      allInstances.add(this);
1363      logger.warn(WARN_REPLICATION_SERVER_RESTARTED);
1364    }
1365    catch (IOException e)
1366    {
1367      logger.error(ERR_COULD_NOT_RESTART_CHANGELOG, getReplicationPort(), e.getMessage());
1368    }
1369  }
1370
1371  /**
1372   * Gets the weight affected to the replication server.
1373   * <p>
1374   * Each replication server of the topology has a weight. When combined
1375   * together, the weights of the replication servers of a same group can be
1376   * translated to a percentage that determines the quantity of directory
1377   * servers of the topology that should be connected to a replication server.
1378   * <p>
1379   * For instance imagine a topology with 3 replication servers (with the same
1380   * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that
1381   * RS1 should have 25% of the directory servers connected in the topology, RS2
1382   * 25%, and RS3 50%. This may be useful if the replication servers of the
1383   * topology have a different power and one wants to spread the load between
1384   * the replication servers according to their power.
1385   *
1386   * @return the weight
1387   */
1388  public int getWeight()
1389  {
1390    return this.config.getWeight();
1391  }
1392
1393  private Collection<ReplicationServerDomain> getReplicationServerDomains()
1394  {
1395    synchronized (baseDNs)
1396    {
1397      return new ArrayList<>(baseDNs.values());
1398    }
1399  }
1400
1401  /**
1402   * Returns the changelogDB.
1403   *
1404   * @return the changelogDB.
1405   */
1406  public ChangelogDB getChangelogDB()
1407  {
1408    return this.changelogDB;
1409  }
1410
1411  /**
1412   * Returns the synchronization object for shutdown of combined DS/RS instances.
1413   *
1414   * @return the synchronization object for shutdown of combined DS/RS instances.
1415   */
1416  DSRSShutdownSync getDSRSShutdownSync()
1417  {
1418    return dsrsShutdownSync;
1419  }
1420
1421  /**
1422   * Returns whether change-log indexing is enabled for this RS.
1423   * @return true if change-log indexing is enabled for this RS.
1424   */
1425  public boolean isChangeNumberEnabled()
1426  {
1427    return config.isComputeChangeNumber();
1428  }
1429
1430  /**
1431   * Returns whether the external change-log contains data from at least a domain.
1432   * @return whether the external change-log contains data from at least a domain
1433   */
1434  public boolean isECLEnabled()
1435  {
1436    return MultimasterReplication.isECLEnabled();
1437  }
1438
1439  /**
1440   * Return whether change-log records should be encrypted.
1441   * @return trus if change-log records should be encrypted
1442   */
1443  public boolean isEncrypted()
1444  {
1445    return config.isConfidentialityEnabled();
1446  }
1447
1448  @Override
1449  public String toString()
1450  {
1451    return "RS(" + getServerId() + ") on " + serverURL + ", domains=" + baseDNs.keySet();
1452  }
1453}