CHAPTER 6

image

Synchronizers

Java provides the synchronized keyword for synchronizing thread access to critical sections. Because it can be difficult to correctly write synchronized code that’s based on synchronized, high-level synchronizers (classes that facilitate common forms of synchronization) are included in the concurrency utilities. In this chapter, I introduce you to the countdown latch, cyclic barrier, exchanger, semaphore, and phaser synchronizers.

Countdown Latches

A countdown latch causes one or more threads to wait at a “gate” until another thread opens this gate, at which point these other threads can continue. It consists of a count and operations for “causing a thread to wait until the count reaches zero” and “decrementing the count.”

The java.util.concurrent.CountDownLatch class implements the countdown latch synchronizer. You initialize a CountDownLatch instance to a specific count by invoking this class’s CountDownLatch(int count) constructor, which throws java.lang.IllegalArgumentException when the value passed to count is negative.

CountDownLatch also offers the following methods:

  • void await(): Force the calling thread to wait until the latch has counted down to zero, unless the thread is interrupted, in which case java.lang.InterruptedException is thrown. This method returns immediately when the count is zero.
  • boolean await(long timeout, TimeUnit unit): Force the calling thread to wait until the latch has counted down to zero or the specified timeout value in unit time-units has expired, or the thread is interrupted, in which case InterruptedException is thrown. This method returns immediately when the count is zero. It returns true when the count reaches zero or false when the waiting time elapses.
  • void countDown(): Decrement the count, releasing all waiting threads when the count reaches zero. Nothing happens when the count is already zero when this method is called.
  • long getCount(): Return the current count. This method is useful for testing and debugging.
  • String toString(): Return a string identifying this latch as well as its state. The state, in brackets, includes string literal "Count =" followed by the current count.

You’ll often use a countdown latch to ensure that threads start working at approximately the same time. For example, check out Listing 6-1.

Listing 6-1’s default main thread first creates a pair of countdown latches. The startSignal countdown latch prevents any worker thread from proceeding until the default main thread is ready for them to proceed. The doneSignal countdown latch causes the default main thread to wait until all worker threads have finished.

The default main thread next creates a runnable with a run() method that is executed by subsequently created worker threads.

run() first outputs a message and then calls startSignal’s await() method to wait for this countdown latch’s count to read zero before proceeding, at which point run() outputs a message that indicates work being done and sleeps for a random period of time (0 through 999 milliseconds) to simulate this work.

At this point, run() invokes doneSignal’s countDown() method to decrement this latch’s count. Once this count reaches zero, the default main thread waiting on this signal will continue, shutting down the executor and terminating the application.

After creating the runnable, the default main thread obtains an executor that’s based on a thread pool of NTHREADS threads, and then calls the executor’s execute() method NTHREADS times, passing the runnable to each of the NTHREADS pool-based threads. This action starts the worker threads, which enter run().

Next, the default main thread outputs a message and sleeps for one second to simulate doing additional work (giving all the worker threads a chance to have entered run() and invoke startSignal.await()), invokes startSignal’s countDown() method to cause the worker threads to start running, outputs a message to indicate that it’s doing something else, and invokes doneSignal’s await() method to wait for this countdown latch’s count to reach zero before it can proceed.

Compile Listing 6-1 as follows:

javac CountDownLatchDemo.java

Run the resulting application as follows:

java CountDownLatchDemo

You should observe output that’s similar to the following (message order may differ somewhat):

main thread doing something
1445802274931: Thread[pool-1-thread-2,5,main]: entered run()
1445802274931: Thread[pool-1-thread-3,5,main]: entered run()
1445802274931: Thread[pool-1-thread-1,5,main]: entered run()
main thread doing something else
1445802275931: Thread[pool-1-thread-2,5,main]: doing work
1445802275931: Thread[pool-1-thread-3,5,main]: doing work
1445802275933: Thread[pool-1-thread-1,5,main]: doing work

Cyclic Barriers

