3 Replies Latest reply on Sep 22, 2008 8:04 AM by lovelyliatroim

    State Transfer

    lovelyliatroim

      Hi Guys,
      I'm wondering what the best way is to handle the following problem. One machine receives a feed X. When feed X stops there and starts feeding into another node, a message is sent to all members announcing that this node is the new source for feed X. Before the new node takes over, however, I want it to retrieve the state from the old node. What is the best way to go about this?

      I have multiple feeds going into the one cache, so I can't simply configure the cache to retrieve state on startup: the cache could already be started and receiving a feed Y.

      I am currently using the TcpCacheServer to handle remote requests for this data, and was looking at loadState(Fqn, ObjectOutputStream) as a way to retrieve the data from the old node, but so far I have had no joy in getting it to work.

      First off, I don't know if it will do what I want. This is the quote from the docs:


      Fetches a portion of the state for this cache from secondary storage (disk, database) and writes it to a provided ObjectOutputStream. State written to the provided ObjectOutputStream parameter is used for activation of a portion of a new CacheImpl instance.

      Now, about the "fetches from secondary storage" bit: if I have a TcpDelegatingCacheLoader and call this, does it take the data from memory or from secondary storage? (I don't have a cache loader configured that persists cache data to secondary storage.)

      If the above does what I want, how do I get it to work? The trouble I have is creating the ObjectOutputStream: what should it be? I tried the following, but no joy:

      // loader, watch, state and log are declared elsewhere in the class
      try {
          ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
          MarshalledValueOutputStream os = new MarshalledValueOutputStream(baos);
          String path = "/feed/feedA";
          loader.loadState(Fqn.fromString(path), os);
          os.flush(); // push any buffered object data into baos before reading it
          state = baos.toByteArray();
          watch.stop();
          if (log.isDebugEnabled()) {
              log.debug("It took " + watch.toString() + " to load state from remote cache.Byte size = " + state.length);
          }
      } catch (Exception e) {
          log.error("Problem trying to load state from remote server", e);
      }
      


      So how do you create the ObjectOutputStream so that it can be written to remotely? If I can't use the TcpDelegatingCacheLoader to transfer the old state to the new node, what approach would you suggest?

      Thanks for any help,
      LL
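On the narrower question of building an ObjectOutputStream whose contents can be shipped to another machine, here is a minimal sketch using only the JDK. It is not JBoss Cache code: StateBytesSketch, toBytes and fromBytes are hypothetical names, and a plain map stands in for the feed state. The point it illustrates is that the object stream has to be flushed (or closed) before the underlying byte array is read, otherwise buffered data may be missing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of JBoss Cache.
final class StateBytesSketch {

    // Serialize the state into a byte[] that any transport could carry.
    static byte[] toBytes(Map<String, String> feedState) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
        try (ObjectOutputStream os = new ObjectOutputStream(baos)) {
            os.writeObject(new HashMap<>(feedState));
            os.flush(); // buffered object data is only in baos after a flush or close
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Rebuild the state on the receiving side.
    @SuppressWarnings("unchecked")
    static Map<String, String> fromBytes(byte[] bytes) {
        try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Map<String, String>) is.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        state.put("/feed/feedA/price", "101.5");
        byte[] wire = toBytes(state);
        System.out.println("bytes=" + wire.length + ", roundTripOk=" + fromBytes(wire).equals(state));
    }
}
```

Whether loadState on a given cache loader actually writes anything into the stream is a separate question that this sketch does not answer.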


        • 1. Re: State Transfer
          lovelyliatroim

          Just an update

          The other option I'm looking at is the JGroups getState and setState as a way to transfer a branch.

          I've got as far as trying to stream a node, but "Node" is not serializable, so I have started to look at org.jboss.cache.statetransfer.StateTransferManager and how it does it.
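The getState/setState idea for a single branch can be sketched without any cache library at all. In the sketch below everything is an assumption made for illustration: BranchTransferSketch is a hypothetical stand-in for a cache, nodes are flattened to path/value strings, and the wire format (a count followed by UTF pairs) is made up. It only shows the shape of the handover: the old owner writes one subtree to a stream, the new owner reads it back in.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a cache: full path -> value.
final class BranchTransferSketch {
    private final Map<String, String> nodes = new TreeMap<>();

    void put(String path, String value) { nodes.put(path, value); }

    String get(String path) { return nodes.get(path); }

    // Old owner: write every entry at or below the given subtree.
    void getState(String subtree, OutputStream ostream) {
        Map<String, String> branch = new LinkedHashMap<>();
        String prefix = subtree + "/";
        for (Map.Entry<String, String> e : nodes.entrySet()) {
            if (e.getKey().equals(subtree) || e.getKey().startsWith(prefix)) {
                branch.put(e.getKey(), e.getValue());
            }
        }
        try {
            DataOutputStream out = new DataOutputStream(ostream);
            out.writeInt(branch.size());
            for (Map.Entry<String, String> e : branch.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // New owner: read the entries and merge them into the local store.
    void setState(InputStream istream) {
        try {
            DataInputStream in = new DataInputStream(istream);
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String path = in.readUTF();
                nodes.put(path, in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because only the requested subtree is written, a node that is already receiving feed Y keeps its own data untouched when it imports feed X.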

          • 2. Re: State Transfer
            manik

            The STM uses a NodeData object, which is Externalizable.

            What you may want to do is to use Regions for each feed, so that each region can be activated/deactivated independently of the others. Each region activation will cause a state transfer for that region.
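On the NodeData point: the general pattern is to copy what you need out of a non-serializable node into a small Externalizable holder and stream that instead. The sketch below shows that pattern with the JDK only. NodeSnapshot and its fields are hypothetical and are not the real NodeData class; the attribute map is assumed to hold strings purely to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for one node's path and attributes.
final class NodeSnapshot implements Externalizable {
    private String fqn;
    private Map<String, String> attributes;

    // Externalizable requires a public no-arg constructor.
    public NodeSnapshot() { }

    NodeSnapshot(String fqn, Map<String, String> attributes) {
        this.fqn = fqn;
        this.attributes = new LinkedHashMap<>(attributes);
    }

    String fqn() { return fqn; }

    Map<String, String> attributes() { return attributes; }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(fqn);
        out.writeInt(attributes.size());
        for (Map.Entry<String, String> e : attributes.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        fqn = in.readUTF();
        int size = in.readInt();
        attributes = new LinkedHashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            attributes.put(key, in.readUTF());
        }
    }

    // Convenience for trying it out: write to a buffer and read back.
    static NodeSnapshot copyViaStream(NodeSnapshot original) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
                return (NodeSnapshot) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```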

            • 3. Re: State Transfer
              lovelyliatroim

              OK, I have got it to work through getState and setState. I would nearly call it a hack, but it works. I have tried to reuse what is already in JBoss Cache to do the state transfer; whether reusing it is a good idea or not, I'm not sure. How often does this area change? Would I be looking at a retest for every minor version upgrade?

              Anyway, here it is:


               // Receiving side: load the streamed state for the given path into the local cache.
               public void setState(String path, InputStream istream) {
                   log.info("About to set state in real time cache for path =" + path);
                   CacheSPI cache = (CacheSPI) CacheManager.getCacheById("realTimeCache");
                   if (cache == null) {
                       log.error("Cant set state for real time cache because it has not been configured for this node!! Looking to import Path=" + path);
                       return;
                   }
                   MarshalledValueInputStream in = null;
                   try {
                       in = new MarshalledValueInputStream(istream);
                       Fqn subroot = Fqn.fromString(path);
                       Configuration configuration = new Configuration();
                       configuration.setFetchInMemoryState(true);
                       // Build a StateTransferManager by hand and wire in its dependencies
                       StateTransferManager stateTransferManager = new StateTransferManager(cache);
                       stateTransferManager.injectDependencies(cache, cache.getMarshaller(), cache.getRegionManager(), configuration, new NodeBasedLockManager());
                       stateTransferManager.setState(in, subroot);
                   } catch (Exception e) {
                       log.error("Failed to upload state into real time cache for path=" + path, e);
                   } finally {
                       Util.close(in);
                   }
               }
              
               // Sending side: write the in-memory state under the given path to the stream.
               public void getState(String path, OutputStream ostream) {
                   log.info("Server has been asked for its real time state for path " + path);

                   CacheSPI cache = (CacheSPI) CacheManager.getCacheById("realTimeCache");
                   if (cache == null) {
                       log.error("Cant ask for state when realTimeCache is not configured for this node. Path looking for " + path);
                       return;
                   }

                   String sourceRoot = path;
                   MarshalledValueOutputStream out = null;
                   try {
                       out = new MarshalledValueOutputStream(ostream);

                       Configuration configuration = new Configuration();
                       configuration.setFetchInMemoryState(true);
                       // Same hand-wired StateTransferManager as in setState
                       StateTransferManager stateTransferManager = new StateTransferManager(cache);
                       stateTransferManager.injectDependencies(cache, cache.getMarshaller(), cache.getRegionManager(), configuration, new NodeBasedLockManager());
                       stateTransferManager.getState(out, Fqn.fromString(sourceRoot), configuration.getStateRetrievalTimeout(), true, true);
                       //cache.getMarshaller().objectToObjectStream(stateTransferManager, out,Fqn.fromString(sourceRoot) );
                   } catch (Throwable e) {
                       log.error("Failed to replicate state transfer for path " + path, e);
                   } finally {
                       Util.close(out);
                   }
               }
              
              


              Now, what I'm most interested in is this:

              stateTransferManager.injectDependencies(cache, cache.getMarshaller(), cache.getRegionManager(), configuration,new NodeBasedLockManager());
              

              How do these parameters look to you guys? In particular the NodeBasedLockManager: is there any way to get the LockManager of a cache without having to do what I did above? Are there any implications of using the NodeBasedLockManager that I should look out for?


              I'm not the happiest about doing it this way, but it works; thoughts on this approach are welcome. I will look into the region-based activation and see what it offers.

              Thanks,
              LL