Content registry

We have seen in this chapter that data ingestion is an area that is often overlooked, and that its importance cannot be overstated. At this point, we have a pipeline that enables us to ingest data from a source, schedule that ingestion, and direct the data to our repository of choice. But the story does not end there. Now that we have the data, we need to fulfil our data management responsibilities. Enter the content registry.

We're going to build an index of metadata related to the data we have ingested. The data itself will still be directed to storage (HDFS, in our example) but, in addition, we will store metadata about the data so that we can track what we've received and understand basic information about it, such as when we received it, where it came from, how big it is, what type it is, and so on.

Choices and more choices

The choice of which technology we use to store this metadata is, as we have seen, one based upon knowledge and experience. For metadata indexing, we will require at least the following capabilities:

  • Easily searchable
  • Scalable
  • Parallel write ability
  • Redundancy

There are many ways to meet these requirements. For example, we could write the metadata to Parquet, store it in HDFS, and search it using Spark SQL. However, here we will use Elasticsearch because it meets the requirements a little better, most notably because it facilitates low-latency queries of our metadata over a REST API, which is very useful for creating dashboards. Elasticsearch also has the advantage of integrating directly with Kibana, meaning it can quickly produce rich visualizations of our content registry. For these reasons, we will proceed with Elasticsearch in mind.
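
To make that trade-off concrete, here is a minimal sketch of the Parquet alternative, assuming a hypothetical hdfs:///user/hadoop/registry path and the three metadata fields introduced later in this section. It works, but every query is a Spark batch job rather than a low-latency REST call:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("MetadataRegistry").getOrCreate()
import spark.implicits._

// Append one metadata record per ingested file (path is hypothetical)
Seq((11279827L, "20150218233000.gkg.csv.zip", "2016-08-01T17:43:00+01:00"))
  .toDF("FileSize", "FileName", "IngestedDate")
  .write.mode("append").parquet("hdfs:///user/hadoop/registry")

// Search the registry with Spark SQL
spark.read.parquet("hdfs:///user/hadoop/registry")
  .createOrReplaceTempView("registry")
spark.sql("SELECT FileName, FileSize FROM registry WHERE FileSize > 10000000").show()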

Going with the flow

Using our current NiFi pipeline flow, let's fork the output from "Fetch GKG files from URL" and add a set of steps that allow us to capture and store this metadata in Elasticsearch. These are:

  1. Replace the flow content with our metadata model.
  2. Capture the metadata.
  3. Store directly in Elasticsearch.

Here's what this looks like in NiFi:

[Figure: Going with the flow]

Metadata model

The first step is to define our metadata model. There are many attributes we could consider, but let's select a set that helps tackle a few key points from earlier discussions. This will provide a good basis upon which further data can be added in the future, if required. So, let's keep it simple and use the following three attributes:

  • File size
  • Date ingested
  • File name

These will provide basic registration of received files.

Next, inside the NiFi flow, we'll need to replace the actual data content with this new metadata model. An easy way to do this is to create a JSON template file from our model. We'll save it to local disk and use it inside a FetchFile processor to replace the flow's content with this skeleton object. This template will look something like:

{ 
  "FileSize": SIZE, 
  "FileName": "FILENAME", 
  "IngestedDate": "DATE" 
} 

Note the use of placeholder names (SIZE, FILENAME, DATE) in place of the attribute values. These will be substituted, one by one, by a sequence of ReplaceText processors: each one matches a placeholder with a regular expression and swaps it for the appropriate flow attribute or NiFi Expression Language expression, for example DATE becomes ${now()}. A possible configuration is sketched below.
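
As a rough guide, the three ReplaceText processors could be configured along these lines. The gkg.filesize attribute is an assumption: because FetchFile replaces the flow content with the template, the original file size would need to be captured into an attribute (for example, with an UpdateAttribute processor) before the content swap:

ReplaceText #1:  Search Value = SIZE       Replacement Value = ${gkg.filesize}
ReplaceText #2:  Search Value = FILENAME   Replacement Value = ${filename}
ReplaceText #3:  Search Value = DATE       Replacement Value = ${now()}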

The last step is to output the new metadata payload to Elasticsearch. Once again, NiFi comes ready with a processor for this: the PutElasticsearch processor.

An example metadata entry in Elasticsearch:

{
   "_index": "gkg",
   "_type": "files",
   "_id": "AVZHCvGIV6x-JwdgvCzW",
   "_score": 1,
   "_source": {
      "FileSize": 11279827,
      "FileName": "20150218233000.gkg.csv.zip",
      "IngestedDate": "2016-08-01T17:43:00+01:00"
   }
}
Now that we have added the ability to collect and interrogate metadata, we have access to more statistics that can be used for analysis (a short Spark sketch follows the list below). These include:

  • Time-based analysis, for example, file sizes over time
  • Data loss, for example, are there holes in the timeline?
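
As an illustration, the metadata index can be pulled straight back into Spark with the elasticsearch-spark connector and aggregated by ingest date to reveal both trends and gaps. This sketch assumes the connector is on the classpath, Elasticsearch is running on localhost, and the index, type, and field names match the example above; the IngestedDate handling may need adjusting depending on the index mapping:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("RegistryAnalysis").getOrCreate()

// Read the metadata index via the elasticsearch-spark data source
val registry = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "localhost:9200")
  .load("gkg/files")

// Total bytes ingested per day; missing days suggest holes in the timeline
registry
  .withColumn("day", to_date(col("IngestedDate")))
  .groupBy("day")
  .agg(sum("FileSize").alias("bytes"), count(lit(1)).alias("files"))
  .orderBy("day")
  .show()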

If a particular analytic is required, the NiFi metadata component can be adjusted to provide the relevant data points. Indeed, an analytic could be built to look at historical data and update the index accordingly where the metadata does not yet exist.

Kibana dashboard

We have mentioned Kibana a number of times in this chapter. Now that we have an index of metadata in Elasticsearch, we can use the tool to visualize some analytics. The purpose of this brief section is to demonstrate that we can immediately start to model and visualize our data. To see Kibana used in a more complex scenario, have a look at Chapter 9, News Dictionary and Real-Time Tagging System. In this simple example, we have completed the following steps:

  1. Added the Elasticsearch index for our GDELT metadata to the Settings tab.
  2. Selected file size under the Discover tab.
  3. Selected Visualize for file size.
  4. Changed the Aggregation field to Range.
  5. Entered values for the ranges.

The resulting graph displays the file size distribution:

[Figure: Kibana dashboard]
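
Behind the scenes, the Range aggregation configured in Kibana corresponds to an Elasticsearch range aggregation over the FileSize field of the gkg index. A roughly equivalent query body, posted to the index's _search endpoint, might look like the following (the bucket boundaries here are illustrative, not the exact values used above):

{
  "size": 0,
  "aggs": {
    "file_size_ranges": {
      "range": {
        "field": "FileSize",
        "ranges": [
          { "to": 5000000 },
          { "from": 5000000, "to": 10000000 },
          { "from": 10000000 }
        ]
      }
    }
  }
}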

From here, we are free to create new visualizations or even a fully-featured dashboard that can be used to monitor the status of our file ingest. By increasing the variety of metadata written to Elasticsearch from NiFi, we can make more fields available in Kibana and even start our data science journey right here with some ingest-based actionable insights.

Now that we have a fully-functioning data pipeline delivering real-time feeds of data, how do we ensure the quality of the payloads we are receiving? Let's take a look at the options.
