0 Replies Latest reply on Aug 24, 2010 3:14 PM by justinkwaugh

    Missed messages on clustered queue after server reconnect

    justinkwaugh

      I have 3 servers (A,B,C) set up in a chained cluster.  Connections to each other are defined explicitly.

       

      A connects to B.

      B connects to A and C.

      C connects to B.

       

      The cluster connections on all three are defined to cluster the address jms.

       

      Server A cluster config looks like this:

      <cluster-connection name="test-cluster">

        <address>jms</address>     
        <max-hops>2</max-hops>
        <connector-ref connector-name="server-b"/>
      </cluster-connection>
      Server B cluster config looks like this:

      <cluster-connection name="test-cluster">

        <address>jms</address>     
        <max-hops>2</max-hops>
        <connector-ref connector-name="server-a"/>
        <connector-ref connector-name="server-c"/>
      </cluster-connection>
      Server C cluster config looks like this:

      <cluster-connection name="test-cluster">

        <address>jms</address>     
        <max-hops>2</max-hops>
        <connector-ref connector-name="server-b"/>
      </cluster-connection>
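
To make the topology concrete, here is a minimal sketch that models the three servers as a graph and counts the hops between them. It is only an illustration: `ChainHops`, `LINKS` and `hops` are hypothetical names, not part of HornetQ. It assumes the links are exactly the ones listed above (A to B, B to A and C, C to B).

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical helper, not part of HornetQ: models the chained topology
// from the post and counts cluster hops between servers.
final class ChainHops {

    // Direct cluster connections as described in the post.
    static final Map<String, List<String>> LINKS = Map.of(
            "A", List.of("B"),
            "B", List.of("A", "C"),
            "C", List.of("B"));

    // Breadth-first search: number of hops from one server to another,
    // or -1 if the target cannot be reached.
    static int hops(String from, String to) {
        Map<String, Integer> dist = new HashMap<>();
        Queue<String> queue = new ArrayDeque<>();
        dist.put(from, 0);
        queue.add(from);
        while (!queue.isEmpty()) {
            String node = queue.remove();
            if (node.equals(to)) {
                return dist.get(node);
            }
            for (String next : LINKS.getOrDefault(node, List.of())) {
                if (!dist.containsKey(next)) {
                    dist.put(next, dist.get(node) + 1);
                    queue.add(next);
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // A reaches C only through B, so this prints "A -> C hops: 2".
        System.out.println("A -> C hops: " + hops("A", "C"));
    }
}
```

A message produced on A has to cross two cluster links to reach a consumer on C, which is why max-hops is set to 2 on every server here.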

       

      I have no queues or topics defined on server B.  On servers A and C I have a JMS queue defined:

       

      <queue name="testQueue">

          <entry name="/queue/testQueue"/>

      </queue>

       

      I start all three servers, then connect a consumer to C and a producer to A.  The producer sends a message every 5 seconds, incrementing an integer in the body.  I can see the messages being consumed on C; I print the body value, and it increments as expected.  Then I kill the consumer and server C and wait a little.  During this time the producer on A keeps sending messages.  I then start server C again, followed by the consumer on C.  The messages sent while server C and the consumer were down are delivered immediately, followed by an exception (pasted below), and after that no more messages are ever delivered to server C on that queue.  If I then kill the consumer and server C again, wait a bit, and start them up, I start receiving messages again, but the messages sent in the meantime (after the exception and during this second downtime) never arrive.

       

      In sequence:

      1. Start servers A, B, and C, then the consumer on C and the producer on A.

      2. Let's say messages 0-10 are delivered to C successfully.

      3. Kill consumer on C and server C.

      4. Wait 10 seconds.

      5. Start up server C, and connect consumer again.

      6. Messages 11-13 are received followed by the exception and then nothing.

      7. Kill consumer on C and server C.

      8. Wait 10 seconds.

      9. Start up server C, and connect consumer again.

      10. Message 17 and onward arrive as the producer sends them.  Messages 14-16 never arrive.
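
For anyone trying to reproduce this, the gap in the sequence above can be detected on the consumer side with a small tracker like the sketch below. It is a hypothetical helper, not something from my test code or from the HornetQ/JMS API: `SequenceGapTracker` and its methods are made-up names, and it assumes the integer has already been parsed from the message body and that the producer counts up by one from a known start value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical consumer-side helper (not a HornetQ or JMS API): records the
// integer carried in each message body and reports ranges that were skipped.
final class SequenceGapTracker {
    private int expected;                          // next value we expect to see
    private final List<String> gaps = new ArrayList<>();

    SequenceGapTracker(int first) {
        this.expected = first;
    }

    // Call once per received message with the integer parsed from its body.
    void onMessage(int value) {
        if (value > expected) {
            // Everything from 'expected' up to 'value - 1' was skipped.
            gaps.add(expected == value - 1
                    ? String.valueOf(expected)
                    : expected + "-" + (value - 1));
        }
        if (value >= expected) {
            expected = value + 1;                  // older or duplicate values are ignored
        }
    }

    // Returns a copy of the skipped ranges seen so far.
    List<String> gaps() {
        return new ArrayList<>(gaps);
    }

    public static void main(String[] args) {
        SequenceGapTracker tracker = new SequenceGapTracker(0);
        for (int i = 0; i <= 13; i++) tracker.onMessage(i);   // steps 2 and 6
        for (int i = 17; i <= 20; i++) tracker.onMessage(i);  // step 10
        System.out.println("Missing: " + tracker.gaps());     // prints "Missing: [14-16]"
    }
}
```

Feeding it the sequence from the steps above reports 14-16 as the only missing range.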

       

       

      Here is the exception:

      [Old I/O server worker (parentId: 32364979, channelId: 24191379, null => /172.26.162.3:5445)] 14:51:09,420 SEVERE [org.hornetq.core.server.management.impl.ManagementServiceImpl]  Failed to call listener
      java.lang.IllegalStateException: Cannot find queue info for queue jms.queue.testQueue46c46fe4-aa18-11df-a3ac-005056940757
              at org.hornetq.core.postoffice.impl.PostOfficeImpl.onNotification(PostOfficeImpl.java:298)
              at org.hornetq.core.server.management.impl.ManagementServiceImpl.sendNotification(ManagementServiceImpl.java:677)
              at org.hornetq.core.server.impl.ServerSessionImpl.createConsumer(ServerSessionImpl.java:334)
              at org.hornetq.core.protocol.core.ServerSessionPacketHandler.handlePacket(ServerSessionPacketHandler.java:234)
              at org.hornetq.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:471)
              at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:451)
              at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:412)
              at org.hornetq.core.remoting.server.impl.RemotingServiceImpl$DelegatingBufferHandler.bufferReceived(RemotingServiceImpl.java:459)
              at org.hornetq.core.remoting.impl.netty.HornetQChannelHandler.messageReceived(HornetQChannelHandler.java:67)
              at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:100)
              at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
              at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
              at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:287)
              at org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2.decode(HornetQFrameDecoder2.java:169)
              at org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2.messageReceived(HornetQFrameDecoder2.java:134)
              at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
              at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
              at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:540)
              at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
              at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
              at org.jboss.netty.channel.socket.oio.OioWorker.run(OioWorker.java:90)
              at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
              at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)
              at org.jboss.netty.util.VirtualExecutorService$ChildExecutorRunnable.run(VirtualExecutorService.java:181)
              at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
              at java.lang.Thread.run(Thread.java:619)