How it works...

First, let's talk about the QuickStream structure:

  • The poll_next() function will continuously be invoked, and with each iteration, i's ticks attribute will be decremented by 1
  • Polling will stop when the ticks attribute reaches 0 and returns futures::Async::Ready<None>

Within the quick_streams() function:

  • We build a futures::task::Context by using futures::future::poll_on(f: FnMut(|cx: Context|)), so that we can explicitly invoke QuickStream's poll_next() function on lines 42 and 50
  • Since we have declared 10 ticks on line 38, our first two block_on's poll_next() calls should yield 9 and 8
  • The next block_on call, on line 57, will keep polling QuickStream until futures::Async::Ready<None> is returned from the ticks attribute equaling zero

Within iterate_streams():

  • futures::stream::iter_ok will convert an Iterator into a Stream, which will always be ready to return the next value
  • futures::stream::iter_result does the same thing as iter_ok, except we use Result values instead of Ok values
  • On lines 78 through 89, we iterate through the stream's results and print out some information depending on whether the value was Ok or an Error type. If the None type has been returned from our stream, then we will break the loop
  • Lines 92 through 95 show an alternative way of iterating through a stream's Ok results by using the into_iter() calls
  • Lines 97 through 99 show an alternative way of iterating through a stream's Result return types
Loops, iterated results, and collect() calls are synchronous. We used this functions for demonstrative/educational purposes only. Combinators such as map(), filter(), and_then(), etc. would be used in a real application for streams and channels.

The channel_threads() function:

  • On line 107, we define the maximum number of sends we want to attempt.
  • On line 108, we declare a channel to send messages to. Channel capacity is the buffer size (the argument of futures::channel::mpsc::channel) + the number of senders (each sender is guaranteed a slot within the channel). Channels will return a futures::channel::mpsc::Receiver<T>, which implements the Stream trait, and a futures::channel::mpsc::Sender<T>, which implements the Sink trait.
  • Lines 110 through 120 is where we spawn a thread and attempt to send 10 signals, looping until each send is sent successfully.
  • We collect, and display, our results on line 122 through 125, and join our threads on line 127.

The channel_error() section:

  • On line 131, we declare our channel with a 0 usize buffer as the argument, which gives us one slot for the initial sender
  • We send the first message successfully on line 133
  • Lines 136 through 139 should fail, since we are trying to send a message to a channel that is considered full (since we did not receive the value, drop the initial sender, flush the stream, and so on)
  • On line 146, we use the sender's futures::channel::mpsc::Sender::try_send(&mut self, msg: T) functions, which won't block our thread unless we don't drop/invoke the sender's destroyer method using drop(T) on line 147
  • Polling the stream any additional times after receiving the last value will always return None

Next, the channel_buffer() function:

  • We set up a future closure with poll_fn() on line 162.
  • We check to see if our sender is ready to be polled with its futures::sink::poll_ready(&mut self, cx: &mut futures::task::Context) method on lines 163 through 165.
  • Sinks have a method called futures::sink::start_send(&mut self, item: <Self as Sink>::SinkItem) -> Result<(), <Self as Sink>::SinkError>, which prepares the message to be delivered, but won't until we flush or close the sink. poll_flush() is often used to guarantee that every message has been sent from the sink.
  • Polling the stream for the next value will also alleviate space within the sink/sender using the futures::stream::poll_next(&mut self, cx: &mut futures::task::Context) method, as we have done on line 179.
  • We can check if our sender is ready, as we have done on line 182 using the futures::Async::is_ready(&self) -> bool method.
  • Our final value should be 22 and displayed to the console from line 192.

Then the channel_threads_blocking() function:

  • First, we set up our channels on lines 201 and 202.
  • Then we spawn a thread that will map all of tx_2's errors into a panic! (line 205), and then we send the value of 10 to our first channel while joining a second sender with the () value (line 206). On line 208, we send the value of 30 and another empty value () to our second channel.
  • On line 211 we poll the second channel, which would hold a value of ().
  • On line 212 we poll the first channel, which would hold a value of 10.
  • We drop the second channel's receiver (line 215), since we need to close or flush for the second tx_2.send() call on line 208 (tx_2 is known as variable b on this line).
  • After performing the drop, we can finally return our second value from the first channel's sender, which should be 30.

And the channel_unbounded() function:

  • On line 225 we declare an unbounded channel, which means that sending messages to this channel will always succeed as long as the receiver is not closed. Messages will be buffered on an as-needed basis, and since this channel is unbounded, our application can exhaust our available memory.
  • Lines 227 through 232 spawn a thread that collects all of the receiver's messages (line 228), and we iterate through them on line 229. The item on line 230 is a tuple of the index in which the message was received and the message's value (in our case, this is always 1).
  • Lines 237 through 241 is what will spawn the number of threads (using the MAX_THREADS constant) as well as the number of times that we want to send per thread using the MAX_THREADS constant.
  • Lines 244 we will drop (which closes) the channel's sender so that we may collect all of the messages from line 228.
  • We join the spawned thread with our current thread on line 246, which will execute the collection and iterations commands (lines 228 through 231).
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.128.226.121