HornetQ pub sub not working on two wildfly 9.0.2 instances
massimiliano.masi Jan 14, 2016 6:40 AM
Hi All,
I have two WildFly 9.0.2 instances, and I want to publish a message from application A, running in instance 1, to application B in instance 1 and application C in instance 2.
B and C have the same consumer code, just deployed in two different instances.
The queueing works just fine when producing and consuming in the same WildFly instance (A produces, B consumes), but application C never receives the message. C and B start before A.
I think I am missing some basics here: why is application C (in instance 2) not receiving the message?
In standalone.xml of instance 1 I created the following connectors (security is disabled):
<connector name="netty-throughput">
    <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
    <param key="port" value="5455"/>
    <param key="batch-delay" value="50"/>
</connector>
<connector name="netty">
    <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
    <param key="port" value="5445"/>
</connector>
and the following acceptor:
<acceptor name="netty-throughput">
    <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
    <param key="port" value="5455"/>
    <param key="batch-delay" value="50"/>
    <param key="direct-deliver" value="false"/>
</acceptor>
I defined a JMS destination as:
<jms-queue name="/queue/auditClientConfigurationQueue">
    <entry name="/queue/auditClientConfigurationQueue"/>
</jms-queue>
In instance 2 I have defined the same connector and the same JMS destination.
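For context on the destination above: it is declared as a jms-queue, and in JMS a queue is point-to-point (each message goes to one consumer), whereas a topic is publish/subscribe (each subscriber gets a copy). The following is a toy in-memory sketch of that difference only. It does not use HornetQ, the class and method names are hypothetical, and the round-robin choice of consumer is an assumption made for the illustration. It also does not model two separate brokers.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model, NOT HornetQ code: it only contrasts the two JMS delivery styles.
class DeliverySemanticsSketch {

    // Point-to-point (queue): each message is handed to exactly one consumer.
    // Picking the consumer round-robin is an assumption of this sketch.
    static List<List<String>> deliverAsQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        int next = 0;
        for (String message : messages) {
            inboxes.get(next).add(message);
            next = (next + 1) % consumers;
        }
        return inboxes;
    }

    // Publish/subscribe (topic): every subscriber receives its own copy of each message.
    static List<List<String>> deliverAsTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    // One empty inbox per consumer/subscriber.
    private static List<List<String>> emptyInboxes(int count) {
        if (count < 1) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<String>> inboxes = new ArrayList<List<String>>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<String>());
        }
        return inboxes;
    }
}
```

With a single message and two consumers, deliverAsQueue puts the message in only one inbox, while deliverAsTopic puts a copy in both.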
The code that I use to produce (from application A in instance 1) is:
Connection connection = null;
try {
    // Before starting the connection, we must make sure that at least
    // one connector is specified; otherwise we fall back to the
    // hardcoded configuration below.
    ConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF);
    connection = cf.createConnection();
} catch (JMSException e) {
    l.debug("Connection failed, error was: " + e.getMessage());
    l.debug("No default connectors seems to be present, let's fallback with the default (HARDCODED). "
            + DEFAULT_QUEUE_HOST + ":" + DEFAULT_QUEUE_PORT);
    // Fallback: connect to the Netty acceptor on localhost:5445.
    Map<String, Object> connectionParams = new HashMap<String, Object>();
    connectionParams.put(TransportConstants.PORT_PROP_NAME, "5445");
    connectionParams.put(TransportConstants.HOST_PROP_NAME, "localhost");
    TransportConfiguration transportConfiguration = new TransportConfiguration(
            NettyConnectorFactory.class.getName(), connectionParams);
    try {
        ConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
                transportConfiguration);
        connection = cf.createConnection();
    } catch (Throwable exc) {
        l.error("Unable to start the connection on the default ports. Error is: " + exc.getMessage(),
                exc);
        throw new RuntimeException(exc);
    }
}
l.debug("Connection established. Binding to default ports and services");
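Note that the fallback above always targets localhost:5445, so each application connects to the broker of the instance it runs in. As a minimal sketch of how the host and port could be passed in instead of hardcoded: the helper below builds the same kind of parameter map using only java.util. The class and method names are hypothetical, the host name in the usage note is a placeholder, and the plain keys "host" and "port" are assumed to match TransportConstants.HOST_PROP_NAME and TransportConstants.PORT_PROP_NAME.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of HornetQ: builds connector parameters for a given broker.
class ConnectorParamsSketch {

    // The keys "host" and "port" are assumed to equal TransportConstants.HOST_PROP_NAME
    // and TransportConstants.PORT_PROP_NAME; verify against the HornetQ version in use.
    static Map<String, Object> nettyParams(String host, int port) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("host", host);
        // The port is stored as a string, as in the original snippet.
        params.put("port", String.valueOf(port));
        return params;
    }
}
```

The resulting map would take the place of connectionParams in the TransportConfiguration constructor, for example nettyParams("instance2.example.org", 5445) for a client that should talk to the other instance.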
Then I look up the queue:
l.debug("Loading the JNDI initial context");
InitialContext jndiContext = null;
Queue queue = null;
try {
    jndiContext = new InitialContext();
    System.out.println(jndiContext.lookup(QUEUE_NAME));
    queue = (Queue) jndiContext.lookup(QUEUE_NAME);
} catch (Throwable e) {
    l.error(e);
    e.printStackTrace();
    throw new RuntimeException(e);
}
l.debug("Loaded the JNDI Initial context. Now looking up to the queue. This is hardcoded to "
        + QUEUE_NAME);
and I produce:
l.debug("Starting the session");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
l.debug("Starting");
connection.start();
MessageProducer producer = session.createProducer(queue);
String myMessage = "AUDIT_CLIENT_RELOAD_CONFIGURATION";
TextMessage message = session.createTextMessage(myMessage);
producer.send(message);
The receiver has the same configuration and connection settings, and it receives messages in a while(true) loop:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
L.debug("Starting the consumer");
MessageConsumer messageConsumer = session.createConsumer(queue);
L.debug("Starting");
connection.start();
while (true) {
    L.debug("Waiting in the queue");
    // Blocks until a message arrives on the queue.
    TextMessage messageReceived = (TextMessage) messageConsumer.receive();
    String received = messageReceived.getText();
    L.debug("Message received: " + received + " delivery time is "
            + messageReceived.getJMSDeliveryTime());
}