© Dmitry Anoshin, Dmitry Shirokov, Donna Strok 2020
D. Anoshin et al., Jumpstart Snowflake, https://doi.org/10.1007/978-1-4842-5328-1_6

6. Continuous Data Loading with Snowpipe

Dmitry Anoshin1, Dmitry Shirokov2, and Donna Strok3
(1) British Columbia, Canada
(2) Burnaby, BC, Canada
(3) Seattle, WA, USA
 

“You and I are streaming data engines.”

— Jeff Hawkins, in a 2012 interview with Knowstuff1

If you’re a data analyst, a data scientist, or on an executive team, you know the value of access to continuous, timely data. Whenever you query, transform, or otherwise access data, you want to know that it represents the most up-to-date information available for analysis.

If you work from stale data, you might draw inaccurate conclusions or produce skewed statistics, leading to misinformed strategic decisions that can affect your company down the line. Access to continuous data benefits anyone, regardless of role.

Nowadays, data is generated much faster than it used to be. In the past, corporate data was updated infrequently (daily, weekly, or even monthly) and then added to the data warehouse. Data accumulated over time, which made it increasingly challenging to process.

Now we have application data, mobile data, and sensor data generating a constant flow of useful analytical information, but getting it into a data warehouse is a challenge because it is produced so quickly. Multitudes of tiny files are generated, and that can lead to problems.

Let’s look at the traditional way of dealing with loading data into a data warehouse. Figure 6-1 shows data that’s being generated continuously, loaded into a staging environment like S3, and then batched daily or hourly into your database.
../images/482416_1_En_6_Chapter/482416_1_En_6_Fig1_HTML.jpg
Figure 6-1

Classical approach to loading into a data warehouse

Unfortunately, this methodology allows data to be loaded only daily, hourly, or at best half-hourly, so it does not provide fast access to newly generated data. Users often ask to analyze data as quickly as it comes in so they can make important decisions based on the latest results.

If you decide to implement a continuous loading system, you’re probably aware of the COPY command, which was designed for batch-loading scenarios: after data accumulates for some time, such as hours or days, you launch a COPY command to load it into your target table in Snowflake.

Note

The COPY command is a SQL command for loading staged files into a Snowflake table. It supports various options and file formats. Please see the Snowflake documentation.2
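For illustration, a minimal batch COPY might look like the following sketch; the table, stage, and file format settings here are hypothetical.

-- a hypothetical batch load: CSV files from an external stage into a table
copy into my_db.public.sales
  from @my_db.public.my_s3_stage/sales/
  file_format = (type = 'CSV' field_delimiter = ',' skip_header = 1)
  on_error = 'CONTINUE';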

As a workaround for near real-time needs, you can leverage a micro-batching approach with the COPY command, running a COPY statement on a schedule every few minutes. However, this is still not a fully continuous load: fresh data arriving in the stage does not trigger its own loading; a human or a scheduler has to drive it.
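As a rough sketch of that micro-batching pattern, the scheduled COPY could be driven by an external scheduler or, within Snowflake, by a task; the warehouse, database, and object names below are hypothetical.

-- a hypothetical micro-batch: run the COPY every five minutes with a Snowflake task
create or replace task load_events_every_5_min
  warehouse = load_wh
  schedule = '5 MINUTE'
as
  copy into my_db.public.events
    from @my_db.public.my_s3_stage/events/
    file_format = (type = 'JSON');

-- tasks are created in a suspended state; resume the task to start the schedule
alter task load_events_every_5_min resume;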

If you have data that’s being generated continuously, you might think that it’d be great if there were an affordable, lightweight way to keep your data up-to-date in Snowflake. Luckily, Snowflake agrees with you and created a service called Snowpipe. Snowpipe is an autoscaling Snowflake cloud service that continuously loads data into the Snowflake data warehouse from internal and external stages.

With a continuous loading approach like Snowpipe, loading becomes data-driven: new files arriving in your stage are loaded into your target table in Snowflake as they appear.

Table 6-1 describes the data warehouse loading approaches.
Table 6-1

Data Warehouse Loading Approaches

Approach: Batch
Definition: Data accumulates over time (daily, hourly) and is then loaded periodically.
Snowpipe Option: Point Snowpipe at an S3 bucket and a destination table in your warehouse, and new data is uploaded automatically.

Approach: Microbatch
Definition: Data accumulates over small time windows (minutes) and is then loaded.
Snowpipe Option: A technical resource can interface directly with the REST API3 along with the Java and Python SDKs to enable highly customized loading use cases.

Approach: Continuous (near real time)
Definition: Every data item is loaded individually as it arrives, in near real time.
Snowpipe Option: Integration with Apache Kafka4 is also available via a Kafka connector.5

With Snowpipe you have two options. The first option is to use Snowpipe with an AWS S3 bucket: you define event notifications on your S3 bucket and have these notifications sent to Snowflake as soon as new files land in the bucket. Those files are automatically picked up by Snowpipe and loaded into your target tables.

The second option is to build your own integration with Snowpipe using the REST API. You can create your own applications that call the Snowpipe loader according to your criteria. Table 6-2 summarizes the key benefits of the Snowpipe service.
Table 6-2

Key Snowpipe Benefits

Benefit: Continuous loading, immediate insight.
Description: Continuously generated data is available for analysis in seconds.

Benefit: Avoid repeated manual COPY commands; high availability for building custom integrations.
Description: No manual effort is required to load data into Snowflake: loading is automated, with no need for manual COPY commands. Using the REST API and SDKs, you can build your own data pipeline system.

Benefit: Full support for semistructured data on load.
Description: Many industry-standard formats, such as XML, JSON, Parquet, ORC, and Avro, are supported. No transformation is needed to load varying data types, and there’s no trade-off between flexibility and performance.

Benefit: You pay only for the compute time you use to load data.
Description: The “pay only for what you use” pricing model means idle time is not charged for. Snowflake’s built-for-the-cloud solution scales storage separately from compute, up and down, transparently, and automatically. This requires a clear understanding of the cost of loading data: “loading data” appears as a separate expense item on your Snowflake bill, under a serverless, utilization-based billing model.

Benefit: Zero management.
Description: No indexing, tuning, partitioning, or vacuuming on load.

Benefit: Serverless.
Description: Serverless loading without contention: there are no servers to manage and no impact on other workloads, thanks to unlimited concurrency.

Loading Data Continuously

Let’s take a closer look at some options for loading data.
  • Snowpipe Auto-Ingest

  • Snowpipe REST API using AWS Lambda

Snowpipe Auto-Ingest

Snowpipe Auto-Ingest is a fully automatic mode that loads data from cloud storage (such as an S3 bucket) into the target table. Because the setup consists of a few DDL statements, any data engineer, or even an analyst, can configure an automatic continuous data loading process in minutes.

Caution

The auto_ingest option is not available unless it is explicitly enabled on your Snowflake account. Please contact Snowflake Support6 to have the option enabled on your account.

Figure 6-2 shows the main components of how this integration works.

The data source provides continuous data feeds into services like AWS Kinesis,7 AWS Managed Streaming for Kafka (MSK),8 and hosted Apache Kafka.9 These services deliver your files into an external stage (e.g., an S3 bucket). As soon as a new file arrives in the bucket, S3 sends a notification to Snowpipe via an SQS queue,10 and on receiving it, Snowpipe runs a serverless loader application behind the scenes that loads the file from S3 into the target tables.
../images/482416_1_En_6_Chapter/482416_1_En_6_Fig2_HTML.jpg
Figure 6-2

Snowflake continuous data loading approach using Snowpipe with Auto-Ingest

Building a Data Pipeline Using the Snowpipe Auto-Ingest Option

To build an example of a continuously loaded data pipeline, we need the following components:
  • Stream producer: a sample producer for Kinesis Data Firehose. For simplicity, instead of a producer based on the Lambda service, we can just use the Firehose test generator, which is available when we create a Firehose stream.

  • Kinesis Data Firehose as the stream delivery service.

  • An S3 bucket as an external Snowflake stage.

  • The following Snowflake services:
    a. Snowpipe
    b. Snowflake data warehouse
    c. Snowflake console
Figure 6-3 shows an overview of the component interaction.
../images/482416_1_En_6_Chapter/482416_1_En_6_Fig3_HTML.jpg
Figure 6-3

Snowpipe data loading using Auto-Ingest mode

To understand how internal integration actually takes place, we need to dive a little bit into the internal structure of Snowpipe. Figure 6-4 shows the main steps of integration.
../images/482416_1_En_6_Chapter/482416_1_En_6_Fig4_HTML.jpg
Figure 6-4

Snowpipe data loading using Auto-Ingest mode, Snowpipe detail view

First, we create an external stage and a pipe using the auto_ingest option. When we execute the DDL, we get back a unique identifier for an internal queue service (with AWS, it is based on SQS) that is already linked to the Snowpipe serverless loader.

Second, we create a new S3 bucket and configure an S3 event notification that sends events to the Snowpipe SQS queue. When the Snowpipe loader receives an event about a new file in the S3 bucket, it queues the pipe’s COPY statement for execution. Snowflake’s compute service scales fully and automatically while executing the statements from the pipe queue. The last step is to create and configure a stream producer that continuously generates events.
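As a small sketch of that handshake, the queue identifier can be read back from the pipe metadata after the DDL runs, and files staged before notifications were configured can be queued manually; the pipe name matches the exercise below.

-- the notification_channel column/property holds the SQS ARN used for the S3 event configuration
show pipes like 'snowpipe' in schema snowpipe.public;
desc pipe snowpipe.public.snowpipe;

-- optionally queue files that already sit in the stage (they are not picked up retroactively)
alter pipe snowpipe.public.snowpipe refresh;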

Caution

You cannot control transaction boundaries for load with Snowpipe.

Exercise 1. Building a Data Pipeline Using the Snowpipe Auto-Ingest Option

In this exercise, we will build the pipeline shown in Figure 6-3. Specifically, the following instructions show the process of creating a continuous data pipeline for Snowflake using Snowpipe:
  1. Log into your Snowflake account and choose Worksheet.

  2. Create a Snowflake external stage based on an S3 bucket.

     Replace <your_AWS_KEY_ID> with your AWS credentials, and replace <your_s3_bucket> with your S3 bucket URL.

     Run the DDL statements on the worksheet, as shown in Listing 6-1.
-- create a new database for testing Snowpipe
create database snowpipe data_retention_time_in_days = 1;

show databases like 'snow%';

-- create a new external stage
create or replace stage snowpipe.public.snowstage
  url='s3://<your_s3_bucket>'
  credentials=(
    AWS_KEY_ID='<your_AWS_KEY_ID>',
    AWS_SECRET_KEY='<your_AWS_SECRET_KEY>');

-- create the target table for Snowpipe
create or replace table snowpipe.public.snowtable(
    jsontext variant
);

-- create a new pipe
create or replace pipe snowpipe.public.snowpipe
    auto_ingest=true as
        copy into snowpipe.public.snowtable
        from @snowpipe.public.snowstage
        file_format = (type = 'JSON');
Listing 6-1

Creating External Stages

Note

Variant is the universal semistructured data type in Snowflake for loading data in formats such as JSON,11 Avro,12 ORC,13 Parquet,14 or XML.15 For more information, refer to the references given.16

In the first part of Listing 6-1, we create a new external stage17 called snowpipe.public.snowstage based on an S3 bucket, providing the S3 bucket URL and the credentials.18 Additionally, you can set encryption options.19

The next step is to define a target table called snowpipe.public.snowtable for the data that we want to load continuously. The table takes a variant column as input for the JSON data.

The last part of the script is a definition of a new pipe called snowpipe.public.snowpipe. You can see the pipe is set to auto_ingest=true, which means that we are using notifications from S3 into SQS to notify Snowflake about newly arrived data that is ready to load. Also, you can see that the pipe wraps a familiar COPY statement that defines the transformations and the data loading operations that we want to perform on the data as it becomes available.
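Once rows begin to arrive, the JSON can be queried directly from the variant column with Snowflake’s path syntax. Here is a hedged sketch, assuming the stock-ticker-style demo records produced by the Firehose test generator; the field names are illustrative and case-sensitive, so adjust them to your actual payload.

-- hypothetical query over the variant column; field names depend on your JSON payload
select
    jsontext:TICKER_SYMBOL::string as ticker_symbol,
    jsontext:SECTOR::string        as sector,
    jsontext:PRICE::float          as price
from snowpipe.public.snowtable
limit 10;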

  3. Check the correctness of the configuration using the following commands. With SHOW statements, you can see the status of your pipes and stages.

     -- check the existing pipes and stages
     show pipes;
     show stages;
     
  4. Copy the SQS ARN from the notification_channel field.

     
  5. Using a simple select statement, we can check the count of loaded rows.

     -- check the count of rows in the target table
     select count(*) from snowpipe.public.snowtable;
     
  6. Log into your AWS account.

     
  7. Create an AWS S3 bucket called snowpipebucket, as shown in Figure 6-5.
    ../images/482416_1_En_6_Chapter/482416_1_En_6_Fig5_HTML.jpg
    Figure 6-5

    Creating a new bucket for stream events

     
  8. Set up S3 event notifications for Snowpipe using the path S3 ➤ snowpipebucket ➤ Properties ➤ Advanced settings ➤ Events, as shown in Figure 6-6.
    ../images/482416_1_En_6_Chapter/482416_1_En_6_Fig6_HTML.jpg
    Figure 6-6

    Setting S3 bucket notifications via SQS

     
  9. Create a new Kinesis Data Firehose stream using the path Amazon Kinesis ➤ Data Firehose ➤ Create Delivery Stream.

    You can see what that looks like in Figure 6-7.
    ../images/482416_1_En_6_Chapter/482416_1_En_6_Fig7_HTML.jpg
    Figure 6-7

    Creating a new Kinesis Firehose delivery stream

     
  10. Set the source to Direct PUT, as shown in Figure 6-8.
    ../images/482416_1_En_6_Chapter/482416_1_En_6_Fig8_HTML.jpg
    Figure 6-8

Setting the source type to Direct PUT

     
  11. Choose your S3 bucket as the destination, as shown in Figure 6-9.
    ../images/482416_1_En_6_Chapter/482416_1_En_6_Fig9_HTML.jpg
    Figure 6-9

    Configuration of Firehose, setting up S3 bucket as destination

     
  12. Enable logging using the CloudWatch service, as shown in Figure 6-10.
    ../images/482416_1_En_6_Chapter/482416_1_En_6_Fig10_HTML.jpg
    Figure 6-10

    Enabling CloudWatch logging

     
  13. Create an IAM role with a policy, as follows:

    ...
            {
                "Sid": "",
                "Effect": "Allow",
                "Action": [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:s3:::snowpipebucket",
                    "arn:aws:s3:::snowpipebucket/*",
                ]
            },
    ...
     
  14. Run the test stream, as shown in Figure 6-11.
    ../images/482416_1_En_6_Chapter/482416_1_En_6_Fig11_HTML.jpg
    Figure 6-11

    Testing

     
  15. Check that the files have arrived in the S3 bucket.

     
  16. Check the count of loaded rows again; it should increase as new files arrive. (See the monitoring sketch after this exercise for additional checks.)

      -- check the count of rows in the target table
      select count(*) from snowpipe.public.snowtable;
     
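Beyond the row count, you can also inspect the pipe itself. Here is a short monitoring sketch for this exercise, using the standard SYSTEM$PIPE_STATUS function and the COPY_HISTORY table function.

-- check the pipe's execution state and pending file count
select system$pipe_status('snowpipe.public.snowpipe');

-- review recent Snowpipe loads into the target table (last hour)
select *
from table(snowpipe.information_schema.copy_history(
    table_name => 'SNOWTABLE',
    start_time => dateadd(hour, -1, current_timestamp())));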

Snowpipe REST API Using AWS Lambda

If the Auto-Ingest option is not available on your account for some reason, or if you need a more flexible way to integrate with other services, you can still implement your own loading code through the Snowpipe REST API.

Figure 6-12 shows how to build a pipeline with a custom app using the REST API.
../images/482416_1_En_6_Chapter/482416_1_En_6_Fig12_HTML.jpg
Figure 6-12

Snowpipe data loading using the REST API and a custom application

Figure 6-12 shows the second option. On the left side, you can see your application. This can be an actual application if you are running one on a virtual machine or a Docker container, but it can also be code that you are running on AWS Lambda. Your Lambda function or application takes care of placing the load files in the S3 bucket and then notifying Snowpipe as soon as each file is persisted there.

When you invoke the REST API, Snowpipe adds these files to a queue behind the endpoint, and the Snowpipe loader service works off that queue to load the data into the target tables you have defined. For step-by-step instructions, refer to the official documentation.20
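For this pattern, the pipe itself is created without the auto_ingest option; your application (for example, a Lambda function) stages the files and then calls the pipe’s insertFiles REST endpoint to queue them. A minimal, hypothetical pipe definition for REST-driven loading might look like this, reusing the stage and table from the earlier exercise:

-- a hypothetical pipe for REST-driven loading: no auto_ingest, so files are queued
-- only when your application calls the insertFiles endpoint for this pipe
create or replace pipe snowpipe.public.rest_snowpipe as
    copy into snowpipe.public.snowtable
    from @snowpipe.public.snowstage
    file_format = (type = 'JSON');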

Summary

In this chapter, we covered the Snowpipe features that allow you to build a continuous data pipeline. In addition, you learned about billing and considered several basic options for using the service. Finally, you built a data pipeline based on a Snowpipe integration.

In the next chapter, we will discuss Snowflake administration and cover the primary Snowflake objects in more detail.
