5 Replies Latest reply on Jun 2, 2010 1:01 AM by Josh Wo

    bridges on two queues of same address not working. One bridge get two copies

    Josh Wo Newbie

      I have one address with two queues and trying to configure bridges for each queue in the same server, so that slower/stopped consumer has its own queue from a "topic" address and won't block each other. I am using core API and I just sync'ed to trunk head this morning (so latest of 2.1 RC1)

       

      However, this doesn't seem to work correctly. I see all the messages are deliverred to one bridge and the othe bridge has message count as zero. Also, the one receiving message have both copies of the message and therefore is duplicate for the consumer attached.

      I can see from log and jconsole that both bridges are connected correctly. Also, if I remove one bridge or another, they can both get messages correctly but not when they both on.

       

      I am pretty new to this product. So maybe I missed something basic configuration so the whole configuratin is copied here. I am using netty not the in-vm one. The server is started in standalone momde. I do prefer to use in-vm connector but it doesn't work if I use the one configured below. Is it possible?

       

      Feature wise, is there a better way to have a topic address consumers won't block each other if paging is enabled? (basically having each queue a copy rather than the reference).

       

      my configuration for bridges. Messages of both web1 and web2 goes to either local1 or local2, pretty randomly.

       

      <configuration xmlns="urn:hornetq"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

       

         <paging-directory>${data.dir:../data}/paging</paging-directory>
        
         <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
        
         <journal-directory>${data.dir:../data}/journal</journal-directory>
        
         <journal-min-files>10</journal-min-files>
         <security-enabled>false</security-enabled>
       
        
         <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
        
         <connectors>
            <connector name="netty">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
               <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
            </connector>
           
            <connector name="netty-invm">
               <factory-class>
                  org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory
               </factory-class>
               <param key="use-invm" value="true"/>
               <param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
            </connector>
           
            <connector name="netty-throughput">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
               <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
               <param key="batch-delay" value="50"/>
            </connector>
         </connectors>
         <acceptors>
            <acceptor name="netty">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
               <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
            </acceptor>     
            <acceptor name="netty-invm">
               <factory-class>
                  org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory
               </factory-class>
               <param key="use-invm" value="true"/>
               <param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
            </acceptor>
           
            <acceptor name="netty-throughput">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
               <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
               <param key="batch-delay" value="50"/>
               <param key="direct-deliver" value="false"/>
            </acceptor>
         </acceptors>
         <queues>
              <queue name="jms.queue.web1">
                  <address>jms.address.local</address>
                  <durable>true</durable>
              </queue>
              <queue name="jms.queue.web2">
                  <address>jms.address.local</address>
                  <durable>true</durable>
              </queue>
              <queue name="jms.queue.local1">
                  <address>jms.address.webnode.one</address>
                  <durable>true</durable>
              </queue>
              <queue name="jms.queue.local2">
                  <address>jms.address.webnode.two</address>
                  <durable>true</durable>
              </queue>
          </queues>
          <bridges>
              <bridge name="jms.bridge.local1">
                  <queue-name>jms.queue.web1</queue-name>
                  <forwarding-address>jms.address.webnode.one</forwarding-address>   
                  <retry-interval>1000</retry-interval>
                  <retry-interval-multiplier>1.0</retry-interval-multiplier>
                  <reconnect-attempts>-1</reconnect-attempts>
                  <failover-on-server-shutdown>false</failover-on-server-shutdown>
                  <use-duplicate-detection>true</use-duplicate-detection>
                  <confirmation-window-size>10000000</confirmation-window-size>
                  <connector-ref connector-name="netty"/>        
                 </bridge> 
                
                 <bridge name="jms.bridge.local2">
                  <queue-name>jms.queue.web2</queue-name>
                  <forwarding-address>jms.address.webnode.two</forwarding-address>   
                  <retry-interval>1000</retry-interval>
                  <retry-interval-multiplier>1.0</retry-interval-multiplier>
                  <reconnect-attempts>-1</reconnect-attempts>
                  <failover-on-server-shutdown>false</failover-on-server-shutdown>
                  <use-duplicate-detection>true</use-duplicate-detection>
                  <confirmation-window-size>10000000</confirmation-window-size>
                  <connector-ref connector-name="netty"/>        
                 </bridge> 
                
          </bridges>

       

         <address-settings>
            <!--default for catch all-->
            <address-setting match="#">
               <dead-letter-address>jms.queue.DLQ</dead-letter-address>
               <expiry-address>jms.queue.ExpiryQueue</expiry-address>
               <redelivery-delay>0</redelivery-delay>
               <max-size-bytes>4000</max-size-bytes>
               <page-size-bytes>1000</page-size-bytes>   
               <message-counter-history-day-limit>10</message-counter-history-day-limit>
               <address-full-policy>PAGE</address-full-policy>
            </address-setting>
         </address-settings>
        

       

       

      </configuration>

        • 1. Re: bridges on two queues of same address not working. One bridge get two copies
          Tim Fox Master

          I don't understand what you're trying to achieve here.

           

          Bridges are designed for bridging from one server to another, not on the same server.

           

          Also values for 4000 and 1000 bytes for paging max size and page size seem incredibly low.

          • 3. Re: bridges on two queues of same address not working. One bridge get two copies
            Josh Wo Newbie

            I tried diverts and it almost worked for me. So thanks first!

             

            The main reason I am trying to do it is to workaround the issue mentioned in your documentation regarding paging of an address with mulitple queue. And the reason I set the paging size so small is to simulate a scenario where paging happens with one conusmer down or slow.

             

            What I intend to implement is a replication mechenism where same "updated" data is propagated to web nodes in two data center, with very different network latency (one is huge, as it is remote).

             

            With diverts, I am designing the hornetQ config as follow:

            1. update message is published to a local address (jms.address.local), and then diverted to two local queues (jms.address.local1 and jms.address.local2), each specific to each data center. Therefore, the message is copy rather than reference.

            2. setup bridges (jms.bridge.remote1 and jms.bridge.remote2) for each local queue, and forward to a remote standalone broker server in each data center.

            3. standalone broker server divert the message to queues specific to each web node and consumed by them. (not in the example as I just consumed directly from the source queue in the test).

             

             

            It almost worked except in a negative test case, when I don't bring up the consumers, and let paging started to happen. And then I bring up the consumer a little later, it turned out that they don't get all the messages. Seems that some messages get expired as the consumer only get the first few messages in the memory (the exact number as shown on the jmx console) and skip to the messages in the the later part of queue (I put sequeunce number in the message and that is how I can tell).

             

            All my messages are set to expiration as 0 (never expire). This doesn't happen when no paging happens (basically, increase the max-size-bytes to very big number). This also doesn't happen when paging happens for the diverted queues, but no bridges are configured and messages are consumed directly from diverted queues. So seems to the problem is with the bridging, which expires the message.

             

            Is this something known or something I misconfigured? My configuration is as follow:

             

            <configuration xmlns="urn:hornetq"
                           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                           xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

             

               <paging-directory>${data.dir:../data}/paging</paging-directory>
              
               <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
              
               <journal-directory>${data.dir:../data}/journal</journal-directory>
              
               <journal-min-files>10</journal-min-files>
               <security-enabled>false</security-enabled>
             
              
               <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
              
               <connectors>
                  <connector name="netty">
                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                     <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                     <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                  </connector>
                 
                  <connector name="netty-invm">
                     <factory-class>
                        org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory
                     </factory-class>
                     <param key="use-invm" value="true"/>
                     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
                  </connector>
                 
                  <connector name="netty-throughput">
                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                     <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                     <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
                     <param key="batch-delay" value="50"/>
                  </connector>
               </connectors>
               <acceptors>
                  <acceptor name="netty">
                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                     <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                     <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                  </acceptor>     
                  <acceptor name="netty-invm">
                     <factory-class>
                        org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory
                     </factory-class>
                     <param key="use-invm" value="true"/>
                     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
                  </acceptor>
                 
                  <acceptor name="netty-throughput">
                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                     <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                     <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
                     <param key="batch-delay" value="50"/>
                     <param key="direct-deliver" value="false"/>
                  </acceptor>
               </acceptors>
               <queues>
                    <queue name="jms.queue.ExpiryQueue">
                        <address>jms.queue.ExpiryQueue</address>
                        <durable>true</durable>
                    </queue>
                   
                    <queue name="jms.queue.DLQ">
                        <address>jms.queue.DLQ</address>
                        <durable>true</durable>
                    </queue>
                   
                    <queue name="jms.queue.local">
                        <address>jms.address.local</address>
                        <durable>true</durable>
                    </queue>
                   
                    <queue name="jms.queue.local1">
                        <address>jms.address.local1</address>
                        <durable>true</durable>
                    </queue>
                    <queue name="jms.queue.local2">
                        <address>jms.address.local2</address>
                        <durable>true</durable>
                    </queue>
                    <queue name="jms.queue.remote1">
                        <address>jms.address.remote1</address>
                        <durable>true</durable>
                    </queue>       
                    <queue name="jms.queue.remote2">
                        <address>jms.address.remote2</address>
                        <durable>true</durable>
                    </queue>
                </queues>
                <diverts>
                    <divert name="jms.divert.local1">
                        <address>jms.address.local</address>
                        <forwarding-address>jms.address.local1</forwarding-address>        
                        <exclusive>true</exclusive>        
                    </divert>        
                    <divert name="jms.divert.local2">
                        <address>jms.address.local</address>
                        <forwarding-address>jms.address.local2</forwarding-address>        
                        <exclusive>true</exclusive>        
                    </divert>                     
                </diverts>
                <bridges>
                    <bridge name="jms.bridge.remote1">
                        <queue-name>jms.queue.local1</queue-name>
                        <forwarding-address>jms.address.remote1</forwarding-address>  
                        <retry-interval>1000</retry-interval>
                        <retry-interval-multiplier>1.0</retry-interval-multiplier>
                        <reconnect-attempts>-1</reconnect-attempts>
                        <failover-on-server-shutdown>false</failover-on-server-shutdown>
                        <use-duplicate-detection>true</use-duplicate-detection>
                        <confirmation-window-size>10000000</confirmation-window-size>
                        <connector-ref connector-name="netty"/>      
                    </bridge>       
                    <bridge name="jms.bridge.remote2">
                        <queue-name>jms.queue.local2</queue-name>
                        <forwarding-address>jms.address.remote2</forwarding-address>  
                        <retry-interval>1000</retry-interval>
                        <retry-interval-multiplier>1.0</retry-interval-multiplier>
                        <reconnect-attempts>-1</reconnect-attempts>
                        <failover-on-server-shutdown>false</failover-on-server-shutdown>
                        <use-duplicate-detection>true</use-duplicate-detection>
                        <confirmation-window-size>10000000</confirmation-window-size>
                        <connector-ref connector-name="netty"/>      
                    </bridge>
               </bridges>
               <address-settings>
                  <!--default for catch all-->
                  <address-setting match="#">
                     <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                     <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                     <redelivery-delay>0</redelivery-delay>
                     <max-size-bytes>4000</max-size-bytes>
                     <page-size-bytes>1000</page-size-bytes>   
                     <message-counter-history-day-limit>10</message-counter-history-day-limit>
                     <address-full-policy>PAGE</address-full-policy>
                  </address-setting>
               </address-settings>  
            </configuration>

            • 4. Re: bridges on two queues of same address not working. One bridge get two copies
              Josh Wo Newbie

              I am also getting these message when I restart the server. Can someone explain what do these messages mean?

               

              [main] 16:15:58,113 WARNING [org.hornetq.core.journal.impl.JournalImpl]  Uncommitted transaction with id 48 found and discarded
              [main] 16:15:58,113 WARNING [org.hornetq.core.journal.impl.JournalImpl]  Uncommitted transaction with id 53 found and discarded
              [main] 16:15:58,113 WARNING [org.hornetq.core.journal.impl.JournalImpl]  Uncommitted transaction with id 348 found and discarded
              [main] 16:15:58,113 WARNING [org.hornetq.core.journal.impl.JournalImpl]  Uncommitted transaction with id 351 found and discarded
              [main] 16:15:58,113 WARNING [org.hornetq.core.journal.impl.JournalImpl]  Uncommitted transaction with id 366 found and discarded