Non-blocking data structures

One of the issues we saw earlier was that, to share and mutate anything more complex than an integer or a Boolean between threads, we needed to use a Mutex. This is not entirely true: one crate, Crossbeam, gives us data structures that can be shared between threads without locking a Mutex. When many threads compete for the same data, they can be considerably faster than their lock-based equivalents.

When we want to share information between threads, it is often a list of tasks that we want to work on cooperatively. Other times, we want to create information in multiple threads and add it to a common list. It is less usual for multiple threads to work with exactly the same variables since, as we have seen, that requires synchronization and is slow.

This is where Crossbeam shows all its potential. Crossbeam gives us multithreaded queues and stacks, where we can insert data and consume data from different threads. We can even have some threads doing an initial processing of the data while others perform a second phase of the processing. Let's see how we can use these features. First, add crossbeam to the dependencies of the crate in the Cargo.toml file. Then, we start with a simple example:

extern crate crossbeam;

use std::thread;
use std::sync::Arc;

use crossbeam::sync::MsQueue;

fn main() {
    let queue = Arc::new(MsQueue::new());

    let handles: Vec<_> = (1..6)
        .map(|_| {
            let t_queue = queue.clone();
            thread::spawn(move || {
                for _ in 0..1_000_000 {
                    t_queue.push(10);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let final_queue = Arc::try_unwrap(queue).unwrap();
    let mut sum = 0;
    while let Some(i) = final_queue.try_pop() {
        sum += i;
    }

    println!("Final sum: {}", sum);
}

Let's first understand what this example does. It spawns 5 threads, and each of them iterates 1,000,000 times, pushing a 10 to the queue on every iteration. Queues are FIFO lists: first in, first out. This means that the first number pushed will be the first one to be popped, and the last one pushed will be the last to come out. In this case, all of them are 10, so the order doesn't matter.

Once the threads finish populating the queue, we pop every element and add up all the numbers. Five threads pushing 1,000,000 tens each gives 5 × 1,000,000 × 10, so if everything goes well, the final number should be 50,000,000. If you run it, that will be the result, and that's not all. If you run it by executing cargo run --release, it will run very fast. On my computer, it took about one second to complete. If you want, try to implement this code with the standard library Mutex and a vector, and compare the running times on your own machine.
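To make that comparison easier, here is a minimal sketch of the lock-based version using only the standard library. The function name locked_sum and its parameters are made up for this illustration, and no timing is claimed here: measure it on your own machine against the Crossbeam version.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Pushes `per_thread` copies of 10 from each of `threads` threads into a
/// Mutex-protected vector, then sums everything that was pushed.
/// (`locked_sum` is a hypothetical helper, not part of any library.)
fn locked_sum(threads: usize, per_thread: usize) -> u64 {
    let list = Arc::new(Mutex::new(Vec::<u64>::new()));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let t_list = Arc::clone(&list);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // Every single push has to take the lock first.
                    t_list.lock().unwrap().push(10);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // The threads dropped their clones when they finished, so we can take
    // the vector back out of the Arc and then out of the Mutex.
    let final_list = Arc::try_unwrap(list).unwrap().into_inner().unwrap();
    final_list.iter().sum()
}

fn main() {
    // Same workload as the Crossbeam example: 5 threads, 1,000,000 pushes each.
    println!("Final sum: {}", locked_sum(5, 1_000_000));
}
```

The structure is the same as before; the only difference is that each push goes through lock(), so the threads take turns instead of pushing concurrently.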

As you can see, we still needed to use an Arc to control the multiple references to the queue. This is needed because the queue itself cannot be cloned and has no reference count of its own, so the Arc is what provides the shared ownership between threads.
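The role of the Arc can be seen in isolation with a small standard library sketch. It uses an AtomicUsize as a stand-in for the queue, and share_with_arc is a name invented for this illustration. It counts how many handles exist while the threads hold their clones, and then shows that Arc::try_unwrap() succeeds once only the original handle is left, which is what the queue example relies on after joining the threads.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Shares one counter between `threads` threads through an Arc.
/// Returns (number of handles while the clones exist, final counter value).
/// (`share_with_arc` is a hypothetical helper for this illustration.)
fn share_with_arc(threads: usize) -> (usize, usize) {
    let shared = Arc::new(AtomicUsize::new(0));

    // One clone of the handle per thread; the counter itself is not copied.
    let clones: Vec<_> = (0..threads).map(|_| Arc::clone(&shared)).collect();
    let handles_alive = Arc::strong_count(&shared); // original + clones

    let handles: Vec<_> = clones
        .into_iter()
        .map(|counter| {
            thread::spawn(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Every thread dropped its clone when it finished, so only the original
    // handle remains and the value can be moved back out of the Arc.
    let counter = Arc::try_unwrap(shared).expect("other handles are still alive");
    (handles_alive, counter.into_inner())
}

fn main() {
    let (handles_alive, total) = share_with_arc(5);
    println!("Handles while shared: {}, final value: {}", handles_alive, total);
}
```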

Crossbeam gives us more than FIFO queues: we also have LIFO stacks. LIFO stands for last in, first out, and it means that the last element you pushed onto the stack will be the first one to be popped. Let's see the difference with a couple of threads:

extern crate crossbeam;

use std::thread;
use std::sync::Arc;
use std::time::Duration;

use crossbeam::sync::{MsQueue, TreiberStack};

fn main() {
    let queue = Arc::new(MsQueue::new());
    let stack = Arc::new(TreiberStack::new());

    let in_queue = queue.clone();
    let in_stack = stack.clone();
    let in_handle = thread::spawn(move || {
        for i in 0..5 {
            in_queue.push(i);
            in_stack.push(i);
            println!("Pushed :D");
            thread::sleep(Duration::from_millis(50));
        }
    });

    let mut final_queue = Vec::new();
    let mut final_stack = Vec::new();

    let mut last_q_failed = 0;
    let mut last_s_failed = 0;

    loop {
        // Get the queue
        match queue.try_pop() {
            Some(i) => {
                final_queue.push(i);
                last_q_failed = 0;
                println!("Something in the queue! :)");
            }
            None => {
                println!("Nothing in the queue :(");
                last_q_failed += 1;
            }
        }

        // Get the stack
        match stack.try_pop() {
            Some(i) => {
                final_stack.push(i);
                last_s_failed = 0;
                println!("Something in the stack! :)");
            }
            None => {
                println!("Nothing in the stack :(");
                last_s_failed += 1;
            }
        }

        // Check if we finished
        if last_q_failed > 1 && last_s_failed > 1 {
            break;
        } else if last_q_failed > 0 || last_s_failed > 0 {
            thread::sleep(Duration::from_millis(100));
        }
    }

    in_handle.join().unwrap();

    println!("Queue: {:?}", final_queue);
    println!("Stack: {:?}", final_stack);
}

As you can see in the code, we have two shared variables: a queue and a stack. The secondary thread will push new values to each of them, in the same order, from 0 to 4. Then, the main thread will try to get them back. It will loop and use the try_pop() method, which returns None right away when there is nothing to pop. The queue's pop() method could be used instead, but it blocks the thread while the queue is empty. That would happen in any case once all values get popped, since no new values are being added, so the try_pop() method lets the main thread avoid blocking and end gracefully.

The main thread decides that all the values were popped by counting how many times in a row it failed to pop a new value. Every time it fails, it waits for 100 milliseconds, while the pushing thread only waits for 50 milliseconds between pushes. This means that if it fails to pop twice in a row from both the queue and the stack, the pushing thread has almost certainly finished already. Keep in mind that this is a timing heuristic that depends on how the threads get scheduled, not a guarantee.
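The stopping rule can be looked at on its own with a standard library sketch. A Mutex-protected VecDeque stands in for the Crossbeam queue here only so that the example has no dependencies, and the function name drain_until_idle and its parameters are made up for this illustration. Because the rule depends on timing, the sketch makes no promise about how many values get collected in a given run.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Collects the values pushed by a producer thread and gives up after two
/// empty polls in a row. `push_pause` is the producer's sleep between pushes
/// and `poll_pause` is the consumer's sleep after an empty poll.
/// (`drain_until_idle` is a hypothetical helper for this illustration.)
fn drain_until_idle(count: u32, push_pause: Duration, poll_pause: Duration) -> Vec<u32> {
    let queue = Arc::new(Mutex::new(VecDeque::<u32>::new()));

    let in_queue = Arc::clone(&queue);
    let producer = thread::spawn(move || {
        for i in 0..count {
            in_queue.lock().unwrap().push_back(i);
            thread::sleep(push_pause);
        }
    });

    let mut received = Vec::new();
    let mut misses = 0;
    loop {
        // Hold the lock only for the pop itself, never while sleeping.
        let popped = queue.lock().unwrap().pop_front();
        match popped {
            Some(i) => {
                received.push(i);
                misses = 0;
            }
            None => {
                misses += 1;
                if misses > 1 {
                    // Two empty polls in a row: assume the producer is done.
                    break;
                }
                thread::sleep(poll_pause);
            }
        }
    }

    producer.join().unwrap();
    received
}

fn main() {
    // Same pauses as in the example above: 50 ms between pushes, 100 ms per miss.
    let values = drain_until_idle(5, Duration::from_millis(50), Duration::from_millis(100));
    println!("Received: {:?}", values);
}
```

If the consumer's pause were shorter than the producer's, two misses in a row could easily happen between two pushes, and the loop would stop too early. That is why the example waits twice as long as the pushing thread.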

The main thread adds the values to two vectors as they are popped and then prints the result. In the meantime, both threads print messages about pushing and popping new values. You will understand this better by seeing the output:

Note that the output can be different in your case, since threads don't need to be executed in any particular order.

In this example output, as you can see, the main thread first tries to get something from the queue and the stack but there is nothing there, so it sleeps. The second thread then starts pushing things, two numbers actually. After this, the queue and the stack will be [0, 1]. Then, the main thread pops the first item from each of them. From the queue, it will pop the 0 and from the stack it will pop the 1 (the last one), leaving the queue as [1] and the stack as [0].

It will go back to sleep and the secondary thread will insert a 2 in each variable, leaving the queue as [1, 2] and the stack as [0, 2]. Then, the main thread will pop two elements from each of them. From the queue, it will pop the 1 and the 2, while from the stack it will pop the 2 and then the 0, leaving both empty.

The main thread then goes to sleep, and for the next two tries, the secondary thread will push one element and the main thread will pop it, twice.
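The interleaving just described can be replayed in a single thread, which makes the ordering easy to check. This sketch uses the standard library VecDeque and Vec as stand-ins for the Crossbeam queue and stack, and the replay function is invented for this illustration. Each step lists the values pushed and the number of pops attempted afterwards.

```rust
use std::collections::VecDeque;

/// Replays a push/pop schedule on a FIFO queue and a LIFO stack.
/// Each step is (values to push, number of pops to attempt afterwards).
/// Returns the values in the order they came out of the queue and the stack.
/// (`replay` is a hypothetical helper for this illustration.)
fn replay(steps: &[(Vec<u32>, usize)]) -> (Vec<u32>, Vec<u32>) {
    let mut queue = VecDeque::<u32>::new();
    let mut stack = Vec::<u32>::new();
    let mut from_queue = Vec::new();
    let mut from_stack = Vec::new();

    for (pushed, pops) in steps {
        for &value in pushed {
            queue.push_back(value);
            stack.push(value);
        }
        for _ in 0..*pops {
            // FIFO: take from the front, where the oldest value is.
            if let Some(value) = queue.pop_front() {
                from_queue.push(value);
            }
            // LIFO: take from the back, where the newest value is.
            if let Some(value) = stack.pop() {
                from_stack.push(value);
            }
        }
    }

    (from_queue, from_stack)
}

fn main() {
    // Push 0 and 1, pop once; push 2, pop twice; push 3, pop; push 4, pop.
    let steps = vec![
        (vec![0, 1], 1),
        (vec![2], 2),
        (vec![3], 1),
        (vec![4], 1),
    ];
    let (queue_order, stack_order) = replay(&steps);
    println!("Queue: {:?}", queue_order);
    println!("Stack: {:?}", stack_order);
}
```

For this particular schedule, the queue gives the values back as [0, 1, 2, 3, 4], while the stack gives [1, 2, 0, 3, 4]. With a different interleaving of pushes and pops, the stack order changes but the queue order stays the same.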

It might seem a little bit complex, but the idea is that these queues and stacks can be used efficiently between threads without requiring a Mutex, and they accept any Send type. This means that they are great for complex computations, and even for multi-staged complex computations.

The Crossbeam crate also has some helpers to deal with epochs and even some variants of the mentioned types. For multithreading, Crossbeam also adds a great utility: scoped threads.
