/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2006-2008 Sun Microsystems, Inc.
 * Portions Copyright 2013-2015 ForgeRock AS.
 */
package org.forgerock.audit.events.handlers.writers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.forgerock.util.Reject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Text Writer which writes log records asynchronously to a character-based stream.
 * <p>
 * The records are buffered in a bounded queue and written by a single background thread. If the
 * maximum capacity of the queue is reached, calls to {@code write()} block until space becomes
 * available. This prevents OOM errors while allowing good write performance.
 */
public class AsynchronousTextWriter implements TextWriter {

    private static final Logger logger = LoggerFactory.getLogger(AsynchronousTextWriter.class);
    /** Maximum number of messages that can be queued before producers start to block. */
    private static final int CAPACITY = 5000;

    /** The wrapped Text Writer. */
    private final TextWriter writer;

    /** Queue to store unpublished records. */
    private final BlockingQueue<String> queue;
    /** Single threaded executor which runs the WriterTask. */
    private final ExecutorService executorService;
    /**
     * Flag for determining if the wrapped TextWriter should be flushed after each drained batch
     * (or single polled record) has been written.
     */
    private final boolean autoFlush;
    /** Flag for notifying the WriterTask to exit. */
    private volatile boolean stopRequested;

    /**
     * Construct a new AsynchronousTextWriter wrapper and start its background writer thread.
     *
     * @param name
     *            the name of the thread.
     * @param autoFlush
     *            indicates if the underlying writer should be flushed each time the writer thread
     *            has written the records it took from the queue.
     * @param writer
     *            a character stream used for output; must not be {@code null}.
     */
    public AsynchronousTextWriter(final String name, final boolean autoFlush, final TextWriter writer) {
        Reject.ifNull(writer);
        this.autoFlush = autoFlush;
        this.writer = writer;
        this.queue = new LinkedBlockingQueue<>(CAPACITY);
        this.stopRequested = false;
        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, name);
            }
        });
        executorService.execute(new WriterTask());
    }

    /**
     * The publisher thread is responsible for emptying the queue of log records waiting to be
     * published.
     */
    private class WriterTask implements Runnable {

        /**
         * Runs until queue is empty AND we've been asked to terminate.
         */
        @Override
        public void run() {
            List<String> drainList = new ArrayList<>(CAPACITY);

            boolean interrupted = false;
            while (!stopRequested || !queue.isEmpty()) {
                try {
                    // Take everything currently queued in one go to amortise flushes.
                    queue.drainTo(drainList, CAPACITY);
                    if (drainList.isEmpty()) {
                        // Nothing pending: block for a bounded time so that stopRequested is
                        // re-evaluated periodically even when no producer is active.
                        String message = queue.poll(10, TimeUnit.SECONDS);
                        if (message != null) {
                            writeMessage(message);
                            if (autoFlush) {
                                flush();
                            }
                        }
                    } else {
                        for (String message : drainList) {
                            writeMessage(message);
                        }
                        drainList.clear();
                        if (autoFlush) {
                            flush();
                        }
                    }
                } catch (InterruptedException ex) {
                    // Ignore. We'll rerun the loop
                    // and presumably fall out.
                    interrupted = true;
                }
            }
            // Restore the interrupt status that was swallowed while draining.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Writes a single record to the wrapped writer, logging (not propagating) any I/O failure.
     *
     * @param message
     *            the record to write.
     */
    private void writeMessage(String message) {
        try {
            writer.write(message);
        } catch (IOException e) {
            logger.error("Error when writing a message, message size: " + message.length(), e);
        }
    }

    /**
     * Writes, on the calling thread, every record still present in the queue.
     * <p>
     * Only called once the writer task has terminated, to pick up records that a producer managed
     * to enqueue after the task observed an empty queue and exited.
     */
    private void writeRemainingRecords() {
        boolean wroteAny = false;
        String message;
        while ((message = queue.poll()) != null) {
            writeMessage(message);
            wroteAny = true;
        }
        if (wroteAny && autoFlush) {
            flush();
        }
    }

    /**
     * Write the log record asynchronously.
     * <p>
     * Blocks while the queue is full. If the calling thread is interrupted while waiting, the
     * attempt is retried and the interrupt status is restored before returning.
     *
     * @param record
     *            the log record to write.
     * @throws IOException
     *             if this writer has been shut down before the record could be queued.
     */
    @Override
    public void write(String record) throws IOException {
        boolean interrupted = false;
        boolean enqueued = false;
        while (!stopRequested) {
            // Put request on queue for writer
            try {
                queue.put(record);
                enqueued = true;
                break;
            } catch (InterruptedException e) {
                // We expect this to happen. Just ignore it and hopefully
                // drop out in the next try.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        // Inform caller if this writer has been shutdown
        if (!enqueued) {
            throw new IOException("Writer closed");
        }
    }

    /**
     * Flushes the wrapped writer. An I/O failure is logged rather than propagated.
     */
    @Override
    public void flush() {
        try {
            writer.flush();
        } catch (IOException e) {
            logger.error("Error when flushing the writer", e);
        }
    }

    /** {@inheritDoc} */
    @Override
    public long getBytesWritten() {
        return writer.getBytesWritten();
    }

    /**
     * Retrieves the wrapped writer.
     *
     * @return The wrapped writer used by this asynchronous writer.
     */
    public TextWriter getWrappedWriter() {
        return writer;
    }

    /** {@inheritDoc} */
    @Override
    public void shutdown() {
        shutdown(true);
    }

    /**
     * Releases any resources held by the writer.
     * <p>
     * Requests the writer task to stop, waits for it to terminate, writes any records still left
     * in the queue and then optionally shuts down the wrapped writer. The writer task is not
     * interrupted, so if it is idle in its timed poll this call may block until that poll times
     * out (up to 10 seconds).
     *
     * @param shutdownWrapped
     *            If the wrapped writer should be closed as well.
     */
    public void shutdown(boolean shutdownWrapped) {
        stopRequested = true;

        // Wait for writer thread to terminate
        executorService.shutdown();
        boolean interrupted = false;
        while (!executorService.isTerminated()) {
            try {
                executorService.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }

        // A producer that passed the stopRequested check just before it was set may have enqueued
        // its record after the writer task saw an empty queue and exited. write() reported success
        // for such a record, so push it to the wrapped writer here instead of dropping it.
        writeRemainingRecords();

        // Shutdown the wrapped writer.
        if (shutdownWrapped) {
            writer.shutdown();
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}