Chapter 18. ACE Streams Framework

The ACE Streams framework implements the Pipes and Filters pattern described in [1]. The framework is an excellent way to model processes consisting of a set of ordered steps. Each step, or filter in Pipes and Filters terminology, in the process is implemented as an ACE_Task derivative. As each step is completed, the data is handed off to the next step for continuation, using the ACE_Task objects' message queues, or pipes in Pipes and Filters terminology. Steps can be multithreaded in order to increase throughput if the data lends itself to parallel processing. In this chapter, we explore the following classes: ACE_Stream, ACE_Module, ACE_Task, and ACE_Message_Block.

18.1 Overview

Another commonly known implementation of the Pipes and Filters pattern is the UNIX System V STREAMS framework. If you're familiar with System V STREAMS, you will recognize the concepts. The ACE Streams framework allows the flexible, dynamically configurable assembly of a set of modules into a stream through which information travels. Each module has an opportunity to manipulate the data in the stream and can modify it, remove it, or add to it before passing it to the next module in the stream. Data moves bidirectionally in the stream, and each module in the stream has a reader and a writer task—one for each data direction.

The first thing to do when using ACE Streams is to identify the sequence of events you wish to process. The steps should be as discrete as possible, with well-defined outputs and inputs. Where possible, identify and document opportunities for parallel processing within each step.

Once your steps are identified, you can begin implementing each as a derivative of ACE_Task. The svc() method of each will use getq() to get a unit of work and put_next() to pass the completed work to the next step. At this time, identify which tasks are “downstream” and which are “upstream” in nature. Downstream tasks can be thought of as moving “out of” your primary application as they move down the stream. For instance, a stream implementing a protocol stack would use each task to perform a different stage of protocol conversion. The final task would send the converted data, perhaps via TCP/IP, to a remote peer. Similarly, upstream tasks can be thought of as moving data “into” your primary application.

Module instances, or paired downstream and upstream tasks, can be created once the tasks are implemented. In our protocol conversion example, the downstream tasks would encode our data and the upstream tasks would decode it.

The final step is to create the ACE_Stream instance and push the ordered list of modules onto it. You can then use put() to move data onto the stream for processing and obtain the results with get(). See Figure 18.1.

Figure 18.1. Diagram of an ACE_Stream

image

18.2 Using a One-Way Stream

In this section, we look at an answering machine implementation that uses a one-way stream to record and process messages. Each module of the stream performs one function in the set of functions required to implement the system. The functions are

  1. Answer incoming call
  2. Collect Caller Identification data
  3. Play outgoing message
  4. Collect incoming message
  5. Return recording device to the pool
  6. Encode collected message into a normalized form
  7. Save message and metadata into message repository
  8. Send notification of received message

As defined, each step of the process is very specific in what it must do. This may seem a bit like overkill, but in a more complex application, it is an excellent way to divide the work among team members. Each person can focus on the implementation of his or her own step, as long as the interfaces between each step are well defined.

18.2.1 Main Program

We're going to work through this example from the top down; that is, we start with main() and then dig down into successively lower levels:


int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  RecordingDevice *recorder =
    RecordingDeviceFactory::instantiate (argc, argv);

Here, main() begins by using the RecordingDeviceFactory to instantiate a RecordingDevice instance based on command line parameters.1 Our system may have many kinds of recording devices: voice modems, video phones, e-mail receivers, and so on. The command line parameters tell us which kind of device this instance of the application is communicating with.



RecordingStream *recording_stream;
ACE_NEW_RETURN (recording_stream, RecordingStream, -1);

if (recording_stream->open (0) < 0)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("%p "),
                     ACE_TEXT ("RecordingStream->open()")),
                    0);

Next, our example creates and opens an instance of RecordingStream. As we will see, this is a simple derivative of ACE_Stream. If the open() fails, we will print a brief message and exit with an error code.

The final task of main() is to enter an infinite loop, waiting for messages to arrive and recording them when they do:


for (;;)
  {
    ACE_DEBUG ((LM_INFO,
                ACE_TEXT ("Waiting for incoming message ")));
    RecordingDevice *activeRecorder =
      recorder->wait_for_activity ();

    ACE_DEBUG ((LM_INFO,
                ACE_TEXT ("Initiating recording process ")));

    recording_stream->record (activeRecorder);
  }

The RecordingDevice::wait_for_activity() method will block until there is some activity, such as a ring, on the physical device. The RecordingDevice instance is then given to the RecordingStream to process our list of directives.

18.2.2 RecordingStream

As mentioned, the RecordingStream object is a simple derivative of ACE_Stream. We begin with the constructor, which takes care of the details of initializing the base class:


class RecordingStream : public ACE_Stream<ACE_MT_SYNCH>
{
public:
  typedef ACE_Stream<ACE_MT_SYNCH> inherited;
  typedef ACE_Module<ACE_MT_SYNCH> Module;

  RecordingStream () : inherited()
  { }

A stream will always have at least two modules installed: head and tail. The default downstream task of the head module simply passes any data down the stream. The default tail module, however, will treat any received data as an error. Our example can prevent this from happening in two ways.

