001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2015 ForgeRock AS.
015 */
016package org.forgerock.audit.handlers.jdbc;
017
018import static org.forgerock.json.JsonValue.object;
019import static org.forgerock.json.resource.Responses.newQueryResponse;
020import static org.forgerock.json.resource.Responses.newResourceResponse;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Map;
025import javax.inject.Inject;
026import javax.sql.DataSource;
027
028import org.forgerock.audit.Audit;
029import org.forgerock.audit.AuditException;
030import org.forgerock.audit.events.AuditEvent;
031import org.forgerock.audit.events.AuditEventHelper;
032import org.forgerock.audit.events.EventTopicsMetaData;
033import org.forgerock.audit.events.handlers.AuditEventHandler;
034import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
035import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.ConnectionPool;
036import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.EventBufferingConfiguration;
037import org.forgerock.http.util.Json;
038import org.forgerock.json.JsonPointer;
039import org.forgerock.json.JsonValue;
040import org.forgerock.json.resource.CountPolicy;
041import org.forgerock.json.resource.InternalServerErrorException;
042import org.forgerock.json.resource.NotFoundException;
043import org.forgerock.json.resource.QueryRequest;
044import org.forgerock.json.resource.QueryResourceHandler;
045import org.forgerock.json.resource.QueryResponse;
046import org.forgerock.json.resource.ResourceException;
047import org.forgerock.json.resource.ResourceResponse;
048import org.forgerock.services.context.Context;
049import org.forgerock.util.promise.Promise;
050import org.forgerock.util.time.Duration;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import com.zaxxer.hikari.HikariConfig;
055import com.zaxxer.hikari.HikariDataSource;
056
057/**
058 * Implements a {@link AuditEventHandler} to write {@link AuditEvent}s to a JDBC repository.
059 **/
060public class JdbcAuditEventHandler extends AuditEventHandlerBase {
061
062    private static final Logger logger = LoggerFactory.getLogger(JdbcAuditEventHandler.class);
063    public static final String MYSQL = "mysql";
064    public static final String H2 = "h2";
065    public static final String ORACLE = "oracle";
066
067    private final JdbcAuditEventHandlerConfiguration configuration;
068    private DataSource dataSource;
069    private DatabaseStatementProvider databaseStatementProvider;
070    private boolean sharedDataSource;
071    private JdbcAuditEventExecutor jdbcAuditEventExecutor;
072
073    /**
074     * Create a new JdbcAuditEventHandler instance.
075     *
076     * @param configuration
077     *          Configuration parameters that can be adjusted by system administrators.
078     * @param eventTopicsMetaData
079     *          Meta-data for all audit event topics.
080     * @param dataSource
081     *          Connection pool. If this parameter is null, then a Hikari data source will be created.
082     */
083    @Inject
084    public JdbcAuditEventHandler(
085            final JdbcAuditEventHandlerConfiguration configuration,
086            final EventTopicsMetaData eventTopicsMetaData,
087            @Audit final DataSource dataSource) {
088        super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
089        this.configuration = configuration;
090        this.dataSource = dataSource;
091    }
092
093    /**
094     * {@inheritDoc}
095     */
096    @Override
097    public void startup() throws ResourceException {
098        if (dataSource != null) {
099            sharedDataSource = true;
100        } else {
101            logger.info("No connection pool (DataSource) provided for JDBC Audit Event Handler; defaulting to Hikari");
102            sharedDataSource = false;
103            dataSource = new HikariDataSource(createHikariConfig(configuration.getConnectionPool()));
104        }
105        databaseStatementProvider = getDatabaseStatementProvider(configuration.getDatabaseType());
106        final JdbcAuditEventExecutor jdbcAuditEventExecutor = new JdbcAuditEventExecutorImpl(this.dataSource);
107        final EventBufferingConfiguration bufferConfig = configuration.getBuffering();
108        if (bufferConfig.isEnabled()) {
109            this.jdbcAuditEventExecutor = new BufferedJdbcAuditEventExecutor(
110                    bufferConfig.getMaxSize(),
111                    bufferConfig.isAutoFlush(),
112                    jdbcAuditEventExecutor,
113                    Duration.duration(bufferConfig.getWriteInterval()),
114                    bufferConfig.getWriterThreads(),
115                    bufferConfig.getMaxBatchedEvents(),
116                    dataSource);
117        } else {
118            this.jdbcAuditEventExecutor = jdbcAuditEventExecutor;
119        }
120    }
121
122    /**
123     * {@inheritDoc}
124     */
125    @Override
126    public void shutdown() throws ResourceException {
127        if (!sharedDataSource && dataSource instanceof HikariDataSource) {
128            ((HikariDataSource) dataSource).close();
129        }
130        jdbcAuditEventExecutor.close();
131    }
132
133    /**
134     * {@inheritDoc}
135     */
136    @Override
137    public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String topic, JsonValue event) {
138        try {
139            final TableMapping mapping = getTableMapping(topic);
140            final JdbcAuditEvent jdbcAuditEvent = databaseStatementProvider.buildCreateEvent(
141                    event, mapping, eventTopicsMetaData.getSchema(topic));
142            jdbcAuditEventExecutor.createAuditEvent(jdbcAuditEvent);
143        } catch (AuditException e) {
144            final String error = String.format("Unable to create audit entry for %s", topic);
145            logger.error(error, e);
146            return new InternalServerErrorException(error, e).asPromise();
147        }
148        return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, event).asPromise();
149    }
150
151    /**
152     * {@inheritDoc}
153     */
154    @Override
155    public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic,
156            final QueryRequest queryRequest, final QueryResourceHandler queryResourceHandler) {
157        final String auditEventTopic = queryRequest.getResourcePathObject().get(0);
158        try {
159            logger.debug("Query called for audit event: {} with queryFilter: {}", topic,
160                    queryRequest.getQueryFilter());
161
162            final TableMapping mapping = getTableMapping(topic);
163            final List<Map<String, Object>> results =
164                    jdbcAuditEventExecutor.queryAuditEvent(
165                            databaseStatementProvider.buildQueryEvent(
166                                    mapping, queryRequest, eventTopicsMetaData.getSchema(topic)));
167
168            for (Map<String, Object> entry : results) {
169                final JsonValue result = processEntry(entry, mapping, topic);
170                queryResourceHandler.handleResource(
171                        newResourceResponse(result.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, result));
172            }
173            return newQueryResponse(String.valueOf(queryRequest.getPagedResultsOffset() + results.size()),
174                            CountPolicy.EXACT, results.size()).asPromise();
175        } catch (AuditException e) {
176            final String error = String.format("Unable to query audit entry for %s", auditEventTopic);
177            logger.error(error, e);
178            return new InternalServerErrorException(error, e).asPromise();
179        }
180    }
181
182    @Override
183    public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
184        JsonValue result;
185        try {
186            logger.debug("Read called for audit event {} with id {}", topic, resourceId);
187
188            final TableMapping mapping = getTableMapping(topic);
189            final List<Map<String, Object>> results =
190                    jdbcAuditEventExecutor.readAuditEvent(
191                            databaseStatementProvider.buildReadEvent(
192                                    mapping, resourceId, eventTopicsMetaData.getSchema(topic)));
193
194            if (results.isEmpty()) {
195                return new NotFoundException(String.format("Entry not found for id: %s", resourceId)).asPromise();
196            }
197            result = processEntry(results.get(0), mapping, topic);
198        } catch (AuditException e) {
199            final String error = String.format("Unable to read audit entry for %s", topic);
200            logger.error(error, e);
201            return new InternalServerErrorException(error, e).asPromise();
202        }
203        return newResourceResponse(resourceId, null, result).asPromise();
204    }
205
206    private TableMapping getTableMapping(final String auditEventTopic) throws AuditException {
207        for (TableMapping tableMapping : configuration.getTableMappings()) {
208            if (tableMapping.getEvent().equalsIgnoreCase(auditEventTopic)) {
209                return tableMapping;
210            }
211        }
212        throw new AuditException(String.format("No table mapping found for audit event type: %s", auditEventTopic));
213    }
214
215    private JsonValue processEntry(final Map<String, Object> sqlResult, final TableMapping tableMapping,
216            final String auditEventTopic) throws AuditException {
217        final JsonValue result = JsonValue.json(object());
218        try {
219            for (Map.Entry<String, String> entry : tableMapping.getFieldToColumn().entrySet()) {
220                final Object value = sqlResult.get(entry.getValue().toLowerCase());
221                if (value != null) {
222                    final JsonPointer field = new JsonPointer(entry.getKey());
223                    final String fieldType =
224                            AuditEventHelper.getPropertyType(eventTopicsMetaData.getSchema(auditEventTopic), field);
225                    if (AuditEventHelper.ARRAY_TYPE.equalsIgnoreCase(fieldType)
226                            || AuditEventHelper.OBJECT_TYPE.equalsIgnoreCase(fieldType)) {
227                        // parse stringified json
228                        result.putPermissive(field, Json.readJson((String) value));
229                    } else {
230                        // value doesn't need parsing
231                        result.putPermissive(field, value);
232                    }
233                }
234            }
235        } catch (IOException e) {
236            logger.error("Unable to process retrieved entry", e);
237            throw new AuditException("Unable to process retrieved entry", e);
238        }
239        return result;
240    }
241
242    private HikariConfig createHikariConfig(ConnectionPool connectionPool) {
243        final HikariConfig hikariConfig = new HikariConfig();
244        hikariConfig.setAutoCommit(connectionPool.getAutoCommit());
245        hikariConfig.setConnectionTimeout(connectionPool.getConnectionTimeout());
246        hikariConfig.setIdleTimeout(connectionPool.getIdleTimeout());
247        hikariConfig.setMaximumPoolSize(connectionPool.getMaxPoolSize());
248        hikariConfig.setMaxLifetime(connectionPool.getMaxLifetime());
249        hikariConfig.setMinimumIdle(connectionPool.getMinIdle());
250        if (!isBlank(connectionPool.getJdbcUrl())) {
251            hikariConfig.setJdbcUrl(connectionPool.getJdbcUrl());
252        }
253        if (!isBlank(connectionPool.getDataSourceClassName())) {
254            hikariConfig.setDataSourceClassName(connectionPool.getDataSourceClassName());
255        }
256        if (!isBlank(connectionPool.getUsername())) {
257            hikariConfig.setUsername(connectionPool.getUsername());
258        }
259        if (!isBlank(connectionPool.getPassword())) {
260            hikariConfig.setPassword(connectionPool.getPassword());
261        }
262        if (!isBlank(connectionPool.getPoolName())) {
263            hikariConfig.setPoolName(connectionPool.getPoolName());
264        }
265        if (!isBlank(connectionPool.getDriverClassName())) {
266            hikariConfig.setDriverClassName(connectionPool.getDriverClassName());
267        }
268        return hikariConfig;
269    }
270
271    private DatabaseStatementProvider getDatabaseStatementProvider(final String databaseName) {
272        switch (databaseName) {
273            case MYSQL:
274            case H2:
275                return new GenericDatabaseStatementProvider();
276            case ORACLE:
277                return new OracleDatabaseStatementProvider();
278            default:
279                logger.warn("Unknown databaseName provided. Using the generic statement provider: {}", databaseName);
280                return new GenericDatabaseStatementProvider();
281        }
282    }
283
284    private static boolean isBlank(CharSequence charSeq) {
285        if (charSeq == null) {
286            return true;
287        }
288        final int length = charSeq.length();
289        if (length == 0) {
290            return true;
291        }
292        for (int i = 0; i < length; i++) {
293            if (Character.isWhitespace(charSeq.charAt(i)) == false) {
294                return false;
295            }
296        }
297        return true;
298    }
299}