/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2014-2016 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;

import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;

import java.io.File;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import net.jcip.annotations.GuardedBy;

import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.DurationUnit;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.util.Pair;
import org.forgerock.util.time.TimeService;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.ChangelogBackend;
import org.opends.server.crypto.CryptoSuite;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
import org.opends.server.replication.server.changelog.api.ReplicaId;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.forgerock.opendj.ldap.DN;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;

/** Log file implementation of the ChangelogDB interface. */
public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /**
   * This map contains the List of updates received from each LDAP server.
   * <p>
   * When removing a domainMap, code:
   * <ol>
   * <li>first get the domainMap</li>
   * <li>synchronized on the domainMap</li>
   * <li>remove the domainMap</li>
   * <li>then check it's not null</li>
   * <li>then close all inside</li>
   * </ol>
   * When creating a replicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<>();
  /** Domain cursors currently registered, keyed by the base DN they iterate over. */
  private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
      new ConcurrentSkipListMap<>();
  /** Multi-domain cursors currently registered; they are told about each newly created domain. */
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<>();
  /** Replica cursors currently registered, keyed by replica; they are updated with offline CSNs. */
  private final ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<>();
  /** Set by {@link #initializeDB()}; {@code null} before that call. */
  private ReplicationEnvironment replicationEnv;
  private final File dbDirectory;

  /**
   * The handler of the changelog database, the database stores the relation
   * between a change number and the associated cookie.
   */
  @GuardedBy("cnIndexDBLock")
  private FileChangeNumberIndexDB cnIndexDB;
  private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<>();

  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();

  /**
   * The purge delay (in milliseconds). Records in the changelog DB that are
   * older than this delay might be removed.
   */
  private volatile long purgeDelayInMillis;
  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<>();

  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private final AtomicBoolean shutdown = new AtomicBoolean();

  private static final RepositionableCursor<CSN, UpdateMsg> EMPTY_CURSOR = Log.getEmptyCursor();
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new FileReplicaDBCursor(EMPTY_CURSOR, null, AFTER_MATCHING_KEY);

  private final CryptoSuite cryptoSuite;

  /**
   * Creates a new changelog DB.
   *
   * @param replicationServer
   *          the local replication server.
   * @param dbDirectoryPath
   *          the path where the changelog files reside.
   * @param cryptoSuite
   *          the cryptosuite to use for encryption
   * @throws ConfigException
   *           if a problem occurs opening the supplied directory
   */
  public FileChangelogDB(final ReplicationServer replicationServer, String dbDirectoryPath, CryptoSuite cryptoSuite)
      throws ConfigException
  {
    this.replicationServer = replicationServer;
    this.dbDirectory = makeDir(dbDirectoryPath);
    this.cryptoSuite = cryptoSuite;
  }

  /**
   * Resolves the provided path and creates the directory when it does not exist yet.
   *
   * @param dbDirName
   *          the path of the changelog directory
   * @return the resolved directory
   * @throws ConfigException
   *           if an exception is raised while checking or creating the directory
   */
  private File makeDir(final String dbDirName) throws ConfigException
  {
    // Check that this path exists or create it.
    final File dbDirectory = getFileForPath(dbDirName);
    try
    {
      if (!dbDirectory.exists())
      {
        // NOTE(review): the boolean result of mkdir() is ignored, so a failed creation
        // is only detected later, when the directory is actually used.
        dbDirectory.mkdir();
      }
      return dbDirectory;
    }
    catch (Exception e)
    {
      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
          e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
    }
  }

  /**
   * Returns the replica DBs registered for the provided domain.
   *
   * @param baseDN
   *          the domain base DN
   * @return the map of serverId to replica DB, or an empty map when the domain is unknown
   */
  private Map<Integer, FileReplicaDB> getDomainMap(final DN baseDN)
  {
    final Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      return domainMap;
    }
    return Collections.emptyMap();
  }

  /**
   * Returns the replica DB registered for the provided domain and server id.
   *
   * @param baseDN
   *          the domain base DN
   * @param serverId
   *          the replica server id
   * @return the replica DB, or {@code null} when none is registered
   */
  private FileReplicaDB getReplicaDB(final DN baseDN, final int serverId)
  {
    return getDomainMap(baseDN).get(serverId);
  }

  /**
   * Returns a {@link FileReplicaDB}, possibly creating it.
   *
   * @param baseDN
   *          the baseDN for which to create a ReplicaDB
   * @param serverId
   *          the serverId for which to create a ReplicaDB
   * @param server
   *          the ReplicationServer
   * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
   * @throws ChangelogException
   *           if a problem occurred with the database
   */
  Pair<FileReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
      final ReplicationServer server) throws ChangelogException
  {
    // Retry until a replicaDB is obtained: a null result means the domainMap was
    // concurrently removed, in which case either shutdown is in progress (loop exits)
    // or the domain must be lazily recreated (loop retries).
    while (!shutdown.get())
    {
      final ConcurrentMap<Integer, FileReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
      final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
      if (result != null)
      {
        final Boolean dbWasCreated = result.getSecond();
        if (dbWasCreated)
        { // new replicaDB => update all cursors with it
          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
          if (cursors != null && !cursors.isEmpty())
          {
            for (DomainDBCursor cursor : cursors)
            {
              cursor.addReplicaDB(serverId, null);
            }
          }
        }

        return result;
      }
    }
    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
  }

  /**
   * Returns the map of replica DBs for the provided domain, registering a new empty one
   * when the domain is not known yet. Registered multi-domain cursors are notified when
   * this call is the one that created the domain.
   *
   * @param baseDN
   *          the domain base DN
   * @return the existing or newly registered domain map, never {@code null}
   */
  private ConcurrentMap<Integer, FileReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
  {
    // happy path: the domainMap already exists
    final ConcurrentMap<Integer, FileReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
    if (currentValue != null)
    {
      return currentValue;
    }

    // unlucky, the domainMap does not exist: take the hit and create the
    // newValue, even though the same could be done concurrently by another thread
    final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<>();
    final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    if (previousValue != null)
    {
      // there was already a value associated to the key, let's use it
      return previousValue;
    }

    // we just created a new domain => update all cursors
    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
    {
      cursor.addDomain(baseDN, null);
    }
    return newValue;
  }

  /**
   * Returns the replica DB for the provided server id from the domain map, creating it
   * under the domain map's monitor when it does not exist.
   *
   * @param domainMap
   *          the map of replica DBs for the domain
   * @param serverId
   *          the replica server id
   * @param baseDN
   *          the domain base DN
   * @param server
   *          the replication server
   * @return a Pair with the replica DB and whether it was created by this call, or
   *         {@code null} if the domain map is no longer the one registered for the domain
   * @throws ChangelogException
   *           if a problem occurred while creating the replica DB
   */
  private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
  {
    // happy path: the replicaDB already exists
    FileReplicaDB currentValue = domainMap.get(serverId);
    if (currentValue != null)
    {
      return Pair.of(currentValue, false);
    }

    // unlucky, the replicaDB does not exist: take the hit and synchronize
    // on the domainMap to create a new ReplicaDB
    synchronized (domainMap)
    {
      // double-check
      currentValue = domainMap.get(serverId);
      if (currentValue != null)
      {
        return Pair.of(currentValue, false);
      }

      if (domainToReplicaDBs.get(baseDN) != domainMap)
      {
        // The domainMap could have been concurrently removed because
        // 1) a shutdown was initiated or 2) an initialize was called.
        // Return will allow the code to:
        // 1) shutdown properly or 2) lazily recreate the replicaDB
        return null;
      }

      final FileReplicaDB newDB = new FileReplicaDB(serverId, baseDN, server, cryptoSuite, replicationEnv);
      domainMap.put(serverId, newDB);
      return Pair.of(newDB, true);
    }
  }

  @Override
  public void initializeDB()
  {
    try
    {
      replicationEnv = new ReplicationEnvironment(dbDirectory.getAbsolutePath(), replicationServer, TimeService.SYSTEM);
      final ChangelogState changelogState = replicationEnv.getChangelogState();
      initializeToChangelogState(changelogState);
      if (replicationServer.isChangeNumberEnabled())
      {
        startIndexer();
      }
      setPurgeDelay(replicationServer.getPurgeDelay());
    }
    catch (ChangelogException e)
    {
      logger.traceException(e);
      logger.error(ERR_COULD_NOT_READ_DB, this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage());
    }
  }

  /**
   * Restores the in-memory state from the provided changelog state: initializes the
   * generation id of each domain, then gets or creates the replica DB of each known replica.
   *
   * @param changelogState
   *          the state read from the replication environment
   * @throws ChangelogException
   *           if a replica DB cannot be obtained
   */
  private void initializeToChangelogState(final ChangelogState changelogState)
      throws ChangelogException
  {
    for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
    {
      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
    }
    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      for (int serverId : entry.getValue())
      {
        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
      }
    }
  }

  /**
   * Shuts down the change number index DB, if it has been created.
   *
   * @throws ChangelogException
   *           if the shutdown of the change number index DB fails
   */
  private void shutdownChangeNumberIndexDB() throws ChangelogException
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        cnIndexDB.shutdown();
      }
    }
  }

  @Override
  public void shutdownDB() throws ChangelogException
  {
    if (!this.shutdown.compareAndSet(false, true))
    { // shutdown has already been initiated
      return;
    }

    shutdownCNIndexerAndPurger();

    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
    ChangelogException firstException = null;

    // now we can safely shutdown all DBs
    try
    {
      shutdownChangeNumberIndexDB();
    }
    catch (ChangelogException e)
    {
      firstException = e;
    }

    for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it =
        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
    {
      final ConcurrentMap<Integer, FileReplicaDB> domainMap = it.next();
      // hold the domainMap monitor so no replicaDB is created in it while it is closed
      synchronized (domainMap)
      {
        it.remove();
        for (FileReplicaDB replicaDB : domainMap.values())
        {
          replicaDB.shutdown();
        }
      }
    }
    if (replicationEnv != null)
    {
      replicationEnv.shutdown();
    }

    if (firstException != null)
    {
      throw firstException;
    }
  }

  /**
   * Detaches the current indexer and purger threads, initiates their shutdown and waits
   * for both of them to terminate.
   */
  private void shutdownCNIndexerAndPurger()
  {
    final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
    if (indexer != null)
    {
      indexer.initiateShutdown();
    }
    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
    if (purger != null)
    {
      purger.initiateShutdown();
    }

    // wait for shutdown of the threads holding cursors
    try
    {
      if (indexer != null)
      {
        indexer.join();
      }
      if (purger != null)
      {
        purger.join();
      }
    }
    catch (InterruptedException e)
    {
      // do nothing: we are already shutting down
    }
  }

  /**
   * Clears all records from the changelog (does not remove the changelog itself).
   *
   * @throws ChangelogException
   *           If an error occurs when clearing the changelog.
   */
  public void clearDB() throws ChangelogException
  {
    if (!dbDirectory.exists())
    {
      return;
    }

    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
    ChangelogException firstException = null;

    for (DN baseDN : this.domainToReplicaDBs.keySet())
    {
      removeDomain(baseDN);
    }

    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        try
        {
          cnIndexDB.clear();
        }
        catch (ChangelogException e)
        {
          firstException = e;
        }

        try
        {
          shutdownChangeNumberIndexDB();
        }
        catch (ChangelogException e)
        {
          if (firstException == null)
          {
            firstException = e;
          }
          else
          {
            logger.traceException(e);
          }
        }

        cnIndexDB = null;
      }
    }

    if (firstException != null)
    {
      throw firstException;
    }
  }

  @Override
  public void removeDB() throws ChangelogException
  {
    shutdownDB();
    StaticUtils.recursiveDelete(dbDirectory);
  }

  @Override
  public ServerState getDomainOldestCSNs(DN baseDN)
  {
    final ServerState result = new ServerState();
    for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      result.update(replicaDB.getOldestCSN());
    }
    return result;
  }

  @Override
  public ServerState getDomainNewestCSNs(DN baseDN)
  {
    final ServerState result = new ServerState();
    for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      result.update(replicaDB.getNewestCSN());
    }
    return result;
  }

  @Override
  public void removeDomain(DN baseDN) throws ChangelogException
  {
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
    ChangelogException firstException = null;

    // 1- clear the replica DBs
    Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      final ChangeNumberIndexer indexer = this.cnIndexer.get();
      if (indexer != null)
      {
        indexer.clear(baseDN);
      }
      synchronized (domainMap)
      {
        domainMap = domainToReplicaDBs.remove(baseDN);
        // The domain may have been removed concurrently (shutdown or another removal)
        // between the get() above and this remove(): nothing is left to clear then.
        if (domainMap != null)
        {
          for (FileReplicaDB replicaDB : domainMap.values())
          {
            try
            {
              replicaDB.clear();
            }
            catch (ChangelogException e)
            {
              // keep the first failure, only trace the following ones
              if (firstException == null)
              {
                firstException = e;
              }
              else
              {
                logger.traceException(e);
              }
            }
            replicaDB.shutdown();
          }
        }
      }
    }


    // 2- clear the changelogstate DB
    try
    {
      replicationEnv.clearGenerationId(baseDN);
    }
    catch (ChangelogException e)
    {
      if (firstException == null)
      {
        firstException = e;
      }
      else
      {
        logger.traceException(e);
      }
    }

    if (firstException != null)
    {
      throw firstException;
    }
  }

  @Override
  public void setPurgeDelay(final long purgeDelayInMillis)
  {
    this.purgeDelayInMillis = purgeDelayInMillis;

    // Rotation time interval for CN Index DB log file
    // needs to be a fraction of the purge delay
    // to ensure there is at least one file to purge
    replicationEnv.setCNIndexDBRotationInterval(purgeDelayInMillis / 2);

    if (purgeDelayInMillis > 0)
    {
      startCNPurger();
    }
    else
    {
      final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null);
      if (purgerToStop != null)
      { // stop this purger
        purgerToStop.initiateShutdown();
      }
    }
  }

  /**
   * Starts a new purger thread when none is registered, otherwise wakes up the
   * registered one so that it re-evaluates what can be purged.
   */
  private void startCNPurger()
  {
    final ChangelogDBPurger newPurger = new ChangelogDBPurger();
    if (cnPurger.compareAndSet(null, newPurger))
    { // no purger was running, run this new one
      newPurger.start();
    }
    else
    { // a purger was already running, just wake that one up
      // to verify if some entries can be purged
      final ChangelogDBPurger currentPurger = cnPurger.get();
      // the purger may have been stopped since the compareAndSet() above
      if (currentPurger != null)
      {
        synchronized (currentPurger)
        {
          currentPurger.notify();
        }
      }
    }
  }

  @Override
  public void setComputeChangeNumber(final boolean computeChangeNumber)
      throws ChangelogException
  {
    if (computeChangeNumber)
    {
      startIndexer();
    }
    else
    {
      final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
      if (indexer != null)
      {
        indexer.initiateShutdown();
      }
    }
  }

  /**
   * Resets the change number index so that it starts at the provided change number,
   * associated to the provided change. The indexer and purger threads are stopped while
   * the index is rewritten; the indexer is then restarted, and the purger too when the
   * purge delay is positive.
   *
   * @param newFirstCN
   *          the change number to assign to the first record of the index
   * @param baseDN
   *          the base DN of the domain containing the change
   * @param newFirstCSN
   *          the CSN of the change that becomes the first record of the index
   * @throws ChangelogException
   *           if change numbers are disabled, if the change is not covered by the newest
   *           CSNs of the domain, if it is older than the oldest CSN kept for its replica,
   *           or if the index cannot be updated
   */
  void resetChangeNumberIndex(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException
  {
    if (!replicationServer.isChangeNumberEnabled())
    {
      throw new ChangelogException(ERR_REPLICATION_CHANGE_NUMBER_DISABLED.get(baseDN));
    }
    if (!getDomainNewestCSNs(baseDN).cover(newFirstCSN))
    {
      throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_CHANGE_NOT_PRESENT.get(newFirstCN, baseDN,
          newFirstCSN));
    }
    if (getDomainOldestCSNs(baseDN).getCSN(newFirstCSN.getServerId()).isNewerThan(newFirstCSN))
    {
      throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD.get(newFirstCN, newFirstCSN));
    }

    shutdownCNIndexerAndPurger();
    synchronized (cnIndexDBLock)
    {
      cnIndexDB.clearAndSetChangeNumber(newFirstCN);
      cnIndexDB.addRecord(new ChangeNumberIndexRecord(newFirstCN, baseDN, newFirstCSN));
    }
    startIndexer();
    if (purgeDelayInMillis > 0)
    {
      startCNPurger();
    }
  }

  /** Starts a new change number indexer thread, unless one is already registered. */
  private void startIndexer()
  {
    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv);
    if (cnIndexer.compareAndSet(null, indexer))
    {
      indexer.start();
    }
  }

  @Override
  public ChangeNumberIndexDB getChangeNumberIndexDB()
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB == null)
      {
        try
        {
          cnIndexDB = new FileChangeNumberIndexDB(this, replicationEnv);
        }
        catch (Exception e)
        {
          // the failure is only logged: callers get null in that case
          logger.traceException(e);
          logger.error(ERR_CHANGENUMBER_DATABASE, e.getLocalizedMessage());
        }
      }
      return cnIndexDB;
    }
  }

  @Override
  public ReplicationDomainDB getReplicationDomainDB()
  {
    return this;
  }

  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, CursorOptions options)
      throws ChangelogException
  {
    final Set<DN> excludedDomainDns = Collections.emptySet();
    return getCursorFrom(startState, options, excludedDomainDns);
  }

  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      CursorOptions options, final Set<DN> excludedDomainDns) throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, options);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      if (!excludedDomainDns.contains(baseDN))
      {
        cursor.addDomain(baseDN, startState.getServerState(baseDN));
      }
    }
    return cursor;
  }

  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, CursorOptions options)
      throws ChangelogException
  {
    final DomainDBCursor cursor = newDomainDBCursor(baseDN, options);
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
      cursor.addReplicaDB(serverId, lastCSN);
    }
    return cursor;
  }

  /**
   * Creates a domain cursor and registers it for the provided domain.
   *
   * @param baseDN
   *          the domain base DN
   * @param options
   *          the cursor options
   * @return the newly registered cursor
   */
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final CursorOptions options)
  {
    final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, options);
    putCursor(registeredDomainCursors, baseDN, cursor);
    return cursor;
  }

  /**
   * Returns the offline CSN recorded for the provided replica, when it is relevant for a
   * cursor starting after the provided CSN.
   *
   * @param baseDN
   *          the domain base DN
   * @param serverId
   *          the replica server id
   * @param startAfterCSN
   *          the CSN the cursor starts after, may be {@code null}
   * @return the offline CSN if one is recorded and {@code startAfterCSN} is {@code null}
   *         or older than it, {@code null} otherwise
   */
  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
  {
    final MultiDomainServerState offlineReplicas =
        replicationEnv.getChangelogState().getOfflineReplicas();
    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
    if (offlineCSN != null
        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
    {
      return offlineCSN;
    }
    return null;
  }

  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      CursorOptions options) throws ChangelogException
  {
    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
          actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
      final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);

      putCursor(replicaCursors, replicaId, replicaCursor);

      return replicaCursor;
    }
    return EMPTY_CURSOR_REPLICA_DB;
  }

  /**
   * Adds the cursor to the list registered under the key, creating and registering that
   * list first when the key has none.
   *
   * @param <K>
   *          the type of keys
   * @param <V>
   *          the type of cursors
   * @param map
   *          the registry to update
   * @param key
   *          the key under which the cursor is registered
   * @param cursor
   *          the cursor to register
   */
  private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
  {
    CopyOnWriteArrayList<V> cursors = map.get(key);
    if (cursors == null)
    {
      cursors = new CopyOnWriteArrayList<>();
      CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
      if (previousValue != null)
      {
        // another thread registered a list first: use that one
        cursors = previousValue;
      }
    }
    cursors.add(cursor);
  }

  @Override
  public void unregisterCursor(final DBCursor<?> cursor)
  {
    if (cursor instanceof MultiDomainDBCursor)
    {
      registeredMultiDomainCursors.remove(cursor);
    }
    else if (cursor instanceof DomainDBCursor)
    {
      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
      final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
      if (cursors != null)
      {
        cursors.remove(cursor);
      }
    }
    else if (cursor instanceof ReplicaCursor)
    {
      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId());
      if (cursors != null)
      {
        cursors.remove(cursor);
      }
    }
  }

  @Override
  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
  {
    final CSN csn = updateMsg.getCSN();
    final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
        csn.getServerId(), replicationServer);
    final FileReplicaDB replicaDB = pair.getFirst();
    replicaDB.add(updateMsg);

    ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg);

    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return pair.getSecond(); // replica DB was created
  }

  @Override
  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
  {
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
      indexer.publishHeartbeat(baseDN, heartbeatCSN);
    }
  }

  /**
   * Notifies the replication environment that the replica is online when the indexer
   * reports it as offline, then resets the offline CSN of the cursors registered for
   * that replica.
   *
   * @param indexer
   *          the running change number indexer
   * @param baseDN
   *          the domain base DN
   * @param serverId
   *          the replica server id
   * @throws ChangelogException
   *           if the replication environment cannot be updated
   */
  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
      throws ChangelogException
  {
    if (indexer.isReplicaOffline(baseDN, serverId))
    {
      replicationEnv.notifyReplicaOnline(baseDN, serverId);
    }
    updateCursorsWithOfflineCSN(baseDN, serverId, null);
  }

  @Override
  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
  {
    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.replicaOffline(baseDN, offlineCSN);
    }
    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
  }

  /**
   * Sets the offline CSN on every cursor registered for the provided replica.
   *
   * @param baseDN
   *          the domain base DN
   * @param serverId
   *          the replica server id
   * @param offlineCSN
   *          the offline CSN to set, {@code null} when the replica is online
   */
  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
  {
    final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId));
    if (cursors != null)
    {
      for (ReplicaCursor cursor : cursors)
      {
        cursor.setOfflineCSN(offlineCSN);
      }
    }
  }

  /**
   * The thread purging the changelogDB on a regular interval. Records are
   * purged from the changelogDB if they are older than a delay specified in
   * milliseconds. The purge process works in two steps:
   * <ol>
   * <li>first purge the changeNumberIndexDB and retrieve information to drive
   * replicaDBs purging</li>
   * <li>proceed to purge each replicaDBs based on the information collected
   * when purging the changeNumberIndexDB</li>
   * </ol>
   */
  private final class ChangelogDBPurger extends DirectoryThread
  {
    /** Default wait (in milliseconds) between two purge attempts. */
    private static final int DEFAULT_SLEEP = 500;

    protected ChangelogDBPurger()
    {
      super("Changelog DB purger");
    }

    @Override
    public void run()
    {
      // initialize CNIndexDB
      getChangeNumberIndexDB();
      boolean canDisplayNothingToPurgeMsg = true;
      while (!isShutdownInitiated())
      {
        try
        {
          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
          final CSN oldestNotPurgedCSN;

          if (!replicationServer.isChangeNumberEnabled() || !replicationServer.isECLEnabled())
          {
            oldestNotPurgedCSN = purgeCSN;
          }
          else
          {
            // NOTE(review): cnIndexDB is annotated @GuardedBy("cnIndexDBLock") but is
            // read here without holding that lock.
            final FileChangeNumberIndexDB localCNIndexDB = cnIndexDB;
            if (localCNIndexDB == null)
            { // shutdown has been initiated
              return;
            }

            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
            if (oldestNotPurgedCSN == null)
            { // shutdown may have been initiated...
              // ... or change number index DB determined there is nothing to purge,
              // wait for new changes to come in.

              // Note we cannot sleep for as long as the purge delay
              // (3 days default), because we might receive late updates
              // that will have to be purged before the purge delay elapses.
              // This can particularly happen in case of network partitions.
              if (!isShutdownInitiated())
              {
                synchronized (this)
                {
                  if (!isShutdownInitiated())
                  {
                    if (canDisplayNothingToPurgeMsg)
                    {
                      logger.trace("Nothing to purge, waiting for new changes");
                      canDisplayNothingToPurgeMsg = false;
                    }
                    wait(DEFAULT_SLEEP);
                  }
                }
              }
              continue;
            }
          }

          for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values())
          {
            for (final FileReplicaDB replicaDB : domainMap.values())
            {
              replicaDB.purgeUpTo(oldestNotPurgedCSN);
            }
          }

          if (!isShutdownInitiated())
          {
            synchronized (this)
            {
              if (!isShutdownInitiated())
              {
                final long sleepTime = computeSleepTimeUntilNextPurge(oldestNotPurgedCSN);
                if (logger.isTraceEnabled())
                {
                  tracePurgeDetails(purgeCSN, oldestNotPurgedCSN, sleepTime);
                  canDisplayNothingToPurgeMsg = true;
                }
                wait(sleepTime);
              }
            }
          }
        }
        catch (InterruptedException e)
        {
          // shutdown initiated?
        }
        catch (Exception e)
        {
          logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e));
          if (replicationServer != null)
          {
            replicationServer.shutdown();
          }
        }
      }
    }

    /**
     * Traces the outcome of a purge run and the time this thread will wait before the next one.
     *
     * @param purgeCSN
     *          the CSN up to which the purge was requested
     * @param oldestNotPurgedCSN
     *          the oldest CSN that was not purged
     * @param sleepTime
     *          the time to wait before the next purge, in milliseconds
     */
    private void tracePurgeDetails(final CSN purgeCSN, final CSN oldestNotPurgedCSN, final long sleepTime)
    {
      // compare CSN with CSN: comparing with the String representation never matches
      if (purgeCSN.equals(oldestNotPurgedCSN))
      {
        logger.trace("Purged up to %s. "
            + "now sleeping until next purge during %s",
            purgeCSN.toStringUI(), DurationUnit.toString(sleepTime));
      }
      else
      {
        logger.trace("Asked to purge up to %s, actually purged up to %s (not included). "
            + "now sleeping until next purge during %s",
            purgeCSN.toStringUI(), oldestNotPurgedCSN.toStringUI(), DurationUnit.toString(sleepTime));
      }
    }

    /**
     * Computes how long to wait before the next purge.
     *
     * @param notPurgedCSN
     *          the oldest CSN that was not purged
     * @return the time in milliseconds until the provided CSN becomes older than the purge
     *         delay, or {@link #DEFAULT_SLEEP} when it already is
     */
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();
      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
      if (currentPurgeTime < nextPurgeTime)
      {
        // sleep till the next CSN to purge,
        return nextPurgeTime - currentPurgeTime;
      }
      // wait a bit before purging more
      return DEFAULT_SLEEP;
    }

    @Override
    public void initiateShutdown()
    {
      super.initiateShutdown();
      synchronized (this)
      {
        notify(); // wake up the purger thread for faster shutdown
      }
    }
  }
}