Error management

The audio transcode server is now feature-complete. Unfortunately, it is not robust enough for a server that should be able to run for months without issues. This is easy to see: provide a WAV or FLAC file instead of an MP3 file, or send a transcode request while the S3 server is not running. In both cases, an error is raised, and the server stops.

Chapter 4, Exploring Observables and Observers, explained several ways to handle errors. The time has come to see how to apply them in a real application. There are two main ways to handle errors in RxPY: with the on_error handler, or with dedicated operators. The error handler is already used in all custom observables, and this allows errors to be propagated. However, several kinds of error should be handled gracefully. Typically, if an audio-encoding error occurs, then an HTTP error should be returned to the client. The server should not stop working.

So the behavior of the application should be updated as shown in the following diagram. This diagram does not contain the initialization, since it does not change here:

Figure 8.8: Handling errors

In the event of an error, the error must be caught and an HTTP error must be returned to the client, bypassing any other action that would normally be performed. For example, if a transcode error occurs, then the HTTP error response must be sent immediately, without trying to upload anything. In imperative programming, this is usually done with decision statements such as if and else. In ReactiveX, the situation is different: the whole behavior relies on a chain of observables. If an error occurs in one observable, it is propagated and the whole data flow stops.

So, with the current implementation of the drivers, a raised error cannot be handled only for the request that failed. An additional layer of abstraction is needed to do this. Figure 1.8 in Chapter 1, An Introduction to Reactive Programming, shows how higher-order observables carry other observables. This is the way errors can be managed per request. The transcode and S3 drivers must return higher-order observables instead of observables. Instead of sending items with the result of the response, each response is an observable that either emits one item (with the response value) or terminates with an error.
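The difference between the two designs can be tried without RxPY. The following is a minimal sketch, assuming a hypothetical ToyObservable class that only mimics the subscribe, just, and throw behavior of an Rx observable; it is not the RxPY API, and the process function and its "negative requests fail" rule are invented for illustration:

```python
class ToyObservable:
    """Hypothetical, minimal stand-in for an Rx observable (not the RxPY API)."""

    def __init__(self, subscribe):
        self._subscribe = subscribe

    def subscribe(self, on_next, on_error=None, on_completed=None):
        # Missing callbacks default to no-ops.
        self._subscribe(on_next,
                        on_error or (lambda e: None),
                        on_completed or (lambda: None))

    @staticmethod
    def just(value):
        def subscribe(on_next, on_error, on_completed):
            on_next(value)
            on_completed()
        return ToyObservable(subscribe)

    @staticmethod
    def throw(error):
        def subscribe(on_next, on_error, on_completed):
            on_error(error)
        return ToyObservable(subscribe)


def process(request):
    """Pretend work (illustrative assumption): negative requests fail."""
    if request < 0:
        raise ValueError(request)
    return request * 2


def flat_responses(requests):
    """First-order stream: the first failure terminates the whole stream."""
    def subscribe(on_next, on_error, on_completed):
        for request in requests:
            try:
                result = process(request)
            except ValueError as e:
                on_error(e)
                return  # nothing is emitted after on_error
            on_next(result)
        on_completed()
    return ToyObservable(subscribe)


def nested_responses(requests):
    """Higher-order stream: each request gets its own inner observable."""
    def subscribe(on_next, on_error, on_completed):
        for request in requests:
            try:
                inner = ToyObservable.just(process(request))
            except ValueError as e:
                inner = ToyObservable.throw(e)
            on_next(inner)  # the outer stream itself never fails
        on_completed()
    return ToyObservable(subscribe)


flat_log = []
flat_responses([1, -2, 3]).subscribe(
    on_next=lambda i: flat_log.append(('ok', i)),
    on_error=lambda e: flat_log.append(('stream error', e.args[0])))

nested_log = []

def handle_inner(inner):
    inner.subscribe(
        on_next=lambda i: nested_log.append(('ok', i)),
        on_error=lambda e: nested_log.append(('request error', e.args[0])))

nested_responses([1, -2, 3]).subscribe(on_next=handle_inner)
```

In the first-order version, the third request is never processed, because the stream ends on the failure of the second one. In the higher-order version, the failure stays confined to the inner observable of the second request, and the third request is still served.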

Let's make these changes in the encode server. First, an error item must be defined:

EncodeError = namedtuple('EncodeError', ['id', 'key'])

And then the result of the encoding must be changed to emit observables instead of items:

elif type(item) is EncodeMp3:
    try:
        encoded_data = mp3_to_flac(
            item.data, samplerate, bitdepth)
        # Success: emit an observable carrying a single EncodeResult
        observer.on_next(Observable.just(
            EncodeResult(id=item.id, key=item.key, data=encoded_data)))
    except Exception:
        # Failure: emit an observable that only raises an EncodeError
        observer.on_next(Observable.throw(
            Exception(EncodeError(
                key=item.key,
                id=item.id))))

The whole encoding action is enclosed in a try block. If no exception is raised, then an observable containing only EncodeResult is sent. However, if an exception is raised, then an observable without an item, but throwing an EncodeError, is sent instead. The response observable is now a higher-order observable: it emits observables as items.

Correctly handling the errors is more complex than directly using the catch_exception operator. The catch_exception operator forwards items directly, and when an error is received, it emits items of another observable, provided as a parameter. Here, the items of the encode response source observable must always be forwarded, and any error must be replaced by an item emitted on the response sink of the HTTP driver.

This behavior must be implemented in several steps. First, a higher-order function is needed. This function allows you to route an item to one of two observables, depending on whether an error occurred or not:

def make_error_router():
    sink_observer = None

    def on_subscribe(observer):
        nonlocal sink_observer
        sink_observer = observer

    def route_error(item, convert):
        def catch_item(i):
            sink_observer.on_next(convert(i))
            return Observable.empty()

        return item.catch_exception(catch_item)

    return Observable.create(on_subscribe), route_error

Let's start with the description of the route_error function. This function either returns the item provided as input or sends an item to another observable. Note that the item parameter here is in fact an observable. The errors sent on sink_observer are converted before being emitted. This allows you to map the errors to other types of objects. Then the make_error_router function wraps the route_error function and returns two values: an observable that emits error items, and the route_error function that must be used in an operator chain to handle errors. This function can be used in the following way, at the beginning of the audio encoder component:

def audio_encoder(sources):
    http_encode_error, route_encode_error = make_error_router()
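The routing mechanism can also be tried in isolation. The following is a minimal sketch, assuming a hypothetical ToyObservable class that imitates only the few behaviors needed here (subscribe, just, throw, empty, and catch_exception with a handler function); it is a stand-in written for this example, not the RxPY implementation, and the file names are made up:

```python
class ToyObservable:
    """Hypothetical, minimal stand-in for an Rx observable (not the RxPY API)."""

    def __init__(self, subscribe):
        self._subscribe = subscribe

    def subscribe(self, on_next, on_error=None, on_completed=None):
        # Missing callbacks default to no-ops.
        self._subscribe(on_next,
                        on_error or (lambda e: None),
                        on_completed or (lambda: None))

    @staticmethod
    def just(value):
        def subscribe(on_next, on_error, on_completed):
            on_next(value)
            on_completed()
        return ToyObservable(subscribe)

    @staticmethod
    def throw(error):
        def subscribe(on_next, on_error, on_completed):
            on_error(error)
        return ToyObservable(subscribe)

    @staticmethod
    def empty():
        def subscribe(on_next, on_error, on_completed):
            on_completed()
        return ToyObservable(subscribe)

    def catch_exception(self, handler):
        # On error, continue with the observable returned by handler(error).
        def subscribe(on_next, on_error, on_completed):
            def recover(error):
                handler(error).subscribe(on_next, on_error, on_completed)
            self._subscribe(on_next, recover, on_completed)
        return ToyObservable(subscribe)


def make_error_router():
    sink_on_next = None  # set when the error observable is subscribed

    def on_subscribe(on_next, on_error, on_completed):
        nonlocal sink_on_next
        sink_on_next = on_next

    def route_error(item, convert):
        def catch_item(error):
            sink_on_next(convert(error))    # send the converted error to the sink
            return ToyObservable.empty()    # and emit nothing on the main chain
        return item.catch_exception(catch_item)

    return ToyObservable(on_subscribe), route_error


def to_text(error):
    return 'failed: {}'.format(error.args[0])


errors, route_error = make_error_router()

error_log = []
errors.subscribe(on_next=error_log.append)  # subscribe the sink first

results = []
route_error(ToyObservable.just('a.flac'), to_text).subscribe(
    on_next=results.append)
route_error(ToyObservable.throw(Exception('b.mp3')), to_text).subscribe(
    on_next=results.append)
```

The successful item ends up in results, while the failure only appears in error_log. Note that the error observable has to be subscribed before the first error is routed: until then, the captured sink is still None, and the same constraint applies to sink_observer in make_error_router.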

The http_encode_error observable must be added to the list of streams returned to the HTTP response sink:

http = Observable.merge(http_init, http_response, http_encode_error)

However, the route_encode_error function is not usable directly. Since the results of the encode driver are higher-order observables, these items should be flattened at some point to continue the chain with EncodeResult items. So the flat_map operator must be used, in combination with the route_encode_error function. Wrapping these in another higher-order function will make its usage easier:

def catch_or_flat_map(source, error_map, error_router):
    return source.flat_map(lambda i: error_router(i, error_map))

The catch_or_flat_map function takes three parameters: source, which is a higher-order observable; error_map, the mapping function to apply to errors; and error_router, the error-routing function returned by make_error_router. The result of this function is a flattened observable of the items received as input. Any error is routed to another observable, via the error_router function. Using this function is now easy, thanks to the let operator:

    store_requests = (
        sources.encoder.response
        .let(catch_or_flat_map,
            error_router=route_encode_error,
            error_map=lambda i: httpd.Response(
                data='encode error'.encode('utf-8'),
                context=i.args[0].id,
                status=500))
        .observe_on(s3_scheduler)
        .map(lambda i: s3.UploadObject(
            key=i.key + '.flac',
            data=i.data,
            id=i.id,
        ))
    )

In this case, any encode error is mapped to an HTTP response with status 500 (internal server error) and a short error message. Non-error items are forwarded in the chain and provided to the S3 driver so that they can be uploaded.
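The expression i.args[0].id in the error mapping works because the driver wraps the EncodeError item in an Exception, and an exception stores its constructor arguments in its args attribute. The following sketch shows that unpacking on its own; the Response namedtuple is a hypothetical stand-in for httpd.Response, assumed here to accept the same three fields, and the id and key values are made up:

```python
from collections import namedtuple

EncodeError = namedtuple('EncodeError', ['id', 'key'])

# Hypothetical stand-in for httpd.Response, for illustration only.
Response = namedtuple('Response', ['data', 'context', 'status'])


def to_http_error(error):
    """Map an exception carrying an EncodeError to an HTTP 500 response."""
    encode_error = error.args[0]  # the EncodeError passed to Exception(...)
    return Response(
        data='encode error'.encode('utf-8'),
        context=encode_error.id,  # identifies the request that failed
        status=500)


error = Exception(EncodeError(id=42, key='song'))
response = to_http_error(error)
```

The id field is what ties the error response back to the HTTP request that triggered the encoding, so it is the only part of the error item that the mapping needs.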

This code is not trivial to understand, because it combines several higher-order functions and higher-order observables. Take the time to read it several times, and add some logs to see what happens at each step. This code, together with the error handling of the S3 driver, is available in the GitHub repository of this book, in the ch8/audio-encode-server-5 sub-folder.

With these final changes, the audio transcoding server is now fully reactive and asynchronous, and it handles errors correctly.