  1. We can code our last task in the stream to not send data any farther.
  2. We can install a replacement for the task in the tail module.

In order to prevent special conditions, which are generally a sign of a bad design, we will use the second option. Therefore, our stream's open() method will create a Module with an instance of our EndTask object and install that at the tail of the stream if no other module is specified:


virtual int open (void *arg,
                  Module *head = 0, Module *tail = 0)
{
  if (tail == 0)
    ACE_NEW_RETURN (tail,
                    Module ("End Module", new EndTask ()),
                    -1);
  this->inherited::open (arg, head, tail);

Our open() method changes only the default stream tail to suit our needs. Otherwise, the head and tail modules are passed to the inherited open() method. This allows RecordingStream to be configured and reused in other situations.

Our design described eight steps in the process, and we've created eight ACE_Task derivatives to implement those steps. In the next part of open(), we create an instance of each of these eight objects and a Module to contain them:


Module *answerIncomingCallModule;
ACE_NEW_RETURN (answerIncomingCallModule,
                Module ("Answer Incoming Call",
                        new AnswerIncomingCall ()),
                -1);

Module *getCallerIdModule;
ACE_NEW_RETURN (getCallerIdModule,
                Module ("Get Caller ID", new GetCallerId ()),
                -1);


Module *playOGMModule;
ACE_NEW_RETURN (playOGMModule,
                Module ("Play Outgoing Message",
                        new PlayOutgoingMessage ()),
                -1);

Module *recordModule;
ACE_NEW_RETURN (recordModule,
                Module ("Record Incoming Message",
                        new RecordIncomingMessage ()),
                -1);

Module *releaseModule;
ACE_NEW_RETURN (releaseModule,
                Module ("Release Device",
                        new ReleaseDevice ()),
                -1);

Module *conversionModule;
ACE_NEW_RETURN (conversionModule,
                Module ("Encode Message",
                        new EncodeMessage ()),
                -1);

Module *saveMetaDataModule;
ACE_NEW_RETURN (saveMetaDataModule,
                Module ("Save Meta-Data",
                        new SaveMetaData ()),
                -1);

Module *notificationModule;
ACE_NEW_RETURN (notificationModule,
                Module ("Notify Someone",
                        new NotifySomeone ()),
                -1);

The general design of the Stream framework is that an ACE_Stream contains a list of ACE_Module instances and that each ACE_Module contains one or two ACE_Task instances. The tasks can be designated either downstream—invoked as data moves away from the stream's head—or upstream—invoked as data moves toward the stream's head. Our instantiation of getCallerIdModule follows:



Module *getCallerIdModule;
ACE_NEW_RETURN (getCallerIdModule,
                Module ("Get Caller ID", new GetCallerId ()),
                -1);

This creates a Module (ACE_Module<ACE_MT_SYNC>) with the name “Get Caller ID” and an instance of the GetCallerId object, which is a derivative of ACE_Task, as required by the ACE_Module constructor signature. The GetCallerId instance is installed as the module's downstream task. We don't provide an upstream task, because our stream will store the recorded messages to disk and not expect anything to return back upstream to main().

The final part of our RecordingStream's open() method now pushes the modules onto the stream in the correct order:


if (this->push (notificationModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT ("notificationModule")),
                    -1);
if (this->push (saveMetaDataModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT ("saveMetaDataModule")),
                    -1);
if (this->push (conversionModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT ("conversionModule")),
                    -1);
if (this->push (releaseModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT ("releaseModule")),
                    -1);
if (this->push (recordModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT ("recordModule")),
                    -1);
if (this->push (playOGMModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT ("playOGMModule")),
                    -1);

if (this->push (getCallerIdModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT ("getCallerIdModule")),
                    -1);
if (this->push (answerIncomingCallModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p ")
                     ACE_TEXT ("answerIncomingCallModule")),
                    -1);

Pushing the modules onto the stream in the correct order is very important. If your process is order dependent and you push the first-used module first, you'll be in for an unpleasant surprise! The push/pop stream terminology is the same as a stack. So be sure that you remember: first pushed, last used.

The remainder of our RecordingStream is the record() method. Whereas the constructor protected main() from the details of stream creation, record() protects it from direct interaction with the stream API:


int record (RecordingDevice *recorder)
{
  ACE_Message_Block * mb;
  ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(Message)), -1);

  Message *message = (Message *)mb->wr_ptr ();
  mb->wr_ptr (sizeof(Message));

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("RecordingStream::record() - ")
              ACE_TEXT ("message->recorder(recorder) ")));
  message->recorder (recorder);

  int rval = this->put (mb);
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("RecordingStream::record() - ")
              ACE_TEXT ("this->put() returns %d "),
              rval));
  return rval;
}

An ACE_Stream is, ultimately, a fancy implementation of a linked list of ACE_Task objects. Each ACE_Task comes from the manufacturer with a built-in ACE_Message_Queue at no extra charge. That's primarily because the message queue is such an easy way to request work from a task.2

To give our stream some work to do, we create an ACE_Message_Block that will go into the first downstream task's message queue with put(). As our stream will be dealing with recording messages, we create the message block with enough space to contain our Message3 object. We will basically be using the message block's data area to contain a Message object's contents.

Once our message is created, we provide it with the RecordingDevice pointer so that it will be able to ask the physical device to do things during the message processing. There may be many RecordingDevice implementations, each able to talk to a different kind of device. The tasks of the recording stream, however, don't need to know any of these details and need only the base class (RecordingDevice) pointer to get the job done.

Finally, the stream's put() method is used to start the message down the stream by putting it on the first module's downstream task's message queue. Recall that when we opened the stream, we allowed it to use the default head module, which simply passes its input on down the stream.

18.2.3 Tasks

Many of the tasks in our simple example need only delegate their action to the RecordingDevice instance, primarily because our design has required the RecordingDevice implementations to do all the “dirty work” required to talk with the physical device. Our tasks serve only as the “glue” to ensure that things happen in the right order.

AnswerIncomingCall

Before we look at our base class, let's look at the first task in our stream:



class AnswerIncomingCall : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE (ACE_TEXT ("AnswerIncomingCall::process()"));

    if (message->recorder ()->answer_call () < 0)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p "),
                         ACE_TEXT ("AnswerIncomingCall")),
                        -1);
      return 0;
  }
};

The job of AnswerIncomingCall is to simply tell the recording device to “pick up the phone.” Recall that in main(), we were notified that “the phone is ringing” only by the fact that the recording device's wait_for_activity() unblocked. It is up to our stream's first task to request that the recording device answer the incoming call. In a more robust application, our stream may be in use by many recording devices at one time. Therefore, we have chosen to put the recording device instance pointer into the message, in the recording stream's record() method, which is passed to each task of the stream. AnswerIncomingCall uses the recorder() method to retrieve this pointer and then invokes answer_call() on it to tell the physical device to respond to the incoming call.

BasicTask

AnswerIncomingCall and the tasks that follow all possess a process() method in which they implement their required functionality. They also don't bother to move the Message they're given on to the next task in the stream. These things, and others, are handled by the common base class: BasicTask.

Given that our tasks are all derivatives of ACE_Task and that they expect to get their input from a message queue, it is easy to create a base class for all of them. This base class takes care of reading data from the queue, requesting a derivative to process it, and passing it on to the next task in the stream. The base class also takes care of shutting down the tasks cleanly when the containing stream is shut down, either explicitly or implicitly by destruction:



class BasicTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
  typedef ACE_Task<ACE_MT_SYNCH> inherited;

  BasicTask () : inherited()
  { }

  virtual int open (void * = 0)
  {
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("BasicTask::open() starting ")
                ACE_TEXT ("%d threads "),
                this->desired_threads ()));

    return this->activate (THR_NEW_LWP | THR_JOINABLE,
                           this->desired_threads ());
  }

BasicTask extends ACE_Task and takes care of the housework our stream's tasks would have otherwise been bothered with. The virtual open() method activates the object into one or more threads as required by the virtual desired_threads() method. For our simple example, none of the tasks require more than one thread, but we've provided two methods—overriding of open() or desired_threads()—of customizing this if a task needs to do so.

We next provide a simple put() method that will put a message block onto the task's message queue. The put() method is used by the Streams framework to move messages along the stream. The putq() method could have been used instead, but then the stream would be tied more closely to the task implementation. That is, the stream would be assuming that all tasks wish to use their message queue for communication. A put() method could just as well send the message to a file for the svc() method to pick up at a predefined interval:


int put (ACE_Message_Block *message,
         ACE_Time_Value *timeout)
{
  return this->putq (message, timeout);
}

Before we investigate svc(), we need to take a look at the close()4 method. When shut down via its close() method, a stream will cause the close() method of each task to be invoked with a flags value of 1:


virtual int close (u_long flags)
{
  int rval = 0;


  if (flags == 1)
    {
      ACE_Message_Block *hangup = new ACE_Message_Block ();
      hangup->msg_type (ACE_Message_Block::MB_HANGUP);
      if (this->putq (hangup) == -1)
        {
          hangup->release ();
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("%p "),
                             ACE_TEXT ("Task::close() putq")),
                            -1);
        }


      rval = this->wait ();
    }


  return rval;
}

When closed by the stream, the task needs to take steps to ensure that all its threads are shut down cleanly. We do this by creating a message block of type MB_HANGUP. Our svc() method will look for this and close down cleanly when it arrives. After enqueuing the hang-up request, close() waits for all threads of the task to exit before returning. The combination of the hang-up message and wait() ensures that the stream will not shut down before all its tasks have had a chance to do so.

As with any other task, svc() is the workhorse of BasicTask. Here, we get the message, ask for our derivatives to do whatever work is necessary, and then send the message on to the next task in the stream:



virtual int svc (void)
{
  for (ACE_Message_Block *message = 0; ; )
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("BasicTask::svc() - ")
                  ACE_TEXT ("waiting for work " )));

      if (this->getq (message) == -1)
        ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p "),
                           ACE_TEXT ("getq")),
                          -1);

The svc() method consists of an infinite loop of the actions described earlier. The first action is to get a message from the queue. We use the simple form of getq(), which will block until data becomes available.


if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
  {
    if (this->putq (message) == -1)
      {
        ACE_ERROR ((LM_ERROR,
                    ACE_TEXT ("%p "),
                    ACE_TEXT ("Task::svc() putq")));
        message->release ();
      }
    break;
  }

With the message in hand, we check whether it is the special hang-up message. If so, we put it back into the queue5 so that peer threads of the task can also shut down cleanly. We then exit the for(;;) loop and allow the task to end.

If the message is not the special hang-up message, we continue to process it:



Message *recordedMessage =
  (Message *)message->rd_ptr ();

if (this->process (recordedMessage) == -1)
  {
    message->release ();
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p "),
                       ACE_TEXT ("process")),
                      -1);
  }

With the message block in hand and a determination that the stream isn't being closed, we extract the read pointer from the message block and cast it into a Message pointer. Thus, we get back to the data that was originally put into the stream by the RecordingStream's record() method. The virtual process() method is now invoked to allow derivatives to do work on the message as required by the design specification:


ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT ("BasicTask::svc() - ")
            ACE_TEXT ("Continue to next stage " )));


if (this->next_step (message) < 0)
  {
    ACE_ERROR ((LM_ERROR,
                ACE_TEXT ("%p "),
                ACE_TEXT ("put_next failed")));
    message->release ();
    break;
  }

If all went well with process(), we attempt to send the message to the next module of the stream. Remember that in our discussion of RecordingStream, we mentioned the issue of the default downstream tail task and the need to create a replacement that would not treat input data as an error. The behavior of next_step() in BasicTask is to simply invoke put_next():


protected:
  virtual int next_step (ACE_Message_Block *message_block)
  {
    return this->put_next (message_block);
  }

This will put the message block onto the next task's message queue for processing. Our custom EndTask will override this method to do nothing:


class EndTask : public BasicTask
{
protected:
  virtual int process (Message *)
  {
    ACE_TRACE (ACE_TEXT ("EndTask::process()"));
    return 0;
  }

  virtual int next_step (ACE_Message_Block *mb)
  {
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("EndTask::next_step() - ")
                ACE_TEXT ("end of the line. ")));
    mb->release ();
    return 0;
  }
};

The nice thing about creating the custom EndTask is that we don't have to add any special code to any other task—or even the BasicTask base class—to handle the end-of-stream condition. This approach to special conditions is much more powerful and flexible than a host of if statements!

Now that we've seen how AnswerIncomingCall uses the BasicTask and how BasicTask itself is implemented, we can move quickly through the rest of the tasks in the stream.

GetCallerId

Like AnswerIncomingCall, GetCallerId delegates its work to the RecordingDevice on which we expect to record the Message:


class GetCallerId : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE (ACE_TEXT ("GetCallerId::process()"));

    CallerId *id;
    id = message->recorder ()->retrieve_callerId ();
    if (!id)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p "),
                         ACE_TEXT ("GetCallerId")),
                        -1);

    message->caller_id (id);
    return 0;
  }
};

The retrieve_callerId() method returns an opaque CallerId object. The CallerId object contains a reference to the message's originator. This reference could be a phone number, e-mail address, or even IP address, depending on the physical device that is taking the message for us. We store the CallerId in the Message for use later when we write the message's metadata.

PlayOutgoingMessage

In this task object, we retrieve an ACE_FILE_Addr pointing to an outgoing message appropriate to the recording device: an MP3 to be played through a voice modem, a text file to be sent over a socket, and so on. The recorder's play_message() is then given the file for playing. If the recording device is smart enough, it could even convert a text message to audio data:


