Consumer Unable to receive messages from Producer in topic
sebtainmdr Jun 7, 2014 12:18 AM
Hi Team,
I am a novice with HornetQ. Kindly help me get this resolved.
I am using a standalone hornetq-2.3.0.Final
Our requirements call for a topic, so I have implemented a sample topic application using the JMS configuration. I am able to produce messages from the producer, but the consumer does not receive them. Please help me find where I am making a mistake.
Please find my configuration file and class files.
Configuration in my standalone HornetQ server
E:\XXX\Hornetq\hornetq-2.3.0.Final\config\stand-alone\non-clustered
-----------------------------------------------------------------------------------------------------------------------------------------------------
Hornetq-configuration.xml
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

   <!-- Core server switches. -->
   <failover-on-shutdown>true</failover-on-shutdown>
   <shared-store>true</shared-store>
   <persistence-enabled>true</persistence-enabled>
   <security-enabled>false</security-enabled>

   <!-- On-disk storage: every data directory sits under the same hornetq-data folder. -->
   <paging-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/paging</paging-directory>
   <bindings-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/bindings</bindings-directory>
   <journal-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/journal</journal-directory>
   <journal-min-files>5</journal-min-files>
   <large-messages-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/large-messages</large-messages-directory>

   <management-address>jms.queue.hornetq.management</management-address>

   <!-- Journal behaviour. -->
   <journal-sync-non-transactional>true</journal-sync-non-transactional>
   <journal-sync-transactional>true</journal-sync-transactional>
   <!-- NOTE(review): the directories above are Windows paths; ASYNCIO presumably
        needs Linux libaio, so confirm which journal type the broker really uses. -->
   <journal-type>ASYNCIO</journal-type>

   <connection-ttl-override>70000</connection-ttl-override>

   <!-- Outbound connector; hornetq-jms.xml refers to it by the name "netty-connector". -->
   <connectors>
      <connector name="netty-connector">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host" value="10.0.2.65"/>
         <param key="port" value="5445"/>
      </connector>
   </connectors>

   <!-- Inbound acceptor bound to the same host and port as the connector above. -->
   <acceptors>
      <acceptor name="netty-acceptor">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="use-nio" value="true"/>
         <param key="host" value="10.0.2.65"/>
         <param key="port" value="5445"/>
         <param key="tcp-no-delay" value="true"/>
         <param key="tcp-send-buffer-size" value="524288"/>
         <param key="tcp-receive-buffer-size" value="524288"/>
      </acceptor>
   </acceptors>

   <!-- Settings applied to every address (match="#"). -->
   <address-settings>
      <address-setting match="#">
         <dead-letter-address>jms.queue.DLQ</dead-letter-address>
         <expiry-address>jms.queue.ExpiryQueue</expiry-address>
         <redelivery-delay>0</redelivery-delay>
         <page-size-bytes>10485760</page-size-bytes>
         <message-counter-history-day-limit>10</message-counter-history-day-limit>
         <max-size-bytes>104857600</max-size-bytes>
         <address-full-policy>PAGE</address-full-policy>
      </address-setting>
   </address-settings>

   <!-- Topic-related settings, added on 5 June 2014. -->
   <!-- NOTE(review): security-enabled is false above, so these role permissions
        are presumably not enforced; confirm before relying on them. -->
   <security-settings>
      <!-- Permissions granted to the "guest" role on the example topic. -->
      <security-setting match="jms.topic.exampleTopic">
         <permission type="createDurableQueue" roles="guest"/>
         <permission type="deleteDurableQueue" roles="guest"/>
         <permission type="createNonDurableQueue" roles="guest"/>
         <permission type="deleteNonDurableQueue" roles="guest"/>
         <permission type="consume" roles="guest"/>
         <permission type="send" roles="guest"/>
      </security-setting>
   </security-settings>

</configuration>
-----------------------------------------------------------------------------------------------------------------------------------------------------
hornetq-jms.xml
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

   <!-- Connection factory bound in JNDI as "ConnectionFactory"; it uses the
        connector named "netty-connector". -->
   <connection-factory name="ConnectionFactory">
      <client-failure-check-period>60000</client-failure-check-period>
      <connectors>
         <connector-ref connector-name="netty-connector"/>
      </connectors>
      <entries>
         <entry name="ConnectionFactory"/>
      </entries>
      <!-- Disabled setting kept for reference: <connection-ttl>10</connection-ttl> -->
   </connection-factory>

   <!-- The example topic, bound in JNDI as "/topic/exampleTopic". -->
   <topic name="exampleTopic">
      <entry name="/topic/exampleTopic"/>
   </topic>

