HornetQ pub sub not working on two wildfly 9.0.2 instances
massimiliano.masi Jan 14, 2016 6:40 AMHi All,
I have two wildfly 9.0.2 instances and I want to publish a message from application A, running in instance 1 to application B in instance 1 and application C in instance 2.
B and C have the same consumer code, just deployed in two different instances.
The queueing works just fine when producing in the same wildfly instance (A produces, B consumes), but application C never receive the message. C and B start before A.
I think I am missing some basics here: why instance B is not receiving the message?
In standalone.xml of instance 1 I created the following connectors (security is disabled):
<connector name="netty-throughput"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> <param key="port" value="5455"/> <param key="batch-delay" value="50"/> </connector> <connector name="netty"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> <param key="port" value="5445"/> </connector>
and the following acceptor:
<acceptor name="netty-throughput"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> <param key="port" value="5455"/> <param key="batch-delay" value="50"/> <param key="direct-deliver" value="false"/> </acceptor>
I defined a JMS destination as:
<jms-queue name="/queue/auditClientConfigurationQueue"> <entry name="/queue/auditClientConfigurationQueue"/> </jms-queue>
In instance 2 I have defined the same connector and the same JMS destination.
The code that I use to produce (from application A instance 1 is:)
Connection connection = null; try { // Before starting the connection, we must make sure that // there are at least // one connector specified, otherwise we need to reverse to // the configuration. ConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF); connection = cf.createConnection(); } catch (JMSException e) { l.debug("Connection failed, error was: " + e.getMessage()); l.debug("No default connectors seems to be present, let's fallback with the default (HARDCODED). " + DEFAULT_QUEUE_HOST + ":" + DEFAULT_QUEUE_PORT); Map<String, Object> connectionParams = new HashMap<String, Object>(); connectionParams.put(TransportConstants.PORT_PROP_NAME, "5445"); connectionParams.put(TransportConstants.HOST_PROP_NAME, "localhost"); TransportConfiguration transportConfiguration = new TransportConfiguration( NettyConnectorFactory.class.getName(), connectionParams); try { ConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration); connection = cf.createConnection(); } catch (Throwable exc) { l.error("Unable to start the connection on the default ports. Error is: " + exc.getMessage(), exc); throw new RuntimeException(exc); } l.debug("Connection established. Binding to default ports and services");
then I search the queue:
l.debug("Loading the JNDI initial context"); InitialContext jndiContext = null; Queue queue = null; try { jndiContext = new InitialContext(); System.out.println(jndiContext.lookup(QUEUE_NAME)); queue = (Queue) jndiContext.lookup(QUEUE_NAME); } catch (Throwable e) { l.error(e); e.printStackTrace(); throw new RuntimeException(e); } l.debug("Loaded the JNDI Initial context. Now looking up to the queue. This is hardcoded to " + QUEUE_NAME);
and I produce:
l.debug("Starting the session"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); l.debug("Starting"); connection.start(); MessageProducer producer = session.createProducer(queue); String myMessage = "AUDIT_CLIENT_RELOAD_CONFIGURATION"; TextMessage message = session.createTextMessage(myMessage); producer.send(message);
The receiver has the same configuration and connection settings, and it receives as (in a while(true) loop):
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); L.debug("Starting the consumer"); MessageConsumer messageConsumer = session.createConsumer(queue); L.debug("Starting"); connection.start(); while (true) { L.debug("Waiting in the queue"); TextMessage messageReceived = (TextMessage) messageConsumer.receive(); String received = messageReceived.getText(); L.debug("Message received: " + received + " delivery time is " + messageReceived.getJMSDeliveryTime());