001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the License.
004 *
005 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
006 * specific language governing permission and limitations under the License.
007 *
008 * When distributing Covered Software, include this CDDL Header Notice in each file and include
009 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
010 * Header, with the fields enclosed by brackets [] replaced by your own identifying
011 * information: "Portions copyright [year] [name of copyright owner]".
012 *
013 * Copyright 2015 ForgeRock AS.
014 */
015package org.forgerock.util.thread;
016
017import org.forgerock.util.Reject;
018import org.forgerock.util.thread.listener.ShutdownListener;
019import org.forgerock.util.thread.listener.ShutdownManager;
020
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.Executors;
023import java.util.concurrent.ScheduledExecutorService;
024import java.util.concurrent.ThreadFactory;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.BlockingQueue;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.atomic.AtomicInteger;
029
030/**
031 * Responsible for generating ExecutorService instances which are automatically
032 * wired up to shutdown when the ShutdownListener event triggers.
033 *
034 * This factory simplifies the creation of ExecutorServices which could overlook
035 * the important step of registering with the ShutdownManager. Failure to do so
036 * will prevent the server from shutting down.
037 *
038 * Note: Executors created using this factory will be triggered with the
039 * ExecutorService#shutdownNow method. This will interrupt all blocking calls
040 * made by those threads. This may be important for some users.
041 *
042 * @since 1.3.5
043 */
044public class ExecutorServiceFactory {
045    private final ShutdownManager shutdownManager;
046
047    /**
048     * Create an instance of the factory.
049     *
050     * @param shutdownManager Required to ensure each ExecutorService will be shutdown.
051     */
052    public ExecutorServiceFactory(ShutdownManager shutdownManager) {
053        this.shutdownManager = shutdownManager;
054    }
055
056    /**
057     * Generates a ScheduledExecutorService which has been pre-registered with the
058     * ShutdownManager.
059     *
060     * @see java.util.concurrent.Executors#newScheduledThreadPool(int)
061     *
062     * @param poolSize The size of the ScheduledExecutorService thread pool.
063     *
064     * @return A non null ScheduledExecutorService
065     */
066    public ScheduledExecutorService createScheduledService(int poolSize) {
067        final ScheduledExecutorService service = Executors.newScheduledThreadPool(poolSize);
068        registerShutdown(service);
069        return service;
070    }
071
072    /**
073     * Creates a fixed size Thread Pool ExecutorService which has been pre-registered with
074     * the {@link org.forgerock.util.thread.listener.ShutdownManager}.
075     *
076     * @param pool The size of the pool to create.
077     * @param factory The {@link java.util.concurrent.ThreadFactory} used to generate new threads.
078     * @return Non null.
079     */
080    public ExecutorService createFixedThreadPool(int pool, ThreadFactory factory) {
081        ExecutorService service = Executors.newFixedThreadPool(pool, factory);
082        registerShutdown(service);
083        return service;
084    }
085
086    /**
087     * @see #createFixedThreadPool(int, java.util.concurrent.ThreadFactory)
088     *
089     * @param pool Size of the fixed pool.
090     * @param threadName The name to use when generating new threads.
091     * @return Non null.
092     */
093    public ExecutorService createFixedThreadPool(int pool, String threadName) {
094        ExecutorService service = Executors.newFixedThreadPool(pool, new NamedThreadFactory(threadName));
095        registerShutdown(service);
096        return service;
097    }
098
099    /**
100     * @see #createFixedThreadPool(int, java.util.concurrent.ThreadFactory)
101     *
102     * @param pool Size of the fixed pool.
103     * @return Non null.
104     */
105    public ExecutorService createFixedThreadPool(int pool) {
106        ExecutorService service = Executors.newFixedThreadPool(pool);
107        registerShutdown(service);
108        return service;
109    }
110
111    /**
112     * Generates a Cached Thread Pool ExecutorService which has been pre-registered with the
113     * ShutdownManager. The provided ThreadFactory is used by the service when creating Threads.
114     *
115     * @see java.util.concurrent.Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
116     *
117     * @param factory The ThreadFactory that will be used when generating threads. May not be null.
118     * @return A non null ExecutorService.
119     */
120    public ExecutorService createCachedThreadPool(ThreadFactory factory) {
121        ExecutorService service = Executors.newCachedThreadPool(factory);
122        registerShutdown(service);
123        return service;
124    }
125
126    /**
127     * @see #createCachedThreadPool(java.util.concurrent.ThreadFactory)
128     *
129     * @param threadName The name to assign to all threads generated by this executor service.
130     * @return Non null.
131     */
132    public ExecutorService createCachedThreadPool(String threadName) {
133        ExecutorService service = Executors.newCachedThreadPool(new NamedThreadFactory(threadName));
134        registerShutdown(service);
135        return service;
136    }
137
138    /**
139     * @see #createCachedThreadPool(java.util.concurrent.ThreadFactory)
140     *
141     * @return Non null.
142     */
143    public ExecutorService createCachedThreadPool() {
144        ExecutorService service = Executors.newCachedThreadPool();
145        registerShutdown(service);
146        return service;
147    }
148
149    /**
150     * Generates a ThreadPoolExecutor with the provided values, and registers that executor as listening for
151     * shutdown messages.
152     *
153     * @param coreSize the number of threads to keep in the pool, even if they are idle
154     * @param maxSize Max number of threads in the pool
155     * @param idleTimeout When the number of threads is greater than core, maximum time that excess idle
156     *                    threads will wait before terminating
157     * @param timeoutTimeunit The time unit for the idleTimeout argument
158     * @param runnables Queue of threads to be run
159     * @return a configured ExecutorService, registered to listen to shutdown messages.
160     */
161    public ExecutorService createThreadPool(int coreSize, int maxSize, long idleTimeout,
162                                            TimeUnit timeoutTimeunit, BlockingQueue<Runnable> runnables) {
163        Reject.ifTrue(coreSize < 0);
164        Reject.ifTrue(maxSize < coreSize || maxSize <= 0);
165        Reject.ifTrue(idleTimeout < 0);
166
167        ExecutorService service = new ThreadPoolExecutor(coreSize, maxSize, idleTimeout, timeoutTimeunit,
168                runnables);
169        registerShutdown(service);
170        return service;
171    }
172
173    /**
174     * Registers a listener to trigger shutdown of the ExecutorService.
175     * @param service Non null ExecutorService to register.
176     */
177    private void registerShutdown(final ExecutorService service) {
178        shutdownManager.addShutdownListener(new ShutdownListener() {
179                    public void shutdown() {
180                        service.shutdownNow();
181                    }
182                });
183    }
184
185    /**
186     * Used to generate threads with a provided name. Each new thread will
187     * have its generated number appended to the end of it, in the form -X, where
188     * X is incremented once for each thread created.
189     */
190    private class NamedThreadFactory implements ThreadFactory {
191
192        private final AtomicInteger count = new AtomicInteger(0);
193        private final String name;
194
195        public NamedThreadFactory(String name) {
196            this.name = name;
197        }
198
199        public Thread newThread(Runnable r) {
200            return new Thread(r, name + "-" +  count.getAndIncrement());
201        }
202    }
203
204}