Message aggregation within a composite

A typical messaging requirement is to aggregate multiple related messages for processing within a single BPEL process instance. There are two parts to this recipe; the first is to route related messages through to the same instance of a BPEL process. This can be achieved using a correlation set defined against a common value present in each message.

The second is to determine when we have all the messages that belong to the aggregation. Typically, most use cases fall into two broad patterns:

  • Fixed duration: In this scenario, we don't know how many messages we expect to receive, so we will process all those received within a specified period of time.
  • Wait for all: In this scenario, we know how many messages we expect to receive. Once they have been received, we can process them as an aggregated message. It's usual to combine this with a timeout, so the process doesn't wait forever, if some messages aren't received.

An example of the first pattern is an order aggregation process, whereby we aggregate the orders we receive for a particular book over a period of time (for example, 24 hours) and then place a single order with the publisher for the total amount of orders received.

Getting ready

This recipe makes use of the new aggregation feature in Oracle SOA Suite 11gR1 Patch Set 5 (11.1.1.6.0), so you need to ensure that you have installed either this or a later release of the Oracle SOA Suite.

Create an SOA application with a project containing an empty composite (named WarehouseService in the following example).

How to do it...

  1. Drag a BPEL process from the SOA Component Palette onto our composite. This will launch the Create BPEL Process wizard. Specify an appropriate name (WarehouseService, in the following screenshot), and for the template, select Base on a WSDL.
  2. Click on the Find Existing WSDLs icon. This will launch the SOA Resource Browser. Browse the filesystem to select the WSDL that you wish to import and click on OK.

    Note

    Ideally, the WSDL should define a one-way asynchronous operation that we will use to implement our aggregation service. While synchronous operations are supported, they are not recommended and should be avoided.

    For the purpose of following this example, select the WarehouseService_1.0.wsdl node included with the samples for this chapter.

    Ensure Warehouse Service is selected as the Port Type, and click on OK. This will add BPEL Process Warehouse Service to our composite.

  3. In the SOA Composite Editor, select the BPEL process service component, as shown next:
    How to do it...
  4. In the Property Inspector pane in the lower-right corner of Oracle JDeveloper, click on the Add icon (circled in the following screenshot).
    How to do it...
  5. This will open the Create Property dialog. In the Name field, enter bpel.config.reenableAggregationOnComplete, and in the Value field, enter true; then, click on OK.

    Note

    If the Property Inspector pane is not displayed, select Property Inspector from the View menu bar in JDeveloper.

  6. Next, we need to create and initialize a While loop to process our messages. Create an xsd:boolean variable named orderComplete and use an Assign activity to set it to false().

    Create a variable named waitUntil, of type xsd:dateTime, and use an Assign activity to set it to the following value:

    xp20:add-dayTimeDuration-to-dateTime(xp20:current-dateTime(), 'PT1H')
  7. Next, drag a While activity onto the BPEL process (after the Assign activity) and set its loop condition to the following value:
    $orderComplete = false()
  8. Then, drag a Pick activity onto the While activity, double-click on the OnMessage icon to open the Edit OnMessage window.

    For Partner Link, select the same partner link used by the initial receive activity (warehouseservice_client, in this example), and select the same operation (submitBookOrder).

    Click on the auto-create variable (the plus icon) to launch the Create Variable window and give the variable a meaningful name (for example, nextBookOrder). Next, click on OK.

  9. Now we need to add a step to aggregate the book orders. Drag an Assign activity onto the OnMessage activity.

    Add an expression to increment the quantity of the original order received, by the quantity of the new order; in other words:

    origOrder/quantity = origOrder/quantity + nextOrder/quantity

    The XPath expression, should look something like the following:

    $inputVariable.payload.ns2:bookOrder/ns2:quantity + $nextBookOrder.payload.ns2:bookOrder/ns2:quantity
  10. Select the Pick activity and click on the Add onAlarm icon. Double-click on the onAlarm icon to open the Edit onAlarm window.

    Set the first radio button to Until, and set the second radio button to Expression; then, set the expression value to be $waitUntil.

  11. Next, drag an Assign activity onto the OnAlarm activity, and assign the value true() to $orderComplete.
  12. Within the Structure view for the BPEL process, right-click on the Properties folder and select Create Property…, as shown in the following screenshot:
    How to do it...

    This will launch the Create CorrelationSet Property window. Give the property a meaningful name (for example, isbn), and then click on the search icon to launch the Type Chooser window and select the appropriate schema type (for example, xsd:string).

  13. Click on the Create Property Alias… icon, circled in the following screenshot:
    How to do it...

    This will open the Property Alias window. In the Type Explorer window, expand the WarehouseService_1.0.wsdl node (under the Project WSDL Files folder). Next, expand the Message Types folder and select Part - payload (under submitBookOrder) as highlighted in the following screenshot:

    How to do it...
  14. In the Query field, enter the XPath location of the ISBN in the submitBookOrder message, which is:
    /tns: ns1:bookOrder/ns1:isbn

    Then click on OK.

  15. Within the Structure view for the BPEL process, expand the Correlation Sets folder, and then expand the Process folder. Then, right-click on the Correlation Sets folder and select Create Correlation Set…, as shown in the following screenshot:
    How to do it...

    This will launch the Create Correlation Set window; give the correlation set a meaningful name, for example, isbnCS.

    How to do it...
  16. Next, click on the plus icon to launch the Property Chooser window, and select the isbn property created in step 12.
  17. Now we need to initialize the correlation set. Within the BPEL Editor, double-click on the receiveInput activity to open the Edit Receive window, and select the Correlations tab.
  18. Click on Create Correlation… ( the plus icon circled in the following screenshot). This will add an empty correlation to the Receive activity.
  19. For the Correlation Set field, select isbnCS from the drop-down list. Next, select Yes from the Initiate field dropdown, as shown in the following screenshot:
    How to do it...
  20. Within the BPEL Editor, double-click on the onMessage activity to open the Edit onMessage window, and select the Correlations tab.
  21. Click on Create Correlation…, the plus icon. This will add an empty correlation to the onMessage activity. For the Correlation Set field, select isbnCS from the drop-down list. Next, select No from the Initiate dropdown.
  22. Deploy the WarehouseService composite to the Oracle SOA Suite and use Enterprise Manager to submit multiple submitBookOrder messages.

    For book orders that contain the same ISBN number, you should see that they are routed through to the same instance of the BPEL process.

