Problem in HornetQ RA activation
st.bolli Dec 14, 2010 8:53 AM
Hello,
I am having trouble with a standalone, symmetric-cluster HornetQ 2.1.2 configuration.
An MDB that is configured to listen to a topic and runs on a remote JBoss AS 5.1.0.GA cannot subscribe at JBoss startup.
I do not think it is a deployment-order problem, because it also occurs when I follow these steps:
1. start an “empty” JBoss
2. start HornetQ and wait until it registers in HAJNDI
3. deploy the MDB
Thank you in advance for any hints.
In the JBoss log, I repeatedly see the following lines:
12:14:55,244 INFO [HornetQActivation] awaiting topic/queue creation OpCoUpdatesTopic
12:14:57,570 INFO [HornetQActivation] Attempting to reconnect org.hornetq.ra.inflow.HornetQActivationSpec(ra=org.hornetq.ra.HornetQResourceAdapter@1ae4173 destination=OpCoUpdatesTopic destinationType=javax.jms.Topic ack=Dups-ok-acknowledge durable=false clientID=null user=null maxSession=15)
As a workaround, if I touch the MDB jar file, it subscribes correctly.
After some analysis of the code, I found that the following exception is thrown and handled internally:
HornetQException[errorCode=100 message=Queue 02a470c3-a179-4579-b13d-2734e0243612 does not exist]
    at org.hornetq.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:287)
    at org.hornetq.core.client.impl.ClientSessionImpl.internalCreateConsumer(ClientSessionImpl.java:1556)
    at org.hornetq.core.client.impl.ClientSessionImpl.createConsumer(ClientSessionImpl.java:447)
    at org.hornetq.core.client.impl.ClientSessionImpl.createConsumer(ClientSessionImpl.java:392)
    at org.hornetq.core.client.impl.DelegatingSession.createConsumer(DelegatingSession.java:201)
    at org.hornetq.ra.inflow.HornetQMessageHandler.setup(HornetQMessageHandler.java:165)
    at org.hornetq.ra.inflow.HornetQActivation.setup(HornetQActivation.java:291)
    at org.hornetq.ra.inflow.HornetQActivation.handleFailure(HornetQActivation.java:540)
    at org.hornetq.ra.inflow.HornetQActivation$SetupActivation.run(HornetQActivation.java:578)
    at org.jboss.resource.work.WorkWrapper.execute(WorkWrapper.java:205)
The exception is thrown at the point where HornetQMessageHandler tries to create a consumer for the activation's temporary queue, which it has just created (see the code snippet below):
SimpleString queueName;
if (activation.isTopic())
{
   if (activation.getTopicTemporaryQueue() == null)
   {
      queueName = new SimpleString(UUID.randomUUID().toString());
      session.createQueue(activation.getAddress(), queueName, selectorString, false); // Temp queue creation is successful
      activation.setTopicTemporaryQueue(queueName);
   }
   else
   {
      queueName = activation.getTopicTemporaryQueue();
   }
}
else
{
   queueName = activation.getAddress();
}
consumer = session.createConsumer(queueName, selectorString); // Here the exception is thrown!
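The branch logic in the snippet above can be modelled in isolation. The following is a minimal sketch, not HornetQ code: the Activation class, its fields, and the address jms.topic.OpCoUpdatesTopic are hypothetical stand-ins, and the real session.createQueue(...) call is only indicated by a comment. It shows that, for a topic, the generated queue name is cached on the activation and reused by every later setup attempt.

```java
import java.util.UUID;

/** Stand-alone model of the queue-name selection above (hypothetical classes, not HornetQ code). */
public class QueueNameSelection {

    /** Hypothetical stand-in for the activation state used by the snippet. */
    static final class Activation {
        private final boolean topic;
        private final String address;
        private String topicTemporaryQueue; // stays null until the first setup of a topic activation

        Activation(boolean topic, String address) {
            this.topic = topic;
            this.address = address;
        }
    }

    /** Returns the queue name the consumer would be created on. */
    static String selectQueueName(Activation activation) {
        if (!activation.topic) {
            // Queue activation: consume directly from the address.
            return activation.address;
        }
        if (activation.topicTemporaryQueue == null) {
            // First setup: the real snippet also calls session.createQueue(...) here.
            activation.topicTemporaryQueue = UUID.randomUUID().toString();
        }
        // Later setups: the cached name is reused and no queue is created.
        return activation.topicTemporaryQueue;
    }

    public static void main(String[] args) {
        // The address below is an assumed value for illustration only.
        Activation topic = new Activation(true, "jms.topic.OpCoUpdatesTopic");
        String first = selectQueueName(topic);
        String second = selectQueueName(topic);
        System.out.println("first setup:  " + first);
        System.out.println("second setup: " + second + " (same name: " + first.equals(second) + ")");

        Activation queue = new Activation(false, "jms.queue.DLQ");
        System.out.println("queue setup:  " + selectQueueName(queue));
    }
}
```

In this model a second setup on the same activation skips queue creation and goes straight to the cached name. The stack trace reaches setup() through handleFailure(), that is, on a retry, but whether the cached-name path is what fails here is not established by the log or the trace.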
Below is the MDB I used for testing. Note that I tried setting the destination activation config property to either “/topic/OpCoUpdatesTopic” or simply “OpCoUpdatesTopic”, but the result is the same.
package com.test.diagnostics;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.log4j.Logger;
import org.jboss.ejb3.annotation.ResourceAdapter;

/**
 * Message-Driven Bean implementation class for: TestTopicMDB
 */
@MessageDriven(
   activationConfig = {
      @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
      @ActivationConfigProperty(propertyName = "destination", propertyValue = "/topic/OpCoUpdatesTopic"),
      @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Dups-ok-acknowledge")
   })
@TransactionManagement(value = TransactionManagementType.BEAN)
@ResourceAdapter("hornetq-ra.rar")
public class TestTopicMDB implements MessageListener {

   Logger log = Logger.getLogger(TestTopicMDB.class);

   /**
    * @see MessageListener#onMessage(Message)
    */
   public void onMessage(Message message) {
      log.info("Message received: " + message.toString());
   }
}
The hornetq-configuration.xml on node 1:
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

   <clustered>true</clustered>
   <cluster-user>myClusterAdmin</cluster-user>
   <cluster-password>myClusterPwd</cluster-password>

   <paging-directory>${data.dir:../../serverA0/test3/data}/paging</paging-directory>
   <bindings-directory>${data.dir:../../serverA0/test3/data}/bindings</bindings-directory>
   <journal-directory>${data.dir:../../serverA0/test3/data}/journal</journal-directory>
   <journal-min-files>10</journal-min-files>
   <large-messages-directory>${data.dir:../../serverA0/test3/data}/large-messages</large-messages-directory>

   <!--<backup-connector-ref connector-name="backup-connector"/> -->

   <connectors>
      <connector>
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host" value="172.16.8.32"/>
         <param key="port" value="5445"/>
      </connector>
      <!--<connector name="backup-connector">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host" value="172.16.8.33"/>
         <param key="port" value="5445"/>
      </connector>-->
   </connectors>

   <acceptors>
      <acceptor>
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host" value="172.16.8.32"/>
         <param key="port" value="5445"/>
      </acceptor>
   </acceptors>

   <broadcast-groups>
      <broadcast-group>
         <local-bind-address>172.16.8.32</local-bind-address>
         <local-bind-port>5432</local-bind-port>
         <group-address>231.7.7.7</group-address>
         <group-port>9876</group-port>
         <broadcast-period>5000</broadcast-period>
         <connector-ref connector-name="netty-connector" /><!--backup-connector-name="backup-connector"-->
      </broadcast-group>
   </broadcast-groups>

   <discovery-groups>
      <discovery-group>
         <!--<local-bind-address>172.16.8.32</local-bind-address>-->
         <group-address>231.7.7.7</group-address>
         <group-port>9876</group-port>
         <refresh-timeout>10000</refresh-timeout>
      </discovery-group>
   </discovery-groups>

   <cluster-connections>
      <!-- <cluster-connection>
         <address>jms</address>
         <discovery-group-ref discovery-group-name="dg-group1"/>
      </cluster-connection> -->
      <cluster-connection>
         <address>jms</address>
         <retry-interval>500</retry-interval>
         <use-duplicate-detection>true</use-duplicate-detection>
         <forward-when-no-consumers>false</forward-when-no-consumers>
         <max-hops>1</max-hops>
         <discovery-group-ref discovery-group-name="dg-group1"/>
      </cluster-connection>
   </cluster-connections>

   <security-settings>
      <security-setting match="#">
         <permission roles="guest"/>
         <permission roles="guest"/>
         <permission roles="guest"/>
         <permission roles="guest"/>
      </security-setting>
   </security-settings>

   <address-settings>
      <!--default for catch all-->
      <address-setting match="#">
         <dead-letter-address>jms.queue.DLQ</dead-letter-address>
         <expiry-address>jms.queue.ExpiryQueue</expiry-address>
         <redelivery-delay>0</redelivery-delay>
         <max-size-bytes>10485760</max-size-bytes>
         <message-counter-history-day-limit>10</message-counter-history-day-limit>
         <address-full-policy>BLOCK</address-full-policy>
      </address-setting>
   </address-settings>
</configuration>
The hornetq-configuration.xml on node 2:
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

   <clustered>true</clustered>
   <cluster-user>myClusterAdmin</cluster-user>
   <cluster-password>myClusterPwd</cluster-password>

   <paging-directory>${data.dir:../../serverB1/test3/data}/paging</paging-directory>
   <bindings-directory>${data.dir:../../serverB1/test3/data}/bindings</bindings-directory>
   <journal-directory>${data.dir:../../serverB1/test3/data}/journal</journal-directory>
   <journal-min-files>10</journal-min-files>
   <large-messages-directory>${data.dir:../../serverB1/test3/data}/large-messages</large-messages-directory>

   <!--<backup-connector-ref connector-name="backup-connector"/> -->

   <connectors>
      <connector>
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host" value="172.16.8.33"/>
         <param key="port" value="6445"/>
      </connector>
      <!--<connector>
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host" value="172.16.8.32"/>
         <param key="port" value="6445"/>
      </connector>-->
   </connectors>

   <acceptors>
      <acceptor>
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host" value="172.16.8.33"/>
         <param key="port" value="6445"/>
      </acceptor>
   </acceptors>

   <broadcast-groups>
      <broadcast-group>
         <local-bind-address>172.16.8.33</local-bind-address>
         <local-bind-port>5433</local-bind-port>
         <group-address>231.7.7.7</group-address>
         <group-port>9876</group-port>
         <broadcast-period>5000</broadcast-period>
         <connector-ref connector-name="netty-connector"/>
      </broadcast-group>
   </broadcast-groups>

   <discovery-groups>
      <discovery-group>
         <!--<local-bind-address>172.16.8.33</local-bind-address>-->
         <group-address>231.7.7.7</group-address>
         <group-port>9876</group-port>
         <refresh-timeout>10000</refresh-timeout>
      </discovery-group>
   </discovery-groups>

   <cluster-connections>
      <!-- <cluster-connection>
         <address>jms</address>
         <discovery-group-ref discovery-group-name="dg-group1"/>
      </cluster-connection> -->
      <cluster-connection>
         <address>jms</address>
         <retry-interval>500</retry-interval>
         <use-duplicate-detection>true</use-duplicate-detection>
         <forward-when-no-consumers>false</forward-when-no-consumers>
         <max-hops>1</max-hops>
         <discovery-group-ref discovery-group-name="dg-group1"/>
      </cluster-connection>
   </cluster-connections>

   <security-settings>
      <security-setting match="#">
         <permission roles="guest"/>
         <permission roles="guest"/>
         <permission roles="guest"/>
         <permission roles="guest"/>
      </security-setting>
   </security-settings>

   <address-settings>
      <!--default for catch all-->
      <address-setting match="#">
         <dead-letter-address>jms.queue.DLQ</dead-letter-address>
         <expiry-address>jms.queue.ExpiryQueue</expiry-address>
         <redelivery-delay>0</redelivery-delay>
         <max-size-bytes>10485760</max-size-bytes>
         <message-counter-history-day-limit>10</message-counter-history-day-limit>
         <address-full-policy>BLOCK</address-full-policy>
      </address-setting>
   </address-settings>
</configuration>
The hornetq-jms.xml file (the same on both nodes):
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

   <connection-factory name="NettyConnectionFactory">
      <discovery-group-ref discovery-group-name="dg-group1"/>
      <connectors>
         <connector-ref connector-name="netty-connector" />
         <!--backup-connector-name="backup-connector" -->
      </connectors>
      <entries>
         <entry name="/ConnectionFactory"/>
         <entry name="/XAConnectionFactory"/>
      </entries>
   </connection-factory>

   <queue name="DLQ">
      <entry name="/queue/DLQ"/>
   </queue>
   <queue name="ExpiryQueue">
      <entry name="/queue/ExpiryQueue"/>
   </queue>
   <topic name="OpCoUpdatesTopic">
      <entry name="/topic/OpCoUpdatesTopic" />
   </topic>
</configuration>
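The hornetq-configuration.xml files above refer to a connector named netty-connector and a discovery group named dg-group1, yet as pasted neither the connector nor the discovery-group elements carry a name attribute. That may only be an artifact of how the XML was rendered in this post. A minimal sketch for checking that every such reference resolves within one file follows; RefNameCheck and its methods are hypothetical helpers written for illustration, using only the JDK's DOM parser, and are not part of HornetQ.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical helper (not part of HornetQ): checks that *-ref names resolve within one file. */
public class RefNameCheck {

    /** Collects the values of one attribute across all elements with the given tag name. */
    static Set<String> attributeValues(Document doc, String tag, String attr) {
        Set<String> values = new TreeSet<>();
        NodeList nodes = doc.getElementsByTagName(tag);
        for (int i = 0; i < nodes.getLength(); i++) {
            Element element = (Element) nodes.item(i);
            if (element.hasAttribute(attr)) {
                values.add(element.getAttribute(attr));
            }
        }
        return values;
    }

    /** Returns one message per referenced name that has no matching named definition. */
    static List<String> unresolvedRefs(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        List<String> problems = new ArrayList<>();
        String[][] pairs = {
            // {reference tag, reference attribute, definition tag}
            {"connector-ref", "connector-name", "connector"},
            {"discovery-group-ref", "discovery-group-name", "discovery-group"},
        };
        for (String[] pair : pairs) {
            Set<String> defined = attributeValues(doc, pair[2], "name");
            for (String ref : attributeValues(doc, pair[0], pair[1])) {
                if (!defined.contains(ref)) {
                    problems.add(pair[0] + " '" + ref + "' has no matching <" + pair[2] + " name=...>");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) throws Exception {
        // Reduced, made-up fragment: the connector is unnamed, the discovery group is named.
        String xml = "<configuration xmlns=\"urn:hornetq\">"
                + "<connectors><connector/></connectors>"
                + "<discovery-groups><discovery-group name=\"dg-group1\"/></discovery-groups>"
                + "<broadcast-groups><broadcast-group>"
                + "<connector-ref connector-name=\"netty-connector\"/>"
                + "</broadcast-group></broadcast-groups>"
                + "<cluster-connections><cluster-connection>"
                + "<discovery-group-ref discovery-group-name=\"dg-group1\"/>"
                + "</cluster-connection></cluster-connections>"
                + "</configuration>";
        for (String problem : unresolvedRefs(xml)) {
            System.out.println(problem);
        }
    }
}
```

The check only looks inside a single document, so it suits hornetq-configuration.xml; the references in hornetq-jms.xml point at definitions in the other file and would need the two sets of names to be combined first. Definitions inside XML comments are ignored, because the DOM exposes them as comment nodes rather than elements.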