Adding traces

Synchronous Python code is traditionally debugged with the Python debugger (pdb or ipdb). Unfortunately, this tool is of little use in asynchronous and reactive code. In synchronous code, an exception that is raised without being caught stops the program. From there, the debugger, or simply the printed stack trace, leads very quickly to the source of the error. With AsyncIO and ReactiveX, however, the situation is different:

  • On AsyncIO, an exception propagates up to the event loop. The event loop does not exit: it interrupts the current task and prints the error. The rest of the application continues to run after that.
  • On ReactiveX, an exception is converted to an error event. The observable terminates with this error, and the error is propagated until an operator catches it.

In both cases, most of the time, the root of the error is not available when the call stack is finally dumped. So the debugger is of little help.
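The AsyncIO behavior can be reproduced with the standard library alone. The following is a minimal sketch, assuming Python 3.7 or later; the coroutine names failing_task and healthy_task are hypothetical and only serve the illustration. One task raises an exception while another keeps running to completion:

```python
import asyncio

async def failing_task():
    # Yield control once to the event loop, then fail.
    await asyncio.sleep(0)
    raise ValueError("boom")

async def healthy_task(log):
    # Keeps running while the other task fails.
    for i in range(3):
        await asyncio.sleep(0.01)
        log.append(i)

async def main():
    log = []
    failed = asyncio.create_task(failing_task())
    await healthy_task(log)
    # The exception is stored on the failed task; nothing was raised here.
    return log, failed.exception()

log, error = asyncio.run(main())
print(log)
print(repr(error))
```

The program never stops at the point where the exception is raised. The failure only becomes visible when the task is inspected afterwards, far from the code that caused it.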

This is why adding traces is one of the few tools directly available in RxPY to see what is going on when debugging an issue. This can be done with the do_action operator, described in Chapter 9, Operators in RxPY. The first possible way to use it is by setting all three callbacks in each call, as can be seen in the following example:

    .do_action(
        on_next=lambda i: print("trace: {}".format(i)),
        on_error=lambda e: print("trace: {}".format(e)),
        on_completed=lambda: print("trace: completed")
    )

This snippet, which would be part of an observable chain, prints a trace for each event that occurs on the observable: next, error, and completed. However, using the do_action operator in this way quickly clutters the code, as the number of traces increases. Instead, a specific observer should be provided so that traces can be added in one line.

For this, a custom Observer class is first needed as can be seen in the following example:

    from rx import Observer

    class TraceObservable(Observer):
        def __init__(self, prefix):
            self.prefix = prefix

        def on_next(self, value):
            print("{} - on next: {}".format(self.prefix, value))

        def on_completed(self):
            print("{} - completed".format(self.prefix))

        def on_error(self, error):
            print("{} - on error: {}".format(self.prefix, error))

The previous example declares a new TraceObservable class that inherits from the Observer class. The base constructor of Observer is not called because it is an abstract base class, so it does not implement a constructor. The constructor takes a prefix as input. This prefix is printed on each event received. The three methods (on_next, on_completed, on_error) implement the interface of Observer by printing the received event.

This class can then be used in the following way:

    import json
    from rx import Observable

    # The chain is wrapped in parentheses so that it can span several lines.
    (Observable.just('{ "foo": 2}')
        .do_action(TraceObservable("trace 1"))
        .map(lambda i: json.loads(i))
        .do_action(TraceObservable("trace 2"))
        .subscribe(
            on_next=lambda i: print("on_next: {}".format(i)),
            on_error=lambda e: print("on_error {}".format(e)),
            on_completed=lambda: print("completed")
        ))

In this example, an observable is created from a JSON string. This string is then decoded and the result is printed. Two traces are added: one before the map operator, and another one after. They are prefixed differently. This example gives the following result:

trace 1 - on next: { "foo": 2}
trace 2 - on next: {'foo': 2}
on_next: {'foo': 2}
trace 1 - completed
trace 2 - completed
completed

As everything is fine, both trace points print the item received, and also the final subscription. Then they all complete.

The following example shows an error added to the JSON string with an extra double quote: 

Observable.just('{ "foo": 2"}')

When we do this, parsing will fail. The example now gives the following result:

trace 1 - on next: { "foo": 2"}
trace 2 - on error: Expecting ',' delimiter: line 1 column 11 (char 10)
on_error Expecting ',' delimiter: line 1 column 11 (char 10)

trace 1 prints the item to parse. Then trace 2 catches the error raised by the map operator, and prints it. Since the final subscription also prints errors, it displays the error as well.

The important point here is that, in a more complex example, the final subscription will probably not print the original error (at best, it is logged). It will rather catch it and do an action, or notify another error on another observable. So, without a tracing point, the original error is lost, and finding its origin can be very difficult.

In such cases, several trace points can be added in the application, so that the error can be found with a dichotomy search (yes, this is not a very fancy way to debug code in the 21st century).
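The idea can be illustrated without RxPY. The following sketch is a plain-Python stand-in, not RxPY code: run_pipeline, the stage names, and the "pipeline failed" message are all hypothetical. Each stage is followed by a trace point, and the final handler deliberately reports only a generic failure, as a real subscription might:

```python
import json

def run_pipeline(stages, item, trace):
    """Apply each (name, function) stage in order, tracing after each one."""
    for name, stage in stages:
        try:
            item = stage(item)
        except Exception as error:
            # The trace point keeps the original error...
            trace("{} - on error: {}".format(name, error))
            # ...while the caller only sees a generic failure.
            return "pipeline failed"
        trace("{} - on next: {}".format(name, item))
    return item

traces = []
stages = [
    ("strip", str.strip),
    ("decode", json.loads),
    ("extract", lambda d: d["foo"]),
]
result = run_pipeline(stages, ' { "foo": 2"} ', traces.append)

print(result)
for line in traces:
    print(line)
```

The result alone says nothing about the cause. The traces show that the strip stage succeeded and that the decode stage is the first one to report an error, which bounds the search to that stage.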

Another thing that can be very useful in such situations is printing the call stack of the error, once again to more easily find where the issue comes from. When errors are raised from exceptions, the call stack is available. So it can be printed in the on_error method of the TraceObservable, as can be seen in the following example:

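    import traceback

    # Replacement for the on_error method of TraceObservable
    def on_error(self, error):
        if isinstance(error, Exception):
            # print_tb writes the stack to stderr and returns None
            print("{} - on error: {}, {}".format(
                self.prefix, error,
                traceback.print_tb(error.__traceback__)))
        else:
            print("{} - on error: {}".format(self.prefix, error))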

With this new code, the error is printed in the following way:

trace 1 - on next: { "foo": 2"}
  File "/.../select.py", line 31, in on_next
    result = selector(value, count[0])
  File "/.../utils.py", line 57, in func_wrapped
    ret = fn(*args, **kw)
  File "/.../utils.py", line 46, in func1
    return func(arg1)
  File "rxpy_trace.py", line 26, in <lambda>
    .map(lambda i: json.loads(i))
  File "/.../json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/.../json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/.../json/decoder.py", line 355, in raw_decode
    obj, end = self.scan_once(s, idx)
trace 2 - on error: Expecting ',' delimiter: line 1 column 11 (char 10), None
on_error Expecting ',' delimiter: line 1 column 11 (char 10)

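The full call stack is printed, so we can find where the exception was raised: in the file rxpy_trace.py, line 26. Note that traceback.print_tb writes the stack directly to the standard error stream and returns None. This is why the stack appears before the trace 2 line, and why that line ends with None.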
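If the stack should be part of the trace message itself, the standard library also provides traceback.format_tb, which returns the frames as a list of strings instead of printing them. The following is a minimal sketch of that variant, not the code of the book's script: describe_error is a hypothetical helper, written as a plain function so that it runs without RxPY:

```python
import json
import traceback

def describe_error(prefix, error):
    """Build a trace message, with the call stack when one is available."""
    if isinstance(error, Exception):
        # format_tb returns a list of strings, one per stack frame
        stack = "".join(traceback.format_tb(error.__traceback__))
        return "{} - on error: {}\n{}".format(prefix, error, stack)
    return "{} - on error: {}".format(prefix, error)

try:
    json.loads('{ "foo": 2"}')
except ValueError as e:
    # Keep a reference: the name e is cleared when the except block ends
    caught = e

message = describe_error("trace 2", caught)
print(message)
```

With this approach, the message and its stack are emitted together on the same stream, which can be easier to follow when several trace points are active.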

The full code of this example is available in the GitHub repository (https://github.com/PacktPublishing/Hands-On-Reactive-Programming-with-Python) of this book, in the rxpy_trace.py script.
