IllegalStateException for distributed system
smohanan Jun 3, 2014 12:54 PMHi all,
I have been trying to implement a distributed system that would store data in the node that does a get command. My idea was to use the KeyAffinityService to find a key that is associated with the local node and each time a before a put command is done, store a key that would refer to the same local node and use a grouping API that would use this key to store the value in the local node.
The code I have is the following:
SimpleCache.java:
import java.util.*;
import java.util.concurrent.*;
import org.infinispan.Cache;
import org.infinispan.affinity.*;
import org.infinispan.manager.*;
//Used to store the key for the local node
class locaddr{ static String nut; static String sim;}
public class SimpleCache {
public void start() throws Exception {
EmbeddedCacheManager manager = new DefaultCacheManager("democluster.xml");
Cache<String, String> cache = manager.getCache();
String command = "";
int ticketid = 1;
Scanner scan = new Scanner(System.in);
cache.start();
manager.start();
// Create the affinity service to find the Key for the manager
KeyAffinityService keyAffinityService = KeyAffinityServiceFactory.newLocalKeyAffinityService(
cache,
(KeyGenerator)new RndKeyGenerator(),
Executors.newSingleThreadExecutor(),
100);
//Find key associated with local node
locaddr.nut = Objects.toString(keyAffinityService.getKeyForAddress(manager.getAddress()));
log("Start of program.....");
log("Input one of following commands:");
log("book");
log("pay");
log("list");
log("locaddr");
log("quit");
while (true){
command = scan.nextLine();
if (command.equals("book")) {
log("Enter name ");
String name = scan.nextLine();
locaddr.sim = Objects.toString(keyAffinityService.getCollocatedKey(locaddr.nut));
cache.put(Integer.toString(ticketid)+manager.getAddress().toString(),name);
log("Booked ticket " + name);
ticketid++;
}
else if (command.equals("pay")) {
log("Enter ticket number ");
String id = scan.nextLine();
log("Display ticket:"+cache.get(id));
String ticket = cache.remove(id);
log("Checked out ticket " + ticket);
}
else if (command.equals("list")) {
Set <String> set = cache.keySet();
for (String ticket: set) {
log(ticket + " " + cache.get(ticket));
}
}
else if (command.equals("quit")) {
cache.clear();
cache.stop();
manager.stop();
keyAffinityService.stop();
log("Bye");
break;
}
else if (command.equals("locaddr")) {
log("local key for manager is: "+locaddr.nut);
log("manager address is: " + manager.getAddress());
}
else {
log("Unknown command " + command);
}
}
}
public static void main(String[] args) throws Exception{
new SimpleCache().start();
}
public static void log(String s){
System.out.println(s);
}
}
democluster.xml:
<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd" xmlns="urn:infinispan:config:6.0"> <global> <transport> <properties> <property name="configurationFile" value="jgroups-tcp.xml" /> </properties> </transport> </global> <default> <clustering mode="distributed" > <sync/> <hash numOwners="1" numSegments="100" capacityFactor="1"> <groups enabled="true"> <grouper class="KXGrouper"/> </groups> </hash> </clustering> </default> </infinispan>
KXGrouper.java:
import org.infinispan.distribution.group.Grouper;
public class KXGrouper implements Grouper<String> {
public String computeGroup(String key, String group) {
String g = locaddr.sim;
return g;
}
public Class<String> getKeyType() {
return String.class;
}
}
My implementation is based in a simple cache implementation example of infinispan. However I am having two main issues:
1. When I run this code in separate JVMs, sometimes it will work but sometimes when I do a "book" command (which invokes the collocated key command and the cache put command), I get an error where it says the another node is no longer part of the cluster. The error looks like this:
Exception in thread "main" java.lang.IllegalStateException: Address SRI-PC-4630 is no longer in the cluster at org.infinispan.affinity.KeyAffinityServiceImpl.getKeyForAddress(KeyAffinityServiceImpl.java:107) at org.infinispan.affinity.KeyAffinityServiceImpl.getCollocatedKey(KeyAffinityServiceImpl.java:91) at SimpleCache.start(SimpleCache.java:77) at SimpleCache.main(SimpleCache.java:125) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Where "Address SRI-PC-4630" would be the address of the manager in another JVM that is running. I have been looking online for a solution to this issue but no one seems to have a similar problem.
2. If I do get it running and do a "book" and have a key/value stored in the local node, I cannot access it from any other node.
I have been trying to fix this but to no avail and any advice or recommendation would be greatly appreciated.
Thank you for your time