0 Replies Latest reply on Oct 20, 2008 9:24 AM by Jeff Mesnil

    AMQP integration status

    Jeff Mesnil Master

      AMQP integration in JBoss Messaging 2.0

      Now that I've enough code to handle the most basic AMQP examples, here are some notes about the integration, the work that's been done and the open issues I must solve.

      SVN branch: http://anonsvn.jboss.org/repos/messaging/branches/amqp_integration/

      Integration design

      The integration of AMQP into JBoss Messaging must be as seamless as possible:
      - implement only the server-side part of AMQP
      - implement AMQP wireformat
      - handle the AMQP wireformat and delegate to the JBoss Messaging Core API

      The JBM Core API and the AMQ protocol are similar enough that, in most cases, the AMQ wireformat should be handled by the Core API as is.

      AMQP Wireformat

      Based on AMQP version 0.9.
      The codec code was generated by Qpid from the AMQP XML specifications and then refactored to adapt it to JBoss Messaging.
      The imported code is in the o.j.messaging.amq.framing package.
      The original code imported from Qpid was based on MINA 1.x.
      It was refactored to use JBM MessagingBuffer instead of MINA 1.x IoBuffer and the rest of the API was updated to MINA 2.x.

      For now, the AMQP integration is still bound to MINA code. It must be refactored further so that it can be used with any JBM remoting implementation (based on o.j.messaging.remoting.spi).

      State of the integration

      The code in the branch allows publishing and consuming a message from an AMQ client (using the Qpid Ruby tests at the moment, see http://anonsvn.jboss.org/repos/messaging/branches/amqp_integration/README_AMQP.txt to run the example manually).
      Some of the AMQ methods are not implemented (Exchange and Queue methods are no-ops) and rely on preexisting resources that are already configured (e.g. the queue used to publish/consume).

      o.j.messaging.core.remoting.impl.amqp package

      Based on remoting.impl.mina package. The modifications are:
      - use of AMQProtocolCodecFilter as the codec
      - use of AMQPMinaHandler as the handler

      On reception of a MINA "message" (which is an AMQDataBlock as decoded by the codec), the handler calls o.j.m.core.remoting.spi.BufferHandler.dataBlockReceived(AMQDataBlock).

      The code that handles AMQ messages is similar to the code that handles JBM packets.


      This implementation of BufferHandler handles the AMQ data block in dataBlockReceived(AMQDataBlock).
      It checks for the special case of AMQP protocol initiation.
      If the data block is a regular AMQ frame, it retrieves the Channel in charge of handling the frame and calls o.j.m.core.remoting.ChannelHandler.handleFrame(AMQFrame).
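
      The dispatch described above can be sketched roughly as follows. This is only an illustration, not the code from the branch: DataBlock, ProtocolInitiation, Frame, FrameHandler and DataBlockDispatcher are hypothetical stand-ins for AMQDataBlock, the protocol initiation block, AMQFrame, ChannelHandler and the BufferHandler implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AMQDataBlock.
interface DataBlock {}

// The protocol header a client sends first; it is not a regular frame.
final class ProtocolInitiation implements DataBlock {}

// Hypothetical stand-in for AMQFrame: a regular frame addressed to a channel.
final class Frame implements DataBlock {
    final int channelId;
    final String payload;

    Frame(int channelId, String payload) {
        this.channelId = channelId;
        this.payload = payload;
    }
}

// Hypothetical stand-in for ChannelHandler.
interface FrameHandler {
    void handleFrame(Frame frame);
}

final class DataBlockDispatcher {
    private final Map<Integer, FrameHandler> handlers = new HashMap<>();
    final List<String> log = new ArrayList<>();

    void register(int channelId, FrameHandler handler) {
        handlers.put(channelId, handler);
    }

    void dataBlockReceived(DataBlock block) {
        if (block instanceof ProtocolInitiation) {
            // Special case: protocol negotiation is handled before any channel is involved.
            log.add("protocol-initiation");
            return;
        }
        // Regular frame: look up the handler in charge of the frame's channel.
        Frame frame = (Frame) block;
        FrameHandler handler = handlers.get(frame.channelId);
        if (handler == null) {
            throw new IllegalStateException("no handler for channel " + frame.channelId);
        }
        handler.handleFrame(frame);
    }
}
```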

      ChannelHandler implementations

      There are several implementations of ChannelHandler in JBoss Messaging, corresponding to the different "types" of messages to be handled. The two main ones are:
      - MessagingServerPacketHandler which handles connection messages
      - ServerSessionPacketHandler which handles session messages

      These two handlers take care of the AMQ methods:
      - MessagingServerPacketHandler deals with AMQP connection methods
      - ServerSessionPacketHandler deals with AMQP channel and basic methods for a given session

      AMQP Channels Vs JBM Channels

      AMQP Channels are defined as: "A bi-directional stream of communications between two AMQP peers. Channels are multiplexed so that a single network connection can carry multiple channels"

      AMQP uses channel 0 for frames which are global to the connection, as well as for trace and heartbeat frames.
      The channel ID is determined by the server when it signals to the client that the channel is ready to use (channel.open-ok method).

      JBoss Messaging Channels are similar in their intent. They also provide a dialog between a client and a server on a single network connection.
      JBoss Messaging assigns its channel numbers differently:
      - channel 0 is for ping/pong (similar to AMQP heartbeat)
      - channel 1 is for packets global to the connection (similar to AMQP channel 0)
      - channels 2 to 9 are reserved for the system
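
      To make the difference concrete, here is a small sketch that classifies a channel number under each scheme. ChannelNumbering and its role names are hypothetical, and treating JBM channels above 9 as ordinary channels is an assumption that the notes above only imply.

```java
// Role of a channel number under each numbering scheme (illustration only).
final class ChannelNumbering {
    private ChannelNumbering() {}

    static String amqpRole(int channel) {
        // AMQP: channel 0 carries connection-wide frames, including heartbeats.
        return channel == 0 ? "connection" : "channel";
    }

    static String jbmRole(int channel) {
        if (channel == 0) return "ping/pong";
        if (channel == 1) return "connection";
        if (channel <= 9) return "reserved";
        // Assumption: anything above the reserved range is an ordinary channel.
        return "channel";
    }
}
```

      Since AMQP channel 0 and JBM channel 1 play the same role, an AMQP channel number presumably cannot be reused as-is as a JBM channel number.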

      Message conversion

      A JBoss Messaging message is sent on the wire as a single packet (either in a SessionSendMessage or a SessionReceiveMessage).
      An AMQP message is sent as several frames on the wire:
      - when publishing: 1 basic.publish frame followed by 1 content-header frame and 1 or many content-body frames
      - when delivering: 1 basic.deliver frame followed by 1 content-header frame and 1 or many content-body frames
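
      The frame sequence above can be illustrated with a short sketch that cuts a message body into content-body frames. WireFrame and FrameSequence are hypothetical names, maxBodyPayload stands in for whatever frame size limit applies on the connection, and the real frames carry more fields than shown here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified frame: a kind label plus a payload.
final class WireFrame {
    final String kind;
    final byte[] payload;

    WireFrame(String kind, byte[] payload) {
        this.kind = kind;
        this.payload = payload;
    }
}

final class FrameSequence {
    private FrameSequence() {}

    // Builds: 1 method frame, 1 content-header frame, then the content-body frames.
    static List<WireFrame> forMessage(String method, byte[] body, int maxBodyPayload) {
        if (maxBodyPayload <= 0) {
            throw new IllegalArgumentException("maxBodyPayload must be positive");
        }
        List<WireFrame> frames = new ArrayList<>();
        frames.add(new WireFrame("method:" + method, new byte[0]));
        // The content header announces the total body size (8 bytes, big-endian here).
        frames.add(new WireFrame("content-header",
                ByteBuffer.allocate(8).putLong(body.length).array()));
        // Cut the body into chunks of at most maxBodyPayload bytes.
        for (int offset = 0; offset < body.length; offset += maxBodyPayload) {
            int end = Math.min(body.length, offset + maxBodyPayload);
            frames.add(new WireFrame("content-body", Arrays.copyOfRange(body, offset, end)));
        }
        return frames;
    }
}
```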

      Publishing a message

      JBoss Messaging creates a new instance of an "AMQMessage" when a basic.publish frame is handled by a session.
      This instance is then filled with the content-header and content-body(ies).
      When the last content-body frame is received, the AMQMessage is converted to a JBoss Messaging Core ServerMessage and session.sendMessage(ServerMessage) is called.
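
      One way to detect the last content-body frame is to compare the number of body bytes received so far with the body size announced in the content-header frame. The sketch below assumes that approach; PendingMessage is a hypothetical stand-in for AMQMessage, and the conversion to a ServerMessage is only hinted at in a comment.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical stand-in for AMQMessage: created on basic.publish, filled frame by frame.
final class PendingMessage {
    final String routingKey;
    private long expectedBodySize = -1;
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();

    PendingMessage(String routingKey) {
        this.routingKey = routingKey;
    }

    // Called when the content-header frame arrives.
    void setContentHeader(long bodySize) {
        this.expectedBodySize = bodySize;
    }

    // Called for each content-body frame; returns true once the body is complete.
    boolean addContentBody(byte[] chunk) {
        if (expectedBodySize < 0) {
            throw new IllegalStateException("content-header not received yet");
        }
        body.write(chunk, 0, chunk.length);
        if (body.size() > expectedBodySize) {
            throw new IllegalStateException("received more body bytes than announced");
        }
        // When this returns true, the caller would convert the message to a
        // ServerMessage and pass it to session.sendMessage(...).
        return body.size() == expectedBodySize;
    }

    byte[] body() {
        return body.toByteArray();
    }
}
```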

      Delivering a message

      In ServerConsumerImpl.handle(MessageReference), the ServerMessage is converted to a list of AMQ frames (1 basic.deliver, 1 content-header, 1 or many content-body) which are sent to the client using a Channel.

      [FIXME] In the prototype, the code to deliver JBoss Messaging messages has been commented out and replaced by code to deliver AMQ messages.
      For the integration, the code to send "something" on the channel should be determined based on the wireformat type instead (JBM Core or AMQP). This can be determined when creating the ServerConsumer instance so that it does not induce any performance penalty.
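
      A possible shape for this, sketched under assumptions: the delivery code is picked once, when the consumer is created, so that handling a message involves no per-message check of the wireformat. WireFormat, DeliveryStrategy, CoreDelivery, AmqpDelivery and SketchConsumer are all hypothetical names, and the "channel" is reduced to a list of strings for the sake of the example.

```java
import java.util.List;

enum WireFormat { JBM_CORE, AMQP }

// Hypothetical strategy: knows how to put one message on the channel.
interface DeliveryStrategy {
    void deliver(String message, List<String> channel);
}

final class CoreDelivery implements DeliveryStrategy {
    public void deliver(String message, List<String> channel) {
        // JBM Core: the message travels as a single packet.
        channel.add("SessionReceiveMessage(" + message + ")");
    }
}

final class AmqpDelivery implements DeliveryStrategy {
    public void deliver(String message, List<String> channel) {
        // AMQP: the message travels as a sequence of frames.
        channel.add("basic.deliver");
        channel.add("content-header");
        channel.add("content-body(" + message + ")");
    }
}

// Hypothetical stand-in for ServerConsumerImpl.
final class SketchConsumer {
    private final DeliveryStrategy delivery;
    private final List<String> channel;

    SketchConsumer(WireFormat format, List<String> channel) {
        // The strategy is selected once, at creation time.
        this.delivery = (format == WireFormat.AMQP) ? new AmqpDelivery() : new CoreDelivery();
        this.channel = channel;
    }

    void handle(String message) {
        delivery.deliver(message, channel);
    }
}
```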