In the previous chapter, you learned about the ZooKeeper client API and programming using the APIs. You saw how to write a client to connect to the ZooKeeper server instance and execute methods to carry out operations in the ZooKeeper namespace. You also learned how to implement watchers in a client so as to register for specific events and get notifications when such events occur, all in real time.
The simple yet powerful and elegant programming model of ZooKeeper enables us to implement high-level primitives for distributed systems. For example, we used the concept of ephemeral znodes to build an emulation of a cluster monitor in the previous chapter.
In this chapter, you will learn how to implement common distributed system constructs, such as barriers, queues, locks, leader election, group membership, a two-phase commit protocol, and service discovery, using ZooKeeper.
The high-level constructs outlined in this chapter are also known as ZooKeeper recipes. They are implemented on the client side using ZooKeeper's programming model and require no special support from the server side. In the absence of ZooKeeper and its APIs, the implementation of these recipes would be quite complex and difficult.
Some of the third-party and community-developed ZooKeeper client bindings also provide these high-level distributed system constructs as a part of their client libraries. For example, Netflix Curator, a feature-rich Java client framework for ZooKeeper, provides many of the recipes mentioned in this chapter. You will learn more about Curator later in this book. Kazoo, the Python client library that you learned about in the previous chapter, also implements some of these recipes, which developers can use directly in their client applications.
The ZooKeeper distribution ships with recipes for leader election, distributed locks, and queues; these can be used directly inside distributed applications. Java implementations of all three recipes can be found in the recipes folder of the distribution.
In this section, you will learn to develop high-level distributed system constructs and data structures using ZooKeeper. As mentioned earlier, most of these constructs and functions are of utmost importance in building scalable distributed architectures, but they are fairly complicated to implement from scratch. Developers can often get bogged down while implementing these and integrating them with their application logic. In this section, you will learn how to develop algorithms to build some of these high-level functions using ZooKeeper's data model and primitives and see how ZooKeeper makes it simple, scalable, and error free, with much less code.
A barrier is a synchronization primitive used in distributed systems to block the processing of a set of nodes until a condition is satisfied. It defines a point at which all nodes must stop their processing; no node can proceed until every other node has reached the barrier.
The algorithm to implement a barrier using ZooKeeper is as follows:
1. To start with, a znode is designated to be the barrier znode, say /zk_barrier; the barrier is considered active in the system as long as this znode exists.
2. Each client calls the exists() function on /zk_barrier, registering for watch events on the barrier znode (the watch event is set to true).
3. If the exists() method returns false, the barrier no longer exists, and the client proceeds with its computation.
4. Else, if the exists() method returns true, the client just waits for watch events on /zk_barrier.
5. Whenever the barrier exit condition is met, the node in charge of the barrier deletes /zk_barrier.
6. The deletion triggers a watch event, and on getting this notification, each client calls the exists() function on /zk_barrier again.
7. The exists() call in step 6 now returns false, and the clients can proceed further.
In this way, we can implement a barrier using ZooKeeper without much effort.
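To make this concrete, here is a minimal sketch of a barrier client written with Kazoo, the Python client library introduced in the previous chapter. The barrier path /zk_barrier comes from the algorithm above; the server address and the use of a threading.Event to park the client while it waits for the watch notification are assumptions of this sketch:

import threading
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed server address
zk.start()

barrier_released = threading.Event()

def barrier_watcher(event):
    # Re-check the barrier znode (re-registering the watch) and
    # release the waiter once the znode is gone.
    if zk.exists('/zk_barrier', watch=barrier_watcher) is None:
        barrier_released.set()

# Step 2: check for the barrier znode and register a watch on it.
if zk.exists('/zk_barrier', watch=barrier_watcher):
    # Steps 4 to 6: the barrier is active; wait for its deletion.
    barrier_released.wait()

# Step 7: the barrier znode no longer exists; proceed with the computation.
print('Barrier lifted, proceeding with computation')
zk.stop()

Kazoo also bundles a ready-made barrier recipe in the kazoo.recipe.barrier module that encapsulates the same pattern.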
The example cited so far is a simple barrier that makes a group of distributed processes wait on some condition and then proceed together when the condition is met. Another type of barrier aids in synchronizing the beginning and end of a computation; this is known as a double barrier. The logic of a double barrier states that a computation starts when the required number of processes have joined the barrier. Processes leave after completing the computation, and when the number of processes participating in the barrier drops to zero, the computation is said to end.
The algorithm for a double barrier uses a barrier znode that serves as the parent for the individual znodes of the processes participating in the computation. The algorithm is outlined as follows:
Phase 1: Joining the barrier znode can be done as follows:
1. Let the barrier znode be represented by the znode /barrier. Every client process registers with the barrier znode by creating an ephemeral znode with /barrier as the parent. In real scenarios, clients might register using their hostnames.
2. The client process sets a watch for the existence of another znode, called ready, under the /barrier znode and waits for that node to appear.
3. A number N is predefined in the system; it governs the minimum number of client processes required to join the barrier.
4. While joining the barrier, each client process finds the number of child znodes of /barrier:
M = getChildren(/barrier, watch=false)
5. If M is less than N, the client waits for the watch event registered in step 2. Else, if M is equal to N, the client process creates the ready znode under /barrier.
6. The creation of the ready znode in step 5 triggers the watch event, and each client starts the computation that it was waiting to do.
Phase 2: Leaving the barrier can be done as follows:
1. After finishing its computation, a client process deletes the znode it created under /barrier (in step 1 of Phase 1: Joining the barrier).
2. The client process then finds the number of children under /barrier:
M = getChildren(/barrier, watch=True)
If M is not equal to 0, this client waits for notifications (observe that we have set the watch event to True in the preceding call). If M is equal to 0, the client exits the barrier.
The preceding procedure suffers from a potential herd effect, where all client processes wake up to check the number of children left in the barrier whenever a notification is triggered. To avoid this, we can create a sequential ephemeral znode in step 1 of Phase 1: Joining the barrier. Each client process then watches the znode with the next-lowest sequence number and treats its disappearance as the exit criterion. This way, only a single event is generated when any client completes the computation, and not all clients need to wake up together to check their exit conditions. For a large number of client processes participating in a barrier, the herd effect can negatively impact the scalability of the ZooKeeper service, and developers should be aware of such scenarios.
A Java language implementation of a double barrier can be found in the ZooKeeper documentation at http://zookeeper.apache.org/doc/r3.4.6/zookeeperTutorial.html.
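Kazoo ships a ready-made double barrier recipe, kazoo.recipe.barrier.DoubleBarrier, which implements both phases described above. A minimal usage sketch follows; the barrier path, the participant count of 4, the do_computation() placeholder, and the server address are all assumptions of this sketch:

from kazoo.client import KazooClient
from kazoo.recipe.barrier import DoubleBarrier

def do_computation():
    print('computing...')  # placeholder for the client's real work

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed server address
zk.start()

barrier = DoubleBarrier(zk, '/barrier', num_clients=4)
barrier.enter()   # Phase 1: blocks until 4 participants have joined
do_computation()
barrier.leave()   # Phase 2: blocks until every participant has finished
zk.stop()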
A distributed queue is a very common data structure used in distributed systems. A special implementation of a queue, called a producer-consumer queue, is one where a collection of processes called producers generate or create new items and put them into the queue, while consumer processes remove items from the queue and process them. The addition and removal of items follows strict FIFO ordering.
A producer-consumer queue can be implemented using ZooKeeper. A znode will be designated to hold a queue instance, say queue-znode. All queue items are stored as znodes under this znode. Producers add an item to the queue by creating a znode under the queue-znode, and consumers retrieve the items by getting and then deleting a child from the queue-znode.
The FIFO order of the items is maintained using the sequential znode property provided by ZooKeeper. When a producer process creates a znode for a queue item, it sets the sequential flag. This makes ZooKeeper append a monotonically increasing sequence number to the znode name as a suffix. ZooKeeper guarantees that the sequence numbers are applied in order and are not reused. Consumer processes handle the items in the correct order by looking at the sequence number of each znode.
The pseudocode for the algorithm to implement a producer-consumer queue using ZooKeeper is shown here:
1. Let /_QUEUE_ represent the top-level znode of our queue implementation; this is also called the queue-node.
2. Clients acting as producer processes put items into the queue by calling the create() method with the znode name "queue-" and with the sequence and ephemeral flags set in the create() method call:
create("queue-", SEQUENCE_EPHEMERAL)
The sequence flag lets the new znode get a name of the form queue-N, where N is a monotonically increasing number.
3. Clients acting as consumer processes issue a getChildren() method call on the queue-node with the watch event set to true:
M = getChildren(/_QUEUE_, true)
The client sorts the children list M, takes out the lowest-numbered child znode from the list, starts processing on it by taking out the data from the znode, and then deletes it.
4. The client picks up items from the list and continues processing them until the list is exhausted; it then issues another getChildren() method call.
5. The procedure continues until getChildren() returns an empty list; this means that no more znodes or items are left under /_QUEUE_.
It's quite possible that in step 3, the deletion of a znode by a client will fail because some other client gained access to the znode while this client was retrieving the item. In such scenarios, the client should retry the delete call.
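The following sketch implements both the producer and consumer sides of this algorithm with Kazoo. One deliberate deviation from the pseudocode above: the item znodes here are created as persistent sequential znodes rather than ephemeral ones, so that queued items survive a producer disconnecting before they are consumed. The queue path /_QUEUE_ comes from the algorithm; the server address and the process() placeholder are assumptions:

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError

def process(data):
    print('processing', data)  # placeholder for real item handling

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed server address
zk.start()
zk.ensure_path('/_QUEUE_')

def produce(data):
    # The sequence flag makes ZooKeeper append a monotonically
    # increasing suffix, yielding names such as queue-0000000007.
    zk.create('/_QUEUE_/queue-', data, sequence=True)

def consume():
    while True:
        children = zk.get_children('/_QUEUE_')
        if not children:
            return  # step 5: the queue is drained
        for name in sorted(children):  # FIFO: lowest sequence first
            try:
                data, _ = zk.get('/_QUEUE_/' + name)
                zk.delete('/_QUEUE_/' + name)
            except NoNodeError:
                # Another consumer claimed this item; move on
                # (step 3's retry behavior).
                continue
            process(data)

produce(b'task-1')
consume()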
Using this algorithm for a generic queue, we can also build a priority queue, where each item is tagged with a priority. The algorithm and implementation are left as an exercise for the reader.
C and Java implementations of the distributed queue recipe are shipped along with the ZooKeeper distribution under the recipes folder. Developers can use this recipe to implement distributed queues in their applications.
Kazoo, the Python client library for ZooKeeper, has a distributed queue implementation in the kazoo.recipe.queue module. This implementation supports assigning priorities to queue items and also has queue-locking support built into it.
A lock in a distributed system is an important primitive that provides applications with a means to synchronize their access to shared resources. Distributed locks need to be globally synchronous to ensure that no two clients hold the same lock at any instant of time.
Locks are typically inevitable when the system as a whole needs to ensure that only one node of the cluster is allowed to carry out an operation at a given time, for example, performing a write to a shared resource such as a database or a file.
ZooKeeper can be used to implement mutually exclusive locks for processes that run on different servers across different networks, and even for servers that are geographically apart.
To build a distributed lock with ZooKeeper, a persistent znode is designated to be the main lock-znode. Client processes that want to acquire the lock create ephemeral znodes with the sequential flag set under the lock-znode. The crux of the algorithm is that the lock is owned by the client process whose child znode has the lowest sequence number. ZooKeeper guarantees this ordering, as sequential znodes are numbered in a monotonically increasing order. Suppose there are three znodes under the lock-znode: l1, l2, and l3. The client process that created l1 is the owner of the lock. To release the lock, it simply deletes l1; the client that created l2 then becomes the lock owner, and so on.
The pseudocode for the algorithm to implement a distributed lock service with ZooKeeper is shown here:
Let the parent lock node be represented by a persistent znode, /_locknode_, in the ZooKeeper tree.
Phase 1: Acquire a lock with the following steps:
1. Call the create("/_locknode_/lock-", CreateMode=EPHEMERAL_SEQUENTIAL) method.
2. Call the getChildren("/_locknode_", false) method on the lock node. Here, the watch flag is set to false, as otherwise it could lead to a herd effect.
3. If the znode created in step 1 has the lowest sequence number among the children, the client owns the lock, and the algorithm exits here.
4. Otherwise, call the exists("/_locknode_/<znode path with the next-lowest sequence number>", True) method.
5. If the exists() method returns false, go to step 2.
6. Else, if the exists() method returns true, wait for notification of the watch event set in step 4, and then go to step 2.
Phase 2: Release a lock as follows:
1. The client holding the lock deletes its znode, which was created in step 1 of Phase 1.
2. The client that created the znode with the next-lowest sequence number is notified through its watch and now holds the lock.
Setting a watch on the lock node with a large number of clients is not recommended because of the herd effect. However, if the other clients also need to know about changes of lock ownership, they can set a watch on the /_locknode_ znode for events of the NodeChildrenChanged type and determine the current owner.
If there is a partial failure in the creation of the znode due to connection loss, a client may be unable to determine whether it successfully created its child znode. To resolve this, the client can store its session ID in the znode's data field or even embed it in the znode name itself. Because a client retains the same session ID after a reconnect, it can easily determine whether the child znode was created by it by looking at the session ID.
The idea of creating an ephemeral znode prevents a potential deadlock that might arise when a client dies while holding a lock. As the property of the ephemeral znode dictates that it gets deleted when the session times out or expires, ZooKeeper deletes the znode created by the dead client, and the algorithm proceeds as usual. However, if the client hangs for some reason while its ZooKeeper session remains active, we might still get into a deadlock. This can be solved by having a monitor client that triggers an alarm when the lock-holding time of a client crosses a predefined timeout.
The ZooKeeper distribution ships with C and Java language implementations of a distributed lock in the recipes folder. The recipe implements the algorithm you have learned so far and takes into account the problems associated with partial failures and the herd effect.
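Kazoo likewise bundles a lock recipe, kazoo.recipe.lock.Lock, which follows this sequential ephemeral znode scheme and watches only the znode with the next-lowest sequence number, thereby avoiding the herd effect. A minimal usage sketch, where the lock path, the identifier, the placeholder function, and the server address are assumptions:

from kazoo.client import KazooClient

def update_shared_resource():
    print('updating...')  # placeholder for the operation under the lock

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed server address
zk.start()

# The identifier is stored in the lock znode; it helps in diagnosing
# which client holds the lock, as discussed for partial failures.
lock = zk.Lock('/_locknode_', identifier='host-1')

with lock:  # blocks until this client's znode has the lowest sequence number
    update_shared_resource()

zk.stop()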
The previous recipe of a mutually exclusive lock can be modified to implement a shared lock as well. Readers can find the algorithm and pseudocode for a shared lock using ZooKeeper in the documentation at http://zookeeper.apache.org/doc/r3.4.6/recipes.html#Shared+Locks.
In distributed systems, leader election is the process of designating a single server as the organizer, coordinator, or initiator of some task distributed among several individual servers (nodes). After a leader election algorithm runs, a leader, or coordinator, among the set of nodes is selected, and the algorithm must ensure that all the nodes in the system acknowledge this leader without any discrepancy for the correct functioning of the system.
A leader in a distributed system is required to act as a centralized controller of tasks, which simplifies process synchronization. However, a centralized node is a single point of failure, and its failure can lead to anomalies in the system. Hence, a correct and robust leader election algorithm is required to choose a new coordinator or leader upon failure of the existing one.
A leader election algorithm must satisfy the following two properties:
Liveness: The election eventually completes, so that some node becomes the leader and every node eventually learns about it.
Safety: At most one node acts as the leader at any given time, and all the nodes agree on its identity.
ZooKeeper can be used to implement a leader election algorithm, which can then serve as a leader elector service in distributed applications. The algorithm is similar to the one we used to develop the global mutually exclusive distributed lock.
Client processes nominating themselves as leaders use the SEQUENCE|EPHEMERAL flags when creating znodes under a parent znode. Because the sequence flag is set, ZooKeeper automatically appends a monotonically increasing sequence number as a suffix to each child znode. The process that created the znode with the smallest appended sequence number is elected as the leader. However, the algorithm must also take into account the failure of the leader.
The pseudocode for the algorithm is outlined here.
Let /_election_ be the election znode path that acts as the root for all clients participating in the leader election algorithm.
Clients with proposals for their nomination in the leader election procedure perform the following steps:
1. Create a znode with the path /_election_/candidate-sessionID_, with both the SEQUENCE and EPHEMERAL flags set. The sessionID identifier, as a part of the znode name, helps in recognizing znodes in the case of partial failures due to connection loss. Now, say that ZooKeeper assigns a sequence number N to the znode when the create() call succeeds.
2. Retrieve the current list of children of the election znode:
L = getChildren("/_election_", false)
Here, L represents the list of children of "/_election_". The watch is set to false to prevent any herd effect.
3. If the znode created in step 1 has the smallest sequence number in L, declare this client the leader and exit the algorithm.
4. Otherwise, set a watch on the znode /_election_/candidate-sessionID_M, where M is the largest sequence number such that M is less than N, and candidate-sessionID_M is a znode in L, as follows:
exists("/_election_/candidate-sessionID_M", true)
5. Upon receiving a notification of znode deletion for the watch set in step 4, the client executes the getChildren("/_election_", false) method on the election znode.
6. Let L be the new list of children of /_election_. The leader is then elected as follows:
If candidate-sessionID_N (this client) has the smallest sequence number in L, it declares itself the leader.
Else, set a watch again on the znode /_election_/candidate-sessionID_M, where M is the largest sequence number such that M is less than N and candidate-sessionID_M is a znode in L.
Optionally, a persistent znode is also maintained where the client declaring itself the leader can store its identifier, so that other clients can query who the current leader is at any given time by reading this znode. Such a znode also ensures that the newly elected leader has acknowledged and executed the leader election procedure correctly.
A Java language implementation of the leader election algorithm using ZooKeeper can be found in the recipes folder of the latest ZooKeeper distribution.
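Kazoo exposes the same algorithm through its kazoo.recipe.election.Election class. In the following sketch, lead() is a placeholder callback invoked only after this client's candidate znode holds the smallest sequence number; the election path, the identifier, and the server address are assumptions:

from kazoo.client import KazooClient

def lead():
    # Runs only while this client is the elected leader; returning
    # from this function relinquishes leadership.
    print('I am the leader now')

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed server address
zk.start()

election = zk.Election('/_election_', identifier='host-1')
election.run(lead)  # blocks, contending until the election is won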
A group membership protocol in a distributed system enables processes to reach a consensus on which processes are currently alive and operational in the system. Membership protocols belong to the core components of a distributed system; they aid in maintaining service availability and consistency for the applications. Such a protocol lets processes know when another process joins or leaves the system, thereby keeping the whole cluster aware of the current system state.
The implementation of a basic group membership protocol is very simple, and we have already used a variant of it while developing the cluster monitor emulation in Chapter 3, Programming with Apache ZooKeeper. A group membership protocol can be developed using the concept of ephemeral znodes. Any client that joins the cluster creates an ephemeral znode under a predefined path in the ZooKeeper tree and sets a watch on the parent path. When another node joins or leaves the cluster, the client gets a notification and becomes aware of the change in group membership.
The pseudocode for the algorithm to implement this group membership protocol is shown here.
Let a persistent znode, /membership, represent the root of the group in the ZooKeeper tree. A group membership protocol can then be implemented as follows:
1. Clients joining the group create ephemeral znodes under /membership to denote their membership.
2. All members of the group register for watch events on /membership, thereby being aware of other members in the group. This is done as shown in the following code:
L = getChildren("/membership", true)
3. When a new client joins the group, all other members are notified. Similarly, when a client leaves due to failure or otherwise, ZooKeeper automatically deletes its ephemeral znode, and the remaining members are notified.
4. Live members learn which node joined or left by looking at the list of children L.
The preceding algorithm suffers from the herd effect, as events of the NodeChildrenChanged type emitted due to the joining or leaving of members cause all other members to wake up and find the current membership of the system.
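With Kazoo, the whole protocol takes only a few lines. The sketch below uses Kazoo's ChildrenWatch helper, which re-registers the child watch after every notification; the group path, the hostname-based member name, and the server address are assumptions:

import socket
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed server address
zk.start()
zk.ensure_path('/membership')

# Join the group: the ephemeral znode vanishes if this client dies,
# which is what signals the departure to the rest of the group.
zk.create('/membership/' + socket.gethostname(), ephemeral=True)

@zk.ChildrenWatch('/membership')
def on_membership_change(members):
    # Invoked with the current member list on every join or leave.
    print('Current members:', members)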
The two-phase commit (2PC) protocol is a distributed algorithm that coordinates all the processes participating in a distributed atomic transaction and decides whether to commit or abort (roll back) the transaction. 2PC is a specialized type of consensus protocol and is widely used in transaction processing systems. It ensures atomic behavior, guaranteeing that either all the operations of a transaction complete or none of them do, so that the resources under transactional control remain synchronized.
The 2PC protocol consists of the following two phases:
1. The commit-request (or voting) phase: The coordinator asks all the participants to prepare for the transaction, and each participant votes either to commit or to abort.
2. The commit phase: If all the participants voted to commit, the coordinator commits the transaction; otherwise, it aborts it. The coordinator then notifies the outcome to all the participants.
We can develop a 2PC protocol implementation using ZooKeeper.
Let /2PC_Transactions represent the root znode for running the 2PC algorithm in ZooKeeper. The algorithm is as follows:
1. The coordinator node creates a transaction znode, say /2PC_Transactions/TX. We can use the leader election algorithm to elect the coordinator with ZooKeeper. The coordinator node sets a watch on the transaction znode.
2. Another persistent znode, tx_result, is created under /2PC_Transactions/TX by the coordinator to post the result of the protocol, commit or abort, and any additional outcomes of the transaction.
3. Each participant node sets watches on the /2PC_Transactions as well as the /2PC_Transactions/TX/tx_result znode paths.
4. When the coordinator creates the transaction znode, the participants are notified that the coordinator is requesting their votes.
5. The participants then create ephemeral child znodes in the /2PC_Transactions/TX path, with their own identifiers (say, hostnames), and vote for commit or abort by writing to the data fields of their znodes.
6. When the number of child znodes in /2PC_Transactions/TX equals the number of participants, the coordinator checks the votes of all the participants by reading their znodes and posts the outcome, commit or abort, to the /2PC_Transactions/TX/tx_result znode.
7. The participants get to know the outcome of the transaction through the watch event NodeDataChanged for /2PC_Transactions/TX/tx_result.
The preceding algorithm might be a little slow, as all messaging happens through the coordinator node, but by using ephemeral znodes it handles the failure of participant nodes during the execution of the protocol.
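As an illustration, here is a sketch of the participant side of this algorithm in Kazoo; the coordinator side (steps 1, 2, and 6) is omitted for brevity. The transaction path follows the steps above, while the hard-coded commit vote, the hostname-based identifier, and the server address are assumptions:

import socket
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed server address
zk.start()

TX = '/2PC_Transactions/TX'

# Step 5: register an ephemeral znode carrying this participant's vote.
zk.create(TX + '/' + socket.gethostname(),
          b'commit',  # or b'abort', depending on the local outcome
          ephemeral=True)

# Step 7: learn the final outcome through a data watch on tx_result.
@zk.DataWatch(TX + '/tx_result')
def on_result(data, stat):
    if data:  # the coordinator has posted 'commit' or 'abort'
        print('Transaction outcome:', data.decode())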
Service discovery is one of the key components of distributed systems and service-oriented architectures where services need to find each other. In the simplest way, service discovery helps clients determine the IP and port for a service that exists on multiple hosts. One common example of this is how a web service can find the right host that serves a caching service in the network. At first glance, it appears that we can use a Domain Name System (DNS) service as a service discovery system. However, a solution with DNS is not viable when the service locations change frequently due to auto or manual scaling, new deployments of services, or when services are failed over or replaced with newer hosts due to host failures.
Important properties of a service discovery system are mentioned here:
It allows services to register their availability.
It provides a mechanism for clients to locate a live instance of a particular service.
It propagates a notification to interested clients whenever the instances of a service change.
Let /services represent the base path in the ZooKeeper tree for the services of the system or platform. Persistent znodes under /services designate services available to be consumed by clients.
A simple service discovery model with ZooKeeper is illustrated as follows:
1. Every server hosting a service registers itself by creating an ephemeral znode with its hostname under the corresponding service path in /services. For example, if a server hosts a web-caching service, it creates an ephemeral znode with its hostname in /services/web_cache. Again, if some other server hosts a file-serving service, it creates another ephemeral znode with its hostname in /services/file_server, and so on.
2. Clients looking for a particular service issue a getChildren() call on the corresponding service znode, with the watch event set to true, for example on /services/web_cache, to find the hosts currently serving it.
If a new host is added to serve web caching under this path, the client automatically learns the details of the new location. Again, if an existing host goes down, the client gets the event notification and can take the necessary action of connecting to another host.
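Both sides of this model are again short with Kazoo. In the sketch below, the path /services/web_cache comes from the example above, while the advertised port, the hostname-based znode name, and the server address are assumptions:

import socket
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed server address
zk.start()
zk.ensure_path('/services/web_cache')

# Server side: advertise this host for as long as its session lives.
zk.create('/services/web_cache/' + socket.gethostname(),
          b'8080',  # assumed port, stored as the znode's data
          ephemeral=True)

# Client side: track the live set of web-cache hosts.
@zk.ChildrenWatch('/services/web_cache')
def on_hosts_change(hosts):
    # Re-invoked whenever a host registers or disappears.
    print('web_cache hosts:', hosts)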
A service discovery system provides a seamless mechanism to guarantee service continuity in the case of failures and is an indispensable part of building a robust and scalable distributed platform. Apache Curator provides an extension called curator-x-discovery in its ZooKeeper library; this implements a service registration and discovery model. It also provides a service discovery server called curator-x-discovery-server that exposes a RESTful web service to register, remove, and query services for non-Java or legacy applications to use the service discovery functionalities.