001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2008-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2016 ForgeRock AS.
016 */
017package org.opends.admin.ads;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.Date;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.Iterator;
025import java.util.LinkedHashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import javax.naming.NameNotFoundException;
031import javax.naming.NamingEnumeration;
032import javax.naming.NamingException;
033import javax.naming.directory.SearchControls;
034import javax.naming.directory.SearchResult;
035import javax.naming.ldap.LdapName;
036
037import org.forgerock.i18n.LocalizableMessage;
038import org.forgerock.i18n.slf4j.LocalizedLogger;
039import org.opends.admin.ads.ADSContext.ServerProperty;
040import org.opends.admin.ads.util.ApplicationTrustManager;
041import org.opends.admin.ads.util.ConnectionUtils;
042import org.opends.admin.ads.util.ConnectionWrapper;
043import org.opends.admin.ads.util.PreferredConnection;
044import org.opends.admin.ads.util.ServerLoader;
045import org.opends.quicksetup.util.Utils;
046
047import static com.forgerock.opendj.cli.Utils.*;
048
049import static org.opends.messages.QuickSetupMessages.*;
050
051/**
052 * This class allows to read the configuration of the different servers that are
053 * registered in a given ADS server. It provides a read only view of the
054 * configuration of the servers and of the replication topologies that might be
055 * configured between them.
056 */
057public class TopologyCache
058{
059  private final ADSContext adsContext;
060  private final ApplicationTrustManager trustManager;
061  private final int timeout;
062  private final String bindDN;
063  private final String bindPwd;
064  private final Set<ServerDescriptor> servers = new HashSet<>();
065  private final Set<SuffixDescriptor> suffixes = new HashSet<>();
066  private final Set<PreferredConnection> preferredConnections = new LinkedHashSet<>();
067  private final TopologyCacheFilter filter = new TopologyCacheFilter();
068  private static final int MULTITHREAD_TIMEOUT = 90 * 1000;
069  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
070
071  /**
072   * Constructor of the TopologyCache.
073   *
074   * @param adsContext the adsContext to the ADS registry.
075   * @param trustManager the ApplicationTrustManager that must be used to trust
076   * certificates when we create connections to the registered servers to read
077   * their configuration.
078   * @param timeout the timeout to establish the connection in milliseconds.
079   * Use {@code 0} to express no timeout.
080   */
081  public TopologyCache(ADSContext adsContext,
082                       ApplicationTrustManager trustManager,
083                       int timeout)
084  {
085    this.adsContext = adsContext;
086    this.trustManager = trustManager;
087    this.timeout = timeout;
088    bindDN = ConnectionUtils.getBindDN(adsContext.getDirContext());
089    bindPwd = ConnectionUtils.getBindPassword(adsContext.getDirContext());
090  }
091
092  /**
093   * Reads the configuration of the registered servers.
094   *
095   * @throws TopologyCacheException if there is an issue reading the
096   * configuration of the registered servers.
097   */
098  public void reloadTopology() throws TopologyCacheException
099  {
100    suffixes.clear();
101    servers.clear();
102    try
103    {
104      Set<Map<ServerProperty, Object>> adsServers =
105          adsContext.readServerRegistry();
106
107      Set<ServerLoader> threadSet = new HashSet<>();
108      for (Map<ServerProperty, Object> serverProperties : adsServers)
109      {
110        ServerLoader t = getServerLoader(serverProperties);
111        t.start();
112        threadSet.add(t);
113      }
114      joinThreadSet(threadSet);
115
116      // Try to consolidate things (even if the data is not complete)
117      Map<LdapName, Set<SuffixDescriptor>> hmSuffixes = new HashMap<>();
118      for (ServerLoader loader : threadSet)
119      {
120        ServerDescriptor descriptor = loader.getServerDescriptor();
121        for (ReplicaDescriptor replica : descriptor.getReplicas())
122        {
123          logger.info(LocalizableMessage.raw("Handling replica with dn: "
124              + replica.getSuffix().getDN()));
125
126          boolean suffixFound = false;
127          LdapName dn = new LdapName(replica.getSuffix().getDN());
128          Set<SuffixDescriptor> sufs = hmSuffixes.get(dn);
129          if (sufs != null)
130          {
131            Iterator<SuffixDescriptor> it = sufs.iterator();
132            while (it.hasNext() && !suffixFound)
133            {
134              SuffixDescriptor suffix = it.next();
135              Iterator<String> it2 = suffix.getReplicationServers().iterator();
136              while (it2.hasNext() && !suffixFound)
137              {
138                if (replica.getReplicationServers().contains(it2.next()))
139                {
140                  suffixFound = true;
141                  Set<ReplicaDescriptor> replicas = suffix.getReplicas();
142                  replicas.add(replica);
143                  suffix.setReplicas(replicas);
144                  replica.setSuffix(suffix);
145                }
146              }
147            }
148          }
149          if (!suffixFound)
150          {
151            if (sufs == null)
152            {
153              sufs = new HashSet<>();
154              hmSuffixes.put(dn, sufs);
155            }
156            sufs.add(replica.getSuffix());
157            suffixes.add(replica.getSuffix());
158          }
159        }
160        servers.add(descriptor);
161      }
162
163      // Figure out the replication monitoring if it is required.
164      if (getFilter().searchMonitoringInformation())
165      {
166        readReplicationMonitoring();
167      }
168    }
169    catch (ADSContextException ade)
170    {
171      throw new TopologyCacheException(ade);
172    }
173    catch (Throwable t)
174    {
175      throw new TopologyCacheException(TopologyCacheException.Type.BUG, t);
176    }
177  }
178
179  /** Reads the replication monitoring. */
180  private void readReplicationMonitoring()
181  {
182    Set<ReplicaDescriptor> replicasToUpdate = getReplicasToUpdate();
183    for (ServerDescriptor server : putQueriedReplicaFirst(this.servers))
184    {
185      if (server.isReplicationServer())
186      {
187        // If is replication server, then at least we were able to read the
188        // configuration, so assume that we might be able to read monitoring
189        // (even if an exception occurred before).
190        Set<ReplicaDescriptor> candidateReplicas = getCandidateReplicas(server);
191        if (!candidateReplicas.isEmpty())
192        {
193          Set<ReplicaDescriptor> updatedReplicas = new HashSet<>();
194          try
195          {
196            updateReplicas(server, candidateReplicas, updatedReplicas);
197          }
198          catch (NamingException ne)
199          {
200            server.setLastException(new TopologyCacheException(
201                TopologyCacheException.Type.GENERIC_READING_SERVER, ne));
202          }
203          replicasToUpdate.removeAll(updatedReplicas);
204        }
205      }
206
207      if (replicasToUpdate.isEmpty())
208      {
209        break;
210      }
211    }
212  }
213
214  /** Put first in the list the replica which host/port was provided on the command line. */
215  private List<ServerDescriptor> putQueriedReplicaFirst(Set<ServerDescriptor> servers)
216  {
217    List<ServerDescriptor> results = new ArrayList<>(servers);
218    for (Iterator<ServerDescriptor> it = results.iterator(); it.hasNext();)
219    {
220      ServerDescriptor server = it.next();
221      if (adsContext.getHostPort().equals(server.getHostPort(true)))
222      {
223        it.remove();
224        results.add(0, server);
225        break; // avoids any ConcurrentModificationException
226      }
227    }
228    return results;
229  }
230
231  private Set<ReplicaDescriptor> getReplicasToUpdate()
232  {
233    Set<ReplicaDescriptor> replicasToUpdate = new HashSet<>();
234    for (ServerDescriptor server : getServers())
235    {
236      for (ReplicaDescriptor replica : server.getReplicas())
237      {
238        if (replica.isReplicated())
239        {
240          replicasToUpdate.add(replica);
241        }
242      }
243    }
244    return replicasToUpdate;
245  }
246
247  private Set<ReplicaDescriptor> getCandidateReplicas(ServerDescriptor server)
248  {
249    Set<ReplicaDescriptor> candidateReplicas = new HashSet<>();
250    // It contains replication information: analyze it.
251    String repServer = server.getReplicationServerHostPort();
252    for (SuffixDescriptor suffix : getSuffixes())
253    {
254      if (containsIgnoreCase(suffix.getReplicationServers(), repServer))
255      {
256        candidateReplicas.addAll(suffix.getReplicas());
257      }
258    }
259    return candidateReplicas;
260  }
261
262  private boolean containsIgnoreCase(Set<String> col, String toFind)
263  {
264    for (String s : col)
265    {
266      if (s.equalsIgnoreCase(toFind))
267      {
268        return true;
269      }
270    }
271    return false;
272  }
273
274  /**
275   * Sets the list of LDAP URLs and connection type that are preferred to be
276   * used to connect to the servers. When we have a server to which we can
277   * connect using a URL on the list we will try to use it.
278   *
279   * @param cnx the list of preferred connections.
280   */
281  public void setPreferredConnections(Set<PreferredConnection> cnx)
282  {
283    preferredConnections.clear();
284    preferredConnections.addAll(cnx);
285  }
286
287  /**
288   * Returns the list of LDAP URLs and connection type that are preferred to be
289   * used to connect to the servers. If a URL is on this list, when we have a
290   * server to which we can connect using that URL and the associated connection
291   * type we will try to use it.
292   *
293   * @return the list of preferred connections.
294   */
295  public LinkedHashSet<PreferredConnection> getPreferredConnections()
296  {
297    return new LinkedHashSet<>(preferredConnections);
298  }
299
300  /**
301   * Returns a Set containing all the servers that are registered in the ADS.
302   *
303   * @return a Set containing all the servers that are registered in the ADS.
304   */
305  public Set<ServerDescriptor> getServers()
306  {
307    return new HashSet<>(servers);
308  }
309
310  /**
311   * Returns a Set containing the suffixes (replication topologies) that could
312   * be retrieved after the last call to reloadTopology.
313   *
314   * @return a Set containing the suffixes (replication topologies) that could
315   * be retrieved after the last call to reloadTopology.
316   */
317  public Set<SuffixDescriptor> getSuffixes()
318  {
319    return new HashSet<>(suffixes);
320  }
321
322  /**
323   * Returns the filter to be used when retrieving information.
324   *
325   * @return the filter to be used when retrieving information.
326   */
327  public TopologyCacheFilter getFilter()
328  {
329    return filter;
330  }
331
332  /**
333   * Method used to wait at most a certain time (MULTITHREAD_TIMEOUT) for the
334   * different threads to finish.
335   *
336   * @param threadSet the list of threads (we assume that they are started) that
337   * we must wait for.
338   */
339  private void joinThreadSet(Set<ServerLoader> threadSet)
340  {
341    Date startDate = new Date();
342    for (ServerLoader t : threadSet)
343    {
344      long timeToJoin = MULTITHREAD_TIMEOUT - System.currentTimeMillis()
345          + startDate.getTime();
346      try
347      {
348        if (timeToJoin > 0)
349        {
350          t.join(MULTITHREAD_TIMEOUT);
351        }
352      }
353      catch (InterruptedException ie)
354      {
355        logger.info(LocalizableMessage.raw(ie + " caught and ignored", ie));
356      }
357      if (t.isAlive())
358      {
359        t.interrupt();
360      }
361    }
362    Date endDate = new Date();
363    long workingTime = endDate.getTime() - startDate.getTime();
364    logger.info(LocalizableMessage.raw("Loading ended at " + workingTime + " ms"));
365  }
366
367  /**
368   * Creates a ServerLoader object based on the provided server properties.
369   *
370   * @param serverProperties the server properties to be used to generate the
371   * ServerLoader.
372   * @return a ServerLoader object based on the provided server properties.
373   */
374  private ServerLoader getServerLoader(
375      Map<ServerProperty, Object> serverProperties)
376  {
377    return new ServerLoader(serverProperties, bindDN, bindPwd,
378        trustManager == null ? null : trustManager.createCopy(),
379        timeout,
380        getPreferredConnections(), getFilter());
381  }
382
383  /**
384   * Returns the adsContext used by this TopologyCache.
385   *
386   * @return the adsContext used by this TopologyCache.
387   */
388  public ADSContext getAdsContext()
389  {
390    return adsContext;
391  }
392
393  /**
394   * Returns a set of error messages encountered in the TopologyCache.
395   *
396   * @return a set of error messages encountered in the TopologyCache.
397   */
398  public Set<LocalizableMessage> getErrorMessages()
399  {
400    Set<TopologyCacheException> exceptions = new HashSet<>();
401    Set<ServerDescriptor> theServers = getServers();
402    Set<LocalizableMessage> exceptionMsgs = new LinkedHashSet<>();
403    for (ServerDescriptor server : theServers)
404    {
405      TopologyCacheException e = server.getLastException();
406      if (e != null)
407      {
408        exceptions.add(e);
409      }
410    }
411    /* Check the exceptions and see if we throw them or not. */
412    for (TopologyCacheException e : exceptions)
413    {
414      switch (e.getType())
415      {
416        case NOT_GLOBAL_ADMINISTRATOR:
417          exceptionMsgs.add(INFO_NOT_GLOBAL_ADMINISTRATOR_PROVIDED.get());
418
419          break;
420        case GENERIC_CREATING_CONNECTION:
421          if (isCertificateException(e.getCause()))
422          {
423            exceptionMsgs.add(
424                INFO_ERROR_READING_CONFIG_LDAP_CERTIFICATE_SERVER.get(
425                e.getHostPort(), e.getCause().getMessage()));
426          }
427          else
428          {
429            exceptionMsgs.add(Utils.getMessage(e));
430          }
431          break;
432        default:
433          exceptionMsgs.add(Utils.getMessage(e));
434      }
435    }
436    return exceptionMsgs;
437  }
438
439  /**
440   * Updates the monitoring information of the provided replicas using the
441   * information located in cn=monitor of a given replication server.
442   *
443   * @param replicationServer the replication server.
444   * @param candidateReplicas the collection of replicas that must be updated.
445   * @param updatedReplicas the collection of replicas that are actually
446   * updated. This list is updated by the method.
447   */
448  private void updateReplicas(ServerDescriptor replicationServer,
449                              Collection<ReplicaDescriptor> candidateReplicas,
450                              Collection<ReplicaDescriptor> updatedReplicas)
451      throws NamingException
452  {
453    SearchControls ctls = new SearchControls();
454    ctls.setSearchScope(SearchControls.SUBTREE_SCOPE);
455    ctls.setReturningAttributes(
456        new String[]
457        {
458          "approx-older-change-not-synchronized-millis", "missing-changes",
459          "domain-name", "server-id"
460        });
461
462    NamingEnumeration<SearchResult> monitorEntries = null;
463    ServerLoader loader = getServerLoader(replicationServer.getAdsProperties());
464    try (ConnectionWrapper conn = loader.createConnectionWrapper())
465    {
466      monitorEntries = conn.getLdapContext().search(
467          new LdapName("cn=monitor"), "(missing-changes=*)", ctls);
468
469      while (monitorEntries.hasMore())
470      {
471        SearchResult sr = monitorEntries.next();
472
473        String dn = ConnectionUtils.getFirstValue(sr, "domain-name");
474        int replicaId = -1;
475        try
476        {
477          String sid = ConnectionUtils.getFirstValue(sr, "server-id");
478          if (sid == null)
479          {
480            // This is not a replica, but a replication server. Skip it
481            continue;
482          }
483          replicaId = Integer.valueOf(sid);
484        }
485        catch (Throwable t)
486        {
487          logger.warn(LocalizableMessage.raw("Unexpected error reading replica ID: " + t,
488              t));
489        }
490
491        for (ReplicaDescriptor replica : candidateReplicas)
492        {
493          if (Utils.areDnsEqual(dn, replica.getSuffix().getDN())
494              && replica.isReplicated()
495              && replica.getReplicationId() == replicaId)
496          {
497            // This statistic is optional.
498            setAgeOfOldestMissingChange(replica, sr);
499            setMissingChanges(replica, sr);
500            updatedReplicas.add(replica);
501          }
502        }
503      }
504    }
505    catch (NameNotFoundException nse)
506    {
507    }
508    finally
509    {
510      if (monitorEntries != null)
511      {
512        try
513        {
514          monitorEntries.close();
515        }
516        catch (Throwable t)
517        {
518          logger.warn(LocalizableMessage.raw(
519              "Unexpected error closing enumeration on monitor entries" + t, t));
520        }
521      }
522    }
523  }
524
525  private void setMissingChanges(ReplicaDescriptor replica, SearchResult sr) throws NamingException
526  {
527    String s = ConnectionUtils.getFirstValue(sr, "missing-changes");
528    if (s != null)
529    {
530      try
531      {
532        replica.setMissingChanges(Integer.valueOf(s));
533      }
534      catch (Throwable t)
535      {
536        logger.warn(LocalizableMessage.raw(
537            "Unexpected error reading missing changes: " + t, t));
538      }
539    }
540  }
541
542  private void setAgeOfOldestMissingChange(ReplicaDescriptor replica, SearchResult sr) throws NamingException
543  {
544    String s = ConnectionUtils.getFirstValue(sr, "approx-older-change-not-synchronized-millis");
545    if (s != null)
546    {
547      try
548      {
549        replica.setAgeOfOldestMissingChange(Long.valueOf(s));
550      }
551      catch (Throwable t)
552      {
553        logger.warn(LocalizableMessage.raw(
554            "Unexpected error reading age of oldest change: " + t, t));
555      }
556    }
557  }
558}