2 Replies Latest reply on Dec 3, 2013 9:22 PM by Tim Wright

    More control over the execution thread when receiving messages

    Tim Wright Newbie

      I am investigating HornetQ as a replacement messaging technology for a number of use cases in our system, and I have a specific requirement around control of the execution thread on which messages are received.


      In some of our systems, events can arrive from a variety of sources, including remote-invocation style requests as well as inbound messages from a number of queues. We marshal all these requests onto a single thread as a method of concurrency control. For each type of request we are able to start an XA transaction prior to processing the request on that thread (though not all cases involve XA transactions).


      For JMS we typically use the MessageConsumer API to manage this, as it allows you to receive a request to process a message and dispatch it to a thread under your control, wrapping it with the necessary transaction control if required. We can achieve similar things with requests coming from other sources.


      I see that HornetQ does not support this interface, and that there is no mechanism in the core API to do this sort of thing. I suppose it is possible using an Interceptor to effectively 'peek' a message and then schedule a job which will perform a blocking receive (having started a transaction if required) on some thread. However, this feels less than ideal.


      Ideally I would like the API to enable two extra things, which I can summarise with the following pseudo-code:


      ExecutorService myExecutor = ....

      // 1. Handle messages on an executor service of my choice...
      MessageConsumer consumer = session.createConsumer( myExecutor );

      // 2. Provide a message handler which enables enlistment prior to receiving a message
      consumer.addMessageHandlerEx( new MessageHandlerEx() {

           void aboutToReceiveMessage() {
               // start the transaction / enlist the resource here, if required
           }

           void onMessage() {
               // process the message
           }
      } );


      I have looked at the code a little and it seems plausible - but I am not sure if this is really the right approach. Something along these lines would be good.



        • 1. Re: More control over the execution thread when receiving messages
          Clebert Suconic Master

          You should have used the user's forum for this... we would have picked this up sooner...



          Why do you need an interceptor for this? Why don't you just use a superclass for your consumers that does what you need? Your onMessage would just call beforeReceive and then an abstract method that each of your consumers implements.


          We do something similar on our MDB interceptor...
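The superclass idea could be sketched roughly as below. This is only an illustration: `Handler` is a stand-in interface with a String payload (an assumption, so the example compiles without the HornetQ jars), and `BaseHandler`, `handle` and `RecordingHandler` are hypothetical names. Only `onMessage` and `beforeReceive` come from the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a message handler interface (assumption: simplified to a
// String payload so the sketch has no HornetQ dependency).
interface Handler {
    void onMessage(String message);
}

// Base class: every consumer gets the same pre-processing step.
abstract class BaseHandler implements Handler {
    @Override
    public final void onMessage(String message) {
        beforeReceive(message);   // shared hook, runs first
        handle(message);          // consumer-specific logic
    }

    // Default hook does nothing; subclasses may override it.
    protected void beforeReceive(String message) { }

    // Hypothetical name for the abstract method each consumer implements.
    protected abstract void handle(String message);
}

// Hypothetical concrete consumer that records the order of the calls.
class RecordingHandler extends BaseHandler {
    final List<String> events = new ArrayList<>();

    @Override
    protected void beforeReceive(String message) {
        events.add("before:" + message);
    }

    @Override
    protected void handle(String message) {
        events.add("handle:" + message);
    }
}

class TemplateHandlerDemo {
    public static void main(String[] args) {
        RecordingHandler h = new RecordingHandler();
        h.onMessage("m1");
        System.out.println(h.events);
    }
}
```

Note that in this shape the hook still runs on whichever thread invokes `onMessage`, and only once the message has already been handed to the handler.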



          And I'm not sure why you call this thread control. Isn't it just a simple executor invoked after a message arrives? I got confused by your use case... maybe you need message grouping?

          • 2. Re: More control over the execution thread when receiving messages
            Tim Wright Newbie



            Sorry to have confused you - let me see if I can describe it a little better! (it's not really related to message groups)


            By thread control, I really meant being able to supply your own thread to dispatch messages on, in the form of an ExecutorService. I would like to do this since we use the same ExecutorService to handle work from many other sources in addition to HornetQ. We would also like in some cases to start an XA transaction prior to processing a message.


            There are two issues with the regular API.

            1. Using receive() is not sufficient, since you can't practically do this in combination with an ExecutorService.
            2. It's my understanding that you can't really use MessageHandler.onMessage() with an XA transaction, as you need to enlist the resource prior to the callback, hence the need to start the transaction just before the message is dispatched.
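A minimal sketch of the behaviour these two points ask for, under stated assumptions: a `BlockingQueue` stands in for a consumer's blocking receive, the "begin"/"commit"/"rollback" log entries stand in for real XA calls, and `ExecutorDispatcher` and `messageAvailable` are hypothetical names. None of this is HornetQ API.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Assumption: the queue stands in for a message consumer, and the log
// entries stand in for transaction demarcation.
class ExecutorDispatcher {
    private final ExecutorService executor;      // thread owned by the application
    private final BlockingQueue<String> source;  // stand-in for the consumer
    final List<String> log = new CopyOnWriteArrayList<>();

    ExecutorDispatcher(ExecutorService executor, BlockingQueue<String> source) {
        this.executor = executor;
        this.source = source;
    }

    // Called when a message is known to be available. The receive itself
    // happens later, on the executor thread, after the transaction begins.
    Future<?> messageAvailable() {
        return executor.submit(() -> {
            log.add("begin");                    // enlist before receiving
            try {
                String msg = source.poll(1, TimeUnit.SECONDS);
                if (msg == null) {
                    log.add("rollback");         // nothing arrived in time
                    return;
                }
                log.add("process:" + msg + "@" + Thread.currentThread().getName());
                log.add("commit");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.add("rollback");
            }
        });
    }
}

class ExecutorDispatcherDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService single =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "app-thread"));
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorDispatcher dispatcher = new ExecutorDispatcher(single, queue);
        queue.put("hello");
        dispatcher.messageAvailable().get();     // wait for the task to finish
        System.out.println(dispatcher.log);
        single.shutdown();
    }
}
```

Because the same single-threaded executor can also accept work from other sources, all processing stays serialised on one application-owned thread.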


            In essence it's very like implementing an MDB; I suppose I was thinking about a solution through a public API. (The JMS API has the MessageConsumer API which allows you to do this sort of thing.) I spent some time looking at the public API, and figured that it could be done with an interceptor, but did not really like the idea.


            I did take a quick peek at the internals, and figured it was doable. Generally I am not a fan of sub-classing internal components, unless they are designed to be sub-classed. I would need to look in more detail at your suggestion, but my initial impression was that it would be fiddly to do, as the internals of the core API are (understandably) not designed to be easily sub-classed or wrapped for this sort of purpose.


            I hope that clarifies my problem!