Using dataflow variables for lazy evaluation

Dataflow concurrency is a concurrent programming paradigm that has been around for three decades now. What is so exciting about it?

The main idea behind Dataflow concurrency is to restrict each variable to a single assignment. A variable can be assigned a value only once in its lifetime, while the number of reads is unlimited. If a variable has not yet been written, all read operations block until the variable is actually written (bound). With this straightforward, single-assignment approach, it is impossible to access an inconsistent value or run into a data race. The deterministic nature of Dataflow concurrency ensures that a program will always behave the same way: you can run the same operation 5 or 10 million times and the result will always be identical. Conversely, if an operation deadlocks the first time, it will deadlock every other time you run it. These qualities make it very easy to reason about concurrency, but they come at a price: the code must be deterministic. Random numbers, the current time, and other sources of nondeterminism are not allowed. The section of code that employs Dataflow concurrency must act as a pure function, with input and output.
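The single-assignment semantics can be sketched in a few lines of GPars code (a minimal, self-contained example, assuming GPars is on the classpath; the variable names are arbitrary):

```groovy
import static groovyx.gpars.dataflow.Dataflow.task
import groovyx.gpars.dataflow.DataflowVariable

def x = new DataflowVariable()

// Reader task: x.val blocks until x is bound by some writer.
// task itself returns a promise bound to the closure's result.
def reader = task {
    x.val * 2
}

// Bind the variable exactly once; every pending read unblocks.
x << 21

assert reader.val == 42
// A second bind, such as x << 99, would fail at runtime,
// because a DataflowVariable can be written only once.
```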

Groovy's GPars framework exposes this alternative concurrency model, and in this recipe we are going to use it to solve the problem of high latency when invoking external systems, introduced in the Running tasks in parallel and asynchronously recipe.

Getting ready

To set up this recipe, please refer to the Getting ready section of the Running tasks in parallel and asynchronously recipe.

Start the dummy web service using groovy app.groovy.

How to do it...

The following steps show how to rework the CriminalService class to leverage Dataflow concurrency.

  1. Create a new Groovy class named CriminalServiceWithDataflow.
    package org.groovy.cookbook.dataflow
    
    import static groovyx.gpars.dataflow.Dataflow.task
    import groovyx.gpars.dataflow.DataflowVariable
    
    class CriminalServiceWithDataflow {
    
      def baseUrl
    
      CriminalServiceWithDataflow(String url) {
        baseUrl = url
      }
    
    }
  2. Add a function to retrieve the JSON data for the specified country:
    def fetchData(String country) {
      println "fetching data for ${country}"
      def jsonResponse = new DataflowVariable()
      task {
        try {
          "${baseUrl}/${country}".toURL().openConnection().with {
            if (responseCode == 200) {
              // Bind the response body to the dataflow variable.
              jsonResponse << inputStream.text
            } else {
              // Bind the exception: readers can inspect or rethrow it.
              jsonResponse << new RuntimeException(
                'Invalid Response Code from HTTP GET: ' + responseCode
              )
            }
            disconnect()
          }
        } catch (e) {
          jsonResponse << e
        }
      }
      jsonResponse
    }
  3. Add the main method that aggregates the data:
    List getData(List countries) {
      List aggregatedJson = []
      countries.each {
        // fetchData returns immediately with a DataflowVariable.
        aggregatedJson << fetchData(it)
      }
      // Reading val blocks until each variable is bound.
      aggregatedJson*.val
    }
  4. To test our new class, let's add a simple test case:
    @Test
    void testDataflow() {
    
      def serviceUrl = 'http://localhost:5050'
      def criminalService = new CriminalServiceWithDataflow(serviceUrl)
    
      def data = criminalService.
        getData(['germany', 'us', 'canada'])
    
      assert 3 == data.size()
    
      data.each {
        try {
          println it
        } catch (e) {
          e.printStackTrace()
        }
      }
    
    }

How it works...

The fetchData function of the CriminalServiceWithDataflow class is where the power of Dataflow concurrency is really visible. The function creates a DataflowVariable named jsonResponse and a task whose responsibility is to populate that variable. The variable can be written only once, through the << operator. The task contains the actual code to access the Criminal Service web service, with some simplistic exception-handling code. When the value of a DataflowVariable is read, the read blocks until the value is set (using <<). In this way, the time required to collect the data for three countries will be roughly equal to the longest single response time.

The getData function spreads the HTTP requests over three threads. Note that the fetchData method is not blocking. The blocking takes place only in the last line of the getData method, when the val method is invoked (and the variable is therefore read) on each DataflowVariable containing the HTTP GET response.
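This eager-start, lazy-read behavior can be verified with a small timing sketch (a simplified stand-in for the HTTP calls, using sleep to simulate latency; the delay values are arbitrary):

```groovy
import static groovyx.gpars.dataflow.Dataflow.task
import groovyx.gpars.dataflow.DataflowVariable

def start = System.currentTimeMillis()

// Start three "requests" concurrently; none of this blocks.
def vars = [300, 100, 200].collect { ms ->
    def v = new DataflowVariable()
    task { sleep ms; v << ms }
    v
}

// Blocking happens only here, when each value is read.
def results = vars*.val
def elapsed = System.currentTimeMillis() - start

assert results == [300, 100, 200]
// elapsed should be close to the slowest task (~300 ms),
// not the sum of all three delays (600 ms).
println "collected in ${elapsed} ms"
```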

It's also worth noting how the exception handling is organized. Let's zoom into the code:

def jsonResponse = new DataflowVariable()
try {
  ...
} catch (e) {
  jsonResponse << e
}

When an exception occurs inside the task, we assign the Exception to the jsonResponse variable of type DataflowVariable. The DataflowVariable class has two methods to access the stored value:

  • The val method, which simply returns the stored value (here, the Exception)
  • The get method, which rethrows the Exception, if one was bound

Use val or get depending on your exception-handling requirements. You can test how the exception handling works by shutting down the Ratpack server or by passing invalid countries that yield a 404 response code.
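The difference between the two accessors can be sketched with a hand-built DataflowVariable, independent of the web service (a minimal example; the exception message is arbitrary):

```groovy
import groovyx.gpars.dataflow.DataflowVariable

def response = new DataflowVariable()
// Bind an exception, just as fetchData does on failure.
response << new RuntimeException('boom')

// val simply hands back the stored value, exception included.
assert response.val instanceof RuntimeException
assert response.val.message == 'boom'

// get rethrows the stored exception instead of returning it.
def rethrown = false
try {
    response.get()
} catch (Throwable t) {
    rethrown = true
}
assert rethrown
```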

There's more...

Dataflow concurrency is a very elegant paradigm, and the model offers more concepts than the ones shown in this recipe. The best way to learn them is to head to the official GPars Dataflow documentation, located at http://www.gpars.org/guide/guide/dataflow.html
