Handling exceptions in reactive programming with Rx is something odd. We're used to handling exceptions by writing code that executes when some other code hangs. More precisely, we write multiple execution flows that execute when the application runs properly or badly. The real meaning of exception handling means handling an exception in the usual way of doing something. Exception handling is not error handling.
Although, this being a philosophical definition, we're used to using exception handling to handle unpredictable behaviors.
In Rx, exception handling is identical with the difference that here we don't deal with code rows that execute one by one; we actually deal with the sequences of messages that must continue flowing regardless of whether an exception may invalidate one or multiple messages. In smarter words, the show must go on. Obviously, a single exception must prevent the whole sequence from running anymore.
In other (simpler) words, when we want to handle an exception message, we will define the new sequence that will flow messages in place of the initial source sequence.
Catch
is the main extension method that gives us the ability to handle exceptions in Rx sequences. It simply starts flowing messages from a second source sequence when the first hangs. Kindly consider that in state-driven programming, catching an exception means executing some other code from the one in the Try
clause, while in reactive programming, catching an exception means flowing from another sequence. We can choose to source from a fruitful sequence or an empty one to cause the sequence's premature ending.
Here's a complete example:
var r = new Random(DateTime.Now.GetHashCode()); //an infinite message source of integer numbers running at 10hz var source1 = Observable.Interval(TimeSpan.FromMilliseconds(100)) .Select(x => r.Next(1, 20)) //raise an exception on high values .Select(x => { if (x >= 19) throw new ArgumentException("Value too high"); else return x; }) //a single shared subscription available to all following subscribers .Publish(); //enable the connectable sequence source1.Connect(); //an infinite message source of integer numbers running at 1hz var source2 = Observable.Interval(TimeSpan.FromMilliseconds(1000)) .Select(x => r.Next(20, 40)); //a new sequence that continues with source2 when source1 raise an error var output = source1.Catch(source2) //we want message metadata for out testing purpose .Materialize(); //output all values output.Subscribe(x => Console.WriteLine(x)); //output when source1 raise the error source1.Materialize() .Where(x => x.Kind == NotificationKind.OnError) .Subscribe(x => Console.WriteLine("Error: {0}", x.Exception)); Console.ReadLine();
The preceding example shows the usage of the Catch
extension method; we use it to source from source2
when source1
faults. Because of testing needs, we have to use the Publish
/Connect
pattern to share a single subscription with the source1
sequence from the Catch
operator and from the Materialize
one we use to see when the exception occurs.
An interesting aspect is the availability of another overload that allows the Catch
method to accept a specific exception type to handle:
//specific exception handling var output2 = source1.Catch<int, ArgumentException>(exType => source2);
As mentioned if we want a premature sequence end, we can simply specify an empty continuing sequence as the parameter for the Catch
operator:
//handle excetion and stop flowing messages var output3 = source1.Catch(Observable.Empty<int>());
Real-world experience says that we have to carefully use this method because it may bring your application into unintentional exception hiding logics. This may make your application difficult to debug or in an inconsistent state, as happens in the case where we hide the exception without logging the event.
Similar to the Catch
method, there is the OnErrorResumeNext
method. Although many experienced developers may remember this name that was a specific error handling logic in the old VisualBasic
languages (prior to .NET), the old one from VB was instead identical to the reactive Catch
implementation. The OnErrorResumeNext
method produces a sequence concatenation regardless of whether the first sequence experiences some errors or not. The only exception handling here is that if the first sequence hangs, the second sequence immediately starts flowing messages. However, the Catch
method does not concatenate sourcing sequences. It simply starts flowing messages from another sequence if the first hangs.
The Finally
method gives us the ability to execute some code when a sequence completes regardless of whether it completes with an OnComplete
message or with an OnError
message. Here's a complete example:
var source = Observable.Interval(TimeSpan.FromSeconds(1)) //stops after 5 seconds .TakeUntil(Observable.Return(0).Delay(TimeSpan.FromSeconds(5))); source.Subscribe(x => Console.WriteLine(x)); //log the completion of the source source.Finally(() => Console.WriteLine("END")) //force the Finally sequence to //start working by registering //an empty subscriber .Subscribe(); Console.ReadLine();
In the preceding example, we can see the Finally
method in action. The example is extremely useful because it shows a more reactive-styled programming.
First, we will create a sourcing sequence that must flow messages only for 5
seconds. To specify the timeout, we will use another sequence as the parameter that will fire its only message to stop the main sourcing sequence from flowing. Then, by using the Finally
operator, we will specify Action
that will fire when the sourcing sequence completes.
An interesting aspect of this example is that to force the Finally
sequence to start its job we need to attach a subscriber. To accomplish this, we can simply use the Subscribe
method without passing any concrete subscriber. This happens because the result sequence from the Finally
operator runs in a lazy fashion, waiting for a subscription to exist before doing its job.
A sequence that starts producing messages only when a subscription exists is a Cold
sequence. These are lazy execution sequences. On the other hand, a sequence that produces messages regardless of whether a subscriber exists or not is a Hot
sequence. Although the practical difference when using the two sequence types from other sequences or operators is slightly visible, because all the sequences produce messages once subscribed, the difference when no subscription exists is evident. Another great difference is that, often, the Cold
sequences produce the same message flow for all their subscribers regardless of how many times we subscribe to the sourcing sequence, while Hot
observers usually have their own message flow that will continue flowing regardless of whether a subscription exists or not. This means that a message repetition may never occur, but in case we need it, it is possible with external operators.
Another widely used approach when experiencing unwanted behaviors from external systems or unpredictable functions is the ability to repeat our logic until we get our desired result.
This choice has its pros, such as the ability to avoid unintentional network errors or system low availability. However, the choice has its cons, such as the ability to reduce system response time, increase overall resource usage, and (the terrifying one) the possibility to duplicate data or create inconsistent data stores if we don't properly manage all repeating logics.
The most critical time in a retry logic is when an attempt fails, in other words, when we eventually need to rollback the partially saved data, the partially executed logics, or the partially sent commands (to external systems).
This doesn't mean that the retry logic is wrong in itself. It simply focuses a lot on its usage because it may bring the invisible issues. Here's a complete example (it is better to execute this example without the debugger by pressing Ctrl + F5 ):
//a finite sequence of 5 values var source = Observable.Interval(TimeSpan.FromSeconds(1)) .Take(5) .Select(x => DateTime.Now) .Select(x => { //lets raise some error if (x.Second % 10 == 0) throw new ArgumentException("Wrong milliseconds value"); else return x; }) //restart he sourcing sequence on error (max 2 times) .Retry(2) //materialize to read message metadata .Materialize(); source.Subscribe(x => Console.WriteLine(x)); Console.ReadLine();
The preceding example shows a usage of the Reply
sequence available through the Reply
extension method.
The execution shows that we have to produce 5
messages. If any message has a timestamp with seconds divisible by 10
an exception raises. When an exception reaches the Retry
sequence, this simply closes the subscription and starts another subscription, restarting the counter of 5
messages.
At the end of the sequence construction, although we materialize the Reply
sequence, we will never see the error message. We will simply receive the concatenation of all the messages before and after the error. Obviously, this may bring an unintentional hidden exception that may make it tricky to find unwanted behaviors.
The solution lies in the usage of the Materialize
operator together with a filter that will let us trace only unwanted exceptions when we're running our application in the production stage. This choice will give us the ability to know when an exception occurred with a light additional resource usage. Differently, once we're trying to investigate an exception we already know, we may edit the filter to trace multiple information other than exceptions to help us find the root cause of the investigating exception.
3.143.239.103