1 |
| |
2 |
| |
3 |
| |
4 |
| |
5 |
| |
6 |
| |
7 |
| package org.jboss.cache.buddyreplication; |
8 |
| |
9 |
| import org.apache.commons.logging.Log; |
10 |
| import org.apache.commons.logging.LogFactory; |
11 |
| import org.jboss.cache.CacheException; |
12 |
| import org.jboss.cache.CacheImpl; |
13 |
| import org.jboss.cache.Fqn; |
14 |
| import org.jboss.cache.Region; |
15 |
| import org.jboss.cache.config.BuddyReplicationConfig; |
16 |
| import org.jboss.cache.config.BuddyReplicationConfig.BuddyLocatorConfig; |
17 |
| import org.jboss.cache.lock.TimeoutException; |
18 |
| import org.jboss.cache.marshall.MethodCall; |
19 |
| import org.jboss.cache.marshall.MethodCallFactory; |
20 |
| import org.jboss.cache.marshall.MethodDeclarations; |
21 |
| import org.jboss.cache.notifications.annotation.CacheListener; |
22 |
| import org.jboss.cache.notifications.annotation.ViewChanged; |
23 |
| import org.jboss.cache.notifications.event.ViewChangedEvent; |
24 |
| import org.jboss.cache.statetransfer.StateTransferManager; |
25 |
| import org.jboss.cache.util.ExposedByteArrayOutputStream; |
26 |
| import org.jboss.cache.util.concurrent.ConcurrentHashSet; |
27 |
| import org.jboss.util.stream.MarshalledValueInputStream; |
28 |
| import org.jboss.util.stream.MarshalledValueOutputStream; |
29 |
| import org.jgroups.Address; |
30 |
| import org.jgroups.Channel; |
31 |
| import org.jgroups.View; |
32 |
| import org.jgroups.util.Util; |
33 |
| |
34 |
| import java.io.ByteArrayInputStream; |
35 |
| import java.util.ArrayList; |
36 |
| import java.util.Arrays; |
37 |
| import java.util.Collection; |
38 |
| import java.util.HashMap; |
39 |
| import java.util.Iterator; |
40 |
| import java.util.LinkedList; |
41 |
| import java.util.List; |
42 |
| import java.util.Map; |
43 |
| import java.util.Set; |
44 |
| import java.util.Vector; |
45 |
| import java.util.concurrent.BlockingQueue; |
46 |
| import java.util.concurrent.ConcurrentHashMap; |
47 |
| import java.util.concurrent.CountDownLatch; |
48 |
| import java.util.concurrent.LinkedBlockingQueue; |
49 |
| import java.util.concurrent.TimeUnit; |
50 |
| |
51 |
| |
52 |
| |
53 |
| |
54 |
| |
55 |
| |
56 |
| public class BuddyManager |
57 |
| { |
58 |
| private static Log log = LogFactory.getLog(BuddyManager.class); |
59 |
| |
60 |
| |
61 |
| |
62 |
| |
63 |
| final BuddyReplicationConfig config; |
64 |
| |
65 |
| |
66 |
| |
67 |
| |
68 |
| BuddyLocator buddyLocator; |
69 |
| |
70 |
| |
71 |
| |
72 |
| |
73 |
| private CacheImpl cache; |
74 |
| |
75 |
| |
76 |
| |
77 |
| |
78 |
| BuddyGroup buddyGroup; |
79 |
| |
80 |
| |
81 |
| |
82 |
| |
83 |
| final Map<Address, String> buddyPool = new ConcurrentHashMap<Address, String>(); |
84 |
| |
85 |
| |
86 |
| |
87 |
| |
88 |
| final Set<Address> nullBuddyPool = new ConcurrentHashSet<Address>(); |
89 |
| |
90 |
| |
91 |
| |
92 |
| |
93 |
| |
94 |
| |
95 |
| Map<String, BuddyGroup> buddyGroupsIParticipateIn = new ConcurrentHashMap<String, BuddyGroup>(); |
96 |
| |
97 |
| |
98 |
| |
99 |
| |
100 |
| private final BlockingQueue<MembershipChange> queue = new LinkedBlockingQueue<MembershipChange>(); |
101 |
| |
102 |
| |
103 |
| |
104 |
| |
105 |
| private AsyncViewChangeHandlerThread asyncViewChangeHandler = new AsyncViewChangeHandlerThread(); |
106 |
| |
107 |
| |
108 |
| |
109 |
| |
110 |
| public static final String BUDDY_BACKUP_SUBTREE = "_BUDDY_BACKUP_"; |
111 |
| public static final Fqn BUDDY_BACKUP_SUBTREE_FQN = Fqn.fromString(BUDDY_BACKUP_SUBTREE); |
112 |
| |
113 |
| |
114 |
| |
115 |
| |
116 |
| private static int UNINIT_BUDDIES_RETRIES = 5; |
117 |
| |
118 |
| |
119 |
| |
120 |
| private static final long[] UNINIT_BUDDIES_RETRY_NAPTIME = {500, 1000, 1500, 2000, 2500}; |
121 |
| |
122 |
| |
123 |
| |
124 |
| |
125 |
| private final Object poolInfoNotifierLock = new Object(); |
126 |
| |
127 |
| private CountDownLatch initialisationLatch = new CountDownLatch(1); |
128 |
| |
129 |
| |
130 |
| private static final MembershipChange STOP_NOTIFIER = new MembershipChange(null, null); |
131 |
| |
132 |
| private ViewChangeListener viewChangeListener; |
133 |
| |
134 |
173
| public BuddyManager(BuddyReplicationConfig config)
|
135 |
| { |
136 |
173
| this.config = config;
|
137 |
| |
138 |
173
| BuddyLocatorConfig blc = config.getBuddyLocatorConfig();
|
139 |
173
| try
|
140 |
| { |
141 |
| |
142 |
173
| buddyLocator = (blc == null) ? createDefaultBuddyLocator() : createBuddyLocator(blc);
|
143 |
| } |
144 |
| catch (Exception e) |
145 |
| { |
146 |
1
| log.warn("Caught exception instantiating buddy locator", e);
|
147 |
1
| log.error("Unable to instantiate specified buddyLocatorClass [" + blc + "]. Using default buddyLocator [" + NextMemberBuddyLocator.class.getName() + "] instead, with default properties.");
|
148 |
1
| buddyLocator = createDefaultBuddyLocator();
|
149 |
| } |
150 |
| |
151 |
| |
152 |
173
| if (blc != buddyLocator.getConfig())
|
153 |
| { |
154 |
165
| config.setBuddyLocatorConfig(buddyLocator.getConfig());
|
155 |
| } |
156 |
| } |
157 |
| |
158 |
2
| public BuddyReplicationConfig getConfig()
|
159 |
| { |
160 |
2
| return config;
|
161 |
| } |
162 |
| |
163 |
173
| protected BuddyLocator createBuddyLocator(BuddyLocatorConfig config) throws ClassNotFoundException, IllegalAccessException, InstantiationException
|
164 |
| { |
165 |
173
| BuddyLocator bl = (BuddyLocator) Class.forName(config.getBuddyLocatorClass()).newInstance();
|
166 |
172
| bl.init(config);
|
167 |
172
| return bl;
|
168 |
| } |
169 |
| |
170 |
1
| protected BuddyLocator createDefaultBuddyLocator()
|
171 |
| { |
172 |
1
| BuddyLocator bl = new NextMemberBuddyLocator();
|
173 |
1
| bl.init(null);
|
174 |
1
| return bl;
|
175 |
| } |
176 |
| |
177 |
1072
| public boolean isEnabled()
|
178 |
| { |
179 |
1072
| return config.isEnabled();
|
180 |
| } |
181 |
| |
182 |
4
| public String getBuddyPoolName()
|
183 |
| { |
184 |
4
| return config.getBuddyPoolName();
|
185 |
| } |
186 |
| |
187 |
354
| public static String getGroupNameFromAddress(Object address)
|
188 |
| { |
189 |
354
| String s = address.toString();
|
190 |
354
| return s.replace(':', '_');
|
191 |
| } |
192 |
| |
193 |
| |
194 |
| |
195 |
| |
196 |
292
| public void stop()
|
197 |
| { |
198 |
| |
199 |
292
| if (cache != null) cache.removeCacheListener(viewChangeListener);
|
200 |
292
| try
|
201 |
| { |
202 |
292
| queue.clear();
|
203 |
292
| queue.put(STOP_NOTIFIER);
|
204 |
| } |
205 |
| catch (InterruptedException ie) |
206 |
| { |
207 |
| |
208 |
| } |
209 |
| } |
210 |
| |
211 |
164
| public void init(CacheImpl cache) throws CacheException
|
212 |
| { |
213 |
164
| log.debug("Starting buddy manager");
|
214 |
164
| this.cache = cache;
|
215 |
164
| buddyGroup = new BuddyGroup();
|
216 |
164
| buddyGroup.setDataOwner(cache.getLocalAddress());
|
217 |
164
| buddyGroup.setGroupName(getGroupNameFromAddress(cache.getLocalAddress()));
|
218 |
| |
219 |
164
| if (config.getBuddyPoolName() != null)
|
220 |
| { |
221 |
41
| buddyPool.put(buddyGroup.getDataOwner(), config.getBuddyPoolName());
|
222 |
| } |
223 |
| |
224 |
164
| broadcastBuddyPoolMembership();
|
225 |
| |
226 |
| |
227 |
164
| initialisationLatch.countDown();
|
228 |
| |
229 |
| |
230 |
164
| viewChangeListener = new ViewChangeListener();
|
231 |
| |
232 |
164
| cache.addCacheListener(viewChangeListener);
|
233 |
| |
234 |
| |
235 |
164
| reassignBuddies(cache.getMembers());
|
236 |
164
| asyncViewChangeHandler.start();
|
237 |
| } |
238 |
| |
239 |
467
| public boolean isAutoDataGravitation()
|
240 |
| { |
241 |
467
| return config.isAutoDataGravitation();
|
242 |
| } |
243 |
| |
244 |
65
| public boolean isDataGravitationRemoveOnFind()
|
245 |
| { |
246 |
65
| return config.isDataGravitationRemoveOnFind();
|
247 |
| } |
248 |
| |
249 |
35
| public boolean isDataGravitationSearchBackupTrees()
|
250 |
| { |
251 |
35
| return config.isDataGravitationSearchBackupTrees();
|
252 |
| } |
253 |
| |
254 |
35
| public int getBuddyCommunicationTimeout()
|
255 |
| { |
256 |
35
| return config.getBuddyCommunicationTimeout();
|
257 |
| } |
258 |
| |
259 |
| |
260 |
| |
261 |
| static class MembershipChange |
262 |
| { |
263 |
| List<Address> oldMembers; |
264 |
| List<Address> newMembers; |
265 |
| |
266 |
317
| public MembershipChange(List<Address> oldMembers, List<Address> newMembers)
|
267 |
| { |
268 |
317
| this.oldMembers = oldMembers;
|
269 |
317
| this.newMembers = newMembers;
|
270 |
| } |
271 |
| |
272 |
0
| public String toString()
|
273 |
| { |
274 |
0
| return "MembershipChange: Old members = " + oldMembers + " New members = " + newMembers;
|
275 |
| } |
276 |
| } |
277 |
| |
278 |
192
| private synchronized void enqueueViewChange(List<Address> oldMembers, List<Address> newMembers)
|
279 |
| { |
280 |
| |
281 |
192
| try
|
282 |
| { |
283 |
192
| if (queue.peek() != STOP_NOTIFIER)
|
284 |
| { |
285 |
| |
286 |
192
| queue.clear();
|
287 |
192
| MembershipChange mc = new MembershipChange(oldMembers, newMembers);
|
288 |
0
| if (log.isTraceEnabled()) log.trace("Enqueueing " + mc + " for async processing");
|
289 |
192
| queue.put(mc);
|
290 |
| } |
291 |
| } |
292 |
| catch (InterruptedException e) |
293 |
| { |
294 |
0
| log.warn("Caught interrupted exception trying to enqueue a view change event", e);
|
295 |
| } |
296 |
| } |
297 |
| |
298 |
| |
299 |
| |
300 |
| |
301 |
| |
302 |
| |
303 |
| |
304 |
| |
305 |
342
| private void reassignBuddies(List<Address> members) throws CacheException
|
306 |
| { |
307 |
342
| List<Address> membership = new ArrayList<Address>(members);
|
308 |
| |
309 |
342
| if (log.isDebugEnabled())
|
310 |
| { |
311 |
0
| log.debug("Data owner address " + cache.getLocalAddress());
|
312 |
0
| log.debug("Entering updateGroup. Current group: " + buddyGroup + ". Current View membership: " + membership);
|
313 |
| } |
314 |
| |
315 |
342
| List<Address> newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
|
316 |
342
| List<Address> unreachableBuddies;
|
317 |
?
| if (!(unreachableBuddies = checkBuddyStatus(newBuddies)).isEmpty())
|
318 |
| { |
319 |
| |
320 |
0
| membership.removeAll(unreachableBuddies);
|
321 |
0
| newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
|
322 |
| } |
323 |
342
| List<Address> uninitialisedBuddies = new ArrayList<Address>();
|
324 |
342
| List<Address> originalBuddies = buddyGroup.getBuddies();
|
325 |
| |
326 |
342
| for (Address newBuddy : newBuddies)
|
327 |
| { |
328 |
312
| if (!originalBuddies.contains(newBuddy))
|
329 |
| { |
330 |
232
| uninitialisedBuddies.add(newBuddy);
|
331 |
| } |
332 |
| } |
333 |
| |
334 |
342
| List<Address> obsoleteBuddies = new ArrayList<Address>();
|
335 |
| |
336 |
342
| for (Address origBuddy : originalBuddies)
|
337 |
| { |
338 |
144
| if (!newBuddies.contains(origBuddy))
|
339 |
| { |
340 |
60
| obsoleteBuddies.add(origBuddy);
|
341 |
| } |
342 |
| } |
343 |
| |
344 |
| |
345 |
342
| boolean buddyGroupMutated = false;
|
346 |
342
| if (!obsoleteBuddies.isEmpty())
|
347 |
| { |
348 |
56
| removeFromGroup(obsoleteBuddies);
|
349 |
56
| buddyGroupMutated = true;
|
350 |
| } |
351 |
| else |
352 |
| { |
353 |
286
| log.trace("No obsolete buddies found, nothing to announce.");
|
354 |
| } |
355 |
342
| if (!uninitialisedBuddies.isEmpty())
|
356 |
| { |
357 |
224
| addBuddies(newBuddies);
|
358 |
224
| buddyGroupMutated = true;
|
359 |
| } |
360 |
| else |
361 |
| { |
362 |
118
| log.trace("No uninitialized buddies found, nothing to announce.");
|
363 |
| } |
364 |
| |
365 |
342
| if (buddyGroupMutated)
|
366 |
| { |
367 |
224
| if (log.isInfoEnabled()) log.info("Buddy group members have changed. New buddy group: " + buddyGroup);
|
368 |
224
| cache.getConfiguration().getRuntimeConfig().setBuddyGroup(buddyGroup);
|
369 |
| } |
370 |
| else |
371 |
118
| log.debug("Nothing has changed; new buddy list is identical to the old one.");
|
372 |
| |
373 |
| } |
374 |
| |
375 |
| |
376 |
| |
377 |
| |
378 |
| |
379 |
| |
380 |
| |
381 |
342
| private List<Address> checkBuddyStatus(List<Address> members)
|
382 |
| { |
383 |
342
| Channel ch = cache.getConfiguration().getRuntimeConfig().getChannel();
|
384 |
342
| View currentView = ch.getView();
|
385 |
342
| List<Address> deadBuddies = new LinkedList<Address>();
|
386 |
0
| for (Address a : members) if (!currentView.containsMember(a)) deadBuddies.add(a);
|
387 |
342
| return deadBuddies;
|
388 |
| } |
389 |
| |
390 |
| |
391 |
| |
392 |
| |
393 |
| |
394 |
| |
395 |
| |
396 |
489
| public void handlePoolNameBroadcast(Address address, String poolName)
|
397 |
| { |
398 |
489
| if (log.isDebugEnabled())
|
399 |
| { |
400 |
0
| log.debug("BuddyManager@" + Integer.toHexString(hashCode()) + ": received announcement that cache instance " + address + " is in buddy pool " + poolName);
|
401 |
| } |
402 |
489
| if (poolName != null)
|
403 |
| { |
404 |
139
| buddyPool.put(address, poolName);
|
405 |
| } |
406 |
| else |
407 |
| { |
408 |
350
| synchronized (nullBuddyPool)
|
409 |
| { |
410 |
240
| if (!nullBuddyPool.contains(address)) nullBuddyPool.add(address);
|
411 |
| } |
412 |
| } |
413 |
| |
414 |
| |
415 |
489
| synchronized (poolInfoNotifierLock)
|
416 |
| { |
417 |
489
| log.trace("Notifying any waiting view change threads that we have received buddy pool info.");
|
418 |
489
| poolInfoNotifierLock.notifyAll();
|
419 |
| } |
420 |
| } |
421 |
| |
422 |
| |
423 |
| |
424 |
| |
425 |
| |
426 |
45
| public void handleRemoveFromBuddyGroup(String groupName) throws BuddyNotInitException
|
427 |
| { |
428 |
45
| try
|
429 |
| { |
430 |
45
| if (!initialisationLatch.await(0, TimeUnit.NANOSECONDS))
|
431 |
0
| throw new BuddyNotInitException("Not yet initialised");
|
432 |
| } |
433 |
| catch (InterruptedException e) |
434 |
| { |
435 |
0
| log.debug("Caught InterruptedException", e);
|
436 |
| } |
437 |
| |
438 |
45
| if (log.isInfoEnabled()) log.info("Removing self from buddy group " + groupName);
|
439 |
45
| buddyGroupsIParticipateIn.remove(groupName);
|
440 |
| |
441 |
| |
442 |
45
| if (log.isInfoEnabled()) log.info("Removing backup data for group " + groupName);
|
443 |
| |
444 |
45
| try
|
445 |
| { |
446 |
| |
447 |
45
| cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
|
448 |
| |
449 |
45
| cache.remove(new Fqn(BUDDY_BACKUP_SUBTREE_FQN, groupName));
|
450 |
| } |
451 |
| catch (CacheException e) |
452 |
| { |
453 |
0
| log.error("Unable to remove backup data for group " + groupName, e);
|
454 |
| } |
455 |
| finally |
456 |
| { |
457 |
45
| cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
|
458 |
| } |
459 |
| } |
460 |
| |
461 |
| |
462 |
| |
463 |
| |
464 |
| |
465 |
| |
466 |
| |
467 |
| |
468 |
| |
469 |
250
| public void handleAssignToBuddyGroup(BuddyGroup newGroup, Map<Fqn, byte[]> state) throws Exception
|
470 |
| { |
471 |
250
| try
|
472 |
| { |
473 |
250
| if (!initialisationLatch.await(0, TimeUnit.NANOSECONDS))
|
474 |
4
| throw new BuddyNotInitException("Not yet initialised");
|
475 |
| } |
476 |
| catch (InterruptedException e) |
477 |
| { |
478 |
0
| log.debug("Caught InterruptedException", e);
|
479 |
| } |
480 |
| |
481 |
246
| if (log.isInfoEnabled()) log.info("Assigning self to buddy group " + newGroup);
|
482 |
246
| buddyGroupsIParticipateIn.put(newGroup.getGroupName(), newGroup);
|
483 |
| |
484 |
| |
485 |
246
| Fqn integrationBase = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN,
|
486 |
| newGroup.getGroupName()); |
487 |
| |
488 |
246
| StateTransferManager stateMgr = cache.getStateTransferManager();
|
489 |
| |
490 |
246
| for (Map.Entry<Fqn, byte[]> entry : state.entrySet())
|
491 |
| { |
492 |
238
| Fqn fqn = entry.getKey();
|
493 |
238
| if (!cache.getRegionManager().isInactive(fqn))
|
494 |
| { |
495 |
| |
496 |
238
| Fqn integrationRoot = new Fqn(integrationBase, fqn);
|
497 |
| |
498 |
238
| byte[] stateBuffer = entry.getValue();
|
499 |
238
| MarshalledValueInputStream in = null;
|
500 |
238
| try
|
501 |
| { |
502 |
238
| ByteArrayInputStream bais = new ByteArrayInputStream(stateBuffer);
|
503 |
238
| in = new MarshalledValueInputStream(bais);
|
504 |
| |
505 |
238
| stateMgr.setState(in, integrationRoot);
|
506 |
| } |
507 |
| catch (Throwable t) |
508 |
| { |
509 |
0
| log.error("State for fqn " + fqn + " could not be transferred to a buddy at " + cache.getLocalAddress());
|
510 |
| } |
511 |
| finally |
512 |
| { |
513 |
238
| if (in != null)
|
514 |
| { |
515 |
238
| in.close();
|
516 |
| } |
517 |
| } |
518 |
| } |
519 |
| } |
520 |
| } |
521 |
| |
522 |
| |
523 |
| |
524 |
| |
525 |
| |
526 |
| |
527 |
| |
528 |
| |
529 |
| |
530 |
| |
531 |
37
| public static Fqn getBackupFqn(Address dataOwnerAddress, Fqn origFqn)
|
532 |
| { |
533 |
37
| return getBackupFqn(getGroupNameFromAddress(dataOwnerAddress), origFqn);
|
534 |
| } |
535 |
| |
536 |
| |
537 |
| |
538 |
| |
539 |
| |
540 |
| |
541 |
| |
542 |
| |
543 |
248
| public static Fqn getBackupFqn(String buddyGroupName, Fqn origFqn)
|
544 |
| { |
545 |
248
| if (isBackupFqn(origFqn))
|
546 |
0
| throw new CacheException("Cannot make a backup Fqn from a backup Fqn! Attempting to create a backup of " + origFqn);
|
547 |
248
| List<Object> elements = new ArrayList<Object>();
|
548 |
248
| elements.add(BUDDY_BACKUP_SUBTREE);
|
549 |
248
| elements.add(buddyGroupName);
|
550 |
248
| elements.addAll(origFqn.peekElements());
|
551 |
| |
552 |
248
| return new Fqn(elements);
|
553 |
| } |
554 |
| |
555 |
| |
556 |
| |
557 |
| |
558 |
| |
559 |
| |
560 |
| |
561 |
| |
562 |
| |
563 |
26
| public static Fqn getBackupFqn(Fqn buddyGroupRoot, Fqn origFqn)
|
564 |
| { |
565 |
26
| if (origFqn.isChildOf(buddyGroupRoot))
|
566 |
| { |
567 |
0
| return origFqn;
|
568 |
| } |
569 |
| |
570 |
26
| List<Object> elements = new ArrayList<Object>();
|
571 |
26
| elements.add(BUDDY_BACKUP_SUBTREE);
|
572 |
26
| elements.add(buddyGroupRoot.get(1));
|
573 |
26
| elements.addAll(origFqn.peekElements());
|
574 |
| |
575 |
26
| return new Fqn(elements);
|
576 |
| } |
577 |
| |
578 |
221352
| public static boolean isBackupFqn(Fqn name)
|
579 |
| { |
580 |
221353
| return name != null && name.hasElement(BuddyManager.BUDDY_BACKUP_SUBTREE);
|
581 |
| } |
582 |
| |
583 |
| |
584 |
| |
585 |
| |
586 |
| |
587 |
| |
588 |
| |
589 |
| |
590 |
213
| public List<Address> getBuddyAddresses()
|
591 |
| { |
592 |
213
| return buddyGroup.getBuddies();
|
593 |
| } |
594 |
| |
595 |
| |
596 |
| |
597 |
| |
598 |
| |
599 |
| |
600 |
| |
601 |
164
| public MethodCall transformFqns(MethodCall call)
|
602 |
| { |
603 |
164
| return transformFqns(call, call.getMethodId() != MethodDeclarations.dataGravitationCleanupMethod_id);
|
604 |
| } |
605 |
| |
606 |
234
| public MethodCall transformFqns(MethodCall call, boolean transformForCurrentCall)
|
607 |
| { |
608 |
234
| if (call != null && call.getArgs() != null)
|
609 |
| { |
610 |
234
| MethodCall call2 = MethodCallFactory.create(call.getMethod(), call.getArgs().clone());
|
611 |
234
| handleArgs(call2.getArgs(), transformForCurrentCall);
|
612 |
234
| return call2;
|
613 |
| } |
614 |
| else |
615 |
| { |
616 |
0
| return call;
|
617 |
| } |
618 |
| } |
619 |
| |
620 |
| |
621 |
| |
622 |
56
| private void removeFromGroup(List<Address> buddies)
|
623 |
| { |
624 |
56
| if (log.isDebugEnabled())
|
625 |
| { |
626 |
0
| log.debug("Removing obsolete buddies from buddy group [" + buddyGroup.getGroupName() + "]. Obsolete buddies are " + buddies);
|
627 |
| } |
628 |
56
| buddyGroup.removeBuddies(buddies);
|
629 |
| |
630 |
56
| MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteRemoveFromBuddyGroupMethod, buddyGroup.getGroupName());
|
631 |
56
| MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
|
632 |
| |
633 |
56
| int attemptsLeft = UNINIT_BUDDIES_RETRIES;
|
634 |
56
| int currentAttempt = 0;
|
635 |
| |
636 |
56
| while (attemptsLeft-- > 0)
|
637 |
| { |
638 |
56
| try
|
639 |
| { |
640 |
56
| makeRemoteCall(buddies, replicateCall, true);
|
641 |
56
| break;
|
642 |
| } |
643 |
| catch (Exception e) |
644 |
| { |
645 |
0
| if (e instanceof BuddyNotInitException || e.getCause() instanceof BuddyNotInitException)
|
646 |
| { |
647 |
0
| if (attemptsLeft > 0)
|
648 |
| { |
649 |
0
| log.info("One of the buddies have not been initialised. Will retry after a short nap.");
|
650 |
0
| try
|
651 |
| { |
652 |
0
| Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]);
|
653 |
| } |
654 |
| catch (InterruptedException e1) |
655 |
| { |
656 |
| |
657 |
0
| log.trace("Thread interrupted while sleeping/waiting for a retry", e1);
|
658 |
| } |
659 |
| } |
660 |
| else |
661 |
| { |
662 |
0
| throw new BuddyNotInitException("Unable to contact buddy after " + UNINIT_BUDDIES_RETRIES + " retries");
|
663 |
| } |
664 |
| } |
665 |
| else |
666 |
| { |
667 |
0
| log.error("Unable to communicate with Buddy for some reason", e);
|
668 |
| } |
669 |
| } |
670 |
| } |
671 |
56
| log.trace("removeFromGroup notification complete");
|
672 |
| } |
673 |
| |
674 |
224
| private void addBuddies(List<Address> buddies) throws CacheException
|
675 |
| { |
676 |
| |
677 |
| |
678 |
| |
679 |
| |
680 |
| |
681 |
| |
682 |
224
| if (log.isDebugEnabled())
|
683 |
| { |
684 |
0
| log.debug("Assigning new buddies to buddy group [" + buddyGroup.getGroupName() + "]. New buddies are " + buddies);
|
685 |
| } |
686 |
| |
687 |
| |
688 |
224
| buddyGroup.addBuddies(buddies);
|
689 |
| |
690 |
| |
691 |
| |
692 |
224
| Map<Fqn, byte[]> stateMap = new HashMap<Fqn, byte[]>();
|
693 |
224
| byte[] state;
|
694 |
224
| if (cache.getConfiguration().isUseRegionBasedMarshalling())
|
695 |
| { |
696 |
8
| Collection<Region> regions = cache.getRegionManager().getAllRegions(Region.Type.MARSHALLING);
|
697 |
8
| if (regions.size() > 0)
|
698 |
| { |
699 |
0
| for (Region r : regions)
|
700 |
| { |
701 |
0
| Fqn f = r.getFqn();
|
702 |
0
| state = acquireState(f);
|
703 |
0
| if (state != null)
|
704 |
| { |
705 |
0
| stateMap.put(f, state);
|
706 |
| } |
707 |
| } |
708 |
| } |
709 |
8
| else if (!cache.getConfiguration().isInactiveOnStartup())
|
710 |
| { |
711 |
| |
712 |
0
| state = acquireState(Fqn.ROOT);
|
713 |
0
| if (state != null)
|
714 |
| { |
715 |
0
| stateMap.put(Fqn.ROOT, state);
|
716 |
| } |
717 |
| } |
718 |
| } |
719 |
| else |
720 |
| { |
721 |
216
| state = acquireState(Fqn.ROOT);
|
722 |
216
| if (state != null)
|
723 |
| { |
724 |
216
| stateMap.put(Fqn.ROOT, state);
|
725 |
| } |
726 |
| } |
727 |
| |
728 |
| |
729 |
224
| MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteAssignToBuddyGroupMethod, buddyGroup, stateMap);
|
730 |
224
| MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
|
731 |
| |
732 |
224
| int attemptsLeft = UNINIT_BUDDIES_RETRIES;
|
733 |
224
| int currentAttempt = 0;
|
734 |
| |
735 |
224
| while (attemptsLeft-- > 0)
|
736 |
| { |
737 |
224
| try
|
738 |
| { |
739 |
224
| makeRemoteCall(buddies, replicateCall, true);
|
740 |
224
| break;
|
741 |
| } |
742 |
| catch (Exception e) |
743 |
| { |
744 |
0
| if (e instanceof BuddyNotInitException || e.getCause() instanceof BuddyNotInitException)
|
745 |
| { |
746 |
0
| if (attemptsLeft > 0)
|
747 |
| { |
748 |
0
| log.info("One of the buddies have not been initialised. Will retry after a short nap.");
|
749 |
0
| try
|
750 |
| { |
751 |
0
| Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]);
|
752 |
| } |
753 |
| catch (InterruptedException e1) |
754 |
| { |
755 |
| |
756 |
0
| log.trace("Thread interrupted while sleeping/waiting for a retry", e1);
|
757 |
| } |
758 |
| |
759 |
| } |
760 |
| else |
761 |
| { |
762 |
0
| throw new BuddyNotInitException("Unable to contact buddy after " + UNINIT_BUDDIES_RETRIES + " retries");
|
763 |
| } |
764 |
| } |
765 |
| else |
766 |
| { |
767 |
0
| log.error("Unable to communicate with Buddy for some reason", e);
|
768 |
| } |
769 |
| } |
770 |
| } |
771 |
| |
772 |
224
| log.trace("addToGroup notification complete");
|
773 |
| } |
774 |
| |
775 |
216
| private byte[] acquireState(Fqn fqn) throws CacheException
|
776 |
| { |
777 |
| |
778 |
| |
779 |
216
| long[] timeouts = {400, 800, 1600};
|
780 |
216
| TimeoutException timeoutException = null;
|
781 |
| |
782 |
216
| boolean trace = log.isTraceEnabled();
|
783 |
| |
784 |
216
| for (int i = 0; i < timeouts.length; i++)
|
785 |
| { |
786 |
216
| timeoutException = null;
|
787 |
| |
788 |
216
| boolean force = (i == timeouts.length - 1);
|
789 |
| |
790 |
216
| try
|
791 |
| { |
792 |
216
| byte[] state = generateState(fqn, timeouts[i], force, false);
|
793 |
216
| if (log.isDebugEnabled())
|
794 |
| { |
795 |
0
| log.debug("acquireState(): got state");
|
796 |
| } |
797 |
216
| return state;
|
798 |
| } |
799 |
| catch (TimeoutException t) |
800 |
| { |
801 |
0
| timeoutException = t;
|
802 |
0
| if (trace)
|
803 |
| { |
804 |
0
| log.trace("acquireState(): got a TimeoutException");
|
805 |
| } |
806 |
| } |
807 |
| catch (Exception e) |
808 |
| { |
809 |
0
| throw new CacheException("Error acquiring state", e);
|
810 |
| } |
811 |
| catch (Throwable t) |
812 |
| { |
813 |
0
| throw new RuntimeException(t);
|
814 |
| } |
815 |
| } |
816 |
| |
817 |
| |
818 |
| |
819 |
0
| if (timeoutException != null)
|
820 |
| { |
821 |
0
| throw new CacheException("acquireState(): Failed getting state due to timeout",
|
822 |
| timeoutException); |
823 |
| } |
824 |
| |
825 |
0
| if (log.isDebugEnabled())
|
826 |
| { |
827 |
0
| log.debug("acquireState(): Unable to give state");
|
828 |
| } |
829 |
| |
830 |
0
| return null;
|
831 |
| } |
832 |
| |
833 |
| |
834 |
| |
835 |
| |
836 |
| |
837 |
| |
838 |
| |
839 |
| |
840 |
| |
841 |
| |
842 |
| |
843 |
| |
844 |
| |
845 |
| |
846 |
| |
847 |
| |
848 |
| |
849 |
| |
850 |
| |
851 |
| |
852 |
216
| public byte[] generateState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
|
853 |
| { |
854 |
| |
855 |
216
| MarshalledValueOutputStream out = null;
|
856 |
216
| byte[] result = null;
|
857 |
216
| try
|
858 |
| { |
859 |
216
| ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
|
860 |
216
| out = new MarshalledValueOutputStream(baos);
|
861 |
216
| cache.getStateTransferManager().getState(out, fqn, timeout, force, suppressErrors);
|
862 |
216
| result = baos.getRawBuffer();
|
863 |
| } |
864 |
| finally |
865 |
| { |
866 |
216
| Util.close(out);
|
867 |
| } |
868 |
| |
869 |
216
| return result;
|
870 |
| } |
871 |
| |
872 |
| |
873 |
| |
874 |
| |
875 |
330
| private void broadcastBuddyPoolMembership()
|
876 |
| { |
877 |
330
| broadcastBuddyPoolMembership(null);
|
878 |
| } |
879 |
| |
880 |
346
| private void broadcastBuddyPoolMembership(List<Address> recipients)
|
881 |
| { |
882 |
| |
883 |
346
| if (log.isDebugEnabled())
|
884 |
| { |
885 |
0
| log.debug("Instance " + buddyGroup.getDataOwner() + " broadcasting membership in buddy pool " + config.getBuddyPoolName() + " to recipients " + recipients);
|
886 |
| } |
887 |
| |
888 |
346
| MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteAnnounceBuddyPoolNameMethod, buddyGroup.getDataOwner(), config.getBuddyPoolName());
|
889 |
346
| MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
|
890 |
| |
891 |
346
| try
|
892 |
| { |
893 |
346
| makeRemoteCall(recipients, replicateCall, true);
|
894 |
| } |
895 |
| catch (Exception e) |
896 |
| { |
897 |
0
| log.error("Problems broadcasting buddy pool membership info to cluster", e);
|
898 |
| } |
899 |
| } |
900 |
| |
901 |
626
| private void makeRemoteCall(List<Address> recipients, MethodCall call, boolean sync) throws Exception
|
902 |
| { |
903 |
| |
904 |
626
| if (recipients != null)
|
905 |
| { |
906 |
296
| Iterator<Address> recipientsIt = recipients.iterator();
|
907 |
296
| List<Address> members = cache.getMembers();
|
908 |
296
| while (recipientsIt.hasNext())
|
909 |
| { |
910 |
329
| if (!members.contains(recipientsIt.next()))
|
911 |
| { |
912 |
12
| recipientsIt.remove();
|
913 |
| |
914 |
| } |
915 |
| } |
916 |
| } |
917 |
| |
918 |
626
| cache.getRPCManager().callRemoteMethods(recipients, call, sync, true, config.getBuddyCommunicationTimeout());
|
919 |
| } |
920 |
| |
921 |
| |
922 |
261
| private void handleArgs(Object[] args, boolean transformForCurrentCall)
|
923 |
| { |
924 |
261
| for (int i = 0; i < args.length; i++)
|
925 |
| { |
926 |
979
| if (args[i] instanceof MethodCall)
|
927 |
| { |
928 |
70
| MethodCall call = (MethodCall) args[i];
|
929 |
70
| boolean transformFqns = true;
|
930 |
70
| if (call.getMethodId() == MethodDeclarations.dataGravitationCleanupMethod_id)
|
931 |
| { |
932 |
10
| transformFqns = false;
|
933 |
| } |
934 |
| |
935 |
70
| args[i] = transformFqns((MethodCall) args[i], transformFqns);
|
936 |
| } |
937 |
| |
938 |
979
| if (args[i] instanceof List && args[i] != null)
|
939 |
| { |
940 |
27
| Object[] asArray = ((List) args[i]).toArray();
|
941 |
27
| handleArgs(asArray, transformForCurrentCall);
|
942 |
27
| List newList = new ArrayList(asArray.length);
|
943 |
| |
944 |
| |
945 |
27
| newList.addAll(Arrays.asList(asArray));
|
946 |
27
| args[i] = newList;
|
947 |
| } |
948 |
| |
949 |
979
| if (args[i] instanceof Fqn)
|
950 |
| { |
951 |
216
| Fqn fqn = (Fqn) args[i];
|
952 |
142
| if (transformForCurrentCall) args[i] = getBackupFqn(fqn);
|
953 |
| } |
954 |
| } |
955 |
| } |
956 |
| |
957 |
| |
958 |
| |
959 |
| |
960 |
| |
961 |
| |
962 |
| |
963 |
142
| public Fqn getBackupFqn(Fqn originalFqn)
|
964 |
| { |
965 |
142
| return getBackupFqn(buddyGroup == null || buddyGroup.getGroupName() == null ? "null" : buddyGroup.getGroupName(), originalFqn);
|
966 |
| } |
967 |
| |
968 |
65
| public static Fqn getActualFqn(Fqn fqn)
|
969 |
| { |
970 |
21
| if (!isBackupFqn(fqn)) return fqn;
|
971 |
| |
972 |
44
| return fqn.getSubFqn(2, fqn.size());
|
973 |
| } |
974 |
| |
975 |
| |
976 |
| |
977 |
| |
978 |
| private class AsyncViewChangeHandlerThread implements Runnable |
979 |
| { |
980 |
| private Thread t; |
981 |
| private boolean isRunning = true; |
982 |
| |
983 |
164
| public void start()
|
984 |
| { |
985 |
164
| if (t == null || !t.isAlive())
|
986 |
| { |
987 |
164
| t = new Thread(this);
|
988 |
164
| t.setName("AsyncViewChangeHandlerThread," + cache.getLocalAddress());
|
989 |
164
| t.setDaemon(true);
|
990 |
164
| t.start();
|
991 |
| } |
992 |
| } |
993 |
| |
994 |
164
| public void run()
|
995 |
| { |
996 |
| |
997 |
164
| try
|
998 |
| { |
999 |
164
| initialisationLatch.await();
|
1000 |
| } |
1001 |
| catch (InterruptedException e) |
1002 |
| { |
1003 |
0
| log.debug("Caught InterruptedException", e);
|
1004 |
| } |
1005 |
164
| while (!Thread.interrupted() && isRunning)
|
1006 |
| { |
1007 |
353
| try
|
1008 |
| { |
1009 |
353
| handleEnqueuedViewChange();
|
1010 |
| } |
1011 |
| catch (InterruptedException e) |
1012 |
| { |
1013 |
0
| break;
|
1014 |
| } |
1015 |
| catch (Throwable t) |
1016 |
| { |
1017 |
| |
1018 |
0
| log.error("Caught exception handling view change", t);
|
1019 |
| } |
1020 |
| } |
1021 |
162
| log.trace("Exiting run()");
|
1022 |
| } |
1023 |
| |
1024 |
353
| private void handleEnqueuedViewChange() throws Exception
|
1025 |
| { |
1026 |
353
| log.trace("Waiting for enqueued view change events");
|
1027 |
353
| MembershipChange members = queue.take();
|
1028 |
353
| if (members == STOP_NOTIFIER)
|
1029 |
| { |
1030 |
| |
1031 |
162
| isRunning = false;
|
1032 |
162
| return;
|
1033 |
| } |
1034 |
| |
1035 |
| |
1036 |
| |
1037 |
191
| if (members.newMembers.size() == 1 && members.newMembers.get(0).equals(cache.getLocalAddress()))
|
1038 |
| { |
1039 |
11
| log.info("Ignoring membership change event since it only contains self.");
|
1040 |
11
| return;
|
1041 |
| } |
1042 |
| |
1043 |
180
| broadcastPoolMembership(members);
|
1044 |
| |
1045 |
180
| boolean rebroadcast = false;
|
1046 |
| |
1047 |
| |
1048 |
180
| while (!buddyPoolInfoAvailable(members.newMembers))
|
1049 |
| { |
1050 |
4
| rebroadcast = true;
|
1051 |
4
| synchronized (poolInfoNotifierLock)
|
1052 |
| { |
1053 |
4
| log.trace("Not received necessary buddy pool info for all new members yet; waiting on poolInfoNotifierLock.");
|
1054 |
4
| poolInfoNotifierLock.wait();
|
1055 |
| } |
1056 |
| } |
1057 |
| |
1058 |
2
| if (rebroadcast) broadcastPoolMembership(members);
|
1059 |
| |
1060 |
| |
1061 |
178
| reassignBuddies(members.newMembers);
|
1062 |
| } |
1063 |
| |
1064 |
182
| private void broadcastPoolMembership(MembershipChange members)
|
1065 |
| { |
1066 |
182
| log.trace("Broadcasting pool membership details, triggered by view change.");
|
1067 |
182
| if (members.oldMembers == null)
|
1068 |
| { |
1069 |
166
| broadcastBuddyPoolMembership();
|
1070 |
| } |
1071 |
| else |
1072 |
| { |
1073 |
16
| List<Address> delta = new ArrayList<Address>();
|
1074 |
16
| delta.addAll(members.newMembers);
|
1075 |
16
| delta.removeAll(members.oldMembers);
|
1076 |
16
| broadcastBuddyPoolMembership(delta);
|
1077 |
| } |
1078 |
| } |
1079 |
| |
1080 |
182
| private boolean buddyPoolInfoAvailable(List<Address> newMembers)
|
1081 |
| { |
1082 |
182
| boolean infoReceived = true;
|
1083 |
182
| for (Address address : newMembers)
|
1084 |
| { |
1085 |
| |
1086 |
528
| synchronized (nullBuddyPool)
|
1087 |
| { |
1088 |
| |
1089 |
| |
1090 |
| |
1091 |
| |
1092 |
528
| infoReceived = infoReceived && (address.equals(cache.getLocalAddress()) || buddyPool.keySet().contains(address) || nullBuddyPool.contains(address));
|
1093 |
| } |
1094 |
| } |
1095 |
| |
1096 |
182
| if (log.isTraceEnabled())
|
1097 |
| { |
1098 |
0
| log.trace(buddyGroup.getDataOwner() + " received buddy pool info for new members " + newMembers + "? " + infoReceived);
|
1099 |
| } |
1100 |
| |
1101 |
182
| return infoReceived;
|
1102 |
| } |
1103 |
| |
1104 |
0
| public void stop()
|
1105 |
| { |
1106 |
0
| if (t != null) t.interrupt();
|
1107 |
| } |
1108 |
| } |
1109 |
| |
1110 |
| @CacheListener |
1111 |
| public class ViewChangeListener |
1112 |
| { |
1113 |
| private Vector<Address> oldMembers; |
1114 |
| |
1115 |
192
| @ViewChanged
|
1116 |
| public void handleViewChange(ViewChangedEvent event) |
1117 |
| { |
1118 |
192
| View newView = event.getNewView();
|
1119 |
192
| if (log.isTraceEnabled())
|
1120 |
0
| log.trace("BuddyManager CacheListener - got view change with new view " + newView);
|
1121 |
192
| Vector<Address> newMembers = newView.getMembers();
|
1122 |
| |
1123 |
| |
1124 |
192
| if (config.getBuddyPoolName() == null)
|
1125 |
| { |
1126 |
140
| enqueueViewChange(null, newMembers);
|
1127 |
| } |
1128 |
| else |
1129 |
| { |
1130 |
52
| enqueueViewChange(oldMembers == null ? null : new Vector<Address>(oldMembers), new Vector<Address>(newMembers));
|
1131 |
35
| if (oldMembers == null) oldMembers = new Vector<Address>();
|
1132 |
52
| oldMembers.clear();
|
1133 |
52
| oldMembers.addAll(newMembers);
|
1134 |
| } |
1135 |
| } |
1136 |
| } |
1137 |
| } |