At a high level, Curator is a set of libraries that build on top of ZooKeeper. One of the core goals of Curator is to manage the ZooKeeper handle for you, removing some (ideally all) of the complexity that connection management entails. Connection management is often tricky, as we have discussed in previous chapters, and Curator can relieve the application of much of that work.
As part of managing the handle, Curator implements a set of recipes that developers commonly use, incorporating best practices and known edge-case handling. For example, Curator implements recipes for primitives such as locks, barriers, and caches. For ZooKeeper operations like create, delete, getData, etc., it streamlines programming by allowing us to chain calls, a programming style often called fluent. It also provides namespaces, automatic reconnection, and other facilities that make applications more robust.
The Curator components were originally implemented and contributed by Netflix, and the project has recently been promoted to a top-level project of the Apache Software Foundation.
In this chapter, we cover the implementation of the master in our example using Curator features. Our goal is not to provide a detailed and extensive discussion of Curator, but simply to introduce it and highlight some of the features that are convenient to use with a ZooKeeper application. Check the project page for an extensive list of its features.
Just as with ZooKeeper, before doing anything with Curator, we need to create a client. The client is typically an instance of CuratorFramework that we obtain by calling the Curator framework factory:
    CuratorFramework zkc = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
The connectString input parameter is the list of ZooKeeper servers we can connect to, just like when creating a ZooKeeper client. The retryPolicy parameter is a new feature of Curator. It enables the developer to specify a policy for retrying operations in the event of disconnections. Recall that with the regular ZooKeeper interface, we typically resubmit operations upon a connection loss event.
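To make the idea of a retry policy concrete, here is a minimal sketch in plain Java of a bounded exponential-backoff loop that resubmits an operation after a connection loss. It is not Curator's implementation: the class BackoffRetrySketch, its methods, and the ConnectionLoss exception are hypothetical names introduced only for illustration.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of a bounded exponential-backoff retry loop; not Curator's implementation. */
final class BackoffRetrySketch {
    /** Stand-in for a connection-loss error reported by a client library (assumed name). */
    static final class ConnectionLoss extends RuntimeException { }

    private final long baseSleepMs;
    private final int maxRetries;

    BackoffRetrySketch(long baseSleepMs, int maxRetries) {
        this.baseSleepMs = baseSleepMs;
        this.maxRetries = maxRetries;
    }

    /** Returns true if another attempt is allowed after retryCount retries. */
    boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    /** The sleep time doubles with each retry: base, 2 * base, 4 * base, and so on. */
    long sleepMsFor(int retryCount) {
        return baseSleepMs << retryCount;
    }

    /** Runs op, resubmitting it after a connection loss until it succeeds or retries run out. */
    <T> T call(Supplier<T> op) {
        int retryCount = 0;
        while (true) {
            try {
                return op.get();
            } catch (ConnectionLoss e) {
                if (!allowRetry(retryCount)) {
                    throw e; // retries exhausted: report the failure to the caller
                }
                sleep(sleepMsFor(retryCount));
                retryCount++;
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", ie);
        }
    }
}
```

With a policy object like this, the application states once how persistent it wants to be, instead of repeating the resubmission logic in every callback.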
Our example instantiates the CuratorFramework client. There are other methods in the factory class to create an instance, but we don’t cover them here. There is also the CuratorZooKeeperClient class, which provides some additional functionality on top of the ZooKeeper client, such as enabling operations that are safe in the face of unanticipated disconnections. Unlike the CuratorFramework class, operations on a CuratorZooKeeperClient are executed directly against the ZooKeeper client handle.
A fluent API enables us to write code by chaining calls instead of relying upon a rigid signature scheme for invoking an operation. For example, with the standard ZooKeeper API, we create a znode synchronously by invoking something like:
    zk.create("/mypath", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
With the fluent API of Curator, we make the same call this way:
    zkc.create().withMode(CreateMode.PERSISTENT).forPath("/mypath", new byte[0]);
The create call returns a CreateBuilder instance and the subsequent calls return an object of a type that CreateBuilder extends. For example, CreateBuilder extends CreateModable<ACLBackgroundPathAndBytesable<String>>, and withMode is declared in the generic CreateModable<T> interface. Builders are available for the other operations as well (delete, setData, getData, checkExists, and getChildren) through the Curator framework client object.
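The way a builder's generic interfaces determine which calls may follow each other can be seen in a miniature fluent API. The sketch below is plain Java and only mimics the shape described above: FluentSketch, Modable, Pathable, and its CreateBuilder are hypothetical stand-ins, not Curator's real types.

```java
/** Hypothetical miniature of a fluent create call; the names mimic the text, not Curator's real types. */
final class FluentSketch {
    enum Mode { PERSISTENT, EPHEMERAL }

    /** Final step of the chain: supplying the path executes the operation. */
    interface Pathable<T> {
        T forPath(String path);
    }

    /** Optional step: choosing a mode returns the type that exposes the remaining steps. */
    interface Modable<T> {
        T withMode(Mode mode);
    }

    /** The builder offers both the optional step and the final step. */
    static final class CreateBuilder implements Modable<Pathable<String>>, Pathable<String> {
        private Mode mode = Mode.PERSISTENT;

        @Override
        public Pathable<String> withMode(Mode mode) {
            this.mode = mode;
            return this; // the static return type hides withMode, so it cannot be chained twice
        }

        @Override
        public String forPath(String path) {
            // A real client would contact the server here; the sketch only reports the request.
            return "create " + path + " as " + mode;
        }
    }

    static CreateBuilder create() {
        return new CreateBuilder();
    }
}
```

A call such as FluentSketch.create().withMode(FluentSketch.Mode.EPHEMERAL).forPath("/mypath") compiles, while calling withMode a second time does not, because the first withMode returns only a Pathable. The compiler therefore enforces the order of the chain.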
To execute the same operation asynchronously, we add inBackground as follows:
    zkc.create().withMode(CreateMode.PERSISTENT).inBackground().forPath("/mypath", new byte[0]);
This returns immediately, and we have to create one or more listeners to receive the callback that is returned when the znode is created. We discuss listeners and how to register them in the next section.
There are a few different ways to implement the callback for an asynchronous call. If we issue the previous string of calls, the callback is delivered in the form of a CREATE event to registered listeners. The inBackground call optionally takes a context object, a concrete callback implementation to invoke, and even an executor (java.util.concurrent.Executor) to execute the callback. In Java, an executor is an object that executes runnable objects; we can use it here to decouple the execution of the callback from the callback thread of the ZooKeeper client. Using an executor is usually better than creating one new thread for each task.
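The benefit of handing callbacks to an executor can be illustrated without any ZooKeeper code. In the following sketch, deliver plays the role of the client's event thread passing a callback to an executor. The class CallbackExecutorSketch, its methods, and the thread name callback-worker are assumptions made for this example only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: running a callback on an executor instead of the event thread. */
final class CallbackExecutorSketch {
    /** Hands the callback to the executor so that the calling thread returns immediately. */
    static void deliver(Executor executor, Consumer<String> callback, String path) {
        executor.execute(() -> callback.accept(path));
    }

    /** Delivers one callback through a single-thread pool; returns the name of the thread that ran it. */
    static String demo() {
        ExecutorService pool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        String[] ranOn = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        try {
            deliver(pool, path -> {
                ranOn[0] = Thread.currentThread().getName();
                done.countDown();
            }, "/mypath");
            // Wait for the callback only so that the demo can report its result.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return ranOn[0];
    }
}
```

Because the pool reuses its worker thread, a slow callback delays only other callbacks queued on the same executor, not the thread that delivers events.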
To set a watch, we simply add watched to the call chain. For example:
    zkc.getData().watched().inBackground().forPath("/mypath");
The notification triggered by the watcher is processed through listeners as well, and it is passed as a WATCHED event to a given listener. It is also possible to replace watched with a call to usingWatcher, which takes a regular ZooKeeper Watcher object and calls it when it receives the notification. A third option is to pass a CuratorWatcher object. The process method of CuratorWatcher, unlike that of a ZooKeeper Watcher, might throw an exception.
Listeners process events that the Curator library generates. To exercise this mechanism, the application implements one or more listeners and registers them with the Curator framework client. Events are delivered to all registered listeners.
The listener mechanism is generic and can be used for all manner of events that happen asynchronously. As we discussed in the previous section, Curator uses listeners to process callbacks and watch notifications. The mechanism also can be used to handle the exceptions generated by background tasks.
Let’s have a look at how to implement a listener that processes all callbacks and watch notifications for our master Curator example. The first step is to implement the template for a CuratorListener:
    CuratorListener masterListener = new CuratorListener() {
        public void eventReceived(CuratorFramework client, CuratorEvent event) {
            try {
                switch (event.getType()) {
                case CHILDREN:
                    ...
                    break;
                case CREATE:
                    ...
                    break;
                case DELETE:
                    ...
                    break;
                case WATCHED:
                    ...
                    break;
                }
            } catch (Exception e) {
                LOG.error("Exception while processing event.", e);
                try {
                    close();
                } catch (IOException ioe) {
                    LOG.error("IOException while closing.", ioe);
                }
            }
        }
    };
Because the goal here is just to illustrate the structure that we need to implement, we have omitted the code detail for each of the cases. Check the code examples that come with this book for more detail.
We next need to register the listener. For this we need a framework client, which we can create just like the first client we created:
    client = CuratorFrameworkFactory.newClient(hostPort, retryPolicy);
Once we have the framework client, we register the listener as follows:
    client.getCuratorListenable().addListener(masterListener);
A special kind of listener deals with errors reported when a background thread catches an exception. This is a low-level detail, but it might be necessary if you want to handle them in your application. When the application needs to deal with such errors, it must implement a different kind of listener:
    UnhandledErrorListener errorsListener = new UnhandledErrorListener() {
        public void unhandledError(String message, Throwable e) {
            LOG.error("Unrecoverable error: " + message, e);
            try {
                close();
            } catch (IOException ioe) {
                LOG.warn("Exception when closing.", ioe);
            }
        }
    };
and register it with the framework client as follows:
    client.getUnhandledErrorListenable().addListener(errorsListener);
Note that implementing listeners as event handlers, as we discussed in this section, is somewhat different from the way we proposed to implement ZooKeeper applications in previous chapters. For the master–worker example implemented directly on top of ZooKeeper (see “A Common Pattern”), we chain calls and callbacks, and each callback is handled by a different callback implementation. The callback implementations even have different types. With Curator, the details of a callback or a watch notification are encapsulated into an Event object, which makes it amenable to an implementation using a single event handler.
Curator exposes a different set of states than ZooKeeper. It has a SUSPENDED state, for example, and it uses LOST to represent session expiration. The state machine for the connection states is illustrated in Figure 8-1. When dealing with state changes, our recommendation is in general to halt all operations of the master because we do not know if the ZooKeeper client will be able to reconnect before the session expires, and even if it does, the client might not be the primary master any more. It is safer to act conservatively in the case of a disconnection.
There is an additional READ_ONLY state, which is not relevant for our example case. A connection goes into read-only mode if the ZooKeeper ensemble has read-only mode enabled and the server the client is connected to goes into read-only mode. A server transitions to read-only mode when it cannot form a quorum with other servers because it is partitioned away. While the connection is in read-only mode, the client will miss any update that goes through. Such updates are possible if there is a subset of the ensemble that is able to form a quorum and that receives requests from other clients to update the ZooKeeper state. A partition can last for an arbitrarily long period of time (it is out of the control of ZooKeeper) and consequently the number of updates the client might miss is unbounded. Missing updates could lead to incorrect behavior of the application, so we strongly recommend thinking carefully about the consequences before enabling it. Note that the ability to go into read-only mode is not exclusive to Curator; ZooKeeper enables such an option (see Chapter 10).
There are a couple of interesting error scenarios that Curator handles nicely. The first one has to do with the presence of errors during the creation of sequential znodes, and the second one with errors when deleting a znode:
If the server the client is connected to crashes before returning the znode name (with the sequence number) or the client simply disconnects, then the client doesn’t get a response even if the operation has been executed. As a consequence, the client doesn’t know the path to the znode it created. Recall that we use sequential znodes, for example, in recipes that establish an order for participating clients. To address this problem, CreateBuilder provides a withProtection call that tells the Curator client to prefix the sequential znode with a unique identifier. If the create fails, the client retries the operation, and as part of retrying it verifies whether there is already a znode with the unique identifier.
A similar situation occurs with delete operations. If the client disconnects from the server while executing a delete operation, it doesn’t know whether the delete operation has succeeded or not. If the presence of the znode being deleted indicates, for example, that a resource is locked, it is important to delete the znode to make sure that the resource is free to be used again. The Curator client provides a call that enables an application to make the execution of a delete operation guaranteed. The operation is guaranteed in the sense that the Curator client reexecutes the operation until it succeeds, and for as long as the Curator client instance is valid. To use this feature, the DeleteBuilder interface defines a guaranteed call.
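The idea behind protected creation of sequential znodes, the first of the two scenarios, can be sketched with an in-memory stand-in for the server. Everything in the example is hypothetical: ProtectedCreateSketch, serverCreate, and the name format are assumptions chosen for illustration and do not reproduce Curator's actual naming scheme or code.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

/** Hypothetical sketch of a protected sequential create against an in-memory "server". */
final class ProtectedCreateSketch {
    /** Stand-in for the children of the parent znode on the server. */
    final Set<String> children = new TreeSet<>();
    private int nextSequence = 0;

    /** Simulates the server creating a sequential child; responseLost models a dropped reply. */
    String serverCreate(String prefix, boolean responseLost) {
        String name = String.format("%s-%010d", prefix, nextSequence++);
        children.add(name);
        return responseLost ? null : name;
    }

    /** Looks for a child whose name starts with our unique identifier. */
    String findByProtectionId(String id) {
        for (String child : children) {
            if (child.startsWith(id)) {
                return child;
            }
        }
        return null;
    }

    /** Creates a sequential child and recovers its name even if the first reply is lost. */
    String protectedCreate(String baseName, boolean firstResponseLost) {
        String id = UUID.randomUUID().toString();
        String prefix = id + "-" + baseName;
        String name = serverCreate(prefix, firstResponseLost);
        if (name == null) {
            // The reply was lost: before creating again, check whether the znode already exists.
            name = findByProtectionId(id);
            if (name == null) {
                name = serverCreate(prefix, false);
            }
        }
        return name;
    }
}
```

Without the unique identifier, a retry after a lost reply would create a second sequential znode and the first one would be orphaned. With it, the retry finds the znode created by the first attempt.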
Curator provides a variety of recipes, and we encourage you to have a look at the extensive list of available recipes implemented. Here we discuss three recipes that we have used in the implementation of the Curator master: LeaderLatch, LeaderSelector, and PathChildrenCache.
We can use the leader latch primitive to elect a master in our application. First, we need a LeaderLatch instance:
    leaderLatch = new LeaderLatch(client, "/master", myId);
The constructor of LeaderLatch takes a Curator framework client, a ZooKeeper path for this leadership group, and an identifier for this master. To enable callbacks when this Curator client acquires or loses leadership, we need to register an implementation of the LeaderLatchListener interface. This interface has two methods: isLeader and notLeader. This is what our isLeader implementation looks like:
    @Override
    public void isLeader() {
        ...
        /*
         * Start workersCache
         */
        workersCache.getListenable().addListener(workersCacheListener);
        workersCache.start();

        (new RecoveredAssignments(
                client.getZooKeeperClient().getZooKeeper())).recover(
                new RecoveryCallback() {
            public void recoveryComplete(int rc, List<String> tasks) {
                try {
                    if (rc == RecoveryCallback.FAILED) {
                        LOG.warn("Recovery of assigned tasks failed.");
                    } else {
                        LOG.info("Assigning recovered tasks");
                        recoveryLatch = new CountDownLatch(tasks.size());
                        assignTasks(tasks);
                    }

                    new Thread(new Runnable() {
                        public void run() {
                            try {
                                /*
                                 * Wait until recovery is complete
                                 */
                                recoveryLatch.await();

                                /*
                                 * Start tasks cache
                                 */
                                tasksCache.getListenable().addListener(tasksCacheListener);
                                tasksCache.start();
                            } catch (Exception e) {
                                LOG.warn("Exception while assigning and getting tasks.", e);
                            }
                        }
                    }).start();
                } catch (Exception e) {
                    LOG.error("Exception while executing the recovery callback", e);
                }
            }
        });
    }
We start the workers cache before anything else to make sure that we have workers to assign tasks to.
Once we determine that we have tasks to assign that have not been assigned by the previous master, we proceed with assigning them.
We implement a barrier so that we wait until the assignment of recovered tasks ends before we move into assigning new tasks. If we don’t do it, then the new master ends up assigning all recovered tasks again. Also, we do it in a separate thread just so that we don’t block the ZooKeeper client callback thread.
Once the master finishes with assigning recovered tasks, we start assigning new tasks.
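The barrier described in these notes can be isolated in a small, self-contained sketch. It uses only java.util.concurrent. The class RecoveryBarrierSketch and its method names are hypothetical, and the log entries stand in for the real assignment and cache-start steps.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch of waiting for recovered tasks before handling new ones. */
final class RecoveryBarrierSketch {
    final List<String> log = new CopyOnWriteArrayList<>();

    /** Starts a separate thread that waits for the latch, so that the caller is never blocked. */
    Thread startAfterRecovery(CountDownLatch recoveryLatch) {
        Thread waiter = new Thread(() -> {
            try {
                recoveryLatch.await();          // barrier: all recovered tasks assigned
                log.add("start tasks cache");   // only now accept new tasks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        return waiter;
    }

    /** Called once per recovered task when its assignment completes. */
    void recoveredTaskAssigned(String task, CountDownLatch recoveryLatch) {
        log.add("assigned " + task);
        recoveryLatch.countDown();
    }

    /** Runs the whole sequence for the given recovered tasks and returns the ordered log. */
    List<String> run(List<String> recoveredTasks) {
        CountDownLatch latch = new CountDownLatch(recoveredTasks.size());
        Thread waiter = startAfterRecovery(latch);
        for (String task : recoveredTasks) {
            recoveredTaskAssigned(task, latch);
        }
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }
}
```

The latch is sized to the number of recovered tasks, so the waiting thread proceeds only after the last countDown. With no recovered tasks the latch starts at zero and the thread proceeds immediately.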
We implement this method as part of the CuratorMasterLatch class, and CuratorMasterLatch implements LeaderLatchListener. We need to register the listener, however, before we actually start. We do both in the runForMaster method, on top of adding two other listeners for watch events and errors, respectively:
    public void runForMaster() {
        client.getCuratorListenable().addListener(masterListener);
        client.getUnhandledErrorListenable().addListener(errorsListener);

        leaderLatch.addListener(this);
        leaderLatch.start();
    }
For the notLeader call, which is executed once the master loses leadership, we simply close everything, which is sufficient for the purposes of this example. For a real application, you may need to clean up some local state and wait to become master again. If the LeaderLatch object is not closed, the Curator client will be considered for leadership again.
An alternative recipe for electing a master is LeaderSelector. The main difference between LeaderLatch and LeaderSelector is the listener interface they use. LeaderSelector uses LeaderSelectorListener instead, which defines a takeLeadership method and inherits stateChanged. To use the leader selector to elect a master in our application, we first need a LeaderSelector instance:
    leaderSelector = new LeaderSelector(client, "/master", this);
The constructor of LeaderSelector takes a Curator framework client, a ZooKeeper path for the leadership group this master is participating in, and an implementation of LeaderSelectorListener. The leadership group is the group of Curator clients participating in the master election. The LeaderSelectorListener implementation must contain both a takeLeadership method and a stateChanged one. The takeLeadership method is executed upon acquiring leadership, and most of its code for our example is the same as the code for isLeader. In our case, we implement it as follows:
    CountDownLatch leaderLatch = new CountDownLatch(1);
    CountDownLatch closeLatch = new CountDownLatch(1);

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        ...
        /*
         * Start workersCache
         */
        workersCache.getListenable().addListener(workersCacheListener);
        workersCache.start();

        (new RecoveredAssignments(
                client.getZooKeeperClient().getZooKeeper())).recover(
                new RecoveryCallback() {
            public void recoveryComplete(int rc, List<String> tasks) {
                try {
                    if (rc == RecoveryCallback.FAILED) {
                        LOG.warn("Recovery of assigned tasks failed.");
                    } else {
                        LOG.info("Assigning recovered tasks");
                        recoveryLatch = new CountDownLatch(tasks.size());
                        assignTasks(tasks);
                    }

                    new Thread(new Runnable() {
                        public void run() {
                            try {
                                /*
                                 * Wait until recovery is complete
                                 */
                                recoveryLatch.await();

                                /*
                                 * Start tasks cache
                                 */
                                tasksCache.getListenable().addListener(tasksCacheListener);
                                tasksCache.start();
                            } catch (Exception e) {
                                LOG.warn("Exception while assigning and getting tasks.", e);
                            }
                        }
                    }).start();

                    /*
                     * Decrement latch
                     */
                    leaderLatch.countDown();
                } catch (Exception e) {
                    LOG.error("Exception while executing the recovery callback", e);
                }
            }
        });

        /*
         * This latch is to prevent this call from exiting.
         * If we exit, then we release mastership.
         */
        closeLatch.await();
    }
We provide a separate CountDownLatch to wait until this Curator client acquires leadership.
If the master exits the takeLeadership call, it gives up mastership. We use a CountDownLatch to prevent it from exiting until we close the master.
We implement this method as part of the CuratorMaster class, and CuratorMaster implements LeaderSelectorListener. It is important that the master only exits takeLeadership if it wants to release mastership. We need, essentially, some form of lock to prevent it from exiting. In our implementation, we use a latch that we decrement when exiting the master instance.
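The pattern of blocking inside takeLeadership until the master is closed can be shown in isolation. The following sketch is plain Java with a hypothetical class HoldLeadershipSketch. Its takeLeadership method only imitates the contract described above (returning means giving up leadership) and is not the Curator interface.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: leadership is held for as long as takeLeadership does not return. */
final class HoldLeadershipSketch {
    private final CountDownLatch acquired = new CountDownLatch(1);
    private final CountDownLatch closeLatch = new CountDownLatch(1);
    private volatile boolean leader = false;

    /** Stand-in for takeLeadership: returning from this method means giving up leadership. */
    void takeLeadership() throws InterruptedException {
        leader = true;
        acquired.countDown();
        try {
            closeLatch.await();   // block here for as long as we want to remain leader
        } finally {
            leader = false;
        }
    }

    /** Closing the master releases the latch and lets takeLeadership return. */
    void close() {
        closeLatch.countDown();
    }

    boolean isLeader() {
        return leader;
    }

    /** Runs the sketch end to end; returns {leader while held, leader after close}. */
    static boolean[] demo() {
        HoldLeadershipSketch master = new HoldLeadershipSketch();
        Thread electionThread = new Thread(() -> {
            try {
                master.takeLeadership();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean whileHeld = false;
        electionThread.start();
        try {
            master.acquired.await(5, TimeUnit.SECONDS);
            whileHeld = master.isLeader();
            master.close();
            electionThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { whileHeld, master.isLeader() };
    }
}
```

The thread that calls takeLeadership stays parked on the latch, which is why the call must run on a thread that can afford to block for the whole term of leadership.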
We also start the leader selector in the runForMaster call, but unlike with LeaderLatch, we do not need to register a listener here (we register the listener in the constructor instead):
    public void runForMaster() {
        client.getCuratorListenable().addListener(masterListener);
        client.getUnhandledErrorListenable().addListener(errorsListener);

        leaderSelector.setId(myId);
        leaderSelector.start();
    }
We additionally give this master an arbitrary identifier. Although we have not done it in this example, we could also set the leader selector to automatically requeue (LeaderSelector.autoRequeue) upon losing leadership. Requeuing means that this client continuously tries to acquire leadership and it executes takeLeadership each time leadership is acquired.
As part of implementing the LeaderSelectorListener interface, we implement a method to handle connection state changes:
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        switch (newState) {
        case CONNECTED:
            // Nothing to do in this case.
            break;
        case RECONNECTED:
            // Reconnected, so I should
            // still be the leader.
            break;
        case SUSPENDED:
            LOG.warn("Session suspended");
            break;
        case LOST:
            try {
                close();
            } catch (IOException e) {
                LOG.warn("Exception while closing", e);
            }
            break;
        case READ_ONLY:
            // We ignore this case.
            break;
        }
    }
The last recipe we make use of in our example is the children cache (class PathChildrenCache). We use it both for the list of workers and for the list of tasks. This cache is responsible mainly for keeping a local copy of the list of children and for notifying us of changes to the cached set. Note that because of timing issues, the set might not be identical to the one ZooKeeper stores at a particular point in time, although it will eventually reflect changes to the ZooKeeper state.
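To see what keeping a local copy and notifying of changes amounts to, consider the following minimal sketch. It compares each new listing of children with the cached one and reports the differences. ChildrenCacheSketch and refresh are hypothetical names, and the sketch says nothing about how PathChildrenCache is implemented internally; it only reuses the event names CHILD_ADDED and CHILD_REMOVED as labels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of a children cache that reports changes between two listings. */
final class ChildrenCacheSketch {
    private Set<String> cached = new TreeSet<>();

    /** Replaces the local copy with the latest listing and returns the resulting change events. */
    List<String> refresh(Set<String> latest) {
        List<String> events = new ArrayList<>();
        // Children present now but not in the local copy were added.
        for (String child : new TreeSet<>(latest)) {
            if (!cached.contains(child)) {
                events.add("CHILD_ADDED " + child);
            }
        }
        // Children in the local copy that are no longer listed were removed.
        for (String child : cached) {
            if (!latest.contains(child)) {
                events.add("CHILD_REMOVED " + child);
            }
        }
        cached = new TreeSet<>(latest);
        return events;
    }
}
```

Between two refreshes the local copy can lag behind the server, which is the timing issue mentioned above: a child that is added and removed between two listings produces no event at all in this sketch.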
To deal with changes for each instance of the cache, we implement the PathChildrenCacheListener interface, which has a single childEvent method. For the list of workers, the listener only reacts to workers going away, because we need to reassign their tasks; additions to the list matter only when assigning new tasks:
    PathChildrenCacheListener workersCacheListener = new PathChildrenCacheListener() {
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                /*
                 * Obtain just the worker's name
                 */
                try {
                    getAbsentWorkerTasks(event.getData().getPath().replaceFirst("/workers/", ""));
                } catch (Exception e) {
                    LOG.error("Exception while trying to reassign tasks.", e);
                }
            }
        }
    };
For the list of tasks, we use additions to the list to trigger the assignment process:
    PathChildrenCacheListener tasksCacheListener = new PathChildrenCacheListener() {
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                try {
                    assignTask(event.getData().getPath().replaceFirst("/tasks/", ""));
                } catch (Exception e) {
                    LOG.error("Exception when assigning task.", e);
                }
            }
        }
    };
Note that we make an assumption here that there is at least one worker available to assign tasks to. If there is no worker available, we need to hold the assignment by remembering the additions to the list that have not been assigned and assign them upon an addition to the list of workers. We do not implement this feature for the sake of simplicity; we leave it as an exercise for the reader.
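As a starting point for that exercise, here is one possible shape for the bookkeeping, written as a plain Java sketch with no ZooKeeper calls. The class PendingAssignmentSketch, its methods, and the round-robin choice of worker are all assumptions made for illustration; the example in this chapter does not include such code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch: hold tasks while no worker is available, assign them when one arrives. */
final class PendingAssignmentSketch {
    private final List<String> workers = new ArrayList<>();
    private final Queue<String> pending = new ArrayDeque<>();
    final Map<String, String> assignments = new LinkedHashMap<>();
    private int next = 0;

    /** Would be called on a CHILD_ADDED event for the list of tasks. */
    void taskAdded(String task) {
        if (workers.isEmpty()) {
            pending.add(task);      // remember the task until a worker shows up
        } else {
            assign(task);
        }
    }

    /** Would be called on a CHILD_ADDED event for the list of workers. */
    void workerAdded(String worker) {
        workers.add(worker);
        while (!pending.isEmpty()) {
            assign(pending.remove());
        }
    }

    /** Picks workers in round-robin order (an assumption of this sketch). */
    private void assign(String task) {
        String worker = workers.get(next++ % workers.size());
        assignments.put(task, worker);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The sketch ignores worker removal and assumes that both methods are called from the same thread; a real master would also have to record each assignment in ZooKeeper rather than in a local map.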
Curator implements a set of nice extensions to the ZooKeeper API, abstracting away some of the complexities of ZooKeeper and implementing best practices gleaned from production experience and discussions in the community. In this chapter, we have covered how to leverage some of the features of Curator for the implementation of the master role in our master–worker example. We have particularly used the leader election implementations and the children cache to implement important features of the master. These two recipes are not the only ones Curator implements, however; a number of other recipes and features are available.