Allright, I've created a test case that replicates the problem:
import core.hornetq.HornetQStompServer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
/**
* @author: Pål Evensen
* Date: Jun 22, 2010
*/
public class HornetQTest {
public static void main(String args[]) {
try {
HornetQStompServer server = new HornetQStompServer();
server.createQueue("soccer.events.goals", false);
ClientMessage clientMessage = server.getClientSession().createMessage(true);
clientMessage.putIntProperty("matchId", 1);
server.sendSimpleMessage("soccer.events.goals", clientMessage);
//Create queue browser, subscribing to the queue
QueueSubscriber queueSubscriber = new QueueSubscriber(server);
Thread t1 = new Thread(queueSubscriber);
t1.start();
//Create queue browser, subscribing to the queue
QueueSubscriber queueSubscriber2 = new QueueSubscriber(server);
Thread t2 = new Thread(queueSubscriber2);
t2.start();
ClientMessage clientMessage2 = server.getClientSession().createMessage(true);
clientMessage2.putIntProperty("matchId", 2);
Thread.currentThread().sleep(1000);
System.out.println("woke up");
server.sendSimpleMessage("soccer.events.goals", clientMessage2);
} catch (Exception e) {
e.printStackTrace();
}
}
}
#########################################
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.TransformerException;
/**
* @author: Pål Evensen
* Date: Jun 22, 2010
*/
public class QueueSubscriber implements Runnable {
private final String queueName; //Queue to be consumed
private final core.hornetq.HornetQStompServer stompServer;
public QueueSubscriber(core.hornetq.HornetQStompServer stompServer)
throws TransformerException, ParserConfigurationException {
queueName = "soccer.events.goals";
this.stompServer = stompServer;
}
public void run() {
ClientSession session = stompServer.getClientSession();
try {
//ClientConsumer messageConsumer = session.createConsumer(queueName, "matchId=1 OR matchId=2 OR matchId=3 OR matchId=4");
//Creates queue browser, not consumer
//FIXME: This does not pick up new messages from the queue.
ClientConsumer messageConsumer = session.createConsumer(queueName, "matchId=1 OR matchId=2 OR matchId=3 OR matchId=4", true);
for(;;) {
try {
// Step 8. Receive the message.
ClientMessage messageReceived = messageConsumer.receive();
System.out.println(Thread.currentThread().getName()+": found event. MatchId: " + messageReceived.getIntProperty("matchId"));
}
catch(Exception e) {
e.printStackTrace();
}
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
// Step 9. Be sure to close our resources!
if (session != null)
{
try {
session.close();
} catch (HornetQException e) {
e.printStackTrace();
}
}
}
}
}
#####################
package core.hornetq;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* @author: Pål Evensen
* Date: Apr 5, 2010
*/
/**
* Create HornetQ Server with a Stomp Connector.
*/
public class HornetQStompServer {
private HornetQServer hornetqServer;
private ClientSession session;
private Set<ClientProducer> producers;
public HornetQStompServer() throws Exception {
FileConfiguration configuration = new FileConfiguration();
//configuration.setConfigurationUrl("file:///home/paale/projects/eventmw/config/hornetq-configuration.xml");
String path = System.getProperty("user.dir");
configuration.setConfigurationUrl("file:///"+ path +"/config/hornetq-configuration.xml");
configuration.setSecurityEnabled(false);
configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
configuration.start();
// we create the HornetQ core using this config
hornetqServer = HornetQServers
.newHornetQServer(configuration);
hornetqServer.start();
createClientSession();
session.start();
producers = new HashSet<ClientProducer>();
}
/**
* Creates and sets up a client session to be used by other classes
* @throws Exception
*/
private void createClientSession() throws Exception {
ClientSessionFactory nettyFactory = HornetQClient.createClientSessionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
//ClientSessionFactory nettyFactory = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
//session = nettyFactory.createSession("paale", "moordi", false, true, true, true, 64);
session = nettyFactory.createSession();
}
public void sendSimpleMessage(String address, ClientMessage message) throws HornetQException {
createQueue(address, false);
getClientProducer(address).send(message);
}
private ClientProducer getClientProducer(String address) throws HornetQException {
ClientProducer producer = null;
for(ClientProducer p : producers) {
if(address.equalsIgnoreCase(p.getAddress().toString())) {
producer = p;
break;
}
}
if(producer == null) {
producer = session.createProducer(address);
producers.add(producer);
}
return producer;
}
public ClientSession getClientSession() {
return session;
}
/**
* Creates a new queue if the specified queue doesn't exist
* @param queueName
*/
public void createQueue(String queueName, boolean persistent) throws HornetQException{
ClientSession.QueueQuery result = session.queueQuery(new SimpleString(queueName));
if(!result.isExists())
session.createQueue(queueName, queueName, persistent);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq/schema/hornetq-configuration.xsd">
<connectors>
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
</connectors>
<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
<acceptor name="stomp-acceptor">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="protocol" value="stomp"/>
<param key="host" value="localhost"/>
<param key="port" value="61613"/>
</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createDurableQueue" roles="admin"/>
<permission type="deleteDurableQueue" roles="admin"/>
<permission type="createTempQueue" roles="admin, generic-client"/>
<permission type="deleteTempQueue" roles="admin"/>
<permission type="createNonDurableQueue" roles="admin, generic-client"/>
<permission type="consume" roles="admin, generic-client"/>
<permission type="send" roles="admin"/>
</security-setting>
<security-setting match="jms.topic.soccer.#">
<permission type="createDurableQueue" roles="admin, generic-client"/>
<permission type="deleteDurableQueue" roles="admin"/>
<permission type="createNonDurableQueue" roles="admin, generic-client, guest"/>
<permission type="deleteNonDurableQueue" roles="admin"/>
<permission type="createTempQueue" roles="admin, generic-client, guest"/>
<permission type="send" roles="admin"/>
<permission type="consume" roles="admin, generic-client, guest"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
<queues>
<queue name="soccer.events.goals">
<address>soccer.events.goals</address>
<durable>false</durable>
</queue>
</queues>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<!-- false to disable JMX management for HornetQ -->
<jmx-management-enabled>true</jmx-management-enabled>
</configuration>
Running this example as posted here, I get the following output:
Thread-1: found event. MatchId: 1
Thread-2: found event. MatchId: 1
woke up
Notice that the second event is never received.
By using the message consumer method instead of the message browser (both in bold), I get the following output (as expected):
Thread-1: found event. MatchId: 1
woke up
Thread-2: found event. MatchId: 2
This might be a threading issue that I don't understand, because removing the sleep() (also in bold) statement enables both methods to work as expected.