class PlayOutgoingMessage : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE (ACE_TEXT ("PlayOutgoingMessage::process()"));

    ACE_FILE_Addr outgoing_message =
      this->get_outgoing_message (message);

    int pmrv =
      message->recorder ()->play_message (outgoing_message);
    if (pmrv < 0)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p "),
                         ACE_TEXT ("PlayOutgoingMessage")),
                        -1);
    return 0;
  }
  ACE_FILE_Addr get_outgoing_message (Message *message)
  {
    // ...
  }
};

RecordIncomingMessage

Our survey of trivial objects continues with RecordIncomingMessage. The recorder is now asked to capture the incoming message and record it to a queue/spool location where it can be processed later in the stream. The location of the message and its type are remembered by the Message so that later modules will be able to quickly locate the recorded data:


class RecordIncomingMessage : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE (ACE_TEXT ("RecordIncomingMessage::process()"));

    ACE_FILE_Addr incoming_message =
      this->get_incoming_message_queue ();

    MessageType *type =
      message->recorder ()->record_message (incoming_message);
    if (!type)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p "),
                         ACE_TEXT ("RecordIncomingMessage")),
                        -1);
    message->incoming_message (incoming_message, type);
    return 0;
  }

  ACE_FILE_Addr get_incoming_message_queue (void)
  {
    // ...
  }
};

ReleaseDevice

After a message has been collected, we release the physical device. Remember that main() is operating in a different thread from the stream tasks. Thus, a new message can come into our application while we're finalizing the processing of the current one. In fact, the recording device could represent many physical channels, and we could implement a system that has many simultaneous “current” calls. In such a system, our BasicTask might instantiate each task into many threads instead of only one:


class ReleaseDevice : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE (ACE_TEXT ("ReleaseDevice::process()"));
    message->recorder ()->release ();
    return 0;
  }
};

EncodeMessage

To keep life easy for other applications using recorded data, it is preferable to encode the three message types—text, audio, video—into a standard format. We might, for instance, require that all audio be encoded into “radio-quality” MP3:


class EncodeMessage : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE (ACE_TEXT ("ReleaseDevice::process()"));

    ACE_FILE_Addr &incoming = message->addr ();
    ACE_FILE_Addr addr = this->get_message_destination (message);

    if (message->is_text ())
      Util::convert_to_unicode (incoming, addr);
    else if (message->is_audio ())
      Util::convert_to_mp3 (incoming, addr);
    else if (message->is_video ())
      Util::convert_to_mpeg (incoming, addr);
    message->addr (addr);
    return 0;
  }

  ACE_FILE_Addr get_message_destination (Message *message)
  {
    // ...
  }
};

The get_message_destination() method determines the final location of the encoded message. The Util object methods take care of encoding the message appropriately and placing the result into the final path. This final path is then added to the Message in case remaining tasks need to know what it is.

SaveMetaData

SaveMetaData is our most complex task yet consists of only a handful of lines. By now, you should have an appreciation of how the Streams framework, along with a little housekeeping, can eliminate all the tedious coding and allow you to focus on your application logic!

The message's metadata will describe the message for other applications. This information includes the path to the message, the type of message—text, audio, or video—and other interesting bits. We've chosen to create a simple XML (Extensible Markup Language) file to contain the metadata:


class SaveMetaData : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE (ACE_TEXT ("SaveMetaData::process()"));

    ACE_CString path (message->addr ().get_path_name ());
    path += ".xml";

    ACE_FILE_Connector connector;
    ACE_FILE_IO file;
    ACE_FILE_Addr addr (path.c_str ());
    if (connector.connect (file, addr) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p "),
                         ACE_TEXT ("create meta-data file")),
                        0);
    file.truncate (0);
    this->write (file, "<Message> ");
    // ...
    this->write (file, "</Message> ");
    file.close ();
    return 0;
  }

private:
  int write (ACE_FILE_IO &file, const char *str)
  {
    return file.send (str, ACE_OS::strlen (str));
  }
};

NotifySomeone

Our final task is to notify someone that a new message has arrived. This task may be as simple as log file entry or something more complex, such as sending the message as an attachment in an e-mail to an interested party:


class NotifySomeone : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE (ACE_TEXT ("NotifySomeone::process()"));

    // Format an email to tell someone about the
    // newly received message.
    // ...

    // Display message information in the logfile
    ACE_DEBUG ((LM_INFO,
                ACE_TEXT ("New message from %s ")
                ACE_TEXT ("received and stored at %s "),
                message->caller_id ()->string (),
                message->addr ().get_path_name ()));
    return 0;
  }
};

18.2.4 Remainder

We have seen that creating and using the ACE_Stream framework is very easy to do. The bulk of our sample code, in fact, had little or nothing to do with the framework itself. If we had included the answering system objects, we would find that the stream interaction code was less than 10 percent of the total. This is as it should be. A good framework should do as much as possible and allow you to focus on your application.

18.3 A Bidirectional Stream

In this section, we use a bidirectional stream to implement a command stream. The general idea is that each module on the stream will implement one command supported by a RecordingDevice. The RecordingDevice will configure a Command object and place it on the stream; the first module capable of processing the Command will do so and return the results up the stream for the RecordingDevice to consume. Figure 18.2 shows the logical structure of the Command Stream.

Figure 18.2. Logical structure of the command stream

image

We will work this example from the inside out so that we can focus on the details of the CommandStream itself. The following subsections describe the stream, its tasks, and how the RecordingDevice derivative uses the stream.

18.3.1 CommandStream

For purposes of this example, we have created a RecordingDevice that will record a message delivered on a socket. Each task pair of our command stream will implement a RecordingDevice command by interacting with the socket in an appropriate manner.

We begin with the definition of our CommandStream class:


class CommandStream : public ACE_Stream<ACE_MT_SYNCH>
{
public:
  typedef ACE_Stream<ACE_MT_SYNCH> inherited;

  CommandStream (ACE_SOCK_Stream *peer)
    : inherited (), peer_(peer) { }

  virtual int open (void *arg,
                    ACE_Module<ACE_MT_SYNCH> *head = 0,
                    ACE_Module<ACE_MT_SYNCH> *tail = 0);
  Command *execute (Command *command);


private:
  CommandStream () { }
  ACE_SOCK_Stream *peer_;
};

