1. Re: Clustered temporary queue problem
timfox Apr 27, 2009 3:17 AM (in response to nyeste)
Temporary queues can take a little time to propagate across the network.
Are you creating a temp queue then immediately sending a message with that queue as replyTo?
Are you creating a new temp queue for each message sent? (That would be a classic anti-pattern).
Have you tried sleeping for a little while between creating the temp queue and sending a message?
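To illustrate the anti-pattern mentioned above, here is a minimal sketch of the usual alternative: keep one reply queue for the lifetime of the client and match replies to requests by correlation ID. It is modelled with plain in-memory queues rather than a real JMS provider, so `ReplyQueueSketch`, `Msg`, `sendRequest`, `serveOne` and `receiveReply` are all hypothetical names. In JMS the rough equivalents would be a single `TemporaryQueue` set as `JMSReplyTo` and the `JMSCorrelationID` header.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.UUID;

// Hypothetical in-memory model; not JMS and not JBoss Messaging code.
class ReplyQueueSketch {

    // Stand-in for a JMS message: a correlation ID plus a text body.
    static final class Msg {
        final String correlationId;
        final String body;

        Msg(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Stand-in for the server's request queue.
    private final Queue<Msg> requestQueue = new ArrayDeque<>();
    // Created once and reused for every request (the single "temp queue").
    private final Queue<Msg> replyQueue = new ArrayDeque<>();

    // Client side: send a request and return the correlation ID used for it.
    String sendRequest(String body) {
        String id = UUID.randomUUID().toString();
        requestQueue.add(new Msg(id, body));
        return id;
    }

    // Server side: handle one request and reply with the same correlation ID.
    // Returns false when there is nothing left to handle.
    boolean serveOne() {
        Msg request = requestQueue.poll();
        if (request == null) {
            return false;
        }
        replyQueue.add(new Msg(request.correlationId, "reply to " + request.body));
        return true;
    }

    // Client side: take the next reply from the shared reply queue, or null.
    Msg receiveReply() {
        return replyQueue.poll();
    }

    public static void main(String[] args) {
        ReplyQueueSketch client = new ReplyQueueSketch();
        String first = client.sendRequest("ping");
        client.sendRequest("status");
        while (client.serveOne()) {
            // drain all pending requests
        }
        Msg reply;
        while ((reply = client.receiveReply()) != null) {
            String which = reply.correlationId.equals(first) ? "first" : "second";
            System.out.println(which + " request -> " + reply.body);
        }
    }
}
```

The point of the sketch is only that the reply destination is created once; each request is told apart by its correlation ID instead of by a fresh queue.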
2. Re: Clustered temporary queue problem
nyeste Apr 27, 2009 4:01 AM (in response to nyeste)
Hi Tim,
Thanks for the fast reply!
I've tried waiting 10 seconds or more after creating the temp queue.
The request/reply case from the example works without any wait. If I create a temp queue on the client side, create a message, set its replyTo property to that queue and send it to the server immediately, the client always gets the reply through the temp queue. In this case everything works perfectly, so the server configuration seems to be good. I used this example only for testing; my actual goal is a bit different. Sorry if that was confusing.
I would like to use temp queues without sending a request message to the server. On the server side I have to process messages from other systems and then forward them to one of the temp queues, so in this case I can't send a request message from the client. I store the name of the temp queue in a JBoss Cache, and every time a message is received from one of the other systems I want to create a producer for the temp queue and forward the message to the client. However, in most cases the createQueue(tempQueueName) call returns the above-mentioned error, even though the client is running, there is no error on the client side and the consumer is attached to the temp queue. I've tried waiting a few seconds, but it didn't help.
3. Re: Clustered temporary queue problem
timfox Apr 27, 2009 4:07 AM (in response to nyeste)
I'm having trouble understanding what you're trying to achieve.
Perhaps you could post a test program?
4. Re: Clustered temporary queue problem
nyeste Apr 27, 2009 5:21 AM (in response to nyeste)
The client, which creates a temporary queue, registers its name in JBoss Cache and listens for messages:
package hu.molaris.client;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

public class Client implements MessageListener {

    private static final String USER_NAME = "testUser";

    public static void main(String[] args) throws Exception {
        new Client().run();
    }

    @Override
    public void onMessage(Message message) {
        try {
            String receivedText = null;
            TextMessage receivedMessage = (TextMessage) message;
            receivedText = receivedMessage.getText();
            System.out.println("Received message: " + receivedText);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run() {
        Connection connection = null;
        Session session = null;
        Queue temporaryQueue;
        InitialContext ic;
        try {
            Properties props = new Properties();
            props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
            props.put("java.naming.provider.url", "avl-32-2:1200,avl-32-3:1200");
            props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
            ic = new InitialContext(props);

            ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ClusteredConnectionFactory");
            connection = cf.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            temporaryQueue = session.createTemporaryQueue();
            MessageConsumer consumer = session.createConsumer(temporaryQueue);

            /*
             * Registering temporary queue name into JBoss Cache
             * (ICommunicationService is an application class, not shown here)
             */
            ICommunicationService commService = (ICommunicationService) ic.lookup("CommunicationSession/remote");
            commService.registerClient(Client.USER_NAME, temporaryQueue.getQueueName());

            connection.start();
            consumer.setMessageListener(this);
            System.out.println("Waiting for messages...");
            for (int i = 0; i < 100; i++) {
                Thread.sleep(5000);
                System.out.println("tick.");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Guard against a failure before the session/connection was created
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Bye");
    }
}
The MDB, which receives messages and forwards them to the temp queue:

package hu.molaris.mdb;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/CommunicationQueue")
})
public class MessageReceiverMDB implements MessageListener {

    private static final String USER_NAME = "testUser";

    /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        Connection conn = null;
        Session session = null;
        try {
            TextMessage tm = (TextMessage) message;
            String text = tm.getText();
            System.out.println("message " + text + " received");

            InitialContext ic = new InitialContext();
            ConnectionFactory cf = (ConnectionFactory) ic.lookup("java:/JmsXA");
            conn = cf.createConnection();
            conn.start();
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

            /*
             * Reading temporary queue name from JBoss Cache
             * (CacheDataHandler and ComServCacheDataHandler are application classes, not shown here)
             */
            CacheDataHandler cacheDataHandler = ComServCacheDataHandler.getInstance();
            String queue = (String) cacheDataHandler.getData(MessageReceiverMDB.USER_NAME);

            // This is the call that fails when the cluster has more than one node
            Queue forwardTo = session.createQueue(queue);
            MessageProducer producer = session.createProducer(forwardTo);
            TextMessage forward = session.createTextMessage("Message from device: " + text);
            producer.send(forward);
            producer.close();
            ic.close();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("The Message Driven Bean failed!");
        } finally {
            try {
                // Guard against a failure before the session/connection was created
                if (session != null) {
                    session.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
The server, which represents the original sender of the messages:

package hu.molaris.sender;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

public class Sender {

    private static final String QUEUE_NAME = "queue/CommunicationQueue";

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Sender().run();
        }
    }

    public void run() {
        Connection connection = null;
        Session session = null;
        MessageProducer sender;
        InitialContext ic;
        try {
            Properties props = new Properties();
            props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
            props.put("java.naming.provider.url", "avl-32-2:1200,avl-32-3:1200");
            props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
            ic = new InitialContext(props);

            ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ClusteredConnectionFactory");
            Queue queue = (Queue) ic.lookup(Sender.QUEUE_NAME);
            System.out.println("Queue " + Sender.QUEUE_NAME + " exists");

            connection = cf.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            sender = session.createProducer(queue);
            TextMessage message = session.createTextMessage("test message");
            connection.start();
            sender.send(message);
            System.out.println("The " + message.getText() + " message was successfully sent to the "
                    + queue.getQueueName() + " queue");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Guard against a failure before the session/connection was created
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
If the cluster consists of one node, there is no problem. But if it contains two or more, I get the error.
Thanks!
5. Re: Clustered temporary queue problem
nyeste May 21, 2009 3:23 AM (in response to nyeste)
Does anybody know whether this is a possible way of using temporary queues/topics?
Thanks a lot!