001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2014-2017 ForgeRock AS.
015 */
016package org.opends.server.backends;
017
018import static com.forgerock.opendj.util.StaticUtils.containsIgnoreCase;
019import static org.forgerock.opendj.ldap.schema.CoreSchema.*;
020import static org.opends.messages.BackendMessages.*;
021import static org.opends.messages.ReplicationMessages.*;
022import static org.opends.server.config.ConfigConstants.*;
023import static org.opends.server.controls.PersistentSearchChangeType.ADD;
024import static org.opends.server.core.DirectoryServer.*;
025import static org.opends.server.replication.plugin.MultimasterReplication.*;
026import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
027import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
028import static org.opends.server.util.LDIFWriter.*;
029import static org.opends.server.util.ServerConstants.*;
030import static org.opends.server.util.StaticUtils.*;
031
032import java.text.SimpleDateFormat;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.Date;
036import java.util.Iterator;
037import java.util.LinkedHashMap;
038import java.util.LinkedList;
039import java.util.List;
040import java.util.Map;
041import java.util.Set;
042import java.util.TimeZone;
043import java.util.concurrent.ConcurrentLinkedQueue;
044import java.util.concurrent.ConcurrentSkipListMap;
045import java.util.concurrent.atomic.AtomicReference;
046
047import org.forgerock.i18n.LocalizableMessage;
048import org.forgerock.i18n.slf4j.LocalizedLogger;
049import org.forgerock.opendj.config.Configuration;
050import org.forgerock.opendj.config.server.ConfigException;
051import org.forgerock.opendj.ldap.AttributeDescription;
052import org.forgerock.opendj.ldap.ByteString;
053import org.forgerock.opendj.ldap.ConditionResult;
054import org.forgerock.opendj.ldap.DN;
055import org.forgerock.opendj.ldap.ModificationType;
056import org.forgerock.opendj.ldap.RDN;
057import org.forgerock.opendj.ldap.ResultCode;
058import org.forgerock.opendj.ldap.SearchScope;
059import org.forgerock.opendj.ldap.schema.AttributeType;
060import org.forgerock.opendj.ldap.schema.CoreSchema;
061import org.forgerock.opendj.ldap.schema.ObjectClass;
062import org.opends.server.api.Backend;
063import org.opends.server.controls.EntryChangeNotificationControl;
064import org.opends.server.controls.EntryChangelogNotificationControl;
065import org.opends.server.controls.ExternalChangelogRequestControl;
066import org.opends.server.controls.PersistentSearchControl;
067import org.opends.server.core.AddOperation;
068import org.opends.server.core.DeleteOperation;
069import org.opends.server.core.DirectoryServer;
070import org.opends.server.core.ModifyDNOperation;
071import org.opends.server.core.ModifyOperation;
072import org.opends.server.core.PersistentSearch;
073import org.opends.server.core.SearchOperation;
074import org.opends.server.core.ServerContext;
075import org.opends.server.replication.common.CSN;
076import org.opends.server.replication.common.MultiDomainServerState;
077import org.opends.server.replication.common.ServerState;
078import org.opends.server.replication.protocol.AddMsg;
079import org.opends.server.replication.protocol.DeleteMsg;
080import org.opends.server.replication.protocol.LDAPUpdateMsg;
081import org.opends.server.replication.protocol.ModifyCommonMsg;
082import org.opends.server.replication.protocol.ModifyDNMsg;
083import org.opends.server.replication.protocol.UpdateMsg;
084import org.opends.server.replication.server.ReplicationServer;
085import org.opends.server.replication.server.ReplicationServerDomain;
086import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
087import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
088import org.opends.server.replication.server.changelog.api.ChangelogDB;
089import org.opends.server.replication.server.changelog.api.ChangelogException;
090import org.opends.server.replication.server.changelog.api.DBCursor;
091import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
092import org.opends.server.replication.server.changelog.api.ReplicaId;
093import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
094import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate;
095import org.opends.server.replication.server.changelog.file.ECLMultiDomainDBCursor;
096import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor;
097import org.opends.server.types.Attribute;
098import org.opends.server.types.Attributes;
099import org.opends.server.types.BackupConfig;
100import org.opends.server.types.BackupDirectory;
101import org.opends.server.types.CanceledOperationException;
102import org.opends.server.types.Control;
103import org.opends.server.types.DirectoryException;
104import org.opends.server.types.Entry;
105import org.opends.server.types.FilterType;
106import org.opends.server.types.IndexType;
107import org.opends.server.types.InitializationException;
108import org.opends.server.types.LDIFExportConfig;
109import org.opends.server.types.LDIFImportConfig;
110import org.opends.server.types.LDIFImportResult;
111import org.opends.server.types.Modification;
112import org.opends.server.types.Privilege;
113import org.opends.server.types.RawAttribute;
114import org.opends.server.types.RestoreConfig;
115import org.opends.server.types.SearchFilter;
116import org.opends.server.types.WritabilityMode;
117import org.opends.server.util.CollectionUtils;
118import org.opends.server.util.StaticUtils;
119
120/**
121 * A backend that provides access to the changelog, i.e. the "cn=changelog"
122 * suffix. It is a read-only backend that is created by a
123 * {@link ReplicationServer} and is not configurable.
124 * <p>
125 * There are two modes to search the changelog:
126 * <ul>
127 * <li>Cookie mode: when a "ECL Cookie Exchange Control" is provided with the
128 * request. The cookie provided in the control is used to retrieve entries from
129 * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
130 * the entries.</li>
131 * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided
132 * with the request. The entries are retrieved using the ChangeNumberIndexDB and
133 * their attributes are set with the information from the ReplicasDBs. The
134 * <code>changeNumber</code> attribute value is set from the content of
135 * ChangeNumberIndexDB.</li>
136 * </ul>
137 * <h3>Searches flow</h3>
138 * <p>
139 * Here is the flow of searches within the changelog backend APIs:
140 * <ul>
141 * <li>Normal searches only go through:
142 * <ol>
143 * <li>{@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
144 * </ol>
145 * </li>
146 * <li>Persistent searches with <code>changesOnly=false</code> go through:
147 * <ol>
148 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
149 * (once, single threaded),</li>
150 * <li>
151 * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
152 * <li>{@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
153 * threaded)</li>
154 * </ol>
155 * </li>
156 * <li>Persistent searches with <code>changesOnly=true</code> go through:
157 * <ol>
158 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
159 * (once, single threaded)</li>
160 * <li>
161 * {@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
162 * threaded)</li>
163 * </ol>
164 * </li>
165 * </ul>
166 *
167 * @see ReplicationServer
168 */
169public class ChangelogBackend extends Backend<Configuration>
170{
  /** Logger of this class; used below to trace exceptions that are deliberately not propagated. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /** The id of this backend. */
  public static final String BACKEND_ID = "changelog";

  // NOTE(review): not referenced in the visible part of this file; presumably the change number
  // reported when a cursor holds no record - confirm against its usages.
  private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L;

  /** Name of the change number attribute, looked up in base DN RDNs and in search filters. */
  private static final String CHANGE_NUMBER_ATTR = "changeNumber";
  /** Key under which the entry sender is stored as an attachment of a search operation. */
  private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
180
  /**
   * The objectclasses that will be used in root entry, mapped to their names.
   * Populated once by the static initializer below, in insertion order: top, then container.
   */
  private static final Map<ObjectClass, String>
    CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<>(2);
  static
  {
    CHANGELOG_ROOT_OBJECT_CLASSES.put(CoreSchema.getTopObjectClass(), OC_TOP);
    CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getSchema().getObjectClass("container"), "container");
  }

  /**
   * The objectclasses that will be used in ECL entries, mapped to their names.
   * Populated once by the static initializer below, in insertion order: top, then the changelog entry class.
   */
  private static final Map<ObjectClass, String>
    CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<>(2);
  static
  {
    CHANGELOG_ENTRY_OBJECT_CLASSES.put(CoreSchema.getTopObjectClass(), OC_TOP);
    CHANGELOG_ENTRY_OBJECT_CLASSES.put(getSchema().getObjectClass(OC_CHANGELOG_ENTRY), OC_CHANGELOG_ENTRY);
  }
