Chapter 14. Thread Safety and Synchronization

We covered the basics of both thread safety and synchronization in Chapter 12, explaining how you can use mutexes and guards to ensure the safety of your global data and how condition variables can help two threads communicate events between each other synchronizing their actions. This chapter extends that discussion. In the safety arena, you will get to see readers/writer locks, the atomic operation wrapper, using tokens for fine-grain locking on data structures, and an introduction to recursive mutexes. We also introduce semaphores and barriers as new synchronization mechanisms.

14.1 Protection Primitives

To ensure consistency, multithreaded programs must use protection primitives around shared data. Some examples of protection were given in Chapter 12, where we introduced mutexes and guards. Here, we consider the remaining ACE-provided protection primitives.

Table 14.1 lists the primitives that are available in ACE. All these primitives have a common interface but do not share type relationships and therefore cannot be polymorphically substituted at runtime.

Table 14.1. Protection Primitives in ACE

image

14.1.1 Recursive Mutexes

If a thread acquires the same mutex twice, what do you think will happen: Will the thread block or not? If you think it won't, you are wrong. In most cases, the thread will block, deadlocking on itself! The reason is that in general, mutexes are unaware of the identity of the acquiring thread, thus making it impossible for the mutex to make the smart decision of not blocking.

If you are using a mutex from several intercalling methods, it sometimes becomes difficult to track whether a thread already has acquired a particular mutex. In the following code, the logger will deadlock on itself once it receives a critical log message; the guard has already acquired the mutex in the log() method and will then try to reacquire it from within logCritical():


typedef ACE_Thread_Mutex MUTEX;
class Logger
{
public:
  void log (LogMessage *msg)
  {
    ACE_GUARD (MUTEX, mon, mutex_);
    if (msg->priority () == LogMessage::CRITICAL)
      logCritical (msg);
  }

  void logCritical (LogMessage *msg)
  {
    // Acquires the same mutex as log()!
    ACE_GUARD(MUTEX, mon, mutex_);
  }

private:
  MUTEX mutex_;
};

static Logger logger;

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  CriticalLogMessage cm;
  logger.log(&cm);  // Will cause deadlock.
  return 0;
}

This situation can be avoided by using a recursive mutex. Recursive mutexes, unlike regular mutexes, allow the same thread to acquire the mutex multiple times without blocking on itself. To achieve this, we simply modify the initial typedef:


typedef ACE_Recursive_Thread_Mutex MUTEX;

The ACE_Recursive_Thread_Mutex has the same API as regular mutexes, making them readily substitutable wherever a regular mutex is used.

14.1.2 Readers/Writer Locks

A readers/writer lock allows many threads to hold the lock simultaneously for reading, but only one thread can hold it for writing. This can provide efficiency gains, especially if the data being protected is frequently read by multiple threads but is infrequently written or is written to by only a few threads. In most cases, readers/writer locks are slower than mutexes; therefore, it important for you to apply them only when contention on reads is much higher than on writes.

ACE includes readers/writer locks in the form of the ACE_RW_Mutex and ACE_RW_Thread_Mutex classes. To acquire these mutexes for reading, you need to call acquire_read(); to acquire the mutex for writing, you must call acquire_write().

Let's say that our home network includes a network discovery agent that keeps track of all the devices currently connected on the home network. The agent keeps a list of all devices that are currently on the network, and clients can ask it whether a current device is currently present. Let's also say, for this example's sake, that this list could be very long and that devices are added or removed from the network infrequently. This gives us a good opportunity to apply a readers/writer lock, as the list is traversed, or read, much more than it is modified. We use the guard macros we saw in Chapter 12 to ensure that the acquire_read()/ release() and acquire_write()/release() are called in pairs; that is, we don't forget to call release() after we are done with the mutex:


class HA_DiscoveryAgent
{
public:
  void add_device (Device *device)
  {
    ACE_WRITE_GUARD (ACE_RW_Thread_Mutex, mon, rwmutex_);
    list_add_item_i (device);
  }
  void remove_device (Device *device)
  {
    ACE_READ_GUARD (ACE_RW_Thread_Mutex, mon, rwmutex_);
    list_remove_item_i(device);
  }

