How to route message from jms:consumer to camel endpoint?
sdv0967 Mar 28, 2011 9:33 AMHi,
I use Fuse ESB 4.3.1, standard install. I need to process messages in the following way: JMS Consumer -> NMR -> Camel EP -> Custom Bean. The scenario looks simple, but I'm fighting with it for the last two day with no luck. The JMS consumer receives messages succesfully and looks like it routes them to NMR. But nothing received from NMR then. The error log is:
16:46:47,889 | DEBUG | enerContainer-17 | JmsTransactionManager | 74 - org.springframework.transaction - 3.0.5.RELEASE | Initiating transaction rollback 16:46:47,889 | DEBUG | enerContainer-17 | JmsTransactionManager | 74 - org.springframework.transaction - 3.0.5.RELEASE | Rolling back JMS transaction on Session [com.tibco.tibjms.TibjmsSession@e23a25] 16:46:48,170 | WARN | enerContainer-17 | DefaultMessageListenerContainer | 95 - org.springframework.jms - 3.0.5.RELEASE | Setup of JMS message listener invoker failed for destination 'tpm.outbox.TradeNotification' - trying to recover. Cause: Error sending JBI exchange javax.jms.JMSException: Error sending JBI exchange at org.apache.servicemix.jms.endpoints.AbstractConsumerEndpoint.onMessage(AbstractConsumerEndpoint.java:580)[159:servicemix-jms:2011.01.0.fuse-00-00] at org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint$1.onMessage(JmsConsumerEndpoint.java:477)[159:servicemix-jms:2011.01.0.fuse-00-00] at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:535)[95:org.springframework.jms:3.0.5.RELEASE] at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:495)[95:org.springframework.jms:3.0.5.RELEASE] at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)[95:org.springframework.jms:3.0.5.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)[95:org.springframework.jms:3.0.5.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:243)[95:org.springframework.jms:3.0.5.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)[95:org.springframework.jms:3.0.5.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)[95:org.springframework.jms:3.0.5.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)[95:org.springframework.jms:3.0.5.RELEASE] at java.lang.Thread.run(Thread.java:619)[:1.6.0_20] Caused by: org.osgi.service.blueprint.container.ServiceUnavailableException: Service is unavailable at org.apache.aries.blueprint.container.ReferenceListRecipe$ServiceDispatcher.call(ReferenceListRecipe.java:201)[7:org.apache.aries.blueprint:0.2.0.incubating] at org.apache.aries.blueprint.container.AbstractServiceReferenceRecipe$CgLibProxyFactory$1.loadObject(AbstractServiceReferenceRecipe.java:652)[7:org.apache.aries.blueprint:0.2.0.incubating] at org.apache.servicemix.jbi.cluster.engine.ClusterRegistration$$EnhancerByCGLIB$$f26255e4.match(<generated>)[bundlefile:] at org.apache.servicemix.jbi.cluster.engine.ClusterEngine.exchangeSent(ClusterEngine.java:410)[97:org.apache.servicemix.jbi.cluster.engine:1.4.0.fuse-00-00] at org.apache.servicemix.nmr.core.ChannelImpl.dispatch(ChannelImpl.java:282)[81:org.apache.servicemix.nmr.core:1.4.0.fuse-00-00] at org.apache.servicemix.nmr.core.ChannelImpl.sendSync(ChannelImpl.java:141)[81:org.apache.servicemix.nmr.core:1.4.0.fuse-00-00] at org.apache.servicemix.nmr.core.ChannelImpl.sendSync(ChannelImpl.java:127)[81:org.apache.servicemix.nmr.core:1.4.0.fuse-00-00] at org.apache.servicemix.jbi.runtime.impl.DeliveryChannelImpl.sendSync(DeliveryChannelImpl.java:187)[91:org.apache.servicemix.jbi.runtime:1.4.0.fuse-00-00] at org.apache.servicemix.common.EndpointDeliveryChannel.sendSync(EndpointDeliveryChannel.java:120)[90:servicemix-common:2011.01.0.fuse-00-00] at org.apache.servicemix.common.endpoints.SimpleEndpoint.sendSync(SimpleEndpoint.java:74)[90:servicemix-common:2011.01.0.fuse-00-00] at org.apache.servicemix.jms.endpoints.AbstractConsumerEndpoint.onMessage(AbstractConsumerEndpoint.java:553)[159:servicemix-jms:2011.01.0.fuse-00-00] ... 10 more
when I set DEBUG log level I also see the following:
14:32:14,364 | DEBUG | tenerContainer-1 | NMR | 81 - org.apache.servicemix.nmr.core - 1.4.0.fuse-00-00 | Channel org.apache.servicemix.nmr.core.ChannelImpl@9f8d32 dispatching exchange: [
id: 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
mep: InOnly
status: Active
role: Consumer
target: PropertyMatchingReference[{SERVICE_NAME={http://com.db.dbclearstore/2.0}tpmProcessingService}]
properties: [
org.apache.servicemix.senderEndpoint = {http://com.db.dbclearstore/2.0}tpmService:tpmEndpoint
javax.jbi.messaging.MessageExchange = org.apache.servicemix.jbi.runtime.impl.InOnlyImpl@1bd9950
javax.jbi.messaging.sendSync = true
javax.jbi.InterfaceName = <null>
javax.jbi.transaction.jta = <null>
org.apache.servicemix.correlationId = 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
javax.jbi.ServiceName = {http://com.db.dbclearstore/2.0}tpmProcessingService
]
]
14:32:14,364 | DEBUG | tenerContainer-1 | NMR | 81 - org.apache.servicemix.nmr.core - 1.4.0.fuse-00-00 | Channel org.apache.servicemix.nmr.core.ChannelImpl@dfa877 delivering exchange: [
id: 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
mep: InOnly
status: Active
role: Consumer
target: PropertyMatchingReference[{NAME=root}]
properties: [
org.apache.servicemix.senderEndpoint = {http://com.db.dbclearstore/2.0}tpmService:tpmEndpoint
javax.jbi.messaging.MessageExchange = org.apache.servicemix.jbi.runtime.impl.InOnlyImpl@1bd9950
javax.jbi.messaging.sendSync = true
javax.jbi.InterfaceName = <null>
javax.jbi.transaction.jta = <null>
org.apache.servicemix.correlationId = 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
javax.jbi.ServiceName = {http://com.db.dbclearstore/2.0}tpmProcessingService
]
]
14:32:14,364 | DEBUG | t.root-thread-15 | ActiveMQJmsRequestorPool | 95 - org.springframework.jms - 3.0.5.RELEASE | Choosing a requestor from pool
14:32:14,380 | DEBUG | t.root-thread-15 | NMR | 81 - org.apache.servicemix.nmr.core - 1.4.0.fuse-00-00 | Channel org.apache.servicemix.nmr.core.ChannelImpl@dfa877 dispatching exchange: [
id: 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
mep: InOnly
status: Error
role: Provider
target: PropertyMatchingReference[{NAME=root}]
properties: [
org.apache.servicemix.senderEndpoint = {http://com.db.dbclearstore/2.0}tpmService:tpmEndpoint
javax.jbi.messaging.MessageExchange = org.apache.servicemix.jbi.runtime.impl.InOnlyImpl@1bd9950
javax.jbi.messaging.sendSync = true
javax.jbi.InterfaceName = <null>
javax.jbi.transaction.jta = <null>
org.apache.servicemix.correlationId = 6e7db2d8-2480-4bfb-99f1-d554c7cdb491
javax.jbi.ServiceName = {http://com.db.dbclearstore/2.0}tpmProcessingService
]
error: [
java.lang.RuntimeException
at org.apache.activemq.command.ActiveMQObjectMessage.storeContent(ActiveMQObjectMessage.java:111)
at org.apache.activemq.command.ActiveMQObjectMessage.setObject(ActiveMQObjectMessage.java:162)
at org.apache.activemq.ActiveMQSession.createObjectMessage(ActiveMQSession.java:380)
at org.apache.activemq.pool.PooledSession.createObjectMessage(PooledSession.java:153)
at org.apache.servicemix.jbi.cluster.engine.ClusterEngine.processExchange(ClusterEngine.java:739)
at org.apache.servicemix.jbi.cluster.engine.ClusterEngine.process(ClusterEngine.java:570)
at org.apache.servicemix.nmr.core.InternalEndpointWrapper.process(InternalEndpointWrapper.java:86)
at org.apache.servicemix.nmr.core.ChannelImpl.process(ChannelImpl.java:255)
at org.apache.servicemix.nmr.core.ChannelImpl$1.run(ChannelImpl.java:215)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
Caused by: java.io.UTFDataFormatException
at java.io.ObjectOutputStream$BlockDataOutputStream.writeUTF(ObjectOutputStream.java:2125)
at java.io.ObjectOutputStream$BlockDataOutputStream.writeUTF(ObjectOutputStream.java:1968)
at java.io.ObjectOutputStream.writeUTF(ObjectOutputStream.java:841)
at org.apache.servicemix.nmr.core.util.StringSource.writeExternal(StringSource.java:99)
at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1421)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1390)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:416)
at org.apache.servicemix.nmr.core.MessageImpl.writeObject(MessageImpl.java:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:945)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1461)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:326)
at org.apache.activemq.command.ActiveMQObjectMessage.storeContent(ActiveMQObjectMessage.java:105)
... 11 more
]
]
The beans.xml for JMS Consumer bundle is:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:jms="http://servicemix.apache.org/jms/1.0"
xmlns:dbcs="http://com.db.dbclearstore/2.0"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://servicemix.apache.org/jms/1.0 http://servicemix.apache.org/jms/1.0/servicemix-jms.xsd">
<import resource="classpath:org/apache/servicemix/camel/nmr/camel-nmr.xml"></import>
<context:property-placeholder location = "classpath:tpm-feed.properties" ignore-unresolvable = "true"></context:property-placeholder>
<!-- Tibco EMS settings -->
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<map>
<entry key = "java.naming.provider.url" value = "${tpm.jndi.url}"></entry>
<entry key = "java.naming.factory.initial" value = "${tpm.jndi.factory.classname}"></entry>
<entry key = "java.naming.security.principal" value = "${tpm.jndi.username}"></entry>
<entry key = "java.naming.security.credentials" value = "${tpm.jndi.password}"></entry>
</map>
</property>
</bean>
<bean id="jndiConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="${tpm.jndi.factory}"></property>
<property name="jndiTemplate" ref="jndiTemplate"></property>
<property name="lookupOnStartup" value="true"></property>
<property name="expectedType" value="com.tibco.tibjms.naming.TibjmsFederatedTopicConnectionFactory"></property>
<property name="proxyInterface" value="javax.jms.ConnectionFactory"></property>
</bean>
<bean id="tpmConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="jndiConnectionFactory"></property>
<property name="username" value="${tpm.jms.username}"></property>
<property name="password" value="${tpm.jms.password}"></property>
</bean>
<bean id="tpmDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
<property name="jndiTemplate" ref="jndiTemplate"></property>
<property name="cache" value="true"></property>
</bean>
<jms:consumer id="tpmConsumer"
service="dbcs:tpmService"
endpoint="tpmEndpoint"
connectionFactory="#tpmConnectionFactory"
destinationName="${tpm.jndi.destination}"
destinationResolver="#tpmDestinationResolver"
targetService="dbcs:tpmProcessingService"
targetEndpoint="tpmProcessingEndpoint"
pubSubDomain="true"
subscriptionDurable="false"
durableSubscriptionName="dbcs"
clientId="dbcs"
transacted="jms"
cacheLevel="3"
recoveryInterval="30000"
exceptionListener="#tpmErrorListener">
</jms:consumer>
<bean id="tpmErrorListener" class="com.db.dbclearstore.tpm.TpmErrorListener"></bean>
<bean class="org.apache.servicemix.common.osgi.EndpointExporter"></bean>
<bean class="org.apache.servicemix.jbi.cluster.engine.OsgiSimpleClusterRegistration">
<property name="endpoint" ref="tpmConsumer"></property>
</bean>
</beans>
Beans.xml for Camel Route bundle is:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:bean="http://servicemix.apache.org/bean/1.0"
xmlns:dbcs="http://com.db.dbclearstore/2.0"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://servicemix.apache.org/bean/1.0 http://servicemix.apache.org/bean/1.0/servicemix-bean.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<import resource="classpath:org/apache/servicemix/camel/nmr/camel-nmr.xml"></import>
<context:property-placeholder location = "classpath:tpm-feed.properties" ignore-unresolvable = "true"></context:property-placeholder>
<bean id="dataSource"
class="oracle.ucp.jdbc.PoolDataSourceFactory"
factory-method="getPoolDataSource">
<property name="URL" value="${jdbc.url}"></property>
<property name="user" value="${jdbc.username}"></property>
<property name="password" value="${jdbc.password}"></property>
<property name="connectionFactoryClassName" value="oracle.jdbc.pool.OracleDataSource"></property>
<property name="connectionPoolName" value="TPML_POOL"></property>
<property name="connectionWaitTimeout" value="5"></property>
<property name="initialPoolSize" value="5"></property>
<property name="minPoolSize" value="5"></property>
<property name="maxPoolSize" value="20"></property>
<property name="maxStatements" value="50"></property>
<property name="timeoutCheckInterval" value="0"></property>
<property name="validateConnectionOnBorrow" value="true"></property>
<property name="SQLForValidateConnection" value="select 1 from dual"></property>
</bean>
<bean id="jdbcTxManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"></property>
</bean>
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
</bean>
<bean id = "jdbcTemplate" class = "org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSource"></property>
</bean>
<bean id = "txTemplate" class = "org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager" ref="jdbcTxManager"></property>
</bean>
<bean id="msgDao" class="com.db.dbclearstore.tpm.camel.MessageDAO">
<property name="jdbcTemplate" ref="jdbcTemplate"></property>
<property name="txTemplate" ref="txTemplate"></property>
<property name="lobHandler" ref="lobHandler"></property>
</bean>
<!--bean:endpoint service="dbcs:tpmProcessingService" endpoint="tpmProcessingEndpoint" bean="#msgDao" /-->
<camel:camelContext id="tpm-context" trace="false">
<!-- Enable JMX server -->
<camel:jmxAgent id="agent" createConnector="true"></camel:jmxAgent>
<camel:onException>
<camel:exception>java.lang.Exception</camel:exception>
<camel:handled><camel:constant>true</camel:constant></camel:handled>
<camel:log message="Got error for: #${header.JMSMessageID}" loggingLevel="INFO"></camel:log>
<camel:rollback markRollbackOnly="true"></camel:rollback>
</camel:onException>
<camel:route id="storeRawTPML_NMR">
<camel:from uri="nmr:{http://com.db.dbclearstore/2.0}tpmProcessingService:tpmProcessingEndpoint"></camel:from>
<!--transacted ref="required"/-->
<camel:log message="Got message: #${header.JMSMessageID} from: ${header.JMSDestination}" loggingLevel="INFO"></camel:log>
<camel:to uri="bean:msgDao?method=store"></camel:to>
<camel:log message="Message stored: #${header.JMSMessageID}" loggingLevel="INFO"></camel:log>
</camel:route>
<camel:route id="storeRawTPML_JBI">
<camel:from uri="jbi:endpoint:http://com.db.dbclearstore/2.0/tpmProcessingService/tpmProcessingEndpoint"></camel:from>
<camel:log message="Got message 2: #${header.JMSMessageID} from: ${header.JMSDestination}" loggingLevel="INFO"></camel:log>
<camel:to uri="bean:msgDao?method=store"></camel:to>
<camel:log message="Message stored 2: #${header.JMSMessageID}" loggingLevel="INFO"></camel:log>
</camel:route>
</camel:camelContext>
</beans>
I tried both nmr and jbi endpoints, as you can see. Probably I still miss something in my spring context. Could you help, please?
Thanks, Denis.