3 Replies Latest reply on Sep 15, 2010 4:05 AM by dax

    Using inOut Exchange pattern with ActiveMQ

    dax

      Dear Readers,

       

      For my project I have to deal with queue in request/response mode.

      I read the documentation, and I use the InOut Exchange pattern, to obtain the following simple example :

       

      from ("timer://foo?fixedRate=true&period=600000")

      .setBody(constant("test_in"))

      .to(ExchangePattern.InOut, inputQueue"?disableReplyTo=false&""receiveTimeout=20000&" + "replyTo="+outputQueue )

      .to("log:no-file")

      .end();

      inputQueue = jms:queue:com.awl.wlsi.acs.ws.in
      outputQueue = com.awl.wlsi.acs.ws.out

      The problem now is that I'm expecting a flow like :

      • 1. Launch by the timer

      • 2. Set text in the body

      • 3. Put the message in the input queue

      • 4. Wait for the response

      At this point I put a message with the same correlationID in the output queue

      • 5. Read the response

      • 6. Write in the log file

       

      But at point 3, in place of putting the message in the input queue, the message is send to the ActiveMQ.DLQ .

       

      After 20 seconds, I have the following exception on the log file :

      16:59:28,444 | ERROR | foo              | TimerConsumer                    | rg.apache.camel.processor.Logger  248 | Error processing exchange. Exchange[Message: test_in|http://fusesource.com/forums/]. Caused by: [org.apache.camel.ExchangeTimedOutException - The OUT message was not received within: 20000 millis. Exchange[Message: test_in]|http://fusesource.com/forums/]

      org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis. Exchange[Message: test_in|http://fusesource.com/forums/]

      at org.apache.camel.component.jms.JmsProducer.processInOut(JmsProducer.java:265)

      at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:147)

      at org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:97)

      at org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:95)

      at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:146)

      at org.apache.camel.processor.SendProcessor.doProcess(SendProcessor.java:94)

      at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:82)

      at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

      at org.apache.camel.processor.DelegateProcessor.processNext(DelegateProcessor.java:53)

      at org.apache.camel.processor.DelegateProcessor.proceed(DelegateProcessor.java:82)

      at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:93)

      at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

      at org.apache.camel.processor.RedeliveryErrorHandler.processExchange(RedeliveryErrorHandler.java:177)

      at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:143)

      at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:88)

      at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:49)

      at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:228)

      at org.apache.camel.processor.Pipeline.process(Pipeline.java:74)

      at org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:66)

      at org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)

      at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

      at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:103)

      at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:50)

      at java.util.TimerThread.mainLoop(Timer.java:512)

      at java.util.TimerThread.run(Timer.java:462)

       

      What did I make wrong ? Is it link to my ActiveMQ configuration (I do not have problem with the inOnly pattern) ?

       

      Regards,

      Julien

       

      My JMS configuration:

       

       

      <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">

      <property name="connectionFactory" ref="connectionFactory"/>

      </bean>

       

      Edited by: dax on Sep 14, 2010 3:01 PM

        • 1. Re: Using inOut Exchange pattern with ActiveMQ
          davsclaus

          Start simple and dont use fixed reply to queues.

          Then get that example working.

           

          And make sure there is a consumer on the queue which pickup the message and sends back a reply.

          • 2. Re: Using inOut Exchange pattern with ActiveMQ
            dax

            Hi Dav,

            Thanks for your reply, I started as simple as possible:

             

            Bundle 1

             

            from ("timer://foo?fixedRate=true&period=60000")

            .setBody(constant("test_in"))

            .to(ExchangePattern.InOut, inputQueue)

            .end();

             

            Bundle 2

             

            from (inputQueue)

            .to(outputQueue)

            .end()

             

            In this way I have a consumer on my input queue, but the message is send to the 'ActiveMQ.DLQ' queue again.

            Regards,

            Julien.

             

            Edit : I don't have problem if in bundle 1 I use .to(inputQueue) in place of .to(ExchangePattern.InOut, inputQueue)

            • 3. Re: Using inOut Exchange pattern with ActiveMQ
              dax

              Problem fixed for the ActiveMQ dead letter queue : I had to delete my old bundle, restart the broker & restart ServiceMix.

               

              It seems that the osgi:update did not refresh the service correctly.

               

              Thanks for you support.

              Regards,

              Julien.