001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2013-2016 ForgeRock AS. 016 */ 017package org.opends.server.backends.task; 018 019import static org.opends.messages.BackendMessages.*; 020import static org.opends.server.config.ConfigConstants.*; 021import static org.opends.server.util.CollectionUtils.*; 022import static org.opends.server.util.ServerConstants.*; 023import static org.opends.server.util.StaticUtils.*; 024 025import java.io.File; 026import java.io.IOException; 027import java.util.GregorianCalendar; 028import java.util.HashMap; 029import java.util.Iterator; 030import java.util.LinkedHashMap; 031import java.util.LinkedList; 032import java.util.List; 033import java.util.Map; 034import java.util.TreeSet; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.locks.ReentrantLock; 037 038import org.forgerock.i18n.LocalizableMessage; 039import org.forgerock.i18n.slf4j.LocalizedLogger; 040import org.forgerock.opendj.ldap.ByteString; 041import org.forgerock.opendj.ldap.DN; 042import org.forgerock.opendj.ldap.ResultCode; 043import org.forgerock.opendj.ldap.schema.AttributeType; 044import org.opends.server.api.AlertGenerator; 045import org.opends.server.api.DirectoryThread; 046import org.opends.server.core.DirectoryServer; 047import org.opends.server.core.SearchOperation; 048import org.opends.server.core.ServerContext; 049import org.opends.server.types.Attribute; 050import org.opends.server.types.Attributes; 051import org.opends.server.types.DirectoryException; 052import org.opends.server.types.Entry; 053import org.opends.server.types.ExistingFileBehavior; 054import org.opends.server.types.InitializationException; 055import org.opends.server.types.LDIFExportConfig; 056import org.opends.server.types.LDIFImportConfig; 057import org.opends.server.types.LockManager.DNLock; 058import org.opends.server.types.Operation; 059import org.opends.server.types.SearchFilter; 060import org.opends.server.util.LDIFException; 061import org.opends.server.util.LDIFReader; 062import org.opends.server.util.LDIFWriter; 063import org.opends.server.util.TimeThread; 064 065/** 066 * This class defines a task scheduler for the Directory Server that will 067 * control the execution of scheduled tasks and other administrative functions 068 * that need to occur on a regular basis. 069 */ 070public class TaskScheduler 071 extends DirectoryThread 072 implements AlertGenerator 073{ 074 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 075 076 /** 077 * The fully-qualified name of this class. 078 */ 079 private static final String CLASS_NAME = 080 "org.opends.server.backends.task.TaskScheduler"; 081 082 083 084 /** 085 * The maximum length of time in milliseconds to sleep between iterations 086 * through the scheduler loop. 087 */ 088 private static long MAX_SLEEP_TIME = 5000; 089 090 091 /** Indicates whether the scheduler is currently running. */ 092 private boolean isRunning; 093 /** Indicates whether a request has been received to stop the scheduler. */ 094 private boolean stopRequested; 095 096 /** The entry that serves as the immediate parent for recurring tasks. */ 097 private Entry recurringTaskParentEntry; 098 /** The entry that serves as the immediate parent for scheduled tasks. */ 099 private Entry scheduledTaskParentEntry; 100 /** The top-level entry at the root of the task tree. */ 101 private Entry taskRootEntry; 102 103 /** The set of recurring tasks defined in the server. */ 104 private final HashMap<String,RecurringTask> recurringTasks = new HashMap<>(); 105 /** The set of tasks associated with this scheduler. */ 106 private final HashMap<String,Task> tasks = new HashMap<>(); 107 /** The set of worker threads that are actively busy processing tasks. */ 108 private final HashMap<String,TaskThread> activeThreads = new HashMap<>(); 109 110 /** The thread ID for the next task thread to be created;. */ 111 private int nextThreadID = 1; 112 113 /** The set of worker threads that may be used to process tasks. */ 114 private final LinkedList<TaskThread> idleThreads = new LinkedList<>(); 115 116 /** The lock used to provide threadsafe access to the scheduler. */ 117 private final ReentrantLock schedulerLock = new ReentrantLock(); 118 119 /** The task backend with which this scheduler is associated. */ 120 private TaskBackend taskBackend; 121 122 /** The thread being used to actually run the scheduler. */ 123 private Thread schedulerThread; 124 125 /** The set of recently-completed tasks that need to be retained. */ 126 private final TreeSet<Task> completedTasks = new TreeSet<>(); 127 /** The set of tasks that have been scheduled but not yet arrived. */ 128 private final TreeSet<Task> pendingTasks = new TreeSet<>(); 129 /** The set of tasks that are currently running. */ 130 private final TreeSet<Task> runningTasks = new TreeSet<>(); 131 132 private ServerContext serverContext; 133 134 /** 135 * Creates a new task scheduler that will be used to ensure that tasks are 136 * invoked at the appropriate times. 137 * @param serverContext 138 * The server context 139 * @param taskBackend The task backend with which this scheduler is 140 * associated. 141 * 142 * @throws InitializationException If a problem occurs while initializing 143 * the scheduler from the backing file. 144 */ 145 public TaskScheduler(ServerContext serverContext, TaskBackend taskBackend) 146 throws InitializationException 147 { 148 super("Task Scheduler Thread"); 149 150 this.serverContext = serverContext; 151 this.taskBackend = taskBackend; 152 153 DirectoryServer.registerAlertGenerator(this); 154 155 initializeTasksFromBackingFile(); 156 157 for (RecurringTask recurringTask : recurringTasks.values()) { 158 Task task = null; 159 try { 160 task = recurringTask.scheduleNextIteration(new GregorianCalendar()); 161 } catch (DirectoryException de) { 162 logger.error(de.getMessageObject()); 163 } 164 if (task != null) { 165 try { 166 scheduleTask(task, false); 167 } catch (DirectoryException de) { 168 // This task might have been already scheduled from before 169 // and thus got initialized from backing file, otherwise 170 // log error and continue. 171 if (de.getResultCode() != ResultCode.ENTRY_ALREADY_EXISTS) { 172 logger.error(de.getMessageObject()); 173 } 174 } 175 } 176 } 177 } 178 179 180 181 /** 182 * Adds a recurring task to the scheduler, optionally scheduling the first 183 * iteration for processing. 184 * 185 * @param recurringTask The recurring task to add to the scheduler. 186 * @param scheduleIteration Indicates whether to schedule an iteration of 187 * the recurring task. 188 * 189 * @throws DirectoryException If a problem occurs while trying to add the 190 * recurring task (e.g., there's already another 191 * recurring task defined with the same ID). 192 */ 193 public void addRecurringTask(RecurringTask recurringTask, 194 boolean scheduleIteration) 195 throws DirectoryException 196 { 197 schedulerLock.lock(); 198 199 try 200 { 201 String id = recurringTask.getRecurringTaskID(); 202 203 if (recurringTasks.containsKey(id)) 204 { 205 LocalizableMessage message = 206 ERR_TASKSCHED_DUPLICATE_RECURRING_ID.get(id); 207 throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, message); 208 } 209 210 Attribute attr = Attributes.create(ATTR_TASK_STATE, TaskState.RECURRING.toString()); 211 Entry recurringTaskEntry = recurringTask.getRecurringTaskEntry(); 212 recurringTaskEntry.putAttribute(attr.getAttributeDescription().getAttributeType(), newArrayList(attr)); 213 214 if (scheduleIteration) 215 { 216 Task task = recurringTask.scheduleNextIteration( 217 new GregorianCalendar()); 218 if (task != null) 219 { 220 // If there is an existing task with the same id 221 // and it is in completed state, take its place. 222 Task t = tasks.get(task.getTaskID()); 223 if (t != null && TaskState.isDone(t.getTaskState())) 224 { 225 removeCompletedTask(t.getTaskID()); 226 } 227 228 scheduleTask(task, false); 229 } 230 } 231 232 recurringTasks.put(id, recurringTask); 233 writeState(); 234 } 235 finally 236 { 237 schedulerLock.unlock(); 238 } 239 } 240 241 242 243 /** 244 * Removes the recurring task with the given ID. 245 * 246 * @param recurringTaskID The ID of the recurring task to remove. 247 * 248 * @return The recurring task that was removed, or <CODE>null</CODE> if there 249 * was no such recurring task. 250 * 251 * @throws DirectoryException If there is currently a pending or running 252 * iteration of the associated recurring task. 253 */ 254 public RecurringTask removeRecurringTask(String recurringTaskID) 255 throws DirectoryException 256 { 257 schedulerLock.lock(); 258 259 try 260 { 261 RecurringTask recurringTask = recurringTasks.remove(recurringTaskID); 262 HashMap<String,Task> iterationsMap = new HashMap<>(); 263 264 for (Task t : tasks.values()) 265 { 266 // Find any existing task iterations and try to cancel them. 267 if (t.getRecurringTaskID() != null && 268 t.getRecurringTaskID().equals(recurringTaskID)) 269 { 270 TaskState state = t.getTaskState(); 271 if (!TaskState.isDone(state) && !TaskState.isCancelled(state)) 272 { 273 cancelTask(t.getTaskID()); 274 } 275 iterationsMap.put(t.getTaskID(), t); 276 } 277 } 278 279 // Remove any completed task iterations. 280 for (Map.Entry<String,Task> iterationEntry : iterationsMap.entrySet()) 281 { 282 if (TaskState.isDone(iterationEntry.getValue().getTaskState())) 283 { 284 removeCompletedTask(iterationEntry.getKey()); 285 } 286 } 287 288 writeState(); 289 return recurringTask; 290 } 291 finally 292 { 293 schedulerLock.unlock(); 294 } 295 } 296 297 298 299 /** 300 * Schedules the provided task for execution. If the scheduler is active and 301 * the start time has arrived, then the task will begin execution immediately. 302 * Otherwise, it will be placed in the pending queue to be started at the 303 * appropriate time. 304 * 305 * @param task The task to be scheduled. 306 * @param writeState Indicates whether the current state information for 307 * the scheduler should be persisted to disk once the 308 * task is scheduled. 309 * 310 * @throws DirectoryException If a problem occurs while trying to schedule 311 * the task (e.g., there's already another task 312 * defined with the same ID). 313 */ 314 public void scheduleTask(Task task, boolean writeState) 315 throws DirectoryException 316 { 317 schedulerLock.lock(); 318 319 320 try 321 { 322 String id = task.getTaskID(); 323 324 if (tasks.containsKey(id)) 325 { 326 LocalizableMessage message = ERR_TASKSCHED_DUPLICATE_TASK_ID.get(id); 327 throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, message); 328 } 329 330 for (String dependencyID : task.getDependencyIDs()) 331 { 332 Task t = tasks.get(dependencyID); 333 if (t == null) 334 { 335 LocalizableMessage message = ERR_TASKSCHED_DEPENDENCY_MISSING.get(id, dependencyID); 336 throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message); 337 } 338 } 339 340 tasks.put(id, task); 341 342 TaskState state = shouldStart(task); 343 task.setTaskState(state); 344 345 if (state == TaskState.RUNNING) 346 { 347 TaskThread taskThread; 348 if (idleThreads.isEmpty()) 349 { 350 taskThread = new TaskThread(this, nextThreadID++); 351 taskThread.start(); 352 } 353 else 354 { 355 taskThread = idleThreads.removeFirst(); 356 } 357 358 runningTasks.add(task); 359 activeThreads.put(task.getTaskID(), taskThread); 360 taskThread.setTask(task); 361 } 362 else if (TaskState.isDone(state)) 363 { 364 if (state == TaskState.CANCELED_BEFORE_STARTING && task.isRecurring()) 365 { 366 pendingTasks.add(task); 367 } 368 else 369 { 370 completedTasks.add(task); 371 } 372 } 373 else 374 { 375 pendingTasks.add(task); 376 } 377 378 if (writeState) 379 { 380 writeState(); 381 } 382 } 383 finally 384 { 385 schedulerLock.unlock(); 386 } 387 } 388 389 390 391 /** 392 * Attempts to cancel the task with the given task ID. This will only cancel 393 * the task if it has not yet started running. If it has started, then it 394 * will not be interrupted. 395 * 396 * @param taskID The task ID of the task to cancel. 397 * 398 * @return The requested task, which may or may not have actually been 399 * cancelled (the task state should make it possible to determine 400 * whether it was cancelled), or <CODE>null</CODE> if there is no 401 * such task. 402 */ 403 public Task cancelTask(String taskID) 404 { 405 schedulerLock.lock(); 406 407 try 408 { 409 Task t = tasks.get(taskID); 410 if (t == null) 411 { 412 return null; 413 } 414 415 if (TaskState.isPending(t.getTaskState())) 416 { 417 pendingTasks.remove(t); 418 t.setTaskState(TaskState.CANCELED_BEFORE_STARTING); 419 addCompletedTask(t); 420 writeState(); 421 } 422 423 return t; 424 } 425 finally 426 { 427 schedulerLock.unlock(); 428 } 429 } 430 431 432 433 /** 434 * Removes the specified pending task. It will be completely removed rather 435 * than moving it to the set of completed tasks. 436 * 437 * @param taskID The task ID of the pending task to remove. 438 * 439 * @return The task that was removed. 440 * 441 * @throws DirectoryException If the requested task is not in the pending 442 * queue. 443 */ 444 public Task removePendingTask(String taskID) 445 throws DirectoryException 446 { 447 schedulerLock.lock(); 448 449 try 450 { 451 Task t = tasks.get(taskID); 452 if (t == null) 453 { 454 LocalizableMessage message = ERR_TASKSCHED_REMOVE_PENDING_NO_SUCH_TASK.get(taskID); 455 throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message); 456 } 457 458 if (TaskState.isPending(t.getTaskState())) 459 { 460 tasks.remove(taskID); 461 pendingTasks.remove(t); 462 writeState(); 463 return t; 464 } 465 else 466 { 467 LocalizableMessage message = ERR_TASKSCHED_REMOVE_PENDING_NOT_PENDING.get(taskID); 468 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message); 469 } 470 } 471 finally 472 { 473 schedulerLock.unlock(); 474 } 475 } 476 477 478 479 /** 480 * Removes the specified completed task. 481 * 482 * @param taskID The task ID of the completed task to remove. 483 * 484 * @return The task that was removed. 485 * 486 * @throws DirectoryException If the requested task could not be found. 487 */ 488 public Task removeCompletedTask(String taskID) 489 throws DirectoryException 490 { 491 schedulerLock.lock(); 492 493 try 494 { 495 Iterator<Task> iterator = completedTasks.iterator(); 496 while (iterator.hasNext()) 497 { 498 Task t = iterator.next(); 499 if (t.getTaskID().equals(taskID)) 500 { 501 iterator.remove(); 502 tasks.remove(taskID); 503 writeState(); 504 return t; 505 } 506 } 507 508 LocalizableMessage message = ERR_TASKSCHED_REMOVE_COMPLETED_NO_SUCH_TASK.get(taskID); 509 throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message); 510 } 511 finally 512 { 513 schedulerLock.unlock(); 514 } 515 } 516 517 518 519 /** 520 * Indicates that processing has completed on the provided task thread and 521 * that it is now available for processing other tasks. The thread may be 522 * immediately used for processing another task if appropriate. 523 * 524 * @param taskThread The thread that has completed processing on its 525 * previously-assigned task. 526 * @param completedTask The task for which processing has been completed. 527 * @param taskState The task state for this completed task. 528 * 529 * @return <CODE>true</CODE> if the thread should continue running and 530 * wait for the next task to process, or <CODE>false</CODE> if it 531 * should exit immediately. 532 */ 533 public boolean threadDone(TaskThread taskThread, Task completedTask, 534 TaskState taskState) 535 { 536 schedulerLock.lock(); 537 538 try 539 { 540 completedTask.setCompletionTime(TimeThread.getTime()); 541 completedTask.setTaskState(taskState); 542 addCompletedTask(completedTask); 543 544 try 545 { 546 completedTask.sendNotificationEMailMessage(); 547 } 548 catch (Exception e) 549 { 550 logger.traceException(e); 551 } 552 553 String taskID = completedTask.getTaskID(); 554 if (activeThreads.remove(taskID) == null) 555 { 556 return false; 557 } 558 559 // See if the task is part of a recurring task. 560 // If so, then schedule the next iteration. 561 scheduleNextRecurringTaskIteration(completedTask, 562 new GregorianCalendar()); 563 writeState(); 564 565 if (isRunning) 566 { 567 idleThreads.add(taskThread); 568 return true; 569 } 570 else 571 { 572 return false; 573 } 574 } 575 finally 576 { 577 schedulerLock.unlock(); 578 } 579 } 580 581 582 583 /** 584 * Check if a given task is a recurring task iteration and re-schedule it. 585 * @param completedTask The task for which processing has been completed. 586 * @param calendar The calendar date and time to schedule from. 587 */ 588 protected void scheduleNextRecurringTaskIteration(Task completedTask, 589 GregorianCalendar calendar) 590 { 591 String recurringTaskID = completedTask.getRecurringTaskID(); 592 if (recurringTaskID != null) 593 { 594 RecurringTask recurringTask = recurringTasks.get(recurringTaskID); 595 if (recurringTask != null) 596 { 597 Task newIteration = null; 598 try { 599 newIteration = recurringTask.scheduleNextIteration(calendar); 600 } catch (DirectoryException de) { 601 logger.error(de.getMessageObject()); 602 } 603 if (newIteration != null) 604 { 605 try 606 { 607 // If there is an existing task with the same id 608 // and it is in completed state, take its place. 609 Task t = tasks.get(newIteration.getTaskID()); 610 if (t != null && TaskState.isDone(t.getTaskState())) 611 { 612 removeCompletedTask(t.getTaskID()); 613 } 614 615 scheduleTask(newIteration, false); 616 } 617 catch (DirectoryException de) 618 { 619 // This task might have been already scheduled from before 620 // and thus got initialized from backing file, otherwise 621 // log error and continue. 622 if (de.getResultCode() != ResultCode.ENTRY_ALREADY_EXISTS) 623 { 624 logger.traceException(de); 625 626 LocalizableMessage message = 627 ERR_TASKSCHED_ERROR_SCHEDULING_RECURRING_ITERATION.get( 628 recurringTaskID, de.getMessageObject()); 629 logger.error(message); 630 631 DirectoryServer.sendAlertNotification(this, 632 ALERT_TYPE_CANNOT_SCHEDULE_RECURRING_ITERATION, 633 message); 634 } 635 } 636 } 637 } 638 } 639 } 640 641 642 643 /** 644 * Adds the provided task to the set of completed tasks associated with the 645 * scheduler. It will be automatically removed after the appropriate 646 * retention time has elapsed. 647 * 648 * @param completedTask The task for which processing has completed. 649 */ 650 public void addCompletedTask(Task completedTask) 651 { 652 // The scheduler lock is reentrant, so even if we already hold it, we can 653 // acquire it again. 654 schedulerLock.lock(); 655 656 try 657 { 658 completedTasks.add(completedTask); 659 runningTasks.remove(completedTask); 660 661 // If the task never ran set its completion 662 // time here explicitly so that it can be 663 // correctly evaluated for retention later. 664 if (completedTask.getCompletionTime() == -1) 665 { 666 completedTask.setCompletionTime(TimeThread.getTime()); 667 } 668 } 669 finally 670 { 671 schedulerLock.unlock(); 672 } 673 } 674 675 676 677 /** 678 * Stops the scheduler so that it will not start any scheduled tasks. It will 679 * not attempt to interrupt any tasks that are already running. Note that 680 * once the scheduler has been stopped, it cannot be restarted and it will be 681 * necessary to restart the task backend to start a new scheduler instance. 682 */ 683 public void stopScheduler() 684 { 685 stopRequested = true; 686 687 try 688 { 689 schedulerThread.interrupt(); 690 } 691 catch (Exception e) 692 { 693 logger.traceException(e); 694 } 695 696 try 697 { 698 schedulerThread.join(); 699 } 700 catch (Exception e) 701 { 702 logger.traceException(e); 703 } 704 705 pendingTasks.clear(); 706 runningTasks.clear(); 707 completedTasks.clear(); 708 tasks.clear(); 709 710 for (TaskThread thread : idleThreads) 711 { 712 LocalizableMessage message = INFO_TASKBE_INTERRUPTED_BY_SHUTDOWN.get(); 713 thread.interruptTask(TaskState.STOPPED_BY_SHUTDOWN, message, true); 714 } 715 } 716 717 718 719 /** 720 * Attempts to interrupt any tasks that are actively running. This will not 721 * make any attempt to stop the scheduler. 722 * 723 * @param interruptState The state that should be assigned to the tasks if 724 * they are successfully interrupted. 725 * @param interruptReason A message indicating the reason that the tasks 726 * are to be interrupted. 727 * @param waitForStop Indicates whether this method should wait until 728 * all active tasks have stopped before returning. 729 */ 730 public void interruptRunningTasks(TaskState interruptState, 731 LocalizableMessage interruptReason, 732 boolean waitForStop) 733 { 734 // Grab a copy of the running threads so that we can operate on them without 735 // holding the lock. 736 LinkedList<TaskThread> threadList = new LinkedList<>(); 737 738 schedulerLock.lock(); 739 try 740 { 741 threadList.addAll(activeThreads.values()); 742 } 743 finally 744 { 745 schedulerLock.unlock(); 746 } 747 748 749 // Iterate through all the task threads and request that they stop 750 // processing. 751 for (TaskThread t : threadList) 752 { 753 try 754 { 755 t.interruptTask(interruptState, interruptReason, true); 756 } 757 catch (Exception e) 758 { 759 logger.traceException(e); 760 } 761 } 762 763 764 // If we should actually wait for all the task threads to stop, then do so. 765 if (waitForStop) 766 { 767 for (TaskThread t : threadList) 768 { 769 try 770 { 771 t.join(); 772 } 773 catch (Exception e) 774 { 775 logger.traceException(e); 776 } 777 } 778 } 779 } 780 781 782 783 /** 784 * Operates in a loop, launching tasks at the appropriate time and performing 785 * any necessary periodic cleanup. 786 */ 787 @Override 788 public void run() 789 { 790 isRunning = true; 791 schedulerThread = currentThread(); 792 793 try 794 { 795 while (! stopRequested) 796 { 797 schedulerLock.lock(); 798 799 boolean writeState = false; 800 long sleepTime = MAX_SLEEP_TIME; 801 802 try 803 { 804 // If there are any pending tasks that need to be started, then do so 805 // now. 806 Iterator<Task> iterator = pendingTasks.iterator(); 807 while (iterator.hasNext()) 808 { 809 Task t = iterator.next(); 810 TaskState state = shouldStart(t); 811 812 if (state == TaskState.RUNNING) 813 { 814 TaskThread taskThread; 815 if (idleThreads.isEmpty()) 816 { 817 taskThread = new TaskThread(this, nextThreadID++); 818 taskThread.start(); 819 } 820 else 821 { 822 taskThread = idleThreads.removeFirst(); 823 } 824 825 runningTasks.add(t); 826 activeThreads.put(t.getTaskID(), taskThread); 827 taskThread.setTask(t); 828 829 iterator.remove(); 830 writeState = true; 831 } 832 else if (state == TaskState.WAITING_ON_START_TIME) 833 { 834 // If we're waiting for the start time to arrive, then see if that 835 // will come before the next sleep time is up. 836 long waitTime = t.getScheduledStartTime() - TimeThread.getTime(); 837 sleepTime = Math.min(sleepTime, waitTime); 838 } 839 // Recurring task iteration has to spawn the next one 840 // even if the current iteration has been canceled. 841 else if (state == TaskState.CANCELED_BEFORE_STARTING && t.isRecurring()) 842 { 843 if (t.getScheduledStartTime() > TimeThread.getTime()) { 844 // If we're waiting for the start time to arrive, 845 // then see if that will come before the next sleep time is up. 846 long waitTime = 847 t.getScheduledStartTime() - TimeThread.getTime(); 848 sleepTime = Math.min(sleepTime, waitTime); 849 } else { 850 TaskThread taskThread; 851 if (idleThreads.isEmpty()) { 852 taskThread = new TaskThread(this, nextThreadID++); 853 taskThread.start(); 854 } else { 855 taskThread = idleThreads.removeFirst(); 856 } 857 runningTasks.add(t); 858 activeThreads.put(t.getTaskID(), taskThread); 859 taskThread.setTask(t); 860 } 861 } 862 863 if (state != t.getTaskState()) 864 { 865 t.setTaskState(state); 866 writeState = true; 867 } 868 } 869 870 871 // Clean up any completed tasks that have been around long enough. 872 long retentionTimeMillis = 873 TimeUnit.SECONDS.toMillis(taskBackend.getRetentionTime()); 874 long oldestRetainedCompletionTime = 875 TimeThread.getTime() - retentionTimeMillis; 876 iterator = completedTasks.iterator(); 877 while (iterator.hasNext()) 878 { 879 Task t = iterator.next(); 880 if (t.getCompletionTime() < oldestRetainedCompletionTime) 881 { 882 iterator.remove(); 883 tasks.remove(t.getTaskID()); 884 writeState = true; 885 } 886 } 887 888 // If anything changed, then make sure that the on-disk state gets 889 // updated. 890 if (writeState) 891 { 892 writeState(); 893 } 894 } 895 finally 896 { 897 schedulerLock.unlock(); 898 } 899 900 901 try 902 { 903 if (sleepTime > 0) 904 { 905 Thread.sleep(sleepTime); 906 } 907 } catch (InterruptedException ie) {} 908 } 909 } 910 finally 911 { 912 isRunning = false; 913 } 914 } 915 916 917 918 /** 919 * Determines whether the specified task should start running. This is based 920 * on the start time, the set of dependencies, and whether the scheduler is active. 921 * Note that the caller to this method must hold the scheduler lock. 922 * 923 * @param task The task for which to make the determination. 924 * 925 * @return The task state that should be used for the task. It should be 926 * RUNNING if the task should be started, or some other state if not. 927 */ 928 private TaskState shouldStart(Task task) 929 { 930 // If the task has finished we don't want to restart it 931 TaskState state = task.getTaskState(); 932 933 // Reset task state if recurring. 934 if (state == TaskState.RECURRING) { 935 state = null; 936 } 937 938 if (state != null && TaskState.isDone(state)) 939 { 940 return state; 941 } 942 943 if (! isRunning) 944 { 945 return TaskState.UNSCHEDULED; 946 } 947 948 if (task.getScheduledStartTime() > TimeThread.getTime()) 949 { 950 return TaskState.WAITING_ON_START_TIME; 951 } 952 953 LinkedList<String> dependencyIDs = task.getDependencyIDs(); 954 if (dependencyIDs != null) 955 { 956 for (String dependencyID : dependencyIDs) 957 { 958 Task t = tasks.get(dependencyID); 959 if (t != null) 960 { 961 TaskState tState = t.getTaskState(); 962 if (!TaskState.isDone(tState)) 963 { 964 return TaskState.WAITING_ON_DEPENDENCY; 965 } 966 if (!TaskState.isSuccessful(tState)) 967 { 968 FailedDependencyAction action = task.getFailedDependencyAction(); 969 switch (action) 970 { 971 case CANCEL: 972 cancelTask(task.getTaskID()); 973 return task.getTaskState(); 974 case DISABLE: 975 task.setTaskState(TaskState.DISABLED); 976 return task.getTaskState(); 977 default: 978 break; 979 } 980 } 981 } 982 } 983 } 984 985 return TaskState.RUNNING; 986 } 987 988 989 990 /** 991 * Populates the scheduler with information read from the task backing file. 992 * If no backing file is found, then create a new one. The caller must 993 * already hold the scheduler lock or otherwise ensure that this is a 994 * threadsafe operation. 995 * 996 * @throws InitializationException If a fatal error occurs while attempting 997 * to perform the initialization. 998 */ 999 private void initializeTasksFromBackingFile() 1000 throws InitializationException 1001 { 1002 String backingFilePath = taskBackend.getTaskBackingFile(); 1003 1004 try 1005 { 1006 File backingFile = getFileForPath(backingFilePath); 1007 if (! backingFile.exists()) 1008 { 1009 createNewTaskBackingFile(); 1010 return; 1011 } 1012 1013 1014 LDIFImportConfig importConfig = new LDIFImportConfig(backingFilePath); 1015 LDIFReader ldifReader = new LDIFReader(importConfig); 1016 1017 taskRootEntry = null; 1018 recurringTaskParentEntry = null; 1019 scheduledTaskParentEntry = null; 1020 1021 while (true) 1022 { 1023 Entry entry; 1024 1025 try 1026 { 1027 entry = ldifReader.readEntry(); 1028 } 1029 catch (LDIFException le) 1030 { 1031 logger.traceException(le); 1032 1033 if (le.canContinueReading()) 1034 { 1035 logger.error(ERR_TASKSCHED_CANNOT_PARSE_ENTRY_RECOVERABLE, 1036 backingFilePath, le.getLineNumber(), le.getMessage()); 1037 1038 continue; 1039 } 1040 else 1041 { 1042 try 1043 { 1044 ldifReader.close(); 1045 } 1046 catch (Exception e) 1047 { 1048 logger.traceException(e); 1049 } 1050 1051 LocalizableMessage message = ERR_TASKSCHED_CANNOT_PARSE_ENTRY_FATAL.get( 1052 backingFilePath, le.getLineNumber(), le.getMessage()); 1053 throw new InitializationException(message); 1054 } 1055 } 1056 1057 if (entry == null) 1058 { 1059 break; 1060 } 1061 1062 DN entryDN = entry.getName(); 1063 if (entryDN.equals(taskBackend.getTaskRootDN())) 1064 { 1065 taskRootEntry = entry; 1066 } 1067 else if (entryDN.equals(taskBackend.getRecurringTasksParentDN())) 1068 { 1069 recurringTaskParentEntry = entry; 1070 } 1071 else if (entryDN.equals(taskBackend.getScheduledTasksParentDN())) 1072 { 1073 scheduledTaskParentEntry = entry; 1074 } 1075 else 1076 { 1077 DN parentDN = DirectoryServer.getParentDNInSuffix(entryDN); 1078 if (parentDN == null) 1079 { 1080 logger.error(ERR_TASKSCHED_ENTRY_HAS_NO_PARENT, entryDN, taskBackend.getTaskRootDN()); 1081 } 1082 else if (parentDN.equals(taskBackend.getScheduledTasksParentDN())) 1083 { 1084 try 1085 { 1086 Task task = entryToScheduledTask(entry, null); 1087 if (TaskState.isDone(task.getTaskState())) 1088 { 1089 String id = task.getTaskID(); 1090 if (tasks.containsKey(id)) 1091 { 1092 logger.warn(WARN_TASKSCHED_DUPLICATE_TASK_ID, id); 1093 } 1094 else 1095 { 1096 completedTasks.add(task); 1097 tasks.put(id, task); 1098 } 1099 } 1100 else 1101 { 1102 scheduleTask(task, false); 1103 } 1104 } 1105 catch (DirectoryException de) 1106 { 1107 logger.traceException(de); 1108 logger.error(ERR_TASKSCHED_CANNOT_SCHEDULE_TASK_FROM_ENTRY, entryDN, de.getMessageObject()); 1109 } 1110 } 1111 else if (parentDN.equals(taskBackend.getRecurringTasksParentDN())) 1112 { 1113 try 1114 { 1115 RecurringTask recurringTask = entryToRecurringTask(entry); 1116 addRecurringTask(recurringTask, false); 1117 } 1118 catch (DirectoryException de) 1119 { 1120 logger.traceException(de); 1121 logger.error(ERR_TASKSCHED_CANNOT_SCHEDULE_RECURRING_TASK_FROM_ENTRY, entryDN, de.getMessageObject()); 1122 } 1123 } 1124 else 1125 { 1126 logger.error(ERR_TASKSCHED_INVALID_TASK_ENTRY_DN, entryDN, backingFilePath); 1127 } 1128 } 1129 } 1130 1131 ldifReader.close(); 1132 } 1133 catch (IOException ioe) 1134 { 1135 logger.traceException(ioe); 1136 1137 LocalizableMessage message = ERR_TASKSCHED_ERROR_READING_TASK_BACKING_FILE.get( 1138 backingFilePath, stackTraceToSingleLineString(ioe)); 1139 throw new InitializationException(message, ioe); 1140 } 1141 } 1142 1143 1144 1145 /** 1146 * Creates a new task backing file that contains only the basic structure but 1147 * no scheduled or recurring task entries. The caller must already hold the 1148 * scheduler lock or otherwise ensure that this is a threadsafe operation. 1149 * 1150 * @throws InitializationException If a problem occurs while attempting to 1151 * create the backing file. 1152 */ 1153 private void createNewTaskBackingFile() 1154 throws InitializationException 1155 { 1156 String backingFile = taskBackend.getTaskBackingFile(); 1157 LDIFExportConfig exportConfig = 1158 new LDIFExportConfig(backingFile, ExistingFileBehavior.OVERWRITE); 1159 1160 try 1161 { 1162 LDIFWriter writer = new LDIFWriter(exportConfig); 1163 1164 // First, write a header to the top of the file to indicate that it should 1165 // not be manually edited. 1166 writer.writeComment(INFO_TASKBE_BACKING_FILE_HEADER.get(), 80); 1167 1168 1169 // Next, create the required hierarchical entries and add them to the 1170 // LDIF. 1171 taskRootEntry = createEntry(taskBackend.getTaskRootDN()); 1172 writer.writeEntry(taskRootEntry); 1173 1174 scheduledTaskParentEntry = 1175 createEntry(taskBackend.getScheduledTasksParentDN()); 1176 writer.writeEntry(scheduledTaskParentEntry); 1177 1178 recurringTaskParentEntry = 1179 createEntry(taskBackend.getRecurringTasksParentDN()); 1180 writer.writeEntry(recurringTaskParentEntry); 1181 1182 1183 // Close the file and we're done. 1184 writer.close(); 1185 } 1186 catch (IOException ioe) 1187 { 1188 logger.traceException(ioe); 1189 1190 LocalizableMessage message = ERR_TASKSCHED_CANNOT_CREATE_BACKING_FILE.get( 1191 backingFile, stackTraceToSingleLineString(ioe)); 1192 throw new InitializationException(message, ioe); 1193 } 1194 catch (LDIFException le) 1195 { 1196 logger.traceException(le); 1197 LocalizableMessage message = ERR_TASKSCHED_CANNOT_CREATE_BACKING_FILE.get( 1198 backingFile, le.getMessage()); 1199 throw new InitializationException(message, le); 1200 } 1201 } 1202 1203 1204 1205 /** 1206 * Writes state information about all tasks and recurring tasks to disk. 1207 */ 1208 public void writeState() 1209 { 1210 String backingFilePath = taskBackend.getTaskBackingFile(); 1211 String tmpFilePath = backingFilePath + ".tmp"; 1212 LDIFExportConfig exportConfig = 1213 new LDIFExportConfig(tmpFilePath, ExistingFileBehavior.OVERWRITE); 1214 1215 1216 schedulerLock.lock(); 1217 1218 try 1219 { 1220 LDIFWriter writer = new LDIFWriter(exportConfig); 1221 1222 // First, write a header to the top of the file to indicate that it should 1223 // not be manually edited. 1224 writer.writeComment(INFO_TASKBE_BACKING_FILE_HEADER.get(), 80); 1225 1226 1227 // Next, write the structural entries to the top of the LDIF. 1228 writer.writeEntry(taskRootEntry); 1229 writer.writeEntry(scheduledTaskParentEntry); 1230 writer.writeEntry(recurringTaskParentEntry); 1231 1232 1233 // Iterate through all the recurring tasks and write them to LDIF. 1234 for (RecurringTask recurringTask : recurringTasks.values()) 1235 { 1236 writer.writeEntry(recurringTask.getRecurringTaskEntry()); 1237 } 1238 1239 1240 // Iterate through all the scheduled tasks and write them to LDIF. 1241 for (Task task : tasks.values()) 1242 { 1243 writer.writeEntry(task.getTaskEntry()); 1244 } 1245 1246 1247 // Close the file. 1248 writer.close(); 1249 1250 1251 // See if there is a ".save" file. If so, then delete it. 1252 File saveFile = getFileForPath(backingFilePath + ".save"); 1253 try 1254 { 1255 if (saveFile.exists()) 1256 { 1257 saveFile.delete(); 1258 } 1259 } 1260 catch (Exception e) 1261 { 1262 logger.traceException(e); 1263 } 1264 1265 1266 // If there is an existing backing file, then rename it to ".save". 1267 File backingFile = getFileForPath(backingFilePath); 1268 try 1269 { 1270 if (backingFile.exists()) 1271 { 1272 backingFile.renameTo(saveFile); 1273 } 1274 } 1275 catch (Exception e) 1276 { 1277 logger.traceException(e); 1278 1279 LocalizableMessage message = WARN_TASKSCHED_CANNOT_RENAME_CURRENT_BACKING_FILE.get( 1280 backingFilePath, saveFile.getAbsolutePath(), stackTraceToSingleLineString(e)); 1281 logger.warn(message); 1282 DirectoryServer.sendAlertNotification( 1283 this, ALERT_TYPE_CANNOT_RENAME_CURRENT_TASK_FILE, message); 1284 } 1285 1286 1287 // Rename the ".tmp" file into place. 1288 File tmpFile = getFileForPath(tmpFilePath); 1289 try 1290 { 1291 tmpFile.renameTo(backingFile); 1292 } 1293 catch (Exception e) 1294 { 1295 logger.traceException(e); 1296 1297 LocalizableMessage message = ERR_TASKSCHED_CANNOT_RENAME_NEW_BACKING_FILE.get( 1298 tmpFilePath, backingFilePath, stackTraceToSingleLineString(e)); 1299 logger.error(message); 1300 DirectoryServer.sendAlertNotification( 1301 this, ALERT_TYPE_CANNOT_RENAME_NEW_TASK_FILE, message); 1302 } 1303 } 1304 catch (LDIFException le) 1305 { 1306 logger.traceException(le); 1307 LocalizableMessage message = 1308 ERR_TASKSCHED_CANNOT_WRITE_BACKING_FILE.get(tmpFilePath, le 1309 .getMessage()); 1310 logger.error(message); 1311 DirectoryServer.sendAlertNotification(this, 1312 ALERT_TYPE_CANNOT_WRITE_TASK_FILE, message); 1313 } 1314 catch (Exception e) 1315 { 1316 logger.traceException(e); 1317 LocalizableMessage message = 1318 ERR_TASKSCHED_CANNOT_WRITE_BACKING_FILE.get(tmpFilePath, 1319 stackTraceToSingleLineString(e)); 1320 logger.error(message); 1321 DirectoryServer.sendAlertNotification(this, 1322 ALERT_TYPE_CANNOT_WRITE_TASK_FILE, message); 1323 } 1324 finally 1325 { 1326 schedulerLock.unlock(); 1327 } 1328 } 1329 1330 1331 1332 /** 1333 * Retrieves the total number of entries in the task backend. 1334 * 1335 * @return The total number of entries in the task backend. 1336 */ 1337 public long getEntryCount() 1338 { 1339 schedulerLock.lock(); 1340 1341 try 1342 { 1343 return tasks.size() + recurringTasks.size() + 3; 1344 } 1345 finally 1346 { 1347 schedulerLock.unlock(); 1348 } 1349 } 1350 1351 /** 1352 * Retrieves the number of scheduled tasks in the task backend. 1353 * 1354 * @return The total number of entries in the task backend. 1355 */ 1356 public long getScheduledTaskCount() 1357 { 1358 schedulerLock.lock(); 1359 1360 try 1361 { 1362 return tasks.size(); 1363 } 1364 finally 1365 { 1366 schedulerLock.unlock(); 1367 } 1368 } 1369 1370 1371 1372 /** 1373 * Retrieves the number of recurring tasks in the task backend. 1374 * 1375 * @return The total number of entries in the task backend. 1376 */ 1377 public long getRecurringTaskCount() 1378 { 1379 schedulerLock.lock(); 1380 1381 try 1382 { 1383 return recurringTasks.size(); 1384 } 1385 finally 1386 { 1387 schedulerLock.unlock(); 1388 } 1389 } 1390 1391 1392 1393 /** 1394 * Retrieves the task backend with which this scheduler is associated. 1395 * 1396 * @return The task backend with which this scheduler is associated. 1397 */ 1398 public TaskBackend getTaskBackend() 1399 { 1400 return taskBackend; 1401 } 1402 1403 1404 1405 /** 1406 * Retrieves the root entry that is the common ancestor for all entries in the 1407 * task backend. 1408 * 1409 * @return The root entry that is the common ancestor for all entries in the 1410 * task backend. 1411 */ 1412 public Entry getTaskRootEntry() 1413 { 1414 return taskRootEntry.duplicate(true); 1415 } 1416 1417 1418 1419 /** 1420 * Retrieves the entry that is the immediate parent for all scheduled task 1421 * entries in the task backend. 1422 * 1423 * @return The entry that is the immediate parent for all scheduled task 1424 * entries in the task backend. 1425 */ 1426 public Entry getScheduledTaskParentEntry() 1427 { 1428 return scheduledTaskParentEntry.duplicate(true); 1429 } 1430 1431 1432 1433 /** 1434 * Retrieves the entry that is the immediate parent for all recurring task 1435 * entries in the task backend. 1436 * 1437 * @return The entry that is the immediate parent for all recurring task 1438 * entries in the task backend. 1439 */ 1440 public Entry getRecurringTaskParentEntry() 1441 { 1442 return recurringTaskParentEntry.duplicate(true); 1443 } 1444 1445 1446 1447 /** 1448 * Retrieves the scheduled task with the given task ID. 1449 * 1450 * @param taskID The task ID for the scheduled task to retrieve. 1451 * 1452 * @return The requested scheduled task, or <CODE>null</CODE> if there is no 1453 * such task. 1454 */ 1455 public Task getScheduledTask(String taskID) 1456 { 1457 schedulerLock.lock(); 1458 1459 try 1460 { 1461 return tasks.get(taskID); 1462 } 1463 finally 1464 { 1465 schedulerLock.unlock(); 1466 } 1467 } 1468 1469 1470 1471 /** 1472 * Retrieves the scheduled task created from the specified entry. 1473 * 1474 * @param taskEntryDN The DN of the task configuration entry associated 1475 * with the task to retrieve. 1476 * 1477 * @return The requested scheduled task, or <CODE>null</CODE> if there is no 1478 * such task. 1479 */ 1480 public Task getScheduledTask(DN taskEntryDN) 1481 { 1482 schedulerLock.lock(); 1483 1484 try 1485 { 1486 for (Task t : tasks.values()) 1487 { 1488 if (taskEntryDN.equals(t.getTaskEntry().getName())) 1489 { 1490 return t; 1491 } 1492 } 1493 1494 return null; 1495 } 1496 finally 1497 { 1498 schedulerLock.unlock(); 1499 } 1500 } 1501 1502 1503 1504 /** 1505 * Indicates whether the current thread already holds a lock on the scheduler. 1506 * 1507 * @return {@code true} if the current thread holds the scheduler lock, or 1508 * {@code false} if not. 1509 */ 1510 boolean holdsSchedulerLock() 1511 { 1512 return schedulerLock.isHeldByCurrentThread(); 1513 } 1514 1515 1516 1517 /** 1518 * Attempts to acquire a write lock on the specified entry, trying as many 1519 * times as necessary until the lock has been acquired. 1520 * 1521 * @param entryDN The DN of the entry for which to acquire the write lock. 1522 * 1523 * @return The write lock that has been acquired for the entry. 1524 */ 1525 DNLock writeLockEntry(DN entryDN) 1526 { 1527 DNLock lock = null; 1528 while (lock == null) 1529 { 1530 lock = DirectoryServer.getLockManager().tryWriteLockEntry(entryDN); 1531 } 1532 return lock; 1533 } 1534 1535 1536 1537 /** 1538 * Attempts to acquire a read lock on the specified entry. 1539 * 1540 * @param entryDN The DN of the entry for which to acquire the read lock. 1541 * 1542 * @return The read lock that has been acquired for the entry. 1543 * 1544 * @throws DirectoryException If the read lock cannot be acquired. 1545 */ 1546 DNLock readLockEntry(DN entryDN) throws DirectoryException 1547 { 1548 final DNLock lock = DirectoryServer.getLockManager().tryReadLockEntry(entryDN); 1549 if (lock != null) 1550 { 1551 return lock; 1552 } 1553 throw new DirectoryException(ResultCode.BUSY, ERR_BACKEND_CANNOT_LOCK_ENTRY.get(entryDN)); 1554 } 1555 1556 1557 1558 /** 1559 * Retrieves the scheduled task entry with the provided DN. The caller should 1560 * hold a read lock on the target entry. 1561 * 1562 * @param scheduledTaskEntryDN The entry DN that indicates which scheduled 1563 * task entry to retrieve. 1564 * 1565 * @return The scheduled task entry with the provided DN, or 1566 * <CODE>null</CODE> if no scheduled task has the provided DN. 1567 */ 1568 public Entry getScheduledTaskEntry(DN scheduledTaskEntryDN) 1569 { 1570 schedulerLock.lock(); 1571 1572 try 1573 { 1574 for (Task task : tasks.values()) 1575 { 1576 Entry taskEntry = task.getTaskEntry(); 1577 1578 if (scheduledTaskEntryDN.equals(taskEntry.getName())) 1579 { 1580 return taskEntry.duplicate(true); 1581 } 1582 } 1583 1584 return null; 1585 } 1586 finally 1587 { 1588 schedulerLock.unlock(); 1589 } 1590 } 1591 1592 1593 1594 /** 1595 * Compares the filter in the provided search operation against each of the 1596 * task entries, returning any that match. Note that only the search filter 1597 * will be used -- the base and scope will be ignored, so the caller must 1598 * ensure that they are correct for scheduled tasks. 1599 * 1600 * @param searchOperation The search operation to use when performing the 1601 * search. 1602 * 1603 * @return <CODE>true</CODE> if processing should continue on the search 1604 * operation, or <CODE>false</CODE> if it should not for some reason 1605 * (e.g., a size or time limit was reached). 1606 * 1607 * @throws DirectoryException If a problem occurs while processing the 1608 * search operation against the scheduled tasks. 1609 */ 1610 public boolean searchScheduledTasks(SearchOperation searchOperation) 1611 throws DirectoryException 1612 { 1613 SearchFilter filter = searchOperation.getFilter(); 1614 1615 schedulerLock.lock(); 1616 1617 try 1618 { 1619 for (Task t : tasks.values()) 1620 { 1621 DN taskEntryDN = t.getTaskEntryDN(); 1622 DNLock lock = readLockEntry(taskEntryDN); 1623 try 1624 { 1625 Entry e = t.getTaskEntry().duplicate(true); 1626 if (filter.matchesEntry(e) && !searchOperation.returnEntry(e, null)) 1627 { 1628 return false; 1629 } 1630 } 1631 finally 1632 { 1633 lock.unlock(); 1634 } 1635 } 1636 1637 return true; 1638 } 1639 finally 1640 { 1641 schedulerLock.unlock(); 1642 } 1643 } 1644 1645 1646 1647 /** 1648 * Retrieves the recurring task with the given recurring task ID. 1649 * 1650 * @param recurringTaskID The recurring task ID for the recurring task to 1651 * retrieve. 1652 * 1653 * @return The requested recurring task, or <CODE>null</CODE> if there is no 1654 * such recurring task. 1655 */ 1656 public RecurringTask getRecurringTask(String recurringTaskID) 1657 { 1658 schedulerLock.lock(); 1659 1660 try 1661 { 1662 return recurringTasks.get(recurringTaskID); 1663 } 1664 finally 1665 { 1666 schedulerLock.unlock(); 1667 } 1668 } 1669 1670 1671 1672 /** 1673 * Retrieves the recurring task with the given recurring task ID. 1674 * 1675 * @param recurringTaskEntryDN The recurring task ID for the recurring task 1676 * to retrieve. 1677 * 1678 * @return The requested recurring task, or <CODE>null</CODE> if there is no 1679 * such recurring task. 1680 */ 1681 public RecurringTask getRecurringTask(DN recurringTaskEntryDN) 1682 { 1683 schedulerLock.lock(); 1684 1685 try 1686 { 1687 for (RecurringTask rt : recurringTasks.values()) 1688 { 1689 if (recurringTaskEntryDN.equals(rt.getRecurringTaskEntry().getName())) 1690 { 1691 return rt; 1692 } 1693 } 1694 1695 return null; 1696 } 1697 finally 1698 { 1699 schedulerLock.unlock(); 1700 } 1701 } 1702 1703 1704 1705 /** 1706 * Retrieves the recurring task entry with the provided DN. The caller should 1707 * hold a read lock on the target entry. 1708 * 1709 * @param recurringTaskEntryDN The entry DN that indicates which recurring 1710 * task entry to retrieve. 1711 * 1712 * @return The recurring task entry with the provided DN, or 1713 * <CODE>null</CODE> if no recurring task has the provided DN. 1714 */ 1715 public Entry getRecurringTaskEntry(DN recurringTaskEntryDN) 1716 { 1717 schedulerLock.lock(); 1718 1719 try 1720 { 1721 for (RecurringTask recurringTask : recurringTasks.values()) 1722 { 1723 Entry recurringTaskEntry = recurringTask.getRecurringTaskEntry(); 1724 1725 if (recurringTaskEntryDN.equals(recurringTaskEntry.getName())) 1726 { 1727 return recurringTaskEntry.duplicate(true); 1728 } 1729 } 1730 1731 return null; 1732 } 1733 finally 1734 { 1735 schedulerLock.unlock(); 1736 } 1737 } 1738 1739 1740 1741 /** 1742 * Compares the filter in the provided search operation against each of the 1743 * recurring task entries, returning any that match. Note that only the 1744 * search filter will be used -- the base and scope will be ignored, so the 1745 * caller must ensure that they are correct for recurring tasks. 1746 * 1747 * @param searchOperation The search operation to use when performing the 1748 * search. 1749 * 1750 * @return <CODE>true</CODE> if processing should continue on the search 1751 * operation, or <CODE>false</CODE> if it should not for some reason 1752 * (e.g., a size or time limit was reached). 1753 * 1754 * @throws DirectoryException If a problem occurs while processing the 1755 * search operation against the recurring tasks. 1756 */ 1757 public boolean searchRecurringTasks(SearchOperation searchOperation) 1758 throws DirectoryException 1759 { 1760 SearchFilter filter = searchOperation.getFilter(); 1761 1762 schedulerLock.lock(); 1763 1764 try 1765 { 1766 for (RecurringTask rt : recurringTasks.values()) 1767 { 1768 DN recurringTaskEntryDN = rt.getRecurringTaskEntryDN(); 1769 DNLock lock = readLockEntry(recurringTaskEntryDN); 1770 try 1771 { 1772 Entry e = rt.getRecurringTaskEntry().duplicate(true); 1773 if (filter.matchesEntry(e) && ! searchOperation.returnEntry(e, null)) 1774 { 1775 return false; 1776 } 1777 } 1778 finally 1779 { 1780 lock.unlock(); 1781 } 1782 } 1783 1784 return true; 1785 } 1786 finally 1787 { 1788 schedulerLock.unlock(); 1789 } 1790 } 1791 1792 1793 1794 /** 1795 * Decodes the contents of the provided entry as a scheduled task. The 1796 * resulting task will not actually be scheduled for processing. 1797 * 1798 * @param entry The entry to decode as a scheduled task. 1799 * @param operation The operation used to create this task in the server, or 1800 * {@code null} if the operation is not available. 1801 * 1802 * @return The scheduled task decoded from the provided entry. 1803 * 1804 * @throws DirectoryException If the provided entry cannot be decoded as a 1805 * scheduled task. 1806 */ 1807 public Task entryToScheduledTask(Entry entry, Operation operation) 1808 throws DirectoryException 1809 { 1810 // Get the name of the class that implements the task logic. 1811 AttributeType attrType = DirectoryServer.getSchema().getAttributeType(ATTR_TASK_CLASS); 1812 List<Attribute> attrList = entry.getAttribute(attrType); 1813 if (attrList.isEmpty()) 1814 { 1815 LocalizableMessage message = ERR_TASKSCHED_NO_CLASS_ATTRIBUTE.get(ATTR_TASK_ID); 1816 throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, message); 1817 } 1818 1819 if (attrList.size() > 1) 1820 { 1821 LocalizableMessage message = ERR_TASKSCHED_MULTIPLE_CLASS_TYPES.get(ATTR_TASK_ID); 1822 throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, message); 1823 } 1824 1825 Attribute attr = attrList.get(0); 1826 if (attr.isEmpty()) 1827 { 1828 LocalizableMessage message = ERR_TASKSCHED_NO_CLASS_VALUES.get(ATTR_TASK_ID); 1829 throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, message); 1830 } 1831 1832 Iterator<ByteString> iterator = attr.iterator(); 1833 ByteString value = iterator.next(); 1834 if (iterator.hasNext()) 1835 { 1836 LocalizableMessage message = ERR_TASKSCHED_MULTIPLE_CLASS_VALUES.get(ATTR_TASK_ID); 1837 throw new DirectoryException(ResultCode.OBJECTCLASS_VIOLATION, message); 1838 } 1839 1840 // Try to load the specified class. 1841 String taskClassName = value.toString(); 1842 Class<?> taskClass; 1843 try 1844 { 1845 taskClass = DirectoryServer.loadClass(taskClassName); 1846 } 1847 catch (Exception e) 1848 { 1849 logger.traceException(e); 1850 1851 LocalizableMessage message = ERR_TASKSCHED_CANNOT_LOAD_CLASS. 1852 get(taskClassName, ATTR_TASK_CLASS, stackTraceToSingleLineString(e)); 1853 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message); 1854 } 1855 1856 // Instantiate the class as a task. 1857 Task task; 1858 try 1859 { 1860 task = (Task) taskClass.newInstance(); 1861 } 1862 catch (Exception e) 1863 { 1864 logger.traceException(e); 1865 1866 LocalizableMessage message = ERR_TASKSCHED_CANNOT_INSTANTIATE_CLASS_AS_TASK.get( 1867 taskClassName, Task.class.getName()); 1868 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message); 1869 } 1870 1871 // Perform the necessary internal and external initialization for the task. 1872 try 1873 { 1874 task.initializeTaskInternal(serverContext, this, entry); 1875 } 1876 catch (InitializationException ie) 1877 { 1878 logger.traceException(ie); 1879 1880 LocalizableMessage message = ERR_TASKSCHED_CANNOT_INITIALIZE_INTERNAL.get( 1881 taskClassName, ie.getMessage()); 1882 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message); 1883 } 1884 catch (Exception e) 1885 { 1886 LocalizableMessage message = ERR_TASKSCHED_CANNOT_INITIALIZE_INTERNAL.get( 1887 taskClassName, stackTraceToSingleLineString(e)); 1888 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message); 1889 } 1890 1891 if (!TaskState.isDone(task.getTaskState()) && 1892 !DirectoryServer.getAllowedTasks().contains(taskClassName)) 1893 { 1894 LocalizableMessage message = ERR_TASKSCHED_NOT_ALLOWED_TASK.get(taskClassName); 1895 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message); 1896 } 1897 1898 task.setOperation(operation); 1899 1900 // Avoid task specific initialization for completed tasks. 1901 if (!TaskState.isDone(task.getTaskState())) { 1902 task.initializeTask(); 1903 } 1904 task.setOperation(null); 1905 1906 return task; 1907 } 1908 1909 1910 1911 /** 1912 * Decodes the contents of the provided entry as a recurring task. The 1913 * resulting recurring task will not actually be added to the scheduler. 1914 * 1915 * @param entry The entry to decode as a recurring task. 1916 * 1917 * @return The recurring task decoded from the provided entry. 1918 * 1919 * @throws DirectoryException If the provided entry cannot be decoded as a 1920 * recurring task. 1921 */ 1922 public RecurringTask entryToRecurringTask(Entry entry) 1923 throws DirectoryException 1924 { 1925 return new RecurringTask(serverContext, this, entry); 1926 } 1927 1928 1929 1930 /** 1931 * Retrieves the DN of the configuration entry with which this alert generator 1932 * is associated. 1933 * 1934 * @return The DN of the configuration entry with which this alert generator 1935 * is associated. 1936 */ 1937 @Override 1938 public DN getComponentEntryDN() 1939 { 1940 return taskBackend.getConfigEntryDN(); 1941 } 1942 1943 1944 1945 /** 1946 * Retrieves the fully-qualified name of the Java class for this alert 1947 * generator implementation. 1948 * 1949 * @return The fully-qualified name of the Java class for this alert 1950 * generator implementation. 1951 */ 1952 @Override 1953 public String getClassName() 1954 { 1955 return CLASS_NAME; 1956 } 1957 1958 1959 1960 /** 1961 * Retrieves information about the set of alerts that this generator may 1962 * produce. The map returned should be between the notification type for a 1963 * particular notification and the human-readable description for that 1964 * notification. This alert generator must not generate any alerts with types 1965 * that are not contained in this list. 1966 * 1967 * @return Information about the set of alerts that this generator may 1968 * produce. 1969 */ 1970 @Override 1971 public LinkedHashMap<String,String> getAlerts() 1972 { 1973 LinkedHashMap<String, String> alerts = new LinkedHashMap<>(); 1974 1975 alerts.put(ALERT_TYPE_CANNOT_SCHEDULE_RECURRING_ITERATION, 1976 ALERT_DESCRIPTION_CANNOT_SCHEDULE_RECURRING_ITERATION); 1977 alerts.put(ALERT_TYPE_CANNOT_RENAME_CURRENT_TASK_FILE, 1978 ALERT_DESCRIPTION_CANNOT_RENAME_CURRENT_TASK_FILE); 1979 alerts.put(ALERT_TYPE_CANNOT_RENAME_NEW_TASK_FILE, 1980 ALERT_DESCRIPTION_CANNOT_RENAME_NEW_TASK_FILE); 1981 alerts.put(ALERT_TYPE_CANNOT_WRITE_TASK_FILE, 1982 ALERT_DESCRIPTION_CANNOT_WRITE_TASK_FILE); 1983 1984 return alerts; 1985 } 1986}