29

Thread Communication Techniques

Julien Hamaide
Fishing Cactus
Multithreaded systems are common nowadays. They usually involve synchroni-
zation and lock primitives implemented at the operating system level. Working
with those primitives is far from trivial and can lead to problems such as dead-
locks. This chapter introduces communication techniques that do not use such
primitives and rely on simple concepts. They can be applied to a lot of common
threading systems.
29.1 Latency and Threading
When writing multithreaded code, care should be taken to protect all data that
could be accessed simultaneously by several threads. Most of the time, by using a
mutex or a critical section, the problem is avoided. But this approach has a cost:
threads may stall waiting for a resource to be released. If the resource is heavily
accessed, it may put all threads to sleep, wasting cycles. On the other hand, this
approach has a low latency. When working on protected data, the thread waits
until the action can be executed and then executes it right away. But does the
action really need to be executed instantaneously? The latency of some actions is
not critical (e.g., playing a sound). Fortunately, few problems in video games need a
very low-latency treatment. While writing multithreaded code, don’t try to solve
the general problem. Solve your particular problem. For example, if we use a
thread pool, we know the number of threads is limited and that a thread won’t be
destroyed.
Communication among threads is accomplished by simply sending data from
one thread to another, whether by using an operating system primitive, such as a
semaphore, or a piece of memory. A common practice is to send data through
shared variables, variables that should be protected by primitives such as critical
sections. But most of the time, those variables are falsely shared. If a first-in-
first-out (FIFO) queue is used to communicate commands between threads, the
item count is a shared variable. If only a single thread writes to the collection,
and only a single thread is reading from the collection, should both threads share
that variable? The item count can always be expressed with two variables: one
that counts inserted elements and one that counts removed elements, the
difference being the number of items currently in the queue. This strategy is
used in the simple structures presented here.
29.2 Single Writer, Single Reader
This problem is the classic producer-consumer model. A thread produces items
that are consumed by another thread. This is the example generally used to
teach how semaphores work. The goal is to provide a single writer, single
reader (SWSR) FIFO queue, without using synchronization objects. A fixed-size
item table is preallocated. This table serves as memory storage for the data trans-
fer. Two indices are used, one for the first object to be popped and the other for
the storage of the next item to be pushed. The object is “templatized” with item
type and maximum item count, as shown in Listing 29.1.
template <typename Item, int ItemCount> class ProducerConsumerFIFO
{
public:
    ProducerConsumerFIFO() : ReadIndex(0), WriteIndex(0) {}
    bool Push(const Item& item);
    bool Pop(Item& item);
    bool IsFull() const;
    bool IsEmpty() const;
private:
    Item ItemTable[ItemCount];
    volatile unsigned int ReadIndex, WriteIndex;
};

Listing 29.1. FIFO class definition.

A variable must not be written to by two threads at the same time, but nothing
prevents two threads from reading that variable concurrently. If the producer
thread only writes to WriteIndex, and the consumer thread only writes to
ReadIndex, then there should be no conflict [Acton 2009]. The code shown in
Listing 29.2 introduces the Push() method, used by the producer to add an item
at the end of the queue.

template <typename Item, int ItemCount>
bool ProducerConsumerFIFO<Item, ItemCount>::Push(const Item& item)
{
    if (!IsFull())
    {
        unsigned int index = WriteIndex;
        ItemTable[index % ItemCount] = item;
        WriteBarrier();
        WriteIndex = index + 1;
        return (true);
    }

    return (false);
}

Listing 29.2. FIFO Push() method.

The method first tests whether there is space available. If not, it simply returns
false, letting the caller decide what to do (retry after some sleep, delay to the
next frame, etc.). If some space is available, the item is copied to the local item
table using the WriteIndex, and then a write barrier is inserted. Finally, the
WriteIndex is incremented. The call to WriteBarrier() is the most important
step of this method. It prevents both the compiler and the CPU from reordering
the writes to memory and ensures the item is completely copied to memory
before the index is incremented. Let's imagine that the CPU had reordered the
writes and the WriteIndex had already been updated to its new value, but the
item was not yet copied. If, at the same time, the consumer queries the
WriteIndex and detects that a new item is available, it might start to read the
item, although it is not completely copied.
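The listings do not define WriteBarrier() and ReadBarrier() themselves; the chapter targets platform-specific intrinsics. As an assumption on our part, a portable C++11 sketch of the two barriers could look like this:

```cpp
#include <atomic>

// Possible portable implementations of the barriers used by the
// queue (our mapping to C++11 fences, not the chapter's code).
// A release fence keeps the item store from being reordered after
// the index store in Push(); an acquire fence keeps the index store
// in Pop() from being reordered before the item load.
inline void WriteBarrier()
{
    std::atomic_thread_fence(std::memory_order_release);
}

inline void ReadBarrier()
{
    std::atomic_thread_fence(std::memory_order_acquire);
}
```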
When the WriteIndex is incremented, the modulo operator is not applied to it.
Instead, the modulo operator is applied only when accessing the item table. The
WriteIndex is then equal to the number of items ever pushed onto the queue,
and the ReadIndex is equal to the number of items ever popped off the queue.
So the difference between them is equal to the number of items left in the
queue. As shown in Listing 29.3, the queue is full if the difference between the
indices is equal to the maximum item count. If the indices wrap around to zero,
the difference stays correct as long as the indices are unsigned. If ItemCount is
not a power of two, the modulo operation is implemented with a division.
Otherwise, the modulo can be implemented as index & (ItemCount - 1).
template <typename Item, int ItemCount>
bool ProducerConsumerFIFO<Item, ItemCount>::IsFull() const
{
    return ((WriteIndex - ReadIndex) == ItemCount);
}

Listing 29.3. FIFO IsFull() method.
template <typename Item, int ItemCount>
bool ProducerConsumerFIFO<Item, ItemCount>::Pop(Item& item)
{
    if (!IsEmpty())
    {
        unsigned int index = ReadIndex;
        item = ItemTable[index % ItemCount];
        ReadBarrier();
        ReadIndex = index + 1;
        return (true);
    }

    return (false);
}

template <typename Item, int ItemCount>
bool ProducerConsumerFIFO<Item, ItemCount>::IsEmpty() const
{
    return (WriteIndex == ReadIndex);
}

Listing 29.4. FIFO Pop() and IsEmpty() methods.
The Push() method only writes to WriteIndex and ItemTable. The write index
is only written to by the producer thread, so no conflict can appear. But the item
table is shared by both threads. That's why we check that the queue is not full
before copying the new item.
At the other end of the queue, the Pop() method is used by the consumer
thread (see Listing 29.4). A similar mechanism is used to pop an item. The
IsEmpty() method ensures there is an item available. If so, the item is read. A
read barrier is then inserted, ensuring ReadIndex is written to after the item is
read. If the incremented ReadIndex were stored before the end of the copy, the
item contained in the item table might be overwritten by another Push().
A question that may arise is what happens if the consumer has already consumed
an item, but the read index is not yet updated? Nothing serious: the producer
still thinks the item has not been consumed yet. This is conservative. At worst,
the producer misses a chance to push its item. The latency of this system
therefore depends on the traffic.
For this technique to work, a fundamental condition must be met: writing an
index to the shared memory must be atomic. On x86 platforms, 32-bit and 64-bit
writes are atomic. On PowerPC platforms, only 32-bit writes are atomic. But on
the PlayStation 3 SPU, there are no atomic writes under 128 bytes! That means
each index must use 128 bytes, even though only four bytes are really used. If
some undefined behavior occurs, always verify the generated code.
We now have a simple SWSR FIFO queue using no locks. Let’s see what can
be built with this structure.
29.3 The Aggregator
By using the structure we have introduced, an aggregator can be created. This
structure allows several threads to queue items for a single receiver thread. This
kind of system can be used as a command queue for some subsystems (e.g.,
sound system, streaming system). While commands might not be processed in
the order they are queued, commands inserted by a single thread are processed
in order. For example, if you create a sound, then set its volume, and finally
play it, the pushed commands are processed in that order. As shown in Listing
29.5, the aggregator is built upon the FIFO queue described earlier.
Each pushing thread is assigned its own SWSR FIFO. This FIFO must be
accessible to the thread when it pushes an item, and this can be accomplished
through thread local storage or a simple table containing a mapping between a
thread ID and its assigned FIFO. Listing 29.6 shows the Push() method using
thread local storage. This code assumes that a FIFO has already been assigned
to the thread. To assign it, a registration method is called from each producer
thread. This registration method stores the pointer to the FIFO assigned to the
caller thread in the thread local storage, as shown in Listing 29.7. The
GetNextAvailableFifo() function returns the next available FIFO and asserts
that one is available.
Listing 29.8 presents the consumer code. It needs to select items from all the
producer threads with an equitable repartition. In this example, a simple round-
robin algorithm is used, but the algorithm might be adapted to special
situations. Once again, it depends on your problem and your data.
template <typename Item, int ItemCount, int ThreadCount>
class Aggregator
{
ProducerConsumerFIFO<Item, ItemCount> ThreadFifoTable[ThreadCount];
};
Listing 29.5. Aggregator class definition.