001/**
002 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003 *
004 * Copyright (c) 2005 Sun Microsystems Inc. All Rights Reserved
005 *
006 * The contents of this file are subject to the terms
007 * of the Common Development and Distribution License
008 * (the License). You may not use this file except in
009 * compliance with the License.
010 *
011 * You can obtain a copy of the License at
012 * https://opensso.dev.java.net/public/CDDLv1.0.html or
013 * opensso/legal/CDDLv1.0.txt
014 * See the License for the specific language governing
015 * permission and limitations under the License.
016 *
017 * When distributing Covered Code, include this CDDL
018 * Header Notice in each file and include the License file
019 * at opensso/legal/CDDLv1.0.txt.
020 * If applicable, add the following below the CDDL Header,
021 * with the fields enclosed by brackets [] replaced by
022 * your own identifying information:
023 * "Portions Copyrighted [year] [name of copyright owner]"
024 *
025 * $Id: EventService.java,v 1.19 2009/09/28 21:47:33 ww203982 Exp $
026 *
027 */
028
029/*
030 * Portions Copyrighted 2010-2011 ForgeRock AS
031 */
032
033package com.iplanet.services.ldap.event;
034
035import java.security.AccessController;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.Date;
039import java.util.HashMap;
040import java.util.HashSet;
041import java.util.Hashtable;
042import java.util.Iterator;
043import java.util.Map;
044import java.util.Set;
045import java.util.StringTokenizer;
046
047import com.sun.identity.shared.ldap.LDAPConnection;
048import com.sun.identity.shared.ldap.LDAPControl;
049import com.sun.identity.shared.ldap.LDAPEntry;
050import com.sun.identity.shared.ldap.LDAPException;
051import com.sun.identity.shared.ldap.LDAPInterruptedException;
052import com.sun.identity.shared.ldap.LDAPMessage;
053import com.sun.identity.shared.ldap.LDAPResponse;
054import com.sun.identity.shared.ldap.LDAPSearchConstraints;
055import com.sun.identity.shared.ldap.LDAPSearchListener;
056import com.sun.identity.shared.ldap.LDAPSearchResult;
057import com.sun.identity.shared.ldap.LDAPSearchResultReference;
058import com.sun.identity.shared.ldap.controls.LDAPEntryChangeControl;
059import com.sun.identity.shared.ldap.controls.LDAPPersistSearchControl;
060
061import com.sun.identity.common.GeneralTaskRunnable;
062import com.sun.identity.common.SystemTimer;
063import com.sun.identity.shared.debug.Debug;
064import com.iplanet.am.util.SystemProperties;
065import com.iplanet.services.ldap.DSConfigMgr;
066import com.iplanet.services.ldap.LDAPServiceException;
067import com.iplanet.services.ldap.LDAPUser;
068import com.iplanet.services.util.I18n;
069import com.iplanet.sso.SSOException;
070import com.iplanet.sso.SSOToken;
071import com.iplanet.ums.IUMSConstants;
072import com.sun.identity.idm.IdConstants;
073import com.sun.identity.security.AdminTokenAction;
074import com.sun.identity.shared.Constants;
075import com.sun.identity.sm.ServiceSchema;
076import com.sun.identity.sm.ServiceSchemaManager;
077import com.sun.identity.sm.SMSException;
078import com.sun.identity.sm.ServiceManager;
079import org.forgerock.util.thread.listener.ShutdownListener;
080import org.forgerock.util.thread.listener.ShutdownManager;
081
082/**
083 * Event Service monitors changes on the server. Implemented with the persistent
084 * search control. Uses ldapjdk asynchronous interfaces so that multiple search
085 * requests can be processed by a single thread
086 * 
087 * The Type of changes that can be monitored are: - 
088 * LDAPPersistSearchControl.ADD -
089 * LDAPPersistSearchControl.DELETE - LDAPPersistSearchControl.MODIFY -
090 * LDAPPersistSearchControl.MODDN
091 * 
092 * A single connection is established initially and reused to service all
093 * notification requests.
094 * @supported.api
095 */
096public class EventService implements Runnable {
097
098    protected static DSConfigMgr cm = null;
099
100    // list that holds notification requests
101    protected Map _requestList = null;
102
103    // Thread that listens to DS notifications
104    static Thread _monitorThread = null;
105
106    // search listener for asynch ldap searches
107    static LDAPSearchListener _msgQueue;
108
109    // A singleton pattern
110    protected static EventService _instance = null;
111
112    // Don't want the server to return all the
113    // entries. return only the changes.
114    private static final boolean CHANGES_ONLY = true;
115
116    // Want the server to return the entry
117    // change control in the search result
118    private static final boolean RETURN_CONTROLS = true;
119
120    // Don't perform search if Persistent
121    // Search control is not supported.
122    private static final boolean IS_CRITICAL = true;
123
124    private static I18n i18n = I18n.getInstance(IUMSConstants.UMS_PKG);
125
126    protected static Debug debugger = Debug.getInstance("amEventService");
127
128    // Parameters in AMConfig, that provide values for connection retries
129    protected static final String EVENT_CONNECTION_NUM_RETRIES = 
130        "com.iplanet.am.event.connection.num.retries";
131
132    protected static final String EVENT_CONNECTION_RETRY_INTERVAL = 
133        "com.iplanet.am.event.connection.delay.between.retries";
134
135    protected static final String EVENT_CONNECTION_ERROR_CODES = 
136        "com.iplanet.am.event.connection.ldap.error.codes.retries";
137
138    // Idle timeout in minutes
139    protected static final String EVENT_IDLE_TIMEOUT_INTERVAL = 
140        "com.sun.am.event.connection.idle.timeout";
141    
142    protected static final String EVENT_LISTENER_DISABLE_LIST =
143        "com.sun.am.event.connection.disable.list";
144          
145    private static boolean _allDisabled = false;    
146
147    private static int _numRetries = 3;
148
149    private static int _retryInterval = 3000;
150
151    private static int _retryMaxInterval = 720000; // 12 minutes
152
153    private static int _retryCount = 1;
154
155    private static long _lastResetTime = 0;
156
157    protected static HashSet _retryErrorCodes;
158
159    // Connection Time Out parameters
160    protected static int _idleTimeOut = 0; // Idle timeout in minutes.
161
162    protected static long _idleTimeOutMills;
163    
164    // List of known listeners. The order of the listeners is important
165    // since it is used to enable & disable the listeners
166    private static final String[] ALL_LISTENERS = {
167        "com.iplanet.am.sdk.ldap.ACIEventListener",
168        "com.iplanet.am.sdk.ldap.EntryEventListener",
169        "com.sun.identity.sm.ldap.LDAPEventManager"
170    };
171
172    protected static String[] listeners;
173
174    protected static Hashtable _ideListenersMap = new Hashtable();   
175    
176    protected static volatile boolean _isThreadStarted = false;
177    
178    protected static volatile boolean _shutdownCalled = false;
179
180    private static HashSet getPropertyRetryErrorCodes(String key) {
181        HashSet codes = new HashSet();
182        String retryErrorStr = SystemProperties.get(key);
183        if (retryErrorStr != null && retryErrorStr.trim().length() > 0) {
184            StringTokenizer stz = new StringTokenizer(retryErrorStr, ",");
185            while (stz.hasMoreTokens()) {
186                codes.add(stz.nextToken().trim());
187            }
188        }
189        return codes;
190    }
191
192    private static int getPropertyIntValue(String key, int defaultValue) {
193        int value = defaultValue;
194        String valueStr = SystemProperties.get(key);
195        if (valueStr != null && valueStr.trim().length() > 0) {
196            try {
197                value = Integer.parseInt(valueStr);
198            } catch (NumberFormatException e) {
199                value = defaultValue;
200                if (debugger.warningEnabled()) {
201                    debugger.warning("EventService.getPropertyIntValue(): "
202                            + "Invalid value for property: "
203                            + EVENT_CONNECTION_NUM_RETRIES
204                            + " Defaulting to value: " + defaultValue);
205                }
206            }
207        }
208
209        if (debugger.messageEnabled()) {
210            debugger.message("EventService.getPropertyIntValue(): " + key
211                    + " = " + value);
212        }
213        return value;
214    }
215    
216    /**
217     * Determine the listener list based on the diable list property
218     * and SMS DataStore notification property in Realm mode
219     */
220    private static void getListenerList() {
221        String list = SystemProperties.get(EVENT_LISTENER_DISABLE_LIST, "");
222        if (debugger.messageEnabled()) {
223            debugger.message("EventService.getListenerList(): " +
224                    EVENT_LISTENER_DISABLE_LIST + ": " + list);
225        }
226        
227        boolean enableDataStoreNotification = Boolean.parseBoolean(
228            SystemProperties.get(Constants.SMS_ENABLE_DB_NOTIFICATION));
229        if (debugger.messageEnabled()) {
230            debugger.message("EventService.getListenerList(): " +
231                "com.sun.identity.sm.enableDataStoreNotification: " +
232                enableDataStoreNotification);
233        }
234        
235        boolean configTime = Boolean.parseBoolean(SystemProperties.get(
236            Constants.SYS_PROPERTY_INSTALL_TIME));
237        if (debugger.messageEnabled()) {
238            debugger.message("EventService.getListenerList(): " +
239                Constants.SYS_PROPERTY_INSTALL_TIME + ": " + configTime);
240        }
241        
242        // Copy the default listeners
243        String[] tmpListeners = new String[ALL_LISTENERS.length];
244        System.arraycopy(ALL_LISTENERS, 0, tmpListeners, 0, ALL_LISTENERS.length);
245        
246        // Process the configured disabled list first
247        boolean disableACI = false, disableUM = false, disableSM = false;
248        if (list.length() != 0) {
249            StringTokenizer st = new StringTokenizer(list, ",");
250            String listener = "";
251            while (st.hasMoreTokens()) {
252                listener = st.nextToken().trim();
253                if (listener.equalsIgnoreCase("aci")) {
254                    disableACI = true;
255                } else if (listener.equalsIgnoreCase("um")) {
256                    disableUM = true;
257                } else if (listener.equalsIgnoreCase("sm")) {
258                    disableSM = true;
259                } else {
260                    debugger.error("EventService.getListenerList() - " +
261                        "Invalid listener name: " + listener);
262                }
263            }
264        }
265        
266        if (!disableUM || !disableACI) {
267            // Check if AMSDK is configured
268            boolean disableAMSDK = true;
269            if (!configTime) {
270                try {
271                    ServiceSchemaManager scm = new ServiceSchemaManager(
272                        getSSOToken(), IdConstants.REPO_SERVICE, "1.0");
273                    ServiceSchema idRepoSubSchema = scm.getOrganizationSchema();
274                    Set idRepoPlugins = idRepoSubSchema.getSubSchemaNames();
275                    if (idRepoPlugins.contains("amSDK")) {
276                        disableAMSDK = false;
277                    }
278                } catch (SMSException ex) {
279                    if (debugger.warningEnabled()) {
280                        debugger.warning("EventService.getListenerList() - " +
281                            "Unable to obtain idrepo service", ex);
282                    }
283                } catch (SSOException ex) {
284                    // Should not happen, ignore the exception
285                }
286            }
287            if (disableAMSDK) {
288                disableUM = true;
289                disableACI = true;
290                if (debugger.messageEnabled()) {
291                    debugger.message("EventService.getListener" +
292                        "List(): AMSDK is not configured or config time. " +
293                        "Disabling UM and ACI event listeners");
294                }
295            }
296        }
297        
298        // Verify if SMSnotification should be enabled
299        if (configTime || ServiceManager.isRealmEnabled()) {
300            disableSM = !enableDataStoreNotification;
301            if (debugger.messageEnabled()) {
302                debugger.message("EventService.getListenerList(): In realm " +
303                    "mode or config time, SMS listener is set to datastore " +
304                    "notification flag: " + enableDataStoreNotification);
305            }
306        }
307        
308        // Disable the selected listeners
309        if (disableACI) {
310            tmpListeners[0] = null;
311        }
312        if (disableUM) {
313            tmpListeners[1] = null;
314        }
315        if (disableSM) {
316            tmpListeners[2] = null;
317        }
318        listeners = tmpListeners;
319
320        // if all disabled, signal to not start the thread
321        if (disableACI && disableUM && disableSM) {
322            if (debugger.messageEnabled()) {
323                debugger.message("EventService.getListenerList() - " +
324                        "all listeners are disabled, EventService won't start");
325                }
326            _allDisabled = true;
327        } else {
328            _allDisabled = false;
329        }
330    }
331
332    /**
333     * Private Constructor
334     */
335    protected EventService() throws EventException {
336        getConfigManager();
337        _requestList = Collections.synchronizedMap(new HashMap());
338    }
339
340    /**
341     * create the singelton EventService object if it doesn't exist already.
342     * Check if directory server supports the Persistent Search Control and the
343     * Proxy Auth Control
344     * @supported.api
345     */
346    public synchronized static EventService getEventService()
347            throws EventException, LDAPException {
348        
349        if (_shutdownCalled) {
350            return null;
351        }
352        
353        // Make sure only one instance of this class is created.
354        if (_instance == null) {
355            // Determine the Idle time out value for Event Service (LB/Firewall)
356            // scenarios. Value == 0 imples no idle timeout.
357            _idleTimeOut = getPropertyIntValue(EVENT_IDLE_TIMEOUT_INTERVAL,
358                _idleTimeOut);
359            _idleTimeOutMills = _idleTimeOut * 60000;
360            ShutdownManager shutdownMan = com.sun.identity.common.ShutdownManager.getInstance();
361
362            if (_idleTimeOut == 0) {
363                _instance = new EventService();
364            } else {
365                _instance = new EventServicePolling();
366            }
367            shutdownMan.addShutdownListener(new
368                ShutdownListener() {
369                    public void shutdown() {
370                        if (_instance != null) {
371                            _instance.finalize();
372                        }
373                    }
374                });
375
376        }
377        return _instance;
378    }
379    
380    protected static String getName() {
381        return "EventService";
382    }
383
384    /**
385     * At the end, close THE Event Manager's connections Abandon all previous
386     * persistent search requests
387     * @supported.api
388     */
389    public void finalize() {
390        synchronized (this) {
391            _shutdownCalled = true;
392            if ((_monitorThread != null) && (_monitorThread.isAlive())) {
393                _monitorThread.interrupt();
394                _isThreadStarted = false;
395            }
396        }
397        synchronized (_requestList) {
398            Collection requestObjs = _requestList.values();
399            Iterator iter = requestObjs.iterator();
400            while (iter.hasNext()) {
401                Request request = (Request) iter.next();
402                removeListener(request);
403            }
404            _requestList.clear();
405        }
406    }
407
408    /**
409     * Adds a listener to the directory.
410     * @supported.api
411     */
412    protected synchronized String addListener(SSOToken token,
413            IDSEventListener listener, String base, int scope, String filter,
414            int operations) throws LDAPException, EventException {
415
416        if (_shutdownCalled) {
417            throw new EventException(i18n
418                    .getString(IUMSConstants.DSCFG_CONNECTFAIL));
419        }
420        
421        LDAPConnection lc = null;
422        try {
423            // Check for SMS listener and use "sms" group if present
424            if ((listener.getClass().getName().equals(
425                "com.sun.identity.sm.ldap.LDAPEventManager")) &&
426                (cm.getServerGroup("sms") != null)) {
427                lc = cm.getNewConnection("sms", LDAPUser.Type.AUTH_ADMIN);
428
429            } else {
430                lc = cm.getNewAdminConnection();
431            }
432        } catch (LDAPServiceException le) {
433            throw new EventException(i18n
434                    .getString(IUMSConstants.DSCFG_CONNECTFAIL), le);
435        }
436
437        LDAPSearchConstraints cons = lc.getSearchConstraints();
438
439        // Create Persistent Search Control object
440        LDAPPersistSearchControl psearchCtrl = new LDAPPersistSearchControl(
441                operations, CHANGES_ONLY, RETURN_CONTROLS, IS_CRITICAL);
442
443        // Add LDAPControl array to the search constraint object
444        cons.setServerControls(psearchCtrl);
445        cons.setBatchSize(1);
446
447        // Listeners can not read attributes from the event.
448        // Request only javaClassName to be able to determine object type
449        String[] attrs = new String[] { "objectclass" };
450        LDAPSearchListener searchListener = null;
451        // Set (asynchronous) persistent search request in the DS
452        try {
453            if (debugger.messageEnabled()) {
454                debugger.message("EventService.addListener() - Submiting "
455                        + "Persistent Search on: " + base + " for listener: "
456                        + listener);
457            }
458            searchListener = lc.search(base, scope, filter, attrs, false,
459                    null, cons);
460        } catch (LDAPException le) {
461            if ((lc != null) && lc.isConnected()) {
462                try {
463                    lc.disconnect();
464                } catch (Exception ex) {
465                    //ignored
466                }
467            }
468            debugger.error("EventService.addListener() - Failed to set "
469                    + "Persistent Search" + le.getMessage());
470            throw le;
471        }
472
473        int[] outstandingRequests = searchListener.getMessageIDs();
474        int id = outstandingRequests[outstandingRequests.length - 1];
475
476        String reqID = Integer.toString(id);
477        long startTime = System.currentTimeMillis();
478        Request request = new Request(id, reqID, token, base, scope, filter,
479                attrs, operations, listener, lc, startTime);
480        _requestList.put(reqID, request);
481
482        // Add this search request to the m_msgQueue so it can be
483        // processed by the monitor thread
484        if (_msgQueue == null) {
485            _msgQueue = searchListener;
486        } else {
487            _msgQueue.merge(searchListener);
488        }
489
490        if (!_isThreadStarted) {
491            startMonitorThread();
492        } else {
493            if (_requestList.size() == 1) {
494                notify();
495            }
496        }
497        
498        if (debugger.messageEnabled()) {
499            outstandingRequests = _msgQueue.getMessageIDs();
500            debugger.message("EventService.addListener(): merged Listener: "
501                    + " requestID: " + reqID + " & Request: " + request
502                    + " on to message Queue. No. of current outstanding "
503                    + "requests = " + outstandingRequests.length);
504        }
505
506        // Create new (EventService) Thread, if one doesn't exist.
507        return reqID;
508    }
509
510    public IDSEventListener getIDSListeners(String className) {
511        return (IDSEventListener) _ideListenersMap.get(className);
512    }
513    
514    public static boolean isThreadStarted() {
515        return _isThreadStarted;
516    }
517      
518    /**
519     * Main monitor thread loop. Wait for persistent search change notifications
520     *
521     * @supported.api
522     */    
523    public void run() {
524        try {
525            if (debugger.messageEnabled()) {
526                debugger.message("EventService.run(): Event Thread is running! "
527                        + "No Idle timeout Set: " + _idleTimeOut + " minutes.");
528            }
529            
530            boolean successState = true;
531            LDAPMessage message = null;
532            while (successState) {
533                try {
534                    if (debugger.messageEnabled()) {
535                        debugger.message("EventService.run(): Waiting for "
536                                + "response");
537                    }
538                    synchronized (this) {
539                        if (_requestList.isEmpty()) {
540                            wait();
541                        }
542                    }
543                    message = _msgQueue.getResponse();
544                    successState = processResponse(message);
545                } catch (LDAPInterruptedException ex) {
546                    if (_shutdownCalled) {
547                        break;
548                    } else {
549                        if (debugger.warningEnabled()) {
550                            debugger.warning("EventService.run() " +
551                                "LDAPInterruptedException received:", ex);
552                        }
553                    }
554                } catch (LDAPException ex) {
555                    if (_shutdownCalled) {                        
556                        break;
557                    } else {
558                        int resultCode = ex.getLDAPResultCode();
559                        if (debugger.warningEnabled()) {
560                            debugger.warning("EventService.run() LDAPException "
561                                + "received:", ex);
562                        }
563                        _retryErrorCodes = getPropertyRetryErrorCodes(
564                            EVENT_CONNECTION_ERROR_CODES);
565
566                        // Catch special error codition in
567                        // LDAPSearchListener.getResponse
568                        String msg = ex.getMessage();
569                        if ((resultCode == LDAPException.OTHER) &&
570                            (msg != null) && msg.equals("Invalid response")) {
571                            // We should not try to resetError and retry
572                            processNetworkError(ex);
573                        } else {
574                            if (_retryErrorCodes.contains("" + resultCode)) {
575                                resetErrorSearches(true);
576                            } else { // Some other network error
577                                processNetworkError(ex);
578                            }
579                        }
580                    }
581                }
582            } // end of while loop
583        } catch (InterruptedException ex) {
584            if (!_shutdownCalled) {
585                if (debugger.warningEnabled()) {
586                    debugger.warning("EventService.run(): Interrupted exception"
587                        + " caught.", ex);
588                }
589            }
590        } catch (RuntimeException ex) {
591            if (debugger.warningEnabled()) {
592                debugger.warning("EventService.run(): Runtime exception "
593                    + "caught.", ex);
594            }
595            // rethrow the Runtime exception to let the container handle the
596            // exception.
597            throw ex;
598        } catch (Exception ex) {
599            if (debugger.warningEnabled()) {
600                debugger.warning("EventService.run(): Unknown exception "
601                    + "caught.", ex);
602            }
603            // no need to rethrow.
604        } catch (Throwable t) {
605            // Catching Throwable to prevent the thread from exiting.
606            if (debugger.warningEnabled()) {
607                debugger.warning("EventService.run(): Unknown exception "
608                    + "caught. Sleeping for a while.. ", t);
609            }
610            // rethrow the Error to let the container handle the error.
611            throw new Error(t);
612        } finally {
613            synchronized (this) {
614                if (!_shutdownCalled) {
615                    // try to restart the monitor thread.
616                    _monitorThread = null;
617                    startMonitorThread();
618                }
619            }
620        }
621    } // end of thread
622    
623    private static synchronized void startMonitorThread() {
624        if (((_monitorThread == null) || !_monitorThread.isAlive()) &&
625            !_shutdownCalled) {
626            // Even if the monitor thread is not alive, we should use the
627            // same instance of Event Service object (as it maintains all
628            // the listener information)
629            _monitorThread = new Thread(_instance, getName());
630            _monitorThread.setDaemon(true);
631            _monitorThread.start();
632            
633            // Since this is a singleton class once a getEventService() 
634            // is invoked the thread will be started and the variable 
635            // will be set to true. This will help other components 
636            // to avoid starting it once again if the thread has 
637            // started.
638            _isThreadStarted = true;            
639        }
640    }
641
642    protected boolean retryManager(boolean clearCaches) {
643        long now = System.currentTimeMillis();
644        // reset _retryCount to 1 after 12 hours
645        if ((now - _lastResetTime) > 43200000) {
646            _retryCount = 1;
647            _lastResetTime = now;
648        }
649
650        int i = _retryCount * _retryInterval;
651        if (i > _retryMaxInterval) {
652            i = _retryMaxInterval;
653        } else {
654            _retryCount *= 2;
655        }
656
657        if (debugger.messageEnabled()) {
658            debugger.message("EventService.retryManager() - wait " +
659                    (i / 1000) +" seconds before calling resetAllSearches");
660        }
661        sleepRetryInterval(i);
662        return resetAllSearches(clearCaches);
663    }
664
665    /**
666     * Method which process the Response received from the DS.
667     * 
668     * @param message -
669     *            the LDAPMessage received as response
670     * @return true if the reset was successful. False Otherwise.
671     */        
672    protected boolean processResponse(LDAPMessage message) {
673        if ((message == null) && (!_requestList.isEmpty())) {
674            // Some problem with the message queue. We should
675            // try to reset it.
676            debugger.error("EventService.processResponse() - Received a NULL Response, call retryManager");
677            return retryManager(false);
678        }
679        
680        if (debugger.messageEnabled()) {
681            debugger.message("EventService.processResponse() - received "
682                    + "DS message  => " + message.toString());
683        }
684
685        // To determine if the monitor thread needs to be stopped.
686        boolean successState = true;
687
688        Request request = getRequestEntry(message.getMessageID());
689
690        // If no listeners, abandon this message id
691        if (request == null) {
692            // We do not have anything stored about this message id.
693            // So, just log a message and do nothing.
694            if (debugger.messageEnabled()) {
695                debugger.message("EventService.processResponse() - Received "
696                        + "ldap message with unknown id = "
697                        + message.getMessageID());
698            }
699        } else if (message.getMessageType() ==
700            LDAPMessage.LDAP_SEARCH_RESULT_MESSAGE) {
701            // then must be a LDAPSearchResult carrying change control
702            processSearchResultMessage((LDAPSearchResult) message, request);
703            request.setLastUpdatedTime(System.currentTimeMillis());
704        } else if (message.getMessageType() ==
705            LDAPMessage.LDAP_RESPONSE_MESSAGE) {
706            // Check for error message ...
707            LDAPResponse rsp = (LDAPResponse) message;
708            successState = processResponseMessage(rsp, request);
709        } else if (message.getMessageType() ==
710            LDAPMessage.LDAP_SEARCH_RESULT_REFERENCE_MESSAGE) { // Referral
711            processSearchResultRef(
712                    (LDAPSearchResultReference) message, request);
713        }
714        return successState;
715    }
716
717    /**
718     * removes the listener from the list of Persistent Search listeners of the
719     * asynchronous seach for the given search ID.
720     * 
721     * @param request
722     *            The request returned by the addListener
723     * @supported.api
724     */   
725    protected void removeListener(Request request) {
726        LDAPConnection connection = request.getLDAPConnection();
727        if (connection != null) {
728            if (debugger.messageEnabled()) {
729                debugger.message("EventService.removeListener(): Removing "
730                        + "listener requestID: " + request.getRequestID()
731                        + " Listener: " + request.getListener());
732            }
733            try {
734                if ((connection != null) && (connection.isConnected())) {
735                    connection.abandon(request.getId());
736                    connection.disconnect();
737                }
738            } catch (LDAPException le) {
739                // Might have to check the reset codes and try to reset
740                if (debugger.warningEnabled()) {
741                    debugger.warning("EventService.removeListener(): "
742                            + "LDAPException, when trying to remove listener",
743                            le);
744                }
745            }
746        }
747    }
748
749    
750    /**
751     * Reset error searches. Clear cache only if true is passed to argument
752     * 
753     * @param clearCaches
754     */    
755    protected void resetErrorSearches(boolean clearCaches) {
756        
757        Hashtable tmpReqList = new Hashtable();
758        tmpReqList.putAll(_requestList);
759       
760        int[] ids = _msgQueue.getMessageIDs();
761        if (ids != null) {
762            for (int i = 0; i < ids.length; i++) {
763                String reqID = Integer.toString(ids[i]);
764                tmpReqList.remove(reqID);
765            }
766        }
767        Collection reqList = tmpReqList.values();
768        for (Iterator iter = reqList.iterator(); iter.hasNext();) {
769            Request req = (Request) iter.next();
770            _requestList.remove(req.getRequestID());
771        }
772        _retryInterval = getPropertyIntValue(EVENT_CONNECTION_RETRY_INTERVAL,
773            _retryInterval);
774        RetryTask task = new RetryTask(tmpReqList);
775        task.clearCache(clearCaches);
776        SystemTimer.getTimer().schedule(task, new Date(((
777            System.currentTimeMillis() + _retryInterval) / 1000) * 1000));
778    }
779    
780    /**
781     * Reset all searches. Clear cache only if true is passed to argument
782     * 
783     * @param clearCaches
784     * @return <code>true</code> if the reset was successful, otherwise <code>false</code>
785     */    
786    public synchronized boolean resetAllSearches(boolean clearCaches) {
787        if (_shutdownCalled) {
788            return false;
789        }
790        
791        // Make a copy of the existing psearches
792        Hashtable tmpReqList = new Hashtable();
793        tmpReqList.putAll(_requestList);
794        _requestList.clear(); // Will be updated in addListener method
795        Collection reqList = tmpReqList.values();
796        
797        // Clear the cache, if parameter is set
798        if (clearCaches && !reqList.isEmpty()) {
799            for (Iterator iter = reqList.iterator(); iter.hasNext();) {
800                Request req = (Request) iter.next();
801                IDSEventListener el = req.getListener();
802                el.allEntriesChanged();
803            }
804        }
805        
806        // Get the list of psearches to be enabled
807        getListenerList();
808        if (_allDisabled) {
809            // All psearches are disabled, remove listeners if any and return
810            if (debugger.messageEnabled()) {
811                debugger.message("EventService.resetAllSearches(): " +
812                    "All psearches have been disabled");
813            }
814            if (!reqList.isEmpty()) {
815                for (Iterator iter = reqList.iterator(); iter.hasNext();) {
816                    Request req = (Request) iter.next();
817                    removeListener(req);
818                    if (debugger.messageEnabled()) {
819                        debugger.message("EventService.resetAllSearches(): " +
820                            "Psearch disabled: " +
821                            req.getListener().getClass().getName());
822                    }
823                }
824            }
825            return true;
826        }
827        
828        // Psearches are enabled, verify and reinitilize
829        // Maintain the listeners to reinitialized in tmpListenerList
830        Set tmpListenerList = new HashSet();
831        Set newListenerList = new HashSet();
832        for (int i = 0; i < listeners.length; i++) {
833            if (listeners[i] != null) {
834                // Check if the listener is present in reqList
835                boolean present = false;
836                for (Iterator iter = reqList.iterator(); iter.hasNext();) {
837                    Request req = (Request) iter.next();
838                    IDSEventListener el = req.getListener();
839                    String listenerClass = el.getClass().getName();
840                    if (listenerClass.equals(listeners[i])) {
841                        present = true;
842                        iter.remove();
843                        tmpListenerList.add(req);
844                    }
845                }
846                if (!present) {
847                    // Add the listner object
848                    if (debugger.messageEnabled()) {
849                        debugger.message("EventService.resetAllSearches(): " +
850                            "Psearch being added: " + listeners[i]);
851                    }
852                    newListenerList.add(listeners[i]);
853                }
854            }
855        }
856        // Remove the listeners not configured
857        if (!reqList.isEmpty()) {
858            for (Iterator iter = reqList.iterator(); iter.hasNext();) {
859                Request req = (Request) iter.next();
860                removeListener(req);
861                if (debugger.messageEnabled()) {
862                    debugger.message("EventService.resetAllSearches(): " +
863                        "Psearch disabled due to configuration changes: " +
864                        req.getListener().getClass().getName());
865                }
866            }
867        }
868        // Reset the requested list
869        reqList = tmpListenerList;
870        
871        // Determine the number of retry attempts in case of failure
872        // If retry property is set to -1, retries will be done infinitely
873        _numRetries = getPropertyIntValue(EVENT_CONNECTION_NUM_RETRIES,
874            _numRetries);
875        int retry = 1;
876        boolean doItAgain = ((_numRetries == -1) || ((_numRetries != 0) &&
877            (retry <= _numRetries))) ? true : false;
878        while (doItAgain) {
879            if (debugger.messageEnabled()) {
880                String str = (_numRetries == -1) ? "indefinitely" : Integer
881                    .toString(retry);
882                debugger.message("EventService.resetAllSearches(): "
883                    + "retrying = " + str);
884            }
885
886            // Note: Avoid setting the messageQueue to null and just
887            // try to disconnect the connections. That way we can be sure
888            // that we have not lost any responses.
889            for (Iterator iter = reqList.iterator(); iter.hasNext();) {
890                try {
891                    Request request = (Request) iter.next();
892
893                    // First add a new listener and then remove the old one
894                    // that we do don't loose any responses to the message
895                    // Queue.
896                    addListener(request.getRequester(), request.getListener(),
897                        request.getBaseDn(), request.getScope(),
898                        request.getFilter(), request.getOperations());
899                    removeListener(request);
900                    iter.remove();
901                } catch (LDAPServiceException e) {
902                    // Ignore exception and retry as we are in the process of
903                    // re-establishing the searches. Notify Listeners after the
904                    // attempt
905                    if (retry == _numRetries) {
906                        processNetworkError(e);
907                    }
908                } catch (LDAPException le) {
909                    // Ignore exception and retry as we are in the process of
910                    // re-establishing the searches. Notify Listeners after the
911                    // attempt
912                    if (retry == _numRetries) {
913                        processNetworkError(le);
914                    }
915                }       
916            }
917            
918            // Check if new listeners need to be added
919            for (Iterator iter = newListenerList.iterator(); iter.hasNext();) {
920                String listnerClass = (String) iter.next();
921                try {
922                    Class thisClass = Class.forName(listnerClass);
923                    IDSEventListener listener = (IDSEventListener)
924                        thisClass.newInstance();
925                    _ideListenersMap.put(listnerClass, listener);
926                    _instance.addListener(getSSOToken(), listener,
927                        listener.getBase(), listener.getScope(),
928                        listener.getFilter(), listener.getOperations());
929                    if (debugger.messageEnabled()) {
930                        debugger.message("EventService.resetAllSearches() - " +
931                            "successfully initialized: " + listnerClass);
932                    }
933                    iter.remove();
934                } catch (Exception e) {
935                    debugger.error("EventService.resetAllSearches() " +
936                        "Unable to start listener " + listnerClass, e);
937                }
938            }
939            
940            if (reqList.isEmpty() && newListenerList.isEmpty()) {
941                return true;
942            } else {
943                if (_numRetries != -1) {
944                   doItAgain = (++retry <= _numRetries) ? true : false;
945                   if (!doItAgain) {
946                       // remove the requests fail to be resetted
947                       // would try to reinitialized the next time
948                       for (Iterator iter = reqList.iterator();
949                           iter.hasNext();) {
950                           Request req = (Request) iter.next();
951                           removeListener(req);
952                           debugger.error("EventService.resetAll" +
953                               "Searches(): unable to restart: " +
954                               req.getListener().getClass().getName());
955                       }
956                       for (Iterator iter = newListenerList.iterator();
957                           iter.hasNext();) {
958                           String req = (String) iter.next();
959                           debugger.error("EventService.resetAll" +
960                               "Searches(): unable add listener: " + req);
961                       }
962                   }
963                }
964            }
965            if (doItAgain) {
966                // Sleep before retry
967                sleepRetryInterval();
968            }
969        } // end while loop
970        return false;
971    }
972       
973    protected void sleepRetryInterval() {
974        _retryInterval = getPropertyIntValue(EVENT_CONNECTION_RETRY_INTERVAL,
975            _retryInterval);
976        try {
977            Thread.sleep(_retryInterval);
978        } catch (InterruptedException ie) {
979            // ignore
980        }
981    }
982
983    protected void sleepRetryInterval(int interval) {
984        try {
985            Thread.sleep(interval);
986        } catch (InterruptedException ie) { // ignore
987        }
988    }
989
990    /**
991     * get a handle to the Directory Server Configuration Manager sets the value
992     */    
993    protected static void getConfigManager() throws EventException {
994        try {
995            cm = DSConfigMgr.getDSConfigMgr();
996        } catch (LDAPServiceException lse) {
997            debugger.error("EventService.getConfigManager() - Failed to get "
998                    + "handle to Configuration Manager", lse);
999            throw new EventException(i18n
1000                    .getString(IUMSConstants.DSCFG_NOCFGMGR), lse);
1001        }
1002    }
1003    
1004    private void dispatchException(Exception e, Request request) {
1005        IDSEventListener el = request.getListener();
1006        debugger.error("EventService.dispatchException() - dispatching "
1007                + "exception to the listener: " + request.getRequestID()
1008                + " Listener: " + request.getListener(), e);
1009        el.eventError(e.toString());
1010    }
1011
1012    /**
1013     * Dispatch naming event to all listeners
1014     */    
1015    private void dispatchEvent(DSEvent dirEvent, Request request) {
1016        IDSEventListener el = request.getListener();
1017        el.entryChanged(dirEvent);
1018    }
1019
1020    /**
1021     * On network error, create ExceptionEvent and delever it to all listeners
1022     * on all events.
1023     */    
1024    protected void processNetworkError(Exception ex) {
1025        Hashtable tmpRequestList = new Hashtable();
1026        tmpRequestList.putAll(_requestList);
1027        int[] ids = _msgQueue.getMessageIDs();
1028        if (ids != null) {
1029            for (int i = 0; i < ids.length; i++) {
1030                tmpRequestList.remove(Integer.toString(ids[i]));
1031            }
1032        }
1033        Collection reqList = tmpRequestList.values();
1034        for (Iterator iter = reqList.iterator(); iter.hasNext();) {
1035            Request request = (Request) iter.next();
1036            dispatchException(ex, request);
1037        }
1038    }
1039
1040    /**
1041     * Response message carries a LDAP error. Response with the code 0
1042     * (SUCCESS), should never be received as persistent search never completes,
1043     * it has to be abandon. Referral messages are ignored
1044     */    
1045    protected boolean processResponseMessage(LDAPResponse rsp,
1046        Request request) {
1047        _retryErrorCodes = getPropertyRetryErrorCodes(
1048            EVENT_CONNECTION_ERROR_CODES);
1049        int resultCode = rsp.getResultCode();
1050        if (_retryErrorCodes.contains("" + resultCode)) {
1051            if (debugger.messageEnabled()) {
1052                debugger.message("EventService.processResponseMessage() - "
1053                        + "received LDAP Response for requestID: "
1054                        + request.getRequestID() + " Listener: "
1055                        + request.getListener() + "Need restarting");
1056            }
1057            resetErrorSearches(false);
1058        } else if (resultCode != 0
1059                || resultCode != LDAPException.REFERRAL) { 
1060            // If not neither of the cases then
1061            if (resultCode == LDAPException.BUSY) {
1062                debugger.error("EventService.processResponseMessage() - received error BUSY, call retryManager");
1063                return retryManager(false);
1064            }
1065            LDAPException ex = new LDAPException("Error result", rsp
1066                    .getResultCode(), rsp.getErrorMessage(), 
1067                    rsp.getMatchedDN());
1068            dispatchException(ex, request);
1069        }
1070        return true;
1071    }
1072
1073    /**
1074     * Process change notification attached as the change control to the message
1075     */    
1076    protected void processSearchResultMessage(LDAPSearchResult res,
1077            Request req) {
1078        LDAPEntry modEntry = res.getEntry();
1079
1080        if (debugger.messageEnabled()) {
1081            debugger.message("EventService.processSearchResultMessage() - "
1082                    + "Changed " + modEntry.getDN());
1083        }
1084
1085        /* Get any entry change controls. */
1086        LDAPControl[] ctrls = res.getControls();
1087
1088        // Can not create event without change control
1089        if (ctrls == null) {
1090            Exception ex = new Exception("EventService - Cannot create "
1091                    + "NamingEvent, no change control info");
1092            dispatchException(ex, req);
1093        } else {
1094            // Multiple controls might be in the message
1095            for (int i = 0; i < ctrls.length; i++) {
1096                LDAPEntryChangeControl changeCtrl = null;
1097
1098                if (ctrls[i].getType() ==
1099                    LDAPControl.LDAP_ENTRY_CHANGE_CONTROL) {
1100                    changeCtrl = (LDAPEntryChangeControl) ctrls[i];
1101                    if (debugger.messageEnabled()) {
1102                        debugger.message("EventService."
1103                                + "processSearchResultMessage() changeCtrl = "
1104                                + changeCtrl.toString());
1105                    }
1106
1107                    // Can not create event without change control
1108                    if (changeCtrl.getChangeType() == -1) {
1109                        Exception ex = new Exception("EventService - Cannot "
1110                                + "create NamingEvent, no change control info");
1111                        dispatchException(ex, req);
1112                    }
1113
1114                    // Convert control into a DSEvent and dispatch to listeners
1115                    try {
1116                        DSEvent event = createDSEvent(
1117                                            modEntry, changeCtrl, req);
1118                        dispatchEvent(event, req);
1119                    } catch (Exception ex) {
1120                        dispatchException(ex, req);
1121                    }
1122                }
1123            }
1124        }
1125    }
1126
1127    /**
1128     * Search continuation messages are ignored.
1129     */    
1130    protected void processSearchResultRef(LDAPSearchResultReference ref,
1131            Request req) {
1132        // Do nothing, message ignored, do not dispatch ExceptionEvent
1133        if (debugger.messageEnabled()) {
1134            debugger.message("EventService.processSearchResultRef() - "
1135                    + "Ignoring..");
1136        }
1137    }
1138    
1139    protected static SSOToken getSSOToken() throws SSOException {
1140        return ((SSOToken) AccessController.doPrivileged(
1141            AdminTokenAction.getInstance()));
1142    }
1143
1144    /**
1145     * Find event entry by message ID
1146     */    
1147    protected Request getRequestEntry(int id) {
1148        return (Request) _requestList.get(Integer.toString(id));
1149    }
1150
1151    /**
1152     * Create naming event from a change control
1153     */    
1154    private DSEvent createDSEvent(LDAPEntry entry,
1155            LDAPEntryChangeControl changeCtrl, Request req) throws Exception {
1156        DSEvent dsEvent = new DSEvent();
1157
1158        if (debugger.messageEnabled()) {
1159            debugger.message("EventService.createDSEvent() - Notifying event "
1160                    + "to: " + req.getListener());
1161        }
1162
1163        // Get the dn from the entry
1164        String dn = entry.getDN();
1165        dsEvent.setID(dn);
1166
1167        // Get information on the type of change made
1168        int changeType = changeCtrl.getChangeType();
1169        dsEvent.setEventType(changeType);
1170
1171        // Pass the search ID as the event's change info
1172        dsEvent.setSearchID(req.getRequestID());
1173
1174        // set the object class name
1175        String className = entry.getAttribute("objectclass").toString();
1176        dsEvent.setClassName(className);
1177
1178        return dsEvent;
1179    }
1180    
1181    class RetryTask extends GeneralTaskRunnable {
1182        
1183        private long runPeriod;
1184        private Map requests;
1185        private boolean clearCaches;
1186        private int numRetries;
1187        
1188        public RetryTask(Map requests) {
1189            
1190            this.runPeriod = getPropertyIntValue(
1191                EVENT_CONNECTION_RETRY_INTERVAL, EventService._retryInterval);
1192            this.requests = requests;
1193            this.numRetries = _numRetries;
1194        }
1195        
1196        public void clearCache(boolean cc) {
1197            clearCaches = cc;
1198        }
1199        
1200        public void run() {
1201            for (Iterator iter = requests.values().iterator();
1202                iter.hasNext();) {
1203                Request req = (Request) iter.next();
1204                try {
1205                    // First add a new listener and then remove the old one
1206                    // that we do don't loose any responses to the message
1207                    // Queue. However before adding check if request list
1208                    // already has this listener initialized
1209                    if (!_requestList.containsValue(req)) {
1210                        addListener(req.getRequester(), req.getListener(),
1211                            req.getBaseDn(), req.getScope(),
1212                            req.getFilter(), req.getOperations());
1213                    }
1214                    removeListener(req);
1215                    if (clearCaches) {
1216                        // Send all entries changed notifications
1217                        // only after successful establishment of psearch
1218                        req.getListener().allEntriesChanged();
1219                    }
1220                    iter.remove();
1221                } catch (Exception e) {
1222                    debugger.error("RetryTask", e);
1223                    // Ignore exception and retry as we are in the process of
1224                    // re-establishing the searches. Notify Listeners after the
1225                    // attempt
1226                }
1227            }
1228            if (--numRetries == 0) {
1229                debugger.error("NumRetries " + numRetries);
1230                runPeriod = -1;
1231            }
1232        }
1233        
1234        public long getRunPeriod() {
1235            return runPeriod;
1236        }
1237        
1238        public boolean isEmpty() {
1239            return true;
1240        }
1241        
1242        public boolean addElement(Object obj) {
1243            return false;
1244        }
1245        
1246        public boolean removeElement(Object obj) {
1247            return false;
1248        }
1249    }
1250}




























































Copyright © 2010-2017, ForgeRock All Rights Reserved.