001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2015-2016 ForgeRock AS.
015 */
016package org.forgerock.audit.handlers.jdbc;
017
018import static org.forgerock.json.JsonValue.object;
019import static org.forgerock.json.resource.Responses.newQueryResponse;
020import static org.forgerock.json.resource.Responses.newResourceResponse;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Map;
025import javax.inject.Inject;
026import javax.sql.DataSource;
027
028import org.forgerock.audit.Audit;
029import org.forgerock.audit.AuditException;
030import org.forgerock.audit.events.AuditEvent;
031import org.forgerock.audit.events.AuditEventHelper;
032import org.forgerock.audit.events.EventTopicsMetaData;
033import org.forgerock.audit.events.handlers.AuditEventHandler;
034import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
035import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.ConnectionPool;
036import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.EventBufferingConfiguration;
037import org.forgerock.http.util.Json;
038import org.forgerock.json.JsonPointer;
039import org.forgerock.json.JsonValue;
040import org.forgerock.json.resource.CountPolicy;
041import org.forgerock.json.resource.InternalServerErrorException;
042import org.forgerock.json.resource.NotFoundException;
043import org.forgerock.json.resource.QueryRequest;
044import org.forgerock.json.resource.QueryResourceHandler;
045import org.forgerock.json.resource.QueryResponse;
046import org.forgerock.json.resource.ResourceException;
047import org.forgerock.json.resource.ResourceResponse;
048import org.forgerock.services.context.Context;
049import org.forgerock.util.promise.Promise;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import com.zaxxer.hikari.HikariConfig;
054import com.zaxxer.hikari.HikariDataSource;
055
056import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_INTERVAL;
057
058/**
059 * Implements a {@link AuditEventHandler} to write {@link AuditEvent}s to a JDBC repository.
060 **/
061public class JdbcAuditEventHandler extends AuditEventHandlerBase {
062
063    private static final Logger logger = LoggerFactory.getLogger(JdbcAuditEventHandler.class);
064    /** The name used for a MySQL database. */
065    public static final String MYSQL = "mysql";
066    /** The name used for an H2 database. */
067    public static final String H2 = "h2";
068    /** The name used for an Oracle database. */
069    public static final String ORACLE = "oracle";
070
071    private final JdbcAuditEventHandlerConfiguration configuration;
072    private DataSource dataSource;
073    private DatabaseStatementProvider databaseStatementProvider;
074    private boolean sharedDataSource;
075    private JdbcAuditEventExecutor jdbcAuditEventExecutor;
076
077    /**
078     * Create a new JdbcAuditEventHandler instance.
079     *
080     * @param configuration
081     *          Configuration parameters that can be adjusted by system administrators.
082     * @param eventTopicsMetaData
083     *          Meta-data for all audit event topics.
084     * @param dataSource
085     *          Connection pool. If this parameter is null, then a Hikari data source will be created.
086     */
087    @Inject
088    public JdbcAuditEventHandler(
089            final JdbcAuditEventHandlerConfiguration configuration,
090            final EventTopicsMetaData eventTopicsMetaData,
091            @Audit final DataSource dataSource) {
092        super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
093        this.configuration = configuration;
094        this.dataSource = dataSource;
095    }
096
097    /**
098     * {@inheritDoc}
099     */
100    @Override
101    public void startup() throws ResourceException {
102        if (dataSource != null) {
103            sharedDataSource = true;
104        } else {
105            logger.info("No connection pool (DataSource) provided for JDBC Audit Event Handler; defaulting to Hikari");
106            sharedDataSource = false;
107            dataSource = new HikariDataSource(createHikariConfig(configuration.getConnectionPool()));
108        }
109        databaseStatementProvider = getDatabaseStatementProvider(configuration.getDatabaseType());
110        final JdbcAuditEventExecutor jdbcAuditEventExecutor = new JdbcAuditEventExecutorImpl(this.dataSource);
111        final EventBufferingConfiguration bufferConfig = configuration.getBuffering();
112        if (bufferConfig.isEnabled()) {
113            this.jdbcAuditEventExecutor = new BufferedJdbcAuditEventExecutor(
114                    bufferConfig.getMaxSize(),
115                    bufferConfig.isAutoFlush(),
116                    jdbcAuditEventExecutor,
117                    POLLING_INTERVAL,
118                    bufferConfig.getWriterThreads(),
119                    bufferConfig.getMaxBatchedEvents(),
120                    dataSource);
121        } else {
122            this.jdbcAuditEventExecutor = jdbcAuditEventExecutor;
123        }
124    }
125
126    /**
127     * {@inheritDoc}
128     */
129    @Override
130    public void shutdown() throws ResourceException {
131        if (!sharedDataSource && dataSource instanceof HikariDataSource) {
132            ((HikariDataSource) dataSource).close();
133        }
134        jdbcAuditEventExecutor.close();
135    }
136
137    /**
138     * {@inheritDoc}
139     */
140    @Override
141    public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String topic, JsonValue event) {
142        try {
143            final TableMapping mapping = getTableMapping(topic);
144            final JdbcAuditEvent jdbcAuditEvent = databaseStatementProvider.buildCreateEvent(
145                    event, mapping, eventTopicsMetaData.getSchema(topic));
146            jdbcAuditEventExecutor.createAuditEvent(jdbcAuditEvent);
147        } catch (AuditException e) {
148            final String error = String.format("Unable to create audit entry for %s", topic);
149            logger.error(error, e);
150            return new InternalServerErrorException(error, e).asPromise();
151        }
152        return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, event).asPromise();
153    }
154
155    /**
156     * {@inheritDoc}
157     */
158    @Override
159    public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic,
160            final QueryRequest queryRequest, final QueryResourceHandler queryResourceHandler) {
161        final String auditEventTopic = queryRequest.getResourcePathObject().get(0);
162        try {
163            logger.debug("Query called for audit event: {} with queryFilter: {}", topic,
164                    queryRequest.getQueryFilter());
165
166            final TableMapping mapping = getTableMapping(topic);
167            final List<Map<String, Object>> results =
168                    jdbcAuditEventExecutor.queryAuditEvent(
169                            databaseStatementProvider.buildQueryEvent(
170                                    mapping, queryRequest, eventTopicsMetaData.getSchema(topic)));
171
172            for (Map<String, Object> entry : results) {
173                final JsonValue result = processEntry(entry, mapping, topic);
174                queryResourceHandler.handleResource(
175                        newResourceResponse(result.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, result));
176            }
177            return newQueryResponse(String.valueOf(queryRequest.getPagedResultsOffset() + results.size()),
178                            CountPolicy.EXACT, results.size()).asPromise();
179        } catch (AuditException e) {
180            final String error = String.format("Unable to query audit entry for %s", auditEventTopic);
181            logger.error(error, e);
182            return new InternalServerErrorException(error, e).asPromise();
183        }
184    }
185
186    @Override
187    public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
188        JsonValue result;
189        try {
190            logger.debug("Read called for audit event {} with id {}", topic, resourceId);
191
192            final TableMapping mapping = getTableMapping(topic);
193            final List<Map<String, Object>> results =
194                    jdbcAuditEventExecutor.readAuditEvent(
195                            databaseStatementProvider.buildReadEvent(
196                                    mapping, resourceId, eventTopicsMetaData.getSchema(topic)));
197
198            if (results.isEmpty()) {
199                return new NotFoundException(String.format("Entry not found for id: %s", resourceId)).asPromise();
200            }
201            result = processEntry(results.get(0), mapping, topic);
202        } catch (AuditException e) {
203            final String error = String.format("Unable to read audit entry for %s", topic);
204            logger.error(error, e);
205            return new InternalServerErrorException(error, e).asPromise();
206        }
207        return newResourceResponse(resourceId, null, result).asPromise();
208    }
209
210    private TableMapping getTableMapping(final String auditEventTopic) throws AuditException {
211        for (TableMapping tableMapping : configuration.getTableMappings()) {
212            if (tableMapping.getEvent().equalsIgnoreCase(auditEventTopic)) {
213                return tableMapping;
214            }
215        }
216        throw new AuditException(String.format("No table mapping found for audit event type: %s", auditEventTopic));
217    }
218
219    private JsonValue processEntry(final Map<String, Object> sqlResult, final TableMapping tableMapping,
220            final String auditEventTopic) throws AuditException {
221        final JsonValue result = JsonValue.json(object());
222        try {
223            for (Map.Entry<String, String> entry : tableMapping.getFieldToColumn().entrySet()) {
224                final Object value = sqlResult.get(entry.getValue().toLowerCase());
225                if (value != null) {
226                    final JsonPointer field = new JsonPointer(entry.getKey());
227                    final String fieldType =
228                            AuditEventHelper.getPropertyType(eventTopicsMetaData.getSchema(auditEventTopic), field);
229                    if (AuditEventHelper.ARRAY_TYPE.equalsIgnoreCase(fieldType)
230                            || AuditEventHelper.OBJECT_TYPE.equalsIgnoreCase(fieldType)) {
231                        // parse stringified json
232                        result.putPermissive(field, Json.readJson((String) value));
233                    } else {
234                        // value doesn't need parsing
235                        result.putPermissive(field, value);
236                    }
237                }
238            }
239        } catch (IOException e) {
240            logger.error("Unable to process retrieved entry", e);
241            throw new AuditException("Unable to process retrieved entry", e);
242        }
243        return result;
244    }
245
246    private HikariConfig createHikariConfig(ConnectionPool connectionPool) {
247        final HikariConfig hikariConfig = new HikariConfig();
248        hikariConfig.setAutoCommit(connectionPool.getAutoCommit());
249        hikariConfig.setConnectionTimeout(connectionPool.getConnectionTimeout());
250        hikariConfig.setIdleTimeout(connectionPool.getIdleTimeout());
251        hikariConfig.setMaximumPoolSize(connectionPool.getMaxPoolSize());
252        hikariConfig.setMaxLifetime(connectionPool.getMaxLifetime());
253        hikariConfig.setMinimumIdle(connectionPool.getMinIdle());
254        if (!isBlank(connectionPool.getJdbcUrl())) {
255            hikariConfig.setJdbcUrl(connectionPool.getJdbcUrl());
256        }
257        if (!isBlank(connectionPool.getDataSourceClassName())) {
258            hikariConfig.setDataSourceClassName(connectionPool.getDataSourceClassName());
259        }
260        if (!isBlank(connectionPool.getUsername())) {
261            hikariConfig.setUsername(connectionPool.getUsername());
262        }
263        if (!isBlank(connectionPool.getPassword())) {
264            hikariConfig.setPassword(connectionPool.getPassword());
265        }
266        if (!isBlank(connectionPool.getPoolName())) {
267            hikariConfig.setPoolName(connectionPool.getPoolName());
268        }
269        if (!isBlank(connectionPool.getDriverClassName())) {
270            hikariConfig.setDriverClassName(connectionPool.getDriverClassName());
271        }
272        return hikariConfig;
273    }
274
275    private DatabaseStatementProvider getDatabaseStatementProvider(final String databaseName) {
276        switch (databaseName) {
277        case MYSQL:
278        case H2:
279            return new GenericDatabaseStatementProvider();
280        case ORACLE:
281            return new OracleDatabaseStatementProvider();
282        default:
283            logger.warn("Unknown databaseName provided. Using the generic statement provider: {}", databaseName);
284            return new GenericDatabaseStatementProvider();
285        }
286    }
287
288    private static boolean isBlank(CharSequence charSeq) {
289        if (charSeq == null) {
290            return true;
291        }
292        final int length = charSeq.length();
293        if (length == 0) {
294            return true;
295        }
296        for (int i = 0; i < length; i++) {
297            if (!Character.isWhitespace(charSeq.charAt(i))) {
298                return false;
299            }
300        }
301        return true;
302    }
303}