The AsyncIO scheduler

Last but not least, AsyncIOScheduler allows us to schedule the emissions of an observable on an AsyncIO event loop. Obviously, when writing an application based on AsyncIO and ReactiveX, this is the most used scheduler. The prototype of this scheduler is the following one:

 AsyncIOScheduler.__init__(self, loop=None)

The optional loop parameter allows us to specify the event loop to use with the scheduler. The default value uses the current event loop. Using this scheduler is the same as the other ones:

import asyncio
import threading

loop = asyncio.get_event_loop()
asyncio_scheduler = AsyncIOScheduler()
numbers = Observable.from_([1,2,3,4], scheduler=asyncio_scheduler)

subscription = numbers
.map(lambda i: i*2)
.map(lambda i: "number is: {}".format(i))
.subscribe(
on_next = lambda i: print("on_next({}) {}"
.format(threading.get_ident(), i)),
on_error = lambda e: print("on_error({}): {}"
.format(threading.get_ident(), e)),
on_completed = lambda: print("on_completed({})"
.format(threading.get_ident()))
)

print("starting event ")
loop.run_forever()
loop.close()

This sample code gives the following result:

starting event loop
on_next(140736035734400) number is: 2
on_next(140736035734400) number is: 4
on_next(140736035734400) number is: 6
on_next(140736035734400) number is: 8
on_completed(140736035734400)

The important point here is the fact that the items are emitted after the event loop starts, although the subscription has been done before. This is the purpose of AsyncIOScheduler: it ensures that items are emitted in the context of the event loop, and if the event loop is not started yet, then the emission of items is deferred until the event loop starts. If the same code is used without using AsyncIOScheduler, then the output is the following one:

on_next(140736035734400) number is: 2
on_next(140736035734400) number is: 4
on_next(140736035734400) number is: 6
on_next(140736035734400) number is: 8
on_completed(140736035734400)
starting event loop

In this case, all items are processed before the event loop runs. This is an expected behavior because, without any scheduler provided, the subscription is synchronous. All items are already emitted when the subscribe function returns, so do not forget to use AsyncIOScheduler when using an AsyncIO event loop. Otherwise, when the event loop starts, the observables have already completed and nothing happens after that!

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
52.15.74.25