The NewThread scheduler

The NewThread scheduler spawns new threads to emit items. Depending on the operator used to control scheduling, it spawns a new thread either for each subscription or for each emitted item. Its prototype is as follows:

NewThreadScheduler.__init__(self, thread_factory=None)

The optional thread_factory argument can be provided as an alternative way to spawn threads. If no thread factory is provided, the scheduler uses the threading.Thread class to spawn new threads and configures them as daemon threads.
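As a minimal sketch of what such a factory could look like, the function below builds a named daemon thread using only the standard library. The (target, args=None) signature is an assumption modeled on the default behavior described above, and the named_thread_factory name is hypothetical; check the signature against the installed RxPY version before relying on it.

```python
import itertools
import threading

# Hypothetical helper, not part of RxPY. The (target, args=None) signature is
# an assumption about what the scheduler passes to its thread factory.
_counter = itertools.count(1)


def named_thread_factory(target, args=None):
    """Return an unstarted daemon thread with a recognizable name."""
    thread = threading.Thread(
        target=target,
        args=args or (),
        name="rx-worker-{}".format(next(_counter)),
    )
    thread.daemon = True  # same daemon behavior as described for the default
    return thread


# Stand-alone check of the factory, without involving the scheduler.
seen = []
worker = named_thread_factory(
    lambda: seen.append(threading.current_thread().name))
worker.start()
worker.join()
print(seen)
```

If the assumed signature holds, the factory would be passed as NewThreadScheduler(thread_factory=named_thread_factory), which makes the spawned threads easier to spot in logs and debuggers.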

The NewThreadScheduler object can be used as a parameter of a factory operator, or as a parameter of the subscribe_on operator to control source observable emissions:

from rx import Observable
from rx.concurrency import NewThreadScheduler
import threading

new_thread_scheduler = NewThreadScheduler()
numbers = Observable.from_([1,2,3,4], scheduler=new_thread_scheduler)

subscription = (
    numbers
    .map(lambda i: i*2)
    .map(lambda i: "number is: {}".format(i))
    .subscribe(
        on_next=lambda i: print(
            "on_next({}) {}".format(threading.get_ident(), i)),
        on_error=lambda e: print(
            "on_error({}): {}".format(threading.get_ident(), e)),
        on_completed=lambda: print(
            "on_completed({})".format(threading.get_ident()))
    )
)
print("main({})".format(threading.get_ident()))

This example gives the following result:

main(140736035734400)
on_next(123145577897984) number is: 2
on_next(123145577897984) number is: 4
on_next(123145577897984) number is: 6
on_next(123145577897984) number is: 8
on_completed(123145577897984)

The items are all emitted on a dedicated thread (123145577897984), which is different from the main thread of the interpreter (140736035734400). If several subscriptions are made to the numbers observable, then each subscription runs on its own thread.
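The one-thread-per-subscription behavior can be pictured with a small toy model built on the standard library only. This is not how RxPY is implemented; subscribe_on_new_thread is a hypothetical helper that simply emits all the items of one subscription from one freshly spawned thread.

```python
import threading


def subscribe_on_new_thread(items, on_next):
    """Toy model: emit every item of one subscription from one fresh thread."""
    def run():
        for item in items:
            on_next(threading.get_ident(), item)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


first, second = [], []
t1 = subscribe_on_new_thread(
    [1, 2, 3, 4], lambda tid, i: first.append((tid, i)))
t2 = subscribe_on_new_thread(
    [1, 2, 3, 4], lambda tid, i: second.append((tid, i)))
t1.join()
t2.join()

# Count the distinct thread identifiers seen by each subscription.
print(len({tid for tid, _ in first}), len({tid for tid, _ in second}))
```

Each subscription sees a single thread identifier for all of its items, and that identifier is never the one of the calling thread. The two subscriptions are served by two different thread objects, although their numeric identifiers can coincide if the first thread has already terminated when the second one starts.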

Another way to use the NewThreadScheduler object is to schedule each emission of an item on a dedicated thread. This is done with the observe_on operator. The following figure shows this behavior:

Figure 5.2: The observe_on operator with NewThreadScheduler

With the observe_on operator, each item is scheduled on a dedicated ephemeral thread. The following code example shows this behavior:

from rx import Observable
from rx.concurrency import NewThreadScheduler
import threading

new_thread_scheduler = NewThreadScheduler()
numbers = Observable.from_([1,2,3,4])

subscription = (
    numbers
    .map(lambda i: i*2)
    .observe_on(new_thread_scheduler)
    .map(lambda i: "number is: {}".format(i))
    .subscribe(
        on_next=lambda i: print(
            "on_next({}) {}".format(threading.get_ident(), i)),
        on_error=lambda e: print(
            "on_error({}): {}".format(threading.get_ident(), e)),
        on_completed=lambda: print(
            "on_completed({})".format(threading.get_ident()))
    )
)
print("main({})".format(threading.get_ident()))

This new example gives the following result:

main(140736035734400)
on_next(123145577897984) number is: 2
on_next(123145583153152) number is: 4
on_next(123145577897984) number is: 6
on_next(123145583153152) number is: 8
on_completed(123145577897984)

With this code, every item emitted after the observe_on call is processed on a dedicated thread. In this sample output, items 2 and 6 share one thread identifier, items 4 and 8 share another, and the completion is reported with the same identifier as items 2 and 6. This happens because the interpreter recycles the identifiers of threads that have terminated. Nevertheless, each item runs on its own dedicated thread, spawned specifically for it.
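Identifier recycling can be observed without RxPY. The following sketch, which uses only the standard library, handles each item on its own short-lived thread and then compares the number of threads spawned with the number of distinct identifiers recorded. The run_each_on_new_thread helper is hypothetical, and joining each thread before starting the next one is a simplification made to keep the items in order; it is not a description of what the scheduler does.

```python
import threading


def run_each_on_new_thread(items):
    """Toy model: handle every item on its own short-lived thread."""
    records = []  # (thread identifier, item) pairs, in emission order
    threads = []
    for item in items:
        thread = threading.Thread(
            target=lambda i=item: records.append((threading.get_ident(), i)))
        thread.start()
        thread.join()  # wait for the thread to end before spawning the next
        threads.append(thread)
    return threads, records


threads, records = run_each_on_new_thread([2, 4, 6, 8])
distinct_idents = {ident for ident, _ in records}

print("threads spawned:", len(threads))
# The number of distinct identifiers may be lower than the number of threads,
# because an identifier can be reused once its thread has terminated.
print("distinct identifiers:", len(distinct_idents))
```

Four thread objects are always created, but how many distinct identifiers appear depends on the platform and the interpreter, so a thread identifier alone is not enough to tell whether two items ran on the same thread.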
