001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2015 ForgeRock AS. 016 */ 017package org.opends.server.replication.server; 018 019import java.net.SocketException; 020 021import org.forgerock.i18n.LocalizableMessage; 022import org.opends.server.api.DirectoryThread; 023import org.forgerock.i18n.slf4j.LocalizedLogger; 024import org.opends.server.replication.common.ServerStatus; 025import org.opends.server.replication.protocol.*; 026 027import static org.opends.messages.ReplicationMessages.*; 028import static org.opends.server.replication.common.ServerStatus.*; 029import static org.opends.server.util.StaticUtils.*; 030 031/** 032 * This class implement the part of the replicationServer that is reading 033 * the connection from the LDAP servers to get all the updates that 034 * were done on this replica and forward them to other servers. 035 * 036 * A single thread is dedicated to this work. 037 * It waits in a blocking mode on the connection from the LDAP server 038 * and upon receiving an update puts in into the replicationServer cache 039 * from where the other servers will grab it. 040 */ 041public class ServerReader extends DirectoryThread 042{ 043 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 044 private final Session session; 045 private final ServerHandler handler; 046 047 /** 048 * Constructor for the LDAP server reader part of the replicationServer. 049 * 050 * @param session 051 * The Session from which to read the data. 052 * @param handler 053 * The server handler for this server reader. 054 */ 055 public ServerReader(Session session, ServerHandler handler) 056 { 057 super("Replication server RS(" + handler.getReplicationServerId() 058 + ") reading from " + handler + " at " 059 + session.getReadableRemoteAddress()); 060 this.session = session; 061 this.handler = handler; 062 } 063 064 /** 065 * Create a loop that reads changes and hands them off to be processed. 066 */ 067 @Override 068 public void run() 069 { 070 if (logger.isTraceEnabled()) 071 { 072 logger.trace(getName() + " starting"); 073 } 074 /* 075 * wait on input stream 076 * grab all incoming messages and publish them to the 077 * replicationServerDomain 078 */ 079 LocalizableMessage errMessage = null; 080 try 081 { 082 while (true) 083 { 084 try 085 { 086 final ReplicationMsg msg = session.receive(); 087 088 if (logger.isTraceEnabled()) 089 { 090 logger.trace("In " + getName() + " receives " + msg); 091 } 092 093 if (msg instanceof AckMsg) 094 { 095 handler.checkWindow(); 096 handler.processAck((AckMsg) msg); 097 } 098 else if (msg instanceof UpdateMsg) 099 { 100 final UpdateMsg updateMsg = (UpdateMsg) msg; 101 if (!isUpdateMsgFiltered(updateMsg)) 102 { 103 handler.put(updateMsg); 104 } 105 } 106 else if (msg instanceof WindowMsg) 107 { 108 handler.updateWindow((WindowMsg) msg); 109 } 110 else if (msg instanceof MonitorRequestMsg) 111 { 112 handler.processMonitorRequestMsg((MonitorRequestMsg) msg); 113 } 114 else if (msg instanceof MonitorMsg) 115 { 116 handler.processMonitorMsg((MonitorMsg) msg); 117 } 118 else if (msg instanceof RoutableMsg) 119 { 120 /* 121 * Note that we handle monitor messages separately since they in 122 * fact never need "routing" and are instead sent directly between 123 * connected peers. Doing so allows us to more clearly decouple 124 * write IO from the reader thread (see OPENDJ-1354). 125 */ 126 handler.process((RoutableMsg) msg); 127 } 128 else if (msg instanceof ResetGenerationIdMsg) 129 { 130 handler.processResetGenId((ResetGenerationIdMsg) msg); 131 } 132 else if (msg instanceof WindowProbeMsg) 133 { 134 handler.replyToWindowProbe(); 135 } 136 else if (msg instanceof TopologyMsg) 137 { 138 ReplicationServerHandler rsh = (ReplicationServerHandler) handler; 139 rsh.receiveTopoInfoFromRS((TopologyMsg) msg); 140 } 141 else if (msg instanceof ChangeStatusMsg) 142 { 143 ChangeStatusMsg csMsg = (ChangeStatusMsg) msg; 144 try 145 { 146 DataServerHandler dsh = (DataServerHandler) handler; 147 dsh.receiveNewStatus(csMsg); 148 } 149 catch (Exception e) 150 { 151 errMessage = ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get( 152 handler.getBaseDN(), handler.getServerId(), csMsg); 153 logger.error(errMessage); 154 } 155 } 156 else if (msg instanceof ChangeTimeHeartbeatMsg) 157 { 158 handler.process((ChangeTimeHeartbeatMsg) msg); 159 } 160 else if (msg instanceof StopMsg) 161 { 162 /* 163 * Peer server is properly disconnecting: go out of here to properly 164 * close the server handler going to finally block. 165 */ 166 if (logger.isTraceEnabled()) 167 { 168 logger.trace(handler 169 + " has properly disconnected from this replication server " 170 + handler.getReplicationServerId()); 171 } 172 return; 173 } 174 else if (msg == null) 175 { 176 /* 177 * The remote server has sent an unknown message, close the 178 * connection. 179 */ 180 errMessage = NOTE_READER_NULL_MSG.get(handler); 181 logger.info(errMessage); 182 return; 183 } 184 } 185 catch (NotSupportedOldVersionPDUException e) 186 { 187 /* 188 * Received a V1 PDU we do not need to support: we just trash the 189 * message and log the event for debug purpose, then continue 190 * receiving messages. 191 */ 192 logException(e); 193 } 194 } 195 } 196 catch (SocketException e) 197 { 198 /* 199 * The connection has been broken Log a message and exit from this loop So 200 * that this handler is stopped. 201 */ 202 logException(e); 203 if (!handler.shuttingDown()) 204 { 205 errMessage = handler.getBadlyDisconnectedErrorMessage(); 206 logger.error(errMessage); 207 } 208 } 209 catch (Exception e) 210 { 211 /* 212 * The remote server has sent an unknown message, close the connection. 213 */ 214 errMessage = NOTE_READER_EXCEPTION.get(handler, 215 stackTraceToSingleLineString(e)); 216 logger.info(errMessage); 217 } 218 finally 219 { 220 /* 221 * The thread only exits the loop above if some error condition happen. 222 * Attempt to close the socket and stop the server handler. 223 */ 224 if (logger.isTraceEnabled()) 225 { 226 logger.trace("In " + getName() + " closing the session"); 227 } 228 session.close(); 229 handler.doStop(); 230 if (logger.isTraceEnabled()) 231 { 232 logger.trace(getName() + " stopped: " + errMessage); 233 } 234 } 235 } 236 237 /** 238 * Returns whether the update message is filtered in one of those cases: 239 * <ul> 240 * <li>Ignore updates from DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS</li> 241 * <li>Ignore updates from RS with bad gen id</li> 242 * </ul> 243 */ 244 private boolean isUpdateMsgFiltered(UpdateMsg updateMsg) 245 { 246 if (handler.isDataServer()) 247 { 248 /** 249 * Ignore updates from DS in bad BAD_GENID_STATUS or 250 * FULL_UPDATE_STATUS 251 * 252 * The RSD lock should not be taken here as it is acceptable to 253 * have a delay between the time the server has a wrong status and 254 * the fact we detect it: the updates that succeed to pass during 255 * this time will have no impact on remote server. But it is 256 * interesting to not saturate uselessly the network if the 257 * updates are not necessary so this check to stop sending updates 258 * is interesting anyway. Not taking the RSD lock allows to have 259 * better performances in normal mode (most of the time). 260 */ 261 final ServerStatus dsStatus = handler.getStatus(); 262 if (dsStatus == BAD_GEN_ID_STATUS) 263 { 264 logger.warn(WARN_IGNORING_UPDATE_FROM_DS_BADGENID, 265 handler.getReplicationServerId(), updateMsg.getCSN(), 266 handler.getBaseDN(), handler.getServerId(), 267 session.getReadableRemoteAddress(), 268 handler.getGenerationId(), handler.getReferenceGenId()); 269 return true; 270 } 271 else if (dsStatus == FULL_UPDATE_STATUS) 272 { 273 logger.warn(WARN_IGNORING_UPDATE_FROM_DS_FULLUP, 274 handler.getReplicationServerId(), updateMsg.getCSN(), 275 handler.getBaseDN(), handler.getServerId(), 276 session.getReadableRemoteAddress()); 277 return true; 278 } 279 } 280 else 281 { 282 /** 283 * Ignore updates from RS with bad gen id 284 * (no system managed status for a RS) 285 */ 286 long referenceGenerationId = handler.getReferenceGenId(); 287 if (referenceGenerationId > 0 288 && referenceGenerationId != handler.getGenerationId()) 289 { 290 logger.error(WARN_IGNORING_UPDATE_FROM_RS, 291 handler.getReplicationServerId(), updateMsg.getCSN(), 292 handler.getBaseDN(), handler.getServerId(), 293 session.getReadableRemoteAddress(), 294 handler.getGenerationId(), referenceGenerationId); 295 return true; 296 } 297 } 298 return false; 299 } 300 301 private void logException(Exception e) 302 { 303 if (logger.isTraceEnabled()) 304 { 305 logger.trace("In " + getName() + " " + stackTraceToSingleLineString(e)); 306 } 307 } 308}