© The Author(s), under exclusive license to APress Media, LLC, part of Springer Nature 2022
A. CarruthersBuilding the Snowflake Data Cloudhttps://doi.org/10.1007/978-1-4842-8593-0_8

8. Data Pipelines

Andrew Carruthers1  
(1)
Birmingham, UK
 

Most of our data ingestion into Snowflake is via one of two routes: flat file submission loaded via S3 or directly landed using ELT/ETL tooling . We do not address ELT/ETL tooling specifically in this chapter but do call out entry points where relevant, instead of focusing on the Snowflake components and interactions.

You should be familiar with data historization and Slowly Changing Dimensions (SCD) concepts from previous chapters. I reuse the same pattern here but also recognize not every feed requires SCD2. However, for historical reporting purposes, it is most common to want to report temporally and be able to accurately re-create reports for any point in time that we hold data.

Once more, we call out how we address data corrections (at the source only, not in our repository) to preserve our system integrity and not allow data changes in two separate systems.

Please ensure your account was created for AWS with Business Critical Edition. The examples here are dependent upon Business Critical Edition features. Otherwise, please re-register using a different email address by selecting the Business Critical option at https://signup.snowflake.com .

A conceptual end-to-end data flow is shown in Figure 8-1. You have seen most of these components in previous chapters, but they are now collated into a simple view to be enriched throughout this chapter.
Figure 8-1

Conceptual data flow

I have referenced multiple schemas in diagrams to reinforce the message to segregate data for security and best practice. However, each chapter has a page budget. This chapter was proposed with 20 pages. I am far in excess, so I have developed everything using the SYSADMIN role, which is not recommended, and leave it for you to segregate into the proper roles. I trust you will forgive me.

What Is a Data Pipeline?

Data may be sourced from a variety of tools and delivered from data streams, low volume but high-frequency micro-batches, or high volume but less frequent data dumps. Data arrives continuously, with ever-increasing volumes, higher velocity, and from an expanding variety of sources. Our customers increasingly expect up-to-date data as freshness is critical and underpins decision-making. We also see a greater desire to hold historical data over longer periods for trend analysis. Traditional batch-based processing is being replaced with continuous feeds. Transactional boundaries become auto-ingestion, performance bottlenecks removed by serverless compute models.

We use the term data pipeline to describe a suite of related actions and operations to move data from one or more sources through a suite of transformation processes into a target destination for storage and subsequent consumption. Please also refer to the definition at https://en.wikipedia.org/wiki/Pipeline_(computing) .

In large organizations, we would typically land data in the AWS account S3 bucket, then ingest it into Snowflake using the external stage capability demonstrated in Chapter 7. Adopting an external stage approach enforces security options not available via other means. Another option utilizes named stages, noting any connections to upload files must be directly into Snowflake, a challenge we do not face when external stages are used, rendering named stages a far less attractive option. We would also prefer not to land data into table stages and certainly not into user stages. We want to make life a little easier for our operational support team. Our approach, therefore, focuses on external stages for the most part.

We have executed commands to create and merge data for loading and unloading files with discrete steps for each transition. We would not sit at our Snowflake console manually processing data in a real-world scenario. Instead, we would deploy components to automatically load and transform our data to publication, ready for consumption. Moving away from manual processing, Figure 8-2 illustrates data pipeline options and introduces new concepts for real-time data streaming and batch files.
Figure 8-2

Data pipeline options

Batch Files

Based on a fixed schedule , batch files are typically produced by end-of-day processing from transactional source systems. Transactional systems may perform more than one operation on data throughout the working day. A single output file rarely captures all the individual operations carried out throughout the day but presents the most recent data state at the extract point. In other words, batch files ingested into Snowflake typically represent either a single end-of-day cut or perhaps several intra-day incremental cuts of data.

Source systems that feed into Snowflake are often constrained by both technology and design; therefore cannot easily make changes to facilitate data interchange. In an ideal world, data would be delivered in real time or as near real time as possible. Next, let’s talk about some tooling.

Real-Time Stream

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. more information is at https://kafka.apache.org .

The Snowflake Connector for Kafka reads data from one or more Apache Kafka topics and loads the data into a Snowflake table. More information is at https://docs.snowflake.com/en/user-guide/kafka-connector.html .

Search for MSK – Amazon Managed Streaming for Apache Kafka from your AWS account. A fully managed, highly available, and secure Apache Kafka service. Amazon MSK makes it easy for you to migrate, build, and run real-time streaming applications on Apache Kafka.

ELT/ETL tools can also be used to implement real-time streams and batch files into Snowflake, to trial products from the user interface, as illustrated in Figure 8-3, in the top right click Partner Connect, where several Snowflake partners are presented.
Figure 8-3

Partner Connect

Space does not permit a walk-through of every data pipeline option, and our focus returns to batch files . As we embark on a journey to develop our first data pipeline using Snowpipe, I must first introduce some new concepts.

Snowpipe

Snowpipe implements event-triggered data ingestion using AWS account components and built-in Snowflake capabilities. A full walk-through for AWS is found at https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto-s3.html .

This section relies upon the external stage built in Chapter 7. While Snowpipe can be configured to work with internal (named, table, and user) stages, the auto-ingest feature described next only works with external stages. We, therefore, focus on external stages for Snowpipe, and I leave the internal stage for you to investigate.

Note

Check the cost implications of using Snowpipe as the file ingestion and serverless compute charges soon mount up with many small files.

Figure 8-4 provides a Snowpipe overview showing internal and external stages. We use PUT from SnowSQL or file upload from the user interface into the internal stage but do not specify how data is landed in the S3 bucket for subsequent processing via the external stage.
Figure 8-4

Snowpipe overview

Snowflake Pipe object is for a single target table and optionally has a file format. Pipes execute using serverless compute, which is a Snowflake provisioned resource. More information at https://docs.snowflake.com/en/user-guide/admin-serverless-billing.html .