  int contains_device (Device *device)
  {
    ACE_READ_GUARD_RETURN
      (ACE_RW_Thread_Mutex, mon, rwmutex_, -1);
    return list_contains_item_i (device);
  }

private:
  void list_add_item_i (Device * device);
  int list_contains_item_i (Device * device);
  void list_remove_item_i (Device* device);

private:
  DeviceList deviceList_;
  ACE_RW_Thread_Mutex rwmutex_;
};

14.1.3 Atomic Operation Wrapper

On most machine architectures, changes to basic types are atomic; that is, an increment of an integer variable does not require the use of synchronization primitives. However, on most multiprocessors, this is not true and is dependent on the memory ordering properties of the machine. If the machine memory is strongly ordered, modifications made to memory on one processor are immediately visible to the other processors and so synchronization is not required around global variables; otherwise, it is.

To help achieve transparent synchronization around basic types, ACE provides the ACE_Atomic_Op template wrapper. This class overloads all basic operations and ensures that a synchronization guard is used before the operation is performed.

To illustrate the use of ACE_Atomic_Op, we will solve the classic producer/consumer problem using busy waiting. To implement our solution, we create a producer and a consumer task. Both tasks share a common buffer: The items that are produced are put into the buffer by the producer, and the consumer in turn picks up these items. To ensure that the producer doesn't overflow the buffer and that the consumer doesn't underflow, we use in and out counters. Whenever the producer adds an item, it increments the in counter by 1; similarly, when the client consumes an item, it increments the out counter by 1. The producer can produce only if (in – out) is not equal to the buffer size—the buffer isn't full—otherwise, it must wait. The client can consume only if (in – out) is not zero—the buffer isn't empty. Let's also assume that the program is running on a multiprocessor on which memory is not strongly ordered. This means that if we use regular integers as our in and out counters, the increments will not be consistent; the producer may increment the in count, but the client will not see the increment, and vice versa. To ensure consistency, we first need to create a safe unsigned int type that we can use for the counters in our example:


typedef ACE_Atomic_Op<ACE_Thread_Mutex, unsigned int> SafeUInt;

The data passed between consumer and producer is an integer that, let's imagine for the example's sake, must be safely incremented to be produced correctly. We create a safe int type as:


typedef ACE_Atomic_Op<ACE_Thread_Mutex, int> SafeInt;

The producer and consumer tasks share the common production buffer and the in and out counter types, which we create on the main stack and pass by reference to each of the tasks. The producer uses a SafeInt, itemNo as the production variable, which it increments on each production and then adds to the buffer. We add a termination condition to each one of the threads such that after producing and consuming a fixed number of items, both threads terminate gracefully. To read the actual value stored in all the ACE_Atomic_Op objects, we use the value() accesor throughout the program:


class Producer : public ACE_Task_Base
{
public:
  Producer (int *buf, SafeUInt &in, SafeUInt &out)
    : buf_(buf), in_(in), out_(out)
  { }

  int svc (void)
  {
    SafeInt itemNo = 0;
    while (1)
      {
        // Busy wait.
        do
          { }
        while (in_.value () - out_.value () == Q_SIZE);

        itemNo++;
        buf_[in_.value () % Q_SIZE] = itemNo.value ();
        in_++;

        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Produced %d "),
                    itemNo.value ()));

        if (check_termination (itemNo.value ()))
          break;
      }

    return 0;
  }

  int check_termination (int item)
  {
    return (item == MAX_PROD);
  }

private:
  int * buf_;
  SafeUInt& in_;
  SafeUInt& out_;
};

class Consumer : public ACE_Task_Base
{
public:
  Consumer (int *buf, SafeUInt &in, SafeUInt& out)
    : buf_(buf), in_(in), out_(out)
  { }

  int svc (void)
  {
    while (1)
      {
        int item;

        // Busy wait.
        do
          { }
        while (in_.value () - out_.value () == 0);

        item = buf_[out_.value () % Q_SIZE];
        out_++;

        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Consumed %d "),
                    item));

        if (check_termination (item))
          break;
      }

    return 0;
  }

  int check_termination (int item)
  {
    return (item == MAX_PROD);
  }

