How to route messages from jms:consumer to a Camel endpoint?
sdv0967 Mar 28, 2011 9:33 AM
Hi,
I use Fuse ESB 4.3.1, standard install. I need to process messages in the following way: JMS Consumer -> NMR -> Camel EP -> Custom Bean. The scenario looks simple, but I have been fighting with it for the last two days with no luck. The JMS consumer receives messages successfully and appears to route them to the NMR, but nothing is then received from the NMR. The error log is:
16:46:47,889 | DEBUG | enerContainer-17 | JmsTransactionManager | 74 - org.springframework.transaction - 3.0.5.RELEASE | Initiating transaction rollback
16:46:47,889 | DEBUG | enerContainer-17 | JmsTransactionManager | 74 - org.springframework.transaction - 3.0.5.RELEASE | Rolling back JMS transaction on Session [com.tibco.tibjms.TibjmsSession@e23a25]
16:46:48,170 | WARN | enerContainer-17 | DefaultMessageListenerContainer | 95 - org.springframework.jms - 3.0.5.RELEASE | Setup of JMS message listener invoker failed for destination 'tpm.outbox.TradeNotification' - trying to recover. Cause: Error sending JBI exchange
javax.jms.JMSException: Error sending JBI exchange
    at org.apache.servicemix.jms.endpoints.AbstractConsumerEndpoint.onMessage(AbstractConsumerEndpoint.java:580)[159:servicemix-jms:2011.01.0.fuse-00-00]
    at org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint$1.onMessage(JmsConsumerEndpoint.java:477)[159:servicemix-jms:2011.01.0.fuse-00-00]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:535)[95:org.springframework.jms:3.0.5.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:495)[95:org.springframework.jms:3.0.5.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)[95:org.springframework.jms:3.0.5.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)[95:org.springframework.jms:3.0.5.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:243)[95:org.springframework.jms:3.0.5.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)[95:org.springframework.jms:3.0.5.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)[95:org.springframework.jms:3.0.5.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)[95:org.springframework.jms:3.0.5.RELEASE]
    at java.lang.Thread.run(Thread.java:619)[:1.6.0_20]
Caused by: org.osgi.service.blueprint.container.ServiceUnavailableException: Service is unavailable
    at org.apache.aries.blueprint.container.ReferenceListRecipe$ServiceDispatcher.call(ReferenceListRecipe.java:201)[7:org.apache.aries.blueprint:0.2.0.incubating]
    at org.apache.aries.blueprint.container.AbstractServiceReferenceRecipe$CgLibProxyFactory$1.loadObject(AbstractServiceReferenceRecipe.java:652)[7:org.apache.aries.blueprint:0.2.0.incubating]
    at org.apache.servicemix.jbi.cluster.engine.ClusterRegistration$$EnhancerByCGLIB$$f26255e4.match(<generated>)[bundlefile:]
    at org.apache.servicemix.jbi.cluster.engine.ClusterEngine.exchangeSent(ClusterEngine.java:410)[97:org.apache.servicemix.jbi.cluster.engine:1.4.0.fuse-00-00]
    at org.apache.servicemix.nmr.core.ChannelImpl.dispatch(ChannelImpl.java:282)[81:org.apache.servicemix.nmr.core:1.4.0.fuse-00-00]
    at org.apache.servicemix.nmr.core.ChannelImpl.sendSync(ChannelImpl.java:141)[81:org.apache.servicemix.nmr.core:1.4.0.fuse-00-00]
    at org.apache.servicemix.nmr.core.ChannelImpl.sendSync(ChannelImpl.java:127)[81:org.apache.servicemix.nmr.core:1.4.0.fuse-00-00]
    at org.apache.servicemix.jbi.runtime.impl.DeliveryChannelImpl.sendSync(DeliveryChannelImpl.java:187)[91:org.apache.servicemix.jbi.runtime:1.4.0.fuse-00-00]
    at org.apache.servicemix.common.EndpointDeliveryChannel.sendSync(EndpointDeliveryChannel.java:120)[90:servicemix-common:2011.01.0.fuse-00-00]
    at org.apache.servicemix.common.endpoints.SimpleEndpoint.sendSync(SimpleEndpoint.java:74)[90:servicemix-common:2011.01.0.fuse-00-00]
    at org.apache.servicemix.jms.endpoints.AbstractConsumerEndpoint.onMessage(AbstractConsumerEndpoint.java:553)[159:servicemix-jms:2011.01.0.fuse-00-00]
    ... 10 more
When I set the log level to DEBUG, I also see the following:
14:32:14,364 | DEBUG | tenerContainer-1 | NMR | 81 - org.apache.servicemix.nmr.core - 1.4.0.fuse-00-00 | Channel org.apache.servicemix.nmr.core.ChannelImpl@9f8d32 dispatching exchange: [
  id: 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
  mep: InOnly
  status: Active
  role: Consumer
  target: PropertyMatchingReference[{SERVICE_NAME={http://com.db.dbclearstore/2.0}tpmProcessingService}]
  properties: [
    org.apache.servicemix.senderEndpoint = {http://com.db.dbclearstore/2.0}tpmService:tpmEndpoint
    javax.jbi.messaging.MessageExchange = org.apache.servicemix.jbi.runtime.impl.InOnlyImpl@1bd9950
    javax.jbi.messaging.sendSync = true
    javax.jbi.InterfaceName = <null>
    javax.jbi.transaction.jta = <null>
    org.apache.servicemix.correlationId = 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
    javax.jbi.ServiceName = {http://com.db.dbclearstore/2.0}tpmProcessingService
  ]
]
14:32:14,364 | DEBUG | tenerContainer-1 | NMR | 81 - org.apache.servicemix.nmr.core - 1.4.0.fuse-00-00 | Channel org.apache.servicemix.nmr.core.ChannelImpl@dfa877 delivering exchange: [
  id: 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
  mep: InOnly
  status: Active
  role: Consumer
  target: PropertyMatchingReference[{NAME=root}]
  properties: [
    org.apache.servicemix.senderEndpoint = {http://com.db.dbclearstore/2.0}tpmService:tpmEndpoint
    javax.jbi.messaging.MessageExchange = org.apache.servicemix.jbi.runtime.impl.InOnlyImpl@1bd9950
    javax.jbi.messaging.sendSync = true
    javax.jbi.InterfaceName = <null>
    javax.jbi.transaction.jta = <null>
    org.apache.servicemix.correlationId = 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
    javax.jbi.ServiceName = {http://com.db.dbclearstore/2.0}tpmProcessingService
  ]
]
14:32:14,364 | DEBUG | t.root-thread-15 | ActiveMQJmsRequestorPool | 95 - org.springframework.jms - 3.0.5.RELEASE | Choosing a requestor from pool
14:32:14,380 | DEBUG | t.root-thread-15 | NMR | 81 - org.apache.servicemix.nmr.core - 1.4.0.fuse-00-00 | Channel org.apache.servicemix.nmr.core.ChannelImpl@dfa877 dispatching exchange: [
  id: 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
  mep: InOnly
  status: Error
  role: Provider
  target: PropertyMatchingReference[{NAME=root}]
  properties: [
    org.apache.servicemix.senderEndpoint = {http://com.db.dbclearstore/2.0}tpmService:tpmEndpoint
    javax.jbi.messaging.MessageExchange = org.apache.servicemix.jbi.runtime.impl.InOnlyImpl@1bd9950
    javax.jbi.messaging.sendSync = true
    javax.jbi.InterfaceName = <null>
    javax.jbi.transaction.jta = <null>
    org.apache.servicemix.correlationId = 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
    javax.jbi.ServiceName = {http://com.db.dbclearstore/2.0}tpmProcessingService
  ]
  error: [
    java.lang.RuntimeException
        at org.apache.activemq.command.ActiveMQObjectMessage.storeContent(ActiveMQObjectMessage.java:111)
        at org.apache.activemq.command.ActiveMQObjectMessage.setObject(ActiveMQObjectMessage.java:162)
        at org.apache.activemq.ActiveMQSession.createObjectMessage(ActiveMQSession.java:380)
        at org.apache.activemq.pool.PooledSession.createObjectMessage(PooledSession.java:153)
        at org.apache.servicemix.jbi.cluster.engine.ClusterEngine.processExchange(ClusterEngine.java:739)
        at org.apache.servicemix.jbi.cluster.engine.ClusterEngine.process(ClusterEngine.java:570)
        at org.apache.servicemix.nmr.core.InternalEndpointWrapper.process(InternalEndpointWrapper.java:86)
        at org.apache.servicemix.nmr.core.ChannelImpl.process(ChannelImpl.java:255)
        at org.apache.servicemix.nmr.core.ChannelImpl$1.run(ChannelImpl.java:215)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
    Caused by: java.io.UTFDataFormatException
        at java.io.ObjectOutputStream$BlockDataOutputStream.writeUTF(ObjectOutputStream.java:2125)
        at java.io.ObjectOutputStream$BlockDataOutputStream.writeUTF(ObjectOutputStream.java:1968)
        at java.io.ObjectOutputStream.writeUTF(ObjectOutputStream.java:841)
        at org.apache.servicemix.nmr.core.util.StringSource.writeExternal(StringSource.java:99)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1421)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1390)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:416)
        at org.apache.servicemix.nmr.core.MessageImpl.writeObject(MessageImpl.java:362)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:945)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1461)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:326)
        at org.apache.activemq.command.ActiveMQObjectMessage.storeContent(ActiveMQObjectMessage.java:105)
        ... 11 more
  ]
]
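A note on the nested cause in this trace: the JDK throws java.io.UTFDataFormatException from ObjectOutputStream.writeUTF when a string's modified UTF-8 encoding is longer than 65535 bytes. Since the frame just above it is StringSource.writeExternal, one possible reading (an assumption, not something confirmed here) is that the message body is too large to be serialized this way when the cluster engine wraps the exchange in an ActiveMQ ObjectMessage. The sketch below only demonstrates the JDK limit itself; the class name WriteUtfLimitDemo and the helper canWriteUtf are made up for illustration and are not part of ServiceMix.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UTFDataFormatException;
import java.util.Arrays;

public class WriteUtfLimitDemo {

    /**
     * Hypothetical helper: returns true if ObjectOutputStream.writeUTF
     * accepts an ASCII string of the given length, false if it throws
     * UTFDataFormatException.
     */
    static boolean canWriteUtf(int length) throws IOException {
        char[] chars = new char[length];
        Arrays.fill(chars, 'x'); // ASCII: one byte per char in modified UTF-8
        String payload = new String(chars);
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeUTF(payload);
            return true;
        } catch (UTFDataFormatException e) {
            // Encoded length did not fit in the 16-bit length prefix used by writeUTF
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("65535 chars: " + canWriteUtf(65535));
        System.out.println("65536 chars: " + canWriteUtf(65536));
    }
}
```

If the incoming TPM messages are larger than about 64 KB, that would be consistent with the trace; comparing the size of a failing message against this limit is a cheap way to check the assumption.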
The beans.xml for the JMS consumer bundle is:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:jms="http://servicemix.apache.org/jms/1.0"
       xmlns:dbcs="http://com.db.dbclearstore/2.0"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
                           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
                           http://servicemix.apache.org/jms/1.0 http://servicemix.apache.org/jms/1.0/servicemix-jms.xsd">

  <import resource="classpath:org/apache/servicemix/camel/nmr/camel-nmr.xml"></import>

  <context:property-placeholder location = "classpath:tpm-feed.properties" ignore-unresolvable = "true"></context:property-placeholder>

  <!-- Tibco EMS settings -->
  <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
      <map>
        <entry key = "java.naming.provider.url" value = "${tpm.jndi.url}"></entry>
        <entry key = "java.naming.factory.initial" value = "${tpm.jndi.factory.classname}"></entry>
        <entry key = "java.naming.security.principal" value = "${tpm.jndi.username}"></entry>
        <entry key = "java.naming.security.credentials" value = "${tpm.jndi.password}"></entry>
      </map>
    </property>
  </bean>

  <bean id="jndiConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="${tpm.jndi.factory}"></property>
    <property name="jndiTemplate" ref="jndiTemplate"></property>
    <property name="lookupOnStartup" value="true"></property>
    <property name="expectedType" value="com.tibco.tibjms.naming.TibjmsFederatedTopicConnectionFactory"></property>
    <property name="proxyInterface" value="javax.jms.ConnectionFactory"></property>
  </bean>

  <bean id="tpmConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="jndiConnectionFactory"></property>
    <property name="username" value="${tpm.jms.username}"></property>
    <property name="password" value="${tpm.jms.password}"></property>
  </bean>

  <bean id="tpmDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
    <property name="jndiTemplate" ref="jndiTemplate"></property>
    <property name="cache" value="true"></property>
  </bean>

  <jms:consumer id="tpmConsumer"
                service="dbcs:tpmService"
                endpoint="tpmEndpoint"
                connectionFactory="#tpmConnectionFactory"
                destinationName="${tpm.jndi.destination}"
                destinationResolver="#tpmDestinationResolver"
                targetService="dbcs:tpmProcessingService"
                targetEndpoint="tpmProcessingEndpoint"
                pubSubDomain="true"
                subscriptionDurable="false"
                durableSubscriptionName="dbcs"
                clientId="dbcs"
                transacted="jms"
                cacheLevel="3"
                recoveryInterval="30000"
                exceptionListener="#tpmErrorListener">
  </jms:consumer>

  <bean id="tpmErrorListener" class="com.db.dbclearstore.tpm.TpmErrorListener"></bean>

  <bean class="org.apache.servicemix.common.osgi.EndpointExporter"></bean>

  <bean class="org.apache.servicemix.jbi.cluster.engine.OsgiSimpleClusterRegistration">
    <property name="endpoint" ref="tpmConsumer"></property>
  </bean>
</beans>
The beans.xml for the Camel route bundle is:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:bean="http://servicemix.apache.org/bean/1.0"
       xmlns:dbcs="http://com.db.dbclearstore/2.0"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
                           http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
                           http://servicemix.apache.org/bean/1.0 http://servicemix.apache.org/bean/1.0/servicemix-bean.xsd
                           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <import resource="classpath:org/apache/servicemix/camel/nmr/camel-nmr.xml"></import>

  <context:property-placeholder location = "classpath:tpm-feed.properties" ignore-unresolvable = "true"></context:property-placeholder>

  <bean id="dataSource" class="oracle.ucp.jdbc.PoolDataSourceFactory" factory-method="getPoolDataSource">
    <property name="URL" value="${jdbc.url}"></property>
    <property name="user" value="${jdbc.username}"></property>
    <property name="password" value="${jdbc.password}"></property>
    <property name="connectionFactoryClassName" value="oracle.jdbc.pool.OracleDataSource"></property>
    <property name="connectionPoolName" value="TPML_POOL"></property>
    <property name="connectionWaitTimeout" value="5"></property>
    <property name="initialPoolSize" value="5"></property>
    <property name="minPoolSize" value="5"></property>
    <property name="maxPoolSize" value="20"></property>
    <property name="maxStatements" value="50"></property>
    <property name="timeoutCheckInterval" value="0"></property>
    <property name="validateConnectionOnBorrow" value="true"></property>
    <property name="SQLForValidateConnection" value="select 1 from dual"></property>
  </bean>

  <bean id="jdbcTxManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"></property>
  </bean>

  <bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
  </bean>

  <bean id = "jdbcTemplate" class = "org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="dataSource"></property>
  </bean>

  <bean id = "txTemplate" class = "org.springframework.transaction.support.TransactionTemplate">
    <property name="transactionManager" ref="jdbcTxManager"></property>
  </bean>

  <bean id="msgDao" class="com.db.dbclearstore.tpm.camel.MessageDAO">
    <property name="jdbcTemplate" ref="jdbcTemplate"></property>
    <property name="txTemplate" ref="txTemplate"></property>
    <property name="lobHandler" ref="lobHandler"></property>
  </bean>

  <!--bean:endpoint service="dbcs:tpmProcessingService" endpoint="tpmProcessingEndpoint" bean="#msgDao" /-->

  <camel:camelContext id="tpm-context" trace="false">
    <!-- Enable JMX server -->
    <camel:jmxAgent id="agent" createConnector="true"></camel:jmxAgent>

    <camel:onException>
      <camel:exception>java.lang.Exception</camel:exception>
      <camel:handled><camel:constant>true</camel:constant></camel:handled>
      <camel:log message="Got error for: #${header.JMSMessageID}" loggingLevel="INFO"></camel:log>
      <camel:rollback markRollbackOnly="true"></camel:rollback>
    </camel:onException>

    <camel:route id="storeRawTPML_NMR">
      <camel:from uri="nmr:{http://com.db.dbclearstore/2.0}tpmProcessingService:tpmProcessingEndpoint"></camel:from>
      <!--transacted ref="required"/-->
      <camel:log message="Got message: #${header.JMSMessageID} from: ${header.JMSDestination}" loggingLevel="INFO"></camel:log>
      <camel:to uri="bean:msgDao?method=store"></camel:to>
      <camel:log message="Message stored: #${header.JMSMessageID}" loggingLevel="INFO"></camel:log>
    </camel:route>

    <camel:route id="storeRawTPML_JBI">
      <camel:from uri="jbi:endpoint:http://com.db.dbclearstore/2.0/tpmProcessingService/tpmProcessingEndpoint"></camel:from>
      <camel:log message="Got message 2: #${header.JMSMessageID} from: ${header.JMSDestination}" loggingLevel="INFO"></camel:log>
      <camel:to uri="bean:msgDao?method=store"></camel:to>
      <camel:log message="Message stored 2: #${header.JMSMessageID}" loggingLevel="INFO"></camel:log>
    </camel:route>
  </camel:camelContext>
</beans>
As you can see, I tried both the nmr and the jbi endpoints. I am probably still missing something in my Spring context. Could you help, please?
Thanks, Denis.