001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2008 Sun Microsystems, Inc. 015 * Portions Copyright 2013-2015 ForgeRock AS. 016 */ 017package org.forgerock.audit.events.handlers.writers; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.List; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Executors; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.ThreadFactory; 027import java.util.concurrent.TimeUnit; 028 029import org.forgerock.util.Reject; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT; 034import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT_UNIT; 035 036/** 037 * A Text Writer which writes log records asynchronously to character-based stream. 038 * <p> 039 * The records are buffered in a queue and written asynchronously. If maximum CAPACITY of the queue is 040 * reached, then calls to {@code write()} method are blocked. This prevent OOM errors while allowing 041 * good write performances. 042 */ 043public class AsynchronousTextWriter implements TextWriter { 044 045 private static final Logger logger = LoggerFactory.getLogger(AsynchronousTextWriter.class); 046 /** Maximum number of messages that can be queued before producers start to block. */ 047 private static final int CAPACITY = 5000; 048 049 /** The wrapped Text Writer. */ 050 private final TextWriter writer; 051 052 /** Queue to store unpublished records. */ 053 private final BlockingQueue<String> queue; 054 /** Single threaded executor which runs the WriterTask. */ 055 private final ExecutorService executorService; 056 /** Flag for determining if the wrapped TextWriter should be flushed after each event is written. */ 057 private final boolean autoFlush; 058 /** Flag for notifying the WriterTask to exit. */ 059 private volatile boolean stopRequested; 060 061 /** 062 * Construct a new AsynchronousTextWriter wrapper. 063 * 064 * @param name 065 * the name of the thread. 066 * @param autoFlush 067 * indicates if the underlying writer should be flushed after the queue is flushed. 068 * @param writer 069 * a character stream used for output. 070 */ 071 public AsynchronousTextWriter(final String name, final boolean autoFlush, final TextWriter writer) { 072 Reject.ifNull(writer); 073 this.autoFlush = autoFlush; 074 this.writer = writer; 075 this.queue = new LinkedBlockingQueue<>(CAPACITY); 076 this.stopRequested = false; 077 this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory() { 078 @Override 079 public Thread newThread(Runnable runnable) { 080 return new Thread(runnable, name); 081 } 082 }); 083 executorService.execute(new WriterTask()); 084 } 085 086 /** 087 * The publisher thread is responsible for emptying the queue of log records waiting to published. 088 */ 089 private class WriterTask implements Runnable { 090 091 /** 092 * Runs until queue is empty AND we've been asked to terminate. 093 */ 094 @Override 095 public void run() { 096 List<String> drainList = new ArrayList<>(CAPACITY); 097 098 boolean interrupted = false; 099 while (!stopRequested || !queue.isEmpty()) { 100 try { 101 queue.drainTo(drainList, CAPACITY); 102 if (drainList.isEmpty()) { 103 String message = queue.poll(POLLING_TIMEOUT, POLLING_TIMEOUT_UNIT); 104 if (message != null) { 105 writeMessage(message); 106 if (autoFlush) { 107 flush(); 108 } 109 } 110 } else { 111 for (String message : drainList) { 112 writeMessage(message); 113 } 114 drainList.clear(); 115 if (autoFlush) { 116 flush(); 117 } 118 } 119 } catch (InterruptedException ex) { 120 // Ignore. We'll rerun the loop 121 // and presumably fall out. 122 interrupted = true; 123 } 124 } 125 if (interrupted) { 126 Thread.currentThread().interrupt(); 127 } 128 } 129 } 130 131 private void writeMessage(String message) { 132 try { 133 writer.write(message); 134 } catch (IOException e) { 135 logger.error("Error when writing a message, message size: " + message.length(), e); 136 } 137 } 138 139 /** 140 * Write the log record asynchronously. 141 * 142 * @param record 143 * the log record to write. 144 */ 145 @Override 146 public void write(String record) throws IOException { 147 boolean interrupted = false; 148 boolean enqueued = false; 149 while (!stopRequested) { 150 // Put request on queue for writer 151 try { 152 queue.put(record); 153 enqueued = true; 154 break; 155 } catch (InterruptedException e) { 156 // We expect this to happen. Just ignore it and hopefully 157 // drop out in the next try. 158 interrupted = true; 159 } 160 } 161 if (interrupted) { 162 Thread.currentThread().interrupt(); 163 } 164 // Inform caller if this writer has been shutdown 165 if (!enqueued) { 166 throw new IOException("Writer closed"); 167 } 168 } 169 170 @Override 171 public void flush() { 172 try { 173 writer.flush(); 174 } catch (IOException e) { 175 logger.error("Error when flushing the writer", e); 176 } 177 } 178 179 /** {@inheritDoc} */ 180 @Override 181 public long getBytesWritten() { 182 return writer.getBytesWritten(); 183 } 184 185 /** 186 * Retrieves the wrapped writer. 187 * 188 * @return The wrapped writer used by this asynchronous writer. 189 */ 190 public TextWriter getWrappedWriter() { 191 return writer; 192 } 193 194 /** {@inheritDoc} */ 195 @Override 196 public void shutdown() { 197 shutdown(true); 198 } 199 200 /** 201 * Releases any resources held by the writer. 202 * 203 * @param shutdownWrapped 204 * If the wrapped writer should be closed as well. 205 */ 206 public void shutdown(boolean shutdownWrapped) { 207 stopRequested = true; 208 209 // Wait for writer thread to terminate 210 executorService.shutdown(); 211 boolean interrupted = false; 212 while (!executorService.isTerminated()) { 213 try { 214 executorService.awaitTermination(1, TimeUnit.MINUTES); 215 } catch (InterruptedException e) { 216 interrupted = true; 217 } 218 } 219 220 // Shutdown the wrapped writer. 221 if (shutdownWrapped) { 222 writer.shutdown(); 223 } 224 225 if (interrupted) { 226 Thread.currentThread().interrupt(); 227 } 228 } 229}