Using Transformation to track execution time

Another useful example for the ObservableTransformer is tracking the emission times of the items produced by the Observable. In other words, how much time has passed since the start when a given item was emitted.

Again, we will use a very similar approach as before. We will create a class, called TimingObservableTransformer:

package packt.reactivestocks;

import android.util.Pair;

import java.util.Date;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;

public class TimingObservableTransformer<R> implements ObservableTransformer<R, R> {

    // Called for every item with the seconds elapsed since the start.
    private final Consumer<Long> timerAction;

    public TimingObservableTransformer(Consumer<Long> timerAction) {
        this.timerAction = timerAction;
    }

}

Here, we have added a field of the Consumer<Long> type. We will use it as a handler: it is invoked each time an item is emitted and receives the time elapsed since the start.

To do this, we use Observable.combineLatest() to record the time at the very beginning and then pair it with each original item from the upstream Observable. It might sound confusing, but the actual implementation is quite simple:

@Override
public ObservableSource<R> apply(Observable<R> upstream) {
    return Observable.combineLatest(
            Observable.just(new Date()),
            upstream,
            Pair::create
    )
            .doOnNext((pair) -> {
                Date currentTime = new Date();
                long diff = currentTime.getTime() - pair.first.getTime();
                long diffSeconds = diff / 1000;

                timerAction.accept(diffSeconds);
            })
            .map(pair -> pair.second);
}

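Let's break this down piece by piece. The following part captures the starting time and pairs it with every item produced by the original upstream Observable, using the Pair class from android.util. Note that new Date() is evaluated when apply() runs, that is, when compose() is called; in the usage shown later, the subscription follows immediately, so the difference is negligible: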

Observable.combineLatest(
        Observable.just(new Date()),
        upstream,
        Pair::create
)
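
One subtlety in the snippet above is when new Date() actually runs. It is an ordinary argument expression, so it is evaluated once, when the source is created, and not again for each subscriber. The following plain-Java sketch models that difference without RxJava. EagerSource, LazySource, CaptureTimingDemo, and the fake clock are hypothetical names used only for illustration; they are not part of RxJava or of the book's project.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for Observable.just(value): the value is captured
// once, at the moment the source is created.
final class EagerSource {
    private final long value;

    EagerSource(long value) {
        this.value = value;
    }

    long subscribe() {
        return value; // every subscriber sees the same captured value
    }
}

// Hypothetical stand-in for a deferred source: the supplier runs again
// on every subscription.
final class LazySource {
    private final LongSupplier supplier;

    LazySource(LongSupplier supplier) {
        this.supplier = supplier;
    }

    long subscribe() {
        return supplier.getAsLong();
    }
}

final class CaptureTimingDemo {
    static long[] run() {
        // Fake clock that advances by 1000 ms on every read, so the
        // outcome is deterministic.
        long[] now = {0};
        LongSupplier clock = () -> now[0] += 1000;

        EagerSource eager = new EagerSource(clock.getAsLong()); // clock read here
        LazySource lazy = new LazySource(clock);                 // no read yet

        return new long[] {
            eager.subscribe(), eager.subscribe(), // same value twice
            lazy.subscribe(), lazy.subscribe()    // a fresh read each time
        };
    }
}
```

If a start time per subscription is needed, RxJava 2 offers deferred sources such as Observable.fromCallable(Date::new) or Observable.defer(), which play the role of LazySource in this model.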

Next, the following block receives the pair and calculates the time difference in seconds between the initial time (pair.first) and the current time:

.doOnNext((pair) -> {
    Date currentTime = new Date();
    long diff = currentTime.getTime() - pair.first.getTime();
    long diffSeconds = diff / 1000;

    timerAction.accept(diffSeconds);
})

The calculation itself is done with these two lines:

long diff = currentTime.getTime() - pair.first.getTime();
long diffSeconds = diff / 1000;

Then, the time is passed to the timerAction Consumer and some action that we will provide is executed.
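
To see what this arithmetic produces, here is a small self-contained sketch in plain Java. ElapsedTime and its two methods are hypothetical helper names, not part of the book's project. The point it illustrates is that the division is an integer division, so any fraction of a second is dropped rather than rounded.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

final class ElapsedTime {
    // Same arithmetic as in the transformer: millisecond difference,
    // then integer division, which drops any fraction of a second.
    static long elapsedSeconds(Date start, Date current) {
        long diff = current.getTime() - start.getTime();
        return diff / 1000;
    }

    // Equivalent conversion using TimeUnit, which also truncates.
    static long elapsedSecondsWithTimeUnit(Date start, Date current) {
        long diff = current.getTime() - start.getTime();
        return TimeUnit.MILLISECONDS.toSeconds(diff);
    }
}
```

For instance, a start of 1,000 ms and a current time of 5,999 ms give a difference of 4,999 ms, which both methods report as 4 seconds.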

Finally, the following makes the Observable return the original value of the upstream Observable, without any date-related data, so that it can be used in the same way as before the Transformer was applied:

.map(pair -> pair.second);

Again, we will add a static factory method to make the instantiation easier:

public static <R> TimingObservableTransformer<R> timeItems(Consumer<Long> timerAction) {
    return new TimingObservableTransformer<>(timerAction);
}

At the end, the Transformer is used as shown:

Observable.interval(4, TimeUnit.SECONDS)
        .compose(timeItems((seconds) -> {
            Log.d("APP", "Seconds passed since the start: " + seconds);
        }))
        .subscribe(this::log);

After this code is executed, we will find the following lines in the logs:

APP: Seconds passed since the start: 4
APP: Seconds passed since the start: 8
APP: Seconds passed since the start: 12
APP: Seconds passed since the start: 16

It is worth pointing out that we could have taken a different route: record the instant of subscription in .doOnSubscribe() and then compute the elapsed time when the Observable terminates (or in .doOnNext() for each item), as in the following:

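Date startDate;
...
return upstream
        .doOnSubscribe((disposable) -> {
            // Side effect: writes to a field shared by all subscriptions.
            this.startDate = new Date();
        })
        .doOnTerminate(() -> {
            // Compute the elapsed time from startDate here.
        });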

The problem with this approach is that it needs a shared field in the TimingObservableTransformer class, and every subscription to the Observable mutates that field as a side effect. This means that the transformer instance can't be safely reused across different subscriptions to the same Observable. Consider the given example, assuming timeItems() were implemented with the shared field:

final Observable<Long> observable = Observable.interval(4, TimeUnit.SECONDS)
        .compose(timeItems((seconds) -> {
            Log.d("APP", "Seconds passed since the start: " + seconds);
        }));

observable.subscribe(this::log);
observable.subscribe(this::log);

With the field-based implementation, this will break: the two subscriptions (and thus the items of two separate executions) end up sharing the same variable, so the start date is overwritten by the later subscription and the reported times are simply wrong.
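
The overwrite can be reproduced without RxJava. The sketch below is a plain-Java model of the problem, under the assumption that timestamps are passed in explicitly so the outcome is deterministic. SharedStateTimer, PerSubscriptionTimer, Timing, and SharedStateDemo are hypothetical names introduced only for this illustration.

```java
// Hypothetical model of a transformer that keeps the start time in a field.
final class SharedStateTimer {
    private long startMillis; // shared by every subscription

    void onSubscribe(long nowMillis) {
        startMillis = nowMillis; // a later subscription overwrites this
    }

    long elapsedSeconds(long nowMillis) {
        return (nowMillis - startMillis) / 1000;
    }
}

// Hypothetical alternative: each subscription gets its own immutable state.
final class PerSubscriptionTimer {
    static final class Timing {
        private final long startMillis;

        Timing(long startMillis) {
            this.startMillis = startMillis;
        }

        long elapsedSeconds(long nowMillis) {
            return (nowMillis - startMillis) / 1000;
        }
    }

    Timing onSubscribe(long nowMillis) {
        return new Timing(nowMillis);
    }
}

final class SharedStateDemo {
    static long[] run() {
        // Shared field: the second subscription at t = 3 s overwrites
        // the start recorded by the first one at t = 0 s.
        SharedStateTimer shared = new SharedStateTimer();
        shared.onSubscribe(0);
        shared.onSubscribe(3000);
        long sharedFirst = shared.elapsedSeconds(4000); // first subscriber's item at t = 4 s

        // Per-subscription state: the first subscriber keeps its own start.
        PerSubscriptionTimer isolated = new PerSubscriptionTimer();
        PerSubscriptionTimer.Timing first = isolated.onSubscribe(0);
        isolated.onSubscribe(3000);
        long isolatedFirst = first.elapsedSeconds(4000);

        return new long[] { sharedFirst, isolatedFirst };
    }
}
```

In this model the shared-field version reports 1 second for the first subscriber's item, although 4 seconds have passed since it subscribed, while the per-subscription version reports 4. Carrying the start time alongside each item, as the Pair-based implementation does, avoids the mutable field altogether.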