A cyclic barrier lets a set of threads wait for each other to reach a common barrier point. The barrier is cyclic because it can be reused after the waiting threads are released. This synchronizer is useful in applications involving a fixed-size party of threads that must occasionally wait for each other.

The java.util.concurrent.CyclicBarrier class implements the cyclic barrier synchronizer. You initialize a CyclicBarrier instance to a specific number of parties (threads working toward a common goal) by invoking this class’s CyclicBarrier(int parties) constructor. This constructor throws IllegalArgumentException when the value passed to parties is less than 1.

Alternatively, you can invoke the CyclicBarrier(int parties, Runnable barrierAction) constructor to initialize a cyclic barrier to a specific number of parties and a barrierAction that’s executed when the barrier is tripped. In other words, when parties - 1 threads are waiting and one more thread arrives, the arriving thread executes barrierAction and then all threads proceed. This runnable is useful for updating shared state before any of the threads continue. This constructor throws IllegalArgumentException when the value passed to parties is less than 1. (The former constructor invokes this constructor passing null to barrierAction—no runnable will be executed when the barrier is tripped.)

CyclicBarrier also offers the following methods:

  • int await():Force the calling thread to wait until all parties have invoked await() on this cyclic barrier. The calling thread will also stop waiting when it or another waiting thread is interrupted, another thread times out while waiting, or another thread invokes reset() on this cyclic barrier. If the calling thread has its interrupted status set on entry or is interrupted while waiting, this method throws InterruptedException and the calling thread’s interrupted status is cleared. The method throws java.util.concurrent.BrokenBarrierException when the barrier is reset (via reset()) while any thread is waiting, or the barrier is broken when await() is invoked or while any thread is waiting. When any thread is interrupted while waiting, all other waiting threads throw BrokenBarrierException and the barrier is placed in the broken state. If the calling thread is the last thread to arrive and a non-null barrierAction was supplied in the constructor, the calling thread executes this runnable before allowing the other threads to continue. This method returns the arrival index of the calling thread, where index getParties() - 1 indicates the first thread to arrive and zero indicates the last thread to arrive.
  • int await(long timeout, TimeUnit unit): This method is equivalent to the previous method except that it lets you specify how long the calling thread is willing to wait. This method throws java.util.concurrent.TimeoutException when this timeout expires while the thread is waiting.
  • int getNumberWaiting(): Return the number of parties that are currently waiting at the barrier. This method is useful for debugging and in partnership with assertions.
  • int getParties(): Return the number of parties that are required to trip the barrier.
  • boolean isBroken(): Return true when one or more parties broke out of this barrier because of interruption or timeout since the cyclic barrier was constructed or the last reset, or when a barrier action failed because of an exception; otherwise, return false.
  • void reset(): Reset the barrier to its initial state. If any parties are currently waiting at the barrier, they will return with a BrokenBarrierException. Note that resets after a breakage has occurred for other reasons can be complicated to carry out; threads need to resynchronize in some other way and choose one thread to perform the reset. Therefore, it might be preferable to create a new barrier for subsequent use.

Cyclic barriers are useful in parallel decomposition scenarios, where a lengthy task is divided into subtasks whose individual results are later merged into the overall result of the task. CyclicBarrier’s Javadoc presents example code that’s completed in Listing 6-2.

Listing 6-2’s default main thread first creates a square matrix of floating-point values and dumps this matrix to the standard output stream. This thread then instantiates the Solver class, which creates a separate thread for performing a calculation on each row. The modified matrix is then dumped.

Solver presents a constructor that receives its matrix argument and saves its reference in field data along with the number of rows in field N. The constructor then creates a cyclic barrier with N parties and a barrier action that’s responsible for merging all of the rows into a final matrix. Finally, the constructor creates a worker thread that executes a separate Worker runnable that’s responsible for processing a single row in the matrix. The constructor then waits until the workers are finished.

