Sourcing from CLR streams

Any class that extends System.IO.Stream represents a cursor-based flow of data. A video stream behaves the same way: data that is not persisted locally flows over the network, and we can go forward and backward, stop, pause, resume, play, and so on. The same behavior is available when streaming any kind of data (although not every stream supports seeking), so the Stream class is the base class that exposes this behavior for any need.

There are specialized classes that extend Stream, such as memory-based temporary byte containers (MemoryStream), network-based streams (NetworkStream), and many others. There are also helper classes that wrap a stream to work with text data (StreamWriter and StreamReader) or binary serialized data (BinaryReader and BinaryWriter).
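As a minimal sketch of how these classes fit together, the following code writes two CR + LF terminated lines into a MemoryStream through a StreamWriter, rewinds the cursor, and reads the first line back through a StreamReader. The class and method names (StreamRoundTrip, WriteThenReadFirstLine) are made up for this illustration.

```csharp
using System.IO;
using System.Text;

// Hypothetical helper, used only to illustrate stream wrappers
public static class StreamRoundTrip
{
    public static string WriteThenReadFirstLine()
    {
        // UTF-8 without a byte order mark, so only the text is stored
        var encoding = new UTF8Encoding(false);

        using (var memory = new MemoryStream())
        {
            // leaveOpen: true keeps the MemoryStream usable
            // after the writer is disposed
            using (var writer = new StreamWriter(memory, encoding, 1024, leaveOpen: true))
            {
                writer.Write("hello\r\n");
                writer.Write("world\r\n");
            } // disposing the writer flushes its buffer into the stream

            // move the cursor back to the beginning of the stream
            memory.Position = 0;

            using (var reader = new StreamReader(memory, encoding))
                return reader.ReadLine(); // first line, without the CR + LF
        }
    }
}
```

The writer and the reader never store the data themselves; they only translate between text and the bytes held by the underlying stream.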

Regarding reactive programming, what matters is the ability to source events from any stream, regardless of its type (network, file, memory, and so on).

Real-world applications that use reactive programming based on streams are chats, remote binary listeners (socket programming), and any other unpredictable event-oriented application. On the other hand, it is useless to read a huge file in a reactive way, because there is simply nothing reactive in such cases.

It is time to look at an example. Here's a complete example of a reactive application that listens on a TCP port and routes string messages (CR + LF separates multiple messages) to all the available observers. The program Main and the usual ConsoleObserver methods are omitted for better readability:

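    using System; 
    using System.Collections.Generic; 
    using System.IO; 
    using System.Net; 
    using System.Net.Sockets; 
    using System.Threading.Tasks; 

    public sealed class TcpListenerStringObservable : IObservable<string>, IDisposable 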
    { 
        private readonly TcpListener listener; 
        public TcpListenerStringObservable(int port, int backlogSize = 64) 
        { 
            //creates a new tcp listener on given port 
            //with given backlog size 
            listener = new TcpListener(IPAddress.Any, port); 
            listener.Start(backlogSize); 
 
            //start listening asynchronously 
            listener.AcceptTcpClientAsync().ContinueWith(OnTcpClientConnected); 
        } 
 
        private void OnTcpClientConnected(Task<TcpClient> clientTask) 
        { 
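            //continue only if the accept task completed successfully 
            //(IsCompleted is also true for faulted or canceled tasks) 
            if (clientTask.Status == TaskStatus.RanToCompletion) 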
                //we will handle a single client connection per time 
                //to handle multiple connections, simply put following 
                //code into a Task 
                using (var tcpClient = clientTask.Result) 
                using (var stream = tcpClient.GetStream()) 
                using (var reader = new StreamReader(stream)) 
                    while (tcpClient.Connected) 
                    { 
                        //read the message 
                        var line = reader.ReadLine(); 
 
                        //stop listening if nothing available 
                        if (string.IsNullOrEmpty(line)) 
                            break; 
                        else 
                        { 
                            //construct observer message adding client's remote 
                            //endpoint address and port 
                            var msg = string.Format("{0}: {1}", 
                                tcpClient.Client.RemoteEndPoint, line); 
 
                            //route messages 
                            foreach (var observer in observerList) 
                                observer.OnNext(msg); 
                        } 
                    } 
 
            //starts another client listener 
            listener.AcceptTcpClientAsync().ContinueWith(OnTcpClientConnected); 
        } 
 
        private readonly List<IObserver<string>> observerList = 
            new List<IObserver<string>>(); 
        public IDisposable Subscribe(IObserver<string> observer) 
        { 
            observerList.Add(observer); 
 
            //subscription lifecycle missing 
            //for readability purpose 
            return null; 
        } 
 
        public void Dispose() 
        { 
            //stop listener 
            listener.Stop(); 
        } 
    } 

The preceding example shows how to create a reactive TCP listener that acts as an observable of string messages.
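In the listing above, Subscribe returns null because the subscription lifecycle is left out. One possible way to fill that gap, shown here only as a sketch under stated assumptions, is to return an IDisposable that removes the observer from the list when disposed. ObserverRegistry and ListObserver are hypothetical names introduced for this illustration; they are not part of the original listing.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the original listing): keeps the observer
// list and hands out a disposable subscription for each observer.
public sealed class ObserverRegistry<T>
{
    private readonly object gate = new object();
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    // Number of currently subscribed observers
    public int Count
    {
        get { lock (gate) { return observers.Count; } }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (gate) { observers.Add(observer); }
        return new Subscription(this, observer);
    }

    // Routes a value to a snapshot of the observers, so that an observer
    // can unsubscribe while the loop is running
    public void Publish(T value)
    {
        IObserver<T>[] snapshot;
        lock (gate) { snapshot = observers.ToArray(); }
        foreach (var observer in snapshot)
            observer.OnNext(value);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly ObserverRegistry<T> owner;
        private readonly IObserver<T> observer;

        public Subscription(ObserverRegistry<T> owner, IObserver<T> observer)
        {
            this.owner = owner;
            this.observer = observer;
        }

        // Removes the observer; calling Dispose twice is harmless
        public void Dispose()
        {
            lock (owner.gate) { owner.observers.Remove(observer); }
        }
    }
}

// Hypothetical observer that records what it receives
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With such a helper, a subscriber would keep the value returned by Subscribe and call Dispose on it to stop receiving messages. The lock is there because messages may be routed from a different thread than the one that subscribes.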

The observable class uses an internal TcpListener object that provides mid-level network services on top of an underlying Socket object. The constructor asks the listener to start listening and then waits for a client asynchronously by using a Task object. When a remote client connects, the OnTcpClientConnected method handles its communication with the observable: it verifies that the Task completed normally, takes the TcpClient from the Task, gets the client's network stream, and wraps that stream in a StreamReader to start reading text lines.

Once the messages of a client have been read, the method starts another accept Task to repeat the procedure. Although this design handles a backlog of pending connections, it serves only a single client at a time. To handle multiple connections simultaneously, simply wrap the reading logic of OnTcpClientConnected in its own Task. Here's an example:

    private void OnTcpClientConnected(Task<TcpClient> clientTask) 
    { 
        //continue only if the accept task completed successfully 
        //(IsCompleted is also true for faulted or canceled tasks) 
        if (clientTask.Status == TaskStatus.RanToCompletion) 
            //handle each client in its own task 
            Task.Factory.StartNew(() => 
                { 
                    using (var tcpClient = clientTask.Result) 
                    using (var stream = tcpClient.GetStream()) 
                    using (var reader = new StreamReader(stream)) 
                        while (tcpClient.Connected) 
                        { 
                            //read the message 
                            var line = reader.ReadLine(); 

                            //stop listening if nothing available 
                            if (string.IsNullOrEmpty(line)) 
                                break; 
                            else 
                            { 
                                //construct observer message adding client's remote 
                                //endpoint address and port 
                                var msg = string.Format("{0}: {1}", 
                                    tcpClient.Client.RemoteEndPoint, line); 

                                //route messages 
                                foreach (var observer in observerList) 
                                    observer.OnNext(msg); 
                            } 
                        } 
                }, TaskCreationOptions.PreferFairness); 

        //starts another client listener 
        listener.AcceptTcpClientAsync().ContinueWith(OnTcpClientConnected); 
    } 

This is the output of the reactive application when it receives two different connections by using telnet as a client (C:\>telnet localhost 8081):

The observable routing events from the telnet client

As you can see, each client connects to the listener from a different remote port. This lets us tell multiple remote connections apart even when they are connected at the same time.
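The point about remote ports can be checked without telnet. The following sketch, which assumes a loopback listener on a port chosen by the operating system, connects two clients and returns the remote port that the server side sees for each accepted connection. RemotePortDemo and AcceptTwoClients are hypothetical names used only for this illustration.

```csharp
using System.Net;
using System.Net.Sockets;

// Hypothetical helper, used only to illustrate distinct remote ports
public static class RemotePortDemo
{
    public static int[] AcceptTwoClients()
    {
        // port 0 asks the operating system for any free port
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        try
        {
            int port = ((IPEndPoint)listener.LocalEndpoint).Port;

            using (var first = new TcpClient())
            using (var second = new TcpClient())
            {
                // both connections wait in the listener backlog
                // until they are accepted
                first.Connect(IPAddress.Loopback, port);
                second.Connect(IPAddress.Loopback, port);

                using (var acceptedA = listener.AcceptTcpClient())
                using (var acceptedB = listener.AcceptTcpClient())
                {
                    // the remote endpoint of an accepted connection is
                    // the address and port the client connected from
                    return new[]
                    {
                        ((IPEndPoint)acceptedA.Client.RemoteEndPoint).Port,
                        ((IPEndPoint)acceptedB.Client.RemoteEndPoint).Port
                    };
                }
            }
        }
        finally
        {
            listener.Stop();
        }
    }
}
```

The two returned ports differ because each outgoing connection gets its own local port on the client side, which is the value the listener reports as the remote port.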
