001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2016 ForgeRock AS. 015 */ 016package org.forgerock.audit.handlers.elasticsearch; 017 018import static org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration.ConnectionConfiguration; 019import static org.forgerock.audit.handlers.elasticsearch.ElasticsearchUtil.OBJECT_MAPPER; 020import static org.forgerock.http.handler.HttpClientHandler.OPTION_LOADER; 021import static org.forgerock.json.JsonValue.field; 022import static org.forgerock.json.JsonValue.json; 023import static org.forgerock.json.JsonValue.object; 024import static org.forgerock.json.resource.ResourceException.newResourceException; 025import static org.forgerock.json.resource.ResourceResponse.FIELD_CONTENT_ID; 026import static org.forgerock.json.resource.Responses.newQueryResponse; 027import static org.forgerock.json.resource.Responses.newResourceResponse; 028 029import java.io.IOException; 030import java.net.URISyntaxException; 031import java.util.ArrayList; 032import java.util.List; 033import java.util.concurrent.ExecutionException; 034 035import org.forgerock.audit.Audit; 036import org.forgerock.audit.events.EventTopicsMetaData; 037import org.forgerock.audit.events.handlers.AuditEventHandler; 038import org.forgerock.audit.events.handlers.AuditEventHandlerBase; 039import org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration.EventBufferingConfiguration; 040import org.forgerock.http.Client; 041import org.forgerock.http.HttpApplicationException; 042import org.forgerock.http.apache.async.AsyncHttpClientProvider; 043import org.forgerock.http.handler.HttpClientHandler; 044import org.forgerock.http.header.ContentTypeHeader; 045import org.forgerock.http.protocol.Request; 046import org.forgerock.http.protocol.Response; 047import org.forgerock.http.protocol.Responses; 048import org.forgerock.http.spi.Loader; 049import org.forgerock.json.JsonValue; 050import org.forgerock.json.resource.CountPolicy; 051import org.forgerock.json.resource.InternalServerErrorException; 052import org.forgerock.json.resource.NotFoundException; 053import org.forgerock.json.resource.QueryRequest; 054import org.forgerock.json.resource.QueryResourceHandler; 055import org.forgerock.json.resource.QueryResponse; 056import org.forgerock.json.resource.ResourceException; 057import org.forgerock.json.resource.ResourceResponse; 058import org.forgerock.json.resource.ServiceUnavailableException; 059import org.forgerock.services.context.Context; 060import org.forgerock.util.Function; 061import org.forgerock.util.Options; 062import org.forgerock.util.Reject; 063import org.forgerock.util.encode.Base64; 064import org.forgerock.util.promise.Promise; 065import org.forgerock.util.time.Duration; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069/** 070 * {@link AuditEventHandler} for Elasticsearch. 071 */ 072public class ElasticsearchAuditEventHandler extends AuditEventHandlerBase implements 073 ElasticsearchBatchAuditEventHandler { 074 075 private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAuditEventHandler.class); 076 private static final ElasticsearchQueryFilterVisitor ELASTICSEARCH_QUERY_FILTER_VISITOR = 077 new ElasticsearchQueryFilterVisitor(); 078 079 private static final String QUERY = "query"; 080 private static final String GET = "GET"; 081 private static final String SEARCH = "/_search"; 082 private static final String BULK = "/_bulk"; 083 private static final String HITS = "hits"; 084 private static final String SOURCE = "_source"; 085 private static final int DEFAULT_PAGE_SIZE = 10; 086 private static final String TOTAL = "total"; 087 private static final String PUT = "PUT"; 088 private static final String POST = "POST"; 089 090 /** 091 * Average number of characters, per event, for batch indexing via Elasticsearch Bulk API. This value 092 * is used to initialize the size of buffers, but if the value is too low, the buffers will automatically resize 093 * as needed. 094 */ 095 private static final int BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE = 1280; 096 097 /** 098 * The Elasticsearch {@link AuditEventHandler} <b>always</b> flushes events in the batch queue on shutdown or 099 * configuration change. 100 */ 101 private static final boolean ALWAYS_FLUSH_BATCH_QUEUE = true; 102 private static final int DEFAULT_OFFSET = 0; 103 104 private final String indexName; 105 private final String basicAuthHeaderValue; 106 private final String baseUri; 107 private final String bulkUri; 108 private final ElasticsearchAuditEventHandlerConfiguration configuration; 109 private final Client client; 110 private final ElasticsearchBatchIndexer batchIndexer; 111 private final HttpClientHandler defaultHttpClientHandler; 112 113 /** 114 * Create a new {@code ElasticsearchAuditEventHandler} instance. 115 * 116 * @param configuration Configuration parameters that can be adjusted by system administrators. 117 * @param eventTopicsMetaData Meta-data for all audit event topics. 118 * @param client HTTP client or {@code null} to use default client. 119 */ 120 public ElasticsearchAuditEventHandler( 121 final ElasticsearchAuditEventHandlerConfiguration configuration, 122 final EventTopicsMetaData eventTopicsMetaData, 123 @Audit final Client client) { 124 super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled()); 125 this.configuration = Reject.checkNotNull(configuration); 126 if (client == null) { 127 this.defaultHttpClientHandler = defaultHttpClientHandler(); 128 this.client = new Client(defaultHttpClientHandler); 129 } else { 130 this.defaultHttpClientHandler = null; 131 this.client = client; 132 } 133 indexName = configuration.getIndexMapping().getIndexName(); 134 basicAuthHeaderValue = buildBasicAuthHeaderValue(); 135 baseUri = buildBaseUri(); 136 bulkUri = buildBulkUri(); 137 138 final EventBufferingConfiguration bufferConfig = configuration.getBuffering(); 139 if (bufferConfig.isEnabled()) { 140 final Duration writeInterval = 141 bufferConfig.getWriteInterval() == null || bufferConfig.getWriteInterval().isEmpty() 142 ? null 143 : Duration.duration(bufferConfig.getWriteInterval()); 144 batchIndexer = new ElasticsearchBatchIndexer(bufferConfig.getMaxSize(), 145 writeInterval, bufferConfig.getMaxBatchedEvents(), 146 BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE, ALWAYS_FLUSH_BATCH_QUEUE, this); 147 } else { 148 batchIndexer = null; 149 } 150 } 151 152 @Override 153 public void startup() throws ResourceException { 154 if (batchIndexer != null) { 155 batchIndexer.startup(); 156 } 157 } 158 159 @Override 160 public void shutdown() throws ResourceException { 161 if (batchIndexer != null) { 162 batchIndexer.shutdown(); 163 } 164 if (defaultHttpClientHandler != null) { 165 try { 166 defaultHttpClientHandler.close(); 167 } catch (IOException e) { 168 throw ResourceException.newResourceException(ResourceException.INTERNAL_ERROR, 169 "An error occurred while closing the default HTTP client handler", e); 170 } 171 } 172 } 173 174 /** 175 * Queries the Elasticsearch 176 * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search.html">Search API</a> for 177 * audit events. 178 * 179 * {@inheritDoc} 180 */ 181 @Override 182 public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic, 183 final QueryRequest query, final QueryResourceHandler handler) { 184 final int pageSize = query.getPageSize() <= 0 ? DEFAULT_PAGE_SIZE : query.getPageSize(); 185 // set the offset to either first the offset provided, or second the paged result cookie value, or finally 0 186 final int offset; 187 if (query.getPagedResultsOffset() != 0) { 188 offset = query.getPagedResultsOffset(); 189 } else if (query.getPagedResultsCookie() != null) { 190 offset = Integer.valueOf(query.getPagedResultsCookie()); 191 } else { 192 offset = DEFAULT_OFFSET; 193 } 194 195 final JsonValue payload = 196 json(object(field( 197 QUERY, query.getQueryFilter().accept(ELASTICSEARCH_QUERY_FILTER_VISITOR, null).getObject()))); 198 try { 199 final Request request = createRequest(GET, buildSearchUri(topic, pageSize, offset), payload.getObject()); 200 return client.send(request).then(new Function<Response, QueryResponse, ResourceException>() { 201 @Override 202 public QueryResponse apply(Response response) throws ResourceException { 203 if (!response.getStatus().isSuccessful()) { 204 final String message = "Elasticsearch response (" + indexName + "/" + topic + SEARCH + "): " 205 + response.getEntity(); 206 throw newResourceException(response.getStatus().getCode(), message); 207 } 208 try { 209 JsonValue events = json(response.getEntity().getJson()); 210 for (JsonValue event : events.get(HITS).get(HITS)) { 211 handler.handleResource( 212 newResourceResponse(event.get(FIELD_CONTENT_ID).asString(), null, 213 ElasticsearchUtil.denormalizeJson(event.get(SOURCE)))); 214 } 215 final int totalResults = events.get(HITS).get(TOTAL).asInteger(); 216 final String pagedResultsCookie = (pageSize + offset) >= totalResults 217 ? null 218 : Integer.toString(pageSize + offset); 219 return newQueryResponse(pagedResultsCookie, 220 CountPolicy.EXACT, 221 totalResults); 222 } catch (IOException e) { 223 throw new InternalServerErrorException(e.getMessage(), e); 224 } 225 } 226 }, 227 Responses.<QueryResponse, ResourceException>noopExceptionFunction()); 228 } catch (URISyntaxException e) { 229 return new InternalServerErrorException(e.getMessage(), e).asPromise(); 230 } 231 } 232 233 @Override 234 public Promise<ResourceResponse, ResourceException> readEvent(final Context context, final String topic, 235 final String resourceId) { 236 final Request request; 237 try { 238 request = createRequest(GET, buildEventUri(topic, resourceId), null); 239 } catch (Exception e) { 240 final String error = String.format("Unable to read audit entry for topic=%s, _id=%s", topic, resourceId); 241 LOGGER.error(error, e); 242 return new InternalServerErrorException(error, e).asPromise(); 243 } 244 245 return client.send(request).then(new Function<Response, ResourceResponse, ResourceException>() { 246 @Override 247 public ResourceResponse apply(Response response) throws ResourceException { 248 if (!response.getStatus().isSuccessful()) { 249 throw resourceException(indexName, topic, resourceId, response); 250 } 251 252 try { 253 // the original audit JSON is under _source, and we also add back the _id 254 JsonValue jsonValue = json(response.getEntity().getJson()); 255 jsonValue = ElasticsearchUtil.denormalizeJson(jsonValue.get(SOURCE)); 256 jsonValue.put(FIELD_CONTENT_ID, resourceId); 257 return newResourceResponse(resourceId, null, jsonValue); 258 } catch (IOException e) { 259 throw new InternalServerErrorException(e.getMessage(), e); 260 } 261 } 262 }, Responses.<ResourceResponse, ResourceException>noopExceptionFunction()); 263 } 264 265 @Override 266 public Promise<ResourceResponse, ResourceException> publishEvent(final Context context, final String topic, 267 final JsonValue event) { 268 if (batchIndexer == null) { 269 return publishSingleEvent(topic, event); 270 } else { 271 if (!batchIndexer.offer(topic, event)) { 272 return new ServiceUnavailableException("Elasticsearch batch indexer full, so dropping audit event " 273 + indexName + "/" + topic + "/" + event.get("_id").asString()).asPromise(); 274 } 275 return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, 276 event).asPromise(); 277 } 278 } 279 280 /** 281 * Publishes a single event to the provided topic. 282 * 283 * @param topic The topic where to publish the event. 284 * @param event The event to publish. 285 * @return a promise with either a response or an exception 286 */ 287 protected Promise<ResourceResponse, ResourceException> publishSingleEvent(final String topic, 288 final JsonValue event) { 289 // _id is a protected Elasticsearch field, so read it and remove it 290 final String resourceId = event.get(FIELD_CONTENT_ID).asString(); 291 event.remove(FIELD_CONTENT_ID); 292 293 try { 294 final JsonValue normalizedEvent = ElasticsearchUtil.normalizeJson(event); 295 final String jsonPayload = OBJECT_MAPPER.writeValueAsString(normalizedEvent.getObject()); 296 event.put(FIELD_CONTENT_ID, resourceId); 297 298 final Request request = createRequest(PUT, buildEventUri(topic, resourceId), jsonPayload); 299 300 return client.send(request).then(new Function<Response, ResourceResponse, ResourceException>() { 301 @Override 302 public ResourceResponse apply(Response response) throws ResourceException { 303 if (!response.getStatus().isSuccessful()) { 304 throw resourceException(indexName, topic, resourceId, response); 305 } 306 return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, 307 event); 308 } 309 }, Responses.<ResourceResponse, ResourceException>noopExceptionFunction()); 310 } catch (Exception e) { 311 final String error = String.format("Unable to create audit entry for topic=%s, _id=%s", topic, resourceId); 312 LOGGER.error(error, e); 313 return new InternalServerErrorException(error, e).asPromise(); 314 } 315 } 316 317 /** 318 * Adds an audit event to an Elasticsearch Bulk API payload. 319 * 320 * @param topic Event topic 321 * @param event Event JSON payload 322 * @param payload Elasticsearch Bulk API payload 323 * @throws BatchException indicates failure to add-to-batch 324 */ 325 @Override 326 public void addToBatch(final String topic, final JsonValue event, final StringBuilder payload) 327 throws BatchException { 328 try { 329 // _id is a protected Elasticsearch field 330 final String resourceId = event.get(FIELD_CONTENT_ID).asString(); 331 event.remove(FIELD_CONTENT_ID); 332 final JsonValue normalizedEvent = ElasticsearchUtil.normalizeJson(event); 333 final String jsonPayload = OBJECT_MAPPER.writeValueAsString(normalizedEvent.getObject()); 334 335 // newlines have special significance in the Bulk API 336 // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html 337 payload.append("{ \"index\" : { \"_type\" : ") 338 .append(OBJECT_MAPPER.writeValueAsString(topic)) 339 .append(", \"_id\" : ") 340 .append(OBJECT_MAPPER.writeValueAsString(resourceId)) 341 .append(" } }\n") 342 .append(jsonPayload) 343 .append('\n'); 344 } catch (IOException e) { 345 throw new BatchException("Unexpected error while adding to batch", e); 346 } 347 } 348 349 /** 350 * Publishes a <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API</a> 351 * payload to Elasticsearch. 352 * 353 * @param payload Elasticsearch Bulk API payload 354 * @throws BatchException indicates (full or partial) failure to publish batch 355 */ 356 @Override 357 public void publishBatch(final String payload) throws BatchException { 358 try { 359 final Request request = createRequest(POST, buildBulkUri(), payload); 360 361 final Response response = client.send(request).get(); 362 if (!response.getStatus().isSuccessful()) { 363 throw new BatchException("Elasticsearch batch index failed: " + response.getEntity()); 364 } else { 365 final JsonValue responseJson = json(response.getEntity().getJson()); 366 if (responseJson.get("errors").asBoolean()) { 367 // one or more batch index operations failed, so log failures 368 final JsonValue items = responseJson.get("items"); 369 final int n = items.size(); 370 final List<Object> failureItems = new ArrayList<>(n); 371 for (int i = 0; i < n; ++i) { 372 final JsonValue item = items.get(i).get("index"); 373 final Integer status = item.get("status").asInteger(); 374 if (status >= 400) { 375 failureItems.add(item); 376 } 377 } 378 final String message = "One or more Elasticsearch batch index entries failed: " 379 + OBJECT_MAPPER.writeValueAsString(failureItems); 380 throw new BatchException(message); 381 } 382 } 383 } catch (IOException | URISyntaxException | ExecutionException | InterruptedException e) { 384 throw new BatchException("Unexpected error while publishing batch", e); 385 } 386 } 387 388 /** 389 * Builds a basic authentication header-value, if username and password are provided in configuration. 390 * 391 * @return Basic authentication header-value or {@code null} if not configured 392 */ 393 protected String buildBasicAuthHeaderValue() { 394 if (basicAuthHeaderValue != null) { 395 return basicAuthHeaderValue; 396 } 397 final ConnectionConfiguration connection = configuration.getConnection(); 398 if (connection.getUsername() == null || connection.getUsername().isEmpty() 399 || connection.getPassword() == null || connection.getPassword().isEmpty()) { 400 return null; 401 } 402 final String credentials = connection.getUsername() + ":" + connection.getPassword(); 403 return "Basic " + Base64.encode(credentials.getBytes()); 404 } 405 406 /** 407 * Builds an Elasticsearch API URI for operating on a single event (e.g., index, get, etc.). 408 * 409 * @param topic Audit topic 410 * @param eventId Event ID 411 * @return URI 412 */ 413 protected String buildEventUri(final String topic, final String eventId) { 414 return buildBaseUri() + "/" + topic + "/" + eventId; 415 } 416 417 /** 418 * Builds an Elasticsearch API URI for Bulk API. 419 * 420 * @return URI 421 */ 422 protected String buildBulkUri() { 423 if (bulkUri != null) { 424 return bulkUri; 425 } 426 return buildBaseUri() + BULK; 427 } 428 429 /** 430 * Builds an Elasticsearch API URI for Search API. 431 * 432 * @param topic The audit topic to search. 433 * @param pageSize The number of results to return. 434 * @param offset The number of results to skip. 435 * @return The search uri. 436 */ 437 protected String buildSearchUri(final String topic, final int pageSize, final int offset) { 438 return buildBaseUri() + "/" + topic + SEARCH + "?size=" + pageSize + "&from=" + offset; 439 } 440 441 /** 442 * Builds an Elasticsearch API base URI. The format is, 443 * <pre>http[s]://host:port/indexName</pre> 444 * 445 * @return Base URI 446 */ 447 protected String buildBaseUri() { 448 if (baseUri != null) { 449 return baseUri; 450 } 451 final ConnectionConfiguration connection = configuration.getConnection(); 452 return (connection.isUseSSL() ? "https" : "http") + "://" + connection.getHost() + ":" + connection.getPort() 453 + "/" + indexName; 454 } 455 456 /** 457 * Gets an {@code Exception} {@link Promise} containing an Elasticsearch HTTP response status and payload. 458 * 459 * @param indexName Index name 460 * @param topic Event topic 461 * @param resourceId Event ID 462 * @param response HTTP response 463 * @return {@code Exception} {@link Promise} 464 */ 465 protected static ResourceException resourceException( 466 final String indexName, final String topic, final String resourceId, final Response response) { 467 if (response.getStatus().getCode() == ResourceException.NOT_FOUND) { 468 return new NotFoundException("Object " + resourceId + " not found in " + indexName + "/" + topic); 469 } 470 final String message = "Elasticsearch response (" + indexName + "/" + topic + "/" + resourceId + "): " 471 + response.getEntity(); 472 return newResourceException(response.getStatus().getCode(), message); 473 } 474 475 private Request createRequest(final String method, final String uri, final Object payload) 476 throws URISyntaxException { 477 final Request request = new Request(); 478 request.setMethod(method); 479 request.setUri(uri); 480 if (payload != null) { 481 request.getHeaders().put(ContentTypeHeader.NAME, "application/json; charset=UTF-8"); 482 request.setEntity(payload); 483 } 484 if (basicAuthHeaderValue != null) { 485 request.getHeaders().put("Authorization", basicAuthHeaderValue); 486 } 487 return request; 488 } 489 490 private HttpClientHandler defaultHttpClientHandler() { 491 try { 492 return new HttpClientHandler( 493 Options.defaultOptions() 494 .set(OPTION_LOADER, new Loader() { 495 @Override 496 public <S> S load(Class<S> service, Options options) { 497 return service.cast(new AsyncHttpClientProvider()); 498 } 499 })); 500 } catch (HttpApplicationException e) { 501 throw new RuntimeException("Error while building default HTTP Client", e); 502 } 503 } 504}