Rollback Problem in JBM 2.0.0 BETA4
kazuno Aug 24, 2009 1:02 AMHello.
I'm evaluating JBoss Messaging 2.0.0 BETA4 .
And, I tried the following procedure to check JBM's rollback functionality.
1. Start up JBM.
2. Check MessageCount of ExampleQue by jconsole. and 0 message found.
3. Producer sends 1000 messages to ExampleQue by a single transaction. and commit.
4. Check MessageCount of ExampleQue by jconsole. and 1000 messages found.
5. Consumer tries to receive 1000 messages using a transaction for each message. and makes a rollback one out of ten trials.
My Consumer received 30 messages and looks like freezed, seems to wait the arraival of a message.
10 messsages, 100 messages are OK. But at the case of 1000 messages, this problem happens.
My Consumer's output is here.
message 0 received. payload is 0 === Rollback === message 1 received. payload is 0 message 2 received. payload is 1 message 3 received. payload is 2 message 4 received. payload is 3 message 5 received. payload is 4 message 6 received. payload is 5 message 7 received. payload is 6 message 8 received. payload is 7 message 9 received. payload is 8 message 10 received. payload is 9 message 11 received. payload is 10 === Rollback === message 12 received. payload is 10 message 13 received. payload is 11 message 14 received. payload is 12 message 15 received. payload is 13 message 16 received. payload is 14 message 17 received. payload is 15 message 18 received. payload is 16 message 19 received. payload is 17 message 20 received. payload is 18 message 21 received. payload is 19 message 22 received. payload is 20 === Rollback === message 23 received. payload is 20 message 24 received. payload is 21 message 25 received. payload is 22 message 26 received. payload is 23 message 27 received. payload is 24 message 28 received. payload is 25 message 29 received. payload is 26 message 30 received. payload is 27 message 31 received. payload is 28 message 32 received. payload is 29 message 33 received. payload is 30 === Rollback ===
I confirmed following by jconsole.
o ExampleQueue has 70 messages.
And my environment is
o Windows XP SP3
o Sun JDK 1.6.0_15
o JBoss Messaging 2.0.0 BETA4
Any comments or suggestions?
Producer.java
import java.util.HashMap; import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import org.jboss.messaging.core.config.TransportConfiguration; import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory; import org.jboss.messaging.jms.JBossQueue; import org.jboss.messaging.jms.client.JBossConnectionFactory; /** * A Producer. */ public class Producer { public static void main(String[] args) throws Exception { Connection connection = null; Queue queue = null; Session session = null; MessageProducer producer = null; try { // Step 1. Create a ConnectionFactory Map<String, Object> params = new HashMap<String, Object>(); params.put("jbm.remoting.netty.host","localhost"); params.put("jbm.remoting.netty.port","5445"); TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); ConnectionFactory cf = new JBossConnectionFactory(transportConfiguration); // Step 2. Create a Connection and start. connection = cf.createConnection(); connection.start(); // Step 3. Create a Session that transactedl. session = connection.createSession(true, Session.SESSION_TRANSACTED); // Step 4. Create a Queue. queue = new JBossQueue("ExampleQueue"); // Step 5. Create a Producer. producer = session.createProducer(queue); // Step 7. Set delivery mode to persistent. producer.setDeliveryMode(DeliveryMode.PERSISTENT); // Step 8. Send 1000 messages. for (int i = 0; i < 1000; i++) { Message message = session.createObjectMessage(new Integer(i)); producer.send(message); System.out.println("message " + Integer.toString(i) + " sent."); } // Step 9. Commit. session.commit(); System.out.println("commited!"); } finally { if (producer != null) { producer.close(); } if (session != null) { session.rollback(); session.close(); } if (connection != null) { connection.close(); } } } }
Consumer.java
import java.util.HashMap; import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; import org.jboss.messaging.core.config.TransportConfiguration; import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory; import org.jboss.messaging.jms.JBossQueue; import org.jboss.messaging.jms.client.JBossConnectionFactory; /** * A Consumer. */ public class Consumer { public static void main(String[] args) throws Exception { Connection connection = null; Queue queue = null; Session session = null; MessageConsumer consumer = null; try { // Step 1. Create a ConnectionFactory Map<String, Object> params = new HashMap<String, Object>(); params.put("jbm.remoting.netty.host","localhost"); params.put("jbm.remoting.netty.port","5445"); TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); ConnectionFactory cf = new JBossConnectionFactory(transportConfiguration); // Step 2. Create a Connection and start. connection = cf.createConnection(); connection.start(); queue = new JBossQueue("ExampleQueue"); // Step 3. Create a Session that transactedl. session = connection.createSession(true, Session.SESSION_TRANSACTED); // Step 4. Create a Queue. queue = new JBossQueue("ExampleQueue"); // Step 5. Create a Consumer. consumer = session.createConsumer(queue); // Step 7. Try to eceive 1000 messages, but one in ten messages rollbacked. int commited = 0; int rollbacked = 0; for (int i = 0; i < 1100; i++) { ObjectMessage message = (ObjectMessage)consumer.receive(); String payload = ((Integer)message.getObject()).toString(); System.out.println("message " + Integer.toString(i) + " received. payload is " + payload); boolean redelivered = message.getJMSRedelivered(); if(payload.substring(payload.length() -1, payload.length()).equals("0") && !redelivered) { System.out.println("=== Rollback ==="); session.rollback(); rollbacked++; } else { session.commit(); commited++; } } System.out.println("===="); System.out.println("commited:" + commited); System.out.println("rollbacked:" + rollbacked); } finally { if (consumer != null) { consumer.close(); } if (session != null) { session.rollback(); session.close(); } if (connection != null) { connection.close(); } } } }
jbm-configuration.xml
<configuration xmlns="urn:jboss:messaging" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:jboss:messaging /schema/jbm-configuration.xsd"> <connectors> <connector name="netty"> <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class> <param key="jbm.remoting.netty.host" value="${jbm.remoting.netty.host:localhost}" type="String"/> <param key="jbm.remoting.netty.port" value="${jbm.remoting.netty.port:5445}" type="Integer"/> </connector> </connectors> <acceptors> <acceptor name="netty"> <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class> <param key="jbm.remoting.netty.host" value="${jbm.remoting.netty.host:localhost}" type="String"/> <param key="jbm.remoting.netty.port" value="${jbm.remoting.netty.port:5445}" type="Integer"/> </acceptor> </acceptors> <security-settings> <security-setting match="#"> <permission type="createTempQueue" roles="guest"/> <permission type="deleteTempQueue" roles="guest"/> <permission type="consume" roles="guest"/> <permission type="send" roles="guest"/> </security-setting> </security-settings> <address-settings> <!--default for catch all--> <address-setting match="#"> <clustered>false</clustered> <dead-letter-address>jms.queue.DLQ</dead-letter-address> <expiry-address>jms.queue.ExpiryQueue</expiry-address> <redelivery-delay>0</redelivery-delay> <max-size-bytes>-1</max-size-bytes> <page-size-bytes>10485760</page-size-bytes> <distribution-policy-class>org.jboss.messaging.core.server.impl.RoundRobinDistributor</distribution-policy-class> <message-counter-history-day-limit>10</message-counter-history-day-limit> </address-setting> </address-settings> <paging-directory>../data/paging</paging-directory> <bindings-directory>../data/bindings</bindings-directory> <journal-directory>../data/journal</journal-directory> <large-messages-directory>../data/large-messages</large-messages-directory> </configuration>
jbm-jms.xml
<configuration xmlns="urn:jboss:messaging" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:jboss:messaging /schema/jbm-jms.xsd"> <connection-factory name="ConnectionFactory"> <connector-ref connector-name="netty"/> <entries> <entry name="ConnectionFactory"/> <entry name="XAConnectionFactory"/> <entry name="java:/ConnectionFactory"/> <entry name="java:/XAConnectionFactory"/> </entries> </connection-factory> <queue name="DLQ"> <entry name="/queue/DLQ"/> </queue> <queue name="ExpiryQueue"> <entry name="/queue/ExpiryQueue"/> </queue> <queue name="ExampleQueue"> <entry name="/queue/ExampleQueue"/> </queue> <topic name="ExampleTopic"> <entry name="/topic/ExampleTopic"/> </topic> </configuration>