001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2013-2016 ForgeRock AS.
016 */
017package org.opends.server.api;
018
019import static org.opends.messages.CoreMessages.*;
020
021import org.forgerock.i18n.slf4j.LocalizedLogger;
022import org.forgerock.i18n.LocalizableMessage;
023import org.forgerock.opendj.server.config.server.WorkQueueCfg;
024import org.forgerock.opendj.config.server.ConfigException;
025import org.opends.server.types.DirectoryException;
026import org.opends.server.types.InitializationException;
027import org.opends.server.types.Operation;
028import org.opends.server.util.Platform;
029
030/**
031 * This class defines the structure and methods that must be
032 * implemented by a Directory Server work queue.  The work queue is
033 * the component of the server that accepts requests from connection
034 * handlers and ensures that they are properly processed.  The manner
035 * in which the work queue is able to accomplish this may vary between
036 * implementations, but in general it is assumed that one or more
037 * worker threads will be associated with the queue and may be used to
038 * process requests in parallel.
039 *
040 * @param  <T>  The type of configuration handled by this work queue.
041 */
042@org.opends.server.types.PublicAPI(
043     stability=org.opends.server.types.StabilityLevel.VOLATILE,
044     mayInstantiate=false,
045     mayExtend=true,
046     mayInvoke=true)
047public abstract class WorkQueue<T extends WorkQueueCfg>
048{
049
050  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
051
052  /**
053   * Initializes this work queue based on the information in the
054   * provided configuration entry.
055   *
056   * @param  configuration  The configuration to use to initialize
057   *                        the work queue.
058   *
059   * @throws  ConfigException  If the provided configuration entry
060   *                           does not have a valid work queue
061   *                           configuration.
062   *
063   * @throws  InitializationException  If a problem occurs during
064   *                                   initialization that is not
065   *                                   related to the server
066   *                                   configuration.
067   */
068  public abstract void initializeWorkQueue(T configuration)
069         throws ConfigException, InitializationException;
070
071
072
073  /**
074   * Performs any necessary finalization for this work queue,
075   * including ensuring that all active operations are interrupted or
076   * will be allowed to complete, and that all pending operations will
077   * be cancelled.
078   *
079   * @param  reason  The human-readable reason that the work queue is
080   *                 being shut down.
081   */
082  public abstract void finalizeWorkQueue(LocalizableMessage reason);
083
084
085
086  /**
087   * Submits an operation to be processed in the server.
088   *
089   * @param  operation  The operation to be processed.
090   *
091   * @throws  DirectoryException  If the provided operation is not
092   *                              accepted for some reason (e.g., if
093   *                              the server is shutting down or
094   *                              already has too many pending
095   *                              requests in the queue).
096   */
097  public abstract void submitOperation(Operation operation)
098         throws DirectoryException;
099
100
101
102  /**
103   * Tries to submit an operation to be processed in the server, without
104   * blocking.
105   *
106   * @param operation
107   *          The operation to be processed.
108   * @return true if the operation could be submitted to the queue, false if the
109   *         queue was full
110   * @throws DirectoryException
111   *           If the provided operation is not accepted for some reason (e.g.,
112   *           if the server is shutting down).
113   */
114  public abstract boolean trySubmitOperation(Operation operation)
115      throws DirectoryException;
116
117
118  /**
119   * Indicates whether the work queue is currently processing any
120   * requests.  Note that this is a point-in-time determination, and
121   * if any component of the server wishes to depend on a quiescent
122   * state then it should use some external mechanism to ensure that
123   * no other requests are submitted to the queue.
124   *
125   * @return  {@code true} if the work queue is currently idle, or
126   *          {@code false} if it is being used to process one or more
127   *          operations.
128   */
129  public abstract boolean isIdle();
130
131
132  /**
133   * Return the maximum number of worker threads that can be used by this
134   * WorkQueue (The WorkQueue could have a thread pool which adjusts its size).
135   *
136   * @return the maximum number of worker threads that can be used by this
137   *         WorkQueue
138   */
139  public abstract int getNumWorkerThreads();
140
141
142  /**
143   * Computes the number of worker threads to use by the working queue based on
144   * the configured number.
145   *
146   * @param configuredNumWorkerThreads
147   *          the configured number of worker threads to use
148   * @return the number of worker threads to use
149   */
150  protected int computeNumWorkerThreads(Integer configuredNumWorkerThreads)
151  {
152    if (configuredNumWorkerThreads != null)
153    {
154      return configuredNumWorkerThreads;
155    }
156    else
157    {
158      // Automatically choose based on the number of processors.
159      int value = Platform.computeNumberOfThreads(16, 2.0f);
160      logger.debug(INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL, value);
161      return value;
162    }
163  }
164
165  /**
166   * Waits for the work queue to become idle before returning.  Note
167   * that this is a point-in-time determination, and if any component
168   * of the server wishes to depend on a quiescent state then it
169   * should use some external mechanism to ensure that no other
170   * requests are submitted to the queue.
171   *
172   * @param  timeLimit  The maximum length of time in milliseconds
173   *                    that this method should wait for the queue to
174   *                    become idle before giving up.  A time limit
175   *                    that is less than or equal to zero indicates
176   *                    that there should not be a time limit.
177   *
178   * @return  {@code true} if the work queue is idle at the time that
179   *          this method returns, or {@code false} if the wait time
180   *          limit was reached before the server became idle.
181   */
182  public boolean waitUntilIdle(long timeLimit)
183  {
184    long stopWaitingTime;
185    if (timeLimit <= 0)
186    {
187      stopWaitingTime = Long.MAX_VALUE;
188    }
189    else
190    {
191      stopWaitingTime = System.currentTimeMillis() + timeLimit;
192    }
193
194    while (System.currentTimeMillis() < stopWaitingTime)
195    {
196      if (isIdle())
197      {
198        return true;
199      }
200
201      try
202      {
203        Thread.sleep(1);
204      } catch (InterruptedException ie) {}
205    }
206
207    return false;
208  }
209}
210