24 Replies Latest reply on Apr 24, 2008 11:33 AM by timfox

    Clustered server preference

    chipschoch

      JBoss AS 4.2.2.ga, JBM1.4.0.SP1.

      I have several clients that connect to a two-machine cluster (devapp1 and devapp2). Below is the JMSProvider I use. On one machine the naming provider list has devapp1 first. On the second it has devapp2 first, as shown here. The second machine always connects to devapp1 unless I completely remove the reference to devapp1 from the list; then it will connect to devapp2. I thought it was supposed to try to connect in the order in which the servers are listed in java.naming.provider.url. Is this an incorrect assumption?

      I can tell which machine it has connected to by looking at the number of consumers connected to each app server.

      <mbean code="org.jboss.jms.jndi.JMSProviderLoader"
       name="jboss.messaging:service=JMSProviderLoader,name=ConversionJMSProvider">
       <attribute name="ProviderName">ConversionJMSProvider</attribute>
       <attribute name="ProviderAdapterClass">
       org.jboss.jms.jndi.JNDIProviderAdapter
       </attribute>
      
       <attribute name="FactoryRef">ClusteredXAConnectionFactory</attribute>
       <attribute name="QueueFactoryRef">ClusteredXAConnectionFactory</attribute>
       <attribute name="TopicFactoryRef">ClusteredXAConnectionFactory</attribute>
      
       <attribute name="Properties">
       java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
       java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
       java.naming.provider.url=jnp://devapp2.qa.cin.int:1100,jnp://devapp1.qa.cin.int:1100
       jnp.disableDiscovery=true
       jnp.partitionName=dev.application
       jnp.discoveryGroup=228.1.2.4
       jnp.discoveryPort=1102
       jnp.discoveryTTL=16
       jnp.discoveryTimeout=5000
       jnp.maxRetries=1
       </attribute>
       </mbean>




        • 1. Re: Clustered server preference
          clebert.suconic

          JCA is to be used internally on the application server.

          If you have external clients, why do you need the JCA adapter? Why can't you just use ClusteredXAConnectionFactory from the client side?


          java.naming.provider.url=jnp://devapp2.qa.cin.int:1100,jnp://devapp1.qa.cin.int:1100


          I'm not sure what this is doing.. please ask it on the JCA forum or look for some documentation.

          But I believe you are mixing up the JCA's load balancing and the JBossMessaging load balancing, i.e. you are making a big mess.

          The ClusteredXAConnectionFactory already does load balancing, and you are putting another layer of load balancing on top of it with this new JCA adapter you're creating.

          Usually, we prefer to have MDBs always connecting to the local queue (there is no point in doing a cluster round trip if there is a consumer available on the current node), and then rely on message redistribution if a server is too busy.

          You should clear this up!

          • 2. Re: Clustered server preference
            chipschoch

            Clearly I have been having some serious misunderstandings on how this works.

            1. My clients use the JCA adapter because we develop on Windows machines that can run a Windows-only conversion library, so the "ConversionJMSProvider" is defined to connect to localhost on our machines and to connect to a cluster in the deployment environment. Our deployment environment has only the conversion service running on a Windows JBoss, which connects to a cluster of Linux JBoss AS instances. I thought this was the way to do that.

            2.

            java.naming.provider.url=jnp://devapp2.qa.cin.int:1100,jnp://devapp1.qa.cin.int:1100


            I was under the (mis) understanding that this was how you specified the servers in the cluster. We were initially having issues with discovery. I think that it was related to firewall settings or something.

            3. I (mistakenly, apparently) thought that a producer that uses the ClusteredConnectionFactory would round robin the messages as they were posted. We have a service that runs as a singleton that produces messages and we want them to be round-robined to the consumers running on each node. It seemed reasonable to me.

            After running a series of tests I see that virtually all of my assumptions were ass-backward. Time to rethink my strategy.

            • 3. Re: Clustered server preference
              chipschoch

              By the way.

              From the JBoss Messaging Documentation 3.1

              A JMS client uses HA JNDI to lookup the connection factory. When creating connections using that connection factory a client side load balancing policy will automatically chose a node to connect to.


              From the Clustering Guide 1.2.2

              The JNDI client needs to be aware of the HA-JNDI cluster. You can pass a list of JNDI servers (i.e., the nodes in the HA-JNDI cluster) to the java.naming.provider.url JNDI setting in the jndi.properties file. Each server node is identified by its IP address and the JNDI port number. The server nodes are separated by commas (see Section 1.2.3, JBoss configuration on how to configure the servers and ports).
              java.naming.provider.url=server1:1100,server2:1100,server3:1100,server4:1100
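A minimal sketch of the setting described in the Clustering Guide excerpt: the comma-separated list is simply the value of java.naming.provider.url. The class name HaJndiProperties and the helper forNodes are made up for illustration; only the property keys and the jnp values come from the configuration earlier in this thread. The sketch builds the Properties but does not open a context, so it runs without a server.

```java
import java.util.Properties;
import javax.naming.Context;

// Hypothetical helper, not part of JBoss: assembles the JNDI client properties.
final class HaJndiProperties {

    /** Builds JNDI properties whose provider URL lists every given node, comma-separated. */
    static Properties forNodes(String... nodeUrls) {
        Properties p = new Properties();
        p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
        // Context.PROVIDER_URL is the constant for "java.naming.provider.url".
        p.put(Context.PROVIDER_URL, String.join(",", nodeUrls));
        return p;
    }

    public static void main(String[] args) {
        Properties p = forNodes("jnp://devapp2.qa.cin.int:1100",
                                "jnp://devapp1.qa.cin.int:1100");
        System.out.println(p.getProperty(Context.PROVIDER_URL));
    }
}
```

Passing the result to new InitialContext(p) would be the next step in a real client; that call is left out here because it needs a reachable naming server.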



              My use of the JMSProviderLoader in my client

              // Get local InitialContext and look up the ProviderLoader
              InitialContext ic = ConnectionManager.getInitialContext ();
              JNDIProviderAdapter adapter = (JNDIProviderAdapter) ic.lookup ("ConversionJMSProvider");
              
              // Get the properties that we need for our target context and
              // then create the target InitialContext
              ic= new InitialContext (adapter.getProperties ());
              
              // Use the queue factory ref name from the JMSProviderLoader for the lookup
              ConnectionFactory factory = (ConnectionFactory) ic.lookup (adapter.getQueueFactoryRef ());
              


              Doing it this way essentially dereferences which connection factory I use based on the environment my service is running in.


              • 4. Re: Clustered server preference
                ataylor

                Take a look at the DistributedQueueExample.

                The load balancing is done by the clustered connection factory: on createConnection(..) it returns a connection to one of the nodes in the cluster in a round-robin fashion. From then on, all messages are routed through this node. You don't need to use HA-JNDI if you're using a clustered connection factory.

                If your JMS client is running inside the App Server then you should always route to the local queue to save on round trips.
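The round-robin behaviour described here can be pictured with a small stand-alone model. This is not JBoss Messaging code: RoundRobinNodePicker and pickNode() are hypothetical names, and the real policy lives inside the clustered connection factory. The sketch only shows the idea that each createConnection() call advances to the next node. In this model the node is chosen by the factory's own counter, not by the order of a JNDI provider list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative model only; the names are assumptions, not the JBoss Messaging API.
final class RoundRobinNodePicker {
    private final List<String> nodes;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinNodePicker(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    /** Stands in for createConnection(): returns the node the new connection would use. */
    String pickNode() {
        // floorMod keeps the index valid even if the counter overflows to a negative value.
        int index = Math.floorMod(next.getAndIncrement(), nodes.size());
        return nodes.get(index);
    }

    public static void main(String[] args) {
        RoundRobinNodePicker picker =
            new RoundRobinNodePicker(Arrays.asList("devapp1", "devapp2"));
        for (int i = 0; i < 4; i++) {
            System.out.println("connection " + i + " -> " + picker.pickNode());
        }
    }
}
```

Once a connection has been handed out, everything sent or consumed over it goes through the chosen node, which matches the "from then on" remark above.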

                • 5. Re: Clustered server preference
                  clebert.suconic

                   

                  "chip_schoch" wrote:
                  By the way.

                  From the JBoss Messaging Documentation 3.1




                  There is no such thing as JBoss Messaging Documentation 3.1. Maybe you are from the future? :-) (eh eh... just kidding!)


                  I believe you are confusing JBossMQ with JBossMessaging docs.

                  The only docs I'm aware of for JBossMessaging are the ones deployed on our project page. You should be using 1.4.0.SP3 if you are on JBoss 4.2.X, or 1.4.1 if you are planning to use JBoss5:


                  http://www.jboss.org/file-access/default/members/jbossmessaging/freezone/docs/userguide-1.4.0.SP3/html_single/index.html


                  • 6. Re: Clustered server preference
                    chipschoch

                    That was Chapter 3.1 and Chapter 1.2.2 respectively. Here is the link http://www.jboss.org/file-access/default/members/jbossmessaging/freezone/docs/userguide-1.3.0.GA/html/c_overview.html

                    It looks like it was the 1.3 UserGuide. I did initially set this stuff up last year. It is only now that I have begun taking a closer look at how the load balancing is working.

                    To that end I have written a test program based on the distributed queue example. It posts 50 messages to a distributed queue, then creates 2 listeners and consumes the messages. I was surprised to see that the message consumption was so random. I would have expected FIFO consumption. I tried setting DefaultPreserveOrdering=true, but that caused all the messages to be consumed by one listener. Essentially, it defeated load balancing. Is there a way to get FIFO consumption and load balancing? The Random House Unabridged dictionary defines queue as:

                    3. Computers. a FIFO-organized sequence of items, as data, messages, jobs, or the like, waiting for action.


                    Below is my test log output.

                    [2008-04-16 11:41:11,249] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_0
                    [2008-04-16 11:41:11,311] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_1
                    [2008-04-16 11:41:11,327] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_2
                    [2008-04-16 11:41:11,343] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_3
                    [2008-04-16 11:41:11,358] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_4
                    [2008-04-16 11:41:11,374] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_5
                    [2008-04-16 11:41:11,390] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_6
                    [2008-04-16 11:41:11,405] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_7
                    [2008-04-16 11:41:11,421] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_8
                    [2008-04-16 11:41:11,436] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_9
                    [2008-04-16 11:41:11,452] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_10
                    [2008-04-16 11:41:11,468] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_11
                    [2008-04-16 11:41:11,483] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_12
                    [2008-04-16 11:41:11,499] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_13
                    [2008-04-16 11:41:11,515] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_14
                    [2008-04-16 11:41:11,530] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_15
                    [2008-04-16 11:41:11,546] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_16
                    [2008-04-16 11:41:11,561] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_17
                    [2008-04-16 11:41:11,577] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_18
                    [2008-04-16 11:41:11,593] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_19
                    [2008-04-16 11:41:11,608] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_20
                    [2008-04-16 11:41:11,624] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_21
                    [2008-04-16 11:41:11,655] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_22
                    [2008-04-16 11:41:11,671] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_23
                    [2008-04-16 11:41:11,686] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_24
                    [2008-04-16 11:41:11,702] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_25
                    [2008-04-16 11:41:11,718] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_26
                    [2008-04-16 11:41:11,733] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_27
                    [2008-04-16 11:41:11,749] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_28
                    [2008-04-16 11:41:11,765] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_29
                    [2008-04-16 11:41:11,780] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_30
                    [2008-04-16 11:41:11,796] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_31
                    [2008-04-16 11:41:11,811] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_32
                    [2008-04-16 11:41:11,811] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_33
                    [2008-04-16 11:41:11,843] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_34
                    [2008-04-16 11:41:11,843] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_35
                    [2008-04-16 11:41:11,874] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_36
                    [2008-04-16 11:41:11,874] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_37
                    [2008-04-16 11:41:11,890] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_38
                    [2008-04-16 11:41:11,905] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_39
                    [2008-04-16 11:41:11,921] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_40
                    [2008-04-16 11:41:11,936] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_41
                    [2008-04-16 11:41:11,952] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_42
                    [2008-04-16 11:41:11,968] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_43
                    [2008-04-16 11:41:12,015] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_44
                    [2008-04-16 11:41:12,046] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_45
                    [2008-04-16 11:41:12,061] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_46
                    [2008-04-16 11:41:12,077] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_47
                    [2008-04-16 11:41:12,093] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_48
                    [2008-04-16 11:41:12,093] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_49
                    [2008-04-16 11:41:12,389] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_0
                    [2008-04-16 11:41:12,593] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_1
                    [2008-04-16 11:41:13,624] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_19
                    [2008-04-16 11:41:13,983] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_20
                    [2008-04-16 11:41:14,749] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_37
                    [2008-04-16 11:41:15,233] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_42
                    [2008-04-16 11:41:15,842] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_8
                    [2008-04-16 11:41:16,358] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_16
                    [2008-04-16 11:41:16,936] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_28
                    [2008-04-16 11:41:17,452] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_36
                    [2008-04-16 11:41:17,999] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_48
                    [2008-04-16 11:41:18,592] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_9
                    [2008-04-16 11:41:19,139] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_21
                    [2008-04-16 11:41:19,639] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_31
                    [2008-04-16 11:41:20,202] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_43
                    [2008-04-16 11:41:20,717] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_4
                    [2008-04-16 11:41:21,311] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_15
                    [2008-04-16 11:41:21,811] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_26
                    [2008-04-16 11:41:22,405] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_41
                    [2008-04-16 11:41:22,905] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_5
                    [2008-04-16 11:41:23,577] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_22
                    [2008-04-16 11:41:24,155] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_34
                    [2008-04-16 11:41:24,639] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_47
                    [2008-04-16 11:41:25,373] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_14
                    [2008-04-16 11:41:25,780] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_30
                    [2008-04-16 11:41:26,576] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_6
                    [2008-04-16 11:41:27,030] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_17
                    [2008-04-16 11:41:27,686] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_39
                    [2008-04-16 11:41:28,092] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_3
                    [2008-04-16 11:41:28,858] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_29
                    [2008-04-16 11:41:29,358] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_46
                    [2008-04-16 11:41:30,029] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_25
                    [2008-04-16 11:41:30,561] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_45
                    [2008-04-16 11:41:31,295] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_27
                    [2008-04-16 11:41:31,795] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_2
                    [2008-04-16 11:41:32,561] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_38
                    [2008-04-16 11:41:32,904] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_13
                    [2008-04-16 11:41:33,654] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_10
                    [2008-04-16 11:41:33,998] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_32
                    [2008-04-16 11:41:34,795] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_24
                    [2008-04-16 11:41:35,357] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_44
                    [2008-04-16 11:41:36,014] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_23
                    [2008-04-16 11:41:36,435] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_11
                    [2008-04-16 11:41:37,123] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_18
                    [2008-04-16 11:41:37,560] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_33
                    [2008-04-16 11:41:38,326] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_7
                    [2008-04-16 11:41:38,701] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_35
                    [2008-04-16 11:41:39,435] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_49
                    [2008-04-16 11:41:39,763] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_40
                    [2008-04-16 11:41:40,529] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_12
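One way to check the ordering in a log like this without reading it by eye is to group the received message numbers by listener and test whether each sequence ever goes backwards. The sketch below is a hypothetical helper (ConsumptionOrderCheck is not part of the test program); its regular expression assumes the exact "Listener_N received message: Test_Message_N" wording shown above, and the sample lines in main are the first eight received lines of the log.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log checker; assumes the log wording used in this thread.
final class ConsumptionOrderCheck {
    private static final Pattern RECEIVED =
        Pattern.compile("(Listener_\\d+) received message: Test_Message_(\\d+)");

    /** Groups the received message numbers by listener, in log order. */
    static Map<String, List<Integer>> byListener(List<String> logLines) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = RECEIVED.matcher(line);
            if (m.find()) {
                result.computeIfAbsent(m.group(1), k -> new ArrayList<>())
                      .add(Integer.parseInt(m.group(2)));
            }
        }
        return result;
    }

    /** True when the sequence never goes backwards, i.e. FIFO from that listener's view. */
    static boolean isAscending(List<Integer> sequence) {
        for (int i = 1; i < sequence.size(); i++) {
            if (sequence.get(i) < sequence.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "Listener_1 received message: Test_Message_0",
            "Listener_2 received message: Test_Message_1",
            "Listener_1 received message: Test_Message_19",
            "Listener_2 received message: Test_Message_20",
            "Listener_1 received message: Test_Message_37",
            "Listener_2 received message: Test_Message_42",
            "Listener_1 received message: Test_Message_8",
            "Listener_2 received message: Test_Message_16");
        byListener(sample).forEach((name, seq) ->
            System.out.println(name + " " + seq + " ascending=" + isAscending(seq)));
    }
}
```

In the log above, Listener_1 receives message 8 after message 37, so the order is broken even within a single listener, not only across the two.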


                    For completeness here is the program that produced this output:

                    package com.eLynx.Utility.test;
                    
                    
                    
                    import java.util.Properties;
                    
                    import javax.jms.Connection;
                    import javax.jms.ConnectionFactory;
                    import javax.jms.JMSException;
                    import javax.jms.Message;
                    import javax.jms.MessageConsumer;
                    import javax.jms.MessageListener;
                    import javax.jms.MessageProducer;
                    import javax.jms.Queue;
                    import javax.jms.QueueSession;
                    import javax.jms.Session;
                    import javax.jms.TextMessage;
                    
                    import javax.naming.Context;
                    import javax.naming.InitialContext;
                    import javax.naming.NamingException;
                    
                    import org.apache.log4j.Logger;
                    
                    /**
                     * Class JBMTest
                     */
                    
                    public class JBMTest
                    {
                     static Logger logger = Logger.getLogger (JBMTest.class);
                     static MessageProducer m_producer = null;
                     static Queue m_producerQueue = null;
                     static Connection m_jmsConnection = null;
                     static QueueSession m_jmsSession = null;
                     static InitialContext m_initialContext_1 = null;
                     static InitialContext m_initialContext_2 = null;
                     static ConnectionFactory m_factory_1 = null;
                     static ConnectionFactory m_factory_2 = null;
                    
                     /**
                     * Method: initialize
                     *
                     *
                     * @throws NamingException
                     */
                    
                     static void initialize () throws NamingException
                     {
                     Properties p = new Properties ();
                     p.put (Context.PROVIDER_URL, "jnp://devapp1.qa.cin.int:1100");
                     p.put (Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
                     p.put (Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
                    
                     m_initialContext_1 = new InitialContext (p);
                     m_factory_1 = (ConnectionFactory) m_initialContext_1.lookup ("/LoadBalanceConnectionFactory");
                    
                    
                     p.put (Context.PROVIDER_URL, "jnp://devapp2.qa.cin.int:1100");
                     m_initialContext_2 = new InitialContext (p);
                     m_factory_2 = (ConnectionFactory) m_initialContext_2.lookup ("/LoadBalanceConnectionFactory");
                     }
                    
                     /**
                     * Method: main
                     *
                     *
                     * @param args
                     */
                    
                     public static void main (String[] args)
                     {
                     JBMTest tester = null;
                     try
                     {
                     initialize ();
                    
                     tester = new JBMTest ();
                     tester.sendMessages (50);
                     tester.consumeMessages (50);
                    
                     }
                     catch (Exception e)
                     {
                     logger.error (e.getMessage (), e);
                     }
                     finally
                     {
                     try
                     {
                     if (tester != null)
                     tester.cleanup ();
                     }
                     catch (Exception e)
                     {
                     logger.error ("Tester cleanup exception", e);
                     }
                     }
                     }
                    
                     /**
                     * Method: cleanup
                     *
                     *
                     * @throws JMSException
                     */
                    
                     void cleanup () throws JMSException
                     {
                     if (m_producer != null)
                     m_producer.close ();
                     if (m_jmsSession != null)
                     m_jmsSession.close ();
                     if (m_jmsConnection != null)
                     m_jmsConnection.close ();
                     }
                    
                     /**
                     * Method: consumeMessages
                     *
                     *
                     * @param count
                     */
                    
                     void consumeMessages (int count)
                     {
                     TestListener listener1 = null;
                     TestListener listener2 = null;
                    
                     try
                     {
                     listener1 = new TestListener ("Listener_1", m_initialContext_1, m_factory_1);
                     listener2 = new TestListener ("Listener_2", m_initialContext_2, m_factory_2);
                     listener1.m_jmsConnection.start ();
                     listener2.m_jmsConnection.start ();
                    
                     // Re-read both counters on every pass; the original summed them once before the loop, so the condition could never change.
                     while (listener1.m_count + listener2.m_count < count)
                     {
                     Thread.sleep (1000);
                     }
                     }
                     catch (Exception e)
                     {
                     logger.error (e.getMessage (), e);
                     }
                     finally
                     {
                     try
                     {
                     if (listener1 != null)
                     listener1.cleanup ();
                    
                     if (listener2 != null)
                     listener2.cleanup ();
                     }
                     catch (Exception e)
                     {
                     logger.error ("Cleanup exception", e);
                     }
                     }
                     }
                    
                     /**
                     * Method: sendMessages
                     *
                     *
                     * @param count
                     */
                    
                     void sendMessages (int count)
                     {
                     try
                     {
                     m_producerQueue = (Queue) m_initialContext_1.lookup ("/queue/testDistributedQueue");
                     // Create a single connection; the original called createConnection() twice and never closed the first one.
                     m_jmsConnection = m_factory_1.createConnection ();
                     m_jmsSession = (QueueSession) m_jmsConnection.createSession (false, Session.AUTO_ACKNOWLEDGE);
                     m_producer = m_jmsSession.createProducer (m_producerQueue);
                    
                     for (int i = 0; i < count; i++)
                     {
                     logger.info ("Queueing message: Test_Message_" + i);
                     TextMessage tm = m_jmsSession.createTextMessage ("Test_Message_" + i);
                     m_producer.send (tm);
                     }
                     }
                     catch (Exception e)
                     {
                     logger.error (e.getMessage (), e);
                     }
                     }
                    }
                    
                    /**
                     * Class TestListener
                     */
                    
                    class TestListener implements MessageListener
                    {
                     static Logger logger = Logger.getLogger (TestListener.class);
                     volatile int m_count = 0; // written by the JMS delivery thread, read by the polling loop in consumeMessages
                     MessageConsumer m_consumer = null;
                     Queue m_responseQueue = null;
                     Connection m_jmsConnection = null;
                     QueueSession m_jmsSession = null;
                     String m_name = null;
                    
                     /**
                     * Constructor: TestListener
                     *
                     *
                     * @param name
                     * @param ic
                     * @param factory
                     *
                     * @throws JMSException
                     * @throws NamingException
                     */
                    
                     TestListener (String name,
                     InitialContext ic,
                     ConnectionFactory factory) throws NamingException, JMSException
                     {
                     m_responseQueue = (Queue) ic.lookup ("/queue/testDistributedQueue");
                     m_jmsConnection = factory.createConnection ();
                     m_jmsSession = (QueueSession) m_jmsConnection.createSession (false, Session.AUTO_ACKNOWLEDGE);
                     m_consumer = m_jmsSession.createConsumer (m_responseQueue);
                     m_consumer.setMessageListener (this);
                     m_name = name;
                     }
                    
                     /**
                     * Method: cleanup
                     *
                     *
                     * @throws JMSException
                     */
                    
                     void cleanup () throws JMSException
                     {
                     if (m_consumer != null)
                     m_consumer.close ();
                     if (m_jmsSession != null)
                     m_jmsSession.close ();
                     if (m_jmsConnection != null)
                     m_jmsConnection.close ();
                     }
                    
                     /**
                     * Method: onMessage
                     *
                     *
                     * @param arg0
                     */
                    
                     public void onMessage (Message arg0)
                     {
                     try
                     {
                     TextMessage msg = (TextMessage) arg0;
                     logger.info (m_name + " received message: " + msg.getText ());
                     Thread.sleep (1000);
                     m_count++;
                     }
                     catch (Exception e)
                     {
                     logger.error ("Exception", e);
                     }
                     }
                    }
                    


                    • 7. Re: Clustered server preference
                      clebert.suconic

                      Ahhh... *Chapter* 3.1!

                      First of all.. don't use 1.3.0.. you will find several bugs that we have fixed... including the documentation.

                      Second... you have FIFO on the cluster, but we always distribute messages to the local queue first.

                      Say, you have a ClusteredQueue deployed in two servers. When you send a message to that queue, any clients connected to that server will receive the message first. Look at other threads as we have discussed this recently.


                      To that end I have written a test program based on the distributed queue example. It posts 50 messages to a distributed queue then creates 2 listeners and consumes the messages. I was surprised to see that the message consumption was so random. I would have expected FIFO consumption. I tried setting DefaultPreserveOrdering=true but that caused all the messages to be consumed by one listener. Essentially, it defeated load balancing. Is there a way to get FIFO consumption and load balancing? The Random House Unabridged dictionary defines Queue As


                      You are using 1.3.0... You really should get to use 1.4.0.SP3.

                      Also... be aware of the local queue behaviors on the clustering. We don't do singleton cluster as that would be performance prohibitive.
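
The local-first behavior described in this thread can be pictured with a toy model. The sketch below is plain Java and is not JBoss Messaging code: the class `LocalFirstRouting` and its `route` method are hypothetical names, and the rule it encodes (deliver on the node that received the send if it has a consumer, otherwise hand the message to some other node that has one) is only a simplified reading of what is said here.

```java
/** Toy model of "local consumers first" routing; NOT the JBoss Messaging implementation. */
class LocalFirstRouting {

    /**
     * Picks the node whose consumers would receive a message sent to sendNode.
     * consumersPerNode[i] is the number of consumers attached to node i.
     * Returns -1 when no node has a consumer (the message simply stays queued).
     */
    static int route(int sendNode, int[] consumersPerNode) {
        if (consumersPerNode[sendNode] > 0) {
            return sendNode; // a local consumer always wins
        }
        for (int node = 0; node < consumersPerNode.length; node++) {
            if (consumersPerNode[node] > 0) {
                return node; // no local consumer: redistribute to another node
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Node 0 has consumers, so a message sent to node 0 stays on node 0.
        System.out.println(route(0, new int[] {2, 1}));
        // Node 1 has no consumers, so its message is handed to node 0.
        System.out.println(route(1, new int[] {2, 0}));
    }
}
```

Under this model, a client that sends everything through one node keeps that node's consumers busy while consumers on the other node see nothing, which may be part of why the load does not look balanced.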

                      • 8. Re: Clustered server preference
                        chipschoch

                        Let me clear up some things. First, I am not using 1.3.0. I am using JBoss AS 4.2.2.ga, JBM1.4.0.SP1, as stated in the initial post. I simply made the mistake of referencing a paragraph in the 1.3 documentation. My bad.

                        I have been running numerous tests using the program I included, although I found a few bugs that I fixed, and it is clear that the consumption on a distributed queue is not FIFO. And as I look at the JMS spec I see that FIFO is not guaranteed. One wonders why they use the term MessageQueue instead of MessageBag. So, I will ask my question again. Is there a way to configure JBossMessaging (1.4.0SP1) such that I can get FIFO from a particular destination regardless of the number of consumers of that destination?

                        Finally, my singleton service is a service that monitors a directory on a SAN for incoming files and queues a message when a file comes in. I really don't want the service running on multiple nodes monitoring a shared directory. It is not a "singleton cluster". It is a singleton service that runs in the cluster.

                        The only reason I am trying to understand this stuff is that we are about ready to deploy our new architecture in production (3 linux app servers clustered and 4 windows app servers running a jms client service) and we are not seeing the load balanced across them like we should. Apparently my misunderstanding of how distributed queues work is a major contributing factor.

                        Here are the last 3 runs of my program with 2 client listeners using:

                        <mbean code="org.jboss.jms.server.connectionfactory.ConnectionFactory"
                         name="jboss.messaging.connectionfactory:service=LoadBalanceConnectionFactory"
                         xmbean-dd="xmdesc/ConnectionFactory-xmbean.xml">
                         <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
                         <depends optional-attribute-name="Connector">jboss.messaging:service=Connector,transport=bisocket</depends>
                         <depends>jboss.messaging:service=PostOffice</depends>
                        
                         <attribute name="JNDIBindings">
                         <bindings>
                         <binding>/LoadBalanceConnectionFactory</binding>
                         <binding>/LoadBalanceConnectionFactory</binding>
                         <binding>java:/LoadBalanceConnectionFactory</binding>
                         <binding>java:/LoadBalanceConnectionFactory</binding>
                         </bindings>
                         </attribute>
                        
                         <attribute name="PrefetchSize">1</attribute>
                         <attribute name="SlowConsumers">true</attribute>
                         <attribute name="SupportsFailover">false</attribute>
                         <attribute name="SupportsLoadBalancing">true</attribute>
                         </mbean>
                        </server>


                        [2008-04-16 13:44:26,266] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_0
                        [2008-04-16 13:44:26,313] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_1
                        [2008-04-16 13:44:26,313] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_2
                        [2008-04-16 13:44:26,329] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_3
                        [2008-04-16 13:44:26,344] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_4
                        [2008-04-16 13:44:26,360] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_5
                        [2008-04-16 13:44:26,376] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_6
                        [2008-04-16 13:44:26,391] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_7
                        [2008-04-16 13:44:26,407] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_8
                        [2008-04-16 13:44:26,422] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_9
                        [2008-04-16 13:44:26,438] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_10
                        [2008-04-16 13:44:26,454] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_11
                        [2008-04-16 13:44:26,469] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_12
                        [2008-04-16 13:44:26,485] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_13
                        [2008-04-16 13:44:26,501] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_14
                        [2008-04-16 13:44:26,516] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_15
                        [2008-04-16 13:44:26,532] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_16
                        [2008-04-16 13:44:26,547] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_17
                        [2008-04-16 13:44:26,563] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_18
                        [2008-04-16 13:44:26,579] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_19
                        [2008-04-16 13:44:26,594] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_20
                        [2008-04-16 13:44:26,610] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_21
                        [2008-04-16 13:44:26,626] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_22
                        [2008-04-16 13:44:26,641] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_23
                        [2008-04-16 13:44:26,657] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_24
                        [2008-04-16 13:44:27,016] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_0
                        [2008-04-16 13:44:27,016] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_1
                        [2008-04-16 13:44:28,032] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_2
                        [2008-04-16 13:44:28,047] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_3
                        [2008-04-16 13:44:29,063] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_4
                        [2008-04-16 13:44:29,079] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_5
                        [2008-04-16 13:44:30,094] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_6
                        [2008-04-16 13:44:30,110] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_7
                        [2008-04-16 13:44:31,157] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_8
                        [2008-04-16 13:44:31,157] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_9
                        [2008-04-16 13:44:32,204] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_10
                        [2008-04-16 13:44:32,204] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_11
                        [2008-04-16 13:44:33,219] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_12
                        [2008-04-16 13:44:33,235] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_13
                        [2008-04-16 13:44:34,266] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_15
                        [2008-04-16 13:44:34,266] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_14
                        [2008-04-16 13:44:35,297] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_16
                        [2008-04-16 13:44:35,297] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_17
                        [2008-04-16 13:44:36,329] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_18
                        [2008-04-16 13:44:36,344] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_19
                        [2008-04-16 13:44:37,376] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_20
                        [2008-04-16 13:44:37,376] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_21
                        [2008-04-16 13:44:38,407] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_22
                        [2008-04-16 13:44:38,407] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_23
                        [2008-04-16 13:44:39,422] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_24
                        [2008-04-16 13:44:52,360] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_0
                        [2008-04-16 13:44:52,391] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_1
                        [2008-04-16 13:44:52,422] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_2
                        [2008-04-16 13:44:52,422] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_3
                        [2008-04-16 13:44:52,454] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_4
                        [2008-04-16 13:44:52,469] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_5
                        [2008-04-16 13:44:52,485] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_6
                        [2008-04-16 13:44:52,501] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_7
                        [2008-04-16 13:44:52,516] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_8
                        [2008-04-16 13:44:52,532] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_9
                        [2008-04-16 13:44:52,547] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_10
                        [2008-04-16 13:44:52,563] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_11
                        [2008-04-16 13:44:52,579] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_12
                        [2008-04-16 13:44:52,594] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_13
                        [2008-04-16 13:44:52,610] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_14
                        [2008-04-16 13:44:52,626] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_15
                        [2008-04-16 13:44:52,641] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_16
                        [2008-04-16 13:44:52,657] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_17
                        [2008-04-16 13:44:52,672] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_18
                        [2008-04-16 13:44:52,672] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_19
                        [2008-04-16 13:44:52,704] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_20
                        [2008-04-16 13:44:52,704] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_21
                        [2008-04-16 13:44:52,735] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_22
                        [2008-04-16 13:44:52,735] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_23
                        [2008-04-16 13:44:52,751] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_24
                        [2008-04-16 13:44:53,313] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_0
                        [2008-04-16 13:44:53,313] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_3
                        [2008-04-16 13:44:54,407] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_1
                        [2008-04-16 13:44:54,422] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_23
                        [2008-04-16 13:44:55,454] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_22
                        [2008-04-16 13:44:55,485] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_21
                        [2008-04-16 13:44:56,532] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_24
                        [2008-04-16 13:44:56,547] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_2
                        [2008-04-16 13:44:57,594] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_19
                        [2008-04-16 13:44:57,610] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_20
                        [2008-04-16 13:44:58,719] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_11
                        [2008-04-16 13:44:58,751] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_12
                        [2008-04-16 13:44:59,813] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_16
                        [2008-04-16 13:44:59,829] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_17
                        [2008-04-16 13:45:00,876] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_7
                        [2008-04-16 13:45:00,954] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_8
                        [2008-04-16 13:45:01,969] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_6
                        [2008-04-16 13:45:02,047] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_9
                        [2008-04-16 13:45:03,063] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_15
                        [2008-04-16 13:45:03,126] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_18
                        [2008-04-16 13:45:04,172] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_5
                        [2008-04-16 13:45:04,219] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_10
                        [2008-04-16 13:45:05,266] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_4
                        [2008-04-16 13:45:05,329] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_13
                        [2008-04-16 13:45:06,360] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_14
                        [2008-04-16 13:45:20,188] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_0
                        [2008-04-16 13:45:20,235] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_1
                        [2008-04-16 13:45:20,251] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_2
                        [2008-04-16 13:45:20,266] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_3
                        [2008-04-16 13:45:20,282] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_4
                        [2008-04-16 13:45:20,297] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_5
                        [2008-04-16 13:45:20,313] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_6
                        [2008-04-16 13:45:20,329] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_7
                        [2008-04-16 13:45:20,344] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_8
                        [2008-04-16 13:45:20,360] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_9
                        [2008-04-16 13:45:20,376] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_10
                        [2008-04-16 13:45:20,391] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_11
                        [2008-04-16 13:45:20,407] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_12
                        [2008-04-16 13:45:20,422] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_13
                        [2008-04-16 13:45:20,438] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_14
                        [2008-04-16 13:45:20,454] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_15
                        [2008-04-16 13:45:20,469] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_16
                        [2008-04-16 13:45:20,485] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_17
                        [2008-04-16 13:45:20,501] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_18
                        [2008-04-16 13:45:20,516] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_19
                        [2008-04-16 13:45:20,532] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_20
                        [2008-04-16 13:45:20,532] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_21
                        [2008-04-16 13:45:20,563] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_22
                        [2008-04-16 13:45:20,563] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_23
                        [2008-04-16 13:45:20,579] INFO - com.eLynx.Utility.test.JBMTest - Queueing message: Test_Message_24
                        [2008-04-16 13:45:21,001] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_6
                        [2008-04-16 13:45:21,001] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_0
                        [2008-04-16 13:45:22,079] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_18
                        [2008-04-16 13:45:22,141] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_19
                        [2008-04-16 13:45:23,126] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_11
                        [2008-04-16 13:45:23,188] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_12
                        [2008-04-16 13:45:24,172] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_5
                        [2008-04-16 13:45:24,251] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_7
                        [2008-04-16 13:45:25,282] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_4
                        [2008-04-16 13:45:25,344] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_8
                        [2008-04-16 13:45:26,376] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_9
                        [2008-04-16 13:45:26,454] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_10
                        [2008-04-16 13:45:27,422] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_14
                        [2008-04-16 13:45:27,516] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_15
                        [2008-04-16 13:45:28,485] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_21
                        [2008-04-16 13:45:28,563] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_22
                        [2008-04-16 13:45:29,594] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_16
                        [2008-04-16 13:45:29,672] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_17
                        [2008-04-16 13:45:30,657] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_23
                        [2008-04-16 13:45:30,735] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_24
                        [2008-04-16 13:45:31,751] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_1
                        [2008-04-16 13:45:31,844] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_2
                        [2008-04-16 13:45:32,813] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_20
                        [2008-04-16 13:45:32,907] INFO - com.eLynx.Utility.test.TestListener - Listener_1 received message: Test_Message_3
                        [2008-04-16 13:45:33,860] INFO - com.eLynx.Utility.test.TestListener - Listener_2 received message: Test_Message_13
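
Eyeballing runs like these gets tedious, so a small helper can quantify how far a run strays from FIFO. The sketch below is an illustration only: the class `ReceiveOrderCheck` and its methods are hypothetical names, and it assumes the log lines have the "received message: Test_Message_N" shape shown above. Since both listeners log concurrently, lines with near-identical timestamps may be written in either order, so a small count does not prove a delivery-order problem.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for inspecting the receive order in the test logs. */
class ReceiveOrderCheck {

    // Matches only listener lines; "Queueing message" lines are ignored.
    private static final Pattern RECEIVED =
            Pattern.compile("received message: Test_Message_(\\d+)");

    /** Extracts the message indices, in log order, from "received message" lines. */
    static List<Integer> receivedIndices(List<String> logLines) {
        List<Integer> indices = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = RECEIVED.matcher(line);
            if (m.find()) {
                indices.add(Integer.parseInt(m.group(1)));
            }
        }
        return indices;
    }

    /** Counts adjacent pairs where a later-sent message was logged before an earlier one. */
    static int outOfOrderPairs(List<Integer> indices) {
        int count = 0;
        for (int i = 1; i < indices.size(); i++) {
            if (indices.get(i) < indices.get(i - 1)) {
                count++;
            }
        }
        return count;
    }
}
```

Feeding each run's listener lines to `receivedIndices` and then `outOfOrderPairs` gives one number per run that can be compared across configurations.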


                        • 9. Re: Clustered server preference
                          clebert.suconic

                          Just an FYI (to keep you happy): I have invested a lot of time on this issue now :-)

                          Load balancing on the ClusteredConnectionFactory only guarantees that each cf.createConnection() will point to a different server. Having said that, a MessageProducer will always be bound to the server its Connection is pointing to.

                          Also, messages are always consumed by local consumers on the cluster preferably. If there are no local consumers, messages are redistributed to the cluster.
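
As a rough picture of what connection-level load balancing means for a client, here is a toy model in plain Java. It is not the JBoss Messaging implementation: `RoundRobinFactory` is a hypothetical class, round-robin selection is an assumption made for the illustration, and the node names are taken from the first post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of connection-level load balancing; NOT the JBoss Messaging implementation. */
class RoundRobinFactory {
    private final List<String> nodes;
    private int next = 0;

    RoundRobinFactory(List<String> nodes) {
        this.nodes = new ArrayList<>(nodes);
    }

    /** Each call hands out the next node in turn, standing in for successive createConnection() calls. */
    synchronized String createConnection() {
        String node = nodes.get(next);
        next = (next + 1) % nodes.size();
        return node;
    }

    public static void main(String[] args) {
        RoundRobinFactory cf = new RoundRobinFactory(Arrays.asList("devapp1", "devapp2"));
        // Everything sent through this connection lands on this one node.
        String producerNode = cf.createConnection();
        // A second connection from the same factory is pointed at the other node.
        String consumerNode = cf.createConnection();
        System.out.println("producer -> " + producerNode + ", consumer -> " + consumerNode);
    }
}
```

The point of the model is that balancing happens once per connection, not once per message: a producer that holds a single connection sends every message to the same node, and local consumers on that node get them first.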

                          If you need to guarantee ordering on message redistribution, take a look at this:

                          http://www.jboss.org/file-access/default/members/jbossmessaging/freezone/docs/userguide-1.4.1.Beta1/html_single/index.html#c_conf.orderingincluster



                          I have changed your example to send messages on both nodes, and after setting DefaultPreserveOrdering=true, I got the output the way you would expect.

                          Listener_1 received message: Test_Message_0
                          Listener_2 received message: Test_Message_1
                          Listener_1 received message: Test_Message_2
                          Listener_2 received message: Test_Message_3
                          Listener_1 received message: Test_Message_4
                          Listener_2 received message: Test_Message_5
                          Listener_1 received message: Test_Message_6
                          Listener_2 received message: Test_Message_7
                          Listener_1 received message: Test_Message_8
                          Listener_2 received message: Test_Message_9
                          Listener_1 received message: Test_Message_10
                          Listener_2 received message: Test_Message_11
                          Listener_1 received message: Test_Message_12
                          Listener_2 received message: Test_Message_13
                          Listener_1 received message: Test_Message_14
                          Listener_2 received message: Test_Message_15
                          Listener_1 received message: Test_Message_16
                          Listener_2 received message: Test_Message_17
                          Listener_1 received message: Test_Message_18
                          Listener_2 received message: Test_Message_19
                          Listener_1 received message: Test_Message_20
                          Listener_2 received message: Test_Message_21
                          Listener_1 received message: Test_Message_22
                          Listener_2 received message: Test_Message_23
                          Listener_1 received message: Test_Message_24
                          Listener_2 received message: Test_Message_25
                          Listener_1 received message: Test_Message_26
                          Listener_2 received message: Test_Message_27
                          Listener_1 received message: Test_Message_28
                          Listener_2 received message: Test_Message_29
                          Listener_1 received message: Test_Message_30
                          Listener_2 received message: Test_Message_31
                          Listener_1 received message: Test_Message_32
                          Listener_2 received message: Test_Message_33
                          Listener_1 received message: Test_Message_34
                          Listener_2 received message: Test_Message_35
                          Listener_1 received message: Test_Message_36
                          Listener_2 received message: Test_Message_37
                          Listener_1 received message: Test_Message_38
                          Listener_2 received message: Test_Message_39
                          Listener_1 received message: Test_Message_40
                          Listener_2 received message: Test_Message_41
                          Listener_1 received message: Test_Message_42
                          Listener_2 received message: Test_Message_43
                          Listener_1 received message: Test_Message_44
                          Listener_2 received message: Test_Message_45
                          Listener_1 received message: Test_Message_46
                          Listener_2 received message: Test_Message_47
                          Listener_1 received message: Test_Message_48
                          Listener_2 received message: Test_Message_49
                          



                          I really hate posting full source code on the forum, but I will make an exception now:

                          
                          
                          import java.util.Properties;
                          
                          import javax.jms.*;
                          import javax.jms.ConnectionFactory;
                          import javax.jms.JMSException;
                          import javax.jms.Message;
                          import javax.jms.MessageConsumer;
                          import javax.jms.MessageListener;
                          import javax.jms.MessageProducer;
                          import javax.jms.Queue;
                          import javax.jms.QueueSession;
                          import javax.jms.Session;
                          import javax.jms.TextMessage;
                          
                          import javax.naming.Context;
                          import javax.naming.InitialContext;
                          import javax.naming.NamingException;
                          
                          
                          /**
                           * Class JBMTest
                           */
                          
                          public class JBMTest
                          {
                           static Queue m_producerQueue = null;
                           static InitialContext m_initialContext_1 = null;
                           static ConnectionFactory m_factory_1 = null;
                          
                           /**
                           * Method: initialize
                           *
                           *
                           * @throws NamingException
                           */
                          
                           static void initialize () throws NamingException
                           {
                           Properties p = new Properties ();
                           p.put (Context.PROVIDER_URL, "jnp://10.61.5.1.int:1100");
                           p.put (Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
                           p.put (Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
                          
                           m_initialContext_1 = new InitialContext (p);
                           m_factory_1 = (ConnectionFactory) m_initialContext_1.lookup ("/ClusteredConnectionFactory");
                          
                          
                           }
                          
                           /**
                           * Method: main
                           *
                           *
                           * @param args
                           */
                          
                           public static void main (String[] args)
                           {
                           JBMTest tester = null;
                           try
                           {
                           initialize ();
                          
                           tester = new JBMTest ();
                           tester.sendMessages (50);
                           tester.consumeMessages (50);
                          
                           }
                           catch (Exception e)
                           {
                           e.printStackTrace();
                           }
                           finally
                           {
                           try
                           {
                           if (tester != null)
                           tester.cleanup ();
                           }
                           catch (Exception e)
                           {
                           e.printStackTrace();
                           }
                           }
                           }
                          
                           /**
                           * Method: cleanup
                           *
                           *
                           * @throws JMSException
                           */
                          
                           void cleanup () throws JMSException
                           {
                           }
                          
                           /**
                           * Method: consumeMessages
                           *
                           *
                           * @param count
                           */
                          
                           void consumeMessages (int count)
                           {
                           TestListener listener1 = null;
                           TestListener listener2 = null;
                          
                           try
                           {
                           listener1 = new TestListener ("Listener_1", m_initialContext_1, m_factory_1);
                           listener2 = new TestListener ("Listener_2", m_initialContext_1, m_factory_1);
                           listener1.m_jmsConnection.start ();
                           listener2.m_jmsConnection.start ();
                          
                          
                           while ((listener1.m_count + listener2.m_count) < count)
                           {
                           Thread.sleep (1000);
                           }
                          
                           }
                           catch (Exception e)
                           {
                           e.printStackTrace();
                           }
                           finally
                           {
                           try
                           {
                           if (listener1 != null)
                           listener1.cleanup ();
                          
                           if (listener2 != null)
                           listener2.cleanup ();
                           }
                           catch (Exception e)
                           {
                           e.printStackTrace();
                           }
                           }
                           }
                          
                           /**
                           * Method: sendMessages
                           *
                           *
                           * @param count
                           */
                          
                           void sendMessages (int count)
                           {
                           try
                           {
                           m_producerQueue = (Queue) m_initialContext_1.lookup ("/queue/testDistributedQueue");
                           Connection conn1 = m_factory_1.createConnection ();
                           // No QueueSession cast: createSession() is only guaranteed to return a Session.
                           Session session1 = conn1.createSession (false, Session.AUTO_ACKNOWLEDGE);
                           MessageProducer producer1 = session1.createProducer (m_producerQueue);
                          
                           Connection conn2 = m_factory_1.createConnection ();
                           Session session2 = conn2.createSession (false, Session.AUTO_ACKNOWLEDGE);
                           MessageProducer producer2 = session2.createProducer (m_producerQueue);
                          
                           for (int i = 0; i < count; i++)
                           {
                           System.out.println("Queueing message: Test_Message_" + i);
                           System.out.flush();
                           TextMessage tm = session1.createTextMessage ("Test_Message_" + i);
                           if (i%2 == 0)
                           {
                           producer1.send (tm);
                           }
                           else
                           {
                           producer2.send (tm);
                           }
                           }
                          
                           conn1.close();
                          
                           conn2.close();
                           }
                           catch (Exception e)
                           {
                           e.printStackTrace();
                           }
                           }
                          }
                          
                          /**
                           * Class TestListener
                           */
                          
                          class TestListener implements MessageListener
                          {
                           // volatile: written by the JMS delivery thread, polled by the main thread
                           volatile int m_count = 0;
                           MessageConsumer m_consumer = null;
                           Queue m_responseQueue = null;
                           Connection m_jmsConnection = null;
                           QueueSession m_jmsSession = null;
                           String m_name = null;
                          
                           /**
                           * Constructor: TestListener
                           *
                           *
                           * @param name
                           * @param ic
                           * @param factory
                           *
                           * @throws JMSException
                           * @throws NamingException
                           */
                          
                           TestListener (String name,
                           InitialContext ic,
                           ConnectionFactory factory) throws NamingException, JMSException
                           {
                           m_responseQueue = (Queue) ic.lookup ("/queue/testDistributedQueue");
                           m_jmsConnection = factory.createConnection ();
                           m_jmsSession = (QueueSession) m_jmsConnection.createSession (false, Session.AUTO_ACKNOWLEDGE);
                           m_consumer = m_jmsSession.createConsumer (m_responseQueue);
                           m_consumer.setMessageListener (this);
                           m_name = name;
                           }
                          
                           /**
                           * Method: cleanup
                           *
                           *
                           * @throws JMSException
                           */
                          
                           void cleanup () throws JMSException
                           {
                           if (m_consumer != null)
                           m_consumer.close ();
                           if (m_jmsSession != null)
                           m_jmsSession.close ();
                           if (m_jmsConnection != null)
                           m_jmsConnection.close ();
                           }
                          
                           /**
                           * Method: onMessage
                           *
                           *
                           * @param arg0
                           */
                          
                           public void onMessage (Message arg0)
                           {
                           try
                           {
                           TextMessage msg = (TextMessage) arg0;
                           System.out.println(m_name + " received message: " + msg.getText ());
                           Thread.sleep (1000);
                           msg.acknowledge();
                           m_count++;
                           }
                           catch (Exception e)
                           {
                           e.printStackTrace();
                           }
                           }
                          }
                          


                          • 10. Re: Clustered server preference
                            chipschoch

                            I certainly appreciate your trying to keep me happy, as that is the prime objective. Perhaps you could speak to my wife about that. And in all seriousness, I do appreciate the time you have spent.

                            Requiring the producer of the messages to be cognizant of how many nodes are in the cluster in order to distribute them round robin is IMHO not a very good solution. Any process producing messages to a clustered queue should not care how many nodes are in the cluster.

                            Furthermore, absent any configuration (MsgSelector, priority, etc.) to the contrary, messages should get consumed in the order they were produced. That just makes sense. It's called a Queue. It should act like a queue. To have a queue with multiple consumers hand out its messages in a haphazard order just doesn't make sense to me.

                            If I am waiting in line (i.e. the queue, for our English friends) for a teller at the bank, I don't want to see someone (say, the manager) grabbing anyone in the line to go to the next available teller. I would expect that the person who has been in line the longest (i.e. the front of the line) gets the next available teller.

                            Now of course if I was at the movies it would be different. There, when I go to get some snacks, there are six registers and six lines, and I have to pick the line I want. If I get in the shortest line and then the person in front of me has a big order and I end up standing there while the other, longer lines go faster, well, then I get pissed. I much prefer the single line feeding into all six registers, so the person who was there first gets serviced first, regardless of which register it is. Get my drift?

                            :)
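
The bank-line versus movie-line comparison above can be sketched as a small simulation. This is only an illustration of the two disciplines, not JBoss Messaging code: `LineSimulation` and its method names are hypothetical, every customer is assumed to arrive at time zero, and the "per-register" case assumes customers are dealt out round robin.

```java
import java.util.Arrays;

/** Hypothetical illustration of the two line disciplines; not JBoss Messaging code. */
final class LineSimulation {

    /**
     * One shared line: each customer, in arrival order, goes to whichever
     * register becomes free first. Returns the time each customer starts service.
     */
    static long[] sharedLine(long[] serviceTimes, int registers) {
        long[] freeAt = new long[registers];
        long[] startedAt = new long[serviceTimes.length];
        for (int i = 0; i < serviceTimes.length; i++) {
            int best = 0;
            for (int r = 1; r < registers; r++) {
                if (freeAt[r] < freeAt[best]) {
                    best = r;
                }
            }
            startedAt[i] = freeAt[best];
            freeAt[best] += serviceTimes[i];
        }
        return startedAt;
    }

    /**
     * One line per register: customer i is assigned to line (i % registers)
     * up front and must wait for everyone ahead in that line.
     */
    static long[] perRegisterLines(long[] serviceTimes, int registers) {
        long[] freeAt = new long[registers];
        long[] startedAt = new long[serviceTimes.length];
        for (int i = 0; i < serviceTimes.length; i++) {
            int r = i % registers;
            startedAt[i] = freeAt[r];
            freeAt[r] += serviceTimes[i];
        }
        return startedAt;
    }

    public static void main(String[] args) {
        // The first customer has the "big order"; everyone else is quick.
        long[] orders = {10, 1, 1, 1, 1, 1};
        System.out.println("shared line:        " + Arrays.toString(sharedLine(orders, 2)));
        System.out.println("per-register lines: " + Arrays.toString(perRegisterLines(orders, 2)));
    }
}
```

With a shared line the start times never go backwards, so whoever arrived first is served first. With per-register lines, a customer stuck behind the big order starts later than customers who arrived after them.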

                            • 11. Re: Clustered server preference
                              clebert.suconic

                              Just enable ordering, and it will work for you.

                              We just try to avoid unnecessary round trips to the cluster, and if you need to guarantee ordering, we do things in a different way. That's all.

                              • 12. Re: Clustered server preference
                                chipschoch

                                I hate to belabor the point but it seems you are not quite understanding what I am after. Let me try again.

                                Here is what I see.

                                My producer is connected to appserver1. Here is the serverLocatorURI from the factory via the debugger:

                                bisocket://172.17.20.60:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat


                                Consumer 1 is connected to appserver1:
                                bisocket://172.17.20.60:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat

                                Consumer 2 is connected to appserver2:
                                bisocket://172.17.20.61:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat


                                All are using this connection factory:

                                 <mbean code="org.jboss.jms.server.connectionfactory.ConnectionFactory"
                                 name="jboss.messaging.connectionfactory:service=NoLoadBalanceConnectionFactory"
                                 xmbean-dd="xmdesc/ConnectionFactory-xmbean.xml">
                                 <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
                                 <depends optional-attribute-name="Connector">jboss.messaging:service=Connector,transport=bisocket</depends>
                                 <depends>jboss.messaging:service=PostOffice</depends>
                                
                                 <attribute name="JNDIBindings">
                                 <bindings>
                                 <binding>/NoLoadBalanceConnectionFactory</binding>
                                 <binding>/XANoLoadBalanceConnectionFactory</binding>
                                 <binding>java:/NoLoadBalanceConnectionFactory</binding>
                                 <binding>java:/XANoLoadBalanceConnectionFactory</binding>
                                 </bindings>
                                 </attribute>
                                 <attribute name="PrefetchSize">1</attribute>
                                 <attribute name="SlowConsumers">true</attribute>
                                 <attribute name="SupportsFailover">false</attribute>
                                 <attribute name="SupportsLoadBalancing">false</attribute>
                                 </mbean>


                                With DefaultPreserveOrdering=true set on the servers, I post 5 messages:
                                [2008-04-17 09:18:54,613] INFO - Queueing message: Test_Message_0
                                [2008-04-17 09:18:54,660] INFO - Queueing message: Test_Message_1
                                [2008-04-17 09:18:54,675] INFO - Queueing message: Test_Message_2
                                [2008-04-17 09:18:54,691] INFO - Queueing message: Test_Message_3
                                [2008-04-17 09:18:54,722] INFO - Queueing message: Test_Message_4
                                [2008-04-17 09:20:39,393] INFO - Listener_2 received message: Test_Message_0
                                [2008-04-17 09:20:40,440] INFO - Listener_2 received message: Test_Message_1
                                [2008-04-17 09:20:41,456] INFO - Listener_2 received message: Test_Message_2
                                [2008-04-17 09:20:42,487] INFO - Listener_2 received message: Test_Message_3
                                [2008-04-17 09:20:43,519] INFO - Listener_2 received message: Test_Message_4



                                I get message ordering but no client load balancing, as expected.
                                But why is Listener_2 getting the messages and not Listener_1? I verified that the messages were put on appserver1 by stopping before creating the consumers and using the jmx-console to see the message count. I also verified that after creating the consumers each appserver had one consumer.

                                My desired result is that I get message ordering and both listeners consume the messages.


                                • 13. Re: Clustered server preference
                                  ataylor

                                   

                                  chipschoch wrote: "But why is Listener_2 getting the messages and not Listener_1? I verified that the messages were put on appserver1 by stopping before creating the consumers and using the jmx-console to see the message count. I also verified that after creating the consumers each appserver had one consumer."



                                  This is the correct outcome. The JMS spec does not specify how messages are distributed when more than one receiver is created on a queue. Both receivers will only receive messages if Message Selectors are used. Remember using queues is point to point so you should only have one receiver.

                                  • 14. Re: Clustered server preference
                                    chipschoch

                                     

                                    ataylor wrote: "Remember using queues is point to point so you should only have one receiver."


                                    I am assuming that by receiver you mean consumer, in JMS parlance. If that is the case, I don't know how you can make this assertion. The whole point of using queues in a load-balanced environment is that you can have a single work queue (or a virtual single queue, as in a clustered queue) and scale up your workers to meet the load. In my case that means bringing more hardware online to process messages in the queue.

                                    If I have completely misunderstood you in this context I apologize. I have been writing business systems that employ queues for a long time and the paradigm has always been FIFO behavior, allowing an unlimited number of worker processes to pull messages off the queue in the order that they were put on. The idea of a queue having only one process that can pull messages off is absurd. Surely I am misunderstanding what you are saying.
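
The "many workers, one FIFO queue" paradigm described above can be sketched with plain `java.util.concurrent`, with no JMS involved. This is a minimal illustration under stated assumptions: the queue is a local in-JVM `LinkedBlockingQueue`, not a clustered JBoss Messaging queue, and `CompetingConsumers` and `run` are hypothetical names. The lock exists only so that the recorded hand-out order is exactly the dequeue order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical illustration of competing consumers on one FIFO queue; not JMS code. */
final class CompetingConsumers {

    /**
     * Fills a queue with 0..messageCount-1, drains it with the given number of
     * worker threads, and returns the order in which messages were handed out.
     */
    static List<Integer> run(int messageCount, int workers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }

        List<Integer> handedOut = new ArrayList<>();
        Object lock = new Object();
        List<Thread> threads = new ArrayList<>();

        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                while (true) {
                    // Dequeue and record atomically so the log matches the dequeue order.
                    synchronized (lock) {
                        Integer msg = queue.poll();
                        if (msg == null) {
                            return; // queue drained
                        }
                        handedOut.add(msg);
                    }
                    Thread.yield(); // stand-in for the real work done per message
                }
            });
            threads.add(t);
            t.start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return handedOut;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 3));
    }
}
```

Messages leave the queue in the order they were put on, whichever worker happens to take them. The order in which workers finish processing is a separate matter and can still interleave.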
