001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2013-2016 ForgeRock AS. 016 */ 017package org.opends.server.api; 018 019import static org.opends.messages.CoreMessages.*; 020 021import org.forgerock.i18n.slf4j.LocalizedLogger; 022import org.forgerock.i18n.LocalizableMessage; 023import org.forgerock.opendj.server.config.server.WorkQueueCfg; 024import org.forgerock.opendj.config.server.ConfigException; 025import org.opends.server.types.DirectoryException; 026import org.opends.server.types.InitializationException; 027import org.opends.server.types.Operation; 028import org.opends.server.util.Platform; 029 030/** 031 * This class defines the structure and methods that must be 032 * implemented by a Directory Server work queue. The work queue is 033 * the component of the server that accepts requests from connection 034 * handlers and ensures that they are properly processed. The manner 035 * in which the work queue is able to accomplish this may vary between 036 * implementations, but in general it is assumed that one or more 037 * worker threads will be associated with the queue and may be used to 038 * process requests in parallel. 039 * 040 * @param <T> The type of configuration handled by this work queue. 041 */ 042@org.opends.server.types.PublicAPI( 043 stability=org.opends.server.types.StabilityLevel.VOLATILE, 044 mayInstantiate=false, 045 mayExtend=true, 046 mayInvoke=true) 047public abstract class WorkQueue<T extends WorkQueueCfg> 048{ 049 050 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 051 052 /** 053 * Initializes this work queue based on the information in the 054 * provided configuration entry. 055 * 056 * @param configuration The configuration to use to initialize 057 * the work queue. 058 * 059 * @throws ConfigException If the provided configuration entry 060 * does not have a valid work queue 061 * configuration. 062 * 063 * @throws InitializationException If a problem occurs during 064 * initialization that is not 065 * related to the server 066 * configuration. 067 */ 068 public abstract void initializeWorkQueue(T configuration) 069 throws ConfigException, InitializationException; 070 071 072 073 /** 074 * Performs any necessary finalization for this work queue, 075 * including ensuring that all active operations are interrupted or 076 * will be allowed to complete, and that all pending operations will 077 * be cancelled. 078 * 079 * @param reason The human-readable reason that the work queue is 080 * being shut down. 081 */ 082 public abstract void finalizeWorkQueue(LocalizableMessage reason); 083 084 085 086 /** 087 * Submits an operation to be processed in the server. 088 * 089 * @param operation The operation to be processed. 090 * 091 * @throws DirectoryException If the provided operation is not 092 * accepted for some reason (e.g., if 093 * the server is shutting down or 094 * already has too many pending 095 * requests in the queue). 096 */ 097 public abstract void submitOperation(Operation operation) 098 throws DirectoryException; 099 100 101 102 /** 103 * Tries to submit an operation to be processed in the server, without 104 * blocking. 105 * 106 * @param operation 107 * The operation to be processed. 108 * @return true if the operation could be submitted to the queue, false if the 109 * queue was full 110 * @throws DirectoryException 111 * If the provided operation is not accepted for some reason (e.g., 112 * if the server is shutting down). 113 */ 114 public abstract boolean trySubmitOperation(Operation operation) 115 throws DirectoryException; 116 117 118 /** 119 * Indicates whether the work queue is currently processing any 120 * requests. Note that this is a point-in-time determination, and 121 * if any component of the server wishes to depend on a quiescent 122 * state then it should use some external mechanism to ensure that 123 * no other requests are submitted to the queue. 124 * 125 * @return {@code true} if the work queue is currently idle, or 126 * {@code false} if it is being used to process one or more 127 * operations. 128 */ 129 public abstract boolean isIdle(); 130 131 132 /** 133 * Return the maximum number of worker threads that can be used by this 134 * WorkQueue (The WorkQueue could have a thread pool which adjusts its size). 135 * 136 * @return the maximum number of worker threads that can be used by this 137 * WorkQueue 138 */ 139 public abstract int getNumWorkerThreads(); 140 141 142 /** 143 * Computes the number of worker threads to use by the working queue based on 144 * the configured number. 145 * 146 * @param configuredNumWorkerThreads 147 * the configured number of worker threads to use 148 * @return the number of worker threads to use 149 */ 150 protected int computeNumWorkerThreads(Integer configuredNumWorkerThreads) 151 { 152 if (configuredNumWorkerThreads != null) 153 { 154 return configuredNumWorkerThreads; 155 } 156 else 157 { 158 // Automatically choose based on the number of processors. 159 int value = Platform.computeNumberOfThreads(16, 2.0f); 160 logger.debug(INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL, value); 161 return value; 162 } 163 } 164 165 /** 166 * Waits for the work queue to become idle before returning. Note 167 * that this is a point-in-time determination, and if any component 168 * of the server wishes to depend on a quiescent state then it 169 * should use some external mechanism to ensure that no other 170 * requests are submitted to the queue. 171 * 172 * @param timeLimit The maximum length of time in milliseconds 173 * that this method should wait for the queue to 174 * become idle before giving up. A time limit 175 * that is less than or equal to zero indicates 176 * that there should not be a time limit. 177 * 178 * @return {@code true} if the work queue is idle at the time that 179 * this method returns, or {@code false} if the wait time 180 * limit was reached before the server became idle. 181 */ 182 public boolean waitUntilIdle(long timeLimit) 183 { 184 long stopWaitingTime; 185 if (timeLimit <= 0) 186 { 187 stopWaitingTime = Long.MAX_VALUE; 188 } 189 else 190 { 191 stopWaitingTime = System.currentTimeMillis() + timeLimit; 192 } 193 194 while (System.currentTimeMillis() < stopWaitingTime) 195 { 196 if (isIdle()) 197 { 198 return true; 199 } 200 201 try 202 { 203 Thread.sleep(1); 204 } catch (InterruptedException ie) {} 205 } 206 207 return false; 208 } 209} 210