4 Replies Latest reply on Oct 27, 2009 3:54 AM by ataylor

    storageManager.afterReplicated(..) and when and how to use i

    ataylor

      This is really about how I should use this when saving grouping information when replication is configured.

      Typically it is used as in ServerSessionImpl::sendResponse(..):

      if (storageManager.isReplicated())
      {
         storageManager.afterReplicated(new Runnable()
         {
            public void run()
            {
               doSendResponse(confirmPacket, response, flush, closeChannel);
            }
         });
         storageManager.completeReplication();
      }
      


      This is quite simple: the reply won't be sent until the storage manager has replicated any work that it needs to. This is fine, as ServerSession is the entry point for any work that needs doing on a thread. It also frees up the thread to be used elsewhere.
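To make the ordering concrete, here is a minimal sketch of the deferred-callback idea. StubStorageManager and its replicationConfirmed() method are hypothetical stand-ins invented for illustration, not the HornetQ StorageManager; the sketch only assumes that callbacks registered through afterReplicated(..) run once replication has been confirmed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the storage manager; NOT the HornetQ class.
class StubStorageManager {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    boolean isReplicated() {
        return true;
    }

    // Register work to run once replication has been confirmed.
    void afterReplicated(Runnable action) {
        pending.add(action);
    }

    // Hypothetical hook simulating the backup's confirmation:
    // runs every callback registered so far, in order.
    void replicationConfirmed() {
        Runnable action;
        while ((action = pending.poll()) != null) {
            action.run();
        }
    }
}

class DeferredResponseSketch {
    // Returns the events in the order in which they happened.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        StubStorageManager storage = new StubStorageManager();

        if (storage.isReplicated()) {
            // The response is deferred, so the calling thread is not blocked.
            storage.afterReplicated(() -> events.add("response sent"));
        } else {
            events.add("response sent");
        }
        events.add("session thread released");

        // Later, replication is confirmed and the deferred work runs.
        storage.replicationConfirmed();
        return events;
    }
}
```

In this sketch the session thread is released before the response goes out, which is the behaviour described above.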

      However, in my scenario I do the following, in pseudocode, at the bindings level:

      Response resp = groupingHandler.propose(new Proposal(fullID, binding.getClusterName()));

      if (binding != null)
      {
         binding.route(message, context);
      }
      else
      {
         throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST,
                                    "queue " + resp.getChosenClusterName() +
                                    " has been removed cannot deliver message, queues should not be removed when grouping is used");
      }
      


      When propose is called, the method blocks until a response is received. If the handler is the master node, then at this point the group binding is saved to the journal. This is where I am confused: I can't route the message until a response is returned. The grouping handler can't delegate the work to a different thread, as there's nothing it can do there; the routing of the message has to happen on the same thread, because there is business logic to handle such things as the exception being thrown.

      Maybe I am misunderstanding how this should work, thoughts?

        • 1. Re: storageManager.afterReplicated(..) and when and how to u
          clebert.suconic

          In PostOfficeImpl::processRoute, there is some code that will only add to the queue after the storage is processed. If you just need to deliver after persistence, that code is already done.

          But if your process is changing routing tables, such as creating queues, then you need to process the route only after the replication. You should do something like this:


          if (storage.isReplicated())
          {
             // defer the routing until replication has completed
             storage.afterReplicated(new Runnable()
             {
                public void run()
                {
                   doProcessRoute();
                }
             });
          }
          else
          {
             doProcessRoute();
          }

          void doProcessRoute()
          {
             binding.route(message, context);
          }



          You should probably test for the exceptions before doing this call.
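A rough sketch of "test first, then defer" follows, using plain collections in place of the real bindings and storage manager. Every name here (ValidateThenDeferSketch, queues, replicationConfirmed) is hypothetical and exists only for illustration. The check that can throw runs on the calling thread, and only the routing itself is deferred.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: a map of queue name -> messages replaces the bindings,
// and a list of callbacks replaces storage.afterReplicated(..).
class ValidateThenDeferSketch {
    final Map<String, List<String>> queues = new HashMap<>();
    private final List<Runnable> afterReplication = new ArrayList<>();

    void route(String queueName, String message) {
        // Test for the failure case on the calling thread, before deferring.
        List<String> queue = queues.get(queueName);
        if (queue == null) {
            throw new IllegalStateException("queue " + queueName + " has been removed");
        }
        // Only the routing itself waits for replication.
        afterReplication.add(() -> queue.add(message));
    }

    // Hypothetical hook simulating replication being confirmed.
    void replicationConfirmed() {
        for (Runnable action : afterReplication) {
            action.run();
        }
        afterReplication.clear();
    }
}
```

Note that in this sketch the check and the deferred routing are not atomic: the queue could be removed between the two steps, so this only moves the exception handling back onto the calling thread.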



          So, the key question here is: do you need to deliver after replication, or route after replication? That depends on the use case.

          • 2. Re: storageManager.afterReplicated(..) and when and how to u
            clebert.suconic

            Adding to the queue implies delivery, BTW.

            • 3. Re: storageManager.afterReplicated(..) and when and how to u
              clebert.suconic

              Shouldn't this call on LocalGroupHandler be blocked somehow?

              public Response propose(Proposal proposal) throws Exception
              {
                 if (proposal.getClusterName() == null)
                 {
                    GroupBinding original = map.get(proposal.getGroupId());
                    return original == null ? null : new Response(proposal.getGroupId(), original.getClusterName());
                 }
                 GroupBinding groupBinding = new GroupBinding(proposal.getGroupId(), proposal.getClusterName());
                 if (map.putIfAbsent(groupBinding.getGroupId(), groupBinding) == null)
                 {
                    groupBinding.setId(storageManager.generateUniqueID());
                    groupMap.put(groupBinding.getClusterName(), groupBinding);
                    storageManager.addGrouping(groupBinding);
                    return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
                 }
                 else
                 {
                    groupBinding = map.get(proposal.getGroupId());
                    return new Response(groupBinding.getGroupId(), proposal.getClusterName(), groupBinding.getClusterName());
                 }
              }
              



              OK, you're using a ConcurrentMap. That's not a problem. I'm referring to the journal operation. If you crashed here, you could already be sending messages to the GroupBinding before the data is synced to disk.


              It would be a rare race I know.. but it could happen.

              • 4. Re: storageManager.afterReplicated(..) and when and how to u
                ataylor

                 

                Shouldn't this call on LocalGroupHandler be blocked somehow?


                If you mean should I wait until the storage manager call has completed before routing, then yes, that's what I need to add. The point I was making is that if I route the messages in a different thread using afterReplicated, then the routing may get called after the server session's calling thread has completed, allowing another call which may deliver a message or delete a binding or whatever. For now I will make this blocking and review later.
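One way the blocking approach could look, as a sketch only: the calling thread registers a latch countdown as the afterReplicated callback and waits on it, so the routing still happens on the calling thread. AsyncStubStorage is a hypothetical stub that confirms "replication" on a separate thread; it is not the HornetQ StorageManager, and the timeout value is an arbitrary assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stub: runs afterReplicated callbacks on its own thread,
// standing in for replication being confirmed asynchronously.
class AsyncStubStorage {
    private final ExecutorService replicator = Executors.newSingleThreadExecutor();

    void afterReplicated(Runnable action) {
        replicator.execute(action);
    }

    void shutdown() {
        replicator.shutdown();
    }
}

class BlockingRouteSketch {
    // Blocks the caller until the replication callback fires, then "routes"
    // on the caller's own thread. Returns the name of the routing thread.
    static String proposeThenRoute(AsyncStubStorage storage, long timeoutMillis) {
        CountDownLatch replicated = new CountDownLatch(1);
        storage.afterReplicated(replicated::countDown);
        try {
            if (!replicated.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("timed out waiting for replication");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting for replication", e);
        }
        // Routing (and any exception it throws) stays on the calling thread.
        return Thread.currentThread().getName();
    }
}
```

The trade-off is the one discussed above: the session thread is tied up while it waits, but no other call from that session can slip in between the journal write and the routing.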