Chapter 18. First-In-First-Out (FIFO) Queue

IN THIS CHAPTER

The First-In-First-Out (FIFO) queue is a data structure that has many applications in systems development. In this chapter, I'll show you a version of a FIFO queue that supports simultaneous access by multiple threads. In particular, I'll present two versions that are usable in the real world, as they are, and extendable:

  • ObjectFIFO, which holds references to objects

  • ByteFIFO, which holds byte values directly

How a FIFO Queue Works

As elements are added to a FIFO queue, they are stored in order. When elements are removed from a FIFO queue, they come out in exactly the same order as they were added. The first element put into the queue will be the first element taken out of the queue, the second element added to the queue will be the second element removed from the queue, and so on.

Figure 18.1 shows an example of how elements are added to a FIFO queue. In the first step, item A is added to an empty FIFO queue. In the second step, you can see that item A is at the bottom (where it will eventually be removed) and that item B is being added. In the third step, items A and B are in the queue, and item C is being added. The result is that items A, B, and C are inside the queue and are stacked in the order they were added.

Adding items to a FIFO queue.

Figure 18.1. Adding items to a FIFO queue.

Figure 18.2 continues this example. In the first step, you can see that items A, B, and C are in the queue and that item A is about to be removed. In the second step, item A is gone, and items B and C have shifted down. When item D is added, it ends up behind item C. In the third step, items B, C, and D are in the FIFO queue, and item B is being removed. The fourth step shows only items C and D remaining; item C is about to be removed. The result is that only item D remains inside the queue, and items A, B, and C were removed in the same order as they were added. If another element were to be removed, it would be item D, after which the queue would be empty again.

Removing items from and adding items to a FIFO queue.

Figure 18.2. Removing items from and adding items to a FIFO queue.

Implementing a FIFO with an Array

You can implement a FIFO queue in many ways, including using a Vector, a linked list, an array, or any number of other options. In this chapter, I use an array of a fixed size to hold the elements. Because the array is fixed in size, the capacity of the FIFO is the length of the array. The capacity is the maximum number of elements that can be inside the queue at any one time. When all the array positions are filled, the FIFO is considered to be full. In addition, when the FIFO is full, calls to the add() method should block until space is available. Space becomes available when another thread invokes the remove() method. The slots are reused when items have been removed so that the array can be thought of as circular—after getting to the end, it wraps around to the beginning.

Figure 18.3 shows a FIFO implemented with an array that has a length of 5. The cells are numbered from 0 to 4. The capacity is the maximum number of elements the FIFO can hold. In this case, capacity is 5. The size is the number of elements currently being held inside the FIFO queue. In all FIFO queues, size is initially 0 and can grow to a maximum value that equals capacity.

A FIFO implementation that uses an array to hold the items.

Figure 18.3. A FIFO implementation that uses an array to hold the items.

Two indexes into the array are held: the head and the tail. The head points to the cell where the next element added will be put. An element may be added to the cell pointed to by head only when the FIFO queue is not full. The tail points to the cell that holds the next element to be removed. An element may be removed from the cell pointed to by tail only when the FIFO is not empty. Initially, both head and tail point to cell 0.

The first snapshot of the FIFO in Figure 18.3 shows an empty queue. The next snapshot shows how it looks after item S01 is added. You can see that size has increased to 1, and that head now points to cell 1 (capacity and tail remain unchanged). The third snapshot shows how the FIFO looks after item S02 is added. Notice that size has increased to 2, and that head now points to cell 2. The last snapshot of this figure shows how the FIFO looks after item S03 has been added.

Figure 18.4 continues this example. The first snapshot shows how the FIFO queue looks after one item has been removed. The item removed used to be in cell 0 and was S01. As a result of this removal, size decreased to 2, and tail now points to cell 1. The second snapshot shows how the FIFO looks after item S04 has been added. The third snapshot shows the result of adding S05. The size has increased to 4, and the head pointer wrapped around and now points to cell 0. When S06 is added, it goes into cell 0, and head moves over to point to cell 1. At this time, both head and tail are pointing to the same cell. When the FIFO queue is empty or full, head and tail will point to the same cell.

Adding and removing items with an array-based FIFO Queue.

Figure 18.4. Adding and removing items with an array-based FIFO Queue.

Table 18.1 summarizes how a FIFO queue is implemented with a fixed-size array.

Table 18.1. An Array Implementation of a FIFO Queue: A Summary of Terms

Term Explanation
capacity The maximum number of items that can be held (the length of the array)
size The current number of items in the queue
head The index into the array where the next item will be added
tail The index into the array where the next item will be removed
empty When size is equal to 0
full When size is equal to capacity

Simple Implementation in Java: SimpleObjectFIFO

The class SimpleObjectFIFO shows how this model can be implemented in Java. This version holds references to objects as its item type. Synchronization is used to ensure that two or more threads can safely interact with the FIFO queue (see Chapter 7, "Concurrent Access to Objects and Variables"). The wait-notify mechanism is used to signal between threads when the state of the FIFO queue changes (see Chapter 8, "Inter-thread Communication"). Whenever an item is added or removed, notifyAll() is used to signal any and all waiting threads that the state of the FIFO queue has changed.

When one thread tries to add an item to a full queue, it blocks until the queue is no longer full. It does this by entering a wait state (which implicitly releases the lock) to allow another thread to come along and remove an item. When the other thread removes an item, it signals the waiting thread by invoking notifyAll(). The waiting thread then wakes up, reacquires the lock, returns from wait(), and completes the request to add the item.

Similarly, when one thread tries to remove an item from an empty queue, it blocks until the queue is no longer empty. It enters a wait state to allow another thread to come along and add an item. Listing 18.1 shows the code for SimpleObjectFIFO.

