Failed to ack
daroo Oct 1, 2009 8:52 AM
I'm currently evaluating HornetQ BETA-5. It looks pretty promising. However, I'm facing the following problem:
I have 3 separate, stand-alone instances installed on the same Windows XP machine (Java 1.6.0_16). I've configured one of them to forward messages to the other two using the following (HTTP) bridge configuration:
<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
  <clustered>true</clustered>
  <connectors>
    <connector name="netty">
      <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
      <param key="hornetq.remoting.netty.host" value="${hornetq.remoting.netty.host:localhost}" type="String"/>
      <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
    </connector>
    <!-- Connector to the other node -->
    <connector name="remote-connector-pl">
      <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
      <param key="hornetq.remoting.netty.httpenabled" value="true" type="Boolean"/>
      <param key="hornetq.remoting.netty.port" value="8081" type="Integer"/>
    </connector>
    <connector name="remote-connector-ch">
      <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
      <param key="hornetq.remoting.netty.httpenabled" value="true" type="Boolean"/>
      <param key="hornetq.remoting.netty.port" value="8082" type="Integer"/>
    </connector>
  </connectors>
  <acceptors>
    <acceptor name="netty">
      <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
      <param key="hornetq.remoting.netty.host" value="${hornetq.remoting.netty.host:localhost}" type="String"/>
      <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
    </acceptor>
    <acceptor name="netty-http-acceptor">
      <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
      <param key="hornetq.remoting.netty.httpenabled" value="true" type="Boolean"/>
      <param key="hornetq.remoting.netty.port" value="8080" type="Integer"/>
    </acceptor>
  </acceptors>
  <security-enabled>false</security-enabled>
  <queues>
    <queue name="jms.queue.QueueLocalIn">
      <address>jms.queue.QueueLocalIn</address>
    </queue>
    <queue name="jms.queue.QueueLocalOut">
      <address>jms.queue.QueueLocalOut</address>
    </queue>
    <queue name="jms.queue.ForwardingQueueLocal">
      <address>jms.queue.ForwardingQueueLocal</address>
    </queue>
  </queues>
  <diverts>
    <divert name="local-divert">
      <address>jms.queue.QueueLocalIn</address>
      <forwarding-address>jms.queue.QueueLocalOut</forwarding-address>
      <filter string="country IS NULL OR country NOT IN ('PL', 'CH')"/>
      <exclusive>true</exclusive>
    </divert>
    <divert name="remote-divert">
      <address>jms.queue.QueueLocalIn</address>
      <forwarding-address>jms.queue.ForwardingQueueLocal</forwarding-address>
      <filter string="country IN ('PL', 'CH')"/>
      <exclusive>true</exclusive>
    </divert>
  </diverts>
  <bridges>
    <bridge name="my-bridge-pl">
      <queue-name>jms.queue.ForwardingQueueLocal</queue-name>
      <forwarding-address>jms.queue.ForwardingQueueRemote</forwarding-address>
      <filter string="country='PL'"/>
      <transformer-class-name>zoo.daroo.hornetq.example.dee.DeETransformer</transformer-class-name>
      <reconnect-attempts>-1</reconnect-attempts>
      <connector-ref connector-name="remote-connector-pl"/>
    </bridge>
    <bridge name="my-bridge-ch">
      <queue-name>jms.queue.ForwardingQueueLocal</queue-name>
      <forwarding-address>jms.queue.ForwardingQueueRemote</forwarding-address>
      <filter string="country='CH'"/>
      <transformer-class-name>zoo.daroo.hornetq.example.dee.DeETransformer</transformer-class-name>
      <reconnect-attempts>-1</reconnect-attempts>
      <connector-ref connector-name="remote-connector-ch"/>
    </bridge>
  </bridges>
  <address-settings>
    <!--default for catch all-->
    <address-setting match="#">
      <clustered>false</clustered>
      <dead-letter-address>jms.queue.DLQ</dead-letter-address>
      <expiry-address>jms.queue.ExpiryQueue</expiry-address>
      <redelivery-delay>0</redelivery-delay>
      <max-size-bytes>-1</max-size-bytes>
      <page-size-bytes>10485760</page-size-bytes>
      <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
      <message-counter-history-day-limit>10</message-counter-history-day-limit>
    </address-setting>
  </address-settings>
  <paging-directory>../data/paging</paging-directory>
  <bindings-directory>../data/bindings</bindings-directory>
  <journal-directory>../data/journal</journal-directory>
  <large-messages-directory>../data/large-messages</large-messages-directory>
</configuration>
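To make the intent of the two diverts and the two bridge filters explicit, here is a rough plain-Java sketch of the routing I expect for a message sent to jms.queue.QueueLocalIn. This is not HornetQ code and does not use any HornetQ API; the names RoutingSketch, Target and route are made up purely for illustration, and the sketch assumes the filters compare the country property as a case-sensitive string.

```java
public class RoutingSketch {

    // Hypothetical labels for where a message should end up.
    public enum Target { LOCAL_OUT, BRIDGE_PL, BRIDGE_CH }

    // Models the filters from the configuration above for one message,
    // given the value of its "country" property (null if the property is absent).
    public static Target route(String country) {
        boolean remote = "PL".equals(country) || "CH".equals(country);

        // local-divert: country IS NULL OR country NOT IN ('PL', 'CH')
        if (!remote) {
            return Target.LOCAL_OUT;
        }

        // remote-divert: country IN ('PL', 'CH') moves the message to
        // jms.queue.ForwardingQueueLocal, where each bridge picks its own
        // messages via country='PL' or country='CH'.
        return "PL".equals(country) ? Target.BRIDGE_PL : Target.BRIDGE_CH;
    }

    public static void main(String[] args) {
        for (String country : new String[] {null, "DE", "PL", "CH"}) {
            System.out.println(country + " -> " + route(country));
        }
    }
}
```

Since both diverts are exclusive and their filters are complementary, every message should match exactly one of them, so in this model nothing stays in QueueLocalIn itself.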
The bridge seems to be working: I can retrieve the messages from the other two nodes. But when I shut down the first instance (the one with this bridge configuration) using STOP.bat, I get a bunch of exceptions on the console:
[New I/O client worker #5-1] 14:37:41,205 SEVERE [org.hornetq.core.server.cluster.impl.BridgeImpl] Failed to ack
java.lang.IllegalStateException: Cannot find add info 638
    at org.hornetq.core.journal.impl.JournalImpl.appendUpdateRecord(JournalImpl.java:886)
    at org.hornetq.core.persistence.impl.journal.JournalStorageManager.storeAcknowledge(JournalStorageManager.java:302)
    at org.hornetq.core.server.impl.QueueImpl.acknowledge(QueueImpl.java:673)
    at org.hornetq.core.server.cluster.impl.BridgeImpl.sendAcknowledged(BridgeImpl.java:380)
    at org.hornetq.core.client.impl.ClientSessionImpl.commandConfirmed(ClientSessionImpl.java:815)
    at org.hornetq.core.remoting.impl.ChannelImpl.clearUpTo(ChannelImpl.java:741)
    at org.hornetq.core.remoting.impl.ChannelImpl.handlePacket(ChannelImpl.java:569)
    at org.hornetq.core.remoting.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:397)
    at org.hornetq.core.remoting.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:349)
    at org.hornetq.core.client.impl.ConnectionManagerImpl$DelegatingBufferHandler.bufferReceived(ConnectionManagerImpl.java:1142)
    at org.hornetq.integration.transports.netty.HornetQChannelHandler.messageReceived(HornetQChannelHandler.java:64)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:105)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:803)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:385)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:324)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:306)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:223)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:87)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:803)
    at org.hornetq.integration.transports.netty.NettyConnector$HttpHandler.messageReceived(NettyConnector.java:500)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:105)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:803)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:385)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.unfoldAndfireMessageReceived(ReplayingDecoder.java:513)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:497)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:434)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:87)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:562)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:342)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:329)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:330)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:203)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:113)
    at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:53)
    at org.hornetq.integration.transports.netty.VirtualExecutorService$ChildExecutorRunnable.run(VirtualExecutorService.java:174)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Is this caused by something wrong in my configuration or a bug in HornetQ?
BTW, I also got this exception when I tried to retrieve messages from my QueueLocalOut (just after consumer.receive(1000)) with the session configured as Session.AUTO_ACKNOWLEDGE. When I changed it to Session.CLIENT_ACKNOWLEDGE and started ACKing manually, it didn't happen again.
Cheers!
 
     
    