How it works...

As explained in the comments of the code, calling std::sync::mpsc::channel() generates a tuple consisting of a Sender and a Receiver, which are conventionally called tx for transmission and rx for reception [12].

This naming convention doesn't come from Rust, but has been a standard in the telecommunications industry since at least 1960 when the RS-232 (Recommended Standard 232) was introduced, detailing how computers and modems should communicate with each other.

These two halves of the same channel can communicate with each other independently of the current thread they're in. The module's name, mspc, tells us that this channel is a Multi-producer, single-consumer channel, which means that we can clone our sender as many times as we want. We can use this fact to our advantage when dealing with closures [16 to 21]:

for i in 0..10 {
let tx = tx.clone();
thread::spawn(move || {
println!("sending: {}", i);
tx.send(i).expect("Disconnected from receiver");
});
}

We do not need to wrap our sender in an Arc, because it natively supports arbitrary cloning! Inside of the closure you can see the sender's main functionality. The send() method sends data across threads to the receiver. It will return an error if the receiver is not available anymore, as in when it is dropped too early. In this thread here, we will simply send the numbers 0 to 9 concurrently to the receiver. One thing to note is that because a channel's halves are statically typed, they are only going to be able to send one specific data type around. If the first thing you send is an i32, your channel will only work with i32. If you send a String, it will be a String channel.

On to the receiver we go [23 to 28]:

for _ in 0..10 {
let msg = rx.recv().expect("Disconnected from sender");
println!("received: {}", msg);
}

The recv() method, which stands for receive, blocks the current thread until a message has arrived. Similar to its counterpart, it returns an error if the sender is unavailable. Because we know that we only sent 10 messages, we only call it 10 times. There is no need to explicitly join the threads we created for the sender, because recv() blocked the main thread until no more messages were left, which means that the sender finished sending all they had to send, that is, all the threads already finished their job. This way, we already joined them.

But in real life, you do not have a guarantee about the amount of times a client will send information to you. For a more realistic demonstration, we will now create a thread that sends random messages [37] to the receiver until it finally has enough and quits by sending "Goodbye!" [48]. Note how we created a new channel pair, as the old one was set to the type i32  because integer literals such as 1 or 2 are treated as i32 by default.

While the sending code looks almost identical to the one before, the receiving end looks a bit different [55 to 57]:

    for msg in rx {
println!("received: {}", msg);
}

As you can see, a receiver can be iterated over. It behaves like an infinite iterator over all messages that will ever come, blocking when waiting for a new one, similar to calling recv() in a loop. The difference is that the iteration will automatically stop when the sender is unavailable. Because we terminate the sending thread when it sends "Goodbye!" [48], this iteration over the receiver will also stop when receiving it, as the sender will have been dropped at that point. Because this means that we have a guarantee about the sending thread being finished, we do not need to join it.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.144.107.193