How to do it...

  1. Inside the bin folder, create a new file called combinators.rs.

 

  1. Add the following code and run it with cargo run --bin combinators:
1   extern crate futures;
2   extern crate futures_util;
3   
4   use futures::prelude::*;
5   use futures::channel::{mpsc, oneshot};
6   use futures::executor::block_on;
7   use futures::future::{ok, err, join_all, select_all, poll_fn};
8   use futures::stream::iter_result;
9   use futures_util::stream::select_all as select_all_stream;
10  
11  use std::thread;
12  
13  const FINISHED: Result<Async<()>, Never> = Ok(Async::Ready(()));
  1. Let's add our join_all example function:
15  fn join_all_example() {
16    let future1 = Ok::<_, ()>(vec![1, 2, 3]);
17    let future2 = Ok(vec![10, 20, 30]);
18    let future3 = Ok(vec![100, 200, 300]);
19  
20    let results = block_on(join_all(vec![future1, future2,  
future3])).unwrap(); 21 println!("Results of joining 3 futures: {:?}", results); 22 23 // For parameters with a lifetime 24 fn sum_vecs<'a>(vecs: Vec<&'a [i32]>) -> Box<Future, Error =
()> + 'static> { 25 Box::new(join_all(vecs.into_iter().map(|x| Ok::<i32, ()>
(x.iter().sum())))) 26 } 27 28 let sum_results = block_on(sum_vecs(vec![&[1, 3, 5], &[6, 7,
8], &[0]])).unwrap(); 29 println!("sum_results: {:?}", sum_results); 30 } 31

Next, we will write out our shared function:

32  fn shared() {
33    let thread_number = 2;
34    let (tx, rx) = oneshot::channel::();
35    let f = rx.shared();
36    let threads = (0..thread_number)
37      .map(|thread_index| {
38        let cloned_f = f.clone();
39        thread::spawn(move || {
40          let value = block_on(cloned_f).unwrap();
41          println!("Thread #{}: {:?}", thread_index, *value);
42        })
43      })
44      .collect::<Vec<_>>();
45    tx.send(42).unwrap();
46  
47    let shared_return = block_on(f).unwrap();
48    println!("shared_return: {:?}", shared_return);
49  
50    for f in threads {
51      f.join().unwrap();
52    }
53  }

And now for our select_all example:

55  fn select_all_example() {
56    let vec = vec![ok(3), err(24), ok(7), ok(9)];
57  
58    let (value, _, vec) = block_on(select_all(vec)).unwrap();
59    println!("Value of vec: = {}", value);
60  
61    let (value, _, vec) = 
block_on(select_all(vec)).err().unwrap(); 62 println!("Value of vec: = {}", value); 63 64 let (value, _, vec) = block_on(select_all(vec)).unwrap(); 65 println!("Value of vec: = {}", value); 66 67 let (value, _, _) = block_on(select_all(vec)).unwrap(); 68 println!("Value of vec: = {}", value); 69 70 let (tx_1, rx_1) = mpsc::unbounded::(); 71 let (tx_2, rx_2) = mpsc::unbounded::(); 72 let (tx_3, rx_3) = mpsc::unbounded::(); 73 74 let streams = vec![rx_1, rx_2, rx_3]; 75 let stream = select_all_stream(streams); 76 77 tx_1.unbounded_send(3).unwrap(); 78 tx_2.unbounded_send(6).unwrap(); 79 tx_3.unbounded_send(9).unwrap(); 80 81 let (value, details) = block_on(stream.next()).unwrap(); 82 83 println!("value for select_all on streams: {:?}", value); 84 println!("stream details: {:?}", details); 85 }

Now we can add our flatten, fuse, and inspect functions:

87  fn flatten() {
88    let f = ok::<_, _>(ok::<u32, Never>(100));
89    let f = f.flatten();
90    let results = block_on(f).unwrap();
91    println!("results: {}", results);
92  }
93  
94  fn fuse() {
95    let mut f = ok::<u32, Never>(123).fuse();
96  
97    block_on(poll_fn(move |mut cx| {
98        let first_result = f.poll(&mut cx);
99        let second_result = f.poll(&mut cx);
100       let third_result = f.poll(&mut cx);
101 
102       println!("first result: {:?}", first_result);
103       println!("second result: {:?}", second_result);
104       println!("third result: {:?}", third_result);
105 
106       FINISHED
107     }))
108     .unwrap();
109 }
110 
111 fn inspect() {
112   let f = ok::<u32, Never>(111);
113   let f = f.inspect(|&val| println!("inspecting: {}", val));
114   let results = block_on(f).unwrap();
115   println!("results: {}", results);
116 }

Then we can add our chaining example:

118 fn chaining() {
119   let (tx, rx) = mpsc::channel(3);
120   let f = tx.send(1)
121     .and_then(|tx| tx.send(2))
122     .and_then(|tx| tx.send(3));
123 
124   let t = thread::spawn(move || {
125     block_on(f.into_future()).unwrap();
126   });
127 
128   t.join().unwrap();
129 
130   let result: Vec<_> = block_on(rx.collect()).unwrap();
131   println!("Result from chaining and_then: {:?}", result);
132 
133   // Chaining streams together
134   let stream1 = iter_result(vec![Ok(10), Err(false)]);
135   let stream2 = iter_result(vec![Err(true), Ok(20)]);
136 
137   let stream = stream1.chain(stream2)
138     .then(|result| Ok::<_, ()>(result));
139 
140   let result: Vec<_> = block_on(stream.collect()).unwrap();
141   println!("Result from chaining our streams together: {:?}",   
result); 142 }

And now for our main function:

144 fn main() {
145   println!("join_all_example():");
146   join_all_example();
147 
148   println!("
shared():");
149   shared();
150 
151   println!("
select_all_example():");
152   select_all_example();
153 
154   println!("
flatten():");
155   flatten();
156 
157   println!("
fuse():");
158   fuse();
159 
160   println!("
inspect():");
161   inspect();
162 
163   println!("
chaining():");
164   chaining();
165 }
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.138.123.106