001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2009-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2014 ForgeRock AS. 016 */ 017package org.opends.server.replication.server; 018 019import java.io.IOException; 020 021import org.opends.server.api.DirectoryThread; 022import org.forgerock.i18n.slf4j.LocalizedLogger; 023import org.opends.server.replication.protocol.MonitorMsg; 024 025/** 026 * This thread regularly publishes monitoring information: 027 * - it sends monitoring messages regarding the direct topology (directly 028 * connected DSs and RSs) to the connected RSs 029 * - it sends monitoring messages regarding the whole topology (also includes 030 * the local RS) to the connected DSs 031 * Note: as of today, monitoring messages mainly contains the server state of 032 * the entities. 033 */ 034public class MonitoringPublisher extends DirectoryThread 035{ 036 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 037 038 /** The replication domain we send monitoring for. */ 039 private final ReplicationServerDomain domain; 040 041 /** Sleep time (in ms) before sending new monitoring messages. */ 042 private volatile long period; 043 044 private final Object shutdownLock = new Object(); 045 046 /** 047 * Create a monitoring publisher. 048 * @param replicationServerDomain The ReplicationServerDomain the monitoring 049 * publisher is for. 050 * @param period The sleep time to use 051 */ 052 public MonitoringPublisher(ReplicationServerDomain replicationServerDomain, 053 long period) 054 { 055 super("Replication server RS(" 056 + replicationServerDomain.getLocalRSServerId() 057 + ") monitor publisher for domain \"" 058 + replicationServerDomain.getBaseDN() + "\""); 059 060 this.domain = replicationServerDomain; 061 this.period = period; 062 } 063 064 /** 065 * Run method for the monitoring publisher. 066 */ 067 @Override 068 public void run() 069 { 070 if (logger.isTraceEnabled()) 071 { 072 logger.trace(getMessage("Monitoring publisher starting.")); 073 } 074 075 try 076 { 077 while (!isShutdownInitiated()) 078 { 079 // Send global topology information to peer DSs 080 final int senderId = domain.getLocalRSServerId(); 081 final MonitorMsg monitorMsg = 082 domain.createGlobalTopologyMonitorMsg(senderId, 0); 083 084 for (ServerHandler serverHandler : domain.getConnectedDSs().values()) 085 { 086 // send() can be long operation, check for shutdown between each calls 087 if (isShutdownInitiated()) 088 { 089 break; 090 } 091 try 092 { 093 serverHandler.send(monitorMsg); 094 } 095 catch (IOException e) 096 { 097 // Server is disconnecting ? Forget it 098 } 099 } 100 101 synchronized (shutdownLock) 102 { 103 // double check to ensure the call to notify() was not missed 104 if (!isShutdownInitiated()) 105 { 106 shutdownLock.wait(period); 107 } 108 } 109 } 110 } 111 catch (InterruptedException e) 112 { 113 logger.trace(getMessage( 114 "Monitoring publisher has been interrupted while sleeping.")); 115 } 116 117 logger.trace(getMessage("Monitoring publisher is terminated.")); 118 } 119 120 121 122 /** 123 * Stops the thread. 124 */ 125 public void shutdown() 126 { 127 initiateShutdown(); 128 synchronized (shutdownLock) 129 { 130 shutdownLock.notifyAll(); 131 } 132 if (logger.isTraceEnabled()) 133 { 134 logger.trace(getMessage("Shutting down monitoring publisher.")); 135 } 136 } 137 138 /** 139 * Waits for thread death. If not terminated within 2 seconds, 140 * forces interruption 141 */ 142 public void waitForShutdown() 143 { 144 try 145 { 146 // Here, "this" is the monitoring publisher thread 147 join(2000); 148 } 149 catch (InterruptedException e) 150 { 151 // exit the loop if this thread is interrupted. 152 } 153 } 154 155 /** 156 * Sets the period value. 157 * @param period The new period value. 158 */ 159 public void setPeriod(long period) 160 { 161 if (logger.isTraceEnabled()) 162 { 163 logger.trace(getMessage( 164 "Monitoring publisher changing period value to " + period)); 165 } 166 167 this.period = period; 168 } 169 170 private String getMessage(String message) 171 { 172 return "In RS " + domain.getLocalRSServerId() + ", for base dn " 173 + domain.getBaseDN() + ": " + message; 174 } 175}