001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2013-2016 ForgeRock AS. 015 */ 016package org.opends.server.replication.server.changelog.file; 017 018import java.util.Map.Entry; 019import java.util.Set; 020import java.util.concurrent.ConcurrentSkipListSet; 021 022import org.forgerock.i18n.slf4j.LocalizedLogger; 023import org.opends.server.api.DirectoryThread; 024import org.opends.server.backends.ChangelogBackend; 025import org.opends.server.replication.common.CSN; 026import org.opends.server.replication.common.MultiDomainServerState; 027import org.opends.server.replication.common.ServerState; 028import org.opends.server.replication.protocol.ReplicaOfflineMsg; 029import org.opends.server.replication.protocol.UpdateMsg; 030import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException; 031import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; 032import org.opends.server.replication.server.changelog.api.ChangelogDB; 033import org.opends.server.replication.server.changelog.api.ChangelogException; 034import org.opends.server.replication.server.changelog.api.ChangelogStateProvider; 035import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; 036import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 037import org.forgerock.opendj.ldap.DN; 038 039import static org.opends.messages.ReplicationMessages.*; 040import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; 041import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; 042import static org.opends.server.util.StaticUtils.*; 043 044/** 045 * Thread responsible for inserting replicated changes into the ChangeNumber 046 * Index DB (CNIndexDB for short). 047 * <p> 048 * Only changes older than the medium consistency point are inserted in the 049 * CNIndexDB. As a consequence this class is also responsible for maintaining 050 * the medium consistency point (indirectly through an 051 * {@link ECLMultiDomainDBCursor}). 052 */ 053public class ChangeNumberIndexer extends DirectoryThread 054{ 055 /** The tracer object for the debug logger. */ 056 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 057 058 /** 059 * If it contains nothing, then the run method executes normally. 060 * Otherwise, the {@link #run()} method must clear its state 061 * for the supplied domain baseDNs. If a supplied domain is 062 * {@link DN#rootDN()}, then all domains will be cleared. 063 */ 064 private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>(); 065 private final ChangelogDB changelogDB; 066 private final ChangelogStateProvider changelogStateProvider; 067 private final ECLEnabledDomainPredicate predicate; 068 069 /* 070 * The following MultiDomainServerState fields must be thread safe, because 071 * 1) initialization can happen while the replication server starts receiving 072 * updates 073 * 2) many updates can happen concurrently. 074 */ 075 /** 076 * Holds the last time each replica was seen alive, whether via updates or 077 * heartbeat notifications, or offline notifications. Data is held for each 078 * serverId cross domain. 079 * <p> 080 * Updates are persistent and stored in the replicaDBs, heartbeats are 081 * transient and are easily constructed on normal operations. 082 * <p> 083 * Note: This object is updated by both heartbeats and changes/updates. 084 */ 085 private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState(); 086 087 /** Note: This object is updated by replica offline messages. */ 088 private final MultiDomainServerState replicasOffline = new MultiDomainServerState(); 089 090 /** 091 * Cursor across all the replicaDBs for all the replication domains. It is 092 * positioned on the next change that needs to be inserted in the CNIndexDB. 093 * <p> 094 * Note: it is only accessed from the {@link #run()} method. 095 * 096 * @NonNull 097 */ 098 private ECLMultiDomainDBCursor nextChangeForInsertDBCursor; 099 private MultiDomainServerState cookie = new MultiDomainServerState(); 100 101 /** 102 * Builds a ChangeNumberIndexer object. 103 * @param changelogDB 104 * the changelogDB 105 * @param changelogStateProvider 106 * the replication environment information for access to changelog state 107 */ 108 public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider) 109 { 110 this(changelogDB, changelogStateProvider, new ECLEnabledDomainPredicate()); 111 } 112 113 /** 114 * Builds a ChangeNumberIndexer object. 115 * @param changelogDB 116 * the changelogDB 117 * @param changelogStateProvider 118 * the changelog state used for initialization 119 * @param predicate 120 */ 121 ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider, 122 ECLEnabledDomainPredicate predicate) 123 { 124 super("Change number indexer"); 125 this.changelogDB = changelogDB; 126 this.changelogStateProvider = changelogStateProvider; 127 this.predicate = predicate; 128 } 129 130 /** 131 * Ensures the medium consistency point is updated by heartbeats. 132 * 133 * @param baseDN 134 * the baseDN of the domain for which the heartbeat is published 135 * @param heartbeatCSN 136 * the CSN coming from the heartbeat 137 */ 138 public void publishHeartbeat(DN baseDN, CSN heartbeatCSN) 139 { 140 if (!predicate.isECLEnabledDomain(baseDN)) 141 { 142 return; 143 } 144 145 final CSN oldestCSNBefore = getOldestLastAliveCSN(); 146 lastAliveCSNs.update(baseDN, heartbeatCSN); 147 tryNotify(oldestCSNBefore); 148 } 149 150 /** 151 * Indicates if the replica corresponding to provided domain DN and server id 152 * is offline. 153 * 154 * @param domainDN 155 * base DN of the replica 156 * @param serverId 157 * server id of the replica 158 * @return {@code true} if replica is offline, {@code false} otherwise 159 */ 160 public boolean isReplicaOffline(DN domainDN, int serverId) 161 { 162 return replicasOffline.getCSN(domainDN, serverId) != null; 163 } 164 165 /** 166 * Ensures the medium consistency point is updated by UpdateMsg. 167 * 168 * @param baseDN 169 * the baseDN of the domain for which the heartbeat is published 170 * @param updateMsg 171 * the updateMsg that will update the medium consistency point 172 * @throws ChangelogException 173 * If a database problem happened 174 */ 175 public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) 176 throws ChangelogException 177 { 178 if (!predicate.isECLEnabledDomain(baseDN)) 179 { 180 return; 181 } 182 183 final CSN oldestCSNBefore = getOldestLastAliveCSN(); 184 lastAliveCSNs.update(baseDN, updateMsg.getCSN()); 185 tryNotify(oldestCSNBefore); 186 } 187 188 /** 189 * Signals a replica went offline. 190 * 191 * @param baseDN 192 * the replica's replication domain 193 * @param offlineCSN 194 * the serverId and time of the replica that went offline 195 */ 196 public void replicaOffline(DN baseDN, CSN offlineCSN) 197 { 198 if (!predicate.isECLEnabledDomain(baseDN)) 199 { 200 return; 201 } 202 203 replicasOffline.update(baseDN, offlineCSN); 204 final CSN oldestCSNBefore = getOldestLastAliveCSN(); 205 lastAliveCSNs.update(baseDN, offlineCSN); 206 tryNotify(oldestCSNBefore); 207 } 208 209 private CSN getOldestLastAliveCSN() 210 { 211 return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond(); 212 } 213 214 /** 215 * Notifies the Change number indexer thread if it will be able to do some 216 * work. 217 */ 218 private void tryNotify(final CSN oldestCSNBefore) 219 { 220 if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore)) 221 { 222 synchronized (this) 223 { 224 notify(); 225 } 226 } 227 } 228 229 /** 230 * Used for waking up the {@link ChangeNumberIndexer} thread because it might 231 * have some work to do. 232 */ 233 private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore) 234 { 235 final CSN oldestCSNAfter = getOldestLastAliveCSN(); 236 // ensure that all initial replicas alive information have been updated 237 // with CSNs that are acceptable for moving the medium consistency forward 238 return allInitialReplicasAreOfflineOrAlive() 239 && oldestCSNBefore != null // then oldestCSNAfter cannot be null 240 // has the oldest CSN changed? 241 && oldestCSNBefore.isOlderThan(oldestCSNAfter); 242 } 243 244 /** 245 * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN 246 * must be persisted to the change number index DB. 247 */ 248 private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist) 249 { 250 // ensure that all initial replicas alive information have been updated 251 // with CSNs that are acceptable for moving the medium consistency forward 252 return allInitialReplicasAreOfflineOrAlive() 253 // can we persist the next CSN? 254 && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN()); 255 } 256 257 /** 258 * Returns true only if the initial replicas known from the changelog state DB 259 * are either: 260 * <ul> 261 * <li>offline, so do not wait for them in order to compute medium consistency 262 * </li> 263 * <li>alive, because we received heartbeats or changes (so their last alive 264 * CSN has been updated to something past the oldest possible CSN), we have 265 * enough info to compute medium consistency</li> 266 * </ul> 267 * In both cases, we have enough information to compute medium consistency 268 * without waiting any further. 269 */ 270 private boolean allInitialReplicasAreOfflineOrAlive() 271 { 272 for (DN baseDN : lastAliveCSNs) 273 { 274 for (CSN csn : lastAliveCSNs.getServerState(baseDN)) 275 { 276 if (csn.getTime() == 0 277 && replicasOffline.getCSN(baseDN, csn.getServerId()) == null) 278 { 279 // this is the oldest possible CSN, but the replica is not offline 280 // we must wait for more up to date information from this replica 281 return false; 282 } 283 } 284 } 285 return true; 286 } 287 288 /** 289 * Restores in memory data needed to build the CNIndexDB. In particular, 290 * initializes the changes cursor to the medium consistency point. 291 */ 292 private void initialize() throws ChangelogException 293 { 294 final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); 295 296 initializeLastAliveCSNs(domainDB); 297 initializeNextChangeCursor(domainDB); 298 initializeOfflineReplicas(); 299 } 300 301 private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException 302 { 303 // Initialize the multi domain cursor only from the change number index record. 304 // The cookie is always empty at this stage. 305 final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); 306 final CSN newestCsn = newestRecord != null ? newestRecord.getCSN() : null; 307 final CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, newestCsn); 308 final MultiDomainServerState unused = new MultiDomainServerState(); 309 MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = domainDB.getCursorFrom(unused, options); 310 311 nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint); 312 ChangelogBackend.updateCookieToMediumConsistencyPoint(cookie, nextChangeForInsertDBCursor, newestRecord); 313 } 314 315 private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB) 316 { 317 for (Entry<DN, Set<Integer>> entry : changelogStateProvider.getChangelogState().getDomainToServerIds().entrySet()) 318 { 319 final DN baseDN = entry.getKey(); 320 if (predicate.isECLEnabledDomain(baseDN)) 321 { 322 for (Integer serverId : entry.getValue()) 323 { 324 /* 325 * initialize with the oldest possible CSN in order for medium 326 * consistency to wait for all replicas to be alive before moving forward 327 */ 328 lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId)); 329 } 330 331 final ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); 332 lastAliveCSNs.update(baseDN, latestKnownState); 333 } 334 } 335 } 336 337 private void initializeOfflineReplicas() 338 { 339 final MultiDomainServerState offlineReplicas = changelogStateProvider.getChangelogState().getOfflineReplicas(); 340 for (DN baseDN : offlineReplicas) 341 { 342 for (CSN offlineCSN : offlineReplicas.getServerState(baseDN)) 343 { 344 if (predicate.isECLEnabledDomain(baseDN)) 345 { 346 replicasOffline.update(baseDN, offlineCSN); 347 // a replica offline message could also be the very last time 348 // we heard from this replica :) 349 lastAliveCSNs.update(baseDN, offlineCSN); 350 } 351 } 352 } 353 } 354 355 private CSN oldestPossibleCSN(int serverId) 356 { 357 return new CSN(0, 0, serverId); 358 } 359 360 /** {@inheritDoc} */ 361 @Override 362 public void initiateShutdown() 363 { 364 super.initiateShutdown(); 365 synchronized (this) 366 { 367 notify(); 368 } 369 } 370 371 /** {@inheritDoc} */ 372 @Override 373 public void run() 374 { 375 try 376 { 377 /* 378 * initialize here to allow fast application start up and avoid errors due 379 * cursors being created in a different thread to the one where they are used. 380 */ 381 initialize(); 382 383 while (!isShutdownInitiated()) 384 { 385 try 386 { 387 while (!domainsToClear.isEmpty()) 388 { 389 final DN baseDNToClear = domainsToClear.first(); 390 nextChangeForInsertDBCursor.removeDomain(baseDNToClear); 391 // Only release the waiting thread 392 // once this domain's state has been cleared. 393 domainsToClear.remove(baseDNToClear); 394 } 395 if (nextChangeForInsertDBCursor.shouldReInitialize()) 396 { 397 nextChangeForInsertDBCursor.close(); 398 initialize(); 399 } 400 // Do not call DBCursor.next() here 401 // because we might not have consumed the last record, 402 // for example if we could not move the MCP forward 403 final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); 404 if (msg == null) 405 { 406 synchronized (this) 407 { 408 if (isShutdownInitiated()) 409 { 410 continue; 411 } 412 wait(); 413 } 414 // check whether new changes have been added to the ReplicaDBs 415 moveToNextChange(); 416 continue; 417 } 418 else if (msg instanceof ReplicaOfflineMsg) 419 { 420 moveToNextChange(); 421 continue; 422 } 423 424 final CSN csn = msg.getCSN(); 425 final DN baseDN = nextChangeForInsertDBCursor.getData(); 426 // FIXME problem: what if the serverId is not part of the ServerState? 427 // right now, change number will be blocked 428 if (!canMoveForwardMediumConsistencyPoint(csn)) 429 { 430 // the oldest record to insert is newer than the medium consistency 431 // point. Let's wait for a change that can be published. 432 synchronized (this) 433 { 434 // double check to protect against a missed call to notify() 435 if (!canMoveForwardMediumConsistencyPoint(csn)) 436 { 437 if (isShutdownInitiated()) 438 { 439 return; 440 } 441 wait(); 442 // loop to check if changes older than the medium consistency 443 // point have been added to the ReplicaDBs 444 continue; 445 } 446 } 447 } 448 449 // OK, the oldest change is older than the medium consistency point 450 // let's publish it to the CNIndexDB. 451 final long changeNumber = changelogDB.getChangeNumberIndexDB() 452 .addRecord(new ChangeNumberIndexRecord(baseDN, csn)); 453 if (!cookie.update(baseDN, csn)) 454 { 455 throw new IllegalStateException("It was expected that change (baseDN=" + baseDN + ", csn=" + csn 456 + ") would have updated the cookie=" + cookie + ", but it did not"); 457 } 458 notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg); 459 moveForwardMediumConsistencyPoint(csn, baseDN); 460 } 461 catch (InterruptedException ignored) 462 { 463 // was shutdown called? loop to figure it out. 464 Thread.currentThread().interrupt(); 465 } 466 } 467 } 468 catch (RuntimeException e) 469 { 470 logUnexpectedException(e); 471 // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. 472 throw e; 473 } 474 catch (Exception e) 475 { 476 logUnexpectedException(e); 477 // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. 478 throw new RuntimeException(e); 479 } 480 finally 481 { 482 nextChangeForInsertDBCursor.close(); 483 nextChangeForInsertDBCursor = null; 484 } 485 } 486 487 private void moveToNextChange() throws ChangelogException 488 { 489 try 490 { 491 nextChangeForInsertDBCursor.next(); 492 } 493 catch (AbortedChangelogCursorException e) { 494 if (domainsToClear.isEmpty()) 495 { 496 // There is no domain to clear, thus it is 497 // not expected that a cursor is aborted 498 throw e; 499 } 500 // else assumes the aborted cursor is part of a domain 501 // that will be removed on the next iteration 502 logger.trace("Cursor was aborted: %s, but continuing because domainsToClear has size %s", 503 e, domainsToClear.size()); 504 } 505 } 506 507 /** 508 * Notifies the {@link ChangelogBackend} that a new entry has been added. 509 * 510 * @param baseDN 511 * the baseDN of the newly added entry. 512 * @param changeNumber 513 * the change number of the newly added entry. It will be greater 514 * than zero for entries added to the change number index and less 515 * than or equal to zero for entries added to any replica DB 516 * @param cookie 517 * the cookie of the newly added entry. This is only meaningful for 518 * entries added to the change number index 519 * @param msg 520 * the update message of the newly added entry 521 * @throws ChangelogException 522 * If a problem occurs while notifying of the newly added entry. 523 */ 524 protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber, 525 MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException 526 { 527 ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookie.toString(), msg); 528 } 529 530 /** 531 * Nothing can be done about it. 532 * <p> 533 * Rely on the DirectoryThread uncaught exceptions handler for logging error + 534 * alert. 535 * <p> 536 * Message logged here gives corrective information to the administrator. 537 */ 538 private void logUnexpectedException(Exception e) 539 { 540 logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION, 541 getClass().getSimpleName(), stackTraceToSingleLineString(e)); 542 } 543 544 private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException 545 { 546 final int mcServerId = mcCSN.getServerId(); 547 final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); 548 final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); 549 if (offlineCSN != null) 550 { 551 if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN)) 552 { 553 // replica is back online, we can forget the last time it was offline 554 replicasOffline.removeCSN(mcBaseDN, offlineCSN); 555 } 556 else if (offlineCSN.isOlderThan(mcCSN)) 557 { 558 /* 559 * replica is not back online, Medium consistency point has gone past 560 * its last offline time, and there are no more changes after the 561 * offline CSN in the cursor: remove everything known about it 562 * (offlineCSN from lastAliveCSN and remove all knowledge of this replica 563 * from the medium consistency RUV). 564 */ 565 lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); 566 } 567 } 568 569 // advance the cursor we just read from, 570 // success/failure will be checked later 571 nextChangeForInsertDBCursor.next(); 572 } 573 574 /** 575 * Asks the current thread to clear its state for the specified domain. 576 * <p> 577 * Note: This method blocks the current thread until state is cleared. 578 * 579 * @param baseDN 580 * the baseDN to be cleared from this thread's state. {@code null} and 581 * {@link DN#rootDN()} mean "clear all domains". 582 */ 583 public void clear(DN baseDN) 584 { 585 final DN baseDNToClear = baseDN != null ? baseDN : DN.rootDN(); 586 domainsToClear.add(baseDNToClear); 587 while (domainsToClear.contains(baseDNToClear) 588 && !State.TERMINATED.equals(getState())) 589 { 590 // wait until clear() has been done by thread, always waking it up 591 synchronized (this) 592 { 593 notify(); 594 } 595 // ensures thread wait that this thread's state is cleaned up 596 Thread.yield(); 597 } 598 } 599}