4 Replies Latest reply on Jan 12, 2011 10:42 AM by Kevin Ward

    Temporary response queue not receiving messages from other nodes

    Kevin Ward Newbie

      Environment: JBoss 6.0.0.Final, 2 nodes on one machine

       

      I have a permanent queue that I use to distribute work to worker MDBs across the cluster. The Master that creates these messages also creates a temporary response queue for the workers to send updates to, and sets this queue as the JMSReplyTo on each message sent to the workers. The temporary response queue is created only once, and the connection, session, queue, and consumer are kept for re-use.

       

      When my application runs I can see that messages go out to both nodes, the master begins listening for responses, and each worker processes its message and then sends a response to the response queue. However, the master only receives responses from workers on the local node. Workers on the other node appear to create a producer and send a message to the destination successfully, but the message never reaches the master's consumer, so the master waits for the other half of the results until it times out.

       

      I've included trimmed-down versions of the Master and Workers below:

       

      Master

      public class Master{
      
      private ConnectionFactory m_connectionFactory;
      private TopicConnection topicConnection;
      private Session basicSession;
      
      private Queue responseQueue;
      private MessageConsumer resultConsumer;
      
      private Queue workQueue;
      private MessageProducer workProducer;
      
      
      // Called once when created
      private void openQueueConsumer() throws NamingException, JMSException {
          m_connectionFactory = (ConnectionFactory) ic
                  .lookup("XAConnectionFactory");

          topicConnection = ((TopicConnectionFactory) m_connectionFactory)
                  .createTopicConnection();

          basicSession = topicConnection.createSession(false,
                  Session.AUTO_ACKNOWLEDGE);

          // Temporary queue for responses from workers
          responseQueue = basicSession.createTemporaryQueue();
          resultConsumer = basicSession.createConsumer(responseQueue);

          // Queue for distributing work to workers
          workQueue = (Queue) ic.lookup("queue/workQueue");
          workProducer = basicSession.createProducer(workQueue);

          // Message delivery to consumers only begins once the connection is started
          topicConnection.start();
      }
      
      // Starts workers
      private void createWorkers(int numWorkers) throws JMSException {
          for (int i = 0; i < numWorkers; i++) {
              WorkerMessage customMsg = new WorkerMessage(i);

              // Use the same session that created the producer and the temporary queue
              ObjectMessage msg = basicSession.createObjectMessage(customMsg);
              msg.setJMSReplyTo(responseQueue);
              workProducer.send(msg);
          }
          awaitReply(numWorkers);
      }
      
      private void awaitReply(int numWorkers) throws JMSException {
          int numResultsSoFar = 0;

          // Wait for a response from each worker
          while (numResultsSoFar < numWorkers) {
              // Blocks until a response arrives on the temporary queue
              Message msg = resultConsumer.receive();
              CustomResponseMessage response = (CustomResponseMessage) ((ObjectMessage) msg)
                      .getObject();

              // Do stuff with response

              numResultsSoFar++;
              log.info("worker " + response.getWorkerID() + " initialized on VM " + response.getVMID());
              log.info("received " + numResultsSoFar + " of " + numWorkers + " initializations");
          }
      }
      
      }
      

       

       

       

      Worker MDB

      @MessageDriven(mappedName = "queue/workQueue", activationConfig = {
              @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
              @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/workQueue"),
              @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")
      })    
      public class Worker implements MessageListener {
      
          private static Logger log = Logger.getLogger(Worker.class);
      
      
          @Resource
          private MessageDrivenContext context;
      
          @Resource(mappedName="XAConnectionFactory")
          private ConnectionFactory connectionFactory;
      
          private Connection connection;
      
          @PostConstruct
          public void init() {
              try {
                  connection = connectionFactory.createConnection();
                  connection.start();
              }
              catch (JMSException e) {
                  log.error("Exception while initializing Worker", e);
              }
      
          }
      
          @PreDestroy
          public void destroy() {
              try {
                  // connection is null if init() failed
                  if (connection != null) {
                      connection.close();
                  }
              }
              catch (JMSException e) {
                  log.error("Exception while destroying Worker", e);
              }
          }
      
          /**
           * @see MessageListener#onMessage(Message)
           */
          public void onMessage(Message msg) {
              WorkerMessage customMsg = null;
              try {
                  customMsg = (WorkerMessage) ((ObjectMessage) msg)
                          .getObject();
                  log.info("Starting work with Id: " + customMsg.getWorkerID());
      
                  // Do Work     
      
                  sendResponseMessage(msg.getJMSReplyTo(), customMsg.getWorkerID());
              }
              catch (JMSException e) {
                  log.error("Error while handling worker Message", e);       
                  log.error("Set Rollback flag");
                  context.setRollbackOnly();
              }
          }
      
      
      
          private void sendResponseMessage(Destination destination, GUID workerID) throws JMSException {
              Session session = null;
              try {
                  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                  MessageProducer producer = session.createProducer(destination);
      
                  CustomResponseMessage response = new CustomResponseMessage();
                  response.setWorkerID(workerID);
      
                  ObjectMessage results = session.createObjectMessage();
                  results.setObject(response);
                  producer.send(results);
              } finally {
                  if (session != null) {
                      try {
                          session.close();
                      }
                      catch (JMSException e) {
                          log.error("Exception while closing session", e);
                      }
                  }
              }
          }
      
      }
      
        • 1. Temporary response queue not receiving messages from other nodes
          Tim Fox Master

          There was another thread on this, and possibly a JIRA, some months back.

          • 2. Re: Temporary response queue not receiving messages from other nodes
            Kevin Ward Newbie

            Thanks for the response Tim,

             

            I believe you are referring to https://issues.jboss.org/browse/HORNETQ-236. I read through this issue (and every message mentioning "temporary queue" from the last year). While it does seem to be along the same lines as my problem, most of the related discussion was about the anti-pattern of creating new consumers, connections, and sessions each time, which is why I made sure to point out that I'm keeping them for re-use.

             

            Maybe you could expand on, or help me better understand, the one section that I think does apply:

            Tim Fox wrote:

             

            Now, let's move on to sending the response messages back. When you consume a response message on, say node A, you pick a connection which has a 50% probability of *not* being the node where your temporary queue is. In that case the message gets sent to the wrong node, where it is immediately routed back to the node it just came from (this is a completely unnecessary extra 2 network hops.), assuming the information about the temporary queue has propagated that far (remember this is asynchronous). If the information about the temporary queue hasn't propagated that far yet, it won't know where to route it.

             

            Regarding the many connections and consumers - having multiple consumers on a single queue that just send back a response is not going to help performance - if anything it will reduce performance due to extra context switching. There is little or no processing done in the onMessage.

             

            The part I'm having trouble understanding is why, when I pass the temporary queue destination to the MDB in the JMSReplyTo, the MDB would then have only a 50% chance of reaching that destination correctly when it sends a response. Is creating a temporary queue for responses in a clustered environment simply not a design option given HornetQ's implementation, so that I should switch to a shared permanent response queue with filtering? Or is there something I need to change in my setup so that remote nodes connect to the temporary queue correctly?
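
For the "shared permanent response queue with filtering" alternative raised above, a minimal sketch of the filtering piece could look like the following. It assumes each Master tags its work messages with a unique JMSCorrelationID, that workers copy that ID onto their responses, and that the Master consumes with a matching message selector. The names `ReplySelector`, `selectorFor`, and `newCorrelationId` are hypothetical and not from the original post; only the selector string is built here, so no JMS provider is needed to run it.

```java
import java.util.UUID;

// Hypothetical helper, not part of the original post.
final class ReplySelector {

    // Generates a unique ID that the Master would set on every work message
    // it sends and that workers would copy onto their responses.
    public static String newCorrelationId() {
        return UUID.randomUUID().toString();
    }

    // Builds a JMS message selector that matches a single correlation ID.
    // Selector string literals are enclosed in single quotes, and an
    // embedded single quote is written as two single quotes.
    public static String selectorFor(String correlationId) {
        String escaped = correlationId.replace("'", "''");
        return "JMSCorrelationID = '" + escaped + "'";
    }

    public static void main(String[] args) {
        String id = newCorrelationId();
        System.out.println(selectorFor(id));
    }
}
```

With the standard JMS API, the Master would then call `msg.setJMSCorrelationID(id)` on each work message and create its consumer with `basicSession.createConsumer(responseQueue, ReplySelector.selectorFor(id))`, where `responseQueue` is the permanent queue. Whether this avoids the cross-node delivery problem depends on how that permanent queue is clustered, which this sketch does not address.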

            • 4. Re: Temporary response queue not receiving messages from other nodes
              Kevin Ward Newbie

              Thanks for pointing me to the right thread.  I've tried to replicate the described work-around a few different ways but so far have been unsuccessful so I must be misunderstanding something. 

               

              Yong Zhang wrote:

               

              Thanks for the information and I got a good news for you: I tried another workaround by including address "jms.tempqueue" into cluster connection, and it worked well.

               

              I'm unsure what he means here, but I tried the following configurations:

               

               

               <cluster-connections>
                    <cluster-connection name="myTopics"> 
                         <address>jms.topic</address>      
                         <discovery-group-ref discovery-group-name="dg-group1"/>
                    </cluster-connection>
                    <cluster-connection name="myQueues"> 
                         <address>jms.queue</address>      
                         <discovery-group-ref discovery-group-name="dg-group1"/>
                    </cluster-connection>
              
              </cluster-connections>
              
              

               

              I thought the above would stop the round-robin load balancing and message redistribution from being applied to temporary queues, but instead it prevents the remote nodes from looking up my temporary queue: an error saying the queue cannot be found is thrown when a remote node tries to respond to the destination.

               

              Inserting a cluster connection for <address>jms.tempqueue</address> as he describes doesn't change anything from the original issue, since that address is already clustered under the default <address>jms</address> entry, and replacing that entry completely would cause my other queues to not cluster properly.

               

              Tim Fox wrote:

               

              If you want to prevent any message routing or redistribution for certain queues, you could make sure each "local" queue has a different name on each node. In this case the system won't attempt to load balance messages for them across the nodes.

              If I try to go this route, how do I specify that a temporary queue should have a different name on each node? I thought temporary queue naming was handled by the server.

               

              Sorry if I'm missing something easy here, and thank you for taking the time to help me out!
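
One possible reading of Tim Fox's suggestion quoted above is to stop relying on a server-named temporary queue and have each Master reply through a permanent queue whose name includes a node identifier. The sketch below only derives such a name. The `queue/responseQueue` prefix, the `NodeLocalQueueName` class, and the way the node ID is obtained are all assumptions, and each per-node queue would still have to be deployed on its own node.

```java
// Hypothetical naming scheme, not part of the original thread:
// one permanent response queue per node, distinguished by a node ID.
final class NodeLocalQueueName {

    public static String responseQueueFor(String nodeId) {
        if (nodeId == null || nodeId.trim().isEmpty()) {
            throw new IllegalArgumentException("nodeId must not be empty");
        }
        // Replace anything other than letters, digits, '_' and '-'
        // so the result stays a simple, predictable name.
        String safe = nodeId.trim().replaceAll("[^A-Za-z0-9_-]", "_");
        return "queue/responseQueue." + safe;
    }

    public static void main(String[] args) {
        // The node ID would come from per-node configuration;
        // "node1" is only a placeholder.
        String nodeId = args.length > 0 ? args[0] : "node1";
        System.out.println(responseQueueFor(nodeId));
    }
}
```

The Master would look this name up in JNDI in place of calling `createTemporaryQueue()` and set the resulting queue as the JMSReplyTo on each work message.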