001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2014-2015 ForgeRock AS. 015 */ 016 017package org.forgerock.openig.util; 018 019import java.util.concurrent.Callable; 020import java.util.concurrent.ConcurrentHashMap; 021import java.util.concurrent.ConcurrentMap; 022import java.util.concurrent.ExecutionException; 023import java.util.concurrent.Future; 024import java.util.concurrent.FutureTask; 025import java.util.concurrent.ScheduledExecutorService; 026import java.util.concurrent.TimeUnit; 027 028import org.forgerock.util.AsyncFunction; 029import org.forgerock.util.promise.Promise; 030import org.forgerock.util.promise.Promises; 031import org.forgerock.util.promise.ResultHandler; 032import org.forgerock.util.time.Duration; 033 034/** 035 * ThreadSafeCache is a thread-safe write-through cache. 036 * <p> 037 * Instead of storing directly the value in the backing Map, it requires the 038 * consumer to provide a value factory (a Callable). A new FutureTask 039 * encapsulate the callable, is executed and is placed inside a 040 * ConcurrentHashMap if absent. 041 * <p> 042 * The final behavior is that, even if two concurrent Threads are borrowing an 043 * object from the cache, given that they provide an equivalent value factory, 044 * the first one will compute the value while the other will get the result from 045 * the Future (and will wait until the result is computed or a timeout occurs). 046 * <p> 047 * By default, cache duration is set to 1 minute. 048 * 049 * @param <K> 050 * Type of the key 051 * @param <V> 052 * Type of the value 053 */ 054public class ThreadSafeCache<K, V> { 055 056 private final ScheduledExecutorService executorService; 057 private final ConcurrentMap<K, Future<V>> cache = new ConcurrentHashMap<>(); 058 private AsyncFunction<V, Duration, Exception> defaultTimeoutFunction; 059 private Duration defaultTimeout; 060 061 /** 062 * Build a new {@link ThreadSafeCache} using the given scheduled executor. 063 * 064 * @param executorService 065 * scheduled executor for registering expiration callbacks. 066 */ 067 public ThreadSafeCache(final ScheduledExecutorService executorService) { 068 this.executorService = executorService; 069 setDefaultTimeout(new Duration(1L, TimeUnit.MINUTES)); 070 } 071 072 /** 073 * Sets the default cache entry expiration delay, if none provided in the 074 * caller. Notice that this will impact only new cache entries. 075 * 076 * @param defaultTimeout 077 * new cache entry timeout 078 */ 079 public void setDefaultTimeout(final Duration defaultTimeout) { 080 this.defaultTimeout = defaultTimeout; 081 this.defaultTimeoutFunction = new AsyncFunction<V, Duration, Exception>() { 082 @Override 083 public Promise<Duration, Exception> apply(V value) { 084 return Promises.newResultPromise(defaultTimeout); 085 } 086 }; 087 } 088 089 private Future<V> createIfAbsent(final K key, 090 final Callable<V> callable, 091 final AsyncFunction<V, Duration, Exception> timeoutFunction) 092 throws InterruptedException, ExecutionException { 093 // See the javadoc of the class for the intent of the Future and FutureTask. 094 Future<V> future = cache.get(key); 095 if (future == null) { 096 // First call: no value cached for that key 097 final FutureTask<V> futureTask = new FutureTask<>(callable); 098 future = cache.putIfAbsent(key, futureTask); 099 if (future == null) { 100 // after the double check, it seems we are still the first to want to cache that value. 101 future = futureTask; 102 103 // Compute the value 104 futureTask.run(); 105 106 scheduleEviction(key, futureTask.get(), timeoutFunction); 107 } 108 } 109 return future; 110 } 111 112 private void scheduleEviction(final K key, 113 final V value, 114 final AsyncFunction<V, Duration, Exception> timeoutFunction) { 115 try { 116 timeoutFunction.apply(value) 117 .thenOnResult(new ResultHandler<Duration>() { 118 @Override 119 public void handleResult(Duration timeout) { 120 // Register cache entry expiration time 121 executorService.schedule(new Expiration(key), 122 timeout.getValue(), 123 timeout.getUnit()); 124 } 125 }); 126 } catch (Exception e) { 127 executorService.schedule(new Expiration(key), defaultTimeout.getValue(), defaultTimeout.getUnit()); 128 } 129 } 130 131 /** 132 * Borrow (and create before hand if absent) a cache entry. If another 133 * Thread has created (or the creation is undergoing) the value, this 134 * methods waits indefinitely for the value to be available. 135 * 136 * @param key 137 * entry key 138 * @param callable 139 * cached value factory 140 * @return the cached value 141 * @throws InterruptedException 142 * if the current thread was interrupted while waiting 143 * @throws ExecutionException 144 * if the cached value computation threw an exception 145 */ 146 public V getValue(final K key, final Callable<V> callable) throws InterruptedException, 147 ExecutionException { 148 return getValue(key, callable, defaultTimeoutFunction); 149 } 150 151 /** 152 * Borrow (and create before hand if absent) a cache entry. If another 153 * Thread has created (or the creation is undergoing) the value, this 154 * methods waits indefinitely for the value to be available. 155 * 156 * @param key 157 * entry key 158 * @param callable 159 * cached value factory 160 * @param expire 161 * function to override the global cache's timeout 162 * @return the cached value 163 * @throws InterruptedException 164 * if the current thread was interrupted while waiting 165 * @throws ExecutionException 166 * if the cached value computation threw an exception 167 */ 168 public V getValue(final K key, 169 final Callable<V> callable, 170 final AsyncFunction<V, Duration, Exception> expire) throws InterruptedException, 171 ExecutionException { 172 try { 173 return createIfAbsent(key, callable, expire).get(); 174 } catch (InterruptedException | RuntimeException | ExecutionException e) { 175 cache.remove(key); 176 throw e; 177 } 178 } 179 180 /** 181 * Clean-up the cache entries. 182 */ 183 public void clear() { 184 cache.clear(); 185 } 186 187 /** 188 * Registered in the executor, this callable simply removes the cache entry 189 * after a specified amount of time. 190 */ 191 private class Expiration implements Callable<Object> { 192 private final K key; 193 194 public Expiration(final K key) { 195 this.key = key; 196 } 197 198 @Override 199 public Object call() throws Exception { 200 return cache.remove(key); 201 } 202 } 203}