001/** 002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. 003 * 004 * Copyright (c) 2005 Sun Microsystems Inc. All Rights Reserved 005 * 006 * The contents of this file are subject to the terms 007 * of the Common Development and Distribution License 008 * (the License). You may not use this file except in 009 * compliance with the License. 010 * 011 * You can obtain a copy of the License at 012 * https://opensso.dev.java.net/public/CDDLv1.0.html or 013 * opensso/legal/CDDLv1.0.txt 014 * See the License for the specific language governing 015 * permission and limitations under the License. 016 * 017 * When distributing Covered Code, include this CDDL 018 * Header Notice in each file and include the License file 019 * at opensso/legal/CDDLv1.0.txt. 020 * If applicable, add the following below the CDDL Header, 021 * with the fields enclosed by brackets [] replaced by 022 * your own identifying information: 023 * "Portions Copyrighted [year] [name of copyright owner]" 024 * 025 * $Id: ThreadPool.java,v 1.10 2008/10/04 00:11:46 arviranga Exp $ 026 * 027 */ 028/** 029 * Portions Copyrighted 2015 ForgeRock AS. 030 */ 031 032package com.iplanet.am.util; 033 034import com.sun.identity.shared.debug.Debug; 035import org.forgerock.openam.audit.context.AuditRequestContextPropagatingRunnable; 036 037/** 038 * <p> 039 * This thread pool maintains a number of threads that run the tasks from a task 040 * queue one by one. The tasks are handled in asynchronous mode, which means it 041 * will not block the main thread to proceed while the task is being processed 042 * by the thread pool. 043 * <p> 044 * This thread pool has a fixed size of threads. It maintains all the tasks to 045 * be executed in a task queue. Each thread then in turn gets a task from the 046 * queue to execute. If the tasks in the task queue reaches a certain number(the 047 * threshold value), it will log an error message and ignore the new incoming 048 * tasks until the number of un-executed tasks is less than the threshold value. 049 * This guarantees the thread pool will not use up the system resources under 050 * heavy load. 051 * @supported.all.api 052 */ 053public class ThreadPool { 054 055 private int poolSize; 056 private int threshold; 057 private String poolName; 058 private Debug debug; 059 private java.util.ArrayList taskList; 060 private int busyThreadCount; 061 private int currentThreadCount; 062 private boolean shutdownThePool; 063 private boolean daemon; 064 private WorkerThread[] threads; 065 066 /** 067 * Constructs a thread pool with given parameters. 068 * 069 * @param name 070 * name of the thread pool. 071 * @param poolSize 072 * the thread pool size, indicates how many threads are created 073 * in the pool. 074 * @param threshold 075 * the maximum size of the task queue in the thread pool. 076 * @param daemon 077 * set the threads as daemon if true; otherwise if not. 078 * @param debug 079 * Debug object to send debugging message to. 080 */ 081 public ThreadPool(String name, int poolSize, int threshold, boolean daemon, 082 Debug debug) { 083 this.debug = debug; 084 this.poolSize = poolSize; 085 this.threshold = threshold; 086 this.poolName = name; 087 if (threshold > 0) { 088 // initialize the size of the ArrayList, it doesn't need to expand 089 // during runtime. 090 this.taskList = new java.util.ArrayList(threshold); 091 } else { 092 this.taskList = new java.util.ArrayList(); 093 } 094 this.busyThreadCount = 0; 095 this.currentThreadCount = 0; 096 this.daemon = daemon; 097 this.shutdownThePool = false; 098 this.threads = new WorkerThread[poolSize]; 099 if (debug.messageEnabled()) { 100 debug.message("Initiating login thread pool size = " 101 + this.poolSize + "\nThreshold = " + threshold); 102 } 103 synchronized (this) { 104 createThreads(poolSize); 105 } 106 } 107 108 /** 109 * Create thread for the pool. 110 * 111 * @param threadsToCreate number of threads of the pool after creation 112 */ 113 protected void createThreads(int threadsToCreate) { 114 if (threadsToCreate > poolSize) { 115 threadsToCreate = poolSize; 116 } 117 for (int i = currentThreadCount; i < threadsToCreate; i++) { 118 threads[i - busyThreadCount] = new WorkerThread(poolName, this); 119 threads[i - busyThreadCount].setDaemon(daemon); 120 threads[i - busyThreadCount].start(); 121 } 122 currentThreadCount = threadsToCreate; 123 } 124 125 private WorkerThread getAvailableThread() { 126 WorkerThread t = null; 127 synchronized (this) { 128 if (currentThreadCount == busyThreadCount) { 129 createThreads(poolSize); 130 } 131 // get threads from the end of the array 132 t = threads[currentThreadCount - busyThreadCount - 1]; 133 threads[currentThreadCount - busyThreadCount - 1] = null; 134 busyThreadCount++; 135 } 136 return t; 137 } 138 139 /** 140 * Runs a user defined task. 141 * 142 * @param task 143 * user defined task. 144 * @throws ThreadPoolException 145 */ 146 public final void run(Runnable task) throws ThreadPoolException 147 { 148 WorkerThread t = null; 149 synchronized (this) { 150 if (shutdownThePool) { 151 // No more tasks will be accepted 152 throw new ThreadPoolException(poolName + 153 " thread pool's being shutdown."); 154 } 155 if (busyThreadCount == poolSize) { 156 if ((threshold > 0) && (taskList.size() >= threshold)) { 157 throw new ThreadPoolException(poolName + 158 " thread pool's task queue is full."); 159 } else { 160 taskList.add(wrap(task)); 161 } 162 } 163 else{ 164 t = getAvailableThread(); 165 } 166 } 167 if ((t != null) && (task != null)) { 168 t.runTask(wrap(task)); 169 } 170 } 171 172 private Runnable wrap(Runnable delegate) { 173 return new AuditRequestContextPropagatingRunnable(delegate); 174 } 175 176 protected synchronized void deductCurrentThreadCount(){ 177 currentThreadCount--; 178 busyThreadCount--; 179 if (!taskList.isEmpty()) { 180 WorkerThread t = getAvailableThread(); 181 t.runTask((Runnable)taskList.remove(0)); 182 } else { 183 if (shutdownThePool && (busyThreadCount == 0)) { 184 notify(); 185 } 186 } 187 } 188 189 // return the thread to the thread pool 190 protected synchronized void returnThread(WorkerThread t) { 191 if (!taskList.isEmpty()){ 192 t.runTask((Runnable)taskList.remove(0)); 193 } 194 else{ 195 if(shutdownThePool) { 196 t.terminate(); 197 // notify the thread pool when all threads are backed 198 // need to discuss whether the thread pool need to wait until 199 // all threads are terminated. For stand alone application, the 200 // answer is yes, however, our application is run under web 201 // container. The reason why we need shutdown because it has a 202 // parameter daemon in the constructor, if it is set to false, 203 // the old implementation has no way to stop the running 204 // threads. For the new implementation, if daemon is set to 205 // false, it is necessary to call shutdown. If daemon is set to 206 // true, it is nice to call it because the thread pool has 207 // better knownledge than the web container to stop the threads 208 // in the pool. 209 busyThreadCount--; 210 currentThreadCount--; 211 if(busyThreadCount == 0){ 212 notify(); 213 } 214 } else { 215 busyThreadCount--; 216 // return threads from the end of array 217 threads[currentThreadCount - busyThreadCount - 1] = t; 218 } 219 } 220 } 221 222 // terminate all the threads since the pass-in parameter of daemon may be 223 // false 224 public synchronized void shutdown() { 225 if(!shutdownThePool) { 226 shutdownThePool = true; 227 // If daemon thread, discard the remaining tasks 228 // else, wait for all tasks to be completed 229 if (daemon) { 230 taskList.clear(); 231 } else { 232 while (!taskList.isEmpty()) { 233 try { 234 // wait if there are tasks & threads to be executed 235 wait(); 236 } catch (Exception ex) { 237 debug.error("ThreadPool.shutdown Excetion while " + 238 "waiting for tasks/threads to complete", ex); 239 } 240 } 241 } 242 for(int i = 0; i < currentThreadCount - busyThreadCount; i++) { 243 // terminate the thread from the beginning of the array 244 if (threads[i] != null) { 245 threads[i].terminate(); 246 } 247 } 248 while(busyThreadCount != 0){ 249 try{ 250 // wait if there are threads running, it will be notified 251 // when they all back. 252 wait(); 253 } catch (Exception ex) { 254 ex.printStackTrace(); 255 } 256 } 257 currentThreadCount = busyThreadCount = 0; 258 threads = null; 259 } 260 } 261 262 // for test only 263 public synchronized int getCurrentThreadCount() { 264 return currentThreadCount; 265 } 266 267 /* 268 * Returns the size of the task list. 269 */ 270 public int getCurrentSize() { 271 return taskList.size(); 272 } 273 274 // private thread class that fetches tasks from the task queue and 275 // executes them. 276 private class WorkerThread extends Thread { 277 278 private Runnable task = null; 279 private ThreadPool pool; 280 private boolean needReturn; 281 private boolean shouldTerminate; 282 283 public WorkerThread(String name, ThreadPool pool) { 284 setName(name); 285 this.pool = pool; 286 this.shouldTerminate = false; 287 this.needReturn = true; 288 } 289 290 /** 291 * Starts the thread pool. 292 */ 293 public void run() { 294 boolean localShouldTerminate = false; 295 Runnable localTask = null; 296 WorkerThread t = this; 297 while (true) { 298 try{ 299 synchronized (this) { 300 while ((task == null) && (!shouldTerminate)){ 301 this.wait(); 302 } 303 // need a local copy because they may be changed after 304 // leaving synchronized block. 305 localShouldTerminate = shouldTerminate; 306 localTask = task; 307 task = null; 308 } 309 if (localShouldTerminate) { 310 // we may need to log something here! 311 break; 312 } 313 if(localTask != null){ 314 localTask.run(); 315 } 316 } catch (RuntimeException ex) { 317 debug.error("Running task " + task, ex); 318 // decide what to log here 319 pool.deductCurrentThreadCount(); 320 localShouldTerminate = true; 321 needReturn = false; 322 } catch (Exception ex) { 323 // don't need to rethrow 324 debug.error("Running task " + task, ex); 325 } catch (Throwable e) { 326 debug.error("Running task " + task, e); 327 // decide what to log here 328 pool.deductCurrentThreadCount(); 329 localShouldTerminate = true; 330 needReturn = false; 331 // rethrow Error here 332 throw new Error(e); 333 } finally { 334 // the thread may has returned already if shutdown is 335 // called. 336 if (needReturn) { 337 pool.returnThread(t); 338 } 339 } 340 if (localShouldTerminate) { 341 // we may need to log something here! 342 break; 343 } 344 } 345 } 346 347 public synchronized void runTask(Runnable toRun) { 348 this.task = toRun; 349 // Although the thread may not in wait state when this function 350 // is called (the taskList is not empty), it doesn't hurt to 351 // call it. getState method can check whether the Thread is 352 // waiting, but it is available in jdk1.5 or newer. 353 this.notify(); 354 } 355 356 // terminate the thread pool when daemon is set to false 357 // it is better to have a way to terminate the thread pool 358 public synchronized void terminate() { 359 shouldTerminate = true; 360 needReturn = false; 361 this.notify(); 362 } 363 } 364}