Now that we know enough about the Spring Cloud Function ecosystem, we can get back on topic and see how to use this module. An additional module called Spring Cloud Starter Stream App Function enables the use of Spring Cloud Function features in Spring Cloud Data Flow. This module allows us to use pure jars and deploy them as part of Spring Cloud Data Flow without any redundant overhead from Spring Boot. Since we have a plain mapping here, a simplified validation function can be reduced to a conventional Function<Payment, Payment> function and can look as follows:
import java.util.function.Function;

public class PaymentValidator
        implements Function<Payment, Payment> {
    @Override
    public Payment apply(Payment payment) { ... }
}
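To make the elided body more concrete, the following is a minimal sketch of what such a validator could look like. The Payment fields, the withValid helper, and the "amount must be positive" rule are assumptions made for illustration; only the Function<Payment, Payment> shape and the isValid() accessor come from the text. The classes are package-private only to keep the sketch in a single file.

```java
import java.math.BigDecimal;
import java.util.function.Function;

// Hypothetical payload type: the fields and the validation rule are
// assumptions made for illustration, not the actual model of the sample.
final class Payment {
    private final String id;
    private final BigDecimal amount;
    private final boolean valid;

    Payment(String id, BigDecimal amount, boolean valid) {
        this.id = id;
        this.amount = amount;
        this.valid = valid;
    }

    String getId() { return id; }
    BigDecimal getAmount() { return amount; }
    boolean isValid() { return valid; }

    // Returns a copy with the validation flag set; the payload stays immutable.
    Payment withValid(boolean newValid) {
        return new Payment(id, amount, newValid);
    }
}

// A pure function: no Spring or broker types, only java.util.function.
class PaymentValidator implements Function<Payment, Payment> {
    @Override
    public Payment apply(Payment payment) {
        // Assumed rule: a payment is valid when its amount is positive.
        boolean ok = payment.getAmount() != null
                && payment.getAmount().signum() > 0;
        return payment.withValid(ok);
    }
}
```

Because the function depends only on the JDK, it can be unit tested without starting any messaging infrastructure.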
After packaging and publishing the artifact, we can write the following stream pipe script to connect our HTTP source to the Spring Cloud Function Bridge:
SendPaymentEndpoint=Endpoint: http --path-pattern=/payment --port=8080 | Validator: function --class-name=com.example.PaymentValidator --location=https://github.com/PacktPublishing/Hands-On-Reactive-Programming-in-Spring-5/tree/master/chapter-08/dataflow/payment/src/main/resources/payments-validation.jar?raw=true
source.function=maven://org.springframework.cloud.stream.app:function-app-rabbit:1.0.0.BUILD-SNAPSHOT
source.function.metadata=maven://org.springframework.cloud.stream.app:function-app-rabbit:jar:metadata:1.0.0.BUILD-SNAPSHOT
processor.function=maven://org.springframework.cloud.stream.app:function-app-rabbit:1.0.0.BUILD-SNAPSHOT
processor.function.metadata=maven://org.springframework.cloud.stream.app:function-app-rabbit:jar:metadata:1.0.0.BUILD-SNAPSHOT
sink.function=maven://org.springframework.cloud.stream.app:function-app-rabbit:1.0.0.BUILD-SNAPSHOT
sink.function.metadata=maven://org.springframework.cloud.stream.app:function-app-rabbit:jar:metadata:1.0.0.BUILD-SNAPSHOT
Finally, to complete the first part of the process, we have to deliver a validated payment to different destinations, choosing the endpoint according to the result of the validation. Since the validation function is a pure function that should not have access to the infrastructure (such as RabbitMQ routing headers), we should delegate that responsibility elsewhere. Fortunately, Spring Cloud Data Flow offers Router Sink, which allows routing incoming messages to different queues based on an expression such as the following:
... | router --expression="payload.isValid() ? 'Accepted' : 'Rejected'"
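The decision encoded in that expression can be read as a function from the payload to a destination name. The sketch below expresses the same ternary in plain Java; ValidatedPayment and DestinationResolver are hypothetical names used only for illustration, and the real router evaluates the expression itself rather than calling code like this.

```java
import java.util.function.Function;

// Minimal stand-in for the payload; only isValid() comes from the text.
final class ValidatedPayment {
    private final boolean valid;

    ValidatedPayment(boolean valid) { this.valid = valid; }

    boolean isValid() { return valid; }
}

// Plain-Java equivalent of the router expression: payload -> destination name.
final class DestinationResolver implements Function<ValidatedPayment, String> {
    @Override
    public String apply(ValidatedPayment payload) {
        return payload.isValid() ? "Accepted" : "Rejected";
    }
}
```

Keeping this choice in the router, outside the validation function, is what lets the validator stay free of any messaging concerns.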
Alternatively, we may configure a source that listens to a particular message queue. For example, the following pipe script listens to the RabbitMQ queue called Accepted:
...
Accepted=Accepted: rabbit --queues=Accepted
According to the payments flow diagram, the next step of payment processing persists the payment's status with the Accepted state. In that way, users can visit a particular page with their payments and check the processing state of each payment. Hence, we should provide integration with a database. For example, we may store the payment's state transitions in MongoDB. Spring Cloud Data Flow offers a MongoDB Sink, with which we can easily write incoming messages to MongoDB.

Relying on Spring Cloud Data Flow add-ons, we can broadcast messages to both the MongoDB sink and the next execution step. Such a technique is a valid solution only with a fully reliable message broker such as Apache Kafka. Kafka persists messages, so they remain available in the message broker even if the execution crashes at some stage. Consequently, MongoDB holds the state intended for use in the UI, while the actual processing state is held inside the message broker and is therefore available for replay at any point in time.

On the other hand, with fast, in-memory message brokers such as RabbitMQ, we have to rely on the state stored in MongoDB as the source of truth. Consequently, we must ensure that the payment's state has been stored before the next step is executed. Unfortunately, to achieve such functionality, we have to write a custom Spring Cloud Stream application that wraps MongoDB as a processing stage in the process.
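The "store first, then forward" ordering can be sketched as a plain function. In this sketch, PaymentStateStore is a hypothetical in-memory stand-in for a MongoDB collection, and MarkStateProcessor is an assumed name; a real processor would be a Spring Cloud Stream application writing to MongoDB, which is not shown here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for a MongoDB collection keyed by payment id.
final class PaymentStateStore {
    private final Map<String, String> states = new ConcurrentHashMap<>();

    void save(String paymentId, String state) { states.put(paymentId, state); }

    String stateOf(String paymentId) { return states.get(paymentId); }
}

// Processor stage: persist the state first, then pass the payment id on.
// If save() throws, nothing is returned, so the next step never runs.
final class MarkStateProcessor implements Function<String, String> {
    private final PaymentStateStore store;
    private final String state;

    MarkStateProcessor(PaymentStateStore store, String state) {
        this.store = store;
        this.state = state;
    }

    @Override
    public String apply(String paymentId) {
        store.save(paymentId, state); // the write completes before forwarding
        return paymentId;             // only now is the message released downstream
    }
}
```

Modeling the stage as a processor rather than a sink is what gives the ordering guarantee: the output message exists only after the write has returned.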
By repeating similar operations for the rest of the process, we can achieve the following execution flow:
The preceding flow visualization is represented by the following pipe script:
SendPaymentEndpoint=Endpoint: http --path-pattern=/payment --port=8080 | Validator: function --class-name=com.example.PaymentValidator --location=https://github.com/PacktPublishing/Hands-On-Reactive-Programming-in-Spring-5/tree/master/chapter-08/dataflow/payment/src/main/resources/payments.jar?raw=true | router --expression="payload.isValid() ? 'Accepted' : 'Rejected'"
Accepted=Accepted: rabbit --queues=Accepted | MarkAccepted: mongodb-processor --collection=payment | Limiter: function --class-name=com.example.PaymentLimiter --location=https://github.com/PacktPublishing/Hands-On-Reactive-Programming-in-Spring-5/tree/master/chapter-08/dataflow/payment/src/main/resources/payments.jar?raw=true | router --expression="payload.isLimitBreached() ? 'Rejected' : 'Checked'"
Checked=Checked: rabbit --queues=Checked | MarkChecked: mongodb-processor --collection=payment | Approver: function --class-name=com.example.PaymentApprover --location=https://github.com/PacktPublishing/Hands-On-Reactive-Programming-in-Spring-5/tree/master/chapter-08/dataflow/payment/src/main/resources/payments.jar?raw=true | router --expression="payload.isApproved() ? 'Approved' : 'Rejected'"
Approved=Approved: rabbit --queues=Approved | MarkApproved: mongodb-processor --collection=payment | Executor: function --class-name=com.example.PaymentExecutor --location=https://github.com/PacktPublishing/Hands-On-Reactive-Programming-in-Spring-5/tree/master/chapter-08/dataflow/payment/src/main/resources/payments.jar?raw=true | router --expression="payload.isExecuted() ? 'Executed' : 'Rejected'"
Executed=Executed: rabbit --queues=Executed | MarkExecuted: mongodb --collection=payment
Rejected=Rejected: rabbit --queues=Rejected | MarkRejected: mongodb --collection=payment
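To follow a single payment through the script, the routing decisions can be simulated in plain Java. This is only an illustrative model: PaymentFlags and PaymentFlow are hypothetical names, and bundling the four predicates into one object is an assumption. The queue names and the order of the checks are taken from the router expressions in the script.

```java
import java.util.ArrayList;
import java.util.List;

// Flags mirror the predicates used in the router expressions of the script;
// bundling them into one class is an assumption made for this sketch.
final class PaymentFlags {
    final boolean valid;
    final boolean limitBreached;
    final boolean approved;
    final boolean executed;

    PaymentFlags(boolean valid, boolean limitBreached,
                 boolean approved, boolean executed) {
        this.valid = valid;
        this.limitBreached = limitBreached;
        this.approved = approved;
        this.executed = executed;
    }
}

final class PaymentFlow {
    // Returns the queues a payment passes through, in order.
    static List<String> route(PaymentFlags p) {
        List<String> visited = new ArrayList<>();
        if (!p.valid) { visited.add("Rejected"); return visited; }
        visited.add("Accepted");
        if (p.limitBreached) { visited.add("Rejected"); return visited; }
        visited.add("Checked");
        if (!p.approved) { visited.add("Rejected"); return visited; }
        visited.add("Approved");
        if (!p.executed) { visited.add("Rejected"); return visited; }
        visited.add("Executed");
        return visited;
    }
}
```

For example, a valid payment that breaches its limit visits Accepted and then Rejected, which matches the sequence of states the MongoDB stages would record for it.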
Finally, by deploying that stream, we can execute a payment and see the execution logs in the console.
A notable point here is that the deployment process is as simple as business logic development. The Spring Cloud Data Flow toolkit is built on top of Spring Cloud Deployer, which is used for deployment onto modern platforms such as Cloud Foundry, Kubernetes, Apache Mesos, or Apache YARN. The toolkit exposes Java APIs that allow the configuration of an application's source (for instance, Maven repository, artifactId, groupId, and version) and its subsequent deployment to the target platform. Along with that, Spring Cloud Deployer is flexible and provides a wider list of configurations and properties, one of which is the number of replicas for a deployable instance.
Building on these capabilities, Spring Cloud Data Flow provides one-click (or single terminal command) deployment with the ability to pass the required configurations and properties.
To summarize, we started from the foundation of Spring Cloud Stream and finished the story with a powerful abstraction over a few modules. As a result, we have seen that, with the support of those projects, we can build a reactive system by applying different abstractions of reactive programming. The technique of using a message broker for asynchronous, reliable messaging covers most business needs. Moreover, that technique may reduce the cost of developing a reactive system in general and may be used for rapid development of systems such as big web stores, IoT, or chat applications. However, it is important to remember that, even though the described approach improves system reliability, scalability, and throughput, it may harm request processing latency due to the additional communication, especially with persistent message brokers. So, if our system tolerates a few milliseconds of additional delay between sending and receiving messages, then such an approach may apply to our case.