001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2017 ForgeRock AS. 016 */ 017package org.opends.server.replication.server; 018 019import static org.opends.messages.ReplicationMessages.*; 020 021import java.io.IOException; 022import java.util.Random; 023import java.util.concurrent.Semaphore; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import org.forgerock.i18n.LocalizableMessage; 028import org.forgerock.i18n.slf4j.LocalizedLogger; 029import org.forgerock.opendj.config.server.ConfigException; 030import org.forgerock.opendj.ldap.ResultCode; 031import org.opends.server.api.MonitorData; 032import org.forgerock.opendj.server.config.server.MonitorProviderCfg; 033import org.opends.server.core.DirectoryServer; 034import org.opends.server.replication.common.AssuredMode; 035import org.opends.server.replication.common.CSN; 036import org.opends.server.replication.common.RSInfo; 037import org.opends.server.replication.common.ServerStatus; 038import org.opends.server.replication.protocol.AckMsg; 039import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; 040import org.opends.server.replication.protocol.HeartbeatThread; 041import org.opends.server.replication.protocol.MonitorMsg; 042import org.opends.server.replication.protocol.MonitorRequestMsg; 043import org.opends.server.replication.protocol.ProtocolVersion; 044import org.opends.server.replication.protocol.ReplServerStartMsg; 045import org.opends.server.replication.protocol.ReplicaOfflineMsg; 046import org.opends.server.replication.protocol.ReplicationMsg; 047import org.opends.server.replication.protocol.ResetGenerationIdMsg; 048import org.opends.server.replication.protocol.RoutableMsg; 049import org.opends.server.replication.protocol.Session; 050import org.opends.server.replication.protocol.StartMsg; 051import org.opends.server.replication.protocol.StartSessionMsg; 052import org.opends.server.replication.protocol.TopologyMsg; 053import org.opends.server.replication.protocol.UpdateMsg; 054import org.opends.server.replication.protocol.WindowMsg; 055import org.opends.server.replication.server.changelog.api.ChangelogException; 056import org.opends.server.types.DirectoryException; 057import org.opends.server.types.InitializationException; 058 059/** 060 * This class defines a server handler : 061 * - that is a MessageHandler (see this class for more details) 062 * - that handles all interaction with a peer server (RS or DS). 063 */ 064public abstract class ServerHandler extends MessageHandler 065{ 066 067 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 068 069 /** 070 * Time during which the server will wait for existing thread to stop 071 * during the shutdownWriter. 072 */ 073 private static final int SHUTDOWN_JOIN_TIMEOUT = 30000; 074 075 /** 076 * The serverId of the remote server. 077 */ 078 protected int serverId; 079 /** 080 * The session opened with the remote server. 081 */ 082 protected final Session session; 083 084 /** 085 * The serverURL of the remote server. 086 */ 087 protected String serverURL; 088 /** 089 * Number of updates received from the server in assured safe read mode. 090 */ 091 private int assuredSrReceivedUpdates; 092 /** 093 * Number of updates received from the server in assured safe read mode that 094 * timed out. 095 */ 096 private final AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger(); 097 /** 098 * Number of updates sent to the server in assured safe read mode. 099 */ 100 private int assuredSrSentUpdates; 101 /** 102 * Number of updates sent to the server in assured safe read mode that timed 103 * out. 104 */ 105 private final AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger(); 106 /** 107 * Number of updates received from the server in assured safe data mode. 108 */ 109 private int assuredSdReceivedUpdates; 110 /** 111 * Number of updates received from the server in assured safe data mode that 112 * timed out. 113 */ 114 private final AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger(); 115 /** 116 * Number of updates sent to the server in assured safe data mode. 117 */ 118 private int assuredSdSentUpdates; 119 120 /** 121 * Number of updates sent to the server in assured safe data mode that timed out. 122 */ 123 private final AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger(); 124 125 /** 126 * The associated ServerWriter that sends messages to the remote server. 127 */ 128 private ServerWriter writer; 129 130 /** 131 * The associated ServerReader that receives messages from the remote server. 132 */ 133 private ServerReader reader; 134 135 /** Window. */ 136 private int rcvWindow; 137 private final int rcvWindowSizeHalf; 138 139 /** The size of the receiving window. */ 140 protected final int maxRcvWindow; 141 /** Semaphore that the writer uses to control the flow to the remote server. */ 142 private Semaphore sendWindow; 143 /** The initial size of the sending window. */ 144 private int sendWindowSize; 145 /** Remote generation id. */ 146 protected long generationId = -1; 147 /** The generation id of the hosting RS. */ 148 protected long localGenerationId = -1; 149 /** The generation id before processing a new start handshake. */ 150 protected long oldGenerationId = -1; 151 /** Group id of this remote server. */ 152 protected byte groupId = -1; 153 /** The SSL encryption after the negotiation with the peer. */ 154 protected boolean sslEncryption; 155 /** 156 * The time in milliseconds between heartbeats from the replication 157 * server. Zero means heartbeats are off. 158 */ 159 protected long heartbeatInterval; 160 161 /** The thread that will send heartbeats. */ 162 private HeartbeatThread heartbeatThread; 163 164 /** Set when ServerWriter is stopping. */ 165 private volatile boolean shutdownWriter; 166 167 /** Weight of this remote server. */ 168 protected int weight = 1; 169 170 /** 171 * Creates a new server handler instance with the provided socket. 172 * 173 * @param session The Session used by the ServerHandler to 174 * communicate with the remote entity. 175 * @param queueSize The maximum number of update that will be kept 176 * in memory by this ServerHandler. 177 * @param replicationServer The hosting replication server. 178 * @param rcvWindowSize The window size to receive from the remote server. 179 */ 180 public ServerHandler( 181 Session session, 182 int queueSize, 183 ReplicationServer replicationServer, 184 int rcvWindowSize) 185 { 186 super(queueSize, replicationServer); 187 this.session = session; 188 this.rcvWindowSizeHalf = rcvWindowSize / 2; 189 this.maxRcvWindow = rcvWindowSize; 190 this.rcvWindow = rcvWindowSize; 191 } 192 193 /** 194 * Abort a start procedure currently establishing. 195 * @param reason The provided reason. 196 */ 197 protected void abortStart(LocalizableMessage reason) 198 { 199 // We did not recognize the message, close session as what can happen after 200 // is undetermined and we do not want the server to be disturbed 201 Session localSession = session; 202 if (localSession != null) 203 { 204 if (reason != null) 205 { 206 if (logger.isTraceEnabled()) 207 { 208 logger.trace("In " + this + " closing session with err=" + reason); 209 } 210 logger.error(reason); 211 } 212 213 // This method is only called when aborting a failing handshake and 214 // not StopMsg should be sent in such situation. StopMsg are only 215 // expected when full handshake has been performed, or at end of 216 // handshake phase 1, when DS was just gathering available RS info 217 localSession.close(); 218 } 219 220 releaseDomainLock(); 221 222 // If generation id of domain was changed, set it back to old value 223 // We may have changed it as it was -1 and we received a value >0 from peer 224 // server and the last topo message sent may have failed being sent: in that 225 // case retrieve old value of generation id for replication server domain 226 if (oldGenerationId != -100) 227 { 228 replicationServerDomain.changeGenerationId(oldGenerationId); 229 } 230 } 231 232 /** 233 * Releases the lock on the replication server domain if it was held. 234 */ 235 protected void releaseDomainLock() 236 { 237 if (replicationServerDomain.hasLock()) 238 { 239 replicationServerDomain.release(); 240 } 241 } 242 243 /** 244 * Check the protocol window and send WindowMsg if necessary. 245 * 246 * @throws IOException when the session becomes unavailable. 247 */ 248 public synchronized void checkWindow() throws IOException 249 { 250 if (rcvWindow < rcvWindowSizeHalf) 251 { 252 WindowMsg msg = new WindowMsg(rcvWindowSizeHalf); 253 session.publish(msg); 254 rcvWindow += rcvWindowSizeHalf; 255 } 256 } 257 258 /** 259 * Decrement the protocol window, then check if it is necessary 260 * to send a WindowMsg and send it. 261 * 262 * @throws IOException when the session becomes unavailable. 263 */ 264 private synchronized void decAndCheckWindow() throws IOException 265 { 266 rcvWindow--; 267 checkWindow(); 268 } 269 270 /** 271 * Finalize the initialization, create reader, writer, heartbeat system 272 * and monitoring system. 273 * @throws DirectoryException When an exception is raised. 274 */ 275 protected void finalizeStart() throws DirectoryException 276 { 277 // FIXME:ECL We should refactor so that a SH always have a session 278 if (session != null) 279 { 280 try 281 { 282 // Disable timeout for next communications 283 session.setSoTimeout(0); 284 } 285 catch(Exception e) 286 { /* do nothing */ 287 } 288 289 // sendWindow MUST be created before starting the writer 290 sendWindow = new Semaphore(sendWindowSize); 291 292 writer = new ServerWriter(session, this, replicationServerDomain, 293 replicationServer.getDSRSShutdownSync()); 294 reader = new ServerReader(session, this); 295 296 session.setName("Replication server RS(" + getReplicationServerId() 297 + ") session thread to " + this + " at " 298 + session.getReadableRemoteAddress()); 299 session.start(); 300 try 301 { 302 session.waitForStartup(); 303 } 304 catch (InterruptedException e) 305 { 306 final LocalizableMessage message = 307 ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName()); 308 throw new DirectoryException(ResultCode.OTHER, message, e); 309 } 310 reader.start(); 311 writer.start(); 312 313 // Create a thread to send heartbeat messages. 314 if (heartbeatInterval > 0) 315 { 316 String threadName = "Replication server RS(" + getReplicationServerId() 317 + ") heartbeat publisher to " + this + " at " 318 + session.getReadableRemoteAddress(); 319 heartbeatThread = new HeartbeatThread(threadName, session, 320 heartbeatInterval / 3); 321 heartbeatThread.start(); 322 } 323 } 324 325 DirectoryServer.deregisterMonitorProvider(this); 326 DirectoryServer.registerMonitorProvider(this); 327 } 328 329 /** 330 * Sends a message. 331 * 332 * @param msg 333 * The message to be sent. 334 * @throws IOException 335 * When it occurs while sending the message, 336 */ 337 public void send(ReplicationMsg msg) throws IOException 338 { 339 // avoid logging anything for unit tests that include a null domain. 340 if (logger.isTraceEnabled()) 341 { 342 logger.trace("In " 343 + replicationServerDomain.getLocalRSMonitorInstanceName() + " " 344 + this + " publishes message:\n" + msg); 345 } 346 session.publish(msg); 347 } 348 349 /** 350 * Get the age of the older change that has not yet been replicated 351 * to the server handled by this ServerHandler. 352 * @return The age if the older change has not yet been replicated 353 * to the server handled by this ServerHandler. 354 */ 355 public long getApproxFirstMissingDate() 356 { 357 // Get the older CSN received 358 CSN olderUpdateCSN = getOlderUpdateCSN(); 359 if (olderUpdateCSN != null) 360 { 361 // If not present in the local RS db, 362 // then approximate with the older update time 363 return olderUpdateCSN.getTime(); 364 } 365 return 0; 366 } 367 368 /** 369 * Get the number of updates received from the server in assured safe data 370 * mode. 371 * @return The number of updates received from the server in assured safe data 372 * mode 373 */ 374 public int getAssuredSdReceivedUpdates() 375 { 376 return assuredSdReceivedUpdates; 377 } 378 379 /** 380 * Get the number of updates received from the server in assured safe data 381 * mode that timed out. 382 * @return The number of updates received from the server in assured safe data 383 * mode that timed out. 384 */ 385 public AtomicInteger getAssuredSdReceivedUpdatesTimeout() 386 { 387 return assuredSdReceivedUpdatesTimeout; 388 } 389 390 /** 391 * Get the number of updates sent to the server in assured safe data mode. 392 * @return The number of updates sent to the server in assured safe data mode 393 */ 394 public int getAssuredSdSentUpdates() 395 { 396 return assuredSdSentUpdates; 397 } 398 399 /** 400 * Get the number of updates sent to the server in assured safe data mode that 401 * timed out. 402 * @return The number of updates sent to the server in assured safe data mode 403 * that timed out. 404 */ 405 public AtomicInteger getAssuredSdSentUpdatesTimeout() 406 { 407 return assuredSdSentUpdatesTimeout; 408 } 409 410 /** 411 * Get the number of updates received from the server in assured safe read 412 * mode. 413 * @return The number of updates received from the server in assured safe read 414 * mode 415 */ 416 public int getAssuredSrReceivedUpdates() 417 { 418 return assuredSrReceivedUpdates; 419 } 420 421 /** 422 * Get the number of updates received from the server in assured safe read 423 * mode that timed out. 424 * @return The number of updates received from the server in assured safe read 425 * mode that timed out. 426 */ 427 public AtomicInteger getAssuredSrReceivedUpdatesTimeout() 428 { 429 return assuredSrReceivedUpdatesTimeout; 430 } 431 432 /** 433 * Get the number of updates sent to the server in assured safe read mode. 434 * @return The number of updates sent to the server in assured safe read mode 435 */ 436 public int getAssuredSrSentUpdates() 437 { 438 return assuredSrSentUpdates; 439 } 440 441 /** 442 * Get the number of updates sent to the server in assured safe read mode that 443 * timed out. 444 * @return The number of updates sent to the server in assured safe read mode 445 * that timed out. 446 */ 447 public AtomicInteger getAssuredSrSentUpdatesTimeout() 448 { 449 return assuredSrSentUpdatesTimeout; 450 } 451 452 /** 453 * Returns the Replication Server Domain to which belongs this server handler. 454 * 455 * @return The replication server domain. 456 */ 457 public ReplicationServerDomain getDomain() 458 { 459 return replicationServerDomain; 460 } 461 462 /** 463 * Returns the value of generationId for that handler. 464 * @return The value of the generationId. 465 */ 466 public long getGenerationId() 467 { 468 return generationId; 469 } 470 471 /** 472 * Gets the group id of the server represented by this object. 473 * @return The group id of the server represented by this object. 474 */ 475 public byte getGroupId() 476 { 477 return groupId; 478 } 479 480 /** 481 * Get our heartbeat interval. 482 * @return Our heartbeat interval. 483 */ 484 public long getHeartbeatInterval() 485 { 486 return heartbeatInterval; 487 } 488 489 @Override 490 public MonitorData getMonitorData() 491 { 492 // Get the generic ones 493 MonitorData attributes = super.getMonitorData(); 494 495 attributes.add("server-id", serverId); 496 attributes.add("domain-name", getBaseDN()); 497 498 // Deprecated 499 attributes.add("max-waiting-changes", maxQueueSize); 500 attributes.add("sent-updates", getOutCount()); 501 attributes.add("received-updates", getInCount()); 502 503 // Assured counters 504 attributes.add("assured-sr-received-updates", getAssuredSrReceivedUpdates()); 505 attributes.add("assured-sr-received-updates-timeout", getAssuredSrReceivedUpdatesTimeout()); 506 attributes.add("assured-sr-sent-updates", getAssuredSrSentUpdates()); 507 attributes.add("assured-sr-sent-updates-timeout", getAssuredSrSentUpdatesTimeout()); 508 attributes.add("assured-sd-received-updates", getAssuredSdReceivedUpdates()); 509 if (!isDataServer()) 510 { 511 attributes.add("assured-sd-sent-updates", getAssuredSdSentUpdates()); 512 attributes.add("assured-sd-sent-updates-timeout", getAssuredSdSentUpdatesTimeout()); 513 } else 514 { 515 attributes.add("assured-sd-received-updates-timeout", getAssuredSdReceivedUpdatesTimeout()); 516 } 517 518 // Window stats 519 attributes.add("max-send-window", sendWindowSize); 520 attributes.add("current-send-window", sendWindow.availablePermits()); 521 attributes.add("max-rcv-window", maxRcvWindow); 522 attributes.add("current-rcv-window", rcvWindow); 523 524 // Encryption 525 attributes.add("ssl-encryption", session.isEncrypted()); 526 527 // Data generation 528 attributes.add("generation-id", generationId); 529 530 return attributes; 531 } 532 533 /** 534 * Retrieves the name of this monitor provider. It should be unique among all 535 * monitor providers, including all instances of the same monitor provider. 536 * 537 * @return The name of this monitor provider. 538 */ 539 @Override 540 public abstract String getMonitorInstanceName(); 541 542 /** 543 * Gets the protocol version used with this remote server. 544 * @return The protocol version used with this remote server. 545 */ 546 public short getProtocolVersion() 547 { 548 return session.getProtocolVersion(); 549 } 550 551 /** 552 * Get the Server Id. 553 * 554 * @return the ID of the server to which this object is linked 555 */ 556 public int getServerId() 557 { 558 return serverId; 559 } 560 561 /** 562 * Retrieves the URL for this server handler. 563 * 564 * @return The URL for this server handler, in the form of an address and 565 * port separated by a colon. 566 */ 567 public String getServerURL() 568 { 569 return serverURL; 570 } 571 572 /** 573 * Return the ServerStatus. 574 * @return The server status. 575 */ 576 protected abstract ServerStatus getStatus(); 577 578 /** 579 * Increment the number of updates received from the server in assured safe 580 * data mode. 581 */ 582 public void incrementAssuredSdReceivedUpdates() 583 { 584 assuredSdReceivedUpdates++; 585 } 586 587 /** 588 * Increment the number of updates received from the server in assured safe 589 * data mode that timed out. 590 */ 591 public void incrementAssuredSdReceivedUpdatesTimeout() 592 { 593 assuredSdReceivedUpdatesTimeout.incrementAndGet(); 594 } 595 596 /** 597 * Increment the number of updates sent to the server in assured safe data 598 * mode that timed out. 599 */ 600 public void incrementAssuredSdSentUpdatesTimeout() 601 { 602 assuredSdSentUpdatesTimeout.incrementAndGet(); 603 } 604 605 /** 606 * Increment the number of updates received from the server in assured safe 607 * read mode. 608 */ 609 public void incrementAssuredSrReceivedUpdates() 610 { 611 assuredSrReceivedUpdates++; 612 } 613 614 /** 615 * Increment the number of updates received from the server in assured safe 616 * read mode that timed out. 617 */ 618 public void incrementAssuredSrReceivedUpdatesTimeout() 619 { 620 assuredSrReceivedUpdatesTimeout.incrementAndGet(); 621 } 622 623 /** 624 * Increment the number of updates sent to the server in assured safe read 625 * mode that timed out. 626 */ 627 public void incrementAssuredSrSentUpdatesTimeout() 628 { 629 assuredSrSentUpdatesTimeout.incrementAndGet(); 630 } 631 632 /** {@inheritDoc} */ 633 @Override 634 public void initializeMonitorProvider(MonitorProviderCfg configuration) 635 throws ConfigException, InitializationException 636 { 637 // Nothing to do for now 638 } 639 640 /** 641 * Check if the server associated to this ServerHandler is a data server 642 * in the topology. 643 * @return true if the server is a data server. 644 */ 645 public abstract boolean isDataServer(); 646 647 /** 648 * Check if the server associated to this ServerHandler is a replication 649 * server. 650 * @return true if the server is a replication server. 651 */ 652 public boolean isReplicationServer() 653 { 654 return !isDataServer(); 655 } 656 657 // The handshake phase must be done by blocking any access to structures 658 // keeping info on connected servers, so that one can safely check for 659 // pre-existence of a server, send a coherent snapshot of known topology to 660 // peers, update the local view of the topology... 661 // 662 // For instance a kind of problem could be that while we connect with a 663 // peer RS, a DS is connecting at the same time and we could publish the 664 // connected DSs to the peer RS forgetting this last DS in the TopologyMsg. 665 // 666 // This method and every others that need to read/make changes to the 667 // structures holding topology for the domain should: 668 // - call ReplicationServerDomain.lock() 669 // - read/modify structures 670 // - call ReplicationServerDomain.release() 671 // 672 // More information is provided in comment of ReplicationServerDomain.lock() 673 674 /** 675 * Lock the domain without a timeout. 676 * <p> 677 * If domain already exists, lock it until handshake is finished otherwise it 678 * will be created and locked later in the method 679 * 680 * @throws DirectoryException 681 * When an exception occurs. 682 * @throws InterruptedException 683 * If the current thread was interrupted while waiting for the lock. 684 */ 685 public void lockDomainNoTimeout() throws DirectoryException, 686 InterruptedException 687 { 688 if (!replicationServerDomain.hasLock()) 689 { 690 replicationServerDomain.lock(); 691 } 692 } 693 694 /** 695 * Lock the domain with a timeout. 696 * <p> 697 * Take the lock on the domain. WARNING: Here we try to acquire the lock with 698 * a timeout. This is for preventing a deadlock that may happen if there are 699 * cross connection attempts (for same domain) from this replication server 700 * and from a peer one. 701 * <p> 702 * Here is the scenario: 703 * <ol> 704 * <li>RS1 connect thread takes the domain lock and starts connection to RS2 705 * </li> 706 * <li>at the same time RS2 connect thread takes his domain lock and start 707 * connection to RS2</li> 708 * <li>RS2 listen thread starts processing received ReplServerStartMsg from 709 * RS1 and wants to acquire the lock on the domain (here) but cannot as RS2 710 * connect thread already has it</li> 711 * <li>RS1 listen thread starts processing received ReplServerStartMsg from 712 * RS2 and wants to acquire the lock on the domain (here) but cannot as RS1 713 * connect thread already has it</li> 714 * </ol> 715 * => Deadlock: 4 threads are locked. 716 * <p> 717 * To prevent threads locking in such situation, the listen threads here will 718 * both timeout trying to acquire the lock. The random time for the timeout 719 * should allow on connection attempt to be aborted whereas the other one 720 * should have time to finish in the same time. 721 * <p> 722 * Warning: the minimum time (3s) should be big enough to allow normal 723 * situation connections to terminate. The added random time should represent 724 * a big enough range so that the chance to have one listen thread timing out 725 * a lot before the peer one is great. When the first listen thread times out, 726 * the remote connect thread should release the lock and allow the peer listen 727 * thread to take the lock it was waiting for and process the connection 728 * attempt. 729 * 730 * @throws DirectoryException 731 * When an exception occurs. 732 * @throws InterruptedException 733 * If the current thread was interrupted while waiting for the lock. 734 */ 735 public void lockDomainWithTimeout() throws DirectoryException, 736 InterruptedException 737 { 738 final Random random = new Random(); 739 final int randomTime = random.nextInt(6); // Random from 0 to 5 740 // Wait at least 3 seconds + (0 to 5 seconds) 741 final long timeout = 3000 + randomTime * 1000; 742 final boolean lockAcquired = replicationServerDomain.tryLock(timeout); 743 if (!lockAcquired) 744 { 745 LocalizableMessage message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get( 746 getBaseDN(), serverId, session.getReadableRemoteAddress(), getReplicationServerId()); 747 throw new DirectoryException(ResultCode.OTHER, message); 748 } 749 } 750 751 /** 752 * Processes a routable message. 753 * 754 * @param msg The message to be processed. 755 */ 756 void process(RoutableMsg msg) 757 { 758 if (logger.isTraceEnabled()) 759 { 760 logger.trace("In " 761 + replicationServerDomain.getLocalRSMonitorInstanceName() + " " 762 + this + " processes routable msg received:" + msg); 763 } 764 replicationServerDomain.process(msg, this); 765 } 766 767 /** 768 * Responds to a monitor request message. 769 * 770 * @param msg 771 * The monitor request message. 772 */ 773 void processMonitorRequestMsg(MonitorRequestMsg msg) 774 { 775 replicationServerDomain.processMonitorRequestMsg(msg, this); 776 } 777 778 /** 779 * Responds to a monitor message. 780 * 781 * @param msg 782 * The monitor message. 783 */ 784 void processMonitorMsg(MonitorMsg msg) 785 { 786 replicationServerDomain.processMonitorMsg(msg, this); 787 } 788 789 /** 790 * Processes a change time heartbeat msg. 791 * 792 * @param msg 793 * The message to be processed. 794 * @throws DirectoryException 795 * When an exception is raised. 796 */ 797 void process(ChangeTimeHeartbeatMsg msg) throws DirectoryException 798 { 799 if (logger.isTraceEnabled()) 800 { 801 logger.trace("In " 802 + replicationServerDomain.getLocalRSMonitorInstanceName() + " " 803 + this + " processes received msg:\n" + msg); 804 } 805 replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg); 806 } 807 808 /** 809 * Process the reception of a WindowProbeMsg message. 810 * 811 * @throws IOException 812 * When the session becomes unavailable. 813 */ 814 public void replyToWindowProbe() throws IOException 815 { 816 if (rcvWindow > 0) 817 { 818 // The LDAP server believes that its window is closed while it is not, 819 // this means that some problem happened in the window exchange procedure! 820 // lets update the LDAP server with out current window size and hope 821 // that everything will work better in the future. 822 // TODO also log an error message. 823 session.publish(new WindowMsg(rcvWindow)); 824 } 825 else 826 { 827 // Both the LDAP server and the replication server believes that the 828 // window is closed. Lets check the flowcontrol in case we 829 // can now resume operations and send a windowMessage if necessary. 830 checkWindow(); 831 } 832 } 833 834 /** 835 * Sends the provided TopologyMsg to the peer server. 836 * 837 * @param topoMsg 838 * The TopologyMsg message to be sent. 839 * @throws IOException 840 * When it occurs while sending the message, 841 */ 842 public void sendTopoInfo(TopologyMsg topoMsg) throws IOException 843 { 844 // V1 Rs do not support the TopologyMsg 845 if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1) 846 { 847 send(topoMsg); 848 } 849 } 850 851 /** 852 * Set a new generation ID. 853 * 854 * @param generationId The new generation ID 855 * 856 */ 857 public void setGenerationId(long generationId) 858 { 859 this.generationId = generationId; 860 } 861 862 /** 863 * Sets the window size when used when sending to the remote. 864 * @param size The provided window size. 865 */ 866 protected void setSendWindowSize(int size) 867 { 868 this.sendWindowSize = size; 869 } 870 871 /** 872 * Shutdown This ServerHandler. 873 */ 874 @Override 875 public void shutdown() 876 { 877 shutdownWriter = true; 878 setConsumerActive(false); 879 super.shutdown(); 880 881 if (session != null) 882 { 883 session.close(); 884 } 885 if (heartbeatThread != null) 886 { 887 heartbeatThread.shutdown(); 888 } 889 890 DirectoryServer.deregisterMonitorProvider(this); 891 892 /* 893 * Be sure to wait for ServerWriter and ServerReader death 894 * It does not matter if we try to stop a thread which is us (reader 895 * or writer), but we must not wait for our own thread death. 896 */ 897 try 898 { 899 if (writer != null && !Thread.currentThread().equals(writer)) 900 { 901 writer.join(SHUTDOWN_JOIN_TIMEOUT); 902 } 903 if (reader != null && !Thread.currentThread().equals(reader)) 904 { 905 reader.join(SHUTDOWN_JOIN_TIMEOUT); 906 } 907 } catch (InterruptedException e) 908 { 909 // don't try anymore to join and return. 910 } 911 if (logger.isTraceEnabled()) 912 { 913 logger.trace("SH.shutdowned(" + this + ")"); 914 } 915 } 916 917 /** 918 * Select the next update that must be sent to the server managed by this 919 * ServerHandler. 920 * 921 * @return the next update that must be sent to the server managed by this 922 * ServerHandler. 923 * @throws ChangelogException 924 * If a problem occurs when reading the changelog 925 */ 926 public UpdateMsg take() throws ChangelogException 927 { 928 final UpdateMsg msg = getNextMessage(); 929 930 if (!(msg instanceof ReplicaOfflineMsg)) 931 { 932 acquirePermitInSendWindow(); 933 } 934 935 if (msg != null) 936 { 937 incrementOutCount(); 938 if (msg.isAssured()) 939 { 940 incrementAssuredStats(msg); 941 } 942 return msg; 943 } 944 return null; 945 } 946 947 private void acquirePermitInSendWindow() 948 { 949 boolean acquired = false; 950 boolean interrupted = true; 951 do 952 { 953 try 954 { 955 acquired = sendWindow.tryAcquire(500, TimeUnit.MILLISECONDS); 956 interrupted = false; 957 } catch (InterruptedException e) 958 { 959 // loop until not interrupted 960 } 961 } while ((interrupted || !acquired) && !shutdownWriter); 962 } 963 964 private void incrementAssuredStats(final UpdateMsg msg) 965 { 966 if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) 967 { 968 assuredSrSentUpdates++; 969 } 970 else if (!isDataServer()) 971 { 972 assuredSdSentUpdates++; 973 } 974 } 975 976 /** 977 * Creates a RSInfo structure representing this remote RS. 978 * @return The RSInfo structure representing this remote RS 979 */ 980 public RSInfo toRSInfo() 981 { 982 return new RSInfo(serverId, serverURL, generationId, groupId, weight); 983 } 984 985 /** 986 * Update the send window size based on the credit specified in the 987 * given window message. 988 * 989 * @param windowMsg The Window LocalizableMessage containing the information 990 * necessary for updating the window size. 991 */ 992 public void updateWindow(WindowMsg windowMsg) 993 { 994 sendWindow.release(windowMsg.getNumAck()); 995 } 996 997 /** 998 * Log the messages involved in the start handshake. 999 * @param inStartMsg The message received first. 1000 * @param outStartMsg The message sent in response. 1001 */ 1002 protected void logStartHandshakeRCVandSND( 1003 StartMsg inStartMsg, 1004 StartMsg outStartMsg) 1005 { 1006 if (logger.isTraceEnabled()) 1007 { 1008 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1009 + ", " + getClass().getSimpleName() + " " + this + ":" 1010 + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg 1011 + "\nAND REPLIED:\n" + outStartMsg); 1012 } 1013 } 1014 1015 /** 1016 * Log the messages involved in the start handshake. 1017 * @param outStartMsg The message sent first. 1018 * @param inStartMsg The message received in response. 1019 */ 1020 protected void logStartHandshakeSNDandRCV( 1021 StartMsg outStartMsg, 1022 StartMsg inStartMsg) 1023 { 1024 if (logger.isTraceEnabled()) 1025 { 1026 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1027 + ", " + getClass().getSimpleName() + " " + this + ":" 1028 + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n" 1029 + inStartMsg); 1030 } 1031 } 1032 1033 /** 1034 * Log the messages involved in the Topology handshake. 1035 * @param inTopoMsg The message received first. 1036 * @param outTopoMsg The message sent in response. 1037 */ 1038 protected void logTopoHandshakeRCVandSND( 1039 TopologyMsg inTopoMsg, 1040 TopologyMsg outTopoMsg) 1041 { 1042 if (logger.isTraceEnabled()) 1043 { 1044 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1045 + ", " + getClass().getSimpleName() + " " + this + ":" 1046 + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n" 1047 + outTopoMsg); 1048 } 1049 } 1050 1051 /** 1052 * Log the messages involved in the Topology handshake. 1053 * @param outTopoMsg The message sent first. 1054 * @param inTopoMsg The message received in response. 1055 */ 1056 protected void logTopoHandshakeSNDandRCV( 1057 TopologyMsg outTopoMsg, 1058 TopologyMsg inTopoMsg) 1059 { 1060 if (logger.isTraceEnabled()) 1061 { 1062 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1063 + ", " + getClass().getSimpleName() + " " + this + ":" 1064 + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n" 1065 + inTopoMsg); 1066 } 1067 } 1068 1069 /** 1070 * Log the messages involved in the Topology/StartSession handshake. 1071 * @param inStartSessionMsg The message received first. 1072 * @param outTopoMsg The message sent in response. 1073 */ 1074 protected void logStartSessionHandshake( 1075 StartSessionMsg inStartSessionMsg, 1076 TopologyMsg outTopoMsg) 1077 { 1078 if (logger.isTraceEnabled()) 1079 { 1080 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1081 + ", " + getClass().getSimpleName() + " " + this + " :" 1082 + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg 1083 + "\nAND REPLIED:\n" + outTopoMsg); 1084 } 1085 } 1086 1087 /** 1088 * Log stop message has been received. 1089 */ 1090 protected void logStopReceived() 1091 { 1092 if (logger.isTraceEnabled()) 1093 { 1094 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1095 + ", " + getClass().getSimpleName() + " " + this + " :" 1096 + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE"); 1097 } 1098 } 1099 1100 /** 1101 * Process a Ack message received. 1102 * @param ack the message received. 1103 */ 1104 void processAck(AckMsg ack) 1105 { 1106 replicationServerDomain.processAck(ack, this); 1107 } 1108 1109 /** 1110 * Get the reference generation id (associated with the changes in the db). 1111 * @return the reference generation id. 1112 */ 1113 public long getReferenceGenId() 1114 { 1115 return replicationServerDomain.getGenerationId(); 1116 } 1117 1118 /** 1119 * Process a ResetGenerationIdMsg message received. 1120 * @param msg the message received. 1121 */ 1122 void processResetGenId(ResetGenerationIdMsg msg) 1123 { 1124 replicationServerDomain.resetGenerationId(this, msg); 1125 } 1126 1127 /** 1128 * Put a new update message received. 1129 * @param update the update message received. 1130 * @throws IOException when it occurs. 1131 */ 1132 public void put(UpdateMsg update) throws IOException 1133 { 1134 if (!(update instanceof ReplicaOfflineMsg)) 1135 { 1136 decAndCheckWindow(); 1137 } 1138 replicationServerDomain.put(update, this); 1139 } 1140 1141 /** 1142 * Stop this handler. 1143 */ 1144 public void doStop() 1145 { 1146 replicationServerDomain.stopServer(this, false); 1147 } 1148 1149 /** 1150 * Creates a ReplServerStartMsg for the current ServerHandler. 1151 * 1152 * @return a new ReplServerStartMsg for the current ServerHandler. 1153 */ 1154 protected ReplServerStartMsg createReplServerStartMsg() 1155 { 1156 return new ReplServerStartMsg(getReplicationServerId(), 1157 getReplicationServerURL(), getBaseDN(), maxRcvWindow, 1158 replicationServerDomain.getLatestServerState(), localGenerationId, 1159 sslEncryption, getLocalGroupId(), 1160 replicationServer.getDegradedStatusThreshold()); 1161 } 1162 1163 /** 1164 * Returns a "badly disconnected" error message for this server handler. 1165 * 1166 * @return a "badly disconnected" error message for this server handler 1167 */ 1168 public LocalizableMessage getBadlyDisconnectedErrorMessage() 1169 { 1170 if (isDataServer()) 1171 { 1172 return ERR_DS_BADLY_DISCONNECTED.get(getReplicationServerId(), 1173 getServerId(), session.getReadableRemoteAddress(), getBaseDN()); 1174 } 1175 return ERR_RS_BADLY_DISCONNECTED.get(getReplicationServerId(), 1176 getServerId(), session.getReadableRemoteAddress(), getBaseDN()); 1177 } 1178}