How to do it...

  1. Inside the bin folder, create a new file called streams.rs.
  2. Add the following code and run it with cargo run --bin streams:
1   extern crate futures;
2   
3   use std::thread;
4   
5   use futures::prelude::*;
6   use futures::executor::block_on;
7   use futures::future::poll_fn;
8   use futures::stream::{iter_ok, iter_result};
9   use futures::channel::mpsc;
  1. Now, let's add our constants, implementations, and so on:
11  #[derive(Debug)]
12  struct QuickStream {
13    ticks: usize,
14  }
15  
16  impl Stream for QuickStream {
17    type Item = usize;
18    type Error = Never;
19  
20    fn poll_next(&mut self, _cx: &mut task::Context) -> 
Poll<Option, Self::Error> { 21 match self.ticks { 22 ref mut ticks if *ticks > 0 => { 23 *ticks -= 1; 24 println!("Ticks left on QuickStream: {}", *ticks); 25 Ok(Async::Ready(Some(*ticks))) 26 } 27 _ => { 28 println!("QuickStream is closing!"); 29 Ok(Async::Ready(None)) 30 } 31 } 32 } 33 } 34 35 const FINISHED: Result<Async<()>, Never> = Ok(Async::Ready(()));
  1. Our quick_streams example would be:
37  fn quick_streams() {
38    let mut quick_stream = QuickStream { ticks: 10 };
39  
40    // Collect the first poll() call
41    block_on(poll_fn(|cx| {
42        let res = quick_stream.poll_next(cx).unwrap();
43        println!("Quick stream's value: {:?}", res);
44        FINISHED
45      }))
46      .unwrap();
47  
48    // Collect the second poll() call
49    block_on(poll_fn(|cx| {
50        let res = quick_stream.poll_next(cx).unwrap();
51        println!("Quick stream's next svalue: {:?}", res);
52        FINISHED
53      }))
54      .unwrap();
55  
56    // And now we should be starting from 7 when collecting the 
rest of the stream 57 let result: Vec<_> =
block_on(quick_stream.collect()).unwrap(); 58 println!("quick_streams final result: {:?}", result); 59 }
  1. There are several ways to iterate through streams; let's add them to our code base:
