Version 31

    Overview

     

    The JBoss Messaging Core is a framework built on top of JGroups that can be used to create distributed and reliable message transport systems. A JMS provider is an example of such a system, and JBoss Messaging is a fully compliant JMS 1.1 provider in the process of being built on the foundation laid by the JBoss Messaging Core.

     

    The Core is completely independent of the JMS API; it uses a general messaging idiom. JBoss Messaging adds the programmatic convenience of the JMS facade and provides the Quality of Service features that can be expected from an industry-standard JMS provider. Internally, JBoss Messaging relies heavily on the distributed and reliable nature of the Core to provide these Quality of Service features.

     

    JBoss Messaging Core is a serverless (distributed) and reliable message transport mechanism. It is serverless because the system does not require a unique central server, which would be a single point of failure. Instead, the Core relies on messaging runtimes (peers) distributed over a LAN or WAN. In the extreme case, a single peer is enough to provide the full set of messaging services, but the system then loses the high availability and load balancing capabilities that result from the distributed nature of the Core.

     

    The following picture is a schematic representation of the implementation of a Point-to-Point messaging domain built on top of JBoss Messaging Core.

     

     

    A Publish-Subscribe messaging domain based on JBoss Messaging Core can be represented as follows:

     

     

    These two well-known messaging domains are mandated by JMS. However, messaging domains with completely different semantics could also be implemented using the Core's components.

     

     

    The essence of the JBoss Messaging Core can be reduced to three aspects:

    • Acknowledgment (or handling of ...). As a preview, the interface directly responsible for this aspect is Receiver.

    • Synchronicity/Asynchronicity. The interface that deals with this is Channel.

    • Persistence. The persistence aspect is taken care of by the MessageStore and AcknowledgmentStore interfaces.

     

    Interweaving these elements appropriately makes it possible to favor performance (throughput) over reliability, or vice versa.

     

    One of the main advantages of an asynchronous message delivery system over an RPC-based system is that the sender, the network and the receiver do not have to be working at the same time in order for a message to get delivered. If the network is down, the messaging provider stores the in-flight messages until the connection is restored and the messages can be forwarded. The same thing happens if the receiver is down. This behavior results from applying the store-and-forward pattern: the message is not forwarded to the next messaging component unless it is safely stored locally and the storage is acknowledged. However, if the messages are only stored in memory, they are lost if the VM or the system crashes.

     
     
    "Guaranteed Delivery" is a requirement for a JMS provider. The provider must make sure that a message marked PERSISTENT survives a VM or system crash. A typical implementation of this feature uses a built-in or external datastore to persist messages. The key to the pattern is that a send operation does not successfully complete until the message is safely persisted in the datastore. Subsequently, the message is not deleted from the datastore until an explicit positive acknowledgment is received by the component.

     

    JBoss Messaging Core uses the same pattern to provide reliable delivery. Essentially, this means that at any moment during the flow of a "reliable" message through the system, there are one or more copies of the message maintained in a reliable store (a transactional database, for example). If a peer has reliable storage capabilities, it will maintain a copy of the message until it receives acknowledgments from all the peers the message was forwarded to. A side effect of this behaviour is that the peer can immediately acknowledge delivery to its senders. If the peer does not have reliable storage capabilities, it will make a best effort to forward the message towards its final destinations, but it will not acknowledge delivery to its senders until it receives positive acknowledgments of its own.
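
    The guaranteed-delivery pattern described above (persist before the send completes, delete only on an explicit positive acknowledgment) can be sketched as follows. This is only an illustration, not the Core's implementation: InMemoryDatastore and StoreAndForwardSender are hypothetical names, and the map-backed "datastore" stands in for a real database or journal.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a datastore; a real provider would use a database.
class InMemoryDatastore {
    private final Map<String, String> rows = new LinkedHashMap<>();

    void persist(String id, String body) { rows.put(id, body); }
    void delete(String id) { rows.remove(id); }
    boolean contains(String id) { return rows.containsKey(id); }
}

// Hypothetical component applying the pattern.
class StoreAndForwardSender {
    private final InMemoryDatastore store;

    StoreAndForwardSender(InMemoryDatastore store) { this.store = store; }

    // The send completes successfully only once the message is persisted.
    boolean send(String id, String body) {
        store.persist(id, body);
        return store.contains(id);
    }

    // The stored copy is deleted only on an explicit positive acknowledgment.
    void onAcknowledgment(String id, boolean positive) {
        if (positive) {
            store.delete(id);
        }
    }
}

class StoreAndForwardDemo {
    public static void main(String[] args) {
        InMemoryDatastore store = new InMemoryDatastore();
        StoreAndForwardSender sender = new StoreAndForwardSender(store);
        sender.send("m1", "hello");
        sender.onAcknowledgment("m1", false);      // negative ack: the copy is kept
        System.out.println(store.contains("m1"));
        sender.onAcknowledgment("m1", true);       // positive ack: the copy is deleted
        System.out.println(store.contains("m1"));
    }
}
```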

     

    Reliability is built into the system, but a user can choose to circumvent it for performance reasons (storing bytes into a database takes time). It is enough to declare a message "NON PERSISTENT" (or "non-reliable" in JBoss Messaging Core speak), and all the components on the message's path through the system will rush the message towards its destination without reliable storage. If a problem occurs (hardware failure, network failure, VM crash), the sender that faces the problem will try redelivery from memory (unreliable storage), but if that sender fails as well, the message may be lost.

     

     

    In essence, the Messaging Core is an aggregation of components living in the same address space or in different address spaces. Their main goal is to forward messages, always in only one direction, dynamically choosing among different types of storage and delivery modes in order to favor throughput over reliability or vice versa: asynchronous delivery with reliable storage (database), asynchronous delivery with unreliable storage (memory), or no storage at all in the case of synchronous delivery.

     

     

    Patterns and Design Elements

     

    The Core was designed using very generic and very simple messaging patterns: Receivers, Channels and Routers.

     

    A Receiver is the basic interface for a component that handles messages and its only concern is to provide a positive or negative, implicit or explicit acknowledgment.

     

    A Channel is an abstraction defining a message handling component that forwards a message from a sender to one or more Receivers. A Channel's concern is synchronicity, or the lack thereof. The Messaging Core is pretty much a Channel world. Almost all components are Channels: local or distributed pipes, local or distributed queues and topics are all Channels.

     

    A Router is another abstraction that defines a message handling component which encapsulates a "routing policy": the decision about which Receivers a message should be delivered to. A Router is always synchronous and lives in a single address space.

     

    However, before defining the basic patterns in more detail, we should start by defining the representation of the very reason the message delivery system exists in the first place: the message.

     

    Routable

     

    A Routable is the lowest common denominator: an atomic, self-contained unit of data that flows through the messaging system. As we will see later, a Message, which is what we would expect a messaging system to handle, is a Routable. A Routable must be serializable because, due to the distributed nature of the system, Routable instances are commonly sent over a network connection.

     

       public interface Routable extends Serializable
       {
          public Serializable getMessageID();
          public boolean isReliable();
          public long getExpirationTime();
    
          public void putHeader(String name, Serializable value);
          public Serializable getHeader(String name);
          public Serializable removeHeader(String name);
          public Set getHeaderNames();
       }
    

     

    Each Routable carries a set of headers. Various components can attach or remove headers to/from the Routable. A common usage is message flow management. The Routable does not support the concept of address, though. The core of the messaging system is an assembly of Channels and Receivers that route an in-flight Routable on pre-defined "paths", and hence the lack of need for a formal address.
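
    As an illustration of how components might attach and remove headers, here is a minimal sketch. SimpleRoutable and the "LAST_PEER" header name are hypothetical; the interface is a trimmed copy of the one above, and it uses Set<String> where the original declares a raw Set.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Set;

// Trimmed copy of the Routable interface above (isReliable() and
// getExpirationTime() are left out to keep the sketch short).
interface Routable extends Serializable {
    Serializable getMessageID();
    void putHeader(String name, Serializable value);
    Serializable getHeader(String name);
    Serializable removeHeader(String name);
    Set<String> getHeaderNames();
}

// Hypothetical implementation: headers live in a HashMap, which is Serializable.
class SimpleRoutable implements Routable {
    private static final long serialVersionUID = 1L;
    private final Serializable messageID;
    private final HashMap<String, Serializable> headers = new HashMap<>();

    SimpleRoutable(Serializable messageID) { this.messageID = messageID; }

    public Serializable getMessageID() { return messageID; }
    public void putHeader(String name, Serializable value) { headers.put(name, value); }
    public Serializable getHeader(String name) { return headers.get(name); }
    public Serializable removeHeader(String name) { return headers.remove(name); }
    public Set<String> getHeaderNames() { return headers.keySet(); }
}

class RoutableHeadersDemo {
    public static void main(String[] args) {
        Routable r = new SimpleRoutable("m1");
        // A component on the path attaches a header (the name is made up) ...
        r.putHeader("LAST_PEER", "peer-A");
        System.out.println(r.getHeaderNames());
        // ... and a later component removes it again.
        r.removeHeader("LAST_PEER");
        System.out.println(r.getHeaderNames().isEmpty());
    }
}
```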

     

    When a sender submits a Routable for delivery, it has a choice to declare the Routable RELIABLE or UNRELIABLE. An explanation on how a Channel handles these two types of routables follows below.

     

    Messages/MessageReferences

     

    A Message is a Routable that has a payload. The payload is what the sending clients submit to the messaging system and expect to be delivered to the intended receiving clients.

     

       public interface Message extends Routable
       {
          public Serializable getPayload();
       }
    

     

    A MessageReference is a "lightweight representative" of a Message. If the Message itself is stored in a MessageStore, as we will see below, the messaging system may choose to route only the corresponding MessageReference(s) instead.

     

       public interface MessageReference extends Routable
       {
          public Serializable getStorageID();
       }
    

     

    If two or more channels share a MessageStore, then it is more efficient to do all message routing work using MessageReferences instead of Messages.

     

     

    The core is designed to be "eager": it tries to turn messages into references as soon as possible and as often as it can. When a "gap" is reached (two components access different MessageStores), the message is restored and the message itself is sent instead of the reference.

     

    Receiver

     

    Any component that handles messages is a Receiver. The only concern of a Receiver is to provide a positive or negative, implicit or explicit acknowledgment. An example of explicit positive acknowledgment is returning true from the handling method call. An example of implicit negative acknowledgment is the same handling method call throwing an unchecked exception. Handling means either consuming the message or forwarding it to one or more other Receivers.

     

    The Java interface that defines a Receiver follows:

     

       public interface Receiver 
       {
          public Serializable getReceiverID();
          public boolean handle(Routable routable);
       }
    

     

    Upon receiving a Message and depending on its implementation, a Receiver could consume the message or forward it to one or more subsequent Receivers. The Receiver interface doesn't enforce any message handling behavior; the behavior rather depends on a specific implementation. The Receiver interface makes sure though, that handling the message produces an unequivocal positive or negative acknowledgment.

     

    If the handle() invocation returns true, the Receiver acknowledges the message and, from that moment on, is solely responsible for the consumption or delivery of the message. The sender does not need to worry about the message anymore and can be sure that the Receiver will make its best effort to carry on the delivery.

     

    If the handle() invocation returns false, the Receiver does not want to or cannot handle the message. It is the sender's responsibility to decide whether to re-attempt delivery or to give up. The Receiver may also throw unchecked exceptions, so be prepared to deal with them when invoking handle(). An unchecked exception automatically implies a negative acknowledgment, but the sender may also decide to remove this Receiver from its list and not attempt delivery to it anymore (that Receiver is considered to be "broken").
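
    The three outcomes a sender has to distinguish when calling handle() can be sketched as below. The Outcome enum, the Sender class and ScriptedReceiver are hypothetical helpers introduced only for this illustration; Routable is trimmed to a single method.

```java
import java.io.Serializable;

// Trimmed copies of the interfaces above.
interface Routable extends Serializable {
    Serializable getMessageID();
}

interface Receiver {
    Serializable getReceiverID();
    boolean handle(Routable routable);
}

// The outcomes a sender has to tell apart (names are illustrative).
enum Outcome { ACKNOWLEDGED, REJECTED, BROKEN }

class Sender {
    static Outcome send(Receiver receiver, Routable routable) {
        try {
            // true: the Receiver took over responsibility; false: it refused.
            return receiver.handle(routable) ? Outcome.ACKNOWLEDGED : Outcome.REJECTED;
        } catch (RuntimeException e) {
            // An unchecked exception is an implicit negative acknowledgment;
            // the sender may additionally drop this Receiver as "broken".
            return Outcome.BROKEN;
        }
    }
}

// Hypothetical Receiver whose answer is fixed up front; null means "throw".
class ScriptedReceiver implements Receiver {
    private final Boolean answer;

    ScriptedReceiver(Boolean answer) { this.answer = answer; }

    public Serializable getReceiverID() { return "scripted"; }

    public boolean handle(Routable routable) {
        if (answer == null) {
            throw new IllegalStateException("receiver failure");
        }
        return answer;
    }
}

class ReceiverAckDemo {
    public static void main(String[] args) {
        Routable m = () -> "m1";
        System.out.println(Sender.send(new ScriptedReceiver(true), m));
        System.out.println(Sender.send(new ScriptedReceiver(false), m));
        System.out.println(Sender.send(new ScriptedReceiver(null), m));
    }
}
```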

     

    Channel

     

    A Channel is the abstraction that defines a message delivery mechanism that forwards a message from a sender to one or more receivers. Since in order to deliver a message, the Channel must first get that message from a sender, the Channel is also a Receiver. Typically, Channels do not consume messages, they only forward them to the Channel's Receivers.

     

       public interface Channel extends Receiver
       {
          public boolean isSynchronous();      
          public boolean setSynchronous(boolean b);
    
          public boolean deliver();
          public boolean hasMessages();
          public Set getUnacknoweldged();
    
          public void setMessageStore(MessageStore ms);
          public MessageStore getMessageStore();
          public void setAcknowledgmentStore(AcknowledgmentStore as);
          public AcknowledgmentStore getAcknowledgmentStore();
       }
    

     

    The Channel's responsibilities are:

    1. To decide the Receiver(s) the message is forwarded to.

    2. To effectively forward the message, synchronously or asynchronously.

     

    A Channel can have zero, one or more Receivers it can potentially forward messages to. They are called output Receivers. If there is more than one output Receiver, the Channel may deliver the message to none, one or more than one of them, according to the logic particular to that specific Channel implementation. A Receiver never explicitly pulls a message from the Channel; it only declares its availability, should a message arrive. It is the Channel that pushes the message to the Receiver.

     

    Synchronous Channels

     

    A Channel that was configured to behave synchronously will always attempt synchronous delivery on the chosen Receivers. If the Channel's handle() method returns with a positive acknowledgment, it is guaranteed that each and every Receiver that was supposed to get the message actually got and acknowledged it.

     

    However, synchronous delivery is not always possible. The Channel may not have any output Receiver, or its Receivers may be unable to accept messages. In these situations, the synchronous Channel has no choice but to negatively acknowledge the message to the sender.

     

    Synchronous Channels and delivery reliability

     

    Since a synchronous Channel acknowledges only after all the entitled Receivers acknowledge, delivery using a synchronous Channel is always reliable, regardless of whether the message was declared RELIABLE or UNRELIABLE.
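
    A minimal sketch of this acknowledgment rule follows. It assumes, for brevity, that every output Receiver is entitled to every message (no routing policy), and it uses a Receiver interface trimmed to handle(); SynchronousChannel is a hypothetical class, not the Core's implementation.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Trimmed copies of the interfaces above.
interface Routable extends Serializable {
    Serializable getMessageID();
}

interface Receiver {
    boolean handle(Routable routable);
}

// Hypothetical synchronous channel that forwards to all of its outputs.
class SynchronousChannel implements Receiver {
    private final List<Receiver> outputs = new ArrayList<>();

    void add(Receiver receiver) { outputs.add(receiver); }

    public boolean handle(Routable routable) {
        if (outputs.isEmpty()) {
            return false;              // nobody to deliver to: negative acknowledgment
        }
        for (Receiver output : outputs) {
            boolean acknowledged;
            try {
                acknowledged = output.handle(routable);
            } catch (RuntimeException e) {
                acknowledged = false;  // implicit negative acknowledgment
            }
            if (!acknowledged) {
                return false;          // one refusal is enough to refuse upstream
            }
        }
        return true;                   // every output acknowledged
    }
}

class SynchronousChannelDemo {
    public static void main(String[] args) {
        SynchronousChannel channel = new SynchronousChannel();
        channel.add(m -> true);
        channel.add(m -> true);
        System.out.println(channel.handle(() -> "m1"));
    }
}
```

    Note that this sketch stops at the first refusal, so earlier outputs may already have received the message. How a real Channel deals with such partial delivery is not covered here.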

     

    Asynchronous Channels

     

    A Channel that was configured to behave asynchronously may act as a middle man: if positively acknowledged synchronous delivery is not immediately possible, the Channel may hold the message for a while and attempt re-delivery. When the asynchronous Channel's handle() method returns with a positive acknowledgment, this does not necessarily mean that all entitled Receivers got the message and acknowledged it. It only means that the Channel assumes the responsibility for delivery.

     

    Even in the case of an asynchronous Channel, the Channel implementors are encouraged to attempt immediate synchronous delivery. The Channel should attempt to forward the message on the same thread that performed the delivery and only if that is not possible, the Channel should store the message for further re-delivery attempts. However, this is an implementation detail.

     

    The Channel's interface offers the possibility to switch the channel from synchronous to asynchronous mode and vice versa. However, an immediate switch from asynchronous to synchronous mode may not be possible if the channel contains unacknowledged messages.

     

    Asynchronous Channels and delivery reliability

     

    The asynchronous Channels must be prepared to deal with the eventuality of failure. They must be able to deterministically handle the situation when they hold messages that were not acknowledged by one or more Receivers, and a hardware or software failure occurs.

     

    When a sender submits a message for delivery, it has a choice to declare the message RELIABLE or UNRELIABLE. If an asynchronous Channel receives an UNRELIABLE message, no special precautions need to be taken. If the initial synchronous delivery attempt fails, the Channel will hold the message in memory and attempt redelivery. The message is lost should a Channel failure occur during this time.

     

    However, if an asynchronous Channel receives a RELIABLE message and the initial synchronous delivery attempt fails, the Channel must NOT acknowledge the message, unless the message is saved in a reliable store.
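
    The rule for RELIABLE and UNRELIABLE messages can be sketched as follows. ReliableStore is a hypothetical single-method stand-in for the reliable store (the document's own MessageStore and AcknowledgmentStore interfaces are introduced further below), and AsynchronousChannel and SimpleRoutable are illustrative classes with a single output.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Trimmed copies of the interfaces above.
interface Routable extends Serializable {
    Serializable getMessageID();
    boolean isReliable();
}

interface Receiver {
    boolean handle(Routable routable);
}

// Hypothetical stand-in for a reliable store; returns false if saving failed.
interface ReliableStore {
    boolean save(Routable routable);
}

class SimpleRoutable implements Routable {
    private static final long serialVersionUID = 1L;
    private final String id;
    private final boolean reliable;

    SimpleRoutable(String id, boolean reliable) { this.id = id; this.reliable = reliable; }

    public Serializable getMessageID() { return id; }
    public boolean isReliable() { return reliable; }
}

class AsynchronousChannel implements Receiver {
    private final Receiver output;
    private final ReliableStore store;                 // null: no reliable storage
    private final List<Routable> held = new ArrayList<>();

    AsynchronousChannel(Receiver output, ReliableStore store) {
        this.output = output;
        this.store = store;
    }

    public boolean handle(Routable routable) {
        if (output.handle(routable)) {
            return true;                               // immediate synchronous delivery worked
        }
        if (routable.isReliable() && (store == null || !store.save(routable))) {
            return false;                              // must NOT acknowledge an unsaved RELIABLE message
        }
        held.add(routable);                            // kept in memory for redelivery
        return true;                                   // the channel assumes responsibility
    }

    int heldCount() { return held.size(); }
}

class AsynchronousChannelDemo {
    public static void main(String[] args) {
        Receiver offline = m -> false;
        AsynchronousChannel channel = new AsynchronousChannel(offline, null);
        System.out.println(channel.handle(new SimpleRoutable("u1", false)));
        System.out.println(channel.handle(new SimpleRoutable("r1", true)));
    }
}
```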

     

    Redelivery

     

    When the Channel's deliver() method is called, the channel attempts to deliver unacknowledged messages. deliver() returns true if all messages have been delivered and positively acknowledged, or false if there are still messages to be delivered after the call completes. As an implementation detail, the asynchronous delivery can be triggered by an event such as adding a new receiver, or there could be a background thread, internal to the channel, which tries to deliver from time to time if stored messages exist.
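
    A minimal sketch of the deliver() contract, with redelivery triggered when a receiver is attached, might look like this. RedeliveringChannel, hold() and setOutput() are hypothetical names; the sketch has a single output and no background thread.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Trimmed copies of the interfaces above.
interface Routable extends Serializable {
    Serializable getMessageID();
}

interface Receiver {
    boolean handle(Routable routable);
}

class RedeliveringChannel {
    private final List<Routable> unacknowledged = new ArrayList<>();
    private Receiver output;                       // single output, for brevity

    // Keep a message that could not be delivered yet.
    void hold(Routable routable) { unacknowledged.add(routable); }

    boolean hasMessages() { return !unacknowledged.isEmpty(); }

    // Attaching a receiver is one of the events that triggers redelivery.
    void setOutput(Receiver receiver) {
        output = receiver;
        deliver();
    }

    // Returns true only if no unacknowledged message remains after the attempt.
    boolean deliver() {
        if (output != null) {
            Iterator<Routable> it = unacknowledged.iterator();
            while (it.hasNext()) {
                if (output.handle(it.next())) {
                    it.remove();                   // positively acknowledged
                }
            }
        }
        return unacknowledged.isEmpty();
    }
}

class RedeliveryDemo {
    public static void main(String[] args) {
        RedeliveringChannel channel = new RedeliveringChannel();
        channel.hold(() -> "m1");
        channel.hold(() -> "m2");
        System.out.println(channel.deliver());     // no receiver attached yet
        channel.setOutput(m -> true);              // triggers redelivery
        System.out.println(channel.hasMessages());
    }
}
```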

     

    The Relationship between a Channel and its Receivers

     

    An output Receiver never pulls a message from a Channel. It only declares its availability, should a message arrive, by registering with the Channel. The Receiver does not even know that it is "associated" with a Channel. It is the Channel that pushes the message to its Receivers. For a message handling system built with Channels and Receivers, there is always a unidirectional flow of messages from the input to the output of the system.

     

    TODO: How to deal with the situation when a Receiver goes off-line (handle() returns false) while the Receiver is still connected to the Channel, and then goes back on-line (handle() potentially returns true). There are two possibilities:

    1. Polling (a Channel background thread)

    2. The Receiver must somehow send a notification to the channel ... but this makes the Receiver aware of the Channel

     

    MessageStore/AcknowledgmentStore

     

    An asynchronous Channel is able to handle RELIABLE messages only if it has (mandatory) access to an AcknowledgmentStore and (optional) access to a MessageStore. A RELIABLE message that was not acknowledged by a Receiver can be acknowledged by the sending asynchronous Channel only if:

    1. The negative acknowledgment (<messageID - receiverID> tuple) was stored in the AcknowledgmentStore.

    2. The message itself was stored in the MessageStore.

    This makes the Channel immune to a failure. Should the channel fail, upon restart it will have enough information to restore its state and re-attempt delivery.

     

    A MessageStore is a reliable repository for messages.

     

       public interface MessageStore
       {
          public Serializable getStoreID();
          public MessageReference store(Message m) throws Throwable;
          public Message retrieve(MessageReference r);
       }
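
    To illustrate the store/retrieve round trip between Messages and MessageReferences, here is a sketch. It is backed by a HashMap, so it is not actually reliable. It assumes that getStorageID() identifies the store holding the message, which the text does not state explicitly. SimpleMessage, SimpleReference and InMemoryMessageStore are hypothetical classes, and Message and MessageReference are trimmed so that they do not extend Routable.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Trimmed copies: in the document both types extend Routable.
interface Message extends Serializable {
    Serializable getMessageID();
    Serializable getPayload();
}

interface MessageReference extends Serializable {
    Serializable getMessageID();
    Serializable getStorageID();
}

interface MessageStore {
    Serializable getStoreID();
    MessageReference store(Message m) throws Throwable;
    Message retrieve(MessageReference r);
}

class SimpleMessage implements Message {
    private static final long serialVersionUID = 1L;
    private final Serializable id;
    private final Serializable payload;

    SimpleMessage(Serializable id, Serializable payload) { this.id = id; this.payload = payload; }

    public Serializable getMessageID() { return id; }
    public Serializable getPayload() { return payload; }
}

class SimpleReference implements MessageReference {
    private static final long serialVersionUID = 1L;
    private final Serializable id;
    private final Serializable storageID;

    SimpleReference(Serializable id, Serializable storageID) { this.id = id; this.storageID = storageID; }

    public Serializable getMessageID() { return id; }
    public Serializable getStorageID() { return storageID; }
}

// Hypothetical store kept in a HashMap; a reliable one would use a database.
class InMemoryMessageStore implements MessageStore {
    private final Serializable storeID;
    private final Map<Serializable, Message> messages = new HashMap<>();

    InMemoryMessageStore(Serializable storeID) { this.storeID = storeID; }

    public Serializable getStoreID() { return storeID; }

    // Keeps the full message and hands back a lightweight reference to it.
    public MessageReference store(Message m) {
        messages.put(m.getMessageID(), m);
        return new SimpleReference(m.getMessageID(), storeID);
    }

    // A reference issued by another store cannot be resolved here (the "gap").
    public Message retrieve(MessageReference r) {
        if (!storeID.equals(r.getStorageID())) {
            return null;
        }
        return messages.get(r.getMessageID());
    }
}

class MessageStoreDemo {
    public static void main(String[] args) {
        InMemoryMessageStore store = new InMemoryMessageStore("store-A");
        MessageReference ref = store.store(new SimpleMessage("m1", "hello"));
        System.out.println(store.retrieve(ref).getPayload());
    }
}
```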
    

     

    An AcknowledgmentStore is a reliable repository for negative acknowledgments.

     

       public interface AcknowledgmentStore
       {
          public Serializable getStoreID();
          public void storeNACK(Serializable messageID, Serializable receiverID) throws Throwable;
          public boolean forgetNACK(Serializable messageID, Serializable receiverID) throws Throwable;
       }
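
    The bookkeeping of <messageID - receiverID> tuples can be sketched with an in-memory map, which again is not reliable. The sketch assumes that forgetNACK() returns true when a matching negative acknowledgment was present. pendingReceivers() is a hypothetical helper that is not part of the interface; it shows what a restarted Channel would need to know in order to re-attempt delivery.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Copy of the interface above.
interface AcknowledgmentStore {
    Serializable getStoreID();
    void storeNACK(Serializable messageID, Serializable receiverID) throws Throwable;
    boolean forgetNACK(Serializable messageID, Serializable receiverID) throws Throwable;
}

// Hypothetical map-backed store; a reliable one would use a database.
class InMemoryAcknowledgmentStore implements AcknowledgmentStore {
    private final Serializable storeID;
    private final Map<Serializable, Set<Serializable>> nacks = new HashMap<>();

    InMemoryAcknowledgmentStore(Serializable storeID) { this.storeID = storeID; }

    public Serializable getStoreID() { return storeID; }

    // Record that receiverID has not (yet) acknowledged messageID.
    public void storeNACK(Serializable messageID, Serializable receiverID) {
        nacks.computeIfAbsent(messageID, k -> new HashSet<>()).add(receiverID);
    }

    // Assumption: returns true if such a negative acknowledgment was recorded.
    public boolean forgetNACK(Serializable messageID, Serializable receiverID) {
        Set<Serializable> receivers = nacks.get(messageID);
        if (receivers == null || !receivers.remove(receiverID)) {
            return false;
        }
        if (receivers.isEmpty()) {
            nacks.remove(messageID);
        }
        return true;
    }

    // Hypothetical helper: receivers that still owe an acknowledgment.
    Set<Serializable> pendingReceivers(Serializable messageID) {
        Set<Serializable> result = new HashSet<>();
        Set<Serializable> receivers = nacks.get(messageID);
        if (receivers != null) {
            result.addAll(receivers);
        }
        return result;
    }
}

class AcknowledgmentStoreDemo {
    public static void main(String[] args) {
        InMemoryAcknowledgmentStore store = new InMemoryAcknowledgmentStore("acks-A");
        store.storeNACK("m1", "receiver-1");
        store.storeNACK("m1", "receiver-2");
        store.forgetNACK("m1", "receiver-1");      // receiver-1 acknowledged later
        System.out.println(store.pendingReceivers("m1"));
    }
}
```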
    

     

     

     

    Distributor

     

    The Distributor interface provides a programmatic way to attach or detach output Receivers to/from a sender.

     

       public interface Distributor {
          public boolean add(Receiver receiver);
          public Receiver get(Serializable receiverID);
          public Receiver remove(Serializable receiverID);
          public boolean contains(Serializable receiverID);
          public Iterator iterator();
          public void clear();
          public boolean acknowledged(Serializable receiverID);
       }
    

     

    Routers

     

    A Router is a component that synchronously sends a Routable to none, one, several or all of its Receivers. In this respect, a Router behaves pretty much like a Channel, with the only difference that it is never asynchronous and never distributed. It never stores a message: it either routes it successfully to its Receivers, or the handle() call fails, returning false. All Receivers of a Router live in the same address space, or at least they have their input endpoint in the Router's address space, as we will see later.

     

    We introduced the concept of a Router mainly to make it easier to encapsulate different "routing" algorithms. Routers are a natural candidate for the job of "routing delegate", used by a Channel to decide which Receivers to send a message to. A Router is both a Receiver and a Distributor.

     

    Two very intuitive Router implementations are the PointToPointRouter, which synchronously routes a Routable by forwarding it to one and only one Receiver, and PointToMultipointRouter, which synchronously routes a Routable by duplicating it and forwarding it to all its Receivers. In the second case, the router may choose to forward the Routable by value (physically copying it) or by reference, by just passing the same reference to its Receivers. PointToPointRouters are used to assemble Queues, and PointToMultipointRouters are used to assemble Topics, as we will see below.
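
    The two routing policies can be sketched as follows. The classes reuse the names from the text, but their bodies are assumptions made for illustration: the point-to-point sketch picks the first Receiver that acknowledges, the point-to-multipoint sketch forwards by reference, and both refuse a message when they have no Receivers. The Distributor side of a Router is reduced to add().

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Trimmed copies of the interfaces above.
interface Routable extends Serializable {
    Serializable getMessageID();
}

interface Receiver {
    boolean handle(Routable routable);
}

// Forwards to one and only one Receiver.
class PointToPointRouter implements Receiver {
    private final List<Receiver> receivers = new ArrayList<>();

    boolean add(Receiver receiver) { return receivers.add(receiver); }

    public boolean handle(Routable routable) {
        for (Receiver receiver : receivers) {
            if (receiver.handle(routable)) {
                return true;           // exactly one Receiver took the message
            }
        }
        return false;                  // nobody accepted it; a Router never stores
    }
}

// Forwards to all Receivers, passing the same reference to each of them.
class PointToMultipointRouter implements Receiver {
    private final List<Receiver> receivers = new ArrayList<>();

    boolean add(Receiver receiver) { return receivers.add(receiver); }

    public boolean handle(Routable routable) {
        if (receivers.isEmpty()) {
            return false;
        }
        boolean allAcknowledged = true;
        for (Receiver receiver : receivers) {
            // &= does not short-circuit, so every Receiver is invoked.
            allAcknowledged &= receiver.handle(routable);
        }
        return allAcknowledged;
    }
}

class RouterDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        PointToPointRouter queueRouter = new PointToPointRouter();
        queueRouter.add(m -> log.add("A"));
        queueRouter.add(m -> log.add("B"));
        queueRouter.handle(() -> "m1");
        System.out.println(log);       // only one of the two Receivers was invoked
    }
}
```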

     

    Same-Address-Space Messaging Primitives

     

    LocalPipe

     

    A LocalPipe is a Channel with only one output. Only one receiver can be connected to a pipe at a time. It always attempts synchronous delivery first, but if that is not possible, the pipe will hold the message. The asynchronous behaviour can be turned off by setting the "synchronous" flag to true. If the pipe is "synchronous" and the synchronous delivery fails, the overall delivery fails and the pipe will not hold the message.

     

     

    Local Destinations (LocalQueues/LocalTopics)

     

    A Point-to-Point Channel is a channel that ensures that only one receiver (when there is more than one) consumes the message. The channel can have multiple receivers that consume multiple messages concurrently, but only one of them can successfully consume a particular message. The receivers do not have to explicitly coordinate with each other; they become competing consumers and they are coordinated by the channel. If no receiver is connected to the channel at the moment the message is sent, the channel stores the message until at least one receiver connects, and then delivers the message.

     

    A Publish-Subscribe Channel is a channel that broadcasts a message to all interested receivers. The channel makes sure that each connected receiver is notified once and only once of a particular event. By default, a Publish-Subscribe Channel does not deliver messages to subscribers that are not connected at the moment the channel receives the message for delivery. If a subscriber is interested in receiving the messages that it would otherwise have missed, it can register as a Durable Subscriber. In this case the channel "keeps" the messages until the subscriber connects and then delivers the messages to it. It is very natural to implement this behavior by combining a simple Publish-Subscribe channel with output Pipes.

     

    The in-memory implementation of a Point-to-Point channel is the LocalQueue. The in-memory implementation of a Publish-Subscribe channel is the LocalTopic. Both classes have a common internal structure, which allows us to define a common AbstractDestination base class.

     

     

    An AbstractDestination uses an input LocalPipe connected to a Router:

    • A Queue uses an asynchronous input Pipe connected to a PointToPointRouter.

    • A Topic uses a synchronous input Pipe connected to a PointToMultipointRouter.

     

     

     

    A Topic with durable subscriptions can be implemented very simply by extending a regular Topic and inserting Pipes between the PointToMultipointRouter and the Topic's Receivers.

     

    Distributed Messaging Primitives

     

    Pipe

     

    A Pipe is a channel with only one output that spans two address spaces. It allows sending messages synchronously or asynchronously, in one direction, between two VMs. It is implemented using a pair of distributed pipe endpoints: the Pipe instance itself, which acts as the pipe input, and a PipeOutput.

     

    Multiple pipes can share the same PipeOutput instance (and implicitly the pipeID), as long as the Pipe instances are different.

     

     

    Replicator

     

    A Replicator is a distributed channel that replicates a message, synchronously or asynchronously, to multiple receivers living in different address spaces. A replicator can have multiple inputs and multiple outputs. Messages sent by an input are replicated to all current outputs. The Replicator's main reason to exist is to allow a sender to synchronously send a message into different address spaces and be sure that the message was received (and acknowledged) when the handle() method returns.

     

     

    In the current implementation, the replication of messages is done efficiently by multicasting, but message acknowledgment is handled by the replicator (so far) in a point-to-point and synchronous manner. For that reason, each replicator peer must be able to reach any other peer synchronously and efficiently. In this respect the replicator peers are tightly coupled. This is an implementation detail and it is subject to change. If you want a looser coupling, use a Destination.

     

    When it is configured to be synchronous, the Replicator works pretty much like a distributed PointToMultipointRouter. Put another way, a PointToMultipointRouter is the local version of a Replicator.

     

     

    Distributed Destinations

     

    Distributed Queue

     

    A Queue is a LocalQueue extension and, like its ancestor, it lives in the address space of only one VM. However, one or more Queue instances (peers) interact and coordinate to create a distributed queue that spans multiple VMs, where senders and receivers can be located on any of the participating VMs.

     

    The Queue is similar in structure to a regular LocalQueue. A Queue has local receivers, the same way a LocalQueue does. However, for each remote peer, the Queue's router maintains a reference to the input endpoint of a distributed Pipe, which sends the received message to its corresponding PipeOutput, which in turn sends the message into the router of the Queue's remote peer. This way, a message that is forwarded to the distributed Pipe (which, from the Router's point of view, is no different from a local delivery) goes to the remote Queue peer and gets delivered remotely to one and only one Receiver.

     

    Care must be taken to avoid infinite loops, since each Queue peer maintains references to each of its peers, so a message possibly can go from QP1 to QP2 and back to QP1.
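
    The text does not say how loops are avoided. One possible approach, shown here purely as an assumption, is to mark a message with a header when it first crosses to a remote peer and never forward a marked message again. QueuePeer, the "FORWARDED_BY" header and the use of a plain Map for headers are all hypothetical simplifications, and the distributed Pipes are replaced by direct method calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical queue peer; remote Pipes are modelled as direct references.
class QueuePeer {
    static final String FORWARDED_BY = "FORWARDED_BY";   // made-up header name

    private final String peerID;
    private final boolean hasLocalReceiver;
    private final List<QueuePeer> remotePeers = new ArrayList<>();
    private final List<String> consumed = new ArrayList<>();

    QueuePeer(String peerID, boolean hasLocalReceiver) {
        this.peerID = peerID;
        this.hasLocalReceiver = hasLocalReceiver;
    }

    void connect(QueuePeer remote) { remotePeers.add(remote); }

    List<String> consumed() { return consumed; }

    boolean handle(String messageID, Map<String, String> headers) {
        if (hasLocalReceiver) {
            consumed.add(messageID);          // delivered to a local Receiver
            return true;
        }
        if (headers.containsKey(FORWARDED_BY)) {
            return false;                     // already crossed once: do not bounce it
        }
        headers.put(FORWARDED_BY, peerID);    // mark before handing it to remote peers
        for (QueuePeer remote : remotePeers) {
            if (remote.handle(messageID, headers)) {
                return true;
            }
        }
        headers.remove(FORWARDED_BY);         // nobody took it; restore the headers
        return false;
    }
}

class QueuePeerDemo {
    public static void main(String[] args) {
        QueuePeer qp1 = new QueuePeer("QP1", false);
        QueuePeer qp2 = new QueuePeer("QP2", false);
        qp1.connect(qp2);
        qp2.connect(qp1);                     // QP1 and QP2 reference each other
        // Terminates with a negative acknowledgment instead of looping.
        System.out.println(qp1.handle("m1", new HashMap<>()));
    }
}
```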

     

    A Queue has two states: started and stopped. Queue.start() transitions the Queue peer from the "stopped" to the "started" state. The underlying JChannel must be connected for the transition to succeed.

     

     

    Replicated Queue

     

    The Queue presented above is distributed in that it has different receivers on different VMs. However, it does not offer in-memory replication of the messages, so it does not protect non-reliable messages against VM crashes. This Quality of Service is achieved by using a Replicated Queue.

     

    Use Case. A queue should be able to handle the following situation: the queue with no receivers receives non-reliable messages for delivery. The queue holds the messages in its in-memory internal store. The queue crashes. The messages must not be lost (even if they are declared non-reliable), but they must not be physically persisted either.

     

    The solution is a Replicated Queue, which is implemented using Queue peers and Replicators instead of Pipes. More details to follow.

     

    Distributed Topic

     

    A distributed topic is implemented similarly to a distributed queue, using Pipes as "local representatives" for the remote topic peers.

     

    Replicated Topic

     

    TODO: Work in progress

     

    The UML Class Diagram

     


     

     