</configuration>
-----------------------------------------------------------------------------------------------------------------------------------------------------
My first sample producer class
producer
package org.hornetq.jms.example;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
//import org.hornetq.common.example.HornetQExample;
public class producer //extends HornetQExample
{
    /** JNDI provider URL used for every naming lookup made by this sample. */
    private static final String JNDI_URL = "jnp://10.0.2.65:1099";

    /**
     * Entry point: prints the JNDI URL and runs the publishing example.
     *
     * @param args ignored; the JNDI URL is hard-coded in {@link #JNDI_URL}
     * @throws Exception if closing the JMS/JNDI resources fails
     */
    public static void main(String[] args) throws Exception {
        String[] argss = {JNDI_URL};
        System.out.println("args[0]:" + argss[0]);
        // run is static, so call it directly instead of through a throw-away instance.
        run(argss);
    }

    /**
     * Runs the example once.
     *
     * @param argss currently unused; kept so existing callers keep compiling
     * @throws Exception if closing the JMS/JNDI resources fails
     */
    public static void run(String[] argss) throws Exception {
        // runExample() opens (and closes) its own InitialContext; creating an
        // extra one here and discarding it only leaked a naming context.
        runExample();
    }

    /**
     * Builds a JNDI context that points at {@link #JNDI_URL}.
     *
     * @param serverId currently unused; every caller gets the same provider URL
     * @return a new context that the caller is responsible for closing
     * @throws Exception if the context cannot be created
     */
    public static InitialContext getContext(final int serverId) throws Exception {
        Properties props = new Properties();
        props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
        props.put("java.naming.provider.url", JNDI_URL);
        props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
        return new InitialContext(props);
    }

    /**
     * Looks up the topic and connection factory, then publishes 18 text
     * messages (i = 1..9, each with someID = 1 and 2) to the topic.
     *
     * @return {@code true} when every message was sent, {@code false} when any
     *         lookup or send failed (the stack trace is printed)
     * @throws Exception if closing the connection or the naming context fails
     */
    public static boolean runExample() throws Exception {
        Connection connection = null;
        InitialContext initialContext = null;
        try {
            initialContext = getContext(0);
            Topic topic = (Topic) initialContext.lookup("/topic/exampleTopic");
            ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
            connection = cf.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(topic);
            for (int i = 1; i < 10; i++) {
                for (int someID = 1; someID <= 2; someID++) {
                    TextMessage message1 = session.createTextMessage(
                            "Producing The is a text message " + i + " sent for someID=" + someID);
                    message1.setIntProperty("someID", someID);
                    producer.send(message1);
                    System.out.println("Sent message: " + message1.getText());
                }
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        } finally {
            // Close the connection first and the naming context in a nested
            // finally, so a failure in one close cannot leak the other resource.
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                if (initialContext != null) {
                    initialContext.close();
                }
            }
        }
    }
}
-----------------------------------------------------------------------------------------------------------------------------------------------------
My consumer class
package org.hornetq.jms.example;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import org.hornetq.common.example.HornetQExample;
public class consumer// extends HornetQExample
{
    /** JNDI provider URL used for every naming lookup made by this sample. */
    private static final String JNDI_URL = "jnp://10.0.2.65:1099";

    /**
     * How long to wait for the first message. This gives a producer that is
     * started after the consumer time to begin publishing.
     */
    private static final long FIRST_MESSAGE_TIMEOUT_MS = 60000L;

    /** Once messages are flowing, stop after this much silence. */
    private static final long IDLE_TIMEOUT_MS = 1000L;

    /**
     * Entry point: prints the JNDI URL, runs the consuming example and then
     * prints a closing banner.
     *
     * @param args ignored; the JNDI URL is hard-coded in {@link #JNDI_URL}
     * @throws Exception if closing the JMS/JNDI resources fails
     */
    public static void main(String[] args) throws Exception {
        String[] argss = {JNDI_URL};
        System.out.println("args[0]:" + argss[0]);
        // run is static, so call it directly instead of through a throw-away instance.
        run(argss);
        System.out.println("...........................This line is from consumer...........................");
    }