private:
  int * buf_;
  SafeUInt& in_;
  SafeUInt& out_;
};

The threads themselves are created as usual on the main stack:


int ACE_TMAIN (int, ACE_TCHAR *[])
{
  int shared_buf[Q_SIZE];
  SafeUInt in = 0;
  SafeUInt out = 0;

  Producer producer (shared_buf, in, out);
  Consumer consumer (shared_buf, in, out);

  producer.activate();
  consumer.activate();
  producer.wait();
  consumer.wait();

  return 0;
}

14.1.4 Token Management

So far, we have been using synchronization and protection primitives on simplistic resources. In many cases, however, global resources are more complex structures, such as records in a memory-resident table or tree. Multiple threads act on individual records on the table by first obtaining the records, making sure that they have exclusive access to the record, making the desired modification, and then releasing the record. A simplistic solution to this problem is to create a separate lock structure that maintains a lock for each record that is to be managed as a unit. However, many complex issues come into play, such as how the lock structure is managed efficiently or how one avoids or detects deadlock situations, and what happens if the record table also needs to be accessed by remote threads.

ACE provides a framework solution that solves all these problems: the Token framework. Each record has associated with it an ACE_Token lock that internally maintains a strict FIFO ordering of threads that are waiting on the token. Before using a record, you acquire the token; after the modification, you release the token to the next waiter in line. Furthermore, a token manager is used to handle the tokens themselves, managing the creation, reference counting, and deletion of tokens.

The following example illustrates the use of tokens to provide record-level locking on a fixed-size table of device records that are maintained for our automated household. The records are kept within an HA_Device_Repository that supports update operations. We will assume that a device can be updated only by one client at any particular instant of time. To enforce this invariant, we create a token, an ACE_Local_Mutex, for every device that must first be acquired by a thread before it can perform an update. Once the token has been acquired, the thread can go into the device table, obtain the appropriate device, and update it. After completing the modification, the token is released, and the next thread in line can obtain the token:


class HA_Device_Repository
{
public:

  enum { N_DEVICES = 100 };

  HA_Device_Repository ()
  {
    for (int i = 0; i < N_DEVICES; i++)
      tokens_[i] = new ACE_Local_Mutex (0, 0, 1);
  }
  ~HA_Device_Repository ()
  {
    for (int i = 0; i < N_DEVICES; i++)
      delete tokens_[i];
  }

  int update_device (int device_id, char *commands)
  {
    this->tokens_[device_id]->acquire ();

    Device *curr_device = this->devices_[device_id];
    internal_do (curr_device);

    this->tokens_[device_id]->release ();

    return 0;
  }

  void internal_do (Device *device);

private:
  Device *devices_[N_DEVICES];
  ACE_Local_Mutex *tokens_[N_DEVICES];
  unsigned int seed_;
};

To illustrate the device repository, we modify our HA_CommandHandler task so that it invokes update_device() on the repository, using multiple threads:


class HA_CommandHandler : public ACE_Task_Base
{
public:
  enum { N_THREADS = 5 };

  HA_CommandHandler (HA_Device_Repository &rep) : rep_(rep)
  { }

  int svc (void)
  {
    for (int i = 0; i < HA_Device_Repository::N_DEVICES; i++)
      rep_.update_device (i, "");
    return 0;
  }

private:
  HA_Device_Repository &rep_;
};

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  HA_Device_Repository rep;
  HA_CommandHandler handler (rep);
  handler.activate (THR_NEW_LWP | THR_JOINABLE,
                    HA_CommandHandler::N_THREADS);
  handler.wait ();
  return 0;
}

One of the nice features that the ACE Token framework provides is deadlock detection, which comes in handy to make sure that the algorithms you are designing for your system do not cause deadlock. To illustrate a deadlock situation and the ACE deadlock detection features, we create two tasks, each running a single thread. Both tasks share two resources—resource1 and resource2—that are protected by named tokens of the same name. Unlike the previous example, in which we precreated and shared unnamed tokens, this example uses the framework to manage the tokens for us by name. This allows both threads to create an ACE_Local_Mutex named resource1, secure in the knowledge that they are indeed sharing a single token. The mutex1 object acts as a proxy to a shared token for resource1. The shared token itself is reference counted; when both mutex1 objects for threads one and two go out of scope, the token is deleted:


