001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2013-2016 ForgeRock AS.
016 */
017package org.opends.server.extensions;
018
019import static org.opends.messages.ConfigMessages.*;
020import static org.opends.messages.CoreMessages.*;
021
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import java.util.concurrent.Semaphore;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.forgerock.i18n.LocalizableMessage;
030import org.forgerock.i18n.slf4j.LocalizedLogger;
031import org.forgerock.opendj.config.server.ConfigException;
032import org.forgerock.opendj.ldap.ResultCode;
033import org.forgerock.opendj.config.server.ConfigurationChangeListener;
034import org.forgerock.opendj.server.config.server.ParallelWorkQueueCfg;
035import org.opends.server.api.WorkQueue;
036import org.opends.server.core.DirectoryServer;
037import org.opends.server.monitors.ParallelWorkQueueMonitor;
038import org.opends.server.types.CancelRequest;
039import org.forgerock.opendj.config.server.ConfigChangeResult;
040import org.opends.server.types.DirectoryException;
041import org.opends.server.types.InitializationException;
042import org.opends.server.types.Operation;
043
044/**
045 * This class defines a data structure for storing and interacting with the
046 * Directory Server work queue.
047 */
048public class ParallelWorkQueue
049       extends WorkQueue<ParallelWorkQueueCfg>
050       implements ConfigurationChangeListener<ParallelWorkQueueCfg>
051{
052  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
053
054  /**
055   * The maximum number of times to retry getting the next operation from the
056   * queue if an unexpected failure occurs.
057   */
058  private static final int MAX_RETRY_COUNT = 5;
059
060  /** The set of worker threads that will be used to process this work queue. */
061  private ArrayList<ParallelWorkerThread> workerThreads;
062
063  /** The number of operations that have been submitted to the work queue for processing. */
064  private AtomicLong opsSubmitted;
065
066  /**
067   * Indicates whether one or more of the worker threads needs to be killed at
068   * the next convenient opportunity.
069   */
070  private boolean killThreads;
071
072  /** Indicates whether the Directory Server is shutting down. */
073  private boolean shutdownRequested;
074
075  /** The thread number used for the last worker thread that was created. */
076  private int lastThreadNumber;
077
078  /**
079   * The number of worker threads that should be active (or will be shortly if a
080   * configuration change has not been completely applied).
081   */
082  private int numWorkerThreads;
083
084  /** The queue that will be used to actually hold the pending operations. */
085  private ConcurrentLinkedQueue<Operation> opQueue;
086
087  /** The lock used to provide threadsafe access for the queue. */
088  private final Object queueLock = new Object();
089
090  private final Semaphore queueSemaphore = new Semaphore(0, false);
091
092  /**
093   * Creates a new instance of this work queue.  All initialization should be
094   * performed in the <CODE>initializeWorkQueue</CODE> method.
095   */
096  public ParallelWorkQueue()
097  {
098    // No implementation should be performed here.
099  }
100
101  @Override
102  public void initializeWorkQueue(ParallelWorkQueueCfg configuration)
103         throws ConfigException, InitializationException
104  {
105    shutdownRequested = false;
106    killThreads       = false;
107    opsSubmitted      = new AtomicLong(0);
108
109    // Register to be notified of any configuration changes.
110    configuration.addParallelChangeListener(this);
111
112    // Get the necessary configuration from the provided entry.
113    numWorkerThreads =
114        computeNumWorkerThreads(configuration.getNumWorkerThreads());
115
116    // Create the actual work queue.
117    opQueue = new ConcurrentLinkedQueue<>();
118
119    // Create the set of worker threads that should be used to service the work queue.
120    workerThreads = new ArrayList<>(numWorkerThreads);
121    for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
122    lastThreadNumber++)
123    {
124      ParallelWorkerThread t =
125           new ParallelWorkerThread(this, lastThreadNumber);
126      t.start();
127      workerThreads.add(t);
128    }
129
130    // Create and register a monitor provider for the work queue.
131    try
132    {
133      ParallelWorkQueueMonitor monitor =
134           new ParallelWorkQueueMonitor(this);
135      monitor.initializeMonitorProvider(null);
136      DirectoryServer.registerMonitorProvider(monitor);
137    }
138    catch (Exception e)
139    {
140      logger.traceException(e);
141      logger.error(ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR, ParallelWorkQueueMonitor.class, e);
142    }
143  }
144
145  @Override
146  public void finalizeWorkQueue(LocalizableMessage reason)
147  {
148    shutdownRequested = true;
149
150    // Send responses to any operations in the pending queue to indicate that
151    // they won't be processed because the server is shutting down.
152    CancelRequest cancelRequest = new CancelRequest(true, reason);
153    ArrayList<Operation> pendingOperations = new ArrayList<>();
154    opQueue.removeAll(pendingOperations);
155
156    for (Operation o : pendingOperations)
157    {
158      try
159      {
160        // The operation has no chance of responding to the cancel
161        // request so avoid waiting for a cancel response.
162        if (o.getCancelResult() == null) {
163          o.abort(cancelRequest);
164        }
165      }
166      catch (Exception e)
167      {
168        logger.traceException(e);
169        logger.warn(WARN_QUEUE_UNABLE_TO_CANCEL, o, e);
170      }
171    }
172
173    // Notify all the worker threads of the shutdown.
174    for (ParallelWorkerThread t : workerThreads)
175    {
176      try
177      {
178        t.shutDown();
179      }
180      catch (Exception e)
181      {
182        logger.traceException(e);
183        logger.warn(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD, t.getName(), e);
184      }
185    }
186  }
187
188  /**
189   * Indicates whether this work queue has received a request to shut down.
190   *
191   * @return  <CODE>true</CODE> if the work queue has recieved a request to shut
192   *          down, or <CODE>false</CODE> if not.
193   */
194  public boolean shutdownRequested()
195  {
196    return shutdownRequested;
197  }
198
199  /**
200   * Submits an operation to be processed by one of the worker threads
201   * associated with this work queue.
202   *
203   * @param  operation  The operation to be processed.
204   *
205   * @throws  DirectoryException  If the provided operation is not accepted for
206   *                              some reason (e.g., if the server is shutting
207   *                              down or the pending operation queue is already
208   *                              at its maximum capacity).
209   */
210  @Override
211  public void submitOperation(Operation operation) throws DirectoryException
212  {
213    if (shutdownRequested)
214    {
215      LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
216      throw new DirectoryException(ResultCode.UNAVAILABLE, message);
217    }
218
219    opQueue.add(operation);
220    queueSemaphore.release();
221
222    opsSubmitted.incrementAndGet();
223  }
224
225  @Override
226  public boolean trySubmitOperation(Operation operation)
227      throws DirectoryException
228  {
229    submitOperation(operation);
230    return true;
231  }
232
233  /**
234   * Retrieves the next operation that should be processed by one of the worker
235   * threads, blocking if necessary until a new request arrives.  This method
236   * should only be called by a worker thread associated with this work queue.
237   *
238   * @param  workerThread  The worker thread that is requesting the operation.
239   *
240   * @return  The next operation that should be processed, or <CODE>null</CODE>
241   *          if the server is shutting down and no more operations will be
242   *          processed.
243   */
244  public Operation nextOperation(ParallelWorkerThread workerThread)
245  {
246    return retryNextOperation(workerThread, 0);
247  }
248
249  /**
250   * Retrieves the next operation that should be processed by one of the worker
251   * threads following a previous failure attempt.  A maximum of five
252   * consecutive failures will be allowed before returning <CODE>null</CODE>,
253   * which will cause the associated thread to exit.
254   *
255   * @param  workerThread  The worker thread that is requesting the operation.
256   * @param  numFailures   The number of consecutive failures that the worker
257   *                       thread has experienced so far.  If this gets too
258   *                       high, then this method will return <CODE>null</CODE>
259   *                       rather than retrying.
260   *
261   * @return  The next operation that should be processed, or <CODE>null</CODE>
262   *          if the server is shutting down and no more operations will be
263   *          processed, or if there have been too many consecutive failures.
264   */
265  private Operation retryNextOperation(
266                                       ParallelWorkerThread workerThread,
267                                       int numFailures)
268  {
269    // See if we should kill off this thread.  This could be necessary if the
270    // number of worker threads has been decreased with the server online. If
271    // so, then return null and the thread will exit.
272    if (killThreads)
273    {
274      synchronized (queueLock)
275      {
276        try
277        {
278          int currentThreads = workerThreads.size();
279          if (currentThreads > numWorkerThreads)
280          {
281            if (workerThreads.remove(Thread.currentThread()))
282            {
283              currentThreads--;
284            }
285
286            if (currentThreads <= numWorkerThreads)
287            {
288              killThreads = false;
289            }
290
291            workerThread.setStoppedByReducedThreadNumber();
292            return null;
293          }
294        }
295        catch (Exception e)
296        {
297          logger.traceException(e);
298        }
299      }
300    }
301
302    if (shutdownRequested || numFailures > MAX_RETRY_COUNT)
303    {
304      if (numFailures > MAX_RETRY_COUNT)
305      {
306        logger.error(ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES, Thread
307            .currentThread().getName(), numFailures, MAX_RETRY_COUNT);
308      }
309
310      return null;
311    }
312
313    try
314    {
315      while (true)
316      {
317        Operation nextOperation = null;
318        if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
319          nextOperation = opQueue.poll();
320        }
321        if (nextOperation == null)
322        {
323          // There was no work to do in the specified length of time.  See if
324          // we should shutdown, and if not then just check again.
325          if (shutdownRequested)
326          {
327            return null;
328          }
329          else if (killThreads)
330          {
331            synchronized (queueLock)
332            {
333              try
334              {
335                int currentThreads = workerThreads.size();
336                if (currentThreads > numWorkerThreads)
337                {
338                  if (workerThreads.remove(Thread.currentThread()))
339                  {
340                    currentThreads--;
341                  }
342
343                  if (currentThreads <= numWorkerThreads)
344                  {
345                    killThreads = false;
346                  }
347
348                  workerThread.setStoppedByReducedThreadNumber();
349                  return null;
350                }
351              }
352              catch (Exception e)
353              {
354                logger.traceException(e);
355              }
356            }
357          }
358        }
359        else
360        {
361          return nextOperation;
362        }
363      }
364    }
365    catch (Exception e)
366    {
367      logger.traceException(e);
368
369      // This should not happen.  The only recourse we have is to log a message
370      // and try again.
371      logger.warn(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION, Thread.currentThread().getName(), e);
372      return retryNextOperation(workerThread, numFailures + 1);
373    }
374  }
375
376  /**
377   * Attempts to remove the specified operation from this queue if it has not
378   * yet been picked up for processing by one of the worker threads.
379   *
380   * @param  operation  The operation to remove from the queue.
381   *
382   * @return  <CODE>true</CODE> if the provided request was present in the queue
383   *          and was removed successfully, or <CODE>false</CODE> it not.
384   */
385  public boolean removeOperation(Operation operation)
386  {
387    return opQueue.remove(operation);
388  }
389
390  /**
391   * Retrieves the total number of operations that have been successfully
392   * submitted to this work queue for processing since server startup.  This
393   * does not include operations that have been rejected for some reason like
394   * the queue already at its maximum capacity.
395   *
396   * @return  The total number of operations that have been successfully
397   *          submitted to this work queue since startup.
398   */
399  public long getOpsSubmitted()
400  {
401    return opsSubmitted.longValue();
402  }
403
404  /**
405   * Retrieves the number of pending operations in the queue that have not yet
406   * been picked up for processing.  Note that this method is not a
407   * constant-time operation and can be relatively inefficient, so it should be
408   * used sparingly.
409   *
410   * @return  The number of pending operations in the queue that have not yet
411   *          been picked up for processing.
412   */
413  public int size()
414  {
415    return opQueue.size();
416  }
417
418  @Override
419  public boolean isConfigurationChangeAcceptable(
420                      ParallelWorkQueueCfg configuration,
421                      List<LocalizableMessage> unacceptableReasons)
422  {
423    return true;
424  }
425
426  @Override
427  public ConfigChangeResult applyConfigurationChange(
428                                 ParallelWorkQueueCfg configuration)
429  {
430    int newNumThreads =
431        computeNumWorkerThreads(configuration.getNumWorkerThreads());
432
433    // Apply a change to the number of worker threads if appropriate.
434    int currentThreads = workerThreads.size();
435    if (newNumThreads != currentThreads)
436    {
437      synchronized (queueLock)
438      {
439        try
440        {
441          int threadsToAdd = newNumThreads - currentThreads;
442          if (threadsToAdd > 0)
443          {
444            for (int i = 0; i < threadsToAdd; i++)
445            {
446              ParallelWorkerThread t =
447                   new ParallelWorkerThread(this, lastThreadNumber++);
448              workerThreads.add(t);
449              t.start();
450            }
451
452            killThreads = false;
453          }
454          else
455          {
456            killThreads = true;
457          }
458
459          numWorkerThreads = newNumThreads;
460        }
461        catch (Exception e)
462        {
463          logger.traceException(e);
464        }
465      }
466    }
467    return new ConfigChangeResult();
468  }
469
470  @Override
471  public boolean isIdle()
472  {
473    if (!opQueue.isEmpty()) {
474      return false;
475    }
476
477    synchronized (queueLock)
478    {
479      for (ParallelWorkerThread t : workerThreads)
480      {
481        if (t.isActive())
482        {
483          return false;
484        }
485      }
486
487      return true;
488    }
489  }
490
491  /**
492   * Return the number of worker threads used by this WorkQueue.
493   *
494   * @return the number of worker threads used by this WorkQueue
495   */
496  @Override
497  public int getNumWorkerThreads()
498  {
499    return this.numWorkerThreads;
500  }
501}