001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2013-2016 ForgeRock AS. 016 */ 017package org.opends.server.extensions; 018 019import static org.opends.messages.ConfigMessages.*; 020import static org.opends.messages.CoreMessages.*; 021 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.ConcurrentLinkedQueue; 025import java.util.concurrent.Semaphore; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicLong; 028 029import org.forgerock.i18n.LocalizableMessage; 030import org.forgerock.i18n.slf4j.LocalizedLogger; 031import org.forgerock.opendj.config.server.ConfigException; 032import org.forgerock.opendj.ldap.ResultCode; 033import org.forgerock.opendj.config.server.ConfigurationChangeListener; 034import org.forgerock.opendj.server.config.server.ParallelWorkQueueCfg; 035import org.opends.server.api.WorkQueue; 036import org.opends.server.core.DirectoryServer; 037import org.opends.server.monitors.ParallelWorkQueueMonitor; 038import org.opends.server.types.CancelRequest; 039import org.forgerock.opendj.config.server.ConfigChangeResult; 040import org.opends.server.types.DirectoryException; 041import org.opends.server.types.InitializationException; 042import org.opends.server.types.Operation; 043 044/** 045 * This class defines a data structure for storing and interacting with the 046 * Directory Server work queue. 047 */ 048public class ParallelWorkQueue 049 extends WorkQueue<ParallelWorkQueueCfg> 050 implements ConfigurationChangeListener<ParallelWorkQueueCfg> 051{ 052 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 053 054 /** 055 * The maximum number of times to retry getting the next operation from the 056 * queue if an unexpected failure occurs. 057 */ 058 private static final int MAX_RETRY_COUNT = 5; 059 060 /** The set of worker threads that will be used to process this work queue. */ 061 private ArrayList<ParallelWorkerThread> workerThreads; 062 063 /** The number of operations that have been submitted to the work queue for processing. */ 064 private AtomicLong opsSubmitted; 065 066 /** 067 * Indicates whether one or more of the worker threads needs to be killed at 068 * the next convenient opportunity. 069 */ 070 private boolean killThreads; 071 072 /** Indicates whether the Directory Server is shutting down. */ 073 private boolean shutdownRequested; 074 075 /** The thread number used for the last worker thread that was created. */ 076 private int lastThreadNumber; 077 078 /** 079 * The number of worker threads that should be active (or will be shortly if a 080 * configuration change has not been completely applied). 081 */ 082 private int numWorkerThreads; 083 084 /** The queue that will be used to actually hold the pending operations. */ 085 private ConcurrentLinkedQueue<Operation> opQueue; 086 087 /** The lock used to provide threadsafe access for the queue. */ 088 private final Object queueLock = new Object(); 089 090 private final Semaphore queueSemaphore = new Semaphore(0, false); 091 092 /** 093 * Creates a new instance of this work queue. All initialization should be 094 * performed in the <CODE>initializeWorkQueue</CODE> method. 095 */ 096 public ParallelWorkQueue() 097 { 098 // No implementation should be performed here. 099 } 100 101 @Override 102 public void initializeWorkQueue(ParallelWorkQueueCfg configuration) 103 throws ConfigException, InitializationException 104 { 105 shutdownRequested = false; 106 killThreads = false; 107 opsSubmitted = new AtomicLong(0); 108 109 // Register to be notified of any configuration changes. 110 configuration.addParallelChangeListener(this); 111 112 // Get the necessary configuration from the provided entry. 113 numWorkerThreads = 114 computeNumWorkerThreads(configuration.getNumWorkerThreads()); 115 116 // Create the actual work queue. 117 opQueue = new ConcurrentLinkedQueue<>(); 118 119 // Create the set of worker threads that should be used to service the work queue. 120 workerThreads = new ArrayList<>(numWorkerThreads); 121 for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads; 122 lastThreadNumber++) 123 { 124 ParallelWorkerThread t = 125 new ParallelWorkerThread(this, lastThreadNumber); 126 t.start(); 127 workerThreads.add(t); 128 } 129 130 // Create and register a monitor provider for the work queue. 131 try 132 { 133 ParallelWorkQueueMonitor monitor = 134 new ParallelWorkQueueMonitor(this); 135 monitor.initializeMonitorProvider(null); 136 DirectoryServer.registerMonitorProvider(monitor); 137 } 138 catch (Exception e) 139 { 140 logger.traceException(e); 141 logger.error(ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR, ParallelWorkQueueMonitor.class, e); 142 } 143 } 144 145 @Override 146 public void finalizeWorkQueue(LocalizableMessage reason) 147 { 148 shutdownRequested = true; 149 150 // Send responses to any operations in the pending queue to indicate that 151 // they won't be processed because the server is shutting down. 152 CancelRequest cancelRequest = new CancelRequest(true, reason); 153 ArrayList<Operation> pendingOperations = new ArrayList<>(); 154 opQueue.removeAll(pendingOperations); 155 156 for (Operation o : pendingOperations) 157 { 158 try 159 { 160 // The operation has no chance of responding to the cancel 161 // request so avoid waiting for a cancel response. 162 if (o.getCancelResult() == null) { 163 o.abort(cancelRequest); 164 } 165 } 166 catch (Exception e) 167 { 168 logger.traceException(e); 169 logger.warn(WARN_QUEUE_UNABLE_TO_CANCEL, o, e); 170 } 171 } 172 173 // Notify all the worker threads of the shutdown. 174 for (ParallelWorkerThread t : workerThreads) 175 { 176 try 177 { 178 t.shutDown(); 179 } 180 catch (Exception e) 181 { 182 logger.traceException(e); 183 logger.warn(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD, t.getName(), e); 184 } 185 } 186 } 187 188 /** 189 * Indicates whether this work queue has received a request to shut down. 190 * 191 * @return <CODE>true</CODE> if the work queue has recieved a request to shut 192 * down, or <CODE>false</CODE> if not. 193 */ 194 public boolean shutdownRequested() 195 { 196 return shutdownRequested; 197 } 198 199 /** 200 * Submits an operation to be processed by one of the worker threads 201 * associated with this work queue. 202 * 203 * @param operation The operation to be processed. 204 * 205 * @throws DirectoryException If the provided operation is not accepted for 206 * some reason (e.g., if the server is shutting 207 * down or the pending operation queue is already 208 * at its maximum capacity). 209 */ 210 @Override 211 public void submitOperation(Operation operation) throws DirectoryException 212 { 213 if (shutdownRequested) 214 { 215 LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get(); 216 throw new DirectoryException(ResultCode.UNAVAILABLE, message); 217 } 218 219 opQueue.add(operation); 220 queueSemaphore.release(); 221 222 opsSubmitted.incrementAndGet(); 223 } 224 225 @Override 226 public boolean trySubmitOperation(Operation operation) 227 throws DirectoryException 228 { 229 submitOperation(operation); 230 return true; 231 } 232 233 /** 234 * Retrieves the next operation that should be processed by one of the worker 235 * threads, blocking if necessary until a new request arrives. This method 236 * should only be called by a worker thread associated with this work queue. 237 * 238 * @param workerThread The worker thread that is requesting the operation. 239 * 240 * @return The next operation that should be processed, or <CODE>null</CODE> 241 * if the server is shutting down and no more operations will be 242 * processed. 243 */ 244 public Operation nextOperation(ParallelWorkerThread workerThread) 245 { 246 return retryNextOperation(workerThread, 0); 247 } 248 249 /** 250 * Retrieves the next operation that should be processed by one of the worker 251 * threads following a previous failure attempt. A maximum of five 252 * consecutive failures will be allowed before returning <CODE>null</CODE>, 253 * which will cause the associated thread to exit. 254 * 255 * @param workerThread The worker thread that is requesting the operation. 256 * @param numFailures The number of consecutive failures that the worker 257 * thread has experienced so far. If this gets too 258 * high, then this method will return <CODE>null</CODE> 259 * rather than retrying. 260 * 261 * @return The next operation that should be processed, or <CODE>null</CODE> 262 * if the server is shutting down and no more operations will be 263 * processed, or if there have been too many consecutive failures. 264 */ 265 private Operation retryNextOperation( 266 ParallelWorkerThread workerThread, 267 int numFailures) 268 { 269 // See if we should kill off this thread. This could be necessary if the 270 // number of worker threads has been decreased with the server online. If 271 // so, then return null and the thread will exit. 272 if (killThreads) 273 { 274 synchronized (queueLock) 275 { 276 try 277 { 278 int currentThreads = workerThreads.size(); 279 if (currentThreads > numWorkerThreads) 280 { 281 if (workerThreads.remove(Thread.currentThread())) 282 { 283 currentThreads--; 284 } 285 286 if (currentThreads <= numWorkerThreads) 287 { 288 killThreads = false; 289 } 290 291 workerThread.setStoppedByReducedThreadNumber(); 292 return null; 293 } 294 } 295 catch (Exception e) 296 { 297 logger.traceException(e); 298 } 299 } 300 } 301 302 if (shutdownRequested || numFailures > MAX_RETRY_COUNT) 303 { 304 if (numFailures > MAX_RETRY_COUNT) 305 { 306 logger.error(ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES, Thread 307 .currentThread().getName(), numFailures, MAX_RETRY_COUNT); 308 } 309 310 return null; 311 } 312 313 try 314 { 315 while (true) 316 { 317 Operation nextOperation = null; 318 if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) { 319 nextOperation = opQueue.poll(); 320 } 321 if (nextOperation == null) 322 { 323 // There was no work to do in the specified length of time. See if 324 // we should shutdown, and if not then just check again. 325 if (shutdownRequested) 326 { 327 return null; 328 } 329 else if (killThreads) 330 { 331 synchronized (queueLock) 332 { 333 try 334 { 335 int currentThreads = workerThreads.size(); 336 if (currentThreads > numWorkerThreads) 337 { 338 if (workerThreads.remove(Thread.currentThread())) 339 { 340 currentThreads--; 341 } 342 343 if (currentThreads <= numWorkerThreads) 344 { 345 killThreads = false; 346 } 347 348 workerThread.setStoppedByReducedThreadNumber(); 349 return null; 350 } 351 } 352 catch (Exception e) 353 { 354 logger.traceException(e); 355 } 356 } 357 } 358 } 359 else 360 { 361 return nextOperation; 362 } 363 } 364 } 365 catch (Exception e) 366 { 367 logger.traceException(e); 368 369 // This should not happen. The only recourse we have is to log a message 370 // and try again. 371 logger.warn(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION, Thread.currentThread().getName(), e); 372 return retryNextOperation(workerThread, numFailures + 1); 373 } 374 } 375 376 /** 377 * Attempts to remove the specified operation from this queue if it has not 378 * yet been picked up for processing by one of the worker threads. 379 * 380 * @param operation The operation to remove from the queue. 381 * 382 * @return <CODE>true</CODE> if the provided request was present in the queue 383 * and was removed successfully, or <CODE>false</CODE> it not. 384 */ 385 public boolean removeOperation(Operation operation) 386 { 387 return opQueue.remove(operation); 388 } 389 390 /** 391 * Retrieves the total number of operations that have been successfully 392 * submitted to this work queue for processing since server startup. This 393 * does not include operations that have been rejected for some reason like 394 * the queue already at its maximum capacity. 395 * 396 * @return The total number of operations that have been successfully 397 * submitted to this work queue since startup. 398 */ 399 public long getOpsSubmitted() 400 { 401 return opsSubmitted.longValue(); 402 } 403 404 /** 405 * Retrieves the number of pending operations in the queue that have not yet 406 * been picked up for processing. Note that this method is not a 407 * constant-time operation and can be relatively inefficient, so it should be 408 * used sparingly. 409 * 410 * @return The number of pending operations in the queue that have not yet 411 * been picked up for processing. 412 */ 413 public int size() 414 { 415 return opQueue.size(); 416 } 417 418 @Override 419 public boolean isConfigurationChangeAcceptable( 420 ParallelWorkQueueCfg configuration, 421 List<LocalizableMessage> unacceptableReasons) 422 { 423 return true; 424 } 425 426 @Override 427 public ConfigChangeResult applyConfigurationChange( 428 ParallelWorkQueueCfg configuration) 429 { 430 int newNumThreads = 431 computeNumWorkerThreads(configuration.getNumWorkerThreads()); 432 433 // Apply a change to the number of worker threads if appropriate. 434 int currentThreads = workerThreads.size(); 435 if (newNumThreads != currentThreads) 436 { 437 synchronized (queueLock) 438 { 439 try 440 { 441 int threadsToAdd = newNumThreads - currentThreads; 442 if (threadsToAdd > 0) 443 { 444 for (int i = 0; i < threadsToAdd; i++) 445 { 446 ParallelWorkerThread t = 447 new ParallelWorkerThread(this, lastThreadNumber++); 448 workerThreads.add(t); 449 t.start(); 450 } 451 452 killThreads = false; 453 } 454 else 455 { 456 killThreads = true; 457 } 458 459 numWorkerThreads = newNumThreads; 460 } 461 catch (Exception e) 462 { 463 logger.traceException(e); 464 } 465 } 466 } 467 return new ConfigChangeResult(); 468 } 469 470 @Override 471 public boolean isIdle() 472 { 473 if (!opQueue.isEmpty()) { 474 return false; 475 } 476 477 synchronized (queueLock) 478 { 479 for (ParallelWorkerThread t : workerThreads) 480 { 481 if (t.isActive()) 482 { 483 return false; 484 } 485 } 486 487 return true; 488 } 489 } 490 491 /** 492 * Return the number of worker threads used by this WorkQueue. 493 * 494 * @return the number of worker threads used by this WorkQueue 495 */ 496 @Override 497 public int getNumWorkerThreads() 498 { 499 return this.numWorkerThreads; 500 } 501}