Example 18.1. SimpleObjectFIFO.java—A Simple FIFO Queue That Uses an Array Internally

 1: public class SimpleObjectFIFO extends Object {
 2:     private Object[] queue;
 3:     private int capacity;
 4:     private int size;
 5:     private int head;
 6:     private int tail;
 7:
 8:     public SimpleObjectFIFO(int cap) {
 9:         capacity = ( cap > 0 ) ? cap : 1; // at least 1
10:         queue = new Object[capacity];
11:         head = 0;
12:         tail = 0;
13:         size = 0;
14:     }
15:
16:     public synchronized int getSize() {
17:         return size;
18:     }
19:
20:     public synchronized boolean isFull() {
21:         return ( size == capacity );
22:     }
23:
24:     public synchronized void add(Object obj) throws InterruptedException {
25:         while ( isFull() ) {
26:             wait();
27:         }
28:
29:         queue[head] = obj;
30:         head = ( head + 1 ) % capacity;
31:         size++;
32: c
33:         notifyAll(); // let any waiting threads know about change
34:     }
35:
36:     public synchronized Object remove() throws InterruptedException {
37:         while ( size == 0 ) {
38:             wait();
39:         }
40:
41:         Object obj = queue[tail];
42:         queue[tail] = null; // don't block GC by keeping reference
43:         tail = ( tail + 1 ) % capacity;
44:         size—;
45:
46:         notifyAll(); // let any waiting threads know about change
47:
48:         return obj;
49:     }
50:
51:     public synchronized void printState() {
52:         StringBuffer sb = new StringBuffer();
53:
54:         sb.append("SimpleObjectFIFO:
");
55:         sb.append("       capacity=" + capacity + "
");
56:
57:         sb.append("           size=" + size);
58:         if ( isFull() ) {
59:             sb.append(" - FULL");
60:         } else if ( size == 0 ) {
61:             sb.append(" - EMPTY");
62:         }
63:         sb.append("
");
64:
65:         sb.append("           head=" + head + "
");
66:         sb.append("           tail=" + tail + "
");
67:
68:         for ( int i = 0; i < queue.length; i++ ) {
69:             sb.append("       queue[" + i + "]=" + queue[i] + "
");
70:         }
71: c
72:         System.out.print(sb);
73:     }
74: }

SimpleObjectFIFO has several member variables. None of them are marked as volatile because all accesses and modifications to them occur within synchronized methods. The private member variable queue is an array of Objects used to hold the references to objects added (line 2). The length of this array is held in capacity (line 3). The current number of items in the FIFO queue is held in size (line 4). The index into queue for the next addition is held in head (line 5). The index into queue for the next item to be removed is held in tail (line 6).

The constructor (lines 8–14) takes as its only argument the capacity of the new FIFO queue. If the indicated capacity is less than 1, a minimum of 1 is silently used (line 9). An array is allocated (line 10), and the head and tail pointers and size are all set to 0 (lines 10–13).

The getSize() method is synchronized to ensure that the most recent value for size is returned (lines 16–18). The isFull() method is synchronized to ensure that nothing changes while the full determination is being made (lines 20–22). The queue is considered to be full if the current size is equal to the capacity.

The add() method c (lines 24–34) also must be synchronized for multithread safety. Because add() could potentially have to wait if the FIFO queue is currently full, it declares that it might throw an InterruptedException (line 24). This exception is thrown if the thread is interrupted before it has a chance to add the item to the queue. The while loop (lines 25–27) is used to keep the thread from proceeding if the FIFO queue is currently full. It must be a while loop because if more than one thread is trying to simultaneously add items, the following statements would not always work:

if ( isFull() ) { // dangerous to use 'if'instead of 'while'
    wait();
}

If two threads are trying to add an item, and a third thread removes one item, only one of the two trying to add may proceed. The notifyAll() method is used instead of notify() so that all waiting threads return from wait() to check out the changed conditions (this is a deliberate choice to support the notification of threads that don't want to add or remove items, but simply want to know about the changes to the FIFO queue). If only one slot is available, only one of the threads waiting to add an item should proceed. This is why a while loop is necessary to ensure that—although the thread was notified and returned from wait()—it won't proceed unless the condition it was waiting on is still being met.

When the thread trying to add an item gets to line 29, at least one space is available in the FIFO queue for its item. The reference to the item is assigned to the slot where head is currently pointing (line 29). Next, head is incremented to point to the next cell. If head is already pointing to the last array position, it will be wrapped around to point to the first array position (line 30). Because an item has been added, size is incremented to reflect this change (line 31). Just before returning from add(), any and all waiting threads are notified of the change (line 33).

The remove() method (lines 36–49) also must be synchronized for multithread safety. Because remove() could potentially have to wait if the FIFO queue is currently empty, it declares that it might throw an InterruptedException (line 36). This exception is thrown if the thread is interrupted before it has a chance to remove an item from the queue. The while loop (lines 37–39) is used to keep the thread from proceeding if the FIFO queue is currently empty. The FIFO queue is considered empty as long as its size is 0. A while loop is also necessary here (for generally the same reasons that it was necessary inside add()), in case two threads are simultaneously blocked waiting to remove an item.

When the thread trying to remove an item gets to line 41, at least one item is available for removal in the FIFO queue. The reference to this item is stored in obj (line 41) so that it can be returned at the end of the method. Next, the slot in the FIFO queue has its value set to null (line 42) to ensure that the queue doesn't keep any unnecessary references to items that have been removed. If this were not done, it's possible that an item would not be available for garbage collection solely because of an outdated reference held in the FIFO queue. After this, tail is incremented to point to the next cell. If tail is already pointing to the last array position, it will be wrapped around to point to the first array position (line 43). Because an item has been removed, size is reduced to reflect this change (line 44). Just before returning from remove(), any and all waiting threads are notified of the change (line 46). Finally, the reference to the item just removed is returned to the caller (line 48).

The printState() method (lines 51–73) cis used to print out the values of all the member variables and the current contents of the queue. It is synchronized to ensure that nothing changes during printing. This method isn't really part of a FIFO queue implementation, but I include it here to show you the internal values of the FIFO queue at various points in time.

The class SimpleObjectFIFOTest, in Listing 18.2, isc used to show how SimpleObjectFIFO works.

Example 18.2. SimpleObjectFIFOTest.java—Code to Demonstrate SimpleObjectFIFO

 1: public class SimpleObjectFIFOTest extends Object {
 2:     public static void main(String[] args) {
 3:         try {
 4:             SimpleObjectFIFO fifo = new SimpleObjectFIFO(5);
 5:             fifo.printState();
 6:
 7:             fifo.add("S01");
 8:             fifo.printState();
 9:
10:             fifo.add("S02");
11:             fifo.printState();
12:
13:             fifo.add("S03");
14:             fifo.printState();
15:
16:             Object obj = fifo.remove();
17:             System.out.println("just removed obj=" + obj);
18:             fifo.printState();
19:
20:             fifo.add("S04");
21:             fifo.printState();
22:
23:             fifo.add("S05");
24:             fifo.printState();
25:
26:             fifo.add("S06");
27:             fifo.printState();
28:         } catch ( InterruptedException x ) {
29:             x.printStackTrace();
30:         }
31:     }
32: }

SimpleObjectFIFOTest simply performs the steps shown in Figures 18.3 and 18.4 and invokes printState() after each change. When cit is run, it produces the output shown in Listing 18.3.

Example 18.3. Output from SimpleObjectFIFOTest

 1: SimpleObjectFIFO:
 2:        capacity=5
 3:            size=0 - EMPTY
 4:            head=0
 5:            tail=0
 6:        queue[0]=null
 7:        queue[1]=null
 8:        queue[2]=null
 9:        queue[3]=null
10:        queue[4]=null
11: SimpleObjectFIFO:
12:        capacity=5
13:            size=1
14:            head=1
15:            tail=0
16:        queue[0]=S01
17:        queue[1]=null
18:        queue[2]=null
19:        queue[3]=null
20:        queue[4]=null
21: SimpleObjectFIFO:
22:        capacity=5
23:            size=2
24:            head=2
25:            tail=0
26:        queue[0]=S01
27:        queue[1]=S02
28:        queue[2]=null
29:        queue[3]=null
30:        queue[4]=null
31: SimpleObjectFIFO:
32:        capacity=5
33:            size=3
34:            head=3c
35:            tail=0
36:        queue[0]=S01
37:        queue[1]=S02
38:        queue[2]=S03
39:        queue[3]=null
40:        queue[4]=null
41: just removed obj=S01
42: SimpleObjectFIFO:
43:        capacity=5
44:            size=2
45:            head=3
46:            tail=1
47:        queue[0]=null
48:        queue[1]=S02
49:        queue[2]=S03
50:        queue[3]=null
51:        queue[4]=null
52: SimpleObjectFIFO:
53:        capacity=5
54:            size=3
55:            head=4
56:            tail=1
57:        queue[0]=null
58:        queue[1]=S02
59:        queue[2]=S03
60:        queue[3]=S04
61:        queue[4]=null
62: SimpleObjectFIFO:
63:        capacity=5
64:            size=4
65:            head=0
66:            tail=1
67:        queue[0]=null
68:        queue[1]=S02
69:        queue[2]=S03
70:        queue[3]=S04
71:        queue[4]=S05
72: SimpleObjectFIFO:
73:        capacity=5
74:            size=5 - FULL
75:            head=1
76:            tail=1
77:        queue[0]=S06
78:        queue[1]=S02
79:        queue[2]=S03
80:        queue[3]=S04
81:        queue[4]=S05

You will cnotice that capacity remains constant throughout but that size moves up and down as items are added and removed from the FIFO queue. In particular, take note of what happens when S01 is removed (line 41). The FIFO queue's size decreases from 3 to 2, and the tail pointer moves from cell 0 to cell 1. Also, take special note that the reference that was held to S01 is set to null (line 47). You should compare the output here with Figures 18.3 and 18.4 and notice that they correspond closely with one another.

An Expanded FIFO Queue for Object References: ObjectFIFO

The class ObjectFIFO removes the printState() method from SimpleObjectFIFO and expands on its foundation to create a more feature-rich FIFO queue for holding object references. The public API for ObjectFIFO consists of the following:

public ObjectFIFO(int cap)
public int getCapacity()
public synchronized int getSize()
public synchronized boolean isEmpty()
public synchronized boolean isFull()
public synchronized void add(Object obj) throws InterruptedException
public synchronized void addEach(Object[] list) throws InterruptedException
public synchronized Object remove() throws InterruptedException
public synchronized Object[] removeAll() throws InterruptedException
public synchronized Object[] removeAtLeastOne() throws InterruptedException
public synchronized boolean waitUntilEmpty(long msTimeout) throws InterruptedException
public synchronized void waitUntilEmpty() throws InterruptedException
public synchronized void waitWhileEmpty() throws InterruptedException
public synchronized void waitUntilFull() throws InterruptedException
public synchronized void waitWhileFull() throws InterruptedException

All the methods (except getCapacity()) are synchronized to ensure that multiple threads can safely and simultaneously interact with ObjectFIFO. The getCapacity() method does not have to be synchronized because the capacity never changes. The methods that might block waiting for something to change declare that they will throw an InterruptedException if interrupted while waiting. Listing 18.4 shows the code for ObjectFIFO. You should feel free to expand on this functionality by adding more methods—especially consider adding timeout options to all the methods that can block!

Example 18.4. ObjectFIFO.java—Fuller Implementation of a FIFO for Objects

  1: public class ObjectFIFO extends Object {
  2:     private Object[] queue;
  3:     private int capacity;
  4:     private int size;
  5:     private int head;
  6:     private int tail;
  7:
  8:     public ObjectFIFO(int cap) {
  9:         capacity = ( cap > 0 ) ? cap : 1; // at least 1
 10:         queue = new Object[capacity];
 11:         head = 0;
 12:         tail = 0;
 13:         size = 0;
 14:     }
 15:
 16:     public int getCapacity() {
 17:         return capacity;
 18:     }
 19:
 20:     public synchronized int getSize() {
 21:         return size;
 22:     }
 23:
 24:     public synchronized boolean isEmpty() {
 25:         return ( size == 0 );
 26:     }
 27:
 28:     public synchronized boolean isFull() {
 29:         return ( size == capacity );
 30:     }
 31:
 32:     public synchronized void add(Object obj)
 33:             throws InterruptedException {
 34:
 35:         waitWhileFull();
 36: 
 37:         queue[head] = obj;
 38:         head = ( head + 1 ) % capacity;
 39:         size++;
 40:
 41:         notifyAll(); // let any waiting threads know about change
 42:     }
 43:
 44:     public synchronized void addEach(Object[] list)
 45:             throws InterruptedException {
 46:
 47:         //
 48:         // You might want to code a more efficient
 49:         // implementation here ... (see ByteFIFO.java)
 50:         //
 51:
 52:         for ( int i = 0; i < list.length; i++ ) {
 53:             add(list[i]);
 54:         }
 55:     }
 56:
 57:     public synchronized Object remove()
 58:             throws InterruptedException {
 59:
 60:         waitWhileEmpty();
 61:
 62:         Object obj = queue[tail];
 63:
 64:         // don't block GC by keeping unnecessary reference
 65:         queue[tail] = null;
 66:
 67:         tail = ( tail + 1 ) % capacity;
 68:         size—;
 69:
 70:         notifyAll(); // let any waiting threads know about change
 71:
 72:         return obj;
 73:     }
 74:
 75:     public synchronized Object[] removeAll()
 76:             throws InterruptedException {
 77:
 78:         //
 79:         // You might want to code a more efficient
 80:         // implementation here ... (see ByteFIFO.java)
 81:         //
 82:
 83:         Object[] list = new Object[size]; // use the current size
 84: 
 85:         for ( int i = 0; i < list.length; i++ ) {
 86:             list[i] = remove();
 87:         }
 88:
 89:         // if FIFO was empty, a zero-length array is returned
 90:         return list;
 91:     }
 92:
 93:     public synchronized Object[] removeAtLeastOne()
 94:             throws InterruptedException {
 95:
 96:         waitWhileEmpty(); // wait for at least one to be in FIFO
 97:         return removeAll();
 98:     }
 99:
100:     public synchronized boolean waitUntilEmpty(long msTimeout)
101:             throws InterruptedException {
102:
103:         if ( msTimeout == 0L ) {
104:             waitUntilEmpty();  // use other method
105:             return true;
106:         }
107:
108:         // wait only for the specified amount of time
109:         long endTime = System.currentTimeMillis() + msTimeout;
110:         long msRemaining = msTimeout;
111:
112:         while ( !isEmpty() && ( msRemaining > 0L ) ) {
113:             wait(msRemaining);
114:             msRemaining = endTime - System.currentTimeMillis();
115:         }
116:
117:         // May have timed out, or may have met condition,
118:         // calc return value.
119:         return isEmpty();
120:     }
121:
122:     public synchronized void waitUntilEmpty()
123:             throws InterruptedException {
124: 
125:         while ( !isEmpty() ) {
126:             wait();
127:         }
128:     }
129:
130:     public synchronized void waitWhileEmpty()
131:             throws InterruptedException {
132:
133:         while ( isEmpty() ) {
134:             wait();
135:         }
136:     }
137:
138:     public synchronized void waitUntilFull()
139:             throws InterruptedException {
140:
141:         while ( !isFull() ) {
142:             wait();
143:         }
144:     }
145:
146:     public synchronized void waitWhileFull()
147:             throws InterruptedException {
148:
149:         while ( isFull() ) {
150:             wait();
151:         }
152:     }
153: }

In ObjectFIFO, the member variables, the constructor, and the getSize() and isFull() methods work the same as in SimpleObjectFIFO (described earlier in this chapter).

The getCapacity() method (lines 16–18) has been added for convenience to determine the maximum number of object references that can be held in the FIFO queue. The isEmpty() method (lines 24–26) returns true if the size is currently 0.

The add() method (lines 32–42) has been changed slightly to call waitWhileFull(), rather than handle the wait() call directly. Otherwise, the add() in ObjectFIFO is the same as in SimpleObjectFIFO.

The addEach() method (lines 44–55) supports adding each element in an Object[] as its own item. If an Object[] should be added as one item, add() should be called instead. Inside addEach(), the array is simply stepped through, and each element is individually put into the FIFO queue by invoking add() (lines 52–54). This could be done in a more efficient manner that directly works with queue, head, tail, and size (see ByteFIFO for one technique).

The remove() method (lines 57–73) has been changed slightly to call waitWhileEmpty(), rather than handle the wait() call directly. Otherwise, the remove() in ObjectFIFO is the same as in SimpleObjectFIFO.

The removeAll() method (lines 75–91) supports removing all the items currently in the FIFO queue and returning them in an Object[]. This method does not block—even if the current size is 0. If the queue is empty, a zero-length array will be returned. Inside removeAll(), an Object[] is created based on the current value of size (line 83). Then for each item, remove() is called, and the value returned is stored in this new array (line 86). Finally, the Object[] is returned (line 90). This could be done in a more efficient manner that directly works with queue, head, tail, and size (see ByteFIFO for one technique).

The removeAtLeastOne() method (lines 93–98) is used to wait until at least one item is in the FIFO queue and then to remove and return all the items in the queue. This method will block as long as the queue is empty (line 96). As soon as it is not empty (or if it wasn't empty to start with), removeAll() is invoked, and the Object[] it generates is returned (line 97).

The rest of the methods (waitUntilX and waitWhileX) kindly encapsulate the wait-notify mechanism so that users of ObjectFIFO don't have to burden themselves with synchronized and wait(). One of them, waitUntilEmpty(long msTimeout), takes a timeout value so that the waiting is not indefinite. You should consider extending this functionality to the others.

The waitUntilEmpty(long msTimeout) method (lines 100–120) is used to block until either no more items are in the FIFO queue or until the specified number of milliseconds elapses. If the queue is empty, true is returned; otherwise, false is returned. If the timeout is 0, the indefinite waitUntilEmpty() is called, and after that returns, true is returned (lines 103–106). Otherwise, the "waiting for the full timeout" technique from Chapter 14 is used (lines 109–119).

The ObjectFIFOTest class, in Listing 18.5, is used to demonstrate the ObjectFIFO class.

Example 18.5. ObjectFIFOTest.java—Demonstration Code for ObjectFIFO

  1: public class ObjectFIFOTest extends Object {
  2:
  3:     private static void fullCheck(ObjectFIFO fifo) {
  4:         try {
  5:             // Sync'd to allow messages to print while
  6:             // condition is still true.
  7:             synchronized ( fifo ) {
  8:                 while ( true ) {
  9:                     fifo.waitUntilFull();
 10:                     print("FULL");
 11:                     fifo.waitWhileFull();
 12:                     print("NO LONGER FULL");
 13:                 }
 14:             }
 15:         } catch ( InterruptedException ix ) {
 16:             return;
 17:         }
 18:     }
 19:
 20:     private static void emptyCheck(ObjectFIFO fifo) {
 21:         try {
 22:             // Sync'd to allow messages to print while
 23:             // condition is still true.
 24:             synchronized ( fifo ) {
 25:                 while ( true ) {
 26:                     fifo.waitUntilEmpty();
 27:                     print("EMPTY");
 28:                     fifo.waitWhileEmpty();
 29:                     print("NO LONGER EMPTY");
 30:                 }
 31:             }
 32:         } catch ( InterruptedException ix ) {
 33:             return;
 34:         }
 35:     }
 36: 
 37:     private static void consumer(ObjectFIFO fifo) {
 38:         try {
 39:             print("just entered consumer()");
 40:
 41:             for ( int i = 0; i < 3; i++ ) {
 42:                 synchronized ( fifo ) {
 43:                     Object obj = fifo.remove();
 44:                     print("DATA-OUT - did remove(), obj=" + obj);
 45:                 }
 46:                 Thread.sleep(3000);
 47:             }
 48:
 49:             synchronized ( fifo ) {
 50:                 boolean resultOfWait = fifo.waitUntilEmpty(500);
 51:                 print("did waitUntilEmpty(500), resultOfWait=" +
 52:                         resultOfWait + ", getSize()=" +
 53:                         fifo.getSize());
 54:             }
 55:
 56:             for ( int i = 0; i < 3; i++ ) {
 57:                 synchronized ( fifo ) {
 58:                     Object[] list = fifo.removeAll();
 59:                     print("did removeAll(), list.length=" +
 60:                             list.length);
 61:
 62:                     for ( int j = 0; j < list.length; j++ ) {
 63:                         print("DATA-OUT - list[" + j + "]=" +
 64:                                 list[j]);
 65:                     }
 66:                 }
 67:                 Thread.sleep(100);
 68:             }
 69: 
 70:             for ( int i = 0; i < 3; i++ ) {
 71:                 synchronized ( fifo ) {
 72:                     Object[] list = fifo.removeAtLeastOne();
 73:                     print(
 74:                         "did removeAtLeastOne(), list.length=" +
 75:                         list.length);
 76:
 77:                     for ( int j = 0; j < list.length; j++ ) {
 78:                         print("DATA-OUT - list[" + j + "]=" +
 79:                                 list[j]);
 80:                     }
 81:                 }
 82:                 Thread.sleep(1000);
 83:             }
 84:
 85:             while ( !fifo.isEmpty() ) {
 86:                 synchronized ( fifo ) {
 87:                     Object obj = fifo.remove();
 88:                     print("DATA-OUT - did remove(), obj=" + obj);
 89:                 }
 90:                 Thread.sleep(1000);
 91:             }
 92:
 93:             print("leaving consumer()");
 94:         } catch ( InterruptedException ix ) {
 95:             return;
 96:         }
 97:     }
 98:
 99:     private static void producer(ObjectFIFO fifo) {
100:         try {
101:             print("just entered producer()");
102:             int count = 0;
103:
104:             Object obj0 = new Integer(count);
105:             count++;
106:             synchronized ( fifo ) {
107:                 fifo.add(obj0);
108:                 print("DATA-IN - did add(), obj0=" + obj0);
109:
110:                 boolean resultOfWait = fifo.waitUntilEmpty(500);
111:                 print("did waitUntilEmpty(500), resultOfWait=" +
112:                         resultOfWait + ", getSize()=" +
113:                         fifo.getSize());
114:             }
115: 
116:             for ( int i = 0; i < 10; i++ ) {
117:                 Object obj = new Integer(count);
118:                 count++;
119:                 synchronized ( fifo ) {
120:                     fifo.add(obj);
121:                     print("DATA-IN - did add(), obj=" + obj);
122:                 }
123:                 Thread.sleep(1000);
124:             }
125:
126:             Thread.sleep(2000);
127:
128:             Object obj = new Integer(count);
129:             count++;
130:             synchronized ( fifo ) {
131:                 fifo.add(obj);
132:                 print("DATA-IN - did add(), obj=" + obj);
133:             }
134:             Thread.sleep(500);
135:
136:             Integer[] list1 = new Integer[3];
137:             for ( int i = 0; i < list1.length; i++ ) {
138:                 list1[i] = new Integer(count);
139:                 count++;
140:             }
141:
142:             synchronized ( fifo ) {
143:                 fifo.addEach(list1);
144:                 print("did addEach(), list1.length=" +
145:                         list1.length);
146:             }
147:
148:             Integer[] list2 = new Integer[8];
149:             for ( int i = 0; i < list2.length; i++ ) {
150:                 list2[i] = new Integer(count);
151:                 count++;
152:             }
153: 
154:             synchronized ( fifo ) {
155:                 fifo.addEach(list2);
156:                 print("did addEach(), list2.length=" +
157:                         list2.length);
158:             }
159:
160:             synchronized ( fifo ) {
161:                 fifo.waitUntilEmpty();
162:                 print("fifo.isEmpty()=" + fifo.isEmpty());
163:             }
164:
165:             print("leaving producer()");
166:         } catch ( InterruptedException ix ) {
167:             return;
168:         }
169:     }
170:
171:     private static synchronized void print(String msg) {
172:         System.out.println(
173:             Thread.currentThread().getName() + ": " + msg);
174:     }
175:
176:     public static void main(String[] args) {
177:         final ObjectFIFO fifo = new ObjectFIFO(5);
178:
179:         Runnable fullCheckRunnable = new Runnable() {
180:                 public void run() {
181:                     fullCheck(fifo);
182:                 }
183:             };
184:
185:         Thread fullCheckThread =
186:                 new Thread(fullCheckRunnable, "fchk");
187:         fullCheckThread.setPriority(9);
188:         fullCheckThread.setDaemon(true); // die automatically
189:         fullCheckThread.start();
190:
191:         Runnable emptyCheckRunnable = new Runnable() {
192:                 public void run() {
193:                     emptyCheck(fifo);
194:                 }
195:             };
196:
197:         Thread emptyCheckThread =
198:                 new Thread(emptyCheckRunnable, "echk");
199:         emptyCheckThread.setPriority(8);
200:         emptyCheckThread.setDaemon(true); // die automatically
201:         emptyCheckThread.start();
202:
203:         Runnable consumerRunnable = new Runnable() {
204:                 public void run() {
205:                     consumer(fifo);
206:                 }
207:             };
208: 
209:         Thread consumerThread =
210:                 new Thread(consumerRunnable, "cons");
211:         consumerThread.setPriority(7);
212:         consumerThread.start();
213:
214:         Runnable producerRunnable = new Runnable() {
215:                 public void run() {
216:                     producer(fifo);
217:                 }
218:             };
219:
220:         Thread producerThread =
221:                 new Thread(producerRunnable, "prod");
222:         producerThread.setPriority(6);
223:         producerThread.start();
224:     }
225: }

ObjectFIFOTest starts four threads to simultaneously access an ObjectFIFO created with a capacity of 5 items (line 177). The fullCheckThread (lines 179–189) is used to run the fullCheck() method (lines 3–18). The fullCheck() method first waits until the FIFO queue is full (line 9) and then prints a message (line 10). It then waits until the FIFO queue is no longer full (line 11) and then prints a message (line 12). It continues to loop through these two checks indefinitely (line 8).

Next, the "main" thread creates the emptyCheckThread (lines 191–201) to run emptyCheck() (lines 20–35). This method works just like fullCheck() but instead prints messages when the FIFO queue transitions to and from the empty state.

The consumerThread is then created (lines 203–212) to run consumer() (lines 37–97). Inside the consumer() method, the thread exercises all the removeX methods.

Finally the producerThread is created (lines 214–223) to run producer() (lines 99–169). Inside the producer() method, several of the addX methods are used to sporadically add items to the queue.

The print() method (lines 171–174) is used to produce all the console output. It prefixes each message with the name of the thread that sent the message.

Listing 18.6 shows the output from a particular run of ObjectFIFOTest. Your output is likely to differ slightly because the four threads running will be scheduled somewhat randomly. The main thing to notice is that all the items added are eventually removed in the proper order.

Example 18.6. Possible Output from ObjectFIFOTest

 1: echk: EMPTY
 2: cons: just entered consumer()
 3: prod: just entered producer()
 4: prod: DATA-IN - did add(), obj0=0
 5: echk: NO LONGER EMPTY
 6: cons: DATA-OUT - did remove(), obj=0
 7: echk: EMPTY
 8: prod: did waitUntilEmpty(500), resultOfWait=true, getSize()=0
 9: prod: DATA-IN - did add(), obj=1
10: echk: NO LONGER EMPTY
11: prod: DATA-IN - did add(), obj=2
12: prod: DATA-IN - did add(), obj=3
13: cons: DATA-OUT - did remove(), obj=1
14: prod: DATA-IN - did add(), obj=4
15: prod: DATA-IN - did add(), obj=5
16: prod: DATA-IN - did add(), obj=6
17: fchk: FULL
18: cons: DATA-OUT - did remove(), obj=2
19: fchk: NO LONGER FULL
20: prod: DATA-IN - did add(), obj=7
21: fchk: FULL
22: cons: did waitUntilEmpty(500), resultOfWait=false, getSize()=5
23: cons: did removeAll(), list.length=5
24: cons: DATA-OUT - list[0]=3
25: cons: DATA-OUT - list[1]=4
26: cons: DATA-OUT - list[2]=5
27: cons: DATA-OUT - list[3]=6
28: cons: DATA-OUT - list[4]=7
29: fchk: NO LONGER FULL
30: echk: EMPTY
31: prod: DATA-IN - did add(), obj=8
32: echk: NO LONGER EMPTY
33: cons: did removeAll(), list.length=1
34: cons: DATA-OUT - list[0]=8
35: echk: EMPTY
36: cons: did removeAll(), list.length=0
37: prod: DATA-IN - did add(), obj=9
38: echk: NO LONGER EMPTY
39: cons: did removeAtLeastOne(), list.length=1
40: cons: DATA-OUT - list[0]=9
41: echk: EMPTY
42: prod: DATA-IN - did add(), obj=10
43: echk: NO LONGER EMPTY
44: cons: did removeAtLeastOne(), list.length=1
45: cons: DATA-OUT - list[0]=10
46: echk: EMPTY
47: prod: DATA-IN - did add(), obj=11
48: echk: NO LONGER EMPTY
49: cons: did removeAtLeastOne(), list.length=1
50: cons: DATA-OUT - list[0]=11
51: echk: EMPTY
52: prod: did addEach(), list1.length=3
53: echk: NO LONGER EMPTY
54: fchk: FULL
55: cons: DATA-OUT - did remove(), obj=12
56: fchk: NO LONGER FULL
57: fchk: FULL
58: cons: DATA-OUT - did remove(), obj=13
59: fchk: NO LONGER FULL
60: fchk: FULL
61: cons: DATA-OUT - did remove(), obj=14
62: fchk: NO LONGER FULL
63: fchk: FULL
64: cons: DATA-OUT - did remove(), obj=15
65: fchk: NO LONGER FULL
66: fchk: FULL
67: cons: DATA-OUT - did remove(), obj=16
68: fchk: NO LONGER FULL
69: fchk: FULL
70: cons: DATA-OUT - did remove(), obj=17
71: fchk: NO LONGER FULL
72: prod: did addEach(), list2.length=8
73: fchk: FULL
74: cons: DATA-OUT - did remove(), obj=18
75: fchk: NO LONGER FULL
76: cons: DATA-OUT - did remove(), obj=19
77: cons: DATA-OUT - did remove(), obj=20
78: cons: DATA-OUT - did remove(), obj=21
79: cons: DATA-OUT - did remove(), obj=22
80: echk: EMPTY
81: prod: fifo.isEmpty()=true
82: prod: leaving producer()
83: cons: leaving consumer()

A FIFO Queue for Bytes: ByteFIFO

The class ByteFIFO is very similar to ObjectFIFO, except that it holds byte values instead of object references. It is much more efficient to store bytes directly, rather than wrap them in Byte instances and store them as references. The public API for ByteFIFO consists of the following:

public ByteFIFO(int cap)
public int getCapacity()
public synchronized int getSize()
public synchronized boolean isEmpty()
public synchronized boolean isFull()
public synchronized void add(byte b) throws InterruptedException
public synchronized void add(byte[] list) throws InterruptedException
public synchronized byte remove() throws InterruptedException
public synchronized byte[] removeAll()
public synchronized byte[] removeAtLeastOne() throws InterruptedException
public synchronized boolean waitUntilEmpty(long msTimeout) throws InterruptedException
public synchronized void waitUntilEmpty() throws InterruptedException
public synchronized void waitWhileEmpty() throws InterruptedException
public synchronized void waitUntilFull() throws InterruptedException
public synchronized void waitWhileFull() throws InterruptedException

This is pretty much the same API as ObjectFIFO, except that byte and byte[] are passed and returned instead of Object and Object[]. In addition, the addEach() method is gone; it is replaced with an overloaded add() method that takes a byte[] as the parameter.

All the methods (except getCapacity()) are synchronized to ensure that multiple threads can safely and simultaneously interact with ByteFIFO. The getCapacity() method does not have to be synchronized because the capacity never changes. The methods that might block, waiting for something to change, declare that they will throw an InterruptedException if interrupted while waiting. Listing 18.7 shows the code for ByteFIFO.

Example 18.7. ByteFIFO.java—A FIFO Queue That Holds Bytes

  1: public class ByteFIFO extends Object {
  2:     private byte[] queue;
  3:     private int capacity;
  4:     private int size;
  5:     private int head;
  6:     private int tail;
  7:
  8:     public ByteFIFO(int cap) {
  9:         capacity = ( cap > 0 ) ? cap : 1; // at least 1
 10:         queue = new byte[capacity];
 11:         head = 0;
 12:         tail = 0;
 13:         size = 0;
 14:     }
 15:
 16:     public int getCapacity() {
 17:         return capacity;
 18:     }
 19:
 20:     public synchronized int getSize() {
 21:         return size;
 22:     }
 23:
 24:     public synchronized boolean isEmpty() {
 25:         return ( size == 0 );
 26:     }
 27:
 28:     public synchronized boolean isFull() {
 29:         return ( size == capacity );
 30:     }
 31:
 32:     public synchronized void add(byte b)
 33:             throws InterruptedException {
 34: 
 35:         waitWhileFull();
 36:
 37:         queue[head] = b;
 38:         head = ( head + 1 ) % capacity;
 39:         size++;
 40:
 41:         notifyAll(); // let any waiting threads know about change
 42:     }
 43:
 44:     public synchronized void add(byte[] list)
 45:             throws InterruptedException {
 46:
 47:         // For efficiency, the bytes are copied in blocks
 48:         // instead of one at a time. As space becomes available,
 49:         // more bytes are copied until all of them have been
 50:         // added.
 51:
 52:         int ptr = 0;
 53:
 54:         while ( ptr < list.length ) {
 55:             // If full, the lock will be released to allow
 56:             // another thread to come in and remove bytes.
 57:             waitWhileFull();
 58:
 59:             int space = capacity - size;
 60:             int distToEnd = capacity - head;
 61:             int blockLen = Math.min(space, distToEnd);
 62:
 63:             int bytesRemaining = list.length - ptr;
 64:             int copyLen = Math.min(blockLen, bytesRemaining);
 65:
 66:             System.arraycopy(list, ptr, queue, head, copyLen);
 67:             head = ( head + copyLen ) % capacity;
 68:             size += copyLen;
 69:             ptr += copyLen;
 70:
 71:             // Keep the lock, but let any waiting threads
 72:             // know that something has changed.
 73:             notifyAll();
 74:         }
 75:     }
 76:
 77:     public synchronized byte remove()
 78:             throws InterruptedException {
 79:
 80:         waitWhileEmpty();
 81:
 82:         byte b = queue[tail];
 83:         tail = ( tail + 1 ) % capacity;
 84:         size—;
 85:
 86:         notifyAll(); // let any waiting threads know about change
 87:
 88:         return b;
 89:     }
 90:
 91:     public synchronized byte[] removeAll() {
 92:         // For efficiency, the bytes are copied in blocks
 93:         // instead of one at a time.
 94:
 95:         if ( isEmpty() ) {
 96:             // Nothing to remove, return a zero-length
 97:             // array and do not bother with notification
 98:             // since nothing was removed.
 99:             return new byte[0];
100:         }
101:
102:         // based on the current size
103:         byte[] list = new byte[size];
104:
105:         // copy in the block from tail to the end
106:         int distToEnd = capacity - tail;
107:         int copyLen = Math.min(size, distToEnd);
108:         System.arraycopy(queue, tail, list, 0, copyLen);
109:
110:         // If data wraps around, copy the remaining data
111:         // from the front of the array.
112:         if ( size > copyLen ) {
113:             System.arraycopy(
114:                     queue, 0, list, copyLen, size - copyLen);
115:         }
116:
117:         tail = ( tail + size ) % capacity;
118:         size = 0; // everything has been removed
119:
120:         // Signal any and all waiting threads that
121:         // something has changed.
122:         notifyAll();
123: 
124:         return list;
125:     }
126:
127:     public synchronized byte[] removeAtLeastOne()
128:             throws InterruptedException {
129:
130:         waitWhileEmpty(); // wait for at least one to be in FIFO
131:         return removeAll();
132:     }
133:
134:     public synchronized boolean waitUntilEmpty(long msTimeout)
135:             throws InterruptedException {
136:
137:         if ( msTimeout == 0L ) {
138:             waitUntilEmpty();  // use other method
139:             return true;
140:         }
141:
142:         // wait only for the specified amount of time
143:         long endTime = System.currentTimeMillis() + msTimeout;
144:         long msRemaining = msTimeout;
145:
146:         while ( !isEmpty() && ( msRemaining > 0L ) ) {
147:             wait(msRemaining);
148:             msRemaining = endTime - System.currentTimeMillis();
149:         }
150:
151:         // May have timed out, or may have met condition,
152:         // calc return value.
153:         return isEmpty();
154:     }
155:
156:     public synchronized void waitUntilEmpty()
157:             throws InterruptedException {
158:
159:         while ( !isEmpty() ) {
160:             wait();
161:         }
162:     }
163:
164:     public synchronized void waitWhileEmpty()
165:             throws InterruptedException {
166: 
167:         while ( isEmpty() ) {
168:             wait();
169:         }
170:     }
171:
172:     public synchronized void waitUntilFull()
173:             throws InterruptedException {
174:
175:         while ( !isFull() ) {
176:             wait();
177:         }
178:     }
179:
180:     public synchronized void waitWhileFull()
181:             throws InterruptedException {
182:
183:         while ( isFull() ) {
184:             wait();
185:         }
186:     }
187: }

The getCapacity(), getSize(), isEmpty(), isFull(), waitUntilEmpty(long msTimeout), waitUntilEmpty(), waitWhileEmpty(), waitUntilFull(), and waitWhileFull() methods work exactly the same as in ObjectFIFO (see the descriptions earlier in this chapter). The removeAtLeastOne() method (lines 127–132) differs only in that it returns a byte[]. The add(byte b) method (lines 32–42) differs only in that it is passed a byte (line 32) and stores a byte (line 37).

The remove() method (lines 77–89) is much the same as before but instead handles bytes. In addition, it no longer has to set the vacated cell to null (or any other value) because the values in a byte[] don't interfere with garbage collection.

The add(byte[] list) method (lines 44–75) efficiently stores the values directly into the queue, rather than repeatedly invoking add(byte b). If list has a length greater than 0, the while loop is entered. Inside the while, waitWhileFull() is invoked (line 57) to block and wait, if necessary, for more space to become available. The number of open cells (space) is calculated (line 59). The number of cells (distToEnd) between the current position of head and the end of the array is calculated (line 60). The lesser of space and distToEnd is used to calculate blockLen, which is the largest block that can be copied in one operation (line 61).

Next, the number of bytes that still have to be copied (bytesRemaining) from list into queue is calculated (line 63). The number of bytes that will be copied this time through the loop (copyLen) is the lesser of blockLen and bytesRemaining (line 64). The actual copying from list into queue is performed for copyLen bytes (line 66). The values for head, size, and ptr are all adjusted based on copyLen (lines 67–69). Any and all waiting threads are notified that the state of the FIFO queue has changed (line 73). The while loop continues until all the bytes in list have been copied into queue. Each time through the while loop, the thread may block in waitWhileFull() to allow another thread to come in and remove some bytes to make more space.

The removeAll() method (lines 91–125) efficiently copies bytes from queue into a new byte array. Note that it has no need to throw an InterruptedException because it never blocks. This byte array has a length equal to the current number of bytes in the FIFO queue—possibly a length of 0 if the FIFO queue is currently empty (lines 95–100). If the queue is not empty, a byte[] is created, with a length equal to the current number of items in the queue (line 103). The number of cells from the tail pointer to the end of the array is calculated and stored in distToEnd (line 106). The number of bytes to copy (copyLen) is the minimum of distToEnd and size (line 107). These bytes are copied into list, which will be returned (line 108). If more bytes have to be copied from the beginning of the queue because the data wrapped around, they are copied based on the difference between size and copyLen (lines 112–115). The tail pointer is adjusted by size to reflect the removal of the bytes (line 117). The size is set to 0 because the FIFO queue is now empty (line 118). Any and all waiting threads are notified of the changes to the FIFO queue (line 122). Finally, the array containing the copied bytes is returned (line 124).

ByteFIFOTest, in Listing 18.8, is used to demonstrate some of the functionality of ByteFIFO. Basically, a set of strings is serialized and passed by one thread through a ByteFIFO to another thread. This other thread gathers up the bytes as they come through the FIFO queue and reconstructs the strings.

Example 18.8. ByteFIFOTest.java—Code to Demonstrate ByteFIFO

  1: import java.io.*;
  2:
  3: public class ByteFIFOTest extends Object {
  4:     private ByteFIFO fifo;
  5:     private byte[] srcData;
  6:
  7:     public ByteFIFOTest() throws IOException {
  8:         fifo = new ByteFIFO(20);
  9:
 10:         makeSrcData();
 11:         System.out.println("srcData.length=" + srcData.length);
 12:
 13:         Runnable srcRunnable = new Runnable() {
 14:                 public void run() {
 15:                     src();
 16:                 }
 17:             };
 18:         Thread srcThread = new Thread(srcRunnable);
 19:         srcThread.start();
 20:
 21:         Runnable dstRunnable = new Runnable() {
 22:                 public void run() {
 23:                     dst();
 24:                 }
 25:             };
 26:         Thread dstThread = new Thread(dstRunnable);
 27:         dstThread.start();
 28:     }
 29:
 30:     private void makeSrcData() throws IOException {
 31:         String[] list = {
 32:                 "The first string is right here",
 33:          "The second string is a bit longer and also right here",
 34:                 "The third string",
 35:                 "ABCDEFGHIJKLMNOPQRSTUVWXYZ",
 36:                 "0123456789",
 37:                 "The last string in the list"
 38:             };
 39: 
 40:         ByteArrayOutputStream baos = new ByteArrayOutputStream();
 41:         ObjectOutputStream oos = new ObjectOutputStream(baos);
 42:         oos.writeObject(list);
 43:         oos.flush();
 44:         oos.close();
 45:
 46:         srcData = baos.toByteArray();
 47:     }
 48:
 49:     private void src() {
 50:         try {
 51:             boolean justAddOne = true;
 52:             int count = 0;
 53:
 54:             while ( count < srcData.length ) {
 55:                 if ( !justAddOne ) {
 56:                     int writeSize = (int) ( 40.0 * Math.random() );
 57:                     writeSize = Math.min(writeSize, srcData.length - count);
 58:
 59:                     byte[] buf = new byte[writeSize];
 60:                     System.arraycopy( srcData, count, buf, 0, writeSize);
 61:                     fifo.add(buf);
 62:                     count += writeSize;
 63:
 64:                     System.out.println( "just added " + writeSize + " bytes");
 65:                 } else {
 66:                     fifo.add(srcData[count]);
 67:                     count++;
 68:
 69:                     System.out.println( "just added exactly 1 byte");
 70:                 }
 71:
 72:                 justAddOne = !justAddOne;
 73:             }
 74:         } catch ( InterruptedException x ) {
 75:             x.printStackTrace();
 76:         }
 77:     }
 78: 
 79:     private void dst() {
 80:         try {
 81:             boolean justAddOne = true;
 82:             int count = 0;
 83:             byte[] dstData = new byte[srcData.length];
 84:
 85:             while ( count < dstData.length ) {
 86:                 if ( !justAddOne ) {
 87:                     byte[] buf = fifo.removeAll();
 88:                     if ( buf.length > 0 ) {
 89:                         System.arraycopy(buf, 0, dstData, count, buf.length);
 90:                         count += buf.length;
 91:                     }
 92:
 93:                     System.out.println(
 94:                         "just removed " + buf.length + " bytes");
 95:                 } else {
 96:                     byte b = fifo.remove();
 97:                     dstData[count] = b;
 98:                     count++;
 99:
100:                     System.out.println(
101:                         "just removed exactly 1 byte");
102:                 }
103:
104:                 justAddOne = !justAddOne;
105:             }
106:
107:             System.out.println( "received all data, count=" + count);
108:
109:             ObjectInputStream ois = new ObjectInputStream(
110:                     new ByteArrayInputStream(dstData));
111:
112:             String[] line = (String[]) ois.readObject();
113:
114:             for ( int i = 0; i < line.length; i++ ) {
115:                 System.out.println("line[" + i + "]=" + line[i]);
116:             }
117:         } catch ( ClassNotFoundException x1 ) {
118:             x1.printStackTrace();
119:         } catch ( IOException iox ) {
120:             iox.printStackTrace();
121:         } catch ( InterruptedException x ) {
122:             x.printStackTrace();
123:         }
124:     }
125:
126:     public static void main(String[] args) {
127:         try {
128:             new ByteFIFOTest();
129:         } catch ( IOException iox ) {
130:             iox.printStackTrace();
131:         }
132:     }
133: }

The constructor for ByteFIFOTest creates a relatively small ByteFIFO with a capacity of 20 (line 8) for transferring data from one thread to another. The makeSrcData() method is called (line 10) to load srcData (line 5) with the bytes that will be pushed through the ByteFIFO. Next, a thread is created to run the src() method, and another thread is created to run the dst() method.

Inside makeSrcData() (lines 30–47), an array of strings (lines 31–38) is created and written to an ObjectOutputStream (lines 41–44). The ObjectOutputStream() passes the serialized data on to a ByteArrayOutputStream() (line 40). The bytes collected are turned into a byte[] and stored in srcData (line 46).

The src() method (lines 49–77) takes the bytes from srcData and pushes them into the FIFO queue. It alternates between adding a single byte and adding a byte array each time through the while loop by toggling the justAddOne variable (lines 51, 55, and 72). The size of the byte[] to be added is randomly determined (line 56) to keep things interesting. As data is added to the ByteFIFO, messages are printed (lines 64 and 69). This method completes when all the bytes in srcData have been added to the FIFO queue.

The dst() method (lines 79–124) removes the bytes from the ByteFIFO, stores them in a local array, and then de-serializes the object to confirm its successful transmission. The dst() method alternates between remove() and removeAll() each time through the while loop. The looping continues until the specified number of bytes has been removed (lines 83–85). As data is removed from the ByteFIFO, messages are printed (lines 93–94, 100, and 101). When all the bytes have been retrieved, they are used to create a ByteArrayInputStream that is, in turn, used to create an ObjectInputStream (lines 109–110). The one object that is serialized is a String[], and an attempt to read it back and cast it into its proper type is made (line 112). The String[] is then iterated through, and each string is printed to confirm uncorrupted delivery (lines 114–116).

Listing 18.9 shows possible output when ByteFIFOTest is run. Your output is likely to differ a bit, but ultimately, the list of strings printed at the end should match exactly.

Example 18.9. Possible Output from Running ByteFIFOTest

 1: srcData.length=224
 2: just added exactly 1 byte
 3: just removed exactly 1 byte
 4: just removed 19 bytes
 5: just added 26 bytes
 6: just added exactly 1 byte
 7: just added 7 bytes
 8: just added exactly 1 byte
 9: just removed exactly 1 byte
10: just removed 20 bytes
11: just added 20 bytes
12: just removed exactly 1 byte
13: just added exactly 1 byte
14: just removed 15 bytes
15: just added 18 bytes
16: just removed exactly 1 byte
17: just added exactly 1 byte
18: just removed 18 bytes
19: just removed exactly 1 byte
20: just added 21 bytes
21: just removed 20 bytes
22: just added exactly 1 byte
23: just removed exactly 1 byte
24: just added 0 bytes
25: just removed 0 bytes
26: just added exactly 1 byte
27: just removed exactly 1 byte
28: just removed 20 bytes
29: just added 33 bytes
30: just removed exactly 1 byte
31: just added exactly 1 byte
32: just removed 13 bytes
33: just removed exactly 1 byte
34: just removed 20 bytes
35: just added 23 bytes
36: just added exactly 1 byte
37: just removed exactly 1 byte
38: just removed 19 bytes
39: just added 24 bytes
40: just added exactly 1 byte
41: just added 5 bytes
42: just added exactly 1 byte
43: just removed exactly 1 byte
44: just added 6 bytes
45: just removed 19 bytes
46: just added exactly 1 byte
47: just removed exactly 1 byte
48: just added 20 bytes
49: just removed 20 bytes
50: just added exactly 1 byte
51: just removed exactly 1 byte
52: just added 8 bytes
53: just removed 8 bytes
54: received all data, count=224
55: line[0]=The first string is right here
56: line[1]=The second string is a bit longer and also right here
57: line[2]=The third string
58: line[3]=ABCDEFGHIJKLMNOPQRSTUVWXYZ
59: line[4]=0123456789
60: line[5]=The last string in the list

Summary

First-In-First-Out queues that can be simultaneously and safely accessed by multiple threads play a useful role in system development. In this chapter, I showed you two types of FIFO queues: one that holds object references (ObjectFIFO) and one that holds byte values (ByteFIFO). These implementations conveniently encapsulate the complexity of the wait-notify mechanism to simplify the use of the classes.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.141.197.135