001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2009 Sun Microsystems, Inc. 015 * Portions Copyright 2013-2015 ForgeRock AS. 016 */ 017package org.opends.server.replication.protocol; 018 019import java.io.IOException; 020import java.util.HashMap; 021import java.util.Iterator; 022import java.util.Map; 023import java.util.Map.Entry; 024import java.util.zip.DataFormatException; 025 026import org.forgerock.opendj.io.ASN1Reader; 027import org.forgerock.opendj.io.ASN1Writer; 028import org.opends.server.replication.common.CSN; 029import org.opends.server.replication.common.ServerState; 030 031/** 032 * This message is part of the replication protocol. 033 * RS1 sends a MonitorRequestMessage to RS2 to requests its monitoring 034 * information. 035 * When RS2 receives a MonitorRequestMessage from RS1, RS2 responds with a 036 * MonitorMsg. 037 */ 038public class MonitorMsg extends ReplicationMsg 039{ 040 /** 041 * The destination server or servers of this message. 042 */ 043 private final int destination; 044 045 /** 046 * The serverID of the server that sends this message. 047 */ 048 private final int senderID; 049 050 051 052 /** 053 * Data structure to manage the state and the approximation of the data of the 054 * first missing change for each LDAP server connected to a Replication 055 * Server. 056 */ 057 private static class ServerData 058 { 059 private ServerState state; 060 private long approxFirstMissingDate; 061 } 062 063 /** 064 * Data structure to manage the state of this replication server 065 * and the state information for the servers connected to it. 066 */ 067 private static class SubTopoMonitorData 068 { 069 /** This replication server DbState. */ 070 private ServerState replServerDbState; 071 /** The data related to the LDAP servers connected to this RS. */ 072 private final Map<Integer, ServerData> ldapStates = new HashMap<>(); 073 /** The data related to the RS servers connected to this RS. */ 074 private final Map<Integer, ServerData> rsStates = new HashMap<>(); 075 } 076 077 private final SubTopoMonitorData data = new SubTopoMonitorData(); 078 079 /** 080 * Creates a new MonitorMsg. 081 * 082 * @param sender The sender of this message. 083 * @param destination The destination of this message. 084 */ 085 public MonitorMsg(int sender, int destination) 086 { 087 this.senderID = sender; 088 this.destination = destination; 089 } 090 091 /** 092 * Sets the state of the replication server. 093 * @param state The state. 094 */ 095 public void setReplServerDbState(ServerState state) 096 { 097 data.replServerDbState = state; 098 } 099 100 /** 101 * Sets the information of an LDAP server. 102 * @param serverId The serverID. 103 * @param state The server state. 104 * @param approxFirstMissingDate The approximation of the date 105 * of the older missing change. null when none. 106 * @param isLDAPServer Specifies whether the server is a DS or a RS 107 */ 108 public void setServerState(int serverId, ServerState state, 109 long approxFirstMissingDate, boolean isLDAPServer) 110 { 111 final ServerData sd = new ServerData(); 112 sd.state = state; 113 sd.approxFirstMissingDate = approxFirstMissingDate; 114 if (isLDAPServer) 115 { 116 data.ldapStates.put(serverId, sd); 117 } 118 else 119 { 120 data.rsStates.put(serverId, sd); 121 } 122 } 123 124 /** 125 * Get the server state for the LDAP server with the provided serverId. 126 * @param serverId The provided serverId. 127 * @return The state. 128 */ 129 public ServerState getLDAPServerState(int serverId) 130 { 131 return data.ldapStates.get(serverId).state; 132 } 133 134 /** 135 * Get the server state for the RS server with the provided serverId. 136 * @param serverId The provided serverId. 137 * @return The state. 138 */ 139 public ServerState getRSServerState(int serverId) 140 { 141 return data.rsStates.get(serverId).state; 142 } 143 144 /** 145 * Get the approximation of the date of the older missing change for the 146 * LDAP Server with the provided server Id. 147 * @param serverId The provided serverId. 148 * @return The approximated state. 149 */ 150 public long getLDAPApproxFirstMissingDate(int serverId) 151 { 152 return data.ldapStates.get(serverId).approxFirstMissingDate; 153 } 154 155 /** 156 * Get the approximation of the date of the older missing change for the 157 * RS Server with the provided server Id. 158 * @param serverId The provided serverId. 159 * @return The approximated state. 160 */ 161 public long getRSApproxFirstMissingDate(int serverId) 162 { 163 return data.rsStates.get(serverId).approxFirstMissingDate; 164 } 165 166 /** 167 * Creates a new EntryMessage from its encoded form. 168 * 169 * @param in The byte array containing the encoded form of the message. 170 * @param version The version of the protocol to use to decode the msg. 171 * @throws DataFormatException If the byte array does not contain a valid 172 * encoded form of the ServerStartMessage. 173 */ 174 MonitorMsg(byte[] in, short version) throws DataFormatException 175 { 176 final ByteArrayScanner scanner = new ByteArrayScanner(in); 177 if (scanner.nextByte() != MSG_TYPE_REPL_SERVER_MONITOR) 178 { 179 throw new DataFormatException("input is not a valid " 180 + getClass().getCanonicalName()); 181 } 182 183 if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1) 184 { 185 this.senderID = scanner.nextIntUTF8(); 186 this.destination = scanner.nextIntUTF8(); 187 } 188 else if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3) 189 { 190 this.senderID = scanner.nextShort(); 191 this.destination = scanner.nextShort(); 192 } 193 else 194 { 195 this.senderID = scanner.nextInt(); 196 this.destination = scanner.nextInt(); 197 } 198 199 ASN1Reader asn1Reader = scanner.getASN1Reader(); 200 try 201 { 202 asn1Reader.readStartSequence(); 203 // loop on the servers 204 while(asn1Reader.hasNextElement()) 205 { 206 ServerState newState = new ServerState(); 207 int serverId = 0; 208 long outime = 0; 209 boolean isLDAPServer = false; 210 211 asn1Reader.readStartSequence(); 212 // loop on the list of CSN of the state 213 while(asn1Reader.hasNextElement()) 214 { 215 CSN csn; 216 if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7) 217 { 218 csn = CSN.valueOf(asn1Reader.readOctetString()); 219 } 220 else 221 { 222 csn = CSN.valueOf(asn1Reader.readOctetStringAsString()); 223 } 224 225 if (data.replServerDbState != null && serverId == 0) 226 { 227 // we are on the first CSN that is a fake CSN to store the serverId 228 // and the older update time 229 serverId = csn.getServerId(); 230 outime = csn.getTime(); 231 isLDAPServer = csn.getSeqnum() > 0; 232 } 233 else 234 { 235 // we are on a normal CSN 236 newState.update(csn); 237 } 238 } 239 asn1Reader.readEndSequence(); 240 241 if (data.replServerDbState == null) 242 { 243 // the first state is the replication state 244 data.replServerDbState = newState; 245 } 246 else 247 { 248 // the next states are the server states 249 setServerState(serverId, newState, outime, isLDAPServer); 250 } 251 } 252 asn1Reader.readEndSequence(); 253 } catch(Exception e) 254 { /* do nothing */ 255 } 256 } 257 258 /** {@inheritDoc} */ 259 @Override 260 public byte[] getBytes(short protocolVersion) 261 { 262 try 263 { 264 final ByteArrayBuilder builder = new ByteArrayBuilder(); 265 builder.appendByte(MSG_TYPE_REPL_SERVER_MONITOR); 266 append(builder, senderID, protocolVersion); 267 append(builder, destination, protocolVersion); 268 269 /* Put the serverStates ... */ 270 ASN1Writer writer = builder.getASN1Writer(); 271 writer.writeStartSequence(); 272 { 273 /* first put the Replication Server state */ 274 writer.writeStartSequence(); 275 { 276 data.replServerDbState.writeTo(writer, protocolVersion); 277 } 278 writer.writeEndSequence(); 279 280 // then the DS + RS server data 281 writeServerStates(protocolVersion, writer, false /* DS */); 282 writeServerStates(protocolVersion, writer, true /* RS */); 283 } 284 writer.writeEndSequence(); 285 286 if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1) 287 { 288 // legacy coding mistake 289 builder.appendByte(0); 290 } 291 return builder.toByteArray(); 292 } 293 catch (Exception e) 294 { 295 return null; 296 } 297 } 298 299 private void append(final ByteArrayBuilder builder, int data, 300 short protocolVersion) 301 { 302 if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1) 303 { 304 builder.appendIntUTF8(data); 305 } 306 else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3) 307 { 308 builder.appendShort(data); 309 } 310 else // protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4 311 { 312 builder.appendInt(data); 313 } 314 } 315 316 private void writeServerStates(short protocolVersion, ASN1Writer writer, 317 boolean writeRSStates) throws IOException 318 { 319 final Map<Integer, ServerData> servers = 320 writeRSStates ? data.rsStates : data.ldapStates; 321 final int seqNum = writeRSStates ? 0 : 1; 322 for (Map.Entry<Integer, ServerData> server : servers.entrySet()) 323 { 324 writer.writeStartSequence(); 325 { 326 /* 327 * A fake CSN helps storing the LDAP server ID. The sequence number will 328 * be used to differentiate between an LDAP server (1) or an RS (0). 329 */ 330 CSN csn = new CSN( 331 server.getValue().approxFirstMissingDate, seqNum, 332 server.getKey()); 333 if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7) 334 { 335 writer.writeOctetString(csn.toByteString()); 336 } 337 else 338 { 339 writer.writeOctetString(csn.toString()); 340 } 341 342 // the CSNs that make the state 343 server.getValue().state.writeTo(writer, protocolVersion); 344 } 345 writer.writeEndSequence(); 346 } 347 } 348 349 /** 350 * Get the state of the replication server that sent this message. 351 * @return The state. 352 */ 353 public ServerState getReplServerDbState() 354 { 355 return data.replServerDbState; 356 } 357 358 /** 359 * Returns an iterator on the serverId of the connected LDAP servers. 360 * @return The iterator. 361 */ 362 public Iterator<Integer> ldapIterator() 363 { 364 return data.ldapStates.keySet().iterator(); 365 } 366 367 /** 368 * Returns an iterator on the serverId of the connected RS servers. 369 * @return The iterator. 370 */ 371 public Iterator<Integer> rsIterator() 372 { 373 return data.rsStates.keySet().iterator(); 374 } 375 376 /** 377 * Get the destination. 378 * 379 * @return the destination 380 */ 381 public int getDestination() 382 { 383 return destination; 384 } 385 386 /** 387 * Get the server ID of the server that sent this message. 388 * 389 * @return the server id 390 */ 391 public int getSenderID() 392 { 393 return senderID; 394 } 395 396 /** {@inheritDoc} */ 397 @Override 398 public String toString() 399 { 400 final StringBuilder stateS = new StringBuilder("\nRState:["); 401 stateS.append(data.replServerDbState); 402 stateS.append("]"); 403 404 stateS.append("\nLDAPStates:["); 405 for (Entry<Integer, ServerData> entry : data.ldapStates.entrySet()) 406 { 407 ServerData sd = entry.getValue(); 408 stateS.append("\n[LSstate(").append(entry.getKey()).append(")=") 409 .append(sd.state).append("]").append(" afmd=") 410 .append(sd.approxFirstMissingDate).append("]"); 411 } 412 413 stateS.append("\nRSStates:["); 414 for (Entry<Integer, ServerData> entry : data.rsStates.entrySet()) 415 { 416 final ServerData sd = entry.getValue(); 417 stateS.append("\n[RSState(").append(entry.getKey()).append(")=") 418 .append(sd.state).append("]").append(" afmd=") 419 .append(sd.approxFirstMissingDate).append("]"); 420 } 421 return getClass().getCanonicalName() + 422 "[ sender=" + this.senderID + 423 " destination=" + this.destination + 424 " data=[" + stateS + "]" + 425 "]"; 426 } 427}