2 Replies Latest reply on Aug 2, 2010 2:03 PM by gentilim

    Netty HTTP not allowing bigger messages

    gentilim

      I'm using HornetQ 2.1.1 Final with a server/client messaging system.  I am trying to route the communication through HTTP, but messages are not being received by the client (or possibly are not being sent by the server).  It works when the message payload is small, but breaks down once the payload gets bigger.  The bigger messages are still under the large message size of 100KiB.  With my setup and sample, everything works when http-enabled is false, but once it is set to true, the client stops getting messages.  I get the same results with and without the HTTP tunneling servlet.  Is there a setting I missed, or is there an issue with how HTTP is used?  Any insight would be appreciated, thank you.

       

      Here are the Netty configs:

      <!-- Connectors -->
      <connectors>
            <connector name="netty">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="http-enabled" value="true"/>
               <param key="host" value="${jboss.bind.address:localhost}"/>
               <param key="port" value="${hornetq.remoting.netty.port:28080}"/>
            </connector>
       
            <connector name="in-vm">
               <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
               <param key="server-id" value="${hornetq.server-id:0}"/>
            </connector>
         </connectors>
       
      <!-- Acceptors -->
      <acceptors>
       
            <acceptor name="in-vm">
               <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
               <param key="server-id" value="0"/>
            </acceptor>
       
            <acceptor name="netty-acceptor">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
               <param key="http-enabled" value="true"/>
               <param key="port" value="28080"/>
            </acceptor>
       
         </acceptors>
      

       

      Connection factory:

      <connection-factory name="ConnectionFactory">
              <connectors>
               <connector-ref connector-name="in-vm"/>
              </connectors>
              <entries>
               <entry name="/ServerConnectionFactory"/>
              </entries>
          </connection-factory>
      

       

      Client test code:

              //create server connection
              Map<String, Object> props = new HashMap<String, Object>();
              props.put("host", "localhost");
              props.put("port", 28080);
              props.put("http-enabled", true);
              ClientSessionFactory factory = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName(), props, "netty"));
              ClientSession clientSession = null;
              try
              {
                  //get session
                  clientSession = factory.createSession(true, true, true, true);
                  clientSession.start();
      
                  //get queue
                  clientSession.createTemporaryQueue("jms.queue.exampleQueue", "jms.queue.exampleQueue");
                  QueueQuery q = clientSession.queueQuery(SimpleString.toSimpleString("jms.queue.exampleQueue"));
                  if(q.isExists())
                  {
                      //consume
                      ClientConsumer consumer = clientSession.createConsumer("jms.queue.exampleQueue", false);
                      consumer.setMessageHandler(new MessageHandler(){
                          @Override
                          public void onMessage(ClientMessage message)
                          {
                              log.info("Message received - " + message.getIntProperty("count"));
      
                              if(message.getBooleanProperty("end"))
                              {
                                  run = false;
                              }
                          }
                      });
      
                      //start web service
                      Iss.startQueueExample();
      
                      while(run)
                      {
                          //run
                      }
      
                      consumer.close();
                  }
                  else
                  {
                      log.error("Cannot find queue");
                  }
              }
              catch (HornetQException e)
              {
                  log.error(e);
              }
              log.info("Done");
              try
              {
                  if(clientSession != null)
                  {
                      clientSession.close();
                  }
              }
              catch (HornetQException e)
              {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
              }
              factory.close();
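
A side note on the client test code above: the empty while(run) loop spins a CPU core, and if run is a plain (non-volatile) field the change made in onMessage is not guaranteed to be seen by the waiting thread.  This is probably unrelated to the HTTP problem, but it can make a reproduction harder to trust.  Below is a minimal sketch of one alternative, assuming a CountDownLatch replaces the flag.  LatchWaitSketch and its method names are hypothetical and no HornetQ classes are used; in the real client, onMessage would be called from the MessageHandler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the client's wait loop; not part of the original test code.
final class LatchWaitSketch
{
   // Counted down once, when the message flagged as "end" is seen.
   private final CountDownLatch done = new CountDownLatch(1);

   // In the real client this logic would live in MessageHandler.onMessage(...).
   void onMessage(int count, boolean end)
   {
      System.out.println("Message received - " + count);
      if (end)
      {
         done.countDown();
      }
   }

   // Blocks until the end message arrives or the timeout expires.
   // Returns true if the end message was seen, false on timeout or interruption.
   boolean awaitEnd(long timeout, TimeUnit unit)
   {
      try
      {
         return done.await(timeout, unit);
      }
      catch (InterruptedException e)
      {
         // Restore the interrupt flag so callers can still observe it.
         Thread.currentThread().interrupt();
         return false;
      }
   }

   public static void main(String[] args) throws InterruptedException
   {
      LatchWaitSketch sketch = new LatchWaitSketch();

      // Simulates the delivery thread calling the handler.
      Thread delivery = new Thread(() -> {
         sketch.onMessage(0, false);
         sketch.onMessage(-1, true);
      });
      delivery.start();

      boolean finished = sketch.awaitEnd(5, TimeUnit.SECONDS);
      System.out.println(finished ? "Done" : "Timed out waiting for end message");
      delivery.join();
   }
}
```

The timeout also means the client exits on its own when the end message never arrives, which is the failing case described in this thread.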
      

       

      Server web service code:

       

                      logger.info("Start startQueueExample");
                      try
                      {
                          //get factory
                          javax.naming.Context initialContext = new InitialContext();
                          HornetQConnectionFactory cf = (HornetQConnectionFactory) initialContext.lookup("/ServerConnectionFactory");
                          ClientSessionFactory factory = cf.getCoreFactory();
                          initialContext.close();
      
                          //create session
                          ClientSession session = factory.createSession(true, true, true, true);
                          session.start();
      
                          //get queue
                          try
                          {
                              QueueQuery q = session.queueQuery(SimpleString.toSimpleString("jms.queue.exampleQueue"));
                              if(q.isExists() && q.getConsumerCount() > 0)
                              {
                                  //produce
                                  ClientProducer producer = session.createProducer(q.getAddress());
      
                                  //create the array to send
                                  byte[] data = new byte[8000]; //changing this size alters http results!
      
                                  for(Integer i = 0; i < 100; ++i)
                                  {
                                      ClientMessage message = session.createMessage(ClientMessage.BYTES_TYPE, false);
                                      message.putIntProperty("count", i);
                                      message.putBooleanProperty("end", false);
                                      message.putBytesProperty("data", data);
                                      producer.send(message);
                                      logger.info("Sent data - " + i);
      
                                      //delay
                                      try
                                      {
                                          Thread.sleep(500);
                                      }
                                      catch (InterruptedException e)
                                      {
                                          logger.error(e);
                                      }
                                  }
                                  ClientMessage message = session.createMessage(ClientMessage.BYTES_TYPE, false);
                                  message.putIntProperty("count", -1);
                                  message.putBooleanProperty("end", true);
                                  producer.send(message);
                                  logger.info("Sent end");
                                  producer.close();
                              }
                              else
                              {
                                  logger.info("Cannot find queue");
                              }
                          }
                          catch (HornetQException e)
                          {
                              logger.error(e);
                          }
                          session.close();
                          factory.close();
                          cf.close();
                      }
                      catch (HornetQException e)
                      {
                          logger.error(e);
                      }
                      catch (NamingException e)
                      {
                          logger.error(e);
                      }
                      try
                      {
                          Thread.sleep(500);
                      }
                      catch (InterruptedException e)
                      {
                          logger.error(e);
                      }
      

       

      When http-enabled is false, all output is as expected; all messages are sent and received.  When I change http-enabled to true (and make only that change), I get different results.  With a data byte array of size 100, everything worked fine.  With a byte array of size 8000, all messages appear to be sent, but the client only received a handful of them.  At the end of the run, there is a warning in the server console:

       

      WARN  [RemotingConnectionImpl] Connection failure has been detected: Did not receive ping from /127.0.0.1:64540. It is likely the client has exited or crashed without closing its connection, or the network between the server and client has failed. The connection will now be closed. [code=3]
      WARN  [ServerSessionImpl] Client connection failed, clearing up resources for session f142328e-9688-11df-b4ba-002219285654
      WARN  [ServerSessionImpl] Cleared up resources for session f142328e-9688-11df-b4ba-002219285654
      WARN  [ServerSessionPacketHandler] Client connection failed, clearing up resources for session f142328e-9688-11df-b4ba-002219285654
      WARN  [ServerSessionPacketHandler] Cleared up resources for session f142328e-9688-11df-b4ba-002219285654
      

       

      This warning was never seen on successful runs.  With a byte array of size 40000, all messages appear to be sent, though an exception can be seen on the server after the first couple of messages are sent, and the same warning is seen at the end as well:

       

      Sent data - 0
      Sent data - 1
      Exception in thread "pool-34-thread-1" 
      java.lang.NullPointerException
       at org.hornetq.core.remoting.impl.netty.HttpAcceptorHandler$ResponseRunner.run(HttpAcceptorHandler.java:173)
       at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
       at java.lang.Thread.run(Thread.java:619)
      Sent data - 2
      Exception in thread "pool-34-thread-2" 
      java.lang.NullPointerException
       at org.hornetq.core.remoting.impl.netty.HttpAcceptorHandler$ResponseRunner.run(HttpAcceptorHandler.java:173)
       at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
       at java.lang.Thread.run(Thread.java:619)
      Sent data - 3
      Sent data - 4
      Sent data - 5
      Sent data - 6
      Sent data - 7
      Sent data - 8
      Sent data - 9
      Sent data - 10
      

      (output continues like this to the end...)

       

       

      On the client, no messages are received.
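
For reference, the three payload sizes tested above can be checked against the 100KiB large message size mentioned at the top of the post.  This is only an arithmetic sketch: PayloadSizeCheck is a hypothetical name, and the comparison uses the raw byte array length, so it ignores whatever headers and properties add to the encoded message.

```java
// Hypothetical helper; compares raw payload sizes with the 100KiB figure from the post.
final class PayloadSizeCheck
{
   // 100 KiB expressed in bytes.
   static final int LARGE_MESSAGE_THRESHOLD_BYTES = 100 * 1024;

   // True when the raw payload alone is smaller than the threshold.
   static boolean isBelowLargeMessageThreshold(int payloadBytes)
   {
      return payloadBytes < LARGE_MESSAGE_THRESHOLD_BYTES;
   }

   public static void main(String[] args)
   {
      // The byte array sizes tried in the post.
      int[] testedSizes = {100, 8000, 40000};
      for (int size : testedSizes)
      {
         System.out.println(size + " bytes below threshold: " + isBelowLargeMessageThreshold(size));
      }
   }
}
```

All three sizes come out well below the threshold, which is consistent with the statement that these are bigger messages but not large messages.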

        • 1. Re: Netty HTTP not allowing bigger messages
          gentilim

          It appears that with the bigger messages the server is not responding to the client calls, or perhaps the client is getting disconnected from the server.  Again, this is only seen with bigger messages (still below the large message size), and only when http-enabled is true.

           

          Client side error seen

           

          SEVERE: Failed to handle failover
          HornetQException[errorCode=3 message=Timed out waiting for response when sending packet 32]
          at org.hornetq.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:277)
          at org.hornetq.core.client.impl.ClientSessionImpl.handleFailover(ClientSessionImpl.java:874)
          at org.hornetq.core.client.impl.FailoverManagerImpl.reconnectSessions(FailoverManagerImpl.java:818)
          at org.hornetq.core.client.impl.FailoverManagerImpl.failoverOrReconnect(FailoverManagerImpl.java:719)
          at org.hornetq.core.client.impl.FailoverManagerImpl.handleConnectionFailure(FailoverManagerImpl.java:581)
          at org.hornetq.core.client.impl.FailoverManagerImpl.connectionException(FailoverManagerImpl.java:298)
          at org.hornetq.core.remoting.impl.netty.NettyConnector$Listener$2.run(NettyConnector.java:692)
          at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:96)
          at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
          at java.lang.Thread.run(Thread.java:619)
          
          • 2. Re: Netty HTTP not allowing bigger messages
            gentilim

            I've gone back and tried the http-transport example to see if it was a config issue, and I am seeing the same problems.  I modified the example to send several byte messages instead of just one text message.  When the byte array used in the message is small, the example runs without error.  When the byte array grows big enough, a timeout exception is reported and the example fails.  Does anyone else see this problem?  Is it an issue with my setup, or perhaps a bug within HornetQ?

             

            Modified http-transport example

            /*
             * Copyright 2009 Red Hat, Inc.
             * Red Hat licenses this file to you under the Apache License, version
             * 2.0 (the "License"); you may not use this file except in compliance
             * with the License.  You may obtain a copy of the License at
             *    http://www.apache.org/licenses/LICENSE-2.0
             * Unless required by applicable law or agreed to in writing, software
             * distributed under the License is distributed on an "AS IS" BASIS,
             * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
             * implied.  See the License for the specific language governing
             * permissions and limitations under the License.
             */
            package org.hornetq.jms.example;
             
            import javax.jms.Connection;
            import javax.jms.ConnectionFactory;
            import javax.jms.MessageConsumer;
            import javax.jms.MessageProducer;
            import javax.jms.Queue;
            import javax.jms.Session;
            import javax.jms.BytesMessage;
            import javax.jms.TextMessage;
            import javax.naming.InitialContext;
             
            import org.hornetq.common.example.HornetQExample;
             
            /**
             * A simple JMS Queue example that uses HTTP protocol.
             *
             * @author <a href="hgao@redhat.com">Howard Gao</a>
             */
            public class HttpTransportExample extends HornetQExample
            {
               public static void main(final String[] args)
               {
                  new HttpTransportExample().run(args);
               }
             
               @Override
               public boolean runExample() throws Exception
               {
                  Connection connection = null;
                  InitialContext initialContext = null;
                  try
                  {
                     // Step 1. Create an initial context to perform the JNDI lookup.
                     initialContext = getContext(0);
             
                     // Step 2. Perform a lookup on the queue
                     Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
             
                     // Step 3. Perform a lookup on the Connection Factory
                     ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
             
                     // Step 4.Create a JMS Connection
                     connection = cf.createConnection();
             
                     System.out.println("connection created: " + connection);
             
                     // Step 5. Create a JMS Session
                     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             
                     // Step 6. Create a JMS Message Producer
                     MessageProducer producer = session.createProducer(queue);
             
                     for (int count = 1; count <= 10; ++count)
                     {
                        // Step 7. Create a Byte Message
                        BytesMessage message = session.createBytesMessage();
                        message.setStringProperty("title", "This is a text message - " + count);
                        message.writeBytes(new byte[400]);     //change byte[] size to see issue
             
                        //System.out.println("Sent message: " + message.getText());
                        System.out.println("Sent message: " + count);
             
                        // Step 8. Send the Message
                        producer.send(message);
                     }
             
                     // Step 9. Create a JMS Message Consumer
                     MessageConsumer messageConsumer = session.createConsumer(queue);
             
                     // Step 10. Start the Connection
                     connection.start();
             
                     for (int count = 1; count <= 10; ++count)
                     {
                        // Step 11. Receive the message
                        BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(5000);
             
                        System.out.println("Received message: " + messageReceived.getStringProperty("title"));
                     }
             
                     initialContext.close();
             
                     return true;
                  }
                  finally
                  {
                     // Step 12. Be sure to close our JMS resources!
                     if (initialContext != null)
                     {
                        initialContext.close();
                     }
                     if (connection != null)
                     {
                        connection.close();
                     }
                  }
               }
             
            }
            

             

            example output

                 [java] Sent message: 1
                 [java] javax.jms.JMSException: Timed out waiting for response when sending packet 71
                 [java]     at org.hornetq.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:277)
                 [java]     at org.hornetq.core.client.impl.ClientProducerImpl.doSend(ClientProducerImpl.java:285)
                 [java]     at org.hornetq.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:139)
                 [java]     at org.hornetq.jms.client.HornetQMessageProducer.doSend(HornetQMessageProducer.java:451)
                 [java]     at org.hornetq.jms.client.HornetQMessageProducer.send(HornetQMessageProducer.java:199)
                 [java]     at org.hornetq.jms.example.HttpTransportExample.runExample(HttpTransportExample.java:79)
                 [java]     at org.hornetq.common.example.HornetQExample.run(HornetQExample.java:73)
                 [java]     at org.hornetq.jms.example.HttpTransportExample.main(HttpTransportExample.java:36)
                 [java] Caused by: HornetQException[errorCode=3 message=Timed out waiting for response when sending packet 71]
                 [java]     ... 8 more
                 [java]
                 [java] #####################
                 [java] ###    FAILURE!   ###
                 [java] #####################
                 [java] Java Result: 1
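
Since the failure depends on how big the byte array is, it may help to pin down the smallest size that fails before reporting this.  Below is a minimal sketch of a bisection over the payload size.  It assumes that once a size fails, every larger size fails too, which has not been verified here.  FailureThresholdSearch and the sendSucceeds predicate are hypothetical; in a real run the predicate would send and receive one message of the given size over the HTTP connector and report whether it arrived.  The main method uses a stand-in predicate with a made-up limit purely to show the search running.

```java
import java.util.function.IntPredicate;

// Hypothetical helper for narrowing down the payload size at which sends start failing.
final class FailureThresholdSearch
{
   // Returns the smallest size in (knownGood, knownBad] for which sendSucceeds is false.
   // Preconditions: sendSucceeds is true at knownGood, false at knownBad,
   // and every size at or above the first failing size also fails.
   static int smallestFailingSize(int knownGood, int knownBad, IntPredicate sendSucceeds)
   {
      int lo = knownGood; // largest size known to succeed
      int hi = knownBad;  // smallest size known to fail
      while (hi - lo > 1)
      {
         int mid = lo + (hi - lo) / 2;
         if (sendSucceeds.test(mid))
         {
            lo = mid;
         }
         else
         {
            hi = mid;
         }
      }
      return hi;
   }

   public static void main(String[] args)
   {
      // Stand-in predicate with a made-up limit; replace with a real send/receive check.
      IntPredicate fakeSend = size -> size < 5000;

      // 100 worked and 8000 did not in the first post, so they bound the search.
      int threshold = smallestFailingSize(100, 8000, fakeSend);
      System.out.println("Smallest failing size: " + threshold);
   }
}
```

Each probe halves the remaining range, so the 100 to 8000 range needs about 13 test runs rather than thousands.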