8 Replies Latest reply on Jun 4, 2014 10:58 PM by yangrui

    HornetQ 2.4.0 can't receive single message of more than 4096 bytes

    yangrui

      Hi All,

      I am using the Qpid Messaging API to access a HornetQ server. The HornetQ version is 2.4, which supports AMQP 1.0.

      When I send a message smaller than 4096 bytes, the receiver gets it.

      But when I send a message larger than 4096 bytes, the receiver never gets it and the server logs the error shown below.

      By the way, if I replace HornetQ with ActiveMQ, the receiver works correctly.

      Best regards.

      ----------------------------------------------

      16:10:04,184 WARN  [org.hornetq.core.server] HQ222151: removing consumer which did not handle a message, consumer=ServerConsumerImpl [id=324270037384, filter=null, binding=LocalQueueBinding [address=testQueue, queue=QueueImpl[name=testQueue, postOffice=PostOfficeImpl [server=HornetQServerImpl::serverUUID=2fc82b91-cd1d-11e3-ac81-ffca7aefbfad]]@1a417e7, filter=null, name=testQueue, clusterName=testQueue2fc82b91-cd1d-11e3-ac81-ffca7aefbfad]], message=Reference[324270030919]:NON-RELIABLE:ServerMessage[messageID=324270030919,durable=false,userID=null,priority=0, bodySize=32768,expiration=0, durable=false, address=testQueue,properties=TypedProperties[{HORNETQ_PROTON_FORMAT=1, PROTON_MESSAGE_SIZE=32422, HORNETQ_PROTON_MESSAGE_TYPE=0, HORNETQ_PROTON_MESSAGE_FORMAT=0}]]@5464185: java.nio.BufferOverflowException
              at java.nio.HeapByteBuffer.put(Unknown Source) [rt.jar:1.7.0_04]
              at org.apache.qpid.proton.codec.WritableBuffer$ByteBufferWrapper.put(WritableBuffer.java:80) [proton-j-impl.jar:0.5]
              at org.apache.qpid.proton.codec.EncoderImpl.writeRaw(EncoderImpl.java:767) [proton-j-impl.jar:0.5]
              at org.apache.qpid.proton.codec.BinaryType$LongBinaryEncoding.writeEncodedValue(BinaryType.java:80) [proton-j-impl.jar:0.5]
              at org.apache.qpid.proton.codec.BinaryType$LongBinaryEncoding.writeEncodedValue(BinaryType.java:67) [proton-j-impl.jar:0.5]
              at org.apache.qpid.proton.codec.FloatingSizePrimitiveTypeEncoding.writeValue(FloatingSizePrimitiveTypeEncoding.java:41) [proton-j-impl.jar:0.5]
              at org.apache.qpid.proton.codec.AbstractDescribedType$DynamicDescribedTypeEncoding.writeValue(AbstractDescribedType.java:114) [proton-j-impl.jar:0.5]
              at org.apache.qpid.proton.codec.AbstractDescribedType.write(AbstractDescribedType.java:75) [proton-j-impl.jar:0.5]
              at org.apache.qpid.proton.codec.EncoderImpl.writeObject(EncoderImpl.java:731) [proton-j-impl.jar:0.5]
              at org.apache.qpid.proton.message.impl.MessageImpl.encode(MessageImpl.java:725) [proton-j-impl.jar:0.5]
              at org.hornetq.core.protocol.proton.ProtonUtils$OUTBOUND.transform(ProtonUtils.java:378) [hornetq-amqp-protocol.jar:]
              at org.hornetq.core.protocol.proton.ProtonConsumer.handleDelivery(ProtonConsumer.java:210) [hornetq-amqp-protocol.jar:]
              at org.hornetq.core.protocol.proton.ProtonSession.sendMessage(ProtonSession.java:139) [hornetq-amqp-protocol.jar:]
              at org.hornetq.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:942) [hornetq-server.jar:]
              at org.hornetq.core.server.impl.ServerConsumerImpl.proceedDeliver(ServerConsumerImpl.java:400) [hornetq-server.jar:]
              at org.hornetq.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:2449) [hornetq-server.jar:]
              at org.hornetq.core.server.impl.QueueImpl.deliver(QueueImpl.java:2061) [hornetq-server.jar:]
              at org.hornetq.core.server.impl.QueueImpl.access$1200(QueueImpl.java:81) [hornetq-server.jar:]
              at org.hornetq.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:2822) [hornetq-server.jar:]
              at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:107) [hornetq-core-client.jar:]
              at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [rt.jar:1.7.0_04]
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [rt.jar:1.7.0_04]
              at java.lang.Thread.run(Unknown Source) [rt.jar:1.7.0_04]
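
      The top frames of the trace show java.nio.HeapByteBuffer.put throwing BufferOverflowException while the outbound message is being encoded, which is what a fixed-size ByteBuffer does when asked to hold more bytes than its capacity. The following is only a minimal sketch of that failure mode, not the HornetQ code: the class name OverflowSketch and the 4096-byte capacity are assumptions (the capacity is guessed from the threshold reported in this thread, not taken from the HornetQ source).

      ```java
      import java.nio.BufferOverflowException;
      import java.nio.ByteBuffer;

      public class OverflowSketch {
          // Assumed capacity, inferred from the 4096-byte threshold reported in the thread.
          static final int ASSUMED_CAPACITY = 4096;

          // Returns true if a payload of the given size fits into a fixed-size buffer,
          // false if ByteBuffer.put rejects it with BufferOverflowException.
          static boolean fitsFixedBuffer(int payloadSize) {
              ByteBuffer buffer = ByteBuffer.allocate(ASSUMED_CAPACITY);
              try {
                  buffer.put(new byte[payloadSize]);
                  return true;
              } catch (BufferOverflowException e) {
                  return false;
              }
          }

          public static void main(String[] args) {
              // 32422 is the PROTON_MESSAGE_SIZE value from the log line above.
              System.out.println("1024 bytes fit: " + fitsFixedBuffer(1024));
              System.out.println("32422 bytes fit: " + fitsFixedBuffer(32422));
          }
      }
      ```

      If the broker really does encode into a buffer of fixed size, any message whose encoded form exceeds that size would fail the same way, regardless of how the client fetches it.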

        • 1. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
          ataylor

          Moved to the correct forum.

          • 2. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
            ataylor

            Can you provide some test code?

            • 3. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
              yangrui

              Please tell me which forum it was moved to. The test code follows.

              -------------------------------------------------

              qpid::messaging::Connection TheConnection = CreatConnector();
              qpid::messaging::Session TheSession;

              TheConnection.open();
              TheSession = TheConnection.createSession("xxx");
              Receiver receiver = TheSession.createReceiver("testQueue");

              Message message;
              while (receiver.fetch(message, Duration::MINUTE))   // for messages over 4096 bytes, fetch() times out without returning a message
              {
                const char* pStr = message.getContentPtr();
                size_t nSize = message.getContentSize();

                TheSession.acknowledge(message);
              }

              • 4. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
                ataylor

                I've already moved it to the correct forum.

                How are you sending the messages? Can you attach a test case that I can run end to end?

                • 5. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
                  yangrui

                  OK, thanks.

                  My test case is based on the Qpid 0.26 package: <http://qpid.apache.org/components/messaging-api/index.html>

                  -------------------------------------------send message-------------------------

                  #include <cstring>   // memset
                  #include <fstream>

                  std::string broker = "amqp:tcp:127.0.0.1:5672";

                  std::string connectionOptions = "{protocol:amqp1.0}";

                  qpid::messaging::Connection TheConnection(broker, connectionOptions);
                  qpid::messaging::Session TheSession;

                  TheConnection.open();
                  TheSession = TheConnection.createSession("xxx");

                  Sender sender = TheSession.createSender("testQueue");

                  //

                  fstream binary_file(std::string("d:\\1.txt"), ios::binary|ios::in); // 1.txt must be larger than 4096 bytes
                  if (!binary_file) {  return; }

                  binary_file.seekg(0, ios::end);      // move the file pointer to the end of the stream
                  streampos ps = binary_file.tellg();  // read the file pointer position, i.e. the file size
                  int size = (int)ps;

                  char *pPackageContent = new char[size + 1];
                  if (!pPackageContent)  {  return; }
                  memset(pPackageContent, 0, size + 1);

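                  binary_file.seekg(0, ios::beg);      // rewind; otherwise read() starts at end-of-file and reads nothing
                  binary_file.read(pPackageContent, size);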
                  binary_file.close();

                  //

                  Message msg;
                  msg.setContent(pPackageContent, size);
                  sender.send(msg, false);

                  delete[] pPackageContent;

                  if (TheConnection.isOpen()) {  TheConnection.close(); }
                  ------------------------------------------receive message-------------------

                  std::string broker = "amqp:tcp:127.0.0.1:5672";

                  std::string connectionOptions = "{protocol:amqp1.0}";

                  qpid::messaging::Connection TheConnection(broker, connectionOptions);
                  qpid::messaging::Session TheSession;

                  TheConnection.open();
                  TheSession = TheConnection.createSession("xxx");
                  Receiver receiver = TheSession.createReceiver("testQueue");

                  Message message;
                  while (receiver.fetch(message, Duration::MINUTE))   // for messages over 4096 bytes, fetch() times out without returning a message
                  {
                    const char* pStr = message.getContentPtr();
                    size_t nSize = message.getContentSize();

                    TheSession.acknowledge(message);
                  }

                  • 6. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
                    ataylor

                    I really need something I can run, i.e. binaries, build and run scripts, etc., or a Java test.

                    • 7. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
                      yangrui

                      The test code is based on the example in hornetq-2.4.0.Final\examples\jms\proton-j, and the runtime environment is the same.

                      -------------------------------------------------------java test code---------------------

                      package com.hornetq.jms.example;

                      import java.util.List;

                      import org.apache.qpid.amqp_1_0.client.Connection;
                      import org.apache.qpid.amqp_1_0.client.Message;
                      import org.apache.qpid.amqp_1_0.client.Receiver;
                      import org.apache.qpid.amqp_1_0.client.Sender;
                      import org.apache.qpid.amqp_1_0.client.Session;
                      import org.apache.qpid.amqp_1_0.type.Binary;
                      import org.apache.qpid.amqp_1_0.type.Section;
                      import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
                      import org.apache.qpid.amqp_1_0.type.messaging.Data;
                      import org.hornetq.common.example.HornetQExample;

                      public class ProtonJExample extends HornetQExample
                      {
                          public static void main(String[] args)
                          {
                              new ProtonJExample().run(args);
                          }
                          @Override
                          public boolean runExample() throws Exception
                          {
                              Connection connection = null;
                              Session session = null;
                              String destination = "jms.queue.testQueue";
                              try
                              {
                                  // Step 1. Create an amqp qpid 1.0 connection
                                  connection = new Connection("localhost", 5672, null, null);

                                  // Step 2. Create a session
                                  session = connection.createSession();

                                  // Step 3. Create a sender
                                  Sender sender = session.createSender(destination);

                                  // Step 4. Build a 100 KB binary payload (well over 4096 bytes)
                                  Data data = new Data(new Binary(new byte[1024*100]));

                                  // Step 5. Send the message
                                  Message message = new Message(data);
                                  sender.send(message);

                                  Receiver rec1 = session.createReceiver(destination);

                                  // Step 6. set some credit so we can receive
                                  rec1.setCredit(UnsignedInteger.valueOf(1), false);

                                  // Step 7. receive the message; guard against a null result if nothing arrives in time
                                  Message m1 = rec1.receive(5000);
                                  if (m1 == null)
                                  {
                                      System.out.println("no message received within 5 seconds");
                                      return false;
                                  }
                                  List<Section> payload = m1.getPayload();
                                  Section bodySection = payload.get(4);
                                  byte[] raw = ((Data)bodySection).getValue().getArray();
                                  System.out.println(raw.length);
                                 
                                  // Step 8. acknowledge the message
                                  rec1.acknowledge(m1);
                                 
                              } finally
                              {
                                  if (connection != null)
                                  {
                                      // Step 9. close the connection
                                      connection.close();
                                  }
                              }
                             
                              return true;
                          }
                      }

                      • 8. Re: HornetQ 2.4.0 can't receive single message of more than 4096 bytes
                        yangrui

                        Hi Taylor,

                        I reproduced the problem and found something new.

                        If the message content type is "application/octet-stream", the problem occurs, but if the content type is "text/plain", the problem goes away.