001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2017 ForgeRock AS. 016 */ 017package org.opends.server.replication.service; 018 019import java.io.IOException; 020import java.math.BigDecimal; 021import java.math.MathContext; 022import java.math.RoundingMode; 023import java.net.*; 024import java.util.*; 025import java.util.Map.Entry; 026import java.util.concurrent.ConcurrentSkipListMap; 027import java.util.concurrent.Semaphore; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.atomic.AtomicReference; 031 032import net.jcip.annotations.GuardedBy; 033import net.jcip.annotations.Immutable; 034 035import org.forgerock.i18n.LocalizableMessage; 036import org.forgerock.i18n.slf4j.LocalizedLogger; 037import org.forgerock.opendj.ldap.DN; 038import org.forgerock.util.Utils; 039import org.forgerock.opendj.server.config.server.ReplicationDomainCfg; 040import org.opends.server.core.DirectoryServer; 041import org.opends.server.replication.common.*; 042import org.opends.server.replication.plugin.MultimasterReplication; 043import org.opends.server.replication.plugin.MultimasterReplication.UnreachableReplicationServers; 044import 
org.opends.server.replication.protocol.ChangeStatusMsg; 045import org.opends.server.replication.protocol.MonitorMsg; 046import org.opends.server.replication.protocol.MonitorRequestMsg; 047import org.opends.server.replication.protocol.ProtocolVersion; 048import org.opends.server.replication.protocol.ReplServerStartDSMsg; 049import org.opends.server.replication.protocol.ReplServerStartMsg; 050import org.opends.server.replication.protocol.ReplSessionSecurity; 051import org.opends.server.replication.protocol.ReplicaOfflineMsg; 052import org.opends.server.replication.protocol.ReplicationMsg; 053import org.opends.server.replication.protocol.ServerStartMsg; 054import org.opends.server.replication.protocol.Session; 055import org.opends.server.replication.protocol.StartMsg; 056import org.opends.server.replication.protocol.StartSessionMsg; 057import org.opends.server.replication.protocol.StopMsg; 058import org.opends.server.replication.protocol.TopologyMsg; 059import org.opends.server.replication.protocol.UpdateMsg; 060import org.opends.server.replication.protocol.WindowMsg; 061import org.opends.server.replication.protocol.WindowProbeMsg; 062import org.opends.server.types.HostPort; 063 064import static org.opends.messages.ReplicationMessages.*; 065import static org.opends.server.replication.plugin.MultimasterReplication.getUnreachableReplicationServers; 066import static org.opends.server.replication.protocol.ProtocolVersion.*; 067import static org.opends.server.replication.server.ReplicationServer.*; 068import static org.opends.server.util.StaticUtils.*; 069 070/** 071 * The broker for Multi-master Replication. 072 */ 073public class ReplicationBroker 074{ 075 076 /** 077 * Immutable class containing information about whether the broker is 078 * connected to an RS and data associated to this connected RS. 
079 */ 080 @Immutable 081 private static final class ConnectedRS 082 { 083 084 private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS( 085 NO_CONNECTED_SERVER); 086 087 /** The info of the RS we are connected to. */ 088 private final ReplicationServerInfo rsInfo; 089 /** Contains a connected session to the RS if any exist, null otherwise. */ 090 private final Session session; 091 private final String replicationServer; 092 093 private ConnectedRS(String replicationServer) 094 { 095 this.rsInfo = null; 096 this.session = null; 097 this.replicationServer = replicationServer; 098 } 099 100 private ConnectedRS(ReplicationServerInfo rsInfo, Session session) 101 { 102 this.rsInfo = rsInfo; 103 this.session = session; 104 this.replicationServer = session != null ? 105 session.getReadableRemoteAddress() 106 : NO_CONNECTED_SERVER; 107 } 108 109 private static ConnectedRS stopped() 110 { 111 return new ConnectedRS("stopped"); 112 } 113 114 private static ConnectedRS noConnectedRS() 115 { 116 return NO_CONNECTED_RS; 117 } 118 119 public int getServerId() 120 { 121 return rsInfo != null ? rsInfo.getServerId() : -1; 122 } 123 124 private byte getGroupId() 125 { 126 return rsInfo != null ? 
rsInfo.getGroupId() : -1; 127 } 128 129 private boolean isConnected() 130 { 131 return session != null; 132 } 133 134 /** {@inheritDoc} */ 135 @Override 136 public String toString() 137 { 138 final StringBuilder sb = new StringBuilder(); 139 toString(sb); 140 return sb.toString(); 141 } 142 143 public void toString(StringBuilder sb) 144 { 145 sb.append("connected=").append(isConnected()).append(", "); 146 if (!isConnected()) 147 { 148 sb.append("no connectedRS"); 149 } 150 else 151 { 152 sb.append("connectedRS(serverId=").append(rsInfo.getServerId()) 153 .append(", serverUrl=").append(rsInfo.getServerURL()) 154 .append(", groupId=").append(rsInfo.getGroupId()) 155 .append(")"); 156 } 157 } 158 159 } 160 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 161 private volatile boolean shutdown; 162 private final Object startStopLock = new Object(); 163 private volatile ReplicationDomainCfg config; 164 /** 165 * String reported under CSN=monitor when there is no connected RS. 166 */ 167 static final String NO_CONNECTED_SERVER = "Not connected"; 168 private final ServerState state; 169 private Semaphore sendWindow; 170 private int maxSendWindow; 171 private int rcvWindow = 100; 172 private int halfRcvWindow = rcvWindow / 2; 173 private int timeout; 174 private final ReplSessionSecurity replSessionSecurity; 175 /** 176 * The RS this DS is currently connected to. 177 * <p> 178 * Always use {@link #setConnectedRS(ConnectedRS)} to set a new 179 * connected RS. 180 */ 181 // @NotNull // for the reference 182 private final AtomicReference<ConnectedRS> connectedRS = new AtomicReference<>(ConnectedRS.noConnectedRS()); 183 /** Our replication domain. */ 184 private final ReplicationDomain domain; 185 /** 186 * This object is used as a conditional event to be notified about 187 * the reception of monitor information from the Replication Server. 
188 */ 189 private final AtomicBoolean monitorResponse = new AtomicBoolean(false); 190 /** 191 * A Map containing the ServerStates of all the replicas in the topology 192 * as seen by the ReplicationServer the last time it was polled or the last 193 * time it published monitoring information. 194 */ 195 private Map<Integer, ServerState> replicaStates = new HashMap<>(); 196 /** A thread to monitor heartbeats on the session. */ 197 private HeartbeatMonitor heartbeatMonitor; 198 /** The number of times the connection was lost. */ 199 private int numLostConnections; 200 /** 201 * When the broker cannot connect to any replication server 202 * it log an error and keeps continuing every second. 203 * This boolean is set when the first failure happens and is used 204 * to avoid repeating the error message for further failure to connect 205 * and to know that it is necessary to print a new message when the broker 206 * finally succeed to connect. 207 */ 208 private volatile boolean connectionError; 209 private final Object connectPhaseLock = new Object(); 210 /** 211 * The thread that publishes messages to the RS containing the current 212 * change time of this DS. 213 */ 214 private CTHeartbeatPublisherThread ctHeartbeatPublisherThread; 215 /* 216 * Properties for the last topology info received from the network. 217 */ 218 /** Contains the last known state of the replication topology. */ 219 private final AtomicReference<Topology> topology = new AtomicReference<>(new Topology()); 220 @GuardedBy("this") 221 private volatile int updateDoneCount; 222 private volatile boolean connectRequiresRecovery; 223 224 /** 225 * This integer defines when the best replication server checking algorithm 226 * should be engaged. 227 * Every time a monitoring message (each monitoring publisher period) is 228 * received, it is incremented. When it reaches 2, we run the checking 229 * algorithm to see if we must reconnect to another best replication server. 230 * Then we reset the value to 0. 
But when a topology message is received, the 231 * integer is reset to 0. This ensures that we wait at least one monitoring 232 * publisher period before running the algorithm, but also that we wait at 233 * least for a monitoring period after the last received topology message 234 * (topology stabilization). 235 */ 236 private int mustRunBestServerCheckingAlgorithm; 237 238 /** 239 * The monitor provider for this replication domain. 240 * <p> 241 * The name of the monitor includes the local address and must therefore be 242 * re-registered every time the session is re-established or destroyed. The 243 * monitor provider can only be created (i.e. non-null) if there is a 244 * replication domain, which is not the case in unit tests. 245 */ 246 private final ReplicationMonitor monitor; 247 248 /** 249 * Creates a new ReplicationServer Broker for a particular ReplicationDomain. 250 * 251 * @param replicationDomain The replication domain that is creating us. 252 * @param state The ServerState that should be used by this broker 253 * when negotiating the session with the replicationServer. 254 * @param config The configuration to use. 255 * @param replSessionSecurity The session security configuration. 256 */ 257 public ReplicationBroker(ReplicationDomain replicationDomain, 258 ServerState state, ReplicationDomainCfg config, 259 ReplSessionSecurity replSessionSecurity) 260 { 261 this.domain = replicationDomain; 262 this.state = state; 263 this.config = config; 264 this.replSessionSecurity = replSessionSecurity; 265 this.rcvWindow = getMaxRcvWindow(); 266 this.halfRcvWindow = rcvWindow / 2; 267 this.shutdown = true; 268 269 /* 270 * Only create a monitor if there is a replication domain (this is not the 271 * case in some unit tests). 272 */ 273 this.monitor = replicationDomain != null ? new ReplicationMonitor( 274 replicationDomain) : null; 275 registerReplicationMonitor(); 276 } 277 278 /** 279 * Start the ReplicationBroker. 
280 */ 281 public void start() 282 { 283 synchronized (startStopLock) 284 { 285 if (!shutdown) 286 { 287 return; 288 } 289 shutdown = false; 290 this.rcvWindow = getMaxRcvWindow(); 291 connectAsDataServer(); 292 } 293 } 294 295 /** 296 * Gets the group id of the RS we are connected to. 297 * @return The group id of the RS we are connected to 298 */ 299 public byte getRsGroupId() 300 { 301 return connectedRS.get().getGroupId(); 302 } 303 304 /** 305 * Gets the server id of the RS we are connected to. 306 * @return The server id of the RS we are connected to 307 */ 308 public int getRsServerId() 309 { 310 return connectedRS.get().getServerId(); 311 } 312 313 /** 314 * Gets the server id. 315 * @return The server id 316 */ 317 public int getServerId() 318 { 319 return config.getServerId(); 320 } 321 322 private DN getBaseDN() 323 { 324 return config.getBaseDN(); 325 } 326 327 private Set<String> getReplicationServerUrls() 328 { 329 return config.getReplicationServer(); 330 } 331 332 private byte getGroupId() 333 { 334 return (byte) config.getGroupId(); 335 } 336 337 /** 338 * Gets the server id. 339 * @return The server id 340 */ 341 private long getGenerationID() 342 { 343 return domain.getGenerationID(); 344 } 345 346 /** 347 * Set the generation id - for test purpose. 348 * @param generationID The generation id 349 */ 350 public void setGenerationID(long generationID) 351 { 352 domain.setGenerationID(generationID); 353 } 354 355 /** 356 * Compares 2 replication servers addresses and returns true if they both 357 * represent the same replication server instance. 358 * @param rs1Url Replication server 1 address 359 * @param rs2Url Replication server 2 address 360 * @return True if both replication server addresses represent the same 361 * replication server instance, false otherwise. 
362 */ 363 private static boolean isSameReplicationServerUrl(String rs1Url, 364 String rs2Url) 365 { 366 try 367 { 368 final HostPort hp1 = HostPort.valueOf(rs1Url); 369 final HostPort hp2 = HostPort.valueOf(rs2Url); 370 return hp1.isEquivalentTo(hp2); 371 } 372 catch (RuntimeException ex) 373 { 374 // Not a RS url or not a valid port number: should not happen 375 return false; 376 } 377 } 378 379 /** 380 * Bag class for keeping info we get from a replication server in order to 381 * compute the best one to connect to. This is in fact a wrapper to a 382 * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be 383 * updated with a info coming from received topology messages or monitoring 384 * messages. 385 */ 386 static class ReplicationServerInfo 387 { 388 private RSInfo rsInfo; 389 private final short protocolVersion; 390 private final DN baseDN; 391 private final int windowSize; 392 // @NotNull 393 private final ServerState serverState; 394 private final boolean sslEncryption; 395 private final int degradedStatusThreshold; 396 /** Keeps the 0 value if created with a ReplServerStartMsg. */ 397 private int connectedDSNumber; 398 // @NotNull 399 private Set<Integer> connectedDSs; 400 /** 401 * Is this RS locally configured? (the RS is recognized as a usable server). 402 */ 403 private boolean locallyConfigured = true; 404 405 /** 406 * Create a new instance of ReplicationServerInfo wrapping the passed 407 * message. 408 * @param msg LocalizableMessage to wrap. 409 * @param newServerURL Override serverURL. 410 * @return The new instance wrapping the passed message. 411 * @throws IllegalArgumentException If the passed message has an unexpected 412 * type. 
413 */ 414 private static ReplicationServerInfo newInstance( 415 ReplicationMsg msg, String newServerURL) throws IllegalArgumentException 416 { 417 final ReplicationServerInfo rsInfo = newInstance(msg); 418 rsInfo.setServerURL(newServerURL); 419 return rsInfo; 420 } 421 422 /** 423 * Create a new instance of ReplicationServerInfo wrapping the passed 424 * message. 425 * @param msg LocalizableMessage to wrap. 426 * @return The new instance wrapping the passed message. 427 * @throws IllegalArgumentException If the passed message has an unexpected 428 * type. 429 */ 430 static ReplicationServerInfo newInstance(ReplicationMsg msg) 431 throws IllegalArgumentException 432 { 433 if (msg instanceof ReplServerStartMsg) 434 { 435 // RS uses protocol V3 or lower 436 return new ReplicationServerInfo((ReplServerStartMsg) msg); 437 } 438 else if (msg instanceof ReplServerStartDSMsg) 439 { 440 // RS uses protocol V4 or higher 441 return new ReplicationServerInfo((ReplServerStartDSMsg) msg); 442 } 443 444 // Unsupported message type: should not happen 445 throw new IllegalArgumentException("Unexpected PDU type: " 446 + msg.getClass().getName() + ":\n" + msg); 447 } 448 449 /** 450 * Constructs a ReplicationServerInfo object wrapping a 451 * {@link ReplServerStartMsg}. 452 * 453 * @param msg 454 * The {@link ReplServerStartMsg} this object will wrap. 455 */ 456 private ReplicationServerInfo(ReplServerStartMsg msg) 457 { 458 this.protocolVersion = msg.getVersion(); 459 this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(), 460 msg.getGenerationId(), msg.getGroupId(), 1); 461 this.baseDN = msg.getBaseDN(); 462 this.windowSize = msg.getWindowSize(); 463 final ServerState ss = msg.getServerState(); 464 this.serverState = ss != null ? 
ss : new ServerState(); 465 this.sslEncryption = msg.getSSLEncryption(); 466 this.degradedStatusThreshold = msg.getDegradedStatusThreshold(); 467 } 468 469 /** 470 * Constructs a ReplicationServerInfo object wrapping a 471 * {@link ReplServerStartDSMsg}. 472 * 473 * @param msg 474 * The {@link ReplServerStartDSMsg} this object will wrap. 475 */ 476 private ReplicationServerInfo(ReplServerStartDSMsg msg) 477 { 478 this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(), 479 msg.getGenerationId(), msg.getGroupId(), msg.getWeight()); 480 this.protocolVersion = msg.getVersion(); 481 this.baseDN = msg.getBaseDN(); 482 this.windowSize = msg.getWindowSize(); 483 final ServerState ss = msg.getServerState(); 484 this.serverState = ss != null ? ss : new ServerState(); 485 this.sslEncryption = msg.getSSLEncryption(); 486 this.degradedStatusThreshold = msg.getDegradedStatusThreshold(); 487 this.connectedDSNumber = msg.getConnectedDSNumber(); 488 } 489 490 /** 491 * Constructs a new replication server info with the passed RSInfo internal 492 * values and the passed connected DSs. 493 * 494 * @param rsInfo 495 * The RSinfo to use for the update 496 * @param connectedDSs 497 * The new connected DSs 498 */ 499 ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs) 500 { 501 this.rsInfo = 502 new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(), rsInfo 503 .getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); 504 this.protocolVersion = 0; 505 this.baseDN = null; 506 this.windowSize = 0; 507 this.connectedDSs = connectedDSs; 508 this.connectedDSNumber = connectedDSs.size(); 509 this.sslEncryption = false; 510 this.degradedStatusThreshold = -1; 511 this.serverState = new ServerState(); 512 } 513 514 /** 515 * Get the server state. 516 * @return The server state 517 */ 518 public ServerState getServerState() 519 { 520 return serverState; 521 } 522 523 /** 524 * Get the group id. 
525 * @return The group id 526 */ 527 public byte getGroupId() 528 { 529 return rsInfo.getGroupId(); 530 } 531 532 /** 533 * Get the server protocol version. 534 * @return the protocolVersion 535 */ 536 public short getProtocolVersion() 537 { 538 return protocolVersion; 539 } 540 541 /** 542 * Get the generation id. 543 * @return the generationId 544 */ 545 public long getGenerationId() 546 { 547 return rsInfo.getGenerationId(); 548 } 549 550 /** 551 * Get the server id. 552 * @return the serverId 553 */ 554 public int getServerId() 555 { 556 return rsInfo.getId(); 557 } 558 559 /** 560 * Get the server URL. 561 * @return the serverURL 562 */ 563 public String getServerURL() 564 { 565 return rsInfo.getServerUrl(); 566 } 567 568 /** 569 * Get the base DN. 570 * 571 * @return the base DN 572 */ 573 public DN getBaseDN() 574 { 575 return baseDN; 576 } 577 578 /** 579 * Get the window size. 580 * @return the windowSize 581 */ 582 public int getWindowSize() 583 { 584 return windowSize; 585 } 586 587 /** 588 * Get the ssl encryption. 589 * @return the sslEncryption 590 */ 591 public boolean isSslEncryption() 592 { 593 return sslEncryption; 594 } 595 596 /** 597 * Get the degraded status threshold. 598 * @return the degradedStatusThreshold 599 */ 600 public int getDegradedStatusThreshold() 601 { 602 return degradedStatusThreshold; 603 } 604 605 /** 606 * Get the weight. 607 * @return the weight. Null if this object is a wrapper for 608 * a ReplServerStartMsg. 609 */ 610 public int getWeight() 611 { 612 return rsInfo.getWeight(); 613 } 614 615 /** 616 * Get the connected DS number. 617 * @return the connectedDSNumber. Null if this object is a wrapper for 618 * a ReplServerStartMsg. 619 */ 620 public int getConnectedDSNumber() 621 { 622 return connectedDSNumber; 623 } 624 625 /** 626 * Converts the object to a RSInfo object. 627 * @return The RSInfo object matching this object. 
628 */ 629 RSInfo toRSInfo() 630 { 631 return rsInfo; 632 } 633 634 /** 635 * Updates replication server info with the passed RSInfo internal values 636 * and the passed connected DSs. 637 * @param rsInfo The RSinfo to use for the update 638 * @param connectedDSs The new connected DSs 639 */ 640 private void update(RSInfo rsInfo, Set<Integer> connectedDSs) 641 { 642 this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(), 643 rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); 644 this.connectedDSs = connectedDSs; 645 this.connectedDSNumber = connectedDSs.size(); 646 } 647 648 private void setServerURL(String newServerURL) 649 { 650 rsInfo = new RSInfo(rsInfo.getId(), newServerURL, 651 rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); 652 } 653 654 /** 655 * Updates replication server info with the passed server state. 656 * @param serverState The ServerState to use for the update 657 */ 658 private void update(ServerState serverState) 659 { 660 this.serverState.update(serverState); 661 } 662 663 /** 664 * Get the getConnectedDSs. 665 * @return the getConnectedDSs 666 */ 667 public Set<Integer> getConnectedDSs() 668 { 669 return connectedDSs; 670 } 671 672 /** 673 * Gets the locally configured status for this RS. 674 * @return the locallyConfigured 675 */ 676 public boolean isLocallyConfigured() 677 { 678 return locallyConfigured; 679 } 680 681 /** 682 * Sets the locally configured status for this RS. 683 * @param locallyConfigured the locallyConfigured to set 684 */ 685 public void setLocallyConfigured(boolean locallyConfigured) 686 { 687 this.locallyConfigured = locallyConfigured; 688 } 689 690 /** 691 * Returns a string representation of this object. 692 * @return A string representation of this object. 
693 */ 694 @Override 695 public String toString() 696 { 697 return "ReplServerInfo Url:" + getServerURL() 698 + " ServerId:" + getServerId() 699 + " GroupId:" + getGroupId() 700 + " connectedDSs:" + connectedDSs; 701 } 702 } 703 704 /** 705 * Contacts all replication servers to get information from them and being 706 * able to choose the more suitable. 707 * @return the collected information. 708 */ 709 private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo() 710 { 711 final Map<Integer, ReplicationServerInfo> rsInfos = new ConcurrentSkipListMap<>(); 712 713 final UnreachableReplicationServers unreachableReplicationServers = getUnreachableReplicationServers(); 714 for (String serverUrl : getReplicationServerUrls()) 715 { 716 if (unreachableReplicationServers.isUnreachable(HostPort.valueOf(serverUrl))) 717 { 718 continue; 719 } 720 // Connect to server + get and store info about it 721 final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false); 722 final ReplicationServerInfo rsInfo = rs.rsInfo; 723 if (rsInfo != null) 724 { 725 rsInfos.put(rsInfo.getServerId(), rsInfo); 726 } 727 } 728 729 return rsInfos; 730 } 731 732 /** 733 * Connect to a ReplicationServer. 734 * 735 * Handshake sequences between a DS and a RS is divided into 2 logical 736 * consecutive phases (phase 1 and phase 2). DS always initiates connection 737 * and always sends first message: 738 * 739 * DS<->RS: 740 * ------- 741 * 742 * phase 1: 743 * DS --- ServerStartMsg ---> RS 744 * DS <--- ReplServerStartDSMsg --- RS 745 * phase 2: 746 * DS --- StartSessionMsg ---> RS 747 * DS <--- TopologyMsg --- RS 748 * 749 * Before performing a full handshake sequence, DS searches for best suitable 750 * RS by making only phase 1 handshake to every RS he knows then closing 751 * connection. This allows to gather information on available RSs and then 752 * decide with which RS the full handshake (phase 1 then phase 2) will be 753 * finally performed. 
*
   * The outcome is published through setConnectedRS(...), which the
   * connection helpers called below use; this method reloads connectedRS
   * afterwards to log the result.
   *
   * @throws NumberFormatException documented as "address was invalid".
   *           NOTE(review): nothing in this method throws it directly;
   *           presumably it can surface from parsing a configured replication
   *           server URL (HostPort.valueOf) - confirm.
   */
  private void connectAsDataServer()
  {
    /*
     * If a first connect or a connection failure occur, we go through here.
     * force status machine to NOT_CONNECTED_STATUS so that monitoring can see
     * that we are not connected.
     */
    domain.toNotConnectedStatus();

    /*
    Stop any existing heartbeat monitor and changeTime publisher
    from a previous session.
    */
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    // Restart the best-server checking countdown for the new session.
    mustRunBestServerCheckingAlgorithm = 0;

    // The whole election/handshake runs while holding connectPhaseLock, so
    // the notify() calls below happen only once the outcome is known.
    synchronized (connectPhaseLock)
    {
      final int serverId = getServerId();
      final DN baseDN = getBaseDN();

      /*
       * Connect to each replication server and get their ServerState then find
       * out which one is the best to connect to.
       */
      if (logger.isTraceEnabled())
      {
        debugInfo("phase 1 : will perform PhaseOneH with each RS in order to elect the preferred one");
      }

      // Get info from every available replication servers
      Map<Integer, ReplicationServerInfo> rsInfos = collectReplicationServersInfo();
      computeNewTopology(toRSInfos(rsInfos));

      if (rsInfos.isEmpty())
      {
        // Nobody answered: explicitly record that no RS is connected.
        setConnectedRS(ConnectedRS.noConnectedRS());
      }
      else
      {
        // At least one server answered, find the best one.
        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
            rsInfos, serverId, getGroupId(), getGenerationID());

        // Best found, now initialize connection to this one (handshake phase 1)
        if (logger.isTraceEnabled())
        {
          debugInfo("phase 2 : will perform PhaseOneH with the preferred RS=" + evals.getBestRS());
        }

        // keepSession=true: this session is reused for handshake phase 2.
        final ConnectedRS electedRS = performPhaseOneHandshake(
            evals.getBestRS().getServerURL(), true);
        final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
        if (electedRsInfo != null)
        {
          /*
          Update replication server info with potentially more up to date
          data (server state for instance may have changed)
          */
          rsInfos.put(electedRsInfo.getServerId(), electedRsInfo);

          // Handshake phase 1 exchange went well

          // Compute in which status we are starting the session to tell the RS
          final ServerStatus initStatus = computeInitialServerStatus(
              electedRsInfo.getGenerationId(), electedRsInfo.getServerState(),
              electedRsInfo.getDegradedStatusThreshold(), getGenerationID());

          // Perform session start (handshake phase 2)
          final TopologyMsg topologyMsg =
              performPhaseTwoHandshake(electedRS, initStatus);

          if (topologyMsg != null) // Handshake phase 2 exchange went well
          {
            connectToReplicationServer(electedRS, initStatus, topologyMsg);
          } // Was able to perform handshake phase 2 with best
        } // Was able to perform handshake phase 1 with best
      }

      // connectedRS has been updated by calls above, reload it
      final ConnectedRS rs = connectedRS.get();
      if (rs.isConnected())
      {
        // Wakes one thread waiting on connectPhaseLock, if any.
        // NOTE(review): the waiting side is not visible in this chunk; confirm
        // notify() (rather than notifyAll()) matches the number of waiters.
        connectPhaseLock.notify();

        final long rsGenId = rs.rsInfo.getGenerationId();
        final int rsServerId = rs.rsInfo.getServerId();
        // An RS generation id of -1 is treated like a matching one.
        if (rsGenId == getGenerationID() || rsGenId == -1)
        {
          logger.info(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG, serverId, rsServerId, baseDN,
              rs.replicationServer, getGenerationID());
        }
        else
        {
          logger.warn(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG, serverId, rsServerId, baseDN,
              rs.replicationServer, getGenerationID(), rsGenId);
        }
      }
      else
      {
        // This server could not find any replicationServer.
        // It's going to start in degraded mode. Log a message.
        // Only the first failure in a row is logged and notified; the flag is
        // cleared again by connectToReplicationServer on a successful connect.
        if (!connectionError)
        {
          connectionError = true;
          connectPhaseLock.notify();

          if (!rsInfos.isEmpty())
          {
            // Some RSs answered phase 1 but no connection could be completed.
            logger.warn(WARN_COULD_NOT_FIND_CHANGELOG, serverId, baseDN,
                Utils.joinAsString(", ", rsInfos.keySet()));
          }
          else
          {
            logger.warn(WARN_NO_AVAILABLE_CHANGELOGS, serverId, baseDN);
          }
        }
      }
    }
  }

  /**
   * Atomically replaces the current topology with one built from the previous
   * topology's replica infos and the given RS infos. The compareAndSet loop
   * retries if another thread replaced the topology concurrently.
   *
   * @param newRSInfos the RS infos of the new topology
   */
  private void computeNewTopology(List<RSInfo> newRSInfos)
  {
    final int rsServerId = getRsServerId();

    Topology oldTopo;
    Topology newTopo;
    do
    {
      oldTopo = topology.get();
      newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(),
          rsServerId, getReplicationServerUrls(), oldTopo.rsInfos);
    }
    while (!topology.compareAndSet(oldTopo, newTopo));

    if (logger.isTraceEnabled())
    {
      debugInfo(topologyChange(rsServerId, oldTopo, newTopo));
    }
  }

  /**
   * Builds a trace message describing a topology change. When both topologies
   * are equal, only the unchanged topology is printed.
   *
   * @param rsServerId the server id of the connected RS
   * @param oldTopo the previous topology
   * @param newTopo the new topology
   * @return the description
   */
  private StringBuilder topologyChange(int rsServerId, Topology oldTopo,
      Topology newTopo)
  {
    final StringBuilder sb = new StringBuilder();
    sb.append("rsServerId=").append(rsServerId);
    if (newTopo.equals(oldTopo))
    {
      sb.append(", unchangedTopology=").append(newTopo);
    }
    else
    {
      sb.append(", oldTopology=").append(oldTopo);
      sb.append(", newTopology=").append(newTopo);
    }
    return sb;
  }

  /**
   * Connects to a replication server whose two handshake phases succeeded:
   * applies the received topology, resets the send/receive windows, notifies
   * the domain and starts the heartbeat threads. If anything fails, the
   * connected RS is reset to "not connected".
*
   * @param rs
   *          the Replication Server to connect to
   * @param initStatus
   *          The status to enter the state machine with
   * @param topologyMsg
   *          the message containing the topology information
   */
  private void connectToReplicationServer(ConnectedRS rs,
      ServerStatus initStatus, TopologyMsg topologyMsg)
  {
    final DN baseDN = getBaseDN();
    final ReplicationServerInfo rsInfo = rs.rsInfo;

    // Set only when every step succeeded; checked in the finally clause.
    boolean connectCompleted = false;
    try
    {
      maxSendWindow = rsInfo.getWindowSize();

      receiveTopo(topologyMsg, rs.getServerId());

      /*
      Clear the connection error flag so that the next connection failure is
      reported again by connectAsDataServer() (no message is logged here).
      Then wake up all the threads that were waiting on the window
      of the previous connection.
      */
      connectionError = false;
      if (sendWindow != null)
      {
        /*
         * Fix (hack) for OPENDJ-401: we want to ensure that no threads holding
         * this semaphore will get blocked when they acquire it. However, we
         * also need to make sure that we don't overflow the semaphore by
         * releasing too many permits.
         */
        final int MAX_PERMITS = Integer.MAX_VALUE >>> 2;
        if (sendWindow.availablePermits() < MAX_PERMITS)
        {
          /*
           * At least 2^29 acquisitions would need to occur for this to be
           * insufficient. In addition, at least 2^30 releases would need to
           * occur for this to potentially overflow. Hopefully this is unlikely
           * to happen.
           */
          sendWindow.release(MAX_PERMITS);
        }
      }
      // Fresh windows for the new connection; the old semaphore is abandoned.
      // NOTE(review): halfRcvWindow is not recomputed here (the constructor
      // sets both) - confirm this is intended.
      sendWindow = new Semaphore(maxSendWindow);
      rcvWindow = getMaxRcvWindow();

      domain.sessionInitiated(initStatus, rsInfo.getServerState());

      final byte groupId = getGroupId();
      if (rs.getGroupId() != groupId)
      {
        /*
        Connected to replication server with wrong group id:
        warn user; the heartbeat monitor started below is expected to allow
        recovery when a server with the right group id shows up.
        */
        logger.warn(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID,
            groupId, rs.getServerId(), rsInfo.getServerURL(), rs.getGroupId(), baseDN, getServerId());
      }
      startRSHeartBeatMonitoring(rs);
      // The change time heartbeat is only published to RSs speaking V3 or later.
      if (rsInfo.getProtocolVersion() >=
          ProtocolVersion.REPLICATION_PROTOCOL_V3)
      {
        startChangeTimeHeartBeatPublishing(rs);
      }
      connectCompleted = true;
    }
    catch (Exception e)
    {
      // NOTE(review): the message key name suggests it was written for a
      // different failure; verify it reads sensibly for connection errors.
      logger.error(ERR_COMPUTING_FAKE_OPS, baseDN, rsInfo.getServerURL(),
          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
    }
    finally
    {
      // Any failure above leaves the broker explicitly not connected.
      if (!connectCompleted)
      {
        setConnectedRS(ConnectedRS.noConnectedRS());
      }
    }
  }

  /**
   * Determines the status we are starting with according to our state and the
   * RS state.
   *
   * @param rsGenId The generation id of the RS
   * @param rsState The server state of the RS
   * @param degradedStatusThreshold The degraded status threshold of the RS
   * @param dsGenId The local generation id
   * @return The initial status: NORMAL_STATUS if the RS has no generation id,
   *         BAD_GEN_ID_STATUS if the generation ids differ, otherwise
   *         DEGRADED_STATUS or NORMAL_STATUS depending on the threshold
   */
  private ServerStatus computeInitialServerStatus(long rsGenId,
      ServerState rsState, int degradedStatusThreshold, long dsGenId)
  {
    if (rsGenId == -1)
    {
      // RS has no generation id
      return ServerStatus.NORMAL_STATUS;
    }
    else if (rsGenId != dsGenId)
    {
      // DS and RS do not have same generation id
      return ServerStatus.BAD_GEN_ID_STATUS;
    }
    else
    {
      /*
      DS and RS have same generation id

      Determine if we are late or not to replay changes. RS uses a
      threshold value for pending changes to be replayed by a DS to
      determine if the DS is in normal status or in degraded status.
      Let's compare the local and remote server state using this threshold
      value to determine if we are late or not
      */

      int nChanges = ServerState.diffChanges(rsState, state);
      if (logger.isTraceEnabled())
      {
        debugInfo("computed " + nChanges + " changes late.");
      }

      /*
      Only a strictly positive threshold enables the degraded status: start
      degraded when we are at least that many changes late.
      */
      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
      {
        return ServerStatus.DEGRADED_STATUS;
      }
      // degradedStatusThreshold value of '0' (or below) means no degrading
      // system used (no threshold): force normal status
      return ServerStatus.NORMAL_STATUS;
    }
  }



  /**
   * Connect to the provided server performing the first phase handshake (start
   * messages exchange) and return the reply message from the replication
   * server, wrapped in a ReplicationServerInfo object.
   *
   * @param serverURL
   *          Server to connect to.
   * @param keepSession
   *          Do we keep session opened or not after handshake. Use true if want
   *          to perform handshake phase 2 with the same session and keep the
   *          session to create as the current one.
   * @return The answer from the server, wrapped in a ConnectedRS. Callers in
   *         this class read its rsInfo field without a null check, and treat a
   *         null rsInfo as "no usable answer". NOTE(review): confirm this
   *         method never returns null itself.
   */
  private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
  {
    Session newSession = null;
    Socket socket = null;
    boolean hasConnected = false;
    LocalizableMessage errorMessage = null;

    try
    {
      // Open a socket connection to the next candidate.
1086 socket = new Socket(); 1087 socket.setReceiveBufferSize(1000000); 1088 socket.setTcpNoDelay(true); 1089 if (config.getSourceAddress() != null) 1090 { 1091 InetSocketAddress local = new InetSocketAddress(config.getSourceAddress(), 0); 1092 socket.bind(local); 1093 } 1094 int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); 1095 socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS); 1096 newSession = replSessionSecurity.createClientSession(socket, timeoutMS); 1097 boolean isSslEncryption = replSessionSecurity.isSslEncryption(); 1098 1099 // Send our ServerStartMsg. 1100 final HostPort hp = new HostPort( 1101 socket.getLocalAddress().getHostName(), socket.getLocalPort()); 1102 final String url = hp.toString(); 1103 final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), 1104 getMaxRcvWindow(), config.getHeartbeatInterval(), state, 1105 getGenerationID(), isSslEncryption, getGroupId()); 1106 newSession.publish(serverStartMsg); 1107 1108 // Read the ReplServerStartMsg or ReplServerStartDSMsg that should 1109 // come back. 1110 ReplicationMsg msg = newSession.receive(); 1111 if (logger.isTraceEnabled()) 1112 { 1113 debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n" 1114 + msg); 1115 } 1116 1117 // Wrap received message in a server info object 1118 final ReplicationServerInfo replServerInfo = 1119 ReplicationServerInfo.newInstance(msg, serverURL); 1120 1121 // Sanity check 1122 final DN repDN = replServerInfo.getBaseDN(); 1123 if (!getBaseDN().equals(repDN)) 1124 { 1125 errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDN, getBaseDN()); 1126 return setConnectedRS(ConnectedRS.noConnectedRS()); 1127 } 1128 1129 /* 1130 * We have sent our own protocol version to the replication server. The 1131 * replication server will use the same one (or an older one if it is an 1132 * old replication server). 
1133 */ 1134 newSession.setProtocolVersion( 1135 getCompatibleVersion(replServerInfo.getProtocolVersion())); 1136 1137 if (!isSslEncryption) 1138 { 1139 newSession.stopEncryption(); 1140 } 1141 1142 hasConnected = true; 1143 1144 if (keepSession) 1145 { 1146 // cannot store it yet, 1147 // only store after a successful phase two handshake 1148 return new ConnectedRS(replServerInfo, newSession); 1149 } 1150 return new ConnectedRS(replServerInfo, null); 1151 } 1152 catch (ConnectException e) 1153 { 1154 logger.traceException(e); 1155 errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(), serverURL, getBaseDN()); 1156 } 1157 catch (SocketTimeoutException e) 1158 { 1159 logger.traceException(e); 1160 errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(), serverURL, getBaseDN()); 1161 } 1162 catch (Exception e) 1163 { 1164 logger.traceException(e); 1165 errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get( 1166 getServerId(), serverURL, getBaseDN(), stackTraceToSingleLineString(e)); 1167 } 1168 finally 1169 { 1170 if (!hasConnected || !keepSession) 1171 { 1172 close(newSession); 1173 close(socket); 1174 } 1175 1176 if (!hasConnected && errorMessage != null && !connectionError) 1177 { 1178 // There was no server waiting on this host:port 1179 // Log a notice and will try the next replicationServer in the list 1180 if (keepSession) // Log error message only for final connection 1181 { 1182 // log the error message only once to avoid overflowing the error log 1183 logger.error(errorMessage); 1184 } 1185 1186 logger.trace(errorMessage); 1187 } 1188 } 1189 return setConnectedRS(ConnectedRS.noConnectedRS()); 1190 } 1191 1192 /** 1193 * Performs the second phase handshake (send StartSessionMsg and receive 1194 * TopologyMsg messages exchange) and return the reply message from the 1195 * replication server. 1196 * 1197 * @param electedRS Server we are connecting with. 
1198 * @param initStatus The status we are starting with 1199 * @return The ReplServerStartMsg the server replied. Null if could not 1200 * get an answer. 1201 */ 1202 private TopologyMsg performPhaseTwoHandshake(ConnectedRS electedRS, 1203 ServerStatus initStatus) 1204 { 1205 try 1206 { 1207 // Send our StartSessionMsg. 1208 final StartSessionMsg startSessionMsg; 1209 startSessionMsg = new StartSessionMsg( 1210 initStatus, 1211 domain.getRefUrls(), 1212 domain.isAssured(), 1213 domain.getAssuredMode(), 1214 domain.getAssuredSdLevel()); 1215 startSessionMsg.setEclIncludes( 1216 domain.getEclIncludes(domain.getServerId()), 1217 domain.getEclIncludesForDeletes(domain.getServerId())); 1218 final Session session = electedRS.session; 1219 session.publish(startSessionMsg); 1220 1221 // Read the TopologyMsg that should come back. 1222 final TopologyMsg topologyMsg = (TopologyMsg) session.receive(); 1223 1224 if (logger.isTraceEnabled()) 1225 { 1226 debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg 1227 + "\nAND RECEIVED:\n" + topologyMsg); 1228 } 1229 1230 // Alright set the timeout to the desired value 1231 session.setSoTimeout(timeout); 1232 setConnectedRS(electedRS); 1233 return topologyMsg; 1234 } 1235 catch (Exception e) 1236 { 1237 logger.error(WARN_EXCEPTION_STARTING_SESSION_PHASE, 1238 getServerId(), electedRS.rsInfo.getServerURL(), getBaseDN(), stackTraceToSingleLineString(e)); 1239 1240 setConnectedRS(ConnectedRS.noConnectedRS()); 1241 return null; 1242 } 1243 } 1244 1245 /** 1246 * Class holding evaluation results for electing the best replication server 1247 * for the local directory server. 1248 */ 1249 static class RSEvaluations 1250 { 1251 private final int localServerId; 1252 private Map<Integer, ReplicationServerInfo> bestRSs; 1253 private final Map<Integer, LocalizableMessage> rsEvals = new HashMap<>(); 1254 1255 /** 1256 * Ctor. 
1257 * 1258 * @param localServerId 1259 * the serverId for the local directory server 1260 * @param rsInfos 1261 * a Map of serverId => {@link ReplicationServerInfo} with all the 1262 * candidate replication servers 1263 */ 1264 RSEvaluations(int localServerId, 1265 Map<Integer, ReplicationServerInfo> rsInfos) 1266 { 1267 this.localServerId = localServerId; 1268 this.bestRSs = rsInfos; 1269 } 1270 1271 private boolean keepBest(LocalEvaluation eval) 1272 { 1273 if (eval.hasAcceptedAny()) 1274 { 1275 bestRSs = eval.getAccepted(); 1276 rsEvals.putAll(eval.getRejected()); 1277 return true; 1278 } 1279 return false; 1280 } 1281 1282 /** 1283 * Sets the elected best replication server, rejecting all the other 1284 * replication servers with the supplied evaluation. 1285 * 1286 * @param bestRsId 1287 * the serverId of the elected replication server 1288 * @param rejectedRSsEval 1289 * the evaluation for all the rejected replication servers 1290 */ 1291 private void setBestRS(int bestRsId, LocalizableMessage rejectedRSsEval) 1292 { 1293 for (Iterator<Entry<Integer, ReplicationServerInfo>> it = 1294 this.bestRSs.entrySet().iterator(); it.hasNext();) 1295 { 1296 final Entry<Integer, ReplicationServerInfo> entry = it.next(); 1297 final Integer rsId = entry.getKey(); 1298 final ReplicationServerInfo rsInfo = entry.getValue(); 1299 if (rsInfo.getServerId() != bestRsId) 1300 { 1301 it.remove(); 1302 } 1303 rsEvals.put(rsId, rejectedRSsEval); 1304 } 1305 } 1306 1307 private void discardAll(LocalizableMessage eval) 1308 { 1309 for (Integer rsId : bestRSs.keySet()) 1310 { 1311 rsEvals.put(rsId, eval); 1312 } 1313 } 1314 1315 private boolean foundBestRS() 1316 { 1317 return bestRSs.size() == 1; 1318 } 1319 1320 /** 1321 * Returns the {@link ReplicationServerInfo} for the best replication 1322 * server. 
1323 * 1324 * @return the {@link ReplicationServerInfo} for the best replication server 1325 */ 1326 ReplicationServerInfo getBestRS() 1327 { 1328 if (foundBestRS()) 1329 { 1330 return bestRSs.values().iterator().next(); 1331 } 1332 return null; 1333 } 1334 1335 /** 1336 * Returns the evaluations for all the candidate replication servers. 1337 * 1338 * @return a Map of serverId => LocalizableMessage containing the evaluation for each 1339 * candidate replication servers. 1340 */ 1341 Map<Integer, LocalizableMessage> getEvaluations() 1342 { 1343 if (foundBestRS()) 1344 { 1345 final Integer bestRSServerId = getBestRS().getServerId(); 1346 if (rsEvals.get(bestRSServerId) == null) 1347 { 1348 final LocalizableMessage eval = NOTE_BEST_RS.get(bestRSServerId, localServerId); 1349 rsEvals.put(bestRSServerId, eval); 1350 } 1351 } 1352 return Collections.unmodifiableMap(rsEvals); 1353 } 1354 1355 /** 1356 * Returns the evaluation for the supplied replication server Id. 1357 * <p> 1358 * Note: "unknown RS" message is returned if the supplied replication server 1359 * was not part of the candidate replication servers. 
1360 * 1361 * @param rsServerId 1362 * the supplied replication server Id 1363 * @return the evaluation {@link LocalizableMessage} for the supplied replication 1364 * server Id 1365 */ 1366 private LocalizableMessage getEvaluation(int rsServerId) 1367 { 1368 final LocalizableMessage evaluation = getEvaluations().get(rsServerId); 1369 if (evaluation != null) 1370 { 1371 return evaluation; 1372 } 1373 return NOTE_UNKNOWN_RS.get(rsServerId, localServerId); 1374 } 1375 1376 /** {@inheritDoc} */ 1377 @Override 1378 public String toString() 1379 { 1380 return "Current best replication server Ids: " + bestRSs.keySet() 1381 + ", Evaluation of connected replication servers" 1382 + " (ServerId => Evaluation): " + rsEvals.keySet() 1383 + ", Any replication server not appearing here" 1384 + " could not be contacted."; 1385 } 1386 } 1387 1388 /** 1389 * Evaluation local to one filter. 1390 */ 1391 private static class LocalEvaluation 1392 { 1393 private final Map<Integer, ReplicationServerInfo> accepted = new HashMap<>(); 1394 private final Map<ReplicationServerInfo, LocalizableMessage> rsEvals = new HashMap<>(); 1395 1396 private void accept(Integer rsId, ReplicationServerInfo rsInfo) 1397 { 1398 // forget previous eval, including undoing reject 1399 this.rsEvals.remove(rsInfo); 1400 this.accepted.put(rsId, rsInfo); 1401 } 1402 1403 private void reject(ReplicationServerInfo rsInfo, LocalizableMessage reason) 1404 { 1405 this.accepted.remove(rsInfo.getServerId()); // undo accept 1406 this.rsEvals.put(rsInfo, reason); 1407 } 1408 1409 private Map<Integer, ReplicationServerInfo> getAccepted() 1410 { 1411 return accepted; 1412 } 1413 1414 private ReplicationServerInfo[] getAcceptedRSInfos() 1415 { 1416 return accepted.values().toArray( 1417 new ReplicationServerInfo[accepted.size()]); 1418 } 1419 1420 public Map<Integer, LocalizableMessage> getRejected() 1421 { 1422 final Map<Integer, LocalizableMessage> result = new HashMap<>(); 1423 for (Entry<ReplicationServerInfo, 
LocalizableMessage> entry : rsEvals.entrySet()) 1424 { 1425 result.put(entry.getKey().getServerId(), entry.getValue()); 1426 } 1427 return result; 1428 } 1429 1430 private boolean hasAcceptedAny() 1431 { 1432 return !accepted.isEmpty(); 1433 } 1434 1435 } 1436 1437 /** 1438 * Returns the replication server that best fits our need so that we can 1439 * connect to it or determine if we must disconnect from current one to 1440 * re-connect to best server. 1441 * <p> 1442 * Note: this method is static for test purpose (access from unit tests) 1443 * 1444 * @param firstConnection True if we run this method for the very first 1445 * connection of the broker. False if we run this method to determine if the 1446 * replication server we are currently connected to is still the best or not. 1447 * @param rsServerId The id of the replication server we are currently 1448 * connected to. Only used when firstConnection is false. 1449 * @param myState The local server state. 1450 * @param rsInfos The list of available replication servers and their 1451 * associated information (choice will be made among them). 1452 * @param localServerId The server id for the suffix we are working for. 1453 * @param groupId The groupId we prefer being connected to if possible 1454 * @param generationId The generation id we are using 1455 * @return The computed best replication server. If the returned value is 1456 * null, the best replication server is undetermined but the local server must 1457 * disconnect (so the best replication server is another one than the current 1458 * one). Null can only be returned when firstConnection is false. 
1459 */ 1460 static RSEvaluations computeBestReplicationServer( 1461 boolean firstConnection, int rsServerId, ServerState myState, 1462 Map<Integer, ReplicationServerInfo> rsInfos, int localServerId, 1463 byte groupId, long generationId) 1464 { 1465 final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos); 1466 // Shortcut, if only one server, this is the best 1467 if (evals.foundBestRS()) 1468 { 1469 return evals; 1470 } 1471 1472 /** 1473 * Apply some filtering criteria to determine the best servers list from 1474 * the available ones. The ordered list of criteria is (from more important 1475 * to less important): 1476 * - replication server has the same group id as the local DS one 1477 * - replication server has the same generation id as the local DS one 1478 * - replication server is up to date regarding changes generated by the 1479 * local DS 1480 * - replication server in the same VM as local DS one 1481 */ 1482 /* 1483 The list of best replication servers is filtered with each criteria. At 1484 each criteria, the list is replaced with the filtered one if there 1485 are some servers from the filtering, otherwise, the list is left as is 1486 and the new filtering for the next criteria is applied and so on. 1487 1488 Use only servers locally configured: those are servers declared in 1489 the local configuration. When the current method is called, for 1490 sure, at least one server from the list is locally configured 1491 */ 1492 filterServersLocallyConfigured(evals, localServerId); 1493 // Some servers with same group id ? 1494 filterServersWithSameGroupId(evals, localServerId, groupId); 1495 // Some servers with same generation id ? 
1496 final boolean rssWithSameGenerationIdExist = 1497 filterServersWithSameGenerationId(evals, localServerId, generationId); 1498 if (rssWithSameGenerationIdExist) 1499 { 1500 // If some servers with the right generation id this is useful to 1501 // run the local DS change criteria 1502 filterServersWithAllLocalDSChanges(evals, myState, localServerId); 1503 } 1504 // Some servers in the local VM or local host? 1505 filterServersOnSameHost(evals, localServerId); 1506 1507 if (evals.foundBestRS()) 1508 { 1509 return evals; 1510 } 1511 1512 /** 1513 * Now apply the choice based on the weight to the best servers list 1514 */ 1515 if (firstConnection) 1516 { 1517 // We are not connected to a server yet 1518 computeBestServerForWeight(evals, -1, -1); 1519 } 1520 else 1521 { 1522 /* 1523 * We are already connected to a RS: compute the best RS as far as the 1524 * weights is concerned. If this is another one, some DS must disconnect. 1525 */ 1526 computeBestServerForWeight(evals, rsServerId, localServerId); 1527 } 1528 return evals; 1529 } 1530 1531 /** 1532 * Creates a new list that contains only replication servers that are locally 1533 * configured. 1534 * @param evals The evaluation object 1535 */ 1536 private static void filterServersLocallyConfigured(RSEvaluations evals, 1537 int localServerId) 1538 { 1539 final LocalEvaluation eval = new LocalEvaluation(); 1540 for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) 1541 { 1542 final Integer rsId = entry.getKey(); 1543 final ReplicationServerInfo rsInfo = entry.getValue(); 1544 if (rsInfo.isLocallyConfigured()) 1545 { 1546 eval.accept(rsId, rsInfo); 1547 } 1548 else 1549 { 1550 eval.reject(rsInfo, 1551 NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId)); 1552 } 1553 } 1554 evals.keepBest(eval); 1555 } 1556 1557 /** 1558 * Creates a new list that contains only replication servers that have the 1559 * passed group id, from a passed replication server list. 
1560 * @param evals The evaluation object 1561 * @param groupId The group id that must match 1562 */ 1563 private static void filterServersWithSameGroupId(RSEvaluations evals, 1564 int localServerId, byte groupId) 1565 { 1566 final LocalEvaluation eval = new LocalEvaluation(); 1567 for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) 1568 { 1569 final Integer rsId = entry.getKey(); 1570 final ReplicationServerInfo rsInfo = entry.getValue(); 1571 if (rsInfo.getGroupId() == groupId) 1572 { 1573 eval.accept(rsId, rsInfo); 1574 } 1575 else 1576 { 1577 eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get( 1578 rsId, rsInfo.getGroupId(), localServerId, groupId)); 1579 } 1580 } 1581 evals.keepBest(eval); 1582 } 1583 1584 /** 1585 * Creates a new list that contains only replication servers that have the 1586 * provided generation id, from a provided replication server list. 1587 * When the selected replication servers have no change (empty serverState) 1588 * then the 'empty'(generationId==-1) replication servers are also included 1589 * in the result list. 
1590 * 1591 * @param evals The evaluation object 1592 * @param generationId The generation id that must match 1593 * @return whether some replication server passed the filter 1594 */ 1595 private static boolean filterServersWithSameGenerationId( 1596 RSEvaluations evals, long localServerId, long generationId) 1597 { 1598 final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; 1599 final LocalEvaluation eval = new LocalEvaluation(); 1600 boolean emptyState = true; 1601 1602 for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) 1603 { 1604 final Integer rsId = entry.getKey(); 1605 final ReplicationServerInfo rsInfo = entry.getValue(); 1606 if (rsInfo.getGenerationId() == generationId) 1607 { 1608 eval.accept(rsId, rsInfo); 1609 if (!rsInfo.serverState.isEmpty()) 1610 { 1611 emptyState = false; 1612 } 1613 } 1614 else if (rsInfo.getGenerationId() == -1) 1615 { 1616 eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId, 1617 generationId, localServerId)); 1618 } 1619 else 1620 { 1621 eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get( 1622 rsId, rsInfo.getGenerationId(), localServerId, generationId)); 1623 } 1624 } 1625 1626 if (emptyState) 1627 { 1628 // If the RS with a generationId have all an empty state, 1629 // then the 'empty'(genId=-1) RSes are also candidate 1630 for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) 1631 { 1632 ReplicationServerInfo rsInfo = entry.getValue(); 1633 if (rsInfo.getGenerationId() == -1) 1634 { 1635 // will undo the reject of previously rejected RSs 1636 eval.accept(entry.getKey(), rsInfo); 1637 } 1638 } 1639 } 1640 1641 return evals.keepBest(eval); 1642 } 1643 1644 /** 1645 * Creates a new list that contains only replication servers that have the 1646 * latest changes from the passed DS, from a passed replication server list. 
* @param evals The evaluation object
   * @param localState The state of the local DS
   * @param localServerId The server id to consider for the changes
   */
  private static void filterServersWithAllLocalDSChanges(
      RSEvaluations evals, ServerState localState, int localServerId)
  {
    // Extract the CSN of the latest change generated by the local server
    final CSN localCSN = getCSN(localState, localServerId);

    /*
     * Find replication servers that are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. If some servers are more up to date, prefer this list but take
     * only the latest CSN.
     */
    final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
    // Set once an RS newer than the local DS has been seen; from then on, RSs
    // merely equal to the local DS are rejected.
    boolean foundRSMoreUpToDateThanLocalDS = false;
    // Newest CSN (for localServerId) seen among the RSs newer than the local DS.
    CSN latestRsCSN = null;
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);

      // Has this replication server the latest local change ?
      if (rsCSN.isOlderThan(localCSN))
      {
        // This RS is late compared with the local DS.
        mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
            rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
      }
      else if (rsCSN.equals(localCSN))
      {
        // This replication server has exactly the latest change from the
        // local server
        if (!foundRSMoreUpToDateThanLocalDS)
        {
          mostUpToDateEval.accept(rsId, rsInfo);
        }
        else
        {
          mostUpToDateEval.reject(rsInfo,
              NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
                  rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
        }
      }
      else if (rsCSN.isNewerThan(localCSN))
      {
        // This replication server is even more up to date than the local server
        if (latestRsCSN == null)
        {
          // First RS newer than the local DS.
          foundRSMoreUpToDateThanLocalDS = true;
          // all previous results are now outdated, reject them all
          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
              localCSN);
          // Initialize the latest CSN
          latestRsCSN = rsCSN;
        }

        if (rsCSN.equals(latestRsCSN))
        {
          mostUpToDateEval.accept(rsId, rsInfo);
        }
        else if (rsCSN.isNewerThan(latestRsCSN))
        {
          // This RS is even more up to date, reject all previously accepted RSs
          // and store this new RS
          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
              localCSN);
          mostUpToDateEval.accept(rsId, rsInfo);
          latestRsCSN = rsCSN;
        }
        else
        {
          // Newer than the local DS, but older than the best RS found so far.
          mostUpToDateEval.reject(rsInfo,
              NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
                  rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
        }
      }
    }
    // Narrows the candidates only if at least one RS was accepted.
    evals.keepBest(mostUpToDateEval);
  }

  /**
   * Returns the CSN recorded in {@code state} for {@code serverId}, or
   * {@code new CSN(0, 0, serverId)} when the state holds none (presumably the
   * oldest possible CSN for that server - confirm), so that callers can
   * compare CSNs without null checks.
   *
   * @param state the server state to look into
   * @param serverId the server id whose CSN is wanted
   * @return the CSN for serverId, never null
   */
  private static CSN getCSN(ServerState state, int serverId)
  {
    final CSN csn = state.getCSN(serverId);
    if (csn != null)
    {
      return csn;
    }
    return new CSN(0, 0, serverId);
  }

  /**
   * Rejects every replication server currently accepted by {@code eval}
   * because another replication server is more up to date than the local DS.
   * Iterates over the array snapshot returned by getAcceptedRSInfos(), since
   * reject() removes entries from the accepted map.
   *
   * @param eval the evaluation to update
   * @param localServerId the server id of the local DS
   * @param localCSN the CSN of the latest change of the local DS
   */
  private static void rejectAllWithRSIsLaterThanBestRS(
      final LocalEvaluation eval, int localServerId, CSN localCSN)
  {
    for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos())
    {
      final String rsCSN =
          getCSN(rsInfo.getServerState(), localServerId).toStringUI();
      final LocalizableMessage reason =
          NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
              rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI());
      eval.reject(rsInfo, reason);
    }
  }

  /**
   * Narrows the candidates of an evaluation down to the replication servers
   * that are on the same host as the local DS. This method gives priority to
   * any replication server which is in the same VM as this DS. The candidates
   * are left untouched if no replication server is on the local host.
   *
   * @param evals The evaluation object
   * @param localServerId The server id of the local DS, used in the
   *          evaluation messages
   */
  private static void filterServersOnSameHost(RSEvaluations evals,
      int localServerId)
  {
    /*
     * Initially look for all servers on the same host. If we find one in the
     * same VM, then narrow the search.
     */
    boolean foundRSInSameVM = false;
    final LocalEvaluation eval = new LocalEvaluation();
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      final HostPort hp = HostPort.valueOf(rsInfo.getServerURL());
      if (hp.isLocalAddress())
      {
        // Same host; the port tells whether it is the RS of this VM.
        if (isLocalReplicationServerPort(hp.getPort()))
        {
          if (!foundRSInSameVM)
          {
            // An RS in the same VM will always have priority.
            // Narrow the search to only include servers in this VM.
            rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
            foundRSInSameVM = true;
          }
          eval.accept(rsId, rsInfo);
        }
        else if (!foundRSInSameVM)
        {
          // OK, accept RSs on the same machine because we have not found an RS
          // in the same VM yet
          eval.accept(rsId, rsInfo);
        }
        else
        {
          // Skip: we have found some RSs in the same VM, but this RS is not.
          eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
              localServerId));
        }
      }
      else
      {
        eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
            localServerId));
      }
    }
    evals.keepBest(eval);
  }

  /**
   * Rejects every replication server currently accepted by {@code eval}
   * because it is not in the same VM as the local DS. Iterates over the array
   * snapshot returned by getAcceptedRSInfos(), since reject() removes entries
   * from the accepted map.
   *
   * @param eval the evaluation to update
   * @param localServerId the server id of the local DS
   */
  private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
      int localServerId)
  {
    for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos())
    {
      eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
          rsInfo.getServerId(), localServerId));
    }
  }

  /**
   * Computes the best replication server the local server should be connected
   * to so that the load is correctly spread across the topology, following the
   * weights guidance.
   * <p>
   * Warning: This method is expected to be called with at least 2 servers in
   * bestServers.
   * <p>
   * Note: this method is static for test purpose (access from unit tests).
   * <p>
   * NOTE(review): localServerId is also used to build the evaluation message
   * when the local server is not connected yet.
   *
   * @param evals The evaluation object
   * @param currentRsServerId The replication server the local server is
   * currently connected to. -1 if the local server is not yet connected
   * to any replication server.
   * @param localServerId The server id of the local server.
This is not used 1834 * when it is not connected to a replication server 1835 * (currentRsServerId = -1) 1836 */ 1837 static void computeBestServerForWeight(RSEvaluations evals, 1838 int currentRsServerId, int localServerId) 1839 { 1840 final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; 1841 /* 1842 * - Compute the load goal of each RS, deducing it from the weights affected 1843 * to them. 1844 * - Compute the current load of each RS, deducing it from the DSs 1845 * currently connected to them. 1846 * - Compute the differences between the load goals and the current loads of 1847 * the RSs. 1848 */ 1849 // Sum of the weights 1850 int sumOfWeights = 0; 1851 // Sum of the connected DSs 1852 int sumOfConnectedDSs = 0; 1853 for (ReplicationServerInfo rsInfo : bestServers.values()) 1854 { 1855 sumOfWeights += rsInfo.getWeight(); 1856 sumOfConnectedDSs += rsInfo.getConnectedDSNumber(); 1857 } 1858 1859 // Distance (difference) of the current loads to the load goals of each RS: 1860 // key:server id, value: distance 1861 Map<Integer, BigDecimal> loadDistances = new HashMap<>(); 1862 // Precision for the operations (number of digits after the dot) 1863 final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); 1864 for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) 1865 { 1866 final Integer rsId = entry.getKey(); 1867 final ReplicationServerInfo rsInfo = entry.getValue(); 1868 1869 // load goal = rs weight / sum of weights 1870 BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide( 1871 BigDecimal.valueOf(sumOfWeights), mathContext); 1872 BigDecimal currentLoadBd = BigDecimal.ZERO; 1873 if (sumOfConnectedDSs != 0) 1874 { 1875 // current load = number of connected DSs / total number of DSs 1876 int connectedDSs = rsInfo.getConnectedDSNumber(); 1877 currentLoadBd = BigDecimal.valueOf(connectedDSs).divide( 1878 BigDecimal.valueOf(sumOfConnectedDSs), mathContext); 1879 } 1880 // load distance = load 
goal - current load 1881 BigDecimal loadDistanceBd = 1882 loadGoalBd.subtract(currentLoadBd, mathContext); 1883 loadDistances.put(rsId, loadDistanceBd); 1884 } 1885 1886 if (currentRsServerId == -1) 1887 { 1888 // The local server is not connected yet, find best server to connect to, 1889 // taking the weights into account. 1890 computeBestServerWhenNotConnected(evals, loadDistances, localServerId); 1891 } 1892 else 1893 { 1894 // The local server is currently connected to a RS, let's see if it must 1895 // disconnect or not, taking the weights into account. 1896 computeBestServerWhenConnected(evals, loadDistances, localServerId, 1897 currentRsServerId, sumOfWeights, sumOfConnectedDSs); 1898 } 1899 } 1900 1901 private static void computeBestServerWhenNotConnected(RSEvaluations evals, 1902 Map<Integer, BigDecimal> loadDistances, int localServerId) 1903 { 1904 final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; 1905 /* 1906 * Find the server with the current highest distance to its load goal and 1907 * choose it. 
Make an exception if every server is correctly balanced,
     * that is every current load distances are equal to 0, in that case,
     * choose the server with the highest weight
     */
    int bestRsId = 0; // If all server equal, return the first one
    float highestDistance = Float.NEGATIVE_INFINITY;
    boolean allRsWithZeroDistance = true;
    int highestWeightRsId = -1;
    int highestWeight = -1;
    for (Integer rsId : bestServers.keySet())
    {
      float loadDistance = loadDistances.get(rsId).floatValue();
      if (loadDistance > highestDistance)
      {
        // This server is far more from its balance point
        bestRsId = rsId;
        highestDistance = loadDistance;
      }
      if (loadDistance != 0)
      {
        allRsWithZeroDistance = false;
      }
      int weight = bestServers.get(rsId).getWeight();
      if (weight > highestWeight)
      {
        // This server has a higher weight
        highestWeightRsId = rsId;
        highestWeight = weight;
      }
    }
    // All servers with a 0 distance ?
    if (allRsWithZeroDistance)
    {
      // Choose server with the highest weight
      bestRsId = highestWeightRsId;
    }
    evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
        bestRsId));
  }

  /**
   * Decides, when the local DS is already connected to an RS, whether it
   * should stay connected to it or leave it, and records the verdict in
   * {@code evals}.
   * <p>
   * A load distance is "load goal - current load" (see how
   * {@code potentialCurrentRsNewLoadDistanceBd} is computed below), so a
   * negative distance means the RS has more DSs than its weight asks for.
   * <ul>
   * <li>If the current RS is not overloaded (distance {@code >= 0}), the local
   * DS stays connected to it.</li>
   * <li>Otherwise, if the other RSs lack DSs (sum of their distances
   * {@code > 0}), the number of DSs that should leave the current RS is
   * estimated. The local DS leaves only if it is among that many DSs with the
   * lowest server ids, and moving a single DS would not merely invert the
   * imbalance (yoyo effect).</li>
   * <li>Otherwise the local DS stays connected.</li>
   * </ul>
   *
   * @param evals
   *          the evaluations whose candidate RSs ({@code evals.bestRSs}) are
   *          considered; this method either sets its best RS or calls
   *          {@code discardAll} on it
   * @param loadDistances
   *          the load distance of each candidate RS, keyed by RS server id;
   *          assumed to contain an entry for every key of
   *          {@code evals.bestRSs} (TODO confirm with the caller)
   * @param localServerId
   *          the server id of the local DS
   * @param currentRsServerId
   *          the server id of the RS the local DS is currently connected to
   * @param sumOfWeights
   *          the sum of the weights of the candidate RSs, used to compute the
   *          load goal of the current RS
   * @param sumOfConnectedDSs
   *          the total number of DSs connected to the candidate RSs
   */
  private static void computeBestServerWhenConnected(RSEvaluations evals,
      Map<Integer, BigDecimal> loadDistances, int localServerId,
      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    float currentLoadDistance =
      loadDistances.get(currentRsServerId).floatValue();
    if (currentLoadDistance < 0)
    {
      /*
      Too much DSs connected to the current RS, compared with its load
      goal:
      Determine the potential number of DSs to disconnect from the current
      RS and see if the local DS is part of them: the DSs that must
      disconnect are those with the lowest server id.
      Compute the sum of the distances of the load goals of the other RSs
      */
      BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
      for (Integer rsId : bestServers.keySet())
      {
        if (rsId != currentRsServerId)
        {
          sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
              loadDistances.get(rsId), mathContext);
        }
      }

      if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
      {
        /*
        The average distance of the other RSs shows a lack of DSs.
        Compute the number of DSs to disconnect from the current RS,
        rounding to the nearest integer number. Do only this if there is
        no risk of yoyo effect: when the exact balance cannot be
        established due to the current number of DSs connected, do not
        disconnect a DS. A simple example where the balance cannot be
        reached is:
        - RS1 has weight 1 and 2 DSs
        - RS2 has weight 1 and 1 DS
        => disconnecting a DS from RS1 to reconnect it to RS2 would have no
        sense as this would lead to the reverse situation. In that case,
        the perfect balance cannot be reached and we must stick to the
        current situation, otherwise the DS would keep move between the 2
        RSs
        */
        // Distances are fractions of the total load, so multiplying by the
        // number of DSs converts the missing load into a number of DSs.
        float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
              multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
              .floatValue();
        int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);

        // Avoid yoyo effect: only checked when exactly one DS would move
        if (overloadingDSsNumber == 1)
        {
          // What would be the new load distance for the current RS if
          // we disconnect some DSs ?
          ReplicationServerInfo currentReplicationServerInfo =
            bestServers.get(currentRsServerId);

          int currentRsWeight = currentReplicationServerInfo.getWeight();
          BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
          BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
          // load goal = weight / sum of weights
          BigDecimal currentRsLoadGoalBd =
            currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
          BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
          if (sumOfConnectedDSs != 0)
          {
            // new load = (connected DSs - 1) / total connected DSs
            int connectedDSs = currentReplicationServerInfo.
              getConnectedDSNumber();
            BigDecimal potentialNewConnectedDSsBd =
                BigDecimal.valueOf(connectedDSs - 1);
            BigDecimal sumOfConnectedDSsBd =
                BigDecimal.valueOf(sumOfConnectedDSs);
            potentialCurrentRsNewLoadBd =
              potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
                mathContext);
          }
          BigDecimal potentialCurrentRsNewLoadDistanceBd =
            currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
              mathContext);

          // What would be the new load distance for the other RSs ?
          // (one more DS among them reduces their summed distance by 1/total)
          BigDecimal additionalDsLoadBd =
              BigDecimal.ONE.divide(
                  BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
            sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
                  mathContext);

          /*
          Now compare both values: we must not disconnect the DS if this
          is for going in a situation where the load distance of the other
          RSs is the opposite of the future load distance of the local RS
          or we would evaluate that we should disconnect just after being
          arrived on the new RS. But we should disconnect if we reach the
          perfect balance (both values are 0).
          */
          if (mustAvoidYoyoEffect(potentialCurrentRsNewLoadDistanceBd,
              potentialNewSumOfLoadDistancesOfOtherRSsBd))
          {
            // Avoid the yoyo effect, and keep the local DS connected to its
            // current RS
            evals.setBestRS(currentRsServerId,
                NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
            return;
          }
        }

        ReplicationServerInfo currentRsInfo =
            bestServers.get(currentRsServerId);
        if (isServerOverloadingRS(localServerId, currentRsInfo,
            overloadingDSsNumber))
        {
          // The local server is part of the DSs to disconnect
          evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
              localServerId, currentRsServerId));
        }
        else
        {
          // The local server is not part of the servers to disconnect from the
          // current RS.
          evals.setBestRS(currentRsServerId,
              NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
                  currentRsServerId));
        }
      } else {
        // The average distance of the other RSs does not show a lack of DSs:
        // no need to disconnect any DS from the current RS.
        evals.setBestRS(currentRsServerId,
            NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
                currentRsServerId));
      }
    } else {
      // The RS load goal is reached or there are not enough DSs connected to
      // it to reach it: do not disconnect from this RS and return rsInfo for
      // this RS
      evals.setBestRS(currentRsServerId,
          NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
              currentRsServerId));
    }
  }

  /**
   * Returns whether moving one DS away from the current RS would merely invert
   * the imbalance (yoyo effect) instead of improving it.
   * <p>
   * Both values are first rounded to 6 significant digits, truncating toward
   * zero ({@link RoundingMode#DOWN}), presumably so that tiny arithmetic
   * residues do not defeat the equality test.
   *
   * @param rsNewLoadDistance
   *          the load distance the current RS would have after the DS leaves
   * @param otherRSsNewSumOfLoadDistances
   *          the sum of the load distances the other RSs would have after the
   *          DS joins one of them
   * @return {@code true} if the rounded RS distance is non-zero and exactly
   *         the opposite of the rounded sum for the other RSs
   */
  private static boolean mustAvoidYoyoEffect(BigDecimal rsNewLoadDistance,
      BigDecimal otherRSsNewSumOfLoadDistances)
  {
    final MathContext roundCtx = new MathContext(6, RoundingMode.DOWN);
    final BigDecimal rsLoadDistance = rsNewLoadDistance.round(roundCtx);
    final BigDecimal otherRSsSumOfLoadDistances =
        otherRSsNewSumOfLoadDistances.round(roundCtx);

    // A perfect balance (both distances 0) is never treated as a yoyo effect
    return rsLoadDistance.compareTo(BigDecimal.ZERO) != 0
        && rsLoadDistance.compareTo(otherRSsSumOfLoadDistances.negate()) == 0;
  }

  /**
   * Returns whether the local DS is overloading the RS.
   * <p>
   * There are "overloadingDSsNumber" DSs overloading the RS. The list of
   * DSs connected to this RS is ordered by serverId to use a consistent
   * ordering across all nodes in the topology. The DSs whose index in the
   * sorted list is lower than "overloadingDSsNumber" will be evicted first.
   * <p>
   * This ordering is unfair since nodes with the lower serverIds will be
   * evicted more often than nodes with higher serverIds. However, it is a
   * consistent and reliable ordering applicable anywhere in the topology.
2112 */ 2113 private static boolean isServerOverloadingRS(int localServerId, 2114 ReplicationServerInfo currentRsInfo, int overloadingDSsNumber) 2115 { 2116 List<Integer> serversConnectedToCurrentRS = new ArrayList<>(currentRsInfo.getConnectedDSs()); 2117 Collections.sort(serversConnectedToCurrentRS); 2118 2119 final int idx = serversConnectedToCurrentRS.indexOf(localServerId); 2120 return idx != -1 && idx < overloadingDSsNumber; 2121 } 2122 2123 /** 2124 * Start the heartbeat monitor thread. 2125 */ 2126 private void startRSHeartBeatMonitoring(ConnectedRS rs) 2127 { 2128 final long heartbeatInterval = config.getHeartbeatInterval(); 2129 if (heartbeatInterval > 0) 2130 { 2131 heartbeatMonitor = new HeartbeatMonitor(getServerId(), rs.getServerId(), 2132 getBaseDN().toString(), rs.session, heartbeatInterval); 2133 heartbeatMonitor.start(); 2134 } 2135 } 2136 2137 /** 2138 * Stop the heartbeat monitor thread. 2139 */ 2140 private synchronized void stopRSHeartBeatMonitoring() 2141 { 2142 if (heartbeatMonitor != null) 2143 { 2144 heartbeatMonitor.shutdown(); 2145 heartbeatMonitor = null; 2146 } 2147 } 2148 2149 /** 2150 * Restart the ReplicationBroker. 2151 * @param infiniteTry the socket which failed 2152 */ 2153 public void reStart(boolean infiniteTry) 2154 { 2155 reStart(connectedRS.get().session, infiniteTry); 2156 } 2157 2158 /** 2159 * Restart the ReplicationServer broker after a failure. 
2160 * 2161 * @param failingSession the socket which failed 2162 * @param infiniteTry the socket which failed 2163 */ 2164 private void reStart(Session failingSession, boolean infiniteTry) 2165 { 2166 if (failingSession != null) 2167 { 2168 failingSession.close(); 2169 numLostConnections++; 2170 } 2171 2172 ConnectedRS rs = connectedRS.get(); 2173 if (failingSession == rs.session && !rs.equals(ConnectedRS.noConnectedRS())) 2174 { 2175 rs = setConnectedRS(ConnectedRS.noConnectedRS()); 2176 } 2177 2178 while (true) 2179 { 2180 // Synchronize inside the loop in order to allow shutdown. 2181 synchronized (startStopLock) 2182 { 2183 if (rs.isConnected() || shutdown) 2184 { 2185 break; 2186 } 2187 2188 try 2189 { 2190 connectAsDataServer(); 2191 rs = connectedRS.get(); 2192 } 2193 catch (Exception e) 2194 { 2195 logger.error(NOTE_EXCEPTION_RESTARTING_SESSION, 2196 getBaseDN(), e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)); 2197 } 2198 2199 if (rs.isConnected() || !infiniteTry) 2200 { 2201 break; 2202 } 2203 } 2204 try 2205 { 2206 Thread.sleep(500); 2207 } 2208 catch (InterruptedException ignored) 2209 { 2210 // ignore 2211 } 2212 } 2213 2214 if (logger.isTraceEnabled()) 2215 { 2216 debugInfo("end restart : connected=" + rs.isConnected() + " with RS(" 2217 + rs.getServerId() + ") genId=" + getGenerationID()); 2218 } 2219 } 2220 2221 /** 2222 * Publish a message to the other servers. 2223 * @param msg the message to publish 2224 */ 2225 public void publish(ReplicationMsg msg) 2226 { 2227 publish(msg, false, true); 2228 } 2229 2230 /** 2231 * Publish a message to the other servers. 2232 * @param msg The message to publish. 2233 * @param retryOnFailure Whether reconnect should automatically be done. 2234 * @return Whether publish succeeded. 2235 */ 2236 boolean publish(ReplicationMsg msg, boolean retryOnFailure) 2237 { 2238 return publish(msg, false, retryOnFailure); 2239 } 2240 2241 /** 2242 * Publish a recovery message to the other servers. 
2243 * @param msg the message to publish 2244 */ 2245 public void publishRecovery(ReplicationMsg msg) 2246 { 2247 publish(msg, true, true); 2248 } 2249 2250 /** 2251 * Publish a message to the other servers. 2252 * @param msg the message to publish 2253 * @param recoveryMsg the message is a recovery LocalizableMessage 2254 * @param retryOnFailure whether retry should be done on failure 2255 * @return whether the message was successfully sent. 2256 */ 2257 private boolean publish(ReplicationMsg msg, boolean recoveryMsg, 2258 boolean retryOnFailure) 2259 { 2260 boolean done = false; 2261 2262 while (!done && !shutdown) 2263 { 2264 if (connectionError) 2265 { 2266 /* 2267 It was not possible to connect to any replication server. 2268 Since the operation was already processed, we have no other 2269 choice than to return without sending the ReplicationMsg 2270 and relying on the resend procedure of the connect phase to 2271 fix the problem when we finally connect. 2272 */ 2273 2274 if (logger.isTraceEnabled()) 2275 { 2276 debugInfo("publish(): Publishing a message is not possible due to" 2277 + " existing connection error."); 2278 } 2279 2280 return false; 2281 } 2282 2283 try 2284 { 2285 /* 2286 save the session at the time when we acquire the 2287 sendwindow credit so that we can make sure later 2288 that the session did not change in between. 2289 This is necessary to make sure that we don't publish a message 2290 on a session with a credit that was acquired from a previous 2291 session. 2292 */ 2293 Session currentSession; 2294 Semaphore currentWindowSemaphore; 2295 synchronized (connectPhaseLock) 2296 { 2297 currentSession = connectedRS.get().session; 2298 currentWindowSemaphore = sendWindow; 2299 } 2300 2301 /* 2302 If the Replication domain has decided that there is a need to 2303 recover some changes then it is not allowed to send this 2304 change but it will be the responsibility of the recovery thread to 2305 do it. 
2306 */ 2307 if (!recoveryMsg & connectRequiresRecovery) 2308 { 2309 return false; 2310 } 2311 2312 boolean credit; 2313 if (msg instanceof UpdateMsg) 2314 { 2315 /* 2316 Acquiring the window credit must be done outside of the 2317 connectPhaseLock because it can be blocking and we don't 2318 want to hold off reconnection in case the connection dropped. 2319 */ 2320 credit = 2321 currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS); 2322 } 2323 else 2324 { 2325 credit = true; 2326 } 2327 2328 if (credit) 2329 { 2330 synchronized (connectPhaseLock) 2331 { 2332 /* 2333 session may have been set to null in the connection phase 2334 when restarting the broker for example. 2335 Check the session. If it has changed, some disconnection or 2336 reconnection happened and we need to restart from scratch. 2337 */ 2338 final Session session = connectedRS.get().session; 2339 if (session != null && session == currentSession) 2340 { 2341 session.publish(msg); 2342 done = true; 2343 } 2344 } 2345 } 2346 if (!credit && currentWindowSemaphore.availablePermits() == 0) 2347 { 2348 synchronized (connectPhaseLock) 2349 { 2350 /* 2351 the window is still closed. 2352 Send a WindowProbeMsg message to wake up the receiver in case the 2353 window update message was lost somehow... 2354 then loop to check again if connection was closed. 2355 */ 2356 Session session = connectedRS.get().session; 2357 if (session != null) 2358 { 2359 session.publish(new WindowProbeMsg()); 2360 } 2361 } 2362 } 2363 } 2364 catch (IOException e) 2365 { 2366 if (logger.isTraceEnabled()) 2367 { 2368 debugInfo("publish(): IOException caught: " 2369 + stackTraceToSingleLineString(e)); 2370 } 2371 if (!retryOnFailure) 2372 { 2373 return false; 2374 } 2375 2376 // The receive threads should handle reconnection or 2377 // mark this broker in error. Just retry. 
2378 synchronized (connectPhaseLock) 2379 { 2380 try 2381 { 2382 connectPhaseLock.wait(100); 2383 } 2384 catch (InterruptedException ignored) 2385 { 2386 if (logger.isTraceEnabled()) 2387 { 2388 debugInfo("publish(): InterruptedException caught 1: " 2389 + stackTraceToSingleLineString(ignored)); 2390 } 2391 } 2392 } 2393 } 2394 catch (InterruptedException ignored) 2395 { 2396 // just loop. 2397 if (logger.isTraceEnabled()) 2398 { 2399 debugInfo("publish(): InterruptedException caught 2: " 2400 + stackTraceToSingleLineString(ignored)); 2401 } 2402 } 2403 } 2404 return true; 2405 } 2406 2407 /** 2408 * Receive a message. 2409 * This method is not thread-safe and should either always be 2410 * called in a single thread or protected by a locking mechanism 2411 * before being called. This is a wrapper to the method with a boolean version 2412 * so that we do not have to modify existing tests. 2413 * 2414 * @return the received message 2415 * @throws SocketTimeoutException if the timeout set by setSoTimeout 2416 * has expired 2417 */ 2418 public ReplicationMsg receive() throws SocketTimeoutException 2419 { 2420 return receive(false, true, false); 2421 } 2422 2423 /** 2424 * Receive a message. 2425 * This method is not thread-safe and should either always be 2426 * called in a single thread or protected by a locking mechanism 2427 * before being called. 2428 * 2429 * @param reconnectToTheBestRS Whether broker will automatically switch 2430 * to the best suitable RS. 2431 * @param reconnectOnFailure Whether broker will automatically reconnect 2432 * on failure. 2433 * @param returnOnTopoChange Whether broker should return TopologyMsg 2434 * received. 
* @return the received message; {@code null} when the broker is shut down,
   *         when there is no session to read from, or when reading failed and
   *         {@code reconnectOnFailure} is {@code false}. {@link WindowMsg},
   *         {@link StopMsg} and {@link MonitorMsg} are handled internally and
   *         never returned; a {@link TopologyMsg} is returned only when
   *         {@code returnOnTopoChange} is {@code true}.
   *
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *                                has expired
   */
  ReplicationMsg receive(boolean reconnectToTheBestRS,
      boolean reconnectOnFailure, boolean returnOnTopoChange)
      throws SocketTimeoutException
  {
    while (!shutdown)
    {
      ConnectedRS rs = connectedRS.get();
      if (reconnectOnFailure && !rs.isConnected())
      {
        // infinite try to reconnect
        reStart(null, true);
        continue;
      }

      // Save session information for later in case we need it for log messages
      // after the session has been closed and/or failed.
      if (rs.session == null)
      {
        // Must be shutting down.
        break;
      }

      // Snapshot of the identifiers used in log messages below
      final int serverId = getServerId();
      final DN baseDN = getBaseDN();
      final int previousRsServerID = rs.getServerId();
      try
      {
        ReplicationMsg msg = rs.session.receive();
        // Every update, except replica offline notifications, consumes one
        // slot of the receive window.
        if (msg instanceof UpdateMsg && !(msg instanceof ReplicaOfflineMsg))
        {
          synchronized (this)
          {
            rcvWindow--;
          }
        }
        if (msg instanceof WindowMsg)
        {
          // The RS acknowledged updates: give the credits back to publishers
          final WindowMsg windowMsg = (WindowMsg) msg;
          sendWindow.release(windowMsg.getNumAck());
        }
        else if (msg instanceof TopologyMsg)
        {
          final TopologyMsg topoMsg = (TopologyMsg) msg;
          receiveTopo(topoMsg, getRsServerId());
          if (reconnectToTheBestRS)
          {
            // Reset wait time before next computation of best server
            mustRunBestServerCheckingAlgorithm = 0;
          }

          // Caller wants to check what's changed
          if (returnOnTopoChange)
          {
            return msg;
          }
        }
        else if (msg instanceof StopMsg)
        {
          // RS performs a proper disconnection
          logger.warn(WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED, previousRsServerID, rs.replicationServer,
              serverId, baseDN);

          // Try to find a suitable RS
          reStart(rs.session, true);
        }
        else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier or
          // the regular message of the monitoring publisher of the RS.
          MonitorMsg monitorMsg = (MonitorMsg) msg;

          // Extract and store replicas ServerStates
          final Map<Integer, ServerState> newReplicaStates = new HashMap<>();
          for (int srvId : toIterable(monitorMsg.ldapIterator()))
          {
            newReplicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
          }
          replicaStates = newReplicaStates;

          // Notify the sender that the response was received.
          // (wakes up a thread waiting in getReplicaStates())
          synchronized (monitorResponse)
          {
            monitorResponse.set(true);
            monitorResponse.notify();
          }

          // Update the replication servers ServerStates with new received info
          Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos;
          for (int srvId : toIterable(monitorMsg.rsIterator()))
          {
            final ReplicationServerInfo rsInfo = rsInfos.get(srvId);
            if (rsInfo != null)
            {
              rsInfo.update(monitorMsg.getRSServerState(srvId));
            }
          }

          /*
          Now if it is allowed, compute the best replication server to see if
          it is still the one we are currently connected to. If not,
          disconnect properly and let the connection algorithm re-connect to
          best replication server
          */
          if (reconnectToTheBestRS)
          {
            // The counter is reset by every TopologyMsg, so the check only
            // runs after two MonitorMsgs in a row without topology change.
            mustRunBestServerCheckingAlgorithm++;
            if (mustRunBestServerCheckingAlgorithm == 2)
            {
              // Stable topology (no topo msg since few seconds): proceed with
              // best server checking.
              final RSEvaluations evals = computeBestReplicationServer(
                  false, previousRsServerID, state,
                  rsInfos, serverId, getGroupId(), getGenerationID());
              final ReplicationServerInfo bestServerInfo = evals.getBestRS();
              if (previousRsServerID != -1
                  && (bestServerInfo == null
                      || bestServerInfo.getServerId() != previousRsServerID))
              {
                // The best replication server is no more the one we are
                // currently using. Disconnect properly then reconnect.
                LocalizableMessage message;
                if (bestServerInfo == null)
                {
                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                      serverId, previousRsServerID, rs.replicationServer, baseDN);
                }
                else
                {
                  final int bestRsServerId = bestServerInfo.getServerId();
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, previousRsServerID, rs.replicationServer, bestRsServerId, baseDN,
                      evals.getEvaluation(previousRsServerID),
                      evals.getEvaluation(bestRsServerId));
                }
                logger.info(message);
                if (logger.isTraceEnabled())
                {
                  debugInfo("best replication servers evaluation results: " + evals);
                }
                reStart(true);
              }

              // Reset wait time before next computation of best server
              mustRunBestServerCheckingAlgorithm = 0;
            }
          }
        }
        else
        {
          // Any other message is for the caller
          return msg;
        }
      }
      catch (SocketTimeoutException e)
      {
        // Let the caller handle the timeout configured with setSoTimeout()
        throw e;
      }
      catch (Exception e)
      {
        logger.traceException(e);

        if (!shutdown)
        {
          if (rs.session == null || !rs.session.closeInitiated())
          {
            // We did not initiate the close on our side, log an error message.
            logger.error(WARN_REPLICATION_SERVER_BADLY_DISCONNECTED,
                serverId, baseDN, previousRsServerID, rs.replicationServer);
          }

          if (!reconnectOnFailure)
          {
            break; // does not seem necessary to explicitly disconnect ..
          }

          reStart(rs.session, true);
        }
      }
    } // while !shutdown
    return null;
  }

  /**
   * Gets the States of all the Replicas currently in the Topology. When this
   * method is called, a Monitoring message will be sent to the Replication
   * Server to which this domain is currently connected so that it computes a
   * table containing information about all Directory Servers in the topology.
2626 * This Computation involves communications will all the servers currently 2627 * connected and 2628 * 2629 * @return The States of all Replicas in the topology (except us) 2630 */ 2631 public Map<Integer, ServerState> getReplicaStates() 2632 { 2633 monitorResponse.set(false); 2634 2635 // publish Monitor Request LocalizableMessage to the Replication Server 2636 publish(new MonitorRequestMsg(getServerId(), getRsServerId())); 2637 2638 // wait for Response up to 10 seconds. 2639 try 2640 { 2641 synchronized (monitorResponse) 2642 { 2643 if (!monitorResponse.get()) 2644 { 2645 monitorResponse.wait(10000); 2646 } 2647 } 2648 } catch (InterruptedException e) 2649 { 2650 Thread.currentThread().interrupt(); 2651 } 2652 return replicaStates; 2653 } 2654 2655 /** 2656 * This method allows to do the necessary computing for the window 2657 * management after treatment by the worker threads. 2658 * 2659 * This should be called once the replay thread have done their job 2660 * and the window can be open again. 2661 */ 2662 public synchronized void updateWindowAfterReplay() 2663 { 2664 try 2665 { 2666 updateDoneCount++; 2667 final Session session = connectedRS.get().session; 2668 if (updateDoneCount >= halfRcvWindow && session != null) 2669 { 2670 session.publish(new WindowMsg(updateDoneCount)); 2671 rcvWindow += updateDoneCount; 2672 updateDoneCount = 0; 2673 } 2674 } catch (IOException e) 2675 { 2676 // Any error on the socket will be handled by the thread calling receive() 2677 // just ignore. 2678 } 2679 } 2680 2681 /** Stop the server. 
*/ 2682 public void stop() 2683 { 2684 if (logger.isTraceEnabled() && !shutdown) 2685 { 2686 debugInfo("is stopping and will close the connection to RS(" + getRsServerId() + ")"); 2687 } 2688 2689 synchronized (startStopLock) 2690 { 2691 if (shutdown) 2692 { 2693 return; 2694 } 2695 domain.publishReplicaOfflineMsg(); 2696 shutdown = true; 2697 setConnectedRS(ConnectedRS.stopped()); 2698 stopRSHeartBeatMonitoring(); 2699 stopChangeTimeHeartBeatPublishing(); 2700 deregisterReplicationMonitor(); 2701 } 2702 } 2703 2704 /** 2705 * Set a timeout value. 2706 * With this option set to a non-zero value, calls to the receive() method 2707 * block for only this amount of time after which a 2708 * java.net.SocketTimeoutException is raised. 2709 * The Broker is valid and usable even after such an Exception is raised. 2710 * 2711 * @param timeout the specified timeout, in milliseconds. 2712 * @throws SocketException if there is an error in the underlying protocol, 2713 * such as a TCP error. 2714 */ 2715 public void setSoTimeout(int timeout) throws SocketException 2716 { 2717 this.timeout = timeout; 2718 final Session session = connectedRS.get().session; 2719 if (session != null) 2720 { 2721 session.setSoTimeout(timeout); 2722 } 2723 } 2724 2725 /** 2726 * Get the name of the replicationServer to which this broker is currently 2727 * connected. 2728 * 2729 * @return the name of the replicationServer to which this domain 2730 * is currently connected. 2731 */ 2732 public String getReplicationServer() 2733 { 2734 return connectedRS.get().replicationServer; 2735 } 2736 2737 /** 2738 * Get the maximum receive window size. 2739 * 2740 * @return The maximum receive window size. 2741 */ 2742 public int getMaxRcvWindow() 2743 { 2744 return config.getWindowSize(); 2745 } 2746 2747 /** 2748 * Get the current receive window size. 2749 * 2750 * @return The current receive window size. 
2751 */ 2752 public int getCurrentRcvWindow() 2753 { 2754 return rcvWindow; 2755 } 2756 2757 /** 2758 * Get the maximum send window size. 2759 * 2760 * @return The maximum send window size. 2761 */ 2762 public int getMaxSendWindow() 2763 { 2764 return maxSendWindow; 2765 } 2766 2767 /** 2768 * Get the current send window size. 2769 * 2770 * @return The current send window size. 2771 */ 2772 public int getCurrentSendWindow() 2773 { 2774 if (isConnected()) 2775 { 2776 return sendWindow.availablePermits(); 2777 } 2778 return 0; 2779 } 2780 2781 /** 2782 * Get the number of times the connection was lost. 2783 * @return The number of times the connection was lost. 2784 */ 2785 public int getNumLostConnections() 2786 { 2787 return numLostConnections; 2788 } 2789 2790 /** 2791 * Change some configuration parameters. 2792 * 2793 * @param newConfig The new config to use. 2794 * @return A boolean indicating if the changes 2795 * requires to restart the service. 2796 */ 2797 boolean changeConfig(ReplicationDomainCfg newConfig) 2798 { 2799 // These parameters needs to be renegotiated with the ReplicationServer 2800 // so if they have changed, that requires restarting the session with 2801 // the ReplicationServer. 2802 // A new session is necessary only when information regarding 2803 // the connection is modified 2804 boolean needToRestartSession = 2805 !newConfig.getReplicationServer().equals(config.getReplicationServer()) 2806 || newConfig.getWindowSize() != config.getWindowSize() 2807 || newConfig.getHeartbeatInterval() != config.getHeartbeatInterval() 2808 || newConfig.getGroupId() != config.getGroupId(); 2809 2810 this.config = newConfig; 2811 this.rcvWindow = newConfig.getWindowSize(); 2812 this.halfRcvWindow = this.rcvWindow / 2; 2813 2814 return needToRestartSession; 2815 } 2816 2817 /** 2818 * Get the version of the replication protocol. 2819 * @return The version of the replication protocol. 
2820 */ 2821 public short getProtocolVersion() 2822 { 2823 final Session session = connectedRS.get().session; 2824 if (session != null) 2825 { 2826 return session.getProtocolVersion(); 2827 } 2828 return ProtocolVersion.getCurrentVersion(); 2829 } 2830 2831 /** 2832 * Check if the broker is connected to a ReplicationServer and therefore 2833 * ready to received and send Replication Messages. 2834 * 2835 * @return true if the server is connected, false if not. 2836 */ 2837 public boolean isConnected() 2838 { 2839 return connectedRS.get().isConnected(); 2840 } 2841 2842 /** 2843 * Determine whether the connection to the replication server is encrypted. 2844 * @return true if the connection is encrypted, false otherwise. 2845 */ 2846 public boolean isSessionEncrypted() 2847 { 2848 final Session session = connectedRS.get().session; 2849 return session != null ? session.isEncrypted() : false; 2850 } 2851 2852 /** 2853 * Signals the RS we just entered a new status. 2854 * @param newStatus The status the local DS just entered 2855 */ 2856 public void signalStatusChange(ServerStatus newStatus) 2857 { 2858 try 2859 { 2860 connectedRS.get().session.publish( 2861 new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus)); 2862 } catch (IOException ex) 2863 { 2864 logger.error(ERR_EXCEPTION_SENDING_CS, getBaseDN(), getServerId(), 2865 ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex)); 2866 } 2867 } 2868 2869 /** 2870 * Gets the info for DSs in the topology (except us). 2871 * @return The info for DSs in the topology (except us) 2872 */ 2873 public Map<Integer, DSInfo> getReplicaInfos() 2874 { 2875 return topology.get().replicaInfos; 2876 } 2877 2878 /** 2879 * Gets the info for RSs in the topology (except the one we are connected 2880 * to). 
2881 * @return The info for RSs in the topology (except the one we are connected 2882 * to) 2883 */ 2884 public List<RSInfo> getRsInfos() 2885 { 2886 return toRSInfos(topology.get().rsInfos); 2887 } 2888 2889 private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos) 2890 { 2891 final List<RSInfo> result = new ArrayList<>(); 2892 for (ReplicationServerInfo rsInfo : rsInfos.values()) 2893 { 2894 result.add(rsInfo.toRSInfo()); 2895 } 2896 return result; 2897 } 2898 2899 /** 2900 * Processes an incoming TopologyMsg. 2901 * Updates the structures for the local view of the topology. 2902 * 2903 * @param topoMsg 2904 * The topology information received from RS. 2905 * @param rsServerId 2906 * the serverId to use for the connectedDS 2907 */ 2908 private void receiveTopo(TopologyMsg topoMsg, int rsServerId) 2909 { 2910 final Topology newTopo = computeNewTopology(topoMsg, rsServerId); 2911 for (DSInfo dsInfo : newTopo.replicaInfos.values()) 2912 { 2913 domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo 2914 .getEclIncludesForDeletes()); 2915 } 2916 } 2917 2918 private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId) 2919 { 2920 Topology oldTopo; 2921 Topology newTopo; 2922 do 2923 { 2924 oldTopo = topology.get(); 2925 newTopo = new Topology(topoMsg, getServerId(), rsServerId, 2926 getReplicationServerUrls(), oldTopo.rsInfos); 2927 } 2928 while (!topology.compareAndSet(oldTopo, newTopo)); 2929 2930 if (logger.isTraceEnabled()) 2931 { 2932 final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo); 2933 sb.append(" received TopologyMsg=").append(topoMsg); 2934 debugInfo(sb); 2935 } 2936 return newTopo; 2937 } 2938 2939 /** 2940 * Contains the last known state of the replication topology. 2941 */ 2942 static final class Topology 2943 { 2944 2945 /** 2946 * The RS's serverId that this DS was connected to when this topology state 2947 * was computed. 
*/
    private final int rsServerId;
    /**
     * Replica (DS) information for every other directory server of the
     * topology, keyed by DS server id.
     * <p>
     * Warning: never holds an entry for the local DS (our own server id).
     */
    final Map<Integer, DSInfo> replicaInfos;
    /**
     * Replication server information keyed by RS server id. It is built at
     * connection time, refreshed on each topology update, and consulted when
     * choosing the most suitable replication server to connect to.
     */
    final Map<Integer, ReplicationServerInfo> rsInfos;

    /**
     * Creates an empty topology: {@code rsServerId} is -1 and both info maps
     * are empty.
     */
    private Topology()
    {
      rsServerId = -1;
      replicaInfos = Collections.emptyMap();
      rsInfos = Collections.emptyMap();
    }

    /**
     * Creates a topology that keeps the given DS information unchanged and only
     * recomputes the replication server information.
     *
     * @param dsInfosToKeep
     *          the DS information stored as is; {@code null} is replaced by an
     *          empty map
     * @param newRSInfos
     *          the RS information from which the new RS map is computed
     * @param dsServerId
     *          the local DS server id
     * @param rsServerId
     *          the server id of the RS currently connected to
     * @param configuredReplicationServerUrls
     *          the replication server URLs of the local configuration
     * @param previousRsInfos
     *          the RS map computed by the previous Topology instance
     */
    Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos,
        int dsServerId, int rsServerId,
        Set<String> configuredReplicationServerUrls,
        Map<Integer, ReplicationServerInfo> previousRsInfos)
    {
      // rsServerId and replicaInfos must be assigned before computeRSInfos()
      // runs, because computeDSsConnectedTo() reads both of them.
      this.rsServerId = rsServerId;
      if (dsInfosToKeep != null)
      {
        this.replicaInfos = dsInfosToKeep;
      }
      else
      {
        this.replicaInfos = Collections.emptyMap();
      }
      this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
          previousRsInfos, configuredReplicationServerUrls);
    }

    /**
     * Creates a topology from a freshly received TopologyMsg.
     *
     * @param topoMsg
     *          the topology message providing both the new DS information and
     *          the new RS information
     * @param dsServerId
     *          the local DS server id, which is dropped from the DS information
     * @param rsServerId
     *          the server id of the RS currently connected to
     * @param configuredReplicationServerUrls
     *          the replication server URLs of the local configuration
     * @param previousRsInfos
     *          the RS map computed by the previous Topology instance
     */
    Topology(TopologyMsg topoMsg, int dsServerId,
        int rsServerId, Set<String> configuredReplicationServerUrls,
        Map<Integer, ReplicationServerInfo> previousRsInfos)
    {
      // Same ordering constraint as above: computeRSInfos() reads the two
      // fields assigned first.
      this.rsServerId = rsServerId;
      final Map<Integer, DSInfo> otherReplicas =
          removeThisDs(topoMsg.getReplicaInfos(), dsServerId);
      this.replicaInfos = otherReplicas;
      final List<RSInfo> receivedRsInfos = topoMsg.getRsInfos();
      this.rsInfos = computeRSInfos(dsServerId, receivedRsInfos,
          previousRsInfos, configuredReplicationServerUrls);
    }

    /**
     * Returns an unmodifiable copy of {@code dsInfos} without the entry keyed
     * by {@code dsServerId}. The argument map itself is not modified.
     */
    private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos,
        int dsServerId)
    {
      final Map<Integer, DSInfo> othersOnly = new HashMap<>(dsInfos);
      othersOnly.remove(dsServerId);
      return Collections.unmodifiableMap(othersOnly);
    }

    /**
     * Merges freshly received RS information into a copy of the previous RS
     * map.
     * <ul>
     * <li>Known RSs are updated.</li>
     * <li>Unknown RSs are added and their "locally configured" flag is
     * computed.</li>
     * <li>RSs absent from {@code newRsInfos} are dropped.</li>
     * </ul>
     * The returned map is unmodifiable.
     * <p>
     * NOTE(review): the copy of {@code previousRsInfos} is shallow, so known
     * ReplicationServerInfo instances are shared with the previous Topology and
     * {@code update(...)} presumably mutates them in place for it too. Confirm
     * this is intended.
     */
    private Map<Integer, ReplicationServerInfo> computeRSInfos(
        int dsServerId, List<RSInfo> newRsInfos,
        Map<Integer, ReplicationServerInfo> previousRsInfos,
        Set<String> configuredReplicationServerUrls)
    {
      final Map<Integer, ReplicationServerInfo> merged = new HashMap<>(previousRsInfos);
      final Set<Integer> stillPresent = new HashSet<>();

      for (RSInfo received : newRsInfos)
      {
        final int id = received.getId();
        stillPresent.add(id);
        final Set<Integer> attachedDSs = computeDSsConnectedTo(id, dsServerId);

        final ReplicationServerInfo known = merged.get(id);
        if (known != null)
        {
          // Already known RS: refresh its information.
          known.update(received, attachedDSs);
          continue;
        }

        // RS seen for the first time: record it.
        final ReplicationServerInfo created =
            new ReplicationServerInfo(received, attachedDSs);
        setLocallyConfiguredFlag(created, configuredReplicationServerUrls);
        merged.put(id, created);
      }

      // Forget the RSs that are no longer part of the topology.
      merged.keySet().retainAll(stillPresent);
      return Collections.unmodifiableMap(merged);
    }

    /**
     * Returns the ids of the DSs attached to the RS {@code rsId}. When that RS
     * is the one we are connected to, the result includes the local DS.
     */
    private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId)
    {
      final Set<Integer> dsIds = new HashSet<>();
      // The replication server does not send the local DS's own DSInfo in the
      // topology message. When rsId is our RS we therefore count ourselves
      // explicitly.
      if (rsId == rsServerId)
      {
        dsIds.add(dsServerId);
      }
      for (DSInfo replica : replicaInfos.values())
      {
        if (replica.getRsId() == rsId)
        {
          dsIds.add(replica.getDsId());
        }
      }
      return dsIds;
    }

    /**
     * Sets the "locally configured" flag of {@code rsInfo} by matching its URL
     * against the locally configured replication server URLs. On a match, the
     * server URL of {@code rsInfo} is replaced by the matching configured URL
     * string.
     *
     * @param rsInfo
     *          the replication server info to inspect and update
     * @param configuredReplicationServerUrls
     *          the replication server URLs of the local configuration
     */
    private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo,
        Set<String> configuredReplicationServerUrls)
    {
      final String advertisedUrl = rsInfo.getServerURL();
      boolean matched = false;
      String configuredMatch = null;
      // A null URL means the RS sent no URL in its TopologyMsg (replication
      // protocol version < 4). We do not know how to connect to it, so it is
      // never treated as locally configured.
      if (advertisedUrl != null)
      {
        for (String candidate : configuredReplicationServerUrls)
        {
          if (isSameReplicationServerUrl(candidate, advertisedUrl))
          {
            matched = true;
            configuredMatch = candidate;
            break;
          }
        }
      }
      rsInfo.setLocallyConfigured(matched);
      if (matched)
      {
        rsInfo.setServerURL(configuredMatch);
      }
    }

    /**
     * Two topologies are equal when they have the same connected RS id and
     * equal DS and RS maps, and every DS and RS has the same URL in both.
     */
    @Override
    public boolean equals(Object obj)
    {
      if (obj == this)
      {
        return true;
      }
      if (obj == null || obj.getClass() != getClass())
      {
        return false;
      }
      final Topology that = (Topology) obj;
      // Map equality is checked first. The URL helpers look up each key in the
      // other map and rely on both key sets being identical. URLs are compared
      // separately, presumably because DSInfo / ReplicationServerInfo equality
      // does not cover them (confirm).
      if (rsServerId != that.rsServerId
          || !Objects.equals(replicaInfos, that.replicaInfos)
          || !Objects.equals(rsInfos, that.rsInfos))
      {
        return false;
      }
      return urlsEqual1(replicaInfos, that.replicaInfos)
          && urlsEqual2(rsInfos, that.rsInfos);
    }

    /**
     * Checks that each DS of {@code replicaInfos1} has the same DS URL as the
     * entry under the same key in {@code replicaInfos2}. Both maps are
     * expected to share the same key set.
     */
    private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1,
        Map<Integer, DSInfo> replicaInfos2)
    {
      for (Entry<Integer, DSInfo> mine : replicaInfos1.entrySet())
      {
        final DSInfo theirs = replicaInfos2.get(mine.getKey());
        if (!Objects.equals(mine.getValue().getDsUrl(), theirs.getDsUrl()))
        {
          return false;
        }
      }
      return true;
    }
