14 Replies Latest reply on Oct 24, 2012 8:07 AM by qtm

    Information about core bridges at clustering

    qtm

      Hi,

       

      When a cluster is created, some core bridges are created with it. My question is whether they use batching. I couldn't find any way to customize them. Do they work the same way regardless of how many messages they must move, or do they adapt to the load on the system?

       

      Thanks

        • 1. Re: Information about core bridges at clustering
          qtm

          One more thing: is there an option to disable load-balancing when producing messages?

           

          For example, if a cluster has 3 nodes and a producer is connected to one node, all the messages it produces should stay on that node; the other 2 nodes shouldn't each get 1/3 of the messages.

          • 2. Re: Information about core bridges at clustering
            clebert.suconic

            Yes, the messages are batched.

            Regarding load balancing: you can define the cluster connections per destination, so you could simply not define a cluster connection for the specific destination you don't want to be shared.
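A cluster connection's `<address>` setting is matched as a prefix: the connection applies only to messages sent to an address that starts with that value, and JMS queues live under addresses of the form `jms.queue.<name>` (as in the `address=jms.queue.testQueue` log line further down). The sketch below models only that matching rule, to make it easy to see which destinations a given prefix would load-balance. The class is hypothetical, it does not call HornetQ, and `jms.queue.shared` is an invented example prefix.

```java
import java.util.List;

// Simplified model of how a cluster connection's <address> selects the
// destinations it load-balances: a plain string-prefix match.
// Hypothetical class for illustration; not part of HornetQ.
final class ClusterAddressMatch {

    /** True if any configured cluster-connection address is a prefix of the destination address. */
    static boolean isClustered(List<String> clusterConnectionAddresses, String destinationAddress) {
        for (String prefix : clusterConnectionAddresses) {
            if (destinationAddress.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // A broad prefix such as "jms" covers every JMS queue and topic.
        List<String> broad = List.of("jms");
        // A narrower, hypothetical prefix covers only queues whose names start with "shared".
        List<String> narrow = List.of("jms.queue.shared");

        System.out.println(isClustered(broad, "jms.queue.testQueue"));     // true
        System.out.println(isClustered(narrow, "jms.queue.testQueue"));    // false
        System.out.println(isClustered(narrow, "jms.queue.sharedOrders")); // true
    }
}
```

With only the narrow prefix configured, messages for `jms.queue.testQueue` fall outside every cluster connection, so they are not load-balanced and stay on the node they were sent to.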

            • 3. Re: Information about core bridges at clustering
              qtm

              Hi,

              Consider the following setup: a cluster of several nodes, cluster connections defined for all of them, and slow hardware. I produce a lot of messages to each node (about 30 producers, each with its own thread; sessions are per thread). When the producers finish, they report roughly 1000-4000 messages/second (depending on the number of threads and on whether the producers are remote or local). If I browse the queues immediately after the producers have finished, I can see that not all the messages have arrived and that they are still coming (another refresh shows more messages), even though my client producers finished long ago. So I guess the messages are kept in a buffer somewhere and the inserting into the journal keeps happening.

              If I leave them alone, the transfer finishes successfully and everything is OK. But if I restart one of the servers while the messages are being transferred from the buffers to the queues (I could be wrong about the mechanism), I get a lot of errors. When the server finishes restarting, the transfer resumes, but at the end there are a lot of duplicates in the queue (for 30k messages I get 1k-2k duplicates). When the server shuts down, the errors look like this:

              2012-10-16 08:08:26,166 ERROR [org.hornetq.core.server.cluster.impl.BridgeImpl] Failed to ack: java.lang.IllegalStateException: Journal must be loaded first
                  at org.hornetq.core.journal.impl.JournalImpl.appendUpdateRecord(JournalImpl.java:897) [hornetq-core-2.2.16.Final.jar:2.2.16.Final (HQ_2_2_16_FINAL, 122)]
                  at org.hornetq.core.persistence.impl.journal.JournalStorageManager.storeAcknowledge(JournalStorageManager.java:572) [hornetq-core-2.2.16.Final.jar:2.2.16.Final (HQ_2_2_16_FINAL, 122)]
                  at org.hornetq.core.server.impl.QueueImpl.acknowledge(QueueImpl.java:888) [hornetq-core-2.2.16.Final.jar:2.2.16.Final (HQ_2_2_16_FINAL, 122)]
                  at org.hornetq.core.server.cluster.impl.BridgeImpl.sendAcknowledged(BridgeImpl.java:463) [hornetq-core-2.2.16.Final.jar:2.2.16.Final (HQ_2_2_16_FINAL, 122)]
                  at org.hornetq.core.client.impl.ClientSessionImpl.commandConfirmed(ClientSessionImpl.java:1282) [hornetq-core-2.2.16.Final.jar:2.2.16.Final (HQ_2_2_16_FINAL, 122)]
                  at org.hornetq.core.protocol.core.impl.ChannelImpl.clearUpTo(ChannelImpl.java:552) [hornetq-core-2.2.16.Final.jar:2.2.16.Final (HQ_2_2_16_FINAL, 122)]
                  at org.hornetq.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:477) [hornetq-core-2.2.16.Final.jar:2.2.16.Final (HQ_2_2_16_FINAL, 122)]
                  at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:556) [hornetq-core-2.2.16.Final.jar:2.2.16.Final (HQ_2_2_16_FINAL, 122)]
                  at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:517) [hornetq-core-2.2.16.Final.jar:2.2.16.Final (HQ_2_2_16_FINAL, 122)]

               

              At the restart I get:

              2012-10-16 08:41:52,232 WARN  [org.hornetq.core.journal.impl.JournalImpl] (MSC service thread 1-10) Uncommitted transaction with id 87568 found and discarded

               

               

              I'm using paging and redistribution-delay=-1.

               

              If I test with just one server, the transfer is much faster and I couldn't reproduce this issue (I can't restart the server while the transfer is happening, because it finishes too quickly). That's why I asked whether the bridges use batching: on my slow hardware they are a bottleneck and produce duplicates in the situation described above.

               

              I'd appreciate any suggestions on this issue; duplicates are not OK.

              • 4. Re: Information about core bridges at clustering
                clebert.suconic

                > If I browse the queues immediately after the producers have finished I can see that not all the messages have arrived and that they are still coming

                 

                Two things: how are you sending? Transacted? Durable?

                 

                I - Non-transacted / non-durable sends are not synchronous, so your producer can finish before the messages have actually arrived on the server.

                 

                II - Messages are kept in a buffer between the servers and are flushed later.
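One way to picture the buffering, as I understand HornetQ clustering: a message counts as sent once the node the producer is connected to has accepted it; if it is routed to a queue on another node, it waits in a local store-and-forward queue until the cluster bridge moves it, a batch at a time. The toy model below is deterministic and uses hypothetical names and an arbitrary batch size of 128. It illustrates the timing only and is not HornetQ's bridge implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of one store-and-forward hop between two cluster nodes.
// send() returns as soon as the message is stored locally; forwardBatch()
// plays the role of the bridge, moving at most batchSize messages per call.
final class StoreAndForwardModel {
    private final Deque<String> localStoreAndForwardQueue = new ArrayDeque<>();
    private final List<String> remoteQueue = new ArrayList<>();

    void send(String message) {
        localStoreAndForwardQueue.addLast(message); // from the producer's view, the send is done here
    }

    /** Moves up to batchSize messages to the remote queue; returns how many were moved. */
    int forwardBatch(int batchSize) {
        int moved = 0;
        while (moved < batchSize && !localStoreAndForwardQueue.isEmpty()) {
            remoteQueue.add(localStoreAndForwardQueue.pollFirst());
            moved++;
        }
        return moved;
    }

    int pending() { return localStoreAndForwardQueue.size(); }

    int delivered() { return remoteQueue.size(); }

    public static void main(String[] args) {
        StoreAndForwardModel hop = new StoreAndForwardModel();
        for (int i = 1; i <= 1000; i++) {
            hop.send("Mes " + i);
        }
        // Every send has returned, yet nothing has reached the remote node yet.
        System.out.println(hop.delivered() + " delivered, " + hop.pending() + " pending"); // 0 delivered, 1000 pending
        // Each bridge step is like another browse refresh showing more messages.
        hop.forwardBatch(128);
        System.out.println(hop.delivered() + " delivered, " + hop.pending() + " pending"); // 128 delivered, 872 pending
    }
}
```

On slow storage every forwarding step takes longer, so the backlog stays visible for longer after the producers are done.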

                 

                 

                 

                Also: make sure you size the duplicate detection accordingly. If you send too many messages per second, having more messages in flight than the duplicate-detection cache can hold may eventually cause some duplicates to be missed. You should probably increase that cache size.
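Why the cache size matters: duplicate detection can only reject an ID it still remembers. The sketch models the ID cache as a fixed-size ring of recently seen IDs plus a set for lookups, so once more than `capacity` newer IDs have passed through, an old ID is evicted and a late re-send of it is accepted as new. This is a simplified model under that assumption, not HornetQ's implementation, and the capacity of 3 is chosen only to make the effect visible.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified bounded duplicate-ID cache: remembers at most `capacity` IDs,
// overwriting the oldest entry (ring buffer) when it is full.
final class BoundedDuplicateIdCache {
    private final String[] ring;
    private final Set<String> seen = new HashSet<>();
    private int next; // slot that will be overwritten next

    BoundedDuplicateIdCache(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be > 0");
        ring = new String[capacity];
    }

    /** Returns true if the ID is new (message accepted), false if it is a detected duplicate. */
    boolean accept(String id) {
        if (seen.contains(id)) {
            return false;
        }
        String evicted = ring[next];
        if (evicted != null) {
            seen.remove(evicted); // the oldest ID is forgotten
        }
        ring[next] = id;
        seen.add(id);
        next = (next + 1) % ring.length;
        return true;
    }

    public static void main(String[] args) {
        BoundedDuplicateIdCache cache = new BoundedDuplicateIdCache(3);
        cache.accept("A");
        System.out.println(cache.accept("A")); // false: duplicate caught
        cache.accept("B");
        cache.accept("C");
        cache.accept("D");                     // evicts "A"
        System.out.println(cache.accept("A")); // true: the duplicate slips through
    }
}
```

In this model the same thing happens at any capacity once more IDs than the cache holds pass through before the re-send arrives.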


                • 5. Re: Information about core bridges at clustering
                  qtm

                  Thanks for answering.

                   

                  Yes, durable and transacted. This doesn't happen when I don't use transactions. I can reproduce it with both remote transacted producers and local transacted producers (I'm calling the beans remotely).

                  As I said:

                  At the restart I get:

                  2012-10-16 08:41:52,232 WARN  [org.hornetq.core.journal.impl.JournalImpl] (MSC service thread 1-10) Uncommitted transaction with id 87568 found and discarded

                   

                  Note that all the messages do arrive at the destination; my problem is that a lot of duplicates arrive as well. I have enabled duplicate detection on the cluster connection (it's the default anyway). I can see messages like:

                  WARN  [org.hornetq.core.postoffice.impl.PostOfficeImpl] (Old I/O server worker (parentId: 1423449306, [id: 0x54d81cda, /[my_ip]:5447])) Duplicate message detected through the bridge - message will not be routed. Message information:

                  ServerMessage[messageID=72638,priority=4, bodySize=452,expiration=0, durable=true, address=jms.queue.testQueue,properties=TypedProperties[{_HQ_BRIDGE_DUP=[B@176730bb, _HQ_ROUTE_TO=[B@77b050fd,  prop_long=451,  JMSReplyTo=jms.queue.testQueue, prop_str=str1!}]]@1486180206

                   

                  but not all the duplicates are filtered.
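To tell duplicated deliveries apart from the "Duplicate message detected" warnings, the target queue has to be counted against a key that is unique per sent message. The producer code in this thread does not set one (every thread reuses the bodies `Mes 1` to `Mes 1000`), so the sketch assumes a hypothetical unique string such as `<producer>-<sequence>` stamped on each message, and counts the surplus copies seen while draining or browsing the queue.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Counts surplus copies per unique message key. The key format
// "<producer>-<sequence>" is a hypothetical convention set by the producer.
final class DuplicateCounter {

    /** Returns the number of messages that are extra copies of an already-seen key. */
    static int countDuplicates(List<String> keysInQueue) {
        Map<String, Integer> occurrences = new HashMap<>();
        int duplicates = 0;
        for (String key : keysInQueue) {
            // merge() returns the updated count; anything above 1 is a surplus copy
            if (occurrences.merge(key, 1, Integer::sum) > 1) {
                duplicates++;
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        List<String> browsed = List.of("t1-1", "t1-2", "t2-1", "t1-2", "t1-2");
        System.out.println(countDuplicates(browsed)); // 2
    }
}
```

On the producer side the key could be set with the standard JMS call `message.setStringProperty("test_key", producerName + "-" + (i + 1))`, where `test_key` and `producerName` are hypothetical; the consumer then collects `getStringProperty("test_key")` from each message it receives or browses.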

                   

                  How do I increase the buffer size?

                  • 7. Re: Information about core bridges at clustering
                    qtm

                    Hi,

                     

                    I've added

                    <id-cache-size>50000</id-cache-size>
                    <persist-id-cache>true</persist-id-cache>

                    to my HornetQ server configuration, but this didn't solve my problem: duplicates still exist. I've attached my config file for one node (the others are similar; only the file system paths and ports differ).
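A rough sizing rule for `id-cache-size`: assuming one cache per address, it holds about `id-cache-size / rate` seconds of traffic to that address, and a re-sent message can only be recognised if it comes back inside that window. The rates below are the 1000-4000 messages/second reported earlier in this thread; putting the whole rate on a single address is a simplifying assumption made for the arithmetic.

```java
// Back-of-the-envelope coverage window for a duplicate-ID cache:
// how many seconds of traffic to one address the cache can remember.
final class IdCacheWindow {

    static double coverageSeconds(int idCacheSize, double messagesPerSecond) {
        if (messagesPerSecond <= 0) throw new IllegalArgumentException("rate must be > 0");
        return idCacheSize / messagesPerSecond;
    }

    public static void main(String[] args) {
        System.out.println(coverageSeconds(50_000, 4_000)); // 12.5
        System.out.println(coverageSeconds(50_000, 1_000)); // 50.0
    }
}
```

If a restart, plus the backlog still waiting to be forwarded, spans more messages than the cache holds, duplicates can get through even with detection enabled.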

                     

                    Thanks for your help

                    • 8. Re: Information about core bridges at clustering
                      clebert.suconic

                      What do you mean by "Duplicates still exist"? That you see those warnings about duplicates, or that you are actually seeing duplicated messages?

                      • 9. Re: Information about core bridges at clustering
                        clebert.suconic

                        At the restart I get:

                        2012-10-16 08:41:52,232 WARN  [org.hornetq.core.journal.impl.JournalImpl] (MSC service thread 1-10) Uncommitted transaction with id 87568 found and discarded

                         

                         

                        One thing you need to understand: when you restart the system, any uncommitted transaction is rolled back.

                         

                         

                        That means the server didn't have time to write the commit before you killed or stopped it.

                         

                        If that TX contained any acks, those messages will be re-sent, which may give you the impression of a redelivery.
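The sequence described above can be replayed in a few lines. In this model the bridge forwards each message to the target, the target records its duplicate ID (in the log above, the bridge's ID appears to travel as the `_HQ_BRIDGE_DUP` property), and the source acknowledges the message inside a transaction. Stopping the source before that transaction commits leaves every message on the source, so after restart the same messages are forwarded again, and whether they end up duplicated depends only on whether the target still recognises the IDs. The names are hypothetical and the model ignores everything else the real bridge does (paging, batching, confirmation windows).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy replay of "forward, then ack in a TX" across a restart of the source node.
final class BridgeRedeliveryModel {

    /**
     * Forwards `ids` once, "stops" before the source-side ack transaction commits,
     * then forwards them again after restart. Returns what the target queue holds.
     */
    static List<String> replay(List<String> ids, boolean targetDetectsDuplicates) {
        List<String> targetQueue = new ArrayList<>();
        Set<String> targetSeenIds = new HashSet<>();

        for (int attempt = 1; attempt <= 2; attempt++) { // attempt 2 happens after the restart
            for (String id : ids) {
                boolean isNew = targetSeenIds.add(id);
                if (isNew || !targetDetectsDuplicates) {
                    targetQueue.add(id);
                }
            }
            // Attempt 1 ends with the ack transaction still uncommitted; on restart it is
            // discarded, so the source still holds every message and forwards it again.
        }
        return targetQueue;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("m1", "m2", "m3");
        System.out.println(replay(batch, false)); // [m1, m2, m3, m1, m2, m3]
        System.out.println(replay(batch, true));  // [m1, m2, m3]
    }
}
```

The second result assumes the target still remembers every ID when the re-send arrives; with a bounded ID cache that only holds while the backlog is smaller than the cache.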

                         

                         

                        It seems you are also sending transactionally. How many messages do you send per commit?

                        • 10. Re: Information about core bridges at clustering
                          qtm

                          Hi Clebert,

                          By "Duplicates still exist" I mean that at the end I can see duplicates in the target queues, so they are not completely filtered.

                          All the transactions were committed from the client code's point of view; I always wait for the client producers to finish. Each transaction has 50 messages, but I have several threads working with the same connection factory (with different sessions, though).

                           

                          Here's the client code:

                           

                          public static void main(String[] args) throws Exception {
                              Thread t1 = createThread("[server_1]", 4449);
                              Thread t2 = createThread("[server_2]", 4449);
                              Thread t3 = createThread("[server_3]", 4449);

                              t1.start();
                              t2.start();
                              t3.start();

                              t1.join();
                              t2.join();
                              t3.join();
                          }

                             

                          public static void produceMessages(QueueConnectionFactory factory, Queue q, boolean batched,
                                  boolean singleConnection, MesType type) throws Exception {
                              List<QueueConnection> cons = new ArrayList<QueueConnection>();
                              int messageNumber = 1000;
                              int numberOfThreads = 10;
                              try {
                                  ExecutorService service = Executors.newCachedThreadPool();

                                  // Either one connection shared by all threads, or one connection per thread.
                                  int connectionCount = singleConnection ? 1 : numberOfThreads;
                                  for (int i = 0; i < connectionCount; i++) {
                                      QueueConnection qcon = factory.createQueueConnection("App1", "App2");
                                      qcon.start();
                                      cons.add(qcon);
                                  }

                                  long start = System.currentTimeMillis();
                                  for (int i = 0; i < numberOfThreads; i++) {
                                      QueueConnection qcon = singleConnection ? cons.get(0) : cons.get(i);
                                      service.execute(new ProducingThread(q, qcon, batched, messageNumber, type));
                                  }
                                  service.shutdown();
                                  service.awaitTermination(1, TimeUnit.DAYS);
                                  long end = System.currentTimeMillis();

                                  System.out.println("duration = " + (end - start));
                                  System.out.println("messages/second = " + numberOfThreads * messageNumber * 1000f / (end - start));
                              } catch (Exception e) {
                                  e.printStackTrace();
                              } finally {
                                  for (QueueConnection con : cons) {
                                      con.close();
                                  }
                              }
                          }

                             

                          private static Context getContext(String host, int port) throws Exception {
                              final Properties env1 = new Properties();
                              env1.put(Context.INITIAL_CONTEXT_FACTORY, org.jboss.naming.remote.client.InitialContextFactory.class.getName());
                              env1.put(Context.PROVIDER_URL, "remote://" + host + ":" + port);
                              env1.put(Context.SECURITY_PRINCIPAL, "App1");
                              env1.put(Context.SECURITY_CREDENTIALS, "App2");
                              return new InitialContext(env1);
                          }

                             

                          private static Thread createThread(final String host, final int port) {
                              return new Thread(new Runnable() {
                                  public void run() {
                                      try {
                                          Context ctx = getContext(host, port);
                                          final Queue q = (Queue) ctx.lookup("jms/queue/test");
                                          final QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("/jms/RemoteConnectionFactory");
                                          produceMessages(factory, q, true, false, MesType.TEXT);
                                      } catch (Exception e) {
                                          e.printStackTrace();
                                      }
                                  }
                              });
                          }

                           

                           

                          // Submitted to an ExecutorService, so it only needs to be a Runnable.
                          public class ProducingThread implements Runnable {
                              private final Queue queue;
                              private final QueueConnection queueConnection;
                              private final int count;
                              private final MesType type;
                              private final boolean batched;

                              public ProducingThread(Queue queue, QueueConnection queueConnection, boolean batched, int count, MesType type) {
                                  this.queue = queue;
                                  this.queueConnection = queueConnection;
                                  this.batched = batched;
                                  this.count = count;
                                  this.type = type;
                              }

                              public void run() {
                                  try {
                                      // 50 = messages per transaction when batched
                                      if (MesType.TEXT.equals(type)) {
                                          TestUtils.produce(queueConnection, queue, batched, 50, count);
                                      } else if (MesType.MAP.equals(type)) {
                                          TestUtils.produceMap(queueConnection, queue, batched, 50, count);
                                      } else if (MesType.OBJECT.equals(type)) {
                                          TestUtils.produceObjects(queueConnection, queue, batched, 50, count);
                                      }
                                  } catch (Exception e) {
                                      e.printStackTrace();
                                  }
                                  // Report the pool thread that actually ran this task.
                                  System.out.println("thread " + Thread.currentThread().getId() + " has ended");
                              }
                          }

                          • 11. Re: Information about core bridges at clustering
                            qtm

                            And finally (from TestUtils):

                             

                            public static void produce(QueueSession queueSession, QueueSender queueSender, Queue queue, int numberOfMessages,
                                    boolean batched, int batchSize) throws Exception {
                                TextMessage message = queueSession.createTextMessage();
                                for (int i = 0; i < numberOfMessages; i++) {
                                    message.setJMSReplyTo(queue);
                                    message.setLongProperty("prop_long", 451L);
                                    message.setStringProperty("prop_str", "str1!");
                                    message.setText("Mes " + (i + 1));
                                    queueSender.send(message);
                                    // Commit after every full batch of batchSize messages.
                                    if (batched && batchSize > 0 && (i + 1) % batchSize == 0) {
                                        queueSession.commit();
                                    }
                                }
                                // Commit a trailing partial batch; otherwise it is rolled back when the session closes.
                                if (batched && (batchSize <= 0 || numberOfMessages % batchSize != 0)) {
                                    queueSession.commit();
                                }
                            }

                             

                            public static void produce(QueueConnection queueConnection, Queue queue, boolean batched, int batchSize,
                                    int numberOfMessages) throws Exception {
                                QueueSession queueSession = null;
                                try {
                                    // Transacted session when batching; the acknowledge mode is ignored for transacted sessions.
                                    queueSession = queueConnection.createQueueSession(batched,
                                            batched ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
                                    QueueSender queueSender = queueSession.createSender(queue);
                                    produce(queueSession, queueSender, queue, numberOfMessages, batched, batchSize);
                                } finally {
                                    if (queueSession != null) {
                                        try {
                                            queueSession.close();
                                        } catch (JMSException e) {
                                            System.out.println("Closing error: " + e.toString());
                                        }
                                    }
                                }
                            }
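A commit-every-N loop is easy to get wrong at its edges: the first message, and a trailing partial batch that is never committed and is rolled back when the session closes. The sketch counts how many messages end up committed for a guard of the form `i != 0 && (i + 1) % batchSize == 0` versus a policy that commits every full batch and then the remainder. It is plain arithmetic with no JMS calls, and the class name is hypothetical.

```java
// Counts committed messages for two commit policies of a transacted send loop.
final class CommitBoundaries {

    /** Guard of the form: commit when i != 0 and (i + 1) % batchSize == 0. */
    static int committedWithGuard(int numberOfMessages, int batchSize) {
        int committed = 0;
        for (int i = 0; i < numberOfMessages; i++) {
            if (i != 0 && batchSize != 0 && (i + 1) % batchSize == 0) {
                committed = i + 1; // everything sent so far is now committed
            }
        }
        return committed;
    }

    /** Commit every batchSize messages, then once more for any remainder. */
    static int committedWithRemainder(int numberOfMessages, int batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be > 0");
        int committed = 0;
        for (int i = 0; i < numberOfMessages; i++) {
            if ((i + 1) % batchSize == 0) {
                committed = i + 1;
            }
        }
        if (committed < numberOfMessages) {
            committed = numberOfMessages; // final commit for the partial batch
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(committedWithGuard(1000, 50));     // 1000: 20 full batches
        System.out.println(committedWithGuard(1010, 50));     // 1000: the last 10 are rolled back on close
        System.out.println(committedWithRemainder(1010, 50)); // 1010
        System.out.println(committedWithGuard(1, 1));         // 0: i != 0 skips the only commit
    }
}
```

With the values used in this thread (1000 messages per thread, batches of 50) both policies commit everything, so the commit logic is not what produces the duplicates; 3 servers × 10 threads × 1000 messages also accounts for the 30k total mentioned earlier.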

                            • 12. Re: Information about core bridges at clustering
                              clebert.suconic

                              Can you:

                               

                              - verify whether this still happens on a checkout of Branch_2_2_AS7 or on the Beta we just released?

                              - if it still happens, can you attach your running example so I can take a look? (Use the advanced editor on this forum to attach files.)

                              • 13. Re: Information about core bridges at clustering
                                qtm

                                Hi Clebert,

                                 

                                It's fixed in 2.3.0Beta. I could reproduce the issue with HQ 2.2.14 and, in my tests, with JBoss 7.1.2 (HQ 2.2.16), but it's solved in 2.3.0Beta.

                                Since I need the replication too, I can't wait for 2.3.0 Final.

                                 

                                Thanks for your help

                                • 14. Re: Information about core bridges at clustering
                                  qtm

                                  I guess the JIRA issue for this bug is https://issues.jboss.org/browse/HORNETQ-1017 , right?