Inheriting the following code from Chapter 7, an IAM role/profile, storage integration, and a trust relationship are assumed to be established. We begin by re-creating our external stage.
USE ROLE      sysadmin;
USE DATABASE  TEST;
USE WAREHOUSE COMPUTE_WH;
USE SCHEMA    public;
CREATE OR REPLACE STAGE TEST.public.test_stage
STORAGE_INTEGRATION = test_integration
DIRECTORY           = ( ENABLE = TRUE AUTO_REFRESH = TRUE )
ENCRYPTION          = ( TYPE = 'SNOWFLAKE_SSE' )
URL                 = 's3://btsdc-test-bucket/'
FILE_FORMAT         = TEST.public.test_pipe_format;
Next, create a test staging table.
CREATE OR REPLACE TABLE pipe_load COPY GRANTS
(
id       NUMBER,
content  VARCHAR(255)
);

REST API

With our staging table created and the test file uploaded to S3, let’s create a pipe using the REST API . We must manually refresh the pipe to load data. In this example, we reference our external stage though the code is easy to modify to load from an internal stage. Please refer to Chapter 7 for configuration and data load details.
CREATE OR REPLACE PIPE test_pipe AS
COPY INTO pipe_load FROM @TEST.public.test_stage
FILE_FORMAT = (TYPE = CSV);
And prove the pipe exists.
SHOW PIPES;
You should see results similar to those in Figure 8-5.
Figure 8-5

SHOW PIPES output (part)

So far, so good. You have learned how to create a PIPE object, but our test file has not been loaded. For example, you can try to query the data.
SELECT * FROM pipe_load;
We should not see any rows returned. We can check the pipe is running with this command.
SELECT system$pipe_status('TEST.public.test_pipe');
Where the output should be as in Figure 8-6.
Figure 8-6

Check pipe status

With our pipe running, create a pipe_test.txt test file that contains the following.
id,content
1000,ABC
1001,DEF
Using the AWS Management Console , upload the pipe_test.txt test file into our btsdc-test-bucket S3 bucket. Note that your bucket name will differ. Do not forget to remove all other files, then prove the file has been uploaded.
LIST @TEST.public.test_stage;
To invoke the pipe, we must refresh. Note the response in Figure 8-7.
ALTER PIPE test_pipe REFRESH;
Figure 8-7

ALTER PIPE response

After a refresh, our pipe may take a few seconds to process the test file. If a huge test file was created, it might take longer. The following checks the progress.
SELECT *
FROM TABLE(validate_pipe_load(PIPE_NAME=>'TEST.public.test_pipe', START_TIME=> DATEADD(hours, -1, CURRENT_TIMESTAMP())));
We may see an error at this point for the run. The absence of data indicates all is well, but to check all loads most recent first, use this query.
SELECT *
FROM TABLE(information_schema.copy_history(TABLE_NAME=>'PIPE_LOAD', START_TIME=> DATEADD(hours, -1, CURRENT_TIMESTAMP())))
ORDER BY last_load_time DESC;
The result set in Figure 8-8 shows an error deliberately induced to illustrate what may be seen. Note the result set is only partial; further information is available in the user interface.
Figure 8-8

Snowpipe copy history

Finally, execute the following to clean up .
REMOVE @TEST.public.test_stage;

AUTO_INGEST

Having proven our pipe works with manual invocation, we turn our attention to automating Snowpipe by setting the AUTO_INGEST option . Figure 8-9 introduces a new AWS component, SQS enabling event notifications for our S3 bucket when new files are available to load. Snowflake documentation is at https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto-s3.html#option-1-creating-a-new-s3-event-notification-to-automate-snowpipe .
Figure 8-9

SQS integration

Set up Snowflake Objects

Create a test staging table for AUTO_INGEST.
CREATE OR REPLACE TABLE pipe_load_sqs COPY GRANTS
(
id       NUMBER,
content  VARCHAR(255)
);
Create a new pipe, test_pipe_sqs, with AUTO_INGEST = TRUE.
CREATE OR REPLACE PIPE test_pipe_sqs
AUTO_INGEST = TRUE
AS
COPY INTO pipe_load_sqs FROM @TEST.public.test_stage
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);

For example, redefining an existing pipe to change the file format retains the ARN; therefore, the SQS configuration does not need amending.

Check the pipe exists, taking note of the notification channel for the following configuration of AWS SQS.
SHOW PIPES;
SHOW PIPES LIKE 'test_pipe_sqs';
You should see the results in Figure 8-10.
Figure 8-10

Snowpipe notification channel

Prove that the pipe is running. Note the extended result set, which includes the notification channel.
SELECT system$pipe_status('TEST.public.test_pipe_sqs');

Configure AWS SQS

Please read the Snowflake documentation on configuring event notifications before commencing work on this section. There are some limitations, specifically, a single event notification for the entire S3 bucket. Documentation is at https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto-s3.html#step-4-configure-event-notifications .

Each Snowpipe requires SQS configuring. The following information has been abstracted. The Snowflake documentation did not match the AWS fields listed when writing this book.

In the following walk-through, replace btsdc-test-bucket with your bucket name, remembering that S3 bucket names are global in scope.

Log in to the AWS Management Console.
  1. 1.

    Select S3 ➤ btsdc-test-bucket ➤ Properties

     
  2. 2.

    Scroll down to Event notifications ➤ Create event notification

     
  3. 3.

    General configuration ➤ Event name ➤ Snowpipe SQS

     
  4. 4.
    Ensure these options are checked.
    1. a.

      General configuration ➤ Event types ➤ All object create events

       
    2. b.

      General configuration ➤ Destination ➤ SQS queue

       
    3. c.

      General configuration ➤ Destination ➤ Enter SQS queue ARN

       
     
  5. 5.

    Populate the SQS queue with your pipe notification channel, which should look like this: “arn:aws:sqs:eu-west-2:291942177718:sf-snowpipe-AIDAUH6I4DO3MBH6RKJF6-pZC_wzWEcC6ATt5tvZpjyg”.

     
  6. 6.

    Save the changes.

     

