0 Replies Latest reply on Jun 3, 2014 12:54 PM by smohanan

    IllegalStateException for distributed system

    smohanan

      Hi all,

       

      I have been trying to implement a distributed system that would store data in the node that does a get command. My idea was to use the KeyAffinityService to find a key that is associated with the local node and each time a before a put command is done, store a key that would refer to the same local node and use a grouping API that would use this key to store the value in the local node.

      The code I have is the following:

       

      SimpleCache.java:

       

      import java.util.*;
      import java.util.concurrent.*;
      
      
      import org.infinispan.Cache;
      import org.infinispan.affinity.*;
      import org.infinispan.manager.*;
      
      
      
      
      //Used to store the key for the local node
      class locaddr{ static String nut; static String sim;}
      
      
      public class SimpleCache {
          public void start() throws Exception {
              EmbeddedCacheManager manager = new DefaultCacheManager("democluster.xml");
      
      
              Cache<String, String> cache = manager.getCache();
              String command = "";
              int ticketid = 1;
      
      
              Scanner scan = new Scanner(System.in);
              cache.start();
              manager.start();
      
      
              // Create the affinity service to find the Key for the manager
              KeyAffinityService keyAffinityService = KeyAffinityServiceFactory.newLocalKeyAffinityService(
                      cache,
                      (KeyGenerator)new RndKeyGenerator(),
                      Executors.newSingleThreadExecutor(),
                      100);
      
      
      
      
              //Find key associated with local node
              locaddr.nut = Objects.toString(keyAffinityService.getKeyForAddress(manager.getAddress()));
      
      
      
      
              log("Start of program.....");
              log("Input one of following commands:");
              log("book");
              log("pay");
              log("list");
              log("locaddr");
              log("quit");
              while (true){
                  command = scan.nextLine();
                  if (command.equals("book")) {
                      log("Enter name ");
      
      
                      String name = scan.nextLine();
      
      
                      locaddr.sim = Objects.toString(keyAffinityService.getCollocatedKey(locaddr.nut));
                      cache.put(Integer.toString(ticketid)+manager.getAddress().toString(),name);
      
      
                      log("Booked ticket " + name);
                      ticketid++;
                  }
                  else if (command.equals("pay")) {
      
      
                      log("Enter ticket number ");
                      String id = scan.nextLine();
      
      
                      log("Display ticket:"+cache.get(id));
      
      
                      String ticket = cache.remove(id);
                      log("Checked out ticket " + ticket);
                  }
                  else if (command.equals("list")) {
                      Set <String> set = cache.keySet();
                      for (String ticket: set) {
                          log(ticket + " " + cache.get(ticket));
                      }
                  }
                  else if (command.equals("quit")) {
                      cache.clear();
                      cache.stop();
                      manager.stop();
                      keyAffinityService.stop();
                      log("Bye");
                      break;
                  }
                  else if (command.equals("locaddr")) {
                      log("local key for manager is: "+locaddr.nut);
                      log("manager address is: " + manager.getAddress());
                  }
                  else {
                      log("Unknown command " + command);
                  }
              }
          }
      
      
          public static void main(String[] args) throws Exception{
              new SimpleCache().start();
          }
          public static void log(String s){
              System.out.println(s);
          }
      }
      
      
      

       

      democluster.xml:

       

      <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
                  xmlns="urn:infinispan:config:6.0">
      
      
          <global>
                  <transport>
                      <properties>
                          <property name="configurationFile" value="jgroups-tcp.xml" />
                      </properties>
                  </transport>
          </global>
      
          <default>
              <clustering mode="distributed" >
                  <sync/>
                  <hash numOwners="1" numSegments="100" capacityFactor="1">
                      <groups enabled="true">
                          <grouper class="KXGrouper"/>
                      </groups>
                  </hash>
              </clustering>
          </default>
      </infinispan>
      
      
      

       

      KXGrouper.java:


      import org.infinispan.distribution.group.Grouper;
      
      public class KXGrouper implements Grouper<String> {
      
      
          public String computeGroup(String key, String group) {
              String g = locaddr.sim;
              return g;
          }
      
      
          public Class<String> getKeyType() {
              return String.class;
          }
      }
      
      
      

       

      My implementation is based in a simple cache implementation example of infinispan. However I am having two main issues:

      1. When I run this code in separate JVMs, sometimes it will work but sometimes when I do a "book" command (which invokes the collocated key command and the cache put command), I get an error where it says the another node is no longer part of the cluster. The error looks like this:

       

      Exception in thread "main" java.lang.IllegalStateException: Address SRI-PC-4630 is no longer in the cluster
        at org.infinispan.affinity.KeyAffinityServiceImpl.getKeyForAddress(KeyAffinityServiceImpl.java:107)
        at org.infinispan.affinity.KeyAffinityServiceImpl.getCollocatedKey(KeyAffinityServiceImpl.java:91)
        at SimpleCache.start(SimpleCache.java:77)
        at SimpleCache.main(SimpleCache.java:125)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
      
      
      

       

      Where "Address SRI-PC-4630" would be the address of the manager in another JVM that is running. I have been looking online for a solution to this issue but no one seems to have a similar problem.

       

      2. If I do get it running and do a "book" and have a key/value stored in the local node, I cannot access it from any other node.

       

      I have been trying to fix this but to no avail and any advice or recommendation would be greatly appreciated.

       

      Thank you for your time