Your system needs to wait
for input and act on an object the moment it is added to a
Buffer
. To achieve this, you need your application
to block until input is received.
Use BlockingBuffer
to decorate an instance of
Buffer
. When a process calls get( )
or remove( )
on a buffer decorated
with BlockingBuffer
, the decorated buffer does not
return a value until it has an object to return. The following
example creates a BlockingBuffer
and a listener
that calls remove( )
. A
BlockingBuffer
can only be demonstrated by an
example that deals with multiple threads, and the following code uses
a Runnable
implementation,
BufferListener
, which is defined in Example 5-8:
import java.util.*; import org.apache.commons.collections.Buffer; import org.apache.commons.collections.buffers.BlockingBuffer; import org.apache.commons.collections.buffers.BoundedFifoBuffer; // Create a Blocking Buffer Buffer buffer = BlockingBuffer.decorate( new BoundedFifoBuffer( ) );// Create Thread to continously remove( ) from the previous Buffer
BufferListener listener = new BufferListener(buffer);
Thread listenerThread = new Thread( listener ); listenerThread.start( ); buffer.add( "Hello World!" ); buffer.add( "Goodbye, Y'all." );
The previous example creates an instance of
BufferListener
—a Runnable
object that calls remove( )
on a
BoundedFifoBuffer
decorated with
BlockingBuffer
. The
listenerThread
will block on a call to
buffer.remove( )
within the run( )
method
of
BufferListener
,
an object that runs in a separate thread and waits for objects to be
added to a BlockingBuffer
.
Example 5-8. A BufferListener constantly calling remove( )
public class BufferListener implements Runnable {
private Buffer buffer;
public BufferListener(Buffer buffer) {
this.buffer = buffer;
}
public void run( ) {
while(true) {
String message = (String) buffer.remove( );
System.out.println( message );
}
}
}
The two calls to buffer.add( )
causes
BufferListener
to print the strings added:
Hello World! Goodbye, Y'all.
A BlockingBuffer
is used in a system that needs to
act on a piece a data as soon as it is available, and this data
structure comes in handy when there are a series of worker threads
listening to buffers between components in a pipeline.
BlockingBuffer
allows you to build cascading
pipelines, which automatically notify the next stage of available
data. Think of this pattern as a stepped waterfall; water
automatically flows down the steps, and each step is a
Buffer
. (See Figure 5-2.)
Assume that you need to write a workflow application for a news
publisher; the workflow consists of a pipeline: news stories are
published as XML files, which are passed to a search indexer and then
processed with an XSLT stylesheet. A news story is simply passed as a
String
containing an XML document. The following
example creates a pipeline consisting of two
BlockingBuffer
instances terminated by an
UnboundedFifoBuffer
:
import java.util.*; import org.apache.commons.collections.Buffer; import org.apache.commons.collections.buffers.BlockingBuffer; import org.apache.commons.collections.buffers.UnboundedFifoBuffer; // Create a Blocking Buffer for each stage, last stage not a blocking buffer Buffer published = BlockingBuffer.decorate( new UnboundedFifoBuffer( ) ); Buffer indexed = BlockingBuffer.decorate( new UnboundedFifoBuffer( ) ); Buffer transformed = new UnboundedFifoBuffer( ); // Create a Thread that will watch the published Buffer and index a news story Indexer indexer = new Indexer(published, indexed); Thread indexerThread = new Thread( indexer ); indexerThread.start( ); // Create a Thread that will watch the indexed Buffer and style a news story Styler styler = new Styler(index, transformed); Thread stylerThread = new Thread( styler ); stylerThread.start( ); String newsStory = getNewsStory( ); published.add( newsStory );
The previous example creates three buffers to hold the results of the
stages of a pipeline—published
,
indexed
, and transformed
. Three
Runnable
objects are created to perform the task
of processing each news story; the Indexer
object
listens to the published
buffer and places its
results in the indexed
buffer, and the
Styler
object listens to the
indexed
buffer and places its results in the
transformed
buffer. The Indexer
object implements
Runnable
and is constructed with two
Buffer
objects, inbound
and
outbound
. The Indexer
, as shown
in Example 5-9, continuously calls remove( )
on a BlockingBuffer
and waits until a
story is available to process.
Example 5-9. An Indexer stage in a pipeline
public class Indexer implements Runnnable { private Buffer inbound; private Buffer outbound; public Indexer(Buffer inbound, Buffer outbound) { this.inbound = inbound; this.outbound = outbound; } public void run( ) { while(true) { String story = (String) inbound.remove( ); String processedStory = processStory( story ); outbound.add( processedStory ); } } public String processedStory(String story) { // Run story through a search indexer return story; } }
The Styler
is omitted because it follows the exact
same pattern. Every stage in this pipeline is a
Runnable
implementation running in a thread and
listening to an inbound buffer by calling (and blocking) on
inbound.remove( )
. Using this mechanism allows
your system to process information in parallel by running separate
stages in separate threads, and there is no need for a controller to
coordinate the actions of a complex system. A pipeline can be
extended by simply adding another stage with an additional
BlockingBuffer
. This pattern is useful in a system
that models a very complex workflow; instead of attempting to capture
a complex symphony of coordination, break the system into autonomous
stages that only know about inputs and outputs.
A BlockingBuffer
in Jakarta Commons Collections is
analogous to a BlockingQueue
in Java 5.0. A
BlockingQueue
in Java 5.0 has an important feature
that is missing in Commons Collections 3.0 implementation of
BlockingBuffer
: in Java 5.0’s
BlockingQueue
, you can specify a timeout when
adding and
removing values from a queue.
18.191.165.62