001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2014 ForgeRock AS.
016 */
017package org.opends.server.extensions;
018
019import java.util.Map;
020
021import org.forgerock.i18n.LocalizableMessage;
022import org.opends.server.api.DirectoryThread;
023import org.opends.server.core.DirectoryServer;
024import org.forgerock.i18n.slf4j.LocalizedLogger;
025import org.opends.server.types.CancelRequest;
026import org.opends.server.types.DisconnectReason;
027import org.opends.server.types.Operation;
028
029import static org.opends.messages.CoreMessages.*;
030import static org.opends.server.util.StaticUtils.*;
031
032/**
033 * This class defines a data structure for storing and interacting with a
034 * Directory Server worker thread.
035 */
036public class TraditionalWorkerThread
037       extends DirectoryThread
038{
039  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
040
041  /**
042   * Indicates whether the Directory Server is shutting down and this thread
043   * should stop running.
044   */
045  private volatile boolean shutdownRequested;
046
047  /**
048   * Indicates whether this thread was stopped because the server thread number
049   * was reduced.
050   */
051  private boolean stoppedByReducedThreadNumber;
052
053  /** Indicates whether this thread is currently waiting for work. */
054  private boolean waitingForWork;
055
056  /** The operation that this worker thread is currently processing. */
057  private volatile Operation operation;
058
059  /** The handle to the actual thread for this worker thread. */
060  private Thread workerThread;
061
062  /** The work queue that this worker thread will service. */
063  private final TraditionalWorkQueue workQueue;
064
065
066
067  /**
068   * Creates a new worker thread that will service the provided work queue and
069   * process any new requests that are submitted.
070   *
071   * @param  workQueue  The work queue with which this worker thread is
072   *                    associated.
073   * @param  threadID   The thread ID for this worker thread.
074   */
075  public TraditionalWorkerThread(TraditionalWorkQueue workQueue, int threadID)
076  {
077    super("Worker Thread " + threadID);
078
079
080    this.workQueue = workQueue;
081
082    stoppedByReducedThreadNumber = false;
083    shutdownRequested            = false;
084    waitingForWork               = false;
085    operation                    = null;
086    workerThread                 = null;
087  }
088
089
090
091  /**
092   * Indicates that this thread is about to be stopped because the Directory
093   * Server configuration has been updated to reduce the number of worker
094   * threads.
095   */
096  public void setStoppedByReducedThreadNumber()
097  {
098    stoppedByReducedThreadNumber = true;
099  }
100
101
102
103  /**
104   * Indicates whether this worker thread is actively processing a request.
105   * Note that this is a point-in-time determination and if a reliable answer is
106   * expected then the server should impose some external constraint to ensure
107   * that no new requests are enqueued.
108   *
109   * @return  {@code true} if this worker thread is actively processing a
110   *          request, or {@code false} if it is idle.
111   */
112  public boolean isActive()
113  {
114    return isAlive() && operation != null;
115  }
116
117
118
119  /**
120   * Operates in a loop, retrieving the next request from the work queue,
121   * processing it, and then going back to the queue for more.
122   */
123  @Override
124  public void run()
125  {
126    workerThread = currentThread();
127
128    while (! shutdownRequested)
129    {
130      try
131      {
132        waitingForWork = true;
133        operation = null; // this line is necessary because next line can block
134        operation = workQueue.nextOperation(this);
135        waitingForWork = false;
136
137
138        if (operation == null)
139        {
140          // The operation may be null if the server is shutting down.  If that
141          // is the case, then break out of the while loop.
142          break;
143        }
144        else
145        {
146          // The operation is not null, so process it.  Make sure that when
147          // processing is complete.
148          operation.run();
149          operation.operationCompleted();
150        }
151      }
152      catch (Throwable t)
153      {
154        if (logger.isTraceEnabled())
155        {
156          logger.trace(
157            "Uncaught exception in worker thread while processing " +
158                "operation %s: %s", operation, t);
159          logger.traceException(t);
160        }
161
162        try
163        {
164          LocalizableMessage message =
165              ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(), operation, stackTraceToSingleLineString(t));
166          logger.error(message);
167
168          // Ensure that the client receives some kind of result so that it does
169          // not hang.
170          operation.setResultCode(DirectoryServer.getServerErrorResultCode());
171          operation.appendErrorMessage(message);
172          operation.getClientConnection().sendResponse(operation);
173        }
174        catch (Throwable t2)
175        {
176          if (logger.isTraceEnabled())
177          {
178            logger.trace(
179              "Exception in worker thread while trying to log a " +
180                  "message about an uncaught exception %s: %s", t, t2);
181
182            logger.traceException(t2);
183          }
184        }
185
186
187        try
188        {
189          LocalizableMessage message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(
190              getName(), operation, stackTraceToSingleLineString(t));
191
192          operation.disconnectClient(DisconnectReason.SERVER_ERROR, true, message);
193        }
194        catch (Throwable t2)
195        {
196          logger.traceException(t2);
197        }
198      }
199    }
200
201    // If we have gotten here, then we presume that the server thread is
202    // shutting down.  However, if that's not the case then that is a problem
203    // and we will want to log a message.
204    if (stoppedByReducedThreadNumber)
205    {
206      logger.debug(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER, getName());
207    }
208    else if (! workQueue.shutdownRequested())
209    {
210      logger.warn(WARN_UNEXPECTED_WORKER_THREAD_EXIT, getName());
211    }
212
213
214    if (logger.isTraceEnabled())
215    {
216      logger.trace(getName() + " exiting.");
217    }
218  }
219
220
221
222  /**
223   * Indicates that the Directory Server has received a request to stop running
224   * and that this thread should stop running as soon as possible.
225   */
226  public void shutDown()
227  {
228    if (logger.isTraceEnabled())
229    {
230      logger.trace(getName() + " being signaled to shut down.");
231    }
232
233    // Set a flag that indicates that the thread should stop running.
234    shutdownRequested = true;
235
236
237    // Check to see if the thread is waiting for work.  If so, then interrupt
238    // it.
239    if (waitingForWork)
240    {
241      try
242      {
243        workerThread.interrupt();
244      }
245      catch (Exception e)
246      {
247        if (logger.isTraceEnabled())
248        {
249          logger.trace(
250            "Caught an exception while trying to interrupt the worker " +
251                "thread waiting for work: %s", e);
252          logger.traceException(e);
253        }
254      }
255    }
256    else
257    {
258      try
259      {
260        final Operation localOperation = operation;
261        if (localOperation != null)
262        {
263          CancelRequest cancelRequest = new CancelRequest(true,
264              INFO_CANCELED_BY_SHUTDOWN.get());
265          localOperation.cancel(cancelRequest);
266        }
267      }
268      catch (Exception e)
269      {
270        if (logger.isTraceEnabled())
271        {
272          logger.trace(
273            "Caught an exception while trying to abandon the " +
274                "operation in progress for the worker thread: %s", e);
275          logger.traceException(e);
276        }
277      }
278    }
279  }
280
281  /**
282   * Retrieves any relevant debug information with which this tread is
283   * associated so they can be included in debug messages.
284   *
285   * @return debug information about this thread as a string.
286   */
287  @Override
288  public Map<String, String> getDebugProperties()
289  {
290    Map<String, String> properties = super.getDebugProperties();
291    properties.put("clientConnection", operation != null
292        ? String.valueOf(operation.getClientConnection()) : "none");
293    properties.put("operation", String.valueOf(operation));
294    return properties;
295  }
296}
297