AWS documentation is at https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications.html .

Test AUTO_INGEST

Our new pipe, test_pipe_sqs, should be running .
SELECT system$pipe_status('TEST.public.test_pipe_sqs');
With our pipe running, create a pipe_test_sqs.txt test file that contains the following.
id,content
1000,ABC
1001,DEF
Using the AWS Management Console , upload the pipe_test_sqs.txt test file into our btsdc-test-bucket S3 bucket, noting your bucket name will differ, not forgetting to remove all other files to prove the file can be seen via the external stage.
LIST @TEST.public.test_stage;
Note

Snowpipe typically runs within a minute of file receipt into S3.

Identical to the steps performed for our REST API Snowpipe walk-through, we can reuse the same SQL statements to check for progress and historical activity and ensure our test file has loaded into the target table .
SELECT *
FROM TABLE(validate_pipe_load(PIPE_NAME=>'TEST.public.test_pipe_sqs', START_TIME=> DATEADD(hours, -1, CURRENT_TIMESTAMP())));
SELECT *
FROM TABLE(information_schema.copy_history(TABLE_NAME=>'PIPE_LOAD_SQS', START_TIME=> DATEADD(hours, -1, CURRENT_TIMESTAMP())))
ORDER BY last_load_time DESC;
SELECT * FROM pipe_load_sqs;

Cleanup

Apart from cleanup scripts to remove our test code, we should also be mindful to periodically remove redundant files uploaded to external stages. Remember, each file contributes toward AWS account storage costs.
DROP PIPE test_pipe;
DROP PIPE test_pipe_sqs;
REMOVE @TEST.public.test_stage;

Further Considerations

Temporal Data Load

Snowpipe provides a simple, quick, and convenient way to rapidly ingest files into Snowflake, ideal for many micro-batch and continuous data ingestion use cases where temporal data management/SCD 2 is not required but noting Snowpipe does not remove consumed files.

This section is not prescriptive and offers an alternative way to ingest data into Snowflake, building upon the external stage created earlier. There are other ways to load data, and your use cases may significantly differ. I hope you find enough information of value to adapt to your own scenarios.

Recent changes to the stage syntax (for external stages only) optionally add a directory table to the stage. We previously included this line in our external stage definition, calling out the AUTO_REFRESH option , which manages the contents of the directory table. Note that directory content update is not immediate and may take a while to refresh after file upload into S3.
DIRECTORY           = ( ENABLE = TRUE AUTO_REFRESH = TRUE )

We extensively use the directory table features in Chapter 9. But, for now, we use the new capability in a limited manner to explain a temporal data load pattern where file receipt order determines ingestion.

Figure 8-11 depicts how Snowflake components can be configured to ingest data, noting the serial approach, which may be adequate for some use cases and is presented to show the “art of the possible.”
Figure 8-11

Temporal data load schematic

Streams

Streams can only be created on tables, external tables, and stage directory tables. Streams perform change data capture and are incredibly useful.

But there is a catch, and it is very subtle.

Suppose a stream is declared on a staging table, where we remove the contents in preparation for our next data load. In that case, the stream captures the events as DELETE operations, so the next run processes the deleted records in addition to the newly loaded data resulting in incorrect results. For this reason, we cannot use a stream when promoting data from the staging table (stg_content_test) through to the application table (scd1_content_test). But instead, we must use a stored procedure. If we give some thought to our design, we can create a generic stored procedure re-usable by many feeds, a single piece of code satisfying many feed ingestions, demonstrated next.

A stream can also generate an event since this fragment is later used to trigger a task. We use this feature to detect the presence of files loaded into S3.
WHEN system$stream_has_data ( '<your_stream_here>' )

According to Snowflake documentation, the directory table is only updated for new or updated files and not deleted files; see https://docs.snowflake.com/en/sql-reference/sql/create-stage.html#directory-table-parameters-directorytableparams .

During the preparation of this chapter, a time delay was observed between loading a file into S3 and the directory auto-refreshing, but the LIST command returned correct results immediately. Similar to Snowpipe, SQS can be configured to auto-refresh S3buckets. At the time of writing, this is a public preview feature. Instructions are at https://docs.snowflake.com/en/user-guide/data-load-dirtables-auto-s3.html .

To ensure that the S3 contents and directory table remain synchronized (or for the impatient), it may be necessary to force a refresh.
ALTER STAGE TEST.public.test_stage REFRESH;
Figure 8-12 shows the response when a deleted file has been removed from the directory table.
Figure 8-12

Directory refresh

Caution

Streams becomes stale when its offset is outside the data retention period for its source table; see https://docs.snowflake.com/en/user-guide/streams.html .

But we are getting ahead of ourselves and must focus on setting up our core structures and test cases without confusing matters, so ignore streams for this next section. We return to streams later.

Build Core Objects

First, create the content_test_20220115_130442.txt test file with the following data. The date_timestamp file suffix is explained later.
id,content,last_updated
1000,ABC,2022-01-15 13:04:42
1001,DEF,2022-01-15 13:04:42
1002,GHI,2022-01-15 13:04:42
Next, create a second test file named content_test_20220115_133505.txt. Populate that second file with the following data.
id,content,last_updated
1000,ABX,2022-01-15 13:35:05
1001,DEF,2022-01-15 13:04:42
1003,JKL,2022-01-15 13:35:05
The following differences now exist between the first file and the second.
1000 ABC -> ABX therefore last_updated has changed from original record
1001 No change
1002 Missing
1003 Is a new record
Upload both files into the S3 bucket, and then examine the content of our external stage where we should see both files.
LIST @TEST.public.test_stage;
Create tables to stage our files and then for our application.
CREATE OR REPLACE TABLE stg_content_test
(
id            NUMBER,
content       VARCHAR(30),
last_updated  TIMESTAMP_NTZ DEFAULT current_timestamp()::TIMESTAMP_NTZ NOT NULL
);
CREATE OR REPLACE TABLE scd1_content_test
(
id            NUMBER,
content       VARCHAR(30),
last_updated  TIMESTAMP_NTZ DEFAULT current_timestamp()::TIMESTAMP_NTZ NOT NULL
);
And create a stream on scd1_content_test.
CREATE OR REPLACE STREAM strm_scd1_content_test ON TABLE scd1_content_test;

