Filtering streams

As we saw in the preceding section, it is possible to filter a stream of events and, from it produce a new stream of events. You might be familiar with being able to filter items in an array. ES5 introduced a number of new operators for arrays such as filter and some. The first of these produces a new array containing only elements which match the rule in the filter. Some is a similar function which simply returns true if any element of the array matches. These same sorts of functions are also supported on streams as well as functions you might be familiar with from functional languages such as First and Last. In addition to the functions which would make sense for arrays, there are a number of time series based functions which make much more sense when you consider that streams exist in time.

We've already seen debounce which is an example of a time based filter. Another very simple application of debounce is to prevent the annoying bug of users double-clicking a submit button. Consider how much simpler the code for that is using a stream:

Rx.Observable.FromEvent(button, "click")
.debounce(1000).subscribe((x)=>doSomething());

You might also find it that functions like Sample – which generates a set of events from a time window. This is a very handy function when we're dealing with observables which may produce a large number of events. Consider an example from our example world of Westeros.

Unfortunately, Westeros is quite a violent place where people seem to die in unpleasant ways. So many people die that we can't possibly keep an eye on each one so we'd like to just sample the data and gather a few causes of death.

To simulate this incoming stream, we will start with an array, something like the following:

var deaths = [
  {
    Name:"Stannis",
    Cause: "Cold"
  },
  {
    Name: "Tyrion",
    Cause: "Stabbing"
  },
…
}

Tip

You can see we're using an array to simulate a stream of events. This can be done with any stream and is a remarkably easy way to perform testing on complex code. You can build a stream of events in an array and then publish them with appropriate delays giving an accurate representation of anything from a stream of events from the filesystem to user interactions.

Now we need to make our array into a stream of events. Fortunately, there are some shortcuts for doing that using the from method. This will simply return a stream which is immediately executed. What we'd like is to pretend we have a regularly distributed stream of events or, in our rather morbid case, deaths. This can be done by using two methods from RxJS: interval and zip. interval creates a stream of events at a regular interval. zip matches up pairs of events from two streams. Together these two methods will emit a new stream of events at a regular interval:

function generateDeathsStream(deaths) {
  return Rx.Observable.from(deaths).zip(Rx.Observable.interval(500), (death,_)=>death);
}

In this code we zip together the deaths array with an interval stream which fires every 500ms. Because we're not super interested in the interval event we simply discard it and project the item from the array onwards.

Now we can sample this stream by simply taking a sample and then subscribing to it. Here we're sampling every 1500ms:

generateDeathsStream(deaths).sample(1500).subscribe((item) => { /*do something */ });

You can have as many subscribers to a stream as you like so if you wanted to perform some sampling, as well as perhaps some aggregate functions like simply counting the events, you could do so by having several subscribers:

Var counter = 0;
generateDeathsStream(deaths).subscribe((item) => { counter++ });
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.217.5.86