12.3 Ordering of Events and Vector Clocks

Assuming a network of servers that communicate by sending messages, these messages usually have to be processed in a certain order because there are causal dependencies between them. For example, when a server receives a message from a second server, the message may influence internal computations of the first server and may also influence the messages that the first server sends to other servers.

In a distributed system, message delivery is usually not synchronized and messages might be delayed without a time bound. When a server receives several messages, it cannot be sure that the order in which the messages arrive is the same order in which they were produced by the other servers. To make things worse, in a distributed system it is difficult to establish a common notion of time in general: the system clocks of the servers might differ; that is why attaching the current local system time (of the sending server) as a timestamp to a message is of no use for ordering messages chronologically in a global sense.

However, it is important to observe that no exact time is needed when ordering messages in a network: instead of relying on physical clocks, a notion of a logical clock suffices. A logical clock basically consists of a counter that counts events. Events can be the sending or receiving of messages; moreover, there may be internal events inside a server where no communication with other servers is needed, but these internal events increase the local counter of the server, too. Each increment of the counter represents a single logical clock tick from the perspective of a server. Hence, we may not only talk about messages exchanged between servers but, more generally, about events in processes that are run by a set of servers. When one event in a process happened before another event, the first event may have caused the second. That is why we want to process events according to a causality order.

12.3.1 Scalar Clocks

Logical clocks as described by Lamport [Lam78] (and hence often called Lamport clocks) provide a partial order of all messages exchanged in a distributed system. Messages are sent and received by servers in a network; or more precisely, by a set of client processes which run on the servers in the network.

A Lamport clock orders events (like sending and receiving messages) based on a relation called the happened-before relation; this relation is used as a formal notion for the fact that we are not interested in the exact physical time but only in the ordering of events. In general, many internal events may happen inside each client process, but from the global perspective of the system only the communication events between processes are relevant. If in the global perspective of the distributed system one event e1 happened before another event e2, this is denoted as e1 → e2. The happened-before relation is induced by (1) the total ordering of events happening at a single server in a single process and (2) the fact that the send event of a message must have happened before the receive event of that message. Moreover (3), if one event happened before a second event, and the second event happened before a third event, then the first event must also have happened before the third event; hence the happened-before relation is transitive. More formally, these three properties can be written as follows.

image Happened-before relation:

1.if e1 is an event inside a process and e2 is an event happening in the same process after e1, then e1 also happened before e2 in the global view of the system: e1 → e2.

2.if e1 is the event of sending a message m and e2 is the event of receiving m, then e1 happened before e2: e1 → e2.

3.if it is the case that e1 → e2 and e2 → e3, then also e1 → e3.

To implement Lamport clocks, each client process has its own counter (that has a scalar value) to denote its local (and logical) time. The initial value of each local counter is 0. Whenever a process sends a message, it attaches its local counter to the message (incremented by one); in other words, a message “piggybacks” a timestamp. When a process receives a message, it also has to increment its local counter: the important point is that other processes in the system might have processed more messages and some counters might hence be ahead of others. This difference is corrected when receiving a message with a timestamp higher than the local clock: the receiving process takes the maximum of its local clock and the timestamp to be its new local clock; then this new local clock is incremented by one to account for the message retrieval event. More formally, we define Ci to be the counter for process i and let m1, m2, … be the messages that are exchanged between the processes. Then the counters have the following properties:

1.Initialization: Initially, all counters are 0: Ci = 0 for all i

2.Send event: Before process i sends a message mj, it increments its local counter and attaches the counter as a logical timestamp to the message:

(a)Ci = Ci + 1

(b)send (mj, Ci) to receiving process(es)

3.Receive event: Whenever process i receives a message mj, it reads the attached timestamp t and in case the timestamp is greater than its local counter it advances the counter to match the timestamp; before processing the message, the counter is incremented:

(a)receive (mj, t)

(b)Ci = max(Ci, t)

(c)Ci = Ci + 1

(d)process mj
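As a rough illustration, the three rules above can be sketched in a few lines of Python; the class name, the representation of a message as a (payload, timestamp) tuple and the omitted transport layer are assumptions made only for this sketch.

class LamportClock:
    def __init__(self):
        self.counter = 0  # local logical clock, initially 0

    def send(self, payload):
        # Send event: increment the local counter and piggyback it as the timestamp.
        self.counter += 1
        return (payload, self.counter)  # message handed to the transport layer

    def receive(self, message):
        payload, timestamp = message
        # Receive event: advance the local counter to the maximum of both clocks ...
        self.counter = max(self.counter, timestamp)
        # ... and account for the receive event itself before processing.
        self.counter += 1
        return payload


# Usage corresponding to Figure 12.13 (Processes a and b):
a, b = LamportClock(), LamportClock()
m1 = a.send("m1")   # Ca becomes 1, timestamp 1
b.receive(m1)       # Cb becomes max(0, 1) + 1 = 2
m2 = b.send("m2")   # Cb becomes 3, timestamp 3
a.receive(m2)       # Ca becomes max(1, 3) + 1 = 4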

The example in Figure 12.13 shows how message exchange advances the local counters. Before sending message m1, Process a increments its counter Ca by 1 and attaches it as the timestamp t1 to the message. Next, Process b receives the message and notices that its counter is behind the timestamp and it has to adjust it: max(Cb, t1) = max(0, 1) = 1; before processing the message, Cb is incremented by 1 yielding a value of 2. After some time, Process b prepares another message m2, increments its counter Cb to be 3, and sends the message m2 together with the value 3 as its timestamp t2. Now, Process a has to advance its clock (which is still 1) by taking the maximum: max(Ca, t2) = max(1, 3) = 3 and immediately increments Ca to be 4 before processing the message.

Figure 12.14 shows a more advanced situation with three processes. In particular, we see here the case that with Lamport clocks it may happen that two events in different processes have the same clock value: In Process a the scalar clock 5 denotes the event of receiving message m3 while in Process b the scalar clock 5 denotes the event of sending message m4.

image

Fig. 12.13. Lamport clock with two processes

image

Fig. 12.14. Lamport clock with three processes

Moreover, scalar clocks do not provide the notion of a globally total order and hence for some (concurrent) events their processing in Process b may occur in arbitrary order. Consider for example Figure 12.14: here we see that messages m1 and m2 are being sent at the same global time 1; that is also why the timestamps for the two messages are identical.

image A globally total order over all processes in the distributed servers may be necessary to schedule messages with an identical timestamp in an unambiguous way.

For example, with a total order in Figure 12.14, the two messages with identical timestamps, m1 (sent by Process a) and m2 (sent by Process b), may only be processed in one specific order. A simple way to establish such a total order of events is breaking ties by using process identifiers: the process IDs can be totally ordered (on each server) and they may contain the name of the server they are running on to obtain a globally unique identifier. We can let the clock of a process with a lower ID always take precedence over a process with a higher ID. In our example, we can assume that the process IDs are ordered as a < b < c, and append the process ID pid to the timestamp t such that messages piggyback the combination t.pid. We can then compare two timestamps t1.pid1 and t2.pid2 as follows.

image Comparison based on process IDs: For timestamps t1 and t2 and process IDs pid1 and pid2, together with a predefined ordering on process IDs, it holds that t1.pid1 < t2.pid2 whenever t1 < t2 or, in the case that t1 = t2, whenever pid1 < pid2.

In Figure 12.15 we see that message m1 hence has to be processed before message m2 at Server b. This ordering by process IDs is however somewhat arbitrary and may not capture any semantic order of messages that may be necessary for a correct processing of the messages.
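A minimal sketch of this tie-breaking comparison, assuming that process IDs are strings that can be ordered lexicographically (such as "a" < "b" < "c"):

# Total order on (timestamp, process ID) pairs: compare the timestamps first
# and break ties by the globally unique process ID.
def totally_ordered_before(t1, pid1, t2, pid2):
    return (t1, pid1) < (t2, pid2)  # Python compares tuples lexicographically

# m1 and m2 carry the same Lamport timestamp 1 but different process IDs:
assert totally_ordered_before(1, "a", 1, "b")        # m1 is processed first
assert not totally_ordered_before(1, "b", 1, "a")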

image

Fig. 12.15. Lamport clock totally ordered by process identifiers

12.3.2 Concurrency and Clock Properties

As already mentioned, the happened-before relation provides a partial ordering: that is, some events might happen in parallel and we don’t care what their actual order is. For these events we can neither say that one happened before the other nor the other way round; they are called concurrent events. More formally, for two events e1 and e2, when we know that e1 ↛ e2 and e2 ↛ e1, then we say that e1 and e2 are concurrent and write e1||e2.

image Concurrency of Events: For two events e1 and e2 in a distributed system and the happened-before relation →, the two events are concurrent (written as e1||e2) if and only if e1 ↛ e2 and e2 ↛ e1.

Another important notion when talking about events in a distributed system is causality: does one event influence another event – and in particular, can the first event potentially be the cause of the other event? Ideally, we want to have a global clock with the property that whenever the global clock value of one event is less than the global clock value of a second event – that is, C(e1) < C(e2) – then we can be sure that e1 may at least potentially have influenced the second event e2. However, Lamport clocks do not have this property of representing causality: the clock of one event may be less than the clock of another, yet the former cannot (not even potentially) influence the latter. To illustrate this case, Figure 12.16 shows message exchanges between four processes that happen totally independently of each other. In particular, the receipt of message m1 happens when the global clock is 2, and the sending of message m4 happens when the global clock is 3; however, the receipt of message m1 cannot have caused the sending of message m4 in any way because there is no communication at all between Server b and Server c.

image

Fig. 12.16. Lamport clock with independent events

Hence, all we can say about Lamport clocks is that they satisfy the following weak clock property: if one event is in a happened-before relation to another one, then the global Lamport clock of the first event is less than that of the second.

image Weak Clock Property: For two events e1 and e2 in a distributed system, the happened-before relation →, and a global clock C: if e1 → e2 then C(e1) < C(e2).

What we can derive from this property (by using the contrapositive) is that when the Lamport clock of one event is not less than that of the other, then the former event cannot be in a happened-before relation to the latter: if C(e1) ≮ C(e2) then e1 ↛ e2.

A more helpful property is the opposite direction: whenever the global clock of the first event is less than the one of the second, then we can be sure that the first event is in a happened-before relation to the second. And from this fact we can draw the conclusion that the first event may have had an influence on – or may have caused – the second event. The strong clock property says that both directions are fulfilled.

image Strong Clock Property: For two events e1 and e2 in a distributed system, the happened-before relation →, and a global clock C: e1 → e2 if and only if C(e1) < C(e2).

The strong clock property is not satisfied for Lamport clocks.

12.3.3 Vector Clocks

The strong clock property is satisfied for vector clocks: instead of a single scalar counter per process, a vector clock is a vector of counters with one counter for each client process. In the distributed database setting, a client process handles read and write requests coming from a database user. With vector clocks it is crucial to have a separate counter for each client process (even for those running on the same server) as otherwise servers could not accept concurrent write requests from multiple users.

The problem with a single server counter (instead of individual process counters) is that write attempts by different processes on the same server would simply increase the server counter and there is no way to tell apart different processes; this can lead to lost updates.

image Vector clocks provide a partial order of events.

The partial order of vector clocks expresses concurrency and causality better than Lamport clocks because it exposes which client process has seen which message. Messages piggyback the current vector clock of the sending process. Differences in the vector clocks are consolidated when receiving a message by taking the maximum for each vector element. Before sending and after receiving a message only the vector element of the sending or the receiving process is stepped forward. More formally, vector clocks are maintained with the following steps:

1.Initialization: For n client processes, a vector clock is a vector (or an array) of n elements. Each of the n processes maintains one local vector clock. Initially, for each process all elements are 0: for the vector clock VCi of process i, VCi[j] = 0 for all j (where i, j ∈ {1, …, n})

2.Send event: Before process i sends a message mk, it increments only the i-th element of the local vector clock and attaches the entire vector as a logical timestamp to the message:

(a)VCi[i] = VCi[i] + 1

(b)send (mk, VCi) to receiving process(es)

3.Receive event: Whenever process i receives a message mk, it reads the attached timestamp vector t; it iterates over all elements and in case the timestamp element t[j] is greater than its local vector clock element VCi[j] it advances the clock element to match the timestamp element; before processing the message, the i-th vector element is incremented:

(a)receive (mk, t)

(b)for j = 1 to n: VCi[j] = max(VCi[j], t[j])

(c)VCi[i] = VCi[i] + 1

(d)process mk
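The vector clock maintenance rules above can be sketched as follows; the zero-based process index i, the class name and the representation of a message as a (payload, timestamp) tuple are assumptions made only for this sketch.

class VectorClock:
    def __init__(self, n, i):
        self.vc = [0] * n   # one element per client process
        self.i = i          # index of the owning process

    def send(self, payload):
        # Step only the sender's own element forward and piggyback the vector.
        self.vc[self.i] += 1
        return (payload, list(self.vc))

    def receive(self, message):
        payload, t = message
        # Consolidate element-wise with the piggybacked timestamp ...
        self.vc = [max(own, other) for own, other in zip(self.vc, t)]
        # ... then account for the receive event in the own element.
        self.vc[self.i] += 1
        return payload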

In Figure 12.17 we see how Process b steps its own vector element forward when sending and receiving messages, and how the other two processes advance their clocks to match the timestamp when receiving a message.

In order to define causality and concurrency of events we must be able to compare vector clocks – that is, we have to specify a partial ordering on the vectors. This is done by a pairwise comparison of the vector elements. More specifically, let VC(e1) be the vector clock for an event e1 and let VC(e2) be the vector clock for an event e2 (potentially in different processes); we then define VC(e1) ≤ VC(e2) and VC(e1) < VC(e2) as follows.

image

Fig. 12.17. Vector clock

image Vector Clock Comparison: Let e1 and e2 be events in a distributed system, VC(e1) be the vector clock for event e1 and VC(e2) be the vector clock for event e2, then

VC(e1) is less than or equal to VC(e2) (written as VC(e1) ≤ VC(e2)) if and only if the vector clock elements of VC(e1) are less than or equal to the elements of VC(e2); that is, for all i ∈ {1, …, n}: VC(e1)[i] ≤ VC(e2)[i].

VC(e1) is less than VC(e2) (written as VC(e1) < VC(e2)) if and only if the vector clock elements of VC(e1) are less than or equal to the ones of VC(e2) and at least one element of VC(e1) is strictly less than the corresponding one of VC(e2); that is, for all i ∈ {1, …, n}: VC(e1)[i] ≤ VC(e2)[i] and there is at least one j ∈ {1, …, n} for which VC(e1)[j] < VC(e2)[j].

Due to the fact that vector clocks enjoy the strong clock property, we can define causality and concurrency by the vector clocks of events: For one particular event e, we can define all events with lower vector clocks as causes of e and all events with higher vector clocks as effects of e. All other events have vector clocks incomparable to the one of e; these are the events concurrent to e. More formally, we define the sets of causes, effects and concurrent events as follows:

For a given event e the set of causes is causes(e) = {e' | VC(e') < VC(e)}

For a given event e the set of effects is effects(e) = {e' | VC(e) < VC(e')}

For a given event e the set of concurrent events is concurrent(e) = {e' | VC(e') ≮ VC(e) and VC(e) ≮ VC(e')}

image

Fig. 12.18. Vector clock with independent events

In the example of Figure 12.17 we can see that the event of receiving message m2 (with vector clock [0,1,1]) is by this definition a cause of the event of sending message m3 (with vector clock [1,3,1]) and also of the event of receiving message m3 (with vector clock [2,3,1]); that is, these three vector clocks are comparable and we have [0,1,1] < [1,3,1] < [2,3,1]. On the other hand, we cannot compare the timestamp of message m1 (which is [1,0,0]) and the timestamp of message m2 (which is [0,0,1]) and we hence know that the events of sending m1 and m2 are concurrent.
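Using the comparison rules from above, the classification of events into causes, effects and concurrent events can be sketched as follows (the concrete clock values are the ones from Figure 12.17); the function names are chosen only for this sketch.

def vc_leq(vc1, vc2):
    # VC(e1) <= VC(e2): every element is less than or equal
    return all(x <= y for x, y in zip(vc1, vc2))

def vc_less(vc1, vc2):
    # VC(e1) < VC(e2): less than or equal and different in at least one element
    return vc_leq(vc1, vc2) and vc1 != vc2

def relate(vc1, vc2):
    if vc_less(vc1, vc2):
        return "cause"        # e1 is a (potential) cause of e2
    if vc_less(vc2, vc1):
        return "effect"       # e1 is an effect of e2
    return "concurrent"       # incomparable vector clocks

# The three comparable clocks from Figure 12.17:
assert vc_less([0, 1, 1], [1, 3, 1]) and vc_less([1, 3, 1], [2, 3, 1])
# The timestamps of m1 and m2 are incomparable, hence their send events are concurrent:
assert relate([1, 0, 0], [0, 0, 1]) == "concurrent"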

In Figure 12.18 (as opposed to Figure 12.16) with vector clocks we can now easily determine that the communication between Process a and b is entirely independent of the communication between Process c and d: All events in Processes a and b are incomparable to all events in Processes c and d.

12.3.4 Version Vectors

While vector clocks are a mechanism for stepping forward the time in a message-passing system, version vectors are a mechanism to consolidate and synchronize several replicas of a data record. Usually, one data record is replicated on a small set of servers; that is, the number of replicas of a data record is considerably lower than the large overall number of servers or the number of interacting clients. This will be useful when trying to handle scalability problems of version vectors (see Section 12.3.5).

In order to determine which view of the state of the database contents each user has, for every read request for a data record the answer contains the current version vector for that data record. When subsequently the user writes that same data record, the most recently read version vector is sent with the write request as the so-called context. With this context, the database system can decide how to order or merge the writes. For example, it might happen that a replica receives write requests out of (chronological) order – or the replica might even miss some writes due to message loss or other failures. In this case, version vectors can be used to decide in which order the writes should be applied and which write is the most recent one. Moreover, if the version vectors at two replicas of the same data record are incomparable (that is, concurrent), the replicas are said to be in conflict. Conflicting versions can for example occur with multi-master replication, where clients can concurrently update the data record at any server that holds a replica; this is the case of conflicting writes. A synchronization process reconciles conflicting replicas. The resulting synchronized version is tagged with a version vector greater than any of the conflicting ones so that the conflict is resolved.

A simple form of synchronization of two conflicting replicas is to take the union of them. A typical application for this union semantics is an online shopping cart: if the shopping cart of some user is replicated on two servers and the two versions differ (maybe due to failures), the final order contains all items in both versions and the user might have to manually remove duplicates before placing the order. If such an automatic synchronization is not possible when the version vectors of the replicas are concurrent, the conflict has to be resolved by a user. That is, the user has to read the conflicting versions, decide how to resolve the conflict and then issue another write to resolve the conflict. The written value gets assigned a version vector that subsumes both conflicting ones.

The process of maintaining version vectors is slightly different from the one for maintaining vector clocks. The aim is to reconcile divergent replicas into one common version – that is, a version with an identical version vector at all replicas. In contrast, the vector clocks of client processes in a message-passing system usually differ. We assume that we start with an initial version that is identical at all replicas. Further modifications of replicas are possible by updates (a client issues a write to one replica) and synchronization (two replicas try to agree on a common version). We now describe version vector maintenance with union semantics for the synchronization process. In this setting, each data record consists of a set of values and the synchronization of the data record computes the set union; that is, the result of a merge is again a set of values.

For updates, if the context is equal to or greater than the current version vector, then the current version in the database is overwritten, because the client has previously read the current version or even a more recent version from a different replica. Then the write context is taken to be the new version vector and the vector element of the writing client is advanced. If the context is smaller than the current version vector, it means that the version that was read by the client is outdated and has been overwritten by some other client. However, we do not want to lose this write even if it is based on an outdated version. Hence, the database system merges the current version and the written version and then advances the vector element of the writing client. If the context and the current version vector in the database are in conflict, the client has read from one replica (holding a version written by another client) but writes to another replica (holding a version of a concurrent write of yet another client). If the database does not want to lose any of those writes, it merges the written version and the current version; then, it takes the maximum of the context and current version vector elements, and lastly advances the vector element of the writing client.

If in a multi-master setting clients are allowed to update a data record at any replica, the synchronization step ensures that all replicas have the same version of the data record. The synchronization can then be implemented as an epidemic algorithm as described in Section 10.4. Whenever one replica has a larger version vector than the other replica, the larger version vector is taken as the most recent one and the corresponding data record replaces the other one. In case the two version vectors are in conflict, with the union semantics we replace both version vectors by their element-wise maximum and merge the two data records by taking the union.

1.Initialization: For n client processes, a version vector is a vector (or an array) of n elements. Each replica of a data record maintains one version vector. Initially, for each process all elements are 0: for version vector VVi at replica i, VVi[j] = 0 for all j (where j ∈ {1, …, n})

2.Update: When a client process j sends a write request to overwrite a set of values vali at replica i, it sends the new set of values valj and the context ctxj (that is, the version vector of the last read). Based on the context, the replica checks whether it has to overwrite its value set or take the union, then computes the maximum over the context and its own version vector and lastly advances the element of the writing client:

(a)if ctxj ≥ VVi, then set vali = valj; else set vali = vali ⋃ valj

(b)for k = 1, …, n: VVi[k] = max{VVi[k], ctxj[k]}

(c)VVi[j] = VVi[j] + 1.

3.Synchronization: Whenever two replicas i and j have different version vectors (that is, VVi ≠ VVj) for one data record, the synchronization process reconciles the two versions by either overwriting one value set (if one version vector supersedes the other) or by taking the union of the two value sets (and merging the version vectors by taking their element-wise maximum). After the synchronization process, the replicas have identical values vali and valj as well as identical version vectors VVi and VVj for the data record:

if VVi > VVj: set valj = vali and VVj = VVi

if VVj > VVi: set vali = valj and VVi = VVj

image

Fig. 12.19. Version vector synchronization with union merge

else set vali = valj = vali ⋃ valj and for k = 1, …, n: VVi[k] = VVj[k] = max{VVi[k], VVj[k]}
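A minimal sketch of the update and synchronization steps with union semantics; the class name, the representation of the value set as a Python set and the zero-based client indices are assumptions made only for this sketch.

def vv_geq(a, b):
    # element-wise comparison: a >= b
    return all(x >= y for x, y in zip(a, b))

class UnionReplica:
    def __init__(self, n):
        self.vv = [0] * n     # version vector of this replica
        self.val = set()      # current set of values

    def update(self, client_j, new_values, ctx):
        # (a) overwrite if the client's context covers the stored version,
        #     otherwise keep both writes by taking the union
        if vv_geq(ctx, self.vv):
            self.val = set(new_values)
        else:
            self.val = self.val | set(new_values)
        # (b) merge the version vectors element-wise
        self.vv = [max(own, c) for own, c in zip(self.vv, ctx)]
        # (c) advance the element of the writing client
        self.vv[client_j] += 1

def synchronize(r1, r2):
    # Overwrite if one version supersedes the other, otherwise union-merge.
    if vv_geq(r1.vv, r2.vv):
        r2.val, r2.vv = set(r1.val), list(r1.vv)
    elif vv_geq(r2.vv, r1.vv):
        r1.val, r1.vv = set(r2.val), list(r2.vv)
    else:
        merged_val = r1.val | r2.val
        merged_vv = [max(x, y) for x, y in zip(r1.vv, r2.vv)]
        r1.val, r2.val = set(merged_val), set(merged_val)
        r1.vv, r2.vv = list(merged_vv), list(merged_vv)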

More advanced forms of merging usually involve a semantic decision that requires interaction of the user or a more intelligent application logic. If such a user interaction is needed, sibling versions for a data record have to be maintained until the user writes a merged version: the database system stores all concurrent versions – the siblings – together with their attached version vectors. That is, for each data record the replica i maintains a set Di of tuples of values and version vectors: Di = {(vali1, VVi1), (vali2, VVi2), …}. As soon as a user writes a merged version that is meant to replace the siblings, the version vector of the merged version is set to be larger than all the version vectors of the siblings; the siblings (and their version vectors) can then be deleted. Likewise, the context of a writing client j is a set Cj of version vectors containing all the version vectors of those siblings that were returned in the answer to the last read request for the data record: Cj = {ctxj1, ctxj2, …}. More formally, version vector maintenance with sibling semantics works as follows:

1.Initialization: For n client processes, a version vector is a vector (or an array) of n elements. Each replica of a data record maintains a set D of pairs of values and version vectors. Initially, the set contains a single pair (vali1, VVi1) where the version vector element for each process is 0: for version vector VVi1 at replica i, VVi1[j] = 0 for all j (where j ∈ {1, …, n})

2.Update: When a client process j sends a write request to overwrite some or all values in the data set Di at replica i, it sends the new value valj and the context Cj (that is, the set of version vectors of the last read). The replica checks which siblings in Di are covered by Cj, then computes the maximum over the context vectors as the new version vector for valj, advances the element of the writing client, and adds valj to its data set:

image

Fig. 12.20. Version vector synchronization with siblings

(a)if there is a pair (val, VV) ∊ Di and a ctxjl ∊ Cj such that ctxjl ≥ VV, then remove (val, VV) from Di

(b)for k = 1, …, n: VVnew[k] = max{ctxjl[k] | ctxjl ∊ Cj}

(c)VVnew[j] = VVnew[j] + 1

(d)add (valj, VVnew) to Di

Note that only the siblings with version vectors less than the newly generated VVnew are overwritten; other siblings may not be deleted because

the reconciling client did not want to overwrite them;

the reconciling client did not read these siblings (stale read on an outdated replica);

they have been introduced by an intermediate write by another client.

3.Synchronization: Whenever two replicas i and j have different data sets Di and Dj for one data record, the synchronization process reconciles the two versions by only keeping the values with the highest version vectors. After the synchronization process, the replicas have identical data sets Di = Dj = D' for the data record:

D' = {(val, VV) | (val, VV) ∊ Di ⋃ Dj and there is no (val', VV') ∊ Di ⋃ Dj such that VV' > VV}
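The sibling-based update step (a)–(d) can be sketched as follows, assuming that the data set D of a replica is kept as a list of (value, version vector) pairs and that the context of the writing client is a list of version vectors; all names are chosen only for this sketch.

def vv_geq(a, b):
    return all(x >= y for x, y in zip(a, b))

def sibling_update(D, client_j, new_value, ctx, n):
    # (a) drop every sibling that is covered by some context vector
    D = [(val, vv) for (val, vv) in D
         if not any(vv_geq(c, vv) for c in ctx)]
    # (b) the new version vector is the element-wise maximum over the context
    #     vectors (an empty context yields the all-zero vector)
    vv_new = [max((c[k] for c in ctx), default=0) for k in range(n)]
    # (c) advance the element of the writing client
    vv_new[client_j] += 1
    # (d) the written value becomes a (possibly additional) sibling
    D.append((new_value, vv_new))
    return D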

Figure 12.20 shows a synchronization step that creates two siblings because the version vectors of the two synchronized replicas are concurrent; this concurrency was caused by clients a and b because their writes were concurrently based on the same (initial) version. The siblings are later replaced by a new version written by client c.

More generally, replica versions can be maintained with a fork-and-join semantics: concurrent modifications lead to a fork in a graph of versions and after a merge these concurrent versions can be joined again (see for example [SS05]).

12.3.5 Optimizations of Vector Clocks

In distributed systems with a large amount of processes and with a high communication frequency between the processes, traditional vector clocks do not scale well. Indeed, for large systems, vector clocks raise problems due to the following reasons:

The size of a vector clock (that is, the number of its elements) grows with the number of client processes taking part in the distributed communication: each vector clock contains one element for each client process. The size of a vector clock thus equals the overall number of client processes, even though some client processes may not actively take part in the communication. Maintaining vector clocks for a large number of client processes can hence quickly turn out to be a problem in a distributed system. What is more, modern distributed systems are dynamic: new processes can join or leave the system at any time. Due to this, the overall number of processes is not known in advance and the vector clocks must be able to grow and shrink to support the dynamics of the system. Vector clocks should hence be implemented as sets of tuples (processID, counter) of the process identifier and the counter value of the process; tuples can be added to or removed from this set whenever processes join or leave the system. Yet, this entails the problem of having system-wide unique process identifiers which themselves must be long enough to ensure this uniqueness. Moreover, the size of the tuple sets cannot be bounded as there is no upper bound on the number of processes in the system. Hence, even with the dynamic tuple-based implementation, vector clocks do not scale well when the number of active processes increases.

For long-running systems with frequent communication, the counter values in the elements of the vector clock are incremented rapidly and hence further increase the size of the vector clock; when the size of counters is limited, this causes an overflow of the vector elements. In other words, vector clocks do not scale well with the number of message exchanges.

Message sizes increase when they piggyback large vector clocks. Hence the vector clock alone may quickly exceed any reasonable message size and lead to unacceptable communication overhead.

Distributed systems with vector clocks thus need some kind of vector clock bounding in order to support a large number of client processes over a long period of time. Some of these optimizations – while ensuring some size bounds for the vector clocks – either incorrectly introduce causalities (two events are considered causally related although they are not) or incorrectly introduce concurrencies (two events are considered concurrent although they are not). In contrast, for version vectors the so-called dotted version vectors [PBA+10] promise full correctness of the causality relation while only needing vector clocks of the size of the replication factor. The following options for bounding vector clocks have been analyzed:

Approximate vector clocks: Instead of using one element for each client process, several client processes can share one vector element. That is, several client process IDs i1, i2, i3 are mapped to the same index i and the vector clock only has as many elements as there are different groups of client processes (where all client IDs of a group are mapped to the same index). This approach leads to the case that although the vector clocks for two events e1 and e2 can be ordered (that is, VC(e1) < VC(e2)), this ordering can no longer differentiate whether indeed e1 happened before e2 or whether they are concurrent. In other words, these vector clocks only approximate the happened-before relation (see [BR02]). While they satisfy the weak clock property, they do not satisfy the strong clock property but only the property that if VC(e1) < VC(e2) then e1 → e2 or e1||e2. Note that when the vector clock only consists of one element (all process IDs map to the same index), such approximate vector clocks coincide with the basic scalar Lamport clock.

Client IDs versus replica IDs: For version vectors, one way to reduce the size of the vectors is to use replica IDs instead of client IDs. This is based on the observation that synchronization takes place only between replicas and hence version vectors only need one element per replica for each data record; that is, version vectors have the size of the replication factor. However, with a simple counter for each of the replica IDs we run into the following problem of lost updates. Two clients might concurrently write to the same replica based on an identical context; or – more generally – a client might issue a write with a stale context: another client might already have overwritten the read version. With a version vector based on client IDs, the database could simply handle the stale write as concurrent (for example, by creating siblings). However, with version vectors simply based on replica IDs, this concurrency cannot be expressed. The replica has basically only two options to handle this: it could either reject the stale write (in this case this write will be lost) or it could overwrite the existing newer version by stepping forward the replica counter appropriately (in this case the version in the database will be lost). Hence in both cases one of the concurrent versions will be lost – an undesirable behavior for a database system. Note that this setting also leads to a form of semantic ambiguity of version vectors: if the second write was instead issued to a different replica, it would be handled as concurrent to the first one. This difference is illustrated in Figures 12.21 and 12.22: in Figure 12.21 both clients write to the same replica and Client b uses a stale context in its write request which causes the replica to lose one of the updates (of either Client a or Client b); in contrast, in Figure 12.22 Client b writes to a different replica than Client a and hence both writes are correctly handled as concurrent because both replicas independently step forward their version vectors and can later on be synchronized as usual. In conclusion, while using replica IDs instead of client IDs leads to smaller version vectors, the lost update problem seriously restricts the reliability of the system.

image

Fig. 12.21. Version vector with replica IDs and stale context

Dotted Version Vectors: Fortunately, dotted version vectors [PBA+10] come to the rescue: dotted version vectors use replica IDs (instead of client IDs) in conjunction with more sophisticated version counters. More precisely, note that simple counters actually represent an interval of versions: from the initial value 0 up to the current value of the counter. A dotted version vector not only uses such simple counters (that represent an interval of versions) for each replica, but in addition it can use a single point in time (the so-called dot) that is independent from the interval. With this mechanism, two concurrent writes on the same replica can be handled as concurrent because the dot will be different for both writes. Hence, dotted version vectors enable the use of replica IDs for version vectors without the problem of lost updates – even if clients have the same context or if one client has a stale context.

Vector clock pruning: Pruning means that some vector elements are discarded; this can be implemented by assigning a timestamp to each element and setting the timestamp to the local system time whenever a process advances the element. Whenever the last update of an element becomes older than a specified time-to-live value, this element is simply removed from the vector. Vector clock pruning can lead to situations where two comparable vector clocks become incomparable due to discarded elements. In such cases, manual conflict resolution by a user might become necessary. However, such cases should happen only rarely; for example, when a process that has been idle for a long time rejoins the communication.

image

Fig. 12.22. Version vector with replica IDs and concurrent write

Vector clock resetting: The vector clock elements can be reset to 0 when the counter values exceed some limit; in this way, the size of each vector clock element can be bounded. Resettable vector clocks rely on the notion of communication phases: a new phase is started by sending a reset message that resets all vector clocks in the system. Resettable vector clocks work well when causality of events only has to be checked for events inside the same phase; comparison of events from different phases may lead to incorrect results due to the reuse of vector clocks in the different phases. Design criteria for good resettable vector clocks are low message overhead (for sending reset messages), non-blocking reset messages and fault-tolerance [AKD06, YH97].

Incremental vector clocks: It is often the case that not all vector clock elements change in between two message sending events between two clients. Hence the size of the piggybacked timestamp can be decreased when only the changed vector clock elements are piggybacked (instead of the entire current vector clock). In other words, only the increment since the last communication must be sent. There is an extra overhead involved in this incremental vector clock maintenance: in order to determine which elements have changed, each client process has to keep track of the last timestamp sent to each client process it has ever communicated with. [SK92] propose such a system that relies on first-in-first-out channels between the client processes; that is, message ordering must be ensured so that no message overtaking can occur, as otherwise the causal order cannot be reestablished. [WA09] rely on re-ordering of version vectors to quickly determine the parts of the vectors that have changed.
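A sketch of this incremental idea under the assumption of first-in-first-out channels; the dictionary representation {processID: counter} and the helper names are chosen only for illustration.

def delta_since_last_send(current_vc, last_sent_to_peer):
    # Piggyback only the elements that changed since the last message to this peer.
    return {pid: c for pid, c in current_vc.items()
            if last_sent_to_peer.get(pid, 0) != c}

def apply_delta(local_vc, delta):
    # The receiver consolidates the delta by taking element-wise maxima.
    for pid, c in delta.items():
        local_vc[pid] = max(local_vc.get(pid, 0), c)

# Example: only the entry for process "b" has changed since the last send.
vc = {"a": 4, "b": 2, "c": 7}
last_sent = {"a": 4, "b": 1, "c": 7}
assert delta_since_last_send(vc, last_sent) == {"b": 2}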

12.4 Bibliographic Notes

Replication is a long and widely studied area in distributed systems [BG82, KA10]. Gray et al. [GHOS96] discuss several replication models for transactions as well as simpler non-transactional replication like commutative updates, whereas Wiesmann et al. [WPS+00] analyze (1) the replication model, (2) server interaction and (3) voting as the three parameters of replication. A comprehensive survey of optimistic replication approaches is given by Saito and Shapiro in [SS05]. Several approaches – for example [LKPMJP05, TDW+12, LLC14] – propose a middleware between the database client and the database backend to support data replication. Hinted handoff has been applied for example in Amazon’s Dynamo system [DHJ+07].

Quorum systems also have a long history (see for example Thomas [Tho79] or Gifford [Gif79]). A survey of quorum systems is given in [JPPMAK03].

The basic idea of the Paxos algorithm was developed by Lamport [Lam98] and several variations of it have been proposed including fast Paxos [Lam06], Byzantine Paxos [Lam11], and high-throughput Paxos [KA14]. It has since been used in several data management systems [RST11, CGR07].

Multiversion concurrency control (MVCC) has a long history, too. Serializability theory for MVCC was discussed by Bernstein and Goodman in [BG83]. Several variations of MVCC are applied in modern database systems to improve concurrency performance; see for example [SCB+14].

An influential starting point for the investigation of logical clocks and the definition of the happened-before relation in distributed systems was the seminal article about scalar clocks (that is, Lamport clocks) [Lam78]. Several adaptations ensued, leading to vector clocks [SK92, TRA96, YH97, AL97, PST+97, BR02, AKD06] and version vectors [PPR+83, PBA+10].
