HornetQ not shutting down all threads in Tomcat
connerm Feb 1, 2012 1:18 AM
I am currently attempting to run a webapp with an in-memory queue in it. I am able to start up this application with no problems, but upon shutting it down it hangs, with the following jstack output for the Tomcat pid:
Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.1-b02 mixed mode):
"Attach Listener" daemon prio=10 tid=0x0000000041c93000 nid=0x16be waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"DestroyJavaVM" prio=10 tid=0x00007ff5293a3000 nid=0x1652 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Thread-1 (HornetQ-client-global-threads-1793061098)" daemon prio=10 tid=0x000000004255e800 nid=0x1677 waiting on condition [0x00007ff5246ec000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000df12bab0> (a java.util.concurrent.SynchronousQueue$TransferStack)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)
"Thread-1 (HornetQ-client-global-scheduled-threads-339855735)" daemon prio=10 tid=0x000000004255c800 nid=0x1676 waiting on condition [0x00007ff5247ed000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000df12c2a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)
"Thread-0 (HornetQ-client-global-threads-1793061098)" daemon prio=10 tid=0x0000000042553000 nid=0x1675 waiting on condition [0x00007ff5248ee000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000df12bab0> (a java.util.concurrent.SynchronousQueue$TransferStack)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)
"Thread-0 (HornetQ-client-global-scheduled-threads-339855735)" daemon prio=10 tid=0x0000000042550000 nid=0x1673 waiting on condition [0x00007ff524af0000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000df12c2a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)
"NamingBootstrap Pool(1)-1" daemon prio=10 tid=0x0000000041fc9800 nid=0x1664 in Object.wait() [0x00007ff526408000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000df2600b0> (a java.lang.Object)
at EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue.poll(BoundedLinkedQueue.java:253)
- locked <0x00000000df2600b0> (a java.lang.Object)
at EDU.oswego.cs.dl.util.concurrent.PooledExecutor.getTask(PooledExecutor.java:723)
at org.jboss.util.threadpool.MinPooledExecutor.getTask(MinPooledExecutor.java:106)
at EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Worker.run(PooledExecutor.java:747)
at java.lang.Thread.run(Thread.java:662)
"RMI Reaper" prio=10 tid=0x0000000041d46800 nid=0x1663 in Object.wait() [0x00007ff526509000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000df25a4f0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
- locked <0x00000000df25a4f0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
at sun.rmi.transport.ObjectTable$Reaper.run(ObjectTable.java:333)
at java.lang.Thread.run(Thread.java:662)
"RMI TCP Accept-1098" daemon prio=10 tid=0x00000000422ff000 nid=0x1662 runnable [0x00007ff52660a000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:408)
- locked <0x00000000df25a698> (a java.net.SocksSocketImpl)
at java.net.ServerSocket.implAccept(ServerSocket.java:462)
at java.net.ServerSocket.accept(ServerSocket.java:430)
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
at java.lang.Thread.run(Thread.java:662)
"GC Daemon" daemon prio=10 tid=0x00007ff52824b000 nid=0x165f in Object.wait() [0x00007ff526cb4000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000085200120> (a sun.misc.GC$LatencyLock)
at sun.misc.GC$Daemon.run(GC.java:100)
- locked <0x0000000085200120> (a sun.misc.GC$LatencyLock)
"Low Memory Detector" daemon prio=10 tid=0x00007ff528071000 nid=0x165d runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" daemon prio=10 tid=0x00007ff52806e800 nid=0x165c waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" daemon prio=10 tid=0x00007ff52806b800 nid=0x165b waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" daemon prio=10 tid=0x00007ff528069800 nid=0x165a runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" daemon prio=10 tid=0x00007ff52804d000 nid=0x1659 in Object.wait() [0x00007ff527bfa000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000852080c0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
- locked <0x00000000852080c0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
"Reference Handler" daemon prio=10 tid=0x00007ff52804b000 nid=0x1658 in Object.wait() [0x00007ff527cfb000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000085200100> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:485)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
- locked <0x0000000085200100> (a java.lang.ref.Reference$Lock)
"VM Thread" prio=10 tid=0x00007ff528044000 nid=0x1657 runnable
"GC task thread#0 (ParallelGC)" prio=10 tid=0x000000004135b000 nid=0x1653 runnable
"GC task thread#1 (ParallelGC)" prio=10 tid=0x000000004135d000 nid=0x1654 runnable
"GC task thread#2 (ParallelGC)" prio=10 tid=0x000000004135e800 nid=0x1655 runnable
"GC task thread#3 (ParallelGC)" prio=10 tid=0x0000000041360800 nid=0x1656 runnable
"VM Periodic Task Thread" prio=10 tid=0x00007ff52807c000 nid=0x165e waiting on condition
JNI global references: 1139
Upon viewing https://community.jboss.org/thread/161892, I see that setting useGlobalThreadPool to false can fix this. However, after adding <use-global-pools>false</use-global-pools> to my connection-factory, the same thing happens and a jstack shows:
Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.1-b02 mixed mode):
"Attach Listener" daemon prio=10 tid=0x00000000419a9000 nid=0x1497 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"DestroyJavaVM" prio=10 tid=0x00007f3dc0007000 nid=0x13f4 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Thread-1 (HornetQ-client-factory-pinger-threads-1334375686-320508688)" daemon prio=10 tid=0x00007f3dc0650800 nid=0x141c waiting on condition [0x00007f3dc7af9000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000dadc11f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.DelayQueue.take(DelayQueue.java:160)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)
"Thread-0 (HornetQ-client-factory-pinger-threads-1334375686-320508688)" daemon prio=10 tid=0x00007f3dc0654800 nid=0x1419 waiting on condition [0x00007f3dc7dfc000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000dadc11f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.DelayQueue.take(DelayQueue.java:160)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)
"RMI Reaper" prio=10 tid=0x00007f3dc01f4000 nid=0x1405 in Object.wait() [0x00007f3dcd8c3000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000008576dff0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
- locked <0x000000008576dff0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
at sun.rmi.transport.ObjectTable$Reaper.run(ObjectTable.java:333)
at java.lang.Thread.run(Thread.java:662)
"RMI TCP Accept-1098" daemon prio=10 tid=0x00007f3dc0213000 nid=0x1404 runnable [0x00007f3dcd9c4000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:408)
- locked <0x0000000085639568> (a java.net.SocksSocketImpl)
at java.net.ServerSocket.implAccept(ServerSocket.java:462)
at java.net.ServerSocket.accept(ServerSocket.java:430)
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
at java.lang.Thread.run(Thread.java:662)
"GC Daemon" daemon prio=10 tid=0x00007f3dc0081000 nid=0x1401 in Object.wait() [0x00007f3dce06e000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000085200120> (a sun.misc.GC$LatencyLock)
at sun.misc.GC$Daemon.run(GC.java:100)
- locked <0x0000000085200120> (a sun.misc.GC$LatencyLock)
"Low Memory Detector" daemon prio=10 tid=0x00007f3dc8071000 nid=0x13ff runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" daemon prio=10 tid=0x00007f3dc806e800 nid=0x13fe waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" daemon prio=10 tid=0x00007f3dc806b800 nid=0x13fd waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" daemon prio=10 tid=0x00007f3dc8069800 nid=0x13fc runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" daemon prio=10 tid=0x00007f3dc804d000 nid=0x13fb in Object.wait() [0x00007f3dcefcb000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000085208790> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
- locked <0x0000000085208790> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
"Reference Handler" daemon prio=10 tid=0x00007f3dc804b000 nid=0x13fa in Object.wait() [0x00007f3dcf0cc000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000085200100> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:485)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
- locked <0x0000000085200100> (a java.lang.ref.Reference$Lock)
"VM Thread" prio=10 tid=0x00007f3dc8044000 nid=0x13f9 runnable
"GC task thread#0 (ParallelGC)" prio=10 tid=0x00000000419bc000 nid=0x13f5 runnable
"GC task thread#1 (ParallelGC)" prio=10 tid=0x00000000419be000 nid=0x13f6 runnable
"GC task thread#2 (ParallelGC)" prio=10 tid=0x00000000419bf800 nid=0x13f7 runnable
"GC task thread#3 (ParallelGC)" prio=10 tid=0x00000000419c1800 nid=0x13f8 runnable
"VM Periodic Task Thread" prio=10 tid=0x00007f3dc807c000 nid=0x1400 waiting on condition
JNI global references: 1196
In either case it appears there are still hanging threads. My configurations are as follows:
hornetq-jms.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!-- JMS resources: one connection factory and three queues, each listed with its entry names. -->
<!-- Connection factory that uses the "in-vm" connector and is published under two entry names. -->
<connection-factory name="InVMConnectionFactory">
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
<entry name="java:/ConnectionFactory"/>
<entry name="java:/XAConnectionFactory"/>
</entries>
<!-- Added while trying to stop the shared "HornetQ-client-global-*" threads from outliving shutdown.
     NOTE(review): presumably this makes the factory create its own pools instead; with it set, the
     second thread dump in this post shows "HornetQ-client-factory-pinger-threads" remaining. Confirm
     against the HornetQ documentation. -->
<use-global-pools>false</use-global-pools>
</connection-factory>
<!-- NOTE(review): presumably the target of dead-letter-address "jms.queue.DLQ" in hornetq-configuration.xml; verify. -->
<queue name="DLQ">
<entry name="/queue/DLQ"/>
</queue>
<!-- NOTE(review): presumably the target of expiry-address "jms.queue.ExpiryQueue" in hornetq-configuration.xml; verify. -->
<queue name="ExpiryQueue">
<entry name="/queue/ExpiryQueue"/>
</queue>
<!-- Application queue; applicationContext.xml looks it up as "/queue/Notifications". -->
<queue name="Notifications">
<entry name="/queue/Notifications"/>
</queue>
</configuration>
hornetq-configuration.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<!-- Core server configuration: storage directories, in-VM transport, security and address settings. -->
<!-- NOTE(review): clustering is switched on although only in-VM connector/acceptor are defined in
     this file; confirm whether that is intended. -->
<clustered>true</clustered>
<log-delegate-factory-class-name>org.hornetq.integration.logging.Log4jLogDelegateFactory
</log-delegate-factory-class-name>
<!-- Storage locations; each falls back to ../data when the data.dir property is not set. -->
<bindings-directory>${data.dir:../data}/hornetq/bindings</bindings-directory>
<journal-directory>${data.dir:../data}/hornetq/journal</journal-directory>
<journal-min-files>10</journal-min-files>
<large-messages-directory>${data.dir:../data}/hornetq/largemessages</large-messages-directory>
<paging-directory>${data.dir:../data}/hornetq/paging</paging-directory>
<!-- The only connector defined: "in-vm", referenced by the connection factory in hornetq-jms.xml. -->
<connectors>
<connector name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
</connector>
</connectors>
<!-- The matching in-VM acceptor; no other acceptor is configured here. -->
<acceptors>
<acceptor name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
</acceptor>
</acceptors>
<!-- Permissions for every address (match "#"): all four are granted to the "guest" role. -->
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- 10485760 bytes = 10 MiB per address; the policy below is BLOCK once that size is reached. -->
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
</configuration>
jndi.properties
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context that embeds a JNP naming server and a HornetQ server in the webapp, then wires
     a producer and a consumer for the Notifications queue. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="example.blah"/>
<!-- Naming implementation backing the naming server below; started and stopped with the context. -->
<bean name="namingServerImpl" class="org.jnp.server.NamingBeanImpl" init-method="start" destroy-method="stop">
<property name="useGlobalService" value="false" />
</bean>
<!-- Naming server bound to localhost: port 1099, RMI port 1098.
     NOTE(review): both thread dumps in this post still show "RMI TCP Accept-1098" and the
     non-daemon "RMI Reaper" thread after shutdown was requested; check whether stop() ran for this
     bean and whether it releases the RMI port. -->
<bean name="namingServer" class="org.jnp.server.Main" init-method="start" destroy-method="stop">
<property name="namingInfo" ref="namingServerImpl" />
<property name="port" value="1099" />
<property name="bindAddress" value="localhost" />
<property name="rmiPort" value="1098" />
<property name="rmiBindAddress" value="localhost" />
</bean>
<!-- Platform MBean server, handed to the HornetQ server constructor below. -->
<bean name="mbeanServer" class="java.lang.management.ManagementFactory" factory-method="getPlatformMBeanServer" />
<!-- File-based HornetQ configuration; presumably reads hornetq-configuration.xml (verify). -->
<bean name="fileConfiguration" class="org.hornetq.core.config.impl.FileConfiguration" init-method="start" destroy-method="stop" />
<bean name="hornetQSecurityManagerImpl" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl" />
<!-- The core server -->
<!-- NOTE(review): no init-method/destroy-method is declared on this bean; presumably the JMS
     server manager below starts and stops it. Confirm. -->
<bean name="hornetQServerImpl" class="org.hornetq.core.server.impl.HornetQServerImpl">
<constructor-arg ref="fileConfiguration" />
<constructor-arg ref="mbeanServer" />
<constructor-arg ref="hornetQSecurityManagerImpl" />
</bean>
<!-- The JMS server -->
<!-- depends-on makes Spring create the naming server before this bean. -->
<bean name="jmsServerManagerImpl" class="org.hornetq.jms.server.impl.JMSServerManagerImpl" init-method="start" destroy-method="stop" depends-on="namingServer">
<constructor-arg ref="hornetQServerImpl" />
</bean>
<!-- JNDI lookups of the connection factory and queue; both wait for the JMS server via depends-on. -->
<bean id="inVMConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
<property name="jndiName" value="java:/ConnectionFactory"/>
</bean>
<bean id="notificationsQueue" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
<property name="jndiName" value="/queue/Notifications"/>
</bean>
<!-- Application beans; each opens a JMS connection in init() and closes it in destroy(). -->
<bean id="notificationsProducer" class="example.blah.NotificationsProducer" init-method="init" destroy-method="destroy" depends-on="notificationsQueue">
<property name="notificationsQueue" ref="notificationsQueue" />
<property name="inVMConnectionFactory" ref="inVMConnectionFactory" />
</bean>
<bean id="notificationsConsumer" class="example.blah.NotificationsConsumer" init-method="init" destroy-method="destroy" depends-on="notificationsQueue">
<property name="notificationsQueue" ref="notificationsQueue" />
<property name="inVMConnectionFactory" ref="inVMConnectionFactory" />
</bean>
</beans>
And the following consumer / producer:
NotificationsConsumer
imports
.
.
.
public class NotificationsConsumer implements MessageListener {
Queue notificationsQueue;
ConnectionFactory inVMConnectionFactory;
private Connection notificationsQueueConnection;
public void init() throws Exception {
notificationsQueueConnection = inVMConnectionFactory.createConnection();
Session notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer notificationsQueueConsumer = notificationsQueueSession.createConsumer(notificationsQueue);
notificationsQueueConsumer.setMessageListener(this);
notificationsQueueConnection.start();
}
public void destroy() throws Exception {
if(notificationsQueueConnection != null)
notificationsQueueConnection.close();
}
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
String text = ((TextMessage) message).getText();
System.out.println("The Notification Message is : \n" + text);
} catch (JMSException ex) {
throw new RuntimeException(ex);
}
} else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
public void setNotificationsQueue(Queue notificationsQueue) {
this.notificationsQueue = notificationsQueue;
}
public void setInVMConnectionFactory(ConnectionFactory inVMConnectionFactory) {
this.inVMConnectionFactory = inVMConnectionFactory;
}
}
NotificationsProducer
imports
.
.
.
/**
 * Sends text notifications to the notifications queue as non-persistent messages.
 *
 * <p>The queue and the connection factory are supplied through the setters. {@link #init()}
 * opens the connection, session and producer that {@link #sendNotification(String)} reuses;
 * {@link #destroy()} closes the connection again.
 *
 * <p>NOTE(review): one session and producer are shared by every caller of
 * {@code sendNotification}. If this bean is called from several threads at once, confirm that
 * this is safe for the JMS provider in use.
 */
public class NotificationsProducer {
    Queue notificationsQueue;
    ConnectionFactory inVMConnectionFactory;
    private Connection notificationsQueueConnection;
    private Session notificationsQueueSession;
    private MessageProducer notificationsQueueProducer;

    /**
     * Opens the connection and creates the session and the non-persistent producer.
     *
     * <p>If any step after opening the connection fails, the connection is closed before the
     * failure is propagated, so a failed start-up does not leave an open connection behind.
     *
     * @throws Exception if the connection, session or producer cannot be created
     */
    public void init() throws Exception {
        notificationsQueueConnection = inVMConnectionFactory.createConnection();
        boolean initialized = false;
        try {
            notificationsQueueSession =
                    notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            notificationsQueueProducer =
                    notificationsQueueSession.createProducer(notificationsQueue);
            notificationsQueueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            initialized = true;
        } finally {
            if (!initialized) {
                closeQuietly();
            }
        }
    }

    /**
     * Closes the connection opened by {@link #init()}, if there is one, and forgets the
     * session and producer created on it. Calling it again afterwards does nothing.
     *
     * @throws Exception if closing the connection fails
     */
    public void destroy() throws Exception {
        Connection connection = notificationsQueueConnection;
        // Clear every handle first so nothing keeps using objects of a closed connection.
        notificationsQueueConnection = null;
        notificationsQueueSession = null;
        notificationsQueueProducer = null;
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Sends the given text to the notifications queue.
     *
     * @param message the text to send
     * @throws java.lang.IllegalStateException if called before {@link #init()} succeeded or
     *         after {@link #destroy()}
     * @throws Exception if the message cannot be created or sent
     */
    public void sendNotification(final String message) throws Exception {
        if (notificationsQueueSession == null || notificationsQueueProducer == null) {
            // Fully qualified on purpose: javax.jms also declares an IllegalStateException.
            throw new java.lang.IllegalStateException(
                    "NotificationsProducer is not initialized; call init() before sendNotification()");
        }
        TextMessage textMessage = notificationsQueueSession.createTextMessage(message);
        notificationsQueueProducer.send(textMessage);
    }

    public void setNotificationsQueue(Queue notificationsQueue) {
        this.notificationsQueue = notificationsQueue;
    }

    public void setInVMConnectionFactory(ConnectionFactory inVMConnectionFactory) {
        this.inVMConnectionFactory = inVMConnectionFactory;
    }

    /** Releases the connection after a failed init() without hiding the original failure. */
    private void closeQuietly() {
        try {
            destroy();
        } catch (Exception ignored) {
            // Deliberately dropped: the exception already propagating out of init() is the
            // one that explains what went wrong.
        }
    }
}
I apologize for the length of the post but wanted to show all my configurations in hopes I was missing a small configuration. I've been at this for the last couple of days and would really appreciate any help. Thanks in advance.
- Conner