- Inside the bin folder, create a new file called streams.rs.
- Add the following code and run it with cargo run --bin streams:
1 extern crate futures; 2 3 use std::thread; 4 5 use futures::prelude::*; 6 use futures::executor::block_on; 7 use futures::future::poll_fn; 8 use futures::stream::{iter_ok, iter_result}; 9 use futures::channel::mpsc;
- Now, let's add our constants, implementations, and so on:
11 #[derive(Debug)] 12 struct QuickStream { 13 ticks: usize, 14 } 15 16 impl Stream for QuickStream { 17 type Item = usize; 18 type Error = Never; 19 20 fn poll_next(&mut self, _cx: &mut task::Context) ->
Poll<Option, Self::Error> { 21 match self.ticks { 22 ref mut ticks if *ticks > 0 => { 23 *ticks -= 1; 24 println!("Ticks left on QuickStream: {}", *ticks); 25 Ok(Async::Ready(Some(*ticks))) 26 } 27 _ => { 28 println!("QuickStream is closing!"); 29 Ok(Async::Ready(None)) 30 } 31 } 32 } 33 } 34 35 const FINISHED: Result<Async<()>, Never> = Ok(Async::Ready(()));
- Our quick_streams example would be:
37 fn quick_streams() { 38 let mut quick_stream = QuickStream { ticks: 10 }; 39 40 // Collect the first poll() call 41 block_on(poll_fn(|cx| { 42 let res = quick_stream.poll_next(cx).unwrap(); 43 println!("Quick stream's value: {:?}", res); 44 FINISHED 45 })) 46 .unwrap(); 47 48 // Collect the second poll() call 49 block_on(poll_fn(|cx| { 50 let res = quick_stream.poll_next(cx).unwrap(); 51 println!("Quick stream's next svalue: {:?}", res); 52 FINISHED 53 })) 54 .unwrap(); 55 56 // And now we should be starting from 7 when collecting the
rest of the stream 57 let result: Vec<_> =
block_on(quick_stream.collect()).unwrap(); 58 println!("quick_streams final result: {:?}", result); 59 }
- There are several ways to iterate through streams; let's add them to our code base:
61 fn iterate_streams() { 62 use std::borrow::BorrowMut; 63 64 let stream_response = vec![Ok(5), Ok(7), Err(false), Ok(3)]; 65 let stream_response2 = vec![Ok(5), Ok(7), Err(false), Ok(3)]; 66 67 // Useful for converting any of the `Iterator` traits into a
`Stream` trait. 68 let ok_stream = iter_ok::<_, ()>(vec![1, 5, 23, 12]); 69 let ok_stream2 = iter_ok::<_, ()>(vec![7, 2, 14, 19]); 70 71 let mut result_stream = iter_result(stream_response); 72 let result_stream2 = iter_result(stream_response2); 73 74 let ok_stream_response: Vec<_> =
block_on(ok_stream.collect()).unwrap(); 75 println!("ok_stream_response: {:?}", ok_stream_response); 76 77 let mut count = 1; 78 loop { 79 match block_on(result_stream.borrow_mut().next()) { 80 Ok((res, _)) => { 81 match res { 82 Some(r) => println!("iter_result_stream result #{}:
{}", count, r), 83 None => { break } 84 } 85 }, 86 Err((err, _)) => println!("iter_result_stream had an
error #{}: {:?}", count, err), 87 } 88 count += 1; 89 } 90 91 // Alternative way of iterating through an ok stream 92 let ok_res: Vec<_> = block_on(ok_stream2.collect()).unwrap(); 93 for ok_val in ok_res.into_iter() { 94 println!("ok_stream2 value: {}", ok_val); 95 } 96 97 let (_, stream) = block_on(result_stream2.next()).unwrap(); 98 let (_, stream) = block_on(stream.next()).unwrap(); 99 let (err, _) = block_on(stream.next()).unwrap_err(); 100 101 println!("The error for our result_stream2 was: {:?}", err); 102 103 println!("All done."); 104 }
- And now for our channeling example:
106 fn channel_threads() { 107 const MAX: usize = 10; 108 let (mut tx, rx) = mpsc::channel(0); 109 110 let t = thread::spawn(move || { 111 for i in 0..MAX { 112 loop { 113 if tx.try_send(i).is_ok() { 114 break; 115 } else { 116 println!("Thread transaction #{} is still pending!", i); 117 } 118 } 119 } 120 }); 121 122 let result: Vec<_> = block_on(rx.collect()).unwrap(); 123 for (index, res) in result.into_iter().enumerate() { 124 println!("Channel #{} result: {}", index, res); 125 } 126 127 t.join().unwrap(); 128 }
- Dealing with errors and channels can be done as follows:
130 fn channel_error() { 131 let (mut tx, rx) = mpsc::channel(0); 132 133 tx.try_send("hola").unwrap(); 134 135 // This should fail 136 match tx.try_send("fail") { 137 Ok(_) => println!("This should not have been successful"), 138 Err(err) => println!("Send failed! {:?}", err), 139 } 140 141 let (result, rx) = block_on(rx.next()).ok().unwrap(); 142 println!("The result of the channel transaction is: {}", 143 result.unwrap()); 144 145 // Now we should be able send to the transaction since we
poll'ed a result already 146 tx.try_send("hasta la vista").unwrap(); 147 drop(tx); 148 149 let (result, rx) = block_on(rx.next()).ok().unwrap(); 150 println!("The next result of the channel transaction is: {}", 151 result.unwrap()); 152 153 // Pulling more should result in None 154 let (result, _) = block_on(rx.next()).ok().unwrap(); 155 println!("The last result of the channel transaction is:
{:?}", 156 result); 157 }
- We can even work with buffers and channels together. Let's add our channel_buffer function:
159 fn channel_buffer() { 160 let (mut tx, mut rx) = mpsc::channel::(0); 161 162 let f = poll_fn(move |cx| { 163 if !tx.poll_ready(cx).unwrap().is_ready() { 164 panic!("transactions should be ready right away!"); 165 } 166 167 tx.start_send(20).unwrap(); 168 if tx.poll_ready(cx).unwrap().is_pending() { 169 println!("transaction is pending..."); 170 } 171 172 // When we're still in "Pending mode" we should not be able 173 // to send more messages/values to the receiver 174 if tx.start_send(10).unwrap_err().is_full() { 175 println!("transaction could not have been sent to the
receiver due 176 to being full..."); 177 } 178 179 let result = rx.poll_next(cx).unwrap(); 180 println!("the first result is: {:?}", result); 181 println!("is transaction ready? {:?}", 182 tx.poll_ready(cx).unwrap().is_ready()); 183 184 // We should now be able to send another message
since we've pulled 185 // the first message into a result/value/variable. 186 if !tx.poll_ready(cx).unwrap().is_ready() { 187 panic!("transaction should be ready!"); 188 } 189 190 tx.start_send(22).unwrap(); 191 let result = rx.poll_next(cx).unwrap(); 192 println!("new result for transaction is: {:?}", result); 193 194 FINISHED 195 }); 196 197 block_on(f).unwrap(); 198 }
- Just because we're using the futures crate doesn't mean everything has to be concurrent. Add the following example to demonstrate how to block with channels:
200 fn channel_threads_blocking() { 201 let (tx, rx) = mpsc::channel::(0); 202 let (tx_2, rx_2) = mpsc::channel::<()>(2); 203 204 let t = thread::spawn(move || { 205 let tx_2 = tx_2.sink_map_err(|_| panic!()); 206 let (a, b) =
block_on(tx.send(10).join(tx_2.send(()))).unwrap(); 207 208 block_on(a.send(30).join(b.send(()))).unwrap(); 209 }); 210 211 let (_, rx_2) = block_on(rx_2.next()).ok().unwrap(); 212 let (result, rx) = block_on(rx.next()).ok().unwrap(); 213 println!("The first number that we sent was: {}",
result.unwrap()); 214 215 drop(block_on(rx_2.next()).ok().unwrap()); 216 let (result, _) = block_on(rx.next()).ok().unwrap(); 217 println!("The second number that we sent was: {}",
result.unwrap()); 218 219 t.join().unwrap(); 220 }
- Sometimes we'll need concepts such as unbounded channels; let's add our channel_unbounded function:
222 fn channel_unbounded() { 223 const MAX_SENDS: u32 = 5; 224 const MAX_THREADS: u32 = 4; 225 let (tx, rx) = mpsc::unbounded::(); 226 227 let t = thread::spawn(move || { 228 let result: Vec<_> = block_on(rx.collect()).unwrap(); 229 for item in result.iter() { 230 println!("channel_unbounded: results on rx: {:?}", item); 231 } 232 }); 233 234 for _ in 0..MAX_THREADS { 235 let tx = tx.clone(); 236 237 thread::spawn(move || { 238 for _ in 0..MAX_SENDS { 239 tx.unbounded_send(1).unwrap(); 240 } 241 }); 242 } 243 244 drop(tx); 245 246 t.join().ok().unwrap(); 247 }
- And now we can add our main function:
249 fn main() { 250 println!("quick_streams():"); 251 quick_streams(); 252 253 println!(" iterate_streams():"); 254 iterate_streams(); 255 256 println!(" channel_threads():"); 257 channel_threads(); 258 259 println!(" channel_error():"); 260 channel_error(); 261 262 println!(" channel_buffer():"); 263 channel_buffer(); 264 265 println!(" channel_threads_blocking():"); 266 channel_threads_blocking(); 267 268 println!(" channel_unbounded():"); 269 channel_unbounded(); 270 }