4 Replies Latest reply on Jun 12, 2014 1:42 AM by seb m

    Consumer Unable to receive messages from Producer in topic

    seb m Newbie

      Hi Team,

       

      I am new to HornetQ. Kindly help me get this resolved.

       

      I am using a standalone hornetq-2.3.0.Final

       

      Our requirement calls for a topic, so I have implemented a sample topic application using the JMS configuration. I am able to send messages from the producer, but the consumer does not receive them. Please help me find where I am making a mistake.

       

      Please find my configuration files and class files below.

       

      Configuration in my standalone HornetQ server

      E:\XXX\Hornetq\hornetq-2.3.0.Final\config\stand-alone\non-clustered

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      hornetq-configuration.xml

       

      <configuration xmlns="urn:hornetq"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

       


        <failover-on-shutdown>true</failover-on-shutdown>
         <shared-store>true</shared-store>
         <persistence-enabled>true</persistence-enabled>
         <security-enabled>false</security-enabled>
         <paging-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/paging</paging-directory>  
         <bindings-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/bindings</bindings-directory>  
         <journal-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/journal</journal-directory>  
         <journal-min-files>5</journal-min-files>
        
         <large-messages-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/large-messages</large-messages-directory>
         <management-address>jms.queue.hornetq.management</management-address>
         <journal-sync-non-transactional>true</journal-sync-non-transactional>
         <journal-sync-transactional>true</journal-sync-transactional>
         <journal-type>ASYNCIO</journal-type>
         <connection-ttl-override>70000</connection-ttl-override>
         <connectors>  
        
          <connector name="netty-connector">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="host"  value="10.0.2.65"/>
               <param key="port"  value="5445"/>
          </connector>
         </connectors>
         <acceptors>
         
            <acceptor name="netty-acceptor">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
               <param key="use-nio"  value="true"/>
               <param key="host"  value="10.0.2.65"/>
               <param key="port"  value="5445"/>
               <param key="tcp-no-delay" value="true"/>
               <param key="tcp-send-buffer-size" value="524288"/>
               <param key="tcp-receive-buffer-size" value="524288"/>
            </acceptor>
         </acceptors>

        <address-settings>     
            <address-setting match="#">
                 <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                 <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                 <redelivery-delay>0</redelivery-delay>
                 <page-size-bytes>10485760</page-size-bytes>      
                 <message-counter-history-day-limit>10</message-counter-history-day-limit>
                 <max-size-bytes>104857600</max-size-bytes>
                 <address-full-policy>PAGE</address-full-policy>
            </address-setting>
         </address-settings>

         <!-- Added for Topic related on 05 june 14-->
         <security-settings>
            <!--security for example topic-->
            <security-setting match="jms.topic.exampleTopic">
               <permission type="createDurableQueue" roles="guest"/>
               <permission type="deleteDurableQueue" roles="guest"/>
               <permission type="createNonDurableQueue" roles="guest"/>
               <permission type="deleteNonDurableQueue" roles="guest"/>
               <permission type="consume" roles="guest"/>
               <permission type="send" roles="guest"/>
            </security-setting>
         </security-settings>

      </configuration>

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      hornetq-jms.xml


      <configuration xmlns="urn:hornetq"
                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

         <connection-factory name="ConnectionFactory">
            <client-failure-check-period>60000</client-failure-check-period>
            <connectors>
               <connector-ref connector-name="netty-connector"/>
            </connectors>
            <entries>
               <entry name="ConnectionFactory"/>
            </entries>
            <!-- <connection-ttl>10</connection-ttl> -->
         </connection-factory>
        
         <topic name="exampleTopic"><entry name="/topic/exampleTopic"/></topic>
      </configuration>

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      My first sample producer class

      producer


      package org.hornetq.jms.example;
      import java.util.Properties;
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.MessageProducer;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import javax.jms.Topic;
      import javax.naming.InitialContext;

      //import org.hornetq.common.example.HornetQExample;

      public class producer //extends HornetQExample
      {

         
          public static void main(String[] args) throws Exception {
              // The JNDI URL is hard-coded here instead of being read from args[0].
              String[] argss = {"jnp://10.0.2.65:1099"};
              System.out.println("args[0]:" + argss[0]);
              run(argss);
          }

          public static void run(String[] argss) throws Exception {
              // runExample() creates and closes its own InitialContext.
              runExample();
          }
         
           public static InitialContext getContext(final int serverId) throws Exception
         {
            //HornetQExample.log.info("using " + argss[0] + " for jndi");
            Properties props = new Properties();
            props.put("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
            props.put("java.naming.provider.url", "jnp://10.0.2.65:1099");//props.put("java.naming.provider.url", args[serverId]);
            props.put("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
            return new InitialContext(props);
         }

          public static boolean runExample() throws Exception {
              Connection connection = null;
              InitialContext initialContext = null;
              try {
                 
                   initialContext = getContext(0);
                   Topic topic = (Topic)initialContext.lookup("/topic/exampleTopic");
                   ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
                   connection = cf.createConnection();
                   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                   MessageProducer producer = session.createProducer(topic);
                  
                  for (int i = 1; i < 10; i++) {
                      for (int someID = 1; someID <= 2; someID++) {                   
                          TextMessage message1 = session.createTextMessage("Producing The is a text message " + i + " sent for someID=" + someID);                   
                          message1.setIntProperty("someID", someID);                   
                          producer.send(message1);
                          System.out.println("Sent message: " + message1.getText());
                      }
                  }

                  return true;
              } catch (Exception e) {
                  e.printStackTrace();
                  return false;
              } finally {
                  // Be sure to close our JMS resources!
                  if (initialContext != null) {
                      initialContext.close();
                  }
                  if (connection != null) {
                      connection.close();
                  }
              }

          }

      }

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      My consumer class


      package org.hornetq.jms.example;

      import java.util.Properties;
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.MessageConsumer;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import javax.jms.Topic;
      import javax.naming.InitialContext;

      //import org.hornetq.common.example.HornetQExample;


      public class consumer// extends HornetQExample
      {

          public static void main(String[] args) throws Exception {
              // The JNDI URL is hard-coded here instead of being read from args[0].
              String[] argss = {"jnp://10.0.2.65:1099"};
              System.out.println("args[0]:" + argss[0]);
              run(argss);
              System.out.println("...........................This line is from consumer...........................");
          }

          public static void run(String[] argss) throws Exception {
              // runExample() creates and closes its own InitialContext.
              runExample();
          }
         
           public static InitialContext getContext(final int serverId) throws Exception
         {
            //HornetQExample.log.info("using " + argss[0] + " for jndi");
            Properties props = new Properties();
            props.put("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
            props.put("java.naming.provider.url", "jnp://10.0.2.65:1099");//props.put("java.naming.provider.url", args[serverId]);
            props.put("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
            return new InitialContext(props);
         }

         
          public static boolean runExample() throws Exception {
              Connection connection = null;
              InitialContext initialContext = null;
              try {

                  initialContext = getContext(0);
                  Topic topic = (Topic) initialContext.lookup("/topic/exampleTopic");           
                  ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
                  connection = cf.createConnection();
                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  MessageConsumer messageConsumer = session.createConsumer(topic);
                  connection.start();

                  System.out.println("*************************************************************");
                  System.out.println("MessageConsumer3 will receive every message:");
                  for (;;) {
                      TextMessage messageReceivedC = (TextMessage) messageConsumer.receive(1000);
                      if (messageReceivedC == null) {
                          break;
                      }
                      System.out.println("messageConsumer3 received " + messageReceivedC.getText()+ " someID = "+ messageReceivedC.getIntProperty("someID"));
                      messageReceivedC.acknowledge();
                  }

                  messageConsumer.close();

                  return true;
              } catch (Exception e) {
                  e.printStackTrace(); // print the full stack trace, as the producer does
                  return false;
              } finally {
                  // Be sure to close our JMS resources!
                  if (initialContext != null) {
                      initialContext.close();
                  }
                  if (connection != null) {
                      connection.close();
                  }
              }

          }

      }

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      Output of producer:

      args[0]:jnp://10.0.2.65:1099

      Sent message: Producing The is a text message 1 sent for someID=1

      Sent message: Producing The is a text message 1 sent for someID=2

      Sent message: Producing The is a text message 2 sent for someID=1

      Sent message: Producing The is a text message 2 sent for someID=2

      Sent message: Producing The is a text message 3 sent for someID=1

      Sent message: Producing The is a text message 3 sent for someID=2

      Sent message: Producing The is a text message 4 sent for someID=1

      Sent message: Producing The is a text message 4 sent for someID=2

      Sent message: Producing The is a text message 5 sent for someID=1

      Sent message: Producing The is a text message 5 sent for someID=2

      Sent message: Producing The is a text message 6 sent for someID=1

      Sent message: Producing The is a text message 6 sent for someID=2

      Sent message: Producing The is a text message 7 sent for someID=1

      Sent message: Producing The is a text message 7 sent for someID=2

      Sent message: Producing The is a text message 8 sent for someID=1

      Sent message: Producing The is a text message 8 sent for someID=2

      Sent message: Producing The is a text message 9 sent for someID=1

      Sent message: Producing The is a text message 9 sent for someID=2

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      Output of consumer:

       

       

      args[0]:jnp://10.0.2.65:1099

      *************************************************************

      MessageConsumer3 will receive every message:

      ...........................This line is from consumer...........................
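
      A possible explanation worth checking, stated as an assumption rather than a confirmed diagnosis: in JMS, a non-durable topic subscriber created with session.createConsumer(topic) only receives messages published while it is subscribed. If the producer is run first and exits before the consumer starts, receive(1000) would time out, return null, and the loop would exit with exactly the output above. The sketch below simulates that behaviour using only the Java standard library. ToyTopic and its methods are hypothetical names made up for illustration and are not part of the HornetQ or JMS API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of a topic with non-durable subscribers.
// Hypothetical class for illustration only; not HornetQ or JMS API.
class ToyTopic {
    // One in-memory inbox per currently active subscriber.
    private final List<Queue<String>> subscribers = new ArrayList<>();

    // Registers a subscriber; it only sees messages published from now on.
    Queue<String> subscribe() {
        Queue<String> inbox = new ArrayDeque<>();
        subscribers.add(inbox);
        return inbox;
    }

    // Copies the message to every subscriber active right now.
    // With no active subscriber the message is simply dropped.
    void publish(String message) {
        for (Queue<String> inbox : subscribers) {
            inbox.add(message);
        }
    }

    // Returns how many messages one subscriber ends up with,
    // depending on whether it subscribed before or after the sends.
    static int received(boolean consumerStartsFirst) {
        ToyTopic topic = new ToyTopic();
        Queue<String> inbox = consumerStartsFirst ? topic.subscribe() : null;
        // Same loop bounds as the producer class in this post.
        for (int i = 1; i < 10; i++) {
            for (int someID = 1; someID <= 2; someID++) {
                topic.publish("message " + i + " someID=" + someID);
            }
        }
        if (inbox == null) {
            inbox = topic.subscribe(); // subscribing after the sends
        }
        return inbox.size();
    }

    public static void main(String[] args) {
        System.out.println("producer first: " + received(false));
        System.out.println("consumer first: " + received(true));
    }
}
```

      If this is what is happening, starting the consumer before the producer (and letting it wait longer than one second per receive) should make the messages show up. The JMS API also offers durable subscriptions via Session.createDurableSubscriber together with a client ID on the connection, which keep messages for a subscriber that is temporarily offline once the subscription has been created.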

       

       

      Please help me find what I am missing.

       

      Br

      S.