Chapter 7. Advanced Techniques

This chapter will show us how to create custom operators and custom schedulers. We will cover some advanced techniques, such as Interactive Extensions (Ix) and event sourcing with Reactive Extension (Rx).

Here is a short list:

  • Designing a custom operator
  • Designing a custom scheduler
  • Creating Pattern<T>
  • Event sourcing with Rx
  • Interactive Extensions (Ix)

Designing a custom operator

In the Rx framework, there are multiple extension methods that help us transform a sequence into another one, subscribe to a sequence, or create a new sequence from other objects or from scratch.

In classic .NET development, all these methods are simply functions, because they almost always provide a result. In Reactive Programming, any function that returns an observable sequence is an operator. There are transforming operators, creational operators, diagnostic operators, and so on.

Usually, we use operators from the Observable helper class or by using other helper classes from the official Rx library set available throughout the NuGet package explorer. When we cannot find the right operator, or when we want to improve an operator already available, maybe, by adding new overloads or changing its implementation, we can create a new operator by creating an extension method that supports the generic pattern. With this design, we will be able to reuse our operators in the future in all our projects or redistribute them within libraries or components.

Another great usage of custom operators is composing other operators to behave as desired. Usually, the goal of adding an overload to an already available operator or adding a new composite operator or other operators is to make it easier to use.

As the last choice, we may create a custom operator from scratch using LINQ or other features of the CLR together with the low level Rx elements, such as IObservable, IObserver, or Subject.

Usually, making use of operator composition reduces the chances of releasing bugs and unwanted behaviors.

Designing the AsObservable operator

In the following example, we will create an operator to source from a generic IEnumerable<T> variable.

To create a similar operator, we will use a Create operator. Here is the code:

static void Main(string[] args) 
        { 
            //1000 items to source from 
            var items = Enumerable.Range(0, 1000) 
                .Select(x => 
                { 
                    //raise an exception on item #400 
                    //within VS the debugger will stop the execution as the 
                      exception bubbles 
                    //simply press F5 again to continue bubble the exception to Rx 
                      sequence 
                    if (x == 400) 
                        throw new ArgumentException("The item #400 has been 
                        sourcing"); 
                     
 
                    return x; 
                }); 
 
            //invoke our custom operator 
            var sequence = items.AsObservable(); 
 
            //output value and metadata 
            sequence.Materialize().Subscribe(x => Console.WriteLine("-> {0}", x)); 
 
            Console.ReadLine(); 
        } 
    } 
 
    public static class RxOperators 
    { 
        public static IObservable<T> AsObservable<T>(this IEnumerable<T> source) 
        { 
            return Observable.Create<T>(observer => 
            { 
                foreach (var item in source) 
                    try 
                    { 
                        observer.OnNext(item); 
                    } 
                    catch (Exception ex) 
                    { 
                        observer.OnError(ex); 
                        break; 
                    } 
 
                observer.OnCompleted(); 
                return Disposable.Empty; 
            }); 
        } 
    } 

The AsObservable operator we created in the preceding example sources data from a generic enumerable with the for-each clause. This means that we will never materialize the whole collection in the memory. Instead, we will stream data, as this is available at the enumerator, routing data into the messages available to the returning sequence. Together, we will eventually handle generated exceptions by routing these as error messages. At the end, we will signal the completion by routing the proper message. Regarding this implementation, we don't need to implement the disposal of the observer. Wherever we implement this feature, we can use the Disposable.Create method to write an action to stop flowing useless messages to an observer that has just been disposed.

Tip

Consider that when raising an exception in the Observer implementation instead of the sourcing sequence implementation, the behavior is different. In the preceding example, we handled errors from the source. This means that we will route properly only these kind of errors. Otherwise, by raising an exception within the observer implementation, observer will simply cause the exit from the underlying Create implementation, stopping its execution immediately. Carefully, let this happen because this is an error hiding (anti-pattern) case.

Designing the AcceptObservableClient operator

When network programming, there are multiple high-level systems that may help us create amazing applications. Web services (WCF or ASMX), binary remote services (WCF, Remoting), Enterprise Services (COM+), RESTful services and API (WebApi, WCF, MVC), OData services, queues (MSMQ, Azure Service Bus Queue, Storage Queue), and so on use it. Often, when dealing with IoT devices, we need work at a lower level acting as a network socket server.

Tip

