Paging doesn't seem to be working
into_java Sep 7, 2012 3:32 PM
Hi All,
The version of HornetQ I am using is 2.1.2 Final.
I have a simple queue, defined below, whose maximum size is set to 20 MB (max-size-bytes). Because of the high rate at which messages are inserted, the queue fills up, and I see the 000001.page and 00002.page files being created in the paging directory.
Once the producer stops after dumping its data, I see that the .page files get deleted, but I don't see those messages on the consumer side.
My consumer only gets the messages that were already in the queue, not the ones that were supposed to be depaged and made available.
Also, in the JMX console I observe that the messages added property does not get incremented when depaging starts. What does that mean? Are the messages somehow getting dropped even after depaging? I would like some pointers for troubleshooting this.
<address-setting match="queue.event.outageQueue">
   <dead-letter-address>queue.event.outageQueue.deadLetterQueue</dead-letter-address>
   <max-delivery-attempts>10</max-delivery-attempts>
   <redelivery-delay>5000</redelivery-delay>
   <max-size-bytes>20971520</max-size-bytes>
   <page-size-bytes>10485760</page-size-bytes>
   <send-to-dla-on-no-route>true</send-to-dla-on-no-route>
   <address-full-policy>PAGE</address-full-policy>
</address-setting>
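To sanity-check the sizes in the address-setting, here is a small sketch that converts the two byte values to MiB and estimates how many page files a given backlog would need. Only the two constants come from the configuration; the class name PagingSizes, its methods, and the 25 MiB backlog are hypothetical, and the estimate assumes each page file holds at most page-size-bytes of message data, ignoring per-message overhead.

```java
// Hypothetical helper; only the two byte values are taken from the address-setting.
final class PagingSizes {
    static final long MAX_SIZE_BYTES = 20_971_520L;  // <max-size-bytes>
    static final long PAGE_SIZE_BYTES = 10_485_760L; // <page-size-bytes>

    // Converts a byte count to whole mebibytes (1 MiB = 1024 * 1024 bytes).
    static long toMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    // Rough estimate: page files needed to hold pagedBytes,
    // assuming each file holds at most PAGE_SIZE_BYTES (ceiling division).
    static long estimatedPageFiles(long pagedBytes) {
        return (pagedBytes + PAGE_SIZE_BYTES - 1) / PAGE_SIZE_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("max-size-bytes  = " + toMiB(MAX_SIZE_BYTES) + " MiB");
        System.out.println("page-size-bytes = " + toMiB(PAGE_SIZE_BYTES) + " MiB");
        long backlog = 25L * 1024L * 1024L; // assumed backlog of 25 MiB, for illustration only
        System.out.println("page files for 25 MiB backlog ~ " + estimatedPageFiles(backlog));
    }
}
```

So the address starts paging once roughly 20 MiB is held in memory, and each .page file is about 10 MiB, which is consistent with seeing two page files appear under load.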
My configuration for the queue is:
// Create the session factory
try {
    if (tc == null) {
        if (connectionParams.size() != 0) {
            this.factoryOutage = HornetQClient
                    .createClientSessionFactory(new TransportConfiguration(
                            NettyConnectorFactory.class.getName(), connectionParams));
        } else {
            this.factoryOutage = HornetQClient
                    .createClientSessionFactory(new TransportConfiguration(
                            NettyConnectorFactory.class.getName()));
        }
    } else {
        this.factoryOutage = HornetQClient.createClientSessionFactory(tc);
    }
} catch (Exception e) {
    log.error("Failed in creating factory", e);
}
// Configure the factory and create the consumer and producer sessions
try {
    factoryOutage.setBlockOnDurableSend(false);
    factoryOutage.setBlockOnNonDurableSend(false);
    factoryOutage.setPreAcknowledge(true);
    factoryOutage.setConsumerWindowSize(0); // No buffering
    this.coreSessionOutage = this.factoryOutage.createSession(true, true, 20000);
    this.producerSessionOutage = this.factoryOutage.createSession(true, true, 20000);
    log.info("Core Session Created " + this.coreSessionOutage.toString());
} catch (HornetQException e) {
    log.error("Error in Creating a Session from FactoryOutage", e);
}
// Create the queue if it does not already exist
SimpleString sQueueName = SimpleString.toSimpleString(outageQueueName);
try {
    ClientSession.QueueQuery qq = coreSessionOutage.queueQuery(sQueueName);
    if (!qq.isExists()) {
        this.coreSessionOutage.createQueue(sQueueName, sQueueName, true);
    }
    log.info("Outage Queue Created or Exists");
} catch (HornetQException e) {
    log.error("Error in Creating Queue", e);
}
// Create a consumer
try {
    this.clientConsumerOutage = coreSessionOutage.createConsumer(sQueueName);
    // log.info("Consumer created "+ clientConsumer.toString());
} catch (HornetQException e) {
    log.error("Error in Creating a Outage Consumer", e);
}
// Attach the message handler to the consumer
if (this.clientConsumerOutage != null) {
    try {
        this.clientConsumerOutage.setMessageHandler(new OutageEventConsumer());
    } catch (HornetQException e) {
        log.error("Error in Creating a Message Handler for Outage Consumer", e);
    }
}
// Start the consumer session
try {
    this.coreSessionOutage.start();
} catch (HornetQException e) {
    log.error("Error in Starting Outage Core Session", e);
}
And this is how I send messages using the producer:
SimpleString uid = SimpleString.toSimpleString(evnObj);
ClientMessage clientM = clientS.createMessage(false);
//clientM.putStringProperty(ClientMessage.HDR_DUPLICATE_DETECTION_ID, uid);
//clientM.putLongProperty(ClientMessage.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + outageSuppressionTime );
// Convert or serialize the event object
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream objOut = null;
try {
    objOut = new ObjectOutputStream(out);
    objOut.writeObject(evnObj);
} catch (IOException e1) {
    log.error("Error during Serilizing the Event Object", e1);
    stop();
}
byte[] bytes = out.toByteArray();
log.info("EventObject byte size = " + bytes.length);
//log.info("Event Object which is send = " + evnObj.toString());
clientM.putBytesProperty("eventObject", bytes);
// Send the message
try {
    clientP.send(clientM);
    log.info("Outage Message send done");
} catch (HornetQException e) {
    log.error("Error in sending Message from Producer : MSG = "
            + clientM.toString(), e);
} finally {
    stop();
}
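The serialization step in the sender can be checked on its own, without a broker. Below is a minimal sketch, assuming the event object implements Serializable; the class name EventCodec and the String payload are hypothetical stand-ins for the real event type. Unlike the sender code, it closes the ObjectOutputStream before reading the bytes, so anything still buffered is written out first.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper for testing the payload round trip in isolation.
final class EventCodec {
    // Serializes the event; closing the stream flushes it before the bytes are read.
    static byte[] serialize(Serializable event) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream objOut = new ObjectOutputStream(out)) {
            objOut.writeObject(event);
        } catch (IOException e) {
            throw new UncheckedIOException("Serialization failed", e);
        }
        return out.toByteArray();
    }

    // Rebuilds the event from the bytes produced by serialize().
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException("Deserialization failed", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Event class not on classpath", e);
        }
    }

    public static void main(String[] args) {
        String event = "outage-event-1"; // stand-in for the real event object
        byte[] bytes = serialize(event);
        System.out.println("Round trip ok: " + event.equals(deserialize(bytes)));
    }
}
```

If the round trip succeeds for a real event object, the payload itself is unlikely to be the reason messages go missing, and the investigation can focus on the broker and consumer side.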
Is there any more configuration that I need for paging to work? It would be great if someone could give me pointers.
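One way to put numbers on the loss, independent of the JMX console, is to count messages on both sides and compare. The sketch below is a plain Java tally with hypothetical names (DeliveryTally, onSent, onReceived); the assumption is that onSent() would be called after each successful clientP.send(...) and onReceived() from the message handler's onMessage(...). The main method only simulates those calls.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical diagnostic: thread-safe counters for sent and received messages.
final class DeliveryTally {
    private final AtomicLong sent = new AtomicLong();
    private final AtomicLong received = new AtomicLong();

    // Call after a send completes without an exception.
    void onSent() {
        sent.incrementAndGet();
    }

    // Call from the consumer's message handler for every delivered message.
    void onReceived() {
        received.incrementAndGet();
    }

    // Messages sent but not (yet) seen by the consumer.
    long missing() {
        return sent.get() - received.get();
    }

    String summary() {
        return "sent=" + sent.get() + " received=" + received.get() + " missing=" + missing();
    }

    public static void main(String[] args) {
        DeliveryTally tally = new DeliveryTally();
        for (int i = 0; i < 5; i++) {
            tally.onSent();      // simulated producer sends
        }
        for (int i = 0; i < 3; i++) {
            tally.onReceived();  // simulated consumer deliveries
        }
        System.out.println(tally.summary());
    }
}
```

Logging the summary once the producer has stopped and the .page files have disappeared would show whether the gap matches the number of messages that were paged.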