3164 private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1, 3165 Map<Integer, ReplicationServerInfo> rsInfos2) 3166 { 3167 for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet()) 3168 { 3169 ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey()); 3170 if (!Objects.equals(entry.getValue().getServerURL(), rsInfo.getServerURL())) 3171 { 3172 return false; 3173 } 3174 } 3175 return true; 3176 } 3177 3178 /** {@inheritDoc} */ 3179 @Override 3180 public int hashCode() 3181 { 3182 final int prime = 31; 3183 int result = 1; 3184 result = prime * result + rsServerId; 3185 result = prime * result 3186 + (replicaInfos == null ? 0 : replicaInfos.hashCode()); 3187 result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode()); 3188 return result; 3189 } 3190 3191 /** {@inheritDoc} */ 3192 @Override 3193 public String toString() 3194 { 3195 return getClass().getSimpleName() 3196 + " rsServerId=" + rsServerId 3197 + ", replicaInfos=" + replicaInfos.values() 3198 + ", rsInfos=" + rsInfos.values(); 3199 } 3200 } 3201 3202 /** 3203 * Check if the broker could not find any Replication Server and therefore 3204 * connection attempt failed. 3205 * 3206 * @return true if the server could not connect to any Replication Server. 3207 */ 3208 boolean hasConnectionError() 3209 { 3210 return connectionError; 3211 } 3212 3213 /** 3214 * Starts publishing to the RS the current timestamp used in this server. 3215 */ 3216 private void startChangeTimeHeartBeatPublishing(ConnectedRS rs) 3217 { 3218 // Start a CSN heartbeat thread. 
3219 long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval(); 3220 if (changeTimeHeartbeatInterval > 0) 3221 { 3222 final String threadName = "Replica DS(" + getServerId() 3223 + ") change time heartbeat publisher for domain \"" + getBaseDN() 3224 + "\" to RS(" + rs.getServerId() + ") at " + rs.replicationServer; 3225 3226 ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( 3227 threadName, rs.session, changeTimeHeartbeatInterval, getServerId()); 3228 ctHeartbeatPublisherThread.start(); 3229 } 3230 else 3231 { 3232 if (logger.isTraceEnabled()) 3233 { 3234 debugInfo("is not configured to send CSN heartbeat interval"); 3235 } 3236 } 3237 } 3238 3239 /** 3240 * Stops publishing to the RS the current timestamp used in this server. 3241 */ 3242 private synchronized void stopChangeTimeHeartBeatPublishing() 3243 { 3244 if (ctHeartbeatPublisherThread != null) 3245 { 3246 ctHeartbeatPublisherThread.shutdown(); 3247 ctHeartbeatPublisherThread = null; 3248 } 3249 } 3250 3251 /** 3252 * Set the connectRequiresRecovery to the provided value. 3253 * This flag is used to indicate if a recovery of Update is necessary 3254 * after a reconnection to a RS. 3255 * It is the responsibility of the ReplicationDomain to set it during the 3256 * sessionInitiated phase. 3257 * 3258 * @param b the new value of the connectRequiresRecovery. 3259 */ 3260 public void setRecoveryRequired(boolean b) 3261 { 3262 connectRequiresRecovery = b; 3263 } 3264 3265 /** 3266 * Returns whether the broker is shutting down. 3267 * @return whether the broker is shutting down. 3268 */ 3269 boolean shuttingDown() 3270 { 3271 return shutdown; 3272 } 3273 3274 /** 3275 * Returns the local address of this replication domain, or the empty string 3276 * if it is not yet connected. 3277 * 3278 * @return The local address. 3279 */ 3280 String getLocalUrl() 3281 { 3282 final Session session = connectedRS.get().session; 3283 return session != null ? 
session.getLocalUrl() : ""; 3284 } 3285 3286 /** 3287 * Returns the replication monitor instance name associated with this broker. 3288 * 3289 * @return The replication monitor instance name. 3290 */ 3291 String getReplicationMonitorInstanceName() 3292 { 3293 // Only invoked by replication domain so always non-null. 3294 return monitor.getMonitorInstanceName(); 3295 } 3296 3297 private ConnectedRS setConnectedRS(final ConnectedRS newRS) 3298 { 3299 final ConnectedRS oldRS = connectedRS.getAndSet(newRS); 3300 if (!oldRS.equals(newRS) && oldRS.session != null) 3301 { 3302 // monitor name is changing, deregister before registering again 3303 deregisterReplicationMonitor(); 3304 oldRS.session.close(); 3305 registerReplicationMonitor(); 3306 } 3307 return newRS; 3308 } 3309 3310 /** 3311 * Must be invoked each time the session changes because, the monitor name is 3312 * dynamically created with the session name, while monitor registration is 3313 * static. 3314 * 3315 * @see #monitor 3316 */ 3317 private void registerReplicationMonitor() 3318 { 3319 // The monitor should not be registered if this is a unit test 3320 // because the replication domain is null. 3321 if (monitor != null) 3322 { 3323 DirectoryServer.registerMonitorProvider(monitor); 3324 } 3325 } 3326 3327 private void deregisterReplicationMonitor() 3328 { 3329 // The monitor should not be deregistered if this is a unit test 3330 // because the replication domain is null. 
3331 if (monitor != null) 3332 { 3333 DirectoryServer.deregisterMonitorProvider(monitor); 3334 } 3335 } 3336 3337 /** {@inheritDoc} */ 3338 @Override 3339 public String toString() 3340 { 3341 final StringBuilder sb = new StringBuilder(); 3342 sb.append(getClass().getSimpleName()) 3343 .append(" \"").append(getBaseDN()).append(" ") 3344 .append(getServerId()).append("\",") 3345 .append(" groupId=").append(getGroupId()) 3346 .append(", genId=").append(getGenerationID()) 3347 .append(", "); 3348 connectedRS.get().toString(sb); 3349 return sb.toString(); 3350 } 3351 3352 private void debugInfo(CharSequence message) 3353 { 3354 logger.trace(getClass().getSimpleName() + " for baseDN=" + getBaseDN() 3355 + " and serverId=" + getServerId() + ": " + message); 3356 } 3357}