The start operator

The start operator schedules the execution of a function asynchronously and emits its result in an observable. The marble diagram of the start operator is shown in the following figure:

Figure 4.17: The start operator

Its prototype is as follows:

Observable.start(func, scheduler=None)

The function provided as an argument is scheduled immediately, not during the subscription. If no scheduler is provided, the timeout scheduler is used (see Chapter 5Concurrency and Parallelism in RxPY, for details on schedulers), with a timeout value of 0; that is, no timeout. In this case, the execution of the func function is still done in the context of a timer thread. Let's see this in action, as follows:

def foo():
print("foo from {}".format(threading.get_ident()))
return 1

number = Observable.start(foo)
print("subscribing...")
number.subscribe(
on_next=lambda i: print("on_next: {} from {}".format(
i, threading.get_ident())),
on_error=lambda e: print("error: {}".format(e)),
on_completed=lambda: print("completed")
)

The preceding code sample will provide the following output:

foo from 123145595981824
subscribing...
on_next: 1 from 140736035734400
completed

When the start operator is called, the execution of the foo function is scheduled immediately. The log confirms that it is called before the subscription to the observable on the 123145595981824 thread. Once the subscription is done, the result of foo is received on the observable, in the context of the main thread of the application; that is, 140736035734400.

This behavior is different when the asyncio scheduler is being used. In this case, the foo function is executed from the event loop. An example is as follows:

from rx.concurrency import AsyncIOScheduler

scheduler = AsyncIOScheduler()

def foo():
print("foo from {}".format(threading.get_ident()))
return 2


loop = asyncio.get_event_loop()
done = loop.create_future()

number = Observable.start(foo, scheduler=scheduler)
print("subscribing...")
number.subscribe(
lambda i: print("on_next: {} from {}".format(
i, threading.get_ident())),
lambda e: print("on_error: {}".format(e)),
lambda: done.set_result(0)
)

print("staring mainloop from {}".format(threading.get_ident()))
loop.run_until_complete(done)
loop.close()

In this new example, an AsyncIOScheduler is passed to the start operator, and this changes the way actions are scheduled, as follows:

subscribing...
staring mainloop from 140736035734400
foo from 140736035734400
on_next: 2 from 140736035734400

In this case, the subscription happens first; then the event loop is started, the foo function is called, and finally, the result of foo is emitted on the observable. The foo function is called after the event loop starts. When the observable is created, the execution of foo is scheduled on the event loop. However, since the event loop has not started yet, its execution is pending. Once the event loop starts, foo is executed and its return value is emitted. Also, in this case, both the execution of foo and the reception of the item are done in the context of the main thread of the application; that is, 140736035734400.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.14.131.47