001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2006-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2013-2016 ForgeRock AS.
016 */
017package org.opends.server.tasks;
018
019import java.util.List;
020
021import org.forgerock.i18n.LocalizableMessage;
022import org.forgerock.i18n.LocalizableMessageBuilder;
023import org.opends.messages.TaskMessages;
024import org.opends.server.backends.task.Task;
025import org.opends.server.backends.task.TaskState;
026import org.forgerock.i18n.slf4j.LocalizedLogger;
027import org.opends.server.replication.common.CSN;
028import org.opends.server.replication.plugin.LDAPReplicationDomain;
029import org.forgerock.opendj.ldap.schema.AttributeType;
030import org.opends.server.types.*;
031import org.forgerock.opendj.ldap.DN;
032import org.forgerock.opendj.ldap.ResultCode;
033import org.opends.server.util.TimeThread;
034
035import static org.opends.server.config.ConfigConstants.*;
036import static org.opends.server.core.DirectoryServer.*;
037
038/**
039 * This class provides an implementation of a Directory Server task that can
040 * be used to purge the replication historical informations stored in the
041 * user entries to solve conflicts.
042 */
043public class PurgeConflictsHistoricalTask extends Task
044{
045  /** The default value for the maximum duration of the purge expressed in seconds. */
046  public static final int DEFAULT_MAX_DURATION = 60 * 60;
047  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
048
049  private String domainString;
050  private LDAPReplicationDomain domain;
051
052  /**
053   *                 current historical purge delay
054   *                <--------------------------------->
055   * -----------------------------------------------------------------> t
056   *               |                           |            |
057   *           current                      task           task
058   *           CSN being purged           start date    max end date
059   *                                           <------------>
060   *                                          config.purgeMaxDuration
061   *
062   * The task will start purging the change with the oldest CSN found.
063   * The task run as long as :
064   *  - the end date (computed from the configured max duration) is not reached
065   *  - the CSN purged is oldest than the configured historical purge delay
066   */
067  private int purgeTaskMaxDurationInSec = DEFAULT_MAX_DURATION;
068
069  private TaskState initState;
070
071
072  /** {@inheritDoc} */
073  @Override
074  public LocalizableMessage getDisplayName() {
075    return TaskMessages.INFO_TASK_PURGE_CONFLICTS_HIST_NAME.get();
076  }
077
078  /** {@inheritDoc} */
079  @Override public void initializeTask() throws DirectoryException
080  {
081    if (TaskState.isDone(getTaskState()))
082    {
083      return;
084    }
085
086    // FIXME -- Do we need any special authorization here?
087    Entry taskEntry = getTaskEntry();
088
089    AttributeType typeDomainBase = getSchema().getAttributeType(ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN);
090    List<Attribute> attrList = taskEntry.getAttribute(typeDomainBase);
091    domainString = TaskUtils.getSingleValueString(attrList);
092
093    try
094    {
095      DN dn = DN.valueOf(domainString);
096      // We can assume that this is an LDAP replication domain
097      domain = LDAPReplicationDomain.retrievesReplicationDomain(dn);
098    }
099    catch(DirectoryException e)
100    {
101      LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
102      mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get());
103      mb.append(e.getMessage());
104      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, mb.toMessage());
105    }
106
107    AttributeType typeMaxDuration = getSchema().getAttributeType(ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION);
108    attrList = taskEntry.getAttribute(typeMaxDuration);
109    String maxDurationStringInSec = TaskUtils.getSingleValueString(attrList);
110
111    if (maxDurationStringInSec != null)
112    {
113      try
114      {
115        purgeTaskMaxDurationInSec = Integer.decode(maxDurationStringInSec);
116      }
117      catch(Exception e)
118      {
119        throw new DirectoryException(
120            ResultCode.UNWILLING_TO_PERFORM,
121            TaskMessages.ERR_TASK_INVALID_ATTRIBUTE_VALUE.get(
122        ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, e.getLocalizedMessage()));
123      }
124    }
125  }
126
127  /** {@inheritDoc} */
128  @Override
129  protected TaskState runTask()
130  {
131    Boolean purgeCompletedInTime = false;
132    logger.trace("PurgeConflictsHistoricalTask is starting on domain: %s max duration (sec): %d",
133        domain.getBaseDN(), purgeTaskMaxDurationInSec);
134    try
135    {
136      replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString());
137
138      // launch the task
139      domain.purgeConflictsHistorical(this, TimeThread.getTime() + purgeTaskMaxDurationInSec*1000);
140
141      purgeCompletedInTime = true;
142      replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString());
143
144      initState =  TaskState.COMPLETED_SUCCESSFULLY;
145    }
146    catch(DirectoryException de)
147    {
148      logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage());
149      if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED)
150      {
151        // Error raised at submission time
152        logger.error(de.getMessageObject());
153        initState = TaskState.STOPPED_BY_ERROR;
154      }
155      else
156      {
157        initState =  TaskState.COMPLETED_SUCCESSFULLY;
158      }
159    }
160    finally
161    {
162      try
163      {
164        // sets in the attributes the last stats values
165        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount));
166        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI());
167        logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount);
168      }
169      catch(Exception e)
170      {
171        logger.trace("PurgeConflictsHistoricalTask exception %s", e.getLocalizedMessage());
172        initState = TaskState.STOPPED_BY_ERROR;
173      }
174    }
175
176    logger.trace("PurgeConflictsHistoricalTask is ending with state: %s completedInTime: %s",
177        initState, purgeCompletedInTime);
178    return initState;
179  }
180
181  private int updateAttrPeriod;
182  private CSN lastCSN;
183  private int purgeCount;
184
185  /**
186   * Set the last CSN purged and the count of purged values in order to monitor
187   * the historical purge.
188   *
189   * @param lastCSN
190   *          the last CSN purged.
191   * @param purgeCount
192   *          the count of purged values.
193   */
194  public void setProgressStats(CSN lastCSN, int purgeCount)
195  {
196    try
197    {
198      if (purgeCount == 0)
199      {
200        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CSN, lastCSN.toStringUI());
201      }
202
203      // we don't want the update of the task to overload too much task duration
204      this.purgeCount = purgeCount;
205      this.lastCSN = lastCSN;
206      if (++updateAttrPeriod % 100 == 0)
207      {
208        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount));
209        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI());
210        logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount);
211      }
212    }
213    catch(DirectoryException de)
214    {
215      logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage());
216      initState = TaskState.STOPPED_BY_ERROR;
217    }
218  }
219}