Any class that extends System.IO.Stream represents a cursor-based flow of data. A video stream behaves the same way: data that is not persisted locally flows across the network, and we can move forward and backward, stop, pause, resume, play, and so on. The same behavior is available when streaming any kind of data, so the Stream class is the base class that exposes this behavior for every need.
There are specialized classes that either extend Stream or wrap a Stream instance. Wrappers help work with streams of text data (StreamWriter and StreamReader) and binary serialized data (BinaryReader and BinaryWriter). Subclasses include memory-based temporary byte containers (MemoryStream), network-based streams (NetworkStream), and many others.
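As a small illustration of how these classes combine, the following sketch writes two lines of text into a MemoryStream through a StreamWriter, rewinds the cursor, and reads them back through a StreamReader. The class and method names (StreamWrappersDemo, RoundTrip) are hypothetical and exist only for this example.

```csharp
using System;
using System.IO;
using System.Text;

public static class StreamWrappersDemo
{
    //writes two lines into a MemoryStream through a StreamWriter,
    //then reads them back through a StreamReader
    public static string[] RoundTrip()
    {
        //UTF-8 without a byte order mark, to keep the bytes minimal
        var encoding = new UTF8Encoding(false);

        using (var memory = new MemoryStream())
        {
            //leaveOpen: true keeps the MemoryStream usable
            //after the writer is disposed
            using (var writer = new StreamWriter(memory, encoding, 1024, leaveOpen: true))
            {
                writer.WriteLine("first message");
                writer.WriteLine("second message");
            } //disposing the writer flushes its buffer into the stream

            //move the cursor back to the start before reading
            memory.Position = 0;

            using (var reader = new StreamReader(memory, encoding))
            {
                return new[] { reader.ReadLine(), reader.ReadLine() };
            }
        }
    }
}
```

Calling StreamWrappersDemo.RoundTrip() from Main returns the two lines in the order they were written. The cursor reset is the important step: without it, the reader would start at the end of the stream and find nothing to read.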
In reactive programming, what matters is the ability to source events from any stream, regardless of its type (network, file, memory, and so on).
Real-world applications that use reactive programming based on streams are chats, remote binary listeners (socket programming), and any other application driven by unpredictable events. On the other hand, it is useless to read a huge file in a reactive way, because there is simply nothing reactive in such cases.
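To make the idea of sourcing events from any Stream concrete, here is a minimal sketch that pushes each line of a stream to an observer. The names StreamLineSource, Pump, Demo, and ListObserver are hypothetical. A MemoryStream stands in for a network stream only to keep the sketch self-contained; the Pump method accepts any readable Stream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

//hypothetical observer that stores whatever it receives
public sealed class ListObserver : IObserver<string>
{
    public readonly List<string> Items = new List<string>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(string value) { Items.Add(value); }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { Completed = true; }
}

public static class StreamLineSource
{
    //pushes each line of any readable Stream to the observer,
    //then signals completion when the stream ends
    public static void Pump(Stream stream, IObserver<string> observer)
    {
        using (var reader = new StreamReader(stream))
        {
            while (true)
            {
                string line = null;
                try
                {
                    line = reader.ReadLine();
                }
                catch (IOException ex)
                {
                    //a read failure ends the sequence with an error
                    observer.OnError(ex);
                    return;
                }

                //null means the end of the stream has been reached
                if (line == null)
                    break;

                observer.OnNext(line);
            }
        }
        observer.OnCompleted();
    }

    //feeds two CR+LF terminated messages through an in-memory stream
    public static ListObserver Demo()
    {
        var bytes = Encoding.UTF8.GetBytes("first\r\nsecond\r\n");
        var observer = new ListObserver();
        Pump(new MemoryStream(bytes), observer);
        return observer;
    }
}
```

Because Pump depends only on the Stream base class, the same method would work unchanged with a NetworkStream, where the arrival time of each line is genuinely unpredictable.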
It is time to look at an example. Here's a complete example of a reactive application that listens to a TCP port and routes string messages (CR + LF separates multiple messages) to all the available observers. The program's Main method and the usual ConsoleObserver class are omitted for better readability:
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

public sealed class TcpListenerStringObservable : IObservable<string>, IDisposable
{
    private readonly TcpListener listener;

    public TcpListenerStringObservable(int port, int backlogSize = 64)
    {
        //creates a new tcp listener on given port
        //with given backlog size
        listener = new TcpListener(IPAddress.Any, port);
        listener.Start(backlogSize);

        //start listening asynchronously
        listener.AcceptTcpClientAsync().ContinueWith(OnTcpClientConnected);
    }

    private void OnTcpClientConnected(Task<TcpClient> clientTask)
    {
        //stop here if the task has encountered errors, for example
        //because Dispose has already stopped the listener
        if (clientTask.Status != TaskStatus.RanToCompletion)
            return;

        //we will handle a single client connection per time
        //to handle multiple connections, simply put following
        //code into a Task
        using (var tcpClient = clientTask.Result)
        using (var stream = tcpClient.GetStream())
        using (var reader = new StreamReader(stream))
            while (tcpClient.Connected)
            {
                //read the message
                var line = reader.ReadLine();

                //stop listening if nothing available
                if (string.IsNullOrEmpty(line))
                    break;

                //construct observer message adding client's
                //remote endpoint address and port
                var msg = string.Format("{0}: {1}", tcpClient.Client.RemoteEndPoint, line);

                //route messages
                foreach (var observer in observerList)
                    observer.OnNext(msg);
            }

        //starts another client listener
        listener.AcceptTcpClientAsync().ContinueWith(OnTcpClientConnected);
    }

    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        observerList.Add(observer);

        //subscription lifecycle missing
        //for readability purpose
        return null;
    }

    public void Dispose()
    {
        //stop listener
        listener.Stop();
    }
}
The preceding example shows how to create a reactive TCP listener that acts as an observable of string messages.
The observable class uses an internal TcpListener object that provides mid-level network services on top of an underlying Socket object. The example asks the listener to start listening and then waits for a client asynchronously, without blocking the caller, by using a Task object. When a remote client connects, the OnTcpClientConnected method handles its communication with the observable. The method verifies that the Task completed normally, gets the TcpClient from the Task, opens the network stream, and wraps that stream in a StreamReader to start reading messages line by line.
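The Subscribe method in the listing returns null because the subscription lifecycle is left out for readability. One way it could be filled in is sketched below, under the assumption that disposing the returned object should simply remove the observer from the list. The Subscription, StringSource, and RecordingObserver types are hypothetical; StringSource stands in for the subscription-related part of the observable only, with a Publish method in place of the network reading loop.

```csharp
using System;
using System.Collections.Generic;

//hypothetical helper: removes the observer from the list when disposed
public sealed class Subscription<T> : IDisposable
{
    private readonly List<IObserver<T>> observers;
    private readonly IObserver<T> observer;

    public Subscription(List<IObserver<T>> observers, IObserver<T> observer)
    {
        this.observers = observers;
        this.observer = observer;
    }

    public void Dispose()
    {
        //lock because messages may be routed from another thread
        lock (observers)
            observers.Remove(observer);
    }
}

//hypothetical stand-in for the subscription logic of the observable
public sealed class StringSource : IObservable<string>
{
    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        lock (observerList)
            observerList.Add(observer);
        return new Subscription<string>(observerList, observer);
    }

    //routes a message to a snapshot of the current observers, so that
    //an observer can unsubscribe while messages are being routed
    public void Publish(string msg)
    {
        IObserver<string>[] snapshot;
        lock (observerList)
            snapshot = observerList.ToArray();
        foreach (var observer in snapshot)
            observer.OnNext(msg);
    }
}

//hypothetical observer that records the messages it receives
public sealed class RecordingObserver : IObserver<string>
{
    public readonly List<string> Received = new List<string>();
    public void OnNext(string value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With this sketch, an observer that calls Dispose on the object returned by Subscribe stops receiving messages published afterwards. Taking a snapshot before iterating avoids modifying the list while a foreach loop is walking it.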
Once the message reading is complete, another Task starts and repeats the procedure. Although this design handles a backlog of pending connections, it serves only a single client at a time. To handle multiple connections simultaneously, simply wrap the OnTcpClientConnected logic in a Task. Here's an example:
private void OnTcpClientConnected(Task<TcpClient> clientTask)
{
    //stop here if the task has encountered errors, for example
    //because Dispose has already stopped the listener
    if (clientTask.Status != TaskStatus.RanToCompletion)
        return;

    //handle this client in its own task
    Task.Factory.StartNew(() =>
    {
        using (var tcpClient = clientTask.Result)
        using (var stream = tcpClient.GetStream())
        using (var reader = new StreamReader(stream))
            while (tcpClient.Connected)
            {
                //read the message
                var line = reader.ReadLine();

                //stop listening if nothing available
                if (string.IsNullOrEmpty(line))
                    break;

                //construct observer message adding client's
                //remote endpoint address and port
                var msg = string.Format("{0}: {1}", tcpClient.Client.RemoteEndPoint, line);

                //route messages
                foreach (var observer in observerList)
                    observer.OnNext(msg);
            }
    }, TaskCreationOptions.PreferFairness);

    //starts another client listener
    listener.AcceptTcpClientAsync().ContinueWith(OnTcpClientConnected);
}
This is the output of the reactive application when it receives two different connections by using telnet as a client (C:\>telnet localhost 8081):
As you can see, each client connects to the listener from a different remote port. This lets us tell multiple remote connections apart even when they are connected at the same time.
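The remote port behavior can also be checked without telnet. The following sketch, which assumes a free loopback port chosen by the operating system rather than the port used in the book's session, opens two TcpClient connections to the same listener and returns the remote port seen by the server side for each one. The class and method names (RemotePortDemo, RemotePortsOfTwoClients) are hypothetical.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

public static class RemotePortDemo
{
    //connects two clients to one listener and returns the remote
    //port that the server side sees for each accepted connection
    public static int[] RemotePortsOfTwoClients()
    {
        //port 0 asks the operating system for any free port
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        try
        {
            int port = ((IPEndPoint)listener.LocalEndpoint).Port;

            using (var first = new TcpClient())
            using (var second = new TcpClient())
            {
                //both connections wait in the listener backlog
                first.Connect(IPAddress.Loopback, port);
                second.Connect(IPAddress.Loopback, port);

                using (var acceptedA = listener.AcceptTcpClient())
                using (var acceptedB = listener.AcceptTcpClient())
                {
                    return new[]
                    {
                        ((IPEndPoint)acceptedA.Client.RemoteEndPoint).Port,
                        ((IPEndPoint)acceptedB.Client.RemoteEndPoint).Port
                    };
                }
            }
        }
        finally
        {
            listener.Stop();
        }
    }
}
```

The two returned ports differ because both connections are open at the same time and share the same address pair and server port, so only the client port can distinguish them. This is the same value that appears in the RemoteEndPoint prefix of each routed message.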