001/**
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003 *
004 * Copyright (c) 2005 Sun Microsystems Inc. All Rights Reserved
005 *
006 * The contents of this file are subject to the terms
007 * of the Common Development and Distribution License
008 * (the License). You may not use this file except in
009 * compliance with the License.
010 *
011 * You can obtain a copy of the License at
012 * https://opensso.dev.java.net/public/CDDLv1.0.html or
013 * opensso/legal/CDDLv1.0.txt
014 * See the License for the specific language governing
015 * permission and limitations under the License.
016 *
017 * When distributing Covered Code, include this CDDL
018 * Header Notice in each file and include the License file
019 * at opensso/legal/CDDLv1.0.txt.
020 * If applicable, add the following below the CDDL Header,
021 * with the fields enclosed by brackets [] replaced by
022 * your own identifying information:
023 * "Portions Copyrighted [year] [name of copyright owner]"
024 *
025 * $Id: EventService.java,v 1.19 2009/09/28 21:47:33 ww203982 Exp $
026 *
027 */
028
029/*
030 * Portions Copyrighted 2010-2011 ForgeRock AS
031 */
032
033package com.iplanet.services.ldap.event;
034
035import java.security.AccessController;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.Date;
039import java.util.HashMap;
040import java.util.HashSet;
041import java.util.Hashtable;
042import java.util.Iterator;
043import java.util.Map;
044import java.util.Set;
045import java.util.StringTokenizer;
046
047import com.sun.identity.shared.ldap.LDAPConnection;
048import com.sun.identity.shared.ldap.LDAPControl;
049import com.sun.identity.shared.ldap.LDAPEntry;
050import com.sun.identity.shared.ldap.LDAPException;
051import com.sun.identity.shared.ldap.LDAPInterruptedException;
052import com.sun.identity.shared.ldap.LDAPMessage;
053import com.sun.identity.shared.ldap.LDAPResponse;
054import com.sun.identity.shared.ldap.LDAPSearchConstraints;
055import com.sun.identity.shared.ldap.LDAPSearchListener;
056import com.sun.identity.shared.ldap.LDAPSearchResult;
057import com.sun.identity.shared.ldap.LDAPSearchResultReference;
058import com.sun.identity.shared.ldap.controls.LDAPEntryChangeControl;
059import com.sun.identity.shared.ldap.controls.LDAPPersistSearchControl;
060
061import com.sun.identity.common.GeneralTaskRunnable;
062import com.sun.identity.common.ShutdownListener;
063import com.sun.identity.common.ShutdownManager;
064import com.sun.identity.common.SystemTimer;
065import com.sun.identity.shared.debug.Debug;
066import com.iplanet.am.util.SystemProperties;
067import com.iplanet.services.ldap.DSConfigMgr;
068import com.iplanet.services.ldap.LDAPServiceException;
069import com.iplanet.services.ldap.LDAPUser;
070import com.iplanet.services.util.I18n;
071import com.iplanet.sso.SSOException;
072import com.iplanet.sso.SSOToken;
073import com.iplanet.ums.IUMSConstants;
074import com.sun.identity.idm.IdConstants;
075import com.sun.identity.security.AdminTokenAction;
076import com.sun.identity.shared.Constants;
077import com.sun.identity.sm.ServiceSchema;
078import com.sun.identity.sm.ServiceSchemaManager;
079import com.sun.identity.sm.SMSException;
080import com.sun.identity.sm.ServiceManager;
081
082/**
083 * Event Service monitors changes on the server. Implemented with the persistant
084 * search control. Uses ldapjdk asynchronous interfaces so that multiple search
085 * requests can be processed by a single thread
086 * 
087 * The Type of changes that can be monitored are: - 
088 * LDAPPersistSearchControl.ADD -
089 * LDAPPersistSearchControl.DELETE - LDAPPersistSearchControl.MODIFY -
090 * LDAPPersistSearchControl.MODDN
091 * 
092 * A single connection is established initially and reused to service all
093 * notification requests.
094 * @supported.api
095 */
096public class EventService implements Runnable {
097
098    protected static DSConfigMgr cm = null;
099
100    // list that holds notification requests
101    protected Map _requestList = null;
102
103    // Thread that listens to DS notifications
104    static Thread _monitorThread = null;
105
106    // search listener for asynch ldap searches
107    static LDAPSearchListener _msgQueue;
108
109    // A singelton patern
110    protected static EventService _instance = null;
111
112    // Don't want the server to return all the
113    // entries. return only the changes.
114    private static final boolean CHANGES_ONLY = true;
115
116    // Want the server to return the entry
117    // change control in the search result
118    private static final boolean RETURN_CONTROLS = true;
119
120    // Don't perform search if Persistent
121    // Search control is not supported.
122    private static final boolean IS_CRITICAL = true;
123
124    private static I18n i18n = I18n.getInstance(IUMSConstants.UMS_PKG);
125
126    protected static Debug debugger = Debug.getInstance("amEventService");
127
128    // Parameters in AMConfig, that provide values for connection retries
129    protected static final String EVENT_CONNECTION_NUM_RETRIES = 
130        "com.iplanet.am.event.connection.num.retries";
131
132    protected static final String EVENT_CONNECTION_RETRY_INTERVAL = 
133        "com.iplanet.am.event.connection.delay.between.retries";
134
135    protected static final String EVENT_CONNECTION_ERROR_CODES = 
136        "com.iplanet.am.event.connection.ldap.error.codes.retries";
137
138    // Idle timeout in minutes
139    protected static final String EVENT_IDLE_TIMEOUT_INTERVAL = 
140        "com.sun.am.event.connection.idle.timeout";
141    
142    protected static final String EVENT_LISTENER_DISABLE_LIST =
143        "com.sun.am.event.connection.disable.list";
144          
145    private static boolean _allDisabled = false;    
146
147    private static int _numRetries = 3;
148
149    private static int _retryInterval = 3000;
150
151    private static int _retryMaxInterval = 720000; // 12 minutes
152
153    private static int _retryCount = 1;
154
155    private static long _lastResetTime = 0;
156
157    protected static HashSet _retryErrorCodes;
158
159    // Connection Time Out parameters
160    protected static int _idleTimeOut = 0; // Idle timeout in minutes.
161
162    protected static long _idleTimeOutMills;
163    
164    // List of know listeners. The order of the listeners is important
165    // since it is used to enable & disable the listeners
166    private static final String[] ALL_LISTENERS = {
167        "com.iplanet.am.sdk.ldap.ACIEventListener",
168        "com.iplanet.am.sdk.ldap.EntryEventListener",
169        "com.sun.identity.sm.ldap.LDAPEventManager"
170    };
171
172    protected static String[] listeners;
173
174    protected static Hashtable _ideListenersMap = new Hashtable();   
175    
176    protected static volatile boolean _isThreadStarted = false;
177    
178    protected static volatile boolean _shutdownCalled = false;
179
180    private static HashSet getPropertyRetryErrorCodes(String key) {
181        HashSet codes = new HashSet();
182        String retryErrorStr = SystemProperties.get(key);
183        if (retryErrorStr != null && retryErrorStr.trim().length() > 0) {
184            StringTokenizer stz = new StringTokenizer(retryErrorStr, ",");
185            while (stz.hasMoreTokens()) {
186                codes.add(stz.nextToken().trim());
187            }
188        }
189        return codes;
190    }
191
192    private static int getPropertyIntValue(String key, int defaultValue) {
193        int value = defaultValue;
194        String valueStr = SystemProperties.get(key);
195        if (valueStr != null && valueStr.trim().length() > 0) {
196            try {
197                value = Integer.parseInt(valueStr);
198            } catch (NumberFormatException e) {
199                value = defaultValue;
200                if (debugger.warningEnabled()) {
201                    debugger.warning("EventService.getPropertyIntValue(): "
202                            + "Invalid value for property: "
203                            + EVENT_CONNECTION_NUM_RETRIES
204                            + " Defaulting to value: " + defaultValue);
205                }
206            }
207        }
208
209        if (debugger.messageEnabled()) {
210            debugger.message("EventService.getPropertyIntValue(): " + key
211                    + " = " + value);
212        }
213        return value;
214    }
215    
216    /**
217     * Determine the listener list based on the diable list property
218     * and SMS DataStore notification property in Realm mode
219     */
220    private static void getListenerList() {
221        String list = SystemProperties.get(EVENT_LISTENER_DISABLE_LIST, "");
222        if (debugger.messageEnabled()) {
223            debugger.message("EventService.getListenerList(): " +
224                    EVENT_LISTENER_DISABLE_LIST + ": " + list);
225        }
226        
227        boolean enableDataStoreNotification = Boolean.parseBoolean(
228            SystemProperties.get(Constants.SMS_ENABLE_DB_NOTIFICATION));
229        if (debugger.messageEnabled()) {
230            debugger.message("EventService.getListenerList(): " +
231                "com.sun.identity.sm.enableDataStoreNotification: " +
232                enableDataStoreNotification);
233        }
234        
235        boolean configTime = Boolean.parseBoolean(SystemProperties.get(
236            Constants.SYS_PROPERTY_INSTALL_TIME));
237        if (debugger.messageEnabled()) {
238            debugger.message("EventService.getListenerList(): " +
239                Constants.SYS_PROPERTY_INSTALL_TIME + ": " + configTime);
240        }
241        
242        // Copy the default listeners
243        String[] tmpListeners = new String[ALL_LISTENERS.length];
244        System.arraycopy(ALL_LISTENERS, 0, tmpListeners, 0, ALL_LISTENERS.length);
245        
246        // Process the configured disabled list first
247        boolean disableACI = false, disableUM = false, disableSM = false;
248        if (list.length() != 0) {
249            StringTokenizer st = new StringTokenizer(list, ",");
250            String listener = "";
251            while (st.hasMoreTokens()) {
252                listener = st.nextToken().trim();
253                if (listener.equalsIgnoreCase("aci")) {
254                    disableACI = true;
255                } else if (listener.equalsIgnoreCase("um")) {
256                    disableUM = true;
257                } else if (listener.equalsIgnoreCase("sm")) {
258                    disableSM = true;
259                } else {
260                    debugger.error("EventService.getListenerList() - " +
261                        "Invalid listener name: " + listener);
262                }
263            }
264        }
265        
266        if (!disableUM || !disableACI) {
267            // Check if AMSDK is configured
268            boolean disableAMSDK = true;
269            if (!configTime) {
270                try {
271                    ServiceSchemaManager scm = new ServiceSchemaManager(
272                        getSSOToken(), IdConstants.REPO_SERVICE, "1.0");
273                    ServiceSchema idRepoSubSchema = scm.getOrganizationSchema();
274                    Set idRepoPlugins = idRepoSubSchema.getSubSchemaNames();
275                    if (idRepoPlugins.contains("amSDK")) {
276                        disableAMSDK = false;
277                    }
278                } catch (SMSException ex) {
279                    if (debugger.warningEnabled()) {
280                        debugger.warning("EventService.getListenerList() - " +
281                            "Unable to obtain idrepo service", ex);
282                    }
283                } catch (SSOException ex) {
284                    // Should not happen, ignore the exception
285                }
286            }
287            if (disableAMSDK) {
288                disableUM = true;
289                disableACI = true;
290                if (debugger.messageEnabled()) {
291                    debugger.message("EventService.getListener" +
292                        "List(): AMSDK is not configured or config time. " +
293                        "Disabling UM and ACI event listeners");
294                }
295            }
296        }
297        
298        // Verify if SMSnotification should be enabled
299        if (configTime || ServiceManager.isRealmEnabled()) {
300            disableSM = !enableDataStoreNotification;
301            if (debugger.messageEnabled()) {
302                debugger.message("EventService.getListenerList(): In realm " +
303                    "mode or config time, SMS listener is set to datastore " +
304                    "notification flag: " + enableDataStoreNotification);
305            }
306        }
307        
308        // Disable the selected listeners
309        if (disableACI) {
310            tmpListeners[0] = null;
311        }
312        if (disableUM) {
313            tmpListeners[1] = null;
314        }
315        if (disableSM) {
316            tmpListeners[2] = null;
317        }
318        listeners = tmpListeners;
319
320        // if all disabled, signal to not start the thread
321        if (disableACI && disableUM && disableSM) {
322            if (debugger.messageEnabled()) {
323                debugger.message("EventService.getListenerList() - " +
324                        "all listeners are disabled, EventService won't start");
325                }
326            _allDisabled = true;
327        } else {
328            _allDisabled = false;
329        }
330    }
331
332    /**
333     * Private Constructor
334     */
335    protected EventService() throws EventException {
336        getConfigManager();
337        _requestList = Collections.synchronizedMap(new HashMap());
338    }
339
340    /**
341     * create the singelton EventService object if it doesn't exist already.
342     * Check if directory server supports the Persistent Search Control and the
343     * Proxy Auth Control
344     * @supported.api
345     */
346    public synchronized static EventService getEventService()
347            throws EventException, LDAPException {
348        
349        if (_shutdownCalled) {
350            return null;
351        }
352        
353        // Make sure only one instance of this class is created.
354        if (_instance == null) {
355            // Determine the Idle time out value for Event Service (LB/Firewall)
356            // scenarios. Value == 0 imples no idle timeout.
357            _idleTimeOut = getPropertyIntValue(EVENT_IDLE_TIMEOUT_INTERVAL,
358                _idleTimeOut);
359            _idleTimeOutMills = _idleTimeOut * 60000;
360            ShutdownManager shutdownMan = ShutdownManager.getInstance();
361            if (shutdownMan.acquireValidLock()) {
362                try {
363                    if (_idleTimeOut == 0) {
364                        _instance = new EventService();
365                    } else {
366                        _instance = new EventServicePolling();
367                    }
368                    shutdownMan.addShutdownListener(new
369                        ShutdownListener() {
370                            public void shutdown() {
371                                if (_instance != null) {
372                                    _instance.finalize();
373                                }
374                            }
375                        });
376                } finally {
377                    shutdownMan.releaseLockAndNotify();
378                }
379            }
380        }
381        return _instance;
382    }
383    
384    protected static String getName() {
385        return "EventService";
386    }
387
388    /**
389     * At the end, close THE Event Manager's connections Abandon all previous
390     * persistent search requests
391     * @supported.api
392     */
393    public void finalize() {
394        synchronized (this) {
395            _shutdownCalled = true;
396            if ((_monitorThread != null) && (_monitorThread.isAlive())) {
397                _monitorThread.interrupt();
398                _isThreadStarted = false;
399            }
400        }
401        synchronized (_requestList) {
402            Collection requestObjs = _requestList.values();
403            Iterator iter = requestObjs.iterator();
404            while (iter.hasNext()) {
405                Request request = (Request) iter.next();
406                removeListener(request);
407            }
408            _requestList.clear();
409        }
410    }
411
412    /**
413     * Adds a listener to the directory.
414     * @supported.api
415     */
416    protected synchronized String addListener(SSOToken token,
417            IDSEventListener listener, String base, int scope, String filter,
418            int operations) throws LDAPException, EventException {
419
420        if (_shutdownCalled) {
421            throw new EventException(i18n
422                    .getString(IUMSConstants.DSCFG_CONNECTFAIL));
423        }
424        
425        LDAPConnection lc = null;
426        try {
427            // Check for SMS listener and use "sms" group if present
428            if ((listener.getClass().getName().equals(
429                "com.sun.identity.sm.ldap.LDAPEventManager")) &&
430                (cm.getServerGroup("sms") != null)) {
431                lc = cm.getNewConnection("sms", LDAPUser.Type.AUTH_ADMIN);
432
433            } else {
434                lc = cm.getNewAdminConnection();
435            }
436        } catch (LDAPServiceException le) {
437            throw new EventException(i18n
438                    .getString(IUMSConstants.DSCFG_CONNECTFAIL), le);
439        }
440
441        LDAPSearchConstraints cons = lc.getSearchConstraints();
442
443        // Create Persistent Search Control object
444        LDAPPersistSearchControl psearchCtrl = new LDAPPersistSearchControl(
445                operations, CHANGES_ONLY, RETURN_CONTROLS, IS_CRITICAL);
446
447        // Add LDAPControl array to the search constraint object
448        cons.setServerControls(psearchCtrl);
449        cons.setBatchSize(1);
450
451        // Listeners can not read attributes from the event.
452        // Request only javaClassName to be able to determine object type
453        String[] attrs = new String[] { "objectclass" };
454        LDAPSearchListener searchListener = null;
455        // Set (asynchronous) persistent search request in the DS
456        try {
457            if (debugger.messageEnabled()) {
458                debugger.message("EventService.addListener() - Submiting "
459                        + "Persistent Search on: " + base + " for listener: "
460                        + listener);
461            }
462            searchListener = lc.search(base, scope, filter, attrs, false,
463                    null, cons);
464        } catch (LDAPException le) {
465            if ((lc != null) && lc.isConnected()) {
466                try {
467                    lc.disconnect();
468                } catch (Exception ex) {
469                    //ignored
470                }
471            }
472            debugger.error("EventService.addListener() - Failed to set "
473                    + "Persistent Search" + le.getMessage());
474            throw le;
475        }
476
477        int[] outstandingRequests = searchListener.getMessageIDs();
478        int id = outstandingRequests[outstandingRequests.length - 1];
479
480        String reqID = Integer.toString(id);
481        long startTime = System.currentTimeMillis();
482        Request request = new Request(id, reqID, token, base, scope, filter,
483                attrs, operations, listener, lc, startTime);
484        _requestList.put(reqID, request);
485
486        // Add this search request to the m_msgQueue so it can be
487        // processed by the monitor thread
488        if (_msgQueue == null) {
489            _msgQueue = searchListener;
490        } else {
491            _msgQueue.merge(searchListener);
492        }
493
494        if (!_isThreadStarted) {
495            startMonitorThread();
496        } else {
497            if (_requestList.size() == 1) {
498                notify();
499            }
500        }
501        
502        if (debugger.messageEnabled()) {
503            outstandingRequests = _msgQueue.getMessageIDs();
504            debugger.message("EventService.addListener(): merged Listener: "
505                    + " requestID: " + reqID + " & Request: " + request
506                    + " on to message Queue. No. of current outstanding "
507                    + "requests = " + outstandingRequests.length);
508        }
509
510        // Create new (EventService) Thread, if one doesn't exist.
511        return reqID;
512    }
513
514    public IDSEventListener getIDSListeners(String className) {
515        return (IDSEventListener) _ideListenersMap.get(className);
516    }
517    
518    public static boolean isThreadStarted() {
519        return _isThreadStarted;
520    }
521      
522    /**
523     * Main monitor thread loop. Wait for persistent search change notifications
524     *
525     * @supported.api
526     */    
527    public void run() {
528        try {
529            if (debugger.messageEnabled()) {
530                debugger.message("EventService.run(): Event Thread is running! "
531                        + "No Idle timeout Set: " + _idleTimeOut + " minutes.");
532            }
533            
534            boolean successState = true;
535            LDAPMessage message = null;
536            while (successState) {
537                try {
538                    if (debugger.messageEnabled()) {
539                        debugger.message("EventService.run(): Waiting for "
540                                + "response");
541                    }
542                    synchronized (this) {
543                        if (_requestList.isEmpty()) {
544                            wait();
545                        }
546                    }
547                    message = _msgQueue.getResponse();
548                    successState = processResponse(message);
549                } catch (LDAPInterruptedException ex) {
550                    if (_shutdownCalled) {
551                        break;
552                    } else {
553                        if (debugger.warningEnabled()) {
554                            debugger.warning("EventService.run() " +
555                                "LDAPInterruptedException received:", ex);
556                        }
557                    }
558                } catch (LDAPException ex) {
559                    if (_shutdownCalled) {                        
560                        break;
561                    } else {
562                        int resultCode = ex.getLDAPResultCode();
563                        if (debugger.warningEnabled()) {
564                            debugger.warning("EventService.run() LDAPException "
565                                + "received:", ex);
566                        }
567                        _retryErrorCodes = getPropertyRetryErrorCodes(
568                            EVENT_CONNECTION_ERROR_CODES);
569
570                        // Catch special error codition in
571                        // LDAPSearchListener.getResponse
572                        String msg = ex.getLDAPErrorMessage();
573                        if ((resultCode == LDAPException.OTHER) &&
574                            (msg != null) && msg.equals("Invalid response")) {
575                            // We should not try to resetError and retry
576                            processNetworkError(ex);
577                        } else {
578                            if (_retryErrorCodes.contains("" + resultCode)) {
579                                resetErrorSearches(true);
580                            } else { // Some other network error
581                                processNetworkError(ex);
582                            }
583                        }
584                    }
585                }
586            } // end of while loop
587        } catch (InterruptedException ex) {
588            if (!_shutdownCalled) {
589                if (debugger.warningEnabled()) {
590                    debugger.warning("EventService.run(): Interrupted exception"
591                        + " caught.", ex);
592                }
593            }
594        } catch (RuntimeException ex) {
595            if (debugger.warningEnabled()) {
596                debugger.warning("EventService.run(): Runtime exception "
597                    + "caught.", ex);
598            }
599            // rethrow the Runtime exception to let the container handle the
600            // exception.
601            throw ex;
602        } catch (Exception ex) {
603            if (debugger.warningEnabled()) {
604                debugger.warning("EventService.run(): Unknown exception "
605                    + "caught.", ex);
606            }
607            // no need to rethrow.
608        } catch (Throwable t) {
609            // Catching Throwable to prevent the thread from exiting.
610            if (debugger.warningEnabled()) {
611                debugger.warning("EventService.run(): Unknown exception "
612                    + "caught. Sleeping for a while.. ", t);
613            }
614            // rethrow the Error to let the container handle the error.
615            throw new Error(t);
616        } finally {
617            synchronized (this) {
618                if (!_shutdownCalled) {
619                    // try to restart the monitor thread.
620                    _monitorThread = null;
621                    startMonitorThread();
622                }
623            }
624        }
625    } // end of thread
626    
627    private static synchronized void startMonitorThread() {
628        if (((_monitorThread == null) || !_monitorThread.isAlive()) &&
629            !_shutdownCalled) {
630            // Even if the monitor thread is not alive, we should use the
631            // same instance of Event Service object (as it maintains all
632            // the listener information)
633            _monitorThread = new Thread(_instance, getName());
634            _monitorThread.setDaemon(true);
635            _monitorThread.start();
636            
637            // Since this is a singleton class once a getEventService() 
638            // is invoked the thread will be started and the variable 
639            // will be set to true. This will help other components 
640            // to avoid starting it once again if the thread has 
641            // started.
642            _isThreadStarted = true;            
643        }
644    }
645
646    protected boolean retryManager(boolean clearCaches) {
647        long now = System.currentTimeMillis();
648        // reset _retryCount to 1 after 12 hours
649        if ((now - _lastResetTime) > 43200000) {
650            _retryCount = 1;
651            _lastResetTime = now;
652        }
653
654        int i = _retryCount * _retryInterval;
655        if (i > _retryMaxInterval) {
656            i = _retryMaxInterval;
657        } else {
658            _retryCount *= 2;
659        }
660
661        if (debugger.messageEnabled()) {
662            debugger.message("EventService.retryManager() - wait " +
663                    (i / 1000) +" seconds before calling resetAllSearches");
664        }
665        sleepRetryInterval(i);
666        return resetAllSearches(clearCaches);
667    }
668
669    /**
670     * Method which process the Response received from the DS.
671     * 
672     * @param message -
673     *            the LDAPMessage received as response
674     * @return true if the reset was successful. False Otherwise.
675     */        
676    protected boolean processResponse(LDAPMessage message) {
677        if ((message == null) && (!_requestList.isEmpty())) {
678            // Some problem with the message queue. We should
679            // try to reset it.
680            debugger.error("EventService.processResponse() - Received a NULL Response, call retryManager");
681            return retryManager(false);
682        }
683        
684        if (debugger.messageEnabled()) {
685            debugger.message("EventService.processResponse() - received "
686                    + "DS message  => " + message.toString());
687        }
688
689        // To determine if the monitor thread needs to be stopped.
690        boolean successState = true;
691
692        Request request = getRequestEntry(message.getMessageID());
693
694        // If no listeners, abandon this message id
695        if (request == null) {
696            // We do not have anything stored about this message id.
697            // So, just log a message and do nothing.
698            if (debugger.messageEnabled()) {
699                debugger.message("EventService.processResponse() - Received "
700                        + "ldap message with unknown id = "
701                        + message.getMessageID());
702            }
703        } else if (message.getMessageType() ==
704            LDAPMessage.LDAP_SEARCH_RESULT_MESSAGE) {
705            // then must be a LDAPSearchResult carrying change control
706            processSearchResultMessage((LDAPSearchResult) message, request);
707            request.setLastUpdatedTime(System.currentTimeMillis());
708        } else if (message.getMessageType() ==
709            LDAPMessage.LDAP_RESPONSE_MESSAGE) {
710            // Check for error message ...
711            LDAPResponse rsp = (LDAPResponse) message;
712            successState = processResponseMessage(rsp, request);
713        } else if (message.getMessageType() ==
714            LDAPMessage.LDAP_SEARCH_RESULT_REFERENCE_MESSAGE) { // Referral
715            processSearchResultRef(
716                    (LDAPSearchResultReference) message, request);
717        }
718        return successState;
719    }
720
721    /**
722     * removes the listener from the list of Persistent Search listeners of the
723     * asynchronous seach for the given search ID.
724     * 
725     * @param request
726     *            The request returned by the addListener
727     * @supported.api
728     */   
729    protected void removeListener(Request request) {
730        LDAPConnection connection = request.getLDAPConnection();
731        if (connection != null) {
732            if (debugger.messageEnabled()) {
733                debugger.message("EventService.removeListener(): Removing "
734                        + "listener requestID: " + request.getRequestID()
735                        + " Listener: " + request.getListener());
736            }
737            try {
738                if ((connection != null) && (connection.isConnected())) {
739                    connection.abandon(request.getId());
740                    connection.disconnect();
741                }
742            } catch (LDAPException le) {
743                // Might have to check the reset codes and try to reset
744                if (debugger.warningEnabled()) {
745                    debugger.warning("EventService.removeListener(): "
746                            + "LDAPException, when trying to remove listener",
747                            le);
748                }
749            }
750        }
751    }
752
753    
754    /**
755     * Reset error searches. Clear cache only if true is passed to argument
756     * 
757     * @param clearCaches
758     */    
759    protected void resetErrorSearches(boolean clearCaches) {
760        
761        Hashtable tmpReqList = new Hashtable();
762        tmpReqList.putAll(_requestList);
763       
764        int[] ids = _msgQueue.getMessageIDs();
765        if (ids != null) {
766            for (int i = 0; i < ids.length; i++) {
767                String reqID = Integer.toString(ids[i]);
768                tmpReqList.remove(reqID);
769            }
770        }
771        Collection reqList = tmpReqList.values();
772        for (Iterator iter = reqList.iterator(); iter.hasNext();) {
773            Request req = (Request) iter.next();
774            _requestList.remove(req.getRequestID());
775        }
776        _retryInterval = getPropertyIntValue(EVENT_CONNECTION_RETRY_INTERVAL,
777            _retryInterval);
778        RetryTask task = new RetryTask(tmpReqList);
779        task.clearCache(clearCaches);
780        SystemTimer.getTimer().schedule(task, new Date(((
781            System.currentTimeMillis() + _retryInterval) / 1000) * 1000));
782    }
783    
784    /**
785     * Reset all searches. Clear cache only if true is passed to argument
786     * 
787     * @param clearCaches
788     * @return <code>true</code> if the reset was successful, otherwise <code>false</code>
789     */    
790    public synchronized boolean resetAllSearches(boolean clearCaches) {
791        if (_shutdownCalled) {
792            return false;
793        }
794        
795        // Make a copy of the existing psearches
796        Hashtable tmpReqList = new Hashtable();
797        tmpReqList.putAll(_requestList);
798        _requestList.clear(); // Will be updated in addListener method
799        Collection reqList = tmpReqList.values();
800        
801        // Clear the cache, if parameter is set
802        if (clearCaches && !reqList.isEmpty()) {
803            for (Iterator iter = reqList.iterator(); iter.hasNext();) {
804                Request req = (Request) iter.next();
805                IDSEventListener el = req.getListener();
806                el.allEntriesChanged();
807            }
808        }
809        
810        // Get the list of psearches to be enabled
811        getListenerList();
812        if (_allDisabled) {
813            // All psearches are disabled, remove listeners if any and return
814            if (debugger.messageEnabled()) {
815                debugger.message("EventService.resetAllSearches(): " +
816                    "All psearches have been disabled");
817            }
818            if (!reqList.isEmpty()) {
819                for (Iterator iter = reqList.iterator(); iter.hasNext();) {
820                    Request req = (Request) iter.next();
821                    removeListener(req);
822                    if (debugger.messageEnabled()) {
823                        debugger.message("EventService.resetAllSearches(): " +
824                            "Psearch disabled: " +
825                            req.getListener().getClass().getName());
826                    }
827                }
828            }
829            return true;
830        }
831        
832        // Psearches are enabled, verify and reinitilize
833        // Maintain the listeners to reinitialized in tmpListenerList
834        Set tmpListenerList = new HashSet();
835        Set newListenerList = new HashSet();
836        for (int i = 0; i < listeners.length; i++) {
837            if (listeners[i] != null) {
838                // Check if the listener is present in reqList
839                boolean present = false;
840                for (Iterator iter = reqList.iterator(); iter.hasNext();) {
841                    Request req = (Request) iter.next();
842                    IDSEventListener el = req.getListener();
843                    String listenerClass = el.getClass().getName();
844                    if (listenerClass.equals(listeners[i])) {
845                        present = true;
846                        iter.remove();
847                        tmpListenerList.add(req);
848                    }
849                }
850                if (!present) {
851                    // Add the listner object
852                    if (debugger.messageEnabled()) {
853                        debugger.message("EventService.resetAllSearches(): " +
854                            "Psearch being added: " + listeners[i]);
855                    }
856                    newListenerList.add(listeners[i]);
857                }
858            }
859        }
860        // Remove the listeners not configured
861        if (!reqList.isEmpty()) {
862            for (Iterator iter = reqList.iterator(); iter.hasNext();) {
863                Request req = (Request) iter.next();
864                removeListener(req);
865                if (debugger.messageEnabled()) {
866                    debugger.message("EventService.resetAllSearches(): " +
867                        "Psearch disabled due to configuration changes: " +
868                        req.getListener().getClass().getName());
869                }
870            }
871        }
872        // Reset the requested list
873        reqList = tmpListenerList;
874        
875        // Determine the number of retry attempts in case of failure
876        // If retry property is set to -1, retries will be done infinitely
877        _numRetries = getPropertyIntValue(EVENT_CONNECTION_NUM_RETRIES,
878            _numRetries);
879        int retry = 1;
880        boolean doItAgain = ((_numRetries == -1) || ((_numRetries != 0) &&
881            (retry <= _numRetries))) ? true : false;
882        while (doItAgain) {
883            if (debugger.messageEnabled()) {
884                String str = (_numRetries == -1) ? "indefinitely" : Integer
885                    .toString(retry);
886                debugger.message("EventService.resetAllSearches(): "
887                    + "retrying = " + str);
888            }
889
890            // Note: Avoid setting the messageQueue to null and just
891            // try to disconnect the connections. That way we can be sure
892            // that we have not lost any responses.
893            for (Iterator iter = reqList.iterator(); iter.hasNext();) {
894                try {
895                    Request request = (Request) iter.next();
896
897                    // First add a new listener and then remove the old one
898                    // that we do don't loose any responses to the message
899                    // Queue.
900                    addListener(request.getRequester(), request.getListener(),
901                        request.getBaseDn(), request.getScope(),
902                        request.getFilter(), request.getOperations());
903                    removeListener(request);
904                    iter.remove();
905                } catch (LDAPServiceException e) {
906                    // Ignore exception and retry as we are in the process of
907                    // re-establishing the searches. Notify Listeners after the
908                    // attempt
909                    if (retry == _numRetries) {
910                        processNetworkError(e);
911                    }
912                } catch (LDAPException le) {
913                    // Ignore exception and retry as we are in the process of
914                    // re-establishing the searches. Notify Listeners after the
915                    // attempt
916                    if (retry == _numRetries) {
917                        processNetworkError(le);
918                    }
919                }       
920            }
921            
922            // Check if new listeners need to be added
923            for (Iterator iter = newListenerList.iterator(); iter.hasNext();) {
924                String listnerClass = (String) iter.next();
925                try {
926                    Class thisClass = Class.forName(listnerClass);
927                    IDSEventListener listener = (IDSEventListener)
928                        thisClass.newInstance();
929                    _ideListenersMap.put(listnerClass, listener);
930                    _instance.addListener(getSSOToken(), listener,
931                        listener.getBase(), listener.getScope(),
932                        listener.getFilter(), listener.getOperations());
933                    if (debugger.messageEnabled()) {
934                        debugger.message("EventService.resetAllSearches() - " +
935                            "successfully initialized: " + listnerClass);
936                    }
937                    iter.remove();
938                } catch (Exception e) {
939                    debugger.error("EventService.resetAllSearches() " +
940                        "Unable to start listener " + listnerClass, e);
941                }
942            }
943            
944            if (reqList.isEmpty() && newListenerList.isEmpty()) {
945                return true;
946            } else {
947                if (_numRetries != -1) {
948                   doItAgain = (++retry <= _numRetries) ? true : false;
949                   if (!doItAgain) {
950                       // remove the requests fail to be resetted
951                       // would try to reinitialized the next time
952                       for (Iterator iter = reqList.iterator();
953                           iter.hasNext();) {
954                           Request req = (Request) iter.next();
955                           removeListener(req);
956                           debugger.error("EventService.resetAll" +
957                               "Searches(): unable to restart: " +
958                               req.getListener().getClass().getName());
959                       }
960                       for (Iterator iter = newListenerList.iterator();
961                           iter.hasNext();) {
962                           String req = (String) iter.next();
963                           debugger.error("EventService.resetAll" +
964                               "Searches(): unable add listener: " + req);
965                       }
966                   }
967                }
968            }
969            if (doItAgain) {
970                // Sleep before retry
971                sleepRetryInterval();
972            }
973        } // end while loop
974        return false;
975    }
976       
977    protected void sleepRetryInterval() {
978        _retryInterval = getPropertyIntValue(EVENT_CONNECTION_RETRY_INTERVAL,
979            _retryInterval);
980        try {
981            Thread.sleep(_retryInterval);
982        } catch (InterruptedException ie) {
983            // ignore
984        }
985    }
986
987    protected void sleepRetryInterval(int interval) {
988        try {
989            Thread.sleep(interval);
990        } catch (InterruptedException ie) { // ignore
991        }
992    }
993
994    /**
995     * get a handle to the Directory Server Configuration Manager sets the value
996     */    
997    protected static void getConfigManager() throws EventException {
998        try {
999            cm = DSConfigMgr.getDSConfigMgr();
1000        } catch (LDAPServiceException lse) {
1001            debugger.error("EventService.getConfigManager() - Failed to get "
1002                    + "handle to Configuration Manager", lse);
1003            throw new EventException(i18n
1004                    .getString(IUMSConstants.DSCFG_NOCFGMGR), lse);
1005        }
1006    }
1007    
1008    private void dispatchException(Exception e, Request request) {
1009        IDSEventListener el = request.getListener();
1010        debugger.error("EventService.dispatchException() - dispatching "
1011                + "exception to the listener: " + request.getRequestID()
1012                + " Listener: " + request.getListener(), e);
1013        el.eventError(e.toString());
1014    }
1015
1016    /**
1017     * Dispatch naming event to all listeners
1018     */    
1019    private void dispatchEvent(DSEvent dirEvent, Request request) {
1020        IDSEventListener el = request.getListener();
1021        el.entryChanged(dirEvent);
1022    }
1023
1024    /**
1025     * On network error, create ExceptionEvent and delever it to all listeners
1026     * on all events.
1027     */    
1028    protected void processNetworkError(Exception ex) {
1029        Hashtable tmpRequestList = new Hashtable();
1030        tmpRequestList.putAll(_requestList);
1031        int[] ids = _msgQueue.getMessageIDs();
1032        if (ids != null) {
1033            for (int i = 0; i < ids.length; i++) {
1034                tmpRequestList.remove(Integer.toString(ids[i]));
1035            }
1036        }
1037        Collection reqList = tmpRequestList.values();
1038        for (Iterator iter = reqList.iterator(); iter.hasNext();) {
1039            Request request = (Request) iter.next();
1040            dispatchException(ex, request);
1041        }
1042    }
1043
1044    /**
1045     * Response message carries a LDAP error. Response with the code 0
1046     * (SUCCESS), should never be received as persistent search never completes,
1047     * it has to be abandon. Referral messages are ignored
1048     */    
1049    protected boolean processResponseMessage(LDAPResponse rsp,
1050        Request request) {
1051        _retryErrorCodes = getPropertyRetryErrorCodes(
1052            EVENT_CONNECTION_ERROR_CODES);
1053        int resultCode = rsp.getResultCode();
1054        if (_retryErrorCodes.contains("" + resultCode)) {
1055            if (debugger.messageEnabled()) {
1056                debugger.message("EventService.processResponseMessage() - "
1057                        + "received LDAP Response for requestID: "
1058                        + request.getRequestID() + " Listener: "
1059                        + request.getListener() + "Need restarting");
1060            }
1061            resetErrorSearches(false);
1062        } else if (resultCode != 0
1063                || resultCode != LDAPException.REFERRAL) { 
1064            // If not neither of the cases then
1065            if (resultCode == LDAPException.BUSY) {
1066                debugger.error("EventService.processResponseMessage() - received error BUSY, call retryManager");
1067                return retryManager(false);
1068            }
1069            LDAPException ex = new LDAPException("Error result", rsp
1070                    .getResultCode(), rsp.getErrorMessage(), 
1071                    rsp.getMatchedDN());
1072            dispatchException(ex, request);
1073        }
1074        return true;
1075    }
1076
1077    /**
1078     * Process change notification attached as the change control to the message
1079     */    
1080    protected void processSearchResultMessage(LDAPSearchResult res,
1081            Request req) {
1082        LDAPEntry modEntry = res.getEntry();
1083
1084        if (debugger.messageEnabled()) {
1085            debugger.message("EventService.processSearchResultMessage() - "
1086                    + "Changed " + modEntry.getDN());
1087        }
1088
1089        /* Get any entry change controls. */
1090        LDAPControl[] ctrls = res.getControls();
1091
1092        // Can not create event without change control
1093        if (ctrls == null) {
1094            Exception ex = new Exception("EventService - Cannot create "
1095                    + "NamingEvent, no change control info");
1096            dispatchException(ex, req);
1097        } else {
1098            // Multiple controls might be in the message
1099            for (int i = 0; i < ctrls.length; i++) {
1100                LDAPEntryChangeControl changeCtrl = null;
1101
1102                if (ctrls[i].getType() ==
1103                    LDAPControl.LDAP_ENTRY_CHANGE_CONTROL) {
1104                    changeCtrl = (LDAPEntryChangeControl) ctrls[i];
1105                    if (debugger.messageEnabled()) {
1106                        debugger.message("EventService."
1107                                + "processSearchResultMessage() changeCtrl = "
1108                                + changeCtrl.toString());
1109                    }
1110
1111                    // Can not create event without change control
1112                    if (changeCtrl.getChangeType() == -1) {
1113                        Exception ex = new Exception("EventService - Cannot "
1114                                + "create NamingEvent, no change control info");
1115                        dispatchException(ex, req);
1116                    }
1117
1118                    // Convert control into a DSEvent and dispatch to listeners
1119                    try {
1120                        DSEvent event = createDSEvent(
1121                                            modEntry, changeCtrl, req);
1122                        dispatchEvent(event, req);
1123                    } catch (Exception ex) {
1124                        dispatchException(ex, req);
1125                    }
1126                }
1127            }
1128        }
1129    }
1130
1131    /**
1132     * Search continuation messages are ignored.
1133     */    
1134    protected void processSearchResultRef(LDAPSearchResultReference ref,
1135            Request req) {
1136        // Do nothing, message ignored, do not dispatch ExceptionEvent
1137        if (debugger.messageEnabled()) {
1138            debugger.message("EventService.processSearchResultRef() - "
1139                    + "Ignoring..");
1140        }
1141    }
1142    
1143    protected static SSOToken getSSOToken() throws SSOException {
1144        return ((SSOToken) AccessController.doPrivileged(
1145            AdminTokenAction.getInstance()));
1146    }
1147
1148    /**
1149     * Find event entry by message ID
1150     */    
1151    protected Request getRequestEntry(int id) {
1152        return (Request) _requestList.get(Integer.toString(id));
1153    }
1154
1155    /**
1156     * Create naming event from a change control
1157     */    
1158    private DSEvent createDSEvent(LDAPEntry entry,
1159            LDAPEntryChangeControl changeCtrl, Request req) throws Exception {
1160        DSEvent dsEvent = new DSEvent();
1161
1162        if (debugger.messageEnabled()) {
1163            debugger.message("EventService.createDSEvent() - Notifying event "
1164                    + "to: " + req.getListener());
1165        }
1166
1167        // Get the dn from the entry
1168        String dn = entry.getDN();
1169        dsEvent.setID(dn);
1170
1171        // Get information on the type of change made
1172        int changeType = changeCtrl.getChangeType();
1173        dsEvent.setEventType(changeType);
1174
1175        // Pass the search ID as the event's change info
1176        dsEvent.setSearchID(req.getRequestID());
1177
1178        // set the object class name
1179        String className = entry.getAttribute("objectclass").toString();
1180        dsEvent.setClassName(className);
1181
1182        return dsEvent;
1183    }
1184    
1185    class RetryTask extends GeneralTaskRunnable {
1186        
1187        private long runPeriod;
1188        private Map requests;
1189        private boolean clearCaches;
1190        private int numRetries;
1191        
1192        public RetryTask(Map requests) {
1193            
1194            this.runPeriod = getPropertyIntValue(
1195                EVENT_CONNECTION_RETRY_INTERVAL, EventService._retryInterval);
1196            this.requests = requests;
1197            this.numRetries = _numRetries;
1198        }
1199        
1200        public void clearCache(boolean cc) {
1201            clearCaches = cc;
1202        }
1203        
1204        public void run() {
1205            for (Iterator iter = requests.values().iterator();
1206                iter.hasNext();) {
1207                Request req = (Request) iter.next();
1208                try {
1209                    // First add a new listener and then remove the old one
1210                    // that we do don't loose any responses to the message
1211                    // Queue. However before adding check if request list
1212                    // already has this listener initialized
1213                    if (!_requestList.containsValue(req)) {
1214                        addListener(req.getRequester(), req.getListener(),
1215                            req.getBaseDn(), req.getScope(),
1216                            req.getFilter(), req.getOperations());
1217                    }
1218                    removeListener(req);
1219                    if (clearCaches) {
1220                        // Send all entries changed notifications
1221                        // only after successful establishment of psearch
1222                        req.getListener().allEntriesChanged();
1223                    }
1224                    iter.remove();
1225                } catch (Exception e) {
1226                    debugger.error("RetryTask", e);
1227                    // Ignore exception and retry as we are in the process of
1228                    // re-establishing the searches. Notify Listeners after the
1229                    // attempt
1230                }
1231            }
1232            if (--numRetries == 0) {
1233                debugger.error("NumRetries " + numRetries);
1234                runPeriod = -1;
1235            }
1236        }
1237        
1238        public long getRunPeriod() {
1239            return runPeriod;
1240        }
1241        
1242        public boolean isEmpty() {
1243            return true;
1244        }
1245        
1246        public boolean addElement(Object obj) {
1247            return false;
1248        }
1249        
1250        public boolean removeElement(Object obj) {
1251            return false;
1252        }
1253    }
1254}