001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions copyright [year] [name of copyright owner]".
013 *
014 *      Copyright 2006-2008 Sun Microsystems, Inc.
015 *      Portions Copyright 2013-2015 ForgeRock AS.
016 */
017package org.forgerock.audit.events.handlers.writers;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.List;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.Executors;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.ThreadFactory;
027import java.util.concurrent.TimeUnit;
028
029import org.forgerock.util.Reject;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * A Text Writer which writes log records asynchronously to character-based stream.
035 * <p>
036 * The records are buffered in a queue and written asynchronously. If maximum CAPACITY of the queue is
037 * reached, then calls to {@code write()} method are blocked. This prevent OOM errors while allowing
038 * good write performances.
039 */
040public class AsynchronousTextWriter implements TextWriter {
041
042    private static final Logger logger = LoggerFactory.getLogger(AsynchronousTextWriter.class);
043    /** Maximum number of messages that can be queued before producers start to block. */
044    private static final int CAPACITY = 5000;
045
046    /** The wrapped Text Writer. */
047    private final TextWriter writer;
048
049    /** Queue to store unpublished records. */
050    private final BlockingQueue<String> queue;
051    /** Single threaded executor which runs the WriterTask. */
052    private final ExecutorService executorService;
053    /** Flag for determining if the wrapped TextWriter should be flushed after each event is written. */
054    private final boolean autoFlush;
055    /** Flag for notifying the WriterTask to exit. */
056    private volatile boolean stopRequested;
057
058    /**
059     * Construct a new AsynchronousTextWriter wrapper.
060     *
061     * @param name
062     *            the name of the thread.
063     * @param autoFlush
064     *            indicates if the underlying writer should be flushed after the queue is flushed.
065     * @param writer
066     *            a character stream used for output.
067     */
068    public AsynchronousTextWriter(final String name, final boolean autoFlush, final TextWriter writer) {
069        Reject.ifNull(writer);
070        this.autoFlush = autoFlush;
071        this.writer = writer;
072        this.queue = new LinkedBlockingQueue<>(CAPACITY);
073        this.stopRequested = false;
074        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
075            @Override
076            public Thread newThread(Runnable runnable) {
077                return new Thread(runnable, name);
078            }
079        });
080        executorService.execute(new WriterTask());
081    }
082
083    /**
084     * The publisher thread is responsible for emptying the queue of log records waiting to published.
085     */
086    private class WriterTask implements Runnable {
087
088        /**
089         * Runs until queue is empty AND we've been asked to terminate.
090         */
091        @Override
092        public void run() {
093            List<String> drainList = new ArrayList<>(CAPACITY);
094
095            boolean interrupted = false;
096            while (!stopRequested || !queue.isEmpty()) {
097                try {
098                    queue.drainTo(drainList, CAPACITY);
099                    if (drainList.isEmpty()) {
100                        String message = queue.poll(10, TimeUnit.SECONDS);
101                        if (message != null) {
102                            writeMessage(message);
103                            if (autoFlush) {
104                                flush();
105                            }
106                        }
107                    } else {
108                        for (String message : drainList) {
109                            writeMessage(message);
110                        }
111                        drainList.clear();
112                        if (autoFlush) {
113                            flush();
114                        }
115                    }
116                } catch (InterruptedException ex) {
117                    // Ignore. We'll rerun the loop
118                    // and presumably fall out.
119                    interrupted = true;
120                }
121            }
122            if (interrupted) {
123                Thread.currentThread().interrupt();
124            }
125        }
126    }
127
128    private void writeMessage(String message) {
129        try {
130            writer.write(message);
131        } catch (IOException e) {
132            logger.error("Error when writing a message, message size: " + message.length(), e);
133        }
134    }
135
136    /**
137     * Write the log record asynchronously.
138     *
139     * @param record
140     *            the log record to write.
141     */
142    @Override
143    public void write(String record) throws IOException {
144        boolean interrupted = false;
145        boolean enqueued = false;
146        while (!stopRequested) {
147            // Put request on queue for writer
148            try {
149                queue.put(record);
150                enqueued = true;
151                break;
152            } catch (InterruptedException e) {
153                // We expect this to happen. Just ignore it and hopefully
154                // drop out in the next try.
155                interrupted = true;
156            }
157        }
158        if (interrupted) {
159            Thread.currentThread().interrupt();
160        }
161        // Inform caller if this writer has been shutdown
162        if (!enqueued) {
163            throw new IOException("Writer closed");
164        }
165    }
166
167    @Override
168    public void flush() {
169        try {
170            writer.flush();
171        } catch (IOException e) {
172            logger.error("Error  when flushing the writer", e);
173        }
174    }
175
176    /** {@inheritDoc} */
177    @Override
178    public long getBytesWritten() {
179        return writer.getBytesWritten();
180    }
181
182    /**
183     * Retrieves the wrapped writer.
184     *
185     * @return The wrapped writer used by this asynchronous writer.
186     */
187    public TextWriter getWrappedWriter() {
188        return writer;
189    }
190
191    /** {@inheritDoc} */
192    @Override
193    public void shutdown() {
194        shutdown(true);
195    }
196
197    /**
198     * Releases any resources held by the writer.
199     *
200     * @param shutdownWrapped
201     *            If the wrapped writer should be closed as well.
202     */
203    public void shutdown(boolean shutdownWrapped) {
204        stopRequested = true;
205
206        // Wait for writer thread to terminate
207        executorService.shutdown();
208        boolean interrupted = false;
209        while (!executorService.isTerminated()) {
210            try {
211                executorService.awaitTermination(1, TimeUnit.MINUTES);
212            } catch (InterruptedException e) {
213                interrupted = true;
214            }
215        }
216
217        // Shutdown the wrapped writer.
218        if (shutdownWrapped) {
219            writer.shutdown();
220        }
221
222        if (interrupted) {
223            Thread.currentThread().interrupt();
224        }
225    }
226}