- Inside the bin folder, create a new file called sinks.rs.
- Add the following code and run it with cargo run --bin sinks:
extern crate futures;

use futures::prelude::*;
use futures::future::poll_fn;
use futures::executor::block_on;
use futures::sink::flush;
use futures::stream::iter_ok;
use futures::task::{Waker, Context};

use std::mem;
- Let's add our examples of using vectors as sinks:
12 fn vector_sinks() { 13 let mut vector = Vec::new(); 14 let result = vector.start_send(0); 15 let result2 = vector.start_send(7); 16 17 println!("vector_sink: results of sending should both be
Ok(()): {:?} and {:?}", 18 result, 19 result2); 20 println!("The entire vector is now {:?}", vector); 21 22 // Now we need to flush our vector sink. 23 let flush = flush(vector); 24 println!("Our flush value: {:?}", flush); 25 println!("Our vector value: {:?}",
flush.into_inner().unwrap()); 26 27 let vector = Vec::new(); 28 let mut result = vector.send(2); 29 // safe to unwrap since we know that we have not flushed the
sink yet 30 let result = result.get_mut().unwrap().send(4); 31 32 println!("Result of send(): {:?}", result); 33 println!("Our vector after send(): {:?}",
result.get_ref().unwrap()); 34 35 let vector = block_on(result).unwrap(); 36 println!("Our vector should already have one element: {:?}",
vector); 37 38 let result = block_on(vector.send(2)).unwrap(); 39 println!("We can still send to our stick to ammend values:
{:?}", 40 result); 41 42 let vector = Vec::new(); 43 let send_all = vector.send_all(iter_ok(vec![1, 2, 3])); 44 println!("The value of vector's send_all: {:?}", send_all); 45 46 // Add some more elements to our vector... 47 let (vector, _) = block_on(send_all).unwrap(); 48 let (result, _) = block_on(vector.send_all(iter_ok(vec![0, 6,
7]))).unwrap(); 49 println!("send_all's return value: {:?}", result); 50 }
We can map/transform our sink's values. Let's add our mapping_sinks example:
52 fn mapping_sinks() { 53 let sink = Vec::new().with(|elem: i32| Ok::<i32, Never>(elem
* elem)); 54 55 let sink = block_on(sink.send(0)).unwrap(); 56 let sink = block_on(sink.send(3)).unwrap(); 57 let sink = block_on(sink.send(5)).unwrap(); 58 println!("sink with() value: {:?}", sink.into_inner()); 59 60 let sink = Vec::new().with_flat_map(|elem| iter_ok(vec![elem;
elem].into_iter().map(|y| y * y))); 61 62 let sink = block_on(sink.send(0)).unwrap(); 63 let sink = block_on(sink.send(3)).unwrap(); 64 let sink = block_on(sink.send(5)).unwrap(); 65 let sink = block_on(sink.send(7)).unwrap(); 66 println!("sink with_flat_map() value: {:?}",
sink.into_inner()); 67 }
We can even send messages to multiple sinks. Let's add our fanout function:
69 fn fanout() { 70 let sink1 = vec![]; 71 let sink2 = vec![]; 72 let sink = sink1.fanout(sink2); 73 let stream = iter_ok(vec![1, 2, 3]); 74 let (sink, _) = block_on(sink.send_all(stream)).unwrap(); 75 let (sink1, sink2) = sink.into_inner(); 76 77 println!("sink1 values: {:?}", sink1); 78 println!("sink2 values: {:?}", sink2); 79 }
Next, we'll want to implement a structure for a customized sink. Sometimes our application will require us to manually flush our sinks instead of doing it automatically. Let's add our ManualSink structure:
81 #[derive(Debug)] 82 struct ManualSink { 83 data: Vec, 84 waiting_tasks: Vec, 85 } 86 87 impl Sink for ManualSink { 88 type SinkItem = Option; // Pass None to flush 89 type SinkError = (); 90 91 fn start_send(&mut self, op: Option) -> Result<(),
Self::SinkError> { 92 if let Some(item) = op { 93 self.data.push(item); 94 } else { 95 self.force_flush(); 96 } 97 98 Ok(()) 99 } 100 101 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<(), ()> { 102 Ok(Async::Ready(())) 103 } 104 105 fn poll_flush(&mut self, cx: &mut Context) -> Poll<(), ()> { 106 if self.data.is_empty() { 107 Ok(Async::Ready(())) 108 } else { 109 self.waiting_tasks.push(cx.waker().clone()); 110 Ok(Async::Pending) 111 } 112 } 113 114 fn poll_close(&mut self, _cx: &mut Context) -> Poll<(), ()> { 115 Ok(().into()) 116 } 117 } 118 119 impl ManualSink { 120 fn new() -> ManualSink { 121 ManualSink { 122 data: Vec::new(), 123 waiting_tasks: Vec::new(), 124 } 125 } 126 127 fn force_flush(&mut self) -> Vec { 128 for task in self.waiting_tasks.clone() { 129 println!("Executing a task before replacing our values"); 130 task.wake(); 131 } 132 133 mem::replace(&mut self.data, vec![]) 134 } 135 }
And now for our manual flush function:
137 fn manual_flush() { 138 let mut sink = ManualSink::new().with(|x| Ok::<Option, ()>
(x)); 139 let _ = sink.get_mut().start_send(Some(3)); 140 let _ = sink.get_mut().start_send(Some(7)); 141 142 let f = poll_fn(move |cx| -> Poll<Option<_>, Never> { 143 // Try to flush our ManualSink 144 let _ = sink.get_mut().poll_flush(cx); 145 let _ = flush(sink.get_mut()); 146 147 println!("Our sink after trying to flush: {:?}",
sink.get_ref()); 148 149 let results = sink.get_mut().force_flush(); 150 println!("Sink data after manually flushing: {:?}", 151 sink.get_ref().data); 152 println!("Final results of sink: {:?}", results); 153 154 Ok(Async::Ready(Some(()))) 155 }); 156 157 block_on(f).unwrap(); 158 }
And lastly, we can add our main function:
160 fn main() { 161 println!("vector_sinks():"); 162 vector_sinks(); 163 164 println!(" mapping_sinks():"); 165 mapping_sinks(); 166 167 println!(" fanout():"); 168 fanout(); 169 170 println!(" manual_flush():"); 171 manual_flush(); 172 }