Is it possible to send/receive messages over a JMS topic defined in hornetq-jms.xml using an AMQP 1.0 client?
pkhanal Jun 13, 2014 1:53 PM
Hello Team,
Is it possible to use a JMS topic defined in hornetq-jms.xml from an AMQP 1.0 client?
I have a queue and a topic defined in hornetq-jms.xml as shown below:
<queue name="myqueue">
    <entry name="myqueue"/>
</queue>
<topic name="mytopic">
    <entry name="mytopic"/>
</topic>
I used the proton-j sample shipped with the HornetQ 2.4.0.Final server distribution to send and receive a message on the topic, using the prefix jms.topic. to address it. With that prefix, the receiver does not receive the message. If I change the code to use the queue instead, with the jms.queue. prefix, it works.
Am I missing something, or is accessing a topic defined in hornetq-jms.xml from an AMQP 1.0 client not supported? Eventually I plan to use this topic through the Qpid JMS client, which provides a JMS implementation that speaks AMQP 1.0.
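To make the addressing explicit, here is a minimal sketch of how the address strings used in the sample are formed from the names in hornetq-jms.xml. The jms.queue. and jms.topic. prefixes are the ones mentioned above; the class and method names (JmsAddresses, queueAddress, topicAddress) are hypothetical and not part of HornetQ or Qpid.

```java
// Hypothetical helper, not part of HornetQ or Qpid: builds the address string
// handed to the AMQP 1.0 client for a destination declared in hornetq-jms.xml.
final class JmsAddresses {
    // Prefixes as used in the question for queues and topics.
    static final String QUEUE_PREFIX = "jms.queue.";
    static final String TOPIC_PREFIX = "jms.topic.";

    private JmsAddresses() {
    }

    // Address for a <queue name="..."> entry.
    static String queueAddress(String name) {
        return QUEUE_PREFIX + requireName(name);
    }

    // Address for a <topic name="..."> entry.
    static String topicAddress(String name) {
        return TOPIC_PREFIX + requireName(name);
    }

    // Reject null or empty names so a bare prefix is never used as an address.
    private static String requireName(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("destination name must not be empty");
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(queueAddress("myqueue")); // jms.queue.myqueue
        System.out.println(topicAddress("mytopic")); // jms.topic.mytopic
    }
}
```

These are the two strings passed to createSender and createMovingReceiver in the sample.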
Sample Code
import java.util.ArrayList;
import java.util.List;

import org.apache.qpid.amqp_1_0.client.Connection;
import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.client.Session;
import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.messaging.Header;

public class ProtonJExample
{
   public static void main(String[] args) throws Exception
   {
      runExample();
   }

   private static void runExample() throws Exception
   {
      Connection connection = null;
      try
      {
         // Step 1. Create an amqp qpid 1.0 connection
         connection = new Connection("localhost", 5673, "guest", "guest");

         // Step 2. Create a session
         Session session = connection.createSession();

         // Step 3. Create a sender
         // Sender sender = session.createSender("jms.queue.myqueue"); // This works!
         Sender sender = session.createSender("jms.topic.mytopic");

         List<Section> sections = new ArrayList<Section>();
         Header header = new Header();
         Message message = new Message("hello world");

         // Step 4. send a simple message
         sender.send(message);

         // Step 5. create a moving receiver, this means the message will be removed from the queue
         // Receiver rec = session.createMovingReceiver("jms.queue.myqueue"); // This works!
         Receiver rec = session.createMovingReceiver("jms.topic.mytopic");

         // Step 6. set some credit so we can receive
         rec.setCredit(UnsignedInteger.valueOf(1), false);

         // Step 7. receive the simple message
         Message m = rec.receive(5000);
         System.out.println("Header = " + m.getPayload().get(0));
         System.out.println("Message Annotations = " + m.getPayload().get(1));
         System.out.println("Properties = " + m.getPayload().get(2));
         System.out.println("Application Properties = " + m.getPayload().get(3));
         System.out.println("Amqp Value = " + m.getPayload().get(4));
         System.out.println("Footer = " + m.getPayload().get(5));

         // Step 8. acknowledge the message
         rec.acknowledge(m);
      }
      finally
      {
         if (connection != null)
         {
            // Step 9. close the connection
            connection.close();
         }
      }
   }
}
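One thing that may be relevant, though it is unconfirmed here: the sample sends in Step 4 before the receiver is created in Step 5. Under standard JMS semantics a queue holds a message until a consumer arrives, while a non-durable topic subscriber only sees messages published after it subscribed. The plain-Java model below illustrates that difference with no broker involved. ToyBroker and all of its methods are made up for illustration, and whether HornetQ applies the same rule to AMQP links on a jms.topic. address is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model, not HornetQ code: contrasts queue and topic delivery.
final class ToyBroker {
    // A queue stores messages until some consumer takes them.
    private final Deque<String> queue = new ArrayDeque<>();
    // A topic copies each message only to subscriptions that exist at publish time.
    private final List<Deque<String>> subscriptions = new ArrayList<>();

    void sendToQueue(String msg) {
        queue.addLast(msg);
    }

    // Returns the oldest queued message, or null if the queue is empty.
    String receiveFromQueue() {
        return queue.pollFirst();
    }

    // Registers a new subscription and returns its private message buffer.
    Deque<String> subscribeToTopic() {
        Deque<String> subscription = new ArrayDeque<>();
        subscriptions.add(subscription);
        return subscription;
    }

    // With no subscriptions registered, the message is simply dropped.
    void publishToTopic(String msg) {
        for (Deque<String> subscription : subscriptions) {
            subscription.addLast(msg);
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();

        // Queue: send first, receive later. The message is still there.
        broker.sendToQueue("hello world");
        System.out.println("queue: " + broker.receiveFromQueue());

        // Topic: publish first, subscribe later. Nothing was kept for the late subscriber.
        broker.publishToTopic("hello world");
        Deque<String> subscriber = broker.subscribeToTopic();
        System.out.println("topic, subscribed after send: " + subscriber.pollFirst());

        // Topic: the subscription now exists, so the next publish is delivered.
        broker.publishToTopic("hello again");
        System.out.println("topic, subscribed before send: " + subscriber.pollFirst());
    }
}
```

If the same rule applies here, creating the receiver (Steps 5 and 6) before calling sender.send(message) would be a quick experiment to tell an ordering problem apart from missing topic support.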