001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2017 ForgeRock AS. 016 */ 017package org.opends.server.replication.server; 018 019import static org.opends.messages.ConfigMessages.*; 020import static org.opends.messages.ReplicationMessages.*; 021import static org.opends.server.replication.plugin.MultimasterReplication.getUnreachableReplicationServers; 022import static org.opends.server.util.StaticUtils.*; 023 024import java.io.File; 025import java.io.IOException; 026import java.net.InetAddress; 027import java.net.InetSocketAddress; 028import java.net.ServerSocket; 029import java.net.Socket; 030import java.net.SocketTimeoutException; 031import java.net.UnknownHostException; 032import java.util.ArrayList; 033import java.util.Collection; 034import java.util.Collections; 035import java.util.HashMap; 036import java.util.HashSet; 037import java.util.Iterator; 038import java.util.List; 039import java.util.Map; 040import java.util.Set; 041import java.util.concurrent.CopyOnWriteArrayList; 042import java.util.concurrent.CopyOnWriteArraySet; 043import java.util.concurrent.atomic.AtomicBoolean; 044 045import org.forgerock.i18n.LocalizableMessage; 046import org.forgerock.i18n.slf4j.LocalizedLogger; 047import 
org.forgerock.opendj.config.server.ConfigChangeResult; 048import org.forgerock.opendj.config.server.ConfigException; 049import org.forgerock.opendj.config.server.ConfigurationChangeListener; 050import org.forgerock.opendj.ldap.DN; 051import org.forgerock.opendj.ldap.ResultCode; 052import org.forgerock.opendj.ldap.SearchScope; 053import org.forgerock.opendj.ldap.schema.AttributeType; 054import org.forgerock.opendj.server.config.meta.VirtualAttributeCfgDefn.ConflictBehavior; 055import org.forgerock.opendj.server.config.server.ReplicationServerCfg; 056import org.forgerock.opendj.server.config.server.UserDefinedVirtualAttributeCfg; 057import org.opends.server.api.DiskSpaceMonitorHandler; 058import org.opends.server.api.VirtualAttributeProvider; 059import org.opends.server.backends.ChangelogBackend; 060import org.opends.server.core.DirectoryServer; 061import org.opends.server.crypto.CryptoSuite; 062import org.opends.server.extensions.DiskSpaceMonitor; 063import org.opends.server.replication.common.CSN; 064import org.opends.server.replication.common.MultiDomainServerState; 065import org.opends.server.replication.common.ServerState; 066import org.opends.server.replication.plugin.MultimasterReplication; 067import org.opends.server.replication.plugin.MultimasterReplication.UnreachableReplicationServers; 068import org.opends.server.replication.protocol.ReplServerStartMsg; 069import org.opends.server.replication.protocol.ReplSessionSecurity; 070import org.opends.server.replication.protocol.ReplicationMsg; 071import org.opends.server.replication.protocol.ServerStartMsg; 072import org.opends.server.replication.protocol.Session; 073import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; 074import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; 075import org.opends.server.replication.server.changelog.api.ChangelogDB; 076import org.opends.server.replication.server.changelog.api.ChangelogException; 077import 
org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate; 078import org.opends.server.replication.server.changelog.file.FileChangelogDB; 079import org.opends.server.replication.service.DSRSShutdownSync; 080import org.opends.server.types.DirectoryException; 081import org.opends.server.types.HostPort; 082import org.opends.server.types.SearchFilter; 083import org.opends.server.types.VirtualAttributeRule; 084 085import net.jcip.annotations.GuardedBy; 086 087/** 088 * ReplicationServer Listener. This singleton is the main object of the 089 * replication server. It waits for the incoming connections and create listener 090 * and publisher objects for connection with LDAP servers and with replication 091 * servers It is responsible for creating the replication server 092 * replicationServerDomain and managing it 093 */ 094public class ReplicationServer 095 implements ConfigurationChangeListener<ReplicationServerCfg>, DiskSpaceMonitorHandler 096{ 097 private String serverURL; 098 099 private ServerSocket listenSocket; 100 @GuardedBy("threadsLock") 101 private Thread listenThread; 102 @GuardedBy("threadsLock") 103 private Thread connectThread; 104 private final Object threadsLock = new Object(); 105 106 /** The current configuration of this replication server. */ 107 private ReplicationServerCfg config; 108 private final DSRSShutdownSync dsrsShutdownSync; 109 110 /** This table is used to store the list of dn for which we are currently handling servers. */ 111 private final Map<DN, ReplicationServerDomain> baseDNs = new HashMap<>(); 112 113 /** The database storing the changes. */ 114 private final ChangelogDB changelogDB; 115 116 /** The backend that allow to search the changes (external changelog). 
*/
  private ChangelogBackend changelogBackend;

  /**
   * Shutdown flag. Set to {@code true} (compare-and-set) when the domains and connection threads
   * are shut down, reset to {@code false} by the initialization code, and polled by the listen
   * loop, the connect loop and threads waiting for a connect cycle.
   */
  private final AtomicBoolean shutdown = new AtomicBoolean();
  /**
   * Raised by {@code applyConfigurationChange} while the listen port is being changed so that the
   * listen loop exits; lowered again once the listen thread has been joined.
   */
  private volatile boolean stopListen;
  /** Creates the server-side and client-side replication sessions on top of raw sockets. */
  private final ReplSessionSecurity replSessionSecurity;

  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /** To know whether a domain is enabled for the external changelog. */
  private final ECLEnabledDomainPredicate domainPredicate;

  /**
   * This is required for unit testing, so that we can keep track of all the
   * replication servers which are running in the VM.
   */
  private static final Set<Integer> localPorts = new CopyOnWriteArraySet<>();

  /**
   * Monitors for synchronizing domain creation with the connect thread.
   * <p>
   * {@code domainTicket} is incremented (under {@code domainTicketLock}) at the end of every
   * iteration of the connect loop; {@code connectThreadLock} is held by the connect loop and
   * released while it waits between two iterations.
   */
  private final Object domainTicketLock = new Object();
  private final Object connectThreadLock = new Object();
  private long domainTicket;

  /**
   * Holds the list of all replication servers instantiated in this VM.
   * This allows to perform clean up of the RS databases in unit tests.
   */
  private static final List<ReplicationServer> allInstances = new CopyOnWriteArrayList<>();

  /** Crypto suite built from the cipher settings of the configuration and handed to the changelog DB. */
  private final CryptoSuite cryptoSuite;

  /** Disk space monitor with which the changelog directory is registered. */
  private final DiskSpaceMonitor diskMonitor;

  /**
   * Creates a new Replication server using the provided configuration entry.
   * <p>
   * Delegates to the three-argument constructor with a fresh {@link DSRSShutdownSync} and a fresh
   * {@link ECLEnabledDomainPredicate}.
   *
   * @param cfg The configuration of this replication server.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
  {
    this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
  }

  /**
   * Creates a new Replication server using the provided configuration entry and shutdown
   * synchronization object.
   *
   * @param cfg The configuration of this replication server.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
166 * @throws ConfigException When Configuration is invalid. 167 */ 168 public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException 169 { 170 this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate()); 171 } 172 173 /** 174 * Creates a new Replication server using the provided configuration entry, shutdown 175 * synchronization object and domain predicate. 176 * 177 * @param cfg The configuration of this replication server. 178 * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. 179 * @param predicate Indicates whether a domain is enabled for the external changelog. 180 * @throws ConfigException When Configuration is invalid. 181 */ 182 public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync, 183 final ECLEnabledDomainPredicate predicate) throws ConfigException 184 { 185 this.config = cfg; 186 this.dsrsShutdownSync = dsrsShutdownSync; 187 this.domainPredicate = predicate; 188 189 enableExternalChangeLog(); 190 cryptoSuite = DirectoryServer.getInstance().getServerContext().getCryptoManager(). 
191 newCryptoSuite(cfg.getCipherTransformation(), cfg.getCipherKeyLength(), cfg.isConfidentialityEnabled()); 192 193 this.changelogDB = new FileChangelogDB(this, config.getReplicationDBDirectory(), cryptoSuite); 194 195 diskMonitor = DirectoryServer.getInstance().getServerContext().getDiskSpaceMonitor(); 196 registerDiskMonitor(cfg); 197 198 replSessionSecurity = new ReplSessionSecurity(); 199 initialize(); 200 cfg.addChangeListener(this); 201 202 localPorts.add(getReplicationPort()); 203 204 // Keep track of this new instance 205 allInstances.add(this); 206 } 207 208 private void registerDiskMonitor(final ReplicationServerCfg cfg) 209 { 210 diskMonitor.registerMonitoredDirectory("Replication Changelog disk space monitor", 211 getFileForPath(cfg.getReplicationDBDirectory()), 212 cfg.getDiskLowThreshold(), 213 cfg.getDiskFullThreshold(), 214 this); 215 } 216 217 private void deregisterDiskMonitor(final ReplicationServerCfg cfg) 218 { 219 diskMonitor.deregisterMonitoredDirectory(getFileForPath(cfg.getReplicationDBDirectory()), this); 220 } 221 222 private Set<HostPort> getConfiguredRSAddresses() 223 { 224 final Set<HostPort> results = new HashSet<>(); 225 for (String serverAddress : this.config.getReplicationServer()) 226 { 227 results.add(HostPort.valueOf(serverAddress)); 228 } 229 return results; 230 } 231 232 /** 233 * Get the list of every replication servers instantiated in the current VM. 234 * @return The list of every replication servers instantiated in the current 235 * VM. 236 */ 237 public static List<ReplicationServer> getAllInstances() 238 { 239 return allInstances; 240 } 241 242 /** 243 * The run method for the Listen thread. 
244 * This thread accept incoming connections on the replication server 245 * ports from other replication servers or from LDAP servers 246 * and spawn further thread responsible for handling those connections 247 */ 248 void runListen() 249 { 250 logger.info(NOTE_REPLICATION_SERVER_LISTENING, 251 getServerId(), 252 listenSocket.getInetAddress().getHostAddress(), 253 listenSocket.getLocalPort()); 254 255 while (!shutdown.get() && !stopListen) 256 { 257 // Wait on the replicationServer port. 258 // Read incoming messages and create LDAP or ReplicationServer listener 259 // and Publisher. 260 try 261 { 262 Session session; 263 Socket newSocket = null; 264 try 265 { 266 newSocket = listenSocket.accept(); 267 newSocket.setTcpNoDelay(true); 268 newSocket.setKeepAlive(true); 269 int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); 270 session = replSessionSecurity.createServerSession(newSocket, timeoutMS); 271 if (session == null) // Error, go back to accept 272 { 273 continue; 274 } 275 } 276 catch (Exception e) 277 { 278 // If problems happen during the SSL handshake, it is necessary 279 // to close the socket to free the associated resources. 
280 if (newSocket != null) 281 { 282 newSocket.close(); 283 } 284 continue; 285 } 286 287 ReplicationMsg msg = session.receive(); 288 289 final int queueSize = this.config.getQueueSize(); 290 final int rcvWindow = this.config.getWindowSize(); 291 if (msg instanceof ServerStartMsg) 292 { 293 DataServerHandler dsHandler = new DataServerHandler( 294 session, queueSize, this, rcvWindow); 295 dsHandler.startFromRemoteDS((ServerStartMsg) msg); 296 } 297 else if (msg instanceof ReplServerStartMsg) 298 { 299 ReplicationServerHandler rsHandler = new ReplicationServerHandler( 300 session, queueSize, this, rcvWindow); 301 rsHandler.startFromRemoteRS((ReplServerStartMsg) msg); 302 } 303 else 304 { 305 // We did not recognize the message, close session as what 306 // can happen after is undetermined and we do not want the server to 307 // be disturbed 308 logger.error(ERR_REPLICATION_UNEXPECTED_MESSAGE, 309 session.getRemoteAddress().toString(), 310 (msg == null) ? "(null)" : msg.getClass().getSimpleName()); 311 session.close(); 312 } 313 } 314 catch (Exception e) 315 { 316 // The socket has probably been closed as part of the 317 // shutdown or changing the port number process. 318 // Just log debug information and loop. 319 // Do not log the message during shutdown. 320 logger.traceException(e); 321 if (!shutdown.get()) 322 { 323 logger.error(ERR_EXCEPTION_LISTENING, e.getLocalizedMessage()); 324 } 325 } 326 } 327 } 328 329 /** 330 * This method manages the connection with the other replication servers. 331 * It periodically checks that this replication server is indeed connected 332 * to all the other replication servers and if not attempts to 333 * make the connection. 
334 */ 335 void runConnect() 336 { 337 synchronized (connectThreadLock) 338 { 339 final UnreachableReplicationServers unreachableRSes = getUnreachableReplicationServers(); 340 while (!shutdown.get()) 341 { 342 HostPort localAddress = HostPort.localAddress(getReplicationPort()); 343 for (ReplicationServerDomain domain : getReplicationServerDomains()) 344 { 345 /* 346 * If there are N RSs configured then we will usually be connected to 347 * N-1 of them, since one of them is usually this RS. However, we 348 * cannot guarantee this since the configuration may not contain this 349 * RS. 350 */ 351 final Set<HostPort> connectedRSAddresses = getConnectedRSAddresses(domain); 352 for (HostPort rsAddress : getConfiguredRSAddresses()) 353 { 354 if (connectedRSAddresses.contains(rsAddress) 355 || unreachableRSes.isUnreachable(rsAddress) 356 // FIXME: this will need changing if we ever support listening on specific addresses. 357 || rsAddress.equals(localAddress)) 358 { 359 continue; 360 } 361 362 try 363 { 364 connect(rsAddress, domain.getBaseDN()); 365 } 366 catch (SocketTimeoutException ignored) 367 { 368 unreachableRSes.addServer(rsAddress); 369 } 370 } 371 } 372 373 // Notify any threads waiting with domain tickets after each iteration. 374 synchronized (domainTicketLock) 375 { 376 domainTicket++; 377 domainTicketLock.notifyAll(); 378 } 379 380 // Retry each second. 381 final int randomizer = (int) (Math.random() * 100); 382 try 383 { 384 // Releases lock, allows threads to get domain ticket. 385 connectThreadLock.wait(1000 + randomizer); 386 } 387 catch (InterruptedException e) 388 { 389 // Signaled to shutdown. 
390 return; 391 } 392 } 393 } 394 } 395 396 private Set<HostPort> getConnectedRSAddresses(ReplicationServerDomain domain) 397 { 398 Set<HostPort> results = new HashSet<>(); 399 for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values()) 400 { 401 results.add(HostPort.valueOf(rsHandler.getServerAddressURL())); 402 } 403 return results; 404 } 405 406 /** 407 * Establish a connection to the server with the address and port. 408 * 409 * @param remoteServerAddress 410 * The address and port for the server 411 * @param baseDN 412 * The baseDN of the connection 413 */ 414 private void connect(HostPort remoteServerAddress, DN baseDN) throws SocketTimeoutException 415 { 416 boolean sslEncryption = replSessionSecurity.isSslEncryption(); 417 418 logger.trace("RS " + getMonitorInstanceName() + " connects to " + remoteServerAddress); 419 420 Socket socket = new Socket(); 421 Session session = null; 422 try 423 { 424 socket.setTcpNoDelay(true); 425 if (config.getSourceAddress() != null) 426 { 427 InetSocketAddress local = new InetSocketAddress(config.getSourceAddress(), 0); 428 socket.bind(local); 429 } 430 int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); 431 socket.connect(remoteServerAddress.toInetSocketAddress(), timeoutMS); 432 session = replSessionSecurity.createClientSession(socket, timeoutMS); 433 434 ReplicationServerHandler rsHandler = new ReplicationServerHandler( 435 session, config.getQueueSize(), this, config.getWindowSize()); 436 rsHandler.connect(baseDN, sslEncryption); 437 } 438 catch (SocketTimeoutException te) 439 { 440 logger.traceException(te); 441 close(socket); 442 throw te; 443 } 444 catch (Exception e) 445 { 446 logger.traceException(e); 447 close(session); 448 close(socket); 449 } 450 } 451 452 /** Initialization function for the replicationServer. 
*/ 453 private void initialize() 454 { 455 shutdown.set(false); 456 457 try 458 { 459 this.changelogDB.initializeDB(); 460 461 setServerURL(); 462 463 startConnectionThreads(); 464 465 if (logger.isTraceEnabled()) 466 { 467 logger.trace("RS " + getMonitorInstanceName() + " successfully initialized"); 468 } 469 } catch (UnknownHostException e) 470 { 471 logger.error(ERR_UNKNOWN_HOSTNAME); 472 } catch (IOException e) 473 { 474 logger.error(ERR_COULD_NOT_BIND_CHANGELOG, getReplicationPort(), e.getMessage()); 475 } 476 } 477 478 private void startConnectionThreads() throws IOException 479 { 480 shutdown.set(false); 481 synchronized (threadsLock) 482 { 483 listenSocket = new ServerSocket(); 484 listenSocket.bind(new InetSocketAddress(getReplicationPort())); 485 486 // creates working threads: we must first connect, then start to listen. 487 if (logger.isTraceEnabled()) 488 { 489 logger.trace("RS " + getMonitorInstanceName() + " creates connect thread"); 490 } 491 connectThread = new ReplicationServerConnectThread(this); 492 connectThread.start(); 493 494 if (logger.isTraceEnabled()) 495 { 496 logger.trace("RS " + getMonitorInstanceName() + " creates listen thread"); 497 } 498 499 listenThread = new ReplicationServerListenThread(this); 500 listenThread.start(); 501 } 502 } 503 /** 504 * Enable the external changelog if it is not already enabled. 505 * <p> 506 * The external changelog is provided by the changelog backend. 507 * 508 * @throws ConfigException 509 * If an error occurs. 
510 */ 511 private void enableExternalChangeLog() throws ConfigException 512 { 513 if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID)) 514 { 515 // Backend has already been created and initialized 516 // This can occurs in tests 517 return; 518 } 519 try 520 { 521 changelogBackend = new ChangelogBackend(this, domainPredicate); 522 changelogBackend.openBackend(); 523 try 524 { 525 DirectoryServer.registerBackend(changelogBackend); 526 } 527 catch (Exception e) 528 { 529 logger.error(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(), 530 getExceptionMessage(e))); 531 } 532 533 registerVirtualAttributeRules(); 534 } 535 catch (Exception e) 536 { 537 // TODO : I18N with correct message + what kind of exception should we really throw ? 538 // (Directory/Initialization/Config Exception) 539 throw new ConfigException(LocalizableMessage.raw("Error when enabling external changelog"), e); 540 } 541 } 542 543 private void shutdownExternalChangelog() 544 { 545 if (changelogBackend != null) 546 { 547 DirectoryServer.deregisterBackend(changelogBackend); 548 changelogBackend.finalizeBackend(); 549 changelogBackend = null; 550 } 551 deregisterVirtualAttributeRules(); 552 } 553 554 private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException 555 { 556 final List<VirtualAttributeRule> rules = new ArrayList<>(); 557 rules.add(buildVirtualAttributeRule("lastexternalchangelogcookie", new LastCookieVirtualProvider(this))); 558 rules.add(buildVirtualAttributeRule("firstchangenumber", new FirstChangeNumberVirtualAttributeProvider(this))); 559 rules.add(buildVirtualAttributeRule("lastchangenumber", new LastChangeNumberVirtualAttributeProvider(this))); 560 rules.add(buildVirtualAttributeRule("changelog", new ChangelogBaseDNVirtualAttributeProvider())); 561 return rules; 562 } 563 564 private void registerVirtualAttributeRules() throws DirectoryException { 565 for (VirtualAttributeRule rule : getVirtualAttributesRules()) 566 
{ 567 DirectoryServer.registerVirtualAttribute(rule); 568 } 569 } 570 571 private void deregisterVirtualAttributeRules() 572 { 573 try 574 { 575 for (VirtualAttributeRule rule : getVirtualAttributesRules()) 576 { 577 DirectoryServer.deregisterVirtualAttribute(rule); 578 } 579 } 580 catch (DirectoryException e) 581 { 582 // Should never happen 583 throw new RuntimeException(e); 584 } 585 } 586 587 private static VirtualAttributeRule buildVirtualAttributeRule(String attrName, 588 VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> provider) 589 throws DirectoryException 590 { 591 ConflictBehavior conflictBehavior = ConflictBehavior.VIRTUAL_OVERRIDES_REAL; 592 593 try 594 { 595 Set<DN> baseDNs = Collections.singleton(DN.valueOf("")); 596 Set<DN> groupDNs = Collections.emptySet(); 597 Set<SearchFilter> filters = Collections.singleton(SearchFilter.objectClassPresent()); 598 599 // To avoid the configuration in cn=config just 600 // create a rule and register it into the DirectoryServer 601 provider.initializeVirtualAttributeProvider(null); 602 603 AttributeType attributeType = DirectoryServer.getSchema().getAttributeType(attrName); 604 return new VirtualAttributeRule(attributeType, provider, 605 baseDNs, SearchScope.BASE_OBJECT, 606 groupDNs, filters, conflictBehavior); 607 } 608 catch (Exception e) 609 { 610 LocalizableMessage message = 611 NOTE_ERR_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR.get(attrName, e); 612 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, message, e); 613 } 614 } 615 616 /** 617 * Get the ReplicationServerDomain associated to the base DN given in 618 * parameter. 619 * 620 * @param baseDN 621 * The base DN for which the ReplicationServerDomain must be 622 * returned. 623 * @return The ReplicationServerDomain associated to the base DN given in 624 * parameter. 
625 */ 626 public ReplicationServerDomain getReplicationServerDomain(DN baseDN) 627 { 628 return getReplicationServerDomain(baseDN, false); 629 } 630 631 /** Returns the replicated domain DNs minus the provided set of excluded DNs. */ 632 private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException 633 { 634 Set<DN> domains = null; 635 synchronized (baseDNs) 636 { 637 domains = new HashSet<>(baseDNs.keySet()); 638 } 639 domains.removeAll(excludedBaseDNs); 640 return domains; 641 } 642 643 /** 644 * Validate that provided cookie is coherent with this replication server, 645 * when ignoring the provided set of DNs. 646 * <p> 647 * The cookie is coherent if and only if it exactly has the set of DNs corresponding to 648 * the replication domains, and the states in the cookie are not older than oldest states 649 * in the server. 650 * 651 * @param cookie 652 * The multi domain state (cookie) to validate. 653 * @param ignoredBaseDNs 654 * The set of DNs to ignore when validating 655 * @throws DirectoryException 656 * If the cookie is not valid 657 */ 658 public void validateCookie(MultiDomainServerState cookie, Set<DN> ignoredBaseDNs) throws DirectoryException 659 { 660 final Set<DN> activeDomains = getDNsOfActiveDomainsInServer(ignoredBaseDNs); 661 final Set<DN> cookieDomains = getDNsOfCookie(cookie); 662 663 checkNoUnknownDomainIsProvidedInCookie(cookie, activeDomains, cookieDomains); 664 checkCookieIsNotOutdated(cookie, activeDomains); 665 } 666 667 private Set<DN> getDNsOfCookie(MultiDomainServerState cookie) 668 { 669 final Set<DN> cookieDomains = new HashSet<>(); 670 for (final DN dn : cookie) 671 { 672 cookieDomains.add(dn); 673 } 674 return cookieDomains; 675 } 676 677 private Set<DN> getDNsOfActiveDomainsInServer(final Set<DN> ignoredBaseDNs) throws DirectoryException 678 { 679 final Set<DN> activeDomains = new HashSet<>(); 680 for (final DN dn : getDomainDNs(ignoredBaseDNs)) 681 { 682 final ServerState lastServerState = 
getReplicationServerDomain(dn).getLatestServerState(); 683 if (!lastServerState.isEmpty()) 684 { 685 activeDomains.add(dn); 686 } 687 } 688 return activeDomains; 689 } 690 691 private void checkNoUnknownDomainIsProvidedInCookie(final MultiDomainServerState cookie, final Set<DN> activeDomains, 692 final Set<DN> cookieDomains) throws DirectoryException 693 { 694 if (!activeDomains.containsAll(cookieDomains)) 695 { 696 final Set<DN> unknownCookieDomains = new HashSet<>(cookieDomains); 697 unknownCookieDomains.removeAll(activeDomains); 698 final StringBuilder currentStartingCookie = new StringBuilder(); 699 for (DN domainDN : activeDomains) { 700 currentStartingCookie.append(domainDN).append(":").append(cookie.getServerState(domainDN)).append(";"); 701 } 702 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 703 ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get( 704 unknownCookieDomains.toString(), currentStartingCookie)); 705 } 706 } 707 708 private void checkCookieIsNotOutdated(final MultiDomainServerState cookie, final Set<DN> activeDomains) 709 throws DirectoryException 710 { 711 for (DN dn : activeDomains) 712 { 713 if (isCookieOutdatedForDomain(cookie, dn)) 714 { 715 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 716 ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(dn.toString())); 717 } 718 } 719 } 720 721 /** Check that provided cookie is not outdated compared to the oldest state of a domain. */ 722 private boolean isCookieOutdatedForDomain(MultiDomainServerState cookie, DN domainDN) 723 { 724 final ServerState providedState = cookie.getServerState(domainDN); 725 if (providedState == null) 726 { 727 // missing domains do not invalidate a cookie. 
728 // results will include all the changes of the missing domains 729 return false; 730 } 731 final ServerState domainOldestState = getReplicationServerDomain(domainDN).getOldestState(); 732 for (final CSN oldestCsn : domainOldestState) 733 { 734 final CSN providedCsn = providedState.getCSN(oldestCsn.getServerId()); 735 if (providedCsn != null && providedCsn.isOlderThan(oldestCsn)) 736 { 737 return true; 738 } 739 } 740 return false; 741 } 742 743 /** 744 * Get the ReplicationServerDomain associated to the base DN given in 745 * parameter. 746 * 747 * @param baseDN The base DN for which the ReplicationServerDomain must be 748 * returned. 749 * @param create Specifies whether to create the ReplicationServerDomain if 750 * it does not already exist. 751 * @return The ReplicationServerDomain associated to the base DN given in 752 * parameter. 753 */ 754 public ReplicationServerDomain getReplicationServerDomain(DN baseDN, 755 boolean create) 756 { 757 synchronized (baseDNs) 758 { 759 ReplicationServerDomain domain = baseDNs.get(baseDN); 760 if (domain == null && create) { 761 domain = new ReplicationServerDomain(baseDN, this); 762 baseDNs.put(baseDN, domain); 763 } 764 return domain; 765 } 766 } 767 768 /** Waits for connections to this ReplicationServer. */ 769 void waitConnections() 770 { 771 // Acquire a domain ticket and wait for a complete cycle of the connect 772 // thread. 773 final long myDomainTicket; 774 synchronized (connectThreadLock) 775 { 776 // Connect thread must be waiting. 777 synchronized (domainTicketLock) 778 { 779 // Determine the ticket which will be used in the next connect thread 780 // iteration. 781 myDomainTicket = domainTicket + 1; 782 } 783 784 // Wake up connect thread. 785 connectThreadLock.notify(); 786 } 787 788 // Wait until the connect thread has processed next connect phase. 
789 synchronized (domainTicketLock) 790 { 791 while (myDomainTicket > domainTicket && !shutdown.get()) 792 { 793 try 794 { 795 // Wait with timeout so that we detect shutdown. 796 domainTicketLock.wait(500); 797 } 798 catch (InterruptedException e) 799 { 800 // Can't do anything with this. 801 Thread.currentThread().interrupt(); 802 } 803 } 804 } 805 } 806 807 /** Shutdown the Replication Server service and all its connections. */ 808 public void shutdown() 809 { 810 deregisterDiskMonitor(config); 811 812 shutdownDomainsAndConnectionThreads(); 813 shutdownExternalChangelog(); 814 815 try 816 { 817 this.changelogDB.shutdownDB(); 818 } 819 catch (ChangelogException ignored) 820 { 821 logger.traceException(ignored); 822 } 823 } 824 825 private void shutdownDomainsAndConnectionThreads() 826 { 827 if (!shutdown.compareAndSet(false, true)) 828 { 829 return; 830 } 831 832 localPorts.remove(getReplicationPort()); 833 synchronized (threadsLock) 834 { 835 // shutdown the connect thread 836 if (connectThread != null) 837 { 838 connectThread.interrupt(); 839 } 840 841 // shutdown the listener thread 842 close(listenSocket); 843 if (listenThread != null) 844 { 845 listenThread.interrupt(); 846 } 847 listenThread = null; 848 849 // shutdown all the replication domains 850 for (ReplicationServerDomain domain : getReplicationServerDomains()) 851 { 852 domain.shutdown(); 853 } 854 } 855 // Remove this instance from the global instance list 856 allInstances.remove(this); 857 } 858 859 /** 860 * Retrieves the time after which changes must be deleted from the 861 * persistent storage (in milliseconds). 862 * 863 * @return The time after which changes must be deleted from the 864 * persistent storage (in milliseconds). 865 */ 866 public long getPurgeDelay() 867 { 868 return this.config.getReplicationPurgeDelay() * 1000; 869 } 870 871 /** 872 * Check if the provided configuration is acceptable for add. 873 * 874 * @param configuration The configuration to check. 
875 * @param unacceptableReasons When the configuration is not acceptable, this 876 * table is use to return the reasons why this 877 * configuration is not acceptable. 878 * 879 * @return true if the configuration is acceptable, false other wise. 880 */ 881 public static boolean isConfigurationAcceptable( 882 ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons) 883 { 884 int port = configuration.getReplicationPort(); 885 886 try 887 { 888 ServerSocket tmpSocket = new ServerSocket(); 889 tmpSocket.bind(new InetSocketAddress(port)); 890 tmpSocket.close(); 891 return true; 892 } 893 catch (Exception e) 894 { 895 LocalizableMessage message = ERR_COULD_NOT_BIND_CHANGELOG.get(port, e.getMessage()); 896 unacceptableReasons.add(message); 897 return false; 898 } 899 } 900 901 @Override 902 public ConfigChangeResult applyConfigurationChange( 903 ReplicationServerCfg configuration) 904 { 905 final ConfigChangeResult ccr = new ConfigChangeResult(); 906 907 // Some of those properties change don't need specific code. 908 // They will be applied for next connections. 
Some others have immediate effect 909 final Set<HostPort> oldRSAddresses = getConfiguredRSAddresses(); 910 911 final ReplicationServerCfg oldConfig = this.config; 912 this.config = configuration; 913 914 disconnectRemovedReplicationServers(oldRSAddresses); 915 916 final long newPurgeDelay = config.getReplicationPurgeDelay(); 917 if (newPurgeDelay != oldConfig.getReplicationPurgeDelay()) 918 { 919 this.changelogDB.setPurgeDelay(getPurgeDelay()); 920 } 921 final boolean computeCN = config.isComputeChangeNumber(); 922 if (computeCN != oldConfig.isComputeChangeNumber()) 923 { 924 try 925 { 926 this.changelogDB.setComputeChangeNumber(computeCN); 927 } 928 catch (ChangelogException e) 929 { 930 logger.traceException(e); 931 ccr.setResultCode(ResultCode.OPERATIONS_ERROR); 932 } 933 } 934 935 cryptoSuite.newParameters(config.getCipherTransformation(), config.getCipherKeyLength(), 936 config.isConfidentialityEnabled()); 937 938 // changing the listen port requires to stop the listen thread and restart it. 
939 synchronized (threadsLock) 940 { 941 if (getReplicationPort() != oldConfig.getReplicationPort() && listenThread != null) 942 { 943 stopListen = true; 944 try 945 { 946 close(listenSocket); 947 listenThread.join(); 948 stopListen = false; 949 950 setServerURL(); 951 listenSocket = new ServerSocket(); 952 listenSocket.bind(new InetSocketAddress(getReplicationPort())); 953 954 listenThread = new ReplicationServerListenThread(this); 955 listenThread.start(); 956 } 957 catch (IOException e) 958 { 959 logger.traceException(e); 960 logger.error(ERR_COULD_NOT_CLOSE_THE_SOCKET, e); 961 } 962 catch (InterruptedException e) 963 { 964 logger.traceException(e); 965 logger.error(ERR_COULD_NOT_STOP_LISTEN_THREAD, e); 966 } 967 } 968 } 969 970 // Update period value for monitoring publishers 971 if (oldConfig.getMonitoringPeriod() != config.getMonitoringPeriod()) 972 { 973 for (ReplicationServerDomain domain : getReplicationServerDomains()) 974 { 975 domain.updateMonitoringPeriod(config.getMonitoringPeriod()); 976 } 977 } 978 979 // Changed the group id ? 980 if (config.getGroupId() != oldConfig.getGroupId()) 981 { 982 // Have a new group id: Disconnect every servers. 983 for (ReplicationServerDomain domain : getReplicationServerDomains()) 984 { 985 domain.stopAllServers(true); 986 } 987 } 988 989 // Set a potential new weight 990 if (oldConfig.getWeight() != config.getWeight()) 991 { 992 // Broadcast the new weight the the whole topology. This will make some 993 // DSs reconnect (if needed) to other RSs according to the new weight of 994 // this RS. 
995 broadcastConfigChange(); 996 } 997 998 final String newDir = config.getReplicationDBDirectory(); 999 if (newDir != null && !newDir.equals(oldConfig.getReplicationDBDirectory())) 1000 { 1001 ccr.setAdminActionRequired(true); 1002 } 1003 else if (oldConfig.getDiskFullThreshold() != configuration.getDiskFullThreshold() 1004 || oldConfig.getDiskLowThreshold() != configuration.getDiskLowThreshold()) 1005 { 1006 deregisterDiskMonitor(oldConfig); 1007 registerDiskMonitor(config); 1008 } 1009 return ccr; 1010 } 1011 1012 /** 1013 * Try and set a sensible URL for this replication server. Since we are 1014 * listening on all addresses there are a couple of potential candidates: 1015 * <ol> 1016 * <li>a matching server URL in the replication server's configuration,</li> 1017 * <li>hostname local address.</li> 1018 * </ol> 1019 */ 1020 private void setServerURL() throws UnknownHostException 1021 { 1022 /* 1023 * First try the set of configured replication servers to see if one of them 1024 * is this replication server (this should always be the case). 1025 */ 1026 for (HostPort rsAddress : getConfiguredRSAddresses()) 1027 { 1028 /* No need validate the string format because the admin framework has already done it. */ 1029 if (rsAddress.getPort() == getReplicationPort() 1030 && rsAddress.isLocalAddress()) 1031 { 1032 serverURL = rsAddress.toString(); 1033 return; 1034 } 1035 } 1036 1037 // Fall-back to the machine hostname. 1038 final String host = InetAddress.getLocalHost().getHostName(); 1039 // Ensure correct formatting of IPv6 addresses by using a HostPort instance. 1040 serverURL = new HostPort(host, getReplicationPort()).toString(); 1041 } 1042 1043 /** 1044 * Broadcast a configuration change that just happened to the whole topology 1045 * by sending a TopologyMsg to every entity in the topology. 
1046 */ 1047 private void broadcastConfigChange() 1048 { 1049 for (ReplicationServerDomain domain : getReplicationServerDomains()) 1050 { 1051 domain.sendTopoInfoToAll(); 1052 } 1053 } 1054 1055 @Override 1056 public boolean isConfigurationChangeAcceptable( 1057 ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons) 1058 { 1059 return true; 1060 } 1061 1062 /** 1063 * Get the value of generationId for the replication replicationServerDomain 1064 * associated with the provided baseDN. 1065 * 1066 * @param baseDN The baseDN of the replicationServerDomain. 1067 * @return The value of the generationID. 1068 */ 1069 public long getGenerationId(DN baseDN) 1070 { 1071 final ReplicationServerDomain rsd = getReplicationServerDomain(baseDN); 1072 return rsd != null ? rsd.getGenerationId() : -1; 1073 } 1074 1075 /** 1076 * Get the serverId for this replication server. 1077 * 1078 * @return The value of the serverId. 1079 */ 1080 public int getServerId() 1081 { 1082 return this.config.getReplicationServerId(); 1083 } 1084 1085 /** 1086 * Do what needed when the config object related to this replication server 1087 * is deleted from the server configuration. 1088 */ 1089 public void remove() 1090 { 1091 if (logger.isTraceEnabled()) 1092 { 1093 logger.trace("RS " + getMonitorInstanceName() + " starts removing"); 1094 } 1095 shutdown(); 1096 } 1097 1098 /** 1099 * Returns an iterator on the list of replicationServerDomain. 1100 * Returns null if none. 1101 * @return the iterator. 1102 */ 1103 public Iterator<ReplicationServerDomain> getDomainIterator() 1104 { 1105 return getReplicationServerDomains().iterator(); 1106 } 1107 1108 /** 1109 * Get the assured mode timeout. 1110 * <p> 1111 * It is the Timeout (in milliseconds) when waiting for acknowledgments. 1112 * 1113 * @return The assured mode timeout. 
1114 */ 1115 public long getAssuredTimeout() 1116 { 1117 return this.config.getAssuredTimeout(); 1118 } 1119 1120 /** 1121 * Get The replication server group id. 1122 * @return The replication server group id. 1123 */ 1124 public byte getGroupId() 1125 { 1126 return (byte) this.config.getGroupId(); 1127 } 1128 1129 /** 1130 * Get the degraded status threshold value for status analyzer. 1131 * <p> 1132 * The degraded status threshold is the number of pending changes for a DS, 1133 * considered as threshold value to put the DS in DEGRADED_STATUS. If value is 1134 * 0, status analyzer is disabled. 1135 * 1136 * @return The degraded status threshold value for status analyzer. 1137 */ 1138 public int getDegradedStatusThreshold() 1139 { 1140 return this.config.getDegradedStatusThreshold(); 1141 } 1142 1143 /** 1144 * Get the monitoring publisher period value. 1145 * <p> 1146 * It is the number of milliseconds to wait before sending new monitoring 1147 * messages. If value is 0, monitoring publisher is disabled. 1148 * 1149 * @return the monitoring publisher period value. 1150 */ 1151 public long getMonitoringPublisherPeriod() 1152 { 1153 return this.config.getMonitoringPeriod(); 1154 } 1155 1156 /** 1157 * Compute the list of replication servers that are not any more connected to 1158 * this Replication Server and stop the corresponding handlers. 1159 * 1160 * @param oldRSAddresses 1161 * the old list of configured replication servers addresses. 
1162 */ 1163 private void disconnectRemovedReplicationServers(Set<HostPort> oldRSAddresses) 1164 { 1165 final Collection<HostPort> serversToDisconnect = new ArrayList<>(); 1166 1167 final Set<HostPort> newRSAddresses = getConfiguredRSAddresses(); 1168 for (HostPort oldRSAddress : oldRSAddresses) 1169 { 1170 if (!newRSAddresses.contains(oldRSAddress)) 1171 { 1172 serversToDisconnect.add(oldRSAddress); 1173 } 1174 } 1175 1176 if (serversToDisconnect.isEmpty()) 1177 { 1178 return; 1179 } 1180 1181 for (ReplicationServerDomain domain: getReplicationServerDomains()) 1182 { 1183 domain.stopReplicationServers(serversToDisconnect); 1184 } 1185 } 1186 1187 /** 1188 * Retrieves a printable name for this Replication Server Instance. 1189 * 1190 * @return A printable name for this Replication Server Instance. 1191 */ 1192 public String getMonitorInstanceName() 1193 { 1194 return "Replication Server " + getReplicationPort() + " " + getServerId(); 1195 } 1196 1197 /** 1198 * Retrieves the port used by this ReplicationServer. 1199 * 1200 * @return The port used by this ReplicationServer. 1201 */ 1202 public int getReplicationPort() 1203 { 1204 return config.getReplicationPort(); 1205 } 1206 1207 /** 1208 * Getter on the server URL. 1209 * @return the server URL. 1210 */ 1211 public String getServerURL() 1212 { 1213 return this.serverURL; 1214 } 1215 1216 /** 1217 * WARNING : only use this methods for tests purpose. 1218 * 1219 * Add the Replication Server given as a parameter in the list 1220 * of local replication servers. 1221 * 1222 * @param server The server to be added. 1223 */ 1224 public static void onlyForTestsAddlocalReplicationServer(String server) 1225 { 1226 localPorts.add(HostPort.valueOf(server).getPort()); 1227 } 1228 1229 /** 1230 * WARNING : only use this methods for tests purpose. 
1231 * 1232 * Clear the list of local Replication Servers 1233 */ 1234 public static void onlyForTestsClearLocalReplicationServerList() 1235 { 1236 localPorts.clear(); 1237 } 1238 1239 /** 1240 * Returns {@code true} if the provided port is one of the ports that this 1241 * replication server is listening on. 1242 * 1243 * @param port 1244 * The port to be checked. 1245 * @return {@code true} if the provided port is one of the ports that this 1246 * replication server is listening on. 1247 */ 1248 public static boolean isLocalReplicationServerPort(int port) 1249 { 1250 return localPorts.contains(port); 1251 } 1252 1253 /** 1254 * Get (or create) a handler on the {@link ChangeNumberIndexDB} for external 1255 * changelog. 1256 * 1257 * @return the handler. 1258 */ 1259 ChangeNumberIndexDB getChangeNumberIndexDB() 1260 { 1261 return this.changelogDB.getChangeNumberIndexDB(); 1262 } 1263 1264 /** 1265 * Returns the oldest change number in the change number index DB. 1266 * 1267 * @return the oldest change number in the change number index DB 1268 * @throws DirectoryException 1269 * When a problem happens 1270 */ 1271 public long getOldestChangeNumber() throws DirectoryException 1272 { 1273 try 1274 { 1275 final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB(); 1276 final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord(); 1277 if (oldestRecord != null) 1278 { 1279 return oldestRecord.getChangeNumber(); 1280 } 1281 // database is empty 1282 return cnIndexDB.getLastGeneratedChangeNumber(); 1283 } 1284 catch (ChangelogException e) 1285 { 1286 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e); 1287 } 1288 } 1289 1290 /** 1291 * Returns the newest change number in the change number index DB. 
1292 * 1293 * @return the newest change number in the change number index DB 1294 * @throws DirectoryException 1295 * When a problem happens 1296 */ 1297 public long getNewestChangeNumber() throws DirectoryException 1298 { 1299 try 1300 { 1301 final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB(); 1302 final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord(); 1303 if (newestRecord != null) 1304 { 1305 return newestRecord.getChangeNumber(); 1306 } 1307 // database is empty 1308 return cnIndexDB.getLastGeneratedChangeNumber(); 1309 } 1310 catch (ChangelogException e) 1311 { 1312 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e); 1313 } 1314 } 1315 1316 /** 1317 * Returns the newest cookie value. 1318 * 1319 * @param excludedBaseDNs 1320 * The set of baseDNs excluded from ECL. 1321 * @return the newest cookie value. 1322 */ 1323 public MultiDomainServerState getNewestECLCookie(Set<DN> excludedBaseDNs) 1324 { 1325 // Initialize start state for all running domains with empty state 1326 final MultiDomainServerState result = new MultiDomainServerState(); 1327 for (ReplicationServerDomain rsDomain : getReplicationServerDomains()) 1328 { 1329 if (!excludedBaseDNs.contains(rsDomain.getBaseDN())) 1330 { 1331 final ServerState latestDBServerState = rsDomain.getLatestServerState(); 1332 if (!latestDBServerState.isEmpty()) 1333 { 1334 result.replace(rsDomain.getBaseDN(), latestDBServerState); 1335 } 1336 } 1337 } 1338 return result; 1339 } 1340 1341 @Override 1342 public void diskLowThresholdReached(final File directory, final long thresholdInBytes) 1343 { 1344 logger.warn(WARN_DISK_LOW_CHANGELOG_DIRECTORY); 1345 } 1346 1347 @Override 1348 public void diskFullThresholdReached(final File directory, final long thresholdInBytes) 1349 { 1350 logger.error(ERR_DISK_FULL_CHANGELOG_DIRECTORY); 1351 shutdownDomainsAndConnectionThreads(); 1352 } 1353 1354 @Override 1355 public void diskSpaceRestored(final File directory, final long lowThresholdInBytes, 
1356 final long fullThresholdInBytes) 1357 { 1358 try 1359 { 1360 startConnectionThreads(); 1361 localPorts.add(getReplicationPort()); 1362 allInstances.add(this); 1363 logger.warn(WARN_REPLICATION_SERVER_RESTARTED); 1364 } 1365 catch (IOException e) 1366 { 1367 logger.error(ERR_COULD_NOT_RESTART_CHANGELOG, getReplicationPort(), e.getMessage()); 1368 } 1369 } 1370 1371 /** 1372 * Gets the weight affected to the replication server. 1373 * <p> 1374 * Each replication server of the topology has a weight. When combined 1375 * together, the weights of the replication servers of a same group can be 1376 * translated to a percentage that determines the quantity of directory 1377 * servers of the topology that should be connected to a replication server. 1378 * <p> 1379 * For instance imagine a topology with 3 replication servers (with the same 1380 * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that 1381 * RS1 should have 25% of the directory servers connected in the topology, RS2 1382 * 25%, and RS3 50%. This may be useful if the replication servers of the 1383 * topology have a different power and one wants to spread the load between 1384 * the replication servers according to their power. 1385 * 1386 * @return the weight 1387 */ 1388 public int getWeight() 1389 { 1390 return this.config.getWeight(); 1391 } 1392 1393 private Collection<ReplicationServerDomain> getReplicationServerDomains() 1394 { 1395 synchronized (baseDNs) 1396 { 1397 return new ArrayList<>(baseDNs.values()); 1398 } 1399 } 1400 1401 /** 1402 * Returns the changelogDB. 1403 * 1404 * @return the changelogDB. 1405 */ 1406 public ChangelogDB getChangelogDB() 1407 { 1408 return this.changelogDB; 1409 } 1410 1411 /** 1412 * Returns the synchronization object for shutdown of combined DS/RS instances. 1413 * 1414 * @return the synchronization object for shutdown of combined DS/RS instances. 
*/
  DSRSShutdownSync getDSRSShutdownSync()
  {
    return dsrsShutdownSync;
  }

  /**
   * Returns whether change-log indexing is enabled for this RS.
   *
   * @return {@code true} if the current configuration enables the computation
   *         of change numbers
   */
  public boolean isChangeNumberEnabled()
  {
    return config.isComputeChangeNumber();
  }

  /**
   * Returns whether the external change-log contains data from at least a domain.
   * <p>
   * The answer is delegated to {@link MultimasterReplication#isECLEnabled()}.
   *
   * @return whether the external change-log contains data from at least a domain
   */
  public boolean isECLEnabled()
  {
    return MultimasterReplication.isECLEnabled();
  }

  /**
   * Returns whether change-log records should be encrypted.
   *
   * @return {@code true} if the current configuration enables confidentiality,
   *         meaning that change-log records should be encrypted
   */
  public boolean isEncrypted()
  {
    return config.isConfidentialityEnabled();
  }

  /**
   * Returns a description of this replication server made of its server id,
   * its URL and the baseDNs of its domains.
   *
   * @return a printable description of this replication server
   */
  @Override
  public String toString()
  {
    // The domain table is read under the lock on baseDNs elsewhere in this
    // class (see getReplicationServerDomains()). Rendering the keys under the
    // same lock prevents a concurrent update of the table from breaking the
    // iteration performed while building the string.
    final String domains;
    synchronized (baseDNs)
    {
      domains = baseDNs.keySet().toString();
    }
    return "RS(" + getServerId() + ") on " + serverURL + ", domains=" + domains;
  }
}