001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2009 Sun Microsystems, Inc.
015 * Portions Copyright 2013-2015 ForgeRock AS.
016 */
017package org.opends.server.extensions;
018
019
020import java.util.Map;
021
022import org.forgerock.i18n.LocalizableMessage;
023import org.opends.server.api.DirectoryThread;
024import org.opends.server.core.DirectoryServer;
025import org.forgerock.i18n.slf4j.LocalizedLogger;
026import org.opends.server.types.CancelRequest;
027import org.opends.server.types.DisconnectReason;
028import org.opends.server.types.Operation;
029
030import static org.opends.messages.CoreMessages.*;
031import static org.opends.server.util.StaticUtils.*;
032
033
034/**
035 * This class defines a data structure for storing and interacting with a
036 * Directory Server worker thread.
037 */
038public class ParallelWorkerThread
039       extends DirectoryThread
040{
041  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
042
043  /**
044   * Indicates whether the Directory Server is shutting down and this thread
045   * should stop running.
046   */
047  private boolean shutdownRequested;
048
049  /**
050   * Indicates whether this thread was stopped because the server threadnumber
051   * was reduced.
052   */
053  private boolean stoppedByReducedThreadNumber;
054
055  /** Indicates whether this thread is currently waiting for work. */
056  private boolean waitingForWork;
057
058  /** The operation that this worker thread is currently processing. */
059  private Operation operation;
060
061  /** The handle to the actual thread for this worker thread. */
062  private Thread workerThread;
063
064  /** The work queue that this worker thread will service. */
065  private ParallelWorkQueue workQueue;
066
067
068
069  /**
070   * Creates a new worker thread that will service the provided work queue and
071   * process any new requests that are submitted.
072   *
073   * @param  workQueue  The work queue with which this worker thread is
074   *                    associated.
075   * @param  threadID   The thread ID for this worker thread.
076   */
077  public ParallelWorkerThread(ParallelWorkQueue workQueue, int threadID)
078  {
079    super("Worker Thread " + threadID);
080
081
082    this.workQueue = workQueue;
083
084    stoppedByReducedThreadNumber = false;
085    shutdownRequested            = false;
086    waitingForWork               = false;
087    operation                    = null;
088    workerThread                 = null;
089  }
090
091
092
093  /**
094   * Indicates that this thread is about to be stopped because the Directory
095   * Server configuration has been updated to reduce the number of worker
096   * threads.
097   */
098  public void setStoppedByReducedThreadNumber()
099  {
100    stoppedByReducedThreadNumber = true;
101  }
102
103
104
105  /**
106   * Indicates whether this worker thread is actively processing a request.
107   * Note that this is a point-in-time determination and if a reliable answer is
108   * expected then the server should impose some external constraint to ensure
109   * that no new requests are enqueued.
110   *
111   * @return  {@code true} if this worker thread is actively processing a
112   *          request, or {@code false} if it is idle.
113   */
114  public boolean isActive()
115  {
116    return isAlive() && operation != null;
117  }
118
119
120
121  /**
122   * Operates in a loop, retrieving the next request from the work queue,
123   * processing it, and then going back to the queue for more.
124   */
125  @Override
126  public void run()
127  {
128    workerThread = currentThread();
129
130    while (! shutdownRequested)
131    {
132      try
133      {
134        waitingForWork = true;
135        operation = null;
136        operation = workQueue.nextOperation(this);
137        waitingForWork = false;
138
139
140        if (operation == null)
141        {
142          // The operation may be null if the server is shutting down.  If that
143          // is the case, then break out of the while loop.
144          break;
145        }
146        else
147        {
148          // The operation is not null, so process it.  Make sure that when
149          // processing is complete.
150          operation.run();
151          operation.operationCompleted();
152        }
153      }
154      catch (Throwable t)
155      {
156        if (logger.isTraceEnabled())
157        {
158          logger.trace(
159            "Uncaught exception in worker thread while processing " +
160                "operation %s: %s", operation, t);
161
162          logger.traceException(t);
163        }
164
165        try
166        {
167          LocalizableMessage message =
168              ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(), operation, stackTraceToSingleLineString(t));
169          logger.error(message);
170
171          operation.setResultCode(DirectoryServer.getServerErrorResultCode());
172          operation.appendErrorMessage(message);
173          operation.getClientConnection().sendResponse(operation);
174        }
175        catch (Throwable t2)
176        {
177          if (logger.isTraceEnabled())
178          {
179            logger.trace(
180              "Exception in worker thread while trying to log a " +
181                  "message about an uncaught exception %s: %s", t, t2);
182
183            logger.traceException(t2);
184          }
185        }
186
187
188        try
189        {
190          LocalizableMessage message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(
191              getName(), operation, stackTraceToSingleLineString(t));
192          operation.disconnectClient(DisconnectReason.SERVER_ERROR, true, message);
193        }
194        catch (Throwable t2)
195        {
196          logger.traceException(t2);
197        }
198      }
199    }
200
201    // If we have gotten here, then we presume that the server thread is
202    // shutting down.  However, if that's not the case then that is a problem
203    // and we will want to log a message.
204    if (stoppedByReducedThreadNumber)
205    {
206      logger.debug(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER, getName());
207    }
208    else if (! workQueue.shutdownRequested())
209    {
210      logger.warn(WARN_UNEXPECTED_WORKER_THREAD_EXIT, getName());
211    }
212
213
214    if (logger.isTraceEnabled())
215    {
216      logger.trace(getName() + " exiting.");
217    }
218  }
219
220
221
222  /**
223   * Indicates that the Directory Server has received a request to stop running
224   * and that this thread should stop running as soon as possible.
225   */
226  public void shutDown()
227  {
228    if (logger.isTraceEnabled())
229    {
230      logger.trace(getName() + " being signaled to shut down.");
231    }
232
233    // Set a flag that indicates that the thread should stop running.
234    shutdownRequested = true;
235
236
237    // Check to see if the thread is waiting for work.  If so, then interrupt
238    // it.
239    if (waitingForWork)
240    {
241      try
242      {
243        workerThread.interrupt();
244      }
245      catch (Exception e)
246      {
247        if (logger.isTraceEnabled())
248        {
249          logger.trace(
250            "Caught an exception while trying to interrupt the worker " +
251                "thread waiting for work: %s", e);
252          logger.traceException(e);
253        }
254      }
255    }
256    else
257    {
258      try
259      {
260        CancelRequest cancelRequest =
261          new CancelRequest(true, INFO_CANCELED_BY_SHUTDOWN.get());
262        operation.cancel(cancelRequest);
263      }
264      catch (Exception e)
265      {
266        if (logger.isTraceEnabled())
267        {
268          logger.trace(
269            "Caught an exception while trying to abandon the " +
270                "operation in progress for the worker thread: %s", e);
271          logger.traceException(e);
272        }
273      }
274    }
275  }
276
277  /**
278   * Retrieves any relevent debug information with which this tread is
279   * associated so they can be included in debug messages.
280   *
281   * @return debug information about this thread as a string.
282   */
283  @Override
284  public Map<String, String> getDebugProperties()
285  {
286    Map<String, String> properties = super.getDebugProperties();
287    properties.put("clientConnection",
288                   operation.getClientConnection().toString());
289    properties.put("operation", operation.toString());
290
291    return properties;
292  }
293}
294