29. Thread Communication Techniques
bool Aggregator::Push(const Item& item)
{
    // Fetch this thread's FIFO from thread-local storage.
    ProducerConsumerFIFO *fifo = ThreadFifo.GetValue();
    assert(fifo);
    return (fifo->Push(item));
}

Listing 29.6. Aggregator Push() method.
void Aggregator::InitializeWriterThread()
{
    int data_index = GetNextAvailableFifo();
    assert((data_index != -1) && (!"No buffer left"));
    ThreadFifo.SetValue(&ThreadFifoTable[data_index]);
}

Listing 29.7. Method used to register writer threads.
bool Aggregator::Pop(Item& item)
{
    assert(GetCurrentThreadId() == ReaderThreadIdentifier);
    int current_table_index = LastTableIndex;
    do
    {
        current_table_index = (current_table_index + 1) % ThreadCount;
        ProducerConsumerFIFO *fifo = &ThreadFifoTable[current_table_index];
        if (fifo->Pop(item))
        {
            LastTableIndex = current_table_index;
            return (true);
        }
    } while (current_table_index != LastTableIndex);
    return (false);
}

Listing 29.8. Aggregator Pop() method.
If no element is available after checking each thread's FIFO, the function returns false. The function does not wait for an element; it lets the caller decide what to do when none are available. The caller can sleep, do some other work, or retry immediately if low latency is important. For example, in a sound system, the update might process all pending commands until none are left and then update the system.
To prevent any other thread from reading from the aggregator, we also register the reader thread and store its identifier. In debug mode, Pop() then asserts that the calling thread matches this identifier.
This implementation does not allow more than a fixed number of threads.
But generally in a game application, the number of threads is fixed, and a thread
pool is used, thus limiting the need for a variable-size architecture.
From a performance point of view, care should be taken with memory usage. Depending on the architecture, different FIFO structures should be placed on different cache lines. To maintain coherence, when a thread writes to a memory slot, the corresponding cache line is invalidated in all other processor caches, and any thread reading the same cache line must request an updated copy. Traffic on the bus can increase substantially, and the reading threads are stalled by cache misses. The presented implementation does not take this problem into account, but an ideal solution would be to align the FIFO items with cache lines, padding each item up to the cache line size so that neighboring items are never stored in the same cache line.
29.4 The Dispatcher
In the case of a single producer and multiple consumers, we can create a dispatcher. The dispatcher is a single-writer, multiple-reader queue. This system can be used to dispatch work to several threads, and it can be implemented using the SWSR FIFO. The implementation is similar to that of the aggregator.
29.5 The Gateway
The final structure presented here is the gateway. It is a multiple-writer, multiple-reader queue. This queue is not really a FIFO anymore, since a thread must be used to pop items from the aggregator and push them into the dispatcher. Its role is quite similar to that of a scheduler, with the central task of distributing items. This distribution can process the data before dispatching it. Nothing prevents this central thread from reading every available item, doing some work on the items (e.g., sorting or filtering), and finally pushing them to the reader threads. As an example, in a job system, this thread can pop all jobs into local variables and sort them by priority before pushing them to each worker thread. It might also try to prevent two ALU-intensive jobs from being assigned to threads that share the same ALU.
While implementing this structure, some important questions drive its details. Do all writer threads also read from the queue? Is the central thread one of the writer or reader threads, or is it a special thread that only dispatches the items? No generic implementation of the gateway is practical; a custom implementation is recommended for each system.
29.6 Debugging
Multithreading and debugging frequently do not get along. Every queue we have presented is built from SWSR queues, which means one condition must be satisfied: for a single SWSR queue, only a single thread may read and only a single thread may write. The queue should be able to detect if an unexpected thread accesses its data; comparing the ID of the accessing thread with the expected thread ID and asserting their equality is strongly advised. If crashes continue to occur even though the correct threads access the collection, the memory barrier might be at fault. Check the compiler-generated code and validate that the correct instructions are issued to prevent reordering (e.g., lwsync on the PowerPC).