5 Replies Latest reply on May 21, 2009 3:23 AM by nyeste

    Clustered temporary queue problem

    nyeste

      Hi,

      I have problem using temporary queues in a clustered environment.
      - jboss-messaging-1.4.2.GA-SP1
      - clustered post office
      - jboss-remoting 2.2.2.SP11
      - jboss-4.2.3.GA cluster

      I want my client application to create a temporary queue, send its name to the server, and receive messages from the server side through that temp queue. The problem is that about 70% of the messages are lost with this error: "There is no administratively defined queue with name:i1-x0eldltf-1-ftnkdltf-02c0n3-t1go4c5". The other 30% are consumed without any problem. The number of members in the server cluster doesn't matter, except when there is only 1: with 2 or more nodes I see the same behavior on all nodes (sometimes the message reaches the client, but mostly not).
      The ejb3mdb example works perfectly, so an MDB can always send a reply to the temporary queue created by the client, but that's not what I need. I want the client to receive messages sent by a stateless session bean or by an MDB that receives messages from other systems.
      I need temporary queues because there are many users and I can't create a queue for each of them. Another reason is that the queue should not be available while a user is offline.

      If there is only 1 node in the cluster, the desired scenario works fine, just like the example, with no exception in the log, but in production I need at least 5 nodes.

      Is it possible to use temporary queues like this? Is it a clustering problem, or shouldn't it work at all?

      Thanks a lot!

      Zsolt

        • 1. Re: Clustered temporary queue problem
          timfox

          Temporary queues can take a little time to propagate across the network.

          Are you creating a temp queue then immediately sending a message with that queue as replyTo?

          Are you creating a new temp queue for each message sent? (That would be a classic anti-pattern).

          Have you tried sleeping a little while after creating the temp queue and sending a message?
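
If propagation delay is the suspect, a bounded retry with a growing pause is usually easier to reason about than one fixed sleep. Below is a minimal sketch in plain Java. The `Retry` class and `withBackoff` method are hypothetical names, not part of JBoss Messaging or JMS, and the operation passed in is assumed to wrap whatever call fails (for example the destination lookup), rethrowing any checked exception as a `RuntimeException`. Retrying only helps if the failure really is temporary.

```java
import java.util.function.Supplier;

/** Retries an operation with a growing pause instead of a single fixed sleep. */
final class Retry {
    private Retry() {}

    /**
     * Calls op until it succeeds or maxAttempts is reached.
     * The pause doubles after every failed attempt, starting at initialDelayMillis.
     * If every attempt fails, the last exception is rethrown.
     */
    static <T> T withBackoff(Supplier<T> op, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // no point in sleeping after the final attempt
                }
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
                delay *= 2;
            }
        }
        throw last;
    }
}
```

A call such as `Retry.withBackoff(() -> lookup(name), 5, 200L)` would try up to five times, pausing 200 ms, 400 ms, 800 ms and 1600 ms between attempts (`lookup` is a placeholder for your own code).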

          • 2. Re: Clustered temporary queue problem
            nyeste

            Hi Tim,

            Thanks for the fast reply!

            I've tried waiting 10 seconds or more after creating the temp queue, but it didn't help.

            The request/reply case from the example works without any wait: I create a temp queue on the client side, create a message, set its replyTo property to the created queue, and send it to the server immediately. The client always gets a reply through the temp queue. In this case everything works perfectly, so the server configuration seems to be good. I used this example only for testing; my goal is a bit different. Sorry if that was confusing.
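
To make the working case concrete: in request/reply the server never resolves the reply queue by name, because the destination travels with the request. The sketch below models only that idea with in-memory queues from `java.util.concurrent`. It uses no JMS classes, and `ReplyToDemo`, `Request`, `serveOne` and `requestReply` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the JMS request/reply pattern; no JMS classes are used. */
final class ReplyToDemo {
    private ReplyToDemo() {}

    /** A request that carries its own reply destination, like the JMSReplyTo header. */
    static final class Request {
        final String text;
        final BlockingQueue<String> replyTo;

        Request(String text, BlockingQueue<String> replyTo) {
            this.text = text;
            this.replyTo = replyTo;
        }
    }

    /** Plays the role of the MDB: takes one request and answers on its replyTo queue. */
    static void serveOne(BlockingQueue<Request> requests) {
        try {
            Request request = requests.take();
            request.replyTo.put("reply to " + request.text);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Plays the role of the client; returns the reply, or null on timeout. */
    static String requestReply(String text, long timeoutMillis) {
        BlockingQueue<Request> requests = new LinkedBlockingQueue<>();
        // Stands in for the queue returned by session.createTemporaryQueue()
        BlockingQueue<String> temporaryQueue = new LinkedBlockingQueue<>();
        Thread server = new Thread(() -> serveOne(requests));
        server.setDaemon(true);
        server.start();
        try {
            requests.put(new Request(text, temporaryQueue));
            return temporaryQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The server side here only ever touches `request.replyTo`; it has no table of queue names to consult.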

            I would like to use temp queues without sending a request message to the server, because on the server side I have to process messages from other systems and then forward them to one of the temp queues. In this case I can't send a request message from the client. I store the name of the temp queue in JBoss Cache, and every time a message is received from one of the other systems, I want to create a producer on the temp queue and forward the message to the client. But the createQueue(tempQueueName) call fails in most cases with the error mentioned above, even though the client is running, there is no error on the client side, and the consumer is attached to the temp queue. I've tried waiting a few seconds, but it didn't help.
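
The routing I have in mind can be sketched as follows. This is an in-memory model only: a `ConcurrentHashMap` stands in for JBoss Cache and a `BlockingQueue` stands in for each temporary queue, so it says nothing about how the real queues behave across cluster nodes. `UserQueueRegistry` and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory model of "one temporary queue per online user"; all names are hypothetical. */
final class UserQueueRegistry {
    // user name -> that user's queue; stands in for the cache entry plus the temp queue
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();

    /** Called when a client comes online; returns the queue the client consumes from. */
    BlockingQueue<String> register(String userName) {
        return queues.computeIfAbsent(userName, k -> new LinkedBlockingQueue<>());
    }

    /** Called when the client goes offline, so nothing is queued for an absent user. */
    void unregister(String userName) {
        queues.remove(userName);
    }

    /** Forwards a message to the user's queue; returns false if the user is not registered. */
    boolean forward(String userName, String text) {
        BlockingQueue<String> queue = queues.get(userName);
        if (queue == null) {
            return false; // caller decides: drop, log, or park the message elsewhere
        }
        return queue.offer("Message from device: " + text);
    }
}
```

The `false` return from `forward` is the case I expect for offline users; what I actually see in the cluster is the lookup failing for users who are online.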

            • 3. Re: Clustered temporary queue problem
              timfox

              I'm having trouble understanding what you're trying to achieve.

              Perhaps you could post a test program?

              • 4. Re: Clustered temporary queue problem
                nyeste

                The client, which creates a temporary queue, registers its name in JBoss Cache, and listens for messages:

                package hu.molaris.client;
                
                import java.util.Properties;
                
                import javax.jms.Connection;
                import javax.jms.ConnectionFactory;
                import javax.jms.Message;
                import javax.jms.MessageConsumer;
                import javax.jms.MessageListener;
                import javax.jms.Queue;
                import javax.jms.Session;
                import javax.jms.TextMessage;
                import javax.naming.InitialContext;
                
                public class Client implements MessageListener {
                    private static final String USER_NAME = "testUser";

                    public static void main(String[] args) throws Exception {
                        new Client().run();
                    }

                    @Override
                    public void onMessage(Message message) {
                        try {
                            TextMessage receivedMessage = (TextMessage) message;
                            String receivedText = receivedMessage.getText();

                            System.out.println("Received message: " + receivedText);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                    public void run() {
                        Connection connection = null;
                        Session session = null;
                        Queue temporaryQueue;
                        InitialContext ic;

                        try {
                            Properties props = new Properties();
                            props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
                            props.put("java.naming.provider.url", "avl-32-2:1200,avl-32-3:1200");
                            props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");

                            ic = new InitialContext(props);
                            ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ClusteredConnectionFactory");

                            connection = cf.createConnection();
                            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                            temporaryQueue = session.createTemporaryQueue();
                            MessageConsumer consumer = session.createConsumer(temporaryQueue);

                            /*
                             * Registering temporary queue name into JBoss Cache
                             * (ICommunicationService is the application's own remote interface)
                             */
                            ICommunicationService commService = (ICommunicationService) ic.lookup("CommunicationSession/remote");
                            commService.registerClient(Client.USER_NAME, temporaryQueue.getQueueName());

                            connection.start();
                            consumer.setMessageListener(this);

                            System.out.println("Waiting for messages...");
                            for (int i = 0; i < 100; i++) {
                                Thread.sleep(5000);
                                System.out.println("tick.");
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                // Either may still be null if setup failed early
                                if (session != null) {
                                    session.close();
                                }
                                if (connection != null) {
                                    connection.close();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                        System.out.println("Bye");
                    }

                }
                


                The MDB which receives messages and forwards them to the temp queue:
                package hu.molaris.mdb;
                
                import javax.ejb.ActivationConfigProperty;
                import javax.ejb.MessageDriven;
                import javax.jms.Connection;
                import javax.jms.ConnectionFactory;
                import javax.jms.JMSException;
                import javax.jms.Message;
                import javax.jms.MessageListener;
                import javax.jms.MessageProducer;
                import javax.jms.Queue;
                import javax.jms.Session;
                import javax.jms.TextMessage;
                import javax.naming.InitialContext;
                
                @MessageDriven(activationConfig = {
                    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/CommunicationQueue") })
                public class MessageReceiverMDB implements MessageListener {

                    private static final String USER_NAME = "testUser";

                    /**
                     * @see MessageListener#onMessage(Message)
                     */
                    public void onMessage(Message message) {
                        Connection conn = null;
                        Session session = null;
                        try {
                            TextMessage tm = (TextMessage) message;

                            String text = tm.getText();
                            System.out.println("message " + text + " received");

                            InitialContext ic = new InitialContext();
                            ConnectionFactory cf = (ConnectionFactory) ic.lookup("java:/JmsXA");

                            conn = cf.createConnection();
                            conn.start();
                            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

                            /*
                             * Reading temporary queue name from JBoss Cache
                             */
                            CacheDataHandler cacheDataHandler = ComServCacheDataHandler.getInstance();
                            String queue = (String) cacheDataHandler.getData(MessageReceiverMDB.USER_NAME);

                            // createQueue() resolves an existing destination by name; it does not create one.
                            // This is the call that fails with "There is no administratively defined queue".
                            Queue forwardTo = session.createQueue(queue);
                            MessageProducer producer = session.createProducer(forwardTo);
                            TextMessage forward = session.createTextMessage("Message from device: " + text);

                            producer.send(forward);
                            producer.close();

                            ic.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                            System.out.println("The Message Driven Bean failed!");
                        } finally {
                            try {
                                // Either may still be null if setup failed early
                                if (session != null) {
                                    session.close();
                                }
                                if (conn != null) {
                                    conn.close();
                                }
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
                


                The server, which represents the original sender of the messages:
                package hu.molaris.sender;
                
                import java.util.Properties;
                
                import javax.jms.Connection;
                import javax.jms.ConnectionFactory;
                import javax.jms.JMSException;
                import javax.jms.MessageProducer;
                import javax.jms.Queue;
                import javax.jms.Session;
                import javax.jms.TextMessage;
                import javax.naming.InitialContext;
                
                public class Sender {
                    private static final String QUEUE_NAME = "queue/CommunicationQueue";

                    public static void main(String[] args) {
                        // Each run() opens its own connection and sends one test message
                        for (int i = 0; i < 10; i++) {
                            new Sender().run();
                        }
                    }

                    public void run() {
                        Connection connection = null;
                        Session session = null;
                        MessageProducer sender;
                        InitialContext ic;

                        try {
                            Properties props = new Properties();
                            props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
                            props.put("java.naming.provider.url", "avl-32-2:1200,avl-32-3:1200");
                            props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");

                            ic = new InitialContext(props);

                            ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ClusteredConnectionFactory");
                            Queue queue = (Queue) ic.lookup(Sender.QUEUE_NAME);
                            System.out.println("Queue " + Sender.QUEUE_NAME + " exists");

                            connection = cf.createConnection();
                            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                            sender = session.createProducer(queue);

                            TextMessage message = session.createTextMessage("test message");

                            connection.start();
                            sender.send(message);

                            System.out.println("The " + message.getText() + " message was successfully sent to the " + queue.getQueueName() + " queue");
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                // Either may still be null if setup failed early
                                if (session != null) {
                                    session.close();
                                }
                                if (connection != null) {
                                    connection.close();
                                }
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
                


                If the cluster consists of 1 node, there is no problem. But if it contains 2 or more, I get the error.

                Thanks!


                • 5. Re: Clustered temporary queue problem
                  nyeste

                  Does anybody know whether this is a valid way of using temporary queues/topics?

                  Thanks a lot!