Consider that, within Microsoft Azure, we can deal with IoT devices with the IotHub that already supports a lot of standard IoT protocols, such as MQTT. When a protocol is not yet implemented, we can still develop a custom protocol for the IotHub gateway. This lets the hub work with our protocol without having to redefine the whole system's architecture.

In this case, we can use the CLR TcpListener class that helps us create a socket server with a few code lines with a good level of scalability and reliability (if properly used). Obviously, this choice is available in Rx programming too, but we need to change the application's design a bit to work in the reactive way.

When programming TcpListener in a reactive programming way, we need to route something from the listener. At the lowest level, we can only route the client connection as a message; we will route any TcpClient connecting to our listener. To route the TcpClient messages, we will write a reusable operator named AcceptObservableClient.

Here is the code:

//the extension method must be put in a static class 
public static IObservable<TcpClient> AcceptObservableClient(this TcpListener listener) 
{ 
    //start listening with a 4 clients buffer backlog 
    listener.Start(4); 
 
    return Observable.Create<TcpClient>(observer => 
    { 
        while (true) 
        { 
            //accept newly clients from the listener 
            var client = listener.AcceptTcpClient(); 
            //route the client to the observer 
            //into an asynchronous task to let multiple clients connect 
              altogether   
            Task.Factory.StartNew(() => observer.OnNext(client), 
            TaskCreationOptions.LongRunning); 
        } 
 
        //mandatory to comply with the .Create action signature 
        return Disposable.Empty; 
    }); 
} 

The operator will start listening for a remote connection with the help of a TcpListener object. Then, each client will flow out as a message within a sequence executing into an asynchronous Task, letting other clients connect to the same listener.

Now, we will see how to use this operator. Here is the code:

//convert a TcpListener into an observable sequence on port 23 (telnet) 
var tcpClientsSequence = TcpListener.Create(23) 
    .AcceptObservableClient(); 
 
//subscribe to newly remote clients 
var observer = tcpClientsSequence.Subscribe(client => 
{ 
    //remote endpoint (IP:PORT) 
    var endpoint = client.Client.RemoteEndPoint as IPEndPoint; 
    Console.Write("{0} -> ", endpoint); 
    //get the remote stream 
    using (var stream = client.GetStream()) 
        while (true) 
        { 
            //read bytes until available 
            var b = stream.ReadByte(); 
            if (b < 0) 
                break; 
            else 
                Console.Write((char)b); 
        } 
    Console.WriteLine(); 
    Console.WriteLine("{0} -> END", endpoint); 
    Console.WriteLine(); 
}); 

The preceding code is pretty easy. We will create a new TcpListener variable on port 23, and with the help of our newly created operator, we will obtain a sequence of TcpClients.

