001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2015-2016 ForgeRock AS.
015 */
016package org.forgerock.util.thread;
017
018import org.forgerock.util.Reject;
019import org.forgerock.util.thread.listener.ShutdownListener;
020import org.forgerock.util.thread.listener.ShutdownManager;
021
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.Executors;
024import java.util.concurrent.ScheduledExecutorService;
025import java.util.concurrent.ThreadFactory;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.atomic.AtomicInteger;
030
031/**
032 * Responsible for generating ExecutorService instances which are automatically
033 * wired up to shutdown when the ShutdownListener event triggers.
034 *
035 * This factory simplifies the creation of ExecutorServices which could overlook
036 * the important step of registering with the ShutdownManager. Failure to do so
037 * will prevent the server from shutting down.
038 *
039 * Note: Executors created using this factory will be triggered with the
040 * ExecutorService#shutdownNow method. This will interrupt all blocking calls
041 * made by those threads. This may be important for some users.
042 *
043 * @since 1.3.5
044 */
045public class ExecutorServiceFactory {
046    private final ShutdownManager shutdownManager;
047
048    /**
049     * Create an instance of the factory.
050     *
051     * @param shutdownManager Required to ensure each ExecutorService will be shutdown.
052     */
053    public ExecutorServiceFactory(ShutdownManager shutdownManager) {
054        this.shutdownManager = shutdownManager;
055    }
056
057    /**
058     * Generates a ScheduledExecutorService which has been pre-registered with the
059     * ShutdownManager.
060     *
061     * @see java.util.concurrent.Executors#newScheduledThreadPool(int)
062     *
063     * @param poolSize The size of the ScheduledExecutorService thread pool.
064     *
065     * @return A non null ScheduledExecutorService
066     */
067    public ScheduledExecutorService createScheduledService(int poolSize) {
068        final ScheduledExecutorService service = Executors.newScheduledThreadPool(poolSize);
069        registerShutdown(service);
070        return service;
071    }
072
073    /**
074     * Creates a fixed size Thread Pool ExecutorService which has been pre-registered with
075     * the {@link org.forgerock.util.thread.listener.ShutdownManager}.
076     *
077     * @param pool The size of the pool to create.
078     * @param factory The {@link java.util.concurrent.ThreadFactory} used to generate new threads.
079     * @return Non null.
080     */
081    public ExecutorService createFixedThreadPool(int pool, ThreadFactory factory) {
082        ExecutorService service = Executors.newFixedThreadPool(pool, factory);
083        registerShutdown(service);
084        return service;
085    }
086
087    /**
088     * Create a fixed size Thread Pool ExecutorService using the provided name as the prefix
089     * of the thread names.
090     *
091     * @see #createFixedThreadPool(int, java.util.concurrent.ThreadFactory)
092     *
093     * @param pool Size of the fixed pool.
094     * @param threadNamePrefix The thread name prefix to use when generating new threads.
095     * @return Non null.
096     */
097    public ExecutorService createFixedThreadPool(int pool, String threadNamePrefix) {
098        ExecutorService service = Executors.newFixedThreadPool(pool, new NamedThreadFactory(threadNamePrefix));
099        registerShutdown(service);
100        return service;
101    }
102
103    /**
104     * Create a fixed size Thread Pool ExecutorService.
105     * @see #createFixedThreadPool(int, java.util.concurrent.ThreadFactory)
106     *
107     * @param pool Size of the fixed pool.
108     * @return Non null.
109     */
110    public ExecutorService createFixedThreadPool(int pool) {
111        ExecutorService service = Executors.newFixedThreadPool(pool);
112        registerShutdown(service);
113        return service;
114    }
115
116    /**
117     * Generates a Cached Thread Pool ExecutorService which has been pre-registered with the
118     * ShutdownManager. The provided ThreadFactory is used by the service when creating Threads.
119     *
120     * @see java.util.concurrent.Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
121     *
122     * @param factory The ThreadFactory that will be used when generating threads. May not be null.
123     * @return A non null ExecutorService.
124     */
125    public ExecutorService createCachedThreadPool(ThreadFactory factory) {
126        ExecutorService service = Executors.newCachedThreadPool(factory);
127        registerShutdown(service);
128        return service;
129    }
130
131    /**
132     * Generates a Cached Thread Pool ExecutorService using the provided name as a prefix
133     * of the thread names.
134     *
135     * @see #createCachedThreadPool(java.util.concurrent.ThreadFactory)
136     *
137     * @param threadNamePrefix The thread name prefix to use when generating new threads.
138     * @return Non null.
139     */
140    public ExecutorService createCachedThreadPool(String threadNamePrefix) {
141        ExecutorService service = Executors.newCachedThreadPool(new NamedThreadFactory(threadNamePrefix));
142        registerShutdown(service);
143        return service;
144    }
145
146    /**
147     * Generates a Cached Thread Pool ExecutorService.
148     * @see #createCachedThreadPool(java.util.concurrent.ThreadFactory)
149     *
150     * @return Non null.
151     */
152    public ExecutorService createCachedThreadPool() {
153        ExecutorService service = Executors.newCachedThreadPool();
154        registerShutdown(service);
155        return service;
156    }
157
158    /**
159     * Generates a ThreadPoolExecutor with the provided values, and registers that executor as listening for
160     * shutdown messages.
161     *
162     * @param coreSize the number of threads to keep in the pool, even if they are idle
163     * @param maxSize Max number of threads in the pool
164     * @param idleTimeout When the number of threads is greater than core, maximum time that excess idle
165     *                    threads will wait before terminating
166     * @param timeoutTimeunit The time unit for the idleTimeout argument
167     * @param runnables Queue of threads to be run
168     * @return a configured ExecutorService, registered to listen to shutdown messages.
169     */
170    public ExecutorService createThreadPool(int coreSize, int maxSize, long idleTimeout,
171                                            TimeUnit timeoutTimeunit, BlockingQueue<Runnable> runnables) {
172        Reject.ifTrue(coreSize < 0);
173        Reject.ifTrue(maxSize < coreSize || maxSize <= 0);
174        Reject.ifTrue(idleTimeout < 0);
175
176        ExecutorService service = new ThreadPoolExecutor(coreSize, maxSize, idleTimeout, timeoutTimeunit,
177                runnables);
178        registerShutdown(service);
179        return service;
180    }
181
182    /**
183     * Registers a listener to trigger shutdown of the ExecutorService.
184     * @param service Non null ExecutorService to register.
185     */
186    private void registerShutdown(final ExecutorService service) {
187        shutdownManager.addShutdownListener(
188                new ShutdownListener() {
189                    public void shutdown() {
190                        service.shutdownNow();
191                    }
192                });
193    }
194
195    /**
196     * Used to generate threads with a provided name. Each new thread will
197     * have its generated number appended to the end of it, in the form -X, where
198     * X is incremented once for each thread created.
199     */
200    private class NamedThreadFactory implements ThreadFactory {
201
202        private final AtomicInteger count = new AtomicInteger(0);
203        private final String name;
204
205        public NamedThreadFactory(String name) {
206            this.name = name;
207        }
208
209        public Thread newThread(Runnable r) {
210            return new Thread(r, name + "-" +  count.getAndIncrement());
211        }
212    }
213
214}