Micro-fusion

Micro-fusion is a more complex optimization and is related to runtime optimization and the reuse of shared resources. One good example of micro-fusion is the conditional operator. To understand this problem, let's take a look at the following diagram:

Diagram 4.13. Conditional problem with the truck example

Let's imagine the following situation. The store made an order for n items. After some time, the factory sent the items in a truck to the store. However, to finally arrive at Store B, the truck must go through the Inspection Department and verify that all items are of the expected quality. Unfortunately, some of the items were not carefully packed, and only some of the order arrived at the store. After that, the factory prepared another truck and sent it to the store again. This happened repeatedly until all of the ordered items arrived at the store. Fortunately, the factory realised that they had spent a lot of time and money delivering items through separate Inspection Departments and decided to hire their own local inspector from the Inspection Department (Diagram 4.14):

Diagram 4.14. Solved Conditional overhead with the dedicated inspector on the factory side.

All items can now be verified at the factory and then sent to the store without visiting the Inspection Department.

How does that story correlate with programming? Let's take a look at the following example:

Flux.from(factory)
   .filter(inspectionDepartment)
   .subscribe(store);

Here, we have a similar situation. The downstream subscriber has requested a certain number of elements from the source. As elements are emitted through the chain of operators, they pass through the conditional operator, which may reject some of them. To satisfy the downstream demand, the filter operator must execute an additional request(1) call upstream for each rejected item. According to the design of current reactive libraries (such as RxJava or Reactor 3), the request operation has its own additional CPU overhead.

According to David Karnok's research, each "... call to request() usually ends up in an atomic CAS loop, costing 21-45 cycles for each dropped element."

This means that conditional operators, such as the filter operator, may have a significant impact on the overall performance! For that reason, there is a type of micro-fusion called ConditionalSubscriber. This type of optimization allows us to verify the condition right on the source side and send the required amount of elements without additional request calls.
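
The saving can be illustrated with a small sketch in plain Java. It does not use Reactor itself: ConditionalSink, plainRequestCalls, and conditionalRequestCalls are hypothetical names invented for this illustration, loosely modeled on the tryOnNext method of Reactor's Fuseable.ConditionalSubscriber. The sketch only counts how many request calls each approach would issue under these simplified assumptions.

```java
import java.util.List;
import java.util.function.Predicate;

class ConditionalFilterSketch {

    /** Hypothetical stand-in for a conditional subscriber: returns false when the item was dropped. */
    interface ConditionalSink<T> {
        boolean tryOnNext(T item);
    }

    /**
     * Plain path: the source emits only what was requested, so the filter has to
     * issue request(1) for every rejected item. Returns the number of request calls.
     */
    static int plainRequestCalls(List<Integer> source, Predicate<Integer> filter, int demand) {
        int requestCalls = 1;      // the initial request(demand)
        int allowance = demand;    // how many items the source may still emit
        for (int item : source) {
            if (allowance == 0) {
                break;             // downstream demand is satisfied
            }
            allowance--;
            if (!filter.test(item)) {
                requestCalls++;    // request(1) to replace the rejected item
                allowance++;
            }
        }
        return requestCalls;
    }

    /**
     * Conditional path: the source checks the result of tryOnNext itself, so a
     * rejected item does not consume demand and no extra request call is needed.
     */
    static int conditionalRequestCalls(List<Integer> source, Predicate<Integer> filter, int demand) {
        ConditionalSink<Integer> sink = filter::test;
        int requestCalls = 1;      // only the initial request(demand)
        int remaining = demand;
        for (int item : source) {
            if (remaining == 0) {
                break;
            }
            if (sink.tryOnNext(item)) {
                remaining--;       // only accepted items count against the demand
            }
        }
        return requestCalls;
    }

    public static void main(String[] args) {
        List<Integer> factory = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        Predicate<Integer> inspection = i -> i % 2 == 0;
        System.out.println("plain: " + plainRequestCalls(factory, inspection, 3));             // plain: 4
        System.out.println("conditional: " + conditionalRequestCalls(factory, inspection, 3)); // conditional: 1
    }
}
```

With a demand of three and every second item rejected, the plain path pays for three extra request calls, while the conditional path issues only the initial one.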

The second type of micro-fusion is the most complicated one. This fusion is related to asynchronous boundaries between operators, which were mentioned in Chapter 3, Reactive Streams - the New Streams' Standard. To understand the problem, let's imagine an operator chain with a few asynchronous boundaries, as in the following example:

Flux.just(1, 2, 3)
   .publishOn(Schedulers.parallel())                                 // (1)
   .concatMap(i -> Flux.range(0, i)
                       .publishOn(Schedulers.parallel()))            // (2)
   .subscribe();

The previous example shows a Reactor operator chain. This chain includes two asynchronous boundaries, which means that a queue should appear at each of them. For example, the nature of the concatMap operator is that it may produce n elements for each incoming element from the upstream. Thus, it is impossible to predict how many elements will be produced by the inner Fluxes. To handle backpressure and avoid overwhelming the consumer, it is necessary to put the results into a queue. The publishOn operator also requires an internal queue to transfer elements in a Reactive Stream from one worker thread to another. In addition to the queue overhead, there are the more dangerous request() calls across the asynchronous boundaries, which may cause even more significant memory overhead. To understand the problem, let's take a look at the following diagram:

Diagram 4.15. Async boundaries overhead without optimization

The previous diagram expands the internal behavior of the earlier code snippet. Here, we have a massive overhead in the internals of concatMap, where we are required to send a request to each inner stream until the downstream demand is satisfied. Each operator with a queue has its own CAS loop, which, in the event of an inappropriate request model, may cause significant performance overhead. For example, request(1), or any other number of elements that is small in comparison to the whole amount of data, may be considered an inappropriate request model.

CAS (compare-and-swap) is a single atomic operation that replaces a value only if it still equals an expected value, and returns 1 or 0 depending on whether the replacement succeeded. Since we want the operation to succeed, we re-read the current value and repeat the CAS operation until it is successful. These repetitive CAS operations are called CAS loops.
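
A minimal sketch of such a loop in plain Java follows. The class CasLoopSketch and the method addDemand are hypothetical names chosen for this illustration; the sketch assumes that the outstanding demand is kept in an AtomicLong and capped at Long.MAX_VALUE, which is a common convention but not necessarily how any particular operator is implemented. Note that in Java, compareAndSet reports success as a boolean rather than as 1 or 0.

```java
import java.util.concurrent.atomic.AtomicLong;

class CasLoopSketch {

    /**
     * Adds n to the outstanding demand, capping the result at Long.MAX_VALUE.
     * Returns the demand that was observed just before the successful update.
     */
    static long addDemand(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;           // already unbounded, nothing to add
            }
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE;           // overflow: treat as unbounded demand
            }
            if (requested.compareAndSet(current, next)) {
                return current;                  // CAS succeeded
            }
            // CAS failed: another thread changed the counter, so read again and retry
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong requested = new AtomicLong();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    addDemand(requested, 1);     // many small requests contend on the same counter
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(requested.get());     // prints 4000
    }
}
```

Every failed compareAndSet costs another iteration, which is why a stream of request(1) calls across a contended boundary is more expensive than a single larger request.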

To prevent memory and performance overhead, we should switch communication protocols, as proposed by the Reactive Streams specification. If the operators inside the boundary, or boundaries, share a queue, then switching the whole chain of operators to use the upstream operator as that queue, without additional request calls, may increase the overall performance significantly. The downstream then drains values from the upstream by polling it until it returns null, which signals that no more values are available. To notify the downstream that elements are available, the upstream calls the downstream's onNext with a null value, which is a special exception made for this protocol. Errors and completion of the stream are still signaled as usual through onError or onComplete. With this protocol, the previous example may be optimized in the following way:

Diagram 4.16. Queue subscription fusion with protocol switching

In this example, the publishOn and concatMap operators may be significantly optimized. In the first case, there are no intermediate operators that have to be executed on the main thread. Thus, we may directly use the just operator as a queue and pull from it on a separate thread. In the case of concatMap, all inner streams may also be considered as queues, so that each stream may be drained without any additional request calls.
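
The pull-based protocol can be sketched in plain Java as follows. This is not Reactor code: PollableSource, fromList, and drain are hypothetical names invented for this illustration, loosely modeled on the poll method of Reactor's Fuseable.QueueSubscription. The sketch assumes a source with a fixed set of values, so that a null result from poll means that the source is exhausted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class QueueFusionSketch {

    /** Hypothetical stand-in for a fuseable source: poll() returns null once nothing is left. */
    interface PollableSource<T> {
        T poll();
    }

    /** Exposes a fixed list of values as a pollable source, similar in spirit to a fused just(...). */
    static <T> PollableSource<T> fromList(List<T> values) {
        Iterator<T> it = values.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    /** Drains the source by polling it; no request(n) bookkeeping is involved. */
    static <T> List<T> drain(PollableSource<T> source) {
        List<T> out = new ArrayList<>();
        T value;
        while ((value = source.poll()) != null) {
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        PollableSource<Integer> source = fromList(List.of(1, 2, 3));
        // The worker thread plays the role of publishOn: it pulls directly from the source.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<List<Integer>> task = () -> drain(source);
            System.out.println(worker.submit(task).get()); // prints [1, 2, 3]
        } finally {
            worker.shutdown();
        }
    }
}
```

Because the consumer pulls values directly, there is no intermediate queue between the source and the worker thread and no request counter to update.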

We should note that nothing is preventing communication between publishOn and concatMap using an optimized protocol, but at the time of writing, such optimizations were not implemented, so we decided to expose the communication mechanisms as is.

In summary, as we can see from this section, the internals of the Reactor library are even more complicated than they seem at first glance. With powerful optimizations, Reactor has moved far ahead of RxJava 1.x and thereby provides better performance.
