The ACE Streams framework implements the Pipes and Filters pattern described in [1]. The framework is an excellent way to model processes consisting of a set of ordered steps. Each step, or filter in Pipes and Filters terminology, in the process is implemented as an ACE_Task
derivative. As each step is completed, the data is handed off to the next step for continuation, using the ACE_Task
objects' message queues, or pipes in Pipes and Filters terminology. Steps can be multithreaded in order to increase throughput if the data lends itself to parallel processing. In this chapter, we explore the following classes: ACE_Stream
, ACE_Module
, ACE_Task
, and ACE_Message_Block
.
Another commonly known implementation of the Pipes and Filters pattern is the UNIX System V STREAMS framework. If you're familiar with System V STREAMS, you will recognize the concepts. The ACE Streams framework allows the flexible, dynamically configurable assembly of a set of modules into a stream through which information travels. Each module has an opportunity to manipulate the data in the stream and can modify it, remove it, or add to it before passing it to the next module in the stream. Data moves bidirectionally in the stream, and each module in the stream has a reader and a writer task—one for each data direction.
The first thing to do when using ACE Streams is to identify the sequence of events you wish to process. The steps should be as discrete as possible, with well-defined outputs and inputs. Where possible, identify and document opportunities for parallel processing within each step.
Once your steps are identified, you can begin implementing each as a derivative of ACE_Task
. The svc()
method of each will use getq()
to get a unit of work and put_next()
to pass the completed work to the next step. At this time, identify which tasks are “downstream” and which are “upstream” in nature. Downstream tasks can be thought of as moving “out of” your primary application as they move down the stream. For instance, a stream implementing a protocol stack would use each task to perform a different stage of protocol conversion. The final task would send the converted data, perhaps via TCP/IP, to a remote peer. Similarly, upstream tasks can be thought of as moving data “into” your primary application.
Module instances, or paired downstream and upstream tasks, can be created once the tasks are implemented. In our protocol conversion example, the downstream tasks would encode our data and the upstream tasks would decode it.
The final step is to create the ACE_Stream
instance and push the ordered list of modules onto it. You can then use put()
to move data onto the stream for processing and obtain the results with get()
. See Figure 18.1.
In this section, we look at an answering machine implementation that uses a one-way stream to record and process messages. Each module of the stream performs one function in the set of functions required to implement the system. The functions are
As defined, each step of the process is very specific in what it must do. This may seem a bit like overkill, but in a more complex application, it is an excellent way to divide the work among team members. Each person can focus on the implementation of his or her own step, as long as the interfaces between each step are well defined.
We're going to work through this example from the top down; that is, we start with main()
and then dig down into successively lower levels:
int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
RecordingDevice *recorder =
RecordingDeviceFactory::instantiate (argc, argv);
Here, main()
begins by using the RecordingDeviceFactory
to instantiate a RecordingDevice
instance based on command line parameters.1 Our system may have many kinds of recording devices: voice modems, video phones, e-mail receivers, and so on. The command line parameters tell us which kind of device this instance of the application is communicating with.
RecordingStream *recording_stream;
ACE_NEW_RETURN (recording_stream, RecordingStream, -1);
if (recording_stream->open (0) < 0)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("RecordingStream->open()")),
0);
Next, our example creates and opens an instance of RecordingStream
. As we will see, this is a simple derivative of ACE_Stream
. If the open()
fails, we will print a brief message and exit with an error code.
The final task of main()
is to enter an infinite loop, waiting for messages to arrive and recording them when they do:
for (;;)
{
ACE_DEBUG ((LM_INFO,
ACE_TEXT ("Waiting for incoming message
")));
RecordingDevice *activeRecorder =
recorder->wait_for_activity ();
ACE_DEBUG ((LM_INFO,
ACE_TEXT ("Initiating recording process
")));
recording_stream->record (activeRecorder);
}
The RecordingDevice::wait_for_activity()
method will block until there is some activity, such as a ring, on the physical device. The RecordingDevice
instance is then given to the RecordingStream
to process our list of directives.
As mentioned, the RecordingStream
object is a simple derivative of ACE_Stream
. We begin with the constructor, which takes care of the details of initializing the base class:
class RecordingStream : public ACE_Stream<ACE_MT_SYNCH>
{
public:
typedef ACE_Stream<ACE_MT_SYNCH> inherited;
typedef ACE_Module<ACE_MT_SYNCH> Module;
RecordingStream () : inherited()
{ }
A stream will always have at least two modules installed: head and tail. The default downstream task of the head module simply passes any data down the stream. The default tail module, however, will treat any received data as an error. Our example can prevent this from happening in two ways.
In order to prevent special conditions, which are generally a sign of a bad design, we will use the second option. Therefore, our stream's open()
method will create a Module
with an instance of our EndTask
object and install that at the tail of the stream if no other module is specified:
virtual int open (void *arg,
Module *head = 0, Module *tail = 0)
{
if (tail == 0)
ACE_NEW_RETURN (tail,
Module ("End Module", new EndTask ()),
-1);
this->inherited::open (arg, head, tail);
Our open()
method changes only the default stream tail to suit our needs. Otherwise, the head and tail modules are passed to the inherited open()
method. This allows RecordingStream
to be configured and reused in other situations.
Our design described eight steps in the process, and we've created eight ACE_Task
derivatives to implement those steps. In the next part of open()
, we create an instance of each of these eight objects and a Module
to contain them:
Module *answerIncomingCallModule;
ACE_NEW_RETURN (answerIncomingCallModule,
Module ("Answer Incoming Call",
new AnswerIncomingCall ()),
-1);
Module *getCallerIdModule;
ACE_NEW_RETURN (getCallerIdModule,
Module ("Get Caller ID", new GetCallerId ()),
-1);
Module *playOGMModule;
ACE_NEW_RETURN (playOGMModule,
Module ("Play Outgoing Message",
new PlayOutgoingMessage ()),
-1);
Module *recordModule;
ACE_NEW_RETURN (recordModule,
Module ("Record Incoming Message",
new RecordIncomingMessage ()),
-1);
Module *releaseModule;
ACE_NEW_RETURN (releaseModule,
Module ("Release Device",
new ReleaseDevice ()),
-1);
Module *conversionModule;
ACE_NEW_RETURN (conversionModule,
Module ("Encode Message",
new EncodeMessage ()),
-1);
Module *saveMetaDataModule;
ACE_NEW_RETURN (saveMetaDataModule,
Module ("Save Meta-Data",
new SaveMetaData ()),
-1);
Module *notificationModule;
ACE_NEW_RETURN (notificationModule,
Module ("Notify Someone",
new NotifySomeone ()),
-1);
The general design of the Stream framework is that an ACE_Stream
contains a list of ACE_Module
instances and that each ACE_Module
contains one or two ACE_Task
instances. The tasks can be designated either downstream—invoked as data moves away from the stream's head—or upstream—invoked as data moves toward the stream's head. Our instantiation of getCallerIdModule
follows:
Module *getCallerIdModule;
ACE_NEW_RETURN (getCallerIdModule,
Module ("Get Caller ID", new GetCallerId ()),
-1);
This creates a Module
(ACE_Module<ACE_MT_SYNC>
) with the name “Get Caller ID” and an instance of the GetCallerId
object, which is a derivative of ACE_Task
, as required by the ACE_Module
constructor signature. The GetCallerId
instance is installed as the module's downstream task. We don't provide an upstream task, because our stream will store the recorded messages to disk and not expect anything to return back upstream to main()
.
The final part of our RecordingStream
's open()
method now pushes the modules onto the stream in the correct order:
if (this->push (notificationModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT ("notificationModule")),
-1);
if (this->push (saveMetaDataModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT ("saveMetaDataModule")),
-1);
if (this->push (conversionModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT ("conversionModule")),
-1);
if (this->push (releaseModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT ("releaseModule")),
-1);
if (this->push (recordModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT ("recordModule")),
-1);
if (this->push (playOGMModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT ("playOGMModule")),
-1);
if (this->push (getCallerIdModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT ("getCallerIdModule")),
-1);
if (this->push (answerIncomingCallModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
")
ACE_TEXT ("answerIncomingCallModule")),
-1);
Pushing the modules onto the stream in the correct order is very important. If your process is order dependent and you push the first-used module first, you'll be in for an unpleasant surprise! The push/pop stream terminology is the same as a stack. So be sure that you remember: first pushed, last used.
The remainder of our RecordingStream
is the record()
method. Whereas the constructor protected main()
from the details of stream creation, record()
protects it from direct interaction with the stream API:
int record (RecordingDevice *recorder)
{
ACE_Message_Block * mb;
ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(Message)), -1);
Message *message = (Message *)mb->wr_ptr ();
mb->wr_ptr (sizeof(Message));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("RecordingStream::record() - ")
ACE_TEXT ("message->recorder(recorder)
")));
message->recorder (recorder);
int rval = this->put (mb);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("RecordingStream::record() - ")
ACE_TEXT ("this->put() returns %d
"),
rval));
return rval;
}
An ACE_Stream
is, ultimately, a fancy implementation of a linked list of ACE_Task
objects. Each ACE_Task
comes from the manufacturer with a built-in ACE_Message_Queue
at no extra charge. That's primarily because the message queue is such an easy way to request work from a task.2
To give our stream some work to do, we create an ACE_Message_Block
that will go into the first downstream task's message queue with put()
. As our stream will be dealing with recording messages, we create the message block with enough space to contain our Message
3 object. We will basically be using the message block's data area to contain a Message
object's contents.
Once our message is created, we provide it with the RecordingDevice
pointer so that it will be able to ask the physical device to do things during the message processing. There may be many RecordingDevice
implementations, each able to talk to a different kind of device. The tasks of the recording stream, however, don't need to know any of these details and need only the base class (RecordingDevice
) pointer to get the job done.
Finally, the stream's put()
method is used to start the message down the stream by putting it on the first module's downstream task's message queue. Recall that when we opened the stream, we allowed it to use the default head module, which simply passes its input on down the stream.
Many of the tasks in our simple example need only delegate their action to the RecordingDevice
instance, primarily because our design has required the RecordingDevice
implementations to do all the “dirty work” required to talk with the physical device. Our tasks serve only as the “glue” to ensure that things happen in the right order.
Before we look at our base class, let's look at the first task in our stream:
class AnswerIncomingCall : public BasicTask
{
protected:
virtual int process (Message *message)
{
ACE_TRACE (ACE_TEXT ("AnswerIncomingCall::process()"));
if (message->recorder ()->answer_call () < 0)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("AnswerIncomingCall")),
-1);
return 0;
}
};
The job of AnswerIncomingCall
is to simply tell the recording device to “pick up the phone.” Recall that in main()
, we were notified that “the phone is ringing” only by the fact that the recording device's wait_for_activity()
unblocked. It is up to our stream's first task to request that the recording device answer the incoming call. In a more robust application, our stream may be in use by many recording devices at one time. Therefore, we have chosen to put the recording device instance pointer into the message, in the recording stream's record()
method, which is passed to each task of the stream. AnswerIncomingCall
uses the recorder()
method to retrieve this pointer and then invokes answer_call()
on it to tell the physical device to respond to the incoming call.
AnswerIncomingCall
and the tasks that follow all possess a process()
method in which they implement their required functionality. They also don't bother to move the Message
they're given on to the next task in the stream. These things, and others, are handled by the common base class: BasicTask
.
Given that our tasks are all derivatives of ACE_Task
and that they expect to get their input from a message queue, it is easy to create a base class for all of them. This base class takes care of reading data from the queue, requesting a derivative to process it, and passing it on to the next task in the stream. The base class also takes care of shutting down the tasks cleanly when the containing stream is shut down, either explicitly or implicitly by destruction:
class BasicTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
typedef ACE_Task<ACE_MT_SYNCH> inherited;
BasicTask () : inherited()
{ }
virtual int open (void * = 0)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("BasicTask::open() starting ")
ACE_TEXT ("%d threads
"),
this->desired_threads ()));
return this->activate (THR_NEW_LWP | THR_JOINABLE,
this->desired_threads ());
}
BasicTask extends ACE_Task
and takes care of the housework our stream's tasks would have otherwise been bothered with. The virtual open()
method activates the object into one or more threads as required by the virtual desired_threads()
method. For our simple example, none of the tasks require more than one thread, but we've provided two methods—overriding of open()
or desired_threads()
—of customizing this if a task needs to do so.
We next provide a simple put()
method that will put a message block onto the task's message queue. The put()
method is used by the Streams framework to move messages along the stream. The putq()
method could have been used instead, but then the stream would be tied more closely to the task implementation. That is, the stream would be assuming that all tasks wish to use their message queue for communication. A put()
method could just as well send the message to a file for the svc()
method to pick up at a predefined interval:
int put (ACE_Message_Block *message,
ACE_Time_Value *timeout)
{
return this->putq (message, timeout);
}
Before we investigate svc()
, we need to take a look at the close()
4 method. When shut down via its close()
method, a stream will cause the close()
method of each task to be invoked with a flags
value of 1:
virtual int close (u_long flags)
{
int rval = 0;
if (flags == 1)
{
ACE_Message_Block *hangup = new ACE_Message_Block ();
hangup->msg_type (ACE_Message_Block::MB_HANGUP);
if (this->putq (hangup) == -1)
{
hangup->release ();
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("Task::close() putq")),
-1);
}
rval = this->wait ();
}
return rval;
}
When closed by the stream, the task needs to take steps to ensure that all its threads are shut down cleanly. We do this by creating a message block of type MB_HANGUP
. Our svc()
method will look for this and close down cleanly when it arrives. After enqueuing the hang-up request, close()
waits for all threads of the task to exit before returning. The combination of the hang-up message and wait()
ensures that the stream will not shut down before all its tasks have had a chance to do so.
As with any other task, svc()
is the workhorse of BasicTask
. Here, we get the message, ask for our derivatives to do whatever work is necessary, and then send the message on to the next task in the stream:
virtual int svc (void)
{
for (ACE_Message_Block *message = 0; ; )
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("BasicTask::svc() - ")
ACE_TEXT ("waiting for work
" )));
if (this->getq (message) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p
"),
ACE_TEXT ("getq")),
-1);
The svc()
method consists of an infinite loop of the actions described earlier. The first action is to get a message from the queue. We use the simple form of getq()
, which will block until data becomes available.
if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
{
if (this->putq (message) == -1)
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("Task::svc() putq")));
message->release ();
}
break;
}
With the message in hand, we check whether it is the special hang-up message. If so, we put it back into the queue5 so that peer threads of the task can also shut down cleanly. We then exit the for(;;)
loop and allow the task to end.
If the message is not the special hang-up message, we continue to process it:
Message *recordedMessage =
(Message *)message->rd_ptr ();
if (this->process (recordedMessage) == -1)
{
message->release ();
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("process")),
-1);
}
With the message block in hand and a determination that the stream isn't being closed, we extract the read pointer from the message block and cast it into a Message
pointer. Thus, we get back to the data that was originally put into the stream by the RecordingStream
's record()
method. The virtual process()
method is now invoked to allow derivatives to do work on the message as required by the design specification:
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("BasicTask::svc() - ")
ACE_TEXT ("Continue to next stage
" )));
if (this->next_step (message) < 0)
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("put_next failed")));
message->release ();
break;
}
If all went well with process()
, we attempt to send the message to the next module of the stream. Remember that in our discussion of RecordingStream, we mentioned the issue of the default downstream tail task and the need to create a replacement that would not treat input data as an error. The behavior of next_step()
in BasicTask
is to simply invoke put_next()
:
protected:
virtual int next_step (ACE_Message_Block *message_block)
{
return this->put_next (message_block);
}
This will put the message block onto the next task's message queue for processing. Our custom EndTask
will override this method to do nothing:
class EndTask : public BasicTask
{
protected:
virtual int process (Message *)
{
ACE_TRACE (ACE_TEXT ("EndTask::process()"));
return 0;
}
virtual int next_step (ACE_Message_Block *mb)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("EndTask::next_step() - ")
ACE_TEXT ("end of the line.
")));
mb->release ();
return 0;
}
};
The nice thing about creating the custom EndTask
is that we don't have to add any special code to any other task—or even the BasicTask base class—to handle the end-of-stream condition. This approach to special conditions is much more powerful and flexible than a host of if
statements!
Now that we've seen how AnswerIncomingCall
uses the BasicTask
and how BasicTask
itself is implemented, we can move quickly through the rest of the tasks in the stream.
Like AnswerIncomingCall
, GetCallerId
delegates its work to the RecordingDevice
on which we expect to record the Message
:
class GetCallerId : public BasicTask
{
protected:
virtual int process (Message *message)
{
ACE_TRACE (ACE_TEXT ("GetCallerId::process()"));
CallerId *id;
id = message->recorder ()->retrieve_callerId ();
if (!id)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("GetCallerId")),
-1);
message->caller_id (id);
return 0;
}
};
The retrieve_callerId()
method returns an opaque CallerId
object. The CallerId
object contains a reference to the message's originator. This reference could be a phone number, e-mail address, or even IP address, depending on the physical device that is taking the message for us. We store the CallerId
in the Message
for use later when we write the message's metadata.
In this task object, we retrieve an ACE_FILE_Addr
pointing to an outgoing message appropriate to the recording device: an MP3 to be played through a voice modem, a text file to be sent over a socket, and so on. The recorder's play_message()
is then given the file for playing. If the recording device is smart enough, it could even convert a text message to audio data:
class PlayOutgoingMessage : public BasicTask
{
protected:
virtual int process (Message *message)
{
ACE_TRACE (ACE_TEXT ("PlayOutgoingMessage::process()"));
ACE_FILE_Addr outgoing_message =
this->get_outgoing_message (message);
int pmrv =
message->recorder ()->play_message (outgoing_message);
if (pmrv < 0)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("PlayOutgoingMessage")),
-1);
return 0;
}
ACE_FILE_Addr get_outgoing_message (Message *message)
{
// ...
}
};
Our survey of trivial objects continues with RecordIncomingMessage
. The recorder is now asked to capture the incoming message and record it to a queue/spool location where it can be processed later in the stream. The location of the message and its type are remembered by the Message
so that later modules will be able to quickly locate the recorded data:
class RecordIncomingMessage : public BasicTask
{
protected:
virtual int process (Message *message)
{
ACE_TRACE (ACE_TEXT ("RecordIncomingMessage::process()"));
ACE_FILE_Addr incoming_message =
this->get_incoming_message_queue ();
MessageType *type =
message->recorder ()->record_message (incoming_message);
if (!type)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("RecordIncomingMessage")),
-1);
message->incoming_message (incoming_message, type);
return 0;
}
ACE_FILE_Addr get_incoming_message_queue (void)
{
// ...
}
};
After a message has been collected, we release the physical device. Remember that main()
is operating in a different thread from the stream tasks. Thus, a new message can come into our application while we're finalizing the processing of the current one. In fact, the recording device could represent many physical channels, and we could implement a system that has many simultaneous “current” calls. In such a system, our BasicTask might instantiate each task into many threads instead of only one:
class ReleaseDevice : public BasicTask
{
protected:
virtual int process (Message *message)
{
ACE_TRACE (ACE_TEXT ("ReleaseDevice::process()"));
message->recorder ()->release ();
return 0;
}
};
To keep life easy for other applications using recorded data, it is preferable to encode the three message types—text, audio, video—into a standard format. We might, for instance, require that all audio be encoded into “radio-quality” MP3:
class EncodeMessage : public BasicTask
{
protected:
virtual int process (Message *message)
{
ACE_TRACE (ACE_TEXT ("ReleaseDevice::process()"));
ACE_FILE_Addr &incoming = message->addr ();
ACE_FILE_Addr addr = this->get_message_destination (message);
if (message->is_text ())
Util::convert_to_unicode (incoming, addr);
else if (message->is_audio ())
Util::convert_to_mp3 (incoming, addr);
else if (message->is_video ())
Util::convert_to_mpeg (incoming, addr);
message->addr (addr);
return 0;
}
ACE_FILE_Addr get_message_destination (Message *message)
{
// ...
}
};
The get_message_destination()
method determines the final location of the encoded message. The Util
object methods take care of encoding the message appropriately and placing the result into the final path. This final path is then added to the Message
in case remaining tasks need to know what it is.
SaveMetaData
is our most complex task yet consists of only a handful of lines. By now, you should have an appreciation of how the Streams framework, along with a little housekeeping, can eliminate all the tedious coding and allow you to focus on your application logic!
The message's metadata will describe the message for other applications. This information includes the path to the message, the type of message—text, audio, or video—and other interesting bits. We've chosen to create a simple XML (Extensible Markup Language) file to contain the metadata:
class SaveMetaData : public BasicTask
{
protected:
virtual int process (Message *message)
{
ACE_TRACE (ACE_TEXT ("SaveMetaData::process()"));
ACE_CString path (message->addr ().get_path_name ());
path += ".xml";
ACE_FILE_Connector connector;
ACE_FILE_IO file;
ACE_FILE_Addr addr (path.c_str ());
if (connector.connect (file, addr) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("create meta-data file")),
0);
file.truncate (0);
this->write (file, "<Message>
");
// ...
this->write (file, "</Message>
");
file.close ();
return 0;
}
private:
int write (ACE_FILE_IO &file, const char *str)
{
return file.send (str, ACE_OS::strlen (str));
}
};
Our final task is to notify someone that a new message has arrived. This task may be as simple as log file entry or something more complex, such as sending the message as an attachment in an e-mail to an interested party:
class NotifySomeone : public BasicTask
{
protected:
virtual int process (Message *message)
{
ACE_TRACE (ACE_TEXT ("NotifySomeone::process()"));
// Format an email to tell someone about the
// newly received message.
// ...
// Display message information in the logfile
ACE_DEBUG ((LM_INFO,
ACE_TEXT ("New message from %s ")
ACE_TEXT ("received and stored at %s
"),
message->caller_id ()->string (),
message->addr ().get_path_name ()));
return 0;
}
};
We have seen that creating and using the ACE_Stream
framework is very easy to do. The bulk of our sample code, in fact, had little or nothing to do with the framework itself. If we had included the answering system objects, we would find that the stream interaction code was less than 10 percent of the total. This is as it should be. A good framework should do as much as possible and allow you to focus on your application.
In this section, we use a bidirectional stream to implement a command stream. The general idea is that each module on the stream will implement one command supported by a RecordingDevice
. The RecordingDevice
will configure a Command
object and place it on the stream; the first module capable of processing the Command
will do so and return the results up the stream for the RecordingDevice
to consume. Figure 18.2 shows the logical structure of the Command Stream.
We will work this example from the inside out so that we can focus on the details of the CommandStream
itself. The following subsections describe the stream, its tasks, and how the RecordingDevice
derivative uses the stream.
For purposes of this example, we have created a RecordingDevice
that will record a message delivered on a socket. Each task pair of our command stream will implement a RecordingDevice
command by interacting with the socket in an appropriate manner.
We begin with the definition of our CommandStream
class:
class CommandStream : public ACE_Stream<ACE_MT_SYNCH>
{
public:
typedef ACE_Stream<ACE_MT_SYNCH> inherited;
CommandStream (ACE_SOCK_Stream *peer)
: inherited (), peer_(peer) { }
virtual int open (void *arg,
ACE_Module<ACE_MT_SYNCH> *head = 0,
ACE_Module<ACE_MT_SYNCH> *tail = 0);
Command *execute (Command *command);
private:
CommandStream () { }
ACE_SOCK_Stream *peer_;
};
We expect clients of to provide a socket when instantiating CommandStream
. After opening the stream, clients use the execute()
method to request command execution.
The open()
method is where the stream's modules are created and pushed onto the stream. The method begins by invoking the superclass's open()
method so that we don't have to duplicate that functionality:
int CommandStream::open (void *arg,
ACE_Module<ACE_MT_SYNCH> *head,
ACE_Module<ACE_MT_SYNCH> *tail)
{
ACE_TRACE (ACE_TEXT ("CommandStream::open(peer)"));
if (this->inherited::open (arg, head, tail) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p
"),
ACE_TEXT ("Failed to open superclass")),
-1);
In this example, we are not providing custom head/tail modules. If no task is registered to handle a command, the task will reach the default tail module and cause an error. A more robust implementation would replace the default tail module with one that would return an error back upstream, in much the same way that results will be returned upstream.
We now create a task pair for each RecordingDevice
command we intend to support:
CommandModule *answerCallModule;
ACE_NEW_RETURN (answerCallModule,
AnswerCallModule (this->peer_),
-1);
CommandModule *retrieveCallerIdModule;
ACE_NEW_RETURN (retrieveCallerIdModule,
RetrieveCallerIdModule (this->peer_),
-1);
CommandModule *playMessageModule;
ACE_NEW_RETURN (playMessageModule,
PlayMessageModule (this->peer_),
-1);
CommandModule *recordMessageModule;
ACE_NEW_RETURN (recordMessageModule,
RecordMessageModule (this->peer_),
-1);
At this point, we don't care about the specific tasks. Each CommandModule is a derivative of ACE_Module
and knows what tasks need to be created to process the command it represents. This approach helps to decouple the module and the stream. As each module is constructed, we provide it with a copy of the pointer to the socket. We'll see later that the module's tasks are then able to fetch this pointer when they need to interact with the socket.
With the stream ready and the modules created, we can now push each one onto the stream. Because this stream doesn’t represent an ordered set of steps, we can push them in any order. Because a Command
must flow down the stream until it encounters a task pair to process it, it is wise to have the module responsible for the most-used command be at the beginning of the stream. Our example is pretty much a one-shot sequence of commands, though, so the order doesn’t matter:
if (this->push (answerCallModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT (answerCallModule->name())),
-1);
if (this->push (retrieveCallerIdModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT (retrieveCallerIdModule->name())),
-1);
if (this->push (playMessageModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT (playMessageModule->name())),
-1);
if (this->push (recordMessageModule) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Failed to push %p
"),
ACE_TEXT (recordMessageModule->name())),
-1);
The final CommandStream
method is execute()
. A client of the stream will construct a Command
instance and provide it to execute()
for processing. The execute()
method will send the command downstream and wait for a result to be returned:
Command *CommandStream::execute (Command *command)
{
ACE_Message_Block *mb;
ACE_NEW_RETURN (mb, ACE_Message_Block (command), 0);
if (this->put (mb) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("Fail on put command %d: %p
"),
command->command_,
ACE_TEXT ("")),
0);
this->get (mb);
command = (Command *)mb->data_block ();
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Command (%d) returns (%d)
"),
command->command_,
command->numeric_result_));
return command;
}
The Command
object is a derivative of ACE_Data_Block
. This allows us to provide it directly to the ACE_Message_Block
constructor. It also allows us to take advantage of the fact that the ACE Streams framework—ACE_Message_Block
, in particular—will free the data block's memory at the appropriate time so that we don't have any memory leaks.
Once the message block is configured, we start it down the stream with the put()
method. We immediately invoke get()
to wait for the return data to be given to us. The return data is then cast back to a Command
instance and returned to the caller. The TextListener
implementation of RecordingDevice
uses execute()
to implement each RecordingDevice
command. But we first need to look at the objects that do the work.
In order for all this to work cleanly, we need a bit of support structure. First and foremost is our Command
object, the interface between the clients of the CommandStream
, such as TextListener
, and the tasks that provide the implementation:
class Command : public ACE_Data_Block
{
public:
// Result Values
enum {
RESULT_PASS = 1,
RESULT_SUCCESS = 0,
RESULT_FAILURE = -1
};
// Commands
enum {
CMD_UNKNOWN = -1,
CMD_ANSWER_CALL = 10,
CMD_RETRIEVE_CALLER_ID,
CMD_PLAY_MESSAGE,
CMD_RECORD_MESSAGE
} commands;
int flags_;
int command_;
void *extra_data_;
int numeric_result_;
ACE_CString result_;
};
The Command
class extends ACE_Data_Block
so that we can take advantage of the autodestruction provided by ACE_Message_Block
. Recall that CommandStream::execute()
simply instantiates an ACE_Message_Block
with the provided Command
. The fact that Command
is an ACE_Data_Block
makes this trivial. In a more robust application, we would create further derivatives of Command
instead of using the generic public member variables.
CommandModule
is the base class for all the modules our CommandStream
will be using. We got a hint of this in CommandStream::open()
. The CommandModule
is nothing more than an adapter around its ACE_Module
base class so that we can easily provide the socket to the module and easily retrieve the socket. All necessary casting is handled internally by CommandModule
so that its clients—the command implementation tasks—can remain cast free:
class CommandModule : public ACE_Module<ACE_MT_SYNCH>
{
public:
typedef ACE_Module<ACE_MT_SYNCH> inherited;
typedef ACE_Task<ACE_MT_SYNCH> Task;
CommandModule (const ACE_TCHAR *module_name,
CommandTask *writer,
CommandTask *reader,
ACE_SOCK_Stream *peer);
ACE_SOCK_Stream &peer (void);
};
The constructor has essentially the same signature as that of ACE_Module
but with the more specific data types appropriate to our application:
CommandModule::CommandModule (const ACE_TCHAR *module_name,
CommandTask *writer,
CommandTask *reader,
ACE_SOCK_Stream *peer)
: inherited(module_name, writer, reader, peer)
{ }
The peer()
method makes use of the arg()
method of ACE_Module
to retrieve its optional data. The arg()
result is then cast into an ACE_SOCK_Stream
as expected by the clients of CommandModule
:
ACE_SOCK_Stream &CommandModule::peer (void)
{
ACE_SOCK_Stream *peer = (ACE_SOCK_Stream *)this->arg ();
return *peer;
}
The workhorse of our CommandStream
framework is the CommandTask
object, which extends ACE_Task
and is the base class for all the tasks that implement each command. CommandTask
provides a svc()
method that will
• Retrieve Command
requests from its message queue
• Determine whether the task can process the Command
• Decide where to pass the Command
next
class CommandTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
typedef ACE_Task<ACE_MT_SYNCH> inherited;
virtual ~CommandTask () { }
virtual int open (void * = 0 );
int put (ACE_Message_Block *message,
ACE_Time_Value *timeout);
virtual int svc (void);
virtual int close (u_long flags);
protected:
CommandTask (int command);
virtual int process (Command *message);
int command_;
};
Before we look at svc()
in detail, let's see what the other methods are doing for us. The constructor simply initializes the ACE_Task
base class and sets the command_
attribute. This will be provided by a derivative and should be one of the enumerated values from Command
. The svc()
method will compare the command_
attribute of an incoming Command
to this value to determine whether a CommandTask
derivative instance can process the requested command:
CommandTask::CommandTask (int command)
: inherited (), command_(command)
{ }
The open()
method is as we've seen before. It simply creates a new thread in which the task will execute. For this example, we need only one thread per command:
int CommandTask::open (void *)
{
return this->activate ();
}
The put()
method is similarly familiar:
int CommandTask::put (ACE_Message_Block *message,
ACE_Time_Value *timeout)
{
return this->putq (message, timeout);
}
The close()
method is yet another boilerplate from our previous example:
int CommandTask::close (u_long flags)
{
int rval = 0;
if (flags == 1)
{
ACE_Message_Block *hangup = new ACE_Message_Block;
hangup->msg_type (ACE_Message_Block::MB_HANGUP);
if (this->putq (hangup->duplicate ()) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("Task::close() putq")),
-1);
}
hangup->release ();
rval = this->wait ();
}
return rval;
}
Next comes our virtual process()
method. This base class implementation returns a failure code. The CommandTask
derivatives are expected to override this method to implement the command they represent. Thus, process()
will return RESULT_FAILURE
if processing fails, RESULT_SUCCESS
if it succeeds, or RESULT_PASS
if it chooses not to process the command. On failure or success, the Command
will be returned upstream to where the command stream's execute()
method is blocking on getq()
. On RESULT_PASS
, the Command
instance will continue downstream. Any return from an upstream task will allow the Command
to continue the upstream journey:
int CommandTask::process (Command *)
{
ACE_TRACE (ACE_TEXT ("CommandTask::process()"));
return RESULT_FAILURE;
}
With the trivia behind us, we can now take a look at svc()
. We begin with the usual business of taking a message block off the task's queue and checking for shutdown:
int CommandTask::svc (void)
{
ACE_Message_Block *message;
for (;;)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("CommandTask::svc() - ")
ACE_TEXT ("%s waiting for work
"),
this->module ()->name ()));
if (this->getq (message) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("getq")),
-1);
if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
{
if (this->putq (message->duplicate ()) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("Task::svc() putq")),
-1);
}
message->release ();
break;
}
Now that we have a valid message block, we extract its data block. We know the data block is a Command
instance, so we cast that back. If the command we're being asked to execute is not “ours,” the message block is sent on to the next module in the stream. As this is a bidirectional stream, the put_next()
could be moving data up- or downstream, but we don't care at this stage:
Command *command = (Command *)message->data_block ();
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("CommandTask::svc() - ")
ACE_TEXT ("%s got work request %d
"),
ACE_TEXT (this->module ()->name ()),
command->command_));
if (command->command_ != this->command_)
{
this->put_next (message->duplicate ());
}
If the Command
is our responsibility, we need to invoke process()
on it. Or, more specifically, our derivative needs to process it. We help by setting the numeric_result_
attribute to –1 if process()
failed. This allows our derivative's process()
to simply return Command::RESULT_FAILURE,
, yet the command stream's client can inspect numeric_result_
:
else
{
int result = this->process (command);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("CommandTask::svc() - ")
ACE_TEXT ("%s work request %d result is %d
"),
ACE_TEXT (this->module ()->name ()),
command->command_,
result));
if (result == Command::RESULT_FAILURE)
{
command->numeric_result_ = -1;
}
If the Command
is intended for this task, the task's process()
method can still decide to let someone else handle the processing. Perhaps a scenario will require two or more tasks to process the same Command
. In any case, a RESULT_PASS
return value will let the message continue along the stream:
else if (result == Command::RESULT_PASS)
{
this->put_next (message->duplicate ());
}
Any other return value must be success, and we need to decide what should be done with the Command
. If the current task is on the downstream side of the CommandStream (is_writer()
), we want to turn around the Command
and send it back to the stream head. This is simply done by putting the message block on our sibling task's message queue:
else // result == Command::RESULT_SUCCESS
{
if (this->is_writer ())
{
this->sibling ()->putq
(message->duplicate ());
}
On the other hand, if the task is on the upstream side, we want the Command
to keep flowing upstream:
else // this->is_reader ()
{
this->put_next (message->duplicate ());
}
That completes the section where this task is processing a command. All that is left now is to release the message block taken off the message queue and wrap up the method:
message->release ();
} // for (;;)
return 0;
}
Now that we've seen the CommandModule
and CommandTask
base classes, we can look at the specific implementations instantiated by CommandStream::open()
. In the life cycle of recording a call, the first thing we must do is answer the call.
To implement the Answer Call function, we've created three objects. AnswerCallModule
is a CommandModule
(ACE_Module
) responsible for creating the upstream and downstream tasks. The only method of AnswerCallModule
is the constructor, which simply provides the necessary information to the base class instance. The peer
parameter is provided as “extra data” to the base class so that the peer()
method can use the arg()
method to return the socket to either of the tasks:
AnswerCallModule::AnswerCallModule (ACE_SOCK_Stream *peer)
: CommandModule ("AnswerCall Module",
new AnswerCallDownstreamTask (),
new AnswerCallUpstreamTask (),
peer)
{ }
The two tasks are responsible for handling the CMD_ANSWER_CALL
command; therefore, they provide this value to the CommandTask
constructor:
AnswerCallDownstreamTask::AnswerCallDownstreamTask (void)
: CommandTask(Command::CMD_ANSWER_CALL)
{ }
AnswerCallUpstreamTask::AnswerCallUpstreamTask (void)
: CommandTask(Command::CMD_ANSWER_CALL)
{ }
All that is left now is to implement the process()
method that will answer the incoming connection request. The decision to put this on the downstream or upstream side is rather arbitrary and in the end doesn't matter. We've taken the approach that any “active” action will go on the downstream side and any “passive,” or “receiving,” action on the upstream side. Thus, we implement the connection acceptance on the downstream task:
int AnswerCallDownstreamTask::process (Command *command)
{
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (downstream)
")));
TextListenerAcceptor *acceptor =
(TextListenerAcceptor *)command->extra_data_;
CommandModule *module =
(CommandModule*)this->module ();
command->numeric_result_ =
acceptor->accept (module->peer ());
acceptor->release ();
return Command::RESULT_SUCCESS;
}
A few things are worth noting here. First, the Command
instance's extra data is expected to be a TextListenerAcceptor
instance. This sort of implied API could be avoided by creating derivatives of Command
for each command verb.
Next of interest is the module()
method. Any ACE_Task
has access to this method for retrieving the ACE_Module
in which the task is contained, if any. Because our module is a CommandModule
, we cast the module()
return. On our CommandModule
, we can then invoke the peer()
method to get a reference to the socket.
Finally, we return Command::RESULT_SUCCESS
to tell CommandTask::svc()
that we've processed the command and that the data is ready to be sent upstream to the client.
CommandTask::svc()
will then invoke putq()
on the upstream task whose process()
method has nothing important to do:
int AnswerCallUpstreamTask::process (Command *)
{
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (upstream)
")));
return Command::RESULT_SUCCESS;
}
The action of retrieving caller ID from a socket consists of gathering up the IP of the remote peer. Like the AnswerCallModule
, the RetrieveCallerIdModule
consists of nothing more than a constructor:
RetrieveCallerIdModule::RetrieveCallerIdModule
(ACE_SOCK_Stream *peer)
: CommandModule ("RetrieveCallerId Module",
new RetrieveCallerIdDownstreamTask (),
new RetrieveCallerIdUpstreamTask (),
peer)
{ }
We consider this to be a read operation, so we've implemented it on the upstream side of the stream:
RetrieveCallerIdUpstreamTask::RetrieveCallerIdUpstreamTask
(void)
: CommandTask(Command::CMD_RETRIEVE_CALLER_ID)
{ }
int RetrieveCallerIdUpstreamTask::process (Command *command)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Returning Caller ID data
")));
ACE_INET_Addr remote_addr;
CommandModule *module =
(CommandModule*)this->module ();
module->peer ().get_remote_addr (remote_addr);
ACE_TCHAR remote_addr_str[256];
remote_addr.addr_to_string (remote_addr_str, 256);
command->result_ = ACE_CString (remote_addr_str);
return Command::RESULT_SUCCESS;
}
There should be nothing surprising here. We again use the module()
method to get access to our CommandModule
and its peer()
. The peer's address is returned in the result_
attribute of the Command
for our client's consumption.
Our downstream object is expectedly simple:
RetrieveCallerIdDownstreamTask::RetrieveCallerIdDownstreamTask
(void)
: CommandTask(Command::CMD_RETRIEVE_CALLER_ID)
{ }
int RetrieveCallerIdDownstreamTask::process (Command *)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Retrieving Caller ID data
")));
return Command::RESULT_SUCCESS;
}
As you can see, our CommandModule and CommandTask derivatives are well insulated from the mechanics of the ACE_Stream
framework. As with our uni-directional stream example, we have gone to some effort to ensure that the application programmer can focus on the task at hand—no pun intended—rather than worry about the details of the underlying framework.
Play Message and Record Message each require an appropriate module constructor. Play Message is then implemented on the downstream side:
int PlayMessageDownstreamTask::process (Command *command)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Play Outgoing Message
")));
ACE_FILE_Connector connector;
ACE_FILE_IO file;
ACE_FILE_Addr *addr =
(ACE_FILE_Addr *)command->extra_data_;
if (connector.connect (file, *addr) == -1)
{
command->numeric_result_ = -1;
}
else
{
command->numeric_result_ = 0;
CommandModule *module =
(CommandModule*)this->module ();
char rwbuf[512];
int rwbytes;
while ((rwbytes = file.recv (rwbuf, 512)) > 0)
{
module->peer ().send_n (rwbuf, rwbytes);
}
}
return Command::RESULT_SUCCESS;
}
RecordMessage
is implemented on the upstream side:
int RecordMessageUpstreamTask::process (Command *command)
{
// Collect whatever the peer sends and write into the
// specified file.
ACE_FILE_Connector connector;
ACE_FILE_IO file;
ACE_FILE_Addr *addr =
(ACE_FILE_Addr *)command->extra_data_;
if (connector.connect (file, *addr) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p
"),
ACE_TEXT ("create file")),
Command::RESULT_FAILURE);
file.truncate (0);
CommandModule *module =
(CommandModule*)this->module ();
int total_bytes = 0;
char rwbuf[512];
int rwbytes;
while ((rwbytes = module->peer ().recv (rwbuf, 512)) > 0)
{
total_bytes += file.send_n (rwbuf, rwbytes);
}
file.close ();
ACE_DEBUG ((LM_INFO,
ACE_TEXT ("RecordMessageUpstreamTask ")
ACE_TEXT ("- recorded %d byte message
"),
total_bytes));
return Command::RESULT_SUCCESS;
}
All our component parts are now in place. We have an opaque CommandStream
into which one can place a Command
instance and retrieve a response. The CommandStream
is built from a list of CommandModule
derivatives, each of which contains a pair of CommandTask
derivatives. The program logic is implemented in the CommandTask
derivatives and, for the most part, is immune to the details of the ACE_Stream
framework.
Let's now look at how the TextListener
implementation of RecordingDevice
uses the CommandStream
. Because we can concurrently accept connections on a socket and process established connections, we have two RecordingDevice
derivatives to implement our socket recorder. The first, TextListenerAcceptor
, implements only the wait_for_activity()
method of RecordingDevice
. That has nothing to do with the stream interaction, so we won't go into the details. In short, TextListenerAcceptor::wait_for_activity()
will wait for a connection request on the socket and return a new TextListener
instance when that happens.
TextListener
implements the other RecordingDevice
interface, using the CommandStream
. We begin with the constructor as invoked by the acceptor:
TextListener::TextListener (TextListenerAcceptor *acceptor)
: acceptor_(acceptor)
{
ACE_TRACE (ACE_TEXT ("TextListener ctor"));
ACE_NEW (this->command_stream_, CommandStream (&(this->peer_)));
this->command_stream_->open (0);
}
The TextListenerAcceptor
doesn't even accept the incoming socket connection. It shouldn't; that's a job for the Answer Call task, as shown earlier. Therefore, the acceptor's wait_for_activity()
method provides a pointer to the acceptor object when the TextListener
is created. This is held in a member attribute until ready to be used in the answer_call()
method.
We've chosen not to implement an open()
method for the TextListenerAcceptor
to invoke. Thus, we create and initialize the CommandStream
directly in the constructor. The peer_
attribute is not yet connected, but because it is instantiated, we can safely provide a pointer to it in the command stream at this stage. In fact, we must provide it, API aside, so that the answer_call
task can perform the connection:
int TextListener::answer_call (void)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TextListener::answer_call()
")));
Command *c = new Command ();
c->command_ = Command::CMD_ANSWER_CALL;
c->extra_data_ = this->acceptor_;
c = this->command_stream_->execute (c);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TextListener::answer_call() ")
ACE_TEXT ("result is %d
"),
c->numeric_result_));
return c->numeric_result_;
}
To implement the answer_call()
method, the TextListener
first creates a Command
instance with the command_
attribute set to Command::CMD_ANSWER_CALL.
. This tells the command stream what needs to be done. We also save the TextListenerAcceptor
instance (acceptor_
) as extra data on the Command
. This satisfies the implied API of the AnswerCallDownstreamTask
so that it can establish the connection.
The newly created Command
is then given to the stream for execution. As written, the execute()
method will block until the command has been completed. For simplicity, execute()
will return a Command
instance identical to the one it was given,6 plus the return values.
The remainder of the TextListener
methods follow this general pattern. They are presented in life-cycle order.
CallerId *TextListener::retrieve_callerId (void)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TextListener::retrieve_callerId()
")));
Command *c = new Command ();
c->command_ = Command::CMD_RETRIEVE_CALLER_ID;
c = this->command_stream_->execute (c);
CallerId *caller_id = new CallerId (c->result_);
return caller_id;
}
int TextListener::play_message (ACE_FILE_Addr &addr)
{
MessageType *type = Util::identify_message (addr);
if (type->is_text ())
{
Command *c = new Command ();
c->command_ = Command::CMD_PLAY_MESSAGE;
c->extra_data_ = &addr;
c = this->command_stream_->execute (c);
return c->numeric_result_;
}
ACE_FILE_Addr temp ("/tmp/outgoing_message.text");
ACE_FILE_IO *file;
if (type->is_audio ())
file = Util::audio_to_text (addr, temp);
else if (type->is_video ())
file = Util::video_to_text (addr, temp);
else
ACE_ERROR_RETURN
((LM_ERROR, ACE_TEXT ("Invalid message type %d
"),
type->get_codec ()), -1);
int rval = this->play_message (temp);
file->remove ();
return rval;
}
MessageType *TextListener::record_message (ACE_FILE_Addr &addr)
{
Command *c = new Command ();
c->command_ = Command::CMD_RECORD_MESSAGE;
c->extra_data_ = &addr;
c = this->command_stream_->execute (c);
if (c->numeric_result_ == -1)
return 0;
return new MessageType (MessageType::RAWTEXT, addr);
}
The release()
method is invoked by our RecordingStream
framework when the recording process is complete. Because a new TextListener
is instantiated for each recording, we take this opportunity to free that memory:
void TextListener::release (void)
{
delete this;
}
In this chapter, we have investigated the ACE Streams framework. An ACE_Stream
is nothing more than a doubly linked list of ACE_Module
instances, each of which contains a pair of ACE_Task
derivatives. Streams are useful for many things, only two of which we've investigated here. The following list enumerates how one would use a stream.
ACE_Task
derivatives that implement your application logic.Keep in mind the following when architecting your stream.
• Each task in the stream can exist in one or more threads. Use multiple threads when your application can take advantage of parallel processing.
• The default tail tasks will return an error if any data reaches them. If your tasks don't entirely consume the data, you should at least provide a replacement downstream tail task.
• You can provide your task's open()
method with arbitrary data by passing it as the fourth parameter (args
) of module's constructor or open()
method.
• A task's open()
method is invoked as a result of pushing its module onto the stream, not as a result of invoking open()
on the module.
• ACE_Message_Block
instances are reference counted and will not leak memory if you correctly use their duplicate()
and release()
methods.
One last thing: If you're wondering what happened to the protocol stack we mentioned, rest assured that we haven't forgotten it. The ACE source is distributed with online tutorials. Please check out tutorial 15 in the ACE_wrappers/docs/tutorials
directory for a protocol stream implementation example.
3.144.104.29