Set up Test Cases

Before continuing, we must explain some assumptions . When ingesting temporal data, it is typical for submitted files to be suffixed with a date_timestamp. The usual convention for date_timestamp is YYYYMMDD_HH24MISS, making sorting easy for fairly obvious reasons. We might reasonably expect files to contain either every record from the source system (whole file submission) or only the changes made since the last file was submitted (delta file submission).

Our data pipeline must be configured according to the expected pattern. Here, we demonstrate the whole file submission pattern. In this example, we expect the id attribute to be unique in each file and never reused.

We have a lot of work to do, except for the end-state. We also add three more attributes: valid_from is the date_timestamp when the record was received into the system; valid_to is the date_timestamp when the record details changed, or a very long future date_timestamp defaulted to 99991231_235959; and current_flag is a Yes/No flag indicating the current record as a quick lookup.

Figure 8-13 shows our test files created side by side, along with the expected outcome from run 1, where we process our first file, content _test_20220115_130442.txt. Next, an Action column describes the expected outcome from run 2 where we process our second file, content _test_20220115_133505.txt.
Figure 8-13

Test file loads, actions, and outcomes

In case you are wondering, this is an example of test-driven development, where we set out our test conditions , actions, and expected results before we begin developing our code. I strongly recommend this approach to software development. Chapter 13 explains how test cases validate code changes before deployment.

Although Figure 8-13 may be daunting, the content explains how SCD2 works.
  • New records are those with an id attribute that does not exist in the most recently processed data set output. We expect an INSERT operation with valid_from date to the timestamp when data was received into the system, valid_to set to the default value, and current_flag set to Y.

  • Absent records indicate a logical DELETE where valid_to is set to the timestamp of when the file was processed, current_flag set to N.

  • Changed records result in a logical DELETE for the old record and an INSERT for the new record.

Build the Temporal Pipeline

Having established the test cases, we move on to write a stored procedure to both ingest and historize our data using the components developed so far and later introduce a task to implement the auto-ingestion process.

Tasks are the built-in Snowflake scheduling component . A full explanation of tasks is beyond the scope of this chapter. The page budget is under pressure through explaining in depth how components interact and are dependent, which is good preparation for later. Snowflake provides a far more detailed appraisal of tasks at https://docs.snowflake.com/en/user-guide/tasks-intro.html .

Our objectives are as follows.
  • Identify each file in the external stage ordered by file name and date_timestamp, representing the timestamp recorded when the file was generated. We cannot rely upon the date_timestamp of when the file landed in S3 as this might result in files being processed out of sequence.

  • Copy each file’s content into the corresponding staging table. Ordinarily, we would hard-code the target table and filter our view, ensuring a 1:1 correlation between source and target; however, doing so would be of little value. Instead, we apply some logic to make our stored procedure more generic but at the cost of introducing an assumption. The file name and target table name are correlated.

  • For each file content loaded into the staging table, determine the difference between the staging table content and the latest view of data in the application table, then merge the difference .

Let’s unpack the most recent assumption a little more. Our example has two files containing full data sets but time separated: content_test_20220115_130442.txt and content_test_20220115_133505.txt. If we remove the leading underscore and date_timestamp suffix, we are left with content_test and this name, with a little imagination, becomes the target table.

Because we need to detect changes between our staging data and our latest held data, we must introduce new objects to enable automation . Figure 8-14 shows the addition of two stored procedures.
Figure 8-14

Application objects

Working backward, let’s start with building our generic stored procedure (sp_stg_to_scd1) to ingest from the staging table to the application table. We need the stored procedure to embed it in the main ingestion stored procedure interfacing with the external stage.
CREATE OR REPLACE PROCEDURE sp_stg_to_scd1( P_SOURCE_DATABASE      STRING,
                                            P_SOURCE_TABLE         STRING,
                                            P_SOURCE_ATTRIBUTE     STRING,
                                            P_TARGET_TABLE         STRING,
                                            P_MATCH_ATTRIBUTE      STRING ) RETURNS STRING
