Chapter 15. Breaking Out of a Blocked I/O State

Unfortunately, as of Java 1.2, most blocking I/O statements in the JDK still ignore interrupt requests. This presents a problem when you want to have a thread stop waiting on the I/O operation and do something else—typically to clean up and die. This problem generally arises in situations where data is read from a stream intermittently. A common case is a conversation between a client and server over a socket's streams. Typically in this scenario, a thread blocks for long periods of time on a stream waiting for another request to come across the line.

In this chapter, I'll present some techniques that you can use to get a thread to break out of a blocked I/O state. I'll begin by demonstrating the blocking problem with an example.

The read() Method Ignores Interrupts and Stop Requests

The read() methods of most subclasses of InputStream in the JDK ignore interrupts and don't even respond when the blocked thread is stopped. One notable exception is the PipedInputStream class, whose read() method does respond to interrupts by throwing an InterruptedIOException.
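The PipedInputStream exception can be checked with a short sketch like the one below. The class name PipedInterruptDemo and the helper readWasInterrupted() are invented for illustration, and the half-second pause is only an assumption about how long the reader thread needs to reach its read().

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class; not part of the chapter's listings.
class PipedInterruptDemo {

    // Returns true if the blocked read() ended with an
    // InterruptedIOException after the reader was interrupted.
    static boolean readWasInterrupted() {
        final boolean[] sawInterruptedIOX = { false };
        try {
            final PipedInputStream pipeIn = new PipedInputStream();
            // Connect a writer so the pipe is usable; nothing is written.
            PipedOutputStream pipeOut = new PipedOutputStream(pipeIn);

            Thread reader = new Thread(new Runnable() {
                    public void run() {
                        try {
                            pipeIn.read(); // blocks, no data ever arrives
                        } catch ( InterruptedIOException iiox ) {
                            sawInterruptedIOX[0] = true;
                        } catch ( IOException iox ) {
                            iox.printStackTrace();
                        }
                    }
                });
            reader.start();

            Thread.sleep(500);  // assumed long enough to reach read()
            reader.interrupt();
            reader.join(5000);
            pipeOut.close();
            return sawInterruptedIOX[0] && !reader.isAlive();
        } catch ( IOException x ) {
            throw new RuntimeException(x);
        } catch ( InterruptedException x ) {
            throw new RuntimeException(x);
        }
    }

    public static void main(String[] args) {
        System.out.println("read() threw InterruptedIOException: " +
                readWasInterrupted());
    }
}
```

Because the pipe's read() waits on an ordinary monitor rather than in native code, the interrupt can reach it. That is the difference from the streams discussed in the rest of this section.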

The class DefiantStream (see Listing 15.1) shows that when a thread is blocked waiting to read data from the keyboard, it does not respond to an interrupt or a stop request.

Example 15.1. DefiantStream.java—Demonstration of interrupt() and stop() Being Ignored

 1: import java.io.*;
 2:
 3: public class DefiantStream extends Object {
 4:     public static void main(String[] args) {
 5:         final InputStream in = System.in;
 6:
 7:         Runnable r = new Runnable() {
 8:                 public void run() {
 9:                     try {
10:                         System.err.println(
11:                                 "about to try to read from in");
12:                         in.read();
13:                         System.err.println("just read from in");
14:                     } catch ( InterruptedIOException iiox ) {
15:                         iiox.printStackTrace();
16:                     } catch ( IOException iox ) {
17:                         iox.printStackTrace();
18:                     //} catch ( InterruptedException ix ) {
19:                     //  InterruptedException is never thrown!
20:                     //  ix.printStackTrace();
21:                     } catch ( Exception x ) {
22:                         x.printStackTrace();
23:                     } finally {
24:                         Thread currThread =
25:                                 Thread.currentThread();
26:                         System.err.println("inside finally:\n" +
27:                             "  currThread=" + currThread + "\n" +
28:                             "  currThread.isAlive()=" +
29:                             currThread.isAlive());
30:                     }
31:                 }
32:             };
33: 
34:         Thread t = new Thread(r);
35:         t.start();
36:
37:         try { Thread.sleep(2000); } 
38:         catch ( InterruptedException x ) { }
39:
40:         System.err.println("about to interrupt thread");
41:         t.interrupt();
42:         System.err.println("just interrupted thread");
43:
44:         try { Thread.sleep(2000); } 
45:         catch ( InterruptedException x ) { }
46:
47:         System.err.println("about to stop thread");
48:         // stop() is being used here to show that the extreme
49:         // action of stopping a thread is also ineffective.
50:         // Because stop() is deprecated, the compiler issues
51:         // a warning.
52:         t.stop();
53:         System.err.println("just stopped thread, t.isAlive()=" +
54:                 t.isAlive());
55:
56:         try { Thread.sleep(2000); } 
57:         catch ( InterruptedException x ) { }
58:
59:         System.err.println("t.isAlive()=" + t.isAlive());
60:         System.err.println("leaving main()");
61:     }
62: }

The InputStream used in this example is System.in (line 5). Keyboard input reaches System.in one line at a time, so the Enter key must be pressed before the typed characters are sent. A new Runnable and a new Thread are created (lines 7–35) to block trying to read bytes from in (System.in). This new thread will block on read() until a line is typed in on the keyboard and the Enter key is pressed (line 12). If any of several types of Exception are thrown, a stack trace is printed (lines 14–22). Regardless of how the thread leaves the try block, it will jump into the finally block and print some information (lines 23–30).

After spawning the new thread, the main thread sleeps for two seconds to give it a chance to reach the read() (lines 37–38). Next, the main thread interrupts the new thread (lines 40–42) to see if this has any effect. After another two seconds pass (lines 44–45), the main thread takes a harsher approach and tries to stop the new thread (lines 47–54). Just in case the thread takes a while to notice, the main thread sleeps for yet another two seconds (lines 56–57) before checking again (line 59).

Listing 15.2 shows the output produced when DefiantStream is run. A few seconds after the "leaving main()" message was printed, I typed the "INPUT FROM KEYBOARD" line to supply the blocked thread with some data.

Example 15.2. Output from DefiantStream

 1: about to try to read from in
 2: about to interrupt thread
 3: just interrupted thread
 4: about to stop thread
 5: just stopped thread, t.isAlive()=false
 6: t.isAlive()=false
 7: leaving main()
 8: --------->INPUT FROM KEYBOARD
 9: inside finally:
10:   currThread=Thread[Thread-0,5,main]
11:   currThread.isAlive()=false

The interrupt() was ignored (lines 2–3), and quite surprisingly, so was the stop() (lines 4–5). Notice that isAlive() returns false, but the thread remains blocked on the read()! Even after the main thread dies (line 7), the blocked thread continues to wait for input. After a few more seconds, I typed line 8 to supply data to the System.in stream. Sure enough, the read() was no longer blocked, and the dead thread printed the last three lines! The somewhat dead thread is able to print that it is not alive (line 11). The thread is somewhat dead in the sense that it returns false from isAlive(), but it continues to execute code! Note that read() probably threw an instance of ThreadDeath (a subclass of Error) after it got some data, so that only the finally block was executed as the thread shut down.

Note

The example shows the behavior as of JDK 1.2 on the Win32 platform. As of this writing, there are several bug reports/requests for enhancement (depending on your perspective) pending on these issues. Future releases of the JDK might change this behavior. You can always use DefiantStream to check to see how a particular JavaVM responds.

Closing a Stream to Break Out of the Blocked State

Although interrupt() and stop() are ignored by a thread blocked on read(), the thread can usually be unblocked by closing the stream with another thread.

The next series of examples presents three classes: CalcServer, CalcWorker, and CalcClient. These classes work with each other to demonstrate in a somewhat realistic client/server configuration the technique of closing a blocked stream with another thread.

CalcServer is started first and waits for socket connections on port 2001. When a connection is received, CalcServer creates a new CalcWorker to handle the detailed communications with the client. In a separate VM on the same machine, CalcClient is launched and creates a connection to CalcServer. CalcClient makes one request of the server, and then leaves the line open. The server (through CalcWorker) responds to the first request and then blocks on a socket read() waiting for an additional request—a request that never comes. After running for 15 seconds, the server shuts down and uses the stream closing technique to get the thread inside CalcWorker to break out of its blocked state.

Class CalcServer and Breaking Out of a Blocked accept()

The class CalcServer (see Listing 15.3) waits for socket connections from CalcClient. It passes the socket to a new instance of CalcWorker to manage the server side of the client session. When CalcServer is shut down, it uses another closing technique to break the internal thread out of its blocked state on the accept() method of ServerSocket.

Example 15.3. CalcServer.java—Listens for Connections from CalcClient

 1: import java.io.*;
 2: import java.net.*;
 3: import java.util.*;
 4:
 5: public class CalcServer extends Object {
 6:     private ServerSocket ss;
 7:     private List workerList;
 8:    
 9:     private Thread internalThread;
10:     private volatile boolean noStopRequested;
11:
12:     public CalcServer(int port) throws IOException {
13:         ss = new ServerSocket(port);
14:         workerList = new LinkedList();
15:
16:         noStopRequested = true;
17:         Runnable r = new Runnable() {
18:                 public void run() {
19:                     try {
20:                         runWork();
21:                     } catch ( Exception x ) {
22:                         // in case ANY exception slips through
23:                         x.printStackTrace();
24:                     }
25:                 }
26:             };
27:
28:         internalThread = new Thread(r);
29:         internalThread.start();
30:     }
31:
32:     private void runWork() {
33:         System.out.println(
34:                 "in CalcServer - ready to accept connections");
35: 
36:         while ( noStopRequested ) {
37:             try {
38:                 System.out.println(
39:                         "in CalcServer - about to block " +
40:                         "waiting for a new connection");
41:                 Socket sock = ss.accept();
42:                 System.out.println(
43:                     "in CalcServer - received new connection");
44:                 workerList.add(new CalcWorker(sock));
45:             } catch ( IOException iox ) {
46:                 if ( noStopRequested ) {
47:                     iox.printStackTrace();
48:                 }
49:             }
50:         }
51:
52:         // stop all the workers that were created
53:         System.out.println("in CalcServer - putting in a " +
54:                 "stop request to all the workers");
55:         Iterator iter = workerList.iterator();
56:         while ( iter.hasNext() ) {
57:             CalcWorker worker = (CalcWorker) iter.next();
58:             worker.stopRequest();
59:         }
60:
61:         System.out.println("in CalcServer - leaving runWork()");
62:     }
63:
64:     public void stopRequest() {
65:         System.out.println(
66:                 "in CalcServer - entering stopRequest()");
67:         noStopRequested = false;
68:         internalThread.interrupt();
69:
70:         if ( ss != null ) {
71:             try {
72:                 ss.close();
73:             } catch ( IOException x ) {
74:                 // ignore
75:             } finally {
76:                 ss = null;
77:             }
78:         }
79:     }
80: 
81:     public boolean isAlive() {
82:         return internalThread.isAlive();
83:     }
84:
85:     public static void main(String[] args) {
86:         int port = 2001;
87:
88:         try {
89:             CalcServer server = new CalcServer(port);
90:             Thread.sleep(15000);
91:             server.stopRequest();
92:         } catch ( IOException x ) {
93:             x.printStackTrace();
94:         } catch ( InterruptedException x ) {
95:             // ignore
96:         }
97:     }
98: }

CalcServer is a self-running object, and 15 seconds after it is created, its stopRequest() method is called (lines 89–91). CalcServer creates a ServerSocket to listen to port 2001 for client connections (line 13). In its runWork() method, the internal thread blocks on the accept() method of ServerSocket waiting for new socket connections (line 41). When a connection is received, the socket is passed to a new instance of CalcWorker, and a reference to this CalcWorker is added to workerList (line 44).

When stopRequest() is invoked, noStopRequested is set to false (line 67) and the internal thread is interrupted (line 68). Much like a blocked read(), the accept() method ignores interrupts. To get the accept() method to throw an exception, the ServerSocket is closed (lines 70–78). Back in runWork(), the internal thread jumps down to the catch block (lines 45–49) because of this IOException. A stack trace is printed only if a stop has not been requested (lines 46–48) and in this case, no trace is printed because the closing of the ServerSocket caused the exception. The internal thread continues on and invokes stopRequest() on each of the CalcWorker objects (lines 55–59).
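Stripped of the worker bookkeeping, the accept()-closing technique can be sketched as follows. AcceptCloseDemo and closeUnblocksAccept() are hypothetical names. Port 0 (any free port) and the half-second pause are assumptions made so the sketch can run by itself.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical demo class; a reduced version of the idea in CalcServer.
class AcceptCloseDemo {

    // Returns true if closing the ServerSocket from another thread
    // made the blocked accept() throw an IOException.
    static boolean closeUnblocksAccept() {
        final boolean[] gotException = { false };
        try {
            // Port 0 asks the system for any free port (an assumption
            // made here so the sketch does not collide with port 2001).
            final ServerSocket ss = new ServerSocket(0);

            Thread acceptor = new Thread(new Runnable() {
                    public void run() {
                        try {
                            ss.accept(); // blocks, no client connects
                        } catch ( IOException iox ) {
                            gotException[0] = true; // caused by close()
                        }
                    }
                });
            acceptor.start();

            Thread.sleep(500);  // assumed long enough to reach accept()
            ss.close();         // close from another thread
            acceptor.join(5000);
            return gotException[0] && !acceptor.isAlive();
        } catch ( IOException x ) {
            throw new RuntimeException(x);
        } catch ( InterruptedException x ) {
            throw new RuntimeException(x);
        }
    }

    public static void main(String[] args) {
        System.out.println("accept() unblocked by close(): " +
                closeUnblocksAccept());
    }
}
```

In real code, a stop flag such as noStopRequested is still needed so that the catch block can tell an intentional close from a genuine failure.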

Class CalcWorker and Breaking Out of a Blocked read()

The class CalcWorker (see Listing 15.4) handles the server-side portion of a client session. It creates streams off the socket and communicates with CalcClient over them. When it is shut down, it uses a closing technique to break the internal thread out of its blocked state on the read() method of InputStream.

Example 15.4. CalcWorker.java—The Server-Side Portion of a Client Session

 1: import java.io.*;
 2: import java.net.*;
 3:
 4: public class CalcWorker extends Object {
 5:     private InputStream sockIn;
 6:     private OutputStream sockOut;
 7:     private DataInputStream dataIn;
 8:     private DataOutputStream dataOut;
 9:    
10:     private Thread internalThread;
11:     private volatile boolean noStopRequested;
12:
13:     public CalcWorker(Socket sock) throws IOException {
14:         sockIn = sock.getInputStream();
15:         sockOut = sock.getOutputStream();
16:
17:         dataIn = new DataInputStream(
18:                 new BufferedInputStream(sockIn));
19:         dataOut = new DataOutputStream(
20:                 new BufferedOutputStream(sockOut));
21:
22:         noStopRequested = true;
23:         Runnable r = new Runnable() {
24:                 public void run() {
25:                     try {
26:                         runWork();
27:                     } catch ( Exception x ) {
28:                         // in case ANY exception slips through
29:                         x.printStackTrace();
30:                     }
31:                 }
32:             };
33:
34:         internalThread = new Thread(r);
35:         internalThread.start();
36:     }
37: 
38:     private void runWork() {
39:         while ( noStopRequested ) {
40:             try {
41:                 System.out.println("in CalcWorker - about to " +
42:                         "block waiting to read a double");
43:                 double val = dataIn.readDouble();
44:                 System.out.println(
45:                         "in CalcWorker - read a double!");
46:                 dataOut.writeDouble(Math.sqrt(val));
47:                 dataOut.flush();
48:             } catch ( IOException x ) {
49:                 if ( noStopRequested ) {
50:                     x.printStackTrace();
51:                     stopRequest();
52:                 }
53:             }
54:         }
55:
56:         // In real-world code, be sure to close other streams and
57:         // the socket as part of the clean-up. Omitted here for
58:         // brevity.
59:
60:         System.out.println("in CalcWorker - leaving runWork()");
61:     }
62:
63:     public void stopRequest() {
64:         System.out.println(
65:                 "in CalcWorker - entering stopRequest()");
66:         noStopRequested = false;
67:         internalThread.interrupt();
68:
69:         if ( sockIn != null ) {
70:             try {
71:                 sockIn.close();
72:             } catch ( IOException iox ) {
73:                 // ignore
74:             } finally {
75:                 sockIn = null;
76:             }
77:         }
78:
79:         System.out.println(
80:                 "in CalcWorker - leaving stopRequest()");
81:     }
82:
83:     public boolean isAlive() {
84:         return internalThread.isAlive();
85:     }
86: }

CalcWorker is a self-running object that spends most of its time with its internal thread blocked waiting to read a new request from the client. It keeps a reference to the raw InputStream retrieved from the socket for use in closing (line 14). It then wraps the raw streams with DataInputStream and DataOutputStream to get the desired functionality (lines 17–20).

Tip

When using the closing technique to try to get a blocked read() to unblock, always attempt to close the rawest stream possible. In this example, the rawest InputStream is the one returned by the getInputStream() method on Socket. This maximizes the chances that the stream will throw an exception.

Inside runWork(), the internal thread blocks waiting to read a double from the client (line 43). When the thread reads a double, it calculates the mathematical square root of the double and sends the resulting double back down to the client (lines 46–47). The thread then goes back to its blocked state waiting to read more data from the client.

When stopRequest() is invoked, noStopRequested is set to false (line 66) and the internal thread is interrupted (line 67). A blocked read() does not respond to an interrupt, so the socket's input stream is closed (lines 69–77) causing the blocked read() to throw an IOException. Back up in runWork(), the internal thread jumps down to the catch block (lines 48–53). A stack trace is printed only if a stop has not been requested (lines 49–52), and in this case, no trace is printed because the closing of the stream caused the exception.
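The same idea can be tried in a single VM with a loopback connection. The sketch below is only an approximation of what CalcWorker does: ReadCloseDemo and closeUnblocksRead() are made-up names, the ephemeral port and timing values are assumptions, and the client is simply a second Socket that never sends anything.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class; a reduced version of the idea in CalcWorker.
class ReadCloseDemo {

    // Returns true if closing the raw socket InputStream from another
    // thread made the blocked read() throw an IOException.
    static boolean closeUnblocksRead() {
        final boolean[] gotException = { false };
        try {
            InetAddress loopback = InetAddress.getLoopbackAddress();
            ServerSocket ss = new ServerSocket(0, 1, loopback);
            // The "client" connects but never sends a request.
            Socket client = new Socket(loopback, ss.getLocalPort());
            Socket serverSide = ss.accept();

            // Keep the rawest stream for closing, as the Tip advises.
            final InputStream rawIn = serverSide.getInputStream();

            Thread reader = new Thread(new Runnable() {
                    public void run() {
                        try {
                            rawIn.read(); // blocks waiting for a request
                        } catch ( IOException iox ) {
                            gotException[0] = true; // caused by close()
                        }
                    }
                });
            reader.start();

            Thread.sleep(500);  // assumed long enough to reach read()
            rawIn.close();      // close from another thread
            reader.join(5000);

            boolean result = gotException[0] && !reader.isAlive();
            client.close();
            ss.close();
            return result;
        } catch ( IOException x ) {
            throw new RuntimeException(x);
        } catch ( InterruptedException x ) {
            throw new RuntimeException(x);
        }
    }

    public static void main(String[] args) {
        System.out.println("read() unblocked by close(): " +
                closeUnblocksRead());
    }
}
```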

Caution

Invoking close() on an InputStream can sometimes block. This can occur if both read() and close() are synchronized. The thread blocked on the read() has the instance lock. Meanwhile, another thread is trying to invoke close() and blocks trying to get exclusive access to the lock. As of JDK 1.2, BufferedInputStream has added the synchronized modifier to its close() method (it wasn't present in 1.0 and 1.1). This is not indicated in the Javadoc, but is shown in the source code and through reflection. If this is the case, the closing technique will not work unless done on the underlying stream.
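The locking hazard described in this caution can be reproduced with a toy stream. Everything in the sketch below is hypothetical: LockedStream is not a JDK class, and its read() uses a CountDownLatch to stand in for a native read that blocks while still holding the instance lock.

```java
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class showing close() stuck behind read().
class SyncCloseDemo {

    // Toy stream whose read() and close() are both synchronized.
    static class LockedStream extends InputStream {
        final CountDownLatch inRead = new CountDownLatch(1);
        final CountDownLatch dataReady = new CountDownLatch(1);

        public synchronized int read() {
            inRead.countDown();
            try {
                // Stands in for a blocking native read. Unlike wait(),
                // await() does not release the instance lock.
                dataReady.await();
            } catch ( InterruptedException x ) {
                return -1;
            }
            return 42;
        }

        public synchronized void close() {
            // nothing to release in this toy stream
        }
    }

    // Returns true if the closing thread was seen in the BLOCKED
    // state while the reader held the lock inside read().
    static boolean closeBlocksBehindRead() {
        final LockedStream stream = new LockedStream();
        Thread reader = new Thread(new Runnable() {
                public void run() { stream.read(); }
            });
        Thread closer = new Thread(new Runnable() {
                public void run() { stream.close(); }
            });
        try {
            reader.start();
            stream.inRead.await();  // reader now holds the lock
            closer.start();

            boolean blocked = false;
            for ( int i = 0; i < 100 && !blocked; i++ ) {
                Thread.sleep(50);
                blocked =
                    ( closer.getState() == Thread.State.BLOCKED );
            }

            stream.dataReady.countDown(); // let read() return
            reader.join(5000);
            closer.join(5000);
            return blocked && !closer.isAlive();
        } catch ( InterruptedException x ) {
            throw new RuntimeException(x);
        }
    }

    public static void main(String[] args) {
        System.out.println("close() blocked behind read(): " +
                closeBlocksBehindRead());
    }
}
```

The closing thread gets through only after read() returns, which is exactly the situation the closing technique is meant to avoid.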

Class CalcClient

The class CalcClient (see Listing 15.5) is run in a different VM than CalcServer, but on the same machine. It creates a connection to the server and makes only one request. After that, it keeps the connection open, but does no further communication.

Example 15.5. CalcClient.java—Code to Test CalcWorker

 1: import java.io.*;
 2: import java.net.*;
 3:
 4: public class CalcClient extends Object {
 5:     public static void main(String[] args) {
 6:         String hostname = "localhost";
 7:         int port = 2001;
 8:
 9:         try {
10:             Socket sock = new Socket(hostname, port);
11:
12:             DataInputStream in = new DataInputStream(
13:                 new BufferedInputStream(sock.getInputStream()));
14:             DataOutputStream out = new DataOutputStream(
15:                 new BufferedOutputStream(sock.getOutputStream()));
16:
17:             double val = 4.0;
18:             out.writeDouble(val);
19:             out.flush();
20:
21:             double sqrt = in.readDouble();
22:             System.out.println("sent up " + val +
                              ", got back " + sqrt);
23:
24:             // Don't ever send another request, but stay alive in
25:             // this eternally blocked state.
26:             Object lock = new Object();
27:             while ( true ) {
28:                 synchronized ( lock ) {
29:                     lock.wait();
30:                 }
31:             }
32:         } catch ( Exception x ) {
33:             x.printStackTrace();
34:         }
35:     }
36: }

CalcClient is crudely written to simply show the minimal communication necessary. It creates a socket connection to the server, extracts the data streams, and wraps them to get a DataInputStream and a DataOutputStream (lines 10–15). Then it writes a double over to the server and waits for a different double to be returned (lines 17–21). After that, CalcClient keeps the socket connection up, but does not communicate with the server any more. Instead, it goes into an infinite wait state (lines 26–31).
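The request and response exchange itself can be exercised without any sockets. The sketch below is an in-memory approximation: SqrtExchangeDemo, serveOne(), and roundTrip() are invented names, and byte arrays stand in for the two directions of the socket connection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical demo class; byte arrays replace the socket streams.
class SqrtExchangeDemo {

    // Server-side step: read one double, write back its square root.
    static void serveOne(DataInputStream in, DataOutputStream out)
            throws IOException {

        double val = in.readDouble();
        out.writeDouble(Math.sqrt(val));
        out.flush();
    }

    // Client-side view: send val, return the double that comes back.
    static double roundTrip(double val) {
        try {
            // "Send" the request into a byte array.
            ByteArrayOutputStream requestBytes =
                    new ByteArrayOutputStream();
            DataOutputStream requestOut =
                    new DataOutputStream(requestBytes);
            requestOut.writeDouble(val);
            requestOut.flush();

            // Let the server-side step consume it and answer.
            ByteArrayOutputStream responseBytes =
                    new ByteArrayOutputStream();
            serveOne(
                new DataInputStream(new ByteArrayInputStream(
                        requestBytes.toByteArray())),
                new DataOutputStream(responseBytes));

            // "Receive" the response.
            DataInputStream responseIn = new DataInputStream(
                    new ByteArrayInputStream(
                            responseBytes.toByteArray()));
            return responseIn.readDouble();
        } catch ( IOException x ) {
            throw new RuntimeException(x);
        }
    }

    public static void main(String[] args) {
        double val = 4.0;
        System.out.println("sent up " + val +
                ", got back " + roundTrip(val));
    }
}
```

Each writeDouble() puts exactly eight bytes on the stream, so a readDouble() on the other side blocks until all eight have arrived.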

Output When Run

When CalcClient is run, it produces one line of output and waits to be killed:

sent up 4.0, got back 2.0

On the server side, CalcServer and CalcWorker produce the output shown in Listing 15.6. Notice that when the stop request comes in, no exceptions are printed and the worker and server shut down in an orderly manner.

Example 15.6. Output from CalcServer and CalcWorker

 1: in CalcServer - ready to accept connections
 2: in CalcServer - about to block waiting for a new connection
 3: in CalcServer - received new connection
 4: in CalcServer - about to block waiting for a new connection
 5: in CalcWorker - about to block waiting to read a double
 6: in CalcWorker - read a double!
 7: in CalcWorker - about to block waiting to read a double
 8: in CalcServer - entering stopRequest()
 9: in CalcServer - putting in a stop request to all the workers
10: in CalcWorker - entering stopRequest()
11: in CalcWorker - leaving stopRequest()
12: in CalcServer - leaving runWork()
13: in CalcWorker - leaving runWork()

Throwing InterruptedIOException When Interrupted

The read() method on PipedInputStream will throw a subclass of IOException called InterruptedIOException if the blocked thread is interrupted while waiting for bytes to arrive. This is very useful functionality that I would like to see implemented across the whole java.io package. Until that happens, other techniques have to be used to get out of the blocked state. The next example illustrates such a technique.

Class ThreadedInputStream

The class ThreadedInputStream (see Listing 15.7) is a subclass of FilterInputStream and responds to interrupts by throwing an InterruptedIOException. It uses an internal thread to read from the underlying stream and loads the bytes into a ByteFIFO (discussed in Chapter 18, "First-In-First-Out (FIFO) Queue"). This read-ahead mechanism can help speed performance but does carry the cost of an extra thread running in the VM. Although very useful, instances of ThreadedInputStream are not particularly lightweight and should be used sparingly in an application.

Example 15.7. ThreadedInputStream.java—Interruptible read() Capability

  1: import java.io.*;
  2:
  3: // uses SureStop from chapter 16
  4: // uses ByteFIFO from chapter 18
  5:
  6: public class ThreadedInputStream extends FilterInputStream {
  7:     private ByteFIFO buffer;
  8:
  9:     private volatile boolean closeRequested;
 10:     private volatile boolean eofDetected;
 11:     private volatile boolean ioxDetected;
 12:     private volatile String ioxMessage;
 13:
 14:     private Thread internalThread;
 15:     private volatile boolean noStopRequested;
 16:
 17:     public ThreadedInputStream(InputStream in, int bufferSize) {
 18:         super(in);
 19:
 20:         buffer = new ByteFIFO(bufferSize);
 21:
 22:         closeRequested = false;
 23:         eofDetected = false;
 24:         ioxDetected = false;
 25:         ioxMessage = null;
 26:
 27:         noStopRequested = true;
 28:         Runnable r = new Runnable() {
 29:                 public void run() {
 30:                     try {
 31:                         runWork();
 32:                     } catch ( Exception x ) {
 33:                         // in case ANY exception slips through
 34:                         x.printStackTrace();
 35:                     }
 36:                 }
 37:             };
 38: 
 39:         internalThread = new Thread(r);
 40:         internalThread.setDaemon(true);
 41:         internalThread.start();
 42:     }
 43:
 44:     public ThreadedInputStream(InputStream in) {
 45:         this(in, 2048);
 46:     }
 47:
 48:     private void runWork() {
 49:         byte[] workBuf = new byte[buffer.getCapacity()];
 50:
 51:         try {
 52:             while ( noStopRequested ) {
 53:                 int readCount = in.read(workBuf);
 54:
 55:                 if ( readCount == -1 ) {
 56:                     signalEOF();
 57:                     stopRequest();
 58:                 } else if ( readCount > 0 ) {
 59:                     addToBuffer(workBuf, readCount);
 60:                 }
 61:             }
 62:         } catch ( IOException iox ) {
 63:             if ( !closeRequested ) {
 64:                 ioxMessage = iox.getMessage();
 65:                 signalIOX();
 66:             }
 67:         } catch ( InterruptedException x ) {
 68:             // ignore
 69:         } finally {
 70:             // no matter what, make sure that eofDetected is set
 71:             signalEOF();
 72:         }
 73:     }
 74:
 75:     private void signalEOF() {
 76:         synchronized ( buffer ) {
 77:             eofDetected = true;
 78:             buffer.notifyAll();
 79:         }
 80:     }
 81:
 82:     private void signalIOX() {
 83:         synchronized ( buffer ) {
 84:             ioxDetected = true;
 85:             buffer.notifyAll();
 86:         }
 87:     }
 88:    
 89:     private void signalClose() {
 90:         synchronized ( buffer ) {
 91:             closeRequested = true;
 92:             buffer.notifyAll();
 93:         }
 94:     }
 95:
 96:     private void addToBuffer(byte[] workBuf, int readCount)
 97:             throws InterruptedException {
 98:
 99:         // Create an array exactly as large as the number of
100:         // bytes read and copy the data into it.
101:         byte[] addBuf = new byte[readCount];
102:         System.arraycopy(workBuf, 0, addBuf, 0, addBuf.length);
103:
104:         buffer.add(addBuf);
105:     }
106:
107:     private void stopRequest() {
108:         if ( noStopRequested ) {
109:             noStopRequested = false;
110:             internalThread.interrupt();
111:         }
112:     }
113:
114:     public void close() throws IOException {
115:         if ( closeRequested ) {
116:             // already closeRequested, just return
117:             return;
118:         }
119:         signalClose();
120:
121:         SureStop.ensureStop(internalThread, 10000);
122:         stopRequest();
123: 
124:         // Use a new thread to close "in" in case it blocks
125:         final InputStream localIn = in;
126:         Runnable r = new Runnable() {
127:                 public void run() {
128:                     try {
129:                         localIn.close();
130:                     } catch ( IOException iox ) {
131:                         // ignore
132:                     }
133:                 }
134:             };
135:
136:         Thread t = new Thread(r, "in-close");
137:         // give up when all other non-daemon threads die
138:         t.setDaemon(true); 
139:         t.start();
140:     }
141:
142:     private void throwExceptionIfClosed() throws IOException {
143:         if ( closeRequested ) {
144:             throw new IOException("stream is closed");
145:         }
146:     }
147:
148:     // Throws InterruptedIOException if the thread blocked on
149:     // read() is interrupted while waiting for data to arrive.
150:     public int read()
151:             throws InterruptedIOException, IOException {
152:
153:         // Using read(byte[], int, int) keeps the code in one
154:         // place. It makes single-byte reads less efficient,
155:         // but simplifies the coding.
156:         byte[] data = new byte[1];
157:         int ret = read(data, 0, 1);
158:
159:         if ( ret != 1 ) {
160:             return -1;
161:         }
162:
163:         return data[0] & 0x000000FF;
164:     }
165: 
166:     // Throws InterruptedIOException if the thread blocked on
167:     // read() is interrupted while waiting for data to arrive.
168:     public int read(byte[] dest)
169:             throws InterruptedIOException, IOException {
170:
171:         return read(dest, 0, dest.length);
172:     }
173:
174:     // Throws InterruptedIOException if the thread blocked on
175:     // read() is interrupted while waiting for data to arrive.
176:     public int read(
177:                 byte[] dest,
178:                 int offset,
179:                 int length
180:             ) throws InterruptedIOException, IOException {
181:
182:         throwExceptionIfClosed();
183:
184:         if ( length < 1 ) {
185:             return 0;
186:         }
187:
188:         if ( ( offset < 0 ) ||
189:              ( ( offset + length ) > dest.length )
190:            ) {
191:
192:             throw new IllegalArgumentException(
193:                 "offset must be at least 0, and " +
194:                 "(offset + length) must be less than or " +
195:                 "equal to dest.length. " +
196:                 "offset=" + offset +
197:                 ", (offset + length )=" + ( offset + length ) +
198:                 ", dest.length=" + dest.length);
199:         }
200:
201:         byte[] data = removeUpTo(length);
202: 
203:         if ( data.length > 0 ) {
204:             System.arraycopy(data, 0, dest, offset, data.length);
205:             return data.length;
206:         }
207:
208:         // no data
209:         if ( eofDetected ) {
210:             return -1;
211:         }
212:
213:         // no data and not end of file, must be exception
214:         stopRequest();
215:
216:         if ( ioxMessage == null ) {
217:             ioxMessage = "stream cannot be read";
218:         }
219:
220:         throw new IOException(ioxMessage);
221:     }
222:
223:     private byte[] removeUpTo(int maxRead) throws IOException {
224:         // Convenience method to assist read(byte[], int, int).
225:         // Waits until at least one byte is ready, EOF is
226:         // detected,  an IOException is thrown, or the
227:         // stream is closed.
228:         try {
229:             synchronized ( buffer ) {
230:                 while ( buffer.isEmpty() &&
231:                         !eofDetected &&
232:                         !ioxDetected &&
233:                         !closeRequested
234:                       ) {
235:    
236:                     buffer.wait();
237:                 }
238:    
239:                 // If stream was closed while waiting,
240:                 // get out right away.
241:                 throwExceptionIfClosed();
242: 
243:                 // Ignore eof and exception flags for now, see
244:                 // if any data remains.
245:                 byte[] data = buffer.removeAll();
246:    
247:                 if ( data.length > maxRead ) {
248:                     // Pulled out too many bytes,
249:                     // put excess back.
250:                     byte[] putBackData =
251:                             new byte[data.length - maxRead];
252:                     System.arraycopy(data, maxRead,
253:                             putBackData, 0, putBackData.length);
254:                     buffer.add(putBackData);
255:    
256:                     byte[] keepData = new byte[maxRead];
257:                     System.arraycopy(data, 0,
258:                             keepData, 0, keepData.length);
259:                     data = keepData;
260:                 }
261:    
262:                 return data;
263:             }
264:         } catch ( InterruptedException ix ) {
265:             // convert to an IOException
266:             throw new InterruptedIOException("interrupted " +
267:                 "while waiting for data to arrive for reading");
268:         }
269:     }
270:
271:     public long skip(long n) throws IOException {
272:         throwExceptionIfClosed();
273:
274:         if ( n <= 0 ) {
275:             return 0;
276:         }
277:
278:         int skipLen = (int) Math.min(n, Integer.MAX_VALUE);
279:         int readCount = read(new byte[skipLen]);
280:
281:         if ( readCount < 0 ) {
282:             return 0;
283:         }
284:
285:         return readCount;
286:     }
287: 
288:     public int available() throws IOException {
289:         throwExceptionIfClosed();
290:         return buffer.getSize();
291:     }
292:
293:     public boolean markSupported() {
294:         return false;
295:     }
296:
297:     public synchronized void mark(int readLimit) {
298:         // ignore method calls, mark not supported
299:     }
300:
301:     public synchronized void reset() throws IOException {
302:         throw new IOException(
303:                 "mark-reset not supported on this stream");
304:     }
305: }

ThreadedInputStream extends FilterInputStream (line 6) and passes the InputStream handed to its constructor up to the superclass constructor (line 18). The superclass has a protected member variable called in that holds a reference to the underlying InputStream; this reference is used throughout ThreadedInputStream.

The internal thread basically just reads as much data as it can and loads it into the ByteFIFO buffer (lines 52–61). If the buffer is full, the internal thread blocks waiting for some data to be read out of the buffer (line 104). When the internal thread gets to the end of the file (EOF), it sets a flag (lines 75–80). If the internal thread encounters an IOException while reading into the buffer, it sets a flag (lines 82–87). The ByteFIFO is used for all of the wait-notify signaling.
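The blocking behavior described above can be illustrated with a small bounded buffer. This is only a minimal sketch, not the ByteFIFO class used in the listing: the class TinyByteFifo, its one-byte-at-a-time methods, and the demo() helper are hypothetical names introduced for illustration. It shows a producer thread blocking when the buffer is full and a consumer blocking when it is empty, with all signaling done through wait() and notifyAll().

```java
public class TinyByteFifo {
    private final byte[] queue;
    private int head; // index where the next byte is stored
    private int tail; // index where the next byte is removed
    private int size; // number of bytes currently held

    public TinyByteFifo(int capacity) {
        queue = new byte[capacity];
    }

    // Blocks while the buffer is full.
    public synchronized void add(byte b) throws InterruptedException {
        while ( size == queue.length ) {
            wait();
        }
        queue[head] = b;
        head = (head + 1) % queue.length;
        size++;
        notifyAll(); // wake up any thread waiting for data
    }

    // Blocks while the buffer is empty.
    public synchronized byte remove() throws InterruptedException {
        while ( size == 0 ) {
            wait();
        }
        byte b = queue[tail];
        tail = (tail + 1) % queue.length;
        size--;
        notifyAll(); // wake up any thread waiting for space
        return b;
    }

    public synchronized int getSize() {
        return size;
    }

    // Hypothetical demo: a producer pushes 5 bytes through a
    // 2-byte buffer while the calling thread drains it.
    public static boolean demo() {
        final TinyByteFifo fifo = new TinyByteFifo(2);
        Thread producer = new Thread(new Runnable() {
                public void run() {
                    try {
                        for ( int i = 1; i <= 5; i++ ) {
                            fifo.add((byte) i);
                        }
                    } catch ( InterruptedException x ) {
                        // give up if interrupted
                    }
                }
            });
        producer.start();

        try {
            int sum = 0;
            for ( int i = 0; i < 5; i++ ) {
                sum += fifo.remove();
            }
            producer.join(2000);
            return sum == 15 && fifo.getSize() == 0;
        } catch ( InterruptedException x ) {
            return false;
        }
    }
}
```

Because the producer can never get more than two bytes ahead of the consumer, the reading side sets the pace, which is the same relationship the internal and external threads have in ThreadedInputStream.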

When an external thread comes in to read some data, it gets the data from the ByteFIFO buffer (lines 201–220, 223–269). The external thread pays attention to the EOF and exception flags only if there is no more data in the buffer. This delays the reporting until the external thread catches up to the internal thread. If the external thread is blocked waiting for some data to arrive (line 236) and is then interrupted, it will jump to the catch block (lines 264–268). There, the InterruptedException is caught, and a new InterruptedIOException is thrown in its place (lines 266–267). This means that a thread blocked on a read() will now respond to interrupts!
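The conversion from InterruptedException to InterruptedIOException can be shown in isolation. The following is a minimal sketch rather than the code from the listing: the class InterruptibleWaitSketch, the dataReady flag (standing in for "the buffer is not empty"), and the demo() helper are assumptions made for illustration.

```java
import java.io.InterruptedIOException;

public class InterruptibleWaitSketch {
    private final Object lock = new Object();
    private boolean dataReady = false; // never set in this sketch

    // Blocks until data is ready. An interrupt while waiting is
    // reported to the caller as an InterruptedIOException.
    public void awaitData() throws InterruptedIOException {
        try {
            synchronized ( lock ) {
                while ( !dataReady ) {
                    lock.wait();
                }
            }
        } catch ( InterruptedException ix ) {
            // convert to an IOException
            throw new InterruptedIOException(
                    "interrupted while waiting for data");
        }
    }

    // Hypothetical demo: returns true if interrupting the blocked
    // reader caused an InterruptedIOException to be thrown.
    public static boolean demo() {
        final InterruptibleWaitSketch sketch =
                new InterruptibleWaitSketch();
        final boolean[] sawException = new boolean[1];

        Thread reader = new Thread(new Runnable() {
                public void run() {
                    try {
                        sketch.awaitData();
                    } catch ( InterruptedIOException iiox ) {
                        sawException[0] = true;
                    }
                }
            });
        reader.start();

        try {
            Thread.sleep(200); // give the reader time to block
            reader.interrupt();
            reader.join(2000);
        } catch ( InterruptedException x ) {
            return false;
        }
        return sawException[0];
    }
}
```

Because wait() also throws InterruptedException when the thread's interrupt flag is already set on entry, the sketch behaves the same whether the interrupt arrives before or during the wait.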

The close() method (lines 114–140) is used to shut down the internal thread (notice that in this class, stopRequest() is private). The close() method can be safely invoked more than once because it simply ignores subsequent requests. It starts by signaling that a close has been requested (line 119), which will cause any blocked read() calls on ThreadedInputStream to throw an IOException (line 241). It then uses SureStop to make sure that even if all else fails, the internal thread will be stopped in 10 seconds (line 121). Inside close(), a new thread is created to invoke close() on the underlying stream (lines 125–139). This step is necessary in case the call blocks for quite a while—or even forever if a deadlock scenario occurs. ThreadedInputStream can't control what kind of InputStream it is passed in its constructor, so this extra thread is just added insurance.
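The idea of handing a potentially blocking close() to a separate thread can be sketched on its own. This is an illustration under stated assumptions, not the code from lines 125–139: the class AsyncCloseSketch, the method closeInBackground(), the thread name, and the choice to make the helper a daemon thread are all hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class AsyncCloseSketch {
    // Starts a helper thread that closes the stream, so the
    // caller returns right away even if close() blocks.
    public static Thread closeInBackground(final InputStream in) {
        Runnable r = new Runnable() {
                public void run() {
                    try {
                        in.close();
                    } catch ( IOException x ) {
                        // ignore, nothing more can be done here
                    }
                }
            };

        Thread t = new Thread(r, "stream-closer");
        // Assumption: a daemon thread, so that a close() that
        // never returns cannot keep the VM alive.
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Hypothetical demo: returns true if the helper thread
    // invoked close() on the stream.
    public static boolean demo() {
        final boolean[] closed = new boolean[1];
        InputStream in = new ByteArrayInputStream(new byte[0]) {
                public void close() throws IOException {
                    closed[0] = true;
                }
            };

        Thread closer = closeInBackground(in);
        try {
            closer.join(2000);
        } catch ( InterruptedException x ) {
            return false;
        }
        return closed[0];
    }
}
```

The caller of closeInBackground() is never held up by the underlying stream; at worst, only the helper thread stays blocked.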

Class BufferedThreadedInputStream

The class ThreadedInputStream just focuses on the task of splitting up the transfer of data between two threads. It performs poorly on single-byte reads and does not have any support for the mark-reset mechanism. BufferedThreadedInputStream (see Listing 15.8) makes up for these shortcomings by using a ThreadedInputStream with a BufferedInputStream added on both ends to smooth data flow.

Example 15.8. BufferedThreadedInputStream.java—ThreadedInputStream with Buffering

 1: import java.io.*;
 2:
 3: // uses ThreadedInputStream
 4:
 5: public class BufferedThreadedInputStream
 6:         extends FilterInputStream {
 7:
 8:     // fixed class that does *not* have a synchronized close()
 9:     private static class BISFix extends BufferedInputStream {
10:         public BISFix(InputStream rawIn, int buffSize) {
11:             super(rawIn, buffSize);
12:         }
13:
14:         public void close() throws IOException {
15:             if ( in != null ) {
16:                 try {
17:                     in.close();
18:                 } finally {
19:                     in = null;
20:                 }
21:             }
22:         }
23:     }
24:
25:     public BufferedThreadedInputStream(
26:                 InputStream rawIn,
27:                 int bufferSize
28:             ) {
29:
 30:         super(rawIn); // super-class' "in" is set below
31:
32:         // rawIn -> BufferedIS -> ThreadedIS ->
33:         //       BufferedIS -> read()
34: 
35:         BISFix bis = new BISFix(rawIn, bufferSize);
36:         ThreadedInputStream tis =
37:                 new ThreadedInputStream(bis, bufferSize);
38:
 39:         // Change the protected variable 'in' from the
40:         // superclass from rawIn to the correct stream.
41:         in = new BISFix(tis, bufferSize);
42:     }
43:
44:     public BufferedThreadedInputStream(InputStream rawIn) {
45:         this(rawIn, 2048);
46:     }
47:
48:     // Overridden to show that InterruptedIOException might
49:     // be thrown.
50:     public int read()
51:             throws InterruptedIOException, IOException {
52:
53:         return in.read();
54:     }
55:
56:     // Overridden to show that InterruptedIOException might
57:     // be thrown.
58:     public int read(byte[] b)
59:             throws InterruptedIOException, IOException {
60:
61:         return in.read(b);
62:     }
63:
64:     // Overridden to show that InterruptedIOException might
65:     // be thrown.
66:     public int read(byte[] b, int off, int len)
67:             throws InterruptedIOException, IOException {
68:
69:         return in.read(b, off, len);
70:     }
71:
72:     // Overridden to show that InterruptedIOException might
73:     // be thrown.
74:     public long skip(long n)
75:             throws InterruptedIOException, IOException {
76:
77:         return in.skip(n);
78:     }
79:
80:     // The remainder of the methods are directly inherited from
 81:     // FilterInputStream and access "in" in much the same
82:     // way as the methods above do.
83: }

BufferedThreadedInputStream has a nested class (lines 8–23) called BISFix that simply extends BufferedInputStream and overrides close() so that it is not synchronized. This is a critical difference that is needed so that close() can be executed while another thread is blocked inside read().

In the constructor (lines 25–42), the raw input stream is wrapped in a BISFix (modified BufferedInputStream), which is wrapped in a ThreadedInputStream, which is wrapped in another BISFix. This provides buffering for both the internal thread and any external thread that does some reading.

Using BufferedThreadedInputStream for Interruptible I/O

Now it's time to combine the techniques presented so far. CalcServerTwo (see Listing 15.9) has been slightly modified to work with CalcWorkerTwo. CalcWorkerTwo (see Listing 15.10) now uses a BufferedThreadedInputStream as an inline filter so that when it is blocked trying to read(), it will respond to an interrupt.

Example 15.9. CalcServerTwo.java—Modified to Work with CalcWorkerTwo

 1: import java.io.*;
 2: import java.net.*;
 3: import java.util.*;
 4:
 5: public class CalcServerTwo extends Object {
 6:     private ServerSocket ss;
 7:     private List workerList;
 8:    
 9:     private Thread internalThread;
10:     private volatile boolean noStopRequested;
11:
12:     public CalcServerTwo(int port) throws IOException {
13:         ss = new ServerSocket(port);
14:         workerList = new LinkedList();
15:
16:         noStopRequested = true;
17:         Runnable r = new Runnable() {
18:                 public void run() {
19:                     try {
20:                         runWork();
21:                     } catch ( Exception x ) {
22:                         // in case ANY exception slips through
23:                         x.printStackTrace();
24:                     }
25:                 }
26:             };
27:
28:         internalThread = new Thread(r);
29:         internalThread.start();
30:     }
31:
32:     private void runWork() {
33:         System.out.println(
34:                 "in CalcServer - ready to accept connections");
35: 
36:         while ( noStopRequested ) {
37:             try {
38:                 System.out.println(
39:                         "in CalcServer - about to block " +
40:                         "waiting for a new connection");
41:                 Socket sock = ss.accept();
42:                 System.out.println(
43:                     "in CalcServer - received new connection");
44:                 workerList.add(new CalcWorkerTwo(sock));
45:             } catch ( IOException iox ) {
46:                 if ( noStopRequested ) {
47:                     iox.printStackTrace();
48:                 }
49:             }
50:         }
51:
52:         // stop all the workers that were created
53:         System.out.println("in CalcServer - putting in a " +
54:                 "stop request to all the workers");
55:         Iterator iter = workerList.iterator();
56:         while ( iter.hasNext() ) {
57:             CalcWorkerTwo worker = (CalcWorkerTwo) iter.next();
58:             worker.stopRequest();
59:         }
60:
61:         System.out.println("in CalcServer - leaving runWork()");
62:     }
63:
64:     public void stopRequest() {
65:         System.out.println(
66:                 "in CalcServer - entering stopRequest()");
67:         noStopRequested = false;
68:         internalThread.interrupt();
69:
70:         if ( ss != null ) {
71:             try {
72:                 ss.close();
73:             } catch ( IOException x ) {
74:                 // ignore
75:             } finally {
76:                 ss = null;
77:             }
78:         }
79:     }
80:
81:     public boolean isAlive() {
82:         return internalThread.isAlive();
83:     }
84: 
85:     public static void main(String[] args) {
86:         int port = 2001;
87:
88:         try {
89:             CalcServerTwo server = new CalcServerTwo(port);
90:             Thread.sleep(15000);
91:             server.stopRequest();
92:         } catch ( IOException x ) {
93:             x.printStackTrace();
94:         } catch ( InterruptedException x ) {
95:             // ignore
96:         }
97:     }
98: }

Example 15.10. CalcWorkerTwo.java—Using BufferedThreadedInputStream

 1: import java.io.*;
 2: import java.net.*;
 3:
 4: public class CalcWorkerTwo extends Object {
 5:     private DataInputStream dataIn;
 6:     private DataOutputStream dataOut;
 7:   
 8:     private Thread internalThread;
 9:     private volatile boolean noStopRequested;
10:
11:     public CalcWorkerTwo(Socket sock) throws IOException {
12:         dataIn = new DataInputStream(
13:             new BufferedThreadedInputStream(
14:                 sock.getInputStream()));
15:         dataOut = new DataOutputStream(
16:             new BufferedOutputStream(
17:                 sock.getOutputStream()));
18:
19:         noStopRequested = true;
20:         Runnable r = new Runnable() {
21:                 public void run() {
22:                     try {
23:                         runWork();
24:                     } catch ( Exception x ) {
25:                         // in case ANY exception slips through
26:                         x.printStackTrace();
27:                     }
28:                 }
29:             };
30:
31:         internalThread = new Thread(r);
32:         internalThread.start();
33:     }
34: 
35:     private void runWork() {
36:         while ( noStopRequested ) {
37:             try {
38:                 System.out.println("in CalcWorker - about to " +
39:                         "block waiting to read a double");
40:                 double val = dataIn.readDouble();
41:                 System.out.println(
42:                         "in CalcWorker - read a double!");
43:                 dataOut.writeDouble(Math.sqrt(val));
44:                 dataOut.flush();
45:             } catch ( InterruptedIOException iiox ) {
46:                 System.out.println("in CalcWorker - blocked " +
47:                         "read was interrupted!!!");
48:             } catch ( IOException x ) {
49:                 if ( noStopRequested ) {
50:                     x.printStackTrace();
51:                     stopRequest();
52:                 }
53:             }
54:         }
55:
56:         // In real-world code, be sure to close other streams
57:         // and the socket as part of the clean-up. Omitted here
58:         // for brevity.
59:
60:         System.out.println("in CalcWorker - leaving runWork()");
61:     }
62:
63:     public void stopRequest() {
64:         System.out.println(
65:                 "in CalcWorker - entering stopRequest()");
66:         noStopRequested = false;
67:         internalThread.interrupt();
68:         System.out.println(
69:                 "in CalcWorker - leaving stopRequest()");
70:     }
71:
72:     public boolean isAlive() {
73:         return internalThread.isAlive();
74:     }
75: }

CalcWorkerTwo has been modified to take advantage of the interruptible I/O of BufferedThreadedInputStream (line 13). The InterruptedIOException has been caught in this code simply to print a message (lines 45–47) but can be ignored. The stopRequest() method (lines 63–70) has been simplified back down to the self-running object template—only an interrupt() is necessary to unblock a read().

The same CalcClient code can be used with the new CalcServerTwo and CalcWorkerTwo. Listing 15.11 shows possible output from CalcServerTwo. Your output should match closely, with perhaps a message or two swapped.

Example 15.11. Possible Output from CalcServerTwo

 1: in CalcServer - ready to accept connections
 2: in CalcServer - about to block waiting for a new connection
 3: in CalcServer - received new connection
 4: in CalcServer - about to block waiting for a new connection
 5: in CalcWorker - about to block waiting to read a double
 6: in CalcWorker - read a double!
 7: in CalcWorker - about to block waiting to read a double
 8: in CalcServer - entering stopRequest()
 9: in CalcServer - putting in a stop request to all the workers
10: in CalcWorker - entering stopRequest()
11: in CalcWorker - leaving stopRequest()
12: in CalcWorker - blocked read was interrupted!!!
13: in CalcServer - leaving runWork()
14: in CalcWorker - leaving runWork()

The main difference in the output from what was seen before is line 12. Here, a message is printed confirming that an InterruptedIOException was thrown when the blocked thread was interrupted.

Summary

Blocked I/O statements can be troublesome to deal with, but this chapter offered a few techniques to avoid this difficulty. A blocked read() on an InputStream ignores interrupt() and stop(), but generally will throw an exception if its stream is closed by another thread. A blocked accept() on a ServerSocket also ignores interrupts, but will throw an exception if the ServerSocket is closed by another thread.
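The ServerSocket behavior summarized above can be checked with a short, self-contained program. This is a minimal sketch: the class AcceptUnblockSketch and its demo() helper are hypothetical names, and port 0 is used so that the system picks any free port.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class AcceptUnblockSketch {
    // Returns true if closing the ServerSocket from another
    // thread caused the blocked accept() to throw an exception.
    public static boolean demo() {
        final boolean[] sawIOException = new boolean[1];

        try {
            final ServerSocket ss = new ServerSocket(0);

            Thread acceptor = new Thread(new Runnable() {
                    public void run() {
                        try {
                            ss.accept(); // blocks, no client connects
                        } catch ( IOException iox ) {
                            sawIOException[0] = true;
                        }
                    }
                });
            acceptor.start();

            Thread.sleep(200); // give the acceptor time to block
            ss.close();        // unblocks accept()
            acceptor.join(2000);
        } catch ( IOException x ) {
            return false;
        } catch ( InterruptedException x ) {
            return false;
        }

        return sawIOException[0];
    }

    public static void main(String[] args) {
        System.out.println("accept() unblocked by close(): " + demo());
    }
}
```

The exception thrown by accept() in this situation is a SocketException, which is a subclass of IOException, so the catch block used in CalcServerTwo handles it.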

ThreadedInputStream and BufferedThreadedInputStream provide a mechanism that allows a blocked read() to be interrupted and throw an InterruptedIOException. Although this technique comes with the overhead of another thread, it can be very useful by providing interruptible I/O.
