Help/Advice needed with message grouping
fatbob73 Mar 10, 2016 3:26 PM
We are planning on using HornetQ (running on multiple WildFly 9.0.2 servers) in a cluster. We plan on depositing files (byte arrays) onto the queue. The consumer will get a file from the queue, convert it to PDF and then FTP it to another server. Should something go wrong while converting to PDF or FTPing, we do not want to process any other files with the same ID. It is very important that files with the same ID are processed in order.
The queue would look something like this:
File 1, ID 1234
File 2, ID 1234
File 3, ID 9348
File 4, ID 1234
File 5, ID 4887
If something goes wrong while processing file 1, we don't want file 2 or 4 to be processed until file 1 can be successfully processed.
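To make that rule concrete, here is a minimal in-memory sketch in plain Java (no JMS involved) of the behavior we are after: files are handled in queue order, and once a file fails, every later file with the same ID is held back. The names `OrderedById`, `QueuedFile`, `Result`, `process` and the `failingFiles` set are made up for illustration; this says nothing about how HornetQ works internally.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OrderedById {

    /** One queue entry: a file number and the ID it belongs to (hypothetical type). */
    static final class QueuedFile {
        final int fileNo;
        final String id;

        QueuedFile(int fileNo, String id) {
            this.fileNo = fileNo;
            this.id = id;
        }
    }

    /** Outcome of one pass over the queue. */
    static final class Result {
        final List<Integer> processed = new ArrayList<>();
        final List<Integer> heldBack = new ArrayList<>();
    }

    /**
     * Walks the queue in order. failingFiles stands in for a PDF/FTP error:
     * once a file fails, its ID is blocked and later files with that ID are held back.
     */
    static Result process(List<QueuedFile> queue, Set<Integer> failingFiles) {
        Result result = new Result();
        Set<String> blockedIds = new HashSet<>();
        for (QueuedFile f : queue) {
            if (blockedIds.contains(f.id)) {
                result.heldBack.add(f.fileNo);      // an earlier file with this ID failed
            } else if (failingFiles.contains(f.fileNo)) {
                blockedIds.add(f.id);               // block the whole ID from here on
                result.heldBack.add(f.fileNo);
            } else {
                result.processed.add(f.fileNo);
            }
        }
        return result;
    }

    /** The five-file queue from the example above. */
    static List<QueuedFile> exampleQueue() {
        List<QueuedFile> q = new ArrayList<>();
        q.add(new QueuedFile(1, "1234"));
        q.add(new QueuedFile(2, "1234"));
        q.add(new QueuedFile(3, "9348"));
        q.add(new QueuedFile(4, "1234"));
        q.add(new QueuedFile(5, "4887"));
        return q;
    }

    public static void main(String[] args) {
        Set<Integer> failing = new HashSet<>();
        failing.add(1); // pretend file 1 cannot be converted
        Result r = process(exampleQueue(), failing);
        System.out.println("processed=" + r.processed + " heldBack=" + r.heldBack);
    }
}
```

With file 1 failing, this prints `processed=[3, 5] heldBack=[1, 2, 4]`: files 3 and 5 go through, while files 2 and 4 wait behind file 1.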
Where we see it getting even more complicated is when one WildFly server (server a) picks up file 1 and server b picks up file 2, while server a is having trouble processing file 1. At this point it's too late for server b to know that server a had trouble processing file 1.
We believe message grouping can be the answer to our situation. Is that assumption correct?
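As we understand message grouping, every message carrying the same `JMSXGroupID` is delivered to one and the same consumer, in order, which is what would stop server b from picking up file 2 while server a is still working on file 1. The sketch below only simulates that pinning idea in plain Java. The round-robin choice of consumer and all names (`GroupPinning`, `assign`, `server-a`, `server-b`) are assumptions for illustration, not HornetQ's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupPinning {
    private final List<String> consumers;
    private final Map<String, String> pinned = new LinkedHashMap<>();
    private int next = 0;

    GroupPinning(List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        this.consumers = new ArrayList<>(consumers);
    }

    /**
     * Returns the consumer for a group ID. The first time an ID is seen it is
     * pinned to the next consumer in round-robin order (an assumption of this
     * sketch); afterwards the same consumer is always returned for that ID.
     */
    String assign(String groupId) {
        String consumer = pinned.get(groupId);
        if (consumer == null) {
            consumer = consumers.get(next % consumers.size());
            next++;
            pinned.put(groupId, consumer);
        }
        return consumer;
    }

    public static void main(String[] args) {
        GroupPinning pinning = new GroupPinning(Arrays.asList("server-a", "server-b"));
        String[] ids = {"1234", "1234", "9348", "1234", "4887"};
        for (int i = 0; i < ids.length; i++) {
            System.out.println("File " + (i + 1) + ", ID " + ids[i] + " -> " + pinning.assign(ids[i]));
        }
    }
}
```

In this simulation files 1, 2 and 4 (ID 1234) all land on `server-a`, so a second server never sees a file whose predecessor is still in trouble elsewhere.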
Here is an example of the code we have now that writes messages to the queue (in this code we are not writing byte[], simply text):
context = new InitialContext();
String connectionFactoryString = System.getProperty("connection.factory", CONNECTION_FACTORY);
connectionFactory = (ConnectionFactory) context.lookup(connectionFactoryString);
String destinationString = System.getProperty("destination", DESTINATION);
destination = (Destination) context.lookup(destinationString);

// Create the JMS connection, session, producer, and consumer
connection = connectionFactory.createConnection(System.getProperty("username", DEFAULT_USERNAME), System.getProperty("password", DEFAULT_PASSWORD));
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
connection.start();

// Send the specified number of messages
for (int i = 0; i < NUM_OF_MESSAGES; i++) {
    message = session.createTextMessage(i + " Group-0 " + msg.toString());
    message.setStringProperty("JMSXGroupID", "Group-0");
    producer.send(message);
    log.info("\t\tSending messages with content ["+i+"]: " + msg.toString());
    Thread.sleep(1000);
}
for (int i = 0; i < NUM_OF_MESSAGES; i++) {
    message = session.createTextMessage(i + " Group-1 " + msg.toString());
    message.setStringProperty("JMSXGroupID", "Group-1");
    producer.send(message);
    log.info("\t\tSending messages with content ["+i+"]: " + msg.toString());
    Thread.sleep(1000);
}
I'm not even sure if the messages went into groups. How can I verify that?
Here is the example code we have for reading from the queue, but I don't know how to handle reading from a specific group:
context = new InitialContext();
String connectionFactoryString = System.getProperty("connection.factory", CONNECTION_FACTORY);
connectionFactory = (ConnectionFactory) context.lookup(connectionFactoryString);
String destinationString = System.getProperty("destination", DESTINATION);
destination = (Destination) context.lookup(destinationString);

// Create the JMS connection, session, producer, and consumer
connection = connectionFactory.createConnection(System.getProperty("username", DEFAULT_USERNAME), System.getProperty("password", DEFAULT_PASSWORD));
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(destination);
connection.start();

do {
    try {
        message = (TextMessage)consumer.receiveNoWait();
    } catch (javax.jms.IllegalStateException ise) {
        ise.printStackTrace();
        Thread.sleep(3000);
        message = (TextMessage)consumer.receiveNoWait();
    }
    if (message != null) {
        log.info("Receiving messages with content: " + message.getText());
        Thread.sleep(1000);
    }
} while (message != null);
If not message grouping, what other way can we go about handling our situation?
Thanks!