LANGUAGE javascript
EXECUTE AS CALLER
AS
$$
   var sql_stmt     = "";
   var stmt         = "";
   var recset       = "";
   var result       = "";
   var update_cols  = "";
   var debug_string = '';
   sql_stmt  = "INSERT INTO " + P_TARGET_TABLE + " "
   sql_stmt += "SELECT * ";
   sql_stmt += "FROM   " + P_SOURCE_TABLE     + " ";
   sql_stmt += "WHERE  " + P_SOURCE_ATTRIBUTE + " IN ";
   sql_stmt += "       ( ";
   sql_stmt += "       SELECT " + P_SOURCE_ATTRIBUTE + " ";
   sql_stmt += "       FROM   " + P_SOURCE_TABLE     + " ";
   sql_stmt += "       MINUS ";
   sql_stmt += "       SELECT " + P_SOURCE_ATTRIBUTE + " ";
   sql_stmt += "       FROM   " + P_TARGET_TABLE     + " ";
   sql_stmt += "       ); ";
   stmt = snowflake.createStatement ({ sqlText:sql_stmt });
   debug_string += sql_stmt;
   try
   {
       recset = stmt.execute();
   }
   catch { result = sql_stmt; }
   sql_stmt  = "SELECT column_name "
   sql_stmt += "FROM   " + P_SOURCE_DATABASE + ".information_schema.columns "
   sql_stmt += "WHERE  table_name = :1 "
   sql_stmt += "AND    column_name NOT IN ( :2, :3 ) "
   sql_stmt += "ORDER BY ordinal_position ASC; "
   stmt = snowflake.createStatement ({ sqlText:sql_stmt, binds:[ P_TARGET_TABLE, P_SOURCE_ATTRIBUTE, P_MATCH_ATTRIBUTE ] });
   sql_stmt      = sql_stmt.replace(":1", "'" + P_TARGET_TABLE     + "'");
   sql_stmt      = sql_stmt.replace(":2", "'" + P_SOURCE_ATTRIBUTE + "'");
   sql_stmt      = sql_stmt.replace(":3", "'" + P_MATCH_ATTRIBUTE  + "'");
   debug_string += sql_stmt;
   try
   {
       recset = stmt.execute();
       while(recset.next())
       {
          update_cols += "tgt." + recset.getColumnValue(1) + " = stg." + recset.getColumnValue(1) + ",        "
       }
       update_cols = update_cols.substring(0, update_cols.length -9)
   }
   catch { result = sql_stmt; }
   sql_stmt  = "UPDATE " + P_TARGET_TABLE + " tgt "
   sql_stmt += "SET    " + update_cols    + ", ";
   sql_stmt += "       tgt." + P_MATCH_ATTRIBUTE + " = stg." + P_MATCH_ATTRIBUTE +" ";
   sql_stmt += "FROM   " + P_SOURCE_TABLE + " stg ";
   sql_stmt += "WHERE  tgt." + P_SOURCE_ATTRIBUTE + "  = stg." + P_SOURCE_ATTRIBUTE +" ";
   sql_stmt += "AND    tgt." + P_MATCH_ATTRIBUTE  + " != stg." + P_MATCH_ATTRIBUTE +"; ";
   stmt = snowflake.createStatement ({ sqlText:sql_stmt });
   debug_string += sql_stmt;
   try
   {
       recset = stmt.execute();
   }
   catch { result = sql_stmt; }
   sql_stmt  = "DELETE FROM " + P_TARGET_TABLE     + " "
   sql_stmt += "WHERE  " + P_SOURCE_ATTRIBUTE + " NOT IN ";
   sql_stmt += "       ( ";
   sql_stmt += "       SELECT " + P_SOURCE_ATTRIBUTE + " ";
   sql_stmt += "       FROM   " + P_SOURCE_TABLE     + " ";
   sql_stmt += "       ); ";
   stmt = snowflake.createStatement ({ sqlText:sql_stmt });
   debug_string += sql_stmt;
   try
   {
       recset = stmt.execute();
   }
   catch { result = sql_stmt; }
   return debug_string;
//   return result;
$$;

Note the return value is the full list of generated SQL statements to assist with your debugging. Each SQL statement can be called independently.

Our test code provides a template to build our next stored procedure later in this chapter, but for now, we must prove sp_stg_to_scd1 works as expected, and the next code section provides the steps.

Load our content_test_20220115_130442.txt and content_test_20220115_133505.txt test files into S3, then load using the COPY command and stored procedure call and TRUNCATE to clear our staging table in between.
COPY INTO stg_content_test
FROM @TEST.public.test_stage/content_test_20220115_130442.txt
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
PURGE       = TRUE;
Load first staged file into application table.
CALL sp_stg_to_scd1('TEST', 'STG_CONTENT_TEST', 'ID', 'SCD1_CONTENT_TEST', 'LAST_UPDATED');
Clear out the staging table in preparation for the second file load.
TRUNCATE TABLE stg_content_test;
Move the second staged file into a staging table.
COPY INTO stg_content_test
FROM @TEST.public.test_stage/content_test_20220115_133505.txt
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
PURGE       = TRUE;
Merge second staged file into application table.
CALL sp_stg_to_scd1('TEST', 'STG_CONTENT_TEST', 'ID', 'SCD1_CONTENT_TEST', 'LAST_UPDATED');
Clear out the staging table in preparation for subsequent file load.
TRUNCATE TABLE stg_content_test;
Prove that the data is correct in table scd1_content_test, noting this is only an interim step. Additional processing has yet to be developed before we see the expected outcome shown in our test case.
SELECT * FROM scd1_content_test ORDER BY id ASC;
After both files have been run according to the instructions, the scd1_content_test table contains the current data view where only the latest records are available. For the curious, this is the Slowly Changing Dimension 1 pattern (SCD1); there is no history, only the latest view of data. Of note is the LAST_UPDATED attribute showing ID 1001 has the earliest date. Figure 8-15 shows the output from table scd1_content_test.
Figure 8-15

Application results : scd1_content_test table

Now that we have our staging to application stored procedure, we can build our main ingestion stored procedure interfacing with the external stage, then call sp_stg_to_scd1 to promote data, re-introduce our stream, which correctly captures deltas, and historize our data into SCD2 pattern.

Remember, our test files need uploading because the COPY command has PURGE = TRUE , meaning the files are removed from the STAGE after loading into the staging table.

The next step is to create our scd2_content_test table, where our historization will land. Note the decision attribute to assist in debugging and understanding; it is not essential for historization.
CREATE OR REPLACE TABLE scd2_content_test
(
id            NUMBER,
content       VARCHAR(30),
valid_from    TIMESTAMP_NTZ,
valid_to      TIMESTAMP_NTZ,
current_flag  VARCHAR(1),
decision      VARCHAR(100)
);
Next, let’s implement a view joining both historization table scd2_content_test and the stream.
CREATE OR REPLACE VIEW v_content_test
AS
SELECT decision,
       id,
       content,
       valid_from,
       valid_to,
       current_flag,
       'I' AS dml_type