class ThreadOne : public ACE_Task_Base
{
public:
  virtual int svc (void)
  {
    ACE_Local_Mutex mutex1 ("resource1",
                            0, // Deadlock detection enabled.
                            1);// Debugging enabled.
    mutex1.acquire ();
    ACE_OS::sleep (2);
    ACE_Local_Mutex mutex2 ("resource2", 0, 1);
    mutex2.acquire ();
    return 0;
  }
};

class ThreadTwo : public ACE_Task_Base
{
public:
  virtual int svc (void)
  {
    ACE_Local_Mutex mutex2 ("resource2",
                            0, // Deadlock detection enabled.
                            1);// Debugging enabled.
    mutex2.acquire ();
    ACE_OS::sleep (2);
    ACE_Local_Mutex mutex1 ("resource1",
                            0, // Deadlock detection enabled.
                            1);// Debugging enabled.
    mutex1.acquire ();
    return 0;
  }
};

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ThreadOne t1;
  ThreadTwo t2;

  t1.activate ();
  ACE_OS::sleep (1);
  t2.activate ();
  t1.wait ();
  t2.wait ();

  return 0;
}

Resource acquisitions are forced to occur in the following order:

• Thread one acquires a lock on resource1

• Thread two acquires a lock on resource2

• Thread one blocks trying to acquire resource2

• Thread two blocks trying to acquire resource1

This order eventually leads to a deadlock situation in which thread one is waiting for resource2 and can't get it because thread two has it and thread two wants resource1 but can't get it because thread one has it. Both threads end up waiting for each other forever.

Note that when we created the ACE_Local_Mutex, we specified that we did not want deadlock detection to be ignored and wanted debugging on. This causes ACE to detect the deadlock; when it occurs, the acquire() call on the local mutex fails, setting errno to EDEADLOCK. Enabling debugging causes the following output to be displayed when the program is run:


(3224) acquired resource1
(1192) acquired resource2
(3224) waiting for resource2, owner is /USYYID/612/1192, total waiters == 2
(1192) Deadlock detected.
/USYYID/612/1192 owns resource2 and is waiting for resource1.
/USYYID/612/3224 owns resource1 and is waiting for resource2.

Note that although it can detect deadlock, the Token framework cannot prevent it. An appropriate prevention scheme must be devised by the application programmer; this could, for example, include a try and back-off strategy. That is, if deadlock is detected, back off releasing all your own locks and start acquiring them all over again.

14.2 Thread Synchronization

Synchronization is a process by which you can control the order in which threads execute to complete a task. We have already seen several examples of this; we often try to order our code execution such that the main thread doesn't exit before the other threads are done. In fact, the protection code in the previous section also has a similar flavor to it; we try to control the order of the threads as they access shared resources. However, we believe that synchronization and protection are sufficiently different to warrant separate sections.

In many instances, you want to make sure that one thread executes before the other or some other specific ordering is needed. For example, you might want a background thread to execute if a certain condition is true, or you may want one thread to signal another thread that it is time for it to go. ACE provides a number of primitives that are specifically designed for these purposes. We list them in Table 14.2.

Table 14.2. ACE Synchronization Primitives

image

14.2.1 Using Semaphores

A semaphore is a non-negative integer count that is used to coordinate access among multiple resources. Acquiring a semaphore causes the count to decrement, and releasing a semaphore causes the count to increment. If the count reaches 0—no resources left—and a thread tries to acquire the semaphore, it will block until the semaphore count is incremented to a value that is greater than 0. This happens when another thread releases on the semaphore. A semaphore count will never be negative. When using a semaphore, you initialize the semaphore count to a non-negative value that represents the number of resources you have.

Mutexes assume that the thread that acquires the mutex will also be the one that releases it. Semaphores, on the other hand, are usually acquired by one thread but released by another. It is this unique characteristic that allows one to use semaphores so effectively.

