001/** 002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. 003 * 004 * Copyright (c) 2005 Sun Microsystems Inc. All Rights Reserved 005 * 006 * The contents of this file are subject to the terms 007 * of the Common Development and Distribution License 008 * (the License). You may not use this file except in 009 * compliance with the License. 010 * 011 * You can obtain a copy of the License at 012 * https://opensso.dev.java.net/public/CDDLv1.0.html or 013 * opensso/legal/CDDLv1.0.txt 014 * See the License for the specific language governing 015 * permission and limitations under the License. 016 * 017 * When distributing Covered Code, include this CDDL 018 * Header Notice in each file and include the License file 019 * at opensso/legal/CDDLv1.0.txt. 020 * If applicable, add the following below the CDDL Header, 021 * with the fields enclosed by brackets [] replaced by 022 * your own identifying information: 023 * "Portions Copyrighted [year] [name of copyright owner]" 024 * 025 * $Id: EventService.java,v 1.19 2009/09/28 21:47:33 ww203982 Exp $ 026 * 027 */ 028 029/* 030 * Portions Copyrighted 2010-2011 ForgeRock AS 031 */ 032 033package com.iplanet.services.ldap.event; 034 035import java.security.AccessController; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.Date; 039import java.util.HashMap; 040import java.util.HashSet; 041import java.util.Hashtable; 042import java.util.Iterator; 043import java.util.Map; 044import java.util.Set; 045import java.util.StringTokenizer; 046 047import com.sun.identity.shared.ldap.LDAPConnection; 048import com.sun.identity.shared.ldap.LDAPControl; 049import com.sun.identity.shared.ldap.LDAPEntry; 050import com.sun.identity.shared.ldap.LDAPException; 051import com.sun.identity.shared.ldap.LDAPInterruptedException; 052import com.sun.identity.shared.ldap.LDAPMessage; 053import com.sun.identity.shared.ldap.LDAPResponse; 054import com.sun.identity.shared.ldap.LDAPSearchConstraints; 055import com.sun.identity.shared.ldap.LDAPSearchListener; 056import com.sun.identity.shared.ldap.LDAPSearchResult; 057import com.sun.identity.shared.ldap.LDAPSearchResultReference; 058import com.sun.identity.shared.ldap.controls.LDAPEntryChangeControl; 059import com.sun.identity.shared.ldap.controls.LDAPPersistSearchControl; 060 061import com.sun.identity.common.GeneralTaskRunnable; 062import com.sun.identity.common.SystemTimer; 063import com.sun.identity.shared.debug.Debug; 064import com.iplanet.am.util.SystemProperties; 065import com.iplanet.services.ldap.DSConfigMgr; 066import com.iplanet.services.ldap.LDAPServiceException; 067import com.iplanet.services.ldap.LDAPUser; 068import com.iplanet.services.util.I18n; 069import com.iplanet.sso.SSOException; 070import com.iplanet.sso.SSOToken; 071import com.iplanet.ums.IUMSConstants; 072import com.sun.identity.idm.IdConstants; 073import com.sun.identity.security.AdminTokenAction; 074import com.sun.identity.shared.Constants; 075import com.sun.identity.sm.ServiceSchema; 076import com.sun.identity.sm.ServiceSchemaManager; 077import com.sun.identity.sm.SMSException; 078import com.sun.identity.sm.ServiceManager; 079import org.forgerock.util.thread.listener.ShutdownListener; 080import org.forgerock.util.thread.listener.ShutdownManager; 081 082/** 083 * Event Service monitors changes on the server. Implemented with the persistant 084 * search control. Uses ldapjdk asynchronous interfaces so that multiple search 085 * requests can be processed by a single thread 086 * 087 * The Type of changes that can be monitored are: - 088 * LDAPPersistSearchControl.ADD - 089 * LDAPPersistSearchControl.DELETE - LDAPPersistSearchControl.MODIFY - 090 * LDAPPersistSearchControl.MODDN 091 * 092 * A single connection is established initially and reused to service all 093 * notification requests. 094 * @supported.api 095 */ 096public class EventService implements Runnable { 097 098 protected static DSConfigMgr cm = null; 099 100 // list that holds notification requests 101 protected Map _requestList = null; 102 103 // Thread that listens to DS notifications 104 static Thread _monitorThread = null; 105 106 // search listener for asynch ldap searches 107 static LDAPSearchListener _msgQueue; 108 109 // A singelton patern 110 protected static EventService _instance = null; 111 112 // Don't want the server to return all the 113 // entries. return only the changes. 114 private static final boolean CHANGES_ONLY = true; 115 116 // Want the server to return the entry 117 // change control in the search result 118 private static final boolean RETURN_CONTROLS = true; 119 120 // Don't perform search if Persistent 121 // Search control is not supported. 122 private static final boolean IS_CRITICAL = true; 123 124 private static I18n i18n = I18n.getInstance(IUMSConstants.UMS_PKG); 125 126 protected static Debug debugger = Debug.getInstance("amEventService"); 127 128 // Parameters in AMConfig, that provide values for connection retries 129 protected static final String EVENT_CONNECTION_NUM_RETRIES = 130 "com.iplanet.am.event.connection.num.retries"; 131 132 protected static final String EVENT_CONNECTION_RETRY_INTERVAL = 133 "com.iplanet.am.event.connection.delay.between.retries"; 134 135 protected static final String EVENT_CONNECTION_ERROR_CODES = 136 "com.iplanet.am.event.connection.ldap.error.codes.retries"; 137 138 // Idle timeout in minutes 139 protected static final String EVENT_IDLE_TIMEOUT_INTERVAL = 140 "com.sun.am.event.connection.idle.timeout"; 141 142 protected static final String EVENT_LISTENER_DISABLE_LIST = 143 "com.sun.am.event.connection.disable.list"; 144 145 private static boolean _allDisabled = false; 146 147 private static int _numRetries = 3; 148 149 private static int _retryInterval = 3000; 150 151 private static int _retryMaxInterval = 720000; // 12 minutes 152 153 private static int _retryCount = 1; 154 155 private static long _lastResetTime = 0; 156 157 protected static HashSet _retryErrorCodes; 158 159 // Connection Time Out parameters 160 protected static int _idleTimeOut = 0; // Idle timeout in minutes. 161 162 protected static long _idleTimeOutMills; 163 164 // List of know listeners. The order of the listeners is important 165 // since it is used to enable & disable the listeners 166 private static final String[] ALL_LISTENERS = { 167 "com.iplanet.am.sdk.ldap.ACIEventListener", 168 "com.iplanet.am.sdk.ldap.EntryEventListener", 169 "com.sun.identity.sm.ldap.LDAPEventManager" 170 }; 171 172 protected static String[] listeners; 173 174 protected static Hashtable _ideListenersMap = new Hashtable(); 175 176 protected static volatile boolean _isThreadStarted = false; 177 178 protected static volatile boolean _shutdownCalled = false; 179 180 private static HashSet getPropertyRetryErrorCodes(String key) { 181 HashSet codes = new HashSet(); 182 String retryErrorStr = SystemProperties.get(key); 183 if (retryErrorStr != null && retryErrorStr.trim().length() > 0) { 184 StringTokenizer stz = new StringTokenizer(retryErrorStr, ","); 185 while (stz.hasMoreTokens()) { 186 codes.add(stz.nextToken().trim()); 187 } 188 } 189 return codes; 190 } 191 192 private static int getPropertyIntValue(String key, int defaultValue) { 193 int value = defaultValue; 194 String valueStr = SystemProperties.get(key); 195 if (valueStr != null && valueStr.trim().length() > 0) { 196 try { 197 value = Integer.parseInt(valueStr); 198 } catch (NumberFormatException e) { 199 value = defaultValue; 200 if (debugger.warningEnabled()) { 201 debugger.warning("EventService.getPropertyIntValue(): " 202 + "Invalid value for property: " 203 + EVENT_CONNECTION_NUM_RETRIES 204 + " Defaulting to value: " + defaultValue); 205 } 206 } 207 } 208 209 if (debugger.messageEnabled()) { 210 debugger.message("EventService.getPropertyIntValue(): " + key 211 + " = " + value); 212 } 213 return value; 214 } 215 216 /** 217 * Determine the listener list based on the diable list property 218 * and SMS DataStore notification property in Realm mode 219 */ 220 private static void getListenerList() { 221 String list = SystemProperties.get(EVENT_LISTENER_DISABLE_LIST, ""); 222 if (debugger.messageEnabled()) { 223 debugger.message("EventService.getListenerList(): " + 224 EVENT_LISTENER_DISABLE_LIST + ": " + list); 225 } 226 227 boolean enableDataStoreNotification = Boolean.parseBoolean( 228 SystemProperties.get(Constants.SMS_ENABLE_DB_NOTIFICATION)); 229 if (debugger.messageEnabled()) { 230 debugger.message("EventService.getListenerList(): " + 231 "com.sun.identity.sm.enableDataStoreNotification: " + 232 enableDataStoreNotification); 233 } 234 235 boolean configTime = Boolean.parseBoolean(SystemProperties.get( 236 Constants.SYS_PROPERTY_INSTALL_TIME)); 237 if (debugger.messageEnabled()) { 238 debugger.message("EventService.getListenerList(): " + 239 Constants.SYS_PROPERTY_INSTALL_TIME + ": " + configTime); 240 } 241 242 // Copy the default listeners 243 String[] tmpListeners = new String[ALL_LISTENERS.length]; 244 System.arraycopy(ALL_LISTENERS, 0, tmpListeners, 0, ALL_LISTENERS.length); 245 246 // Process the configured disabled list first 247 boolean disableACI = false, disableUM = false, disableSM = false; 248 if (list.length() != 0) { 249 StringTokenizer st = new StringTokenizer(list, ","); 250 String listener = ""; 251 while (st.hasMoreTokens()) { 252 listener = st.nextToken().trim(); 253 if (listener.equalsIgnoreCase("aci")) { 254 disableACI = true; 255 } else if (listener.equalsIgnoreCase("um")) { 256 disableUM = true; 257 } else if (listener.equalsIgnoreCase("sm")) { 258 disableSM = true; 259 } else { 260 debugger.error("EventService.getListenerList() - " + 261 "Invalid listener name: " + listener); 262 } 263 } 264 } 265 266 if (!disableUM || !disableACI) { 267 // Check if AMSDK is configured 268 boolean disableAMSDK = true; 269 if (!configTime) { 270 try { 271 ServiceSchemaManager scm = new ServiceSchemaManager( 272 getSSOToken(), IdConstants.REPO_SERVICE, "1.0"); 273 ServiceSchema idRepoSubSchema = scm.getOrganizationSchema(); 274 Set idRepoPlugins = idRepoSubSchema.getSubSchemaNames(); 275 if (idRepoPlugins.contains("amSDK")) { 276 disableAMSDK = false; 277 } 278 } catch (SMSException ex) { 279 if (debugger.warningEnabled()) { 280 debugger.warning("EventService.getListenerList() - " + 281 "Unable to obtain idrepo service", ex); 282 } 283 } catch (SSOException ex) { 284 // Should not happen, ignore the exception 285 } 286 } 287 if (disableAMSDK) { 288 disableUM = true; 289 disableACI = true; 290 if (debugger.messageEnabled()) { 291 debugger.message("EventService.getListener" + 292 "List(): AMSDK is not configured or config time. " + 293 "Disabling UM and ACI event listeners"); 294 } 295 } 296 } 297 298 // Verify if SMSnotification should be enabled 299 if (configTime || ServiceManager.isRealmEnabled()) { 300 disableSM = !enableDataStoreNotification; 301 if (debugger.messageEnabled()) { 302 debugger.message("EventService.getListenerList(): In realm " + 303 "mode or config time, SMS listener is set to datastore " + 304 "notification flag: " + enableDataStoreNotification); 305 } 306 } 307 308 // Disable the selected listeners 309 if (disableACI) { 310 tmpListeners[0] = null; 311 } 312 if (disableUM) { 313 tmpListeners[1] = null; 314 } 315 if (disableSM) { 316 tmpListeners[2] = null; 317 } 318 listeners = tmpListeners; 319 320 // if all disabled, signal to not start the thread 321 if (disableACI && disableUM && disableSM) { 322 if (debugger.messageEnabled()) { 323 debugger.message("EventService.getListenerList() - " + 324 "all listeners are disabled, EventService won't start"); 325 } 326 _allDisabled = true; 327 } else { 328 _allDisabled = false; 329 } 330 } 331 332 /** 333 * Private Constructor 334 */ 335 protected EventService() throws EventException { 336 getConfigManager(); 337 _requestList = Collections.synchronizedMap(new HashMap()); 338 } 339 340 /** 341 * create the singelton EventService object if it doesn't exist already. 342 * Check if directory server supports the Persistent Search Control and the 343 * Proxy Auth Control 344 * @supported.api 345 */ 346 public synchronized static EventService getEventService() 347 throws EventException, LDAPException { 348 349 if (_shutdownCalled) { 350 return null; 351 } 352 353 // Make sure only one instance of this class is created. 354 if (_instance == null) { 355 // Determine the Idle time out value for Event Service (LB/Firewall) 356 // scenarios. Value == 0 imples no idle timeout. 357 _idleTimeOut = getPropertyIntValue(EVENT_IDLE_TIMEOUT_INTERVAL, 358 _idleTimeOut); 359 _idleTimeOutMills = _idleTimeOut * 60000; 360 ShutdownManager shutdownMan = com.sun.identity.common.ShutdownManager.getInstance(); 361 362 if (_idleTimeOut == 0) { 363 _instance = new EventService(); 364 } else { 365 _instance = new EventServicePolling(); 366 } 367 shutdownMan.addShutdownListener(new 368 ShutdownListener() { 369 public void shutdown() { 370 if (_instance != null) { 371 _instance.finalize(); 372 } 373 } 374 }); 375 376 } 377 return _instance; 378 } 379 380 protected static String getName() { 381 return "EventService"; 382 } 383 384 /** 385 * At the end, close THE Event Manager's connections Abandon all previous 386 * persistent search requests 387 * @supported.api 388 */ 389 public void finalize() { 390 synchronized (this) { 391 _shutdownCalled = true; 392 if ((_monitorThread != null) && (_monitorThread.isAlive())) { 393 _monitorThread.interrupt(); 394 _isThreadStarted = false; 395 } 396 } 397 synchronized (_requestList) { 398 Collection requestObjs = _requestList.values(); 399 Iterator iter = requestObjs.iterator(); 400 while (iter.hasNext()) { 401 Request request = (Request) iter.next(); 402 removeListener(request); 403 } 404 _requestList.clear(); 405 } 406 } 407 408 /** 409 * Adds a listener to the directory. 410 * @supported.api 411 */ 412 protected synchronized String addListener(SSOToken token, 413 IDSEventListener listener, String base, int scope, String filter, 414 int operations) throws LDAPException, EventException { 415 416 if (_shutdownCalled) { 417 throw new EventException(i18n 418 .getString(IUMSConstants.DSCFG_CONNECTFAIL)); 419 } 420 421 LDAPConnection lc = null; 422 try { 423 // Check for SMS listener and use "sms" group if present 424 if ((listener.getClass().getName().equals( 425 "com.sun.identity.sm.ldap.LDAPEventManager")) && 426 (cm.getServerGroup("sms") != null)) { 427 lc = cm.getNewConnection("sms", LDAPUser.Type.AUTH_ADMIN); 428 429 } else { 430 lc = cm.getNewAdminConnection(); 431 } 432 } catch (LDAPServiceException le) { 433 throw new EventException(i18n 434 .getString(IUMSConstants.DSCFG_CONNECTFAIL), le); 435 } 436 437 LDAPSearchConstraints cons = lc.getSearchConstraints(); 438 439 // Create Persistent Search Control object 440 LDAPPersistSearchControl psearchCtrl = new LDAPPersistSearchControl( 441 operations, CHANGES_ONLY, RETURN_CONTROLS, IS_CRITICAL); 442 443 // Add LDAPControl array to the search constraint object 444 cons.setServerControls(psearchCtrl); 445 cons.setBatchSize(1); 446 447 // Listeners can not read attributes from the event. 448 // Request only javaClassName to be able to determine object type 449 String[] attrs = new String[] { "objectclass" }; 450 LDAPSearchListener searchListener = null; 451 // Set (asynchronous) persistent search request in the DS 452 try { 453 if (debugger.messageEnabled()) { 454 debugger.message("EventService.addListener() - Submiting " 455 + "Persistent Search on: " + base + " for listener: " 456 + listener); 457 } 458 searchListener = lc.search(base, scope, filter, attrs, false, 459 null, cons); 460 } catch (LDAPException le) { 461 if ((lc != null) && lc.isConnected()) { 462 try { 463 lc.disconnect(); 464 } catch (Exception ex) { 465 //ignored 466 } 467 } 468 debugger.error("EventService.addListener() - Failed to set " 469 + "Persistent Search" + le.getMessage()); 470 throw le; 471 } 472 473 int[] outstandingRequests = searchListener.getMessageIDs(); 474 int id = outstandingRequests[outstandingRequests.length - 1]; 475 476 String reqID = Integer.toString(id); 477 long startTime = System.currentTimeMillis(); 478 Request request = new Request(id, reqID, token, base, scope, filter, 479 attrs, operations, listener, lc, startTime); 480 _requestList.put(reqID, request); 481 482 // Add this search request to the m_msgQueue so it can be 483 // processed by the monitor thread 484 if (_msgQueue == null) { 485 _msgQueue = searchListener; 486 } else { 487 _msgQueue.merge(searchListener); 488 } 489 490 if (!_isThreadStarted) { 491 startMonitorThread(); 492 } else { 493 if (_requestList.size() == 1) { 494 notify(); 495 } 496 } 497 498 if (debugger.messageEnabled()) { 499 outstandingRequests = _msgQueue.getMessageIDs(); 500 debugger.message("EventService.addListener(): merged Listener: " 501 + " requestID: " + reqID + " & Request: " + request 502 + " on to message Queue. No. of current outstanding " 503 + "requests = " + outstandingRequests.length); 504 } 505 506 // Create new (EventService) Thread, if one doesn't exist. 507 return reqID; 508 } 509 510 public IDSEventListener getIDSListeners(String className) { 511 return (IDSEventListener) _ideListenersMap.get(className); 512 } 513 514 public static boolean isThreadStarted() { 515 return _isThreadStarted; 516 } 517 518 /** 519 * Main monitor thread loop. Wait for persistent search change notifications 520 * 521 * @supported.api 522 */ 523 public void run() { 524 try { 525 if (debugger.messageEnabled()) { 526 debugger.message("EventService.run(): Event Thread is running! " 527 + "No Idle timeout Set: " + _idleTimeOut + " minutes."); 528 } 529 530 boolean successState = true; 531 LDAPMessage message = null; 532 while (successState) { 533 try { 534 if (debugger.messageEnabled()) { 535 debugger.message("EventService.run(): Waiting for " 536 + "response"); 537 } 538 synchronized (this) { 539 if (_requestList.isEmpty()) { 540 wait(); 541 } 542 } 543 message = _msgQueue.getResponse(); 544 successState = processResponse(message); 545 } catch (LDAPInterruptedException ex) { 546 if (_shutdownCalled) { 547 break; 548 } else { 549 if (debugger.warningEnabled()) { 550 debugger.warning("EventService.run() " + 551 "LDAPInterruptedException received:", ex); 552 } 553 } 554 } catch (LDAPException ex) { 555 if (_shutdownCalled) { 556 break; 557 } else { 558 int resultCode = ex.getLDAPResultCode(); 559 if (debugger.warningEnabled()) { 560 debugger.warning("EventService.run() LDAPException " 561 + "received:", ex); 562 } 563 _retryErrorCodes = getPropertyRetryErrorCodes( 564 EVENT_CONNECTION_ERROR_CODES); 565 566 // Catch special error codition in 567 // LDAPSearchListener.getResponse 568 String msg = ex.getMessage(); 569 if ((resultCode == LDAPException.OTHER) && 570 (msg != null) && msg.equals("Invalid response")) { 571 // We should not try to resetError and retry 572 processNetworkError(ex); 573 } else { 574 if (_retryErrorCodes.contains("" + resultCode)) { 575 resetErrorSearches(true); 576 } else { // Some other network error 577 processNetworkError(ex); 578 } 579 } 580 } 581 } 582 } // end of while loop 583 } catch (InterruptedException ex) { 584 if (!_shutdownCalled) { 585 if (debugger.warningEnabled()) { 586 debugger.warning("EventService.run(): Interrupted exception" 587 + " caught.", ex); 588 } 589 } 590 } catch (RuntimeException ex) { 591 if (debugger.warningEnabled()) { 592 debugger.warning("EventService.run(): Runtime exception " 593 + "caught.", ex); 594 } 595 // rethrow the Runtime exception to let the container handle the 596 // exception. 597 throw ex; 598 } catch (Exception ex) { 599 if (debugger.warningEnabled()) { 600 debugger.warning("EventService.run(): Unknown exception " 601 + "caught.", ex); 602 } 603 // no need to rethrow. 604 } catch (Throwable t) { 605 // Catching Throwable to prevent the thread from exiting. 606 if (debugger.warningEnabled()) { 607 debugger.warning("EventService.run(): Unknown exception " 608 + "caught. Sleeping for a while.. ", t); 609 } 610 // rethrow the Error to let the container handle the error. 611 throw new Error(t); 612 } finally { 613 synchronized (this) { 614 if (!_shutdownCalled) { 615 // try to restart the monitor thread. 616 _monitorThread = null; 617 startMonitorThread(); 618 } 619 } 620 } 621 } // end of thread 622 623 private static synchronized void startMonitorThread() { 624 if (((_monitorThread == null) || !_monitorThread.isAlive()) && 625 !_shutdownCalled) { 626 // Even if the monitor thread is not alive, we should use the 627 // same instance of Event Service object (as it maintains all 628 // the listener information) 629 _monitorThread = new Thread(_instance, getName()); 630 _monitorThread.setDaemon(true); 631 _monitorThread.start(); 632 633 // Since this is a singleton class once a getEventService() 634 // is invoked the thread will be started and the variable 635 // will be set to true. This will help other components 636 // to avoid starting it once again if the thread has 637 // started. 638 _isThreadStarted = true; 639 } 640 } 641 642 protected boolean retryManager(boolean clearCaches) { 643 long now = System.currentTimeMillis(); 644 // reset _retryCount to 1 after 12 hours 645 if ((now - _lastResetTime) > 43200000) { 646 _retryCount = 1; 647 _lastResetTime = now; 648 } 649 650 int i = _retryCount * _retryInterval; 651 if (i > _retryMaxInterval) { 652 i = _retryMaxInterval; 653 } else { 654 _retryCount *= 2; 655 } 656 657 if (debugger.messageEnabled()) { 658 debugger.message("EventService.retryManager() - wait " + 659 (i / 1000) +" seconds before calling resetAllSearches"); 660 } 661 sleepRetryInterval(i); 662 return resetAllSearches(clearCaches); 663 } 664 665 /** 666 * Method which process the Response received from the DS. 667 * 668 * @param message - 669 * the LDAPMessage received as response 670 * @return true if the reset was successful. False Otherwise. 671 */ 672 protected boolean processResponse(LDAPMessage message) { 673 if ((message == null) && (!_requestList.isEmpty())) { 674 // Some problem with the message queue. We should 675 // try to reset it. 676 debugger.error("EventService.processResponse() - Received a NULL Response, call retryManager"); 677 return retryManager(false); 678 } 679 680 if (debugger.messageEnabled()) { 681 debugger.message("EventService.processResponse() - received " 682 + "DS message => " + message.toString()); 683 } 684 685 // To determine if the monitor thread needs to be stopped. 686 boolean successState = true; 687 688 Request request = getRequestEntry(message.getMessageID()); 689 690 // If no listeners, abandon this message id 691 if (request == null) { 692 // We do not have anything stored about this message id. 693 // So, just log a message and do nothing. 694 if (debugger.messageEnabled()) { 695 debugger.message("EventService.processResponse() - Received " 696 + "ldap message with unknown id = " 697 + message.getMessageID()); 698 } 699 } else if (message.getMessageType() == 700 LDAPMessage.LDAP_SEARCH_RESULT_MESSAGE) { 701 // then must be a LDAPSearchResult carrying change control 702 processSearchResultMessage((LDAPSearchResult) message, request); 703 request.setLastUpdatedTime(System.currentTimeMillis()); 704 } else if (message.getMessageType() == 705 LDAPMessage.LDAP_RESPONSE_MESSAGE) { 706 // Check for error message ... 707 LDAPResponse rsp = (LDAPResponse) message; 708 successState = processResponseMessage(rsp, request); 709 } else if (message.getMessageType() == 710 LDAPMessage.LDAP_SEARCH_RESULT_REFERENCE_MESSAGE) { // Referral 711 processSearchResultRef( 712 (LDAPSearchResultReference) message, request); 713 } 714 return successState; 715 } 716 717 /** 718 * removes the listener from the list of Persistent Search listeners of the 719 * asynchronous seach for the given search ID. 720 * 721 * @param request 722 * The request returned by the addListener 723 * @supported.api 724 */ 725 protected void removeListener(Request request) { 726 LDAPConnection connection = request.getLDAPConnection(); 727 if (connection != null) { 728 if (debugger.messageEnabled()) { 729 debugger.message("EventService.removeListener(): Removing " 730 + "listener requestID: " + request.getRequestID() 731 + " Listener: " + request.getListener()); 732 } 733 try { 734 if ((connection != null) && (connection.isConnected())) { 735 connection.abandon(request.getId()); 736 connection.disconnect(); 737 } 738 } catch (LDAPException le) { 739 // Might have to check the reset codes and try to reset 740 if (debugger.warningEnabled()) { 741 debugger.warning("EventService.removeListener(): " 742 + "LDAPException, when trying to remove listener", 743 le); 744 } 745 } 746 } 747 } 748 749 750 /** 751 * Reset error searches. Clear cache only if true is passed to argument 752 * 753 * @param clearCaches 754 */ 755 protected void resetErrorSearches(boolean clearCaches) { 756 757 Hashtable tmpReqList = new Hashtable(); 758 tmpReqList.putAll(_requestList); 759 760 int[] ids = _msgQueue.getMessageIDs(); 761 if (ids != null) { 762 for (int i = 0; i < ids.length; i++) { 763 String reqID = Integer.toString(ids[i]); 764 tmpReqList.remove(reqID); 765 } 766 } 767 Collection reqList = tmpReqList.values(); 768 for (Iterator iter = reqList.iterator(); iter.hasNext();) { 769 Request req = (Request) iter.next(); 770 _requestList.remove(req.getRequestID()); 771 } 772 _retryInterval = getPropertyIntValue(EVENT_CONNECTION_RETRY_INTERVAL, 773 _retryInterval); 774 RetryTask task = new RetryTask(tmpReqList); 775 task.clearCache(clearCaches); 776 SystemTimer.getTimer().schedule(task, new Date((( 777 System.currentTimeMillis() + _retryInterval) / 1000) * 1000)); 778 } 779 780 /** 781 * Reset all searches. Clear cache only if true is passed to argument 782 * 783 * @param clearCaches 784 * @return <code>true</code> if the reset was successful, otherwise <code>false</code> 785 */ 786 public synchronized boolean resetAllSearches(boolean clearCaches) { 787 if (_shutdownCalled) { 788 return false; 789 } 790 791 // Make a copy of the existing psearches 792 Hashtable tmpReqList = new Hashtable(); 793 tmpReqList.putAll(_requestList); 794 _requestList.clear(); // Will be updated in addListener method 795 Collection reqList = tmpReqList.values(); 796 797 // Clear the cache, if parameter is set 798 if (clearCaches && !reqList.isEmpty()) { 799 for (Iterator iter = reqList.iterator(); iter.hasNext();) { 800 Request req = (Request) iter.next(); 801 IDSEventListener el = req.getListener(); 802 el.allEntriesChanged(); 803 } 804 } 805 806 // Get the list of psearches to be enabled 807 getListenerList(); 808 if (_allDisabled) { 809 // All psearches are disabled, remove listeners if any and return 810 if (debugger.messageEnabled()) { 811 debugger.message("EventService.resetAllSearches(): " + 812 "All psearches have been disabled"); 813 } 814 if (!reqList.isEmpty()) { 815 for (Iterator iter = reqList.iterator(); iter.hasNext();) { 816 Request req = (Request) iter.next(); 817 removeListener(req); 818 if (debugger.messageEnabled()) { 819 debugger.message("EventService.resetAllSearches(): " + 820 "Psearch disabled: " + 821 req.getListener().getClass().getName()); 822 } 823 } 824 } 825 return true; 826 } 827 828 // Psearches are enabled, verify and reinitilize 829 // Maintain the listeners to reinitialized in tmpListenerList 830 Set tmpListenerList = new HashSet(); 831 Set newListenerList = new HashSet(); 832 for (int i = 0; i < listeners.length; i++) { 833 if (listeners[i] != null) { 834 // Check if the listener is present in reqList 835 boolean present = false; 836 for (Iterator iter = reqList.iterator(); iter.hasNext();) { 837 Request req = (Request) iter.next(); 838 IDSEventListener el = req.getListener(); 839 String listenerClass = el.getClass().getName(); 840 if (listenerClass.equals(listeners[i])) { 841 present = true; 842 iter.remove(); 843 tmpListenerList.add(req); 844 } 845 } 846 if (!present) { 847 // Add the listner object 848 if (debugger.messageEnabled()) { 849 debugger.message("EventService.resetAllSearches(): " + 850 "Psearch being added: " + listeners[i]); 851 } 852 newListenerList.add(listeners[i]); 853 } 854 } 855 } 856 // Remove the listeners not configured 857 if (!reqList.isEmpty()) { 858 for (Iterator iter = reqList.iterator(); iter.hasNext();) { 859 Request req = (Request) iter.next(); 860 removeListener(req); 861 if (debugger.messageEnabled()) { 862 debugger.message("EventService.resetAllSearches(): " + 863 "Psearch disabled due to configuration changes: " + 864 req.getListener().getClass().getName()); 865 } 866 } 867 } 868 // Reset the requested list 869 reqList = tmpListenerList; 870 871 // Determine the number of retry attempts in case of failure 872 // If retry property is set to -1, retries will be done infinitely 873 _numRetries = getPropertyIntValue(EVENT_CONNECTION_NUM_RETRIES, 874 _numRetries); 875 int retry = 1; 876 boolean doItAgain = ((_numRetries == -1) || ((_numRetries != 0) && 877 (retry <= _numRetries))) ? true : false; 878 while (doItAgain) { 879 if (debugger.messageEnabled()) { 880 String str = (_numRetries == -1) ? "indefinitely" : Integer 881 .toString(retry); 882 debugger.message("EventService.resetAllSearches(): " 883 + "retrying = " + str); 884 } 885 886 // Note: Avoid setting the messageQueue to null and just 887 // try to disconnect the connections. That way we can be sure 888 // that we have not lost any responses. 889 for (Iterator iter = reqList.iterator(); iter.hasNext();) { 890 try { 891 Request request = (Request) iter.next(); 892 893 // First add a new listener and then remove the old one 894 // that we do don't loose any responses to the message 895 // Queue. 896 addListener(request.getRequester(), request.getListener(), 897 request.getBaseDn(), request.getScope(), 898 request.getFilter(), request.getOperations()); 899 removeListener(request); 900 iter.remove(); 901 } catch (LDAPServiceException e) { 902 // Ignore exception and retry as we are in the process of 903 // re-establishing the searches. Notify Listeners after the 904 // attempt 905 if (retry == _numRetries) { 906 processNetworkError(e); 907 } 908 } catch (LDAPException le) { 909 // Ignore exception and retry as we are in the process of 910 // re-establishing the searches. Notify Listeners after the 911 // attempt 912 if (retry == _numRetries) { 913 processNetworkError(le); 914 } 915 } 916 } 917 918 // Check if new listeners need to be added 919 for (Iterator iter = newListenerList.iterator(); iter.hasNext();) { 920 String listnerClass = (String) iter.next(); 921 try { 922 Class thisClass = Class.forName(listnerClass); 923 IDSEventListener listener = (IDSEventListener) 924 thisClass.newInstance(); 925 _ideListenersMap.put(listnerClass, listener); 926 _instance.addListener(getSSOToken(), listener, 927 listener.getBase(), listener.getScope(), 928 listener.getFilter(), listener.getOperations()); 929 if (debugger.messageEnabled()) { 930 debugger.message("EventService.resetAllSearches() - " + 931 "successfully initialized: " + listnerClass); 932 } 933 iter.remove(); 934 } catch (Exception e) { 935 debugger.error("EventService.resetAllSearches() " + 936 "Unable to start listener " + listnerClass, e); 937 } 938 } 939 940 if (reqList.isEmpty() && newListenerList.isEmpty()) { 941 return true; 942 } else { 943 if (_numRetries != -1) { 944 doItAgain = (++retry <= _numRetries) ? true : false; 945 if (!doItAgain) { 946 // remove the requests fail to be resetted 947 // would try to reinitialized the next time 948 for (Iterator iter = reqList.iterator(); 949 iter.hasNext();) { 950 Request req = (Request) iter.next(); 951 removeListener(req); 952 debugger.error("EventService.resetAll" + 953 "Searches(): unable to restart: " + 954 req.getListener().getClass().getName()); 955 } 956 for (Iterator iter = newListenerList.iterator(); 957 iter.hasNext();) { 958 String req = (String) iter.next(); 959 debugger.error("EventService.resetAll" + 960 "Searches(): unable add listener: " + req); 961 } 962 } 963 } 964 } 965 if (doItAgain) { 966 // Sleep before retry 967 sleepRetryInterval(); 968 } 969 } // end while loop 970 return false; 971 } 972 973 protected void sleepRetryInterval() { 974 _retryInterval = getPropertyIntValue(EVENT_CONNECTION_RETRY_INTERVAL, 975 _retryInterval); 976 try { 977 Thread.sleep(_retryInterval); 978 } catch (InterruptedException ie) { 979 // ignore 980 } 981 } 982 983 protected void sleepRetryInterval(int interval) { 984 try { 985 Thread.sleep(interval); 986 } catch (InterruptedException ie) { // ignore 987 } 988 } 989 990 /** 991 * get a handle to the Directory Server Configuration Manager sets the value 992 */ 993 protected static void getConfigManager() throws EventException { 994 try { 995 cm = DSConfigMgr.getDSConfigMgr(); 996 } catch (LDAPServiceException lse) { 997 debugger.error("EventService.getConfigManager() - Failed to get " 998 + "handle to Configuration Manager", lse); 999 throw new EventException(i18n 1000 .getString(IUMSConstants.DSCFG_NOCFGMGR), lse); 1001 } 1002 } 1003 1004 private void dispatchException(Exception e, Request request) { 1005 IDSEventListener el = request.getListener(); 1006 debugger.error("EventService.dispatchException() - dispatching " 1007 + "exception to the listener: " + request.getRequestID() 1008 + " Listener: " + request.getListener(), e); 1009 el.eventError(e.toString()); 1010 } 1011 1012 /** 1013 * Dispatch naming event to all listeners 1014 */ 1015 private void dispatchEvent(DSEvent dirEvent, Request request) { 1016 IDSEventListener el = request.getListener(); 1017 el.entryChanged(dirEvent); 1018 } 1019 1020 /** 1021 * On network error, create ExceptionEvent and delever it to all listeners 1022 * on all events. 1023 */ 1024 protected void processNetworkError(Exception ex) { 1025 Hashtable tmpRequestList = new Hashtable(); 1026 tmpRequestList.putAll(_requestList); 1027 int[] ids = _msgQueue.getMessageIDs(); 1028 if (ids != null) { 1029 for (int i = 0; i < ids.length; i++) { 1030 tmpRequestList.remove(Integer.toString(ids[i])); 1031 } 1032 } 1033 Collection reqList = tmpRequestList.values(); 1034 for (Iterator iter = reqList.iterator(); iter.hasNext();) { 1035 Request request = (Request) iter.next(); 1036 dispatchException(ex, request); 1037 } 1038 } 1039 1040 /** 1041 * Response message carries a LDAP error. Response with the code 0 1042 * (SUCCESS), should never be received as persistent search never completes, 1043 * it has to be abandon. Referral messages are ignored 1044 */ 1045 protected boolean processResponseMessage(LDAPResponse rsp, 1046 Request request) { 1047 _retryErrorCodes = getPropertyRetryErrorCodes( 1048 EVENT_CONNECTION_ERROR_CODES); 1049 int resultCode = rsp.getResultCode(); 1050 if (_retryErrorCodes.contains("" + resultCode)) { 1051 if (debugger.messageEnabled()) { 1052 debugger.message("EventService.processResponseMessage() - " 1053 + "received LDAP Response for requestID: " 1054 + request.getRequestID() + " Listener: " 1055 + request.getListener() + "Need restarting"); 1056 } 1057 resetErrorSearches(false); 1058 } else if (resultCode != 0 1059 || resultCode != LDAPException.REFERRAL) { 1060 // If not neither of the cases then 1061 if (resultCode == LDAPException.BUSY) { 1062 debugger.error("EventService.processResponseMessage() - received error BUSY, call retryManager"); 1063 return retryManager(false); 1064 } 1065 LDAPException ex = new LDAPException("Error result", rsp 1066 .getResultCode(), rsp.getErrorMessage(), 1067 rsp.getMatchedDN()); 1068 dispatchException(ex, request); 1069 } 1070 return true; 1071 } 1072 1073 /** 1074 * Process change notification attached as the change control to the message 1075 */ 1076 protected void processSearchResultMessage(LDAPSearchResult res, 1077 Request req) { 1078 LDAPEntry modEntry = res.getEntry(); 1079 1080 if (debugger.messageEnabled()) { 1081 debugger.message("EventService.processSearchResultMessage() - " 1082 + "Changed " + modEntry.getDN()); 1083 } 1084 1085 /* Get any entry change controls. */ 1086 LDAPControl[] ctrls = res.getControls(); 1087 1088 // Can not create event without change control 1089 if (ctrls == null) { 1090 Exception ex = new Exception("EventService - Cannot create " 1091 + "NamingEvent, no change control info"); 1092 dispatchException(ex, req); 1093 } else { 1094 // Multiple controls might be in the message 1095 for (int i = 0; i < ctrls.length; i++) { 1096 LDAPEntryChangeControl changeCtrl = null; 1097 1098 if (ctrls[i].getType() == 1099 LDAPControl.LDAP_ENTRY_CHANGE_CONTROL) { 1100 changeCtrl = (LDAPEntryChangeControl) ctrls[i]; 1101 if (debugger.messageEnabled()) { 1102 debugger.message("EventService." 1103 + "processSearchResultMessage() changeCtrl = " 1104 + changeCtrl.toString()); 1105 } 1106 1107 // Can not create event without change control 1108 if (changeCtrl.getChangeType() == -1) { 1109 Exception ex = new Exception("EventService - Cannot " 1110 + "create NamingEvent, no change control info"); 1111 dispatchException(ex, req); 1112 } 1113 1114 // Convert control into a DSEvent and dispatch to listeners 1115 try { 1116 DSEvent event = createDSEvent( 1117 modEntry, changeCtrl, req); 1118 dispatchEvent(event, req); 1119 } catch (Exception ex) { 1120 dispatchException(ex, req); 1121 } 1122 } 1123 } 1124 } 1125 } 1126 1127 /** 1128 * Search continuation messages are ignored. 1129 */ 1130 protected void processSearchResultRef(LDAPSearchResultReference ref, 1131 Request req) { 1132 // Do nothing, message ignored, do not dispatch ExceptionEvent 1133 if (debugger.messageEnabled()) { 1134 debugger.message("EventService.processSearchResultRef() - " 1135 + "Ignoring.."); 1136 } 1137 } 1138 1139 protected static SSOToken getSSOToken() throws SSOException { 1140 return ((SSOToken) AccessController.doPrivileged( 1141 AdminTokenAction.getInstance())); 1142 } 1143 1144 /** 1145 * Find event entry by message ID 1146 */ 1147 protected Request getRequestEntry(int id) { 1148 return (Request) _requestList.get(Integer.toString(id)); 1149 } 1150 1151 /** 1152 * Create naming event from a change control 1153 */ 1154 private DSEvent createDSEvent(LDAPEntry entry, 1155 LDAPEntryChangeControl changeCtrl, Request req) throws Exception { 1156 DSEvent dsEvent = new DSEvent(); 1157 1158 if (debugger.messageEnabled()) { 1159 debugger.message("EventService.createDSEvent() - Notifying event " 1160 + "to: " + req.getListener()); 1161 } 1162 1163 // Get the dn from the entry 1164 String dn = entry.getDN(); 1165 dsEvent.setID(dn); 1166 1167 // Get information on the type of change made 1168 int changeType = changeCtrl.getChangeType(); 1169 dsEvent.setEventType(changeType); 1170 1171 // Pass the search ID as the event's change info 1172 dsEvent.setSearchID(req.getRequestID()); 1173 1174 // set the object class name 1175 String className = entry.getAttribute("objectclass").toString(); 1176 dsEvent.setClassName(className); 1177 1178 return dsEvent; 1179 } 1180 1181 class RetryTask extends GeneralTaskRunnable { 1182 1183 private long runPeriod; 1184 private Map requests; 1185 private boolean clearCaches; 1186 private int numRetries; 1187 1188 public RetryTask(Map requests) { 1189 1190 this.runPeriod = getPropertyIntValue( 1191 EVENT_CONNECTION_RETRY_INTERVAL, EventService._retryInterval); 1192 this.requests = requests; 1193 this.numRetries = _numRetries; 1194 } 1195 1196 public void clearCache(boolean cc) { 1197 clearCaches = cc; 1198 } 1199 1200 public void run() { 1201 for (Iterator iter = requests.values().iterator(); 1202 iter.hasNext();) { 1203 Request req = (Request) iter.next(); 1204 try { 1205 // First add a new listener and then remove the old one 1206 // that we do don't loose any responses to the message 1207 // Queue. However before adding check if request list 1208 // already has this listener initialized 1209 if (!_requestList.containsValue(req)) { 1210 addListener(req.getRequester(), req.getListener(), 1211 req.getBaseDn(), req.getScope(), 1212 req.getFilter(), req.getOperations()); 1213 } 1214 removeListener(req); 1215 if (clearCaches) { 1216 // Send all entries changed notifications 1217 // only after successful establishment of psearch 1218 req.getListener().allEntriesChanged(); 1219 } 1220 iter.remove(); 1221 } catch (Exception e) { 1222 debugger.error("RetryTask", e); 1223 // Ignore exception and retry as we are in the process of 1224 // re-establishing the searches. Notify Listeners after the 1225 // attempt 1226 } 1227 } 1228 if (--numRetries == 0) { 1229 debugger.error("NumRetries " + numRetries); 1230 runPeriod = -1; 1231 } 1232 } 1233 1234 public long getRunPeriod() { 1235 return runPeriod; 1236 } 1237 1238 public boolean isEmpty() { 1239 return true; 1240 } 1241 1242 public boolean addElement(Object obj) { 1243 return false; 1244 } 1245 1246 public boolean removeElement(Object obj) { 1247 return false; 1248 } 1249 } 1250}
Copyright © 2010-2017, ForgeRock All Rights Reserved.