/* * JBoss, Home of Professional Open Source * Copyright 2010, JBoss Inc., and individual contributors as indicated * by the @authors tag. See the copyright.txt in the distribution for a * full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package au.com.infomedix.cpflogin.cluster; import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.transaction.Status; import javax.transaction.TransactionManager; import org.infinispan.Cache; import org.infinispan.config.Configuration; import org.infinispan.manager.CacheContainer; import org.infinispan.notifications.Listener; import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; import org.infinispan.notifications.cachelistener.annotation.TransactionCompleted; import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent; import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent; import org.infinispan.notifications.cachelistener.event.TransactionCompletedEvent; import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged; import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent; import org.infinispan.remoting.transport.Address; import org.infinispan.transaction.xa.GlobalTransaction; import org.jboss.ha.ispn.CacheContainerRegistry; import org.jboss.ha.ispn.DefaultCacheContainerRegistry; import org.jboss.logging.Logger; import org.jboss.util.threadpool.ThreadPool; import org.jboss.web.tomcat.service.sso.ispn.CredentialKey; import org.jboss.web.tomcat.service.sso.ispn.SSOKey; import org.jboss.web.tomcat.service.sso.ispn.SessionKey; import org.jboss.web.tomcat.service.sso.spi.FullyQualifiedSessionId; import org.jboss.web.tomcat.service.sso.spi.SSOCredentials; import org.jboss.web.tomcat.service.sso.spi.SSOLocalManager; /** * An implementation of SSOClusterManager that uses a Infinispan cache * to share SSO information between cluster nodes. * * @author Brian E. Stansberry * @author Scott Marlow * @version $Revision: 109002 $ $Date: 2007-01-12 03:39:24 +0100 (ven., 12 janv. 2007) $ */ @Listener public final class SSOClusterManager implements org.jboss.web.tomcat.service.sso.spi.SSOClusterManager { // ------------------------------------------------------------- Constants /** * Default global value for the threadPoolName property */ public static final String DEFAULT_THREAD_POOL_NAME = "jboss.system:service=ThreadPool"; // ------------------------------------------------------- Instance Fields /** * The Log-object for this class */ private static final Logger log = Logger.getLogger(SSOClusterManager.class); /** * SSO id which the thread is currently storing to the cache */ private volatile ThreadLocal beingLocallyAdded = new ThreadLocal(); /** * SSO id which a thread is currently removing from the cache */ private volatile ThreadLocal beingLocallyRemoved = new ThreadLocal(); /** * SSO id which the thread is deregistering due to removal on another node */ private volatile ThreadLocal beingRemotelyRemoved = new ThreadLocal(); /** * The clustered cache that holds the SSO credentials and the sessions. * The CacheKey will indicate which type it is (CacheKey.CREDENTIAL or CacheKey.SESSION); */ private volatile Cache cache = null; /** * Transaction Manager */ private volatile TransactionManager tm = null; private volatile String threadPoolName = DEFAULT_THREAD_POOL_NAME; private volatile ThreadPool threadPool; /** * The MBean server we use to access external components (TODO: convert to injection) */ private volatile MBeanServer server = null; /** * The SingleSignOn for which we are providing cluster support */ private volatile SSOLocalManager ssoValve = null; /** * Whether we have been started */ private volatile boolean started = false; /** * Whether we have logged an error due to not having a valid cache */ private volatile boolean missingCacheErrorLogged = false; /** * Our node's address in the cluster. */ private volatile Object localAddress = null; /** * The new members of the last view passed to viewChange() */ private final Set currentView = new HashSet(); /** Mutex lock to ensure only one view change at a time is being processed */ private final Object cleanupMutex = new Object(); private final CacheContainerRegistry registry; // ----------------------------------------------------- Incoming event queue private class TxEventQueue { private ConcurrentMap>> map = new ConcurrentHashMap>>(); public void offer(CacheEntryModifiedEvent event) { final GlobalTransaction txn = event.getGlobalTransaction(); if(txn==null) { // no transaction apparent, so just do it now. cacheEntryModified(event); } else { // queue the event for when the transaction commits Queue> queue = getQueue(txn); queue.offer(event); } } private Queue> getQueue(GlobalTransaction transaction) { Queue> queue = map.get(transaction); if(queue == null) { queue = new ConcurrentLinkedQueue>(); map.putIfAbsent(transaction, queue); } return queue; } public Queue> takeAll(GlobalTransaction transaction) { return map.remove(transaction); } } private TxEventQueue events = new TxEventQueue(); @CacheEntryModified public void handle(CacheEntryModifiedEvent event) { if(event.isPre() || event.isOriginLocal()) { return; } events.offer(event); } @TransactionCompleted public void handleTx(TransactionCompletedEvent event) { Queue> completed = events.takeAll(event.getGlobalTransaction()); if((completed != null) && event.isTransactionSuccessful()) { log.trace("Comitted events = " + completed); for(CacheEntryModifiedEvent ev : completed) { cacheEntryModified(ev); } } } // ----------------------------------------------------- Constructor public SSOClusterManager() { this(DefaultCacheContainerRegistry.getInstance()); } /** * This constructor is for unit testing * @param atomicMapFactory */ public SSOClusterManager(CacheContainerRegistry registry) { this.registry = registry; } // ----------------------------------------------------- SSOClusterManager // ------------------------------------------------------------ Properties public String getThreadPoolName() { return threadPoolName; } public boolean isUsingThreadPool() { return threadPool != null; } /** * Notify the cluster of the addition of a Session to an SSO session. * * @param ssoId the id of the SSO session * @param sessionId id of the Session that has been added */ @Override public void addSession(String ssoId, FullyQualifiedSessionId sessionId) { if((ssoId == null) || (sessionId == null)) { return; } if(!isCacheAvailable()) { return; } if(log.isTraceEnabled()) { log.trace("addSession(): adding Session " + sessionId.getSessionId() + " to cached session set for SSO " + ssoId); } boolean doTx = false; try { // Confirm we have a transaction manager; if not get it from Cache // failure to find will throw an IllegalStateException if(tm == null) { configureFromCache(); } // Don't do anything if there is already a transaction // context associated with this thread. if(tm.getTransaction() == null) { doTx = true; } if(doTx) { tm.begin(); } putSessionInCache(ssoId, sessionId); } catch(Exception e) { try { if(doTx) { tm.setRollbackOnly(); } } catch(Exception ignored) { } log.error("caught exception adding session " + sessionId.getSessionId() + " to SSO id " + ssoId, e); } finally { if(doTx) { endTransaction(); } } } /** * Gets the SingleSignOn valve for which this object is handling * cluster communications. * * @return the SingleSignOn valve. */ @Override public SSOLocalManager getSSOLocalManager() { return ssoValve; } /** * Sets the SingleSignOn valve for which this object is handling * cluster communications. *

NOTE: This method must be called before calls can be * made to the other methods of this interface. * * @param localManager a SingleSignOn valve. */ @Override public void setSSOLocalManager(SSOLocalManager localManager) { ssoValve = localManager; if(ssoValve != null) { if(server == null) { server = ssoValve.getMBeanServer(); } String poolName = ssoValve.getThreadPoolName(); if(poolName != null) { threadPoolName = poolName; } } } /** * Notifies the cluster that a single sign on session has been terminated * due to a user logout. * * @param ssoId */ @Override public void logout(String ssoId) { if(!isCacheAvailable()) { return; } // Check whether we are already handling this removal if(ssoId.equals(beingLocallyRemoved.get())) { return; } // Add this SSO to our list of in-process local removals so // this.nodeRemoved() will ignore the removal beingLocallyRemoved.set(ssoId); if(log.isTraceEnabled()) { log.trace("Registering logout of SSO " + ssoId + " in clustered cache"); } try { removeSSOFromCache(ssoId); } catch(Exception e) { log.error("Exception attempting to logout " + ssoId, e); } finally { beingLocallyRemoved.set(null); } } @Override public SSOCredentials lookup(String ssoId) { if(!isCacheAvailable()) { return null; } try { return getCredentialsFromCache(ssoId); } catch(Exception e) { log.error("caught exception looking up SSOCredentials for SSO id " + ssoId, e); return null; } } /** * Notifies the cluster of the creation of a new SSO entry. * * @param ssoId the id of the SSO session * @param authType the type of authenticator (BASIC, CLIENT-CERT, DIGEST * or FORM) used to authenticate the SSO. * @param username the username (if any) used for the authentication * @param password the password (if any) used for the authentication */ @Override public void register(String ssoId, String authType, String username, String password) { if(!isCacheAvailable()) { return; } if(log.isTraceEnabled()) { log.trace("Registering SSO " + ssoId + " in clustered cache"); } storeCredentials(ssoId, authType, username, password); } /** * Notify the cluster of the removal of a Session from an SSO session. * May be called from ssoValue.deregister(String ssoId, String session) * * @param ssoId the id of the SSO session * @param sessionId id of the Session that has been removed */ @Override public void removeSession(String ssoId, FullyQualifiedSessionId sessionId) { if((ssoId == null) || (sessionId == null)) { log.trace("removeSession is ignored as ssoID or sessionId is null " + "ssoId = " + ssoId + ", sessionId = " + sessionId); return; } if(!isCacheAvailable()) { log.trace("removeSession is ignored as cache is not available"); return; } // Check that this session removal is not due to our own deregistration // of an SSO following receipt of a nodeRemoved() call if(ssoId.equals(beingRemotelyRemoved.get())) { log.trace("removeSession is ignored for being remotely removed case "); return; } if(log.isTraceEnabled()) { log.trace("removeSession(): removing Session " + sessionId.getSessionId() + " from cached session set for SSO " + ssoId); } boolean doTx = false; try { // Confirm we have a transaction manager; if not get it from Cache // failure to find will throw an IllegalStateException if(tm == null) { configureFromCache(); } // Don't do anything if there is already a transaction // context associated with this thread. if(tm.getTransaction() == null) { if(log.isTraceEnabled()) { log.trace("removeSession(): no active transaction, will start a new transaction"); } doTx = true; } if(doTx) { tm.begin(); } // remove session removeSessionFromCache(ssoId, sessionId); } catch(Exception e) { try { if(doTx) { if(log.isTraceEnabled()) { log.trace("removeSession(): rollback transaction due to failure " + e.getMessage()); } tm.setRollbackOnly(); } } catch(Exception x) { } log.error("caught exception removing session " + sessionId.getSessionId() + " from SSO id " + ssoId, e); } finally { if(doTx) { endTransaction(); } } } /** * Notifies the cluster of an update of the security credentials * associated with an SSO session. * * @param ssoId the id of the SSO session * @param authType the type of authenticator (BASIC, CLIENT-CERT, DIGEST * or FORM) used to authenticate the SSO. * @param username the username (if any) used for the authentication * @param password the password (if any) used for the authentication */ @Override public void updateCredentials(String ssoId, String authType, String username, String password) { if(!isCacheAvailable()) { return; } if(log.isTraceEnabled()) { log.trace("Updating credentials for SSO " + ssoId + " in clustered cache"); } storeCredentials(ssoId, authType, username, password); } // ------------------------------------------------------ CacheListener /** * Extracts an SSO session id and uses it in an invocation of * {@link ClusteredSingleSignOn#deregister(String) ClusteredSingleSignOn.deregister(String)}. *

* Ignores invocations resulting from Cache changes originated by * this object. * * @param event */ @CacheEntryRemoved public void cacheEntryRemoved(CacheEntryRemovedEvent event) { if(event.isPre()) { return; } SSOKey key = event.getKey(); String ssoId = key.getId(); if((ssoId == null) || !(key instanceof SessionKey)) { if(log.isTraceEnabled()) { log.trace("cacheEntryRemoved ssoId =" + ssoId + ", key class=" + key.getClass().getName()); } return; } if(log.isTraceEnabled()) { log.trace("cacheEntryRemoved ssoId =" + ssoId); } // Entire SSO is being removed; i.e. an invalidation // Ignore messages generated by our own logout activity // TODO: can we check event.isOriginLocal instead of beingLocallyRemoved? if(!ssoId.equals(beingLocallyRemoved.get())) { deregisterSSO(ssoId); } // signal the case that we have zero sessions for this ssoId notifySSOEmpty(ssoId); } /** * If any nodes have been removed from the view, asynchronously scans * all SSOs looking for and removing sessions owned by the removed node. * Notifies the SSO valve if as a result any SSOs no longer have active * sessions. If the removed node is the one associated with this object, * does nothing. */ @ViewChanged public void viewChange(ViewChangedEvent event) { log.debug("Received ViewChangedEvent " + event); boolean masterNode = false; // true if we are the first node in the cluster group. Set oldMembers = new HashSet(event.getOldMembers()); synchronized(currentView) { currentView.clear(); currentView.addAll(event.getNewMembers()); // If we're not in the view, just exit if((localAddress == null) || !currentView.contains(localAddress)) { return; } // treat the first node in the view as the master node if(currentView.iterator().next().equals(localAddress)) { masterNode = true; } // Remove all the current members from the old set; any left // are the dead members oldMembers.removeAll(currentView); } /** * if we are the master node, clean up any dead sessions left behind from a node that left the cluster group */ if((oldMembers.size() > 0) && masterNode) { log.debug("Members have been removed; will launch cleanup task. Dead members: " + oldMembers); launchSSOCleaner(false); } } /** * Instantiates a DeadMemberCleaner and assigns a thread * to execute the cleanup task. * @param notifyIfEmpty TODO */ private void launchSSOCleaner(boolean notifyIfEmpty) { SSOCleanerTask cleaner = new SSOCleanerTask(notifyIfEmpty); if(threadPool != null) { threadPool.run(cleaner); } else { Thread t = new Thread(cleaner, "ClusteredSSOCleaner"); t.setDaemon(true); t.start(); } } /** * Handles the notification that an entire SSO has been removed remotely * * @param ssoId id of the removed SSO */ private void deregisterSSO(String ssoId) { beingRemotelyRemoved.set(ssoId); try { if(log.isTraceEnabled()) { log.trace("received a node removed message for SSO " + ssoId); } ssoValve.deregister(ssoId); } finally { beingRemotelyRemoved.set(null); } } /** * Checks whether any peers remain for the given SSO; if not * notifies the valve that the SSO is empty. * * @param ssoId * @return true if SSO has zero sessions */ private boolean notifySSOEmpty(String ssoId) { boolean isEmpty = false; try { int ssoSessionCount = getSSOSessions(ssoId).size(); if(log.isTraceEnabled()) { log.trace("notifySSOEmpty: session count= " + ssoSessionCount + " for ssoid= " + ssoId); } if(ssoSessionCount == 0) { isEmpty = true; ssoValve.notifySSOEmpty(ssoId); } } catch(Exception e) { log.error("Caught exception checking if " + ssoId + " is empty", e); } return isEmpty; } /** * Extracts an SSO session id and uses it in an invocation of * {@link ClusteredSingleSignOn#update ClusteredSingleSignOn.update()}. *

* Ignores invocations resulting from Cache changes originated by * this object. *

* Ignores invocations for SSO session id's that are not registered * with the local SingleSignOn valve. * * @param event */ //@CacheEntryModified public void cacheEntryModified(CacheEntryModifiedEvent event) { if(event.isPre() || event.isOriginLocal()) { return; } SSOKey key = event.getKey(); String ssoId = key.getId(); if(key instanceof CredentialKey) { if(log.isTraceEnabled()) { log.trace("received a credentials modified message for SSO " + ssoId); } handleCredentialModifiedEvent(ssoId, (SSOCredentials) event.getValue()); } else if(key instanceof SessionKey) { if(log.isTraceEnabled()) { log.trace("received a session modified message for SSO " + ssoId + " with value " +event.getValue()); } // Map value = (Map)event.getValue(); // Map map = ((SessionKey)key).cast(this.cache).get(key); handleSessionModifiedEvent(ssoId); } } /** * @param ssoId the id of the sso * @param credentials */ private void handleCredentialModifiedEvent(String ssoId, SSOCredentials credentials) { // Ignore invocations that come as a result of our additions // TODO: can this local check be removed since we already ignore local events in caller if(ssoId.equals(beingLocallyAdded.get())) { return; } try { if(credentials != null) { ssoValve.remoteUpdate(ssoId, credentials); } } catch(Exception e) { log.error("failed to update credentials for SSO " + ssoId, e); } } /** * * @param ssoId single sign-on session id */ private void handleSessionModifiedEvent(String ssoId) { // Peers remove their entire node when it's empty, so any // other modification means it's not empty // notifySSOEmpty will tell the valve that SSO is empty, // otherwise we tell the valve that its not empty if(!notifySSOEmpty(ssoId)) { ssoValve.notifySSONotEmpty(ssoId); } } /** * Prepare for the beginning of active use of the public methods of this * component. This method should be called before any of the public * methods of this component are utilized. It should also send a * LifecycleEvent of type START_EVENT to any registered listeners. * * @throws Exception if this component detects a fatal error * that prevents this component from being used */ @Override public void start() throws Exception { // Validate and update our current component state if(started) { throw new IllegalStateException("SSOClusterManagerImpl already Started"); } String containerName = this.ssoValve.getCacheConfig(); String cacheName = containerName; if((containerName != null) && !containerName.isEmpty()) { String[] parts = containerName.split("/"); if(parts.length == 2) { containerName = parts[0]; cacheName = parts[1]; } } CacheContainer container = this.registry.getCacheContainer(containerName); this.cache = (cacheName == null) ? container.getCache() : container.getCache(cacheName); if(!this.cache.getStatus().allowInvocations()) { this.cache.start(); } initThreadPool(); this.cache.addListener(this); this.tm = this.cache.getAdvancedCache().getTransactionManager(); started = true; } /** * Gracefully terminate the active use of the public methods of this * component. This method should be the last one called on a given * instance of this component. It should also send a LifecycleEvent * of type STOP_EVENT to any registered listeners. * * @throws Exception if this component detects a fatal error * that needs to be reported */ @Override public void stop() throws Exception { // Validate and update our current component state if(!started) { throw new IllegalStateException("SSOClusterManagerImpl not Started"); } this.cache.removeListener(this); this.cache.stop(); started = false; } // ------------------------------------------------------- Public Methods /** * Gets the number of sessions associated with the given SSO. The same * session active on more than one node will count more than once. */ public int getSessionCount(String ssoId) throws Exception { return getSSOSessions(ssoId).size(); } // ------------------------------------------------------- Private Methods private Set getSSOIds() throws Exception { Set result = new HashSet(); for(SSOKey key : this.cache.keySet()) { if(log.isTraceEnabled()) { log.trace("getSSOIds: found "+key.getId()+" of type "+key.getClass().getSimpleName()); } result.add(key.getId()); } return result; } /** * * @param ssoId * @return set of FullyQualifiedSessionId * @throws Exception */ private Set getSSOSessions(String ssoId) throws Exception { SessionKey key = new SessionKey(ssoId); Map map = key.cast(this.cache).get(key); if(log.isTraceEnabled()) { log.trace("Get sessions for SSO " + ssoId + " returned " + map); getSSOIds(); } return (map != null) ? map.keySet() : Collections.emptySet(); } /** * Obtains needed configuration information from the cache. * Invokes "getTransactionManager" on the cache, caching the * result or throwing an IllegalStateException if one is not found. * Also gets our cluster-wide unique local address from the cache. * * @throws Exception */ private void configureFromCache() throws Exception { if(tm == null) { tm = cache.getAdvancedCache().getTransactionManager(); } if(tm == null) { throw new IllegalStateException("Cache does not have a " + "transaction manager; please " + "configure a valid " + "TransactionManagerLookupClass"); } // We no longer rule out buddy replication, as it can be valid if // all activity for the SSO is meant to pinned to one server (i.e. // only one session, or all sessions share the same session id cookie) /* if (cache.getConfiguration().getBuddyReplicationConfig() != null && cache.getConfiguration().getBuddyReplicationConfig().isEnabled()) { throw new IllegalStateException("Underlying cache is configured for " + "buddy replication; use of buddy " + "replication with ClusteredSingleSignOn " + "is not supported"); } */ // Find out our address Address address = cache.getAdvancedCache().getRpcManager().getAddress(); if(address != null) { localAddress = address; } else if(Configuration.CacheMode.LOCAL == cache.getConfiguration().getCacheMode()) { localAddress = "LOCAL"; } else { throw new IllegalStateException("Cannot get local address from cache"); } log.debug("Local address is " + localAddress); } private void endTransaction() { try { if(tm.getTransaction().getStatus() != Status.STATUS_MARKED_ROLLBACK) { if(log.isTraceEnabled()) { log.trace("committing transaction"); } tm.commit(); } else { if(log.isTraceEnabled()) { log.trace("rolling back transaction, tx status=" + tm.getTransaction().getStatus()); } tm.rollback(); } } catch(Exception e) { log.error(e); throw new RuntimeException("SSOClusterManagerImpl.endTransaction(): ", e); } } private MBeanServer getMBeanServer() { if((server == null) && (ssoValve != null)) { server = ssoValve.getMBeanServer(); } return server; } private boolean isCacheAvailable() { //boolean avail = isCacheAvailable(false); boolean avail = this.cache != null; if(!avail) { logMissingCacheError(); } return avail; } /** * Put a new session in the cache * @param ssoId session id * @param fullyQualifiedSessionId fully qualified session id * @throws Exception */ private void putSessionInCache(String ssoId, FullyQualifiedSessionId fullyQualifiedSessionId) throws Exception { if(log.isTraceEnabled()) { log.trace("Put session " + fullyQualifiedSessionId + " in cache against SSO " + ssoId); } SessionKey key = new SessionKey(ssoId); key.cast(this.cache).putIfAbsent(key, null).put(fullyQualifiedSessionId, null); getSSOIds(); int ssoSessionCount = getSSOSessions(ssoId).size(); if(log.isTraceEnabled()) { log.trace("putSessionInCache: session count= " + ssoSessionCount + " for ssoid= " + ssoId); } } /** * Put or update user credentials in the cache * @param ssoId session id * @param credentials * @throws Exception */ private void putCredentialsInCache(String ssoId, SSOCredentials credentials) throws Exception { if(log.isTraceEnabled()) { log.trace("Put Credentials for '" + credentials.getUsername() + "' in cache against SSO " + ssoId); } CredentialKey key = new CredentialKey(ssoId); key.cast(this.cache).put(key, credentials); getSSOIds(); } private SSOCredentials getCredentialsFromCache(String ssoId) { CredentialKey key = new CredentialKey(ssoId); return key.cast(this.cache).get(key); } /** * Remove the specified session from the cache (used for session.logout) * * @param ssoId the session id representing the shared single session * @throws Exception */ private void removeSSOFromCache(String ssoId) throws Exception { SSOKey key = new SessionKey(ssoId); this.cache.remove(key); key = new CredentialKey(ssoId); this.cache.remove(key); if(log.isTraceEnabled()) { log.trace("removeSSOFromCache(): removed SSO id =" + ssoId); } } /** * Remove one of the sessions associated with the users shared single session * @param ssoId ssoId the session id representing the shared single session * @param fullyQualifiedSessionId representing the session to remove * @throws Exception */ private void removeSessionFromCache(String ssoId, FullyQualifiedSessionId fullyQualifiedSessionId) throws Exception { SessionKey key = new SessionKey(ssoId); Map map = key.cast(this.cache).get(key); if(map != null) { int beforeCount = map.size(); map.remove(fullyQualifiedSessionId); if(log.isTraceEnabled()) { log.trace("removeSessionFromCache " + fullyQualifiedSessionId.toString() + " for SSO id=" + ssoId + "session count for SSO was=" + beforeCount + " and is now=" + map.size()); } if(map.size() == 0) // last session removed, send signal that credentials can be removed after maxEmptyLife { notifySSOEmpty(ssoId); } } else if(log.isTraceEnabled()) { log.trace("removeSessionFromCache(): could not find SSO id=" + ssoId + ", will not be able to remove session=" + fullyQualifiedSessionId.toString()); } } /** * Stores the given data to the clustered cache. * * @param ssoId the id of the SSO session * @param authType the type of authenticator (BASIC, CLIENT-CERT, DIGEST * or FORM) used to authenticate the SSO. * @param username the username (if any) used for the authentication * @param password the password (if any) used for the authentication */ private void storeCredentials(String ssoId, String authType, String username, String password) { SSOCredentials credentials = new SSOCredentials(authType, username, password); // Add this SSO to our list of in-process local adds so // this.nodeModified() will ignore the addition beingLocallyAdded.set(ssoId); try { putCredentialsInCache(ssoId, credentials); } catch(Exception e) { log.error("Exception attempting to add Cache nodes for SSO " + ssoId, e); } finally { beingLocallyAdded.set(null); } } private void initThreadPool() { if((threadPoolName != null) && (getMBeanServer() != null)) { try { ObjectName on = new ObjectName(threadPoolName); threadPool = (ThreadPool) server.getAttribute(on, "Instance"); log.debug("Using ThreadPool at " + threadPoolName + " to clean dead members"); } catch(Exception e) { log.info("Unable to access ThreadPool at " + threadPoolName + " -- will use individual threads for cleanup work"); log.debug("Failure to access ThreadPool due to: " + e); } } else { log.debug("No ThreadPool configured -- will use individual threads for cleanup work"); } } private boolean isMissingCacheErrorLogged() { return missingCacheErrorLogged; } private void setMissingCacheErrorLogged(boolean missingCacheErrorLogged) { this.missingCacheErrorLogged = missingCacheErrorLogged; } private void logMissingCacheError() { StringBuffer msg = new StringBuffer("Cache is not set"); msg.append(" -- Cache must be started before SSOClusterManagerImpl "); msg.append("can handle requests"); if(isMissingCacheErrorLogged()) { // Just log it as a warning log.warn(msg); } else { log.error(msg); // Set a flag so we don't relog this error over and over setMissingCacheErrorLogged(true); } } // --------------------------------------------------------- Outer Classes /** * Runnable that's run when the removal of a node from the cluster has been detected. * Removes any SessionAddress objects associated with dead members from the * session set of each SSO. Operates locally only so each node can independently clean * its SSOs without concern about replication lock conflicts. */ private class SSOCleanerTask implements Runnable { private final boolean checkForEmpty; SSOCleanerTask(boolean checkForEmpty) { this.checkForEmpty = checkForEmpty; } public void run() { synchronized(cleanupMutex) { try { log.debug("check if we have to clean up SSO for any members that left the cluster."); // Ensure we have a TransactionManager if(tm == null) { configureFromCache(); } Set ids = getSSOIds(); for(String sso : ids) { cleanSSO(sso); } } catch(Exception e) { log.error("Caught exception cleaning sessions from dead cluster members from SSOs ", e); } } } private void cleanSSO(String ssoId) { boolean doTx = false; try { // Don't start tx if there is already one associated with this thread. if(tm.getTransaction() == null) { doTx = true; } if(doTx) { tm.begin(); } Set fullyQualifiedSessionIds = getSSOSessions(ssoId); if(fullyQualifiedSessionIds.size() > 0) { for(FullyQualifiedSessionId fullyQualifiedSessionId : fullyQualifiedSessionIds) { boolean alive; synchronized(currentView) { alive = currentView.contains(fullyQualifiedSessionId.getHostName()); } if(!alive) { if(log.isTraceEnabled()) { log.trace("Removing peer " + fullyQualifiedSessionId + " from SSO " + ssoId); } // Remove the peer node removeSessionFromCache(ssoId, fullyQualifiedSessionId); } } } else if(checkForEmpty) { // SSO has no peers; notify our valve so we can expire it ssoValve.notifySSOEmpty(ssoId); } } catch(Exception e) { try { if(doTx) { tm.setRollbackOnly(); } } catch(Exception ignored) { } log.error("caught exception cleaning dead members from SSO " + ssoId, e); } finally { if(doTx) { endTransaction(); } } } } }