001/**
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003 *
004 * Copyright (c) 2005 Sun Microsystems Inc. All Rights Reserved
005 *
006 * The contents of this file are subject to the terms
007 * of the Common Development and Distribution License
008 * (the License). You may not use this file except in
009 * compliance with the License.
010 *
011 * You can obtain a copy of the License at
012 * https://opensso.dev.java.net/public/CDDLv1.0.html or
013 * opensso/legal/CDDLv1.0.txt
014 * See the License for the specific language governing
015 * permission and limitations under the License.
016 *
017 * When distributing Covered Code, include this CDDL
018 * Header Notice in each file and include the License file
019 * at opensso/legal/CDDLv1.0.txt.
020 * If applicable, add the following below the CDDL Header,
021 * with the fields enclosed by brackets [] replaced by
022 * your own identifying information:
023 * "Portions Copyrighted [year] [name of copyright owner]"
024 *
025 * $Id: ThreadPool.java,v 1.10 2008/10/04 00:11:46 arviranga Exp $
026 *
027 */
028/**
029 * Portions Copyrighted 2015 ForgeRock AS.
030 */
031
032package com.iplanet.am.util;
033
034import com.sun.identity.shared.debug.Debug;
035import org.forgerock.openam.audit.context.AuditRequestContextPropagatingRunnable;
036
037/**
038 * <p>
039 * This thread pool maintains a number of threads that run the tasks from a task
040 * queue one by one. The tasks are handled in asynchronous mode, which means it
041 * will not block the main thread to proceed while the task is being processed
042 * by the thread pool.
043 * <p>
044 * This thread pool has a fixed size of threads. It maintains all the tasks to
045 * be executed in a task queue. Each thread then in turn gets a task from the
046 * queue to execute. If the tasks in the task queue reaches a certain number(the
047 * threshold value), it will log an error message and ignore the new incoming
048 * tasks until the number of un-executed tasks is less than the threshold value.
049 * This guarantees the thread pool will not use up the system resources under
050 * heavy load.
051 * @supported.all.api
052 */
053public class ThreadPool {
054
055    private int poolSize;
056    private int threshold;
057    private String poolName;
058    private Debug debug;
059    private java.util.ArrayList taskList;
060    private int busyThreadCount;
061    private int currentThreadCount;
062    private boolean shutdownThePool;
063    private boolean daemon;
064    private WorkerThread[] threads;
065
066    /**
067     * Constructs a thread pool with given parameters.
068     * 
069     * @param name
070     *            name of the thread pool.
071     * @param poolSize
072     *            the thread pool size, indicates how many threads are created
073     *            in the pool.
074     * @param threshold
075     *            the maximum size of the task queue in the thread pool.
076     * @param daemon
077     *            set the threads as daemon if true; otherwise if not.
078     * @param debug
079     *            Debug object to send debugging message to.
080     */
081    public ThreadPool(String name, int poolSize, int threshold, boolean daemon,
082        Debug debug) {
083        this.debug = debug;
084        this.poolSize = poolSize;
085        this.threshold = threshold;
086        this.poolName = name;
087        if (threshold > 0) {
088            // initialize the size of the ArrayList, it doesn't need to expand
089            // during runtime.
090            this.taskList = new java.util.ArrayList(threshold);
091        } else {
092            this.taskList = new java.util.ArrayList();
093        }
094        this.busyThreadCount = 0;
095        this.currentThreadCount = 0;
096        this.daemon = daemon;
097        this.shutdownThePool = false;
098        this.threads = new WorkerThread[poolSize];
099        if (debug.messageEnabled()) {
100            debug.message("Initiating login thread pool size = "
101                    + this.poolSize + "\nThreshold = " + threshold);
102        }
103        synchronized (this) {
104            createThreads(poolSize);
105        }
106    }
107    
108    /**
109     * Create thread for the pool.
110     *
111     * @param threadsToCreate number of threads of the pool after creation
112     */
113    protected void createThreads(int threadsToCreate) {
114        if (threadsToCreate > poolSize) {
115            threadsToCreate = poolSize;
116        }
117        for (int i = currentThreadCount; i < threadsToCreate; i++) {
118            threads[i - busyThreadCount] = new WorkerThread(poolName, this);
119            threads[i - busyThreadCount].setDaemon(daemon);
120            threads[i - busyThreadCount].start();
121        }
122        currentThreadCount = threadsToCreate;
123    }
124    
125    private WorkerThread getAvailableThread() {
126        WorkerThread t = null;
127        synchronized (this) {
128            if (currentThreadCount == busyThreadCount) {
129                createThreads(poolSize);
130            }
131            // get threads from the end of the array
132            t = threads[currentThreadCount - busyThreadCount - 1];
133            threads[currentThreadCount - busyThreadCount - 1] = null;
134            busyThreadCount++;
135        }
136        return t;
137    }
138
139    /**
140     * Runs a user defined task.
141     * 
142     * @param task
143     *            user defined task.
144     * @throws ThreadPoolException
145     */
146    public final void run(Runnable task) throws ThreadPoolException 
147    {
148        WorkerThread t = null;
149        synchronized (this) {
150            if (shutdownThePool) {
151                // No more tasks will be accepted
152                throw new ThreadPoolException(poolName +
153                    " thread pool's being shutdown.");
154            }
155            if (busyThreadCount == poolSize) {
156                if ((threshold > 0) && (taskList.size() >= threshold)) {
157                    throw new ThreadPoolException(poolName + 
158                        " thread pool's task queue is full.");
159                } else {
160                    taskList.add(wrap(task));
161                }
162            }
163            else{
164                t = getAvailableThread();
165            }
166        }
167        if ((t != null) && (task != null)) {
168            t.runTask(wrap(task));
169        }
170    }
171
172    private Runnable wrap(Runnable delegate) {
173        return new AuditRequestContextPropagatingRunnable(delegate);
174    }
175
176    protected synchronized void deductCurrentThreadCount(){
177        currentThreadCount--;
178        busyThreadCount--;
179        if (!taskList.isEmpty()) {
180            WorkerThread t = getAvailableThread();
181            t.runTask((Runnable)taskList.remove(0));
182        } else {
183            if (shutdownThePool && (busyThreadCount == 0)) {
184                notify();
185            }
186        }
187    }
188    
189    // return the thread to the thread pool
190    protected synchronized void returnThread(WorkerThread t) {
191        if (!taskList.isEmpty()){
192            t.runTask((Runnable)taskList.remove(0));
193        }
194        else{
195            if(shutdownThePool) {
196                t.terminate();
197                // notify the thread pool when all threads are backed
198                // need to discuss whether the thread pool need to wait until
199                // all threads are terminated.  For stand alone application, the
200                // answer is yes, however, our application is run under web
201                // container. The reason why we need shutdown because it has a
202                // parameter daemon in the constructor, if it is set to false,
203                // the old implementation has no way to stop the running
204                // threads. For the new implementation, if daemon is set to
205                // false, it is necessary to call shutdown.  If daemon is set to
206                // true, it is nice to call it because the thread pool has
207                // better knownledge than the web container to stop the threads
208                // in the pool.
209                busyThreadCount--;
210                currentThreadCount--;
211                if(busyThreadCount == 0){
212                    notify();
213                }
214            } else {
215                busyThreadCount--;
216                // return threads from the end of array
217                threads[currentThreadCount - busyThreadCount - 1] = t;
218            }
219        }
220    }
221    
222    // terminate all the threads since the pass-in parameter of daemon may be
223    // false
224    public synchronized void shutdown() {
225        if(!shutdownThePool) {
226            shutdownThePool = true;
227            // If daemon thread, discard the remaining tasks
228            // else, wait for all tasks to be completed
229            if (daemon) {
230                taskList.clear();
231            } else {
232                while (!taskList.isEmpty()) {
233                    try {
234                        // wait if there are tasks & threads to be executed
235                        wait();
236                    } catch (Exception ex) {
237                        debug.error("ThreadPool.shutdown Excetion while " +
238                            "waiting for tasks/threads to complete", ex);
239                    }
240                }
241            }
242            for(int i = 0; i < currentThreadCount - busyThreadCount; i++) {
243                // terminate the thread from the beginning of the array
244                if (threads[i] != null) {
245                    threads[i].terminate();
246                }
247            }
248            while(busyThreadCount != 0){
249                try{
250                    // wait if there are threads running, it will be notified
251                    // when they all back.
252                    wait();
253                } catch (Exception ex) {
254                    ex.printStackTrace();
255                }
256            }
257            currentThreadCount = busyThreadCount = 0;
258            threads = null;
259        }
260    }
261    
262    // for test only
263    public synchronized int getCurrentThreadCount() {
264        return currentThreadCount;
265    }
266    
267    /*
268     * Returns the size of the task list.
269     */
270    public int getCurrentSize() {
271        return taskList.size();
272    }
273    
274    // private thread class that fetches tasks from the task queue and
275    // executes them.
276    private class WorkerThread extends Thread {
277        
278        private Runnable task = null;
279        private ThreadPool pool;
280        private boolean needReturn;
281        private boolean shouldTerminate;
282        
283        public WorkerThread(String name, ThreadPool pool) {
284            setName(name);
285            this.pool = pool;
286            this.shouldTerminate = false;
287            this.needReturn = true;
288        }
289 
290        /**
291         * Starts the thread pool.
292         */
293        public void run() {
294            boolean localShouldTerminate = false;
295            Runnable localTask = null;
296            WorkerThread t = this;
297            while (true) {
298                try{
299                    synchronized (this) {
300                        while ((task == null) && (!shouldTerminate)){
301                            this.wait();
302                        }
303                        // need a local copy because they may be changed after
304                        // leaving synchronized block.
305                        localShouldTerminate = shouldTerminate;
306                        localTask = task;
307                        task = null;
308                    }
309                    if (localShouldTerminate) {
310                        // we may need to log something here!
311                        break;
312                    }
313                    if(localTask != null){
314                        localTask.run();
315                    }
316                } catch (RuntimeException ex) {
317                    debug.error("Running task " + task, ex);
318                    // decide what to log here
319                    pool.deductCurrentThreadCount();
320                    localShouldTerminate = true;
321                    needReturn = false;
322                } catch (Exception ex) {
323                    // don't need to rethrow
324                    debug.error("Running task " + task, ex);
325                } catch (Throwable e) {
326                    debug.error("Running task " + task, e);
327                    // decide what to log here
328                    pool.deductCurrentThreadCount();
329                    localShouldTerminate = true;
330                    needReturn = false;
331                    // rethrow Error here
332                    throw new Error(e);
333                } finally {
334                    // the thread may has returned already if shutdown is
335                    // called.
336                    if (needReturn) {
337                        pool.returnThread(t);
338                    }
339                }
340                if (localShouldTerminate) {
341                    // we may need to log something here!
342                    break;
343                }
344            }
345        }
346    
347        public synchronized void runTask(Runnable toRun) {
348            this.task = toRun;
349            // Although the thread may not in wait state when this function
350            // is called (the taskList is not empty), it doesn't hurt to
351            // call it.  getState method can check whether the Thread is
352            // waiting, but it is available in jdk1.5 or newer.
353            this.notify();
354        }
355        
356        // terminate the thread pool when daemon is set to false
357        // it is better to have a way to terminate the thread pool
358        public synchronized void terminate() {
359            shouldTerminate = true;
360            needReturn = false;
361            this.notify();
362        }
363    }
364}