12 Replies Latest reply on Apr 16, 2011 6:01 PM by piotrekde

    Message delivery delayed by broker

    piotrekde

      Hello,

       

      In my deployment model I have one application that receives client requests and forwards them to the JMS topic. Two other applications receives messages from this topic, and if application will find out that it is able to process it, then it responds. If application will find that it is not able to process message, message is dropped.

      It is done by topic, because my "dispatching" front application doesn't know anything about consumers (which one is responsible for processing this kind of messages, and even how many of processing consumers is).

       

      The problem is following:

      1. Dispatcher received request, forwards it to all consumers by topic (consumerA, consumerB).

      2a) Every time message comes to consumerA, it is processed and send back.

      2b) Consumer B does not have an appropriate hadnler, so it is not processing message - message is dropped.

       

      The same process is reapeated for e.g 100 messages.

       

      ConsumerA has already processed all messages, all of them were send back to the client.

      At the same time, when all messages has been already processed, ConsumerB receives them delayed.

       

      It causes, that ConsumerB is stucked in processing messages, and if client send message which may be processed only by consumerB, client will receive timeout (only B is able to handle it, but it is stucked...).

       

      I am using HornetQ2.0.0.GA and Spring integration.

       

      My consumer- side connection configuration (producer-side is the same):

       

      <beans:bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
          <beans:property name="targetConnectionFactory" ref="targetConnectionFactory" />
          <beans:property name="sessionCacheSize" value="10" />
          <beans:property name="cacheProducers" value="false" />
        </beans:bean>
      
      
          <beans:bean id="targetConnectionFactory" class="org.hornetq.jms.client.HornetQConnectionFactory">
          <beans:constructor-arg>
            <beans:bean class="org.hornetq.api.core.TransportConfiguration">
              <beans:constructor-arg
                value="org.hornetq.integration.transports.netty.NettyConnectorFactory" />
              <beans:constructor-arg>
                <beans:map key-type="java.lang.String" value-type="java.lang.Object">
                  <beans:entry key="host" value="localhost" />
                  <beans:entry key="port" value="5445" />
                </beans:map>
              </beans:constructor-arg>
            </beans:bean>
          </beans:constructor-arg>
          <beans:property name="clientID" value="AnossstherConsumer"/> <!-- different for every consumer -->
        </beans:bean>
      
      
        <beans:bean name="requestTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic">
          <beans:constructor-arg index="0" value="TopicRequestDemo"/>
        </beans:bean>
      
      
         <beans:bean name="responseTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic">
          <beans:constructor-arg index="0" value="TopicResponseDemo"/>
         </beans:bean>
      
      

       

       

      HornetQ configuration in hornetq-jms.xml:

       <connection-factory name="NettyConnectionFactory">
                   <connectors>
                   <connector-ref connector-name="netty"/>
                          </connectors>
                 <entries>
                     <entry name="/ConnectionFactory"/>
                     <entry name="/XAConnectionFactory"/>
                   </entries>
                 <consumer-window-size>-1</consumer-window-size>
                 <consumer-max-rate>-1</consumer-max-rate>
                </connection-factory>
      

       

      I use non-clustered, standalone broker.

      Messages are send and received using spring integration jms:inbound-gateway / jms:outbound-gateway (standard, default configuration)

       

      If I use message queues with consumers sharing the same configuration (each one can handle message), then everything works fine, and even default load balancing works.

       

      Thanks for any advices!

        • 1. Message delivery delayed by broker
          clebert.suconic

          You're setting consumerWindowSize = -1, that means that if your consumer is busy, it will be indefinitely caching messages at the client buffer.

           

          Maybe you should try consumerWindowSize = 0, or something more reasonable.

          • 2. Message delivery delayed by broker
            piotrekde

            Thanks Clebert,

             

            It's not client speed causing this delay. Actually consumers are really fast. It's HornetQ sending messages slower to the 2nd consumer (I checked it using network sniffer just to be 100% sure). Also, I tried without consumer-window-size and consumer-max-rate parameters - everything works the same.

            • 3. Message delivery delayed by broker
              clebert.suconic

              You should use HornetQ 2.2.2. we fixed several bugs along the way.

              • 4. Message delivery delayed by broker
                piotrekde

                Yep, I know, but I couldn't find other configuration of hornetq-netty-spring integration artifacts versions which would work fine with each other (it's maven based project).

                • 5. Message delivery delayed by broker
                  clebert.suconic

                  hornetq-netty-spring? what is that?

                   

                  Can you send me the URL for the repository?

                   

                   

                  2.0.0.GA is way too old. I can't do much to help you on that version.

                  • 6. Message delivery delayed by broker
                    piotrekde

                    I mean: "hornetq" and "netty" and "spring integration"

                    Anyway, if there is no possibility that hornetq broker purposely slows down message distribution (because of some configuration option/topic capacity and so one) I have to try do it again.

                    • 7. Message delivery delayed by broker
                      clebert.suconic

                      first thing you gotta do is upgrade

                      • 8. Re: Message delivery delayed by broker
                        piotrekde

                        Ok, upgrade to 2.2.2.Final was not easy but it is done.

                         

                        I tried with following configurations (maven managed):

                        hornetq-core-client: 2.2.2.Final

                        hornetq-jms-client: 2.2.2.Final

                        hornetq-transports: 2.0.0.GA (there is no 2.2.2.Final version in repo, and I assume that this one is the newest stable version)

                        netty: 3.2.4 (the newest one)

                         

                        Also, I had to change Spring configuration to the following:

                         

                         <bean id="targetConnectionFactory" class="org.hornetq.jms.client.HornetQJMSConnectionFactory">
                            <constructor-arg index="0" value="false"/> <!-- HighAvailability mode? -->
                            <constructor-arg index="1">
                              <bean class="org.hornetq.api.core.TransportConfiguration">
                                <constructor-arg
                                  value="org.hornetq.integration.transports.netty.NettyConnectorFactory" />
                                <constructor-arg>
                                  <map key-type="java.lang.String" value-type="java.lang.Object">
                                    <entry key="host" value="localhost" />
                                    <entry key="port" value="5445" />
                                  </map>
                                </constructor-arg>
                              </bean>
                            </constructor-arg>
                            <property name="clientID" value="123Consumer"/>
                          </bean>
                        
                        

                         

                        Application crashed when it was starting:

                         

                        INFO: Setting the server's publish address to be /

                        Apr 15, 2011 10:02:20 AM org.hornetq.core.logging.impl.JULLogDelegate warn

                        WARNING: Unexpected Netty Version was expecting 3.2.3.Final-r${buildNumber} using 3.1.5.GA-r1772          

                        2011-04-15 10:02:20,577 [main] INFO  org.hibernate.impl.SessionFactoryImpl - closing

                        2011-04-15 10:02:20,579 [main] ERROR org.springframework.web.context.ContextLoader - Context initialization failed

                        org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.jms.listener.DefaultMessageListenerContainer#0'; nested exception is java.lang.NoSuchMethodError: org.hornetq.spi.core.remoting.ConnectionLifeCycleListener.connectionCreated(Lorg/hornetq/spi/core/remoting/Connection;)V

                                  at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:169)

                                  at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:159)

                                  at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154)

                                  at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:335)

                                  at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143)

                                  at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)

                                  at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:908)

                                  at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:428)

                                  at org.springframework.web.context.ContextLoader.createWebApplicationContext(ContextLoader.java:276)

                                  at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:197)

                                  at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:47)

                                  at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4205)

                                  at org.apache.catalina.core.StandardContext.start(StandardContext.java:4704)

                                  at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1053)

                                  at org.apache.catalina.core.StandardHost.start(StandardHost.java:840)

                                  at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1053)

                                  at org.apache.catalina.core.StandardEngine.start(StandardEngine.java:463)

                                  at org.apache.catalina.core.StandardService.start(StandardService.java:525)

                                  at org.apache.catalina.core.StandardServer.start(StandardServer.java:754)

                                  at org.apache.catalina.startup.Catalina.start(Catalina.java:595)

                                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

                                  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

                                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

                                  at java.lang.reflect.Method.invoke(Method.java:597)

                                  at org.apache.catalina.startup.Bootstrap.start(Bootstrap.java:289)

                                  at org.apache.catalina.startup.Bootstrap.main(Bootstrap.java:414)

                        Caused by: java.lang.NoSuchMethodError: org.hornetq.spi.core.remoting.ConnectionLifeCycleListener.connectionCreated(Lorg/hornetq/spi/core/remoting/Connection;)V

                                  at org.hornetq.integration.transports.netty.NettyConnection.<init>(NettyConnection.java:58)

                                  at org.hornetq.integration.transports.netty.NettyConnector.createConnection(NettyConnector.java:410)

                                  at org.hornetq.core.client.impl.ClientSessionFactoryImpl.getConnection(ClientSessionFactoryImpl.java:979)

                                  at org.hornetq.core.client.impl.ClientSessionFactoryImpl.getConnectionWithRetry(ClientSessionFactoryImpl.java:865)

                                  at org.hornetq.core.client.impl.ClientSessionFactoryImpl.connect(ClientSessionFactoryImpl.java:208)

                                  at org.hornetq.core.client.impl.ServerLocatorImpl.createSessionFactory(ServerLocatorImpl.java:602)

                                  at org.hornetq.jms.client.HornetQConnectionFactory.createConnectionInternal(HornetQConnectionFactory.java:601)

                                  at org.hornetq.jms.client.HornetQConnectionFactory.createConnection(HornetQConnectionFactory.java:119)

                                  at org.hornetq.jms.client.HornetQConnectionFactory.createConnection(HornetQConnectionFactory.java:114)

                                  at org.springframework.jms.connection.SingleConnectionFactory.doCreateConnection(SingleConnectionFactory.java:342)

                                  at org.springframework.jms.connection.SingleConnectionFactory.initConnection(SingleConnectionFactory.java:288)

                                  at org.springframework.jms.connection.SingleConnectionFactory.createConnection(SingleConnectionFactory.java:225)

                                  at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:184)

                                  at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:403)

                                  at org.springframework.jms.listener.AbstractJmsListeningContainer.establishSharedConnection(AbstractJmsListeningContainer.java:371)

                                  at org.springframework.jms.listener.DefaultMessageListenerContainer.establishSharedConnection(DefaultMessageListenerContainer.java:749)

                                  at org.springframework.jms.listener.AbstractJmsListeningContainer.doStart(AbstractJmsListeningContainer.java:278)

                                  at org.springframework.jms.listener.AbstractJmsListeningContainer.start(AbstractJmsListeningContainer.java:263)

                                  at org.springframework.jms.listener.DefaultMessageListenerContainer.start(DefaultMessageListenerContainer.java:555)

                                  at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:166)

                                  ... 25 more

                        Apr 15, 2011 10:02:20 AM org.apache.catalina.core.StandardContext listenerStart

                        SEVERE: Exception sending context initialized event to listener instance of class org.springframework.web.context.ContextLoaderListener

                        org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.jms.listener.DefaultMessageListenerContainer#0'; nested exception is java.lang.NoSuchMethodError: org.hornetq.spi.core.remoting.ConnectionLifeCycleListener.connectionCreated(Lorg/hornetq/spi/core/remoting/Connection;)V

                                  at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:169)

                                  at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:159)

                                  at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154)

                                  at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:335)

                                  at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143)

                                  at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)

                                  at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:908)

                                  at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:428)

                                  at org.springframework.web.context.ContextLoader.createWebApplicationContext(ContextLoader.java:276)

                                  at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:197)

                                  at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:47)

                                  at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4205)

                                  at org.apache.catalina.core.StandardContext.start(StandardContext.java:4704)

                                  at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1053)

                                  at org.apache.catalina.core.StandardHost.start(StandardHost.java:840)

                                  at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1053)

                                  at org.apache.catalina.core.StandardEngine.start(StandardEngine.java:463)

                                  at org.apache.catalina.core.StandardService.start(StandardService.java:525)

                                  at org.apache.catalina.core.StandardServer.start(StandardServer.java:754)

                                  at org.apache.catalina.startup.Catalina.start(Catalina.java:595)

                                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

                                  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

                                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

                                  at java.lang.reflect.Method.invoke(Method.java:597)

                                  at org.apache.catalina.startup.Bootstrap.start(Bootstrap.java:289)

                                  at org.apache.catalina.startup.Bootstrap.main(Bootstrap.java:414)

                        Caused by: java.lang.NoSuchMethodError: org.hornetq.spi.core.remoting.ConnectionLifeCycleListener.connectionCreated(Lorg/hornetq/spi/core/remoting/Connection;)V

                                  at org.hornetq.integration.transports.netty.NettyConnection.<init>(NettyConnection.java:58)

                                  at org.hornetq.integration.transports.netty.NettyConnector.createConnection(NettyConnector.java:410)

                                  at org.hornetq.core.client.impl.ClientSessionFactoryImpl.getConnection(ClientSessionFactoryImpl.java:979)

                                  at org.hornetq.core.client.impl.ClientSessionFactoryImpl.getConnectionWithRetry(ClientSessionFactoryImpl.java:865)

                                  at org.hornetq.core.client.impl.ClientSessionFactoryImpl.connect(ClientSessionFactoryImpl.java:208)

                                  at org.hornetq.core.client.impl.ServerLocatorImpl.createSessionFactory(ServerLocatorImpl.java:602)

                                  at org.hornetq.jms.client.HornetQConnectionFactory.createConnectionInternal(HornetQConnectionFactory.java:601)

                                  at org.hornetq.jms.client.HornetQConnectionFactory.createConnection(HornetQConnectionFactory.java:119)

                                  at org.hornetq.jms.client.HornetQConnectionFactory.createConnection(HornetQConnectionFactory.java:114)

                                  at org.springframework.jms.connection.SingleConnectionFactory.doCreateConnection(SingleConnectionFactory.java:342)

                                  at org.springframework.jms.connection.SingleConnectionFactory.initConnection(SingleConnectionFactory.java:288)

                                  at org.springframework.jms.connection.SingleConnectionFactory.createConnection(SingleConnectionFactory.java:225)

                                  at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:184)

                                  at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:403)

                                  at org.springframework.jms.listener.AbstractJmsListeningContainer.establishSharedConnection(AbstractJmsListeningContainer.java:371)

                                  at org.springframework.jms.listener.DefaultMessageListenerContainer.establishSharedConnection(DefaultMessageListenerContainer.java:749)

                                  at org.springframework.jms.listener.AbstractJmsListeningContainer.doStart(AbstractJmsListeningContainer.java:278)

                                  at org.springframework.jms.listener.AbstractJmsListeningContainer.start(AbstractJmsListeningContainer.java:263)

                                  at org.springframework.jms.listener.DefaultMessageListenerContainer.start(DefaultMessageListenerContainer.java:555)

                                  at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:166)

                                  ... 25 more

                        Apr 15, 2011 10:02:20 AM org.apache.catalina.core.StandardContext start

                         

                         

                         

                        With the same configuration, except netty version only; now with 3.2.3.Final:

                         

                         

                        WARNING: Unexpected Netty Version was expecting 3.2.3.Final-r${buildNumber} using 3.1.5.GA-r1772

                        Apr 15, 2011 10:12:38 AM org.apache.catalina.core.StandardWrapperValve invoke

                        SEVERE: Servlet.service() for servlet requestHandler threw exception

                        java.lang.NoSuchMethodError: org.hornetq.spi.core.remoting.ConnectionLifeCycleListener.connectionCreated(Lorg/hornetq/spi/core/remoting/Connection;)V

                                  at org.hornetq.integration.transports.netty.NettyConnection.<init>(NettyConnection.java:58)

                                  at org.hornetq.integration.transports.netty.NettyConnector.createConnection(NettyConnector.java:410)

                                  at org.hornetq.core.client.impl.ClientSessionFactoryImpl.getConnection(ClientSessionFactoryImpl.java:979)

                                  at org.hornetq.core.client.impl.ClientSessionFactoryImpl.getConnectionWithRetry(ClientSessionFactoryImpl.java:865)

                                  at org.hornetq.core.client.impl.ClientSessionFactoryImpl.connect(ClientSessionFactoryImpl.java:208)

                                  at org.hornetq.core.client.impl.ServerLocatorImpl.createSessionFactory(ServerLocatorImpl.java:602)

                         

                         

                        (...)

                         

                         

                         

                        Now hornetq-transports: 2.1.0.r9031 and netty 3.2.3.Final.

                        Server started but with warning:

                         

                         

                        WARNING: Unexpected Netty Version was expecting 3.2.3.Final-r${buildNumber} using 3.2.0.BETA1-r2215

                         

                         

                        So I tried with hornetq-transports: 2.1.0.r9031 and netty 3.2.0.BETA1 , but result was the same (unexpected netty version).

                         

                        Anyway, I have currently starting (working) server and HornetQ in 2.2.2.Final version.

                         

                        Unfortunately everything works the same - hornet slows down sending messages to one of the consumers...

                         

                        I am sure that delay comes from Hornet because as it was written previously I've put sniffer on network links:

                        tcp6       0      0 127.0.0.1:35164         127.0.0.1:5445          ESTABLISHED 6243/java     

                        tcp6       0      0 127.0.0.1:35165         127.0.0.1:5445          ESTABLISHED 5673/java  

                        tcp6       0      0 127.0.0.1:35167         127.0.0.1:5445          ESTABLISHED 5631/java

                         

                        5673 (port 35165)  is producer

                        two others are consumers.


                        When the one of the consumers has already processed all data and sent them back to the producer, another consumer is still receiving delayed data. Also I tried with consumer windows size = 0; everything works the same...

                        • 9. Re: Message delivery delayed by broker
                          piotrekde

                          Most probably I've figured out what is going one. Dispatcher (producer) sends JMS message using Spring integration gateway. This gateway is biderectional, but JMS topics/queues are unidirectional (?). So SI gateway must set up a temporary channel, over which response will arrive. Because response comes only from one consumer (2nd dropped it because it doesn't have an appropriate handler), Hornet assumes that 2nd consumer is slow and it starts sending messages to it with a greater delay.

                           

                          We have tested this configuration (deployment) previously on ActiveMQ and it hadn't slowing down messages - it seems that it is not as smart as HornetQ

                           

                          Is it any way to disable this feature (of course if it exists, because my suspicion may not be valid)?

                          • 10. Re: Message delivery delayed by broker
                            clebert.suconic
                            hornetq-transports: 2.0.0.GA (there is no 2.2.2.Final version in repo, and I assume that this one is the newest stable version)

                             

                            No, this one is gone. It's been incorporated by core.jar

                            • 11. Re: Message delivery delayed by broker
                              clebert.suconic
                              but JMS topics/queues are unidirectional

                              I'm not sure what you mean by this?

                               

                               

                              It seems you created a consumer and it's not looking at it? the consumer will receive messages as long as you have enough consumer-window-size available (buffering), and the connection is started.

                              • 12. Re: Message delivery delayed by broker
                                piotrekde
                                but JMS topics/queues are unidirectional

                                I'm not sure what you mean by this?

                                 

                                By this I mean that you can send messages only in one direction - is that right?

                                 

                                I thinks that it's a flaw in HornetQ-SpringIntegration communication - when I connected several consumers, some using SpringIntegration gateways and some using traditional J2EE approach, all "J2EE-traditional" consumers has received non-delayed messages, and only one of SI-based consumers also has received messages 'on time' (this one which has been sending message back using SI underlaying temporary channel). All other SI-based consumers received messages delayed.