Exception handling

Handling exceptions in reactive programming with Rx can feel odd at first. We're used to handling exceptions by writing code that executes when some other code fails. More precisely, we write multiple execution flows: one that runs when the application behaves properly and others that run when it behaves badly. Strictly speaking, exception handling means handling an exception to the usual way of doing something; it is not the same thing as error handling.

This is a rather philosophical definition, though; in practice, we use exception handling to deal with unpredictable behaviors.

In Rx, exception handling follows the same idea, with the difference that we don't deal with lines of code that execute one by one; we deal with sequences of messages that must continue flowing even when an exception invalidates one or more messages. Put simply, the show must go on: a single exception must not stop the whole flow of messages for good.

In other (simpler) words, when we want to handle an exception, we define a new sequence that will flow messages in place of the initial source sequence.

Catch

Catch is the main extension method that gives us the ability to handle exceptions in Rx sequences. It simply starts flowing messages from a second source sequence when the first one fails. Note that in imperative (state-driven) programming, catching an exception means executing code other than the code in the try block, while in reactive programming, catching an exception means flowing messages from another sequence. We can choose to continue with a sequence that produces messages or with an empty one that ends the sequence prematurely.

Here's a complete example:

var r = new Random(DateTime.Now.GetHashCode());
 
//an infinite message source of integer numbers running at 10hz 
var source1 = Observable.Interval(TimeSpan.FromMilliseconds(100)) 
    .Select(x => r.Next(1, 20)) 
    //raise an exception on high values 
    .Select(x => 
    { 
        if (x >= 19) 
            throw new ArgumentException("Value too high"); 
        else 
            return x; 
    }) 
    //a single shared subscription available to all following subscribers 
    .Publish(); 
 
//an infinite message source of integer numbers running at 1hz
var source2 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
    .Select(x => r.Next(20, 40));

//a new sequence that continues with source2 when source1 raises an error
var output = source1.Catch(source2)
    //we want message metadata for our testing purposes
    .Materialize();

//output all values
output.Subscribe(x => Console.WriteLine(x));
//output when source1 raises the error
source1.Materialize()
    .Where(x => x.Kind == NotificationKind.OnError)
    .Subscribe(x => Console.WriteLine("Error: {0}", x.Exception));

//enable the connectable sequence once all the subscribers are attached
source1.Connect();
 
Console.ReadLine(); 

The preceding example shows the usage of the Catch extension method; we use it to source from source2 when source1 faults. For testing purposes, we use the Publish/Connect pattern so that the Catch operator and the Materialize operator (which we use to see when the exception occurs) share a single subscription to source1.

Another overload lets the Catch method handle only a specific exception type. Its handler receives the exception instance and returns the sequence to continue with:

//specific exception handling 
var output2 = source1.Catch<int, ArgumentException>(ex => source2); 

As mentioned, if we want the sequence to end prematurely, we can simply pass an empty continuation sequence to the Catch operator:

//handle exception and stop flowing messages 
var output3 = source1.Catch(Observable.Empty<int>()); 

Real-world experience says that we have to use this method carefully, because it can easily lead to unintentional exception hiding. This may make the application difficult to debug or leave it in an inconsistent state, as happens when we hide the exception without logging the event.
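One way to reduce the risk of exception hiding is to record the exception inside the Catch handler before switching to the continuation sequence. The following is only a minimal sketch of that idea: the LoggedCatchSketch class, its Run helper, and the in-memory log list are hypothetical names that stand in for a real logger, and the faulting source is built from Observable.Range and Observable.Throw so that the run is deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class LoggedCatchSketch
{
    // Hypothetical helper: runs a faulting sequence through a typed Catch
    // and returns the received values together with the logged messages.
    public static (IList<int> Values, List<string> Log) Run()
    {
        var log = new List<string>(); // stand-in for a real logger

        // emits 1 and 2, then fails with an ArgumentException
        var faulty = Observable.Range(1, 2)
            .Concat(Observable.Throw<int>(new ArgumentException("Value too high")));

        var guarded = faulty.Catch<int, ArgumentException>(ex =>
        {
            // record the failure before hiding it from the subscribers
            log.Add(ex.Message);
            // continue with an empty sequence to end gracefully
            return Observable.Empty<int>();
        });

        // ToList().Wait() blocks until the guarded sequence completes
        var values = guarded.ToList().Wait();
        return (values, log);
    }

    public static void Main()
    {
        var (values, log) = Run();
        Console.WriteLine("Values: " + string.Join(", ", values));
        Console.WriteLine("Logged: " + string.Join(", ", log));
    }
}
```

Because the handler is typed to ArgumentException, any other exception type would not be caught here and would still reach the subscriber as an OnError message.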

OnErrorResumeNext

Similar to the Catch method, there is the OnErrorResumeNext method. Experienced developers may recognize the name from the On Error Resume Next statement of classic Visual Basic (prior to .NET), which ignored an error and continued with the next statement. The Rx operator works in a similar spirit at the sequence level: it concatenates the second sequence to the first regardless of whether the first one ends normally or with an error. The only exception handling involved is that if the first sequence fails, the second sequence immediately starts flowing messages. The Catch method, by contrast, does not concatenate its source sequences: it starts flowing messages from the second sequence only if the first one fails, and it never subscribes to the second one when the first completes normally.
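The difference between the two operators is easiest to see by running both against a source that completes normally and against one that fails. The following is a minimal sketch; the CatchVersusResumeSketch class, the Describe helper, and the three small sequences are hypothetical names chosen for illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class CatchVersusResumeSketch
{
    // 1, 2, 3 and then OnCompleted
    public static IObservable<int> Healthy() => Observable.Range(1, 3);

    // 1, 2 and then OnError
    public static IObservable<int> Faulty() => Observable.Range(1, 2)
        .Concat(Observable.Throw<int>(new InvalidOperationException("boom")));

    // 100, 101 and then OnCompleted
    public static IObservable<int> Fallback() => Observable.Range(100, 2);

    // Hypothetical helper: blocks until the sequence completes and
    // joins the received values into a single string.
    public static string Describe(IObservable<int> sequence) =>
        string.Join(",", sequence.ToList().Wait());

    public static void Main()
    {
        // the source completes normally: only OnErrorResumeNext appends the fallback
        Console.WriteLine(Describe(Healthy().Catch(Fallback())));             // 1,2,3
        Console.WriteLine(Describe(Healthy().OnErrorResumeNext(Fallback()))); // 1,2,3,100,101

        // the source fails: both operators continue with the fallback
        Console.WriteLine(Describe(Faulty().Catch(Fallback())));              // 1,2,100,101
        Console.WriteLine(Describe(Faulty().OnErrorResumeNext(Fallback())));  // 1,2,100,101
    }
}
```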

Finally

The Finally method gives us the ability to execute some code when a sequence terminates, regardless of whether it ends with an OnCompleted message or with an OnError message (the code also runs when the subscription is disposed). Here's a complete example:

var source = Observable.Interval(TimeSpan.FromSeconds(1)) 
    //stops after 5 seconds 
    .TakeUntil(Observable.Return(0).Delay(TimeSpan.FromSeconds(5))); 
 
source.Subscribe(x => Console.WriteLine(x)); 
 
//log the completion of the source 
source.Finally(() => Console.WriteLine("END")) 
    //force the Finally sequence to  
    //start working by registering 
    //an empty subscriber 
    .Subscribe(); 
 
Console.ReadLine(); 

In the preceding example, we can see the Finally method in action. The example is useful because it also shows a more reactive programming style.

First, we create a source sequence that must flow messages for only 5 seconds. To specify the timeout, we pass another sequence as the parameter of TakeUntil; when that sequence fires its only message, the main source sequence stops flowing. Then, by using the Finally operator, we specify an Action that fires when the source sequence completes.

An interesting aspect of this example is that to force the Finally sequence to start its job, we need to attach a subscriber. To accomplish this, we can simply call the Subscribe method without passing any concrete subscriber. This is needed because the sequence returned by the Finally operator is lazy: it waits for a subscription to exist before doing its job.
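The lazy behavior described above, together with the fact that Finally runs after both kinds of termination, can be sketched with short synchronous sequences. This is an illustrative sketch only: the FinallySketch class, its Run helper, and the in-memory log list are hypothetical names, and Observable.Range and Observable.Throw replace the timer-based source so that the run is deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FinallySketch
{
    // Hypothetical helper: returns a log of what happened, in order.
    public static List<string> Run()
    {
        var log = new List<string>();

        var completing = Observable.Range(1, 2)
            .Finally(() => log.Add("finally after OnCompleted"));

        var failing = Observable.Throw<int>(new InvalidOperationException("boom"))
            .Finally(() => log.Add("finally after OnError"));

        // nothing has run so far: both sequences wait for a subscription
        log.Add("before any subscription");

        completing.Subscribe(x => log.Add("value " + x));

        // an error handler is supplied here, otherwise Subscribe would rethrow
        failing.Subscribe(x => { }, ex => log.Add("error " + ex.Message));

        return log;
    }

    public static void Main()
    {
        foreach (var entry in Run())
            Console.WriteLine(entry);
    }
}
```

The two Finally entries are added to the log only after the corresponding Subscribe call, which is the lazy behavior discussed above.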

Tip

A sequence that starts producing messages only when a subscription exists is a Cold sequence; these are lazily executed sequences. A sequence that produces messages regardless of whether a subscriber exists is a Hot sequence. When a subscription exists, the practical difference between the two is barely visible, because every sequence produces messages once subscribed; when no subscription exists, the difference is evident. Another important difference is that Cold sequences usually replay the same message flow from the start for every subscriber, no matter how many times we subscribe, while Hot sequences have a single message flow of their own that keeps going whether or not anyone is subscribed. This means that a subscriber to a Hot sequence never sees past messages again, although replaying them is possible with additional operators when needed.
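The distinction can be sketched in a few lines. In the following illustrative example, a deferred Observable.Range plays the role of the Cold sequence and a Subject plays the role of the Hot one; the HotColdSketch class, its Run helper, and the variable names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HotColdSketch
{
    public static (int ColdRuns, List<int> ColdSecond, List<int> HotLate) Run()
    {
        // Cold: Defer runs its factory again for every subscription
        var coldRuns = 0;
        var cold = Observable.Defer(() =>
        {
            coldRuns++;
            return Observable.Range(1, 3);
        });

        var coldFirst = new List<int>();
        var coldSecond = new List<int>();
        cold.Subscribe(coldFirst.Add);
        cold.Subscribe(coldSecond.Add); // receives 1, 2, 3 again from the start

        // Hot: a Subject pushes messages whether or not anyone is listening
        var hot = new Subject<int>();
        hot.OnNext(1);                  // no subscriber yet: this message is lost

        var hotLate = new List<int>();
        hot.Subscribe(hotLate.Add);
        hot.OnNext(2);
        hot.OnNext(3);

        return (coldRuns, coldSecond, hotLate);
    }

    public static void Main()
    {
        var (coldRuns, coldSecond, hotLate) = Run();
        Console.WriteLine("Cold runs: " + coldRuns);                                  // 2
        Console.WriteLine("Second cold subscriber: " + string.Join(",", coldSecond)); // 1,2,3
        Console.WriteLine("Late hot subscriber: " + string.Join(",", hotLate));       // 2,3
    }
}
```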

Retry

Another widely used approach when experiencing unwanted behaviors from external systems or unpredictable functions is to repeat our logic until we get the desired result.

This choice has its pros, such as the ability to ride out transient network errors or low system availability. However, it also has its cons, such as longer system response times, higher overall resource usage, and (the terrifying one) the possibility of duplicating data or creating inconsistent data stores if we don't properly manage all the repeated logic.

The most critical moment in retry logic is when an attempt fails; in other words, when we eventually need to roll back the partially saved data, the partially executed logic, or the partially sent commands (to external systems).

This doesn't mean that retry logic is wrong in itself. It simply means that we must pay close attention to how we use it, because it may introduce hidden issues. Here's a complete example (it is better to execute this example without the debugger by pressing Ctrl + F5):

//a finite sequence of 5 values
var source = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Take(5) 
    .Select(x => DateTime.Now) 
    .Select(x => 
    { 
        //let's raise some error 
        if (x.Second % 10 == 0) 
            throw new ArgumentException("Wrong seconds value"); 
        else 
            return x; 
    }) 
    //restart the sourcing sequence on error (at most 2 attempts in total) 
    .Retry(2) 
    //materialize to read message metadata 
    .Materialize(); 
 
source.Subscribe(x => Console.WriteLine(x)); 
Console.ReadLine(); 

The preceding example shows a usage of the Retry sequence available through the Retry extension method.

The execution shows that we have to produce 5 messages. If any message has a timestamp whose seconds value is divisible by 10, an exception is raised. When an exception reaches the Retry sequence, it simply closes the subscription and starts another one, restarting the counter of 5 messages.

At the end of the sequence construction, although we materialize the Retry sequence, we will not see the error that triggered the retry (an error surfaces only if the last allowed attempt fails too). We will simply receive the concatenation of all the messages before and after the error. Obviously, this may hide an exception unintentionally and make unwanted behaviors tricky to find.

The solution lies in using the Materialize operator, placed before the Retry operator, together with a filter that lets us trace only the unwanted exceptions when the application runs in production. This choice lets us know when an exception occurred with little additional resource usage. Later, when we investigate an exception we already know about, we can edit the filter to trace more information than just the exceptions and help us find the root cause.
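The tracing idea can be sketched as follows. This is only one possible arrangement, under stated assumptions: the RetryTraceSketch class, its Run helper, and the in-memory errors list are hypothetical (the list stands in for a real tracing facility), the source is made to fail deterministically on its first attempt only, and Dematerialize is used to turn the traced notifications back into a normal sequence so that Retry can still react to the error.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class RetryTraceSketch
{
    public static (IList<int> Values, List<string> Errors) Run()
    {
        var errors = new List<string>(); // stand-in for a real trace or log
        var attempt = 0;

        // Fails after one value on the first attempt, succeeds afterwards
        var flaky = Observable.Defer(() =>
        {
            attempt++;
            return attempt == 1
                ? Observable.Return(1)
                    .Concat(Observable.Throw<int>(new InvalidOperationException("transient failure")))
                : Observable.Range(1, 3);
        });

        var traced = flaky
            // expose OnError as a regular notification message
            .Materialize()
            // the filter: trace only the error notifications
            .Do(n =>
            {
                if (n.Kind == NotificationKind.OnError)
                    errors.Add(n.Exception.Message);
            })
            // restore the original messages so that Retry sees the error
            .Dematerialize()
            // at most 2 attempts in total
            .Retry(2);

        // blocks until the retried sequence completes
        var values = traced.ToList().Wait();
        return (values, errors);
    }

    public static void Main()
    {
        var (values, errors) = Run();
        Console.WriteLine("Values: " + string.Join(",", values)); // 1,1,2,3
        Console.WriteLine("Traced: " + string.Join(",", errors)); // transient failure
    }
}
```

The subscriber still receives only the concatenated values, but the error that Retry swallowed is now recorded.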
