Improperly-formatted (or improperly-interpreted) AMQP messages?
sumitsu Oct 16, 2013 3:00 PM
I am using the Apache Qpid JMS-AMQP 1.0 client library to connect to the AMQP port on a HornetQ 2.4.0.beta1 instance, on top of which I have built an application using the standard JMS API for publishing to and consuming from a queue.
I am observing a problem wherein the Qpid library is unable to recognize the structure of consumed messages which were published to the broker as standard TextMessage/ObjectMessage instances. What happens instead is, in the case of TextMessage, dequeued messages show up at the consumer as instances of AmqpMessageImpl, which implements neither of said JMS interfaces. Per a response I received on the Qpid mailing list:
A message will be returned as an AmqpMessageImpl if the client can't work out what sort of JMS Message it is supposed to represent.
The logic in the client for detecting a TextMessage is as follows:
else if (bodySection instanceof AmqpValue &&
         ((AmqpValue) bodySection).getValue() instanceof String) {
    message = new TextMessageImpl(header, messageAnnotations, properties, appProperties,
                                  (String) ((AmqpValue) bodySection).getValue(), footer, _session);
}
That is, the client is expecting a TextMessage to be represented in AMQP as a message with an amqp-value section where the value is of type String. It seems as if HornetQ is not sending the messages in that form and thus the client is not recognising the message as a TextMessage.
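As a rough way to check this against the wire logs below: going by the AMQP 1.0 type encodings, a body section starts with the described-type marker 0x00, the smallulong constructor 0x53 (shown as "S" in the logs) and a descriptor code (0x75 data, 0x76 amqp-sequence, 0x77 amqp-value), followed by the constructor byte of the value itself. The published TextMessage body appears as \x00Sw\xa1?test message..., while the consumed one appears as \x00Sw@. The sketch below decodes just those leading bytes; the class and method names are hypothetical and not part of Qpid or HornetQ.

```java
// Hypothetical diagnostic helper: names are illustrative, not a Qpid/HornetQ API.
final class BodySectionPeek {

    // Descriptor codes of the three AMQP 1.0 body section types.
    static String sectionName(int code) {
        switch (code) {
            case 0x75: return "data";
            case 0x76: return "amqp-sequence";
            case 0x77: return "amqp-value";
            default:   return "other(0x" + Integer.toHexString(code) + ")";
        }
    }

    // A few AMQP 1.0 primitive constructor bytes relevant to the logs.
    static String valueType(int ctor) {
        switch (ctor) {
            case 0x40: return "null";
            case 0xa0: return "vbin8";
            case 0xb0: return "vbin32";
            case 0xa1: return "str8-utf8";
            case 0xb1: return "str32-utf8";
            default:   return "other(0x" + Integer.toHexString(ctor) + ")";
        }
    }

    // Expects the bytes of one section: 0x00 0x53 <descriptor> <value constructor> ...
    static String describe(byte[] section) {
        if (section.length < 4 || section[0] != 0x00 || (section[1] & 0xff) != 0x53) {
            throw new IllegalArgumentException("not a smallulong-described section");
        }
        return sectionName(section[2] & 0xff) + ":" + valueType(section[3] & 0xff);
    }
}
```

Fed the first bytes of the published body (00 53 77 a1 3f) this reports amqp-value:str8-utf8, and for the consumed body (00 53 77 40) it reports amqp-value:null, which would be consistent with the client falling through to AmqpMessageImpl.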
In the case of ObjectMessage, the consumer is able to identify the messages as ObjectMessage, but the client throws a JMSException reporting "invalid stream header: 00000000" when I attempt to invoke ObjectMessage#getObject:
14:41:30.863 [Thread-4] ERROR [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener] Exception in onMessage: javax.jms.JMSException: invalid stream header: 00000000
javax.jms.JMSException: invalid stream header: 00000000
    at org.apache.qpid.amqp_1_0.jms.impl.ObjectMessageImpl.getObject(ObjectMessageImpl.java:124) ~[qpid-amqp-1-0-client-jms-0.24.jar:?]
    at net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener.onMessage(TestConsumerListener.java:25) [test-classes/:?]
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:876) [qpid-amqp-1-0-client-jms-0.24.jar:?]
    at java.lang.Thread.run(Thread.java:724) [?:1.7.0_25]
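The wording of that exception matches what java.io.ObjectInputStream reports when the first four bytes of its input are not the Java serialization header (ac ed 00 05), so presumably the client is handing the received data section straight to an ObjectInputStream. The sketch below reproduces the message with plain JDK classes; StreamHeaderCheck and its methods are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamConstants;

// Hypothetical diagnostic helper, JDK-only; not part of the Qpid client.
final class StreamHeaderCheck {

    // True if the payload starts with the serialization magic and version (ac ed 00 05).
    static boolean hasSerializationMagic(byte[] payload) {
        if (payload.length < 4) {
            return false;
        }
        int magic = ((payload[0] & 0xff) << 8) | (payload[1] & 0xff);
        int version = ((payload[2] & 0xff) << 8) | (payload[3] & 0xff);
        return magic == (ObjectStreamConstants.STREAM_MAGIC & 0xffff)
                && version == ObjectStreamConstants.STREAM_VERSION;
    }

    // Opens an ObjectInputStream over the payload and returns "ok" or the failure message.
    static String tryOpen(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }
}
```

Calling tryOpen on a payload that begins with four zero bytes yields "invalid stream header: 00000000", the same text as in the trace above.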
Relevant logs (including wire-level logs from Qpid):
- Publishing a TextMessage:
14:48:02.885 [main] DEBUG [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestQueuePublisherSimplified] Sending text message: class org.apache.qpid.amqp_1_0.jms.impl.TextMessageImpl
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
FINE: SEND[{host}:10005|0] : Transfer{handle=0,deliveryId=4,deliveryTag=4,messageFormat=0,settled=false}
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
FINE: SEND[{host}:10005] : \x00\x00\x00\xe4\x02\x00\x00\x00\x00S\x14\xc0\x09\x05CR\x04\xa0\x014CB\x00Sp\xc0\x04\x02AP\x04\x00Sr\xc1\x17\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0T\x0a\xa0$db5a5fb1-7d8c-4506-89cd-5b2919d56b0e@\xa1\x1bjms.queue.TestQueue20131007@@@@@@\x83\x00\x00\x01A\xc2\x98\x9aE\x00St\xc1\x01\x00\x00Sw\xa1?test message:1381949282885:104adf66-91d4-03f8-7a5b-42451a6b8003\x00Sx\xc1\x01\x00
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.client.Connection doRead
FINE: RECV [{host}:10005] : \x00\x00\x00\x14\x02\x00\x00\x00\x00S\x15\xc0\x07\x04AR\x03R\x03A\x00\x00\x00!\x02\x00\x00\x00\x00S\x13\xc0\x14\x07R\x05p\x00\x00\x03\xffR\x01p\x00\x00\x04\x00CR\x05Rc
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Disposition{role=receiver,first=3,last=3,settled=true}
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Flow{nextIncomingId=5,incomingWindow=1023,nextOutgoingId=1,outgoingWindow=1024,handle=0,deliveryCount=5,linkCredit=99}
- Consuming a TextMessage:
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.client.Connection doRead
FINE: RECV [{host}:10005] : \x00\x00\x00\xc3\x02\x00\x00\x00\x00S\x14\xc0\x08\x04CR\x08\xa0\x010C\x00Sp\xc0\x08\x05AP\x04C@R\x01\x00Sr\xc1P\x04\xa3\x0dx-opt-to-type\xa1\x05queue\xa10HORNETQ_PROTON_MESSAGE_ANNOTATIONS_x-opt-to-type\xa1\x05queue\x00Ss\xc07\x0a\x81\x00\x00\x00\x00\x00\x00\x0d @\xa1\x1bjms.queue.TestQueue20131007@@@@@@\x83\x00\x00\x01A\xc2\x98\x9aE\x00St\xc1\x01\x00\x00Sw@\x00Sx\xc1\x01\x00
14:48:02.896 [Thread-4] WARN [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener] Unrecognized message type: class org.apache.qpid.amqp_1_0.jms.impl.AmqpMessageImpl
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Transfer{handle=0,deliveryId=8,deliveryTag=0,messageFormat=0}
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
FINE: SEND[{host}:10005|0] : Disposition{role=receiver,first=8,last=8,settled=true,state=Accepted{}}
Oct 16, 2013 2:48:02 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
FINE: SEND[{host}:10005] : \x00\x00\x00\x18\x02\x00\x00\x00\x00S\x15\xc0\x0b\x05AR\x08R\x08A\x00S$E
- Publishing an ObjectMessage:
14:47:52.884 [main] DEBUG [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestQueuePublisherSimplified] Sending object message: class org.apache.qpid.amqp_1_0.jms.impl.ObjectMessageImpl
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
FINE: SEND[{host}:10005|0] : Transfer{handle=0,deliveryId=3,deliveryTag=3,messageFormat=0,settled=false}
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
FINE: SEND[{host}:10005] : \x00\x00\x01\x10\x02\x00\x00\x00\x00S\x14\xc0\x09\x05CR\x03\xa0\x013CB\x00Sp\xc0\x04\x02AP\x04\x00Sr\xc1\x17\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0y\x0a\xa0$2a1ff485-9b1c-4d6e-8180-d81742517118@\xa1\x1bjms.queue.TestQueue20131007@@@\xa3$application/x-java-serialized-object@@\x83\x00\x00\x01A\xc2\x98s4\x00St\xc1\x01\x00\x00Su\xa0F\xac\xed\x00\x05t\x00?test message:1381949272884:a49509c7-d6ca-2c2e-6c5f-9a7a8fcaacc6\x00Sx\xc1\x01\x00
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.client.Connection doRead
FINE: RECV [{host}:10005] : \x00\x00\x00\x14\x02\x00\x00\x00\x00S\x15\xc0\x07\x04AR\x02R\x02A\x00\x00\x00!\x02\x00\x00\x00\x00S\x13\xc0\x14\x07R\x04p\x00\x00\x03\xffR\x01p\x00\x00\x04\x00CR\x04Rc
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Disposition{role=receiver,first=2,last=2,settled=true}
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Flow{nextIncomingId=4,incomingWindow=1023,nextOutgoingId=1,outgoingWindow=1024,handle=0,deliveryCount=4,linkCredit=99}
- Consuming an ObjectMessage:
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.client.Connection doRead
FINE: RECV [{host}:10005] : \x00\x00\x03\xf3\x02\x00\x00\x00\x00S\x14\xc0\x08\x04CR\x07\xa0\x013C\x00Sp\xc0\x08\x05AP\x04C@R\x01\x00Sr\xc1P\x04\xa3\x0dx-opt-to-type\xa1\x05queue\xa10HORNETQ_PROTON_MESSAGE_ANNOTATIONS_x-opt-to-type\xa1\x05queue\x00Ss\xc0\x81\x0a\x81\x00\x00\x00\x00\x00\x00\x0d\x1c@\xa1\x1bjms.queue.TestQueue20131007@@@\xa3$application/x-java-serialized-object\xa3$application/x-java-serialized-object@\x83\x00\x00\x01A\xc2\x98s4\x00St\xc1\x01\x00\x00Su\xb0\x00\x00\x02\xe2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0d\x1c\x00\x00\x00\x00W\xac\xed\x00\x05t\x00?test message:1381949272884:a49509c7-d6ca-2c2e-6c5f-9a7a8fcaacc6\x00\x00\x02\xe2\x00\x00\x00\x00\x00\x00\x0d\x1c\x01\x00\x00\x006j\x00m\x00s\x00.\x00q\x00u\x00e\x00u\x00e\x00.\x00T\x00e\x00s\x00t\x00Q\x00u\x00e\x00u\x00e\x002\x000\x001\x003\x001\x000\x000\x007\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01A\xc2\x98q\x16\x04\x01\x00\x00\x00\x07\x00\x00\x008H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00C\x00R\x00E\x00A\x00T\x00I\x00O\x00N\x00_\x00T\x00I\x00M\x00E\x00\x07\x00\x00\x01A\xc2\x98s4\x00\x00\x00*H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00F\x00O\x00R\x00M\x00A\x00T\x00\x07\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00C\x00O\x00N\x00T\x00E\x00N\x00T\x00_\x00T\x00Y\x00P\x00E\x00\x0a\x00\x00\x00Ha\x00p\x00p\x00l\x00i\x00c\x00a\x00t\x00i\x00o\x00n\x00/\x00x\x00-\x00j\x00a\x00v\x00a\x00-\x00s\x00e\x00r\x00i\x00a\x00l\x00i\x00z\x00e\x00d\x00-\x00o\x00b\x00j\x00e\x00c\x00t\x00\x00\x00\x00&P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00S\x00I\x00Z\x00E\x00\x06\x00\x00\x00\xfa\x00\x00\x00`H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00A\x00N\x00N\x00O\x00T\x00A\x00T\x00I\x00O\x00N\x00S\x00_\x00x\x00-\x00o\x00p\x00t\x00-\x00t\x00o\x00-\x00t\x00y\x00p\x00e\x00\x0a\x00\x00\x00\x0aq\x00u\x00e\x00u\x00e\x00\x00\x00\x006H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00T\x00Y\x00P\x00E\x00\x06\x00\x00\x00\x00\x00\x00\x00:H\x00O\x00R\x00N\x00E\x00T\x00Q\x00_\x00P\x00R\x00O\x00T\x00O\x00N\x00_\x00M\x00E\x00S\x00S\x00A\x00G\x00E\x00_\x00F\x00O\x00R\x00M\x00A\x00T\x00\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00Sx\xc1\x01\x00
14:47:52.907 [Thread-4] ERROR [net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener] Exception in onMessage: javax.jms.JMSException: invalid stream header: 00000000
javax.jms.JMSException: invalid stream header: 00000000
    at org.apache.qpid.amqp_1_0.jms.impl.ObjectMessageImpl.getObject(ObjectMessageImpl.java:124) ~[qpid-amqp-1-0-client-jms-0.24.jar:?]
    at net.sumitsu.qpidamqpjmstest.hornetqamqp.TestConsumerListener.onMessage(TestConsumerListener.java:25) [test-classes/:?]
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:876) [qpid-amqp-1-0-client-jms-0.24.jar:?]
    at java.lang.Thread.run(Thread.java:724) [?:1.7.0_25]
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint receive
FINE: RECV[{host}:10005|0] : Transfer{handle=0,deliveryId=7,deliveryTag=3,messageFormat=0}
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput getNextFrame
FINE: SEND[{host}:10005|0] : Disposition{role=receiver,first=7,last=7,settled=true,state=Accepted{}}
Oct 16, 2013 2:47:52 PM org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler processBytes
FINE: SEND[{host}:10005] : \x00\x00\x00\x18\x02\x00\x00\x00\x00S\x15\xc0\x0b\x05AR\x07R\x07A\x00S$E
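In the received data section above, the bytes right after the length prefix are zeros, yet the sequence \xac\xed\x00\x05 followed by the original "test message" text still appears further in. That suggests the serialized object is intact but preceded and followed by extra bytes from the broker. Purely as a diagnostic aid (not a fix), the offset of the embedded stream can be located with something like the following; MagicScan is a hypothetical name.

```java
// Hypothetical diagnostic helper: finds where a Java serialization stream
// (ac ed 00 05) begins inside a larger byte array.
final class MagicScan {

    // Returns the index of the first ac ed 00 05 sequence, or -1 if absent.
    static int indexOfMagic(byte[] payload) {
        for (int i = 0; i + 3 < payload.length; i++) {
            if ((payload[i] & 0xff) == 0xac
                    && (payload[i + 1] & 0xff) == 0xed
                    && payload[i + 2] == 0x00
                    && payload[i + 3] == 0x05) {
                return i;
            }
        }
        return -1;
    }
}
```

A non-zero result for a payload taken from ObjectMessage's data section would indicate that the broker is prepending its own bytes to the body rather than passing it through unchanged.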
If anyone has observed this problem before, please let me know. My test case code is available here:
https://github.com/sumitsu/qpidamqpjmstest
Running TestQueuePublisherSimplified publishes a message to queue jms.queue.TestQueue20131007 every 10 seconds, alternating between TextMessage and ObjectMessage. Meanwhile, running TestQueueConsumerSimplified consumes messages from jms.queue.TestQueue20131007.