001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2016 ForgeRock AS. 016 */ 017package org.opends.server.replication.server; 018 019import static org.opends.messages.ReplicationMessages.*; 020import static org.opends.server.replication.protocol.ProtocolVersion.*; 021import static org.opends.server.util.StaticUtils.*; 022 023import java.io.IOException; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028 029import org.forgerock.i18n.LocalizableMessage; 030import org.forgerock.i18n.slf4j.LocalizedLogger; 031import org.forgerock.opendj.ldap.DN; 032import org.forgerock.opendj.ldap.ResultCode; 033import org.opends.server.api.MonitorData; 034import org.opends.server.replication.common.DSInfo; 035import org.opends.server.replication.common.RSInfo; 036import org.opends.server.replication.common.ServerState; 037import org.opends.server.replication.common.ServerStatus; 038import org.opends.server.replication.protocol.ProtocolVersion; 039import org.opends.server.replication.protocol.ReplServerStartMsg; 040import org.opends.server.replication.protocol.ReplicationMsg; 041import org.opends.server.replication.protocol.Session; 042import org.opends.server.replication.protocol.StopMsg; 043import org.opends.server.replication.protocol.TopologyMsg; 044import org.opends.server.types.DirectoryException; 045import org.opends.server.types.HostPort; 046 047/** 048 * This class defines a server handler, which handles all interaction with a 049 * peer replication server. 050 */ 051public class ReplicationServerHandler extends ServerHandler 052{ 053 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 054 055 /** Properties filled only if remote server is a RS. */ 056 private String serverAddressURL; 057 /** 058 * This collection will contain as many elements as there are 059 * LDAP servers connected to the remote replication server. 060 */ 061 private final Map<Integer, LightweightServerHandler> remoteDirectoryServers = new ConcurrentHashMap<>(); 062 063 /** 064 * Starts this handler based on a start message received from remote server. 065 * @param inReplServerStartMsg The start msg provided by the remote server. 066 * @return Whether the remote server requires encryption or not. 067 * @throws DirectoryException When a problem occurs. 068 */ 069 private boolean processStartFromRemote( 070 ReplServerStartMsg inReplServerStartMsg) 071 throws DirectoryException 072 { 073 try 074 { 075 short protocolVersion = getCompatibleVersion(inReplServerStartMsg 076 .getVersion()); 077 session.setProtocolVersion(protocolVersion); 078 generationId = inReplServerStartMsg.getGenerationId(); 079 serverId = inReplServerStartMsg.getServerId(); 080 serverURL = inReplServerStartMsg.getServerURL(); 081 serverAddressURL = toServerAddressURL(serverURL); 082 setBaseDNAndDomain(inReplServerStartMsg.getBaseDN(), false); 083 setInitialServerState(inReplServerStartMsg.getServerState()); 084 setSendWindowSize(inReplServerStartMsg.getWindowSize()); 085 if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) 086 { 087 // We support connection from a V1 RS 088 // Only V2 protocol has the group id in repl server start message 089 this.groupId = inReplServerStartMsg.getGroupId(); 090 } 091 092 oldGenerationId = -100; 093 } 094 catch(Exception e) 095 { 096 LocalizableMessage message = LocalizableMessage.raw(e.getLocalizedMessage()); 097 throw new DirectoryException(ResultCode.OTHER, message); 098 } 099 return inReplServerStartMsg.getSSLEncryption(); 100 } 101 102 private String toServerAddressURL(String serverURL) 103 { 104 final int port = HostPort.valueOf(serverURL).getPort(); 105 // Ensure correct formatting of IPv6 addresses by using a HostPort instance. 106 return new HostPort(session.getRemoteAddress(), port).toString(); 107 } 108 109 /** 110 * Sends a start message to the remote RS. 111 * 112 * @return The ReplServerStartMsg sent. 113 * @throws IOException 114 * When an exception occurs. 115 */ 116 private ReplServerStartMsg sendStartToRemote() throws IOException 117 { 118 ReplServerStartMsg outReplServerStartMsg = createReplServerStartMsg(); 119 send(outReplServerStartMsg); 120 return outReplServerStartMsg; 121 } 122 123 /** 124 * Creates a new handler object to a remote replication server. 125 * @param session The session with the remote RS. 126 * @param queueSize The queue size to manage updates to that RS. 127 * @param replicationServer The hosting local RS object. 128 * @param rcvWindowSize The receiving window size. 129 */ 130 public ReplicationServerHandler( 131 Session session, 132 int queueSize, 133 ReplicationServer replicationServer, 134 int rcvWindowSize) 135 { 136 super(session, queueSize, replicationServer, rcvWindowSize); 137 } 138 139 /** 140 * Connect the hosting RS to the RS represented by THIS handler 141 * on an outgoing connection. 142 * @param baseDN The baseDN 143 * @param sslEncryption The sslEncryption requested to the remote RS. 144 * @throws DirectoryException when an error occurs. 145 */ 146 public void connect(DN baseDN, boolean sslEncryption) 147 throws DirectoryException 148 { 149 // we are the initiator and decides of the encryption 150 this.sslEncryption = sslEncryption; 151 152 setBaseDNAndDomain(baseDN, false); 153 154 localGenerationId = replicationServerDomain.getGenerationId(); 155 oldGenerationId = localGenerationId; 156 157 try 158 { 159 lockDomainNoTimeout(); 160 161 ReplServerStartMsg outReplServerStartMsg = sendStartToRemote(); 162 163 // Wait answer 164 ReplicationMsg msg = session.receive(); 165 166 // Reject bad responses 167 if (!(msg instanceof ReplServerStartMsg)) 168 { 169 if (msg instanceof StopMsg) 170 { 171 // Remote replication server is probably shutting down or simultaneous 172 // cross-connect detected. 173 abortStart(null); 174 } 175 else 176 { 177 LocalizableMessage message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg 178 .getClass().getCanonicalName(), "ReplServerStartMsg"); 179 abortStart(message); 180 } 181 return; 182 } 183 184 processStartFromRemote((ReplServerStartMsg) msg); 185 186 if (replicationServerDomain.isAlreadyConnectedToRS(this)) 187 { 188 // Simultaneous cross connect. 189 abortStart(null); 190 return; 191 } 192 193 /* 194 Since we are going to send the topology message before having received 195 one, we need to set the generation ID as soon as possible if it is 196 currently uninitialized. See OpenDJ-121. 197 */ 198 if (localGenerationId < 0 && generationId > 0) 199 { 200 oldGenerationId = 201 replicationServerDomain.changeGenerationId(generationId); 202 } 203 204 logStartHandshakeSNDandRCV(outReplServerStartMsg,(ReplServerStartMsg)msg); 205 206 // Until here session is encrypted then it depends on the negotiation 207 // The session initiator decides whether to use SSL. 208 if (!this.sslEncryption) 209 { 210 session.stopEncryption(); 211 } 212 213 if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1) 214 { 215 /* 216 Only protocol version above V1 has a phase 2 handshake 217 NOW PROCEED WITH SECOND PHASE OF HANDSHAKE: 218 TopologyMsg then TopologyMsg (with a RS) 219 220 Send our own TopologyMsg to remote RS 221 */ 222 TopologyMsg outTopoMsg = 223 replicationServerDomain.createTopologyMsgForRS(); 224 sendTopoInfo(outTopoMsg); 225 226 // wait and process Topo from remote RS 227 TopologyMsg inTopoMsg = waitAndProcessTopoFromRemoteRS(); 228 if (inTopoMsg == null) 229 { 230 // Simultaneous cross connect. 231 abortStart(null); 232 return; 233 } 234 235 logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg); 236 237 /* 238 FIXME: i think this should be done for all protocol version !! 239 not only those > V1 240 */ 241 replicationServerDomain.register(this); 242 243 /* 244 Process TopologyMsg sent by remote RS: store matching new info 245 (this will also warn our connected DSs of the new received info) 246 */ 247 replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false); 248 } 249 250 logger.debug(INFO_REPLICATION_SERVER_CONNECTION_TO_RS, getReplicationServerId(), getServerId(), 251 replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress()); 252 253 super.finalizeStart(); 254 } 255 catch (IOException e) 256 { 257 logger.traceException(e); 258 LocalizableMessage errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get( 259 getReplicationServerId(), session.getReadableRemoteAddress(), getExceptionMessage(e)); 260 abortStart(errMessage); 261 } 262 catch (DirectoryException e) 263 { 264 logger.traceException(e); 265 abortStart(e.getMessageObject()); 266 } 267 catch (Exception e) 268 { 269 logger.traceException(e); 270 abortStart(LocalizableMessage.raw(e.getLocalizedMessage())); 271 } 272 finally 273 { 274 releaseDomainLock(); 275 } 276 } 277 278 /** 279 * Starts the handler from a remote ReplServerStart message received from 280 * the remote replication server. 281 * @param inReplServerStartMsg The provided ReplServerStart message received. 282 */ 283 public void startFromRemoteRS(ReplServerStartMsg inReplServerStartMsg) 284 { 285 localGenerationId = -1; 286 oldGenerationId = -100; 287 try 288 { 289 // The initiator decides if the session is encrypted 290 sslEncryption = processStartFromRemote(inReplServerStartMsg); 291 292 lockDomainWithTimeout(); 293 294 if (replicationServerDomain.isAlreadyConnectedToRS(this)) 295 { 296 abortStart(null); 297 return; 298 } 299 300 this.localGenerationId = replicationServerDomain.getGenerationId(); 301 ReplServerStartMsg outReplServerStartMsg = sendStartToRemote(); 302 303 logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg); 304 305 /* 306 until here session is encrypted then it depends on the negotiation 307 The session initiator decides whether to use SSL. 308 */ 309 if (!sslEncryption) 310 { 311 session.stopEncryption(); 312 } 313 314 TopologyMsg inTopoMsg = null; 315 if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1) 316 { 317 /* 318 Only protocol version above V1 has a phase 2 handshake 319 NOW PROCEED WITH SECOND PHASE OF HANDSHAKE: 320 TopologyMsg then TopologyMsg (with a RS) 321 wait and process Topo from remote RS 322 */ 323 inTopoMsg = waitAndProcessTopoFromRemoteRS(); 324 if (inTopoMsg == null) 325 { 326 // Simultaneous cross connect. 327 abortStart(null); 328 return; 329 } 330 331 // send our own TopologyMsg to remote RS 332 TopologyMsg outTopoMsg = replicationServerDomain 333 .createTopologyMsgForRS(); 334 sendTopoInfo(outTopoMsg); 335 336 logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg); 337 } 338 else 339 { 340 // Terminate connection from a V1 RS 341 342 // if the remote RS and the local RS have the same genID 343 // then it's ok and nothing else to do 344 if (generationId == localGenerationId) 345 { 346 if (logger.isTraceEnabled()) 347 { 348 logger.trace("In " + replicationServer.getMonitorInstanceName() 349 + " " + this + " RS V1 with serverID=" + serverId 350 + " is connected with the right generation ID"); 351 } 352 } else 353 { 354 checkGenerationId(); 355 } 356 /* 357 Note: the supported scenario for V1->V2 upgrade is to upgrade 1 by 1 358 all the servers of the topology. We prefer not not send a TopologyMsg 359 for giving partial/false information to the V2 servers as for 360 instance we don't have the connected DS of the V1 RS...When the V1 361 RS will be upgraded in his turn, topo info will be sent and accurate. 362 That way, there is no risk to have false/incomplete information in 363 other servers. 364 */ 365 } 366 367 replicationServerDomain.register(this); 368 369 // Process TopologyMsg sent by remote RS: store matching new info 370 // (this will also warn our connected DSs of the new received info) 371 if (inTopoMsg!=null) 372 { 373 replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false); 374 } 375 376 logger.debug(INFO_REPLICATION_SERVER_CONNECTION_FROM_RS, getReplicationServerId(), getServerId(), 377 replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress()); 378 379 super.finalizeStart(); 380 } 381 catch (IOException e) 382 { 383 logger.traceException(e); 384 abortStart(ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get( 385 inReplServerStartMsg.getServerId(), replicationServer.getServerId(), getExceptionMessage(e))); 386 } 387 catch (DirectoryException e) 388 { 389 logger.traceException(e); 390 abortStart(e.getMessageObject()); 391 } 392 catch (Exception e) 393 { 394 logger.traceException(e); 395 abortStart(LocalizableMessage.raw(e.getLocalizedMessage())); 396 } 397 finally 398 { 399 releaseDomainLock(); 400 } 401 } 402 403 /** 404 * Wait receiving the TopologyMsg from the remote RS and process it. 405 * @return the topologyMsg received or {@code null} if stop was received. 406 * @throws DirectoryException 407 */ 408 private TopologyMsg waitAndProcessTopoFromRemoteRS() 409 throws DirectoryException 410 { 411 ReplicationMsg msg; 412 try 413 { 414 msg = session.receive(); 415 } 416 catch(Exception e) 417 { 418 LocalizableMessage message = LocalizableMessage.raw(e.getLocalizedMessage()); 419 throw new DirectoryException(ResultCode.OTHER, message); 420 } 421 422 if (!(msg instanceof TopologyMsg)) 423 { 424 if (msg instanceof StopMsg) 425 { 426 // Remote replication server is probably shutting down, or cross 427 // connection attempt. 428 return null; 429 } 430 431 LocalizableMessage message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get( 432 msg.getClass().getCanonicalName(), "TopologyMsg"); 433 throw new DirectoryException(ResultCode.OTHER, message); 434 } 435 436 // Remote RS sent his topo msg 437 TopologyMsg inTopoMsg = (TopologyMsg) msg; 438 439 /* Store remote RS weight if it has one. 440 * For protocol version < 4, use default value of 1 for weight 441 */ 442 if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) 443 { 444 // List should only contain RS info for sender 445 RSInfo rsInfo = inTopoMsg.getRsInfos().get(0); 446 weight = rsInfo.getWeight(); 447 } 448 449 /* 450 if the remote RS and the local RS have the same genID 451 then it's ok and nothing else to do 452 */ 453 if (generationId == localGenerationId) 454 { 455 if (logger.isTraceEnabled()) 456 { 457 logger.trace("In " + replicationServer.getMonitorInstanceName() 458 + " RS with serverID=" + serverId 459 + " is connected with the right generation ID, same as local =" 460 + generationId); 461 } 462 } 463 else 464 { 465 checkGenerationId(); 466 } 467 468 return inTopoMsg; 469 } 470 471 /** 472 * Checks local generation ID against the remote RS one, 473 * and logs Warning messages if needed. 474 */ 475 private void checkGenerationId() 476 { 477 if (localGenerationId <= 0) 478 { 479 // The local RS is not initialized - take the one received 480 // WARNING: Must be done before computing topo message to send to peer 481 // server as topo message must embed valid generation id for our server 482 oldGenerationId = 483 replicationServerDomain.changeGenerationId(generationId); 484 return; 485 } 486 487 // the local RS is initialized 488 if (generationId > 0 489 // the remote RS is initialized. If not, there's nothing to do anyway. 490 && generationId != localGenerationId) 491 { 492 /* Either: 493 * 494 * 1) The 2 RS have different generationID 495 * replicationServerDomain.getGenerationIdSavedStatus() == true 496 * 497 * if the present RS has received changes regarding its gen ID and so will 498 * not change without a reset then we are just degrading the peer. 499 * 500 * 2) This RS has never received any changes for the current gen ID. 501 * 502 * Example case: 503 * - we are in RS1 504 * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) 505 * - RS1 has genId1 from LS1 /genId1 comes from data in suffix 506 * - we are in RS1 and we receive a START msg from RS2 507 * - Each RS keeps its genID / is degraded and when LS2 508 * will be populated from LS1 everything will become ok. 509 * 510 * Issue: 511 * FIXME : Would it be a good idea in some cases to just set the gen ID 512 * received from the peer RS specially if the peer has a non null state 513 * and we have a null state ? 514 * replicationServerDomain.setGenerationId(generationId, false); 515 */ 516 logger.warn(WARN_BAD_GENERATION_ID_FROM_RS, serverId, session.getReadableRemoteAddress(), generationId, 517 getBaseDN(), getReplicationServerId(), localGenerationId); 518 } 519 } 520 521 /** {@inheritDoc} */ 522 @Override 523 public boolean isDataServer() 524 { 525 return false; 526 } 527 528 /** 529 * Add the DSinfos of the connected Directory Servers 530 * to the List of DSInfo provided as a parameter. 531 * 532 * @param dsInfos The List of DSInfo that should be updated 533 * with the DSInfo for the remoteDirectoryServers 534 * connected to this ServerHandler. 535 */ 536 public void addDSInfos(List<DSInfo> dsInfos) 537 { 538 synchronized (remoteDirectoryServers) 539 { 540 for (LightweightServerHandler ls : remoteDirectoryServers.values()) 541 { 542 dsInfos.add(ls.toDSInfo()); 543 } 544 } 545 } 546 547 /** 548 * Shutdown This ServerHandler. 549 */ 550 @Override 551 public void shutdown() 552 { 553 super.shutdown(); 554 clearRemoteLSHandlers(); 555 } 556 557 private void clearRemoteLSHandlers() 558 { 559 synchronized (remoteDirectoryServers) 560 { 561 for (LightweightServerHandler lsh : remoteDirectoryServers.values()) 562 { 563 lsh.stopHandler(); 564 } 565 remoteDirectoryServers.clear(); 566 } 567 } 568 569 /** 570 * Stores topology information received from a peer RS and that must be kept 571 * in RS handler. 572 * 573 * @param topoMsg The received topology message 574 */ 575 public void processTopoInfoFromRS(TopologyMsg topoMsg) 576 { 577 // List should only contain RS info for sender 578 final RSInfo rsInfo = topoMsg.getRsInfos().get(0); 579 generationId = rsInfo.getGenerationId(); 580 groupId = rsInfo.getGroupId(); 581 weight = rsInfo.getWeight(); 582 583 synchronized (remoteDirectoryServers) 584 { 585 clearRemoteLSHandlers(); 586 587 // Creates the new structure according to the message received. 588 for (DSInfo dsInfo : topoMsg.getReplicaInfos().values()) 589 { 590 // For each DS connected to the peer RS 591 DSInfo clonedDSInfo = dsInfo.cloneWithReplicationServerId(serverId); 592 LightweightServerHandler lsh = 593 new LightweightServerHandler(this, clonedDSInfo); 594 lsh.startHandler(); 595 remoteDirectoryServers.put(lsh.getServerId(), lsh); 596 } 597 } 598 } 599 600 /** 601 * When this handler is connected to a replication server, specifies if 602 * a wanted server is connected to this replication server. 603 * 604 * @param serverId The server we want to know if it is connected 605 * to the replication server represented by this handler. 606 * @return boolean True is the wanted server is connected to the server 607 * represented by this handler. 608 */ 609 public boolean isRemoteLDAPServer(int serverId) 610 { 611 synchronized (remoteDirectoryServers) 612 { 613 for (LightweightServerHandler server : remoteDirectoryServers.values()) 614 { 615 if (serverId == server.getServerId()) 616 { 617 return true; 618 } 619 } 620 return false; 621 } 622 } 623 624 /** 625 * When the handler is connected to a replication server, specifies the 626 * replication server has remote LDAP servers connected to it. 627 * 628 * @return boolean True is the replication server has remote LDAP servers 629 * connected to it. 630 */ 631 public boolean hasRemoteLDAPServers() 632 { 633 return !remoteDirectoryServers.isEmpty(); 634 } 635 636 /** 637 * Return a Set containing the servers known by this replicationServer. 638 * @return a set containing the servers known by this replicationServer. 639 */ 640 public Set<Integer> getConnectedDirectoryServerIds() 641 { 642 return remoteDirectoryServers.keySet(); 643 } 644 645 /** {@inheritDoc} */ 646 @Override 647 public String getMonitorInstanceName() 648 { 649 return "Connected replication server RS(" + serverId + ") " + serverURL 650 + ",cn=" + replicationServerDomain.getMonitorInstanceName(); 651 } 652 653 @Override 654 public MonitorData getMonitorData() 655 { 656 MonitorData attributes = super.getMonitorData(); 657 658 ReplicationDomainMonitorData md = replicationServerDomain.getDomainMonitorData(); 659 attributes.add("Replication-Server", serverURL); 660 attributes.add("missing-changes", md.getMissingChangesRS(serverId)); 661 662 ServerState state = md.getRSStates(serverId); 663 if (state != null) 664 { 665 attributes.add("server-state", state.toStringSet()); 666 } 667 668 return attributes; 669 } 670 671 /** {@inheritDoc} */ 672 @Override 673 public String toString() 674 { 675 if (serverId != 0) 676 { 677 return "Replication server RS(" + serverId + ") for domain \"" 678 + replicationServerDomain.getBaseDN() + "\""; 679 } 680 return "Unknown server"; 681 } 682 683 /** 684 * Gets the status of the connected DS. 685 * @return The status of the connected DS. 686 */ 687 @Override 688 public ServerStatus getStatus() 689 { 690 return ServerStatus.INVALID_STATUS; 691 } 692 693 /** 694 * Retrieves the Address URL for this server handler. 695 * 696 * @return The Address URL for this server handler, 697 * in the form of an IP address and port separated by a colon. 698 */ 699 public String getServerAddressURL() 700 { 701 return serverAddressURL; 702 } 703 704 /** 705 * Receives a topology msg. 706 * @param topoMsg The message received. 707 * @throws DirectoryException when it occurs. 708 * @throws IOException when it occurs. 709 */ 710 public void receiveTopoInfoFromRS(TopologyMsg topoMsg) 711 throws DirectoryException, IOException 712 { 713 replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true); 714 } 715 716}