The Observable.create method

At any time, you can create your own custom implementation of Observable with the Observable.create method. This method takes an ObservableOnSubscribe<T>, usually written as a lambda, which receives an instance of the ObservableEmitter<T> interface and uses it to emit values to the observer. Take a look at the following code example:

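// Imports assume RxJava 2.x; in RxJava 3 the packages are 
// io.reactivex.rxjava3.core and io.reactivex.rxjava3.disposables. 
import io.reactivex.Observable 
import io.reactivex.Observer 
import io.reactivex.disposables.Disposable 
 
fun main(args: Array<String>) { 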
 
    val observer: Observer<String> = object : Observer<String> { 
        override fun onComplete() { 
            println("All Completed") 
        } 
 
        override fun onNext(item: String) { 
            println("Next $item") 
        } 
 
        override fun onError(e: Throwable) { 
            println("Error Occurred => ${e.message}") 
        } 
 
        override fun onSubscribe(d: Disposable) { 
            println("New Subscription ") 
        } 
    }//Create Observer 
 
    val observable:Observable<String> = Observable.create<String> {//1 
        it.onNext("Emitted 1") 
        it.onNext("Emitted 2") 
        it.onNext("Emitted 3") 
        it.onNext("Emitted 4") 
        it.onComplete() 
    } 
 
    observable.subscribe(observer) 
 
    val observable2:Observable<String> = Observable.create<String> {//2 
        it.onNext("Emitted 1") 
        it.onNext("Emitted 2") 
        it.onError(Exception("My Exception")) 
    } 
 
    observable2.subscribe(observer) 
} 

First, we created an instance of the Observer interface, as in the previous example. I will not elaborate on the observer value, as we have already seen an overview in the previous example and will cover it in detail later in this chapter. At comment 1, we created an Observable value with the Observable.create method. We emitted four strings from the Observable value with the onNext method, then signaled completion with the onComplete method. At comment 2, we did almost the same, except that instead of calling onComplete, we called onError with a custom Exception instance.

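Here is the output of the program: for each subscription, the observer first prints New Subscription. The first Observable then produces four lines, Next Emitted 1 through Next Emitted 4, followed by All Completed. The second produces Next Emitted 1 and Next Emitted 2, followed by the error line carrying the message My Exception.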

The Observable.create method is useful, especially when you are working with a custom data structure and want control over which values are emitted. You can also emit values to the observer from a different thread.
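
As a rough illustration of the custom data structure case, the following sketch wraps a hand-written linked list in an Observable. The Node class and the observeList function are hypothetical names invented for this example, and the imports assume RxJava 2.x. The traversal code inside the lambda decides what is emitted and when, and it checks isDisposed so that it stops once the observer is no longer interested.

```kotlin
import io.reactivex.Observable

// Hypothetical custom data structure, used only for illustration.
class Node(val value: Int, val next: Node? = null)

// Wraps a traversal of the linked list in an Observable (illustrative helper).
fun observeList(head: Node?): Observable<Int> = Observable.create<Int> { emitter ->
    var current = head
    // Walk the list, stopping early if the observer has disposed.
    while (current != null && !emitter.isDisposed) {
        emitter.onNext(current.value)
        current = current.next
    }
    // Signal completion only if the observer is still subscribed.
    if (!emitter.isDisposed) emitter.onComplete()
}

fun main() {
    val list = Node(1, Node(2, Node(3)))
    observeList(list).subscribe(
        { println("Next $it") },
        { println("Error ${it.message}") },
        { println("Completed") }
    )
}
```

Running main should print Next 1, Next 2, Next 3, and then Completed, one per line.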

The Observable contract (http://reactivex.io/documentation/contract.html) states that Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
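
One simple way to stay within this rule is to issue every notification from a single worker thread, so that the calls are naturally sequential. The sketch below assumes RxJava 2.x and uses the Kotlin standard library's thread function; backgroundSource and the thread name emitter-thread are hypothetical names chosen for this example.

```kotlin
import io.reactivex.Observable
import kotlin.concurrent.thread

// Emits from one dedicated thread, so notifications are issued serially.
fun backgroundSource(): Observable<String> = Observable.create<String> { emitter ->
    thread(name = "emitter-thread") {
        try {
            for (i in 1..3) {
                // Stop emitting if the observer has disposed.
                if (emitter.isDisposed) return@thread
                emitter.onNext("Emitted $i on ${Thread.currentThread().name}")
            }
            emitter.onComplete()
        } catch (e: Exception) {
            emitter.onError(e)
        }
    }
}

fun main() {
    // blockingForEach keeps main alive until the background emissions finish.
    backgroundSource().blockingForEach { println(it) }
}
```

If several threads need to call onNext on the same emitter, a single worker thread is no longer enough to guarantee serial delivery. In that case, ObservableEmitter.serialize() returns an emitter that serializes the calls.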