9 Replies Latest reply on Jan 31, 2017 12:10 AM by Manohar M

    Efficient Concurrent Producer/Consumer Pattern

    Drew Kutchar Newbie

      Hi Guys,

       

      I need to implement a producer/consumer pattern where one or many producers write to a queue and many concurrent consumers read from it. I need an at-least-once guarantee: if one of the consumers fails, the message must stay in the queue for another consumer to pick up. I will also need dead-letter handling for messages that keep killing the consumers.

       

      How can I implement this efficiently using the Core API (not JMS)?

       

      Thanks,

       

      Drew

        • 1. Re: Efficient Concurrent Producer/Consumer Pattern
          Andy Taylor Master

          The core API is quite similar to the JMS API, just more granular. You could read the Javadocs or take a look at some of the tests or examples.

          • 2. Re: Efficient Concurrent Producer/Consumer Pattern
            Drew Kutchar Newbie

            Hi Andy,

             

            I have already read the guide and the docs. But there are two issues here:

             

            1. Initially I was thinking of acknowledging the message after successful processing, but that causes all the messages before this message (that might be in the middle of processing in different consumers) to be acknowledged too.

             

            2. If I were to use a separate Session per consumer and commit/rollback that, wouldn't that be inefficient according to the documentation?

             

            That's why I wanted to see what would be the best approach.

             

            BTW, I'm using embedded HornetQ w/o JMS.

             

            -- Drew

            • 3. Re: Efficient Concurrent Producer/Consumer Pattern
              Andy Taylor Master

              1. Initially I was thinking of acknowledging the message after successful processing, but that causes all the messages before this message (that might be in the middle of processing in different consumers) to be acknowledged too.

              Yes, that's the same as the JMS spec when using client commit, since you commit on the session.

              2. If I were to use a separate Session per consumer and commit/rollback that, wouldn't that be inefficient according to the documentation?

              I'm not sure what you mean here. It's inefficient to commit/rollback every message; you should batch them if possible. If you need to ack them separately or use transactions on the session, you should use one session per consumer.

               

              Remember that everything in a session is single-threaded, so although you have multiple consumers, it's the session that handles all the message traffic. Having multiple consumers won't increase performance: using one consumer, receiving 10 messages and farming them off to 10 threads would be exactly the same performance-wise.
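To make the "one consumer farming work off to threads" idea concrete, here is a minimal sketch. It does not use the HornetQ API: a java.util.concurrent.BlockingQueue stands in for the consumer, and the class name SingleReceiverFanOut and the method drainAndProcess are made up for illustration. It also assumes that in a real client any acknowledge/commit calls would stay on the receiving thread, since the session is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical stand-in: the BlockingQueue plays the role of the single consumer
// on a session; only the processing is spread over worker threads.
class SingleReceiverFanOut {

    // Drains whatever is currently queued on the calling thread and hands each
    // message to a worker pool. Results come back in receive order.
    static <T, R> List<R> drainAndProcess(BlockingQueue<T> queue, int workers, Function<T, R> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<R>> pending = new ArrayList<>();
            T message;
            while ((message = queue.poll()) != null) {   // only this thread reads the queue
                final T current = message;
                Callable<R> task = () -> handler.apply(current);
                pending.add(pool.submit(task));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : pending) {
                try {
                    results.add(future.get());           // wait for each worker to finish
                } catch (Exception e) {
                    throw new IllegalStateException("handler failed", e);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 10; i++) {
            queue.add(i);
        }
        System.out.println(drainAndProcess(queue, 10, x -> x * 2));
    }
}
```

The receiving side stays on one thread, which matches the point above that the session, not the number of consumers, is the bottleneck.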

              • 4. Re: Efficient Concurrent Producer/Consumer Pattern
                Drew Kutchar Newbie


                2. If I were to use a separate Session per consumer and commit/rollback that, wouldn't that be inefficient according to the documentation?

                I'm not sure what you mean here. It's inefficient to commit/rollback every message; you should batch them if possible. If you need to ack them separately or use transactions on the session, you should use one session per consumer.

                 

                Remember that everything in a session is single-threaded, so although you have multiple consumers, it's the session that handles all the message traffic. Having multiple consumers won't increase performance: using one consumer, receiving 10 messages and farming them off to 10 threads would be exactly the same performance-wise.

                 

                I see. I guess for my use case I have no choice but to create a brand new session per consumer (since I'm using Akka and my consumer threads are managed by Akka), consume the messages, and then commit or roll back the transaction.

                 

                If I were to create batches, how would that work? Let's say my batch size is 10, but there are only 9 messages in the queue. Is there a batch get operation in HornetQ?

                • 5. Re: Efficient Concurrent Producer/Consumer Pattern
                  Andy Taylor Master

                  If I were to create batches, how would that work? Let's say my batch size is 10, but there are only 9 messages in the queue. Is there a batch get operation in HornetQ?

                  Receive message 1 using receive() and then receive the next 9 using receiveImmediate(). If it returns null, process what you have so far.
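The receive-then-drain approach above can be sketched as follows. This is an illustration only and does not touch the HornetQ API: BlockingQueue.take() stands in for receive() (block until a message arrives) and poll() stands in for receiveImmediate() (return null at once if nothing is waiting). The class name BatchReceiveSketch and the method receiveBatch are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a consumer: take() ~ receive(), poll() ~ receiveImmediate().
class BatchReceiveSketch {

    // Blocks for the first message, then collects up to maxBatch - 1 more without waiting.
    // Returns an empty list only if the thread is interrupted while waiting.
    static <T> List<T> receiveBatch(BlockingQueue<T> queue, int maxBatch) {
        List<T> batch = new ArrayList<>();
        try {
            batch.add(queue.take());          // like receive(): wait for message 1
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return batch;
        }
        while (batch.size() < maxBatch) {
            T next = queue.poll();            // like receiveImmediate(): null if nothing is queued
            if (next == null) {
                break;                        // short batch: process what we have so far
            }
            batch.add(next);
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 9; i++) {
            queue.add("msg-" + i);
        }
        // Batch size is 10 but only 9 messages are queued, so the batch holds 9 messages.
        List<String> batch = receiveBatch(queue, 10);
        System.out.println(batch.size());
    }
}
```

With a transacted session, the natural place to commit would be once after the whole batch has been processed, rather than once per message.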

                  • 6. Re: Efficient Concurrent Producer/Consumer Pattern
                    Drew Kutchar Newbie

                    That's awesome, thanks Andy.

                     

                    BTW, when I do batch processing (let's say 10 messages at a time) and one of the messages fails but the other 9 go through, I will have to roll back the transaction. That is not a problem since our messages are idempotent, but how can I mark that one message as failed so it will eventually be considered a dead letter?

                    • 7. Re: Efficient Concurrent Producer/Consumer Pattern
                      Manohar M Newbie

                      Hello All,

                       

                      Sorry for picking up an older post, but we have a bad situation in our production system.

                       

                      We are using HornetQ 2.1.2.Final in JBoss 6.1.0 Final.

                      Currently, we reuse the connection and session, which are created as shown below:

                       

                      cachedQueueConnection = cachedConnectionFactory.createConnection();

                      // KEY - register for exception callbacks
                      cachedQueueConnection.setExceptionListener(new ExceptionJMSListenerImpl());

                      session = cachedQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                      producer = session.createProducer(cachedQueue);
                      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                       

                      We are pushing data to the queue as shown below:

                       

                      ObjectMessage contextMessage = session.createObjectMessage();
                      contextMessage.setObject(context);
                      producer.send(contextMessage);

                      The application runs fine for a few days, and then sometimes the call to producer.send(contextMessage) blocks. It returns only after some time (anywhere from ~15 s to 15 min).

                      This push to the queue happens within a bean-managed transaction. (The transaction also includes database operations such as update, insert, and delete; as a final action, the collected information on what was done is pushed to the queue.)

                      This in turn leaves the transaction inactive, because it cannot commit while the send is blocked, so the locks it holds are not released.

                       

                      Two typical scenarios come out of this.

                      It is sometimes observed that, even though the producer call has not returned (the thread goes into the waiting state and returns later), the consumer on the other end has already received the message. The consumer is an MDB listening to the queue and responding to onMessage:

                       

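                      public class MDBean implements MessageListener {

                          public void onMessage(Message msg) {
                              ObjectMessage objMsg = (ObjectMessage) msg;
                              try {
                                  objMsg.getObject();
                              } catch (JMSException e) {
                                  // getObject() declares JMSException, which onMessage() cannot propagate
                                  throw new RuntimeException(e);
                              }
                          }
                      }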

                       

                      So sometimes, even though producer.send() has not returned, the message is already being processed on the other end.

                      In the other scenario, the message is not consumed at all.

                       

                      Is there anything wrong in the code? We cannot understand why the producer is getting blocked!

                       

                      Any hints would really be helpful.

                       

                      Thanks a lot in advance!

                      • 8. Re: Efficient Concurrent Producer/Consumer Pattern
                        Justin Bertram Master

                        Start a new forum thread with your question rather than commenting on multiple old threads.