5.9. Using a Blocking Buffer

Problem

Your system needs to wait for input and act on an object the moment it is added to a Buffer. To achieve this, you need your application to block until input is received.

Solution

Use BlockingBuffer to decorate an instance of Buffer. When a process calls get( ) or remove( ) on a buffer decorated with BlockingBuffer, the decorated buffer does not return a value until it has an object to return. The following example creates a BlockingBuffer and a listener that calls remove( ). A BlockingBuffer can only be demonstrated by an example that deals with multiple threads, and the following code uses a Runnable implementation, BufferListener, which is defined in Example 5-8:

import java.util.*;
import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.buffers.BlockingBuffer;
import org.apache.commons.collections.buffers.BoundedFifoBuffer;

// Create a Blocking Buffer
Buffer buffer = BlockingBuffer.decorate( new BoundedFifoBuffer( ) );

// Create Thread to continously remove( ) from the previous Buffer
               BufferListener listener = new BufferListener(buffer);
Thread listenerThread = new Thread( listener );
listenerThread.start( );

buffer.add( "Hello World!" );
buffer.add( "Goodbye, Y'all." );

The previous example creates an instance of BufferListener—a Runnable object that calls remove( ) on a BoundedFifoBuffer decorated with BlockingBuffer. The listenerThread will block on a call to buffer.remove( ) within the run( ) method of BufferListener , an object that runs in a separate thread and waits for objects to be added to a BlockingBuffer.

Example 5-8. A BufferListener constantly calling remove( )

public class BufferListener implements Runnable {
    private Buffer buffer;

    public BufferListener(Buffer buffer) {
        this.buffer = buffer;
    }

    public void run( ) {
        while(true) {
            String message = (String) buffer.remove( );
            System.out.println( message );
        }
    }
}

The two calls to buffer.add( ) causes BufferListener to print the strings added:

Hello World!
Goodbye, Y'all.

Discussion

A BlockingBuffer is used in a system that needs to act on a piece a data as soon as it is available, and this data structure comes in handy when there are a series of worker threads listening to buffers between components in a pipeline. BlockingBuffer allows you to build cascading pipelines, which automatically notify the next stage of available data. Think of this pattern as a stepped waterfall; water automatically flows down the steps, and each step is a Buffer. (See Figure 5-2.)

Using a BlockingBuffer to create a pipeline

Figure 5-2. Using a BlockingBuffer to create a pipeline

Assume that you need to write a workflow application for a news publisher; the workflow consists of a pipeline: news stories are published as XML files, which are passed to a search indexer and then processed with an XSLT stylesheet. A news story is simply passed as a String containing an XML document. The following example creates a pipeline consisting of two BlockingBuffer instances terminated by an UnboundedFifoBuffer:

import java.util.*;
import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.buffers.BlockingBuffer;
import org.apache.commons.collections.buffers.UnboundedFifoBuffer;

// Create a Blocking Buffer for each stage, last stage not a blocking buffer
Buffer published = BlockingBuffer.decorate( new UnboundedFifoBuffer( ) );
Buffer indexed = BlockingBuffer.decorate( new UnboundedFifoBuffer( ) );
Buffer transformed = new UnboundedFifoBuffer( );

// Create a Thread that will watch the published Buffer and index a news story
Indexer indexer = new Indexer(published, indexed);
Thread indexerThread = new Thread( indexer );
indexerThread.start( );

// Create a Thread that will watch the indexed Buffer and style a news story
Styler styler = new Styler(index, transformed);
Thread stylerThread = new Thread( styler );
stylerThread.start( );

String newsStory = getNewsStory( );

published.add( newsStory );

The previous example creates three buffers to hold the results of the stages of a pipeline—published, indexed, and transformed. Three Runnable objects are created to perform the task of processing each news story; the Indexer object listens to the published buffer and places its results in the indexed buffer, and the Styler object listens to the indexed buffer and places its results in the transformed buffer. The Indexer object implements Runnable and is constructed with two Buffer objects, inbound and outbound. The Indexer, as shown in Example 5-9, continuously calls remove( ) on a BlockingBuffer and waits until a story is available to process.

Example 5-9. An Indexer stage in a pipeline

public class Indexer implements Runnnable {
    private Buffer inbound;
    private Buffer outbound;

    public Indexer(Buffer inbound, Buffer outbound) {
        this.inbound = inbound;
        this.outbound = outbound;
    }

    public void run( ) {
        while(true) {
            String story = (String) inbound.remove( );
                String processedStory = processStory( story );
            outbound.add( processedStory );
        }
    }

    public String processedStory(String story) {
        // Run story through a search indexer
        return story;
    }
}

The Styler is omitted because it follows the exact same pattern. Every stage in this pipeline is a Runnable implementation running in a thread and listening to an inbound buffer by calling (and blocking) on inbound.remove( ). Using this mechanism allows your system to process information in parallel by running separate stages in separate threads, and there is no need for a controller to coordinate the actions of a complex system. A pipeline can be extended by simply adding another stage with an additional BlockingBuffer. This pattern is useful in a system that models a very complex workflow; instead of attempting to capture a complex symphony of coordination, break the system into autonomous stages that only know about inputs and outputs.

See Also

A BlockingBuffer in Jakarta Commons Collections is analogous to a BlockingQueue in Java 5.0. A BlockingQueue in Java 5.0 has an important feature that is missing in Commons Collections 3.0 implementation of BlockingBuffer: in Java 5.0’s BlockingQueue, you can specify a timeout when adding and removing values from a queue.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.165.62