2 Replies Latest reply on Nov 16, 2011 4:26 PM by mackerman

    Topic messages not being delivered to all subscribers in cluster

    mackerman

      Hi, I have a scenario where I have 2 servers in a cluster, one topic, 2 producers, one on each server, and 2 consumers, one on each server.  When I send a message to the topic on the first server, the consumer on the first server receives the message, but not the consumer on the second server.  When I send a message to the topic on the second server, both consumers receive the message.

       

      I am setting up the cluster using static connectors; the second server connects to the first server to establish the bridge, but from the behaviour above, it appears that the connection from the first server to the second is not being established or used correctly.

       

      I need to be able to establish a cluster in a situation where not all hosts are known initially, so i'm relying on HornetQ to propagate cluster connections as they are established.  According to the user manual, section 38.2.2:

       

      "This doesn't mean that you have to know where all your servers are going to be hosted, you can configure these servers to use the reliable servers to connect to. Once they are connected there connection details will be propagated via the server it connects to"

       

      Note that the consumers are hornetQ rest consumers.

       

      My environment is HornetQ 2.2.7 & HornetQ Rest 2.2.6.  I am running in JBoss AS7, though no messaging configuration is done in the AS7 standalone (or domain) configuration files due to problems getting Rest working with that configuration.  In this configuration JNDI is not being used.

       

      Here are snippets from my configuration files & source.  In the scenario outlined above, the first server is 10.2.62.28 & the second 10.2.62.27.

       

      hornetq-configuration.xml (note that the inVM connectors & acceptors are needed by hornetQ rest) 

       

      <configuration xmlns="urn:hornetq"

                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                     xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

       

          <persistence-enabled>false</persistence-enabled>

         

          <clustered>true</clustered>

       

         <connectors>

              <connector name="in-vm">

                  <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>

              </connector>

            <connector name="netty-connector">

               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

               <param key="port" value="5445"/>

            </connector>

            <connector name="netty-connector-28">

               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

               <param key="port" value="5445"/>

              <param key="host" value="10.2.62.28" />

            </connector>

            <connector name="netty-connector-27">

               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

               <param key="port" value="5445"/>

              <param key="host" value="10.2.62.27" />

            </connector>

         </connectors>

        

         <!-- Acceptors -->

         <acceptors>

              <acceptor name="in-vm">

                  <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>

              </acceptor>

              <acceptor name="netty-acceptor">

                 <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

                 <param key="port" value="5445"/>

                  <param key="host" value="0.0.0.0" />

              </acceptor>

         </acceptors>

        

         <cluster-connections>

            <cluster-connection name="bob-cluster">

               <address>jms</address>

               <connector-ref>netty-connector-28</connector-ref>

               <retry-interval>1000</retry-interval>

               <use-duplicate-detection>true</use-duplicate-detection>

               <forward-when-no-consumers>false</forward-when-no-consumers>

               <max-hops>1</max-hops>

               <static-connectors>

                  <connector-ref>netty-connector-27</connector-ref>

                  <connector-ref>netty-connector-28</connector-ref>

               </static-connectors>

            </cluster-connection>

         </cluster-connections>

        

          <!-- Other config -->

       

          <security-settings>

              <!--security for example queue-->

              <security-setting match="#">

                  <permission type="createDurableQueue" roles="guest"/>

                  <permission type="deleteDurableQueue" roles="guest"/>

                  <permission type="createNonDurableQueue" roles="guest"/>

                  <permission type="deleteNonDurableQueue" roles="guest"/>

                  <permission type="consume" roles="guest"/>

                  <permission type="send" roles="guest"/>

              </security-setting>

          </security-settings>

       

      </configuration>

       

      hornetq-jms.xml

       

      <configuration xmlns="urn:hornetq"

                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                     xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

       

         

          <connection-factory name="ConnectionFactory">

              <connectors>

                  <connector-ref connector-name="netty-connector" />

              </connectors>

              <entries>

                  <entry name="ConnectionFactory" />

              </entries>

          </connection-factory>

                    

          <topic name="asyncEvents">

              <entry name="/topic/asyncEvents"/>

          </topic>

         

          <queue name="apiRequests">

              <entry name="/queue/apiRequests"/>

          </queue>

           

      </configuration>

       

      The code:

       




      ConnectionFactory cf = (ConnectionFactory)getRegistry().lookup("ConnectionFactory");



      Topic topic = (Topic)getRegistry().lookup("/topic/asyncEvents");



      Connection connection = cf.createConnection();



      connection.start();



      session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);



      producer = session.createProducer(topic);

       



      producer.setDisableMessageID(true);


      producer.setDisableMessageTimestamp(true);

       










      ObjectMessage message = session.createObjectMessage();









      message.setStringProperty(org.hornetq.rest.HttpHeaderProperty.CONTENT_TYPE, MediaType.APPLICATION_JSON);









      String strmsg = MessageSerializer.toJson(messages.get(confId));









      message.setObject(strmsg);









      message.setStringProperty("confId", confId);









      producer.send(message);


















       

      The console on server1 shows the bridge being setup:

       

      13:33:30,103 INFO  [org.hornetq.core.server.cluster.impl.BridgeImpl] (Thread-1 (group:HornetQ-server-threads646299910-484057683)) Connecting bridge sf.bob-cluster.6753eaf9-0b1f-11e1-86b2-0025b3289f31 to its destination [dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba]

      13:33:30,171 INFO  [org.hornetq.core.server.cluster.impl.BridgeImpl] (Thread-1 (group:HornetQ-server-threads646299910-484057683)) Bridge sf.bob-cluster.6753eaf9-0b1f-11e1-86b2-0025b3289f31 is connected [dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba-> sf.bob-cluster.6753eaf9-0b1f-11e1-86b2-0025b3289f31]

       

      The console on server2 also shows a bridge being setup:

       

      13:21:23,689 INFO  [org.hornetq.core.server.cluster.impl.BridgeImpl] (Thread-1 (group:HornetQ-server-threads1495441375-1380431624)) Connecting bridge sf.bob-cluster.dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba to its destination [6753eaf9-0b1f-11e1-86b2-0025b3289f31]

      13:21:23,798 INFO  [org.hornetq.core.server.cluster.impl.BridgeImpl] (Thread-1 (group:HornetQ-server-threads1495441375-1380431624)) Bridge sf.bob-cluster.dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba is connected [6753eaf9-0b1f-11e1-86b2-0025b3289f31-> sf.bob-cluster.dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba]

       

      Does anyone have any idea why this is not working as I need, and how to fix it?

       

      thanks, Mitchell

        • 1. Re: Topic messages not being delivered to all subscribers in cluster
          clebert.suconic

          Can you try Branch_2_2_AS7 for EAP for a test?

          • 2. Re: Topic messages not being delivered to all subscribers in cluster
            mackerman

            Same behaviour.

             

            Built with 2.2.8.EAP.GA:

             

            14:17:55,060 INFO  [org.hornetq.core.server.impl.HornetQServerImpl] (MSC service thread 1-4) HornetQ Server version 2.2.8.GA (HQ_2_2_8_EAP_GA, 122) [dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba]) started

             

            Upon starting server 2, server 1 log shows:

             

            14:18:28,476 INFO  [org.hornetq.core.server.cluster.impl.BridgeImpl] (Thread-14 (HornetQ-server-HornetQServerImpl::serverUUID=dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba-989301669)) Bridge ClusterConnectionBridge@222b00 [name=sf.bob-cluster.6753eaf9-0b1f-11e1-86b2-0025b3289f31, queue=QueueImpl[name=sf.bob-cluster.6753eaf9-0b1f-11e1-86b2-0025b3289f31, postOffice=PostOfficeImpl [server=HornetQServerImpl::serverUUID=dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba]]@728dc3c3 targetConnector=ServerLocatorImpl (identity=(Cluster-connection-bridge::ClusterConnectionBridge@222b00 [name=sf.bob-cluster.6753eaf9-0b1f-11e1-86b2-0025b3289f31, queue=QueueImpl[name=sf.bob-cluster.6753eaf9-0b1f-11e1-86b2-0025b3289f31, postOffice=PostOfficeImpl [server=HornetQServerImpl::serverUUID=dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba]]@728dc3c3 targetConnector=ServerLocatorImpl [initialConnectors=[org-hornetq-core-remoting-impl-netty-NettyConnectorFactory?port=5445&host=10-2-62-28], discoveryGroupConfiguration=null]]::ClusterConnectionImpl@2032131172[nodeUUID=dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba, connector=org-hornetq-core-remoting-impl-netty-NettyConnectorFactory?port=5445&host=10-2-62-28, address=jms, server=HornetQServerImpl::serverUUID=dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba])) [initialConnectors=[org-hornetq-core-remoting-impl-netty-NettyConnectorFactory?port=5445&host=10-2-62-28], discoveryGroupConfiguration=null]] is connected

             

            The configuration files are:

             

            <configuration xmlns="urn:hornetq"

                           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                           xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

             

                <persistence-enabled>false</persistence-enabled>

               

                <clustered>true</clustered>

             

               <connectors>

                    <connector name="in-vm">

                        <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>

                    </connector>

                  <connector name="netty-connector-localhost">

                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

                     <param key="port" value="5446"/>

                  </connector>

                  <connector name="netty-connector-server-one">

                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

                     <param key="port" value="5445"/>

                    <param key="host" value="10.2.62.28" />

                  </connector>

                  <connector name="netty-connector-server-two">

                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

                     <param key="port" value="5445"/>

                    <param key="host" value="10.2.62.27" />

                  </connector>

               </connectors>

              

               <!-- Acceptors -->

               <acceptors>

                    <acceptor name="in-vm">

                        <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>

                    </acceptor>

                    <acceptor name="netty-acceptor-5445">

                       <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

                       <param key="port" value="5445"/>

                        <param key="host" value="0.0.0.0" />

                    </acceptor>

                    <acceptor name="netty-acceptor-localhost">

                       <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

                       <param key="port" value="5446"/>

                        <param key="host" value="0.0.0.0" />

                    </acceptor>

               </acceptors>

              

               <cluster-connections>

                  <cluster-connection name="bob-cluster">

                     <address>jms</address>

                     <connector-ref>netty-connector-server-one</connector-ref>

                     <retry-interval>1000</retry-interval>

                     <use-duplicate-detection>true</use-duplicate-detection>

                     <forward-when-no-consumers>true</forward-when-no-consumers>

                     <max-hops>1</max-hops>

                     <static-connectors>

                        <connector-ref>netty-connector-server-one</connector-ref>

                     </static-connectors>

                  </cluster-connection>

               </cluster-connections>

              

                <!-- Other config -->

             

                <security-settings>

                    <!--security for example queue-->

                    <security-setting match="#">

                        <permission type="createDurableQueue" roles="guest"/>

                        <permission type="deleteDurableQueue" roles="guest"/>

                        <permission type="createNonDurableQueue" roles="guest"/>

                        <permission type="deleteNonDurableQueue" roles="guest"/>

                        <permission type="consume" roles="guest"/>

                        <permission type="send" roles="guest"/>

                    </security-setting>

                </security-settings>

             

            </configuration>

             

            <configuration xmlns="urn:hornetq"

                           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                           xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

             

                <connection-factory name="ConnectionFactory">

                    <connectors>

                        <connector-ref connector-name="netty-connector-server-one" />

                        <connector-ref connector-name="netty-connector-server-two" />

                    </connectors>

                    <entries>

                        <entry name="ConnectionFactory" />

                    </entries>

                </connection-factory>

                          

                <topic name="asyncEvents">

                    <entry name="/topic/asyncEvents"/>

                </topic>

               

                <queue name="apiRequests">

                    <entry name="/queue/apiRequests"/>

                </queue>

             

            </configuration>