Semaphores are probably the most versatile synchronization primitive provided by ACE. In fact, a binary semaphore can be used in place of a mutex, and a regular semaphore can be used in most places where a condition variable is being used.

Once again, we implement a solution to the producer/consumer problem, this time using counting semaphores. We use the built-in ACE_Message_Queue held by the consumer as the shared data buffer between the producer and consumer. Using semaphores, we control the maximum number of messages the message queue can have at any particular instant. In other words, we implement a high-water-mark mechanism; the message queue already has this functionality built in; we essentially reproduce it in an inefficient way for the sake of the example. To make things interesting, the consumer task has multiple threads.

The Producer produces ACE_Message_Blocks that it enqueues on the consumer task's message queue. Each block contains an integer identifier as the only payload. When the producer is done producing, it sends a hang-up message to the consumer, making it shut down.

Before the producer can produce any items, it must acquire the producer semaphore, psema_, which represents the maximum capacity of the consumer's message queue, or the high-water mark. The initial count value for psema_ is set to the high water mark to begin with. Each time psema_ is acquired, this number is atomically decremented. Finally, when this count becomes 0, the producer cannot enqueue more elements on the consumer's queue:


class Producer : public ACE_Task_Base
{
public:
  enum { MAX_PROD = 128 };

  Producer (ACE_Semaphore& psema, ACE_Semaphore& csema,
            Consumer &consumer)
      : psema_(psema), csema_(csema), consumer_(consumer)
  { }

  int svc (void)
  {
    for (int i = 0; i <= MAX_PROD; i++)
      produce_item (i);
    hang_up ();
    return 0;
  }

  void produce_item (int item)
  {
    psema_.acquire ();
    ACE_Message_Block *mb
      = new ACE_Message_Block (sizeof (int),
                               ACE_Message_Block::MB_DATA);
    ACE_OS::memcpy (mb->wr_ptr (), &item, sizeof item);
    mb->wr_ptr (sizeof (int));
    this->consumer_.putq (mb);

    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Produced %d "), item));
    csema_.release();
  }

  void hang_up ()
  {
    psema_.acquire ();
    ACE_Message_Block *mb =
      new ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP);
    this->consumer_.putq (mb);
    csema_.release ();
  }

private:
  ACE_Semaphore& psema_;
  ACE_Semaphore& csema_;
  Consumer& consumer_;
};

The csema_ semaphore, on the other hand, is initialized to 0 in the beginning; its value is incremented, by calling csema_.release(), by the producer every time it produces a new element. If the consumer threads are blocked on an acquire() of the csema_ variable, one thread will wake up each time the producer calls release(), causing the woken thread to consume the new element.

Once a consumer thread unblocks, it consumes the element from the queue and then calls release() on psema_, the producer semaphore, as the consumption has freed up a space in the shared message queue. This increments the psema_ count by 1, allowing the producer to continue and use up the free space in the queue:


class Consumer : public ACE_Task<ACE_MT_SYNCH>
{
public:
  enum { N_THREADS = 5 };

  Consumer (ACE_Semaphore& psema, ACE_Semaphore& csema)
    : psema_(psema), csema_(csema), exit_condition_(0)
  { }

  int svc (void)
  {
    while (!is_closed ())
      consume_item ();
    return 0;
  }

  void consume_item ()
  {
    csema_.acquire ();
    if (!is_closed ())
      {
        ACE_Message_Block *mb;
        this->getq (mb);
        if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
          {
            shutdown ();
            mb->release ();
            return;
          }
        else
          {
            ACE_DEBUG ((LM_DEBUG,
                        ACE_TEXT ("(%t) Consumed %d "),
                        *((int*)mb->rd_ptr ())));
            mb->release();
          }
        psema_.release ();
      }
  }

  void shutdown (void)
  {
    exit_condition_ = 1;
    this->msg_queue ()->deactivate ();
    csema_.release (N_THREADS);
  }

  int is_closed (void)
  {
    return exit_condition_;
  }

private:
  ACE_Semaphore& psema_;
  ACE_Semaphore& csema_;
  int exit_condition_;
};

