001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2015 ForgeRock AS.
015 */
016package org.forgerock.audit.handlers.csv;
017
018import static java.lang.String.format;
019import static org.forgerock.audit.events.AuditEventHelper.ARRAY_TYPE;
020import static org.forgerock.audit.events.AuditEventHelper.OBJECT_TYPE;
021import static org.forgerock.audit.events.AuditEventHelper.dotNotationToJsonPointer;
022import static org.forgerock.audit.events.AuditEventHelper.getAuditEventProperties;
023import static org.forgerock.audit.events.AuditEventHelper.getAuditEventSchema;
024import static org.forgerock.audit.events.AuditEventHelper.getPropertyType;
025import static org.forgerock.audit.events.AuditEventHelper.jsonPointerToDotNotation;
026import static org.forgerock.audit.util.JsonSchemaUtils.generateJsonPointers;
027import static org.forgerock.audit.util.JsonValueUtils.JSONVALUE_FILTER_VISITOR;
028import static org.forgerock.audit.util.JsonValueUtils.expand;
029import static org.forgerock.json.JsonValue.field;
030import static org.forgerock.json.JsonValue.json;
031import static org.forgerock.json.JsonValue.object;
032import static org.forgerock.json.resource.ResourceResponse.FIELD_CONTENT_ID;
033import static org.forgerock.json.resource.Responses.newQueryResponse;
034import static org.forgerock.json.resource.Responses.newResourceResponse;
035
036import java.io.File;
037import java.io.FileReader;
038import java.io.IOException;
039import java.security.NoSuchAlgorithmException;
040import java.security.SecureRandom;
041import java.util.ArrayList;
042import java.util.Collection;
043import java.util.Collections;
044import java.util.HashMap;
045import java.util.HashSet;
046import java.util.LinkedHashMap;
047import java.util.LinkedHashSet;
048import java.util.List;
049import java.util.Map;
050import java.util.Random;
051import java.util.Set;
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.ConcurrentMap;
054
055import javax.inject.Inject;
056
057import org.forgerock.audit.Audit;
058import org.forgerock.audit.events.EventTopicsMetaData;
059import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
060import org.forgerock.audit.handlers.csv.CsvAuditEventHandlerConfiguration.CsvSecurity;
061import org.forgerock.audit.handlers.csv.CsvAuditEventHandlerConfiguration.EventBufferingConfiguration;
062import org.forgerock.audit.providers.KeyStoreHandlerProvider;
063import org.forgerock.audit.retention.TimeStampFileNamingPolicy;
064import org.forgerock.audit.secure.JcaKeyStoreHandler;
065import org.forgerock.audit.secure.KeyStoreHandler;
066import org.forgerock.audit.util.JsonValueUtils;
067import org.forgerock.json.JsonPointer;
068import org.forgerock.json.JsonValue;
069import org.forgerock.json.resource.ActionRequest;
070import org.forgerock.json.resource.ActionResponse;
071import org.forgerock.json.resource.BadRequestException;
072import org.forgerock.json.resource.InternalServerErrorException;
073import org.forgerock.json.resource.NotFoundException;
074import org.forgerock.json.resource.QueryFilters;
075import org.forgerock.json.resource.QueryRequest;
076import org.forgerock.json.resource.QueryResourceHandler;
077import org.forgerock.json.resource.QueryResponse;
078import org.forgerock.json.resource.ResourceException;
079import org.forgerock.json.resource.ResourceResponse;
080import org.forgerock.json.resource.Responses;
081import org.forgerock.services.context.Context;
082import org.forgerock.util.Reject;
083import org.forgerock.util.promise.Promise;
084import org.forgerock.util.query.QueryFilter;
085import org.forgerock.util.time.Duration;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088import org.supercsv.cellprocessor.Optional;
089import org.supercsv.cellprocessor.ift.CellProcessor;
090import org.supercsv.io.CsvMapReader;
091import org.supercsv.io.ICsvMapReader;
092import org.supercsv.prefs.CsvPreference;
093import org.supercsv.quote.AlwaysQuoteMode;
094import org.supercsv.util.CsvContext;
095
096import com.fasterxml.jackson.databind.ObjectMapper;
097
098/**
099 * Handles AuditEvents by writing them to a CSV file.
100 */
101public class CsvAuditEventHandler extends AuditEventHandlerBase {
102
    /** Logger shared by every instance of this handler. */
    private static final Logger logger = LoggerFactory.getLogger(CsvAuditEventHandler.class);

    /** Name of action to force file rotation. */
    public static final String ROTATE_FILE_ACTION_NAME = "rotate";

    /** Prefix prepended to the audit log file name when CSV security is enabled. */
    static final String SECURE_CSV_FILENAME_PREFIX = "tamper-evident-";

    /** JSON parser used by {@link ParseJsonValue} to read object and array cells back from a CSV file. */
    private static final ObjectMapper mapper = new ObjectMapper();
    /** Random source handed to every {@code SecureCsvWriter} this handler creates. */
    private static final Random random;

    static {
        try {
            random = SecureRandom.getInstance("SHA1PRNG");
        } catch (NoSuchAlgorithmException ex) {
            // Without the requested algorithm the class cannot be initialised at all.
            throw new RuntimeException(ex);
        }
    }

    /** Handler configuration supplied to the constructor. */
    private final CsvAuditEventHandlerConfiguration configuration;
    /** Quote, delimiter and end-of-line settings derived from {@link #configuration}. */
    private final CsvPreference csvPreference;
    /** Currently open writer for each topic; entries are replaced when a writer is reset. */
    private final ConcurrentMap<String, CsvWriter> writers = new ConcurrentHashMap<>();
    /** Ordered set of field names (as JSON pointer strings) for each topic whose schema could be read. */
    private final Map<String, Set<String>> fieldOrderByTopic;
    /** Caches a JSON pointer for each field. */
    private final Map<String, JsonPointer> jsonPointerByField;
    /** Caches the dot notation for each field. */
    private final Map<String, String> fieldDotNotationByField;
    /** Key store used by secure writers; only assigned when CSV security is enabled. */
    private KeyStoreHandler keyStoreHandler;
130
131    /**
132     * Create a new CsvAuditEventHandler instance.
133     *
134     * @param configuration
135     *          Configuration parameters that can be adjusted by system administrators.
136     * @param eventTopicsMetaData
137     *          Meta-data for all audit event topics.
138     * @param keyStoreHandlerProvider
139     *          The secure storage to use for keys.
140     */
141    @Inject
142    public CsvAuditEventHandler(
143            final CsvAuditEventHandlerConfiguration configuration,
144            final EventTopicsMetaData eventTopicsMetaData,
145            @Audit KeyStoreHandlerProvider keyStoreHandlerProvider) {
146
147        super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
148        this.configuration = configuration;
149        this.csvPreference = createCsvPreference(this.configuration);
150        CsvSecurity security = configuration.getSecurity();
151        if (security.isEnabled()) {
152            Duration duration = security.getSignatureIntervalDuration();
153            Reject.ifTrue(duration.isZero() || duration.isUnlimited(),
154                    "The signature interval can't be zero or unlimited");
155
156            if (security.getKeyStoreHandlerName() != null) {
157                this.keyStoreHandler = keyStoreHandlerProvider.getKeystoreHandler(security.getKeyStoreHandlerName());
158            } else {
159                try {
160                    keyStoreHandler = new JcaKeyStoreHandler(CsvSecureConstants.KEYSTORE_TYPE, security.getFilename(),
161                            security.getPassword());
162                } catch (Exception e) {
163                    throw new IllegalArgumentException(
164                            "Unable to create secure storage from file: " + security.getFilename(), e);
165                }
166            }
167        }
168
169        Map<String, Set<String>> fieldOrderByTopic = new HashMap<>();
170        Map<String, JsonPointer> jsonPointerByField = new HashMap<>();
171        Map<String, String> fieldDotNotationByField = new HashMap<>();
172        for (String topic : this.eventTopicsMetaData.getTopics()) {
173            try {
174                Set<String> fieldOrder = getFieldOrder(topic, this.eventTopicsMetaData);
175                for (String field : fieldOrder) {
176                    if (!jsonPointerByField.containsKey(field)) {
177                        jsonPointerByField.put(field, new JsonPointer(field));
178                        fieldDotNotationByField.put(field, jsonPointerToDotNotation(field));
179                    }
180                }
181                fieldOrderByTopic.put(topic, Collections.unmodifiableSet(fieldOrder));
182            } catch (ResourceException e) {
183                logger.error(topic + " topic schema meta-data misconfigured.");
184            }
185        }
186        this.fieldOrderByTopic = Collections.unmodifiableMap(fieldOrderByTopic);
187        this.jsonPointerByField = Collections.unmodifiableMap(jsonPointerByField);
188        this.fieldDotNotationByField = Collections.unmodifiableMap(fieldDotNotationByField);
189    }
190
191    private CsvPreference createCsvPreference(final CsvAuditEventHandlerConfiguration config) {
192        return new CsvPreference.Builder(
193                config.getFormatting().getQuoteChar(),
194                config.getFormatting().getDelimiterChar(),
195                config.getFormatting().getEndOfLineSymbols())
196                .useQuoteMode(new AlwaysQuoteMode())
197                .build();
198    }
199
200    /**
201     * {@inheritDoc}
202     */
203    @Override
204    public void startup() throws ResourceException {
205        logger.trace("Audit logging to: {}", configuration.getLogDirectory());
206        File file = new File(configuration.getLogDirectory());
207        if (!file.isDirectory()) {
208            if (file.exists()) {
209                logger.warn("Specified path is file but should be a directory: {}", configuration.getLogDirectory());
210            } else {
211                if (!file.mkdirs()) {
212                    logger.warn("Unable to create audit directory in the path: {}", configuration.getLogDirectory());
213                }
214            }
215        }
216        for (String topic : eventTopicsMetaData.getTopics()) {
217            File auditLogFile = getAuditLogFile(topic);
218            try {
219                openWriter(topic, auditLogFile);
220            } catch (IOException e) {
221                logger.error("Error when creating audit file: {}", auditLogFile, e);
222            }
223        }
224    }
225
226    /** {@inheritDoc} */
227    @Override
228    public void shutdown() throws ResourceException {
229        cleanup();
230    }
231
232    /**
233     * Create a csv audit log entry.
234     * {@inheritDoc}
235     */
236    @Override
237    public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String topic, JsonValue event) {
238        try {
239            checkTopic(topic);
240            publishEventWithRetry(topic, event);
241            return newResourceResponse(
242                    event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, event).asPromise();
243        } catch (ResourceException e) {
244            return e.asPromise();
245        }
246    }
247
248    private void checkTopic(String topic) throws ResourceException {
249        final JsonValue auditEventProperties = getAuditEventProperties(eventTopicsMetaData.getSchema(topic));
250        if (auditEventProperties == null || auditEventProperties.isNull()) {
251            throw new InternalServerErrorException("No audit event properties defined for audit event: " + topic);
252        }
253    }
254
    /**
     * Writes the provided event to the writer of the given topic, retrying once if the first attempt
     * fails with an {@link IOException}.
     * <br/>
     * Before the retry the writer is reset and reopened, unless another thread has already replaced it.
     *
     * @param topic the topic the event belongs to
     * @param event the event to write
     * @throws ResourceException if the writer cannot be reopened, or if the second write attempt fails too
     */
    private void publishEventWithRetry(final String topic, final JsonValue event)
                    throws ResourceException {
        final CsvWriter csvWriter = getWriter(topic);
        try {
            writeEvent(topic, csvWriter, event);
        } catch (IOException ex) {
            // Re-try once in case the writer stream became closed for some reason
            logger.debug("IOException while writing ({})", ex.getMessage());
            CsvWriter newCsvWriter;
            // An IOException may be thrown if the csvWriter reference we have above was reset by another thread.
            // Synchronize to ensure that we wait for any reset to complete before proceeding - Otherwise, we may
            // lose multiple events or have multiple threads attempting to reset the writer.
            synchronized (this) {
                // Lookup the current writer directly from the map so we can check if another thread has reset it.
                newCsvWriter = writers.get(topic);
                if (newCsvWriter == csvWriter) {
                    // If both references are the same, the writer hasn't been reset.
                    newCsvWriter = resetAndReopenWriter(topic, false);
                    logger.debug("Resetting writer");
                } else {
                    logger.debug("Writer reset by another thread");
                }
            }
            try {
                // Second and last attempt; a failure here is reported to the caller.
                writeEvent(topic, newCsvWriter, event);
            } catch (IOException e) {
                throw new BadRequestException(e);
            }
        }
    }
