001/** 002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. 003 * 004 * Copyright (c) 2005 Sun Microsystems Inc. All Rights Reserved 005 * 006 * The contents of this file are subject to the terms 007 * of the Common Development and Distribution License 008 * (the License). You may not use this file except in 009 * compliance with the License. 010 * 011 * You can obtain a copy of the License at 012 * https://opensso.dev.java.net/public/CDDLv1.0.html or 013 * opensso/legal/CDDLv1.0.txt 014 * See the License for the specific language governing 015 * permission and limitations under the License. 016 * 017 * When distributing Covered Code, include this CDDL 018 * Header Notice in each file and include the License file 019 * at opensso/legal/CDDLv1.0.txt. 020 * If applicable, add the following below the CDDL Header, 021 * with the fields enclosed by brackets [] replaced by 022 * your own identifying information: 023 * "Portions Copyrighted [year] [name of copyright owner]" 024 * 025 * $Id: ThreadPool.java,v 1.10 2008/10/04 00:11:46 arviranga Exp $ 026 * 027 */ 028 029package com.iplanet.am.util; 030 031import com.sun.identity.shared.debug.Debug; 032 033/** 034 * <p> 035 * This thread pool maintains a number of threads that run the tasks from a task 036 * queue one by one. The tasks are handled in asynchronous mode, which means it 037 * will not block the main thread to proceed while the task is being processed 038 * by the thread pool. 039 * <p> 040 * This thread pool has a fixed size of threads. It maintains all the tasks to 041 * be executed in a task queue. Each thread then in turn gets a task from the 042 * queue to execute. If the tasks in the task queue reaches a certain number(the 043 * threshold value), it will log an error message and ignore the new incoming 044 * tasks until the number of un-executed tasks is less than the threshold value. 045 * This guarantees the thread pool will not use up the system resources under 046 * heavy load. 047 * @supported.all.api 048 */ 049public class ThreadPool { 050 051 private int poolSize; 052 private int threshold; 053 private String poolName; 054 private Debug debug; 055 private java.util.ArrayList taskList; 056 private int busyThreadCount; 057 private int currentThreadCount; 058 private boolean shutdownThePool; 059 private boolean daemon; 060 private WorkerThread[] threads; 061 062 /** 063 * Constructs a thread pool with given parameters. 064 * 065 * @param name 066 * name of the thread pool. 067 * @param poolSize 068 * the thread pool size, indicates how many threads are created 069 * in the pool. 070 * @param threshold 071 * the maximum size of the task queue in the thread pool. 072 * @param daemon 073 * set the threads as daemon if true; otherwise if not. 074 * @param debug 075 * Debug object to send debugging message to. 076 */ 077 public ThreadPool(String name, int poolSize, int threshold, boolean daemon, 078 Debug debug) { 079 this.debug = debug; 080 this.poolSize = poolSize; 081 this.threshold = threshold; 082 this.poolName = name; 083 if (threshold > 0) { 084 // initialize the size of the ArrayList, it doesn't need to expand 085 // during runtime. 086 this.taskList = new java.util.ArrayList(threshold); 087 } else { 088 this.taskList = new java.util.ArrayList(); 089 } 090 this.busyThreadCount = 0; 091 this.currentThreadCount = 0; 092 this.daemon = daemon; 093 this.shutdownThePool = false; 094 this.threads = new WorkerThread[poolSize]; 095 if (debug.messageEnabled()) { 096 debug.message("Initiating login thread pool size = " 097 + this.poolSize + "\nThreshold = " + threshold); 098 } 099 synchronized (this) { 100 createThreads(poolSize); 101 } 102 } 103 104 /** 105 * Create thread for the pool. 106 * 107 * @param threadsToCreate number of threads of the pool after creation 108 */ 109 protected void createThreads(int threadsToCreate) { 110 if (threadsToCreate > poolSize) { 111 threadsToCreate = poolSize; 112 } 113 for (int i = currentThreadCount; i < threadsToCreate; i++) { 114 threads[i - busyThreadCount] = new WorkerThread(poolName, this); 115 threads[i - busyThreadCount].setDaemon(daemon); 116 threads[i - busyThreadCount].start(); 117 } 118 currentThreadCount = threadsToCreate; 119 } 120 121 private WorkerThread getAvailableThread() { 122 WorkerThread t = null; 123 synchronized (this) { 124 if (currentThreadCount == busyThreadCount) { 125 createThreads(poolSize); 126 } 127 // get threads from the end of the array 128 t = threads[currentThreadCount - busyThreadCount - 1]; 129 threads[currentThreadCount - busyThreadCount - 1] = null; 130 busyThreadCount++; 131 } 132 return t; 133 } 134 135 /** 136 * Runs a user defined task. 137 * 138 * @param task 139 * user defined task. 140 * @throws ThreadPoolException 141 */ 142 public final void run(Runnable task) throws ThreadPoolException 143 { 144 WorkerThread t = null; 145 synchronized (this) { 146 if (shutdownThePool) { 147 // No more tasks will be accepted 148 throw new ThreadPoolException(poolName + 149 " thread pool's being shutdown."); 150 } 151 if (busyThreadCount == poolSize) { 152 if ((threshold > 0) && (taskList.size() >= threshold)) { 153 throw new ThreadPoolException(poolName + 154 " thread pool's task queue is full."); 155 } else { 156 taskList.add(task); 157 } 158 } 159 else{ 160 t = getAvailableThread(); 161 } 162 } 163 if ((t != null) && (task != null)) { 164 t.runTask(task); 165 } 166 } 167 168 protected synchronized void deductCurrentThreadCount(){ 169 currentThreadCount--; 170 busyThreadCount--; 171 if (!taskList.isEmpty()) { 172 WorkerThread t = getAvailableThread(); 173 t.runTask((Runnable)taskList.remove(0)); 174 } else { 175 if (shutdownThePool && (busyThreadCount == 0)) { 176 notify(); 177 } 178 } 179 } 180 181 // return the thread to the thread pool 182 protected synchronized void returnThread(WorkerThread t) { 183 if (!taskList.isEmpty()){ 184 t.runTask((Runnable)taskList.remove(0)); 185 } 186 else{ 187 if(shutdownThePool) { 188 t.terminate(); 189 // notify the thread pool when all threads are backed 190 // need to discuss whether the thread pool need to wait until 191 // all threads are terminated. For stand alone application, the 192 // answer is yes, however, our application is run under web 193 // container. The reason why we need shutdown because it has a 194 // parameter daemon in the constructor, if it is set to false, 195 // the old implementation has no way to stop the running 196 // threads. For the new implementation, if daemon is set to 197 // false, it is necessary to call shutdown. If daemon is set to 198 // true, it is nice to call it because the thread pool has 199 // better knownledge than the web container to stop the threads 200 // in the pool. 201 busyThreadCount--; 202 currentThreadCount--; 203 if(busyThreadCount == 0){ 204 notify(); 205 } 206 } else { 207 busyThreadCount--; 208 // return threads from the end of array 209 threads[currentThreadCount - busyThreadCount - 1] = t; 210 } 211 } 212 } 213 214 // terminate all the threads since the pass-in parameter of daemon may be 215 // false 216 public synchronized void shutdown() { 217 if(!shutdownThePool) { 218 shutdownThePool = true; 219 // If daemon thread, discard the remaining tasks 220 // else, wait for all tasks to be completed 221 if (daemon) { 222 taskList.clear(); 223 } else { 224 while (!taskList.isEmpty()) { 225 try { 226 // wait if there are tasks & threads to be executed 227 wait(); 228 } catch (Exception ex) { 229 debug.error("ThreadPool.shutdown Excetion while " + 230 "waiting for tasks/threads to complete", ex); 231 } 232 } 233 } 234 for(int i = 0; i < currentThreadCount - busyThreadCount; i++) { 235 // terminate the thread from the beginning of the array 236 if (threads[i] != null) { 237 threads[i].terminate(); 238 } 239 } 240 while(busyThreadCount != 0){ 241 try{ 242 // wait if there are threads running, it will be notified 243 // when they all back. 244 wait(); 245 } catch (Exception ex) { 246 ex.printStackTrace(); 247 } 248 } 249 currentThreadCount = busyThreadCount = 0; 250 threads = null; 251 } 252 } 253 254 // for test only 255 public synchronized int getCurrentThreadCount() { 256 return currentThreadCount; 257 } 258 259 /* 260 * Returns the size of the task list. 261 */ 262 public int getCurrentSize() { 263 return taskList.size(); 264 } 265 266 // private thread class that fetches tasks from the task queue and 267 // executes them. 268 private class WorkerThread extends Thread { 269 270 private Runnable task = null; 271 private ThreadPool pool; 272 private boolean needReturn; 273 private boolean shouldTerminate; 274 275 public WorkerThread(String name, ThreadPool pool) { 276 setName(name); 277 this.pool = pool; 278 this.shouldTerminate = false; 279 this.needReturn = true; 280 } 281 282 /** 283 * Starts the thread pool. 284 */ 285 public void run() { 286 boolean localShouldTerminate = false; 287 Runnable localTask = null; 288 WorkerThread t = this; 289 while (true) { 290 try{ 291 synchronized (this) { 292 while ((task == null) && (!shouldTerminate)){ 293 this.wait(); 294 } 295 // need a local copy because they may be changed after 296 // leaving synchronized block. 297 localShouldTerminate = shouldTerminate; 298 localTask = task; 299 task = null; 300 } 301 if (localShouldTerminate) { 302 // we may need to log something here! 303 break; 304 } 305 if(localTask != null){ 306 localTask.run(); 307 } 308 } catch (RuntimeException ex) { 309 debug.error("Running task " + task, ex); 310 // decide what to log here 311 pool.deductCurrentThreadCount(); 312 localShouldTerminate = true; 313 needReturn = false; 314 } catch (Exception ex) { 315 // don't need to rethrow 316 debug.error("Running task " + task, ex); 317 } catch (Throwable e) { 318 debug.error("Running task " + task, e); 319 // decide what to log here 320 pool.deductCurrentThreadCount(); 321 localShouldTerminate = true; 322 needReturn = false; 323 // rethrow Error here 324 throw new Error(e); 325 } finally { 326 // the thread may has returned already if shutdown is 327 // called. 328 if (needReturn) { 329 pool.returnThread(t); 330 } 331 } 332 if (localShouldTerminate) { 333 // we may need to log something here! 334 break; 335 } 336 } 337 } 338 339 public synchronized void runTask(Runnable toRun) { 340 this.task = toRun; 341 // Although the thread may not in wait state when this function 342 // is called (the taskList is not empty), it doesn't hurt to 343 // call it. getState method can check whether the Thread is 344 // waiting, but it is available in jdk1.5 or newer. 345 this.notify(); 346 } 347 348 // terminate the thread pool when daemon is set to false 349 // it is better to have a way to terminate the thread pool 350 public synchronized void terminate() { 351 shouldTerminate = true; 352 needReturn = false; 353 this.notify(); 354 } 355 } 356}