An overview of the stream framework API

You can see the hierarchy of the stream framework classes in the following diagram:

[Diagram: hierarchy of the stream framework classes]

The Stream class

The Stream class is abstract and provides a generic view of a sequence of asynchronous events. It implements numerous methods that help you manage or republish streams in your application. Each data event that a stream generates carries the data to be delivered. If a failure occurs, the stream generates an error event. When all the events have been sent, the stream generates a done event.

The validation methods of the Stream class

The Stream class has the following methods that help you validate data returned from a stream:

  • any: This method checks whether the test callback function accepts any element provided by this stream
  • every: This method checks whether the test callback function accepts all the elements provided by this stream
  • contains: This method checks whether the needle object occurs in the elements provided by this stream
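The three validation methods can be tried out with a short sketch like the following. It is written for current null-safe Dart, and the numbers and runChecks helpers are hypothetical names introduced only for this illustration. Each check needs a fresh stream because a single-subscription stream can be listened to only once:

```dart
import 'dart:async';

// Hypothetical helper: returns a new single-subscription stream per call.
Stream<int> numbers() => Stream<int>.fromIterable([1, 2, 3, 4, 5]);

// Hypothetical helper: runs the three validation methods.
Future<List<bool>> runChecks() async {
  // any: completes with true as soon as one element passes the test.
  final hasEven = await numbers().any((n) => n.isEven);
  // every: completes with true only if all elements pass the test.
  final allPositive = await numbers().every((n) => n > 0);
  // contains: completes with true if some element equals the needle.
  final hasSeven = await numbers().contains(7);
  return [hasEven, allPositive, hasSeven];
}

Future<void> main() async {
  print(await runChecks()); // [true, true, false]
}
```

All three methods return a Future of the bool type, so the result is available only after the stream has delivered enough events to decide the answer.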

The search methods of the Stream class

The following methods of the Stream class help you search for specific elements in a stream:

  • firstWhere: This method finds the first element of a stream that matches the test callback function
  • lastWhere: This method finds the last element of a stream that matches the test callback function
  • singleWhere: This method finds the single element in a stream that matches the test callback function
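As a minimal sketch of the search methods in current null-safe Dart, the following code looks up elements in a small stream. The numbers and runSearches helpers are hypothetical names used only here:

```dart
import 'dart:async';

// Hypothetical helper: returns a new single-subscription stream per call.
Stream<int> numbers() => Stream<int>.fromIterable([1, 2, 3, 4, 5]);

// Hypothetical helper: runs the search methods and collects the results.
Future<List<int>> runSearches() async {
  // First and last elements that satisfy the test.
  final firstEven = await numbers().firstWhere((n) => n.isEven);
  final lastEven = await numbers().lastWhere((n) => n.isEven);
  // singleWhere completes with an error unless exactly one element matches.
  final onlyLarge = await numbers().singleWhere((n) => n > 4);
  // orElse supplies a fallback value when no element matches.
  final fallback =
      await numbers().firstWhere((n) => n > 10, orElse: () => -1);
  return [firstEven, lastEven, onlyLarge, fallback];
}

Future<void> main() async {
  print(await runSearches()); // [2, 4, 5, -1]
}
```

Without the orElse argument, a search that finds nothing completes the returned Future with an error, so it is worth handling that case explicitly.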

The subset methods of the Stream class

The Stream class provides the following methods to create a new stream that contains a subset of data from the original one:

  • where: This method creates a new stream from the original one with data events that satisfy the test callback function
  • skip: This method creates a new stream with the data events that are left after skipping the first count data events of the original stream
  • skipWhile: This method creates a new stream that skips the data events of the original stream while they are matched by the test callback function and forwards all the events after the first mismatch
  • take: This method creates a new stream that provides at most the first count data events of the original stream
  • takeWhile: This method creates a new stream that forwards the data events of the original stream while the test callback function is successful and ends at the first mismatch
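The following sketch compares the subset methods on the same input. It assumes current null-safe Dart, where the conditional variants are named skipWhile and takeWhile. The numbers and runSubsets helpers are hypothetical names for this illustration:

```dart
import 'dart:async';

// Hypothetical helper: the trailing 1 shows how where differs from skipWhile.
Stream<int> numbers() => Stream<int>.fromIterable([1, 2, 3, 4, 5, 1]);

// Hypothetical helper: applies each subset method and collects the events.
Future<Map<String, List<int>>> runSubsets() async => {
      // Keeps every element that passes the test.
      'where': await numbers().where((n) => n.isOdd).toList(),
      // Drops the first two elements.
      'skip': await numbers().skip(2).toList(),
      // Drops elements only until the test fails for the first time.
      'skipWhile': await numbers().skipWhile((n) => n < 3).toList(),
      // Keeps at most the first two elements.
      'take': await numbers().take(2).toList(),
      // Keeps elements only until the test fails for the first time.
      'takeWhile': await numbers().takeWhile((n) => n < 3).toList(),
    };

Future<void> main() async {
  final result = await runSubsets();
  print(result['where']); // [1, 3, 5, 1]
  print(result['skip']); // [3, 4, 5, 1]
  print(result['skipWhile']); // [3, 4, 5, 1]
  print(result['take']); // [1, 2]
  print(result['takeWhile']); // [1, 2]
}
```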

You will find other methods of the Stream class on the official help page at https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart-async.Stream.

Creating a stream

The Stream class has several factory constructors to create single-subscription or broadcast streams. Let's discuss them in detail.

A new stream from the Future class

The first factory constructor could confuse you because it creates a new single-subscription stream from the Future instance with the following command:

factory Stream.fromFuture(Future<T> future)

This confusion will be quickly cleared if I remind you about the translation of the Future class into the Stream class via the asStream method from the same class, as shown in the following code:

…
Stream<T> asStream() => new Stream.fromFuture(this);
…

The resulting stream fires a data or error event when the Future instance is completed, and then closes itself with the done event. The following code snippet shows how this factory constructor can be used:

import 'dart:async';

main() {
  // single sample data
  var data = new Future<num>.delayed(
      const Duration(milliseconds:500), () {
    // Return single value
    return 2;
  });
  // create the stream
  Stream<num> stream = new Stream<num>.fromFuture(data);
  // Start listening
  var subscriber = stream.listen((data) {
    print(data);
  }, onError:(error){
    print(error);
  }, onDone:() {
    print('done');
  });
}

The execution of the preceding code prints the following result on the console:

2
done

A new stream from the Iterable class

It's quite obvious that you should have a factory constructor that creates a single-subscription stream from any Iterable instance. Iteration over the iterable happens only while the stream has a listener, and it can be interrupted if the listener cancels the subscription. Any error that occurs in the iteration process immediately ends the stream and is delivered as an error event. In this case, the done event doesn't fire because the iteration was not complete.

In the following code snippet, we create an instance of the Iterable class with a factory constructor that generates a sequence of numbers. We intentionally throw an Exception at the location of the fourth element (index 3):

import 'dart:async';

main() {
  // some sample generated data
  var data = new Iterable<num>.generate(5, (int indx) {
    if (indx < 3) {
      return indx;
    } else {
      throw new Exception('Wrong data');
    }
  }); 
  // create the stream
  Stream<num> stream = new Stream<num>.fromIterable(data);
  // Start listening
  var subscriber = stream.listen((data) {
    print(data);
  }, onError:(error){
    print(error);
  }, onDone:() {
    print('done');
  });
}

The result of the preceding code is as follows:

0
1
2
Exception: Wrong data

A new stream with periodically generated events

In the previous topic, we used the Iterable.generate factory constructor to emit numeric data into our stream. However, we can do that just as well via the Stream.periodic factory constructor, as follows:

factory Stream.periodic(Duration period, 
  [T computation(int computationCount)])

This lets us generate the events in a more natural, stream-oriented, and less verbose way, as follows:

import 'dart:async';

main() {
  // some sample generated data
  Stream<num> stream = new Stream
    .periodic(const Duration(milliseconds: 500), (int count) {
    // Return count
    return count;
  });
  // Start listening
  var subscriber = stream.listen((data) {
    print(data);
  }, onError:(error){
    print(error);
  }, onDone:() {
    print('done');
  });
}

The first parameter is the duration that gives you the interval between emitted events. The second parameter is the callback function that computes the event values. This function has a single parameter that receives the sequence number of the iteration, starting at 0. A periodic stream never ends on its own, so the done event is not fired in this example.
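Because a periodic stream keeps emitting events indefinitely, it is often combined with the take method to bound it. The following is a minimal sketch in current null-safe Dart. The firstSquares helper and the 10-millisecond period are assumptions chosen for this illustration:

```dart
import 'dart:async';

// Hypothetical helper: emits count * count every 10 ms and stops after
// three events, so the resulting stream does fire a done event.
Future<List<int>> firstSquares() {
  return Stream<int>.periodic(
    const Duration(milliseconds: 10),
    (int count) => count * count,
  ).take(3).toList();
}

Future<void> main() async {
  print(await firstSquares()); // [0, 1, 4]
}
```

The take method cancels its subscription to the periodic stream after the third event, which also stops the underlying timer.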

A new stream from the transformation pipe

The Stream.eventTransformed factory constructor is quite interesting because it creates a new Stream from the existing one with the help of a sink transformation, as shown in the following code:

factory Stream.eventTransformed(Stream source, 
  EventSink mapSink(EventSink<T> sink))

The first parameter is the source stream that provides events to the new one. The mapSink callback function is called when the new stream is listened to. All the events from the existing stream pass through the sink to reach the new stream. This constructor is widely used to create stream transformers. In the following code, we will create a DoublingSink class. It accepts the output sink as an argument of the constructor. We will implement the number-doubling algorithm inside the add method. The other methods, addError and close, are simple and pass the incoming parameter values to the underlying sink object, as shown in the following code:

import 'dart:async';

/**
 * A sink that doubles each incoming number before passing 
 * it on to the output sink.
 */
class DoublingSink implements EventSink<num> {
  final EventSink<num> _output;
  
  DoublingSink(this._output);
  
  /** Send a data event to a stream. */
  void add(num event) {
    _output.add(event * 2);
  }
  
  /** Send an async error to a stream. */
  void addError(errorEvent, [StackTrace stackTrace])  => 
      _output.addError(errorEvent, stackTrace);
  
  /** Send a done event to a stream.*/
  void close() => _output.close();
}

The DoublingTransformer class implements StreamTransformer for numbers. In the bind method, which is compulsory, we will create a new stream via the eventTransformed constructor and return an instance of DoublingSink as the result of the constructor's callback, as shown in the following code:

class DoublingTransformer implements StreamTransformer<num, num> {
  Stream<num> bind(Stream<num> stream) {
    return new Stream<num>.eventTransformed(stream, 
        (EventSink sink) => new DoublingSink(sink));
  }
}

void main() {
  // some sample data
  var data = [1,2,3,4,5]; 
  // create the stream
  var stream = new Stream<num>.fromIterable(data); 
  // Create DoublingTransformer
  var streamTransformer = new DoublingTransformer();
  // Bound streams
  var boundStream = stream.transform(streamTransformer);
  // Because we start listening to the 'bound' stream, the 'listen' method
  // invokes the callback that creates the 'DoublingSink' instance
  boundStream.listen((data) {
    print('$data');
  });
}

In the main method, we created a simple stream via the Stream.fromIterable factory constructor and created a stream transformer as an instance of DoublingTransformer. We then combined them in a call of the transform method. When we start listening to the bound stream, events from the source stream are doubled inside DoublingSink and delivered to our listener. The following result is expected:

2
4
6
8
10

A new stream from StreamController

In the previous topics, we saw how a stream can be easily created from another stream with the help of one of the factory constructors. However, you can create a stream from scratch with the help of StreamController, which gives you more control over generating the events of a stream. With StreamController, we can send the data, error, and done events to the stream directly. A stream can be created via the StreamController class through different factory constructors. If you plan to create a single-subscription stream, use the following factory constructor:

factory StreamController({void onListen(), void onPause(), 
  void onResume(), onCancel(), bool sync: false})

The controller has a life cycle that presents the following states:

  • Initial state: This is where the controller has no subscription. The controller buffers all the data events in this state.
  • Subscribed state: In this state, the controller has a subscription. The onListen and onCancel callback functions are called when the subscriber registers or ends the subscription, respectively. The onPause and onResume callback functions are called when the subscriber pauses or resumes the subscription. The controller may not call the onResume callback function if the subscription was canceled while it was paused.
  • Canceled state: In this state, the controller has no subscription.
  • Closed state: In this state, adding more events is not allowed.

If the sync attribute is equal to true, the events may be passed directly to the stream's subscriber at the moment the add, addError, or close methods are called. If it is false, which is the default, the events are always passed at a later time, after the code that added them has returned.

A stream instance is available via the stream property. Use the add, addError, and close methods of StreamSink to manage the underlying stream. The controller buffers the data until a subscriber starts listening, but bear in mind that the buffering approach is not optimized to keep a high volume of events. The following code snippet shows you how to create a single-subscription stream with StreamController:

import 'dart:async';

main() {
  // create the stream
  Stream<num> stream = createStream();
  // Start listening
  StreamSubscription<num> sub = createSubscription(stream);
}

StreamSubscription<num> createSubscription(Stream<num> stream) {
  StreamSubscription subscriber;
  subscriber = stream.listen((num data) {
    print('onData: $data');
    // Pause subscription on 3-th element
    if (data == 3) {
      subscriber.pause(new Future.delayed(
          const Duration(milliseconds: 500), () => 'ok'));
    }
  }, 
  onError:(error) => print('onError: $error'), 
  onDone:() => print('onDone'));
  return subscriber;
}

Stream<num> createStream() {
  StreamController<num> controller = new 
    StreamController<num>(
      onListen:() => print('Listening'), 
      onPause: () => print('Paused'), 
      onResume: () => print('Resumed'), 
      onCancel: () => print('Canceled'), sync: false);
  //
  num i = 0;
  Future.doWhile((){
    controller.add(i++);
    // Throws exception on 5-th element
    if (i == 5) {
      controller.addError('on ${i}-th element');
    }
    // Stop stream at 7-th event
    if (i == 7) {
      controller.close();
      return false;
    }
    return true;
  });
  return controller.stream;
}

In the preceding code, we intentionally add an error after the fifth element and close the stream after the seventh element. The stream subscriber pauses listening to the stream when it receives the value 3 and resumes after a delay of 500 milliseconds. This will generate the following result:

Listening
onData: 0
onData: 1
onData: 2
onData: 3
Paused
onData: 4
onError: on 5-th element
onData: 5
onData: 6
Canceled
onDone

The following factory constructor creates a controller for the broadcast stream, which can be listened to more than once:

factory StreamController.broadcast({void onListen(),
  void onCancel(), bool sync: false})

The controller created by this constructor delivers the data, error, or done events to all the listeners that are subscribed at the time the add, addError, or close methods are called. You must not call any of these methods again before a previous call has returned. This controller, as opposed to the single-subscription one, doesn't have an internal queue of events. This means that a data or error event is lost if there are no listeners registered at the time the event is added. Each listener subscription acts independently. If one subscription pauses, then only this one is affected, and its events are buffered internally until the subscription resumes or cancels. The controller has a life cycle that has the following states:

  • Initial state: This is where the controller has no subscription. The controller loses all the fired data and error events in this state.
  • Subscribed state: This is where the first subscription is added to the controller. The onListen and onCancel callback functions are called at the moment when the first subscriber is registered or the last one ends its subscription, respectively.
  • Canceled state: In this state, the controller has no subscription.
  • Closed state: In this state, adding more events is not allowed.

If the sync attribute is equal to true, the events may be passed directly to the subscribers at the moment the add, addError, or close methods are called. If it is false, the events are always passed at a later time, after the code that added them has returned, and there is no guarantee about when the different listeners get the events. Independent of the value of the sync attribute, each listener gets all the events in the correct order. The following is a slightly changed version of the previous code with two subscriptions:

import 'dart:async';

main() {
  // create the stream
  Stream<num> stream = createStream();
  StreamSubscription<num> sub1 = createSubscription(stream, 1);
  StreamSubscription<num> sub2 = createSubscription(stream, 2);
}

StreamSubscription<num> createSubscription(Stream<num> stream, num number) {
  // Start listening
  StreamSubscription subscriber;
  subscriber = stream.listen((num data) {
   print('onData ${number}: $data');
   // Pause subscription on 3-th element
   if (data == 3) {
     subscriber.pause(new Future.delayed(
         const Duration(milliseconds: 500), () => 'ok'));
   }
  }, 
  onError:(error) => print('onError: $error'), 
  onDone:() => print('onDone'));
  return subscriber;
}

Stream<num> createStream() {
  StreamController<num> controller = new 
    StreamController<num>.broadcast(
      onListen:() => print('Listening'), 
      onCancel: () => print('Canceled'), sync: false);
  //
  num i = 0;
  Future.doWhile((){
    controller.add(i++);
    // Throws exception on 5-th element
    if (i == 5) {
      controller.addError('on ${i}-th element');
    }
    // Stop stream at 7-th event
    if (i == 7) {
      controller.close();
      return false;
    }
    return true;
  });
  return controller.stream;
}

The preceding code snippet generates the following result:

Listening
onData 1: 1
onData 2: 1
onData 1: 2
onData 2: 2
onData 1: 3
onData 2: 3
onData 1: 4
onError: on 5-th element
onData 1: 5
onData 1: 6
onDone
onData 2: 4
onError: on 5-th element
onData 2: 5
onData 2: 6
Canceled
onDone

These results reaffirm the fact that the broadcast stream doesn't guarantee the order in which events are delivered to different listeners. It only guarantees the order of the events delivered to each individual listener.

What does the StreamSubscription class do?

The listen method of the Stream class adds the following subscription to the stream:

StreamSubscription<T> listen(
  void onData(T event), 
    { Function onError, void onDone(), bool cancelOnError});

The onData callback function is called every time a new data event comes from this stream. This function is important because without it the data events are not processed. The optional onError callback is called when an error comes from the stream. This function accepts one or two arguments. The first argument is always an error from the stream. The second argument, if it exists, is a StackTrace instance. It can be equal to null if the stream received an error without a StackTrace itself. When the stream closes, it calls the onDone callback function. If the cancelOnError flag is true, the subscription is canceled as soon as the first error event is delivered.

The result of this method is an instance of the StreamSubscription class. It provides events to the listener and holds the callback functions used in the listen method of the Stream class. You can set or override all three callback functions via the onData, onError, and onDone methods of the StreamSubscription class. The subscription can be paused and resumed with the pause and resume methods. The isPaused getter returns true if the instance of the StreamSubscription class is paused. The stream subscription can be ended with the cancel method at any time. It returns a Future instance, which completes with a null value when the stream is done cleaning up. This feature is useful for tasks such as closing a file after reading it.
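The subscription life cycle described above can be sketched as follows in current null-safe Dart. The runSubscription helper, the log list, and the 20-millisecond delay are assumptions made for this illustration:

```dart
import 'dart:async';

// Hypothetical helper: pauses on the value 2, resumes shortly after,
// and cancels on the value 4. Returns a log of what happened.
Future<List<String>> runSubscription() async {
  final log = <String>[];
  final finished = Completer<void>();
  late StreamSubscription<int> sub;
  sub = Stream<int>.fromIterable([1, 2, 3, 4, 5]).listen((n) {
    log.add('data $n');
    if (n == 2) {
      // No events are delivered while the subscription is paused.
      sub.pause();
      log.add('paused: ${sub.isPaused}');
      Timer(const Duration(milliseconds: 20), sub.resume);
    }
    if (n == 4) {
      // cancel returns a future that completes when cleanup is finished.
      sub.cancel().then((_) => finished.complete());
    }
  });
  // Callbacks can be replaced after the listen call. This one never runs
  // here because the subscription is canceled before the stream is done.
  sub.onDone(() => log.add('done'));
  await finished.future;
  return log;
}

Future<void> main() async {
  print(await runSubscription());
  // [data 1, data 2, paused: true, data 3, data 4]
}
```

Note that neither the value 5 nor the done event reaches the listener, because a canceled subscription receives no further events.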

Minimizing access to the Stream class members using StreamView

The StreamView class is a wrapper for the Stream class that overrides only the isBroadcast getter and the asBroadcastStream and listen methods of the original one. So, if you need a clean Stream interface in your code, you can use it like this:

import 'dart:async';

main() {
  // some sample data
  var data = [1,2,3,4,5]; 
  // create the stream
  var stream = new Stream<num>.fromIterable(data);
  // Create a view
  var streamView = new StreamView(stream);
  // Now listen stream view like stream
  var subscriber = streamView.listen((data) {
    print(data);
  }, onError:(error){
    print(error);
  }, onDone:() {
    print('done');
  });
}

You will get the following result:

1
2
3
4
5
done

The Sink and EventSink interfaces

The Sink class represents a generic interface for data receivers. It defines the add method, which puts the data in the sink, and the close method, which tells the sink that no data will be added in the future. The EventSink class uses the add method of the Sink class to send a data event to a stream, as well as the close method to send a done event. The addError method, which belongs to the EventSink class, sends an asynchronous error to a stream.

Importance of the StreamConsumer interface

We can bind one stream to another via the pipe method of the Stream class. The consumer stream is represented by the StreamConsumer interface. This interface defines the contract between two streams. The addStream method is used to consume the elements of the source stream. The consumer stream will listen on the source stream and do something for each event. It may stop listening after an error or may consume all errors and stop at the done event. The close method tells the consumer that no further streams will be added. This method returns a Future instance that is completed when the source stream has been consumed and the consumer is closed.
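To make the contract concrete, here is a minimal sketch of a consumer in current null-safe Dart. The CollectingConsumer class and the runPipe helper are hypothetical names invented for this illustration and are not part of the Dart library:

```dart
import 'dart:async';

// Hypothetical consumer that stores every event it receives.
class CollectingConsumer implements StreamConsumer<int> {
  final List<int> received = [];
  bool closed = false;

  @override
  Future<void> addStream(Stream<int> stream) {
    // Listen on the source stream. The returned future completes
    // when the source stream fires its done event.
    return stream.forEach(received.add);
  }

  @override
  Future<void> close() async {
    // No further streams will be added after this call.
    closed = true;
  }
}

// Hypothetical helper: pipes a small stream into the consumer.
Future<CollectingConsumer> runPipe() async {
  final consumer = CollectingConsumer();
  // pipe calls addStream and then close on the consumer.
  await Stream<int>.fromIterable([1, 2, 3]).pipe(consumer);
  return consumer;
}

Future<void> main() async {
  final consumer = await runPipe();
  print(consumer.received); // [1, 2, 3]
  print(consumer.closed); // true
}
```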

What does the StreamSink class do?

The StreamSink class combines methods from StreamConsumer and EventSink. You should know that methods from both the classes will block each other. We cannot send the data or error events via the methods of the EventSink class while we are adding the source stream via the addStream method from StreamConsumer. We can start using the methods from EventSink only after the Future instance returned by the addStream method is completed with a value. Also, the addStream method will be delayed until the underlying system consumes the data added by the EventSink methods. The StreamSink class has a done getter that returns a Future instance that is completed when the StreamSink instance is finished with one of the following conditions:

  • It is completed with an error as a result of adding events in one of the add, addError, or close methods of the EventSink class
  • It is completed with success when all the events have been processed and the sink has been closed or the sink has been stopped from handling more events
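The interplay of the EventSink and StreamConsumer methods can be sketched with the sink of a StreamController, which is a StreamSink. This is a minimal illustration in current null-safe Dart, and the runSink helper is a hypothetical name:

```dart
import 'dart:async';

// Hypothetical helper: mixes add and addStream calls on one sink.
Future<List<int>> runSink() async {
  final controller = StreamController<int>();
  final received = <int>[];
  controller.stream.listen(received.add);

  final StreamSink<int> sink = controller.sink;
  sink.add(1);
  // Wait until the added stream is consumed before calling add again,
  // because the EventSink methods are blocked in the meantime.
  await sink.addStream(Stream<int>.fromIterable([2, 3]));
  sink.add(4);
  await sink.close();
  // done completes once the sink has finished handling all events.
  await sink.done;
  return received;
}

Future<void> main() async {
  print(await runSink()); // [1, 2, 3, 4]
}
```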

Transforming streams with the StreamTransformer class

The StreamTransformer class helps you create a new consumer stream that is bound to the original one via the bind method. The StreamTransformer class can be instantiated through two factory constructors that define different strategies on how the transformation will happen. In the first factory constructor, we need to specify a special transformer function, as follows:

const factory StreamTransformer(
  StreamSubscription<T> transformer(Stream<S> stream, 
    bool cancelOnError))

The transformer function receives the bound stream as an argument and returns an instance of the StreamSubscription class. If you are planning to implement your own stream transformer function, it will look like this:

import 'dart:async';

void main() {
  // some sample data
  var data = [1,2,3,4,5]; 
  // create the stream
  var stream = new Stream<num>.fromIterable(data); 
  // Create StreamTransformer with transformer closure
  var streamTransformer = 
    new StreamTransformer<num, num>(doublingTransformer);
  // Bound streams
  var boundStream = stream.transform(streamTransformer);
  // Because we start listening the 'bound' stream the 
  // 'listen' method invokes the 'doublingTransformer' 
  // closure
  boundStream.listen((data) {
    print('$data');
  });
}

StreamSubscription doublingTransformer(Stream<num> input, 
  bool cancelOnError) {

  StreamController<num> controller;
  StreamSubscription<num> subscription;
  controller = new StreamController<num>(
    onListen: () {
      subscription = input.listen((data) {
          // Scale the data double.
          controller.add(data * 2);
        },
        onError: controller.addError,
        onDone: controller.close,
        cancelOnError: cancelOnError);
    });
  return controller.stream.listen(null);
}

The preceding code generates the following output:

2
4
6
8
10

The other factory method creates a StreamTransformer class that delegates events to the special functions, which handle the data, error, and done events, as shown in the following code:

factory StreamTransformer.fromHandlers({
      void handleData(S data, EventSink<T> sink),
      void handleError(Object error, StackTrace stackTrace, 
        EventSink<T> sink),
      void handleDone(EventSink<T> sink)})

The changed version of the previous example is as follows:

import 'dart:async';

void main() {
  // some sample data
  var data = [1,2,3,4,5]; 
  // create the stream
  var stream = new Stream<num>.fromIterable(data); 
  // Create StreamTransformer with handler functions
  var streamTransformer = new StreamTransformer<num, num>
    .fromHandlers(
      handleData:handleData, 
      handleError:handleError, 
      handleDone:handleDone);
  // Bound streams
  var boundStream = stream.transform(streamTransformer);
  // Because we start listening the 'bound' stream the 
  // 'listen' method invokes the 'handleData' function
  boundStream.listen((data) {
    print('$data');
  });
}

handleData(num data, EventSink<num> sink) {
  sink.add(data * 2);
}

handleError(Object error, StackTrace stackTrace, EventSink<num> sink) {
  sink.addError(error, stackTrace);
}

handleDone(EventSink<num> sink) {
  sink.close();
}

The result of this execution is the same as the previous one:

2
4
6
8
10

Traverse streams with StreamIterator

The StreamIterator class permits a stream to be read using the iterator operations. It has the moveNext method that waits for the next stream's value to become available and returns the Future value of the bool type, as follows:

Future<bool> moveNext();

If the moveNext method succeeds, the Future instance completes with the true value; otherwise, it completes with false, which means that the iteration is done and no new value will be available. The current value of the stream is available via the current property of the StreamIterator instance, as shown in the following code:

T get current;

This value is valid when the Future instance returned by the moveNext method completes with the true value and only until the next iteration. The StreamIterator class is an abstract class and can be instantiated only via the factory constructor, as follows:

factory StreamIterator(Stream<T> stream)

Let's change the example from the previous topic to use StreamIterator. We will create a simple stream from Iterable as we did before. Then, we will create an instance of StreamIterator. Finally, we will use the forEach function to iterate over the stream and call the closure function to print scaled elements of the stream, as shown in the following code:

import 'dart:async';

main() {
  // some sample data
  var data = [1,2,3,4,5]; 
  // create the stream
  var stream = new Stream<num>.fromIterable(data);
  // Create an iterator
  var iterator = new StreamIterator(stream);
  // Iterate over all elements of iterator and print values
  forEach(iterator, (value) {
    // Scale the data double.
    print(value * 2);
  });
}

Actually, this code looks similar to the ones where we used iterators. All the magic happens inside the forEach function, as shown in the following code:

forEach(StreamIterator iterator, f(element)) {
  return Future.doWhile(() {
    Future future = iterator.moveNext();
    future.then((bool hasNext) {
      if (hasNext) {
        f(iterator.current);
      }
    });
    return future;
  });
}

As the moveNext method returns a Future value, we need to use the doWhile method of the Future class to perform the iteration. The Future instance completes with a Boolean value that we receive as the hasNext parameter. We keep calling the closure function while the value of the hasNext parameter is true.

The code generates the following result:

2
4
6
8
10
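In current versions of Dart, the same traversal can also be written with async and await instead of Future.doWhile. The following is a minimal sketch under that assumption, and the doubled helper is a hypothetical name used only here:

```dart
import 'dart:async';

// Hypothetical helper: reads the stream with a StreamIterator and
// returns every value multiplied by two.
Future<List<num>> doubled(Stream<num> source) async {
  final iterator = StreamIterator<num>(source);
  final result = <num>[];
  try {
    // moveNext completes with true while a new value is available.
    while (await iterator.moveNext()) {
      result.add(iterator.current * 2);
    }
  } finally {
    // Release the underlying subscription even if an error occurs.
    await iterator.cancel();
  }
  return result;
}

Future<void> main() async {
  final stream = Stream<num>.fromIterable([1, 2, 3, 4, 5]);
  print(await doubled(stream)); // [2, 4, 6, 8, 10]
}
```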