001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2013-2016 ForgeRock AS. 016 */ 017package org.opends.server.tasks; 018 019import java.util.List; 020 021import org.forgerock.i18n.LocalizableMessage; 022import org.forgerock.i18n.LocalizableMessageBuilder; 023import org.opends.messages.TaskMessages; 024import org.opends.server.backends.task.Task; 025import org.opends.server.backends.task.TaskState; 026import org.forgerock.i18n.slf4j.LocalizedLogger; 027import org.opends.server.replication.common.CSN; 028import org.opends.server.replication.plugin.LDAPReplicationDomain; 029import org.forgerock.opendj.ldap.schema.AttributeType; 030import org.opends.server.types.*; 031import org.forgerock.opendj.ldap.DN; 032import org.forgerock.opendj.ldap.ResultCode; 033import org.opends.server.util.TimeThread; 034 035import static org.opends.server.config.ConfigConstants.*; 036import static org.opends.server.core.DirectoryServer.*; 037 038/** 039 * This class provides an implementation of a Directory Server task that can 040 * be used to purge the replication historical informations stored in the 041 * user entries to solve conflicts. 042 */ 043public class PurgeConflictsHistoricalTask extends Task 044{ 045 /** The default value for the maximum duration of the purge expressed in seconds. */ 046 public static final int DEFAULT_MAX_DURATION = 60 * 60; 047 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 048 049 private String domainString; 050 private LDAPReplicationDomain domain; 051 052 /** 053 * current historical purge delay 054 * <---------------------------------> 055 * -----------------------------------------------------------------> t 056 * | | | 057 * current task task 058 * CSN being purged start date max end date 059 * <------------> 060 * config.purgeMaxDuration 061 * 062 * The task will start purging the change with the oldest CSN found. 063 * The task run as long as : 064 * - the end date (computed from the configured max duration) is not reached 065 * - the CSN purged is oldest than the configured historical purge delay 066 */ 067 private int purgeTaskMaxDurationInSec = DEFAULT_MAX_DURATION; 068 069 private TaskState initState; 070 071 072 /** {@inheritDoc} */ 073 @Override 074 public LocalizableMessage getDisplayName() { 075 return TaskMessages.INFO_TASK_PURGE_CONFLICTS_HIST_NAME.get(); 076 } 077 078 /** {@inheritDoc} */ 079 @Override public void initializeTask() throws DirectoryException 080 { 081 if (TaskState.isDone(getTaskState())) 082 { 083 return; 084 } 085 086 // FIXME -- Do we need any special authorization here? 087 Entry taskEntry = getTaskEntry(); 088 089 AttributeType typeDomainBase = getSchema().getAttributeType(ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN); 090 List<Attribute> attrList = taskEntry.getAttribute(typeDomainBase); 091 domainString = TaskUtils.getSingleValueString(attrList); 092 093 try 094 { 095 DN dn = DN.valueOf(domainString); 096 // We can assume that this is an LDAP replication domain 097 domain = LDAPReplicationDomain.retrievesReplicationDomain(dn); 098 } 099 catch(DirectoryException e) 100 { 101 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 102 mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get()); 103 mb.append(e.getMessage()); 104 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, mb.toMessage()); 105 } 106 107 AttributeType typeMaxDuration = getSchema().getAttributeType(ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION); 108 attrList = taskEntry.getAttribute(typeMaxDuration); 109 String maxDurationStringInSec = TaskUtils.getSingleValueString(attrList); 110 111 if (maxDurationStringInSec != null) 112 { 113 try 114 { 115 purgeTaskMaxDurationInSec = Integer.decode(maxDurationStringInSec); 116 } 117 catch(Exception e) 118 { 119 throw new DirectoryException( 120 ResultCode.UNWILLING_TO_PERFORM, 121 TaskMessages.ERR_TASK_INVALID_ATTRIBUTE_VALUE.get( 122 ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, e.getLocalizedMessage())); 123 } 124 } 125 } 126 127 /** {@inheritDoc} */ 128 @Override 129 protected TaskState runTask() 130 { 131 Boolean purgeCompletedInTime = false; 132 logger.trace("PurgeConflictsHistoricalTask is starting on domain: %s max duration (sec): %d", 133 domain.getBaseDN(), purgeTaskMaxDurationInSec); 134 try 135 { 136 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString()); 137 138 // launch the task 139 domain.purgeConflictsHistorical(this, TimeThread.getTime() + purgeTaskMaxDurationInSec*1000); 140 141 purgeCompletedInTime = true; 142 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString()); 143 144 initState = TaskState.COMPLETED_SUCCESSFULLY; 145 } 146 catch(DirectoryException de) 147 { 148 logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage()); 149 if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED) 150 { 151 // Error raised at submission time 152 logger.error(de.getMessageObject()); 153 initState = TaskState.STOPPED_BY_ERROR; 154 } 155 else 156 { 157 initState = TaskState.COMPLETED_SUCCESSFULLY; 158 } 159 } 160 finally 161 { 162 try 163 { 164 // sets in the attributes the last stats values 165 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount)); 166 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI()); 167 logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount); 168 } 169 catch(Exception e) 170 { 171 logger.trace("PurgeConflictsHistoricalTask exception %s", e.getLocalizedMessage()); 172 initState = TaskState.STOPPED_BY_ERROR; 173 } 174 } 175 176 logger.trace("PurgeConflictsHistoricalTask is ending with state: %s completedInTime: %s", 177 initState, purgeCompletedInTime); 178 return initState; 179 } 180 181 private int updateAttrPeriod; 182 private CSN lastCSN; 183 private int purgeCount; 184 185 /** 186 * Set the last CSN purged and the count of purged values in order to monitor 187 * the historical purge. 188 * 189 * @param lastCSN 190 * the last CSN purged. 191 * @param purgeCount 192 * the count of purged values. 193 */ 194 public void setProgressStats(CSN lastCSN, int purgeCount) 195 { 196 try 197 { 198 if (purgeCount == 0) 199 { 200 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CSN, lastCSN.toStringUI()); 201 } 202 203 // we don't want the update of the task to overload too much task duration 204 this.purgeCount = purgeCount; 205 this.lastCSN = lastCSN; 206 if (++updateAttrPeriod % 100 == 0) 207 { 208 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount)); 209 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI()); 210 logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount); 211 } 212 } 213 catch(DirectoryException de) 214 { 215 logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage()); 216 initState = TaskState.STOPPED_BY_ERROR; 217 } 218 } 219}