Can't receive messages from queue despite.. queue showing messages
anujboss Jun 30, 2011 7:42 AMHi,
I am new hornetq . I just wanted to create a simple producer consumer scenario on the localhostw.
The environment is something like this . I manually start Hornetq Server (2.2.2) on my localhost with default configuration. (bin/run.sh)
Now in an IDE I have two codes, a producer whose job is to send messages on a queue created on the server (in the first run only)..
a consumer whose job is to receive and consume messages from the same queue.. but despite showing messages on the queue using management apis , my consumer.receive() hangs and consumer.receiveImmediate() throws NullPointer Exception....
Consumer
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import java.util.HashMap;
import java.util.Map;
public class HornetConsumerClient
{
public static void main (String args[])
{
Map<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(TransportConstants.HOST_PROP_NAME,"localhost");
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
5445);
TransportConfiguration transportConfiguration = new TransportConfiguration(
NettyConnectorFactory.class.getName(),
connectionParams);
ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(transportConfiguration);
ClientSessionFactory sf = null;
try
{
sf=serverLocator.createSessionFactory();
final String queueName = "queue.exampleQueue";
ClientSession sfSession = sf.createSession();
ClientSession.QueueQuery q = sfSession.queueQuery(SimpleString.toSimpleString(queueName));
System.out.println(q.isExists());
ClientSession.BindingQuery bq = sfSession.bindingQuery(SimpleString.toSimpleString(queueName));
if (bq.isExists()) {
for (SimpleString ss : bq.getQueueNames()) {
System.out.println("Queue name :: " + ss);
try {
q = sfSession.queueQuery(ss);
System.out.println("\tQueue Address :: " + q.getAddress());
System.out.println("\tQueue Consumer Count :: " + q.getConsumerCount());
System.out.println("\tQueue Message Count :: " + q.getMessageCount());
System.out.println("\tQueue Filter String :: " + q.getFilterString());
System.out.println("\tQueue Is Durable :: " + q.isDurable());
} catch (HornetQException ex) {
System.out.println("Queue query to address does not exists..." + ss);
}
}
} else {
System.out.println("Binding query to address does not exists..." + queueName);
}
ClientConsumer consumer = sfSession.createConsumer(queueName,"",0,-1,false);
System.out.println("Consuming the message.");
ClientMessage message= consumer.receive();
System.out.println("ho");
System.out.println("Received Message: "+message.getStringProperty("myprop"));
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
if(sf!=null)
{
sf.close();
}
}
}
}
Output
true
Queue name :: queue.exampleQueue
Queue Address :: queue.exampleQueue
Queue Consumer Count :: 3
Queue Message Count :: 15
Queue Filter String :: null
Queue Is Durable :: false
Consuming the message.
//it stops here and when I manually stop it exits with code 143
Producer //jst in case
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class HornetProducerClient
{
public static void main (String args[])
{
Map<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(TransportConstants.HOST_PROP_NAME,"localhost");
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
5445);
TransportConfiguration transportConfiguration = new TransportConfiguration(
NettyConnectorFactory.class.getName(),
connectionParams);
ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(transportConfiguration);
ClientSessionFactory sf=null;
ClientProducer producer=null;
try
{
sf = serverLocator.createSessionFactory();
// ClientSession coreSession = sf.createSession(false, false, false);
final String queueName = "queue.exampleQueue";
// coreSession.createQueue(queueName, queueName, false);
// coreSession.close();
ClientSession msg;
msg = sf.createSession();
producer = msg.createProducer(SimpleString.toSimpleString(queueName), 10);
ClientMessage message = msg.createMessage(true);
message.putStringProperty("myprop", "Hello sent at " + new Date());
System.out.println("Sending the message.");
producer.send(message);
}
catch (Exception e)
{
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
finally
{
if(sf!=null)
{
sf.close();
}
}
}
}