You can see the hierarchy of the stream framework classes in the following diagram:
The Stream class is abstract and provides a generic view of a sequence of asynchronous events. It implements numerous methods to help you manage or republish streams in your application. A stream delivers its events in sequence: each data event carries a value to be delivered, a failure is reported as an error event, and when all the events have been sent, the stream generates a done event.
The Stream class has the following methods that help you validate data returned from a stream:
any: This method checks whether the test callback function accepts any element provided by this stream
every: This method checks whether the test callback function accepts all the elements provided by this stream
contains: This method checks whether the needle object occurs in the elements provided by this stream
The Stream class provides the following methods to create a new stream that contains a subset of data from the original one:
where: This method creates a new stream with only the data events that satisfy the test callback function
skip: This method creates a new stream with the data events that remain after skipping the first count data events of the original stream
skipWhile: This method creates a new stream that skips data events from the original stream while they are matched by the test callback function, and then forwards all the remaining events
take: This method creates a new stream that provides at most the first count data events of this stream
takeWhile: This method creates a new stream that forwards data events from the original stream while the test callback function is successful, and stops at the first event that fails the test
You will find other methods of the Stream class on the official help page at https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart-async.Stream.
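The validation and subset methods listed above can be tried out with a short sketch. It assumes a current Dart SDK (2.12 or later, so new is omitted and async/await is available); the numbers helper is a hypothetical name introduced only for this illustration, and it returns a fresh stream on every call because a single-subscription stream can be consumed only once.

```dart
import 'dart:async';

/// Hypothetical helper: a fresh single-subscription stream of 1..5.
Stream<int> numbers() => Stream<int>.fromIterable([1, 2, 3, 4, 5]);

Future<void> main() async {
  // Validation methods complete with a bool.
  print(await numbers().any((n) => n > 4)); // true
  print(await numbers().every((n) => n > 4)); // false
  print(await numbers().contains(3)); // true

  // Subset methods return a new stream; toList() collects its data events.
  print(await numbers().where((n) => n.isOdd).toList()); // [1, 3, 5]
  print(await numbers().skip(2).toList()); // [3, 4, 5]
  print(await numbers().skipWhile((n) => n < 3).toList()); // [3, 4, 5]
  print(await numbers().take(2).toList()); // [1, 2]
  print(await numbers().takeWhile((n) => n < 3).toList()); // [1, 2]
}
```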
The Stream
class has several factory constructors to create single-subscription or broadcast streams. Let's discuss them in detail.
The first factory constructor could confuse you because it creates a new single-subscription stream from a Future instance. It has the following signature:
factory Stream.fromFuture(Future<T> future)
The confusion clears up quickly once you recall that the Future class converts itself into a stream via its own asStream method, as shown in the following code:
… Stream<T> asStream() => new Stream.fromFuture(this); …
The resulting stream fires a data or error event when the Future instance completes, and then closes itself with the done event. The following code snippet shows how this factory constructor can be used:
import 'dart:async';

main() {
  // single sample data
  var data = new Future<num>.delayed(
      const Duration(milliseconds: 500), () {
    // Return single value
    return 2;
  });
  // create the stream
  Stream<num> stream = new Stream<num>.fromFuture(data);
  // Start listening
  var subscriber = stream.listen((data) {
    print(data);
  }, onError: (error) {
    print(error);
  }, onDone: () {
    print('done');
  });
}

The execution of the preceding code prints the following result on the console:
2
done
It's only natural to have a factory constructor, Stream.fromIterable, that creates a single-subscription stream from any Iterable instance. Iteration over the source happens only while the stream has a listener, and it can be interrupted if the listener cancels the subscription. Any error that occurs in the iteration process immediately ends the stream and is reported as an error event. In this case, the done event doesn't fire because the iteration was not complete.
In the following code snippet, we create an instance of the Iterable class with a factory constructor that generates a sequence of numbers. We intentionally throw an Exception at the fourth element (index 3):
import 'dart:async';

main() {
  // some sample generated data
  var data = new Iterable<num>.generate(5, (int indx) {
    if (indx < 3) {
      return indx;
    } else {
      throw new Exception('Wrong data');
    }
  });
  // create the stream
  Stream<num> stream = new Stream<num>.fromIterable(data);
  // Start listening
  var subscriber = stream.listen((data) {
    print(data);
  }, onError: (error) {
    print(error);
  }, onDone: () {
    print('done');
  });
}

The result of the preceding code is as follows:
0
1
2
Exception: Wrong data
In the previous topic, we used the Iterable.generate factory constructor to emit numeric data into our stream. However, we can do that just as well via the Stream.periodic factory constructor, as follows:
factory Stream.periodic(Duration period, [T computation(int computationCount)])
This gives us a more natural and less verbose way to generate stream data, as follows:

import 'dart:async';

main() {
  // some sample generated data
  Stream<num> stream = new Stream.periodic(
      const Duration(milliseconds: 500), (int count) {
    // Return count
    return count;
  });
  // Start listening
  var subscriber = stream.listen((data) {
    print(data);
  }, onError: (error) {
    print(error);
  }, onDone: () {
    print('done');
  });
}

The first parameter is the duration that gives you the interval between emitted events. The second parameter is the callback function that computes the event values. This function has a single parameter that holds the sequential number of the iteration, starting at zero.
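A periodic stream never ends on its own, so in practice it is usually bounded. The following is a minimal sketch of one way to do that, assuming a current Dart SDK (2.12 or later); firstSquares is a hypothetical helper name, and the 10 millisecond period is an arbitrary value chosen to keep the example fast.

```dart
import 'dart:async';

/// Hypothetical helper: collects the first [n] values of a periodic stream
/// whose computation callback squares the iteration counter.
Future<List<int>> firstSquares(int n) {
  return Stream<int>.periodic(
    const Duration(milliseconds: 10),
    (count) => count * count, // count starts at 0
  )
      .take(n) // closes the stream after n data events
      .toList();
}

Future<void> main() async {
  print(await firstSquares(4)); // [0, 1, 4, 9]
}
```

Because take cancels the underlying subscription after the requested number of events, the timer behind the periodic stream stops and the program can exit.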
The Stream.eventTransformed factory constructor is quite interesting because it creates a new Stream from an existing one with the help of a sink transformation, as shown in the following code:
factory Stream.eventTransformed(Stream source, EventSink mapSink(EventSink<T> sink))
The first parameter is the source stream that provides events to the new one. The mapSink callback function is called when the new stream is listened to. All the events from the existing stream pass through the sink to reach the new stream. This constructor is widely used to create stream transformers. In the following code, we will create a DoublingSink class. It accepts the output sink as an argument of the constructor. We will implement the number-doubling algorithm inside the add method. The other two methods, addError and close, are simple and pass the incoming parameter values to the underlying sink, as shown in the following code:
import 'dart:async';

/**
 * An interface that abstracts creation or handling of
 * Stream events.
 */
class DoublingSink implements EventSink<num> {
  final EventSink<num> _output;

  DoublingSink(this._output);

  /** Send a data event to a stream. */
  void add(num event) {
    _output.add(event * 2);
  }

  /** Send an async error to a stream. */
  void addError(errorEvent, [StackTrace stackTrace]) =>
      _output.addError(errorEvent, stackTrace);

  /** Send a done event to a stream. */
  void close() => _output.close();
}
The DoublingTransformer class implements StreamTransformer for numbers. In the bind method, which is compulsory, we will create a new stream via the eventTransformed constructor and return an instance of DoublingSink as the result of the constructor's callback, as shown in the following code:
class DoublingTransformer implements StreamTransformer<num, num> {
  Stream<num> bind(Stream<num> stream) {
    return new Stream<num>.eventTransformed(stream,
        (EventSink sink) => new DoublingSink(sink));
  }
}

void main() {
  // some sample data
  var data = [1, 2, 3, 4, 5];
  // create the stream
  var stream = new Stream<num>.fromIterable(data);
  // Create DoublingTransformer
  var streamTransformer = new DoublingTransformer();
  // Bound streams
  var boundStream = stream.transform(streamTransformer);
  // Because we start listening to the 'bound' stream, the 'listen'
  // method invokes the 'bind' method of DoublingTransformer
  boundStream.listen((data) {
    print('$data');
  });
}
In the main method, we created a simple stream via the Stream.fromIterable factory constructor and created a stream transformer as an instance of DoublingTransformer. We then combine them in a call to the transform method. When we start listening to the bound stream, events from the source stream are doubled inside DoublingSink and delivered to our listener. The following result is expected:
2
4
6
8
10
In the previous topics, we saw how a stream can be easily created from another stream with the help of one of the factory constructors. However, you can also create a stream from scratch with the help of StreamController, which gives you more control over generating the events of a stream. With StreamController, we can send the data, error, and done events to the stream directly. The StreamController class offers different factory constructors for different kinds of streams. If you plan to create a single-subscription stream, use the following factory constructor:
factory StreamController({void onListen(), void onPause(), void onResume(), onCancel(), bool sync: false})
The controller has a life cycle that presents the following states:
… data events in this state.
The onListen and onCancel callback functions are called when the subscriber registers or ends the subscription, respectively. The onPause and onResume callback functions are called when the subscriber pauses or resumes the stream. The controller may skip the onResume callback function if the subscription was canceled in the meantime.
If the sync attribute is equal to true, it tells the controller that the events might be passed directly into the listening stream by the subscriber while the add, addError, or close methods are being called. If it is equal to false, which is the default, the events will be passed only after the code that creates the events has returned.
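The effect of the sync flag on delivery order can be observed with a small sketch. It assumes a current Dart SDK (2.12 or later, for the required keyword); deliveryOrder is a hypothetical helper name introduced for this illustration only.

```dart
import 'dart:async';

/// Hypothetical helper: records whether the listener sees the event
/// before or after the call to add() returns.
Future<List<String>> deliveryOrder({required bool sync}) async {
  final log = <String>[];
  final controller = StreamController<int>(sync: sync);
  controller.stream.listen((value) => log.add('event $value'));

  controller.add(1);
  log.add('after add');

  // close() returns a Future that completes once the done event is handled.
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await deliveryOrder(sync: false)); // [after add, event 1]
  print(await deliveryOrder(sync: true)); // [event 1, after add]
}
```

With sync set to false, the listener runs only after the adding code has finished its current synchronous section, which is the safer default; a synchronous controller trades that guarantee for lower latency.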
A stream instance is available via the stream property. Use the add, addError, and close methods of StreamSink to manage the underlying stream. The controller buffers the data until a subscriber starts listening, but bear in mind that the buffering approach is not optimized to hold a high volume of events. The following code snippet shows you how to create a single-subscription stream with StreamController:
import 'dart:async';

main() {
  // create the stream
  Stream<num> stream = createStream();
  // Start listening
  StreamSubscription<num> sub = createSubscription(stream);
}

StreamSubscription<num> createSubscription(Stream<num> stream) {
  StreamSubscription subscriber;
  subscriber = stream.listen((num data) {
    print('onData: $data');
    // Pause subscription when the value 3 arrives
    if (data == 3) {
      subscriber.pause(new Future.delayed(
          const Duration(milliseconds: 500), () => 'ok'));
    }
  },
      onError: (error) => print('onError: $error'),
      onDone: () => print('onDone'));
  return subscriber;
}

Stream<num> createStream() {
  StreamController<num> controller = new StreamController<num>(
      onListen: () => print('Listening'),
      onPause: () => print('Paused'),
      onResume: () => print('Resumed'),
      onCancel: () => print('Canceled'),
      sync: false);
  //
  num i = 0;
  Future.doWhile(() {
    controller.add(i++);
    // Adds an error once the counter reaches 5
    if (i == 5) {
      controller.addError('on ${i}-th element');
    }
    // Stop stream after the 7-th event
    if (i == 7) {
      controller.close();
      return false;
    }
    return true;
  });
  return controller.stream;
}
In the preceding code, we intentionally add an error right before the event with the value 5 and close the stream after the seventh event. The subscriber pauses the subscription when it receives the value 3 and resumes it after a delay of 500 milliseconds. This will generate the following result:
Listening
onData: 0
onData: 1
onData: 2
onData: 3
Paused
onData: 4
onError: on 5-th element
onData: 5
onData: 6
Canceled
onDone
The following factory constructor creates a controller for the broadcast stream, which can be listened to more than once:
factory StreamController.broadcast({void onListen(), void onCancel(), bool sync: false})
The controller created by this constructor delivers the data, error, or done events to all listeners when the add, addError, or close methods are called. These methods must be invoked in order; calling one of them before a previous call has returned is not allowed. Unlike the single-subscription controller, this one has no internal queue of events. This means that a data or error event is lost if no listeners are registered at the time the event is added. Each listener subscription acts independently. If one subscription pauses, only that subscription is affected, and its events are buffered internally until it resumes or cancels. The controller has a life cycle that has the following states:
… data and error events in this state.
The onListen and onCancel callback functions are called at the moment when the first subscriber is registered or the last one ends its subscription, respectively.
If the sync attribute is equal to true, it tells the controller that events might be passed directly into the listening stream by subscribers while the add, addError, or close methods are being called. If it is equal to false, the events will be passed after the code that creates the event has returned, but there is no guarantee about when multiple listeners get the events. Independent of the value of the sync attribute, each listener gets all the events in the correct order. The following is a slightly changed version of the previous code with two subscriptions:
import 'dart:async';

main() {
  // create the stream
  Stream<num> stream = createStream();
  StreamSubscription<num> sub1 = createSubscription(stream, 1);
  StreamSubscription<num> sub2 = createSubscription(stream, 2);
}

StreamSubscription<num> createSubscription(Stream<num> stream, num number) {
  // Start listening
  StreamSubscription subscriber;
  subscriber = stream.listen((num data) {
    print('onData ${number}: $data');
    // Pause subscription when the value 3 arrives
    if (data == 3) {
      subscriber.pause(new Future.delayed(
          const Duration(milliseconds: 500), () => 'ok'));
    }
  },
      onError: (error) => print('onError: $error'),
      onDone: () => print('onDone'));
  return subscriber;
}

Stream<num> createStream() {
  StreamController<num> controller = new StreamController<num>.broadcast(
      onListen: () => print('Listening'),
      onCancel: () => print('Canceled'),
      sync: false);
  //
  num i = 0;
  Future.doWhile(() {
    controller.add(i++);
    // Adds an error once the counter reaches 5
    if (i == 5) {
      controller.addError('on ${i}-th element');
    }
    // Stop stream after the 7-th event
    if (i == 7) {
      controller.close();
      return false;
    }
    return true;
  });
  return controller.stream;
}
The preceding code snippet generates the following result:
Listening
onData 1: 1
onData 2: 1
onData 1: 2
onData 2: 2
onData 1: 3
onData 2: 3
onData 1: 4
onError: on 5-th element
onData 1: 5
onData 1: 6
onDone
onData 2: 4
onError: on 5-th element
onData 2: 5
onData 2: 6
Canceled
onDone
These results reaffirm the fact that the broadcast stream doesn't guarantee the order of the delivery events to different listeners. It only guarantees an order of the delivery events inside each listener.
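The earlier point that a broadcast controller keeps no queue for events added while nobody is listening can be checked with a short sketch. It assumes a current Dart SDK (2.12 or later); listenLate is a hypothetical helper name used only for this illustration.

```dart
import 'dart:async';

/// Hypothetical helper: adds one event before and one event after
/// a listener subscribes to a broadcast stream.
Future<List<int>> listenLate() async {
  final controller = StreamController<int>.broadcast();
  final received = <int>[];

  controller.add(1); // No listener yet: this event is dropped.
  controller.stream.listen(received.add);
  controller.add(2); // Delivered to the listener registered above.

  await controller.close();
  return received;
}

Future<void> main() async {
  print(await listenLate()); // [2]
}
```

A single-subscription controller in the same situation would have buffered the first event and delivered both values.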
The listen method of the Stream class adds a subscription to the stream. It has the following signature:
StreamSubscription<T> listen(void onData(T event), {Function onError, void onDone(), bool cancelOnError});
The onData callback function is called every time a new data event comes from this stream. This function matters because without it, the data events from the stream are silently discarded. The optional onError callback is called when an error comes from the stream. This function accepts one or two arguments. The first argument is always an error from the stream. The second argument, if it exists, is a StackTrace instance. It can be equal to null if the stream received an error without a StackTrace. When the stream closes, it calls the onDone callback function. The cancelOnError flag tells the subscription to cancel itself as soon as the first error event arrives.
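The following sketch illustrates the two-argument form of onError together with the cancelOnError flag. It assumes a current Dart SDK (2.12 or later); listenUntilError is a hypothetical helper name, and the string 'bad' is an arbitrary error object chosen for the example.

```dart
import 'dart:async';

/// Hypothetical helper: listens with cancelOnError set to true and
/// returns a log of the callbacks that were invoked.
Future<List<String>> listenUntilError() async {
  final log = <String>[];
  final errorSeen = Completer<void>();
  final controller = StreamController<int>();

  controller.stream.listen(
    (value) => log.add('data $value'),
    // Two-argument form: the error and its (possibly empty) stack trace.
    onError: (Object error, StackTrace stackTrace) {
      log.add('error $error');
      errorSeen.complete();
    },
    onDone: () => log.add('done'),
    cancelOnError: true,
  );

  // Events added after the error are never delivered, because the
  // subscription cancels itself at the first error.
  controller
    ..add(1)
    ..addError('bad')
    ..add(2)
    ..close();

  await errorSeen.future;
  return log;
}

Future<void> main() async {
  print(await listenUntilError()); // [data 1, error bad]
}
```

Note that onDone is not called in this case: the subscription ends by cancellation, not by receiving the done event.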
The result of this method is an instance of the StreamSubscription class. It provides events to the listener and holds the callback functions used in the listen method of the Stream class. You can set or override all three callback functions via the onData, onError, and onDone methods of the StreamSubscription class. Listening can be paused and resumed with the pause and resume methods. The isPaused getter returns true if the instance of the StreamSubscription class is paused. The stream subscription can be ended with the cancel method at any time. It returns a Future instance, which completes with a null value when the stream is done cleaning up. This is useful for tasks such as closing a file after reading it.
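As a minimal sketch of ending a subscription early, the helper below cancels from inside the onData callback and waits for the Future returned by cancel. It assumes a current Dart SDK (2.12 or later, for late variables); takeThenCancel is a hypothetical name introduced for this illustration.

```dart
import 'dart:async';

/// Hypothetical helper: listens to [source] and cancels the subscription
/// once [limit] values have been received.
Future<List<int>> takeThenCancel(Stream<int> source, int limit) {
  final received = <int>[];
  final finished = Completer<List<int>>();
  late final StreamSubscription<int> subscription;

  subscription = source.listen((value) {
    received.add(value);
    if (received.length == limit) {
      // cancel() returns a Future that completes after cleanup.
      subscription.cancel().then((_) => finished.complete(received));
    }
  }, onDone: () {
    // Reached only if the stream ends before the limit is hit.
    if (!finished.isCompleted) finished.complete(received);
  });

  return finished.future;
}

Future<void> main() async {
  final values =
      await takeThenCancel(Stream<int>.fromIterable([1, 2, 3, 4, 5]), 3);
  print(values); // [1, 2, 3]
}
```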
The StreamView class is a wrapper for the Stream class that exposes only the isBroadcast getter and the asBroadcastStream and listen methods of the original one. So, if you need a clean Stream interface in your code, you can use it like this:
import 'dart:async';

main() {
  // some sample data
  var data = [1, 2, 3, 4, 5];
  // create the stream
  var stream = new Stream<num>.fromIterable(data);
  // Create a view
  var streamView = new StreamView(stream);
  // Now listen to the stream view like a stream
  var subscriber = streamView.listen((data) {
    print(data);
  }, onError: (error) {
    print(error);
  }, onDone: () {
    print('done');
  });
}
You will get the following result:
1
2
3
4
5
done
The Sink class represents a generic interface for data receivers. It defines the add method, which puts data into the sink, and the close method, which tells the sink that no more data will be added. The EventSink class uses the add method of the Sink class to send a data event to a stream, and the close method to send a done event. The addError method, which belongs to the EventSink class, sends an asynchronous error to a stream.
We can bind one stream to another via the pipe method of the Stream class. The consumer stream is represented by the StreamConsumer interface. This interface defines the contract between two streams. The addStream method is used to consume the elements of the source stream. The consumer stream will listen to the source stream and do something for each event. It may stop listening after an error, or it may consume all errors and stop at the done event. The close method tells the consumer that no further streams will be added. This method returns a Future instance that is completed when the source stream has been consumed and the consumer is closed.
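A simple way to see pipe at work is to use a StreamController as the consumer, since it implements StreamConsumer through StreamSink. The following is a sketch under that assumption, written for a current Dart SDK (2.12 or later); pipeThrough is a hypothetical helper name.

```dart
import 'dart:async';

/// Hypothetical helper: pipes [source] into a controller and returns
/// everything that came out of the controller's stream.
Future<List<int>> pipeThrough(Stream<int> source) async {
  final consumer = StreamController<int>();

  // Start collecting first so that the consumer has a listener.
  final collected = consumer.stream.toList();

  // pipe() calls consumer.addStream(source) and then consumer.close().
  await source.pipe(consumer);

  return await collected;
}

Future<void> main() async {
  print(await pipeThrough(Stream<int>.fromIterable([1, 2, 3]))); // [1, 2, 3]
}
```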
The StreamSink class combines methods from StreamConsumer and EventSink. You should know that the methods from the two classes block each other. We cannot send data or error events via the methods of the EventSink class while we are adding a source stream via the addStream method from StreamConsumer. We can start using the methods from EventSink only after the Future instance returned by the addStream method is completed with a value. Also, the addStream method will be delayed until the underlying system consumes the data added by the EventSink methods. The StreamSink class has a done getter that returns a Future instance that is completed when the StreamSink instance is finished under one of the following conditions:
… add, addError, or close methods of the EventSink class
The StreamTransformer class helps you create a new consumer stream that is bound to the original one via the bind method. The StreamTransformer class can be instantiated through two factory constructors that define different strategies for how the transformation happens. In the following factory constructor, we need to specify a special transformer function:
const factory StreamTransformer(StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
The transformer function receives the bound stream as an argument and returns an instance of the StreamSubscription class. If you are planning to implement your own stream transformer function, it will look like this:
import 'dart:async';

void main() {
  // some sample data
  var data = [1, 2, 3, 4, 5];
  // create the stream
  var stream = new Stream<num>.fromIterable(data);
  // Create StreamTransformer with transformer closure
  var streamTransformer =
      new StreamTransformer<num, num>(doublingTransformer);
  // Bound streams
  var boundStream = stream.transform(streamTransformer);
  // Because we start listening to the 'bound' stream, the
  // 'listen' method invokes the 'doublingTransformer'
  // closure
  boundStream.listen((data) {
    print('$data');
  });
}

StreamSubscription doublingTransformer(Stream<num> input,
    bool cancelOnError) {
  StreamController<num> controller;
  StreamSubscription<num> subscription;
  controller = new StreamController<num>(
      onListen: () {
    subscription = input.listen((data) {
      // Double the data.
      controller.add(data * 2);
    },
        onError: controller.addError,
        onDone: controller.close,
        cancelOnError: cancelOnError);
  });
  return controller.stream.listen(null);
}
The preceding code generates the following output:
2
4
6
8
10
The other factory constructor creates a StreamTransformer instance that delegates events to special functions, which handle the data, error, and done events, as shown in the following code:
factory StreamTransformer.fromHandlers({ void handleData(S data, EventSink<T> sink), void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), void handleDone(EventSink<T> sink)})
The changed version of the previous example is as follows:
import 'dart:async';

void main() {
  // some sample data
  var data = [1, 2, 3, 4, 5];
  // create the stream
  var stream = new Stream<num>.fromIterable(data);
  // Create StreamTransformer with handler functions
  var streamTransformer = new StreamTransformer<num, num>.fromHandlers(
      handleData: handleData,
      handleError: handleError,
      handleDone: handleDone);
  // Bound streams
  var boundStream = stream.transform(streamTransformer);
  // Because we start listening to the 'bound' stream, the
  // 'listen' method invokes the 'handleData' function
  boundStream.listen((data) {
    print('$data');
  });
}

handleData(num data, EventSink<num> sink) {
  sink.add(data * 2);
}

handleError(Object error, StackTrace stackTrace, EventSink<num> sink) {
  sink.addError(error, stackTrace);
}

handleDone(EventSink<num> sink) {
  sink.close();
}
The result of this execution is the same as the previous one:
2
4
6
8
10
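The handlers of StreamTransformer.fromHandlers are optional, and an omitted handler simply forwards its kind of event. The sketch below supplies only handleError to turn errors into a placeholder value. It assumes a current Dart SDK (2.12 or later); flakySource and errorsToPlaceholder are hypothetical names, and -1 is an arbitrary placeholder chosen for the example.

```dart
import 'dart:async';

/// Hypothetical source: emits 1 and then fails with an error.
Stream<int> flakySource() async* {
  yield 1;
  throw StateError('boom');
}

/// Replaces every error event with the data event -1.
/// Data and done events are forwarded unchanged by default.
final errorsToPlaceholder = StreamTransformer<int, int>.fromHandlers(
  handleError: (error, stackTrace, sink) => sink.add(-1),
);

Future<void> main() async {
  final values = await flakySource().transform(errorsToPlaceholder).toList();
  print(values); // [1, -1]
}
```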
The StreamIterator class permits a stream to be read using iterator operations. It has the moveNext method, which waits for the next value of the stream to become available and returns a Future of the bool type, as follows:
Future<bool> moveNext();
If moveNext succeeds, the Future instance completes with the true value; otherwise, the iteration is done and no new value will be available. The current value of the stream is exposed by the current property of the StreamIterator instance, as shown in the following code:
T get current;
This value is valid only after the Future instance returned by the moveNext method completes with the true value, and only until the next call to moveNext. StreamIterator is an abstract class and can be instantiated only via the factory constructor, as follows:
factory StreamIterator(Stream<T> stream)
Let's change the example from the previous topic to use StreamIterator. We will create a simple stream from Iterable as we did before. Then, we will create an instance of StreamIterator. Finally, we will use the forEach function to iterate over the stream and call the closure function to print scaled elements of the stream, as shown in the following code:

import 'dart:async';

main() {
  // some sample data
  var data = [1, 2, 3, 4, 5];
  // create the stream
  var stream = new Stream<num>.fromIterable(data);
  // Create an iterator
  var iterator = new StreamIterator(stream);
  // Iterate over all elements of iterator and print values
  forEach(iterator, (value) {
    // Double the data.
    print(value * 2);
  });
}
Actually, this code looks similar to the ones where we used iterators. All the magic happens inside the forEach function, as shown in the following code:

forEach(StreamIterator iterator, f(element)) {
  return Future.doWhile(() {
    Future future = iterator.moveNext();
    future.then((bool hasNext) {
      if (hasNext) {
        f(iterator.current);
      }
    });
    return future;
  });
}
As the moveNext method returns a Future value, we need to use the doWhile method of the Future class to perform the iteration. The Boolean result of that Future is passed to the callback as the hasNext parameter. We keep calling the closure function as long as the value of hasNext is true.
The code generates the following result:
2
4
6
8
10
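On SDK versions that support async and await, the same iteration can be written as an ordinary loop over moveNext. This is a sketch of that alternative, not a replacement for the approach shown above; it assumes a current Dart SDK (2.12 or later), and doubleAll is a hypothetical helper name.

```dart
import 'dart:async';

/// Hypothetical helper: reads [stream] through a StreamIterator and
/// returns every value doubled.
Future<List<num>> doubleAll(Stream<num> stream) async {
  final iterator = StreamIterator<num>(stream);
  final result = <num>[];
  // moveNext() completes with false once the stream is done.
  while (await iterator.moveNext()) {
    result.add(iterator.current * 2);
  }
  return result;
}

Future<void> main() async {
  print(await doubleAll(Stream<num>.fromIterable([1, 2, 3, 4, 5])));
  // [2, 4, 6, 8, 10]
}
```

Awaiting moveNext inside a while loop plays the role that Future.doWhile plays in the forEach function: each iteration starts only after the previous value has been handled.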