Communicating Mutually Exclusive Threads

The hardest kind of programming with threads is when the threads need to pass data back and forth. Imagine that we are in the same situation as the previous section. We have threads that process the same data, so we need to run synchronized. In our new case, however, imagine that it's not enough just to say, “Don't run while I am running.” We need the threads to be able to say, “OK, I have some data ready for you,” and to suspend themselves if there isn't data ready. There is a convenient parallel programming idiom known as wait/notify that does exactly this. Figure 14-3 shows this in four stages.

Figure 14-3. Communicating mutually exclusive threads

Wait/notify is a tricky language-independent protocol that has been developed by ingenious minds. Wait/notify wasn't invented by Java, and it's not Java-specific. If you've ever taken a college-level course in operating system concurrency, you have probably seen it. Otherwise, you just need to appreciate it as the accepted solution to this problem. Wait/notify is used when synchronized methods in the same class need to communicate with each other. The most common occurrence of this is a producer/consumer situation—one thread is producing data irregularly and another thread is consuming (processing) it.

The wait() and notify() methods are called only from within synchronized code, which means they are called only when the object's mutex lock is held. However, simple synchronization is not enough. The consumer might grab the lock and then find that there is nothing in the buffer to consume. The producer might grab the lock and find that there isn't yet room in the buffer to put something. You could make either of these spin in a busy loop, continually grabbing the lock, testing whether they can move some data, and releasing the lock if not. But busy loops are never used in production code. They burn CPU cycles without any productive result.[1]

The correct approach lies in the two method calls wait() and notify(). This protocol allows a thread to pause or wait while another thread takes over the lock.

  • The wait() method says, “Oops, even though I have the lock I can't go any further until you have some data for me, so I will release the lock and suspend myself here. One of you notifiers carry on! I'll wake up when I am notified, and then try to grab the lock again.”

  • The notify() method says, “Hey, I just produced some data. One of you waiters wake up! As soon as I leave my synchronized code and release the lock, you can take the lock back and carry on.”

The pseudo-code for the way the producer works is:

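// producer thread
grab mutex lock, i.e. enter synchronized code
produce_data()
notify()     // wakes a waiter; the lock is still held at this point
leave synchronized code     // this is what releases the lock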

The pseudo-code for the consumer is:

// consumer thread
grab mutex lock, i.e. enter synchronized code
while( no_data )
     wait()      // releases lock and stalls here till notified
                 // then it tries to re-acquire the lock
                 // if it does not get the lock, it stays waiting
                 // otherwise it proceeds to check the while again
                 // if there now is data, it falls through
consume_the_data()
leave synchronized code

The consumer waits in a loop, testing whether there is yet data, because a different consumer might have grabbed the data, in which case it needs to wait again. As we have already seen, entering and leaving synchronized code is trivially achieved by applying the keyword “synchronized” to a method, so the templates become like this:

// producer thread
produce_data()
notify()

// consumer thread
while( no_data )
     wait()
consume_the_data()
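
As a minimal sketch of how these two templates might look in real Java, here is a single-slot handoff. The class names Slot and SlotDemo are hypothetical and are not part of this chapter's example program. The sketch assumes one producer and one consumer, and it follows the templates literally, so the producer never waits.

```java
// Hypothetical single-slot mailbox illustrating the two templates above.
class Slot {
    private String item;              // null means "no data"

    // producer template: produce_data(), then notify()
    synchronized void put(String s) {
        item = s;
        notify();                     // the lock is released when put() returns
    }

    // consumer template: while (no_data) wait(); then consume_the_data()
    synchronized String take() throws InterruptedException {
        while (item == null) {
            wait();                   // releases the lock while suspended
        }
        String result = item;
        item = null;
        return result;
    }
}

class SlotDemo {
    // Runs one producer thread and consumes its single datum on the calling thread.
    static String runOnce() {
        Slot slot = new Slot();
        Thread producer = new Thread(() -> slot.put("hello"));
        producer.start();
        try {
            String got = slot.take(); // blocks until the producer has stored something
            producer.join();
            return got;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("consumed: " + runOnce());
    }
}
```

Because put() in this sketch never waits, a second put() before a take() would overwrite the first datum. That is exactly the gap the bounded buffer version addresses.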

Usually, the producer is storing the produced data into some kind of bounded buffer, which means the producer can fill it up and will need to wait() until there is room. The consumer needs to notify() the producer when something is removed from the buffer.

The pseudo-code is:

// producer thread—produces one datum
while( buffer_full )
     wait()
produce_data()
notify()

// consumer thread—consumes one datum
while( no_data )
     wait()
consume_the_data()
notify()

The reason we walked through this step-by-step is that it makes the following program easier to understand. If you didn't follow the previous pseudo-code, go back over the previous section again. This code directly implements the pseudo-code, and demonstrates the use of wait/notify in communicating mutually exclusive threads.

There are three classes that follow. The first is a class that contains a main driver program. It simply instantiates a producer thread and a consumer thread and lets them go at it.

public class showWaitNotify {
  public static void main(String args[]) {
     Producer p = new Producer();

     Consumer c = new Consumer(p);
     p.start();
     c.start();
  }
}

The second class is the Producer class. It implements the previous pseudo-code and demonstrates the use of wait/notify. It has two key methods. The first, produce(), generates a datum (actually, it just reads an incrementing ID value) and stores it into an array. The second, consume(), returns successive values from this array. The value of this setup is that you can call produce() and consume() from separate threads: They won't overrun the array; they won't get something before it has been produced; they won't step on each other; and neither ever gets stuck in a busy wait.

class Producer extends Thread {
    private String [] buffer = new String [8];
    private int pi = 0;  // produce index
    private int gi = 0;  // get index

    public void run() {
       // just keep producing
       for(;;) produce();
    }

    private int sequence=0;
    private final String id() {
        return "" + sequence++;
    }

    synchronized void produce() {
       // while there isn't room in the buffer
       while ( pi-gi+1 > buffer.length ) {
           try {wait();} catch(Exception e) {}
       }
       int index = pi % buffer.length;
       buffer[index] = id();
       System.out.println("produced["+ index +"] " + buffer[index]);
       pi++;
       notifyAll();
    }

    synchronized String consume(){
       // while there's nothing left to take from the buffer
       while (pi==gi) {
           try {wait();} catch(Exception e) {}
       }
       int index = gi++ % buffer.length;
       String result = buffer[index];
       notifyAll();
       return result;
    }
}

The produce() method puts a datum in the buffer, and consume() can be called from within a Consumer class to pull something out for the consumer.

Expressions like “pi % buffer.length” are a programming idiom to index an array as a circular buffer. It is a cheap way to let pi and gi be incremented without limit and still be used as subscripts. We want the pi and gi values themselves to count up serially 0 1 2 3 4 5 6 7 8 9 10 11 12, so that the test in produce() can make sure pi never gets more than 8 (one buffer's worth) ahead of gi. We need the subscripts to count 0 1 2 3 4 5 6 7 0 1 2 3, that is, “modulo the buffer size”. Plug in some actual values to see how this works.
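
If you would rather let the computer plug in the values, a small sketch like the following prints the mapping from a running counter to an array subscript. The class name CircularIndexDemo and the helper slot() are made up for this illustration. The buffer size of 8 matches the Producer class.

```java
// Shows how an ever-increasing counter maps onto circular-buffer subscripts.
class CircularIndexDemo {
    // Subscript into a buffer of the given size for a running counter.
    static int slot(int counter, int size) {
        return counter % size;
    }

    public static void main(String[] args) {
        int size = 8;                 // same size as the buffer in Producer
        for (int counter = 0; counter < 12; counter++) {
            System.out.println("counter " + counter
                               + " -> subscript " + slot(counter, size));
        }
    }
}
```

Counter values 0 through 7 map to themselves, and counter value 8 wraps around to subscript 0, which agrees with the "produced[0] 8" line you will see in the program's output.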

Finally, the third class is another thread that will be the consumer in this example. It starts off with a common Java idiom: Another class is passed into the constructor, and all the constructor does is save a copy of this object for later use. This is the way that the consumer can call the consume() method of the producer.

class Consumer extends Thread {
   Producer whoIamTalkingTo;
   // java idiom for a constructor
   Consumer(Producer who) { whoIamTalkingTo = who; }

   public void run() {
      java.util.Random r = new java.util.Random();
      for(;;) {
          String result = whoIamTalkingTo.consume();
          System.out.println("consumed: "+result);
          // next line is just to make it run a bit slower.
          int randomtime = Math.abs(r.nextInt() % 250);
          try{sleep(randomtime);} catch(Exception e){}
      }
   }
}

The technique of passing an object into a constructor that saves the reference to it, for later communicating something back, is a common idiom known as a “callback.” We already saw this in the Interfaces chapter. Make sure you understand the previous example, to see how this is written and used.

The run method of this consumer simply calls consume() over and over again, printing what it got, followed by a sleep for a random period of up to a quarter of a second. The sleep is just to give the producer some work to do in adapting to an asynchronous consumer.

When you compile these three classes together and try running them, you will see output similar to the following (not necessarily identical, because of asynchronicity):

% java showWaitNotify
produced[0] 0
produced[1] 1
produced[2] 2
consumed: 0
produced[3] 3
produced[4] 4
produced[5] 5
produced[6] 6
produced[7] 7
produced[0] 8
consumed: 1
produced[1] 9
produced[2] 10
consumed: 2
produced[3] 11
consumed: 3
produced[4] 12
consumed: 4
produced[5] 13
consumed: 5
produced[6] 14

And so on.

Notice that the producer filled much of the buffer before the consumer ran at all. Then each time the slow consumer removed something from the buffer, the producer reused that now empty slot. And always the consumer got exactly what was stored there with no data race corruption. If this explanation of wait/notify seems complicated, your impression is correct. Programming threaded code is hard in the general case and these methods supply a specialized feature that makes one aspect of it a little easier.

The good news is that, for simple producer/consumer code, you don't have to bother with any of this! Two classes, PipedInputStream and PipedOutputStream, in the I/O library can be used in your code to support simple asynchronous communication between threads. We will look at this later.
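
As a brief preview, here is one way the piped stream classes might be used to pass bytes from one thread to another. This is only a sketch under simple assumptions (one writer, one reader, a short message). The class PipeDemo and its transfer() method are hypothetical names chosen for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

class PipeDemo {
    // Sends a message through a pipe from a producer thread to the caller.
    static String transfer(String message) {
        try {
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out); // connect the two ends

            Thread producer = new Thread(() -> {
                // closing the output end tells the reader there is no more data
                try (PipedOutputStream o = out) {
                    o.write(message.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            producer.start();

            // the consumer side: read() blocks until data arrives or the pipe closes
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1) {
                received.write(b);
            }
            producer.join();
            in.close();
            return new String(received.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + transfer("some data"));
    }
}
```

The blocking and waking happen inside the stream classes, so no explicit wait() or notify() calls appear in this code.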

The wait and notify methods are in the basic class Object, so they are shared by all objects in the system. There are several variants:

public final native void notify();
public final native void notifyAll();

public final void wait() throws InterruptedException;
public final void wait(long time, int nanos) throws InterruptedException;
public final native void wait(long timeout) throws InterruptedException;

The difference between notify() and notifyAll() is that notifyAll() wakes up all threads that are in the wait list of this object. That might be appropriate if they are all readers, or they are all waiting for the same answer, or they can all carry on once the data has been written. In the example, there is only one other thread, so notify() or notifyAll() will have the same effect. A notify() wakes an arbitrary thread in the wait list. You can't rely on FIFO order.

There are three forms of wait, including two that allow a wait to time out after the specified period of milliseconds, or milliseconds and nanoseconds(!), has elapsed.

Why have a separate wait list or wait set for each object, instead of just blocking waiting for the lock? Because the whole point of wait/notify is to take the threads out of contention for system resources until they are truly able to run!
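
To illustrate the timed form wait(long timeout), here is a minimal sketch of a flag that a thread can wait on for a limited time. The classes Flag and FlagDemo are hypothetical and exist only for this illustration. Note that a timeout of zero means "wait forever," so the code checks the remaining time before each call.

```java
// Hypothetical one-shot flag that can be awaited with a timeout.
class Flag {
    private boolean raised = false;

    synchronized void raise() {
        raised = true;
        notifyAll();                  // every waiter can carry on once the flag is up
    }

    // Returns true if the flag was raised before the timeout elapsed.
    synchronized boolean await(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!raised) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;         // timed out; never call wait(0) by accident
            }
            wait(remaining);          // wakes on notify, timeout, or interrupt
        }
        return true;
    }
}

class FlagDemo {
    // Wrapper that converts the checked exception, for convenience.
    static boolean tryAwait(Flag f, long timeoutMillis) {
        try {
            return f.await(timeoutMillis);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Flag f = new Flag();
        System.out.println("before raise: " + tryAwait(f, 50));
        f.raise();
        System.out.println("after raise: " + tryAwait(f, 50));
    }
}
```

The wait still sits inside a loop that rechecks the condition, just as in the untimed templates, because waking up does not by itself prove that the condition now holds.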

Interrupting a thread

Now that we have all that theory behind us, let's explain the minor point of why statements like this have an exception handler:

try {sleep(randomtime);} catch(Exception e){}
try {wait();} catch(Exception e) {}

It's quite easy. One thread can interrupt another sleeping thread by calling its interrupt() method. This will make the interrupted thread wake up and take the exception branch. The thread must be able to tell the difference between waking up because it has been “notified” and waking up because it has been “interrupted”. So the second case is detected by raising the exception InterruptedException in the thread. Statements like sleep(), join(), and wait() that are potentially prone to being interrupted in the middle need to catch this exception, or declare that their method can throw it.

The interrupt() method of Thread sets the interrupt state (a boolean flag) of a thread to “true.” A thread can query and clear its own interrupt state using the static Thread.interrupted() method. (A single call returns the current state and clears it, so a second call right afterwards returns false.) You can query the interrupted state of any thread, without clearing it, using isInterrupted(). The method interrupt() does not wake a thread that is waiting to acquire a synchronization lock.
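
The following sketch shows both halves of this: one thread interrupting a sleeping thread, and a thread inspecting its own interrupt flag. The class InterruptDemo and its method names are hypothetical, chosen only for this illustration.

```java
class InterruptDemo {
    // Starts a thread that sleeps, interrupts it, and reports what that thread saw.
    static String interruptSleeper() {
        final String[] outcome = new String[1];
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(10_000);
                outcome[0] = "woke normally";
            } catch (InterruptedException e) {
                outcome[0] = "interrupted";   // the exception branch
            }
        });
        sleeper.start();
        sleeper.interrupt();                  // wakes the sleeper early
        try {
            sleeper.join();                   // wait for the sleeper to finish
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }

    // Sets the calling thread's own flag, then queries it three ways.
    static boolean[] flagStates() {
        Thread.currentThread().interrupt();
        boolean peek = Thread.currentThread().isInterrupted(); // reads, does not clear
        boolean first = Thread.interrupted();                  // reads and clears
        boolean second = Thread.interrupted();                 // flag is now clear
        return new boolean[] { peek, first, second };
    }

    public static void main(String[] args) {
        System.out.println(interruptSleeper());
        System.out.println(java.util.Arrays.toString(flagStates()));
    }
}
```

If the interrupt arrives before the new thread has even reached sleep(), the flag is already set and sleep() throws InterruptedException immediately, so the sleeper reports the same outcome either way.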

Two further points: In this text, I generally use the parent type Exception, instead of the correct subtype, InterruptedException. This is to minimize the size of the lines in the example. Always catch the narrowest type of exception you can or you might catch more than you bargained for. And do something sensible with it, like print an error message. Finally, note that the interrupt() method was not implemented in the first major release of the JDK.

Synchronized code isn't a perfect solution, because a system can still get into deadlock. Deadlock, or deadly embrace, is the situation where there is a circular dependency among several threads between resources held and resources required. In its simplest form, Thread “A” holds lock “X” and needs lock “Y,” while Thread “B” holds lock “Y” and is waiting for lock “X” to become free. Result: That part of the system grinds to a halt. This can happen all too easily when one synchronized method calls another.

The “volatile” keyword can also be applied to data. This informs the compiler that several threads might be accessing this simultaneously. The data therefore needs to be completely refreshed from memory (rather than a value that was read into a register three cycles ago) and completely stored back on each access.
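
One common way to avoid the two-lock deadlock described above is to make every thread acquire the locks in the same order. The sketch below illustrates that idea. It is not taken from this chapter's example, and the classes Account, Transfers, and TransferDemo, along with the id field used for ordering, are all assumptions made for the illustration.

```java
// Hypothetical example: two accounts, each used as its own lock.
class Account {
    final int id;                     // assumed unique; imposes a global lock order
    int balance;
    Account(int id, int balance) { this.id = id; this.balance = balance; }
}

class Transfers {
    // Always lock the account with the smaller id first, so two opposite
    // transfers can never each hold one lock while waiting for the other.
    static void transfer(Account from, Account to, int amount) {
        Account first = (from.id < to.id) ? from : to;
        Account second = (first == from) ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }
}

class TransferDemo {
    // Runs two threads that transfer in opposite directions at the same time.
    static int[] run() {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) Transfers.transfer(a, b, 1);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) Transfers.transfer(b, a, 1);
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] { a.balance, b.balance };
    }

    public static void main(String[] args) {
        int[] balances = run();
        System.out.println(balances[0] + " " + balances[1]);
    }
}
```

If transfer() simply locked "from" and then "to," the two threads here could each grab one account and wait forever for the other, which is the Thread “A” and Thread “B” situation exactly.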

Volatile is apparently intended for accessing objects like real-time clocks that sit on the memory bus. It is convenient to glue them on there, because they can be read and written with the usual “load” and “store” memory access instructions, instead of requiring a dedicated I/O port. They can return the current time via a single cycle read, but the value will change unpredictably due to actions outside the program. Volatile doesn't seem intended for general use on arbitrary objects in applications. The keyword isn't used anywhere in the current version of the run-time library.

Generalized thread programming is a discipline in its own right and one that is growing in importance with the use of multiprocessor systems and server systems.


[1] Well, OK, one of my colleagues pointed out a research paper that showed a brief busy loop followed by a wait/notify was superior to either used alone. Let's leave research papers out of this for now.