We expect clients of to provide a socket when instantiating CommandStream. After opening the stream, clients use the execute() method to request command execution.

The open() method is where the stream's modules are created and pushed onto the stream. The method begins by invoking the superclass's open() method so that we don't have to duplicate that functionality:


int CommandStream::open (void *arg,
                         ACE_Module<ACE_MT_SYNCH> *head,
                         ACE_Module<ACE_MT_SYNCH> *tail)
{
  ACE_TRACE (ACE_TEXT ("CommandStream::open(peer)"));

  if (this->inherited::open (arg, head, tail) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p "),
                       ACE_TEXT ("Failed to open superclass")),
                      -1);

In this example, we are not providing custom head/tail modules. If no task is registered to handle a command, the task will reach the default tail module and cause an error. A more robust implementation would replace the default tail module with one that would return an error back upstream, in much the same way that results will be returned upstream.

We now create a task pair for each RecordingDevice command we intend to support:


CommandModule *answerCallModule;
ACE_NEW_RETURN (answerCallModule,
                AnswerCallModule (this->peer_),
                -1);
CommandModule *retrieveCallerIdModule;
ACE_NEW_RETURN (retrieveCallerIdModule,
                RetrieveCallerIdModule (this->peer_),
                -1);
CommandModule *playMessageModule;
ACE_NEW_RETURN (playMessageModule,
                PlayMessageModule (this->peer_),
                -1);
CommandModule *recordMessageModule;
ACE_NEW_RETURN (recordMessageModule,
                RecordMessageModule (this->peer_),
                -1);

At this point, we don't care about the specific tasks. Each CommandModule is a derivative of ACE_Module and knows what tasks need to be created to process the command it represents. This approach helps to decouple the module and the stream. As each module is constructed, we provide it with a copy of the pointer to the socket. We'll see later that the module's tasks are then able to fetch this pointer when they need to interact with the socket.

With the stream ready and the modules created, we can now push each one onto the stream. Because this stream doesn’t represent an ordered set of steps, we can push them in any order. Because a Command must flow down the stream until it encounters a task pair to process it, it is wise to have the module responsible for the most-used command be at the beginning of the stream. Our example is pretty much a one-shot sequence of commands, though, so the order doesn’t matter:


if (this->push (answerCallModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT (answerCallModule->name())),
                    -1);
if (this->push (retrieveCallerIdModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT (retrieveCallerIdModule->name())),
                    -1);
if (this->push (playMessageModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT (playMessageModule->name())),
                    -1);
if (this->push (recordMessageModule) == -1)
  ACE_ERROR_RETURN ((LM_ERROR,
                     ACE_TEXT ("Failed to push %p "),
                     ACE_TEXT (recordMessageModule->name())),
                    -1);

The final CommandStream method is execute(). A client of the stream will construct a Command instance and provide it to execute() for processing. The execute() method will send the command downstream and wait for a result to be returned:


Command *CommandStream::execute (Command *command)
{
  ACE_Message_Block *mb;
  ACE_NEW_RETURN (mb, ACE_Message_Block (command), 0);
  if (this->put (mb) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("Fail on put command %d: %p "),
                       command->command_,
                       ACE_TEXT ("")),
                      0);

  this->get (mb);
  command = (Command *)mb->data_block ();
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("Command (%d) returns (%d) "),
              command->command_,
              command->numeric_result_));

  return command;
}

The Command object is a derivative of ACE_Data_Block. This allows us to provide it directly to the ACE_Message_Block constructor. It also allows us to take advantage of the fact that the ACE Streams framework—ACE_Message_Block, in particular—will free the data block's memory at the appropriate time so that we don't have any memory leaks.

Once the message block is configured, we start it down the stream with the put() method. We immediately invoke get() to wait for the return data to be given to us. The return data is then cast back to a Command instance and returned to the caller. The TextListener implementation of RecordingDevice uses execute() to implement each RecordingDevice command. But we first need to look at the objects that do the work.

18.3.2 Supporting Objects and Base Classes

Command

In order for all this to work cleanly, we need a bit of support structure. First and foremost is our Command object, the interface between the clients of the CommandStream, such as TextListener, and the tasks that provide the implementation:


class Command : public ACE_Data_Block
{
public:
  // Result Values
  enum {
    RESULT_PASS    = 1,
    RESULT_SUCCESS = 0,
    RESULT_FAILURE = -1
  };

  // Commands
  enum {
    CMD_UNKNOWN            = -1,
    CMD_ANSWER_CALL        = 10,
    CMD_RETRIEVE_CALLER_ID,
    CMD_PLAY_MESSAGE,
    CMD_RECORD_MESSAGE
  } commands;

  int flags_;
  int command_;

  void *extra_data_;

  int numeric_result_;
  ACE_CString result_;
};

The Command class extends ACE_Data_Block so that we can take advantage of the autodestruction provided by ACE_Message_Block. Recall that CommandStream::execute() simply instantiates an ACE_Message_Block with the provided Command. The fact that Command is an ACE_Data_Block makes this trivial. In a more robust application, we would create further derivatives of Command instead of using the generic public member variables.

CommandModule

CommandModule is the base class for all the modules our CommandStream will be using. We got a hint of this in CommandStream::open(). The CommandModule is nothing more than an adapter around its ACE_Module base class so that we can easily provide the socket to the module and easily retrieve the socket. All necessary casting is handled internally by CommandModule so that its clients—the command implementation tasks—can remain cast free:


class CommandModule : public ACE_Module<ACE_MT_SYNCH>
{
public:
  typedef ACE_Module<ACE_MT_SYNCH> inherited;
  typedef ACE_Task<ACE_MT_SYNCH> Task;

  CommandModule (const ACE_TCHAR *module_name,
                   CommandTask *writer,
                   CommandTask *reader,
                   ACE_SOCK_Stream *peer);

  ACE_SOCK_Stream &peer (void);
};

The constructor has essentially the same signature as that of ACE_Module but with the more specific data types appropriate to our application:


CommandModule::CommandModule (const ACE_TCHAR *module_name,
                              CommandTask *writer,
                              CommandTask *reader,
                              ACE_SOCK_Stream *peer)
  : inherited(module_name, writer, reader, peer)
{ }

