The observable cycle issue

The observable cycle issue is a natural consequence of flow-based programming. Flow-based programs, like ReactiveX programs, are built by defining a directed graph of streams (observables). Nothing constrains the structure of this graph, so the final code may form either a directed acyclic graph or a directed cyclic graph. A few definitions clarify the difference between these graphs:

  • A graph is a structure composed of nodes that are connected to each other by edges
  • A directed graph is a graph whose edges have a direction
  • An acyclic graph is a graph in which it is not possible to start from a node and loop back to it during traversal
  • A cyclic graph is a graph in which, starting from at least one node, it is possible to loop back to that node during traversal
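The definitions above can be checked mechanically. The following is a minimal sketch, assuming the graph is stored as a dictionary that maps each node to the list of nodes its outgoing edges point to. The function name has_cycle and the two sample graphs are illustrative only; they are not taken from the figures in this chapter.

```python
def has_cycle(graph):
    """Return True if the directed graph (node -> list of successors) has a cycle."""
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, fully explored
    state = {node: WHITE for node in graph}

    def visit(node):
        state[node] = GREY
        for successor in graph.get(node, []):
            if state.get(successor, WHITE) == GREY:
                return True  # the edge leads back to a node on the current path
            if state.get(successor, WHITE) == WHITE and visit(successor):
                return True
        state[node] = BLACK
        return False

    return any(state[node] == WHITE and visit(node) for node in graph)


# Hypothetical sample graphs
cyclic = {"A": ["B"], "B": ["C"], "C": ["A"]}
acyclic = {"A": ["B", "C"], "B": ["C"], "C": []}

print(has_cycle(cyclic))   # True
print(has_cycle(acyclic))  # False
```

The traversal is a depth-first search: meeting a node that is still on the current path means the traversal has looped back, which is the definition of a cycle given above.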

The following diagram shows simple examples of a directed cyclic graph and a directed acyclic graph:

Figure 3.1: A directed cyclic graph on the left, and a directed acyclic graph on the right

All the ReactiveX examples used so far in this book were directed acyclic graphs of observables. To understand what a directed cyclic graph of observables is, consider two components, A and B. Both components accept an observable as input and return an observable as output. The output observable of component A is the input of component B, and the output of component B is the input of component A. In other words, these components are interdependent. The following figure shows this:

Figure 3.2: A cycle between two components

This situation is very common, although in practice the cycle usually involves more than two components. Let's try to implement it:

from rx import Observable

def component_a(input):
    return input.map(lambda i: i*2)

def component_b(input):
    input.subscribe(
        on_next=lambda i: print("item: {}".format(i)),
        on_error=lambda e: print("error: {}".format(e)),
        on_completed=lambda: print("completed")
    )
    return Observable.from_([1, 2, 3])

b_out = component_b(???)
a_out = component_a(b_out)

The implementation of the two components as two functions is straightforward. Component A is just a wrapper around the map operator and multiplies each item by 2. Component B subscribes to its input to print each item and returns a sequence of integers. The issue occurs when we need to connect both components together. How can we set the input of component B to the output of component A? The naive answer is to pass each output directly to the other component:

b_out = component_b(a_out)
a_out = component_a(b_out)

However, this construct is not possible as written: a_out is used on the first line before it is assigned on the second, so Python raises a NameError, and swapping the two lines only moves the problem to b_out. Something is needed to decouple the output of component A from the input of component B while still being able to connect them together. This can be done with an object that acts as both an observable and an observer. This object can be passed as the input of component B, and it can then subscribe to the output observable of component A. This is exactly the purpose of Subject.
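The idea of an object that is both an observer and an observable can be sketched in plain Python, without any library. The classes FromIterable, Mapped, and MiniSubject below are hypothetical stand-ins written for this illustration. They are not the RxPY API and they ignore errors, completion, and disposal. Component B also records its items in a list instead of printing them, so that the result is easy to inspect.

```python
class FromIterable:
    """Toy cold observable: emits every item to each new subscriber."""
    def __init__(self, items):
        self._items = list(items)

    def subscribe(self, on_next):
        for item in self._items:
            on_next(item)


class Mapped:
    """Toy map operator: applies fn to each item of the source."""
    def __init__(self, source, fn):
        self._source = source
        self._fn = fn

    def subscribe(self, on_next):
        self._source.subscribe(lambda item: on_next(self._fn(item)))


class MiniSubject:
    """Toy subject: an observer (on_next) that is also an observable (subscribe)."""
    def __init__(self):
        self._callbacks = []

    def subscribe(self, on_next):
        self._callbacks.append(on_next)

    def on_next(self, item):
        for callback in list(self._callbacks):
            callback(item)


received = []  # items seen by component B, recorded instead of printed


def component_a(source):
    return Mapped(source, lambda i: i * 2)


def component_b(source):
    source.subscribe(received.append)
    return FromIterable([1, 2, 3])


a_out_proxy = MiniSubject()            # stands in for a_out before it exists
b_out = component_b(a_out_proxy)       # B can now be created first
a_out = component_a(b_out)             # A consumes B's output as before
a_out.subscribe(a_out_proxy.on_next)   # close the cycle: A's items feed the proxy

print(received)  # [2, 4, 6]
```

The proxy is created first, so component B has something to subscribe to before component A exists. The last line closes the cycle by forwarding each item of A's output to the proxy. This particular cycle terminates because B's output does not depend on the items it receives; a cycle in which every received item triggers a new emission would keep running.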
