Is it possible to send/receive messages over a JMS topic defined in hornetq-jms.xml using an AMQP 1.0 client?
pkhanal Jun 13, 2014 1:53 PM
Hello Team,
Is it possible to use a JMS topic defined in hornetq-jms.xml from an AMQP 1.0 client?
I have a queue and a topic defined in hornetq-jms.xml as shown below:
<queue name="myqueue">
    <entry name="myqueue"/>
</queue>
<topic name="mytopic">
    <entry name="mytopic"/>
</topic>
I used the proton-j sample provided with the HornetQ server distribution (v2.4.0 Final) to send and receive a message on the topic, using the prefix jms.topic. to address it. With that prefix, the receiver does not receive the message. If I change the code to use the queue instead, with the jms.queue. prefix, it works.
Am I missing something, or is accessing a topic defined in hornetq-jms.xml from an AMQP 1.0 client not supported? Eventually, my plan is to use the topic from the Qpid JMS client, which provides a JMS implementation over AMQP 1.0.
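To make the naming explicit, this is a minimal sketch of how I am deriving the AMQP addresses from the names in hornetq-jms.xml. The class and method names are hypothetical helpers of my own, not part of HornetQ or the Qpid client; only the jms.queue. and jms.topic. prefixes come from the setup described above.

```java
// Hypothetical helper (not a HornetQ or Qpid API): builds the address
// strings passed to createSender / createMovingReceiver in the sample.
final class HornetQAddresses {
    // Prefixes as used in this post for JMS destinations.
    static final String QUEUE_PREFIX = "jms.queue.";
    static final String TOPIC_PREFIX = "jms.topic.";

    private HornetQAddresses() {
    }

    // Address for a <queue name="..."> entry in hornetq-jms.xml.
    static String queue(String name) {
        return QUEUE_PREFIX + requireName(name);
    }

    // Address for a <topic name="..."> entry in hornetq-jms.xml.
    static String topic(String name) {
        return TOPIC_PREFIX + requireName(name);
    }

    // Rejects null or empty names so a bare prefix is never used as an address.
    private static String requireName(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("destination name must not be empty");
        }
        return name;
    }
}
```

With this helper, HornetQAddresses.queue("myqueue") gives the address that works for me, and HornetQAddresses.topic("mytopic") gives the one that does not.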
Sample Code
import java.util.ArrayList;
import java.util.List;

import org.apache.qpid.amqp_1_0.client.Connection;
import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.client.Session;
import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.messaging.Header;

public class ProtonJExample
{
   public static void main(String[] args) throws Exception
   {
      runExample();
   }

   private static void runExample() throws Exception
   {
      Connection connection = null;
      try
      {
         // Step 1. Create an amqp qpid 1.0 connection
         connection = new Connection("localhost", 5673, "guest", "guest");

         // Step 2. Create a session
         Session session = connection.createSession();

         // Step 3. Create a sender
         // Sender sender = session.createSender("jms.queue.myqueue"); // This works!
         Sender sender = session.createSender("jms.topic.mytopic");

         List<Section> sections = new ArrayList<Section>();
         Header header = new Header();
         Message message = new Message("hello world");

         // Step 4. send a simple message
         sender.send(message);

         // Step 5. create a moving receiver, this means the message will be removed from the queue
         // Receiver rec = session.createMovingReceiver("jms.queue.myqueue"); // This works!
         Receiver rec = session.createMovingReceiver("jms.topic.mytopic");

         // Step 6. set some credit so we can receive
         rec.setCredit(UnsignedInteger.valueOf(1), false);

         // Step 7. receive the simple message
         Message m = rec.receive(5000);
         System.out.println("Header = " + m.getPayload().get(0));
         System.out.println("Message Annotations = " + m.getPayload().get(1));
         System.out.println("Properties = " + m.getPayload().get(2));
         System.out.println("Application Properties = " + m.getPayload().get(3));
         System.out.println("Amqp Value = " + m.getPayload().get(4));
         System.out.println("Footer = " + m.getPayload().get(5));

         // Step 8. acknowledge the message
         rec.acknowledge(m);
      }
      finally
      {
         if (connection != null)
         {
            // Step 9. close the connection
            connection.close();
         }
      }
   }
}