- Inside the bin folder, create a new file called combinators.rs.
- Add the following code and run it with cargo run --bin combinators:
1 extern crate futures; 2 extern crate futures_util; 3 4 use futures::prelude::*; 5 use futures::channel::{mpsc, oneshot}; 6 use futures::executor::block_on; 7 use futures::future::{ok, err, join_all, select_all, poll_fn}; 8 use futures::stream::iter_result; 9 use futures_util::stream::select_all as select_all_stream; 10 11 use std::thread; 12 13 const FINISHED: Result<Async<()>, Never> = Ok(Async::Ready(()));
- Let's add our join_all example function:
15 fn join_all_example() { 16 let future1 = Ok::<_, ()>(vec![1, 2, 3]); 17 let future2 = Ok(vec![10, 20, 30]); 18 let future3 = Ok(vec![100, 200, 300]); 19 20 let results = block_on(join_all(vec![future1, future2,
future3])).unwrap(); 21 println!("Results of joining 3 futures: {:?}", results); 22 23 // For parameters with a lifetime 24 fn sum_vecs<'a>(vecs: Vec<&'a [i32]>) -> Box<Future, Error =
()> + 'static> { 25 Box::new(join_all(vecs.into_iter().map(|x| Ok::<i32, ()>
(x.iter().sum())))) 26 } 27 28 let sum_results = block_on(sum_vecs(vec![&[1, 3, 5], &[6, 7,
8], &[0]])).unwrap(); 29 println!("sum_results: {:?}", sum_results); 30 } 31
Next, we will write out our shared function:
32 fn shared() { 33 let thread_number = 2; 34 let (tx, rx) = oneshot::channel::(); 35 let f = rx.shared(); 36 let threads = (0..thread_number) 37 .map(|thread_index| { 38 let cloned_f = f.clone(); 39 thread::spawn(move || { 40 let value = block_on(cloned_f).unwrap(); 41 println!("Thread #{}: {:?}", thread_index, *value); 42 }) 43 }) 44 .collect::<Vec<_>>(); 45 tx.send(42).unwrap(); 46 47 let shared_return = block_on(f).unwrap(); 48 println!("shared_return: {:?}", shared_return); 49 50 for f in threads { 51 f.join().unwrap(); 52 } 53 }
And now for our select_all example:
55 fn select_all_example() { 56 let vec = vec![ok(3), err(24), ok(7), ok(9)]; 57 58 let (value, _, vec) = block_on(select_all(vec)).unwrap(); 59 println!("Value of vec: = {}", value); 60 61 let (value, _, vec) =
block_on(select_all(vec)).err().unwrap(); 62 println!("Value of vec: = {}", value); 63 64 let (value, _, vec) = block_on(select_all(vec)).unwrap(); 65 println!("Value of vec: = {}", value); 66 67 let (value, _, _) = block_on(select_all(vec)).unwrap(); 68 println!("Value of vec: = {}", value); 69 70 let (tx_1, rx_1) = mpsc::unbounded::(); 71 let (tx_2, rx_2) = mpsc::unbounded::(); 72 let (tx_3, rx_3) = mpsc::unbounded::(); 73 74 let streams = vec![rx_1, rx_2, rx_3]; 75 let stream = select_all_stream(streams); 76 77 tx_1.unbounded_send(3).unwrap(); 78 tx_2.unbounded_send(6).unwrap(); 79 tx_3.unbounded_send(9).unwrap(); 80 81 let (value, details) = block_on(stream.next()).unwrap(); 82 83 println!("value for select_all on streams: {:?}", value); 84 println!("stream details: {:?}", details); 85 }
Now we can add our flatten, fuse, and inspect functions:
87 fn flatten() { 88 let f = ok::<_, _>(ok::<u32, Never>(100)); 89 let f = f.flatten(); 90 let results = block_on(f).unwrap(); 91 println!("results: {}", results); 92 } 93 94 fn fuse() { 95 let mut f = ok::<u32, Never>(123).fuse(); 96 97 block_on(poll_fn(move |mut cx| { 98 let first_result = f.poll(&mut cx); 99 let second_result = f.poll(&mut cx); 100 let third_result = f.poll(&mut cx); 101 102 println!("first result: {:?}", first_result); 103 println!("second result: {:?}", second_result); 104 println!("third result: {:?}", third_result); 105 106 FINISHED 107 })) 108 .unwrap(); 109 } 110 111 fn inspect() { 112 let f = ok::<u32, Never>(111); 113 let f = f.inspect(|&val| println!("inspecting: {}", val)); 114 let results = block_on(f).unwrap(); 115 println!("results: {}", results); 116 }
Then we can add our chaining example:
118 fn chaining() { 119 let (tx, rx) = mpsc::channel(3); 120 let f = tx.send(1) 121 .and_then(|tx| tx.send(2)) 122 .and_then(|tx| tx.send(3)); 123 124 let t = thread::spawn(move || { 125 block_on(f.into_future()).unwrap(); 126 }); 127 128 t.join().unwrap(); 129 130 let result: Vec<_> = block_on(rx.collect()).unwrap(); 131 println!("Result from chaining and_then: {:?}", result); 132 133 // Chaining streams together 134 let stream1 = iter_result(vec![Ok(10), Err(false)]); 135 let stream2 = iter_result(vec![Err(true), Ok(20)]); 136 137 let stream = stream1.chain(stream2) 138 .then(|result| Ok::<_, ()>(result)); 139 140 let result: Vec<_> = block_on(stream.collect()).unwrap(); 141 println!("Result from chaining our streams together: {:?}",
result); 142 }
And now for our main function:
144 fn main() { 145 println!("join_all_example():"); 146 join_all_example(); 147 148 println!(" shared():"); 149 shared(); 150 151 println!(" select_all_example():"); 152 select_all_example(); 153 154 println!(" flatten():"); 155 flatten(); 156 157 println!(" fuse():"); 158 fuse(); 159 160 println!(" inspect():"); 161 inspect(); 162 163 println!(" chaining():"); 164 chaining(); 165 }