/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2006-2010 Sun Microsystems, Inc.
 * Portions Copyright 2011-2014 ForgeRock AS.
 */
package org.opends.server.extensions;

import java.util.Map;

import org.forgerock.i18n.LocalizableMessage;
import org.opends.server.api.DirectoryThread;
import org.opends.server.core.DirectoryServer;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.types.CancelRequest;
import org.opends.server.types.DisconnectReason;
import org.opends.server.types.Operation;

import static org.opends.messages.CoreMessages.*;
import static org.opends.server.util.StaticUtils.*;

/**
 * This class defines a data structure for storing and interacting with a
 * Directory Server worker thread.
 * <p>
 * The state fields below are written by the worker thread inside
 * {@link #run()} and read by other threads through {@link #shutDown()},
 * {@link #isActive()} and {@link #getDebugProperties()} (or the other way
 * round for the stop flags), which is why all of them are {@code volatile}.
 */
public class TraditionalWorkerThread
       extends DirectoryThread
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /**
   * Indicates whether the Directory Server is shutting down and this thread
   * should stop running.
   */
  private volatile boolean shutdownRequested;

  /**
   * Indicates whether this thread was stopped because the server thread number
   * was reduced. Set through {@link #setStoppedByReducedThreadNumber()} and
   * read by the worker when it leaves its processing loop.
   */
  private volatile boolean stoppedByReducedThreadNumber;

  /**
   * Indicates whether this thread is currently waiting for work. Written by
   * the worker, read by {@link #shutDown()}.
   */
  private volatile boolean waitingForWork;

  /** The operation that this worker thread is currently processing. */
  private volatile Operation operation;

  /**
   * The handle to the actual thread for this worker thread. Assigned at the
   * start of {@link #run()}, interrupted from {@link #shutDown()}.
   */
  private volatile Thread workerThread;

  /** The work queue that this worker thread will service. */
  private final TraditionalWorkQueue workQueue;



  /**
   * Creates a new worker thread that will service the provided work queue and
   * process any new requests that are submitted.
   *
   * @param  workQueue  The work queue with which this worker thread is
   *                    associated.
   * @param  threadID   The thread ID for this worker thread.
   */
  public TraditionalWorkerThread(TraditionalWorkQueue workQueue, int threadID)
  {
    super("Worker Thread " + threadID);

    // All other state fields start at their Java default values
    // (false / null), which is the required initial state.
    this.workQueue = workQueue;
  }



  /**
   * Indicates that this thread is about to be stopped because the Directory
   * Server configuration has been updated to reduce the number of worker
   * threads.
   */
  public void setStoppedByReducedThreadNumber()
  {
    stoppedByReducedThreadNumber = true;
  }



  /**
   * Indicates whether this worker thread is actively processing a request.
   * Note that this is a point-in-time determination and if a reliable answer is
   * expected then the server should impose some external constraint to ensure
   * that no new requests are enqueued.
   *
   * @return  {@code true} if this worker thread is actively processing a
   *          request, or {@code false} if it is idle.
   */
  public boolean isActive()
  {
    return isAlive() && operation != null;
  }



  /**
   * Operates in a loop, retrieving the next request from the work queue,
   * processing it, and then going back to the queue for more.
   * <p>
   * The loop ends when a shutdown has been requested or when the work queue
   * hands back {@code null}. Any {@code Throwable} raised while fetching or
   * processing an operation is logged and handled without terminating the
   * loop.
   */
  @Override
  public void run()
  {
    workerThread = currentThread();

    while (! shutdownRequested)
    {
      try
      {
        waitingForWork = true;
        operation = null; // this line is necessary because next line can block
        operation = workQueue.nextOperation(this);
        waitingForWork = false;


        if (operation == null)
        {
          // The operation may be null if the server is shutting down.  If that
          // is the case, then break out of the while loop.
          break;
        }

        // The operation is not null, so process it and then signal that
        // processing is complete.
        operation.run();
        operation.operationCompleted();
      }
      catch (Throwable t)
      {
        // The field is still null here when nextOperation() itself failed.
        handleUncaughtException(operation, t);
      }
    }

    // If we have gotten here, then we presume that the server thread is
    // shutting down.  However, if that's not the case then that is a problem
    // and we will want to log a message.
    if (stoppedByReducedThreadNumber)
    {
      logger.debug(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER, getName());
    }
    else if (! workQueue.shutdownRequested())
    {
      logger.warn(WARN_UNEXPECTED_WORKER_THREAD_EXIT, getName());
    }


    if (logger.isTraceEnabled())
    {
      logger.trace(getName() + " exiting.");
    }
  }



  /**
   * Handles a {@code Throwable} that escaped from the processing loop: logs
   * it and, when an operation is available, gives the client a server-error
   * response and then disconnects it. Failures raised while doing so are only
   * traced so that the worker loop keeps running.
   *
   * @param  failedOperation  The operation that was in progress when the
   *                          problem occurred, or {@code null} if no
   *                          operation had been obtained yet.
   * @param  t                The uncaught problem.
   */
  private void handleUncaughtException(Operation failedOperation, Throwable t)
  {
    if (logger.isTraceEnabled())
    {
      logger.trace(
        "Uncaught exception in worker thread while processing " +
            "operation %s: %s", failedOperation, t);
      logger.traceException(t);
    }

    try
    {
      LocalizableMessage message =
          ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(), failedOperation, stackTraceToSingleLineString(t));
      logger.error(message);

      if (failedOperation != null)
      {
        // Ensure that the client receives some kind of result so that it does
        // not hang.
        failedOperation.setResultCode(DirectoryServer.getServerErrorResultCode());
        failedOperation.appendErrorMessage(message);
        failedOperation.getClientConnection().sendResponse(failedOperation);
      }
    }
    catch (Throwable t2)
    {
      if (logger.isTraceEnabled())
      {
        logger.trace(
          "Exception in worker thread while trying to log a " +
              "message about an uncaught exception %s: %s", t, t2);

        logger.traceException(t2);
      }
    }

    if (failedOperation == null)
    {
      // Without an operation there is no client to disconnect.
      return;
    }

    try
    {
      LocalizableMessage message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(
          getName(), failedOperation, stackTraceToSingleLineString(t));

      failedOperation.disconnectClient(DisconnectReason.SERVER_ERROR, true, message);
    }
    catch (Throwable t2)
    {
      logger.traceException(t2);
    }
  }



  /**
   * Indicates that the Directory Server has received a request to stop running
   * and that this thread should stop running as soon as possible.
   * <p>
   * If the worker is waiting for work it is interrupted; otherwise the
   * operation in progress, if any, is cancelled.
   */
  public void shutDown()
  {
    if (logger.isTraceEnabled())
    {
      logger.trace(getName() + " being signaled to shut down.");
    }

    // Set a flag that indicates that the thread should stop running.
    shutdownRequested = true;


    // Check to see if the thread is waiting for work.  If so, then interrupt
    // it.
    if (waitingForWork)
    {
      try
      {
        workerThread.interrupt();
      }
      catch (Exception e)
      {
        if (logger.isTraceEnabled())
        {
          logger.trace(
            "Caught an exception while trying to interrupt the worker " +
                "thread waiting for work: %s", e);
          logger.traceException(e);
        }
      }
    }
    else
    {
      try
      {
        // Work on a snapshot: the worker may reset the field at any time.
        final Operation localOperation = operation;
        if (localOperation != null)
        {
          CancelRequest cancelRequest = new CancelRequest(true,
              INFO_CANCELED_BY_SHUTDOWN.get());
          localOperation.cancel(cancelRequest);
        }
      }
      catch (Exception e)
      {
        if (logger.isTraceEnabled())
        {
          logger.trace(
            "Caught an exception while trying to abandon the " +
                "operation in progress for the worker thread: %s", e);
          logger.traceException(e);
        }
      }
    }
  }

  /**
   * Retrieves any relevant debug information with which this thread is
   * associated so it can be included in debug messages.
   *
   * @return  The debug properties of the parent class, extended with a
   *          {@code "clientConnection"} entry ({@code "none"} when no
   *          operation is in progress) and an {@code "operation"} entry.
   */
  @Override
  public Map<String, String> getDebugProperties()
  {
    // Read the volatile field once: the worker thread may reset it to null
    // between a null check and a later dereference.
    final Operation localOperation = operation;

    Map<String, String> properties = super.getDebugProperties();
    properties.put("clientConnection", localOperation != null
        ? String.valueOf(localOperation.getClientConnection()) : "none");
    properties.put("operation", String.valueOf(localOperation));
    return properties;
  }
}