The peer() method makes use of the arg() method of ACE_Module to retrieve its optional data. The arg() result is then cast into an ACE_SOCK_Stream as expected by the clients of CommandModule:


ACE_SOCK_Stream &CommandModule::peer (void)
{
  ACE_SOCK_Stream *peer = (ACE_SOCK_Stream *)this->arg ();
  return *peer;
}

CommandTask

The workhorse of our CommandStream framework is the CommandTask object, which extends ACE_Task and is the base class for all the tasks that implement each command. CommandTask provides a svc() method that will

• Retrieve Command requests from its message queue

• Determine whether the task can process the Command

• Decide where to pass the Command next


class CommandTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
  typedef ACE_Task<ACE_MT_SYNCH> inherited;

  virtual ~CommandTask () { }

  virtual int open (void * = 0 );

  int put (ACE_Message_Block *message,
           ACE_Time_Value *timeout);

  virtual int svc (void);

  virtual int close (u_long flags);

protected:
  CommandTask (int command);

  virtual int process (Command *message);

  int command_;
};

Before we look at svc() in detail, let's see what the other methods are doing for us. The constructor simply initializes the ACE_Task base class and sets the command_ attribute. This will be provided by a derivative and should be one of the enumerated values from Command. The svc() method will compare the command_ attribute of an incoming Command to this value to determine whether a CommandTask derivative instance can process the requested command:


CommandTask::CommandTask (int command)
  : inherited (), command_(command)
{ }

The open() method is as we've seen before. It simply creates a new thread in which the task will execute. For this example, we need only one thread per command:


int CommandTask::open (void *)
{
  return this->activate ();
}

The put() method is similarly familiar:


int CommandTask::put (ACE_Message_Block *message,
                      ACE_Time_Value *timeout)
{
  return this->putq (message, timeout);
}

The close() method is yet another boilerplate from our previous example:


int CommandTask::close (u_long flags)
{
  int rval = 0;
  if (flags == 1)
    {
      ACE_Message_Block *hangup = new ACE_Message_Block;
      hangup->msg_type (ACE_Message_Block::MB_HANGUP);
      if (this->putq (hangup->duplicate ()) == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("%p "),
                             ACE_TEXT ("Task::close() putq")),
                            -1);
        }
      hangup->release ();
      rval = this->wait ();
    }

  return rval;
}

Next comes our virtual process() method. This base class implementation returns a failure code. The CommandTask derivatives are expected to override this method to implement the command they represent. Thus, process() will return RESULT_FAILURE if processing fails, RESULT_SUCCESS if it succeeds, or RESULT_PASS if it chooses not to process the command. On failure or success, the Command will be returned upstream to where the command stream's execute() method is blocking on getq(). On RESULT_PASS, the Command instance will continue downstream. Any return from an upstream task will allow the Command to continue the upstream journey:


int CommandTask::process (Command *)
{
  ACE_TRACE (ACE_TEXT ("CommandTask::process()"));
  return RESULT_FAILURE;
}

With the trivia behind us, we can now take a look at svc(). We begin with the usual business of taking a message block off the task's queue and checking for shutdown:


int CommandTask::svc (void)
{
  ACE_Message_Block *message;

  for (;;)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("CommandTask::svc() - ")
                  ACE_TEXT ("%s waiting for work "),
                  this->module ()->name ()));

      if (this->getq (message) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           ACE_TEXT ("%p "),
                           ACE_TEXT ("getq")),
                          -1);

      if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
        {
          if (this->putq (message->duplicate ()) == -1)
            {
              ACE_ERROR_RETURN ((LM_ERROR,
                                 ACE_TEXT ("%p "),
                                 ACE_TEXT ("Task::svc() putq")),
                                -1);
            }

          message->release ();
          break;
        }

Now that we have a valid message block, we extract its data block. We know the data block is a Command instance, so we cast that back. If the command we're being asked to execute is not “ours,” the message block is sent on to the next module in the stream. As this is a bidirectional stream, the put_next() could be moving data up- or downstream, but we don't care at this stage:


Command *command = (Command *)message->data_block ();

ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT ("CommandTask::svc() - ")
            ACE_TEXT ("%s got work request %d "),
            ACE_TEXT (this->module ()->name ()),
            command->command_));

if (command->command_ != this->command_)
  {
    this->put_next (message->duplicate ());
  }

If the Command is our responsibility, we need to invoke process() on it. Or, more specifically, our derivative needs to process it. We help by setting the numeric_result_ attribute to –1 if process() failed. This allows our derivative's process() to simply return Command::RESULT_FAILURE,, yet the command stream's client can inspect numeric_result_:



else
  {
    int result = this->process (command);
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("CommandTask::svc() - ")
                ACE_TEXT ("%s work request %d result is %d "),
                ACE_TEXT (this->module ()->name ()),
                command->command_,
                result));

    if (result == Command::RESULT_FAILURE)
      {
        command->numeric_result_ = -1;
      }

If the Command is intended for this task, the task's process() method can still decide to let someone else handle the processing. Perhaps a scenario will require two or more tasks to process the same Command. In any case, a RESULT_PASS return value will let the message continue along the stream:


else if (result == Command::RESULT_PASS)
  {
    this->put_next (message->duplicate ());
  }

Any other return value must be success, and we need to decide what should be done with the Command. If the current task is on the downstream side of the CommandStream (is_writer()), we want to turn around the Command and send it back to the stream head. This is simply done by putting the message block on our sibling task's message queue:


else // result == Command::RESULT_SUCCESS
  {
    if (this->is_writer ())
      {
        this->sibling ()->putq
          (message->duplicate ());
      }

On the other hand, if the task is on the upstream side, we want the Command to keep flowing upstream:



else // this->is_reader ()
  {
    this->put_next (message->duplicate ());
  }

That completes the section where this task is processing a command. All that is left now is to release the message block taken off the message queue and wrap up the method:


message->release ();
    }   // for (;;)

  return 0;
}

18.3.3 Implementations

Answer Call

Now that we've seen the CommandModule and CommandTask base classes, we can look at the specific implementations instantiated by CommandStream::open(). In the life cycle of recording a call, the first thing we must do is answer the call.