198
  /** The base DN for the external change log. */
  public static final DN CHANGELOG_BASE_DN = DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT);

  /** The set of base DNs for this backend; assigned when the backend is opened. */
  private Set<DN> baseDNs;
  /** The set of supported controls for this backend: the ECL cookie exchange and persistent search controls. */
  private final Set<String> supportedControls =
      CollectionUtils.newHashSet(OID_ECL_COOKIE_EXCHANGE_CONTROL, OID_PERSISTENT_SEARCH);
  /**
   * Whether the base changelog entry has subordinates.
   * {@code null} until the value has been computed for the first time; the computed value is then kept.
   */
  private Boolean baseEntryHasSubordinates;

  /** The replication server on which the changelog is read. */
  private final ReplicationServer replicationServer;
  /** Predicate handed to the ECL cursor created for cookie based searches. */
  private final ECLEnabledDomainPredicate domainPredicate;

  /** The set of cookie-based persistent searches registered with this backend. */
  private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches = new ConcurrentLinkedQueue<>();
  /** The set of change number-based persistent searches registered with this backend. */
  private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches =
      new ConcurrentLinkedQueue<>();
219
  /**
   * Creates a new backend with the provided replication server.
   * <p>
   * The backend is given the {@link #BACKEND_ID} id, its writability mode is
   * set to {@link WritabilityMode#DISABLED} and it is flagged as a private backend.
   *
   * @param replicationServer
   *          The replication server on which the changes are read.
   * @param domainPredicate
   *          Returns whether a domain is enabled for the external changelog.
   */
  public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate)
  {
    this.replicationServer = replicationServer;
    this.domainPredicate = domainPredicate;
    setBackendID(BACKEND_ID);
    setWritabilityMode(WritabilityMode.DISABLED);
    setPrivateBackend(true);
  }
236
  /**
   * Returns the changelog DB of the replication server this backend reads from.
   * The DB is fetched from the replication server on each call rather than cached.
   *
   * @return the changelog DB exposed by the replication server
   */
  private ChangelogDB getChangelogDB()
  {
    return replicationServer.getChangelogDB();
  }
241
  /**
   * Returns the ChangelogBackend configured for "cn=changelog" in this directory server.
   * <p>
   * The backend is looked up in the directory server by {@link #CHANGELOG_BASE_DN}
   * and the result is cast to {@link ChangelogBackend} without further checks.
   *
   * @return the ChangelogBackend configured for "cn=changelog" in this directory server
   * @deprecated instead inject the required object where needed
   */
  @Deprecated
  public static ChangelogBackend getInstance()
  {
    return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN);
  }
253
  /**
   * Always fails: this backend is not configurable.
   *
   * @throws UnsupportedOperationException
   *           always
   */
  @Override
  public void configureBackend(final Configuration config, ServerContext serverContext) throws ConfigException
  {
    throw new UnsupportedOperationException("The changelog backend is not configurable");
  }
259
260  @Override
261  public void openBackend() throws InitializationException
262  {
263    baseDNs = Collections.singleton(CHANGELOG_BASE_DN);
264
265    try
266    {
267      DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true);
268    }
269    catch (final DirectoryException e)
270    {
271      throw new InitializationException(
272          ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(DN_EXTERNAL_CHANGELOG_ROOT, getExceptionMessage(e)), e);
273    }
274  }
275
  /**
   * Closes this backend by deregistering the changelog base DN from the directory server.
   * <p>
   * This is best effort: a failure to deregister is only traced, never propagated.
   */
  @Override
  public void closeBackend()
  {
    try
    {
      DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN);
    }
    catch (final DirectoryException e)
    {
      // closing must not fail: trace the problem and carry on
      logger.traceException(e);
    }
  }
288
  /**
   * Returns the base DNs of this backend.
   *
   * @return the base DNs field as is; it is assigned by {@code openBackend()}
   */
  @Override
  public Set<DN> getBaseDNs()
  {
    return baseDNs;
  }
294
  /**
   * Reports every attribute type as indexed for every index type.
   *
   * @return always {@code true}, whatever the arguments
   */
  @Override
  public boolean isIndexed(final AttributeType attributeType, final IndexType indexType)
  {
    return true;
  }
300
301  @Override
302  public Entry getEntry(final DN entryDN) throws DirectoryException
303  {
304    if (entryDN == null)
305    {
306      throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
307          ERR_BACKEND_GET_ENTRY_NULL.get(getBackendID()));
308    }
309    throw new RuntimeException("Not implemented");
310  }
311
312  @Override
313  public ConditionResult hasSubordinates(final DN entryDN) throws DirectoryException
314  {
315    if (CHANGELOG_BASE_DN.equals(entryDN))
316    {
317      final Boolean hasSubs = baseChangelogHasSubordinates();
318      if (hasSubs == null)
319      {
320        return ConditionResult.UNDEFINED;
321      }
322      return ConditionResult.valueOf(hasSubs);
323    }
324    return ConditionResult.FALSE;
325  }
326
327  private Boolean baseChangelogHasSubordinates() throws DirectoryException
328  {
329    if (baseEntryHasSubordinates == null)
330    {
331      // compute its value
332      try
333      {
334        final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
335        CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
336        try (final MultiDomainDBCursor cursor =
337            replicationDomainDB.getCursorFrom(new MultiDomainServerState(), options, getExcludedBaseDNs()))
338        {
339          baseEntryHasSubordinates = cursor.next();
340        }
341      }
342      catch (ChangelogException e)
343      {
344        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_ATTRIBUTE.get(
345            "hasSubordinates", DN_EXTERNAL_CHANGELOG_ROOT, stackTraceToSingleLineString(e)));
346      }
347    }
348    return baseEntryHasSubordinates;
349  }
350
  /**
   * Not supported by this backend.
   *
   * @throws DirectoryException
   *           always, with result code {@code UNWILLING_TO_PERFORM}
   */
  @Override
  public long getNumberOfEntriesInBaseDN(final DN baseDN) throws DirectoryException
  {
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get());
  }
356
  /**
   * Not supported by this backend.
   *
   * @throws DirectoryException
   *           always, with result code {@code UNWILLING_TO_PERFORM}
   */
  @Override
  public long getNumberOfChildren(final DN parentDN) throws DirectoryException
  {
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get());
  }
