Transaction timeouts in 2.2.14 (not working??)
npomfret Apr 11, 2013 6:56 AMWhat should happen when a transaction timeout is exceded? I assumed, perhaps incorrectly, that if a transaction is not commited within the timeout then:
a) it should get replayed to another consumer
b) if/when the commit finally happens (after the timeout) it should throw an exception
However, what I see is that the commit is successful despite exceeding the timeout. Here's a unit test to prove it:
-------------------------------
package hornetq;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.integration.logging.Log4jLogDelegateFactory;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.JMSQueueConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.junit.After;
import org.junit.Test;
import static javax.jms.Session.*;
import static org.hamcrest.Matchers.*;
import static org.hornetq.core.remoting.impl.netty.TransportConstants.*;
import static org.junit.Assert.*;
public class HornetQFeaturesTest {
private static String queueName = "queue";
private EmbeddedJMS server;
private Connection connection;
private Session nonTransactedSession;
private Session transactedSession;
@Test
public void justShowThatWeCanSendARecieveAMessage() throws Exception {
start();
sendBytesMessageTo("message", nonTransactedSession);
MessageConsumer consumer = nonTransactedSession.createConsumer(nonTransactedSession.createQueue(queueName));
assertThat(consumer.receive(100), not(nullValue()));
assertThat(consumer.receive(100), nullValue());
}
@Test
public void showThatTxnTimeoutDoNotWorkAsExpcted() throws Exception {
final int transactionTimeout = 1000;
start(transactionTimeout);
sendBytesMessageTo("message", nonTransactedSession);
assertThat(startAVerySlowConsumer(transactionTimeout).await(transactionTimeout, TimeUnit.MILLISECONDS), equalTo(true));//the slow consumer has got the message
final CountDownLatch countDownLatch = new CountDownLatch(1);
MessageConsumer consumer = nonTransactedSession.createConsumer(nonTransactedSession.createQueue(queueName));
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("Woohoo! it got delivered to someone else");// this never happens
countDownLatch.countDown();
}
});
assertThat(countDownLatch.await(100, TimeUnit.MILLISECONDS), equalTo(true));
}
private CountDownLatch startAVerySlowConsumer(final int transactionTimeout) throws JMSException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
MessageConsumer consumer = transactedSession.createConsumer(transactedSession.createQueue(queueName));
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("HornetQFeaturesTest.onMessage " + message);
countDownLatch.countDown();
try {
System.out.println("sleeping for longer than the tx timeout");
Thread.sleep(transactionTimeout * 2);
} catch (InterruptedException e) {
return;
}
try {
System.out.println("HornetQFeaturesTest.onMessage comitting txn");
transactedSession.commit();//should blow up??
} catch (JMSException e) {
e.printStackTrace();
return;
}
}
});
return countDownLatch;
}
public void start() throws Exception {
start(0);//start broker with zero txn timeout
}
private void start(int transactionTimeout) throws Exception {
int brokerAPort = 8888;
server = createServer("serverA", brokerAPort, transactionTimeout);
server.start();
Thread.sleep(3 * 1000);//wait for the cluster - don't know how to do this programatically
HornetQConnectionFactory factoryForBorkerA = connectionFactory(brokerAPort);
connection = factoryForBorkerA.createConnection();
connection.start();
transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
nonTransactedSession = connection.createSession(false, AUTO_ACKNOWLEDGE);
}
private static void sendBytesMessageTo(String messageBody, Session session) throws Exception {
MessageProducer producer = session.createProducer(session.createQueue(queueName));
try {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageBody.getBytes("UTF-8"));
producer.send(message);
} finally {
producer.close();
}
}
private static HornetQConnectionFactory connectionFactory(int port) throws Exception {
Map<String, Object> connectionParams = new HashMap<>();
connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
connectionParams.put(TransportConstants.HOST_PROP_NAME, fqn());
connectionParams.put(TransportConstants.USE_NIO_PROP_NAME, true);
TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
HornetQConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
return connectionFactory;
}
private static String fqn() throws UnknownHostException {
return InetAddress.getLocalHost().getCanonicalHostName();
}
private static EmbeddedJMS createServer(String name, final int port, int transactionTimeout) throws Exception {
Configuration configuration = new ConfigurationImpl();
String connectorName = "netty-connector";
Map<String, Object> acceptorParams = new HashMap<String, Object>() {{
put(PORT_PROP_NAME, port);
put(HOST_PROP_NAME, fqn());
put(USE_NIO_PROP_NAME, true);
}};
TransportConfiguration acceptorConfig = new TransportConfiguration(NettyAcceptorFactory.class.getName(), acceptorParams);
TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), acceptorParams, connectorName);
List<String> connectorInfos = new ArrayList<>();
connectorInfos.add(connectorName);
configuration.getAcceptorConfigurations().add(acceptorConfig);
configuration.getConnectorConfigurations().put(connectorConfig.getName(), connectorConfig);
configuration.setPersistenceEnabled(false);
configuration.setSecurityEnabled(false);
configuration.setLogDelegateFactoryClassName(Log4jLogDelegateFactory.class.getName());
configuration.setTransactionTimeout(transactionTimeout);
configuration.setTransactionTimeoutScanPeriod(100);
File dir = new File("target/" + name + "/" + UUID.randomUUID().toString());
dir.mkdirs();
configuration.setBindingsDirectory(new File(dir, "bindings").getAbsolutePath());
configuration.setJournalDirectory(new File(dir, "journal").getAbsolutePath());
configuration.setLargeMessagesDirectory(new File(dir, "largemessages").getAbsolutePath());
configuration.setPagingDirectory(new File(dir, "paging").getAbsolutePath());
configuration.setSharedStore(false);
configuration.setJournalType(JournalType.NIO);
List<JMSQueueConfiguration> queueConfigurations = new ArrayList<>();
queueConfigurations.add(new JMSQueueConfigurationImpl(queueName, null, false));
JMSConfiguration jmsConfig = new JMSConfigurationImpl(new ArrayList<ConnectionFactoryConfiguration>(), queueConfigurations, new ArrayList<TopicConfiguration>(), null);
EmbeddedJMS server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfig);
return server;
}
@After
public void after() throws Exception {
if (nonTransactedSession != null) {
nonTransactedSession.close();
}
if (transactedSession != null) {
transactedSession.close();
}
if (connection != null) {
connection.close();
}
if (server != null) {
server.stop();
}
}
}