001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2013-2016 ForgeRock AS. 016 */ 017package org.opends.server.extensions; 018 019import static org.opends.messages.ConfigMessages.*; 020import static org.opends.messages.CoreMessages.*; 021 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicLong; 027import java.util.concurrent.locks.ReentrantReadWriteLock; 028import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; 029import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; 030 031import org.forgerock.i18n.LocalizableMessage; 032import org.forgerock.i18n.slf4j.LocalizedLogger; 033import org.forgerock.opendj.config.server.ConfigException; 034import org.forgerock.opendj.ldap.ResultCode; 035import org.forgerock.opendj.config.server.ConfigurationChangeListener; 036import org.forgerock.opendj.server.config.server.TraditionalWorkQueueCfg; 037import org.opends.server.api.WorkQueue; 038import org.opends.server.core.DirectoryServer; 039import org.opends.server.monitors.TraditionalWorkQueueMonitor; 040import org.opends.server.types.CancelRequest; 041import org.forgerock.opendj.config.server.ConfigChangeResult; 042import org.opends.server.types.DirectoryException; 043import org.opends.server.types.InitializationException; 044import org.opends.server.types.Operation; 045 046/** 047 * This class defines a data structure for storing and interacting with the 048 * Directory Server work queue. 049 */ 050public class TraditionalWorkQueue extends WorkQueue<TraditionalWorkQueueCfg> 051 implements ConfigurationChangeListener<TraditionalWorkQueueCfg> 052{ 053 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 054 055 /** 056 * The maximum number of times to retry getting the next operation from the 057 * queue if an unexpected failure occurs. 058 */ 059 private static final int MAX_RETRY_COUNT = 5; 060 061 /** The set of worker threads that will be used to process this work queue. */ 062 private final ArrayList<TraditionalWorkerThread> workerThreads = new ArrayList<>(); 063 064 /** The number of operations that have been submitted to the work queue for processing. */ 065 private AtomicLong opsSubmitted; 066 067 /** 068 * The number of times that an attempt to submit a new request has been 069 * rejected because the work queue is already at its maximum capacity. 070 */ 071 private AtomicLong queueFullRejects; 072 073 /** 074 * Indicates whether one or more of the worker threads needs to be killed at 075 * the next convenient opportunity. 076 */ 077 private boolean killThreads; 078 079 /** Indicates whether the Directory Server is shutting down. */ 080 private boolean shutdownRequested; 081 082 /** The thread number used for the last worker thread that was created. */ 083 private int lastThreadNumber; 084 085 /** 086 * The maximum number of pending requests that this work queue will allow 087 * before it will start rejecting them. 088 */ 089 private int maxCapacity; 090 091 /** 092 * The number of worker threads that should be active (or will be shortly if a 093 * configuration change has not been completely applied). 094 */ 095 private int numWorkerThreads; 096 097 /** 098 * The queue overflow policy: true indicates that operations will be blocked 099 * until the queue has available capacity, otherwise operations will be 100 * rejected. 101 * <p> 102 * This is hard-coded to true for now because a reject on full policy does not 103 * seem to have a valid use case. 104 * </p> 105 */ 106 private final boolean isBlocking = true; 107 108 /** The queue that will be used to actually hold the pending operations. */ 109 private LinkedBlockingQueue<Operation> opQueue; 110 111 /** The lock used to provide threadsafe access for the queue, used for non-config changes. */ 112 private final ReadLock queueReadLock; 113 114 /** The lock used to provide threadsafe access for the queue, used for config changes. */ 115 private final WriteLock queueWriteLock; 116 { 117 ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 118 queueReadLock = lock.readLock(); 119 queueWriteLock = lock.writeLock(); 120 } 121 122 /** 123 * Creates a new instance of this work queue. All initialization should be 124 * performed in the <CODE>initializeWorkQueue</CODE> method. 125 */ 126 public TraditionalWorkQueue() 127 { 128 // No implementation should be performed here. 129 } 130 131 @Override 132 public void initializeWorkQueue(TraditionalWorkQueueCfg configuration) 133 throws ConfigException, InitializationException 134 { 135 queueWriteLock.lock(); 136 try 137 { 138 shutdownRequested = false; 139 killThreads = false; 140 opsSubmitted = new AtomicLong(0); 141 queueFullRejects = new AtomicLong(0); 142 143 // Register to be notified of any configuration changes. 144 configuration.addTraditionalChangeListener(this); 145 146 // Get the necessary configuration from the provided entry. 147 numWorkerThreads = 148 computeNumWorkerThreads(configuration.getNumWorkerThreads()); 149 maxCapacity = configuration.getMaxWorkQueueCapacity(); 150 151 // Create the actual work queue. 152 if (maxCapacity > 0) 153 { 154 opQueue = new LinkedBlockingQueue<>(maxCapacity); 155 } 156 else 157 { 158 // This will never be the case, since the configuration definition 159 // ensures that the capacity is always finite. 160 opQueue = new LinkedBlockingQueue<>(); 161 } 162 163 // Create the set of worker threads that should be used to service the 164 // work queue. 165 for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads; 166 lastThreadNumber++) 167 { 168 TraditionalWorkerThread t = new TraditionalWorkerThread(this, 169 lastThreadNumber); 170 t.start(); 171 workerThreads.add(t); 172 } 173 174 // Create and register a monitor provider for the work queue. 175 try 176 { 177 TraditionalWorkQueueMonitor monitor = new TraditionalWorkQueueMonitor( 178 this); 179 monitor.initializeMonitorProvider(null); 180 DirectoryServer.registerMonitorProvider(monitor); 181 } 182 catch (Exception e) 183 { 184 logger.traceException(e); 185 logger.error(ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR, TraditionalWorkQueueMonitor.class, e); 186 } 187 } 188 finally 189 { 190 queueWriteLock.unlock(); 191 } 192 } 193 194 @Override 195 public void finalizeWorkQueue(LocalizableMessage reason) 196 { 197 queueWriteLock.lock(); 198 try 199 { 200 shutdownRequested = true; 201 } 202 finally 203 { 204 queueWriteLock.unlock(); 205 } 206 207 // From now on no more operations can be enqueued or dequeued. 208 209 // Send responses to any operations in the pending queue to indicate that 210 // they won't be processed because the server is shutting down. 211 CancelRequest cancelRequest = new CancelRequest(true, reason); 212 ArrayList<Operation> pendingOperations = new ArrayList<>(); 213 opQueue.removeAll(pendingOperations); 214 for (Operation o : pendingOperations) 215 { 216 try 217 { 218 // The operation has no chance of responding to the cancel 219 // request so avoid waiting for a cancel response. 220 if (o.getCancelResult() == null) 221 { 222 o.abort(cancelRequest); 223 } 224 } 225 catch (Exception e) 226 { 227 logger.traceException(e); 228 logger.warn(WARN_QUEUE_UNABLE_TO_CANCEL, o, e); 229 } 230 } 231 232 // Notify all the worker threads of the shutdown. 233 for (TraditionalWorkerThread t : workerThreads) 234 { 235 try 236 { 237 t.shutDown(); 238 } 239 catch (Exception e) 240 { 241 logger.traceException(e); 242 logger.warn(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD, t.getName(), e); 243 } 244 } 245 } 246 247 /** 248 * Indicates whether this work queue has received a request to shut down. 249 * 250 * @return <CODE>true</CODE> if the work queue has recieved a request to shut 251 * down, or <CODE>false</CODE> if not. 252 */ 253 public boolean shutdownRequested() 254 { 255 queueReadLock.lock(); 256 try 257 { 258 return shutdownRequested; 259 } 260 finally 261 { 262 queueReadLock.unlock(); 263 } 264 } 265 266 /** 267 * Submits an operation to be processed by one of the worker threads 268 * associated with this work queue. 269 * 270 * @param operation 271 * The operation to be processed. 272 * @throws DirectoryException 273 * If the provided operation is not accepted for some reason (e.g., 274 * if the server is shutting down or the pending operation queue is 275 * already at its maximum capacity). 276 */ 277 @Override 278 public void submitOperation(Operation operation) throws DirectoryException 279 { 280 submitOperation(operation, isBlocking); 281 } 282 283 @Override 284 public boolean trySubmitOperation(Operation operation) 285 throws DirectoryException 286 { 287 try 288 { 289 submitOperation(operation, false); 290 return true; 291 } 292 catch (DirectoryException e) 293 { 294 if (ResultCode.BUSY == e.getResultCode()) 295 { 296 return false; 297 } 298 throw e; 299 } 300 } 301 302 private void submitOperation(Operation operation, 303 boolean blockEnqueuingWhenFull) throws DirectoryException 304 { 305 queueReadLock.lock(); 306 try 307 { 308 if (shutdownRequested) 309 { 310 LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get(); 311 throw new DirectoryException(ResultCode.UNAVAILABLE, message); 312 } 313 314 if (blockEnqueuingWhenFull) 315 { 316 try 317 { 318 // If the queue is full and there is an administrative change taking 319 // place then starvation could arise: this thread will hold the read 320 // lock, the admin thread will be waiting on the write lock, and the 321 // worker threads may be queued behind the admin thread. Since the 322 // worker threads cannot run, the queue will never empty and allow 323 // this thread to proceed. To help things out we can periodically 324 // yield the read lock when the queue is full. 325 while (!opQueue.offer(operation, 1, TimeUnit.SECONDS)) 326 { 327 queueReadLock.unlock(); 328 Thread.yield(); 329 queueReadLock.lock(); 330 331 if (shutdownRequested) 332 { 333 LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get(); 334 throw new DirectoryException(ResultCode.UNAVAILABLE, message); 335 } 336 } 337 } 338 catch (InterruptedException e) 339 { 340 // We cannot handle the interruption here. Reject the request and 341 // re-interrupt this thread. 342 Thread.currentThread().interrupt(); 343 344 queueFullRejects.incrementAndGet(); 345 346 LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get(); 347 throw new DirectoryException(ResultCode.BUSY, message); 348 } 349 } 350 else 351 { 352 if (!opQueue.offer(operation)) 353 { 354 queueFullRejects.incrementAndGet(); 355 356 LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity); 357 throw new DirectoryException(ResultCode.BUSY, message); 358 } 359 } 360 361 opsSubmitted.incrementAndGet(); 362 } 363 finally 364 { 365 queueReadLock.unlock(); 366 } 367 } 368 369 /** 370 * Retrieves the next operation that should be processed by one of the worker 371 * threads, blocking if necessary until a new request arrives. This method 372 * should only be called by a worker thread associated with this work queue. 373 * 374 * @param workerThread 375 * The worker thread that is requesting the operation. 376 * @return The next operation that should be processed, or <CODE>null</CODE> 377 * if the server is shutting down and no more operations will be 378 * processed. 379 */ 380 public Operation nextOperation(TraditionalWorkerThread workerThread) 381 { 382 return retryNextOperation(workerThread, 0); 383 } 384 385 /** 386 * Retrieves the next operation that should be processed by one of the worker 387 * threads following a previous failure attempt. A maximum of five consecutive 388 * failures will be allowed before returning <CODE>null</CODE>, which will 389 * cause the associated thread to exit. 390 * 391 * @param workerThread 392 * The worker thread that is requesting the operation. 393 * @param numFailures 394 * The number of consecutive failures that the worker thread has 395 * experienced so far. If this gets too high, then this method will 396 * return <CODE>null</CODE> rather than retrying. 397 * @return The next operation that should be processed, or <CODE>null</CODE> 398 * if the server is shutting down and no more operations will be 399 * processed, or if there have been too many consecutive failures. 400 */ 401 private Operation retryNextOperation(TraditionalWorkerThread workerThread, 402 int numFailures) 403 { 404 // See if we should kill off this thread. This could be necessary if the 405 // number of worker threads has been decreased with the server online. If 406 // so, then return null and the thread will exit. 407 queueReadLock.lock(); 408 try 409 { 410 if (shutdownRequested) 411 { 412 return null; 413 } 414 415 if (killThreads && tryKillThisWorkerThread(workerThread)) 416 { 417 return null; 418 } 419 420 if (numFailures > MAX_RETRY_COUNT) 421 { 422 logger.error(ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES, Thread 423 .currentThread().getName(), numFailures, MAX_RETRY_COUNT); 424 425 return null; 426 } 427 428 while (true) 429 { 430 Operation nextOperation = opQueue.poll(5, TimeUnit.SECONDS); 431 if (nextOperation != null) 432 { 433 return nextOperation; 434 } 435 436 // There was no work to do in the specified length of time. Release the 437 // read lock allowing shutdown or config changes to proceed and then see 438 // if we should give up or check again. 439 queueReadLock.unlock(); 440 Thread.yield(); 441 queueReadLock.lock(); 442 443 if (shutdownRequested) 444 { 445 return null; 446 } 447 448 if (killThreads && tryKillThisWorkerThread(workerThread)) 449 { 450 return null; 451 } 452 } 453 } 454 catch (InterruptedException ie) 455 { 456 // This is somewhat expected so don't log. 457 // assert debugException(CLASS_NAME, "retryNextOperation", ie); 458 459 // If this occurs, then the worker thread must have been interrupted for 460 // some reason. This could be because the Directory Server is shutting 461 // down, in which case we should return null. 462 if (shutdownRequested) 463 { 464 return null; 465 } 466 467 // If we've gotten here, then the worker thread was interrupted for some 468 // other reason. This should not happen, and we need to log a message. 469 logger.warn(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN, Thread.currentThread().getName(), ie); 470 } 471 catch (Exception e) 472 { 473 logger.traceException(e); 474 475 // This should not happen. The only recourse we have is to log a message 476 // and try again. 477 logger.warn(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION, Thread.currentThread().getName(), e); 478 } 479 finally 480 { 481 queueReadLock.unlock(); 482 } 483 484 // An exception has occurred - retry. 485 return retryNextOperation(workerThread, numFailures + 1); 486 } 487 488 /** 489 * Kills this worker thread if needed. This method assumes that the read lock 490 * is already taken and ensure that it is taken on exit. 491 * 492 * @param workerThread 493 * The worker thread associated with this thread. 494 * @return {@code true} if this thread was killed or is about to be killed as 495 * a result of shutdown. 496 */ 497 private boolean tryKillThisWorkerThread(TraditionalWorkerThread workerThread) 498 { 499 queueReadLock.unlock(); 500 queueWriteLock.lock(); 501 try 502 { 503 if (shutdownRequested) 504 { 505 // Shutdown may have been requested between unlock/lock. This thread is 506 // about to shutdown anyway, so return true. 507 return true; 508 } 509 510 int currentThreads = workerThreads.size(); 511 if (currentThreads > numWorkerThreads) 512 { 513 if (workerThreads.remove(Thread.currentThread())) 514 { 515 currentThreads--; 516 } 517 518 if (currentThreads <= numWorkerThreads) 519 { 520 killThreads = false; 521 } 522 523 workerThread.setStoppedByReducedThreadNumber(); 524 return true; 525 } 526 } 527 finally 528 { 529 queueWriteLock.unlock(); 530 queueReadLock.lock(); 531 532 if (shutdownRequested) 533 { 534 // Shutdown may have been requested between unlock/lock. This thread is 535 // about to shutdown anyway, so return true. 536 return true; 537 } 538 } 539 return false; 540 } 541 542 /** 543 * Retrieves the total number of operations that have been successfully 544 * submitted to this work queue for processing since server startup. This does 545 * not include operations that have been rejected for some reason like the 546 * queue already at its maximum capacity. 547 * 548 * @return The total number of operations that have been successfully 549 * submitted to this work queue since startup. 550 */ 551 public long getOpsSubmitted() 552 { 553 return opsSubmitted.longValue(); 554 } 555 556 /** 557 * Retrieves the total number of operations that have been rejected because 558 * the work queue was already at its maximum capacity. 559 * 560 * @return The total number of operations that have been rejected because the 561 * work queue was already at its maximum capacity. 562 */ 563 public long getOpsRejectedDueToQueueFull() 564 { 565 return queueFullRejects.longValue(); 566 } 567 568 /** 569 * Retrieves the number of pending operations in the queue that have not yet 570 * been picked up for processing. Note that this method is not a constant-time 571 * operation and can be relatively inefficient, so it should be used 572 * sparingly. 573 * 574 * @return The number of pending operations in the queue that have not yet 575 * been picked up for processing. 576 */ 577 public int size() 578 { 579 queueReadLock.lock(); 580 try 581 { 582 return opQueue.size(); 583 } 584 finally 585 { 586 queueReadLock.unlock(); 587 } 588 } 589 590 @Override 591 public boolean isConfigurationChangeAcceptable( 592 TraditionalWorkQueueCfg configuration, List<LocalizableMessage> unacceptableReasons) 593 { 594 return true; 595 } 596 597 @Override 598 public ConfigChangeResult applyConfigurationChange( 599 TraditionalWorkQueueCfg configuration) 600 { 601 int newNumThreads = 602 computeNumWorkerThreads(configuration.getNumWorkerThreads()); 603 int newMaxCapacity = configuration.getMaxWorkQueueCapacity(); 604 605 // Apply a change to the number of worker threads if appropriate. 606 int currentThreads = workerThreads.size(); 607 if (newNumThreads != currentThreads) 608 { 609 queueWriteLock.lock(); 610 try 611 { 612 int threadsToAdd = newNumThreads - currentThreads; 613 if (threadsToAdd > 0) 614 { 615 for (int i = 0; i < threadsToAdd; i++) 616 { 617 TraditionalWorkerThread t = new TraditionalWorkerThread(this, 618 lastThreadNumber++); 619 workerThreads.add(t); 620 t.start(); 621 } 622 623 killThreads = false; 624 } 625 else 626 { 627 killThreads = true; 628 } 629 630 numWorkerThreads = newNumThreads; 631 } 632 catch (Exception e) 633 { 634 logger.traceException(e); 635 } 636 finally 637 { 638 queueWriteLock.unlock(); 639 } 640 } 641 642 // Apply a change to the maximum capacity if appropriate. Since we can't 643 // change capacity on the fly, then we'll have to create a new queue and 644 // transfer any remaining items into it. Any thread that is waiting on the 645 // original queue will time out after at most a few seconds and further 646 // checks will be against the new queue. 647 if (newMaxCapacity != maxCapacity) 648 { 649 // First switch the queue with the exclusive lock. 650 queueWriteLock.lock(); 651 LinkedBlockingQueue<Operation> oldOpQueue; 652 try 653 { 654 LinkedBlockingQueue<Operation> newOpQueue = null; 655 if (newMaxCapacity > 0) 656 { 657 newOpQueue = new LinkedBlockingQueue<>(newMaxCapacity); 658 } 659 else 660 { 661 newOpQueue = new LinkedBlockingQueue<>(); 662 } 663 664 oldOpQueue = opQueue; 665 opQueue = newOpQueue; 666 667 maxCapacity = newMaxCapacity; 668 } 669 finally 670 { 671 queueWriteLock.unlock(); 672 } 673 674 // Now resubmit any pending requests - we'll need the shared lock. 675 Operation pendingOperation = null; 676 queueReadLock.lock(); 677 try 678 { 679 // We have to be careful when adding any existing pending operations 680 // because the new capacity could be less than what was already 681 // backlogged in the previous queue. If that happens, we may have to 682 // loop a few times to get everything in there. 683 while ((pendingOperation = oldOpQueue.poll()) != null) 684 { 685 opQueue.put(pendingOperation); 686 } 687 } 688 catch (InterruptedException e) 689 { 690 // We cannot handle the interruption here. Cancel pending requests and 691 // re-interrupt this thread. 692 Thread.currentThread().interrupt(); 693 694 LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get(); 695 CancelRequest cancelRequest = new CancelRequest(true, message); 696 if (pendingOperation != null) 697 { 698 pendingOperation.abort(cancelRequest); 699 } 700 while ((pendingOperation = oldOpQueue.poll()) != null) 701 { 702 pendingOperation.abort(cancelRequest); 703 } 704 } 705 finally 706 { 707 queueReadLock.unlock(); 708 } 709 } 710 711 return new ConfigChangeResult(); 712 } 713 714 @Override 715 public boolean isIdle() 716 { 717 queueReadLock.lock(); 718 try 719 { 720 if (!opQueue.isEmpty()) 721 { 722 return false; 723 } 724 725 for (TraditionalWorkerThread t : workerThreads) 726 { 727 if (t.isActive()) 728 { 729 return false; 730 } 731 } 732 733 return true; 734 } 735 finally 736 { 737 queueReadLock.unlock(); 738 } 739 } 740 741 /** 742 * Return the number of worker threads used by this WorkQueue. 743 * 744 * @return the number of worker threads used by this WorkQueue 745 */ 746 @Override 747 public int getNumWorkerThreads() 748 { 749 return this.numWorkerThreads; 750 } 751}