Unfortunately, as of Java 1.2, most blocking I/O statements in the JDK still ignore interrupt requests. This presents a problem when you want to have a thread stop waiting on the I/O operation and do something else—typically to clean up and die. This problem generally arises in situations where data is read from a stream intermittently. A common case is a conversation between a client and server over a socket's streams. Typically in this scenario, a thread blocks for long periods of time on a stream waiting for another request to come across the line.
In this chapter, I'll present some techniques that you can use to get a thread to break out of a blocked I/O state. I'll begin by demonstrating the blocking problem with an example.
The read() methods of most subclasses of InputStream in the JDK ignore interrupts and don't even respond when the blocked thread is stopped. One notable exception is the PipedInputStream class, whose read() method does respond to interrupts by throwing an InterruptedIOException.
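The PipedInputStream exception can be checked with a small sketch like the one below. The class name PipedInterruptDemo and its helper method are hypothetical and are not part of the chapter's listings; the sketch only assumes the documented behavior of PipedInputStream described above, and uses newer Java syntax (lambdas, multi-catch) than the listings in this chapter.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class; not part of the chapter's listings.
class PipedInterruptDemo {

    // Returns true if interrupting a thread blocked in
    // PipedInputStream.read() produced an InterruptedIOException.
    static boolean readIsInterruptible() {
        final boolean[] sawInterrupt = { false };
        try {
            final PipedInputStream in = new PipedInputStream();
            // Connect a writer so the pipe is usable, but never write.
            PipedOutputStream out = new PipedOutputStream(in);

            Thread reader = new Thread(() -> {
                try {
                    in.read(); // blocks: no data ever arrives
                } catch (InterruptedIOException iiox) {
                    sawInterrupt[0] = true;
                } catch (IOException iox) {
                    iox.printStackTrace();
                }
            });
            reader.start();

            Thread.sleep(500);  // give the reader time to block
            reader.interrupt();
            reader.join(5000);  // wait for the reader to finish
            out.close();
        } catch (IOException | InterruptedException x) {
            throw new IllegalStateException(x);
        }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println("interruptible=" + readIsInterruptible());
    }
}
```

The reader thread is the only one that touches the pipe, so the result depends only on whether interrupt() breaks the blocked read().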
The class DefiantStream (see Listing 15.1) shows that when a thread is blocked waiting to read data from the keyboard, it does not respond to an interrupt or a stop request.
Listing 15.1. DefiantStream.java: Demonstration of interrupt() and stop() Being Ignored
1: import java.io.*;
2:
3: public class DefiantStream extends Object {
4:     public static void main(String[] args) {
5:         final InputStream in = System.in;
6:
7:         Runnable r = new Runnable() {
8:             public void run() {
9:                 try {
10:                     System.err.println(
11:                         "about to try to read from in");
12:                     in.read();
13:                     System.err.println("just read from in");
14:                 } catch ( InterruptedIOException iiox ) {
15:                     iiox.printStackTrace();
16:                 } catch ( IOException iox ) {
17:                     iox.printStackTrace();
18:                 //} catch ( InterruptedException ix ) {
19:                 //  InterruptedException is never thrown!
20:                 //  ix.printStackTrace();
21:                 } catch ( Exception x ) {
22:                     x.printStackTrace();
23:                 } finally {
24:                     Thread currThread =
25:                         Thread.currentThread();
26:                     System.err.println("inside finally:\n" +
27:                         " currThread=" + currThread + "\n" +
28:                         " currThread.isAlive()=" +
29:                         currThread.isAlive());
30:                 }
31:             }
32:         };
33:
34:         Thread t = new Thread(r);
35:         t.start();
36:
37:         try { Thread.sleep(2000); }
38:         catch ( InterruptedException x ) { }
39:
40:         System.err.println("about to interrupt thread");
41:         t.interrupt();
42:         System.err.println("just interrupted thread");
43:
44:         try { Thread.sleep(2000); }
45:         catch ( InterruptedException x ) { }
46:
47:         System.err.println("about to stop thread");
48:         // stop() is being used here to show that the extreme
49:         // action of stopping a thread is also ineffective.
50:         // Because stop() is deprecated, the compiler issues
51:         // a warning.
52:         t.stop();
53:         System.err.println("just stopped thread, t.isAlive()=" +
54:             t.isAlive());
55:
56:         try { Thread.sleep(2000); }
57:         catch ( InterruptedException x ) { }
58:
59:         System.err.println("t.isAlive()=" + t.isAlive());
60:         System.err.println("leaving main()");
61:     }
62: }
The InputStream used in this example is System.in (line 5). Keyboard input is passed to System.in a line at a time, so the Enter key must be pressed to send the characters typed. A new Runnable and a new Thread are created (lines 7–35) to block trying to read bytes from in (System.in). This new thread will block on read() until a line is typed in on the keyboard and the Enter key is pressed (line 12). If any of several types of Exception are thrown, a stack trace is printed (lines 14–22). Regardless of how the thread leaves the try block, it will jump into the finally block and print some information (lines 23–30).
After the new thread is spawned, the main thread gives it a chance (two seconds) to get to the read() (lines 37–38). Next, the main thread interrupts the new thread (lines 40–42) to see if this has any effect. After another two seconds pass (lines 44–45), the main thread takes a harsher approach and tries to stop the new thread (lines 47–54). Just in case the thread takes a while to notice, the main thread sleeps for yet another two seconds (lines 56–57) before checking again (line 59).
Listing 15.2 shows the output produced when DefiantStream is run. A few seconds after the "leaving main()" message was printed, I typed the "INPUT FROM KEYBOARD" line to supply the blocked thread with some data.
Listing 15.2. Output from DefiantStream
1: about to try to read from in
2: about to interrupt thread
3: just interrupted thread
4: about to stop thread
5: just stopped thread, t.isAlive()=false
6: t.isAlive()=false
7: leaving main()
8: --------->INPUT FROM KEYBOARD
9: inside finally:
10: currThread=Thread[Thread-0,5,main]
11: currThread.isAlive()=false
The interrupt() was ignored (lines 2–3), and quite surprisingly, so was the stop() (lines 4–5). Notice that isAlive() returns false, but the thread remains blocked on the read()! Even after the main thread dies (line 7), the blocked thread continues to wait for input. After a few more seconds, I typed line 8 to supply data to the System.in stream. Sure enough, the read() was no longer blocked, and the dead thread printed the last three lines! The somewhat dead thread is able to print that it is not alive (line 11). The thread is somewhat dead in the sense that it returns false from isAlive(), but it continues to execute code! Note that read() probably threw an instance of ThreadDeath (a subclass of Error) after it got some data, so that only the finally block was executed as the thread shut down.
The example shows the behavior as of JDK 1.2 on the Win32 platform. As of this writing, there are several bug reports/requests for enhancement (depending on your perspective) pending on these issues. Future releases of the JDK might change this behavior. You can always use DefiantStream to check how a particular Java VM responds.
Although interrupt() and stop() are ignored by a thread blocked on read(), the thread can usually be unblocked by closing the stream with another thread.
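Before the fuller client/server example, the closing technique can be tried in isolation. The following is a minimal sketch, assuming a loopback connection on an ephemeral port; CloseToUnblockDemo and its method are hypothetical names, not classes from this chapter, and the exact exception type and message reported may vary by JDK.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class; not part of the chapter's listings.
class CloseToUnblockDemo {

    // Returns true if closing the socket's raw InputStream from the
    // calling thread made the blocked reader thread leave read().
    static boolean closeUnblocksRead() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket ss = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, ss.getLocalPort());
             Socket server = ss.accept()) {

            final InputStream rawIn = server.getInputStream();
            Thread reader = new Thread(() -> {
                try {
                    rawIn.read(); // blocks: the client never sends data
                } catch (IOException iox) {
                    System.out.println("read() ended with: " + iox);
                }
            });
            reader.start();

            Thread.sleep(500);  // give the reader time to block
            rawIn.close();      // close the stream out from under it
            reader.join(5000);
            return !reader.isAlive();
        } catch (IOException | InterruptedException x) {
            throw new IllegalStateException(x);
        }
    }

    public static void main(String[] args) {
        System.out.println("unblocked=" + closeUnblocksRead());
    }
}
```

The sketch reports only whether the reader thread got out of read(); it deliberately does not call interrupt(), so any unblocking is due to the close alone.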
The next series of examples presents three classes: CalcServer, CalcWorker, and CalcClient. These classes work with each other to demonstrate, in a somewhat realistic client/server configuration, the technique of closing a blocked stream with another thread.
CalcServer is started first and waits for socket connections on port 2001. When a connection is received, CalcServer creates a new CalcWorker to handle the detailed communications with the client. In a separate VM on the same machine, CalcClient is launched and creates a connection to CalcServer. CalcClient makes one request of the server, and then leaves the line open. The server (through CalcWorker) responds to the first request and then blocks on a socket read() waiting for an additional request that never comes. After running for 15 seconds, the server shuts down and uses the stream closing technique to get the thread inside CalcWorker to break out of its blocked state.
The class CalcServer (see Listing 15.3) waits for socket connections from CalcClient. It passes the socket to a new instance of CalcWorker to manage the server side of the client session. When CalcServer is shut down, it uses another closing technique to break the internal thread out of its blocked state on the accept() method of ServerSocket.
Listing 15.3. CalcServer.java: Listens for Connections from CalcClient
1: import java.io.*;
2: import java.net.*;
3: import java.util.*;
4:
5: public class CalcServer extends Object {
6:     private ServerSocket ss;
7:     private List workerList;
8:
9:     private Thread internalThread;
10:     private volatile boolean noStopRequested;
11:
12:     public CalcServer(int port) throws IOException {
13:         ss = new ServerSocket(port);
14:         workerList = new LinkedList();
15:
16:         noStopRequested = true;
17:         Runnable r = new Runnable() {
18:                 public void run() {
19:                     try {
20:                         runWork();
21:                     } catch ( Exception x ) {
22:                         // in case ANY exception slips through
23:                         x.printStackTrace();
24:                     }
25:                 }
26:             };
27:
28:         internalThread = new Thread(r);
29:         internalThread.start();
30:     }
31:
32:     private void runWork() {
33:         System.out.println(
34:                 "in CalcServer - ready to accept connections");
35:
36:         while ( noStopRequested ) {
37:             try {
38:                 System.out.println(
39:                         "in CalcServer - about to block " +
40:                         "waiting for a new connection");
41:                 Socket sock = ss.accept();
42:                 System.out.println(
43:                     "in CalcServer - received new connection");
44:                 workerList.add(new CalcWorker(sock));
45:             } catch ( IOException iox ) {
46:                 if ( noStopRequested ) {
47:                     iox.printStackTrace();
48:                 }
49:             }
50:         }
51:
52:         // stop all the workers that were created
53:         System.out.println("in CalcServer - putting in a " +
54:                 "stop request to all the workers");
55:         Iterator iter = workerList.iterator();
56:         while ( iter.hasNext() ) {
57:             CalcWorker worker = (CalcWorker) iter.next();
58:             worker.stopRequest();
59:         }
60:
61:         System.out.println("in CalcServer - leaving runWork()");
62:     }
63:
64:     public void stopRequest() {
65:         System.out.println(
66:                 "in CalcServer - entering stopRequest()");
67:         noStopRequested = false;
68:         internalThread.interrupt();
69:
70:         if ( ss != null ) {
71:             try {
72:                 ss.close();
73:             } catch ( IOException x ) {
74:                 // ignore
75:             } finally {
76:                 ss = null;
77:             }
78:         }
79:     }
80:
81:     public boolean isAlive() {
82:         return internalThread.isAlive();
83:     }
84:
85:     public static void main(String[] args) {
86:         int port = 2001;
87:
88:         try {
89:             CalcServer server = new CalcServer(port);
90:             Thread.sleep(15000);
91:             server.stopRequest();
92:         } catch ( IOException x ) {
93:             x.printStackTrace();
94:         } catch ( InterruptedException x ) {
95:             // ignore
96:         }
97:     }
98: }
CalcServer is a self-running object, and 15 seconds after it is created, its stopRequest() method is called (lines 89–91). CalcServer creates a ServerSocket to listen to port 2001 for client connections (line 13). In its runWork() method, the internal thread blocks on the accept() method of ServerSocket waiting for new socket connections (line 41). When a connection is received, the socket is passed to a new instance of CalcWorker, and a reference to this CalcWorker is added to workerList (line 44).
When stopRequest() is invoked, noStopRequested is set to false (line 67) and the internal thread is interrupted (line 68). Much like a blocked read(), the accept() method ignores interrupts. To get the accept() method to throw an exception, the ServerSocket is closed (lines 70–78). Back in runWork(), the internal thread jumps down to the catch block (lines 45–49) because of this IOException. A stack trace is printed only if a stop has not been requested (lines 46–48) and in this case, no trace is printed because the closing of the ServerSocket caused the exception. The internal thread continues on and invokes stopRequest() on each of the CalcWorker objects (lines 55–59).
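The accept() half of this shutdown sequence can also be exercised on its own. The sketch below is an illustration only: AcceptUnblockDemo is a hypothetical class, it binds an ephemeral port instead of 2001, and it relies on the behavior described above, namely that closing the ServerSocket makes a blocked accept() throw an IOException.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical demo class; not part of the chapter's listings.
class AcceptUnblockDemo {

    // Returns true if closing the ServerSocket from the calling thread
    // caused the blocked accept() to throw an IOException.
    static boolean closeUnblocksAccept() {
        final boolean[] gotIOException = { false };
        try {
            final ServerSocket ss = new ServerSocket(0); // any free port

            Thread acceptor = new Thread(() -> {
                try {
                    ss.accept(); // blocks: nobody ever connects
                } catch (IOException iox) {
                    gotIOException[0] = true;
                }
            });
            acceptor.start();

            Thread.sleep(500);   // give the acceptor time to block
            ss.close();          // same idea as lines 70-78 of Listing 15.3
            acceptor.join(5000);
        } catch (IOException | InterruptedException x) {
            throw new IllegalStateException(x);
        }
        return gotIOException[0];
    }

    public static void main(String[] args) {
        System.out.println("accept unblocked=" + closeUnblocksAccept());
    }
}
```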
The class CalcWorker (see Listing 15.4) handles the server-side portion of a client session. It creates streams off the socket and communicates with CalcClient over them. When it is shut down, it uses a closing technique to break the internal thread out of its blocked state on the read() method of InputStream.
Listing 15.4. CalcWorker.java: The Server-Side Portion of a Client Session
1: import java.io.*;
2: import java.net.*;
3:
4: public class CalcWorker extends Object {
5:     private InputStream sockIn;
6:     private OutputStream sockOut;
7:     private DataInputStream dataIn;
8:     private DataOutputStream dataOut;
9:
10:     private Thread internalThread;
11:     private volatile boolean noStopRequested;
12:
13:     public CalcWorker(Socket sock) throws IOException {
14:         sockIn = sock.getInputStream();
15:         sockOut = sock.getOutputStream();
16:
17:         dataIn = new DataInputStream(
18:                 new BufferedInputStream(sockIn));
19:         dataOut = new DataOutputStream(
20:                 new BufferedOutputStream(sockOut));
21:
22:         noStopRequested = true;
23:         Runnable r = new Runnable() {
24:                 public void run() {
25:                     try {
26:                         runWork();
27:                     } catch ( Exception x ) {
28:                         // in case ANY exception slips through
29:                         x.printStackTrace();
30:                     }
31:                 }
32:             };
33:
34:         internalThread = new Thread(r);
35:         internalThread.start();
36:     }
37:
38:     private void runWork() {
39:         while ( noStopRequested ) {
40:             try {
41:                 System.out.println("in CalcWorker - about to " +
42:                         "block waiting to read a double");
43:                 double val = dataIn.readDouble();
44:                 System.out.println(
45:                         "in CalcWorker - read a double!");
46:                 dataOut.writeDouble(Math.sqrt(val));
47:                 dataOut.flush();
48:             } catch ( IOException x ) {
49:                 if ( noStopRequested ) {
50:                     x.printStackTrace();
51:                     stopRequest();
52:                 }
53:             }
54:         }
55:
56:         // In real-world code, be sure to close other streams and
57:         // the socket as part of the clean-up. Omitted here for
58:         // brevity.
59:
60:         System.out.println("in CalcWorker - leaving runWork()");
61:     }
62:
63:     public void stopRequest() {
64:         System.out.println(
65:                 "in CalcWorker - entering stopRequest()");
66:         noStopRequested = false;
67:         internalThread.interrupt();
68:
69:         if ( sockIn != null ) {
70:             try {
71:                 sockIn.close();
72:             } catch ( IOException iox ) {
73:                 // ignore
74:             } finally {
75:                 sockIn = null;
76:             }
77:         }
78:
79:         System.out.println(
80:                 "in CalcWorker - leaving stopRequest()");
81:     }
82:
83:     public boolean isAlive() {
84:         return internalThread.isAlive();
85:     }
86: }
CalcWorker is a self-running object that spends most of its time with its internal thread blocked waiting to read a new request from the client. It keeps a reference to the raw InputStream retrieved from the socket for use in closing (line 14). It then wraps the raw streams with DataInputStream and DataOutputStream to get the desired functionality (lines 17–20).
When using the closing technique to try to get a blocked read() to unblock, always attempt to close the rawest stream possible. In this example, the rawest InputStream is the one returned by the getInputStream() method on Socket. This maximizes the chances that the stream will throw an exception.
Inside runWork(), the internal thread blocks waiting to read a double from the client (line 43). When the thread reads a double, it calculates the mathematical square root of the double and sends the resulting double back down to the client (lines 46–47). The thread then goes back to its blocked state waiting to read more data from the client.
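The request/response step itself needs no socket to try out. As a rough illustration, the sketch below runs one square-root exchange over in-memory byte arrays, wrapping the streams the same way CalcWorker does; SqrtProtocolDemo, serveOne(), and roundTrip() are hypothetical names introduced only for this example.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical demo class; not part of the chapter's listings.
class SqrtProtocolDemo {

    // Server-side step: read one double, write back its square root.
    static void serveOne(InputStream rawIn, OutputStream rawOut)
            throws IOException {
        DataInputStream dataIn = new DataInputStream(
                new BufferedInputStream(rawIn));
        DataOutputStream dataOut = new DataOutputStream(
                new BufferedOutputStream(rawOut));

        double val = dataIn.readDouble();
        dataOut.writeDouble(Math.sqrt(val));
        dataOut.flush(); // push the reply out of the buffer
    }

    // Client-side view: encode a request, serve it, decode the reply.
    static double roundTrip(double val) {
        try {
            ByteArrayOutputStream request = new ByteArrayOutputStream();
            new DataOutputStream(request).writeDouble(val);

            ByteArrayOutputStream response = new ByteArrayOutputStream();
            serveOne(new ByteArrayInputStream(request.toByteArray()),
                    response);

            return new DataInputStream(new ByteArrayInputStream(
                    response.toByteArray())).readDouble();
        } catch (IOException x) {
            throw new UncheckedIOException(x);
        }
    }

    public static void main(String[] args) {
        double val = 4.0;
        System.out.println(
                "sent up " + val + ", got back " + roundTrip(val));
    }
}
```

Because the in-memory streams never block, this shows only the data format of the exchange, not the blocking behavior that the rest of the chapter is concerned with.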
When stopRequest() is invoked, noStopRequested is set to false (line 66) and the internal thread is interrupted (line 67). A blocked read() does not respond to an interrupt, so the socket's input stream is closed (lines 69–77), causing the blocked read() to throw an IOException. Back up in runWork(), the internal thread jumps down to the catch block (lines 48–53). A stack trace is printed only if a stop has not been requested (lines 49–52), and in this case, no trace is printed because the closing of the stream caused the exception.
Invoking close() on an InputStream can sometimes block. This can occur if both read() and close() are synchronized. The thread blocked on the read() has the instance lock. Meanwhile, another thread is trying to invoke close() and blocks trying to get exclusive access to the lock. As of JDK 1.2, BufferedInputStream has added the synchronized modifier to its close() method (it wasn't present in 1.0 and 1.1). This is not indicated in the Javadoc, but is shown in the source code and through reflection. If this is the case, the closing technique will not work unless done on the underlying stream.
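The lock contention described here can be reproduced with a toy stream. The sketch below is an assumption-laden illustration, not BufferedInputStream itself: LockedStream is a hypothetical class whose read() and close() are both synchronized, and whose read() blocks on a latch while holding the instance lock.

```java
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; not part of the chapter's listings.
class SynchronizedCloseDemo {

    // Toy stream: read() and close() compete for the instance lock.
    static class LockedStream extends InputStream {
        private final CountDownLatch data = new CountDownLatch(1);
        private final CountDownLatch inRead = new CountDownLatch(1);

        @Override
        public synchronized int read() {
            inRead.countDown(); // signal: the lock is now held
            try {
                data.await();   // block WITHOUT releasing the lock
            } catch (InterruptedException x) {
                Thread.currentThread().interrupt();
            }
            return 42;
        }

        @Override
        public synchronized void close() {
            // nothing to release in this toy stream
        }

        void supplyData() { data.countDown(); }

        void awaitReader() throws InterruptedException { inRead.await(); }
    }

    // Returns the state observed for the thread calling close() while
    // another thread is blocked inside read().
    static Thread.State closerStateWhileReaderBlocked() {
        LockedStream in = new LockedStream();
        Thread reader = new Thread(() -> in.read());
        Thread closer = new Thread(() -> in.close());
        try {
            reader.start();
            in.awaitReader();   // reader now owns the instance lock
            closer.start();

            // Poll until the closer is seen waiting for the lock.
            Thread.State state = closer.getState();
            long deadline = System.currentTimeMillis() + 5000;
            while (state != Thread.State.BLOCKED
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
                state = closer.getState();
            }

            in.supplyData();    // let read() return so both threads end
            reader.join();
            closer.join();
            return state;
        } catch (InterruptedException x) {
            throw new IllegalStateException(x);
        }
    }

    public static void main(String[] args) {
        System.out.println(
                "closer state=" + closerStateWhileReaderBlocked());
    }
}
```

The closing thread can make progress only after read() returns, which is exactly the situation in which the closing technique fails unless it is applied to an underlying stream that does not share the lock.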
The class CalcClient (see Listing 15.5) is run in a different VM than CalcServer, but on the same machine. It creates a connection to the server and makes only one request. After that, it keeps the connection open, but does no further communication.
Listing 15.5. CalcClient.java: Code to Test CalcWorker
1: import java.io.*;
2: import java.net.*;
3:
4: public class CalcClient extends Object {
5:     public static void main(String[] args) {
6:         String hostname = "localhost";
7:         int port = 2001;
8:
9:         try {
10:             Socket sock = new Socket(hostname, port);
11:
12:             DataInputStream in = new DataInputStream(
13:                 new BufferedInputStream(sock.getInputStream()));
14:             DataOutputStream out = new DataOutputStream(
15:                 new BufferedOutputStream(sock.getOutputStream()));
16:
17:             double val = 4.0;
18:             out.writeDouble(val);
19:             out.flush();
20:
21:             double sqrt = in.readDouble();
22:             System.out.println("sent up " + val + ", got back " + sqrt);
23:
24:             // Don't ever send another request, but stay alive in
25:             // this eternally blocked state.
26:             Object lock = new Object();
27:             while ( true ) {
28:                 synchronized ( lock ) {
29:                     lock.wait();
30:                 }
31:             }
32:         } catch ( Exception x ) {
33:             x.printStackTrace();
34:         }
35:     }
36: }
CalcClient is crudely written to simply show the minimal communication necessary. It creates a socket connection to the server, extracts the data streams, and wraps them to get a DataInputStream and a DataOutputStream (lines 10–15). Then it writes a double over to the server and waits for a different double to be returned (lines 17–21). After that, CalcClient keeps the socket connection up, but does not communicate with the server any more. Instead, it goes into an infinite wait state (lines 26–31).
When CalcClient is run, it produces one line of output and waits to be killed:
sent up 4.0, got back 2.0
On the server side, CalcServer and CalcWorker produce the output shown in Listing 15.6. Notice that when the stop request comes in, no exceptions are printed and the worker and server shut down in an orderly manner.
Listing 15.6. Output from CalcServer and CalcWorker
1: in CalcServer - ready to accept connections
2: in CalcServer - about to block waiting for a new connection
3: in CalcServer - received new connection
4: in CalcServer - about to block waiting for a new connection
5: in CalcWorker - about to block waiting to read a double
6: in CalcWorker - read a double!
7: in CalcWorker - about to block waiting to read a double
8: in CalcServer - entering stopRequest()
9: in CalcServer - putting in a stop request to all the workers
10: in CalcWorker - entering stopRequest()
11: in CalcWorker - leaving stopRequest()
12: in CalcServer - leaving runWork()
13: in CalcWorker - leaving runWork()
The read() method on PipedInputStream will throw a subclass of IOException called InterruptedIOException if the blocked thread is interrupted while waiting for bytes to arrive. This is very useful functionality that I would like to see implemented across the whole java.io package. Until that happens, other techniques have to be used to get out of the blocked state. The next example illustrates such a technique.
The class ThreadedInputStream (see Listing 15.7) is a subclass of FilterInputStream and responds to interrupts by throwing an InterruptedIOException. It uses an internal thread to read from the underlying stream and loads the bytes into a ByteFIFO (discussed in Chapter 18, "First-In-First-Out (FIFO) Queue"). This read-ahead mechanism can improve performance, but it carries the cost of an extra thread running in the VM. Although very useful, instances of ThreadedInputStream are not particularly lightweight and should be used sparingly in an application.
Listing 15.7. ThreadedInputStream.java: Interruptible read() Capability
1: import java.io.*;
2:
3: // uses SureStop from chapter 16
4: // uses ByteFIFO from chapter 18
5:
6: public class ThreadedInputStream extends FilterInputStream {
7:     private ByteFIFO buffer;
8:
9:     private volatile boolean closeRequested;
10:     private volatile boolean eofDetected;
11:     private volatile boolean ioxDetected;
12:     private volatile String ioxMessage;
13:
14:     private Thread internalThread;
15:     private volatile boolean noStopRequested;
16:
17:     public ThreadedInputStream(InputStream in, int bufferSize) {
18:         super(in);
19:
20:         buffer = new ByteFIFO(bufferSize);
21:
22:         closeRequested = false;
23:         eofDetected = false;
24:         ioxDetected = false;
25:         ioxMessage = null;
26:
27:         noStopRequested = true;
28:         Runnable r = new Runnable() {
29:                 public void run() {
30:                     try {
31:                         runWork();
32:                     } catch ( Exception x ) {
33:                         // in case ANY exception slips through
34:                         x.printStackTrace();
35:                     }
36:                 }
37:             };
38:
39:         internalThread = new Thread(r);
40:         internalThread.setDaemon(true);
41:         internalThread.start();
42:     }
43:
44:     public ThreadedInputStream(InputStream in) {
45:         this(in, 2048);
46:     }
47:
48:     private void runWork() {
49:         byte[] workBuf = new byte[buffer.getCapacity()];
50:
51:         try {
52:             while ( noStopRequested ) {
53:                 int readCount = in.read(workBuf);
54:
55:                 if ( readCount == -1 ) {
56:                     signalEOF();
57:                     stopRequest();
58:                 } else if ( readCount > 0 ) {
59:                     addToBuffer(workBuf, readCount);
60:                 }
61:             }
62:         } catch ( IOException iox ) {
63:             if ( !closeRequested ) {
64:                 ioxMessage = iox.getMessage();
65:                 signalIOX();
66:             }
67:         } catch ( InterruptedException x ) {
68:             // ignore
69:         } finally {
70:             // no matter what, make sure that eofDetected is set
71:             signalEOF();
72:         }
73:     }
74:
75:     private void signalEOF() {
76:         synchronized ( buffer ) {
77:             eofDetected = true;
78:             buffer.notifyAll();
79:         }
80:     }
81:
82:     private void signalIOX() {
83:         synchronized ( buffer ) {
84:             ioxDetected = true;
85:             buffer.notifyAll();
86:         }
87:     }
88:
89:     private void signalClose() {
90:         synchronized ( buffer ) {
91:             closeRequested = true;
92:             buffer.notifyAll();
93:         }
94:     }
95:
96:     private void addToBuffer(byte[] workBuf, int readCount)
97:             throws InterruptedException {
98:
99:         // Create an array exactly as large as the number of
100:         // bytes read and copy the data into it.
101:         byte[] addBuf = new byte[readCount];
102:         System.arraycopy(workBuf, 0, addBuf, 0, addBuf.length);
103:
104:         buffer.add(addBuf);
105:     }
106:
107:     private void stopRequest() {
108:         if ( noStopRequested ) {
109:             noStopRequested = false;
110:             internalThread.interrupt();
111:         }
112:     }
113:
114:     public void close() throws IOException {
115:         if ( closeRequested ) {
116:             // already closeRequested, just return
117:             return;
118:         }
119:         signalClose();
120:
121:         SureStop.ensureStop(internalThread, 10000);
122:         stopRequest();
123:
124:         // Use a new thread to close "in" in case it blocks
125:         final InputStream localIn = in;
126:         Runnable r = new Runnable() {
127:                 public void run() {
128:                     try {
129:                         localIn.close();
130:                     } catch ( IOException iox ) {
131:                         // ignore
132:                     }
133:                 }
134:             };
135:
136:         Thread t = new Thread(r, "in-close");
137:         // give up when all other non-daemon threads die
138:         t.setDaemon(true);
139:         t.start();
140:     }
141:
142:     private void throwExceptionIfClosed() throws IOException {
143:         if ( closeRequested ) {
144:             throw new IOException("stream is closed");
145:         }
146:     }
147:
148:     // Throws InterruptedIOException if the thread blocked on
149:     // read() is interrupted while waiting for data to arrive.
150:     public int read()
151:             throws InterruptedIOException, IOException {
152:
153:         // Using read(byte[]) to keep code in one place. This makes
154:         // single-byte read less efficient, but simplifies
155:         // the coding.
156:         byte[] data = new byte[1];
157:         int ret = read(data, 0, 1);
158:
159:         if ( ret != 1 ) {
160:             return -1;
161:         }
162:
163:         return data[0] & 0x000000FF;
164:     }
165:
166:     // Throws InterruptedIOException if the thread blocked on
167:     // read() is interrupted while waiting for data to arrive.
168:     public int read(byte[] dest)
169:             throws InterruptedIOException, IOException {
170:
171:         return read(dest, 0, dest.length);
172:     }
173:
174:     // Throws InterruptedIOException if the thread blocked on
175:     // read() is interrupted while waiting for data to arrive.
176:     public int read(
177:                 byte[] dest,
178:                 int offset,
179:                 int length
180:             ) throws InterruptedIOException, IOException {
181:
182:         throwExceptionIfClosed();
183:
184:         if ( length < 1 ) {
185:             return 0;
186:         }
187:
188:         if ( ( offset < 0 ) ||
189:              ( ( offset + length ) > dest.length )
190:            ) {
191:
192:             throw new IllegalArgumentException(
193:                 "offset must be at least 0, and " +
194:                 "(offset + length) must be less than or " +
195:                 "equal to dest.length. " +
196:                 "offset=" + offset +
197:                 ", (offset + length )=" + ( offset + length ) +
198:                 ", dest.length=" + dest.length);
199:         }
200:
201:         byte[] data = removeUpTo(length);
202:
203:         if ( data.length > 0 ) {
204:             System.arraycopy(data, 0, dest, offset, data.length);
205:             return data.length;
206:         }
207:
208:         // no data
209:         if ( eofDetected ) {
210:             return -1;
211:         }
212:
213:         // no data and not end of file, must be exception
214:         stopRequest();
215:
216:         if ( ioxMessage == null ) {
217:             ioxMessage = "stream cannot be read";
218:         }
219:
220:         throw new IOException(ioxMessage);
221:     }
222:
223:     private byte[] removeUpTo(int maxRead) throws IOException {
224:         // Convenience method to assist read(byte[], int, int).
225:         // Waits until at least one byte is ready, EOF is
226:         // detected, an IOException is thrown, or the
227:         // stream is closed.
228:         try {
229:             synchronized ( buffer ) {
230:                 while ( buffer.isEmpty() &&
231:                         !eofDetected &&
232:                         !ioxDetected &&
233:                         !closeRequested
234:                       ) {
235:
236:                     buffer.wait();
237:                 }
238:
239:                 // If stream was closed while waiting,
240:                 // get out right away.
241:                 throwExceptionIfClosed();
242:
243:                 // Ignore eof and exception flags for now, see
244:                 // if any data remains.
245:                 byte[] data = buffer.removeAll();
246:
247:                 if ( data.length > maxRead ) {
248:                     // Pulled out too many bytes,
249:                     // put excess back.
250:                     byte[] putBackData =
251:                             new byte[data.length - maxRead];
252:                     System.arraycopy(data, maxRead,
253:                             putBackData, 0, putBackData.length);
254:                     buffer.add(putBackData);
255:
256:                     byte[] keepData = new byte[maxRead];
257:                     System.arraycopy(data, 0,
258:                             keepData, 0, keepData.length);
259:                     data = keepData;
260:                 }
261:
262:                 return data;
263:             }
264:         } catch ( InterruptedException ix ) {
265:             // convert to an IOException
266:             throw new InterruptedIOException("interrupted " +
267:                 "while waiting for data to arrive for reading");
268:         }
269:     }
270:
271:     public long skip(long n) throws IOException {
272:         throwExceptionIfClosed();
273:
274:         if ( n <= 0 ) {
275:             return 0;
276:         }
277:
278:         int skipLen = (int) Math.min(n, Integer.MAX_VALUE);
279:         int readCount = read(new byte[skipLen]);
280:
281:         if ( readCount < 0 ) {
282:             return 0;
283:         }
284:
285:         return readCount;
286:     }
287:
288:     public int available() throws IOException {
289:         throwExceptionIfClosed();
290:         return buffer.getSize();
291:     }
292:
293:     public boolean markSupported() {
294:         return false;
295:     }
296:
297:     public synchronized void mark(int readLimit) {
298:         // ignore method calls, mark not supported
299:     }
300:
301:     public synchronized void reset() throws IOException {
302:         throw new IOException(
303:                 "mark-reset not supported on this stream");
304:     }
305: }
ThreadedInputStream extends FilterInputStream (line 6) and passes the InputStream handed to its constructor up to the superclass constructor (line 18). The superclass has a protected member variable called in that holds a reference to the underlying InputStream; this reference is used throughout ThreadedInputStream.
The internal thread simply reads as much data as it can and loads it into the ByteFIFO buffer (lines 52–61). If the buffer is full, the internal thread blocks waiting for some data to be read out of the buffer (line 104). When the internal thread gets to the end of the file (EOF), it sets a flag (lines 75–80). If the internal thread encounters an IOException while reading into the buffer, it sets a flag (lines 82–87). The ByteFIFO is used for all of the wait-notify signaling.
When an external thread comes in to read some data, it gets the data from the ByteFIFO buffer (lines 201–220, 223–269). The external thread pays attention to the EOF and exception flags only if there is no more data in the buffer. This delays the reporting until the external thread catches up to the internal thread. If the external thread is blocked waiting for some data to arrive (line 236) and is then interrupted, it will jump to the catch block (lines 264–268). There, the InterruptedException is caught, and a new InterruptedIOException is thrown in its place (lines 266–267). This means that a thread blocked on a read() will now respond to interrupts!
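The core idea, a pump thread feeding a queue and a reader that converts InterruptedException into InterruptedIOException, can be shown in much less code if a standard queue is swapped in for ByteFIFO. The sketch below is a deliberately simplified stand-in, not Listing 15.7: InterruptibleStream is a hypothetical class, it uses java.util.concurrent.LinkedBlockingQueue (an assumption, since the chapter's code predates that package), it moves one byte at a time, it supports a single reading thread, and it reports a read failure as end of stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, much-simplified stand-in for ThreadedInputStream.
class InterruptibleStream extends InputStream {
    private static final int EOF = -1;

    // Unbounded for brevity; Listing 15.7 uses a fixed-size ByteFIFO.
    private final BlockingQueue<Integer> queue =
            new LinkedBlockingQueue<>();
    private boolean eofSeen; // touched only by the reading thread

    InterruptibleStream(final InputStream source) {
        Thread pump = new Thread(() -> {
            try {
                int b;
                while ((b = source.read()) != EOF) {
                    queue.add(b);
                }
            } catch (IOException iox) {
                // This sketch reports a read failure as end of stream.
            } finally {
                queue.add(EOF);
            }
        });
        pump.setDaemon(true); // a stuck pump never keeps the VM alive
        pump.start();
    }

    @Override
    public int read() throws IOException {
        if (eofSeen) {
            return EOF;
        }
        try {
            int b = queue.take(); // take() responds to interrupt()
            eofSeen = (b == EOF);
            return b;
        } catch (InterruptedException ix) {
            throw new InterruptedIOException(
                    "interrupted while waiting for data to arrive");
        }
    }

    // Reads an in-memory source to EOF and returns the byte count.
    static int drain(byte[] data) {
        try {
            InputStream in = new InterruptibleStream(
                    new ByteArrayInputStream(data));
            int count = 0;
            while (in.read() != EOF) {
                count++;
            }
            return count;
        } catch (IOException x) {
            throw new UncheckedIOException(x);
        }
    }

    // Returns true if interrupt() broke a read() on a source that
    // itself blocks forever and ignores interrupts.
    static boolean interruptBreaksRead() {
        InputStream never = new InputStream() {
            @Override
            public int read() {
                while (true) {
                    try {
                        Thread.sleep(60_000);
                    } catch (InterruptedException ignored) {
                        // keep blocking, like a defiant stream
                    }
                }
            }
        };
        final InputStream in = new InterruptibleStream(never);
        final boolean[] saw = { false };
        Thread reader = new Thread(() -> {
            try {
                in.read();
            } catch (InterruptedIOException x) {
                saw[0] = true;
            } catch (IOException x) {
                x.printStackTrace();
            }
        });
        try {
            reader.start();
            Thread.sleep(300);
            reader.interrupt();
            reader.join(5000);
        } catch (InterruptedException x) {
            throw new IllegalStateException(x);
        }
        return saw[0];
    }
}
```

Calling InterruptibleStream.interruptBreaksRead() exercises the interrupt path, and InterruptibleStream.drain(...) exercises the ordinary data and EOF path. Unlike the listing, this sketch has no close() handling and no bound on buffered data, so it is suitable only for illustrating the hand-off between the two threads.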
The close() method (lines 114–140) is used to shut down the internal thread (notice that in this class, stopRequest() is private). The close() method can be safely invoked more than once because it simply ignores subsequent requests. It starts by signaling that a close has been requested (line 119), which will cause any blocked read() calls on ThreadedInputStream to throw an IOException (line 241). It then uses SureStop to make sure that even if all else fails, the internal thread will be stopped in 10 seconds (line 121). Inside close(), a new thread is created to invoke close() on the underlying stream (lines 125–139). This step is necessary in case the call blocks for quite a while, or even forever if a deadlock scenario occurs. ThreadedInputStream can't control what kind of InputStream it is passed in its constructor, so this extra thread is just added insurance.
The class ThreadedInputStream just focuses on the task of splitting up the transfer of data between two threads. It performs poorly on single-byte reads and does not have any support for the mark-reset mechanism. BufferedThreadedInputStream (see Listing 15.8) makes up for these shortcomings by using a ThreadedInputStream with a BufferedInputStream added on both ends to smooth data flow.
Listing 15.8. BufferedThreadedInputStream.java: ThreadedInputStream with Buffering
1: import java.io.*;
2:
3: // uses ThreadedInputStream
4:
5: public class BufferedThreadedInputStream
6:         extends FilterInputStream {
7:
8:     // fixed class that does *not* have a synchronized close()
9:     private static class BISFix extends BufferedInputStream {
10:         public BISFix(InputStream rawIn, int buffSize) {
11:             super(rawIn, buffSize);
12:         }
13:
14:         public void close() throws IOException {
15:             if ( in != null ) {
16:                 try {
17:                     in.close();
18:                 } finally {
19:                     in = null;
20:                 }
21:             }
22:         }
23:     }
24:
25:     public BufferedThreadedInputStream(
26:                 InputStream rawIn,
27:                 int bufferSize
28:             ) {
29:
30:         super(rawIn); // super-class' "in" is set below
31:
32:         // rawIn -> BufferedIS -> ThreadedIS ->
33:         //       BufferedIS -> read()
34:
35:         BISFix bis = new BISFix(rawIn, bufferSize);
36:         ThreadedInputStream tis =
37:                 new ThreadedInputStream(bis, bufferSize);
38:
39:         // Change the protected variable 'in' from the
40:         // superclass from rawIn to the correct stream.
41:         in = new BISFix(tis, bufferSize);
42:     }
43:
44:     public BufferedThreadedInputStream(InputStream rawIn) {
45:         this(rawIn, 2048);
46:     }
47:
48:     // Overridden to show that InterruptedIOException might
49:     // be thrown.
50:     public int read()
51:             throws InterruptedIOException, IOException {
52:
53:         return in.read();
54:     }
55:
56:     // Overridden to show that InterruptedIOException might
57:     // be thrown.
58:     public int read(byte[] b)
59:             throws InterruptedIOException, IOException {
60:
61:         return in.read(b);
62:     }
63:
64:     // Overridden to show that InterruptedIOException might
65:     // be thrown.
66:     public int read(byte[] b, int off, int len)
67:             throws InterruptedIOException, IOException {
68:
69:         return in.read(b, off, len);
70:     }
71:
72:     // Overridden to show that InterruptedIOException might
73:     // be thrown.
74:     public long skip(long n)
75:             throws InterruptedIOException, IOException {
76:
77:         return in.skip(n);
78:     }
79:
80:     // The remainder of the methods are directly inherited from
81:     // FilterInputStream and access "in" in much the same
82:     // way as the methods above do.
83: }
BufferedThreadedInputStream has a nested class (lines 8–23) called BISFix that simply extends BufferedInputStream and overrides close() so that it is not synchronized. This difference is critical: it allows close() to be executed while another thread is blocked inside read(). In the constructor (lines 25–42), the raw input stream is wrapped in a BISFix (a modified BufferedInputStream), which is wrapped in a ThreadedInputStream, which is wrapped in another BISFix. This provides buffering for both the internal thread and any external thread that does some reading.
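To see why an unsynchronized close() matters, consider the following minimal sketch. It does not use the real stream classes: FakeStream is a hypothetical stand-in whose read() blocks while holding the object's lock, which is the situation a synchronized read() creates. The class and method names here are assumptions made for illustration only.

```java
import java.util.concurrent.CountDownLatch;

class SyncCloseDemo {

    // Hypothetical stand-in for a buffered stream: read() blocks while
    // holding the object's monitor, as a synchronized read() would.
    static class FakeStream {
        final CountDownLatch insideRead = new CountDownLatch(1);
        final CountDownLatch unblock = new CountDownLatch(1);

        synchronized void read() {
            insideRead.countDown();
            try {
                unblock.await(); // simulated blocking I/O; monitor stays held
            } catch (InterruptedException x) {
                Thread.currentThread().interrupt();
            }
        }

        // Needs the monitor that the blocked reader is holding.
        synchronized void synchronizedClose() {
            unblock.countDown();
        }

        // Does not need the monitor, so it can run during a blocked read().
        void unsynchronizedClose() {
            unblock.countDown();
        }
    }

    // Returns true if close() finished while a reader was still blocked.
    static boolean closeCompletes(boolean useSynchronizedClose) {
        FakeStream stream = new FakeStream();

        Thread reader = new Thread(() -> stream.read());
        Thread closer = new Thread(() -> {
            if (useSynchronizedClose) {
                stream.synchronizedClose();
            } else {
                stream.unsynchronizedClose();
            }
        });

        try {
            reader.start();
            stream.insideRead.await();   // reader now holds the monitor
            closer.start();
            closer.join(500);            // give close() time to finish
            boolean completed = !closer.isAlive();

            stream.unblock.countDown();  // clean up: let both threads end
            reader.join();
            closer.join();
            return completed;
        } catch (InterruptedException x) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", x);
        }
    }

    public static void main(String[] args) {
        System.out.println("synchronized close completed:   "
            + closeCompletes(true));
        System.out.println("unsynchronized close completed: "
            + closeCompletes(false));
    }
}
```

In this sketch the synchronized version of close() cannot finish until the reader gives up the lock, whereas the unsynchronized version runs right away and releases the reader. BISFix relies on the same idea.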
Now it's time to combine the techniques presented so far. CalcServerTwo (see Listing 15.9) has been slightly modified to work with CalcWorkerTwo. CalcWorkerTwo (see Listing 15.10) now uses a BufferedThreadedInputStream as an inline filter so that when it is blocked trying to read(), it will respond to an interrupt.
Example 15.9. CalcServerTwo.java—Modified to Work with CalcWorkerTwo
 1: import java.io.*;
 2: import java.net.*;
 3: import java.util.*;
 4:
 5: public class CalcServerTwo extends Object {
 6:     private ServerSocket ss;
 7:     private List workerList;
 8:
 9:     private Thread internalThread;
10:     private volatile boolean noStopRequested;
11:
12:     public CalcServerTwo(int port) throws IOException {
13:         ss = new ServerSocket(port);
14:         workerList = new LinkedList();
15:
16:         noStopRequested = true;
17:         Runnable r = new Runnable() {
18:             public void run() {
19:                 try {
20:                     runWork();
21:                 } catch ( Exception x ) {
22:                     // in case ANY exception slips through
23:                     x.printStackTrace();
24:                 }
25:             }
26:         };
27:
28:         internalThread = new Thread(r);
29:         internalThread.start();
30:     }
31:
32:     private void runWork() {
33:         System.out.println(
34:             "in CalcServer - ready to accept connections");
35:
36:         while ( noStopRequested ) {
37:             try {
38:                 System.out.println(
39:                     "in CalcServer - about to block " +
40:                     "waiting for a new connection");
41:                 Socket sock = ss.accept();
42:                 System.out.println(
43:                     "in CalcServer - received new connection");
44:                 workerList.add(new CalcWorkerTwo(sock));
45:             } catch ( IOException iox ) {
46:                 if ( noStopRequested ) {
47:                     iox.printStackTrace();
48:                 }
49:             }
50:         }
51:
52:         // stop all the workers that were created
53:         System.out.println("in CalcServer - putting in a " +
54:             "stop request to all the workers");
55:         Iterator iter = workerList.iterator();
56:         while ( iter.hasNext() ) {
57:             CalcWorkerTwo worker = (CalcWorkerTwo) iter.next();
58:             worker.stopRequest();
59:         }
60:
61:         System.out.println("in CalcServer - leaving runWork()");
62:     }
63:
64:     public void stopRequest() {
65:         System.out.println(
66:             "in CalcServer - entering stopRequest()");
67:         noStopRequested = false;
68:         internalThread.interrupt();
69:
70:         if ( ss != null ) {
71:             try {
72:                 ss.close();
73:             } catch ( IOException x ) {
74:                 // ignore
75:             } finally {
76:                 ss = null;
77:             }
78:         }
79:     }
80:
81:     public boolean isAlive() {
82:         return internalThread.isAlive();
83:     }
84:
85:     public static void main(String[] args) {
86:         int port = 2001;
87:
88:         try {
89:             CalcServerTwo server = new CalcServerTwo(port);
90:             Thread.sleep(15000);
91:             server.stopRequest();
92:         } catch ( IOException x ) {
93:             x.printStackTrace();
94:         } catch ( InterruptedException x ) {
95:             // ignore
96:         }
97:     }
98: }
Example 15.10. CalcWorkerTwo.java—Using BufferedThreadedInputStream
 1: import java.io.*;
 2: import java.net.*;
 3:
 4: public class CalcWorkerTwo extends Object {
 5:     private DataInputStream dataIn;
 6:     private DataOutputStream dataOut;
 7:
 8:     private Thread internalThread;
 9:     private volatile boolean noStopRequested;
10:
11:     public CalcWorkerTwo(Socket sock) throws IOException {
12:         dataIn = new DataInputStream(
13:             new BufferedThreadedInputStream(
14:                 sock.getInputStream()));
15:         dataOut = new DataOutputStream(
16:             new BufferedOutputStream(
17:                 sock.getOutputStream()));
18:
19:         noStopRequested = true;
20:         Runnable r = new Runnable() {
21:             public void run() {
22:                 try {
23:                     runWork();
24:                 } catch ( Exception x ) {
25:                     // in case ANY exception slips through
26:                     x.printStackTrace();
27:                 }
28:             }
29:         };
30:
31:         internalThread = new Thread(r);
32:         internalThread.start();
33:     }
34:
35:     private void runWork() {
36:         while ( noStopRequested ) {
37:             try {
38:                 System.out.println("in CalcWorker - about to " +
39:                     "block waiting to read a double");
40:                 double val = dataIn.readDouble();
41:                 System.out.println(
42:                     "in CalcWorker - read a double!");
43:                 dataOut.writeDouble(Math.sqrt(val));
44:                 dataOut.flush();
45:             } catch ( InterruptedIOException iiox ) {
46:                 System.out.println("in CalcWorker - blocked " +
47:                     "read was interrupted!!!");
48:             } catch ( IOException x ) {
49:                 if ( noStopRequested ) {
50:                     x.printStackTrace();
51:                     stopRequest();
52:                 }
53:             }
54:         }
55:
56:         // In real-world code, be sure to close other streams
57:         // and the socket as part of the clean-up. Omitted here
58:         // for brevity.
59:
60:         System.out.println("in CalcWorker - leaving runWork()");
61:     }
62:
63:     public void stopRequest() {
64:         System.out.println(
65:             "in CalcWorker - entering stopRequest()");
66:         noStopRequested = false;
67:         internalThread.interrupt();
68:         System.out.println(
69:             "in CalcWorker - leaving stopRequest()");
70:     }
71:
72:     public boolean isAlive() {
73:         return internalThread.isAlive();
74:     }
75: }
CalcWorkerTwo has been modified to take advantage of the interruptible I/O of BufferedThreadedInputStream (line 13). The InterruptedIOException is caught in this code simply to print a message (lines 45–47); it could otherwise be ignored. The stopRequest() method (lines 63–70) has been simplified back down to the self-running object template: only an interrupt() is necessary to unblock a read().
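The interrupt-only shutdown can be tried in isolation with a stream from the JDK. The sketch below is an illustration rather than part of the calculator example: it uses PipedInputStream, noted at the start of the chapter as a stream whose read() does respond to interrupts, in place of BufferedThreadedInputStream. The class name InterruptibleReadDemo and the 200-millisecond delay are arbitrary choices.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

class InterruptibleReadDemo {

    // Blocks a thread in read() on a pipe that never receives data,
    // interrupts it, and reports how the read ended.
    static String interruptBlockedRead() {
        final String[] outcome = { "read returned normally" };
        try {
            PipedInputStream in = new PipedInputStream();
            // Connect a writer so read() blocks instead of failing;
            // nothing is ever written to it.
            PipedOutputStream out = new PipedOutputStream(in);

            Thread reader = new Thread(() -> {
                try {
                    in.read();
                } catch (InterruptedIOException iiox) {
                    outcome[0] = "InterruptedIOException";
                } catch (IOException iox) {
                    outcome[0] = "IOException";
                }
            });

            reader.start();
            Thread.sleep(200);   // let the reader block
            reader.interrupt();  // the only step needed to unblock it
            reader.join();
            out.close();
            return outcome[0];
        } catch (IOException | InterruptedException x) {
            throw new IllegalStateException("demo failed", x);
        }
    }

    public static void main(String[] args) {
        System.out.println("blocked read ended with: "
            + interruptBlockedRead());
    }
}
```

As in CalcWorkerTwo, no stream has to be closed to get the thread's attention. The interrupt alone causes the blocked read() to throw.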
The same CalcClient code can be used with the new CalcServerTwo and CalcWorkerTwo. Listing 15.11 shows possible output from CalcServerTwo. Your output should match closely, with perhaps a message or two swapped.
Example 15.11. Possible Output from CalcServerTwo
 1: in CalcServer - ready to accept connections
 2: in CalcServer - about to block waiting for a new connection
 3: in CalcServer - received new connection
 4: in CalcServer - about to block waiting for a new connection
 5: in CalcWorker - about to block waiting to read a double
 6: in CalcWorker - read a double!
 7: in CalcWorker - about to block waiting to read a double
 8: in CalcServer - entering stopRequest()
 9: in CalcServer - putting in a stop request to all the workers
10: in CalcWorker - entering stopRequest()
11: in CalcWorker - leaving stopRequest()
12: in CalcWorker - blocked read was interrupted!!!
13: in CalcServer - leaving runWork()
14: in CalcWorker - leaving runWork()
The main difference from the output seen before is line 12. Here, a message is printed confirming that an InterruptedIOException was thrown when the blocked thread was interrupted.
Blocked I/O statements can be troublesome to deal with, but this chapter offered a few techniques for working around the difficulty. A blocked read() on an InputStream ignores interrupt() and stop(), but generally will throw an exception if its stream is closed by another thread. A blocked accept() on a ServerSocket also ignores interrupts, but will throw an exception if the ServerSocket is closed by another thread.
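The close-to-unblock behavior of accept() can be checked with a few lines of code. The following is a minimal sketch, not taken from the chapter's listings: the class name AcceptCloseDemo is hypothetical, and port 0 is used so that the operating system picks any free port.

```java
import java.io.IOException;
import java.net.ServerSocket;

class AcceptCloseDemo {

    // Returns true if closing the ServerSocket from another thread
    // makes a blocked accept() throw an exception.
    static boolean closeUnblocksAccept() {
        final boolean[] sawException = { false };
        try {
            ServerSocket ss = new ServerSocket(0); // any free port

            Thread acceptor = new Thread(() -> {
                try {
                    ss.accept(); // blocks: no client ever connects
                } catch (IOException iox) {
                    sawException[0] = true;
                }
            });

            acceptor.start();
            Thread.sleep(200);   // let the acceptor block
            ss.close();          // closed by another thread
            acceptor.join(5000);
            return sawException[0] && !acceptor.isAlive();
        } catch (IOException | InterruptedException x) {
            throw new IllegalStateException("demo failed", x);
        }
    }

    public static void main(String[] args) {
        System.out.println("close() unblocked accept(): "
            + closeUnblocksAccept());
    }
}
```

This mirrors what stopRequest() in CalcServerTwo does: the interrupt() call by itself would not free the thread, so the ServerSocket is closed as well.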
ThreadedInputStream and BufferedThreadedInputStream provide a mechanism that allows a blocked read() to be interrupted and throw an InterruptedIOException. Although this technique carries the overhead of another thread, the interruptible I/O it provides can be very useful.