1 Reply Latest reply on Jun 12, 2014 2:47 PM by pkhanal

    Improperly-formatted (or improperly-interpreted) AMQP messages?

    sumitsu

      I am using the Apache Qpid JMS-AMQP 1.0 client library to connect to the AMQP port on a HornetQ 2.4.0.beta1 instance, on top of which I have built an application using the standard JMS API for publishing to and consuming from a queue.

       

      I am observing a problem wherein the Qpid library is unable to recognize the structure of consumed messages which were published to the broker as standard TextMessage/ObjectMessage instances.  What happens instead is, in the case of TextMessage, dequeued messages show up at the consumer as instances of AmqpMessageImpl, which implements neither of said JMS interfaces.  Per a response I received on the Qpid mailing list:

       

      A message will be returned as an AmqpMessageImpl if the client can't work out what sort of JMS Message it is supposed to represent.

       

      The logic in the client for detecting a TextMessage is as follows:

       

      else if (bodySection instanceof AmqpValue &&

               ((AmqpValue)bodySection).getValue() instanceof String) {

          message = new TextMessageImpl(header, messageAnnotations, properties, appProperties, (String) ((AmqpValue)bodySection).getValue(), footer, _session);

      }

       

      That is the client is expecting a TextMessage to be represented in AMQP as a message with an amqp-value section where the value is of type String.  It seems as if HornetQ is not sending the messages in that form and thus the client is not recognising the message as a TextMessage.

       

      In the case of ObjectMessage, the consumer is able to identify the messages as ObjectMessage, but the client throws a JMSException reporting "invalid stream header: 00000000" when I attempt to invoke ObjectMessage#getObject:

       

      14:41:30.863 [Thread-4] ERROR [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener] Exception in onMessage: javax.jms.JMSException: invalid stream header: 00000000
      javax.jms.JMSException: invalid stream header: 00000000
          at org.apache.qpid.amqp_1_0.jms.impl.ObjectMessageImpl.getObject(ObjectMessageImpl.java:124) ~[qpid-amqp-1-0-client-jms-0.24.jar:?]
          at net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener.onMessage(TestConsumerListener.java:25) [test-classes/:?]
          at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:876) [qpid-amqp-1-0-client-jms-0.24.jar:?]
          at java.lang.Thread.run(Thread.java:724) [?:1.7.0_25]
      
      

       

      Relevant logs (including wire-level logs from Qpid):

       

      • Publishing a TextMessage:
      14:48:02.885 [main] DEBUG [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestQueuePublisherSimplified] Sending text message: class org.apache.qpid.amqp_1_0.jms.impl.TextMessageImpl
      Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
      FINE: SEND[{host}:10005|0] : Transfer{handle=0,deliveryId=4,deliveryTag=4,messageFormat=0,settled=false}
      Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
      FINE: SEND[{host}:10005] : \x00\x00\x00\xe4\x02\x00\x00\x00\x00S\x14\xc0\x09\x05CR\x04\xa0\x014CB\x00Sp\xc0\x04\x02AP\x04\x00Sr\xc1\x17\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0T\x0a\xa0$db5a5fb1-7d8c-4506-89cd-5b2919d56b0e@\xa1\x1bjms.queue.TestQueue20131007@@@@@@\x83\x00\x00\x01A\xc2\x98\x9aE\x00St\xc1\x01\x00\x00Sw\xa1?test message:1381949282885:104adf66-91d4-03f8-7a5b-42451a6b8003\x00Sx\xc1\x01\x00
      Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.client.Connection doRead
      FINE: RECV [{host}:10005] : \x00\x00\x00\x14\x02\x00\x00\x00\x00S\x15\xc0\x07\x04AR\x03R\x03A\x00\x00\x00!\x02\x00\x00\x00\x00S\x13\xc0\x14\x07R\x05p\x00\x00\x03\xffR\x01p\x00\x00\x04\x00CR\x05Rc
      Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
      FINE: RECV[{host}:10005|0] : Disposition{role=receiver,first=3,last=3,settled=true}
      Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
      FINE: RECV[{host}:10005|0] : Flow{nextIncomingId=5,incomingWindow=1023,nextOutgoingId=1,outgoingWindow=1024,handle=0,deliveryCount=5,linkCredit=99}
      
      

       

      • Consuming a TextMessage:
      Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.client.Connection doRead
      FINE: RECV [{host}:10005] : \x00\x00\x00\xc3\x02\x00\x00\x00\x00S\x14\xc0\x08\x04CR\x08\xa0\x010C\x00Sp\xc0\x08\x05AP\x04C@R\x01\x00Sr\xc1P\x04\xa3\x0dx-opt-to-type\xa1\x05queue\xa10HORNETQ_PROTON_MESSAGE_ANNOTATIONS_x-opt-to-type\xa1\x05queue\x00Ss\xc07\x0a\x81\x00\x00\x00\x00\x00\x00\x0d @\xa1\x1bjms.queue.TestQueue20131007@@@@@@\x83\x00\x00\x01A\xc2\x98\x9aE\x00St\xc1\x01\x00\x00Sw@\x00Sx\xc1\x01\x00
      14:48:02.896 [Thread-4] WARN  [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener] Unrecognized message type: class org.apache.qpid.amqp_1_0.jms.impl.AmqpMessageImpl
      Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
      FINE: RECV[{host}:10005|0] : Transfer{handle=0,deliveryId=8,deliveryTag=0,messageFormat=0}
      Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
      FINE: SEND[{host}:10005|0] : Disposition{role=receiver,first=8,last=8,settled=true,state=Accepted{}}
      Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
      FINE: SEND[{host}:10005] : \x00\x00\x00\x18\x02\x00\x00\x00\x00S\x15\xc0\x0b\x05AR\x08R\x08A\x00S$E
      
      
      

       

      • Publishing an ObjectMessage:
      14:47:52.884 [main] DEBUG [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestQueuePublisherSimplified] Sending object message: class org.apache.qpid.amqp_1_0.jms.impl.ObjectMessageImpl
      Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
      FINE: SEND[{host}:10005|0] : Transfer{handle=0,deliveryId=3,deliveryTag=3,messageFormat=0,settled=false}
      Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
      FINE: SEND[{host}:10005] : \x00\x00\x01\x10\x02\x00\x00\x00\x00S\x14\xc0\x09\x05CR\x03\xa0\x013CB\x00Sp\xc0\x04\x02AP\x04\x00Sr\xc1\x17\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0y\x0a\xa0$2a1ff485-9b1c-4d6e-8180-d81742517118@\xa1\x1bjms.queue.TestQueue20131007@@@\xa3$application/x-java-serialized-object@@\x83\x00\x00\x01A\xc2\x98s4\x00St\xc1\x01\x00\x00Su\xa0F\xac\xed\x00\x05t\x00?test message:1381949272884:a49509c7-d6ca-2c2e-6c5f-9a7a8fcaacc6\x00Sx\xc1\x01\x00
      Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.client.Connection doRead
      FINE: RECV [{host}:10005] : \x00\x00\x00\x14\x02\x00\x00\x00\x00S\x15\xc0\x07\x04AR\x02R\x02A\x00\x00\x00!\x02\x00\x00\x00\x00S\x13\xc0\x14\x07R\x04p\x00\x00\x03\xffR\x01p\x00\x00\x04\x00CR\x04Rc
      Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
      FINE: RECV[{host}:10005|0] : Disposition{role=receiver,first=2,last=2,settled=true}
      Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
      FINE: RECV[{host}:10005|0] : Flow{nextIncomingId=4,incomingWindow=1023,nextOutgoingId=1,outgoingWindow=1024,handle=0,deliveryCount=4,linkCredit=99}
      
      

       

      • Consuming an ObjectMessage:
      Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.client.Connection doRead
      FINE: RECV [{host}:10005] : \x00\x00\x03\xf3\x02\x00\x00\x00\x00S\x14\xc0\x08\x04CR\x07\xa0\x013C\x00Sp\xc0\x08\x05AP\x04C@R\x01\x00Sr\xc1P\x04\xa3\x0dx-opt-to-type\xa1\x05queue\xa10HORNETQ_PROTON_MESSAGE_ANNOTATIONS_x-opt-to-type\xa1\x05queue\x00Ss\xc0\x81\x0a\x81\x00\x00\x00\x00\x00\x00\x0d\x1c@\xa1\x1bjms.queue.TestQueue20131007@@@\xa3$application/x-java-serialized-object\xa3$application/x-java-serialized-object@\x83\x00\x00\x01A\xc2\x98s4\x00St\xc1\x01\x00\x00Su\xb0\x00\x00\x02\xe2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0d\x1c\x00\x00\x00\x00W\xac\xed\x00\x05t\x00?test message:1381949272884:a49509c7-d6ca-2c2e-6c5f-9a7a8fcaacc6\x00\x00\x02\xe2\x00\x00\x00\x00\x00\x00\x0d\x1c\x01\x00\x00\x006j\x00m\x00s\x00.\x00q\x00u\x00e\x00u\x00e\x00.\x00T\x00e\x00s\x00t\x00Q\x00u\x00e\x00u\x00e\x002\x000\x001\x003\x001\x000\x000\x007\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01A\xc2\x98q\x16\x04\x01\x00\x00\x00\x07\x00\x00\x008H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00C\x00R\x00E\x00A\x00T\x00I\x00O\x00N\x00_\x00T\x00I\x00M\x00E\x00\x07\x00\x00\x01A\xc2\x98s4\x00\x00\x00*H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00F\x00O\x00R\x00M\x00A\x00T\x00\x07\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00C\x00O\x00N\x00T\x00E\x00N\x00T\x00_\x00T\x00Y\x00P\x00E\x00\x0a\x00\x00\x00Ha\x00p\x00p\x00l\x00i\x00c\x00a\x00t\x00i\x00o\x00n\x00/\x00x\x00-\x00j\x00a\x00v\x00a\x00-\x00s\x00e\x00r\x00i\x00a\x00l\x00i\x00z\x00e\x00d\x00-\x00o\x00b\x00j\x00e\x00c\x00t\x00\x00\x00\x00&P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00S\x00I\x00Z\x00E\x00\x06\x00\x00\x00\xfa\x00\x00\x00`H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00A\x00N\x00N\x00O\x00T\x00A\x00T\x00I\x00O\x00N\x00S\x00_\x00x\x00-\x00o\x00p\x00t\x00-\x00t\x00o\x00-\x00t\x00y\x00p\x00e\x00\x0a\x00\x00\x00\x0aq\x00u\x00e\x00u\x00e\x00\x00\x00\x006H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00T\x00Y\x00P\x00E\x00\x06\x00\x00\x00\x00\x00\x00\x00:H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00F\x00O\x00R\x00M\x00A\x00T\x00\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00Sx\xc1\x01\x00
      14:47:52.907 [Thread-4] ERROR [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener] Exception in onMessage: javax.jms.JMSException: invalid stream header: 00000000
      javax.jms.JMSException: invalid stream header: 00000000
          at org.apache.qpid.amqp_1_0.jms.impl.ObjectMessageImpl.getObject(ObjectMessageImpl.java:124) ~[qpid-amqp-1-0-client-jms-0.24.jar:?]
          at net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener.onMessage(TestConsumerListener.java:25) [test-classes/:?]
          at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:876) [qpid-amqp-1-0-client-jms-0.24.jar:?]
          at java.lang.Thread.run(Thread.java:724) [?:1.7.0_25]
      Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
      FINE: RECV[{host}:10005|0] : Transfer{handle=0,deliveryId=7,deliveryTag=3,messageFormat=0}
      Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
      FINE: SEND[{host}:10005|0] : Disposition{role=receiver,first=7,last=7,settled=true,state=Accepted{}}
      Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
      FINE: SEND[{host}:10005] : \x00\x00\x00\x18\x02\x00\x00\x00\x00S\x15\xc0\x0b\x05AR\x07R\x07A\x00S$E
      
      

       

      If anyone has observed this problem before, please let me know.  My test case code is available here:

       

      https://github.com/sumitsu/qpidamqpjmstest

       

      Running TestQueuePublisherSimplified publishes a message to queue jms.queue.TestQueue20131007 every 10 seconds, alternating between TextMessage and ObjectMessage.  Meanwhile, running TestQueueConsumerSimplified consumes messages from jms.queue.TestQueue20131007.