362
363  /**
364   * Notifies persistent searches of this backend that a new cookie entry was added to it.
365   * <p>
366   * Note: This method correspond to the "persistent search" phase.
367   * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
368   * <p>
369   * This method must only be called after the provided data have been persisted to disk.
370   *
371   * @param baseDN
372   *          the baseDN of the newly added entry.
373   * @param updateMsg
374   *          the update message of the newly added entry
375   * @throws ChangelogException
376   *           If a problem occurs while notifying of the newly added entry.
377   */
378  public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException
379  {
380    if (!(updateMsg instanceof LDAPUpdateMsg))
381    {
382      return;
383    }
384
385    try
386    {
387      for (PersistentSearch pSearch : cookieBasedPersistentSearches)
388      {
389        final SearchOperation searchOp = pSearch.getSearchOperation();
390        final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
391        entrySender.persistentSearchSendEntry(baseDN, updateMsg);
392      }
393    }
394    catch (DirectoryException e)
395    {
396      throw new ChangelogException(e.getMessageObject(), e);
397    }
398  }
399
400  /**
401   * Notifies persistent searches of this backend that a new change number entry was added to it.
402   * <p>
403   * Note: This method correspond to the "persistent search" phase.
404   * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
405   * <p>
406   * This method must only be called after the provided data have been persisted to disk.
407   *
408   * @param baseDN
409   *          the baseDN of the newly added entry.
410   * @param changeNumber
411   *          the change number of the newly added entry. It will be greater
412   *          than zero for entries added to the change number index and less
413   *          than or equal to zero for entries added to any replica DB
414   * @param cookieString
415   *          a string representing the cookie of the newly added entry.
416   *          This is only meaningful for entries added to the change number index
417   * @param updateMsg
418   *          the update message of the newly added entry
419   * @throws ChangelogException
420   *           If a problem occurs while notifying of the newly added entry.
421   */
422  public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
423      throws ChangelogException
424  {
425    if (!(updateMsg instanceof LDAPUpdateMsg)
426        || changeNumberBasedPersistentSearches.isEmpty())
427    {
428      return;
429    }
430
431    try
432    {
433      // changeNumber entry can be shared with multiple persistent searches
434      final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
435      for (PersistentSearch pSearch : changeNumberBasedPersistentSearches)
436      {
437        final SearchOperation searchOp = pSearch.getSearchOperation();
438        final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
439        entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry);
440      }
441    }
442    catch (DirectoryException e)
443    {
444      throw new ChangelogException(e.getMessageObject(), e);
445    }
446  }
447
448  private boolean isCookieBased(final SearchOperation searchOp)
449  {
450    for (Control c : searchOp.getRequestControls())
451    {
452      if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID()))
453      {
454        return true;
455      }
456    }
457    return false;
458  }
459
460  @Override
461  public void addEntry(Entry entry, AddOperation addOperation)
462      throws DirectoryException, CanceledOperationException
463  {
464    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
465        ERR_BACKEND_ADD_NOT_SUPPORTED.get(String.valueOf(entry.getName()), getBackendID()));
466  }
467
468  @Override
469  public void deleteEntry(DN entryDN, DeleteOperation deleteOperation)
470      throws DirectoryException, CanceledOperationException
471  {
472    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
473        ERR_BACKEND_DELETE_NOT_SUPPORTED.get(String.valueOf(entryDN), getBackendID()));
474  }
475
476  @Override
477  public void replaceEntry(Entry oldEntry, Entry newEntry,
478      ModifyOperation modifyOperation) throws DirectoryException,
479      CanceledOperationException
480  {
481    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
482        ERR_BACKEND_MODIFY_NOT_SUPPORTED.get(String.valueOf(newEntry.getName()), getBackendID()));
483  }
484
485  @Override
486  public void renameEntry(DN currentDN, Entry entry,
487      ModifyDNOperation modifyDNOperation) throws DirectoryException,
488      CanceledOperationException
489  {
490    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
491        ERR_BACKEND_MODIFY_DN_NOT_SUPPORTED.get(String.valueOf(currentDN), getBackendID()));
492  }
493
494  /**
495   * {@inheritDoc}
496   * <p>
497   * Runs the "initial search" phase (as opposed to a "persistent search"
498   * phase). The "initial search" phase is the only search run by normal
499   * searches, but it is also run by persistent searches with
500   * <code>changesOnly=false</code>. Persistent searches with
501   * <code>changesOnly=true</code> never execute this code.
502   * <p>
503   * Note: this method is executed only once per persistent search, single
504   * threaded.
505   */
506  @Override
507  public void search(final SearchOperation searchOperation) throws DirectoryException
508  {
509    checkChangelogReadPrivilege(searchOperation);
510
511    final Set<DN> excludedBaseDNs = getExcludedBaseDNs();
512    final MultiDomainServerState cookie = getCookieFromControl(searchOperation, excludedBaseDNs);
513
514    final ChangeNumberRange range = optimizeSearch(searchOperation.getBaseDN(), searchOperation.getFilter());
515    try
516    {
517      final boolean isPersistentSearch = isPersistentSearch(searchOperation);
518      if (cookie != null)
519      {
520        initialSearchFromCookie(
521            getCookieEntrySender(SearchPhase.INITIAL, searchOperation, cookie, excludedBaseDNs, isPersistentSearch));
522      }
523      else
524      {
525        initialSearchFromChangeNumber(
526            getChangeNumberEntrySender(SearchPhase.INITIAL, searchOperation, range, isPersistentSearch));
527      }
528    }
529    catch (ChangelogException e)
530    {
531      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get(
532          searchOperation.getBaseDN(), searchOperation.getFilter(), stackTraceToSingleLineString(e)));
533    }
534  }
535
536  private MultiDomainServerState getCookieFromControl(final SearchOperation searchOperation, Set<DN> excludedBaseDNs)
537      throws DirectoryException
538  {
539    final ExternalChangelogRequestControl eclRequestControl =
540        searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
541    if (eclRequestControl != null)
542    {
543      final MultiDomainServerState cookie = eclRequestControl.getCookie();
544      validateProvidedCookie(cookie, excludedBaseDNs);
545      return cookie;
546    }
547    return null;
548  }
549
  /**
   * Returns the OIDs of the controls supported by this backend.
   *
   * @return the set held by this backend (not a copy)
   */
  @Override
  public Set<String> getSupportedControls()
  {
    return supportedControls;
  }
555
  /**
   * Returns the features supported by this backend.
   *
   * @return always an empty set
   */
  @Override
  public Set<String> getSupportedFeatures()
  {
    return Collections.emptySet();
  }
561
  /**
   * Indicates whether the provided backend operation is supported.
   *
   * @return always {@code false}, whatever the operation
   */
  @Override
  public boolean supports(BackendOperation backendOperation)
  {
    return false;
  }
567
568  @Override
569  public void exportLDIF(final LDIFExportConfig exportConfig)
570      throws DirectoryException
571  {
572    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
573        ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID()));
574  }
575
576  @Override
577  public LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext)
578      throws DirectoryException
579  {
580    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
581        ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID()));
582  }
583
584  @Override
585  public void createBackup(BackupConfig backupConfig) throws DirectoryException
586  {
587    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
588        ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
589  }
590
591  @Override
592  public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
593  {
594      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
595          ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
596  }
597
598  @Override
599  public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException
600  {
601      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
602          ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
603  }
604
605  @Override
606  public long getEntryCount()
607  {
608    try
609    {
610      return getNumberOfEntriesInBaseDN(CHANGELOG_BASE_DN) + 1;
611    }
612    catch (DirectoryException e)
613    {
614      logger.traceException(e);
615      return -1;
616    }
617  }
618
  /**
   * Represent the change number range targeted by a search operation.
   * <p>
   * Both bounds start at {@code -1}; the enclosing class treats {@code -1} as
   * "no bound" when it combines ranges, and assigns the bounds directly.
   * <p>
   * This class should be visible for tests.
   */
  static final class ChangeNumberRange
  {
    /** Lowest change number to retrieve, {@code -1} until assigned. */
    private long lowerBound = -1;
    /** Highest change number to retrieve, {@code -1} until assigned. */
    private long upperBound = -1;

    /**
     * Returns the lowest change number to retrieve (inclusive).
     *
     * @return the lowest change number
     */
    long getLowerBound()
    {
      return lowerBound;
    }

    /**
     * Returns the highest change number to retrieve (inclusive).
     *
     * @return the highest change number
     */
    long getUpperBound()
    {
      return upperBound;
    }
  }
649
  /**
   * Returns the set of DNs to exclude from the search.
   * <p>
   * Simply delegates to {@code getExcludedChangelogDomains()}.
   *
   * @return the DNs corresponding to domains to exclude from the search.
   * @throws DirectoryException
   *           If a DN can't be decoded.
   */
  private static Set<DN> getExcludedBaseDNs() throws DirectoryException
  {
    return getExcludedChangelogDomains();
  }
