This content has been marked as final.
Show 1 reply
-
1. Re: JBoss Messaging and Spring Message Listener Container
jamesbo Jun 18, 2008 6:10 AM (in response to sonicfab) I've done a proof-of-concept necessitated by http://www.jboss.com/index.html?module=bb&op=viewtopic&t=136956 using Spring's JmsMessageEndpointManager.
I started testing by bootstrapping via applicationContext.xml as documented by Spring, then I did the equivalent in code (because we need dynamic configuration). This has worked fine so far.
One problem I had was http://jira.jboss.com/jira/browse/JBAS-5623 so implemented following as workaround:
/**
 * Activation spec factory customised partly to work around
 * http://jira.jboss.com/jira/browse/JBAS-5623 and partly to provide support for
 * SESSION_TRANSACTED. By default Spring would override this to AUTO_ACKNOWLEDGE, although
 * JBoss would ignore that so long as its 'sessionTransacted' property remained true (the
 * default value).
 */
public class JBossJmsActivationSpecFactory extends StandardJmsActivationSpecFactory {

    /** Name of the activation spec property that receives the acknowledge mode. */
    private static final String ACK_MODE_PROPERTY = "acknowledgeMode";

    /**
     * Sets the {@code acknowledgeMode} property of the wrapped bean to the name of the
     * {@link Session} constant matching {@code ackMode}. Does nothing when the wrapped bean
     * has no writable {@code acknowledgeMode} property.
     *
     * @param bw bean wrapper whose {@code acknowledgeMode} property is written
     * @param ackMode one of {@code Session.AUTO_ACKNOWLEDGE},
     *        {@code Session.DUPS_OK_ACKNOWLEDGE} or {@code Session.SESSION_TRANSACTED}
     * @throws IllegalArgumentException if the property is writable and {@code ackMode} is
     *         any other value
     */
    @Override
    protected void applyAcknowledgeMode(BeanWrapper bw, int ackMode) {
        if (!bw.isWritableProperty(ACK_MODE_PROPERTY)) {
            return;
        }
        bw.setPropertyValue(ACK_MODE_PROPERTY, toModeName(ackMode));
    }

    /**
     * Translates a JMS acknowledge mode constant into the string written to the spec.
     *
     * @param ackMode JMS acknowledge mode constant
     * @return the name of the matching {@link Session} constant
     * @throws IllegalArgumentException if {@code ackMode} is not one of the handled modes
     */
    private static String toModeName(int ackMode) {
        if (ackMode == Session.AUTO_ACKNOWLEDGE) {
            return "AUTO_ACKNOWLEDGE";
        }
        if (ackMode == Session.DUPS_OK_ACKNOWLEDGE) {
            return "DUPS_OK_ACKNOWLEDGE";
        }
        if (ackMode == Session.SESSION_TRANSACTED) {
            return "SESSION_TRANSACTED";
        }
        throw new IllegalArgumentException("acknowledgeMode = " + ackMode + " not supported.");
    }
}
So a POJO MessageListener looks OK. I was able to confirm that transactional messaging now works, that the callback runs on a proper WorkManager thread, and that we can dynamically create, start and stop connections.
Here's config from initial test:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.5.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd">

    <!-- MBean server obtained through the static MBeanServerLocator.locateJBoss() factory method. -->
    <bean id="mbeanServer" class="org.jboss.mx.util.MBeanServerLocator" factory-method="locateJBoss" />

    <!--
        Result of mbeanServer.getAttribute(objectName, "ResourceAdapter"), where objectName
        names the jms-ra.rar RARDeployment MBean.
    -->
    <bean id="resourceAdapter" factory-bean="mbeanServer" factory-method="getAttribute">
        <constructor-arg>
            <bean class="org.springframework.jmx.support.ObjectNameManager" factory-method="getInstance">
                <constructor-arg value="jboss.jca:name='jms-ra.rar',service=RARDeployment" />
            </bean>
        </constructor-arg>
        <constructor-arg value="ResourceAdapter" />
    </bean>

    <!-- The POJO listener that receives the messages. -->
    <bean id="messageListener" class="jca.jms.MessageLogger" />

    <!-- Endpoint manager; lazily initialised and not auto-started. -->
    <bean id="jmsManager" class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager"
          lazy-init="true">

        <!-- Custom factory that writes the acknowledge mode onto the JBoss activation spec. -->
        <property name="activationSpecFactory">
            <bean class="jca.jms.config.JBossJmsActivationSpecFactory">
                <property name="activationSpecClass"
                          value="org.jboss.resource.adapter.jms.inflow.JmsActivationSpec" />
            </bean>
        </property>

        <!-- Transacted subscription to the MYTOPIC topic. -->
        <property name="activationSpecConfig">
            <bean class="org.springframework.jms.listener.endpoint.JmsActivationSpecConfig">
                <property name="destinationName" value="MYTOPIC" />
                <property name="pubSubDomain" value="true" />
                <property name="acknowledgeMode">
                    <util:constant static-field="javax.jms.Session.SESSION_TRANSACTED" />
                </property>
            </bean>
        </property>

        <property name="autoStartup" value="false" />
        <property name="resourceAdapter" ref="resourceAdapter" />
        <property name="messageListener" ref="messageListener" />
    </bean>

