This chapter will show us how to create custom operators and custom schedulers. We will also cover some advanced techniques, such as Interactive Extensions (Ix) and event sourcing with Reactive Extensions (Rx).
In the Rx framework, there are multiple extension methods that help us transform a sequence into another one, subscribe to a sequence, or create a new sequence from other objects or from scratch.
In classic .NET development, all these methods are simply functions, because they almost always provide a result. In Reactive Programming, any function that returns an observable sequence is an operator. There are transforming operators, creational operators, diagnostic operators, and so on.
Usually, we use operators from the Observable helper class or from other helper classes in the official Rx libraries, available through the NuGet package manager. When we cannot find the right operator, or when we want to improve an existing operator (for example, by adding new overloads or changing its implementation), we can create a new operator by writing an extension method that returns the generic IObservable<T> type. With this design, we will be able to reuse our operators in all our future projects, or redistribute them within libraries or components.
Another great use of custom operators is composing existing operators so that they behave as desired. Usually, the goal of adding an overload to an existing operator, or of adding a new operator composed of other operators, is to make it easier to use.
As a last resort, we may create a custom operator from scratch, using LINQ or other CLR features together with the low-level Rx elements, such as IObservable<T>, IObserver<T>, or Subject<T>.
Usually, making use of operator composition reduces the chances of releasing bugs and unwanted behaviors.
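To make the "from scratch" option concrete, here is a minimal sketch of a Select-like operator written directly against the BCL IObservable<T> and IObserver<T> interfaces, without the Rx package, so that it runs on its own. The names MyOperators, MapValues, DelegateObserver, and ArraySource are hypothetical; in real code, Rx's own Select or Observable.Create would do this job. Even this trivial operator needs explicit bookkeeping to respect the OnNext* (OnError | OnCompleted) grammar, which is one reason why composing existing operators is usually safer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical names for illustration: MyOperators, MapValues, DelegateObserver, ArraySource.
public static class MyOperators
{
    // The custom operator: an extension method from IObservable<TSource> to IObservable<TResult>.
    public static IObservable<TResult> MapValues<TSource, TResult>(
        this IObservable<TSource> source, Func<TSource, TResult> selector)
    {
        return new MapObservable<TSource, TResult>(source, selector);
    }

    private sealed class MapObservable<TSource, TResult> : IObservable<TResult>
    {
        private readonly IObservable<TSource> _source;
        private readonly Func<TSource, TResult> _selector;

        public MapObservable(IObservable<TSource> source, Func<TSource, TResult> selector)
        {
            _source = source;
            _selector = selector;
        }

        // Subscribing downstream subscribes upstream with a forwarding observer; returning
        // the upstream subscription lets disposal flow back to the source.
        public IDisposable Subscribe(IObserver<TResult> observer)
        {
            var stopped = false; // becomes true once a terminal message has been sent
            return _source.Subscribe(new DelegateObserver<TSource>(
                value =>
                {
                    if (stopped) return;
                    TResult result;
                    try { result = _selector(value); }
                    catch (Exception ex) { stopped = true; observer.OnError(ex); return; }
                    observer.OnNext(result);
                },
                error => { if (!stopped) { stopped = true; observer.OnError(error); } },
                () => { if (!stopped) { stopped = true; observer.OnCompleted(); } }));
        }
    }
}

// Minimal observer built from three delegates.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

// Minimal synchronous source that pushes an array and then completes.
public sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] _items;

    public ArraySource(params T[] items) { _items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}
```

With Rx available, the same operator would simply be source.Select(selector); the sketch only shows what such an extension method does underneath.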
In the following example, we will create an operator that sources values from a generic IEnumerable<T> variable. To build it, we will use the Create operator. Here is the code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;

class Program
{
    static void Main(string[] args)
    {
        //1000 items to source from
        var items = Enumerable.Range(0, 1000)
            .Select(x =>
            {
                //raise an exception on item #400
                //within VS the debugger will stop the execution as the exception bubbles
                //simply press F5 again to continue bubbling the exception to the Rx sequence
                if (x == 400)
                    throw new ArgumentException("The item #400 has been sourcing");
                return x;
            });

        //invoke our custom operator
        var sequence = items.AsObservable();

        //output value and metadata
        sequence.Materialize().Subscribe(x => Console.WriteLine("-> {0}", x));

        Console.ReadLine();
    }
}

public static class RxOperators
{
    public static IObservable<T> AsObservable<T>(this IEnumerable<T> source)
    {
        return Observable.Create<T>(observer =>
        {
            using (var enumerator = source.GetEnumerator())
            {
                while (true)
                {
                    T item;
                    try
                    {
                        //guard only the sourcing (MoveNext/Current):
                        //exceptions raised by the observer are not caught here
                        if (!enumerator.MoveNext())
                            break;
                        item = enumerator.Current;
                    }
                    catch (Exception ex)
                    {
                        //route the source error and stop: no OnCompleted after OnError
                        observer.OnError(ex);
                        return Disposable.Empty;
                    }
                    observer.OnNext(item);
                }
            }
            observer.OnCompleted();
            return Disposable.Empty;
        });
    }
}
The AsObservable operator we created in the preceding example sources data from a generic enumerable by walking its enumerator. This means that we never materialize the whole collection in memory. Instead, we stream data as it becomes available from the enumerator, routing each item as a message of the returned sequence. We also handle exceptions raised while sourcing by routing them as error messages and, at the end, we signal completion by routing the proper message. In this implementation, we don't need to implement the disposal of the subscription, because the loop runs synchronously within Subscribe and has already finished by the time the disposable is returned. Wherever we need this feature, we can use the Disposable.Create method to write an action that stops flowing useless messages to an observer whose subscription has just been disposed.
Consider that the behavior is different when an exception is raised in the observer implementation rather than in the sourcing implementation. In the preceding example, we handled errors coming from the source, so only this kind of error is routed as an error message. An exception raised within the observer implementation, instead, simply propagates out of the underlying Create implementation, stopping its execution immediately. Let this happen on purpose: catching such an exception and routing it back to the same observer as an error message would hide the error, which is an anti-pattern.
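The following dependency-free sketch shows where the try/catch must sit to keep the two cases apart. It implements IObservable<T> directly instead of using Observable.Create, and the names EnumerableObservable, ToObservableSketch, and RecordingObserver are hypothetical. Only MoveNext and Current are guarded, so a failure in the source becomes an OnError message, while an exception thrown by the observer's OnNext propagates to the caller untouched.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical names for illustration; this mirrors the AsObservable idea without the Rx package.
public static class EnumerableObservableExtensions
{
    public static IObservable<T> ToObservableSketch<T>(this IEnumerable<T> source) =>
        new EnumerableObservable<T>(source);
}

public sealed class EnumerableObservable<T> : IObservable<T>
{
    private readonly IEnumerable<T> _source;

    public EnumerableObservable(IEnumerable<T> source) { _source = source; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        using (var enumerator = _source.GetEnumerator())
        {
            while (true)
            {
                T item;
                try
                {
                    // Only the sourcing side is guarded.
                    if (!enumerator.MoveNext()) break;
                    item = enumerator.Current;
                }
                catch (Exception ex)
                {
                    // A source failure becomes an OnError message, and nothing follows it.
                    observer.OnError(ex);
                    return EmptyDisposable.Instance;
                }
                // Not guarded: an exception thrown by the observer propagates to the caller.
                observer.OnNext(item);
            }
        }
        observer.OnCompleted();
        return EmptyDisposable.Instance;
    }

    private sealed class EmptyDisposable : IDisposable
    {
        public static readonly EmptyDisposable Instance = new EmptyDisposable();
        public void Dispose() { }
    }
}

// Records every notification as text; optionally throws from OnNext on a chosen value.
public sealed class RecordingObserver<T> : IObserver<T>
{
    private readonly Func<T, bool> _throwWhen;

    public List<string> Log { get; } = new List<string>();

    public RecordingObserver(Func<T, bool> throwWhen = null) { _throwWhen = throwWhen; }

    public void OnNext(T value)
    {
        if (_throwWhen != null && _throwWhen(value))
            throw new InvalidOperationException("observer failure on " + value);
        Log.Add("OnNext(" + value + ")");
    }

    public void OnError(Exception error) => Log.Add("OnError(" + error.GetType().Name + ")");

    public void OnCompleted() => Log.Add("OnCompleted");
}
```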
In network programming, there are multiple high-level technologies that may help us create amazing applications: web services (WCF or ASMX), binary remote services (WCF, Remoting), Enterprise Services (COM+), RESTful services and APIs (Web API, WCF, MVC), OData services, queues (MSMQ, Azure Service Bus queues, Storage queues), and so on. Often, when dealing with IoT devices, we need to work at a lower level, acting as a network socket server.
Consider that, within Microsoft Azure, we can deal with IoT devices through IoT Hub, which already supports several standard IoT protocols, such as MQTT. When a protocol is not supported yet, we can still develop a custom protocol gateway for IoT Hub. This lets the hub work with our protocol without having to redefine the whole system's architecture.
In this case, we can use the CLR TcpListener class, which helps us create a socket server in a few lines of code with a good level of scalability and reliability (if properly used). This choice is available in Rx programming too, but we need to change the application's design a bit to work in the reactive way.
When programming TcpListener in a reactive way, we need to route something from the listener. At the lowest level, the only thing we can route as a message is the client connection: we will route any TcpClient connecting to our listener. To route the TcpClient messages, we will write a reusable operator named AcceptObservableClient.
Here is the code:
//the extension method must be put in a static class
public static IObservable<TcpClient> AcceptObservableClient(this TcpListener listener)
{
    //start listening with a backlog of 4 pending connections
    listener.Start(4);
    return Observable.Create<TcpClient>(observer =>
    {
        while (true)
        {
            //accept new clients from the listener (blocks until a client connects)
            var client = listener.AcceptTcpClient();
            //route the client to the observer
            //within an asynchronous task to let multiple clients connect at the same time
            Task.Factory.StartNew(() => observer.OnNext(client), TaskCreationOptions.LongRunning);
        }
        //mandatory to comply with the .Create action signature (never reached)
        return Disposable.Empty;
    });
}
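The operator starts listening for remote connections with the help of a TcpListener object. Then, each client flows out as a message of the sequence within an asynchronous Task, letting other clients connect to the same listener in the meantime. Be aware that this design lets the observer receive concurrent OnNext calls, which goes against the Rx guideline that notifications to an observer are serialized; stateful operators applied downstream may need a Synchronize call. Also note that Subscribe never returns, because the accept loop runs on the subscribing thread.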
Now, we will see how to use this operator. Here is the code:
//convert a TcpListener into an observable sequence on port 23 (telnet)
var tcpClientsSequence = TcpListener.Create(23)
    .AcceptObservableClient();

//subscribe to new remote clients
var subscription = tcpClientsSequence.Subscribe(client =>
{
    //remote endpoint (IP:PORT)
    var endpoint = client.Client.RemoteEndPoint as IPEndPoint;
    Console.Write("{0} -> ", endpoint);
    //get the remote stream
    using (var stream = client.GetStream())
        while (true)
        {
            //read one byte at a time; ReadByte returns -1 at the end of the stream
            var b = stream.ReadByte();
            if (b < 0)
                break;
            else
                Console.Write((char)b);
        }
    Console.WriteLine();
    Console.WriteLine("{0} -> END", endpoint);
    Console.WriteLine();
});
The preceding code is pretty easy. We create a new TcpListener variable on port 23 and, with the help of our newly created operator, we obtain a sequence of TcpClient objects.
Now, we can subscribe to remote client connections in the usual way. In this implementation, we simply get access to the remote stream (we use it as a read-only stream, although it accepts writes too) and read from it until ReadByte returns -1, which signals the end of the stream.
Now that we have created the AcceptObservableClient operator, to build a complete socket server in Reactive Programming we simply have to raise the level of our design a bit. AcceptObservableClient flows out very low-level network programming objects (TcpClient), while we need an operator that flows out data messages. As a general choice, we will use Byte, because it is the most general reusable data message: any other kind of message can be carried as bytes. In real-world implementations, we would use specific business messages instead of general-purpose Byte messages.
We need to create another operator, AsNetworkByteSource, which routes messages from the underlying TcpClient sequence into another sequence (of Byte) with a different granularity, because a single TcpClient may flow out many bytes or none at all. This means that routing 10 TcpClients does not mean routing 10 bytes (usually, it means many more).
The operator routes KeyValuePair<IPEndPoint, byte> messages, giving us the ability to correlate multiple bytes coming from the same source.
Here is the code:
public static IObservable<KeyValuePair<IPEndPoint, byte>> AsNetworkByteSource(this IObservable<TcpClient> source)
{
    return Observable.Create<KeyValuePair<IPEndPoint, byte>>(observer =>
    {
        //the inner subscription is returned as the disposable of the new sequence
        return source.Subscribe(
            client =>
            {
                //read the remote endpoint once per client
                var endpoint = client.Client.RemoteEndPoint as IPEndPoint;
                using (var stream = client.GetStream())
                    while (true)
                    {
                        //ReadByte returns -1 at the end of the stream
                        var b = stream.ReadByte();
                        if (b < 0)
                            break;
                        observer.OnNext(new KeyValuePair<IPEndPoint, byte>(endpoint, (byte)b));
                    }
            },
            //forward errors and completion of the client sequence
            observer.OnError,
            observer.OnCompleted);
    });
}
The code is similar to the one we have already seen in the previous operator usage. The difference is that here we wrapped the TcpClient usage within a Create operator to flow out different messages.
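The change of granularity can be sketched without sockets. The hypothetical helper below turns one client stream into (endpoint, byte) pairs, the same message shape AsNetworkByteSource emits, and it is exercised with a MemoryStream standing in for the NetworkStream. It returns an iterator (IEnumerable) rather than an observable only to keep the example self-contained; one client may yield many pairs or none.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;

public static class TaggedBytes
{
    // Hypothetical helper: tags every byte of one client's stream with the client's endpoint.
    // It takes ownership of the stream and disposes it at the end, like the using block above.
    public static IEnumerable<KeyValuePair<IPEndPoint, byte>> ReadTagged(IPEndPoint endpoint, Stream stream)
    {
        using (stream)
        {
            while (true)
            {
                int b = stream.ReadByte(); // -1 marks the end of the stream
                if (b < 0) yield break;
                yield return new KeyValuePair<IPEndPoint, byte>(endpoint, (byte)b);
            }
        }
    }
}
```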
Here is the usage code:
//convert a TcpListener into an observable sequence on port 23 (telnet)
var tcpClientsSequence = TcpListener.Create(23)
    .AcceptObservableClient()
    .AsNetworkByteSource();

Console.WriteLine("Subscribing...");
var bytesObserver = tcpClientsSequence.Subscribe(x =>
{
    Console.WriteLine("{0} -> {1} ({2})", x.Key, x.Value, (char)x.Value);
});
With this example, we can see that the usage complexity has been heavily reduced. We don't have to deal with low-level TcpClient objects, because our sequence already delivers bytes together with the source IP address and port.
Now, we need to abstract our code further to deal with higher-level messages. In the next example, we will consider a text row as a single message. This means that we need to move from per-byte messaging to per-row messaging without losing client session isolation. The following flowchart shows the pipeline we need:
[Flowchart: TcpListener, then TcpClient messaging, then byte messaging, then per-client row messaging]
Here is the complete example:
//convert a TcpListener into an observable sequence on port 23 (telnet)
var tcpClientsSequence = TcpListener.Create(23)
    .AcceptObservableClient()
    .AsNetworkByteSource();

//map the source message into another one with a byte buffer of a single byte
var bufferUntilCrlfSequence = tcpClientsSequence
    .Select(x => new { x.Key, buffer = new[] { x.Value }.AsEnumerable() })
    //group by client session IPEndPoint (IP/Port)
    .GroupBy(x => x.Key);

//a CR+LF byte buffer
var crlf = new byte[] { 0x0d, 0x0a };

//subscribe to all nested sequence groups per remote endpoint
bufferUntilCrlfSequence.Subscribe(endpoint =>
{
    var clientSequence = endpoint
        //apply an accumulator function to obtain the byte buffer per client:
        //if the previous buffer ends with CR+LF, start a new buffer,
        //otherwise concatenate the previous buffer with the new byte
        .Scan((last, i) => new
        {
            last.Key,
            buffer = last.buffer.Skip(last.buffer.Count() - 2).SequenceEqual(crlf)
                ? i.buffer
                : last.buffer.Concat(i.buffer)
        })
        //wait for the CR+LF terminator to read one row at a time
        .Where(x => x.buffer.Skip(x.buffer.Count() - 2).SequenceEqual(crlf));

    //subscribe to the client sequence
    clientSequence.Subscribe(row =>
        Console.WriteLine("{0} -> {1}", row.Key, Encoding.ASCII.GetString(row.buffer.ToArray())));
});
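For comparison, here is a sketch of the same per-client row framing written as a plain stateful function (the name RowFraming.ToRows is hypothetical, and it works over IEnumerable rather than Rx so it runs without sockets). Instead of using Scan over lazily concatenated buffers, which re-enumerates and re-counts the whole buffer on every byte, it keeps one List<byte> per IPEndPoint, so interleaved bytes from different clients never mix. Unlike the Rx example, it strips the CR+LF terminator from each row.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

public static class RowFraming
{
    // Hypothetical helper: splits interleaved per-client bytes into CR+LF-terminated rows,
    // keeping one buffer per remote endpoint so that sessions stay isolated.
    public static IEnumerable<KeyValuePair<IPEndPoint, string>> ToRows(
        IEnumerable<KeyValuePair<IPEndPoint, byte>> bytes)
    {
        var buffers = new Dictionary<IPEndPoint, List<byte>>();
        foreach (var pair in bytes)
        {
            if (!buffers.TryGetValue(pair.Key, out var buffer))
            {
                buffer = new List<byte>();
                buffers.Add(pair.Key, buffer);
            }
            buffer.Add(pair.Value);

            int n = buffer.Count;
            if (n >= 2 && buffer[n - 2] == 0x0D && buffer[n - 1] == 0x0A)
            {
                // Emit the row without its CR+LF terminator and start a fresh buffer.
                var row = Encoding.ASCII.GetString(buffer.ToArray(), 0, n - 2);
                buffer.Clear();
                yield return new KeyValuePair<IPEndPoint, string>(pair.Key, row);
            }
        }
    }
}
```

A partial row (bytes not yet followed by CR+LF) simply stays in its client's buffer until the terminator arrives.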
In the previous examples, for editorial reasons (brevity and readability), we often used the Create operator to start flowing messages as needed without properly implementing the returned IDisposable to stop the multithreaded code behind it. In a real-world application, it is usually a good idea to provide proper cancellation support for our sequences, even when they are created with the low-level Create operator.
Here is the complete example:
var sequence = Observable.Create<int>(observer =>
{
    //a task is required for all time-consuming activities
    Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < 100; i++)
        {
            //lot of CPU time
            Thread.SpinWait(10000000);
            //diagnostic output
            Debug.WriteLine(string.Format("Flowing value: {0}", i));
            //flow out a message
            observer.OnNext(i);
        }
        observer.OnCompleted();
    });
    return Disposable.Empty;
});

var subscription = sequence.Subscribe(x => Console.WriteLine(x));
//wait 1 second
Thread.Sleep(1000);
//kill the subscription
subscription.Dispose();
Console.ReadLine();
The preceding example shows that, although we can stop the subscription whenever we wish (the observer effectively stops receiving messages), the inner loop continues wasting our resources. When a Task is used within the Create implementation, we can use Task cancellation to gracefully stop the loop.
Here is an example:
var sequence = Observable.Create<int>(observer =>
{
    var cts = new CancellationTokenSource();
    var token = cts.Token;
    var task = Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < 100; i++)
        {
            //raise an exception to stop the thread's execution on a task cancellation request
            token.ThrowIfCancellationRequested();
            //lot of CPU time
            Thread.SpinWait(10000000);
            //diagnostic output
            Debug.WriteLine(string.Format("Flowing value: {0}", i));
            //flow out a message
            observer.OnNext(i);
        }
        observer.OnCompleted();
    }, token);
    //executes the following action at the subscription disposal
    return Disposable.Create(() => cts.Cancel());
});
This other implementation of the Create operator correctly handles the subscription's disposal by stopping the inner loop from wasting resources.
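The same cancellation pattern can be sketched with the BCL alone. The hypothetical CancellableCounter below plays the role of Observable.Create, and its subscription object cancels a CancellationTokenSource on Dispose, just like Disposable.Create(() => cts.Cancel()). Waiting on token.WaitHandle instead of sleeping lets the loop react to cancellation during the delay as well; CountingObserver is also a hypothetical helper.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical push source: disposing the subscription cancels the producing task.
public sealed class CancellableCounter : IObservable<int>
{
    private readonly int _count;
    private readonly TimeSpan _delay;

    public CancellableCounter(int count, TimeSpan delay)
    {
        _count = count;
        _delay = delay;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        var cts = new CancellationTokenSource();
        var token = cts.Token;
        Task.Run(() =>
        {
            for (int i = 0; i < _count; i++)
            {
                // WaitOne returns true as soon as cancellation is requested,
                // so the loop stops without waiting for the full delay.
                if (token.WaitHandle.WaitOne(_delay)) return;
                observer.OnNext(i);
            }
            if (!token.IsCancellationRequested) observer.OnCompleted();
        });
        return new CancelOnDispose(cts);
    }

    private sealed class CancelOnDispose : IDisposable
    {
        private readonly CancellationTokenSource _cts;
        public CancelOnDispose(CancellationTokenSource cts) { _cts = cts; }
        public void Dispose() => _cts.Cancel();
    }
}

// Hypothetical observer that counts values and signals once a threshold is reached.
public sealed class CountingObserver : IObserver<int>
{
    private int _count;
    private readonly int _threshold;

    public CountingObserver(int threshold) { _threshold = threshold; }

    public ManualResetEventSlim Reached { get; } = new ManualResetEventSlim(false);
    public int Count => Volatile.Read(ref _count);

    public void OnNext(int value)
    {
        if (Interlocked.Increment(ref _count) >= _threshold) Reached.Set();
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```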
If we are lazy, we can avoid writing the Disposable.Create action by using one of the ready-made disposable classes from the System.Reactive.Disposables namespace. The direct substitute for raising a Task cancellation is CancellationDisposable:
//raise the token cancellation at the disposal
return new CancellationDisposable(cts);
Other usable disposables are as follows:
BooleanDisposable: We can check the disposal status with a Boolean flag (IsDisposed)
ContextDisposable: Routes the disposal to a SynchronizationContext
SerialDisposable: We can replace the underlying disposable object with a new one, causing the disposal of the old one
CompositeDisposable: Disposes multiple resources together
ScheduledDisposable: We can schedule the disposal within our scheduler
RefCountDisposable: Disposes the underlying resource only when it has itself been disposed and all the dependent disposables obtained from it have been disposed too
SingleAssignmentDisposable: The underlying disposable resource can be assigned only once and can never change
MultipleAssignmentDisposable: The underlying disposable resource can be replaced many times; unlike SerialDisposable, the previous resource is not disposed when replaced

In the rare case where we still have not found the right disposal design, such as when there is no loop in which we can check for Task cancellation, we can still kill the low-level Thread by using the Thread.Abort method. This is only possible on .NET Framework: on .NET Core and .NET 5 or later, Thread.Abort throws PlatformNotSupportedException.
Here is an example:
var sequence = Observable.Create<int>(observer =>
{
    var thread = new Thread(new ThreadStart(() =>
    {
        for (int i = 0; i < 100; i++)
        {
            //lot of CPU time
            Thread.SpinWait(10000000);
            //diagnostic output
            Debug.WriteLine(string.Format("Flowing value: {0}", i));
            try { }
            finally
            {
                //don't kill me here: on .NET Framework, a Thread.Abort
                //is deferred until a finally block completes
            }
            //flow out a message
            observer.OnNext(i);
        }
        observer.OnCompleted();
    }));
    thread.Start();
    //executes the following action at the subscription disposal (.NET Framework only)
    return Disposable.Create(() => thread.Abort());
});

As is evident in the previous example, we can protect a potentially dangerous code block from being interrupted by placing it within a finally block, because on .NET Framework the runtime delays a thread abort until the finally block completes. Note that Thread.BeginCriticalRegion and Thread.EndCriticalRegion do not provide this protection: they only inform the host that an abort within the region may compromise the whole application domain. Even with this protection, killing a Thread is always potentially dangerous, because it may leave stateful systems in unwanted states or with data inconsistencies that are usually difficult to diagnose.
When designing a custom operator is not enough for our needs, we can write a custom provider, similar to a LINQ provider.
Within the Rx world, we can write a custom provider by implementing the IQbservable<T> interface. This interface plays a role similar to LINQ's IQueryable<T> interface, which exposes a query as an Expression: any composition of System.Linq.Expressions nodes, arranged in a hierarchical structure called an expression tree.
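IQbservable<T> ships with the Rx package, but the expression-tree idea it shares with IQueryable<T> can be seen with LINQ alone. In this sketch (the ExpressionTreeDemo class is hypothetical), the query over an in-memory IQueryable<int> is not executed: Where and Select only build a tree of MethodCallExpression nodes, which is what a custom provider would receive and translate.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

public static class ExpressionTreeDemo
{
    // Builds a query over an in-memory IQueryable and returns its expression tree
    // instead of executing it.
    public static Expression BuildQueryTree()
    {
        IQueryable<int> query = new[] { 1, 2, 3, 4 }.AsQueryable()
            .Where(x => x > 2)
            .Select(x => x * 10);
        return query.Expression;
    }

    // Walks the chain of operator calls from the outermost one down to the source constant.
    public static string DescribeCalls(Expression expression)
    {
        var names = new List<string>();
        while (expression is MethodCallExpression call)
        {
            names.Add(call.Method.Name);
            expression = call.Arguments[0]; // the first argument is the upstream query
        }
        names.Reverse();
        return string.Join(" -> ", names);
    }
}
```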
The creation of a custom provider is outside the scope of this book, because it requires deep LINQ knowledge and a lot of pages. As a suggestion to anyone who wants to write their own custom provider, there are good examples and ready-made providers that can serve as a starting point for designing the desired provider.
A custom provider overview (MSDN) with a downloadable example can be found at the following link:
https://msdn.microsoft.com/en-us/library/hh242971(v=vs.103).aspx
The IQbservable interface (MSDN) is available at:
https://msdn.microsoft.com/en-us/library/system.reactive.linq.iqbservableprovider(v=vs.103).aspx
IQbservable over the wire (Dave Sexton) can be found at the following link:
http://davesexton.com/blog/post/LINQ-to-Cloud-IQbservable-Over-the-Wire.aspx