3 Replies Latest reply on Nov 3, 2011 4:04 AM by andrey.vorobiev

    Not all messages are delivered in a symmetric HornetQ cluster

    andrey.vorobiev

      I have the following symmetric cluster configuration:

       

      hornetq-configuration.xml:

      <connectors>
              <connector name="netty">
                  <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                  <param key="host"  value="10.0.1.32"/>
                  <param key="port"  value="1254"/>
              </connector>
              <connector name="node2-connector">
                  <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                  <param key="host" value="10.0.2.199"/>
                  <param key="port" value="1254"/>
              </connector>
      </connectors>
      
      <cluster-connections>
              <cluster-connection name="my-cluster">
                  <address>jms</address>
                  <connector-ref>netty</connector-ref>
                  <retry-interval>500</retry-interval>
                  <use-duplicate-detection>true</use-duplicate-detection>
                  <forward-when-no-consumers>true</forward-when-no-consumers>
                  <max-hops>1</max-hops>
                  <static-connectors>
                      <connector-ref>node2-connector</connector-ref>
                  </static-connectors>
              </cluster-connection>
      </cluster-connections>
      
      <address-settings>
            <!--default for catch all-->
              <address-setting match="#">
                  <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                  <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                  <redelivery-delay>0</redelivery-delay>
                  <redistribution-delay>0</redistribution-delay>
                  <max-size-bytes>10485760</max-size-bytes>
                  <message-counter-history-day-limit>10</message-counter-history-day-limit>
                  <address-full-policy>BLOCK</address-full-policy>
              </address-setting>
      </address-settings>
      

       

      hornetq-jms.xml

       

      <connection-factory name="remote-cf">
            <xa>true</xa>
            <ha>true</ha>
            <connectors>
                  <connector-ref connector-name="netty"/>
                  <connector-ref connector-name="node2-connector"/>
      
            </connectors>
            <entries>
                  <entry name="jms/remote-cf"/>
            </entries>
      </connection-factory>
      
      <queue name="FakeQueue">
             <entry name="jms/fakequeue"/>
      </queue>
      

       

      The second node is configured in the same way.

       

      Message producer code:

      public class Main
      {
          private static final String prefix = UUID.randomUUID().toString();
      
          private static AtomicInteger counter = new AtomicInteger();
      
          public static InitialContext getInitialContext() throws Exception
          {
              Properties properties = new Properties();
      
              properties.put(INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
      
              properties.put(PROVIDER_URL, "jnp://10.0.1.32:1299");
      
              properties.put(URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
      
              return new InitialContext(properties);
          }
      
          public static ConnectionFactory getConnectionFactory() throws Exception
          {
              return (ConnectionFactory) getInitialContext().lookup("jms/remote-cf");
          }
      
          public static Destination getDestination() throws Exception
          {
              return (Destination) getInitialContext().lookup("jms/fakequeue");
          }
      
          public static void main(String[] args) throws Exception
          {
              ConnectionFactory cf = getConnectionFactory();
      
      
              for (int i = 0; i < 10; i++)
              {
                  Connection connection = cf.createConnection();
      
                  connection.start();
      
                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
                  String ip = getIpBy(getRemoteAddress(session));
      
                  MessageProducer producer = session.createProducer(getDestination());
                  producer.setDeliveryMode(DeliveryMode.PERSISTENT);
      
                  TextMessage message = session.createTextMessage("Message " + prefix + "-" + counter.incrementAndGet());
      
                  System.out.println("Sending " + message.getText() + " to " + ip);
      
                  producer.send(message);
      
                  connection.close();
      
                  Thread.sleep(1000);
              }
          }
      
          public static String getRemoteAddress(Session session)
          {
              return ((DelegatingSession) ((HornetQSession) session).getCoreSession()).getConnection().getRemoteAddress();
          }
      
          public static String getIpBy(String address)
          {
              return address.split(":")[0].replace("/", "");
          }
      }
      

       

       

      Message consumer code:

      public class Subscriber
      {
          public static InitialContext getInitialContext() throws Exception
          {
              Properties properties = new Properties();
      
              properties.put(INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
      
              properties.put(PROVIDER_URL, "jnp://10.0.1.32:1299");
      
              properties.put(URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
      
              return new InitialContext(properties);
          }
      
          public static ConnectionFactory getConnectionFactory() throws Exception
          {
              return (ConnectionFactory) getInitialContext().lookup("jms/remote-cf");
          }
      
          public static Destination getDestination() throws Exception
          {
              return (Destination) getInitialContext().lookup("jms/fakequeue");
          }
      
          public static void main(String[] args) throws Exception
          {
              ConnectionFactory cf = getConnectionFactory();
      
              Connection connection = cf.createConnection();
      
              connection.start();
      
              connection.setExceptionListener(new ExceptionListener()
              {
                  public void onException(JMSException exception)
                  {
                      exception.printStackTrace();
                  }
              });
      
              Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
              System.out.println(getRemoteAddress(session));
      
              MessageConsumer consumer = session.createConsumer(getDestination());
      
              consumer.setMessageListener(new MessageListener()
              {
                  public void onMessage(Message message)
                  {
                      try
                      {
                          TextMessage m = (TextMessage) message;
      
                          System.out.println("message " + m.getText());
                      }
                      catch (Exception e)
                      {
                          e.printStackTrace();
                      }
                  }
              });
      
              Thread.sleep(5 * 60 * 1000);
          }
      
          public static String getRemoteAddress(Session session)
          {
              return ((DelegatingSession) ((HornetQSession) session).getCoreSession()).getConnection().getRemoteAddress();
          }
      }
      

       

      Expected behavior: all messages sent by the producer are received by the consumer (it should not matter which node the consumer is connected to).

       

      But as a result I get the following:

      Sender:

      Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-1 to 10.0.2.199
      Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-2 to 10.0.1.32
      Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-3 to 10.0.2.199
      Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-4 to 10.0.1.32
      Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-5 to 10.0.2.199
      Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-6 to 10.0.1.32
      Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-7 to 10.0.2.199
      Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-8 to 10.0.1.32
      Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-9 to 10.0.2.199
      Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-10 to 10.0.1.32
      

      Consumer:

      /10.0.1.32:1254
      message Message 303b5854-0806-40eb-86a3-e3a92eafdade-2
      message Message 303b5854-0806-40eb-86a3-e3a92eafdade-3
      message Message 303b5854-0806-40eb-86a3-e3a92eafdade-6
      message Message 303b5854-0806-40eb-86a3-e3a92eafdade-7
      message Message 303b5854-0806-40eb-86a3-e3a92eafdade-10
      

       

      So the question is: where are the other five messages? :)