In my deployment model I have one application that receives client requests and forwards them to the JMS topic. Two other applications receives messages from this topic, and if application will find out that it is able to process it, then it responds. If application will find that it is not able to process message, message is dropped.
It is done by topic, because my "dispatching" front application doesn't know anything about consumers (which one is responsible for processing this kind of messages, and even how many of processing consumers is).
The problem is following:
1. Dispatcher received request, forwards it to all consumers by topic (consumerA, consumerB).
2a) Every time message comes to consumerA, it is processed and send back.
2b) Consumer B does not have an appropriate hadnler, so it is not processing message - message is dropped.
The same process is reapeated for e.g 100 messages.
ConsumerA has already processed all messages, all of them were send back to the client.
At the same time, when all messages has been already processed, ConsumerB receives them delayed.
It causes, that ConsumerB is stucked in processing messages, and if client send message which may be processed only by consumerB, client will receive timeout (only B is able to handle it, but it is stucked...).
I am using HornetQ2.0.0.GA and Spring integration.
My consumer- side connection configuration (producer-side is the same):
<beans:bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <beans:property name="targetConnectionFactory" ref="targetConnectionFactory" /> <beans:property name="sessionCacheSize" value="10" /> <beans:property name="cacheProducers" value="false" /> </beans:bean> <beans:bean id="targetConnectionFactory" class="org.hornetq.jms.client.HornetQConnectionFactory"> <beans:constructor-arg> <beans:bean class="org.hornetq.api.core.TransportConfiguration"> <beans:constructor-arg value="org.hornetq.integration.transports.netty.NettyConnectorFactory" /> <beans:constructor-arg> <beans:map key-type="java.lang.String" value-type="java.lang.Object"> <beans:entry key="host" value="localhost" /> <beans:entry key="port" value="5445" /> </beans:map> </beans:constructor-arg> </beans:bean> </beans:constructor-arg> <beans:property name="clientID" value="AnossstherConsumer"/> <!-- different for every consumer --> </beans:bean> <beans:bean name="requestTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic"> <beans:constructor-arg index="0" value="TopicRequestDemo"/> </beans:bean> <beans:bean name="responseTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic"> <beans:constructor-arg index="0" value="TopicResponseDemo"/> </beans:bean>
HornetQ configuration in hornetq-jms.xml:
<connection-factory name="NettyConnectionFactory"> <connectors> <connector-ref connector-name="netty"/> </connectors> <entries> <entry name="/ConnectionFactory"/> <entry name="/XAConnectionFactory"/> </entries> <consumer-window-size>-1</consumer-window-size> <consumer-max-rate>-1</consumer-max-rate> </connection-factory>
I use non-clustered, standalone broker.
Messages are send and received using spring integration jms:inbound-gateway / jms:outbound-gateway (standard, default configuration)
If I use message queues with consumers sharing the same configuration (each one can handle message), then everything works fine, and even default load balancing works.
Thanks for any advices!