9 Replies Latest reply on Dec 11, 2012 8:23 AM by igarashitm

    HornetQ Mixin and Dead Letter Address

    luisdeltoro

      I have the following setup in switchyard 0.5.0.Final:

      A composite component that exposes a service through a java interface which contains 3 operations.

      This service is promoted and bound to a JMS queue, which will be the input point for this service.

      Messages sent to this queue contain a header called "operationName" set to the name of the method of the interface that should be invoked.

       

      I am trying to write a test that proves that if a message with a non-existing operationName is put into this queue, SwitchYard will throw an exception and the HornetQ server will move the message to a configured "Dead Letter Queue".

       

      I've already tested this behaviour manually with the JBoss AS that comes bundled with SwitchYard 0.5, but I would like to write a JUnit test for it. Unfortunately, I'm not able to reproduce this behaviour using the HornetQ mixin.

      I'm trying to test this by sending a message to the input queue and subscribing to the "Dead Letter Queue". My consumer correctly subscribes to the "Dead Letter Queue" but does not receive the offending message.

       

      Here is the configuration of the HornetQ Server:

       

      hornetq-jms.xml

       

      <configuration xmlns="urn:hornetq">

       

           <connection-factory name="ConnectionFactory">

             <connectors>

              <connector-ref connector-name="invm-connector"/>

            </connectors>

            <entries>

               <entry name="ConnectionFactory"/>

            </entries>

         </connection-factory>

       

         <queue name="InputQueue">

             <entry name="InputQueue"/>

         </queue>

       

            <queue name="DeadLetterQueue">

              <entry name="DeadLetterQueue"/>

          </queue>

       

      </configuration>

       

      hornetq-configuration.xml

       

      <configuration xmlns="urn:hornetq">

          <paging-directory>target/data/paging</paging-directory>

          <bindings-directory>target/data/bindings</bindings-directory>

          <persistence-enabled>false</persistence-enabled>

          <journal-directory>target/data/journal</journal-directory>

          <journal-min-files>10</journal-min-files>

          <large-messages-directory>target/data/large-messages</large-messages-directory>

          <security-enabled>false</security-enabled>

       

           <connectors>

              <connector name="invm-connector">

                  <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>

              </connector>

          </connectors>

       

          <acceptors>

              <acceptor name="invm-acceptor">

                  <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>

              </acceptor>

          </acceptors>

       

          <address-settings>

              <address-setting match="jms.queue.InputQueue">

                  <dead-letter-address>jms.queue.DeadLetterQueue</dead-letter-address>

                  <max-delivery-attempts>1</max-delivery-attempts>

              </address-setting>

          </address-settings>

      </configuration>

       

      In my test I make use of the HornetQ Mixin, the CDI Mixin and the Transaction Mixin.

       

      Here is the stacktrace I receive from switchyard during my test:

       

      17:59:04,951 ERROR [org.apache.camel.processor.DefaultErrorHandler] Failed delivery for (MessageId: ID:16866c83-358f-11e2-b862-50e54950011a on ExchangeId: ID-luis-desktop-42847-1353689942544-1-1). Exhausted after delivery attempt: 1 caught: org.switchyard.exception.SwitchYardException: Operation NON_EXISTING_OPERATION is not included in interface for reference: {urn:my.package:0.1.0}InputService

      org.switchyard.exception.SwitchYardException: Operation NON_EXISTING_OPERATION is not included in interface for reference: {urn:urn:my.package:0.1.0}InputService

          at org.switchyard.internal.DomainImpl.createExchange(DomainImpl.java:239)

          at org.switchyard.internal.ServiceReferenceImpl.createExchange(ServiceReferenceImpl.java:102)

          at org.switchyard.component.camel.SwitchYardProducer.createSwitchyardExchange(SwitchYardProducer.java:102)

          at org.switchyard.component.camel.SwitchYardProducer.process(SwitchYardProducer.java:81)

          at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:115)

          at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:285)

          at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)

          at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)

          at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:304)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)

          at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)

          at org.apache.camel.spring.spi.TransactionErrorHandler.processByErrorHandler(TransactionErrorHandler.java:218)

          at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:96)

          at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:109)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)

          at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)

          at org.apache.camel.spring.spi.TransactionErrorHandler.processByErrorHandler(TransactionErrorHandler.java:218)

          at org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:178)

          at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)

          at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)

          at org.apache.camel.spring.spi.TransactionErrorHandler.doInTransactionTemplate(TransactionErrorHandler.java:171)

          at org.apache.camel.spring.spi.TransactionErrorHandler.processInTransaction(TransactionErrorHandler.java:131)

          at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:100)

          at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:109)

          at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:304)

          at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)

          at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)

          at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:50)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

          at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)

          at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104)

          at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:560)

          at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:498)

          at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)

          at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)

          at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:243)

          at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)

          at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)

          at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)

          at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)

          at java.lang.Thread.run(Thread.java:662)

      17:59:04,961 WARN  [org.apache.camel.spring.spi.TransactionErrorHandler] Transaction rollback (0x2f1261b1) redelivered(false) for (MessageId: ID:16866c83-358f-11e2-b862-50e54950011a on ExchangeId: ID-luis-desktop-42847-1353689942544-1-1) caught: org.switchyard.exception.SwitchYardException: Operation NON_EXISTING_OPERATION is not included in interface for reference: {urn:my.package:0.1.0}InputService

      17:59:04,962 WARN  [org.apache.camel.component.jms.EndpointMessageListener] Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.switchyard.exception.SwitchYardException: Operation NON_EXISTING_OPERATION is not included in interface for reference: {urn:my.package:0.1.0}InputService]

      org.apache.camel.RuntimeCamelException: org.switchyard.exception.SwitchYardException: Operation NON_EXISTING_OPERATION is not included in interface for reference: {urn:urn:my.package:0.1.0}InputService

          at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1221)

          at org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:185)

          at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)

          at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)

          at org.apache.camel.spring.spi.TransactionErrorHandler.doInTransactionTemplate(TransactionErrorHandler.java:171)

          at org.apache.camel.spring.spi.TransactionErrorHandler.processInTransaction(TransactionErrorHandler.java:131)

          at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:100)

          at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:109)

          at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:304)

          at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)

          at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)

          at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:50)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

          at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)

          at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104)

          at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:560)

          at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:498)

          at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)

          at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)

          at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:243)

          at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)

          at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)

          at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)

          at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)

          at java.lang.Thread.run(Thread.java:662)

      Caused by: org.switchyard.exception.SwitchYardException: Operation NON_EXISTING_OPERATION is not included in interface for reference: {urn:urn:my.package:0.1.0}InputService

          at org.switchyard.internal.DomainImpl.createExchange(DomainImpl.java:239)

          at org.switchyard.internal.ServiceReferenceImpl.createExchange(ServiceReferenceImpl.java:102)

          at org.switchyard.component.camel.SwitchYardProducer.createSwitchyardExchange(SwitchYardProducer.java:102)

          at org.switchyard.component.camel.SwitchYardProducer.process(SwitchYardProducer.java:81)

          at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:115)

          at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:285)

          at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)

          at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)

          at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:304)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)

          at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)

          at org.apache.camel.spring.spi.TransactionErrorHandler.processByErrorHandler(TransactionErrorHandler.java:218)

          at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:96)

          at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:109)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

          at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:56)

          at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)

          at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)

          at org.apache.camel.spring.spi.TransactionErrorHandler.processByErrorHandler(TransactionErrorHandler.java:218)

          at org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:178)

          ... 33 more

        • 1. Re: HornetQ Mixin and Dead Letter Address
          kcbabo

          Nothing leaps out at me from the config you posted.  Can you attach your test class which uses the MixIn?  If you change the hornetq-configuration.xml to bump the max-delivery-attempts > 1, do you see that reflected in the execution of the test?  It would be good to confirm that the intended configuration file is being read.

          • 2. Re: HornetQ Mixin and Dead Letter Address
            luisdeltoro

            The test I'm trying to get to pass is the following:

             

            @SwitchYardTestCaseConfig(

                    config = SwitchYardTestCaseConfig.SWITCHYARD_XML,

                    mixins = {

                            CDIMixIn.class,

                            HornetQMixIn.class,

                            TransactionMixIn.class

                    }

            )

            @RunWith(SwitchYardRunner.class)

            public class UnsupportedOperationIT {

             

                private static final String INBOUND_QUEUE = "InputQueue";

                private static final String DEAD_LETTER_QUEUE = "DeadLetterQueue";

                private static final String NON_EXISTING_OPERATION = "NON_EXISTING_OPERATION";

                private static final String CONNECTION_FACTORY = "ConnectionFactory";

                private static final int WAIT_TIMEOUT = 5000;

             

                private SwitchYardTestKit testKit;

             

                @Test

                public void testInputBindingWithNonExistingOperation() throws Exception {

                    final Host payload = ModelCreationUtil.createSingleHostNoTransaction();

                    sendToQueue(payload, INBOUND_QUEUE, NON_EXISTING_OPERATION);

                    assertMessageInQueue(DEAD_LETTER_QUEUE);

                }

             

                private void sendToQueue(final Serializable payload, final String queueName, String operation) throws Exception {

                    MessageProducer producer = null;

                    MessagingInfrastructureHelper messagingInfrastructureHelper = null;

                    try {

                        messagingInfrastructureHelper = new MessagingInfrastructureHelper(queueName).invoke();

                        final Session session = messagingInfrastructureHelper.getSession();

                        final Queue queue = messagingInfrastructureHelper.getQueue();

                        producer = session.createProducer(queue);

                        final ObjectMessage objectMessage = session.createObjectMessage(payload);

                        objectMessage.setStringProperty(Header.Operation.HEADER_NAME, operation);

                        producer.send(objectMessage);

                    } finally {

                        if (producer != null) {

                            producer.close();

                        }

                        if (messagingInfrastructureHelper != null) {

                            messagingInfrastructureHelper.close();

                        }

                    }

                }

             

                private void assertMessageInQueue(final String queueName) throws Exception {

                    MessageConsumer consumer = null;

                    MessagingInfrastructureHelper messagingInfrastructureHelper = null;

                    try {

                        messagingInfrastructureHelper = new MessagingInfrastructureHelper(queueName).invoke();

                        final Session session = messagingInfrastructureHelper.getSession();

                        final Queue queue = messagingInfrastructureHelper.getQueue();

                        consumer = session.createConsumer(queue);

                        final Message message = consumer.receive(WAIT_TIMEOUT);

                        assertNotNull(message);

                    } finally {

                        if (consumer != null) {

                            consumer.close();

                        }

                        if (messagingInfrastructureHelper != null) {

                            messagingInfrastructureHelper.close();

                        }

                    }

                }

             

                private class MessagingInfrastructureHelper {

                    private String queueName;

                    private Connection connection;

                    private Session session;

                    private InitialContext initialContext;

                    private Queue queue;

             

                    public MessagingInfrastructureHelper(String queueName) {

                        this.queueName = queueName;

                    }

             

                    private Session getSession() {

                        return session;

                    }

             

                    private Queue getQueue() {

                        return queue;

                    }

             

                    private MessagingInfrastructureHelper invoke() throws NamingException, JMSException {

                        initialContext = new InitialContext();

                        queue = (Queue) initialContext.lookup(queueName);

                        final ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup(CONNECTION_FACTORY);

                        connection = connectionFactory.createConnection();

                        connection.start();

                        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                        return this;

                    }

             

                    private void close() throws JMSException, NamingException {

                        if (this.session != null) {

                            this.session.close();

                        }

                        if (this.connection != null) {

                            this.connection.close();

                        }

                        if (this.initialContext != null) {

                            this.initialContext.close();

                        }

                    }

                }

            }

             

            I've tried changing the max-delivery-attempts in the configuration file to something bigger than 1 and did not notice any difference in the output of the test. The configuration file is definitely being read though, because deleting a tag in order to make the XML invalid causes the test to throw an org.xml.sax.SAXParseException.
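            The well-formedness check described above can be sketched with the JDK's own XML parser. This is only an illustration under assumptions: the class and method names (ConfigReadCheck, isWellFormed, firstValue) are hypothetical, and the sketch inspects an XML string directly. It says nothing about which settings HornetQ actually applied at runtime.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXParseException;

// Hypothetical helper, not part of SwitchYard or HornetQ.
final class ConfigReadCheck {

    // Parses the XML text with the JDK DOM parser.
    private static Document parse(String xml) throws Exception {
        return DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
    }

    // Returns false when the parser rejects the text with a SAXParseException,
    // which is the exception observed after deleting a tag.
    static boolean isWellFormed(String xml) {
        try {
            parse(xml);
            return true;
        } catch (SAXParseException e) {
            return false;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the trimmed text of the first element with the given tag name,
    // or null if there is no such element.
    static String firstValue(String xml, String tag) {
        try {
            NodeList nodes = parse(xml).getElementsByTagName(tag);
            return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String config = "<configuration xmlns=\"urn:hornetq\"><address-settings>"
                + "<address-setting match=\"jms.queue.InputQueue\">"
                + "<dead-letter-address>jms.queue.DeadLetterQueue</dead-letter-address>"
                + "<max-delivery-attempts>1</max-delivery-attempts>"
                + "</address-setting></address-settings></configuration>";
        System.out.println(firstValue(config, "dead-letter-address"));
        System.out.println(firstValue(config, "max-delivery-attempts"));
        // Dropping the closing tags makes the document malformed.
        System.out.println(isWellFormed("<configuration><address-settings></configuration>"));
    }
}
```

            A SAXParseException only shows that the file reaches an XML parser. Whether the address-setting then takes effect has to be checked on the running server, for example through JMX.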

            • 3. Re: HornetQ Mixin and Dead Letter Address
              kcbabo

              Hmm ... any chance you can share your application or a hacked-up version of one of our quickstarts (camel-jms-binding) to reproduce this?

              • 4. Re: HornetQ Mixin and Dead Letter Address
                luisdeltoro

                Sure! I've already prepared a modified version of camel-jms-binding. Please find it attached!

                • 5. Re: HornetQ Mixin and Dead Letter Address
                  kcbabo

                  I am able to reproduce this issue.  Unfortunately, I have not found a root cause.  I have attached with jconsole in the middle of the test to view message counts for the DLQ and there's definitely no message there. :-)  The DLQ is set correctly for the GreetingServiceQueue when viewed through JMX.  Since this works in a deployed AS7 instance, it must have something to do with the hornetq configuration, although I'm not sure what it could be.  I think it should be possible to replicate this scenario with no SY involved to verify it happens with just HornetQ and a Java test client.  That's my next step in debugging, but I won't be able to pick that up until later in the week.  Just an FYI.

                  • 6. Re: HornetQ Mixin and Dead Letter Address
                    luisdeltoro

                    Ok. Thanks for the info and for your efforts!

                    • 7. Re: HornetQ Mixin and Dead Letter Address
                      igarashitm

                      I have noticed that the test succeeds if I change the camel:transacted attribute to "true". This suggests that, with transacted=false, the camel-jms binding may acknowledge receipt of the message before the service fails, so HornetQ never sees a failed delivery.

                       

                      I also noticed that no message can be received from the DLQ even on AS7 if transacted=false.

                       

                      --- camel-jms-binding-dlq.orig/src/test/java/org/switchyard/quickstarts/camel/jms/binding/HornetQClient.java  2012-10-15 06:32:41.000000000 -0400
                      +++ camel-jms-binding-dlq/src/test/java/org/switchyard/quickstarts/camel/jms/binding/HornetQClient.java  2012-12-05 11:59:06.735260615 -0500
                      
                      @@ -21,10 +21,11 @@
                       package org.switchyard.quickstarts.camel.jms.binding;
                      
                       import javax.jms.Message;
                      +import javax.jms.MessageConsumer;
                       import javax.jms.MessageProducer;
                       import javax.jms.Session;
                      
                       import org.switchyard.test.mixins.HornetQMixIn;
                      
                       /**
                        * HornetQ client that uses HornetQ API to connect to a remote server and
                      @@ -39,6 +40,7 @@
                            * The queue to send to.
                            */
                           private static final String QUEUE_NAME = "GreetingServiceQueue";
                      +    private static final String DLQ_NAME = "DeadLetterQueue";
                      
                           /**
                            * The name of the file containing the message content.
                      @@ -72,6 +74,11 @@
                                   Message message = hqMixIn.createJMSMessageFromResource(MESSAGE_PAYLOAD);
                                   producer.send(message);
                                   System.out.println("Message sent. Please see server console output");
                      +
                      +            session = hqMixIn.createJMSSession();
                      +            MessageConsumer consumer = session.createConsumer(HornetQMixIn.getJMSQueue(DLQ_NAME));
                      +            System.out.println("A message in DLQ: " + hqMixIn.readStringFromJMSMessage(consumer.receive(1000)));
                      +            
                               } finally {
                                   hqMixIn.uninitialize();
                               }
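
                      The acknowledgement hypothesis above can be illustrated with a toy in-memory model. This is a sketch under stated assumptions, not HornetQ or Camel code: the class DlqAckSimulation and its methods are hypothetical, and it assumes that a non-transacted consumer has already acknowledged the message when the service throws, while a transacted consumer rolls back so the broker can count the failed delivery.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only; no real broker or JMS API is involved.
final class DlqAckSimulation {

    // Stand-in for the service call that fails on an unknown operation name.
    static void invokeService(String message) {
        throw new IllegalStateException("Operation is not included in interface: " + message);
    }

    // Delivers one message to a listener that always fails and returns
    // the contents of the simulated dead letter queue afterwards.
    static List<String> deliver(String message, boolean transacted, int maxDeliveryAttempts) {
        Deque<String> inputQueue = new ArrayDeque<>();
        List<String> deadLetterQueue = new ArrayList<>();
        inputQueue.add(message);
        int deliveryCount = 0;

        while (!inputQueue.isEmpty()) {
            String current = inputQueue.poll();
            deliveryCount++;
            try {
                invokeService(current);
            } catch (IllegalStateException e) {
                if (!transacted) {
                    // Assumption: the message was acknowledged on receipt,
                    // so the broker never learns about the failure.
                    continue;
                }
                // Transacted: the rollback hands the message back to the broker,
                // which dead-letters it once the attempts are exhausted.
                if (deliveryCount >= maxDeliveryAttempts) {
                    deadLetterQueue.add(current);
                } else {
                    inputQueue.add(current);
                }
            }
        }
        return deadLetterQueue;
    }

    public static void main(String[] args) {
        System.out.println("transacted=false -> DLQ: " + deliver("msg", false, 1));
        System.out.println("transacted=true  -> DLQ: " + deliver("msg", true, 1));
    }
}
```

                      In this model the dead letter queue stays empty for transacted=false regardless of max-delivery-attempts, which matches the observation that changing that setting made no visible difference in the test.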
                      
                      
                      • 8. Re: HornetQ Mixin and Dead Letter Address
                        luisdeltoro

                        Hi Tomohisa,

                         

                        Thanks for finding this out.

                         

                        From your explanation, I understand that the behaviour I'm observing is the correct one when the camel:transacted attribute is set to "false", right?

                        • 9. Re: HornetQ Mixin and Dead Letter Address
                          igarashitm

                          Hi Luis,

                           

                          Unfortunately, neither I nor our Camel expert is 100% sure whether there is any option to cancel message consumption on error with transacted=false. Please use transacted=true for now.