
CHAPTER 18

Blocking Queues and Synchronizers

As you saw in the previous chapter, the Java language has supported threads and concurrency from day one. You have seen the use of synchronization primitives such as synchronized and volatile, and the use of the Thread class. These concurrency primitives addressed the needs of the hardware available in 1995, when most commercially available machines had only a single-core CPU. Threads in those days provided asynchrony rather than concurrency, which was adequate for the demands of that time. Today, virtually every machine comes with a multicore processor, and your machine may even have multiple processors. Software developers therefore need to write applications that leverage parallelism to fully exploit the available hardware.

The question is, what kinds of applications can leverage this parallelism? In many situations, parallelism helps improve application performance. Consider the use case of an automated price quote system. Such a web application requires access to three databases: the pricing database, which provides the item’s base price; the customer database, which provides the discount structure for a customer; and the shipping database, which provides the basic shipping costs for various modes of shipping. The three database accesses and their computations are independent of each other, so they can run in parallel; in the end, the three results are aggregated to generate a final price quote, as depicted in Figure 18-1.


FIGURE 18-1.   Using parallelism in an automated price quote system

Let’s look at a few more use cases where parallelism can help. A game with multiple animations can benefit from running each animation on an independent processor, and a game can be more exciting when multiple events take place concurrently. An image-processing application, where each pixel in an image needs some manipulation such as reversing its color, benefits from splitting the pixels into smaller groups and assigning each group to an independent core. A healthcare application may run various tests concurrently to determine a patient’s diagnosis. An application that evaluates portfolios needs to communicate with various markets concurrently. The list goes on; you will find many areas where parallelism can improve application performance and provide a richer user experience.

Do not conclude from this discussion that every application must use the concurrency framework you will learn about in this and the next chapter. Inherently sequential operations cannot be split into parallel tasks and continue to run in a single thread. The applications we have looked at so far in this book are small and do not require parallelism. Many GUI applications also do not require explicit thread programming on the developer’s part.

In general, you use the concurrency framework to meet performance-scaling needs and to split heavy linear tasks into smaller tasks that can execute concurrently. CPU-intensive applications are usually the best candidates for exploiting parallelism.

You have seen the use of the interrupt, join, sleep, notify, and wait methods to implement concurrency in our programs. Using these methods is tedious and error-prone. J2SE 5.0 introduced a higher-level framework, the concurrency framework in the java.util.concurrent package, that makes handling threads easier and less error-prone. It helps you write applications that divide their work among multiple threads, and the VM in turn maps those threads onto the available hardware threads. In contrast, if you write a single-threaded application (say, a for loop that iterates through millions of records), the VM can use only one execution thread to do the work, and that thread maps to one hardware thread even if 127 other hardware threads are idle. The higher-level concurrency APIs can help you rewrite such code as a multithreaded application. A lot of art and science is involved in how different VM implementations map Java threads to hardware threads on different systems; the important point is that modern VMs on multicore and multiprocessor architectures perform this mapping in a way that greatly benefits multithreaded applications. So let’s start with the synchronization constructs introduced in J2SE 5.0 and extended in later releases.

In particular, you will learn the following constructs in this chapter:

• Blocking queues
• Countdown latches
• Semaphores
• Barriers
• Phasers
• Exchangers

Blocking Queues

Many synchronization problems can be formulated with the help of blocking queues. A blocking queue is a queue that blocks when a thread tries to dequeue from an empty queue or tries to enqueue an item into a full queue. A thread trying to remove an item from an empty queue blocks until some other thread puts an item into that queue. Similarly, a thread trying to insert an item into a full queue blocks until some other thread removes an item from that queue. The blocking may be implemented by using either polling or the wait-notify mechanism discussed in the last chapter. With polling, a reader thread periodically calls the get method on the queue until a message becomes available. With the wait-notify mechanism, the reader thread simply waits on the queue object, which notifies it whenever an item is available for reading.
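The wait-notify approach can be sketched as a small bounded queue built on synchronized, wait, and notifyAll. This is a minimal illustration under our own naming (SimpleBlockingQueue is hypothetical); the java.util.concurrent implementations use explicit locks and conditions rather than this code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical minimal bounded blocking queue built on wait/notifyAll.
// Illustrative only; java.util.concurrent uses locks and conditions instead.
class SimpleBlockingQueue<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;

    SimpleBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    // Blocks while the queue is full.
    public synchronized void put(E item) throws InterruptedException {
        if (item == null) throw new NullPointerException("null elements are not allowed");
        while (items.size() == capacity) {
            wait(); // releases the monitor until another thread calls notifyAll
        }
        items.addLast(item);
        notifyAll(); // wake up threads waiting in take()
    }

    // Blocks while the queue is empty.
    public synchronized E take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        E item = items.removeFirst();
        notifyAll(); // wake up threads waiting in put()
        return item;
    }

    public synchronized int size() {
        return items.size();
    }
}
```

The while loops re-check the condition after every wakeup, which guards against spurious wakeups and against another thread having consumed the change first. notifyAll is used rather than notify because producers and consumers wait on the same monitor.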

In real life, you come across many situations where blocking queues can be used. Consider buying a ticket at a railway booth. When you approach the counter, you find a line of people (that is, a queue). You stand at the end of the queue and wait for your turn. The person at the counter takes requests from the beginning of the queue, one request at a time. Serving a request requires a few synchronization operations that take place at the counter. Once one person is served, the next person in line gets an opportunity. When more customers arrive at the station, the queue grows dynamically, although at some railway stations space limitations may restrict its size. The ticket officer, who deals with only one customer at a time, becomes the key point of synchronization in this entire scenario. Both the consumer (the traveler) and the producer (the ticket issuer) are relieved of the duty of taking care of synchronized access to the shared resources (travel tickets). Without a disciplined queue in this scenario, all travelers would rush to get their hands on whatever tickets are available at the counter, and chaos would ensue.

Another real-life scenario where blocking queues come to the rescue is in the implementation of a chat server. In a typical chat application, multiple users communicate with each other concurrently. The application has many reader and writer threads. The reader threads take incoming messages and put them on the main message queue. The writer threads take messages off the queue, one at a time, and send them to the appropriate chat clients. The use of a queue effectively decouples the reading and writing processes. At any given time, if the queue is full, the reader thread that has a message to post will have to wait for a slot to be available in the queue. Similarly, if the queue is empty, all writer threads will have to wait until a message becomes available in the queue. Thus, even if the speeds (connection and data transfer speeds) at which the readers and writers operate vary greatly, the integrity of the data (messages) is never compromised.

Blocking queues are also useful in many other scenarios, such as a bulletin board service, a stock-trading system, and more. You will see many practical applications of blocking queues and their uses as you read further.

Characteristics of Blocking Queues

The typical characteristics of a blocking queue may be summarized as follows:

• Blocking queues provide methods to add items. These can be blocking calls, where the inserter of an item has to wait until space becomes available in the queue.
• The queues provide methods to remove items. These, too, can be blocking calls, where takers are made to wait until an item is put into an empty queue.
• Insertion and removal may optionally be given a timeout on their wait (as in the timed offer and poll methods), and the blocking operations are interruptible.
• The put and take operations are typically called from separate threads (producers and consumers), which isolates the two kinds of operations. Some implementations, such as LinkedBlockingQueue, use separate locks for insertion and removal, so a producer and a consumer do not block each other, which improves concurrency significantly.
• You cannot insert null elements into a blocking queue.
• A blocking queue may be bounded by capacity.
• The implementations are thread-safe. However, bulk operations such as addAll are not necessarily performed atomically, so addAll may fail after only some of the elements have been added.
• A blocking queue does not intrinsically support a “close” or “shutdown” operation to indicate that no more items will be added.
 

From these characteristics, one can easily deduce that the primary application of a blocking queue would lie in the producer-consumer scenario described in the previous chapter. These queues can safely be used with multiple producers and multiple consumers.

The BlockingQueue Interface

J2SE 5.0 introduced the blocking queue construct as part of the concurrency framework. The BlockingQueue interface defines several methods for operating on a queue. As the names suggest, you can use the add and remove methods to add an element to or remove an element from the queue. These methods throw an exception on failure: if a bounded queue is full, add throws an IllegalStateException, and if the queue is empty, remove throws a NoSuchElementException. Besides these obvious add and remove methods, the BlockingQueue interface provides the put and take methods, which block until the operation can succeed: put waits until an empty slot is available in the queue, and take waits until an element is added to an empty queue. BlockingQueue also provides the offer and poll methods. These do not block; instead, they return a special value (offer returns false and poll returns null) so that the caller can decide whether to wait or to proceed with other work. Both methods also have a timed version that waits up to a specified amount of time for the operation to succeed.
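The following sketch exercises each method family on an ArrayBlockingQueue with room for a single element, so that both the full and the empty cases come up. The class name QueueMethodFamilies is hypothetical; the queue methods are the standard BlockingQueue API.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Shows how each BlockingQueue method family reacts to a full or an empty queue.
class QueueMethodFamilies {
    static String demo() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // room for one element
        StringBuilder log = new StringBuilder();

        try {
            queue.offer(null); // blocking queues reject null elements
        } catch (NullPointerException e) {
            log.append("offer(null) threw NullPointerException; ");
        }

        queue.add("A"); // succeeds; the queue is now full
        try {
            queue.add("B"); // full queue: add throws
        } catch (IllegalStateException e) {
            log.append("add threw IllegalStateException; ");
        }
        // offer does not block; it reports failure by returning false
        log.append("offer returned ").append(queue.offer("B")).append("; ");
        // timed offer waits up to 50 ms for space, then gives up
        log.append("timed offer returned ")
           .append(queue.offer("B", 50, TimeUnit.MILLISECONDS)).append("; ");

        // take removes "A"; it would block if the queue were empty
        log.append("take returned ").append(queue.take()).append("; ");
        // poll does not block; it reports an empty queue by returning null
        log.append("poll returned ").append(queue.poll()).append("; ");
        // timed poll waits up to 50 ms for an element, then gives up
        log.append("timed poll returned ")
           .append(queue.poll(50, TimeUnit.MILLISECONDS)).append("; ");
        try {
            queue.remove(); // empty queue: remove throws
        } catch (NoSuchElementException e) {
            log.append("remove threw NoSuchElementException");
        }
        return log.toString();
    }
}
```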

Implementations of the BlockingQueue Interface

The java.util.concurrent package provides several implementations of the BlockingQueue interface. This section describes them.

ArrayBlockingQueue

The ArrayBlockingQueue class implements a bounded blocking queue backed by an array. It orders elements FIFO (first-in, first-out): new elements are inserted at the tail, and retrieval takes place at the head. The tail element is the one that has been on the queue for the shortest time, and the head element is the one that has been on the queue for the longest time. The capacity is fixed when the queue is constructed and cannot be increased later. An overloaded constructor also lets you specify a fairness policy for ordering waiting threads. If this policy is set to true, threads blocked on insertion or removal are granted access in FIFO order. By default, it is set to false, and the order in which waiting threads gain access is unspecified. Although the nonfair setting can cause starvation and less predictable behavior, it generally produces better throughput.


NOTE

In a situation where many threads compete for a shared resource, a greedy thread may acquire it and make it unavailable to those waiting for it for long periods of time. This situation is called starvation, where the waiting threads starve for the resource to become available.

Another variation of the constructor takes a Collection parameter, through which you can supply the initial data items to the queue.

LinkedBlockingQueue

The LinkedBlockingQueue, which is based on linked nodes, extends the concept of the array blocking queue by making the maximum capacity optional. You may still specify a capacity to prevent excessive expansion. If you do not specify one, the capacity defaults to Integer.MAX_VALUE. Not having a practical limit on the capacity can be advantageous because producers do not have to wait when consumers fall behind in picking up items. Like the array-based queue, an overloaded constructor accepts an initial feed from a collection. This queue generally has higher throughput than an array-based queue but less predictable performance. Most operations run in constant time; removing an arbitrary element with remove(Object) requires a traversal and runs in linear time.
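A short sketch of the capacity behavior just described; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

// Contrasts a LinkedBlockingQueue created without a capacity with a bounded one.
class LinkedQueueCapacity {
    static int defaultRemainingCapacity() {
        LinkedBlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        return unbounded.remainingCapacity(); // Integer.MAX_VALUE when no capacity is given
    }

    static boolean offerToFullBoundedQueue() {
        LinkedBlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(2); // at most 2 elements
        bounded.offer(1);
        bounded.offer(2);
        return bounded.offer(3); // false: the queue is already at capacity
    }

    static int sizeFromCollection() {
        // Initial contents supplied through the Collection constructor
        LinkedBlockingQueue<Integer> seeded = new LinkedBlockingQueue<>(Arrays.asList(10, 20, 30));
        return seeded.size();
    }
}
```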

PriorityBlockingQueue

A PriorityBlockingQueue is an unbounded queue that orders its elements by priority, using the same ordering rules as the PriorityQueue from Chapter 16. Priority is decided either by the natural ordering of the elements or by a comparator you supply. When the queue relies on natural ordering, an attempt to insert noncomparable objects results in a ClassCastException. Although the queue is logically unbounded, an add operation may still fail if system resources are exhausted. If elements have equal priorities, their relative order is not guaranteed; to enforce an order in such cases, provide your own classes or comparators that use secondary keys.
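As a sketch, the queue below orders strings by length through a supplied comparator rather than by their natural (alphabetical) order; the class name and sample data are illustrative.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Orders strings by length (shortest first) instead of alphabetically.
class PriorityQueueDemo {
    static List<String> drainInPriorityOrder() throws InterruptedException {
        PriorityBlockingQueue<String> queue =
                new PriorityBlockingQueue<>(11, Comparator.comparingInt(String::length));
        queue.put("banana");
        queue.put("fig");
        queue.put("cherry!");

        List<String> result = new ArrayList<>();
        while (!queue.isEmpty()) {
            result.add(queue.take()); // removes the current highest-priority (shortest) element
        }
        return result;
    }
}
```

Only removal (take, poll) follows priority order; the queue’s iterator does not traverse the elements in any particular order.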

DelayQueue

A DelayQueue is a specialized unbounded priority queue of Delayed elements, ordered by each element’s delay, that is, the time remaining before the element can be removed from the queue. An element must remain on the queue until its delay has expired; only then can it be retrieved. The head of the queue is the element whose delay expired furthest in the past. If no element’s delay has expired (in other words, all elements have a positive remaining delay), there is no head, and poll returns null because nothing can be retrieved. A peek operation on such a queue still returns, without removing it, the element that will expire next. The size method returns the total count of both expired and unexpired elements.
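Elements of a DelayQueue must implement the Delayed interface, which requires getDelay and compareTo. In the sketch below, DelayedMessage is a hypothetical element type whose delay is measured from its construction time; the delays of 100 and 200 milliseconds are arbitrary.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type that becomes available once its delay has elapsed.
class DelayedMessage implements Delayed {
    private final String text;
    private final long readyAtNanos; // absolute time at which the delay expires

    DelayedMessage(String text, long delay, TimeUnit unit) {
        this.text = text;
        this.readyAtNanos = System.nanoTime() + unit.toNanos(delay);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the element has expired
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Order by remaining delay so that the element expiring first is at the head
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    String text() {
        return text;
    }
}

class DelayQueueDemo {
    static String[] run() throws InterruptedException {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("later", 200, TimeUnit.MILLISECONDS));
        queue.put(new DelayedMessage("sooner", 100, TimeUnit.MILLISECONDS));

        DelayedMessage polled = queue.poll(); // null: no delay has expired yet
        String peeked = queue.peek().text();  // peek shows the element that expires next
        String first = queue.take().text();   // blocks until "sooner" expires
        String second = queue.take().text();  // blocks until "later" expires
        return new String[] { String.valueOf(polled), peeked, first, second };
    }
}
```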

SynchronousQueue

The SynchronousQueue implementation defines a blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. It is typically used in handoff designs, where an object running in one thread must sync up with an object running in another thread. A common application is a work-sharing system in which enough consumers are available to ensure that producers do not have to wait to hand over their tasks and, conversely, enough producers are available to ensure that consumers do not have to wait. A synchronous queue has no internal capacity, not even the capacity to hold a single object. Therefore, a peek operation always returns null, because nothing is ever stored on the queue; an element is available only at the moment another thread is trying to remove it. If you know Ada, you may recognize its rendezvous channels; a synchronous queue is quite similar to these. This class supports an optional fairness policy that, when set to true, grants waiting threads access in FIFO order.
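The handoff behavior shows up in a small sketch: with no taker waiting, a nonblocking offer fails, and a put completes only when another thread takes the element. HandoffDemo is a hypothetical name.

```java
import java.util.concurrent.SynchronousQueue;

// A SynchronousQueue stores nothing: each put waits for a matching take.
class HandoffDemo {
    static String run() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        boolean offeredWithoutTaker = queue.offer("task-0"); // false: nobody is waiting to take it
        int size = queue.size();      // always 0: the queue has no capacity
        Object peeked = queue.peek(); // always null for the same reason

        Thread producer = new Thread(() -> {
            try {
                queue.put("task-1"); // blocks until the main thread takes the element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        String received = queue.take(); // rendezvous with the producer
        producer.join();

        return offeredWithoutTaker + "," + size + "," + peeked + "," + received;
    }
}
```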

TransferQueue

Java SE 7 introduced a new interface called TransferQueue that extends BlockingQueue. As with a synchronous queue, a producer may wait for consumers to receive elements; for this, the transfer method was added to the new interface. The existing put method of BlockingQueue enqueues elements without waiting for receipt. The transfer method is a blocking call that returns only after a consumer has received the element. The tryTransfer method hands the element over only if a consumer is already waiting, and otherwise returns false immediately without enqueuing it; a timed version of tryTransfer waits up to a specified amount of time. The getWaitingConsumerCount method returns an estimate of the number of consumers waiting to receive elements via take or a timed poll, and the hasWaitingConsumer method tests whether at least one consumer is waiting. Like other blocking queues, a TransferQueue may be capacity bounded. The LinkedTransferQueue class provides a concrete, unbounded implementation of this interface.
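A sketch of the three transfer styles on a LinkedTransferQueue; TransferDemo is a hypothetical class, and the 50-millisecond timeout is arbitrary.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

class TransferDemo {
    static String run() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        // No consumer is waiting, so tryTransfer returns false and does not enqueue
        boolean immediate = queue.tryTransfer("first");
        int sizeAfterTry = queue.size();

        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // transfer blocks until the consumer has received the element
        queue.transfer("second");
        consumer.join();

        // With no consumer, the timed tryTransfer waits 50 ms, then returns false
        boolean timed = queue.tryTransfer("third", 50, TimeUnit.MILLISECONDS);
        return immediate + "," + sizeAfterTry + "," + timed + "," + queue.size();
    }
}
```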

We will now look at the use of blocking queues with the help of an example.

Stock-trading System

Consider the operation of a stock market, where millions of trades take place during the hours of operation. Furthermore, consider all the traders placing buy and sell orders on IBM or Microsoft stock. We will create a blocking queue that allows these traders to add sell orders to the queue as well as to pick up pending orders. At any given time, if the queue is full, a seller has to wait for a slot to become free. Similarly, a buyer has to wait until a sell order is available in the queue. To simplify the situation, let’s say that a buyer must always purchase the full quantity of stock available in a sell order, and that no partial purchases or over-purchases can be made. This is illustrated in Figure 18-2.


FIGURE 18-2.   Using blocking queues in a stock-trading system

Multiple sellers put their sell orders in the queue, which is created and maintained by the stock exchange server. When a buyer comes in, he picks up the order at the head of the queue and buys whatever quantity that sell order offers. This requires synchronized access to the database and other resources on the server, and the updates to these resources are made by the underlying server code. Neither buyers nor sellers are responsible for implementing any synchronization mechanism in their code; this is the producer-consumer pattern that a blocking queue supports. The stock exchange server would create and operate such a queue for each scrip traded on the exchange.

The implementation of this scenario is given in Listing 18-1.


Listing 18-1   Stock Exchange Trade Server Based on Blocking Queues


In the main function, we create an instance of LinkedBlockingQueue.

Because we create the LinkedBlockingQueue without specifying a capacity (so the capacity defaults to Integer.MAX_VALUE), traders can place practically any number of orders in the queue. Had we used an ArrayBlockingQueue instead, we would have been restricted to a fixed number of pending orders on each scrip.

Next, we create an instance of Seller, which is a Runnable class (its implementation is discussed later).

We create 100 trader threads that put their sell orders in the queue, each order having a random quantity. We thus create an array of 100 threads and schedule them for execution.

Likewise, we create 100 buyers who pick up the pending sell orders.

Once the producer and consumer threads are created, they keep running forever, placing orders on and retrieving orders from the queue and blocking periodically, depending on the load at the given time. We need some means of terminating the application, so the main thread waits for the user to press the ENTER key.

When this happens, the main function interrupts all the running producer and consumer threads, requesting that they abort and quit.

Now, let’s look at the implementation of Seller, which implements the Runnable interface and provides a constructor that takes our OrderQueue as its parameter. The run method sets up an infinite loop.

In each iteration, we generate a random number for the trade quantity.

The order is placed in the queue via a call to its put method. This is a blocking call: if the queue was created with a limited capacity and is full, the thread waits for an empty slot.

For the user’s benefit, we print the sell order details, along with the details of the thread that placed the order, to the console.

The run method runs indefinitely, placing orders in the queue periodically. Another thread can stop it by calling its interrupt method, which is what the main thread does when it wants to stop the trading. Calling interrupt sets the thread’s interrupt status, so the interruptible blocking call the thread is in, or enters next (such as put), throws an InterruptedException. The exception handler simply sets the shutdownRequest flag to true, which terminates the infinite loop in the run method.
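A minimal sketch of a Seller consistent with the walkthrough above follows; it is not the code of Listing 18-1. Representing an order as a plain Integer quantity, the message text, the quantity range, and the 10-millisecond pause between orders are all assumptions; the shutdownRequest flag follows the description.

```java
import java.util.Random;
import java.util.concurrent.BlockingQueue;

// Hypothetical Seller: a producer that places random-quantity sell orders until interrupted.
class Seller implements Runnable {
    private final BlockingQueue<Integer> orderQueue;
    private final Random random = new Random();
    // volatile because one Seller instance may be shared by many threads
    private volatile boolean shutdownRequest = false;

    Seller(BlockingQueue<Integer> orderQueue) {
        this.orderQueue = orderQueue;
    }

    @Override
    public void run() {
        while (!shutdownRequest) {
            try {
                int quantity = random.nextInt(1000) + 1; // assumed range: 1 to 1000 shares
                orderQueue.put(quantity);                // blocks only if a bounded queue is full
                System.out.println(Thread.currentThread().getName()
                        + " placed a sell order for " + quantity + " shares");
                Thread.sleep(10);                        // assumed pause between orders
            } catch (InterruptedException e) {
                shutdownRequest = true;                  // interrupt from main ends the loop
            }
        }
    }
}
```

Because java.util.Random is thread-safe, the single Random field can be shared by all threads running the same Seller instance.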

Finally, let’s look at the implementation of Buyer, which is mostly similar to Seller except for its run method. The run method picks up a pending trade from the head of the queue by calling its take method.

Note that take blocks if no orders are available in the queue. Once again, we print the order and thread details for the user.
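A matching Buyer sketch, under the same assumptions as the Seller sketch (an order is an Integer quantity, and the message text is illustrative); it is not the code of Listing 18-1.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical Buyer: a consumer that takes pending sell orders until interrupted.
class Buyer implements Runnable {
    private final BlockingQueue<Integer> orderQueue;
    private volatile boolean shutdownRequest = false;

    Buyer(BlockingQueue<Integer> orderQueue) {
        this.orderQueue = orderQueue;
    }

    @Override
    public void run() {
        while (!shutdownRequest) {
            try {
                Integer quantity = orderQueue.take(); // blocks while no orders are pending
                System.out.println(Thread.currentThread().getName()
                        + " bought " + quantity + " shares");
            } catch (InterruptedException e) {
                shutdownRequest = true; // interrupt from main ends the loop
            }
        }
    }
}
```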

Note that now we need synchronized access to the server resources to perform atomic updates on the server. This now becomes the responsibility of the server implementation, and the seller and buyer threads do not have to worry about synchronization issues.

Some typical program output is shown here:


The program terminates when the user hits the ENTER key on the keyboard.

If we had not used the blocking queues in this program, there would be contention in accessing the trades placed on an unsynchronized queue. Everybody would try grabbing an order selling a stock below the current market price. Multiple traders would pick up the same order, and chaos and fights among traders would break out. Because the blocking queue ensures synchronized access to the queue, the integrity of trades is never compromised.

We used the LinkedBlockingQueue in the preceding example; however, we could have used a priority-based queue so that the trades are automatically arranged by price, with the best-priced order (the highest bid or the lowest offer) always at the head of the queue. To use a priority-based queue, we would need to provide an appropriate comparator.
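One possible comparator for a queue of sell orders is sketched below. SellOrder, PricedOrderBook, and the tie-breaking rule (larger quantity first at an equal price) are hypothetical choices, not part of the book’s example.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical sell order: a quantity of shares offered at a price per share.
final class SellOrder {
    final String seller;
    final int quantity;
    final double price;

    SellOrder(String seller, int quantity, double price) {
        this.seller = seller;
        this.quantity = quantity;
        this.price = price;
    }
}

// Hypothetical helper that builds a queue whose head is always the lowest offer.
class PricedOrderBook {
    static final Comparator<SellOrder> BEST_OFFER_FIRST =
            Comparator.<SellOrder>comparingDouble(order -> order.price)            // lowest price first
                      .thenComparing((a, b) -> Integer.compare(b.quantity, a.quantity)); // then larger quantity

    static PriorityBlockingQueue<SellOrder> newOrderQueue() {
        return new PriorityBlockingQueue<>(11, BEST_OFFER_FIRST); // 11 is only the initial array size
    }
}
```

The secondary key matters because, as noted earlier, a priority queue does not guarantee any order among elements of equal priority.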

In the next section, we discuss the use of the LinkedTransferQueue class, introduced in Java SE 7.

The LinkedTransferQueue example

We already discussed this class and some of its methods earlier in the chapter, so in this section we jump directly into its application. We will develop a “lucky number” generator that produces a random number and hands it over to a waiting customer. We create 10 customer (consumer) threads, started two seconds apart. Whenever a consumer is waiting, the lucky number generator thread (our producer) produces a number and hands it over to that consumer. Once a consumer receives its lucky number, it quits, so each consumer gets exactly one number. The lucky number generator, along with the 10 consumers, is given in Listing 18-2.


Listing 18-2   A Lucky Number Generator Based on TransferQueue


In the main method, we create an instance of TransferQueue that stores String objects.

The method then creates a single producer thread and starts it.

The producer thread takes the queue we just created as its parameter. After creating the producer thread, the program creates 10 consumer threads, pausing two seconds between successive threads.

The Producer class, which implements Runnable, stores the TransferQueue object received in its constructor in a private field. Its run method runs an infinite loop that checks whether a consumer is waiting for a lucky number; if so, it creates a lucky number and transfers it to the waiting consumer.

The producer thread then sleeps for one second before checking for another waiting consumer.

The Consumer class also implements Runnable; in its run method, it receives the lucky number by calling the take method on the TransferQueue object.

Once a lucky number is obtained, the consumer thread terminates. Some typical program output is shown here:

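A compact sketch of the producer and consumer just described, assuming the lucky number is a random integer from 0 to 99 rendered as a String. LuckyNumberProducer, LuckyNumberConsumer, and the received field are hypothetical names, not the code of Listing 18-2.

```java
import java.util.Random;
import java.util.concurrent.TransferQueue;

// Hypothetical producer: hands a lucky number to each waiting consumer.
class LuckyNumberProducer implements Runnable {
    private final TransferQueue<String> queue;
    private final Random random = new Random();

    LuckyNumberProducer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                if (queue.hasWaitingConsumer()) {
                    String luckyNumber = String.valueOf(random.nextInt(100)); // assumed range 0..99
                    queue.transfer(luckyNumber); // hands the number directly to a waiting consumer
                }
                Thread.sleep(1000); // check again after one second, as described in the text
            }
        } catch (InterruptedException e) {
            // interrupted while sleeping or transferring: stop producing
        }
    }
}

// Hypothetical consumer: waits for exactly one lucky number, then finishes.
class LuckyNumberConsumer implements Runnable {
    private final TransferQueue<String> queue;
    volatile String received;

    LuckyNumberConsumer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            received = queue.take(); // waits until the producer transfers a number
            System.out.println(Thread.currentThread().getName() + " got lucky number " + received);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```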

Synchronizers

J2SE 5.0 added several synchronization constructs to the platform as part of the java.util.concurrent package. The package includes classes that provide semaphores, barriers, latches, and exchangers. We will now study the use of these classes for synchronization.

Semaphores

Semaphores are useful when you want to control how many entities can access a shared resource at the same time. A typical application of semaphores is found in server applications where multiple threads compete for resources that are limited in number. For example, a busy website may receive many concurrent requests for data stored in an internal database on the server. Because database connections are expensive in terms of creation time and resources held, you might create only a limited number of connections and keep them in a pool. When a web request arrives to access the data, the server application hands it one of the connections, provided one is available in the pool at that instant. If not, the request is made to wait until one of the existing users returns a connection to the pool. In some situations, the server application may also decide to increase the pool size if demand is high. Semaphores help you achieve this functionality, as you will see shortly.

A semaphore lets n entities share m resources in a synchronized way, so that at most m of them use a resource at any one time. Contrast this with the synchronized keyword, which allows access by only a single entity at a time. To explain this better, let’s look at a practical situation (see Figure 18-3).


FIGURE 18-3.   A banking scenario that implements semaphores

A bank has multiple tellers to serve its customers; think of the number of tellers as the semaphore count. When a customer walks into the bank, he acquires a permit to engage a teller, and the semaphore count is reduced by one. When all the tellers are engaged, the count becomes zero. A newly arriving customer then cannot obtain a permit and has to wait until a teller becomes available. When a customer finishes his business, he releases his permit, increasing the semaphore count by one, and a waiting customer can now get a permit. In this way, multiple customers get synchronized access to the bank’s resources.

The Semaphore class implements semaphores in Java. When you instantiate this class, you specify the number of permits. To acquire a permit, you call the acquire method; you may acquire more than one permit by passing the number as a parameter. acquire is a blocking call: it waits until the requested permits are available or the waiting thread is interrupted. If you do not want the thread to block, use the tryAcquire method, which returns false if the requested number of permits is not available. An overloaded version of tryAcquire takes a timeout and a time unit that specify how long the thread waits for the permit to become available. When a thread is done with the resource, it returns its permit by calling release.
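The permit operations just described can be walked through with a two-permit semaphore; SemaphoreBasics is a hypothetical name.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Walks a two-permit semaphore through acquire, tryAcquire, and release.
class SemaphoreBasics {
    static String run() throws InterruptedException {
        Semaphore tellers = new Semaphore(2); // two permits; nonfair by default

        tellers.acquire();                                   // takes one permit, returns at once
        boolean second = tellers.tryAcquire();               // true: one permit was still free
        boolean third = tellers.tryAcquire(10, TimeUnit.MILLISECONDS); // false: waits 10 ms, gives up
        int availableWhileBusy = tellers.availablePermits(); // 0: both permits are held

        tellers.release(2);                                  // return both permits
        boolean pair = tellers.tryAcquire(2);                // true: acquires two permits at once
        tellers.release(2);

        return second + "," + third + "," + availableWhileBusy + "," + pair + ","
                + tellers.availablePermits();
    }
}
```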

The implementation of our bank teller example is given in Listing 18-3.


Listing 18-3   Bank Teller Implementation Based on Semaphores


The Bank class declares two static variables. The COUNT variable controls the number of customers who will visit the bank. The semaphore variable holds a reference to the created Semaphore.

The first parameter to the Semaphore constructor specifies the number of permits. In this case, we set it to 2, indicating that the bank has only two tellers. The second parameter specifies the fairness setting. When it is set to true, permits are granted in FIFO order: first come, first served. Because maintaining a queue of waiting customers involves overhead, the fair setting is not always the most efficient choice. As with the fairness policy of ArrayBlockingQueue discussed earlier, the default setting is false, in which case a customer who arrives later may obtain a permit earlier.

In the main method, we set up a loop to process requests from all 100 customers.

For each customer, we create an anonymous thread.

In the overridden run method, the thread tries to acquire a permit by calling the tryAcquire method of the Semaphore class.

The first parameter specifies the amount of time to wait, and the second parameter specifies the unit of time. The TimeUnit enum defines constants for units ranging from nanoseconds to days. In our case, the thread waits up to 10 milliseconds to acquire a permit. If a permit is not obtained within this time, the thread simply gives up and continues with its work; in other words, the customer stops waiting and walks out of the bank. If the permit is obtained, the thread requests the service by calling the getService method on the Teller class, which is discussed later.

The getService method takes a parameter that identifies the current iteration count and thus the executing thread. After the thread is done with the teller, it releases the permit by calling the release method on the semaphore object.

Note that we have created an instance of an anonymous Thread class. To start this, we call the start method on the created instance.

Now, let’s look at the Teller class. The Teller class defines one static method called getService.

In the method, we print the id for the currently executing thread.

We make the thread sleep for a random amount of time.

The sleep time varies, anywhere from 0 to 10 milliseconds. This is done to simulate the condition that each customer will take a variable amount of time with the agent. Due to this, when you run the program multiple times, you will find that the number of customers served on each run differs from the previous runs.
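Putting the pieces together, a sketch of the bank example might look like the following. The served and walkedAway counters, the choice of a fair semaphore, and the try/finally around release are assumptions rather than details confirmed by Listing 18-3; BankSketch is a hypothetical name.

```java
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reconstruction of the bank example; counters are added for illustration.
class BankSketch {
    static final int COUNT = 100;                              // number of customers
    static final Semaphore semaphore = new Semaphore(2, true); // two tellers, fair ordering (assumed)
    static final AtomicInteger served = new AtomicInteger();
    static final AtomicInteger walkedAway = new AtomicInteger();

    static void runCustomers() throws InterruptedException {
        Thread[] customers = new Thread[COUNT];
        for (int i = 0; i < COUNT; i++) {
            final int customerId = i;
            customers[i] = new Thread() { // anonymous Thread subclass, as in the text
                @Override
                public void run() {
                    try {
                        if (semaphore.tryAcquire(10, TimeUnit.MILLISECONDS)) {
                            try {
                                Teller.getService(customerId);
                                served.incrementAndGet();
                            } finally {
                                semaphore.release(); // always return the permit
                            }
                        } else {
                            walkedAway.incrementAndGet(); // waited 10 ms, then gave up
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            };
            customers[i].start();
        }
        for (Thread customer : customers) {
            customer.join();
        }
    }
}

class Teller {
    private static final Random random = new Random();

    static void getService(int customerId) throws InterruptedException {
        System.out.println("Serving customer " + customerId);
        Thread.sleep(random.nextInt(11)); // 0 to 10 ms with the teller
    }
}
```

Releasing the permit in a finally block returns it even if getService throws. Note also that the timed tryAcquire honors the fairness setting, whereas the untimed tryAcquire() takes an available permit regardless of other waiting threads.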

The output from two sample runs is shown here:

Run 1:


Run 2:


In the second run, the customers evidently were too impatient with the speed of the tellers and quickly left the bank. Now, try increasing the customer wait time from 10 milliseconds to a higher value. You will find that more customers are served, because the customers are now willing to spend more time in the bank waiting for a teller to become available. You can also experiment by changing the number of permits to allocate more tellers.

Barriers

A barrier is like a common crossover point, where everybody waits to join up with the others in the team before crossing over. This is illustrated in Figure 18-4.


FIGURE 18-4.   Illustrating a barrier type of synchronization construct

All members of the team decide to meet at one end of the bridge before crossing over. Once a member arrives, he has to wait for the others to reach the same junction. When everyone has reached the barrier, the program can continue with the next task. Once a barrier (that is, a meeting point) has been established, it can be reused to synchronize later actions.

Java provides the barrier implementation in the CyclicBarrier class. Using this class, a set of threads is made to wait for each other to reach a common barrier point. This is typically used in programs where a fixed-sized party of threads must occasionally wait for each other before they can all proceed further. For example, in horse racing, all horses must reach the starting gate before the race starts. This barrier is considered cyclic because it can be reused after all waiting threads are released.

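The CyclicBarrier constructor takes a parameter that specifies the number of members (parties) in the team. Each member, after completing its work, arrives at the barrier and waits by calling the barrier's await method. When all the members have arrived, the barrier trips: all waiting threads are released and the program can proceed. (In the CyclicBarrier API, a "broken" barrier means something different: one whose wait failed, for example because a waiting thread was interrupted.) Another constructor takes an additional Runnable parameter, the barrier action, which is executed once each time the barrier trips, by the last thread to arrive, before the other threads are released.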
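A minimal sketch of the two-argument constructor and of the cyclic reuse, with hypothetical names (BarrierReuseSketch, run): a party of threads meets at the same barrier several times, so the barrier action reports one trip per round. A lambda stands in for the Runnable.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a party of threads meets at the same barrier several times.
class BarrierReuseSketch {

    // Runs `parties` threads through `rounds` barrier points and returns
    // the number of times the barrier action ran.
    static int run(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive,
        // before any waiting thread is released.
        CyclicBarrier barrier = new CyclicBarrier(parties,
                () -> System.out.println("Barrier tripped: generation " + trips.incrementAndGet()));
        Thread[] team = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            int member = i;
            team[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        System.out.println("Member " + member + " waiting, round " + round);
                        barrier.await(); // blocks until all parties have arrived
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    System.err.println("Member " + member + ": barrier broken");
                }
            });
            team[i].start();
        }
        for (Thread t : team) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("Total trips: " + run(3, 2));
    }
}
```

If a waiting thread is interrupted or times out, or the barrier is reset, the other waiters receive a BrokenBarrierException instead of a normal release; the sketch simply reports that case.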

To illustrate the use of this class, we will write a program that computes ln (1−x), where ln is the natural logarithm. The definition is given here:

image

image
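For reference, the definition is the standard Maclaurin series of ln(1−x), valid for −1 ≤ x < 1, in which the k-th term is −x^k/k:

```latex
\ln(1-x) \;=\; -\sum_{k=1}^{\infty} \frac{x^{k}}{k}
         \;=\; -x - \frac{x^{2}}{2} - \frac{x^{3}}{3} - \cdots,
         \qquad -1 \le x < 1
```

As a quick check, for x = 0.5 the first three terms give −0.5 − 0.125 − 0.0417 ≈ −0.6667, already close to ln 0.5 ≈ −0.6931; each additional term narrows the gap.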

We will create one thread for each term of this series; thus, to compute 10 terms, we create 10 threads. Each thread stores its result in a common array and waits at a barrier for the others to finish. When all of them have finished, the barrier action computes the sum of the array and displays the result to the user. The implementation is given in Listing 18-4.

image

Listing 18-4   Natural Logarithm Calculator Demonstrating a Cyclic Barrier

image
 
image
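The following sketch condenses the same design into a single method, assuming the term formula −x^k/k from the series above. The names numberOfTerms and termArray follow the text; the class LnSeriesSketch and its compute method are hypothetical, and a lambda replaces the anonymous barrier-action class.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical sketch: one thread per term of ln(1 - x) = -(x + x^2/2 + x^3/3 + ...).
class LnSeriesSketch {

    static double compute(double x, int numberOfTerms) {
        double[] termArray = new double[numberOfTerms];
        double[] sum = new double[1]; // written by the barrier action
        // Summing happens once, after every term thread has reached the barrier.
        CyclicBarrier barrier = new CyclicBarrier(numberOfTerms, () -> {
            double total = 0;
            for (double term : termArray) {
                total += term;
            }
            sum[0] = total;
        });
        Thread[] workers = new Thread[numberOfTerms];
        for (int i = 0; i < numberOfTerms; i++) {
            int k = i + 1; // term index, starting at 1
            workers[i] = new Thread(() -> {
                termArray[k - 1] = -Math.pow(x, k) / k; // k-th term: -x^k / k
                try {
                    barrier.await(); // wait for the other term threads
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    System.err.println("Term " + k + ": barrier broken");
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for term threads", e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        double x = 0.5;
        System.out.println("Series (10 terms): " + compute(x, 10));
        System.out.println("Math.log(1 - x):   " + Math.log(1 - x));
    }
}
```

Summing inside the barrier action rather than in main is safe because CyclicBarrier guarantees that each thread's actions before await happen-before the barrier action, so the action sees every stored term.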

The NaturalLogCalc class declares a few static variables:

image

image

The numberOfTerms decides the number of terms in the series to compute. The termArray provides an array to store the result of each computation. The static variable x sets the value for x in the expression ln (1−x). The main method creates an instance of the CyclicBarrier class:

image

image

The first parameter to the constructor specifies the number of parties. This is set to the number of terms we want to compute, which equals the number of threads we will create for computing the terms. The second parameter is a Runnable, the barrier action, which is executed when the barrier trips. We use an anonymous class here. In its overridden run method, the program computes the sum of all elements of termArray and prints the result to the user.

The main program now proceeds to create the threads for computing individual terms:

image

image

The TermCalc is a Runnable class. Let’s look at its implementation:

image

image

The class is declared private because it is used only within the enclosing class. It is also declared static so that the static main method can create instances of it without an instance of the enclosing class. In the run method, we compute the term value and store it in termArray at the appropriate index:

image

image

After the computation is done, the thread waits at the barrier:

image

image

When everybody has arrived at the barrier, termArray is completely filled, so the barrier action can compute the sum of all the terms. Each thread then returns from its await call and runs to completion. When you run the program, you will see output similar to the following:

image

image

Note that every time you run the program, you will see a different order of execution for the threads. Try changing the value of x and the number of terms to compute and observe the execution results.

Countdown Latches

This is yet another synchronization tool introduced in J2SE 5.0. In some ways, a countdown latch is like a generalized version of the barrier from the previous section. It provides methods that allow threads to wait for a condition; the key difference is that at a barrier, threads wait for other threads to arrive, whereas at a latch, threads wait for a count to reach zero. You specify this count in the constructor. When the count reaches zero, all waiting threads are released.

A latch starts out closed; that is, its condition is initially false. Once the count reaches zero, the latch opens and stays open: it is never reset, and the condition remains true for the life of the latch. We will now look at the concept of a countdown latch in the context of the stock-trading server discussed previously.
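The one-way behavior can be checked with a small sketch; the class LatchSketch and its methods are hypothetical. CountDownLatch offers no reset method, so when a resettable count is needed, a CyclicBarrier is the usual alternative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a latch's one-way life cycle.
class LatchSketch {

    // Creates a latch with `count`, calls countDown() `calls` times, and reports
    // whether await() would now pass without blocking.
    static boolean isOpen(int count, int calls) {
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < calls; i++) {
            latch.countDown(); // calls made after the count reaches zero have no effect
        }
        try {
            return latch.await(0, TimeUnit.MILLISECONDS); // zero timeout: never blocks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns the remaining count after `calls` countDown() calls.
    static long remaining(int count, int calls) {
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < calls; i++) {
            latch.countDown();
        }
        return latch.getCount(); // never drops below zero
    }

    public static void main(String[] args) {
        System.out.println("count 2, 1 call:  open=" + isOpen(2, 1)); // false
        System.out.println("count 2, 2 calls: open=" + isOpen(2, 2)); // true
        System.out.println("count 2, 5 calls: open=" + isOpen(2, 5)
                + ", remaining=" + remaining(2, 5)); // true, 0
    }
}
```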

In our earlier stock-trading example, which was discussed in the context of blocking queues, we created several buyer and seller threads. Ideally, all these traders should have an equal chance when the first order is placed on the exchange. However, because our program creates threads in a definite chronological order, the threads created first are likely to get an opportunity to execute before those created later. To avoid this situation, we can make all the threads hold their orders until we give them a go-ahead signal. This is exactly the purpose of the CountDownLatch class. We specify the count in its constructor, and the countDown method decrements the count by one. Each thread, once it is ready, waits for the count to reach zero by calling the await method. Thus, we create all the threads, making each one wait in its run method. After all the threads have been created, we bring the count down to zero, and the threads proceed with their execution. Which thread gets the CPU first is then decided solely by the thread scheduler.

Likewise, when the stock exchange closes its operations at the end of the trading day, it may need to confirm that all running threads have completed fully before proceeding with further cleanup. This can be achieved by setting the count to 200 (note that we have created a combined total of 200 threads of producers and consumers). We will ask each thread to decrement the count by calling the countDown method at the end of its run method. We make the main thread wait until this count becomes zero. Thus, the main thread cannot proceed until all threads signal their successful completion of the run method. Once this happens, the main thread will continue with any further cleanup operations.
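The start-signal and stop-signal arrangement can be sketched as follows. This is a simplified stand-in, not Listing 18-5: the class TradingDaySketch is hypothetical, and incrementing a counter stands in for placing an order.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the start-signal / stop-signal pattern.
class TradingDaySketch {

    static int runTradingDay(int traders) {
        CountDownLatch startSignal = new CountDownLatch(1);      // one "opening bell"
        CountDownLatch stopSignal = new CountDownLatch(traders); // one count per trader
        AtomicInteger trades = new AtomicInteger();
        for (int i = 0; i < traders; i++) {
            new Thread(() -> {
                try {
                    startSignal.await();      // every trader waits for the bell
                    trades.incrementAndGet(); // stand-in for placing an order
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    stopSignal.countDown();   // report completion, even on failure
                }
            }).start();
        }
        startSignal.countDown(); // all threads exist: release them together
        try {
            stopSignal.await();  // main waits until every trader has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trades.get();
    }

    public static void main(String[] args) {
        System.out.println("Trades completed: " + runTradingDay(200));
    }
}
```

One deliberate difference from the text: countDown on the stop latch sits in a finally block, so the main thread cannot hang if a trader's run method throws. Note also that a thread reaching await after the start signal has fired passes straight through, because the latch stays open.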

The modified program that implements these countdown latches is given in Listing 18-5.

image

Listing 18-5   Enhanced Stock Exchange Using a Countdown Latch

image
 
image
 
image

In the main method, we create two latches as follows:

image

image

Note that the count for the start signal is set to 1 and that for the stop signal is set to 200. Thus, a single countdown operation on the start signal releases all the threads waiting on it. The stop signal count has to go from 200 to 0 before the waiting main thread can proceed; each buyer and seller thread decrements it by one. We pass references to these latches to the Buyer and Seller constructors:

image

image

At the beginning of the run method of both Buyer and Seller, we add the following code:

image

image

Thus, after the thread is created, it is made to wait on the startLatch for a go-ahead signal. In the main method, after we have created all the threads, we execute the following statement:

image

image

The countDown method brings the start latch count to zero, and all the created threads can now begin their execution. These ready-to-run threads acquire the CPU according to the scheduling policy of the underlying platform. The point is that every trader gets an equal chance to make the first trade.

At the end of the run method of both the Buyer and Seller classes, we add the following statement:

image

image

This decrements the stop count by one. In the main method, we wait on this count to become zero by executing the await method:

image

image

The program proceeds only when the stopSignal count reaches zero. By this time, every trader has had a chance to complete its pending trade, and the main thread can proceed with the rest of the cleanup.

Phaser

Java SE 7 introduced a reusable synchronization barrier called Phaser, which is similar in functionality to CyclicBarrier and CountDownLatch but supports more flexible usage. The barriers you have studied so far work with a fixed number of parties. A phaser works with a variable number of parties: you can register a new party at any time, and a registered party can deregister itself upon arrival at the barrier. So the number of parties registered to synchronize on a phaser may vary over time. Like a CyclicBarrier, a phaser can be reused: after a party has arrived and the phase has advanced, the same party can arrive again and wait for the next phase. Thus, a phaser goes through many generations (phases). Once all the parties registered for a particular phase arrive at the phaser, the phase number is advanced. The phase number starts at zero and, after reaching Integer.MAX_VALUE, wraps around to zero again. On a phase change, an optional action may be performed by overriding the onAdvance method. This method can also be used to terminate the phaser; once the phaser is terminated, all synchronization methods return immediately without waiting, and attempts to register new parties have no effect.
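The following sketch illustrates these points with hypothetical method names: a phaser whose onAdvance override terminates it after a fixed number of phases, parties joining and leaving mid-phase, and a registration attempt after termination. By default, onAdvance returns true (terminating the phaser) only when deregistrations drop the number of registered parties to zero.

```java
import java.util.concurrent.Phaser;

// Hypothetical sketch: phase numbers, dynamic parties, and termination via onAdvance.
class PhaserLifecycleSketch {

    // Drives a phaser whose only party is the calling thread until onAdvance
    // terminates it; returns the number of phases completed.
    static int runPhases(int maxPhases) {
        Phaser phaser = new Phaser(1) { // one registered party: this thread
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("Phase " + phase + " done, parties=" + registeredParties);
                return phase + 1 >= maxPhases; // returning true terminates the phaser
            }
        };
        int completed = 0;
        while (!phaser.isTerminated()) {
            phaser.arriveAndAwaitAdvance(); // sole party, so the phase advances at once
            completed++;
        }
        return completed;
    }

    // Shows that the party count can change while a phase is in progress.
    static int partiesAfterJoinAndLeave() {
        Phaser phaser = new Phaser(1);
        phaser.register();            // a second party joins: 2 registered
        phaser.register();            // a third party joins: 3 registered
        phaser.arriveAndDeregister(); // one party arrives and leaves: 2 registered
        return phaser.getRegisteredParties();
    }

    // Once terminated, register() has no effect and returns a negative value.
    static int registerAfterTermination() {
        Phaser phaser = new Phaser(1);
        phaser.forceTermination();
        return phaser.register();
    }

    public static void main(String[] args) {
        System.out.println("Phases completed: " + runPhases(3));
        System.out.println("Parties after join/leave: " + partiesAfterJoinAndLeave());
        System.out.println("register() after termination: " + registerAfterTermination());
    }
}
```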

Another important feature of a phaser is that it may be tiered: phasers can be arranged in tree structures to reduce contention. A smaller group has fewer parties contending for synchronization, so arranging a large number of parties into smaller groups reduces contention. Although tiering can greatly increase total throughput, it incurs a higher per-operation overhead. Finally, a phaser can be monitored. While only registered parties should call its synchronization methods, any object can query its current state: the number of registered parties, and the number of parties that have and have not yet arrived at the current phase.
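A small sketch of tiering and monitoring, with hypothetical names: two child phasers share a root, and each child created with a nonzero party count registers itself as a single party of the root. The snapshot method uses only the read-only query methods getPhase, getRegisteredParties, getArrivedParties, and getUnarrivedParties.

```java
import java.util.concurrent.Phaser;

// Hypothetical sketch: a two-level (tiered) phaser tree and read-only monitoring.
class PhaserMonitorSketch {

    // Builds a root with two child groups; each child registers itself with the root.
    static Phaser[] buildTree(int groupASize, int groupBSize) {
        Phaser root = new Phaser();                   // no parties of its own
        Phaser groupA = new Phaser(root, groupASize); // counts as one party of root
        Phaser groupB = new Phaser(root, groupBSize); // counts as one party of root
        return new Phaser[] {root, groupA, groupB};
    }

    // Monitoring methods may be called by any thread, registered or not.
    static String snapshot(String name, Phaser p) {
        return name + ": phase=" + p.getPhase()
                + " registered=" + p.getRegisteredParties()
                + " arrived=" + p.getArrivedParties()
                + " unarrived=" + p.getUnarrivedParties();
    }

    public static void main(String[] args) {
        Phaser[] tree = buildTree(3, 2);
        tree[1].arrive(); // one member of group A arrives without waiting
        System.out.println(snapshot("root", tree[0]));
        System.out.println(snapshot("groupA", tree[1]));
        System.out.println(snapshot("groupB", tree[2]));
    }
}
```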

Now, let’s look at the use of the Phaser class with a practical example. Suppose we want to write a horse-racing simulation game. We could consider the starting gate of the race to be the barrier; when all the horses arrive at the starting gate, the race may begin. The time that each horse needs to reach the starting gate may vary considerably and therefore synchronization at the barrier is required. Once the race begins, each horse will deregister from the phaser, and the same phaser (starting gate) may be reused for another race, scheduled at a later time. The complete simulation program is presented in Listing 18-6.

image

Listing 18-6   A Horse-racing Simulation Program Using Phaser

image
 
image

We define the maximum horse count as 12, which is typical for big horse races. We create a phaser by passing the initial count for the number of parties as 1:

image

image

As more horses (parties) register on this phaser, the number of parties awaiting the synchronization barrier will increase.

In the main method, we declare a Thread variable called raceMonitor to hold a reference to a monitoring thread, which independently reports how many horses have arrived at the starting gate at a particular instant:

image

image

After this, we create an application instance and call its manageRace method:

image

image

In the manageRace method, we create an array for holding the Horse objects and initialize it with Horse instances. We start the race by calling the runRace method, which takes this horse array as an argument.

In the runRace method, each Horse in the team is registered with the phaser via a call to the phaser's register method:

image

image

After this registration, we wait for a variable amount of time before calling the arriveAndAwaitAdvance method. The current thread then arrives at the phaser and waits for others to arrive.

After all horses arrive at the starting gate, the main thread sleeps for one second and then we call the arriveAndDeregister method to release each horse from the starting gate. Now, the actual race begins. We can use the phaser again to start another race by reregistering a set of horses.
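The gate logic can be sketched with a common Phaser idiom: the race manager holds one party, so the gate stays closed until the manager arrives. This is not Listing 18-6; all names are hypothetical, and instead of sleeping for a fixed second, the manager polls getArrivedParties, much as the raceMonitor thread does. Overriding onAdvance to return false keeps the phaser from terminating when every horse deregisters, so the same gate can be reused for a second race.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a reusable starting gate (not the book's Listing 18-6).
class StartingGateSketch {

    // A gate that never terminates, so it can be reused for later races.
    static Phaser newGate() {
        return new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                return false; // the default would terminate once parties drop to zero
            }
        };
    }

    // Runs one race on the gate and returns how many horses left the gate.
    static int runRace(Phaser gate, int horseCount) {
        gate.register(); // the race manager (this thread) holds the gate closed
        AtomicInteger started = new AtomicInteger();
        Thread[] horses = new Thread[horseCount];
        for (int i = 0; i < horseCount; i++) {
            gate.register(); // one more party: this horse
            horses[i] = new Thread(() -> {
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextInt(50)); // walk to the gate
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                gate.arriveAndAwaitAdvance(); // wait at the gate until it opens
                started.incrementAndGet();
                gate.arriveAndDeregister();   // race under way: leave the gate's parties
            });
            horses[i].start();
        }
        // Monitor: poll until every horse (but not the manager) has arrived.
        while (gate.getArrivedParties() < horseCount) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println(horseCount + " horses at the gate; opening it");
        gate.arriveAndDeregister(); // the manager's arrival opens the gate
        for (Thread h : horses) {
            try {
                h.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return started.get();
    }

    public static void main(String[] args) {
        Phaser gate = newGate();
        System.out.println("Race 1 starters: " + runRace(gate, 12));
        System.out.println("Race 2 starters: " + runRace(gate, 8));
    }
}
```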

The implementation of the Horse class is very straightforward. In the run and toString methods, we simply print a message to the user. The most important thing in this class is the generation of a unique ID for each horse. To generate unique IDs safely, we could have used the synchronization techniques covered so far to guard the increment of the ID field. The AtomicInteger class provides the same guarantee more efficiently and without explicit synchronization constructs. We create a static variable by instantiating the AtomicInteger class:

image

image

The incrementAndGet method atomically increments the value of this variable by one and returns the new value:

image

image

Note that creating multiple Horse instances concurrently makes the threads contend for this variable; the atomic increment still guarantees that each horse receives a distinct ID.
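To see why the atomic increment matters, here is a sketch with a hypothetical TaggedHorse class. A plain id = ++counter is a read-modify-write sequence that two threads can interleave, producing duplicate IDs; incrementAndGet performs the whole update atomically.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: lock-free unique IDs, as described for the Horse class.
class TaggedHorse {
    private static final AtomicInteger idSource = new AtomicInteger(); // starts at 0
    private final int id;

    TaggedHorse() {
        id = idSource.incrementAndGet(); // atomic read-modify-write: 1, 2, 3, ...
    }

    int getId() {
        return id;
    }

    // Creates horses from several threads at once and counts the distinct IDs.
    static int distinctIds(int threads, int horsesPerThread) {
        Set<Integer> ids = ConcurrentHashMap.newKeySet();
        Thread[] creators = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            creators[t] = new Thread(() -> {
                for (int i = 0; i < horsesPerThread; i++) {
                    ids.add(new TaggedHorse().getId());
                }
            });
            creators[t].start();
        }
        for (Thread c : creators) {
            try {
                c.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return ids.size();
    }

    public static void main(String[] args) {
        // With a plain int and id = ++counter, duplicates could appear here.
        System.out.println("Distinct IDs: " + distinctIds(8, 1000) + " of 8000");
    }
}
```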

The raceMonitor thread class calls the getArrivedParties method on the manager (phaser instance) to periodically print the number of horses that have arrived at the starting gate.

Typical partial output on a sample run is given here:

image

image

The output shows the various states of the application. In the beginning, we assign 12 horses to the race. As horses arrive at the starting gate, they wait for all the others to arrive. Once all the horses arrive, we start the race. In the output, you will see several periodic monitoring messages that tell you how many horses have reached the starting gate at a particular instant.

Exchangers

An exchanger allows two threads to exchange objects at a rendezvous point; this is generally useful in pipeline designs. An exchanger is often used when a producer and consumer want to exchange a resource. Remember the producer-consumer problem from the previous chapter? We used wait-notify in its implementation; the same could now be achieved using exchangers. The difference is that in the producer-consumer scenario, the producer produces objects, puts them in a shared channel, and notifies the consumer. The consumer then picks up the produced objects from the shared channel. In the case of an exchanger, each thread presents some object upon entry into the exchange method and receives an object presented by the other thread upon its return from the exchange method. Typically this is used in writing communications software where there are two threads—one that collects some data in a communication buffer and the other that empties the buffer and processes the data. The first thread waits until its buffer is completely filled. At this time, it will exchange its full buffer with an empty buffer provided by the consumer thread. The process will continue in a loop, where in each iteration the producer waits for its buffer to be completely filled and then exchanges it for an empty buffer from the consumer.

The producer puts the items to exchange in its buffer. When the buffer is full, the producer waits for the consumer to arrive with an empty buffer into which the contents can be transferred. Similarly, the consumer consumes the items from its buffer, and when the buffer is empty, it waits for the producer. When both are waiting, the two buffers are exchanged and both parties can continue. For this to work satisfactorily, the producer must produce items on an ongoing basis; otherwise, the items already added to its buffer will sit there waiting for the buffer to fill up.

Java implements this functionality in the Exchanger class. Its constructor takes no arguments; you specify only the type of object to exchange, as a type parameter:

image

image

The exchange method performs the actual exchange:

image

image
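A minimal sketch of both forms of exchange, with hypothetical names: two threads swap strings with the untimed method, and a lone thread uses the timed overload, which throws a TimeoutException when no partner arrives in time.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of Exchanger's untimed and timed exchange methods.
class ExchangeBasicsSketch {

    // Two threads swap strings; returns "<what main got> / <what the other got>".
    static String swap() {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] otherGot = new String[1];
        Thread other = new Thread(() -> {
            try {
                otherGot[0] = exchanger.exchange("from other");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        other.start();
        try {
            String mainGot = exchanger.exchange("from main"); // blocks until other arrives
            other.join();
            return mainGot + " / " + otherGot[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during exchange", e);
        }
    }

    // With no partner, the timed overload gives up with a TimeoutException.
    static boolean timesOutAlone(long millis) {
        Exchanger<String> exchanger = new Exchanger<>();
        try {
            exchanger.exchange("nobody takes this", millis, TimeUnit.MILLISECONDS);
            return false; // not expected: no other thread calls exchange
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(swap());             // prints "from other / from main"
        System.out.println(timesOutAlone(100)); // prints "true"
    }
}
```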

Another overload of the exchange method accepts a timeout; if no other thread arrives at the exchange point within that time, the waiting thread gives up and a TimeoutException is thrown. Let's now look at an example of using this class (see Figure 18-5).

image

FIGURE 18-5.   Exchanger in action

The producer produces items and hands them over to the consumer on an ongoing basis. Both are given baskets, which are initially empty. The producer produces items and puts them in his basket, while the consumer waits for the items to be ready. When the producer's basket is full, the two baskets are exchanged with the help of the Exchanger class. The exchange does not require any of the synchronization mechanisms discussed in the previous chapter; the Exchanger class takes care of all the synchronization needed to hand the two baskets between the two threads. Now the producer has an empty basket and the consumer has a full one. The producer produces more items to fill his empty basket, while the consumer consumes the items from his full basket. Eventually, the producer's basket becomes full and the consumer's basket becomes empty; another exchange takes place, and the process repeats. We will now write a program to implement this operation (see Listing 18-7). Note that the program does not contain any explicit synchronization constructs.

image

Listing 18-7   Producer/Consumer Implementation Using Exchanger

image
 
image
 
image
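The basket swap can be sketched in a bounded form that runs a fixed number of rounds instead of waiting for the ENTER key. The class name, the basket size of 5, and the random item values are assumptions, not taken from Listing 18-7.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the basket swap in Figure 18-5 (not Listing 18-7).
class BasketExchangeSketch {
    static final int BASKET_SIZE = 5; // assumed capacity

    // Runs a fixed number of exchanges instead of waiting for ENTER.
    // Returns {items produced, sum produced, items consumed, sum consumed}.
    static long[] run(int rounds) {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        long[] stats = new long[4]; // [0],[1] producer-only; [2],[3] consumer-only

        Thread producer = new Thread(() -> {
            List<Integer> basket = new ArrayList<>();
            try {
                for (int r = 0; r < rounds; r++) {
                    while (basket.size() < BASKET_SIZE) { // fill the basket
                        int item = ThreadLocalRandom.current().nextInt(100);
                        basket.add(item);
                        stats[0]++;
                        stats[1] += item;
                    }
                    basket = exchanger.exchange(basket); // full basket out, empty one in
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            List<Integer> basket = new ArrayList<>();
            try {
                for (int r = 0; r < rounds; r++) {
                    basket = exchanger.exchange(basket); // empty basket out, full one in
                    System.out.println("Consumer received " + basket);
                    for (int item : basket) { // "consume" every item
                        stats[2]++;
                        stats[3] += item;
                    }
                    basket.clear(); // hand back an empty basket next time
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stats;
    }

    public static void main(String[] args) {
        long[] s = run(3);
        System.out.println("Produced " + s[0] + " items, consumed " + s[2]);
    }
}
```

Because every item handed over is counted on both sides, the produced and consumed totals must match. Exchanger guarantees that actions before exchange in one thread happen-before the return from exchange in the other, so the consumer sees every item the producer added.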

The ProductExchanger class first creates an instance of Exchanger:

image

image

The exchanger operates on a list of integers, as specified by its type argument. The field is declared public and static so that the producer and consumer thread classes can access it without creating an instance of the enclosing class. The main method simply creates the two threads and starts them:

image

image

To terminate the program, the main thread waits indefinitely for the user to hit the ENTER key:

image

image

When this happens, both producer and consumer threads are interrupted in their work:

image

image

Note that this is the polite way of stopping a running thread: you request that the thread stop by interrupting it, and then let the thread itself decide when to stop. It usually does so after finishing whatever it is currently doing, on reaching the logical end of a processing step.
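The same stop-by-interrupt idea can be seen in isolation in a hypothetical sketch: the worker blocks in exchange with no partner, and interrupting it surfaces as an InterruptedException that clears an okToRun flag, in the spirit of the producer and consumer classes.

```java
import java.util.concurrent.Exchanger;

// Hypothetical sketch: stopping a thread that is blocked in exchange().
class InterruptStopSketch {

    // Returns true if the worker ended within waitMillis of being interrupted.
    static boolean stopsWhenInterrupted(long waitMillis) {
        Exchanger<String> exchanger = new Exchanger<>();
        Thread worker = new Thread(() -> {
            boolean okToRun = true;
            while (okToRun) {
                try {
                    exchanger.exchange("data"); // blocks: no partner ever arrives
                } catch (InterruptedException e) {
                    okToRun = false; // the stop request: leave the loop cleanly
                }
            }
            System.out.println("Worker stopped cleanly");
        });
        worker.start();
        worker.interrupt(); // a request to stop, not a forced kill
        try {
            worker.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Stopped: " + stopsWhenInterrupted(1000));
    }
}
```

If the interrupt lands before the worker reaches exchange, the call throws InterruptedException immediately on entry, so the worker stops either way.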

Now, let’s look at the producer class:

image

image

First, we create an empty buffer for the producer to hold our Integer items:

image

image

In the run method, we set up a loop that runs while the okToRun flag is true. The flag is set to true by default and is reset to false when the thread is interrupted, which breaks the loop.

image

image

We now check whether the buffer is empty. If it is, we add a few random numbers to it until the buffer is filled completely:

image

image

We then put the thread to sleep for a random amount of time to simulate the condition that producing and filling items take some finite amount of time:

image

image

The program now prints the buffer’s contents for the user’s information:

image

image

At this stage, the producer thread requests an exchange with the exchanger. To do this, it calls the exchange method on the exchanger object defined in the main program:

image

image

The exchange method takes the buffer to exchange as its parameter and returns the buffer received from the other thread, which in our case is an empty buffer. A call to the exchange method makes the calling thread wait for another thread to arrive at this exchange point. When the other thread arrives, the two threads swap objects: each receives the object the other passed in. Note that a waiting thread may be interrupted, and if that happens, it no longer waits for the other thread. If the current thread has its interrupted status set upon entry into the exchange method, an InterruptedException is thrown and the thread’s interrupted status is cleared. When a thread calls the exchange method and another thread is already waiting at the exchange point, the waiting thread is resumed and receives the object passed in by the current thread, and the current thread returns immediately with the waiting thread's object. If no other thread is waiting when a thread calls the exchange method, the caller waits at the exchange point until some other thread enters the exchange or interrupts it.

Finally, in the exception handler we simply reset the okToRun flag so that the infinite loop will terminate on its next iteration:

image

image

Now, let’s look at the Consumer class. As with the Producer class, we create an empty buffer for the use of the consumer:

image

image

In the run method, we set up an infinite loop that is terminated only when the thread is interrupted:

image

image

We test whether the buffer is empty; if it is, we initiate an exchange. Note that the actual exchange does not take place until both parties are ready. If the producer fills its buffer before the consumer is able to empty its own, the producer is made to wait at the exchange point. Similarly, if the consumer clears its buffer before the producer is able to fill its buffer completely, the consumer is made to wait at the exchange point. When both buffers are ready for an exchange (that is, when the producer buffer is full and the consumer buffer is empty), the exchange takes place:

image

image

After the exchange, we print the buffer contents to verify that the consumer has actually received the items:

image

image

To simulate the condition that the consumer will take a finite amount of time to consume all the contents of the basket, we cause the thread to sleep for a variable amount of time before emptying the buffer:

image

image

Typical program output is shown here:

image

image

Note that after each exchange, the contents of the consumer buffer are identical to the contents of the producer buffer. Thus, the buffers were exchanged as expected without the use of any explicit synchronization constructs.

Summary

In this chapter, you studied many synchronization constructs provided in Java beginning in J2SE 5.0. These constructs relieve developers of writing explicit synchronization code in their programs. Many synchronization needs can be modeled and solved using blocking queues. Java provides several classes that implement blocking queue functionality, allowing you to create both bounded and dynamically growing queues. You can prioritize the objects in the queue for service, and you can also delay the service of individual objects.

Besides blocking queues, J2SE 5.0 defines semaphores, countdown latches, barriers, and exchangers, and Java SE 7 adds phasers. Semaphores let competing threads share a pool of n resources. The countdown latch makes threads wait until the running threads bring a count value down to zero. A barrier is a common point where the running threads meet after completing their individual work. After each thread arrives at the barrier, a new task can be initiated; until then, the new task is made to wait. A phaser is a reusable barrier whose number of registered parties can vary over time. The exchanger allows an easy exchange between the individual buffers of the producer and the consumer. The producer fills its buffer completely, and the consumer consumes the items from its buffer until it becomes empty. When both conditions are met (that is, the producer buffer is full and the consumer buffer is empty), the exchanger swaps the two buffers.

This chapter covered many synchronization constructs, but the needs of concurrent programs go beyond the synchronization mechanisms discussed here. In the next chapter, you will learn about these needs and how Java addresses them, including its latest Fork/Join framework.
