2 Replies Latest reply on Dec 2, 2009 10:09 AM by gmotts_gary.motts

    Duplicated messages - Virtual Topics

    gmotts_gary.motts

      Need help using Virtual Topics and Queues.

       

      I'd like to make use of Virtual Topics/Queues as an alternative to using the servicemix jms:consumer component. 

       

      I can consume messages from a virtual topic as follows:

       

      Predicate filter1 = header("destAppSvcName").isEqualTo("Foo");

      from("activemq:topic:VirtualTopic.TestDurability").filter(filter1)

          .convertBodyTo(String.class)

          .to("activemq:Consumer.A.VirtualTopic.TestDurability");

       

      Whenever I publish messages to the virtual topic named "VirtualTopic.TestDurability", I receive two copies of each message. They are identical except that one has a JMSDestination of "VirtualTopic.TestDurability" and the other has "Consumer.A.VirtualTopic.TestDurability".

       

      I'm using Fuse SM 3.4.0.4 and ActiveMQ v5.3.0.5.  Within the activemq.xml config file I've added the excludeDestinations element as follows, in case that was the problem.

       

               

       

      Also note that when I shutdown the service assembly with the camel route, I still receive the messages which is what I want for durability.

       

      Any help or guidance appreciated,

       

      Thanks

       

      Gary

        • 1. Re: Duplicated messages - Virtual Topics
          lvisnick

          Gary,  

             please read through the following article on the subject of Virtual

          destinations.  After the text of the article, please find my comments on

          your forum post.

           

          Article P151451 What are Fuse Message Broker Virtual Destinations and

          how do they work?

           

          Virtual Destinations are logical destinations (i.e. Queues or Topics)

          that map onto one or more physical destinations. Their purpose is to

          overcome some load balancing/HA limitations with durable subscribers.

          How it works:

          1) Producer sends to the logical topic "VirtualTopic.T"

          2) Consumer A can consume from "VirtualTopic.T" as a normal topic

          consumer.

          3) Consumers B & C can also consume, via a dedicated physical queue

          linked to the logical topic.

          [Producer] --> [VirtualTopic.T] --> [ConsumerA]

                                          --> [VirtualQueueConsumer.VirtualTopicT] --> [ConsumerB]

                                                                                   --> [ConsumerC]

           

          Note: "VirtualQueueConsumer.VirtualTopicT" is a physical queue. This

          means you can have a pool of consumers consuming from the queue,

          allowing you to overcome some of the limitations of durable subscribers.
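
          The fan-out described above can be sketched with a toy in-memory
          model. This is only an illustration of the idea, not the ActiveMQ
          API: the class VirtualTopicModel, its methods, and the queue name
          "VirtualQueueConsumer.Pool.VirtualTopic.T" are all hypothetical, and
          the sketch assumes a queue is linked to a topic simply because its
          name ends with the topic name.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of virtual-topic fan-out. Hypothetical; not the ActiveMQ API. */
class VirtualTopicModel {
    // Physical queues keyed by full name, e.g. "VirtualQueueConsumer.Pool.VirtualTopic.T".
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    /** Registers a physical queue; it is linked to the topic its name ends with. */
    void createQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Publishing to the logical topic copies the message into every linked queue. */
    void publish(String topic, String body) {
        for (Map.Entry<String, Queue<String>> e : queues.entrySet()) {
            if (e.getKey().endsWith("." + topic)) {
                e.getValue().add(body);
            }
        }
    }

    /** Any pooled consumer may poll; each message goes to exactly one of them. */
    String poll(String queueName) {
        Queue<String> q = queues.get(queueName);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        VirtualTopicModel broker = new VirtualTopicModel();
        String queue = "VirtualQueueConsumer.Pool.VirtualTopic.T";
        broker.createQueue(queue);
        broker.publish("VirtualTopic.T", "m1");
        broker.publish("VirtualTopic.T", "m2");
        // Consumers B and C share the queue, so they split the messages between them.
        System.out.println("ConsumerB got " + broker.poll(queue));
        System.out.println("ConsumerC got " + broker.poll(queue));
    }
}
```

          Because B and C poll the same physical queue, they compete for
          messages instead of each receiving every message, which is the load
          balancing that a single durable subscriber cannot offer.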

           

          Configuration of virtual destinations is done in the broker's /conf/activemq.xml:

           

           

          --- end of article ---

           

          Forum post:

          I can consume messages from a virtual topic as follows:

           

          Predicate filter1 = header("destAppSvcName").isEqualTo("Foo");

          from("activemq:topic:VirtualTopic.TestDurability").filter(filter1)

          .convertBodyTo(String.class)

          .to("activemq:Consumer.A.VirtualTopic.TestDurability");

           

          As you describe it here, your from is just a regular topic consumer,

          that subsequently sends to a Queue associated with that virtual topic.

          It seems like the route should be changed to either:

          1) Change the "from" to be the actual queue backing the virtual topic

          (i.e. "activemq:Consumer.A.VirtualTopic.TestDurability") so that you can

          load balance the incoming message stream. Messages sent to

          "VirtualTopic.TestDurability" by a regular topic producer will end up in

          this queue.

          2) Change the "to" to be a regular (non-virtual) queue. Using this

          approach you're not really exercising the virtual topic feature.
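
          To make the duplication concrete, here is a minimal sketch that
          counts the copies landing in the backing queue for one published
          message. It assumes the duplicate comes from the broker's own copy
          plus the copy forwarded by the route, which matches the two
          JMSDestination values reported in the original post. The class
          DuplicateDemo and its method are hypothetical and use no broker or
          Camel code.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy illustration of why the original route yields two copies. Hypothetical names. */
class DuplicateDemo {
    static final String TOPIC = "VirtualTopic.TestDurability";
    static final String QUEUE = "Consumer.A.VirtualTopic.TestDurability";

    /** Returns the destination label of each copy that lands in QUEUE for one publish. */
    static List<String> copiesInQueue(boolean routeForwardsTopicToQueue) {
        List<String> queue = new ArrayList<>();
        // Broker fan-out: the virtual topic copies the message into the backing
        // queue; this copy still carries the topic as its destination label.
        queue.add(TOPIC);
        // Original route: a plain topic consumer receives the same message and
        // sends it to the same queue, producing a second copy addressed to the queue.
        if (routeForwardsTopicToQueue) {
            queue.add(QUEUE);
        }
        return queue;
    }

    public static void main(String[] args) {
        System.out.println("original route: " + copiesInQueue(true));
        System.out.println("option 1 route: " + copiesInQueue(false));
    }
}
```

          With option 1 the route reads from the backing queue and never
          writes to it, so only the broker's copy remains.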

           

          Lorinda

          • 2. Re: Duplicated messages - Virtual Topics
            gmotts_gary.motts

            Thank you very much for the very helpful article!  Now I can understand how to properly implement Virtual Topics to overcome some of the pitfalls we've encountered with traditional durable subscribers (e.g. unique client ids, unsubscribing, clustering...).

             

            Here's the code I used to test durability:

             

            Modified Fuse Message Broker activemq.xml:

             

                   

             

            Then in my Camel Route I defined the following:

                   Predicate filter1 = header("destAppSvcName").isEqualTo("Foo");

                    from("activemq:VirtualQueueConsumer.A.VirtualTopic.TestDurability").filter(filter1)

                    .to("activemq:queue:TESTQUEUE");

             

            I then posted a message to the Virtual Topic. It was received by the physical VirtualQueueConsumer queue and then delivered to my TESTQUEUE.  I then shut down my service assembly and published another message to the Virtual Topic, VirtualTopic.TestDurability.  That message did not reach TESTQUEUE because the service assembly was shut down, but the VirtualQueueConsumer queue did receive it, and I could see it in the ActiveMQ console as Pending.  I then shut down and restarted Fuse SM, and there it was: a new message in my TESTQUEUE!  BTW, I think there is a Camel bug that forces an SM restart to restart the queue listeners, as opposed to simply starting the shut-down service assembly.
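
            The sequence above can be mimicked with a small in-memory sketch.
            It is a toy model only: DurabilityModel and its methods are
            hypothetical, the header filter is left out, and stopping or
            starting the route stands in for shutting down and restarting the
            service assembly.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of the durability test. Hypothetical; no broker or Camel involved. */
class DurabilityModel {
    // Stands in for the physical queue backing the virtual topic.
    private final Queue<String> virtualQueue = new ArrayDeque<>();
    // Stands in for TESTQUEUE, the route's target.
    private final List<String> testQueue = new ArrayList<>();
    private boolean routeRunning = true;

    /** Publishing always lands in the physical queue, whether or not the route is up. */
    void publish(String body) {
        virtualQueue.add(body);
        drain();
    }

    void stopRoute() {
        routeRunning = false;
    }

    void startRoute() {
        routeRunning = true;
        drain();
    }

    /** While the route runs, it moves every pending message to TESTQUEUE. */
    private void drain() {
        while (routeRunning && !virtualQueue.isEmpty()) {
            testQueue.add(virtualQueue.poll());
        }
    }

    int pending() {
        return virtualQueue.size();
    }

    List<String> delivered() {
        return testQueue;
    }
}
```

            Publishing while the route is stopped leaves the message pending
            in the physical queue, and starting the route again delivers it,
            which is the behaviour seen in the ActiveMQ console.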

             

            -CHEERS!

             

            -GARY