001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2013-2016 ForgeRock AS.
016 */
017package org.opends.server.extensions;
018
019import static org.opends.messages.ConfigMessages.*;
020import static org.opends.messages.CoreMessages.*;
021
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicLong;
027import java.util.concurrent.locks.ReentrantReadWriteLock;
028import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
029import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
030
031import org.forgerock.i18n.LocalizableMessage;
032import org.forgerock.i18n.slf4j.LocalizedLogger;
033import org.forgerock.opendj.config.server.ConfigException;
034import org.forgerock.opendj.ldap.ResultCode;
035import org.forgerock.opendj.config.server.ConfigurationChangeListener;
036import org.forgerock.opendj.server.config.server.TraditionalWorkQueueCfg;
037import org.opends.server.api.WorkQueue;
038import org.opends.server.core.DirectoryServer;
039import org.opends.server.monitors.TraditionalWorkQueueMonitor;
040import org.opends.server.types.CancelRequest;
041import org.forgerock.opendj.config.server.ConfigChangeResult;
042import org.opends.server.types.DirectoryException;
043import org.opends.server.types.InitializationException;
044import org.opends.server.types.Operation;
045
046/**
047 * This class defines a data structure for storing and interacting with the
048 * Directory Server work queue.
049 */
050public class TraditionalWorkQueue extends WorkQueue<TraditionalWorkQueueCfg>
051    implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
052{
053  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
054
055  /**
056   * The maximum number of times to retry getting the next operation from the
057   * queue if an unexpected failure occurs.
058   */
059  private static final int MAX_RETRY_COUNT = 5;
060
061  /** The set of worker threads that will be used to process this work queue. */
062  private final ArrayList<TraditionalWorkerThread> workerThreads = new ArrayList<>();
063
064  /** The number of operations that have been submitted to the work queue for processing. */
065  private AtomicLong opsSubmitted;
066
067  /**
068   * The number of times that an attempt to submit a new request has been
069   * rejected because the work queue is already at its maximum capacity.
070   */
071  private AtomicLong queueFullRejects;
072
073  /**
074   * Indicates whether one or more of the worker threads needs to be killed at
075   * the next convenient opportunity.
076   */
077  private boolean killThreads;
078
079  /** Indicates whether the Directory Server is shutting down. */
080  private boolean shutdownRequested;
081
082  /** The thread number used for the last worker thread that was created. */
083  private int lastThreadNumber;
084
085  /**
086   * The maximum number of pending requests that this work queue will allow
087   * before it will start rejecting them.
088   */
089  private int maxCapacity;
090
091  /**
092   * The number of worker threads that should be active (or will be shortly if a
093   * configuration change has not been completely applied).
094   */
095  private int numWorkerThreads;
096
097  /**
098   * The queue overflow policy: true indicates that operations will be blocked
099   * until the queue has available capacity, otherwise operations will be
100   * rejected.
101   * <p>
102   * This is hard-coded to true for now because a reject on full policy does not
103   * seem to have a valid use case.
104   * </p>
105   */
106  private final boolean isBlocking = true;
107
108  /** The queue that will be used to actually hold the pending operations. */
109  private LinkedBlockingQueue<Operation> opQueue;
110
111  /** The lock used to provide threadsafe access for the queue, used for non-config changes. */
112  private final ReadLock queueReadLock;
113
114  /** The lock used to provide threadsafe access for the queue, used for config changes. */
115  private final WriteLock queueWriteLock;
116  {
117    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
118    queueReadLock = lock.readLock();
119    queueWriteLock = lock.writeLock();
120  }
121
122  /**
123   * Creates a new instance of this work queue. All initialization should be
124   * performed in the <CODE>initializeWorkQueue</CODE> method.
125   */
126  public TraditionalWorkQueue()
127  {
128    // No implementation should be performed here.
129  }
130
131  @Override
132  public void initializeWorkQueue(TraditionalWorkQueueCfg configuration)
133      throws ConfigException, InitializationException
134  {
135    queueWriteLock.lock();
136    try
137    {
138      shutdownRequested = false;
139      killThreads = false;
140      opsSubmitted = new AtomicLong(0);
141      queueFullRejects = new AtomicLong(0);
142
143      // Register to be notified of any configuration changes.
144      configuration.addTraditionalChangeListener(this);
145
146      // Get the necessary configuration from the provided entry.
147      numWorkerThreads =
148          computeNumWorkerThreads(configuration.getNumWorkerThreads());
149      maxCapacity = configuration.getMaxWorkQueueCapacity();
150
151      // Create the actual work queue.
152      if (maxCapacity > 0)
153      {
154        opQueue = new LinkedBlockingQueue<>(maxCapacity);
155      }
156      else
157      {
158        // This will never be the case, since the configuration definition
159        // ensures that the capacity is always finite.
160        opQueue = new LinkedBlockingQueue<>();
161      }
162
163      // Create the set of worker threads that should be used to service the
164      // work queue.
165      for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
166        lastThreadNumber++)
167      {
168        TraditionalWorkerThread t = new TraditionalWorkerThread(this,
169            lastThreadNumber);
170        t.start();
171        workerThreads.add(t);
172      }
173
174      // Create and register a monitor provider for the work queue.
175      try
176      {
177        TraditionalWorkQueueMonitor monitor = new TraditionalWorkQueueMonitor(
178            this);
179        monitor.initializeMonitorProvider(null);
180        DirectoryServer.registerMonitorProvider(monitor);
181      }
182      catch (Exception e)
183      {
184        logger.traceException(e);
185        logger.error(ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR, TraditionalWorkQueueMonitor.class, e);
186      }
187    }
188    finally
189    {
190      queueWriteLock.unlock();
191    }
192  }
193
194  @Override
195  public void finalizeWorkQueue(LocalizableMessage reason)
196  {
197    queueWriteLock.lock();
198    try
199    {
200      shutdownRequested = true;
201    }
202    finally
203    {
204      queueWriteLock.unlock();
205    }
206
207    // From now on no more operations can be enqueued or dequeued.
208
209    // Send responses to any operations in the pending queue to indicate that
210    // they won't be processed because the server is shutting down.
211    CancelRequest cancelRequest = new CancelRequest(true, reason);
212    ArrayList<Operation> pendingOperations = new ArrayList<>();
213    opQueue.removeAll(pendingOperations);
214    for (Operation o : pendingOperations)
215    {
216      try
217      {
218        // The operation has no chance of responding to the cancel
219        // request so avoid waiting for a cancel response.
220        if (o.getCancelResult() == null)
221        {
222          o.abort(cancelRequest);
223        }
224      }
225      catch (Exception e)
226      {
227        logger.traceException(e);
228        logger.warn(WARN_QUEUE_UNABLE_TO_CANCEL, o, e);
229      }
230    }
231
232    // Notify all the worker threads of the shutdown.
233    for (TraditionalWorkerThread t : workerThreads)
234    {
235      try
236      {
237        t.shutDown();
238      }
239      catch (Exception e)
240      {
241        logger.traceException(e);
242        logger.warn(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD, t.getName(), e);
243      }
244    }
245  }
246
247  /**
248   * Indicates whether this work queue has received a request to shut down.
249   *
250   * @return <CODE>true</CODE> if the work queue has recieved a request to shut
251   *         down, or <CODE>false</CODE> if not.
252   */
253  public boolean shutdownRequested()
254  {
255    queueReadLock.lock();
256    try
257    {
258      return shutdownRequested;
259    }
260    finally
261    {
262      queueReadLock.unlock();
263    }
264  }
265
266  /**
267   * Submits an operation to be processed by one of the worker threads
268   * associated with this work queue.
269   *
270   * @param operation
271   *          The operation to be processed.
272   * @throws DirectoryException
273   *           If the provided operation is not accepted for some reason (e.g.,
274   *           if the server is shutting down or the pending operation queue is
275   *           already at its maximum capacity).
276   */
277  @Override
278  public void submitOperation(Operation operation) throws DirectoryException
279  {
280    submitOperation(operation, isBlocking);
281  }
282
283  @Override
284  public boolean trySubmitOperation(Operation operation)
285      throws DirectoryException
286  {
287    try
288    {
289      submitOperation(operation, false);
290      return true;
291    }
292    catch (DirectoryException e)
293    {
294      if (ResultCode.BUSY == e.getResultCode())
295      {
296        return false;
297      }
298      throw e;
299    }
300  }
301
302  private void submitOperation(Operation operation,
303      boolean blockEnqueuingWhenFull) throws DirectoryException
304  {
305    queueReadLock.lock();
306    try
307    {
308      if (shutdownRequested)
309      {
310        LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
311        throw new DirectoryException(ResultCode.UNAVAILABLE, message);
312      }
313
314      if (blockEnqueuingWhenFull)
315      {
316        try
317        {
318          // If the queue is full and there is an administrative change taking
319          // place then starvation could arise: this thread will hold the read
320          // lock, the admin thread will be waiting on the write lock, and the
321          // worker threads may be queued behind the admin thread. Since the
322          // worker threads cannot run, the queue will never empty and allow
323          // this thread to proceed. To help things out we can periodically
324          // yield the read lock when the queue is full.
325          while (!opQueue.offer(operation, 1, TimeUnit.SECONDS))
326          {
327            queueReadLock.unlock();
328            Thread.yield();
329            queueReadLock.lock();
330
331            if (shutdownRequested)
332            {
333              LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
334              throw new DirectoryException(ResultCode.UNAVAILABLE, message);
335            }
336          }
337        }
338        catch (InterruptedException e)
339        {
340          // We cannot handle the interruption here. Reject the request and
341          // re-interrupt this thread.
342          Thread.currentThread().interrupt();
343
344          queueFullRejects.incrementAndGet();
345
346          LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
347          throw new DirectoryException(ResultCode.BUSY, message);
348        }
349      }
350      else
351      {
352        if (!opQueue.offer(operation))
353        {
354          queueFullRejects.incrementAndGet();
355
356          LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
357          throw new DirectoryException(ResultCode.BUSY, message);
358        }
359      }
360
361      opsSubmitted.incrementAndGet();
362    }
363    finally
364    {
365      queueReadLock.unlock();
366    }
367  }
368
369  /**
370   * Retrieves the next operation that should be processed by one of the worker
371   * threads, blocking if necessary until a new request arrives. This method
372   * should only be called by a worker thread associated with this work queue.
373   *
374   * @param workerThread
375   *          The worker thread that is requesting the operation.
376   * @return The next operation that should be processed, or <CODE>null</CODE>
377   *         if the server is shutting down and no more operations will be
378   *         processed.
379   */
380  public Operation nextOperation(TraditionalWorkerThread workerThread)
381  {
382    return retryNextOperation(workerThread, 0);
383  }
384
385  /**
386   * Retrieves the next operation that should be processed by one of the worker
387   * threads following a previous failure attempt. A maximum of five consecutive
388   * failures will be allowed before returning <CODE>null</CODE>, which will
389   * cause the associated thread to exit.
390   *
391   * @param workerThread
392   *          The worker thread that is requesting the operation.
393   * @param numFailures
394   *          The number of consecutive failures that the worker thread has
395   *          experienced so far. If this gets too high, then this method will
396   *          return <CODE>null</CODE> rather than retrying.
397   * @return The next operation that should be processed, or <CODE>null</CODE>
398   *         if the server is shutting down and no more operations will be
399   *         processed, or if there have been too many consecutive failures.
400   */
401  private Operation retryNextOperation(TraditionalWorkerThread workerThread,
402      int numFailures)
403  {
404    // See if we should kill off this thread. This could be necessary if the
405    // number of worker threads has been decreased with the server online. If
406    // so, then return null and the thread will exit.
407    queueReadLock.lock();
408    try
409    {
410      if (shutdownRequested)
411      {
412        return null;
413      }
414
415      if (killThreads && tryKillThisWorkerThread(workerThread))
416      {
417        return null;
418      }
419
420      if (numFailures > MAX_RETRY_COUNT)
421      {
422        logger.error(ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES, Thread
423            .currentThread().getName(), numFailures, MAX_RETRY_COUNT);
424
425        return null;
426      }
427
428      while (true)
429      {
430        Operation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
431        if (nextOperation != null)
432        {
433          return nextOperation;
434        }
435
436        // There was no work to do in the specified length of time. Release the
437        // read lock allowing shutdown or config changes to proceed and then see
438        // if we should give up or check again.
439        queueReadLock.unlock();
440        Thread.yield();
441        queueReadLock.lock();
442
443        if (shutdownRequested)
444        {
445          return null;
446        }
447
448        if (killThreads && tryKillThisWorkerThread(workerThread))
449        {
450          return null;
451        }
452      }
453    }
454    catch (InterruptedException ie)
455    {
456      // This is somewhat expected so don't log.
457      // assert debugException(CLASS_NAME, "retryNextOperation", ie);
458
459      // If this occurs, then the worker thread must have been interrupted for
460      // some reason. This could be because the Directory Server is shutting
461      // down, in which case we should return null.
462      if (shutdownRequested)
463      {
464        return null;
465      }
466
467      // If we've gotten here, then the worker thread was interrupted for some
468      // other reason. This should not happen, and we need to log a message.
469      logger.warn(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN, Thread.currentThread().getName(), ie);
470    }
471    catch (Exception e)
472    {
473      logger.traceException(e);
474
475      // This should not happen. The only recourse we have is to log a message
476      // and try again.
477      logger.warn(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION, Thread.currentThread().getName(), e);
478    }
479    finally
480    {
481      queueReadLock.unlock();
482    }
483
484    // An exception has occurred - retry.
485    return retryNextOperation(workerThread, numFailures + 1);
486  }
487
488  /**
489   * Kills this worker thread if needed. This method assumes that the read lock
490   * is already taken and ensure that it is taken on exit.
491   *
492   * @param workerThread
493   *          The worker thread associated with this thread.
494   * @return {@code true} if this thread was killed or is about to be killed as
495   *         a result of shutdown.
496   */
497  private boolean tryKillThisWorkerThread(TraditionalWorkerThread workerThread)
498  {
499    queueReadLock.unlock();
500    queueWriteLock.lock();
501    try
502    {
503      if (shutdownRequested)
504      {
505        // Shutdown may have been requested between unlock/lock. This thread is
506        // about to shutdown anyway, so return true.
507        return true;
508      }
509
510      int currentThreads = workerThreads.size();
511      if (currentThreads > numWorkerThreads)
512      {
513        if (workerThreads.remove(Thread.currentThread()))
514        {
515          currentThreads--;
516        }
517
518        if (currentThreads <= numWorkerThreads)
519        {
520          killThreads = false;
521        }
522
523        workerThread.setStoppedByReducedThreadNumber();
524        return true;
525      }
526    }
527    finally
528    {
529      queueWriteLock.unlock();
530      queueReadLock.lock();
531
532      if (shutdownRequested)
533      {
534        // Shutdown may have been requested between unlock/lock. This thread is
535        // about to shutdown anyway, so return true.
536        return true;
537      }
538    }
539    return false;
540  }
541
542  /**
543   * Retrieves the total number of operations that have been successfully
544   * submitted to this work queue for processing since server startup. This does
545   * not include operations that have been rejected for some reason like the
546   * queue already at its maximum capacity.
547   *
548   * @return The total number of operations that have been successfully
549   *         submitted to this work queue since startup.
550   */
551  public long getOpsSubmitted()
552  {
553    return opsSubmitted.longValue();
554  }
555
556  /**
557   * Retrieves the total number of operations that have been rejected because
558   * the work queue was already at its maximum capacity.
559   *
560   * @return The total number of operations that have been rejected because the
561   *         work queue was already at its maximum capacity.
562   */
563  public long getOpsRejectedDueToQueueFull()
564  {
565    return queueFullRejects.longValue();
566  }
567
568  /**
569   * Retrieves the number of pending operations in the queue that have not yet
570   * been picked up for processing. Note that this method is not a constant-time
571   * operation and can be relatively inefficient, so it should be used
572   * sparingly.
573   *
574   * @return The number of pending operations in the queue that have not yet
575   *         been picked up for processing.
576   */
577  public int size()
578  {
579    queueReadLock.lock();
580    try
581    {
582      return opQueue.size();
583    }
584    finally
585    {
586      queueReadLock.unlock();
587    }
588  }
589
590  @Override
591  public boolean isConfigurationChangeAcceptable(
592      TraditionalWorkQueueCfg configuration, List<LocalizableMessage> unacceptableReasons)
593  {
594    return true;
595  }
596
597  @Override
598  public ConfigChangeResult applyConfigurationChange(
599      TraditionalWorkQueueCfg configuration)
600  {
601    int newNumThreads =
602        computeNumWorkerThreads(configuration.getNumWorkerThreads());
603    int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
604
605    // Apply a change to the number of worker threads if appropriate.
606    int currentThreads = workerThreads.size();
607    if (newNumThreads != currentThreads)
608    {
609      queueWriteLock.lock();
610      try
611      {
612        int threadsToAdd = newNumThreads - currentThreads;
613        if (threadsToAdd > 0)
614        {
615          for (int i = 0; i < threadsToAdd; i++)
616          {
617            TraditionalWorkerThread t = new TraditionalWorkerThread(this,
618                lastThreadNumber++);
619            workerThreads.add(t);
620            t.start();
621          }
622
623          killThreads = false;
624        }
625        else
626        {
627          killThreads = true;
628        }
629
630        numWorkerThreads = newNumThreads;
631      }
632      catch (Exception e)
633      {
634        logger.traceException(e);
635      }
636      finally
637      {
638        queueWriteLock.unlock();
639      }
640    }
641
642    // Apply a change to the maximum capacity if appropriate. Since we can't
643    // change capacity on the fly, then we'll have to create a new queue and
644    // transfer any remaining items into it. Any thread that is waiting on the
645    // original queue will time out after at most a few seconds and further
646    // checks will be against the new queue.
647    if (newMaxCapacity != maxCapacity)
648    {
649      // First switch the queue with the exclusive lock.
650      queueWriteLock.lock();
651      LinkedBlockingQueue<Operation> oldOpQueue;
652      try
653      {
654        LinkedBlockingQueue<Operation> newOpQueue = null;
655        if (newMaxCapacity > 0)
656        {
657          newOpQueue = new LinkedBlockingQueue<>(newMaxCapacity);
658        }
659        else
660        {
661          newOpQueue = new LinkedBlockingQueue<>();
662        }
663
664        oldOpQueue = opQueue;
665        opQueue = newOpQueue;
666
667        maxCapacity = newMaxCapacity;
668      }
669      finally
670      {
671        queueWriteLock.unlock();
672      }
673
674      // Now resubmit any pending requests - we'll need the shared lock.
675      Operation pendingOperation = null;
676      queueReadLock.lock();
677      try
678      {
679        // We have to be careful when adding any existing pending operations
680        // because the new capacity could be less than what was already
681        // backlogged in the previous queue. If that happens, we may have to
682        // loop a few times to get everything in there.
683        while ((pendingOperation = oldOpQueue.poll()) != null)
684        {
685          opQueue.put(pendingOperation);
686        }
687      }
688      catch (InterruptedException e)
689      {
690        // We cannot handle the interruption here. Cancel pending requests and
691        // re-interrupt this thread.
692        Thread.currentThread().interrupt();
693
694        LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
695        CancelRequest cancelRequest = new CancelRequest(true, message);
696        if (pendingOperation != null)
697        {
698          pendingOperation.abort(cancelRequest);
699        }
700        while ((pendingOperation = oldOpQueue.poll()) != null)
701        {
702          pendingOperation.abort(cancelRequest);
703        }
704      }
705      finally
706      {
707        queueReadLock.unlock();
708      }
709    }
710
711    return new ConfigChangeResult();
712  }
713
714  @Override
715  public boolean isIdle()
716  {
717    queueReadLock.lock();
718    try
719    {
720      if (!opQueue.isEmpty())
721      {
722        return false;
723      }
724
725      for (TraditionalWorkerThread t : workerThreads)
726      {
727        if (t.isActive())
728        {
729          return false;
730        }
731      }
732
733      return true;
734    }
735    finally
736    {
737      queueReadLock.unlock();
738    }
739  }
740
741  /**
742   * Return the number of worker threads used by this WorkQueue.
743   *
744   * @return the number of worker threads used by this WorkQueue
745   */
746  @Override
747  public int getNumWorkerThreads()
748  {
749    return this.numWorkerThreads;
750  }
751}