/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2014-2015 ForgeRock AS.
 */
016
017package org.forgerock.openig.util;
018
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.forgerock.util.AsyncFunction;
import org.forgerock.util.promise.ExceptionHandler;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.promise.Promises;
import org.forgerock.util.promise.ResultHandler;
import org.forgerock.util.time.Duration;
033
034/**
035 * ThreadSafeCache is a thread-safe write-through cache.
036 * <p>
037 * Instead of storing directly the value in the backing Map, it requires the
038 * consumer to provide a value factory (a Callable). A new FutureTask
039 * encapsulate the callable, is executed and is placed inside a
040 * ConcurrentHashMap if absent.
041 * <p>
042 * The final behavior is that, even if two concurrent Threads are borrowing an
043 * object from the cache, given that they provide an equivalent value factory,
044 * the first one will compute the value while the other will get the result from
045 * the Future (and will wait until the result is computed or a timeout occurs).
046 * <p>
047 * By default, cache duration is set to 1 minute.
048 *
049 * @param <K>
050 *            Type of the key
051 * @param <V>
052 *            Type of the value
053 */
054public class ThreadSafeCache<K, V> {
055
056    private final ScheduledExecutorService executorService;
057    private final ConcurrentMap<K, Future<V>> cache = new ConcurrentHashMap<>();
058    private AsyncFunction<V, Duration, Exception> defaultTimeoutFunction;
059    private Duration defaultTimeout;
060
061    /**
062     * Build a new {@link ThreadSafeCache} using the given scheduled executor.
063     *
064     * @param executorService
065     *            scheduled executor for registering expiration callbacks.
066     */
067    public ThreadSafeCache(final ScheduledExecutorService executorService) {
068        this.executorService = executorService;
069        setDefaultTimeout(new Duration(1L, TimeUnit.MINUTES));
070    }
071
072    /**
073     * Sets the default cache entry expiration delay, if none provided in the
074     * caller. Notice that this will impact only new cache entries.
075     *
076     * @param defaultTimeout
077     *            new cache entry timeout
078     */
079    public void setDefaultTimeout(final Duration defaultTimeout) {
080        this.defaultTimeout = defaultTimeout;
081        this.defaultTimeoutFunction = new AsyncFunction<V, Duration, Exception>() {
082            @Override
083            public Promise<Duration, Exception> apply(V value) {
084                return Promises.newResultPromise(defaultTimeout);
085            }
086        };
087    }
088
089    private Future<V> createIfAbsent(final K key,
090                                     final Callable<V> callable,
091                                     final AsyncFunction<V, Duration, Exception> timeoutFunction)
092            throws InterruptedException, ExecutionException {
093        // See the javadoc of the class for the intent of the Future and FutureTask.
094        Future<V> future = cache.get(key);
095        if (future == null) {
096            // First call: no value cached for that key
097            final FutureTask<V> futureTask = new FutureTask<>(callable);
098            future = cache.putIfAbsent(key, futureTask);
099            if (future == null) {
100                // after the double check, it seems we are still the first to want to cache that value.
101                future = futureTask;
102
103                // Compute the value
104                futureTask.run();
105
106                scheduleEviction(key, futureTask.get(), timeoutFunction);
107            }
108        }
109        return future;
110    }
111
112    private void scheduleEviction(final K key,
113                                  final V value,
114                                  final AsyncFunction<V, Duration, Exception> timeoutFunction) {
115        try {
116            timeoutFunction.apply(value)
117                           .thenOnResult(new ResultHandler<Duration>() {
118                               @Override
119                               public void handleResult(Duration timeout) {
120                                   // Register cache entry expiration time
121                                   executorService.schedule(new Expiration(key),
122                                                            timeout.getValue(),
123                                                            timeout.getUnit());
124                               }
125                           });
126        } catch (Exception e) {
127            executorService.schedule(new Expiration(key), defaultTimeout.getValue(), defaultTimeout.getUnit());
128        }
129    }
130
131    /**
132     * Borrow (and create before hand if absent) a cache entry. If another
133     * Thread has created (or the creation is undergoing) the value, this
134     * methods waits indefinitely for the value to be available.
135     *
136     * @param key
137     *            entry key
138     * @param callable
139     *            cached value factory
140     * @return the cached value
141     * @throws InterruptedException
142     *             if the current thread was interrupted while waiting
143     * @throws ExecutionException
144     *             if the cached value computation threw an exception
145     */
146    public V getValue(final K key, final Callable<V> callable) throws InterruptedException,
147                                                                      ExecutionException {
148        return getValue(key, callable, defaultTimeoutFunction);
149    }
150
151    /**
152     * Borrow (and create before hand if absent) a cache entry. If another
153     * Thread has created (or the creation is undergoing) the value, this
154     * methods waits indefinitely for the value to be available.
155     *
156     * @param key
157     *            entry key
158     * @param callable
159     *            cached value factory
160     * @param expire
161     *            function to override the global cache's timeout
162     * @return the cached value
163     * @throws InterruptedException
164     *             if the current thread was interrupted while waiting
165     * @throws ExecutionException
166     *             if the cached value computation threw an exception
167     */
168    public V getValue(final K key,
169                      final Callable<V> callable,
170                      final AsyncFunction<V, Duration, Exception> expire) throws InterruptedException,
171                                                                                 ExecutionException {
172        try {
173            return createIfAbsent(key, callable, expire).get();
174        } catch (InterruptedException | RuntimeException | ExecutionException e) {
175            cache.remove(key);
176            throw e;
177        }
178    }
179
180    /**
181     * Clean-up the cache entries.
182     */
183    public void clear() {
184        cache.clear();
185    }
186
187    /**
188     * Registered in the executor, this callable simply removes the cache entry
189     * after a specified amount of time.
190     */
191    private class Expiration implements Callable<Object> {
192        private final K key;
193
194        public Expiration(final K key) {
195            this.key = key;
196        }
197
198        @Override
199        public Object call() throws Exception {
200            return cache.remove(key);
201        }
202    }
203}