How it works...

We are creating an application that updates all elements of an array using multiple worker threads. For expensive update operations, this approach can result in substantial performance gains on a multi-core platform.

The difficulty is sharing the work between multiple worker threads, given that each of them may require a different amount of time to process a data element.

We use a shared_index atomic variable to store an index of the next element that has not yet been claimed by any of the worker threads. This variable, along with the array to be processed, is declared as a global variable:

std::atomic<size_t> shared_index{0};
std::vector<int> data;

Our worker function resembles the worker function from earlier recipes but has important differences. Firstly, it has an additional parameter, timeout. This is used to simulate differences in the time required to process each element.

Secondly, instead of a fixed number of iterations, our worker threads run in a loop until the  shared_index variable reaches the maximum value. This indicates that all elements were processed, and the worker can terminate.

On each iteration, a worker reads the value of shared_index. If there are elements to process, it stores the value of the shared_index variable in a local worker_index variable and increments the shared_index variable at the same time.

Though it is possible to use an atomic variable in the same way as a regular variable—first, get its current value, and then increment the variable—it can lead to a race condition. Both worker threads can read the variable at almost the same time. In this case, both of them get the same value, then start processing the same element, interfering with each other. That is why we use a special method, fetch_add, which increments the variable and returns the value it had before the increment as a single, non-interruptible action:

size_t worker_index = shared_index.fetch_add(1);

If the worker_index variable reaches the size of the array, it means that all elements were processed, and the worker can terminate:

if (worker_index >= data.size()) {
break;
}

If the worker_index variable is valid, it is used by the worker to update the value of the array element by this index. In our case, we just multiply it by 2:

data[worker_index] = data[worker_index] * 2;

To simulate expensive data operation, we use a custom delay. The duration of the delay is determined by the timeout parameter:

std::this_thread::sleep_for(std::chrono::milliseconds(timeout));

In the main function, we add elements to process into the data vector. We use a loop to populate the vector with numbers from zero to nine:

for (int i = 0; i < 10; i++) {
data.emplace_back(i);
}

After the initial dataset is ready, we create two worker threads, providing the index and the timeout parameters. Different timeouts of the worker thread are used to simulate different performances:

 std::thread worker1(worker, 1, 50);
std::thread worker2(worker, 2, 20);

Then, we wait till both worker threads complete their jobs, and print the result to the console. When we build and run our application, we get the following output:

As we can see, Worker 2 has processed more elements than Worker 1 because its timeout was 20 milliseconds, compared to the 50 milliseconds of Worker 1. Also, all elements were processed without omissions and repetitions, as intended.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.137.178.133