Shutdown of the consumer task is a little complicated because of the multiple threads that are running in it. Because the producer sends only a single hang-up message to the consumer task, only one thread will receive the hangup; the others are blocked, either waiting for messages on the queue or on the consumer semaphore. Therefore, the consumer thread that receives the hangup must mark the state of the task as closed and then wake up all the other threads so that they notice the closed condition and exit. The consumer's shutdown() method does this by first setting the exit_condition_ to true and then waking up all the threads on the message queue by calling deactivate() on it and then decrementing the semaphore count by the number of consumer threads, thereby releasing all the threads waiting on the semaphore:


int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_Semaphore psem (5);
  ACE_Semaphore csem (0);

  Consumer consumer (psem, csem);
  Producer producer (psem, csem, consumer);

  producer.activate ();
  consumer.activate (THR_NEW_LWP | THR_JOINABLE,
                     Consumer::N_THREADS);

  producer.wait ();
  consumer.wait ();

  return 0;
}

The code here is pretty much boilerplate, except for the initialization of the two semaphores psem and csem. Once again, note that psem is initialized to the maximum number of allowable elements on the queue, five, indicating that the producer can produce until the queue is full. On the other hand, csem is initialized to 0, indicating that the consumer can't consume at all until the producer calls release on csem.

14.2.2 Using Barriers

Barriers have a role very similar to their name. A group of threads can use a barrier to collectively synchronize with one another. In essence, each thread calls wait() on the barrier when it has reached some well-known state and then blocks, waiting for all other participating threads to call wait(), indicating that they too have reached the mutual state. Once they have reached the barrier, all the threads unblock and continue together.

The ACE barrier component is ACE_Barrier. The constructor of this class takes as its first argument a count of the number of threads that are synchronizing with the barrier. The following example illustrates how barriers can be used to ensure that threads in our home automation command handler task start up and shut down together. To achieve this, we start by creating two barriers: a startup_barrier and a shutdown_barrier. Each barrier is passed a count of the number of threads running in the handler task. The barriers are then passed to the handler by reference:


int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_Barrier startup_barrier (HA_CommandHandler::N_THREADS);
  ACE_Barrier shutdown_barrier (HA_CommandHandler::N_THREADS);

  HA_CommandHandler handler (startup_barrier, shutdown_barrier);
  handler.activate (THR_NEW_LWP | THR_JOINABLE,
                    HA_CommandHandler::N_THREADS);
  handler.wait ();
  return 0;
}

When a new handler thread is created, it enters the svc() method and initializes itself by calling initialize_handler(), after which it calls wait() on the start-up barrier. This blocks the thread on the barrier until all the other handler threads initialize themselves and also call wait() on the start-up barrier. After completing start-up, the handler begins handling command requests by calling handle_command_requests(). When the handler receives the command to shut down, handle_command_requests() returns –1, and the threads block on the shutdown barrier. Once all threads reach this barrier, they all exit together:


class HA_CommandHandler : public ACE_Task<ACE_MT_SYNCH>
{
public:
  enum { N_THREADS = 5 };

  HA_CommandHandler (ACE_Barrier& startup_barrier,
                     ACE_Barrier &shutdown_barrier)
    : startup_barrier_(startup_barrier),
      shutdown_barrier_(shutdown_barrier)
    { }
  void initialize_handler (void);
  int handle_command_requests (void);

  int svc (void)
  {
    initialize_handler ();
    startup_barrier_.wait ();
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t: %D) Started ")));

    while (handle_command_requests () > 0)
      ;

    shutdown_barrier_.wait ();
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t: %D) Ended ")));

    return 0;
  }

private:
  ACE_Barrier& startup_barrier_;
  ACE_Barrier& shutdown_barrier_;
};

For illustrative purposes, we set up the initialize_handler() method and handle_command_requests() method to sleep for random time periods, thus ensuring that each handler thread takes a different time period to initialize and to shut down. The barriers will ensure that they all start up and exit at the same time; we display the start-up and shut-down times for each thread by using the %D current timestamp format specifier for the ACE_DEBUG macro. The resultant output shows that start-up and shut-down times are indeed in synch for all threads:


