Synchronizers
Java provides the synchronized keyword for synchronizing thread access to critical sections. Because it can be difficult to correctly write thread-safe code that's based on synchronized, the concurrency utilities include high-level synchronizers (classes that facilitate common forms of synchronization). In this chapter, I introduce you to the countdown latch, cyclic barrier, exchanger, semaphore, and phaser synchronizers.
Countdown Latches
A countdown latch causes one or more threads to wait at a “gate” until another thread opens this gate, at which point these other threads can continue. It consists of a count and operations for “causing a thread to wait until the count reaches zero” and “decrementing the count.”
The java.util.concurrent.CountDownLatch class implements the countdown latch synchronizer. You initialize a CountDownLatch instance to a specific count by invoking this class’s CountDownLatch(int count) constructor, which throws java.lang.IllegalArgumentException when the value passed to count is negative.
CountDownLatch also offers the following methods:

void await(): Force the calling thread to wait until the count reaches zero, unless the thread is interrupted, in which case java.lang.InterruptedException is thrown. This method returns immediately when the count is already zero.

boolean await(long timeout, TimeUnit unit): Same as the previous method, but wait no longer than timeout units (described by java.util.concurrent.TimeUnit) of time. This method returns true when the count reaches zero and false when the waiting time elapses first.

void countDown(): Decrement the count, releasing all waiting threads when the count reaches zero. Invoking this method when the count is already zero does nothing.

long getCount(): Return the current count. This method is useful for testing and debugging.
You’ll often use a countdown latch to ensure that threads start working at approximately the same time. For example, check out Listing 6-1.
Listing 6-1’s default main thread first creates a pair of countdown latches. The startSignal countdown latch prevents any worker thread from proceeding until the default main thread is ready for them to proceed. The doneSignal countdown latch causes the default main thread to wait until all worker threads have finished.
The default main thread next creates a runnable with a run() method that is executed by subsequently created worker threads.
run() first outputs a message and then calls startSignal's await() method, waiting for this countdown latch's count to reach zero. At that point, run() outputs a message that indicates work being done and sleeps for a random period of time (0 through 999 milliseconds) to simulate this work.
At this point, run() invokes doneSignal’s countDown() method to decrement this latch’s count. Once this count reaches zero, the default main thread waiting on this signal will continue, shutting down the executor and terminating the application.
After creating the runnable, the default main thread obtains an executor that’s based on a thread pool of NTHREADS threads, and then calls the executor’s execute() method NTHREADS times, passing the runnable to each of the NTHREADS pool-based threads. This action starts the worker threads, which enter run().
Next, the default main thread outputs a message and sleeps for one second to simulate doing additional work; this pause gives all the worker threads a chance to enter run() and invoke startSignal.await(). The default main thread then invokes startSignal's countDown() method to let the worker threads start running, outputs a message to indicate that it's doing something else, and invokes doneSignal's await() method to wait for this countdown latch's count to reach zero before it can proceed.
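The walkthrough above can be sketched as follows. This is a minimal approximation of Listing 6-1: names such as NTHREADS, startSignal, and doneSignal follow the text, but details such as the exact message format are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo {
    static final int NTHREADS = 3;
    static final CountDownLatch startSignal = new CountDownLatch(1);
    static final CountDownLatch doneSignal = new CountDownLatch(NTHREADS);

    public static void main(String[] args) throws InterruptedException {
        Runnable r = () -> {
            try {
                System.out.printf("%d: %s: entered run()%n",
                                  System.currentTimeMillis(), Thread.currentThread());
                startSignal.await();                        // wait at the gate
                System.out.printf("%d: %s: doing work%n",
                                  System.currentTimeMillis(), Thread.currentThread());
                Thread.sleep((int) (Math.random() * 1000)); // simulate 0-999 ms of work
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            } finally {
                doneSignal.countDown();                     // report that this worker is done
            }
        };
        ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
        for (int i = 0; i < NTHREADS; i++)
            executor.execute(r);
        System.out.println("main thread doing something");
        Thread.sleep(1000);      // let all workers reach startSignal.await()
        startSignal.countDown(); // open the gate
        System.out.println("main thread doing something else");
        doneSignal.await();      // wait for all workers to finish
        executor.shutdown();
    }
}
```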
Compile Listing 6-1 as follows:
javac CountDownLatchDemo.java
Run the resulting application as follows:
java CountDownLatchDemo
You should observe output that’s similar to the following (message order may differ somewhat):
main thread doing something
1445802274931: Thread[pool-1-thread-2,5,main]: entered run()
1445802274931: Thread[pool-1-thread-3,5,main]: entered run()
1445802274931: Thread[pool-1-thread-1,5,main]: entered run()
main thread doing something else
1445802275931: Thread[pool-1-thread-2,5,main]: doing work
1445802275931: Thread[pool-1-thread-3,5,main]: doing work
1445802275933: Thread[pool-1-thread-1,5,main]: doing work
Cyclic Barriers
A cyclic barrier lets a set of threads wait for each other to reach a common barrier point. The barrier is cyclic because it can be reused after the waiting threads are released. This synchronizer is useful in applications involving a fixed-size party of threads that must occasionally wait for each other.
The java.util.concurrent.CyclicBarrier class implements the cyclic barrier synchronizer. You initialize a CyclicBarrier instance to a specific number of parties (threads working toward a common goal) by invoking this class’s CyclicBarrier(int parties) constructor. This constructor throws IllegalArgumentException when the value passed to parties is less than 1.
Alternatively, you can invoke the CyclicBarrier(int parties, Runnable barrierAction) constructor to initialize a cyclic barrier to a specific number of parties and a barrierAction that’s executed when the barrier is tripped. In other words, when parties - 1 threads are waiting and one more thread arrives, the arriving thread executes barrierAction and then all threads proceed. This runnable is useful for updating shared state before any of the threads continue. This constructor throws IllegalArgumentException when the value passed to parties is less than 1. (The former constructor invokes this constructor passing null to barrierAction—no runnable will be executed when the barrier is tripped.)
CyclicBarrier also offers the following methods:

int await(): Force the calling thread to wait until all parties have invoked await() on this barrier, returning the arrival index of this thread. This method throws InterruptedException when the thread is interrupted and java.util.concurrent.BrokenBarrierException when the barrier is in a broken state (for example, it was reset while a thread was waiting).

int await(long timeout, TimeUnit unit): Same as the previous method, but wait no longer than timeout units of time; throws java.util.concurrent.TimeoutException when this time elapses first.

int getNumberWaiting(): Return the number of parties currently waiting at the barrier.

int getParties(): Return the number of parties required to trip this barrier.

boolean isBroken(): Return true when this barrier is in a broken state.

void reset(): Reset the barrier to its initial state, causing any currently waiting threads to receive BrokenBarrierException.
Cyclic barriers are useful in parallel decomposition scenarios, where a lengthy task is divided into subtasks whose individual results are later merged into the overall result of the task. CyclicBarrier’s Javadoc presents example code that’s completed in Listing 6-2.
Listing 6-2’s default main thread first creates a square matrix of floating-point values and dumps this matrix to the standard output stream. This thread then instantiates the Solver class, which creates a separate thread for performing a calculation on each row. The modified matrix is then dumped.
Solver presents a constructor that receives its matrix argument and saves its reference in field data along with the number of rows in field N. The constructor then creates a cyclic barrier with N parties and a barrier action that’s responsible for merging all of the rows into a final matrix. Finally, the constructor creates a worker thread that executes a separate Worker runnable that’s responsible for processing a single row in the matrix. The constructor then waits until the workers are finished.
Worker’s run() method repeatedly invokes processRow() on its specific row until done() returns true, which (in this example) it does after processRow() executes one time. After processRow() returns, which indicates that the row has been processed, the worker thread invokes await() on the cyclic barrier and cannot proceed until every other worker thread has also invoked await().
At some point, all of the worker threads will have invoked await(). When the final thread, which processes the final row in the matrix, invokes await(), it will trigger the barrier action, which merges all processed rows into a final matrix. In this example, a merger isn’t required, but it would be required in more complex examples.
The final task performed by mergeRows() is to notify the main thread that invoked Solver’s constructor. This thread is waiting on the monitor associated with String object "abc". A call to notify() suffices to wake up the waiting thread, which is the only thread waiting on this monitor.
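The interplay between Solver, Worker, the barrier action, and the "abc" monitor can be sketched as follows. This is a minimal approximation of Listing 6-2: the tenfold row multiplication matches the sample output, but it's a hypothetical computation, and the merged guard flag is an addition that protects against a missed notification.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    static float[][] matrix;

    static class Solver {
        final int N;
        final float[][] data;
        final CyclicBarrier barrier;
        boolean merged;

        Solver(float[][] matrix) throws InterruptedException {
            data = matrix;
            N = matrix.length;
            barrier = new CyclicBarrier(N, this::mergeRows); // barrier action
            for (int i = 0; i < N; i++)
                new Thread(new Worker(i)).start();
            synchronized ("abc") {   // wait on the "abc" monitor, as in the listing;
                while (!merged)      // the guard flag avoids a missed notification
                    "abc".wait();
            }
        }

        void mergeRows() {
            // A real parallel decomposition would merge partial results here.
            System.out.println("merging");
            synchronized ("abc") {
                merged = true;
                "abc".notify();      // wake the thread waiting in the constructor
            }
        }

        class Worker implements Runnable {
            final int row;
            boolean done;
            Worker(int row) { this.row = row; }

            void processRow(int row) {
                System.out.println("Processing row: " + row);
                for (int i = 0; i < data[row].length; i++)
                    data[row][i] *= 10;  // hypothetical per-row computation
                done = true;
            }

            @Override public void run() {
                while (!done) {
                    processRow(row);
                    try {
                        barrier.await(); // wait for the other workers to arrive
                    } catch (InterruptedException | BrokenBarrierException e) {
                        return;
                    }
                }
            }
        }
    }

    static void dump(float[][] m) {
        for (float[] row : m) {
            for (float f : row)
                System.out.print(f + " ");
            System.out.println();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        matrix = new float[3][3];
        for (int i = 0; i < 3; i++)
            for (int j = 0; j < 3; j++)
                matrix[i][j] = i * 3 + j;
        dump(matrix);
        System.out.println("main thread waiting");
        new Solver(matrix);
        System.out.println("main thread notified");
        dump(matrix);
    }
}
```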
Compile Listing 6-2 as follows:
javac CyclicBarrierDemo.java
Run the resulting application as follows:
java CyclicBarrierDemo
You should observe output that’s similar to the following (message order may differ somewhat):
0.0 1.0 2.0
3.0 4.0 5.0
6.0 7.0 8.0
main thread waiting
Processing row: 0
Processing row: 1
Processing row: 2
merging
main thread notified
0.0 10.0 20.0
30.0 40.0 50.0
60.0 70.0 80.0
Exchangers
An exchanger provides a synchronization point where threads can swap objects. Each thread presents some object on entry to the exchanger’s exchange() method, matches with a partner thread, and receives its partner’s object on return. Exchangers can be useful in applications such as genetic algorithms (see http://en.wikipedia.org/wiki/Genetic_algorithm) and pipeline designs.
The generic java.util.concurrent.Exchanger<V> class implements the exchanger synchronizer. You initialize an exchanger by invoking the Exchanger() constructor. You then invoke either of the following methods to perform an exchange:

V exchange(V x): Wait for another thread to arrive at this exchange point (unless the calling thread is interrupted, in which case InterruptedException is thrown), and then transfer the given object to it, receiving the other thread's object in return.

V exchange(V x, long timeout, TimeUnit unit): Same as the previous method, but wait no longer than timeout units of time; throws java.util.concurrent.TimeoutException when this time elapses first.
Listing 6-3 expands on the repeated buffer filling and emptying Exchanger example presented in Exchanger’s Javadoc.
Listing 6-3’s default main thread creates an exchanger and a pair of buffers via static field initializers. It then instantiates the EmptyingLoop and FillingLoop local classes and passes these runnables to new Thread instances whose threads are then started. (I could have used executors.) Each runnable’s run() method enters an infinite loop that repeatedly adds to or removes from its buffer. When the buffer is full or empty, the exchanger is used to swap these buffers and the filling or emptying continues.
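The buffer swapping described above can be sketched as follows. Unlike Listing 6-3, whose run() loops run forever, this approximation bounds each thread to three fill/empty cycles so the program terminates; the buffer type (a list of strings) and the item names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    static final Exchanger<List<String>> exchanger = new Exchanger<>();
    static final List<String> initiallyEmptyBuffer = new ArrayList<>();
    static final List<String> initiallyFullBuffer = new ArrayList<>();
    static final int CAPACITY = 10;

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < CAPACITY; i++)
            initiallyFullBuffer.add("I" + i);

        Thread filler = new Thread(() -> {
            try {
                List<String> buffer = initiallyEmptyBuffer;
                for (int cycle = 0; cycle < 3; cycle++) {    // bounded so the demo ends
                    for (int i = 0; buffer.size() < CAPACITY; i++) {
                        String item = "N" + cycle + "I" + i;
                        System.out.println("Adding " + item);
                        buffer.add(item);
                    }
                    System.out.println("filling thread wants to exchange");
                    buffer = exchanger.exchange(buffer);     // swap full buffer for empty one
                    System.out.println("filling thread receives exchange");
                }
            } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
        });

        Thread emptier = new Thread(() -> {
            try {
                List<String> buffer = initiallyFullBuffer;
                for (int cycle = 0; cycle < 3; cycle++) {
                    while (!buffer.isEmpty())
                        System.out.println("taking: " + buffer.remove(0));
                    System.out.println("emptying thread wants to exchange");
                    buffer = exchanger.exchange(buffer);     // swap empty buffer for full one
                    System.out.println("emptying thread receives exchange");
                }
            } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
        });

        filler.start();
        emptier.start();
        filler.join();
        emptier.join();
    }
}
```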
Compile Listing 6-3 as follows:
javac ExchangerDemo.java
Run the resulting application as follows:
java ExchangerDemo
You should observe a prefix of the output that’s similar to the following (the message order may differ somewhat):
Adding I0
Adding I1
Adding I2
Adding I3
Adding I4
Adding I5
Adding I6
Adding I7
Adding I8
Adding I9
taking: I0
taking: I1
taking: I2
taking: I3
taking: I4
taking: I5
taking: I6
taking: I7
taking: I8
taking: I9
emptying thread wants to exchange
Adding: NI0
Adding: NI1
Adding: NI2
Adding: NI3
Adding: NI4
Adding: NI5
Adding: NI6
Adding: NI7
Adding: NI8
Adding: NI9
filling thread wants to exchange
filling thread receives exchange
emptying thread receives exchange
Adding: NI10
taking: NI0
Adding: NI11
taking: NI1
Adding: NI12
Semaphores
A semaphore maintains a set of permits for restricting the number of threads that can access a limited resource. A thread attempting to acquire a permit when no permits are available blocks until some other thread releases a permit.
Note Semaphores whose current values can be incremented past 1 are known as counting semaphores, whereas semaphores whose current values can be only 0 or 1 are known as binary semaphores or mutexes. In either case, the current value cannot be negative.
The java.util.concurrent.Semaphore class implements this synchronizer and conceptualizes a semaphore as an object maintaining a set of permits. You initialize a semaphore by invoking the Semaphore(int permits) constructor where permits specifies the number of available permits. The resulting semaphore’s fairness policy is set to false (unfair). Alternatively, you can invoke the Semaphore(int permits, boolean fair) constructor to also set the semaphore’s fairness setting to true (fair).
SEMAPHORES AND FAIRNESS
When the fairness setting is false, Semaphore makes no guarantees about the order in which threads acquire permits. In particular, barging is permitted; that is, a thread invoking acquire() can be allocated a permit ahead of a thread that has been waiting—logically the new thread places itself at the head of the queue of waiting threads. When fair is set to true, the semaphore guarantees that threads invoking any of the acquire() methods are selected to obtain permits in the order in which their invocation of those methods was processed (first-in-first-out; FIFO). Because FIFO ordering necessarily applies to specific internal points of execution within these methods, it’s possible for one thread to invoke acquire() before another thread but reach the ordering point after the other thread, and similarly upon return from the method. Also, the untimed tryAcquire() methods don’t honor the fairness setting; they’ll take any available permits.
Generally, semaphores used to control resource access should be initialized as fair, to ensure that no thread is starved out from accessing a resource. When using semaphores for other kinds of synchronization control, the throughput advantages of unfair ordering often outweigh fairness considerations.
Semaphore also offers the following methods:

void acquire(): Acquire a permit, blocking until one is available unless the calling thread is interrupted, in which case InterruptedException is thrown.

boolean tryAcquire(): Acquire a permit only when one is available at the time of invocation, returning true on success. This method doesn't honor the fairness setting.

boolean tryAcquire(long timeout, TimeUnit unit): Acquire a permit when one becomes available within timeout units of time, returning true on success and false when the waiting time elapses first.

void release(): Release a permit, returning it to the semaphore and potentially unblocking a waiting acquirer.

int availablePermits(): Return the number of permits currently available. This method is useful for testing and debugging.
Listing 6-4 expands on the “controlling access to a pool of items” Semaphore example presented in Semaphore’s Javadoc.
Listing 6-4’s default main thread creates a resource pool, a runnable for repeatedly acquiring and putting back resources, and an array of executors. Each executor is told to execute the runnable.
Pool’s String getItem() and void putItem(String item) methods obtain and return string-based resources. Before obtaining an item in getItem(), the calling thread must acquire a permit from the semaphore, which guarantees that an item is available for use. When the thread finishes with the item, it calls putItem(String), which returns the item to the pool and then releases a permit to the semaphore, which lets another thread acquire that item.
No synchronization lock is held when acquire() is called because that would prevent an item from being returned to the pool. However, String getNextAvailableItem() and boolean markAsUnused(String item) are synchronized to maintain pool consistency. (The semaphore encapsulates the synchronization for restricting access to the pool separately from the synchronization that’s required for maintaining pool consistency.)
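The Pool class described above can be sketched as follows; it closely follows the "controlling access to a pool of items" example in Semaphore's Javadoc. The use of plain threads (rather than the listing's array of executors) and the bounded acquire/put-back loop are simplifications.

```java
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    static class Pool {
        static final int MAX_AVAILABLE = 10;
        private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); // fair
        private final String[] items = new String[MAX_AVAILABLE];
        private final boolean[] used = new boolean[MAX_AVAILABLE];

        Pool() {
            for (int i = 0; i < items.length; i++)
                items[i] = "I" + i;
        }

        String getItem() throws InterruptedException {
            available.acquire();     // blocks until a permit (an item) is available
            return getNextAvailableItem();
        }

        void putItem(String item) {
            if (markAsUnused(item))
                available.release(); // free a permit for another thread
        }

        // synchronized maintains pool consistency, separate from the semaphore
        private synchronized String getNextAvailableItem() {
            for (int i = 0; i < items.length; i++)
                if (!used[i]) { used[i] = true; return items[i]; }
            return null;             // unreachable when permits are managed correctly
        }

        private synchronized boolean markAsUnused(String item) {
            for (int i = 0; i < items.length; i++)
                if (items[i].equals(item) && used[i]) { used[i] = false; return true; }
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final Pool pool = new Pool();
        Runnable r = () -> {
            try {
                for (int i = 0; i < 3; i++) { // bounded; the listing loops indefinitely
                    String item = pool.getItem();
                    System.out.println(Thread.currentThread().getName()
                                       + " acquiring " + item);
                    Thread.sleep((int) (Math.random() * 50));
                    System.out.println(Thread.currentThread().getName()
                                       + " putting back " + item);
                    pool.putItem(item);
                }
            } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
        };
        Thread[] threads = new Thread[15]; // more threads than permits
        for (int i = 0; i < threads.length; i++)
            (threads[i] = new Thread(r)).start();
        for (Thread t : threads)
            t.join();
    }
}
```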
Compile Listing 6-4 as follows:
javac SemaphoreDemo.java
Run the resulting application as follows:
java SemaphoreDemo
You should observe a prefix of the output that’s similar to the following (message order may differ somewhat):
pool-1-thread-1 acquiring I0
pool-2-thread-1 acquiring I1
pool-3-thread-1 acquiring I2
pool-5-thread-1 acquiring I3
pool-7-thread-1 acquiring I4
pool-4-thread-1 acquiring I5
pool-6-thread-1 acquiring I6
pool-9-thread-1 acquiring I7
pool-8-thread-1 acquiring I8
pool-10-thread-1 acquiring I9
pool-9-thread-1 putting back I7
pool-2-thread-1 putting back I1
pool-11-thread-1 acquiring I7
pool-9-thread-1 acquiring I1
pool-8-thread-1 putting back I8
pool-2-thread-1 acquiring I8
pool-5-thread-1 putting back I3
pool-8-thread-1 acquiring I3
pool-4-thread-1 putting back I5
pool-5-thread-1 acquiring I5
pool-6-thread-1 putting back I6
pool-4-thread-1 acquiring I6
pool-1-thread-1 putting back I0
pool-6-thread-1 acquiring I0
pool-7-thread-1 putting back I4
pool-1-thread-1 acquiring I4
pool-10-thread-1 putting back I9
pool-7-thread-1 acquiring I9
pool-3-thread-1 putting back I2
pool-10-thread-1 acquiring I2
Phasers

A phaser is a more flexible cyclic barrier. Like a cyclic barrier, a phaser lets a group of threads wait on a barrier; these threads continue after the last thread arrives. A phaser also offers the equivalent of a barrier action. Unlike a cyclic barrier, which coordinates a fixed number of threads, a phaser can coordinate a variable number of threads, which can register at any time. To implement this capability, a phaser uses phases and phase numbers.
A phase is the phaser’s current state, and this state is identified by an integer-based phase number. When the last of the registered threads arrives at the phaser barrier, a phaser advances to the next phase and increments its phase number by 1.
The java.util.concurrent.Phaser class implements a phaser. Because this class is thoroughly described in its Javadoc, I'll point out only a few constructors and methods:

Phaser(int parties): Create a phaser with the given number of initially registered parties. This constructor throws IllegalArgumentException when parties is negative or exceeds the supported maximum.

int register(): Add a new unarrived party to this phaser, returning the current phase number.

int arriveAndAwaitAdvance(): Arrive at this phaser and wait for the other registered parties to arrive, returning the arrival phase number.

int arriveAndDeregister(): Arrive at this phaser and deregister from it without waiting for the other parties to arrive, returning the arrival phase number.

int getPhase(): Return the current phase number.
Listing 6-5 provides a demonstration of the phaser synchronizer. It’s based on the first example in Phaser’s Javadoc.
Listing 6-5’s default main thread creates a pair of runnable tasks that each report the time (in milliseconds) at which it starts to run. It then runs these tasks after creating a Phaser instance and waiting for both tasks to arrive at the barrier.
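The starting-gun idiom described above can be sketched as follows. This approximates Listing 6-5 (and the first example in Phaser's Javadoc); unlike the listing, this sketch shuts down the executor and waits for termination so the program ends, and the up-to-349-millisecond random delay matches the behavior the text describes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserDemo {
    static final List<Long> startTimes = Collections.synchronizedList(new ArrayList<>());

    static void runTasks(List<Runnable> tasks) throws InterruptedException {
        final Phaser phaser = new Phaser(1);   // "1" registers the main thread itself
        ExecutorService executor = Executors.newCachedThreadPool();
        for (final Runnable task : tasks) {
            phaser.register();                 // one additional party per task
            executor.execute(() -> {
                try {
                    Thread.sleep((int) (Math.random() * 350)); // simulate a startup delay
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
                phaser.arriveAndAwaitAdvance(); // await the "starting gun"
                task.run();
            });
        }
        phaser.arriveAndDeregister();          // fire the gun; main doesn't wait further
        executor.shutdown();                   // unlike the listing, let the app end
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            long now = System.currentTimeMillis();
            startTimes.add(now);
            System.out.println(Thread.currentThread().getName() + " running at " + now);
        };
        runTasks(List.of(task, task));
    }
}
```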
Compile Listing 6-5 as follows:
javac PhaserDemo.java
Run the resulting application as follows:
java PhaserDemo
You should observe output that’s similar to the following (and the application should not end—press Ctrl+C or your keystroke equivalent to end the application):
pool-1-thread-1 running at 1445806012709
pool-2-thread-1 running at 1445806012712
Because the phaser is used here as a one-shot starting gate (much like a countdown latch), both threads start running at (in this case) the same time, even though a thread may have been delayed by as much as 349 milliseconds thanks to the presence of Thread.sleep().
Comment out phaser.arriveAndAwaitAdvance(); // await the ... and you should now observe the threads starting at radically different times, as illustrated here:
pool-2-thread-1 running at 1445806212870
pool-1-thread-1 running at 1445806213013
EXERCISES
The following exercises are designed to test your understanding of Chapter 6’s content:
Summary
Java provides the synchronized keyword for synchronizing thread access to critical sections. Because it can be difficult to correctly write thread-safe code that's based on synchronized, the concurrency utilities include high-level synchronizers.
A countdown latch causes one or more threads to wait at a “gate” until another thread opens this gate, at which point these other threads can continue. It consists of a count and operations for “causing a thread to wait until the count reaches zero” and “decrementing the count.”
A cyclic barrier lets a set of threads wait for each other to reach a common barrier point. The barrier is cyclic because it can be reused after the waiting threads are released. This synchronizer is useful in applications involving a fixed-size party of threads that must occasionally wait for each other.
An exchanger provides a synchronization point where threads can swap objects. Each thread presents some object on entry to the exchanger’s exchange() method, matches with a partner thread, and receives its partner’s object on return.
A semaphore maintains a set of permits for restricting the number of threads that can access a limited resource. A thread attempting to acquire a permit when no permits are available blocks until some other thread releases a permit.
A phaser is a more flexible cyclic barrier. Like a cyclic barrier, a phaser lets a group of threads wait on a barrier; these threads continue after the last thread arrives. A phaser also offers the equivalent of a barrier action. Unlike a cyclic barrier, which coordinates a fixed number of threads, a phaser can coordinate a variable number of threads, which can register at any time. To implement this capability, a phaser uses phases and phase numbers.
Chapter 7 presents the Locking Framework.