FROM   (
       SELECT 'New Record - Insert or Existing Record - Ignore' AS decision,
              id,
              content,
              last_updated        AS valid_from,
              LAG ( last_updated ) OVER ( PARTITION BY id ORDER BY last_updated DESC ) AS valid_to_raw,
              CASE
                 WHEN valid_to_raw IS NULL
                    THEN '9999-12-31'::TIMESTAMP_NTZ
                    ELSE valid_to_raw
              END AS valid_to,
              CASE
                 WHEN valid_to_raw IS NULL
                    THEN 'Y'
                    ELSE 'N'
              END AS current_flag,
              'I' AS dml_type
       FROM   (
              SELECT strm.id,
                     strm.content,
                     strm.last_updated
              FROM   strm_scd1_content_test  strm
              WHERE  strm.metadata$action   = 'INSERT'
              AND    strm.metadata$isupdate = 'FALSE'
              )
       )
UNION ALL
SELECT decision,
       id,
       content,
       valid_from,
       valid_to,
       current_flag,
       dml_type
FROM   (
       SELECT decision,
              id,
              content,
              valid_from,
              LAG ( valid_from ) OVER ( PARTITION BY id ORDER BY valid_from DESC ) AS valid_to_raw,
              valid_to,
              current_flag,
              dml_type
       FROM   (
              SELECT 'Existing Record - Insert' AS decision,
                     strm.id,
                     strm.content,
                     strm.last_updated            AS valid_from,
                     '9999-12-31'::TIMESTAMP_NTZ  AS valid_to,
                     'Y'                          AS current_flag,
                     'I' AS dml_type
              FROM   strm_scd1_content_test strm
              WHERE  strm.metadata$action   = 'INSERT'
              AND    strm.metadata$isupdate = 'TRUE'
              UNION ALL
              SELECT 'Existing Record - Delete',
                     tgt.id,
                     tgt.content,
                     tgt.valid_from,
                     current_timestamp(),
                     'N',
                     'D' AS dml_type
              FROM   scd2_content_test tgt
              WHERE  tgt.id IN
                     (
                     SELECT DISTINCT strm.id
                     FROM   strm_scd1_content_test strm
                     WHERE  strm.metadata$action   = 'INSERT'
                     AND    strm.metadata$isupdate = 'TRUE'
                     )
              AND    tgt.current_flag = 'Y'
              )
       )
UNION ALL
SELECT 'Missing Record - Delete',
       strm.id,
       strm.content,
       tgt.valid_from,
       current_timestamp()::TIMESTAMP_NTZ AS valid_to,
       NULL,
       'D' AS dml_type
FROM   scd2_content_test          tgt
INNER JOIN strm_scd1_content_test strm
   ON  tgt.id   = strm.id
WHERE  strm.metadata$action   = 'DELETE'
AND    strm.metadata$isupdate = 'FALSE'
AND    tgt.current_flag       = 'Y';

Test the Temporal Pipeline

As with all successful testing, we must ensure our baseline is known, and the cleanest way is to start from the top . Do not re-create storage integration or the trust relationship breaks; re-creating the stage is fine.
CREATE OR REPLACE STAGE TEST.public.test_stage
STORAGE_INTEGRATION = test_integration
DIRECTORY           = ( ENABLE = TRUE AUTO_REFRESH = TRUE )
ENCRYPTION          = ( TYPE = 'SNOWFLAKE_SSE' )
URL                 = 's3://btsdc-test-bucket/'
FILE_FORMAT         = TEST.public.test_pipe_format;
CREATE OR REPLACE STREAM strm_test_stage ON STAGE TEST.public.test_stage;
Upload files into S3 then refresh the stage directory.
ALTER STAGE TEST.public.test_stage REFRESH;
Clear out tables.
TRUNCATE TABLE stg_content_test;
TRUNCATE TABLE scd1_content_test;
CREATE OR REPLACE STREAM strm_scd1_content_test ON TABLE scd1_content_test;
TRUNCATE TABLE scd2_content_test;
Copy the first test file into the staging table.
COPY INTO stg_content_test
FROM @TEST.public.test_stage/content_test_20220115_130442.txt
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
PURGE       = TRUE;
Promote to SCD1 table.
CALL sp_stg_to_scd1('TEST', 'STG_CONTENT_TEST', 'ID', 'SCD1_CONTENT_TEST', 'LAST_UPDATED');
Clear out staging table ready for next run.
TRUNCATE TABLE stg_content_test;
Merge SCD1 table into SCD2 table using v_content_test contents.
MERGE INTO scd2_content_test tgt
USING v_content_test strm
ON    tgt.id         = strm.id
AND   tgt.valid_from = strm.valid_from
AND   tgt.content    = strm.content
WHEN MATCHED AND strm.dml_type = 'U' THEN
UPDATE SET tgt.valid_to        = strm.valid_to,
           tgt.current_flag    = 'N',
           tgt.decision        = strm.decision
WHEN MATCHED AND strm.dml_type = 'D' THEN
UPDATE SET tgt.valid_to        = strm.valid_to,
           tgt.current_flag    = 'N',
           tgt.decision        = strm.decision
WHEN NOT MATCHED AND strm.dml_type = 'I' THEN
INSERT
(
tgt.id,
tgt.content,
tgt.valid_from,
tgt.valid_to,
tgt.current_flag,
tgt.decision
) VALUES (
strm.id,
strm.content,
current_timestamp(),
strm.valid_to,
strm.current_flag,
strm.decision
);
Copy the second test file into the staging table.
COPY INTO stg_content_test
FROM @TEST.public.test_stage/content_test_20220115_133505.txt
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
PURGE       = TRUE;
Promote to the SCD1 table.
CALL sp_stg_to_scd1('TEST', 'STG_CONTENT_TEST', 'ID', 'SCD1_CONTENT_TEST', 'LAST_UPDATED');
Clear out the staging table to be ready for the next run.
TRUNCATE TABLE stg_content_test;
Merge SCD1 table into SCD2 table using v_content_test contents.
MERGE INTO scd2_content_test tgt
USING v_content_test strm
ON    tgt.id         = strm.id
AND   tgt.valid_from = strm.valid_from
AND   tgt.content    = strm.content
WHEN MATCHED AND strm.dml_type = 'U' THEN
UPDATE SET tgt.valid_to        = strm.valid_to,
           tgt.current_flag    = 'N',
           tgt.decision        = strm.decision