</beans>
And code equivalent:
/**
 * Stateless session bean that builds a {@link JmsMessageEndpointManager} in code when the
 * bean is created, and exposes {@link #start()} and {@link #stop()} to control it.
 */
public class ControlEJB extends AbstractStatelessSessionBean {

    private static final String FACTORY_SELECTOR = "classpath*:beanRefFactory.xml";
    private static final String FACTORY_KEY = "jca-jms";
    private static final Logger log = Logger.getLogger(ControlEJB.class);

    /** Destination the endpoint listens on. */
    private static final String DESTINATION_NAME = "MYTOPIC";

    /** Activation spec class, loaded by name at runtime. */
    private static final String ACTIVATION_SPEC_CLASS_NAME =
            "org.jboss.resource.adapter.jms.inflow.JmsActivationSpec";

    /** Name under which the resource adapter is looked up in the bean factory. */
    private static final String RESOURCE_ADAPTER_BEAN = "resourceAdapter";

    /** Endpoint manager assembled in {@link #onEjbCreate()}. */
    private JmsMessageEndpointManager jmsManager;

    /** Logs the call and starts the endpoint manager. */
    public void start() {
        log.info("#start");
        jmsManager.start();
    }

    /** Logs the call and stops the endpoint manager. */
    public void stop() {
        log.info("#stop");
        jmsManager.stop();
    }

    /**
     * Passes the context to the superclass, then selects the bean factory locator and the
     * key used to obtain the bean factory.
     *
     * @param sessionContext the session context supplied by the caller
     */
    @Override
    public void setSessionContext(SessionContext sessionContext) {
        super.setSessionContext(sessionContext);
        setBeanFactoryLocator(ContextSingletonBeanFactoryLocator.getInstance(FACTORY_SELECTOR));
        setBeanFactoryLocatorKey(FACTORY_KEY);
    }

    /**
     * Creates and initialises the endpoint manager without starting it (auto-startup is
     * switched off).
     *
     * @throws CreateException declared by the overridden method; not thrown directly here
     * @throws EJBException if the activation spec class cannot be loaded or the endpoint
     *         manager fails to initialise
     */
    @Override
    protected void onEjbCreate() throws CreateException {
        log.info("#onEjbCreate");

        jmsManager = new JmsMessageEndpointManager();
        jmsManager.setActivationSpecConfig(newActivationSpecConfig());
        jmsManager.setActivationSpecFactory(newActivationSpecFactory());
        jmsManager.setAutoStartup(false);
        jmsManager.setResourceAdapter((ResourceAdapter) getBeanFactory().getBean(RESOURCE_ADAPTER_BEAN));
        jmsManager.setMessageListener(new MessageLogger());

        try {
            jmsManager.afterPropertiesSet();
        } catch (ResourceException e) {
            throw new EJBException(e);
        }
    }

    /** Builds the activation spec config: a transacted topic subscription. */
    private static JmsActivationSpecConfig newActivationSpecConfig() {
        JmsActivationSpecConfig config = new JmsActivationSpecConfig();
        config.setAcknowledgeMode(Session.SESSION_TRANSACTED);
        config.setDestinationName(DESTINATION_NAME);
        config.setPubSubDomain(true);
        return config;
    }

    /** Builds the activation spec factory bound to the activation spec class. */
    private static StandardJmsActivationSpecFactory newActivationSpecFactory() {
        Class<?> specClass;
        try {
            specClass = Class.forName(ACTIVATION_SPEC_CLASS_NAME);
        } catch (ClassNotFoundException e) {
            throw new EJBException(e);
        }
        StandardJmsActivationSpecFactory factory = new JBossJmsActivationSpecFactory();
        factory.setActivationSpecClass(specClass);
        return factory;
    }
}