Wrapping disposable resources into Reactive Streams

The using factory method allows the creation of a stream depending on a disposable resource. It implements the try-with-resources approach in reactive programming. Let's assume that it is a requirement to wrap a blocking API represented with the following, intentionally simplified, Connection class:

public class Connection implements AutoCloseable {                 // (1)
private final Random rnd = new Random();

public Iterable<String> getData() { // (2)
if (rnd.nextInt(10) < 3) { // (2.1)
throw new RuntimeException("Communication error");
}
return Arrays.asList("Some", "data"); // (2.2)
}

public void close() { // (3)
log.info("IO Connection closed");
}

public static Connection newConnection() { // (4)
log.info("IO Connection created");
return new Connection();
}
}

The preceding code describes the following:

  1. The Connection class manages some internal resources and notifies this by implementing the AutoClosable interface.
  2. The getData method simulates an IO operation, which may sometimes cause an exception (2.1) or return an Iterable collection with useful data (2.2).  
  3. The close method may free up internal resources and should always be called, even after an error has happened during getData execution.
  4. The static newConnection factory method always returns a new instance of the Connection class.

Usually, connections and connection factories have more complex behaviors, but for the sake of simplicity, we are going to use this simple design.

With the imperative approach, we may receive data from a connection with the following code:

try (Connection conn = Connection.newConnection()) {                 // (1)
conn.getData().forEach( // (2)
data -> log.info("Received data: {}", data)
);
} catch (Exception e) { // (3)
log.info("Error: {}", e.getMessage());
}

The preceding code follows these steps:

  1. Use Java's try-with-resources statement to create a new connection and automatically close it when leaving the current code block.
  2. Get and process business data.
  3. In the event of an exception, log the appropriate message.

The reactive equivalent of the previous code would look like the following:

Flux<String> ioRequestResults = Flux.using(                        // (1)
Connection::newConnection, // (1.1)
connection -> Flux.fromIterable(connection.getData()), // (1.2)
Connection::close // (1.3)
);

ioRequestResults.subscribe( // (2)
data -> log.info("Received data: {}", data), //
e -> log.info("Error: {}", e.getMessage()), //
() -> log.info("Stream finished")); //

The preceding code consists of the following steps:

  1. The using factory method allows the association of the Connection instance life-cycle with the life-cycle of its wrapping stream. The using method needs to know how to create a disposable resource; in this case, it is the code that creates a new connection (1.1). Then, the method has to know how to transform the resource that was just created into a Reactive Stream. In this case, we call the fromIterable method (1.2). Last but not least, how do we close the resource? In our case, when the processing is over, the close method of the connection instance is called.
  2. Of course, to start the actual processing, we need to create a subscription with handles for the onNext, onError, and onComplete signals.

The success path for the preceding code generates the following output:

IO Connection created
Received data: Some
Received data: data
IO Connection closed
Stream finished

Execution with a simulated error generates the following output:

IO Connection created
IO Connection closed
Error: Communication error

In both cases, the using operator created a new connection at first, then executed a workflow (successfully or not), and then closed the previously created connection. In this case, the life cycle of the connection is bound to the life-cycle of the stream. The operator also makes it possible to choose whether the cleanup action should happen before informing a subscriber about stream termination or after.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.39.60