Temporary response queue not receiving messages from other nodes
kevin-ncsu Jan 11, 2011 3:24 PM — Environment: JBoss 6.0.0.Final, 2 nodes on one machine
I have a permanent Queue which I use to distribute work to worker MDBs across the cluster. The Master that creates these messages creates a temporary response queue for these workers to send updates to and sets this queue as the JMSReplyTo for each message sent out to the workers. This temporary response queue is only created once and the connection, session, queue, and consumer are kept to be re-used.
When my application runs, I can see that messages go out to both nodes, the master begins listening for responses, and each worker processes its message and then sends a response to the response queue. However, the master only receives responses from workers on the local node. The workers on the other node appear to create a producer and send a message to the destination successfully, but that message is never received by the master's consumer, so the master just sits there waiting for the other half of the results until it times out.
I've included trimmed-down versions of the Master and the Workers below:
Master
public class Master{ private ConnectionFactory m_connectionFactory; private TopicConnection topicConnection; private Session basicSession; private Queue responseQueue; private Queue workQueue; private MessageProducer workProducer; //Called once when created private void openQueueConsumer() { m_connectionFactory = (ConnectionFactory) ic .lookup("XAConnectionFactory"); topicConnection = ((TopicConnectionFactory) m_connectionFactory) .createTopicConnection(); basicSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Temporary Queue for responses from workers responseQueue = basicSession.createTemporaryQueue(); resultConsumer = basicSession.createConsumer(responseQueue); // Queue for creating workers workQueue = (Queue) ic.lookup("queue/workQueue"); workProducer = basicSession.createProducer(workQueue); } // Starts workers private void createWorkers(int numWorkers) throws JMSException { for(int i = 0; i < numWorkers; i++){ WorkerMessage customMsg = new WorkerMessage(i); ObjectMessage msg = topicSession.createObjectMessage(customMsg); msg.setJMSReplyTo(responseQueue); workProducer.send(msg); } awaitReply(numWorkers); } private void awaitReply(int numWorkers) throws JMSException { int numResultsSoFar = 0; // Wait for response from each worker while (numResultsSoFar < numWorkers) { Message msg = resultConsumer.receive(); CustomResponseMessage response = (CustomResponseMessage) ((ObjectMessage) msg) .getObject(); //Do Stuff with Response numResultsSoFar++; log.info("worker " + response.getWorkerID() + " initialized on VM " + response.getVMID()); log.info("recieved " + numResultsSoFar + " of " + numWorkers + " intiializations"); } } }
Worker MDB
@MessageDriven(mappedName = "queue/workQueue", activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/workQueue"), @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge") }) public class Worker implements MessageListener { private static Logger log = Logger.getLogger(Worker.class); @Resource private MessageDrivenContext context; @Resource(mappedName="XAConnectionFactory") private ConnectionFactory connectionFactory; private Connection connection; @PostConstruct public void init() { try { connection = connectionFactory.createConnection(); connection.start(); } catch (JMSException e) { log.error("Exception while initializing Worker", e); } } @PreDestroy public void destroy() { try { connection.close(); } catch (JMSException e) { log.error("Exception while initializing Worker", e); } } /** * @see MessageListener#onMessage(Message) */ public void onMessage(Message msg) { WorkerMessage customMsg = null; try { customMsg = (WorkerMessage) ((ObjectMessage) msg) .getObject(); log.info("Staring work on with Id: " + customMsg.getWorkerID()); // Do Work sendResponseMessage(msg.getJMSReplyTo(), customMsg.getWorkerID()); } catch (JMSException e) { log.error("Error while handling worker Message", e); log.error("Set Rollback flag"); context.setRollbackOnly(); } } private void sendResponseMessage(Destination destination, GUID workerID) throws JMSException { Session session = null; try { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(destination); CustomResponseMessage response = new CustomResponseMessage(); response.setWorkerID(workerID); ObjectMessage results = session.createObjectMessage(); results.setObject(response); producer.send(results); } finally { if (session != null) { try { session.close(); } catch (JMSException e) { 
log.error("Exception while closing session", e); } } } } }