001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2007-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2015 ForgeRock AS. 016 */ 017package org.opends.server.replication.protocol; 018 019import java.util.*; 020import java.util.zip.DataFormatException; 021 022import org.opends.server.replication.common.AssuredMode; 023import org.opends.server.replication.common.DSInfo; 024import org.opends.server.replication.common.RSInfo; 025import org.opends.server.replication.common.ServerStatus; 026 027import static org.opends.server.replication.protocol.ProtocolVersion.*; 028 029/** 030 * This class defines a message that is sent: 031 * - By a RS to the other RSs in the topology, containing: 032 * - the DSs directly connected to the RS in the DS infos 033 * - only this RS in the RS infos 034 * - By a RS to his connected DSs, containing every DSs and RSs he knows. 035 * In that case the message contains: 036 * - every DSs the RS knows except the destinator DS in the DS infos 037 * - every connected RSs (including the sending RS) in the RS infos 038 * 039 * Exchanging these messages allows to have each RS or DS take 040 * appropriate decisions according to the current topology: 041 * - a RS can route a message to a DS 042 * - a DS can decide towards which peer DS send referrals 043 * ... 044 */ 045public class TopologyMsg extends ReplicationMsg 046{ 047 /** Information for the DSs (aka replicas) known in the topology. */ 048 private final Map<Integer, DSInfo> replicaInfos; 049 /** Information for the RSs known in the topology. */ 050 private final List<RSInfo> rsInfos; 051 052 /** 053 * Creates a new changelogInfo message from its encoded form. 054 * 055 * @param in The byte array containing the encoded form of the message. 056 * @param version The protocol version to use to decode the msg. 057 * @throws java.util.zip.DataFormatException If the byte array does not 058 * contain a valid encoded form of the message. 059 */ 060 TopologyMsg(byte[] in, short version) throws DataFormatException 061 { 062 final ByteArrayScanner scanner = new ByteArrayScanner(in); 063 final byte msgType = scanner.nextByte(); 064 if (msgType != MSG_TYPE_TOPOLOGY) 065 { 066 throw new DataFormatException("Input is not a valid " 067 + getClass().getCanonicalName()); 068 } 069 070 // Read the DS info entries, first read number of them 071 int nDsInfo = scanner.nextByte(); 072 final Map<Integer, DSInfo> replicaInfos = new HashMap<>(Math.max(0, nDsInfo)); 073 while (nDsInfo > 0 && !scanner.isEmpty()) 074 { 075 final DSInfo dsInfo = nextDSInfo(scanner, version); 076 replicaInfos.put(dsInfo.getDsId(), dsInfo); 077 nDsInfo--; 078 } 079 080 // Read the RS info entries 081 int nRsInfo = scanner.nextByte(); 082 final List<RSInfo> rsInfos = new ArrayList<>(Math.max(0, nRsInfo)); 083 while (nRsInfo > 0 && !scanner.isEmpty()) 084 { 085 rsInfos.add(nextRSInfo(scanner, version)); 086 nRsInfo--; 087 } 088 089 this.replicaInfos = Collections.unmodifiableMap(replicaInfos); 090 this.rsInfos = Collections.unmodifiableList(rsInfos); 091 } 092 093 private DSInfo nextDSInfo(ByteArrayScanner scanner, short version) 094 throws DataFormatException 095 { 096 final int dsId = scanner.nextIntUTF8(); 097 final String dsUrl = 098 version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString(); 099 final int rsId = scanner.nextIntUTF8(); 100 final long generationId = scanner.nextLongUTF8(); 101 final ServerStatus status = ServerStatus.valueOf(scanner.nextByte()); 102 final boolean assuredFlag = scanner.nextBoolean(); 103 final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte()); 104 final byte safeDataLevel = scanner.nextByte(); 105 final byte groupId = scanner.nextByte(); 106 107 final List<String> refUrls = new ArrayList<>(); 108 scanner.nextStrings(refUrls); 109 110 final Set<String> attrs = new HashSet<>(); 111 final Set<String> delattrs = new HashSet<>(); 112 short protocolVersion = -1; 113 if (version >= REPLICATION_PROTOCOL_V4) 114 { 115 scanner.nextStrings(attrs); 116 117 if (version >= REPLICATION_PROTOCOL_V5) 118 { 119 scanner.nextStrings(delattrs); 120 } 121 else 122 { 123 // Default to using the same set of attributes for deletes. 124 delattrs.addAll(attrs); 125 } 126 127 protocolVersion = scanner.nextByte(); 128 } 129 130 return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag, 131 assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs, 132 protocolVersion); 133 } 134 135 private RSInfo nextRSInfo(ByteArrayScanner scanner, short version) 136 throws DataFormatException 137 { 138 final int rsId = scanner.nextIntUTF8(); 139 final long generationId = scanner.nextLongUTF8(); 140 final byte groupId = scanner.nextByte(); 141 142 int weight = 1; 143 String serverUrl = null; 144 if (version >= REPLICATION_PROTOCOL_V4) 145 { 146 serverUrl = scanner.nextString(); 147 weight = scanner.nextIntUTF8(); 148 } 149 150 return new RSInfo(rsId, serverUrl, generationId, groupId, weight); 151 } 152 153 /** 154 * Creates a new message of the currently connected servers. 155 * 156 * @param dsInfos The collection of currently connected DS servers ID. 157 * @param rsInfos The list of currently connected RS servers ID. 158 */ 159 public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos) 160 { 161 if (dsInfos == null || dsInfos.isEmpty()) 162 { 163 this.replicaInfos = Collections.emptyMap(); 164 } 165 else 166 { 167 Map<Integer, DSInfo> replicas = new HashMap<>(); 168 for (DSInfo dsInfo : dsInfos) 169 { 170 replicas.put(dsInfo.getDsId(), dsInfo); 171 } 172 this.replicaInfos = Collections.unmodifiableMap(replicas); 173 } 174 175 if (rsInfos == null || rsInfos.isEmpty()) 176 { 177 this.rsInfos = Collections.emptyList(); 178 } 179 else 180 { 181 this.rsInfos = 182 Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos)); 183 } 184 } 185 186 // ============ 187 // Msg encoding 188 // ============ 189 190 /** {@inheritDoc} */ 191 @Override 192 public byte[] getBytes(short version) 193 { 194 /** 195 * Message has the following form: 196 * <pdu type><number of following DSInfo entries>[<DSInfo>]* 197 * <number of following RSInfo entries>[<RSInfo>]* 198 */ 199 final ByteArrayBuilder builder = new ByteArrayBuilder(); 200 builder.appendByte(MSG_TYPE_TOPOLOGY); 201 202 // Put DS infos 203 builder.appendByte(replicaInfos.size()); 204 for (DSInfo dsInfo : replicaInfos.values()) 205 { 206 builder.appendIntUTF8(dsInfo.getDsId()); 207 if (version >= REPLICATION_PROTOCOL_V6) 208 { 209 builder.appendString(dsInfo.getDsUrl()); 210 } 211 builder.appendIntUTF8(dsInfo.getRsId()); 212 builder.appendLongUTF8(dsInfo.getGenerationId()); 213 builder.appendByte(dsInfo.getStatus().getValue()); 214 builder.appendBoolean(dsInfo.isAssured()); 215 builder.appendByte(dsInfo.getAssuredMode().getValue()); 216 builder.appendByte(dsInfo.getSafeDataLevel()); 217 builder.appendByte(dsInfo.getGroupId()); 218 219 builder.appendStrings(dsInfo.getRefUrls()); 220 221 if (version >= REPLICATION_PROTOCOL_V4) 222 { 223 builder.appendStrings(dsInfo.getEclIncludes()); 224 if (version >= REPLICATION_PROTOCOL_V5) 225 { 226 builder.appendStrings(dsInfo.getEclIncludesForDeletes()); 227 } 228 builder.appendByte(dsInfo.getProtocolVersion()); 229 } 230 } 231 232 // Put RS infos 233 builder.appendByte(rsInfos.size()); 234 for (RSInfo rsInfo : rsInfos) 235 { 236 builder.appendIntUTF8(rsInfo.getId()); 237 builder.appendLongUTF8(rsInfo.getGenerationId()); 238 builder.appendByte(rsInfo.getGroupId()); 239 240 if (version >= REPLICATION_PROTOCOL_V4) 241 { 242 builder.appendString(rsInfo.getServerUrl()); 243 builder.appendIntUTF8(rsInfo.getWeight()); 244 } 245 } 246 247 return builder.toByteArray(); 248 } 249 250 /** {@inheritDoc} */ 251 @Override 252 public String toString() 253 { 254 String dsStr = ""; 255 for (DSInfo dsInfo : replicaInfos.values()) 256 { 257 dsStr += dsInfo + "\n----------------------------\n"; 258 } 259 260 String rsStr = ""; 261 for (RSInfo rsInfo : rsInfos) 262 { 263 rsStr += rsInfo + "\n----------------------------\n"; 264 } 265 266 return "TopologyMsg content:" 267 + "\n----------------------------" 268 + "\nCONNECTED DS SERVERS:" 269 + "\n--------------------\n" 270 + dsStr 271 + "CONNECTED RS SERVERS:" 272 + "\n--------------------\n" 273 + rsStr 274 + ("".equals(rsStr) ? "----------------------------\n" : ""); 275 } 276 277 /** 278 * Get the DS infos. 279 * 280 * @return The DS infos 281 */ 282 public Map<Integer, DSInfo> getReplicaInfos() 283 { 284 return replicaInfos; 285 } 286 287 /** 288 * Get the RS infos. 289 * 290 * @return The RS infos 291 */ 292 public List<RSInfo> getRsInfos() 293 { 294 return rsInfos; 295 } 296}