RAC: Reliable Asynchronous Clustering
Author: Bela Ban, Oct 2012
(new document: https://github.com/infinispan/infinispan/wiki/RAC:-Reliable-Asynchronous-Clustering)
Overview
Recently, code has been added to Infinispan to provide replication of data between
sites (a site being a local cluster). (I use the term 'replication' although technically
speaking I'm referring to 'distribution mode' in Infinispan).
Changes can be replicated asynchronously or synchronously, either when a
transaction (TX) is committed or whenever a change happens. Asynchronous
replication is faster than sync replication, as callers don't have to wait for
the acks of the receivers.
In this document, we're only considering transactional updates, although similar
arguments apply for non-transactional replication.
Async replication simply sends the updates collected in the TX to the remote
site.
Sync replication runs a two-phase commit (2PC): when the TX is committed, a
PREPARE is sent to the participating nodes, and - when all acks have been
received - a COMMIT is sent. A ROLLBACK is sent instead when one or more nodes
fail to apply the PREPARE, or when some nodes crash, leave or become
unreachable in the process.
Sync replication across sites involves sending the PREPARE message also to the
site master of the remote site, which in turn applies the PREPARE to its local
cluster (forwarding the PREPARE to all nodes which have keys involved in the TX
and waiting for their acks). When the site master has collected all PREPARE
responses, it sends its ack back to the originator of the TX (in the originating
site).
Problems with sync replication
There are 2 problems with sync replication:
1: Latency
Usually, sites are connected via a high-throughput / high-latency link, with a
latency of e.g. 50-70 ms. Neglecting the cost of a local 2PC, a 2PC across 2 sites would
cost (assuming 50 ms of latency) ca. 150 - 200 ms: a PREPARE call to the remote
site plus the response (100 ms), then the same for the COMMIT or ROLLBACK (100
ms) == 200 ms. If the second phase is async, then we'd only incur 150 ms.
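As a back-of-the-envelope formula, with l denoting the one-way inter-site
latency (50 ms in the example above):

    % sync 2PC across sites: two round trips (PREPARE + COMMIT/ROLLBACK)
    T_{\text{sync}} = 2 \cdot 2l = 4l \approx 200\,\text{ms}
    % async second phase: one round trip plus a one-way COMMIT
    T_{\text{async-2nd}} = 2l + l = 3l \approx 150\,\text{ms}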
2: More nodes == higher probability of failures
The more nodes we have participating, the higher the likelihood is that something
goes wrong. Remember, a single node (either local or in the remote site) failing
to apply the PREPARE means that the TX will be rolled back.
For example, if we have N participants in a TX, and the probability of 1
participant failing to apply a PREPARE is 2%, then the overall probability of
the TX failing is roughly (N * 2)%. If we have 5 participants, the failure
probability is roughly 10%, for 10 participants roughly 20% and so on.
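More precisely, with p the per-participant failure probability and N the number
of participants:

    P_{\text{fail}} = 1 - (1 - p)^N \approx N \cdot p \quad \text{for small } N \cdot p
    % p = 0.02: N = 5 gives 1 - 0.98^5 ~ 9.6%, N = 10 gives 1 - 0.98^{10} ~ 18.3%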
Compounding this is that the communication between the local and remote site
includes multiple components, e.g. forwarding to a site master, sending via the
bridge, and dispatching to the right target at the remote site. So N should
probably not just include the participants, but also these components.
Goals
It would be ideal if we could have the hard reliability of sync replication,
but without the downsides listed above. More specifically, RAC should have
the following properties:
- High reliability: no updates are ever lost unless the entire primary site crashes.
In this case, updates applied in the [lag] time period would be lost.
Note that a loss of communication between the primary and backup site(s) will not
lead to data loss, but may trigger a resync of the backup site when connectivity
is restored
- Correct ordering of updates: all updates should be applied at the backup site
in the order in which they were sent
- Speed: it should take no more than a few milliseconds to commit a *sync* transaction
- Configurable lag: the backup site(s) should be no more than N milliseconds
(configurable) behind the primary site
In other words, we want the speed of async replication combined with the
reliability of sync replication.
Approach
This is a high-level description only; see the implementation section below for
details.
RAC targets sync replication but avoids sending the PREPARE message to the remote
site(s) as part of the 2PC. This avoids problem #1 and reduces the number of
participants, which mitigates problem #2.
Instead, RAC takes part in the 2PC but only stores the modified keys in 'stable
storage'. An update process will then periodically grab the keys, fetch the
associated values and send the updates to the remote site(s). This is all done in
the background, so that sync TXs complete quickly.
As stable storage, we could have picked the disk, but the problem is that if a
machine crashes, that disk will not be accessible anymore. A shared file system
wouldn't help either, as we would then incur the cost of RPCs (e.g. with NFS)
to sync to disk.
So we chose memory as stable storage. A number of nodes of a site (say 3 out of
10) act as 'updaters'. An updater is chosen based on its rank in the JGroups
view. E.g. if we have {A,B,C,D,E,F,G}, then the first 3 could act as updaters: A,
B and C. When a new view is installed, every member knows whether it is an
updater or not, based on the view and its rank. Because everyone gets the same
sequence of views, the decision whether or not a node is an updater is purely
local and doesn't require an election process (involving messages being sent
around).
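As a sketch (in Java, against the JGroups View API; the class name and the
number of updaters are made up for illustration), the purely local decision
could look like this:

    import java.util.List;

    import org.jgroups.Address;
    import org.jgroups.View;

    // Hypothetical helper: the first NUM_UPDATERS members of the view act as
    // updaters. Every member computes the same result from the same view, so
    // no election messages are needed.
    public class UpdaterSelection {
        static final int NUM_UPDATERS = 3; // assumed to be configurable

        static boolean isUpdater(View view, Address local) {
            List<Address> members = view.getMembers();
            int rank = members.indexOf(local);
            return rank >= 0 && rank < Math.min(NUM_UPDATERS, members.size());
        }
    }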
When a TX commits, as part of the PREPARE phase, the node which is committing the
TX broadcasts an UPDATE message, including only the updated keys (not the
values). Every key needs to be listed only once, i.e. if a TX modified K1 20
times, then K2 and K3, the list will be [K1,K2,K3]. Note that we don't need to
ship the type of operation (PUT, REMOVE, REPLACE) with the UPDATE message, as
the updaters will fetch the associated value anyway later.
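A small sketch of how the key list for the UPDATE message might be built;
Modification is a hypothetical stand-in for Infinispan's internal
write-command representation:

    import java.util.LinkedHashSet;
    import java.util.Set;

    // Collect the keys touched by a TX into a duplicate-free, insertion-ordered
    // set: K1 modified 20 times still appears only once, and the operation type
    // (PUT, REMOVE, REPLACE) is not shipped at all.
    public class UpdateKeys {
        interface Modification { Object getKey(); }

        static Set<Object> keysFor(Iterable<? extends Modification> mods) {
            Set<Object> keys = new LinkedHashSet<>();
            for (Modification mod : mods)
                keys.add(mod.getKey());
            return keys;
        }
    }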
Everybody receives the UPDATE message, but only the updaters (A,B,C) store the
keys. The updaters send an ack back to the sender and this completes the TX.
An updater 'owns' keys, i.e. it is responsible for replicating a certain range
of keys to the backup site(s). The decision as to which updater owns which keys
is made using a consistent hash, mapping keys to updaters. E.g. if we have keys [1..10]
updater A could process [1..3], B [4..7] and C [8..10].
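A deliberately simplified sketch of the ownership function (a real
implementation would use a proper consistent hash to minimize the number of
keys that change owner on a view change):

    import java.util.List;

    import org.jgroups.Address;

    // Hypothetical: maps a key to the updater that owns it. Plain modulo is
    // enough to show the idea; it is NOT a consistent hash.
    public class KeyOwnership {
        static Address ownerOf(Object key, List<Address> updaters) {
            int index = (key.hashCode() & 0x7fffffff) % updaters.size();
            return updaters.get(index);
        }
    }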
As an alternative, we could also have a set of nodes (N) which store modified
keys in memory and a set of updaters (K, where K <= N) which ship the
modifications to the backup sites. If K == 1, then we'd be assured that all
modifications arrive at the backup sites in total order. If K > 1, then we would
still have FIFO order (per key).
Coming back to 'stable storage': as long as no more than N-1 updaters crash at
exactly the same time, the keys to be processed (replicated) are not lost. This
provides somewhat weaker guarantees than a disk, but should be good enough in
most cases: N is configurable, and if only a few updaters crash, new ones are
picked immediately and the keys are rebalanced to make sure they're not lost.
OK, so now that we've stored the keys to be replicated in 'stable storage', we
need to periodically send them to the backup site(s). To this end, every updater
has a task which runs when the queue exceeds a certain number of elements, or a
timer expires.
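A sketch of the two triggers, assuming hypothetical configuration values and a
flush() callback that sends the REPLICATE message described next:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical: the update task runs when the queue exceeds MAX_QUEUE_SIZE
    // elements, or at the latest every FLUSH_INTERVAL_MS (the configurable lag).
    public class FlushTrigger {
        static final int  MAX_QUEUE_SIZE    = 1000;
        static final long FLUSH_INTERVAL_MS = 500;

        final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        final Runnable flush;

        public FlushTrigger(Runnable flush) {
            this.flush = flush;
            timer.scheduleWithFixedDelay(flush, FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS,
                                         TimeUnit.MILLISECONDS);
        }

        // Called whenever keys have been added to my-keys
        public void onKeysAdded(int queueSize) {
            if (queueSize >= MAX_QUEUE_SIZE)
                flush.run();
        }
    }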
The updater then sends a REPLICATE message to the site master(s), which in turn
apply the updates in their local site(s). This is done using the site's
configuration, i.e. asynchronously, synchronously, with or without a TX. When
done, the site master sends back an ack to the sender, and this signals to the
sender the successful completion of the update task.
When a site goes down, or updates fail for an extended period of time, the site
can be marked offline. This means that state transfer would have to kick in when
that site comes up again.
Advantages of RAC
- Because we have N updaters, we get a degree of protection against
node failures (a.k.a. 'stable storage')
- N updaters shoulder all the updates, so the work is distributed
- The queues in the updaters only store a given key once. So if we have 1000
updates to a given key in a TX, the key will be updated only once in the
backup sites and - more importantly - it will only be included once in the
REPLICATE message which saves bandwidth and CPU cycles.
- Cloud-friendly: because of the above point, we make better use of low
bandwidth to the cloud and reduce the costs associated with the bandwidth used
to ship updates to a cloud.
- Having to store only keys and not values in the updaters' queues means we
don't need a lot of memory
- Updaters can be stateless (beyond storing the keys), as they always fetch the
current value when triggering an update to a remote site
- A given key is always handled by the same updater, so different updates to
the same key end up in the same updater queue and are sent in an ordered way.
Caveat: view changes do affect this, but a good consistent hash to pick
updaters should minimize the risk of unordered updates.
State transfer
This is needed when a site comes online and needs to grab the state (sync) from
an existing site.
Because RAC is rather simple, state transfer is simple as well: all we need to
do is send an UPDATE for all the keys (in-memory and on disk, e.g. in cache
loaders) in the primary site, and the mechanism already in place for replication
will make sure that the updates are sent to the newly started site.
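A sketch, assuming a hypothetical broadcastUpdate() callback that sends the
normal UPDATE message:

    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Consumer;

    // Hypothetical: state transfer treats every existing key as freshly modified
    // and lets the normal replication machinery do the rest. The cache parameter
    // is assumed to expose in-memory keys as well as keys in cache loaders.
    public class StateTransfer {
        static void transferState(Map<Object, Object> cache,
                                  Consumer<Set<Object>> broadcastUpdate) {
            Set<Object> keys = new LinkedHashSet<>(cache.keySet());
            broadcastUpdate.accept(keys); // same UPDATE path as for committed TXs
        }
    }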
Conflict resolution (concurrent updates to the same data in different sites)
If we have updates to the same keys in different sites, we can use the following mechanism to prevent inconsistencies:
- Every updater acquires (local) write locks (using the lock API) on the keys that are about to be replicated to the remote site
- When done, the locks are released
Example
- Updater A in LON and updater B in SFO, both want to replicate key K. A tries to set K=V1, B tries to set K=V2
- Updater A write-locks K in LON
- Updater B write-locks K in SFO and sends the REPLICATE(K,V2) message to LON
- Updater A sends the REPLICATE(K,V1) message to SFO
- B times out trying to acquire the lock for K in LON, releases the write-lock for K in SFO and sleeps for a random period before retrying
- A acquires the lock in SFO and sets K=V1 in SFO, then releases the remote lock in SFO and the local lock in LON
- B wakes up and retries. It fetches the value for K and locks it locally in SFO. The value is now V1 (set by A)
- B sends the REPLICATE(K,V2) to LON, acquires the lock in LON and sets K=V2 both in LON and SFO
- B then releases the remote and local lock
K is now V2 in LON and SFO. This is essentially a last-updater-wins algorithm, but it guarantees that K will be consistent
across all sites (a code sketch follows below).
This algorithm should be off by default as one of the assumptions of xsite replication is that updaters in different sites
(in the active-active case) always update disjoint data sets. If this is not the case, the mechanism could be enabled, e.g. via configuration.
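A sketch of this last-updater-wins loop in Java; the abstract methods are
stand-ins for the real lock API and the cross-site transport:

    import java.util.concurrent.ThreadLocalRandom;

    // Hypothetical retry loop for cross-site conflict resolution. replicateToRemote()
    // is assumed to acquire the write lock for the key at the remote site, apply the
    // update and return false if the remote lock could not be acquired in time.
    public abstract class ConflictResolvingUpdater {
        abstract void lockLocally(Object key);     // write-lock the key in the local site
        abstract void unlock(Object key);          // release the local write lock
        abstract Object fetchCurrentValue(Object key);
        abstract boolean replicateToRemote(Object key, Object value);

        void replicate(Object key) throws InterruptedException {
            for (;;) {
                lockLocally(key);
                try {
                    Object value = fetchCurrentValue(key); // always ship the latest value
                    if (replicateToRemote(key, value))
                        return;
                } finally {
                    unlock(key);
                }
                // Remote lock timed out: sleep for a random period, then retry
                Thread.sleep(ThreadLocalRandom.current().nextLong(50, 500));
            }
        }
    }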
TBD
- Do we broadcast the UPDATE on PREPARE or COMMIT ? On PREPARE would mean we'd
have to send a second 'COMMIT' or 'ROLLBACK' message. On COMMIT means we don't
block the TX at all, but the update might get lost if the node crashes after
committing but before broadcasting the UPDATE
- When an updater crashes, do we perform some sort of state transfer to the new
updaters ? Use case: we have 3 updaters. Now 2 of them crash at the same
time. 2 new ones are picked, but they have no keys. If the only updater with
keys crashes, too, its updates will be lost.
So a rebalancing of the keys would be in order...
- State transfer: only update keys to the newly started site, not to an existing
site !
- Could RAC also be used for regular replication (within a local site only) ?
- Possibly use multiple site masters to mask transient failures of one site master. Requests could be load balanced to any site master,
or we could always pick the first site master and failover if that one is down.
- Problem: we need to change routing as there can only be one entry for each site
Implementation
--------------
- my-keys: the keys this updater owns and has to replicate
- all-keys: all modified keys received via UPDATE messages
- pending-keys: keys received while an update run is in progress
All 3 queues are FIFO-ordered and duplicate-free, i.e. FIFO-ordered sets (a
consolidated code sketch of the handlers below follows at the end of this section)
On reception of UPDATE(keys):
- Check if current member is an updater. If not --> discard message
- If no update is in progress, add keys to all-keys, and add the keys I'm
responsible for to my-keys
- Else: add keys to pending-keys
Periodically (or when queue exceeds a certain size):
- Block insertions into all-keys
- Grab keys from my-keys and replicate them to the remote sites
- If replication was successful: clear my-keys
- Process pending-keys: add all to all-keys, add keys I should process to
my-keys, clear pending-keys
- Unblock insertions into all-keys
On view change V:
- Determine which keys in all-keys need to be processed by me and add them to
my-keys
Replication to a remote site (keys):
- Create a REPLICATE message containing a hashmap with keys and values
- For each key K:
- Fetch the value for K from the local site; if the value is null, mark the
value as removed (tombstone?) and add it to the hashmap
- Send the REPLICATE message to the site master(s) of the remote site(s), wait
for the ack
- If successful: broadcast a REMOVE(keys) message
On REMOVE(keys):
- (only when updater): remove keys from all-keys (*not* from pending-keys, as
the keys might have been updated again in the meantime !)
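A compact Java sketch of these handlers, assuming hypothetical
isUpdater()/ownedByMe()/transport methods; LinkedHashSet provides the
FIFO-ordered, duplicate-free queues, and a single monitor stands in for
"block insertions into all-keys":

    import java.util.Collection;
    import java.util.LinkedHashSet;
    import java.util.Set;

    public abstract class RacUpdater {
        private final Set<Object> myKeys      = new LinkedHashSet<>();
        private final Set<Object> allKeys     = new LinkedHashSet<>();
        private final Set<Object> pendingKeys = new LinkedHashSet<>();
        private boolean updateInProgress;

        abstract boolean isUpdater();                               // rank-based, from the current view
        abstract boolean ownedByMe(Object key);                     // hash over the current updaters
        abstract boolean replicateToRemoteSites(Set<Object> keys);  // fetch values, send REPLICATE, await ack
        abstract void broadcastRemove(Set<Object> keys);            // notify all updaters of shipped keys

        // On reception of UPDATE(keys)
        synchronized void onUpdate(Collection<Object> keys) {
            if (!isUpdater())
                return;                              // non-updaters discard the message
            if (updateInProgress) {
                pendingKeys.addAll(keys);            // parked until the current run is done
                return;
            }
            allKeys.addAll(keys);
            for (Object key : keys)
                if (ownedByMe(key))
                    myKeys.add(key);
        }

        // Periodically, or when my-keys exceeds a certain size
        void flush() {
            Set<Object> batch;
            synchronized (this) {                    // block insertions into all-keys
                updateInProgress = true;
                batch = new LinkedHashSet<>(myKeys);
            }
            boolean success = replicateToRemoteSites(batch);
            synchronized (this) {
                if (success) {
                    myKeys.clear();
                    broadcastRemove(batch);
                }
                // Process pending-keys, then unblock insertions again
                allKeys.addAll(pendingKeys);
                for (Object key : pendingKeys)
                    if (ownedByMe(key))
                        myKeys.add(key);
                pendingKeys.clear();
                updateInProgress = false;
            }
        }

        // On view change: take over the keys in all-keys that I'm now responsible for
        synchronized void onViewChange() {
            for (Object key : allKeys)
                if (ownedByMe(key))
                    myKeys.add(key);
        }

        // On REMOVE(keys): clean all-keys only; pending-keys may hold newer updates
        synchronized void onRemove(Collection<Object> keys) {
            if (isUpdater())
                allKeys.removeAll(keys);
        }
    }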