Improperly-formatted (or improperly-interpreted) AMQP messages?
sumitsu Oct 16, 2013 3:00 PMI am using the Apache Qpid JMS-AMQP 1.0 client library to connect to the AMQP port on a HornetQ 2.4.0.beta1 instance, on top of which I have built an application using the standard JMS API for publishing to and consuming from a queue.
I am observing a problem wherein the Qpid library is unable to recognize the structure of consumed messages which were published to the broker as standard TextMessage/ObjectMessage instances. What happens instead is, in the case of TextMessage, dequeued messages show up at the consumer as instances of AmqpMessageImpl, which implements neither of said JMS interfaces. Per a response I received on the Qpid mailing list:
A message will be returned as an AmqpMessageImpl if the client can't work out what sort of JMS Message it is supposed to represent.
The logic in the client for detecting a TextMessage is as follows:
else if (bodySection instanceof AmqpValue &&
((AmqpValue)bodySection).getValue() instanceof String) {
message = new TextMessageImpl(header, messageAnnotations, properties, appProperties, (String) ((AmqpValue)bodySection).getValue(), footer, _session);
}
That is the client is expecting a TextMessage to be represented in AMQP as a message with an amqp-value section where the value is of type String. It seems as if HornetQ is not sending the messages in that form and thus the client is not recognising the message as a TextMessage.
In the case of ObjectMessage, the consumer is able to identify the messages as ObjectMessage, but the client throws a JMSException reporting "invalid stream header: 00000000" when I attempt to invoke ObjectMessage#getObject:
14:41:30.863 [Thread-4] ERROR [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener] Exception in onMessage: javax.jms.JMSException: invalid stream header: 00000000 javax.jms.JMSException: invalid stream header: 00000000 at org.apache.qpid.amqp_1_0.jms.impl.ObjectMessageImpl.getObject(ObjectMessageImpl.java:124) ~[qpid-amqp-1-0-client-jms-0.24.jar:?] at net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener.onMessage(TestConsumerListener.java:25) [test-classes/:?] at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:876) [qpid-amqp-1-0-client-jms-0.24.jar:?] at java.lang.Thread.run(Thread.java:724) [?:1.7.0_25]
Relevant logs (including wire-level logs from Qpid):
- Publishing a TextMessage:
14:48:02.885 [main] DEBUG [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestQueuePublisherSimplified] Sending text message: class org.apache.qpid.amqp_1_0.jms.impl.TextMessageImpl
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
FINE: SEND[{host}:10005|0] : Transfer{handle=0,deliveryId=4,deliveryTag=4,messageFormat=0,settled=false}
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
FINE: SEND[{host}:10005] : \x00\x00\x00\xe4\x02\x00\x00\x00\x00S\x14\xc0\x09\x05CR\x04\xa0\x014CB\x00Sp\xc0\x04\x02AP\x04\x00Sr\xc1\x17\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0T\x0a\xa0$db5a5fb1-7d8c-4506-89cd-5b2919d56b0e@\xa1\x1bjms.queue.TestQueue20131007@@@@@@\x83\x00\x00\x01A\xc2\x98\x9aE\x00St\xc1\x01\x00\x00Sw\xa1?test message:1381949282885:104adf66-91d4-03f8-7a5b-42451a6b8003\x00Sx\xc1\x01\x00
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.client.Connection doRead
FINE: RECV [{host}:10005] : \x00\x00\x00\x14\x02\x00\x00\x00\x00S\x15\xc0\x07\x04AR\x03R\x03A\x00\x00\x00!\x02\x00\x00\x00\x00S\x13\xc0\x14\x07R\x05p\x00\x00\x03\xffR\x01p\x00\x00\x04\x00CR\x05Rc
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Disposition{role=receiver,first=3,last=3,settled=true}
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Flow{nextIncomingId=5,incomingWindow=1023,nextOutgoingId=1,outgoingWindow=1024,handle=0,deliveryCount=5,linkCredit=99}
- Consuming a TextMessage:
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.client.Connection doRead
FINE: RECV [{host}:10005] : \x00\x00\x00\xc3\x02\x00\x00\x00\x00S\x14\xc0\x08\x04CR\x08\xa0\x010C\x00Sp\xc0\x08\x05AP\x04C@R\x01\x00Sr\xc1P\x04\xa3\x0dx-opt-to-type\xa1\x05queue\xa10HORNETQ_PROTON_MESSAGE_ANNOTATIONS_x-opt-to-type\xa1\x05queue\x00Ss\xc07\x0a\x81\x00\x00\x00\x00\x00\x00\x0d @\xa1\x1bjms.queue.TestQueue20131007@@@@@@\x83\x00\x00\x01A\xc2\x98\x9aE\x00St\xc1\x01\x00\x00Sw@\x00Sx\xc1\x01\x00
14:48:02.896 [Thread-4] WARN [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener] Unrecognized message type: class org.apache.qpid.amqp_1_0.jms.impl.AmqpMessageImpl
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Transfer{handle=0,deliveryId=8,deliveryTag=0,messageFormat=0}
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
FINE: SEND[{host}:10005|0] : Disposition{role=receiver,first=8,last=8,settled=true,state=Accepted{}}
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
FINE: SEND[{host}:10005] : \x00\x00\x00\x18\x02\x00\x00\x00\x00S\x15\xc0\x0b\x05AR\x08R\x08A\x00S$E
- Publishing an ObjectMessage:
14:47:52.884 [main] DEBUG [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestQueuePublisherSimplified] Sending object message: class org.apache.qpid.amqp_1_0.jms.impl.ObjectMessageImpl
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
FINE: SEND[{host}:10005|0] : Transfer{handle=0,deliveryId=3,deliveryTag=3,messageFormat=0,settled=false}
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
FINE: SEND[{host}:10005] : \x00\x00\x01\x10\x02\x00\x00\x00\x00S\x14\xc0\x09\x05CR\x03\xa0\x013CB\x00Sp\xc0\x04\x02AP\x04\x00Sr\xc1\x17\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0y\x0a\xa0$2a1ff485-9b1c-4d6e-8180-d81742517118@\xa1\x1bjms.queue.TestQueue20131007@@@\xa3$application/x-java-serialized-object@@\x83\x00\x00\x01A\xc2\x98s4\x00St\xc1\x01\x00\x00Su\xa0F\xac\xed\x00\x05t\x00?test message:1381949272884:a49509c7-d6ca-2c2e-6c5f-9a7a8fcaacc6\x00Sx\xc1\x01\x00
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.client.Connection doRead
FINE: RECV [{host}:10005] : \x00\x00\x00\x14\x02\x00\x00\x00\x00S\x15\xc0\x07\x04AR\x02R\x02A\x00\x00\x00!\x02\x00\x00\x00\x00S\x13\xc0\x14\x07R\x04p\x00\x00\x03\xffR\x01p\x00\x00\x04\x00CR\x04Rc
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Disposition{role=receiver,first=2,last=2,settled=true}
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Flow{nextIncomingId=4,incomingWindow=1023,nextOutgoingId=1,outgoingWindow=1024,handle=0,deliveryCount=4,linkCredit=99}
- Consuming an ObjectMessage:
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.client.Connection doRead
FINE: RECV [{host}:10005] : \x00\x00\x03\xf3\x02\x00\x00\x00\x00S\x14\xc0\x08\x04CR\x07\xa0\x013C\x00Sp\xc0\x08\x05AP\x04C@R\x01\x00Sr\xc1P\x04\xa3\x0dx-opt-to-type\xa1\x05queue\xa10HORNETQ_PROTON_MESSAGE_ANNOTATIONS_x-opt-to-type\xa1\x05queue\x00Ss\xc0\x81\x0a\x81\x00\x00\x00\x00\x00\x00\x0d\x1c@\xa1\x1bjms.queue.TestQueue20131007@@@\xa3$application/x-java-serialized-object\xa3$application/x-java-serialized-object@\x83\x00\x00\x01A\xc2\x98s4\x00St\xc1\x01\x00\x00Su\xb0\x00\x00\x02\xe2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0d\x1c\x00\x00\x00\x00W\xac\xed\x00\x05t\x00?test message:1381949272884:a49509c7-d6ca-2c2e-6c5f-9a7a8fcaacc6\x00\x00\x02\xe2\x00\x00\x00\x00\x00\x00\x0d\x1c\x01\x00\x00\x006j\x00m\x00s\x00.\x00q\x00u\x00e\x00u\x00e\x00.\x00T\x00e\x00s\x00t\x00Q\x00u\x00e\x00u\x00e\x002\x000\x001\x003\x001\x000\x000\x007\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01A\xc2\x98q\x16\x04\x01\x00\x00\x00\x07\x00\x00\x008H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00C\x00R\x00E\x00A\x00T\x00I\x00O\x00N\x00_\x00T\x00I\x00M\x00E\x00\x07\x00\x00\x01A\xc2\x98s4\x00\x00\x00*H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00F\x00O\x00R\x00M\x00A\x00T\x00\x07\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00C\x00O\x00N\x00T\x00E\x00N\x00T\x00_\x00T\x00Y\x00P\x00E\x00\x0a\x00\x00\x00Ha\x00p\x00p\x00l\x00i\x00c\x00a\x00t\x00i\x00o\x00n\x00/\x00x\x00-\x00j\x00a\x00v\x00a\x00-\x00s\x00e\x00r\x00i\x00a\x00l\x00i\x00z\x00e\x00d\x00-\x00o\x00b\x00j\x00e\x00c\x00t\x00\x00\x00\x00&P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00S\x00I\x00Z\x00E\x00\x06\x00\x00\x00\xfa\x00\x00\x00`H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00A\x00N\x00N\x00O\x00T\x00A\x00T\x00I\x00O\x00N\x00S\x00_\x00x\x00-\x00o\x00p\x00t\x00-\x00t\x00o\x00-\x00t\x00y\x00p\x00e\x00\x0a\x00\x00\x00\x0aq\x00u\x00e\x00u\x00e\x00\x00\x00\x006H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00T\x00Y\x00P\x00E\x00\x06\x00\x00\x00\x00\x00\x00\x00:H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00F\x00O\x00R\x00M\x00A\x00T\x00\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00Sx\xc1\x01\x00
14:47:52.907 [Thread-4] ERROR [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener] Exception in onMessage: javax.jms.JMSException: invalid stream header: 00000000
javax.jms.JMSException: invalid stream header: 00000000
at org.apache.qpid.amqp_1_0.jms.impl.ObjectMessageImpl.getObject(ObjectMessageImpl.java:124) ~[qpid-amqp-1-0-client-jms-0.24.jar:?]
at net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener.onMessage(TestConsumerListener.java:25) [test-classes/:?]
at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:876) [qpid-amqp-1-0-client-jms-0.24.jar:?]
at java.lang.Thread.run(Thread.java:724) [?:1.7.0_25]
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Transfer{handle=0,deliveryId=7,deliveryTag=3,messageFormat=0}
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
FINE: SEND[{host}:10005|0] : Disposition{role=receiver,first=7,last=7,settled=true,state=Accepted{}}
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
FINE: SEND[{host}:10005] : \x00\x00\x00\x18\x02\x00\x00\x00\x00S\x15\xc0\x0b\x05AR\x07R\x07A\x00S$E
If anyone has observed this problem before, please let me know. My test case code is available here:
https://github.com/sumitsu/qpidamqpjmstest
Running TestQueuePublisherSimplified publishes a message to queue jms.queue.TestQueue20131007 every 10 seconds, alternating between TextMessage and ObjectMessage. Meanwhile, running TestQueueConsumerSimplified consumes messages from jms.queue.TestQueue20131007.