/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2006-2010 Sun Microsystems, Inc.
 * Portions Copyright 2011-2017 ForgeRock AS.
 */
package org.opends.server.replication.plugin;

import static org.forgerock.opendj.ldap.ResultCode.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
import static org.opends.server.protocols.internal.InternalClientConnection.*;
import static org.opends.server.protocols.internal.Requests.*;
import static org.opends.server.replication.plugin.EntryHistorical.*;
import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
import static org.opends.server.schema.SchemaConstants.EXTMR_HISTORICAL_CSN_RANGE_OID;
import static org.opends.server.util.CollectionUtils.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedMap;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.DataFormatException;

import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.LocalizedIllegalArgumentException;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.adapter.server3x.Converters;
import org.forgerock.opendj.config.server.ConfigChangeResult;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.config.server.ConfigurationChangeListener;
import org.forgerock.opendj.ldap.AVA;
import org.forgerock.opendj.ldap.AttributeDescription;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.ModificationType;
import org.forgerock.opendj.ldap.RDN;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.SearchScope;
import org.forgerock.opendj.ldap.schema.AttributeType;
import org.forgerock.opendj.ldap.schema.ObjectClass;
import org.forgerock.opendj.server.config.meta.ReplicationDomainCfgDefn.IsolationPolicy;
import org.forgerock.opendj.server.config.server.ExternalChangelogDomainCfg;
import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
import org.opends.server.api.AlertGenerator;
import org.opends.server.api.Backend;
import org.opends.server.api.Backend.BackendOperation;
import org.opends.server.api.BackendInitializationListener;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorData;
import org.opends.server.api.ServerShutdownListener;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigConstants;
import org.opends.server.config.ConfigurationHandler;
import org.opends.server.controls.PagedResultsControl;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.LockFileManager;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.ModifyOperationBasis;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchListener;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.internal.Requests;
import org.opends.server.protocols.internal.SearchRequest;
import org.opends.server.protocols.ldap.LDAPAttribute;
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AddContext;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteContext;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyContext;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.AdditionalLogItem;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.Control;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ExistingFileBehavior;
import org.opends.server.types.LDAPException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.Operation;
import org.opends.server.types.OperationType;
import org.opends.server.types.RawModification;
import org.opends.server.types.Schema;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SynchronizationProviderResult;
import org.opends.server.types.operation.PluginOperation;
import org.opends.server.types.operation.PostOperationAddOperation;
import org.opends.server.types.operation.PostOperationDeleteOperation;
import org.opends.server.types.operation.PostOperationModifyDNOperation;
import org.opends.server.types.operation.PostOperationModifyOperation;
import org.opends.server.types.operation.PostOperationOperation;
import org.opends.server.types.operation.PreOperationAddOperation;
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation;

/**
 * This class implements the bulk of the Directory Server side of the
 * replication code. It contains the root methods for publishing a change,
 * processing a change received from the replicationServer service,
 * handling conflict resolution, and handling protocol messages from the
 * replicationServer.
 * <p>
 * FIXME Move this class to org.opends.server.replication.service
 * or the equivalent package once this code is moved to a maven module.
 */
public final class LDAPReplicationDomain extends ReplicationDomain
  implements ConfigurationChangeListener<ReplicationDomainCfg>,
             AlertGenerator, BackendInitializationListener, ServerShutdownListener
{
  /**
   * Set of attributes that will return all the user attributes and the
   * replication related operational attributes when used in a search operation.
   */
  private static final Set<String> USER_AND_REPL_OPERATIONAL_ATTRS =
      newHashSet(HISTORICAL_ATTRIBUTE_NAME, ENTRYUUID_ATTRIBUTE_NAME, "*");

  /**
   * Initializing replication for the domain initiates backend finalization and
   * initialization. This flag prevents the replication domain from disabling or
   * enabling itself when it is the initiator of the event.
   */
  private boolean ignoreBackendInitializationEvent;

  private volatile boolean serverShutdownRequested;

  @Override
  public String getShutdownListenerName() {
    return "LDAPReplicationDomain " + getBaseDN();
  }

  @Override
  public void processServerShutdown(LocalizableMessage reason) {
    serverShutdownRequested = true;
  }

  /**
   * This class is used in the session establishment phase when no replication
   * server holding all the local changes has been found, and these changes
   * therefore need to be recovered. A search is then performed on the database
   * using this internalSearchListener.
   */
  private class ScanSearchListener implements InternalSearchListener
  {
    private final CSN startCSN;
    private final CSN endCSN;

    public ScanSearchListener(CSN startCSN, CSN endCSN)
    {
      this.startCSN = startCSN;
      this.endCSN = endCSN;
    }

    @Override
    public void handleInternalSearchEntry(
        InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
        throws DirectoryException
    {
      // Build the list of Operations that happened on this entry after startCSN
      // and before endCSN and add them to the replayOperations list
      Iterable<FakeOperation> updates = generateFakeOperations(searchEntry, getServerId());

      for (FakeOperation op : updates)
      {
        CSN csn = op.getCSN();
        if (csn.isNewerThan(startCSN) && csn.isOlderThan(endCSN))
        {
          synchronized (replayOperations)
          {
            replayOperations.put(csn, op);
          }
        }
      }
    }

    @Override
    public void handleInternalSearchReference(
        InternalSearchOperation searchOperation,
        SearchResultReference searchReference) throws DirectoryException
    {
      // Nothing to do.
    }
  }

  @Override
  public void performBackendPreInitializationProcessing(Backend<?> backend) {
    // Nothing to do
  }

  @Override
  public void performBackendPostFinalizationProcessing(Backend<?> backend) {
    // Nothing to do
  }

  @Override
  public void performBackendPostInitializationProcessing(Backend<?> backend) {
    if (!ignoreBackendInitializationEvent
        && getBackend().getBackendID().equals(backend.getBackendID())) {
      enable();
    }
  }

  @Override
  public void performBackendPreFinalizationProcessing(Backend<?> backend) {
    // Do not disable this domain during a shutdown,
    // and ignore the event if this replica is the event trigger (e.g. when importing).
    if (!ignoreBackendInitializationEvent
        && !serverShutdownRequested
        && getBackend().getBackendID().equals(backend.getBackendID())) {
      disable();
    }
  }

  /** The fully-qualified name of this class. */
  private static final String CLASS_NAME = LDAPReplicationDomain.class.getName();

  /**
   * The attribute used to mark conflicting entries.
   * The value of this attribute should be the DN that this entry was
   * supposed to have when it was marked as conflicting.
   */
  public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
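  // Illustrative sketch: an entry renamed because of an unresolved naming conflict
  // whose intended DN was, say, "uid=user.1,ou=People,dc=example,dc=com" (assumed
  // example value) would carry that DN as the value of its ds-sync-conflict attribute.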
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  private final DSRSShutdownSync dsrsShutdownSync;
  /**
   * The update to replay message queue where the listener thread is going to
   * push incoming update messages.
   */
  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
  /** The number of naming conflicts successfully resolved. */
  private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
  /** The number of modify conflicts successfully resolved. */
  private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
  /** The number of unresolved naming conflicts. */
  private final AtomicInteger numUnresolvedNamingConflicts = new AtomicInteger();
  /** The number of updates successfully replayed by the replication. */
  private final AtomicInteger numReplayedPostOpCalled = new AtomicInteger();

  private final PersistentServerState state;
  private volatile boolean generationIdSavedStatus;

  /**
   * This object is used to store the list of updates currently being done on the local database.
   * It is useful to make sure that the local operations are sent in a correct order to the
   * replication server and that the ServerState is not updated too early.
   */
  private final PendingChanges pendingChanges;
  private final AtomicReference<RSUpdater> rsUpdater = new AtomicReference<>(null);

  /**
   * It contains the updates that were done on other servers, transmitted by the
   * replication server and that are currently being replayed.
   * <p>
   * It is useful to make sure that dependencies between operations are
   * correctly fulfilled and to make sure that the ServerState is not updated
   * too early.
   */
  private final RemotePendingChanges remotePendingChanges;
  private boolean solveConflictFlag = true;

  private final InternalClientConnection conn = getRootConnection();
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private volatile boolean disabled;

  /**
   * This list is used to temporarily store operations that need to be replayed
   * at session establishment time.
   */
  private final SortedMap<CSN, FakeOperation> replayOperations = new TreeMap<>();

  private ExternalChangelogDomain eclDomain;

  /** A boolean indicating if the thread used to save the persistentServerState is terminated. */
  private volatile boolean done = true;

  private final ServerStateFlush flushThread;

  /** The attribute name used to store the generation id in the backend. */
  private static final String REPLICATION_GENERATION_ID = "ds-sync-generation-id";
  /** The attribute name used to store the fractional include configuration in the backend. */
  static final String REPLICATION_FRACTIONAL_INCLUDE = "ds-sync-fractional-include";
  /** The attribute name used to store the fractional exclude configuration in the backend. */
  static final String REPLICATION_FRACTIONAL_EXCLUDE = "ds-sync-fractional-exclude";

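  /*
   * Illustrative sketch of how these attributes are typically valued in the domain
   * root entry (format assumed from the fractional configuration parser: a class name
   * or "*", a colon, then a comma-separated attribute list), for example:
   *
   *   ds-sync-fractional-exclude: inetOrgPerson:jpegPhoto,userCertificate
   *   ds-sync-fractional-include: *:cn,sn,description
   */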
  /**
   * Fractional replication variables.
   */

  /** Holds the fractional configuration for this domain, if any. */
  private final FractionalConfig fractionalConfig;

  /** The list of attributes that cannot be used in fractional replication configuration. */
  private static final String[] FRACTIONAL_PROHIBITED_ATTRIBUTES = new String[]
  {
    "objectClass",
    "2.5.4.0" // objectClass OID
  };

  /**
   * When true, this flag is used to force the domain status to be put in bad
   * data set just after the connection to the replication server.
   * This must be used when fractional replication is enabled with a
   * configuration different from the previous one (or at the very first
   * fractional usage time): after connection, a ChangeStatusMsg is sent
   * requesting the bad data set status. Then none of the update messages
   * received from the replication server are taken into account until the
   * backend is synchronized with a brand new data set compliant with the new
   * fractional configuration (i.e. with a compliant fractional configuration in
   * the domain root entry).
   */
  private boolean forceBadDataSet;

  /**
   * The message id to be used when an import is stopped with error by
   * the fractional replication ldif import plugin.
   */
  private int importErrorMessageId = -1;
  /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE. */
  static final int IMPORT_ERROR_MESSAGE_BAD_REMOTE = 1;
  /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL. */
  static final int IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL = 2;

  /*
   * Definitions for the return codes of the
   * fractionalFilterOperation(PreOperationModifyOperation
   * modifyOperation, boolean performFiltering) method
   */
  /**
   * The operation contains attributes subject to fractional filtering according
   * to the fractional configuration.
   */
  private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1;
  /**
   * The operation contains no attributes subject to fractional filtering
   * according to the fractional configuration.
   */
  private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2;
  /** The operation should become a no-op. */
  private static final int FRACTIONAL_BECOME_NO_OP = 3;

  /**
   * The last CSN purged in this domain. Allows a continuous purging process
   * from one purge processing (task run) to the next one. Its value is 0 when
   * the server starts.
   */
  private CSN lastCSNPurgedFromHist = CSN.MIN_VALUE;

  /**
   * The thread that periodically saves the ServerState of this
   * LDAPReplicationDomain in the database.
   */
  private class ServerStateFlush extends DirectoryThread
  {
    protected ServerStateFlush()
    {
      super("Replica DS(" + getServerId() + ") state checkpointer for domain \"" + getBaseDN() + "\"");
    }

    @Override
    public void run()
    {
      done = false;

      while (!isShutdownInitiated())
      {
        try
        {
          synchronized (this)
          {
            wait(1000);
            if (!disabled && !ieRunning())
            {
              state.save();
            }
          }
        }
        catch (InterruptedException e)
        {
          // Thread interrupted: check for shutdown.
          Thread.currentThread().interrupt();
        }
      }
      state.save();

      done = true;
    }
  }

  /**
   * The thread that is responsible for updating the RS to which this domain is
   * connected in case it is late and there is no RS which is up to date.
   */
  private class RSUpdater extends DirectoryThread
  {
    private final CSN startCSN;

    protected RSUpdater(CSN replServerMaxCSN)
    {
      super("Replica DS(" + getServerId() + ") missing change publisher for domain \"" + getBaseDN() + "\"");
      this.startCSN = replServerMaxCSN;
    }

    @Override
    public void run()
    {
      // The replication server is missing some of our changes:
      // let's send them to it.
      logger.trace(DEBUG_GOING_TO_SEARCH_FOR_CHANGES);

      /*
       * Get all the changes that have not been seen by this
       * replication server and publish them.
       */
      try
      {
        if (buildAndPublishMissingChanges(startCSN, broker))
        {
          logger.trace(DEBUG_CHANGES_SENT);
          synchronized (replayOperations)
          {
            replayOperations.clear();
          }
        }
        else
        {
          /*
           * An error happened trying to search for the updates.
           * This server will start accepting new updates again, but
           * some inconsistencies will stay between servers.
           * Log an error for the repair tool
           * that will need to re-synchronize the servers.
           */
          logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN());
        }
      }
      catch (Exception e)
      {
        /*
         * An error happened trying to search for the updates.
         * This server will start accepting new updates again, but
         * some inconsistencies will stay between servers.
         * Log an error for the repair tool
         * that will need to re-synchronize the servers.
         */
        logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN());
      }
      finally
      {
        broker.setRecoveryRequired(false);
        // RSUpdater thread has finished its work, let's remove it from memory
        // so another RSUpdater thread can be started if needed.
        rsUpdater.compareAndSet(this, null);
      }
    }
  }

  /**
   * Creates a new ReplicationDomain using configuration from configEntry.
   *
   * @param configuration The configuration of this ReplicationDomain.
   * @param updateToReplayQueue The queue for update messages to replay.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException In case of invalid configuration.
   */
  LDAPReplicationDomain(ReplicationDomainCfg configuration,
      BlockingQueue<UpdateToReplay> updateToReplayQueue,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    super(configuration, -1);

    this.updateToReplayQueue = updateToReplayQueue;
    this.dsrsShutdownSync = dsrsShutdownSync;

    // Get assured configuration
    readAssuredConfig(configuration, false);

    // Get fractional configuration
    fractionalConfig = new FractionalConfig(getBaseDN());
    readFractionalConfig(configuration, false);
    storeECLConfiguration(configuration);
    solveConflictFlag = isSolveConflict(configuration);

    Backend<?> backend = getBackend();
    if (backend == null)
    {
      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(getBaseDN()));
    }

    try
    {
      generationId = loadGenerationId();
    }
    catch (DirectoryException e)
    {
      logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e));
    }

    /*
     * Create a new Persistent Server State that will be used to store
     * the last CSN seen from all LDAP servers in the topology.
     */
    state = new PersistentServerState(getBaseDN(), getServerId(), getServerState());
    flushThread = new ServerStateFlush();

    /*
     * CSNGenerator is used to create new unique CSNs for each operation done on
     * this replication domain.
     *
     * The generator time is adjusted to the time of the last CSN received from
     * other remote servers.
     */
    pendingChanges = new PendingChanges(getGenerator(), this);
    remotePendingChanges = new RemotePendingChanges(getServerState());

    // listen for changes on the configuration
    configuration.addChangeListener(this);

    // register as an AlertGenerator
    DirectoryServer.registerAlertGenerator(this);

    DirectoryServer.registerBackendInitializationListener(this);
    DirectoryServer.registerShutdownListener(this);

    startPublishService();
  }

  /**
   * Modify conflicts are solved for all suffixes but the schema suffix because
   * we don't want to store extra information in the schema ldif files. This has
   * no negative impact because changes on schema should not produce
   * conflicts.
   */
  private boolean isSolveConflict(ReplicationDomainCfg cfg)
  {
    return !getBaseDN().equals(DirectoryServer.getSchemaDN())
        && cfg.isSolveConflicts();
  }

  /**
   * Sets the error message id to be used when online import is stopped with
   * error by the fractional replication ldif import plugin.
   * @param importErrorMessageId The message to use.
   */
  void setImportErrorMessageId(int importErrorMessageId)
  {
    this.importErrorMessageId = importErrorMessageId;
  }

  /**
   * This flag is used by the fractional replication ldif import plugin to stop
   * the (online) import process if a fractional configuration inconsistency is
   * detected by it.
   *
   * @return true if the online import currently in progress should continue,
   *         false otherwise.
   */
  private boolean isFollowImport()
  {
    return importErrorMessageId == -1;
  }

  /**
   * Gets and stores the fractional replication configuration parameters.
   * @param configuration The configuration object
   * @param allowReconnection Tells if one must reconnect if significant changes
   *        occurred
   */
  private void readFractionalConfig(ReplicationDomainCfg configuration,
      boolean allowReconnection)
  {
    // Read the configuration entry
    FractionalConfig newFractionalConfig;
    try
    {
      newFractionalConfig = FractionalConfig.toFractionalConfig(configuration);
    }
    catch (ConfigException e)
    {
      // Should not happen: this has normally already been called successfully in
      // isConfigurationChangeAcceptable or isConfigurationAcceptable
      // before reaching this method.
      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
      return;
    }

    /*
     * Is there any change in the fractional configuration?
     */

    // Compute current configuration
    boolean needReconnection;
    try
    {
      needReconnection =
          !FractionalConfig.isFractionalConfigEquivalent(fractionalConfig, newFractionalConfig);
    }
    catch (ConfigException e)
    {
      // Should not happen
      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
      return;
    }

    // Disable service if configuration changed
    final boolean needRestart = needReconnection && allowReconnection;
    if (needRestart)
    {
      disableService();
    }
    // Set new configuration
    int newFractionalMode = newFractionalConfig.fractionalConfigToInt();
    fractionalConfig.setFractional(newFractionalMode != FractionalConfig.NOT_FRACTIONAL);
    if (fractionalConfig.isFractional())
    {
      // Set new fractional configuration values
      fractionalConfig.setFractionalExclusive(
          newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
      fractionalConfig.setFractionalSpecificClassesAttributes(
          newFractionalConfig.getFractionalSpecificClassesAttributes());
      fractionalConfig.setFractionalAllClassesAttributes(
          newFractionalConfig.fractionalAllClassesAttributes);
    }
    else
    {
      // Reset default values
      fractionalConfig.setFractionalExclusive(true);
      fractionalConfig.setFractionalSpecificClassesAttributes(
          new HashMap<String, Set<String>>());
      fractionalConfig.setFractionalAllClassesAttributes(new HashSet<String>());
    }

    // Reconnect if required
    if (needRestart)
    {
      enableService();
    }
  }

  /**
   * Return true if the fractional configuration stored in the domain root
   * entry of the backend is equivalent to the fractional configuration stored
   * in the local variables.
   */
  private boolean isBackendFractionalConfigConsistent()
  {
    // Read config stored in domain root entry
    if (logger.isTraceEnabled())
    {
      logger.trace("Attempt to read the potential fractional config in domain root entry " + getBaseDN());
    }

    // Search the domain root entry that is used to save the generation id
    SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT)
        .addAttribute(REPLICATION_GENERATION_ID, REPLICATION_FRACTIONAL_EXCLUDE, REPLICATION_FRACTIONAL_INCLUDE);
    InternalSearchOperation search = conn.processSearch(request);

    if (search.getResultCode() != ResultCode.SUCCESS
        && search.getResultCode() != ResultCode.NO_SUCH_OBJECT)
    {
      String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage();
      logger.error(ERR_SEARCHING_GENERATION_ID, getBaseDN(), errorMsg);
      return false;
    }

    SearchResultEntry resultEntry = findReplicationSearchResultEntry(search);
    if (resultEntry == null)
    {
      /*
       * The backend is probably empty: if there is some fractional
       * configuration in memory, we do not let the domain connect;
       * otherwise, it's ok.
       */
      return !fractionalConfig.isFractional();
    }

    // Now extract the fractional configuration, if any
    Iterator<ByteString> exclIt = getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_EXCLUDE);
    Iterator<ByteString> inclIt = getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_INCLUDE);

    // Compare backend and local fractional configuration
    return isFractionalConfigConsistent(fractionalConfig, exclIt, inclIt);
  }

  private SearchResultEntry findReplicationSearchResultEntry(InternalSearchOperation searchOperation)
  {
    final SearchResultEntry resultEntry = getFirstResult(searchOperation);
    if (resultEntry != null)
    {
      AttributeType synchronizationGenIDType =
          DirectoryServer.getSchema().getAttributeType(REPLICATION_GENERATION_ID);
      List<Attribute> attrs = resultEntry.getAttribute(synchronizationGenIDType);
      if (!attrs.isEmpty())
      {
        Attribute attr = attrs.get(0);
        if (attr.size() == 1)
        {
          return resultEntry;
        }
        if (attr.size() > 1)
        {
          String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString();
          logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg);
        }
      }
    }
    return null;
  }

  private Iterator<ByteString> getAttributeValueIterator(SearchResultEntry resultEntry, String attrName)
  {
    AttributeType attrType = DirectoryServer.getSchema().getAttributeType(attrName);
    List<Attribute> exclAttrs = resultEntry.getAttribute(attrType);
    if (!exclAttrs.isEmpty())
    {
      Attribute exclAttr = exclAttrs.get(0);
      if (exclAttr != null)
      {
        return exclAttr.iterator();
      }
    }
    return null;
  }

  /**
   * Return true if the fractional configuration passed as fractional
   * configuration attribute values is equivalent to the fractional
   * configuration stored in the local variables.
   * @param fractionalConfig The local fractional configuration
   * @param exclIt Fractional exclude mode configuration attribute values to
   *        analyze.
   * @param inclIt Fractional include mode configuration attribute values to
   *        analyze.
   * @return True if the fractional configuration passed as fractional
   *         configuration attribute values is equivalent to the fractional
   *         configuration stored in the local variables.
   */
  static boolean isFractionalConfigConsistent(
      FractionalConfig fractionalConfig, Iterator<ByteString> exclIt, Iterator<ByteString> inclIt)
  {
    // Parse the fractional configuration stored in the passed fractional configuration attribute values
    Map<String, Set<String>> storedFractionalSpecificClassesAttributes = new HashMap<>();
    Set<String> storedFractionalAllClassesAttributes = new HashSet<>();

    int storedFractionalMode;
    try
    {
      storedFractionalMode = FractionalConfig.parseFractionalConfig(exclIt,
          inclIt, storedFractionalSpecificClassesAttributes,
          storedFractionalAllClassesAttributes);
    }
    catch (ConfigException e)
    {
      // Should not happen as the configuration in the domain root entry is flushed
      // from valid configuration in the local variables
      logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e));
      return false;
    }

    FractionalConfig storedFractionalConfig = new FractionalConfig(fractionalConfig.getBaseDn());
    storedFractionalConfig.setFractional(storedFractionalMode != FractionalConfig.NOT_FRACTIONAL);
    // Set stored fractional configuration values
    if (storedFractionalConfig.isFractional())
    {
      storedFractionalConfig.setFractionalExclusive(
          storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
    }
    storedFractionalConfig.setFractionalSpecificClassesAttributes(
        storedFractionalSpecificClassesAttributes);
    storedFractionalConfig.setFractionalAllClassesAttributes(
        storedFractionalAllClassesAttributes);

    /*
     * Compare the configuration stored in the passed fractional configuration
     * attributes with the local variable one
     */
    try
    {
      return FractionalConfig.isFractionalConfigEquivalent(fractionalConfig, storedFractionalConfig);
    }
    catch (ConfigException e)
    {
      // Should not happen as the configuration in the domain root entry is flushed
      // from valid configuration in the local variables, so both should have already
      // been checked
      logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e));
      return false;
    }
  }

  /**
   * Compare 2 attribute collections and return true if they are equivalent.
   *
   * @param attributes1
   *          First attribute collection to compare.
   * @param attributes2
   *          Second attribute collection to compare.
   * @return True if both attribute collections are equivalent.
   * @throws ConfigException
   *           If some attributes could not be retrieved from the schema.
   */
  private static boolean areAttributesEquivalent(
      Collection<String> attributes1, Collection<String> attributes2)
      throws ConfigException
  {
    // Compare all classes attributes
    if (attributes1.size() != attributes2.size())
    {
      return false;
    }

    // Check consistency of all classes attributes
    Schema schema = DirectoryServer.getSchema();
    /*
     * For each attribute in attributes1, check there is a matching
     * one in attributes2.
     */
    for (String attrName1 : attributes1)
    {
      // Get attribute from attributes1
      AttributeType attributeType1 = schema.getAttributeType(attrName1);
      if (attributeType1.isPlaceHolder())
      {
        throw new ConfigException(
            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName1));
      }
      // Look for a matching one in attributes2
      boolean foundAttribute = false;
      for (String attrName2 : attributes2)
      {
        AttributeType attributeType2 = schema.getAttributeType(attrName2);
        if (attributeType2.isPlaceHolder())
        {
          throw new ConfigException(
              NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName2));
        }
        if (attributeType1.equals(attributeType2))
        {
          foundAttribute = true;
          break;
        }
      }
      // Found a matching attribute?
      if (!foundAttribute)
      {
        return false;
      }
    }

    return true;
  }

  /**
   * Check that the passed fractional configuration is acceptable
   * regarding configuration syntax, schema constraints, etc.
   * Throws an exception if the configuration is not acceptable.
   * @param configuration The configuration to analyze.
   * @throws org.opends.server.config.ConfigException if the configuration is
   *         not acceptable.
   */
  private static void isFractionalConfigAcceptable(
      ReplicationDomainCfg configuration) throws ConfigException
  {
    /*
     * Parse fractional configuration
     */

    // Read the configuration entry
    FractionalConfig newFractionalConfig = FractionalConfig.toFractionalConfig(configuration);

    if (!newFractionalConfig.isFractional())
    {
      // Nothing to check
      return;
    }

    // Prepare variables to be filled with config
    Map<String, Set<String>> newFractionalSpecificClassesAttributes =
        newFractionalConfig.getFractionalSpecificClassesAttributes();
    Set<String> newFractionalAllClassesAttributes =
        newFractionalConfig.getFractionalAllClassesAttributes();

    /*
     * Check attributes consistency: we only allow filtering MAY (optional)
     * attributes of a class. To be compliant with the schema, no MUST
     * (mandatory) attribute can be filtered by fractional replication.
     */

    // Check consistency of specific classes attributes
    Schema schema = DirectoryServer.getSchema();
    int fractionalMode = newFractionalConfig.fractionalConfigToInt();
    for (String className : newFractionalSpecificClassesAttributes.keySet())
    {
      // Does the class exist?
      ObjectClass fractionalClass = schema.getObjectClass(className);
      if (fractionalClass.isPlaceHolder())
      {
        throw new ConfigException(
            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className));
      }

      boolean isExtensibleObjectClass = fractionalClass.isExtensible();

      Set<String> attributes = newFractionalSpecificClassesAttributes.get(className);

      for (String attrName : attributes)
      {
        // Not a prohibited attribute?
        if (isFractionalProhibitedAttr(attrName))
        {
          throw new ConfigException(
              NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName));
        }

        // Does the attribute exist?
        AttributeType attributeType = schema.getAttributeType(attrName);
        if (!attributeType.isPlaceHolder())
        {
          // No more checking for the extensibleObject class
          if (!isExtensibleObjectClass
              && fractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL
              // Exclusive mode: the attribute must be optional
              && !fractionalClass.isOptional(attributeType))
          {
            throw new ConfigException(
                NOTE_ERR_FRACTIONAL_CONFIG_NOT_OPTIONAL_ATTRIBUTE.get(attrName, className));
          }
        }
        else
        {
          throw new ConfigException(
              NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName));
        }
      }
    }

    // Check consistency of all classes attributes
    for (String attrName : newFractionalAllClassesAttributes)
    {
      // Not a prohibited attribute?
      if (isFractionalProhibitedAttr(attrName))
      {
        throw new ConfigException(
            NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName));
      }

      // Does the attribute exist?
      if (schema.getAttributeType(attrName) == null)
      {
        throw new ConfigException(
            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName));
      }
    }
  }

  /**
   * Test if the passed attribute is not allowed to be used in the configuration of
   * fractional replication.
   * @param attr Attribute to test.
   * @return true if the attribute is prohibited.
   */
  private static boolean isFractionalProhibitedAttr(String attr)
  {
    for (String forbiddenAttr : FRACTIONAL_PROHIBITED_ATTRIBUTES)
    {
      if (forbiddenAttr.equalsIgnoreCase(attr))
      {
        return true;
      }
    }
    return false;
  }

  /**
   * If fractional replication is enabled, this analyzes the operation and
   * suppresses the forbidden attributes in it so that they are not added in
   * the local backend.
   *
   * @param addOperation The operation to modify based on fractional
   *        replication configuration
   * @param performFiltering Tells if the effective attribute filtering should
   *        be performed or if the call is just to analyze if there are some
   *        attributes filtered by fractional configuration
   * @return true if the operation contains some attributes subject to filtering
   *         by the fractional configuration
   */
  private boolean fractionalFilterOperation(
      PreOperationAddOperation addOperation, boolean performFiltering)
  {
    return fractionalRemoveAttributesFromEntry(fractionalConfig,
        addOperation.getEntryDN().rdn(), addOperation.getObjectClasses(),
        addOperation.getUserAttributes(), performFiltering);
  }

  /**
   * If fractional replication is enabled, this analyzes the operation and
   * suppresses the forbidden attributes in it so that they are not added in
   * the local backend.
   *
   * @param modifyDNOperation The operation to modify based on fractional
   *        replication configuration
   * @param performFiltering Tells if the effective modifications should
   *        be performed or if the call is just to analyze if there is some
   *        inconsistency with the fractional configuration
   * @return true if the operation is inconsistent with the fractional
   *         configuration
   */
  private boolean fractionalFilterOperation(
      PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering)
  {
    // Quick exit when filtering is requested and the old RDN values are deleted
    if (performFiltering && modifyDNOperation.deleteOldRDN())
    {
      // The core will remove any occurrence of an attribute that was part of the
      // old RDN, nothing more to do.
      return true; // Will not be used as analyze was not requested
    }

    // Create a list of filtered attributes for this entry
    Entry concernedEntry = modifyDNOperation.getOriginalEntry();
    Set<AttributeType> fractionalConcernedAttributes =
        createFractionalConcernedAttrList(fractionalConfig,
            concernedEntry.getObjectClasses().keySet());

    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
    {
      // No attributes to filter
      return false;
    }

    /*
     * Analyze the old and new RDN to see if there are some attributes to be
     * removed: if the old RDN contains some forbidden attributes (for instance
     * this is possible if the entry was created with an add operation and the
     * RDN used contains a forbidden attribute: in this case the attribute value
     * has been kept to be consistent with the DN of the entry) that are no
     * longer part of the new RDN, we must remove any attribute of this type by
     * adding a modification to delete the attribute.
     */

    boolean inconsistentOperation = false;
    RDN rdn = modifyDNOperation.getEntryDN().rdn();
    RDN newRdn = modifyDNOperation.getNewRDN();

    // Go through each attribute of the old RDN
    for (AVA ava : rdn)
    {
      AttributeType attributeType = ava.getAttributeType();
      // Is it present in the established list of fractional attributes?
      boolean foundAttribute = fractionalConcernedAttributes.contains(attributeType);
      if (canRemoveAttribute(fractionalExclusive, foundAttribute)
          && !newRdn.hasAttributeType(attributeType)
          && !modifyDNOperation.deleteOldRDN())
      {
        /*
         * A forbidden attribute is in the old RDN and no longer in the new RDN,
         * and it has not been requested to remove attributes from the old RDN:
         * let's remove the attribute from the entry to stay consistent with the
         * fractional configuration.
         */
        Modification modification = new Modification(ModificationType.DELETE,
            Attributes.empty(attributeType));
        modifyDNOperation.addModification(modification);
        inconsistentOperation = true;
      }
    }

    return inconsistentOperation;
  }

  /**
   * Remove attributes from an entry, according to the passed fractional
   * configuration. The entry is represented by the 2 passed parameters.
   * The attributes to be removed are removed using the remove method on the
   * passed iterator for the attributes in the entry.
   * @param fractionalConfig The fractional configuration to use
   * @param entryRdn The RDN of the entry to add
   * @param classes The object classes representing the entry to modify
   * @param attributesMap The map of attributes/values to be potentially removed
   *        from the entry.
   * @param performFiltering Tells if the effective attribute filtering should
   *        be performed or if the call is just an analysis to see if there are some
   *        attributes filtered by fractional configuration
   * @return true if the operation contains some attributes subject to filtering
   *         by the fractional configuration
   */
  static boolean fractionalRemoveAttributesFromEntry(
      FractionalConfig fractionalConfig, RDN entryRdn,
      Map<ObjectClass, String> classes, Map<AttributeType, List<Attribute>> attributesMap,
      boolean performFiltering)
  {
    boolean hasSomeAttributesToFilter = false;
    /*
     * Prepare a list of attributes to be included/excluded according to the
     * fractional replication configuration
     */

    Set<AttributeType> fractionalConcernedAttributes =
        createFractionalConcernedAttrList(fractionalConfig, classes.keySet());
    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
    {
      return false; // No attributes to filter
    }

    // Prepare the list of object classes of the added entry
    Set<ObjectClass> entryClasses = classes.keySet();

    /*
     * Go through the user attributes and remove those that match a filtered one:
     * - exclude mode: remove only attributes that are in
     *   fractionalConcernedAttributes
     * - include mode: remove any attribute that is not in
     *   fractionalConcernedAttributes
     */
    List<List<Attribute>> newRdnAttrLists = new ArrayList<>();
    List<AttributeType> rdnAttrTypes = new ArrayList<>();
    final Set<AttributeType> attrTypes = attributesMap.keySet();
    for (Iterator<AttributeType> iter = attrTypes.iterator(); iter.hasNext();)
    {
      AttributeType attributeType = iter.next();

      // Only optional attributes may be removed
      if (isMandatoryAttribute(entryClasses, attributeType)
          // Do not remove an attribute if it is a prohibited one
          || isFractionalProhibited(attributeType)
          || !canRemoveAttribute(attributeType, fractionalExclusive, fractionalConcernedAttributes))
      {
        continue;
      }

      if (!performFiltering)
      {
        // The call was just a check: at least one attribute to filter
        // has been found, return the answer immediately.
        return true;
      }

      // Do not remove an attribute/value that is part of the RDN of the
      // entry as it is forbidden
      if (entryRdn.hasAttributeType(attributeType))
      {
        /*
         * We must remove all values of the attributes map for this
         * attribute type but the one that has the value which is in the RDN
         * of the entry. In fact the (underlying) attribute list does not
         * support remove, so we have to create a new list, keeping only the
         * attribute value which is the same as in the RDN.
         */
        ByteString rdnAttributeValue = entryRdn.getAttributeValue(attributeType);
        List<Attribute> attrList = attributesMap.get(attributeType);
        ByteString sameAttrValue = null;
        // Locate the attribute value identical to the one in the RDN
        for (Attribute attr : attrList)
        {
          if (attr.contains(rdnAttributeValue))
          {
            for (ByteString attrValue : attr)
            {
              if (rdnAttributeValue.equals(attrValue))
              {
                // Keep the value we want
                sameAttrValue = attrValue;
              }
              else
              {
                hasSomeAttributesToFilter = true;
              }
            }
          }
          else
          {
            hasSomeAttributesToFilter = true;
          }
        }
        // Recreate the attribute list with only the RDN attribute value
        if (sameAttrValue != null)
          // Paranoia check: should never be the case as we should always
          // find the attribute/value pair matching the pair in the RDN
        {
          // Construct and store the new attribute list
          newRdnAttrLists.add(Attributes.createAsList(attributeType, sameAttrValue));
          /*
           * Store the matching attribute type.
           * The mapping will be done using the object from rdnAttrTypes as key
           * and the object from newRdnAttrLists (at the same index) as value in
           * the user attribute map to be modified.
           */
          rdnAttrTypes.add(attributeType);
        }
      }
      else
      {
        // Found an attribute to remove, remove it from the list.
        iter.remove();
        hasSomeAttributesToFilter = true;
      }
    }
    // Now overwrite the attribute values for the attribute types present in the
    // RDN, if there are some filtered attributes in the RDN
    for (int index = 0; index < rdnAttrTypes.size(); index++)
    {
      attributesMap.put(rdnAttrTypes.get(index), newRdnAttrLists.get(index));
    }
    return hasSomeAttributesToFilter;
  }

  private static boolean isMandatoryAttribute(Set<ObjectClass> entryClasses, AttributeType attributeType)
  {
    for (ObjectClass objectClass : entryClasses)
    {
      if (objectClass.isRequired(attributeType))
      {
        return true;
      }
    }
    return false;
  }

  private static boolean isFractionalProhibited(AttributeType attrType)
  {
    String attributeName = attrType.getNameOrOID();
    return (attributeName != null && isFractionalProhibitedAttr(attributeName))
        || isFractionalProhibitedAttr(attrType.getOID());
  }

  private static boolean canRemoveAttribute(AttributeType attributeType,
      boolean fractionalExclusive, Set<AttributeType> fractionalConcernedAttributes)
  {
    // Now remove the attribute or modification if:
    // - exclusive mode and the attribute is in the configuration
    // - inclusive mode and the attribute is not in the configuration
    return canRemoveAttribute(fractionalExclusive,
        fractionalConcernedAttributes.contains(attributeType));
  }

  private static boolean canRemoveAttribute(boolean fractionalExclusive, boolean foundAttribute)
  {
    return (foundAttribute && fractionalExclusive)
        || (!foundAttribute && !fractionalExclusive);
  }

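  /*
   * Sketch of the filtering rule above, using assumed example values: with an
   * exclusive configuration listing "jpegPhoto", only jpegPhoto is removed from
   * replicated entries; with an inclusive configuration listing "cn" and "sn",
   * every other removable (optional, non-prohibited) attribute is filtered out.
   */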
  /**
   * Prepares a list of attributes of interest for the fractional feature.
   * @param fractionalConfig The fractional configuration to use
   * @param entryObjectClasses The object classes of an entry on which an
   *        operation is going to be performed.
   * @return The list of attributes of the entry to be excluded/included
   *         when the operation will be performed.
   */
  private static Set<AttributeType> createFractionalConcernedAttrList(
      FractionalConfig fractionalConfig, Set<ObjectClass> entryObjectClasses)
  {
    /*
     * Is the concerned entry of a type concerned by the fractional replication configuration?
     * If yes, add the matching attribute names to a set of attributes
     * to take into account for filtering (inclusive or exclusive mode).
     * Using a Set to avoid duplicate attributes (from 2 inheriting classes for instance).
     */
    Set<String> fractionalConcernedAttributes = new HashSet<>();

    // Get the object classes the entry matches
    Set<String> fractionalAllClassesAttributes =
        fractionalConfig.getFractionalAllClassesAttributes();
    Map<String, Set<String>> fractionalSpecificClassesAttributes =
        fractionalConfig.getFractionalSpecificClassesAttributes();

    Set<String> fractionalClasses = fractionalSpecificClassesAttributes.keySet();
    for (ObjectClass entryObjectClass : entryObjectClasses)
    {
      for (String fractionalClass : fractionalClasses)
      {
        if (entryObjectClass.hasNameOrOID(fractionalClass.toLowerCase()))
        {
          fractionalConcernedAttributes.addAll(
              fractionalSpecificClassesAttributes.get(fractionalClass));
        }
      }
    }

    // Add to the set any attribute which is class independent
    fractionalConcernedAttributes.addAll(fractionalAllClassesAttributes);

    Set<AttributeType> results = new HashSet<>();
    for (String attrName : fractionalConcernedAttributes)
    {
      results.add(DirectoryServer.getSchema().getAttributeType(attrName));
    }
    return results;
  }

  /**
   * If fractional replication is enabled, this analyzes the operation and
   * suppresses the forbidden attributes in it so that they are not added/
   * deleted/modified in the local backend.
   *
   * @param modifyOperation The operation to modify based on fractional
   *        replication configuration
   * @param performFiltering Tells if the effective attribute filtering should
   *        be performed or if the call is just to analyze if there are some
   *        attributes filtered by fractional configuration
   * @return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES,
   *         FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES or FRACTIONAL_BECOME_NO_OP
   */
  private int fractionalFilterOperation(PreOperationModifyOperation modifyOperation,
      boolean performFiltering)
  {
    /*
     * Prepare a list of attributes to be included/excluded according to the
     * fractional replication configuration
     */

    Entry modifiedEntry = modifyOperation.getCurrentEntry();
    Set<AttributeType> fractionalConcernedAttributes =
        createFractionalConcernedAttrList(fractionalConfig, modifiedEntry.getObjectClasses().keySet());
    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
    {
      // No attributes to filter
      return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
    }

    // Prepare the list of object classes of the modified entry
    DN entryToModifyDn = modifyOperation.getEntryDN();
    Entry entryToModify;
    try
    {
      entryToModify = DirectoryServer.getEntry(entryToModifyDn);
    }
    catch (DirectoryException e)
    {
      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
      return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
    }
    Set<ObjectClass> entryClasses = entryToModify.getObjectClasses().keySet();

    /*
     * Now go through the attribute modifications and filter the mods according
     * to the fractional configuration (using the just established concerned
     * attributes list):
     * - delete attributes: remove them if they regard a filtered attribute
     * - add attributes: remove them if they regard a filtered attribute
     * - modify attributes: remove them if they regard a filtered attribute
     */

    int result = FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
    List<Modification> mods = modifyOperation.getModifications();
    Iterator<Modification> modsIt = mods.iterator();
    while (modsIt.hasNext())
    {
      Modification mod = modsIt.next();
      Attribute attr = mod.getAttribute();
      AttributeType attrType = attr.getAttributeDescription().getAttributeType();
      // Fractional replication ignores operational attributes
      if (attrType.isOperational()
          || isMandatoryAttribute(entryClasses, attrType)
          || isFractionalProhibited(attrType)
          || !canRemoveAttribute(attrType, fractionalExclusive, fractionalConcernedAttributes))
      {
        continue;
      }

      if (!performFiltering)
      {
        // The call was just a check: at least one attribute to filter
        // has been found, return the answer immediately.
        return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
      }

      // Found a modification to remove, remove it from the list.
      modsIt.remove();
      result = FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
      if (mods.isEmpty())
      {
        // This operation must become a no-op as there are no more modifications in it
        return FRACTIONAL_BECOME_NO_OP;
      }
    }

    return result;
  }

  /**
   * This is overridden to allow stopping the (online) import process by the
   * fractional ldif import plugin when it detects that the (imported) remote
   * data set is not consistent with the local fractional configuration.
   * {@inheritDoc}
   */
  @Override
  protected byte[] receiveEntryBytes()
  {
    if (isFollowImport())
    {
      // Ok, the next entry is allowed to be received
      return super.receiveEntryBytes();
    }

    // The fractional ldif import plugin detected an inconsistency between the local and
    // remote server fractional configuration and is stopping the import process:
    // this is an error termination during the import.
    // The error is stored and the import is ended by returning null.
    final ImportExportContext ieCtx = getImportExportContext();
    LocalizableMessage msg = null;
    switch (importErrorMessageId)
    {
    case IMPORT_ERROR_MESSAGE_BAD_REMOTE:
      msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE.get(getBaseDN(), ieCtx.getImportSource());
      break;
    case IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL:
      msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL.get(getBaseDN(), ieCtx.getImportSource());
      break;
    }
    ieCtx.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg));
    return null;
  }

  /**
   * This is overridden to allow stopping the (online) export process if the
   * local domain is fractional and the destination is all other servers:
   * it makes no sense to have only fractional servers in a replicated
   * topology. This protects against administrator errors that would
   * lead to data corruption across the whole topology.
1502 * {@inheritDoc} 1503 */ 1504 @Override 1505 protected void initializeRemote(int target, int requestorID, 1506 Task initTask, int initWindow) throws DirectoryException 1507 { 1508 if (target == RoutableMsg.ALL_SERVERS && fractionalConfig.isFractional()) 1509 { 1510 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_FULL_UPDATE_FRACTIONAL.get(getBaseDN(), getServerId()); 1511 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg); 1512 } 1513 1514 super.initializeRemote(target, requestorID, initTask, initWindow); 1515 } 1516 1517 /** 1518 * Implement the handleConflictResolution phase of the deleteOperation. 1519 * 1520 * @param deleteOperation The deleteOperation. 1521 * @return A SynchronizationProviderResult indicating if the operation 1522 * can continue. 1523 */ 1524 SynchronizationProviderResult handleConflictResolution( 1525 PreOperationDeleteOperation deleteOperation) 1526 { 1527 if (!deleteOperation.isSynchronizationOperation() && !brokerIsConnected()) 1528 { 1529 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1530 return new SynchronizationProviderResult.StopProcessing( 1531 ResultCode.UNWILLING_TO_PERFORM, msg); 1532 } 1533 1534 DeleteContext ctx = 1535 (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT); 1536 Entry deletedEntry = deleteOperation.getEntryToDelete(); 1537 1538 if (ctx != null) 1539 { 1540 /* 1541 * This is a replication operation 1542 * Check that the modified entry has the same entryuuid 1543 * as it was in the original message. 1544 */ 1545 String operationEntryUUID = ctx.getEntryUUID(); 1546 String deletedEntryUUID = getEntryUUID(deletedEntry); 1547 if (!operationEntryUUID.equals(deletedEntryUUID)) 1548 { 1549 /* 1550 * The changes entry is not the same entry as the one on 1551 * the original change was performed. 1552 * Probably the original entry was renamed and replaced with 1553 * another entry. 1554 * We must not let the change proceed, return a negative 1555 * result and set the result code to NO_SUCH_OBJECT. 1556 * When the operation will return, the thread that started the operation 1557 * will try to find the correct entry and restart a new operation. 1558 */ 1559 return new SynchronizationProviderResult.StopProcessing( 1560 ResultCode.NO_SUCH_OBJECT, null); 1561 } 1562 } 1563 else 1564 { 1565 // There is no replication context attached to the operation 1566 // so this is not a replication operation. 1567 CSN csn = generateCSN(deleteOperation); 1568 String modifiedEntryUUID = getEntryUUID(deletedEntry); 1569 ctx = new DeleteContext(csn, modifiedEntryUUID); 1570 deleteOperation.setAttachment(SYNCHROCONTEXT, ctx); 1571 1572 synchronized (replayOperations) 1573 { 1574 int size = replayOperations.size(); 1575 if (size >= 10000) 1576 { 1577 replayOperations.remove(replayOperations.firstKey()); 1578 } 1579 FakeOperation op = new FakeDelOperation( 1580 deleteOperation.getEntryDN(), csn, modifiedEntryUUID); 1581 replayOperations.put(csn, op); 1582 } 1583 } 1584 1585 return new SynchronizationProviderResult.ContinueProcessing(); 1586 } 1587 1588 /** 1589 * Implement the handleConflictResolution phase of the addOperation. 1590 * 1591 * @param addOperation The AddOperation. 1592 * @return A SynchronizationProviderResult indicating if the operation 1593 * can continue. 
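   *         (As implemented below: a StopProcessing result with NO_OPERATION means the add has
   *         already been replayed, while NO_SUCH_OBJECT is returned when the parent entry cannot
   *         be found under its expected DN, leaving conflict or dependency resolution to handle it.)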
1594 */ 1595 SynchronizationProviderResult handleConflictResolution( 1596 PreOperationAddOperation addOperation) 1597 { 1598 if (!addOperation.isSynchronizationOperation() && !brokerIsConnected()) 1599 { 1600 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1601 return new SynchronizationProviderResult.StopProcessing( 1602 ResultCode.UNWILLING_TO_PERFORM, msg); 1603 } 1604 1605 if (fractionalConfig.isFractional()) 1606 { 1607 if (addOperation.isSynchronizationOperation()) 1608 { 1609 /* 1610 * Filter attributes here for fractional replication. If fractional 1611 * replication is enabled, we analyze the operation to suppress the 1612 * forbidden attributes in it so that they are not added in the local 1613 * backend. This must be called before any other plugin is called, to 1614 * keep coherency across plugin calls. 1615 */ 1616 fractionalFilterOperation(addOperation, true); 1617 } 1618 else 1619 { 1620 /* 1621 * Direct access from an LDAP client : if some attributes are to be 1622 * removed according to the fractional configuration, simply forbid 1623 * the operation 1624 */ 1625 if (fractionalFilterOperation(addOperation, false)) 1626 { 1627 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), addOperation); 1628 return new SynchronizationProviderResult.StopProcessing( 1629 ResultCode.UNWILLING_TO_PERFORM, msg); 1630 } 1631 } 1632 } 1633 1634 if (addOperation.isSynchronizationOperation()) 1635 { 1636 AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT); 1637 /* 1638 * If an entry with the same entry uniqueID already exist then 1639 * this operation has already been replayed in the past. 1640 */ 1641 String uuid = ctx.getEntryUUID(); 1642 if (findEntryDN(uuid) != null) 1643 { 1644 return new SynchronizationProviderResult.StopProcessing( 1645 ResultCode.NO_OPERATION, null); 1646 } 1647 1648 /* The parent entry may have been renamed here since the change was done 1649 * on the first server, and another entry have taken the former dn 1650 * of the parent entry 1651 */ 1652 1653 String parentEntryUUID = ctx.getParentEntryUUID(); 1654 // root entry have no parent, there is no need to check for it. 1655 if (parentEntryUUID != null) 1656 { 1657 // There is a potential of perfs improvement here 1658 // if we could avoid the following parent entry retrieval 1659 DN parentDnFromCtx = findEntryDN(ctx.getParentEntryUUID()); 1660 if (parentDnFromCtx == null) 1661 { 1662 // The parent does not exist with the specified unique id 1663 // stop the operation with NO_SUCH_OBJECT and let the 1664 // conflict resolution or the dependency resolution solve this. 1665 return new SynchronizationProviderResult.StopProcessing( 1666 ResultCode.NO_SUCH_OBJECT, null); 1667 } 1668 1669 DN entryDN = addOperation.getEntryDN(); 1670 DN parentDnFromEntryDn = DirectoryServer.getParentDNInSuffix(entryDN); 1671 if (parentDnFromEntryDn != null 1672 && !parentDnFromCtx.equals(parentDnFromEntryDn)) 1673 { 1674 // parentEntry has been renamed 1675 // replication name conflict resolution is expected to fix that 1676 // later in the flow 1677 return new SynchronizationProviderResult.StopProcessing( 1678 ResultCode.NO_SUCH_OBJECT, null); 1679 } 1680 } 1681 } 1682 return new SynchronizationProviderResult.ContinueProcessing(); 1683 } 1684 1685 /** 1686 * Check that the broker associated to this ReplicationDomain has found 1687 * a Replication Server and that this LDAP server is therefore able to 1688 * process operations. 
1689 * If not, set the ResultCode, the response message, 1690 * interrupt the operation, and return false 1691 * 1692 * @return true when it OK to process the Operation, false otherwise. 1693 * When false is returned the resultCode and the response message 1694 * is also set in the Operation. 1695 */ 1696 private boolean brokerIsConnected() 1697 { 1698 final IsolationPolicy isolationPolicy = config.getIsolationPolicy(); 1699 if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) 1700 { 1701 // this policy imply that we always accept updates. 1702 return true; 1703 } 1704 if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)) 1705 { 1706 // this isolation policy specifies that the updates are denied 1707 // when the broker had problems during the connection phase 1708 // Updates are still accepted if the broker is currently connecting.. 1709 return !hasConnectionError(); 1710 } 1711 // we should never get there as the only possible policies are 1712 // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES 1713 return true; 1714 } 1715 1716 /** 1717 * Implement the handleConflictResolution phase of the ModifyDNOperation. 1718 * 1719 * @param modifyDNOperation The ModifyDNOperation. 1720 * @return A SynchronizationProviderResult indicating if the operation 1721 * can continue. 1722 */ 1723 SynchronizationProviderResult handleConflictResolution( 1724 PreOperationModifyDNOperation modifyDNOperation) 1725 { 1726 if (!modifyDNOperation.isSynchronizationOperation() && !brokerIsConnected()) 1727 { 1728 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1729 return new SynchronizationProviderResult.StopProcessing( 1730 ResultCode.UNWILLING_TO_PERFORM, msg); 1731 } 1732 1733 if (fractionalConfig.isFractional()) 1734 { 1735 if (modifyDNOperation.isSynchronizationOperation()) 1736 { 1737 /* 1738 * Filter operation here for fractional replication. If fractional 1739 * replication is enabled, we analyze the operation and modify it if 1740 * necessary to stay consistent with what is defined in fractional 1741 * configuration. 1742 */ 1743 fractionalFilterOperation(modifyDNOperation, true); 1744 } 1745 else 1746 { 1747 /* 1748 * Direct access from an LDAP client : something is inconsistent with 1749 * the fractional configuration, forbid the operation. 1750 */ 1751 if (fractionalFilterOperation(modifyDNOperation, false)) 1752 { 1753 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyDNOperation); 1754 return new SynchronizationProviderResult.StopProcessing( 1755 ResultCode.UNWILLING_TO_PERFORM, msg); 1756 } 1757 } 1758 } 1759 1760 ModifyDnContext ctx = 1761 (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT); 1762 if (ctx != null) 1763 { 1764 /* 1765 * This is a replication operation 1766 * Check that the modified entry has the same entryuuid 1767 * as was in the original message. 1768 */ 1769 final String modifiedEntryUUID = 1770 getEntryUUID(modifyDNOperation.getOriginalEntry()); 1771 if (!modifiedEntryUUID.equals(ctx.getEntryUUID())) 1772 { 1773 /* 1774 * The modified entry is not the same entry as the one on 1775 * the original change was performed. 1776 * Probably the original entry was renamed and replaced with 1777 * another entry. 1778 * We must not let the change proceed, return a negative 1779 * result and set the result code to NO_SUCH_OBJECT. 1780 * When the operation will return, the thread that started the operation 1781 * will try to find the correct entry and restart a new operation. 
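       * (Illustrative example: the original ModifyDN targeted the entry whose entryUUID is X,
       * then located at cn=foo; cn=foo now holds a different entry because of a concurrent
       * rename, so the replay is stopped here and later retried against the DN currently
       * associated with entryUUID X.)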
1782 */ 1783 return new SynchronizationProviderResult.StopProcessing( 1784 ResultCode.NO_SUCH_OBJECT, null); 1785 } 1786 1787 if (modifyDNOperation.getNewSuperior() != null) 1788 { 1789 /* 1790 * Also check that the current id of the 1791 * parent is the same as when the operation was performed. 1792 */ 1793 String newParentId = findEntryUUID(modifyDNOperation.getNewSuperior()); 1794 if (newParentId != null && ctx.getNewSuperiorEntryUUID() != null 1795 && !newParentId.equals(ctx.getNewSuperiorEntryUUID())) 1796 { 1797 return new SynchronizationProviderResult.StopProcessing( 1798 ResultCode.NO_SUCH_OBJECT, null); 1799 } 1800 } 1801 1802 /* 1803 * If the object has been renamed more recently than this 1804 * operation, cancel the operation. 1805 */ 1806 EntryHistorical hist = newInstanceFromEntry(modifyDNOperation.getOriginalEntry()); 1807 if (hist.addedOrRenamedAfter(ctx.getCSN())) 1808 { 1809 return new SynchronizationProviderResult.StopProcessing( 1810 ResultCode.NO_OPERATION, null); 1811 } 1812 } 1813 else 1814 { 1815 // There is no replication context attached to the operation 1816 // so this is not a replication operation. 1817 CSN csn = generateCSN(modifyDNOperation); 1818 String newParentId = null; 1819 if (modifyDNOperation.getNewSuperior() != null) 1820 { 1821 newParentId = findEntryUUID(modifyDNOperation.getNewSuperior()); 1822 } 1823 1824 Entry modifiedEntry = modifyDNOperation.getOriginalEntry(); 1825 String modifiedEntryUUID = getEntryUUID(modifiedEntry); 1826 ctx = new ModifyDnContext(csn, modifiedEntryUUID, newParentId); 1827 modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx); 1828 } 1829 return new SynchronizationProviderResult.ContinueProcessing(); 1830 } 1831 1832 /** 1833 * Handle the conflict resolution. 1834 * Called by the core server after locking the entry and before 1835 * starting the actual modification. 1836 * @param modifyOperation the operation 1837 * @return code indicating is operation must proceed 1838 */ 1839 SynchronizationProviderResult handleConflictResolution( 1840 PreOperationModifyOperation modifyOperation) 1841 { 1842 if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected()) 1843 { 1844 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1845 return new SynchronizationProviderResult.StopProcessing( 1846 ResultCode.UNWILLING_TO_PERFORM, msg); 1847 } 1848 1849 if (fractionalConfig.isFractional()) 1850 { 1851 if (modifyOperation.isSynchronizationOperation()) 1852 { 1853 /* 1854 * Filter attributes here for fractional replication. If fractional 1855 * replication is enabled, we analyze the operation and modify it so 1856 * that no forbidden attribute is added/modified/deleted in the local 1857 * backend. This must be called before any other plugin is called, to 1858 * keep coherency across plugin calls. 
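         * (For example, with a fractional configuration that filters out a given attribute, a
         * replicated MODIFY touching only that attribute has all of its modifications stripped
         * and is turned into a no-op just below.)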
1859 */ 1860 if (fractionalFilterOperation(modifyOperation, true) == 1861 FRACTIONAL_BECOME_NO_OP) 1862 { 1863 // Every modifications filtered in this operation: the operation 1864 // becomes a no-op 1865 return new SynchronizationProviderResult.StopProcessing( 1866 ResultCode.NO_OPERATION, null); 1867 } 1868 } 1869 else 1870 { 1871 /* 1872 * Direct access from an LDAP client : if some attributes are to be 1873 * removed according to the fractional configuration, simply forbid 1874 * the operation 1875 */ 1876 switch(fractionalFilterOperation(modifyOperation, false)) 1877 { 1878 case FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES: 1879 // Ok, let the operation happen 1880 break; 1881 case FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES: 1882 // Some attributes not compliant with fractional configuration : 1883 // forbid the operation 1884 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyOperation); 1885 return new SynchronizationProviderResult.StopProcessing( 1886 ResultCode.UNWILLING_TO_PERFORM, msg); 1887 } 1888 } 1889 } 1890 1891 ModifyContext ctx = 1892 (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT); 1893 1894 Entry modifiedEntry = modifyOperation.getModifiedEntry(); 1895 if (ctx == null) 1896 { 1897 // No replication ctx attached => not a replicated operation 1898 // - create a ctx with : CSN, entryUUID 1899 // - attach the context to the op 1900 1901 CSN csn = generateCSN(modifyOperation); 1902 ctx = new ModifyContext(csn, getEntryUUID(modifiedEntry)); 1903 1904 modifyOperation.setAttachment(SYNCHROCONTEXT, ctx); 1905 } 1906 else 1907 { 1908 // Replication ctx attached => this is a replicated operation being 1909 // replayed here, it is necessary to 1910 // - check if the entry has been renamed 1911 // - check for conflicts 1912 String modifiedEntryUUID = ctx.getEntryUUID(); 1913 String currentEntryUUID = getEntryUUID(modifiedEntry); 1914 if (currentEntryUUID != null 1915 && !currentEntryUUID.equals(modifiedEntryUUID)) 1916 { 1917 /* 1918 * The current modified entry is not the same entry as the one on 1919 * the original modification was performed. 1920 * Probably the original entry was renamed and replaced with 1921 * another entry. 1922 * We must not let the modification proceed, return a negative 1923 * result and set the result code to NO_SUCH_OBJECT. 1924 * When the operation will return, the thread that started the 1925 * operation will try to find the correct entry and restart a new 1926 * operation. 1927 */ 1928 return new SynchronizationProviderResult.StopProcessing( 1929 ResultCode.NO_SUCH_OBJECT, null); 1930 } 1931 1932 // Solve the conflicts between modify operations 1933 EntryHistorical historicalInformation = newInstanceFromEntry(modifiedEntry); 1934 modifyOperation.setAttachment(HISTORICAL_ATTACHMENT_NAME, historicalInformation); 1935 1936 if (historicalInformation.replayOperation(modifyOperation, modifiedEntry)) 1937 { 1938 numResolvedModifyConflicts.incrementAndGet(); 1939 } 1940 } 1941 return new SynchronizationProviderResult.ContinueProcessing(); 1942 } 1943 1944 /** 1945 * The preOperation phase for the add Operation. 1946 * Its job is to generate the replication context associated to the 1947 * operation. It is necessary to do it in this phase because contrary to 1948 * the other operations, the entry UUID is not set when the handleConflict 1949 * phase is called. 1950 * 1951 * @param addOperation The Add Operation. 
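   *        (The attached AddContext carries the generated CSN, the entry's entryUUID and the
   *        parent entry's entryUUID; the conflict resolution code relies on these values when
   *        the add is replayed on other replicas.)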
1952 */ 1953 void doPreOperation(PreOperationAddOperation addOperation) 1954 { 1955 final CSN csn = generateCSN(addOperation); 1956 final String entryUUID = getEntryUUID(addOperation); 1957 final AddContext ctx = new AddContext(csn, entryUUID, 1958 findEntryUUID(DirectoryServer.getParentDNInSuffix(addOperation.getEntryDN()))); 1959 addOperation.setAttachment(SYNCHROCONTEXT, ctx); 1960 } 1961 1962 @Override 1963 public void publishReplicaOfflineMsg() 1964 { 1965 pendingChanges.putReplicaOfflineMsg(); 1966 dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN()); 1967 } 1968 1969 /** 1970 * Check if an operation must be synchronized. 1971 * Also update the list of pending changes and the server RUV 1972 * @param op the operation 1973 */ 1974 void synchronize(PostOperationOperation op) 1975 { 1976 ResultCode result = op.getResultCode(); 1977 // Note that a failed non-replication operation might not have a change 1978 // number. 1979 CSN curCSN = OperationContext.getCSN(op); 1980 if (curCSN != null && config.isLogChangenumber()) 1981 { 1982 op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(), 1983 "replicationCSN", curCSN)); 1984 } 1985 1986 if (result == ResultCode.SUCCESS) 1987 { 1988 if (op.isSynchronizationOperation()) 1989 { // Replaying a sync operation 1990 numReplayedPostOpCalled.incrementAndGet(); 1991 try 1992 { 1993 remotePendingChanges.commit(curCSN); 1994 } 1995 catch (NoSuchElementException e) 1996 { 1997 logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN); 1998 return; 1999 } 2000 } 2001 else 2002 { 2003 // Generate a replication message for a successful non-replication 2004 // operation. 2005 LDAPUpdateMsg msg = LDAPUpdateMsg.generateMsg(op); 2006 2007 if (msg == null) 2008 { 2009 /* 2010 * This is an operation type that we do not know about 2011 * It should never happen. 2012 */ 2013 pendingChanges.remove(curCSN); 2014 logger.error(ERR_UNKNOWN_TYPE, op.getOperationType()); 2015 return; 2016 } 2017 2018 addEntryAttributesForCL(msg,op); 2019 2020 // If assured replication is configured, this will prepare blocking 2021 // mechanism. If assured replication is disabled, this returns 2022 // immediately 2023 prepareWaitForAckIfAssuredEnabled(msg); 2024 try 2025 { 2026 msg.encode(); 2027 pendingChanges.commitAndPushCommittedChanges(curCSN, msg); 2028 } 2029 catch (NoSuchElementException e) 2030 { 2031 logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN); 2032 return; 2033 } 2034 // If assured replication is enabled, this will wait for the matching 2035 // ack or time out. If assured replication is disabled, this returns 2036 // immediately 2037 try 2038 { 2039 waitForAckIfAssuredEnabled(msg); 2040 } catch (TimeoutException ex) 2041 { 2042 // This exception may only be raised if assured replication is enabled 2043 logger.info(NOTE_DS_ACK_TIMEOUT, getBaseDN(), getAssuredTimeout(), msg); 2044 } 2045 } 2046 2047 /* 2048 * If the operation is a DELETE on the base entry of the suffix 2049 * that is replicated, the generation is now lost because the 2050 * DB is empty. We need to save it again the next time we add an entry. 
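       * (This is why the generationIdSavedStatus flag is cleared just below when the deleted
       * entry is the base entry of the domain, so that saveGenerationId() is attempted again.)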
2051 */ 2052 if (OperationType.DELETE.equals(op.getOperationType()) 2053 && ((PostOperationDeleteOperation) op) 2054 .getEntryDN().equals(getBaseDN())) 2055 { 2056 generationIdSavedStatus = false; 2057 } 2058 2059 if (!generationIdSavedStatus) 2060 { 2061 saveGenerationId(generationId); 2062 } 2063 } 2064 else if (!op.isSynchronizationOperation() && curCSN != null) 2065 { 2066 // Remove an unsuccessful non-replication operation from the pending 2067 // changes list. 2068 pendingChanges.remove(curCSN); 2069 pendingChanges.pushCommittedChanges(); 2070 } 2071 2072 checkForClearedConflict(op); 2073 } 2074 2075 /** 2076 * Check if the operation that just happened has cleared a conflict : 2077 * Clearing a conflict happens if the operation has free a DN that 2078 * for which an other entry was in conflict. 2079 * Steps: 2080 * - get the DN freed by a DELETE or MODRDN op 2081 * - search for entries put in the conflict space (dn=entryUUID'+'....) 2082 * because the expected DN was not available (ds-sync-conflict=expected DN) 2083 * - retain the entry with the oldest conflict 2084 * - rename this entry with the freedDN as it was expected originally 2085 */ 2086 private void checkForClearedConflict(PostOperationOperation op) 2087 { 2088 OperationType type = op.getOperationType(); 2089 if (op.getResultCode() != ResultCode.SUCCESS) 2090 { 2091 // those operations cannot have cleared a conflict 2092 return; 2093 } 2094 2095 DN freedDN; 2096 if (type == OperationType.DELETE) 2097 { 2098 freedDN = ((PostOperationDeleteOperation) op).getEntryDN(); 2099 } 2100 else if (type == OperationType.MODIFY_DN) 2101 { 2102 freedDN = ((PostOperationModifyDNOperation) op).getEntryDN(); 2103 } 2104 else 2105 { 2106 return; 2107 } 2108 2109 SearchFilter filter; 2110 try 2111 { 2112 filter = LDAPFilter.createEqualityFilter(DS_SYNC_CONFLICT, 2113 ByteString.valueOfUtf8(freedDN.toString())).toSearchFilter(); 2114 } 2115 catch (DirectoryException e) 2116 { 2117 // can not happen? 2118 logger.traceException(e); 2119 return; 2120 } 2121 2122 SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter) 2123 .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS); 2124 InternalSearchOperation searchOp = conn.processSearch(request); 2125 2126 Entry entryToRename = null; 2127 CSN entryToRenameCSN = null; 2128 for (SearchResultEntry entry : searchOp.getSearchEntries()) 2129 { 2130 EntryHistorical history = newInstanceFromEntry(entry); 2131 if (entryToRename == null) 2132 { 2133 entryToRename = entry; 2134 entryToRenameCSN = history.getDNDate(); 2135 } 2136 else if (!history.addedOrRenamedAfter(entryToRenameCSN)) 2137 { 2138 // this conflict is older than the previous, keep it. 2139 entryToRename = entry; 2140 entryToRenameCSN = history.getDNDate(); 2141 } 2142 } 2143 2144 if (entryToRename != null) 2145 { 2146 DN entryDN = entryToRename.getName(); 2147 ModifyDNOperation newOp = renameEntry( 2148 entryDN, freedDN.rdn(), freedDN.parent(), false); 2149 2150 ResultCode res = newOp.getResultCode(); 2151 if (res != ResultCode.SUCCESS) 2152 { 2153 logger.error(ERR_COULD_NOT_SOLVE_CONFLICT, entryDN, res); 2154 } 2155 } 2156 } 2157 2158 /** 2159 * Rename an Entry Using a synchronization, non-replicated operation. 2160 * This method should be used instead of the InternalConnection methods 2161 * when the operation that need to be run must be local only and therefore 2162 * not replicated to the RS. 2163 * 2164 * @param targetDN The DN of the entry to rename. 2165 * @param newRDN The new RDN to be used. 
2166 * @param parentDN The parentDN to be used. 2167 * @param markConflict A boolean indicating is this entry should be marked 2168 * as a conflicting entry. In such case the 2169 * DS_SYNC_CONFLICT attribute will be added to the entry 2170 * with the value of its original DN. 2171 * If false, the DS_SYNC_CONFLICT attribute will be 2172 * cleared. 2173 * 2174 * @return The operation that was run to rename the entry. 2175 */ 2176 private ModifyDNOperation renameEntry(DN targetDN, RDN newRDN, DN parentDN, 2177 boolean markConflict) 2178 { 2179 ModifyDNOperation newOp = new ModifyDNOperationBasis( 2180 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 2181 targetDN, newRDN, false, parentDN); 2182 2183 if (markConflict) 2184 { 2185 Attribute attr = Attributes.create(DS_SYNC_CONFLICT, targetDN.toString()); 2186 newOp.addModification(new Modification(ModificationType.REPLACE, attr)); 2187 } 2188 else 2189 { 2190 Attribute attr = Attributes.empty(DS_SYNC_CONFLICT); 2191 newOp.addModification(new Modification(ModificationType.DELETE, attr)); 2192 } 2193 2194 runAsSynchronizedOperation(newOp); 2195 return newOp; 2196 } 2197 2198 private void runAsSynchronizedOperation(Operation op) 2199 { 2200 op.setInternalOperation(true); 2201 op.setSynchronizationOperation(true); 2202 op.setDontSynchronize(true); 2203 op.run(); 2204 } 2205 2206 /** Delete this ReplicationDomain. */ 2207 void delete() 2208 { 2209 shutdown(); 2210 removeECLDomainCfg(); 2211 } 2212 2213 /** Shutdown this ReplicationDomain. */ 2214 public void shutdown() 2215 { 2216 if (shutdown.compareAndSet(false, true)) 2217 { 2218 final RSUpdater rsUpdater = this.rsUpdater.get(); 2219 if (rsUpdater != null) 2220 { 2221 rsUpdater.initiateShutdown(); 2222 } 2223 2224 // stop the thread in charge of flushing the ServerState. 2225 if (flushThread != null) 2226 { 2227 flushThread.initiateShutdown(); 2228 synchronized (flushThread) 2229 { 2230 flushThread.notify(); 2231 } 2232 } 2233 2234 DirectoryServer.deregisterAlertGenerator(this); 2235 DirectoryServer.deregisterBackendInitializationListener(this); 2236 DirectoryServer.deregisterShutdownListener(this); 2237 2238 // stop the ReplicationDomain 2239 disableService(); 2240 } 2241 2242 // wait for completion of the ServerStateFlush thread. 2243 try 2244 { 2245 while (!done) 2246 { 2247 Thread.sleep(50); 2248 } 2249 } catch (InterruptedException e) 2250 { 2251 Thread.currentThread().interrupt(); 2252 } 2253 } 2254 2255 /** 2256 * Marks the specified message as the one currently processed by a replay thread. 2257 * @param msg the message being processed 2258 */ 2259 void markInProgress(LDAPUpdateMsg msg) 2260 { 2261 remotePendingChanges.markInProgress(msg); 2262 } 2263 2264 /** 2265 * Create and replay a synchronized Operation from an UpdateMsg. 2266 * 2267 * @param msg 2268 * The UpdateMsg to be replayed. 2269 * @param shutdown 2270 * whether the server initiated shutdown 2271 */ 2272 void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown) 2273 { 2274 // Try replay the operation, then flush (replaying) any pending operation 2275 // whose dependency has been replayed until no more left. 2276 do 2277 { 2278 Operation op = null; // the last operation on which replay was attempted 2279 boolean dependency = false; 2280 String replayErrorMsg = null; 2281 CSN csn = null; 2282 try 2283 { 2284 // The next operation for which to attempt replay. 2285 // This local variable allow to keep error messages in the "op" local 2286 // variable until the next loop iteration starts. 
2287 // "op" is already initialized to the next Operation because of the 2288 // error handling paths. 2289 Operation nextOp = op = msg.createOperation(conn); 2290 dependency = remotePendingChanges.checkDependencies(op, msg); 2291 boolean replayDone = false; 2292 int retryCount = 10; 2293 while (!dependency && !replayDone && retryCount-- > 0) 2294 { 2295 if (shutdown.get()) 2296 { 2297 // shutdown initiated, let's leave 2298 return; 2299 } 2300 // Try replay the operation 2301 op = nextOp; 2302 op.setInternalOperation(true); 2303 op.setSynchronizationOperation(true); 2304 2305 // Always add the ManageDSAIT control so that updates to referrals 2306 // are processed locally. 2307 op.addRequestControl(new LDAPControl(OID_MANAGE_DSAIT_CONTROL)); 2308 2309 // Add the permissive modify control in order to handle updates to values which are already present 2310 // in the entry based on equality matching, but whose raw content differs. e.g., changing the case 2311 // or a case-insensitive value. See OPENDJ-2792 2312 if (op instanceof ModifyOperation) 2313 { 2314 op.addRequestControl(new LDAPControl(OID_PERMISSIVE_MODIFY_CONTROL)); 2315 } 2316 2317 csn = OperationContext.getCSN(op); 2318 op.run(); 2319 2320 ResultCode result = op.getResultCode(); 2321 2322 if (result != ResultCode.SUCCESS) 2323 { 2324 if (result == ResultCode.NO_OPERATION) 2325 { 2326 // Pre-operation conflict resolution detected that the operation 2327 // was a no-op. For example, an add which has already been 2328 // replayed, or a modify DN operation on an entry which has been 2329 // renamed by a more recent modify DN. 2330 replayDone = true; 2331 } 2332 else if (result == ResultCode.BUSY) 2333 { 2334 /* 2335 * We probably could not get a lock (OPENDJ-885). Give the server 2336 * another chance to process this operation immediately. 2337 */ 2338 Thread.yield(); 2339 continue; 2340 } 2341 else if (result == ResultCode.UNAVAILABLE) 2342 { 2343 /* 2344 * It can happen when a rebuild is performed or the backend is 2345 * offline (OPENDJ-49). Give the server another chance to process 2346 * this operation after some time. 2347 */ 2348 Thread.sleep(50); 2349 continue; 2350 } 2351 else if (op instanceof ModifyOperation) 2352 { 2353 ModifyOperation castOp = (ModifyOperation) op; 2354 dependency = remotePendingChanges.checkDependencies(castOp); 2355 ModifyMsg modifyMsg = (ModifyMsg) msg; 2356 replayDone = !dependency && solveNamingConflict(castOp, modifyMsg); 2357 } 2358 else if (op instanceof DeleteOperation) 2359 { 2360 DeleteOperation castOp = (DeleteOperation) op; 2361 dependency = remotePendingChanges.checkDependencies(castOp); 2362 replayDone = !dependency && solveNamingConflict(castOp, msg); 2363 } 2364 else if (op instanceof AddOperation) 2365 { 2366 AddOperation castOp = (AddOperation) op; 2367 AddMsg addMsg = (AddMsg) msg; 2368 dependency = remotePendingChanges.checkDependencies(castOp); 2369 replayDone = !dependency && solveNamingConflict(castOp, addMsg); 2370 } 2371 else if (op instanceof ModifyDNOperation) 2372 { 2373 ModifyDNOperation castOp = (ModifyDNOperation) op; 2374 ModifyDNMsg modifyDNMsg = (ModifyDNMsg) msg; 2375 dependency = remotePendingChanges.checkDependencies(modifyDNMsg); 2376 replayDone = !dependency && solveNamingConflict(castOp, modifyDNMsg); 2377 } 2378 else 2379 { 2380 replayDone = true; // unknown type of operation ?! 2381 } 2382 2383 if (replayDone) 2384 { 2385 // the update became a dummy update and the result 2386 // of the conflict resolution phase is to do nothing. 
2387 // however we still need to push this change to the serverState 2388 updateError(csn); 2389 } 2390 else 2391 { 2392 /* 2393 * Create a new operation reflecting the new state of the UpdateMsg after conflict resolution 2394 * modified it and try replaying it again. Dependencies might have been replayed by now. 2395 * Note: When msg is a DeleteMsg, the DeleteOperation is properly 2396 * created with subtreeDelete request control when needed. 2397 */ 2398 nextOp = msg.createOperation(conn); 2399 } 2400 } 2401 else 2402 { 2403 replayDone = true; 2404 } 2405 } 2406 2407 if (!replayDone && !dependency) 2408 { 2409 // Continue with the next change but the servers could now become 2410 // inconsistent. 2411 // Let the repair tool know about this. 2412 final LocalizableMessage message = ERR_LOOP_REPLAYING_OPERATION.get( 2413 op, op.getErrorMessage()); 2414 logger.error(message); 2415 numUnresolvedNamingConflicts.incrementAndGet(); 2416 replayErrorMsg = message.toString(); 2417 updateError(csn); 2418 } 2419 } catch (DecodeException | LDAPException | DataFormatException e) 2420 { 2421 replayErrorMsg = logDecodingOperationError(msg, e); 2422 } catch (Exception e) 2423 { 2424 if (csn != null) 2425 { 2426 /* 2427 * An Exception happened during the replay process. 2428 * Continue with the next change but the servers will now start 2429 * to be inconsistent. 2430 * Let the repair tool know about this. 2431 */ 2432 LocalizableMessage message = 2433 ERR_EXCEPTION_REPLAYING_OPERATION.get( 2434 stackTraceToSingleLineString(e), op); 2435 logger.error(message); 2436 replayErrorMsg = message.toString(); 2437 updateError(csn); 2438 } else 2439 { 2440 replayErrorMsg = logDecodingOperationError(msg, e); 2441 } 2442 } finally 2443 { 2444 if (!dependency) 2445 { 2446 processUpdateDone(msg, replayErrorMsg); 2447 } 2448 } 2449 2450 // Now replay any pending update that had a dependency and whose 2451 // dependency has been replayed, do that until no more updates of that 2452 // type left... 2453 msg = remotePendingChanges.getNextUpdate(); 2454 } while (msg != null); 2455 } 2456 2457 private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e) 2458 { 2459 LocalizableMessage message = 2460 ERR_EXCEPTION_DECODING_OPERATION.get(msg + " " + stackTraceToSingleLineString(e)); 2461 logger.error(message); 2462 return message.toString(); 2463 } 2464 2465 /** 2466 * This method is called when an error happens while replaying 2467 * an operation. 2468 * It is necessary because the postOperation does not always get 2469 * called when error or Exceptions happen during the operation replay. 2470 * 2471 * @param csn the CSN of the operation with error. 2472 */ 2473 private void updateError(CSN csn) 2474 { 2475 try 2476 { 2477 remotePendingChanges.commit(csn); 2478 } 2479 catch (NoSuchElementException e) 2480 { 2481 // A failure occurred after the change had been removed from the pending 2482 // changes table. 2483 if (logger.isTraceEnabled()) 2484 { 2485 logger.trace( 2486 "LDAPReplicationDomain.updateError: Unable to find remote " 2487 + "pending change for CSN %s", csn); 2488 } 2489 } 2490 } 2491 2492 /** 2493 * Generate a new CSN and insert it in the pending list. 2494 * 2495 * @param operation 2496 * The operation for which the CSN must be generated. 2497 * @return The new CSN. 
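   *         (The CSN is allocated by pendingChanges.putLocalOperation(), which also registers the
   *         operation in the pending changes list so that synchronize() can later commit it or
   *         remove it on failure.)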
2498 */ 2499 private CSN generateCSN(PluginOperation operation) 2500 { 2501 return pendingChanges.putLocalOperation(operation); 2502 } 2503 2504 /** 2505 * Find the Unique Id of the entry with the provided DN by doing a 2506 * search of the entry and extracting its entryUUID from its attributes. 2507 * 2508 * @param dn The dn of the entry for which the unique Id is searched. 2509 * 2510 * @return The unique Id of the entry with the provided DN. 2511 */ 2512 static String findEntryUUID(DN dn) 2513 { 2514 if (dn == null) 2515 { 2516 return null; 2517 } 2518 final SearchRequest request = newSearchRequest(dn, SearchScope.BASE_OBJECT) 2519 .addAttribute(ENTRYUUID_ATTRIBUTE_NAME); 2520 final InternalSearchOperation search = getRootConnection().processSearch(request); 2521 final SearchResultEntry resultEntry = getFirstResult(search); 2522 if (resultEntry != null) 2523 { 2524 return getEntryUUID(resultEntry); 2525 } 2526 return null; 2527 } 2528 2529 private static SearchResultEntry getFirstResult(InternalSearchOperation search) 2530 { 2531 if (search.getResultCode() == ResultCode.SUCCESS) 2532 { 2533 final LinkedList<SearchResultEntry> results = search.getSearchEntries(); 2534 if (!results.isEmpty()) 2535 { 2536 return results.getFirst(); 2537 } 2538 } 2539 return null; 2540 } 2541 2542 /** 2543 * Find the current DN of an entry from its entry UUID. 2544 * 2545 * @param uuid the Entry Unique ID. 2546 * @return The current DN of the entry or null if there is no entry with 2547 * the specified UUID. 2548 */ 2549 private DN findEntryDN(String uuid) 2550 { 2551 try 2552 { 2553 final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, "entryuuid=" + uuid); 2554 InternalSearchOperation search = conn.processSearch(request); 2555 final SearchResultEntry resultEntry = getFirstResult(search); 2556 if (resultEntry != null) 2557 { 2558 return resultEntry.getName(); 2559 } 2560 } 2561 catch (DirectoryException e) 2562 { 2563 // never happens because the filter is always valid. 2564 } 2565 return null; 2566 } 2567 2568 /** 2569 * Solve a conflict detected when replaying a modify operation. 2570 * 2571 * @param op The operation that triggered the conflict detection. 2572 * @param msg The operation that triggered the conflict detection. 2573 * @return true if the process is completed, false if it must continue.. 2574 */ 2575 private boolean solveNamingConflict(ModifyOperation op, ModifyMsg msg) 2576 { 2577 ResultCode result = op.getResultCode(); 2578 ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT); 2579 String entryUUID = ctx.getEntryUUID(); 2580 2581 if (result == ResultCode.NO_SUCH_OBJECT) 2582 { 2583 /* 2584 * The operation is a modification but 2585 * the entry has been renamed on a different master in the same time. 2586 * search if the entry has been renamed, and return the new dn 2587 * of the entry. 2588 */ 2589 DN newDN = findEntryDN(entryUUID); 2590 if (newDN != null) 2591 { 2592 // There is an entry with the same unique id as this modify operation 2593 // replay the modify using the current dn of this entry. 2594 msg.setDN(newDN); 2595 numResolvedNamingConflicts.incrementAndGet(); 2596 return false; 2597 } 2598 else 2599 { 2600 // This entry does not exist anymore. 
2601 // It has probably been deleted, stop the processing of this operation 2602 numResolvedNamingConflicts.incrementAndGet(); 2603 return true; 2604 } 2605 } 2606 else if (result == ResultCode.NOT_ALLOWED_ON_RDN) 2607 { 2608 DN currentDN = findEntryDN(entryUUID); 2609 RDN currentRDN; 2610 if (currentDN != null) 2611 { 2612 currentRDN = currentDN.rdn(); 2613 } 2614 else 2615 { 2616 // The entry does not exist anymore. 2617 numResolvedNamingConflicts.incrementAndGet(); 2618 return true; 2619 } 2620 2621 // The modify operation is trying to delete the value that is 2622 // currently used in the RDN. We need to alter the modify so that it does 2623 // not remove the current RDN value(s). 2624 2625 List<Modification> mods = op.getModifications(); 2626 for (Modification mod : mods) 2627 { 2628 AttributeType modAttrType = mod.getAttribute().getAttributeDescription().getAttributeType(); 2629 if ((mod.getModificationType() == ModificationType.DELETE 2630 || mod.getModificationType() == ModificationType.REPLACE) 2631 && currentRDN.hasAttributeType(modAttrType)) 2632 { 2633 // the attribute can't be deleted because it is used in the RDN, 2634 // turn this operation is a replace with the current RDN value(s); 2635 mod.setModificationType(ModificationType.REPLACE); 2636 Attribute newAttribute = mod.getAttribute(); 2637 AttributeBuilder attrBuilder = new AttributeBuilder(newAttribute); 2638 attrBuilder.add(currentRDN.getAttributeValue(modAttrType)); 2639 mod.setAttribute(attrBuilder.toAttribute()); 2640 } 2641 } 2642 msg.setMods(mods); 2643 numResolvedNamingConflicts.incrementAndGet(); 2644 return false; 2645 } 2646 else 2647 { 2648 // The other type of errors can not be caused by naming conflicts. 2649 // Log a message for the repair tool. 2650 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2651 op, ctx.getCSN(), result, op.getErrorMessage()); 2652 return true; 2653 } 2654 } 2655 2656 /** 2657 * Solve a conflict detected when replaying a delete operation. 2658 * 2659 * @param op The operation that triggered the conflict detection. 2660 * @param msg The operation that triggered the conflict detection. 2661 * @return true if the process is completed, false if it must continue.. 2662 */ 2663 private boolean solveNamingConflict(DeleteOperation op, LDAPUpdateMsg msg) 2664 { 2665 ResultCode result = op.getResultCode(); 2666 DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT); 2667 String entryUUID = ctx.getEntryUUID(); 2668 2669 if (result == ResultCode.NO_SUCH_OBJECT) 2670 { 2671 /* Find if the entry is still in the database. */ 2672 DN currentDN = findEntryDN(entryUUID); 2673 if (currentDN == null) 2674 { 2675 /* 2676 * The entry has already been deleted, either because this delete 2677 * has already been replayed or because another concurrent delete 2678 * has already done the job. 2679 * In any case, there is nothing more to do. 2680 */ 2681 numResolvedNamingConflicts.incrementAndGet(); 2682 return true; 2683 } 2684 else 2685 { 2686 // This entry has been renamed, replay the delete using its new DN. 2687 msg.setDN(currentDN); 2688 numResolvedNamingConflicts.incrementAndGet(); 2689 return false; 2690 } 2691 } 2692 else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF) 2693 { 2694 /* 2695 * This may happen when we replay a DELETE done on a master 2696 * but children of this entry have been added on another master. 2697 * 2698 * Rename all the children by adding entryuuid in dn and delete this entry. 
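       * (Each renamed child ends up with an RDN of the form entryuuid=<its entryUUID>+<original
       * RDN>, is marked with the ds-sync-conflict attribute, and stays below the base DN of the
       * domain.)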
2699 * 2700 * The action taken here must be consistent with the actions 2701 * done in the solveNamingConflict(AddOperation) method 2702 * when we are adding an entry whose parent entry has already been deleted. 2703 */ 2704 if (findAndRenameChild(op.getEntryDN(), op)) 2705 { 2706 numUnresolvedNamingConflicts.incrementAndGet(); 2707 } 2708 2709 return false; 2710 } 2711 else 2712 { 2713 // The other type of errors can not be caused by naming conflicts. 2714 // Log a message for the repair tool. 2715 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2716 op, ctx.getCSN(), result, op.getErrorMessage()); 2717 return true; 2718 } 2719 } 2720 2721/** 2722 * Solve a conflict detected when replaying a Modify DN operation. 2723 * 2724 * @param op The operation that triggered the conflict detection. 2725 * @param msg The operation that triggered the conflict detection. 2726 * @return true if the process is completed, false if it must continue. 2727 * @throws Exception When the operation is not valid. 2728 */ 2729private boolean solveNamingConflict(ModifyDNOperation op, LDAPUpdateMsg msg) 2730 throws Exception 2731{ 2732 ResultCode result = op.getResultCode(); 2733 ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT); 2734 String entryUUID = ctx.getEntryUUID(); 2735 String newSuperiorID = ctx.getNewSuperiorEntryUUID(); 2736 2737 /* 2738 * four possible cases : 2739 * - the modified entry has been renamed 2740 * - the new parent has been renamed 2741 * - the operation is replayed for the second time. 2742 * - the entry has been deleted 2743 * action : 2744 * - change the target dn and the new parent dn and 2745 * restart the operation, 2746 * - don't do anything if the operation is replayed. 2747 */ 2748 2749 // get the current DN of this entry in the database. 2750 DN currentDN = findEntryDN(entryUUID); 2751 2752 // Construct the new DN to use for the entry. 2753 DN entryDN = op.getEntryDN(); 2754 DN newSuperior; 2755 RDN newRDN = op.getNewRDN(); 2756 2757 if (newSuperiorID != null) 2758 { 2759 newSuperior = findEntryDN(newSuperiorID); 2760 } 2761 else 2762 { 2763 newSuperior = entryDN.parent(); 2764 } 2765 2766 //If we could not find the new parent entry, we missed this entry 2767 // earlier or it has disappeared from the database 2768 // Log this information for the repair tool and mark the entry 2769 // as conflicting. 2770 // stop the processing. 2771 if (newSuperior == null) 2772 { 2773 markConflictEntry(op, currentDN, currentDN.parent().child(newRDN)); 2774 numUnresolvedNamingConflicts.incrementAndGet(); 2775 return true; 2776 } 2777 2778 DN newDN = newSuperior.child(newRDN); 2779 2780 if (currentDN == null) 2781 { 2782 // The entry targeted by the Modify DN is not in the database 2783 // anymore. 2784 // This is a conflict between a delete and this modify DN. 2785 // The entry has been deleted, we can safely assume 2786 // that the operation is completed. 2787 numResolvedNamingConflicts.incrementAndGet(); 2788 return true; 2789 } 2790 2791 // if the newDN and the current DN match then the operation 2792 // is a no-op (this was probably a second replay) 2793 // don't do anything. 
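  // (Illustrative: if this ModifyDN has already been replayed once, findEntryDN(entryUUID)
  // already returns the post-rename DN, so newDN equals currentDN and the operation is reported
  // as completed without being re-applied.)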
2794 if (newDN.equals(currentDN)) 2795 { 2796 numResolvedNamingConflicts.incrementAndGet(); 2797 return true; 2798 } 2799 2800 if (result == ResultCode.NO_SUCH_OBJECT 2801 || result == ResultCode.UNWILLING_TO_PERFORM 2802 || result == ResultCode.OBJECTCLASS_VIOLATION) 2803 { 2804 /* 2805 * The entry or it's new parent has not been found 2806 * reconstruct the operation with the DN we just built 2807 */ 2808 ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg; 2809 modifyDnMsg.setDN(currentDN); 2810 modifyDnMsg.setNewSuperior(newSuperior.toString()); 2811 numResolvedNamingConflicts.incrementAndGet(); 2812 return false; 2813 } 2814 else if (result == ResultCode.ENTRY_ALREADY_EXISTS) 2815 { 2816 /* 2817 * This may happen when two modifyDn operation 2818 * are done on different servers but with the same target DN 2819 * add the conflict object class to the entry 2820 * and rename it using its entryuuid. 2821 */ 2822 ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg; 2823 markConflictEntry(op, op.getEntryDN(), newDN); 2824 modifyDnMsg.setNewRDN(generateConflictRDN(entryUUID, 2825 modifyDnMsg.getNewRDN())); 2826 modifyDnMsg.setNewSuperior(newSuperior.toString()); 2827 numUnresolvedNamingConflicts.incrementAndGet(); 2828 return false; 2829 } 2830 else 2831 { 2832 // The other type of errors can not be caused by naming conflicts. 2833 // Log a message for the repair tool. 2834 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2835 op, ctx.getCSN(), result, op.getErrorMessage()); 2836 return true; 2837 } 2838} 2839 2840 /** 2841 * Solve a conflict detected when replaying a ADD operation. 2842 * 2843 * @param op The operation that triggered the conflict detection. 2844 * @param msg The message that triggered the conflict detection. 2845 * @return true if the process is completed, false if it must continue. 2846 * @throws Exception When the operation is not valid. 2847 */ 2848 private boolean solveNamingConflict(AddOperation op, AddMsg msg) 2849 throws Exception 2850 { 2851 ResultCode result = op.getResultCode(); 2852 AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT); 2853 String entryUUID = ctx.getEntryUUID(); 2854 String parentUniqueId = ctx.getParentEntryUUID(); 2855 2856 if (result == ResultCode.NO_SUCH_OBJECT) 2857 { 2858 /* 2859 * This can happen if the parent has been renamed or deleted 2860 * find the parent dn and calculate a new dn for the entry 2861 */ 2862 if (parentUniqueId == null) 2863 { 2864 /* 2865 * This entry is the base dn of the backend. 2866 * It is quite surprising that the operation result be NO_SUCH_OBJECT. 2867 * There is nothing more we can do except log a 2868 * message for the repair tool to look at this problem. 2869 * TODO : Log the message 2870 */ 2871 return true; 2872 } 2873 DN parentDn = findEntryDN(parentUniqueId); 2874 if (parentDn == null) 2875 { 2876 /* 2877 * The parent has been deleted 2878 * rename the entry as a conflicting entry. 2879 * The action taken here must be consistent with the actions 2880 * done when in the solveNamingConflict(DeleteOperation) method 2881 * when we are deleting an entry that have some child entries. 2882 */ 2883 addConflict(msg); 2884 2885 String conflictRDN = 2886 generateConflictRDN(entryUUID, op.getEntryDN().rdn().toString()); 2887 msg.setDN(DN.valueOf(conflictRDN + "," + getBaseDN())); 2888 // reset the parent entryUUID so that the check done is the 2889 // handleConflict phase does not fail. 
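        // (Otherwise the replayed ADD would be rejected again in handleConflictResolution,
        // because findEntryDN(parentEntryUUID) can no longer resolve the deleted parent.)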
2890 msg.setParentEntryUUID(null); 2891 numUnresolvedNamingConflicts.incrementAndGet(); 2892 } 2893 else 2894 { 2895 msg.setDN(DN.valueOf(msg.getDN().rdn() + "," + parentDn)); 2896 numResolvedNamingConflicts.incrementAndGet(); 2897 } 2898 return false; 2899 } 2900 else if (result == ResultCode.ENTRY_ALREADY_EXISTS) 2901 { 2902 /* 2903 * This can happen if 2904 * - two adds are done on different servers but with the 2905 * same target DN. 2906 * - the same ADD is being replayed for the second time on this server. 2907 * if the entryUUID already exist, assume this is a replay and 2908 * don't do anything 2909 * if the entry unique id do not exist, generate conflict. 2910 */ 2911 if (findEntryDN(entryUUID) != null) 2912 { 2913 // entry already exist : this is a replay 2914 return true; 2915 } 2916 else 2917 { 2918 addConflict(msg); 2919 String conflictRDN = 2920 generateConflictRDN(entryUUID, msg.getDN().toString()); 2921 msg.setDN(DN.valueOf(conflictRDN)); 2922 numUnresolvedNamingConflicts.incrementAndGet(); 2923 return false; 2924 } 2925 } 2926 else 2927 { 2928 // The other type of errors can not be caused by naming conflicts. 2929 // log a message for the repair tool. 2930 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2931 op, ctx.getCSN(), result, op.getErrorMessage()); 2932 return true; 2933 } 2934 } 2935 2936 /** 2937 * Find all the entries below the provided DN and rename them 2938 * so that they stay below the baseDN of this replicationDomain and 2939 * use the conflicting name and attribute. 2940 * 2941 * @param entryDN The DN of the entry whose child must be renamed. 2942 * @param conflictOp The Operation that generated the conflict. 2943 */ 2944 private boolean findAndRenameChild(DN entryDN, Operation conflictOp) 2945 { 2946 /* 2947 * TODO JNR Ludo thinks that: "Ideally, the operation should verify that the 2948 * entryUUID has not changed or try to use the entryUUID rather than the 2949 * DN.". entryUUID can be obtained from the caller of the current method. 2950 */ 2951 boolean conflict = false; 2952 2953 // Find and rename child entries. 2954 final SearchRequest request = newSearchRequest(entryDN, SearchScope.SINGLE_LEVEL) 2955 .addAttribute(ENTRYUUID_ATTRIBUTE_NAME, HISTORICAL_ATTRIBUTE_NAME); 2956 InternalSearchOperation op = conn.processSearch(request); 2957 if (op.getResultCode() == ResultCode.SUCCESS) 2958 { 2959 for (SearchResultEntry entry : op.getSearchEntries()) 2960 { 2961 /* 2962 * Check the ADD and ModRDN date of the child entry 2963 * (All of them, not only the one that are newer than the DEL op) 2964 * and keep the entry as a conflicting entry. 2965 */ 2966 conflict = true; 2967 renameConflictEntry(conflictOp, entry.getName(), getEntryUUID(entry)); 2968 } 2969 } 2970 else 2971 { 2972 // log error and information for the REPAIR tool. 2973 logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY, entryDN, conflictOp, op.getResultCode()); 2974 } 2975 2976 return conflict; 2977 } 2978 2979 /** 2980 * Rename an entry that was conflicting so that it stays below the 2981 * baseDN of the replicationDomain. 2982 * 2983 * @param conflictOp The Operation that caused the conflict. 2984 * @param dn The DN of the entry to be renamed. 2985 * @param entryUUID The uniqueID of the entry to be renamed. 
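   * (The entry is renamed under the domain base DN with an entryuuid-prefixed RDN, keeps its
   * original DN in the ds-sync-conflict attribute, and an administrative alert is raised so the
   * conflict can be resolved manually.)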
2986 */ 2987 private void renameConflictEntry(Operation conflictOp, DN dn, 2988 String entryUUID) 2989 { 2990 LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(dn); 2991 DirectoryServer.sendAlertNotification(this, 2992 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 2993 2994 RDN newRDN = generateDeleteConflictDn(entryUUID, dn); 2995 ModifyDNOperation newOp = renameEntry(dn, newRDN, getBaseDN(), true); 2996 2997 if (newOp.getResultCode() != ResultCode.SUCCESS) 2998 { 2999 // log information for the repair tool. 3000 logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY, 3001 dn, conflictOp, newOp.getResultCode()); 3002 } 3003 } 3004 3005 /** 3006 * Generate a modification to add the conflict attribute to an entry 3007 * whose Dn is now conflicting with another entry. 3008 * 3009 * @param op The operation causing the conflict. 3010 * @param currentDN The current DN of the operation to mark as conflicting. 3011 * @param conflictDN The newDn on which the conflict happened. 3012 */ 3013 private void markConflictEntry(Operation op, DN currentDN, DN conflictDN) 3014 { 3015 // create new internal modify operation and run it. 3016 Attribute attr = Attributes.create(DS_SYNC_CONFLICT, conflictDN.toString()); 3017 List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr)); 3018 3019 ModifyOperation newOp = new ModifyOperationBasis( 3020 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 3021 currentDN, mods); 3022 runAsSynchronizedOperation(newOp); 3023 3024 if (newOp.getResultCode() != ResultCode.SUCCESS) 3025 { 3026 // Log information for the repair tool. 3027 logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, op, newOp.getResultCode()); 3028 } 3029 3030 // Generate an alert to let the administration know that some 3031 // conflict could not be solved. 3032 LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN); 3033 DirectoryServer.sendAlertNotification(this, 3034 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 3035 } 3036 3037 /** 3038 * Add the conflict attribute to an entry that could 3039 * not be added because it is conflicting with another entry. 3040 * 3041 * @param msg The conflicting Add Operation. 3042 * 3043 * @throws DecodeException When an encoding error happened manipulating the 3044 * msg. 3045 */ 3046 private void addConflict(AddMsg msg) throws DecodeException 3047 { 3048 String normalizedDN = msg.getDN().toString(); 3049 3050 // Generate an alert to let the administrator know that some 3051 // conflict could not be solved. 3052 LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(normalizedDN); 3053 DirectoryServer.sendAlertNotification(this, 3054 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 3055 3056 // Add the conflict attribute 3057 msg.addAttribute(DS_SYNC_CONFLICT, normalizedDN); 3058 } 3059 3060 /** 3061 * Generate the Dn to use for a conflicting entry. 3062 * 3063 * @param entryUUID The unique identifier of the entry involved in the 3064 * conflict. 3065 * @param rdn Original rdn. 3066 * @return The generated RDN for a conflicting entry. 3067 */ 3068 private String generateConflictRDN(String entryUUID, String rdn) 3069 { 3070 return "entryuuid=" + entryUUID + "+" + rdn; 3071 } 3072 3073 /** 3074 * Generate the RDN to use for a conflicting entry whose father was deleted. 3075 * 3076 * @param entryUUID The unique identifier of the entry involved in the 3077 * conflict. 3078 * @param dn The original DN of the entry. 
3079 * 3080 * @return The generated RDN for a conflicting entry. 3081 */ 3082 private RDN generateDeleteConflictDn(String entryUUID, DN dn) 3083 { 3084 String newRDN = "entryuuid=" + entryUUID + "+" + dn.rdn(); 3085 try 3086 { 3087 return RDN.valueOf(newRDN); 3088 } 3089 catch (LocalizedIllegalArgumentException e) 3090 { 3091 // cannot happen 3092 return null; 3093 } 3094 } 3095 3096 /** 3097 * Check if the domain solve conflicts. 3098 * 3099 * @return a boolean indicating if the domain should solve conflicts. 3100 */ 3101 boolean solveConflict() 3102 { 3103 return solveConflictFlag; 3104 } 3105 3106 /** 3107 * Disable the replication on this domain. 3108 * The session to the replication server will be stopped. 3109 * The domain will not be destroyed but call to the pre-operation 3110 * methods will result in failure. 3111 * The listener thread will be destroyed. 3112 * The monitor informations will still be accessible. 3113 */ 3114 public void disable() 3115 { 3116 state.save(); 3117 state.clearInMemory(); 3118 disabled = true; 3119 disableService(); // This will cut the session and wake up the listener 3120 } 3121 3122 /** 3123 * Do what necessary when the data have changed : load state, load 3124 * generation Id. 3125 * If there is no such information check if there is a 3126 * ReplicaUpdateVector entry and translate it into a state 3127 * and generationId. 3128 * @exception DirectoryException Thrown when an error occurs. 3129 */ 3130 private void loadDataState() throws DirectoryException 3131 { 3132 state.clearInMemory(); 3133 state.loadState(); 3134 getGenerator().adjust(state.getMaxCSN(getServerId())); 3135 3136 // Retrieves the generation ID associated with the data imported 3137 generationId = loadGenerationId(); 3138 } 3139 3140 /** 3141 * Enable back the domain after a previous disable. 3142 * The domain will connect back to a replication Server and 3143 * will recreate threads to listen for messages from the Synchronization 3144 * server. 3145 * The generationId will be retrieved or computed if necessary. 3146 * The ServerState will also be read again from the local database. 3147 */ 3148 public void enable() 3149 { 3150 try 3151 { 3152 loadDataState(); 3153 } 3154 catch (Exception e) 3155 { 3156 /* TODO should mark that replicationServer service is 3157 * not available, log an error and retry upon timeout 3158 * should we stop the modifications ? 3159 */ 3160 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e)); 3161 return; 3162 } 3163 3164 enableService(); 3165 3166 disabled = false; 3167 } 3168 3169 /** 3170 * Compute the data generationId associated with the current data present 3171 * in the backend for this domain. 3172 * @return The computed generationId. 3173 * @throws DirectoryException When an error occurs. 3174 */ 3175 private long computeGenerationId() throws DirectoryException 3176 { 3177 final long genId = exportBackend(null, true); 3178 if (logger.isTraceEnabled()) 3179 { 3180 logger.trace("Computed generationId: generationId=" + genId); 3181 } 3182 return genId; 3183 } 3184 3185 /** 3186 * Run a modify operation to update the entry whose DN is given as 3187 * a parameter with the generationID information. 3188 * 3189 * @param entryDN The DN of the entry to be updated. 3190 * @param generationId The value of the generationID to be saved. 3191 * 3192 * @return A ResultCode indicating if the operation was successful. 
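   *         (The update is a single REPLACE of the REPLICATION_GENERATION_ID attribute, run as a
   *         synchronized internal operation so that it is not itself replicated to other servers.)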
3193 */ 3194 private ResultCode runSaveGenerationId(DN entryDN, long generationId) 3195 { 3196 // The generationId is stored in the root entry of the domain. 3197 final ByteString asn1BaseDn = ByteString.valueOfUtf8(entryDN.toString()); 3198 3199 LDAPAttribute attr = new LDAPAttribute(REPLICATION_GENERATION_ID, Long.toString(generationId)); 3200 List<RawModification> mods = new ArrayList<>(1); 3201 mods.add(new LDAPModification(ModificationType.REPLACE, attr)); 3202 3203 ModifyOperation op = new ModifyOperationBasis( 3204 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 3205 asn1BaseDn, mods); 3206 runAsSynchronizedOperation(op); 3207 return op.getResultCode(); 3208 } 3209 3210 /** 3211 * Stores the value of the generationId. 3212 * @param generationId The value of the generationId. 3213 * @return a ResultCode indicating if the method was successful. 3214 */ 3215 private ResultCode saveGenerationId(long generationId) 3216 { 3217 ResultCode result = runSaveGenerationId(getBaseDN(), generationId); 3218 if (result != ResultCode.SUCCESS) 3219 { 3220 generationIdSavedStatus = false; 3221 if (result == ResultCode.NO_SUCH_OBJECT) 3222 { 3223 // If the base entry does not exist, save the generation 3224 // ID in the config entry 3225 result = runSaveGenerationId(config.dn(), generationId); 3226 } 3227 3228 if (result != ResultCode.SUCCESS) 3229 { 3230 logger.error(ERR_UPDATING_GENERATION_ID, getBaseDN(), result.getName()); 3231 } 3232 } 3233 else 3234 { 3235 generationIdSavedStatus = true; 3236 } 3237 return result; 3238 } 3239 3240 /** 3241 * Load the GenerationId from the root entry of the domain 3242 * from the REPLICATION_GENERATION_ID attribute in database 3243 * to memory, or compute it if not found. 3244 * 3245 * @return generationId The retrieved value of generationId 3246 * @throws DirectoryException When an error occurs. 3247 */ 3248 private long loadGenerationId() throws DirectoryException 3249 { 3250 if (logger.isTraceEnabled()) 3251 { 3252 logger.trace("Attempt to read generation ID from DB " + getBaseDN()); 3253 } 3254 3255 // Search the database entry that is used to periodically save the generation id 3256 final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT) 3257 .addAttribute(REPLICATION_GENERATION_ID); 3258 InternalSearchOperation search = conn.processSearch(request); 3259 if (search.getResultCode() == ResultCode.NO_SUCH_OBJECT) 3260 { 3261 // if the base entry does not exist look for the generationID 3262 // in the config entry. 
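// This mirrors the fallback in saveGenerationId(), which stores the generation ID in this domain's configuration entry when the base entry does not exist yet.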
3263 request.setName(config.dn()); 3264 search = conn.processSearch(request); 3265 } 3266 3267 boolean found = false; 3268 long aGenerationId = -1; 3269 if (search.getResultCode() != ResultCode.SUCCESS) 3270 { 3271 if (search.getResultCode() != ResultCode.NO_SUCH_OBJECT) 3272 { 3273 String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage(); 3274 logger.error(ERR_SEARCHING_GENERATION_ID, getBaseDN(), errorMsg); 3275 } 3276 } 3277 else 3278 { 3279 List<SearchResultEntry> result = search.getSearchEntries(); 3280 SearchResultEntry resultEntry = result.get(0); 3281 if (resultEntry != null) 3282 { 3283 List<Attribute> attrs = resultEntry.getAttribute(REPLICATION_GENERATION_ID); 3284 if (!attrs.isEmpty()) 3285 { 3286 Attribute attr = attrs.get(0); 3287 if (attr.size()>1) 3288 { 3289 String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString(); 3290 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg); 3291 } 3292 else if (attr.size() == 1) 3293 { 3294 found = true; 3295 try 3296 { 3297 aGenerationId = Long.decode(attr.iterator().next().toString()); 3298 } 3299 catch(Exception e) 3300 { 3301 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e)); 3302 } 3303 } 3304 } 3305 } 3306 } 3307 3308 if (!found) 3309 { 3310 aGenerationId = computeGenerationId(); 3311 saveGenerationId(aGenerationId); 3312 3313 if (logger.isTraceEnabled()) 3314 { 3315 logger.trace("Generation ID created for domain baseDN=" + getBaseDN() + " generationId=" + aGenerationId); 3316 } 3317 } 3318 else 3319 { 3320 generationIdSavedStatus = true; 3321 if (logger.isTraceEnabled()) 3322 { 3323 logger.trace("Generation ID successfully read from domain baseDN=" + getBaseDN() 3324 + " generationId=" + aGenerationId); 3325 } 3326 } 3327 return aGenerationId; 3328 } 3329 3330 /** 3331 * Do whatever is needed when a backup is started. 3332 * We need to make sure that the serverState is correctly save. 3333 */ 3334 void backupStart() 3335 { 3336 state.save(); 3337 } 3338 3339 /** Do whatever is needed when a backup is finished. */ 3340 void backupEnd() 3341 { 3342 // Nothing is needed at the moment 3343 } 3344 3345 /* 3346 * Total Update >> 3347 */ 3348 3349 /** 3350 * This method trigger an export of the replicated data. 3351 * 3352 * @param output The OutputStream where the export should 3353 * be produced. 3354 * @throws DirectoryException When needed. 3355 */ 3356 @Override 3357 protected void exportBackend(OutputStream output) throws DirectoryException 3358 { 3359 exportBackend(output, false); 3360 } 3361 3362 /** 3363 * Export the entries from the backend and/or compute the generation ID. 3364 * The ieContext must have been set before calling. 3365 * 3366 * @param output The OutputStream where the export should 3367 * be produced. 3368 * @param checksumOutput A boolean indicating if this export is 3369 * invoked to perform a checksum only 3370 * 3371 * @return The computed GenerationID. 3372 * 3373 * @throws DirectoryException when an error occurred 3374 */ 3375 private long exportBackend(OutputStream output, boolean checksumOutput) 3376 throws DirectoryException 3377 { 3378 Backend<?> backend = getBackend(); 3379 3380 // Acquire a shared lock for the backend. 3381 try 3382 { 3383 String lockFile = LockFileManager.getBackendLockFileName(backend); 3384 StringBuilder failureReason = new StringBuilder(); 3385 if (! 
LockFileManager.acquireSharedLock(lockFile, failureReason)) 3386 { 3387 LocalizableMessage message = 3388 ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason); 3389 logger.error(message); 3390 throw new DirectoryException(ResultCode.OTHER, message); 3391 } 3392 } 3393 catch (Exception e) 3394 { 3395 LocalizableMessage message = 3396 ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), 3397 stackTraceToSingleLineString(e)); 3398 logger.error(message); 3399 throw new DirectoryException(ResultCode.OTHER, message); 3400 } 3401 3402 long numberOfEntries = backend.getNumberOfEntriesInBaseDN(getBaseDN()); 3403 long entryCount = Math.min(numberOfEntries, 1000); 3404 OutputStream os; 3405 ReplLDIFOutputStream ros = null; 3406 if (checksumOutput) 3407 { 3408 ros = new ReplLDIFOutputStream(entryCount); 3409 os = ros; 3410 try 3411 { 3412 os.write(Long.toString(numberOfEntries).getBytes()); 3413 } 3414 catch(Exception e) 3415 { 3416 // Should never happen 3417 } 3418 } 3419 else 3420 { 3421 os = output; 3422 } 3423 3424 // baseDN branch is the only one included in the export 3425 LDIFExportConfig exportConfig = new LDIFExportConfig(os); 3426 exportConfig.setIncludeBranches(newArrayList(getBaseDN())); 3427 3428 // For the checksum computing mode, only consider the 'stable' attributes 3429 if (checksumOutput) 3430 { 3431 String includeAttributeStrings[] = { "objectclass", "sn", "cn", "entryuuid" }; 3432 Set<AttributeType> includeAttributes = new HashSet<>(); 3433 for (String attrName : includeAttributeStrings) 3434 { 3435 includeAttributes.add(DirectoryServer.getSchema().getAttributeType(attrName)); 3436 } 3437 exportConfig.setIncludeAttributes(includeAttributes); 3438 } 3439 3440 // Launch the export. 3441 long genID = 0; 3442 try 3443 { 3444 backend.exportLDIF(exportConfig); 3445 } 3446 catch (DirectoryException de) 3447 { 3448 if (ros == null || ros.getNumExportedEntries() < entryCount) 3449 { 3450 LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject()); 3451 logger.error(message); 3452 throw new DirectoryException(ResultCode.OTHER, message); 3453 } 3454 } 3455 catch (Exception e) 3456 { 3457 LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(stackTraceToSingleLineString(e)); 3458 logger.error(message); 3459 throw new DirectoryException(ResultCode.OTHER, message); 3460 } 3461 finally 3462 { 3463 // Clean up after the export by closing the export config. 3464 // Will also flush the export and export the remaining entries. 3465 exportConfig.close(); 3466 3467 if (checksumOutput) 3468 { 3469 genID = ros.getChecksumValue(); 3470 } 3471 3472 // Release the shared lock on the backend. 3473 try 3474 { 3475 String lockFile = LockFileManager.getBackendLockFileName(backend); 3476 StringBuilder failureReason = new StringBuilder(); 3477 if (! LockFileManager.releaseLock(lockFile, failureReason)) 3478 { 3479 LocalizableMessage message = 3480 WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason); 3481 logger.warn(message); 3482 throw new DirectoryException(ResultCode.OTHER, message); 3483 } 3484 } 3485 catch (Exception e) 3486 { 3487 LocalizableMessage message = 3488 WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), 3489 stackTraceToSingleLineString(e)); 3490 logger.warn(message); 3491 throw new DirectoryException(ResultCode.OTHER, message); 3492 } 3493 } 3494 return genID; 3495 } 3496 3497 /** 3498 * Process backend before import. 3499 * 3500 * @param backend 3501 * The backend. 
3502 * @throws DirectoryException 3503 * If the backend could not be disabled or locked exclusively. 3504 */ 3505 private void preBackendImport(Backend<?> backend) throws DirectoryException 3506 { 3507 // Prevent the processing of the backend finalisation event as the import will disable the attached backend 3508 ignoreBackendInitializationEvent = true; 3509 3510 // FIXME setBackendEnabled should be part of TaskUtils ? 3511 TaskUtils.disableBackend(backend.getBackendID()); 3512 3513 // Acquire an exclusive lock for the backend. 3514 String lockFile = LockFileManager.getBackendLockFileName(backend); 3515 StringBuilder failureReason = new StringBuilder(); 3516 if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason)) 3517 { 3518 LocalizableMessage message = ERR_INIT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason); 3519 logger.error(message); 3520 throw new DirectoryException(ResultCode.OTHER, message); 3521 } 3522 } 3523 3524 /** 3525 * This method triggers an import of the replicated data. 3526 * 3527 * @param input The InputStream from which the data are read. 3528 * @throws DirectoryException When needed. 3529 */ 3530 @Override 3531 protected void importBackend(InputStream input) throws DirectoryException 3532 { 3533 Backend<?> backend = getBackend(); 3534 3535 LDIFImportConfig importConfig = null; 3536 ImportExportContext ieCtx = getImportExportContext(); 3537 try 3538 { 3539 if (!backend.supports(BackendOperation.LDIF_IMPORT)) 3540 { 3541 ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER, 3542 ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID()))); 3543 return; 3544 } 3545 3546 importConfig = new LDIFImportConfig(input); 3547 importConfig.setIncludeBranches(newLinkedHashSet(getBaseDN())); 3548 // We should not validate schema for replication 3549 importConfig.setValidateSchema(false); 3550 // Allow fractional replication ldif import plugin to be called 3551 importConfig.setInvokeImportPlugins(true); 3552 // Reset the follow import flag and message before starting the import 3553 importErrorMessageId = -1; 3554 3555 // TODO How to deal with rejected entries during the import 3556 File rejectsFile = 3557 getFileForPath("logs" + File.separator + "replInitRejectedEntries"); 3558 importConfig.writeRejectedEntries(rejectsFile.getAbsolutePath(), 3559 ExistingFileBehavior.OVERWRITE); 3560 3561 // Process import 3562 preBackendImport(backend); 3563 backend.importLDIF(importConfig, DirectoryServer.getInstance().getServerContext()); 3564 } 3565 catch(Exception e) 3566 { 3567 ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER, 3568 ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(e)))); 3569 } 3570 finally 3571 { 3572 try 3573 { 3574 // Cleanup 3575 if (importConfig != null) 3576 { 3577 importConfig.close(); 3578 closeBackendImport(backend); // Re-enable backend 3579 backend = getBackend(); 3580 } 3581 3582 loadDataState(); 3583 3584 if (ieCtx.getException() != null) 3585 { 3586 // When an error occurred during an import, most of times 3587 // the generationId coming in the root entry of the imported data, 3588 // is not valid anymore (partial data in the backend). 3589 generationId = computeGenerationId(); 3590 saveGenerationId(generationId); 3591 } 3592 } 3593 catch (DirectoryException fe) 3594 { 3595 // If we already catch an Exception it's quite possible 3596 // that the loadDataState() and setGenerationId() fail 3597 // so we don't bother about the new Exception. 
3598 // However if there was no Exception before we want 3599 // to return this Exception to the task creator. 3600 ieCtx.setExceptionIfNoneSet(new DirectoryException( 3601 ResultCode.OTHER, 3602 ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(fe)))); 3603 } 3604 } 3605 3606 if (ieCtx.getException() != null) 3607 { 3608 throw ieCtx.getException(); 3609 } 3610 } 3611 3612 /** 3613 * Make post import operations. 3614 * @param backend The backend implied in the import. 3615 * @exception DirectoryException Thrown when an error occurs. 3616 */ 3617 private void closeBackendImport(Backend<?> backend) throws DirectoryException 3618 { 3619 String lockFile = LockFileManager.getBackendLockFileName(backend); 3620 StringBuilder failureReason = new StringBuilder(); 3621 3622 // Release lock 3623 if (!LockFileManager.releaseLock(lockFile, failureReason)) 3624 { 3625 LocalizableMessage message = 3626 WARN_LDIFIMPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason); 3627 logger.warn(message); 3628 throw new DirectoryException(ResultCode.OTHER, message); 3629 } 3630 3631 TaskUtils.enableBackend(backend.getBackendID()); 3632 3633 // Restore the processing of backend finalization events. 3634 ignoreBackendInitializationEvent = false; 3635 3636 } 3637 3638 /** 3639 * Retrieves a replication domain based on the baseDN. 3640 * 3641 * @param baseDN The baseDN of the domain to retrieve 3642 * @return The domain retrieved 3643 * @throws DirectoryException When an error occurred or no domain 3644 * match the provided baseDN. 3645 */ 3646 public static LDAPReplicationDomain retrievesReplicationDomain(DN baseDN) 3647 throws DirectoryException 3648 { 3649 LDAPReplicationDomain replicationDomain = null; 3650 3651 // Retrieves the domain 3652 for (SynchronizationProvider<?> provider : 3653 DirectoryServer.getSynchronizationProviders()) 3654 { 3655 if (!(provider instanceof MultimasterReplication)) 3656 { 3657 LocalizableMessage message = ERR_INVALID_PROVIDER.get(); 3658 throw new DirectoryException(ResultCode.OTHER, message); 3659 } 3660 3661 // From the domainDN retrieves the replication domain 3662 LDAPReplicationDomain domain = 3663 MultimasterReplication.findDomain(baseDN, null); 3664 if (domain == null) 3665 { 3666 break; 3667 } 3668 if (replicationDomain != null) 3669 { 3670 // Should never happen 3671 LocalizableMessage message = ERR_MULTIPLE_MATCHING_DOMAIN.get(); 3672 throw new DirectoryException(ResultCode.OTHER, message); 3673 } 3674 replicationDomain = domain; 3675 } 3676 3677 if (replicationDomain == null) 3678 { 3679 throw new DirectoryException(ResultCode.OTHER, ERR_NO_MATCHING_DOMAIN.get(baseDN)); 3680 } 3681 return replicationDomain; 3682 } 3683 3684 /** 3685 * Returns the backend associated to this domain. 3686 * @return The associated backend. 3687 */ 3688 private Backend<?> getBackend() 3689 { 3690 return DirectoryServer.getBackend(getBaseDN()); 3691 } 3692 3693 /* 3694 * <<Total Update 3695 */ 3696 3697 /** 3698 * Push the schema modifications contained in the given parameter as a 3699 * modification that would happen on a local server. The modifications are not 3700 * applied to the local schema backend and historical information is not 3701 * updated; but a CSN is generated and the ServerState associated to the 3702 * schema domain is updated. 
3703 * 3704 * @param modifications 3705 * The schema modifications to push 3706 */ 3707 void synchronizeSchemaModifications(List<Modification> modifications) 3708 { 3709 ModifyOperation op = new ModifyOperationBasis( 3710 conn, nextOperationID(), nextMessageID(), null, 3711 DirectoryServer.getSchemaDN(), modifications); 3712 3713 final Entry schema; 3714 try 3715 { 3716 schema = DirectoryServer.getEntry(DirectoryServer.getSchemaDN()); 3717 } 3718 catch (DirectoryException e) 3719 { 3720 logger.traceException(e); 3721 logger.error(ERR_BACKEND_SEARCH_ENTRY.get(DirectoryServer.getSchemaDN().toString(), 3722 stackTraceToSingleLineString(e))); 3723 return; 3724 } 3725 3726 LocalBackendModifyOperation localOp = new LocalBackendModifyOperation(op); 3727 CSN csn = generateCSN(localOp); 3728 OperationContext ctx = new ModifyContext(csn, getEntryUUID(schema)); 3729 localOp.setAttachment(SYNCHROCONTEXT, ctx); 3730 localOp.setResultCode(ResultCode.SUCCESS); 3731 synchronize(localOp); 3732 } 3733 3734 /** 3735 * Check if the provided configuration is acceptable for add. 3736 * 3737 * @param configuration The configuration to check. 3738 * @param unacceptableReasons When the configuration is not acceptable, this 3739 * table is use to return the reasons why this 3740 * configuration is not acceptable. 3741 * 3742 * @return true if the configuration is acceptable, false other wise. 3743 */ 3744 static boolean isConfigurationAcceptable(ReplicationDomainCfg configuration, 3745 List<LocalizableMessage> unacceptableReasons) 3746 { 3747 // Check that there is not already a domain with the same DN 3748 final DN dn = configuration.getBaseDN(); 3749 LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null); 3750 if (domain != null && domain.getBaseDN().equals(dn)) 3751 { 3752 unacceptableReasons.add(ERR_SYNC_INVALID_DN.get()); 3753 return false; 3754 } 3755 3756 // Check that the base DN is configured as a base-dn of the directory server 3757 if (DirectoryServer.getBackend(dn) == null) 3758 { 3759 unacceptableReasons.add(ERR_UNKNOWN_DN.get(dn)); 3760 return false; 3761 } 3762 3763 // Check fractional configuration 3764 try 3765 { 3766 isFractionalConfigAcceptable(configuration); 3767 } catch (ConfigException e) 3768 { 3769 unacceptableReasons.add(e.getMessageObject()); 3770 return false; 3771 } 3772 3773 return true; 3774 } 3775 3776 @Override 3777 public ConfigChangeResult applyConfigurationChange( 3778 ReplicationDomainCfg configuration) 3779 { 3780 this.config = configuration; 3781 changeConfig(configuration); 3782 3783 // Read assured + fractional configuration and each time reconnect if needed 3784 readAssuredConfig(configuration, true); 3785 readFractionalConfig(configuration, true); 3786 3787 solveConflictFlag = isSolveConflict(configuration); 3788 3789 final ConfigChangeResult ccr = new ConfigChangeResult(); 3790 try 3791 { 3792 storeECLConfiguration(configuration); 3793 } 3794 catch(Exception e) 3795 { 3796 ccr.setResultCode(ResultCode.OTHER); 3797 } 3798 return ccr; 3799 } 3800 3801 @Override 3802 public boolean isConfigurationChangeAcceptable( 3803 ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons) 3804 { 3805 // Check that a import/export is not in progress 3806 if (ieRunning()) 3807 { 3808 unacceptableReasons.add( 3809 NOTE_ERR_CANNOT_CHANGE_CONFIG_DURING_TOTAL_UPDATE.get()); 3810 return false; 3811 } 3812 3813 // Check fractional configuration 3814 try 3815 { 3816 isFractionalConfigAcceptable(configuration); 3817 return true; 3818 } 3819 catch 
(ConfigException e) 3820 { 3821 unacceptableReasons.add(e.getMessageObject()); 3822 return false; 3823 } 3824 } 3825 3826 @Override 3827 public Map<String, String> getAlerts() 3828 { 3829 Map<String, String> alerts = new LinkedHashMap<>(); 3830 3831 alerts.put(ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, 3832 ALERT_DESCRIPTION_REPLICATION_UNRESOLVED_CONFLICT); 3833 return alerts; 3834 } 3835 3836 @Override 3837 public String getClassName() 3838 { 3839 return CLASS_NAME; 3840 } 3841 3842 @Override 3843 public DN getComponentEntryDN() 3844 { 3845 return config.dn(); 3846 } 3847 3848 /** Starts the Replication Domain. */ 3849 public void start() 3850 { 3851 // Create the ServerStateFlush thread 3852 flushThread.start(); 3853 3854 startListenService(); 3855 } 3856 3857 /** Remove the configuration of the external changelog from this domain configuration. */ 3858 private void removeECLDomainCfg() 3859 { 3860 try 3861 { 3862 DN eclConfigEntryDN = DN.valueOf("cn=external changeLog," + config.dn()); 3863 if (DirectoryServer.getConfigurationHandler().hasEntry(eclConfigEntryDN)) 3864 { 3865 DirectoryServer.getConfigurationHandler().deleteEntry(eclConfigEntryDN); 3866 } 3867 } 3868 catch(Exception e) 3869 { 3870 logger.traceException(e); 3871 logger.error(ERR_CHECK_CREATE_REPL_BACKEND_FAILED, stackTraceToSingleLineString(e)); 3872 } 3873 } 3874 3875 /** 3876 * Store the provided ECL configuration for the domain. 3877 * @param domCfg The provided configuration. 3878 * @throws ConfigException When an error occurred. 3879 */ 3880 private void storeECLConfiguration(ReplicationDomainCfg domCfg) 3881 throws ConfigException 3882 { 3883 ExternalChangelogDomainCfg eclDomCfg = null; 3884 // create the ecl config if it does not exist 3885 // There may not be any config entry related to this domain in some 3886 // unit test cases 3887 try 3888 { 3889 DN configDn = config.dn(); 3890 ConfigurationHandler configHandler = DirectoryServer.getConfigurationHandler(); 3891 if (configHandler.hasEntry(config.dn())) 3892 { 3893 try 3894 { eclDomCfg = domCfg.getExternalChangelogDomain(); 3895 } catch(Exception e) { /* do nothing */ } 3896 // domain with no config entry only when running unit tests 3897 if (eclDomCfg == null) 3898 { 3899 // no ECL config provided hence create a default one 3900 // create the default one 3901 DN eclConfigEntryDN = DN.valueOf("cn=external changelog," + configDn); 3902 if (!configHandler.hasEntry(eclConfigEntryDN)) 3903 { 3904 // no entry exist yet for the ECL config for this domain 3905 // create it 3906 String ldif = makeLdif( 3907 "dn: cn=external changelog," + configDn, 3908 "objectClass: top", 3909 "objectClass: ds-cfg-external-changelog-domain", 3910 "cn: external changelog", 3911 "ds-cfg-enabled: " + !getBackend().isPrivateBackend()); 3912 LDIFImportConfig ldifImportConfig = new LDIFImportConfig( 3913 new StringReader(ldif)); 3914 // No need to validate schema in replication 3915 ldifImportConfig.setValidateSchema(false); 3916 LDIFReader reader = new LDIFReader(ldifImportConfig); 3917 Entry eclEntry = reader.readEntry(); 3918 configHandler.addEntry(Converters.from(eclEntry)); 3919 ldifImportConfig.close(); 3920 } 3921 } 3922 } 3923 eclDomCfg = domCfg.getExternalChangelogDomain(); 3924 if (eclDomain != null) 3925 { 3926 eclDomain.applyConfigurationChange(eclDomCfg); 3927 } 3928 else 3929 { 3930 // Create the ECL domain object 3931 eclDomain = new ExternalChangelogDomain(this, eclDomCfg); 3932 } 3933 } 3934 catch (Exception e) 3935 { 3936 throw new 
ConfigException(NOTE_ERR_UNABLE_TO_ENABLE_ECL.get( 3937 "Replication Domain on " + getBaseDN(), stackTraceToSingleLineString(e)), e); 3938 } 3939 } 3940 3941 private static String makeLdif(String... lines) 3942 { 3943 final StringBuilder buffer = new StringBuilder(); 3944 for (String line : lines) { 3945 buffer.append(line).append(EOL); 3946 } 3947 // Append an extra line so we can append LDIF Strings. 3948 buffer.append(EOL); 3949 return buffer.toString(); 3950 } 3951 3952 @Override 3953 public void sessionInitiated(ServerStatus initStatus, ServerState rsState) 3954 { 3955 // Check domain fractional configuration consistency with local 3956 // configuration variables 3957 forceBadDataSet = !isBackendFractionalConfigConsistent(); 3958 3959 super.sessionInitiated(initStatus, rsState); 3960 3961 // Now for bad data set status if needed 3962 if (forceBadDataSet) 3963 { 3964 signalNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT); 3965 logger.info(NOTE_FRACTIONAL_BAD_DATA_SET_NEED_RESYNC, getBaseDN()); 3966 return; // Do not send changes to the replication server 3967 } 3968 3969 try 3970 { 3971 /* 3972 * We must not publish changes to a replicationServer that has 3973 * not seen all our previous changes because this could cause 3974 * some other ldap servers to miss those changes. 3975 * Check that the ReplicationServer has seen all our previous 3976 * changes. 3977 */ 3978 CSN replServerMaxCSN = rsState.getCSN(getServerId()); 3979 3980 // we don't want to update from here (a DS) an empty RS because 3981 // normally the RS should have been updated by other RSes except for 3982 // very last changes lost if the local connection was broken 3983 // ... hence the RS we are connected to should not be empty 3984 // ... or if it is empty, it is due to a voluntary reset 3985 // and we don't want to update it with our changes that could be huge. 3986 if (replServerMaxCSN != null && replServerMaxCSN.getSeqnum() != 0) 3987 { 3988 CSN ourMaxCSN = state.getMaxCSN(getServerId()); 3989 if (ourMaxCSN != null 3990 && !ourMaxCSN.isOlderThanOrEqualTo(replServerMaxCSN)) 3991 { 3992 pendingChanges.setRecovering(true); 3993 broker.setRecoveryRequired(true); 3994 final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN); 3995 if (this.rsUpdater.compareAndSet(null, rsUpdater)) 3996 { 3997 rsUpdater.start(); 3998 } 3999 } 4000 } 4001 } catch (Exception e) 4002 { 4003 logger.error(ERR_PUBLISHING_FAKE_OPS, getBaseDN(), stackTraceToSingleLineString(e)); 4004 } 4005 } 4006 4007 /** 4008 * Build the list of changes that have been processed by this server after the 4009 * CSN given as a parameter and publish them using the given session. 4010 * 4011 * @param startCSN 4012 * The CSN where we need to start the search 4013 * @param session 4014 * The session to use to publish the changes 4015 * @return A boolean indicating he success of the operation. 4016 * @throws Exception 4017 * if an Exception happens during the search. 4018 */ 4019 boolean buildAndPublishMissingChanges(CSN startCSN, ReplicationBroker session) 4020 throws Exception 4021 { 4022 // Trim the changes in replayOperations that are older than the startCSN. 
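// Entries that are not newer than startCSN are already known to the replication server, so they are discarded rather than replayed.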
4023 synchronized (replayOperations) 4024 { 4025 Iterator<CSN> it = replayOperations.keySet().iterator(); 4026 while (it.hasNext()) 4027 { 4028 if (shutdown.get()) 4029 { 4030 return false; 4031 } 4032 if (it.next().isNewerThan(startCSN)) 4033 { 4034 break; 4035 } 4036 it.remove(); 4037 } 4038 } 4039 4040 CSN lastRetrievedChange; 4041 InternalSearchOperation op; 4042 CSN currentStartCSN = startCSN; 4043 do 4044 { 4045 if (shutdown.get()) 4046 { 4047 return false; 4048 } 4049 4050 lastRetrievedChange = null; 4051 // We can't do the search in one go because we need to store the results 4052 // so that we are sure we send the operations in order and because the 4053 // list might be large. 4054 // So we search by interval of 10 seconds and store the results in the 4055 // replayOperations list so that they are sorted before sending them. 4056 long missingChangesDelta = currentStartCSN.getTime() + 10000; 4057 CSN endCSN = new CSN(missingChangesDelta, Integer.MAX_VALUE, getServerId()); 4058 4059 ScanSearchListener listener = 4060 new ScanSearchListener(currentStartCSN, endCSN); 4061 op = searchForChangedEntries(getBaseDN(), currentStartCSN, endCSN, 4062 listener); 4063 4064 // Publish and remove all the changes from the replayOperations list 4065 // that are older than the endCSN. 4066 final List<FakeOperation> opsToSend = new LinkedList<>(); 4067 synchronized (replayOperations) 4068 { 4069 Iterator<FakeOperation> itOp = replayOperations.values().iterator(); 4070 while (itOp.hasNext()) 4071 { 4072 if (shutdown.get()) 4073 { 4074 return false; 4075 } 4076 FakeOperation fakeOp = itOp.next(); 4077 if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check 4078 || !state.cover(fakeOp.getCSN()) 4079 // do not look for replay operations in the future 4080 || currentStartCSN.isNewerThan(now())) 4081 { 4082 break; 4083 } 4084 4085 lastRetrievedChange = fakeOp.getCSN(); 4086 opsToSend.add(fakeOp); 4087 itOp.remove(); 4088 } 4089 } 4090 4091 for (FakeOperation opToSend : opsToSend) 4092 { 4093 if (shutdown.get()) 4094 { 4095 return false; 4096 } 4097 session.publishRecovery(opToSend.generateMessage()); 4098 } 4099 4100 if (lastRetrievedChange != null) 4101 { 4102 if (logger.isDebugEnabled()) 4103 { 4104 logger.debug(LocalizableMessage.raw("publish loop" 4105 + " >=" + currentStartCSN + " <=" + endCSN 4106 + " nentries=" + op.getEntriesSent() 4107 + " result=" + op.getResultCode() 4108 + " lastRetrievedChange=" + lastRetrievedChange)); 4109 } 4110 currentStartCSN = lastRetrievedChange; 4111 } 4112 else 4113 { 4114 if (logger.isDebugEnabled()) 4115 { 4116 logger.debug(LocalizableMessage.raw("publish loop" 4117 + " >=" + currentStartCSN + " <=" + endCSN 4118 + " nentries=" + op.getEntriesSent() 4119 + " result=" + op.getResultCode() 4120 + " no changes")); 4121 } 4122 currentStartCSN = endCSN; 4123 } 4124 } while (pendingChanges.recoveryUntil(currentStartCSN) 4125 && op.getResultCode().equals(ResultCode.SUCCESS)); 4126 4127 return op.getResultCode().equals(ResultCode.SUCCESS); 4128 } 4129 4130 private static CSN now() 4131 { 4132 // ensure now() will always come last with isNewerThan() test, 4133 // even though the timestamp, or the timestamp and seqnum would be the same 4134 return new CSN(TimeThread.getTime(), Integer.MAX_VALUE, Integer.MAX_VALUE); 4135 } 4136 4137 /** 4138 * Search for the changes that happened since fromCSN based on the historical 4139 * attribute. The only changes that will be send will be the one generated on 4140 * the serverId provided in fromCSN. 
4141 * 4142 * @param baseDN 4143 * the base DN 4144 * @param startCsnInclusive 4145 * The CSN from which we want the changes 4146 * @param endCsnInclusive 4147 * The max CSN that the search should return 4148 * @param resultListener 4149 * The listener that will process the entries returned 4150 * @return the internal search operation 4151 * @throws Exception 4152 * when raised. 4153 */ 4154 private static InternalSearchOperation searchForChangedEntries(DN baseDN, 4155 CSN startCsnInclusive, CSN endCsnInclusive, InternalSearchListener resultListener) 4156 throws Exception 4157 { 4158 final String assertionValue = endCsnInclusive == null 4159 ? ">= " + startCsnInclusive 4160 : ">= " + startCsnInclusive + ", <= " + endCsnInclusive; 4161 final SearchFilter filter = 4162 SearchFilter.createExtensibleMatchFilter(AttributeDescription.valueOf(HISTORICAL_ATTRIBUTE_NAME) 4163 .getAttributeType(), 4164 ByteString.valueOfUtf8(assertionValue), 4165 EXTMR_HISTORICAL_CSN_RANGE_OID, false); 4166 SearchRequest request = Requests.newSearchRequest(baseDN, SearchScope.WHOLE_SUBTREE, filter) 4167 .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS); 4168 return getRootConnection().processSearch(request, resultListener); 4169 } 4170 4171 /** 4172 * Search for the changes that happened since fromCSN based on the historical 4173 * attribute. The only changes that will be send will be the one generated on 4174 * the serverId provided in fromCSN. 4175 * 4176 * @param baseDN 4177 * the base DN 4178 * @param fromCSN 4179 * The CSN from which we want the changes 4180 * @param resultListener 4181 * that will process the entries returned. 4182 * @return the internal search operation 4183 * @throws Exception 4184 * when raised. 4185 */ 4186 static InternalSearchOperation searchForChangedEntries(DN baseDN, 4187 CSN fromCSN, InternalSearchListener resultListener) throws Exception 4188 { 4189 return searchForChangedEntries(baseDN, fromCSN, null, resultListener); 4190 } 4191 4192 /** 4193 * This method should return the total number of objects in the 4194 * replicated domain. 4195 * This count will be used for reporting. 4196 * 4197 * @throws DirectoryException when needed. 4198 * 4199 * @return The number of objects in the replication domain. 4200 */ 4201 @Override 4202 public long countEntries() throws DirectoryException 4203 { 4204 Backend<?> backend = getBackend(); 4205 if (!backend.supports(BackendOperation.LDIF_EXPORT)) 4206 { 4207 LocalizableMessage msg = ERR_INIT_EXPORT_NOT_SUPPORTED.get(backend.getBackendID()); 4208 logger.error(msg); 4209 throw new DirectoryException(ResultCode.OTHER, msg); 4210 } 4211 4212 return backend.getNumberOfEntriesInBaseDN(getBaseDN()); 4213 } 4214 4215 @Override 4216 public boolean processUpdate(UpdateMsg updateMsg) 4217 { 4218 // Ignore message if fractional configuration is inconsistent and 4219 // we have been passed into bad data set status 4220 if (forceBadDataSet) 4221 { 4222 return false; 4223 } 4224 4225 if (updateMsg instanceof LDAPUpdateMsg) 4226 { 4227 LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg; 4228 4229 // Put the UpdateMsg in the RemotePendingChanges list. 4230 if (!remotePendingChanges.putRemoteUpdate(msg)) 4231 { 4232 /* 4233 * Already received this change so ignore it. This may happen if there 4234 * are uncommitted changes in the queue and session failover occurs 4235 * causing a recovery of all changes since the current committed server 4236 * state. See OPENDJ-1115. 
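* Such a duplicate is detected by putRemoteUpdate() returning false and is simply skipped instead of being queued for replay a second time.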
4237 */ 4238 if (logger.isTraceEnabled()) 4239 { 4240 logger.trace( 4241 "LDAPReplicationDomain.processUpdate: ignoring " 4242 + "duplicate change %s", msg.getCSN()); 4243 } 4244 return true; 4245 } 4246 4247 // Put update message into the replay queue 4248 // (block until some place in the queue is available) 4249 final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this); 4250 while (!isListenerShuttingDown()) 4251 { 4252 // loop until we can offer to the queue or shutdown was initiated 4253 try 4254 { 4255 if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS)) 4256 { 4257 // successful offer to the queue, let's exit the loop 4258 break; 4259 } 4260 } 4261 catch (InterruptedException e) 4262 { 4263 // Thread interrupted: check for shutdown. 4264 Thread.currentThread().interrupt(); 4265 } 4266 } 4267 4268 return false; 4269 } 4270 4271 // unknown message type, this should not happen, just ignore it. 4272 return true; 4273 } 4274 4275 @Override 4276 public void addAdditionalMonitoring(MonitorData attributes) 4277 { 4278 attributes.add("pending-updates", pendingChanges.size()); 4279 attributes.add("replayed-updates-ok", numReplayedPostOpCalled); 4280 attributes.add("resolved-modify-conflicts", numResolvedModifyConflicts); 4281 attributes.add("resolved-naming-conflicts", numResolvedNamingConflicts); 4282 attributes.add("unresolved-naming-conflicts", numUnresolvedNamingConflicts); 4283 attributes.add("remote-pending-changes-size", remotePendingChanges.getQueueSize()); 4284 attributes.add("dependent-changes-size", remotePendingChanges.getDependentChangesSize()); 4285 attributes.add("changes-in-progress-size", remotePendingChanges.changesInProgressSize()); 4286 } 4287 4288 /** 4289 * Verifies that the given string represents a valid source 4290 * from which this server can be initialized. 4291 * @param sourceString The string representing the source 4292 * @return The source as a integer value 4293 * @throws DirectoryException if the string is not valid 4294 */ 4295 public int decodeSource(String sourceString) throws DirectoryException 4296 { 4297 int source = 0; 4298 try 4299 { 4300 source = Integer.decode(sourceString); 4301 if (source >= -1 && source != getServerId()) 4302 { 4303 // TODO Verifies serverID is in the domain 4304 // We should check here that this is a server implied 4305 // in the current domain. 4306 return source; 4307 } 4308 } 4309 catch (Exception e) 4310 { 4311 LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get( 4312 getBaseDN(), getServerId(), sourceString, stackTraceToSingleLineString(e)); 4313 throw new DirectoryException(ResultCode.OTHER, message, e); 4314 } 4315 4316 LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get(getBaseDN(), getServerId(), source, ""); 4317 throw new DirectoryException(ResultCode.OTHER, message); 4318 } 4319 4320 /** 4321 * Called by synchronize post op plugin in order to add the entry historical 4322 * attributes to the UpdateMsg. 
4323 * @param msg an replication update message 4324 * @param op the operation in progress 4325 */ 4326 private void addEntryAttributesForCL(UpdateMsg msg, 4327 PostOperationOperation op) 4328 { 4329 if (op instanceof PostOperationDeleteOperation) 4330 { 4331 PostOperationDeleteOperation delOp = (PostOperationDeleteOperation) op; 4332 final Set<String> names = getEclIncludesForDeletes(); 4333 Entry entry = delOp.getEntryToDelete(); 4334 final DeleteMsg deleteMsg = (DeleteMsg) msg; 4335 deleteMsg.setEclIncludes(getIncludedAttributes(entry, names)); 4336 4337 // For delete only, add the Authorized DN since it's required in the 4338 // ECL entry but is not part of rest of the message. 4339 DN deleterDN = delOp.getAuthorizationDN(); 4340 if (deleterDN != null) 4341 { 4342 deleteMsg.setInitiatorsName(deleterDN.toString()); 4343 } 4344 } 4345 else if (op instanceof PostOperationModifyOperation) 4346 { 4347 PostOperationModifyOperation modOp = (PostOperationModifyOperation) op; 4348 Set<String> names = getEclIncludes(); 4349 Entry entry = modOp.getCurrentEntry(); 4350 ((ModifyMsg) msg).setEclIncludes(getIncludedAttributes(entry, names)); 4351 } 4352 else if (op instanceof PostOperationModifyDNOperation) 4353 { 4354 PostOperationModifyDNOperation modDNOp = 4355 (PostOperationModifyDNOperation) op; 4356 Set<String> names = getEclIncludes(); 4357 Entry entry = modDNOp.getOriginalEntry(); 4358 ((ModifyDNMsg) msg).setEclIncludes(getIncludedAttributes(entry, names)); 4359 } 4360 else if (op instanceof PostOperationAddOperation) 4361 { 4362 PostOperationAddOperation addOp = (PostOperationAddOperation) op; 4363 Set<String> names = getEclIncludes(); 4364 Entry entry = addOp.getEntryToAdd(); 4365 ((AddMsg) msg).setEclIncludes(getIncludedAttributes(entry, names)); 4366 } 4367 } 4368 4369 private Collection<Attribute> getIncludedAttributes(Entry entry, 4370 Set<String> names) 4371 { 4372 if (names.isEmpty()) 4373 { 4374 // Fast-path. 4375 return Collections.emptySet(); 4376 } 4377 else if (names.size() == 1 && names.contains("*")) 4378 { 4379 // Potential fast-path for delete operations. 4380 List<Attribute> attributes = new LinkedList<>(); 4381 for (List<Attribute> attributeList : entry.getUserAttributes().values()) 4382 { 4383 attributes.addAll(attributeList); 4384 } 4385 Attribute objectClassAttribute = entry.getObjectClassAttribute(); 4386 if (objectClassAttribute != null) 4387 { 4388 attributes.add(objectClassAttribute); 4389 } 4390 return attributes; 4391 } 4392 else 4393 { 4394 // Expand @objectclass references in attribute list if needed. 4395 // We do this now in order to take into account dynamic schema changes. 4396 final Set<String> expandedNames = getExpandedNames(names); 4397 final Entry filteredEntry = 4398 entry.filterEntry(expandedNames, false, false, false); 4399 return filteredEntry.getAttributes(); 4400 } 4401 } 4402 4403 private Set<String> getExpandedNames(Set<String> names) 4404 { 4405 // Only rebuild the attribute set if necessary. 
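// Expansion is only required when a name starts with '@', i.e. it references an object class whose required and optional attribute types must be substituted (see the loop below).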
4406 if (!needsExpanding(names)) 4407 { 4408 return names; 4409 } 4410 4411 final Set<String> expandedNames = new HashSet<>(names.size()); 4412 for (String name : names) 4413 { 4414 if (name.startsWith("@")) 4415 { 4416 String ocName = name.substring(1); 4417 ObjectClass objectClass = DirectoryServer.getSchema().getObjectClass(ocName); 4418 if (!objectClass.isPlaceHolder()) 4419 { 4420 for (AttributeType at : objectClass.getRequiredAttributes()) 4421 { 4422 expandedNames.add(at.getNameOrOID()); 4423 } 4424 for (AttributeType at : objectClass.getOptionalAttributes()) 4425 { 4426 expandedNames.add(at.getNameOrOID()); 4427 } 4428 } 4429 } 4430 else 4431 { 4432 expandedNames.add(name); 4433 } 4434 } 4435 return expandedNames; 4436 } 4437 4438 private boolean needsExpanding(Set<String> names) 4439 { 4440 for (String name : names) 4441 { 4442 if (name.startsWith("@")) 4443 { 4444 return true; 4445 } 4446 } 4447 return false; 4448 } 4449 4450 /** 4451 * Gets the fractional configuration of this domain. 4452 * @return The fractional configuration of this domain. 4453 */ 4454 FractionalConfig getFractionalConfig() 4455 { 4456 return fractionalConfig; 4457 } 4458 4459 /** 4460 * This bean is a utility class used for holding the parsing 4461 * result of a fractional configuration. It also contains some facility 4462 * methods like fractional configuration comparison... 4463 */ 4464 static class FractionalConfig 4465 { 4466 /** 4467 * Tells if fractional replication is enabled or not (some fractional 4468 * constraints have been put in place). If this is true then 4469 * fractionalExclusive explains the configuration mode and either 4470 * fractionalSpecificClassesAttributes or fractionalAllClassesAttributes or 4471 * both should be filled with something. 4472 */ 4473 private boolean fractional; 4474 4475 /** 4476 * - If true, tells that the configured fractional replication is exclusive: 4477 * Every attributes contained in fractionalSpecificClassesAttributes and 4478 * fractionalAllClassesAttributes should be ignored when replaying operation 4479 * in local backend. 4480 * - If false, tells that the configured fractional replication is 4481 * inclusive: 4482 * Only attributes contained in fractionalSpecificClassesAttributes and 4483 * fractionalAllClassesAttributes should be taken into account in local 4484 * backend. 4485 */ 4486 private boolean fractionalExclusive = true; 4487 4488 /** 4489 * Used in fractional replication: holds attributes of a specific object class. 4490 * - key = object class (name or OID of the class) 4491 * - value = the attributes of that class that should be taken into account 4492 * (inclusive or exclusive fractional replication) (name or OID of the 4493 * attribute) 4494 * When an operation coming from the network is to be locally replayed, if 4495 * the concerned entry has an objectClass attribute equals to 'key': 4496 * - inclusive mode: only the attributes in 'value' will be added/deleted/modified 4497 * - exclusive mode: the attributes in 'value' will not be added/deleted/modified 4498 */ 4499 private Map<String, Set<String>> fractionalSpecificClassesAttributes = new HashMap<>(); 4500 4501 /** 4502 * Used in fractional replication: holds attributes of any object class. 
4503 * When an operation coming from the network is to be locally replayed: 4504 * - inclusive mode: only attributes of the matching entry not present in 4505 * fractionalAllClassesAttributes will be added/deleted/modified 4506 * - exclusive mode: attributes of the matching entry present in 4507 * fractionalAllClassesAttributes will not be added/deleted/modified 4508 * The attributes may be in human readable form or OID form. 4509 */ 4510 private Set<String> fractionalAllClassesAttributes = new HashSet<>(); 4511 4512 /** Base DN the fractional configuration is for. */ 4513 private final DN baseDN; 4514 4515 /** 4516 * Constructs a new fractional configuration object. 4517 * @param baseDN The base DN the object is for. 4518 */ 4519 private FractionalConfig(DN baseDN) 4520 { 4521 this.baseDN = baseDN; 4522 } 4523 4524 /** 4525 * Getter for fractional. 4526 * @return True if the configuration has fractional enabled 4527 */ 4528 boolean isFractional() 4529 { 4530 return fractional; 4531 } 4532 4533 /** 4534 * Set the fractional parameter. 4535 * @param fractional The fractional parameter 4536 */ 4537 private void setFractional(boolean fractional) 4538 { 4539 this.fractional = fractional; 4540 } 4541 4542 /** 4543 * Getter for fractionalExclusive. 4544 * @return True if the configuration has fractional exclusive enabled 4545 */ 4546 boolean isFractionalExclusive() 4547 { 4548 return fractionalExclusive; 4549 } 4550 4551 /** 4552 * Set the fractionalExclusive parameter. 4553 * @param fractionalExclusive The fractionalExclusive parameter 4554 */ 4555 private void setFractionalExclusive(boolean fractionalExclusive) 4556 { 4557 this.fractionalExclusive = fractionalExclusive; 4558 } 4559 4560 /** 4561 * Getter for the fractionalSpecificClassesAttributes attribute. 4562 * @return The fractionalSpecificClassesAttributes attribute. 4563 */ 4564 Map<String, Set<String>> getFractionalSpecificClassesAttributes() 4565 { 4566 return fractionalSpecificClassesAttributes; 4567 } 4568 4569 /** 4570 * Set the fractionalSpecificClassesAttributes parameter. 4571 * @param fractionalSpecificClassesAttributes The 4572 * fractionalSpecificClassesAttributes parameter to set. 4573 */ 4574 private void setFractionalSpecificClassesAttributes( 4575 Map<String, Set<String>> fractionalSpecificClassesAttributes) 4576 { 4577 this.fractionalSpecificClassesAttributes = 4578 fractionalSpecificClassesAttributes; 4579 } 4580 4581 /** 4582 * Getter for the fractionalAllClassesAttributes attribute. 4583 * @return The fractionalAllClassesAttributes attribute. 4584 */ 4585 Set<String> getFractionalAllClassesAttributes() 4586 { 4587 return fractionalAllClassesAttributes; 4588 } 4589 4590 /** 4591 * Set the fractionalAllClassesAttributes parameter. 4592 * @param fractionalAllClassesAttributes The 4593 * fractionalAllClassesAttributes parameter to set. 4594 */ 4595 private void setFractionalAllClassesAttributes( 4596 Set<String> fractionalAllClassesAttributes) 4597 { 4598 this.fractionalAllClassesAttributes = fractionalAllClassesAttributes; 4599 } 4600 4601 /** 4602 * Getter for the baseDN. 4603 * @return The baseDN attribute. 4604 */ 4605 DN getBaseDn() 4606 { 4607 return baseDN; 4608 } 4609 4610 /** 4611 * Extract the fractional configuration from the passed domain configuration 4612 * entry. 4613 * @param configuration The configuration object 4614 * @return The fractional replication configuration. 4615 * @throws ConfigException If an error occurred.
4616 */ 4617 static FractionalConfig toFractionalConfig( 4618 ReplicationDomainCfg configuration) throws ConfigException 4619 { 4620 // Prepare fractional configuration variables to parse 4621 Iterator<String> exclIt = configuration.getFractionalExclude().iterator(); 4622 Iterator<String> inclIt = configuration.getFractionalInclude().iterator(); 4623 4624 // Get potentially new fractional configuration 4625 Map<String, Set<String>> newFractionalSpecificClassesAttributes = new HashMap<>(); 4626 Set<String> newFractionalAllClassesAttributes = new HashSet<>(); 4627 4628 int newFractionalMode = parseFractionalConfig(exclIt, inclIt, 4629 newFractionalSpecificClassesAttributes, 4630 newFractionalAllClassesAttributes); 4631 4632 // Create matching parsed config object 4633 FractionalConfig result = new FractionalConfig(configuration.getBaseDN()); 4634 switch (newFractionalMode) 4635 { 4636 case NOT_FRACTIONAL: 4637 result.setFractional(false); 4638 result.setFractionalExclusive(true); 4639 break; 4640 case EXCLUSIVE_FRACTIONAL: 4641 case INCLUSIVE_FRACTIONAL: 4642 result.setFractional(true); 4643 result.setFractionalExclusive( 4644 newFractionalMode == EXCLUSIVE_FRACTIONAL); 4645 break; 4646 } 4647 result.setFractionalSpecificClassesAttributes( 4648 newFractionalSpecificClassesAttributes); 4649 result.setFractionalAllClassesAttributes( 4650 newFractionalAllClassesAttributes); 4651 return result; 4652 } 4653 4654 /** 4655 * Parses a fractional replication configuration, filling the empty passed 4656 * variables and returning the used fractional mode. The 2 passed variables 4657 * to fill should be initialized (not null) and empty. 4658 * @param exclIt The list of fractional exclude configuration values (may be 4659 * null) 4660 * @param inclIt The list of fractional include configuration values (may be 4661 * null) 4662 * @param fractionalSpecificClassesAttributes An empty map to be filled with 4663 * what is read from the fractional configuration properties. 4664 * @param fractionalAllClassesAttributes An empty list to be filled with 4665 * what is read from the fractional configuration properties. 4666 * @return the fractional mode deduced from the passed configuration: 4667 * not fractional, exclusive fractional or inclusive fractional 4668 * modes 4669 */ 4670 private static int parseFractionalConfig( 4671 Iterator<?> exclIt, Iterator<?> inclIt, 4672 Map<String, Set<String>> fractionalSpecificClassesAttributes, 4673 Set<String> fractionalAllClassesAttributes) throws ConfigException 4674 { 4675 // Determine if fractional-exclude or fractional-include property is used: 4676 // only one of them is allowed 4677 int fractionalMode; 4678 Iterator<?> iterator; 4679 if (exclIt != null && exclIt.hasNext()) 4680 { 4681 if (inclIt != null && inclIt.hasNext()) 4682 { 4683 throw new ConfigException( 4684 NOTE_ERR_FRACTIONAL_CONFIG_BOTH_MODES.get()); 4685 } 4686 4687 fractionalMode = EXCLUSIVE_FRACTIONAL; 4688 iterator = exclIt; 4689 } 4690 else 4691 { 4692 if (inclIt != null && inclIt.hasNext()) 4693 { 4694 fractionalMode = INCLUSIVE_FRACTIONAL; 4695 iterator = inclIt; 4696 } 4697 else 4698 { 4699 return NOT_FRACTIONAL; 4700 } 4701 } 4702 4703 while (iterator.hasNext()) 4704 { 4705 // Parse a value with the form class:attr1,attr2... 4706 // or *:attr1,attr2... 
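// Illustrative values only: "inetOrgPerson:jpegPhoto,description" targets one object class, while "*:jpegPhoto" applies to entries of any object class.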
4707 String fractCfgStr = iterator.next().toString(); 4708 StringTokenizer st = new StringTokenizer(fractCfgStr, ":"); 4709 int nTokens = st.countTokens(); 4710 if (nTokens < 2) 4711 { 4712 throw new ConfigException(NOTE_ERR_FRACTIONAL_CONFIG_WRONG_FORMAT.get(fractCfgStr)); 4713 } 4714 // Get the class name 4715 String classNameLower = st.nextToken().toLowerCase(); 4716 boolean allClasses = "*".equals(classNameLower); 4717 // Get the attributes 4718 String attributes = st.nextToken(); 4719 st = new StringTokenizer(attributes, ","); 4720 while (st.hasMoreTokens()) 4721 { 4722 String attrNameLower = st.nextToken().toLowerCase(); 4723 // Store attribute in the appropriate variable 4724 if (allClasses) 4725 { 4726 fractionalAllClassesAttributes.add(attrNameLower); 4727 } 4728 else 4729 { 4730 Set<String> attrList = fractionalSpecificClassesAttributes.get(classNameLower); 4731 if (attrList == null) 4732 { 4733 attrList = new LinkedHashSet<>(); 4734 fractionalSpecificClassesAttributes.put(classNameLower, attrList); 4735 } 4736 attrList.add(attrNameLower); 4737 } 4738 } 4739 } 4740 return fractionalMode; 4741 } 4742 4743 /** Return type of the parseFractionalConfig method. */ 4744 private static final int NOT_FRACTIONAL = 0; 4745 private static final int EXCLUSIVE_FRACTIONAL = 1; 4746 private static final int INCLUSIVE_FRACTIONAL = 2; 4747 4748 /** 4749 * Get an integer representation of the domain fractional configuration. 4750 * @return An integer representation of the domain fractional configuration. 4751 */ 4752 private int fractionalConfigToInt() 4753 { 4754 if (!fractional) 4755 { 4756 return NOT_FRACTIONAL; 4757 } 4758 else if (fractionalExclusive) 4759 { 4760 return EXCLUSIVE_FRACTIONAL; 4761 } 4762 return INCLUSIVE_FRACTIONAL; 4763 } 4764 4765 /** 4766 * Compare 2 fractional replication configurations and returns true if they 4767 * are equivalent. 4768 * @param cfg1 First fractional configuration 4769 * @param cfg2 Second fractional configuration 4770 * @return True if both configurations are equivalent. 4771 * @throws ConfigException If some classes or attributes could not be 4772 * retrieved from the schema. 
4773 */ 4774 private static boolean isFractionalConfigEquivalent(FractionalConfig cfg1, 4775 FractionalConfig cfg2) throws ConfigException 4776 { 4777 // Compare base DNs just to be consistent 4778 if (!cfg1.getBaseDn().equals(cfg2.getBaseDn())) 4779 { 4780 return false; 4781 } 4782 4783 // Compare modes 4784 if (cfg1.isFractional() != cfg2.isFractional() 4785 || cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive()) 4786 { 4787 return false; 4788 } 4789 4790 // Compare all classes attributes 4791 Set<String> allClassesAttrs1 = cfg1.getFractionalAllClassesAttributes(); 4792 Set<String> allClassesAttrs2 = cfg2.getFractionalAllClassesAttributes(); 4793 if (!areAttributesEquivalent(allClassesAttrs1, allClassesAttrs2)) 4794 { 4795 return false; 4796 } 4797 4798 // Compare specific classes attributes 4799 Map<String, Set<String>> specificClassesAttrs1 = 4800 cfg1.getFractionalSpecificClassesAttributes(); 4801 Map<String, Set<String>> specificClassesAttrs2 = 4802 cfg2.getFractionalSpecificClassesAttributes(); 4803 if (specificClassesAttrs1.size() != specificClassesAttrs2.size()) 4804 { 4805 return false; 4806 } 4807 4808 /* 4809 * Check consistency of specific classes attributes 4810 * 4811 * For each class in specificClassesAttributes1, check that the attribute 4812 * list is equivalent to specificClassesAttributes2 attribute list 4813 */ 4814 Schema schema = DirectoryServer.getSchema(); 4815 for (String className1 : specificClassesAttrs1.keySet()) 4816 { 4817 // Get class from specificClassesAttributes1 4818 ObjectClass objectClass1 = schema.getObjectClass(className1); 4819 if (objectClass1.isPlaceHolder()) 4820 { 4821 throw new ConfigException( 4822 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className1)); 4823 } 4824 4825 // Look for matching one in specificClassesAttributes2 4826 boolean foundClass = false; 4827 for (String className2 : specificClassesAttrs2.keySet()) 4828 { 4829 ObjectClass objectClass2 = schema.getObjectClass(className2); 4830 if (objectClass2.isPlaceHolder()) 4831 { 4832 throw new ConfigException( 4833 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className2)); 4834 } 4835 if (objectClass1.equals(objectClass2)) 4836 { 4837 foundClass = true; 4838 // Now compare the 2 attribute lists 4839 Set<String> attributes1 = specificClassesAttrs1.get(className1); 4840 Set<String> attributes2 = specificClassesAttrs2.get(className2); 4841 if (!areAttributesEquivalent(attributes1, attributes2)) 4842 { 4843 return false; 4844 } 4845 break; 4846 } 4847 } 4848 // Found matching class ? 4849 if (!foundClass) 4850 { 4851 return false; 4852 } 4853 } 4854 4855 return true; 4856 } 4857 } 4858 4859 /** 4860 * Specifies whether this domain is enabled/disabled regarding the ECL. 4861 * @return enabled/disabled for the ECL. 4862 */ 4863 boolean isECLEnabled() 4864 { 4865 return this.eclDomain.isEnabled(); 4866 } 4867 4868 /** 4869 * Return the minimum time (in ms) that the domain keeps the historical 4870 * information necessary to solve conflicts. 4871 * 4872 * @return the purge delay. 4873 */ 4874 long getHistoricalPurgeDelay() 4875 { 4876 return config.getConflictsHistoricalPurgeDelay() * 60 * 1000; 4877 } 4878 4879 /** 4880 * Check and purge the historical attribute on all eligible entries under this domain. 4881 * 4882 * The purging logic is the same applied to individual entries during modify operations. This 4883 * task may be useful in scenarios where a large number of changes are made as a one-off occurrence. 
4884 * Running a purge-historical after the 'ds-cfg-conflicts-historical-purge-delay' period has elapsed 4885 * would clear out obsolete historical data from all the modified entries reducing the overall 4886 * database size. 4887 * 4888 * @param task 4889 * the task raising this purge. 4890 * @param endDate 4891 * the date to stop this task whether the job is done or not. 4892 * @throws DirectoryException 4893 * when an exception happens. 4894 */ 4895 public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task, 4896 long endDate) throws DirectoryException 4897 { 4898 logger.trace("[PURGE] purgeConflictsHistorical " 4899 + "on domain: " + getBaseDN() 4900 + "endDate:" + new Date(endDate) 4901 + "lastCSNPurgedFromHist: " 4902 + lastCSNPurgedFromHist.toStringUI()); 4903 4904 4905 // It would be nice to have an upper bound on this filter to eliminate results that don't have a purgeable 4906 // csn in them. However, historicalCsnOrderingMatch keys start with serverid rather than timestamp so this 4907 // isn't possible. 4908 String filter = "(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + lastCSNPurgedFromHist + ")"; 4909 4910 int count = 0; 4911 boolean finished = false; 4912 ByteString pagingCookie = null; 4913 4914 while(!finished) 4915 { 4916 if (task != null) 4917 { 4918 task.setProgressStats(lastCSNPurgedFromHist, count); 4919 } 4920 4921 finished = true; 4922 4923 4924 SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter) 4925 .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS) 4926 .addControl(new PagedResultsControl(false, ConfigConstants.DEFAULT_SIZE_LIMIT, pagingCookie)) 4927 .setSizeLimit(ConfigConstants.DEFAULT_SIZE_LIMIT + 1); 4928 4929 InternalSearchOperation searchOp = conn.processSearch(request); 4930 4931 for (Control c : searchOp.getResponseControls()) 4932 { 4933 if (c.getOID().equals(OID_PAGED_RESULTS_CONTROL)) 4934 { 4935 ByteString newPagingCookie = ((PagedResultsControl)c).getCookie(); 4936 4937 if( newPagingCookie != null && 4938 newPagingCookie.length() > 0 && 4939 !newPagingCookie.equals(pagingCookie)) 4940 { 4941 pagingCookie = newPagingCookie; 4942 finished = false; 4943 } 4944 } 4945 } 4946 4947 for (SearchResultEntry entry : searchOp.getSearchEntries()) 4948 { 4949 long maxTimeToRun = endDate - TimeThread.getTime(); 4950 if (maxTimeToRun < 0) { 4951 throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED, 4952 LocalizableMessage.raw(" end date reached")); 4953 } 4954 4955 EntryHistorical entryHist = newInstanceFromEntry(entry); 4956 4957 CSN latestOldCSN = entryHist.getOldestCSN(); 4958 entryHist.setPurgeDelay(getHistoricalPurgeDelay()); 4959 Attribute attr = entryHist.encodeAndPurge(); 4960 4961 if(entryHist.getLastPurgedValuesCount() > 0) 4962 { 4963 lastCSNPurgedFromHist = latestOldCSN; 4964 List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr)); 4965 count += entryHist.getLastPurgedValuesCount(); 4966 ModifyOperation newOp = new ModifyOperationBasis( 4967 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 4968 entry.getName(), mods); 4969 runAsSynchronizedOperation(newOp); 4970 4971 if (newOp.getResultCode() != ResultCode.SUCCESS) 4972 { 4973 // Log information for the repair tool. 
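// The purge is best-effort: the failure is logged and the sweep continues with the remaining entries.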
4974 logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, newOp, newOp.getResultCode()); 4975 } 4976 else if (task != null) 4977 { 4978 task.setProgressStats(lastCSNPurgedFromHist, count); 4979 } 4980 } 4981 } 4982 } 4983 // If a full sweep was completed, the lastCSNPurgedFromHist must be reset so that the next 4984 // run-through starts from the beginning. Otherwise, subsequent runs of the task would only 4985 // pick up purgeable changes for the last server id. 4986 lastCSNPurgedFromHist = CSN.MIN_VALUE; 4987 } 4988}