001/**
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003 *
004 * Copyright (c) 2005 Sun Microsystems Inc. All Rights Reserved
005 *
006 * The contents of this file are subject to the terms
007 * of the Common Development and Distribution License
008 * (the License). You may not use this file except in
009 * compliance with the License.
010 *
011 * You can obtain a copy of the License at
012 * https://opensso.dev.java.net/public/CDDLv1.0.html or
013 * opensso/legal/CDDLv1.0.txt
014 * See the License for the specific language governing
015 * permission and limitations under the License.
016 *
017 * When distributing Covered Code, include this CDDL
018 * Header Notice in each file and include the License file
019 * at opensso/legal/CDDLv1.0.txt.
020 * If applicable, add the following below the CDDL Header,
021 * with the fields enclosed by brackets [] replaced by
022 * your own identifying information:
023 * "Portions Copyrighted [year] [name of copyright owner]"
024 *
025 * $Id: ThreadPool.java,v 1.10 2008/10/04 00:11:46 arviranga Exp $
026 *
027 */
028
029package com.iplanet.am.util;
030
031import com.sun.identity.shared.debug.Debug;
032
033/**
034 * <p>
035 * This thread pool maintains a number of threads that run the tasks from a task
036 * queue one by one. The tasks are handled in asynchronous mode, which means it
037 * will not block the main thread to proceed while the task is being processed
038 * by the thread pool.
039 * <p>
040 * This thread pool has a fixed size of threads. It maintains all the tasks to
041 * be executed in a task queue. Each thread then in turn gets a task from the
042 * queue to execute. If the tasks in the task queue reaches a certain number(the
043 * threshold value), it will log an error message and ignore the new incoming
044 * tasks until the number of un-executed tasks is less than the threshold value.
045 * This guarantees the thread pool will not use up the system resources under
046 * heavy load.
047 * @supported.all.api
048 */
049public class ThreadPool {
050
051    private int poolSize;
052    private int threshold;
053    private String poolName;
054    private Debug debug;
055    private java.util.ArrayList taskList;
056    private int busyThreadCount;
057    private int currentThreadCount;
058    private boolean shutdownThePool;
059    private boolean daemon;
060    private WorkerThread[] threads;
061
062    /**
063     * Constructs a thread pool with given parameters.
064     * 
065     * @param name
066     *            name of the thread pool.
067     * @param poolSize
068     *            the thread pool size, indicates how many threads are created
069     *            in the pool.
070     * @param threshold
071     *            the maximum size of the task queue in the thread pool.
072     * @param daemon
073     *            set the threads as daemon if true; otherwise if not.
074     * @param debug
075     *            Debug object to send debugging message to.
076     */
077    public ThreadPool(String name, int poolSize, int threshold, boolean daemon,
078        Debug debug) {
079        this.debug = debug;
080        this.poolSize = poolSize;
081        this.threshold = threshold;
082        this.poolName = name;
083        if (threshold > 0) {
084            // initialize the size of the ArrayList, it doesn't need to expand
085            // during runtime.
086            this.taskList = new java.util.ArrayList(threshold);
087        } else {
088            this.taskList = new java.util.ArrayList();
089        }
090        this.busyThreadCount = 0;
091        this.currentThreadCount = 0;
092        this.daemon = daemon;
093        this.shutdownThePool = false;
094        this.threads = new WorkerThread[poolSize];
095        if (debug.messageEnabled()) {
096            debug.message("Initiating login thread pool size = "
097                    + this.poolSize + "\nThreshold = " + threshold);
098        }
099        synchronized (this) {
100            createThreads(poolSize);
101        }
102    }
103    
104    /**
105     * Create thread for the pool.
106     *
107     * @param threadsToCreate number of threads of the pool after creation
108     */
109    protected void createThreads(int threadsToCreate) {
110        if (threadsToCreate > poolSize) {
111            threadsToCreate = poolSize;
112        }
113        for (int i = currentThreadCount; i < threadsToCreate; i++) {
114            threads[i - busyThreadCount] = new WorkerThread(poolName, this);
115            threads[i - busyThreadCount].setDaemon(daemon);
116            threads[i - busyThreadCount].start();
117        }
118        currentThreadCount = threadsToCreate;
119    }
120    
121    private WorkerThread getAvailableThread() {
122        WorkerThread t = null;
123        synchronized (this) {
124            if (currentThreadCount == busyThreadCount) {
125                createThreads(poolSize);
126            }
127            // get threads from the end of the array
128            t = threads[currentThreadCount - busyThreadCount - 1];
129            threads[currentThreadCount - busyThreadCount - 1] = null;
130            busyThreadCount++;
131        }
132        return t;
133    }
134
135    /**
136     * Runs a user defined task.
137     * 
138     * @param task
139     *            user defined task.
140     * @throws ThreadPoolException
141     */
142    public final void run(Runnable task) throws ThreadPoolException 
143    {
144        WorkerThread t = null;
145        synchronized (this) {
146            if (shutdownThePool) {
147                // No more tasks will be accepted
148                throw new ThreadPoolException(poolName +
149                    " thread pool's being shutdown.");
150            }
151            if (busyThreadCount == poolSize) {
152                if ((threshold > 0) && (taskList.size() >= threshold)) {
153                    throw new ThreadPoolException(poolName + 
154                        " thread pool's task queue is full.");
155                } else {
156                    taskList.add(task);
157                }
158            }
159            else{
160                t = getAvailableThread();
161            }
162        }
163        if ((t != null) && (task != null)) {
164            t.runTask(task);
165        }
166    }
167
168    protected synchronized void deductCurrentThreadCount(){
169        currentThreadCount--;
170        busyThreadCount--;
171        if (!taskList.isEmpty()) {
172            WorkerThread t = getAvailableThread();
173            t.runTask((Runnable)taskList.remove(0));
174        } else {
175            if (shutdownThePool && (busyThreadCount == 0)) {
176                notify();
177            }
178        }
179    }
180    
181    // return the thread to the thread pool
182    protected synchronized void returnThread(WorkerThread t) {
183        if (!taskList.isEmpty()){
184            t.runTask((Runnable)taskList.remove(0));
185        }
186        else{
187            if(shutdownThePool) {
188                t.terminate();
189                // notify the thread pool when all threads are backed
190                // need to discuss whether the thread pool need to wait until
191                // all threads are terminated.  For stand alone application, the
192                // answer is yes, however, our application is run under web
193                // container. The reason why we need shutdown because it has a
194                // parameter daemon in the constructor, if it is set to false,
195                // the old implementation has no way to stop the running
196                // threads. For the new implementation, if daemon is set to
197                // false, it is necessary to call shutdown.  If daemon is set to
198                // true, it is nice to call it because the thread pool has
199                // better knownledge than the web container to stop the threads
200                // in the pool.
201                busyThreadCount--;
202                currentThreadCount--;
203                if(busyThreadCount == 0){
204                    notify();
205                }
206            } else {
207                busyThreadCount--;
208                // return threads from the end of array
209                threads[currentThreadCount - busyThreadCount - 1] = t;
210            }
211        }
212    }
213    
214    // terminate all the threads since the pass-in parameter of daemon may be
215    // false
216    public synchronized void shutdown() {
217        if(!shutdownThePool) {
218            shutdownThePool = true;
219            // If daemon thread, discard the remaining tasks
220            // else, wait for all tasks to be completed
221            if (daemon) {
222                taskList.clear();
223            } else {
224                while (!taskList.isEmpty()) {
225                    try {
226                        // wait if there are tasks & threads to be executed
227                        wait();
228                    } catch (Exception ex) {
229                        debug.error("ThreadPool.shutdown Excetion while " +
230                            "waiting for tasks/threads to complete", ex);
231                    }
232                }
233            }
234            for(int i = 0; i < currentThreadCount - busyThreadCount; i++) {
235                // terminate the thread from the beginning of the array
236                if (threads[i] != null) {
237                    threads[i].terminate();
238                }
239            }
240            while(busyThreadCount != 0){
241                try{
242                    // wait if there are threads running, it will be notified
243                    // when they all back.
244                    wait();
245                } catch (Exception ex) {
246                    ex.printStackTrace();
247                }
248            }
249            currentThreadCount = busyThreadCount = 0;
250            threads = null;
251        }
252    }
253    
254    // for test only
255    public synchronized int getCurrentThreadCount() {
256        return currentThreadCount;
257    }
258    
259    /*
260     * Returns the size of the task list.
261     */
262    public int getCurrentSize() {
263        return taskList.size();
264    }
265    
266    // private thread class that fetches tasks from the task queue and
267    // executes them.
268    private class WorkerThread extends Thread {
269        
270        private Runnable task = null;
271        private ThreadPool pool;
272        private boolean needReturn;
273        private boolean shouldTerminate;
274        
275        public WorkerThread(String name, ThreadPool pool) {
276            setName(name);
277            this.pool = pool;
278            this.shouldTerminate = false;
279            this.needReturn = true;
280        }
281 
282        /**
283         * Starts the thread pool.
284         */
285        public void run() {
286            boolean localShouldTerminate = false;
287            Runnable localTask = null;
288            WorkerThread t = this;
289            while (true) {
290                try{
291                    synchronized (this) {
292                        while ((task == null) && (!shouldTerminate)){
293                            this.wait();
294                        }
295                        // need a local copy because they may be changed after
296                        // leaving synchronized block.
297                        localShouldTerminate = shouldTerminate;
298                        localTask = task;
299                        task = null;
300                    }
301                    if (localShouldTerminate) {
302                        // we may need to log something here!
303                        break;
304                    }
305                    if(localTask != null){
306                        localTask.run();
307                    }
308                } catch (RuntimeException ex) {
309                    debug.error("Running task " + task, ex);
310                    // decide what to log here
311                    pool.deductCurrentThreadCount();
312                    localShouldTerminate = true;
313                    needReturn = false;
314                } catch (Exception ex) {
315                    // don't need to rethrow
316                    debug.error("Running task " + task, ex);
317                } catch (Throwable e) {
318                    debug.error("Running task " + task, e);
319                    // decide what to log here
320                    pool.deductCurrentThreadCount();
321                    localShouldTerminate = true;
322                    needReturn = false;
323                    // rethrow Error here
324                    throw new Error(e);
325                } finally {
326                    // the thread may has returned already if shutdown is
327                    // called.
328                    if (needReturn) {
329                        pool.returnThread(t);
330                    }
331                }
332                if (localShouldTerminate) {
333                    // we may need to log something here!
334                    break;
335                }
336            }
337        }
338    
339        public synchronized void runTask(Runnable toRun) {
340            this.task = toRun;
341            // Although the thread may not in wait state when this function
342            // is called (the taskList is not empty), it doesn't hurt to
343            // call it.  getState method can check whether the Thread is
344            // waiting, but it is available in jdk1.5 or newer.
345            this.notify();
346        }
347        
348        // terminate the thread pool when daemon is set to false
349        // it is better to have a way to terminate the thread pool
350        public synchronized void terminate() {
351            shouldTerminate = true;
352            needReturn = false;
353            this.notify();
354        }
355    }
356}