How it works...

First, let's take a look at the futures::Sink trait itself:

pub trait Sink {
type SinkItem;
type SinkError;

fn poll_ready(
&mut self,
cx: &mut Context
) -> Result<Async<()>, Self::SinkError>;
fn start_send(
&mut self,
item: Self::SinkItem
) -> Result<(), Self::SinkError>;
fn poll_flush(
&mut self,
cx: &mut Context
) -> Result<Async<()>, Self::SinkError>;
fn poll_close(
&mut self,
cx: &mut Context
) -> Result<Async<()>, Self::SinkError>;
}

We are already familiar with the Item and Error concepts from futures and streams, so we will move on to the required functions:

  • poll_ready must be invoked with the returning value of Ok(futures::Async::Ready(())) before each attempt at using start_send. If the sink receives an error, the sink will no longer be able to receive items.
  • start_send, as stated previously, prepares the message to be delivered, but won't until we flush or close the sink. If the sink uses buffers, the Sink::SinkItem won't be processed until the buffer has been fully completed.
  • poll_flush will flush the sink, which will allow us to collect items that are currently being processed. futures::Async::Ready will return if the sink does not have any more items within the buffer, otherwise, the sink will return futures::Async::Pending.
  • poll_close will flush and close the sink, following the same return rules as poll_flush.

Now, onto our vector_sinks() function:

  • Sinks are implemented for Vec<T> types, so we can declare a mutable vector and use the start_send() function, which will immediately poll our values into the vector on lines 13 through 15.
  • On line 28 we use the futures::SinkExt::send(self, item: Self::SinkItem), which will complete after the item has been processed and flushed through the sink. futures::SinkExt::send_all is recommended for batching multiple items to send through, versus having to manually flush between each send call (as demonstrated on line 43).

Our mapping_sinks() function:

  • Line 51 demonstrates how you can map/manipulate elements within a sink using the futures::SinkExt::with function. This function produces a new sink that iterates through each item and sends the final value as a future to the parent sink.
  • Line 60 illustrates the futures::SinkExt::flat_with_map function that has mostly the same functionality as the futures::SinkExt::with function except each iterated item is sent as a stream value to the parent sink and will return an Iterator::flat_map value instead of a Iterator::map

Next, the fanout() function:

  • The futures::SinkExt::fanout function allows us to send messages to multiple sinks at one time, as we have done on line 72.

And then manual_flush():

  • We first implement our own Sink trait with the ManualSink<T> construct (lines 81 through 135). Our ManualSink's poll_flush method will only return Async::Ready() if our data vector is empty, otherwise, we are going to push the task (futures::task::Waker) into a queue line through the waiting_tasks attribute. We use the waiting_tasks attribute within our force_flush() function (line 128) in order to manually wake up our tasks (line 130).
  • On lines 138 through 140, we build our ManualSink<Option<i32>> and start sending some values.
  • We use poll_fn on line 142 in order to quickly build a futures::task::Context so that we may pass this value down to our underlying poll calls.
  • On line 144 we manually call our poll_flush() function, which will not execute our actual tasks since they are placed within the waiting_tasks attribute. 
  • Until we invoke force_flush(), our sink will not return any values (as indicated on lines 150-151). Once this function has been called upon and the underlying Waker tasks have finished executing, then we can see the messages (line 152) that we sent earlier (lines 139 and 140).
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.16.135.225