From Message Broker to Service Broker

Again, we will look at the CTU business case started in the previous chapter. While the Brazilian CIO was busy re-engineering and implementing order fulfillment in a Pan-American way, the Chilean regional office expressed an urgent need for a lightweight service broker for mobile OSS/BSS operations.

A new project named Extended Data Interchange (XDI) was started independently. Initial requirements were to cover message brokering between ERP (Oracle EBS R11) and mobile/network equipment providers, routing purchase orders to different suppliers. Direct communication was impossible for the following two reasons:

  • Oracle EBS R11 was able to post messages as SOAP 1.1, whereas most of the suppliers required SOAP 1.2.
  • Supplier endpoint maintenance was considered impractical in the core ERP, and the need for a middleware layer was expressed. The solution had to be compact enough to be moved out of the production zone into the communication zone, or even into the DMZ. Basic security features were implemented in the new broker.

From the very beginning, this solution was considered temporary, as more mature enterprise products were expected. However, nothing is more permanent than a temporary solution. Migration from R11 to R12 with SOAP 1.2 support was considerably delayed, and full-fledged ESBs were prohibited by the headquarters until completion of the pilot SOA project in Brazil. We are sure that you must be familiar with such situations.

What do we have to work with as architects here?

  • We have local developers familiar with JEE.
  • We managed to negotiate HTTP as the Canonical Protocol for all parties. That greatly simplifies our life, as we will deal with only one synchronous protocol, with simple operations and no SOAP conversions. If SOAP 1.2 is required, we will wrap our message in the required envelope (see the sketch after this list).
  • We realize that this broker will be temporary anyway, but all EBO/EBM structures, transformations, routing slips are related to the core business, and will stay. Therefore, the solution must be highly modular, ensuring easy migration or coexistence with solutions to come.
  • Obviously, the Message Broker must be very reliable and a good performer.
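
Regarding the SOAP 1.2 wrapping mentioned in the second point, a minimal sketch using the standard SAAJ API could look like the following. This is only an illustration of the idea, not the broker's actual code:

import javax.xml.soap.MessageFactory;
import javax.xml.soap.SOAPConstants;
import javax.xml.soap.SOAPException;
import javax.xml.soap.SOAPMessage;
import org.w3c.dom.Document;

public class Soap12Wrapper {
    // Wraps an arbitrary XML payload into a SOAP 1.2 envelope
    public static SOAPMessage wrap(Document payload) throws SOAPException {
        MessageFactory factory = MessageFactory.newInstance(SOAPConstants.SOAP_1_2_PROTOCOL);
        SOAPMessage message = factory.createMessage();
        message.getSOAPBody().addDocument(payload);
        message.saveChanges();
        return message;
    }
}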

Naturally, new requirements emerged quite soon. Now, we have to transport more business messages (not only the purchase order, as planned initially). Also, we have more applications to integrate in addition to a single instance of OEBS. Still, all of this doesn't sound that bad. We can easily deduce that we have the classic RTD (Receive-Transform-Deliver) interchange pattern with transformation, possibly with translation and content-based routing. Synchronous APIs do not require complex processing; all brokering must be performed from a central location. The broker will be some kind of single point of contact, acting in the same manner as a Front Controller. The Front Controller is one of the common JEE patterns, although it is usually related to UI and presentation-tier integration (http://www.oracle.com/technetwork/java/frontcontroller-135648.html). The physical realization of a Front Controller is our main interest, as it's a classic servlet, and after consultations with developers we agreed that this will be the heart of our Message Broker.

Talking about the Message Broker's body parts, the next vital organ will be the Business Delegate, which can be accessed through the servlet's helper. As we discussed in the previous chapter, it has the responsibility to transform and dispatch messages to the actual workers (service providers). This will be the solution's brain. The last D (Deliver) from RTD will be closely associated with the Business Delegate, which implements the Adapter Factory pattern. Despite the initial intention to serve only HTTP communications, requirements for file/FTP operations were added to the design.

A simplified Message Broker implementation

Now, we will walk through the RTD parts focusing on pattern implementation in a simple hub-and-spoke solution.

Receive

The receiving part must contain functions for message parsing, message identification, and the extraction of an execution plan based on message ID. These are typical parts of any servlet, except probably the execution plan, which we add for dynamic task invocation. A servlet's lifecycle starts with initialization, which occurs only once when the servlet is started, as presented in the following code:

String dispatchSufix = "";  
String mappingURL = null;
...    
public void init(ServletConfig config) throws ServletException {
        super.init(config);
        ...
        tmp = config.getInitParameter(MAPPING_URL);
        if ( tmp != null ) mappingURL = tmp;
        
        tmp = config.getInitParameter(OK_PAGE);
        if ( tmp != null ) okPage = tmp;
        ...
        tmp = config.getInitParameter(XMLRULESPROVIDER_URL);
        if ( tmp != null ) XDIRuleListWSURL = tmp;
        ...
}

This gives us the opportunity to put all heavy, but one-time, operations into the initialization routine, for instance, obtaining and caching the data sources. For simplicity, we will not use the database directly in this example, and in general, performance is not our primary concern. Initially, we just need some pages to display the positive and negative responses, resource mapping, the default logging level, the type of rule engine we will use for process type resolution (the name of the routing slip), and the default transformation. All these parameters are registered in the web.xml deployment descriptor according to the servlet specification and reassigned during the initialization step, as shown in the preceding code.
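
To illustrate the point about heavy, one-time operations, here is a hypothetical sketch of caching a JNDI data source during init(); the JNDI name jdbc/XDILogDS is an invented placeholder and is not part of the XDI code (which, as noted, avoids direct database access):

// Cached once at startup and reused for the servlet's whole lifetime
private javax.sql.DataSource logDataSource;

public void init(ServletConfig config) throws ServletException {
    super.init(config);
    ...
    try {
        // Heavy, one-time JNDI lookup
        javax.naming.InitialContext ctx = new javax.naming.InitialContext();
        logDataSource = (javax.sql.DataSource) ctx.lookup("java:comp/env/jdbc/XDILogDS");
    } catch (javax.naming.NamingException e) {
        throw new ServletException("Unable to cache the data source", e);
    }
}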

We will simplify two default servlet operations, doGet and doPost, by calling the same processRequest procedure that will be our entry point to the processing logic:

public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException  {
         processRequest(request, response);
  }
public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    doGet(request, response);
  }

First, we will create instances of a request helper, a log handler, and two supporting objects, namely MessageHeader and Acknowledge:

    protected void processRequest(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException    {
      ...
        MessageRequestHelper helper = getMessageRequestHelper(request, response);
        LogHandler logger = new LogHandler();
       ...
       MessageHeader mheader = new MessageHeader();
       Acknowledge ack = new Acknowledge();

The MessageHeader object will contain all the message-context elements that we extract from the inbound message. Ideally, we would have a unified standard header within a single organization (CTU), which is not always true, but external partners/mobile equipment suppliers use their own standards, which are not always compatible with SBDH. Thus, we will have to modify the message header after identifying the trading partner(s). Have a look at the following figure:

Figure: Receive

This modification can be done on the MessageHeader Java object if there is only one receiver, or after combining the message header and message body in an individual transformation step within an execution plan if we have multiple TP receivers.

We can say the same about the Acknowledge object. Initially, there was only one sender, Oracle E-Business Suite, and the Acknowledge was predefined for this application. However, as described in the business case, other internal applications soon discovered the benefits of this Message Broker, and transformation of the Acknowledge message became necessary. Luckily, standardization of the Acknowledge message is easier than that of the Message Header, as all the applications are internal.

You can understand the structure of the Acknowledge object from the overridden toString() function, which we recommend you have for every entity class. The following code shows the object structure only, as you may be better off using Apache Commons for the implementation of the toString() function, for instance by calling ToStringBuilder.reflectionToString(this):

/*
    * @webmethod 
    */
    public String toString(){
        return getClass().getName()
        +" ackReceiptID:"+ackReceiptID
        +" ackDateReceived:"+ackDateReceived
        +" ackDateProcessed:"+ackDateProcessed
        +" ackTradingPartnerID:"+ackTradingPartnerID
        +" ackFileName:"+ackFileName
        +" ackReferenceNumber:"+ackReferenceNumber
        +" ackMessageID:"+ackMessageId
        +" ackMessageStatusCode:"+ackMessageStatusCode
        +" ackEventCode:"+ackEventCode
        +" ackEventDescription:"+ackEventDescription
        +" ackEventSource:"+ackEventSource
        +" ackAction:"+ackAction;
    }
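
If you prefer not to hand-code such methods, the Apache Commons Lang variant mentioned above is essentially a one-liner. The following is a minimal sketch, assuming the commons-lang3 library is on the classpath:

import org.apache.commons.lang3.builder.ToStringBuilder;

    @Override
    public String toString() {
        // Reflectively prints every field of the entity, equivalent to the hand-written version above
        return ToStringBuilder.reflectionToString(this);
    }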

When we talk about the Message Header, we assume that it will be based on (if not compatible with) the SBDH standard. For Acknowledge, there are no publicly accepted standards, but the structure of an existing, common Logging service (and its storage) could be a good start. In this design, the traditional log4j library was used for technical errors, and a log DB structure was standardized long before the creation of this Message Broker. In addition to this, the function-related LogHandler is responsible for choosing an appropriate way to register business events (of type information, warning, or error).

Naturally, a WS realization will be the most atomic and modular one; therefore, LogHandler has the capability to select the logging procedure. However, there were some concerns regarding possible performance degradation. The logging level and realization are parameters set during the servlet's initialization, and the default values are in the web.xml descriptor. We must remember that parameters acquired during the init phase stay active for the servlet's whole life. Therefore, the logging level of the process must be configurable from the execution plan's header:

// Implementation of the task logger in LogHandler
public void log(String ipMsgtype, Task currtask) throws java.io.IOException  {
        try{
            log(ipMsgtype,
                getLogEventDescription(currtask), currtask.getMsgId(), currtask.getSenderTPId(),
                currtask.getTaskEngine(), currtask.getXDIInstanceID(), currtask.getEdiProcessReportLevel()
            );
       }
       catch(Exception ex)  {  ex.printStackTrace(); }
    }

//Implementation of the basic logger in LogHandler
    public void log(String ipMsgtype, String ipMsgtext, String ipMsgcode, String ipUsermsg, String ipMsgsrc,  
      String ipMsgjobid, String ipLoglvl) throws java.io.IOException
    {
        EventLogHandler logwriter = new EventLogHandler();
        logwriter.log(ipMsgtype, ipMsgtext, ipMsgcode, ipUsermsg, ipMsgsrc, ipMsgjobid, ipLoglvl);
    }
....
//Typical use
  logger.log( "INFO", "Rule engine in use : " + RuleEngineType , mheader.getXDIMsgId(), mheader.getXDIUser(),
    "XDI.RuleEngine", mheader.getXDIJobRefId(), "3" );

Although the realization of the Logger WS is pretty straightforward, it also has two distinct paths: you either write the data directly into the DB or enqueue the message into an Advanced Queue. Either way, it is a one-way service and must not disrupt message processing. It is important to mention that Log4j (or the newer SLF4J library, if you choose to use it) also has its own configuration for level and layout, which we set in the LogHandler.
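
As a rough illustration of the Log4j side, a LogHandler could translate the execution plan's report level into a Log4j level along these lines. This is a sketch assuming the classic Log4j 1.x API, and the numeric mapping is purely illustrative, not the book's actual code:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class LogHandlerSketch {
    private static final Logger log = Logger.getLogger(LogHandlerSketch.class);

    // Map the numeric report level from the execution plan header onto a Log4j level
    public void applyReportLevel(String ediProcessReportLevel) {
        if ("3".equals(ediProcessReportLevel)) {
            log.setLevel(Level.INFO);
        } else if ("2".equals(ediProcessReportLevel)) {
            log.setLevel(Level.WARN);
        } else {
            log.setLevel(Level.ERROR);
        }
    }
}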

Now, after the helper's initialization, our first task is to recognize the inbound message. There are two possible strategies here, based on XML parsing: DOM and SAX. Obviously, each strategy has its own pros and cons, but if we require really high performance, we should rely on MsgID recognition by the SAX parser. We instantiate the SAX cmdHelper in the servlet's processRequest and execute the SAX parsing function.

The SAX parsing sequence is presented by the following code snippet:

//In XDIServlet, processRequest 
MessageCommand cmdHelper = helper.getCommand();
    XdiMsgID = cmdHelper.execute(helper);
    logger.log( "INFO", "New message received. Message ID: " + XdiMsgID, "N/A", "N/A",
        XDIServlet.servletInstance + " Front Controller", "N/A", "3");

//In MessageRequestHelper
.....
    private static final String elementName = "XDIMSG_ID";
....
    public MessageCommand getCommand(){
        java.util.ArrayList elementList = new java.util.ArrayList();
        elementList.add(elementName);
        return new MessageGetElementCommand(elementList);
    }

// In MessageGetElementCommand    
public String execute(MessageRequestHelper helper) throws javax.servlet.ServletException, java.io.IOException {
    String elementContent = null;
    //
    // Start reading content
    //
    try{
        java.io.Reader bodyReader = helper.getReader();
        elementContent = getElementContent(bodyReader);
        bodyReader.close();
    }
    catch ( javax.xml.parsers.ParserConfigurationException e){
        throw new javax.servlet.ServletException(e);
    }
    catch ( org.xml.sax.SAXException e){
        throw new javax.servlet.ServletException(e);
    }
    return elementContent;
    }
...... 
// where getElementContent is SAX Parser implementation

private String getElementContent(java.io.Reader reader) throws
    javax.xml.parsers.ParserConfigurationException,
    org.xml.sax.SAXException,
    java.io.IOException {
        String elementContent = null;
        inElement = false;
        SAXParserFactory factory = SAXParserFactory.newInstance();
        factory.setNamespaceAware(true);
        SAXParser parser = factory.newSAXParser();

        // the command object itself acts as the SAX content handler
        parser.parse(new org.xml.sax.InputSource(reader), this);
        if ( stringBuffer.length() > 0 ) elementContent = stringBuffer.toString();
        return elementContent;
    }
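
The SAX callbacks that populate inElement and stringBuffer are not shown in the snippet above. Assuming the command class extends org.xml.sax.helpers.DefaultHandler (it is passed as the handler via this) and keeps the element names from its constructor in an elementList field, they would typically look similar to the following sketch; the real MessageGetElementCommand may differ in details:

    // Illustrative handler state and callbacks
    private boolean inElement = false;
    private StringBuffer stringBuffer = new StringBuffer();

    public void startElement(String uri, String localName, String qName,
                             org.xml.sax.Attributes attributes) {
        // Switch capture on when one of the requested elements starts
        if (elementList.contains(localName)) inElement = true;
    }

    public void characters(char[] ch, int start, int length) {
        if (inElement) stringBuffer.append(ch, start, length);
    }

    public void endElement(String uri, String localName, String qName) {
        if (elementList.contains(localName)) inElement = false;
    }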

This SAX parsing routine is pretty standard for all XML servlets, as it's the first step to map the message type/content for further actions. For simple routing actions, this implementation will be sufficient. However, if we want to have a full-fledged Message Broker with content-based routing, an implementation of DOM parsing is inevitable, as shown in the following code:

//In XDIServlet,   processRequest  
//new: DOM parser. 
//Get message DOM using Oracle XDK
    XMLDocument msgXdiDOM = helper.getMessageDOM(ack);
 
//In MessageRequestHelper
    public XMLDocument getMessageDOM(Acknowledge ack) throws ServletException, IOException  {
    String bodyroot = null;

    //declaration for different DOM specs
    XMLDocument msgXdiDOM = null;
    // Document msgXdiDOM;
    ... 
    XDIMessageHelper msghelper = new XDIMessageHelper();
    try
    {
        //Oracle parser            
        msgXdiDOM = msghelper.getmsgXdiasDOM(getReader()); 
    }
    catch(Exception ex) {
        logger.log( "ERR", "Unable to parse incoming message", "N/A", "N/A", XDIServlet.servletInstance + " Front Controller", "N/A",  "3" );
        ack.setackMessageStatusCode(ack.STATUS_CRITICAL_ERROR);
        ack.setackEventCode(ack.STATUSCODE_CRITICAL_ERROR);
        ack.setackEventDescription(ex.toString());
        ex.printStackTrace();
      }
        return msgXdiDOM;
    }

......
// In XDIMessageHelper. Actual parsing 
//Technically, you can use any DOM parser. We use the classic oracle.xml.parser.v2.*; it is also configurable through the servlet's initialization
    public XMLDocument getmsgXdiasDOM(java.io.Reader reader) 
        throws IOException, SAXParseException, SAXException
    {
        XMLDocument msgXdiDOM = OraXMLHelper.parse(reader, null);
        return msgXdiDOM;
    }

As you have noticed, we populate the Acknowledge object (passing the ack parameter) every time it's necessary, and certainly in case of errors. We will do exactly the same with MessageHeader, right after obtaining the DOM message:

// set Message header values
    helper.setMessageHeader(msgXdiDOM, mheader, ack);

Why do we need to do this? For the same reason that the SAX parser recognizing a single element (MsgID) is not enough: for guaranteed identification of the business process, we will need at least SenderID and EventName (and a few more elements, but we will skip the details for brevity):

Figure: Receive

Our MessageHeader elements will be used in the next step for the recognition of the business process and the extraction of the execution plan for that process. They will also be used later for the construction of the XML Message Header in the delivery phase. Here, we will discuss how we are going to implement marshalling/unmarshalling for the core objects that we have in our broker. You can see the list of objects in the preceding figure.

Naturally, not all of them have to be converted into XML and back, but Process, ProcessHeader, Acknowledge, and MessageHeader are the primary candidates. For instance, in Chapter 2, An Introduction to Oracle Fusion – a Solid Foundation for Service Inventory, dedicated to modern SOA technology, we mentioned several common O/X mappers such as the JAXB and JiBX frameworks. The key factors that naturally define our choice of marshaller are performance and ease of configuration. Spring O/X can be a very good alternative, as we do not need to construct the JAXB context, JiBX binding factories, and so on. We have other options such as Castor XML mappings, XMLBeans marshallers, and XStream. The choice is yours, but you can also implement the conversion of an object to XML without any libraries. Here, our task is really simple; therefore, good, simple Java constructs will work quite well and surprisingly fast. Every object has its own helper, where we have a primitive section for XML construction. First, we get an instance of the element writer utility (a simple println-style helper) together with a StringWriter, as shown in the following code:

ElementWriter ewriter = new ElementWriter();
    java.io.StringWriter xdiDocWriter = new java.io.StringWriter();

We will then write our elements as shown in the following code:

xdiDocWriter.write("<mhs:MessageHeader>"); 
ewriter.element(xdiDocWriter, "mhs:XDIMsgId", mhs.getXDIMsgId());
ewriter.element(xdiDocWriter, "mhs:BusinessEvent", mhs.getBusinessEvent());
ewriter.element(xdiDocWriter, "mhs:Sender", mhs.getSender());

For more complex objects and messages, we advise you to use the Spring framework, or any other that suits you.
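
For example, a minimal JAXB-based marshaller could replace the manual element writing. The following is only a sketch, and it assumes the Acknowledge class is annotated with @XmlRootElement, which the original code does not show:

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
import java.io.StringWriter;

public class AcknowledgeMarshaller {
    // Converts the Acknowledge entity into an XML string
    public String toXml(Acknowledge ack) throws Exception {
        JAXBContext ctx = JAXBContext.newInstance(Acknowledge.class);
        Marshaller m = ctx.createMarshaller();
        m.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
        StringWriter sw = new StringWriter();
        m.marshal(ack, sw);
        return sw.toString();
    }
}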

Now, we have all the necessary functionality to extract an execution plan. Similar to the realization discussed in the previous chapter, the execution plans are XML objects stored as files, and they are extracted via an XML mapping file that links the sender and the message IDs with a process. The name and location of this file are configured in the web.xml deployment descriptor and extracted during the servlet's init phase. The FileIO realization of this lookup makes this broker extremely lightweight and suitable for autonomous installation in the DMZ or in an integration zone without connecting to any database. If your requirements are not that strict, you can implement ExecutionPlanLookupService as a WS (using SCA from the previous chapter) and invoke it using MessageHeader as an input parameter. This was not the case when we decided to build this broker. After the extraction of the execution plan XML, we are ready to process it.
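
The actual mapping file format is not reproduced here; the following is a hypothetical sketch of such a file-based lookup using standard JAXP XPath, with invented element names (ProcessMapping, Process, Sender, MsgId, PlanFile):

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import java.io.File;

public class ExecutionPlanLookup {
    private final Document mapping;

    public ExecutionPlanLookup(String mappingUrl) throws Exception {
        // Loaded once, for example during the servlet's init phase
        mapping = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder().parse(new File(mappingUrl));
    }

    // Returns the execution plan file registered for this sender/message combination
    public String resolvePlan(String senderId, String msgId) throws Exception {
        XPath xpath = XPathFactory.newInstance().newXPath();
        String expr = "/ProcessMapping/Process[Sender='" + senderId
                + "' and MsgId='" + msgId + "']/PlanFile";
        return xpath.evaluate(expr, mapping);
    }
}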

Transform

Transformation is not the only task we will perform; therefore, this name is a bit misleading. In general, we can invoke any EJB or another HTTP endpoint registered in the execution plan. Initially, we agreed that the scope of our tasks would be transformation, translation, and delivery (as FileIO or HTTP post), which are implemented as individual helpers (dispatchers) controlled by the ProcessHandler factory. Please refer to the following figure:

Figure: VETRO sequence on custom Service Broker

It's a classic VETRO pattern, where validation (the V) is initially done by the receiver (servlet) when we parse the message into a DOM, and implicitly during the individual transformation (enrichment) steps. A message that does not comply with the declared XSD will result in a transformation failure.
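
If explicit XSD validation were ever required (the broker described here relies on the implicit checks mentioned above), a standard JAXP sketch would look like the following; the schema path is just a placeholder:

import javax.xml.XMLConstants;
import javax.xml.transform.dom.DOMSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import java.io.File;

public class XsdValidation {
    // Throws org.xml.sax.SAXException if the message does not comply with the declared XSD
    public static void validate(org.w3c.dom.Document doc, String xsdPath) throws Exception {
        SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
        Schema schema = sf.newSchema(new File(xsdPath));
        Validator validator = schema.newValidator();
        validator.validate(new DOMSource(doc));
    }
}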

Technically, we loop through the task list nodes and invoke a related helper to execute each task, as follows:

    TaskHelper tskhelper = new TaskHelper();
    TransformerHandler transformhandler = new TransformerHandler();

    try {
        setBody(reader, true);
        XMLDocument taskListDOM = tskhelper.getTaskListDOM(mheader, ack);
        // new call for task ArrayList
        ArrayList tasklist = tskhelper.getProcessTaskList(taskListDOM, mheader, ack);
        ...
        for (int i = 0; i < tasklist.size(); i++) {
            Task currtask = (Task) tasklist.get(i);
            ...
            if (currtask.getTaskAction().equals("Transform")) {
                log.info("Executing: " + currtask.getTaskAction());
                msgbodyReader = transformhandler.transformdispatcher(getReader(), currtask, request);
            }
            if (currtask.getTaskAction().equals("Deliver")) {
                log.info("Executing: " + currtask.getTaskAction());
                DeliveryHandler deliverer = new DeliveryHandler();
                deliverer.deliver(getReader(), currtask, request, ack, mheader);
            }
        }
    ...

The transformation engine type is defined as a parameter of the transformation task, and the dispatcher will send the message to the appropriate engine, where the transformation is finally performed. This is demonstrated in the following code:

    try {
        stylesheetfile = task.getStylesheetLocation();
        stylesheet = new XSLTInputSource(stylesheetfile);
        xmlsource = new XSLTInputSource(reader);
    }
    catch (Exception e) { ... }
    try {
    ...
        XSLTResultTarget xmlresult = new XSLTResultTarget(out);
        XSLTProcessor transformer = XSLTProcessorFactory.getProcessor();
        transformer.process(xmlsource, stylesheet, xmlresult);
        ...
        // 'out' is assumed to be a java.io.StringWriter collecting the transformation result
        java.io.Reader msgbodyReader = new java.io.StringReader(out.toString());
        return msgbodyReader;
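
The snippet above relies on the legacy Xalan 1 API (XSLTProcessor and friends). Purely for comparison, a functionally equivalent sketch using the standard JAXP/TrAX API, which is not what the XDI broker actually used, would look like this:

import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;

public class TraxTransformDispatcher {
    // Applies the stylesheet registered for the task to the message body and returns the result
    public Reader transform(Reader message, String stylesheetLocation) throws Exception {
        Transformer transformer = TransformerFactory.newInstance()
                .newTransformer(new StreamSource(stylesheetLocation));
        StringWriter out = new StringWriter();
        transformer.transform(new StreamSource(message), new StreamResult(out));
        return new StringReader(out.toString());
    }
}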

Deliver

The broker's last responsibility is to deliver the message to the ultimate recipient(s):

//Dispatcher uses the task engine to dispatch to the certain task
...         
if(task.getTaskEngine().equals("XDIMB.apache.httpcomponents.httpclient")) { 
        log.info("Dispatching as HTTP, TaskCommType: "+ task.getTaskCommType() + "; Engine: "+ task.getTaskEngine());
            httpdeliverer.deliverCommonHTTP(outbodyReader, task, request,  ack, mheader);  
    }
... 
//here is the standard Apache HTTP Component library 
....
    tskurl = task.getReceiverEndpoint();
    userName = task.getReceiverEndpointUserName();
    port = task.getReceiverEndpointPort();
    tskhost = task.getReceiverEndpointHost();
    ...
    HttpParams params = new SyncBasicHttpParams();
    HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {...});

    ...
    HttpRequestExecutor httpexecutor = new HttpRequestExecutor();
    HttpContext context = new BasicHttpContext(null);
    HttpHost host = new HttpHost (tskhost, port );
    DefaultHttpClientConnection conn = new DefaultHttpClientConnection();
    ConnectionReuseStrategy connStrategy = new DefaultConnectionReuseStrategy();
    ...
    BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", tskurl);
    request.setEntity(requestBodies[i]);   // requestBodies is prepared earlier (not shown here)
    ...
    request.setParams(params);
    httpexecutor.preProcess(request, httpproc, context);
    HttpResponse response = httpexecutor.execute(request, conn, context);
    response.setParams(params);
    httpexecutor.postProcess(response, httpproc, context);
....

//If you want to dispatch to another HTTP poster, add the engine type to the Execution Plan, a new IF branch to the dispatcher, and a new Java deliverer
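
The design also allows for a FileIO deliverer, as mentioned earlier; its code is not part of the original snippet, so the following is only a hypothetical sketch with illustrative names:

import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;

public class FileDeliveryHandler {
    // Writes the outbound message body to the target file configured for the task
    public void deliver(Reader outbodyReader, String targetFileName) throws IOException {
        FileWriter writer = new FileWriter(targetFileName);
        try {
            char[] buffer = new char[4096];
            int count;
            while ((count = outbodyReader.read(buffer)) != -1) {
                writer.write(buffer, 0, count);
            }
        } finally {
            writer.close();
        }
    }
}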

The pros and cons of a simplified Message Broker

The solution based on the presented architecture was delivered quickly and served its purpose really well for a limited number of trading partners (message recipients). Most importantly, performance was more than acceptable and it was really reliable, so the tactical goals were achieved. We can even see this servlet-based approach as a good investment in the REST service infrastructure. Commonly, REST is implemented with Jersey servlets, and we encourage you to look at this technology, even though it's not in the scope of this book. You will find a lot of similarities with the quick example we discussed previously. Oracle has many good examples that cover the servlet pattern and its utilization in JAX-RS/Jersey (http://docs.oracle.com/cd/E19776-01/820-4867/ghqxq/index.html). This means that you really do not have to do everything from scratch for message brokering and service implementation. The actual purpose of this example was to demonstrate the physical implementation of some patterns such as Mediation and Adapter Factory, as discussed before, and mainly the EAI-SOA path of evolution: point2point | hub-and-spoke | message broker | service broker | full-fledged ESB.
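
Purely as a teaser for that JAX-RS/Jersey direction, and not as part of the XDI code, a front-controller-style REST resource could be sketched as follows (the class and path names are illustrative):

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/xdi")
public class XdiBrokerResource {
    @POST
    @Consumes(MediaType.APPLICATION_XML)
    public Response receive(String messageBody) {
        // parse, identify, look up the execution plan and dispatch, as in processRequest()
        return Response.ok("<Acknowledge/>").build();
    }
}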

The example also demonstrated the complexity of the task. We didn't cover a lot of features that are compulsory for a full-scale solution, for instance:

  • Basic security
  • MTOM / messages with attachments
  • Throttling / Load balancing

Also, many more solutions will be covered later. Most importantly, however, we didn't implement true service decoupling, as no Proxy concepts were provided. Now is a good time to return to the CTU service broker and see how we can improve this solution using the discussed and tested patterns.