    /**
     * Runs the example once.
     *
     * @param argss currently unused; kept so existing callers keep compiling
     * @throws Exception if closing the JMS/JNDI resources fails
     */
    public static void run(String[] argss) throws Exception {
        // runExample() opens (and closes) its own InitialContext; creating an
        // extra one here and discarding it only leaked a naming context.
        runExample();
    }

    /**
     * Builds a JNDI context that points at {@link #JNDI_URL}.
     *
     * @param serverId currently unused; every caller gets the same provider URL
     * @return a new context that the caller is responsible for closing
     * @throws Exception if the context cannot be created
     */
    public static InitialContext getContext(final int serverId) throws Exception {
        Properties props = new Properties();
        props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
        props.put("java.naming.provider.url", JNDI_URL);
        props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
        return new InitialContext(props);
    }

    /**
     * Subscribes to the example topic and prints every text message received
     * until the topic goes quiet.
     *
     * <p>{@code createConsumer} on a topic is a non-durable subscription, which
     * in JMS only receives messages published while the subscriber is active.
     * Start this consumer BEFORE the producer; messages published earlier are
     * not delivered to it.
     *
     * @return {@code true} when the loop ended because of a receive timeout,
     *         {@code false} when a lookup or receive failed (the stack trace is printed)
     * @throws Exception if closing the connection or the naming context fails
     */
    public static boolean runExample() throws Exception {
        Connection connection = null;
        InitialContext initialContext = null;
        try {
            initialContext = getContext(0);
            Topic topic = (Topic) initialContext.lookup("/topic/exampleTopic");
            ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
            connection = cf.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer messageConsumer = session.createConsumer(topic);
            // Delivery only begins once the connection is started.
            connection.start();
            System.out.println("*************************************************************");
            System.out.println("MessageConsumer3 will receive every message:");
            long timeoutMs = FIRST_MESSAGE_TIMEOUT_MS;
            for (;;) {
                TextMessage messageReceivedC = (TextMessage) messageConsumer.receive(timeoutMs);
                if (messageReceivedC == null) {
                    // Timed out: nothing (more) was published within the window.
                    break;
                }
                System.out.println("messageConsumer3 received " + messageReceivedC.getText()
                        + " someID = " + messageReceivedC.getIntProperty("someID"));
                // No explicit acknowledge(): the session is AUTO_ACKNOWLEDGE,
                // where such calls are ignored.
                timeoutMs = IDLE_TIMEOUT_MS;
            }
            messageConsumer.close();
            return true;
        } catch (Exception e) {
            // Print the full trace: the bare message hides the exception type
            // and is "null" for many exceptions.
            e.printStackTrace();
            return false;
        } finally {
            // Close the connection first and the naming context in a nested
            // finally, so a failure in one close cannot leak the other resource.
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                if (initialContext != null) {
                    initialContext.close();
                }
            }
        }
    }
}
-----------------------------------------------------------------------------------------------------------------------------------------------------
output of producer:
args[0]:jnp://10.0.2.65:1099
Sent message: Producing The is a text message 1 sent for someID=1
Sent message: Producing The is a text message 1 sent for someID=2
Sent message: Producing The is a text message 2 sent for someID=1
Sent message: Producing The is a text message 2 sent for someID=2
Sent message: Producing The is a text message 3 sent for someID=1
Sent message: Producing The is a text message 3 sent for someID=2
Sent message: Producing The is a text message 4 sent for someID=1
Sent message: Producing The is a text message 4 sent for someID=2
Sent message: Producing The is a text message 5 sent for someID=1
Sent message: Producing The is a text message 5 sent for someID=2
Sent message: Producing The is a text message 6 sent for someID=1
Sent message: Producing The is a text message 6 sent for someID=2
Sent message: Producing The is a text message 7 sent for someID=1
Sent message: Producing The is a text message 7 sent for someID=2
Sent message: Producing The is a text message 8 sent for someID=1
Sent message: Producing The is a text message 8 sent for someID=2
Sent message: Producing The is a text message 9 sent for someID=1
Sent message: Producing The is a text message 9 sent for someID=2
-----------------------------------------------------------------------------------------------------------------------------------------------------
output of consumer.
args[0]:jnp://10.0.2.65:1099
*************************************************************
MessageConsumer3 will receive every message:
...........................This line is from consumer...........................
Please help me find what I am missing.
Br
S.