4 Replies Latest reply on Jun 12, 2014 1:42 AM by sebtainmdr

    Consumer Unable to receive messages from Producer in topic

    sebtainmdr

      Hi Team,

       

      I am new to HornetQ. Kindly help me get this resolved.

       

      I am using a standalone hornetq-2.3.0.Final

       

      Our requirement calls for a topic, so I have implemented a sample topic application using the JMS configuration. The producer sends its messages, but the consumer does not receive them. Please help me find where I am making a mistake.

       

      My configuration files and class files are below.

       

      Configuration in my standalone HornetQ server

      E:\XXX\Hornetq\hornetq-2.3.0.Final\config\stand-alone\non-clustered

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      hornetq-configuration.xml

       

      <configuration xmlns="urn:hornetq"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

       


         <failover-on-shutdown>true</failover-on-shutdown>
         <shared-store>true</shared-store>
         <persistence-enabled>true</persistence-enabled>
         <security-enabled>false</security-enabled>
         <paging-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/paging</paging-directory>  
         <bindings-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/bindings</bindings-directory>  
         <journal-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/journal</journal-directory>  
         <journal-min-files>5</journal-min-files>
        
         <large-messages-directory>E:/XXX/Hornetq/hornetq-2.3.0.Final/hornetq-data/large-messages</large-messages-directory>
         <management-address>jms.queue.hornetq.management</management-address>
         <journal-sync-non-transactional>true</journal-sync-non-transactional>
         <journal-sync-transactional>true</journal-sync-transactional>
         <journal-type>ASYNCIO</journal-type>
         <connection-ttl-override>70000</connection-ttl-override>
         <connectors>  
        
          <connector name="netty-connector">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="host"  value="10.0.2.65"/>
               <param key="port"  value="5445"/>
          </connector>
         </connectors>
         <acceptors>
         
            <acceptor name="netty-acceptor">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
               <param key="use-nio"  value="true"/>
               <param key="host"  value="10.0.2.65"/>
               <param key="port"  value="5445"/>
               <param key="tcp-no-delay" value="true"/>
               <param key="tcp-send-buffer-size" value="524288"/>
               <param key="tcp-receive-buffer-size" value="524288"/>
            </acceptor>
         </acceptors>

        <address-settings>     
            <address-setting match="#">
                 <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                 <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                 <redelivery-delay>0</redelivery-delay>
                 <page-size-bytes>10485760</page-size-bytes>      
                 <message-counter-history-day-limit>10</message-counter-history-day-limit>
                 <max-size-bytes>104857600</max-size-bytes>
                 <address-full-policy>PAGE</address-full-policy>
            </address-setting>
         </address-settings>

         <!-- Added for Topic related on 05 june 14-->
         <security-settings>
            <!--security for example topic-->
            <security-setting match="jms.topic.exampleTopic">
               <permission type="createDurableQueue" roles="guest"/>
               <permission type="deleteDurableQueue" roles="guest"/>
               <permission type="createNonDurableQueue" roles="guest"/>
               <permission type="deleteNonDurableQueue" roles="guest"/>
               <permission type="consume" roles="guest"/>
               <permission type="send" roles="guest"/>
            </security-setting>
         </security-settings>

      </configuration>

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      hornetq-jms.xml


      <configuration xmlns="urn:hornetq"
                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

         <connection-factory name="ConnectionFactory">
            <client-failure-check-period>60000</client-failure-check-period>
            <connectors>
               <connector-ref connector-name="netty-connector"/>
            </connectors>
            <entries>
               <entry name="ConnectionFactory"/>
            </entries>
            <!--  <connection-ttl>10</connection-ttl> -->
         </connection-factory>
        
         <topic name="exampleTopic"><entry name="/topic/exampleTopic"/></topic>
      </configuration>

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      My first sample producer class

      producer


      package org.hornetq.jms.example;
      import java.util.Properties;
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.MessageProducer;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import javax.jms.Topic;
      import javax.naming.InitialContext;

      //import org.hornetq.common.example.HornetQExample;

      public class producer //extends HornetQExample
      {

         
          public static void main(String[] args) throws Exception {
              String[] argss = {"jnp://10.0.2.65:1099"};
              //System.out.println("args[0]:" + args[0]);new producer().run(args);
              System.out.println("args[0]:" + argss[0]);
              run(argss); // run() is static, so no producer instance is needed
          }
         
          public static void run(String[] argss) throws Exception{
              // runExample() creates and closes its own InitialContext.
              runExample();
          }
         
           public static InitialContext getContext(final int serverId) throws Exception
         {
            //HornetQExample.log.info("using " + argss[0] + " for jndi");
            Properties props = new Properties();
            props.put("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
            props.put("java.naming.provider.url", "jnp://10.0.2.65:1099");//props.put("java.naming.provider.url", args[serverId]);
            props.put("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
            return new InitialContext(props);
         }

          public static boolean runExample() throws Exception {
              Connection connection = null;
              InitialContext initialContext = null;
              try {
                 
                   initialContext = getContext(0);
                   Topic topic = (Topic)initialContext.lookup("/topic/exampleTopic");
                   ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
                   connection = cf.createConnection();
                   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                   MessageProducer producer = session.createProducer(topic);
                  
                  for (int i = 1; i < 10; i++) {
                      for (int someID = 1; someID <= 2; someID++) {                   
                          TextMessage message1 = session.createTextMessage("Producing The is a text message " + i + " sent for someID=" + someID);                   
                          message1.setIntProperty("someID", someID);                   
                          producer.send(message1);
                          System.out.println("Sent message: " + message1.getText());
                      }
                  }

                  return true;
              } catch (Exception e) {
                  e.printStackTrace();
                  return false;
              } finally {
                  // Be sure to close our JMS resources!
                  if (initialContext != null) {
                      initialContext.close();
                  }
                  if (connection != null) {
                      connection.close();
                  }
              }

          }

      }

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      My consumer class


      package org.hornetq.jms.example;

      import java.util.Properties;
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.MessageConsumer;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import javax.jms.Topic;
      import javax.naming.InitialContext;

      //import org.hornetq.common.example.HornetQExample;


      public class consumer// extends HornetQExample
      {

          public static void main(String[] args) throws Exception {
              String[] argss = {"jnp://10.0.2.65:1099"};
              //System.out.println("args[0]:" + args[0]);new producer().run(args);
              System.out.println("args[0]:" + argss[0]);
              run(argss); // run() is static, so no consumer instance is needed
              System.out.println("...........................This line is from consumer...........................");

          }
         
          public static void run(String[] argss) throws Exception{
              // runExample() creates and closes its own InitialContext.
              runExample();
          }
         
           public static InitialContext getContext(final int serverId) throws Exception
         {
            //HornetQExample.log.info("using " + argss[0] + " for jndi");
            Properties props = new Properties();
            props.put("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
            props.put("java.naming.provider.url", "jnp://10.0.2.65:1099");//props.put("java.naming.provider.url", args[serverId]);
            props.put("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
            return new InitialContext(props);
         }

         
          public static boolean runExample() throws Exception {
              Connection connection = null;
              InitialContext initialContext = null;
              try {

                  initialContext = getContext(0);
                  Topic topic = (Topic) initialContext.lookup("/topic/exampleTopic");           
                  ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
                  connection = cf.createConnection();
                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  MessageConsumer messageConsumer = session.createConsumer(topic);
                  connection.start();

                  System.out.println("*************************************************************");
                  System.out.println("MessageConsumer3 will receive every message:");
                  for (;;) {
                      TextMessage messageReceivedC = (TextMessage) messageConsumer.receive(1000);
                      if (messageReceivedC == null) {
                          break;
                      }
                      System.out.println("messageConsumer3 received " + messageReceivedC.getText()+ " someID = "+ messageReceivedC.getIntProperty("someID"));
                      messageReceivedC.acknowledge();
                  }

                  messageConsumer.close();

                  return true;
              } catch (Exception e) {
                  System.out.println(""+e.getMessage());
                  return false;
              } finally {
                  // Be sure to close our JMS resources!
                  if (initialContext != null) {
                      initialContext.close();
                  }
                  if (connection != null) {
                      connection.close();
                  }
              }

          }

      }

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      output of producer:

      args[0]:jnp://10.0.2.65:1099

      Sent message: Producing The is a text message 1 sent for someID=1

      Sent message: Producing The is a text message 1 sent for someID=2

      Sent message: Producing The is a text message 2 sent for someID=1

      Sent message: Producing The is a text message 2 sent for someID=2

      Sent message: Producing The is a text message 3 sent for someID=1

      Sent message: Producing The is a text message 3 sent for someID=2

      Sent message: Producing The is a text message 4 sent for someID=1

      Sent message: Producing The is a text message 4 sent for someID=2

      Sent message: Producing The is a text message 5 sent for someID=1

      Sent message: Producing The is a text message 5 sent for someID=2

      Sent message: Producing The is a text message 6 sent for someID=1

      Sent message: Producing The is a text message 6 sent for someID=2

      Sent message: Producing The is a text message 7 sent for someID=1

      Sent message: Producing The is a text message 7 sent for someID=2

      Sent message: Producing The is a text message 8 sent for someID=1

      Sent message: Producing The is a text message 8 sent for someID=2

      Sent message: Producing The is a text message 9 sent for someID=1

      Sent message: Producing The is a text message 9 sent for someID=2

      -----------------------------------------------------------------------------------------------------------------------------------------------------

      output of consumer.

       

       

      args[0]:jnp://10.0.2.65:1099

      *************************************************************

      MessageConsumer3 will receive every message:

      ...........................This line is from consumer...........................

       

       

      Please help me find what I am missing.

       

      Br

      S.

        • 1. Re: Consumer Unable to receive messages from Producer in topic
          jbertram

          Is the consumer running when the messages are produced?  If not, then there will be no subscription on the topic to receive the messages which means the messages will vanish.  That's the basic semantics of a topic.

          • 2. Re: Consumer Unable to receive messages from Producer in topic
            sebtainmdr

            Thanks, Mr. Justin, for the reply.

             

            I was mistaken in assuming that the consumer is unable to receive the messages.

            Instead, I found (observing with JConsole) that while the producer is sending messages, those messages are not being stored on the server.

            Maybe the producer and the HornetQ server are unable to bind.

             

            Please suggest where I am making a mistake: in the configuration properties or on the programming end.

             

            Thanks,

            S.

            • 3. Re: Consumer Unable to receive messages from Producer in topic
              jbertram

              At this point I'm not sure you understand the basic semantics of a topic.  When a message is sent to a topic a subscription must exist to provide a place for the message to be stored.  If no applicable subscription exists on the topic when a message is sent then the message is deleted.  As I indicated before, that's the basic semantics of a topic.

               

              In your case, you need to make sure that the consumer is started before the producer sends messages.
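
The semantics described above can be illustrated with a toy in-memory model. This is only a sketch for intuition, not HornetQ code: the `ToyTopic` and `ToyTopicDemo` classes and their methods are hypothetical names invented for this example, and the model covers only plain (non-durable) subscriptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy in-memory model of non-durable topic semantics (hypothetical, not HornetQ code). */
final class ToyTopic {
    // One queue per subscription that exists right now.
    private final List<Queue<String>> subscriptions = new ArrayList<>();

    /** Creates a subscription; only messages published afterwards reach it. */
    Queue<String> subscribe() {
        Queue<String> subscription = new ArrayDeque<>();
        subscriptions.add(subscription);
        return subscription;
    }

    /** Copies the message to every current subscription and returns how many received it. */
    int publish(String message) {
        for (Queue<String> subscription : subscriptions) {
            subscription.add(message);
        }
        // With no subscriptions the loop does nothing: the message is simply gone.
        return subscriptions.size();
    }
}

final class ToyTopicDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();

        // Producer runs first: nobody is subscribed, so nothing is stored.
        int deliveredEarly = topic.publish("sent before any consumer exists");

        // Consumer subscribes, then the producer sends again.
        Queue<String> consumer = topic.subscribe();
        int deliveredLate = topic.publish("sent after the consumer subscribed");

        System.out.println("early copies: " + deliveredEarly);
        System.out.println("late copies: " + deliveredLate);
        System.out.println("consumer sees: " + consumer);
    }
}
```

Running `ToyTopicDemo` reports 0 copies for the early message and 1 for the late one, and the consumer's queue holds only the late message. That matches the observation that nothing appears to be stored on the server when the producer runs alone.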

              • 4. Re: Consumer Unable to receive messages from Producer in topic
                sebtainmdr

                Thanks, Mr. Justin.

                 

                You are correct: topic consumers have to be started first in order to receive messages from producers.

                 

                For other viewers: I edited the consumer program to add a while(true) loop. I then ran the program (3 consumers), and after that ran the producer.

                The consumers were able to receive the messages.
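
For readers who want to see the shape of that change, here is a minimal sketch of a receive loop that keeps waiting instead of exiting on the first timeout. It is not the poster's actual code: to stay runnable without a broker, a `BlockingQueue` stands in for the JMS `MessageConsumer`, since `poll(timeout)` returns null on timeout just as `receive(timeout)` does. The `PollingLoopSketch` class, the `drain` method and the `maxIdlePolls` limit are assumptions made for the example; the poster used an unbounded while(true) loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PollingLoopSketch {
    /**
     * Polls until maxIdlePolls consecutive polls time out, rather than
     * leaving the loop the first time a poll returns null.
     * Returns the number of messages received.
     */
    static int drain(BlockingQueue<String> subscription, long pollMillis, int maxIdlePolls) {
        int received = 0;
        int idlePolls = 0;
        try {
            while (idlePolls < maxIdlePolls) {
                String message = subscription.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (message == null) {
                    idlePolls++; // timed out: keep waiting instead of breaking
                    continue;
                }
                idlePolls = 0;
                received++;
                System.out.println("received " + message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop polling
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> subscription = new LinkedBlockingQueue<>();

        // Hypothetical late producer: the first message arrives after 300 ms.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(300);
                subscription.put("message 1");
                subscription.put("message 2");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Tolerates up to 10 idle polls of 100 ms each before giving up.
        int received = drain(subscription, 100, 10);
        producer.join();
        System.out.println("total received: " + received);
    }
}
```

A loop that breaks on the first null, as in the original consumer, would exit before the late producer sends anything. With the idle-poll limit, the consumer stays subscribed long enough for the producer to start, and it still terminates once traffic stops.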
