1 |
| |
2 |
| |
3 |
| |
4 |
| |
5 |
| |
6 |
| |
7 |
| package org.jboss.cache.statetransfer; |
8 |
| |
9 |
| import org.apache.commons.logging.Log; |
10 |
| import org.apache.commons.logging.LogFactory; |
11 |
| import org.jboss.cache.CacheException; |
12 |
| import org.jboss.cache.CacheImpl; |
13 |
| import org.jboss.cache.Fqn; |
14 |
| import org.jboss.cache.InvocationContext; |
15 |
| import org.jboss.cache.Node; |
16 |
| import org.jboss.cache.NodeFactory; |
17 |
| import org.jboss.cache.NodeSPI; |
18 |
| import org.jboss.cache.Region; |
19 |
| import org.jboss.cache.buddyreplication.BuddyManager; |
20 |
| import org.jboss.cache.eviction.EvictedEventNode; |
21 |
| import org.jboss.cache.eviction.NodeEventType; |
22 |
| import org.jboss.cache.loader.CacheLoader; |
23 |
| import org.jboss.cache.marshall.NodeData; |
24 |
| import org.jboss.cache.marshall.NodeDataExceptionMarker; |
25 |
| import org.jboss.cache.marshall.NodeDataMarker; |
26 |
| |
27 |
| import java.io.IOException; |
28 |
| import java.io.ObjectInputStream; |
29 |
| import java.util.HashSet; |
30 |
| import java.util.Iterator; |
31 |
| import java.util.List; |
32 |
| import java.util.Map; |
33 |
| import java.util.Set; |
34 |
| |
35 |
| public class DefaultStateTransferIntegrator implements StateTransferIntegrator |
36 |
| { |
37 |
| |
38 |
| protected Log log = LogFactory.getLog(getClass().getName()); |
39 |
| |
40 |
| private CacheImpl cache; |
41 |
| |
42 |
| private Fqn targetFqn; |
43 |
| |
44 |
| private NodeFactory factory; |
45 |
| |
46 |
| private NodeFactory.NodeType nodeType; |
47 |
| |
48 |
| private Set<Fqn> internalFqns; |
49 |
| |
50 |
730
| public DefaultStateTransferIntegrator(Fqn targetFqn, CacheImpl cache)
|
51 |
| { |
52 |
730
| this.targetFqn = targetFqn;
|
53 |
730
| this.cache = cache;
|
54 |
730
| this.factory = cache.getConfiguration().getRuntimeConfig().getNodeFactory();
|
55 |
730
| this.nodeType = cache.getConfiguration().isNodeLockingOptimistic()
|
56 |
| ? NodeFactory.NodeType.VERSIONED_NODE |
57 |
| : NodeFactory.NodeType.UNVERSIONED_NODE; |
58 |
730
| this.internalFqns = cache.getInternalFqns();
|
59 |
| } |
60 |
| |
61 |
730
| public void integrateState(ObjectInputStream ois, Node target) throws Exception
|
62 |
| { |
63 |
730
| integrateTransientState(ois, (NodeSPI) target);
|
64 |
730
| integrateAssociatedState(ois);
|
65 |
730
| integratePersistentState(ois);
|
66 |
| } |
67 |
| |
68 |
730
| protected void integrateTransientState(ObjectInputStream in, NodeSPI target) throws Exception
|
69 |
| { |
70 |
730
| boolean transientSet = false;
|
71 |
| |
72 |
730
| try
|
73 |
| { |
74 |
730
| if (log.isTraceEnabled())
|
75 |
| { |
76 |
0
| log.trace("integrating transient state for " + target);
|
77 |
| } |
78 |
| |
79 |
730
| integrateTransientState(target, in);
|
80 |
| |
81 |
730
| transientSet = true;
|
82 |
| |
83 |
730
| if (log.isTraceEnabled())
|
84 |
| { |
85 |
0
| log.trace("transient state successfully integrated");
|
86 |
| } |
87 |
| |
88 |
730
| notifyAllNodesCreated(cache.getInvocationContext(), target);
|
89 |
| } |
90 |
| catch (Exception e) |
91 |
| { |
92 |
0
| if (log.isDebugEnabled()) log.debug("Caught unexpected exception", e);
|
93 |
| } |
94 |
| finally |
95 |
| { |
96 |
730
| if (!transientSet)
|
97 |
| { |
98 |
| |
99 |
0
| log.warn("transient state integration failed, removing all children of " + target);
|
100 |
0
| target.clearData();
|
101 |
0
| target.removeChildrenDirect();
|
102 |
| } |
103 |
| |
104 |
| |
105 |
| } |
106 |
| } |
107 |
| |
108 |
| |
109 |
| |
110 |
| |
111 |
| |
112 |
| |
113 |
730
| protected void integrateAssociatedState(ObjectInputStream in) throws Exception
|
114 |
| { |
115 |
| |
116 |
| |
117 |
730
| cache.getMarshaller().objectFromObjectStream(in);
|
118 |
| } |
119 |
| |
120 |
730
| protected void integratePersistentState(ObjectInputStream in) throws Exception
|
121 |
| { |
122 |
| |
123 |
730
| CacheLoader loader = cache.getCacheLoader();
|
124 |
730
| if (loader == null)
|
125 |
| { |
126 |
621
| if (log.isTraceEnabled())
|
127 |
| { |
128 |
0
| log.trace("cache loader is null, will not attempt to integrate persistent state");
|
129 |
| } |
130 |
| } |
131 |
| else |
132 |
| { |
133 |
109
| if (log.isTraceEnabled())
|
134 |
| { |
135 |
0
| log.trace("integrating persistent state using " + loader.getClass().getName());
|
136 |
| } |
137 |
| |
138 |
109
| boolean persistentSet = false;
|
139 |
109
| try
|
140 |
| { |
141 |
109
| if (targetFqn.isRoot())
|
142 |
| { |
143 |
58
| loader.storeEntireState(in);
|
144 |
| } |
145 |
| else |
146 |
| { |
147 |
51
| loader.storeState(targetFqn, in);
|
148 |
| } |
149 |
107
| persistentSet = true;
|
150 |
| } |
151 |
| catch (ClassCastException cce) |
152 |
| { |
153 |
2
| log.error("Failed integrating persistent state. One of cacheloaders is not"
|
154 |
| + " adhering to state stream format. See JBCACHE-738."); |
155 |
2
| throw cce;
|
156 |
| } |
157 |
| finally |
158 |
| { |
159 |
109
| if (!persistentSet)
|
160 |
| { |
161 |
2
| log.warn("persistent state integration failed, removing all nodes from loader");
|
162 |
2
| loader.remove(targetFqn);
|
163 |
| } |
164 |
| else |
165 |
| { |
166 |
107
| if (log.isTraceEnabled())
|
167 |
| { |
168 |
0
| log.trace("persistent state integrated successfully");
|
169 |
| } |
170 |
| } |
171 |
| } |
172 |
| } |
173 |
| } |
174 |
| |
175 |
22276
| protected CacheImpl getCache()
|
176 |
| { |
177 |
22276
| return cache;
|
178 |
| } |
179 |
| |
180 |
0
| protected NodeFactory getFactory()
|
181 |
| { |
182 |
0
| return factory;
|
183 |
| } |
184 |
| |
185 |
0
| protected NodeFactory.NodeType getNodeType()
|
186 |
| { |
187 |
0
| return nodeType;
|
188 |
| } |
189 |
| |
190 |
0
| protected Fqn getTargetFqn()
|
191 |
| { |
192 |
0
| return targetFqn;
|
193 |
| } |
194 |
| |
195 |
| |
196 |
| |
197 |
| |
198 |
| |
199 |
11138
| private void notifyAllNodesCreated(InvocationContext ctx, NodeSPI curr)
|
200 |
| { |
201 |
0
| if (curr == null) return;
|
202 |
11138
| getCache().getNotifier().notifyNodeCreated(curr.getFqn(), true, ctx);
|
203 |
11138
| getCache().getNotifier().notifyNodeCreated(curr.getFqn(), false, ctx);
|
204 |
11138
| Set<NodeSPI> children = curr.getChildrenDirect();
|
205 |
11138
| for (NodeSPI n : children)
|
206 |
| { |
207 |
10408
| notifyAllNodesCreated(ctx, n);
|
208 |
| } |
209 |
| } |
210 |
| |
211 |
| |
212 |
| |
213 |
| |
214 |
| |
215 |
| |
216 |
| |
217 |
| |
218 |
| |
219 |
| |
220 |
| |
221 |
| |
222 |
| |
223 |
| |
224 |
| |
225 |
| |
226 |
| |
227 |
| |
228 |
| |
229 |
| |
230 |
| |
231 |
| |
232 |
730
| private void integrateTransientState(NodeSPI target, ObjectInputStream in) throws Exception
|
233 |
| { |
234 |
730
| Set<Node> retainedNodes = retainInternalNodes(target);
|
235 |
| |
236 |
730
| target.removeChildrenDirect();
|
237 |
| |
238 |
730
| List<NodeData> list = readNodesAsList(in);
|
239 |
730
| if (list != null)
|
240 |
| { |
241 |
| |
242 |
726
| Iterator<NodeData> nodeDataIterator = list.iterator();
|
243 |
| |
244 |
| |
245 |
726
| if (nodeDataIterator.hasNext())
|
246 |
| { |
247 |
726
| NodeData nd = nodeDataIterator.next();
|
248 |
| |
249 |
| |
250 |
726
| if (nd != null && !nd.isMarker())
|
251 |
| { |
252 |
726
| target.putAllDirect(nd.getAttributes());
|
253 |
| |
254 |
| |
255 |
| |
256 |
726
| Fqn tferFqn = nd.getFqn();
|
257 |
726
| Fqn tgtFqn = target.getFqn();
|
258 |
726
| boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
|
259 |
| && !tferFqn.isChildOrEquals(tgtFqn); |
260 |
| |
261 |
726
| int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
|
262 |
| |
263 |
726
| integrateStateTransferChildren(target, offset, nodeDataIterator);
|
264 |
| |
265 |
726
| integrateRetainedNodes(target, retainedNodes);
|
266 |
| } |
267 |
| } |
268 |
| |
269 |
| |
270 |
726
| cache.getMarshaller().objectFromObjectStream(in);
|
271 |
| } |
272 |
| } |
273 |
| |
274 |
730
| private List<NodeData> readNodesAsList(ObjectInputStream in) throws Exception
|
275 |
| { |
276 |
730
| Object obj = cache.getMarshaller().objectFromObjectStream(in);
|
277 |
4
| if (obj instanceof NodeDataMarker) return null;
|
278 |
| |
279 |
726
| List list = (List) obj;
|
280 |
726
| return list;
|
281 |
| } |
282 |
| |
283 |
| |
284 |
| |
285 |
| |
286 |
| |
287 |
| |
288 |
| |
289 |
| |
290 |
| |
291 |
| |
292 |
| |
293 |
| |
294 |
| |
295 |
11134
| private NodeData integrateStateTransferChildren(NodeSPI parent, int offset, Iterator<NodeData> nodeDataIterator)
|
296 |
| throws IOException, ClassNotFoundException |
297 |
| { |
298 |
11134
| int parent_level = parent.getFqn().size();
|
299 |
11134
| int target_level = parent_level + 1;
|
300 |
11134
| Fqn fqn;
|
301 |
11134
| int size;
|
302 |
11134
| Object name;
|
303 |
11134
| NodeData nd = nodeDataIterator.hasNext() ? nodeDataIterator.next() : null;
|
304 |
11134
| while (nd != null && !nd.isMarker())
|
305 |
| { |
306 |
20626
| fqn = nd.getFqn();
|
307 |
| |
308 |
| |
309 |
20626
| if (offset > 0)
|
310 |
| { |
311 |
24
| fqn = new Fqn(parent.getFqn().getAncestor(offset), fqn);
|
312 |
| } |
313 |
20626
| size = fqn.size();
|
314 |
20626
| if (size <= parent_level)
|
315 |
| { |
316 |
10218
| return nd;
|
317 |
| } |
318 |
10408
| else if (size > target_level)
|
319 |
| { |
320 |
0
| throw new IllegalStateException("NodeData " + fqn + " is not a direct child of " + parent.getFqn());
|
321 |
| } |
322 |
| |
323 |
10408
| name = fqn.get(size - 1);
|
324 |
| |
325 |
10408
| Map attrs = nd.getAttributes();
|
326 |
| |
327 |
| |
328 |
| |
329 |
10408
| NodeSPI target = factory.createDataNode(name, fqn, parent, attrs, false);
|
330 |
10408
| parent.addChild(name, target);
|
331 |
| |
332 |
| |
333 |
10408
| Region region = cache.getRegion(fqn, false);
|
334 |
10408
| if (region != null && region.getEvictionPolicy() != null)
|
335 |
| { |
336 |
10023
| region.putNodeEvent(new EvictedEventNode(fqn, NodeEventType.ADD_NODE_EVENT,
|
337 |
10023
| attrs == null ? 0 : attrs.size()));
|
338 |
| } |
339 |
| |
340 |
| |
341 |
| |
342 |
10408
| nd = integrateStateTransferChildren(target, offset, nodeDataIterator);
|
343 |
| } |
344 |
916
| if (nd != null && nd.isExceptionMarker())
|
345 |
| { |
346 |
0
| NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) nd;
|
347 |
0
| throw new CacheException("State provider node " + ndem.getCacheNodeIdentity()
|
348 |
| + " threw exception during loadState", ndem.getCause()); |
349 |
| } |
350 |
916
| return null;
|
351 |
| } |
352 |
| |
353 |
730
| private Set<Node> retainInternalNodes(Node target)
|
354 |
| { |
355 |
730
| Set<Node> result = new HashSet<Node>();
|
356 |
730
| Fqn targetFqn = target.getFqn();
|
357 |
730
| for (Fqn internalFqn : internalFqns)
|
358 |
| { |
359 |
240
| if (internalFqn.isChildOf(targetFqn))
|
360 |
| { |
361 |
0
| Node internalNode = getInternalNode(target, internalFqn);
|
362 |
0
| if (internalNode != null)
|
363 |
| { |
364 |
0
| result.add(internalNode);
|
365 |
| } |
366 |
| } |
367 |
| } |
368 |
| |
369 |
730
| return result;
|
370 |
| } |
371 |
| |
372 |
0
| private Node getInternalNode(Node parent, Fqn internalFqn)
|
373 |
| { |
374 |
0
| Object name = internalFqn.get(parent.getFqn().size());
|
375 |
0
| Node result = parent.getChild(new Fqn(name));
|
376 |
0
| if (result != null)
|
377 |
| { |
378 |
0
| if (internalFqn.size() < result.getFqn().size())
|
379 |
| { |
380 |
| |
381 |
0
| result = getInternalNode(result, internalFqn);
|
382 |
| } |
383 |
| } |
384 |
0
| return result;
|
385 |
| } |
386 |
| |
387 |
726
| private void integrateRetainedNodes(NodeSPI root, Set<Node> retainedNodes)
|
388 |
| { |
389 |
726
| Fqn rootFqn = root.getFqn();
|
390 |
726
| for (Node retained : retainedNodes)
|
391 |
| { |
392 |
0
| if (retained.getFqn().isChildOf(rootFqn))
|
393 |
| { |
394 |
0
| integrateRetainedNode(root, retained);
|
395 |
| } |
396 |
| } |
397 |
| } |
398 |
| |
399 |
0
| private void integrateRetainedNode(NodeSPI ancestor, Node descendant)
|
400 |
| { |
401 |
0
| Fqn descFqn = descendant.getFqn();
|
402 |
0
| Fqn ancFqn = ancestor.getFqn();
|
403 |
0
| Object name = descFqn.get(ancFqn.size());
|
404 |
0
| NodeSPI child = (NodeSPI) ancestor.getChild(new Fqn(name));
|
405 |
0
| if (ancFqn.size() == descFqn.size() + 1)
|
406 |
| { |
407 |
0
| if (child == null)
|
408 |
| { |
409 |
0
| ancestor.addChild(name, descendant);
|
410 |
| } |
411 |
| else |
412 |
| { |
413 |
0
| log.warn("Received unexpected internal node " + descFqn + " in transferred state");
|
414 |
| } |
415 |
| } |
416 |
| else |
417 |
| { |
418 |
0
| if (child == null)
|
419 |
| { |
420 |
| |
421 |
| |
422 |
| |
423 |
0
| child = factory.createDataNode(name, new Fqn(ancFqn, name), ancestor, null, true);
|
424 |
0
| ancestor.addChild(name, child);
|
425 |
| } |
426 |
| |
427 |
| |
428 |
0
| integrateRetainedNode(child, descendant);
|
429 |
| } |
430 |
| } |
431 |
| } |