8 Replies Latest reply on Jun 4, 2014 10:58 PM by yangrui yang

    HornetQ 2.4.0 can't receive single message of more than 4096 bytes

    yangrui yang Newbie

      Hi All,

      I am using qpid messaging api to access Hornetq server, the Hornetq version is 2.4 and it can support AMQP 1.0.

      When I send a message with less than 4096 bytes, the receiver can get it.

      But when I send a message with more than 4096 bytes, the receiver can't get it and the server put error as follows.

      By the way, if I change hornetq to activemq, the receiver can work correctly.

      Best regards.

      ----------------------------------------------

      16:10:04,184 WARN  [org.hornetq.core.server] HQ222151: removing consumer which d

      id not handle a message, consumer=ServerConsumerImpl [id=324270037384, filter=nu

      ll, binding=LocalQueueBinding [address=testQueue, queue=QueueImpl[name=testQueue

      , postOffice=PostOfficeImpl [server=HornetQServerImpl::serverUUID=2fc82b91-cd1d-

      11e3-ac81-ffca7aefbfad]]@1a417e7, filter=null, name=testQueue, clusterName=testQ

      ueue2fc82b91-cd1d-11e3-ac81-ffca7aefbfad]], message=Reference[324270030919]:NON-

      RELIABLE:ServerMessage[messageID=324270030919,durable=false,userID=null,priority

      =0, bodySize=32768,expiration=0, durable=false, address=testQueue,properties=Typ

      edProperties[{HORNETQ_PROTON_FORMAT=1, PROTON_MESSAGE_SIZE=32422, HORNETQ_PROTON

      _MESSAGE_TYPE=0, HORNETQ_PROTON_MESSAGE_FORMAT=0}]]@5464185: java.nio.BufferOver

      flowException

              at java.nio.HeapByteBuffer.put(Unknown Source) [rt.jar:1.7.0_04]

              at org.apache.qpid.proton.codec.WritableBuffer$ByteBufferWrapper.put(Wri

      tableBuffer.java:80) [proton-j-impl.jar:0.5]

              at org.apache.qpid.proton.codec.EncoderImpl.writeRaw(EncoderImpl.java:76

      7) [proton-j-impl.jar:0.5]

              at org.apache.qpid.proton.codec.BinaryType$LongBinaryEncoding.writeEncod

      edValue(BinaryType.java:80) [proton-j-impl.jar:0.5]

              at org.apache.qpid.proton.codec.BinaryType$LongBinaryEncoding.writeEncod

      edValue(BinaryType.java:67) [proton-j-impl.jar:0.5]

              at org.apache.qpid.proton.codec.FloatingSizePrimitiveTypeEncoding.writeV

      alue(FloatingSizePrimitiveTypeEncoding.java:41) [proton-j-impl.jar:0.5]

              at org.apache.qpid.proton.codec.AbstractDescribedType$DynamicDescribedTy

      peEncoding.writeValue(AbstractDescribedType.java:114) [proton-j-impl.jar:0.5]

              at org.apache.qpid.proton.codec.AbstractDescribedType.write(AbstractDesc

      ribedType.java:75) [proton-j-impl.jar:0.5]

              at org.apache.qpid.proton.codec.EncoderImpl.writeObject(EncoderImpl.java

      :731) [proton-j-impl.jar:0.5]

              at org.apache.qpid.proton.message.impl.MessageImpl.encode(MessageImpl.ja

      va:725) [proton-j-impl.jar:0.5]

              at org.hornetq.core.protocol.proton.ProtonUtils$OUTBOUND.transform(Proto

      nUtils.java:378) [hornetq-amqp-protocol.jar:]

              at org.hornetq.core.protocol.proton.ProtonConsumer.handleDelivery(Proton

      Consumer.java:210) [hornetq-amqp-protocol.jar:]

              at org.hornetq.core.protocol.proton.ProtonSession.sendMessage(ProtonSess

      ion.java:139) [hornetq-amqp-protocol.jar:]

              at org.hornetq.core.server.impl.ServerConsumerImpl.deliverStandardMessag

      e(ServerConsumerImpl.java:942) [hornetq-server.jar:]

              at org.hornetq.core.server.impl.ServerConsumerImpl.proceedDeliver(Server

      ConsumerImpl.java:400) [hornetq-server.jar:]

              at org.hornetq.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:

      2449) [hornetq-server.jar:]

              at org.hornetq.core.server.impl.QueueImpl.deliver(QueueImpl.java:2061) [

      hornetq-server.jar:]

              at org.hornetq.core.server.impl.QueueImpl.access$1200(QueueImpl.java:81)

      [hornetq-server.jar:]

              at org.hornetq.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.ja

      va:2822) [hornetq-server.jar:]

              at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(Ordere

      dExecutorFactory.java:107) [hornetq-core-client.jar:]

              at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [rt

      .jar:1.7.0_04]

              at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [r

      t.jar:1.7.0_04]

              at java.lang.Thread.run(Unknown Source) [rt.jar:1.7.0_04]

        • 3. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
          yangrui yang Newbie

          Please tell me move to which forum and the test code is followed.

          -------------------------------------------------

          qpid::messaging::Connection TheConnection = CreatConnector();
          qpid::messaging::Session TheSession;

          TheConnection.open();
          TheSession = TheConnection.createSession("xxx");
          Receiver receiver = TheSession.createReceiver("testQueue");

          Message message;
          while (receiver.fetch(message, Duration::MINUTE))   // here I can't receive message if a message over than 4096 bytes
          {

            const char* pStr = message.getContentPtr();
            nSize = message.getContentSize();

            TheSession.acknowledge(message);

          }

          • 4. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
            Andy Taylor Master

            ive already moved it to the correct forum.

             

            how are you sending the messages, can you attach a test case that i can run end to end.

            • 5. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
              yangrui yang Newbie

              OK,THANKS.

              My test case is based on qpid 0.26 package.<http://qpid.apache.org/components/messaging-api/index.html>

              -------------------------------------------send message-------------------------

              #include <fstream>

              std::string broker = "amqp:tcp:127.0.0.1:5672";

              std::string connectionOptions = "{protocol:amqp1.0}";

              qpid::messaging::Connection TheConnection(broker, connectionOptions);
              qpid::messaging::Session TheSession;

              TheConnection.open();
              TheSession = TheConnection.createSession("xxx");

              Sender sender = TheSession.createSender("testQueue");

              //

              fstream binary_file(std:string("d:\\1.txt"), ios::binary|ios::in); // 1.txt must be over than 4096 bytes
              if (!binary_file) {  return; }

              binary_file.seekg(0, ios::end);      //设置文件指针到文件流的尾部
              streampos ps = binary_file.tellg();  //读取文件指针的位置
              int size = (int)ps;

              char *pPackageContent = new char[size + 1];
              if (!pPackageContent)  {  return; }
              memset(pPackageContent, 0, size + 1);

              binary_file.read(pPackageContent, size);
              binary_file.close();

              //

              Message msg;
              msg.setContent(pPackageContent, size);
              sender.send(msg, false);

              delete[] pPackageContent;

              if (TheConnection.isOpen())

              {  TheConnection.close(); }
              ------------------------------------------receive message-------------------

              std::string broker = "amqp:tcp:127.0.0.1:5672";

              std::string connectionOptions = "{protocol:amqp1.0}";

              qpid::messaging::Connection TheConnection(broker, connectionOptions);

              TheSession = TheConnection.createSession("xxx");
              Receiver receiver = TheSession.createReceiver("testQueue");

              Message message;
              while (receiver.fetch(message, Duration::MINUTE))   // here I can't receive message if a message over than 4096 bytes
              {

                const char* pStr = message.getContentPtr();
                nSize = message.getContentSize();

                TheSession.acknowledge(message);

              }

              • 6. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
                Andy Taylor Master

                i really need something i can run, i.e. binaries, build and run scripts etc. or a java test

                • 7. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
                  yangrui yang Newbie

                  The test code referenced the file(hornetq-2.4.0.Final\examples\jms\proton-j) and the running context is same.

                  -------------------------------------------------------java test code---------------------

                  package com.hornetq.jms.example;

                  import java.util.List;

                  import org.apache.qpid.amqp_1_0.client.Connection;
                  import org.apache.qpid.amqp_1_0.client.Message;
                  import org.apache.qpid.amqp_1_0.client.Receiver;
                  import org.apache.qpid.amqp_1_0.client.Sender;
                  import org.apache.qpid.amqp_1_0.client.Session;
                  import org.apache.qpid.amqp_1_0.type.Binary;
                  import org.apache.qpid.amqp_1_0.type.Section;
                  import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
                  import org.apache.qpid.amqp_1_0.type.messaging.Data;
                  import org.hornetq.common.example.HornetQExample;

                  public class ProtonJExample extends HornetQExample
                  {
                      public static void main(String[] args)
                      {
                          new ProtonJExample().run(args);
                      }
                      @Override
                      public boolean runExample() throws Exception
                      {
                          Connection connection = null;
                          Session session = null;
                          String destination = "jms.queue.testQueue";
                          try
                          {
                              // Step 1. Create an amqp qpid 1.0 connection
                              connection = new Connection("localhost", 5672, null, null);

                              // Step 2. Create a session
                              session = connection.createSession();

                              // Step 3. Create a sender
                              Sender sender = session.createSender(destination);

                              //
                              Data data = new Data(new Binary(new byte[1024*100]));

                              Message message = new Message(data);
                              sender.send(message);
                             
                              Receiver rec1 = session.createReceiver(destination);
                             
                              // Step 6. set some credit so we can receive
                              rec1.setCredit(UnsignedInteger.valueOf(1), false);

                              // Step 7. receive the simple message
                              Message m1 = rec1.receive(5000);
                              List<Section> payload = m1.getPayload();
                              Section bodySection = payload.get(4);
                              byte[] raw = ((Data)bodySection).getValue().getArray();
                              System.out.println(raw.length);
                             
                              // Step 8. acknowledge the message
                              rec1.acknowledge(m1);
                             
                          } finally
                          {
                              if (connection != null)
                              {
                                  // Step 9. close the connection
                                  connection.close();
                              }
                          }
                         
                          return true;
                      }
                  }

                  • 8. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
                    yangrui yang Newbie

                    Hi. Taylor

                    I repeated my question and I hava a new discovery.

                    If the message content-type is "application/octet-stream",the question will happen.but if the message content type is "text/plain", the question will be solved.