Chapter 12. Basic Multithreaded Programming

Multithreaded programming has become more and more of a necessity in today's software. Whereas most general-purpose operating systems once provided programmers with only user-level thread libraries, most of today's operating systems provide real preemptive multitasking. As the prices of multiprocessor machines fall, their use has also become prevalent. The scale of new software also appears to be growing: whereas concurrent users once numbered in the hundreds or thousands, today that number has grown tenfold. Development of real-time software has always required the use of thread priorities; with the surge of intelligent embedded devices, many more developers are getting involved in this field. All these factors combined make it increasingly important for developers to be familiar with threading concepts and their productive use.

This chapter introduces the basics of using threads with the ACE toolkit. Because ACE provides so much in the area of threads, we have divided the discussion on threads into two chapters. This chapter covers the basics: creating new threads of control, safety primitives that ensure consistency when you have more than one thread accessing a global structure, and event and data communication between threads. Chapter 13 discusses more advanced topics, such as thread scheduling and management.

12.1 Getting Started

By default, a process is created with a single thread, which we call the main thread. This thread starts executing in the main() function of your program and ends when main() completes. Any extra threads that your process may need have to be explicitly created. To create your own thread with ACE, all you have to do is create a subclass of the ACE_Task_Base class and override the implementation of the virtual svc() method. The svc() method serves as the entry point for your new thread; that is, your thread starts in the svc() method and ends when the svc() method returns, in a fashion similar to the main thread.

You will often find yourself using extra threads to help process incoming messages for your network servers. This prevents clients that do not require responses from blocking on the network server, waiting for long-running requests to complete. In our first example, we create a home automation command handler class, HA_CommandHandler, that is responsible for applying long-running command sequences to the various devices that are connected on our home network. For now, we simulate the long-running processing with a sleep call. We print out the thread identifier for both the main thread and the command handler thread, using the ACE_DEBUG() macro's %t format specifier, so that we can see the two threads running in our debug log:


#include "ace/Task.h"
#include "ace/Log_Msg.h"
#include "ace/OS_NS_unistd.h"

class HA_CommandHandler : public ACE_Task_Base
{
public:
  virtual int svc (void)
  {
    ACE_DEBUG
      ((LM_DEBUG, ACE_TEXT ("(%t) Handler Thread running\n")));
    ACE_OS::sleep (4);
    return 0;
  }
};

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_DEBUG
    ((LM_DEBUG, ACE_TEXT ("(%t) Main Thread running\n")));

  HA_CommandHandler handler;
  int result = handler.activate ();
  ACE_ASSERT (result == 0);

  handler.wait ();
  return 0;
}

To start the thread, you must create an instance of HA_CommandHandler and call activate() on it. Before doing this, we print out the main thread's identifier so that we can compare both child and main identifiers in our output debug log.

After activating the child thread, the main thread calls wait() on the handler object, waiting for its threads to complete before continuing and falling out of the main() function. Once the child thread completes the svc() method and exits, the wait() call in the main thread will complete, control will fall out of the main() function, and the process will exit. Why does the main thread have to wait for the child thread to complete? On many platforms, once the main thread returns from the main() function, the C runtime sees this as an indication that the process is ready to exit and destroys the entire running process, including the child thread. If we allowed this to happen, the program might exit before the child thread ever got scheduled and got a chance to execute.

The output shows the two threads—the main thread and the child command handler thread—running:


(496) Main Thread running
(3648) Handler Thread running
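Readers who know standard C++ may find it helpful to compare this lifecycle with std::thread. The sketch below is not ACE code: MiniTask, Handler, and result() are hypothetical names chosen only to mirror the shape of ACE_Task_Base. Here activate() starts a thread that runs the virtual svc(), and wait() joins that thread, which is the step that keeps the creating thread from exiting before the child finishes.

```cpp
#include <cassert>
#include <thread>

// Hypothetical stand-in for the ACE_Task_Base shape; not the ACE API.
class MiniTask
{
public:
  // Callers should call wait() before destruction; joining here only
  // avoids std::terminate if a caller forgets.
  virtual ~MiniTask () { wait (); }

  // Entry point for the new thread; subclasses override it.
  virtual int svc () = 0;

  // Start one thread running svc(). Returns 0 on success, or -1 if a
  // thread started earlier has not yet been joined by wait().
  int activate ()
  {
    if (thread_.joinable ())
      return -1;
    thread_ = std::thread ([this] { result_ = svc (); });
    return 0;
  }

  // Block until the thread started by activate() returns from svc().
  void wait ()
  {
    if (thread_.joinable ())
      thread_.join ();
  }

  // svc()'s return value; only meaningful after wait() has returned.
  int result () const { return result_; }

private:
  std::thread thread_;
  int result_ = -1;
};

// Example task: records which thread its svc() ran on.
class Handler : public MiniTask
{
public:
  std::thread::id svc_thread;

  int svc () override
  {
    svc_thread = std::this_thread::get_id ();
    return 0;
  }
};
```

As in the ACE example, the caller should call wait() before the task object goes out of scope.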

12.2 Basic Thread Safety

One of the most difficult problems you deal with when writing multithreaded programs is maintaining the consistency of all globally available data. Because multiple threads access the same objects and structures, you must make sure that any updates made to these objects are safe. In this context, safety means that all state information remains consistent.

ACE provides a rich array of primitives to help you achieve this goal. We cover a few of the most useful and commonly used primitives in the next few sections and discuss the rest of these components in Chapter 14.

12.2.1 Using Mutexes

Mutexes, the simplest protection primitive available, provide a simple acquire()/release() interface. A thread calls acquire() to obtain the mutex; if the mutex is free, the thread continues forward. Otherwise, it blocks until the current holder releases the mutex by calling release().
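The acquire()/release() protocol is the same one that std::mutex exposes as lock()/unlock(). As a minimal standard C++ sketch (not ACE code; the function name count_with_mutex is hypothetical), two threads each perform many read-modify-write updates on a shared counter. Because every update happens while the mutex is held, no increment is lost and the final count is exact.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Runs two threads that each increment a shared counter `per_thread`
// times, taking the mutex around every update. Returns the final value.
long count_with_mutex (int per_thread)
{
  long counter = 0;
  std::mutex m;

  auto worker = [&] {
    for (int i = 0; i < per_thread; ++i)
      {
        m.lock ();     // like acquire(): blocks while another thread holds m
        ++counter;     // critical section
        m.unlock ();   // like release(): lets a blocked thread proceed
      }
  };

  std::thread t1 (worker);
  std::thread t2 (worker);
  t1.join ();
  t2.join ();
  return counter;
}
```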

As shown in Table 14.1, ACE provides several mutex classes. ACE_Mutex can be used as a lightweight synchronization primitive for threads and as a heavyweight cross-process synchronization primitive.

In the next example, we add a device repository to our home automation example. This repository contains references to all the devices connected to our home network, as well as the interface for applying command sequences to those devices. Let us suppose that only one thread at a time can update the repository without causing consistency problems.

The repository creates and manages an ACE_Thread_Mutex object as a data member that it uses to enforce this consistency constraint. This is a common idiom that you will find yourself using on a regular basis. Whenever a thread calls update_device(), the method first acquires the mutex; because only one thread can hold the mutex at a time, no two threads ever update the state of the repository simultaneously. It is important that release() be called on the mutex once the update is done, so that other threads can then acquire it and update the repository. When the repository is destroyed, the destructor of the mutex releases all resources that the mutex holds:


class HA_Device_Repository
{
public:
  HA_Device_Repository ()
  { }

  void update_device (int device_id)
  {
    mutex_.acquire ();
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Updating device %d\n"),
                device_id));
    ACE_OS::sleep (1);
    mutex_.release ();
  }

private:
  ACE_Thread_Mutex mutex_;
};

To illustrate the mutex in action, we modify HA_CommandHandler to call update_device() on the repository and then create two handler tasks that compete with each other, trying to update devices in the repository at the same time:


class HA_CommandHandler : public ACE_Task_Base
{
public:
  enum {NUM_USES = 10};

  HA_CommandHandler (HA_Device_Repository& rep) : rep_(rep)
  { }

  virtual int svc (void)
  {
    ACE_DEBUG
      ((LM_DEBUG, ACE_TEXT ("(%t) Handler Thread running\n")));
    for (int i=0; i < NUM_USES; i++)
      this->rep_.update_device (i);
    return 0;
  }

private:
  HA_Device_Repository & rep_;
};

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  HA_Device_Repository rep;
  HA_CommandHandler handler1 (rep);
  HA_CommandHandler handler2 (rep);
  handler1.activate ();
  handler2.activate ();

  handler1.wait ();
  handler2.wait ();
  return 0;
}

The output from this program shows the two handler threads competing to update devices in the repository:


(3768) Handler Thread running
(3768) Updating device 0
(1184) Handler Thread running
(1184) Updating device 0
(3768) Updating device 1
(1184) Updating device 1
(3768) Updating device 2
(1184) Updating device 2
(3768) Updating device 3
(1184) Updating device 3
(3768) Updating device 4

You may notice that on your platform, one thread hangs on to the repository until it is finished before letting go, or that the threads interleave with no particular order as to which thread uses the repository. You can ensure strict ordering, if that is what you need, by using an ACE_Token, which is discussed in Section 14.1.4. But be aware that although tokens support strict ordering and are recursive, they are slower and heavier than mutexes.
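To make "strict ordering" concrete, here is a FIFO ticket lock written in standard C++. It is not ACE_Token and makes no claim about how ACE_Token is implemented; TicketLock and fifo_demo are hypothetical names. Each acquirer takes a ticket number and waits until that number is being served, so threads obtain the lock in the order in which they asked for it. The price is a broadcast wakeup on every release, which illustrates why an ordered lock tends to cost more than a plain mutex.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical FIFO ("ticket") lock: grants the lock in request order.
class TicketLock
{
public:
  // Take the next ticket, then wait until that ticket is being served.
  void acquire ()
  {
    std::unique_lock<std::mutex> lk (m_);
    const unsigned long my_ticket = next_ticket_++;
    cv_.wait (lk, [&] { return now_serving_ == my_ticket; });
  }

  // Serve the next ticket; wake all waiters so its holder can proceed.
  void release ()
  {
    {
      std::lock_guard<std::mutex> lk (m_);
      ++now_serving_;
    }
    cv_.notify_all ();
  }

  // Tickets handed out so far (used by the demo to see who has queued).
  unsigned long tickets_issued ()
  {
    std::lock_guard<std::mutex> lk (m_);
    return next_ticket_;
  }

private:
  std::mutex m_;
  std::condition_variable cv_;
  unsigned long next_ticket_ = 0;
  unsigned long now_serving_ = 0;
};

// Thread 1 queues before thread 2, so it must get the lock first.
std::vector<int> fifo_demo ()
{
  TicketLock lock;
  std::vector<int> order;
  auto run = [&] (int id) {
    lock.acquire ();
    order.push_back (id);   // protected by the ticket lock
    lock.release ();
  };

  lock.acquire ();          // hold the lock so both threads must queue
  std::thread t1 (run, 1);
  while (lock.tickets_issued () < 2)   // wait until t1 has its ticket
    std::this_thread::yield ();
  std::thread t2 (run, 2);
  while (lock.tickets_issued () < 3)   // wait until t2 has its ticket
    std::this_thread::yield ();
  lock.release ();          // hand off strictly in ticket order

  t1.join ();
  t2.join ();
  return order;
}
```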

12.2.2 Using Guards

In many cases, exceptional conditions cause deadlock in otherwise perfectly working code. This usually happens when we overlook an exceptional path and forget to unlock a mutex. Let's illustrate this with a piece of code:


int
HA_Device_Repository::update_device (int device_id)
{
  this->mutex_.acquire ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Updating device %d\n"),
              device_id));

  // Allocate a new object. On failure, ACE_NEW_RETURN returns -1
  // immediately, skipping the release() below.
  Object *object = 0;
  ACE_NEW_RETURN (object, Object, -1);
  // ...
  // Use the object

  this->mutex_.release ();
  return 0;
}

You can spot the problem here pretty easily: The ACE_NEW_RETURN() macro returns when an error occurs, thereby preventing the lock from being released, and we have a nasty deadlock. In real code, it becomes more difficult to spot such returns, and having multiple release() calls all over your code can quickly become very ugly.

ACE provides a convenience class that solves this problem. The ACE_Guard set of classes and macros help simplify your code and prevent deadlock situations. The guard is based on the familiar C++ idiom of using the constructor and destructor calls for resource acquisition and release. The guard classes acquire a specified lock when they are constructed, and release the lock when they are destroyed. Using a guard on your stack, you are always assured that the lock will be released, no matter what pathological path your code may wind through.

Using a guard instead of explicit calls to acquire and release the mutex makes the previous snippet of code much easier to read:


int
HA_Device_Repository::update_device (int device_id)
{
  // Construct a guard specifying the type of the mutex as
  // a template parameter and passing in the mutex to hold
  // as a parameter.
  ACE_Guard<ACE_Thread_Mutex> guard (this->mutex_);

  // If allocation fails, this returns -1 early; the guard's
  // destructor still releases the mutex.
  Object *object = 0;
  ACE_NEW_RETURN (object, Object, -1);
  // ..
  // Use the object.
  // ..
  // On return, the guard is destroyed, automatically releasing the lock.
  return 0;
}

All we do here is create an ACE_Guard object on the stack. This automatically acquires and releases the mutex on function entry and exit: just what we want. The ACE_Guard template class takes the lock type as its template parameter and also requires you to pass in a lock object that the guard will operate on.
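The constructor-acquires, destructor-releases idiom is small enough to write by hand, which makes it easy to see why an early return no longer leaks the lock. The sketch below is a simplified stand-in, not ACE_Guard itself: ScopedGuard, CountingLock, and update are hypothetical names, and the lock type only needs acquire() and release() members.

```cpp
#include <cassert>

// Simplified RAII guard in the spirit of ACE_Guard (not the ACE class).
template <class LOCK>
class ScopedGuard
{
public:
  explicit ScopedGuard (LOCK &lock) : lock_ (lock) { lock_.acquire (); }
  ~ScopedGuard () { lock_.release (); }

  // Copying would release the same lock twice, so forbid it.
  ScopedGuard (const ScopedGuard &) = delete;
  ScopedGuard &operator= (const ScopedGuard &) = delete;

private:
  LOCK &lock_;
};

// Hypothetical lock that just counts its current holders.
struct CountingLock
{
  int held = 0;
  int acquire () { ++held; return 0; }
  int release () { --held; return 0; }
};

// Returns early on a "failure" path; the guard still releases the lock.
int update (CountingLock &lock, bool fail)
{
  ScopedGuard<CountingLock> guard (lock);
  assert (lock.held == 1);
  if (fail)
    return -1;    // early exit: ~ScopedGuard runs here
  return 0;       // normal exit: ~ScopedGuard runs here too
}
```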

Table 12.1 lists the rich variety of guards that ACE provides. Most of these guards are self-explanatory; we talk about them and use them in several examples in Chapter 14. For further details on each of the guards, consult the ACE reference documentation.

ACE also provides a set of convenient macros that you can use to allocate guards on the stack. These macros expand to use the guard classes listed in Table 12.1 and perform error checking on the underlying acquire() and release() calls. They return on an error, optionally with a return value. In the lists of macros below, LockType is used as T in the guard class template, GuardName is the name of the guard object that's created, and LockObject is the lock object referenced by the guard.

Table 12.1. ACE Guard Classes


The following guard macros do not return values:

ACE_GUARD (LockType, GuardName, LockObject)

ACE_WRITE_GUARD (LockType, GuardName, LockObject)

ACE_READ_GUARD (LockType, GuardName, LockObject)

These guard macros return ReturnValue on an error:

ACE_GUARD_RETURN (LockType, GuardName, LockObject, ReturnValue)

ACE_WRITE_GUARD_RETURN (LockType, GuardName, LockObject, ReturnValue)

ACE_READ_GUARD_RETURN (LockType, GuardName, LockObject, ReturnValue)

In the following code snippet, the guard return macro is used with the device repository:


int
HA_Device_Repository::update_device (int device_id)
{
  ACE_GUARD_RETURN (ACE_Thread_Mutex, mon, mutex_, -1);

  Object *object = 0;
  ACE_NEW_RETURN (object, Object, -1);
  // Use the object.
  // ...
  return 0;
}

If the guard cannot acquire the mutex, the macro returns -1 from the method; otherwise, it creates an ACE_Guard<ACE_Thread_Mutex> instance called mon on the stack, which releases the mutex when the method returns. If the mutex-protected method does not return a value, use the guard macros that do not return a value, and rely on errno to report the failure.
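Conceptually, a guard-return macro declares a guard and then tests whether the acquisition succeeded. The following sketch shows that shape with hypothetical names (CheckedGuard, SKETCH_GUARD_RETURN, FlakyLock, update_device); it is not the text of ACE's macro definitions. FlakyLock follows the ACE convention of returning -1 from acquire() on failure, which lets the failure path be exercised.

```cpp
#include <cassert>

// Hypothetical guard that remembers whether acquire() succeeded.
template <class LOCK>
class CheckedGuard
{
public:
  explicit CheckedGuard (LOCK &lock)
    : lock_ (lock), owner_ (lock.acquire () == 0) { }
  ~CheckedGuard () { if (owner_) lock_.release (); }
  bool locked () const { return owner_; }

  CheckedGuard (const CheckedGuard &) = delete;
  CheckedGuard &operator= (const CheckedGuard &) = delete;

private:
  LOCK &lock_;
  bool owner_;
};

// Declare a guard NAME over OBJ; return RET from the enclosing
// function if the lock could not be acquired.
#define SKETCH_GUARD_RETURN(TYPE, NAME, OBJ, RET) \
  CheckedGuard<TYPE> NAME (OBJ);                  \
  if (!NAME.locked ()) return RET

// Hypothetical lock whose acquire() can be told to fail (returns -1).
struct FlakyLock
{
  bool fail = false;
  int holders = 0;
  int acquire () { if (fail) return -1; ++holders; return 0; }
  int release () { --holders; return 0; }
};

int update_device (FlakyLock &lock, int device_id)
{
  SKETCH_GUARD_RETURN (FlakyLock, mon, lock, -1);
  assert (lock.holders == 1);
  return device_id;   // mon releases the lock on the way out
}
```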

12.3 Intertask Communication

When writing multithreaded programs, you will often need your tasks to communicate with one another. This communication may take the form of something simple, such as one thread informing another that it is time to exit, or something more complicated, such as threads passing data back and forth.

In general, intertask communication can be divided into two broad categories:

  1. State change or event notifications, whereby only the event occurrence needs to be communicated, but no data is passed between the two threads
  2. Message passing, whereby data is passed between the two threads, possibly forming a work chain, in which the first thread processes the data and then passes it along to the next for further processing

12.3.1 Using Condition Variables

A thread can use a condition variable to communicate a state change, an event arrival, or the satisfaction of another condition to other interested threads. A condition variable is always used in conjunction with a mutex. Condition variables also support timed waits: a thread can block on the variable for a bounded amount of time and then resume even if no signal arrives. This makes it easy to use a condition variable to manage a simple event loop. We show an example of this when we talk about timers in Chapter 20.
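In standard C++, the equivalent timed block is std::condition_variable::wait_for(), which returns once the condition is satisfied or the timeout expires. The sketch below is not ACE code; the function name wait_for_event and the timeouts used with it are arbitrary choices. It shows the pattern an event loop can build on: wait for an event flag with a deadline, and treat a timeout as a cue to do periodic work.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Waits up to `timeout` for another thread to set `ready`.
// Returns true if the event arrived, false if the wait timed out.
bool wait_for_event (bool send_event, std::chrono::milliseconds timeout)
{
  std::mutex m;
  std::condition_variable cv;
  bool ready = false;

  std::thread notifier;
  if (send_event)
    notifier = std::thread ([&] {
      {
        std::lock_guard<std::mutex> lk (m);
        ready = true;             // change the state under the mutex
      }
      cv.notify_one ();           // then wake the waiter
    });

  bool got_event;
  {
    std::unique_lock<std::mutex> lk (m);
    // The predicate guards against spurious wakeups and against the
    // notification arriving before this thread started waiting.
    got_event = cv.wait_for (lk, timeout, [&] { return ready; });
  }

  if (notifier.joinable ())
    notifier.join ();
  return got_event;
}
```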

We can easily change our command handler example to use a condition variable to coordinate access to the device repository instead of using a mutex for protection. We start by modifying the repository so that it can record which task currently owns it:


class HA_Device_Repository
{
public:
  HA_Device_Repository() : owner_(0)
  { }

  int is_free (void)
    { return (this->owner_ == 0); }

  int is_owner (ACE_Task_Base* tb)
    { return (this->owner_ == tb); }

  ACE_Task_Base *get_owner (void)
    { return this->owner_; }

  void set_owner (ACE_Task_Base *owner)
    { this->owner_ = owner; }

  int update_device (int device_id);

private:
  ACE_Task_Base * owner_;
};

Next, we modify the command handler so that it uses a condition variable (waitCond_) and a mutex (mutex_) to coordinate access to the repository. Both the condition variable and the mutex are created on the main thread's stack and are passed to the command handlers during construction.

To use a condition variable, you must first acquire the mutex and check whether the system is in the required state, that is, whether the required condition is true. If so, perform the required action and then release the mutex. If not, call wait() on the condition variable to wait for the system state to change. When the state changes, the thread making the change signals the condition variable, waking up one or more of the threads that are waiting for the change. The waiting threads wake up and check the system state again; if the state is still amenable, they perform the required action, and otherwise they wait again.

In the command handler, the handler thread is waiting for the is_free() condition to become true on the repository. If this happens to be the case, the handler thread successfully acquires the repository and marks itself as the owner, after which it frees the mutex. If any other competing handler tries to acquire the repository for update at this time, the is_free() method will return 0, and the thread will block by calling wait() on the condition variable.

Once the successful thread is done updating, it removes itself as the owner of the repository and calls signal() on the condition variable. This causes the blocked thread to wake up, check whether the repository is free, and, if so, go on its merry way acquiring the repository for update.

You may notice that the blocking thread does not release the mutex before it falls asleep on wait(), nor does it try to reacquire the mutex once it wakes up. The reason is that the condition variable automatically releases the mutex just before the thread goes to sleep and reacquires it before wait() returns:


int
HA_CommandHandler::svc (void)
{
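  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Handler Thread running\n")));

  for (int i = 0; i < NUM_USES; i++)
    {
      // Wait until the repository is free, then claim it.
      this->mutex_.acquire ();
      while (!this->rep_.is_free ())
        this->waitCond_.wait ();
      this->rep_.set_owner (this);
      this->mutex_.release ();

      this->rep_.update_device (i);

      // Give up ownership and signal while holding the mutex, so
      // waiting threads see a consistent owner_ when they recheck.
      this->mutex_.acquire ();
      ACE_ASSERT (this->rep_.is_owner (this));
      this->rep_.set_owner (0);
      this->waitCond_.signal ();
      this->mutex_.release ();
    }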

  return 0;
}

As usual, we create two handlers that compete with each other to acquire the repository. Both are passed the condition variable, mutex, and repository by reference when the handler is constructed.

The ACE_Condition class is a template and requires the type of mutex being used as its argument. Because we are coordinating access between threads, we use ACE_Thread_Mutex as the mutex type. The condition variable instance also keeps a reference to the mutex, so that it can automatically acquire and release it on the wait() call, as described earlier. This reference is passed to the condition variable during construction:


int ACE_TMAIN (int, ACE_TCHAR *[])
{
  HA_Device_Repository rep;
  ACE_Thread_Mutex rep_mutex;
  ACE_Condition<ACE_Thread_Mutex> wait (rep_mutex);

  HA_CommandHandler handler1 (rep, wait, rep_mutex);
  HA_CommandHandler handler2 (rep, wait, rep_mutex);

  handler1.activate ();
  handler2.activate ();

  handler1.wait ();
  handler2.wait ();

  return 0;
}
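For comparison, here is the same ownership hand-off written with std::mutex and std::condition_variable. It is a sketch of the protocol described above rather than a translation of the ACE classes: Repository, handler, and run_two_handlers are hypothetical names, and an atomic counter of simultaneous users stands in for update_device() so that the exclusion can be checked. The owner is cleared and the condition is signaled while the mutex is held, so a waiter never observes a half-updated state.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical repository state; the atomics exist only to check exclusion.
struct Repository
{
  std::mutex m;
  std::condition_variable free_cond;
  std::thread::id owner;            // default id means "free"; guarded by m
  std::atomic<int> active_users{0};
  std::atomic<bool> overlap{false};
};

// Claim the repository `uses` times, "update" it, then hand it back.
void handler (Repository &rep, int uses)
{
  for (int i = 0; i < uses; ++i)
    {
      {
        std::unique_lock<std::mutex> lk (rep.m);
        // wait() releases rep.m while blocked and reacquires it before
        // returning; the predicate is rechecked after every wakeup.
        rep.free_cond.wait (lk, [&] { return rep.owner == std::thread::id (); });
        rep.owner = std::this_thread::get_id ();
      }

      // The update runs outside the mutex; ownership keeps others out.
      if (++rep.active_users > 1)
        rep.overlap = true;
      std::this_thread::yield ();
      --rep.active_users;

      {
        std::lock_guard<std::mutex> lk (rep.m);
        rep.owner = std::thread::id ();   // mark the repository free
        rep.free_cond.notify_one ();      // wake one waiting handler
      }
    }
}

// Run two competing handlers; true if their updates never overlapped.
bool run_two_handlers (int uses)
{
  Repository rep;
  std::thread t1 (handler, std::ref (rep), uses);
  std::thread t2 (handler, std::ref (rep), uses);
  t1.join ();
  t2.join ();
  return !rep.overlap && rep.owner == std::thread::id ();
}
```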

12.3.2 Message Passing

As mentioned earlier, message passing is often used to communicate data and event occurrences between threads. The sender creates a message that it enqueues on a message queue for the receiver to pick up. The receiver is either blocked or polling the queue, waiting for new data to arrive. Once the data is in the queue, the receiver dequeues the data, uses it, and then goes back to waiting for new data on the queue.

The queue acts as a shared resource between the two threads, and thus the enqueue and dequeue operations must be protected. (See Figure 12.1.) It would also be handy if the queue supported a blocking dequeue call that unblocks when new data arrives. Finally, it would be convenient if each task object that we create came with a message queue attached to it out of the box. That way, we wouldn't need a global queue; instead, any task that has a reference to another task could send it messages.

Figure 12.1. Using a queue for communication


Fortunately, all these features come out of the box with ACE. Up to this point, we have been using ACE_Task_Base as the base class for all our example threads. ACE also provides a facility for queueing messages between tasks derived from the ACE_Task template. By deriving your class from ACE_Task, you automatically inherit a message queue of type ACE_Message_Queue, which you can use in your new class. ACE_Message_Queue is modeled after the queueing facilities available with System V streams. However, unlike its System V counterpart, the ACE facility is designed for efficient intertask communication within a single process and does not provide interprocess communication.

The message queue provides a type-safe interface, allowing you to enqueue messages that are instances of ACE_Message_Block.
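To see what such a queue does for you, here is a rough standard C++ sketch of the first two properties listed earlier: a mutex protects the container, and a condition variable lets get() block until put() supplies an item. It only illustrates those requirements and is not ACE_Message_Queue, whose interface is much richer; BlockingQueue and round_trip are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

template <class T>
class BlockingQueue
{
public:
  // Enqueue an item and wake one blocked consumer.
  void put (T item)
  {
    {
      std::lock_guard<std::mutex> lk (m_);
      items_.push_back (std::move (item));
    }
    not_empty_.notify_one ();
  }

  // Dequeue the oldest item, blocking while the queue is empty.
  T get ()
  {
    std::unique_lock<std::mutex> lk (m_);
    not_empty_.wait (lk, [&] { return !items_.empty (); });
    T item = std::move (items_.front ());
    items_.pop_front ();
    return item;
  }

private:
  std::mutex m_;
  std::condition_variable not_empty_;
  std::deque<T> items_;
};

// The consumer blocks in get() until this thread delivers a message.
std::string round_trip (const std::string &msg)
{
  BlockingQueue<std::string> q;
  std::string received;
  std::thread consumer ([&] { received = q.get (); });
  q.put (msg);
  consumer.join ();
  return received;
}
```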

Message Blocks

The ACE_Message_Block is an efficient data container for storing and sharing messages. You can think of the message block as an advanced data buffer that supports such nice features as reference counting and data sharing. Each message block contains two pointers: rd_ptr(), which points to the next byte to be read, and wr_ptr(), which points to the next available empty byte. You use these pointers to copy data into and get data out of the message block.

You can use the copy() method to copy data into the message block:


ACE_Message_Block *mb;
ACE_NEW_RETURN (mb, ACE_Message_Block (128), -1);

const char *deviceAddr= "Dev#12";
mb->copy (deviceAddr, ACE_OS::strlen (deviceAddr)+1);

Or, you can use the wr_ptr() directly. When doing so, you must move the wr_ptr() forward manually, so that the next write is at the end of the buffer:


ACE_Message_Block *mb;
ACE_NEW_RETURN (mb, ACE_Message_Block (128), -1);

const char *commandSeq= "CommandSeq#14";
// Pass the data through "%s" so it is never treated as a format string.
ACE_OS::sprintf (mb->wr_ptr (), "%s", commandSeq);
// Move the wr_ptr() forward in the buffer by the
// amount of data we just put in.
mb->wr_ptr (ACE_OS::strlen (commandSeq) +1);

The rd_ptr() is similar to the write pointer. You can use it directly to get the data, but you must be careful to move it forward by the number of bytes you have already read so that you don't read the same data over and over. Once you are done working with the message block, release it using the release() method, causing the reference count to be decremented. When the count reaches 0, ACE will automatically release the memory that was allocated for the block:


ACE_DEBUG((LM_DEBUG,
           ACE_TEXT ("Command Sequence --> %s\n"),
           mb->rd_ptr ()));
mb->rd_ptr (ACE_OS::strlen (mb->rd_ptr ())+1);
mb->release ();
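The read and write pointers are simply two cursors into one buffer. The following sketch models that bookkeeping with offsets into a std::vector<char>; MiniBlock and its members are hypothetical and omit the reference counting, chaining, and allocator support of the real ACE_Message_Block. Writing advances the write cursor, reading advances the read cursor, and the bytes between the two are the unread data.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

class MiniBlock
{
public:
  explicit MiniBlock (std::size_t size) : buf_ (size) { }

  char *wr_ptr () { return buf_.data () + wr_; }
  char *rd_ptr () { return buf_.data () + rd_; }
  void wr_ptr (std::size_t n) { wr_ += n; }   // advance after writing n bytes
  void rd_ptr (std::size_t n) { rd_ += n; }   // advance after reading n bytes

  std::size_t length () const { return wr_ - rd_; }          // unread bytes
  std::size_t space () const { return buf_.size () - wr_; }   // free bytes

  // Copy n bytes at the write position and advance it; -1 if they do not fit.
  int copy (const char *src, std::size_t n)
  {
    if (n > space ())
      return -1;
    std::memcpy (wr_ptr (), src, n);
    wr_ptr (n);
    return 0;
  }

private:
  std::vector<char> buf_;
  std::size_t rd_ = 0;
  std::size_t wr_ = 0;
};
```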

Message blocks also include a type field, which can be set during construction or through the msg_type() modifier. The message-type field comes in handy when you want to distinguish processing of the message based on its type or to send a simple command notification. An example of the latter is the use of the ACE_Message_Block::MB_HANGUP message type to inform the message receiver that the source has shut down:


// Create a hangup notification to send to the receiver.
ACE_NEW_RETURN
  (mb, ACE_Message_Block (128, ACE_Message_Block::MB_HANGUP), -1);
// Alternatively, retype an existing block as an error notification.
mb->msg_type (ACE_Message_Block::MB_ERROR);

ACE_Message_Block also offers the methods duplicate(), to create a new reference to the block's data, incrementing the reference count, and clone(), to create a deep copy of the message block.
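The difference between duplicate() and clone() is the difference between sharing and copying. The sketch below approximates it with std::shared_ptr, whose use count plays the role of the block's reference count; the Block type and both helper functions are hypothetical and say nothing about how ACE implements message blocks.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical block: the payload lives in a reference-counted buffer.
struct Block
{
  std::shared_ptr<std::string> data;
};

// Like duplicate(): a new handle to the same data; the count goes up.
Block duplicate (const Block &b) { return Block{b.data}; }

// Like clone(): a deep copy with its own, independent data.
Block clone (const Block &b)
{
  return Block{std::make_shared<std::string> (*b.data)};
}
```

A change made through a duplicate is visible through the original, while a clone is unaffected.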

Using the Message Queue

To illustrate the use of ACE_Task and its underlying message queue, we extend our previous automation handler example to include a handler derived from ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>, called Message_Receiver, that receives command messages for the devices on the network from TCP-connected remote clients. (For more on this, see Chapter 6.) When it receives commands, Message_Receiver first encapsulates them in message blocks and then enqueues them on the command handler's message queue. HA_CommandHandler now derives from ACE_Task instead of ACE_Task_Base, thus inheriting the required message queue functionality. The command handler thread spends its time waiting for command message blocks to arrive on its message queue; on receiving these messages, the handler proceeds to process the commands.

The remote command messages have a simple header, DeviceCommandHeader, followed by a payload that consists of a null-terminated command string:


struct DeviceCommandHeader
{
  int length_;
  int deviceId_;
};

The Message_Receiver service handler uses the length_ field of the header to determine the size of the payload. Once it knows the payload length, the service handler can create an ACE_Message_Block of exactly the right size: the length of the payload plus the length of the header. The handler then copies the header and payload into the message block and enqueues it on the command handler task's message queue, using the ACE_Task::putq() method. (Message_Receiver receives a pointer to the HA_CommandHandler on construction and keeps it.) If the device ID read in from the header is negative, we treat it as an indication that the system needs to shut down; in that case, we send a hangup message to HA_CommandHandler:


int
Message_Receiver::handle_input (ACE_HANDLE)
{
  DeviceCommandHeader dch;
  if (this->read_header (&dch) < 0)
    return -1;

  if (dch.deviceId_ < 0)
    {
      // Handle shutdown.
      this->handler_->putq (shut_down_message ());
      return -1;
    }

  ACE_Message_Block *mb;
  ACE_NEW_RETURN
    (mb, ACE_Message_Block (dch.length_ + sizeof dch), -1);
  // Copy the header.
  mb->copy ((const char*)&dch, sizeof dch);
  // Copy the payload.
  if (this->copy_payload (mb, dch.length_) < 0)
    ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                       ACE_TEXT ("Receive Failure")), -1);
  // Pass it off to the handler thread.
  this->handler_->putq (mb);
  return 0;
}

It is instructive to look at how the payload is copied into the provided message block. We pass the wr_ptr() directly to ACE_SOCK_Stream::recv_n(), which reads the data straight into the block. After the data is copied into the block, we advance the wr_ptr() by the size of the payload. Recall that the earlier copy() of the header had already moved the wr_ptr() forward by the size of the header; therefore, the message block now contains the header, followed immediately by the payload:


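int
Message_Receiver::copy_payload (ACE_Message_Block *mb,
                                int payload_length)
{
  // Read the payload straight into the block at its write position.
  ssize_t result =
    this->peer ().recv_n (mb->wr_ptr (), payload_length);

  if (result <= 0)
    {
      // Error or connection closed: free the block and report failure,
      // so the caller never enqueues a released block.
      mb->release ();
      return -1;
    }

  // Advance the write pointer past the bytes just received.
  mb->wr_ptr (payload_length);
  return 0;
}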
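The resulting layout, a fixed-size header immediately followed by the payload in one contiguous buffer, can be checked in isolation. This standard C++ sketch builds such a buffer and parses it back; the frame() and parse() helpers are hypothetical, and the data is copied from memory rather than read from a socket. Like the chapter's code, it copies the header struct byte for byte, which assumes that both ends agree on int size, byte order, and struct padding.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

struct DeviceCommandHeader
{
  int length_;     // payload size in bytes, including the trailing null
  int deviceId_;
};

// Lay out the header, then the payload, as copy() plus copy_payload() do.
std::vector<char> frame (int device_id, const std::string &command)
{
  DeviceCommandHeader dch;
  dch.length_ = static_cast<int> (command.size () + 1);
  dch.deviceId_ = device_id;

  const std::size_t payload = static_cast<std::size_t> (dch.length_);
  std::vector<char> buf (sizeof dch + payload);
  std::memcpy (buf.data (), &dch, sizeof dch);                        // header
  std::memcpy (buf.data () + sizeof dch, command.c_str (), payload);  // payload
  return buf;
}

// Recover the header and the null-terminated command from a frame.
bool parse (const std::vector<char> &buf,
            DeviceCommandHeader &dch, std::string &command)
{
  if (buf.size () < sizeof dch)
    return false;
  std::memcpy (&dch, buf.data (), sizeof dch);
  if (dch.length_ <= 0
      || buf.size () != sizeof dch + static_cast<std::size_t> (dch.length_)
      || buf.back () != '\0')
    return false;
  command.assign (buf.data () + sizeof dch);   // stops at the null
  return true;
}
```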

When it gets a command to shut down the system, the message receiver creates a new message block that has no data in it but has the MB_HANGUP type set. When it receives a message, HA_CommandHandler first checks the type; if it is a hangup message, it shuts down the system:


ACE_Message_Block *
Message_Receiver::shut_down_message (void)
{
  ACE_Message_Block *mb;
  ACE_NEW_RETURN
    (mb, ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP), 0);
  return mb;
}

On the other side of the fence, the HA_CommandHandler thread blocks, waiting for messages to arrive on its queue by calling getq() on itself. Once a message arrives, getq() unblocks; the handler then reads the message and applies the received command to the device repository. Finally, it releases the message block, which deallocates the used memory as the block's reference count drops to 0.

As we said earlier, the system uses a message of type MB_HANGUP to inform the server to shut down. On receiving a message of this type, the handler stops waiting for incoming messages and shuts down the reactor, using the ACE_Reactor::end_reactor_event_loop() method. This causes the command handler process to shut down.
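The consumer side reduces to a loop that blocks on the queue, dispatches on the message type, and exits on a hangup. Here is a self-contained standard C++ sketch of that loop. MsgType, Msg, MsgQueue, and consume are hypothetical stand-ins for the ACE_Message_Block type field and the task's message queue (the putq/getq member names echo ACE_Task but belong to this sketch), and returning from the loop stands in for ending the reactor event loop.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

enum class MsgType { Data, Hangup };

struct Msg
{
  MsgType type;
  std::string payload;
};

// Minimal blocking FIFO used by the consumer loop below.
class MsgQueue
{
public:
  void putq (Msg m)
  {
    { std::lock_guard<std::mutex> lk (m_); q_.push_back (std::move (m)); }
    cv_.notify_one ();
  }

  Msg getq ()
  {
    std::unique_lock<std::mutex> lk (m_);
    cv_.wait (lk, [&] { return !q_.empty (); });
    Msg m = std::move (q_.front ());
    q_.pop_front ();
    return m;
  }

private:
  std::mutex m_;
  std::condition_variable cv_;
  std::deque<Msg> q_;
};

// Consumer: apply data messages in order until a hangup arrives.
std::vector<std::string> consume (MsgQueue &q)
{
  std::vector<std::string> applied;
  for (;;)
    {
      Msg m = q.getq ();            // blocks until a message is queued
      if (m.type == MsgType::Hangup)
        break;                      // source shut down: stop processing
      applied.push_back (m.payload);
    }
  return applied;
}
```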

Other Queue Types

ACE provides various ACE_Message_Queue subclasses that provide more than the vanilla FIFO queueing available with ACE_Message_Queue. The ACE_Dynamic_Message_Queue offers priority queues, which include dynamic priority adjustments based on various algorithms. ACE also offers several platform-specific queues that incorporate OS-specific characteristics.
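To illustrate what ordering by priority rather than by arrival means, here is a small standard C++ sketch built on std::priority_queue. It is not ACE_Dynamic_Message_Queue: it performs no dynamic priority adjustment and no locking, and PrioritizedMsg, LowerPriority, and PriorityMsgQueue are hypothetical names. A sequence number breaks ties so that messages of equal priority still come out in FIFO order.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct PrioritizedMsg
{
  int priority;            // larger value = more urgent
  unsigned long seq;       // arrival order, used to break ties
  std::string payload;
};

// Ranks a below b if a is less urgent, or equally urgent but newer.
struct LowerPriority
{
  bool operator() (const PrioritizedMsg &a, const PrioritizedMsg &b) const
  {
    if (a.priority != b.priority)
      return a.priority < b.priority;
    return a.seq > b.seq;
  }
};

class PriorityMsgQueue
{
public:
  void enqueue (int priority, std::string payload)
  {
    q_.push (PrioritizedMsg{priority, next_seq_++, std::move (payload)});
  }

  // Remove the most urgent, oldest message. Precondition: !empty().
  std::string dequeue ()
  {
    std::string p = q_.top ().payload;
    q_.pop ();
    return p;
  }

  bool empty () const { return q_.empty (); }

private:
  std::priority_queue<PrioritizedMsg, std::vector<PrioritizedMsg>,
                      LowerPriority> q_;
  unsigned long next_seq_ = 0;
};
```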

You can specify the queue type used by an ACE_Task during construction or by using the msg_queue() modifier. The queue types are listed in Table 12.2.

Table 12.2. Various Queue Types


12.4 Summary

The basic high-level ACE threading components provide an easy-to-use object interface to multithreaded programming. In this chapter, we explained how to use the ACE_Task objects to create new threads of control, how to use ACE_Mutex and ACE_Guard to ensure consistency, and how to use ACE_Condition and the ACE_Message_Queue, ACE_Message_Block, and ACE_Task message-passing constructs to incorporate communication between threads.