To implement the Answer Call function, we've created three objects. AnswerCallModule is a CommandModule (ACE_Module) responsible for creating the upstream and downstream tasks. The only method of AnswerCallModule is the constructor, which simply provides the necessary information to the base class instance. The peer parameter is provided as “extra data” to the base class so that the peer() method can use the arg() method to return the socket to either of the tasks:


AnswerCallModule::AnswerCallModule (ACE_SOCK_Stream *peer)
  : CommandModule ("AnswerCall Module",
                   new AnswerCallDownstreamTask (),
                   new AnswerCallUpstreamTask (),
                   peer)
{ }

The two tasks are responsible for handling the CMD_ANSWER_CALL command; therefore, they provide this value to the CommandTask constructor:



AnswerCallDownstreamTask::AnswerCallDownstreamTask (void)
  : CommandTask(Command::CMD_ANSWER_CALL)
{ }

AnswerCallUpstreamTask::AnswerCallUpstreamTask (void)
  : CommandTask(Command::CMD_ANSWER_CALL)
{ }

All that is left now is to implement the process() method that will answer the incoming connection request. The decision to put this on the downstream or upstream side is rather arbitrary and in the end doesn't matter. We've taken the approach that any “active” action will go on the downstream side and any “passive,” or “receiving,” action on the upstream side. Thus, we implement the connection acceptance on the downstream task:


int AnswerCallDownstreamTask::process (Command *command)
{
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (downstream) ")));

  TextListenerAcceptor *acceptor =
    (TextListenerAcceptor *)command->extra_data_;

  CommandModule *module =
    (CommandModule*)this->module ();

  command->numeric_result_ =
    acceptor->accept (module->peer ());

  acceptor->release ();
  return Command::RESULT_SUCCESS;
}

A few things are worth noting here. First, the Command instance's extra data is expected to be a TextListenerAcceptor instance. This sort of implied API could be avoided by creating derivatives of Command for each command verb.

Next of interest is the module() method. Any ACE_Task has access to this method for retrieving the ACE_Module in which the task is contained, if any. Because our module is a CommandModule, we cast the module() return. On our CommandModule, we can then invoke the peer() method to get a reference to the socket.

Finally, we return Command::RESULT_SUCCESS to tell CommandTask::svc() that we've processed the command and that the data is ready to be sent upstream to the client.

CommandTask::svc() will then invoke putq() on the upstream task whose process() method has nothing important to do:


int AnswerCallUpstreamTask::process (Command *)
{
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (upstream) ")));

  return Command::RESULT_SUCCESS;
}

Retrieve Caller ID

The action of retrieving caller ID from a socket consists of gathering up the IP of the remote peer. Like the AnswerCallModule, the RetrieveCallerIdModule consists of nothing more than a constructor:


RetrieveCallerIdModule::RetrieveCallerIdModule
  (ACE_SOCK_Stream *peer)
    : CommandModule ("RetrieveCallerId Module",
                     new RetrieveCallerIdDownstreamTask (),
                     new RetrieveCallerIdUpstreamTask (),
                     peer)
{ }

We consider this to be a read operation, so we've implemented it on the upstream side of the stream:


RetrieveCallerIdUpstreamTask::RetrieveCallerIdUpstreamTask
  (void)
    : CommandTask(Command::CMD_RETRIEVE_CALLER_ID)
{ }

int RetrieveCallerIdUpstreamTask::process (Command *command)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("Returning Caller ID data ")));

  ACE_INET_Addr remote_addr;

  CommandModule *module =
    (CommandModule*)this->module ();
  module->peer ().get_remote_addr (remote_addr);
  ACE_TCHAR remote_addr_str[256];
  remote_addr.addr_to_string (remote_addr_str, 256);
  command->result_ = ACE_CString (remote_addr_str);

  return Command::RESULT_SUCCESS;
}

There should be nothing surprising here. We again use the module() method to get access to our CommandModule and its peer(). The peer's address is returned in the result_ attribute of the Command for our client's consumption.

Our downstream object is expectedly simple:


RetrieveCallerIdDownstreamTask::RetrieveCallerIdDownstreamTask
  (void)
    : CommandTask(Command::CMD_RETRIEVE_CALLER_ID)
{ }

int RetrieveCallerIdDownstreamTask::process (Command *)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("Retrieving Caller ID data ")));

  return Command::RESULT_SUCCESS;
}

Play Message and Record Message

As you can see, our CommandModule and CommandTask derivatives are well insulated from the mechanics of the ACE_Stream framework. As with our uni-directional stream example, we have gone to some effort to ensure that the application programmer can focus on the task at hand—no pun intended—rather than worry about the details of the underlying framework.

Play Message and Record Message each require an appropriate module constructor. Play Message is then implemented on the downstream side:



int PlayMessageDownstreamTask::process (Command *command)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("Play Outgoing Message ")));

  ACE_FILE_Connector connector;
  ACE_FILE_IO file;

  ACE_FILE_Addr *addr =
    (ACE_FILE_Addr *)command->extra_data_;

  if (connector.connect (file, *addr) == -1)
    {
      command->numeric_result_ = -1;
    }
  else
    {
      command->numeric_result_ = 0;

      CommandModule *module =
        (CommandModule*)this->module ();

      char rwbuf[512];
      int rwbytes;
      while ((rwbytes = file.recv (rwbuf, 512)) > 0)
        {
          module->peer ().send_n (rwbuf, rwbytes);
        }
    }

  return Command::RESULT_SUCCESS;
}

RecordMessage is implemented on the upstream side:


int RecordMessageUpstreamTask::process (Command *command)
{
  // Collect whatever the peer sends and write into the
  // specified file.
  ACE_FILE_Connector connector;
  ACE_FILE_IO file;

  ACE_FILE_Addr *addr =
    (ACE_FILE_Addr *)command->extra_data_;
  if (connector.connect (file, *addr) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p "),
                       ACE_TEXT ("create file")),
                      Command::RESULT_FAILURE);
  file.truncate (0);

  CommandModule *module =
    (CommandModule*)this->module ();

  int total_bytes = 0;
  char rwbuf[512];
  int rwbytes;
  while ((rwbytes = module->peer ().recv (rwbuf, 512)) > 0)
    {
      total_bytes += file.send_n (rwbuf, rwbytes);
    }

  file.close ();

  ACE_DEBUG ((LM_INFO,
              ACE_TEXT ("RecordMessageUpstreamTask ")
              ACE_TEXT ("- recorded %d byte message "),
              total_bytes));

  return Command::RESULT_SUCCESS;
}