661
662  /**
663   * Optimize the search parameters by analyzing the DN and filter.
664   * It also performs validation on some search parameters
665   * for both cookie and change number based changelogs.
666   *
667   * @param baseDN the provided search baseDN.
668   * @param userFilter the provided search filter.
669   * @return the optimized change number range
670   * @throws DirectoryException when an exception occurs.
671   */
672  ChangeNumberRange optimizeSearch(final DN baseDN, final SearchFilter userFilter) throws DirectoryException
673  {
674    SearchFilter equalityFilter = null;
675    switch (baseDN.size())
676    {
677    case 1:
678      // "cn=changelog" : use user-provided search filter.
679      break;
680    case 2:
681      // It is probably "changeNumber=xxx,cn=changelog", use equality filter
682      // But it also could be "<service-id>,cn=changelog" so need to check on attribute
683      equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR);
684      break;
685    default:
686      // "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter
687      equalityFilter = buildSearchFilterFrom(baseDN, "replicationCSN");
688      break;
689    }
690
691    return optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter);
692  }
693
694  /**
695   * Build a search filter from given DN and attribute.
696   *
697   * @return the search filter or {@code null} if attribute is not present in
698   *         the provided DN
699   */
700  private SearchFilter buildSearchFilterFrom(final DN baseDN, final String attrName)
701  {
702    final RDN rdn = baseDN.rdn();
703    AttributeType attrType = DirectoryServer.getSchema().getAttributeType(attrName);
704    final ByteString attrValue = rdn.getAttributeValue(attrType);
705    if (attrValue != null)
706    {
707      return SearchFilter.createEqualityFilter(attrType, attrValue);
708    }
709    return null;
710  }
711
712  private ChangeNumberRange optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException
713  {
714    final ChangeNumberRange range = new ChangeNumberRange();
715    if (filter == null)
716    {
717      return range;
718    }
719
720    if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR))
721    {
722      range.lowerBound = decodeChangeNumber(filter.getAssertionValue());
723    }
724    else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR))
725    {
726      range.upperBound = decodeChangeNumber(filter.getAssertionValue());
727    }
728    else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR))
729    {
730      final long number = decodeChangeNumber(filter.getAssertionValue());
731      range.lowerBound = number;
732      range.upperBound = number;
733    }
734    else if (matches(filter, FilterType.EQUALITY, "replicationcsn"))
735    {
736      // == exact CSN
737      // validate provided CSN is correct
738      new CSN(filter.getAssertionValue().toString());
739    }
740    else if (filter.getFilterType() == FilterType.AND)
741    {
742      // TODO: it looks like it could be generalized to N components, not only two
743      final Collection<SearchFilter> components = filter.getFilterComponents();
744      final SearchFilter filters[] = components.toArray(new SearchFilter[0]);
745      long upper1 = -1;
746      long lower1 = -1;
747      long upper2 = -1;
748      long lower2 = -1;
749      if (filters.length > 0)
750      {
751        ChangeNumberRange range1 = optimizeSearchUsingFilter(filters[0]);
752        upper1 = range1.upperBound;
753        lower1 = range1.lowerBound;
754      }
755      if (filters.length > 1)
756      {
757        ChangeNumberRange range2 = optimizeSearchUsingFilter(filters[1]);
758        upper2 = range2.upperBound;
759        lower2 = range2.lowerBound;
760      }
761      if (upper1 == -1)
762      {
763        range.upperBound = upper2;
764      }
765      else if (upper2 == -1)
766      {
767        range.upperBound = upper1;
768      }
769      else
770      {
771        range.upperBound = Math.min(upper1, upper2);
772      }
773
774      range.lowerBound = Math.max(lower1, lower2);
775    }
776    return range;
777  }
778
779  private static long decodeChangeNumber(final ByteString assertionValue)
780      throws DirectoryException
781  {
782    try
783    {
784      return Long.decode(assertionValue.toString());
785    }
786    catch (NumberFormatException e)
787    {
788      throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX,
789          LocalizableMessage.raw("Could not convert value '%s' to long", assertionValue));
790    }
791  }
792
793  private boolean matches(SearchFilter filter, FilterType filterType, String primaryName)
794  {
795    return filter.getFilterType() == filterType
796           && filter.getAttributeType() != null
797           && filter.getAttributeType().getNameOrOID().equalsIgnoreCase(primaryName);
798  }
799
800  /** Search the changelog when a cookie control is provided. */
801  private void initialSearchFromCookie(final CookieEntrySender entrySender)
802      throws DirectoryException, ChangelogException
803  {
804    if (!sendBaseChangelogEntry(entrySender.searchOp))
805    { // only return the base entry: stop here
806      return;
807    }
808
809    final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
810    CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
811    try (final MultiDomainDBCursor cursor =
812        replicationDomainDB.getCursorFrom(entrySender.cookie, options, entrySender.excludedBaseDNs);
813        ECLMultiDomainDBCursor replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor))
814    {
815      if (sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor))
816      {
817        entrySender.transitioningToPersistentSearchPhase();
818        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
819      }
820    }
821    finally
822    {
823      entrySender.finalizeInitialSearch();
824    }
825  }
826
827  private CookieEntrySender getCookieEntrySender(SearchPhase startPhase, final SearchOperation searchOperation,
828      MultiDomainServerState cookie, Set<DN> excludedBaseDNs, boolean isPersistentSearch)
829  {
830    if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase))
831    {
832      return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
833    }
834    return new CookieEntrySender(searchOperation, null, startPhase, cookie, excludedBaseDNs);
835  }
836
837  private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
838      final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException
839  {
840    boolean continueSearch = true;
841    while (continueSearch && replicaUpdatesCursor.next())
842    {
843      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
844      final DN domainBaseDN = replicaUpdatesCursor.getData();
845      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN);
846    }
847    return continueSearch;
848  }
849
850  private boolean isPersistentSearch(SearchOperation op)
851  {
852    for (PersistentSearch pSearch : getPersistentSearches())
853    {
854      if (op == pSearch.getSearchOperation())
855      {
856        return true;
857      }
858    }
859    return false;
860  }
861
862  @Override
863  public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException
864  {
865    initializePersistentSearch(pSearch);
866
867    if (isCookieBased(pSearch.getSearchOperation()))
868    {
869      cookieBasedPersistentSearches.add(pSearch);
870    }
871    else
872    {
873      changeNumberBasedPersistentSearches.add(pSearch);
874    }
875    super.registerPersistentSearch(pSearch);
876  }
877
878  private void initializePersistentSearch(PersistentSearch pSearch) throws DirectoryException
879  {
880    final SearchOperation searchOp = pSearch.getSearchOperation();
881
882    // Validation must be done during registration for changes only persistent searches.
883    // Otherwise, when there is an initial search phase,
884    // validation is performed by the search() method.
885    if (pSearch.isChangesOnly())
886    {
887      checkChangelogReadPrivilege(searchOp);
888    }
889    final ChangeNumberRange range = optimizeSearch(searchOp.getBaseDN(), searchOp.getFilter());
890
891    final SearchPhase startPhase = pSearch.isChangesOnly() ? SearchPhase.PERSISTENT : SearchPhase.INITIAL;
892    PersistentSearchControl psearchControl = searchOp.getRequestControl(PersistentSearchControl.DECODER);
893    if (isCookieBased(searchOp))
894    {
895      final Set<DN> excludedBaseDNs = getExcludedBaseDNs();
896      final MultiDomainServerState cookie = getCookie(pSearch.isChangesOnly(), searchOp, excludedBaseDNs);
897      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT,
898          new CookieEntrySender(searchOp, psearchControl, startPhase, cookie, excludedBaseDNs));
899    }
900    else
901    {
902      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT,
903          new ChangeNumberEntrySender(searchOp, psearchControl, startPhase, range));
904    }
905  }
906
907  private MultiDomainServerState getCookie(boolean isChangesOnly, SearchOperation searchOp, Set<DN> excludedBaseDNs)
908      throws DirectoryException
909  {
910    if (isChangesOnly)
911    {
912      // this changesOnly persistent search will not go through #initialSearch()
913      // so we must initialize the cookie here
914      return getNewestCookie(searchOp);
915    }
916    return getCookieFromControl(searchOp, excludedBaseDNs);
917  }
918
919  private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
920  {
921    if (!isCookieBased(searchOp))
922    {
923      return null;
924    }
925
926    final MultiDomainServerState cookie = new MultiDomainServerState();
927    for (final Iterator<ReplicationServerDomain> it =
928        replicationServer.getDomainIterator(); it.hasNext();)
929    {
930      final DN baseDN = it.next().getBaseDN();
931      final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN);
932      cookie.update(baseDN, state);
933    }
934    return cookie;
935  }
936
937  /**
938   * Validates the cookie contained in search parameters by checking its content
939   * with the actual replication server state.
940   *
941   * @throws DirectoryException
942   *           If the state is not valid
943   */
944  private void validateProvidedCookie(final MultiDomainServerState cookie, Set<DN> excludedBaseDNs)
945      throws DirectoryException
946  {
947    if (cookie != null && !cookie.isEmpty())
948    {
949      replicationServer.validateCookie(cookie, excludedBaseDNs);
950    }
951  }
952
953  /** Search the changelog using change number(s). */
954  private void initialSearchFromChangeNumber(final ChangeNumberEntrySender entrySender)
955      throws ChangelogException, DirectoryException
956  {
957    if (!sendBaseChangelogEntry(entrySender.searchOp))
958    { // only return the base entry: stop here
959      return;
960    }
961
962    final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<>();
963    try (DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = getCNIndexDBCursor(entrySender.lowestChangeNumber))
964    {
965      final MultiDomainServerState cookie = new MultiDomainServerState();
966
967      if (sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie))
968      {
969        entrySender.transitioningToPersistentSearchPhase();
970        sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie);
971      }
972    }
973    finally
974    {
975      entrySender.finalizeInitialSearch();
976      StaticUtils.close(replicaUpdatesCursor.get());
977    }
978  }
979
980  private ChangeNumberEntrySender getChangeNumberEntrySender(SearchPhase startPhase,
981      final SearchOperation searchOperation, ChangeNumberRange range, boolean isPersistentSearch)
982  {
983    if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase))
984    {
985      return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
986    }
987    return new ChangeNumberEntrySender(searchOperation, null, SearchPhase.INITIAL, range);
988  }
989
  /**
   * Reads the change number index records from the provided cursor and, for each of them,
   * looks up the matching replica update message and hands it over to the entry sender.
   * <p>
   * The replica updates cursor is created when the first index record is read and stored
   * in the provided reference, so a later call with the same reference reuses it.
   *
   * @param entrySender
   *          the entry sender deciding whether a change number is in range and sending the entries
   * @param cnIndexDBCursor
   *          the cursor on the change number index records
   * @param replicaUpdatesCursor
   *          the holder of the replica updates cursor, holding {@code null} until the first index record is read
   * @param cookie
   *          the cookie updated along with the index records being read
   * @return {@code false} if the entry sender reported a change number out of range or asked to
   *         stop the search, {@code true} if the index cursor was exhausted
   * @throws ChangelogException
   *           if a problem occurs while reading the changelog
   * @throws DirectoryException
   *           if a problem occurs while sending an entry or looking up an update message
   */
  private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender,
      DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor, AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor,
      MultiDomainServerState cookie) throws ChangelogException, DirectoryException
  {
    boolean continueSearch = true;
    while (continueSearch && cnIndexDBCursor.next())
    {
      // Handle the current cnIndex record
      final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
      if (replicaUpdatesCursor.get() == null)
      {
        // First index record: open the replica updates cursor from this record
        replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord));
        if (isChangelogCookieAttributeRequired(entrySender.searchOp))
        {
          initializeCookieForChangeNumberMode(cookie, cnIndexRecord);
        } // otherwise, the cookie is not required and/or will be filtered before sending the entry
      }
      else
      {
        // Following index records: move the cookie forward with the record's CSN
        cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
      }
      continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
      if (continueSearch)
      {
        // A null update message means the index record has no matching update message and must be skipped
        final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN());
        if (updateMsg != null)
        {
          continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie);
          // Move past the update message which has just been consumed
          replicaUpdatesCursor.get().next();
        }
      }
    }
    return continueSearch;
  }
