7 Replies Latest reply on Nov 2, 2016 11:07 AM by bernd.koecke

    Can a subsystem add a property with a computed value to every JMS message?

    bernd.koecke

      Hello,

       

      First, my motivation for this feature: In the old days we had one application running on an application server cluster, but now we have small services running on a number of nodes, and calling these services in some order builds a business transaction. If something goes wrong, we have to look into the log files of all services. We use ELK to have all log events in one place, but we need a unique id attached to the business transaction to correlate the log events of one call chain.

       

      I'm building a subsystem for WildFly 10 to provide this feature. If a call reaches our server and no id is attached, the subsystem creates a UUID, attaches it and sends it behind the scenes to other servers. After creation, the id is part of the invocation and logging context and is sent to Elasticsearch with every log event. It is not necessary to write code in EJBs to handle this id. Because the EJB3 subsystem has a very fine-grained structure, I got it working for EJBs with local, remote and asynchronous calls. I hope I will get it working for REST and SOAP calls, CDI and EJB interceptors, too.

       

      At the moment I am trying to get it working for JMS (2.0), implemented by the messaging-activemq subsystem in WildFly 10, but I got stuck. I would like to add a property to each JMS message that is sent from an EJB. But at the moment when I inject a connection factory or a JMSContext and send a message, I'm leaving the EJB container, and I can't find a way of adding some kind of interceptor to the messaging subsystem. I tried the following so far:

       

      My first idea was subclassing and using the delegator pattern for the connection factory and JMS context. But because of final classes and methods, together with the JMSProducer's convenience methods, e.g. send(destination, map), I don't think that this can work.

       

      I found the "outgoing-interceptors" in the messaging subsystem configuration and I added one, but it's called too late. The invocation context with my tracking id is a ThreadLocal, and this kind of interceptor is called after a thread change, so I no longer have access to my context and id.
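
The thread-change problem described above can be reproduced with plain Java. This is a minimal sketch, not code from the subsystem: `TrackingContext` and `ThreadHandOffDemo` are hypothetical names, and the single-thread executor only stands in for the broker thread that ends up calling the interceptor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the subsystem's invocation context.
final class TrackingContext {
    private static final ThreadLocal<String> TRACKING_ID = new ThreadLocal<>();

    static void set(String id) { TRACKING_ID.set(id); }
    static String get() { return TRACKING_ID.get(); }
    static void clear() { TRACKING_ID.remove(); }
}

final class ThreadHandOffDemo {
    // Reads the tracking id on a different thread, as an interceptor
    // running after a thread change would.
    static String readOnOtherThread() {
        ExecutorService otherThread = Executors.newSingleThreadExecutor();
        try {
            Callable<String> readId = TrackingContext::get;
            return otherThread.submit(readId).get();
        } catch (Exception e) {
            throw new IllegalStateException("hand-off failed", e);
        } finally {
            otherThread.shutdown();
        }
    }

    public static void main(String[] args) {
        TrackingContext.set("id-set-on-caller-thread");
        try {
            System.out.println("caller thread sees: " + TrackingContext.get());
            // The value is bound to the caller thread, so nothing is found here.
            System.out.println("other thread sees:  " + readOnOtherThread());
        } finally {
            TrackingContext.clear();
        }
    }
}
```

A plain ThreadLocal value is bound to the thread that set it, so any hook that runs after the hand-off has to receive the id some other way, for example as a property already stored on the message.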

       

      There is an instance of ActiveMQ's ChannelImpl called from the right thread when a message is sent, and it has a currently empty interceptor list, but I can't find a way of adding my interceptor. Maybe I can add one with some dirty hacks, but I'm looking for a solution that doesn't break in the next messaging subsystem version.

       

      Does anybody know a way of adding some kind of interceptor to the messaging subsystem which is in the call chain of the EJB container thread (named e.g. "EJB default - <num>") before a message is sent? Or maybe a completely different solution for adding a property to JMS messages?

       

      Thanks a lot for your help! Regards

      Bernd

        • 1. Re: Can a subsystem add a property with a computed value to every JMS message?
          jbertram

          How are you setting your outgoing-interceptor?  Keep in mind that both the broker and the client can have incoming and outgoing interceptors.  Where and how you set them matters.

          • 2. Re: Can a subsystem add a property with a computed value to every JMS message?
            bernd.koecke

            Hello Justin,

             

            My subsystem configuration starts with:

            <subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
              <server name="default">
                <outgoing-interceptors>
                  <class name="...CatMQInterceptor" module="my module"/>
                </outgoing-interceptors>
            ...
            

             

            But this interceptor seems to be part of the receiving side, because it gets called with the following stack trace, and the thread is from ActiveMQ, not from the EJB container:

            Thread [Thread-16 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$2@2c7d99dc-884588878)] (Suspended (breakpoint at line 55 in CatMQInterceptor))    
                owns: QueueImpl$DeliverRunner  (id=550)    
                CatMQInterceptor.intercept(Packet, RemotingConnection) line: 55    
                CatMQInterceptor.intercept(Object, RemotingConnection) line: 25    
                ChannelImpl.invokeInterceptors(Packet, List<Interceptor>, RemotingConnection) line: 382    
                ChannelImpl.send(Packet, boolean, boolean) line: 214    
                ChannelImpl.sendBatched(Packet) line: 205    
                CoreSessionCallback.sendMessage(ServerMessage, ServerConsumer, int) line: 75    
                ServerConsumerImpl.deliverStandardMessage(MessageReference, ServerMessage) line: 834    
                ServerConsumerImpl.proceedDeliver(MessageReference) line: 368    
                QueueImpl.proceedDeliver(Consumer, MessageReference) line: 2341    
                QueueImpl.deliver() line: 1859    
                QueueImpl.access$1400(QueueImpl) line: 96    
                QueueImpl$DeliverRunner.run() line: 2551    
                OrderedExecutorFactory$OrderedExecutor$1.run() line: 94    
                ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142    
                ThreadPoolExecutor$Worker.run() line: 617    
                Thread.run() line: 745    
            

             

            But I need my interceptor in the interceptor list of ChannelImpl, so that it is used at the beginning of "ChannelImpl.send(Packet, boolean, boolean)"

            calling interceptors    
            Thread [EJB default - 2] (Suspended)    
                ChannelImpl.send(Packet, boolean, boolean) line: 214    
                ChannelImpl.sendBatched(Packet) line: 205    
                ActiveMQSessionContext.sendFullMessage(MessageInternal, boolean, SendAcknowledgementHandler, SimpleString) line: 363    
                ClientProducerImpl.sendRegularMessage(MessageInternal, boolean, ClientProducerCredits, SendAcknowledgementHandler) line: 293    
                ClientProducerImpl.doSend(SimpleString, Message, SendAcknowledgementHandler, boolean) line: 266    
                ClientProducerImpl.send(SimpleString, Message) line: 124    
                ActiveMQMessageProducer.doSendx(ActiveMQDestination, Message, int, int, long, CompletionListener) line: 476    
                ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 196    
                ActiveMQMessageProducer.send(Destination, Message) line: 184    
                ActiveMQRAMessageProducer.send(Destination, Message) line: 141    
                ActiveMQJMSProducer.send(Destination, Message) line: 99    
                ActiveMQJMSProducer.send(Destination, Map<String,Object>) line: 175    
                CatSampleSecondBean.sendMsg(CatSamplesPojo) line: 92    
            

            where the interceptors of the client side are called. In contrast to the previous stack trace, the thread name here shows that I'm still in the EJB container and should have access to my ThreadLocal. The code in my sendMsg method looks like:

            @Resource(mappedName="java:/JmsXA")
            private QueueConnectionFactory conFactory;
                
            @Resource(mappedName="java:/jms/queue/CatSamples")
            private Queue catQueue;
            ...
            @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
            public void sendMsg(CatSamplesPojo data) {
                try (JMSContext jmsCtx = conFactory.createContext()) {
                    Map<String, Object> msgMap = new HashMap<>();
                    msgMap.put(GlobalConstants.CLIENT_ID_MSG_PROP, data.getClientId());
                    msgMap.put(GlobalConstants.MDB_DELAY_MSG_PROP, data.getMdbDelay().getDelay());
                    msgMap.put(GlobalConstants.MDB_JITTER_MSG_PROP, data.getMdbDelay().getJitter());
                    jmsCtx.createProducer().send(catQueue, msgMap);
                }
            }
            

            The map content is only for my test code. The UUID should be set behind the scenes.

             

            You are completely right: my interceptor is called at the wrong place. But how can I configure my interceptor so that it is part of the client side?

             

            But maybe my idea is totally wrong and there is a completely different solution for adding a message property.

             

            Thanks for your help!

            • 3. Re: Can a subsystem add a property with a computed value to every JMS message?
              jbertram

              My subsystem configuration starts with:

              <subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
                <server name="default">
                  <outgoing-interceptors>
                    <class name="...CatMQInterceptor" module="my module"/>
                  </outgoing-interceptors>
              ...

              This is configuring an "outgoing" interceptor for the broker which means it will be called when the broker sends a packet to a client.

               

              But I need my interceptor in the interceptor list of ChannelImpl, so that it is used at the beginning of "ChannelImpl.send(Packet, boolean, boolean)"

              To set an "outgoing" interceptor for your client you'd need to invoke org.apache.activemq.artemis.api.core.client.ServerLocator#setOutgoingInterceptorList from the client.  Of course, this is an implementation-specific method so you'd need to do some casting to get access to the connection factory's underlying ServerLocator.
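
To illustrate why a client-side outgoing interceptor solves the problem, here is a minimal model in plain Java. It is a sketch under loud assumptions: `OutgoingInterceptor`, `ClientChannelSketch`, `CallContext` and the property name `trackingId` are hypothetical stand-ins and are not the Artemis `Interceptor`, `ChannelImpl` or `ServerLocator` types. The only point it shows is that an interceptor invoked synchronously on the sending thread can still read the ThreadLocal and copy the id into the message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins. These are NOT the Artemis types.
interface OutgoingInterceptor {
    // In this sketch, returning false aborts the send.
    boolean intercept(Map<String, Object> messageProperties);
}

final class CallContext {
    private static final ThreadLocal<String> TRACKING_ID = new ThreadLocal<>();

    static void set(String id) { TRACKING_ID.set(id); }
    static String get() { return TRACKING_ID.get(); }
    static void clear() { TRACKING_ID.remove(); }
}

final class ClientChannelSketch {
    private final List<OutgoingInterceptor> outgoing = new ArrayList<>();

    void addOutgoingInterceptor(OutgoingInterceptor interceptor) {
        outgoing.add(interceptor);
    }

    // Interceptors run synchronously on the calling thread, before any hand-off.
    Map<String, Object> send(Map<String, Object> properties) {
        Map<String, Object> message = new HashMap<>(properties);
        for (OutgoingInterceptor interceptor : outgoing) {
            if (!interceptor.intercept(message)) {
                return null; // send aborted by an interceptor
            }
        }
        return message; // what would be handed to the transport
    }
}

final class ClientInterceptorDemo {
    static final String TRACKING_PROPERTY = "trackingId"; // assumed property name

    static ClientChannelSketch channelWithTracking() {
        ClientChannelSketch channel = new ClientChannelSketch();
        channel.addOutgoingInterceptor(props -> {
            String id = CallContext.get(); // still on the sender's thread
            if (id != null) {
                props.put(TRACKING_PROPERTY, id);
            }
            return true;
        });
        return channel;
    }

    public static void main(String[] args) {
        CallContext.set("example-tracking-id");
        try {
            Map<String, Object> body = new HashMap<>();
            body.put("clientId", 42);
            System.out.println(channelWithTracking().send(body));
        } finally {
            CallContext.clear();
        }
    }
}
```

With the real API the registration step would go through the connection factory's underlying ServerLocator as described above; the exact casting needed to reach it depends on the connection factory implementation in use.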

              • 4. Re: Can a subsystem add a property with a computed value to every JMS message?
                bernd.koecke

                Thanks for the hint, I will try to do this.

                 

                My first idea was to configure the ServerLocator from the XML subsystem configuration, to get a solution which doesn't dive too deep into the ActiveMQ implementation and doesn't break in future versions. But I didn't find a way to do it.

                 

                If you say it's ok to access the connection factory implementation, I will try to configure the ServerLocator right after startup/deployment when the connection factory is generated as an injection source.

                 

                Thanks again! Regards

                Bernd

                • 5. Re: Can a subsystem add a property with a computed value to every JMS message?
                  jbertram

                  If you say it's ok to access the connection factory implementation, I will try to configure the ServerLocator right after startup/deployment when the connection factory is generated as an injection source.

                  HornetQ implements JMS on top of our "core" API.  Lots of users leverage the core API directly rather than using JMS.  The ServerLocator is part of the core API so it's not likely to change much if at all between releases.

                  • 6. Re: Can a subsystem add a property with a computed value to every JMS message?
                    bernd.koecke

                    That sounds good. Yesterday I tried to set my interceptor on the ServerLocator, but I got the wrong one, ended up too deep in the system and got called again after a thread change. I will keep working on this, but it will take some more time. I will post it here when I succeed. Thanks for your hint and explanation!

                    • 7. Re: Can a subsystem add a property with a computed value to every JMS message?
                      bernd.koecke

                      I know it has been a long time since I sent my last comment to this post. But now it's done, and TraceWildFly is part of our internal WildFly-based runtime. It took so long because I had to add this feature not only to a WildFly server but also to an embedded Camunda BPM engine. Besides other subsystems (EJB, CDI, JAX-RS, JAX-WS...) I added it to JMS. For a normal connection factory it is as easy as described above. After the ActiveMQ system started, I added my interceptor to the connection factory's server locator. This works because a generated connection factory is never destroyed or changed.

                       

                      But for a pooled connection factory it was more difficult, because a pooled connection factory can be destroyed and created during runtime, so my code had to be called each time a new connection is created. For this I built a service which depends on the resource adapter deployment. It gets the connection managers and their pool implementations. Then I built a java.lang.reflect.Proxy around this pool and replaced the original pool with my proxy. Now my invocation handler can add my interceptor each time a new connection is created.
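
The proxy approach described above can be sketched with `java.lang.reflect.Proxy` alone. This is not the actual TraceWildFly code: `ConnectionPool`, `PooledConnection`, `SimplePool` and the interceptor name `"tracking"` are hypothetical stand-ins for the resource adapter's pool and its connections, which are not shown in this thread.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the resource adapter's pool and connections.
interface PooledConnection {
    void addInterceptor(String name);
    List<String> interceptors();
}

interface ConnectionPool {
    PooledConnection createConnection();
}

final class SimpleConnection implements PooledConnection {
    private final List<String> interceptors = new ArrayList<>();

    public void addInterceptor(String name) { interceptors.add(name); }
    public List<String> interceptors() { return interceptors; }
}

final class SimplePool implements ConnectionPool {
    public PooledConnection createConnection() { return new SimpleConnection(); }
}

// Delegates every call to the original pool and decorates new connections.
final class InterceptorInstallingHandler implements InvocationHandler {
    private final ConnectionPool delegate;

    InterceptorInstallingHandler(ConnectionPool delegate) {
        this.delegate = delegate;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object result;
        try {
            result = method.invoke(delegate, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // rethrow what the original pool threw
        }
        if (result instanceof PooledConnection) {
            // Every connection handed out by the pool gets the interceptor.
            ((PooledConnection) result).addInterceptor("tracking");
        }
        return result;
    }
}

final class PoolProxyDemo {
    static ConnectionPool wrap(ConnectionPool original) {
        return (ConnectionPool) Proxy.newProxyInstance(
                ConnectionPool.class.getClassLoader(),
                new Class<?>[] {ConnectionPool.class},
                new InterceptorInstallingHandler(original));
    }

    public static void main(String[] args) {
        ConnectionPool pool = wrap(new SimplePool());
        System.out.println(pool.createConnection().interceptors());
    }
}
```

A dynamic proxy only works when the pool is accessed through an interface, which is why the sketch models the pool that way. Replacing the original pool with the proxy inside the connection manager is container-specific and is left out here.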

                       

                      The result is that when a call reaches one of our WildFly-based runtimes, a UUID is generated, attached to the invocation context, stored in the logging context (MDC) and printed with each log statement. If the call goes to a service on another WildFly server, the id is transferred, too. All log lines are sent to an ELK stack and can be correlated and analyzed there, regardless of how many server instances are involved in a business transaction and without any changes to their (business) interfaces.

                       

                      Thanks to all who helped me in this thread! If others are interested in the code, I can check whether I can get permission to share it.