Chapter 16. Thread Pools

Most network servers are designed to handle multiple client requests simultaneously. As we have seen, there are multiple ways to achieve this, including reactive event handling, multiple processes, and multiple threads. When building a multithreaded server, many design options are available, including

• Spawning a new thread for each request

• Spawning a new thread for each connection/session

• Prespawning a managed pool of threads, or creating a thread pool

In this chapter, we explore thread pools in some detail. We start by defining what we mean by a thread pool and what advantages it provides over some of the other approaches to building multithreaded servers. We then go into some detail about various types of thread pools and their performance characteristics, illustrating a few types with example pools that are built using the ACE components. Finally, we look at two ACE_Reactor implementations that can use thread pools for concurrent event handling.

16.1 Understanding Thread Pools

In the thread pool model, we do not create a new thread for each session or request and then destroy it; rather, we prespawn all the threads we are going to use and keep them around in a pool. This bounds the server's resource usage, so you as a developer always know how many threads are running in the server.

Contrast this with the thread-per-request model, in which a new thread is created for each request. If it receives a large spurt of requests in a short period of time, the server will spawn a large number of threads to handle the load, degrading service for all requests and possibly causing resource allocation failures as the load increases. In the thread pool model, when a request arrives, a thread is chosen from the pool to handle it; if the pool is empty when the request arrives, the request is enqueued until one of the worker threads returns to the pool.

The thread pool model has several variants, each with different performance characteristics:

Half-sync/half-async model. In this model, a single listener thread asynchronously receives requests and buffers them in a queue. A separate set of worker threads synchronously processes the requests.

Leader/followers model. In this model, a single thread is the leader and the rest are followers in the thread pool. When a request arrives, the leader picks it up, selects one of the followers as the new leader, and then continues to process the request. Thus in this case, the thread that receives the request is also the one that handles it.

16.2 Half-Sync/Half-Async Model

This model breaks the thread pool into three separate layers:

  1. The asynchronous layer, which receives the asynchronous requests
  2. The queueing layer, which buffers the requests
  3. The synchronous layer, which contains several threads of control blocked on the queueing layer

When the queueing layer indicates that there is new data, the synchronous layer handles the request synchronously in a separate thread of control. This model has the following advantages.

• The queueing layer can help handle bursty clients; if no threads are available to handle a request, the request is simply buffered in the queueing layer.

• The synchronous layer is simple and independent of any asynchronous processing details. Each synchronous thread blocks on the queueing layer, waiting for a request.

This model also has disadvantages.

• Because a thread switch occurs at the queueing layer, we must incur synchronization and context switch overhead. Furthermore, on a multiprocessor machine, we may experience data copying and cache coherency overhead.

• We cannot keep any request information on the stack or in thread-specific storage, as the request is processed in a different worker thread.

Let's go through a simple example implementation of this model. In this example, the asynchronous layer is implemented as an ACE_Task subclass, Manager, which receives requests on its underlying message queue. Each worker thread is implemented by the Worker class, which is also an ACE_Task derivative. When it receives a request, the Manager picks a Worker object from the worker thread pool and enqueues the request on the Worker object's underlying ACE_Message_Queue. This queue acts as the queueing layer between the asynchronous Manager and the synchronous Worker class.
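The Manager implements an IManager interface, through which workers report back for more work. The chapter's listings don't show this interface; a minimal sketch consistent with how it is used here (an assumption, not the book's exact code) might be:


class Worker;   // Defined below.

// Workers call return_to_work() when ready for another request.
class IManager
{
public:
  virtual ~IManager (void) { }
  virtual int return_to_work (Worker *worker) = 0;
};
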

First, let's look at the Manager task:


class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager
{
public:
  enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};

  Manager ()
    : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
  {
    ACE_TRACE (ACE_TEXT ("Manager::Manager"));
  }

  int svc (void)
  {
    ACE_TRACE (ACE_TEXT ("Manager::svc"));

    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started ")));

    // Create pool.
    create_worker_pool ();

    while (!done ())
      {
        ACE_Message_Block *mb = NULL;
        ACE_Time_Value tv ((long)MAX_TIMEOUT);
        tv += ACE_OS::time (0);

        // Get a message request.
        if (this->getq (mb, &tv) < 0)
          {
            shut_down ();
            break;
          }

        // Choose a worker.
        Worker *worker;
        {
          ACE_GUARD_RETURN (ACE_Thread_Mutex,
                            worker_mon, this->workers_lock_, -1);

          while (this->workers_.is_empty ())
            workers_cond_.wait ();

          this->workers_.dequeue_head (worker);
        }

        // Ask the worker to do the job.
        worker->putq (mb);
      }

    return 0;
  }

  int shut_down (void);

  ACE_thread_t thread_id (Worker *worker);

  virtual int return_to_work (Worker *worker)
  {
    ACE_GUARD_RETURN (ACE_Thread_Mutex,
                      worker_mon, this->workers_lock_, -1);
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Worker %d returning to work. "),
                worker->thr_mgr ()->thr_self ()));
    this->workers_.enqueue_tail (worker);
    this->workers_cond_.signal ();

    return 0;
  }


private:
  int create_worker_pool (void)
  {
    ACE_GUARD_RETURN (ACE_Thread_Mutex,
                      worker_mon,
                      this->workers_lock_,
                      -1);
    for (int i = 0; i < POOL_SIZE; i++)
      {
        Worker *worker;
        ACE_NEW_RETURN (worker, Worker (this), -1);
        this->workers_.enqueue_tail (worker);
        worker->activate ();
      }

    return 0;
  }

  int done (void);

private:
  int shutdown_;
  ACE_Thread_Mutex workers_lock_;
  ACE_Condition<ACE_Thread_Mutex> workers_cond_;
  ACE_Unbounded_Queue<Worker* > workers_;
};
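The shut_down() and done() methods are declared but not defined in this listing. A minimal sketch, assuming the MB_HANGUP shutdown convention that the Worker class (shown below) relies on and that Worker's full definition is visible at this point:


int Manager::done (void)
{
  return (shutdown_ == 1);
}

int Manager::shut_down (void)
{
  shutdown_ = 1;

  // For brevity, hang up only the currently idle workers; busy
  // workers would need the same treatment when they return to
  // the pool.
  ACE_GUARD_RETURN (ACE_Thread_Mutex, mon, this->workers_lock_, -1);
  Worker *worker = 0;
  while (this->workers_.dequeue_head (worker) == 0)
    {
      ACE_Message_Block *mb = 0;
      ACE_NEW_RETURN
        (mb, ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP), -1);
      worker->putq (mb);
    }

  return 0;
}
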

First, note that the Manager has an ACE_Unbounded_Queue of Worker objects. This represents the worker thread pool, which is protected by the workers_lock_ mutex. When it starts in its svc() method, the Manager thread first creates the underlying worker thread pool and then blocks on its underlying message queue waiting for requests.

When a request does arrive, the Manager dequeues the request and then dequeues the first worker off the worker pool and enqueues the request onto its message queue. Note that if no workers are available, perhaps because they are all busy processing messages, we block on the workers_cond_ condition variable, waiting for a thread to return to work. When a thread does return to work, it enqueues itself back on the worker queue and notifies the Manager by signaling the condition variable. The Manager thread then wakes up and hands the new request off to the worker thread that has just returned to the pool:



class Worker : public ACE_Task<ACE_MT_SYNCH>
{
public:
  Worker (IManager *manager) : manager_(manager)
  { }

  virtual int svc (void)
  {
    thread_id_ = ACE_Thread::self ();
    while (1)
      {
        ACE_Message_Block *mb = 0;
        if (this->getq (mb) == -1)
          ACE_ERROR_BREAK
            ((LM_ERROR, ACE_TEXT ("%p "), ACE_TEXT ("getq")));
        if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
          {
            ACE_DEBUG ((LM_INFO,
                        ACE_TEXT ("(%t) Shutting down ")));
            mb->release ();
            break;
          }

        // Process the message.
        process_message (mb);

        // Return to work.
        this->manager_->return_to_work (this);
      }

    return 0;
  }

  ACE_thread_t thread_id (void)
  {
    return thread_id_;
  }

private:
  IManager *manager_;
  ACE_thread_t thread_id_;
};

The worker thread sits in an infinite loop waiting for a request to arrive on its queue. When a request does arrive, the worker thread picks it up and then processes it. Once the processing completes, the worker thread notifies its Manager, where it is once again enqueued in the worker thread pool.
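A driver for this pool is not shown in the chapter; a minimal sketch (names and sizes are illustrative) that activates the Manager and feeds it work might look like this:


int ACE_TMAIN (int, ACE_TCHAR *[])
{
  Manager mgr;
  mgr.activate ();   // Spawn the manager thread; it creates the pool.

  // Each message block represents one request.
  for (int i = 0; i < 10; i++)
    {
      ACE_Message_Block *mb = 0;
      ACE_NEW_RETURN (mb, ACE_Message_Block (128), -1);
      mgr.putq (mb);
    }

  // The manager shuts the pool down after its receive timeout
  // expires with no new work; wait for all threads to finish.
  ACE_Thread_Manager::instance ()->wait ();
  return 0;
}
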

16.2.1 Taking Advantage of ACE_Task

The previous example illustrated how to build a half-sync/half-async thread pool from available ACE components. Often, however, you can take advantage of the multithreaded queueing capability of ACE_Task to considerably simplify the implementation of such a thread pool. Making use of this capability yields the following revised Manager class:



class Manager : public ACE_Task<ACE_MT_SYNCH>
{
public:
  enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};

  Manager () : shutdown_(0)
  {
    ACE_TRACE (ACE_TEXT ("Manager::Manager"));
  }

  int svc (void)
  {
    ACE_TRACE (ACE_TEXT ("Manager::svc"));

    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started ")));

    // Create pool.
    Workers pool;
    pool.activate (THR_NEW_LWP | THR_JOINABLE, POOL_SIZE);

    while (!done ())
      {
        ACE_Message_Block *mb = 0;
        ACE_Time_Value tv ((long)MAX_TIMEOUT);
        tv += ACE_OS::time (0);

        // Get a message request.
        if (this->getq (mb, &tv) < 0)
          {
            pool.msg_queue ()->deactivate ();
            pool.wait ();
            break;
          }

        // Ask the worker pool to do the job.
        pool.putq (mb);
      }

    return 0;
  }

private:
  int done (void);

  int shutdown_;
};

That's it. All of the locking and condition variable logic is contained in the ACE_Message_Queue class. We also illustrated another way to notify the threads in the worker pool that it's time to shut down: we used the deactivate() method to shut down the ACE_Message_Queue. Deactivating the queue wakes up all threads blocked on it and makes subsequent enqueue and dequeue operations fail immediately with errno set to ESHUTDOWN.

The revised Workers class shows how to respond to the queue shutting down, as well as the simpler logic resulting from fuller use of the ACE_Message_Queue class's capabilities:


class Workers : public ACE_Task<ACE_MT_SYNCH>
{
public:
  Workers ()
  { }

  virtual int svc (void)
  {
    while (1)
      {
        ACE_Message_Block *mb = 0;
        if (this->getq (mb) == -1)
          {
            ACE_DEBUG ((LM_INFO,
                        ACE_TEXT ("(%t) Shutting down ")));
            break;
          }

        // Process the message.
        process_message (mb);
      }

    return 0;
  }
};

Remember that all the threads in the pool are executing in the same Workers object. The ACE_Message_Queue class contains all the logic necessary to safely and fairly dequeue message blocks for processing.

16.2.2 Using an Activation Queue and Futures

In the previous example, we kept things simple by using plain message blocks as the unit of work. We also assumed that the client thread that enqueues work on the Manager's queue does not require any results. Although both of these conditions often hold, sometimes a more object-oriented solution is appropriate: using method requests as the unit of work and futures as results.
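Before diving in, here is ACE_Future in a nutshell: the producer calls set() exactly once, and consumers either block in get() or attach an ACE_Future_Observer to be called back when the value arrives. A minimal sketch (the type and values are chosen purely for illustration):


#include "ace/Future.h"

void future_in_a_nutshell (void)
{
  ACE_Future<int> answer;

  answer.set (42);      // Producer side: publish the result once.

  int value = 0;
  answer.get (value);   // Consumer side: returns at once here; it
                        // would block if set() had not been called.
}
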

In this next example, we add these features to our previous example. Let's begin with a new Method Request object that expects to return a string as the result of the operation:


class LongWork : public ACE_Method_Request
{
public:
  virtual int call (void)
  {
    ACE_TRACE (ACE_TEXT ("LongWork::call"));
    ACE_DEBUG
      ((LM_INFO, ACE_TEXT ("(%t) Attempting long work task ")));
    ACE_OS::sleep (1);

    char buf[1024];
    ACE_OS::strcpy (buf, "Completed assigned task ");
    ACE_CString *msg;
    ACE_NEW_RETURN
      (msg, ACE_CString (buf, ACE_OS::strlen (buf) + 1), -1);
    result_.set (msg);
    return 0;
  }

  ACE_Future<ACE_CString*> &future (void)
  {
    ACE_TRACE (ACE_TEXT ("LongWork::future"));
    return result_;
  }

  void attach (CompletionCallBack *cb)
  {
    result_.attach (cb);
  }

private:
  ACE_Future<ACE_CString*> result_;
};

Next, let's add a Future Observer that will automatically be called back when the result of the LongWork operation is available. Note that the observer's update() method runs in whichever thread calls set() on the future—here, a worker thread:



class CompletionCallBack: public ACE_Future_Observer<ACE_CString*>
{
public:
  virtual void update (const ACE_Future<ACE_CString*> & future)
  {
    ACE_CString *result;

    // Copy the future to drop const (copies share the same result),
    // then block until the result is available.
    ((ACE_Future<ACE_CString*>)future).get (result);
    ACE_DEBUG ((LM_INFO, ACE_TEXT("%C "), result->c_str ()));
    delete result;
  }
};

The Manager and Worker both need to be modified to use an ACE_Activation_Queue instead of an ACE_Message_Queue. Also, as it will no longer use the underlying message queue in ACE_Task, Manager will inherit from the lighter ACE_Task_Base class, which does not include the message queue. Worker, however, illustrates how to reuse the ACE_Task message queue inside an ACE_Activation_Queue:


class Worker: public ACE_Task<ACE_MT_SYNCH>
{
public:
  Worker (IManager *manager)
    : manager_(manager), queue_ (msg_queue ())
  { }

  int perform (ACE_Method_Request *req)
  {
    ACE_TRACE (ACE_TEXT ("Worker::perform"));
    return this->queue_.enqueue (req);
  }

  virtual int svc (void)
  {
    thread_id_ = ACE_Thread::self ();
    while (1)
      {
        ACE_Method_Request *request = this->queue_.dequeue();
        if (request == 0)
          return -1;

        // Invoke the request
        int result = request->call ();
        if (result == -1)
          break;

        // Return to work.
        this->manager_->return_to_work (this);
      }

    return 0;
  }

  ACE_thread_t thread_id (void);

private:
  IManager *manager_;
  ACE_thread_t thread_id_;
  ACE_Activation_Queue queue_;
};

The Worker class has not changed much, except that now, instead of dequeueing an ACE_Message_Block, the svc() method dequeues and then immediately invokes the Method Request object. Once the method request has returned, the worker thread returns to the worker thread pool that is held by the Manager:


class Manager : public ACE_Task_Base, private IManager
{
public:
  enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};

  Manager ()
    : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
  {
    ACE_TRACE (ACE_TEXT ("Manager::TP"));
  }

  int perform (ACE_Method_Request *req)
  {
    ACE_TRACE (ACE_TEXT ("Manager::perform"));
    return this->queue_.enqueue (req);
  }

  int svc (void)
  {
    ACE_TRACE (ACE_TEXT ("Manager::svc"));

    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started ")));

    // Create pool when you get in the first time.
    create_worker_pool ();

    while (!done ())
      {
        ACE_Time_Value tv ((long)MAX_TIMEOUT);
        tv += ACE_OS::time (0);

        // Get the next message
        ACE_Method_Request *request = this->queue_.dequeue (&tv);
        if (request == 0)
          {
            shut_down ();
            break;
          }

        // Choose a worker.
        Worker *worker = choose_worker ();

        // Ask the worker to do the job.
        worker->perform (request);
      }

    return 0;
  }

  int shut_down (void);

  virtual int return_to_work (Worker *worker)
  {
    ACE_GUARD_RETURN
      (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);
    ACE_DEBUG
      ((LM_DEBUG, ACE_TEXT ("(%t) Worker returning to work. ")));
    this->workers_.enqueue_tail (worker);
    this->workers_cond_.signal ();

    return 0;
  }

private:
  Worker *choose_worker (void)
  {
    ACE_GUARD_RETURN
      (ACE_Thread_Mutex, worker_mon, this->workers_lock_, 0);

    while (this->workers_.is_empty ())
      workers_cond_.wait ();

    Worker *worker;
    this->workers_.dequeue_head (worker);
    return worker;
  }

  int create_worker_pool (void)
  {
    ACE_GUARD_RETURN
      (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);
    for (int i = 0; i < POOL_SIZE; i++)
      {
        Worker *worker;
        ACE_NEW_RETURN (worker, Worker (this), -1);
        this->workers_.enqueue_tail (worker);
        worker->activate ();
      }

    return 0;
  }

  int done (void)
  {
    return (shutdown_ == 1);
  }

  ACE_thread_t thread_id (Worker *worker)
  {
    return worker->thread_id ();
  }

private:
  int shutdown_;
  ACE_Thread_Mutex workers_lock_;
  ACE_Condition<ACE_Thread_Mutex> workers_cond_;
  ACE_Unbounded_Queue<Worker* > workers_;
  ACE_Activation_Queue queue_;
};

The Manager class is also almost the same, the only difference being the use of the ACE_Activation_Queue instead of the underlying message queue. The Manager also has a new public method, perform(), that clients can use to enqueue Method Requests onto the Manager's activation queue. (In the driver below, OUTSTANDING_REQUESTS is a compile-time constant defined elsewhere in the example.)


int ACE_TMAIN (int, ACE_TCHAR *[])
{
  Manager tp;
  tp.activate ();

  ACE_Time_Value tv;
  tv.msec (100);

  // Pause briefly between submissions to spread out the requests.
  CompletionCallBack cb;
  LongWork workArray[OUTSTANDING_REQUESTS];
  for (int i = 0; i < OUTSTANDING_REQUESTS; i++)
    {
      workArray[i].attach (&cb);
      ACE_OS::sleep (tv);
      tp.perform (&workArray[i]);
    }

  ACE_Thread_Manager::instance ()->wait ();
  return 0;
}
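If you don't need a callback, you can skip the observer and block on each future directly. For example, after submitting the work, the main() above could harvest results like this (a sketch reusing the names above):


  // Instead of attaching CompletionCallBack, block on each future:
  for (int i = 0; i < OUTSTANDING_REQUESTS; i++)
    {
      ACE_CString *result = 0;
      workArray[i].future ().get (result);  // Blocks until set().
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("%C "), result->c_str ()));
      delete result;
    }
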

16.3 Leader/Followers Model

In this model, a single group of threads is used both to wait for new requests and to handle them. One thread is chosen as the leader and blocks on the incoming request source. When a request arrives, the leader thread obtains the request, promotes one of the followers to be the new leader, and goes on to process the request it received. The new leader waits on the request source for new requests while the old leader processes the previous one. Once the old leader is finished, it returns to the pool as a follower thread.

The leader/followers model improves performance: because the thread that receives a request is also the one that processes it, no context switch or data hand-off between threads is required, and request data can be kept on the stack or in thread-specific storage.

This model also has some disadvantages.

• It is not easy to handle bursty clients, as there might not be an explicit queueing layer in which to buffer requests.

• This model is more complex to implement.

The following simple example implements the leader/followers thread pool model. In this case, a single ACE_Task (the LF_ThreadPool class) encapsulates all the threads in the thread pool. Remember, there can be only one leader at any given time; the thread ID of the current leader of the pool is maintained in current_leader_. As in our first example, the LF_ThreadPool is given new work as an ACE_Message_Block on its message queue; that is, a unit of work is a message:


class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH>
{
public:
  LF_ThreadPool () : shutdown_(0), current_leader_(0)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::TP"));
  }

  virtual int svc (void);

  void shut_down (void)
  {
    shutdown_ = 1;
  }

private:
  int become_leader (void);

  Follower *make_follower (void);

  int elect_new_leader (void);

  int leader_active (void)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));
    return this->current_leader_ != 0;
  }

  void leader_active (ACE_thread_t leader)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));
    this->current_leader_ = leader;
  }

  void process_message (ACE_Message_Block *mb);

  int done (void)
  {
    return (shutdown_ == 1);
  }

private:
  int shutdown_;
  ACE_thread_t current_leader_;
  ACE_Thread_Mutex leader_lock_;
  ACE_Unbounded_Queue<Follower*> followers_;
  ACE_Thread_Mutex followers_lock_;
  static long LONG_TIME;
};

The svc() method for the LF_ThreadPool follows:


int
LF_ThreadPool::svc (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::svc"));
  while (!done ())
    {
      become_leader ();  // Block until this thread is the leader.

      ACE_Message_Block *mb = NULL;
      ACE_Time_Value tv (LONG_TIME);
      tv += ACE_OS::gettimeofday ();

      // Get a message, elect new leader, then process message.
      if (this->getq (mb, &tv) < 0)
        {
          if (elect_new_leader () == 0)
            break;
          continue;
        }

      elect_new_leader ();
      process_message (mb);
    }

  return 0;
}

As each thread starts, it first tries to become a leader by calling become_leader(). If it can't become a leader, the thread blocks in the become_leader() method and will return only once it has indeed become leader. Once a thread becomes leader, it blocks on the message queue until a message arrives. When it receives a new message, the leader first selects a new leader by calling elect_new_leader() and then processes the incoming message. The new leader will wake up from its previous become_leader() call and then block on the message queue, waiting for the next request:


int
LF_ThreadPool::become_leader (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::become_leader"));

  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
  if (leader_active ())
    {
      Follower *fw = make_follower ();
      {
        // Wait until told to do so.
        while (leader_active ())
          fw->wait ();
      }

      delete fw;
    }

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader ")));

  // Mark yourself as the active leader.
  leader_active (ACE_Thread::self ());
  return 0;
}

Follower*
LF_ThreadPool::make_follower (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::make_follower"));

  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0);
  Follower *fw;
  ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0);
  this->followers_.enqueue_tail (fw);
  return fw;
}
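The Follower class isn't shown in the chapter; a minimal sketch consistent with how it is used here—a condition variable built on the leader lock, tagged with its owner's thread ID—might be (an assumption, not the book's exact code):


class Follower
{
public:
  Follower (ACE_Thread_Mutex &leader_lock)
    : cond_(leader_lock), owner_(ACE_Thread::self ())
  { }

  // wait() releases the leader lock while blocked and
  // reacquires it before returning.
  int wait (void)           { return this->cond_.wait (); }
  int signal (void)         { return this->cond_.signal (); }
  ACE_thread_t owner (void) { return this->owner_; }

private:
  ACE_Condition<ACE_Thread_Mutex> cond_;
  ACE_thread_t owner_;
};
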

When it calls become_leader(), a thread first checks whether there is already a current active leader. If there is, the thread creates a new Follower object, which is enqueued on the followers_ queue. The thread then calls wait() on the new follower object. The Follower is a thin wrapper around a condition variable. Once a follower thread wakes up, it is the new leader and returns in the svc() method as described previously:


int
LF_ThreadPool::elect_new_leader (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::elect_new_leader"));

  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
  leader_active (0);

  // Wake up a follower.
  if (!followers_.is_empty ())
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex,
                        follower_mon,
                        this->followers_lock_,
                        -1);
      // Get the old follower.
      Follower *fw;
      if (this->followers_.dequeue_head (fw) != 0)
        return -1;
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%t) Resigning and Electing %d "),
                  fw->owner ()));
      return (fw->signal () == 0) ? 0 : -1;
    }
  else
    {
      ACE_DEBUG
        ((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left ")));
      return -1;
    }
}

To designate a new leader, the previous leader first changes the current_leader_ value to 0, indicating that no active leader is in the pool. The old leader then obtains a follower from the followers_ queue and wakes the Follower up by calling signal() on it. The blocked follower thread then wakes up, notices that there is no current leader, and marks itself as the current leader.

When it finishes processing a message, the leader thread once again calls the become_leader() method and tries to become the leader again. If it can't, it will become a follower and be enqueued in the followers_ queue.
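A driver for the leader/followers pool can be as simple as activating the task with several threads and feeding its message queue. The following is a minimal sketch; the thread and message counts are illustrative, and the shutdown is deliberately simplified:


int ACE_TMAIN (int, ACE_TCHAR *[])
{
  LF_ThreadPool pool;
  pool.activate (THR_NEW_LWP | THR_JOINABLE, 5);

  for (int i = 0; i < 20; i++)
    {
      ACE_Message_Block *mb = 0;
      ACE_NEW_RETURN (mb, ACE_Message_Block (128), -1);
      pool.putq (mb);
    }

  // Simplified shutdown: a complete implementation must also
  // wake any followers still blocked in become_leader().
  pool.shut_down ();
  pool.wait ();
  return 0;
}
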

16.4 Thread Pools and the Reactor

The ACE_Reactor has several implementations that you have seen previously in this book. Some of these implementations allow only a single owner thread to run the event loop and dispatch event handlers; others, however, allow you to have multiple threads run the event loop at once. The underlying reactor implementation builds a thread pool using these client-supplied threads and uses them to wait for and then dispatch events.

The two implementations that provide this functionality are

• ACE_TP_Reactor

• ACE_WFMO_Reactor

16.4.1 ACE_TP_Reactor

The ACE_TP_Reactor uses the leader/followers model for its underlying thread pool. Thus, when several threads enter the event loop method, all but one become followers and wait in a queue; the leader thread waits for events. When an event occurs, the leader selects a new leader and then dispatches the event handlers that were signaled. Before selecting a new leader or dispatching events, the leader first suspends the handlers it is about to call back. Suspending the handlers guarantees that a handler can never be invoked by two threads at the same time, which spares us from protecting our event handlers against concurrent upcalls. Once it finishes the dispatch, the leader resumes the handler in the reactor, making it available for dispatch once again.
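One practical payoff: because a handler is suspended around its upcall, per-handler state needs no guard against concurrent upcalls. Here is a sketch of a handler written with that assumption; the names are illustrative, and registration and close handling are omitted:


class EchoHandler : public ACE_Event_Handler
{
public:
  virtual ACE_HANDLE get_handle (void) const
  { return this->peer_.get_handle (); }

  virtual int handle_input (ACE_HANDLE)
  {
    // No lock needed: the ACE_TP_Reactor suspends this handler
    // while a pool thread runs this upcall, so at most one
    // thread is ever in here at a time.
    char buf[512];
    ssize_t n = this->peer_.recv (buf, sizeof (buf));
    if (n <= 0)
      return -1;   // Unregister; the reactor calls handle_close().
    return (this->peer_.send_n (buf, n) == n) ? 0 : -1;
  }

  ACE_SOCK_Stream &peer (void) { return this->peer_; }

private:
  ACE_SOCK_Stream peer_;
};
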

The following example is adapted from an ACE test that illustrates the ACE_TP_Reactor:



int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_TP_Reactor sr;
  ACE_Reactor new_reactor (&sr);
  ACE_Reactor::instance (&new_reactor);

  ACCEPTOR acceptor;
  ACE_INET_Addr accept_addr (rendezvous);

  if (acceptor.open (accept_addr) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p "),
                       ACE_TEXT ("open")),
                      1);

  ACE_DEBUG ((LM_DEBUG,
             ACE_TEXT ("(%t) Spawning %d server threads... "),
             svr_thrno));

  ServerTP serverTP;
  serverTP.activate (THR_NEW_LWP | THR_JOINABLE, svr_thrno);

  Client client;
  client.activate ();

  ACE_Thread_Manager::instance ()->wait ();

  return 0;
}

Let's first look at how the ACE_TP_Reactor is set up. We create an ACE_TP_Reactor instance and set it up as the implementation class for the reactor by passing it to the constructor of the ACE_Reactor. We also set the global singleton instance of the ACE_Reactor to be the instance we create here. This makes it easy for us to obtain the reactor through the global singleton. After we do this, we open an acceptor and activate two tasks: one to represent the server thread pool that will run the reactive event loop (ServerTP) and the other to represent clients that connect to the server (Client):


static int
reactor_event_hook (ACE_Reactor *)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) handling events .... ")));

  return 0;
}

class ServerTP : public ACE_Task_Base
{
public:
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Running the event loop ")));

    int result =
      ACE_Reactor::instance ()->run_reactor_event_loop
        (&reactor_event_hook);

    if (result == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("(%t) %p "),
                         ACE_TEXT ("Error handling events")),
                        0);

    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Done handling events. ")));

    return 0;
  }
};

Once the ServerTP is activated, multiple threads start up in its svc() method. Each thread immediately starts the reactor's event loop. Note that you can use any of the reactor's event-handling methods here. As we described earlier, the reactor then takes charge, creating the thread pool, selecting the leader, and then dispatching events in multiple threads of control.
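To stop such a pool, any thread can end the shared event loop; every thread blocked in run_reactor_event_loop() will then return. For example:


  // Called from a shutdown path or a signal handler:
  ACE_Reactor::instance ()->end_reactor_event_loop ();
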

[7] contains a thorough explanation of how ACE_TP_Reactor works.

16.4.2 ACE_WFMO_Reactor

The ACE_WFMO_Reactor is available on the Windows platform and uses the WaitForMultipleObjects() function to wait for events. Its use is similar to that of the ACE_TP_Reactor, although there are a few noteworthy differences.

• The ACE_WFMO_Reactor does not suspend and resume handlers the way the ACE_TP_Reactor does. This means that you must ensure that state information in your event handlers is protected against concurrent access.

• The ACE_WFMO_Reactor uses the concept of ownership to choose one thread that will handle timeouts. If none of the threads running the event loop is designated as the owner, using the owner() method, timeouts may not be handled properly (see the sketch after this list).

• State changes to the ACE_WFMO_Reactor are not immediate but are delayed until the reactor reaches a “stable state.” (Under the covers, when you make a change, the leader thread is informed, and that thread makes the state changes.)
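For example, the thread that should dispatch timers can designate itself as the owner before entering the event loop; a brief sketch:


  // In the thread designated to handle timeouts:
  ACE_Reactor::instance ()->owner (ACE_Thread::self ());
  ACE_Reactor::instance ()->run_reactor_event_loop ();
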

[7] contains a thorough explanation of how ACE_WFMO_Reactor works.

16.5 Summary

In this chapter, we looked at thread pools. We first talked about a few design models for thread pools, including the leader/followers and half-sync/half-async models. We then implemented each of these, using the ACE_Task and synchronization components. We also saw how to use ACE_Activation_Queue, ACE_Future, and ACE_Future_Observer to implement pools that can execute arbitrary ACE_Method_Requests and then allow for clients to obtain results, using Futures. Finally, we talked about the two reactor implementations that support thread pools: the ACE_TP_Reactor and the ACE_WFMO_Reactor.
