Chapter 11. Threads

Introduction

A variety of techniques can be used to solve a problem. One common programming methodology is to divide the problem into smaller specific subtasks. Then, given the nature of the subtasks and the sequence in which they must be done, parcel them out to separate processes (generated via a fork system call). Then, if needed, have the processes communicate with one another using interprocess communication constructs. When each of the subtasks (processes) is finished, its solved portion of the problem is integrated into the whole to provide an overall solution. Indeed, this approach is the basis for distributed computation.

However, generating and managing individual processes consume system resources. As we have seen in previous chapters, there is a considerable amount of system overhead associated with the creation of a process. During its execution lifetime, the system must keep track of the current state, program counter, memory usage, file descriptors, signal tables, and other details of the process. Further, when the process exits, the operating system must remove the process-associated data structures and return the process's status information to its parent process, if still present, or to init. If we add to this the cost of the process communicating with other processes using standard interprocess communication facilities, we may find this approach too resource expensive.

Aware of such limitations, the designers of modern versions of UNIX anticipated the need for constructs that would facilitate concurrent solutions but not be as system-intensive as separate processes. As in a multitasking operating system where multiple processes are running concurrently (or pseudo-concurrently), why not allow individual processes the ability to simultaneously take different execution paths through their process space? This idea led to a new abstraction called a thread. Conceptually, a thread is a distinct sequence of execution steps performed within a process. In a traditional setting, with a UNIX process executing, say, a simple C/C++ program, there is a single thread of control. At any given time there is only one execution point (as referenced by the program counter). The thread starts at the first statement in main and continues through the program logic in a serial manner until the process finishes. In a multithreading (MT) setting, there can be more than one thread of control active within a process, each progressing concurrently. Be aware that some authors use the term multithreaded program to mean any program that uses two or more processes running concurrently that share memory and communicate with one another (such as what we did in Chapter 8). If the processes (with their threads) are executing and making progress simultaneously on separate processors, parallelism occurs.

There are certain problem types that lend themselves to a multithreaded solution. Problems that inherently consist of multiple, independent tasks are prime candidates. For example, monitor and daemon programs that manage a large number of simultaneous connections, concurrent window displays, and similar processes can be written using threads. Similarly, producer-consumer and client-server type problems can have elegant multithreaded solutions. Additionally, some numerical computations (such as matrix multiplication) that can be divided into separate subtasks also benefit from multithreading. On the other side of the coin, there are many problems that must be done in a strict serial manner for which multithreading may produce a less efficient solution.

Each thread has its own stack, register set, program counter, thread-specific data, thread-local variables, thread-specific signal mask, and state information. However, all such threads share the same address space, general signal handling, virtual memory, data, and I/O with the other threads within the same process. In a multithreaded process each thread executes independently and asynchronously. In this setting communication between threads within the same process is less complex, as each can easily reference common data. However, as expected, many tasks handled by threads have sequences that must be done serially. When these sequences are encountered, synchronization problems concerning access to, and updating of, shared data can arise. Most commonly the operating system uses a priority-based, preemptive, non-time-slicing algorithm to schedule thread activities. With preemptive scheduling the thread executes until it blocks or has used its allotted time. The term sibling is sometimes used to denote other peer threads, as there is no inherent parent/child relationship with threads.
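To make the shared address space concrete, the following minimal sketch has several threads store into one global array. The names fill_slot, shared_data, SLOTS, and run_shared_demo are illustrative assumptions, not part of any library. Each thread writes only to its own element, so no two threads update the same location and no synchronization is needed beyond waiting for the threads to finish. The pthread_create and pthread_join calls used here are described later in this chapter.

```cpp
#include <pthread.h>
#include <cstddef>

const int SLOTS = 4;
int shared_data[SLOTS];          // a single copy, visible to every thread

// Each thread receives the address of its own slot and stores into it.
void *fill_slot(void *arg) {
  int *slot = static_cast<int *>(arg);
  *slot = static_cast<int>(slot - shared_data) * 10;   // 0, 10, 20, 30
  return NULL;
}

// Start SLOTS threads, wait for them, and return the sum of the slots.
// Returns -1 if any thread could not be created.
int run_shared_demo() {
  pthread_t tid[SLOTS];
  int started = 0;
  for (; started < SLOTS; ++started)
    if (pthread_create(&tid[started], NULL, fill_slot,
                       &shared_data[started]) != 0)
      break;
  for (int i = 0; i < started; ++i)      // wait for every started thread
    pthread_join(tid[i], NULL);
  if (started != SLOTS)
    return -1;
  int sum = 0;
  for (int i = 0; i < SLOTS; ++i)        // main reads what the threads wrote
    sum += shared_data[i];
  return sum;
}
```

The initial thread reads the array only after all joins have completed, which is what makes the unsynchronized reads safe in this sketch.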

Figure 11.1 compares communications for multiple processes, each with a single thread, with that of a single process with multiple threads.

Figure 11.1. Conceptual communications—multiple single-threaded processes versus a single process with multiple threads.

At system implementation level there are two traditional approaches or models used to generate and manage threads. The first, the user-level model, runs on top of the existing operating system and is transparent to it. The operating system maintains a runtime system to manage thread activities. Library functions and system calls made within the thread are wrapped in special code to allow for thread run-time management.[1] Threads implemented in this manner have low system overhead and are easily extensible should additional functionality be needed. On occasion, the operating system may have difficulty efficiently managing a user-level-implemented thread that is not time sliced and contains CPU-intensive code. More importantly, user-level threads are designed to share resources with other threads within their process space when running on a single processor.

The second approach is the kernel-level model. In this implementation, the operating system is aware of each thread. While the management of kernel-level threads is less intensive than that of individual processes, it is still more expensive than user-level-based threads. A kernel that offers direct thread support must contain system-level code for each specified thread function. However, kernel-level threads can support parallelism with multiple threads running on multiple processors.

In an attempt to incorporate the best of both approaches, many thread implementations are composites (hybrids). In one implementation the system binds each user-level thread to a kernel-level thread on a one-to-one basis. Windows NT and OS/2 use this approach. Another approach is to have the system multiplex user-level threads with a single kernel-level thread (many-to-one). A more common composite model relies on the support of multiple user-level threads with a pool of kernel-level threads. The kernel-level threads are run as lightweight processes (LWP) and are scheduled and maintained by the operating system. In a multiprocessor setting some operating systems allow LWPs to be bound to a specific CPU.

Older versions of Linux supported user-level threads. In newer versions of Linux there are kernel threads. However, these threads are used by the operating system to execute kernel-related functions and are not directly associated with the execution of a user's program. Multithreaded support is offered through LWPs. Linux uses a system specific (non-portable) clone system call to generate an LWP for the thread. Threaded applications should use one of the standardized thread APIs described in the next paragraph. By comparison, Sun's Solaris uses a variation of the kernel-level scheme whereby the user can specify a one-to-one binding or use the default many-to-many mapping. While Sun's approach is more flexible, it is more complex to implement and requires the interaction of two scheduling routines (user and kernel).

From a programming standpoint, there is a variety of thread APIs. Two of the more common are POSIX (Portable Operating System Interface) Pthreads and Sun's thread library (UNIX International, or UI). By far, POSIX threads (based on POSIX 1003.1 standards generated by the IEEE Technical Committee on Operating Systems) are the most widely implemented thread APIs. The POSIX standard 1003.1c is an extension of the 1003.1 and includes additional interface support for multi-threading.

In current versions of Linux (such as Red Hat 5.0 and later) the POSIX 1003.1c thread API is implemented using the LinuxThreads library, which is included in the glibc2 GNU library. In the Sun Solaris environment, both the POSIX and Sun thread libraries are supported, since Sun's effort at producing thread libraries predates the POSIX standards and many of the POSIX constructs have a strong Sun flavor. We will concentrate on POSIX threads, as they are guaranteed to be fully portable to other environments that are POSIX-compliant. The POSIX thread functions, except for semaphore manipulation, begin with the prefix pthread, while POSIX thread constants begin with the prefix PTHREAD_. The thread-related functions (over 100) are found in section 3thr of the manual pages.

Note that all programs using POSIX thread functions need the preprocessor directive #include <pthread.h> for the inclusion of thread prototypes. If the code will contain reentrant functions,[2] the defined constant #define _REENTRANT should be placed prior to all includes and program code. A list of supported reentrant functions can be found by interrogating the /usr/lib/libc.a library. For example,

linux$  ar t /usr/lib/libc.a  |  grep '_r.o'  |  pr -n -2 -t

      1   random_r.o                     32   getspnam_r.o
      2   rand_r.o                       33   sgetspent_r.o
      3   drand48_r.o                    34   fgetspent_r.o
      4   erand48_r.o                    35   getnssent_r.o
      5   lrand48_r.o                    36   gethstbyad_r.o
      6   nrand48_r.o                    37   gethstbynm2_r.o
      7   mrand48_r.o                    38   gethstbynm_r.o
      8   jrand48_r.o                    39   gethstent_r.o
      9   srand48_r.o                    40   getnetbyad_r.o
     10   seed48_r.o                     41   getnetent_r.o
     11   lcong48_r.o                    42   getnetbynm_r.o
     12   tmpnam_r.o                     43   getproto_r.o
     13   strtok_r.o                     44   getprtent_r.o
     14   ctime_r.o                      45   getprtname_r.o
     15   readdir_r.o                    46   getsrvbynm_r.o
     16   readdir64_r.o                  47   getsrvbypt_r.o
     17   getgrent_r.o                   48   getservent_r.o
     18   getgrgid_r.o                   49   getrpcent_r.o
     19   getgrnam_r.o                   50   getrpcbyname_r.o
     20   fgetgrent_r.o                  51   getrpcbynumber_r.o
     21   getpwent_r.o                   52   ether_aton_r.o
     22   getpwnam_r.o                   53   ether_ntoa_r.o
     23   getpwuid_r.o                   54   getnetgrent_r.o
     24   fgetpwent_r.o                  55   getaliasent_r.o
     25   getlogin_r.o                   56   getaliasname_r.o
     26   ttyname_r.o                    57   nscd_getpw_r.o
     27   mntent_r.o                     58   nscd_getgr_r.o
     28   efgcvt_r.o                     59   nscd_gethst_r.o
     29   qefgcvt_r.o                    60   getutent_r.o
     30   hsearch_r.o                    61   getutid_r.o
     31   getspent_r.o                   62   getutline_r.o

As shown, reentrant functions have _r appended to their non-reentrant name.

The object code library for POSIX thread functions must be linked in at compile time using the compiler switch -lpthread. Additionally, _POSIX_C_SOURCE[3] should be defined if strict compliance to POSIX standards is required. For example,

linux$ cat demo.cxx

#define _REENTRANT
#include <pthread.h>
.
.
.
linux$ g++ demo.cxx -D_POSIX_C_SOURCE -lpthread

Threads and their use are a complex topic. This chapter presents the basics of POSIX threads—their generation, scheduling, synchronization, and use. Readers wishing to gain additional insight on the topic are encouraged to read current texts that address thread programming, such as Comer et al., 2001; Nichols et al., 1996; Kleiman et al., 1996; Mitchell et al., 2000; Norton et al., 1997; Lewis et al., 1996; Northrup, 1996; and Wall, 2000, as well as the online documentation (manual pages) and vendor-supplied system support documentation.

Creating a Thread

Every process contains at least one main or initial thread of control created by the operating system when the process begins to execute. The library function pthread_create is used to add a new thread of control to the current process. The new thread executes in concert with the other threads within the process and, if directed, with other threads of control in other processes. The syntax for pthread_create is shown in Table 11.1.

There are four pointer-based arguments for pthread_create.[4] The first, *thread, is a reference to a pthread_t type object (an unsigned integer that is typedefed as the data type pthread_t). Upon successful completion of the pthread_create call, *thread will reference a unique (to this process only) integer thread ID (TID) value. Portable code should always pass the address of a valid pthread_t object here, since POSIX does not define the result of passing NULL. The second argument, indicated as *attr, references a dynamically allocated attribute structure. This structure holds the thread's attribute settings (as shown in the code sequence below, lifted from the /usr/include/bits/pthreadtypes.h file).

Table 11.1. The pthread_create Library Function.

Include File(s)    <pthread.h>
Manual Section     3

Summary
    int  pthread_create(pthread_t  *thread,
                        pthread_attr_t *attr,
                        void *(*start_routine)(void *),
                        void *arg);

Return
    Success    Failure    Sets errno
    0          Nonzero
 
/*    Attributes for threads.   */

typedef struct __pthread_attr_s {
  int __detachstate;
  int __schedpolicy;
  struct __sched_param __schedparam;
  int __inheritsched;
  int __scope;
  size_t __guardsize;
  int __stackaddr_set;
  void *__stackaddr;
  size_t __stacksize;
} pthread_attr_t;

Attributes govern how the thread behaves. Attributes include stack size and address, scheduling policy and priority, and detached state. If the *attr value is set to NULL, the new thread uses the system defaults for its attributes. If the user wants to modify the default attributes prior to thread creation, he or she should call the library function pthread_attr_init. The pthread_attr_init library function and related attribute setting functions are covered in a following section.

The third argument for pthread_create is a reference to a user-defined function that will be executed by the new thread. The user-defined function should be written to return a pointer to a void and have a single void pointer argument. If the return type of the user-defined function is a pointer, but it is not of type void, use a typecast (e.g., (void *(*)(void *)) ) to pass the function reference and keep the compiler from complaining. A reference to the actual argument to be passed to the user-defined function is the fourth argument for pthread_create. As with the user-defined function reference, this argument is also a void pointer. If multiple arguments are to be passed to the user-defined function, a structure containing the arguments should be statically allocated and initialized.[5] The reference to the structure should be cast to a void pointer when pthread_create is called.
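As an illustration of passing several values through the single void pointer, here is a minimal sketch. The structure job_args and the functions measure and run_job are hypothetical names invented for this example. The structure is given static storage, as suggested above, so it remains valid for as long as the thread might reference it.

```cpp
#include <pthread.h>
#include <cstddef>
#include <cstring>

// Hypothetical argument bundle; the member names are illustrative.
struct job_args {
  const char *word;      // text to measure
  int         repeat;    // multiplier
  long        result;    // filled in by the thread
};

// Thread function: recover the structure from the void pointer.
void *measure(void *arg) {
  job_args *job = static_cast<job_args *>(arg);
  job->result = static_cast<long>(std::strlen(job->word)) * job->repeat;
  return NULL;
}

// Pack two arguments into one structure, run the thread, return its result.
// Returns -1 if the thread could not be created or joined.
long run_job(const char *word, int repeat) {
  static job_args job;               // static: outlives any single call
  job.word   = word;
  job.repeat = repeat;
  job.result = 0;
  pthread_t tid;
  if (pthread_create(&tid, NULL, measure, static_cast<void *>(&job)) != 0)
    return -1;
  if (pthread_join(tid, NULL) != 0)
    return -1;
  return job.result;
}
```

With this sketch, a call such as run_job("space", 3) yields 15. Because the structure is static, run_job itself should be called from only one thread at a time.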

Once a thread is created, it has its own set of attributes and an execution stack. It inherits its signal mask (which it then can alter) and scheduling priority from the calling program (the initiating thread). It does not inherit any pending signals. If needed, a thread can allocate its own storage for thread-specific data.

If the pthread_create call is successful, it returns a value of 0 and sets the *thread reference to a unique ID for this process. If the call fails, it returns a nonzero value. POSIX thread functions do not routinely set errno when they fail, but instead return a nonzero value, which indicates the source of the error encountered. However, errno is set when a library function or system call fails in a code segment being executed by the thread. In these cases the errno value is thread-specific. If the pthread_create call fails and returns the value EAGAIN (11), it indicates a system-imposed limit—for example, the total number of threads has been exceeded. A newly created thread (which in Linux is directly tied to an LWP) begins with the execution of the referenced user-defined function. The thread continues to execute until

  • the function completes (implicitly or explicitly).

  • a call is made to pthread_exit.

  • the thread is canceled with a call to pthread_cancel.

  • the process that created the thread exits (implicitly or explicitly).

  • one of the threads performs an exec.
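The error-reporting convention described above (the error number arrives as the return value, not in errno) can be sketched as follows. The function names do_nothing and describe_create are assumptions made for this example; strerror is the standard C library call that maps an error number to a message.

```cpp
#include <pthread.h>
#include <cstddef>
#include <cstring>
#include <string>

void *do_nothing(void *) { return NULL; }

// Try to start a thread and describe the outcome using the returned code.
std::string describe_create() {
  pthread_t tid;
  int rc = pthread_create(&tid, NULL, do_nothing, NULL);
  if (rc != 0)                       // rc, not errno, holds the error number
    return std::string("pthread_create: ") + std::strerror(rc);
  pthread_join(tid, NULL);           // reclaim the thread's resources
  return "ok";
}
```

Should the call fail with EAGAIN, the returned string would carry the system's message for that error number rather than whatever value errno happened to hold.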

Exiting a Thread

The pthread_exit library call terminates a thread in much the same manner as a call to exit terminates a process. The pthread_exit library call is shown in Table 11.2.

The pthread_exit library call takes a single argument, a reference to a retval value. This reference is returned when a nondetached thread is exited. Upon termination, the thread releases its thread-specific (but not process-specific) resources. If the thread was nondetached, its status information and thread ID are kept by the system until a join is issued or the creating process terminates. When the function being executed by the thread performs a return (implicitly or explicitly), the system implicitly calls pthread_exit.
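A short sketch may help show how the retval reference travels from pthread_exit to the joining thread. The names finish_with, worker, and run_exit_demo are hypothetical. Here a small integer is carried inside the pointer value itself by way of intptr_t, which is a common idiom but an assumption of this example, not something the pthread_exit interface requires. The pthread_join call that collects the value is covered in the next section.

```cpp
#include <pthread.h>
#include <stdint.h>
#include <cstddef>

// Helper called from within the thread; it ends the thread from here,
// no matter how deeply nested the call is.
void finish_with(long code) {
  pthread_exit(reinterpret_cast<void *>(static_cast<intptr_t>(code)));
}

void *worker(void *) {
  finish_with(42);       // does not return
  return NULL;           // not reached; present to satisfy the compiler
}

// Run the worker and return the value it handed to pthread_exit.
// Returns -1 if the thread could not be created or joined.
long run_exit_demo() {
  pthread_t tid;
  void *status = NULL;
  if (pthread_create(&tid, NULL, worker, NULL) != 0)
    return -1;
  if (pthread_join(tid, &status) != 0)
    return -1;
  return static_cast<long>(reinterpret_cast<intptr_t>(status));
}
```

Unlike a return statement, pthread_exit can end the thread from any function the thread has called, which is the case the helper is meant to show.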

Table 11.2. The pthread_exit Library Function

Include File(s)    <pthread.h>
Manual Section     3

Summary
    void pthread_exit (void * retval);

Return
    Success                      Failure    Sets errno
    This call does not return

  

In Chapter 3, Section 5, “Ending a Process,” the atexit library function was presented. This function allows the user to specify one or more user-defined functions to be called in a LIFO (last-in, first-out) manner when the process exits. In a similar vein there is a small suite of pthread cleanup calls that can be used to specify and manipulate user-defined functions that are called when a thread exits. In this grouping are the calls pthread_cleanup_pop, which removes a function from the cancellation cleanup stack, and pthread_cleanup_push, which pushes a function on the cancellation stack of the current thread. Additionally, nonportable versions of these functions (called pthread_cleanup_pop_restore_np and pthread_cleanup_push_defer_np) are provided. A full discussion of these functions is beyond the scope of this text.
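While a full treatment is out of scope, a minimal sketch of the push/pop pair may still be useful. The names note_cleanup, guarded, cleanup_runs, and run_cleanup_demo are invented for this example. One detail worth knowing is that pthread_cleanup_push and pthread_cleanup_pop are typically implemented as macros that open and close a block, so they must appear as a matched pair within the same function.

```cpp
#include <pthread.h>
#include <cstddef>

int cleanup_runs = 0;        // counts handler invocations (illustrative)

// Cleanup handler: receives the argument registered with the push.
void note_cleanup(void *arg) {
  cleanup_runs += *static_cast<int *>(arg);
}

void *guarded(void *) {
  static int weight = 1;
  pthread_cleanup_push(note_cleanup, &weight);
  pthread_exit(NULL);          // the handler runs as the thread exits
  pthread_cleanup_pop(0);      // never reached, but required to pair the push
  return NULL;
}

// Run the thread and report how many times the handler ran.
// Returns -1 if the thread could not be created or joined.
int run_cleanup_demo() {
  cleanup_runs = 0;
  pthread_t tid;
  if (pthread_create(&tid, NULL, guarded, NULL) != 0)
    return -1;
  if (pthread_join(tid, NULL) != 0)
    return -1;
  return cleanup_runs;
}
```

In this sketch the handler runs once, triggered by the call to pthread_exit. Had the thread reached pthread_cleanup_pop(0) instead, the handler would have been removed without being run.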

Basic Thread Management

Once a thread is created, we can direct the calling process to wait until the thread is finished (it calls pthread_exit or is cancelled). This is accomplished with a call to the pthread_join library function shown in Table 11.3.

The first argument is a valid thread ID (as returned from the pthread_create call). The specified thread must be associated with the calling process and should not be specified as detached. The second argument, **status, references a static location where the completion status of the waited upon thread will be stored. The status value is the value passed to pthread_exit, or the value returned when the function code reaches a return statement.[6] If the second argument to pthread_join is set to NULL, the status information will be discarded.

Table 11.3. The pthread_join Library Function.

Include File(s)    <pthread.h>
Manual Section     3

Summary
    int pthread_join( pthread_t target_thread,
                      void **status );

Return
    Success    Failure    Sets errno
    0          Nonzero

 

There are some caveats associated with joining threads. A thread should be waited upon (joined) by only one other thread. The thread issuing the join does not need to be the initiating thread. If multiple threads wait for the same thread to complete, only one will receive the correct status information. The joins in competing threads will return an error. Should the thread initiating the join be canceled, the waited upon thread can be joined by another thread.[7] If the targeted thread has terminated prior to the issuing of the call to pthread_join, the call will return immediately and will not block. Last, but certainly not least, a nondetached thread (which is the default) that is not joined will not release its resources when the thread finishes and will only release its resources when its creating process terminates. Such threads can be the source of memory leaks. If pthread_join is successful, it returns a 0; otherwise, it returns a nonzero value. The return of ESRCH (3) means an undetached thread for the corresponding thread ID could not be found or the thread ID was set to 0. The return of EINVAL (22) means the thread specified by the thread ID is detached or the calling thread has already issued a join for the same thread ID. If EDEADLK (35) is returned, it indicates a deadlock situation has been encountered.

The process of joining a thread is somewhat analogous to a process waiting on a forked child process. However, unlike a forked child process, a thread can become detached with a single library call. When a detached thread finishes, its resources are automatically returned to the system. The pthread_detach library call (Table 11.4) is used to dynamically detach a joinable thread. In a later section the generation of a detached thread using a thread attribute object will be addressed.

Table 11.4. The pthread_detach Library Function.

Include File(s)    <pthread.h>
Manual Section     3

Summary
    int pthread_detach (pthread_t threadID);

Return
    Success    Failure    Sets errno
    0          Nonzero

 

The pthread_detach library function accepts a thread ID as its only argument. If successful, the call to pthread_detach detaches the indicated thread and returns a 0 value. If the call fails, the indicated thread is not detached, and a nonzero value is returned. The value EINVAL (22) is returned if an attempt to detach an already detached thread is made, or ESRCH (3) is returned if no such thread ID is found. Remember, once a thread is detached, other threads can no longer synchronize their activities based on its termination.
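The following sketch shows one way a detached thread might be used. Since a detached thread cannot be joined, the example has the thread announce its completion through a flag that the initial thread polls. The names background, finished, and run_detached are hypothetical, and the use of std::atomic (which requires a C++11 or later compiler) and a polling loop is a simplification chosen for this example, not a recommended synchronization technique.

```cpp
#include <pthread.h>
#include <unistd.h>
#include <atomic>
#include <cstddef>

std::atomic<bool> finished(false);     // set by the detached thread

void *background(void *) {
  finished.store(true);                // announce completion
  return NULL;
}

// Create a thread, detach it, and wait (by polling) until it has run.
// Returns true if the thread was detached and was seen to finish.
bool run_detached() {
  pthread_t tid;
  finished.store(false);
  if (pthread_create(&tid, NULL, background, NULL) != 0)
    return false;
  if (pthread_detach(tid) != 0)        // resources now reclaimed automatically
    return false;
  // A join is no longer possible; poll the flag for up to about 2 seconds.
  for (int i = 0; i < 2000 && !finished.load(); ++i)
    usleep(1000);
  return finished.load();
}
```

The polling loop stands in for the synchronization that pthread_join would otherwise provide, which illustrates the trade-off noted above: detaching frees us from the join, but we must then arrange any needed coordination ourselves.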

Program 11.1 uses the pthread_create and pthread_join library calls to create and join several threads.

Program 11.1. Creating and joining threads.

File : p11.1.cxx
  |     /*
  |              Creating and joining threads
  |     */
  |     #define _GNU_SOURCE
  +     #define _REENTRANT
  |     #include <iostream>
  |     #include <cstdio>
  |     #include <cstdlib>
  |     #include <pthread.h>
 10     #include <sys/types.h>
  |     #include <sys/time.h>
  |     #include <unistd.h>
  |     using namespace std;
  |     const int MAX=5;                        // constant, so it is a legal array size
  +     inline int my_rand( int, int );
  |     void       *say_it( void * );
  |     int
  |     main(int argc, char *argv[]) {
  |       pthread_t       thread_id[MAX];
 20       void    *status;                      // receives each thread's return value
  |       setvbuf(stdout, (char *) NULL, _IONBF, 0);
  |       if ( argc > MAX+1 ){                  // check arg list
  |         cerr << *argv << " arg1, arg2, ... arg" << MAX << endl;
  |         return 1;
  +       }
  |       cout << "Displaying" << endl;
  |       for (int i = 0; i < argc-1; ++i) {    // generate threads
  |        if( pthread_create(&thread_id[i],NULL,say_it,(void *)argv[i+1]) != 0){
  |          cerr << "pthread_create failure" << endl;
 30          return 2;
  |        }
  |       }
  |       for (int i=0; i < argc-1; ++i){      // wait for each thread
  |         if ( pthread_join(thread_id[i], &status) != 0){
  +           cerr << "pthread_join failure" << endl;
  |           return 3;
  |         }
  |         cout << endl << "Thread " << thread_id[i] << " returns "
  |              << (long) status;
 40       }
  |       cout << endl << "Done" << endl;
  |       return 0;
  |     }
  |     //   Display the word passed a random # of times
  +     void *
  |     say_it(void *word) {
  |       int numb = my_rand(2,6);
  |       cout << (char *)word << "	 to be printed " << numb
  |            << " times." << endl;
 50       for (int i=0; i < numb; ++i){
  |         sleep(1);
  |         cout << (char *) word << " ";
  |       }
  |       return (void *) NULL;
  +     }
  |     //   Generate a random # within given range
  |     int
  |     my_rand(int start, int range){
  |       struct timeval t;
 60       gettimeofday(&t, (struct timezone *)NULL);
  |       unsigned seed = (unsigned) t.tv_usec; // rand_r needs an unsigned seed
  |       return (int)(start+((double)range * rand_r(&seed))
  |                    / (RAND_MAX+1.0));
  |     }

When Program 11.1 executes, it examines the number of arguments passed in on the command line. For each argument (up to a limit of 5), the program creates a separate thread. As each thread is created, its thread ID is saved in the thread_id array. As written, the program passes pthread_create a NULL value as its second argument; therefore, each thread created has the default set of attributes. The user-defined function say_it is passed as the starting point for each thread. The appropriate command-line argument is passed to the say_it function as the fourth argument of pthread_create.[8] Following the creation of the threads, the program waits for the threads to finish using the pthread_join library function call. The value returned from each thread and its thread ID is displayed.

The user-defined say_it function is used to display the passed-in sequence of characters a random number of times. At the start of the say_it function, a random value is calculated. The library functions srand and rand that we have used previously are not used, as they are not safe to use in a multiple thread setting. However, there is a library function, rand_r, that is multithread-safe. The rand_r library function is incorporated into a user-defined inline function called my_rand. In the my_rand function the microsecond component of the current time, as returned by the gettimeofday[9] library function (which reports the time elapsed since 00:00 Universal Coordinated Time, January 1, 1970), is used as a seed value for rand_r. The value returned by rand_r is then adjusted to fall within the specified limits. The calculated random value and the sequence of characters to be displayed are shown on the screen. Finally, a loop is entered, and for the calculated number of times, the function sleeps one second and then prints the passed-in sequence of characters. A compilation sequence and run of the program is shown in Figure 11.2.
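The range adjustment performed in my_rand can be isolated in a small helper. In the sketch below, the function name bounded_rand is an assumption made for this example. The caller owns the seed, so each thread can keep its own seed variable and the threads do not share hidden state.

```cpp
#include <stdlib.h>

// Return a value in the half-open range [start, start + range).
// The caller supplies (and keeps) the seed, which rand_r updates in place.
int bounded_rand(unsigned *seed, int start, int range) {
  // rand_r yields 0..RAND_MAX; dividing by RAND_MAX + 1.0 gives [0, 1).
  double fraction = rand_r(seed) / (RAND_MAX + 1.0);
  return start + static_cast<int>(range * fraction);
}
```

With start set to 2 and range set to 6, as in Program 11.1, the values produced run from 2 through 7. Two callers that begin with the same seed value obtain the same sequence, which is why Program 11.1 draws its seed from the clock.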

Figure 11.2. A compilation and run of Program 11.1.

linux$ g++  p11.1.cxx -o p11.1 -lpthread
linux$ p11.1 s p a c e
Displaying
s        to be printed 5 times.
p        to be printed 5 times.
a        to be printed 5 times.
c        to be printed 3 times.
e        to be printed 3 times.
s p a c e s p a c e s e p a c s p a a s p
Thread 1026 returns 0
Thread 2051 returns 0
Thread 3076 returns 0        <-- 1
Thread 4101 returns 0
Thread 5126 returns 0
Done
  • (1)Each of these threads is supported by an LWP. Each LWP has its own process ID as well as its thread ID.

In this run, Program 11.1 was passed five command-line arguments: s, p, a, c, e. The program creates five new threads, one for each argument. The number of times each argument will be printed is then displayed. The request to print this information was one of the first lines of code in the user-defined function say_it (see line 48). As shown, all five threads process this statement prior to any one of the threads displaying its individual words. This is somewhat misleading. If we move the sleep statement in the for loop of the say_it function to be after the print statement within the loop, we should see the initial output statements being interspersed with the display of each word. If we count the number of words displayed, we will find they correspond to the number promised (e.g., letter s is displayed five times, etc). A casual look at the remainder of the output might lead one to believe the threads exit in an orderly manner. The pthread_join's are done in a second loop in the calling function (main). Since the thread IDs are passed to pthread_join in order, the information concerning their demise is also displayed in order. Viewing the output, we have no way to tell which thread ended first (even though it would seem reasonable that one of the threads that had to display the fewest number of words would be first). When each thread finishes with the say_it function, it returns a NULL. This value, which is picked up by the pthread_join, is displayed as a 0. The return statement in the say_it function can be replaced with a call to pthread_exit. However, if we replace the return with pthread_exit, most compilers will complain that no value is returned by the say_it function, forcing us to include the return statement even if it is unreachable! If we run this program several times, the output sequences will vary.

As written, the display of each word (command-line argument) is preceded by a call to sleep for one second. In the run shown in Figure 11.3, sleep is called 19 times (7 for f, 5 for a, etc.). Yet, the length of time it takes for the program to complete is far less than 19 seconds. This is to be expected, as each thread is sleeping concurrently. We can verify the amount of time used by the program using the /usr/bin/time[10] utility. Several reruns of Program 11.1 using the /usr/bin/time utility confirm our conjecture.

Figure 11.3. Timing a run of Program 11.1.

linux$ /usr/bin/time -p p11.1 f a c e
Displaying
f        to be printed 7 times.
a        to be printed 5 times.
c        to be printed 3 times.
e        to be printed 4 times.
f a c e f a c e f e c a f e a af  f f
Thread 1026 returns 0
Thread 2051 returns 0
Thread 3076 returns 0
Thread 4101 returns 0
Done
real 7.07        <-- 1
user 0.00        <-- 2
sys  0.02        <-- 3
  • (1)Elapsed real time (in seconds).

  • (2)CPU seconds in user mode

  • (3)CPU seconds in kernel mode
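The same observation can be made from within a program. The sketch below, whose names nap and timed_naps are hypothetical, starts several threads that each sleep for 0.2 second and measures the elapsed wall-clock time with gettimeofday. The shorter sleep interval and the limit of 16 threads are arbitrary choices made for this example.

```cpp
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
#include <cstddef>

void *nap(void *) {
  usleep(200000);                    // sleep for 0.2 second
  return NULL;
}

// Run up to 16 napping threads and return the elapsed seconds.
double timed_naps(int count) {
  pthread_t tid[16];
  if (count > 16)
    count = 16;
  struct timeval begin, end;
  gettimeofday(&begin, NULL);
  int started = 0;
  for (; started < count; ++started)           // start all threads first
    if (pthread_create(&tid[started], NULL, nap, NULL) != 0)
      break;
  for (int i = 0; i < started; ++i)            // then wait for each one
    pthread_join(tid[i], NULL);
  gettimeofday(&end, NULL);
  return (end.tv_sec - begin.tv_sec) + (end.tv_usec - begin.tv_usec) / 1e6;
}
```

On a lightly loaded system we would expect timed_naps(4) to report a value close to 0.2 second, not the 0.8 second that four consecutive sleeps would require, since the threads sleep concurrently.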

Thread Attributes

In a POSIX implementation, if we want to generate a thread that does not have the default attributes (obtained by setting the second parameter of the pthread_create call to NULL), an attribute object is used. To use an attribute object, it must first be initialized. This is accomplished using the library call pthread_attr_init (see Table 11.5).

Table 11.5. The pthread_attr_init Library Function.

Include File(s)    <pthread.h>
Manual Section     3

Summary
    int pthread_attr_init ( pthread_attr_t *attr );

Return
    Success    Failure    Sets errno
    0          Nonzero

 

The pthread_attr_init library function has a single argument, a reference to a previously allocated pthread_attr_t type object. If the call is successful, it returns a 0 and initializes the referenced attribute object with the default value for each attribute (see Table 11.6). A return of ENOMEM (12) indicates the system does not have sufficient memory to initialize the thread attribute object.

Once initialized, individual attribute values can be modified (see the following discussion). The attribute object is passed as the second argument to the pthread_create call. The newly created thread will have the specified attributes. The attribute object is independent of the thread, and changes to the attribute object after a thread has been created are not reflected in existing threads. Once established, a thread attribute object can be used in the generation of multiple threads. Thread attributes and their default values are shown in Table 11.6.

Table 11.6. Thread Attributes and Default Settings.

Attribute

Default

Comments

detachstate

PTHREAD_CREATE_JOINABLE

A nondetached thread that can be joined by other threads. The thread's resources are not freed until a call is made to pthread_join or the calling process exits.

inheritsched

PTHREAD_EXPLICIT_SCHED

Indicates whether or not scheduling attributes are inherited from parent thread or set explicitly by the attribute object.

schedparam

0

Scheduling parameters (priority).

schedpolicy

SCHED_OTHER

Scheduling is determined by the system (most often some sort of timesharing). Note the missing PTHREAD_ prefix.

scope

PTHREAD_SCOPE_SYSTEM

Scope of scheduling contention—with all threads in same process or all processes in the system.
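One way to check the defaults in the table on a particular system is to read them back from a freshly initialized attribute object. The sketch below assumes the get calls introduced in the next table; the structure attr_defaults and the function get_default_attributes are hypothetical names used only for illustration. The values reported can differ between thread libraries (for example, some implementations report PTHREAD_INHERIT_SCHED for inheritsched), so treat the table as a guide rather than a guarantee.

```cpp
#include <pthread.h>
#include <sched.h>

// Hypothetical holder for the values read back from the attribute object.
struct attr_defaults {
  int detachstate, inheritsched, priority, policy, scope;
};

// Initialize an attribute object and read back each attribute.
// Returns 0 on success or the first nonzero error code.
int get_default_attributes(attr_defaults *out) {
  pthread_attr_t     attr_obj;
  struct sched_param param;
  int rc = pthread_attr_init(&attr_obj);
  if (rc != 0) return rc;
  if ((rc = pthread_attr_getdetachstate(&attr_obj, &out->detachstate)) == 0 &&
      (rc = pthread_attr_getinheritsched(&attr_obj, &out->inheritsched)) == 0 &&
      (rc = pthread_attr_getschedparam(&attr_obj, &param)) == 0 &&
      (rc = pthread_attr_getschedpolicy(&attr_obj, &out->policy)) == 0 &&
      (rc = pthread_attr_getscope(&attr_obj, &out->scope)) == 0)
    out->priority = param.sched_priority;   // all five reads succeeded
  pthread_attr_destroy(&attr_obj);
  return rc;
}
```

A caller would compare the returned fields against the defined constants, for example d.detachstate == PTHREAD_CREATE_JOINABLE.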

As presented, if the user wants a thread to have different characteristics, he or she should first initialize the attribute object using the pthread_attr_init library call and then change the attributes he or she wants to be different. Each attribute listed in Table 11.7 has an associated pthread_attr_setxxx and pthread_attr_getxxx function call that will act upon the attribute object.

Table 11.7. Thread Attribute Set and Get functions

Attribute

Set and get Calls

Defined Constants[11] for the 2nd setxxx parameter

detachstate

int pthread_attr_setdetachstate (
    pthread_attr_t *attr,
    int detachstate);
int pthread_attr_getdetachstate (
    const pthread_attr_t *attr,
    int *detachstate);

PTHREAD_CREATE_JOINABLE
PTHREAD_CREATE_DETACHED

inheritsched

int pthread_attr_setinheritsched (
    pthread_attr_t *attr,
    int inheritsched);
int pthread_attr_getinheritsched (
    const pthread_attr_t *attr,
    int *inheritsched);

PTHREAD_EXPLICIT_SCHED
PTHREAD_INHERIT_SCHED

schedparam

int pthread_attr_setschedparam (
    pthread_attr_t *attr,
    const struct sched_param *param);
int pthread_attr_getschedparam (
    const pthread_attr_t *attr,
    struct sched_param *param);

Reference to valid sched_param[12] structure with its sched_priority member assigned a valid priority (0 by default).

schedpolicy

int pthread_attr_setschedpolicy (
    pthread_attr_t *attr,
    int policy);
int pthread_attr_getschedpolicy (
    const pthread_attr_t *attr,
    int *policy);

SCHED_OTHER[13]
SCHED_FIFO
SCHED_RR

scope

int pthread_attr_setscope (
    pthread_attr_t *attr,
    int contentionscope);
int pthread_attr_getscope (
    const pthread_attr_t *attr,
    int *contentionscope);

PTHREAD_SCOPE_SYSTEM
PTHREAD_SCOPE_PROCESS

[11] The first value listed for each attribute is the default (see Table 11.6).

[12] The sched_param structure is found in the include file <sched.h>, which is included by the <pthread.h> file.

[13] Only processes with superuser privileges can specify SCHED_FIFO or SCHED_RR.

If in Program 11.1 we wanted to use the thread attribute object to indicate that our threads be detached (that is, each thread would release its resources as soon as it has completed its task rather than wait for a join to be done in the calling function), we would add and modify the following program statements:[14]

  .
  .
  .
int
main(int argc, char *argv[]) {
  pthread_t       thread_id[MAX];
  int     status, *p_status = &status;
  pthread_attr_t  attr_obj;               // allocate attribute object
  setvbuf(stdout, (char *) NULL, _IONBF, 0);
  .
  .
  .
  cout << "Displaying" << endl;
                                          // Initialize & set attribute object
  pthread_attr_init( &attr_obj);
  pthread_attr_setdetachstate( &attr_obj, PTHREAD_CREATE_DETACHED );
  for (int i = 0; i < argc-1; ++i) {      // generate threads
  .
  .
  .

The set and get attribute calls return a 0 if they are successful. If they fail, they return EINVAL (22) if passed an invalid parameter or ENOTSUP (95)[15] if the parameter specifies an unsupported feature.
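A short sketch of checking these return values follows. The function name try_detachstate is hypothetical; it stores a value with the set call, confirms it with the matching get call, and passes back whatever error code the library reports. An out-of-range value is assumed to be rejected with EINVAL, as described above.

```cpp
#include <pthread.h>
#include <cerrno>

// Try to store `state` in a freshly initialized attribute object.
// Returns 0 on success or the error code from the failing call.
int try_detachstate(int state) {
  pthread_attr_t attr_obj;
  int rc = pthread_attr_init(&attr_obj);
  if (rc != 0) return rc;
  rc = pthread_attr_setdetachstate(&attr_obj, state);
  if (rc == 0) {                        // confirm with the get call
    int current = -1;
    rc = pthread_attr_getdetachstate(&attr_obj, &current);
    if (rc == 0 && current != state) rc = EINVAL;
  }
  pthread_attr_destroy(&attr_obj);
  return rc;
}
```

Note that the error is delivered as the return value; these calls do not set errno.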

Scheduling Threads

We have alluded to scheduling in the previous sections. As a subject, scheduling is complex and is often the topic of an entire chapter or more in an operating systems text. Understanding the Linux Kernel provides an excellent in-depth presentation of Linux scheduling. Essentially, scheduling is used to determine which ready-to-run task the CPU will execute next. A check of any good operating systems text will reveal a variety of scheduling policies, some of the more common of which are

  • First come, first served: first to request service is processed first (also called a FIFO arrangement).

  • Shortest job first: the task requiring least amount of processing is done first.

  • Priority-based: tasks with higher priority are done first.

  • Round-robin: each task gets a small time slice; tasks reside in a circular queue until they are finished.

Furthermore, many of these strategies can be implemented as nonpreemptive (once the task begins, it goes to completion) or as preemptive (the task can be removed by a task of a higher priority). In current operating systems preemption is the norm. Keep in mind, as noted in Chapter 1, processes can be in user mode or kernel mode. Traditionally, a process running on a single processor system is nonpreemptive when it is in kernel mode.

Similar to processes, threads can be in one of several states. A very simplified thread state diagram is shown in Figure 11.4. As shown, a thread can be ready to run (able to run if selected); running (active); or blocked (waiting on some other event, say I/O or a wake-up, to occur).

Figure 11.4. High-level thread state diagram.

In Linux the scheduling of a POSIX thread is determined by two factors: priority and scheduling policy.

The system uses the static priority value of a thread to assign it to a ready-to-run list. The static priority values for a Linux system range from 0 to 99, with 0 being the lowest priority. When dealing with a POSIX thread, higher priority threads are scheduled before those with a lower priority. Usually, the priority of a thread is inherited from its creating thread. However, the priority of a thread can be set at creation time by modifying the thread attribute object before the thread is created. Conceptually, the system maintains a separate list for each static priority value. The library calls sched_get_priority_max and sched_get_priority_min can be used to determine the actual priority limits for a specific scheduling policy.
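As a small illustration of the two library calls just mentioned, the following sketch retrieves the priority limits for a policy. The helper name priority_range is hypothetical; both sched_get_priority_min and sched_get_priority_max return -1 on failure, which is what the helper tests for.

```cpp
#include <sched.h>

// Retrieve the static priority limits for one scheduling policy.
// Returns true if both limits could be obtained.
bool priority_range(int policy, int *lowest, int *highest) {
  *lowest  = sched_get_priority_min(policy);   // -1 signals an error
  *highest = sched_get_priority_max(policy);
  return *lowest != -1 && *highest != -1;
}
```

Calling priority_range(SCHED_FIFO, &lo, &hi) before choosing a priority avoids hard-coding limits that may differ from one system to the next.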

POSIX specifies three scheduling policies for threads. The first policy, SCHED_OTHER, is used for normal (conventional) processes. The remaining two policies, SCHED_FIFO and SCHED_RR, are for real-time (time-dependent) applications. Implementation-wise, as all scheduling is basically preemptive, these policies are used by the system to determine where threads are inserted in a list and how they move within the list.

  • SCHED_OTHER: the system default. This is a time-sharing policy where competing threads with the same static priority (0) are multiplexed (each receiving a time slice) according to their dynamic priority. A thread's dynamic priority is based on its nice level, which is established at runtime. The nice level can be changed using the setpriority system call. The thread's dynamic priority is increased each time it is ready to run but is denied access to the CPU.

  • SCHED_FIFO: a first in, first out scheduling policy. SCHED_FIFO threads have a static priority greater than 0. The highest priority, longest waiting thread becomes the next thread to execute. The thread will execute until it finishes, is blocked (such as by an I/O request), is preempted, or yields (calls sched_yield). Initially, ready-to-run SCHED_FIFO scheduled threads are placed at the end of their list. If a SCHED_FIFO thread is preempted, it remains at the head of its list and resumes its execution when all higher priority threads have finished or blocked. A SCHED_FIFO thread that yields the processor is placed at the end of its list.

  • SCHED_RR: round-robin scheduling. This is similar to SCHED_FIFO with a time slicing factor (quantum) added in. When a thread has run for a period of time equal to its quantum (and has not finished), it is placed at the end of its list. If a SCHED_RR scheduled thread is preempted, as with SCHED_FIFO, it stays at the head of its list, but when recalled, only gets to complete the remaining portion of its quantum.
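A policy can also be requested at creation time through the attribute object. The sketch below is one possible arrangement, under stated assumptions: the names report_policy and start_with_policy are hypothetical, the thread is given the lowest priority its policy allows, and if the creation fails (typically with EPERM when the caller lacks the required privilege) the sketch simply retries with default attributes.

```cpp
#include <pthread.h>
#include <sched.h>

// Thread function: record the policy this thread is actually running under.
static void *report_policy(void *arg) {
  int policy = 0;
  struct sched_param param;
  if (pthread_getschedparam(pthread_self(), &policy, &param) == 0)
    *static_cast<int *>(arg) = policy;
  return NULL;
}

// Try to start a thread under `policy`; fall back to the defaults on failure.
// Stores the policy the thread ran under in *actual; returns 0 or an error code.
int start_with_policy(int policy, int *actual) {
  pthread_attr_t     attr_obj;
  struct sched_param param;
  pthread_t          tid;
  int rc = pthread_attr_init(&attr_obj);
  if (rc != 0) return rc;
  param.sched_priority = sched_get_priority_min(policy);
  // Take scheduling from the attribute object, not from the creating thread.
  rc = pthread_attr_setinheritsched(&attr_obj, PTHREAD_EXPLICIT_SCHED);
  if (rc == 0) rc = pthread_attr_setschedpolicy(&attr_obj, policy);
  if (rc == 0) rc = pthread_attr_setschedparam(&attr_obj, &param);
  if (rc == 0) rc = pthread_create(&tid, &attr_obj, report_policy, actual);
  if (rc != 0)                            // e.g., EPERM: use the defaults
    rc = pthread_create(&tid, NULL, report_policy, actual);
  if (rc == 0) rc = pthread_join(tid, NULL);
  pthread_attr_destroy(&attr_obj);
  return rc;
}
```

The policy is stored in the attribute object before the priority because some implementations validate the priority against the policy already recorded there.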

A scheduling policy and priority of a thread that already exists can be set with the pthread_setschedparam library function, detailed in Table 11.8.

Table 11.8. The pthread_setschedparam Library Function.

Include File(s)

<pthread.h>

Manual Section

3

Summary

int pthread_setschedparam(
    pthread_t target_thread, int policy,
    const struct sched_param *param );

Return

Success

Failure

Sets errno

0

Nonzero

 

The first argument of pthread_setschedparam is a valid thread ID. The second argument should be one of the following defined constants: SCHED_OTHER, SCHED_FIFO, or SCHED_RR (see previous discussion). The third argument for this call is a reference to a sched_param structure (examine the include file <sched.h> for definition details).

If successful, the pthread_setschedparam library function returns a 0; otherwise, it returns one of the following values: EPERM (1), the calling process does not have superuser privileges; ESRCH (3), the specified thread does not exist; EFAULT (14), param references an illegal address; or EINVAL (22), invalid policy or param value or priority value is inconsistent with scheduling policy. Again, it is important to note that only the superuser can specify SCHED_FIFO or SCHED_RR scheduling (and thus change a thread's static priority to something other than 0); others are left with the system default SCHED_OTHER with a static priority of 0.
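The return values above can be examined directly, as in this brief sketch. The function name change_own_scheduling is hypothetical; it applies a policy and priority to the calling thread and reports any error by name. The error code arrives as the return value, so strerror is applied to it rather than to errno.

```cpp
#include <pthread.h>
#include <sched.h>
#include <cerrno>
#include <cstring>
#include <iostream>

// Request a new policy and priority for the calling thread.
// Returns 0 on success or the error code from pthread_setschedparam.
int change_own_scheduling(int policy, int priority) {
  struct sched_param param;
  param.sched_priority = priority;
  int rc = pthread_setschedparam(pthread_self(), policy, &param);
  if (rc != 0)                            // rc holds the error, not errno
    std::cerr << "pthread_setschedparam: " << strerror(rc) << std::endl;
  return rc;
}
```

For example, asking for SCHED_OTHER with a priority other than 0 is inconsistent with the policy and should be rejected with EINVAL, regardless of the caller's privileges.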

Program 11.2 demonstrates the use of the pthread_setschedparam, pthread_getschedparam, sched_get_priority_max, and sched_get_priority_min functions. In main a scheduling policy is specified and the system's maximum and minimum priority values for the policy are displayed. The policy and an arbitrarily calculated priority are assigned to a parameter structure, which is passed to the threadfunc function. When a new thread is created, the threadfunc function calls pthread_setschedparam to set the scheduling policy and priority. To confirm the changes have actually been made, the pthread_getschedparam library function is used to retrieve the current scheduling and thread priority settings. The returned results are displayed.

Program 11.2. Manipulating scheduling parameters.

File : p11.2.cxx
/*
   Changing scheduling parameters.
*/
#define _GNU_SOURCE
#define _REENTRANT
#include <iostream>
#include <cstdio>
#include <cstring>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
using namespace std;
                                             // names indexed by policy value
const char *p[] = {"OTHER ","FIFO ","RR "};  // (0, 1, and 2 on Linux)
struct parameter {                           // data to pass
  int   policy;                              // new policy
  int   priority;                            // new priority
};
void *threadfunc( void *);
int
main( ) {
  pthread_t        t_id;
  struct parameter parm;
  int              status;
  setvbuf(stdout, (char *)NULL, _IONBF, 0);  // non-buffered output
  for( int i=0; i < 3; ++i ){                // display limits
    cout << "Policy SCHED_" << p[i] << "\t MAX = "
         << sched_get_priority_max(i);
    cout << " MIN = " << sched_get_priority_min(i) << endl;
    parm.policy  = i;                        // assign data to pass
    parm.priority= (i+1) * 2;                // make up a priority
    status=pthread_create( &t_id, NULL, threadfunc, (void *)&parm );
    if ( status != 0 )                       // creation failed
      cerr << "pthread_create: " << strerror(status) << endl;
    sleep(1);                                // let thread finish with parm
  }
  return 0;
}
void *
threadfunc( void *d ) {
  struct    sched_param  param;              // local to this function
  int       policy;
  parameter *p_ptr=(parameter *)d;           // cast to access
  param.sched_priority = p_ptr->priority;    // passed data value
                                             // set new scheduling
  pthread_setschedparam(pthread_self(), p_ptr->policy, &param );
  memset(&param, 0, sizeof(param));          // clear
                                             // retrieve
  pthread_getschedparam(pthread_self(), &policy, &param );
  cout << "In thread with policy = SCHED_" << p[policy]
       << " \tpriority = " << (param.sched_priority)
       << " effective ID = " << geteuid() << endl;
  return NULL;
}

Figure 11.5 shows two runs of Program 11.2. In the first the effective ID of the user is 0 (that of the superuser). As a result, the requested scheduling changes are implemented. In the second run the effective ID of the user is 500. The requested changes are not made in the thread and default remains in effect.

Figure 11.5. Running Program 11.2 as root and as a regular user.

linux#  p11.2
Policy SCHED_OTHER       MAX = 0 MIN = 0
In thread with policy = SCHED_OTHER     priority = 0 effective ID = 0         <-- 1
Policy SCHED_FIFO        MAX = 99 MIN = 1
In thread with policy = SCHED_FIFO      priority = 4 effective ID = 0
Policy SCHED_RR          MAX = 99 MIN = 1
In thread with policy = SCHED_RR        priority = 6 effective ID = 0
.
.
.
linux$ p11.2
Policy SCHED_OTHER       MAX = 0 MIN = 0
In thread with policy = SCHED_OTHER     priority = 0 effective ID = 500         <-- 2
Policy SCHED_FIFO        MAX = 99 MIN = 1
In thread with policy = SCHED_OTHER     priority = 0 effective ID = 500
Policy SCHED_RR          MAX = 99 MIN = 1
In thread with policy = SCHED_OTHER     priority = 0 effective ID = 500
  • (1)Run program as root (eid = 0)—requested changes are implemented.

  • (2)Run program as regular user—requested changes are not implemented.

By now I am sure you have noticed the similarities between the pthread_setschedparam library call and the pthread_attr_setschedpolicy and pthread_attr_setschedparam library calls. The pthread_setschedparam call combines the functionality of the attribute-based calls and allows the user to modify scheduling policy and priority on the fly for an existing thread.

Using Signals in Threads

In a POSIX multithreaded setting, signals are delivered to the process. If a signal is synchronous (the result of the action of a particular thread), the operating system passes the signal on to the thread that generated the exception. Thus, synchronous signals such as SIGFPE (divide by zero), SIGSEGV (addressing violation), and SIGPIPE (broken pipe) would be passed on to the offending thread (which only makes sense). If a pthread_kill call (see Table 11.9) is used to send a signal, the specified thread also receives the signal.

According to POSIX standards, if a signal is asynchronous (not related to a particular thread's activity), such as SIGHUP (hang-up) or SIGINT (interrupt), generated by, say, the kill system call, the decision as to which thread should handle the signal is based upon the signal mask configuration of the threads within the process. If more than one thread has not blocked the received signal, there is no guarantee as to which thread will actually receive the signal. To maintain mutex lock integrity, a signal whose action causes the handling thread to terminate, stop, or continue will cause the process associated with the thread to terminate, stop, or continue as well. The details of mutex locking are covered in Section 11.8, “Thread Synchronization.”

The handling of asynchronous signals by threads in Linux (using the LinuxThreads library implementation) departs somewhat from the POSIX standard. LinuxThreads are LWPs that have their own process IDs. Thus, a signal is always directed to a specific thread.

Table 11.9. The pthread_kill Library Function.

Include File(s)

<pthread.h>
<signal.h>

Manual Section

3

Summary

int pthread_kill(pthread_t thread, int signo);

Return

Success

Failure

Sets errno

0

Nonzero

 

The pthread_kill library function accepts a thread ID (of a sibling) as its first argument and an integer signal value as its second argument. Similar to the kill system call, the existence of a given thread can be checked by setting the signal argument for pthread_kill to 0 and examining the return value of the call. If pthread_kill is successful, the signal is sent, and the function returns a 0. If the call fails, a nonzero value is returned, and the signal is not sent. The return of ESRCH (3) means the specified thread does not exist, while the return of EINVAL (22) means an invalid signal was specified. The pthread_kill call cannot be used to send a signal to a thread in another process.
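The following sketch shows pthread_kill used both ways: first with a signal value of 0 to check that the sibling exists, then with a real signal. The names wait_for_signal and signal_sibling are hypothetical, and the sketch relies on two calls discussed later in this section (pthread_sigmask and sigwait) so that the signal is collected quietly by the sibling instead of triggering its default action.

```cpp
#include <pthread.h>
#include <signal.h>

static sigset_t wait_set;               // signals the sibling waits for

// Sibling thread: wait for a signal in wait_set and report its number.
static void *wait_for_signal(void *arg) {
  int sig = 0;
  if (sigwait(&wait_set, &sig) == 0)
    *static_cast<int *>(arg) = sig;
  return NULL;
}

// Start a sibling, direct `signo` at it, and return the signal number
// the sibling reports (or -1 if something failed).
int signal_sibling(int signo) {
  pthread_t tid;
  sigset_t  saved;
  int       received = -1;
  sigemptyset(&wait_set);
  sigaddset(&wait_set, signo);
  // Block first so the sibling inherits a mask with signo blocked.
  pthread_sigmask(SIG_BLOCK, &wait_set, &saved);
  if (pthread_create(&tid, NULL, wait_for_signal, &received) == 0) {
    int rc = pthread_kill(tid, 0);      // signal 0: existence check only
    if (rc == 0) rc = pthread_kill(tid, signo);
    if (rc == 0) pthread_join(tid, NULL);
  }
  pthread_sigmask(SIG_SETMASK, &saved, NULL);   // restore caller's mask
  return received;
}
```

The existence check is only meaningful for a thread ID that has not yet been joined; once a thread has been joined, its ID should not be passed to pthread_kill.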

All the threads within the process share a common table of information specifying what action should be taken upon receipt of a specific signal. Each thread can alter its action for a particular signal (with certain constraints) by using a signal mask. When a thread is created, it inherits its signal mask and priority from its creating thread. The new thread starts with a clean slate and does not inherit any pending (not acted upon) signals from its creator. The signal mask can be manipulated with the pthread_sigmask library function (Table 11.10).

As we are working with signals, an additional header file <signal.h> is required when using this call. The first argument for pthread_sigmask should be one of the defined constants below. These constants indicate how the signal mask should be changed.

Table 11.10. The pthread_sigmask Library Function.

Include File(s)

<pthread.h>
<signal.h>

Manual Section

3

Summary

int pthread_sigmask(int how,
                    const sigset_t *newmask,
                    sigset_t *oldmask);

Return

Success

Failure

Sets errno

0

Nonzero

 

Constant

Value

Meaning

SIG_BLOCK

0

Add the indicated signal(s) to the set of blocked signals.

SIG_UNBLOCK

1

Remove the indicated signal(s) from the set of blocked signals.

SIG_SETMASK

2

Replace the current mask with the referenced mask.

The second argument, newmask, is a reference to a sigset_t type object. If we track down the sigset_t data type, which is usually found in the include file <sys/signal.h>, we should find that it is a reference to an array of unsigned long integers. The array is used as a bit mask for signals. If we are modifying the signal mask, the newmask argument should reference the new signal mask. While we can hand-craft the new signal mask, most prefer to use the appropriate POSIX signal mask manipulation functions shown in Table 11.11.

Table 11.11. Signal Mask Manipulation Functions.

Signal Mask Function

Functionality

int sigemptyset(sigset_t *set)

Initialize the signal set to exclude all defined signals.

int sigfillset(sigset_t *set)

Initialize the signal set to include all defined signals.

int sigaddset(sigset_t *set, int signo)

Add the indicated signal to the signal set.

int sigdelset(sigset_t *set, int signo)

Remove the indicated signal from the signal set.

int sigismember(const sigset_t *set, int signo)

Returns 1 if the indicated signal is a member of the referenced signal set and 0 if it is not.

When using the signal mask manipulation functions, be sure to initialize a signal mask before attempting to manipulate it.
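The functions in Table 11.11 might be combined as in the sketch below, which initializes a set, adds two signals, removes one, and then tests membership. The function name build_signal_set is hypothetical.

```cpp
#include <signal.h>

// Build a set holding SIGINT and SIGQUIT, then remove SIGQUIT again.
// Returns true if the set ends up containing SIGINT but not SIGQUIT.
bool build_signal_set(sigset_t *set) {
  if (sigemptyset(set) == -1) return false;   // always initialize first
  sigaddset(set, SIGINT);
  sigaddset(set, SIGQUIT);
  sigdelset(set, SIGQUIT);
  return sigismember(set, SIGINT) == 1 && sigismember(set, SIGQUIT) == 0;
}
```

Skipping the sigemptyset (or sigfillset) call leaves the set with indeterminate contents, which is why the note above insists on initialization.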

The third argument of pthread_sigmask references the current signal mask (a returned value). Thus, if the second argument is set to NULL, making the how argument a don't-care, the function returns the current signal mask via the third argument, *oldmask.

If the pthread_sigmask call is successful, it returns a 0. It returns EFAULT (14) when passed an invalid address for newmask or oldmask. Otherwise, it returns the value EINVAL (22) if the how argument was not SIG_BLOCK, SIG_UNBLOCK, or SIG_SETMASK.
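A brief sketch of both uses of pthread_sigmask follows: querying the current mask by passing NULL as the second argument, and blocking a signal while saving the previous mask so it can be restored later. The names is_blocked and block_signal are hypothetical. The restore step would use SIG_SETMASK, the constant POSIX defines for replacing the mask.

```cpp
#include <pthread.h>
#include <signal.h>

// Returns true if `signo` is currently blocked in the calling thread.
bool is_blocked(int signo) {
  sigset_t current;
  sigemptyset(&current);
  // NULL second argument: `how` is ignored, current mask is returned.
  if (pthread_sigmask(SIG_BLOCK, NULL, &current) != 0) return false;
  return sigismember(&current, signo) == 1;
}

// Block `signo` in the calling thread; the previous mask is saved in
// *saved. Returns 0 on success or an error code.
int block_signal(int signo, sigset_t *saved) {
  sigset_t block;
  sigemptyset(&block);
  sigaddset(&block, signo);
  return pthread_sigmask(SIG_BLOCK, &block, saved);
}
```

To undo the change, the caller passes the saved mask back: pthread_sigmask(SIG_SETMASK, &saved, NULL).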

Program 11.3 demonstrates the use of the pthread_sigmask call and several of the signal mask manipulation calls.

Program 11.3. Using pthread_sigmask.

File : p11.3.cxx
  |     /*
  |         pthread_sigmask example
  |     */
  |     #define _GNU_SOURCE
  +     #define _REENTRANT
  |     #include <iostream>
  |     #include <cstdio>
  |     #include <cstring>
  |     #include <cstdlib>
 10     #include <pthread.h>
  |     #include <unistd.h>
  |     #include <signal.h>
  |     using namespace std;
  |     const int MAX=3;
  +     void  trapper( int );
  |     int   global_i = 0;
  |     int
  |     main(int argc, char *argv[]) {
  |       struct sigaction  new_action;
 20       new_action.sa_handler = &trapper;
  |       new_action.sa_flags = 0;
  |       sigset_t        my_sigs;            // Signal mask
  |       int             sig_in;
  |       setvbuf(stdout, (char *)NULL, _IONBF, 0);
  +       if ( argc > 1 && argc < MAX+2 ) {
  |         sigemptyset(&my_sigs);             // Clear it out, set to all 0's
  |         while( argc-- > 1 )                // Add signal #'s passed in
  |           sigaddset(&my_sigs, atoi(argv[argc]));
  |       } else {
 30         cerr << *argv << " SIG1 ... SIG" <<  MAX << endl;
  |         return 1;
  |       }
  |       for (int i=1; i < NSIG; ++i)        // Attempt to trap all signals
  |        sigaction(i, &new_action, NULL);
  +                                           // BLOCK signals in mask
  |       pthread_sigmask(SIG_BLOCK, &my_sigs, NULL);
  |       cout << "Signal bits turned on" << endl;
  |       for (int i=1; i < NSIG; ++i)
  |         putchar( sigismember(&my_sigs, i) == 0 ? '0' : '1');
 40       cout << "\nWaiting for signals\n";   // Wait for a few signals
  |       while (global_i < MAX){
  |         if ( (sigwait(&my_sigs, &sig_in)) == 0 )   // 0 on success
  |           cout << "Signal " << sig_in
  |                << " in mask - no signal catcher" << endl;
  +         ++global_i;
  |       }
  |       return 0;
  |     }
  |     void
 50     trapper( int s ){
  |       cout << "Signal " << s << " not in mask - in signal catcher" << endl;
  |       ++global_i;
  |     }

Program 11.3 uses its command-line values (assumed to be numeric values representing valid signal numbers) to build a signal mask. The sigemptyset call clears the array representing the signal mask. Each value passed on the command line is added to the signal mask by a call to sigaddset (line 28). After the signal mask is created, the program uses the sigaction call to replace the default action for the receipt of each signal with a call to the user-defined trapper function. The sole purpose of the trapper function is to print a message displaying the numeric value of an incoming signal. The call to pthread_sigmask blocks the receipt of the signals in the constructed signal mask. The specified signals will not be handled by the thread (keep in mind that even though we have not made a call to pthread_create, we are still dealing with a single thread within main). The content of the signal mask is displayed using the sigismember function (line 39) to verify the presence or absence of specific signals in the signal mask.

The program then waits for the receipt of signals. In a loop the sigwait call (Table 11.12) is used to wait for signals.

Table 11.12. The sigwait Library Function.

Include File(s)

<pthread.h>
<signal.h>

Manual Section

3

Summary

int sigwait(const sigset_t *set, int *sig);

Return

Success

Failure

Sets errno

0

Nonzero

 

The POSIX version of sigwait takes two arguments: a reference to the signal set and a reference to a location to store the returned signal value. The call suspends the calling thread until one of the signals in the referenced set becomes pending; the signals in the set should be blocked before the call is made. If successful, sigwait returns a 0 and stores the number of the received signal in the location referenced by its second argument. If the call is not successful, it returns a nonzero value: EINVAL (22), indicating an unsupported signal number was found in the signal set, or EFAULT (14) if passed an invalid signal set address.
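The behavior of sigwait can be seen in isolation with the sketch below, which blocks a signal, makes it pending with raise, and then collects it. The function name wait_for_own_signal is hypothetical. Because the signal is blocked when it is raised, no handler runs and the default action does not occur; sigwait simply removes the pending signal and reports its number.

```cpp
#include <pthread.h>
#include <signal.h>

// Block `signo`, make it pending, and collect it with sigwait.
// Returns the signal number reported by sigwait, or -1 on failure.
int wait_for_own_signal(int signo) {
  sigset_t set, saved;
  int      received = -1;
  sigemptyset(&set);
  sigaddset(&set, signo);
  if (pthread_sigmask(SIG_BLOCK, &set, &saved) != 0) return -1;
  raise(signo);                         // blocked, so it stays pending
  if (sigwait(&set, &received) != 0)    // 0 on success, else error number
    received = -1;
  pthread_sigmask(SIG_SETMASK, &saved, NULL);   // restore previous mask
  return received;
}
```

The return value of sigwait is compared with 0 rather than -1, since the call reports failure by returning an error number.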

It is unwise to use sigwait to manage synchronous signals, such as floating-point exceptions that are sent to the process itself. Additionally, the LinuxThreads library implementation of sigwait installs a dummy signal-catching routine for each signal specified in the signal mask. A sample run of Program 11.3 is shown in Figure 11.6.

Figure 11.6. Output from Program 11.3.

linux$ p11.3 2 3
Signal bits turned on
011000000000000000000000000000000000000000000000000000000000000
Waiting for signals
Signal 20 not in mask - in signal catcher        <-- 1
Signal 3 in mask - no signal catcher        <-- 2
Signal 2 in mask - no signal catcher        <-- 3
  • (1)User enters ^Z from keyboard.

  • (2)User enters ^\ from keyboard.

  • (3)User enters ^C from keyboard.

In the output shown, produced from a run of Program 11.3, the user passed the program the values 2 and 3 on the command line. The program uses these values to create a signal mask. When signal 2 or 3 is received, the initial thread, generated when main executes, does not handle the signals (that is, the trapper function is not called); instead, they are collected by the sigwait call. When a non-blocked signal is received (for example the ^Z), the action installed with sigaction occurs: a call to the user-defined function trapper is made.

In a multithreaded setting most authors advocate using a separate thread to handle signal processing. An example of how this can be done is shown in Program 11.4.

Program 11.4. Using a separate thread to handle signal processing.

File : p11.4.cxx
  |     /*
  |            Handling signals in a separate thread
  |     */
  |     #define _GNU_SOURCE
  +     #define _REENTRANT
  |     #include <pthread.h>
  |     #include <iostream>
  |     #include <csignal>
  |     #include <cstdlib>
 10     #include <unistd.h>
  |     using namespace std;
  |     void      *sibling(void *);
  |     void      *thread_sig_handler(void *);
  |     sigset_t  global_sig_set;
  +     int       global_parent_id;
  |     int
  |     main( ){
  |       pthread_t  t1,t2,t3;
  |       sigfillset( &global_sig_set );                // set of all signals
 20                                                     // BLOCK all in set
  |       pthread_sigmask(SIG_BLOCK, &global_sig_set, NULL);
  |                                                     // Create 3 threads
  |       pthread_create(&t1, NULL, thread_sig_handler, NULL);
  |       pthread_create(&t2, NULL, sibling, NULL);
  +       pthread_create(&t3, NULL, sibling, NULL);
  |       global_parent_id = getpid( );
  |       while (1){
  |         cout << "main thread \t PID: " << getpid() << " TID: "
  |              <<  pthread_self() << endl;
 30         sleep(3);
  |       }
  |       return 0;
  |     }
  |     void *
  +     sibling(void *arg){
  |       while(1){
  |         cout << "sibling thread \t PID: " << getpid() << " TID: "
  |              << pthread_self() << endl;
  |         sleep(3);
 40       }
  |       return NULL;
  |     }
  |     void *
  |     thread_sig_handler(void *arg) {
  +       int  sig;
  |       cout << "signal thread \t PID: " << getpid() << " TID: "
  |            <<  pthread_self() << endl;
  |       while(1){
  |         sigwait( &global_sig_set, &sig );
 50         if ( sig == SIGINT ){
  |           cout << "I am dead" << endl;
  |           kill( global_parent_id, SIGKILL );
  |         }
  |         cout << endl << "signal " << sig << " caught by signal thread "
  +              << pthread_self() << endl;
  |       }
  |       return NULL;
  |     }

In line 19 the program generates and fills a signal mask (indicating all signals). A call to pthread_sigmask (line 21) blocks the signals (except, of course, those signals that even when requested cannot be blocked). Any threads that are subsequently generated will inherit this information. Next, a thread to handle signal processing is created. This thread runs the code in the thread_sig_handler function. This function identifies itself and enters an endless loop. In the loop the call sigwait causes this thread to block, waiting for the receipt of one of the signals in the signal mask. When a signal is received, it is examined. If the signal is SIGINT, the thread sends the parent process (running the main thread) a SIGKILL signal, which terminates all processing. Notice that due to their declaration placement in the program code, the signal mask and PID of the initial thread (process) are global. Once the signal-processing thread is established, two additional threads are generated. Both of these threads run the code found in the sibling function. This function contains another endless loop. Every few seconds the thread running this code identifies itself and displays its process and thread IDs. When all threads have been generated, main also enters a loop where every few seconds it identifies itself (process and thread IDs). A run of this program (shown in Figure 11.7) is very informative.

Figure 11.7. Output from Program 11.4.

linux$ p11.4
signal thread    PID: 15760 TID: 1026
sibling thread   PID: 15761 TID: 2051        <-- 1
main thread      PID: 15758 TID: 1024
sibling thread   PID: 15762 TID: 3076

signal 3 caught by signal thread 1026        <-- 2

signal 20 caught by signal thread 1026
sibling thread   PID: main thread        PID: 15758 TID: 1024         <-- 3
sibling thread   PID: 15761 TID: 2051
15762 TID: 3076
I am dead

signal 2 caught by signal thread 1026        <-- 4
Killed
  • (1)Four threads are generated: main, a signal thread and two sibling threads. Each has its own thread ID and is run as a separate process.

  • (2)User enters ^\ and ^Z from the keyboard. These signals are handled by the signal-processing thread.

  • (3)While output is not buffered, it is still interleaved as threads compete.

  • (4)Entering a ^C (from the keyboard) terminates processing. The signal-processing thread is able to slip in one last message.

Thread Synchronization

It is a given that to fully utilize threads, we need to reliably coordinate their activities. A thread's access to critical sections of common code that modify shared data structures must be protected in a manner such that the integrity of the data referenced is not compromised. POSIX thread activities can be synchronized in a variety of ways.

Mutex Variables

One of the easiest methods to ensure coordination is to use a mutual exclusion lock, or mutex. Conceptually, a mutex can be thought of as a binary semaphore with ownership whose state either allows (0, unlocked)[16] or prohibits (nonzero, locked) access. Unlike a binary semaphore, any thread within the scope of a mutex can lock a mutex, but only the thread that locked the mutex should unlock it. While it should not be done, unlocking a mutex that a thread did not lock does not necessarily generate an error; such action results in undefined behavior. Forewarned is forearmed. Threads unable to obtain a lock are suspended. As actions on a mutex are atomic, and only one thread at a time can successfully modify the state of a mutex, it can be used to force the serialization of thread activities. If threads associated with multiple processes are to be coordinated, the mutex must be mapped to a common section of memory shared by the processes involved.
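The serialization a mutex provides can be illustrated with a shared counter. In the sketch below, several threads increment the same variable, and each increment is bracketed by a lock and an unlock so that only one thread at a time is in the critical section. The names counter_lock, add_many, and count_with_threads are hypothetical; pthread_mutex_lock, pthread_mutex_unlock, and the static initializer PTHREAD_MUTEX_INITIALIZER are standard POSIX facilities that have not yet been introduced at this point in the text.

```cpp
#include <pthread.h>

static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;
static long            counter = 0;     // shared data

// Each thread adds 1 to the shared counter `times` times.
static void *add_many(void *arg) {
  long times = *static_cast<long *>(arg);
  for (long i = 0; i < times; ++i) {
    pthread_mutex_lock(&counter_lock);    // enter critical section
    ++counter;
    pthread_mutex_unlock(&counter_lock);  // the locking thread unlocks
  }
  return NULL;
}

// Run up to 16 threads and return the final value of the counter.
long count_with_threads(int nthreads, long times) {
  pthread_t tid[16];
  if (nthreads > 16) nthreads = 16;
  counter = 0;
  int started = 0;
  for (; started < nthreads; ++started)
    if (pthread_create(&tid[started], NULL, add_many, &times) != 0) break;
  for (int i = 0; i < started; ++i)
    pthread_join(tid[i], NULL);
  return counter;
}
```

Without the lock and unlock calls, the increments from different threads could overlap and the final total could fall short of nthreads × times.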

The manner in which a mutex is created and initialized determines how it will be used. A mutex is of data type pthread_mutex_t. Examining the include file <bits/pthreadtypes.h>, we find this data type to be the following structure:

typedef struct {
  int __m_reserved;                  /* Reserved for future use          */
  int __m_count;                     /* Depth of recursive locking       */
  _pthread_descr __m_owner;          /* Owner thread (if recursive or
                                        errcheck)                        */
  int __m_kind;                      /* Mutex kind: fast, recursive or
                                        errcheck                         */
  struct _pthread_fastlock __m_lock; /* Underlying fast lock             */
} pthread_mutex_t;

Keep in mind that pthread_mutex_t is what is known as an opaque data type. That is, its specific structure is implementation-dependent. Thus, a POSIX threads implementation for the pthread_mutex_t data type for Linux might well be different from Sun's implementation. When in doubt, check it out!

A mutex, like a thread, can have its attributes specified via an attribute object, which is then passed to the pthread_mutex_init library function. Not to be outdone, the mutex attribute object also has its own initialization function, pthread_mutexattr_init, a deallocation function, and library functions (such as pthread_mutexattr_settype) for modification of the attribute settings once the object has been created. Changing the settings of the mutex attribute object will not change the settings of those mutexes previously allocated. We will restrict this section of our discussion to the following library calls: pthread_mutexattr_init, pthread_mutexattr_settype, and pthread_mutex_init (Tables 11.13, 11.15, and 11.16).

Table 11.13. The pthread_mutexattr_init Library Function

Include File(s)   <pthread.h>
Manual Section    3
Summary           int pthread_mutexattr_init( pthread_mutexattr_t *attr );
Return            Success   Failure   Sets errno
                  0         Nonzero

The pthread_mutexattr_init call, which initializes the mutex attribute object, is passed a reference to a pthread_mutexattr_t structure. The definition of this structure, shown below, is found in the /usr/include/bits/pthreadtypes.h file.

typedef struct {
 int __mutexkind;
} pthread_mutexattr_t;

At present, the LinuxThreads implementation of POSIX threads provides support only for the attribute specifying the mutex kind. In LinuxThreads a mutex can be one of the following three kinds, with the default being the type fast. The kind (type) of a mutex determines the system's behavior when a thread attempts to lock the mutex (Table 11.14).

Mutex Kind   fast
Constant     PTHREAD_MUTEX_FAST
Behavior     If the mutex is already locked by another thread, the calling thread blocks. If the thread that owns (locked) the mutex attempts to lock it a second time, the thread will deadlock! The thread that unlocks the mutex is assumed to be the owner of the mutex. Unlocking a nonlocked mutex will result in undefined behavior.

Mutex Kind   recursive
Constant     PTHREAD_MUTEX_RECURSIVE
Behavior     The system will record the number of lock requests for the mutex. The mutex is unlocked only when an equal number of unlock operations have been performed.

Mutex Kind   error-checking
Constant     PTHREAD_MUTEX_ERRORCHECK
Behavior     If a thread attempts to lock a mutex that it has already locked, an error (EDEADLK) is returned instead of the thread deadlocking.

If the pthread_mutexattr_init call is successful, it returns a 0 and a reference to a default pthread_mutexattr_t object; otherwise, it returns the value ENOMEM (12) to indicate there was insufficient memory to perform the initialization. One final note: A fast mutex (the default) is POSIX-based and thus portable. The mutex kinds recursive and error-checking are nonstandard and thus nonportable.

The pthread_mutexattr_settype library function is used to modify a mutex attribute object.

Table 11.15. The pthread_mutexattr_settype Library Function

Include File(s)   <pthread.h>
Manual Section    3
Summary           int pthread_mutexattr_settype(pthread_mutexattr_t *attr,
                                                int                  kind );
Return            Success   Failure   Sets errno
                  0         Nonzero

The pthread_mutexattr_settype library call is passed a valid reference to a pthread_mutexattr_t object (presumably previously initialized by a successful call to pthread_mutexattr_init) and an integer argument (defined constant) indicating the mutex kind. The mutex kind is specified by one of the previously discussed PTHREAD_MUTEX_xxx constants. For example, the code sequence

pthread_mutexattr_t      my_attributes;
pthread_mutexattr_init( &my_attributes );
. . .
pthread_mutexattr_settype( &my_attributes, PTHREAD_MUTEX_RECURSIVE);

would allocate a mutex attribute object, set the default attributes, and then at a later point change the mutex kind to recursive.[17] If the pthread_mutexattr_settype call is successful, it returns a 0; otherwise, it returns the value EINVAL (22), indicating it has found an invalid argument.
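The effect of the mutex kind can be seen in a short sketch. The helper name second_lock_result is hypothetical, and the sketch assumes a threads implementation that provides both the PTHREAD_MUTEX_RECURSIVE and PTHREAD_MUTEX_ERRORCHECK constants. It also uses pthread_mutex_init, pthread_mutex_lock, and pthread_mutex_unlock, which are covered later in this section. A fast mutex is deliberately left out, as a second lock request by its owner would never return.

```cpp
#include <cassert>
#include <cerrno>
#include <pthread.h>

// Hypothetical helper: create a mutex of the given kind, lock it twice from
// the calling thread, and return the result of the second lock request.
// Call it only with PTHREAD_MUTEX_RECURSIVE or PTHREAD_MUTEX_ERRORCHECK;
// a second lock on a fast (default) mutex would block forever.
int second_lock_result(int kind) {
  pthread_mutexattr_t attr;
  pthread_mutex_t     lock;

  int rc = pthread_mutexattr_init(&attr);        // default attributes
  assert(rc == 0);
  rc = pthread_mutexattr_settype(&attr, kind);   // change the mutex kind
  assert(rc == 0);
  rc = pthread_mutex_init(&lock, &attr);         // create the mutex itself
  assert(rc == 0);
  pthread_mutexattr_destroy(&attr);              // attribute object no longer needed

  rc = pthread_mutex_lock(&lock);                // first lock succeeds
  assert(rc == 0);
  int second = pthread_mutex_lock(&lock);        // relock by the owner
  if (second == 0)
    pthread_mutex_unlock(&lock);                 // recursive: undo the relock
  pthread_mutex_unlock(&lock);                   // undo the first lock
  pthread_mutex_destroy(&lock);
  return second;
}
```

Called with PTHREAD_MUTEX_ERRORCHECK, the helper should report EDEADLK; called with PTHREAD_MUTEX_RECURSIVE, it should report 0.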

At this point, we must keep several things in mind. First, initializing a mutex attribute object does not create the actual mutex. Second, if the mutex attribute object is to be shared by threads in separate address spaces, the user is responsible for setting up the mapping of the mutex attribute object to a common shared memory location. Third, if a mutex is shared across processes, it must be allocated dynamically, and therefore a call to pthread_mutex_init and/or pthread_mutexattr_init would be needed. The mechanics of how to set up a shared mutex for threads sharing the same process space and those in separate process spaces can be found in Program 11.5.
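The separation between an attribute object and the mutexes created from it can be illustrated with a small sketch. The helper name relock_after_attr_change is hypothetical, and the sketch assumes the error-checking and recursive kinds are both available. A mutex is created as error-checking, and only afterward is the attribute object switched to recursive.

```cpp
#include <cassert>
#include <cerrno>
#include <pthread.h>

// Hypothetical helper: returns what a relock by the owning thread yields on
// a mutex created as error-checking, after the attribute object used to
// create it has been switched to recursive.
int relock_after_attr_change() {
  pthread_mutexattr_t attr;
  pthread_mutex_t     lock;

  int rc = pthread_mutexattr_init(&attr);    // no mutex exists yet
  assert(rc == 0);
  rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
  assert(rc == 0);
  rc = pthread_mutex_init(&lock, &attr);     // only now is a mutex created
  assert(rc == 0);

  // Changing the attribute object afterward leaves 'lock' as it was.
  rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
  assert(rc == 0);

  pthread_mutex_lock(&lock);
  int relock = pthread_mutex_lock(&lock);    // still behaves as error-checking
  if (relock == 0)
    pthread_mutex_unlock(&lock);
  pthread_mutex_unlock(&lock);
  pthread_mutex_destroy(&lock);
  pthread_mutexattr_destroy(&attr);
  return relock;
}
```

If the mutex had picked up the later change, the relock would return 0; the expected result is EDEADLK.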

Next, let's look at initializing a mutex using the pthread_mutex_init library call. Table 11.16 provides the details for pthread_mutex_init.

Table 11.16. The pthread_mutex_init Library Function

Include File(s)   <pthread.h>
Manual Section    3
Summary           int pthread_mutex_init(pthread_mutex_t           *mutex,
                                         const pthread_mutexattr_t *mutexattr);
Return            Success   Failure   Sets errno
                  0         Nonzero

The pthread_mutex_init library function initializes a mutex. Its first argument, *mutex, is a reference to the mutex, and the second argument, *mutexattr, is a reference to a previously initialized mutex attribute object. If the second argument is NULL, the mutex will be initialized with the default attributes. For example, the following statements declare a mutex and give it the default characteristics:

. . .
pthread_mutex_t      my_lock;
. . .
pthread_mutex_init( &my_lock, NULL );

pthread_mutex_init returns a 0 and a reference to the mutex if successful. If the pthread_mutex_init call fails, it returns the value EINVAL (22) to indicate an invalid value for either the mutex or mutexattr argument.

While this approach (if we use an attribute object) is somewhat circuitous, it does allow greater freedom over the allocation of the mutex object. An alternate approach is to use the predefined constant PTHREAD_MUTEX_INITIALIZER[18] to initialize the mutex. The code sequence for this is:

pthread_mutex_t    my_lock = PTHREAD_MUTEX_INITIALIZER;

Additionally, LinuxThreads supports similar initializers for its two nonstandard mutex types: PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP and PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP.
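The two initialization styles can be placed side by side. In the sketch below, the helper names make_lock, free_lock, and exercise_locks are hypothetical; the statically allocated mutex uses the initializer constant, while the heap-allocated one must be initialized with pthread_mutex_init and later destroyed before its storage is released.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>

// Statically allocated mutex: the initializer gives it the default attributes.
static pthread_mutex_t static_lock = PTHREAD_MUTEX_INITIALIZER;

// Hypothetical helper: create a heap-allocated mutex with default attributes.
pthread_mutex_t *make_lock() {
  pthread_mutex_t *m = new pthread_mutex_t;
  int rc = pthread_mutex_init(m, NULL);     // NULL selects the defaults
  assert(rc == 0);
  return m;
}

// Hypothetical helper: uninitialize the mutex, then release its storage.
void free_lock(pthread_mutex_t *m) {
  int rc = pthread_mutex_destroy(m);
  assert(rc == 0);
  delete m;
}

// Lock and unlock both mutexes; returns 0 when every call succeeded.
int exercise_locks() {
  pthread_mutex_t *dynamic_lock = make_lock();
  int rc = pthread_mutex_lock(&static_lock);
  rc |= pthread_mutex_lock(dynamic_lock);
  rc |= pthread_mutex_unlock(dynamic_lock);
  rc |= pthread_mutex_unlock(&static_lock);
  free_lock(dynamic_lock);
  return rc;
}
```

The static form is the more compact of the two, but only the dynamic form lets us pass an attribute object or place the mutex in memory obtained at run time.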

Once the mutex has been created and initialized, there are four library functions that operate on the mutex, listed in Table 11.17.

Table 11.17. The mutex Manipulation Library Functions.

Include File(s)   <pthread.h>
Manual Section    3
Summary           int pthread_mutex_lock(    pthread_mutex_t *mutex );
                  int pthread_mutex_trylock( pthread_mutex_t *mutex );
                  int pthread_mutex_unlock(  pthread_mutex_t *mutex );
                  int pthread_mutex_destroy( pthread_mutex_t *mutex );
Return            Success   Failure   Sets errno
                  0         Nonzero

Each function takes a single argument, *mutex, a reference to the mutex. The library functions pthread_mutex_lock and pthread_mutex_unlock are used to lock and unlock a mutex. A pthread_mutex_lock issued on a previously locked mutex causes the issuing thread to block until the lock is free. If the mutex is unlocked, pthread_mutex_lock locks the mutex and changes the ownership of the mutex to the thread issuing the lock. The manual pages on this function note that should the owner of a mutex issue a second lock on a mutex that it has previously locked, deadlock will result. The thread issuing the unlock call should be the same as the thread that locked the mutex; otherwise, the resulting behavior is unspecified. Again, it is very easy to break the rules because, with the default fast type mutex, the concept of ownership is not enforced.

The call pthread_mutex_trylock is similar to pthread_mutex_lock (i.e., it will lock an unlocked mutex); however, it will not cause the thread to block if the indicated mutex is already locked. The library function pthread_mutex_destroy causes the referenced mutex to become uninitialized. However, the user must explicitly free the memory referenced by the mutex for it to be reclaimed by the operating system.

In general, if two or more resources are to be acquired by multiple threads and each resource is protected by a mutex, a locking hierarchy should be established to reduce the chance of deadlock. That is, each thread should attempt to gain access to each resource in the same order, and when done, release the associated mutexes in the reverse order of their acquisition.
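A locking hierarchy can be sketched with two mutexes and two threads that move values in opposite directions. All of the names below (lock_a, lock_b, transfer, run_transfers) are hypothetical. Whatever the direction of the transfer, lock_a is always acquired before lock_b, and the two are released in the reverse order.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>

static pthread_mutex_t lock_a = PTHREAD_MUTEX_INITIALIZER;  // acquired first
static pthread_mutex_t lock_b = PTHREAD_MUTEX_INITIALIZER;  // acquired second
static long balance_a = 1000, balance_b = 1000;             // guarded resources

// Move amount from a to b (a negative amount moves from b to a).
// Both directions take the locks in the same order.
void transfer(long amount) {
  pthread_mutex_lock(&lock_a);
  pthread_mutex_lock(&lock_b);
  balance_a -= amount;
  balance_b += amount;
  pthread_mutex_unlock(&lock_b);      // release in reverse order
  pthread_mutex_unlock(&lock_a);
}

void *a_to_b(void *) {
  for (int i = 0; i < 10000; ++i) transfer(1);
  return NULL;
}

void *b_to_a(void *) {
  for (int i = 0; i < 10000; ++i) transfer(-1);
  return NULL;
}

// Run both directions concurrently and return the combined balance.
long run_transfers() {
  pthread_t t1, t2;
  int rc = pthread_create(&t1, NULL, a_to_b, NULL);
  assert(rc == 0);
  rc = pthread_create(&t2, NULL, b_to_a, NULL);
  assert(rc == 0);
  pthread_join(t1, NULL);
  pthread_join(t2, NULL);
  return balance_a + balance_b;
}
```

Had b_to_a taken lock_b first, each thread could end up holding one mutex while waiting for the other. With a single agreed order that cycle cannot form.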

When these functions are successful, they return a 0; otherwise, they return a nonzero value. Both pthread_mutex_trylock and pthread_mutex_destroy return the value EBUSY (16) if the mutex is already locked. All functions return the value EINVAL (22) if the mutex argument is invalid.
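The nonblocking behavior of pthread_mutex_trylock can be checked with a sketch in which a second thread makes a single attempt on a mutex that the first thread may or may not be holding. The names try_once and trylock_from_other_thread are hypothetical.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

// Thread body: make one attempt to take the lock and report the return code.
void *try_once(void *result) {
  int rc = pthread_mutex_trylock(&lock);
  if (rc == 0)
    pthread_mutex_unlock(&lock);          // we got it, so give it back
  *static_cast<int *>(result) = rc;
  return NULL;
}

// Hypothetical helper: run try_once in a new thread while the calling
// thread does (hold == true) or does not (hold == false) own the lock.
int trylock_from_other_thread(bool hold) {
  int       result = -1;
  pthread_t tid;
  if (hold)
    pthread_mutex_lock(&lock);
  int rc = pthread_create(&tid, NULL, try_once, &result);
  assert(rc == 0);
  pthread_join(tid, NULL);                // the worker never blocks on the lock
  if (hold)
    pthread_mutex_unlock(&lock);
  return result;
}
```

When the caller holds the mutex, the worker should come back with EBUSY rather than waiting; otherwise it should report 0.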

We use two programs to demonstrate the use of a mutex. The programs are adaptations of an earlier producer-consumer example described in some detail in the chapter on semaphores (Chapter 7). As both programs use the same section of produce and consume logic, this code, along with a small driver routine called do_work, has been placed in a separate file. For similar reasons, the code for includes and declarations has been placed in a common header file called local_mutex.h. Program 11.5 uses a mutex to coordinate the activities of multiple nondetached threads. Each thread executes a series of calls to the produce and consume routines. These routines access a shared file that acts as a common buffer. Access to the buffer is controlled by the mutex. The include file for Program 11.5 is shown below.

Example 11.5. Header file for mutex example.

File : local_mutex.h
/*
     Common local header file: local_mutex.h
*/
#ifndef LOCAL_MUTEX_H
#define LOCAL_MUTEX_H
#ifndef _GNU_SOURCE                        // may already be defined by the compiler
#define _GNU_SOURCE
#endif
#ifndef _REENTRANT
#define _REENTRANT
#endif
#include <iostream>
#include <cstdio>
#include <pthread.h>
#include <fstream>
#include <stdlib.h>
#include <time.h>                          // nanosleep, struct timespec
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/time.h>
                                           // When we share a mutex
#include <sys/ipc.h>                       // we will need these.
#include <sys/shm.h>
static const char *BUFFER="./buffer";      // file used as the common buffer
static const int MAX=99;                   // limit for values and workers
void do_work( void );
using namespace std;
#endif

Most of what we see in the header file is self-explanatory or has been covered in previous sections. However, there are some items of note. The first mutex program example contains multiple threads within one process space; the second example uses single threads, each associated with its own heavyweight process. When interprocess coordination is required, the mutex and two additional common variables must be mapped to a shared memory location. To accomplish this, we need to use the IPC shared memory functions (discussed in Chapter 8), and thus must include their corresponding header files. The defined constant BUFFER is the name of a local file that will be used as a common shared location to store generated data.

The listing containing the common logic for the production and consumption of data is shown in Program 11.5.PC.

Example 11.5.PC. Common producer—consumer code for mutex examples.

File : p11.5.PC.cxx
/*
      Common producer & consumer code
*/
#include "local_mutex.h"
#include <time.h>                      // nanosleep, struct timespec
struct timespec some_time;
fstream         fptr;                  // common buffer location
extern pthread_mutex_t *m_LOCK;        // shared mutex
extern int             *s_shm,         // setup flag
                       *c_shm;         // counter
/*
    Generate a random # within specified range
*/
int
my_rand(int start, int range){
  struct timeval t;
  gettimeofday(&t, (struct timezone *)NULL);
  unsigned seed = (unsigned) t.tv_usec;          // seed from the clock
  return (int)(start+((double)range * rand_r(&seed))
               / (RAND_MAX+1.0));                // double avoids rounding up
}
/*
      Produce a random # and write to a common repository
*/
void
produce( ) {
  int   err, *n;
  cout << pthread_self( ) << "\t P: attempting to produce \t"
       << getpid( ) << endl;
  cout.flush( );
  if (pthread_mutex_trylock(m_LOCK) != 0) {      // LOCK
    cout << pthread_self( ) << "\t P: lock busy             \t"
         << getpid( ) << endl;
    cout.flush( );
    return;
  }
  n  = new int;                                  // allocate
  *n = my_rand(1,MAX);
  fptr.open(BUFFER, ios::out | ios::app);        // Open for append
  fptr.write( (char *) n, sizeof(*n) );
  fptr.close( );
  cout << pthread_self() << "\t P: The # [" << *n
       << "] deposited    \t" << getpid( )  << endl;
  cout.flush( );
  delete n;                                      // release after last use
  some_time.tv_sec = 0; some_time.tv_nsec = 10000;
  nanosleep(&some_time, NULL);                   // sleep a bit
  if ((err=pthread_mutex_unlock(m_LOCK)) != 0){  // UNLOCK
    cerr << "P: unlock failure " << err << endl;
    cout.flush( );
    exit(102);
  }
}
/*
      Consume the next random number from the common repository
*/
void
consume( ) {
  int             err, *n;
  cout << pthread_self( ) << "\t C: attempting to consume \t"
       << getpid( ) << endl;
  cout.flush( );
  if (pthread_mutex_trylock(m_LOCK) != 0) {      // LOCK
    cout << pthread_self( ) << "\t C: lock busy             \t"
         << getpid( ) << endl;
    cout.flush( );
    return;
  }
  fptr.open(BUFFER, ios::in);                    // Try to read
  if ( fptr )  {                                 // If present
    fptr.close( );
    fptr.open (BUFFER, ios::in|ios::out);        // Reopen for R/W
  }
  fptr.seekp( *c_shm * sizeof(int), ios::beg );  // file has one position
  n = new int;                                   // allocate
  *n = 0;
  fptr.read( (char *)n, sizeof(*n));
  if ((*n) > 0) {                                // For positive values
    cout << pthread_self() << "\t C: The # [" << *n
         << "] obtained    \t" << getpid( )   << endl;
    cout.flush( );
    fptr.seekp( *c_shm * sizeof(int), ios::beg );
    *n = -(*n);                                  // mark as consumed
    fptr.write( (char *) n, sizeof(*n) );
    fptr.flush( );                               // push update to the file
    ++*c_shm;                                    // increment counter
  } else {
    cout << pthread_self( ) << "\t C: No new # to consume     \t"
         << getpid( ) << endl;
    cout.flush( );
  }
  delete n;                                      // release
  fptr.close( );
  fptr.clear( );                                 // reset EOF/fail state
  some_time.tv_sec = 0; some_time.tv_nsec = 10000;
  nanosleep(&some_time, NULL);
  if ((err=pthread_mutex_unlock(m_LOCK)) != 0){  // UNLOCK
    cerr<< "C: unlock failure " << err << endl;
    exit(104);
  }
}
/*
  Simulate some work, 10 iterations about half produce, half consume
*/
void
do_work( ) {
  if (!(*s_shm)) {                               // Clear @ start
    pthread_mutex_lock(m_LOCK);                  // LOCK
    if (!(*s_shm)++) {                           // only the first caller clears
      cout << pthread_self( ) << "  \t  : clearing the buffer  \t"
           << getpid() << endl;
      fptr.open( BUFFER, ios::out | ios::trunc );
      fptr.close( );
    }
    pthread_mutex_unlock(m_LOCK);                // UNLOCK
  }
  for (int i = 0; i < 10; ++i) {
    some_time.tv_sec = 0; some_time.tv_nsec = 10000;
    nanosleep(&some_time, NULL);                 // sleep a bit
    switch ( my_rand(1,2) ) {
    case 1:
      produce();
      break;
    case 2:
      consume();
    }
  }
}

Overall, the produce and consume code listed in Program 11.5.PC is very much the same as what we saw in the Chapter 7 example. Nonetheless, there have been some important changes and a few additions. At the top of the listing a timespec structure, some_time, is declared. The program uses the nanosleep real-time library function instead of sleep to suspend the current thread from execution.
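A call to nanosleep takes a timespec structure with separate fields for seconds and nanoseconds. The sketch below wraps the call in a hypothetical helper, sleep_nanos, that also resumes the sleep if a signal cuts it short; the listing itself does not do this and simply passes NULL as the second argument.

```cpp
#include <cassert>
#include <cerrno>
#include <time.h>

// Hypothetical helper: suspend the calling thread for nsec nanoseconds
// (0 <= nsec < 1000000000), resuming the sleep if a signal interrupts it.
// Returns 0 on success, -1 on any error other than an interruption.
int sleep_nanos(long nsec) {
  struct timespec request, remain;
  request.tv_sec  = 0;
  request.tv_nsec = nsec;                 // 10000 would be 10 microseconds
  while (nanosleep(&request, &remain) == -1) {
    if (errno != EINTR)
      return -1;                          // e.g., EINVAL for a bad tv_nsec
    request = remain;                     // sleep for the time still owed
  }
  return 0;
}
```

Unlike sleep, whose resolution is whole seconds, this interface lets a thread pause for a fraction of a second; the actual delay is at least the requested time and is subject to the granularity of the system clock.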

Next is the declaration of a file stream pointer, fptr, which is used to reference the file where generated values will be stored and retrieved. Following this is a reference to a mutex (m_LOCK) and two references to integers (s_shm and c_shm). The first, s_shm, is used as a flag to indicate whether or not the file, used as a storage place for generated data, has been cleared (reset). The second, c_shm, is used as a counter—index offset for the current item to be retrieved from the file. As the variables m_LOCK, s_shm, and c_shm were initially declared in the source file with the code for the function main, they are qualified as extern (external) here.

As shown, the user-defined produce function attempts, using the pthread_mutex_trylock call, to lock the mutex referenced by m_LOCK. If it is not successful, a message is displayed and the function is exited. If the mutex can be obtained, a random value is generated. The random value is stored at a temporarily allocated memory location referenced by n. Once the value is stored in the file, the file buffer is flushed and the file closed. Next, the temporary location is freed, and a call to nanosleep is made to simulate some additional processing. As a last step, the mutex is released.

Conversely, the user-defined consume function tries to lock the mutex. If it is successful, it tries to consume (read) a number from the file. To accomplish this, the file is opened for reading and writing (ios::in | ios::out). The offset into the file is calculated using the current index referenced by c_shm multiplied by the sizeof of the data value written to the file. The seekp method is used to move the file pointer to the proper offset in the file where the value is to be retrieved. As in the produce function, a temporarily allocated location is used to store the retrieved value. The newly allocated location is initially cleared (set to 0) prior to the call to read. If the value obtained is positive, the value is displayed. The displayed value is written back to its same location in the file as a negative number. The seekp method is used to move the file pointer back to its proper location so the update process will overwrite the correct data item. Rewriting the value as a negative is used as a technique for keeping track of consumed values. Once the value is rewritten, the file buffer is flushed and the current index value is incremented. Whether or not a valid value is retrieved, the file is closed. A short nanosleep (again to simulate additional processing) is made, and the temporary storage location freed. Finally, the mutex is unlocked and made available to other threads.
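The record-keeping scheme used with the buffer file (append a positive value, later overwrite it in place as a negative one) can be tried on its own, without threads or a mutex. The sketch below is a simplified stand-in rather than the program's code: the function names reset, deposit, consume_at, and dump are hypothetical, and each call opens and closes the file itself.

```cpp
#include <cassert>
#include <fstream>
#include <vector>

// Start with an empty buffer file.
void reset(const char *path) {
  std::ofstream out(path, std::ios::binary | std::ios::trunc);
}

// Append one value to the end of the buffer file.
void deposit(const char *path, int value) {
  std::ofstream out(path, std::ios::binary | std::ios::app);
  out.write(reinterpret_cast<const char *>(&value), sizeof(value));
}

// Read the record at position index. If it is positive, rewrite it in place
// as a negative number and return the original value; otherwise return 0.
int consume_at(const char *path, int index) {
  std::fstream f(path, std::ios::binary | std::ios::in | std::ios::out);
  if (!f) return 0;                              // no buffer file yet
  const std::streamoff offset = static_cast<std::streamoff>(index) *
                                static_cast<std::streamoff>(sizeof(int));
  int value = 0;
  f.seekg(offset, std::ios::beg);
  f.read(reinterpret_cast<char *>(&value), sizeof(value));
  if (!f || value <= 0) return 0;                // past the end, or consumed
  const int marked = -value;
  f.seekp(offset, std::ios::beg);                // back to the same record
  f.write(reinterpret_cast<const char *>(&marked), sizeof(marked));
  return value;
}

// Return every record currently in the buffer file.
std::vector<int> dump(const char *path) {
  std::vector<int> values;
  std::ifstream in(path, std::ios::binary);
  int value;
  while (in.read(reinterpret_cast<char *>(&value), sizeof(value)))
    values.push_back(value);
  return values;
}
```

Depositing 94, 48, and 51 and then consuming the records at index 0 and 1 should leave -94, -48, and 51 in the file, which is the same pattern seen in the program's final display of the buffer.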

The user-defined function do_work uses the s_shm reference to determine if the file that stores the output has been cleared. Upon first entry, s_shm references a 0. In this case the if logic is entered. The mutex referenced by m_LOCK is used to bracket access to the code that opens the file for writing (ios::out), truncating (ios::trunc) its content. The value referenced by s_shm is incremented in this process, allowing the initialization to occur only once. Following this, do_work loops 10 times to simulate a series of production and consumption events. Each pass through the loop begins with a short call to nanosleep. If the random number generator is decent, there should be a somewhat even split of the calls to produce and consume within the loop.
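The once-only step at the top of do_work can be isolated in a sketch. Here the names worker, run_workers, and times_cleared are hypothetical, and a simple counter stands in for truncating the file. As a simplification, every thread takes the mutex before looking at the flag, whereas do_work first tests the flag without the lock.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int setup = 0;            // 0 until the one-time step has been done
static int times_cleared = 0;    // hypothetical stand-in for clearing the file

// Thread body: only the first thread to get the lock performs the step.
void *worker(void *) {
  pthread_mutex_lock(&lock);
  if (!setup++)                  // test the old value, then bump the flag
    ++times_cleared;
  pthread_mutex_unlock(&lock);
  return NULL;
}

// Start n threads (at most 16), wait for them, and report how many times
// the one-time step was carried out.
int run_workers(int n) {
  pthread_t tid[16];
  if (n > 16) n = 16;
  for (int i = 0; i < n; ++i) {
    int rc = pthread_create(&tid[i], NULL, worker, NULL);
    assert(rc == 0);
  }
  for (int i = 0; i < n; ++i)
    pthread_join(tid[i], NULL);
  return times_cleared;
}
```

However many threads are started, the counter should end at 1, since the test and the increment of the flag happen while the mutex is held.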

Program 11.5.INTRA contains the code for the function main for an intraprocess mutex example. All threads in this example share a common address space and are associated with an underlying LWP.

Example 11.5.INTRA. Code for function main for intraprocess mutex example.

File : p11.5.INTRA.cxx
  |     /*
  |        INTRA process main (multiple threads - one process space)
  |        Compile: g++  p11.5.PC.cxx  p11.5.INTRA.cxx  -lpthread -o INTRA
  |     */
  +     #include "local_mutex.h"
  |     pthread_mutex_t LOCK,   *m_LOCK = &LOCK;
  |     int             setup,  *s_shm  = &setup,
  |                     current,*c_shm  = &current;
  |     int
 10     main(int argc, char *argv[]) {
  |       int  i, n;
  |       pthread_t worker[MAX];                 // worker TID's
  |       if ( argc != 2) {
  |         cerr << "Usage: " << *argv << " n_workers" << endl;
  +         return 1;
  |       }
  |       pthread_mutex_init(&LOCK,  NULL);
  |       *s_shm = 0;                            // Start as NOT setup
  |       *c_shm = 0;                            // current index (offset)
 20       n = atoi(argv[1]) < MAX ? atoi(argv[1]) : MAX;
                                                 // # of threads to create
  |       for( i=0; i < n; ++i)                  // create each thread
  |         pthread_create( &worker[i], NULL,
                           (void *(*)(void *))do_work, (void *)NULL );
  |                                              // wait for all to finish
  |       for(i=0; i < n; ++i )
  +         pthread_join(worker[i], (void **) NULL);
  |                                              // show contents of buffer
  |       cout << "Contents of " << BUFFER
  |            << " negative values were 'consumed'." << endl;
  |       fstream  fptr;
 30       bool     done = false;
  |       fptr.open( BUFFER, ios::in );
  |       while ( !done ) {
  |         fptr.read( (char *)&n, sizeof(n) );
  |         if ( fptr.fail() )
  +           done = true;
  |         else
  |           cout << n << endl;
  |       }
  |       fptr.close( );
 40       return 0;
  |     }

In Program 11.5.INTRA several static declarations are made (by their placement prior to the function main). These are a mutex called LOCK and a reference to the mutex called m_LOCK. The variables setup (has the file been set up, that is, cleared) and current (the index into the file) are also statically allocated. In main the mutex is initialized (line 17). Remember that initialization should be done only once; re-initializing an already initialized mutex results in undefined behavior. As the value NULL is passed as the second argument to pthread_mutex_init, the mutex has the default characteristics. The next two statements assign both the setup and current variables the value 0. Next, the program examines the value passed in on the command line. This value represents the number of threads to be produced. As written, the value is capped at MAX (set arbitrarily at 99 in the include file). A for loop is used to create the specified number of threads. The thread IDs are saved, and each thread is passed a reference to the do_work function. As the do_work function reference is not the required data type for this parameter, it must be cast to keep the compiler from complaining. Once all the threads are created, the program waits for all the threads to terminate. When all threads are done, the contents of the file (the common buffer used by the threads) are displayed.
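An alternative to casting the function reference is to hand pthread_create a small wrapper that already has the expected signature. The sketch below is not the program's code: do_some_work is a hypothetical stand-in for do_work, and work_wrapper and run_threads are likewise made-up names.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int calls = 0;                    // counts completed work calls

// Hypothetical stand-in for a routine that takes and returns nothing.
void do_some_work() {
  pthread_mutex_lock(&lock);
  ++calls;
  pthread_mutex_unlock(&lock);
}

// Wrapper with the void *(*)(void *) signature pthread_create expects.
void *work_wrapper(void *) {
  do_some_work();
  return NULL;
}

// Create n threads (capped at LIMIT), join them all, and return the count.
int run_threads(int n) {
  const int LIMIT = 99;
  pthread_t worker[LIMIT];               // worker thread IDs
  if (n > LIMIT) n = LIMIT;
  for (int i = 0; i < n; ++i) {
    int rc = pthread_create(&worker[i], NULL, work_wrapper, NULL);
    assert(rc == 0);
  }
  for (int i = 0; i < n; ++i)            // wait for all to finish
    pthread_join(worker[i], NULL);
  return calls;
}
```

The wrapper costs a few extra lines, but the compiler can then check the call, and the thread returns a well-defined value to any thread that joins it.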

A compilation and partial run of the program is shown in Figure 11.8.

Example 11.8. A compile and partial run of Program 11.5 with intraprocess mutexes.

linux$ g++  p11.5.PC.cxx  p11.5.INTRA.cxx  -lpthread -o INTRA
linux$ INTRA 3
1026      : clearing the buffer         18353
1026     P: attempting to produce       18353
1026     P: The # [94] deposited        18353        <-- 1
2051     C: attempting to consume       18354
2051     C: lock busy                   18354
3076     C: attempting to consume       18355
3076     C: lock busy                   18355
. . .
1026     C: attempting to consume       18353
3076     P: attempting to produce       18355
3076     P: lock busy                   18355
1026     C: The # [48] obtained         18353
2051     C: attempting to consume       18354
2051     C: No new # to consume         18354
. . .
1026     P: The # [51] deposited        18353
1026     P: attempting to produce       18353
1026     P: The # [30] deposited        18353
Contents of ./buffer negative values were 'consumed'.
-94
-48
-50
-58
-98
-49
51
30
  • (1)Notice the different thread IDs. Each thread is mapped to a separate LWP (that has its own PID).

While the output of this program tends to be somewhat voluminous, it is very informative. With all the cout statements, it is easy to see what individual threads are doing. In this case there are three threads, with the thread IDs of 1026, 2051, and 3076. The mutex forces the competing threads to access the shared data structure in a manner that prevents data loss. If we rerun the program and pass the output to grep and keep only the lines containing the # symbol, we should find the output to be well behaved and furthermore note that the values placed in the file by one thread can be consumed by a different thread. A sample of this type of output is shown in Figure 11.9.

Example 11.9. Filtering the output of Program 11.5.

linux$ INTRA 3 | grep '#'
1026     C: No new # to consume         18321
3076     P: The # [51] deposited        18323
2051     P: The # [77] deposited        18322
3076     C: The # [51] obtained         18323
2051     P: The # [61] deposited        18322        <-- 1
3076     P: The # [86] deposited        18323
2051     C: The # [77] obtained         18322
1026     C: The # [61] obtained         18321        <-- 1
3076     C: The # [86] obtained         18323
1026     C: No new # to consume         18321
2051     P: The # [33] deposited        18322
3076     P: The # [96] deposited        18323
2051     C: The # [33] obtained         18322
3076     P: The # [91] deposited        18323
  • (1)Deposited by thread 2051 and consumed by thread 1026

The section of code containing main can be rewritten to support multiple heavyweight processes using a mutex mapped to a shared memory location. In this implementation the setup and current variables, which must be accessible across processes, are also mapped to a shared memory location. Program 11.5.INTER displays the code to accomplish this.

Example 11.5.INTER. Code for function main for interprocess mutex example.

File : p11.5.INTER.cxx
  |     /*
  |        INTER process main (multiple processes - 1 thread each)
  |        Compile: g++  p11.5.PC.cxx  p11.5.INTER.cxx  -lpthread -o INTER
  |     */
  +     #include "local_mutex.h"
  |     pthread_mutex_t  *m_LOCK;                 // Shared memory pointer
  |     int              m_shmid, i_shmid,        // Shared memory IDs
  |                      *s_shm, *c_shm;          // Shared setup and counter
  |     int
 10     main(int argc, char *argv[]) {
  |       pthread_mutexattr_t  the_attr_obj;             // attribute object
  |       int  i, n;
  |       if ( argc != 2) {
  |         cerr << "Usage: " << *argv << " n_workers" << endl;
  +         return 1;
  |       }
  |       n = atoi(argv[1]) < MAX ? atoi(argv[1]) : MAX;
  |       if((m_shmid=shmget(IPC_PRIVATE,sizeof(pthread_mutex_t),IPC_CREAT| 0666))<0){
  |         perror("shmget fail mutex");
 20         return 2;
  |       }
  |       if ((m_LOCK=(pthread_mutex_t *)shmat(m_shmid,0,0)) == (pthread_mutex_t *) -1){
  |         perror("shmat fail mutex");
  |         return 3;
  +       }
  |       if ((i_shmid=shmget(IPC_PRIVATE,sizeof(int)*2,IPC_CREAT|0666))<0){
  |         perror("shmget fail ints");
  |         return 4;
  |       }
 30       if ((s_shm=(int *) shmat(i_shmid, 0, 0)) == (int *) -1){
  |         perror("shmat ints");
  |         return 5;
  |       }
  |       c_shm  = (int *)((char *)s_shm + sizeof(int)); // byte offset of 2nd int
  +       *s_shm = *c_shm = 0;                     // start counter (offset)
  |       pthread_mutexattr_init( &the_attr_obj);  // initialize attrib obj
  |                                                // ask for a process-shared mutex
  |       pthread_mutexattr_setpshared( &the_attr_obj, PTHREAD_PROCESS_SHARED);
  |       pthread_mutex_init( m_LOCK, &the_attr_obj);  // initialize shared mutex
 40       for( i=0; i < n; ++i)
  |         if ( fork() == 0 ){                    // generate child process
  |           do_work( );                          // child process does work
  |           exit( 2 );
  |         }
  +       while( (n = (int) wait( NULL)) && n != -1 )  // wait for child processes
  |                     ;
  |       shmdt((char *) m_LOCK);                  // cleanup shared memory
  |       shmdt((char *) s_shm);
  |       shmctl(m_shmid, IPC_RMID, (struct shmid_ds *) 0);
 50       shmctl(i_shmid, IPC_RMID, (struct shmid_ds *) 0);
  |       cout << "Contents of " << BUFFER         // show contents of buffer
  |            << " negative values were 'consumed'." << endl;
  |       fstream  fptr;
  |       bool     done = false;
  +       fptr.open( BUFFER, ios::in );
  |       while ( !done ) {
  |         fptr.read( (char *)&n, sizeof(n) );
  |         if ( fptr.fail() )
  |           done = true;
 60         else
  |           cout << n << endl;
  |       }
  |       fptr.close( );
  |       return 0;
  +     }

While some of the code is similar to the preceding intraprocess example, additional steps are taken to create and manipulate the shared mutex as well as the shared data values (setup and current). First, a shared memory segment large enough to hold a mutex is allocated (line 18), and a reference to the segment is assigned to m_LOCK. Second, a shared memory segment large enough to hold two integers is allocated. The first part of the second segment, which will hold the value for setup, is referenced by s_shm. The second half of the segment, used to hold the value of current, is referenced by c_shm. The sizeof operator is used to find the proper offset for the second integer reference (see line 34). The shared memory locations for setup and current are set to 0. The for loop that generated the threads is replaced with a loop to generate child processes (using fork). The child process, which by default has a single thread of control, executes the do_work function. The initial process waits for all the child processes to terminate. Once the child processes are finished, the shared memory is removed and the contents of the common buffer (the file) are displayed. This program segment can be compiled with the statement

linux$ g++ p11.5.PC.cxx p11.5.INTER.cxx -lpthread -o INTER

The output of this interprocess mutex example should be similar to the intraprocess example with the thread IDs remaining constant at 1024. A sample output sequence is shown in Figure 11.10.

Example 11.10. A run of Program 11.5 with inter-process mutexes.

linux$ INTER 3 | grep '#'
1024     P: The # [76] deposited        18755
1024     P: The # [25] deposited        18754
1024     C: The # [76] obtained         18755
1024     P: The # [68] deposited        18754
1024     C: The # [25] obtained         18755        <-- 1
1024     C: The # [68] obtained         18754
1024     C: No new # to consume         18753
1024     C: No new # to consume         18755
1024     C: No new # to consume         18754
1024     C: No new # to consume         18755
1024     P: The # [17] deposited        18754
1024     C: The # [17] obtained         18755
  • (1) Notice all the thread IDs are the same. Each thread is associated with a heavyweight (standard) process, not with an LWP.
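For comparison, the same interprocess arrangement can be sketched for a threads implementation that supports the POSIX process-shared mutex attribute, which is an assumption here (LinuxThreads, as described in this chapter, supports only the mutex kind attribute). The sketch uses an anonymous shared mapping from mmap in place of the System V shared memory calls, purely to keep it short; the structure Shared and the function shared_count are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

struct Shared {                 // hypothetical layout of the shared region
  pthread_mutex_t lock;
  int             counter;
};

// Fork n_children processes that each add per_child to a shared counter
// under a process-shared mutex; return the final count.
int shared_count(int n_children, int per_child) {
  void *mem = mmap(NULL, sizeof(Shared), PROT_READ | PROT_WRITE,
                   MAP_SHARED | MAP_ANONYMOUS, -1, 0);
  assert(mem != MAP_FAILED);
  Shared *s = static_cast<Shared *>(mem);

  pthread_mutexattr_t attr;
  int rc = pthread_mutexattr_init(&attr);
  assert(rc == 0);
  rc = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
  assert(rc == 0);              // assumes process-shared support
  rc = pthread_mutex_init(&s->lock, &attr);
  assert(rc == 0);
  pthread_mutexattr_destroy(&attr);
  s->counter = 0;

  for (int c = 0; c < n_children; ++c) {
    pid_t pid = fork();
    assert(pid != -1);
    if (pid == 0) {             // child: update the counter, then leave
      for (int i = 0; i < per_child; ++i) {
        pthread_mutex_lock(&s->lock);
        ++s->counter;
        pthread_mutex_unlock(&s->lock);
      }
      _exit(0);
    }
  }
  for (int c = 0; c < n_children; ++c)
    wait(NULL);                 // wait for each child process

  int total = s->counter;
  pthread_mutex_destroy(&s->lock);
  munmap(mem, sizeof(Shared));
  return total;
}
```

With three children each adding 1000, the parent should read back 3000; without the mutex, increments from different processes could be lost.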

Condition Variables

Sometimes we need to coordinate the activities of threads using the current value of mutex-protected data. Say, for example, we want to notify a reader thread once a writer thread has filled its data set. The counter for the number of items in the data set and the access to the data is protected by a mutex. The POSIX implementation of threads provides a construct called a condition variable that can be used for this purpose. A condition variable is associated with a specific mutex and predicate (condition). Similar to a mutex, a condition variable should be global and can be mapped to shared memory if it is to be used by threads in more than one process space. A thread uses the condition variable to either notify other cooperating threads (with access to the same condition variable) that the condition (predicate) has been met or to block and wait for the receipt of notification. When a thread blocks on a condition variable, it atomically releases the associated mutex, allowing other threads access to the mutex. Several threads can be blocked, waiting for the same notification. The thread that generates the notification does so by signaling the associated condition variable.

The majority of what was discussed concerning the creation and initialization techniques of a mutex also applies to the creation and initialization of a condition variable. The corresponding library functions have the occurrences of the string _mutex_ replaced with _cond_.

As with a mutex, a condition variable attribute object can be created, set, and then referenced when creating a condition variable. However, at present the LinuxThreads implementation of POSIX threads does not support condition variable attributes, and the reference to the cond_attr object (the second parameter to the pthread_cond_init function) is ignored. Like a mutex, a condition variable can be created and initialized in a single statement:

pthread_cond_t my_condition = PTHREAD_COND_INITIALIZER;

When a thread wants to notify others, it uses the library function pthread_cond_signal (signal one thread) or pthread_cond_broadcast (signal all threads). The specifics for the condition variable notification functions can be found in Table 11.18.

Table 11.18. The Condition Variable Notification Library Functions.

Include File(s):  <pthread.h>
Manual Section:   3

Summary:
int pthread_cond_signal(   pthread_cond_t *cond );
int pthread_cond_broadcast(pthread_cond_t *cond );

Return:
Success    Failure    Sets errno
0          Nonzero

The argument *cond is a reference to a condition variable of the type pthread_cond_t. When pthread_cond_signal is used, one thread blocked on the same condition variable will be unblocked. If several threads are blocked, the thread receiving notification is not specified. If pthread_cond_broadcast is called, all threads blocked on the condition variable are notified. Once awakened, a thread must still acquire the associated mutex. Either call essentially becomes a no-op if there are no threads waiting for notification. The value EINVAL (22) is returned if *cond references an illegal address.

The library functions pthread_cond_wait and pthread_cond_timedwait cause the calling thread to wait and block on a condition variable. Under the covers these functions atomically unlock the associated mutex (which must be locked prior to the call) and suspend the thread's execution; when the thread is awakened, they relock the mutex (as if by issuing a pthread_mutex_lock) before returning. The waiting thread does not consume CPU time.

Table 11.19. The Condition Wait Library Functions.

Include File(s):  <pthread.h>
Manual Section:   3

Summary:
int pthread_cond_wait(pthread_cond_t  *cond,
                      pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond,
                     pthread_mutex_t      *mutex,
                     const struct timespec *abstime);

Return:
Success    Failure    Sets errno
0          Nonzero

The first argument, *cond, is a reference to a condition variable. The second argument, *mutex, is a reference to the associated mutex. The pthread_cond_timedwait function is similar to pthread_cond_wait except it will time out and return an error, ETIMEDOUT (110), if the absolute time referenced by *abstime is met or exceeded. The *abstime argument for pthread_cond_timedwait references a timespec structure that can be tracked down to the following structure:

typedef struct  timespec {
             time_t         tv_sec;         /* seconds         */
             long           tv_nsec;        /* and nanoseconds */
    } timespec_t;

If a signal or fork interrupts either of these calls, the value EINTR (4) is returned. If any of the arguments are invalid, the value EFAULT (14) is returned.

Program 11.6, a variation of a bounded buffer producer-consumer problem, demonstrates the use of a condition variable. In this program, a reader thread continually reads data from standard input and fills a small data buffer. When the buffer is full, the reader thread notifies a writer thread to empty (display) the buffer so that it may be filled again. When an end-of-file is encountered, notification is sent to the writer thread to write out any partial data left in the buffer. A finished flag is set, notifying both the reader and writer that processing is complete.

Example 11.6. Using a condition variable.

File : p11.6.cxx
  |     /*
  |         Using a condition variable
  |     */
  |     #define _GNU_SOURCE
  +     #define _REENTRANT
  |     #include <iostream>
  |     #include <cctype>
  |     #include <pthread.h>
  |     using namespace std;
 10     const int MAX=5;
  |                                                   // global
  |     pthread_mutex_t lock_it  = PTHREAD_MUTEX_INITIALIZER;
  |     pthread_cond_t  write_it = PTHREAD_COND_INITIALIZER;
  |     typedef struct {                              // a small data buffer
  +       char            buffer[MAX];                // the buffer
  |       int             how_many;                   // # of chars in buffer
  |     } BUFFER;
  |     BUFFER         share = {"", 0};               // start as empty
  |     void           *read_some (void *),
 20                    *write_some(void *);
  |     bool           finished = false;
  |     int
  |     main( ) {
  |     pthread_t       t_read,
  +                     t_write;                     // TID's
  |                                                  // create the threads
  |       pthread_create(&t_read,  NULL, read_some, (void *) NULL);
  |       pthread_create(&t_write, NULL, write_some,(void *) NULL);
  |                                                   // wait for the writer
 30       pthread_join(t_write, (void **) NULL);
  |       pthread_mutex_destroy( &lock_it  );         // clean up
  |       pthread_cond_destroy(  &write_it );
  |      return 0;
  |    }
  +    //        Code to fill the buffer
  |    void *
  |    read_some(void * junk) {
  |      char  ch;
  |      cout << "R " << pthread_self( ) << "	: Starting" << endl;
 40      while (!finished) {
  |        pthread_mutex_lock(&lock_it);
  |        if (share.how_many != MAX) {                 // buffer not full
  |          cin.get(ch);
  |          if ( cin.fail( ) ) {                       // end-of-file
  +            share.buffer[share.how_many] = (char)NULL;
  |            share.how_many = MAX;
  |            finished       = true;                // we are all done
  |            cout << "R " << pthread_self( ) << "	: Signaling done" << endl;
  |            pthread_cond_signal(&write_it);       // signal condition var
 50            pthread_mutex_unlock(&lock_it);
  |            break;
  |          } else {                                // sanitize input chars
  |            share.buffer[share.how_many] =  isalnum(ch) ? ch : '#';
  |            cout << "R " << pthread_self( ) << "	: Got char ["
  +                         << share.buffer[share.how_many++] << "]" << endl;
  |            if ( share.how_many == MAX ) {           // if full
  |              cout << "R " << pthread_self( ) << "	: Signaling full" << endl;
  |              pthread_cond_signal(&write_it);
  |            }
 60          }
  |        }
  |        pthread_mutex_unlock(&lock_it);
  |      }
  |      cout << "R " << pthread_self( ) << "	: Exiting" << endl;
  +      return NULL;
  |    }
  |    //    Code to write (display) buffer
  |    void *
  |    write_some(void *junk) {
 70      int i;
  |      cout << "W " << pthread_self( ) << "	: Starting" << endl;
  |      while (!finished ) {
  |        pthread_mutex_lock(&lock_it);
  |        cout << "W " << pthread_self( ) << "	: Waiting" << endl;
  +        while (share.how_many != MAX)                // while not full
  |          pthread_cond_wait(&write_it, &lock_it);    // wait for notify
  |        cout << "W " << pthread_self( ) << "	: Writing buffer" << endl;
  |        for( i=0; share.buffer[i] && share.how_many; ++i, share.how_many--)
  |          cout.put(share.buffer[i]);
 80        cout.put('\n');
  |        pthread_mutex_unlock(&lock_it);
  |      }
  |       cout << "W " << pthread_self( ) << "	: Exiting" << endl;
  |       return NULL;
  +     }

In this program a mutex, lock_it, and a condition variable, write_it, are allocated and initialized to their default settings prior to main. Their location, prior to main and to the functions that will reference them, guarantees they will be global in scope and accessible by all threads associated with this process space. A small buffer consisting of five locations and an indexing counter is defined, and a buffer of this type, called share, is allocated and initialized. A Boolean flag called finished is set to false before processing begins. In main, two threads are created: one to be the reader (consuming data from an input source) that executes the read_some function and another to be the writer (producing output) that executes the write_some function. After the threads are created, the program waits for the thread executing the write_some function to terminate. When this occurs, the mutex and condition variables are removed and the program terminates.

The read_some function loops while the finished flag is false. The mutex lock_it is used to serialize access to the code that manipulates the shared buffer. Once the mutex is obtained, the count of the number of characters in the buffer (the predicate) is checked. If the buffer is full, the mutex is released (the assumption being the buffer will be processed by a writer, which will need to gain access via the mutex). If the buffer is not filled, an additional character is obtained from standard input. The new character is checked; if it is not an end-of-file value, the character is added to the buffer and the character count is incremented. If the character fills the buffer, a call to pthread_cond_signal is made to notify the condition variable write_it. If the input character was an end-of-file value, a NULL value is inserted in the buffer in place of the end-of-file value. Next, the character counter is set to its maximum value to satisfy the predicate check in the writer, the finished flag is set to true, and pthread_cond_signal is used to notify the writer, so the remaining contents of the buffer can be processed.

The thread executing the writer code also loops, while the finished flag is set to false. Like the reader, it uses the lock_it mutex to gain access to the shared code and data. The inner while statement checks the count of the number of characters stored in the buffer. As long as the count is less than the maximum, the thread executing this code continues to block due to the call to pthread_cond_wait. When notified by the reader (when the character count is at the maximum), the while loop is exited, and the writer displays the contents of the common buffer. As the contents of the buffer are displayed, the character counter is decremented accordingly.

A compilation and run of Program 11.6 on a local system is shown in Figure 11.11.

Example 11.11. A compile and run of Program 11.6.

linux$ g++ p11.6.cxx -lpthread -o p11.6
linux$ p11.6        <-- 1
R 1026  : Starting
W 2051  : Starting
twinkle toes        <-- 2
R 1026  : Got char [t]
W 2051  : Waiting
R 1026  : Got char [w]
R 1026  : Got char [i]
R 1026  : Got char [n]
R 1026  : Got char [k]
R 1026  : Signaling full        <-- 3
W 2051  : Writing buffer
twink
R 1026  : Got char [l]
W 2051  : Waiting
R 1026  : Got char [e]
R 1026  : Got char [#]
R 1026  : Got char [t]
R 1026  : Got char [o]
R 1026  : Signaling full
W 2051  : Writing buffer
le#to
R 1026  : Got char [e]
W 2051  : Waiting
R 1026  : Got char [s]
R 1026  : Got char [#]
^D        <-- 4
R 1026  : Signaling done
R 1026  : Exiting
W 2051  : Writing buffer
es#
W 2051  : Exiting
  • (1)Program is run—a reader (TID 1026) and writer (TID 2051) thread are generated.

  • (2)User enters the phrase "twinkle toes," terminated by a carriage return.

  • (3)Reader thread signals it is full.

  • (4)User types CTRL+D to signify end-of-file from the keyboard. The remaining stored input is displayed.

When the program is run, its input is obtained from the keyboard. The user enters the string twinkle toes, and the program responds by displaying each character as it is obtained. Display by individual threads is labeled as either R for reader or W for writer, and the thread's ID is given. After the fifth character is processed, the reader thread signals the condition variable. As there is only one writer thread (in this case thread ID 2051), it “wakes up” and processes (displays) the contents of the buffer. Nonalphanumeric characters are displayed as #. When a CTRL+D is entered to indicate end-of-file, the remaining contents of the buffer are displayed and the program terminates.

A little experimentation with this program produces some interesting output. For example, if we duplicate the writer pthread_create statement in main (line 28) so we have two writers, each with its own thread of control, the program on our system produces the output shown in Figure 11.12.

Example 11.12. A run of Program 11.6 with two writers, using signal notification.

linux$ p11.6
R 1026  : Starting
W 2051  : Starting        <-- 1
W 3076  : Starting
twinkle toes
R 1026  : Got char [t]
W 2051  : Waiting
W 3076  : Waiting
R 1026  : Got char [w]
R 1026  : Got char [i]
R 1026  : Got char [n]
R 1026  : Got char [k]
R 1026  : Signaling full
W 2051  : Writing buffer        <-- 2
twink
R 1026  : Got char [l]
W 2051  : Waiting
R 1026  : Got char [e]
R 1026  : Got char [#]
R 1026  : Got char [t]
R 1026  : Got char [o]
R 1026  : Signaling full
W 3076  : Writing buffer        <-- 3
le#to
R 1026  : Got char [e]
W 3076  : Waiting
R 1026  : Got char [s]
R 1026  : Got char [#]
^D
R 1026  : Signaling done
R 1026  : Exiting
W 2051  : Writing buffer        <-- 4
es#
W 2051  : Exiting
^C
  • (1)This time there are two writer threads, TID 2051 and 3076.

  • (2)Writer TID 2051 is notified first that the buffer is full.

  • (3)Writer TID 3076 is notified second that the buffer is full.

  • (4)Writer TID 2051 is notified this time. Buffer is written out and this thread exits. Remaining writer thread does not exit until CTRL+C is entered.

The output shows the writer threads alternating the task of displaying the output. At the end of the input sequence, CTRL+D causes the reader thread to signal the condition variable that termination is necessary. The thread to act upon the signal is the writer thread 2051 (2051 and 3076 are alternating). Writer thread 3076 (the thread ID passed to the single call to join) is unaware of the change, continues looping, and must be terminated with a CTRL+C.

If we keep two writer threads and change the two pthread_cond_signal calls to pthread_cond_broadcast (lines 49 and 58), we obtain the output shown in Figure 11.13.

Example 11.13. A run of Program 11.6 with two writer threads, using broadcast notification.

linux$ p11.6
R 1026  : Starting
W 2051  : Starting
W 3076  : Starting
twinkle toes
R 1026  : Got char [t]
W 2051  : Waiting
W 3076  : Waiting
R 1026  : Got char [w]
R 1026  : Got char [i]
R 1026  : Got char [n]
R 1026  : Got char [k]
R 1026  : Signaling full
W 3076  : Writing buffer
twink
R 1026  : Got char [l]
W 3076  : Waiting
R 1026  : Got char [e]
R 1026  : Got char [#]
R 1026  : Got char [t]
R 1026  : Got char [o]
R 1026  : Signaling full
W 3076  : Writing buffer
le#to
R 1026  : Got char [e]
W 3076  : Waiting
R 1026  : Got char [s]
R 1026  : Got char [#]
^D
R 1026  : Signaling done
R 1026  : Exiting
W 2051  : Writing buffer
es#
W 2051  : Exiting
^C

In this example, when all threads are awakened with the call to pthread_cond_broadcast and placed in contention, the thread with the ID of 3076 is always the first to act on the signal until the last (exiting) broadcast is made. Keep in mind that the actual thread chosen by the operating system is not specified. While our example seems robust, there are still some conditions we did not try (see Exercise 11.9).

Read/Write Locks

When writing programs, it is not unusual to run into a situation, such as with a database, where the data involved is read more often than it is modified (written). In these situations, what is needed is a locking mechanism that permits simultaneous reading of data when no writing is occurring, and the writing of data only when no other reading or writing is occurring. Until fairly recently, a POSIX thread implementation of read/write locks was not available and users were left to write their own. Fortunately, newer versions of LinuxThreads contain support for POSIX-based read/write locks.

A read/write lock should always be initialized before it is used. A read/write lock is initialized with the library function pthread_rwlock_init (Table 11.20).

Table 11.20. The pthread_rwlock_init Library Function.

Include File(s):  <pthread.h>
Manual Section:   3

Summary:
int
pthread_rwlock_init( pthread_rwlock_t     *rwlock,
               const pthread_rwlockattr_t *attr );

Return:
Success    Failure    Sets errno
0          Nonzero

The data type of the first argument of this call, pthread_rwlock_t, is defined as

typedef struct _pthread_rwlock_t {
struct _pthread_fastlock __rw_lock; /* Lock to guarantee mutual exclusion */
int __rw_readers;                   /* Number of readers */
_pthread_descr __rw_writer;         /* Identity of writer, or NULL if none */
_pthread_descr __rw_read_waiting;   /* Threads waiting for reading */
_pthread_descr __rw_write_waiting;  /* Threads waiting for writing */
int __rw_kind;                      /* Reader/Writer preference selection */
int __rw_pshared;                   /* Shared between processes or not */
} pthread_rwlock_t;

The argument *rwlock is a reference to the read/write lock to be initialized. The argument attr is used to reference an attribute object (similar to previously presented pthread functions). If this argument is set to NULL, the read/write lock is set to the default.

A typical read/write lock initialization sequence for use with multiple threads in a single process is as follows.[19]

pthread_rwlock_t  rw_lock;
. . .
pthread_rwlock_init( &rw_lock, NULL );

If the pthread_rwlock_init call is successful, it returns a 0. If the call fails, a nonzero value is returned. The return of EINVAL (22) indicates an invalid argument.

As with mutexes, there is a suite of read/write lock manipulation functions. A summary of some of the more commonly used functions is shown in Table 11.21. All the functions take a single reference to an allocated read/write lock. We restrict our discussion to the basics: locking and unlocking a read/write lock.

Table 11.21. Some Common Read/Write Lock Library Functions.

Function Prototype

Description

int pthread_rwlock_rdlock(
    pthread_rwlock_t *rwlock );

Locks the referenced read/write lock for reading. If the lock is currently held for writing, the calling thread blocks. Multiple threads can hold the lock for reading.

int pthread_rwlock_wrlock(
    pthread_rwlock_t *rwlock );

Locks the referenced read/write lock for writing. If the lock is currently held for reading or writing, the calling thread blocks. Only one thread can hold the lock for writing.

int pthread_rwlock_unlock(
    pthread_rwlock_t *rwlock );

Unlock a read/write lock held by the calling thread. If other threads are blocked on the read/write lock, one of them will be unblocked. At present, the implementation favors blocked writers over blocked readers. If the calling thread does not hold a lock for reading or writing but issues this unlock call, the program's behavior is undefined.

int pthread_rwlock_tryrdlock(
    pthread_rwlock_t *rwlock );

Try to lock the referenced read/write lock for reading. If the lock is not held for writing, it returns a read lock. If the lock is currently held for writing, it returns the error value EBUSY (16).

int pthread_rwlock_trywrlock(
    pthread_rwlock_t *rwlock );

Try to lock the referenced read/write lock for writing. If the lock is not held for reading or writing, return a write lock. If the lock is currently held for reading or writing, return the error value EBUSY (16).

Program 11.7 uses read/write locks to allow multiple threads read/write access to a stack of characters stored as a singly linked list. Each thread can push (add) a character to the list, pop (remove) a character from a non-empty list, display the list, sleep, or quit. A random number generator drives the activities of each thread. Threads compete with one another for access to the list. The header file for Program 11.7 is shown below.

Example 11.7. Header file for read/write lock example.

File : local_stack.h
  |     /*
  |          Common local header file: local_stack.h
  |     */
  |     #ifndef  LOCAL_STACK_H
  +     #define  LOCAL_STACK_H
  |     #define  _GNU_SOURCE
  |     #define  _REENTRANT
  |     #include <iostream>
  |     #include <cstdlib>
 10     #include <pthread.h>
  |     #include <unistd.h>
  |     #include <sys/time.h>
  |     using namespace std;
  |     const int MAX=6;
  +     class Stack {
  |        public:
  |                   Stack     ( ) : head( NULL ) {}         <-- 1
  |                   ~Stack    ( );
  |           bool    StackEmpty( void ) const { return (head == NULL); }
 20           void    Display   ( void ) const ;
  |           void    Push      ( const char );
  |           char    Pop       ( void );
  |        private:
  |           struct node {
  +              char         item;
  |              struct node *next;
  |           };
  |           node *head;
  |     };
 30     #endif
  • (1)User-defined Stack class implemented as a linked list.

As might be expected, the content of this file is similar to that of the header file for the previous example. However, some new items have been added. This example uses a user-defined Stack class. The definition of the class is found at the bottom of the header file. Code for the more complex Stack methods is found in Program 11.7B. Additionally, Program 11.7B contains the code each thread will execute. This code consists of a driver function called do_stack that randomly chooses an activity for the thread on each pass through the loop.

Example 11.7B. Stack class methods and common list manipulation functions for read/write lock example.

File : p11.7B.cxx
  |     #include "local_stack.h"
  |                                                // previously declared
  |     extern pthread_rwlock_t  *rw_ACCESS;       // RW lock
  |     extern Stack   *S;                         // Stack
  +                                                // remaining Stack methods
  |     Stack::~Stack( ){                          // List destructor
  |       node *curr = head, *next;
  |       while( curr ){
  |         next = curr->next;
 10         delete curr;
  |         curr = next;
  |       }
  |       head = NULL;
  |     }
  +     void                                       // Display the list
  |     Stack::Display( void ) const {
  |       node *temp = head;
  |       cout << "	" << pthread_self() << " [head]" << endl;
  |       while( temp != NULL ){
 20          cout << "	" << pthread_self() << " [" << temp->item
  |               << "]" << endl;
  |          cout.flush( );
  |          temp = temp->next;
  |          sleep(1);                             // slow things down
  +       }
  |       cout << "	" << pthread_self( ) << " [tail]" << endl;
  |     }
  |     void                                       // Add an item
  |     Stack::Push( const char item ){
 30       node *temp = new node;
  |       temp->item = item;
  |       temp->next = head;
  |       head       = temp;
  |     }
  +     char                                       // Remove an item
  |     Stack::Pop( void ){
  |       char item;
  |       node *temp = head;
  |       item = temp->item;
 40       head = temp->next;
  |       delete temp;
  |       return item;
  |     }
  |     int                                        // Random # in range
  +     my_rand(int start, int range){
  |       struct timeval t;
  |       gettimeofday(&t, (struct timezone *)NULL);
  |       return (int)(start+((float)range * rand_r((unsigned *)&t.tv_usec))
  |                    / (RAND_MAX+1.0));
 50     }
  |     void *
  |     do_stack( void *junk ) {                  // Activity for thread
  |       char  item;
  |       sleep( my_rand(1,3) );                  // random start up time
  +       do {
  |         switch ( my_rand(1,10) ) {            // choose value 1-10
  |         case 1: case 2:                       // Display 2/10
  |           pthread_rwlock_rdlock(rw_ACCESS);   // read lock - block on W
  |           cout << pthread_self( ) << " Display:" << endl;
 60           if ( S->StackEmpty( ) )
  |             cout << pthread_self( ) << " Empty list" << endl;
  |           else
  |             S->Display();
  |           pthread_rwlock_unlock(rw_ACCESS);   // unlock
  +           break;
  |         case 3: case 4: case 5:               // Add item 3/10
  |           item = my_rand(1,25) + 64;
  |           pthread_rwlock_wrlock(rw_ACCESS);   // write lock - block on W|R
  |           cout << pthread_self( ) << " Push   : " << item << endl;
 70           S->Push( item );
  |           pthread_rwlock_unlock(rw_ACCESS);   // unlock
  |           break;
  |         case 6: case 7: case 8:               // Remove item 3/10
  |           pthread_rwlock_wrlock(rw_ACCESS);   // write lock - block on W|R
  +           if (S->StackEmpty( ))
  |             cout << pthread_self( ) << " Underflow" << endl;
  |           else {
  |             cout << pthread_self( ) << " Pop    : ";
  |             item = S->Pop( );
 80             cout << pthread_self( ) << " " << item << endl;
  |           }
  |           pthread_rwlock_unlock(rw_ACCESS);   // unlock
  |           break;
  |         case 9:                               // Sleep 1/10
  +           cout << pthread_self( ) << " Sleep  :" << endl;
  |           sleep( my_rand(1,3));
  |           break;
  |         case 10:                              // Quit 1/10
  |           cout << pthread_self( ) << " Quit   :" << endl;
 90           return NULL;
  |         }
  |       } while ( 1 );
  |     }

In the do_stack loop, a random value from 1 to 10 is generated. This value determines the thread's activity. Given a good distribution of random values, approximately 20 percent of the time the thread executing this code should display the current list. About 30 percent of the time the thread should generate a new character and push the character onto the list. Roughly 30 percent of the time the thread should pop a character off the top of the list (if it is not empty). The remaining 20 percent of the time the thread will either sleep a few seconds or quit its activity.

The activities other than sleep or quit are bracketed with the appropriate read/write lock calls. As the display (reading) of the list can be done by multiple threads, a call to pthread_rwlock_rdlock is made before the display to obtain a read lock, and a call to pthread_rwlock_unlock is made once the display is completed to release the read lock. The Push and Pop methods, which cause the list contents to be modified, are bracketed with calls to pthread_rwlock_wrlock and pthread_rwlock_unlock. Thus, only one thread at a time is allowed to modify the linked list. It is important to note that all critical code is bracketed with the read/write locks. For example, if we were to move the check for an empty stack, found in the section of code that calls Pop (line 75), outside the bracketed area, we would on occasion find our program failing due to race conditions. This could occur when we have one item in our list. Say a thread calls StackEmpty, finds the stack is not empty, and attempts to Pop (remove) the item. At the same time, a second thread (also finding the list to be not empty) attempts to remove an item. While both consider the list to be not empty, one of the threads will draw an error, as the competing thread will have beaten it to the punch.

Each line of output identifies the underlying thread that generated it. The code for main is found in Program 11.7C.

Example 11.7C. Code for function main for read/write lock example.

File : p11.7C.cxx
  |     #include "local_stack.h"
  |                                                    // global by placement
  |     pthread_rwlock_t *rw_ACCESS=new pthread_rwlock_t;
  |     Stack            *S=new Stack;
  +     void *do_stack( void * );
  |     int
  |     main( int argc, char *argv[] ){
  |       int  i, n;
  |       pthread_t worker[MAX];
 10       pthread_rwlock_init(rw_ACCESS, NULL);
  |       if ( argc != 2) {
  |         cerr << "Usage: " << *argv << " n_workers" << endl;
  |         return 1;
  |       }
  +       n = atoi(argv[1]) < MAX ? atoi(argv[1]) : MAX;
  |       for( i=0; i < n; ++i )                         // create threads
  |         pthread_create(&worker[i],NULL,do_stack,(void *) NULL);
  |       for( i=0; i < n; ++i )                         // wait
  |         pthread_join(worker[i], (void **) NULL);
 20       return 0;
  |     }

Figure 11.14 shows a portion of the output generated from a run of Program 11.7 when four threads are competing for access to the list. As can be seen, multiple threads can display the list, but only one thread at a time can modify the list.

Example 11.14. A compilation and run of Program 11.7 with four competing threads.

linux$ g++ p11.7B.cxx p11.7C.cxx -o p11.7 -lpthread
linux$ p11.7  4
2051 Push   : A
2051 Sleep  :
3076 Pop    : 3076 A
3076 Push   : L        <-- 1
3076 Push   : L          |
3076 Push   : L          |
3076 Push   : K          |
3076 Push   : K        <-- 1
3076 Push   : F
3076 Quit   :
1026 Pop    : 1026 F
1026 Quit   :
4101 Push   : J
4101 Pop    : 4101 J
4101 Display:        <-- 2
        4101 [head]    |
        4101 [K]       |
2051 Display:          |
        2051 [head]        <-- 2
        2051 [K]
        4101 [K]
        2051 [K]
        4101 [L]
        2051 [L]
        4101 [L]
        2051 [L]
        4101 [L]
        2051 [L]
        4101 [tail]
4101 Display:
        4101 [head]
        4101 [K]
        2051 [tail]        <-- 3
        4101 [K]             |
        4101 [L]             |
        4101 [L]        <-- 3
        4101 [L]
        4101 [tail]
. . .
  • (1)A series of letters are pushed onto the list by several different threads.

  • (2)Thread 4101 begins to display the list. Shortly thereafter, thread 2051 displays the list as well. This is perfectly acceptable, as more than one thread can access the list concurrently for reading. Their output is interspersed on the screen.

  • (3)Eventually, each thread finishes its display of the list.

Multithread Semaphores

The activities of threads may also be coordinated with semaphores. A semaphore is similar in function to a mutex. However, a semaphore can act as either a binary entity similar to a mutex or as a counting entity. Counting semaphores can be used to manage multiple resources. As they are somewhat more complex, semaphores are more system-intensive than mutexes. Semaphore concepts and operations were presented in some detail in Chapter 7. However, the Chapter 7 semaphore operations are System V-based and are not multithread safe. POSIX 1003.1b defines semaphores that can be used with threads. As these semaphore operations were written prior to the creation of the POSIX thread library calls, their interface has a slightly different flavor. Most notably, these operations do not begin with the sequence pthread_ and do set errno when they fail. All programs that contain POSIX semaphore operations must include <semaphore.h>.

Conceptually, a POSIX semaphore is a nonnegative integer value that can be used to synchronize thread activities. Increment (post) and decrement (wait) operations are performed on the semaphore. A decrement issued on a 0 valued semaphore will cause the issuing thread to block until another thread increments the semaphore. Unlike a mutex, for which there is a sense of ownership, a semaphore does not need to be acquired (decremented) and released (incremented) by the same thread.

A semaphore must be initialized before it is used. The library call sem_init, shown in Table 11.22, is used to initialize a semaphore.

Table 11.22. The sem_init Library Function.

Include File(s):  <semaphore.h>
Manual Section:   3
Summary:          int sem_init( sem_t *sem, int pshared, unsigned int value );
Return:           Success: 0    Failure: −1    Sets errno: Yes

The sem_t data type referenced by sem is declared in <semaphore.h> as

/* System specific semaphore definition. */
typedef struct {
  struct _pthread_fastlock __sem_lock;
  int __sem_value;
  _pthread_descr __sem_waiting;
} sem_t;

The sem argument references the semaphore to be initialized. The pshared argument is used to indicate if the semaphore will be shared between processes. A value of 0 indicates the semaphore is not to be shared between processes, while a nonzero value indicates the semaphore is shareable. If the semaphore is to be shared between processes, the programmer is responsible for mapping the semaphore to a shared memory location or to a memory-mapped file. At present, the LinuxThreads implementation of POSIX threads does not support process-shared semaphores. Given this limitation, this argument should always be set to 0. The argument value is a nonnegative integer that specifies the starting value of the semaphore. A successful sem_init call returns a 0 and sets the referenced semaphore to the indicated initial value. If the call fails, it returns a value of −1 and sets errno to indicate the source of the error (see Table 11.23). In a multithreaded setting a semaphore should be initialized only once.

Table 11.23. sem_init Error Messages.

#    Constant   perror Message             Explanation
22   EINVAL     Invalid argument           The value argument exceeds the value of SEM_VALUE_MAX.
38   ENOSYS     Function not implemented   The pshared argument is not 0.
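
As a sketch of how the initial value passed to sem_init can model several identical resources, the following fragment starts a semaphore at 2 and lets five threads compete for the two slots. The names SLOTS, WORKERS, worker, and peak_users are hypothetical, and usleep merely stands in for real work. The mutex protects the bookkeeping counters only; the semaphore is what limits entry.

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <cassert>

const int SLOTS = 2, WORKERS = 5;          // hypothetical sizes
static sem_t           slots;              // counting semaphore
static pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;
static int             in_use = 0, peak = 0;

static void *worker(void *) {
  sem_wait(&slots);                        // take a slot, or block if none left
  pthread_mutex_lock(&count_lock);
  if (++in_use > peak) peak = in_use;      // record the high-water mark
  assert(in_use <= SLOTS);                 // the semaphore enforces this
  pthread_mutex_unlock(&count_lock);
  usleep(10000);                           // pretend to use the resource
  pthread_mutex_lock(&count_lock);
  --in_use;
  pthread_mutex_unlock(&count_lock);
  sem_post(&slots);                        // give the slot back
  return NULL;
}

// Returns the largest number of threads that held a slot at the same time.
int peak_users() {
  pthread_t tid[WORKERS];
  int started = 0;
  in_use = peak = 0;
  if (sem_init(&slots, 0, SLOTS) != 0) return -1;   // pshared 0, value SLOTS
  while (started < WORKERS &&
         pthread_create(&tid[started], NULL, worker, NULL) == 0) ++started;
  for (int i = 0; i < started; ++i) pthread_join(tid[i], NULL);
  sem_destroy(&slots);
  return peak;
}
```

The value returned by peak_users depends on scheduling, but it should never exceed SLOTS.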

Once created, a semaphore can be locked using the library call sem_wait or sem_trywait (Table 11.24). Keep in mind that, underneath, locking a semaphore is an atomic decrement operation against the value of the semaphore.

Both calls require a reference to a semaphore of type sem_t. If the referenced semaphore is nonzero, the call decrements (by one) the referenced semaphore. If the semaphore is 0, the sem_wait call blocks until the semaphore becomes greater than zero. If the semaphore is 0, the sem_trywait call does not block and returns immediately. Both calls return a 0 if they are successful; otherwise, sem_trywait returns a −1 and sets errno to the value shown in Table 11.25. Unsuccessful calls do not change the state of the semaphore.

Table 11.24. The sem_wait and sem_trywait Library Functions.

Include File(s):  <semaphore.h>
Manual Section:   3
Summary:          int sem_wait( sem_t * sem );
                  int sem_trywait( sem_t * sem );
Return:           Success: 0    Failure: −1    Sets errno: sem_trywait only

Table 11.25. sem_wait and sem_trywait Error Message.

#    Constant   perror Message                     Explanation
11   EAGAIN     Resource temporarily unavailable   The sem_trywait found the semaphore to be 0.

Semaphores are unlocked (i.e., incremented) using the sem_post library call (Table 11.26).

Table 11.26. The sem_post Library Function.

Include File(s):  <semaphore.h>
Manual Section:   3
Summary:          int sem_post(sem_t *sem);
Return:           Success: 0    Failure: −1    Sets errno: Yes

The sem_post call unlocks the referenced semaphore. If the semaphore was previously at 0 and there are other threads or LWPs blocking on the semaphore, they will be notified using the current scheduling policy (most often, the highest priority, longest waiting thread or LWP is scheduled next). If the semaphore was not previously at 0, its value is incremented by one. If successful, sem_post returns a value of 0; otherwise, it returns a value of −1 and sets errno to the value in Table 11.27 to indicate the error condition. The sem_post call is asynchronous-signal-safe (able to be called from within a signal handler).

Table 11.27. sem_post Error Message.

#    Constant   perror Message                  Explanation
34   ERANGE     Numerical result out of range   If the semaphore were incremented, its value would exceed SEM_VALUE_MAX.

Chapter 7 provides a number of semaphore examples that can be readily adapted to a multithreaded setting by changing the nonmultithreaded semaphore operations to their POSIX multithread equivalents. Rather than duplicate these previous examples, in Program 11.8 I have used a semaphore to coordinate the activity of cooperating threads to determine when the threads have carried on their activities in a specific sequence.

Program 11.8. Using POSIX Semaphores with Threads

File : p11.8.cxx
/*
       Using semaphores with threads
*/
#define _GNU_SOURCE
#define _REENTRANT
#include <pthread.h>
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <sys/time.h>
#include <semaphore.h>                        // for POSIX semaphores
using namespace std;
const int BUF_SIZE= 15;
const int MAX     = 4;
int   world_state = 1;
sem_t check_state;
typedef struct {
        char word[BUF_SIZE];
        int  my_state;
} Info;
void *speaker( Info * );
//   Generate a random # within given range
int
my_rand(int start, int range){
  struct timeval t;
  gettimeofday(&t, (struct timezone *)NULL);
  return (int)(start+((float)range * rand_r((unsigned *)&t.tv_usec))
               / (RAND_MAX+1.0));
}
int
main( int argc, char *argv[] ){
  pthread_t t_ID[MAX];
  Info     words[MAX];
  if ( argc != MAX+1 ) {
     cerr << "Usage " << *argv << " word1 ... word" << MAX << endl;
     return 1;
  }
  sem_init( &check_state, 0, 1 );      // start semaphore at 1
  for (int i = 0; i < MAX; ++i){
    strncpy( words[i].word, argv[i+1], BUF_SIZE-1 );  // do not overrun word
    words[i].word[BUF_SIZE-1] = '\0';                 // always terminate
    words[i].my_state = i+1;
    if ( (pthread_create( &t_ID[i],NULL,
       ( void *(*)(void *) )speaker,(void *) &words[i])) != 0 ) {
     cerr << "Thread create speaker failed" << endl;  // errno is not set
     return 2;
     }
  }
  pthread_join( t_ID[MAX-1], (void **) NULL);
  cout << "!" << endl;
  return 0;
}
/*
   Display the passed in word
*/
void  *
speaker( Info * s ){
  while( true ) {
    sleep(my_rand(1,3));
    sem_wait( &check_state );        // obtain & decrement else block
    cout << s->word << " ";
    cout.flush( );
    if ( s->my_state == world_state ) {
      ++world_state;
      if ( world_state > MAX ) break;  // done: semaphore stays locked
    } else {
      cout << endl;
      world_state = 1;
    }
    sem_post( &check_state );          // release & increment
  }
  return( (void *) NULL );
}

In Program 11.8 the file <semaphore.h> is included, as POSIX semaphores are used. A global integer, world_state (declared before main), is allocated and set to 1. This variable is used by the cooperating threads to determine when processing should stop (i.e., when this variable exceeds the value MAX). Access to the world_state variable is controlled by the semaphore check_state. A typedef is used to create a user-defined type called Info. Items of this type will have storage for 15 characters and an integer value. The character array will hold a short sequence of characters (a word), and the integer, a value indicating the current state of output. In main two arrays are allocated. The first, t_ID, is used to store the thread IDs. The second array, called words, stores the word and state information that is passed to each thread. The sem_init call is used to set the allocated semaphore to 1. As the second argument of this call is 0, only threads within the same process space can share the semaphore. A loop is used to create additional threads and pass each a value obtained from the command line (stored in the elements of the words array) and its state value. Each thread is directed to execute the user-defined function speaker. The thread in main then waits (by way of a pthread_join) for the last thread generated to exit. When the pthread_join is completed, the program concludes.

The speaker function loops continuously. It sleeps for a random number of seconds (1 to 3), and then attempts to lock (obtain) the semaphore. Once it is successful, the thread displays the word it was passed. It then checks its state value against the current state value for the process (stored as world_state). If its state value is equivalent to the current value of world_state, the world_state is incremented, as progress is being made toward the printing of the words in the proper (command line) order. If the thread's state value is not equivalent to the world_state value, out-of-order processing has occurred, and the world_state variable is reset to 1 to indicate a restarting of the sequence. Once the evaluation and manipulation of world_state is complete, the semaphore is released.

A run of Program 11.8 and its output are shown in Figure 11.15.

Figure 11.15. A run of Program 11.8.

linux$ p11.8 once upon a time
upon
a
once time        <-- 1
upon
a
once time
upon
once time
a
time
once upon a once
time
a
a
once upon a time !        <-- 2
  • (1) Each time progress is no longer being made, the output sequence restarts.

  • (2) All command-line arguments are displayed in order; processing stops.

As can be seen, the threads do not finish their activity until they successfully display the sequence in the same order that it was passed on the command line.

The function sem_getvalue, which retrieves the current value of the semaphore, is also supported (Table 11.28).

Table 11.28. The sem_getvalue Library Function.

Include File(s):  <semaphore.h>
Manual Section:   3
Summary:          int sem_getvalue(sem_t * sem, int * sval);
Return:           Success: 0    Failure: −1    Sets errno: Yes

This function stores, in the location referenced by sval, the current value of the semaphore referenced by sem. The returned value should not be used by the program to make decisions, as it is transient and its use in a decision construct could result in race conditions.
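
A minimal sketch of sem_getvalue follows. It is deliberately single-threaded, so the value cannot change between the call and its use; the function name getvalue_demo is hypothetical. In a multithreaded program the same call would be suitable for logging or debugging output only.

```cpp
#include <semaphore.h>
#include <cassert>

// Reads the semaphore value before and after one decrement.
int getvalue_demo() {
  sem_t s;
  int before = -1, after = -1;
  if (sem_init(&s, 0, 3) != 0) return -1;
  sem_getvalue(&s, &before);        // 3, the initial value
  sem_wait(&s);                     // does not block: value is above 0
  sem_getvalue(&s, &after);         // 2, one less than before
  assert(after == before - 1);
  sem_destroy(&s);
  return before * 10 + after;       // packs both readings: 32
}
```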

Thread-Specific Data

Sometimes we need to be able to maintain data that is specific to a given thread but is referenced in a section of code that will be executed by multiple threads. Data of this nature, stored in a memory block private to the thread, is usually termed thread-specific data, or TSD. This data is referenced using a thread-specific pointer and the associated key. The keys for TSD are global to all the threads within the process. To make use of TSD, a thread must allocate and bind (associate) the key with the data. The library call pthread_key_create (Table 11.29) is used to allocate a new key. Only one thread should issue the create call for a TSD item.

Table 11.29. The pthread_key_create Library Function.

Include File(s):  <pthread.h>
Manual Section:   3
Summary:          int pthread_key_create( pthread_key_t *key,
                                          void (*destr_function)(void *) );
Return:           Success: 0    Failure: nonzero error number    Sets errno: No

As its first argument, the pthread_key_create library call is passed a reference, *key, to a pthread_key_t data type. The pthread_key_t data type is typedefed as an unsigned int. If the call to pthread_key_create is successful, the *key argument references the newly allocated key. There is a system limit on the number of keys per process. The limit is designated by the defined constant PTHREAD_KEYS_MAX.

The second argument for pthread_key_create is a reference to a destructor function. If this argument is non-NULL, the referenced function is called and passed the associated TSD when the thread exits (i.e., calls pthread_exit) or is cancelled. If TSD is allocated within the destructor function, the system attempts to repeat destructor calls until all keys are NULL. As might be expected, this behavior could lead to some interesting recursive situations. Some thread implementations, such as LinuxThreads, limit the number of calls to resolve the removal of TSD; some do not. In LinuxThreads these calls are limited by the defined constant PTHREAD_DESTRUCTOR_ITERATIONS. If the call to pthread_key_create is successful, it returns a value of 0; otherwise, it returns the value EAGAIN (11) if the limit for the number of keys has been exceeded or ENOMEM (12) if insufficient memory is available to allocate the key.
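
The destructor mechanism can be sketched as follows. This is an illustration rather than one of the chapter's programs: the names slot_key, release, body, and destructor_demo are hypothetical. Each thread binds heap storage to the key, and the destructor frees that storage and counts how many times it was called.

```cpp
#include <pthread.h>
#include <cassert>

static pthread_key_t   slot_key;                 // one key, shared by all threads
static pthread_mutex_t tally_lock = PTHREAD_MUTEX_INITIALIZER;
static int             released = 0;             // destructor call count

// Destructor: called at thread exit with that thread's non-NULL TSD pointer.
static void release(void *p) {
  assert(p != NULL);                             // never invoked for NULL values
  delete static_cast<int *>(p);                  // free the thread's storage
  pthread_mutex_lock(&tally_lock);
  ++released;
  pthread_mutex_unlock(&tally_lock);
}

static void *body(void *) {
  int *mine = new int(7);                        // storage private to this thread
  if (pthread_setspecific(slot_key, mine) != 0)
    delete mine;                                 // not bound, so clean up here
  return NULL;                                   // returning acts like pthread_exit
}

// Returns the number of destructor calls made for three threads.
int destructor_demo() {
  const int N = 3;
  pthread_t tid[N];
  int started = 0;
  released = 0;
  if (pthread_key_create(&slot_key, release) != 0) return -1;
  while (started < N &&
         pthread_create(&tid[started], NULL, body, NULL) == 0) ++started;
  for (int i = 0; i < started; ++i) pthread_join(tid[i], NULL);
  pthread_key_delete(slot_key);
  return released;
}
```

If all three threads start, destructor_demo should return 3, one destructor call per thread that bound a value.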

The pthread_key_delete library function (Table 11.30) is used to remove the storage associated with a specific key (versus the data associated with the key).

Table 11.30. The pthread_key_delete Library Function.

Include File(s):  <pthread.h>
Manual Section:   3
Summary:          int pthread_key_delete(pthread_key_t key);
Return:           Success: 0    Failure: nonzero error number    Sets errno: No

A valid TSD key is pthread_key_delete's only argument. If the key value is non-NULL, it is removed. While it would seem reasonable, the pthread_key_delete function does not automatically call the key's associated destructor function. If the function is passed an invalid key, it returns the value EINVAL (22); otherwise, it returns a 0, indicating successful removal of the key.

TSD is manipulated with the pthread_setspecific and pthread_getspecific library calls (Table 11.31).

Both functions accept a globally allocated key argument. The key, created by a call to pthread_key_create, is used by the pthread_setspecific library function to store the data referenced by the pointer argument. By definition, each calling thread can bind a different data value with the key. Most often, pointer references memory that has been dynamically allocated by the calling thread. Once bound, the associated values are individually maintained on a per-thread basis. The pthread_setspecific function will fail and return the value ENOMEM (12) if there is insufficient memory to associate a value with a key. If successful, pthread_setspecific returns a 0. The pthread_getspecific library function uses the key argument to retrieve (return) a reference to the TSD. If the key is not bound, a NULL (0) is returned.
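
The per-thread nature of the binding can be checked with a short sketch. The names id_key, check, and isolation_demo are hypothetical. Four threads bind different pointers to the same key; each then confirms that it reads back its own pointer, while the main thread, which bound nothing, reads NULL.

```cpp
#include <pthread.h>
#include <sched.h>
#include <cassert>

static pthread_key_t id_key;                     // same key used by every thread

// Binds the argument to the key, yields, then reads the binding back.
static void *check(void *arg) {
  assert(pthread_getspecific(id_key) == NULL);   // unbound in a new thread
  pthread_setspecific(id_key, arg);
  sched_yield();                                 // let other threads bind theirs
  void *back = pthread_getspecific(id_key);
  return (back == arg) ? arg : NULL;             // report what this thread saw
}

// Returns how many threads read back exactly the value they stored.
int isolation_demo() {
  const int N = 4;
  pthread_t tid[N];
  int vals[N], good = 0, started = 0;
  if (pthread_key_create(&id_key, NULL) != 0) return -1;
  for (int i = 0; i < N; ++i) {
    vals[i] = (i + 1) * 10;
    if (pthread_create(&tid[i], NULL, check, &vals[i]) != 0) break;
    ++started;
  }
  for (int i = 0; i < started; ++i) {
    void *res = NULL;
    pthread_join(tid[i], &res);
    if (res == &vals[i]) ++good;
  }
  bool unbound = (pthread_getspecific(id_key) == NULL);  // main bound nothing
  pthread_key_delete(id_key);
  return unbound ? good : -1;
}
```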

Table 11.31. The TSD manipulation Library Functions.

Include File(s):  <pthread.h>
Manual Section:   3
Summary:          int pthread_setspecific(pthread_key_t key, const void *pointer);
                  void *pthread_getspecific(pthread_key_t key);
Return:           pthread_setspecific   Success: 0                    Failure: nonzero error number   Sets errno: No
                  pthread_getspecific   Success: reference to the TSD  Failure: NULL                   Sets errno: No

Program 11.9 demonstrates one approach for using TSD.

Program 11.9. Using TSD.

File : p11.9.cxx
  |     /*
  |       Using thread specific data
  |     */
  |     #define _GNU_SOURCE
  +     #define _REENTRANT
  |     #include <iostream>
  |     #include <pthread.h>
  |     #include <stdlib.h>
  |     #include <unistd.h>
 10     using namespace std;
  |     const int MAX=20;
  |     void *TSD( int ),                             // manipulates TSD
  |           free_me( void *  );                     // destructor
  |     static pthread_key_t  key;                    // global TSD key
  +     int
  |     main( int argc, char *argv[] ) {
  |       pthread_t thr_id[MAX];
  |       int inc;
  |       if ( argc  < 2 || atoi(argv[1]) > MAX){
 20         cerr << *argv << " num_threads" << endl;
  |         return 1;
  |       }
  |                                                   // generate key (once)
  |       pthread_key_create(&key, (void(*)(void*))free_me);
  +       for(int i=0; i < atoi(argv[1]); ++i){
  |        inc = i+1;                                 // can't cast an expr
  |        if (pthread_create(&thr_id[i],NULL,(void *(*)(void *))TSD,
           (void *)inc) > 0){
  |          cerr << "pthread_create failure" << endl;
  |          return 2;
 30        }
  |       }
  |                                                   // wait for all threads
  |       for(int i=0; i < atoi(argv[1]); ++i)        // one join per thread
  |         pthread_join(thr_id[i], NULL);
  +       sleep( 1 );
  |       return 0;
  |     }
  |     /*
  |        TSD routine - passed a value that it will keep private
 40     */
  |     void *
  |     TSD( int private_stuff ){
  |       static pthread_mutex_t  the_lock = PTHREAD_MUTEX_INITIALIZER;
  |       void   *tsd = NULL;
  +       tsd = pthread_getspecific(key);             // initially NULL
  |       if (tsd == NULL) {
  |         tsd = new int;                            // create storage
  |         *(int *)tsd = private_stuff;              // store the private value
  |         pthread_setspecific(key, tsd);
 50         cout << pthread_self( ) << " TSD starts at 	 "
  |              <<  *(int *)pthread_getspecific(key) << endl;
  |       }
  |       for( int i=0; i < 3; ++i ){
  |         sleep(1);
  +         pthread_mutex_lock(&the_lock);            // enter critical region
  |         cout << pthread_self( ) << " incrementing" << endl;
  |         *(int *)pthread_getspecific(key) *= 2;    // double private value
  |         cout << pthread_self( ) << " yielding" << endl;
  |         pthread_mutex_unlock(&the_lock);          // exit critical region
 60         sched_yield();                            // notify scheduler
  |       }
  |       cout << pthread_self( ) << " TSD finishes at 	 "
  |            << *(int *)pthread_getspecific(key) << endl;
  |       cout.flush( );
  +       pthread_exit(NULL);
  |       return NULL;
  |     }
  |     /*
  |           Dummy destructor routine
 70     */
  |     void
  |     free_me( void *value ){
  |       cout << pthread_self( ) << " free reference to 	 "
  |            << *(int *) value  << endl;
  +     }

The prototype for the user-defined function TSD that will manipulate the TSD and the user-defined function free_me that will act as a destructor are placed prior to main. In addition, the key that will be used to access TSD is allocated prior to main. Its placement assures it will be global in scope. In main, a call is made to pthread_key_create (line 24), and the addresses of the key and the destructor function are passed. A cast is used to keep the compiler from complaining about argument type mismatch. In the first for loop in main, the value of the loop counter plus one is assigned to the variable inc.[20] As each thread is created, it is passed a reference to the user-defined TSD function and the value stored in inc. Again, the cast operator is used to keep the compiler from flagging what it would consider to be mismatched arguments. Once the threads have been generated, main uses a second for loop with a call to pthread_join to wait for the threads to finish their activities. A call to sleep follows the pthread_join loop to allow the final terminating thread sufficient time to flush its output buffer.

In the user-defined function TSD a mutex, the_lock, and a void pointer, tsd, are allocated. The storage class for the_lock is static, while the storage class for tsd is auto (the default). A call is made to pthread_getspecific, and the value returned is assigned to tsd. If the key passed to pthread_getspecific has not been associated with a TSD value, the pthread_getspecific call will return a NULL value. If the returned value is NULL (which it should be upon initial entry), a storage location is allocated using the new operator (line 47). The TSD is associated with the key using the pthread_setspecific library function. Next, a for loop is used to simulate activity. Within the loop, a mutex called the_lock is used to bracket a section of code where we would like to keep our display messages from being interleaved with those of other threads that will be executing the same code. In this section of code the TSD is multiplied by 2. A cast is used to coerce the void * to an int *. A call to library function sched_yield causes the current thread to yield its execution to another thread with the same or greater priority. At different points within the program, informational messages tagged with the thread ID are displayed.

A run of Program 11.9 is shown in Figure 11.16.

Figure 11.16. A run of Program 11.9 with four competing threads.

1026 TSD starts at       1        <-- 1
2051 TSD starts at       2
3076 TSD starts at       3
4101 TSD starts at       4
1026 incrementing
1026 yielding
4101 incrementing
4101 yielding
3076 incrementing
3076 yielding
2051 incrementing        <-- 2
2051 yielding
1026 incrementing
1026 yielding
4101 incrementing
4101 yielding
3076 incrementing
3076 yielding
2051 incrementing
2051 yielding
1026 incrementing
1026 yielding
1026 TSD finishes at     8        <-- 3
1026 free reference to   8
4101 incrementing
4101 yielding
3076 incrementing
3076 yielding
2051 incrementing
2051 yielding
4101 TSD finishes at     32
4101 free reference to   32
3076 TSD finishes at     24
3076 free reference to   24
2051 TSD finishes at     16
2051 free reference to   16
  • (1) Each thread has a different starting value for its data.

  • (2) Each thread accesses its private data, doubles it, and then yields. Each thread does this three times.

  • (3) As each thread finishes, it releases its reference to the TSD. The order of processing is determined by the scheduler and may vary across multiple invocations.

As can be seen in Figure 11.16, each thread maintains its own TSD value. The call to the destructor function free_me is made as each thread finishes its execution.

In Program 11.9 we allocated the key for the TSD prior to main and created the key in main. We can, if we are careful, allocate and create the key within the code segment shared by multiple threads. For example, in Program 11.9 we can remove (comment out) the statement prior to main that allocates the key (line 14)

// static pthread_key_t  key;

and the pthread_key_create statement (line 24) within main

// pthread_key_create(&key, (void(*)(void*))free_me);

and add the following statements (lines 45 to 53) to the user-defined TSD function.

  |     /*
  |        TSD routine - passed a value that it will keep private
 40     */
  |     void *
  |     TSD( int private_stuff ){
  |       static pthread_mutex_t  the_lock = PTHREAD_MUTEX_INITIALIZER;
  |       void   *tsd = NULL;
  +                                                      // do once
  |       static pthread_key_t      key;
  |       static int                done_once = 0;
  |       if ( !done_once ) {
  |         pthread_mutex_lock(&the_lock);               // bracket code
 50         if ( !done_once++ )                          // re-check & inc
  |           pthread_key_create(&key, (void(*)(void*))free_me);
  |         pthread_mutex_unlock(&the_lock);             // end bracket
  |       }
  |       tsd = pthread_getspecific(key);                // initially NULL

In this second approach the storage class qualifier static is used in the user-defined TSD function when allocating the key. An integer flag variable called done_once, specified as static, is also allocated and initially set to 0 (which with the static qualifier should be its default value anyway). The mutex is used to bracket the inner check of the content of the done_once variable. Only one thread will find the done_once variable at 0, increment the variable, and create the key. After the done_once variable is incremented, the outer if will prevent further access to this section of code. This is by no means the only way in which this can be done. Another approach is to use the pthread_once library function.[21]
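
A sketch of the pthread_once alternative is shown here. It is not the author's code: the names once_key, key_once, make_key, user, and once_demo are hypothetical. Every thread calls pthread_once with the same control variable, and the library guarantees that the initialization routine runs exactly once, so neither a flag nor a mutex is needed.

```cpp
#include <pthread.h>
#include <cassert>

static pthread_key_t  once_key;
static pthread_once_t key_once  = PTHREAD_ONCE_INIT;  // control variable
static int            creations = 0;                  // times make_key has run

// Initialization routine: executed by whichever thread gets there first.
static void make_key() {
  pthread_key_create(&once_key, NULL);
  ++creations;
}

static void *user(void *) {
  pthread_once(&key_once, make_key);             // all threads call; one runs it
  assert(creations == 1);                        // the key exists on return
  assert(pthread_getspecific(once_key) == NULL); // nothing bound yet
  return NULL;
}

// Returns the number of times the key was created by five threads.
int once_demo() {
  const int N = 5;
  pthread_t tid[N];
  int started = 0;
  while (started < N &&
         pthread_create(&tid[started], NULL, user, NULL) == 0) ++started;
  for (int i = 0; i < started; ++i) pthread_join(tid[i], NULL);
  return creations;
}
```

Compared with the done_once flag, this version avoids reading a shared variable outside the mutex, because pthread_once performs the required synchronization itself.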

This section ends with a final program example, Program 11.10, that uses TSD and incorporates a number of the previously discussed thread-related functions. In this example the program is passed the name of a file to store its output and the number of threads (1 to 10) to generate. Each thread, represented by a different uppercase letter of the alphabet, is responsible for moving about within a common two-dimensional grid. As the thread moves, its path is displayed on the screen. The starting point for a thread is marked by an @ symbol. When the thread moves to a new location, its previous location is changed to lowercase. As written, a thread can move in one of four directions: left, right, up, or down. Moves that run off the grid are wrapped, in a modular fashion, to the next valid row-column location. If a thread moves to a location that is boxed in (i.e., a location where its neighbors on the left, right, top, and bottom are occupied), the thread expires. The program terminates when all threads have been boxed in.

To make the output a bit more interesting, the current state of the grid is displayed using basic vt100 escape codes.[22] The vt100 escape codes are incorporated in the user-defined functions LOCATE (used to place the cursor at a specific screen coordinate) and CLS (used to clear the screen). The display is updated dynamically, and in vt100 supported settings, produces a rough appearance of animation. The header information for Program 11.10 is placed in the file local_TSD.h. The contents of local_TSD.h are shown in Figure 11.17.

Figure 11.17. The local_TSD.h file for Program 11.10.

File : local_TSD.h
  |     /* local header file for example p11.10.cxx
  |      */
  |     #define _REENTRANT
  |     #define _GNU_SOURCE
  +     #include <pthread.h>
  |     #include <iostream>
  |     #include <cstdio>
  |     #include <cstdlib>
  |     #include <cstring>
 10     #include <fcntl.h>
  |     #include <unistd.h>
  |     #include <sys/types.h>
  |     #include <sys/time.h>
  |     #include <sys/stat.h>
  +     #include <sys/mman.h>
  |     using namespace std;
  |     const int  ROW=20, COL=42, MAX=10;
  |     const char ESC='\033';
  |     inline
 20     void LOCATE(int row, int col){
  |       cout << ESC << "[" << row << ";" << col << "H";
  |     }
  |     inline
  |     void CLS( ){
  +       LOCATE(1, 1);
  |       cout << ESC << "[2J";
  |     }
  |     int
  |     my_rand(int start, int range){
 30       struct timeval t;
  |       gettimeofday(&t, (struct timezone *)NULL);
  |       return (int)(start+((float)range * rand_r((unsigned *)&t.tv_usec))
  |                    / (RAND_MAX+1.0));
  |     }
  +     typedef struct {
  |        int left, right, top, bot;
  |     }  DIRECTION;
  |     static char     guys[] = "ABCDEFGHIJ";
  |     int             n_dead = 0;
 40     char            *the_file;
  |     pthread_mutex_t scrn_lock = PTHREAD_MUTEX_INITIALIZER;
  |                                                     // function prototypes
  |     void            display_screen(char *);
  |     bool            boxed(char *, int, int);
  +     void            move(char *, int *, int *, char);
  |     void            neighbors( int , int , DIRECTION * );
  |     void            *play( void * );

Along with the vt100 functions, the local_TSD.h file contains a type definition for a DIRECTION structure that will hold the indices of neighbor locations. Within the program the two-dimensional grid is treated as a vector. Five user-defined functions are prototyped. The display_screen function, passed a reference to the grid, displays the current contents of the grid at a set location on the screen. The boxed function, passed a reference to the grid and a row and column location, returns true if the neighbors of the row-column location are all occupied; otherwise, it returns false. The move function finds and moves to a new location. The move function is passed a reference to the grid and the row-column location, as well as a copy of the current letter associated with a given thread. Upon completion of move, the grid and the row-column location are updated. The neighbors function is passed a copy of a row-column location and returns a DIRECTION structure containing the indices of the neighbor locations. The play function, passed to each thread, serves as a driver routine for the activities of the thread.
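
To make the idea of treating the grid as a vector concrete, the sketch below shows one way neighbor indices could be computed with modular wraparound. It is an assumption, not the program's neighbors function: the name wrapped_neighbors is hypothetical, and it assumes that a move off one edge reappears on the opposite edge of the same row or column. A cell at (row, col) maps to the vector offset row * COL + col.

```cpp
#include <cassert>

const int ROW = 20, COL = 42;                 // grid size from local_TSD.h
typedef struct {
   int left, right, top, bot;                 // vector offsets of the neighbors
}  DIRECTION;

// Fills d with the offsets of the four neighbors of (row, col),
// wrapping around the grid edges (assumed wrap rule, see above).
void wrapped_neighbors(int row, int col, DIRECTION *d) {
  assert(row >= 0 && row < ROW && col >= 0 && col < COL);
  d->left  = row * COL + (col + COL - 1) % COL;   // same row, previous column
  d->right = row * COL + (col + 1) % COL;         // same row, next column
  d->top   = ((row + ROW - 1) % ROW) * COL + col; // previous row, same column
  d->bot   = ((row + 1) % ROW) * COL + col;       // next row, same column
}
```

For the corner cell (0, 0) this yields left 41, right 1, top 798, and bot 42. Adding COL or ROW before taking the remainder keeps the operand nonnegative, which avoids the negative results that the % operator can produce in C++.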

Program 11.10. Animating threads.

File : p11.10.cxx
  |     /*
  |            p11.10.cxx: Thread animation
  |            Compile   : g++ -o p11.10  p11.10.cxx  -lpthread
  |     */
  +     #include "local_TSD.h"
  |     int
  |     main(int argc, char *argv[]) {
  |       char        the_screen[ROW][COL];
  |       pthread_t   thread_id[MAX];
 10       int         fd0, n_threads;
  |       struct stat buf;
  |       if (argc != 3) {                               // check cmd line
  |         cerr << "Usage " << *argv <<  " file_name #_threads" << endl;
  |         return 1;
  +       }
  |       if ((n_threads = atoi(argv[2])) > MAX ) {
  |         cerr << "# threads must be < " <<  MAX+1 << endl;
  |         return 2;
  |       }
 20       setbuf(stdout, NULL);
  |       guys[n_threads] = '\0';
  |       memset(the_screen, ' ', sizeof(the_screen));   // clear screen array
  |                                                      // open file for mapping
  |       if ((fd0 = open(argv[1], O_CREAT | O_RDWR, 0666)) < 0) {
  +         cerr << "Open error on file " << argv[1] << endl;
  |         return 3;
  |       }                                              // write screen to file
  |       write(fd0, the_screen, sizeof(the_screen));
  |       if (fstat(fd0, &buf) < 0) {                    // stats on mapped file
 30         cerr << "fstat error on file " << argv[1] << endl;
  |         return 4;
  |       }                                              // establish the mapping
  |       if ((the_file =(char *) mmap(0, (size_t) buf.st_size, PROT_READ | PROT_WRITE,
  |                            MAP_SHARED, fd0, 0)) ==  MAP_FAILED) {
  +         cerr << "mmap failure" << endl;
  |         return 5;
  |       }
  |       CLS( );                                        // clear the display
  |       for (int i=0; i < n_threads; ++i) {            // generate the threads
 40         pthread_create( &thread_id[i], NULL, play, (void *)i);
  |       }
  |       do {                                        // in main thread
  |         sleep(1);                                 // pause a bit
  |         pthread_mutex_lock( &scrn_lock);
  +           display_screen(the_file);               // display screen
  |         pthread_mutex_unlock( &scrn_lock);
  |       } while (n_dead < n_threads);               // while threads left
  |       for(int i=0; i < n_threads; ++i)
  |         pthread_join( thread_id[i], (void **) NULL);
 50       LOCATE(25, 1);
  |       close(fd0);
  |       return 0;
  |     }
  |     /*
  +           Play the game by moving a character around the grid
  |     */
  |     void *
  |     play( void *numb ){
  |       static pthread_mutex_t  the_lock = PTHREAD_MUTEX_INITIALIZER; // single copy of these
 60       static pthread_key_t    the_key;
  |       static int              first_time = 1;
  |       int                     row, col;            // local to each invocation
  |       char                    pch;
  |       void                    *my_let = NULL;      // thread specific data
  +       if ( first_time ) {
  |         pthread_mutex_lock( &the_lock );
  |         if ( first_time ) {
  |           pthread_key_create( &the_key, NULL );
  |           first_time = 0;
 70         }
  |         pthread_mutex_unlock( &the_lock );
  |       }
  |       if ( (my_let = pthread_getspecific( the_key )) == NULL ) {
  |         my_let = (int *) &numb;
  +         pthread_setspecific( the_key, my_let );    // associate with key
  |       }
  |       row=my_rand(1,ROW)-1;                        // start at random location
  |       col=my_rand(1,COL)-1;
  |       pch = (char) (65+*(int *)pthread_getspecific(the_key));
 80       do {
  |         move(the_file, &row, &col, pch);           // move around
  |       } while( !boxed( the_file, row, col));       // while not boxed in
  |       n_dead++;                                    // update terminated threads
  |       guys[*(int *)pthread_getspecific(the_key)] = '*';
  +       pthread_mutex_lock( &scrn_lock );
  |         LOCATE(1, 1);
  |         cout << "Dead = " << n_dead << "[" << guys << "]";
  |       pthread_mutex_unlock( &scrn_lock );
  |       return NULL;
 90     }
  |     /*
  |            Find and move to new location.
  |     */
  |     void
  +     move(char *s, int *r, int *c, char pch) {
  |       int       old_offset = (*r * COL + *c),
  |                 new_offset = -1;
  |       DIRECTION d;
  |       neighbors( *r, *c, &d );                       // get neighbor locations
100       do {
  |         if ( my_rand(1,3) == 1 ) sleep(1);           // 1/3 time sleep first
  |         switch ( my_rand(1,4) ) {
  |         case 1:
  |           if ( *(s + d.left )  == ' ' ) new_offset = d.left;
  +           break;
  |         case 2:
  |           if ( *(s + d.right ) == ' ' ) new_offset = d.right;
  |           break;
  |         case 3:
110           if ( *(s + d.top   ) == ' ' ) new_offset = d.top;
  |           break;
  |         case 4:
  |           if ( *(s + d.bot   ) == ' ' ) new_offset = d.bot;
  |           break;
  +         }
  |       } while( new_offset == -1 );
  |       *r = new_offset / COL;
  |       *c = new_offset % COL;
  |       *(s + new_offset) = pch;                       // assign new location
120       if ( *(s + old_offset) != '@' )                // if its not a start loc
  |         *(s + old_offset) += 32;                     // change old loc to LC
  |     }
  |     /*
  |           Display the screen using VT100 escape codes for cursor placement.
  +     */
  |     void
  |     display_screen(char *s) {
  |       static int   pass = 1;
  |       static char  buffer[COL + 1];
130       LOCATE(1, 33);
  |       cout << "Thread World";
  |       LOCATE(2, 18);
  |       cout << "+------------------------------------------+";
  |       for (int i=3; i < 23; ++i) {
  +         LOCATE(i, 18);                               // move to screen location
  |         strncpy(buffer, (s + (i - 3) * COL), COL);   // get output segment
  |         cout << "|" << buffer << "|";
  |       }
  |       LOCATE(23, 18);
140       cout << "+------------------------------------------+";
  |       LOCATE(24, 20);
  |       cout << "Pass " << ++pass;
  |     }
  |     /*
  +            Check neighbors to see if any free locations are left
  |     */
  |     bool
  |     boxed(char *s, int r, int c) {
  |       DIRECTION d;
150       neighbors( r, c, &d );                         // get my neighbors
  |       return ( *(s+d.left) != ' ' && *(s+d.right) != ' ' &&
  |                *(s+d.bot ) != ' ' && *(s+d.top  ) != ' ');
  |     }
  |     /*
  +            Calculate the surrounding locations
  |     */
  |     void
  |     neighbors( int row, int col, DIRECTION *d ){
  |       d->left  =  row * COL + (col > 0 ? col - 1 : COL - 1);
160       d->right =  row * COL + (col > COL - 2 ? 0 : col + 1);
  |       d->top   =  (row > 0 ? row - 1 : ROW - 1) * COL + col;
  |       d->bot   =  (row > ROW - 2 ? 0 : row + 1) * COL + col;
  |     }

In main, a character array, the_screen, is allocated along with an array to hold the thread IDs. The command-line contents are examined: the user must pass the name of a file and the number of threads to be generated. The number of threads, stored as n_threads, is used as an index to place a NULL in the guys array, which is a list of uppercase letters that represent the active threads. A series of statements maps the the_screen display to the file named on the command line. While mapping the display to a file is not integral to the overall functionality of the example, it does show how this can be done and gives the user, if desired, a means of archiving the program's output. A loop creates the threads, and each is passed a reference to the user-defined function play. The initial thread, executing main, then loops until all threads have finished their activities, redisplaying the screen after each one-second sleep.
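The statements that map the display to a file fall in a part of the listing not shown here. As a rough illustration only, the mapping step could look like the sketch below. The helper names map_screen and unmap_screen, the file permissions, and the choice to blank-fill the grid are assumptions made for this sketch, not the code of Program 11.10.

```cpp
#include <cassert>
#include <cstring>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper: map `size` bytes of the named file into memory so
// that writes to the returned buffer are carried through to the file.
// Returns NULL on failure; on success *fd_out holds the open descriptor.
char *map_screen(const char *path, size_t size, int *fd_out) {
  int fd = open(path, O_RDWR | O_CREAT | O_TRUNC, 0666);
  if (fd == -1) return NULL;
  if (ftruncate(fd, (off_t)size) == -1) {     // file must be as large as the mapping
    close(fd);
    return NULL;
  }
  void *p = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (p == MAP_FAILED) {
    close(fd);
    return NULL;
  }
  memset(p, ' ', size);                       // start with a blank grid
  *fd_out = fd;
  return (char *)p;
}

// Release the mapping and close the underlying file.
void unmap_screen(char *screen, size_t size, int fd) {
  munmap(screen, size);
  close(fd);
}
```

Because the mapping is MAP_SHARED, anything the threads write into the buffer ends up in the file, which is what makes the archived output possible.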

The user-defined play function allocates the key. The logic to accomplish this is a variation of our previous example. The TSD *my_let reference, local to each invocation of play, is associated with the key by checking the return from the pthread_getspecific library function. If a NULL value is returned, as in the initial pass-through, the address of the loop value from main (passed in as numb) is assigned to my_let and associated with the key via a call to pthread_setspecific.
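The key allocation in play relies on a flag checked both outside and inside a mutex. The pthread_once library function offers another way to run an initializer exactly once. The sketch below is an illustration under stated assumptions, not the book's code: letter_for is a hypothetical stand-in for play, run_letters is a hypothetical driver, and each thread is handed a pointer to its own int rather than the loop value cast to a pointer.

```cpp
#include <cassert>
#include <pthread.h>

static pthread_key_t  the_key;                     // one key shared by all threads
static pthread_once_t key_once = PTHREAD_ONCE_INIT;

// Runs exactly once, no matter how many threads call pthread_once.
static void make_key(void) {
  pthread_key_create(&the_key, NULL);
}

// Hypothetical stand-in for play: store this thread's number as its TSD,
// read it back through the key, and derive the thread's display letter.
void *letter_for(void *numb) {
  pthread_once(&key_once, make_key);
  if (pthread_getspecific(the_key) == NULL)
    pthread_setspecific(the_key, numb);            // numb points at a caller-owned int
  int id = *(int *)pthread_getspecific(the_key);
  return (void *)(long)('A' + id);
}

// Start n threads (n <= 26), join them, and collect each thread's letter.
void run_letters(int n, char *out) {
  pthread_t tid[26];
  int       ids[26];
  for (int i = 0; i < n; ++i) {
    ids[i] = i;
    pthread_create(&tid[i], NULL, letter_for, &ids[i]);
  }
  for (int i = 0; i < n; ++i) {
    void *result;
    pthread_join(tid[i], &result);
    out[i] = (char)(long)result;
  }
  out[n] = '\0';
}
```

Each thread sees only the value it stored under the_key, even though all threads share the same key.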

A random location is chosen as the starting point for the thread. The character to be displayed is retrieved with a call to pthread_getspecific. Using a do-while loop, the thread moves about the grid while it is not boxed in. Once boxed in, the thread exits the activity loop. It then increments a global value representing the number of inactive (expired) threads and marks the appropriate location in the guys array with an *. Finally, it updates the display of terminated threads and exits.

As written, this example usually produces rather entertaining output. However, as the program contains some oversights in logic, it will on occasion not work as advertised.[23]

Figure 11.18 contains an output display of a partial run of Program 11.10 with five competing threads (letters A through E).

Figure 11.18. Partial output from Program 11.10.

In the run shown, five threads were generated. Thread D was terminated when it became boxed in (in this case surrounded by its own previous movements shown by the lowercase d's). The upper left corner of the output shown has an * in place of the letter D (indicating its termination). As shown, thread C has also become boxed in. However, either the program has somehow missed this fact (the previously mentioned oversight) or it was interrupted before it had a chance to fully update the screen. The remaining threads are active. As shown, the thread executing main had completed its 15th pass.

Debugging Multithreaded Programs

Writing multithreaded programs that execute correctly can be quite a challenge. Fortunately, there are some tools available to help with the task. Many current C and C++ compilers are bundled with thread-aware debuggers. For example, newer versions of the GNU C and C++ compilers (gcc and g++) come with gdb, and Solaris's C and C++ compilers come with dbx. Thread-aware debuggers automatically recognize multithreaded code. Such debuggers can be used to step through multithreaded programs and examine the contents of mutexes and TSD.

We will use Program 11.11 as source for our thread-debugging example. The debugger presented will be GNU's gdb (version 5.1.90CVS-5).[24] As presented, this program is syntactically correct but contains logic errors pertaining to the access and manipulation of common data by the multiple detached threads.

Example 11.11. Debugging multithreaded programs.

File : p11.11.cxx
  |     /*
  |          Debugging multithreaded prgrms - WITH LOCKING ERRORS
  |          Compile: g++ p11.11.cxx -lpthread -o p11.11
  |     */
  +     #define _REENTRANT
  |     #define _GNU_SOURCE
  |     #include <iostream>
  |     #include <cstdio>
  |     #include <cstdlib>
 10     #include <pthread.h>
  |     #include <unistd.h>
  |     #include <sys/time.h>
  |     using namespace std;
  |     const int  MAX=5,
  +                HOME=25;
  |     int
  |     my_rand(int start, int range){
  |       struct timeval t;
  |       gettimeofday(&t, (struct timezone *)NULL);
 20       return (int)(start+((float)range * rand_r((unsigned *)&t.tv_usec))
  |                    / (RAND_MAX+1.0));
  |     }
  |     typedef struct {
  |       int increment;
  +       char *phrase;
  |     } argument;
  |     void  step( void * );
  |                                                  // common to all threads
  |     pthread_t       thread_id[MAX];
 30     bool            alive = true, home = false;
  |     int             position,total=0;
  |     char   walk[] = "     |     | ";
  |     int
  |     main(int argc, char *argv[]) {
  +       argument right={ +1, "ZOINK! Stepped off the RIGHT side.\n"},
  |                left ={ -1, "SPLAT! Stepped off the LEFT side.\n"};
  |       pthread_attr_t attr_obj;
  |       if (argc < 2) {                           /* check arg list       */
  |         cerr <<  *argv << " start_position" << endl;
 40         return 1;
  |       }
  |       position = atoi(argv[1]);
  |       if ( position < 1 )
  |          position = 1;
  +       else if ( position > MAX )
  |          position = MAX;
  |       walk[position+5] = '*';
  |       setvbuf(stdout, (char *) NULL, _IONBF, 0);
  |       cout << "The drunken sailor walk" << endl << endl;
 50       cout << "     +12345+" << endl;
  |       cout << walk << endl;
  |       pthread_attr_init( &attr_obj );
  |       pthread_attr_setdetachstate( &attr_obj, PTHREAD_CREATE_DETACHED );
  |       pthread_create(&thread_id[0], &attr_obj,
  +                      (void *(*) (void *)) step, (void *) &right);
  |       pthread_create(&thread_id[1], &attr_obj,
  |                      (void *(*) (void *)) step, (void *) &left );
  |       pthread_exit(NULL);
  |       return 0;
 60     }
  |     void
  |     step( void *a ) {
  |       argument *my_arg=(argument *)a;
  |       do {
  +         sleep( my_rand(1,3) );                  // pause a bit
  |         walk[position+MAX] = ' ';               // clear old position
  |         position += my_arg->increment;          // calculate new position
  |         alive = bool(position > 0  && position <= MAX);
  |         walk[position+MAX] = alive ? '*' : '$';
 70         cout << walk << endl;
  |         home = bool(++total >= HOME);
  |         if ( !alive || home ) {
  |           if ( !alive )
  |             cout << my_arg->phrase;
  +           else
  |           cout << "The sailor made it home safely this time!\n";
  |           pthread_kill(thread_id[ (position < 1 ? 1 : 0)], 9);
  |         }
  |         sched_yield( );
 80       } while ( alive && !home );
  |     }

Program 11.11 contains an assortment of POSIX thread calls. The program, which is purely pedagogical in nature, implements a version of the “drunken sailor” problem. In this version, a drunken sailor is given a starting position on a boardwalk that is five steps wide. The program traces the path of the sailor as he or she progresses down the boardwalk toward home (located an arbitrary number of steps from the start). If the sailor steps off either side of the boardwalk, he or she perishes. If the sailor is still on the boardwalk after a set number of steps he or she is considered to have made it home. The sailor's position on the boardwalk is stored in a variable called position. Two threads manipulate this data. One thread executes a user-defined function, step, moving the sailor to the right, while a second thread executes the same function, moving the sailor to the left (the movement is based on the argument passed to the step function). Both threads are detached from the initiating thread. When the sailor perishes or reaches the end of the walk, the detached threads are terminated. Typical output from Program 11.11 is shown in Figure 11.19.

Figure 11.19. Several runs of Program 11.11.

In the first run the program appears to work much as expected. However, the second and third runs produce somewhat unexpected results. In the second run it looks as if there might be two sailors on the boardwalk (I suppose one could be seeing double, but this is not the case). In the third run the right side of the boardwalk seems to have disappeared. Clearly, something funny is going on! The problem is tied to the unrestricted access of common data by competing threads. One way to check on what is happening is to run the program in the debugger.

To prepare the program for the debugger, pass the -g option at compilation time so that the compiler retains additional symbol table (debugging) information in the executable. For example, the command sequence

linux$ g++ -g p11.11.cxx -lpthread -o p11.11

produces an executable, p11.11, that can be loaded and run in the debugger. When the debugger is invoked, it is passed the name of the executable. For our example this would be

linux$ gdb p11.11
GNU gdb Red Hat Linux (5.1.90CVS-5)
Copyright 2002 Free Software Foundation, Inc.
GDB is free software, covered by the GNU General Public License, and you are
welcome to change it and/or distribute copies of it under certain conditions.
Type "show copying" to see the conditions.
There is absolutely no warranty for GDB.  Type "show warranty" for details.
This GDB was configured as "i386-redhat-linux"...
(gdb)

Suppose we want the debugger to stop in the user-defined function step. We can use the list command in gdb to show us a given sequence of lines (with their line numbers). For example,

(gdb) list 61,81
61      void
62      step( void *a ) {
63        argument *my_arg=(argument *)a;
64        do {
65          sleep( my_rand(1,3) );                    // pause a bit
66          walk[position+MAX] = ' ';                 // clear old position
67          position += my_arg->increment;            // calculate new position
68          alive = bool(position > 0  && position <= MAX);
69          walk[position+MAX] = alive ? '*' : '$';
70          cout << walk << endl;
71          home = bool(++total >= HOME);
72          if ( !alive || home ) {
73            if ( !alive )
74              cout << my_arg->phrase;
75            else
76              cout << "The sailor made it home safely this time!\n";
77            pthread_kill(thread_id[ (position < 1 ? 1 : 0)], 9);
78          }
79          sched_yield( );
80        } while ( alive && !home );
81       }

Alternatively, we can pass the list command the name of the user-defined function we would like to see listed (such as step). If we do this, the debugger will show N lines (usually 10) surrounding the start of the referenced function. The listing usually begins a few lines prior to the actual function.

(gdb) list step
57                       (void *(*) (void *)) step, (void *) &left );
58        pthread_exit(NULL);
59        return 0;
60      }
61      void
62      step( void *a ) {
63        argument *my_arg=(argument *)a;
64        do {
65          sleep( my_rand(1,3) );                       // pause a bit
66          walk[position+MAX] = ' ';                    // clear old position

To stop at line 66, we establish a breakpoint.

(gdb) break 66
Breakpoint 1 at 0x8048b61: file p11.11.cxx, line 66.

To execute (run) the program, the run command is used. Any values that would normally be passed on the command line are placed after the run command.

(gdb) run 5
Starting program: /home/faculty/gray/revision/11/sailor/p11.11 5
[New Thread 1024 (LWP 3176)]
The drunken sailor walk
     +12345+        <-- 1
     |    *|
[New Thread 2049 (LWP 3193)]
[New Thread 1026 (LWP 3194)]
[New Thread 2051 (LWP 3195)]
[Switching to Thread 1026 (LWP 3194)]
Breakpoint 1, step (a=0xbffffb48) at p11.11.cxx:66
66          walk[position+MAX] = ' ';                    // clear old position
  • (1)General program output.

When the debugger stops at the indicated line, the command info thread can be issued to obtain a wealth of thread information.

(gdb) info thread
  4 Thread 2051 (LWP 3195)  0x420b4b31 in nanosleep () from /lib/i686/libc.so.6
* 3 Thread 1026 (LWP 3194)  step (a=0xbffffb48) at p11.11.cxx:66
  2 Thread 2049 (LWP 3193)  0x420e0037 in poll () from /lib/i686/libc.so.6
  1 Thread 1024 (LWP 3176)  0x420292e5 in sigsuspend () from /lib/i686/libc.so.6

The astute reader will notice a number of things. Thread 1 (the initiating thread) was directed to exit (line 58, pthread_exit(NULL);) but at this juncture still appears to be active. At present, there are four threads associated with the program. The current active thread, identified with an asterisk, is thread ID 3, which is associated with LWP 3194.

The command display variable_name, where variable_name is the name of the variable of interest, directs the debugger to display the current contents of the variable each time a breakpoint is encountered. In the sequence below we have directed the debugger to display the contents of the global variables alive, position, and home before we issue run.

.
.
.
Starting program: /home/faculty/gray/revision/11/sailor/p11.11 5
[New Thread 1024 (LWP 3274)]
The drunken sailor walk

     +12345+
     |    *|
[New Thread 2049 (LWP 3291)]
[New Thread 1026 (LWP 3292)]
[New Thread 2051 (LWP 3293)]
[Switching to Thread 1026 (LWP 3292)]

Breakpoint 1, step (a=0xbffffb48) at p11.11.cxx:66
66          walk[position+MAX] = ' ';                    // clear old position
3: home = false
2: position = 5        <-- 1
1: alive = true
(gdb) cont
Continuing.
     |     $
ZOINK! Stepped off the RIGHT side.
[Switching to Thread 2051 (LWP 3293)]

Breakpoint 1, step (a=0xbffffb40) at p11.11.cxx:66
66          walk[position+MAX] = ' ';                    // clear old position
3: home = false
2: position = 6        <-- 2
1: alive = false
(gdb) cont
Continuing.
     |    *

Breakpoint 1, step (a=0xbffffb40) at p11.11.cxx:66
66          walk[position+MAX] = ' ';                    // clear old position
3: home = false
2: position = 5
1: alive = true
.        <-- 3
.
.
  • (1)At this point the sailor, at position 5, has not reached home and is still alive.

  • (2)Now the sailor is at position 6. He or she has not reached home and is no longer alive. The program should stop here, but it does not.

  • (3)Suddenly, the sailor is at position 5. While still not having reached home, the sailor is now alive again! Clearly, the thread doing the decrement to the position has performed its activity before the test for being alive was done.
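One way to close this window is to make the test of alive and home and the update of position a single critical section. The sketch below is a simplified variant written for illustration, not a corrected copy of Program 11.11: the name walk_lock and the driver walk_from are assumptions, the threads are joinable rather than detached, and the sleep, the display, and the pthread_kill call are left out.

```cpp
#include <cassert>
#include <pthread.h>
#include <sched.h>

const int MAX = 5, HOME = 25;

static pthread_mutex_t walk_lock = PTHREAD_MUTEX_INITIALIZER;  // assumed name
static int  position = 3, total = 0;
static bool alive = true, home = false;

// Each thread repeatedly nudges the sailor by its own increment. The test
// of alive/home and the update of position happen under one lock, so a
// thread can never move a sailor who has already perished or arrived.
void *step(void *a) {
  int  increment = *(int *)a;
  bool done = false;
  while (!done) {
    pthread_mutex_lock(&walk_lock);
    if (alive && !home) {
      position += increment;
      alive = (position > 0 && position <= MAX);
      home  = (++total >= HOME);
    }
    done = (!alive || home);
    pthread_mutex_unlock(&walk_lock);
    sched_yield();                      // give the other thread a turn
  }
  return NULL;
}

// Hypothetical driver: run one walk from `start`, return the final position.
int walk_from(int start) {
  pthread_t tid[2];
  int right = +1, left = -1;
  position = start; total = 0; alive = true; home = false;
  pthread_create(&tid[0], NULL, step, &right);
  pthread_create(&tid[1], NULL, step, &left);
  pthread_join(tid[0], NULL);
  pthread_join(tid[1], NULL);
  return position;
}
```

With the lock in place, once alive becomes false no later step can change position, so the sailor cannot come back to life.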

A specific thread can be referenced with the command thread N, where N is the number of the appropriate thread. As shown below, information specific to the thread can be referenced once the thread is loaded.

(gdb) thread 4
[Switching to thread 4 (Thread 2051 (LWP 3342))]#0  step (a=0xbffffb40) at p11.11.cxx:66
66          walk[position+MAX] = ' ';                    // clear old position
(gdb) print *my_arg
$24 = {increment = -1, phrase = 0x8048da0 "SPLAT! Stepped off the LEFT side.\n"}        <-- 1
(gdb) thread 3
[Switching to thread 3 (Thread 1026 (LWP 3341))]#0  step (a=0xbffffb48) at p11.11.cxx:66
66          walk[position+MAX] = ' ';                    // clear old position
(gdb) print *my_arg
$25 = {increment = 1, phrase = 0x8048d60 "ZOINK! Stepped off the RIGHT side.\n"}
  • (1)This is the thread that does the decrement.

Anytime the debugger is stopped, the contents of a mutex can be displayed (assuming it is within the current scope). For example, if we had a mutex called my_lock, its contents before it is acquired would be

(gdb) print my_lock
$1 = {__m_reserved = 0, __m_count = 0, __m_owner = 0x0, __m_kind = 0,
  __m_lock = {__status = 0, __spinlock = 0}}        <-- 1
  • (1)This member is set to 1 when the mutex is locked.

and after it is acquired

(gdb) print my_lock
$2 = {__m_reserved = 0, __m_count = 0, __m_owner = 0x0, __m_kind = 0,
  __m_lock = {__status = 1, __spinlock = 0}}        <-- 1
  • (1)This member is set to 1 when the mutex is locked.
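The member names printed by the debugger belong to one particular thread library, since pthread_mutex_t is an opaque type. From inside a program, a portable way to probe a mutex is pthread_mutex_trylock. The sketch below uses a hypothetical helper, is_locked, and assumes a default (non-recursive) mutex. The answer can be out of date the moment it is returned, so it is suitable only for diagnostics.

```cpp
#include <cassert>
#include <cerrno>
#include <pthread.h>

// Hypothetical helper: report whether a mutex is currently held.
// pthread_mutex_trylock returns 0 if it acquired the lock and EBUSY if
// the mutex was already locked; on success we release it again at once.
bool is_locked(pthread_mutex_t *m) {
  int rc = pthread_mutex_trylock(m);
  if (rc == 0) {
    pthread_mutex_unlock(m);
    return false;
  }
  return rc == EBUSY;
}
```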

The quit command is used to leave the debugger. An abbreviated listing of gdb commands can be displayed in gdb using the command help. The manual pages on gdb contain a more detailed explanation of how to invoke gdb. On the command line, info gdb provides a wealth of information on how to use gdb (including a fairly detailed sample session).

Summary

A thread is a distinct sequence of execution steps performed within a process. By definition, every program contains at least one thread of control. Threads, which are created dynamically and execute independently, can be used to provide concurrency at a minimal cost (both in system resources and programming effort). Problems that consist of multiple individual tasks lend themselves to a threaded solution, while problems that are serial in nature do not.

Thread libraries (such as the POSIX thread library) provide programmers with a means for generating and coordinating multiple threads. These threads can be contained within a single process or spread across multiple processes. Threads within a process share most of the process's state information. At a system-implementation level, a thread can be bound (directly associated with an underlying LWP) or unbound. Unbound threads are mapped on an as-needed basis to an LWP from a pool maintained by the system. Threads can be specified as detached. A detached thread cannot be waited upon and exits once it has finished its processing. A nondetached thread can be waited upon (often by its creating thread) and its exiting status obtained. When a thread is created, it is passed a reference to the code it is to execute. Each thread has an associated priority and scheduling algorithm that is used by the system to determine how and when the thread will receive processing time. The actual scheduling of a thread is done by the thread library (if unbound) or by the kernel (if bound). Threads can send and receive signals. In a multithreaded setting, often one thread is designated as the signal catcher. A signal mask is used to specify whether or not a thread will act upon or ignore a particular signal.
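The difference between a nondetached and a detached thread can be sketched briefly. In the illustration below, the function names twice, run_joinable, and detached_state are assumptions made for the example. The first helper waits on a joinable thread and collects its exit status. The second only shows how the detached state is requested through an attribute object.

```cpp
#include <cassert>
#include <pthread.h>

// Thread body: hand back twice the value it was given as its exit status.
void *twice(void *arg) {
  long n = (long)arg;
  return (void *)(2 * n);
}

// Create a joinable thread, wait for it, and return its exit status.
long run_joinable(long n) {
  pthread_t tid;
  void *status = NULL;
  pthread_create(&tid, NULL, twice, (void *)n);   // NULL attributes: joinable
  pthread_join(tid, &status);                     // blocks until twice returns
  return (long)status;
}

// Request the detached state in an attribute object and read it back.
// A thread created with such an object could not be joined.
int detached_state(void) {
  pthread_attr_t attr;
  int state = -1;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  pthread_attr_getdetachstate(&attr, &state);
  pthread_attr_destroy(&attr);
  return state;
}
```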

Thread activity can be coordinated with mutexes (mutual exclusion locks), condition variables, and semaphores. A mutex is used to lock a specific section of code (often called the critical section) where the integrity of the data accessed must be maintained. The term monitor is sometimes used when the section of code to be locked encompasses the entire module to be executed. Some thread libraries support read/write mutexes. A read/write mutex allows any number of threads to read protected data, but only one thread at a time to write or modify the data. Read/write mutexes are considerably slower than their less complicated brethren. Mutexes that are used by threads in different processes must be mapped to a shared memory location. Condition variables are used to force a thread to block on an arbitrary condition. Multithread-safe semaphores, based on a POSIX4 implementation, can be binary or counting. Semaphores can be used to manage multiple resources. Usually, it is best to lock the minimal number of lines of code needed to maintain data consistency.
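As a reminder of how a condition variable and its mutex work together, here is a minimal sketch. The names cv_lock, ready, data_ready, producer, and consume are all assumptions made for the example. The waiting thread blocks until another thread sets the condition and signals it.

```cpp
#include <cassert>
#include <pthread.h>

static pthread_mutex_t cv_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  ready   = PTHREAD_COND_INITIALIZER;
static bool data_ready   = false;     // the condition being waited on
static int  shared_value = 0;

// Producer: publish a value, set the condition, and wake the waiter.
void *producer(void *arg) {
  pthread_mutex_lock(&cv_lock);
  shared_value = *(int *)arg;
  data_ready = true;
  pthread_cond_signal(&ready);
  pthread_mutex_unlock(&cv_lock);
  return NULL;
}

// Consumer: start a producer, block until data_ready is true, read the value.
int consume(int value) {
  pthread_t tid;
  data_ready = false;                   // no other thread exists yet
  pthread_create(&tid, NULL, producer, &value);
  pthread_mutex_lock(&cv_lock);
  while (!data_ready)                   // loop guards against spurious wakeups
    pthread_cond_wait(&ready, &cv_lock);  // releases cv_lock while blocked
  int result = shared_value;
  pthread_mutex_unlock(&cv_lock);
  pthread_join(tid, NULL);
  return result;
}
```

The critical section here is only a few lines long, in keeping with the advice to lock the minimal amount of code needed for data consistency.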

If needed, it is possible to have data items in common sections of code that contain a value specific to each thread. Such data, called thread-specific data (TSD), is accessible to all threads but is maintained on a per-thread basis. The system uses a unique key value to determine the data value in a given thread.

Nomenclature and Key Concepts

/usr/bin/time utility

_POSIX_C_SOURCE defined constant

_REENTRANT defined constant

<pthread.h> include file

<semaphore.h> include file

<signal.h> include file

clone system call

condition variable

gdb utility

heavyweight process

kernel-level thread

lightweight process (LWP)

multithreaded

mutex

nanosleep real-time library function

nonpreemptive scheduling

opaque data type

parallelism

POSIX 1003.1 standard

POSIX 1003.1c standard

preemptive scheduling

pthread_attr_getschedparam library function

pthread_attr_init library function

pthread_attr_setschedparam library function

pthread_cleanup_pop library function

pthread_cleanup_pop_restore_np library function

pthread_cleanup_push library function

pthread_cleanup_push_defer_np library function

PTHREAD_COND_INITIALIZER defined constant

pthread_cond_broadcast library function

pthread_cond_signal library function

pthread_cond_timedwait library function

pthread_cond_wait library function

pthread_create library function

PTHREAD_CREATE_DETACHED defined constant

PTHREAD_CREATE_JOINABLE defined constant

PTHREAD_DESTRUCTOR_ITERATIONS defined constant

pthread_detach library function

PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP defined constant

pthread_exit library function

PTHREAD_EXPLICIT_SCHED defined constant

pthread_getschedparam library function

pthread_getspecific library function

PTHREAD_INHERIT_SCHED defined constant

pthread_join library function

pthread_key_create library function

pthread_key_delete library function

pthread_kill library function

pthread_mutex_destroy library function

PTHREAD_MUTEX_ERRORCHECK defined constant

PTHREAD_MUTEX_FAST defined constant

pthread_mutex_init library function

PTHREAD_MUTEX_INITIALIZER defined constant

pthread_mutex_lock library function

PTHREAD_MUTEX_RECURSIVE defined constant

pthread_mutex_t data type

pthread_mutex_trylock library function

pthread_mutex_unlock library function

pthread_mutexattr_init library function

pthread_mutexattr_settype library function

pthread_once library function

PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP defined constant

pthread_rwlock_init library function

pthread_rwlock_rdlock library function

pthread_rwlock_t data type

pthread_rwlock_tryrdlock library function

pthread_rwlock_trywrlock library function

pthread_rwlock_unlock library function

pthread_rwlock_wrlock library function

PTHREAD_SCOPE_PROCESS defined constant

PTHREAD_SCOPE_SYSTEM defined constant

pthread_setschedparam library function

pthread_setspecific library function

pthread_sigmask library function

reentrant function

SCHED_FIFO defined constant

sched_get_priority_max library function

sched_get_priority_min library function

SCHED_OTHER defined constant

SCHED_RR defined constant

sched_yield system call

sem_getvalue library function

sem_init library function

sem_post library function

sem_trywait library function

sem_wait library function

setpriority system call

SIG_BLOCK defined constant

SIG_SETMASK defined constant

SIG_UNBLOCK defined constant

sigaction library function

sigaddset library function

sigdelset library function

sigemptyset library function

sigfillset library function

sigismember library function

sigwait library function

strace utility

thread

thread states

timespec_t data type

user-level thread



[1] For example, calls that would normally block (such as read) are wrapped in code that allows them to be non-blocking.

[2] A reentrant function supports access by multiple threads and maintains the integrity of its data across consecutive calls. Functions that are reentrant should not in turn call a function that is non-reentrant.

[3] On occasion, using the -D_POSIX_C_SOURCE flag will get us into trouble, as certain defined data types (such as timeval) do not seem to be officially defined when specifying ANSI C.

[4] This time the e is back in create!

[5] As would be anticipated, locally allocated objects reside on the stack, and their value is undefined when we leave their scope.

[6] If the thread involuntarily terminates, its status information is not relevant.

[7] With POSIX threads the user can issue a cancellation of a specific thread. The thread may have had several cleanup routines associated with it. If one of the associated cleanup routines contains a call to pthread_detach, a subsequent call to pthread_join will fail.

[8] Note the type casting. If necessary, we can also use type casting when passing the function reference, using the less than intuitive typecast (void * (*)()).

[9] For gettimeofday the file <sys/time.h> must be included.

[10] In most versions of UNIX there are several utilities that provide statistics about the amount of time it takes to execute a particular command (or program). The most common of these utilities are time, /usr/bin/time, and timex. Most versions of Linux do not come with timex.

[14] While not shown, we would also remove the lines of code that implement the join. Of course, if the initial thread (process) terminates before all threads have finished, complete output will not be generated.

[15] Linux does not support the constant ENOTSUP directly (which is usually assigned the value 48), but as a patch, defines ENOTSUP in terms of EOPNOTSUPP.

[16] I know, I know—thinking semaphore-wise, you might expect a 1 to indicate unlocked and 0 locked. If the mutex is owned, it is considered locked.

[17] Older versions of LinuxThreads may not support the pthread_mutexattr_settype call. An equivalent but deprecated call would be

pthread_mutexattr_setkind_np(&my_attributes, PTHREAD_MUTEX_RECURSIVE_NP);

[18] In <pthread.h> we find this constant to be defined as {0, 0, 0, PTHREAD_MUTEX_TIMED_NP, __LOCK_INITIALIZER}.

[19] As in previous initialization discussions, the explicit call pthread_rwlock_init can be skipped when we use the single statement approach; that is,

pthread_rwlock_t rw_lock = PTHREAD_RWLOCK_INITIALIZER;

[20] Note that the inc variable is used as a temporary storage location, as in this setting it is not legal to pass a reference to an expression.

[21] If this were a text on just threads, an examination of pthread_once would be in order. However, as this is only an introduction, we will skip the details concerning pthread_once at this time and direct the interested reader to the associated manual page on pthread_once.

[22] I am aware that using vt100 emulation escape codes will limit the range of platforms on which the example may run. However, as vt100 emulation is fairly ubiquitous, the actual number of platforms excluded should be minimal.

[23] The oversights are the basis for the exercises associated with this program.

[24] Only the command-line version of the debugger will be addressed. GNU also provides a graphical interface for its debugger called xxgdb for those who are working in a windowing environment.
