3 Replies Latest reply on Jun 16, 2014 12:21 PM by Andy Taylor

    Is it possible to send/receive messages over a JMS topic defined in hornetq-jms.xml using an AMQP 1.0 client?

    Prashant Khanal Newbie

      Hello Team,

       

      Is it possible to use a JMS topic defined in hornetq-jms.xml from an AMQP 1.0 client?

       

      I have a queue and a topic defined in hornetq-jms.xml as shown below:

       

      <queue name="myqueue">
         <entry name="myqueue"/>
      </queue>

      <topic name="mytopic">
         <entry name="mytopic"/>
      </topic>
      

      I used the proton-j sample provided with the HornetQ server distribution (v2.4.0 Final) to send and receive a message on the topic, using the prefix jms.topic. to address it. With that prefix, the receiver does not receive the message. If I change the code to use the queue instead, with the jms.queue. prefix, it works. Am I missing something, or is accessing a topic defined in hornetq-jms.xml from an AMQP 1.0 client not supported? Eventually, I plan to use this topic from the Qpid JMS client, which provides an AMQP 1.0 JMS implementation.

       

      Sample Code

       

      import java.util.ArrayList;
      import java.util.List;
      
      import org.apache.qpid.amqp_1_0.client.Connection;
      import org.apache.qpid.amqp_1_0.client.Message;
      import org.apache.qpid.amqp_1_0.client.Receiver;
      import org.apache.qpid.amqp_1_0.client.Sender;
      import org.apache.qpid.amqp_1_0.client.Session;
      import org.apache.qpid.amqp_1_0.type.Section;
      import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
      import org.apache.qpid.amqp_1_0.type.messaging.Header;
      
      public class ProtonJExample
      {
         public static void main(String[] args) throws Exception
         {
            runExample();
         }
         
         private static void runExample() throws Exception
         {
            Connection connection = null;
      
            try
            {
               // Step 1. Create an amqp qpid 1.0 connection
               connection = new Connection("localhost", 5673, "guest", "guest");
      
               // Step 2. Create a session
               Session session = connection.createSession();
      
               // Step 3. Create a sender
               // Sender sender = session.createSender("jms.queue.myqueue"); // This works!
               Sender sender = session.createSender("jms.topic.mytopic");
               
               List<Section> sections = new ArrayList<Section>();
               Header header = new Header();
               
               
               Message message = new Message("hello world");
               
               // Step 4. send a simple message
               sender.send(message);
      
               // Step 5. create a moving receiver, this means the message will be removed from the queue
               // Receiver rec = session.createMovingReceiver("jms.queue.myqueue"); // This works!
               Receiver rec = session.createMovingReceiver("jms.topic.mytopic");
               
      
               // Step 6. set some credit so we can receive
               rec.setCredit(UnsignedInteger.valueOf(1), false);
               
               // Step 7. receive the simple message
               Message m = rec.receive(5000);
               if (m == null)
               {
                  // No message arrived within the 5 second timeout, so there is nothing to print or acknowledge
                  System.out.println("No message received");
                  return;
               }
               System.out.println("Header = " + m.getPayload().get(0));
               System.out.println("Message Annotations = " + m.getPayload().get(1));
               System.out.println("Properties = " + m.getPayload().get(2));
               System.out.println("Application Properties = " + m.getPayload().get(3));
               System.out.println("Amqp Value = " + m.getPayload().get(4));
               System.out.println("Footer = " + m.getPayload().get(5));

               // Step 8. acknowledge the message
               rec.acknowledge(m);
            }
            finally
            {
               if(connection != null)
               {
                  // Step 9. close the connection
                  connection.close();
               }
            }
         }
      }
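
One thing that may be relevant to the topic case: in the sample above, the message is sent (Step 4) before the receiver is attached (Step 5). Under usual publish/subscribe semantics, a topic delivers a message only to the subscriptions that exist at the moment it is sent, whereas a queue holds the message until a consumer arrives. Whether HornetQ's AMQP 1.0 support maps a receiver on jms.topic.mytopic to such a subscription is not confirmed here, so the following is only a minimal in-memory sketch of that difference. SketchQueue and SketchTopic are hypothetical illustration classes, not HornetQ or Qpid APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TopicVsQueueSketch
{
   /** Hypothetical in-memory queue: messages wait until a consumer takes them. */
   static class SketchQueue
   {
      private final Deque<String> pending = new ArrayDeque<String>();

      void send(String body)
      {
         pending.addLast(body);
      }

      /** Returns the oldest waiting message, or null if nothing is waiting. */
      String receive()
      {
         return pending.pollFirst();
      }
   }

   /** Hypothetical in-memory topic: every subscriber gets its own queue when it subscribes. */
   static class SketchTopic
   {
      private final List<SketchQueue> subscriptions = new ArrayList<SketchQueue>();

      SketchQueue subscribe()
      {
         SketchQueue subscription = new SketchQueue();
         subscriptions.add(subscription);
         return subscription;
      }

      /** Copies the message to the subscriptions that exist right now; with none, it is dropped. */
      void send(String body)
      {
         for (SketchQueue subscription : subscriptions)
         {
            subscription.send(body);
         }
      }
   }

   /** Same order as the sample above: send first, attach the receiver afterwards. */
   static String sendThenSubscribe()
   {
      SketchTopic topic = new SketchTopic();
      topic.send("hello world");
      return topic.subscribe().receive();
   }

   /** Reversed order: the subscription exists before the message is sent. */
   static String subscribeThenSend()
   {
      SketchTopic topic = new SketchTopic();
      SketchQueue subscription = topic.subscribe();
      topic.send("hello world");
      return subscription.receive();
   }

   /** Queue case: the message waits for the consumer, so the order does not matter. */
   static String queueSendThenReceive()
   {
      SketchQueue queue = new SketchQueue();
      queue.send("hello world");
      return queue.receive();
   }

   public static void main(String[] args)
   {
      System.out.println("queue, send then receive:   " + queueSendThenReceive());
      System.out.println("topic, send then subscribe: " + sendThenSubscribe());
      System.out.println("topic, subscribe then send: " + subscribeThenSend());
   }
}
```

If this is what is happening, creating the receiver before calling sender.send(message) would be the first thing to try with the jms.topic. prefix.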