HORNETQ-264 receiveImmediate and failover issue
clebert.suconic Jan 8, 2010 6:36 PM
This is easy to replicate with a new test on MultiThreadReplicationTestBase:
public void testO() throws Exception
{
   runTestMultipleThreads(new RunnableT()
   {
      @Override
      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
      {
         doTestO(sf, threadNum);
      }
   }, NUM_THREADS, false);
}

protected void doTestO(final ClientSessionFactory sf, final int threadNum) throws Exception
{
   ClientSession sessCreate = sf.createSession(false, true, true);
   sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
                          new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()),
                          null,
                          false);
   ClientSession sess = sf.createSession(false, true, true);
   sess.start();
   ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
   for (int i = 0; i < 100; i++)
   {
      assertNull(consumer.receiveImmediate());
   }
   sess.close();
   sessCreate.deleteQueue(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
   sessCreate.close();
}
The issue is that ServerConsumer is not taking the transferring flag into consideration in forceDelivery.
I could fix it with a hack (this is not a proper fix; it is just to show what the issue is):
public synchronized void forceDelivery(final long sequence)
{
   promptDelivery();
   executor.execute(new Runnable()
   {
      public void run()
      {
         try
         {
            // We execute this on the same executor to make sure the force delivery message is written after
            // any delivery is completed
            ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
            forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
            forcedDeliveryMessage.setAddress(messageQueue.getName());
            final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
            // the hack is the "if (transferring)" branch below
            if (transferring)
            {
               Thread.sleep(500);
               executor.execute(this);
            }
            else
            {
               channel.send(packet);
            }
         }
         catch (Exception e)
         {
            ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
         }
      }
   });
}
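To make the idea behind the hack easier to see in isolation, here is a minimal standalone sketch of "defer the send while transferring, then retry". It is not HornetQ code and not a proposed fix: ForcedDeliverySketch, setTransferring, RETRY_DELAY_MS and the use of a CompletableFuture in place of channel.send(packet) are all assumptions made up for illustration. It also swaps Thread.sleep for a scheduled retry so the executor thread is not blocked while waiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the server consumer; none of these names are HornetQ API.
class ForcedDeliverySketch
{
   // Assumed retry interval, chosen arbitrarily for the sketch.
   private static final long RETRY_DELAY_MS = 20;

   // Single thread, so attempts run in submission order like the shared executor in the post.
   private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

   private final AtomicBoolean transferring = new AtomicBoolean(false);

   void setTransferring(final boolean value)
   {
      transferring.set(value);
   }

   // Returns a future that completes with the sequence once the "send" has happened.
   CompletableFuture<Long> forceDelivery(final long sequence)
   {
      final CompletableFuture<Long> sent = new CompletableFuture<>();
      executor.execute(() -> attempt(sequence, sent));
      return sent;
   }

   private void attempt(final long sequence, final CompletableFuture<Long> sent)
   {
      if (transferring.get())
      {
         // Still transferring: try again later instead of sleeping on the executor thread.
         executor.schedule(() -> attempt(sequence, sent), RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
      }
      else
      {
         // Stands in for channel.send(packet) in the original method.
         sent.complete(sequence);
      }
   }

   void shutdown()
   {
      executor.shutdownNow();
   }

   public static void main(final String[] args) throws Exception
   {
      ForcedDeliverySketch consumer = new ForcedDeliverySketch();
      consumer.setTransferring(true);
      CompletableFuture<Long> sent = consumer.forceDelivery(7L);
      consumer.setTransferring(false);
      System.out.println("sent forced delivery for sequence " + sent.get(2, TimeUnit.SECONDS));
      consumer.shutdown();
    }
}
```

While the flag is set the future never completes; once it is cleared, the next retry performs the send. A proper fix would presumably hook into the end of the transfer rather than poll, which this sketch does not attempt.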
BTW: I only spent 5 minutes on this before my day was over on a Friday. TGIF now!