clustered-queue example is not working.
verystrongjoe Apr 17, 2013 1:39 AMI'm wacthing the example, clustered queue, 2.3.0.BETA3.
During running, I saw two kinds of errors, i didn't add or modify example source.
I modfied source for consume behavior to be invoked not by receivce function of consumer class
but the listener injected to consumer.. so I added two listeners.
consumer0.setMessageListener(listener5000); // it takes 5000 milli-secs.
consumer1.setMessageListener(listener); // it takes 0 sec.
and the listener implementation is like below..
private final MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("*********" + message.toString() );
}
};
private final MessageListener listener5000 = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("5000 *********" + message.toString() );
}
};
finally, i removed the below
// Step 14. We now consume those messages on *both* server 0 and server 1.
// We note the messages have been distributed between servers in a round robin fashion
// JMS Queues implement point-to-point message where each message is only ever consumed by a
// maximum of one consumer
// for (int i = 0; i < numMessages; i += 2)
// {
// TextMessage message0 = (TextMessage)consumer0.receive(5000);
//
// System.out.println("Got message: " + message0.getText() + " from node 0");
//
// TextMessage message1 = (TextMessage)consumer1.receive(5000);
//
// System.out.println("Got message: " + message1.getText() + " from node 1");
// }
but two error is occcured,
1) one error is like below
WARN: HQ214084: Invalid concurrent session usage. Sessions are not supposed to be used by more than one thread concurrently.
java.lang.Exception: trace
at org.hornetq.core.client.impl.ClientSessionImpl.startCall(ClientSessionImpl.java:1316)
at org.hornetq.core.client.impl.ClientSessionImpl.acknowledge(ClientSessionImpl.java:800)
at org.hornetq.core.client.impl.ClientConsumerImpl.doAck(ClientConsumerImpl.java:1149)
at org.hornetq.core.client.impl.ClientConsumerImpl.acknowledge(ClientConsumerImpl.java:752)
at org.hornetq.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:96)
at org.hornetq.jms.client.JMSMessageListenerWrapper.onMessage(JMSMessageListenerWrapper.java:133)
at org.hornetq.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1016)
at org.hornetq.core.client.impl.ClientConsumerImpl.access$400(ClientConsumerImpl.java:52)
at org.hornetq.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1161)
at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:106)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
2) and the other is like below..
ERROR: HQ224079: Bridge Failed to ack
java.lang.IllegalStateException: Journal must be in state=LOADED, was [STOPPED]
at org.hornetq.core.journal.impl.JournalImpl.checkJournalIsLoaded(JournalImpl.java:1001)
at org.hornetq.core.journal.impl.JournalImpl.appendUpdateRecord(JournalImpl.java:824)
at org.hornetq.core.persistence.impl.journal.JournalStorageManager.storeAcknowledge(JournalStorageManager.java:895)
at org.hornetq.core.server.impl.QueueImpl.acknowledge(QueueImpl.java:933)
at org.hornetq.core.server.cluster.impl.BridgeImpl.sendAcknowledged(BridgeImpl.java:476)
at org.hornetq.core.client.impl.ClientSessionImpl.commandConfirmed(ClientSessionImpl.java:1333)
at org.hornetq.core.protocol.core.impl.ChannelImpl.clearUpTo(ChannelImpl.java:657)
at org.hornetq.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:586)
at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:547)
at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:523)
at org.hornetq.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler.bufferReceived(ClientSessionFactoryImpl.java:1662)
at org.hornetq.core.remoting.impl.netty.HornetQChannelHandler.messageReceived(HornetQChannelHandler.java:72)
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:95)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:281)
at org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2.decode(HornetQFrameDecoder2.java:169)
at org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2.messageReceived(HornetQFrameDecoder2.java:134)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.oio.OioWorker.process(OioWorker.java:72)
at org.jboss.netty.channel.socket.oio.AbstractOioWorker.run(AbstractOioWorker.java:73)
at org.jboss.netty.channel.socket.oio.OioWorker.run(OioWorker.java:52)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at org.jboss.netty.util.VirtualExecutorService$ChildExecutorRunnable.run(VirtualExecutorService.java:176)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
and I have last question.
the listener is asyncronous?? how can I set synchronous listener?