001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2017 ForgeRock AS. 016 */ 017package org.opends.server.replication.plugin; 018 019import static org.opends.messages.ReplicationMessages.*; 020import static org.opends.server.replication.plugin.EntryHistorical.HISTORICAL_ATTACHMENT_NAME; 021import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*; 022import static org.opends.server.util.ServerConstants.*; 023import static org.opends.server.util.StaticUtils.*; 024 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.concurrent.BlockingQueue; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.CopyOnWriteArraySet; 035import java.util.concurrent.LinkedBlockingQueue; 036import java.util.concurrent.atomic.AtomicReference; 037import java.util.concurrent.locks.ReentrantLock; 038 039import org.forgerock.i18n.LocalizableMessage; 040import org.forgerock.i18n.slf4j.LocalizedLogger; 041import org.forgerock.opendj.config.server.ConfigChangeResult; 042import org.forgerock.opendj.config.server.ConfigException; 043import org.forgerock.opendj.config.server.ConfigurationAddListener; 044import org.forgerock.opendj.config.server.ConfigurationChangeListener; 045import org.forgerock.opendj.config.server.ConfigurationDeleteListener; 046import org.forgerock.opendj.ldap.DN; 047import org.forgerock.opendj.ldap.ResultCode; 048import org.forgerock.opendj.server.config.server.ReplicationDomainCfg; 049import org.forgerock.opendj.server.config.server.ReplicationSynchronizationProviderCfg; 050import org.opends.server.api.Backend; 051import org.opends.server.api.BackupTaskListener; 052import org.opends.server.api.ExportTaskListener; 053import org.opends.server.api.ImportTaskListener; 054import org.opends.server.api.RestoreTaskListener; 055import org.opends.server.api.SynchronizationProvider; 056import org.opends.server.core.DirectoryServer; 057import org.opends.server.replication.service.DSRSShutdownSync; 058import org.opends.server.types.BackupConfig; 059import org.opends.server.types.Control; 060import org.opends.server.types.DirectoryException; 061import org.opends.server.types.Entry; 062import org.opends.server.types.HostPort; 063import org.opends.server.types.LDIFExportConfig; 064import org.opends.server.types.LDIFImportConfig; 065import org.opends.server.types.Modification; 066import org.opends.server.types.Operation; 067import org.opends.server.types.RestoreConfig; 068import org.opends.server.types.SynchronizationProviderResult; 069import org.opends.server.types.operation.PluginOperation; 070import org.opends.server.types.operation.PostOperationAddOperation; 071import org.opends.server.types.operation.PostOperationDeleteOperation; 072import org.opends.server.types.operation.PostOperationModifyDNOperation; 073import org.opends.server.types.operation.PostOperationModifyOperation; 074import org.opends.server.types.operation.PostOperationOperation; 075import org.opends.server.types.operation.PreOperationAddOperation; 076import org.opends.server.types.operation.PreOperationDeleteOperation; 077import org.opends.server.types.operation.PreOperationModifyDNOperation; 078import org.opends.server.types.operation.PreOperationModifyOperation; 079import org.opends.server.util.Platform; 080import org.opends.server.util.TimeThread; 081 082/** 083 * This class is used to load the Replication code inside the JVM 084 * and to trigger initialization of the replication. 085 * 086 * It also extends the SynchronizationProvider class in order to have some 087 * replication code running during the operation process 088 * as pre-op, conflictResolution, and post-op. 089 */ 090public class MultimasterReplication 091 extends SynchronizationProvider<ReplicationSynchronizationProviderCfg> 092 implements ConfigurationAddListener<ReplicationDomainCfg>, 093 ConfigurationDeleteListener<ReplicationDomainCfg>, 094 ConfigurationChangeListener 095 <ReplicationSynchronizationProviderCfg>, 096 BackupTaskListener, RestoreTaskListener, ImportTaskListener, 097 ExportTaskListener 098{ 099 /** Enum that symbolizes the state of the multimaster replication. */ 100 private enum State 101 { 102 STARTING, RUNNING, STOPPING 103 } 104 105 /** 106 * Keeps information on temporarily unreachable replication unreachableServers. 107 * RSes in the list will be deleted after an interval, depending on the number of configured replication 108 * unreachableServers, from the time the last server has been added. It guarantees the replication server, if any, 109 * will not timeout twice on the same RS while connecting to other RSes, blocking new incoming connections 110 * for too long (see {@link ReplicationServer#runConnect()} and how it updates {@code domainTicket}). 111 * A Data Server will use this information but not update it, since it can simply try the next in its list; 112 * once it is connected, it will recalculate the best server and reconnect if necessary. 113 */ 114 public static class UnreachableReplicationServers 115 { 116 private final Collection<HostPort> unreachableServers = new CopyOnWriteArraySet<>(); 117 private volatile long lastAddTime = TimeThread.getTime(); 118 private volatile int maxLifetimeMS = getConnectionTimeoutMS() * 3; // myself, my peer and one client timeout 119 120 Collection<HostPort> getUnreachableServers() 121 { 122 if (!unreachableServers.isEmpty() && TimeThread.getTime() - lastAddTime > maxLifetimeMS) 123 { 124 unreachableServers.clear(); 125 } 126 return unreachableServers; 127 } 128 129 void updateReplicationServersCount(int maxServers) 130 { 131 maxLifetimeMS = getConnectionTimeoutMS() * (maxServers + 1); 132 } 133 134 /** 135 * Add a server to the set of temporarily unreachable unreachableServers. 136 * 137 * @param server the server to add 138 */ 139 public void addServer(HostPort server) 140 { 141 lastAddTime = TimeThread.getTime(); 142 unreachableServers.add(server); 143 } 144 145 /** 146 * Returns {@code true} if the provided server is temporarily unreachable. 147 * 148 * @param server the server to check 149 * @return {@code true} if the provided server is temporarily unreachable 150 */ 151 public boolean isUnreachable(HostPort server) 152 { 153 return getUnreachableServers().contains(server); 154 } 155 } 156 157 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 158 159 private ReplicationServerListener replicationServerListener; 160 private static final Map<DN, LDAPReplicationDomain> domains = new ConcurrentHashMap<>(4); 161 private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync(); 162 /** The queue of received update messages, to be treated by the ReplayThread threads. */ 163 private static final BlockingQueue<UpdateToReplay> updateToReplayQueue = new LinkedBlockingQueue<>(10000); 164 /** The list of ReplayThread threads. */ 165 private static final List<ReplayThread> replayThreads = new ArrayList<>(); 166 /** The configurable number of replay threads. */ 167 private static int replayThreadNumber = 10; 168 /** Set of Replication Servers that could not be contacted. */ 169 private static final UnreachableReplicationServers unreachableRSes = new UnreachableReplicationServers(); 170 171 private static final AtomicReference<State> state = new AtomicReference<>(State.STARTING); 172 173 /** The configurable connection/handshake timeout. */ 174 private static volatile int connectionTimeoutMS = 5000; 175 176 /** 177 * Finds the domain for a given DN. 178 * 179 * @param dn The DN for which the domain must be returned. 180 * @param pluginOp An optional operation for which the check is done. 181 * Can be null is the request has no associated operation. 182 * @return The domain for this DN. 183 */ 184 public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp) 185 { 186 /* 187 * Don't run the special replication code on Operation that are 188 * specifically marked as don't synchronize. 189 */ 190 if (pluginOp instanceof Operation) 191 { 192 final Operation op = (Operation) pluginOp; 193 if (op.dontSynchronize()) 194 { 195 return null; 196 } 197 198 /* 199 * Check if the provided operation is a repair operation and set the 200 * synchronization flags if necessary. 201 * The repair operations are tagged as synchronization operations so 202 * that the core server let the operation modify the entryuuid and 203 * ds-sync-hist attributes. 204 * They are also tagged as dontSynchronize so that the replication code 205 * running later do not generate CSN, solve conflicts and forward the 206 * operation to the replication server. 207 */ 208 for (Iterator<Control> it = op.getRequestControls().iterator(); it.hasNext();) 209 { 210 Control c = it.next(); 211 if (OID_REPLICATION_REPAIR_CONTROL.equals(c.getOID())) 212 { 213 op.setSynchronizationOperation(true); 214 op.setDontSynchronize(true); 215 /* 216 remove this control from the list of controls since it has now been 217 processed and the local backend will fail if it finds a control that 218 it does not know about and that is marked as critical. 219 */ 220 it.remove(); 221 return null; 222 } 223 } 224 } 225 226 LDAPReplicationDomain domain = null; 227 DN temp = dn; 228 while (domain == null && temp != null) 229 { 230 domain = domains.get(temp); 231 temp = DirectoryServer.getParentDNInSuffix(temp); 232 } 233 234 return domain; 235 } 236 237 /** 238 * Creates a new domain from its configEntry, do the 239 * necessary initialization and starts it so that it is 240 * fully operational when this method returns. 241 * @param configuration The entry with the configuration of this domain. 242 * @return The domain created. 243 * @throws ConfigException When the configuration is not valid. 244 */ 245 public static LDAPReplicationDomain createNewDomain( 246 ReplicationDomainCfg configuration) 247 throws ConfigException 248 { 249 try 250 { 251 final LDAPReplicationDomain domain = new LDAPReplicationDomain( 252 configuration, updateToReplayQueue, dsrsShutdownSync); 253 if (domains.isEmpty()) 254 { 255 // Create the threads that will process incoming update messages 256 createReplayThreads(); 257 } 258 259 domains.put(domain.getBaseDN(), domain); 260 return domain; 261 } 262 catch (ConfigException e) 263 { 264 logger.error(ERR_COULD_NOT_START_REPLICATION, configuration.dn(), 265 e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)); 266 } 267 return null; 268 } 269 270 /** 271 * Creates a new domain from its configEntry, do the necessary initialization 272 * and starts it so that it is fully operational when this method returns. It 273 * is only used for tests so far. 274 * 275 * @param configuration The entry with the configuration of this domain. 276 * @param queue The BlockingQueue that this domain will use. 277 * 278 * @return The domain created. 279 * 280 * @throws ConfigException When the configuration is not valid. 281 */ 282 static LDAPReplicationDomain createNewDomain( 283 ReplicationDomainCfg configuration, 284 BlockingQueue<UpdateToReplay> queue) 285 throws ConfigException 286 { 287 final LDAPReplicationDomain domain = 288 new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync); 289 domains.put(domain.getBaseDN(), domain); 290 return domain; 291 } 292 293 /** 294 * Deletes a domain. 295 * @param dn : the base DN of the domain to delete. 296 */ 297 public static void deleteDomain(DN dn) 298 { 299 LDAPReplicationDomain domain = domains.remove(dn); 300 if (domain != null) 301 { 302 domain.delete(); 303 } 304 305 // No replay threads running if no replication need 306 if (domains.isEmpty()) { 307 stopReplayThreads(); 308 } 309 } 310 311 @Override 312 public void initializeSynchronizationProvider( 313 ReplicationSynchronizationProviderCfg cfg) throws ConfigException 314 { 315 domains.clear(); 316 replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync); 317 318 // Register as an add and delete listener with the root configuration so we 319 // can be notified if Multimaster domain entries are added or removed. 320 cfg.addReplicationDomainAddListener(this); 321 cfg.addReplicationDomainDeleteListener(this); 322 323 // Register as a root configuration listener so that we can be notified if 324 // number of replay threads is changed and apply changes. 325 cfg.addReplicationChangeListener(this); 326 327 replayThreadNumber = getNumberOfReplayThreadsOrDefault(cfg); 328 connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE); 329 330 // Create the list of domains that are already defined. 331 for (String name : cfg.listReplicationDomains()) 332 { 333 createNewDomain(cfg.getReplicationDomain(name)); 334 } 335 336 // If any schema changes were made with the server offline, then handle them now. 337 List<Modification> offlineSchemaChanges = 338 DirectoryServer.getOfflineSchemaChanges(); 339 if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty()) 340 { 341 processSchemaChange(offlineSchemaChanges); 342 } 343 344 DirectoryServer.registerBackupTaskListener(this); 345 DirectoryServer.registerRestoreTaskListener(this); 346 DirectoryServer.registerExportTaskListener(this); 347 DirectoryServer.registerImportTaskListener(this); 348 349 DirectoryServer.registerSupportedControl( 350 ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL); 351 } 352 353 private int getNumberOfReplayThreadsOrDefault(ReplicationSynchronizationProviderCfg cfg) 354 { 355 Integer value = cfg.getNumUpdateReplayThreads(); 356 return value == null ? Platform.computeNumberOfThreads(16, 2.0f) : value; 357 } 358 359 /** Create the threads that will wait for incoming update messages. */ 360 private static synchronized void createReplayThreads() 361 { 362 replayThreads.clear(); 363 364 ReentrantLock switchQueueLock = new ReentrantLock(); 365 for (int i = 0; i < replayThreadNumber; i++) 366 { 367 ReplayThread replayThread = new ReplayThread(updateToReplayQueue, switchQueueLock); 368 replayThread.start(); 369 replayThreads.add(replayThread); 370 } 371 } 372 373 /** Stop the threads that are waiting for incoming update messages. */ 374 private static synchronized void stopReplayThreads() 375 { 376 // stop the replay threads 377 for (ReplayThread replayThread : replayThreads) 378 { 379 replayThread.shutdown(); 380 } 381 382 for (ReplayThread replayThread : replayThreads) 383 { 384 try 385 { 386 replayThread.join(); 387 } 388 catch(InterruptedException e) 389 { 390 Thread.currentThread().interrupt(); 391 } 392 } 393 replayThreads.clear(); 394 } 395 396 @Override 397 public boolean isConfigurationAddAcceptable( 398 ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons) 399 { 400 return LDAPReplicationDomain.isConfigurationAcceptable( 401 configuration, unacceptableReasons); 402 } 403 404 @Override 405 public ConfigChangeResult applyConfigurationAdd( 406 ReplicationDomainCfg configuration) 407 { 408 ConfigChangeResult ccr = new ConfigChangeResult(); 409 try 410 { 411 LDAPReplicationDomain rd = createNewDomain(configuration); 412 if (State.RUNNING.equals(state.get())) 413 { 414 rd.start(); 415 if (State.STOPPING.equals(state.get())) { 416 rd.shutdown(); 417 } 418 } 419 } catch (ConfigException e) 420 { 421 // we should never get to this point because the configEntry has 422 // already been validated in isConfigurationAddAcceptable() 423 ccr.setResultCode(ResultCode.CONSTRAINT_VIOLATION); 424 } 425 return ccr; 426 } 427 428 @Override 429 public void doPostOperation(PostOperationAddOperation addOperation) 430 { 431 DN dn = addOperation.getEntryDN(); 432 genericPostOperation(addOperation, dn); 433 } 434 435 @Override 436 public void doPostOperation(PostOperationDeleteOperation deleteOperation) 437 { 438 DN dn = deleteOperation.getEntryDN(); 439 genericPostOperation(deleteOperation, dn); 440 } 441 442 @Override 443 public void doPostOperation(PostOperationModifyDNOperation modifyDNOperation) 444 { 445 DN dn = modifyDNOperation.getEntryDN(); 446 genericPostOperation(modifyDNOperation, dn); 447 } 448 449 @Override 450 public void doPostOperation(PostOperationModifyOperation modifyOperation) 451 { 452 DN dn = modifyOperation.getEntryDN(); 453 genericPostOperation(modifyOperation, dn); 454 } 455 456 @Override 457 public SynchronizationProviderResult handleConflictResolution( 458 PreOperationModifyOperation modifyOperation) 459 { 460 LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation); 461 if (domain != null) 462 { 463 return domain.handleConflictResolution(modifyOperation); 464 } 465 return new SynchronizationProviderResult.ContinueProcessing(); 466 } 467 468 @Override 469 public SynchronizationProviderResult handleConflictResolution( 470 PreOperationAddOperation addOperation) throws DirectoryException 471 { 472 LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation); 473 if (domain != null) 474 { 475 return domain.handleConflictResolution(addOperation); 476 } 477 return new SynchronizationProviderResult.ContinueProcessing(); 478 } 479 480 @Override 481 public SynchronizationProviderResult handleConflictResolution( 482 PreOperationDeleteOperation deleteOperation) throws DirectoryException 483 { 484 LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation); 485 if (domain != null) 486 { 487 return domain.handleConflictResolution(deleteOperation); 488 } 489 return new SynchronizationProviderResult.ContinueProcessing(); 490 } 491 492 @Override 493 public SynchronizationProviderResult handleConflictResolution( 494 PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException 495 { 496 LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation); 497 if (domain != null) 498 { 499 return domain.handleConflictResolution(modifyDNOperation); 500 } 501 return new SynchronizationProviderResult.ContinueProcessing(); 502 } 503 504 @Override 505 public SynchronizationProviderResult 506 doPreOperation(PreOperationModifyOperation modifyOperation) 507 { 508 DN operationDN = modifyOperation.getEntryDN(); 509 LDAPReplicationDomain domain = findDomain(operationDN, modifyOperation); 510 511 if (domain == null || !domain.solveConflict()) 512 { 513 return new SynchronizationProviderResult.ContinueProcessing(); 514 } 515 516 EntryHistorical historicalInformation = 517 (EntryHistorical) modifyOperation.getAttachment(HISTORICAL_ATTACHMENT_NAME); 518 if (historicalInformation == null) 519 { 520 Entry entry = modifyOperation.getModifiedEntry(); 521 historicalInformation = EntryHistorical.newInstanceFromEntry(entry); 522 modifyOperation.setAttachment(HISTORICAL_ATTACHMENT_NAME, historicalInformation); 523 } 524 historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); 525 historicalInformation.setHistoricalAttrToOperation(modifyOperation); 526 527 if (modifyOperation.getModifications().isEmpty()) 528 { 529 /* 530 * This operation becomes a no-op due to conflict resolution 531 * stop the processing and send an OK result 532 */ 533 return new SynchronizationProviderResult.StopProcessing( 534 ResultCode.SUCCESS, null); 535 } 536 537 return new SynchronizationProviderResult.ContinueProcessing(); 538 } 539 540 @Override 541 public SynchronizationProviderResult doPreOperation( 542 PreOperationDeleteOperation deleteOperation) throws DirectoryException 543 { 544 return new SynchronizationProviderResult.ContinueProcessing(); 545 } 546 547 @Override 548 public SynchronizationProviderResult doPreOperation( 549 PreOperationModifyDNOperation modifyDNOperation) 550 throws DirectoryException 551 { 552 DN operationDN = modifyDNOperation.getEntryDN(); 553 LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation); 554 555 if (domain == null || !domain.solveConflict()) 556 { 557 return new SynchronizationProviderResult.ContinueProcessing(); 558 } 559 560 // The historical object is retrieved from the attachment created 561 // in the HandleConflictResolution phase. 562 EntryHistorical historicalInformation = (EntryHistorical) 563 modifyDNOperation.getAttachment(HISTORICAL_ATTACHMENT_NAME); 564 if (historicalInformation == null) 565 { 566 // When no Historical attached, create once by loading from the entry 567 // and attach it to the operation 568 Entry entry = modifyDNOperation.getUpdatedEntry(); 569 historicalInformation = EntryHistorical.newInstanceFromEntry(entry); 570 modifyDNOperation.setAttachment(HISTORICAL_ATTACHMENT_NAME, historicalInformation); 571 } 572 historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); 573 574 // Add to the operation the historical attribute : "dn:changeNumber:moddn" 575 historicalInformation.setHistoricalAttrToOperation(modifyDNOperation); 576 577 return new SynchronizationProviderResult.ContinueProcessing(); 578 } 579 580 @Override 581 public SynchronizationProviderResult doPreOperation( 582 PreOperationAddOperation addOperation) 583 { 584 // Check replication domain 585 LDAPReplicationDomain domain = 586 findDomain(addOperation.getEntryDN(), addOperation); 587 if (domain == null) 588 { 589 return new SynchronizationProviderResult.ContinueProcessing(); 590 } 591 592 // For LOCAL op only, generate CSN and attach Context 593 if (!addOperation.isSynchronizationOperation()) 594 { 595 domain.doPreOperation(addOperation); 596 } 597 598 // Add to the operation the historical attribute : "dn:changeNumber:add" 599 EntryHistorical.setHistoricalAttrToOperation(addOperation); 600 601 return new SynchronizationProviderResult.ContinueProcessing(); 602 } 603 604 @Override 605 public void finalizeSynchronizationProvider() 606 { 607 setState(State.STOPPING); 608 609 for (LDAPReplicationDomain domain : domains.values()) 610 { 611 domain.shutdown(); 612 } 613 domains.clear(); 614 615 stopReplayThreads(); 616 617 if (replicationServerListener != null) 618 { 619 replicationServerListener.shutdown(); 620 } 621 622 DirectoryServer.deregisterBackupTaskListener(this); 623 DirectoryServer.deregisterRestoreTaskListener(this); 624 DirectoryServer.deregisterExportTaskListener(this); 625 DirectoryServer.deregisterImportTaskListener(this); 626 } 627 628 /** 629 * This method is called whenever the server detects a modification 630 * of the schema done by directly modifying the backing files 631 * of the schema backend. 632 * Call the schema Domain if it exists. 633 * 634 * @param modifications The list of modifications that was 635 * applied to the schema. 636 */ 637 @Override 638 public void processSchemaChange(List<Modification> modifications) 639 { 640 LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null); 641 if (domain != null) 642 { 643 domain.synchronizeSchemaModifications(modifications); 644 } 645 } 646 647 @Override 648 public void processBackupBegin(Backend<?> backend, BackupConfig config) 649 { 650 for (DN dn : backend.getBaseDNs()) 651 { 652 LDAPReplicationDomain domain = findDomain(dn, null); 653 if (domain != null) 654 { 655 domain.backupStart(); 656 } 657 } 658 } 659 660 @Override 661 public void processBackupEnd(Backend<?> backend, BackupConfig config, boolean successful) 662 { 663 for (DN dn : backend.getBaseDNs()) 664 { 665 LDAPReplicationDomain domain = findDomain(dn, null); 666 if (domain != null) 667 { 668 domain.backupEnd(); 669 } 670 } 671 } 672 673 @Override 674 public void processRestoreBegin(Backend<?> backend, RestoreConfig config) 675 { 676 for (DN dn : backend.getBaseDNs()) 677 { 678 LDAPReplicationDomain domain = findDomain(dn, null); 679 if (domain != null) 680 { 681 domain.disable(); 682 } 683 } 684 } 685 686 @Override 687 public void processRestoreEnd(Backend<?> backend, RestoreConfig config, boolean successful) 688 { 689 for (DN dn : backend.getBaseDNs()) 690 { 691 LDAPReplicationDomain domain = findDomain(dn, null); 692 if (domain != null) 693 { 694 domain.enable(); 695 } 696 } 697 } 698 699 @Override 700 public void processImportBegin(Backend<?> backend, LDIFImportConfig config) 701 { 702 for (DN dn : backend.getBaseDNs()) 703 { 704 LDAPReplicationDomain domain = findDomain(dn, null); 705 if (domain != null) 706 { 707 domain.disable(); 708 } 709 } 710 } 711 712 @Override 713 public void processImportEnd(Backend<?> backend, LDIFImportConfig config, boolean successful) 714 { 715 for (DN dn : backend.getBaseDNs()) 716 { 717 LDAPReplicationDomain domain = findDomain(dn, null); 718 if (domain != null) 719 { 720 domain.enable(); 721 } 722 } 723 } 724 725 @Override 726 public void processExportBegin(Backend<?> backend, LDIFExportConfig config) 727 { 728 for (DN dn : backend.getBaseDNs()) 729 { 730 LDAPReplicationDomain domain = findDomain(dn, null); 731 if (domain != null) 732 { 733 domain.backupStart(); 734 } 735 } 736 } 737 738 @Override 739 public void processExportEnd(Backend<?> backend, LDIFExportConfig config, boolean successful) 740 { 741 for (DN dn : backend.getBaseDNs()) 742 { 743 LDAPReplicationDomain domain = findDomain(dn, null); 744 if (domain != null) 745 { 746 domain.backupEnd(); 747 } 748 } 749 } 750 751 @Override 752 public ConfigChangeResult applyConfigurationDelete( 753 ReplicationDomainCfg configuration) 754 { 755 deleteDomain(configuration.getBaseDN()); 756 757 return new ConfigChangeResult(); 758 } 759 760 @Override 761 public boolean isConfigurationDeleteAcceptable( 762 ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons) 763 { 764 return true; 765 } 766 767 /** 768 * Generic code for all the postOperation entry point. 769 * 770 * @param operation The Operation for which the post-operation is called. 771 * @param dn The Dn for which the post-operation is called. 772 */ 773 private void genericPostOperation(PostOperationOperation operation, DN dn) 774 { 775 LDAPReplicationDomain domain = findDomain(dn, operation); 776 if (domain != null) { 777 domain.synchronize(operation); 778 } 779 } 780 781 /** 782 * Returns the replication server listener associated to that Multimaster 783 * Replication. 784 * @return the listener. 785 */ 786 public ReplicationServerListener getReplicationServerListener() 787 { 788 return replicationServerListener; 789 } 790 791 @Override 792 public boolean isConfigurationChangeAcceptable( 793 ReplicationSynchronizationProviderCfg configuration, 794 List<LocalizableMessage> unacceptableReasons) 795 { 796 return true; 797 } 798 799 @Override 800 public ConfigChangeResult applyConfigurationChange(ReplicationSynchronizationProviderCfg configuration) 801 { 802 // Stop threads then restart new number of threads 803 stopReplayThreads(); 804 replayThreadNumber = getNumberOfReplayThreadsOrDefault(configuration); 805 if (!domains.isEmpty()) 806 { 807 createReplayThreads(); 808 } 809 810 connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(), 811 Integer.MAX_VALUE); 812 813 return new ConfigChangeResult(); 814 } 815 816 @Override 817 public void completeSynchronizationProvider() 818 { 819 for (LDAPReplicationDomain domain : domains.values()) 820 { 821 domain.start(); 822 } 823 setState(State.RUNNING); 824 } 825 826 private void setState(State newState) 827 { 828 state.set(newState); 829 synchronized (state) 830 { 831 state.notifyAll(); 832 } 833 } 834 835 /** 836 * Gets the number of handled domain objects. 837 * @return The number of handled domain objects 838 */ 839 public static int getNumberOfDomains() 840 { 841 return domains.size(); 842 } 843 844 /** 845 * Gets the Set of domain baseDN which are disabled for the external changelog. 846 * 847 * @return The Set of domain baseDNs which are disabled for the external changelog. 848 * @throws DirectoryException 849 * if a problem occurs 850 */ 851 public static Set<DN> getExcludedChangelogDomains() throws DirectoryException 852 { 853 final Set<DN> disabledBaseDNs = new HashSet<>(domains.size() + 1); 854 disabledBaseDNs.add(DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT)); 855 for (LDAPReplicationDomain domain : domains.values()) 856 { 857 if (!domain.isECLEnabled()) 858 { 859 disabledBaseDNs.add(domain.getBaseDN()); 860 } 861 } 862 return disabledBaseDNs; 863 } 864 865 /** 866 * Returns whether the provided baseDN represents a replication domain enabled 867 * for the external changelog. 868 * 869 * @param baseDN 870 * the replication domain to check 871 * @return true if the provided baseDN is enabled for the external changelog, 872 * false if the provided baseDN is disabled for the external changelog 873 * or unknown to multimaster replication. 874 */ 875 public static boolean isECLEnabledDomain(DN baseDN) 876 { 877 waitForStartup(); 878 // if state is STOPPING, then we need to return from this method 879 final LDAPReplicationDomain domain = domains.get(baseDN); 880 return domain != null && domain.isECLEnabled(); 881 } 882 883 /** 884 * Returns whether the external change-log contains data from at least a domain. 885 * @return whether the external change-log contains data from at least a domain 886 */ 887 public static boolean isECLEnabled() 888 { 889 waitForStartup(); 890 for (LDAPReplicationDomain domain : domains.values()) 891 { 892 if (domain.isECLEnabled()) 893 { 894 return true; 895 } 896 } 897 return false; 898 } 899 900 private static void waitForStartup() 901 { 902 if (State.STARTING.equals(state.get())) 903 { 904 synchronized (state) 905 { 906 while (State.STARTING.equals(state.get())) 907 { 908 try 909 { 910 state.wait(); 911 } 912 catch (InterruptedException ignored) 913 { 914 // loop and check state again 915 } 916 } 917 } 918 } 919 } 920 921 /** 922 * Returns the connection timeout in milli-seconds. 923 * 924 * @return The connection timeout in milli-seconds. 925 */ 926 public static int getConnectionTimeoutMS() 927 { 928 return connectionTimeoutMS; 929 } 930 931 static void updateReplicationServersCount(int numberOfRSes) 932 { 933 unreachableRSes.updateReplicationServersCount(numberOfRSes); 934 } 935 936 /** 937 * Returns temporarily unreachable Replication Servers. 938 * 939 * @return temporarily unreachable Replication Servers 940 */ 941 public static UnreachableReplicationServers getUnreachableReplicationServers() 942 { 943 return unreachableRSes; 944 } 945}