001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2016 ForgeRock AS.
015 */
016package org.forgerock.audit.handlers.elasticsearch;
017
import static org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration.ConnectionConfiguration;
import static org.forgerock.audit.handlers.elasticsearch.ElasticsearchUtil.OBJECT_MAPPER;
import static org.forgerock.http.handler.HttpClientHandler.OPTION_LOADER;
import static org.forgerock.json.JsonValue.field;
import static org.forgerock.json.JsonValue.json;
import static org.forgerock.json.JsonValue.object;
import static org.forgerock.json.resource.ResourceException.newResourceException;
import static org.forgerock.json.resource.ResourceResponse.FIELD_CONTENT_ID;
import static org.forgerock.json.resource.Responses.newQueryResponse;
import static org.forgerock.json.resource.Responses.newResourceResponse;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import org.forgerock.audit.Audit;
import org.forgerock.audit.events.EventTopicsMetaData;
import org.forgerock.audit.events.handlers.AuditEventHandler;
import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
import org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration.EventBufferingConfiguration;
import org.forgerock.http.Client;
import org.forgerock.http.HttpApplicationException;
import org.forgerock.http.apache.async.AsyncHttpClientProvider;
import org.forgerock.http.handler.HttpClientHandler;
import org.forgerock.http.header.ContentTypeHeader;
import org.forgerock.http.protocol.Request;
import org.forgerock.http.protocol.Response;
import org.forgerock.http.protocol.Responses;
import org.forgerock.http.spi.Loader;
import org.forgerock.json.JsonValue;
import org.forgerock.json.resource.BadRequestException;
import org.forgerock.json.resource.CountPolicy;
import org.forgerock.json.resource.InternalServerErrorException;
import org.forgerock.json.resource.NotFoundException;
import org.forgerock.json.resource.QueryRequest;
import org.forgerock.json.resource.QueryResourceHandler;
import org.forgerock.json.resource.QueryResponse;
import org.forgerock.json.resource.ResourceException;
import org.forgerock.json.resource.ResourceResponse;
import org.forgerock.json.resource.ServiceUnavailableException;
import org.forgerock.services.context.Context;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
import org.forgerock.util.encode.Base64;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
068
069/**
070 * {@link AuditEventHandler} for Elasticsearch.
071 */
072public class ElasticsearchAuditEventHandler extends AuditEventHandlerBase implements
073        ElasticsearchBatchAuditEventHandler {
074
075    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAuditEventHandler.class);
076    private static final ElasticsearchQueryFilterVisitor ELASTICSEARCH_QUERY_FILTER_VISITOR =
077            new ElasticsearchQueryFilterVisitor();
078
079    private static final String QUERY = "query";
080    private static final String GET = "GET";
081    private static final String SEARCH = "/_search";
082    private static final String BULK = "/_bulk";
083    private static final String HITS = "hits";
084    private static final String SOURCE = "_source";
085    private static final int DEFAULT_PAGE_SIZE = 10;
086    private static final String TOTAL = "total";
087    private static final String PUT = "PUT";
088    private static final String POST = "POST";
089
090    /**
091     * Average number of characters, per event, for batch indexing via Elasticsearch Bulk API. This value
092     * is used to initialize the size of buffers, but if the value is too low, the buffers will automatically resize
093     * as needed.
094     */
095    private static final int BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE = 1280;
096
097    /**
098     * The Elasticsearch {@link AuditEventHandler} <b>always</b> flushes events in the batch queue on shutdown or
099     * configuration change.
100     */
101    private static final boolean ALWAYS_FLUSH_BATCH_QUEUE = true;
102    private static final int DEFAULT_OFFSET = 0;
103
104    private final String indexName;
105    private final String basicAuthHeaderValue;
106    private final String baseUri;
107    private final String bulkUri;
108    private final ElasticsearchAuditEventHandlerConfiguration configuration;
109    private final Client client;
110    private final ElasticsearchBatchIndexer batchIndexer;
111    private final HttpClientHandler defaultHttpClientHandler;
112
113    /**
114     * Create a new {@code ElasticsearchAuditEventHandler} instance.
115     *
116     * @param configuration Configuration parameters that can be adjusted by system administrators.
117     * @param eventTopicsMetaData Meta-data for all audit event topics.
118     * @param client HTTP client or {@code null} to use default client.
119     */
120    public ElasticsearchAuditEventHandler(
121            final ElasticsearchAuditEventHandlerConfiguration configuration,
122            final EventTopicsMetaData eventTopicsMetaData,
123            @Audit final Client client) {
124        super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
125        this.configuration = Reject.checkNotNull(configuration);
126        if (client == null) {
127            this.defaultHttpClientHandler = defaultHttpClientHandler();
128            this.client = new Client(defaultHttpClientHandler);
129        } else {
130            this.defaultHttpClientHandler = null;
131            this.client = client;
132        }
133        indexName = configuration.getIndexMapping().getIndexName();
134        basicAuthHeaderValue = buildBasicAuthHeaderValue();
135        baseUri = buildBaseUri();
136        bulkUri = buildBulkUri();
137
138        final EventBufferingConfiguration bufferConfig = configuration.getBuffering();
139        if (bufferConfig.isEnabled()) {
140            final Duration writeInterval =
141                    bufferConfig.getWriteInterval() == null || bufferConfig.getWriteInterval().isEmpty()
142                            ? null
143                            : Duration.duration(bufferConfig.getWriteInterval());
144            batchIndexer = new ElasticsearchBatchIndexer(bufferConfig.getMaxSize(),
145                    writeInterval, bufferConfig.getMaxBatchedEvents(),
146                    BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE, ALWAYS_FLUSH_BATCH_QUEUE, this);
147        } else {
148            batchIndexer = null;
149        }
150    }
151
152    @Override
153    public void startup() throws ResourceException {
154        if (batchIndexer != null) {
155            batchIndexer.startup();
156        }
157    }
158
159    @Override
160    public void shutdown() throws ResourceException {
161        if (batchIndexer != null) {
162            batchIndexer.shutdown();
163        }
164        if (defaultHttpClientHandler != null) {
165            try {
166                defaultHttpClientHandler.close();
167            } catch (IOException e) {
168                throw ResourceException.newResourceException(ResourceException.INTERNAL_ERROR,
169                        "An error occurred while closing the default HTTP client handler", e);
170            }
171        }
172    }
173
174    /**
175     * Queries the Elasticsearch
176     * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search.html">Search API</a> for
177     * audit events.
178     *
179     * {@inheritDoc}
180     */
181    @Override
182    public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic,
183           final QueryRequest query, final QueryResourceHandler handler) {
184        final int pageSize = query.getPageSize() <= 0 ? DEFAULT_PAGE_SIZE : query.getPageSize();
185        // set the offset to either first the offset provided, or second the paged result cookie value, or finally 0
186        final int offset;
187        if (query.getPagedResultsOffset() != 0) {
188            offset = query.getPagedResultsOffset();
189        } else if (query.getPagedResultsCookie() != null) {
190            offset = Integer.valueOf(query.getPagedResultsCookie());
191        } else {
192            offset = DEFAULT_OFFSET;
193        }
194
195        final JsonValue payload =
196                json(object(field(
197                        QUERY, query.getQueryFilter().accept(ELASTICSEARCH_QUERY_FILTER_VISITOR, null).getObject())));
198        try {
199            final Request request = createRequest(GET, buildSearchUri(topic, pageSize, offset), payload.getObject());
200            return client.send(request).then(new Function<Response, QueryResponse, ResourceException>() {
201                    @Override
202                    public QueryResponse apply(Response response) throws ResourceException {
203                        if (!response.getStatus().isSuccessful()) {
204                            final String message = "Elasticsearch response (" + indexName + "/" + topic + SEARCH + "): "
205                                    + response.getEntity();
206                            throw newResourceException(response.getStatus().getCode(), message);
207                        }
208                        try {
209                            JsonValue events = json(response.getEntity().getJson());
210                            for (JsonValue event : events.get(HITS).get(HITS)) {
211                                handler.handleResource(
212                                        newResourceResponse(event.get(FIELD_CONTENT_ID).asString(), null,
213                                                ElasticsearchUtil.denormalizeJson(event.get(SOURCE))));
214                            }
215                            final int totalResults = events.get(HITS).get(TOTAL).asInteger();
216                            final String pagedResultsCookie = (pageSize + offset) >= totalResults
217                                    ? null
218                                    : Integer.toString(pageSize + offset);
219                            return newQueryResponse(pagedResultsCookie,
220                                    CountPolicy.EXACT,
221                                    totalResults);
222                        } catch (IOException e) {
223                            throw new InternalServerErrorException(e.getMessage(), e);
224                        }
225                    }
226            },
227                    Responses.<QueryResponse, ResourceException>noopExceptionFunction());
228        } catch (URISyntaxException e) {
229            return new InternalServerErrorException(e.getMessage(), e).asPromise();
230        }
231    }
232
233    @Override
234    public Promise<ResourceResponse, ResourceException> readEvent(final Context context, final String topic,
235            final String resourceId) {
236        final Request request;
237        try {
238            request = createRequest(GET, buildEventUri(topic, resourceId), null);
239        } catch (Exception e) {
240            final String error = String.format("Unable to read audit entry for topic=%s, _id=%s", topic, resourceId);
241            LOGGER.error(error, e);
242            return new InternalServerErrorException(error, e).asPromise();
243        }
244
245        return client.send(request).then(new Function<Response, ResourceResponse, ResourceException>() {
246                @Override
247                public ResourceResponse apply(Response response) throws ResourceException {
248                    if (!response.getStatus().isSuccessful()) {
249                        throw resourceException(indexName, topic, resourceId, response);
250                    }
251
252                    try {
253                        // the original audit JSON is under _source, and we also add back the _id
254                        JsonValue jsonValue = json(response.getEntity().getJson());
255                        jsonValue = ElasticsearchUtil.denormalizeJson(jsonValue.get(SOURCE));
256                        jsonValue.put(FIELD_CONTENT_ID, resourceId);
257                        return newResourceResponse(resourceId, null, jsonValue);
258                    } catch (IOException e) {
259                        throw new InternalServerErrorException(e.getMessage(), e);
260                    }
261                }
262        }, Responses.<ResourceResponse, ResourceException>noopExceptionFunction());
263    }
264
265    @Override
266    public Promise<ResourceResponse, ResourceException> publishEvent(final Context context, final String topic,
267            final JsonValue event) {
268        if (batchIndexer == null) {
269            return publishSingleEvent(topic, event);
270        } else {
271            if (!batchIndexer.offer(topic, event)) {
272                return new ServiceUnavailableException("Elasticsearch batch indexer full, so dropping audit event "
273                        + indexName + "/" + topic + "/" + event.get("_id").asString()).asPromise();
274            }
275            return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null,
276                    event).asPromise();
277        }
278    }
279
280    /**
281     * Publishes a single event to the provided topic.
282     *
283     * @param topic The topic where to publish the event.
284     * @param event The event to publish.
285     * @return a promise with either a response or an exception
286     */
287    protected Promise<ResourceResponse, ResourceException> publishSingleEvent(final String topic,
288            final JsonValue event) {
289        // _id is a protected Elasticsearch field, so read it and remove it
290        final String resourceId = event.get(FIELD_CONTENT_ID).asString();
291        event.remove(FIELD_CONTENT_ID);
292
293        try {
294            final JsonValue normalizedEvent = ElasticsearchUtil.normalizeJson(event);
295            final String jsonPayload = OBJECT_MAPPER.writeValueAsString(normalizedEvent.getObject());
296            event.put(FIELD_CONTENT_ID, resourceId);
297
298            final Request request = createRequest(PUT, buildEventUri(topic, resourceId), jsonPayload);
299
300            return client.send(request).then(new Function<Response, ResourceResponse, ResourceException>() {
301                    @Override
302                    public ResourceResponse apply(Response response) throws ResourceException {
303                        if (!response.getStatus().isSuccessful()) {
304                            throw resourceException(indexName, topic, resourceId, response);
305                        }
306                        return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null,
307                                event);
308                    }
309            }, Responses.<ResourceResponse, ResourceException>noopExceptionFunction());
310        } catch (Exception e) {
311            final String error = String.format("Unable to create audit entry for topic=%s, _id=%s", topic, resourceId);
312            LOGGER.error(error, e);
313            return new InternalServerErrorException(error, e).asPromise();
314        }
315    }
316
317    /**
318     * Adds an audit event to an Elasticsearch Bulk API payload.
319     *
320     * @param topic Event topic
321     * @param event Event JSON payload
322     * @param payload Elasticsearch Bulk API payload
323     * @throws BatchException indicates failure to add-to-batch
324     */
325    @Override
326    public void addToBatch(final String topic, final JsonValue event, final StringBuilder payload)
327            throws BatchException {
328        try {
329            // _id is a protected Elasticsearch field
330            final String resourceId = event.get(FIELD_CONTENT_ID).asString();
331            event.remove(FIELD_CONTENT_ID);
332            final JsonValue normalizedEvent = ElasticsearchUtil.normalizeJson(event);
333            final String jsonPayload = OBJECT_MAPPER.writeValueAsString(normalizedEvent.getObject());
334
335            // newlines have special significance in the Bulk API
336            // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
337            payload.append("{ \"index\" : { \"_type\" : ")
338                    .append(OBJECT_MAPPER.writeValueAsString(topic))
339                    .append(", \"_id\" : ")
340                    .append(OBJECT_MAPPER.writeValueAsString(resourceId))
341                    .append(" } }\n")
342                    .append(jsonPayload)
343                    .append('\n');
344        } catch (IOException e) {
345            throw new BatchException("Unexpected error while adding to batch", e);
346        }
347    }
348
349    /**
350     * Publishes a <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API</a>
351     * payload to Elasticsearch.
352     *
353     * @param payload Elasticsearch Bulk API payload
354     * @throws BatchException indicates (full or partial) failure to publish batch
355     */
356    @Override
357    public void publishBatch(final String payload) throws BatchException {
358        try {
359            final Request request = createRequest(POST, buildBulkUri(), payload);
360
361            final Response response = client.send(request).get();
362            if (!response.getStatus().isSuccessful()) {
363                throw new BatchException("Elasticsearch batch index failed: " + response.getEntity());
364            } else {
365                final JsonValue responseJson = json(response.getEntity().getJson());
366                if (responseJson.get("errors").asBoolean()) {
367                    // one or more batch index operations failed, so log failures
368                    final JsonValue items = responseJson.get("items");
369                    final int n = items.size();
370                    final List<Object> failureItems = new ArrayList<>(n);
371                    for (int i = 0; i < n; ++i) {
372                        final JsonValue item = items.get(i).get("index");
373                        final Integer status = item.get("status").asInteger();
374                        if (status >= 400) {
375                            failureItems.add(item);
376                        }
377                    }
378                    final String message = "One or more Elasticsearch batch index entries failed: "
379                            + OBJECT_MAPPER.writeValueAsString(failureItems);
380                    throw new BatchException(message);
381                }
382            }
383        } catch (IOException | URISyntaxException | ExecutionException | InterruptedException e) {
384            throw new BatchException("Unexpected error while publishing batch", e);
385        }
386    }
387
388    /**
389     * Builds a basic authentication header-value, if username and password are provided in configuration.
390     *
391     * @return Basic authentication header-value or {@code null} if not configured
392     */
393    protected String buildBasicAuthHeaderValue() {
394        if (basicAuthHeaderValue != null) {
395            return basicAuthHeaderValue;
396        }
397        final ConnectionConfiguration connection = configuration.getConnection();
398        if (connection.getUsername() == null || connection.getUsername().isEmpty()
399                || connection.getPassword() == null || connection.getPassword().isEmpty()) {
400            return null;
401        }
402        final String credentials = connection.getUsername() + ":" + connection.getPassword();
403        return "Basic " + Base64.encode(credentials.getBytes());
404    }
405
406    /**
407     * Builds an Elasticsearch API URI for operating on a single event (e.g., index, get, etc.).
408     *
409     * @param topic Audit topic
410     * @param eventId Event ID
411     * @return URI
412     */
413    protected String buildEventUri(final String topic, final String eventId) {
414        return buildBaseUri() + "/" + topic + "/" + eventId;
415    }
416
417    /**
418     * Builds an Elasticsearch API URI for Bulk API.
419     *
420     * @return URI
421     */
422    protected String buildBulkUri() {
423        if (bulkUri != null) {
424            return bulkUri;
425        }
426        return buildBaseUri() + BULK;
427    }
428
429    /**
430     * Builds an Elasticsearch API URI for Search API.
431     *
432     * @param topic The audit topic to search.
433     * @param pageSize The number of results to return.
434     * @param offset The number of results to skip.
435     * @return The search uri.
436     */
437    protected String buildSearchUri(final String topic, final int pageSize, final int offset) {
438        return buildBaseUri() + "/" + topic + SEARCH + "?size=" + pageSize + "&from=" + offset;
439    }
440
441    /**
442     * Builds an Elasticsearch API base URI. The format is,
443     * <pre>http[s]://host:port/indexName</pre>
444     *
445     * @return Base URI
446     */
447    protected String buildBaseUri() {
448        if (baseUri != null) {
449            return baseUri;
450        }
451        final ConnectionConfiguration connection = configuration.getConnection();
452        return (connection.isUseSSL() ? "https" : "http") + "://" + connection.getHost() + ":" + connection.getPort()
453                + "/" + indexName;
454    }
455
456    /**
457     * Gets an {@code Exception} {@link Promise} containing an Elasticsearch HTTP response status and payload.
458     *
459     * @param indexName Index name
460     * @param topic Event topic
461     * @param resourceId Event ID
462     * @param response HTTP response
463     * @return {@code Exception} {@link Promise}
464     */
465    protected static ResourceException resourceException(
466            final String indexName, final String topic, final String resourceId, final Response response) {
467        if (response.getStatus().getCode() == ResourceException.NOT_FOUND) {
468            return new NotFoundException("Object " + resourceId + " not found in " + indexName + "/" + topic);
469        }
470        final String message = "Elasticsearch response (" + indexName + "/" + topic + "/" + resourceId + "): "
471                + response.getEntity();
472        return newResourceException(response.getStatus().getCode(), message);
473    }
474
475    private Request createRequest(final String method, final String uri, final Object payload)
476            throws URISyntaxException {
477        final Request request = new Request();
478        request.setMethod(method);
479        request.setUri(uri);
480        if (payload != null) {
481            request.getHeaders().put(ContentTypeHeader.NAME, "application/json; charset=UTF-8");
482            request.setEntity(payload);
483        }
484        if (basicAuthHeaderValue != null) {
485            request.getHeaders().put("Authorization", basicAuthHeaderValue);
486        }
487        return request;
488    }
489
490    private HttpClientHandler defaultHttpClientHandler() {
491        try {
492            return new HttpClientHandler(
493                    Options.defaultOptions()
494                            .set(OPTION_LOADER, new Loader() {
495                                @Override
496                                public <S> S load(Class<S> service, Options options) {
497                                    return service.cast(new AsyncHttpClientProvider());
498                                }
499                            }));
500        } catch (HttpApplicationException e) {
501            throw new RuntimeException("Error while building default HTTP Client", e);
502        }
503    }
504}