IllegalStateException for distributed system
smohanan Jun 3, 2014 12:54 PM
Hi all,
I have been trying to implement a distributed system that would store data in the node that does a get command. My idea was to use the KeyAffinityService to find a key that is associated with the local node and, each time before a put command is done, obtain a key that maps to the same local node and have a Grouper (grouping API) use this key so that the value is stored in the local node.
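The idea can be sketched without Infinispan at all. The toy model below is only an illustration under assumptions: `ownerOf` is a hypothetical stand-in for the cluster's consistent hash, and `keyForNode` mimics what I understand the KeyAffinityService to do, namely generate random keys until one maps to the requested node. None of these names are Infinispan API.

```java
import java.util.Random;

// Toy model, not Infinispan: nodes are simply numbered 0..nodeCount-1.
public class AffinitySketch {

    // Hypothetical stand-in for a consistent hash: maps a key to its owning node.
    static int ownerOf(String key, int nodeCount) {
        return Math.floorMod(key.hashCode(), nodeCount);
    }

    // Generate random keys until one is owned by the requested node.
    static String keyForNode(int node, int nodeCount, Random rnd) {
        while (true) {
            String candidate = Long.toString(rnd.nextLong());
            if (ownerOf(candidate, nodeCount) == node) {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        int nodeCount = 3;
        int localNode = 1;
        String localKey = keyForNode(localNode, nodeCount, new Random());
        System.out.println("key " + localKey + " is owned by node "
                + ownerOf(localKey, nodeCount));
    }
}
```

Note that in this model a key found this way only stays "local" as long as `nodeCount` does not change, which is an assumption the real cluster may not satisfy when nodes join or leave.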
The code I have is the following:
SimpleCache.java:
import java.util.*;
import java.util.concurrent.*;
import org.infinispan.Cache;
import org.infinispan.affinity.*;
import org.infinispan.manager.*;

//Used to store the key for the local node
class locaddr{
    static String nut;
    static String sim;
}

public class SimpleCache {

    public void start() throws Exception {
        EmbeddedCacheManager manager = new DefaultCacheManager("democluster.xml");
        Cache<String, String> cache = manager.getCache();
        String command = "";
        int ticketid = 1;
        Scanner scan = new Scanner(System.in);
        cache.start();
        manager.start();

        // Create the affinity service to find the Key for the manager
        KeyAffinityService keyAffinityService = KeyAffinityServiceFactory.newLocalKeyAffinityService(
                cache,
                (KeyGenerator)new RndKeyGenerator(),
                Executors.newSingleThreadExecutor(),
                100);

        //Find key associated with local node
        locaddr.nut = Objects.toString(keyAffinityService.getKeyForAddress(manager.getAddress()));

        log("Start of program.....");
        log("Input one of following commands:");
        log("book");
        log("pay");
        log("list");
        log("locaddr");
        log("quit");

        while (true){
            command = scan.nextLine();
            if (command.equals("book")) {
                log("Enter name ");
                String name = scan.nextLine();
                locaddr.sim = Objects.toString(keyAffinityService.getCollocatedKey(locaddr.nut));
                cache.put(Integer.toString(ticketid)+manager.getAddress().toString(),name);
                log("Booked ticket " + name);
                ticketid++;
            }
            else if (command.equals("pay")) {
                log("Enter ticket number ");
                String id = scan.nextLine();
                log("Display ticket:"+cache.get(id));
                String ticket = cache.remove(id);
                log("Checked out ticket " + ticket);
            }
            else if (command.equals("list")) {
                Set <String> set = cache.keySet();
                for (String ticket: set) {
                    log(ticket + " " + cache.get(ticket));
                }
            }
            else if (command.equals("quit")) {
                cache.clear();
                cache.stop();
                manager.stop();
                keyAffinityService.stop();
                log("Bye");
                break;
            }
            else if (command.equals("locaddr")) {
                log("local key for manager is: "+locaddr.nut);
                log("manager address is: " + manager.getAddress());
            }
            else {
                log("Unknown command " + command);
            }
        }
    }

    public static void main(String[] args) throws Exception{
        new SimpleCache().start();
    }

    public static void log(String s){
        System.out.println(s);
    }
}
democluster.xml:
<infinispan
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
      xmlns="urn:infinispan:config:6.0">
   <global>
      <transport>
         <properties>
            <property name="configurationFile" value="jgroups-tcp.xml" />
         </properties>
      </transport>
   </global>
   <default>
      <clustering mode="distributed" >
         <sync/>
         <hash numOwners="1" numSegments="100" capacityFactor="1">
            <groups enabled="true">
               <grouper class="KXGrouper"/>
            </groups>
         </hash>
      </clustering>
   </default>
</infinispan>
KXGrouper.java:
import org.infinispan.distribution.group.Grouper;

public class KXGrouper implements Grouper<String> {

    public String computeGroup(String key, String group) {
        String g = locaddr.sim;
        return g;
    }

    public Class<String> getKeyType() {
        return String.class;
    }
}
My implementation is based on a simple cache example from Infinispan. However, I am having two main issues:
1. When I run this code in separate JVMs, it sometimes works, but sometimes when I do a "book" command (which invokes the collocated key command and the cache put command) I get an error saying that another node is no longer part of the cluster. The error looks like this:
Exception in thread "main" java.lang.IllegalStateException: Address SRI-PC-4630 is no longer in the cluster
    at org.infinispan.affinity.KeyAffinityServiceImpl.getKeyForAddress(KeyAffinityServiceImpl.java:107)
    at org.infinispan.affinity.KeyAffinityServiceImpl.getCollocatedKey(KeyAffinityServiceImpl.java:91)
    at SimpleCache.start(SimpleCache.java:77)
    at SimpleCache.main(SimpleCache.java:125)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Here "SRI-PC-4630" is the address of the manager running in another JVM. I have been looking online for a solution to this issue, but no one seems to have had a similar problem.
2. If I do get it running and do a "book" and have a key/value stored in the local node, I cannot access it from any other node.
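One possible explanation for the second issue, offered as a guess rather than a diagnosis: KXGrouper returns `locaddr.sim`, a static field that holds a different value in each JVM, so each node may compute a different group, and therefore a different owner, for the same key. The sketch below models that in plain Java. `ToyNode`, `ownerOf`, and the group strings are hypothetical and are not Infinispan API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, not Infinispan: two nodes, one owner per key (like numOwners="1").
public class GroupSketch {

    static class ToyNode {
        final int id;
        final String localGroup;                            // plays the role of locaddr.sim
        final Map<String, String> store = new HashMap<>();  // entries this node owns

        ToyNode(int id, String localGroup) {
            this.id = id;
            this.localGroup = localGroup;
        }
    }

    // Hypothetical stand-in for the consistent hash applied to the group.
    static int ownerOf(String group, int nodeCount) {
        return Math.floorMod(group.hashCode(), nodeCount);
    }

    // An operation issued on 'caller' is routed using the caller's own group value.
    static void put(ToyNode caller, ToyNode[] cluster, String key, String value) {
        cluster[ownerOf(caller.localGroup, cluster.length)].store.put(key, value);
    }

    static String get(ToyNode caller, ToyNode[] cluster, String key) {
        return cluster[ownerOf(caller.localGroup, cluster.length)].store.get(key);
    }

    public static void main(String[] args) {
        // Group names chosen so that each one hashes to its own node:
        // "b".hashCode() is 98 (node 0), "a".hashCode() is 97 (node 1).
        ToyNode[] cluster = { new ToyNode(0, "b"), new ToyNode(1, "a") };

        put(cluster[0], cluster, "1node0", "alice");

        System.out.println(get(cluster[0], cluster, "1node0")); // prints "alice"
        System.out.println(get(cluster[1], cluster, "1node0")); // prints "null"
    }
}
```

In this model the value is visible only from the node that wrote it, because the reader routes the lookup with its own group. A group that is derived from the key itself would be computed identically on every node.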
I have been trying to fix this, but to no avail. Any advice or recommendation would be greatly appreciated.
Thank you for your time