1024
1025  private boolean isChangelogCookieAttributeRequired(SearchOperation searchOp)
1026  {
1027    final Set<String> requiredAttributes = searchOp.getAttributes();
1028    return containsIgnoreCase(requiredAttributes, "changelogcookie")
1029      || containsIgnoreCase(requiredAttributes, ALL_OPERATIONAL_ATTRIBUTES)
1030      || queriesChangelogCookieAttribute(searchOp.getFilter());
1031  }
1032
1033  /** Initialize the provided cookie from the provided change number index record. */
1034  private void initializeCookieForChangeNumberMode(
1035      MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
1036  {
1037    // TODO optimize positioning when the search filter is an equality filter on the changelogCookie?
1038    MultiDomainServerState startCookie = new MultiDomainServerState();
1039    startCookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
1040
1041    CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
1042    MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(startCookie, options);
1043    try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor))
1044    {
1045      updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord);
1046    }
1047  }
1048
1049  private boolean queriesChangelogCookieAttribute(SearchFilter filter)
1050  {
1051    switch (filter.getFilterType())
1052    {
1053    case AND:
1054    case OR:
1055      for (SearchFilter f : filter.getFilterComponents())
1056      {
1057        if (queriesChangelogCookieAttribute(f))
1058        {
1059          return true;
1060        }
1061      }
1062      return false;
1063
1064    case NOT:
1065      return queriesChangelogCookieAttribute(filter.getNotComponent());
1066
1067    case PRESENT:
1068    case EQUALITY:
1069    case APPROXIMATE_MATCH:
1070    case SUBSTRING:
1071    case GREATER_OR_EQUAL:
1072    case LESS_OR_EQUAL:
1073    case EXTENSIBLE_MATCH:
1074      return filter.getAttributeType().hasName("changelogCookie");
1075
1076    default:
1077      return false;
1078    }
1079  }
1080
1081  /**
1082   * Rebuilds the changelogcookie starting at the newest change number index record.
1083   * <p>
1084   * It updates the provided cookie with the changes from the provided ECL cursor,
1085   * up to (and including) the provided change number index record.
1086   * <p>
1087   * Therefore, after calling this method, the cursor is positioned
1088   * to the change immediately following the provided change number index record.
1089   *
1090   * @param cookie the cookie to update
1091   * @param cursor the cursor where to read changes from
1092   * @param cnIndexRecord the change number index record to go right after
1093   * @throws ChangelogException if any problem occurs
1094   */
1095  public static void updateCookieToMediumConsistencyPoint(
1096      MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord)
1097          throws ChangelogException
1098  {
1099    if (cnIndexRecord == null)
1100    {
1101      return;
1102    }
1103
1104    while (cursor.next())
1105    {
1106      UpdateMsg updateMsg = cursor.getRecord();
1107      if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0)
1108      {
1109        break;
1110      }
1111      cookie.update(cursor.getData(), updateMsg.getCSN());
1112    }
1113  }
1114
1115  private MultiDomainDBCursor initializeReplicaUpdatesCursor(
1116      final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
1117  {
1118    final MultiDomainServerState state = new MultiDomainServerState();
1119    state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
1120
1121    // No need for ECLMultiDomainDBCursor in this case
1122    // as updateMsg will be matched with cnIndexRecord
1123    CursorOptions options =
1124        new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN());
1125    final MultiDomainDBCursor replicaUpdatesCursor =
1126        getChangelogDB().getReplicationDomainDB().getCursorFrom(state, options);
1127    replicaUpdatesCursor.next();
1128    return replicaUpdatesCursor;
1129  }
1130
1131  /**
1132   * Returns the replica update message corresponding to the provided
1133   * cnIndexRecord.
1134   *
1135   * @return the update message, which may be {@code null} if the update message
1136   *         could not be found because it was purged or because corresponding
1137   *         baseDN was removed from the changelog
1138   * @throws DirectoryException
1139   *           If inconsistency is detected between the available update
1140   *           messages and the provided cnIndexRecord
1141   */
1142  private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn)
1143      throws ChangelogException, DirectoryException
1144  {
1145    while (true)
1146    {
1147      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
1148      final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN());
1149      if (compareIndexWithUpdateMsg < 0) {
1150        // Either update message has been purged or baseDN has been removed from changelogDB,
1151        // ignore current index record and go to the next one
1152        return null;
1153      }
1154      else if (compareIndexWithUpdateMsg == 0)
1155      {
1156        // Found the matching update message
1157        return updateMsg;
1158      }
1159      // Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet
1160      if (!replicaUpdatesCursor.next())
1161      {
1162        // Should never happen, as it means some messages have disappeared
1163        // TODO : put the correct I18N message
1164        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
1165            LocalizableMessage.raw("Could not find replica update message matching index record. " +
1166                "No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist."));
1167      }
1168    }
1169  }
1170
1171  /** Returns a cursor on CNIndexDB for the provided first change number. */
1172  private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(
1173      final long firstChangeNumber) throws ChangelogException
1174  {
1175    final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB();
1176    long changeNumberToUse = firstChangeNumber;
1177    if (changeNumberToUse <= 1)
1178    {
1179      final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
1180      changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber();
1181    }
1182    return cnIndexDB.getCursorFrom(changeNumberToUse);
1183  }
1184
1185  /** Creates a changelog entry. */
1186  private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie,
1187      final UpdateMsg msg) throws DirectoryException
1188  {
1189    if (msg instanceof AddMsg)
1190    {
1191      return createAddMsg(baseDN, changeNumber, cookie, msg);
1192    }
1193    else if (msg instanceof ModifyCommonMsg)
1194    {
1195      return createModifyMsg(baseDN, changeNumber, cookie, msg);
1196    }
1197    else if (msg instanceof DeleteMsg)
1198    {
1199      final DeleteMsg delMsg = (DeleteMsg) msg;
1200      return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName());
1201    }
1202    throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
1203        LocalizableMessage.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN,
1204            msg.getClass()));
1205  }
1206
1207  /**
1208   * Creates an entry from an add message.
1209   * <p>
1210   * Map addMsg to an LDIF string for the 'changes' attribute, and pull out
1211   * change initiators name if available which is contained in the creatorsName
1212   * attribute.
1213   */
1214  private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
1215      throws DirectoryException
1216  {
1217    final AddMsg addMsg = (AddMsg) msg;
1218    String changeInitiatorsName = null;
1219    String ldifChanges = null;
1220    try
1221    {
1222      final StringBuilder builder = new StringBuilder(256);
1223      for (Attribute attr : addMsg.getAttributes())
1224      {
1225        if (!attr.isEmpty()
1226            && attr.getAttributeDescription().getAttributeType().equals(getCreatorsNameAttributeType()))
1227        {
1228          // This attribute is not multi-valued.
1229          changeInitiatorsName = attr.iterator().next().toString();
1230        }
1231        for (ByteString value : attr)
1232        {
1233          builder.append(attr.getAttributeDescription());
1234          appendLDIFSeparatorAndValue(builder, value);
1235          builder.append('\n');
1236        }
1237      }
1238      ldifChanges = builder.toString();
1239    }
1240    catch (Exception e)
1241    {
1242      logEncodingMessageError("add", addMsg.getDN(), e);
1243    }
1244
1245    return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName);
1246  }
1247
1248  /**
1249   * Creates an entry from a modify message.
1250   * <p>
1251   * Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull
1252   * out change initiators name if available which is contained in the
1253   * modifiersName attribute.
1254   */
1255  private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie,
1256      final UpdateMsg msg) throws DirectoryException
1257  {
1258    final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
1259    String changeInitiatorsName = null;
1260    String ldifChanges = null;
1261    try
1262    {
1263      final StringBuilder builder = new StringBuilder(128);
1264      for (Modification mod : modifyMsg.getMods())
1265      {
1266        final Attribute attr = mod.getAttribute();
1267        if (mod.getModificationType() == ModificationType.REPLACE
1268            && !attr.isEmpty()
1269            && attr.getAttributeDescription().getAttributeType().equals(getModifiersNameAttributeType()))
1270        {
1271          // This attribute is not multi-valued.
1272          changeInitiatorsName = attr.iterator().next().toString();
1273        }
1274        final AttributeDescription attrDesc = attr.getAttributeDescription();
1275        builder.append(mod.getModificationType());
1276        builder.append(": ");
1277        builder.append(attrDesc);
1278        builder.append('\n');
1279
1280        for (ByteString value : attr)
1281        {
1282          builder.append(attrDesc);
1283          appendLDIFSeparatorAndValue(builder, value);
1284          builder.append('\n');
1285        }
1286        builder.append("-\n");
1287      }
1288      ldifChanges = builder.toString();
1289    }
1290    catch (Exception e)
1291    {
1292      logEncodingMessageError("modify", modifyMsg.getDN(), e);
1293    }
1294
1295    final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg;
1296    final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges,
1297        isModifyDNMsg ? "modrdn" : "modify", changeInitiatorsName);
1298
1299    if (isModifyDNMsg)
1300    {
1301      final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg;
1302      addAttribute(entry, "newrdn", modDNMsg.getNewRDN());
1303      if (modDNMsg.getNewSuperior() != null)
1304      {
1305        addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior());
1306      }
1307      addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn()));
1308    }
1309    return entry;
1310  }
1311
1312  /**
1313   * Log an encoding message error.
1314   *
1315   * @param messageType
1316   *            String identifying type of message. Should be "add" or "modify".
1317   * @param entryDN
1318   *            DN of original entry
1319   */
1320  private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception)
1321  {
1322    logger.traceException(exception);
1323    logger.error(LocalizableMessage.raw(
1324        "An exception was encountered while trying to encode a replication " + messageType + " message for entry \""
1325        + entryDN + "\" into an External Change Log entry: " + exception.getMessage()));
1326  }
1327
1328  private void checkChangelogReadPrivilege(SearchOperation searchOp) throws DirectoryException
1329  {
1330    if (!searchOp.getClientConnection().hasPrivilege(Privilege.CHANGELOG_READ, searchOp))
1331    {
1332      throw new DirectoryException(ResultCode.INSUFFICIENT_ACCESS_RIGHTS,
1333          NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get());
1334    }
1335  }
1336
1337  /**
1338   * Create a changelog entry from a set of provided information. This is the part of
1339   * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN).
1340   */
1341  private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie,
1342      final LDAPUpdateMsg msg, final String ldifChanges, final String changeType,
1343      final String changeInitiatorsName) throws DirectoryException
1344  {
1345    final CSN csn = msg.getCSN();
1346    String dnString;
1347    if (changeNumber > 0)
1348    {
1349      // change number mode
1350      dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
1351    }
1352    else
1353    {
1354      // Cookie mode
1355      dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT;
1356    }
1357
1358    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>();
1359    final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<>();
1360
1361    // Operational standard attributes
1362    addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs);
1363    addAttributeByType("numSubordinates", "0", userAttrs, opAttrs);
1364    addAttributeByType("hasSubordinates", "false", userAttrs, opAttrs);
1365    addAttributeByType("entryDN", dnString, userAttrs, opAttrs);
1366
1367    // REQUIRED attributes
1368    if (changeNumber > 0)
1369    {
1370      addAttributeByType("changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs);
1371    }
1372    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME);
1373    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ??
1374    final String format = dateFormat.format(new Date(csn.getTime()));
1375    addAttributeByType("changeTime", format, userAttrs, opAttrs);
1376    addAttributeByType("changeType", changeType, userAttrs, opAttrs);
1377    addAttributeByType("targetDN", msg.getDN().toString(), userAttrs, opAttrs);
1378
1379    // NON REQUESTED attributes
1380    addAttributeByType("replicationCSN", csn.toString(), userAttrs, opAttrs);
1381    addAttributeByType("replicaIdentifier", Integer.toString(csn.getServerId()), userAttrs, opAttrs);
1382
1383    if (ldifChanges != null)
1384    {
1385      addAttributeByType("changes", ldifChanges, userAttrs, opAttrs);
1386    }
1387    if (changeInitiatorsName != null)
1388    {
1389      addAttributeByType("changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs);
1390    }
1391
1392    final String targetUUID = msg.getEntryUUID();
1393    if (targetUUID != null)
1394    {
1395      addAttributeByType("targetEntryUUID", targetUUID, userAttrs, opAttrs);
1396    }
1397    final String cookie2 = cookie != null ? cookie : "";
1398    addAttributeByType("changeLogCookie", cookie2, userAttrs, opAttrs);
1399
1400    final List<RawAttribute> includedAttributes = msg.getEclIncludes();
1401    if (includedAttributes != null && !includedAttributes.isEmpty())
1402    {
1403      final StringBuilder builder = new StringBuilder(256);
1404      for (final RawAttribute includedAttribute : includedAttributes)
1405      {
1406        final String name = includedAttribute.getAttributeType();
1407        for (final ByteString value : includedAttribute.getValues())
1408        {
1409          builder.append(name);
1410          appendLDIFSeparatorAndValue(builder, value);
1411          builder.append('\n');
1412        }
1413      }
1414      final String includedAttributesLDIF = builder.toString();
1415      addAttributeByType("includedAttributes", includedAttributesLDIF, userAttrs, opAttrs);
1416    }
1417
1418    return new Entry(DN.valueOf(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs);
1419  }
1420
1421  /**
1422   * Sends the entry if it matches the base, scope and filter of the current search operation.
1423   * It will also send the base changelog entry if it needs to be sent and was not sent before.
1424   *
1425   * @return {@code true} if search should continue, {@code false} otherwise
1426   */
1427  private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, boolean returnECs,
1428                                            String cookie, long changeNumber)
1429      throws DirectoryException
1430  {
1431    if (matchBaseAndScopeAndFilter(searchOp, entry))
1432    {
1433      return searchOp.returnEntry(entry, getControls(returnECs, cookie, changeNumber));
1434    }
1435    // maybe the next entry will match?
1436    return true;
1437  }
1438
1439  /** Indicates if the provided entry matches the filter, base and scope. */
1440  private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
1441  {
1442    return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
1443        && searchOp.getFilter().matchesEntry(entry);
1444  }
1445
1446  private static List<Control> getControls(boolean returnECs, String cookie, long changeNumber)
1447  {
1448    final List<Control> controls = new LinkedList<>();
1449    if (cookie != null)
1450    {
1451      controls.add(new EntryChangelogNotificationControl(true, cookie));
1452    }
1453    if (returnECs)
1454    {
1455      controls.add(new EntryChangeNotificationControl(ADD, changeNumber));
1456    }
1457    return controls;
1458  }
1459
1460  /**
1461   * Create and returns the base changelog entry to the underlying search operation.
1462   * <p>
1463   * "initial search" phase must return the base entry immediately.
1464   *
1465   * @return {@code true} if search should continue, {@code false} otherwise
1466   */
1467  private boolean sendBaseChangelogEntry(SearchOperation searchOp) throws DirectoryException
1468  {
1469    final DN baseDN = searchOp.getBaseDN();
1470    final SearchFilter filter = searchOp.getFilter();
1471    final SearchScope scope = searchOp.getScope();
1472
1473    if (ChangelogBackend.CHANGELOG_BASE_DN.isInScopeOf(baseDN, scope))
1474    {
1475      final Entry entry = buildBaseChangelogEntry();
1476      if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null))
1477      {
1478        // Abandon, size limit reached.
1479        return false;
1480      }
1481    }
1482    return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN)
1483        || !scope.equals(SearchScope.BASE_OBJECT);
1484  }
1485
1486  private Entry buildBaseChangelogEntry() throws DirectoryException
1487  {
1488    final String hasSubordinatesStr = Boolean.toString(baseChangelogHasSubordinates());
1489
1490    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>();
1491    final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<>();
1492
1493    // We never return the numSubordinates attribute for the base changelog entry
1494    // and there is a very good reason for that:
1495    // - Either we compute it before sending the entries,
1496    // -- then we risk returning more entries if new entries come in after we computed numSubordinates
1497    // --   or we risk returning less entries if purge kicks in      after we computed numSubordinates
1498    // - Or we accumulate all the entries that must be returned before sending them => OutOfMemoryError
1499
1500    addAttributeByName(ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
1501    addAttributeByName(ATTR_SUBSCHEMA_SUBENTRY, DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
1502    addAttributeByName("hasSubordinates", hasSubordinatesStr, userAttrs, operationalAttrs);
1503    addAttributeByName("entryDN", DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
1504    return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
1505  }
1506
1507  private static void addAttribute(final Entry e, final String attrType, final String attrValue)
1508  {
1509    e.addAttribute(Attributes.create(attrType, attrValue), null);
1510  }
1511
1512  private static void addAttributeByType(String attrName, String attrValue,
1513      Map<AttributeType, List<Attribute>> userAttrs,
1514      Map<AttributeType, List<Attribute>> operationalAttrs)
1515  {
1516    addAttribute(attrName, attrValue, userAttrs, operationalAttrs, true);
1517  }
1518
1519  private static void addAttributeByName(String attrName, String attrValue,
1520      Map<AttributeType, List<Attribute>> userAttrs,
1521      Map<AttributeType, List<Attribute>> operationalAttrs)
1522  {
1523    addAttribute(attrName, attrValue, userAttrs, operationalAttrs, false);
1524  }
1525
1526  private static void addAttribute(final String attrName, final String attrValue,
1527      final Map<AttributeType, List<Attribute>> userAttrs,
1528      final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType)
1529  {
1530    final Attribute a = addByType
1531        ? Attributes.create(getSchema().getAttributeType(attrName), attrValue)
1532        : Attributes.create(attrName, attrValue);
1533    final AttributeType attrType = a.getAttributeDescription().getAttributeType();
1534    final List<Attribute> attrList = Collections.singletonList(a);
1535    if (attrType.isOperational())
1536    {
1537      operationalAttrs.put(attrType, attrList);
1538    }
1539    else
1540    {
1541      userAttrs.put(attrType, attrList);
1542    }
1543  }
1544
1545  /** Describes the current search phase. */
1546  private enum SearchPhase
1547  {
1548    /**
1549     * "Initial search" phase. The "initial search" phase is running
1550     * concurrently. All update notifications are ignored.
1551     */
1552    INITIAL,
1553    /**
1554     * Transitioning from the "initial search" phase to the "persistent search"
1555     * phase. "Initial search" phase has finished reading from the DB. It now
1556     * verifies if any more updates have been persisted to the DB since stopping
1557     * and send them. All update notifications are blocked.
1558     */
1559    TRANSITIONING,
1560    /**
1561     * "Persistent search" phase. "Initial search" phase has completed. All
1562     * update notifications are published.
1563     */
1564    PERSISTENT;
1565  }
1566
1567  /**
1568   * Contains data to ensure that the same change is not sent twice to clients
1569   * because of race conditions between the "initial search" phase and the
1570   * "persistent search" phase.
1571   */
1572  private static class SendEntryData<K extends Comparable<K>>
1573  {
1574    private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<>(SearchPhase.INITIAL);
1575    private final Object transitioningLock = new Object();
1576    private volatile K lastKeySentByInitialSearch;
1577
1578    private SendEntryData(SearchPhase startPhase)
1579    {
1580      searchPhase.set(startPhase);
1581    }
1582
1583    private void finalizeInitialSearch()
1584    {
1585      searchPhase.set(SearchPhase.PERSISTENT);
1586      synchronized (transitioningLock)
1587      { // initial search phase has completed, release all persistent searches
1588        transitioningLock.notifyAll();
1589      }
1590    }
1591
1592    public void transitioningToPersistentSearchPhase()
1593    {
1594      searchPhase.set(SearchPhase.TRANSITIONING);
1595    }
1596
1597    private void initialSearchSendsEntry(final K key)
1598    {
1599      lastKeySentByInitialSearch = key;
1600    }
1601
1602    private boolean persistentSearchCanSendEntry(K key)
1603    {
1604      final SearchPhase stateValue = searchPhase.get();
1605      switch (stateValue)
1606      {
1607      case INITIAL:
1608        return false;
1609      case TRANSITIONING:
1610        synchronized (transitioningLock)
1611        {
1612          while (SearchPhase.TRANSITIONING.equals(searchPhase.get()))
1613          {
1614            // "initial search" phase is over, and is now verifying whether new
1615            // changes have been published to the DB.
1616            // Wait for this check to complete
1617            try
1618            {
1619              transitioningLock.wait();
1620            }
1621            catch (InterruptedException e)
1622            {
1623              Thread.currentThread().interrupt();
1624              // Shutdown must have been called. Stop sending entries.
1625              return false;
1626            }
1627          }
1628        }
1629        return key.compareTo(lastKeySentByInitialSearch) > 0;
1630      case PERSISTENT:
1631        return true;
1632      default:
1633        throw new RuntimeException("Not implemented for " + stateValue);
1634      }
1635    }
1636  }
1637
1638  /** Sends entries to clients for change number searches. */
1639  private static class ChangeNumberEntrySender
1640  {
1641    private final SearchOperation searchOp;
1642    private final boolean containsAdd;
1643    private final boolean returnECs;
1644    private final long lowestChangeNumber;
1645    private final long highestChangeNumber;
1646    private final SendEntryData<Long> sendEntryData;
1647
1648    private ChangeNumberEntrySender(SearchOperation searchOp, PersistentSearchControl psearchControl,
1649                                    SearchPhase startPhase, ChangeNumberRange range)
1650    {
1651      this.searchOp = searchOp;
1652      this.containsAdd = psearchControl != null ? psearchControl.getChangeTypes().contains(ADD) : false;
1653      this.returnECs = psearchControl != null ? psearchControl.getReturnECs() : false;
1654      this.sendEntryData = new SendEntryData<>(startPhase);
1655      this.lowestChangeNumber = range.lowerBound;
1656      this.highestChangeNumber = range.upperBound;
1657    }
1658
1659    /**
1660     * Indicates if provided change number is compatible with last change
1661     * number.
1662     *
1663     * @param changeNumber
1664     *          The change number to test.
1665     * @return {@code true} if and only if the provided change number is in the
1666     *         range of the last change number.
1667     */
1668    boolean changeNumberIsInRange(long changeNumber)
1669    {
1670      return highestChangeNumber == -1 || changeNumber <= highestChangeNumber;
1671    }
1672
1673    private void finalizeInitialSearch()
1674    {
1675      sendEntryData.finalizeInitialSearch();
1676    }
1677
1678    private void transitioningToPersistentSearchPhase()
1679    {
1680      sendEntryData.transitioningToPersistentSearchPhase();
1681    }
1682
1683    /**
1684     * @return {@code true} if search should continue, {@code false} otherwise
1685     */
1686    private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg,
1687        MultiDomainServerState cookie) throws DirectoryException
1688    {
1689      final DN baseDN = cnIndexRecord.getBaseDN();
1690      sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber());
1691      final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
1692      return sendEntryIfMatches(searchOp, entry, false, null, cnIndexRecord.getChangeNumber());
1693    }
1694
1695    private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException
1696    {
1697      if (containsAdd && sendEntryData.persistentSearchCanSendEntry(changeNumber))
1698      {
1699        sendEntryIfMatches(searchOp, entry, returnECs, null, changeNumber);
1700      }
1701    }
1702  }
1703
1704  /** Sends entries to clients for cookie-based searches. */
1705  private static class CookieEntrySender {
1706    private final SearchOperation searchOp;
1707    private final boolean containsAdd;
1708    private final boolean returnECs;
1709    private final SearchPhase startPhase;
1710    private final Set<DN> excludedBaseDNs;
1711    private final MultiDomainServerState cookie;
1712    private final ConcurrentSkipListMap<ReplicaId, SendEntryData<CSN>> replicaIdToSendEntryData =
1713        new ConcurrentSkipListMap<>();
1714
1715    private CookieEntrySender(SearchOperation searchOp, PersistentSearchControl psearchControl,
1716                              SearchPhase startPhase, MultiDomainServerState cookie, Set<DN> excludedBaseDNs)
1717    {
1718      this.searchOp = searchOp;
1719      this.containsAdd = psearchControl != null ? psearchControl.getChangeTypes().contains(ADD) : false;
1720      this.returnECs = psearchControl != null ? psearchControl.getReturnECs() : false;
1721      this.startPhase = startPhase;
1722      this.cookie = cookie;
1723      this.excludedBaseDNs = excludedBaseDNs;
1724    }
1725
1726    private void finalizeInitialSearch()
1727    {
1728      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
1729      {
1730        sendEntryData.finalizeInitialSearch();
1731      }
1732    }
1733
1734    private void transitioningToPersistentSearchPhase()
1735    {
1736      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
1737      {
1738        sendEntryData.transitioningToPersistentSearchPhase();
1739      }
1740    }
1741
1742    private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn)
1743    {
1744      final ReplicaId replicaId = ReplicaId.of(baseDN, csn.getServerId());
1745      SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId);
1746      if (data == null)
1747      {
1748        final SendEntryData<CSN> newData = new SendEntryData<>(startPhase);
1749        data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData);
1750        return data == null ? newData : data;
1751      }
1752      return data;
1753    }
1754
1755    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException
1756    {
1757      final CSN csn = updateMsg.getCSN();
1758      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
1759      sendEntryData.initialSearchSendsEntry(csn);
1760      final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
1761      final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
1762      return sendEntryIfMatches(searchOp, entry, false, cookieString, -1);
1763    }
1764
1765    private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg)
1766        throws DirectoryException
1767    {
1768      if (!containsAdd)
1769      {
1770        return;
1771      }
1772      final CSN csn = updateMsg.getCSN();
1773      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
1774      if (sendEntryData.persistentSearchCanSendEntry(csn))
1775      {
1776        // multi threaded case: wait for the "initial search" phase to set the cookie
1777        final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
1778        final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
1779        // FIXME JNR use this instead of previous line:
1780        // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
1781        sendEntryIfMatches(searchOp, cookieEntry, returnECs, cookieString, -1);
1782      }
1783    }
1784
1785    private String updateCookie(DN baseDN, final CSN csn)
1786    {
1787      synchronized (cookie)
1788      { // forbid concurrent updates to the cookie
1789        cookie.update(baseDN, csn);
1790        return cookie.toString();
1791      }
1792    }
1793  }
1794}