How to do it...

  1. Inside the bin folder, create a new file called sinks.rs.
  2. Add the following code and run it with cargo run --bin sinks:
1   extern crate futures;
2   
3   use futures::prelude::*;
4   use futures::future::poll_fn;
5   use futures::executor::block_on;
6   use futures::sink::flush;
7   use futures::stream::iter_ok;
8   use futures::task::{Waker, Context};
9   
10  use std::mem;
  3. Let's add our examples that use vectors as sinks:
12  fn vector_sinks() {
13    let mut vector = Vec::new();
14    let result = vector.start_send(0);
15    let result2 = vector.start_send(7);
16  
17    println!("vector_sink: results of sending should both be 
Ok(()): {:?} and {:?}", 18 result, 19 result2); 20 println!("The entire vector is now {:?}", vector); 21 22 // Now we need to flush our vector sink. 23 let flush = flush(vector); 24 println!("Our flush value: {:?}", flush); 25 println!("Our vector value: {:?}",
flush.into_inner().unwrap()); 26 27 let vector = Vec::new(); 28 let mut result = vector.send(2); 29 // safe to unwrap since we know that we have not flushed the
sink yet 30 let result = result.get_mut().unwrap().send(4); 31 32 println!("Result of send(): {:?}", result); 33 println!("Our vector after send(): {:?}",
result.get_ref().unwrap()); 34 35 let vector = block_on(result).unwrap(); 36 println!("Our vector should already have one element: {:?}",
vector); 37 38 let result = block_on(vector.send(2)).unwrap(); 39 println!("We can still send to our stick to ammend values:
{:?}", 40 result); 41 42 let vector = Vec::new(); 43 let send_all = vector.send_all(iter_ok(vec![1, 2, 3])); 44 println!("The value of vector's send_all: {:?}", send_all); 45 46 // Add some more elements to our vector... 47 let (vector, _) = block_on(send_all).unwrap(); 48 let (result, _) = block_on(vector.send_all(iter_ok(vec![0, 6,
7]))).unwrap(); 49 println!("send_all's return value: {:?}", result); 50 }

We can map/transform our sinks values. Let's add our mapping_sinks example:

52  fn mapping_sinks() {
53    let sink = Vec::new().with(|elem: i32| Ok::<i32, Never>(elem 
* elem)); 54 55 let sink = block_on(sink.send(0)).unwrap(); 56 let sink = block_on(sink.send(3)).unwrap(); 57 let sink = block_on(sink.send(5)).unwrap(); 58 println!("sink with() value: {:?}", sink.into_inner()); 59 60 let sink = Vec::new().with_flat_map(|elem| iter_ok(vec![elem;
elem].into_iter().map(|y| y * y))); 61 62 let sink = block_on(sink.send(0)).unwrap(); 63 let sink = block_on(sink.send(3)).unwrap(); 64 let sink = block_on(sink.send(5)).unwrap(); 65 let sink = block_on(sink.send(7)).unwrap(); 66 println!("sink with_flat_map() value: {:?}",
sink.into_inner()); 67 }

We can even send messages to multiple sinks. Let's add our fanout function:

69  fn fanout() {
70    let sink1 = vec![];
71    let sink2 = vec![];
72    let sink = sink1.fanout(sink2);
73    let stream = iter_ok(vec![1, 2, 3]);
74    let (sink, _) = block_on(sink.send_all(stream)).unwrap();
75    let (sink1, sink2) = sink.into_inner();
76  
77    println!("sink1 values: {:?}", sink1);
78    println!("sink2 values: {:?}", sink2);
79  }

Next, we'll want to implement a structure for a customized sink. Sometimes our application will require us to manually flush our sinks instead of doing it automatically. Let's add our ManualSink structure:

81  #[derive(Debug)]
82  struct ManualSink {
83    data: Vec,
84    waiting_tasks: Vec,
85  }
86  
87  impl Sink for ManualSink {
88    type SinkItem = Option; // Pass None to flush
89    type SinkError = ();
90  
91    fn start_send(&mut self, op: Option) -> Result<(), 
Self::SinkError> { 92 if let Some(item) = op { 93 self.data.push(item); 94 } else { 95 self.force_flush(); 96 } 97 98 Ok(()) 99 } 100 101 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<(), ()> { 102 Ok(Async::Ready(())) 103 } 104 105 fn poll_flush(&mut self, cx: &mut Context) -> Poll<(), ()> { 106 if self.data.is_empty() { 107 Ok(Async::Ready(())) 108 } else { 109 self.waiting_tasks.push(cx.waker().clone()); 110 Ok(Async::Pending) 111 } 112 } 113 114 fn poll_close(&mut self, _cx: &mut Context) -> Poll<(), ()> { 115 Ok(().into()) 116 } 117 } 118 119 impl ManualSink { 120 fn new() -> ManualSink { 121 ManualSink { 122 data: Vec::new(), 123 waiting_tasks: Vec::new(), 124 } 125 } 126 127 fn force_flush(&mut self) -> Vec { 128 for task in self.waiting_tasks.clone() { 129 println!("Executing a task before replacing our values"); 130 task.wake(); 131 } 132 133 mem::replace(&mut self.data, vec![]) 134 } 135 }

And now for our manual flush function:

137 fn manual_flush() {
138   let mut sink = ManualSink::new().with(|x| Ok::<Option, ()> 
(x)); 139 let _ = sink.get_mut().start_send(Some(3)); 140 let _ = sink.get_mut().start_send(Some(7)); 141 142 let f = poll_fn(move |cx| -> Poll<Option<_>, Never> { 143 // Try to flush our ManualSink 144 let _ = sink.get_mut().poll_flush(cx); 145 let _ = flush(sink.get_mut()); 146 147 println!("Our sink after trying to flush: {:?}",
sink.get_ref()); 148 149 let results = sink.get_mut().force_flush(); 150 println!("Sink data after manually flushing: {:?}", 151 sink.get_ref().data); 152 println!("Final results of sink: {:?}", results); 153 154 Ok(Async::Ready(Some(()))) 155 }); 156 157 block_on(f).unwrap(); 158 }

And lastly, we can add our main function:

160 fn main() {
161   println!("vector_sinks():");
162   vector_sinks();
163 
164   println!("
mapping_sinks():");
165   mapping_sinks();
166 
167   println!("
fanout():");
168   fanout();
169 
170   println!("
manual_flush():");
171   manual_flush();
172 }
..................Content has been hidden....................

You can't read the whole page of this ebook; please click here to log in and view the full page.
Reset
3.147.85.221