001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2008 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2015 ForgeRock AS. 016 */ 017package org.opends.server.replication.protocol; 018 019import java.io.IOException; 020 021import org.opends.server.api.DirectoryThread; 022import org.forgerock.i18n.slf4j.LocalizedLogger; 023import org.opends.server.util.StaticUtils; 024 025/** 026 * This thread publishes a {@link HeartbeatMsg} on a given protocol session at 027 * regular intervals when there are no other replication messages being 028 * published. 029 * <p> 030 * These heartbeat messages are sent by a replication server. 031 */ 032public class HeartbeatThread extends DirectoryThread 033{ 034 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 035 036 037 /** 038 * For test purposes only to simulate loss of heartbeats. 039 */ 040 private static volatile boolean heartbeatsDisabled; 041 042 /** 043 * The session on which heartbeats are to be sent. 044 */ 045 private final Session session; 046 047 048 /** 049 * The time in milliseconds between heartbeats. 050 */ 051 private final long heartbeatInterval; 052 053 054 /** 055 * Set this to stop the thread. 056 */ 057 private volatile boolean shutdown; 058 private final Object shutdownLock = new Object(); 059 060 061 /** 062 * Create a heartbeat thread. 063 * @param threadName The name of the heartbeat thread. 064 * @param session The session on which heartbeats are to be sent. 065 * @param heartbeatInterval The desired interval between heartbeats in 066 * milliseconds. 067 */ 068 public HeartbeatThread(String threadName, Session session, 069 long heartbeatInterval) 070 { 071 super(threadName); 072 this.session = session; 073 this.heartbeatInterval = heartbeatInterval; 074 } 075 076 /** {@inheritDoc} */ 077 @Override 078 public void run() 079 { 080 try 081 { 082 if (logger.isTraceEnabled()) 083 { 084 logger.trace("Heartbeat thread is starting, interval is %d", 085 heartbeatInterval); 086 } 087 HeartbeatMsg heartbeatMessage = new HeartbeatMsg(); 088 089 while (!shutdown) 090 { 091 long now = System.currentTimeMillis(); 092 if (logger.isTraceEnabled()) 093 { 094 logger.trace("Heartbeat thread awoke at %d, last message " + 095 "was sent at %d", now, session.getLastPublishTime()); 096 } 097 098 if (now > session.getLastPublishTime() + heartbeatInterval 099 && !heartbeatsDisabled) 100 { 101 if (logger.isTraceEnabled()) 102 { 103 logger.trace("Heartbeat sent at %d", now); 104 } 105 session.publish(heartbeatMessage); 106 } 107 108 long sleepTime = session.getLastPublishTime() + heartbeatInterval - now; 109 if (sleepTime <= 0) 110 { 111 sleepTime = heartbeatInterval; 112 } 113 114 if (logger.isTraceEnabled()) 115 { 116 logger.trace("Heartbeat thread sleeping for %d", sleepTime); 117 } 118 119 synchronized (shutdownLock) 120 { 121 if (!shutdown) 122 { 123 try 124 { 125 shutdownLock.wait(sleepTime); 126 } 127 catch (InterruptedException e) 128 { 129 // Server shutdown monitor may interrupt slow threads. 130 logger.traceException(e); 131 shutdown = true; 132 } 133 } 134 } 135 } 136 } 137 catch (IOException e) 138 { 139 if (logger.isTraceEnabled()) 140 { 141 logger.trace("Heartbeat thread could not send a heartbeat." 142 + StaticUtils.stackTraceToSingleLineString(e)); 143 } 144 } 145 finally 146 { 147 if (logger.isTraceEnabled()) 148 { 149 logger.trace("Heartbeat thread is exiting."); 150 } 151 } 152 } 153 154 155 /** 156 * Call this method to stop the thread. 157 * This method is blocking until the thread has stopped. 158 */ 159 public void shutdown() 160 { 161 synchronized (shutdownLock) 162 { 163 shutdown = true; 164 shutdownLock.notifyAll(); 165 if (logger.isTraceEnabled()) 166 { 167 logger.trace("Going to notify Heartbeat thread."); 168 } 169 } 170 if (logger.isTraceEnabled()) 171 { 172 logger.trace("Returning from Heartbeat shutdown."); 173 } 174 } 175 176 177 /** 178 * For testing purposes only to simulate loss of heartbeats. 179 * @param heartbeatsDisabled Set true to prevent heartbeats from being sent. 180 */ 181 public static void setHeartbeatsDisabled(boolean heartbeatsDisabled) 182 { 183 HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled; 184 } 185}