001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2013-2016 ForgeRock AS.
015 */
016package org.opends.server.core;
017
018import java.util.concurrent.atomic.AtomicInteger;
019
020import org.opends.server.types.DirectoryException;
021import org.opends.server.types.Operation;
022
023/**
024 * A QueueingStrategy that concurrently enqueues a bounded number of operations
025 * to the DirectoryServer work queue. If the maximum number of concurrently
026 * enqueued operations has been reached or if the work queue if full, then the
027 * operation will be executed on the current thread.
028 */
029public class BoundedWorkQueueStrategy implements QueueingStrategy
030{
031  /** The number of concurrently running operations for this BoundedWorkQueueStrategy. */
032  private final AtomicInteger nbRunningOperations = new AtomicInteger(0);
033  /** Maximum number of concurrent operations. 0 means "unlimited". */
034  private final int maxNbConcurrentOperations;
035
036  /**
037   * Constructor for BoundedWorkQueueStrategy.
038   *
039   * @param maxNbConcurrentOperations
040   *          the maximum number of operations that can be concurrently enqueued
041   *          to the DirectoryServer work queue
042   */
043  public BoundedWorkQueueStrategy(Integer maxNbConcurrentOperations)
044  {
045    if (maxNbConcurrentOperations != null)
046    {
047      this.maxNbConcurrentOperations = maxNbConcurrentOperations;
048    }
049    else
050    {
051      int cpus = Runtime.getRuntime().availableProcessors();
052      this.maxNbConcurrentOperations =
053          Math.max(cpus, getNumWorkerThreads() * 25 / 100);
054    }
055  }
056
057  /**
058   * Return the maximum number of worker threads that can be used by the
059   * WorkQueue (The WorkQueue could have a thread pool which adjusts its size).
060   *
061   * @return the maximum number of worker threads that can be used by the
062   *         WorkQueue
063   */
064  protected int getNumWorkerThreads()
065  {
066    return DirectoryServer.getWorkQueue().getNumWorkerThreads();
067  }
068
069  @Override
070  public void enqueueRequest(final Operation operation)
071      throws DirectoryException
072  {
073    if (!operation.getClientConnection().isConnectionValid())
074    {
075      // do not bother enqueueing
076      return;
077    }
078
079    if (maxNbConcurrentOperations == 0)
080    { // unlimited concurrent operations
081      if (!tryEnqueueRequest(operation))
082      { // avoid potential deadlocks by running in the current thread
083        operation.run();
084      }
085    }
086    else if (nbRunningOperations.getAndIncrement() > maxNbConcurrentOperations
087        || !tryEnqueueRequest(wrap(operation)))
088    { // avoid potential deadlocks by running in the current thread
089      try
090      {
091        operation.run();
092      }
093      finally
094      {
095        // only decrement when the operation is run synchronously.
096        // Otherwise it'll be decremented twice (once more in the wrapper).
097        nbRunningOperations.decrementAndGet();
098      }
099    }
100  }
101
102  /**
103   * Tries to add the provided operation to the work queue if not full so that
104   * it will be processed by one of the worker threads.
105   *
106   * @param op
107   *          The operation to be added to the work queue.
108   * @return true if the operation could be enqueued, false otherwise
109   * @throws DirectoryException
110   *           If a problem prevents the operation from being added to the queue
111   *           (e.g., the queue is full).
112   */
113  protected boolean tryEnqueueRequest(Operation op) throws DirectoryException
114  {
115    return DirectoryServer.tryEnqueueRequest(op);
116  }
117
118  private Operation wrap(final Operation operation)
119  {
120    if (operation instanceof AbandonOperation)
121    {
122      return new AbandonOperationWrapper((AbandonOperation) operation)
123      {
124        @Override
125        public void run()
126        {
127          runWrapped(operation);
128        }
129      };
130    }
131    else if (operation instanceof AddOperation)
132    {
133      return new AddOperationWrapper((AddOperation) operation)
134      {
135        @Override
136        public void run()
137        {
138          runWrapped(operation);
139        }
140      };
141    }
142    else if (operation instanceof BindOperation)
143    {
144      return new BindOperationWrapper((BindOperation) operation)
145      {
146        @Override
147        public void run()
148        {
149          runWrapped(operation);
150        }
151      };
152    }
153    else if (operation instanceof CompareOperation)
154    {
155      return new CompareOperationWrapper((CompareOperation) operation)
156      {
157        @Override
158        public void run()
159        {
160          runWrapped(operation);
161        }
162      };
163    }
164    else if (operation instanceof DeleteOperation)
165    {
166      return new DeleteOperationWrapper((DeleteOperation) operation)
167      {
168        @Override
169        public void run()
170        {
171          runWrapped(operation);
172        }
173      };
174    }
175    else if (operation instanceof ExtendedOperation)
176    {
177      return new ExtendedOperationWrapper((ExtendedOperation) operation)
178      {
179        @Override
180        public void run()
181        {
182          runWrapped(operation);
183        }
184      };
185    }
186    else if (operation instanceof ModifyDNOperation)
187    {
188      return new ModifyDNOperationWrapper((ModifyDNOperation) operation)
189      {
190        @Override
191        public void run()
192        {
193          runWrapped(operation);
194        }
195      };
196    }
197    else if (operation instanceof ModifyOperation)
198    {
199      return new ModifyOperationWrapper((ModifyOperation) operation)
200      {
201        @Override
202        public void run()
203        {
204          runWrapped(operation);
205        }
206      };
207    }
208    else if (operation instanceof SearchOperation)
209    {
210      return new SearchOperationWrapper((SearchOperation) operation)
211      {
212        @Override
213        public void run()
214        {
215          runWrapped(operation);
216        }
217      };
218    }
219    else if (operation instanceof UnbindOperation)
220    {
221      return new UnbindOperationWrapper((UnbindOperation) operation)
222      {
223        @Override
224        public void run()
225        {
226          runWrapped(operation);
227        }
228      };
229    }
230    else
231    {
232      throw new RuntimeException(
233          "Not implemented for " + operation == null ? null : operation
234              .getClass().getName());
235    }
236  }
237
238  /**
239   * Execute the provided operation and decrement the number of currently
240   * running operations after it has finished executing.
241   *
242   * @param the
243   *          operation to execute
244   */
245  private void runWrapped(final Operation operation)
246  {
247    try
248    {
249      operation.run();
250    }
251    finally
252    {
253      nbRunningOperations.decrementAndGet();
254    }
255  }
256}