How it works...

Since the 11.1.1.6 release of the Oracle SOA Suite, Oracle BPEL Process Manager has supported a message aggregation feature. When multiple messages are routed to the same process, the first message is routed to create a new instance and subsequent messages can be routed to continue the created instance using a mid-process receive activity.

By default, this feature is disabled. To enable it, we need to set the property bpel.config.reenableAggregationOnComplete to true.

Note

Prior to 11.1.1.6, the same result could be achieved by using a different name for the operation that created the process instance from the operation used to receive the second and subsequent messages.

Once we have enabled aggregation, we still have to define the key that the BPEL engine should use in order to aggregate messages. For this, we use a correlation set (isbnCS, in the previous example) that consists of one or more properties (isbn in our case). These properties are then mapped, using a property alias, to the corresponding field in the messages that are being aggregated.

The combined value of these properties at runtime should result in a unique value (at least, unique across all instances of the same process) that allows the BPEL engine to route the message to the appropriate instance of a process.

On receipt of the first message in the aggregation, we need to tell the BPEL process to initialize the correlation set, which we did by setting the initiate property to true (in step 19). This then tells the BPEL engine to route all other messages that contain the same value to this instance of the BPEL process.

There's more...

With this pattern, messages are processed in the order they are received in, by the BPEL process; this may be different from the sequence in which the messages are sent. If messages need to be aggregated in a particular order, the messages should be re-sequenced prior to processing.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.144.35.122