001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2016 ForgeRock AS. 016 */ 017package org.opends.server.replication.server; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collection; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.Map.Entry; 027import java.util.Timer; 028import java.util.TimerTask; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicReference; 032import java.util.concurrent.locks.ReentrantLock; 033 034import net.jcip.annotations.GuardedBy; 035 036import org.forgerock.i18n.LocalizableMessage; 037import org.forgerock.i18n.LocalizableMessageBuilder; 038import org.forgerock.i18n.slf4j.LocalizedLogger; 039import org.forgerock.opendj.ldap.ResultCode; 040import org.opends.server.api.MonitorData; 041import org.forgerock.opendj.server.config.server.MonitorProviderCfg; 042import org.opends.server.api.MonitorProvider; 043import org.opends.server.core.DirectoryServer; 044import org.opends.server.replication.common.CSN; 045import org.opends.server.replication.common.DSInfo; 046import org.opends.server.replication.common.RSInfo; 047import org.opends.server.replication.common.ServerState; 048import org.opends.server.replication.common.ServerStatus; 049import org.opends.server.replication.common.StatusMachineEvent; 050import org.opends.server.replication.protocol.AckMsg; 051import org.opends.server.replication.protocol.ChangeStatusMsg; 052import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; 053import org.opends.server.replication.protocol.ErrorMsg; 054import org.opends.server.replication.protocol.MonitorMsg; 055import org.opends.server.replication.protocol.MonitorRequestMsg; 056import org.opends.server.replication.protocol.ReplicaOfflineMsg; 057import org.opends.server.replication.protocol.ResetGenerationIdMsg; 058import org.opends.server.replication.protocol.RoutableMsg; 059import org.opends.server.replication.protocol.TopologyMsg; 060import org.opends.server.replication.protocol.UpdateMsg; 061import org.opends.server.replication.server.changelog.api.ChangelogException; 062import org.opends.server.replication.server.changelog.api.DBCursor; 063import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; 064import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 065import org.forgerock.opendj.ldap.DN; 066import org.opends.server.types.DirectoryException; 067import org.opends.server.types.HostPort; 068 069import static org.opends.messages.ReplicationMessages.*; 070import static org.opends.server.replication.common.ServerStatus.*; 071import static 
org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.CollectionUtils.*;
import static org.opends.server.util.StaticUtils.*;

/**
 * This class defines an in-memory cache that stores the messages received
 * from an LDAP server or from another replication server and that must be
 * forwarded to other servers.
 *
 * The size of the cache is set by configuration. If the cache grows bigger
 * than the configured size, the oldest messages are removed; should they be
 * needed again, they must be read back from the backing file.
 *
 * It runs a thread that is responsible for saving the received messages to
 * disk and for trimming them. The decision to trim can be based on disk space
 * or on the age of the messages.
 */
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
  private final DN baseDN;

  /**
   * Periodically verifies whether the connected DSs are late and publishes any
   * pending status messages.
   */
  private final StatusAnalyzer statusAnalyzer;

  /**
   * The monitoring publisher that periodically sends monitoring messages to
   * the topology. Uses an AtomicReference to avoid leaking references to
   * costly threads.
   */
  private final AtomicReference<MonitoringPublisher> monitoringPublisher = new AtomicReference<>();
  /** Maintains monitor data for the current domain. */
  private final ReplicationDomainMonitor domainMonitor = new ReplicationDomainMonitor(this);

  /**
   * This map contains one balanced tree for each replica ID to which we are
   * currently publishing. The first update in each balanced tree is the next
   * change that we must push to that particular server.
   */
  private final Map<Integer, DataServerHandler> connectedDSs = new ConcurrentHashMap<>();

  /**
   * This map contains one ServerHandler for each replication server with
   * which we are connected (so normally all the replication servers). The
   * first update in each balanced tree is the next change that we must push
   * to that particular server.
   */
  private final Map<Integer, ReplicationServerHandler> connectedRSs = new ConcurrentHashMap<>();

  private final ReplicationDomainDB domainDB;
  /** The ReplicationServer that created the current instance. */
  private final ReplicationServer localReplicationServer;

  /**
   * The generationId of the current replication domain. The generationId is
   * computed by hashing the first 1000 entries in the DB.
   */
  private volatile long generationId = -1;
  /**
   * JNR, this is legacy code with hard to follow logic. I think what this
   * field tries to say is: "is the generationId in use anywhere?", i.e. is
   * there a replication topology in place? As soon as the answer to any of
   * these questions becomes true, the field is set to true.
   * <p>
   * It looks like the only use of this field is to prevent the
   * {@link #generationId} from being reset by
   * {@link #resetGenerationIdIfPossible()}.
   */
  private volatile boolean generationIdSavedStatus;

  /** The tracer object for the debug logger.
*/ 149 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 150 151 /** 152 * The needed info for each received assured update message we are waiting 153 * acks for. 154 * <p> 155 * Key: a CSN matching a received update message which requested 156 * assured mode usage (either safe read or safe data mode) 157 * <p> 158 * Value: The object holding every info needed about the already received acks 159 * as well as the acks to be received. 160 * 161 * @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its sub 162 * classes javadoc. 163 */ 164 private final Map<CSN, ExpectedAcksInfo> waitingAcks = new ConcurrentHashMap<>(); 165 166 /** 167 * The timer used to run the timeout code (timer tasks) for the assured update 168 * messages we are waiting acks for. 169 */ 170 private final Timer assuredTimeoutTimer; 171 /** 172 * Counter used to purge the timer tasks references in assuredTimeoutTimer, 173 * every n number of treated assured messages. 174 */ 175 private int assuredTimeoutTimerPurgeCounter; 176 177 178 179 /** 180 * Stores pending status messages such as DS change time heartbeats for future 181 * forwarding to the rest of the topology. This class is required in order to 182 * decouple inbound IO processing from outbound IO processing and avoid 183 * potential inter-process deadlocks. In particular, the {@code ServerReader} 184 * thread must not send messages. 185 */ 186 private static class PendingStatusMessages 187 { 188 private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats = new HashMap<>(1); 189 private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs = new HashMap<>(1); 190 private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs = new HashMap<>(1); 191 private boolean sendRSTopologyMsg; 192 private boolean sendDSTopologyMsg; 193 private int excludedDSForTopologyMsg = -1; 194 195 /** 196 * Enqueues a TopologyMsg for all the connected directory servers in order 197 * to let them know the topology (every known DSs and RSs). 198 * 199 * @param excludedDS 200 * If not null, the topology message will not be sent to this DS. 201 */ 202 private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS) 203 { 204 int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1; 205 if (sendDSTopologyMsg) 206 { 207 if (excludedServerId != excludedDSForTopologyMsg) 208 { 209 excludedDSForTopologyMsg = -1; 210 } 211 } 212 else 213 { 214 sendDSTopologyMsg = true; 215 excludedDSForTopologyMsg = excludedServerId; 216 } 217 } 218 219 /** 220 * Enqueues a TopologyMsg for all the connected replication servers in order 221 * to let them know our connected LDAP servers. 222 */ 223 private void enqueueTopoInfoToAllRSs() 224 { 225 sendRSTopologyMsg = true; 226 } 227 228 /** 229 * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to 230 * all other RS instances. 231 * 232 * @param msg 233 * The heartbeat message. 
     */
    private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
    {
      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
    }

    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
    {
      pendingDSMonitorMsgs.put(dsServerId, msg);
    }

    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
    {
      pendingRSMonitorMsgs.put(rsServerId, msg);
    }

    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return getClass().getSimpleName()
          + " pendingHeartbeats=" + pendingHeartbeats
          + ", pendingDSMonitorMsgs=" + pendingDSMonitorMsgs
          + ", pendingRSMonitorMsgs=" + pendingRSMonitorMsgs
          + ", sendRSTopologyMsg=" + sendRSTopologyMsg
          + ", sendDSTopologyMsg=" + sendDSTopologyMsg
          + ", excludedDSForTopologyMsg=" + excludedDSForTopologyMsg;
    }
  }

  private final Object pendingStatusMessagesLock = new Object();

  @GuardedBy("pendingStatusMessagesLock")
  private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages();

  /**
   * Creates a new ReplicationServerDomain associated to the baseDN.
   *
   * @param baseDN
   *          The baseDN associated to the ReplicationServerDomain.
   * @param localReplicationServer
   *          the ReplicationServer that created this instance.
   */
  public ReplicationServerDomain(DN baseDN,
      ReplicationServer localReplicationServer)
  {
    this.baseDN = baseDN;
    this.localReplicationServer = localReplicationServer;
    this.assuredTimeoutTimer = new Timer("Replication server RS("
        + localReplicationServer.getServerId()
        + ") assured timer for domain \"" + baseDN + "\"", true);
    this.domainDB =
        localReplicationServer.getChangelogDB().getReplicationDomainDB();
    this.statusAnalyzer = new StatusAnalyzer(this);
    this.statusAnalyzer.start();
    DirectoryServer.registerMonitorProvider(this);
  }

  /**
   * Adds an update that has been received to the list of
   * updates that must be forwarded to all other servers.
   *
   * @param updateMsg The update that has been received.
   * @param sourceHandler The ServerHandler for the server from which the
   *        update was received
   * @throws IOException When an IO exception happens during the update
   *         processing.
   */
  public void put(UpdateMsg updateMsg, ServerHandler sourceHandler) throws IOException
  {
    sourceHandler.updateServerState(updateMsg);
    sourceHandler.incrementInCount();
    setGenerationIdIfUnset(sourceHandler.getGenerationId());

    /**
     * If this is an assured message (a message requesting an ack), we must
     * construct the ExpectedAcksInfo object with the right number of expected
     * acks before posting the message to the writers. Otherwise some writers
     * may have time to post, receive the ack and increment the received ack
     * counter (kept in the ExpectedAcksInfo object), and we could think the
     * acknowledgment is fully processed although it may not be (some acks
     * from other servers have not yet arrived). So for that purpose we do a
     * pre-loop to determine to whom we will post an assured message.
     * Whether the assured mode is safe read or safe data, we do not support
     * the assured replication feature across topologies with different group
     * ids: the assured feature ensures assured replication only within the
     * same locality (group id). For instance, in a double data center
     * deployment (using 2 group ids) with assured replication enabled, an
     * assured message sent from data center 1 (group id = 1) will be sent to
     * servers of both data centers, but it will request and wait for acks
     * only from servers of data center 1.
     */
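    /*
     * Illustration only (hypothetical server ids): suppose DS(2) sends a safe
     * read assured update while DS(3) (NORMAL_STATUS), DS(4) (DEGRADED_STATUS)
     * and RS(11) (same generation id) are connected, all within our group id.
     * The pre-loop (see processSafeReadUpdateMsg()) marks DS(3) and RS(11) as
     * servers to wait for an ack from and records DS(4) as a wrong status
     * server, so the final ack returned to DS(2) does not wait for DS(4).
     */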
    final PreparedAssuredInfo preparedAssuredInfo = getPreparedAssuredInfo(updateMsg, sourceHandler);

    if (!publishUpdateMsg(updateMsg))
    {
      return;
    }

    final List<Integer> assuredServers = getAssuredServers(updateMsg, preparedAssuredInfo);

    /**
     * The update message equivalent to the originally received update
     * message, but with the assured flag disabled. This message is the one
     * that should be sent to the servers that are not eligible for assured
     * mode.
     * We need a clone of the original message with the assured flag off, to
     * be posted to the servers we do not want to wait for an ack from
     * (servers that are not in normal status or that have a different group
     * id). This must be done because the posted message is a reference, so
     * each writer queue gets the same reference; changing the assured flag of
     * the object would therefore change it for every reference posted on
     * every writer queue. That is why we need one message version with the
     * assured flag on and another one with the assured flag off.
     */
    final NotAssuredUpdateMsg notAssuredUpdateMsg =
        preparedAssuredInfo != null ? new NotAssuredUpdateMsg(updateMsg) : null;

    // Push the message to the replication servers
    if (sourceHandler.isDataServer())
    {
      for (ReplicationServerHandler rsHandler : connectedRSs.values())
      {
        /**
         * Ignore updates to an RS with a bad generation id
         * (there is no system managed status for an RS)
         */
        if (!isDifferentGenerationId(rsHandler, updateMsg))
        {
          addUpdate(rsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
        }
      }
    }

    // Push the message to the LDAP servers
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      // Do not forward the change to the server that just sent it
      if (dsHandler != sourceHandler
          && !isUpdateMsgFiltered(updateMsg, dsHandler))
      {
        addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
      }
    }
  }

  private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
      UpdateMsg updateMsg)
  {
    final boolean isDifferent = isDifferentGenerationId(rsHandler.getGenerationId());
    if (isDifferent && logger.isTraceEnabled())
    {
      debug("updateMsg " + updateMsg.getCSN()
          + " will not be sent to replication server "
          + rsHandler.getServerId() + " with generation id "
          + rsHandler.getGenerationId() + " different from local "
          + "generation id " + generationId);
    }
    return isDifferent;
  }

  /**
   * Ignores updates to a DS in BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS.
   * <p>
   * The RSD lock should not be taken here as it is acceptable to have a delay
   * between the time the server gets a wrong status and the time we detect
   * it: the updates that manage to get through during this window have no
   * impact on the remote server. But it is worth not saturating the network
   * uselessly when the updates are not necessary, so this check to stop
   * sending updates is still useful. Not taking the RSD lock also gives
   * better performance in normal mode (most of the time).
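   *
   * @param updateMsg the update message about to be forwarded
   * @param dsHandler the handler of the destination directory server
   * @return {@code true} if the update must not be sent to this directory
   *         server, {@code false} otherwise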
403 */ 404 private boolean isUpdateMsgFiltered(UpdateMsg updateMsg, DataServerHandler dsHandler) 405 { 406 final ServerStatus dsStatus = dsHandler.getStatus(); 407 if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) 408 { 409 if (logger.isTraceEnabled()) 410 { 411 debug("updateMsg " + updateMsg.getCSN() 412 + " will not be sent to directory server " 413 + dsHandler.getServerId() + " with generation id " 414 + dsHandler.getGenerationId() + " different from local " 415 + "generation id " + generationId); 416 } 417 return true; 418 } 419 else if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) 420 { 421 if (logger.isTraceEnabled()) 422 { 423 debug("updateMsg " + updateMsg.getCSN() 424 + " will not be sent to directory server " 425 + dsHandler.getServerId() + " as it is in full update"); 426 } 427 return true; 428 } 429 // Replica offline messages should not get to a connected DS, they are meant to be 430 // exchanged only between RSes 431 return updateMsg instanceof ReplicaOfflineMsg; 432 } 433 434 private PreparedAssuredInfo getPreparedAssuredInfo(UpdateMsg updateMsg, 435 ServerHandler sourceHandler) throws IOException 436 { 437 // Assured feature is supported starting from replication protocol V2 438 if (!updateMsg.isAssured() 439 || sourceHandler.getProtocolVersion() < REPLICATION_PROTOCOL_V2) 440 { 441 return null; 442 } 443 444 // According to assured sub-mode, prepare structures to keep track of 445 // the acks we are interested in. 446 switch (updateMsg.getAssuredMode()) 447 { 448 case SAFE_DATA_MODE: 449 sourceHandler.incrementAssuredSdReceivedUpdates(); 450 return processSafeDataUpdateMsg(updateMsg, sourceHandler); 451 452 case SAFE_READ_MODE: 453 sourceHandler.incrementAssuredSrReceivedUpdates(); 454 return processSafeReadUpdateMsg(updateMsg, sourceHandler); 455 456 default: 457 // Unknown assured mode: should never happen 458 logger.error(ERR_RS_UNKNOWN_ASSURED_MODE, 459 localReplicationServer.getServerId(), updateMsg.getAssuredMode(), baseDN, updateMsg); 460 return null; 461 } 462 } 463 464 private List<Integer> getAssuredServers(UpdateMsg updateMsg, PreparedAssuredInfo preparedAssuredInfo) 465 { 466 List<Integer> expectedServers = null; 467 if (preparedAssuredInfo != null && preparedAssuredInfo.expectedServers != null) 468 { 469 expectedServers = preparedAssuredInfo.expectedServers; 470 // Store the expected acks info into the global map. 471 // The code for processing reception of acks for this update will update 472 // info kept in this object and if enough acks received, it will send 473 // back the final ack to the requester and remove the object from this map 474 // OR 475 // The following timer will time out and send an timeout ack to the 476 // requester if the acks are not received in time. The timer will also 477 // remove the object from this map. 478 final CSN csn = updateMsg.getCSN(); 479 waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo); 480 481 // Arm timer for this assured update message (wait for acks until it times out) 482 final AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn); 483 assuredTimeoutTimer.schedule(assuredTimeoutTask, localReplicationServer.getAssuredTimeout()); 484 // Purge timer every 100 treated messages 485 assuredTimeoutTimerPurgeCounter++; 486 if ((assuredTimeoutTimerPurgeCounter % 100) == 0) 487 { 488 assuredTimeoutTimer.purge(); 489 } 490 } 491 492 return expectedServers != null ? 
        expectedServers : Collections.<Integer> emptyList();
  }

  private boolean publishUpdateMsg(UpdateMsg updateMsg)
  {
    try
    {
      if (updateMsg instanceof ReplicaOfflineMsg)
      {
        final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
        this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN());
        return true;
      }

      if (this.domainDB.publishUpdateMsg(baseDN, updateMsg))
      {
        /*
         * JNR: Matt and I had a hard time figuring out where to put this
         * synchronized block. We elected to put it here, but without a strong
         * conviction.
         */
        synchronized (generationIDLock)
        {
          /*
           * JNR: I think the generationIdSavedStatus is set to true because
           * the method above created a ReplicaDB, which assumes the
           * generationId was communicated to another server. Hence setting
           * this field to true prevents the generationId from being reset.
           */
          generationIdSavedStatus = true;
        }
      }
      return true;
    }
    catch (ChangelogException e)
    {
      /*
       * Because of a database problem we cannot save any more changes from at
       * least one LDAP server. This replication server therefore cannot do
       * its job properly anymore and needs to close all its connections and
       * shut itself down.
       */
      logger.error(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR, stackTraceToSingleLineString(e));
      localReplicationServer.shutdown();
      return false;
    }
  }

  private void addUpdate(ServerHandler sHandler, UpdateMsg updateMsg,
      NotAssuredUpdateMsg notAssuredUpdateMsg, List<Integer> assuredServers)
  {
    // Assured mode: post the assured or the not assured version of the update
    // message according to what has been computed for the destination server
    if (notAssuredUpdateMsg != null
        && !assuredServers.contains(sHandler.getServerId()))
    {
      sHandler.add(notAssuredUpdateMsg);
    }
    else
    {
      sHandler.add(updateMsg);
    }
  }

  /**
   * Helper class used as the return type of the methods that process a just
   * received assured update message:
   * - processSafeReadUpdateMsg
   * - processSafeDataUpdateMsg
   * It conveniently packs the several objects they return.
   */
  private class PreparedAssuredInfo
  {
    /**
     * The list of servers identified as servers we are interested in
     * receiving acks from. If this list is not null, then expectedAcksInfo
     * should not be null either.
     * Servers that are not in this list are not eligible for an ack request.
     */
    public List<Integer> expectedServers;

    /**
     * The constructed ExpectedAcksInfo object to be used when acks are
     * received. Null if expectedServers is null.
     */
    public ExpectedAcksInfo expectedAcksInfo;
  }

  /**
   * Processes a just received assured update message in Safe Read mode. If
   * the ack can be sent immediately, it is done here. This also determines
   * which suitable servers an ack should be requested from, and which ones
   * are not eligible for an ack request.
   * This is a helper method for the put method; have a look at the put method
   * for a better understanding.
   * @param update The just received assured update to process.
   * @param sourceHandler The ServerHandler for the server from which the
   *        update was received
   * @return A suitable PreparedAssuredInfo object that contains all the info
   *         needed to proceed with posting to the server writers.
593 * @throws IOException When an IO exception happens during the update 594 * processing. 595 */ 596 private PreparedAssuredInfo processSafeReadUpdateMsg( 597 UpdateMsg update, ServerHandler sourceHandler) throws IOException 598 { 599 CSN csn = update.getCSN(); 600 byte groupId = localReplicationServer.getGroupId(); 601 byte sourceGroupId = sourceHandler.getGroupId(); 602 List<Integer> expectedServers = new ArrayList<>(); 603 List<Integer> wrongStatusServers = new ArrayList<>(); 604 605 if (sourceGroupId == groupId) 606 // Assured feature does not cross different group ids 607 { 608 if (sourceHandler.isDataServer()) 609 { 610 collectRSsEligibleForAssuredReplication(groupId, expectedServers); 611 } 612 613 // Look for DS eligible for assured 614 for (DataServerHandler dsHandler : connectedDSs.values()) 615 { 616 // Don't forward the change to the server that just sent it 617 if (dsHandler == sourceHandler) 618 { 619 continue; 620 } 621 if (dsHandler.getGroupId() == groupId) 622 // No ack expected from a DS with different group id 623 { 624 ServerStatus serverStatus = dsHandler.getStatus(); 625 if (serverStatus == ServerStatus.NORMAL_STATUS) 626 { 627 expectedServers.add(dsHandler.getServerId()); 628 } else if (serverStatus == ServerStatus.DEGRADED_STATUS) { 629 // No ack expected from a DS with wrong status 630 wrongStatusServers.add(dsHandler.getServerId()); 631 } 632 /* 633 * else 634 * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS: 635 * We do not want this to be reported as an error to the update 636 * maker -> no pollution or potential misunderstanding when 637 * reading logs or monitoring and it was just administration (for 638 * instance new server is being configured in topo: it goes in bad 639 * gen then full update). 640 */ 641 } 642 } 643 } 644 645 // Return computed structures 646 PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo(); 647 if (!expectedServers.isEmpty()) 648 { 649 // Some other acks to wait for 650 preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(csn, 651 sourceHandler, expectedServers, wrongStatusServers); 652 preparedAssuredInfo.expectedServers = expectedServers; 653 } 654 655 if (preparedAssuredInfo.expectedServers == null) 656 { 657 // No eligible servers found, send the ack immediately 658 sourceHandler.send(new AckMsg(csn)); 659 } 660 661 return preparedAssuredInfo; 662 } 663 664 /** 665 * Process a just received assured update message in Safe Data mode. If the 666 * ack can be sent immediately, it is done here. This will also determine to 667 * which suitable servers an ack should be requested from, and which ones are 668 * not eligible for an ack request. 669 * This method is an helper method for the put method. Have a look at the put 670 * method for a better understanding. 671 * @param update The just received assured update to process. 672 * @param sourceHandler The ServerHandler for the server from which the 673 * update was received 674 * @return A suitable PreparedAssuredInfo object that contains every needed 675 * info to proceed with post to server writers. 676 * @throws IOException When an IO exception happens during the update 677 * processing. 
 */
  private PreparedAssuredInfo processSafeDataUpdateMsg(
      UpdateMsg update, ServerHandler sourceHandler) throws IOException
  {
    CSN csn = update.getCSN();
    boolean interestedInAcks = false;
    byte safeDataLevel = update.getSafeDataLevel();
    byte groupId = localReplicationServer.getGroupId();
    byte sourceGroupId = sourceHandler.getGroupId();
    if (safeDataLevel < (byte) 1)
    {
      // Should never happen
      logger.error(ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL,
          localReplicationServer.getServerId(), safeDataLevel, baseDN, update);
    } else if (sourceGroupId == groupId
        // Assured feature does not cross different group ids
        && isSameGenerationId(sourceHandler.getGenerationId()))
        // Ignore assured updates from servers with a wrong generationId
    {
      if (sourceHandler.isDataServer())
      {
        if (safeDataLevel == (byte) 1)
        {
          /**
           * Immediately return the ack for an assured message in safe data
           * mode with safe data level 1, coming from a DS. No need to wait
           * for more acks.
           */
          sourceHandler.send(new AckMsg(csn));
        } else
        {
          /**
           * Level > 1: we need further acks.
           * The message will be posted in assured mode to the eligible
           * servers. The embedded safe data level is not changed, and its
           * value will be used by a remote RS to determine whether it must
           * send an ack (level > 1) or not (level = 1).
           */
          interestedInAcks = true;
        }
      } else
      { // An RS sent us the safe data message, so there is no further ack to wait for
        /**
         * Level 1 has already been reached so no further acks to wait for.
         * Just deal with level > 1.
         */
        if (safeDataLevel > (byte) 1)
        {
          sourceHandler.send(new AckMsg(csn));
        }
      }
    }

    List<Integer> expectedServers = new ArrayList<>();
    if (interestedInAcks && sourceHandler.isDataServer())
    {
      collectRSsEligibleForAssuredReplication(groupId, expectedServers);
    }

    // Return computed structures
    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
    int nExpectedServers = expectedServers.size();
    if (interestedInAcks) // interestedInAcks so level > 1
    {
      if (nExpectedServers > 0)
      {
        // Some other acks to wait for
        int sdl = update.getSafeDataLevel();
        int neededAdditionalServers = sdl - 1;
        // Change the number of expected acks if there are not enough available
        // eligible servers: the level is a best effort thing, and we do not
        // want to time out on every assured SD update, for instance when an RS
        // has had its generation id reset. For example, a requested level of 3
        // needs 2 additional acks; with only one eligible RS the level is
        // lowered to 2.
        byte finalSdl = (nExpectedServers >= neededAdditionalServers) ?
752 (byte)sdl : // Keep level as it was 753 (byte)(nExpectedServers+1); // Change level to match what's available 754 preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(csn, 755 sourceHandler, finalSdl, expectedServers); 756 preparedAssuredInfo.expectedServers = expectedServers; 757 } else 758 { 759 // level > 1 and source is a DS but no eligible servers found, send the 760 // ack immediately 761 sourceHandler.send(new AckMsg(csn)); 762 } 763 } 764 765 return preparedAssuredInfo; 766 } 767 768 private void collectRSsEligibleForAssuredReplication(byte groupId, 769 List<Integer> expectedServers) 770 { 771 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 772 { 773 if (rsHandler.getGroupId() == groupId 774 // No ack expected from a RS with different group id 775 && isSameGenerationId(rsHandler.getGenerationId()) 776 // No ack expected from a RS with bad gen id 777 ) 778 { 779 expectedServers.add(rsHandler.getServerId()); 780 } 781 } 782 } 783 784 private boolean isSameGenerationId(long generationId) 785 { 786 return this.generationId > 0 && this.generationId == generationId; 787 } 788 789 private boolean isDifferentGenerationId(long generationId) 790 { 791 return this.generationId > 0 && this.generationId != generationId; 792 } 793 794 /** 795 * Process an ack received from a given server. 796 * 797 * @param ack The ack message received. 798 * @param ackingServer The server handler of the server that sent the ack. 799 */ 800 void processAck(AckMsg ack, ServerHandler ackingServer) 801 { 802 // Retrieve the expected acks info for the update matching the original 803 // sent update. 804 CSN csn = ack.getCSN(); 805 ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn); 806 807 if (expectedAcksInfo != null) 808 { 809 // Prevent concurrent access from processAck() or AssuredTimeoutTask.run() 810 synchronized (expectedAcksInfo) 811 { 812 if (expectedAcksInfo.isCompleted()) 813 { 814 // Timeout code is sending a timeout ack, do nothing and let him 815 // remove object from the map 816 return; 817 } 818 /** 819 * 820 * If this is the last ack we were waiting from, immediately create and 821 * send the final ack to the original server 822 */ 823 if (expectedAcksInfo.processReceivedAck(ackingServer, ack)) 824 { 825 // Remove the object from the map as no more needed 826 waitingAcks.remove(csn); 827 AckMsg finalAck = expectedAcksInfo.createAck(false); 828 ServerHandler origServer = expectedAcksInfo.getRequesterServer(); 829 try 830 { 831 origServer.send(finalAck); 832 } catch (IOException e) 833 { 834 /** 835 * An error happened trying the send back an ack to the server. 836 * Log an error and close the connection to this server. 837 */ 838 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 839 mb.append(ERR_RS_ERROR_SENDING_ACK.get( 840 localReplicationServer.getServerId(), origServer.getServerId(), csn, baseDN)); 841 mb.append(" "); 842 mb.append(stackTraceToSingleLineString(e)); 843 logger.error(mb.toMessage()); 844 stopServer(origServer, false); 845 } 846 // Mark the ack info object as completed to prevent potential timeout 847 // code parallel run 848 expectedAcksInfo.completed(); 849 } 850 } 851 } 852 /* Else the timeout occurred for the update matching this CSN 853 * and the ack with timeout error has probably already been sent. 854 */ 855 } 856 857 /** 858 * The code run when the timeout occurs while waiting for acks of the 859 * eligible servers. 
This basically sends a timeout ack (with any additional 860 * error info) to the original server that sent an assured update message. 861 */ 862 private class AssuredTimeoutTask extends TimerTask 863 { 864 private CSN csn; 865 866 /** 867 * Constructor for the timer task. 868 * @param csn The CSN of the assured update we are waiting acks for 869 */ 870 public AssuredTimeoutTask(CSN csn) 871 { 872 this.csn = csn; 873 } 874 875 /** 876 * Run when the assured timeout for an assured update message we are waiting 877 * acks for occurs. 878 */ 879 @Override 880 public void run() 881 { 882 ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn); 883 884 if (expectedAcksInfo != null) 885 { 886 synchronized (expectedAcksInfo) 887 { 888 if (expectedAcksInfo.isCompleted()) 889 { 890 // processAck() code is sending the ack, do nothing and let him 891 // remove object from the map 892 return; 893 } 894 // Remove the object from the map as no more needed 895 waitingAcks.remove(csn); 896 // Create the timeout ack and send him to the server the assured 897 // update message came from 898 AckMsg finalAck = expectedAcksInfo.createAck(true); 899 ServerHandler origServer = expectedAcksInfo.getRequesterServer(); 900 if (logger.isTraceEnabled()) 901 { 902 debug("sending timeout for assured update with CSN " + csn 903 + " to serverId=" + origServer.getServerId()); 904 } 905 try 906 { 907 origServer.send(finalAck); 908 } catch (IOException e) 909 { 910 /** 911 * An error happened trying the send back an ack to the server. 912 * Log an error and close the connection to this server. 913 */ 914 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 915 mb.append(ERR_RS_ERROR_SENDING_ACK.get( 916 localReplicationServer.getServerId(), origServer.getServerId(), csn, baseDN)); 917 mb.append(" "); 918 mb.append(stackTraceToSingleLineString(e)); 919 logger.error(mb.toMessage()); 920 stopServer(origServer, false); 921 } 922 // Increment assured counters 923 boolean safeRead = 924 expectedAcksInfo instanceof SafeReadExpectedAcksInfo; 925 if (safeRead) 926 { 927 origServer.incrementAssuredSrReceivedUpdatesTimeout(); 928 } else 929 { 930 if (origServer.isDataServer()) 931 { 932 origServer.incrementAssuredSdReceivedUpdatesTimeout(); 933 } 934 } 935 // retrieve expected servers in timeout to increment their counter 936 List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers(); 937 for (Integer serverId : serversInTimeout) 938 { 939 ServerHandler expectedDSInTimeout = connectedDSs.get(serverId); 940 ServerHandler expectedRSInTimeout = connectedRSs.get(serverId); 941 if (expectedDSInTimeout != null) 942 { 943 if (safeRead) 944 { 945 expectedDSInTimeout.incrementAssuredSrSentUpdatesTimeout(); 946 } // else no SD update sent to a DS (meaningless) 947 } else if (expectedRSInTimeout != null) 948 { 949 if (safeRead) 950 { 951 expectedRSInTimeout.incrementAssuredSrSentUpdatesTimeout(); 952 } 953 else 954 { 955 expectedRSInTimeout.incrementAssuredSdSentUpdatesTimeout(); 956 } 957 } 958 // else server disappeared ? Let's forget about it. 959 } 960 // Mark the ack info object as completed to prevent potential 961 // processAck() code parallel run 962 expectedAcksInfo.completed(); 963 } 964 } 965 } 966 } 967 968 969 /** 970 * Stop operations with a list of replication servers. 
971 * 972 * @param serversToDisconnect 973 * the replication servers addresses for which we want to stop 974 * operations 975 */ 976 public void stopReplicationServers(Collection<HostPort> serversToDisconnect) 977 { 978 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 979 { 980 if (serversToDisconnect.contains( 981 HostPort.valueOf(rsHandler.getServerAddressURL()))) 982 { 983 stopServer(rsHandler, false); 984 } 985 } 986 } 987 988 /** 989 * Stop operations with all servers this domain is connected with (RS and DS). 990 * 991 * @param shutdown A boolean indicating if the stop is due to a 992 * shutdown condition. 993 */ 994 public void stopAllServers(boolean shutdown) 995 { 996 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 997 { 998 stopServer(rsHandler, shutdown); 999 } 1000 1001 for (DataServerHandler dsHandler : connectedDSs.values()) 1002 { 1003 stopServer(dsHandler, shutdown); 1004 } 1005 } 1006 1007 /** 1008 * Checks whether it is already connected to a DS with same id. 1009 * 1010 * @param dsHandler 1011 * the DS we want to check 1012 * @return true if this DS is already connected to the current server 1013 */ 1014 public boolean isAlreadyConnectedToDS(DataServerHandler dsHandler) 1015 { 1016 if (connectedDSs.containsKey(dsHandler.getServerId())) 1017 { 1018 // looks like two connected LDAP servers have the same serverId 1019 logger.error(ERR_DUPLICATE_SERVER_ID, localReplicationServer.getMonitorInstanceName(), 1020 connectedDSs.get(dsHandler.getServerId()), dsHandler, dsHandler.getServerId()); 1021 return true; 1022 } 1023 return false; 1024 } 1025 1026 /** 1027 * Stop operations with a given server. 1028 * 1029 * @param sHandler the server for which we want to stop operations. 1030 * @param shutdown A boolean indicating if the stop is due to a 1031 * shutdown condition. 1032 */ 1033 public void stopServer(ServerHandler sHandler, boolean shutdown) 1034 { 1035 // TODO JNR merge with stopServer(MessageHandler) 1036 if (logger.isTraceEnabled()) 1037 { 1038 debug("stopServer() on the server handler " + sHandler); 1039 } 1040 /* 1041 * We must prevent deadlock on replication server domain lock, when for 1042 * instance this code is called from dying ServerReader but also dying 1043 * ServerWriter at the same time, or from a thread that wants to shut down 1044 * the handler. So use a thread safe flag to know if the job must be done 1045 * or not (is already being processed or not). 1046 */ 1047 if (!sHandler.engageShutdown()) 1048 // Only do this once (prevent other thread to enter here again) 1049 { 1050 if (!shutdown) 1051 { 1052 try 1053 { 1054 // Acquire lock on domain (see more details in comment of start() 1055 // method of ServerHandler) 1056 lock(); 1057 } 1058 catch (InterruptedException ex) 1059 { 1060 // We can't deal with this here, so re-interrupt thread so that it is 1061 // caught during subsequent IO. 
1062 Thread.currentThread().interrupt(); 1063 return; 1064 } 1065 } 1066 1067 try 1068 { 1069 // Stop useless monitoring publisher if no more RS or DS in domain 1070 if ( (connectedDSs.size() + connectedRSs.size() )== 1) 1071 { 1072 if (logger.isTraceEnabled()) 1073 { 1074 debug("remote server " + sHandler 1075 + " is the last RS/DS to be stopped:" 1076 + " stopping monitoring publisher"); 1077 } 1078 stopMonitoringPublisher(); 1079 } 1080 1081 if (connectedRSs.containsKey(sHandler.getServerId())) 1082 { 1083 unregisterServerHandler(sHandler, shutdown, false); 1084 } 1085 else if (connectedDSs.containsKey(sHandler.getServerId())) 1086 { 1087 unregisterServerHandler(sHandler, shutdown, true); 1088 } 1089 } 1090 catch(Exception e) 1091 { 1092 logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e))); 1093 } 1094 finally 1095 { 1096 if (!shutdown) 1097 { 1098 release(); 1099 } 1100 } 1101 } 1102 } 1103 1104 private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown, 1105 boolean isDirectoryServer) 1106 { 1107 unregisterServerHandler(sHandler); 1108 sHandler.shutdown(); 1109 1110 resetGenerationIdIfPossible(); 1111 if (!shutdown) 1112 { 1113 synchronized (pendingStatusMessagesLock) 1114 { 1115 if (isDirectoryServer) 1116 { 1117 // Update the remote replication servers with our list 1118 // of connected LDAP servers 1119 pendingStatusMessages.enqueueTopoInfoToAllRSs(); 1120 } 1121 // Warn our DSs that a RS or DS has quit (does not use this 1122 // handler as already removed from list) 1123 pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null); 1124 } 1125 statusAnalyzer.notifyPendingStatusMessage(); 1126 } 1127 } 1128 1129 /** 1130 * Unregister this handler from the list of handlers registered to this 1131 * domain. 1132 * @param sHandler the provided handler to unregister. 1133 */ 1134 private void unregisterServerHandler(ServerHandler sHandler) 1135 { 1136 if (sHandler.isReplicationServer()) 1137 { 1138 connectedRSs.remove(sHandler.getServerId()); 1139 } 1140 else 1141 { 1142 connectedDSs.remove(sHandler.getServerId()); 1143 } 1144 } 1145 1146 /** 1147 * This method resets the generationId for this domain if there is no LDAP 1148 * server currently connected in the whole topology on this domain and if the 1149 * generationId has never been saved. 1150 * <ul> 1151 * <li>test emptiness of {@link #connectedDSs} list</li> 1152 * <li>traverse {@link #connectedRSs} list and test for each if DS are 1153 * connected</li> 1154 * </ul> 1155 * So it strongly relies on the {@link #connectedDSs} list 1156 */ 1157 private void resetGenerationIdIfPossible() 1158 { 1159 if (logger.isTraceEnabled()) 1160 { 1161 debug("mayResetGenerationId generationIdSavedStatus=" 1162 + generationIdSavedStatus); 1163 } 1164 1165 // If there is no more any LDAP server connected to this domain in the 1166 // topology and the generationId has never been saved, then we can reset 1167 // it and the next LDAP server to connect will become the new reference. 
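    // The actual reset below is changeGenerationId(-1): a generation id of -1
    // is treated as "not set", so the first DS to reconnect effectively
    // re-seeds the reference generation id for this domain.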
1168 boolean ldapServersConnectedInTheTopology = false; 1169 if (connectedDSs.isEmpty()) 1170 { 1171 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 1172 { 1173 if (generationId != rsHandler.getGenerationId()) 1174 { 1175 if (logger.isTraceEnabled()) 1176 { 1177 debug("mayResetGenerationId skip RS " + rsHandler 1178 + " that has different genId"); 1179 } 1180 } 1181 else if (rsHandler.hasRemoteLDAPServers()) 1182 { 1183 ldapServersConnectedInTheTopology = true; 1184 1185 if (logger.isTraceEnabled()) 1186 { 1187 debug("mayResetGenerationId RS " + rsHandler 1188 + " has ldap servers connected to it" 1189 + " - will not reset generationId"); 1190 } 1191 break; 1192 } 1193 } 1194 } 1195 else 1196 { 1197 ldapServersConnectedInTheTopology = true; 1198 1199 if (logger.isTraceEnabled()) 1200 { 1201 debug("has ldap servers connected to it - will not reset generationId"); 1202 } 1203 } 1204 1205 if (!ldapServersConnectedInTheTopology 1206 && !generationIdSavedStatus 1207 && generationId != -1) 1208 { 1209 changeGenerationId(-1); 1210 } 1211 } 1212 1213 /** 1214 * Checks whether a remote RS is already connected to this hosting RS. 1215 * 1216 * @param rsHandler 1217 * The handler for the remote RS. 1218 * @return flag specifying whether the remote RS is already connected. 1219 * @throws DirectoryException 1220 * when a problem occurs. 1221 */ 1222 public boolean isAlreadyConnectedToRS(ReplicationServerHandler rsHandler) 1223 throws DirectoryException 1224 { 1225 ReplicationServerHandler oldRsHandler = 1226 connectedRSs.get(rsHandler.getServerId()); 1227 if (oldRsHandler == null) 1228 { 1229 return false; 1230 } 1231 1232 if (oldRsHandler.getServerAddressURL().equals( 1233 rsHandler.getServerAddressURL())) 1234 { 1235 // this is the same server, this means that our ServerStart messages 1236 // have been sent at about the same time and 2 connections 1237 // have been established. 1238 // Silently drop this connection. 1239 return true; 1240 } 1241 1242 // looks like two replication servers have the same serverId 1243 // log an error message and drop this connection. 1244 LocalizableMessage message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get( 1245 localReplicationServer.getMonitorInstanceName(), 1246 oldRsHandler.getServerAddressURL(), rsHandler.getServerAddressURL(), 1247 rsHandler.getServerId()); 1248 throw new DirectoryException(ResultCode.OTHER, message); 1249 } 1250 1251 /** 1252 * Creates and returns a cursor across this replication domain. 1253 * <p> 1254 * Client code must call {@link DBCursor#next()} to advance the cursor to the 1255 * next available record. 1256 * <p> 1257 * When the cursor is not used anymore, client code MUST call the 1258 * {@link DBCursor#close()} method to free the resources and locks used by the 1259 * cursor. 1260 * 1261 * @param startAfterServerState 1262 * Starting point for the replicaDB cursors. If null, start from the 1263 * oldest CSN 1264 * @return a non null {@link DBCursor} going from oldest to newest CSN 1265 * @throws ChangelogException 1266 * If a database problem happened 1267 * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, CursorOptions) 1268 */ 1269 public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState) 1270 throws ChangelogException 1271 { 1272 CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); 1273 return domainDB.getCursorFrom(baseDN, startAfterServerState, options); 1274 } 1275 1276 /** 1277 * Get the baseDN. 1278 * 1279 * @return Returns the baseDN. 
1280 */ 1281 public DN getBaseDN() 1282 { 1283 return baseDN; 1284 } 1285 1286 /** 1287 * Retrieves the destination handlers for a routable message. 1288 * 1289 * @param msg The message to route. 1290 * @param senderHandler The handler of the server that published this message. 1291 * @return The list of destination handlers. 1292 */ 1293 private List<ServerHandler> getDestinationServers(RoutableMsg msg, 1294 ServerHandler senderHandler) 1295 { 1296 List<ServerHandler> servers = new ArrayList<>(); 1297 1298 if (msg.getDestination() == RoutableMsg.THE_CLOSEST_SERVER) 1299 { 1300 // TODO Import from the "closest server" to be implemented 1301 } else if (msg.getDestination() == RoutableMsg.ALL_SERVERS) 1302 { 1303 if (!senderHandler.isReplicationServer()) 1304 { 1305 // Send to all replication servers with a least one remote 1306 // server connected 1307 for (ReplicationServerHandler rsh : connectedRSs.values()) 1308 { 1309 if (rsh.hasRemoteLDAPServers()) 1310 { 1311 servers.add(rsh); 1312 } 1313 } 1314 } 1315 1316 // Sends to all connected LDAP servers 1317 for (DataServerHandler destinationHandler : connectedDSs.values()) 1318 { 1319 // Don't loop on the sender 1320 if (destinationHandler == senderHandler) 1321 { 1322 continue; 1323 } 1324 servers.add(destinationHandler); 1325 } 1326 } else 1327 { 1328 // Destination is one server 1329 DataServerHandler destinationHandler = 1330 connectedDSs.get(msg.getDestination()); 1331 if (destinationHandler != null) 1332 { 1333 servers.add(destinationHandler); 1334 } else 1335 { 1336 // the targeted server is NOT connected 1337 // Let's search for the replication server that MAY 1338 // have the targeted server connected. 1339 if (senderHandler.isDataServer()) 1340 { 1341 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 1342 { 1343 // Send to all replication servers with a least one remote 1344 // server connected 1345 if (rsHandler.isRemoteLDAPServer(msg.getDestination())) 1346 { 1347 servers.add(rsHandler); 1348 } 1349 } 1350 } 1351 } 1352 } 1353 return servers; 1354 } 1355 1356 1357 1358 /** 1359 * Processes a message coming from one server in the topology and potentially 1360 * forwards it to one or all other servers. 1361 * 1362 * @param msg 1363 * The message received and to be processed. 1364 * @param sender 1365 * The server handler of the server that sent the message. 1366 */ 1367 void process(RoutableMsg msg, ServerHandler sender) 1368 { 1369 if (msg.getDestination() == localReplicationServer.getServerId()) 1370 { 1371 // Handle routable messages targeted at this RS. 1372 if (msg instanceof ErrorMsg) 1373 { 1374 ErrorMsg errorMsg = (ErrorMsg) msg; 1375 logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails()); 1376 } 1377 else 1378 { 1379 replyWithUnroutableMsgType(sender, msg); 1380 } 1381 } 1382 else 1383 { 1384 // Forward message not destined for this RS. 1385 List<ServerHandler> servers = getDestinationServers(msg, sender); 1386 if (!servers.isEmpty()) 1387 { 1388 forwardMsgToAllServers(msg, servers, sender); 1389 } 1390 else 1391 { 1392 replyWithUnreachablePeerMsg(sender, msg); 1393 } 1394 } 1395 } 1396 1397 /** 1398 * Responds to a monitor request message. 1399 * 1400 * @param msg 1401 * The monitor request message. 1402 * @param sender 1403 * The DS/RS which sent the monitor request. 1404 */ 1405 void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender) 1406 { 1407 enqueueMonitorMsg(msg, sender); 1408 } 1409 1410 /** 1411 * Responds to a monitor message. 
1412 * 1413 * @param msg 1414 * The monitor message 1415 * @param sender 1416 * The DS/RS which sent the monitor. 1417 */ 1418 void processMonitorMsg(MonitorMsg msg, ServerHandler sender) 1419 { 1420 domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId()); 1421 } 1422 1423 private void replyWithUnroutableMsgType(ServerHandler msgEmitter, 1424 RoutableMsg msg) 1425 { 1426 String msgClassname = msg.getClass().getCanonicalName(); 1427 logger.info(NOTE_ERR_ROUTING_TO_SERVER, msgClassname); 1428 1429 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 1430 mb.append(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname)); 1431 mb.append("serverID:").append(msg.getDestination()); 1432 ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb.toMessage()); 1433 try 1434 { 1435 msgEmitter.send(errMsg); 1436 } 1437 catch (IOException ignored) 1438 { 1439 // an error happened on the sender session trying to recover 1440 // from an error on the receiver session. 1441 // Not much more we can do at this point. 1442 } 1443 } 1444 1445 private void replyWithUnreachablePeerMsg(ServerHandler msgEmitter, 1446 RoutableMsg msg) 1447 { 1448 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 1449 mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDN, msg.getDestination())); 1450 mb.append(" In Replication Server=").append( 1451 this.localReplicationServer.getMonitorInstanceName()); 1452 mb.append(" unroutable message =").append(msg.getClass().getSimpleName()); 1453 mb.append(" Details:routing table is empty"); 1454 final LocalizableMessage message = mb.toMessage(); 1455 logger.error(message); 1456 1457 ErrorMsg errMsg = new ErrorMsg(this.localReplicationServer.getServerId(), 1458 msg.getSenderID(), message); 1459 try 1460 { 1461 msgEmitter.send(errMsg); 1462 } 1463 catch (IOException ignored) 1464 { 1465 // TODO Handle error properly (sender timeout in addition) 1466 /* 1467 * An error happened trying to send an error msg to this server. 1468 * Log an error and close the connection to this server. 1469 */ 1470 logger.error(ERR_CHANGELOG_ERROR_SENDING_ERROR, this, ignored); 1471 stopServer(msgEmitter, false); 1472 } 1473 } 1474 1475 private void forwardMsgToAllServers(RoutableMsg msg, 1476 List<ServerHandler> servers, ServerHandler sender) 1477 { 1478 for (ServerHandler targetHandler : servers) 1479 { 1480 try 1481 { 1482 targetHandler.send(msg); 1483 } catch (IOException ioe) 1484 { 1485 /* 1486 * An error happened trying to send a routable message to its 1487 * destination server. 1488 * Send back an error to the originator of the message. 1489 */ 1490 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 1491 mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDN, msg.getDestination())); 1492 mb.append(" unroutable message =").append(msg.getClass().getSimpleName()); 1493 mb.append(" Details: ").append(ioe.getLocalizedMessage()); 1494 final LocalizableMessage message = mb.toMessage(); 1495 logger.error(message); 1496 1497 ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message); 1498 try 1499 { 1500 sender.send(errMsg); 1501 } catch (IOException ioe1) 1502 { 1503 // an error happened on the sender session trying to recover 1504 // from an error on the receiver session. 1505 // We don't have much solution left beside closing the sessions. 
1506 stopServer(sender, false); 1507 stopServer(targetHandler, false); 1508 } 1509 // TODO Handle error properly (sender timeout in addition) 1510 } 1511 } 1512 } 1513 1514 /** 1515 * Creates a new monitor message including monitoring information for the 1516 * whole topology. 1517 * 1518 * @param sender 1519 * The sender of this message. 1520 * @param destination 1521 * The destination of this message. 1522 * @return The newly created and filled MonitorMsg. Null if a problem occurred 1523 * during message creation. 1524 * @throws InterruptedException 1525 * if this thread is interrupted while waiting for a response 1526 */ 1527 public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination) 1528 throws InterruptedException 1529 { 1530 return createGlobalTopologyMonitorMsg(sender, destination, 1531 domainMonitor.recomputeMonitorData()); 1532 } 1533 1534 private MonitorMsg createGlobalTopologyMonitorMsg(int sender, 1535 int destination, ReplicationDomainMonitorData monitorData) 1536 { 1537 final MonitorMsg returnMsg = new MonitorMsg(sender, destination); 1538 returnMsg.setReplServerDbState(getLatestServerState()); 1539 1540 // Add the server state for each DS and RS currently in the topology. 1541 for (int replicaId : toIterable(monitorData.ldapIterator())) 1542 { 1543 returnMsg.setServerState(replicaId, 1544 monitorData.getLDAPServerState(replicaId), 1545 monitorData.getApproxFirstMissingDate(replicaId), true); 1546 } 1547 1548 for (int replicaId : toIterable(monitorData.rsIterator())) 1549 { 1550 returnMsg.setServerState(replicaId, 1551 monitorData.getRSStates(replicaId), 1552 monitorData.getRSApproxFirstMissingDate(replicaId), false); 1553 } 1554 1555 return returnMsg; 1556 } 1557 1558 1559 1560 /** 1561 * Creates a new monitor message including monitoring information for the 1562 * topology directly connected to this RS. This includes information for: - 1563 * local RS - all direct DSs - all direct RSs 1564 * 1565 * @param sender 1566 * The sender of this message. 1567 * @param destination 1568 * The destination of this message. 1569 * @return The newly created and filled MonitorMsg. Null if the current thread 1570 * was interrupted while attempting to get the domain lock. 1571 */ 1572 private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination) 1573 { 1574 final MonitorMsg monitorMsg = new MonitorMsg(sender, destination); 1575 monitorMsg.setReplServerDbState(getLatestServerState()); 1576 1577 // Add the server state for each connected DS and RS. 1578 for (DataServerHandler dsHandler : this.connectedDSs.values()) 1579 { 1580 monitorMsg.setServerState(dsHandler.getServerId(), 1581 dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(), 1582 true); 1583 } 1584 1585 for (ReplicationServerHandler rsHandler : this.connectedRSs.values()) 1586 { 1587 monitorMsg.setServerState(rsHandler.getServerId(), 1588 rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(), 1589 false); 1590 } 1591 return monitorMsg; 1592 } 1593 1594 /** 1595 * Shutdown this ReplicationServerDomain. 1596 */ 1597 public void shutdown() 1598 { 1599 DirectoryServer.deregisterMonitorProvider(this); 1600 1601 // Terminate the assured timer 1602 assuredTimeoutTimer.cancel(); 1603 1604 stopAllServers(true); 1605 statusAnalyzer.shutdown(); 1606 } 1607 1608 /** 1609 * Returns the latest most current ServerState describing the newest CSNs for 1610 * each server in this domain. 1611 * 1612 * @return The ServerState describing the newest CSNs for each server in in 1613 * this domain. 
1614 */ 1615 public ServerState getLatestServerState() 1616 { 1617 return domainDB.getDomainNewestCSNs(baseDN); 1618 } 1619 1620 /** {@inheritDoc} */ 1621 @Override 1622 public String toString() 1623 { 1624 return "ReplicationServerDomain " + baseDN; 1625 } 1626 1627 1628 1629 /** 1630 * Creates a TopologyMsg filled with information to be sent to a remote RS. 1631 * We send remote RS the info of every DS that are directly connected to us 1632 * plus our own info as RS. 1633 * @return A suitable TopologyMsg PDU to be sent to a peer RS 1634 */ 1635 public TopologyMsg createTopologyMsgForRS() 1636 { 1637 List<DSInfo> dsInfos = new ArrayList<>(); 1638 for (DataServerHandler dsHandler : connectedDSs.values()) 1639 { 1640 dsInfos.add(dsHandler.toDSInfo()); 1641 } 1642 1643 // Create info for the local RS 1644 List<RSInfo> rsInfos = newArrayList(toRSInfo(localReplicationServer, generationId)); 1645 1646 return new TopologyMsg(dsInfos, rsInfos); 1647 } 1648 1649 /** 1650 * Creates a TopologyMsg filled with information to be sent to a DS. 1651 * We send remote DS the info of every known DS and RS in the topology (our 1652 * directly connected DSs plus the DSs connected to other RSs) except himself. 1653 * Also put info related to local RS. 1654 * 1655 * @param destDsId The id of the DS the TopologyMsg PDU is to be sent to and 1656 * that we must not include in the DS list. 1657 * @return A suitable TopologyMsg PDU to be sent to a peer DS 1658 */ 1659 public TopologyMsg createTopologyMsgForDS(int destDsId) 1660 { 1661 // Go through every DSs (except recipient of msg) 1662 List<DSInfo> dsInfos = new ArrayList<>(); 1663 for (DataServerHandler dsHandler : connectedDSs.values()) 1664 { 1665 if (dsHandler.getServerId() == destDsId) 1666 { 1667 continue; 1668 } 1669 dsInfos.add(dsHandler.toDSInfo()); 1670 } 1671 1672 1673 List<RSInfo> rsInfos = new ArrayList<>(); 1674 // Add our own info (local RS) 1675 rsInfos.add(toRSInfo(localReplicationServer, generationId)); 1676 1677 // Go through every peer RSs (and get their connected DSs), also add info 1678 // for RSs 1679 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 1680 { 1681 rsInfos.add(rsHandler.toRSInfo()); 1682 1683 rsHandler.addDSInfos(dsInfos); 1684 } 1685 1686 return new TopologyMsg(dsInfos, rsInfos); 1687 } 1688 1689 private RSInfo toRSInfo(ReplicationServer rs, long generationId) 1690 { 1691 return new RSInfo(rs.getServerId(), rs.getServerURL(), generationId, 1692 rs.getGroupId(), rs.getWeight()); 1693 } 1694 1695 /** 1696 * Get the generationId associated to this domain. 1697 * 1698 * @return The generationId 1699 */ 1700 public long getGenerationId() 1701 { 1702 return generationId; 1703 } 1704 1705 /** 1706 * Initialize the value of the generationID for this ReplicationServerDomain. 1707 * This method is intended to be used for initialization at startup and 1708 * simply stores the new value without any additional processing. 1709 * For example it does not clear the change-log DBs 1710 * 1711 * @param generationId The new value of generationId. 1712 */ 1713 public void initGenerationID(long generationId) 1714 { 1715 synchronized (generationIDLock) 1716 { 1717 this.generationId = generationId; 1718 this.generationIdSavedStatus = true; 1719 } 1720 } 1721 1722 /** 1723 * Sets the provided value as the new in memory generationId. 1724 * Also clear the changelog databases. 1725 * 1726 * @param generationId The new value of generationId. 
  /**
   * Sets the provided value as the new in-memory generationId.
   * Also clears the changelog databases.
   *
   * @param generationId The new value of generationId.
   * @return The old generation id
   */
  public long changeGenerationId(long generationId)
  {
    synchronized (generationIDLock)
    {
      long oldGenerationId = this.generationId;

      if (this.generationId != generationId)
      {
        clearDbs();

        this.generationId = generationId;
        this.generationIdSavedStatus = false;
      }
      return oldGenerationId;
    }
  }

  /**
   * Resets the generationID.
   *
   * @param senderHandler The handler associated with the server
   *        that requested to reset the generationId.
   * @param genIdMsg The reset generation ID msg received.
   */
  public void resetGenerationId(ServerHandler senderHandler,
      ResetGenerationIdMsg genIdMsg)
  {
    if (logger.isTraceEnabled())
    {
      debug("Receiving ResetGenerationIdMsg from "
          + senderHandler.getServerId() + ":\n" + genIdMsg);
    }

    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }

    try
    {
      final long newGenId = genIdMsg.getGenerationId();
      if (newGenId != this.generationId)
      {
        changeGenerationId(newGenId);
      }
      else
      {
        // We were asked to take a generation id we already have: just ignore it
        if (logger.isTraceEnabled())
        {
          debug("Reset generation id requested but generationId was already "
              + this.generationId + ":\n" + genIdMsg);
        }
      }

      // If we are the first replication server warned,
      // then forward the reset message to the remote replication servers
      for (ServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
          // After we have sent the message, the remote RS will adopt
          // the new genId
          rsHandler.setGenerationId(newGenId);
          if (senderHandler.isDataServer())
          {
            rsHandler.send(genIdMsg);
          }
        } catch (IOException e)
        {
          logger.error(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID, baseDN, e.getMessage());
        }
      }

      // Change the status of the connected DSs according to the requested new
      // reference generation id
      for (DataServerHandler dsHandler : connectedDSs.values())
      {
        try
        {
          dsHandler.changeStatusForResetGenId(newGenId);
        } catch (IOException e)
        {
          logger.error(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID, baseDN,
              dsHandler.getServerId(), e.getMessage());
        }
      }

      // Update every peer (RS/DS) with potential topology changes (status
      // changes). Rather than doing that each time a DS has a status change
      // (following the reset gen id message), we prefer advertising once for
      // all after the changes (fewer packets sent), here at the end of the
      // reset msg processing.
      sendTopoInfoToAll();

      logger.info(NOTE_RESET_GENERATION_ID, baseDN, newGenId);
    }
    catch(Exception e)
    {
      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
  }
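  /*
   * Note (illustrative): in resetGenerationId() above, the reset message is
   * only forwarded to peer RSs when the sender is a DS, i.e. when we are the
   * first RS to receive it. An RS receiving the message from another RS only
   * adopts the new generation id; this keeps the reset from bouncing back and
   * forth between replication servers. processChangeTimeHeartbeatMsg() below
   * applies the same rule before forwarding heartbeats.
   */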
  /**
   * Processes a message from a remote server changing its status.
   *
   * @param senderHandler The handler associated with the server
   *        that changed its status.
   * @param csMsg The message containing the new status.
   */
  public void processNewStatus(DataServerHandler senderHandler,
      ChangeStatusMsg csMsg)
  {
    if (logger.isTraceEnabled())
    {
      debug("receiving ChangeStatusMsg from " + senderHandler.getServerId()
          + ":\n" + csMsg);
    }

    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }

    try
    {
      ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
        // An error was already logged in processNewStatus(),
        // so just return in order not to forward a bad status to the topology
        return;
      }

      enqueueTopoInfoToAllExcept(senderHandler);

      logger.info(NOTE_DIRECTORY_SERVER_CHANGED_STATUS,
          senderHandler.getServerId(), baseDN, newStatus);
    }
    catch(Exception e)
    {
      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
  }
  /**
   * Changes the status of a directory server according to the event generated
   * by the status analyzer.
   *
   * @param dsHandler The handler of the directory server to update
   * @param event The event to be used for the new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  private boolean changeStatus(DataServerHandler dsHandler,
      StatusMachineEvent event)
  {
    try
    {
      // Acquire lock on domain (see ServerHandler#start() for more details)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We have been interrupted so that we die. This comes from
      // stopStatusAnalyzer and prevents a deadlock in the following situation:
      // the RS is being shut down and stopServer calls stopStatusAnalyzer.
      // The domain lock is held by the shutdown thread while the status
      // analyzer thread wants to change the status of a server and is
      // therefore waiting for the domain lock. As the shutdown thread is
      // waiting for the analyzer thread to die, a deadlock occurs. So we force
      // interruption of the status analyzer thread after 2 seconds if it has
      // not finished (see StatusAnalyzer.waitForShutdown). This way the
      // analyzer thread only takes the domain lock when the status of a DS
      // actually has to be changed. See more comments in the run method of
      // StatusAnalyzer.
      if (logger.isTraceEnabled())
      {
        logger.trace("Status analyzer for domain " + baseDN
            + " has been interrupted when"
            + " trying to acquire domain lock for changing the status of DS "
            + dsHandler.getServerId());
      }
      return true;
    }

    try
    {
      ServerStatus newStatus = ServerStatus.INVALID_STATUS;
      ServerStatus oldStatus = dsHandler.getStatus();
      try
      {
        newStatus = dsHandler.changeStatus(event);
      }
      catch (IOException e)
      {
        logger.error(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER,
            baseDN, dsHandler.getServerId(), e.getMessage());
      }

      if (newStatus == ServerStatus.INVALID_STATUS || newStatus == oldStatus)
      {
        // The change was impossible or already occurred (see StatusAnalyzer
        // comments)
        return false;
      }

      enqueueTopoInfoToAllExcept(dsHandler);
    }
    catch (Exception e)
    {
      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }

    return false;
  }

  /**
   * Updates every peer (RS/DS) with topology changes.
   */
  public void sendTopoInfoToAll()
  {
    enqueueTopoInfoToAllExcept(null);
  }

  /**
   * Updates every peer (RS/DS) with topology changes, except one DS.
   *
   * @param dsHandler
   *          if not null, the topology message will not be sent to this DS
   */
  private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
  {
    synchronized (pendingStatusMessagesLock)
    {
      pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
      pendingStatusMessages.enqueueTopoInfoToAllRSs();
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }

  /**
   * Clears the DB associated with this domain.
   */
  private void clearDbs()
  {
    try
    {
      domainDB.removeDomain(baseDN);
    }
    catch (ChangelogException e)
    {
      logger.error(ERR_ERROR_CLEARING_DB, baseDN, e.getMessage(), e);
    }
  }

  /**
   * Returns whether the provided server is in a degraded state because it has
   * an invalid generationId for this domain.
   *
   * @param serverId The serverId for which we want to know the state.
   * @return Whether it is degraded or not.
   */
  public boolean isDegradedDueToGenerationId(int serverId)
  {
    if (logger.isTraceEnabled())
    {
      debug("isDegraded serverId=" + serverId + " given local generation Id="
          + this.generationId);
    }

    ServerHandler sHandler = connectedRSs.get(serverId);
    if (sHandler == null)
    {
      sHandler = connectedDSs.get(serverId);
      if (sHandler == null)
      {
        return false;
      }
    }

    if (logger.isTraceEnabled())
    {
      debug("Compute degradation of serverId=" + serverId
          + " LS server generation Id=" + sHandler.getGenerationId());
    }
    return sHandler.getGenerationId() != this.generationId;
  }
  /**
   * Processes topology information received from a peer RS.
   *
   * @param topoMsg The topo message just received from the remote RS
   * @param rsHandler The handler that received the message.
   * @param allowResetGenId True to allow resetting the generation id
   *        (when called after the initial handshake)
   * @throws IOException If an error occurred.
   * @throws DirectoryException If an error occurred.
   */
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg,
      ReplicationServerHandler rsHandler, boolean allowResetGenId)
      throws IOException, DirectoryException
  {
    if (logger.isTraceEnabled())
    {
      debug("receiving TopologyMsg from serverId=" + rsHandler.getServerId()
          + ":\n" + topoMsg);
    }

    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }

    try
    {
      // Store the DSs connected to the remote RS and update information about
      // the peer RS
      rsHandler.processTopoInfoFromRS(topoMsg);

      // Handle generation id
      if (allowResetGenId)
      {
        resetGenerationIdIfPossible();
        setGenerationIdIfUnset(rsHandler.getGenerationId());
      }

      if (isDifferentGenerationId(rsHandler.getGenerationId()))
      {
        LocalizableMessage message = WARN_BAD_GENERATION_ID_FROM_RS.get(rsHandler.getServerId(),
            rsHandler.session.getReadableRemoteAddress(), rsHandler.getGenerationId(),
            baseDN, getLocalRSServerId(), generationId);
        logger.warn(message);

        ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(),
            rsHandler.getServerId(), message);
        rsHandler.send(errorMsg);
      }

      /*
       * Sends the currently known topology information to every connected
       * DS we have.
       */
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      statusAnalyzer.notifyPendingStatusMessage();
    }
    catch(Exception e)
    {
      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
  }

  private void setGenerationIdIfUnset(long generationId)
  {
    if (this.generationId < 0)
    {
      this.generationId = generationId;
    }
  }

  /**
   * Returns the latest monitor data available for this replication server
   * domain.
   *
   * @return The latest monitor data available for this replication server
   *         domain, which is never {@code null}.
   */
  ReplicationDomainMonitorData getDomainMonitorData()
  {
    return domainMonitor.getMonitorData();
  }

  /**
   * Gets the map of connected DSs.
   *
   * @return The map of connected DSs
   */
  public Map<Integer, DataServerHandler> getConnectedDSs()
  {
    return Collections.unmodifiableMap(connectedDSs);
  }

  /**
   * Gets the map of connected RSs.
   *
   * @return The map of connected RSs
   */
  public Map<Integer, ReplicationServerHandler> getConnectedRSs()
  {
    return Collections.unmodifiableMap(connectedRSs);
  }
  /**
   * A synchronization mechanism is used to ensure exclusive access to the
   * domain. The goal is to have a consistent view of the topology by locking
   * the structures holding the topology view of the domain:
   * {@link #connectedDSs} and {@link #connectedRSs}. When a connection is
   * established with a peer DS or RS, the lock should be taken before updating
   * these structures, then released. The same mechanism should be used when
   * updating any data related to the view of the topology: for instance if the
   * status of a DS is changed, the lock should be taken before updating the
   * matching server handler and sending the topology messages to peers, and
   * released afterwards. This allows every member of the topology to have a
   * consistent view of the topology and to be sure it will not miss any
   * information.
   * <p>
   * So the locking system must be used (non-exhaustive list):
   * <ul>
   * <li>when a connection is established with a DS or RS</li>
   * <li>when a connection is ended with a DS or RS</li>
   * <li>when receiving a TopologyMsg and updating structures</li>
   * <li>when creating and sending a TopologyMsg</li>
   * <li>when a DS status is changing (ChangeStatusMsg received or sent)...</li>
   * </ul>
   */
  private final ReentrantLock lock = new ReentrantLock();

  /**
   * This lock is used to protect the generationId variable.
   */
  private final Object generationIDLock = new Object();

  /**
   * Tests if the current thread has the lock on this domain.
   *
   * @return True if the current thread has the lock.
   */
  public boolean hasLock()
  {
    return lock.getHoldCount() > 0;
  }

  /**
   * Takes the lock on this domain, blocking until the lock can be acquired or
   * the calling thread is interrupted.
   *
   * @throws java.lang.InterruptedException If interrupted.
   */
  public void lock() throws InterruptedException
  {
    lock.lockInterruptibly();
  }

  /**
   * Releases the lock on this domain.
   */
  public void release()
  {
    lock.unlock();
  }

  /**
   * Tries to acquire the lock on the domain within a given amount of time.
   *
   * @param timeout The number of milliseconds to wait for acquiring the lock.
   * @return True if the lock was acquired, false if the timeout occurred.
   * @throws java.lang.InterruptedException When the call was interrupted.
   */
  public boolean tryLock(long timeout) throws InterruptedException
  {
    return lock.tryLock(timeout, TimeUnit.MILLISECONDS);
  }
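  /*
   * Usage sketch (illustrative only): callers needing exclusive access to the
   * domain follow the same pattern as resetGenerationId() and
   * processNewStatus() above:
   *
   *   lock();                        // or tryLock(timeoutMs) for a bounded wait
   *   try
   *   {
   *     // read or update connectedDSs / connectedRSs, send topology info, ...
   *   }
   *   finally
   *   {
   *     release();
   *   }
   */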
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  private void startMonitoringPublisher()
  {
    long period = localReplicationServer.getMonitoringPublisherPeriod();
    if (period > 0) // 0 means no monitoring publisher
    {
      final MonitoringPublisher thread = new MonitoringPublisher(this, period);
      if (monitoringPublisher.compareAndSet(null, thread))
      {
        thread.start();
      }
    }
  }

  /**
   * Stops the monitoring publisher for the domain.
   */
  private void stopMonitoringPublisher()
  {
    final MonitoringPublisher thread = monitoringPublisher.get();
    if (thread != null && monitoringPublisher.compareAndSet(thread, null))
    {
      thread.shutdown();
      thread.waitForShutdown();
    }
  }

  /** {@inheritDoc} */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
  {
    // Nothing to do for now
  }

  /** {@inheritDoc} */
  @Override
  public String getMonitorInstanceName()
  {
    return "Replication server RS(" + localReplicationServer.getServerId()
        + ") " + localReplicationServer.getServerURL() + ",cn="
        + baseDN.toString().replace(',', '_').replace('=', '_')
        + ",cn=Replication";
  }

  @Override
  public MonitorData getMonitorData()
  {
    int serverId = localReplicationServer.getServerId();

    final MonitorData attributes = new MonitorData(5);
    attributes.add("replication-server-id", serverId);
    attributes.add("replication-server-port", localReplicationServer.getReplicationPort());
    attributes.add("domain-name", baseDN);
    attributes.add("generation-id", baseDN + " " + generationId);
    attributes.add("missing-changes", getDomainMonitorData().getMissingChangesRS(serverId));
    return attributes;
  }

  /**
   * Returns the oldest known state for the domain, made of the oldest CSN
   * stored for each serverId.
   * <p>
   * Note: because the replication changelogDB trimming always keeps one change
   * regardless of its date, the CSN contained in the returned state can be
   * very old.
   *
   * @return the start state of the domain.
   */
  public ServerState getOldestState()
  {
    return domainDB.getDomainOldestCSNs(baseDN);
  }

  private void sendTopologyMsg(String type, ServerHandler handler, TopologyMsg msg)
  {
    for (int i = 1; i <= 2; i++)
    {
      if (!handler.shuttingDown()
          && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
      {
        try
        {
          handler.sendTopoInfo(msg);
          break;
        }
        catch (IOException e)
        {
          if (i == 2)
          {
            logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO,
                baseDN, type, handler.getServerId(), e.getMessage());
          }
        }
      }
      sleep(100);
    }
  }
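  /*
   * Note (illustrative): sendTopologyMsg() above makes at most two send
   * attempts, sleeping 100 ms after a failed or skipped attempt, and only logs
   * ERR_EXCEPTION_SENDING_TOPO_INFO when the second attempt also fails. A
   * handler that is shutting down or not connected is skipped silently.
   */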
  /**
   * Processes a received ChangeTimeHeartbeatMsg by storing the CSN (timestamp)
   * value received, and forwarding the message to the other RSs.
   *
   * @param senderHandler The handler for the server that sent the heartbeat.
   * @param msg The message to process.
   * @throws DirectoryException
   *           if a problem occurs
   */
  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
      ChangeTimeHeartbeatMsg msg) throws DirectoryException
  {
    try
    {
      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
    }
    catch (ChangelogException e)
    {
      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e
          .getMessageObject(), e);
    }

    if (senderHandler.isDataServer())
    {
      /*
       * If we are the first replication server warned, then forward the
       * message to the remote replication servers.
       */
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
      }
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }

  /**
   * Returns the monitor instance name of the ReplicationServer that created
   * the current instance.
   *
   * @return the monitor instance name of the ReplicationServer that created
   *         the current instance.
   */
  String getLocalRSMonitorInstanceName()
  {
    return this.localReplicationServer.getMonitorInstanceName();
  }

  /**
   * Returns the serverId of the ReplicationServer that created the current
   * instance.
   *
   * @return the serverId of the ReplicationServer that created the current
   *         instance.
   */
  int getLocalRSServerId()
  {
    return this.localReplicationServer.getServerId();
  }

  /**
   * Updates the monitoring publisher with the new period value.
   *
   * @param period
   *          The new period value.
   */
  void updateMonitoringPeriod(long period)
  {
    if (period == 0)
    {
      // Requested to stop the monitoring publishers
      stopMonitoringPublisher();
      return;
    }

    final MonitoringPublisher mpThread = monitoringPublisher.get();
    if (mpThread != null) // it is running
    {
      mpThread.setPeriod(period);
    }
    else if (!connectedDSs.isEmpty() || !connectedRSs.isEmpty())
    {
      // Requested to start the monitoring publishers with the provided period
      startMonitoringPublisher();
    }
  }

  /**
   * Registers a DS handler into this domain and notifies the domain about the
   * new DS.
   *
   * @param dsHandler
   *          The Directory Server Handler to register
   */
  public void register(DataServerHandler dsHandler)
  {
    startMonitoringPublisher();

    // Connected with a new DS: store the handler.
    connectedDSs.put(dsHandler.getServerId(), dsHandler);

    // Tell peer RSs and DSs that a new DS just connected to us.
    // No need to re-send a TopologyMsg to this new DS.
    enqueueTopoInfoToAllExcept(dsHandler);
  }

  /**
   * Registers the RS handler into this domain and notifies the domain.
   *
   * @param rsHandler
   *          The Replication Server Handler to register
   */
  public void register(ReplicationServerHandler rsHandler)
  {
    startMonitoringPublisher();

    // Connected with a new RS (either outgoing or incoming
    // connection): store the handler.
    connectedRSs.put(rsHandler.getServerId(), rsHandler);
  }

  private void debug(String message)
  {
    logger.trace("In ReplicationServerDomain serverId="
        + localReplicationServer.getServerId() + " for baseDN=" + baseDN
        + " and port=" + localReplicationServer.getReplicationPort()
        + ": " + message);
  }
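  /*
   * Note (illustrative): checkDSDegradedStatus() below drives the per-DS
   * status machine with only two events, based on the size of the DS writer
   * queue relative to the configured degraded-status threshold:
   *
   *   queue size >= threshold : NORMAL_STATUS   -- TO_DEGRADED_STATUS_EVENT --> DEGRADED_STATUS
   *   queue size <  threshold : DEGRADED_STATUS -- TO_NORMAL_STATUS_EVENT   --> NORMAL_STATUS
   */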
  /**
   * Goes through each connected DS, gets the number of pending changes we have
   * for it, and changes its status accordingly if the threshold value is
   * crossed/uncrossed.
   */
  void checkDSDegradedStatus()
  {
    final int degradedStatusThreshold = localReplicationServer
        .getDegradedStatusThreshold();
    // A threshold value of 0 means no status analyzer (no degrading system).
    // We should not get here in that case, as the status analyzer thread
    // should not be created, but we add this test for sanity's sake.
    if (degradedStatusThreshold > 0)
    {
      for (DataServerHandler serverHandler : connectedDSs.values())
      {
        // Get the number of pending changes for this server
        final int nChanges = serverHandler.getRcvMsgQueueSize();
        if (logger.isTraceEnabled())
        {
          logger.trace("In RS " + getLocalRSServerId() + ", for baseDN="
              + getBaseDN() + ": " + "Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue.");
        }

        // Check the status to know whether it is relevant to change it. Do
        // not take the RSD lock for this test. If we attempt to change the
        // status when the current status does not allow it, this will be
        // noticed by the changeStatus() method. This way the lock is only
        // taken roughly when needed, rather than at every sleep timeout.
        if (nChanges >= degradedStatusThreshold)
        {
          if (serverHandler.getStatus() == NORMAL_STATUS
              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
        else
        {
          if (serverHandler.getStatus() == DEGRADED_STATUS
              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
      }
    }
  }

  /**
   * Sends any enqueued status messages to the rest of the topology.
   */
  void sendPendingStatusMessages()
  {
    /*
     * Take a snapshot of the pending status notifications in order to avoid
     * holding the broadcast lock for too long. In addition, clear the
     * notifications so that they are not resent the next time.
     */
    final PendingStatusMessages savedState;
    synchronized (pendingStatusMessagesLock)
    {
      savedState = pendingStatusMessages;
      pendingStatusMessages = new PendingStatusMessages();
    }
    sendPendingChangeTimeHeartbeatMsgs(savedState);
    sendPendingTopologyMsgs(savedState);
    sendPendingMonitorMsgs(savedState);
  }
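  /*
   * Producer-side sketch (illustrative only): methods such as
   * enqueueTopoInfoToAllExcept(), processChangeTimeHeartbeatMsg() and
   * enqueueMonitorMsg() feed the queue drained by sendPendingStatusMessages()
   * above, always using the same short critical section:
   *
   *   synchronized (pendingStatusMessagesLock)
   *   {
   *     pendingStatusMessages.enqueue...(...);
   *   }
   *   statusAnalyzer.notifyPendingStatusMessage();
   *
   * so the actual sending happens later, outside that lock.
   */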
  private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
  {
    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
        .entrySet())
    {
      ServerHandler ds = connectedDSs.get(msg.getKey());
      if (ds != null)
      {
        try
        {
          ds.send(msg.getValue());
        }
        catch (IOException e)
        {
          // Ignore: connection closed.
        }
      }
    }
    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
        .entrySet())
    {
      ServerHandler rs = connectedRSs.get(msg.getKey());
      if (rs != null)
      {
        try
        {
          rs.send(msg.getValue());
        }
        catch (IOException e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.

          // FIXME: why do we log for RSs but not DSs?
          logger.traceException(e);
          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getValue().getDestination());
        }
      }
    }
  }

  private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs)
  {
    for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats.values())
    {
      for (ReplicationServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
          if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
          {
            rsHandler.send(pendingHeartbeat);
          }
        }
        catch (IOException e)
        {
          logger.traceException(e);
          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, "Replication Server "
              + localReplicationServer.getReplicationPort() + " " + baseDN
              + " " + localReplicationServer.getServerId());
          stopServer(rsHandler, false);
        }
      }
    }
  }

  private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
  {
    if (pendingMsgs.sendDSTopologyMsg)
    {
      for (ServerHandler handler : connectedDSs.values())
      {
        if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
        {
          final TopologyMsg topoMsg = createTopologyMsgForDS(handler
              .getServerId());
          sendTopologyMsg("directory", handler, topoMsg);
        }
      }
    }

    if (pendingMsgs.sendRSTopologyMsg && !connectedRSs.isEmpty())
    {
      final TopologyMsg topoMsg = createTopologyMsgForRS();
      for (ServerHandler handler : connectedRSs.values())
      {
        sendTopologyMsg("replication", handler, topoMsg);
      }
    }
  }

  private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    /*
     * If the request comes from a Directory Server, we need to build the full
     * list of all servers in the topology and send back a MonitorMsg
     * containing that full list.
     */
    if (sender.isDataServer())
    {
      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID(),
          domainMonitor.getMonitorData());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    else
    {
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }
}