Stream in a stream

So far, we have looked at operators that change the values being emitted. Streams have another aspect: what if you need to create a new stream from an existing one? And when does such a situation usually occur? There are plenty of cases, such as:

  • Based on a stream of keyUp events, do an AJAX call.
  • Count the number of clicks and determine whether the user single, double, or triple-clicked.

You get the idea; we are starting with one type of stream that needs to turn into another type of stream.

Let's first have a look at creating a stream and see what happens when we try to create a stream as the result of using an operator:

let stream$ = Rx.Observable.of(1,2,3)
  .map(data => Rx.Observable.of(data));

// Observable, Observable, Observable
stream$.subscribe(data => console.log(data));

At this point, every value that passes through the map() operator produces a new Observable. When you subscribe to stream$, each value that is emitted will be a stream. Your first instinct might be to attach a subscribe() to each of those values, like this:

let stream$ = Rx.Observable
  .of(1,2,3)
  .map(data => Rx.Observable.of(data));

stream$.subscribe(data => {
  data.subscribe(val => console.log(val));
});

// 1, 2, 3

Fight this urge. It only produces code that is hard to maintain. What you want is to merge all these streams into one, so that you need just one subscribe(). There is an operator just for that, called flatMap(). It takes your stream of streams, also called a metastream, and flattens it into a single stream of values.

It is used in the following way:

let stream$ = Rx.Observable.of(1,2,3)
  .flatMap(data => Rx.Observable.of(data));

stream$.subscribe(data => {
  // data is now a plain value, not an Observable
  console.log(data);
});

// 1, 2, 3
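A rough analogy with plain arrays may help here. The following is not RxJS; it only borrows JavaScript's built-in Array.prototype.map() and Array.prototype.flatMap() to show the difference between a collection of collections and a flattened collection:

```javascript
// Analogy only: arrays stand in for streams here.
// map() wraps every value, so we end up with an array of arrays.
const nested = [1, 2, 3].map(n => [n]);

// flatMap() wraps every value and then flattens one level.
const flat = [1, 2, 3].flatMap(n => [n]);

console.log(nested); // [ [ 1 ], [ 2 ], [ 3 ] ]
console.log(flat);   // [ 1, 2, 3 ]
```

The analogy stops at the shape of the result: arrays are flattened all at once, whereas an Observable-based flatMap() emits the inner values over time as they arrive.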

OK, we get it: we don't want a stream of Observables, but rather a stream of values. This operator seems really great, but we still aren't quite certain when to use it. Let's make this a bit more realistic. Imagine you have a UI that consists of an input field into which the user types characters. You want to react to one or more characters being entered by, for example, performing an AJAX request. We focus on two things here: how to collect the characters being entered and how to perform an AJAX request.

Let's start with the first thing, capturing characters entered into an input field. For this, we need an HTML page and a JavaScript file. Let's start with the HTML page:

<html>
<body>
<input id="input" type="text">
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
<script src="app.js"></script>
</body>
</html>

This gives us our input element, a script reference to RxJS, and a reference to the app.js file. Next comes the app.js file, where we get a reference to the input element and start listening to keystrokes as soon as they are entered:

let elem = document.getElementById('input');
let keyStream$ = Rx.Observable
  .fromEvent(elem, 'keyup')
  .map(ev => ev.key);

keyStream$.subscribe( key => console.log(key));

// emits entered key chars

Worth highlighting is that we start listening to keyup events by calling the fromEvent() creation operator. Thereafter, we apply the map() operator to dig out the character value stored on ev.key. Lastly, we subscribe to the stream. As expected, running this code prints each character to the console as soon as you type it into the input field on the HTML page.
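To make the two steps less magical, here is a minimal sketch in plain JavaScript of what listening and mapping amount to. The keysFrom() and press() helpers are hypothetical names invented for this illustration and are not how RxJS is implemented; an EventTarget stands in for the input element so that the sketch also runs outside a browser:

```javascript
// Hypothetical helper: listen for 'keyup' and hand only ev.key to the callback.
function keysFrom(target, onKey) {
  const listener = ev => onKey(ev.key);            // the map() step
  target.addEventListener('keyup', listener);      // the fromEvent() step
  // The returned function plays the role of unsubscribe().
  return () => target.removeEventListener('keyup', listener);
}

const target = new EventTarget(); // stands in for the <input> element
const seen = [];
const stop = keysFrom(target, key => seen.push(key));

// Hypothetical helper that simulates a key press.
function press(key) {
  const ev = new Event('keyup');
  ev.key = key; // a real KeyboardEvent already carries this property
  target.dispatchEvent(ev);
}

press('a');
press('b');
stop();      // stop listening
press('c');  // no longer observed

console.log(seen); // [ 'a', 'b' ]
```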

Let's make this more tangible by doing an AJAX request based on what we type. For this, we will be using the fetch() API and an online API called swapi (swapi.co), which exposes a collection of endpoints with information on the Star Wars movies. Let's first define our AJAX call and then see how it fits into our existing stream of keys.

We said we would use fetch(). It lets us formulate a GET request as simply as this:

fetch('https://swapi.co/api/people/1')
  .then(data => data.json())
  .then(data => console.log('data', data));

Of course, we want to turn this request into an Observable so that it can play well with our keyStream$. Fortunately for us, this is easily accomplished through the use of the from() operator. First, however, let's rewrite our fetch() call as a function that's easy to work with. Note that, despite its name, the function returns a Promise; from() will convert it into a stream later. The result of the rewrite looks like this:

function getStarwarsCharacterStream(id) {
  return fetch('https://swapi.co/api/people/' + id)
    .then(data => data.json());
}

This code allows us to provide an argument used to construct a URL which we use to fetch some data with AJAX. At this point, we are ready to connect our function to our existing stream. We do that by typing the following:

let keyStream$ = Rx.Observable.fromEvent(elem, 'keyup')
  .map(ev => ev.key)
  .filter(key => key !== 'Backspace')
  .flatMap(key =>
    Rx.Observable
      .from(getStarwarsCharacterStream(key))
  );

Note the use of the flatMap() operator together with our from() conversion operator. Inside flatMap(), we call getStarwarsCharacterStream() with the key that was entered, and from() converts the Promise it returns into a stream.
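Conceptually, converting a Promise into a stream means emitting the resolved value once and then completing. The following plain JavaScript sketch illustrates that idea under stated assumptions: fromPromiseSketch() is a hypothetical helper written for this illustration, not the RxJS implementation, and the resolved object is placeholder data rather than a real API response:

```javascript
// Hypothetical helper: a bare-bones Promise-to-stream adapter.
function fromPromiseSketch(promise) {
  return {
    subscribe(next, error, complete) {
      promise.then(
        value => {
          next(value);               // emit the single resolved value
          if (complete) complete();  // then signal completion
        },
        err => {
          if (error) error(err);     // a rejection becomes an error notification
        }
      );
    }
  };
}

const log = [];
const finished = new Promise(resolve => {
  // Placeholder data standing in for a parsed JSON response.
  fromPromiseSketch(Promise.resolve({ name: 'placeholder' }))
    .subscribe(
      value => log.push('next: ' + value.name),
      err => log.push('error: ' + err),
      () => { log.push('complete'); resolve(); }
    );
});

finished.then(() => console.log(log)); // [ 'next: placeholder', 'complete' ]
```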

Here, we have learned how to connect two different streams, but also how to convert a Promise into a stream. As good as this approach seems on paper, using flatMap() has its limitations and it is important to understand what they are. For that reason, let's talk about the switchMap() operator next. The benefits of using a switchMap() operator will become clearer when we execute long-running tasks. For argument's sake, let's define such a task, like so:

function longRunningTask(input) {
  return new Promise(resolve => {
    setTimeout(() => {
      resolve('response based on ' + input);
    }, 5000);
  });
}

In this code, we have a function that returns a Promise which takes 5 seconds to resolve; enough time to show the point we are trying to make. Next, let's see what happens if we keep using the flatMap() operator in the following code:

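// Start from the raw keyup events: keyStream$ has already been mapped
// to keys, so mapping it with ev => ev.key again would yield undefined.
let longRunningStream$ = Rx.Observable.fromEvent(elem, 'keyup')
  .map(ev => ev.key)
  .filter(key => elem.value.length > 3)
  .filter(key => key !== 'Backspace')
  .flatMap(key =>
    Rx.Observable
      .from(longRunningTask(elem.value))
  );

longRunningStream$.subscribe(data => console.log(data));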

The preceding code works in the following way: every time we hit a key, it generates an event. However, we have a filter() operator in place, filter(key => elem.value.length > 3), that lets an event through only when the input field contains at least four characters. Let's talk about the user's expectation at this point. If a user enters keys in an input control, they most likely expect a request to be made when they are done typing. To the user, being done means having entered a few characters, including the chance to remove any that were mistyped. We can therefore assume the following input sequence:

// enters abcde
abcde
// removes 'e'

At this point, they have entered characters and, within a reasonable amount of time, edited their answer. The user expects to receive an answer based on abcd. Using the flatMap() operator, however, means the user will get two answers back because, in reality, requests were made for both abcde and abcd. Imagine we get a results list based on these two inputs; it would most likely be two lists that looked somewhat different. With our code, each response is logged to the console as a line of the form response based on <input>.

Our code would most likely be able to handle this situation by rerendering the results list as soon as a new response arrives. There are two problems with this, though. First, we make an unnecessary network request for abcde. Second, if the backend responds fast enough, we will see flickering in the UI as the results list is rendered once and then, shortly after, rendered again based on the second response. This is not good; we want the first request to be abandoned if we keep on typing. This is where the switchMap() operator comes in. It does exactly that. Let's therefore alter the preceding code to the following:

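// As before, start from the raw keyup events rather than from keyStream$,
// which has already been mapped to keys.
let longRunningStream$ = Rx.Observable.fromEvent(elem, 'keyup')
  .map(ev => ev.key)
  .filter(key => elem.value.length > 3)
  .filter(key => key !== 'Backspace')
  .switchMap(key =>
    Rx.Observable
      .from(longRunningTask(elem.value))
  );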

In this code, we simply switched our flatMap() to a switchMap(). When we now execute the code in exactly the same way, that is, the user first types abcde and shortly afterwards changes it to abcd, only a single response is logged.

The reason for this is that switchMap() unsubscribes from the previous inner stream as soon as a new event arrives, so the earlier result is discarded. Note that a Promise itself cannot be cancelled: the earlier task still runs to completion, and only its response is ignored. Either way, switchMap() is doing its magic. The user is happy and we are happy.
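The difference between the two operators can be sketched in plain JavaScript without RxJS. The helpers below (slowTask(), deliverAll(), and deliverLatestOnly()) are hypothetical names made up for this illustration, and the 20 ms delay is an arbitrary stand-in for the 5 seconds used earlier. This is a simplified model of the behavior, not how RxJS implements it:

```javascript
// Hypothetical helper: simulates a slow backend call.
function slowTask(input, delayMs) {
  return new Promise(resolve =>
    setTimeout(() => resolve('response based on ' + input), delayMs));
}

// flatMap()-like behavior: every call's result is delivered.
function deliverAll(task, onResult) {
  return input => task(input).then(onResult);
}

// switchMap()-like behavior: only the most recent call's result is delivered.
function deliverLatestOnly(task, onResult) {
  let latestCall = 0;
  return input => {
    const thisCall = ++latestCall;
    return task(input).then(result => {
      // A newer call has started in the meantime: drop this stale result.
      if (thisCall === latestCall) onResult(result);
    });
  };
}

const merged = [];
const switched = [];
const runMerged = deliverAll(input => slowTask(input, 20), r => merged.push(r));
const runSwitched = deliverLatestOnly(input => slowTask(input, 20), r => switched.push(r));

// The user types abcde and quickly corrects it to abcd.
const done = Promise.all([
  runMerged('abcde'), runMerged('abcd'),
  runSwitched('abcde'), runSwitched('abcd'),
]).then(() => {
  console.log(merged.length); // 2: both responses were delivered
  console.log(switched);      // [ 'response based on abcd' ]
});
```

As with the Promise-based code above, the stale task in this sketch still runs; what changes is that its result never reaches the consumer.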
