RSocket in Java

The RSocket protocol and interaction model are in high demand in Java, alongside implementations in C++, JS, Python, and Go. The Java implementation is built on top of Reactor 3, and the RSocket-Java module offers the following programming model:

RSocketFactory  // (1)
    .receive()  // (1.1)
    .acceptor(new SocketAcceptorImpl())  // (1.2)
    .transport(TcpServerTransport.create("localhost", 7000))  // (1.3)
    .start()  // (1.4)
    .subscribe();

RSocket socket = RSocketFactory  // (2)
    .connect()  // (2.1)
    .transport(TcpClientTransport.create("localhost", 7000))
    .start()
    .block();  // (2.2)

socket  // (3)
    .requestChannel(  // (3.1)
        Flux.interval(Duration.ofMillis(1000))
            .map(i -> DefaultPayload.create("Hello [" + i + "]"))  // (3.2)
    )
    .map(Payload::getDataUtf8)  // (3.3)
    .doFinally(signalType -> socket.dispose())
    .then()
    .block();

Each section of the numbered code is explained here:

  1. This is the server (receiver) RSocket definition. At point (1.1), we state that we want to create an RSocket server, so we use the .receive() method. At point (1.2), we provide the SocketAcceptor implementation, which defines the handler method called on incoming client connections. At point (1.3), we define the preferred transport, which is TCP in this case. Note that the TCP transport is provided by Reactor-Netty. Finally, to start listening on the defined socket address, we start the server (1.4) and .subscribe() to it.
  2. This is the client RSocket definition. Here (2.1), instead of the .receive() factory method, we use .connect(), which provides the client's RSocket instance. For the sake of simplicity in this example, we use the .block() method (2.2) to wait for a successful connection and obtain an active RSocket instance.
  3. This demonstrates the execution of a request to the server. In this example, we use the channel interaction (3.1), so along with sending a stream of messages, we receive a stream as well. Notice that the default representation of a message in the stream is the Payload class. Hence, at point (3.2), we have to wrap each message into a Payload (in this case, into the default implementation, DefaultPayload), and at point (3.3) we unwrap it, for example, to a String.

In the preceding example, we wired duplex communication between the client and the server. All of this communication follows the Reactive Streams specification and is built on Reactor 3.
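To make the wrapping at (3.2) and the unwrapping at (3.3) more concrete, the following is a minimal JDK-only sketch. Utf8Payload is a hypothetical stand-in and not RSocket-Java's DefaultPayload; it assumes only that the message data travels as UTF-8 bytes, as the getDataUtf8 accessor name suggests, and a plain Java stream stands in for the Flux.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical stand-in for a payload: an immutable holder of UTF-8 encoded bytes.
final class Utf8Payload {
    private final byte[] data;

    private Utf8Payload(byte[] data) {
        this.data = data;
    }

    // Wrap a String into a payload (compare with point 3.2).
    static Utf8Payload create(String text) {
        return new Utf8Payload(text.getBytes(StandardCharsets.UTF_8));
    }

    // Unwrap the payload back into a String (compare with point 3.3).
    String getDataUtf8() {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Build the first n "Hello [i]" messages, wrap each one, then unwrap it again.
    static List<String> roundTrip(int n) {
        return LongStream.range(0, n)
                .mapToObj(i -> create("Hello [" + i + "]"))
                .map(Utf8Payload::getDataUtf8)
                .collect(Collectors.toList());
    }
}
```

In the real client, the same two steps sit on either side of the network call: the outgoing stream is wrapped before requestChannel, and the incoming stream is unwrapped after it.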

Furthermore, it is important to mention the implementation of SocketAcceptor:

class SocketAcceptorImpl implements SocketAcceptor {  // (1)

    @Override  // (2)
    public Mono<RSocket> accept(
        ConnectionSetupPayload setupPayload,  // (2.1)
        RSocket reactiveSocket  // (2.2)
    ) {
        return Mono.just(new AbstractRSocket() {  // (3)
            @Override
            public Flux<Payload> requestChannel(  // (3.1)
                Publisher<Payload> payloads  // (3.2)
            ) {
                return Flux.from(payloads)  // (3.3)
                    .map(Payload::getDataUtf8)
                    .map(s -> "Echo: " + s)
                    .map(DefaultPayload::create);
            }
        });
    }
}

The numbered points in the code are explained as follows:

  1. This is the implementation of the SocketAcceptor interface. Note that SocketAcceptor is a representation of the server-side acceptor.
  2. The SocketAcceptor interface has only one method, called accept. This method takes two parameters. The first is the ConnectionSetupPayload argument (2.1), which represents the first handshake from the client's side during the connection wiring. As discussed earlier in this section, RSocket is a duplex connection by nature. That nature is represented by the second parameter of the accept method, named reactiveSocket here (2.2). Using the second parameter, the server may start streaming requests to the client as if the server were the initiator of the interaction.
  3. This is the handler RSocket declaration. In this case, the AbstractRSocket class is an abstract implementation of the RSocket interface that emits UnsupportedOperationException for any handling method. Subsequently, by overriding one of the methods (3.1), we can declare which interaction models our server supports. Finally, at point (3.3), we provide the echo functionality, which takes the incoming stream (3.2) and modifies its messages.

As we can see, defining a SocketAcceptor is not the same as defining a handler. The SocketAcceptor#accept method is invoked for each new incoming connection and returns the handler for it. In turn, in RSocket-Java, the RSocket interface represents both the client's and the server's handler. Finally, the communication between the parties is peer-to-peer, which means that both sides can handle requests.
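The idea behind AbstractRSocket described in point 3 above (every interaction fails by default, and overriding a method opts in to that interaction) can be sketched with the JDK alone. The Handler, EchoHandler, and HandlerDemo names below are hypothetical, and CompletableFuture stands in for Mono; this is an illustration of the pattern under those assumptions, not the RSocket-Java API. It requires Java 9 or later for CompletableFuture.failedFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical handler contract with two interaction models.
// Every method fails by default, mirroring the idea behind AbstractRSocket.
interface Handler {
    default CompletableFuture<String> requestResponse(String request) {
        return CompletableFuture.failedFuture(
                new UnsupportedOperationException("requestResponse not implemented"));
    }

    default CompletableFuture<Void> fireAndForget(String message) {
        return CompletableFuture.failedFuture(
                new UnsupportedOperationException("fireAndForget not implemented"));
    }
}

// Overriding one method declares that this handler supports that interaction only.
final class EchoHandler implements Handler {
    @Override
    public CompletableFuture<String> requestResponse(String request) {
        return CompletableFuture.completedFuture("Echo: " + request);
    }
}

final class HandlerDemo {
    // Returns the reply, or the simple name of the failure type if the call failed.
    static String describe(CompletableFuture<?> result) {
        try {
            return String.valueOf(result.join());
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }
}
```

With this sketch, calling requestResponse on an EchoHandler yields the echoed text, while calling fireAndForget surfaces an UnsupportedOperationException through the returned future rather than being thrown at the call site.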

Furthermore, to achieve scalability, RSocket-Java offers the RSocket LoadBalancer module, which can be integrated with a service registry such as Eureka. For example, the following code shows naive integration with Spring Cloud Discovery:


Flux
    .interval(Duration.ofMillis(100))  // (1)
    .map(i ->
        discoveryClient
            .getInstances(serviceId)  // (2)
            .stream()
            .map(si ->
                new RSocketSupplier(() ->
                    RSocketFactory.connect()  // (3)
                        .transport(
                            TcpClientTransport.create(
                                si.getHost(),  // (3.1)
                                si.getPort()  // (3.2)
                            )
                        )
                        .start()
                ) {
                    public boolean equals(Object obj) { ... }  // (4)

                    public int hashCode() {
                        return si.getUri().hashCode();  // (4.1)
                    }
                }
            )
            .collect(toCollection(ArrayList<RSocketSupplier>::new))
    )
    .as(LoadBalancedRSocketMono::create);  // (5)

In the previous code, the numbered points mean the following:

  1. This is the .interval() operator declaration. The idea is to periodically retrieve the available instances for a given serviceId.
  2. This retrieves the list of instances.
  3. This creates the Supplier of Mono<RSocket>. We use the server host (3.1) and port (3.2) from each given ServiceInstance to create the appropriate transport connection.
  4. This is the anonymous RSocketSupplier creation. Here, we override equals and hashCode in order to determine which RSocketSupplier instances are the same. Note that, under the hood, LoadBalancedRSocketMono uses a HashSet in which all received instances are stored. In turn, we use the URI (4.1) as the unique identifier of an instance in the group.
  5. This is the transformation of Flux<Collection<RSocketSupplier>> into a LoadBalancedRSocketMono. Note that even though the result is an instance of type Mono, LoadBalancedRSocketMono is stateful. Consequently, each new subscriber potentially receives a different result. Under the hood, LoadBalancedRSocketMono selects an RSocket instance using a predictive load-balancing algorithm and returns the selected one to the subscriber.

The previous example shows a naive method of integrating LoadBalancedRSocketMono with DiscoveryClient. Although this example is not efficient, it still shows how to work with LoadBalancedRSocketMono properly.
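The reason for the equals and hashCode overrides at marker (4) can be seen in isolation with a JDK-only sketch. InstanceKey is a hypothetical stand-in for the anonymous RSocketSupplier subclass, and its equals body is an assumption about one reasonable implementation (the original code elides it). The sketch shows that repeated polls returning the same URI collapse into one entry in a HashSet.

```java
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a per-instance supplier, identified only by its URI.
final class InstanceKey {
    private final URI uri;

    InstanceKey(URI uri) {
        this.uri = uri;
    }

    // Assumed equality rule: two keys are equal when their URIs are equal.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof InstanceKey && uri.equals(((InstanceKey) obj).uri);
    }

    // Must agree with equals so that a HashSet can detect duplicates.
    @Override
    public int hashCode() {
        return uri.hashCode();
    }

    // Count the distinct instances seen across repeated discovery polls.
    static int distinctCount(List<String> polledUris) {
        Set<InstanceKey> known = new HashSet<>();
        for (String u : polledUris) {
            known.add(new InstanceKey(URI.create(u)));
        }
        return known.size();
    }
}
```

Without these overrides, every poll would produce objects that compare by identity, so a set keyed on them would treat each poll's result as a new instance.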

To generalize, RSocket is a communication protocol that follows Reactive Streams semantics and opens new horizons for streaming communication across network boundaries with backpressure control. In turn, there is a powerful Reactor 3-based implementation that offers a straightforward API for wiring a connection between peers and uses that connection efficiently over the interaction's life cycle.
