001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2014-2017 ForgeRock AS. 015 */ 016package org.opends.server.backends; 017 018import static com.forgerock.opendj.util.StaticUtils.containsIgnoreCase; 019import static org.forgerock.opendj.ldap.schema.CoreSchema.*; 020import static org.opends.messages.BackendMessages.*; 021import static org.opends.messages.ReplicationMessages.*; 022import static org.opends.server.config.ConfigConstants.*; 023import static org.opends.server.controls.PersistentSearchChangeType.ADD; 024import static org.opends.server.core.DirectoryServer.*; 025import static org.opends.server.replication.plugin.MultimasterReplication.*; 026import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; 027import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; 028import static org.opends.server.util.LDIFWriter.*; 029import static org.opends.server.util.ServerConstants.*; 030import static org.opends.server.util.StaticUtils.*; 031 032import java.text.SimpleDateFormat; 033import java.util.Collection; 034import java.util.Collections; 035import java.util.Date; 036import java.util.Iterator; 037import java.util.LinkedHashMap; 038import java.util.LinkedList; 039import java.util.List; 040import 
java.util.Map; 041import java.util.Set; 042import java.util.TimeZone; 043import java.util.concurrent.ConcurrentLinkedQueue; 044import java.util.concurrent.ConcurrentSkipListMap; 045import java.util.concurrent.atomic.AtomicReference; 046 047import org.forgerock.i18n.LocalizableMessage; 048import org.forgerock.i18n.slf4j.LocalizedLogger; 049import org.forgerock.opendj.config.Configuration; 050import org.forgerock.opendj.config.server.ConfigException; 051import org.forgerock.opendj.ldap.AttributeDescription; 052import org.forgerock.opendj.ldap.ByteString; 053import org.forgerock.opendj.ldap.ConditionResult; 054import org.forgerock.opendj.ldap.DN; 055import org.forgerock.opendj.ldap.ModificationType; 056import org.forgerock.opendj.ldap.RDN; 057import org.forgerock.opendj.ldap.ResultCode; 058import org.forgerock.opendj.ldap.SearchScope; 059import org.forgerock.opendj.ldap.schema.AttributeType; 060import org.forgerock.opendj.ldap.schema.CoreSchema; 061import org.forgerock.opendj.ldap.schema.ObjectClass; 062import org.opends.server.api.Backend; 063import org.opends.server.controls.EntryChangeNotificationControl; 064import org.opends.server.controls.EntryChangelogNotificationControl; 065import org.opends.server.controls.ExternalChangelogRequestControl; 066import org.opends.server.controls.PersistentSearchControl; 067import org.opends.server.core.AddOperation; 068import org.opends.server.core.DeleteOperation; 069import org.opends.server.core.DirectoryServer; 070import org.opends.server.core.ModifyDNOperation; 071import org.opends.server.core.ModifyOperation; 072import org.opends.server.core.PersistentSearch; 073import org.opends.server.core.SearchOperation; 074import org.opends.server.core.ServerContext; 075import org.opends.server.replication.common.CSN; 076import org.opends.server.replication.common.MultiDomainServerState; 077import org.opends.server.replication.common.ServerState; 078import org.opends.server.replication.protocol.AddMsg; 079import 
org.opends.server.replication.protocol.DeleteMsg; 080import org.opends.server.replication.protocol.LDAPUpdateMsg; 081import org.opends.server.replication.protocol.ModifyCommonMsg; 082import org.opends.server.replication.protocol.ModifyDNMsg; 083import org.opends.server.replication.protocol.UpdateMsg; 084import org.opends.server.replication.server.ReplicationServer; 085import org.opends.server.replication.server.ReplicationServerDomain; 086import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; 087import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; 088import org.opends.server.replication.server.changelog.api.ChangelogDB; 089import org.opends.server.replication.server.changelog.api.ChangelogException; 090import org.opends.server.replication.server.changelog.api.DBCursor; 091import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; 092import org.opends.server.replication.server.changelog.api.ReplicaId; 093import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 094import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate; 095import org.opends.server.replication.server.changelog.file.ECLMultiDomainDBCursor; 096import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor; 097import org.opends.server.types.Attribute; 098import org.opends.server.types.Attributes; 099import org.opends.server.types.BackupConfig; 100import org.opends.server.types.BackupDirectory; 101import org.opends.server.types.CanceledOperationException; 102import org.opends.server.types.Control; 103import org.opends.server.types.DirectoryException; 104import org.opends.server.types.Entry; 105import org.opends.server.types.FilterType; 106import org.opends.server.types.IndexType; 107import org.opends.server.types.InitializationException; 108import org.opends.server.types.LDIFExportConfig; 109import org.opends.server.types.LDIFImportConfig; 110import 
org.opends.server.types.LDIFImportResult; 111import org.opends.server.types.Modification; 112import org.opends.server.types.Privilege; 113import org.opends.server.types.RawAttribute; 114import org.opends.server.types.RestoreConfig; 115import org.opends.server.types.SearchFilter; 116import org.opends.server.types.WritabilityMode; 117import org.opends.server.util.CollectionUtils; 118import org.opends.server.util.StaticUtils; 119 120/** 121 * A backend that provides access to the changelog, i.e. the "cn=changelog" 122 * suffix. It is a read-only backend that is created by a 123 * {@link ReplicationServer} and is not configurable. 124 * <p> 125 * There are two modes to search the changelog: 126 * <ul> 127 * <li>Cookie mode: when a "ECL Cookie Exchange Control" is provided with the 128 * request. The cookie provided in the control is used to retrieve entries from 129 * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with 130 * the entries.</li> 131 * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided 132 * with the request. The entries are retrieved using the ChangeNumberIndexDB and 133 * their attributes are set with the information from the ReplicasDBs. 
The 134 * <code>changeNumber</code> attribute value is set from the content of 135 * ChangeNumberIndexDB.</li> 136 * </ul> 137 * <h3>Searches flow</h3> 138 * <p> 139 * Here is the flow of searches within the changelog backend APIs: 140 * <ul> 141 * <li>Normal searches only go through: 142 * <ol> 143 * <li>{@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li> 144 * </ol> 145 * </li> 146 * <li>Persistent searches with <code>changesOnly=false</code> go through: 147 * <ol> 148 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)} 149 * (once, single threaded),</li> 150 * <li> 151 * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li> 152 * <li>{@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi 153 * threaded)</li> 154 * </ol> 155 * </li> 156 * <li>Persistent searches with <code>changesOnly=true</code> go through: 157 * <ol> 158 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)} 159 * (once, single threaded)</li> 160 * <li> 161 * {@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi 162 * threaded)</li> 163 * </ol> 164 * </li> 165 * </ul> 166 * 167 * @see ReplicationServer 168 */ 169public class ChangelogBackend extends Backend<Configuration> 170{ 171 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 172 173 /** The id of this backend. */ 174 public static final String BACKEND_ID = "changelog"; 175 176 private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L; 177 178 private static final String CHANGE_NUMBER_ATTR = "changeNumber"; 179 private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender"; 180 181 /** The set of objectclasses that will be used in root entry. 
*/ 182 private static final Map<ObjectClass, String> 183 CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<>(2); 184 static 185 { 186 CHANGELOG_ROOT_OBJECT_CLASSES.put(CoreSchema.getTopObjectClass(), OC_TOP); 187 CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getSchema().getObjectClass("container"), "container"); 188 } 189 190 /** The set of objectclasses that will be used in ECL entries. */ 191 private static final Map<ObjectClass, String> 192 CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<>(2); 193 static 194 { 195 CHANGELOG_ENTRY_OBJECT_CLASSES.put(CoreSchema.getTopObjectClass(), OC_TOP); 196 CHANGELOG_ENTRY_OBJECT_CLASSES.put(getSchema().getObjectClass(OC_CHANGELOG_ENTRY), OC_CHANGELOG_ENTRY); 197 } 198 199 /** The base DN for the external change log. */ 200 public static final DN CHANGELOG_BASE_DN = DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT); 201 202 /** The set of base DNs for this backend. */ 203 private Set<DN> baseDNs; 204 /** The set of supported controls for this backend. */ 205 private final Set<String> supportedControls = 206 CollectionUtils.newHashSet(OID_ECL_COOKIE_EXCHANGE_CONTROL, OID_PERSISTENT_SEARCH); 207 /** Whether the base changelog entry has subordinates. */ 208 private Boolean baseEntryHasSubordinates; 209 210 /** The replication server on which the changelog is read. */ 211 private final ReplicationServer replicationServer; 212 private final ECLEnabledDomainPredicate domainPredicate; 213 214 /** The set of cookie-based persistent searches registered with this backend. */ 215 private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches = new ConcurrentLinkedQueue<>(); 216 /** The set of change number-based persistent searches registered with this backend. */ 217 private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches = 218 new ConcurrentLinkedQueue<>(); 219 220 /** 221 * Creates a new backend with the provided replication server. 
222 * 223 * @param replicationServer 224 * The replication server on which the changes are read. 225 * @param domainPredicate 226 * Returns whether a domain is enabled for the external changelog. 227 */ 228 public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate) 229 { 230 this.replicationServer = replicationServer; 231 this.domainPredicate = domainPredicate; 232 setBackendID(BACKEND_ID); 233 setWritabilityMode(WritabilityMode.DISABLED); 234 setPrivateBackend(true); 235 } 236 237 private ChangelogDB getChangelogDB() 238 { 239 return replicationServer.getChangelogDB(); 240 } 241 242 /** 243 * Returns the ChangelogBackend configured for "cn=changelog" in this directory server. 244 * 245 * @return the ChangelogBackend configured for "cn=changelog" in this directory server 246 * @deprecated instead inject the required object where needed 247 */ 248 @Deprecated 249 public static ChangelogBackend getInstance() 250 { 251 return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN); 252 } 253 254 @Override 255 public void configureBackend(final Configuration config, ServerContext serverContext) throws ConfigException 256 { 257 throw new UnsupportedOperationException("The changelog backend is not configurable"); 258 } 259 260 @Override 261 public void openBackend() throws InitializationException 262 { 263 baseDNs = Collections.singleton(CHANGELOG_BASE_DN); 264 265 try 266 { 267 DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true); 268 } 269 catch (final DirectoryException e) 270 { 271 throw new InitializationException( 272 ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(DN_EXTERNAL_CHANGELOG_ROOT, getExceptionMessage(e)), e); 273 } 274 } 275 276 @Override 277 public void closeBackend() 278 { 279 try 280 { 281 DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN); 282 } 283 catch (final DirectoryException e) 284 { 285 logger.traceException(e); 286 } 287 } 288 289 @Override 290 public Set<DN> getBaseDNs() 291 
{ 292 return baseDNs; 293 } 294 295 @Override 296 public boolean isIndexed(final AttributeType attributeType, final IndexType indexType) 297 { 298 return true; 299 } 300 301 @Override 302 public Entry getEntry(final DN entryDN) throws DirectoryException 303 { 304 if (entryDN == null) 305 { 306 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), 307 ERR_BACKEND_GET_ENTRY_NULL.get(getBackendID())); 308 } 309 throw new RuntimeException("Not implemented"); 310 } 311 312 @Override 313 public ConditionResult hasSubordinates(final DN entryDN) throws DirectoryException 314 { 315 if (CHANGELOG_BASE_DN.equals(entryDN)) 316 { 317 final Boolean hasSubs = baseChangelogHasSubordinates(); 318 if (hasSubs == null) 319 { 320 return ConditionResult.UNDEFINED; 321 } 322 return ConditionResult.valueOf(hasSubs); 323 } 324 return ConditionResult.FALSE; 325 } 326 327 private Boolean baseChangelogHasSubordinates() throws DirectoryException 328 { 329 if (baseEntryHasSubordinates == null) 330 { 331 // compute its value 332 try 333 { 334 final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); 335 CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); 336 try (final MultiDomainDBCursor cursor = 337 replicationDomainDB.getCursorFrom(new MultiDomainServerState(), options, getExcludedBaseDNs())) 338 { 339 baseEntryHasSubordinates = cursor.next(); 340 } 341 } 342 catch (ChangelogException e) 343 { 344 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_ATTRIBUTE.get( 345 "hasSubordinates", DN_EXTERNAL_CHANGELOG_ROOT, stackTraceToSingleLineString(e))); 346 } 347 } 348 return baseEntryHasSubordinates; 349 } 350 351 @Override 352 public long getNumberOfEntriesInBaseDN(final DN baseDN) throws DirectoryException 353 { 354 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get()); 355 } 356 357 @Override 358 public long 
getNumberOfChildren(final DN parentDN) throws DirectoryException 359 { 360 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get()); 361 } 362 363 /** 364 * Notifies persistent searches of this backend that a new cookie entry was added to it. 365 * <p> 366 * Note: This method correspond to the "persistent search" phase. 367 * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled. 368 * <p> 369 * This method must only be called after the provided data have been persisted to disk. 370 * 371 * @param baseDN 372 * the baseDN of the newly added entry. 373 * @param updateMsg 374 * the update message of the newly added entry 375 * @throws ChangelogException 376 * If a problem occurs while notifying of the newly added entry. 377 */ 378 public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException 379 { 380 if (!(updateMsg instanceof LDAPUpdateMsg)) 381 { 382 return; 383 } 384 385 try 386 { 387 for (PersistentSearch pSearch : cookieBasedPersistentSearches) 388 { 389 final SearchOperation searchOp = pSearch.getSearchOperation(); 390 final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT); 391 entrySender.persistentSearchSendEntry(baseDN, updateMsg); 392 } 393 } 394 catch (DirectoryException e) 395 { 396 throw new ChangelogException(e.getMessageObject(), e); 397 } 398 } 399 400 /** 401 * Notifies persistent searches of this backend that a new change number entry was added to it. 402 * <p> 403 * Note: This method correspond to the "persistent search" phase. 404 * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled. 405 * <p> 406 * This method must only be called after the provided data have been persisted to disk. 407 * 408 * @param baseDN 409 * the baseDN of the newly added entry. 410 * @param changeNumber 411 * the change number of the newly added entry. 
It will be greater 412 * than zero for entries added to the change number index and less 413 * than or equal to zero for entries added to any replica DB 414 * @param cookieString 415 * a string representing the cookie of the newly added entry. 416 * This is only meaningful for entries added to the change number index 417 * @param updateMsg 418 * the update message of the newly added entry 419 * @throws ChangelogException 420 * If a problem occurs while notifying of the newly added entry. 421 */ 422 public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg) 423 throws ChangelogException 424 { 425 if (!(updateMsg instanceof LDAPUpdateMsg) 426 || changeNumberBasedPersistentSearches.isEmpty()) 427 { 428 return; 429 } 430 431 try 432 { 433 // changeNumber entry can be shared with multiple persistent searches 434 final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg); 435 for (PersistentSearch pSearch : changeNumberBasedPersistentSearches) 436 { 437 final SearchOperation searchOp = pSearch.getSearchOperation(); 438 final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT); 439 entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry); 440 } 441 } 442 catch (DirectoryException e) 443 { 444 throw new ChangelogException(e.getMessageObject(), e); 445 } 446 } 447 448 private boolean isCookieBased(final SearchOperation searchOp) 449 { 450 for (Control c : searchOp.getRequestControls()) 451 { 452 if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID())) 453 { 454 return true; 455 } 456 } 457 return false; 458 } 459 460 @Override 461 public void addEntry(Entry entry, AddOperation addOperation) 462 throws DirectoryException, CanceledOperationException 463 { 464 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 465 ERR_BACKEND_ADD_NOT_SUPPORTED.get(String.valueOf(entry.getName()), getBackendID())); 466 } 467 468 @Override 469 
public void deleteEntry(DN entryDN, DeleteOperation deleteOperation) 470 throws DirectoryException, CanceledOperationException 471 { 472 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 473 ERR_BACKEND_DELETE_NOT_SUPPORTED.get(String.valueOf(entryDN), getBackendID())); 474 } 475 476 @Override 477 public void replaceEntry(Entry oldEntry, Entry newEntry, 478 ModifyOperation modifyOperation) throws DirectoryException, 479 CanceledOperationException 480 { 481 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 482 ERR_BACKEND_MODIFY_NOT_SUPPORTED.get(String.valueOf(newEntry.getName()), getBackendID())); 483 } 484 485 @Override 486 public void renameEntry(DN currentDN, Entry entry, 487 ModifyDNOperation modifyDNOperation) throws DirectoryException, 488 CanceledOperationException 489 { 490 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 491 ERR_BACKEND_MODIFY_DN_NOT_SUPPORTED.get(String.valueOf(currentDN), getBackendID())); 492 } 493 494 /** 495 * {@inheritDoc} 496 * <p> 497 * Runs the "initial search" phase (as opposed to a "persistent search" 498 * phase). The "initial search" phase is the only search run by normal 499 * searches, but it is also run by persistent searches with 500 * <code>changesOnly=false</code>. Persistent searches with 501 * <code>changesOnly=true</code> never execute this code. 502 * <p> 503 * Note: this method is executed only once per persistent search, single 504 * threaded. 
505 */ 506 @Override 507 public void search(final SearchOperation searchOperation) throws DirectoryException 508 { 509 checkChangelogReadPrivilege(searchOperation); 510 511 final Set<DN> excludedBaseDNs = getExcludedBaseDNs(); 512 final MultiDomainServerState cookie = getCookieFromControl(searchOperation, excludedBaseDNs); 513 514 final ChangeNumberRange range = optimizeSearch(searchOperation.getBaseDN(), searchOperation.getFilter()); 515 try 516 { 517 final boolean isPersistentSearch = isPersistentSearch(searchOperation); 518 if (cookie != null) 519 { 520 initialSearchFromCookie( 521 getCookieEntrySender(SearchPhase.INITIAL, searchOperation, cookie, excludedBaseDNs, isPersistentSearch)); 522 } 523 else 524 { 525 initialSearchFromChangeNumber( 526 getChangeNumberEntrySender(SearchPhase.INITIAL, searchOperation, range, isPersistentSearch)); 527 } 528 } 529 catch (ChangelogException e) 530 { 531 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get( 532 searchOperation.getBaseDN(), searchOperation.getFilter(), stackTraceToSingleLineString(e))); 533 } 534 } 535 536 private MultiDomainServerState getCookieFromControl(final SearchOperation searchOperation, Set<DN> excludedBaseDNs) 537 throws DirectoryException 538 { 539 final ExternalChangelogRequestControl eclRequestControl = 540 searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER); 541 if (eclRequestControl != null) 542 { 543 final MultiDomainServerState cookie = eclRequestControl.getCookie(); 544 validateProvidedCookie(cookie, excludedBaseDNs); 545 return cookie; 546 } 547 return null; 548 } 549 550 @Override 551 public Set<String> getSupportedControls() 552 { 553 return supportedControls; 554 } 555 556 @Override 557 public Set<String> getSupportedFeatures() 558 { 559 return Collections.emptySet(); 560 } 561 562 @Override 563 public boolean supports(BackendOperation backendOperation) 564 { 565 return false; 566 } 567 568 @Override 569 public void 
exportLDIF(final LDIFExportConfig exportConfig) 570 throws DirectoryException 571 { 572 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 573 ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID())); 574 } 575 576 @Override 577 public LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext) 578 throws DirectoryException 579 { 580 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 581 ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID())); 582 } 583 584 @Override 585 public void createBackup(BackupConfig backupConfig) throws DirectoryException 586 { 587 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 588 ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); 589 } 590 591 @Override 592 public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException 593 { 594 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 595 ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); 596 } 597 598 @Override 599 public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException 600 { 601 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 602 ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); 603 } 604 605 @Override 606 public long getEntryCount() 607 { 608 try 609 { 610 return getNumberOfEntriesInBaseDN(CHANGELOG_BASE_DN) + 1; 611 } 612 catch (DirectoryException e) 613 { 614 logger.traceException(e); 615 return -1; 616 } 617 } 618 619 /** 620 * Represent the change number range targeted by a search operation. 621 * <p> 622 * This class should be visible for tests. 623 */ 624 static final class ChangeNumberRange 625 { 626 private long lowerBound = -1; 627 private long upperBound = -1; 628 629 /** 630 * Returns the lowest change number to retrieve (inclusive). 
631 * 632 * @return the lowest change number 633 */ 634 long getLowerBound() 635 { 636 return lowerBound; 637 } 638 639 /** 640 * Returns the highest change number to retrieve (inclusive). 641 * 642 * @return the highest change number 643 */ 644 long getUpperBound() 645 { 646 return upperBound; 647 } 648 } 649 650 /** 651 * Returns the set of DNs to exclude from the search. 652 * 653 * @return the DNs corresponding to domains to exclude from the search. 654 * @throws DirectoryException 655 * If a DN can't be decoded. 656 */ 657 private static Set<DN> getExcludedBaseDNs() throws DirectoryException 658 { 659 return getExcludedChangelogDomains(); 660 } 661 662 /** 663 * Optimize the search parameters by analyzing the DN and filter. 664 * It also performs validation on some search parameters 665 * for both cookie and change number based changelogs. 666 * 667 * @param baseDN the provided search baseDN. 668 * @param userFilter the provided search filter. 669 * @return the optimized change number range 670 * @throws DirectoryException when an exception occurs. 671 */ 672 ChangeNumberRange optimizeSearch(final DN baseDN, final SearchFilter userFilter) throws DirectoryException 673 { 674 SearchFilter equalityFilter = null; 675 switch (baseDN.size()) 676 { 677 case 1: 678 // "cn=changelog" : use user-provided search filter. 679 break; 680 case 2: 681 // It is probably "changeNumber=xxx,cn=changelog", use equality filter 682 // But it also could be "<service-id>,cn=changelog" so need to check on attribute 683 equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR); 684 break; 685 default: 686 // "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter 687 equalityFilter = buildSearchFilterFrom(baseDN, "replicationCSN"); 688 break; 689 } 690 691 return optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter); 692 } 693 694 /** 695 * Build a search filter from given DN and attribute. 
696 * 697 * @return the search filter or {@code null} if attribute is not present in 698 * the provided DN 699 */ 700 private SearchFilter buildSearchFilterFrom(final DN baseDN, final String attrName) 701 { 702 final RDN rdn = baseDN.rdn(); 703 AttributeType attrType = DirectoryServer.getSchema().getAttributeType(attrName); 704 final ByteString attrValue = rdn.getAttributeValue(attrType); 705 if (attrValue != null) 706 { 707 return SearchFilter.createEqualityFilter(attrType, attrValue); 708 } 709 return null; 710 } 711 712 private ChangeNumberRange optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException 713 { 714 final ChangeNumberRange range = new ChangeNumberRange(); 715 if (filter == null) 716 { 717 return range; 718 } 719 720 if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR)) 721 { 722 range.lowerBound = decodeChangeNumber(filter.getAssertionValue()); 723 } 724 else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR)) 725 { 726 range.upperBound = decodeChangeNumber(filter.getAssertionValue()); 727 } 728 else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR)) 729 { 730 final long number = decodeChangeNumber(filter.getAssertionValue()); 731 range.lowerBound = number; 732 range.upperBound = number; 733 } 734 else if (matches(filter, FilterType.EQUALITY, "replicationcsn")) 735 { 736 // == exact CSN 737 // validate provided CSN is correct 738 new CSN(filter.getAssertionValue().toString()); 739 } 740 else if (filter.getFilterType() == FilterType.AND) 741 { 742 // TODO: it looks like it could be generalized to N components, not only two 743 final Collection<SearchFilter> components = filter.getFilterComponents(); 744 final SearchFilter filters[] = components.toArray(new SearchFilter[0]); 745 long upper1 = -1; 746 long lower1 = -1; 747 long upper2 = -1; 748 long lower2 = -1; 749 if (filters.length > 0) 750 { 751 ChangeNumberRange range1 = optimizeSearchUsingFilter(filters[0]); 752 upper1 = 
range1.upperBound; 753 lower1 = range1.lowerBound; 754 } 755 if (filters.length > 1) 756 { 757 ChangeNumberRange range2 = optimizeSearchUsingFilter(filters[1]); 758 upper2 = range2.upperBound; 759 lower2 = range2.lowerBound; 760 } 761 if (upper1 == -1) 762 { 763 range.upperBound = upper2; 764 } 765 else if (upper2 == -1) 766 { 767 range.upperBound = upper1; 768 } 769 else 770 { 771 range.upperBound = Math.min(upper1, upper2); 772 } 773 774 range.lowerBound = Math.max(lower1, lower2); 775 } 776 return range; 777 } 778 779 private static long decodeChangeNumber(final ByteString assertionValue) 780 throws DirectoryException 781 { 782 try 783 { 784 return Long.decode(assertionValue.toString()); 785 } 786 catch (NumberFormatException e) 787 { 788 throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX, 789 LocalizableMessage.raw("Could not convert value '%s' to long", assertionValue)); 790 } 791 } 792 793 private boolean matches(SearchFilter filter, FilterType filterType, String primaryName) 794 { 795 return filter.getFilterType() == filterType 796 && filter.getAttributeType() != null 797 && filter.getAttributeType().getNameOrOID().equalsIgnoreCase(primaryName); 798 } 799 800 /** Search the changelog when a cookie control is provided. 
*/ 801 private void initialSearchFromCookie(final CookieEntrySender entrySender) 802 throws DirectoryException, ChangelogException 803 { 804 if (!sendBaseChangelogEntry(entrySender.searchOp)) 805 { // only return the base entry: stop here 806 return; 807 } 808 809 final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); 810 CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); 811 try (final MultiDomainDBCursor cursor = 812 replicationDomainDB.getCursorFrom(entrySender.cookie, options, entrySender.excludedBaseDNs); 813 ECLMultiDomainDBCursor replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor)) 814 { 815 if (sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor)) 816 { 817 entrySender.transitioningToPersistentSearchPhase(); 818 sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor); 819 } 820 } 821 finally 822 { 823 entrySender.finalizeInitialSearch(); 824 } 825 } 826 827 private CookieEntrySender getCookieEntrySender(SearchPhase startPhase, final SearchOperation searchOperation, 828 MultiDomainServerState cookie, Set<DN> excludedBaseDNs, boolean isPersistentSearch) 829 { 830 if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase)) 831 { 832 return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT); 833 } 834 return new CookieEntrySender(searchOperation, null, startPhase, cookie, excludedBaseDNs); 835 } 836 837 private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender, 838 final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException 839 { 840 boolean continueSearch = true; 841 while (continueSearch && replicaUpdatesCursor.next()) 842 { 843 final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); 844 final DN domainBaseDN = replicaUpdatesCursor.getData(); 845 continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN); 846 } 847 return continueSearch; 848 } 849 850 
private boolean isPersistentSearch(SearchOperation op) 851 { 852 for (PersistentSearch pSearch : getPersistentSearches()) 853 { 854 if (op == pSearch.getSearchOperation()) 855 { 856 return true; 857 } 858 } 859 return false; 860 } 861 862 @Override 863 public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException 864 { 865 initializePersistentSearch(pSearch); 866 867 if (isCookieBased(pSearch.getSearchOperation())) 868 { 869 cookieBasedPersistentSearches.add(pSearch); 870 } 871 else 872 { 873 changeNumberBasedPersistentSearches.add(pSearch); 874 } 875 super.registerPersistentSearch(pSearch); 876 } 877 878 private void initializePersistentSearch(PersistentSearch pSearch) throws DirectoryException 879 { 880 final SearchOperation searchOp = pSearch.getSearchOperation(); 881 882 // Validation must be done during registration for changes only persistent searches. 883 // Otherwise, when there is an initial search phase, 884 // validation is performed by the search() method. 885 if (pSearch.isChangesOnly()) 886 { 887 checkChangelogReadPrivilege(searchOp); 888 } 889 final ChangeNumberRange range = optimizeSearch(searchOp.getBaseDN(), searchOp.getFilter()); 890 891 final SearchPhase startPhase = pSearch.isChangesOnly() ? 
SearchPhase.PERSISTENT : SearchPhase.INITIAL; 892 PersistentSearchControl psearchControl = searchOp.getRequestControl(PersistentSearchControl.DECODER); 893 if (isCookieBased(searchOp)) 894 { 895 final Set<DN> excludedBaseDNs = getExcludedBaseDNs(); 896 final MultiDomainServerState cookie = getCookie(pSearch.isChangesOnly(), searchOp, excludedBaseDNs); 897 searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, 898 new CookieEntrySender(searchOp, psearchControl, startPhase, cookie, excludedBaseDNs)); 899 } 900 else 901 { 902 searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, 903 new ChangeNumberEntrySender(searchOp, psearchControl, startPhase, range)); 904 } 905 } 906 907 private MultiDomainServerState getCookie(boolean isChangesOnly, SearchOperation searchOp, Set<DN> excludedBaseDNs) 908 throws DirectoryException 909 { 910 if (isChangesOnly) 911 { 912 // this changesOnly persistent search will not go through #initialSearch() 913 // so we must initialize the cookie here 914 return getNewestCookie(searchOp); 915 } 916 return getCookieFromControl(searchOp, excludedBaseDNs); 917 } 918 919 private MultiDomainServerState getNewestCookie(SearchOperation searchOp) 920 { 921 if (!isCookieBased(searchOp)) 922 { 923 return null; 924 } 925 926 final MultiDomainServerState cookie = new MultiDomainServerState(); 927 for (final Iterator<ReplicationServerDomain> it = 928 replicationServer.getDomainIterator(); it.hasNext();) 929 { 930 final DN baseDN = it.next().getBaseDN(); 931 final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN); 932 cookie.update(baseDN, state); 933 } 934 return cookie; 935 } 936 937 /** 938 * Validates the cookie contained in search parameters by checking its content 939 * with the actual replication server state. 
940 * 941 * @throws DirectoryException 942 * If the state is not valid 943 */ 944 private void validateProvidedCookie(final MultiDomainServerState cookie, Set<DN> excludedBaseDNs) 945 throws DirectoryException 946 { 947 if (cookie != null && !cookie.isEmpty()) 948 { 949 replicationServer.validateCookie(cookie, excludedBaseDNs); 950 } 951 } 952 953 /** Search the changelog using change number(s). */ 954 private void initialSearchFromChangeNumber(final ChangeNumberEntrySender entrySender) 955 throws ChangelogException, DirectoryException 956 { 957 if (!sendBaseChangelogEntry(entrySender.searchOp)) 958 { // only return the base entry: stop here 959 return; 960 } 961 962 final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<>(); 963 try (DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = getCNIndexDBCursor(entrySender.lowestChangeNumber)) 964 { 965 final MultiDomainServerState cookie = new MultiDomainServerState(); 966 967 if (sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie)) 968 { 969 entrySender.transitioningToPersistentSearchPhase(); 970 sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie); 971 } 972 } 973 finally 974 { 975 entrySender.finalizeInitialSearch(); 976 StaticUtils.close(replicaUpdatesCursor.get()); 977 } 978 } 979 980 private ChangeNumberEntrySender getChangeNumberEntrySender(SearchPhase startPhase, 981 final SearchOperation searchOperation, ChangeNumberRange range, boolean isPersistentSearch) 982 { 983 if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase)) 984 { 985 return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT); 986 } 987 return new ChangeNumberEntrySender(searchOperation, null, SearchPhase.INITIAL, range); 988 } 989 990 private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender, 991 DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor, AtomicReference<MultiDomainDBCursor> 
replicaUpdatesCursor, 992 MultiDomainServerState cookie) throws ChangelogException, DirectoryException 993 { 994 boolean continueSearch = true; 995 while (continueSearch && cnIndexDBCursor.next()) 996 { 997 // Handle the current cnIndex record 998 final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord(); 999 if (replicaUpdatesCursor.get() == null) 1000 { 1001 replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord)); 1002 if (isChangelogCookieAttributeRequired(entrySender.searchOp)) 1003 { 1004 initializeCookieForChangeNumberMode(cookie, cnIndexRecord); 1005 } // otherwise, the cookie is not required and/or will be filtered before sending the entry 1006 } 1007 else 1008 { 1009 cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); 1010 } 1011 continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber()); 1012 if (continueSearch) 1013 { 1014 final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN()); 1015 if (updateMsg != null) 1016 { 1017 continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie); 1018 replicaUpdatesCursor.get().next(); 1019 } 1020 } 1021 } 1022 return continueSearch; 1023 } 1024 1025 private boolean isChangelogCookieAttributeRequired(SearchOperation searchOp) 1026 { 1027 final Set<String> requiredAttributes = searchOp.getAttributes(); 1028 return containsIgnoreCase(requiredAttributes, "changelogcookie") 1029 || containsIgnoreCase(requiredAttributes, ALL_OPERATIONAL_ATTRIBUTES) 1030 || queriesChangelogCookieAttribute(searchOp.getFilter()); 1031 } 1032 1033 /** Initialize the provided cookie from the provided change number index record. */ 1034 private void initializeCookieForChangeNumberMode( 1035 MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException 1036 { 1037 // TODO optimize positioning when the search filter is an equality filter on the changelogCookie? 
1038 MultiDomainServerState startCookie = new MultiDomainServerState(); 1039 startCookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); 1040 1041 CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); 1042 MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(startCookie, options); 1043 try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor)) 1044 { 1045 updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord); 1046 } 1047 } 1048 1049 private boolean queriesChangelogCookieAttribute(SearchFilter filter) 1050 { 1051 switch (filter.getFilterType()) 1052 { 1053 case AND: 1054 case OR: 1055 for (SearchFilter f : filter.getFilterComponents()) 1056 { 1057 if (queriesChangelogCookieAttribute(f)) 1058 { 1059 return true; 1060 } 1061 } 1062 return false; 1063 1064 case NOT: 1065 return queriesChangelogCookieAttribute(filter.getNotComponent()); 1066 1067 case PRESENT: 1068 case EQUALITY: 1069 case APPROXIMATE_MATCH: 1070 case SUBSTRING: 1071 case GREATER_OR_EQUAL: 1072 case LESS_OR_EQUAL: 1073 case EXTENSIBLE_MATCH: 1074 return filter.getAttributeType().hasName("changelogCookie"); 1075 1076 default: 1077 return false; 1078 } 1079 } 1080 1081 /** 1082 * Rebuilds the changelogcookie starting at the newest change number index record. 1083 * <p> 1084 * It updates the provided cookie with the changes from the provided ECL cursor, 1085 * up to (and including) the provided change number index record. 1086 * <p> 1087 * Therefore, after calling this method, the cursor is positioned 1088 * to the change immediately following the provided change number index record. 
1089 * 1090 * @param cookie the cookie to update 1091 * @param cursor the cursor where to read changes from 1092 * @param cnIndexRecord the change number index record to go right after 1093 * @throws ChangelogException if any problem occurs 1094 */ 1095 public static void updateCookieToMediumConsistencyPoint( 1096 MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord) 1097 throws ChangelogException 1098 { 1099 if (cnIndexRecord == null) 1100 { 1101 return; 1102 } 1103 1104 while (cursor.next()) 1105 { 1106 UpdateMsg updateMsg = cursor.getRecord(); 1107 if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0) 1108 { 1109 break; 1110 } 1111 cookie.update(cursor.getData(), updateMsg.getCSN()); 1112 } 1113 } 1114 1115 private MultiDomainDBCursor initializeReplicaUpdatesCursor( 1116 final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException 1117 { 1118 final MultiDomainServerState state = new MultiDomainServerState(); 1119 state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); 1120 1121 // No need for ECLMultiDomainDBCursor in this case 1122 // as updateMsg will be matched with cnIndexRecord 1123 CursorOptions options = 1124 new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN()); 1125 final MultiDomainDBCursor replicaUpdatesCursor = 1126 getChangelogDB().getReplicationDomainDB().getCursorFrom(state, options); 1127 replicaUpdatesCursor.next(); 1128 return replicaUpdatesCursor; 1129 } 1130 1131 /** 1132 * Returns the replica update message corresponding to the provided 1133 * cnIndexRecord. 
1134 * 1135 * @return the update message, which may be {@code null} if the update message 1136 * could not be found because it was purged or because corresponding 1137 * baseDN was removed from the changelog 1138 * @throws DirectoryException 1139 * If inconsistency is detected between the available update 1140 * messages and the provided cnIndexRecord 1141 */ 1142 private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn) 1143 throws ChangelogException, DirectoryException 1144 { 1145 while (true) 1146 { 1147 final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); 1148 final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN()); 1149 if (compareIndexWithUpdateMsg < 0) { 1150 // Either update message has been purged or baseDN has been removed from changelogDB, 1151 // ignore current index record and go to the next one 1152 return null; 1153 } 1154 else if (compareIndexWithUpdateMsg == 0) 1155 { 1156 // Found the matching update message 1157 return updateMsg; 1158 } 1159 // Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet 1160 if (!replicaUpdatesCursor.next()) 1161 { 1162 // Should never happen, as it means some messages have disappeared 1163 // TODO : put the correct I18N message 1164 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, 1165 LocalizableMessage.raw("Could not find replica update message matching index record. " + 1166 "No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist.")); 1167 } 1168 } 1169 } 1170 1171 /** Returns a cursor on CNIndexDB for the provided first change number. 
*/ 1172 private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor( 1173 final long firstChangeNumber) throws ChangelogException 1174 { 1175 final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB(); 1176 long changeNumberToUse = firstChangeNumber; 1177 if (changeNumberToUse <= 1) 1178 { 1179 final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord(); 1180 changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber(); 1181 } 1182 return cnIndexDB.getCursorFrom(changeNumberToUse); 1183 } 1184 1185 /** Creates a changelog entry. */ 1186 private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, 1187 final UpdateMsg msg) throws DirectoryException 1188 { 1189 if (msg instanceof AddMsg) 1190 { 1191 return createAddMsg(baseDN, changeNumber, cookie, msg); 1192 } 1193 else if (msg instanceof ModifyCommonMsg) 1194 { 1195 return createModifyMsg(baseDN, changeNumber, cookie, msg); 1196 } 1197 else if (msg instanceof DeleteMsg) 1198 { 1199 final DeleteMsg delMsg = (DeleteMsg) msg; 1200 return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName()); 1201 } 1202 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, 1203 LocalizableMessage.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN, 1204 msg.getClass())); 1205 } 1206 1207 /** 1208 * Creates an entry from an add message. 1209 * <p> 1210 * Map addMsg to an LDIF string for the 'changes' attribute, and pull out 1211 * change initiators name if available which is contained in the creatorsName 1212 * attribute. 
1213 */ 1214 private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg) 1215 throws DirectoryException 1216 { 1217 final AddMsg addMsg = (AddMsg) msg; 1218 String changeInitiatorsName = null; 1219 String ldifChanges = null; 1220 try 1221 { 1222 final StringBuilder builder = new StringBuilder(256); 1223 for (Attribute attr : addMsg.getAttributes()) 1224 { 1225 if (!attr.isEmpty() 1226 && attr.getAttributeDescription().getAttributeType().equals(getCreatorsNameAttributeType())) 1227 { 1228 // This attribute is not multi-valued. 1229 changeInitiatorsName = attr.iterator().next().toString(); 1230 } 1231 for (ByteString value : attr) 1232 { 1233 builder.append(attr.getAttributeDescription()); 1234 appendLDIFSeparatorAndValue(builder, value); 1235 builder.append('\n'); 1236 } 1237 } 1238 ldifChanges = builder.toString(); 1239 } 1240 catch (Exception e) 1241 { 1242 logEncodingMessageError("add", addMsg.getDN(), e); 1243 } 1244 1245 return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName); 1246 } 1247 1248 /** 1249 * Creates an entry from a modify message. 1250 * <p> 1251 * Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull 1252 * out change initiators name if available which is contained in the 1253 * modifiersName attribute. 
1254 */ 1255 private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, 1256 final UpdateMsg msg) throws DirectoryException 1257 { 1258 final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg; 1259 String changeInitiatorsName = null; 1260 String ldifChanges = null; 1261 try 1262 { 1263 final StringBuilder builder = new StringBuilder(128); 1264 for (Modification mod : modifyMsg.getMods()) 1265 { 1266 final Attribute attr = mod.getAttribute(); 1267 if (mod.getModificationType() == ModificationType.REPLACE 1268 && !attr.isEmpty() 1269 && attr.getAttributeDescription().getAttributeType().equals(getModifiersNameAttributeType())) 1270 { 1271 // This attribute is not multi-valued. 1272 changeInitiatorsName = attr.iterator().next().toString(); 1273 } 1274 final AttributeDescription attrDesc = attr.getAttributeDescription(); 1275 builder.append(mod.getModificationType()); 1276 builder.append(": "); 1277 builder.append(attrDesc); 1278 builder.append('\n'); 1279 1280 for (ByteString value : attr) 1281 { 1282 builder.append(attrDesc); 1283 appendLDIFSeparatorAndValue(builder, value); 1284 builder.append('\n'); 1285 } 1286 builder.append("-\n"); 1287 } 1288 ldifChanges = builder.toString(); 1289 } 1290 catch (Exception e) 1291 { 1292 logEncodingMessageError("modify", modifyMsg.getDN(), e); 1293 } 1294 1295 final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg; 1296 final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges, 1297 isModifyDNMsg ? 
"modrdn" : "modify", changeInitiatorsName); 1298 1299 if (isModifyDNMsg) 1300 { 1301 final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg; 1302 addAttribute(entry, "newrdn", modDNMsg.getNewRDN()); 1303 if (modDNMsg.getNewSuperior() != null) 1304 { 1305 addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior()); 1306 } 1307 addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn())); 1308 } 1309 return entry; 1310 } 1311 1312 /** 1313 * Log an encoding message error. 1314 * 1315 * @param messageType 1316 * String identifying type of message. Should be "add" or "modify". 1317 * @param entryDN 1318 * DN of original entry 1319 */ 1320 private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception) 1321 { 1322 logger.traceException(exception); 1323 logger.error(LocalizableMessage.raw( 1324 "An exception was encountered while trying to encode a replication " + messageType + " message for entry \"" 1325 + entryDN + "\" into an External Change Log entry: " + exception.getMessage())); 1326 } 1327 1328 private void checkChangelogReadPrivilege(SearchOperation searchOp) throws DirectoryException 1329 { 1330 if (!searchOp.getClientConnection().hasPrivilege(Privilege.CHANGELOG_READ, searchOp)) 1331 { 1332 throw new DirectoryException(ResultCode.INSUFFICIENT_ACCESS_RIGHTS, 1333 NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get()); 1334 } 1335 } 1336 1337 /** 1338 * Create a changelog entry from a set of provided information. This is the part of 1339 * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN). 
1340 */ 1341 private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie, 1342 final LDAPUpdateMsg msg, final String ldifChanges, final String changeType, 1343 final String changeInitiatorsName) throws DirectoryException 1344 { 1345 final CSN csn = msg.getCSN(); 1346 String dnString; 1347 if (changeNumber > 0) 1348 { 1349 // change number mode 1350 dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT; 1351 } 1352 else 1353 { 1354 // Cookie mode 1355 dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT; 1356 } 1357 1358 final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>(); 1359 final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<>(); 1360 1361 // Operational standard attributes 1362 addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs); 1363 addAttributeByType("numSubordinates", "0", userAttrs, opAttrs); 1364 addAttributeByType("hasSubordinates", "false", userAttrs, opAttrs); 1365 addAttributeByType("entryDN", dnString, userAttrs, opAttrs); 1366 1367 // REQUIRED attributes 1368 if (changeNumber > 0) 1369 { 1370 addAttributeByType("changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs); 1371 } 1372 SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME); 1373 dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ?? 
1374 final String format = dateFormat.format(new Date(csn.getTime())); 1375 addAttributeByType("changeTime", format, userAttrs, opAttrs); 1376 addAttributeByType("changeType", changeType, userAttrs, opAttrs); 1377 addAttributeByType("targetDN", msg.getDN().toString(), userAttrs, opAttrs); 1378 1379 // NON REQUESTED attributes 1380 addAttributeByType("replicationCSN", csn.toString(), userAttrs, opAttrs); 1381 addAttributeByType("replicaIdentifier", Integer.toString(csn.getServerId()), userAttrs, opAttrs); 1382 1383 if (ldifChanges != null) 1384 { 1385 addAttributeByType("changes", ldifChanges, userAttrs, opAttrs); 1386 } 1387 if (changeInitiatorsName != null) 1388 { 1389 addAttributeByType("changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs); 1390 } 1391 1392 final String targetUUID = msg.getEntryUUID(); 1393 if (targetUUID != null) 1394 { 1395 addAttributeByType("targetEntryUUID", targetUUID, userAttrs, opAttrs); 1396 } 1397 final String cookie2 = cookie != null ? cookie : ""; 1398 addAttributeByType("changeLogCookie", cookie2, userAttrs, opAttrs); 1399 1400 final List<RawAttribute> includedAttributes = msg.getEclIncludes(); 1401 if (includedAttributes != null && !includedAttributes.isEmpty()) 1402 { 1403 final StringBuilder builder = new StringBuilder(256); 1404 for (final RawAttribute includedAttribute : includedAttributes) 1405 { 1406 final String name = includedAttribute.getAttributeType(); 1407 for (final ByteString value : includedAttribute.getValues()) 1408 { 1409 builder.append(name); 1410 appendLDIFSeparatorAndValue(builder, value); 1411 builder.append('\n'); 1412 } 1413 } 1414 final String includedAttributesLDIF = builder.toString(); 1415 addAttributeByType("includedAttributes", includedAttributesLDIF, userAttrs, opAttrs); 1416 } 1417 1418 return new Entry(DN.valueOf(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs); 1419 } 1420 1421 /** 1422 * Sends the entry if it matches the base, scope and filter of the current search 
operation. 1423 * It will also send the base changelog entry if it needs to be sent and was not sent before. 1424 * 1425 * @return {@code true} if search should continue, {@code false} otherwise 1426 */ 1427 private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, boolean returnECs, 1428 String cookie, long changeNumber) 1429 throws DirectoryException 1430 { 1431 if (matchBaseAndScopeAndFilter(searchOp, entry)) 1432 { 1433 return searchOp.returnEntry(entry, getControls(returnECs, cookie, changeNumber)); 1434 } 1435 // maybe the next entry will match? 1436 return true; 1437 } 1438 1439 /** Indicates if the provided entry matches the filter, base and scope. */ 1440 private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException 1441 { 1442 return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope()) 1443 && searchOp.getFilter().matchesEntry(entry); 1444 } 1445 1446 private static List<Control> getControls(boolean returnECs, String cookie, long changeNumber) 1447 { 1448 final List<Control> controls = new LinkedList<>(); 1449 if (cookie != null) 1450 { 1451 controls.add(new EntryChangelogNotificationControl(true, cookie)); 1452 } 1453 if (returnECs) 1454 { 1455 controls.add(new EntryChangeNotificationControl(ADD, changeNumber)); 1456 } 1457 return controls; 1458 } 1459 1460 /** 1461 * Create and returns the base changelog entry to the underlying search operation. 1462 * <p> 1463 * "initial search" phase must return the base entry immediately. 
1464 * 1465 * @return {@code true} if search should continue, {@code false} otherwise 1466 */ 1467 private boolean sendBaseChangelogEntry(SearchOperation searchOp) throws DirectoryException 1468 { 1469 final DN baseDN = searchOp.getBaseDN(); 1470 final SearchFilter filter = searchOp.getFilter(); 1471 final SearchScope scope = searchOp.getScope(); 1472 1473 if (ChangelogBackend.CHANGELOG_BASE_DN.isInScopeOf(baseDN, scope)) 1474 { 1475 final Entry entry = buildBaseChangelogEntry(); 1476 if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null)) 1477 { 1478 // Abandon, size limit reached. 1479 return false; 1480 } 1481 } 1482 return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN) 1483 || !scope.equals(SearchScope.BASE_OBJECT); 1484 } 1485 1486 private Entry buildBaseChangelogEntry() throws DirectoryException 1487 { 1488 final String hasSubordinatesStr = Boolean.toString(baseChangelogHasSubordinates()); 1489 1490 final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>(); 1491 final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<>(); 1492 1493 // We never return the numSubordinates attribute for the base changelog entry 1494 // and there is a very good reason for that: 1495 // - Either we compute it before sending the entries, 1496 // -- then we risk returning more entries if new entries come in after we computed numSubordinates 1497 // -- or we risk returning less entries if purge kicks in after we computed numSubordinates 1498 // - Or we accumulate all the entries that must be returned before sending them => OutOfMemoryError 1499 1500 addAttributeByName(ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs); 1501 addAttributeByName(ATTR_SUBSCHEMA_SUBENTRY, DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs); 1502 addAttributeByName("hasSubordinates", hasSubordinatesStr, userAttrs, operationalAttrs); 1503 addAttributeByName("entryDN", DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs); 1504 return new 
Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs); 1505 } 1506 1507 private static void addAttribute(final Entry e, final String attrType, final String attrValue) 1508 { 1509 e.addAttribute(Attributes.create(attrType, attrValue), null); 1510 } 1511 1512 private static void addAttributeByType(String attrName, String attrValue, 1513 Map<AttributeType, List<Attribute>> userAttrs, 1514 Map<AttributeType, List<Attribute>> operationalAttrs) 1515 { 1516 addAttribute(attrName, attrValue, userAttrs, operationalAttrs, true); 1517 } 1518 1519 private static void addAttributeByName(String attrName, String attrValue, 1520 Map<AttributeType, List<Attribute>> userAttrs, 1521 Map<AttributeType, List<Attribute>> operationalAttrs) 1522 { 1523 addAttribute(attrName, attrValue, userAttrs, operationalAttrs, false); 1524 } 1525 1526 private static void addAttribute(final String attrName, final String attrValue, 1527 final Map<AttributeType, List<Attribute>> userAttrs, 1528 final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType) 1529 { 1530 final Attribute a = addByType 1531 ? Attributes.create(getSchema().getAttributeType(attrName), attrValue) 1532 : Attributes.create(attrName, attrValue); 1533 final AttributeType attrType = a.getAttributeDescription().getAttributeType(); 1534 final List<Attribute> attrList = Collections.singletonList(a); 1535 if (attrType.isOperational()) 1536 { 1537 operationalAttrs.put(attrType, attrList); 1538 } 1539 else 1540 { 1541 userAttrs.put(attrType, attrList); 1542 } 1543 } 1544 1545 /** Describes the current search phase. */ 1546 private enum SearchPhase 1547 { 1548 /** 1549 * "Initial search" phase. The "initial search" phase is running 1550 * concurrently. All update notifications are ignored. 1551 */ 1552 INITIAL, 1553 /** 1554 * Transitioning from the "initial search" phase to the "persistent search" 1555 * phase. "Initial search" phase has finished reading from the DB. 
It now 1556 * verifies if any more updates have been persisted to the DB since stopping 1557 * and send them. All update notifications are blocked. 1558 */ 1559 TRANSITIONING, 1560 /** 1561 * "Persistent search" phase. "Initial search" phase has completed. All 1562 * update notifications are published. 1563 */ 1564 PERSISTENT; 1565 } 1566 1567 /** 1568 * Contains data to ensure that the same change is not sent twice to clients 1569 * because of race conditions between the "initial search" phase and the 1570 * "persistent search" phase. 1571 */ 1572 private static class SendEntryData<K extends Comparable<K>> 1573 { 1574 private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<>(SearchPhase.INITIAL); 1575 private final Object transitioningLock = new Object(); 1576 private volatile K lastKeySentByInitialSearch; 1577 1578 private SendEntryData(SearchPhase startPhase) 1579 { 1580 searchPhase.set(startPhase); 1581 } 1582 1583 private void finalizeInitialSearch() 1584 { 1585 searchPhase.set(SearchPhase.PERSISTENT); 1586 synchronized (transitioningLock) 1587 { // initial search phase has completed, release all persistent searches 1588 transitioningLock.notifyAll(); 1589 } 1590 } 1591 1592 public void transitioningToPersistentSearchPhase() 1593 { 1594 searchPhase.set(SearchPhase.TRANSITIONING); 1595 } 1596 1597 private void initialSearchSendsEntry(final K key) 1598 { 1599 lastKeySentByInitialSearch = key; 1600 } 1601 1602 private boolean persistentSearchCanSendEntry(K key) 1603 { 1604 final SearchPhase stateValue = searchPhase.get(); 1605 switch (stateValue) 1606 { 1607 case INITIAL: 1608 return false; 1609 case TRANSITIONING: 1610 synchronized (transitioningLock) 1611 { 1612 while (SearchPhase.TRANSITIONING.equals(searchPhase.get())) 1613 { 1614 // "initial search" phase is over, and is now verifying whether new 1615 // changes have been published to the DB. 
1616 // Wait for this check to complete 1617 try 1618 { 1619 transitioningLock.wait(); 1620 } 1621 catch (InterruptedException e) 1622 { 1623 Thread.currentThread().interrupt(); 1624 // Shutdown must have been called. Stop sending entries. 1625 return false; 1626 } 1627 } 1628 } 1629 return key.compareTo(lastKeySentByInitialSearch) > 0; 1630 case PERSISTENT: 1631 return true; 1632 default: 1633 throw new RuntimeException("Not implemented for " + stateValue); 1634 } 1635 } 1636 } 1637 1638 /** Sends entries to clients for change number searches. */ 1639 private static class ChangeNumberEntrySender 1640 { 1641 private final SearchOperation searchOp; 1642 private final boolean containsAdd; 1643 private final boolean returnECs; 1644 private final long lowestChangeNumber; 1645 private final long highestChangeNumber; 1646 private final SendEntryData<Long> sendEntryData; 1647 1648 private ChangeNumberEntrySender(SearchOperation searchOp, PersistentSearchControl psearchControl, 1649 SearchPhase startPhase, ChangeNumberRange range) 1650 { 1651 this.searchOp = searchOp; 1652 this.containsAdd = psearchControl != null ? psearchControl.getChangeTypes().contains(ADD) : false; 1653 this.returnECs = psearchControl != null ? psearchControl.getReturnECs() : false; 1654 this.sendEntryData = new SendEntryData<>(startPhase); 1655 this.lowestChangeNumber = range.lowerBound; 1656 this.highestChangeNumber = range.upperBound; 1657 } 1658 1659 /** 1660 * Indicates if provided change number is compatible with last change 1661 * number. 1662 * 1663 * @param changeNumber 1664 * The change number to test. 1665 * @return {@code true} if and only if the provided change number is in the 1666 * range of the last change number. 
1667 */ 1668 boolean changeNumberIsInRange(long changeNumber) 1669 { 1670 return highestChangeNumber == -1 || changeNumber <= highestChangeNumber; 1671 } 1672 1673 private void finalizeInitialSearch() 1674 { 1675 sendEntryData.finalizeInitialSearch(); 1676 } 1677 1678 private void transitioningToPersistentSearchPhase() 1679 { 1680 sendEntryData.transitioningToPersistentSearchPhase(); 1681 } 1682 1683 /** 1684 * @return {@code true} if search should continue, {@code false} otherwise 1685 */ 1686 private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg, 1687 MultiDomainServerState cookie) throws DirectoryException 1688 { 1689 final DN baseDN = cnIndexRecord.getBaseDN(); 1690 sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber()); 1691 final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg); 1692 return sendEntryIfMatches(searchOp, entry, false, null, cnIndexRecord.getChangeNumber()); 1693 } 1694 1695 private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException 1696 { 1697 if (containsAdd && sendEntryData.persistentSearchCanSendEntry(changeNumber)) 1698 { 1699 sendEntryIfMatches(searchOp, entry, returnECs, null, changeNumber); 1700 } 1701 } 1702 } 1703 1704 /** Sends entries to clients for cookie-based searches. 
*/
  private static class CookieEntrySender
  {
    /** Search operation handed to {@code sendEntryIfMatches} for every entry. */
    private final SearchOperation searchOp;
    /** {@code true} only when a persistent search control is present and lists the ADD change type. */
    private final boolean containsAdd;
    /** Value of the control's {@code getReturnECs()}, or {@code false} when there is no control. */
    private final boolean returnECs;
    /** Phase every lazily created {@code SendEntryData} starts in. */
    private final SearchPhase startPhase;
    /** Base DNs excluded from the search, kept as received by the constructor. */
    private final Set<DN> excludedBaseDNs;
    /** Cookie updated for each sent change; all updates are guarded by its own monitor. */
    private final MultiDomainServerState cookie;
    /** One bookkeeping object per replica, created on first use. */
    private final ConcurrentSkipListMap<ReplicaId, SendEntryData<CSN>> replicaIdToSendEntryData =
        new ConcurrentSkipListMap<>();

    /**
     * Builds a sender for one cookie-based search operation.
     *
     * @param searchOp
     *          the search operation entries are sent through
     * @param psearchControl
     *          the persistent search control of the request, may be {@code null}
     * @param startPhase
     *          the phase new per-replica bookkeeping objects are created with
     * @param cookie
     *          the cookie that is updated as changes are sent
     * @param excludedBaseDNs
     *          the base DNs excluded from the search
     */
    private CookieEntrySender(SearchOperation searchOp, PersistentSearchControl psearchControl,
        SearchPhase startPhase, MultiDomainServerState cookie, Set<DN> excludedBaseDNs)
    {
      this.searchOp = searchOp;
      if (psearchControl != null)
      {
        this.containsAdd = psearchControl.getChangeTypes().contains(ADD);
        this.returnECs = psearchControl.getReturnECs();
      }
      else
      {
        // No persistent search control: nothing is pushed in the persistent phase.
        this.containsAdd = false;
        this.returnECs = false;
      }
      this.startPhase = startPhase;
      this.cookie = cookie;
      this.excludedBaseDNs = excludedBaseDNs;
    }

    /** Forwards the end of the initial search to every per-replica bookkeeping object. */
    private void finalizeInitialSearch()
    {
      for (SendEntryData<CSN> perReplicaData : replicaIdToSendEntryData.values())
      {
        perReplicaData.finalizeInitialSearch();
      }
    }

    /** Forwards the switch towards the persistent search phase to every per-replica bookkeeping object. */
    private void transitioningToPersistentSearchPhase()
    {
      for (SendEntryData<CSN> perReplicaData : replicaIdToSendEntryData.values())
      {
        perReplicaData.transitioningToPersistentSearchPhase();
      }
    }

    /**
     * Returns the bookkeeping object of the replica identified by the base DN and the server id
     * of the CSN, creating and registering it when none exists yet.
     *
     * @param baseDN
     *          the base DN of the replica
     * @param csn
     *          the CSN whose server id identifies the replica
     * @return the registered bookkeeping object for that replica, never {@code null}
     */
    private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn)
    {
      final ReplicaId replicaKey = ReplicaId.of(baseDN, csn.getServerId());
      final SendEntryData<CSN> registered = replicaIdToSendEntryData.get(replicaKey);
      if (registered != null)
      {
        return registered;
      }

      final SendEntryData<CSN> candidate = new SendEntryData<>(startPhase);
      // putIfAbsent returns the value another thread may have registered in the meantime.
      final SendEntryData<CSN> winner = replicaIdToSendEntryData.putIfAbsent(replicaKey, candidate);
      if (winner != null)
      {
        return winner;
      }
      return candidate;
    }

    /**
     * Sends, during the initial search, the entry built from an update message, after
     * recording its CSN in the replica bookkeeping and in the cookie.
     *
     * @param updateMsg
     *          the update message the entry is created from
     * @param baseDN
     *          the base DN the update message belongs to
     * @return the result of {@code sendEntryIfMatches} for the created entry
     * @throws DirectoryException
     *           propagated from building or sending the entry
     */
    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException
    {
      final CSN csn = updateMsg.getCSN();
      getSendEntryData(baseDN, csn).initialSearchSendsEntry(csn);

      final String newCookie = updateCookie(baseDN, updateMsg.getCSN());
      final Entry changelogEntry = createEntryFromMsg(baseDN, 0, newCookie, updateMsg);
      return sendEntryIfMatches(searchOp, changelogEntry, false, newCookie, -1);
    }

    /**
     * Sends an entry during the persistent search phase, provided the control asked for ADD
     * changes and the replica bookkeeping allows this CSN to be sent.
     *
     * @param baseDN
     *          the base DN the update message belongs to
     * @param updateMsg
     *          the update message the entry is created from
     * @throws DirectoryException
     *           propagated from building or sending the entry
     */
    private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg)
        throws DirectoryException
    {
      if (!containsAdd)
      {
        return;
      }

      final CSN csn = updateMsg.getCSN();
      final SendEntryData<CSN> perReplicaData = getSendEntryData(baseDN, csn);
      if (!perReplicaData.persistentSearchCanSendEntry(csn))
      {
        return;
      }

      // multi threaded case: wait for the "initial search" phase to set the cookie
      final String newCookie = updateCookie(baseDN, updateMsg.getCSN());
      final Entry entryWithCookie = createEntryFromMsg(baseDN, 0, newCookie, updateMsg);
      // FIXME JNR rather than rebuilding the entry on the previous line, use:
      // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
      sendEntryIfMatches(searchOp, entryWithCookie, returnECs, newCookie, -1);
    }

    /**
     * Records a CSN in the cookie and returns the cookie's resulting string form.
     *
     * @param baseDN
     *          the base DN the CSN belongs to
     * @param csn
     *          the CSN to record
     * @return the string form of the cookie right after the update
     */
    private String updateCookie(DN baseDN, final CSN csn)
    {
      final String updatedCookie;
      // forbid concurrent updates to the cookie
      synchronized (cookie)
      {
        cookie.update(baseDN, csn);
        updatedCookie = cookie.toString();
      }
      return updatedCookie;
    }
  }
}