Piped I/O for Threads

A pipe is an easy way to move data between two threads. One thread writes into the pipe, and the other reads from it. This forms a producer/consumer buffer, ready-programmed for you! There are two stream classes that we always use together in a matched consumer/producer pair:

  • PipedInputStream –. Gets bytes from a pipe (think “hosepipe”; it's just a data structure that squirts bytes at you).

  • PipedOutputStream –. Puts bytes into a pipe (think “drainpipe”; it's just a data structure that drinks down bytes that you pour into it).

An object of one of these classes is connected to an object of the other class, providing a safe way (without data race conditions) for one thread to send a stream of data to another thread.

As an example of the use of piped streams, the following program reimplements the Producer/Consumer problem, but uses piped streams instead of wait/notify. If you compare, you'll see that this is considerably simpler. There is no visible shared buffer—the pipe stream between the two threads encapsulates it.

The next example shows one thread sending primitive types to another. You can also send Strings using the classes PipedWriter and PipedReader. These four Piped classes are part of the java.io package.

Their use should be clear from the examples here, and it is fully explained in the two chapters on I/O.

import java.io.*;
public class expipes {

  public static void main(String args[]) {
     Producer p = new Producer();

     Consumer c = new Consumer(p);
     c.start();
     p.start();
  }
}


///// This class writes into the pipe until it is full, at which
///// point it is blocked until the consumer takes something out.

class Producer extends Thread {
    protected PipedOutputStream po = new PipedOutputStream();
    private DataOutputStream dos = new DataOutputStream(po);

    public void run() {
       // just keep producing numbers
       for(;;) produce();
    }

    private int sequence=0;
    private final int id() {
        return sequence++;
    }

    void produce() {
       long t = id();
       System.out.println("produced " + t);
       try {dos.writeLong( t );}
       catch (IOException ie) { System.out.println(ie); }
    }

}



///// This class consumes everything sent over the pipe.
///// The pipe does the synchronization. When the pipe is full,
///// this thread's read from the pipe is blocked.
class Consumer extends Thread {
   private PipedInputStream pip;
   private DataInputStream d;

   // java constructor idiom, save argument.
   Consumer(Producer who) {
      try {
          pip = new PipedInputStream(who.po);
          d = new DataInputStream( pip );
      } catch (IOException ie) {
          System.out.println(ie);
      }
   }

    long get(){
       long i=0;
       try {   i= d.readLong();  // read from pipe.
       } catch (IOException ie) {System.out.println(ie);}
       return i;
    }

   public void run() {
      java.util.Random r = new java.util.Random();
      for(;;) {
          long result = get();
          System.out.println("consumed: "+result);
          // next lines are just to make things asynchronous
          int randomtime = r.nextInt() % 1250;
          try{sleep(randomtime);} catch(Exception e){}
      }
   }
}

The output of this program is a list of serially increasing numbers. The numbers are passed in a buffer in a thread-safe way from the producer thread to the consumer thread. The piped streams allow for real simplification in interthread communication.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.227.134.133