Version 4

    RAC: Reliable Asynchronous Clustering


    Author: Bela Ban, Oct 2012

    (new document:





    Recently, code has been added to Infinispan to provide replication of data between

    sites (a site being a local cluster). (I use the term 'replication' although technically

    speaking I'm referring to 'distribution mode' in Infinispan).


    Changes can be replicated asynchronously and synchronously, when a transaction

    (TX) is committed or whenever the change happenes. Asynchronous replication is

    faster than sync replication, as callers don't have to wait for the acks of the



    In this document, we're only considering transactional updates, although similar

    arguments apply for non-transactional replication.


    Async replication simply sends the updates collected in the TX to the remote



    Sync replication runs a two-phase commit (2PC): when the TX is committed, a

    PREPARE is sent to the participating node, and - when all acks have been received

    - a COMMIT is sent, or a ROLLBACK when one or more nodes failed to apply the

    PREPARE, or when some nodes crashed or left (or were not reachable) in the



    Sync replication across sites involves sending the PREPARE message also to the

    site master of the remote site, which in turn applies the PREPARE to its local

    cluster (forwarding the PREPARE to all nodes which have keys involved in the TX

    and waiting for their acks). When the site master has collected all PREPARE

    responses, it sends its ack back to the originator of the TX (in the originating




    Problems with sync replication


    There are 2 problems with sync replication:


    1: Latency


    Usually, sites are connected via a high-throughput / high-latency link, e.g

    around 50-70 ms. Neglecting the cost of a local 2PC, a 2PC across 2 sites would

    cost (assuming 50 ms of latency) ca. 150 - 200 ms: a PREPARE call to the remote

    site plus the response (100 ms), then the same for the COMMIT or ROLLBACK (100

    ms) == 200 ms. If the second phase is async, then we'd only incur 150 ms.



    2: More nodes == higher probability of failures

    The more nodes we have participating, the higher the likelihood is that something

    goes wrong. Remember, a single node (either local or in the remote site) failing

    to apply the PREPARE means that the TX will be rolled back.


    For example, if we have N participants in a TX, and the probability of 1

    participant failing to apply a PREPARE is 2%, then the overall probability of the

    TX failing is (N * 2)%. If we have 5 participants, the failure probability is 10%,

    for 10 participants it is 20% and so on.


    Compounding this is that the communication between the local and remote site

    includes multiple components, e.g. forwarding to a site master, sending via the

    bride, and dispatching to the right target at the remote site. So N should

    probably not just include the participants, but also these components.





    It would be ideal if we could have the hard reliability of sync replication,

    but without the downsides listed above. More specifically, RAC should have

    the following properties:


    - High reliability: no updates are ever lost unless the entire primary site crashes.

      In this case, updates applied in the [lag] time period would be lost.

      Note that communication lost between primary and backup site(s) will not lead

      to data loss, but may trigger a resync of the backup site when connectivity is restored

    - Correct ordering of updates: all updates should be applied at the backup site

      in the order in which they were sent

    - Speed: it should take no more than a few milliseconds to commit a *sync* transaction

    - Configurable lag: the backup site(s) should be no more than N milliseconds

      (configurable) behind the primary site


    In other words, we want the speed of async replication combined with the

    reliability of sync replication.





    This is a high-level description only; see the implementation section below for



    RAC targets sync replication but avoids sending the PREPARE message to the remote

    site(s) as part of the 2PC. This avoids problem #1 and reduces the number of

    participants, which mitigates problem #2.


    Instead, RAC takes part in the 2PC but only stores the modified keys in 'stable

    storage'. An update process will then periodically grab the keys, fetch the

    associated values and send the updates to the remote site(s). This is all done in

    the background, so that sync TXs complete quickly.


    As stable storage, we could have picked the disk, but the problem is that if a

    machine crashes, that disk will not be accessible anymore. A shared file system

    wouldn't help either, as this means we would incur the cost of RPCs (e.g. with

    NFS) to sync to disk.


    So we chose memory as stable storage. A number of nodes of a site (say 3 out of

    10) act as 'updaters'. An updater is chosen based on its rank in the JGroups

    view. E.g. if we have {A,B,C,D,E,F,G}, then the first 3 could act as updaters: A,

    B and C. When a new view is installed, every member knows whether it is an

    updater or not, based on the view and its rank. Because everyone gets the same

    sequence of views, the decision whether or not a node is an updater is purely

    local and doesn't require an election process (involving messages being sent



    When a TX commits, as part of the PREPARE phase, the node which is committing the

    TX broadcasts an UPDATE message, including only the updated keys (not the

    values). Every key needs to be listed only once, ie. if a TX modified K1 20

    times, then K2 and K3, the list will be [K1,K2,K3]. Note that we don't need to

    ship the type of operation (PUT, REMOVE, REPLACE) with the UPDATE message, as

    the updaters will fetch the associated value anyway later.


    Everybody receives the UPDATE message, but only the updaters (A,B,C) store the

    keys. The updaters send an ack back to the sender and this completes the TX.


    An updater 'owns' keys, ie. it is responsible for replicating a certain range of

    keys to the backup site(s). The decision which updater owns which keys is made by

    using a consistent hash, mapping keys to updaters. E.g. if we have keys [1..10]

    updater A could process [1..3], B [4..7] and C [8..10].


    As an alternative, we could also have a set of nodes (N) which store modified

    keys in memory and a set up updaters (K, where K <= N) which ship the

    modifications to the backup sites. If K == 1, then we'd be assured that all

    modifications arrive at the backup sites in total order. If K > 1, then we would

    still have FIFO order (per key).


    Coming back to 'stable storage', as long as no more than N-1 updaters crash at

    exactly the same time, the keys to be processed (replicated) are not lost. This

    provides somewhat lesser guarantees than a disk, but should be good enough in

    most cases, as N is configurable and if only a few updaters crash, new ones are

    going to be picked immediately and the keys will be rebalanced to make sure

    they're not lost.


    OK, so now that we've stored the keys to be replicated in 'stable storage', we

    need to periodically send them to the backup site(s). To this end, every updater

    has a task which runs when the queue exceeds a certain number of elements, or a

    timer expires.


    The updater then sends a REPLICATE message to the site master(s), which in turn

    apply the updates in their local site(s). This is done using the site's

    configuration, ie. asynchronously, synchronously, with or without a TX. When

    done, the site master sends back an ack to the sender, and this signals to the

    sender the successful completion of the update task.


    When a site goes down, or updates fails for an extended period of time, a site

    can be marked offline. This means that state transfer would have to kick in when

    that site comes up again.



    Advantages of RAC


    - Because we have N updaters, this provides a certain probability against

      node failures (a.k.a. 'stable storage')

    - N updaters shoulder all the updates, so the work is distributed

    - The queues in the updaters only store a given key once. So if we have 1000

      updates to a given key in a TX, the key will be updated only once in the

      backup sites and - more importantly - it will only be included once in the

      REPLICATE message which saves bandwidth and CPU cycles.

    - Cloud-friendly: because of the above point, we can better utilize either low

      bandwidth to the cloud, or costs associated with the bandwidth used to ship

      updates to a cloud.

    - Having to store only keys and not values in the updaters' queues means we

      don't need a lot of memory

    - Updaters can be stateless (beyond storing the keys), as they fetch the

      current value when triggering an update to a remote site

    - See above: an updater always fetches the current value

    - Every updater is responsible for the same set of keys, so different updates to

      the same key end up in the same updater queue, so updates are sent in an

      ordered way. Caveat: view changes do affect this, but a good consistent hash to

      pick updaters should minimize the risk of unordered updates.



    State transfer


    This is needed when a site goes online and it needs to grab the state (sync) from

    an existing site.


    Because RAC is rather simplistic, state transfer is also simple: all we need to

    do is to send an UPDATE for all the keys (in-memory and on disk, e.g. in cache

    loaders) in the primary site, and the mechanism already in place for replication

    will make sure that the updates are sent to the newly started site.



    Conflict resolution (concurrent updates to the same data in different sites)


    If we have updates to the same keys in different sites, we can use the following mechanism to prevent inconsistencies:

    • Every updater acquires (local) write locks (using the lock API) on they keys that are about to be replicated to the remote site
    • When done, the keys are released


    • Updater A in LON and updater B in SFO, both want to replicate key K. A tries to set K=V1, B tries to set K=V2
    • Updater A write-locks K in LON
    • Updater B write-locks K in SFO and sends the REPLICATE(K,V2) message to LON
    • Updater A sends the REPLICATE(K,V1) message to SFO
    • B times out trying to acquire the lock for K in LON, releases the write-lock for K in SFO and sleeps (randomly) before retrying
    • A acquires the lock in SFO and sets K=V1 in SFO, then releases the remote lock in SFO and the local lock in LON
    • B wakes up and retries. It fetches the value for K and locks it locally in SFO. The value is now V1 (set by A)
    • B send the REPLICATE(K,V2) to LON, acquires the lock in LON and sets K=V2 both in LON and SFO
    • B then releases the remote and local lock


    K is now V2 in LON and SFO. This is essentially a last-updater wins algorithm, but it guarantees that K will be consistent

    across all sites.


    This algorithm should be off by default as one of the assumptions of xsite replication is that updaters in different sites

    (in the active-active case) always update disjoint data sets. If this is not the case, the mechanism could be enabled, e.g. via configuration.






    - Do we broadcast the UPDATE on PREPARE or COMMIT ? On PREPARE would mean we'd

      have to send a second 'COMMIT' or 'ROLLBACK' message. On COMMIT means we don't

      block the TX at all, but the update might get lost when the node crashes before

      the COMMIT


    - When an updater crashes, do we perform some sort of state transfer to the new

      updaters ? Use case: we have 3 updaters. Now 2 of them crash at the same

      time. 2 new ones are picked, but they have no keys. If the only updater with

      keys crashes, too, its updates will be lost.

      So a rebalancing of the keys would be in order...


    - State transfer: only update keys to the newly started site, not to an existing

      site !

    - Could RAC also be used for regular replication (within a local site only) ?


    - Possibly use multiple site masters to mask transient failures of one site master. Requests could be load balanced to any site master,

      or we could always pick the first site master and failover if that one is down.

      - Problem: we need to change routing as there can only be one entry for each site












    - my-keys

    - all-keys

    - pending-keys


    All 3 queues are FIFO ordered and duplicate-free, ie. a FIFO-ordered set


    On reception of UPDATE(keys):

    - Check if current member is an updater. If not --> discard message

    - If no update is in progress, add keys to all-keys, and add the keys I'm

      responsible for to my-keys

    - Else: add keys to pending-keys



    Periodically (or when queue exceeds a certain size):

    - Block insertions into all-keys

    - Grab keys from my-keys and replicate them to the remote sites

    - If replication was successful: clear my-keys

    - Process pending-keys: add all to all-keys, add keys I should process to

      my-keys, clear pending-keys

    - Unblock insertions into all-keys



    On view change V:

    - Determine which keys in all-keys needs to be processed by me and add them to





    Replication to a remote site (keys):

    - Create a REPLICATE message containing a hashmap with keys and values

    - For each key K:

      - Fetch the value for K from the local site (if the value is null, mark the

        value as removed (tombstone?) and add it to the hashmap

    - Send the REPLICATE message to the site master(s) of the remote site(s), wait

      for the ack

    - If successful: broadcast a REMOVE(keys) message



    On REMOVE(keys):

    - (only when updater): remove keys from all-keys (*not* from pending-keys as the

      K might have been updated again meanwhile !)