Chapter 4. Performing Common Distributed System Tasks

In the previous chapter, you learned about the ZooKeeper client API and programming using the APIs. You saw how to write a client to connect to the ZooKeeper server instance and execute methods to carry out operations in the ZooKeeper namespace. You also learned how to implement watchers in a client so as to register for specific events and get notifications when such events occur, all in real time.

The simple yet powerful and elegant programming model of ZooKeeper enables us to implement high-level primitives for distributed systems. For example, we used the concept of ephemeral znodes to build an emulation of a cluster monitor in the previous chapter.

In this chapter, you will learn:

  • How to carry out common distributed system tasks such as leader election, group membership, and the two-phase commit protocol
  • How to implement a few distributed data structures such as barriers, locks, and queues

The high-level constructs outlined in this chapter are also known as ZooKeeper recipes. They are implemented on the client side using ZooKeeper's programming model and require no special support from the server side. Without ZooKeeper and its APIs, implementing these recipes from scratch would be quite complex and difficult.

Some of the third-party and community-developed ZooKeeper client bindings also provide these high-level distributed system constructs as a part of their client library. For example, Netflix Curator, a feature-rich Java client framework for ZooKeeper, provides many of the recipes mentioned in this chapter. You will learn more about Curator later in this book. Kazoo, the Python client library that you learned about in the previous chapter, also implements some of these recipes that developers can directly use in their client applications.

The ZooKeeper distribution ships with recipes for leader election, distributed lock, and distributed queue; these can be used inside distributed applications. The Java implementations of these three recipes can be found in the recipes folder of the distribution.

ZooKeeper recipes

In this section, you will learn to develop high-level distributed system constructs and data structures using ZooKeeper. As mentioned earlier, most of these constructs and functions are of utmost importance in building scalable distributed architectures, but they are fairly complicated to implement from scratch. Developers can often get bogged down while implementing these and integrating them with their application logic. You will learn how to develop algorithms to build some of these high-level functions using ZooKeeper's data model and primitives, and see how ZooKeeper makes them simple, scalable, and error-free, with much less code.

Barrier

A barrier is a synchronization primitive used in distributed systems to block the processing of a set of nodes until a condition is satisfied. It defines a point where all nodes must stop their processing; no node can proceed until all the other nodes reach this barrier.

The algorithm to implement a barrier using ZooKeeper is as follows:

  1. To start with, a znode is designated to be a barrier znode, say /zk_barrier.
  2. The barrier is said to be active in the system if this barrier znode exists.
  3. Each client calls the ZooKeeper API's exists() function on /zk_barrier by registering for watch events on the barrier znode (the watch event is set to true).
  4. If the exists() method returns false, the barrier no longer exists, and the client proceeds with its computation.
  5. Else, if the exists() method returns true, the client just waits for watch events on the barrier znode.
  6. Whenever the barrier exit condition is met, the client in charge of the barrier will delete /zk_barrier.
  7. The deletion triggers a watch event, and on getting this notification, the client calls the exists() function on /zk_barrier again.
  8. The exists() call in step 7 now returns false, and the clients can proceed with their computation.

    Note

    The barrier exists until the barrier znode ceases to exist!

In this way, we can implement a barrier using ZooKeeper without much effort.
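For concreteness, here is a minimal sketch of the client side of this algorithm in Python, using the Kazoo library from the previous chapter. It assumes a ZooKeeper server on localhost and that some other client is responsible for creating and later deleting /zk_barrier:

    import threading
    from kazoo.client import KazooClient

    BARRIER_PATH = '/zk_barrier'   # the designated barrier znode

    zk = KazooClient(hosts='127.0.0.1:2181')   # assumed local ZooKeeper server
    zk.start()

    released = threading.Event()

    def on_barrier_event(event):
        # Step 7: on a notification, call exists() again (re-registering the watch)
        if zk.exists(BARRIER_PATH, watch=on_barrier_event) is None:
            released.set()

    # Steps 3-5: check the barrier and register a watch in a single exists() call
    if zk.exists(BARRIER_PATH, watch=on_barrier_event):
        released.wait()            # barrier is active; block until it is deleted

    # Step 8: the barrier znode is gone; proceed with the computation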

The example cited so far is a simple barrier that makes a group of distributed processes wait on some condition and then proceed together when the condition is met. There is another type of barrier that aids in synchronizing the beginning and end of a computation; this is known as a double barrier. The logic of a double barrier states that a computation starts when the required number of processes have joined the barrier. The processes leave the barrier after completing the computation, and when the number of processes participating in the barrier becomes zero, the computation is said to end.

A double barrier is implemented with a barrier znode that serves as the parent of the individual process znodes participating in the computation. The algorithm is outlined as follows:

Phase 1: Joining the barrier can be done as follows:

  1. Suppose the barrier znode is represented by the znode /barrier. Every client process registers with the barrier by creating an ephemeral znode with /barrier as the parent. In real scenarios, clients might register using their hostnames.
  2. The client process sets a watch event for the existence of another znode called ready under the /barrier znode and waits for the node to appear.
  3. A number N is predefined in the system; this governs the minimum number of clients to join the barrier before the computation can start.
  4. While joining the barrier, each client process finds the number of child znodes of /barrier:
    M = getChildren(/barrier, watch=false)
  5. If M is less than N, the client waits for the watch event registered in step 2.
  6. Else, if M is equal to N, then the client process creates the ready znode under /barrier.
  7. The creation of the ready znode in step 6 triggers the watch event registered in step 2, and each client starts the computation it was waiting to do.

Phase 2: Leaving the barrier can be done as follows:

  1. On finishing the computation, a client process deletes the znode it created under /barrier (in step 1 of Phase 1: Joining the barrier).
  2. The client process then finds the number of children under /barrier:
    M = getChildren(/barrier, watch=True)

    If M is not equal to 0, this client waits for notifications (observe that we have set the watch event to True in the preceding call).

    If M is equal to 0, then the client exits the barrier znode.

The preceding procedure suffers from a potential herd effect, where all client processes wake up to check the number of children left in the barrier whenever a notification is triggered. To get around this, we can make the znode created in step 1 of Phase 1: Joining the barrier a sequential ephemeral znode. Every client process then watches for its next-lowest sequential ephemeral znode to go away as its exit criterion. This way, only a single event is generated when any client completes the computation, and hence, not all clients need to wake up together to check their exit conditions. For a large number of client processes participating in a barrier, the herd effect can negatively impact the scalability of the ZooKeeper service, and developers should be aware of such scenarios.
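Rather than hand-coding the two phases, you can also use the double barrier recipe that ships with Kazoo. The following is a minimal usage sketch; the local ZooKeeper server, the choice of N = 4 participants, and the compute() function are all assumptions for illustration:

    from kazoo.client import KazooClient

    zk = KazooClient(hosts='127.0.0.1:2181')   # assumed local ZooKeeper server
    zk.start()

    # /barrier is the parent znode; 4 is N, the number of participants required
    barrier = zk.DoubleBarrier('/barrier', num_clients=4)

    barrier.enter()    # Phase 1: blocks until N participants have joined
    compute()          # hypothetical computation done while inside the barrier
    barrier.leave()    # Phase 2: blocks until all participants have finished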

Note

A Java language implementation of a double barrier can be found in the ZooKeeper documentation at http://zookeeper.apache.org/doc/r3.4.6/zookeeperTutorial.html.

Queue

A distributed queue is a very common data structure used in distributed systems. A special implementation of a queue, called a producer-consumer queue, is one where a collection of processes called producers generate or create new items and put them in the queue, while consumer processes remove the items from the queue and process them. The addition and removal of items in the queue follow a strict FIFO order.

A producer-consumer queue can be implemented using ZooKeeper. A znode will be designated to hold a queue instance, say queue-znode. All queue items are stored as znodes under this znode. Producers add an item to the queue by creating a znode under the queue-znode, and consumers retrieve the items by getting and then deleting a child from the queue-znode.

The FIFO order of the items is maintained using the sequential znode property provided by ZooKeeper. When a producer process creates a znode for a queue item, it sets the sequential flag. This makes ZooKeeper append a monotonically increasing sequence number to the znode name as a suffix. ZooKeeper guarantees that the sequence numbers are applied in order and are not reused. The consumer processes handle the items in the correct order by looking at the sequence number of each znode.

The pseudocode for the algorithm to implement a producer-consumer queue using ZooKeeper is shown here:

  1. Let /_QUEUE_ represent the top-level znode for our queue implementation, which is also called the queue-node.
  2. Clients acting as producer processes put an item into the queue by calling the create() method with the znode name "queue-", setting both the sequence and ephemeral flags in the call:
    create("/_QUEUE_/queue-", SEQUENCE|EPHEMERAL)

    The sequence flag makes the new znode get a name of the form queue-N, where N is a monotonically increasing number.

  3. Clients acting as consumer processes issue a getChildren() method call on the queue-node with the watch event set to true:
    M = getChildren(/_QUEUE_, true)

    The client sorts the children list M, takes the lowest-numbered child znode from the list, processes it by reading its data, and then deletes it.

  4. The client picks up items from the sorted list and continues processing them. On reaching the end of the list, the client should check again whether any new items have been added to the queue by issuing another getChildren() method call.
  5. The algorithm continues until getChildren() returns an empty list, which means that no more znodes or items are left under /_QUEUE_; the consumer then waits for the watch set in step 3 to be notified of new items.

It's quite possible that in step 3, the deletion of a znode by a client will fail because some other client has gained access to the znode while this client was retrieving the item. In such scenarios, the client should retry the delete call.
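For concreteness, here is a minimal Python sketch of the producer and consumer sides using Kazoo, assuming a local ZooKeeper server. For simplicity, items are created as plain sequential znodes; passing ephemeral=True as well would match the pseudocode but would tie each item to its producer's session. The consumer also handles the retry case just described:

    from kazoo.client import KazooClient
    from kazoo.exceptions import NoNodeError

    QUEUE_PATH = '/_QUEUE_'   # the queue-node

    zk = KazooClient(hosts='127.0.0.1:2181')   # assumed local ZooKeeper server
    zk.start()
    zk.ensure_path(QUEUE_PATH)

    def produce(payload: bytes):
        # ZooKeeper appends a monotonically increasing suffix to "queue-"
        zk.create(QUEUE_PATH + '/queue-', payload, sequence=True)

    def consume_one():
        # Take the lowest-numbered item; returns None if the queue is empty
        for name in sorted(zk.get_children(QUEUE_PATH)):
            path = QUEUE_PATH + '/' + name
            try:
                data, _ = zk.get(path)
                zk.delete(path)        # claim the item
                return data
            except NoNodeError:
                continue               # another consumer took it; try the next item
        return None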

Using this algorithm for a generic queue, we can also build a priority queue out of it, where each item has a priority tagged to it. The algorithm and implementation are left as an exercise to the reader.

C and Java implementations of the distributed queue recipe are shipped along with the ZooKeeper distribution under the recipes folder. Developers can use this recipe to implement distributed queues in their applications.

Kazoo, the Python client library for ZooKeeper, has a distributed queue implementation in the kazoo.recipe.queue module. This implementation supports assigning priorities to queue items and also provides a locking queue variant.
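A minimal usage sketch of Kazoo's queue recipe is shown here; the znode path and payloads are made up, and a local ZooKeeper server is assumed. In this recipe, lower priority values are served earlier:

    from kazoo.client import KazooClient

    zk = KazooClient(hosts='127.0.0.1:2181')   # assumed local ZooKeeper server
    zk.start()

    queue = zk.Queue('/app_queue')       # simple queue from kazoo.recipe.queue
    queue.put(b'task-1')                 # default priority is 100
    queue.put(b'urgent-task', priority=10)

    item = queue.get()                   # returns b'urgent-task' first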

Lock

A lock in a distributed system is an important primitive that provides applications with a means to synchronize their access to shared resources. Distributed locks need to be globally synchronous to ensure that no two clients can hold the same lock at any instant of time.

Typical scenarios where locks are indispensable are those where the system as a whole needs to ensure that only one node of the cluster is allowed to carry out an operation at a given time, such as:

  • Write to a shared database or file
  • Act as a decision subsystem
  • Process all I/O requests from other nodes

ZooKeeper can be used to implement mutually exclusive locks for processes that run on different servers across different networks and even geographically apart.

To build a distributed lock with ZooKeeper, a persistent znode is designated to be the main lock-znode. Client processes that want to acquire the lock create an ephemeral znode with the sequential flag set under the lock-znode. The crux of the algorithm is that the lock is owned by the client process whose child znode has the lowest sequence number. ZooKeeper guarantees the order of the sequence numbers, as sequential znodes are numbered in a monotonically increasing order. Suppose there are three znodes under the lock-znode: l1, l2, and l3. The client process that created l1 is the owner of the lock. When it wants to release the lock, it simply deletes l1; the client that created l2 then becomes the lock owner, and so on.

The pseudocode for the algorithm to implement a distributed lock service with ZooKeeper is shown here:

Let the parent lock node be represented by a persistent znode, /_locknode_, in the ZooKeeper tree.

Phase 1: Acquire a lock with the following steps:

  1. Call the create("/_locknode_/lock-",CreateMode=EPHEMERAL_SEQUENTIAL) method.
  2. Call the getChildren("/_locknode_", false) method on the lock node. Here, the watch flag is set to false, as otherwise it can lead to a herd effect.
  3. If the znode created by the client in step 1 has the lowest sequence number suffix, then the client is owner of the lock, and it exits the algorithm.
  4. Call the exists("/_locknode_/<znode with the next lowest sequence number>", True) method.
  5. If the exists() method returns false, go to step 2.
  6. If the exists() method returns true, wait for notifications for the watch event set in step 4.

Phase 2: Release a lock as follows:

  1. The client holding the lock deletes the node, thereby triggering the next client in line to acquire the lock.
  2. The client that created the znode with the next higher sequence number is notified and acquires the lock. The watch for this event was set in step 4 of Phase 1: Acquire a lock. A short code sketch of both phases is shown after this list.
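Here is that sketch in Python using Kazoo, assuming a local ZooKeeper server; the helper function names are illustrative. Kazoo also ships a ready-made Lock recipe that implements essentially the same algorithm:

    import threading
    from kazoo.client import KazooClient

    LOCK_ROOT = '/_locknode_'

    zk = KazooClient(hosts='127.0.0.1:2181')   # assumed local ZooKeeper server
    zk.start()
    zk.ensure_path(LOCK_ROOT)

    def acquire_lock():
        # Phase 1, step 1: ephemeral sequential child under the lock znode
        me = zk.create(LOCK_ROOT + '/lock-', ephemeral=True, sequence=True)
        my_name = me.rsplit('/', 1)[1]
        while True:
            children = sorted(zk.get_children(LOCK_ROOT))   # step 2: no watch here
            if my_name == children[0]:
                return me                                   # step 3: lowest suffix owns the lock
            # Step 4: watch only the znode with the next lowest sequence number
            predecessor = children[children.index(my_name) - 1]
            gone = threading.Event()
            if zk.exists(LOCK_ROOT + '/' + predecessor, watch=lambda e: gone.set()):
                gone.wait()                                 # step 6: wait for its deletion
            # Step 5: predecessor already gone (or just deleted); loop and re-check

    def release_lock(lock_path):
        zk.delete(lock_path)        # Phase 2: deleting the znode wakes the next waiter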

While this is not recommended in a system with a large number of clients due to the herd effect it causes, if the other clients also need to know about changes in lock ownership, they can set a watch on the /_locknode_ lock node for events of the NodeChildrenChanged type and determine the current owner.

If there is a partial failure in the creation of the znode due to connection loss, the client might not be able to determine whether it successfully created its child znode. To resolve this, the client can store its session ID in the znode's data field or even as a part of the znode name itself. Because a client retains the same session ID after a reconnect, it can easily determine whether the child znode was created by it by looking at the session ID.

Using an ephemeral znode prevents a potential deadlock that might arise when a client dies while holding a lock. Since an ephemeral znode is deleted when its session times out or expires, ZooKeeper deletes the znode created by the dead client, and the algorithm continues as usual. However, if the client hangs for some reason while its ZooKeeper session is still active, we might still end up in a deadlock. This can be solved by having a monitor client that triggers an alarm when the lock-holding time of a client crosses a predefined timeout.

The ZooKeeper distribution is shipped with the C and Java language implementation of a distributed lock in the recipes folder. The recipe implements the algorithm you have learned so far and takes into account the problems associated with partial failure and herd effect.

The previous recipe of a mutually exclusive lock can be modified to implement a shared lock as well. Readers can find the algorithm and pseudocode for a shared lock using ZooKeeper in the documentation at http://zookeeper.apache.org/doc/r3.4.6/recipes.html#Shared+Locks.

Leader election

In distributed systems, leader election is the process of designating a single server as the organizer, coordinator, or initiator of some task distributed among several individual servers (nodes). After a leader election algorithm is run, a leader or coordinator is selected from the set of nodes, and the algorithm must ensure that all the nodes in the system acknowledge this leader without any discrepancies, for the correct functioning of the system.

A leader in a distributed system acts as a centralized controller of tasks and simplifies process synchronization. However, a centralized node is a single point of failure, and its failure can lead to an anomalous system state. Hence, a correct and robust leader election algorithm is required to choose a new coordinator or leader when the existing one fails.

A leader election algorithm has the following two required properties:

  • Liveness: This ensures that most of the time, there is a leader
  • Safety: This ensures that at any given time, there is either no leader or one leader

ZooKeeper can be used to implement a leader election algorithm, which distributed applications can then use as a leader elector service. The algorithm is similar to the one we used to develop a globally mutually exclusive distributed lock.

Client processes nominating themselves as leaders use the SEQUENCE|EPHEMERAL flags when creating znodes under a parent znode. ZooKeeper automatically appends a monotonically increasing sequence number as a suffix to the child znode as the sequence flag is set. The process that created the znode with the smallest appended sequence number is elected as the leader. However, the algorithm should also take into account the failure of the leader.

The pseudocode for the algorithm is outlined here.

Let /_election_ be the election znode path that acts as the root for all clients participating in the leader election algorithm.

Clients with proposals for their nomination in the leader election procedure perform the following steps:

  1. Create a znode with the /_election_/candidate-sessionID_ path, with both the SEQUENCE and EPHEMERAL flags. The sessionID identifier, as a part of the znode name, helps in recognizing znodes in the case of partial failures due to connection loss. Now, say that ZooKeeper assigns a sequence number N to the znode when the create() call succeeds.
  2. Retrieve the current list of children in the election znode as follows:
    L = getChildren("/_election_", false)

    Here, L represents the list of children of "/_election_".

    The watch is set to false to prevent any herd effect.

  3. Set a watch for changes in /_election_/candidate-sessionID_M, where M is the largest sequence number such that M is less than N, and candidate-sessionID_M is a znode in L as follows:
    exists("/_election_/candidate-sessionID_M", true)
  4. Upon receiving a notification of znode deletion for the watch set in step 3, execute the getChildren("/_election_", false) method on the election znode.
  5. Let L be the new list of children of _election_. The leader is then elected as follows:
    1. If candidate-sessionID_N (this client's znode) has the smallest sequence number in L, then the client declares itself the leader.
    2. Watch for changes on /_election_/candidate-sessionID_M, where M is the largest sequence number such that M is less than N and candidate-sessionID_M is a znode in L.
  6. If the current leader crashes, the client having the znode with the next highest sequence number becomes the leader and so on.

Optionally, a persistent znode is also maintained where the client declaring itself as the leader can store its identifier, so that other clients can query who the current leader is by reading this znode at any given time. Such a znode also ensures that the newly elected leader has acknowledged and executed the leader election procedure correctly.
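Kazoo wraps this same pattern in its Election recipe. The following usage sketch is illustrative only; the znode path, identifier, and lead() function are assumptions, and a local ZooKeeper server is assumed:

    from kazoo.client import KazooClient

    zk = KazooClient(hosts='127.0.0.1:2181')   # assumed local ZooKeeper server
    zk.start()

    def lead():
        # Runs only while this client is the elected leader; returning gives up leadership
        print('I am the leader now')

    # The identifier lets other clients see who the contenders are
    election = zk.Election('/_election_', identifier='host-1')
    election.run(lead)    # blocks, campaigns, and calls lead() when elected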

A Java language implementation for the leader election algorithm using ZooKeeper can be found in the recipes folder of the latest ZooKeeper distribution.

Group membership

A group membership protocol in a distributed system enables processes to reach a consensus on the group of processes that are currently alive and operational in the system. Membership protocols belong to the core components of a distributed system; they aid in maintaining service availability and consistency for the applications. A membership protocol lets processes know when a process joins or leaves the system, thereby keeping the whole cluster aware of the current system state.

The implementation of a basic group membership protocol is very simple, and we have already used a variant of it while developing the cluster monitor emulation in Chapter 3, Programming with Apache ZooKeeper. A group membership protocol can be developed using the concept of ephemeral znodes. Any client that joins the cluster creates an ephemeral znode under a predefined membership path in the ZooKeeper tree and sets a watch on the parent path. When another node joins or leaves the cluster, this client gets a notification and becomes aware of the change in group membership.

The pseudocode for the algorithm to implement this group membership protocol is shown here.

Let a persistent znode, /membership, represent the root of the group in the ZooKeeper tree. A group membership protocol can then be implemented as follows:

  1. Clients joining the group create ephemeral nodes under the group root to indicate membership.
  2. All the members of the group will register for watch events on /membership, thereby being aware of other members in the group. This is done as shown in the following code:
    L = getChildren("/membership", true)
  3. When a new client arrives and joins the group, all other members are notified.
  4. Similarly, when a client leaves due to failure or otherwise, ZooKeeper automatically deletes the ephemeral znode created in step 1. This triggers an event, and other group members get notified.
  5. Live members know which node joined or left by looking at the list of children L.

The preceding algorithm suffers from the herd effect, as NodeChildrenChanged events emitted when members join or leave cause all the other members to wake up and re-read the current membership of the system.
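A minimal Kazoo sketch of the preceding protocol looks like this, assuming a local ZooKeeper server and using the hostname as the member identifier:

    import socket
    from kazoo.client import KazooClient

    GROUP_PATH = '/membership'

    zk = KazooClient(hosts='127.0.0.1:2181')   # assumed local ZooKeeper server
    zk.start()
    zk.ensure_path(GROUP_PATH)

    # Step 1: join the group with an ephemeral znode named after this host
    zk.create(GROUP_PATH + '/' + socket.gethostname(), ephemeral=True)

    # Step 2: watch the group root; the callback re-fires on every join or leave
    @zk.ChildrenWatch(GROUP_PATH)
    def on_membership_change(members):
        print('current members:', members)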

Two-phase commit

The two-phase commit (2PC) protocol is a distributed algorithm that coordinates all the processes that participate in a distributed atomic transaction on whether to commit or abort (roll back) the transaction. 2PC is a specialized type of consensus protocol and is widely used in transaction processing systems. It ensures an atomic behavior that guarantees that either all the transactions or none of them are completed so that the resources under transactional control remain synchronized.

The 2PC protocol consists of two phases, which are as follows:

  • In the first phase, the coordinator node asks all the transaction's participating processes to prepare and vote to either commit or abort the transaction.
  • In the second phase, the coordinator decides whether to commit or abort the transaction, depending on the result of the voting in the first phase. If all participants voted for commit, it commits the transaction; otherwise, it aborts it. It finally notifies the result to all the participants.

The 2PC protocol is depicted in the following diagram:

Two-phase commit

We can develop a 2PC protocol implementation using ZooKeeper.

Let /2PC_Transactions represent the root node to run the 2PC algorithm in ZooKeeper. The algorithm to do so is as follows.

  1. A coordinator node creates a transaction znode, say /2PC_Transactions/TX. We can use the leader election algorithm to elect the coordinator using ZooKeeper. The coordinator node sets a watch on the transaction node.
  2. Another persistent znode, tx_result, is created under /2PC_Transactions/TX by the coordinator to post the result of the protocol (commit or abort) and any additional outcomes of the transaction.
  3. Each participating client node sets a watch on the /2PC_Transactions as well as /2PC_Transactions/TX/tx_result znode paths.
  4. When the coordinator node creates the transaction znode, it notifies the participating client nodes that it is requesting a vote on the transaction.
  5. The participants then create an ephemeral child znode in the /2PC_Transactions/TX path, with their own identifier (say hostnames) and vote for commit or abort by writing to the data field of their specific znodes.
  6. The coordinator is notified of the creation of all the child znodes, and when the number of child znodes in /2PC_Transactions/TX equals the number of participants, it checks the votes of all the participants by reading the participants' znodes.
  7. If all the participants voted for commit, the coordinator commits the transaction; otherwise, it aborts it. Subsequently, it posts the result of the transaction by writing to the /2PC_Transactions/TX/tx_result znode.
  8. The participants get to know the outcome of the transaction when they receive a NodeDataChanged notification for /2PC_Transactions/TX/tx_result.

The preceding algorithm might be a little slow, as all messaging happens through the coordinator node, but by using ephemeral znodes, it handles the failure of participant nodes during the execution of the protocol.
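The sketch below outlines both roles in Python with Kazoo, following the preceding steps. It assumes a local ZooKeeper server, a number of participants known in advance, and votes written as the literal byte strings commit or abort; all names are illustrative:

    import threading
    from kazoo.client import KazooClient

    TX_PATH = '/2PC_Transactions/TX'   # transaction znode created by the coordinator
    NUM_PARTICIPANTS = 3               # assumed to be known in advance

    zk = KazooClient(hosts='127.0.0.1:2181')   # assumed local ZooKeeper server
    zk.start()

    def coordinate():
        zk.ensure_path(TX_PATH)
        zk.create(TX_PATH + '/tx_result', b'')            # step 2: result znode
        all_voted = threading.Event()

        @zk.ChildrenWatch(TX_PATH)
        def on_votes(children):
            votes = [c for c in children if c != 'tx_result']
            if len(votes) >= NUM_PARTICIPANTS:            # step 6: everyone has voted
                all_voted.set()
                return False                              # stop watching

        all_voted.wait()
        votes = [zk.get(TX_PATH + '/' + c)[0]
                 for c in zk.get_children(TX_PATH) if c != 'tx_result']
        outcome = b'commit' if all(v == b'commit' for v in votes) else b'abort'
        zk.set(TX_PATH + '/tx_result', outcome)           # step 7: publish the decision

    def participate(my_id, vote):
        # Step 5: cast the vote as an ephemeral child holding b'commit' or b'abort'
        zk.create(TX_PATH + '/' + my_id, vote, ephemeral=True)
        decided, result = threading.Event(), {}

        @zk.DataWatch(TX_PATH + '/tx_result')
        def on_result(data, stat):
            if data:                                      # step 8: outcome has been posted
                result['outcome'] = data
                decided.set()
                return False

        decided.wait()
        return result['outcome']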

Service discovery

Service discovery is one of the key components of distributed systems and service-oriented architectures where services need to find each other. In its simplest form, service discovery helps clients determine the IP and port for a service that exists on multiple hosts. One common example is how a web service finds the right host that serves a caching service in the network. At first glance, it appears that we can use a Domain Name System (DNS) service as a service discovery system. However, a solution with DNS is not viable when the service locations change frequently due to auto or manual scaling, new deployments of services, or when services are failed over or replaced with newer hosts due to host failures.

Important properties of a service discovery system are mentioned here:

  • It allows services to register their availability
  • It provides a mechanism to locate a live instance of a particular service
  • It propagates a service change notification when the instances of a service change

Let /services represent the base path in the ZooKeeper tree for services of the system or platform. Persistent znodes under /services designate services available to be consumed by clients.

A simple service discovery model with ZooKeeper is illustrated as follows:

  • Service registration: For service registrations, hosts that serve a particular service create an ephemeral znode in the relevant path under /services. For example, if a server is hosting a web-caching service, it creates an ephemeral znode with its hostname in /services/web_cache. Again, if some other server hosts a file-serving service, it creates another ephemeral znode with its hostname in /services/file_server and so on.
  • Service discovery: Clients joining the system register for watches on the znode path for the particular service. If a client wants to know the servers in the infrastructure that serve a web-caching service, it keeps a watch on /services/web_cache.

    If a new host is added to serve web caching under this path, the client will automatically know the details about the new location. Again, if an existing host goes down, the client gets the event notification and can take the necessary action of connecting to another host.

A service discovery system provides a seamless mechanism to guarantee service continuity in the case of failures and is an indispensable part of building a robust and scalable distributed platform. Apache Curator provides an extension called curator-x-discovery in its ZooKeeper library; this implements a service registration and discovery model. It also provides a service discovery server called curator-x-discovery-server that exposes a RESTful web service to register, remove, and query services for non-Java or legacy applications to use the service discovery functionalities.
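Even without Curator, the basic registration and discovery model described above can be sketched in a few lines of Kazoo; the service path, port, and payload format below are made-up examples, and a local ZooKeeper server is assumed:

    import json
    import socket
    from kazoo.client import KazooClient

    SERVICE_PATH = '/services/web_cache'   # example service from the text

    zk = KazooClient(hosts='127.0.0.1:2181')   # assumed local ZooKeeper server
    zk.start()
    zk.ensure_path(SERVICE_PATH)

    # Service registration: one ephemeral znode per serving host, holding its endpoint
    endpoint = json.dumps({'host': socket.gethostname(), 'port': 11211}).encode()
    zk.create(SERVICE_PATH + '/' + socket.gethostname(), endpoint, ephemeral=True)

    # Service discovery: clients watch the children of the service path
    @zk.ChildrenWatch(SERVICE_PATH)
    def on_instances_change(instances):
        # Fires on every registration or failure; instances is the current host list
        print('web_cache instances:', instances)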
