Failed to ack
daroo Oct 1, 2009 8:52 AM
I'm currently evaluating HornetQ BETA-5. It looks pretty promising. However, I'm facing the following problem:
I have 3 separate, stand-alone instances installed on the same Windows XP machine (Java 1.6.0_16). I've configured one of them to forward messages to the other two using the following (HTTP) bridge configuration:
<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

   <clustered>true</clustered>

   <connectors>
      <connector name="netty">
         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
         <param key="hornetq.remoting.netty.host" value="${hornetq.remoting.netty.host:localhost}" type="String"/>
         <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
      </connector>
      <!-- Connector to the other node -->
      <connector name="remote-connector-pl">
         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
         <param key="hornetq.remoting.netty.httpenabled" value="true" type="Boolean"/>
         <param key="hornetq.remoting.netty.port" value="8081" type="Integer"/>
      </connector>
      <connector name="remote-connector-ch">
         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
         <param key="hornetq.remoting.netty.httpenabled" value="true" type="Boolean"/>
         <param key="hornetq.remoting.netty.port" value="8082" type="Integer"/>
      </connector>
   </connectors>

   <acceptors>
      <acceptor name="netty">
         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
         <param key="hornetq.remoting.netty.host" value="${hornetq.remoting.netty.host:localhost}" type="String"/>
         <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
      </acceptor>
      <acceptor name="netty-http-acceptor">
         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
         <param key="hornetq.remoting.netty.httpenabled" value="true" type="Boolean"/>
         <param key="hornetq.remoting.netty.port" value="8080" type="Integer"/>
      </acceptor>
   </acceptors>

   <security-enabled>false</security-enabled>

   <queues>
      <queue name="jms.queue.QueueLocalIn">
         <address>jms.queue.QueueLocalIn</address>
      </queue>
      <queue name="jms.queue.QueueLocalOut">
         <address>jms.queue.QueueLocalOut</address>
      </queue>
      <queue name="jms.queue.ForwardingQueueLocal">
         <address>jms.queue.ForwardingQueueLocal</address>
      </queue>
   </queues>

   <diverts>
      <divert name="local-divert">
         <address>jms.queue.QueueLocalIn</address>
         <forwarding-address>jms.queue.QueueLocalOut</forwarding-address>
         <filter string="country IS NULL OR country NOT IN ('PL', 'CH')"/>
         <exclusive>true</exclusive>
      </divert>
      <divert name="remote-divert">
         <address>jms.queue.QueueLocalIn</address>
         <forwarding-address>jms.queue.ForwardingQueueLocal</forwarding-address>
         <filter string="country IN ('PL', 'CH')"/>
         <exclusive>true</exclusive>
      </divert>
   </diverts>

   <bridges>
      <bridge name="my-bridge-pl">
         <queue-name>jms.queue.ForwardingQueueLocal</queue-name>
         <forwarding-address>jms.queue.ForwardingQueueRemote</forwarding-address>
         <filter string="country='PL'"/>
         <transformer-class-name>zoo.daroo.hornetq.example.dee.DeETransformer</transformer-class-name>
         <reconnect-attempts>-1</reconnect-attempts>
         <connector-ref connector-name="remote-connector-pl"/>
      </bridge>
      <bridge name="my-bridge-ch">
         <queue-name>jms.queue.ForwardingQueueLocal</queue-name>
         <forwarding-address>jms.queue.ForwardingQueueRemote</forwarding-address>
         <filter string="country='CH'"/>
         <transformer-class-name>zoo.daroo.hornetq.example.dee.DeETransformer</transformer-class-name>
         <reconnect-attempts>-1</reconnect-attempts>
         <connector-ref connector-name="remote-connector-ch"/>
      </bridge>
   </bridges>

   <address-settings>
      <!--default for catch all-->
      <address-setting match="#">
         <clustered>false</clustered>
         <dead-letter-address>jms.queue.DLQ</dead-letter-address>
         <expiry-address>jms.queue.ExpiryQueue</expiry-address>
         <redelivery-delay>0</redelivery-delay>
         <max-size-bytes>-1</max-size-bytes>
         <page-size-bytes>10485760</page-size-bytes>
         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
         <message-counter-history-day-limit>10</message-counter-history-day-limit>
      </address-setting>
   </address-settings>

   <paging-directory>../data/paging</paging-directory>
   <bindings-directory>../data/bindings</bindings-directory>
   <journal-directory>../data/journal</journal-directory>
   <large-messages-directory>../data/large-messages</large-messages-directory>

</configuration>
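To spell out the routing this configuration is meant to produce, here is a small stand-alone Java sketch. It does not use any HornetQ API. The class and method names (RoutingSketch, divertTarget, bridgeFor) are made up for illustration, and the logic only mirrors the filter strings of the two diverts and the two bridges in the configuration above, assuming the filters are evaluated with the usual SQL-style null semantics.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: mirrors the divert and bridge filter strings, nothing more.
public class RoutingSketch {

    // Country codes handled by the remote nodes (from the filter strings).
    static final List<String> REMOTE_COUNTRIES = Arrays.asList("PL", "CH");

    // Where a message sent to jms.queue.QueueLocalIn should be diverted.
    // local-divert:  country IS NULL OR country NOT IN ('PL', 'CH')
    // remote-divert: country IN ('PL', 'CH')
    static String divertTarget(String country) {
        if (country == null || !REMOTE_COUNTRIES.contains(country)) {
            return "jms.queue.QueueLocalOut";
        }
        return "jms.queue.ForwardingQueueLocal";
    }

    // Which bridge should pick a message up from jms.queue.ForwardingQueueLocal,
    // or null if neither bridge filter matches.
    static String bridgeFor(String country) {
        if ("PL".equals(country)) {
            return "my-bridge-pl";
        }
        if ("CH".equals(country)) {
            return "my-bridge-ch";
        }
        return null;
    }

    public static void main(String[] args) {
        for (String country : new String[] {null, "DE", "PL", "CH"}) {
            System.out.println(country + " -> " + divertTarget(country)
                    + ", bridge: " + bridgeFor(country));
        }
    }
}
```

Since both diverts are exclusive and their filters are complementary, every message sent to QueueLocalIn should end up in exactly one of the two target queues, and every message in ForwardingQueueLocal should match exactly one bridge.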
The bridge seems to be working: I can retrieve the messages from the other two nodes. But when I shut down the first instance (the one with this bridge configuration) using STOP.bat, I get a bunch of exceptions on the console:
[New I/O client worker #5-1] 14:37:41,205 SEVERE [org.hornetq.core.server.cluster.impl.BridgeImpl] Failed to ack
java.lang.IllegalStateException: Cannot find add info 638
    at org.hornetq.core.journal.impl.JournalImpl.appendUpdateRecord(JournalImpl.java:886)
    at org.hornetq.core.persistence.impl.journal.JournalStorageManager.storeAcknowledge(JournalStorageManager.java:302)
    at org.hornetq.core.server.impl.QueueImpl.acknowledge(QueueImpl.java:673)
    at org.hornetq.core.server.cluster.impl.BridgeImpl.sendAcknowledged(BridgeImpl.java:380)
    at org.hornetq.core.client.impl.ClientSessionImpl.commandConfirmed(ClientSessionImpl.java:815)
    at org.hornetq.core.remoting.impl.ChannelImpl.clearUpTo(ChannelImpl.java:741)
    at org.hornetq.core.remoting.impl.ChannelImpl.handlePacket(ChannelImpl.java:569)
    at org.hornetq.core.remoting.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:397)
    at org.hornetq.core.remoting.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:349)
    at org.hornetq.core.client.impl.ConnectionManagerImpl$DelegatingBufferHandler.bufferReceived(ConnectionManagerImpl.java:1142)
    at org.hornetq.integration.transports.netty.HornetQChannelHandler.messageReceived(HornetQChannelHandler.java:64)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:105)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:803)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:385)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:324)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:306)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:223)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:87)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:803)
    at org.hornetq.integration.transports.netty.NettyConnector$HttpHandler.messageReceived(NettyConnector.java:500)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:105)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:803)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:385)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.unfoldAndfireMessageReceived(ReplayingDecoder.java:513)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:497)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:434)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:87)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:567)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:562)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:342)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:329)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:330)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:203)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:113)
    at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:53)
    at org.hornetq.integration.transports.netty.VirtualExecutorService$ChildExecutorRunnable.run(VirtualExecutorService.java:174)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Is this caused by something wrong in my configuration or a bug in HornetQ?
BTW, I also got this exception when I tried to retrieve messages (just after consumer.receive(1000)) from my QueueLocalOut with the session configured as Session.AUTO_ACKNOWLEDGE. When I changed it to Session.CLIENT_ACKNOWLEDGE and started ACKing manually, it didn't happen again.
Cheers!