001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2008 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2015 ForgeRock AS.
016 */
017package org.opends.server.replication.protocol;
018
019import java.io.IOException;
020
021import org.opends.server.api.DirectoryThread;
022import org.forgerock.i18n.slf4j.LocalizedLogger;
023import org.opends.server.util.StaticUtils;
024
025/**
026 * This thread publishes a {@link HeartbeatMsg} on a given protocol session at
027 * regular intervals when there are no other replication messages being
028 * published.
029 * <p>
030 * These heartbeat messages are sent by a replication server.
031 */
032public class HeartbeatThread extends DirectoryThread
033{
034  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
035
036
037  /**
038   * For test purposes only to simulate loss of heartbeats.
039   */
040  private static volatile boolean heartbeatsDisabled;
041
042  /**
043   * The session on which heartbeats are to be sent.
044   */
045  private final Session session;
046
047
048  /**
049   * The time in milliseconds between heartbeats.
050   */
051  private final long heartbeatInterval;
052
053
054  /**
055   * Set this to stop the thread.
056   */
057  private volatile boolean shutdown;
058  private final Object shutdownLock = new Object();
059
060
061  /**
062   * Create a heartbeat thread.
063   * @param threadName The name of the heartbeat thread.
064   * @param session The session on which heartbeats are to be sent.
065   * @param heartbeatInterval The desired interval between heartbeats in
066   * milliseconds.
067   */
068  public HeartbeatThread(String threadName, Session session,
069                  long heartbeatInterval)
070  {
071    super(threadName);
072    this.session = session;
073    this.heartbeatInterval = heartbeatInterval;
074  }
075
076  /** {@inheritDoc} */
077  @Override
078  public void run()
079  {
080    try
081    {
082      if (logger.isTraceEnabled())
083      {
084        logger.trace("Heartbeat thread is starting, interval is %d",
085                  heartbeatInterval);
086      }
087      HeartbeatMsg heartbeatMessage = new HeartbeatMsg();
088
089      while (!shutdown)
090      {
091        long now = System.currentTimeMillis();
092        if (logger.isTraceEnabled())
093        {
094          logger.trace("Heartbeat thread awoke at %d, last message " +
095              "was sent at %d", now, session.getLastPublishTime());
096        }
097
098        if (now > session.getLastPublishTime() + heartbeatInterval
099            && !heartbeatsDisabled)
100        {
101          if (logger.isTraceEnabled())
102          {
103            logger.trace("Heartbeat sent at %d", now);
104          }
105          session.publish(heartbeatMessage);
106        }
107
108        long sleepTime = session.getLastPublishTime() + heartbeatInterval - now;
109        if (sleepTime <= 0)
110        {
111          sleepTime = heartbeatInterval;
112        }
113
114        if (logger.isTraceEnabled())
115        {
116          logger.trace("Heartbeat thread sleeping for %d", sleepTime);
117        }
118
119        synchronized (shutdownLock)
120        {
121          if (!shutdown)
122          {
123            try
124            {
125              shutdownLock.wait(sleepTime);
126            }
127            catch (InterruptedException e)
128            {
129              // Server shutdown monitor may interrupt slow threads.
130              logger.traceException(e);
131              shutdown = true;
132            }
133          }
134        }
135      }
136    }
137    catch (IOException e)
138    {
139      if (logger.isTraceEnabled())
140      {
141        logger.trace("Heartbeat thread could not send a heartbeat."
142            + StaticUtils.stackTraceToSingleLineString(e));
143      }
144    }
145    finally
146    {
147      if (logger.isTraceEnabled())
148      {
149        logger.trace("Heartbeat thread is exiting.");
150      }
151    }
152  }
153
154
155  /**
156   * Call this method to stop the thread.
157   * This method is blocking until the thread has stopped.
158   */
159  public void shutdown()
160  {
161    synchronized (shutdownLock)
162    {
163      shutdown = true;
164      shutdownLock.notifyAll();
165      if (logger.isTraceEnabled())
166      {
167        logger.trace("Going to notify Heartbeat thread.");
168      }
169    }
170    if (logger.isTraceEnabled())
171    {
172      logger.trace("Returning from Heartbeat shutdown.");
173    }
174  }
175
176
177  /**
178   * For testing purposes only to simulate loss of heartbeats.
179   * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
180   */
181  public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
182  {
183    HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled;
184  }
185}