288
289    /**
290     * Lookup CsvWriter for specified topic.
291     * <br/>
292     * Uses lazy synchronization in case another thread may be resetting the writer. If the writer is still null
293     * after synchronizing then the writer is reset.
294     * <br/>
295     * This method is only intended for use by {@link #publishEventWithRetry(String, JsonValue)}.
296     */
297    private CsvWriter getWriter(String topic) throws BadRequestException {
298        CsvWriter csvWriter = writers.get(topic);
299        if (csvWriter == null) {
300            logger.debug("CSV file writer for {} topic is null; checking for reset by another thread", topic);
301            synchronized (this) {
302                csvWriter = writers.get(topic);
303                if (csvWriter == null) {
304                    logger.debug("CSV file writer for {} topic not reset by another thread; resetting", topic);
305                    csvWriter = resetAndReopenWriter(topic, false);
306                }
307            }
308        }
309        return csvWriter;
310    }
311
312    private CsvWriter writeEvent(final String topic, CsvWriter csvWriter, final JsonValue event)
313                    throws IOException {
314        writeEntry(topic, csvWriter, event);
315        EventBufferingConfiguration bufferConfig = configuration.getBuffering();
316        if (!bufferConfig.isEnabled() || !bufferConfig.isAutoFlush()) {
317            csvWriter.flush();
318        }
319        return csvWriter;
320    }
321
322    private Set<String> getFieldOrder(final String topic, final EventTopicsMetaData eventTopicsMetaData)
323            throws ResourceException {
324        final Set<String> fieldOrder = new LinkedHashSet<>();
325        fieldOrder.addAll(generateJsonPointers(getAuditEventSchema(eventTopicsMetaData.getSchema(topic))));
326        return fieldOrder;
327    }
328
329    private synchronized CsvWriter openWriter(final String topic, final File auditFile) throws IOException {
330        final CsvWriter writer = createCsvWriter(auditFile, topic);
331        writers.put(topic, writer);
332        return writer;
333    }
334
335    private synchronized CsvWriter createCsvWriter(final File auditFile, String topic) throws IOException {
336        String[] headers = buildHeaders(fieldOrderByTopic.get(topic));
337        if (configuration.getSecurity().isEnabled()) {
338            return new SecureCsvWriter(auditFile, headers, csvPreference, configuration, keyStoreHandler, random);
339        } else {
340            return new StandardCsvWriter(auditFile, headers, csvPreference, configuration);
341        }
342    }
343
344    private ICsvMapReader createCsvMapReader(final File auditFile) throws IOException {
345        CsvMapReader csvReader = new CsvMapReader(new FileReader(auditFile), csvPreference);
346
347        if (configuration.getSecurity().isEnabled()) {
348            return new CsvSecureMapReader(csvReader);
349        } else {
350            return csvReader;
351        }
352    }
353
354    private String[] buildHeaders(final Collection<String> fieldOrder) {
355        final String[] headers = new String[fieldOrder.size()];
356        fieldOrder.toArray(headers);
357        for (int i = 0; i < headers.length; i++) {
358            headers[i] = jsonPointerToDotNotation(headers[i]);
359        }
360        return headers;
361    }
362
363    /**
364     * Perform a query on the csv audit log.
365     * {@inheritDoc}
366     */
367    @Override
368    public Promise<QueryResponse, ResourceException> queryEvents(
369            Context context,
370            String topic,
371            QueryRequest query,
372            QueryResourceHandler handler) {
373        try {
374            for (final JsonValue value : getEntries(topic, query.getQueryFilter())) {
375                handler.handleResource(newResourceResponse(value.get(FIELD_CONTENT_ID).asString(), null, value));
376            }
377            return newQueryResponse().asPromise();
378        } catch (Exception e) {
379            return new BadRequestException(e).asPromise();
380        }
381    }
382
383    /**
384     * Read from the csv audit log.
385     * {@inheritDoc}
386     */
387    @Override
388    public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
389        try {
390            final Set<JsonValue> entry = getEntries(topic, QueryFilters.parse("/_id eq \"" + resourceId + "\""));
391            if (entry.isEmpty()) {
392                throw new NotFoundException(topic + " audit log not found");
393            }
394            final JsonValue resource = entry.iterator().next();
395            return newResourceResponse(resource.get(FIELD_CONTENT_ID).asString(), null, resource).asPromise();
396        } catch (ResourceException e) {
397            return e.asPromise();
398        } catch (IOException e) {
399            return new BadRequestException(e).asPromise();
400        }
401    }
402
403    @Override
404    public Promise<ActionResponse, ResourceException> handleAction(
405            Context context, String topic, ActionRequest request) {
406        try {
407            String action = request.getAction();
408            if (topic == null) {
409                return new BadRequestException(format("Topic is required for action %s", action)).asPromise();
410            }
411            if (action.equals(ROTATE_FILE_ACTION_NAME)) {
412                return handleRotateAction(topic).asPromise();
413            }
414            final String error = format("This action is unknown for the CSV handler: %s", action);
415            return new BadRequestException(error).asPromise();
416        } catch (BadRequestException e) {
417            return e.asPromise();
418        }
419    }
420
421    private ActionResponse handleRotateAction(String topic)
422            throws BadRequestException {
423        CsvWriter csvWriter = writers.get(topic);
424        if (csvWriter == null) {
425            logger.debug("Unable to rotate file for topic: {}", topic);
426            throw new BadRequestException("Unable to rotate file for topic: " + topic);
427        }
428        if (configuration.getFileRotation().isRotationEnabled()) {
429            try {
430                if (!csvWriter.forceRotation()) {
431                    throw new BadRequestException("Unable to rotate file for topic: " + topic);
432                }
433            } catch (IOException e) {
434                throw new BadRequestException("Error when rotating file for topic: " + topic, e);
435            }
436        }
437        else {
438            // use a default rotation instead
439            resetAndReopenWriter(topic, true);
440        }
441        return Responses.newActionResponse(json(object(field("rotated", "true"))));
442    }
443
444    private File getAuditLogFile(final String type) {
445        final String prefix = configuration.getSecurity().isEnabled() ? SECURE_CSV_FILENAME_PREFIX : "";
446        return new File(configuration.getLogDirectory(), prefix + type + ".csv");
447    }
448
449    private void writeEntry(final String topic, final CsvWriter csvWriter, final JsonValue obj) throws IOException {
450        Set<String> fieldOrder = fieldOrderByTopic.get(topic);
451        Map<String, String> cells = new HashMap<>(fieldOrder.size());
452        for (String key : fieldOrder) {
453            final String value = JsonValueUtils.extractValueAsString(obj, key);
454            if (value != null && !value.isEmpty()) {
455                cells.put(fieldDotNotationByField.get(key), value);
456            }
457        }
458        csvWriter.writeEvent(cells);
459    }
460
461    private synchronized CsvWriter resetAndReopenWriter(final String topic, boolean forceRotation)
462            throws BadRequestException {
463        closeWriter(topic);
464        try {
465            File auditLogFile = getAuditLogFile(topic);
466            if (forceRotation) {
467                TimeStampFileNamingPolicy namingPolicy = new TimeStampFileNamingPolicy(auditLogFile, null, null);
468                File rotatedFile = namingPolicy.getNextName();
469                if (!auditLogFile.renameTo(rotatedFile)) {
470                    throw new BadRequestException(
471                            format("Unable to rename file %s to %s when rotating", auditLogFile, rotatedFile));
472                }
473            }
474            return openWriter(topic, auditLogFile);
475        } catch (IOException e) {
476            throw new BadRequestException(e);
477        }
478    }
479
480    private synchronized void closeWriter(final String topic) {
481        CsvWriter writerToClose = writers.remove(topic);
482        if (writerToClose != null) {
483            // attempt clean-up close
484            try {
485                writerToClose.close();
486            } catch (Exception ex) {
487                // Debug level as the writer is expected to potentially be invalid
488                logger.debug("File writer close in closeWriter reported failure ", ex);
489            }
490        }
491    }
492
493    /**
494     * Parser the csv file corresponding the the specified audit entry type and returns a set of matching audit entries.
495     *
496     * @param auditEntryType the audit log type
497     * @param queryFilter the query filter to apply to the entries
498     * @return  A audit log entry; null if no entry exists
499     * @throws IOException If unable to get an entry from the CSV file.
500     */
501    private Set<JsonValue> getEntries(final String auditEntryType, QueryFilter<JsonPointer> queryFilter)
502            throws IOException {
503        final File auditFile = getAuditLogFile(auditEntryType);
504        final Set<JsonValue> results = new HashSet<>();
505        if (queryFilter == null) {
506            queryFilter = QueryFilter.alwaysTrue();
507        }
508        if (auditFile.exists()) {
509            try (ICsvMapReader reader = createCsvMapReader(auditFile)) {
510                // the header elements are used to map the values to the bean (names must match)
511                final String[] header = convertDotNotationToSlashes(reader.getHeader(true));
512                final CellProcessor[] processors = createCellProcessors(auditEntryType, header);
513                Map<String, Object> entry;
514                while ((entry = reader.read(header, processors)) != null) {
515                    entry = convertDotNotationToSlashes(entry);
516                    final JsonValue jsonEntry = expand(entry);
517                    if (queryFilter.accept(JSONVALUE_FILTER_VISITOR, jsonEntry)) {
518                        results.add(jsonEntry);
519                    }
520                }
521
522            }
523        }
524        return results;
525    }
526
527    private CellProcessor[] createCellProcessors(final String auditEntryType, final String[] headers)
528            throws ResourceException {
529        final List<CellProcessor> cellProcessors = new ArrayList<>();
530        final JsonValue auditEvent = eventTopicsMetaData.getSchema(auditEntryType);
531
532        for (String header: headers) {
533            final String propertyType = getPropertyType(auditEvent, new JsonPointer(header));
534            if ((propertyType.equals(OBJECT_TYPE) || propertyType.equals(ARRAY_TYPE))) {
535                cellProcessors.add(new Optional(new ParseJsonValue()));
536            } else {
537                cellProcessors.add(new Optional());
538            }
539        }
540
541        return cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
542    }
543
544    /**
545     * CellProcessor for parsing JsonValue objects from CSV file.
546     */
547    public class ParseJsonValue implements CellProcessor {
548
549        @Override
550        public Object execute(final Object value, final CsvContext context) {
551            JsonValue jv = null;
552            // Check if value is JSON object
553            if (((String) value).startsWith("{") && ((String) value).endsWith("}")) {
554                try {
555                    jv = new JsonValue(mapper.readValue((String) value, Map.class));
556                } catch (Exception e) {
557                    logger.debug("Error parsing JSON string: " + e.getMessage());
558                }
559            } else if (((String) value).startsWith("[") && ((String) value).endsWith("]")) {
560                try {
561                    jv = new JsonValue(mapper.readValue((String) value, List.class));
562                } catch (Exception e) {
563                    logger.debug("Error parsing JSON string: " + e.getMessage());
564                }
565            }
566            if (jv == null) {
567                return value;
568            }
569            return jv.getObject();
570        }
571
572    }
573
574    private synchronized void cleanup() throws ResourceException {
575        try {
576            for (CsvWriter csvWriter : writers.values()) {
577                if (csvWriter != null) {
578                    csvWriter.flush();
579                    csvWriter.close();
580                }
581            }
582        } catch (IOException e) {
583            logger.error("Unable to close filewriters during {} cleanup", this.getClass().getName(), e);
584            throw new InternalServerErrorException(
585                    "Unable to close filewriters during " + this.getClass().getName() + " cleanup", e);
586        }
587    }
588
589    private Map<String, Object> convertDotNotationToSlashes(final Map<String, Object> entries) {
590        final Map<String, Object> newEntry = new LinkedHashMap<>();
591        for (Map.Entry<String, Object> entry : entries.entrySet()) {
592            final String key = dotNotationToJsonPointer(entry.getKey());
593            newEntry.put(key, entry.getValue());
594        }
595        return newEntry;
596    }
597
598    private String[] convertDotNotationToSlashes(final String[] entries) {
599        String[] result = new String[entries.length];
600        for (int i = 0; i < entries.length; i++) {
601            result[i] = dotNotationToJsonPointer(entries[i]);
602        }
603        return result;
604    }
605
606}