001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2008-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2016 ForgeRock AS. 016 */ 017package org.opends.admin.ads; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.Date; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.Iterator; 025import java.util.LinkedHashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029 030import javax.naming.NameNotFoundException; 031import javax.naming.NamingEnumeration; 032import javax.naming.NamingException; 033import javax.naming.directory.SearchControls; 034import javax.naming.directory.SearchResult; 035import javax.naming.ldap.LdapName; 036 037import org.forgerock.i18n.LocalizableMessage; 038import org.forgerock.i18n.slf4j.LocalizedLogger; 039import org.opends.admin.ads.ADSContext.ServerProperty; 040import org.opends.admin.ads.util.ApplicationTrustManager; 041import org.opends.admin.ads.util.ConnectionUtils; 042import org.opends.admin.ads.util.ConnectionWrapper; 043import org.opends.admin.ads.util.PreferredConnection; 044import org.opends.admin.ads.util.ServerLoader; 045import org.opends.quicksetup.util.Utils; 046 047import static com.forgerock.opendj.cli.Utils.*; 048 049import static org.opends.messages.QuickSetupMessages.*; 050 051/** 052 * This class allows to read the configuration of the different servers that are 053 * registered in a given ADS server. It provides a read only view of the 054 * configuration of the servers and of the replication topologies that might be 055 * configured between them. 056 */ 057public class TopologyCache 058{ 059 private final ADSContext adsContext; 060 private final ApplicationTrustManager trustManager; 061 private final int timeout; 062 private final String bindDN; 063 private final String bindPwd; 064 private final Set<ServerDescriptor> servers = new HashSet<>(); 065 private final Set<SuffixDescriptor> suffixes = new HashSet<>(); 066 private final Set<PreferredConnection> preferredConnections = new LinkedHashSet<>(); 067 private final TopologyCacheFilter filter = new TopologyCacheFilter(); 068 private static final int MULTITHREAD_TIMEOUT = 90 * 1000; 069 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 070 071 /** 072 * Constructor of the TopologyCache. 073 * 074 * @param adsContext the adsContext to the ADS registry. 075 * @param trustManager the ApplicationTrustManager that must be used to trust 076 * certificates when we create connections to the registered servers to read 077 * their configuration. 078 * @param timeout the timeout to establish the connection in milliseconds. 079 * Use {@code 0} to express no timeout. 080 */ 081 public TopologyCache(ADSContext adsContext, 082 ApplicationTrustManager trustManager, 083 int timeout) 084 { 085 this.adsContext = adsContext; 086 this.trustManager = trustManager; 087 this.timeout = timeout; 088 bindDN = ConnectionUtils.getBindDN(adsContext.getDirContext()); 089 bindPwd = ConnectionUtils.getBindPassword(adsContext.getDirContext()); 090 } 091 092 /** 093 * Reads the configuration of the registered servers. 094 * 095 * @throws TopologyCacheException if there is an issue reading the 096 * configuration of the registered servers. 097 */ 098 public void reloadTopology() throws TopologyCacheException 099 { 100 suffixes.clear(); 101 servers.clear(); 102 try 103 { 104 Set<Map<ServerProperty, Object>> adsServers = 105 adsContext.readServerRegistry(); 106 107 Set<ServerLoader> threadSet = new HashSet<>(); 108 for (Map<ServerProperty, Object> serverProperties : adsServers) 109 { 110 ServerLoader t = getServerLoader(serverProperties); 111 t.start(); 112 threadSet.add(t); 113 } 114 joinThreadSet(threadSet); 115 116 // Try to consolidate things (even if the data is not complete) 117 Map<LdapName, Set<SuffixDescriptor>> hmSuffixes = new HashMap<>(); 118 for (ServerLoader loader : threadSet) 119 { 120 ServerDescriptor descriptor = loader.getServerDescriptor(); 121 for (ReplicaDescriptor replica : descriptor.getReplicas()) 122 { 123 logger.info(LocalizableMessage.raw("Handling replica with dn: " 124 + replica.getSuffix().getDN())); 125 126 boolean suffixFound = false; 127 LdapName dn = new LdapName(replica.getSuffix().getDN()); 128 Set<SuffixDescriptor> sufs = hmSuffixes.get(dn); 129 if (sufs != null) 130 { 131 Iterator<SuffixDescriptor> it = sufs.iterator(); 132 while (it.hasNext() && !suffixFound) 133 { 134 SuffixDescriptor suffix = it.next(); 135 Iterator<String> it2 = suffix.getReplicationServers().iterator(); 136 while (it2.hasNext() && !suffixFound) 137 { 138 if (replica.getReplicationServers().contains(it2.next())) 139 { 140 suffixFound = true; 141 Set<ReplicaDescriptor> replicas = suffix.getReplicas(); 142 replicas.add(replica); 143 suffix.setReplicas(replicas); 144 replica.setSuffix(suffix); 145 } 146 } 147 } 148 } 149 if (!suffixFound) 150 { 151 if (sufs == null) 152 { 153 sufs = new HashSet<>(); 154 hmSuffixes.put(dn, sufs); 155 } 156 sufs.add(replica.getSuffix()); 157 suffixes.add(replica.getSuffix()); 158 } 159 } 160 servers.add(descriptor); 161 } 162 163 // Figure out the replication monitoring if it is required. 164 if (getFilter().searchMonitoringInformation()) 165 { 166 readReplicationMonitoring(); 167 } 168 } 169 catch (ADSContextException ade) 170 { 171 throw new TopologyCacheException(ade); 172 } 173 catch (Throwable t) 174 { 175 throw new TopologyCacheException(TopologyCacheException.Type.BUG, t); 176 } 177 } 178 179 /** Reads the replication monitoring. */ 180 private void readReplicationMonitoring() 181 { 182 Set<ReplicaDescriptor> replicasToUpdate = getReplicasToUpdate(); 183 for (ServerDescriptor server : putQueriedReplicaFirst(this.servers)) 184 { 185 if (server.isReplicationServer()) 186 { 187 // If is replication server, then at least we were able to read the 188 // configuration, so assume that we might be able to read monitoring 189 // (even if an exception occurred before). 190 Set<ReplicaDescriptor> candidateReplicas = getCandidateReplicas(server); 191 if (!candidateReplicas.isEmpty()) 192 { 193 Set<ReplicaDescriptor> updatedReplicas = new HashSet<>(); 194 try 195 { 196 updateReplicas(server, candidateReplicas, updatedReplicas); 197 } 198 catch (NamingException ne) 199 { 200 server.setLastException(new TopologyCacheException( 201 TopologyCacheException.Type.GENERIC_READING_SERVER, ne)); 202 } 203 replicasToUpdate.removeAll(updatedReplicas); 204 } 205 } 206 207 if (replicasToUpdate.isEmpty()) 208 { 209 break; 210 } 211 } 212 } 213 214 /** Put first in the list the replica which host/port was provided on the command line. */ 215 private List<ServerDescriptor> putQueriedReplicaFirst(Set<ServerDescriptor> servers) 216 { 217 List<ServerDescriptor> results = new ArrayList<>(servers); 218 for (Iterator<ServerDescriptor> it = results.iterator(); it.hasNext();) 219 { 220 ServerDescriptor server = it.next(); 221 if (adsContext.getHostPort().equals(server.getHostPort(true))) 222 { 223 it.remove(); 224 results.add(0, server); 225 break; // avoids any ConcurrentModificationException 226 } 227 } 228 return results; 229 } 230 231 private Set<ReplicaDescriptor> getReplicasToUpdate() 232 { 233 Set<ReplicaDescriptor> replicasToUpdate = new HashSet<>(); 234 for (ServerDescriptor server : getServers()) 235 { 236 for (ReplicaDescriptor replica : server.getReplicas()) 237 { 238 if (replica.isReplicated()) 239 { 240 replicasToUpdate.add(replica); 241 } 242 } 243 } 244 return replicasToUpdate; 245 } 246 247 private Set<ReplicaDescriptor> getCandidateReplicas(ServerDescriptor server) 248 { 249 Set<ReplicaDescriptor> candidateReplicas = new HashSet<>(); 250 // It contains replication information: analyze it. 251 String repServer = server.getReplicationServerHostPort(); 252 for (SuffixDescriptor suffix : getSuffixes()) 253 { 254 if (containsIgnoreCase(suffix.getReplicationServers(), repServer)) 255 { 256 candidateReplicas.addAll(suffix.getReplicas()); 257 } 258 } 259 return candidateReplicas; 260 } 261 262 private boolean containsIgnoreCase(Set<String> col, String toFind) 263 { 264 for (String s : col) 265 { 266 if (s.equalsIgnoreCase(toFind)) 267 { 268 return true; 269 } 270 } 271 return false; 272 } 273 274 /** 275 * Sets the list of LDAP URLs and connection type that are preferred to be 276 * used to connect to the servers. When we have a server to which we can 277 * connect using a URL on the list we will try to use it. 278 * 279 * @param cnx the list of preferred connections. 280 */ 281 public void setPreferredConnections(Set<PreferredConnection> cnx) 282 { 283 preferredConnections.clear(); 284 preferredConnections.addAll(cnx); 285 } 286 287 /** 288 * Returns the list of LDAP URLs and connection type that are preferred to be 289 * used to connect to the servers. If a URL is on this list, when we have a 290 * server to which we can connect using that URL and the associated connection 291 * type we will try to use it. 292 * 293 * @return the list of preferred connections. 294 */ 295 public LinkedHashSet<PreferredConnection> getPreferredConnections() 296 { 297 return new LinkedHashSet<>(preferredConnections); 298 } 299 300 /** 301 * Returns a Set containing all the servers that are registered in the ADS. 302 * 303 * @return a Set containing all the servers that are registered in the ADS. 304 */ 305 public Set<ServerDescriptor> getServers() 306 { 307 return new HashSet<>(servers); 308 } 309 310 /** 311 * Returns a Set containing the suffixes (replication topologies) that could 312 * be retrieved after the last call to reloadTopology. 313 * 314 * @return a Set containing the suffixes (replication topologies) that could 315 * be retrieved after the last call to reloadTopology. 316 */ 317 public Set<SuffixDescriptor> getSuffixes() 318 { 319 return new HashSet<>(suffixes); 320 } 321 322 /** 323 * Returns the filter to be used when retrieving information. 324 * 325 * @return the filter to be used when retrieving information. 326 */ 327 public TopologyCacheFilter getFilter() 328 { 329 return filter; 330 } 331 332 /** 333 * Method used to wait at most a certain time (MULTITHREAD_TIMEOUT) for the 334 * different threads to finish. 335 * 336 * @param threadSet the list of threads (we assume that they are started) that 337 * we must wait for. 338 */ 339 private void joinThreadSet(Set<ServerLoader> threadSet) 340 { 341 Date startDate = new Date(); 342 for (ServerLoader t : threadSet) 343 { 344 long timeToJoin = MULTITHREAD_TIMEOUT - System.currentTimeMillis() 345 + startDate.getTime(); 346 try 347 { 348 if (timeToJoin > 0) 349 { 350 t.join(MULTITHREAD_TIMEOUT); 351 } 352 } 353 catch (InterruptedException ie) 354 { 355 logger.info(LocalizableMessage.raw(ie + " caught and ignored", ie)); 356 } 357 if (t.isAlive()) 358 { 359 t.interrupt(); 360 } 361 } 362 Date endDate = new Date(); 363 long workingTime = endDate.getTime() - startDate.getTime(); 364 logger.info(LocalizableMessage.raw("Loading ended at " + workingTime + " ms")); 365 } 366 367 /** 368 * Creates a ServerLoader object based on the provided server properties. 369 * 370 * @param serverProperties the server properties to be used to generate the 371 * ServerLoader. 372 * @return a ServerLoader object based on the provided server properties. 373 */ 374 private ServerLoader getServerLoader( 375 Map<ServerProperty, Object> serverProperties) 376 { 377 return new ServerLoader(serverProperties, bindDN, bindPwd, 378 trustManager == null ? null : trustManager.createCopy(), 379 timeout, 380 getPreferredConnections(), getFilter()); 381 } 382 383 /** 384 * Returns the adsContext used by this TopologyCache. 385 * 386 * @return the adsContext used by this TopologyCache. 387 */ 388 public ADSContext getAdsContext() 389 { 390 return adsContext; 391 } 392 393 /** 394 * Returns a set of error messages encountered in the TopologyCache. 395 * 396 * @return a set of error messages encountered in the TopologyCache. 397 */ 398 public Set<LocalizableMessage> getErrorMessages() 399 { 400 Set<TopologyCacheException> exceptions = new HashSet<>(); 401 Set<ServerDescriptor> theServers = getServers(); 402 Set<LocalizableMessage> exceptionMsgs = new LinkedHashSet<>(); 403 for (ServerDescriptor server : theServers) 404 { 405 TopologyCacheException e = server.getLastException(); 406 if (e != null) 407 { 408 exceptions.add(e); 409 } 410 } 411 /* Check the exceptions and see if we throw them or not. */ 412 for (TopologyCacheException e : exceptions) 413 { 414 switch (e.getType()) 415 { 416 case NOT_GLOBAL_ADMINISTRATOR: 417 exceptionMsgs.add(INFO_NOT_GLOBAL_ADMINISTRATOR_PROVIDED.get()); 418 419 break; 420 case GENERIC_CREATING_CONNECTION: 421 if (isCertificateException(e.getCause())) 422 { 423 exceptionMsgs.add( 424 INFO_ERROR_READING_CONFIG_LDAP_CERTIFICATE_SERVER.get( 425 e.getHostPort(), e.getCause().getMessage())); 426 } 427 else 428 { 429 exceptionMsgs.add(Utils.getMessage(e)); 430 } 431 break; 432 default: 433 exceptionMsgs.add(Utils.getMessage(e)); 434 } 435 } 436 return exceptionMsgs; 437 } 438 439 /** 440 * Updates the monitoring information of the provided replicas using the 441 * information located in cn=monitor of a given replication server. 442 * 443 * @param replicationServer the replication server. 444 * @param candidateReplicas the collection of replicas that must be updated. 445 * @param updatedReplicas the collection of replicas that are actually 446 * updated. This list is updated by the method. 447 */ 448 private void updateReplicas(ServerDescriptor replicationServer, 449 Collection<ReplicaDescriptor> candidateReplicas, 450 Collection<ReplicaDescriptor> updatedReplicas) 451 throws NamingException 452 { 453 SearchControls ctls = new SearchControls(); 454 ctls.setSearchScope(SearchControls.SUBTREE_SCOPE); 455 ctls.setReturningAttributes( 456 new String[] 457 { 458 "approx-older-change-not-synchronized-millis", "missing-changes", 459 "domain-name", "server-id" 460 }); 461 462 NamingEnumeration<SearchResult> monitorEntries = null; 463 ServerLoader loader = getServerLoader(replicationServer.getAdsProperties()); 464 try (ConnectionWrapper conn = loader.createConnectionWrapper()) 465 { 466 monitorEntries = conn.getLdapContext().search( 467 new LdapName("cn=monitor"), "(missing-changes=*)", ctls); 468 469 while (monitorEntries.hasMore()) 470 { 471 SearchResult sr = monitorEntries.next(); 472 473 String dn = ConnectionUtils.getFirstValue(sr, "domain-name"); 474 int replicaId = -1; 475 try 476 { 477 String sid = ConnectionUtils.getFirstValue(sr, "server-id"); 478 if (sid == null) 479 { 480 // This is not a replica, but a replication server. Skip it 481 continue; 482 } 483 replicaId = Integer.valueOf(sid); 484 } 485 catch (Throwable t) 486 { 487 logger.warn(LocalizableMessage.raw("Unexpected error reading replica ID: " + t, 488 t)); 489 } 490 491 for (ReplicaDescriptor replica : candidateReplicas) 492 { 493 if (Utils.areDnsEqual(dn, replica.getSuffix().getDN()) 494 && replica.isReplicated() 495 && replica.getReplicationId() == replicaId) 496 { 497 // This statistic is optional. 498 setAgeOfOldestMissingChange(replica, sr); 499 setMissingChanges(replica, sr); 500 updatedReplicas.add(replica); 501 } 502 } 503 } 504 } 505 catch (NameNotFoundException nse) 506 { 507 } 508 finally 509 { 510 if (monitorEntries != null) 511 { 512 try 513 { 514 monitorEntries.close(); 515 } 516 catch (Throwable t) 517 { 518 logger.warn(LocalizableMessage.raw( 519 "Unexpected error closing enumeration on monitor entries" + t, t)); 520 } 521 } 522 } 523 } 524 525 private void setMissingChanges(ReplicaDescriptor replica, SearchResult sr) throws NamingException 526 { 527 String s = ConnectionUtils.getFirstValue(sr, "missing-changes"); 528 if (s != null) 529 { 530 try 531 { 532 replica.setMissingChanges(Integer.valueOf(s)); 533 } 534 catch (Throwable t) 535 { 536 logger.warn(LocalizableMessage.raw( 537 "Unexpected error reading missing changes: " + t, t)); 538 } 539 } 540 } 541 542 private void setAgeOfOldestMissingChange(ReplicaDescriptor replica, SearchResult sr) throws NamingException 543 { 544 String s = ConnectionUtils.getFirstValue(sr, "approx-older-change-not-synchronized-millis"); 545 if (s != null) 546 { 547 try 548 { 549 replica.setAgeOfOldestMissingChange(Long.valueOf(s)); 550 } 551 catch (Throwable t) 552 { 553 logger.warn(LocalizableMessage.raw( 554 "Unexpected error reading age of oldest change: " + t, t)); 555 } 556 } 557 } 558}