001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2008-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2016 ForgeRock AS. 016 */ 017package org.opends.server.replication.service; 018 019import static org.opends.messages.ReplicationMessages.*; 020import static org.opends.server.replication.common.AssuredMode.*; 021import static org.opends.server.replication.common.StatusMachine.*; 022import static org.opends.server.util.CollectionUtils.*; 023 024import java.io.BufferedOutputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.net.SocketTimeoutException; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.Date; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicInteger; 041import java.util.concurrent.atomic.AtomicReference; 042 043import net.jcip.annotations.Immutable; 044 045import org.forgerock.i18n.LocalizableMessage; 046import org.forgerock.i18n.slf4j.LocalizedLogger; 047import org.forgerock.opendj.config.server.ConfigException; 048import org.forgerock.opendj.ldap.ResultCode; 049import org.forgerock.opendj.server.config.meta.ReplicationDomainCfgDefn.AssuredType; 050import org.forgerock.opendj.server.config.server.ReplicationDomainCfg; 051import org.opends.server.api.DirectoryThread; 052import org.opends.server.api.MonitorData; 053import org.opends.server.backends.task.Task; 054import org.opends.server.replication.common.AssuredMode; 055import org.opends.server.replication.common.CSN; 056import org.opends.server.replication.common.CSNGenerator; 057import org.opends.server.replication.common.DSInfo; 058import org.opends.server.replication.common.RSInfo; 059import org.opends.server.replication.common.ServerState; 060import org.opends.server.replication.common.ServerStatus; 061import org.opends.server.replication.common.StatusMachine; 062import org.opends.server.replication.common.StatusMachineEvent; 063import org.opends.server.replication.protocol.AckMsg; 064import org.opends.server.replication.protocol.ChangeStatusMsg; 065import org.opends.server.replication.protocol.DoneMsg; 066import org.opends.server.replication.protocol.EntryMsg; 067import org.opends.server.replication.protocol.ErrorMsg; 068import org.opends.server.replication.protocol.HeartbeatMsg; 069import org.opends.server.replication.protocol.InitializeRcvAckMsg; 070import org.opends.server.replication.protocol.InitializeRequestMsg; 071import org.opends.server.replication.protocol.InitializeTargetMsg; 072import org.opends.server.replication.protocol.ProtocolVersion; 073import org.opends.server.replication.protocol.ReplSessionSecurity; 074import org.opends.server.replication.protocol.ReplicationMsg; 075import org.opends.server.replication.protocol.ResetGenerationIdMsg; 076import org.opends.server.replication.protocol.RoutableMsg; 077import org.opends.server.replication.protocol.TopologyMsg; 078import org.opends.server.replication.protocol.UpdateMsg; 079import org.opends.server.tasks.InitializeTargetTask; 080import org.opends.server.tasks.InitializeTask; 081import org.forgerock.opendj.ldap.DN; 082import org.opends.server.types.DirectoryException; 083 084/** 085 * This class should be used as a base for Replication implementations. 086 * <p> 087 * It is intended that developer in need of a replication mechanism 088 * subclass this class with their own implementation. 089 * <p> 090 * The startup phase of the ReplicationDomain subclass, 091 * should read the list of replication servers from the configuration, 092 * instantiate a {@link ServerState} then start the publish service 093 * by calling {@link #startPublishService()}. 094 * At this point it can start calling the {@link #publish(UpdateMsg)} 095 * method if needed. 096 * <p> 097 * When the startup phase reach the point when the subclass is ready 098 * to handle updates the Replication Domain implementation should call the 099 * {@link #startListenService()} method. 100 * At this point a Listener thread is created on the Replication Service 101 * and which can start receiving updates. 102 * <p> 103 * When updates are received the Replication Service calls the 104 * {@link #processUpdate(UpdateMsg)} method. 105 * ReplicationDomain implementation should implement the appropriate code 106 * for replaying the update on the local repository. 107 * When fully done the subclass must call the 108 * {@link #processUpdateDone(UpdateMsg, String)} method. 109 * This allows to process the update asynchronously if necessary. 110 * 111 * <p> 112 * To propagate changes to other replica, a ReplicationDomain implementation 113 * must use the {@link #publish(UpdateMsg)} method. 114 * <p> 115 * If the Full Initialization process is needed then implementation 116 * for {@code importBackend(InputStream)} and 117 * {@code exportBackend(OutputStream)} must be 118 * provided. 119 * <p> 120 * Full Initialization of a replica can be triggered by LDAP clients 121 * by creating InitializeTasks or InitializeTargetTask. 122 * Full initialization can also be triggered from the ReplicationDomain 123 * implementation using methods {@link #initializeRemote(int, Task)} 124 * or {@link #initializeFromRemote(int, Task)}. 125 * <p> 126 * At shutdown time, the {@link #disableService()} method should be called to 127 * cleanly stop the replication service. 128 */ 129public abstract class ReplicationDomain 130{ 131 132 /** Contains all the attributes included for the ECL (External Changelog). */ 133 @Immutable 134 private static final class ECLIncludes 135 { 136 final Map<Integer, Set<String>> includedAttrsByServer; 137 final Set<String> includedAttrsAllServers; 138 139 final Map<Integer, Set<String>> includedAttrsForDeletesByServer; 140 final Set<String> includedAttrsForDeletesAllServers; 141 142 private ECLIncludes( 143 Map<Integer, Set<String>> includedAttrsByServer, 144 Set<String> includedAttrsAllServers, 145 Map<Integer, Set<String>> includedAttrsForDeletesByServer, 146 Set<String> includedAttrsForDeletesAllServers) 147 { 148 this.includedAttrsByServer = includedAttrsByServer; 149 this.includedAttrsAllServers = includedAttrsAllServers; 150 151 this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer; 152 this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers; 153 } 154 155 @SuppressWarnings("unchecked") 156 public ECLIncludes() 157 { 158 this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP, 159 Collections.EMPTY_SET); 160 } 161 162 /** 163 * Add attributes to be included in the ECL. 164 * 165 * @param serverId 166 * Server where these attributes are configured. 167 * @param includeAttributes 168 * Attributes to be included with all change records, may include 169 * wild-cards. 170 * @param includeAttributesForDeletes 171 * Additional attributes to be included with delete change records, 172 * may include wild-cards. 173 * @return a new {@link ECLIncludes} object if included attributes have 174 * changed, or the current object otherwise. 175 */ 176 public ECLIncludes addIncludedAttributes(int serverId, 177 Set<String> includeAttributes, Set<String> includeAttributesForDeletes) 178 { 179 boolean configurationChanged = false; 180 181 Set<String> s1 = new HashSet<>(includeAttributes); 182 183 // Combine all+delete attributes. 184 Set<String> s2 = new HashSet<>(s1); 185 s2.addAll(includeAttributesForDeletes); 186 187 Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer; 188 if (!s1.equals(this.includedAttrsByServer.get(serverId))) 189 { 190 configurationChanged = true; 191 eclIncludesByServer = new HashMap<>(this.includedAttrsByServer); 192 eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1)); 193 } 194 195 Map<Integer, Set<String>> eclIncludesForDeletesByServer = this.includedAttrsForDeletesByServer; 196 if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId))) 197 { 198 configurationChanged = true; 199 eclIncludesForDeletesByServer = new HashMap<>(this.includedAttrsForDeletesByServer); 200 eclIncludesForDeletesByServer.put(serverId, Collections.unmodifiableSet(s2)); 201 } 202 203 if (!configurationChanged) 204 { 205 return this; 206 } 207 208 // and rebuild the global list to be ready for usage 209 Set<String> eclIncludesAllServer = new HashSet<>(); 210 for (Set<String> attributes : eclIncludesByServer.values()) 211 { 212 eclIncludesAllServer.addAll(attributes); 213 } 214 215 Set<String> eclIncludesForDeletesAllServer = new HashSet<>(); 216 for (Set<String> attributes : eclIncludesForDeletesByServer.values()) 217 { 218 eclIncludesForDeletesAllServer.addAll(attributes); 219 } 220 return new ECLIncludes(eclIncludesByServer, 221 Collections.unmodifiableSet(eclIncludesAllServer), 222 eclIncludesForDeletesByServer, 223 Collections.unmodifiableSet(eclIncludesForDeletesAllServer)); 224 } 225 } 226 227 /** 228 * Current status for this replicated domain. 229 */ 230 private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS; 231 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 232 233 /** The configuration of the replication domain. */ 234 protected volatile ReplicationDomainCfg config; 235 /** 236 * The assured configuration of the replication domain. It is a duplicate of 237 * {@link #config} because of its update model. 238 * 239 * @see #readAssuredConfig(ReplicationDomainCfg, boolean) 240 */ 241 private volatile ReplicationDomainCfg assuredConfig; 242 243 /** 244 * The ReplicationBroker that is used by this ReplicationDomain to 245 * connect to the ReplicationService. 246 */ 247 protected ReplicationBroker broker; 248 249 /** 250 * This Map is used to store all outgoing assured messages in order 251 * to be able to correlate all the coming back acks to the original 252 * operation. 253 */ 254 private final Map<CSN, UpdateMsg> waitingAckMsgs = new ConcurrentHashMap<>(); 255 /** 256 * The context related to an import or export being processed 257 * Null when none is being processed. 258 */ 259 private final AtomicReference<ImportExportContext> importExportContext = new AtomicReference<>(); 260 261 /** 262 * The Thread waiting for incoming update messages for this domain and pushing 263 * them to the global incoming update message queue for later processing by 264 * replay threads. 265 */ 266 private volatile DirectoryThread listenerThread; 267 268 /** A set of counters used for Monitoring. */ 269 private AtomicInteger numProcessedUpdates = new AtomicInteger(0); 270 private AtomicInteger numRcvdUpdates = new AtomicInteger(0); 271 private AtomicInteger numSentUpdates = new AtomicInteger(0); 272 273 /** Assured replication monitoring counters. */ 274 275 /** Number of updates sent in Assured Mode, Safe Read. */ 276 private AtomicInteger assuredSrSentUpdates = new AtomicInteger(0); 277 /** 278 * Number of updates sent in Assured Mode, Safe Read, that have been 279 * successfully acknowledged. 280 */ 281 private AtomicInteger assuredSrAcknowledgedUpdates = new AtomicInteger(0); 282 /** 283 * Number of updates sent in Assured Mode, Safe Read, that have not been 284 * successfully acknowledged (either because of timeout, wrong status or error 285 * at replay). 286 */ 287 private AtomicInteger assuredSrNotAcknowledgedUpdates = new AtomicInteger(0); 288 /** 289 * Number of updates sent in Assured Mode, Safe Read, that have not been 290 * successfully acknowledged because of timeout. 291 */ 292 private AtomicInteger assuredSrTimeoutUpdates = new AtomicInteger(0); 293 /** 294 * Number of updates sent in Assured Mode, Safe Read, that have not been 295 * successfully acknowledged because of wrong status. 296 */ 297 private AtomicInteger assuredSrWrongStatusUpdates = new AtomicInteger(0); 298 /** 299 * Number of updates sent in Assured Mode, Safe Read, that have not been 300 * successfully acknowledged because of replay error. 301 */ 302 private AtomicInteger assuredSrReplayErrorUpdates = new AtomicInteger(0); 303 /** 304 * Multiple values allowed: number of updates sent in Assured Mode, Safe Read, 305 * that have not been successfully acknowledged (either because of timeout, 306 * wrong status or error at replay) for a particular server (DS or RS). 307 * <p> 308 * String format: <server id>:<number of failed updates> 309 */ 310 private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = new HashMap<>(); 311 /** Number of updates received in Assured Mode, Safe Read request. */ 312 private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0); 313 /** 314 * Number of updates received in Assured Mode, Safe Read request that we have 315 * acked without errors. 316 */ 317 private AtomicInteger assuredSrReceivedUpdatesAcked = new AtomicInteger(0); 318 /** 319 * Number of updates received in Assured Mode, Safe Read request that we have 320 * acked with errors. 321 */ 322 private AtomicInteger assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0); 323 /** Number of updates sent in Assured Mode, Safe Data. */ 324 private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0); 325 /** 326 * Number of updates sent in Assured Mode, Safe Data, that have been 327 * successfully acknowledged. 328 */ 329 private AtomicInteger assuredSdAcknowledgedUpdates = new AtomicInteger(0); 330 /** 331 * Number of updates sent in Assured Mode, Safe Data, that have not been 332 * successfully acknowledged because of timeout. 333 */ 334 private AtomicInteger assuredSdTimeoutUpdates = new AtomicInteger(0); 335 /** 336 * Multiple values allowed: number of updates sent in Assured Mode, Safe Data, 337 * that have not been successfully acknowledged because of timeout for a 338 * particular RS. 339 * <p> 340 * String format: <server id>:<number of failed updates> 341 */ 342 private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = new HashMap<>(); 343 344 /* Status related monitoring fields */ 345 346 /** 347 * Indicates the date when the status changed. This may be used to indicate 348 * the date the session with the current replication server started (when 349 * status is NORMAL for instance). All the above assured monitoring fields 350 * are also reset each time the status is changed 351 */ 352 private Date lastStatusChangeDate = new Date(); 353 354 /** 355 * The state maintained by the Concrete Class. 356 */ 357 private final ServerState state; 358 359 /** 360 * The generator that will be used to generate {@link CSN} 361 * for this domain. 362 */ 363 private final CSNGenerator generator; 364 365 private final AtomicReference<ECLIncludes> eclIncludes = new AtomicReference<>(new ECLIncludes()); 366 367 /** 368 * An object used to protect the initialization of the underlying broker 369 * session of this ReplicationDomain. 370 */ 371 private final Object sessionLock = new Object(); 372 373 /** 374 * The generationId for this replication domain. It is made of a hash of the 375 * 1000 first entries for this domain. 376 */ 377 protected volatile long generationId; 378 379 /** 380 * Returns the {@link CSNGenerator} that will be used to 381 * generate {@link CSN} for this domain. 382 * 383 * @return The {@link CSNGenerator} that will be used to 384 * generate {@link CSN} for this domain. 385 */ 386 public CSNGenerator getGenerator() 387 { 388 return generator; 389 } 390 391 /** 392 * Creates a ReplicationDomain with the provided parameters. 393 * 394 * @param config 395 * The configuration object for this ReplicationDomain 396 * @param generationId 397 * the generation of this ReplicationDomain 398 */ 399 public ReplicationDomain(ReplicationDomainCfg config, long generationId) 400 { 401 this(config, generationId, new ServerState()); 402 } 403 404 /** 405 * Creates a ReplicationDomain with the provided parameters. (for unit test 406 * purpose only) 407 * 408 * @param config 409 * The configuration object for this ReplicationDomain 410 * @param generationId 411 * the generation of this ReplicationDomain 412 * @param serverState 413 * The serverState to use 414 */ 415 public ReplicationDomain(ReplicationDomainCfg config, long generationId, 416 ServerState serverState) 417 { 418 this.config = config; 419 this.assuredConfig = config; 420 this.generationId = generationId; 421 this.state = serverState; 422 this.generator = new CSNGenerator(getServerId(), state); 423 } 424 425 /** 426 * Set the initial status of the domain and perform necessary initializations. 427 * This method will be called by the Broker each time the ReplicationBroker 428 * establish a new session to a Replication Server. 429 * 430 * Implementations may override this method when they need to perform 431 * additional computing after session establishment. 432 * The default implementation should be sufficient for ReplicationDomains 433 * that don't need to perform additional computing. 434 * 435 * @param initStatus The status to enter the state machine with. 436 * @param rsState The ServerState of the ReplicationServer 437 * with which the session was established. 438 */ 439 public void sessionInitiated(ServerStatus initStatus, ServerState rsState) 440 { 441 // Sanity check: is it a valid initial status? 442 if (!isValidInitialStatus(initStatus)) 443 { 444 logger.error(ERR_DS_INVALID_INIT_STATUS, initStatus, getBaseDN(), getServerId()); 445 } 446 else 447 { 448 status = initStatus; 449 } 450 generator.adjust(state); 451 generator.adjust(rsState); 452 } 453 454 /** 455 * Processes an incoming ChangeStatusMsg. Compute new status according to 456 * given order. Then update domain for being compliant with new status 457 * definition. 458 * @param csMsg The received status message 459 */ 460 private void receiveChangeStatus(ChangeStatusMsg csMsg) 461 { 462 if (logger.isTraceEnabled()) 463 { 464 logger.trace("Replication domain " + getBaseDN() + 465 " received change status message:\n" + csMsg); 466 } 467 468 ServerStatus reqStatus = csMsg.getRequestedStatus(); 469 470 // Translate requested status to a state machine event 471 StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus); 472 if (event == StatusMachineEvent.INVALID_EVENT) 473 { 474 logger.error(ERR_DS_INVALID_REQUESTED_STATUS, reqStatus, getBaseDN(), getServerId()); 475 return; 476 } 477 478 // Set the new status to the requested one 479 setNewStatus(event); 480 } 481 482 /** 483 * Called when first connection or disconnection detected. 484 */ 485 void toNotConnectedStatus() 486 { 487 // Go into not connected status 488 setNewStatus(StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT); 489 } 490 491 /** 492 * Perform whatever actions are needed to apply properties for being 493 * compliant with new status. Must be called in synchronized section for 494 * status. The new status is already set in status variable. 495 */ 496 private void updateDomainForNewStatus() 497 { 498 switch (status) 499 { 500 case FULL_UPDATE_STATUS: 501 // Signal RS we just entered the full update status 502 broker.signalStatusChange(status); 503 break; 504 case NOT_CONNECTED_STATUS: 505 case NORMAL_STATUS: 506 case DEGRADED_STATUS: 507 case BAD_GEN_ID_STATUS: 508 break; 509 default: 510 if (logger.isTraceEnabled()) 511 { 512 logger.trace("updateDomainForNewStatus: unexpected status: " + status); 513 } 514 } 515 } 516 517 /** 518 * Gets the status for this domain. 519 * @return The status for this domain. 520 */ 521 public ServerStatus getStatus() 522 { 523 return status; 524 } 525 526 /** 527 * Returns the base DN of this ReplicationDomain. All Replication Domain using 528 * this baseDN will be connected through the Replication Service. 529 * 530 * @return The base DN of this ReplicationDomain 531 */ 532 public DN getBaseDN() 533 { 534 return config.getBaseDN(); 535 } 536 537 /** 538 * Get the server ID. The identifier of this Replication Domain inside the 539 * Replication Service. Each Domain must use a unique ServerID. 540 * 541 * @return The server ID. 542 */ 543 public int getServerId() 544 { 545 return config.getServerId(); 546 } 547 548 /** 549 * Window size used during initialization .. between - the 550 * initializer/exporter DS that listens/waits acknowledges and that slows down 551 * data msg publishing based on the slowest server - and each 552 * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg 553 * received. 554 * 555 * @return the initWindow 556 */ 557 protected int getInitWindow() 558 { 559 return config.getInitializationWindowSize(); 560 } 561 562 /** 563 * Tells if assured replication is enabled for this domain. 564 * @return True if assured replication is enabled for this domain. 565 */ 566 public boolean isAssured() 567 { 568 return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType()) 569 || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType()); 570 } 571 572 /** 573 * Gives the mode for the assured replication of the domain. Only used when 574 * assured is true). 575 * 576 * @return The mode for the assured replication of the domain. 577 */ 578 public AssuredMode getAssuredMode() 579 { 580 switch (assuredConfig.getAssuredType()) 581 { 582 case SAFE_DATA: 583 case NOT_ASSURED: // The assured mode will be ignored in that case anyway 584 return AssuredMode.SAFE_DATA_MODE; 585 case SAFE_READ: 586 return AssuredMode.SAFE_READ_MODE; 587 } 588 return null; // should never happen 589 } 590 591 /** 592 * Gives the assured Safe Data level of the replication of the domain. (used 593 * when assuredMode is SAFE_DATA). 594 * 595 * @return The assured level of the replication of the domain. 596 */ 597 public byte getAssuredSdLevel() 598 { 599 return (byte) assuredConfig.getAssuredSdLevel(); 600 } 601 602 /** 603 * Gives the assured timeout of the replication of the domain (in ms). 604 * @return The assured timeout of the replication of the domain. 605 */ 606 public long getAssuredTimeout() 607 { 608 return assuredConfig.getAssuredTimeout(); 609 } 610 611 /** 612 * Gets the group id for this domain. 613 * @return The group id for this domain. 614 */ 615 public byte getGroupId() 616 { 617 return (byte) config.getGroupId(); 618 } 619 620 /** 621 * Gets the referrals URLs this domain publishes. Referrals urls to be 622 * published to other servers of the topology. 623 * <p> 624 * TODO: fill that with all currently opened urls if no urls configured 625 * 626 * @return The referrals URLs this domain publishes. 627 */ 628 public Set<String> getRefUrls() 629 { 630 return config.getReferralsUrl(); 631 } 632 633 /** 634 * Gets the info for Replicas in the topology (except us). 635 * @return The info for Replicas in the topology (except us) 636 */ 637 public Map<Integer, DSInfo> getReplicaInfos() 638 { 639 return broker.getReplicaInfos(); 640 } 641 642 /** 643 * Returns information about the DS server related to the provided serverId. 644 * based on the TopologyMsg we received when the remote replica connected or 645 * disconnected. Return null when no server with the provided serverId is 646 * connected. 647 * 648 * @param dsId The provided serverId of the remote replica 649 * @return the info related to this remote server if it is connected, 650 * null is the server is NOT connected. 651 */ 652 private DSInfo getConnectedRemoteDS(int dsId) 653 { 654 return getReplicaInfos().get(dsId); 655 } 656 657 /** 658 * Gets the States of all the Replicas currently in the 659 * Topology. 660 * When this method is called, a Monitoring message will be sent 661 * to the Replication Server to which this domain is currently connected 662 * so that it computes a table containing information about 663 * all Directory Servers in the topology. 664 * This Computation involves communications will all the servers 665 * currently connected and 666 * 667 * @return The States of all Replicas in the topology (except us) 668 */ 669 public Map<Integer, ServerState> getReplicaStates() 670 { 671 return broker.getReplicaStates(); 672 } 673 674 /** 675 * Gets the info for RSs in the topology (except the one we are connected 676 * to). 677 * @return The info for RSs in the topology (except the one we are connected 678 * to) 679 */ 680 public List<RSInfo> getRsInfos() 681 { 682 return broker.getRsInfos(); 683 } 684 685 686 /** 687 * Gets the server ID of the Replication Server to which the domain 688 * is currently connected. 689 * 690 * @return The server ID of the Replication Server to which the domain 691 * is currently connected. 692 */ 693 public int getRsServerId() 694 { 695 return broker.getRsServerId(); 696 } 697 698 /** 699 * Increment the number of processed updates. 700 */ 701 private void incProcessedUpdates() 702 { 703 numProcessedUpdates.incrementAndGet(); 704 } 705 706 /** 707 * Get the number of updates replayed by the replication. 708 * 709 * @return The number of updates replayed by the replication 710 */ 711 int getNumProcessedUpdates() 712 { 713 if (numProcessedUpdates != null) 714 { 715 return numProcessedUpdates.get(); 716 } 717 return 0; 718 } 719 720 /** 721 * Get the number of updates received by the replication plugin. 722 * 723 * @return the number of updates received 724 */ 725 int getNumRcvdUpdates() 726 { 727 if (numRcvdUpdates != null) 728 { 729 return numRcvdUpdates.get(); 730 } 731 return 0; 732 } 733 734 /** 735 * Get the number of updates sent by the replication plugin. 736 * 737 * @return the number of updates sent 738 */ 739 int getNumSentUpdates() 740 { 741 if (numSentUpdates != null) 742 { 743 return numSentUpdates.get(); 744 } 745 return 0; 746 } 747 748 /** 749 * Receives an update message from the replicationServer. 750 * The other types of messages are processed in an opaque way for the caller. 751 * Also responsible for updating the list of pending changes 752 * @return the received message - null if none 753 */ 754 private UpdateMsg receive() 755 { 756 UpdateMsg update = null; 757 758 while (update == null) 759 { 760 InitializeRequestMsg initReqMsg = null; 761 ReplicationMsg msg; 762 try 763 { 764 msg = broker.receive(true, true, false); 765 if (msg == null) 766 { 767 // The server is in the shutdown process 768 return null; 769 } 770 771 if (logger.isTraceEnabled() && !(msg instanceof HeartbeatMsg)) 772 { 773 logger.trace("LocalizableMessage received <" + msg + ">"); 774 } 775 776 if (msg instanceof AckMsg) 777 { 778 AckMsg ack = (AckMsg) msg; 779 receiveAck(ack); 780 } 781 else if (msg instanceof InitializeRequestMsg) 782 { 783 // Another server requests us to provide entries 784 // for a total update 785 initReqMsg = (InitializeRequestMsg)msg; 786 } 787 else if (msg instanceof InitializeTargetMsg) 788 { 789 // Another server is exporting its entries to us 790 InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg; 791 792 /* 793 This must be done while we are still holding the broker lock 794 because we are now going to receive a bunch of entries from the 795 remote server and we want the import thread to catch them and 796 not the ListenerThread. 797 */ 798 initialize(initTargetMsg, initTargetMsg.getSenderID()); 799 } 800 else if (msg instanceof ErrorMsg) 801 { 802 ErrorMsg errorMsg = (ErrorMsg)msg; 803 ImportExportContext ieCtx = importExportContext.get(); 804 if (ieCtx != null) 805 { 806 /* 807 This is an error termination for the 2 following cases : 808 - either during an export 809 - or before an import really started 810 For example, when we publish a request and the 811 replicationServer did not find the import source. 812 813 A remote error during the import will be received in the 814 receiveEntryBytes() method. 815 */ 816 if (logger.isTraceEnabled()) 817 { 818 logger.trace( 819 "[IE] processErrorMsg:" + getServerId() + 820 " baseDN: " + getBaseDN() + 821 " Error Msg received: " + errorMsg); 822 } 823 824 if (errorMsg.getCreationTime() > ieCtx.startTime) 825 { 826 // consider only ErrorMsg that relate to the current import/export 827 processErrorMsg(errorMsg, ieCtx); 828 } 829 else 830 { 831 /* 832 Simply log - happen when the ErrorMsg relates to a previous 833 attempt of initialization while we have started a new one 834 on this side. 835 */ 836 logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails()); 837 } 838 } 839 else 840 { 841 // Simply log - happen if import/export has been terminated 842 // on our side before receiving this ErrorMsg. 843 logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails()); 844 } 845 } 846 else if (msg instanceof ChangeStatusMsg) 847 { 848 ChangeStatusMsg csMsg = (ChangeStatusMsg)msg; 849 receiveChangeStatus(csMsg); 850 } 851 else if (msg instanceof UpdateMsg) 852 { 853 update = (UpdateMsg) msg; 854 generator.adjust(update.getCSN()); 855 } 856 else if (msg instanceof InitializeRcvAckMsg) 857 { 858 ImportExportContext ieCtx = importExportContext.get(); 859 if (ieCtx != null) 860 { 861 InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg; 862 ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck()); 863 } 864 // Trash this msg When no input/export is running/should never happen 865 } 866 } 867 catch (SocketTimeoutException e) 868 { 869 // just retry 870 } 871 /* 872 Test if we have received and export request message and 873 if that's the case handle it now. 874 This must be done outside of the portion of code protected 875 by the broker lock so that we keep receiving update 876 when we are doing and export and so that a possible 877 closure of the socket happening when we are publishing the 878 entries to the remote can be handled by the other 879 replay thread when they call this method and therefore the 880 broker.receive() method. 881 */ 882 if (initReqMsg != null) 883 { 884 // Do this work in a thread to allow replay thread continue working 885 ExportThread exportThread = new ExportThread( 886 initReqMsg.getSenderID(), initReqMsg.getInitWindow()); 887 exportThread.start(); 888 } 889 } 890 891 numRcvdUpdates.incrementAndGet(); 892 if (update.isAssured() 893 && broker.getRsGroupId() == getGroupId() 894 && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE) 895 { 896 assuredSrReceivedUpdates.incrementAndGet(); 897 } 898 return update; 899 } 900 901 /** 902 * Updates the passed monitoring list of errors received for assured messages 903 * (safe data or safe read, depending of the passed list to update) for a 904 * particular server in the list. This increments the counter of error for the 905 * passed server, or creates an initial value of 1 error for it if the server 906 * is not yet present in the map. 907 * @param errorsByServer map of number of errors per serverID 908 * @param sid the ID of the server which produced an error 909 */ 910 private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer, 911 Integer sid) 912 { 913 synchronized (errorsByServer) 914 { 915 Integer serverErrCount = errorsByServer.get(sid); 916 if (serverErrCount == null) 917 { 918 // Server not present in list, create an entry with an 919 // initial number of errors set to 1 920 errorsByServer.put(sid, 1); 921 } else 922 { 923 // Server already present in list, just increment number of 924 // errors for the server 925 int val = serverErrCount; 926 val++; 927 errorsByServer.put(sid, val); 928 } 929 } 930 } 931 932 /** 933 * Do the necessary processing when an AckMsg is received. 934 * 935 * @param ack The AckMsg that was received. 936 */ 937 private void receiveAck(AckMsg ack) 938 { 939 CSN csn = ack.getCSN(); 940 941 // Remove the message for pending ack list (this may already make the thread 942 // that is waiting for the ack be aware of its reception) 943 UpdateMsg update = waitingAckMsgs.remove(csn); 944 945 // Signal waiting thread ack has been received 946 if (update != null) 947 { 948 synchronized (update) 949 { 950 update.notify(); 951 } 952 953 // Analyze status of embedded in the ack to see if everything went well 954 boolean hasTimeout = ack.hasTimeout(); 955 boolean hasReplayErrors = ack.hasReplayError(); 956 boolean hasWrongStatus = ack.hasWrongStatus(); 957 958 AssuredMode updateAssuredMode = update.getAssuredMode(); 959 960 if ( hasTimeout || hasReplayErrors || hasWrongStatus) 961 { 962 /* 963 Some problems detected: message did not correctly reach every 964 requested servers. Log problem 965 */ 966 logger.info(NOTE_DS_RECEIVED_ACK_ERROR, getBaseDN(), getServerId(), update, ack.errorsToString()); 967 968 List<Integer> failedServers = ack.getFailedServers(); 969 970 // Increment assured replication monitoring counters 971 switch (updateAssuredMode) 972 { 973 case SAFE_READ_MODE: 974 assuredSrNotAcknowledgedUpdates.incrementAndGet(); 975 if (hasTimeout) 976 { 977 assuredSrTimeoutUpdates.incrementAndGet(); 978 } 979 if (hasReplayErrors) 980 { 981 assuredSrReplayErrorUpdates.incrementAndGet(); 982 } 983 if (hasWrongStatus) 984 { 985 assuredSrWrongStatusUpdates.incrementAndGet(); 986 } 987 if (failedServers != null) // This should always be the case ! 988 { 989 for(Integer sid : failedServers) 990 { 991 updateAssuredErrorsByServer( 992 assuredSrServerNotAcknowledgedUpdates, sid); 993 } 994 } 995 break; 996 case SAFE_DATA_MODE: 997 // The only possible cause of ack error in safe data mode is timeout 998 if (hasTimeout) // So should always be the case 999 { 1000 assuredSdTimeoutUpdates.incrementAndGet(); 1001 } 1002 if (failedServers != null) // This should always be the case ! 1003 { 1004 for(Integer sid : failedServers) 1005 { 1006 updateAssuredErrorsByServer( 1007 assuredSdServerTimeoutUpdates, sid); 1008 } 1009 } 1010 break; 1011 default: 1012 // Should not happen 1013 } 1014 } else 1015 { 1016 // Update has been acknowledged without errors 1017 // Increment assured replication monitoring counters 1018 switch (updateAssuredMode) 1019 { 1020 case SAFE_READ_MODE: 1021 assuredSrAcknowledgedUpdates.incrementAndGet(); 1022 break; 1023 case SAFE_DATA_MODE: 1024 assuredSdAcknowledgedUpdates.incrementAndGet(); 1025 break; 1026 default: 1027 // Should not happen 1028 } 1029 } 1030 } 1031 } 1032 1033 1034 /* 1035 * After this point the code is related to the Total Update. 1036 */ 1037 1038 /** 1039 * This thread is launched when we want to export data to another server. 1040 * 1041 * When a task is created locally (so this local server is the initiator) 1042 * of the export (Example: dsreplication initialize-all), 1043 * this thread is NOT used but the task thread is running the export instead). 1044 */ 1045 private class ExportThread extends DirectoryThread 1046 { 1047 /** Id of server that will be initialized. */ 1048 private final int serverIdToInitialize; 1049 private final int initWindow; 1050 1051 1052 1053 /** 1054 * Constructor for the ExportThread. 1055 * 1056 * @param serverIdToInitialize 1057 * serverId of server that will receive entries 1058 * @param initWindow 1059 * The value of the initialization window for flow control between 1060 * the importer and the exporter. 1061 */ 1062 public ExportThread(int serverIdToInitialize, int initWindow) 1063 { 1064 super("Export thread from serverId=" + getServerId() + " to serverId=" 1065 + serverIdToInitialize); 1066 this.serverIdToInitialize = serverIdToInitialize; 1067 this.initWindow = initWindow; 1068 } 1069 1070 1071 1072 /** 1073 * Run method for this class. 1074 */ 1075 @Override 1076 public void run() 1077 { 1078 if (logger.isTraceEnabled()) 1079 { 1080 logger.trace("[IE] starting " + getName()); 1081 } 1082 try 1083 { 1084 initializeRemote(serverIdToInitialize, serverIdToInitialize, null, 1085 initWindow); 1086 } catch (DirectoryException de) 1087 { 1088 /* 1089 An error message has been sent to the peer 1090 This server is not the initiator of the export so there is 1091 nothing more to do locally. 1092 */ 1093 } 1094 1095 if (logger.isTraceEnabled()) 1096 { 1097 logger.trace("[IE] ending " + getName()); 1098 } 1099 } 1100 } 1101 1102 /** 1103 * This class contains the context related to an import or export launched on 1104 * the domain. 1105 */ 1106 protected class ImportExportContext 1107 { 1108 /** The private task that initiated the operation. */ 1109 private Task initializeTask; 1110 /** The destination in the case of an export. */ 1111 private int exportTarget = RoutableMsg.UNKNOWN_SERVER; 1112 /** The source in the case of an import. */ 1113 private int importSource = RoutableMsg.UNKNOWN_SERVER; 1114 1115 /** The total entry count expected to be processed. */ 1116 private long entryCount; 1117 /** The count for the entry not yet processed. */ 1118 private long entryLeftCount; 1119 1120 /** Exception raised during the initialization. */ 1121 private DirectoryException exception; 1122 1123 /** Whether the context is related to an import or an export. */ 1124 private final boolean importInProgress; 1125 1126 /** Current counter of messages exchanged during the initialization. */ 1127 private int msgCnt; 1128 1129 /** 1130 * Number of connections lost when we start the initialization. Will help 1131 * counting connections lost during initialization, 1132 */ 1133 private int initNumLostConnections; 1134 1135 /** 1136 * Request message sent when this server has the initializeFromRemote task. 1137 */ 1138 private InitializeRequestMsg initReqMsgSent; 1139 1140 /** 1141 * Start time of the initialization process. ErrorMsg timestamped before 1142 * this startTime will be ignored. 1143 */ 1144 private final long startTime; 1145 1146 /** List for replicas (DS) connected to the topology when initialization started. */ 1147 private final Set<Integer> startList = new HashSet<>(0); 1148 1149 /** 1150 * List for replicas (DS) with a failure (disconnected from the topology) 1151 * since the initialization started. 1152 */ 1153 private final Set<Integer> failureList = new HashSet<>(0); 1154 1155 /** 1156 * Flow control during initialization: for each remote server, counter of 1157 * messages received. 1158 */ 1159 private final Map<Integer, Integer> ackVals = new HashMap<>(); 1160 /** ServerId of the slowest server (the one with the smallest non null counter). */ 1161 private int slowestServerId = -1; 1162 1163 private short exporterProtocolVersion = -1; 1164 1165 /** Window used during this initialization. */ 1166 private int initWindow; 1167 1168 /** Number of attempt already done for this initialization. */ 1169 private short attemptCnt; 1170 1171 /** 1172 * Creates a new IEContext. 1173 * 1174 * @param importInProgress true if the IEContext will be used 1175 * for and import, false if the IEContext 1176 * will be used for and export. 1177 */ 1178 private ImportExportContext(boolean importInProgress) 1179 { 1180 this.importInProgress = importInProgress; 1181 this.startTime = System.currentTimeMillis(); 1182 this.attemptCnt = 0; 1183 } 1184 1185 /** 1186 * Returns a boolean indicating if a total update import is currently in 1187 * Progress. 1188 * 1189 * @return A boolean indicating if a total update import is currently in 1190 * Progress. 1191 */ 1192 boolean importInProgress() 1193 { 1194 return importInProgress; 1195 } 1196 1197 /** 1198 * Returns the total number of entries to be processed when a total update 1199 * is in progress. 1200 * 1201 * @return The total number of entries to be processed when a total update 1202 * is in progress. 1203 */ 1204 long getTotalEntryCount() 1205 { 1206 return entryCount; 1207 } 1208 1209 /** 1210 * Returns the number of entries still to be processed when a total update 1211 * is in progress. 1212 * 1213 * @return The number of entries still to be processed when a total update 1214 * is in progress. 1215 */ 1216 long getLeftEntryCount() 1217 { 1218 return entryLeftCount; 1219 } 1220 1221 /** 1222 * Initializes the import/export counters with the provider value. 1223 * @param total Total number of entries to be processed. 1224 * @throws DirectoryException if an error occurred. 1225 */ 1226 private void initializeCounters(long total) throws DirectoryException 1227 { 1228 entryCount = total; 1229 entryLeftCount = total; 1230 1231 if (initializeTask instanceof InitializeTask) 1232 { 1233 final InitializeTask task = (InitializeTask) initializeTask; 1234 task.setTotal(entryCount); 1235 task.setLeft(entryCount); 1236 } 1237 else if (initializeTask instanceof InitializeTargetTask) 1238 { 1239 final InitializeTargetTask task = (InitializeTargetTask) initializeTask; 1240 task.setTotal(entryCount); 1241 task.setLeft(entryCount); 1242 } 1243 } 1244 1245 /** 1246 * Update the counters of the task for each entry processed during 1247 * an import or export. 1248 * 1249 * @param entriesDone The number of entries that were processed 1250 * since the last time this method was called. 1251 * 1252 * @throws DirectoryException if an error occurred. 1253 */ 1254 private void updateCounters(int entriesDone) throws DirectoryException 1255 { 1256 entryLeftCount -= entriesDone; 1257 1258 if (initializeTask != null) 1259 { 1260 if (initializeTask instanceof InitializeTask) 1261 { 1262 ((InitializeTask)initializeTask).setLeft(entryLeftCount); 1263 } 1264 else if (initializeTask instanceof InitializeTargetTask) 1265 { 1266 ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount); 1267 } 1268 } 1269 } 1270 1271 /** {@inheritDoc} */ 1272 @Override 1273 public String toString() 1274 { 1275 return "[Entry count=" + this.entryCount + 1276 ", Entry left count=" + this.entryLeftCount + "]"; 1277 } 1278 1279 /** 1280 * Gets the server id of the exporting server. 1281 * @return the server id of the exporting server. 1282 */ 1283 public int getExportTarget() 1284 { 1285 return exportTarget; 1286 } 1287 1288 /** 1289 * Gets the server id of the importing server. 1290 * @return the server id of the importing server. 1291 */ 1292 public int getImportSource() 1293 { 1294 return importSource; 1295 } 1296 1297 /** 1298 * Get the exception that occurred during the import/export. 1299 * @return the exception that occurred during the import/export. 1300 */ 1301 public DirectoryException getException() 1302 { 1303 return exception; 1304 } 1305 1306 /** 1307 * Set the exception that occurred during the import/export. 1308 * @param exception the exception that occurred during the import/export. 1309 */ 1310 public void setException(DirectoryException exception) 1311 { 1312 this.exception = exception; 1313 } 1314 1315 /** 1316 * Only sets the exception that occurred during the import/export if none 1317 * was already set on this object. 1318 * 1319 * @param exception the exception that occurred during the import/export. 1320 */ 1321 public void setExceptionIfNoneSet(DirectoryException exception) 1322 { 1323 if (exception == null) 1324 { 1325 this.exception = exception; 1326 } 1327 } 1328 1329 /** 1330 * Set the id of the EntryMsg acknowledged from a receiver (importer)server. 1331 * (updated via the listener thread) 1332 * @param serverId serverId of the acknowledger/receiver/importer server. 1333 * @param numAck id of the message received. 1334 */ 1335 private void setAckVal(int serverId, int numAck) 1336 { 1337 if (logger.isTraceEnabled()) 1338 { 1339 logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck); 1340 } 1341 1342 this.ackVals.put(serverId, numAck); 1343 1344 // Recompute the server with the minAck returned,means the slowest server. 1345 slowestServerId = serverId; 1346 for (Integer sid : importExportContext.get().ackVals.keySet()) 1347 { 1348 if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId)) 1349 { 1350 slowestServerId = sid; 1351 } 1352 } 1353 } 1354 1355 /** 1356 * Returns the serverId of the server that acknowledged the smallest 1357 * EntryMsg id. 1358 * @return serverId of the server with latest acknowledge. 1359 * 0 when no ack has been received yet. 1360 */ 1361 public int getSlowestServer() 1362 { 1363 if (logger.isTraceEnabled()) 1364 { 1365 logger.trace("[IE] getSlowestServer" + slowestServerId 1366 + " " + this.ackVals.get(slowestServerId)); 1367 } 1368 1369 return this.slowestServerId; 1370 } 1371 1372 } 1373 1374 /** 1375 * Verifies that the given string represents a valid source 1376 * from which this server can be initialized. 1377 * 1378 * @param targetString The string representing the source 1379 * @return The source as a integer value 1380 * @throws DirectoryException if the string is not valid 1381 */ 1382 public int decodeTarget(String targetString) throws DirectoryException 1383 { 1384 if ("all".equalsIgnoreCase(targetString)) 1385 { 1386 return RoutableMsg.ALL_SERVERS; 1387 } 1388 1389 // So should be a serverID 1390 try 1391 { 1392 int target = Integer.decode(targetString); 1393 if (target >= 0) 1394 { 1395 // FIXME Could we check now that it is a know server in the domain ? 1396 // JNR: Yes please 1397 } 1398 return target; 1399 } 1400 catch (Exception e) 1401 { 1402 ResultCode resultCode = ResultCode.OTHER; 1403 LocalizableMessage message = ERR_INVALID_EXPORT_TARGET.get(); 1404 throw new DirectoryException(resultCode, message, e); 1405 } 1406 } 1407 1408 /** 1409 * Initializes a remote server from this server. 1410 * <p> 1411 * The {@code exportBackend(OutputStream)} will therefore be called 1412 * on this server, and the {@code importBackend(InputStream)} 1413 * will be called on the remote server. 1414 * <p> 1415 * The InputStream and OutputStream given as a parameter to those 1416 * methods will be connected through the replication protocol. 1417 * 1418 * @param target The server-id of the server that should be initialized. 1419 * The target can be discovered using the 1420 * {@link #getReplicaInfos()} method. 1421 * @param initTask The task that triggers this initialization and that should 1422 * be updated with its progress. 1423 * 1424 * @throws DirectoryException If it was not possible to publish the 1425 * Initialization message to the Topology. 1426 */ 1427 public void initializeRemote(int target, Task initTask) 1428 throws DirectoryException 1429 { 1430 initializeRemote(target, getServerId(), initTask, getInitWindow()); 1431 } 1432 1433 /** 1434 * Process the initialization of some other server or servers in the topology 1435 * specified by the target argument when this initialization specifying the 1436 * server that requests the initialization. 1437 * 1438 * @param serverToInitialize The target server that should be initialized. 1439 * @param serverRunningTheTask The server that initiated the export. It can 1440 * be the serverID of this server, or the serverID of a remote server. 1441 * @param initTask The task in this server that triggers this initialization 1442 * and that should be updated with its progress. Null when the export is done 1443 * following a request coming from a remote server (task is remote). 1444 * @param initWindow The value of the initialization window for flow control 1445 * between the importer and the exporter. 1446 * 1447 * @exception DirectoryException When an error occurs. No exception raised 1448 * means success. 1449 */ 1450 protected void initializeRemote(int serverToInitialize, 1451 int serverRunningTheTask, Task initTask, int initWindow) 1452 throws DirectoryException 1453 { 1454 final ImportExportContext ieCtx = acquireIEContext(false); 1455 1456 /* 1457 We manage the list of servers to initialize in order : 1458 - to test at the end that all expected servers have reconnected 1459 after their import and with the right genId 1460 - to update the task with the server(s) where this test failed 1461 */ 1462 1463 if (serverToInitialize == RoutableMsg.ALL_SERVERS) 1464 { 1465 logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL, 1466 countEntries(), getBaseDN(), getServerId()); 1467 1468 ieCtx.startList.addAll(getReplicaInfos().keySet()); 1469 1470 // We manage the list of servers with which a flow control can be enabled 1471 for (DSInfo dsi : getReplicaInfos().values()) 1472 { 1473 if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) 1474 { 1475 ieCtx.setAckVal(dsi.getDsId(), 0); 1476 } 1477 } 1478 } 1479 else 1480 { 1481 logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START, countEntries(), 1482 getBaseDN(), getServerId(), serverToInitialize); 1483 1484 ieCtx.startList.add(serverToInitialize); 1485 1486 // We manage the list of servers with which a flow control can be enabled 1487 for (DSInfo dsi : getReplicaInfos().values()) 1488 { 1489 if (dsi.getDsId() == serverToInitialize && 1490 dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) 1491 { 1492 ieCtx.setAckVal(dsi.getDsId(), 0); 1493 } 1494 } 1495 } 1496 1497 DirectoryException exportRootException = null; 1498 1499 // loop for the case where the exporter is the initiator 1500 int attempt = 0; 1501 boolean done = false; 1502 while (!done && ++attempt < 2) // attempt loop 1503 { 1504 try 1505 { 1506 ieCtx.exportTarget = serverToInitialize; 1507 if (initTask != null) 1508 { 1509 ieCtx.initializeTask = initTask; 1510 } 1511 ieCtx.initializeCounters(countEntries()); 1512 ieCtx.msgCnt = 0; 1513 ieCtx.initNumLostConnections = broker.getNumLostConnections(); 1514 ieCtx.initWindow = initWindow; 1515 1516 // Send start message to the peer 1517 InitializeTargetMsg initTargetMsg = new InitializeTargetMsg( 1518 getBaseDN(), getServerId(), serverToInitialize, 1519 serverRunningTheTask, ieCtx.entryCount, initWindow); 1520 1521 broker.publish(initTargetMsg); 1522 1523 // Wait for all servers to be ok 1524 waitForRemoteStartOfInit(ieCtx); 1525 1526 // Servers that left in the list are those for which we could not test 1527 // that they have been successfully initialized. 1528 if (!ieCtx.failureList.isEmpty()) 1529 { 1530 throw new DirectoryException( 1531 ResultCode.OTHER, 1532 ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDN(), ieCtx.failureList)); 1533 } 1534 1535 exportBackend(new BufferedOutputStream(new ReplOutputStream(this))); 1536 1537 // Notify the peer of the success 1538 broker.publish( 1539 new DoneMsg(getServerId(), initTargetMsg.getDestination())); 1540 } 1541 catch(DirectoryException exportException) 1542 { 1543 // Give priority to the first exception raised - stored in the context 1544 final DirectoryException ieEx = ieCtx.exception; 1545 exportRootException = ieEx != null ? ieEx : exportException; 1546 } 1547 1548 if (logger.isTraceEnabled()) 1549 { 1550 logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName() 1551 + " export ends with connected=" + broker.isConnected() 1552 + " exportRootException=" + exportRootException); 1553 } 1554 1555 if (exportRootException != null) 1556 { 1557 try 1558 { 1559 /* 1560 Handling the errors during export 1561 1562 Note: we could have lost the connection and another thread 1563 the listener one) has already managed to reconnect. 1564 So we MUST rely on the test broker.isConnected() 1565 ONLY to do 'wait to be reconnected by another thread' 1566 (if not yet reconnected already). 1567 */ 1568 if (!broker.isConnected()) 1569 { 1570 // We are still disconnected, so we wait for the listener thread 1571 // to reconnect - wait 10s 1572 if (logger.isTraceEnabled()) 1573 { 1574 logger.trace("[IE] Exporter wait for reconnection by the listener thread"); 1575 } 1576 int att=0; 1577 while (!broker.shuttingDown() 1578 && !broker.isConnected() 1579 && ++att < 100) 1580 { 1581 try { Thread.sleep(100); } 1582 catch(Exception e){ /* do nothing */ } 1583 } 1584 } 1585 1586 if (initTask != null 1587 && broker.isConnected() 1588 && serverToInitialize != RoutableMsg.ALL_SERVERS) 1589 { 1590 /* 1591 NewAttempt case : In the case where 1592 - it's not an InitializeAll 1593 - AND the previous export attempt failed 1594 - AND we are (now) connected 1595 - and we own the task and this task is not an InitializeAll 1596 Let's : 1597 - sleep to let time to the other peer to reconnect if needed 1598 - and launch another attempt 1599 */ 1600 try { Thread.sleep(1000); } 1601 catch(Exception e){ /* do nothing */ } 1602 1603 logger.info(NOTE_RESENDING_INIT_TARGET, exportRootException.getLocalizedMessage()); 1604 continue; 1605 } 1606 1607 broker.publish(new ErrorMsg( 1608 serverToInitialize, exportRootException.getMessageObject())); 1609 } 1610 catch(Exception e) 1611 { 1612 // Ignore the failure raised while proceeding the root failure 1613 } 1614 } 1615 1616 // We are always done for this export ... 1617 // ... except in the NewAttempt case (see above) 1618 done = true; 1619 1620 } // attempt loop 1621 1622 // Wait for all servers to be ok, and build the failure list 1623 waitForRemoteEndOfInit(ieCtx); 1624 1625 // Servers that left in the list are those for which we could not test 1626 // that they have been successfully initialized. 1627 if (!ieCtx.failureList.isEmpty() && exportRootException == null) 1628 { 1629 exportRootException = new DirectoryException(ResultCode.OTHER, 1630 ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(getGenerationID(), ieCtx.failureList)); 1631 } 1632 1633 // Don't forget to release IEcontext acquired at beginning. 1634 releaseIEContext(); // FIXME should not this be in a finally? 1635 1636 final String cause = exportRootException == null ? "" 1637 : exportRootException.getLocalizedMessage(); 1638 if (serverToInitialize == RoutableMsg.ALL_SERVERS) 1639 { 1640 logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL, 1641 getBaseDN(), getServerId(), cause); 1642 } 1643 else 1644 { 1645 logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END, 1646 getBaseDN(), getServerId(), serverToInitialize, cause); 1647 } 1648 1649 1650 if (exportRootException != null) 1651 { 1652 throw exportRootException; 1653 } 1654 } 1655 1656 /** 1657 * For all remote servers in the start list: 1658 * - wait it has finished the import and present the expected generationID, 1659 * - build the failureList. 1660 */ 1661 private void waitForRemoteStartOfInit(ImportExportContext ieCtx) 1662 { 1663 final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList); 1664 1665 if (logger.isTraceEnabled()) 1666 { 1667 logger.trace("[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor); 1668 } 1669 1670 int waitResultAttempt = 0; 1671 boolean done; 1672 do 1673 { 1674 done = true; 1675 for (DSInfo dsi : getReplicaInfos().values()) 1676 { 1677 if (logger.isTraceEnabled()) 1678 { 1679 logger.trace( 1680 "[IE] wait for start dsId " + dsi.getDsId() 1681 + " " + dsi.getStatus() 1682 + " " + dsi.getGenerationId() 1683 + " " + getGenerationID()); 1684 } 1685 if (ieCtx.startList.contains(dsi.getDsId())) 1686 { 1687 if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS) 1688 { 1689 // this one is still not doing the Full Update ... retry later 1690 done = false; 1691 try { Thread.sleep(100); 1692 } 1693 catch (InterruptedException e) { 1694 Thread.currentThread().interrupt(); 1695 } 1696 waitResultAttempt++; 1697 break; 1698 } 1699 else 1700 { 1701 // this one is ok 1702 replicasWeAreWaitingFor.remove(dsi.getDsId()); 1703 } 1704 } 1705 } 1706 } 1707 while (!done && waitResultAttempt < 1200 && !broker.shuttingDown()); 1708 1709 ieCtx.failureList.addAll(replicasWeAreWaitingFor); 1710 1711 if (logger.isTraceEnabled()) 1712 { 1713 logger.trace("[IE] wait for start ends with " + ieCtx.failureList); 1714 } 1715 } 1716 1717 /** 1718 * For all remote servers in the start list: 1719 * - wait it has finished the import and present the expected generationID, 1720 * - build the failureList. 1721 */ 1722 private void waitForRemoteEndOfInit(ImportExportContext ieCtx) 1723 { 1724 final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList); 1725 1726 if (logger.isTraceEnabled()) 1727 { 1728 logger.trace("[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor); 1729 } 1730 1731 /* 1732 In case some new servers appear during the init, we want them to be 1733 considered in the processing of sorting the successfully initialized 1734 and the others 1735 */ 1736 replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet()); 1737 1738 boolean done; 1739 do 1740 { 1741 done = true; 1742 int reconnectMaxDelayInSec = 10; 1743 int reconnectWait = 0; 1744 Iterator<Integer> it = replicasWeAreWaitingFor.iterator(); 1745 while (it.hasNext()) 1746 { 1747 int serverId = it.next(); 1748 if (ieCtx.failureList.contains(serverId)) 1749 { 1750 /* 1751 this server has already been in error during initialization 1752 don't wait for it 1753 */ 1754 continue; 1755 } 1756 1757 DSInfo dsInfo = getConnectedRemoteDS(serverId); 1758 if (dsInfo == null) 1759 { 1760 /* 1761 this server is disconnected 1762 may be for a long time if it crashed or had been stopped 1763 may be just the time to reconnect after import : should be short 1764 */ 1765 if (++reconnectWait<reconnectMaxDelayInSec) 1766 { 1767 // let's still wait to give a chance to this server to reconnect 1768 done = false; 1769 } 1770 // Else we left enough time to the servers to reconnect 1771 } 1772 else 1773 { 1774 // this server is connected 1775 if (dsInfo.getStatus() == ServerStatus.FULL_UPDATE_STATUS) 1776 { 1777 // this one is still doing the Full Update ... retry later 1778 done = false; 1779 break; 1780 } 1781 1782 if (dsInfo.getGenerationId() == getGenerationID()) 1783 { // and with the expected generationId 1784 // We're done with this server 1785 it.remove(); 1786 } 1787 } 1788 } 1789 1790 // loop and wait 1791 if (!done) 1792 { 1793 try { Thread.sleep(1000); } 1794 catch (InterruptedException e) { 1795 Thread.currentThread().interrupt(); 1796 } // 1sec 1797 } 1798 } 1799 while (!done && !broker.shuttingDown()); // infinite wait 1800 1801 ieCtx.failureList.addAll(replicasWeAreWaitingFor); 1802 1803 if (logger.isTraceEnabled()) 1804 { 1805 logger.trace("[IE] wait for end ends with " + ieCtx.failureList); 1806 } 1807 } 1808 1809 /** 1810 * Get the ServerState maintained by the Concrete class. 1811 * 1812 * @return the ServerState maintained by the Concrete class. 1813 */ 1814 public ServerState getServerState() 1815 { 1816 return state; 1817 } 1818 1819 /** 1820 * Acquire and initialize the import/export context, verifying no other 1821 * import/export is in progress. 1822 */ 1823 private ImportExportContext acquireIEContext(boolean importInProgress) 1824 throws DirectoryException 1825 { 1826 final ImportExportContext ieCtx = new ImportExportContext(importInProgress); 1827 if (!importExportContext.compareAndSet(null, ieCtx)) 1828 { 1829 // Rejects 2 simultaneous exports 1830 LocalizableMessage message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get(); 1831 throw new DirectoryException(ResultCode.OTHER, message); 1832 } 1833 return ieCtx; 1834 } 1835 1836 private void releaseIEContext() 1837 { 1838 importExportContext.set(null); 1839 } 1840 1841 /** 1842 * Processes an error message received while an export is 1843 * on going, or an import will start. 1844 * 1845 * @param errorMsg The error message received. 1846 */ 1847 private void processErrorMsg(ErrorMsg errorMsg, ImportExportContext ieCtx) 1848 { 1849 //Exporting must not be stopped on the first error, if we run initialize-all 1850 if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS) 1851 { 1852 // The ErrorMsg is received while we have started an initialization 1853 ieCtx.setExceptionIfNoneSet(new DirectoryException( 1854 ResultCode.OTHER, errorMsg.getDetails())); 1855 1856 /* 1857 * This can happen : 1858 * - on the first InitReqMsg sent when source in not known for example 1859 * - on the next attempt when source crashed and did not reconnect 1860 * even after the nextInitAttemptDelay 1861 * During the import, the ErrorMsg will be received by receiveEntryBytes 1862 */ 1863 if (ieCtx.initializeTask instanceof InitializeTask) 1864 { 1865 // Update the task that initiated the import 1866 ((InitializeTask) ieCtx.initializeTask) 1867 .updateTaskCompletionState(ieCtx.getException()); 1868 1869 releaseIEContext(); 1870 } 1871 } 1872 } 1873 1874 /** 1875 * Receives bytes related to an entry in the context of an import to 1876 * initialize the domain (called by ReplLDIFInputStream). 1877 * 1878 * @return The bytes. Null when the Done or Err message has been received 1879 */ 1880 protected byte[] receiveEntryBytes() 1881 { 1882 ReplicationMsg msg; 1883 while (true) 1884 { 1885 ImportExportContext ieCtx = importExportContext.get(); 1886 try 1887 { 1888 // In the context of the total update, we don't want any automatic 1889 // re-connection done transparently by the broker because of a better 1890 // RS or because of a connection failure. 1891 // We want to be notified of topology change in order to track a 1892 // potential disconnection of the exporter. 1893 msg = broker.receive(false, false, true); 1894 1895 if (logger.isTraceEnabled()) 1896 { 1897 logger.trace("[IE] In " 1898 + broker.getReplicationMonitorInstanceName() 1899 + ", receiveEntryBytes " + msg); 1900 } 1901 1902 if (msg == null) 1903 { 1904 if (broker.shuttingDown()) 1905 { 1906 // The server is in the shutdown process 1907 return null; 1908 } 1909 else 1910 { 1911 // Handle connection issues 1912 ieCtx.setExceptionIfNoneSet(new DirectoryException( 1913 ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT 1914 .get(broker.getReplicationServer()))); 1915 return null; 1916 } 1917 } 1918 1919 // Check good ordering of msg received 1920 if (msg instanceof EntryMsg) 1921 { 1922 EntryMsg entryMsg = (EntryMsg)msg; 1923 byte[] entryBytes = entryMsg.getEntryBytes(); 1924 ieCtx.updateCounters(countEntryLimits(entryBytes)); 1925 1926 if (ieCtx.exporterProtocolVersion >= 1927 ProtocolVersion.REPLICATION_PROTOCOL_V4) 1928 { 1929 // check the msgCnt of the msg received to check ordering 1930 if (++ieCtx.msgCnt != entryMsg.getMsgId()) 1931 { 1932 ieCtx.setExceptionIfNoneSet(new DirectoryException( 1933 ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(ieCtx.msgCnt, entryMsg.getMsgId()))); 1934 return null; 1935 } 1936 1937 // send the ack of flow control mgmt 1938 if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0) 1939 { 1940 final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg( 1941 getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt); 1942 broker.publish(amsg, false); 1943 if (logger.isTraceEnabled()) 1944 { 1945 logger.trace("[IE] In " 1946 + broker.getReplicationMonitorInstanceName() 1947 + ", publish InitializeRcvAckMsg" + amsg); 1948 } 1949 } 1950 } 1951 return entryBytes; 1952 } 1953 else if (msg instanceof DoneMsg) 1954 { 1955 /* 1956 This is the normal termination of the import 1957 No error is stored and the import is ended by returning null 1958 */ 1959 return null; 1960 } 1961 else if (msg instanceof ErrorMsg) 1962 { 1963 /* 1964 This is an error termination during the import 1965 The error is stored and the import is ended by returning null 1966 */ 1967 if (ieCtx.getException() == null) 1968 { 1969 ErrorMsg errMsg = (ErrorMsg)msg; 1970 if (errMsg.getCreationTime() > ieCtx.startTime) 1971 { 1972 ieCtx.setException( 1973 new DirectoryException(ResultCode.OTHER,errMsg.getDetails())); 1974 return null; 1975 } 1976 } 1977 } 1978 else 1979 { 1980 // Other messages received during an import are trashed except 1981 // the topologyMsg. 1982 if (msg instanceof TopologyMsg 1983 && getConnectedRemoteDS(ieCtx.importSource) == null) 1984 { 1985 LocalizableMessage errMsg = ERR_INIT_EXPORTER_DISCONNECTION.get( 1986 getBaseDN(), getServerId(), ieCtx.importSource); 1987 ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER, errMsg)); 1988 return null; 1989 } 1990 } 1991 } 1992 catch(Exception e) 1993 { 1994 ieCtx.setExceptionIfNoneSet(new DirectoryException( 1995 ResultCode.OTHER, 1996 ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage()))); 1997 } 1998 } 1999 } 2000 2001 /** 2002 * Count the number of entries in the provided byte[]. 2003 * This is based on the hypothesis that the entries are separated 2004 * by a "\n\n" String. 2005 * 2006 * @param entryBytes the set of bytes containing one or more entries. 2007 * @return The number of entries in the provided byte[]. 2008 */ 2009 private int countEntryLimits(byte[] entryBytes) 2010 { 2011 return countEntryLimits(entryBytes, 0, entryBytes.length); 2012 } 2013 2014 /** 2015 * Count the number of entries in the provided byte[]. 2016 * This is based on the hypothesis that the entries are separated 2017 * by a "\n\n" String. 2018 * 2019 * @param entryBytes the set of bytes containing one or more entries. 2020 * @return The number of entries in the provided byte[]. 2021 */ 2022 private int countEntryLimits(byte[] entryBytes, int pos, int length) 2023 { 2024 int entryCount = 0; 2025 int count = 0; 2026 while (count<=length-2) 2027 { 2028 if (entryBytes[pos+count] == '\n' && entryBytes[pos+count+1] == '\n') 2029 { 2030 entryCount++; 2031 count++; 2032 } 2033 count++; 2034 } 2035 return entryCount; 2036 } 2037 2038 /** 2039 * Exports an entry in LDIF format. 2040 * 2041 * @param lDIFEntry The entry to be exported in byte[] form. 2042 * @param pos The starting Position in the array. 2043 * @param length Number of array elements to be copied. 2044 * 2045 * @throws IOException when an error occurred. 2046 */ 2047 void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) 2048 throws IOException 2049 { 2050 if (logger.isTraceEnabled()) 2051 { 2052 logger.trace("[IE] Entering exportLDIFEntry entry=" + Arrays.toString(lDIFEntry)); 2053 } 2054 2055 // build the message 2056 ImportExportContext ieCtx = importExportContext.get(); 2057 EntryMsg entryMessage = new EntryMsg( 2058 getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length, 2059 ++ieCtx.msgCnt); 2060 2061 // Waiting the slowest loop 2062 while (!broker.shuttingDown()) 2063 { 2064 /* 2065 If an error was raised - like receiving an ErrorMsg from a remote 2066 server that have been stored by the listener thread in the ieContext, 2067 we just abandon the export by throwing an exception. 2068 */ 2069 if (ieCtx.getException() != null) 2070 { 2071 throw new IOException(ieCtx.getException().getMessage()); 2072 } 2073 2074 int slowestServerId = ieCtx.getSlowestServer(); 2075 if (getConnectedRemoteDS(slowestServerId) == null) 2076 { 2077 ieCtx.setException(new DirectoryException(ResultCode.OTHER, 2078 ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(ieCtx.getSlowestServer()))); 2079 2080 throw new IOException("IOException with nested DirectoryException", 2081 ieCtx.getException()); 2082 } 2083 2084 int ourLastExportedCnt = ieCtx.msgCnt; 2085 int slowestCnt = ieCtx.ackVals.get(slowestServerId); 2086 2087 if (logger.isTraceEnabled()) 2088 { 2089 logger.trace("[IE] Entering exportLDIFEntry waiting " + 2090 " our=" + ourLastExportedCnt + " slowest=" + slowestCnt); 2091 } 2092 2093 if (ourLastExportedCnt - slowestCnt > ieCtx.initWindow) 2094 { 2095 if (logger.isTraceEnabled()) 2096 { 2097 logger.trace("[IE] Entering exportLDIFEntry waiting"); 2098 } 2099 2100 // our export is too far beyond the slowest importer - let's wait 2101 try { Thread.sleep(100); } 2102 catch(Exception e) { /* do nothing */ } 2103 2104 // process any connection error 2105 if (broker.hasConnectionError() 2106 || broker.getNumLostConnections() != ieCtx.initNumLostConnections) 2107 { 2108 // publish failed - store the error in the ieContext ... 2109 DirectoryException de = new DirectoryException(ResultCode.OTHER, 2110 ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(broker.getRsServerId())); 2111 ieCtx.setExceptionIfNoneSet(de); 2112 // .. and abandon the export by throwing an exception. 2113 throw new IOException(de.getMessage()); 2114 } 2115 } 2116 else 2117 { 2118 if (logger.isTraceEnabled()) 2119 { 2120 logger.trace("[IE] slowest got to us => stop waiting"); 2121 } 2122 break; 2123 } 2124 } // Waiting the slowest loop 2125 2126 if (logger.isTraceEnabled()) 2127 { 2128 logger.trace("[IE] Entering exportLDIFEntry pub entry=" + Arrays.toString(lDIFEntry)); 2129 } 2130 2131 boolean sent = broker.publish(entryMessage, false); 2132 2133 // process any publish error 2134 if (!sent 2135 || broker.hasConnectionError() 2136 || broker.getNumLostConnections() != ieCtx.initNumLostConnections) 2137 { 2138 // publish failed - store the error in the ieContext ... 2139 DirectoryException de = new DirectoryException(ResultCode.OTHER, 2140 ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(broker.getRsServerId())); 2141 ieCtx.setExceptionIfNoneSet(de); 2142 // .. and abandon the export by throwing an exception. 2143 throw new IOException(de.getMessage()); 2144 } 2145 2146 // publish succeeded 2147 try 2148 { 2149 ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length)); 2150 } 2151 catch (DirectoryException de) 2152 { 2153 ieCtx.setExceptionIfNoneSet(de); 2154 // .. and abandon the export by throwing an exception. 2155 throw new IOException(de.getMessage()); 2156 } 2157 } 2158 2159 /** 2160 * Initializes asynchronously this domain from a remote source server. 2161 * Before returning from this call, for the provided task : 2162 * - the progressing counters are updated during the initialization using 2163 * setTotal() and setLeft(). 2164 * - the end of the initialization using updateTaskCompletionState(). 2165 * <p> 2166 * When this method is called, a request for initialization is sent to the 2167 * remote source server requesting initialization. 2168 * <p> 2169 * 2170 * @param source The server-id of the source from which to initialize. 2171 * The source can be discovered using the 2172 * {@link #getReplicaInfos()} method. 2173 * 2174 * @param initTask The task that launched the initialization 2175 * and should be updated of its progress. 2176 * 2177 * @throws DirectoryException If it was not possible to publish the 2178 * Initialization message to the Topology. 2179 * The task state is updated. 2180 */ 2181 public void initializeFromRemote(int source, Task initTask) 2182 throws DirectoryException 2183 { 2184 if (logger.isTraceEnabled()) 2185 { 2186 logger.trace("[IE] Entering initializeFromRemote for " + this); 2187 } 2188 2189 LocalizableMessage errMsg = !broker.isConnected() 2190 ? ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDN()) 2191 : null; 2192 2193 /* 2194 We must not test here whether the remote source is connected to 2195 the topology by testing if it stands in the replicas list since. 2196 In the case of a re-attempt of initialization, the listener thread is 2197 running this method directly coming from initialize() method and did 2198 not processed any topology message in between the failure and the 2199 new attempt. 2200 */ 2201 try 2202 { 2203 /* 2204 We must immediately acquire a context to store the task inside 2205 The context will be used when we (the listener thread) will receive 2206 the InitializeTargetMsg, process the import, and at the end 2207 update the task. 2208 */ 2209 2210 final ImportExportContext ieCtx = acquireIEContext(true); 2211 ieCtx.initializeTask = initTask; 2212 ieCtx.attemptCnt = 0; 2213 ieCtx.initReqMsgSent = new InitializeRequestMsg( 2214 getBaseDN(), getServerId(), source, getInitWindow()); 2215 broker.publish(ieCtx.initReqMsgSent); 2216 2217 /* 2218 The normal success processing is now to receive InitTargetMsg then 2219 entries from the remote server. 2220 The error cases are : 2221 - either local error immediately caught below 2222 - a remote error we will receive as an ErrorMsg 2223 */ 2224 } 2225 catch(DirectoryException de) 2226 { 2227 errMsg = de.getMessageObject(); 2228 } 2229 catch(Exception e) 2230 { 2231 // Should not happen 2232 errMsg = LocalizableMessage.raw(e.getLocalizedMessage()); 2233 logger.error(errMsg); 2234 } 2235 2236 // When error, update the task and raise the error to the caller 2237 if (errMsg != null) 2238 { 2239 // No need to call here updateTaskCompletionState - will be done 2240 // by the caller 2241 releaseIEContext(); 2242 throw new DirectoryException(ResultCode.OTHER, errMsg); 2243 } 2244 } 2245 2246 /** 2247 * Processes an InitializeTargetMsg received from a remote server 2248 * meaning processes an initialization from the entries expected to be 2249 * received now. 2250 * 2251 * @param initTargetMsgReceived The message received from the remote server. 2252 * 2253 * @param requesterServerId The serverId of the server that requested the 2254 * initialization meaning the server where the 2255 * task has initially been created (this server, 2256 * or the remote server). 2257 */ 2258 private void initialize(InitializeTargetMsg initTargetMsgReceived, int requesterServerId) 2259 { 2260 if (logger.isTraceEnabled()) 2261 { 2262 logger.trace("[IE] Entering initialize - domain=" + this); 2263 } 2264 2265 InitializeTask initFromTask = null; 2266 int source = initTargetMsgReceived.getSenderID(); 2267 ImportExportContext ieCtx = importExportContext.get(); 2268 try 2269 { 2270 // Log starting 2271 logger.info(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START, getBaseDN(), 2272 initTargetMsgReceived.getSenderID(), getServerId()); 2273 2274 // Go into full update status 2275 setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT); 2276 2277 // Acquire an import context if no already done (and initialize). 2278 if (initTargetMsgReceived.getInitiatorID() != getServerId()) 2279 { 2280 /* 2281 The initTargetMsgReceived is for an import initiated by the remote server. 2282 Test and set if no import already in progress 2283 */ 2284 ieCtx = acquireIEContext(true); 2285 } 2286 2287 // Initialize stuff 2288 ieCtx.importSource = source; 2289 ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount()); 2290 ieCtx.initWindow = initTargetMsgReceived.getInitWindow(); 2291 ieCtx.exporterProtocolVersion = getProtocolVersion(source); 2292 initFromTask = (InitializeTask) ieCtx.initializeTask; 2293 2294 // Launch the import 2295 importBackend(new ReplInputStream(this)); 2296 } 2297 catch (DirectoryException e) 2298 { 2299 /* 2300 Store the exception raised. It will be considered if no other exception 2301 has been previously stored in the context 2302 */ 2303 ieCtx.setExceptionIfNoneSet(e); 2304 } 2305 finally 2306 { 2307 if (logger.isTraceEnabled()) 2308 { 2309 logger.trace("[IE] Domain=" + this 2310 + " ends import with exception=" + ieCtx.getException() 2311 + " connected=" + broker.isConnected()); 2312 } 2313 2314 /* 2315 It is necessary to restart (reconnect to RS) for different reasons 2316 - when everything went well, reconnect in order to exchange 2317 new state, new generation ID 2318 - when we have connection failure, reconnect to retry a new import 2319 right here, right now 2320 we never want retryOnFailure if we fails reconnecting in the restart. 2321 */ 2322 broker.reStart(false); 2323 2324 if (ieCtx.getException() != null 2325 && broker.isConnected() 2326 && initFromTask != null 2327 && ++ieCtx.attemptCnt < 2) 2328 { 2329 /* 2330 Worth a new attempt 2331 since initFromTask is in this server, connection is ok 2332 */ 2333 try 2334 { 2335 /* 2336 Wait for the exporter to stabilize - eventually reconnect as 2337 well if it was connected to the same RS than the one we lost ... 2338 */ 2339 Thread.sleep(1000); 2340 2341 /* 2342 Restart the whole import protocol exchange by sending again 2343 the request 2344 */ 2345 logger.info(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST, 2346 ieCtx.getException().getLocalizedMessage()); 2347 2348 broker.publish(ieCtx.initReqMsgSent); 2349 2350 ieCtx.initializeCounters(0); 2351 ieCtx.exception = null; 2352 ieCtx.msgCnt = 0; 2353 2354 // Processing of the received initTargetMsgReceived is done 2355 // let's wait for the next one 2356 return; 2357 } 2358 catch(Exception e) 2359 { 2360 /* 2361 An error occurs when sending a new request for a new import. 2362 This error is not stored, preferring to keep the initial one. 2363 */ 2364 logger.error(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST, 2365 e.getLocalizedMessage(), ieCtx.getException().getLocalizedMessage()); 2366 } 2367 } 2368 2369 // =================== 2370 // No new attempt case 2371 2372 if (logger.isTraceEnabled()) 2373 { 2374 logger.trace("[IE] Domain=" + this 2375 + " ends initialization with exception=" + ieCtx.getException() 2376 + " connected=" + broker.isConnected() 2377 + " task=" + initFromTask 2378 + " attempt=" + ieCtx.attemptCnt); 2379 } 2380 2381 try 2382 { 2383 if (broker.isConnected() && ieCtx.getException() != null) 2384 { 2385 // Let's notify the exporter 2386 ErrorMsg errorMsg = new ErrorMsg(requesterServerId, 2387 ieCtx.getException().getMessageObject()); 2388 broker.publish(errorMsg); 2389 } 2390 /* 2391 Update the task that initiated the import must be the last thing. 2392 Particularly, broker.restart() after import success must be done 2393 before some other operations/tasks to be launched, 2394 like resetting the generation ID. 2395 */ 2396 if (initFromTask != null) 2397 { 2398 initFromTask.updateTaskCompletionState(ieCtx.getException()); 2399 } 2400 } 2401 finally 2402 { 2403 String errorMsg = ieCtx.getException() != null ? ieCtx.getException().getLocalizedMessage() : ""; 2404 logger.info(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END, 2405 getBaseDN(), initTargetMsgReceived.getSenderID(), getServerId(), errorMsg); 2406 releaseIEContext(); 2407 } // finally 2408 } // finally 2409 } 2410 2411 /** 2412 * Return the protocol version of the DS related to the provided serverId. 2413 * Returns -1 when the protocol version is not known. 2414 * @param dsServerId The provided serverId. 2415 * @return The protocol version. 2416 */ 2417 private short getProtocolVersion(int dsServerId) 2418 { 2419 final DSInfo dsInfo = getReplicaInfos().get(dsServerId); 2420 if (dsInfo != null) 2421 { 2422 return dsInfo.getProtocolVersion(); 2423 } 2424 return -1; 2425 } 2426 2427 /** 2428 * Sets the status to a new value depending of the passed status machine 2429 * event. 2430 * @param event The event that may make the status be changed 2431 */ 2432 protected void signalNewStatus(StatusMachineEvent event) 2433 { 2434 setNewStatus(event); 2435 broker.signalStatusChange(status); 2436 } 2437 2438 private void setNewStatus(StatusMachineEvent event) 2439 { 2440 ServerStatus newStatus = StatusMachine.computeNewStatus(status, event); 2441 if (newStatus == ServerStatus.INVALID_STATUS) 2442 { 2443 logger.error(ERR_DS_CANNOT_CHANGE_STATUS, getBaseDN(), getServerId(), status, event); 2444 return; 2445 } 2446 2447 if (newStatus != status) 2448 { 2449 // Reset status date 2450 lastStatusChangeDate = new Date(); 2451 // Reset monitoring counters if reconnection 2452 if (newStatus == ServerStatus.NOT_CONNECTED_STATUS) 2453 { 2454 resetMonitoringCounters(); 2455 } 2456 2457 status = newStatus; 2458 if (logger.isTraceEnabled()) 2459 { 2460 logger.trace("Replication domain " + getBaseDN() 2461 + " new status is: " + status); 2462 } 2463 2464 // Perform whatever actions are needed to apply properties for being 2465 // compliant with new status 2466 updateDomainForNewStatus(); 2467 } 2468 } 2469 2470 /** 2471 * Returns a boolean indicating if an import or export is currently 2472 * processed. 2473 * 2474 * @return The status 2475 */ 2476 public boolean ieRunning() 2477 { 2478 return importExportContext.get() != null; 2479 } 2480 2481 /** 2482 * Check the value of the Replication Servers generation ID. 2483 * 2484 * @param generationID The expected value of the generation ID. 2485 * 2486 * @throws DirectoryException When the generation ID of the Replication 2487 * Servers is not the expected value. 2488 */ 2489 private void checkGenerationID(long generationID) throws DirectoryException 2490 { 2491 boolean allSet = true; 2492 2493 for (int i = 0; i< 50; i++) 2494 { 2495 allSet = true; 2496 for (RSInfo rsInfo : getRsInfos()) 2497 { 2498 // the 'empty' RSes (generationId==-1) are considered as good citizens 2499 if (rsInfo.getGenerationId() != -1 && 2500 rsInfo.getGenerationId() != generationID) 2501 { 2502 try 2503 { 2504 Thread.sleep(i*100); 2505 } catch (InterruptedException e) 2506 { 2507 Thread.currentThread().interrupt(); 2508 } 2509 allSet = false; 2510 break; 2511 } 2512 } 2513 if (allSet) 2514 { 2515 break; 2516 } 2517 } 2518 if (!allSet) 2519 { 2520 LocalizableMessage message = ERR_RESET_GENERATION_ID_FAILED.get(getBaseDN()); 2521 throw new DirectoryException(ResultCode.OTHER, message); 2522 } 2523 } 2524 2525 /** 2526 * Reset the Replication Log. 2527 * Calling this method will remove all the Replication information that 2528 * was kept on all the Replication Servers currently connected in the 2529 * topology. 2530 * 2531 * @throws DirectoryException If this ReplicationDomain is not currently 2532 * connected to a Replication Server or it 2533 * was not possible to contact it. 2534 */ 2535 void resetReplicationLog() throws DirectoryException 2536 { 2537 // Reset the Generation ID to -1 to clean the ReplicationServers. 2538 resetGenerationId(-1L); 2539 2540 // check that at least one ReplicationServer did change its generation-id 2541 checkGenerationID(-1); 2542 2543 // Reconnect to the Replication Server so that it adopts our GenerationID. 2544 restartService(); 2545 2546 // wait for the domain to reconnect. 2547 int count = 0; 2548 while (!isConnected() && count < 10) 2549 { 2550 try 2551 { 2552 Thread.sleep(100); 2553 } catch (InterruptedException e) 2554 { 2555 Thread.currentThread().interrupt(); 2556 } 2557 } 2558 2559 resetGenerationId(getGenerationID()); 2560 2561 // check that at least one ReplicationServer did change its generation-id 2562 checkGenerationID(getGenerationID()); 2563 } 2564 2565 /** 2566 * Reset the generationId of this domain in the whole topology. 2567 * A message is sent to the Replication Servers for them to reset 2568 * their change dbs. 2569 * 2570 * @param generationIdNewValue The new value of the generation Id. 2571 * @throws DirectoryException When an error occurs 2572 */ 2573 public void resetGenerationId(Long generationIdNewValue) 2574 throws DirectoryException 2575 { 2576 if (logger.isTraceEnabled()) 2577 { 2578 logger.trace("Server id " + getServerId() + " and domain " 2579 + getBaseDN() + " resetGenerationId " + generationIdNewValue); 2580 } 2581 2582 ResetGenerationIdMsg genIdMessage = 2583 new ResetGenerationIdMsg(getGenId(generationIdNewValue)); 2584 2585 if (!isConnected()) 2586 { 2587 LocalizableMessage message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDN(), 2588 getServerId(), genIdMessage.getGenerationId()); 2589 throw new DirectoryException(ResultCode.OTHER, message); 2590 } 2591 broker.publish(genIdMessage); 2592 2593 // check that at least one ReplicationServer did change its generation-id 2594 checkGenerationID(getGenId(generationIdNewValue)); 2595 } 2596 2597 private long getGenId(Long generationIdNewValue) 2598 { 2599 if (generationIdNewValue != null) 2600 { 2601 return generationIdNewValue; 2602 } 2603 return getGenerationID(); 2604 } 2605 2606 2607 /* 2608 ******** End of The total Update code ********* 2609 */ 2610 2611 /* 2612 ******* Start of Monitoring Code ********** 2613 */ 2614 2615 /** 2616 * Get the maximum receive window size. 2617 * 2618 * @return The maximum receive window size. 2619 */ 2620 int getMaxRcvWindow() 2621 { 2622 if (broker != null) 2623 { 2624 return broker.getMaxRcvWindow(); 2625 } 2626 return 0; 2627 } 2628 2629 /** 2630 * Get the current receive window size. 2631 * 2632 * @return The current receive window size. 2633 */ 2634 int getCurrentRcvWindow() 2635 { 2636 if (broker != null) 2637 { 2638 return broker.getCurrentRcvWindow(); 2639 } 2640 return 0; 2641 } 2642 2643 /** 2644 * Get the maximum send window size. 2645 * 2646 * @return The maximum send window size. 2647 */ 2648 int getMaxSendWindow() 2649 { 2650 if (broker != null) 2651 { 2652 return broker.getMaxSendWindow(); 2653 } 2654 return 0; 2655 } 2656 2657 /** 2658 * Get the current send window size. 2659 * 2660 * @return The current send window size. 2661 */ 2662 int getCurrentSendWindow() 2663 { 2664 if (broker != null) 2665 { 2666 return broker.getCurrentSendWindow(); 2667 } 2668 return 0; 2669 } 2670 2671 /** 2672 * Get the number of times the replication connection was lost. 2673 * @return The number of times the replication connection was lost. 2674 */ 2675 int getNumLostConnections() 2676 { 2677 if (broker != null) 2678 { 2679 return broker.getNumLostConnections(); 2680 } 2681 return 0; 2682 } 2683 2684 /** 2685 * Determine whether the connection to the replication server is encrypted. 2686 * @return true if the connection is encrypted, false otherwise. 2687 */ 2688 boolean isSessionEncrypted() 2689 { 2690 return broker != null && broker.isSessionEncrypted(); 2691 } 2692 2693 /** 2694 * Check if the domain is connected to a ReplicationServer. 2695 * 2696 * @return true if the server is connected, false if not. 2697 */ 2698 public boolean isConnected() 2699 { 2700 return broker != null && broker.isConnected(); 2701 } 2702 2703 /** 2704 * Check if the domain has a connection error. 2705 * A Connection error happens when the broker could not be created 2706 * or when the broker could not find any ReplicationServer to connect to. 2707 * 2708 * @return true if the domain has a connection error. 2709 */ 2710 public boolean hasConnectionError() 2711 { 2712 return broker == null || broker.hasConnectionError(); 2713 } 2714 2715 /** 2716 * Get the name of the replicationServer to which this domain is currently 2717 * connected. 2718 * 2719 * @return the name of the replicationServer to which this domain 2720 * is currently connected. 2721 */ 2722 public String getReplicationServer() 2723 { 2724 if (broker != null) 2725 { 2726 return broker.getReplicationServer(); 2727 } 2728 return ReplicationBroker.NO_CONNECTED_SERVER; 2729 } 2730 2731 /** 2732 * Gets the number of updates sent in assured safe read mode. 2733 * @return The number of updates sent in assured safe read mode. 2734 */ 2735 public int getAssuredSrSentUpdates() 2736 { 2737 return assuredSrSentUpdates.get(); 2738 } 2739 2740 /** 2741 * Gets the number of updates sent in assured safe read mode that have been 2742 * acknowledged without errors. 2743 * @return The number of updates sent in assured safe read mode that have been 2744 * acknowledged without errors. 2745 */ 2746 public int getAssuredSrAcknowledgedUpdates() 2747 { 2748 return assuredSrAcknowledgedUpdates.get(); 2749 } 2750 2751 /** 2752 * Gets the number of updates sent in assured safe read mode that have not 2753 * been acknowledged. 2754 * @return The number of updates sent in assured safe read mode that have not 2755 * been acknowledged. 2756 */ 2757 public int getAssuredSrNotAcknowledgedUpdates() 2758 { 2759 return assuredSrNotAcknowledgedUpdates.get(); 2760 } 2761 2762 /** 2763 * Gets the number of updates sent in assured safe read mode that have not 2764 * been acknowledged due to timeout error. 2765 * @return The number of updates sent in assured safe read mode that have not 2766 * been acknowledged due to timeout error. 2767 */ 2768 public int getAssuredSrTimeoutUpdates() 2769 { 2770 return assuredSrTimeoutUpdates.get(); 2771 } 2772 2773 /** 2774 * Gets the number of updates sent in assured safe read mode that have not 2775 * been acknowledged due to wrong status error. 2776 * @return The number of updates sent in assured safe read mode that have not 2777 * been acknowledged due to wrong status error. 2778 */ 2779 public int getAssuredSrWrongStatusUpdates() 2780 { 2781 return assuredSrWrongStatusUpdates.get(); 2782 } 2783 2784 /** 2785 * Gets the number of updates sent in assured safe read mode that have not 2786 * been acknowledged due to replay error. 2787 * @return The number of updates sent in assured safe read mode that have not 2788 * been acknowledged due to replay error. 2789 */ 2790 public int getAssuredSrReplayErrorUpdates() 2791 { 2792 return assuredSrReplayErrorUpdates.get(); 2793 } 2794 2795 /** 2796 * Gets the number of updates sent in assured safe read mode that have not 2797 * been acknowledged per server. 2798 * @return A copy of the map that contains the number of updates sent in 2799 * assured safe read mode that have not been acknowledged per server. 2800 */ 2801 public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates() 2802 { 2803 synchronized(assuredSrServerNotAcknowledgedUpdates) 2804 { 2805 return new HashMap<>(assuredSrServerNotAcknowledgedUpdates); 2806 } 2807 } 2808 2809 /** 2810 * Gets the number of updates received in assured safe read mode request. 2811 * @return The number of updates received in assured safe read mode request. 2812 */ 2813 public int getAssuredSrReceivedUpdates() 2814 { 2815 return assuredSrReceivedUpdates.get(); 2816 } 2817 2818 /** 2819 * Gets the number of updates received in assured safe read mode that we acked 2820 * without error (no replay error). 2821 * @return The number of updates received in assured safe read mode that we 2822 * acked without error (no replay error). 2823 */ 2824 public int getAssuredSrReceivedUpdatesAcked() 2825 { 2826 return this.assuredSrReceivedUpdatesAcked.get(); 2827 } 2828 2829 /** 2830 * Gets the number of updates received in assured safe read mode that we did 2831 * not ack due to error (replay error). 2832 * @return The number of updates received in assured safe read mode that we 2833 * did not ack due to error (replay error). 2834 */ 2835 public int getAssuredSrReceivedUpdatesNotAcked() 2836 { 2837 return this.assuredSrReceivedUpdatesNotAcked.get(); 2838 } 2839 2840 /** 2841 * Gets the number of updates sent in assured safe data mode. 2842 * @return The number of updates sent in assured safe data mode. 2843 */ 2844 public int getAssuredSdSentUpdates() 2845 { 2846 return assuredSdSentUpdates.get(); 2847 } 2848 2849 /** 2850 * Gets the number of updates sent in assured safe data mode that have been 2851 * acknowledged without errors. 2852 * @return The number of updates sent in assured safe data mode that have been 2853 * acknowledged without errors. 2854 */ 2855 public int getAssuredSdAcknowledgedUpdates() 2856 { 2857 return assuredSdAcknowledgedUpdates.get(); 2858 } 2859 2860 /** 2861 * Gets the number of updates sent in assured safe data mode that have not 2862 * been acknowledged due to timeout error. 2863 * @return The number of updates sent in assured safe data mode that have not 2864 * been acknowledged due to timeout error. 2865 */ 2866 public int getAssuredSdTimeoutUpdates() 2867 { 2868 return assuredSdTimeoutUpdates.get(); 2869 } 2870 2871 /** 2872 * Gets the number of updates sent in assured safe data mode that have not 2873 * been acknowledged due to timeout error per server. 2874 * @return A copy of the map that contains the number of updates sent in 2875 * assured safe data mode that have not been acknowledged due to timeout 2876 * error per server. 2877 */ 2878 public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates() 2879 { 2880 synchronized(assuredSdServerTimeoutUpdates) 2881 { 2882 return new HashMap<>(assuredSdServerTimeoutUpdates); 2883 } 2884 } 2885 2886 /** 2887 * Gets the date of the last status change. 2888 * @return The date of the last status change. 2889 */ 2890 public Date getLastStatusChangeDate() 2891 { 2892 return lastStatusChangeDate; 2893 } 2894 2895 /** 2896 * Resets the values of the monitoring counters. 2897 */ 2898 private void resetMonitoringCounters() 2899 { 2900 numProcessedUpdates = new AtomicInteger(0); 2901 numRcvdUpdates = new AtomicInteger(0); 2902 numSentUpdates = new AtomicInteger(0); 2903 2904 assuredSrSentUpdates = new AtomicInteger(0); 2905 assuredSrAcknowledgedUpdates = new AtomicInteger(0); 2906 assuredSrNotAcknowledgedUpdates = new AtomicInteger(0); 2907 assuredSrTimeoutUpdates = new AtomicInteger(0); 2908 assuredSrWrongStatusUpdates = new AtomicInteger(0); 2909 assuredSrReplayErrorUpdates = new AtomicInteger(0); 2910 synchronized (assuredSrServerNotAcknowledgedUpdates) 2911 { 2912 assuredSrServerNotAcknowledgedUpdates.clear(); 2913 } 2914 assuredSrReceivedUpdates = new AtomicInteger(0); 2915 assuredSrReceivedUpdatesAcked = new AtomicInteger(0); 2916 assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0); 2917 assuredSdSentUpdates = new AtomicInteger(0); 2918 assuredSdAcknowledgedUpdates = new AtomicInteger(0); 2919 assuredSdTimeoutUpdates = new AtomicInteger(0); 2920 synchronized (assuredSdServerTimeoutUpdates) 2921 { 2922 assuredSdServerTimeoutUpdates.clear(); 2923 } 2924 } 2925 2926 /* 2927 ********** End of Monitoring Code ************** 2928 */ 2929 2930 /** 2931 * Start the publish mechanism of the Replication Service. After this method 2932 * has been called, the publish service can be used by calling the 2933 * {@link #publish(UpdateMsg)} method. 2934 * 2935 * @throws ConfigException 2936 * If the DirectoryServer configuration was incorrect. 2937 */ 2938 public void startPublishService() throws ConfigException 2939 { 2940 synchronized (sessionLock) 2941 { 2942 if (broker == null) 2943 { 2944 // create the broker object used to publish and receive changes 2945 broker = new ReplicationBroker( 2946 this, state, config, new ReplSessionSecurity()); 2947 broker.start(); 2948 } 2949 } 2950 } 2951 2952 /** 2953 * Starts the receiver side of the Replication Service. 2954 * <p> 2955 * After this method has been called, the Replication Service will start 2956 * calling the {@link #processUpdate(UpdateMsg)}. 2957 * <p> 2958 * This method must be called once and must be called after the 2959 * {@link #startPublishService()}. 2960 */ 2961 public void startListenService() 2962 { 2963 synchronized (sessionLock) 2964 { 2965 if (listenerThread != null) 2966 { 2967 return; 2968 } 2969 2970 final String threadName = "Replica DS(" + getServerId() + ") listener for domain \"" + getBaseDN() + "\""; 2971 2972 listenerThread = new DirectoryThread(new Runnable() 2973 { 2974 @Override 2975 public void run() 2976 { 2977 if (logger.isTraceEnabled()) 2978 { 2979 logger.trace("Replication Listener thread starting."); 2980 } 2981 2982 // Loop processing any incoming update messages. 2983 while (!listenerThread.isShutdownInitiated()) 2984 { 2985 final UpdateMsg updateMsg = receive(); 2986 if (updateMsg == null) 2987 { 2988 // The server is shutting down. 2989 listenerThread.initiateShutdown(); 2990 } 2991 else if (processUpdate(updateMsg) 2992 && updateMsg.contributesToDomainState()) 2993 { 2994 /* 2995 * Warning: in synchronous mode, no way to tell the replay of an 2996 * update went wrong Just put null in processUpdateDone so that if 2997 * assured replication is used the ack is sent without error at 2998 * replay flag. 2999 */ 3000 processUpdateDone(updateMsg, null); 3001 state.update(updateMsg.getCSN()); 3002 } 3003 } 3004 3005 if (logger.isTraceEnabled()) 3006 { 3007 logger.trace("Replication Listener thread stopping."); 3008 } 3009 } 3010 }, threadName); 3011 3012 listenerThread.start(); 3013 } 3014 } 3015 3016 /** 3017 * Temporarily disable the Replication Service. 3018 * The Replication Service can be enabled again using 3019 * {@link #enableService()}. 3020 * <p> 3021 * It can be useful to disable the Replication Service when the 3022 * repository where the replicated information is stored becomes 3023 * temporarily unavailable and replicated updates can therefore not 3024 * be replayed during a while. This method is not MT safe. 3025 */ 3026 public void disableService() 3027 { 3028 synchronized (sessionLock) 3029 { 3030 /* 3031 Stop the broker first in order to prevent the listener from 3032 reconnecting - see OPENDJ-457. 3033 */ 3034 if (broker != null) 3035 { 3036 broker.stop(); 3037 } 3038 3039 // Stop the listener thread 3040 if (listenerThread != null) 3041 { 3042 listenerThread.initiateShutdown(); 3043 try 3044 { 3045 listenerThread.join(); 3046 } 3047 catch (InterruptedException e) 3048 { 3049 // Give up waiting. 3050 } 3051 listenerThread = null; 3052 } 3053 } 3054 } 3055 3056 /** 3057 * Returns {@code true} if the listener thread is shutting down or has 3058 * shutdown. 3059 * 3060 * @return {@code true} if the listener thread is shutting down or has 3061 * shutdown. 3062 */ 3063 protected final boolean isListenerShuttingDown() 3064 { 3065 final DirectoryThread tmp = listenerThread; 3066 return tmp == null || tmp.isShutdownInitiated(); 3067 } 3068 3069 /** 3070 * Restart the Replication service after a {@link #disableService()}. 3071 * <p> 3072 * The Replication Service will restart from the point indicated by the 3073 * {@link ServerState} that was given as a parameter to the 3074 * {@link #startPublishService()} at startup time. 3075 * <p> 3076 * If some data have changed in the repository during the period of time when 3077 * the Replication Service was disabled, this {@link ServerState} should 3078 * therefore be updated by the Replication Domain subclass before calling this 3079 * method. This method is not MT safe. 3080 */ 3081 public void enableService() 3082 { 3083 synchronized (sessionLock) 3084 { 3085 broker.start(); 3086 startListenService(); 3087 } 3088 } 3089 3090 /** 3091 * Change some ReplicationDomain parameters. 3092 * 3093 * @param config 3094 * The new configuration that this domain should now use. 3095 */ 3096 protected void changeConfig(ReplicationDomainCfg config) 3097 { 3098 if (broker != null && broker.changeConfig(config)) 3099 { 3100 restartService(); 3101 } 3102 } 3103 3104 /** 3105 * Applies a configuration change to the attributes which should be included 3106 * in the ECL. 3107 * 3108 * @param includeAttributes 3109 * attributes to be included with all change records. 3110 * @param includeAttributesForDeletes 3111 * additional attributes to be included with delete change records. 3112 */ 3113 public void changeConfig(Set<String> includeAttributes, 3114 Set<String> includeAttributesForDeletes) 3115 { 3116 final boolean attrsModified = setEclIncludes( 3117 getServerId(), includeAttributes, includeAttributesForDeletes); 3118 if (attrsModified && broker != null) 3119 { 3120 restartService(); 3121 } 3122 } 3123 3124 private void restartService() 3125 { 3126 disableService(); 3127 enableService(); 3128 } 3129 3130 /** 3131 * This method should trigger an export of the replicated data. 3132 * to the provided outputStream. 3133 * When finished the outputStream should be flushed and closed. 3134 * 3135 * @param output The OutputStream where the export should 3136 * be produced. 3137 * @throws DirectoryException When needed. 3138 */ 3139 protected abstract void exportBackend(OutputStream output) 3140 throws DirectoryException; 3141 3142 /** 3143 * This method should trigger an import of the replicated data. 3144 * 3145 * @param input The InputStream from which 3146 * the import should be reading entries. 3147 * 3148 * @throws DirectoryException When needed. 3149 */ 3150 protected abstract void importBackend(InputStream input) 3151 throws DirectoryException; 3152 3153 /** 3154 * This method should return the total number of objects in the 3155 * replicated domain. 3156 * This count will be used for reporting. 3157 * 3158 * @throws DirectoryException when needed. 3159 * 3160 * @return The number of objects in the replication domain. 3161 */ 3162 public abstract long countEntries() throws DirectoryException; 3163 3164 3165 3166 /** 3167 * This method should handle the processing of {@link UpdateMsg} receive from 3168 * remote replication entities. 3169 * <p> 3170 * This method will be called by a single thread and should therefore should 3171 * not be blocking. 3172 * 3173 * @param updateMsg 3174 * The {@link UpdateMsg} that was received. 3175 * @return A boolean indicating if the processing is completed at return time. 3176 * If <code> true </code> is returned, no further processing is 3177 * necessary. If <code> false </code> is returned, the subclass should 3178 * call the method {@link #processUpdateDone(UpdateMsg, String)} and 3179 * update the ServerState When this processing is complete. 3180 */ 3181 public abstract boolean processUpdate(UpdateMsg updateMsg); 3182 3183 /** 3184 * This method must be called after each call to 3185 * {@link #processUpdate(UpdateMsg)} when the processing of the 3186 * update is completed. 3187 * <p> 3188 * It is useful for implementation needing to process the update in an 3189 * asynchronous way or using several threads, but must be called even by 3190 * implementation doing it in a synchronous, single-threaded way. 3191 * 3192 * @param msg 3193 * The UpdateMsg whose processing was completed. 3194 * @param replayErrorMsg 3195 * if not null, this means an error occurred during the replay of 3196 * this update, and this is the matching human readable message 3197 * describing the problem. 3198 */ 3199 protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg) 3200 { 3201 broker.updateWindowAfterReplay(); 3202 3203 /* 3204 Send an ack if it was requested and the group id is the same of the RS 3205 one. Only Safe Read mode makes sense in DS for returning an ack. 3206 */ 3207 // Assured feature is supported starting from replication protocol V2 3208 if (msg.isAssured() 3209 && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2) 3210 { 3211 if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) 3212 { 3213 if (broker.getRsGroupId() == getGroupId()) 3214 { 3215 // Send the ack 3216 AckMsg ackMsg = new AckMsg(msg.getCSN()); 3217 if (replayErrorMsg != null) 3218 { 3219 // Mark the error in the ack 3220 // -> replay error occurred 3221 ackMsg.setHasReplayError(true); 3222 // -> replay error occurred in our server 3223 ackMsg.setFailedServers(newArrayList(getServerId())); 3224 } 3225 broker.publish(ackMsg); 3226 if (replayErrorMsg != null) 3227 { 3228 assuredSrReceivedUpdatesNotAcked.incrementAndGet(); 3229 } 3230 else 3231 { 3232 assuredSrReceivedUpdatesAcked.incrementAndGet(); 3233 } 3234 } 3235 } 3236 else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE) 3237 { 3238 logger.error(ERR_DS_UNKNOWN_ASSURED_MODE, getServerId(), msg.getAssuredMode(), getBaseDN(), msg); 3239 } 3240 // Nothing to do in Assured safe data mode, only RS ack updates. 3241 } 3242 3243 incProcessedUpdates(); 3244 } 3245 3246 /** 3247 * Prepare a message if it is to be sent in assured mode. 3248 * If the assured mode is enabled, this method should be called before 3249 * publish(UpdateMsg msg) method. This will configure the update accordingly 3250 * before it is sent and will prepare the mechanism that will block until the 3251 * matching ack is received. To wait for the ack after publish call, use 3252 * the waitForAckIfAssuredEnabled() method. 3253 * The expected typical usage in a service inheriting from this class is 3254 * the following sequence: 3255 * UpdateMsg msg = xxx; 3256 * prepareWaitForAckIfAssuredEnabled(msg); 3257 * publish(msg); 3258 * waitForAckIfAssuredEnabled(msg); 3259 * 3260 * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have 3261 * no effect if assured replication is disabled. 3262 * Note: this mechanism should not be used if using publish(byte[] msg) 3263 * version as usage of these methods is already hidden inside. 3264 * 3265 * @param msg The update message to be sent soon. 3266 */ 3267 protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg) 3268 { 3269 /* 3270 * If assured configured, set message accordingly to request an ack in the 3271 * right assured mode. 3272 * No ack requested for a RS with a different group id. 3273 * Assured replication supported for the same locality, 3274 * i.e: a topology working in the same geographical location). 3275 * If we are connected to a RS which is not in our locality, 3276 * no need to ask for an ack. 3277 */ 3278 if (needsAck()) 3279 { 3280 msg.setAssured(true); 3281 msg.setAssuredMode(getAssuredMode()); 3282 if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE) 3283 { 3284 msg.setSafeDataLevel(getAssuredSdLevel()); 3285 } 3286 3287 // Add the assured message to the list of update that are waiting for acks 3288 waitingAckMsgs.put(msg.getCSN(), msg); 3289 } 3290 } 3291 3292 private boolean needsAck() 3293 { 3294 return isAssured() && broker.getRsGroupId() == getGroupId(); 3295 } 3296 3297 /** 3298 * Wait for the processing of an assured message after it has been sent, if 3299 * assured replication is configured, otherwise, do nothing. 3300 * The prepareWaitForAckIfAssuredEnabled method should have been called 3301 * before, see its comment for the full picture. 3302 * 3303 * @param msg The UpdateMsg for which we are waiting for an ack. 3304 * @throws TimeoutException When the configured timeout occurs waiting for the 3305 * ack. 3306 */ 3307 protected void waitForAckIfAssuredEnabled(UpdateMsg msg) 3308 throws TimeoutException 3309 { 3310 if (needsAck()) 3311 { 3312 // Increment assured replication monitoring counters 3313 switch (getAssuredMode()) 3314 { 3315 case SAFE_READ_MODE: 3316 assuredSrSentUpdates.incrementAndGet(); 3317 break; 3318 case SAFE_DATA_MODE: 3319 assuredSdSentUpdates.incrementAndGet(); 3320 break; 3321 default: 3322 // Should not happen 3323 } 3324 } else 3325 { 3326 // Not assured or bad group id, return immediately 3327 return; 3328 } 3329 3330 // Wait for the ack to be received, timing out if necessary 3331 long startTime = System.currentTimeMillis(); 3332 synchronized (msg) 3333 { 3334 CSN csn = msg.getCSN(); 3335 while (waitingAckMsgs.containsKey(csn)) 3336 { 3337 try 3338 { 3339 /* 3340 WARNING: this timeout may be difficult to optimize: too low, it 3341 may use too much CPU, too high, it may penalize performance... 3342 */ 3343 msg.wait(10); 3344 } catch (InterruptedException e) 3345 { 3346 if (logger.isTraceEnabled()) 3347 { 3348 logger.trace("waitForAck method interrupted for replication " + 3349 "baseDN: " + getBaseDN()); 3350 } 3351 break; 3352 } 3353 // Timeout ? 3354 if (System.currentTimeMillis() - startTime >= getAssuredTimeout()) 3355 { 3356 /* 3357 Timeout occurred, be sure that ack is not being received and if so, 3358 remove the update from the wait list, log the timeout error and 3359 also update assured monitoring counters 3360 */ 3361 final UpdateMsg update = waitingAckMsgs.remove(csn); 3362 if (update == null) 3363 { 3364 // Ack received just before timeout limit: we can exit 3365 break; 3366 } 3367 3368 // No luck, this is a real timeout 3369 // Increment assured replication monitoring counters 3370 switch (msg.getAssuredMode()) 3371 { 3372 case SAFE_READ_MODE: 3373 assuredSrNotAcknowledgedUpdates.incrementAndGet(); 3374 assuredSrTimeoutUpdates.incrementAndGet(); 3375 // Increment number of errors for our RS 3376 updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates, 3377 broker.getRsServerId()); 3378 break; 3379 case SAFE_DATA_MODE: 3380 assuredSdTimeoutUpdates.incrementAndGet(); 3381 // Increment number of errors for our RS 3382 updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates, 3383 broker.getRsServerId()); 3384 break; 3385 default: 3386 // Should not happen 3387 } 3388 3389 throw new TimeoutException("No ack received for message csn: " + csn 3390 + " and replication domain: " + getBaseDN() + " after " 3391 + getAssuredTimeout() + " ms."); 3392 } 3393 } 3394 } 3395 } 3396 3397 /** 3398 * Publish an {@link UpdateMsg} to the Replication Service. 3399 * <p> 3400 * The Replication Service will handle the delivery of this {@link UpdateMsg} 3401 * to all the participants of this Replication Domain. These members will be 3402 * receive this {@link UpdateMsg} through a call of the 3403 * {@link #processUpdate(UpdateMsg)} message. 3404 * 3405 * @param msg The UpdateMsg that should be published. 3406 */ 3407 public void publish(UpdateMsg msg) 3408 { 3409 broker.publish(msg); 3410 if (msg.contributesToDomainState()) 3411 { 3412 state.update(msg.getCSN()); 3413 } 3414 numSentUpdates.incrementAndGet(); 3415 } 3416 3417 /** 3418 * Publishes a replica offline message if all pending changes for current 3419 * replica have been sent out. 3420 */ 3421 public void publishReplicaOfflineMsg() 3422 { 3423 // Here to be overridden 3424 } 3425 3426 /** 3427 * This method should return the generationID to use for this 3428 * ReplicationDomain. 3429 * This method can be called at any time after the ReplicationDomain 3430 * has been started. 3431 * 3432 * @return The GenerationID. 3433 */ 3434 public long getGenerationID() 3435 { 3436 return generationId; 3437 } 3438 3439 /** 3440 * Sets the generationId for this replication domain. 3441 * 3442 * @param generationId 3443 * the generationId to set 3444 */ 3445 public void setGenerationID(long generationId) 3446 { 3447 this.generationId = generationId; 3448 } 3449 3450 /** 3451 * Subclasses should use this method to add additional monitoring information 3452 * in the ReplicationDomain. 3453 * 3454 * @param monitorData where to additional monitoring attributes 3455 */ 3456 public void addAdditionalMonitoring(MonitorData monitorData) 3457 { 3458 } 3459 3460 /** 3461 * Returns the Import/Export context associated to this ReplicationDomain. 3462 * 3463 * @return the Import/Export context associated to this ReplicationDomain 3464 */ 3465 protected ImportExportContext getImportExportContext() 3466 { 3467 return importExportContext.get(); 3468 } 3469 3470 /** 3471 * Returns the local address of this replication domain, or the empty string 3472 * if it is not yet connected. 3473 * 3474 * @return The local address. 3475 */ 3476 String getLocalUrl() 3477 { 3478 final ReplicationBroker tmp = broker; 3479 return tmp != null ? tmp.getLocalUrl() : ""; 3480 } 3481 3482 /** 3483 * Set the attributes configured on a server to be included in the ECL. 3484 * 3485 * @param serverId 3486 * Server where these attributes are configured. 3487 * @param includeAttributes 3488 * Attributes to be included with all change records, may include 3489 * wild-cards. 3490 * @param includeAttributesForDeletes 3491 * Additional attributes to be included with delete change records, 3492 * may include wild-cards. 3493 * @return {@code true} if the set of attributes was modified. 3494 */ 3495 public boolean setEclIncludes(int serverId, 3496 Set<String> includeAttributes, 3497 Set<String> includeAttributesForDeletes) 3498 { 3499 ECLIncludes current; 3500 ECLIncludes updated; 3501 do 3502 { 3503 current = this.eclIncludes.get(); 3504 updated = current.addIncludedAttributes( 3505 serverId, includeAttributes, includeAttributesForDeletes); 3506 } 3507 while (!this.eclIncludes.compareAndSet(current, updated)); 3508 return current != updated; 3509 } 3510 3511 3512 3513 /** 3514 * Get the attributes to include in each change for the ECL. 3515 * 3516 * @return The attributes to include in each change for the ECL. 3517 */ 3518 public Set<String> getEclIncludes() 3519 { 3520 return eclIncludes.get().includedAttrsAllServers; 3521 } 3522 3523 3524 3525 /** 3526 * Get the attributes to include in each delete change for the ECL. 3527 * 3528 * @return The attributes to include in each delete change for the ECL. 3529 */ 3530 public Set<String> getEclIncludesForDeletes() 3531 { 3532 return eclIncludes.get().includedAttrsForDeletesAllServers; 3533 } 3534 3535 3536 3537 /** 3538 * Get the attributes to include in each change for the ECL for a given 3539 * serverId. 3540 * 3541 * @param serverId 3542 * The serverId for which we want the include attributes. 3543 * @return The attributes. 3544 */ 3545 Set<String> getEclIncludes(int serverId) 3546 { 3547 return eclIncludes.get().includedAttrsByServer.get(serverId); 3548 } 3549 3550 3551 3552 /** 3553 * Get the attributes to include in each change for the ECL for a given 3554 * serverId. 3555 * 3556 * @param serverId 3557 * The serverId for which we want the include attributes. 3558 * @return The attributes. 3559 */ 3560 Set<String> getEclIncludesForDeletes(int serverId) 3561 { 3562 return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId); 3563 } 3564 3565 /** 3566 * Returns the CSN of the last Change that was fully processed by this 3567 * ReplicationDomain. 3568 * 3569 * @return The CSN of the last Change that was fully processed by this 3570 * ReplicationDomain. 3571 */ 3572 public CSN getLastLocalChange() 3573 { 3574 return state.getCSN(getServerId()); 3575 } 3576 3577 /** 3578 * Gets and stores the assured replication configuration parameters. Returns a 3579 * boolean indicating if the passed configuration has changed compared to 3580 * previous values and the changes require a reconnection. 3581 * 3582 * @param config 3583 * The configuration object 3584 * @param allowReconnection 3585 * Tells if one must reconnect if significant changes occurred 3586 */ 3587 protected void readAssuredConfig(ReplicationDomainCfg config, 3588 boolean allowReconnection) 3589 { 3590 // Disconnect if required: changing configuration values before 3591 // disconnection would make assured replication used immediately and 3592 // disconnection could cause some timeouts error. 3593 if (needReconnection(config) && allowReconnection) 3594 { 3595 disableService(); 3596 3597 assuredConfig = config; 3598 3599 enableService(); 3600 } 3601 } 3602 3603 private boolean needReconnection(ReplicationDomainCfg cfg) 3604 { 3605 final AssuredMode assuredMode = getAssuredMode(); 3606 switch (cfg.getAssuredType()) 3607 { 3608 case NOT_ASSURED: 3609 if (isAssured()) 3610 { 3611 return true; 3612 } 3613 break; 3614 case SAFE_DATA: 3615 if (!isAssured() || assuredMode == SAFE_READ_MODE) 3616 { 3617 return true; 3618 } 3619 break; 3620 case SAFE_READ: 3621 if (!isAssured() || assuredMode == SAFE_DATA_MODE) 3622 { 3623 return true; 3624 } 3625 break; 3626 } 3627 3628 return isAssured() 3629 && assuredMode == SAFE_DATA_MODE 3630 && cfg.getAssuredSdLevel() != getAssuredSdLevel(); 3631 } 3632 3633 /** {@inheritDoc} */ 3634 @Override 3635 public String toString() 3636 { 3637 return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId(); 3638 } 3639}