SubscribeOn and ObserveOn

We learned how to run a task on a Scheduler. But how can we use this opportunity to work with Observables? RxJava provides a subscribeOn() method that can be used with every Observable object. The subscribeOn() method takes Scheduler as a parameter and takes care of executing the Observable call on that Scheduler.

For a real-world example, we will tune up our loadList() function. First of all, we need a new getApps() method to retrieve our installed apps list:

private Observable<AppInfo> getApps() {
return Observable
            .create(subscriber -> {
                List<AppInfo> apps = new ArrayList<>();

                SharedPreferences sharedPref = getActivity().getPreferences(Context.MODE_PRIVATE);
                Type appInfoType = new TypeToken<List<AppInfo>>() {
                }.getType();
                String serializedApps = sharedPref.getString("APPS", "");
if (!"".equals(serializedApps)) {
                    apps = new Gson().fromJson(serializedApps,  appInfoType);
                }

for (AppInfo app : apps) {
                    subscriber.onNext(app);
                }
                subscriber.onCompleted();
            });
}

The getApps() method returns an Observable of AppInfo. It basically reads our installed apps list from Android's SharedPreferences, desterilizes it, and starts emitting AppInfo items one by one. Using this new method to retrieve our list, loadList() needs to be changed like this:

private void loadList() {
mRecyclerView.setVisibility(View.VISIBLE);

    getApps()
            .subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
mSwipeRefreshLayout.setRefreshing(false);
                    Toast.makeText(getActivity(), "Here is the  list!", Toast.LENGTH_LONG).show();
                }

@Override
public void onError(Throwable e) {
                    Toast.makeText(getActivity(), "Something went  wrong!", Toast.LENGTH_SHORT).show();
mSwipeRefreshLayout.setRefreshing(false);
                }

@Override
public void onNext(AppInfo appInfo) {
mAddedApps.add(appInfo);
mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
                }
            });
}

If we run this code, StrictMode will report a violation because SharedPreferences are typically slow I/O operations. What we need to do is to specify getApps() needs to be executed on which Scheduler:

getApps()
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<AppInfo>() { […]

Schedulers.io() will get rid of the StrictMode violation, but our app is now poorly crashing because of this:

            at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:58)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:422)
            at java.util.concurrent.FutureTask.run(FutureTask.java:237)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
            at java.lang.Thread.run(Thread.java:841)
     Caused by: android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.

Only the original thread that created a view hierarchy can touch its views.

We are back to the Android world again. This message is simply telling us that we are trying to modify the UI from a thread that is not the UI thread. It makes sense: we asked to execute the code on the I/O Scheduler. So, basically, we need to execute the code with the I/O Scheduler, but we need to be on the UI thread when the result comes in. RxJava lets you subscribe to a specific Scheduler but also to observe a specific Scheduler. We just need to add a couple of lines to our loadList() function, and every piece will be in place:

getApps()
.onBackpressureBuffer()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(new Observer<AppInfo>() { […]

The observeOn() method will provide the result on a specific Scheduler: the UI thread in our example. The onBackpressureBuffer() method will instruct Observable that if it emits items faster than Observer can consume, it has to store them in a buffer and provide them with the proper timing. After this changes, if we run the app, we have our classic installed apps list:

SubscribeOn and ObserveOn
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.143.239.103