Chapter 14. Message Interaction Patterns

In every business process, messages are exchanged between participants. So far we have only looked at simple interactions, that is, a single request followed by a reply, whether synchronous or asynchronous.

Asynchronous messaging adds additional complexities around the routing and correlation of replies. In this chapter, we will look at how BPEL PM uses WS-Addressing to manage this and, in situations where WS-Addressing can't be used, examine how we can use correlation sets in BPEL to achieve the same result.

As part of this, we look at some common, but more complex, messaging patterns and requirements, such as:

  • How we can handle multiple exchanges of messages, either synchronous or asynchronous, between two participants.

  • How BPEL can be used to aggregate messages from multiple sources.

  • How we can implement process scheduling in BPEL, though this is not strictly a message interaction pattern.

Finally, as we explore these patterns, we take the opportunity to cover some of BPEL's more advanced features, including FlowN, Pick, and Dynamic Partner Links.

Message routing

A key requirement in any message exchange is to ensure that messages are routed to the appropriate service endpoint. Initial web service implementations were built using SOAP over HTTP, primarily because HTTP is well understood and is able to leverage the existing Internet infrastructure.

However, one of the limitations of HTTP is that it is stateless in nature, and thus provides no support for conversations requiring the exchange of multiple messages. With synchronous interactions this is not an issue, because the response message for a particular request can be returned in the HTTP response.

However, with asynchronous interactions this is a more serious limitation. To understand why, look at the diagram below. The diagram shows a simple asynchronous interaction between two processes, A and B. In this case the interaction is started by process A initiating process B, which does some work before sending a response back to process A.

Message routing

This all looks pretty straightforward, until you consider how it actually works. The first thing to note is that this consists of two operations, one for the initial invocation and the other for the response. Each operation (or message) is sent as a separate HTTP POST (with the HTTP response being empty).

When we invoke an operation on a BPEL process, the BPEL engine will either create a new process instance to handle it, as with the initial request, or route it through to the appropriate instance of an already running process.

This is where the complexity comes in. While this example shows Process A invoking Process B, Process B could potentially be invoked by multiple clients, for example another process or an external client, so how does Process B know which service endpoint it needs to invoke for the callback?

Secondly, assuming we have multiple instances of Process A and Process B running at the same time, once the message has been routed to the correct service endpoint, how does the BPEL engine know which instance of Process A to route the response from Process B to?

WS-Addressing

To solve these issues, the BPEL engine makes use of WS-Addressing, which provides a standardized way of including all the addressing specific information as SOAP headers within a SOAP message.

To demonstrate how WS-Addressing achieves this, let us look at the WS-Addressing headers BPEL PM inserts into our request and response messages in the previous example.

Request message with WS-Addressing

The initial request sent by BPEL PM with WS-Addressing headers inserted looks something like the following:

<env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope"
              xmlns:wsa="http://schemas.xmlsoap.org/ws/2003/03/addressing"
              xmlns:ptns="http://xmlns.oracle.com/ProcessA">
  <env:Header>
    <wsa:ReplyTo>
      <wsa:Address>http://hostname:80/orabpel/default/ProcessA/1.0/ProcessB/ProcessBRequester</wsa:Address>
      <wsa:PortType>ptns:ProcessBCallback</wsa:PortType>
      <wsa:ServiceName>ptns:ProcessBCallbackService</wsa:ServiceName>
    </wsa:ReplyTo>
    <wsa:MessageID>uuid://hostname/ProcessA/aaaaaaaa</wsa:MessageID>
  </env:Header>
  <env:Body>
    ...
  </env:Body>
</env:Envelope>

From this we can see that we have two additional elements present in the SOAP header. The first, wsa:ReplyTo, contains the wsa:Address element, which specifies the endpoint that Process B should send its asynchronous response to. The second is the wsa:MessageID property, which uniquely identifies the conversation and, as we will see in a moment, is used to correlate the response message back to the original requestor.

Response message with WS-Addressing

When Process B sends its response, the message is sent to the address specified in the wsa:ReplyTo endpoint reference. In addition, if we look at the message below, we can see that it contains the <wsa:RelatesTo> property, which holds the message ID specified in the original request.

It's this value that enables the endpoint to correlate the response back to the original request. In our case this enables the BPEL engine to route the response from Process B back to the instance of Process A which sent the original request.

<env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope"
              xmlns:wsa="http://schemas.xmlsoap.org/ws/2003/03/addressing">
  <env:Header>
    <wsa:RelatesTo>uuid://hostname/ProcessA/aaaaaaaa</wsa:RelatesTo>
  </env:Header>
  <env:Body>
    ...
  </env:Body>
</env:Envelope>

Additional message exchanges

In the above example, it's quite feasible for Process A and Process B to send multiple messages to each other; any further exchange of messages between the two process instances will just contain the same <wsa:RelatesTo> property within the SOAP header.

For example, suppose that after receiving a response from Process B, Process A sends a second request to Process B. Instead of sending a new wsa:ReplyTo element and wsa:MessageID, it will just include the <wsa:RelatesTo> element containing the original message ID.

This provides sufficient information for BPEL PM to route the request through to the correct instance of Process B; if Process B then needs to send a second response to Process A it will just use the address specified in the wsa:ReplyTo element contained in the first request message and embed the <wsa:RelatesTo> property as before.

Using BPEL correlation sets

For situations where WS-Addressing isn't appropriate or available, BPEL provides the concept of correlation sets. Essentially correlation sets allow you to use one or more fields present in the body of all correlated messages (for example, orderId) to act as a pseudo-conversation ID (equivalent to the <wsa:MessageID> and <wsa:RelatesTo> properties in WS-Addressing).

A correlation set consists of one or more properties; these properties are then mapped using property aliases to the corresponding field in each of the messages being exchanged. The combined value of these properties at run time should result in a unique value (at least unique across all instances of the same process) that allows the BPEL engine to route the message to the appropriate instance of a process.

Using correlation sets for multiple process interactions

A common requirement is for a client to make multiple invocations against the same instance of a process. The first invocation is pretty much the same as a standard synchronous or asynchronous request, but all subsequent requests are subtly different, as we now need to route each request through to the appropriate instance of an already running process rather than initiate a new instance.

Take the UserRegistration process; this is a long-running process which needs to handle multiple synchronous requests during its lifecycle. The first operation, submitUserRegistration, is called by the client to initiate the process, which validates all the provided user information and returns a confirmation of success or otherwise.

The only information that is not validated at this stage is the email address; for this the process sends an email to the provided address containing a unique token which the user can use to confirm their address.

Once the user has received the email, they can launch their browser and submit the token; the web client will then invoke the confirmEmailAddress operation. It's at this point that we need to use correlation sets to route the request to the appropriate instance of the UserRegistration process.

Defining a correlation set property

The first step is to choose a unique field that could act as a property. One such approach would be to use the userId specified by the user. However, for our purposes we want to use a value that the user will only have access to once they have received their confirmation email, so we will use the token contained in the email.

To create a property, within the Structure view for the BPEL process, right-click on the Properties folder and select Create Property... as shown in the following screenshot:

Defining a correlation set property

This will launch the Create CorrelationSet Property window. Give the property a meaningful name, such as EmailToken and then click the flashlight to launch the Type Chooser and select the appropriate schema type (for example, xsd:string) as shown in the following screenshot:

Defining a correlation set property
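Behind the scenes, the property itself is just a simple declaration. A sketch of what it looks like is shown below, assuming it is added to the process WSDL (as is typical for BPEL 1.1) and that the bpws prefix maps to the standard BPEL namespace:

<!-- Sketch of the generated correlation property declaration;
     bpws is assumed to map to the BPEL 1.1 namespace -->
<bpws:property name="EmailToken" type="xsd:string"/>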

Defining correlation set

Once we've defined our correlation set property (or properties), the next step is to define the Correlation Set itself.

Correlation Sets can be defined either at the process level or for a particular scope. In most cases the process level will suffice, but if you need to have multiple correlated conversations within the same process instance, for example, iterations through a while loop, then define the Correlation Set at scope level.

Within the BPEL Structure view expand the Correlation Sets folder, and then the Process folder, and right-click on the Correlation Sets folder and from the menu select Create Correlation Set... as shown in the following screenshot:

Defining correlation set

This will launch the Create Correlation Set window displayed in the following screenshot. Give the correlation set a meaningful name, such as EmailTokenCS in our case, and then select the + symbol to add one or more properties to the correlation set. This will bring up the Property Chooser where you can select any previously defined properties.

Defining correlation set
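In the BPEL source, the resulting definition is a simple correlationSets block, something like the following sketch (the tns prefix stands in for whatever namespace the EmailToken property is declared in):

<correlationSets>
  <!-- EmailTokenCS groups the single EmailToken property -->
  <correlationSet name="EmailTokenCS" properties="tns:EmailToken"/>
</correlationSets>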

Using correlation sets

Next we need to specify which messages we wish to route with our correlation set. For our purposes we want to use the Correlation Set to route the inbound message for the operation confirmEmailAddress to the appropriate process instance.

To configure this, double-click the receive activity for this operation to open the Receive activity window and select the Correlations tab as shown:

Using correlation sets

Next select the + symbol; this will launch the Correlation Set Chooser as shown in the following screenshot:

Using correlation sets

From here we can select the EmailTokenCS we defined previously. Click OK and this will return us to the Correlations tab, showing the newly added correlation.

Using correlation sets

We can see here that we have to specify one additional setting, Initiate, which is used to specify which message should be used to initiate the correlation set.
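For reference, the underlying BPEL for the receive activity then looks something like the following sketch (the activity, port type, and variable names are illustrative; the initiate setting is explained in the next section):

<receive name="receiveEmailConfirmation" partnerLink="client"
         portType="tns:UserRegistration" operation="confirmEmailAddress"
         variable="confirmEmailAddress_InputVariable" createInstance="no">
  <correlations>
    <!-- route this message using an already initialized correlation set -->
    <correlation set="EmailTokenCS" initiate="no"/>
  </correlations>
</receive>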

Initializing the correlation set

As you would expect, the values of the properties contained in the first message exchanged in any sequence of correlated messages must be used to initialize the corresponding properties of the correlation set.

However, rather than implicitly initialize the correlation set based on the first message exchange, BPEL expects you to explicitly define which message activity should be the first in the sequence by setting the Initiate property to Yes.

Note

If we try to initialize an already initialized correlation set, or try to use a correlation set that isn't initialized, then a run-time exception will be thrown by the BPEL engine. Likewise, once initialized, the value of these properties must be identical in all subsequent messages sent as part of the sequence of correlated messages, or again the BPEL engine will throw an exception.

Any outbound message can be used to initialize a correlation set. However, there are practical restrictions on which inbound messages can be used, since the process must first receive the inbound message before it can use it to initialize the correlation set.

Essentially, if an inbound message is used to create a new instance of a process, or is routed through to the process by another mechanism (for example, a different correlation set) then it can be used for the purpose of initiating our correlation set.

In our case, we are using the correlation set to route the inbound message for the confirmEmailAddress operation through to an already running process instance, so we need to initialize the correlation set in an earlier message. We can do this within the invoke activity for the sub-process validateEmailAddress.

We define a correlation set for an Invoke activity as we would for any message based activity, that is, we open its properties window and select the Correlations tab as shown in the following screenshot:

Initializing the correlation set

However, you may notice that when creating a correlation for an Invoke activity, we are required to set the additional attribute Pattern. This is because, unlike any other message activity, Invoke can consist of two messages: the initial outbound request and an optional corresponding inbound response. The pattern attribute is used to specify to which message the Correlation Set should be applied; that is, out for the outbound request, in for the inbound response, and out-in for both.

Since validateEmailAddress is a one-way operation, we need to set the Pattern attribute to out.

Note that if you choose to initiate the correlation with an out-in pattern, then the outbound request is used to initiate the Correlation Set.
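Putting this together, the invoke activity that initializes the correlation set looks something like the following sketch (the partner link, port type, and variable names are illustrative):

<invoke name="invokeValidateEmailAddress" partnerLink="EmailValidation"
        portType="tns:EmailValidation" operation="validateEmailAddress"
        inputVariable="validateEmailAddress_InputVariable">
  <correlations>
    <!-- the outbound request initializes EmailTokenCS -->
    <correlation set="EmailTokenCS" initiate="yes" pattern="out"/>
  </correlations>
</invoke>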

Defining property aliases

Once the messages to be exchanged as part of our Correlation Set have been defined, the final step is to map the properties used by the Correlation Set, to the corresponding fields in each of the messages exchanged.

To do this, we need to create a property alias for every message type exchanged, that is, for validateEmailAddress and confirmEmailAddress in our User Registration example.

To create an alias, within the Structure view for the BPEL process, right-click on the Property Aliases folder and select Create Property Alias...; this will launch the Create Property Alias window, as shown:

Defining property aliases

In the Property drop-down, select the property that you wish to define the alias for; then, using the Type Explorer, navigate through the Message Types and Partner Links down to the relevant message type and part that you want to map the property to.

This will activate the Query field, where we specify the XPath for the field containing the property in the specified message type. Rather than type it all by hand, press Ctrl + Space to use the XPath Building Assistant.

Once we have defined an alias for each of the messages exchanged within our correlation set, we can view them within the Structure view of the BPEL process as shown in the following screenshot:

Defining property aliases
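The generated aliases are simple propertyAlias declarations. A sketch for the confirmEmailAddress message is shown below; the message type, part, and query are illustrative and will depend on your actual WSDL and schema:

<bpws:propertyAlias propertyName="tns:EmailToken"
                    messageType="tns:confirmEmailAddress_RequestMessage"
                    part="payload"
                    query="/tns:confirmEmailAddressRequest/tns:emailToken"/>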

This completes the definition of our Correlation Set.

Note

A BPEL process can define multiple Correlation Sets, and a message exchanged within a BPEL process can participate in zero, one, or more of them. When a message is involved in multiple Correlation Sets, the same or different fields can be mapped to the corresponding properties. You will, of course, require a separate property alias for each correlation set.

Message aggregation

A typical messaging requirement is to aggregate multiple related messages for processing within a single BPEL process instance. Messages are aggregated using a common correlation Id, in much the same way as we covered above.

The other challenge is to determine when we have all the messages that belong to the aggregation. Typically, most use cases fall into two broad patterns:

  • Fixed Duration: In this scenario we don't know how many messages we expect to receive, so we will process all those received within a specified period of time.

  • Wait For All: In this scenario we know how many messages we expect to receive; once they have been received we can then process them as an aggregated message. It's usual to combine this with a timeout in case some messages aren't received so that the process doesn't wait forever.

An example of the first pattern is the oBay auction process. Here, during the period for which the auction is in progress, we need to route zero or more bids from various sources to the appropriate instance of the auction. Then, once the auction has finished, the process selects the highest bid as the winner. The outline of the process is shown below.

From this, we can see that the process supports two asynchronous operations, each with a corresponding callback, namely:

  • initiateAuction: This operation is used to instantiate the auction process. Once started, the auction will run for a preset period; on completion, the process invokes the callback returnAuctionResult to return the result of the auction to the client which initiated the auction.

  • submitBid: This operation is used to submit a bid to the auction. The operation is responsible for checking each bid to see if we have a new highest bid and, if so, updating the current bid price appropriately before returning the result of the bid to the client. The process then loops back round to process the next bid.

    Message aggregation

Message routing

The first task for the aggregator is to route bids through to the appropriate instance of the auction process. As with our earlier UserRegistration example, we can use a correlation set to route messages to the appropriate instance. In this example, we will create a correlation set based on the element auctionId, which is included in the message payload for initiateAuction and submitBid.

At first glance this looks pretty straightforward, as we can use correlation sets for aggregation in much the same way as we have already covered. However, this scenario presents us with an additional complexity, which is that a single instance of a BPEL process may receive multiple messages of the same type at approximately the same time.

To manage this we need to implement a queuing mechanism, so that we can process each bid in turn before moving onto the next. This is achieved by implementing the interaction between the client submitting the bid and the auction process as asynchronous.

Note

With asynchronous operations, BPEL saves received messages to the BPEL delivery queue. The delivery service then handles the processing of these messages, either instantiating a new process or correlating the message with a waiting receive or onMessage activity in an already running process instance.

If a process is not ready to receive a message, then the message will remain in the queue until the process is ready.

This introduces a number of complexities over our previous correlation example. This is because a BPEL process can only support one inbound Partner Link (for example, client), for which the BPEL engine generates a corresponding concrete WSDL which defines all operations that can be invoked against that BPEL process (as well as any corresponding callbacks).

When BPEL generates the WSDL it includes the appropriate WS-Addressing header definitions for each of the operations. However, only operations which are used to instantiate a process, that is, initiateAuction in the case of the auction process, include WS-Addressing headers to hold the reply to address and message ID.

Correlating the callback

The first complexity this causes is that whenever a client submits a request to the auction process via the submitBid operation, it doesn't include a message ID, so we can't use WS-Addressing to correlate the response of the auction process back to the client.

At first, the obvious answer might appear to be just to use the auctionId to correlate the result of the bid back to the client. However, while the auctionId allows us to uniquely identify a single instance of an auction, it doesn't allow us to uniquely identify a bidder. This may at first seem strange, but recall that we may have several clients calling the auction process at the same time, all waiting for a response; we need to ensure that each response is returned to the appropriate instance.

Thus the calling client will need to pass a unique key in the submitBid request message (for example, bidId) that the auction process can include in the response. Assuming we are using BPEL to implement the client, we then need to implement a correlation set based on this property in the calling process, so that the BPEL engine can route the response to the appropriate instance of the client process.

Specifying the reply to address

The second complexity is that whenever a client submits a request to the auction process via the submitBid operation, it doesn't include a replyToAddress within the SOAP header.

As a result, the BPEL engine doesn't know which address to send the reply to; or rather, it will attempt to send the reply to the process which initiated the auction, since it was that request which contained the wsa:ReplyTo header specifying the callback endpoint for the client partner link.

This highlights the other issue, namely that our auction process supports two callbacks: one to return the auction result and the other to return the bid result. Yet the replyToAddress on the partner link is fixed by the initial invocation of the process, forcing both callbacks to be routed to the same endpoint, which is not what we want.

Creating a proxy process

At this point, you may be thinking that this is all getting rather complex. However, the solution is straightforward: use a proxy process which supports the same operations as the auction process.

With this approach the client invokes either the initiateAuction or submitBid operation on the proxy, which then forwards the request to the auction process. The auction process then returns the result to the proxy, which in turn returns it to the original client.

This not only solves the problem of having a fixed reply to address, but has the additional benefit of shielding the client from having to use Correlation Sets, as it can use WS-Addressing to communicate with the proxy.

Using the pick activity

Our proxy process needs to support both operations, initiateAuction and submitBid; that is, either operation can be used to initiate an instance of the proxy process. To achieve this, we will use the <pick> activity at the start of our process, in place of a <receive> activity.

A pick activity is similar to a receive activity; the difference is that with a pick activity you can specify that the process waits for one of a set of events, where an event can be either the receipt of a message or an alarm event (which we look at later in this chapter).

Each message is specified in a separate <onMessage> branch, each branch containing one or more activities to be executed on receipt of the corresponding message. To use a pick activity, drag a <pick> activity from the Process Activities list of the Component Palette onto your process.

As the pick activity is used to receive the initial message which starts the process, we need to set the createInstance attribute on the activity. To do this, double-click the pick activity to open the Pick activity window as shown below, and select the Create Instance checkbox.

Using the pick activity

Next, within the process diagram, click on the + symbol to expand the <pick> activity. By default it will have two branches illustrated as follows:

Using the pick activity

The first branch contains an <onMessage> component, with a corresponding area where you can drop a sequence of one or more activities that will be executed if the corresponding message is received.

The second branch contains an <onAlarm> sub-activity, with a corresponding area for activities. It doesn't make sense to have this as part of the initial activity in a process, so right-click on the onAlarm sub-activity and select Delete to remove it.

We require two OnMessage branches, one for each operation that the process supports. For this, click on the Add OnMessage Branch icon (highlighted in the previous diagram) to add another <onMessage> branch.

The next step is to configure the onMessage branch. Double-click on the first branch to open the OnMessage Branch activity window as shown in the following screenshot:

Using the pick activity

As we can see, an OnMessage branch is configured in a similar fashion to a Receive activity. For the purpose of our proxy, we will configure the first onMessage branch to support the initiateAuction operation (as shown in the previous screenshot) and the second onMessage branch to support the submitBid operation.

Each branch will just contain an invoke and receive activity to call the corresponding operation provided by the auction process, and a final invoke activity to return the result of the operation to the caller of the process.
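The skeleton of the resulting proxy process therefore looks something like the following sketch (the port type and variable names are illustrative):

<pick name="receiveRequest" createInstance="yes">
  <onMessage partnerLink="client" portType="tns:AuctionProxy"
             operation="initiateAuction"
             variable="initiateAuction_InputVariable">
    <sequence>
      <!-- invoke initiateAuction on the auction process, receive the
           returnAuctionResult callback, and return it to the caller -->
    </sequence>
  </onMessage>
  <onMessage partnerLink="client" portType="tns:AuctionProxy"
             operation="submitBid" variable="submitBid_InputVariable">
    <sequence>
      <!-- forward the bid to the auction process, receive the bid
           result, and return it to the caller -->
    </sequence>
  </onMessage>
</pick>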

Defining the correlation sets

For our proxy process we need to define a Correlation Set for the submitBid operation, to ensure that replies from the Auction process are routed through to the correct instance of the AuctionProxy process.

As mentioned earlier, this requires us to include a unique bidId within the submitBid message. To generate this we can use the XPath function generateGUID, which is available under the category BPEL XPath Extension Function within the expression builder.
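For example, the proxy could populate the bidId before forwarding the bid, with an assign along the following lines (the element, part, and variable names are illustrative; the ora prefix is assumed to map to the Oracle XPath extension namespace):

<assign name="assignBidId">
  <copy>
    <!-- generate a unique id for this bid -->
    <from expression="ora:generateGUID()"/>
    <to variable="submitBid_InputVariable" part="payload"
        query="/tns:submitBid/tns:bidId"/>
  </copy>
</assign>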

Note that we don't need to define a correlation set for the initiateAuction operation, as the corresponding operation on the auction process still uses WS-Addressing.

Completing the aggregation

All that remains is to add in the logic that enables the process to determine when the aggregation is complete. For a scenario where we know how many messages we expect, every time we receive a message we just need to check whether there are any outstanding messages and proceed accordingly.

However, for scenarios where we are waiting for a fixed duration, as is the case with our auction process, it's slightly trickier. The challenge is that for the period over which the auction is running, the process will spend most of its time in a paused state, waiting for the receive activity to return details of the next bid.

So the only opportunity we have within the logic of our process to check whether the duration has expired is after the receipt of a bid, which may arrive long after the auction has completed or not at all (since the auction has theoretically finished).

Ideally what we want to do is place a timeout on the Receive activity, so that it either receives the next bid or times out on completion of the auction, whichever occurs first.

Fortunately, this can be easily accomplished by replacing the Receive activity for the submitBid operation with a Pick activity. The Pick would contain two branches: an onMessage branch configured in an identical fashion to the Receive activity and an onAlarm branch configured to trigger once the finish time for the auction has been reached.

To configure the onAlarm branch, double-click on it to open the OnAlarm Branch activity window as shown in the following screenshot:

Completing the aggregation

We can see that an OnAlarm branch is configured in a similar fashion to a Wait activity, in that we can specify that the Pick waits For a specified duration of time or Until a specified deadline. In either case, you can specify a fixed value or an XPath expression to calculate the value at run time.

For our purposes we have pre-calculated the finish time for the auction based on its start time and duration, and have configured the Pick activity to wait until this time.

When triggered, the process will execute the activities contained in the OnAlarm branch and will then move onto the activity following the Pick. In the case of our auction process, the branch contains just a single activity which sets the flag auctionComplete to true, causing the process to exit the while loop containing the Pick activity. Upon exiting the loop, the process calculates and returns the auction result before completing.
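A sketch of the resulting pick activity within the while loop is shown below (variable and port type names are illustrative, and the correlation on the onMessage branch is omitted for brevity):

<pick name="waitForNextBid">
  <onMessage partnerLink="client" portType="tns:Auction"
             operation="submitBid" variable="submitBid_InputVariable">
    <sequence>
      <!-- check the bid and return the bid result as before -->
    </sequence>
  </onMessage>
  <!-- fires once the pre-calculated auction finish time is reached -->
  <onAlarm until="bpws:getVariableData('auctionFinishTime')">
    <assign name="setAuctionComplete">
      <copy>
        <from expression="true()"/>
        <to variable="auctionComplete"/>
      </copy>
    </assign>
  </onAlarm>
</pick>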

Scheduling services

A common requirement is to schedule a process or service to run at regular intervals. For example, the oBay accountBilling process is required to be run once every night. One approach would be to use a scheduling tool; there are a number of tools available for this, including:

  • Quartz: This is an open-source, Java-based scheduler. The advantage of Quartz is that it is already used internally by the BPEL engine for scheduling, so it is available as part of the SOA Suite platform. However, this approach requires knowledge of the Quartz API as well as Java.

  • Oracle Database Job Scheduler: This is provided as part of the Oracle Database. Like Quartz, it is available regardless of which platform you are running the SOA Suite on (assuming you are using Oracle as the backend database). However, it requires knowledge of PL/SQL.

While these are all perfectly valid approaches, they all require knowledge of components outside the SOA Suite; an alternative approach is to use BPEL to implement the scheduler.

One approach is to implement a BPEL process which continuously loops, with the sole purpose of launching other scheduled BPEL processes. However, as the process never dies, this will result in an ever-increasing audit trail, causing both the objects persisted in the database and the in-memory size of the process to grow over time, which will eventually have a negative impact on the performance of the engine.

A better approach is to have an XML file that specifies a series of one or more services (or jobs) to be scheduled. We can then use the file adapter to read this file and trigger a scheduling process which invokes each of the scheduled jobs. Once all jobs have been triggered, the scheduling process can be allowed to complete.

The trick with this approach is to re-cycle the scheduling file: in the process of reading the file, the file adapter will move it to an archive directory, so to ensure the scheduling process is re-run every day, we need to move the file back into the directory being polled by the adapter. We can do this using the scheduling process itself.

Defining the schedule file

For our oBay example, we are simply going to create a scheduling process that is run once at the start of the day; the schedule file will then contain details of each job to be run, and at what time during the day. The schema for our scheduling file is as follows:

<?xml version="1.0" encoding="utf-8"?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            xmlns="http://xmlns.packtpub.com/obay/xsd/sch"
            targetNamespace="http://xmlns.packtpub.com/obay/xsd/sch"
            elementFormDefault="qualified">

  <xsd:element name="schedule" type="tSchedule"/>
  <xsd:element name="job" type="tJob"/>

  <xsd:complexType name="tSchedule">
    <xsd:sequence>
      <xsd:element name="startTime" type="xsd:time"/>
      <xsd:element ref="job" minOccurs="1" maxOccurs="unbounded"/>
    </xsd:sequence>
  </xsd:complexType>

  <xsd:complexType name="tJob">
    <xsd:sequence>
      <xsd:element name="endpoint" type="xsd:string"/>
      <xsd:element name="startTime" type="xsd:time"/>
      <xsd:element name="jobDetail" type="xsd:anyType"/>
    </xsd:sequence>
  </xsd:complexType>

</xsd:schema>

The bulk of the schedule file is made up of job elements, with each schedule file containing one or more jobs. Each job element contains three child elements:

  • endpoint: Defines the endpoint of the service to invoke.

  • startTime: Defines the time that the service should be invoked.

  • jobDetail: Defined as xsd:anyType; this is used to hold details specific to the service being invoked.

For the purpose of our accountBilling process our schedule file looks as follows:

<?xml version="1.0" encoding="UTF-8" ?>
<schedule xmlns="http://xmlns.packtpub.com/obay/xsd/sch">
  <startTime>00:02:55.125</startTime>
  <job>
    <endpoint>http://localhost:80/orabpel/obay/accountBilling/1.0/</endpoint>
    <startTime>02:00:00</startTime>
    <jobDetail/>
  </job>
</schedule>

Using FlowN

To ensure that our scheduling process supports the concurrent execution of jobs, we need to process them in parallel. If the number of branches (jobs) were fixed at design time, we could use the <flow> activity to achieve this.

For our scenario, the number of branches will be determined by the number of jobs defined in our scheduling file. For cases such as these we can use the <flowN> activity; this will create N branches, where N is calculated at run time.

Each branch performs the same activities and has access to the same global data, but is assigned an index number from 1 to N to allow it to look up the data specific to that branch.

To use a FlowN activity, drag a <flowN> activity from the Process Activities list of the Component Palette onto your process. Double-click on it to open the FlowN activity window as shown in the following figure:

Using FlowN

In addition to the activity Name, it takes two parameters. The first parameter is N, which contains an XPath expression used at run time to calculate the number of parallel branches required. This typically uses the count function to count the number of nodes in a variable. In our case, we need to calculate the number of job elements, so our expression is defined as follows:

count(bpws:getVariableData('InputVariable', 'schedule', '/sch:schedule/sch:job'))

The second parameter, Index Variable, is used to specify the variable into which the index value will be placed at run time. While we have defined this as a global variable, each branch will be given its own local copy of the variable containing its assigned index number.
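In the BPEL source, the configured activity looks something like the following sketch (the bpelx prefix is assumed to map to the Oracle BPEL extension namespace, and the activity name is illustrative):

<bpelx:flowN name="ForEachJob" indexVariable="index"
    N="count(bpws:getVariableData('InputVariable', 'schedule', '/sch:schedule/sch:job'))">
  <scope name="ProcessJob">
    <!-- branch-specific variables and activities go here -->
  </scope>
</bpelx:flowN>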

Accessing branch specific data in FlowN

The first step within the flowN branch is to get a local copy of the data that is to be processed by that specific branch, that is, the Job in our case.

Before we do this, we need to ensure that we are working with local variables; otherwise, each branch in the flowN will update the same process variables. The simplest way to achieve this is to drop a scope (which we've named ProcessJob) as the activity within the flowN branch, then define any branch-specific variables at the scope level and perform all branch-specific activities within the scope.

In this case, we have created a single variable JobInputVariable of type Job, which we need to populate with the job element to be processed by the flowN branch. To do this, we need to create an XPath expression that contains a predicate to select the required job based on its position within the node set, in effect doing the equivalent of an array lookup in a language such as Java.

The simplest way to achieve this is to create a standard copy operation, as shown in the following screenshot:

Accessing branch specific data in FlowN

Next we need to modify the From XPath expression (circled in the previous screenshot) so that we only select the required job based on the value of the index. To do this, modify the XPath to add a predicate based on the index, to obtain an expression that looks something like the following:

/sch:schedule/sch:job[bpws:getVariableData('index')]
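The complete copy rule in the BPEL source then looks something like the following sketch (the to-spec query assumes that JobInputVariable is an element variable based on the job element):

<assign name="getJob">
  <copy>
    <!-- select the job element matching this branch's index -->
    <from variable="InputVariable" part="schedule"
          query="/sch:schedule/sch:job[bpws:getVariableData('index')]"/>
    <to variable="JobInputVariable" query="/sch:job"/>
  </copy>
</assign>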

The next step within our branch is to use a Wait activity to pause the branch until the startTime for the specified job.

Dynamic Partner Links

The final step within our branch is to call the service defined by the endpoint in the job element. Up to now we've only dealt with static partner links in BPEL, where the endpoint of a service is defined at design time.

However, BPEL also provides support for dynamic Partner Links, where we can override the endpoint specified at design time, with a value specified at run time.

Define common interface

While we can override the endpoint for a partner link, all other attributes of our service definition remain fixed. So to use this approach we must define a common interface that all of our Job services will implement. For our purposes, we've defined the following abstract WSDL:

<?xml version="1.0" encoding="UTF-8"?>
<definitions name="Job">
  <types>
    <schema>
      <import namespace="http://xmlns.packtpub.com/obay/xsd/sch"
              schemaLocation="../schedule.xsd"/>
      <element name="executeJob" type="client:tExecuteJob"/>
      <complexType name="tExecuteJob">
        <sequence>
          <element ref="sch:job"/>
        </sequence>
      </complexType>
    </schema>
  </types>
  <message name="executeJob">
    <part name="payload" element="tns:executeJob"/>
  </message>
  <portType name="Job">
    <operation name="executeJob">
      <input message="tns:executeJob"/>
    </operation>
  </portType>
  <plnk:partnerLinkType name="Job_PL">
    <plnk:role name="Job_Role">
      <plnk:portType name="tns:Job"/>
    </plnk:role>
  </plnk:partnerLinkType>
</definitions>

Examining this, we can see that we've defined a simple one-way operation (executeJob) that our scheduling process will invoke to initiate our job. For simplicity, we have defined the content of the input message to be that of the job element that we used in our scheduling file.

Define Job Partner Link

Before we can define a Job Partner Link within our Schedule process, we need a WSDL file complete with bindings. The simplest way to do this is to deploy a default process that implements our abstract WSDL. To do this, create a BPEL process (such as JobService) based on our predefined WSDL contract (as described in Chapter 10). The process just needs to contain a simple Receive activity as it should never be called.

Note that for any other service that we wish to invoke as a job, we will need to create a process based on our abstract WSDL and then, once created, implement the process as required to carry out the job.

Once we've deployed our default JobService process, we can create a Partner Link and invoke it within our scheduler process just as we would any other service.

Create endpoint reference

To dynamically invoke the appropriate endpoint at run time we need to update the endpoint reference before invoking the service. To do this, we need to create a variable of type EndpointReference (as defined by WS-Addressing) containing just the <Address> element, and populate this with the endpoint of the Job service that we want to invoke.

This is important because, if we create an EndpointReference containing any of the other optional elements, the BPEL engine will throw a fault when we try to invoke the partner link.

Note

To create a variable of type EndpointReference, you will need to import the WS-Addressing schema (located at http://<host>:<port>/orabpel/xmllib/ws-addressing.xsd).
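When populated, the variable should contain nothing more than the Address element. For example, for the accountBilling job it would look something like this (assuming the same 2003/03 WS-Addressing namespace used earlier in the chapter):

<EndpointReference xmlns="http://schemas.xmlsoap.org/ws/2003/03/addressing">
  <Address>http://localhost:80/orabpel/obay/accountBilling/1.0/</Address>
</EndpointReference>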

To populate the address element, use a transformation activity rather than an assign activity, as shown in the following screenshot:

Create endpoint reference

If we used an assign to directly populate the <Address> element, then BPEL would by default create a fully initialized <EndpointReference> element containing all the other optional elements (each with an empty value).

Update Endpoint

Finally, we use another copy rule to dynamically set the partner link. The key difference here is that the target of the copy rule is the JobService PartnerLink, as shown in the following screenshot:

Update Endpoint
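In the BPEL source, this is simply a variable-to-partner-link copy, something like the following sketch (JobEndpointRef is the name we have assumed for the EndpointReference variable):

<assign name="setJobEndpoint">
  <copy>
    <!-- override the endpoint of the JobService partner link at run time -->
    <from variable="JobEndpointRef"/>
    <to partnerLink="JobService"/>
  </copy>
</assign>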

Now when we invoke the JobService via the Partner Link it will dynamically route the request to the updated endpoint.

Re-cycling the scheduling file

As we've already covered, the scheduling process is triggered by the file adapter reading in the schedule.xml file. As part of this activity, the file adapter will move it to an archive directory, to ensure that the file is processed just once.

However, in our case we actually want the file adapter to process the scheduling file on a daily basis. To do this we need to move the file back into the directory being polled by the adapter.

For the purpose of this we have defined the following two directories:

<SOA_HOME>/scheduler/config
<SOA_HOME>/scheduler/execute

When creating our scheduling process, we have configured the file adapter to poll the execute directory on a regular basis (for example, every five minutes), and archive processed files to the config directory.

When the schedule.xml file is placed into the execute directory for the first time, this will trigger the file adapter to pick up the file and launch the scheduler process, at the same time moving the schedule file into the config directory.

Within the scheduler process, we then invoke the file adapter to move the schedule.xml file from the config directory back to the execute directory (see Chapter 3, Service Enabling Existing Systems, for details on how to do this). However, rather than invoke the moveFile operation immediately, we have placed a wait activity in front of it that waits until the startTime defined at the head of the schedule file, as shown:

<?xml version="1.0" encoding="UTF-8" ?>
<schedule xmlns="http://xmlns.packtpub.com/obay/xsd/sch">
  <startTime>00:02:55.125</startTime>
  <job>
    ...
  </job>
</schedule>

This has a couple of advantages. The first is that we use the schedule.xml file to control when the scheduling process is run, as opposed to configuring the file adapter to poll the execute directory once every 24 hours and then having to deploy the process at just the right time to start the clock counting.

The other advantage is that most of the time the schedule.xml file resides in the config directory. Thus, while the file is in this directory, we can modify the schedule to add new jobs or update and delete existing ones, and the changes will be picked up the next time the scheduler is executed.

Summary

In this chapter, we have looked at the more advanced messaging constructs supported by Oracle BPEL PM. We have also seen how we can use these to support some of the more complex, but relatively common, message interaction patterns used in a typical SOA deployment.

We have also used this as an opportunity to introduce some of the more advanced BPEL activities and features, such as the Pick and FlowN activities, as well as dynamic partner links.

We have not covered every possible pattern, but hopefully you now have a good understanding of how BPEL PM utilizes WS-Addressing and Correlation Sets to support message interactions that go beyond a single synchronous or asynchronous request and reply, and you should be able to apply this understanding to your particular requirements.