Worker’s run() method repeatedly invokes processRow() on its specific row until done() returns true, which (in this example) it does after processRow() executes one time. After processRow() returns, which indicates that the row has been processed, the worker thread invokes await() on the cyclic barrier; it cannot proceed.

At some point, all of the worker threads will have invoked await(). When the final thread, which processes the final row in the matrix, invokes await(), it will trigger the barrier action, which merges all processed rows into a final matrix. In this example, a merger isn’t required, but it would be required in more complex examples.

The final task performed by mergeRows() is to notify the main thread that invoked Solver’s constructor. This thread is waiting on the monitor associated with String object "abc". A call to notify() suffices to wake up the waiting thread, which is the only thread waiting on this monitor.

Compile Listing 6-2 as follows:

javac CyclicBarrierDemo.java

Run the resulting application as follows:

java CyclicBarrierDemo

You should observe output that’s similar to the following (message order may differ somewhat):

0.0 1.0 2.0
3.0 4.0 5.0
6.0 7.0 8.0

main thread waiting
Processing row: 0
Processing row: 1
Processing row: 2
merging
main thread notified

0.0 10.0 20.0
30.0 40.0 50.0
60.0 70.0 80.0

Exchangers

An exchanger provides a synchronization point where threads can swap objects. Each thread presents some object on entry to the exchanger’s exchange() method, matches with a partner thread, and receives its partner’s object on return. Exchangers can be useful in applications such as genetic algorithms (see http://en.wikipedia.org/wiki/Genetic_algorithm) and pipeline designs.

The generic java.util.concurrent.Exchanger<V> class implements the exchanger synchronizer. You initialize an exchanger by invoking the Exchanger() constructor. You then invoke either of the following methods to perform an exchange:

  • V exchange(V x): Wait for another thread to arrive at this exchange point (unless the calling thread is interrupted), and then transfer the given object to it, receiving the other thread’s object in return. If another thread is already waiting at the exchange point, it’s resumed for thread-scheduling purposes and receives the object passed in by the calling thread. The current thread returns immediately, receiving the object passed to the exchanger by the other thread. This method throws InterruptedException when the calling thread is interrupted.
  • V exchange(V x, long timeout, TimeUnit unit): This method is equivalent to the previous method except that it lets you specify how long the calling thread is willing to wait. It throws TimeoutException when this timeout expires while the thread is waiting.

Listing 6-3 expands on the repeated buffer filling and emptying Exchanger example presented in Exchanger’s Javadoc.

Listing 6-3’s default main thread creates an exchanger and a pair of buffers via static field initializers. It then instantiates the EmptyingLoop and FillingLoop local classes and passes these runnables to new Thread instances whose threads are then started. (I could have used executors.) Each runnable’s run() method enters an infinite loop that repeatedly adds to or removes from its buffer. When the buffer is full or empty, the exchanger is used to swap these buffers and the filling or emptying continues.

Compile Listing 6-3 as follows:

javac ExchangerDemo.java

Run the resulting application as follows:

java ExchangerDemo

You should observe a prefix of the output that’s similar to the following (the message order may differ somewhat):

Adding I0
Adding I1
Adding I2
Adding I3
Adding I4
Adding I5
Adding I6
Adding I7
Adding I8
Adding I9
taking: I0
taking: I1
taking: I2
taking: I3
taking: I4
taking: I5
taking: I6
taking: I7
taking: I8
taking: I9
emptying thread wants to exchange
Adding: NI0
Adding: NI1
Adding: NI2
Adding: NI3
Adding: NI4
Adding: NI5
Adding: NI6
Adding: NI7
Adding: NI8
Adding: NI9
filling thread wants to exchange
filling thread receives exchange
emptying thread receives exchange
Adding: NI10
taking: NI0
Adding: NI11
taking: NI1
Adding: NI12

Semaphores

A semaphore maintains a set of permits for restricting the number of threads that can access a limited resource. A thread attempting to acquire a permit when no permits are available blocks until some other thread releases a permit.

Image Note  Semaphores whose current values can be incremented past 1 are known as counting semaphores, whereas semaphores whose current values can be only 0 or 1 are known as binary semaphores or mutexes. In either case, the current value cannot be negative.

The java.util.concurrent.Semaphore class implements this synchronizer and conceptualizes a semaphore as an object maintaining a set of permits. You initialize a semaphore by invoking the Semaphore(int permits) constructor where permits specifies the number of available permits. The resulting semaphore’s fairness policy is set to false (unfair). Alternatively, you can invoke the Semaphore(int permits, boolean fair) constructor to also set the semaphore’s fairness setting to true (fair).

SEMAPHORES AND FAIRNESS

When the fairness setting is false, Semaphore makes no guarantees about the order in which threads acquire permits. In particular, barging is permitted; that is, a thread invoking acquire() can be allocated a permit ahead of a thread that has been waiting—logically the new thread places itself at the head of the queue of waiting threads. When fair is set to true, the semaphore guarantees that threads invoking any of the acquire() methods are selected to obtain permits in the order in which their invocation of those methods was processed (first-in-first-out; FIFO). Because FIFO ordering necessarily applies to specific internal points of execution within these methods, it’s possible for one thread to invoke acquire() before another thread but reach the ordering point after the other thread, and similarly upon return from the method. Also, the untimed tryAcquire() methods don’t honor the fairness setting; they’ll take any available permits.

Generally, semaphores used to control resource access should be initialized as fair, to ensure that no thread is starved out from accessing a resource. When using semaphores for other kinds of synchronization control, the throughput advantages of unfair ordering often outweigh fairness considerations.

Semaphore also offers the following methods:

  • void acquire(): Acquire a permit from this semaphore, blocking until one is available or the calling thread is interrupted. InterruptedException is thrown when it’s interrupted.
  • void acquire(int permits): Acquire permits permits from this semaphore, blocking until they are available or the calling thread is interrupted. InterruptedException is thrown when interrupted; IllegalArgumentException is thrown when permits is less than zero.
  • void acquireUninterruptibly(): Acquire a permit, blocking until one is available.
  • void acquireUninterruptibly(int permits): Acquire permits permits, blocking until they are all available. IllegalArgumentException is thrown when permits is less than zero.
  • int availablePermits(): Return the current number of available permits. This method is useful for debugging and testing.
  • int drainPermits(): Acquire and return a count of all permits that are immediately available.
  • int getQueueLength(): Return an estimate of the number of threads waiting to acquire permits. The returned value is only an estimate because the number of threads may change dynamically while this method traverses internal data structures. This method is designed for use in monitoring the system state and not for synchronization control.
  • boolean hasQueuedThreads(): Query whether any threads are waiting to acquire permits. Because cancellations may occur at any time, a true return value doesn’t guarantee that another thread will ever acquire permits. This method is designed mainly for use in monitoring the system state. It returns true when there may be other waiting threads.
  • boolean isFair(): Return the fairness setting (true for fair and false for unfair).
  • void release(): Release a permit, returning it to the semaphore. The number of available permits is increased by one. If any threads are trying to acquire a permit, one thread is selected and given the permit that was just released. That thread is reenabled for thread scheduling purposes.
  • void release(int permits): Release permits permits, returning them to the semaphore. The number of available permits is increased by permits. If any threads are trying to acquire permits, one is selected and given the permits that were just released. If the number of available permits satisfies that thread’s request, the thread is reenabled for thread scheduling purposes; otherwise, the thread will wait until sufficient permits are available. If there are permits available after this thread’s request has been satisfied, those permits are assigned to other threads trying to acquire permits. IllegalArgumentException is thrown when permits is less than zero.
  • String toString(): Return a string identifying this semaphore as well as its state. The state, in brackets, includes the string literal "Permits =" followed by the number of permits.
  • boolean tryAcquire(): Acquire a permit from this semaphore but only when one is available at the time of invocation. Return true when the permit was acquired. Otherwise, return immediately with value false.
  • boolean tryAcquire(int permits): Acquire permits permits from this semaphore but only when they are available at the time of invocation. Return true when the permits were acquired. Otherwise, return immediately with value false. IllegalArgumentException is thrown when permits is less than zero.
  • boolean tryAcquire(int permits, long timeout, TimeUnit unit): Like the previous method but the calling thread waits when permits permits aren’t available. The wait ends when the permits become available, the timeout expires, or the calling thread is interrupted, in which case InterruptedException is thrown.
  • boolean tryAcquire(long timeOut, TimeUnit unit): Like tryAcquire(int permits) but the calling thread waits until a permit is available. The wait ends when the permit becomes available, the timeout expires, or the calling thread is interrupted, in which case InterruptedException is thrown.

Listing 6-4 expands on the “controlling access to a pool of items” Semaphore example presented in Semaphore’s Javadoc.

Listing 6-4’s default main thread creates a resource pool, a runnable for repeatedly acquiring and putting back resources, and an array of executors. Each executor is told to execute the runnable.

Pool’s String getItem() and void putItem(String item) methods obtain and return string-based resources. Before obtaining an item in getItem(), the calling thread must acquire a permit from the semaphore, which guarantees that an item is available for use. When the thread finishes with the item, it calls putItem(String), which returns the item to the pool and then releases a permit to the semaphore, which lets another thread acquire that item.

No synchronization lock is held when acquire() is called because that would prevent an item from being returned to the pool. However, String getNextAvailableItem() and boolean markAsUnused(String item) are synchronized to maintain pool consistency. (The semaphore encapsulates the synchronization for restricting access to the pool separately from the synchronization that’s required for maintaining pool consistency.)

Compile Listing 6-4 as follows:

javac SemaphoreDemo.java

Run the resulting application as follows:

java SemaphoreDemo

You should observe a prefix of the output that’s similar to the following (message order may differ somewhat):

pool-1-thread-1 acquiring I0
pool-2-thread-1 acquiring I1
pool-3-thread-1 acquiring I2
pool-5-thread-1 acquiring I3
pool-7-thread-1 acquiring I4
pool-4-thread-1 acquiring I5
pool-6-thread-1 acquiring I6
pool-9-thread-1 acquiring I7
pool-8-thread-1 acquiring I8
pool-10-thread-1 acquiring I9
pool-9-thread-1 putting back I7
pool-2-thread-1 putting back I1
pool-11-thread-1 acquiring I7
pool-9-thread-1 acquiring I1
pool-8-thread-1 putting back I8
pool-2-thread-1 acquiring I8
pool-5-thread-1 putting back I3
pool-8-thread-1 acquiring I3
pool-4-thread-1 putting back I5
pool-5-thread-1 acquiring I5
pool-6-thread-1 putting back I6
pool-4-thread-1 acquiring I6
pool-1-thread-1 putting back I0
pool-6-thread-1 acquiring I0
pool-7-thread-1 putting back I4
pool-1-thread-1 acquiring I4
pool-10-thread-1 putting back I9
pool-7-thread-1 acquiring I9
pool-3-thread-1 putting back I2
pool-10-thread-1 acquiring I2

Phasers

A phaser is a more flexible cyclic barrier. Like a cyclic barrier, a phaser lets a group of threads wait on a barrier; these threads continue after the last thread arrives. A phaser also offers the equivalent of a barrier action. Unlike a cyclic barrier, which coordinates a fixed number of threads, a phaser can coordinate a variable number of threads, which can register at any time. To implement this capability, a phaser uses phases and phase numbers.

A phase is the phaser’s current state, and this state is identified by an integer-based phase number. When the last of the registered threads arrives at the phaser barrier, a phaser advances to the next phase and increments its phase number by 1.

The java.util.concurrent.Phaser class implements a phaser. Because this class is thoroughly described in its Javadoc, I’ll point out only a few constructor and methods:

  • The Phaser(int threads) constructor creates a phaser that initially coordinates nthreads threads (which have yet to arrive at the phaser barrier) and whose phase number is initially set to 0.
  • The int register() method adds a new unarrived thread to this phaser and returns the phase number to classify the arrival. This number is known as the arrival phase number.
  • The int arriveAndAwaitAdvance() method records arrival and waits for the phaser to advance (which happens after the other threads have arrived). It returns the phase number to which the arrival applies.
  • The int arriveAndDeregister() method arrives at this phaser and deregisters from it without waiting for others to arrive, reducing the number of threads required to advance in future phases.

Listing 6-5 provides a demonstration of the phaser synchronizer. It’s based on the first example in Phaser’s Javadoc.

Listing 6-5’s default main thread creates a pair of runnable tasks that each report the time (in milliseconds) at which its starts to run. It then runs these tasks after creating a Phaser instance and waiting for both tasks to arrive at the barrier.

Compile Listing 6-5 as follows:

javac PhaserDemo.java

Run the resulting application as follows:

java PhaserDemo

You should observe output that’s similar to the following (and the application should not end—press Ctrl+C or your keystroke equivalent to end the application):

pool-1-thread-1 running at 1445806012709
pool-2-thread-1 running at 1445806012712

As you would expect from countdown latch behavior, both threads start running at (in this case) the same time even though a thread may have been delayed by as much as 349 milliseconds thanks to the presence of Thread.sleep().

Comment out phaser.arriveAndAwaitAdvance(); // await the ... and you should now observe the threads starting at radically different times, as illustrated here:

pool-2-thread-1 running at 1445806212870
pool-1-thread-1 running at 1445806213013

EXERCISES

The following exercises are designed to test your understanding of Chapter 6’s content:

  1. Define synchronizer.
  2. Describe the behavior of a countdown latch.
  3. What happens when CountDownLatch’s void countDown() method is called and the count reaches zero?
  4. Describe the behavior of a cyclic barrier.
  5. True or false: CyclicBarrier’s int await() method returns -1 when the barrier is reset while any thread is waiting or when the barrier is broken when await() is invoked.
  6. Describe the behavior of an exchanger.
  7. What does Exchanger’s V exchange(V x) method accomplish?
  8. Describe the behavior of a semaphore.
  9. Identify the two kinds of semaphores.
  10. Describe the behavior of a phaser.
  11. What does Phaser’s int register() method return?
  12. Listing 3-2 (in Chapter 3) presented an enhanced PC application. Recreate this application where the synchronization is handled by the Semaphore class.

Summary

Java provides the synchronized keyword for synchronizing thread access to critical sections. Because it can be difficult to correctly write synchronized code that’s based on synchronized, high-level synchronizers are included in the concurrency utilities.

A countdown latch causes one or more threads to wait at a “gate” until another thread opens this gate, at which point these other threads can continue. It consists of a count and operations for “causing a thread to wait until the count reaches zero” and “decrementing the count.”

A cyclic barrier lets a set of threads wait for each other to reach a common barrier point. The barrier is cyclic because it can be reused after the waiting threads are released. This synchronizer is useful in applications involving a fixed-size party of threads that must occasionally wait for each other.

An exchanger provides a synchronization point where threads can swap objects. Each thread presents some object on entry to the exchanger’s exchange() method, matches with a partner thread, and receives its partner’s object on return.

A semaphore maintains a set of permits for restricting the number of threads that can access a limited resource. A thread attempting to acquire a permit when no permits are available blocks until some other thread releases a permit.

A phaser is a more flexible cyclic barrier. Like a cyclic barrier, a phaser lets a group of threads wait on a barrier; these threads continue after the last thread arrives. A phaser also offers the equivalent of a barrier action. Unlike a cyclic barrier, which coordinates a fixed number of threads, a phaser can coordinate a variable number of threads, which can register at any time. To implement this capability, a phaser uses phases and phase numbers.

Chapter 7 presents the Locking Framework.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.142.199.184