001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2014-2016 ForgeRock AS. 015 */ 016package org.opends.server.backends.pdb; 017 018import static com.persistit.Transaction.CommitPolicy.*; 019import static java.util.Arrays.*; 020 021import static org.opends.messages.BackendMessages.*; 022import static org.opends.messages.UtilityMessages.*; 023import static org.opends.server.backends.pluggable.spi.StorageUtils.*; 024import static org.opends.server.util.StaticUtils.*; 025 026import java.io.Closeable; 027import java.io.File; 028import java.io.FileFilter; 029import java.io.IOException; 030import java.nio.file.Files; 031import java.nio.file.Path; 032import java.nio.file.Paths; 033import java.rmi.RemoteException; 034import java.util.ArrayList; 035import java.util.HashMap; 036import java.util.HashSet; 037import java.util.List; 038import java.util.ListIterator; 039import java.util.Map; 040import java.util.NoSuchElementException; 041import java.util.Objects; 042import java.util.Set; 043 044import org.forgerock.i18n.LocalizableMessage; 045import org.forgerock.i18n.slf4j.LocalizedLogger; 046import org.forgerock.opendj.config.server.ConfigChangeResult; 047import org.forgerock.opendj.config.server.ConfigException; 048import org.forgerock.opendj.config.server.ConfigurationChangeListener; 049import org.forgerock.opendj.ldap.ByteSequence; 050import org.forgerock.opendj.ldap.ByteString; 051import org.forgerock.opendj.server.config.server.PDBBackendCfg; 052import org.forgerock.util.Reject; 053import org.opends.server.api.Backupable; 054import org.opends.server.api.DiskSpaceMonitorHandler; 055import org.opends.server.backends.pluggable.spi.AccessMode; 056import org.opends.server.backends.pluggable.spi.Cursor; 057import org.opends.server.backends.pluggable.spi.Importer; 058import org.opends.server.backends.pluggable.spi.ReadOnlyStorageException; 059import org.opends.server.backends.pluggable.spi.ReadOperation; 060import org.opends.server.backends.pluggable.spi.SequentialCursor; 061import org.opends.server.backends.pluggable.spi.Storage; 062import org.opends.server.backends.pluggable.spi.StorageInUseException; 063import org.opends.server.backends.pluggable.spi.StorageRuntimeException; 064import org.opends.server.backends.pluggable.spi.StorageStatus; 065import org.opends.server.backends.pluggable.spi.StorageUtils; 066import org.opends.server.backends.pluggable.spi.TreeName; 067import org.opends.server.backends.pluggable.spi.UpdateFunction; 068import org.opends.server.backends.pluggable.spi.WriteOperation; 069import org.opends.server.backends.pluggable.spi.WriteableTransaction; 070import org.opends.server.core.DirectoryServer; 071import org.opends.server.core.MemoryQuota; 072import org.opends.server.core.ServerContext; 073import org.opends.server.extensions.DiskSpaceMonitor; 074import org.opends.server.types.BackupConfig; 075import org.opends.server.types.BackupDirectory; 076import org.opends.server.types.DirectoryException; 077import org.opends.server.types.RestoreConfig; 078import org.opends.server.util.BackupManager; 079 080import com.persistit.Configuration; 081import com.persistit.Configuration.BufferPoolConfiguration; 082import com.persistit.Exchange; 083import com.persistit.Key; 084import com.persistit.Persistit; 085import com.persistit.Transaction; 086import com.persistit.Transaction.CommitPolicy; 087import com.persistit.Value; 088import com.persistit.Volume; 089import com.persistit.VolumeSpecification; 090import com.persistit.exception.InUseException; 091import com.persistit.exception.PersistitException; 092import com.persistit.exception.RollbackException; 093import com.persistit.exception.TreeNotFoundException; 094 095/** PersistIt database implementation of the {@link Storage} engine. */ 096public final class PDBStorage implements Storage, Backupable, ConfigurationChangeListener<PDBBackendCfg>, 097 DiskSpaceMonitorHandler 098{ 099 private static final int IMPORT_DB_CACHE_SIZE = 32 * MB; 100 101 private static final double MAX_SLEEP_ON_RETRY_MS = 50.0; 102 private static final String VOLUME_NAME = "dj"; 103 private static final String JOURNAL_NAME = VOLUME_NAME + "_journal"; 104 /** The buffer / page size used by the PersistIt storage. */ 105 private static final int BUFFER_SIZE = 16 * 1024; 106 107 /** PersistIt implementation of the {@link Cursor} interface. */ 108 private final class CursorImpl implements Cursor<ByteString, ByteString> 109 { 110 private ByteString currentKey; 111 private ByteString currentValue; 112 private final Exchange exchange; 113 114 private CursorImpl(final Exchange exchange) 115 { 116 this.exchange = exchange; 117 } 118 119 @Override 120 public void close() 121 { 122 // Release immediately because this exchange did not come from the txn cache 123 releaseExchange(exchange); 124 } 125 126 @Override 127 public boolean isDefined() 128 { 129 return exchange.getValue().isDefined(); 130 } 131 132 @Override 133 public ByteString getKey() 134 { 135 if (currentKey == null) 136 { 137 throwIfUndefined(); 138 currentKey = ByteString.wrap(exchange.getKey().reset().decodeByteArray()); 139 } 140 return currentKey; 141 } 142 143 @Override 144 public ByteString getValue() 145 { 146 if (currentValue == null) 147 { 148 throwIfUndefined(); 149 currentValue = ByteString.wrap(exchange.getValue().getByteArray()); 150 } 151 return currentValue; 152 } 153 154 @Override 155 public boolean next() 156 { 157 clearCurrentKeyAndValue(); 158 try 159 { 160 return exchange.next(); 161 } 162 catch (final PersistitException e) 163 { 164 throw new StorageRuntimeException(e); 165 } 166 } 167 168 @Override 169 public void delete() 170 { 171 throwIfUndefined(); 172 try 173 { 174 exchange.remove(); 175 } 176 catch (final PersistitException | RollbackException e) 177 { 178 throw new StorageRuntimeException(e); 179 } 180 } 181 182 @Override 183 public boolean positionToKey(final ByteSequence key) 184 { 185 clearCurrentKeyAndValue(); 186 bytesToKey(exchange.getKey(), key); 187 try 188 { 189 exchange.fetch(); 190 return exchange.getValue().isDefined(); 191 } 192 catch (final PersistitException e) 193 { 194 throw new StorageRuntimeException(e); 195 } 196 } 197 198 @Override 199 public boolean positionToKeyOrNext(final ByteSequence key) 200 { 201 clearCurrentKeyAndValue(); 202 bytesToKey(exchange.getKey(), key); 203 try 204 { 205 exchange.fetch(); 206 return exchange.getValue().isDefined() || exchange.next(); 207 } 208 catch (final PersistitException e) 209 { 210 throw new StorageRuntimeException(e); 211 } 212 } 213 214 @Override 215 public boolean positionToIndex(int index) 216 { 217 // There doesn't seem to be a way to optimize this using Persistit. 218 clearCurrentKeyAndValue(); 219 exchange.getKey().to(Key.BEFORE); 220 try 221 { 222 for (int i = 0; i <= index; i++) 223 { 224 if (!exchange.next()) 225 { 226 return false; 227 } 228 } 229 return true; 230 } 231 catch (final PersistitException e) 232 { 233 throw new StorageRuntimeException(e); 234 } 235 } 236 237 @Override 238 public boolean positionToLastKey() 239 { 240 clearCurrentKeyAndValue(); 241 exchange.getKey().to(Key.AFTER); 242 try 243 { 244 return exchange.previous(); 245 } 246 catch (final PersistitException e) 247 { 248 throw new StorageRuntimeException(e); 249 } 250 } 251 252 private void clearCurrentKeyAndValue() 253 { 254 currentKey = null; 255 currentValue = null; 256 } 257 258 private void throwIfUndefined() 259 { 260 if (!isDefined()) 261 { 262 throw new NoSuchElementException(); 263 } 264 } 265 } 266 267 /** PersistIt implementation of the {@link Importer} interface. */ 268 private final class ImporterImpl implements Importer 269 { 270 private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>() 271 { 272 @Override 273 protected Map<TreeName, Exchange> initialValue() 274 { 275 return new HashMap<>(); 276 } 277 }; 278 279 @Override 280 public void close() 281 { 282 PDBStorage.this.close(); 283 } 284 285 @Override 286 public void clearTree(final TreeName treeName) 287 { 288 final Transaction txn = db.getTransaction(); 289 deleteTree(txn, treeName); 290 createTree(txn, treeName); 291 } 292 293 private void createTree(final Transaction txn, final TreeName treeName) 294 { 295 try 296 { 297 txn.begin(); 298 getNewExchange(treeName, true); 299 txn.commit(commitPolicy); 300 } 301 catch (PersistitException e) 302 { 303 throw new StorageRuntimeException(e); 304 } 305 finally 306 { 307 txn.end(); 308 } 309 } 310 311 private void deleteTree(Transaction txn, final TreeName treeName) 312 { 313 Exchange ex = null; 314 try 315 { 316 txn.begin(); 317 ex = getNewExchange(treeName, true); 318 ex.removeTree(); 319 txn.commit(commitPolicy); 320 } 321 catch (PersistitException e) 322 { 323 throw new StorageRuntimeException(e); 324 } 325 finally 326 { 327 txn.end(); 328 } 329 } 330 331 @Override 332 public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value) 333 { 334 try 335 { 336 final Exchange ex = getExchangeFromCache(treeName); 337 bytesToKey(ex.getKey(), key); 338 bytesToValue(ex.getValue(), value); 339 ex.store(); 340 } 341 catch (final Exception e) 342 { 343 throw new StorageRuntimeException(e); 344 } 345 } 346 347 @Override 348 public ByteString read(final TreeName treeName, final ByteSequence key) 349 { 350 try 351 { 352 final Exchange ex = getExchangeFromCache(treeName); 353 bytesToKey(ex.getKey(), key); 354 ex.fetch(); 355 return valueToBytes(ex.getValue()); 356 } 357 catch (final PersistitException e) 358 { 359 throw new StorageRuntimeException(e); 360 } 361 } 362 363 private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException 364 { 365 Map<TreeName, Exchange> threadExchanges = exchanges.get(); 366 Exchange exchange = threadExchanges.get(treeName); 367 if (exchange == null) 368 { 369 exchange = getNewExchange(treeName, false); 370 threadExchanges.put(treeName, exchange); 371 } 372 return exchange; 373 } 374 375 @Override 376 public SequentialCursor<ByteString, ByteString> openCursor(TreeName treeName) 377 { 378 try 379 { 380 return new CursorImpl(getNewExchange(treeName, false)); 381 } 382 catch (PersistitException e) 383 { 384 throw new StorageRuntimeException(e); 385 } 386 } 387 } 388 389 /** Common interface for internal WriteableTransaction implementations. */ 390 private interface StorageImpl extends WriteableTransaction, Closeable { 391 } 392 393 /** PersistIt implementation of the {@link WriteableTransaction} interface. */ 394 private final class WriteableStorageImpl implements StorageImpl 395 { 396 private static final String DUMMY_RECORD = "_DUMMY_RECORD_"; 397 private final Map<TreeName, Exchange> exchanges = new HashMap<>(); 398 399 @Override 400 public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value) 401 { 402 try 403 { 404 final Exchange ex = getExchangeFromCache(treeName); 405 bytesToKey(ex.getKey(), key); 406 bytesToValue(ex.getValue(), value); 407 ex.store(); 408 } 409 catch (final PersistitException | RollbackException e) 410 { 411 throw new StorageRuntimeException(e); 412 } 413 } 414 415 @Override 416 public boolean delete(final TreeName treeName, final ByteSequence key) 417 { 418 try 419 { 420 final Exchange ex = getExchangeFromCache(treeName); 421 bytesToKey(ex.getKey(), key); 422 return ex.remove(); 423 } 424 catch (final PersistitException | RollbackException e) 425 { 426 throw new StorageRuntimeException(e); 427 } 428 } 429 430 @Override 431 public void deleteTree(final TreeName treeName) 432 { 433 Exchange ex = null; 434 try 435 { 436 ex = getExchangeFromCache(treeName); 437 ex.removeTree(); 438 } 439 catch (final PersistitException | RollbackException e) 440 { 441 throw new StorageRuntimeException(e); 442 } 443 finally 444 { 445 exchanges.values().remove(ex); 446 releaseExchange(ex); 447 } 448 } 449 450 @Override 451 public long getRecordCount(TreeName treeName) 452 { 453 // FIXME: is there a better/quicker way to do this? 454 try(final Cursor<?, ?> cursor = openCursor(treeName)) 455 { 456 long count = 0; 457 while (cursor.next()) 458 { 459 count++; 460 } 461 return count; 462 } 463 } 464 465 @Override 466 public Cursor<ByteString, ByteString> openCursor(final TreeName treeName) 467 { 468 try 469 { 470 /* 471 * Acquire a new exchange for the cursor rather than using a cached 472 * exchange in order to avoid reentrant accesses to the same tree 473 * interfering with the cursor position. 474 */ 475 return new CursorImpl(getNewExchange(treeName, false)); 476 } 477 catch (final PersistitException | RollbackException e) 478 { 479 throw new StorageRuntimeException(e); 480 } 481 } 482 483 @Override 484 public void openTree(final TreeName treeName, boolean createOnDemand) 485 { 486 if (createOnDemand) 487 { 488 openCreateTree(treeName); 489 } 490 else 491 { 492 try 493 { 494 getExchangeFromCache(treeName); 495 } 496 catch (final PersistitException | RollbackException e) 497 { 498 throw new StorageRuntimeException(e); 499 } 500 } 501 } 502 503 @Override 504 public ByteString read(final TreeName treeName, final ByteSequence key) 505 { 506 try 507 { 508 final Exchange ex = getExchangeFromCache(treeName); 509 bytesToKey(ex.getKey(), key); 510 ex.fetch(); 511 return valueToBytes(ex.getValue()); 512 } 513 catch (final PersistitException | RollbackException e) 514 { 515 throw new StorageRuntimeException(e); 516 } 517 } 518 519 @Override 520 public boolean update(final TreeName treeName, final ByteSequence key, final UpdateFunction f) 521 { 522 try 523 { 524 final Exchange ex = getExchangeFromCache(treeName); 525 bytesToKey(ex.getKey(), key); 526 ex.fetch(); 527 final ByteSequence oldValue = valueToBytes(ex.getValue()); 528 final ByteSequence newValue = f.computeNewValue(oldValue); 529 if (!Objects.equals(newValue, oldValue)) 530 { 531 if (newValue == null) 532 { 533 ex.remove(); 534 } 535 else 536 { 537 ex.getValue().clear().putByteArray(newValue.toByteArray()); 538 ex.store(); 539 } 540 return true; 541 } 542 return false; 543 } 544 catch (final PersistitException | RollbackException e) 545 { 546 throw new StorageRuntimeException(e); 547 } 548 } 549 550 private void openCreateTree(final TreeName treeName) 551 { 552 Exchange ex = null; 553 try 554 { 555 ex = getNewExchange(treeName, true); 556 // Work around a problem with forced shutdown right after tree creation. 557 // Tree operations are not part of the journal, so force a couple operations to be able to recover. 558 ByteString dummyKey = ByteString.valueOfUtf8(DUMMY_RECORD); 559 put(treeName, dummyKey, ByteString.empty()); 560 delete(treeName, dummyKey); 561 } 562 catch (final PersistitException | RollbackException e) 563 { 564 throw new StorageRuntimeException(e); 565 } 566 finally 567 { 568 releaseExchange(ex); 569 } 570 } 571 572 private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException 573 { 574 Exchange exchange = exchanges.get(treeName); 575 if (exchange == null) 576 { 577 exchange = getNewExchange(treeName, false); 578 exchanges.put(treeName, exchange); 579 } 580 return exchange; 581 } 582 583 @Override 584 public void close() 585 { 586 for (final Exchange ex : exchanges.values()) 587 { 588 releaseExchange(ex); 589 } 590 exchanges.clear(); 591 } 592 } 593 594 /** PersistIt read-only implementation of {@link StorageImpl} interface. */ 595 private final class ReadOnlyStorageImpl implements StorageImpl { 596 private final WriteableStorageImpl delegate; 597 598 ReadOnlyStorageImpl(WriteableStorageImpl delegate) 599 { 600 this.delegate = delegate; 601 } 602 603 @Override 604 public ByteString read(TreeName treeName, ByteSequence key) 605 { 606 return delegate.read(treeName, key); 607 } 608 609 @Override 610 public Cursor<ByteString, ByteString> openCursor(TreeName treeName) 611 { 612 return delegate.openCursor(treeName); 613 } 614 615 @Override 616 public long getRecordCount(TreeName treeName) 617 { 618 return delegate.getRecordCount(treeName); 619 } 620 621 @Override 622 public void openTree(TreeName treeName, boolean createOnDemand) 623 { 624 if (createOnDemand) 625 { 626 throw new ReadOnlyStorageException(); 627 } 628 Exchange ex = null; 629 try 630 { 631 ex = getNewExchange(treeName, false); 632 } 633 catch (final TreeNotFoundException e) 634 { 635 // ignore missing trees. 636 } 637 catch (final PersistitException | RollbackException e) 638 { 639 throw new StorageRuntimeException(e); 640 } 641 finally 642 { 643 releaseExchange(ex); 644 } 645 } 646 647 @Override 648 public void close() 649 { 650 delegate.close(); 651 } 652 653 @Override 654 public void deleteTree(TreeName name) 655 { 656 throw new ReadOnlyStorageException(); 657 } 658 659 @Override 660 public void put(TreeName treeName, ByteSequence key, ByteSequence value) 661 { 662 throw new ReadOnlyStorageException(); 663 } 664 665 @Override 666 public boolean update(TreeName treeName, ByteSequence key, UpdateFunction f) 667 { 668 throw new ReadOnlyStorageException(); 669 } 670 671 @Override 672 public boolean delete(TreeName treeName, ByteSequence key) 673 { 674 throw new ReadOnlyStorageException(); 675 } 676 } 677 678 Exchange getNewExchange(final TreeName treeName, final boolean create) throws PersistitException 679 { 680 final Exchange ex = db.getExchange(volume, treeName.toString(), create); 681 ex.setMaximumValueSize(Value.MAXIMUM_SIZE); 682 return ex; 683 } 684 685 void releaseExchange(Exchange ex) 686 { 687 // Don't keep exchanges with enlarged value - let them be GC'd. 688 // This is also done internally by Persistit in TransactionPlayer line 197. 689 if (ex.getValue().getEncodedBytes().length < Value.DEFAULT_MAXIMUM_SIZE) 690 { 691 db.releaseExchange(ex); 692 } 693 } 694 695 private StorageImpl newStorageImpl() { 696 final WriteableStorageImpl writeableStorage = new WriteableStorageImpl(); 697 return accessMode.isWriteable() ? writeableStorage : new ReadOnlyStorageImpl(writeableStorage); 698 } 699 700 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 701 702 private final ServerContext serverContext; 703 private final File backendDirectory; 704 private CommitPolicy commitPolicy; 705 private AccessMode accessMode; 706 private Persistit db; 707 private Volume volume; 708 private PDBBackendCfg config; 709 private DiskSpaceMonitor diskMonitor; 710 private PDBMonitor monitor; 711 private MemoryQuota memQuota; 712 private StorageStatus storageStatus = StorageStatus.working(); 713 714 /** 715 * Creates a new persistit storage with the provided configuration. 716 * 717 * @param cfg 718 * The configuration. 719 * @param serverContext 720 * This server instance context 721 * @throws ConfigException if memory cannot be reserved 722 */ 723 // FIXME: should be package private once importer is decoupled. 724 public PDBStorage(final PDBBackendCfg cfg, ServerContext serverContext) throws ConfigException 725 { 726 this.serverContext = serverContext; 727 backendDirectory = getBackendDirectory(cfg); 728 config = cfg; 729 cfg.addPDBChangeListener(this); 730 } 731 732 private Configuration buildImportConfiguration() 733 { 734 final Configuration dbCfg = buildConfiguration(AccessMode.READ_WRITE); 735 getBufferPoolCfg(dbCfg).setMaximumMemory(IMPORT_DB_CACHE_SIZE); 736 commitPolicy = SOFT; 737 return dbCfg; 738 } 739 740 private Configuration buildConfiguration(AccessMode accessMode) 741 { 742 this.accessMode = accessMode; 743 744 final Configuration dbCfg = new Configuration(); 745 dbCfg.setLogFile(new File(backendDirectory, VOLUME_NAME + ".log").getPath()); 746 dbCfg.setJournalPath(new File(backendDirectory, JOURNAL_NAME).getPath()); 747 dbCfg.setCheckpointInterval(config.getDBCheckpointerWakeupInterval()); 748 // Volume is opened read write because recovery will fail if opened read-only 749 dbCfg.setVolumeList(asList(new VolumeSpecification(new File(backendDirectory, VOLUME_NAME).getPath(), null, 750 BUFFER_SIZE, 4096, Long.MAX_VALUE / BUFFER_SIZE, 2048, true, false, false))); 751 final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(dbCfg); 752 bufferPoolCfg.setMaximumCount(Integer.MAX_VALUE); 753 754 diskMonitor = serverContext.getDiskSpaceMonitor(); 755 memQuota = serverContext.getMemoryQuota(); 756 if (config.getDBCacheSize() > 0) 757 { 758 bufferPoolCfg.setMaximumMemory(config.getDBCacheSize()); 759 memQuota.acquireMemory(config.getDBCacheSize()); 760 } 761 else 762 { 763 bufferPoolCfg.setMaximumMemory(memQuota.memPercentToBytes(config.getDBCachePercent())); 764 memQuota.acquireMemory(memQuota.memPercentToBytes(config.getDBCachePercent())); 765 } 766 commitPolicy = config.isDBTxnNoSync() ? SOFT : GROUP; 767 dbCfg.setJmxEnabled(false); 768 return dbCfg; 769 } 770 771 @Override 772 public void close() 773 { 774 if (db != null) 775 { 776 DirectoryServer.deregisterMonitorProvider(monitor); 777 monitor = null; 778 try 779 { 780 db.close(); 781 db = null; 782 } 783 catch (final PersistitException e) 784 { 785 throw new IllegalStateException(e); 786 } 787 } 788 if (config.getDBCacheSize() > 0) 789 { 790 memQuota.releaseMemory(config.getDBCacheSize()); 791 } 792 else 793 { 794 memQuota.releaseMemory(memQuota.memPercentToBytes(config.getDBCachePercent())); 795 } 796 config.removePDBChangeListener(this); 797 diskMonitor.deregisterMonitoredDirectory(getDirectory(), this); 798 } 799 800 private static BufferPoolConfiguration getBufferPoolCfg(Configuration dbCfg) 801 { 802 return dbCfg.getBufferPoolMap().get(BUFFER_SIZE); 803 } 804 805 @Override 806 public void open(AccessMode accessMode) throws ConfigException, StorageRuntimeException 807 { 808 Reject.ifNull(accessMode, "accessMode must not be null"); 809 open0(buildConfiguration(accessMode)); 810 } 811 812 private void open0(final Configuration dbCfg) throws ConfigException 813 { 814 setupStorageFiles(backendDirectory, config.getDBDirectoryPermissions(), config.dn()); 815 try 816 { 817 if (db != null) 818 { 819 throw new IllegalStateException( 820 "Database is already open, either the backend is enabled or an import is currently running."); 821 } 822 db = new Persistit(dbCfg); 823 824 final long bufferCount = getBufferPoolCfg(dbCfg).computeBufferCount(db.getAvailableHeap()); 825 final long totalSize = bufferCount * BUFFER_SIZE / 1024; 826 logger.info(NOTE_PDB_MEMORY_CFG, config.getBackendId(), bufferCount, BUFFER_SIZE, totalSize); 827 828 db.initialize(); 829 volume = db.loadVolume(VOLUME_NAME); 830 monitor = new PDBMonitor(config.getBackendId() + " PDB Database", db); 831 DirectoryServer.registerMonitorProvider(monitor); 832 } 833 catch(final InUseException e) { 834 throw new StorageInUseException(e); 835 } 836 catch (final PersistitException | RollbackException e) 837 { 838 throw new StorageRuntimeException(e); 839 } 840 registerMonitoredDirectory(config); 841 } 842 843 @Override 844 public <T> T read(final ReadOperation<T> operation) throws Exception 845 { 846 // This check may be unnecessary for PDB, but it will help us detect bad business logic 847 // in the pluggable backend that would cause problems for JE. 848 final Transaction txn = db.getTransaction(); 849 for (;;) 850 { 851 txn.begin(); 852 try 853 { 854 try (final StorageImpl storageImpl = newStorageImpl()) 855 { 856 final T result = operation.run(storageImpl); 857 txn.commit(commitPolicy); 858 return result; 859 } 860 catch (final StorageRuntimeException e) 861 { 862 if (e.getCause() != null) 863 { 864 throw (Exception) e.getCause(); 865 } 866 throw e; 867 } 868 } 869 catch (final RollbackException e) 870 { 871 // retry 872 } 873 catch (final Exception e) 874 { 875 txn.rollback(); 876 throw e; 877 } 878 finally 879 { 880 txn.end(); 881 } 882 } 883 } 884 885 @Override 886 public Importer startImport() throws ConfigException, StorageRuntimeException 887 { 888 open0(buildImportConfiguration()); 889 return new ImporterImpl(); 890 } 891 892 @Override 893 public void write(final WriteOperation operation) throws Exception 894 { 895 final Transaction txn = db.getTransaction(); 896 for (;;) 897 { 898 txn.begin(); 899 try 900 { 901 try (final StorageImpl storageImpl = newStorageImpl()) 902 { 903 operation.run(storageImpl); 904 txn.commit(commitPolicy); 905 return; 906 } 907 catch (final StorageRuntimeException e) 908 { 909 if (e.getCause() != null) 910 { 911 throw (Exception) e.getCause(); 912 } 913 throw e; 914 } 915 } 916 catch (final RollbackException e) 917 { 918 // retry after random sleep (reduces transactions collision. Drawback: increased latency) 919 Thread.sleep((long) (Math.random() * MAX_SLEEP_ON_RETRY_MS)); 920 } 921 catch (final Exception e) 922 { 923 txn.rollback(); 924 throw e; 925 } 926 finally 927 { 928 txn.end(); 929 } 930 } 931 } 932 933 @Override 934 public boolean supportsBackupAndRestore() 935 { 936 return true; 937 } 938 939 @Override 940 public File getDirectory() 941 { 942 return getBackendDirectory(config); 943 } 944 945 private static File getBackendDirectory(PDBBackendCfg cfg) 946 { 947 return getDBDirectory(cfg.getDBDirectory(), cfg.getBackendId()); 948 } 949 950 @Override 951 public ListIterator<Path> getFilesToBackup() throws DirectoryException 952 { 953 try 954 { 955 if (db == null) 956 { 957 return getFilesToBackupWhenOffline(); 958 } 959 960 // FIXME: use full programmatic way of retrieving backup file once available in persistIt 961 // When requesting files to backup, append only mode must also be set (-a) otherwise it will be ended 962 // by PersistIt and performing backup may corrupt the DB. 963 String filesAsString = db.getManagement().execute("backup -a -f"); 964 String[] allFiles = filesAsString.split("[\r\n]+"); 965 final List<Path> files = new ArrayList<>(); 966 for (String file : allFiles) 967 { 968 files.add(Paths.get(file)); 969 } 970 return files.listIterator(); 971 } 972 catch (Exception e) 973 { 974 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), 975 ERR_BACKEND_LIST_FILES_TO_BACKUP.get(config.getBackendId(), stackTraceToSingleLineString(e))); 976 } 977 } 978 979 /** Filter to retrieve the database files to backup. */ 980 private static final FileFilter BACKUP_FILES_FILTER = new FileFilter() 981 { 982 @Override 983 public boolean accept(File file) 984 { 985 String name = file.getName(); 986 return VOLUME_NAME.equals(name) || name.matches(JOURNAL_NAME + "\\.\\d+$"); 987 } 988 }; 989 990 /** 991 * Returns the list of files to backup when there is no open database. 992 * <p> 993 * It is not possible to rely on the database returning the files, so the files must be retrieved 994 * from a file filter. 995 */ 996 private ListIterator<Path> getFilesToBackupWhenOffline() throws DirectoryException 997 { 998 return BackupManager.getFiles(getDirectory(), BACKUP_FILES_FILTER, config.getBackendId()).listIterator(); 999 } 1000 1001 @Override 1002 public Path beforeRestore() throws DirectoryException 1003 { 1004 return null; 1005 } 1006 1007 @Override 1008 public boolean isDirectRestore() 1009 { 1010 // restore is done in an intermediate directory 1011 return false; 1012 } 1013 1014 @Override 1015 public void afterRestore(Path restoreDirectory, Path saveDirectory) throws DirectoryException 1016 { 1017 // intermediate directory content is moved to database directory 1018 File targetDirectory = getDirectory(); 1019 recursiveDelete(targetDirectory); 1020 try 1021 { 1022 Files.move(restoreDirectory, targetDirectory.toPath()); 1023 } 1024 catch(IOException e) 1025 { 1026 LocalizableMessage msg = ERR_CANNOT_RENAME_RESTORE_DIRECTORY.get(restoreDirectory, targetDirectory.getPath()); 1027 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), msg); 1028 } 1029 } 1030 1031 /** 1032 * Switch the database in append only mode. 1033 * <p> 1034 * This is a mandatory operation before performing a backup. 1035 */ 1036 private void switchToAppendOnlyMode() throws DirectoryException 1037 { 1038 try 1039 { 1040 // FIXME: use full programmatic way of switching to this mode once available in persistIt 1041 db.getManagement().execute("backup -a -c"); 1042 } 1043 catch (RemoteException e) 1044 { 1045 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), 1046 ERR_BACKEND_SWITCH_TO_APPEND_MODE.get(config.getBackendId(), stackTraceToSingleLineString(e))); 1047 } 1048 } 1049 1050 /** 1051 * Terminate the append only mode of the database. 1052 * <p> 1053 * This should be called only when database was previously switched to append only mode. 1054 */ 1055 private void endAppendOnlyMode() throws DirectoryException 1056 { 1057 try 1058 { 1059 // FIXME: use full programmatic way of ending append mode once available in persistIt 1060 db.getManagement().execute("backup -e"); 1061 } 1062 catch (RemoteException e) 1063 { 1064 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), 1065 ERR_BACKEND_END_APPEND_MODE.get(config.getBackendId(), stackTraceToSingleLineString(e))); 1066 } 1067 } 1068 1069 @Override 1070 public void createBackup(BackupConfig backupConfig) throws DirectoryException 1071 { 1072 if (db != null) 1073 { 1074 switchToAppendOnlyMode(); 1075 } 1076 try 1077 { 1078 new BackupManager(config.getBackendId()).createBackup(this, backupConfig); 1079 } 1080 finally 1081 { 1082 if (db != null) 1083 { 1084 endAppendOnlyMode(); 1085 } 1086 } 1087 } 1088 1089 @Override 1090 public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException 1091 { 1092 new BackupManager(config.getBackendId()).removeBackup(backupDirectory, backupID); 1093 } 1094 1095 @Override 1096 public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException 1097 { 1098 new BackupManager(config.getBackendId()).restoreBackup(this, restoreConfig); 1099 } 1100 1101 @Override 1102 public Set<TreeName> listTrees() 1103 { 1104 try 1105 { 1106 String[] treeNames = volume.getTreeNames(); 1107 final Set<TreeName> results = new HashSet<>(treeNames.length); 1108 for (String treeName : treeNames) 1109 { 1110 if (!treeName.equals("_classIndex")) 1111 { 1112 results.add(TreeName.valueOf(treeName)); 1113 } 1114 } 1115 return results; 1116 } 1117 catch (PersistitException e) 1118 { 1119 throw new StorageRuntimeException(e); 1120 } 1121 } 1122 1123 /** 1124 * TODO: it would be nice to use the low-level key/value APIs. They seem quite 1125 * inefficient at the moment for simple byte arrays. 1126 */ 1127 private static Key bytesToKey(final Key key, final ByteSequence bytes) 1128 { 1129 final byte[] tmp = bytes.toByteArray(); 1130 return key.clear().appendByteArray(tmp, 0, tmp.length); 1131 } 1132 1133 private static Value bytesToValue(final Value value, final ByteSequence bytes) 1134 { 1135 value.clear().putByteArray(bytes.toByteArray()); 1136 return value; 1137 } 1138 1139 private static ByteString valueToBytes(final Value value) 1140 { 1141 if (value.isDefined()) 1142 { 1143 return ByteString.wrap(value.getByteArray()); 1144 } 1145 return null; 1146 } 1147 1148 @Override 1149 public boolean isConfigurationChangeAcceptable(PDBBackendCfg newCfg, 1150 List<LocalizableMessage> unacceptableReasons) 1151 { 1152 long newSize = computeSize(newCfg); 1153 long oldSize = computeSize(config); 1154 return (newSize <= oldSize || memQuota.isMemoryAvailable(newSize - oldSize)) 1155 && checkConfigurationDirectories(newCfg, unacceptableReasons); 1156 } 1157 1158 private long computeSize(PDBBackendCfg cfg) 1159 { 1160 return cfg.getDBCacheSize() > 0 ? cfg.getDBCacheSize() : memQuota.memPercentToBytes(cfg.getDBCachePercent()); 1161 } 1162 1163 /** 1164 * Checks newly created backend has a valid configuration. 1165 * @param cfg the new configuration 1166 * @param unacceptableReasons the list of accumulated errors and their messages 1167 * @param context the server context 1168 * @return true if newly created backend has a valid configuration 1169 */ 1170 static boolean isConfigurationAcceptable(PDBBackendCfg cfg, List<LocalizableMessage> unacceptableReasons, 1171 ServerContext context) 1172 { 1173 if (context != null) 1174 { 1175 MemoryQuota memQuota = context.getMemoryQuota(); 1176 if (cfg.getDBCacheSize() > 0 && !memQuota.isMemoryAvailable(cfg.getDBCacheSize())) 1177 { 1178 unacceptableReasons.add(ERR_BACKEND_CONFIG_CACHE_SIZE_GREATER_THAN_JVM_HEAP.get( 1179 cfg.getDBCacheSize(), memQuota.getAvailableMemory())); 1180 return false; 1181 } 1182 else if (!memQuota.isMemoryAvailable(memQuota.memPercentToBytes(cfg.getDBCachePercent()))) 1183 { 1184 unacceptableReasons.add(ERR_BACKEND_CONFIG_CACHE_PERCENT_GREATER_THAN_JVM_HEAP.get( 1185 cfg.getDBCachePercent(), memQuota.memBytesToPercent(memQuota.getAvailableMemory()))); 1186 return false; 1187 } 1188 } 1189 return checkConfigurationDirectories(cfg, unacceptableReasons); 1190 } 1191 1192 private static boolean checkConfigurationDirectories(PDBBackendCfg cfg, 1193 List<LocalizableMessage> unacceptableReasons) 1194 { 1195 final ConfigChangeResult ccr = new ConfigChangeResult(); 1196 File newBackendDirectory = getBackendDirectory(cfg); 1197 1198 checkDBDirExistsOrCanCreate(newBackendDirectory, ccr, true); 1199 checkDBDirPermissions(cfg.getDBDirectoryPermissions(), cfg.dn(), ccr); 1200 if (!ccr.getMessages().isEmpty()) 1201 { 1202 unacceptableReasons.addAll(ccr.getMessages()); 1203 return false; 1204 } 1205 return true; 1206 } 1207 1208 @Override 1209 public ConfigChangeResult applyConfigurationChange(PDBBackendCfg cfg) 1210 { 1211 final ConfigChangeResult ccr = new ConfigChangeResult(); 1212 1213 try 1214 { 1215 File newBackendDirectory = getBackendDirectory(cfg); 1216 1217 // Create the directory if it doesn't exist. 1218 if(!cfg.getDBDirectory().equals(config.getDBDirectory())) 1219 { 1220 checkDBDirExistsOrCanCreate(newBackendDirectory, ccr, false); 1221 if (!ccr.getMessages().isEmpty()) 1222 { 1223 return ccr; 1224 } 1225 1226 ccr.setAdminActionRequired(true); 1227 ccr.addMessage(NOTE_CONFIG_DB_DIR_REQUIRES_RESTART.get(config.getDBDirectory(), cfg.getDBDirectory())); 1228 } 1229 1230 if (!cfg.getDBDirectoryPermissions().equalsIgnoreCase(config.getDBDirectoryPermissions()) 1231 || !cfg.getDBDirectory().equals(config.getDBDirectory())) 1232 { 1233 checkDBDirPermissions(cfg.getDBDirectoryPermissions(), cfg.dn(), ccr); 1234 if (!ccr.getMessages().isEmpty()) 1235 { 1236 return ccr; 1237 } 1238 1239 setDBDirPermissions(newBackendDirectory, cfg.getDBDirectoryPermissions(), cfg.dn(), ccr); 1240 if (!ccr.getMessages().isEmpty()) 1241 { 1242 return ccr; 1243 } 1244 } 1245 registerMonitoredDirectory(cfg); 1246 config = cfg; 1247 commitPolicy = config.isDBTxnNoSync() ? SOFT : GROUP; 1248 } 1249 catch (Exception e) 1250 { 1251 addErrorMessage(ccr, LocalizableMessage.raw(stackTraceToSingleLineString(e))); 1252 } 1253 return ccr; 1254 } 1255 1256 private void registerMonitoredDirectory(PDBBackendCfg cfg) 1257 { 1258 diskMonitor.registerMonitoredDirectory( 1259 cfg.getBackendId() + " backend", 1260 getDirectory(), 1261 cfg.getDiskLowThreshold(), 1262 cfg.getDiskFullThreshold(), 1263 this); 1264 } 1265 1266 @Override 1267 public void removeStorageFiles() throws StorageRuntimeException 1268 { 1269 StorageUtils.removeStorageFiles(backendDirectory); 1270 } 1271 1272 @Override 1273 public StorageStatus getStorageStatus() 1274 { 1275 return storageStatus; 1276 } 1277 1278 @Override 1279 public void diskFullThresholdReached(File directory, long thresholdInBytes) { 1280 storageStatus = statusWhenDiskSpaceFull(directory, thresholdInBytes, config.getBackendId()); 1281 } 1282 1283 @Override 1284 public void diskLowThresholdReached(File directory, long thresholdInBytes) { 1285 storageStatus = statusWhenDiskSpaceLow(directory, thresholdInBytes, config.getBackendId()); 1286 } 1287 1288 @Override 1289 public void diskSpaceRestored(File directory, long lowThresholdInBytes, long fullThresholdInBytes) { 1290 storageStatus = StorageStatus.working(); 1291 } 1292}