001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2016 ForgeRock AS. 016 */ 017package org.opends.server.replication.server; 018 019import static org.opends.messages.ReplicationMessages.*; 020import static org.opends.server.replication.common.ServerStatus.*; 021import static org.opends.server.replication.common.StatusMachine.*; 022import static org.opends.server.replication.protocol.ProtocolVersion.*; 023import static org.opends.server.util.StaticUtils.*; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Date; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Set; 031 032import org.forgerock.i18n.LocalizableMessage; 033import org.forgerock.i18n.slf4j.LocalizedLogger; 034import org.forgerock.opendj.ldap.ResultCode; 035import org.opends.server.api.MonitorData; 036import org.opends.server.replication.common.AssuredMode; 037import org.opends.server.replication.common.DSInfo; 038import org.opends.server.replication.common.ServerState; 039import org.opends.server.replication.common.ServerStatus; 040import org.opends.server.replication.common.StatusMachine; 041import org.opends.server.replication.common.StatusMachineEvent; 042import org.opends.server.replication.protocol.ChangeStatusMsg; 043import org.opends.server.replication.protocol.ProtocolVersion; 044import org.opends.server.replication.protocol.ReplServerStartDSMsg; 045import org.opends.server.replication.protocol.ReplicationMsg; 046import org.opends.server.replication.protocol.ServerStartMsg; 047import org.opends.server.replication.protocol.Session; 048import org.opends.server.replication.protocol.StartMsg; 049import org.opends.server.replication.protocol.StartSessionMsg; 050import org.opends.server.replication.protocol.StopMsg; 051import org.opends.server.replication.protocol.TopologyMsg; 052import org.opends.server.types.DirectoryException; 053 054/** 055 * This class defines a server handler, which handles all interaction with a 056 * peer server (RS or DS). 057 */ 058public class DataServerHandler extends ServerHandler 059{ 060 061 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 062 063 /** 064 * Temporary generationId received in handshake/phase1, and used after 065 * handshake/phase2. 066 */ 067 private long tmpGenerationId; 068 069 /** Status of this DS (only used if this server handler represents a DS). */ 070 private ServerStatus status = ServerStatus.INVALID_STATUS; 071 072 /** Referrals URLs this DS is exporting. */ 073 private List<String> refUrls = new ArrayList<>(); 074 /** Assured replication enabled on DS or not. */ 075 private boolean assuredFlag; 076 /** DS assured mode (relevant if assured replication enabled). */ 077 private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; 078 /** DS safe data level (relevant if assured mode is safe data). */ 079 private byte safeDataLevel = -1; 080 private Set<String> eclIncludes = new HashSet<>(); 081 private Set<String> eclIncludesForDeletes = new HashSet<>(); 082 083 /** 084 * Creates a new data server handler. 085 * @param session The session opened with the remote data server. 086 * @param queueSize The queue size. 087 * @param replicationServer The hosting RS. 088 * @param rcvWindowSize The receiving window size. 089 */ 090 public DataServerHandler( 091 Session session, 092 int queueSize, 093 ReplicationServer replicationServer, 094 int rcvWindowSize) 095 { 096 super(session, queueSize, replicationServer, rcvWindowSize); 097 } 098 099 /** 100 * Order the peer DS server to change his status or close the connection 101 * according to the requested new generation id. 102 * @param newGenId The new generation id to take into account 103 * @throws IOException If IO error occurred. 104 */ 105 public void changeStatusForResetGenId(long newGenId) throws IOException 106 { 107 StatusMachineEvent event = getStatusMachineEvent(newGenId); 108 if (event == null) 109 { 110 return; 111 } 112 113 if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT 114 && status == ServerStatus.FULL_UPDATE_STATUS) 115 { 116 // Prevent useless error message (full update status cannot lead to bad gen status) 117 logger.info(NOTE_BAD_GEN_ID_IN_FULL_UPDATE, replicationServer.getServerId(), 118 getBaseDN(), serverId, generationId, newGenId); 119 return; 120 } 121 122 changeStatus(event, "for reset gen id"); 123 } 124 125 private StatusMachineEvent getStatusMachineEvent(long newGenId) 126 { 127 if (newGenId == -1) 128 { 129 // The generation id is being made invalid, let's put the DS 130 // into BAD_GEN_ID_STATUS 131 return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT; 132 } 133 if (newGenId != generationId) 134 { 135 // This server has a bad generation id compared to new reference one, 136 // let's put it into BAD_GEN_ID_STATUS 137 return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT; 138 } 139 140 if (status != ServerStatus.BAD_GEN_ID_STATUS) 141 { 142 if (logger.isTraceEnabled()) 143 { 144 logger.trace("In RS " + replicationServer.getServerId() 145 + ", DS " + getServerId() + " for baseDN=" + getBaseDN() 146 + " has already generation id " + newGenId 147 + " so no ChangeStatusMsg sent to him."); 148 } 149 return null; 150 } 151 152 // This server has the good new reference generation id. 153 // Close connection with him to force his reconnection: DS will 154 // reconnect in NORMAL_STATUS or DEGRADED_STATUS. 155 156 if (logger.isTraceEnabled()) 157 { 158 logger.trace("In RS " + replicationServer.getServerId() 159 + ", closing connection to DS " + getServerId() + " for baseDN=" + getBaseDN() 160 + " to force reconnection as new local generationId" 161 + " and remote one match and DS is in bad gen id: " + newGenId); 162 } 163 164 // Connection closure must not be done calling RSD.stopHandler() as it 165 // would rewait the RSD lock that we already must have entering this 166 // method. This would lead to a reentrant lock which we do not want. 167 // So simply close the session, this will make the hang up appear 168 // after the reader thread that took the RSD lock releases it. 169 if (session != null 170 // V4 protocol introduced a StopMsg to properly close the 171 // connection between servers 172 && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) 173 { 174 try 175 { 176 session.publish(new StopMsg()); 177 } 178 catch (IOException ioe) 179 { 180 // Anyway, going to close session, so nothing to do 181 } 182 } 183 184 // NOT_CONNECTED_STATUS is the last one in RS session life: handler 185 // will soon disappear after this method call... 186 status = ServerStatus.NOT_CONNECTED_STATUS; 187 return null; 188 } 189 190 /** 191 * Change the status according to the event. 192 * 193 * @param event 194 * The event to be used for new status computation 195 * @return The new status of the DS 196 * @throws IOException 197 * When raised by the underlying session 198 */ 199 public ServerStatus changeStatus(StatusMachineEvent event) throws IOException 200 { 201 return changeStatus(event, "from status analyzer"); 202 } 203 204 private ServerStatus changeStatus(StatusMachineEvent event, String origin) 205 throws IOException 206 { 207 // Check state machine allows this new status (Sanity check) 208 ServerStatus newStatus = StatusMachine.computeNewStatus(status, event); 209 if (newStatus == ServerStatus.INVALID_STATUS) 210 { 211 logger.error(ERR_RS_CANNOT_CHANGE_STATUS, getBaseDN(), serverId, status, event); 212 // Only change allowed is from NORMAL_STATUS to DEGRADED_STATUS and vice 213 // versa. We may be trying to change the status while another status has 214 // just been entered: e.g a full update has just been engaged. 215 // In that case, just ignore attempt to change the status 216 return newStatus; 217 } 218 219 // Send message requesting to change the DS status 220 ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, INVALID_STATUS); 221 222 if (logger.isTraceEnabled()) 223 { 224 logger.trace("In RS " + replicationServer.getServerId() 225 + " Sending change status " + origin + " to " + getServerId() 226 + " for baseDN=" + getBaseDN() + ":\n" + csMsg); 227 } 228 229 session.publish(csMsg); 230 231 status = newStatus; 232 233 return newStatus; 234 } 235 236 @Override 237 public MonitorData getMonitorData() 238 { 239 MonitorData attributes = super.getMonitorData(); 240 241 // Add the specific DS ones 242 attributes.add("replica", serverURL); 243 attributes.add("connected-to", replicationServer.getMonitorInstanceName()); 244 245 ReplicationDomainMonitorData md = replicationServerDomain.getDomainMonitorData(); 246 247 // Oldest missing update 248 long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); 249 if (approxFirstMissingDate > 0) 250 { 251 attributes.add("approx-older-change-not-synchronized", new Date(approxFirstMissingDate)); 252 attributes.add("approx-older-change-not-synchronized-millis", approxFirstMissingDate); 253 } 254 255 attributes.add("missing-changes", md.getMissingChanges(serverId)); 256 // Replication delay 257 attributes.add("approximate-delay", md.getApproxDelay(serverId)); 258 259 ServerState state = md.getLDAPServerState(serverId); 260 if (state != null) 261 { 262 attributes.add("server-state", state.toStringSet()); 263 } 264 265 return attributes; 266 } 267 268 @Override 269 public String getMonitorInstanceName() 270 { 271 return "Connected directory server DS(" + serverId + ") " + serverURL 272 + ",cn=" + replicationServerDomain.getMonitorInstanceName(); 273 } 274 275 /** 276 * Gets the status of the connected DS. 277 * @return The status of the connected DS. 278 */ 279 @Override 280 public ServerStatus getStatus() 281 { 282 return status; 283 } 284 285 @Override 286 public boolean isDataServer() 287 { 288 return true; 289 } 290 291 /** 292 * Process message of a remote server changing his status. 293 * @param csMsg The message containing the new status 294 * @return The new server status of the DS 295 */ 296 public ServerStatus processNewStatus(ChangeStatusMsg csMsg) 297 { 298 // Get the status the DS just entered 299 ServerStatus reqStatus = csMsg.getNewStatus(); 300 // Translate new status to a state machine event 301 StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus); 302 if (event == StatusMachineEvent.INVALID_EVENT) 303 { 304 logger.error(ERR_RS_INVALID_NEW_STATUS, reqStatus, getBaseDN(), serverId); 305 return ServerStatus.INVALID_STATUS; 306 } 307 308 // Check state machine allows this new status 309 ServerStatus newStatus = StatusMachine.computeNewStatus(status, event); 310 if (newStatus == ServerStatus.INVALID_STATUS) 311 { 312 logger.error(ERR_RS_CANNOT_CHANGE_STATUS, getBaseDN(), serverId, status, event); 313 return ServerStatus.INVALID_STATUS; 314 } 315 316 status = newStatus; 317 return status; 318 } 319 320 /** 321 * Processes a start message received from a remote data server. 322 * @param serverStartMsg The provided start message received. 323 * @return flag specifying whether the remote server requests encryption. 324 * @throws DirectoryException raised when an error occurs. 325 */ 326 public boolean processStartFromRemote(ServerStartMsg serverStartMsg) 327 throws DirectoryException 328 { 329 session 330 .setProtocolVersion(getCompatibleVersion(serverStartMsg.getVersion())); 331 tmpGenerationId = serverStartMsg.getGenerationId(); 332 serverId = serverStartMsg.getServerId(); 333 serverURL = serverStartMsg.getServerURL(); 334 groupId = serverStartMsg.getGroupId(); 335 heartbeatInterval = serverStartMsg.getHeartbeatInterval(); 336 337 // generic stuff 338 setBaseDNAndDomain(serverStartMsg.getBaseDN(), true); 339 setInitialServerState(serverStartMsg.getServerState()); 340 setSendWindowSize(serverStartMsg.getWindowSize()); 341 342 if (heartbeatInterval < 0) 343 { 344 heartbeatInterval = 0; 345 } 346 return serverStartMsg.getSSLEncryption(); 347 } 348 349 /** Send our own TopologyMsg to DS. */ 350 private TopologyMsg sendTopoToRemoteDS() throws IOException 351 { 352 TopologyMsg outTopoMsg = replicationServerDomain 353 .createTopologyMsgForDS(this.serverId); 354 sendTopoInfo(outTopoMsg); 355 return outTopoMsg; 356 } 357 358 /** 359 * Starts the handler from a remote ServerStart message received from 360 * the remote data server. 361 * @param inServerStartMsg The provided ServerStart message received. 362 */ 363 public void startFromRemoteDS(ServerStartMsg inServerStartMsg) 364 { 365 try 366 { 367 // initializations 368 localGenerationId = -1; 369 oldGenerationId = -100; 370 371 // processes the ServerStart message received 372 boolean sessionInitiatorSSLEncryption = 373 processStartFromRemote(inServerStartMsg); 374 375 /** 376 * Hack to be sure that if a server disconnects and reconnect, we 377 * let the reader thread see the closure and cleanup any reference 378 * to old connection. This must be done before taking the domain lock so 379 * that the reader thread has a chance to stop the handler. 380 * 381 * TODO: This hack should be removed and disconnection/reconnection 382 * properly dealt with. 383 */ 384 if (replicationServerDomain.getConnectedDSs() 385 .containsKey(inServerStartMsg.getServerId())) 386 { 387 try { 388 Thread.sleep(100); 389 } 390 catch(Exception e){ 391 abortStart(null); 392 return; 393 } 394 } 395 396 lockDomainNoTimeout(); 397 398 localGenerationId = replicationServerDomain.getGenerationId(); 399 oldGenerationId = localGenerationId; 400 401 if (replicationServerDomain.isAlreadyConnectedToDS(this)) 402 { 403 abortStart(null); 404 return; 405 } 406 407 try 408 { 409 StartMsg outStartMsg = sendStartToRemote(); 410 411 logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg); 412 413 // The session initiator decides whether to use SSL. 414 // Until here session is encrypted then it depends on the negotiation 415 if (!sessionInitiatorSSLEncryption) 416 { 417 session.stopEncryption(); 418 } 419 420 // wait and process StartSessionMsg from remote RS 421 StartSessionMsg inStartSessionMsg = 422 waitAndProcessStartSessionFromRemoteDS(); 423 if (inStartSessionMsg == null) 424 { 425 // DS wants to properly close the connection (DS sent a StopMsg) 426 logStopReceived(); 427 abortStart(null); 428 return; 429 } 430 431 // Send our own TopologyMsg to remote DS 432 TopologyMsg outTopoMsg = sendTopoToRemoteDS(); 433 434 logStartSessionHandshake(inStartSessionMsg, outTopoMsg); 435 } 436 catch (IOException e) 437 { 438 logger.traceException(e); 439 LocalizableMessage errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get( 440 inServerStartMsg.getServerId(), replicationServer.getServerId(), getExceptionMessage(e)); 441 abortStart(errMessage); 442 return; 443 } 444 catch (Exception e) 445 { 446 logger.traceException(e); 447 // We do not need to support DS V1 connection, we just accept RS V1 connection: 448 // We just trash the message, log the event for debug purpose and close the connection 449 abortStart(null); 450 return; 451 } 452 453 replicationServerDomain.register(this); 454 455 logger.debug(INFO_REPLICATION_SERVER_CONNECTION_FROM_DS, getReplicationServerId(), getServerId(), 456 replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress()); 457 458 super.finalizeStart(); 459 } 460 catch(DirectoryException de) 461 { 462 logger.traceException(de); 463 abortStart(de.getMessageObject()); 464 } 465 catch(Exception e) 466 { 467 logger.traceException(e); 468 abortStart(null); 469 } 470 finally 471 { 472 releaseDomainLock(); 473 } 474 } 475 476 /** 477 * Sends a start message to the remote DS. 478 * 479 * @return The StartMsg sent. 480 * @throws IOException 481 * When an exception occurs. 482 */ 483 private StartMsg sendStartToRemote() throws IOException 484 { 485 final StartMsg startMsg; 486 487 // Before V4 protocol, we sent a ReplServerStartMsg 488 if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4) 489 { 490 // Peer DS uses protocol < V4 : send it a ReplServerStartMsg 491 startMsg = createReplServerStartMsg(); 492 } 493 else 494 { 495 // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg 496 startMsg = new ReplServerStartDSMsg(getReplicationServerId(), 497 getReplicationServerURL(), getBaseDN(), maxRcvWindow, 498 replicationServerDomain.getLatestServerState(), 499 localGenerationId, sslEncryption, getLocalGroupId(), 500 replicationServer.getDegradedStatusThreshold(), 501 replicationServer.getWeight(), 502 replicationServerDomain.getConnectedDSs().size()); 503 } 504 505 send(startMsg); 506 return startMsg; 507 } 508 509 /** 510 * Creates a DSInfo structure representing this remote DS. 511 * @return The DSInfo structure representing this remote DS 512 */ 513 public DSInfo toDSInfo() 514 { 515 return new DSInfo(serverId, serverURL, getReplicationServerId(), 516 generationId, status, assuredFlag, assuredMode, safeDataLevel, groupId, 517 refUrls, eclIncludes, eclIncludesForDeletes, getProtocolVersion()); 518 } 519 520 @Override 521 public String toString() 522 { 523 if (serverId != 0) 524 { 525 return "Replica DS(" + serverId + ") for domain \"" 526 + replicationServerDomain.getBaseDN() + "\""; 527 } 528 return "Unknown server"; 529 } 530 531 /** 532 * Wait receiving the StartSessionMsg from the remote DS and process it, or 533 * receiving a StopMsg to properly stop the handshake procedure. 534 * @return the startSessionMsg received or null DS sent a stop message to 535 * not finish the handshake. 536 * @throws Exception 537 */ 538 private StartSessionMsg waitAndProcessStartSessionFromRemoteDS() 539 throws Exception 540 { 541 ReplicationMsg msg = session.receive(); 542 543 if (msg instanceof StopMsg) 544 { 545 // DS wants to stop handshake (was just for handshake phase one for RS 546 // choice). Return null to make the session be terminated. 547 return null; 548 } else if (!(msg instanceof StartSessionMsg)) 549 { 550 LocalizableMessage message = LocalizableMessage.raw( 551 "Protocol error: StartSessionMsg required." + msg + " received."); 552 abortStart(message); 553 return null; 554 } 555 556 // Process StartSessionMsg sent by remote DS 557 StartSessionMsg startSessionMsg = (StartSessionMsg) msg; 558 559 this.status = startSessionMsg.getStatus(); 560 // Sanity check: is it a valid initial status? 561 if (!isValidInitialStatus(this.status)) 562 { 563 throw new DirectoryException(ResultCode.OTHER, 564 ERR_RS_INVALID_INIT_STATUS.get( this.status, getBaseDN(), serverId)); 565 } 566 567 this.refUrls = startSessionMsg.getReferralsURLs(); 568 this.assuredFlag = startSessionMsg.isAssured(); 569 this.assuredMode = startSessionMsg.getAssuredMode(); 570 this.safeDataLevel = startSessionMsg.getSafeDataLevel(); 571 this.eclIncludes = startSessionMsg.getEclIncludes(); 572 this.eclIncludesForDeletes = startSessionMsg.getEclIncludesForDeletes(); 573 574 /* 575 * If we have already a generationID set for the domain 576 * then 577 * if the connecting replica has not the same 578 * then it is degraded locally and notified by an error message 579 * else 580 * we set the generationID from the one received 581 * (unsaved yet on disk . will be set with the 1rst change 582 * received) 583 */ 584 generationId = tmpGenerationId; 585 if (localGenerationId > 0) 586 { 587 if (generationId != localGenerationId) 588 { 589 logger.warn(WARN_BAD_GENERATION_ID_FROM_DS, serverId, session.getReadableRemoteAddress(), 590 generationId, getBaseDN(), getReplicationServerId(), localGenerationId); 591 } 592 } 593 else 594 { 595 // We are an empty ReplicationServer 596 if (generationId > 0 && !getServerState().isEmpty()) 597 { 598 // If the LDAP server has already sent changes 599 // it is not expected to connect to an empty RS 600 logger.warn(WARN_BAD_GENERATION_ID_FROM_DS, serverId, session.getReadableRemoteAddress(), 601 generationId, getBaseDN(), getReplicationServerId(), localGenerationId); 602 } 603 else 604 { 605 // The local RS is not initialized - take the one received 606 // WARNING: Must be done before computing topo message to send 607 // to peer server as topo message must embed valid generation id 608 // for our server 609 oldGenerationId = replicationServerDomain.changeGenerationId(generationId); 610 } 611 } 612 return startSessionMsg; 613 } 614 615 /** 616 * Process message of a remote server changing his status. 617 * @param csMsg The message containing the new status 618 */ 619 public void receiveNewStatus(ChangeStatusMsg csMsg) 620 { 621 replicationServerDomain.processNewStatus(this, csMsg); 622 } 623}