1 Reply Latest reply on Apr 11, 2013 12:35 PM by clebert.suconic

    Transaction timeouts in 2.2.14 (not working??)

    npomfret

      What should happen when a transaction timeout is exceded?  I assumed, perhaps incorrectly, that if a transaction is not commited within the timeout then:

       

           a) it should get replayed to another consumer

           b) if/when the commit finally happens (after the timeout) it should throw an exception

       

      However, what I see is that the commit is successful despite exceeding the timeout.  Here's a unit test to prove it:

       

      -------------------------------

       

      package hornetq;

       

       

      import javax.jms.BytesMessage;

      import javax.jms.Connection;

      import javax.jms.DeliveryMode;

      import javax.jms.JMSException;

      import javax.jms.Message;

      import javax.jms.MessageConsumer;

      import javax.jms.MessageListener;

      import javax.jms.MessageProducer;

      import javax.jms.Session;

      import java.io.File;

      import java.net.InetAddress;

      import java.net.UnknownHostException;

      import java.util.ArrayList;

      import java.util.HashMap;

      import java.util.List;

      import java.util.Map;

      import java.util.UUID;

      import java.util.concurrent.CountDownLatch;

      import java.util.concurrent.TimeUnit;

       

       

      import org.hornetq.api.core.TransportConfiguration;

      import org.hornetq.api.jms.HornetQJMSClient;

      import org.hornetq.api.jms.JMSFactoryType;

      import org.hornetq.core.config.Configuration;

      import org.hornetq.core.config.impl.ConfigurationImpl;

      import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;

      import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

      import org.hornetq.core.remoting.impl.netty.TransportConstants;

      import org.hornetq.core.server.JournalType;

      import org.hornetq.integration.logging.Log4jLogDelegateFactory;

      import org.hornetq.jms.client.HornetQConnectionFactory;

      import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;

      import org.hornetq.jms.server.config.JMSConfiguration;

      import org.hornetq.jms.server.config.JMSQueueConfiguration;

      import org.hornetq.jms.server.config.TopicConfiguration;

      import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;

      import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;

      import org.hornetq.jms.server.embedded.EmbeddedJMS;

      import org.junit.After;

      import org.junit.Test;

       

       

      import static javax.jms.Session.*;

      import static org.hamcrest.Matchers.*;

      import static org.hornetq.core.remoting.impl.netty.TransportConstants.*;

      import static org.junit.Assert.*;

       

       

      public class HornetQFeaturesTest {

       

       

          private static String queueName = "queue";

       

       

          private EmbeddedJMS server;

          private Connection connection;

          private Session nonTransactedSession;

          private Session transactedSession;

       

       

          @Test

          public void justShowThatWeCanSendARecieveAMessage() throws Exception {

              start();

       

       

              sendBytesMessageTo("message", nonTransactedSession);

       

       

              MessageConsumer consumer = nonTransactedSession.createConsumer(nonTransactedSession.createQueue(queueName));

       

       

              assertThat(consumer.receive(100), not(nullValue()));

              assertThat(consumer.receive(100), nullValue());

          }

       

       

          @Test

          public void showThatTxnTimeoutDoNotWorkAsExpcted() throws Exception {

              final int transactionTimeout = 1000;

       

       

              start(transactionTimeout);

       

       

              sendBytesMessageTo("message", nonTransactedSession);

       

       

              assertThat(startAVerySlowConsumer(transactionTimeout).await(transactionTimeout, TimeUnit.MILLISECONDS), equalTo(true));//the slow consumer has got the message

       

       

              final CountDownLatch countDownLatch = new CountDownLatch(1);

              MessageConsumer consumer = nonTransactedSession.createConsumer(nonTransactedSession.createQueue(queueName));

              consumer.setMessageListener(new MessageListener() {

                  @Override

                  public void onMessage(Message message) {

                      System.out.println("Woohoo! it got delivered to someone else");// this never happens

                      countDownLatch.countDown();

                  }

              });

       

       

              assertThat(countDownLatch.await(100, TimeUnit.MILLISECONDS), equalTo(true));

          }

       

       

          private CountDownLatch startAVerySlowConsumer(final int transactionTimeout) throws JMSException {

              final CountDownLatch countDownLatch = new CountDownLatch(1);

              MessageConsumer consumer = transactedSession.createConsumer(transactedSession.createQueue(queueName));

              consumer.setMessageListener(new MessageListener() {

                  @Override

                  public void onMessage(Message message) {

                      System.out.println("HornetQFeaturesTest.onMessage " + message);

                      countDownLatch.countDown();

       

       

                      try {

                          System.out.println("sleeping for longer than the tx timeout");

                          Thread.sleep(transactionTimeout * 2);

                      } catch (InterruptedException e) {

                          return;

                      }

       

       

                      try {

                          System.out.println("HornetQFeaturesTest.onMessage comitting txn");

                          transactedSession.commit();//should blow up??

                      } catch (JMSException e) {

                          e.printStackTrace();

                          return;

                      }

                  }

              });

              return countDownLatch;

          }

       

       

          public void start() throws Exception {

              start(0);//start broker with zero txn timeout

          }

       

       

          private void start(int transactionTimeout) throws Exception {

              int brokerAPort = 8888;

              server = createServer("serverA", brokerAPort, transactionTimeout);

              server.start();

       

       

              Thread.sleep(3 * 1000);//wait for the cluster - don't know how to do this programatically

       

       

              HornetQConnectionFactory factoryForBorkerA = connectionFactory(brokerAPort);

              connection = factoryForBorkerA.createConnection();

              connection.start();

       

       

              transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED);

              nonTransactedSession = connection.createSession(false, AUTO_ACKNOWLEDGE);

          }

       

       

          private static void sendBytesMessageTo(String messageBody, Session session) throws Exception {

              MessageProducer producer = session.createProducer(session.createQueue(queueName));

              try {

                  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                  BytesMessage message = session.createBytesMessage();

                  message.writeBytes(messageBody.getBytes("UTF-8"));

                  producer.send(message);

              } finally {

                  producer.close();

              }

          }

       

       

          private static HornetQConnectionFactory connectionFactory(int port) throws Exception {

              Map<String, Object> connectionParams = new HashMap<>();

              connectionParams.put(TransportConstants.PORT_PROP_NAME, port);

              connectionParams.put(TransportConstants.HOST_PROP_NAME, fqn());

              connectionParams.put(TransportConstants.USE_NIO_PROP_NAME, true);

              TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);

       

       

              HornetQConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);

              return connectionFactory;

          }

       

       

          private static String fqn() throws UnknownHostException {

              return InetAddress.getLocalHost().getCanonicalHostName();

          }

       

       

          private static EmbeddedJMS createServer(String name, final int port, int transactionTimeout) throws Exception {

              Configuration configuration = new ConfigurationImpl();

       

       

              String connectorName = "netty-connector";

       

       

              Map<String, Object> acceptorParams = new HashMap<String, Object>() {{

                  put(PORT_PROP_NAME, port);

                  put(HOST_PROP_NAME, fqn());

                  put(USE_NIO_PROP_NAME, true);

              }};

       

       

              TransportConfiguration acceptorConfig = new TransportConfiguration(NettyAcceptorFactory.class.getName(), acceptorParams);

              TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), acceptorParams, connectorName);

       

       

              List<String> connectorInfos = new ArrayList<>();

              connectorInfos.add(connectorName);

       

       

              configuration.getAcceptorConfigurations().add(acceptorConfig);

              configuration.getConnectorConfigurations().put(connectorConfig.getName(), connectorConfig);

              configuration.setPersistenceEnabled(false);

              configuration.setSecurityEnabled(false);

              configuration.setLogDelegateFactoryClassName(Log4jLogDelegateFactory.class.getName());

              configuration.setTransactionTimeout(transactionTimeout);

              configuration.setTransactionTimeoutScanPeriod(100);

              File dir = new File("target/" + name + "/" + UUID.randomUUID().toString());

              dir.mkdirs();

       

       

              configuration.setBindingsDirectory(new File(dir, "bindings").getAbsolutePath());

              configuration.setJournalDirectory(new File(dir, "journal").getAbsolutePath());

              configuration.setLargeMessagesDirectory(new File(dir, "largemessages").getAbsolutePath());

              configuration.setPagingDirectory(new File(dir, "paging").getAbsolutePath());

              configuration.setSharedStore(false);

              configuration.setJournalType(JournalType.NIO);

       

       

              List<JMSQueueConfiguration> queueConfigurations = new ArrayList<>();

              queueConfigurations.add(new JMSQueueConfigurationImpl(queueName, null, false));

       

       

              JMSConfiguration jmsConfig = new JMSConfigurationImpl(new ArrayList<ConnectionFactoryConfiguration>(), queueConfigurations, new ArrayList<TopicConfiguration>(), null);

       

       

              EmbeddedJMS server = new EmbeddedJMS();

              server.setConfiguration(configuration);

              server.setJmsConfiguration(jmsConfig);

              return server;

          }

       

       

          @After

          public void after() throws Exception {

              if (nonTransactedSession != null) {

                  nonTransactedSession.close();

              }

              if (transactedSession != null) {

                  transactedSession.close();

              }

              if (connection != null) {

                  connection.close();

              }

              if (server != null) {

                  server.stop();

              }

          }

      }