001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2009 Sun Microsystems, Inc. 015 * Portions Copyright 2013-2015 ForgeRock AS. 016 */ 017package org.opends.server.extensions; 018 019 020import java.util.Map; 021 022import org.forgerock.i18n.LocalizableMessage; 023import org.opends.server.api.DirectoryThread; 024import org.opends.server.core.DirectoryServer; 025import org.forgerock.i18n.slf4j.LocalizedLogger; 026import org.opends.server.types.CancelRequest; 027import org.opends.server.types.DisconnectReason; 028import org.opends.server.types.Operation; 029 030import static org.opends.messages.CoreMessages.*; 031import static org.opends.server.util.StaticUtils.*; 032 033 034/** 035 * This class defines a data structure for storing and interacting with a 036 * Directory Server worker thread. 037 */ 038public class ParallelWorkerThread 039 extends DirectoryThread 040{ 041 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 042 043 /** 044 * Indicates whether the Directory Server is shutting down and this thread 045 * should stop running. 046 */ 047 private boolean shutdownRequested; 048 049 /** 050 * Indicates whether this thread was stopped because the server threadnumber 051 * was reduced. 052 */ 053 private boolean stoppedByReducedThreadNumber; 054 055 /** Indicates whether this thread is currently waiting for work. */ 056 private boolean waitingForWork; 057 058 /** The operation that this worker thread is currently processing. */ 059 private Operation operation; 060 061 /** The handle to the actual thread for this worker thread. */ 062 private Thread workerThread; 063 064 /** The work queue that this worker thread will service. */ 065 private ParallelWorkQueue workQueue; 066 067 068 069 /** 070 * Creates a new worker thread that will service the provided work queue and 071 * process any new requests that are submitted. 072 * 073 * @param workQueue The work queue with which this worker thread is 074 * associated. 075 * @param threadID The thread ID for this worker thread. 076 */ 077 public ParallelWorkerThread(ParallelWorkQueue workQueue, int threadID) 078 { 079 super("Worker Thread " + threadID); 080 081 082 this.workQueue = workQueue; 083 084 stoppedByReducedThreadNumber = false; 085 shutdownRequested = false; 086 waitingForWork = false; 087 operation = null; 088 workerThread = null; 089 } 090 091 092 093 /** 094 * Indicates that this thread is about to be stopped because the Directory 095 * Server configuration has been updated to reduce the number of worker 096 * threads. 097 */ 098 public void setStoppedByReducedThreadNumber() 099 { 100 stoppedByReducedThreadNumber = true; 101 } 102 103 104 105 /** 106 * Indicates whether this worker thread is actively processing a request. 107 * Note that this is a point-in-time determination and if a reliable answer is 108 * expected then the server should impose some external constraint to ensure 109 * that no new requests are enqueued. 110 * 111 * @return {@code true} if this worker thread is actively processing a 112 * request, or {@code false} if it is idle. 113 */ 114 public boolean isActive() 115 { 116 return isAlive() && operation != null; 117 } 118 119 120 121 /** 122 * Operates in a loop, retrieving the next request from the work queue, 123 * processing it, and then going back to the queue for more. 124 */ 125 @Override 126 public void run() 127 { 128 workerThread = currentThread(); 129 130 while (! shutdownRequested) 131 { 132 try 133 { 134 waitingForWork = true; 135 operation = null; 136 operation = workQueue.nextOperation(this); 137 waitingForWork = false; 138 139 140 if (operation == null) 141 { 142 // The operation may be null if the server is shutting down. If that 143 // is the case, then break out of the while loop. 144 break; 145 } 146 else 147 { 148 // The operation is not null, so process it. Make sure that when 149 // processing is complete. 150 operation.run(); 151 operation.operationCompleted(); 152 } 153 } 154 catch (Throwable t) 155 { 156 if (logger.isTraceEnabled()) 157 { 158 logger.trace( 159 "Uncaught exception in worker thread while processing " + 160 "operation %s: %s", operation, t); 161 162 logger.traceException(t); 163 } 164 165 try 166 { 167 LocalizableMessage message = 168 ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(), operation, stackTraceToSingleLineString(t)); 169 logger.error(message); 170 171 operation.setResultCode(DirectoryServer.getServerErrorResultCode()); 172 operation.appendErrorMessage(message); 173 operation.getClientConnection().sendResponse(operation); 174 } 175 catch (Throwable t2) 176 { 177 if (logger.isTraceEnabled()) 178 { 179 logger.trace( 180 "Exception in worker thread while trying to log a " + 181 "message about an uncaught exception %s: %s", t, t2); 182 183 logger.traceException(t2); 184 } 185 } 186 187 188 try 189 { 190 LocalizableMessage message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get( 191 getName(), operation, stackTraceToSingleLineString(t)); 192 operation.disconnectClient(DisconnectReason.SERVER_ERROR, true, message); 193 } 194 catch (Throwable t2) 195 { 196 logger.traceException(t2); 197 } 198 } 199 } 200 201 // If we have gotten here, then we presume that the server thread is 202 // shutting down. However, if that's not the case then that is a problem 203 // and we will want to log a message. 204 if (stoppedByReducedThreadNumber) 205 { 206 logger.debug(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER, getName()); 207 } 208 else if (! workQueue.shutdownRequested()) 209 { 210 logger.warn(WARN_UNEXPECTED_WORKER_THREAD_EXIT, getName()); 211 } 212 213 214 if (logger.isTraceEnabled()) 215 { 216 logger.trace(getName() + " exiting."); 217 } 218 } 219 220 221 222 /** 223 * Indicates that the Directory Server has received a request to stop running 224 * and that this thread should stop running as soon as possible. 225 */ 226 public void shutDown() 227 { 228 if (logger.isTraceEnabled()) 229 { 230 logger.trace(getName() + " being signaled to shut down."); 231 } 232 233 // Set a flag that indicates that the thread should stop running. 234 shutdownRequested = true; 235 236 237 // Check to see if the thread is waiting for work. If so, then interrupt 238 // it. 239 if (waitingForWork) 240 { 241 try 242 { 243 workerThread.interrupt(); 244 } 245 catch (Exception e) 246 { 247 if (logger.isTraceEnabled()) 248 { 249 logger.trace( 250 "Caught an exception while trying to interrupt the worker " + 251 "thread waiting for work: %s", e); 252 logger.traceException(e); 253 } 254 } 255 } 256 else 257 { 258 try 259 { 260 CancelRequest cancelRequest = 261 new CancelRequest(true, INFO_CANCELED_BY_SHUTDOWN.get()); 262 operation.cancel(cancelRequest); 263 } 264 catch (Exception e) 265 { 266 if (logger.isTraceEnabled()) 267 { 268 logger.trace( 269 "Caught an exception while trying to abandon the " + 270 "operation in progress for the worker thread: %s", e); 271 logger.traceException(e); 272 } 273 } 274 } 275 } 276 277 /** 278 * Retrieves any relevent debug information with which this tread is 279 * associated so they can be included in debug messages. 280 * 281 * @return debug information about this thread as a string. 282 */ 283 @Override 284 public Map<String, String> getDebugProperties() 285 { 286 Map<String, String> properties = super.getDebugProperties(); 287 properties.put("clientConnection", 288 operation.getClientConnection().toString()); 289 properties.put("operation", operation.toString()); 290 291 return properties; 292 } 293} 294