61  fn iterate_streams() {
62    use std::borrow::BorrowMut;
63  
64    let stream_response = vec![Ok(5), Ok(7), Err(false), Ok(3)];
65    let stream_response2 = vec![Ok(5), Ok(7), Err(false), Ok(3)];
66  
67    // Useful for converting any of the `Iterator` traits into a 
`Stream` trait. 68 let ok_stream = iter_ok::<_, ()>(vec![1, 5, 23, 12]); 69 let ok_stream2 = iter_ok::<_, ()>(vec![7, 2, 14, 19]); 70 71 let mut result_stream = iter_result(stream_response); 72 let result_stream2 = iter_result(stream_response2); 73 74 let ok_stream_response: Vec<_> =
block_on(ok_stream.collect()).unwrap(); 75 println!("ok_stream_response: {:?}", ok_stream_response); 76 77 let mut count = 1; 78 loop { 79 match block_on(result_stream.borrow_mut().next()) { 80 Ok((res, _)) => { 81 match res { 82 Some(r) => println!("iter_result_stream result #{}:
{}", count, r), 83 None => { break } 84 } 85 }, 86 Err((err, _)) => println!("iter_result_stream had an
error #{}: {:?}", count, err), 87 } 88 count += 1; 89 } 90 91 // Alternative way of iterating through an ok stream 92 let ok_res: Vec<_> = block_on(ok_stream2.collect()).unwrap(); 93 for ok_val in ok_res.into_iter() { 94 println!("ok_stream2 value: {}", ok_val); 95 } 96 97 let (_, stream) = block_on(result_stream2.next()).unwrap(); 98 let (_, stream) = block_on(stream.next()).unwrap(); 99 let (err, _) = block_on(stream.next()).unwrap_err(); 100 101 println!("The error for our result_stream2 was: {:?}", err); 102 103 println!("All done."); 104 }
  1. And now for our channeling example:
106 fn channel_threads() {
107   const MAX: usize = 10;
108   let (mut tx, rx) = mpsc::channel(0);
109 
110   let t = thread::spawn(move || {
111     for i in 0..MAX {
112       loop {
113         if tx.try_send(i).is_ok() {
114           break;
115         } else {
116           println!("Thread transaction #{} is still pending!", i);
117         }
118       }
119     }
120   });
121 
122   let result: Vec<_> = block_on(rx.collect()).unwrap();
123   for (index, res) in result.into_iter().enumerate() {
124     println!("Channel #{} result: {}", index, res);
125   }
126 
127   t.join().unwrap();
128 }
  1. Dealing with errors and channels can be done as follows:
130 fn channel_error() {
131   let (mut tx, rx) = mpsc::channel(0);
132 
133   tx.try_send("hola").unwrap();
134 
135   // This should fail
136   match tx.try_send("fail") {
137     Ok(_) => println!("This should not have been successful"),
138     Err(err) => println!("Send failed! {:?}", err),
139   }
140 
141   let (result, rx) = block_on(rx.next()).ok().unwrap();
142   println!("The result of the channel transaction is: {}",
143        result.unwrap());
144 
145   // Now we should be able send to the transaction since we 
poll'ed a result already 146 tx.try_send("hasta la vista").unwrap(); 147 drop(tx); 148 149 let (result, rx) = block_on(rx.next()).ok().unwrap(); 150 println!("The next result of the channel transaction is: {}", 151 result.unwrap()); 152 153 // Pulling more should result in None 154 let (result, _) = block_on(rx.next()).ok().unwrap(); 155 println!("The last result of the channel transaction is:
{:?}", 156 result); 157 }
  1. We can even work with buffers and channels together. Let's add our channel_buffer function:
159 fn channel_buffer() {
160   let (mut tx, mut rx) = mpsc::channel::(0);
161 
162   let f = poll_fn(move |cx| {
163     if !tx.poll_ready(cx).unwrap().is_ready() {
164       panic!("transactions should be ready right away!");
165     }
166 
167     tx.start_send(20).unwrap();
168     if tx.poll_ready(cx).unwrap().is_pending() {
169       println!("transaction is pending...");
170     }
171 
172     // When we're still in "Pending mode" we should not be able
173     // to send more messages/values to the receiver
174     if tx.start_send(10).unwrap_err().is_full() {
175       println!("transaction could not have been sent to the 
receiver due 176 to being full..."); 177 } 178 179 let result = rx.poll_next(cx).unwrap(); 180 println!("the first result is: {:?}", result); 181 println!("is transaction ready? {:?}", 182 tx.poll_ready(cx).unwrap().is_ready()); 183 184 // We should now be able to send another message
since we've pulled 185 // the first message into a result/value/variable. 186 if !tx.poll_ready(cx).unwrap().is_ready() { 187 panic!("transaction should be ready!"); 188 } 189 190 tx.start_send(22).unwrap(); 191 let result = rx.poll_next(cx).unwrap(); 192 println!("new result for transaction is: {:?}", result); 193 194 FINISHED 195 }); 196 197 block_on(f).unwrap(); 198 }
  1. Just because we're using the futures crate doesn't mean everything has to be concurrent. Add the following example to demonstrate how to block with channels:
200 fn channel_threads_blocking() {
201   let (tx, rx) = mpsc::channel::(0);
202   let (tx_2, rx_2) = mpsc::channel::<()>(2);
203 
204   let t = thread::spawn(move || {
205     let tx_2 = tx_2.sink_map_err(|_| panic!());
206     let (a, b) = 
block_on(tx.send(10).join(tx_2.send(()))).unwrap(); 207 208 block_on(a.send(30).join(b.send(()))).unwrap(); 209 }); 210 211 let (_, rx_2) = block_on(rx_2.next()).ok().unwrap(); 212 let (result, rx) = block_on(rx.next()).ok().unwrap(); 213 println!("The first number that we sent was: {}",
result.unwrap()); 214 215 drop(block_on(rx_2.next()).ok().unwrap()); 216 let (result, _) = block_on(rx.next()).ok().unwrap(); 217 println!("The second number that we sent was: {}",
result.unwrap()); 218 219 t.join().unwrap(); 220 }
  1. Sometimes we'll need concepts such as unbounded channels; let's add our channel_unbounded function:
222 fn channel_unbounded() {
223   const MAX_SENDS: u32 = 5;
224   const MAX_THREADS: u32 = 4;
225   let (tx, rx) = mpsc::unbounded::();
226 
227   let t = thread::spawn(move || {
228     let result: Vec<_> = block_on(rx.collect()).unwrap();
229     for item in result.iter() {
230       println!("channel_unbounded: results on rx: {:?}", item);
231     }
232   });
233 
234   for _ in 0..MAX_THREADS {
235     let tx = tx.clone();
236 
237     thread::spawn(move || {
238       for _ in 0..MAX_SENDS {
239         tx.unbounded_send(1).unwrap();
240       }
241     });
242   }
243 
244   drop(tx);
245 
246   t.join().ok().unwrap();
247 }
  1. And now we can add our main function:
249 fn main() {
250   println!("quick_streams():");
251   quick_streams();
252 
253   println!("
iterate_streams():");
254   iterate_streams();
255 
256   println!("
channel_threads():");
257   channel_threads();
258 
259   println!("
channel_error():");
260   channel_error();
261 
262   println!("
channel_buffer():");
263   channel_buffer();
264 
265   println!("
channel_threads_blocking():");
266   channel_threads_blocking();
267 
268   println!("
channel_unbounded():");
269   channel_unbounded();
270 }
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.131.38.210