001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions copyright [year] [name of copyright owner]".
013 *
014 *      Copyright 2006-2008 Sun Microsystems, Inc.
015 *      Portions Copyright 2013-2015 ForgeRock AS.
016 */
017package org.forgerock.audit.events.handlers.writers;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.List;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.Executors;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.ThreadFactory;
027import java.util.concurrent.TimeUnit;
028
029import org.forgerock.util.Reject;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT;
034import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT_UNIT;
035
036/**
037 * A Text Writer which writes log records asynchronously to character-based stream.
038 * <p>
039 * The records are buffered in a queue and written asynchronously. If maximum CAPACITY of the queue is
040 * reached, then calls to {@code write()} method are blocked. This prevent OOM errors while allowing
041 * good write performances.
042 */
043public class AsynchronousTextWriter implements TextWriter {
044
045    private static final Logger logger = LoggerFactory.getLogger(AsynchronousTextWriter.class);
046    /** Maximum number of messages that can be queued before producers start to block. */
047    private static final int CAPACITY = 5000;
048
049    /** The wrapped Text Writer. */
050    private final TextWriter writer;
051
052    /** Queue to store unpublished records. */
053    private final BlockingQueue<String> queue;
054    /** Single threaded executor which runs the WriterTask. */
055    private final ExecutorService executorService;
056    /** Flag for determining if the wrapped TextWriter should be flushed after each event is written. */
057    private final boolean autoFlush;
058    /** Flag for notifying the WriterTask to exit. */
059    private volatile boolean stopRequested;
060
061    /**
062     * Construct a new AsynchronousTextWriter wrapper.
063     *
064     * @param name
065     *            the name of the thread.
066     * @param autoFlush
067     *            indicates if the underlying writer should be flushed after the queue is flushed.
068     * @param writer
069     *            a character stream used for output.
070     */
071    public AsynchronousTextWriter(final String name, final boolean autoFlush, final TextWriter writer) {
072        Reject.ifNull(writer);
073        this.autoFlush = autoFlush;
074        this.writer = writer;
075        this.queue = new LinkedBlockingQueue<>(CAPACITY);
076        this.stopRequested = false;
077        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
078            @Override
079            public Thread newThread(Runnable runnable) {
080                return new Thread(runnable, name);
081            }
082        });
083        executorService.execute(new WriterTask());
084    }
085
086    /**
087     * The publisher thread is responsible for emptying the queue of log records waiting to published.
088     */
089    private class WriterTask implements Runnable {
090
091        /**
092         * Runs until queue is empty AND we've been asked to terminate.
093         */
094        @Override
095        public void run() {
096            List<String> drainList = new ArrayList<>(CAPACITY);
097
098            boolean interrupted = false;
099            while (!stopRequested || !queue.isEmpty()) {
100                try {
101                    queue.drainTo(drainList, CAPACITY);
102                    if (drainList.isEmpty()) {
103                        String message = queue.poll(POLLING_TIMEOUT, POLLING_TIMEOUT_UNIT);
104                        if (message != null) {
105                            writeMessage(message);
106                            if (autoFlush) {
107                                flush();
108                            }
109                        }
110                    } else {
111                        for (String message : drainList) {
112                            writeMessage(message);
113                        }
114                        drainList.clear();
115                        if (autoFlush) {
116                            flush();
117                        }
118                    }
119                } catch (InterruptedException ex) {
120                    // Ignore. We'll rerun the loop
121                    // and presumably fall out.
122                    interrupted = true;
123                }
124            }
125            if (interrupted) {
126                Thread.currentThread().interrupt();
127            }
128        }
129    }
130
131    private void writeMessage(String message) {
132        try {
133            writer.write(message);
134        } catch (IOException e) {
135            logger.error("Error when writing a message, message size: " + message.length(), e);
136        }
137    }
138
139    /**
140     * Write the log record asynchronously.
141     *
142     * @param record
143     *            the log record to write.
144     */
145    @Override
146    public void write(String record) throws IOException {
147        boolean interrupted = false;
148        boolean enqueued = false;
149        while (!stopRequested) {
150            // Put request on queue for writer
151            try {
152                queue.put(record);
153                enqueued = true;
154                break;
155            } catch (InterruptedException e) {
156                // We expect this to happen. Just ignore it and hopefully
157                // drop out in the next try.
158                interrupted = true;
159            }
160        }
161        if (interrupted) {
162            Thread.currentThread().interrupt();
163        }
164        // Inform caller if this writer has been shutdown
165        if (!enqueued) {
166            throw new IOException("Writer closed");
167        }
168    }
169
170    @Override
171    public void flush() {
172        try {
173            writer.flush();
174        } catch (IOException e) {
175            logger.error("Error  when flushing the writer", e);
176        }
177    }
178
179    /** {@inheritDoc} */
180    @Override
181    public long getBytesWritten() {
182        return writer.getBytesWritten();
183    }
184
185    /**
186     * Retrieves the wrapped writer.
187     *
188     * @return The wrapped writer used by this asynchronous writer.
189     */
190    public TextWriter getWrappedWriter() {
191        return writer;
192    }
193
194    /** {@inheritDoc} */
195    @Override
196    public void shutdown() {
197        shutdown(true);
198    }
199
200    /**
201     * Releases any resources held by the writer.
202     *
203     * @param shutdownWrapped
204     *            If the wrapped writer should be closed as well.
205     */
206    public void shutdown(boolean shutdownWrapped) {
207        stopRequested = true;
208
209        // Wait for writer thread to terminate
210        executorService.shutdown();
211        boolean interrupted = false;
212        while (!executorService.isTerminated()) {
213            try {
214                executorService.awaitTermination(1, TimeUnit.MINUTES);
215            } catch (InterruptedException e) {
216                interrupted = true;
217            }
218        }
219
220        // Shutdown the wrapped writer.
221        if (shutdownWrapped) {
222            writer.shutdown();
223        }
224
225        if (interrupted) {
226            Thread.currentThread().interrupt();
227        }
228    }
229}