001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2013-2016 ForgeRock AS. 015 */ 016package org.opends.server.core; 017 018import java.util.concurrent.atomic.AtomicInteger; 019 020import org.opends.server.types.DirectoryException; 021import org.opends.server.types.Operation; 022 023/** 024 * A QueueingStrategy that concurrently enqueues a bounded number of operations 025 * to the DirectoryServer work queue. If the maximum number of concurrently 026 * enqueued operations has been reached or if the work queue if full, then the 027 * operation will be executed on the current thread. 028 */ 029public class BoundedWorkQueueStrategy implements QueueingStrategy 030{ 031 /** The number of concurrently running operations for this BoundedWorkQueueStrategy. */ 032 private final AtomicInteger nbRunningOperations = new AtomicInteger(0); 033 /** Maximum number of concurrent operations. 0 means "unlimited". */ 034 private final int maxNbConcurrentOperations; 035 036 /** 037 * Constructor for BoundedWorkQueueStrategy. 038 * 039 * @param maxNbConcurrentOperations 040 * the maximum number of operations that can be concurrently enqueued 041 * to the DirectoryServer work queue 042 */ 043 public BoundedWorkQueueStrategy(Integer maxNbConcurrentOperations) 044 { 045 if (maxNbConcurrentOperations != null) 046 { 047 this.maxNbConcurrentOperations = maxNbConcurrentOperations; 048 } 049 else 050 { 051 int cpus = Runtime.getRuntime().availableProcessors(); 052 this.maxNbConcurrentOperations = 053 Math.max(cpus, getNumWorkerThreads() * 25 / 100); 054 } 055 } 056 057 /** 058 * Return the maximum number of worker threads that can be used by the 059 * WorkQueue (The WorkQueue could have a thread pool which adjusts its size). 060 * 061 * @return the maximum number of worker threads that can be used by the 062 * WorkQueue 063 */ 064 protected int getNumWorkerThreads() 065 { 066 return DirectoryServer.getWorkQueue().getNumWorkerThreads(); 067 } 068 069 @Override 070 public void enqueueRequest(final Operation operation) 071 throws DirectoryException 072 { 073 if (!operation.getClientConnection().isConnectionValid()) 074 { 075 // do not bother enqueueing 076 return; 077 } 078 079 if (maxNbConcurrentOperations == 0) 080 { // unlimited concurrent operations 081 if (!tryEnqueueRequest(operation)) 082 { // avoid potential deadlocks by running in the current thread 083 operation.run(); 084 } 085 } 086 else if (nbRunningOperations.getAndIncrement() > maxNbConcurrentOperations 087 || !tryEnqueueRequest(wrap(operation))) 088 { // avoid potential deadlocks by running in the current thread 089 try 090 { 091 operation.run(); 092 } 093 finally 094 { 095 // only decrement when the operation is run synchronously. 096 // Otherwise it'll be decremented twice (once more in the wrapper). 097 nbRunningOperations.decrementAndGet(); 098 } 099 } 100 } 101 102 /** 103 * Tries to add the provided operation to the work queue if not full so that 104 * it will be processed by one of the worker threads. 105 * 106 * @param op 107 * The operation to be added to the work queue. 108 * @return true if the operation could be enqueued, false otherwise 109 * @throws DirectoryException 110 * If a problem prevents the operation from being added to the queue 111 * (e.g., the queue is full). 112 */ 113 protected boolean tryEnqueueRequest(Operation op) throws DirectoryException 114 { 115 return DirectoryServer.tryEnqueueRequest(op); 116 } 117 118 private Operation wrap(final Operation operation) 119 { 120 if (operation instanceof AbandonOperation) 121 { 122 return new AbandonOperationWrapper((AbandonOperation) operation) 123 { 124 @Override 125 public void run() 126 { 127 runWrapped(operation); 128 } 129 }; 130 } 131 else if (operation instanceof AddOperation) 132 { 133 return new AddOperationWrapper((AddOperation) operation) 134 { 135 @Override 136 public void run() 137 { 138 runWrapped(operation); 139 } 140 }; 141 } 142 else if (operation instanceof BindOperation) 143 { 144 return new BindOperationWrapper((BindOperation) operation) 145 { 146 @Override 147 public void run() 148 { 149 runWrapped(operation); 150 } 151 }; 152 } 153 else if (operation instanceof CompareOperation) 154 { 155 return new CompareOperationWrapper((CompareOperation) operation) 156 { 157 @Override 158 public void run() 159 { 160 runWrapped(operation); 161 } 162 }; 163 } 164 else if (operation instanceof DeleteOperation) 165 { 166 return new DeleteOperationWrapper((DeleteOperation) operation) 167 { 168 @Override 169 public void run() 170 { 171 runWrapped(operation); 172 } 173 }; 174 } 175 else if (operation instanceof ExtendedOperation) 176 { 177 return new ExtendedOperationWrapper((ExtendedOperation) operation) 178 { 179 @Override 180 public void run() 181 { 182 runWrapped(operation); 183 } 184 }; 185 } 186 else if (operation instanceof ModifyDNOperation) 187 { 188 return new ModifyDNOperationWrapper((ModifyDNOperation) operation) 189 { 190 @Override 191 public void run() 192 { 193 runWrapped(operation); 194 } 195 }; 196 } 197 else if (operation instanceof ModifyOperation) 198 { 199 return new ModifyOperationWrapper((ModifyOperation) operation) 200 { 201 @Override 202 public void run() 203 { 204 runWrapped(operation); 205 } 206 }; 207 } 208 else if (operation instanceof SearchOperation) 209 { 210 return new SearchOperationWrapper((SearchOperation) operation) 211 { 212 @Override 213 public void run() 214 { 215 runWrapped(operation); 216 } 217 }; 218 } 219 else if (operation instanceof UnbindOperation) 220 { 221 return new UnbindOperationWrapper((UnbindOperation) operation) 222 { 223 @Override 224 public void run() 225 { 226 runWrapped(operation); 227 } 228 }; 229 } 230 else 231 { 232 throw new RuntimeException( 233 "Not implemented for " + operation == null ? null : operation 234 .getClass().getName()); 235 } 236 } 237 238 /** 239 * Execute the provided operation and decrement the number of currently 240 * running operations after it has finished executing. 241 * 242 * @param the 243 * operation to execute 244 */ 245 private void runWrapped(final Operation operation) 246 { 247 try 248 { 249 operation.run(); 250 } 251 finally 252 { 253 nbRunningOperations.decrementAndGet(); 254 } 255 } 256}