Building the Wolfram Info System

Now that we have our generic InfoSys module in place, we can work on specific backends. We’ll start with only one, our Wolfram backend. This module will call WolframAlpha to retrieve relevant information about our users’ annotations.

Our first step is to define our backend interface. Since all our backends will have the same contract, this is a perfect use case for a backend behaviour. A behaviour is a contract, a common API across modules. We have seen OTP behaviours, such as GenServer and Supervisor, as well as behaviours from libraries like Plug. Remember, each plug implements two functions, init/1 and call/2. Our behaviour will be a tiny contract between the information system and each backend, consisting of just two callbacks, name and compute. Create a file in lib/info_sys/backend.ex, and key this in:

 defmodule InfoSys.Backend do
   @callback name() :: String.t()
   @callback compute(query :: String.t(), opts :: Keyword.t()) ::
               [%InfoSys.Result{}]
 end

We declare two callbacks. We don’t actually define any functions. Instead, we use typespecs, which specify not just the name of our functions but also the types of arguments and return values. In our case, the name function takes no arguments and returns a String type, so you can see the String.t in the typespec. The compute function takes a String.t query and a Keyword.t list of options, and returns a list of %InfoSys.Result{} structs.
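To see the contract from the implementing side, here is a minimal sketch of a module that adopts a behaviour with the same two callbacks. The Sketch.Result, Sketch.Backend, and Sketch.Echo modules are hypothetical stand-ins invented for this illustration so the snippet runs on its own; they are not part of the project.

```elixir
# Stand-ins so the sketch is self-contained (assumptions, not project code).
defmodule Sketch.Result do
  defstruct backend: nil, score: nil, text: nil
end

defmodule Sketch.Backend do
  @callback name() :: String.t()
  @callback compute(query :: String.t(), opts :: Keyword.t()) :: [%Sketch.Result{}]
end

# Hypothetical backend that simply echoes the query back as its only result.
defmodule Sketch.Echo do
  @behaviour Sketch.Backend

  @impl true
  def name, do: "echo"

  @impl true
  def compute(query, _opts) do
    [%Sketch.Result{backend: __MODULE__, score: 50, text: query}]
  end
end

IO.inspect(Sketch.Echo.compute("hello", []))
```

If Sketch.Echo left out one of the callbacks, the compiler would emit a warning, which is the practical benefit of declaring the behaviour.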

With our behaviour in place, we can write our first backend. To do so, we’ll need to establish our dependencies. WolframAlpha returns XML responses, and we’ll use an XML parser to avoid processing those by hand. Let’s add :sweet_xml to our deps list in mix.exs. We want to add the dependency to info_sys, since in an umbrella project each child application manages its own dependencies:

 {:sweet_xml, "~> 0.6.5"},

Next, run $ mix deps.get to grab the dependency from Hex. With our XML library in place, we’re ready to sign up as a WolframAlpha API developer and retrieve our application ID. Visit the WolframAlpha developer portal,[33] sign up for a new account, and follow the instructions to get your AppID.

Now that you have a developer API key, you could place it directly in config/dev.exs, but there’s a better way. You shouldn’t check in private credentials under version control. In fact, Phoenix points you in the right direction with the generated config/prod.secret.exs file. That file references environment variables that are securely set on the production server, meaning you can establish sensitive configuration in your local development environment without checking secret values into version control. That way you can include sensitive credentials properly. Let’s add our API key lookup to our development and prod environments. Since tests will not hit the Wolfram API directly, we don’t need to set a key for that environment. Add the following entry to your config/dev.exs and config/prod.secret.exs like this:

 wolfram_app_id =
   System.get_env("WOLFRAM_APP_ID") ||
     raise """
     environment variable WOLFRAM_APP_ID is missing.
     """

 config :info_sys, :wolfram, app_id: wolfram_app_id

With setup out of the way, we can now implement our Wolfram backend in lib/info_sys/wolfram.ex, like this:

1:  defmodule InfoSys.Wolfram do
      import SweetXml
      alias InfoSys.Result

5:    @behaviour InfoSys.Backend

      @base "http://api.wolframalpha.com/v2/query"

      @impl true
10:   def name, do: "wolfram"

      @impl true
      def compute(query_str, _opts) do
        query_str
15:     |> fetch_xml()
        |> xpath(~x"/queryresult/pod[contains(@title, 'Result') or
                                     contains(@title, 'Definitions')]
                    /subpod/plaintext/text()")
        |> build_results()
20:   end

      defp build_results(nil), do: []

      defp build_results(answer) do
25:     [%Result{backend: __MODULE__, score: 95, text: to_string(answer)}]
      end

      defp fetch_xml(query) do
        {:ok, {_, _, body}} = :httpc.request(String.to_charlist(url(query)))
30:
        body
      end

      defp url(input) do
35:     "#{@base}?" <>
          URI.encode_query(appid: id(), input: input, format: "plaintext")
      end

      defp id, do: Application.fetch_env!(:info_sys, :wolfram)[:app_id]
40: end

To start our module, we import the functions we’ll need and set up a single alias. SweetXml will help us parse the XML we receive, and Result has the struct for the results we’ll use.

Next, we establish our module as an implementation of the InfoSys.Backend behaviour on line 5. In compute on line 13, we build a pipe to take our query, fetch the XML we’ll need, extract the results using the xpath function from SweetXml, and then build the results. We specify our compute function as an implementation of a behaviour with the @impl true notation. That module attribute is not required, but it makes our intentions clear. Users of our module can immediately tell which functions implement our behaviour and which ones don’t. Next, we’ll look at the functions that do each one of these tasks.

In fetch_xml on line 28, we contact WolframAlpha with the query string that interests us. We use :httpc, which ships within Erlang’s standard library, to do the straight HTTP request, matching against :ok and the body that we return to the calling client. We use private functions to extract our API key from our application configuration and build the full URL of our API request.
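To make the URL-building step concrete, the following sketch shows what URI.encode_query produces for a sample query. The Sketch.WolframUrl module and the "DEMO-APP-ID" value are placeholders for illustration only; unlike the real backend, this version takes the application ID as an argument instead of reading it from configuration.

```elixir
defmodule Sketch.WolframUrl do
  @base "http://api.wolframalpha.com/v2/query"

  # Builds the request URL. The app_id argument is an assumption of this
  # sketch; the backend itself looks it up with Application.fetch_env!/2.
  def build(input, app_id) do
    @base <> "?" <> URI.encode_query(appid: app_id, input: input, format: "plaintext")
  end
end

IO.puts(Sketch.WolframUrl.build("what is elixir?", "DEMO-APP-ID"))
# prints http://api.wolframalpha.com/v2/query?appid=DEMO-APP-ID&input=what+is+elixir%3F&format=plaintext
```

Note how the space becomes + and the question mark becomes %3F, so user input can’t break the query string.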

In build_results on line 22, we build a list of result structs. build_results has two different forms, depending on whether we get results back or not. We match on the first argument in our function head. On nil, we need only return an empty list. Otherwise, we build a list of result structs with our expected results and score, and return them to the caller.

Let’s try it out with iex -S mix. First, start a query. We’ve designed our backend to return results to the calling process, which we’ve wrapped in Tasks inside our InfoSys.compute. We don’t yet await the task completion inside compute, but we can issue compute requests and await the tasks inside iex. Remember, each backend will return a spawned Task, like this:

 iex> InfoSys.compute("what is elixir?")
 [
   %Task{
     owner: #PID<0.320.0>,
     pid: #PID<0.340.0>,
     ref: #Reference<0.4138658672.566755329.204828>
   }
 ]

That query fires off a single Wolfram backend query and then the task sends results to the calling process. We can call Task.await on our task, but the result should be waiting for us in our current process when the task completes.

Let’s use the flush helper from IEx to see any messages we’ve received:

 iex(13)> InfoSys.compute("what is elixir")
 iex(14)> flush()
 [
   %InfoSys.Result{
     backend: InfoSys.Wolfram,
     score: 95,
     text: "1 | noun | a sweet flavored liquid (usually containing a small ..."
   }
 ]
 ...
 iex(15)> InfoSys.compute("what is firebird?")
 iex(16)> flush()
 [
   %InfoSys.Result{
     backend: InfoSys.Wolfram,
     score: 95,
     text: "1 | noun | the male is bright red with black wings and tail 2..."
   }
 ]
 ...

Brilliant. Our Wolfram service is working exactly as we expect. Once the task is complete, we receive the results in our mailbox. We can wait for each task to complete with Task.await. Your results may not be the same, but for every result you see in the list, you get our hardcoded score of 95 percent. Remember, flush() can just return :ok if the message isn’t yet in your mailbox. If that happens to you, wait a few seconds and try again.

Monitoring Processes

If you watched closely, you may have also noticed the {:DOWN, ...} message we received, in addition to the task results. Internally, the Task library sets up a monitor from the caller process to the Task. If we wanted to, we could use Process.monitor to detect backend crashes while we’re waiting on results. Once a monitor is set, we’ll get a message when the monitored process dies. For example, you can see this concept at work in IEx:

 iex> pid = spawn(fn -> :ok end)
 iex> Process.monitor(pid)
 #Reference<0.0.2.2850>

We spawn a pid with a trivial function. We set up a monitor with Process.monitor. We get a reference back to identify this monitor. Meanwhile, the pid process dies immediately because it has no work to do. Let’s use flush to check out our IEx mailbox, like this:

 iex> flush()
 {:DOWN, #Reference<0.0.2.2850>, :process, #PID<0.405.0>, :normal}
 :ok

Nice! We receive a regular Elixir message as a {:DOWN, ...} tuple, informing us that our process died. We won’t be monitoring our backends directly with Process.monitor because the Task module calls it for us, but it’s nice to know how the monitoring primitives work, as they power much of the high-level OTP tools you are used to using, such as supervisors and tasks.
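Outside of IEx you would typically pick up the {:DOWN, ...} message with receive rather than flush. The sketch below is one way that might look; Sketch.Monitor is a hypothetical module, and it uses spawn_monitor, which spawns and monitors in one step, so the process can’t exit before the monitor is in place.

```elixir
defmodule Sketch.Monitor do
  # Runs fun in a new monitored process and waits for its :DOWN message.
  # Returns {:down, reason}, or :timeout if nothing arrives in time.
  def run_and_watch(fun, timeout \\ 1_000) do
    {pid, ref} = spawn_monitor(fun)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
    after
      timeout -> :timeout
    end
  end
end

IO.inspect(Sketch.Monitor.run_and_watch(fn -> :ok end))
# prints {:down, :normal}
IO.inspect(Sketch.Monitor.run_and_watch(fn -> exit(:boom) end))
# prints {:down, :boom}
```

Pinning ref and pid in the pattern ensures we only react to the process we are watching, not to unrelated messages in the mailbox.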

To make our backends more friendly to our clients, we need to make a few modifications. We’ll need to detect when a backend crashes so we don’t wait for results that might never arrive. In addition, we need to order the results we get from all the backends by our relevance score so it will be easier to pick the best one. Finally, we need to specify a reasonable timeout so the information systems that take too long won’t hold up other results. Let’s get started.

Working with Task Tools

Elixir’s Task module has a perfect feature for our requirements: task yielding. While Task.await would crash the caller should a given task time out, Task.yield blocks the caller for up to the given timeout and returns {:ok, result}, {:exit, reason}, or nil, depending on whether a reply is received in time. We also need the ability to wait on all tasks, taking no more than a given time for total execution. Fortunately, Elixir provides Task.yield_many, which gives us exactly that.
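As a small illustration of how yielding differs from awaiting, this sketch starts one quick task and one tardy task and yields on each. Sketch.YieldDemo and its timings are made up for the example; the point is only that a timeout produces nil instead of crashing the caller.

```elixir
defmodule Sketch.YieldDemo do
  def run do
    quick = Task.async(fn -> :done end)

    tardy =
      Task.async(fn ->
        Process.sleep(2_000)
        :late
      end)

    # The quick task replies well within its timeout.
    quick_reply = Task.yield(quick, 500)
    # The tardy task does not, so yield gives up and returns nil.
    tardy_reply = Task.yield(tardy, 100)
    # The tardy task is still running; stop it so it doesn't linger.
    shutdown_reply = Task.shutdown(tardy, :brutal_kill)

    {quick_reply, tardy_reply, shutdown_reply}
  end
end

IO.inspect(Sketch.YieldDemo.run())
# prints {{:ok, :done}, nil, nil}
```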

Let’s apply this feature to our InfoSys client. We’ll automatically collect results and ignore responses from crashed or tardy backends, making our services predictable and safe. Extend your apps/info_sys/lib/info_sys.ex, like this:

1:  def compute(query, opts \\ []) do
      timeout = opts[:timeout] || 10_000
      opts = Keyword.put_new(opts, :limit, 10)
      backends = opts[:backends] || @backends
5:
      backends
      |> Enum.map(&async_query(&1, query, opts))
      |> Task.yield_many(timeout)
      |> Enum.map(fn {task, res} -> res || Task.shutdown(task, :brutal_kill) end)
10:   |> Enum.flat_map(fn
        {:ok, results} -> results
        _ -> []
      end)
      |> Enum.sort(&(&1.score >= &2.score))
15:   |> Enum.take(opts[:limit])
    end

The compute function now automatically waits for results. When we receive results, we sort them by score and report the top ones. The pipeline is interesting. We start with our backends, and map over them with the queries we fire to our backends. Starting on line 8, we take all spawned backend tasks and call Task.yield_many with our timeout.

Now, things get interesting. Let’s study the pipeline starting on line 9 through the end of the function. It’s an extremely dense chunk of code, but it’s all important.

First we need to walk through each of the task results. We map over each result, which comes in the form of a task-result tuple. In the function head, we match both to task and res for later use. For each one, we execute the expression res || Task.shutdown(task, :brutal_kill). That little snippet is the lynchpin of this block of code.

If we get a result back in res, we simply return it, and the right side of the || operator never fires. If we get nil back, the task hasn’t replied within the timeout, so we evaluate the right side and shut the task down with the :brutal_kill option, meaning it’s an immediate shutdown, without waiting for completion. Note that this snippet also protects us from a race condition. Theoretically, a task could complete between the moment yield_many returns and the moment we actually process the results. In that case, Task.shutdown returns the {:ok, result} reply instead of nil, so the late result is not lost, and either way the task is no longer running afterward.

The result of this map is a list whose elements are {:ok, results} for a successful reply, {:exit, reason} for a task that crashed, or nil for a task we shut down, and we’re ready to process those results. We grab the successful results and ignore everything else by returning []. We sort by score in descending order, and then use Enum.take to return up to the limit our client specifies.
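The filtering and ranking steps can be tried in isolation on plain data. The sketch below assumes a hypothetical Sketch.Rank module and uses bare maps with score and text keys in place of result structs; the replies list mimics the shapes that come out of the mapping step.

```elixir
defmodule Sketch.Rank do
  # Keeps results from successful replies, sorts them by descending score,
  # and returns at most limit of them.
  def top(replies, limit) do
    replies
    |> Enum.flat_map(fn
      {:ok, results} -> results
      _ -> []
    end)
    |> Enum.sort(&(&1.score >= &2.score))
    |> Enum.take(limit)
  end
end

replies = [
  {:ok, [%{score: 40, text: "b"}, %{score: 95, text: "a"}]},
  {:exit, :boom},
  nil,
  {:ok, [%{score: 70, text: "c"}]}
]

IO.inspect(Sketch.Rank.top(replies, 2))
# prints [%{score: 95, text: "a"}, %{score: 70, text: "c"}]
```

Using flat_map rather than map followed by a filter lets a failed reply contribute an empty list, so it disappears from the output without a separate pass.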

And that’s a wrap. Whew.

Now that our code is yielding to our tasks, we’re left with only results that complete successfully within the specified timeout. That’s the beauty of tasks. They allow us a tidy way to handle resources that could otherwise leak.

Let’s give it a try:

 iex> InfoSys.compute("what is the meaning of life?")
 [
   %InfoSys.Result{
     backend: InfoSys.Wolfram,
     score: 95,
     text: "42 (according to the book The Hitchhiker's Guide..."
   }
 ]

Our information system now handles failures exactly as we desire. We were able to add complexity such as isolated failures and timeouts to the combined information system service without changing the policies for individual backends. Because each backend is simply synchronous code running inside a new task process, we can leverage everything in OTP to make our system resilient without changing the business code.

Caching Results

With our asynchronous backend in place, we’re ready to integrate our Cache server. Open up apps/info_sys/lib/info_sys.ex and add the compute function just below the defstruct, like this:

1:  alias InfoSys.Cache

    def compute(query, opts \\ []) do
      timeout = opts[:timeout] || 10_000
5:    opts = Keyword.put_new(opts, :limit, 10)
      backends = opts[:backends] || @backends

      {uncached_backends, cached_results} =
        fetch_cached_results(backends, query, opts)
10:
      uncached_backends
      |> Enum.map(&async_query(&1, query, opts))
      |> Task.yield_many(timeout)
      |> Enum.map(fn {task, res} ->
15:     res || Task.shutdown(task, :brutal_kill)
      end)
      |> Enum.flat_map(fn
        {:ok, results} -> results
        _ -> []
20:   end)
      |> write_results_to_cache(query, opts)
      |> Kernel.++(cached_results)
      |> Enum.sort(&(&1.score >= &2.score))
      |> Enum.take(opts[:limit])
25: end

We modified our compute function to read from the cache for each backend given a query, write new values to the cache, and join the cached values to the freshly fetched results. First, on line 9, we added a lookup that separates the backends with no cached results for the query from the results already in the cache. Then we piped the uncached backends into our original pipeline, performing our async task work as before. Next, on lines 21 and 22, we added two new pipe operations: we write the new results to the cache, then append the cached results before sorting by score.

To support our new pipe operations, we wrote two private functions. Let’s add the first of them now, below our new compute function:

1:  defp fetch_cached_results(backends, query, opts) do
      {uncached_backends, results} =
        Enum.reduce(
          backends,
5:        {[], []},
          fn backend, {uncached_backends, acc_results} ->
            case Cache.fetch({backend.name(), query, opts[:limit]}) do
              {:ok, results} -> {uncached_backends, [results | acc_results]}
              :error -> {[backend | uncached_backends], acc_results}
10:         end
          end)

      {uncached_backends, List.flatten(results)}
    end

On line 1, we defined a fetch_cached_results function to take all backends and accumulate the cached results for the given query, as well as the backends which contain no cached information. This way we can return both the cached result set, as well as the remaining backends that need fresh queries. Now, we can write the results, like this:
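The partitioning idea is easier to see with the cache server taken out of the picture. In this sketch a plain map stands in for the cache and backend names stand in for backend modules; Sketch.CacheSplit and the simplified {name, query} key are assumptions made for the illustration.

```elixir
defmodule Sketch.CacheSplit do
  # Splits backend names into those with no cached entry for the query and
  # the flattened list of results that were already cached.
  def split(backend_names, query, cache) do
    {uncached, results} =
      Enum.reduce(backend_names, {[], []}, fn name, {uncached, acc} ->
        case Map.fetch(cache, {name, query}) do
          {:ok, cached} -> {uncached, [cached | acc]}
          :error -> {[name | uncached], acc}
        end
      end)

    {uncached, List.flatten(results)}
  end
end

cache = %{{"wolfram", "1 + 1"} => ["2"]}
IO.inspect(Sketch.CacheSplit.split(["wolfram", "other"], "1 + 1", cache))
# prints {["other"], ["2"]}
```

Map.fetch returns {:ok, value} or :error, the same shape the listing matches on from Cache.fetch, which is why the case expression carries over unchanged.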

1:  defp write_results_to_cache(results, query, opts) do
      Enum.map(results, fn %Result{backend: backend} = result ->
        :ok = Cache.put({backend.name(), query, opts[:limit]}, result)

5:      result
      end)
    end

On line 1, we defined a write_results_to_cache function which uses Cache.put to write our uncached results to our cache using the backend, query, and relevant options as our cache key. These previous three listings hold a moderately large amount of code, but just about all of it is fulfilling the goal of organizing, writing, and reading responses from the cache. Very little of the code is related to the ceremony of managing our cache server. That code lives elsewhere, in our supervisor. Let’s try it out in IEx:

 iex> :timer.tc(InfoSys, :compute, ["how old is the universe?"])
 {1306573,
  [
    %InfoSys.Result{
      backend: InfoSys.Wolfram,
      score: 95,
      text: "1.4×10^10 a (Julian years) (time elapsed since the Big Bang)"
    }
  ]}

 iex> :timer.tc(InfoSys, :compute, ["how old is the universe?"])
 {53,
  [
    %InfoSys.Result{
      backend: InfoSys.Wolfram,
      score: 95,
      text: "1.4×10^10 a (Julian years) (time elapsed since the Big Bang)"
    }
  ]}

 iex> :timer.tc(InfoSys, :compute, ["1 + 1"])
 {1121249,
  [
    %InfoSys.Result{
      backend: InfoSys.Wolfram,
      score: 95,
      text: "2"
    }
  ]}

 iex> :timer.tc(InfoSys, :compute, ["1 + 1"])
 {47,
  [
    %InfoSys.Result{
      backend: InfoSys.Wolfram,
      score: 95,
      text: "2"
    }
  ]}

We used :timer.tc to measure the execution time, in microseconds, of the given module, function, and arguments. We can see our first call returned in 1.3 seconds, while our second identical query returned in 53 microseconds. Issuing a new query of “1 + 1” yielded a similar result. The first query was uncached, and had to make the remote hop to WolframAlpha, taking just over one second. The next call hit the cache and returned in 47 microseconds. Not bad!
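If you want to time something other than a module-function-arguments triple, :timer.tc also accepts an anonymous function. The helper below is a hypothetical convenience wrapper, not part of the project, that converts the measurement to milliseconds.

```elixir
defmodule Sketch.Timing do
  # Returns {elapsed_milliseconds, result} for a zero-arity function.
  def measure_ms(fun) do
    {micros, result} = :timer.tc(fun)
    {micros / 1_000, result}
  end
end

{ms, sum} = Sketch.Timing.measure_ms(fn -> Enum.sum(1..1_000_000) end)
IO.puts("summed to #{sum} in #{ms} ms")
```

The elapsed time will vary from run to run and from machine to machine, so treat single measurements like the ones in the IEx session as rough indications rather than benchmarks.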

If you want to see the cache sweeping in action, wait 60 seconds, and re-issue one of our cached queries. You’ll see higher latency since our Cache clear operation is doing what it should. That wraps up our service. All that remains is to tie it into our channels.
