Losing messages on the subscriber when filtering
java1970 May 11, 2011 9:40 AM
Here is a simple load test program that creates 10 subscribers and one publisher that publishes 5000 messages.
When run with a command line argument, it uses a selector based on that argument when creating the subscribers, and sets the matching property on each message it publishes.
The test passes when I just run one process at a time. (ten subscribers and one publisher on the topic using one filter)
The trouble happens when I run two or more of these processes with different command line arguments (i.e. a different filter for each running process).
Let's say I run this process twice: once with a command line argument of 1 and once with a command line argument of 2.
This will create one process with 10 subscribers and publisher publishing 5000 messages - both using a filter of 1
This will also create another process with 10 subscribers and a publisher publishing 5000 messages - both using a filter of 2
When looking at the output of the two processes, one of the processes will usually not pick up the expected number of published messages, while the other will pick up more than expected.
The code follows. Maybe HornetQ is OK and there is just something incorrect with my test.
Hope it was ok to just paste it in here seeing as it's not too large.
If a zip file is preferred I can do that.
I have HornetQ 2.2.2 final integrated with AS 5
Let me know if there is any other information you need.
Thanks
*******************************************************************************************************************************************
{code}
import java.util.Properties;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class BasicJMSLoadTest
{
private static Logger log = Logger.getLogger(BasicJMSLoadTest.class.getName());
// parameters that are populated from VM parameters
private static int numSubscribers, numMessages, messageSize, publishSleep, onMessageSleep, waitSleep;
private static String topicName;
private static String namingHost;
private static int namingPort;
private static boolean runOutOfMemory = false;
private static InitialContext initialContext;
private static ConnectionFactory connectionFactory;
private static TopicConnection[] connections;
private static TopicSession[] sessions;
private static TopicSubscriber[] subscribers;
private static CountingMessageListener[] listeners;
private static String clientId = null;
private static Vector memoryUsingVector = new Vector();
private static int publishCount, publishRetries, receiveCount = -1;
private static Throwable publishException, otherException;
public static void main(String[] args) throws Throwable
{
try
{
if (args.length == 1)
clientId = args[0];
log.info("Started with clientId: " + clientId);
initializeParameters();
initializeConnection();
log.info("Creating subscribers");
createSubscribers();
log.info("Starting publish");
publishMessagesInThread();
long start = System.currentTimeMillis();
log.info("Waiting for finish...");
waitForFinish();
if (publishRetries > 0 && publishException == null)
publishException = new Exception("There were " + publishRetries + " publish retries.");
if (otherException != null)
log.info("Received exception: " + otherException);
if (publishException != null)
log.info("Received publish exception: " + publishException);
log.info("Finished in " + (System.currentTimeMillis() - start) + " ms... Closing connections...");
closeResources();
log.info("Connections closed.");
log.info("Publish: " + publishCount + " Rec: " + receiveCount + " Retries: " + publishRetries);
}
catch (Throwable ex)
{
log.log(Level.SEVERE, "Exception", ex);
otherException = ex;
}
if (otherException != null || publishException != null)
{
log.info("Exception received. Press Enter to quit.");
System.in.read();
}
}
private static void createSubscribers() throws Exception
{
connections = new TopicConnection[numSubscribers];
sessions = new TopicSession[numSubscribers];
subscribers = new TopicSubscriber[numSubscribers];
listeners = new CountingMessageListener[numSubscribers];
Topic topic = (Topic)initialContext.lookup("topic/" + topicName);
for (int i = 0; i < numSubscribers; i++)
{
connections[i] = (TopicConnection)connectionFactory.createConnection();
connections[i].setExceptionListener(new LocalExceptionListener(i));
sessions[i] = connections[i].createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
if (clientId == null)
subscribers[i] = sessions[i].createSubscriber(topic);
else
subscribers[i] = sessions[i].createSubscriber(topic, "clientId = '" + clientId + "'", false);
listeners[i] = new CountingMessageListener();
subscribers[i].setMessageListener(listeners[i]);
connections[i].start();
}
}
private static void publishMessagesInThread()
{
Thread t = new Thread(new Runnable()
{
public void run()
{
try
{
Topic topic = (Topic)initialContext.lookup("topic/" + topicName);
TopicConnection c = (TopicConnection)connectionFactory.createConnection();
TopicSession s = c.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
TopicPublisher p = s.createPublisher(topic);
for (int i = 0; i < numMessages; i++)
{
int retryCount = 0;
boolean success = false;
// we get a MessagingNetworkFailureException sometimes. Retrying the publish appears to work sometimes
while (!success && retryCount < 3)
{
try
{
ObjectMessage message = s.createObjectMessage(new byte[messageSize]);
if (clientId != null)
message.setStringProperty("clientId", clientId);
p.publish(topic, message);
publishCount++;
sleep(publishSleep);
success = true;
}
catch (Throwable ex)
{
log.log(Level.SEVERE, "Received publish exception - will retry", ex);
retryCount++;
publishRetries++;
if (publishRetries > 25) // if we have to retry more than 25 times total, stop the test
throw ex;
}
}
}
p.close();
s.close();
c.close();
}
catch (Throwable ex)
{
log.log(Level.SEVERE, "Error during publish", ex);
publishException = ex;
}
}
});
t.start();
}
private static void waitForFinish()
{
int totalReceived = 0, noMessagesReceivedCount = 0;
do
{
totalReceived = 0;
for (int i = 0; i < numSubscribers; i++)
totalReceived += listeners[i].messagesReceived;
log.info("Published: " + publishCount + " Received: " + totalReceived);
// if we haven't received any new messages, stop waiting for new messages and record an exception
if (totalReceived == receiveCount)
noMessagesReceivedCount++;
else
noMessagesReceivedCount = 0;
sleep(waitSleep);
if (noMessagesReceivedCount == 10)
otherException = new Exception("No messages received in timeout.");
receiveCount = totalReceived;
} while (totalReceived < numMessages * numSubscribers &&
publishException == null && noMessagesReceivedCount < 10);
if (totalReceived > numMessages * numSubscribers)
otherException = new Exception("Received more messages than expected (" + totalReceived + ")");
}
private static void sleep(int millis)
{
if (millis > 0)
try
{
Thread.sleep(millis);
}
catch (Exception ex)
{
}
}
private static void initializeConnection() throws NamingException
{
log.info("Getting initial context [" + namingHost + ":" + namingPort + "]");
Properties prop = new Properties();
prop.setProperty(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
prop.setProperty(InitialContext.PROVIDER_URL, namingHost + ":" + namingPort);
initialContext = new InitialContext(prop);
log.info("Initial context created, looking up ConnectionFactory.");
connectionFactory = (ConnectionFactory)initialContext.lookup("ConnectionFactory");
log.info("ConnectionFactory looked up.");
}
private static void closeResources() throws Exception
{
for (int i = 0; i < numSubscribers; i++)
{
subscribers[i].close();
sessions[i].close();
connections[i].close();
}
}
private static void initializeParameters()
{
messageSize = Integer.parseInt(System.getProperty("messageSize", "4096"));
publishSleep = Integer.parseInt(System.getProperty("publishSleep", "10"));
waitSleep = Integer.parseInt(System.getProperty("waitSleep", "5000"));
onMessageSleep = Integer.parseInt(System.getProperty("onMessageSleep", "10"));
numMessages = Integer.parseInt(System.getProperty("numMessages", "5000"));
numSubscribers = Integer.parseInt(System.getProperty("numSubscribers", "10"));
topicName = System.getProperty("topicName", "jmsLoadTest");
namingHost = System.getProperty("namingHost", "localhost");
namingPort = Integer.parseInt(System.getProperty("namingPort", "1099"));
runOutOfMemory = Boolean.parseBoolean(System.getProperty("runOutOfMemory", "false"));
}
public static class CountingMessageListener implements MessageListener
{
public int messagesReceived;
public void onMessage(Message message)
{
try
{
messagesReceived++;
if (runOutOfMemory)
memoryUsingVector.add(new byte[512000]);
sleep(onMessageSleep);
}
catch (Throwable ex)
{
log.log(Level.WARNING, "Exception in onMessage", ex);
}
}
}
private static class LocalExceptionListener implements ExceptionListener
{
private int subscriberIndex;
public LocalExceptionListener(int subscriberIndex)
{
this.subscriberIndex = subscriberIndex;
}
public void onException(JMSException jmsException)
{
otherException = jmsException;
log.severe("Subscriber " + subscriberIndex + " received asynchronous exception " + jmsException);
}
}
}
{code}