WHEN MATCHED AND strm.dml_type = 'D' THEN
UPDATE SET tgt.valid_to        = strm.valid_to,
           tgt.current_flag    = 'N',
           tgt.decision        = strm.decision
WHEN NOT MATCHED AND strm.dml_type = 'I' THEN
INSERT
(
tgt.id,
tgt.content,
tgt.valid_from,
tgt.valid_to,
tgt.current_flag,
tgt.decision
) VALUES (
strm.id,
strm.content,
current_timestamp(),
strm.valid_to,
strm.current_flag,
strm.decision
);
Now check the SCD2 table contents match the expected results.
SELECT * FROM scd2_content_test ORDER BY id ASC;

That’s a lot of code and testing. I have added information into v_content_test and scd2_content_test, which I hope you find useful in explaining how the delta records are derived. Please also see the accompanying script with additional queries to assist in debugging.

But we are not finished yet. We still have to build our wrapper stored procedure to knit everything together and automate using a task.

Automate the Temporal Pipeline

The wrapper stored procedure that brings everything together follows. I leave it for you to review and understand as this chapter is far too long already. Note the hard-coding in to fix the database name and match column. This is intentionally left for you to figure out a way of dynamically generating, but I would add parameters to sp_load_test_data and then replace them in the CALL statement. Note the parameters MUST be in UPPERCASE when referenced. Likewise, the MERGE statement attributes need further consideration where code generation and lookup tables are both valid approaches.
CREATE OR REPLACE PROCEDURE sp_load_test_data() RETURNS STRING
LANGUAGE javascript
EXECUTE AS CALLER
AS
$$
   var sql_stmt  = "";
   var stmt      = "";
   var recset    = "";
   var result    = "";
   var debug_string    = '';
   var path_to_file    = "";
   var table_name      = "";
   sql_stmt  = "SELECT path_to_file, "
   sql_stmt += "       table_name "
   sql_stmt += "FROM   v_strm_test_stage "
   sql_stmt += "WHERE  metadata$action = 'INSERT' "
   sql_stmt += "ORDER BY path_to_file ASC; ";
   stmt = snowflake.createStatement ({ sqlText:sql_stmt });
   debug_string = sql_stmt;
   try
   {
       recset = stmt.execute();
       while(recset.next())
       {
           path_to_file    = recset.getColumnValue(1);
           table_name      = recset.getColumnValue(2);
           sql_stmt  = "COPY INTO stg_" + table_name + " "
           sql_stmt += "FROM " + path_to_file +" "
           sql_stmt += "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1) "
           sql_stmt += "PURGE       = TRUE; ";
           debug_string = debug_string + sql_stmt;
           stmt = snowflake.createStatement ({ sqlText:sql_stmt });
           try
           {
              stmt.execute();
              result = "Success";
           }
           catch { result = sql_stmt; }
           sql_stmt  = "CALL sp_stg_to_scd1('TEST', 'STG_" + table_name + "', 'ID', 'SCD1_" + table_name + "', 'LAST_UPDATED'); ";
           debug_string = debug_string + sql_stmt;
           stmt = snowflake.createStatement ({ sqlText:sql_stmt });
           try
           {
              stmt.execute();
              result = "Success";
           }
           catch { result = sql_stmt; }
           sql_stmt  = "MERGE INTO scd2_" + table_name + " tgt "
           sql_stmt += "USING v_" + table_name + " strm "
           sql_stmt += "ON    tgt.id         = strm.id "
           sql_stmt += "AND   tgt.valid_from = strm.valid_from "
           sql_stmt += "AND   tgt.content    = strm.content "
           sql_stmt += "WHEN MATCHED AND strm.dml_type = 'U' THEN "
           sql_stmt += "UPDATE SET tgt.valid_to        = strm.valid_to, "
           sql_stmt += "           tgt.current_flag    = 'N', "
           sql_stmt += "           tgt.decision        = strm.decision "
           sql_stmt += "WHEN MATCHED AND strm.dml_type = 'D' THEN "
           sql_stmt += "UPDATE SET tgt.valid_to        = strm.valid_to, "
           sql_stmt += "           tgt.current_flag    = 'N', "
           sql_stmt += "           tgt.decision        = strm.decision "
           sql_stmt += "WHEN NOT MATCHED AND strm.dml_type = 'I' THEN "
           sql_stmt += "INSERT "
           sql_stmt += "( "
           sql_stmt += "tgt.id, "
           sql_stmt += "tgt.content, "
           sql_stmt += "tgt.valid_from, "
           sql_stmt += "tgt.valid_to, "
           sql_stmt += "tgt.current_flag, "
           sql_stmt += "tgt.decision "
           sql_stmt += ") VALUES ( "
           sql_stmt += "strm.id, "
           sql_stmt += "strm.content, "
           sql_stmt += "current_timestamp(), "
           sql_stmt += "strm.valid_to, "
           sql_stmt += "strm.current_flag, "
           sql_stmt += "strm.decision "
           sql_stmt += "); ";
           debug_string = debug_string + sql_stmt;
           stmt = snowflake.createStatement ({ sqlText:sql_stmt });
           try
           {
              stmt.execute();
              result = "Success";
           }
           catch { result = sql_stmt; }
           sql_stmt  = "TRUNCATE TABLE stg_" + table_name + "; ";
           debug_string = debug_string + sql_stmt;
           stmt = snowflake.createStatement ({ sqlText:sql_stmt });
           try
           {
              stmt.execute();
              result = "Success";
           }
           catch { result = sql_stmt; }
       }
   }
   catch { result = sql_stmt; }
   return debug_string;
