Concurrency and schedulers

Chapter 2Asynchronous Programming in Python, explained the principles of concurrency, and its two categories:

  • I/O concurrency
  • CPU concurrency

An asynchronous framework is designed to deal with I/O concurrency by multiplexing all I/O requests on a single process and thread. The AsyncIO framework and ReactiveX are both tools in this category. As such, they are a perfect fit for applications that are I/O bound, such as network-based applications and tasks involving interactions with databases. However, there are situations where a full asynchronous design cannot be applied. This can occur in two cases:

  • When doing CPU-intensive actions
  • When using blocking APIs

Both of them break the behavior of an asynchronous system because they block the event loop for a very long time, and prevent other tasks from executing. On some asynchronous frameworks, handling such cases is simply not possible. The code must be fully asynchronous and not use CPU-intensive operations or blocking APIs, which is not always possible; so one has to ignore these behaviors and the application will break sooner or later.

Fortunately, AsyncIO and RxPY acknowledge that such cases exist, and they provide tools to deal with them. More than that, ReactiveX and RxPY allow us to deal with these situations once again in an easy way. So what does it mean to handle CPU concurrency in asynchronous code? Since the issue is that the current core is saturated by an action, there are only two solutions. Both of them have been covered in Chapter 2Asynchronous Programming in Python: multithreading and multiprocessing. These are the two ways to use all the available CPU cycles of all cores. ReactiveX and RxPY allow us to use multithreading in an application and provide a way to schedule where each operator will execute. In practice, this means that it is possible to specify, for each operator in an operator chain, on what execution context they will run. In an AsyncIO application, there are two kinds of execution context:

  • The event loop
  • A thread (eventually running its own event loop)

The selection of an execution context is possible in three different ways:

  • When creating an observable
  • By using the subscribe_on operator
  • By using the observe_on operator

These three methods allow us to control the execution context of two parts of an observable chain: the source of the chain and each operator in a chain. The source of the chain is the first observable of the chain, usually created with a factory operator. It is possible to control on what execution context the items of this observable are emitted, either by providing a scheduler object to the factory operator or by using the subscribe_on operator. Consider the following code:

numbers = Observable.from_([1,2,3,4])

subscription = numbers
.map(lambda i: i*2)
.map(lambda i: "number is: {}".format(i))
.subscribe(
on_next = lambda i: print("on_next {}".format(i)),
on_error = lambda e: print("on_error: {}".format(e)),
on_completed = lambda: print("on_completed")
)

The default behavior of RxPY is to consume all the source observable synchronously, at subscription time. So in this example, all items are emitted and transformed with the two map operators within the execution of the subscribe operator. When the subscribe call returns, the numbers observable is completed. The first way to change this is by providing a scheduler object to the from_ operator. This can be done by changing the first line to something like the following:

scheduler = SomeScheduler()
numbers = Observable.from_([1,2,3,4], scheduler=scheduler)

With this code, the items of the numbers observable are not emitted anymore when subscribe is called. Rather, they are scheduled for emission when subscribe is called. Depending on the type of the scheduler, this can mean different things: either emitting the items in the context of another thread or emitting items in the context of the AsyncIO event loop. The same behavior can be applied by using the subscribe_on operator:

scheduler = SomeScheduler()
numbers = Observable.from_([1,2,3,4])

subscription = numbers
.map(lambda i: i*2)
.map(lambda i: "number is: {}".format(i))
.subscribe_on(scheduler)
.subscribe(
on_next = lambda i: print("on_next {}".format(i)),
on_error = lambda e: print("on_error: {}".format(e)),
on_completed = lambda: print("on_completed")
)

Note the addition of the subscribe_on call before the subscription. Also note that there is no scheduler provided to the from_ operator. Calling the subscribe_on operator controls on which execution context the source observable of the chain will emit its items. The subscribe_on operator can be use anywhere in the chain, but only once. This method is more flexible than providing the scheduler when creating the observable. Sometimes the code that creates the observable is not aware of the scheduler to use, but another piece of code that completes the observable chain has this knowledge.

These two methods allow us to control how the source items are emitted. Another operator allows us to specify the execution context that is used for all operators following its call to observe_on:

scheduler = SomeScheduler()
another_scheduler = AnotherScheduler()
yet_another_scheduler = AnotherScheduler()
numbers = Observable.from_([1,2,3,4])

subscription = numbers
.map(lambda i: i*2)
.observe_on(another_scheduler)
.map(lambda i: "number is: {}".format(i))
.subscribe_on(scheduler)
.observe_on(yet_another_scheduler)
.subscribe(
on_next = lambda i: print("on_next {}".format(i)),
on_error = lambda e: print("on_error: {}".format(e)),
on_completed = lambda: print("on_completed")
)

The first map operator is executed in the context of scheduler (as requested with the call to subscribe_on), the second map operator is executed in the context of another_scheduler, and the subscription callbacks are called in the context of yet_another_scheduler. So an observable chain can switch between several execution contexts, simply by calling the observe_on operator several times. Once again, compared to the classic way of executing code in another thread and handling synchronization, ReactiveX provides an easy and elegant solution to a complex topic. The following figure shows marble diagram for the subscribe_on and observe_on operators for this last example:

Figure 5.1: The subscribe_on and observe_on operators

In this figure, the shape of the vertical arrows identifies an execution context. The execution context of the source observable corresponds to the scheduler provided in the subscribe_on call, even though it is called later in the chain. The first map operator runs in this execution context. Then, a first call to observe_on changes this execution context (the empty triangles). The second map operator runs in this second execution context. Finally, observe_on again changes the execution context (the empty diamonds) before the final observable is emitted.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
52.15.214.27