18.3.4 Using the Command Stream

All our component parts are now in place. We have an opaque CommandStream into which one can place a Command instance and retrieve a response. The CommandStream is built from a list of CommandModule derivatives, each of which contains a pair of CommandTask derivatives. The program logic is implemented in the CommandTask derivatives and, for the most part, is immune to the details of the ACE_Stream framework.

Let's now look at how the TextListener implementation of RecordingDevice uses the CommandStream. Because we can concurrently accept connections on a socket and process established connections, we have two RecordingDevice derivatives to implement our socket recorder. The first, TextListenerAcceptor, implements only the wait_for_activity() method of RecordingDevice. That has nothing to do with the stream interaction, so we won't go into the details. In short, TextListenerAcceptor::wait_for_activity() will wait for a connection request on the socket and return a new TextListener instance when that happens.

TextListener implements the other RecordingDevice interface, using the CommandStream. We begin with the constructor as invoked by the acceptor:


TextListener::TextListener (TextListenerAcceptor *acceptor)
  : acceptor_(acceptor)
{
  ACE_TRACE (ACE_TEXT ("TextListener ctor"));

  ACE_NEW (this->command_stream_, CommandStream (&(this->peer_)));
  this->command_stream_->open (0);
}

The TextListenerAcceptor doesn't even accept the incoming socket connection. It shouldn't; that's a job for the Answer Call task, as shown earlier. Therefore, the acceptor's wait_for_activity() method provides a pointer to the acceptor object when the TextListener is created. This is held in a member attribute until ready to be used in the answer_call() method.

We've chosen not to implement an open() method for the TextListenerAcceptor to invoke. Thus, we create and initialize the CommandStream directly in the constructor. The peer_ attribute is not yet connected, but because it is instantiated, we can safely provide a pointer to it in the command stream at this stage. In fact, we must provide it, API aside, so that the answer_call task can perform the connection:


int TextListener::answer_call (void)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("TextListener::answer_call() ")));

  Command *c = new Command ();
  c->command_ = Command::CMD_ANSWER_CALL;
  c->extra_data_ = this->acceptor_;

  c = this->command_stream_->execute (c);

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("TextListener::answer_call() ")
              ACE_TEXT ("result is %d "),
              c->numeric_result_));

  return c->numeric_result_;
}

To implement the answer_call() method, the TextListener first creates a Command instance with the command_ attribute set to Command::CMD_ANSWER_CALL.. This tells the command stream what needs to be done. We also save the TextListenerAcceptor instance (acceptor_) as extra data on the Command. This satisfies the implied API of the AnswerCallDownstreamTask so that it can establish the connection.

The newly created Command is then given to the stream for execution. As written, the execute() method will block until the command has been completed. For simplicity, execute() will return a Command instance identical to the one it was given,6 plus the return values.

The remainder of the TextListener methods follow this general pattern. They are presented in life-cycle order.

retrieve_callerId


CallerId *TextListener::retrieve_callerId (void)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("TextListener::retrieve_callerId() ")));

  Command *c = new Command ();
  c->command_ = Command::CMD_RETRIEVE_CALLER_ID;

  c = this->command_stream_->execute (c);

  CallerId *caller_id = new CallerId (c->result_);
  return caller_id;
}

play_message


int TextListener::play_message (ACE_FILE_Addr &addr)
{
  MessageType *type = Util::identify_message (addr);
  if (type->is_text ())
    {
      Command *c = new Command ();
      c->command_ = Command::CMD_PLAY_MESSAGE;
      c->extra_data_ = &addr;

      c = this->command_stream_->execute (c);
      return c->numeric_result_;
    }

  ACE_FILE_Addr temp ("/tmp/outgoing_message.text");
  ACE_FILE_IO *file;

  if (type->is_audio ())
    file = Util::audio_to_text (addr, temp);
  else if (type->is_video ())
    file = Util::video_to_text (addr, temp);
  else
    ACE_ERROR_RETURN
      ((LM_ERROR, ACE_TEXT ("Invalid message type %d "),
        type->get_codec ()), -1);
  int rval = this->play_message (temp);
  file->remove ();
  return rval;
}

record_message


MessageType *TextListener::record_message (ACE_FILE_Addr &addr)
{
  Command *c = new Command ();
  c->command_ = Command::CMD_RECORD_MESSAGE;
  c->extra_data_ = &addr;
  c = this->command_stream_->execute (c);
  if (c->numeric_result_ == -1)
      return 0;

  return new MessageType (MessageType::RAWTEXT, addr);
}

release

The release() method is invoked by our RecordingStream framework when the recording process is complete. Because a new TextListener is instantiated for each recording, we take this opportunity to free that memory:


void TextListener::release (void)
{
  delete this;
}

18.4 Summary

In this chapter, we have investigated the ACE Streams framework. An ACE_Stream is nothing more than a doubly linked list of ACE_Module instances, each of which contains a pair of ACE_Task derivatives. Streams are useful for many things, only two of which we've investigated here. The following list enumerates how one would use a stream.

  1. Create one or more ACE_Task derivatives that implement your application logic.
  2. If applicable, pair these tasks into downstream and upstream components.
  3. For each pair, construct a module to contain them.
  4. Push the modules onto the stream in a last-used/first-pushed manner.

Keep in mind the following when architecting your stream.

• Each task in the stream can exist in one or more threads. Use multiple threads when your application can take advantage of parallel processing.

• The default tail tasks will return an error if any data reaches them. If your tasks don't entirely consume the data, you should at least provide a replacement downstream tail task.

• You can provide your task's open() method with arbitrary data by passing it as the fourth parameter (args) of module's constructor or open() method.

• A task's open() method is invoked as a result of pushing its module onto the stream, not as a result of invoking open() on the module.

ACE_Message_Block instances are reference counted and will not leak memory if you correctly use their duplicate() and release() methods.

One last thing: If you're wondering what happened to the protocol stack we mentioned, rest assured that we haven't forgotten it. The ACE source is distributed with online tutorials. Please check out tutorial 15 in the ACE_wrappers/docs/tutorials directory for a protocol stream implementation example.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.144.104.29