2 Replies Latest reply on Jul 1, 2011 2:03 AM by anujboss

    Can't receive messages from queue despite.. queue showing messages

    anujboss

      Hi,

      I am new hornetq . I just wanted to create a simple producer consumer scenario on the localhostw.

      The environment is something like this . I manually start Hornetq Server (2.2.2) on my localhost with default configuration. (bin/run.sh)

      Now in an IDE I have two codes, a producer whose job is to send messages on a queue created on the server (in the first run only)..

      a consumer whose job is to receive and consume messages from the same queue.. but despite showing messages on the queue using management apis , my consumer.receive() hangs and consumer.receiveImmediate() throws NullPointer Exception....

       

      Consumer

       

      import org.hornetq.api.core.HornetQException;

      import org.hornetq.api.core.SimpleString;

      import org.hornetq.api.core.TransportConfiguration;

      import org.hornetq.api.core.client.*;

      import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

      import org.hornetq.core.remoting.impl.netty.TransportConstants;

       

       

      import java.util.HashMap;

      import java.util.Map;

       

       

      public class HornetConsumerClient

      {

        public static void main (String args[])

        {

            Map<String, Object> connectionParams = new HashMap<String, Object>();

       

       

              connectionParams.put(TransportConstants.HOST_PROP_NAME,"localhost");

              connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,

                          5445);

       

       

       

       

              TransportConfiguration transportConfiguration = new TransportConfiguration(

                                                              NettyConnectorFactory.class.getName(),

                                                              connectionParams);

       

       

              ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(transportConfiguration);

              ClientSessionFactory sf = null;

              try

              {

                  sf=serverLocator.createSessionFactory();

                  final String queueName = "queue.exampleQueue";

                  ClientSession sfSession = sf.createSession();

       

       

                  ClientSession.QueueQuery q = sfSession.queueQuery(SimpleString.toSimpleString(queueName));

                  System.out.println(q.isExists());

       

       

                  ClientSession.BindingQuery bq = sfSession.bindingQuery(SimpleString.toSimpleString(queueName));

       

       

                  if (bq.isExists()) {

                      for (SimpleString ss : bq.getQueueNames()) {

                          System.out.println("Queue name :: " + ss);

                          try {

                              q = sfSession.queueQuery(ss);

                              System.out.println("\tQueue Address        :: " + q.getAddress());

                              System.out.println("\tQueue Consumer Count :: " + q.getConsumerCount());

                              System.out.println("\tQueue Message Count  :: " + q.getMessageCount());

                              System.out.println("\tQueue Filter String  :: " + q.getFilterString());

                              System.out.println("\tQueue Is Durable     :: " + q.isDurable());

                          } catch (HornetQException ex) {

                              System.out.println("Queue query to address does not exists..." + ss);

                          }

                      }

                  } else {

                      System.out.println("Binding query to address does not exists..." + queueName);

                  }

       

                        ClientConsumer consumer = sfSession.createConsumer(queueName,"",0,-1,false);

                      System.out.println("Consuming the message.");

                  ClientMessage message= consumer.receive();

                  System.out.println("ho");

                  System.out.println("Received Message: "+message.getStringProperty("myprop"));

       

       

              }

              catch(Exception e)

              {

                e.printStackTrace();

              }

              finally

              {

                  if(sf!=null)

                      {

                          sf.close();

                      }

              }

        }

       

       

      }

       

      Output

      true

      Queue name :: queue.exampleQueue

                Queue Address        :: queue.exampleQueue

                Queue Consumer Count :: 3

                Queue Message Count  :: 15

                Queue Filter String  :: null

                Queue Is Durable     :: false

      Consuming the message.

      //it stops here and when I manually stop it exits with code 143

       

       

      Producer //jst in case

      import org.hornetq.api.core.SimpleString;

      import org.hornetq.api.core.TransportConfiguration;

      import org.hornetq.api.core.client.*;

      import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

      import org.hornetq.core.remoting.impl.netty.TransportConstants;

       

       

      import java.util.Date;

      import java.util.HashMap;

      import java.util.Map;

       

      public class HornetProducerClient

      {

       

       

          public static void main (String args[])

          {

       

       

              Map<String, Object> connectionParams = new HashMap<String, Object>();

       

       

              connectionParams.put(TransportConstants.HOST_PROP_NAME,"localhost");

              connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,

                          5445);

       

       

       

       

              TransportConfiguration transportConfiguration = new TransportConfiguration(

                                                              NettyConnectorFactory.class.getName(),

                                                              connectionParams);

       

       

              ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(transportConfiguration);

                    ClientSessionFactory sf=null;

              ClientProducer producer=null;

                  try

                  {

                       sf  = serverLocator.createSessionFactory();

                    //  ClientSession coreSession = sf.createSession(false, false, false);

                      final String queueName = "queue.exampleQueue";

                    //  coreSession.createQueue(queueName, queueName, false);

                    //   coreSession.close();

                      ClientSession msg;

                      msg = sf.createSession();

                      producer = msg.createProducer(SimpleString.toSimpleString(queueName), 10);

       

       

                      ClientMessage message = msg.createMessage(true);

                      message.putStringProperty("myprop", "Hello sent at " + new Date());

                      System.out.println("Sending the message.");

              producer.send(message);

       

       

       

       

                  }

                  catch (Exception e)

                  {

                      e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                  }

                  finally

                  {

                      if(sf!=null)

                      {

                          sf.close();

       

       

                      }

                  }

          }

       

       

       

       

       

       

      }