Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 1,137   Methods: 45
NCLOC: 794   Classes: 4
 
 Source file Conditionals Statements Methods TOTAL
BuddyManager.java 63.9% 81.6% 95.6% 78%
coverage coverage
 1    /*
 2    * JBoss, Home of Professional Open Source
 3    *
 4    * Distributable under LGPL license.
 5    * See terms of license at gnu.org.
 6    */
 7    package org.jboss.cache.buddyreplication;
 8   
 9    import org.apache.commons.logging.Log;
 10    import org.apache.commons.logging.LogFactory;
 11    import org.jboss.cache.CacheException;
 12    import org.jboss.cache.CacheImpl;
 13    import org.jboss.cache.Fqn;
 14    import org.jboss.cache.Region;
 15    import org.jboss.cache.config.BuddyReplicationConfig;
 16    import org.jboss.cache.config.BuddyReplicationConfig.BuddyLocatorConfig;
 17    import org.jboss.cache.lock.TimeoutException;
 18    import org.jboss.cache.marshall.MethodCall;
 19    import org.jboss.cache.marshall.MethodCallFactory;
 20    import org.jboss.cache.marshall.MethodDeclarations;
 21    import org.jboss.cache.notifications.annotation.CacheListener;
 22    import org.jboss.cache.notifications.annotation.ViewChanged;
 23    import org.jboss.cache.notifications.event.ViewChangedEvent;
 24    import org.jboss.cache.statetransfer.StateTransferManager;
 25    import org.jboss.cache.util.ExposedByteArrayOutputStream;
 26    import org.jboss.cache.util.concurrent.ConcurrentHashSet;
 27    import org.jboss.util.stream.MarshalledValueInputStream;
 28    import org.jboss.util.stream.MarshalledValueOutputStream;
 29    import org.jgroups.Address;
 30    import org.jgroups.Channel;
 31    import org.jgroups.View;
 32    import org.jgroups.util.Util;
 33   
 34    import java.io.ByteArrayInputStream;
 35    import java.util.ArrayList;
 36    import java.util.Arrays;
 37    import java.util.Collection;
 38    import java.util.HashMap;
 39    import java.util.Iterator;
 40    import java.util.LinkedList;
 41    import java.util.List;
 42    import java.util.Map;
 43    import java.util.Set;
 44    import java.util.Vector;
 45    import java.util.concurrent.BlockingQueue;
 46    import java.util.concurrent.ConcurrentHashMap;
 47    import java.util.concurrent.CountDownLatch;
 48    import java.util.concurrent.LinkedBlockingQueue;
 49    import java.util.concurrent.TimeUnit;
 50   
 51    /**
 52    * Class that manages buddy replication groups.
 53    *
 54    * @author <a href="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
 55    */
 56    public class BuddyManager
 57    {
 58    private static Log log = LogFactory.getLog(BuddyManager.class);
 59   
 60    /**
 61    * Configuration object.
 62    */
 63    final BuddyReplicationConfig config;
 64   
 65    /**
 66    * Buddy locator class
 67    */
 68    BuddyLocator buddyLocator;
 69   
 70    /**
 71    * back-refernce to the CacheImpl object
 72    */
 73    private CacheImpl cache;
 74   
 75    /**
 76    * The buddy group set up for this instance
 77    */
 78    BuddyGroup buddyGroup;
 79   
 80    /**
 81    * Map of buddy pools received from broadcasts
 82    */
 83    final Map<Address, String> buddyPool = new ConcurrentHashMap<Address, String>();
 84   
 85    /**
 86    * The nullBuddyPool is a set of addresses that have not specified buddy pools.
 87    */
 88    final Set<Address> nullBuddyPool = new ConcurrentHashSet<Address>();
 89   
 90    /**
 91    * Map of bddy groups the current instance participates in as a backup node.
 92    * Keyed on String group name, values are BuddyGroup objects.
 93    * Needs to deal with concurrent access - concurrent assignTo/removeFrom buddy grp
 94    */
 95    Map<String, BuddyGroup> buddyGroupsIParticipateIn = new ConcurrentHashMap<String, BuddyGroup>();
 96   
 97    /**
 98    * Queue to deal with queued up view change requests - which are handled asynchronously
 99    */
 100    private final BlockingQueue<MembershipChange> queue = new LinkedBlockingQueue<MembershipChange>();
 101   
 102    /**
 103    * Async thread that handles items on the view change queue
 104    */
 105    private AsyncViewChangeHandlerThread asyncViewChangeHandler = new AsyncViewChangeHandlerThread();
 106   
 107    /**
 108    * Constants representng the buddy backup subtree
 109    */
 110    public static final String BUDDY_BACKUP_SUBTREE = "_BUDDY_BACKUP_";
 111    public static final Fqn BUDDY_BACKUP_SUBTREE_FQN = Fqn.fromString(BUDDY_BACKUP_SUBTREE);
 112   
 113    /**
 114    * number of times to retry communicating with a selected buddy if the buddy has not been initialised.
 115    */
 116    private static int UNINIT_BUDDIES_RETRIES = 5;
 117    /**
 118    * wait time between retries
 119    */
 120    private static final long[] UNINIT_BUDDIES_RETRY_NAPTIME = {500, 1000, 1500, 2000, 2500};
 121   
 122    /**
 123    * Lock to synchronise on to ensure buddy pool info is received before buddies are assigned to groups.
 124    */
 125    private final Object poolInfoNotifierLock = new Object();
 126   
 127    private CountDownLatch initialisationLatch = new CountDownLatch(1);
 128    // a dummy MembershipChange - a poison-pill to be placed on the membership change queue to notify async handler
 129    // threads to exit gracefully when the BuddyManager has been stopped.
 130    private static final MembershipChange STOP_NOTIFIER = new MembershipChange(null, null);
 131   
 132    private ViewChangeListener viewChangeListener; // the view-change viewChangeListener
 133   
 134  173 public BuddyManager(BuddyReplicationConfig config)
 135    {
 136  173 this.config = config;
 137   
 138  173 BuddyLocatorConfig blc = config.getBuddyLocatorConfig();
 139  173 try
 140    {
 141    // it's OK if the buddy locator config is null.
 142  173 buddyLocator = (blc == null) ? createDefaultBuddyLocator() : createBuddyLocator(blc);
 143    }
 144    catch (Exception e)
 145    {
 146  1 log.warn("Caught exception instantiating buddy locator", e);
 147  1 log.error("Unable to instantiate specified buddyLocatorClass [" + blc + "]. Using default buddyLocator [" + NextMemberBuddyLocator.class.getName() + "] instead, with default properties.");
 148  1 buddyLocator = createDefaultBuddyLocator();
 149    }
 150   
 151    // Update the overall config with the BuddyLocatorConfig actually used
 152  173 if (blc != buddyLocator.getConfig())
 153    {
 154  165 config.setBuddyLocatorConfig(buddyLocator.getConfig());
 155    }
 156    }
 157   
 158  2 public BuddyReplicationConfig getConfig()
 159    {
 160  2 return config;
 161    }
 162   
 163  173 protected BuddyLocator createBuddyLocator(BuddyLocatorConfig config) throws ClassNotFoundException, IllegalAccessException, InstantiationException
 164    {
 165  173 BuddyLocator bl = (BuddyLocator) Class.forName(config.getBuddyLocatorClass()).newInstance();
 166  172 bl.init(config);
 167  172 return bl;
 168    }
 169   
 170  1 protected BuddyLocator createDefaultBuddyLocator()
 171    {
 172  1 BuddyLocator bl = new NextMemberBuddyLocator();
 173  1 bl.init(null);
 174  1 return bl;
 175    }
 176   
 177  1072 public boolean isEnabled()
 178    {
 179  1072 return config.isEnabled();
 180    }
 181   
 182  4 public String getBuddyPoolName()
 183    {
 184  4 return config.getBuddyPoolName();
 185    }
 186   
 187  354 public static String getGroupNameFromAddress(Object address)
 188    {
 189  354 String s = address.toString();
 190  354 return s.replace(':', '_');
 191    }
 192   
 193    /**
 194    * Stops the buddy manager and the related async thread.
 195    */
 196  292 public void stop()
 197    {
 198    // unregister the viewChangeListener
 199  292 if (cache != null) cache.removeCacheListener(viewChangeListener);
 200  292 try
 201    {
 202  292 queue.clear();
 203  292 queue.put(STOP_NOTIFIER);
 204    }
 205    catch (InterruptedException ie)
 206    {
 207    // do nothing - we're stopping anyway
 208    }
 209    }
 210   
 211  164 public void init(CacheImpl cache) throws CacheException
 212    {
 213  164 log.debug("Starting buddy manager");
 214  164 this.cache = cache;
 215  164 buddyGroup = new BuddyGroup();
 216  164 buddyGroup.setDataOwner(cache.getLocalAddress());
 217  164 buddyGroup.setGroupName(getGroupNameFromAddress(cache.getLocalAddress()));
 218   
 219  164 if (config.getBuddyPoolName() != null)
 220    {
 221  41 buddyPool.put(buddyGroup.getDataOwner(), config.getBuddyPoolName());
 222    }
 223   
 224  164 broadcastBuddyPoolMembership();
 225   
 226    // allow waiting threads to process.
 227  164 initialisationLatch.countDown();
 228   
 229    // register a CacheImpl Listener to reassign buddies as and when view changes occur
 230  164 viewChangeListener = new ViewChangeListener();
 231   
 232  164 cache.addCacheListener(viewChangeListener);
 233   
 234    // assign buddies based on what we know now
 235  164 reassignBuddies(cache.getMembers());
 236  164 asyncViewChangeHandler.start();
 237    }
 238   
 239  467 public boolean isAutoDataGravitation()
 240    {
 241  467 return config.isAutoDataGravitation();
 242    }
 243   
 244  65 public boolean isDataGravitationRemoveOnFind()
 245    {
 246  65 return config.isDataGravitationRemoveOnFind();
 247    }
 248   
 249  35 public boolean isDataGravitationSearchBackupTrees()
 250    {
 251  35 return config.isDataGravitationSearchBackupTrees();
 252    }
 253   
 254  35 public int getBuddyCommunicationTimeout()
 255    {
 256  35 return config.getBuddyCommunicationTimeout();
 257    }
 258   
 259    // -------------- methods to be called by the tree cache viewChangeListener --------------------
 260   
 261    static class MembershipChange
 262    {
 263    List<Address> oldMembers;
 264    List<Address> newMembers;
 265   
 266  317 public MembershipChange(List<Address> oldMembers, List<Address> newMembers)
 267    {
 268  317 this.oldMembers = oldMembers;
 269  317 this.newMembers = newMembers;
 270    }
 271   
 272  0 public String toString()
 273    {
 274  0 return "MembershipChange: Old members = " + oldMembers + " New members = " + newMembers;
 275    }
 276    }
 277   
 278  192 private synchronized void enqueueViewChange(List<Address> oldMembers, List<Address> newMembers)
 279    {
 280    // put this on a queue
 281  192 try
 282    {
 283  192 if (queue.peek() != STOP_NOTIFIER)
 284    {
 285    //first empty the queue. All queued up view changes that have not been processed yet are now obsolete.
 286  192 queue.clear();
 287  192 MembershipChange mc = new MembershipChange(oldMembers, newMembers);
 288  0 if (log.isTraceEnabled()) log.trace("Enqueueing " + mc + " for async processing");
 289  192 queue.put(mc);
 290    }
 291    }
 292    catch (InterruptedException e)
 293    {
 294  0 log.warn("Caught interrupted exception trying to enqueue a view change event", e);
 295    }
 296    }
 297   
 298    /**
 299    * Called by the TreeCacheListener when a
 300    * view change is detected. Used to find new buddies if
 301    * existing buddies have died or if new members to the cluster
 302    * have been added. Makes use of the BuddyLocator and then
 303    * makes RPC calls to remote nodes to assign/remove buddies.
 304    */
 305  342 private void reassignBuddies(List<Address> members) throws CacheException
 306    {
 307  342 List<Address> membership = new ArrayList<Address>(members); // defensive copy
 308   
 309  342 if (log.isDebugEnabled())
 310    {
 311  0 log.debug("Data owner address " + cache.getLocalAddress());
 312  0 log.debug("Entering updateGroup. Current group: " + buddyGroup + ". Current View membership: " + membership);
 313    }
 314    // some of my buddies have died!
 315  342 List<Address> newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
 316  342 List<Address> unreachableBuddies;
 317  ? if (!(unreachableBuddies = checkBuddyStatus(newBuddies)).isEmpty())
 318    {
 319    // some of the new buddies are unreachable. Ditch them, try the algo again.
 320  0 membership.removeAll(unreachableBuddies);
 321  0 newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
 322    }
 323  342 List<Address> uninitialisedBuddies = new ArrayList<Address>();
 324  342 List<Address> originalBuddies = buddyGroup.getBuddies();
 325   
 326  342 for (Address newBuddy : newBuddies)
 327    {
 328  312 if (!originalBuddies.contains(newBuddy))
 329    {
 330  232 uninitialisedBuddies.add(newBuddy);
 331    }
 332    }
 333   
 334  342 List<Address> obsoleteBuddies = new ArrayList<Address>();
 335    // find obsolete buddies
 336  342 for (Address origBuddy : originalBuddies)
 337    {
 338  144 if (!newBuddies.contains(origBuddy))
 339    {
 340  60 obsoleteBuddies.add(origBuddy);
 341    }
 342    }
 343   
 344    // Update buddy list
 345  342 boolean buddyGroupMutated = false;
 346  342 if (!obsoleteBuddies.isEmpty())
 347    {
 348  56 removeFromGroup(obsoleteBuddies);
 349  56 buddyGroupMutated = true;
 350    }
 351    else
 352    {
 353  286 log.trace("No obsolete buddies found, nothing to announce.");
 354    }
 355  342 if (!uninitialisedBuddies.isEmpty())
 356    {
 357  224 addBuddies(newBuddies);
 358  224 buddyGroupMutated = true;
 359    }
 360    else
 361    {
 362  118 log.trace("No uninitialized buddies found, nothing to announce.");
 363    }
 364   
 365  342 if (buddyGroupMutated)
 366    {
 367  224 if (log.isInfoEnabled()) log.info("Buddy group members have changed. New buddy group: " + buddyGroup);
 368  224 cache.getConfiguration().getRuntimeConfig().setBuddyGroup(buddyGroup);
 369    }
 370    else
 371  118 log.debug("Nothing has changed; new buddy list is identical to the old one.");
 372   
 373    }
 374   
 375    /**
 376    * Tests whether all members in the list are valid JGroups members.
 377    *
 378    * @param members
 379    * @return
 380    */
 381  342 private List<Address> checkBuddyStatus(List<Address> members)
 382    {
 383  342 Channel ch = cache.getConfiguration().getRuntimeConfig().getChannel();
 384  342 View currentView = ch.getView();
 385  342 List<Address> deadBuddies = new LinkedList<Address>();
 386  0 for (Address a : members) if (!currentView.containsMember(a)) deadBuddies.add(a);
 387  342 return deadBuddies;
 388    }
 389   
 390    // -------------- methods to be called by the tree cache --------------------
 391   
 392    /**
 393    * Called by CacheImpl._remoteAnnounceBuddyPoolName(Address address, String buddyPoolName)
 394    * when a view change occurs and caches need to inform the cluster of which buddy pool it is in.
 395    */
 396  489 public void handlePoolNameBroadcast(Address address, String poolName)
 397    {
 398  489 if (log.isDebugEnabled())
 399    {
 400  0 log.debug("BuddyManager@" + Integer.toHexString(hashCode()) + ": received announcement that cache instance " + address + " is in buddy pool " + poolName);
 401    }
 402  489 if (poolName != null)
 403    {
 404  139 buddyPool.put(address, poolName);
 405    }
 406    else
 407    {
 408  350 synchronized (nullBuddyPool)
 409    {
 410  240 if (!nullBuddyPool.contains(address)) nullBuddyPool.add(address);
 411    }
 412    }
 413   
 414    // notify any waiting view change threads that buddy pool info has been received.
 415  489 synchronized (poolInfoNotifierLock)
 416    {
 417  489 log.trace("Notifying any waiting view change threads that we have received buddy pool info.");
 418  489 poolInfoNotifierLock.notifyAll();
 419    }
 420    }
 421   
 422    /**
 423    * Called by CacheImpl._remoteRemoveFromBuddyGroup(String groupName)
 424    * when a method call for this is received from a remote cache.
 425    */
 426  45 public void handleRemoveFromBuddyGroup(String groupName) throws BuddyNotInitException
 427    {
 428  45 try
 429    {
 430  45 if (!initialisationLatch.await(0, TimeUnit.NANOSECONDS))
 431  0 throw new BuddyNotInitException("Not yet initialised");
 432    }
 433    catch (InterruptedException e)
 434    {
 435  0 log.debug("Caught InterruptedException", e);
 436    }
 437   
 438  45 if (log.isInfoEnabled()) log.info("Removing self from buddy group " + groupName);
 439  45 buddyGroupsIParticipateIn.remove(groupName);
 440   
 441    // remove backup data for this group
 442  45 if (log.isInfoEnabled()) log.info("Removing backup data for group " + groupName);
 443   
 444  45 try
 445    {
 446    // should be a LOCAL call.
 447  45 cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
 448   
 449  45 cache.remove(new Fqn(BUDDY_BACKUP_SUBTREE_FQN, groupName));
 450    }
 451    catch (CacheException e)
 452    {
 453  0 log.error("Unable to remove backup data for group " + groupName, e);
 454    }
 455    finally
 456    {
 457  45 cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
 458    }
 459    }
 460   
 461    /**
 462    * Called by CacheImpl._remoteAssignToBuddyGroup(BuddyGroup g) when a method
 463    * call for this is received from a remote cache.
 464    *
 465    * @param newGroup the buddy group
 466    * @param state Map<Fqn, byte[]> of any state from the DataOwner. Cannot
 467    * be <code>null</code>.
 468    */
 469  250 public void handleAssignToBuddyGroup(BuddyGroup newGroup, Map<Fqn, byte[]> state) throws Exception
 470    {
 471  250 try
 472    {
 473  250 if (!initialisationLatch.await(0, TimeUnit.NANOSECONDS))
 474  4 throw new BuddyNotInitException("Not yet initialised");
 475    }
 476    catch (InterruptedException e)
 477    {
 478  0 log.debug("Caught InterruptedException", e);
 479    }
 480   
 481  246 if (log.isInfoEnabled()) log.info("Assigning self to buddy group " + newGroup);
 482  246 buddyGroupsIParticipateIn.put(newGroup.getGroupName(), newGroup);
 483   
 484    // Integrate state transfer from the data owner of the buddy group
 485  246 Fqn integrationBase = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN,
 486    newGroup.getGroupName());
 487   
 488  246 StateTransferManager stateMgr = cache.getStateTransferManager();
 489   
 490  246 for (Map.Entry<Fqn, byte[]> entry : state.entrySet())
 491    {
 492  238 Fqn fqn = entry.getKey();
 493  238 if (!cache.getRegionManager().isInactive(fqn))
 494    {
 495    //ClassLoader cl = (marshaller == null) ? null : marshaller.getClassLoader(fqnS);
 496  238 Fqn integrationRoot = new Fqn(integrationBase, fqn);
 497   
 498  238 byte[] stateBuffer = entry.getValue();
 499  238 MarshalledValueInputStream in = null;
 500  238 try
 501    {
 502  238 ByteArrayInputStream bais = new ByteArrayInputStream(stateBuffer);
 503  238 in = new MarshalledValueInputStream(bais);
 504    //stateMgr.setState(in, integrationRoot, cl);
 505  238 stateMgr.setState(in, integrationRoot);
 506    }
 507    catch (Throwable t)
 508    {
 509  0 log.error("State for fqn " + fqn + " could not be transferred to a buddy at " + cache.getLocalAddress());
 510    }
 511    finally
 512    {
 513  238 if (in != null)
 514    {
 515  238 in.close();
 516    }
 517    }
 518    }
 519    }
 520    }
 521   
 522    // -------------- static util methods ------------------
 523   
 524    /**
 525    * Utility method that retrieves a buddy backup Fqn given the actual Fqn of some data and the data owner's Address.
 526    *
 527    * @param dataOwnerAddress the JGroups {@link org.jgroups.Address} of the data owner
 528    * @param origFqn the original Fqn
 529    * @return a backup Fqn
 530    */
 531  37 public static Fqn getBackupFqn(Address dataOwnerAddress, Fqn origFqn)
 532    {
 533  37 return getBackupFqn(getGroupNameFromAddress(dataOwnerAddress), origFqn);
 534    }
 535   
 536    /**
 537    * Utility method that retrieves a buddy backup Fqn given the actual Fqn of some data and the buddy group name.
 538    *
 539    * @param buddyGroupName the buddy group name
 540    * @param origFqn the original Fqn
 541    * @return a backup Fqn
 542    */
 543  248 public static Fqn getBackupFqn(String buddyGroupName, Fqn origFqn)
 544    {
 545  248 if (isBackupFqn(origFqn))
 546  0 throw new CacheException("Cannot make a backup Fqn from a backup Fqn! Attempting to create a backup of " + origFqn);
 547  248 List<Object> elements = new ArrayList<Object>();
 548  248 elements.add(BUDDY_BACKUP_SUBTREE);
 549  248 elements.add(buddyGroupName);
 550  248 elements.addAll(origFqn.peekElements());
 551   
 552  248 return new Fqn(elements);
 553    }
 554   
 555    /**
 556    * Utility method that retrieves a buddy backup Fqn given the actual Fqn of some data and the backup subtree for the
 557    * buddy group in question
 558    *
 559    * @param buddyGroupRoot the subtree under which data for a particular buddy is backed up
 560    * @param origFqn the original Fqn
 561    * @return a backup Fqn
 562    */
 563  26 public static Fqn getBackupFqn(Fqn buddyGroupRoot, Fqn origFqn)
 564    {
 565  26 if (origFqn.isChildOf(buddyGroupRoot))
 566    {
 567  0 return origFqn;
 568    }
 569   
 570  26 List<Object> elements = new ArrayList<Object>();
 571  26 elements.add(BUDDY_BACKUP_SUBTREE);
 572  26 elements.add(buddyGroupRoot.get(1));
 573  26 elements.addAll(origFqn.peekElements());
 574   
 575  26 return new Fqn(elements);
 576    }
 577   
 578  221352 public static boolean isBackupFqn(Fqn name)
 579    {
 580  221353 return name != null && name.hasElement(BuddyManager.BUDDY_BACKUP_SUBTREE);
 581    }
 582   
 583    // -------------- methods to be called by the BaseRPCINterceptor --------------------
 584   
 585    /**
 586    * Returns a list of buddies for which this instance is Data Owner.
 587    * List excludes self. Used by the BaseRPCInterceptor when deciding
 588    * who to replicate to.
 589    */
 590  213 public List<Address> getBuddyAddresses()
 591    {
 592  213 return buddyGroup.getBuddies();
 593    }
 594   
 595    /**
 596    * Introspects method call for Fqns and changes them such that they
 597    * are under the current buddy group's backup subtree
 598    * (e.g., /_buddy_backup_/my_host:7890/) rather than the root (/).
 599    * Called by BaseRPCInterceptor to transform method calls before broadcasting.
 600    */
 601  164 public MethodCall transformFqns(MethodCall call)
 602    {
 603  164 return transformFqns(call, call.getMethodId() != MethodDeclarations.dataGravitationCleanupMethod_id);
 604    }
 605   
 606  234 public MethodCall transformFqns(MethodCall call, boolean transformForCurrentCall)
 607    {
 608  234 if (call != null && call.getArgs() != null)
 609    {
 610  234 MethodCall call2 = MethodCallFactory.create(call.getMethod(), call.getArgs().clone());
 611  234 handleArgs(call2.getArgs(), transformForCurrentCall);
 612  234 return call2;
 613    }
 614    else
 615    {
 616  0 return call;
 617    }
 618    }
 619   
 620    // -------------- internal helpers methods --------------------
 621   
 622  56 private void removeFromGroup(List<Address> buddies)
 623    {
 624  56 if (log.isDebugEnabled())
 625    {
 626  0 log.debug("Removing obsolete buddies from buddy group [" + buddyGroup.getGroupName() + "]. Obsolete buddies are " + buddies);
 627    }
 628  56 buddyGroup.removeBuddies(buddies);
 629    // now broadcast a message to the removed buddies.
 630  56 MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteRemoveFromBuddyGroupMethod, buddyGroup.getGroupName());
 631  56 MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
 632   
 633  56 int attemptsLeft = UNINIT_BUDDIES_RETRIES;
 634  56 int currentAttempt = 0;
 635   
 636  56 while (attemptsLeft-- > 0)
 637    {
 638  56 try
 639    {
 640  56 makeRemoteCall(buddies, replicateCall, true);
 641  56 break;
 642    }
 643    catch (Exception e)
 644    {
 645  0 if (e instanceof BuddyNotInitException || e.getCause() instanceof BuddyNotInitException)
 646    {
 647  0 if (attemptsLeft > 0)
 648    {
 649  0 log.info("One of the buddies have not been initialised. Will retry after a short nap.");
 650  0 try
 651    {
 652  0 Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]);
 653    }
 654    catch (InterruptedException e1)
 655    {
 656    // what do we do?
 657  0 log.trace("Thread interrupted while sleeping/waiting for a retry", e1);
 658    }
 659    }
 660    else
 661    {
 662  0 throw new BuddyNotInitException("Unable to contact buddy after " + UNINIT_BUDDIES_RETRIES + " retries");
 663    }
 664    }
 665    else
 666    {
 667  0 log.error("Unable to communicate with Buddy for some reason", e);
 668    }
 669    }
 670    }
 671  56 log.trace("removeFromGroup notification complete");
 672    }
 673   
 674  224 private void addBuddies(List<Address> buddies) throws CacheException
 675    {
 676    // this check is redundant - if buddies is empty this method will not be called. - Manik
 677   
 678    // if (buddies.size() == 0)
 679    // return;
 680   
 681   
 682  224 if (log.isDebugEnabled())
 683    {
 684  0 log.debug("Assigning new buddies to buddy group [" + buddyGroup.getGroupName() + "]. New buddies are " + buddies);
 685    }
 686   
 687   
 688  224 buddyGroup.addBuddies(buddies);
 689   
 690    // Create the state transfer map
 691   
 692  224 Map<Fqn, byte[]> stateMap = new HashMap<Fqn, byte[]>();
 693  224 byte[] state;
 694  224 if (cache.getConfiguration().isUseRegionBasedMarshalling())
 695    {
 696  8 Collection<Region> regions = cache.getRegionManager().getAllRegions(Region.Type.MARSHALLING);
 697  8 if (regions.size() > 0)
 698    {
 699  0 for (Region r : regions)
 700    {
 701  0 Fqn f = r.getFqn();
 702  0 state = acquireState(f);
 703  0 if (state != null)
 704    {
 705  0 stateMap.put(f, state);
 706    }
 707    }
 708    }
 709  8 else if (!cache.getConfiguration().isInactiveOnStartup())
 710    {
 711    // No regions defined; try the root
 712  0 state = acquireState(Fqn.ROOT);
 713  0 if (state != null)
 714    {
 715  0 stateMap.put(Fqn.ROOT, state);
 716    }
 717    }
 718    }
 719    else
 720    {
 721  216 state = acquireState(Fqn.ROOT);
 722  216 if (state != null)
 723    {
 724  216 stateMap.put(Fqn.ROOT, state);
 725    }
 726    }
 727   
 728    // now broadcast a message to the newly assigned buddies.
 729  224 MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteAssignToBuddyGroupMethod, buddyGroup, stateMap);
 730  224 MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
 731   
 732  224 int attemptsLeft = UNINIT_BUDDIES_RETRIES;
 733  224 int currentAttempt = 0;
 734   
 735  224 while (attemptsLeft-- > 0)
 736    {
 737  224 try
 738    {
 739  224 makeRemoteCall(buddies, replicateCall, true);
 740  224 break;
 741    }
 742    catch (Exception e)
 743    {
 744  0 if (e instanceof BuddyNotInitException || e.getCause() instanceof BuddyNotInitException)
 745    {
 746  0 if (attemptsLeft > 0)
 747    {
 748  0 log.info("One of the buddies have not been initialised. Will retry after a short nap.");
 749  0 try
 750    {
 751  0 Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]);
 752    }
 753    catch (InterruptedException e1)
 754    {
 755    // what do we do?
 756  0 log.trace("Thread interrupted while sleeping/waiting for a retry", e1);
 757    }
 758   
 759    }
 760    else
 761    {
 762  0 throw new BuddyNotInitException("Unable to contact buddy after " + UNINIT_BUDDIES_RETRIES + " retries");
 763    }
 764    }
 765    else
 766    {
 767  0 log.error("Unable to communicate with Buddy for some reason", e);
 768    }
 769    }
 770    }
 771   
 772  224 log.trace("addToGroup notification complete");
 773    }
 774   
 775  216 private byte[] acquireState(Fqn fqn) throws CacheException
 776    {
 777    // Call _getState with progressively longer timeouts until we
 778    // get state or it doesn't throw a TimeoutException
 779  216 long[] timeouts = {400, 800, 1600};
 780  216 TimeoutException timeoutException = null;
 781   
 782  216 boolean trace = log.isTraceEnabled();
 783   
 784  216 for (int i = 0; i < timeouts.length; i++)
 785    {
 786  216 timeoutException = null;
 787   
 788  216 boolean force = (i == timeouts.length - 1);
 789   
 790  216 try
 791    {
 792  216 byte[] state = generateState(fqn, timeouts[i], force, false);
 793  216 if (log.isDebugEnabled())
 794    {
 795  0 log.debug("acquireState(): got state");
 796    }
 797  216 return state;
 798    }
 799    catch (TimeoutException t)
 800    {
 801  0 timeoutException = t;
 802  0 if (trace)
 803    {
 804  0 log.trace("acquireState(): got a TimeoutException");
 805    }
 806    }
 807    catch (Exception e)
 808    {
 809  0 throw new CacheException("Error acquiring state", e);
 810    }
 811    catch (Throwable t)
 812    {
 813  0 throw new RuntimeException(t);
 814    }
 815    }
 816   
 817    // If we got a timeout exception on the final try,
 818    // this is a failure condition
 819  0 if (timeoutException != null)
 820    {
 821  0 throw new CacheException("acquireState(): Failed getting state due to timeout",
 822    timeoutException);
 823    }
 824   
 825  0 if (log.isDebugEnabled())
 826    {
 827  0 log.debug("acquireState(): Unable to give state");
 828    }
 829   
 830  0 return null;
 831    }
 832   
 833    /**
 834    * Returns the state for the portion of the cache named by <code>fqn</code>.
 835    * <p/>
 836    * State returned is a serialized byte[][], element 0 is the transient state
 837    * (or null), and element 1 is the persistent state (or null).
 838    *
 839    * @param fqn Fqn indicating the uppermost node in the
 840    * portion of the cache whose state should be returned.
 841    * @param timeout max number of ms this method should wait to acquire
 842    * a read lock on the nodes being transferred
 843    * @param force if a read lock cannot be acquired after
 844    * <code>timeout</code> ms, should the lock acquisition
 845    * be forced, and any existing transactions holding locks
 846    * on the nodes be rolled back? <strong>NOTE:</strong>
 847    * In release 1.2.4, this parameter has no effect.
 848    * @param suppressErrors should any Throwable thrown be suppressed?
 849    * @return a serialized byte[][], element 0 is the transient state
 850    * (or null), and element 1 is the persistent state (or null).
 851    */
 852  216 public byte[] generateState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
 853    {
 854   
 855  216 MarshalledValueOutputStream out = null;
 856  216 byte[] result = null;
 857  216 try
 858    {
 859  216 ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
 860  216 out = new MarshalledValueOutputStream(baos);
 861  216 cache.getStateTransferManager().getState(out, fqn, timeout, force, suppressErrors);
 862  216 result = baos.getRawBuffer();
 863    }
 864    finally
 865    {
 866  216 Util.close(out);
 867    }
 868   
 869  216 return result;
 870    }
 871   
 872    /**
 873    * Called by the BuddyGroupMembershipMonitor every time a view change occurs.
 874    */
 875  330 private void broadcastBuddyPoolMembership()
 876    {
 877  330 broadcastBuddyPoolMembership(null);
 878    }
 879   
 880  346 private void broadcastBuddyPoolMembership(List<Address> recipients)
 881    {
 882    // broadcast to other caches
 883  346 if (log.isDebugEnabled())
 884    {
 885  0 log.debug("Instance " + buddyGroup.getDataOwner() + " broadcasting membership in buddy pool " + config.getBuddyPoolName() + " to recipients " + recipients);
 886    }
 887   
 888  346 MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteAnnounceBuddyPoolNameMethod, buddyGroup.getDataOwner(), config.getBuddyPoolName());
 889  346 MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
 890   
 891  346 try
 892    {
 893  346 makeRemoteCall(recipients, replicateCall, true);
 894    }
 895    catch (Exception e)
 896    {
 897  0 log.error("Problems broadcasting buddy pool membership info to cluster", e);
 898    }
 899    }
 900   
 901  626 private void makeRemoteCall(List<Address> recipients, MethodCall call, boolean sync) throws Exception
 902    {
 903    // remove non-members from dest list
 904  626 if (recipients != null)
 905    {
 906  296 Iterator<Address> recipientsIt = recipients.iterator();
 907  296 List<Address> members = cache.getMembers();
 908  296 while (recipientsIt.hasNext())
 909    {
 910  329 if (!members.contains(recipientsIt.next()))
 911    {
 912  12 recipientsIt.remove();
 913   
 914    }
 915    }
 916    }
 917   
 918  626 cache.getRPCManager().callRemoteMethods(recipients, call, sync, true, config.getBuddyCommunicationTimeout());
 919    }
 920   
 921   
 922  261 private void handleArgs(Object[] args, boolean transformForCurrentCall)
 923    {
 924  261 for (int i = 0; i < args.length; i++)
 925    {
 926  979 if (args[i] instanceof MethodCall)
 927    {
 928  70 MethodCall call = (MethodCall) args[i];
 929  70 boolean transformFqns = true;
 930  70 if (call.getMethodId() == MethodDeclarations.dataGravitationCleanupMethod_id)
 931    {
 932  10 transformFqns = false;
 933    }
 934   
 935  70 args[i] = transformFqns((MethodCall) args[i], transformFqns);
 936    }
 937   
 938  979 if (args[i] instanceof List && args[i] != null)
 939    {
 940  27 Object[] asArray = ((List) args[i]).toArray();
 941  27 handleArgs(asArray, transformForCurrentCall);
 942  27 List newList = new ArrayList(asArray.length);
 943    // Oops! JDK 5.0!
 944    //Collections.addAll(newList, asArray);
 945  27 newList.addAll(Arrays.asList(asArray));
 946  27 args[i] = newList;
 947    }
 948   
 949  979 if (args[i] instanceof Fqn)
 950    {
 951  216 Fqn fqn = (Fqn) args[i];
 952  142 if (transformForCurrentCall) args[i] = getBackupFqn(fqn);
 953    }
 954    }
 955    }
 956   
 957    /**
 958    * Assumes the backup Fqn if the current instance is the data owner
 959    *
 960    * @param originalFqn
 961    * @return backup fqn
 962    */
 963  142 public Fqn getBackupFqn(Fqn originalFqn)
 964    {
 965  142 return getBackupFqn(buddyGroup == null || buddyGroup.getGroupName() == null ? "null" : buddyGroup.getGroupName(), originalFqn);
 966    }
 967   
 968  65 public static Fqn getActualFqn(Fqn fqn)
 969    {
 970  21 if (!isBackupFqn(fqn)) return fqn;
 971    // remove the first 2 elements
 972  44 return fqn.getSubFqn(2, fqn.size());
 973    }
 974   
 975    /**
 976    * Asynchronous thread that deals with handling view changes placed on a queue
 977    */
 978    private class AsyncViewChangeHandlerThread implements Runnable
 979    {
 980    private Thread t;
 981    private boolean isRunning = true;
 982   
 983  164 public void start()
 984    {
 985  164 if (t == null || !t.isAlive())
 986    {
 987  164 t = new Thread(this);
 988  164 t.setName("AsyncViewChangeHandlerThread," + cache.getLocalAddress());
 989  164 t.setDaemon(true);
 990  164 t.start();
 991    }
 992    }
 993   
 994  164 public void run()
 995    {
 996    // don't start this thread until the Buddy Manager has initialised as it cocks things up.
 997  164 try
 998    {
 999  164 initialisationLatch.await();
 1000    }
 1001    catch (InterruptedException e)
 1002    {
 1003  0 log.debug("Caught InterruptedException", e);
 1004    }
 1005  164 while (!Thread.interrupted() && isRunning)
 1006    {
 1007  353 try
 1008    {
 1009  353 handleEnqueuedViewChange();
 1010    }
 1011    catch (InterruptedException e)
 1012    {
 1013  0 break;
 1014    }
 1015    catch (Throwable t)
 1016    {
 1017    // Don't let the thread die
 1018  0 log.error("Caught exception handling view change", t);
 1019    }
 1020    }
 1021  162 log.trace("Exiting run()");
 1022    }
 1023   
 1024  353 private void handleEnqueuedViewChange() throws Exception
 1025    {
 1026  353 log.trace("Waiting for enqueued view change events");
 1027  353 MembershipChange members = queue.take();
 1028  353 if (members == STOP_NOTIFIER)
 1029    {
 1030    // time to go home
 1031  162 isRunning = false;
 1032  162 return;
 1033    }
 1034   
 1035    // there is a strange case where JGroups issues view changes and just includes self in new views, and then
 1036    // quickly corrects it. Happens intermittently on some unit tests. If this is such a case, please ignore.
 1037  191 if (members.newMembers.size() == 1 && members.newMembers.get(0).equals(cache.getLocalAddress()))
 1038    {
 1039  11 log.info("Ignoring membership change event since it only contains self.");
 1040  11 return;
 1041    }
 1042   
 1043  180 broadcastPoolMembership(members);
 1044   
 1045  180 boolean rebroadcast = false;
 1046   
 1047    // make sure new buddies have broadcast their pool memberships.
 1048  180 while (!buddyPoolInfoAvailable(members.newMembers))
 1049    {
 1050  4 rebroadcast = true;
 1051  4 synchronized (poolInfoNotifierLock)
 1052    {
 1053  4 log.trace("Not received necessary buddy pool info for all new members yet; waiting on poolInfoNotifierLock.");
 1054  4 poolInfoNotifierLock.wait();
 1055    }
 1056    }
 1057   
 1058  2 if (rebroadcast) broadcastPoolMembership(members);
 1059   
 1060    // always refresh buddy list.
 1061  178 reassignBuddies(members.newMembers);
 1062    }
 1063   
 1064  182 private void broadcastPoolMembership(MembershipChange members)
 1065    {
 1066  182 log.trace("Broadcasting pool membership details, triggered by view change.");
 1067  182 if (members.oldMembers == null)
 1068    {
 1069  166 broadcastBuddyPoolMembership();
 1070    }
 1071    else
 1072    {
 1073  16 List<Address> delta = new ArrayList<Address>();
 1074  16 delta.addAll(members.newMembers);
 1075  16 delta.removeAll(members.oldMembers);
 1076  16 broadcastBuddyPoolMembership(delta);
 1077    }
 1078    }
 1079   
 1080  182 private boolean buddyPoolInfoAvailable(List<Address> newMembers)
 1081    {
 1082  182 boolean infoReceived = true;
 1083  182 for (Address address : newMembers)
 1084    {
 1085    // make sure no one is concurrently writing to nullBuddyPool.
 1086  528 synchronized (nullBuddyPool)
 1087    {
 1088    // log.trace("Testing on node " + buddyGroup.getDataOwner() + " for candidate " + address);
 1089    // log.trace("Is me? " + address.equals(cache.getLocalAddress()));
 1090    // log.trace("is in bP? " + buddyPool.keySet().contains(address));
 1091    // log.trace("is in nBP? " + nullBuddyPool.contains(address));
 1092  528 infoReceived = infoReceived && (address.equals(cache.getLocalAddress()) || buddyPool.keySet().contains(address) || nullBuddyPool.contains(address));
 1093    }
 1094    }
 1095   
 1096  182 if (log.isTraceEnabled())
 1097    {
 1098  0 log.trace(buddyGroup.getDataOwner() + " received buddy pool info for new members " + newMembers + "? " + infoReceived);
 1099    }
 1100   
 1101  182 return infoReceived;
 1102    }
 1103   
 1104  0 public void stop()
 1105    {
 1106  0 if (t != null) t.interrupt();
 1107    }
 1108    }
 1109   
 1110    @CacheListener
 1111    public class ViewChangeListener
 1112    {
 1113    private Vector<Address> oldMembers;
 1114   
 1115  192 @ViewChanged
 1116    public void handleViewChange(ViewChangedEvent event)
 1117    {
 1118  192 View newView = event.getNewView();
 1119  192 if (log.isTraceEnabled())
 1120  0 log.trace("BuddyManager CacheListener - got view change with new view " + newView);
 1121  192 Vector<Address> newMembers = newView.getMembers();
 1122   
 1123    // the whole 'oldMembers' concept is only used for buddy pool announcements.
 1124  192 if (config.getBuddyPoolName() == null)
 1125    {
 1126  140 enqueueViewChange(null, newMembers);
 1127    }
 1128    else
 1129    {
 1130  52 enqueueViewChange(oldMembers == null ? null : new Vector<Address>(oldMembers), new Vector<Address>(newMembers));
 1131  35 if (oldMembers == null) oldMembers = new Vector<Address>();
 1132  52 oldMembers.clear();
 1133  52 oldMembers.addAll(newMembers);
 1134    }
 1135    }
 1136    }
 1137    }