(764: 08/12/2001 23.40.11.214000) Started
(720: 08/12/2001 23.40.11.214000) Started
(1108: 08/12/2001 23.40.11.214000) Started
(1168: 08/12/2001 23.40.11.214000) Started
(1080: 08/12/2001 23.40.11.214000) Started
(1168: 08/12/2001 23.40.11.334000) Ended
(720: 08/12/2001 23.40.11.334000) Ended
(764: 08/12/2001 23.40.11.334000) Ended
(1108: 08/12/2001 23.40.11.334000) Ended
(1080: 08/12/2001 23.40.11.334000) Ended

14.3 Thread-Specific Storage

When you create a thread, all you create is a thread stack, a signal mask, and a task control block for the new thread. The thread carries no other state information on creation. Nonetheless, it is convenient to be able to store state information that is specific to a thread: thread-specific storage (TSS), or thread local storage.

One classic example of the use of TSS is with the application global errno. The global errno is used to indicate the most recent error that has occurred within an application. When multithreaded programs first appeared on UNIX platforms, errno posed a problem. Different errors can occur within different threads of the application, so if errno is global, which error would it hold? Putting errno in thread-specific storage solves this problem. Logically, the variable is global but is kept in TSS.

Thread-specific storage should be used for all objects that you consider to be part and parcel of a thread. ACE uses TSS internally to store a thread-specific output stream for the purposes of logging. By doing this, no locks are required on any of the logging streams, as only the owning thread can access the TSS stream. This helps keeps the code efficient and lightweight.

The ACE_Thread class provides access to the low-level OS TSS methods, but in most cases, you can avoid these tedious APIs by using the ACE_TSS class template. This class is very simple to use. You simply pass it the data you want to be stored in TSS as its template parameter and then use the operator->() method to access the data when you need it. The operator->() creates and stores the data in TSS when called the first time. The destructor for ACE_TSS ensures that the TSS data is properly removed and destroyed.

One useful application of TSS is the addition of a context object that can hold information specific to the current client that is using the thread. For example, if you were to use a thread-per-connection model, you could keep connection-specific information within TSS, where you can get to it easily and efficiently. At least one database server we know keeps transactional context within TSS for each connection it hands out.

We can use the same idea to improve our home automation command handler. The handler keeps a generic-client context object in TSS. The context can then hold any arbitrary, named tidbit of information, using an internal attribute map:


// Client-specific context information.
class ClientContext
{
public:
  void *get_attribute (const char *name);
  void set_attribute (const char *name, void *value);

private:
  Map attributeMap_;
};

To store this object in TSS, all you need to do is wrap the object within the ACE_TSS template. The first access causes an instance of the type ClassContext to be created on the heap and stored within thread-specific storage. For illustrative purposes, we store the current thread ID in the context, only to obtain it at a later point and display it:


class HA_CommandHandler : public ACE_Task<ACE_MT_SYNCH>
{
public:
  virtual int svc (void)
  {
    ACE_thread_t tid = this->thr_mgr ()->thr_self ();
    // Set our identifier in TSS.
    this->tss_ctx_->set_attribute ("thread_id", &tid);

    while (handle_requests () > 0)
      ;

    return 0;
  }

  int handle_requests (void)
  {
    ACE_thread_t *tid =
      (ACE_thread_t*)this->tss_ctx_->get_attribute ("thread_id");
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) TSS TID: %d "),
                *tid));

    // do work.
    return -1;
  }

private:
  ACE_TSS<ClientContext> tss_ctx_;
};

14.4 Summary

In this chapter, we covered some of the more advanced synchronization and protection primitives that are available from the ACE toolkit. We discussed recursive mutex locks, using the ACE_Recursive_Mutex and ACE_Recursive_Thread_Mutex classes, readers/writer locks with ACE_RW_Mutex and ACE_RW_Thread_Mutex, introduced the Token framework with ACE_Local_Mutex, and talked about enforcing atomic arithmetic operations on multiprocessors with the ACE_Atomic_Op wrapper as new protection primitives. We also introduced several new synchronization primitives, including counting semaphores with ACE_Semaphore and barriers with ACE_Barrier. Finally, we introduced thread-specific storage as an efficient means to get around protection with thread-specific data and the ACE_TSS wrapper.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.143.239