Failed to ack
daroo Oct 1, 2009 8:52 AM
I'm currently evaluating HornetQ BETA-5. It looks pretty promising. However, I'm facing the following problem:
I have 3 separate, stand-alone instances installed on the same Windows XP machine (Java 1.6.0_16). I've configured one of them to forward messages to the other two using the following (HTTP) bridge configuration:
<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
   <clustered>true</clustered>
   <connectors>
      <connector name="netty">
         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
         <param key="hornetq.remoting.netty.host" value="${hornetq.remoting.netty.host:localhost}" type="String"/>
         <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
      </connector>
      <!-- Connector to the other node -->
      <connector name="remote-connector-pl">
         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
         <param key="hornetq.remoting.netty.httpenabled" value="true" type="Boolean"/>
         <param key="hornetq.remoting.netty.port" value="8081" type="Integer"/>
      </connector>
      <connector name="remote-connector-ch">
         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
         <param key="hornetq.remoting.netty.httpenabled" value="true" type="Boolean"/>
         <param key="hornetq.remoting.netty.port" value="8082" type="Integer"/>
      </connector>
   </connectors>
   <acceptors>
      <acceptor name="netty">
         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
         <param key="hornetq.remoting.netty.host" value="${hornetq.remoting.netty.host:localhost}" type="String"/>
         <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
      </acceptor>
      <acceptor name="netty-http-acceptor">
         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
         <param key="hornetq.remoting.netty.httpenabled" value="true" type="Boolean"/>
         <param key="hornetq.remoting.netty.port" value="8080" type="Integer"/>
      </acceptor>
   </acceptors>
   <security-enabled>false</security-enabled>
   <queues>
      <queue name="jms.queue.QueueLocalIn">
         <address>jms.queue.QueueLocalIn</address>
      </queue>
      <queue name="jms.queue.QueueLocalOut">
         <address>jms.queue.QueueLocalOut</address>
      </queue>
      <queue name="jms.queue.ForwardingQueueLocal">
         <address>jms.queue.ForwardingQueueLocal</address>
      </queue>
   </queues>
   <diverts>
      <divert name="local-divert">
         <address>jms.queue.QueueLocalIn</address>
         <forwarding-address>jms.queue.QueueLocalOut</forwarding-address>
         <filter string="country IS NULL OR country NOT IN ('PL', 'CH')"/>
         <exclusive>true</exclusive>
      </divert>
      <divert name="remote-divert">
         <address>jms.queue.QueueLocalIn</address>
         <forwarding-address>jms.queue.ForwardingQueueLocal</forwarding-address>
         <filter string="country IN ('PL', 'CH')"/>
         <exclusive>true</exclusive>
      </divert>
   </diverts>
   <bridges>
      <bridge name="my-bridge-pl">
         <queue-name>jms.queue.ForwardingQueueLocal</queue-name>
         <forwarding-address>jms.queue.ForwardingQueueRemote</forwarding-address>
         <filter string="country='PL'"/>
         <transformer-class-name>zoo.daroo.hornetq.example.dee.DeETransformer</transformer-class-name>
         <reconnect-attempts>-1</reconnect-attempts>
         <connector-ref connector-name="remote-connector-pl"/>
      </bridge>
      <bridge name="my-bridge-ch">
         <queue-name>jms.queue.ForwardingQueueLocal</queue-name>
         <forwarding-address>jms.queue.ForwardingQueueRemote</forwarding-address>
         <filter string="country='CH'"/>
         <transformer-class-name>zoo.daroo.hornetq.example.dee.DeETransformer</transformer-class-name>
         <reconnect-attempts>-1</reconnect-attempts>
         <connector-ref connector-name="remote-connector-ch"/>
      </bridge>
   </bridges>
   <address-settings>
      <!--default for catch all-->
      <address-setting match="#">
         <clustered>false</clustered>
         <dead-letter-address>jms.queue.DLQ</dead-letter-address>
         <expiry-address>jms.queue.ExpiryQueue</expiry-address>
         <redelivery-delay>0</redelivery-delay>
         <max-size-bytes>-1</max-size-bytes>
         <page-size-bytes>10485760</page-size-bytes>
         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
         <message-counter-history-day-limit>10</message-counter-history-day-limit>
      </address-setting>
   </address-settings>
   <paging-directory>../data/paging</paging-directory>
   <bindings-directory>../data/bindings</bindings-directory>
   <journal-directory>../data/journal</journal-directory>
   <large-messages-directory>../data/large-messages</large-messages-directory>
</configuration>
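To spell out the routing I expect from the divert and bridge filters in the configuration above, here is a minimal sketch in plain Java. It does not use any HornetQ API; the class and method names (RoutingSketch, divertTarget, bridgeConnector) are made up for illustration, and it only mirrors the filter expressions on the "country" property.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustration only: hypothetical helper that mirrors the filter strings
// from the configuration. It is not part of HornetQ.
public class RoutingSketch {
    private static final Set<String> REMOTE_COUNTRIES =
            new HashSet<String>(Arrays.asList("PL", "CH"));

    // Mirrors the two exclusive diverts on jms.queue.QueueLocalIn:
    // "country IS NULL OR country NOT IN ('PL', 'CH')" -> QueueLocalOut,
    // "country IN ('PL', 'CH')" -> ForwardingQueueLocal.
    public static String divertTarget(String country) {
        if (country == null || !REMOTE_COUNTRIES.contains(country)) {
            return "jms.queue.QueueLocalOut";
        }
        return "jms.queue.ForwardingQueueLocal";
    }

    // Mirrors the bridge filters on jms.queue.ForwardingQueueLocal.
    // Returns the connector name the matching bridge uses, or null if no bridge matches.
    public static String bridgeConnector(String country) {
        if ("PL".equals(country)) {
            return "remote-connector-pl";
        }
        if ("CH".equals(country)) {
            return "remote-connector-ch";
        }
        return null;
    }

    public static void main(String[] args) {
        for (String country : new String[] {"PL", "CH", "DE", null}) {
            System.out.println(country + " -> " + divertTarget(country)
                    + " (bridge connector: " + bridgeConnector(country) + ")");
        }
    }
}
```

In other words, only messages with country 'PL' or 'CH' should ever reach ForwardingQueueLocal, and each of them should be picked up by exactly one bridge.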
The bridge seems to be working: I can retrieve the messages from the other two nodes. But when I shut down the first instance (the one with this bridge configuration) using STOP.bat, I get a bunch of exceptions on the console:
[New I/O client worker #5-1] 14:37:41,205 SEVERE [org.hornetq.core.server.cluster.impl.BridgeImpl] Failed to ack
java.lang.IllegalStateException: Cannot find add info 638
    at org.hornetq.core.journal.impl.JournalImpl.appendUpdateRecord(JournalImpl.java:886)
    at org.hornetq.core.persistence.impl.journal.JournalStorageManager.storeAcknowledge(JournalStorageManager.java:302)
    at org.hornetq.core.server.impl.QueueImpl.acknowledge(QueueImpl.java:673)
    at org.hornetq.core.server.cluster.impl.BridgeImpl.sendAcknowledged(BridgeImpl.java:380)
    at org.hornetq.core.client.impl.ClientSessionImpl.commandConfirmed(ClientSessionImpl.java:815)
    at org.hornetq.core.remoting.impl.ChannelImpl.clearUpTo(ChannelImpl.java:741)
    at org.hornetq.core.remoting.impl.ChannelImpl.handlePacket(ChannelImpl.java:569)
    at org.hornetq.core.remoting.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:397)
    at org.hornetq.core.remoting.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:349)
    at org.hornetq.core.client.impl.ConnectionManagerImpl$DelegatingBufferHandler.bufferReceived(ConnectionManagerImpl.java:1142)
    at org.hornetq.integration.transports.netty.HornetQChannelHandler.messageReceived(HornetQChannelHandler.java:64)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:105)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:803)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:385)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:324)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:306)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:223)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:87)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:803)
    at org.hornetq.integration.transports.netty.NettyConnector$HttpHandler.messageReceived(NettyConnector.java:500)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:105)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:803)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:385)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.unfoldAndfireMessageReceived(ReplayingDecoder.java:513)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:497)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:434)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:87)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:562)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:342)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:329)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:330)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:203)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:113)
    at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:53)
    at org.hornetq.integration.transports.netty.VirtualExecutorService$ChildExecutorRunnable.run(VirtualExecutorService.java:174)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Is this caused by something wrong in my configuration or a bug in HornetQ?
BTW, I also got this exception when I tried to retrieve messages from my QueueLocalOut (right after consumer.receive(1000)) with the session configured as Session.AUTO_ACKNOWLEDGE. When I changed it to Session.CLIENT_ACKNOWLEDGE and started acknowledging manually, it did not happen again.
Cheers!