001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2009-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2014 ForgeRock AS.
016 */
017package org.opends.server.replication.server;
018
019import java.io.IOException;
020
021import org.opends.server.api.DirectoryThread;
022import org.forgerock.i18n.slf4j.LocalizedLogger;
023import org.opends.server.replication.protocol.MonitorMsg;
024
025/**
026 * This thread regularly publishes monitoring information:
027 * - it sends monitoring messages regarding the direct topology (directly
028 *   connected DSs and RSs) to the connected RSs
029 * - it sends monitoring messages regarding the whole topology (also includes
030 *   the local RS) to the connected DSs
031 * Note: as of today, monitoring messages mainly contains the server state of
032 * the entities.
033 */
034public class MonitoringPublisher extends DirectoryThread
035{
036  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
037
038  /** The replication domain we send monitoring for. */
039  private final ReplicationServerDomain domain;
040
041  /** Sleep time (in ms) before sending new monitoring messages. */
042  private volatile long period;
043
044  private final Object shutdownLock = new Object();
045
046  /**
047   * Create a monitoring publisher.
048   * @param replicationServerDomain The ReplicationServerDomain the monitoring
049   *        publisher is for.
050   * @param period The sleep time to use
051   */
052  public MonitoringPublisher(ReplicationServerDomain replicationServerDomain,
053    long period)
054  {
055    super("Replication server RS("
056        + replicationServerDomain.getLocalRSServerId()
057        + ") monitor publisher for domain \""
058        + replicationServerDomain.getBaseDN() + "\"");
059
060    this.domain = replicationServerDomain;
061    this.period = period;
062  }
063
064  /**
065   * Run method for the monitoring publisher.
066   */
067  @Override
068  public void run()
069  {
070    if (logger.isTraceEnabled())
071    {
072      logger.trace(getMessage("Monitoring publisher starting."));
073    }
074
075    try
076    {
077      while (!isShutdownInitiated())
078      {
079        // Send global topology information to peer DSs
080        final int senderId = domain.getLocalRSServerId();
081        final MonitorMsg monitorMsg =
082            domain.createGlobalTopologyMonitorMsg(senderId, 0);
083
084        for (ServerHandler serverHandler : domain.getConnectedDSs().values())
085        {
086          // send() can be long operation, check for shutdown between each calls
087          if (isShutdownInitiated())
088          {
089            break;
090          }
091          try
092          {
093            serverHandler.send(monitorMsg);
094          }
095          catch (IOException e)
096          {
097            // Server is disconnecting ? Forget it
098          }
099        }
100
101        synchronized (shutdownLock)
102        {
103          // double check to ensure the call to notify() was not missed
104          if (!isShutdownInitiated())
105          {
106            shutdownLock.wait(period);
107          }
108        }
109      }
110    }
111    catch (InterruptedException e)
112    {
113      logger.trace(getMessage(
114          "Monitoring publisher has been interrupted while sleeping."));
115    }
116
117    logger.trace(getMessage("Monitoring publisher is terminated."));
118  }
119
120
121
122  /**
123   * Stops the thread.
124   */
125  public void shutdown()
126  {
127    initiateShutdown();
128    synchronized (shutdownLock)
129    {
130      shutdownLock.notifyAll();
131    }
132    if (logger.isTraceEnabled())
133    {
134      logger.trace(getMessage("Shutting down monitoring publisher."));
135    }
136  }
137
138  /**
139   * Waits for thread death. If not terminated within 2 seconds,
140   * forces interruption
141   */
142  public void waitForShutdown()
143  {
144    try
145    {
146      // Here, "this" is the monitoring publisher thread
147      join(2000);
148    }
149    catch (InterruptedException e)
150    {
151      // exit the loop if this thread is interrupted.
152    }
153  }
154
155  /**
156   * Sets the period value.
157   * @param period The new period value.
158   */
159  public void setPeriod(long period)
160  {
161    if (logger.isTraceEnabled())
162    {
163      logger.trace(getMessage(
164          "Monitoring publisher changing period value to " + period));
165    }
166
167    this.period = period;
168  }
169
170  private String getMessage(String message)
171  {
172    return "In RS " + domain.getLocalRSServerId() + ", for base dn "
173        + domain.getBaseDN() + ": " + message;
174  }
175}