Now, we can subscribe to the remote client connection in the usual way. In this implementation, we will simply get access to the remote stream (we're using the stream as a read-only stream, but it accepts writes too) and we will wait until the end of the stream by evaluating the read byte value.

Case study - writing a reactive socket server

Now, that we created the AcceptObservableClient operator, to create a complete socket server in Reactive Programming, we simply have to go a bit higher in the level of our design. AcceptObservableClient flows out very low level network programming objects (TcpClient), while we need an operator that flows out data messages. As a general choice, we will use Byte, as it is the only reusable data message that can contains any kind of other messages. In real world implementations, we will use specific business messages instead of general purpose Byte type messages.

We need to create another operator, the AsNetworkByteSource, which will route messages from the underlying TcpClient sequence into another sequence (of Byte) with a different granularity, because a single TcpClient variable flows out multiple or no bytes. This means that routing 10 TcpClients means not routing 10 bytes (usually more).

The operator will route the KeyValuePair<IPEndPoint, byte> messages to give us the ability to correlate multiple bytes of the same source.

Here is the code:

public static IObservable<KeyValuePair<IPEndPoint, byte>> AsNetworkByteSource(this IObservable<TcpClient> source) 
{ 
    return Observable.Create<KeyValuePair<IPEndPoint, byte>>(observer => 
    { 
        using (var innerObserver = source.Subscribe(client => 
            { 
                using (var stream = client.GetStream()) 
                    while (true) 
                    { 
                        var b = stream.ReadByte(); 
                        if (b < 0) 
                            break; 
                        else 
                            observer.OnNext(new KeyValuePair<IPEndPoint, 
                            byte>(client.Client.RemoteEndPoint
                            as IPEndPoint, (byte)b)); 
                    } 
            })) 
        { 
            //dispose the innerObserver when completes 
        } 
 
        observer.OnCompleted(); 
 
        //mandatory to comply with the .Create action signature 
        return Disposable.Empty; 
    }); 
} 

The code is similar to the one that we have already seen in the previous operator usage. The difference is that, here, we wrapped the TcpClient usage within a Create operator to flow out different messages.

Here is the usage code:

//convert a TcpListener into an observable sequence on port 23 (telnet) 
var tcpClientsSequence = TcpListener.Create(23) 
    .AcceptObservableClient() 
    .AsNetworkByteSource(); 
 
Console.WriteLine("Subscribing..."); 
var bytesObserver = tcpClientsSequence.Subscribe(x => 
{ 
    Console.WriteLine("{0} -> {1} ({2})", x.Key, x.Value, (char)x.Value); 
}); 

With this example, we can see that the usage complexity has been heavily reduced. We don't have to deal with low level TcpClient objects because we have already received bytes within our sequence together with the source IP and port detail.

Now, we need to abstract more of our code to deal with higher level messages. In the next example, we will consider a text row as a single message. This means that we need to move from per-byte messaging to per-row messaging without losing the client session isolation. To accomplish this task, we need the following flowchart:

  1. Flow messaging of remote clients from TcpListener.
  2. Convert TcpClient messaging into byte messaging.
  3. Shape single byte messages into single-byte buffered messages to have the ability to manipulate byte buffers later.
  4. Group byte buffers' messages per client session.
  5. Within the single group subscription, create a client sequence of bytes.
  6. Scan the client sequence to accumulate byte buffers until a CR and LF flows out.
  7. Let flow out-only buffers that are correctly terminated.

For a better understanding, here is the simplified flowchart:

Case study - writing a reactive socket server

The flow chart of the TcpListener demo

Here is the complete example:

//convert a TcpListener into an observable sequence on port 23 (telnet) 
var tcpClientsSequence = TcpListener.Create(23) 
    .AcceptObservableClient() 
    .AsNetworkByteSource(); 
 
//map the source message into another with a byte buffer of a single byte 
var bufferUntileCRLFSequence = tcpClientsSequence 
    .Select(x => new { x.Key, buffer = new[] { x.Value }.AsEnumerable() }) 
    //group by client session IPEndPoint (IP/Port) 
    .GroupBy(x => x.Key); 
 
//a crlf byte buffer 
var crlf = new byte[] { 0x000d, 0x000a }; 
 
//subscribe to all nested sequence groups per remote endpoint 
bufferUntileCRLFSequence.Subscribe(endpoint => 
{ 
    var clientSequence = endpoint 
        //apply an accumulator function to obtain the byte buffer per client 
        //the function will check if the buffer terminates with the CRLF then in the case will create a new buffer otherwise it will concat the previous buffer with the new byte 
        .Scan((last, i) => new { last.Key, buffer = last.buffer.Skip(last.buffer.Count() - 2).SequenceEqual(crlf) ? i.buffer : last.buffer.Concat(i.buffer) }) 
        //wait the CR+LF message to read per row 
        .Where(x => x.buffer.Skip(x.buffer.Count() - 2).SequenceEqual(crlf)); 
 
    //subscribe to the client sequence 
    clientSequence.Subscribe(row =>  
        Console.WriteLine("{0} -> {1}", row.Key, Encoding.ASCII.GetString(row.buffer.ToArray()))); 
}); 

Disposing Create<T>

In the previous examples, we often saw the use of the Create operator to start flowing messages based on our needs without the right implementation of the IDisposable interface to handle multithreaded code because of editorial needs (shortness and readability). In a real-world application, mostly, it is a good idea to provide the right cancellation support to our sequences, although created with a low level Create operator.

Here is the complete example:

var sequence = Observable.Create<int>(observer => 
{ 
    //a task is required for all time consuming activities 
    Task.Factory.StartNew(() => 
    { 
        for (int i = 0; i < 100; i++) 
        { 
            //lot of CPU time 
            Thread.SpinWait(10000000); 
            //diagnostic output 
            Debug.WriteLine(string.Format("Flowing value: {0}", i)); 
            //flow out a message 
            observer.OnNext(i); 
        } 
 
        observer.OnCompleted(); 
    }); 
 
    return Disposable.Empty; 
}); 
 
 
var subscription = sequence.Subscribe(x => Console.WriteLine(x)); 
 
//wait 5 seconds 
Thread.Sleep(1000); 
//kill the subscription 
subscription.Dispose(); 
 
Console.ReadLine(); 

The preceding example shows that although we can stop the subscription as we wish (effectively the observer stops receiving messages), the inner loop will continue wasting our resources. In the case of the usage of Task in the Create inner implementation, we can use the Task cancellation to gracefully stop the loop.

Here is an example:

var sequence = Observable.Create<int>(observer => 
{ 
    var cts = new CancellationTokenSource(); 
    var token = cts.Token; 
 
    var task = Task.Factory.StartNew(() => 
    { 
        for (int i = 0; i < 100; i++) 
        { 
            //raise an exception to stop thread's execution on task cancellation 
             request 
            token.ThrowIfCancellationRequested(); 
 
            //lot of CPU time 
            Thread.SpinWait(10000000); 
            //diagnostic output 
            Debug.WriteLine(string.Format("Flowing value: {0}", i)); 
            //flow out a message 
            observer.OnNext(i); 
        } 
    }, token); 
 
    //executes the following action at the subscription disposal 
    return Disposable.Create(() => cts.Cancel()); 
}); 

This other implementation of the Create operator will correctly handle the subscription's disposal by stopping the inner loop from wasting resources.

If we are lazy, we can avoid implementing the Disposable.Create method by using a Disposable object from the System.Reactive.Disposable namespace. The direct substitute to raise a Task cancellation is CancellationDisposable:

//raise the token cancellation at the disposal 
return new CancellationDisposable(cts); 

Other usable disposables are as follows:

  • BooleanDisposable: We can check for the dispose status with a Boolean flag
  • ContextDisposable: Routes the disposal to SynchronizationContext
  • SerialDisposable: We can change the underlying disposable object with a new one causing the old one's disposal
  • CompositeDisposable: Disposes multiple resources
  • ScheduledDisposable: We can schedule the disposal within our scheduler
  • RefCountDisposable: Waits for referred disposables that are already disposed before triggering the disposal of their inner disposable resources
  • SingleAssignmentDisposable: The underlying disposable resource can never change
  • MultipleAssignmentDisposable: We can reuse the inner disposable resource in other MultipleAssignmentDisposable objects

In the rare case where we have still not found our right disposing design, as in the case where we don't have a loop that lets us check for the Task cancellation, we can still make a low-level Thread kill with the usage of the Thread.Abort method.

Here is an example:

var sequence = Observable.Create<int>(observer => 
{ 
    var thread = new Thread(new ThreadStart(() => 
    { 
        for (int i = 0; i < 100; i++) 
        { 
            //lot of CPU time 
            Thread.SpinWait(10000000); 
            //diagnostic output 
            Debug.WriteLine(string.Format("Flowing value: {0}", i)); 
 
            Thread.BeginCriticalRegion(); 
            //don't kill me here 
            Thread.EndCriticalRegion(); 
 
            //flow out a message 
            observer.OnNext(i); 
        } 
    })); 
 
    thread.Start(); 
 
    //executes the following action at the subscription disposal 
    return Disposable.Create(() => thread.Abort()); 
}); 

As is evident in the previous example, there is the ability to specify a Critical section to avoid killing Thread in a potentially dangerous code block. Although we have the ability to specify a Critical section, killing Thread is always something potentially dangerous because it may lead to state-drive systems in unwanted states or with data inconsistency that is usually difficult to diagnose.

Designing a custom provider

When designing a custom operator is not enough for our needs, we can write a custom provider similar to what happens with LINQ.

Within the Rx world, we can write a custom provider by implementing the IQbservable interface. This interface acts in a similar way as the IQueryable interface of LINQ that exposes a query made by Expression that may contain any composition of Linq.Expression in a hierarchical structure that names the Expression tree.

The creation of a custom provider is outside the scope of this book because it needs deep LINQ knowledge and requires a lot of pages. As a suggestion to anyone wanting to try writing their own custom provider, there are valid examples and already made providers that can behave as a starting point to design the wanted provider.

A custom provider overview (MSDN) with a downloadable example can be found at the following link:

https://msdn.microsoft.com/en-us/library/hh242971(v=vs.103).aspx

The IQbservable interface (MSDN) is available at:

https://msdn.microsoft.com/en-us/library/system.reactive.linq.iqbservableprovider(v=vs.103).aspx

The IQbservable over wire (Dave Sexton) can be found at the following link:

http://davesexton.com/blog/post/LINQ-to-Cloud-IQbservable-Over-the-Wire.aspx

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.15.18.198