//   return result;
$$;
The v_strm_test_stage stage directory view also needs further consideration because the path_to_file is hard-coded.
CREATE OR REPLACE VIEW v_strm_test_stage COPY GRANTS
AS
SELECT '@TEST.public.test_stage/'||relative_path                              AS path_to_file,
       SUBSTR ( relative_path, 1, REGEXP_INSTR ( relative_path, '_20' ) - 1 ) AS table_name,
       size,
       last_modified,
       metadata$action
FROM   strm_test_stage;
After repeating our test setup, run sp_load_test_data.
CALL sp_load_test_data();
Note

Repeatedly loading the same test file results in the SCD2 table containing multiple records. This is due to the content not changing. It is expected, if unwelcome, behavior.

Finally, we create the task to call the wrapper stored procedure, noting the use of system$stream_has_data, which detects the presence of new data in our stage directory table. If no new files exist, execution is ignored, and the task does not run.
CREATE OR REPLACE TASK task_load_test_data
WAREHOUSE = COMPUTE_WH
SCHEDULE  = '1 minute'
WHEN system$stream_has_data ( 'strm_test_stage' )
AS
CALL sp_load_test_data();

Before we can execute tasks, our role must be entitled, and we must use the ACCOUNTADMIN role.

We normally create a separate role to execute tasks, but space does not permit full production configuration, so more thought should be given to your setup. For expediency, we GRANT TO SYSADMIN, but this does not represent best practice.
USE ROLE accountadmin;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE sysadmin;
Switch back to SYSADMIN .
USE ROLE sysadmin;
On creation and by default, tasks are suspended to enable.
ALTER TASK task_load_test_data RESUME;
To see which tasks have been declared.
SHOW tasks;
And to check when a task was last run.
SELECT timestampdiff ( second, current_timestamp, scheduled_time ) as next_run,
       scheduled_time,
       current_timestamp,
       name,
       state
FROM   TABLE ( information_schema.task_history())
WHERE  state = 'SCHEDULED'
ORDER BY completed_time DESC;
Finally, to suspend a task.
ALTER TASK task_load_test_data SUSPEND;
Note

Tasks can quickly consume credits, ensuring only essential tasks are left running. Also consider adding resource monitors.

Cleanup

By now, you know how to clean up most objects, but to drop a JavaScript stored procedure, the declared prototype must match.
DROP PROCEDURE sp_stg_to_app( VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR );

External Tables

If working through this chapter sequentially, ensure the task is suspended as this interferes with this external table section as we reuse one of the test files.
ALTER TASK task_load_test_data SUSPEND;

We might also consider external tables as an ingestion pattern. They are easy to configure and use. Documentation is at https://docs.snowflake.com/en/sql-reference/sql/create-external-table.html .

After suspending our task, upload files to S3, refresh, and check contents are visible.
ALTER STAGE TEST.public.test_stage REFRESH;
LIST @TEST.public.test_stage;
SELECT $1 FROM @TEST.public.test_stage/content_test_20220115_130442.txt;
Reusing our external stage and the first test file, the syntax for mapping a file to an external table is as follows.
CREATE OR REPLACE EXTERNAL TABLE ext_content_test_20220115_130442
(
id           VARCHAR AS (value:c1::varchar),
content      VARCHAR AS (value:c2::varchar),
last_updated VARCHAR AS (value:c3::varchar)
)
WITH LOCATION = @TEST.public.test_stage/
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
PATTERN     = content_test_20220115_130442.txt;

Parquet, Avro, and ORC file formats metadata can be inferred. The documentation is at https://docs.snowflake.com/en/sql-reference/functions/infer_schema.html .

In line with Snowpipe auto-ingest, external tables can be refreshed using SQS. Documentation is at https://docs.snowflake.com/en/user-guide/tables-external-s3.html . Otherwise, refresh the external table.
ALTER EXTERNAL TABLE ext_content_test_20220115_130442 REFRESH;
To see which external tables are declared.
SHOW EXTERNAL TABLES;
Display content and associated file information.
SELECT $1, metadata$filename FROM @TEST.public.test_stage/;
Finally, the following extracts the content from our external table.
SELECT * FROM ext_content_test_20220115_130442;

This simple example can easily be extended to several files of the same format.

I prefer not to use external tables due to additional management overhead in declaring and maintaining, but I offer this example as another ingestion pattern.

Cleanup

Apart from cleanup scripts to remove out test code, we should also be mindful to periodically remove redundant files uploaded to internal stages remembering each file contributes toward storage costs.
USE ROLE      sysadmin;
USE DATABASE  TEST;
USE SCHEMA    public;
DROP TASK     task_load_test_data;
DROP DATABASE test;
USE ROLE     accountadmin;
DROP STORAGE INTEGRATION test_integration;
REVOKE EXECUTE TASK ON ACCOUNT FROM ROLE sysadmin;

Summary

We began this chapter by explaining data pipelines and illustrating the difference between batch and real-time data streams . We then dived into Snowpipe, Snowflake’s out-of-the-box data ingestion tool showcasing both modes of operation: manual invocation via REST API and automated invocation via AUTO_INGEST.

Recognizing Snowpipe does not process files in receipt order, we investigated an alternative approach. Temporal data load guarantees files are loaded in order. Introducing streams, tasks, and stored procedures by providing a worked example with the expectation the code samples provide a platform for experimentation and enhancement. Note logging and correct homing of components are left for you to develop. The diagrams show a correct physical separation of components in a target state for production deployment rather than the code walk-through, which is delivered using the SYSADMIN role for expediency (and page count).

Personally, this has been the hardest chapter to write by far. Technically complex, demanding, and over the page budget. But I want to give you a decent starting point. I hope you agree.

We are halfway there and now focus on extending data pipelines through data validation, enrichment, and presentation.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.195.111