“You and I are streaming data engines.”
— Jeff Hawkins, in an interview by Knowstuff1 from 2012
If you’re a data analyst, a data scientist, or a member of an executive team, you know the value of continuous, timely access to data. Whenever you query, transform, or otherwise access data, you want it to represent the most up-to-date information available for analysis.
If you have stale data, you might draw inaccurate conclusions or produce skewed statistics, leading to misinformed strategic decisions that can affect your company down the line. Access to continuous data benefits everyone, regardless of role.
Nowadays, data is generated much faster than it used to be. In the past, corporate data was updated infrequently (daily, weekly, or even monthly) and added to your data warehouse. Data accumulates over time, which makes it more and more challenging to process.
Now we have app data, mobile data, and sensor data generating a constant flow of useful analytical data, but getting it into a data warehouse can be a real challenge because it is generated so quickly. Multitudes of tiny files are created, and that can lead to problems.
Unfortunately, the batch methodology allows loading data only daily, hourly, or at best half-hourly; it does not provide fast access to newly generated data. Users often request the ability to analyze their data as quickly as it arrives so they can make important decisions based on the results.
If you decide to implement a continuously loading system, you are probably aware of the COPY command, which was designed for batch-loading scenarios. After accumulating data over some period, such as hours or days, you can launch a COPY command to load the data into your target table in Snowflake.
Note
The COPY command is a SQL command for loading files into a Snowflake table. It supports various options and file formats; see the Snowflake documentation.2
As a workaround for near real-time needs, you can take a micro-batching approach with the COPY command: run a COPY statement on a schedule, every couple of minutes, to load newly arrived files. However, this is still not a fully continuous load, because fresh data that arrives and is ready for loading into the data warehouse does not trigger a load by itself; a human or a scheduler drives it.
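Such a scheduled micro-batch can be sketched with a Snowflake task wrapping a COPY command. This is only an illustration; the warehouse, table, and stage names here are hypothetical placeholders:

```sql
-- Hypothetical micro-batch: a task that runs COPY every five minutes.
-- my_wh, mydb.public.events, and @mydb.public.events_stage are placeholders.
create or replace task mydb.public.load_events_task
  warehouse = my_wh
  schedule = '5 MINUTE'
as
  copy into mydb.public.events
  from @mydb.public.events_stage
  file_format = (type = 'JSON');

-- Tasks are created suspended; start the schedule explicitly.
alter task mydb.public.load_events_task resume;
```

Even with a task like this, files that land just after a run wait until the next scheduled run, which is exactly the latency gap that Snowpipe closes.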
If you have data that’s being generated continuously, you might think that it’d be great if there were an easily affordable, lightweight way to get your data up-to-date in Snowflake. Luckily, Snowflake agrees with you and created a service called Snowpipe. Snowpipe is an autoscaling Snowflake cloud service that provides continuously loaded data into the Snowflake data warehouse from internal and external stages.
With a continuous loading approach like Snowpipe, you have a data-driven way for new data to flow into your target table as soon as it arrives.
Data Warehouse Loading Approaches
Approach | Definition | Snowpipe Options |
---|---|---|
Batch | Data accumulates over time (daily, hourly) and is then loaded periodically. | Point at an S3 bucket and a destination table in your warehouse, and new data is automatically loaded. |
Micro-batch | Data accumulates over small time windows (minutes) and is then loaded. | A technical resource can interface directly using a REST API3 along with Java and Python SDKs to enable highly customized loading use cases. |
Continuous (near real time) | Every data item is loaded individually as it arrives, in near real time. | Integration with Apache Kafka4 is also available via a Kafka connector.5 |
With Snowpipe you have two options. The first is to use Snowpipe with an AWS S3 bucket: you define event notifications on your S3 bucket and have them sent to Snowflake as soon as new files land in the bucket. Those files are automatically picked up by Snowpipe and loaded into your target tables. The second is to call the Snowpipe REST API, which is covered later in this chapter.
Key Snowpipe Benefits
Benefit | Description |
---|---|
Continuous loading, immediate insight | Continuously generated data is available for analysis in seconds. |
Avoid repeated manual COPY commands | No manual effort is required to load data into Snowflake; loading is automated, with no need for manual COPY commands. |
High availability for building custom integrations | Using the REST API and SDKs, you can build your own data pipeline system. |
Full support for semistructured data on load | Many industry-standard formats are supported, such as XML, JSON, Parquet, ORC, and Avro. No transformation is needed to load varying data types, and there is no trade-off between flexibility and performance. |
Pay only for the compute time you use to load data | With the “pay only for what you use” pricing model, idle time is not charged. Snowflake’s built-for-the-cloud solution scales storage separately from compute, up and down, transparently and automatically. Loading appears as a separate “loading data” line item on your Snowflake bill, billed serverlessly based on utilization, which makes the cost of loading data easy to understand. |
Zero management | No indexing, tuning, partitioning, or vacuuming on load. |
Serverless | Serverless loading without contention: no servers to manage and no impact on other workloads, thanks to unlimited concurrency. |
Loading Data Continuously
There are two ways to load data continuously with Snowpipe:
- Snowpipe Auto-Ingest
- Snowpipe REST API using AWS Lambda
Snowpipe Auto-Ingest
Snowpipe Auto-Ingest is a fully automatic mode that loads data from blob storage (such as an S3 bucket) into the target table. The speed and ease of configuration via DDL allow any data engineer, or even an analyst, to set up an automatic continuous data loading process in minutes.
Caution
The auto_ingest option is not available unless it is explicitly enabled on your Snowflake account. Please contact Snowflake Support6 to have it enabled for your account.
Figure 6-2 shows the main components of how this integration works.
Building a Data Pipeline Using the Snowpipe Auto-Ingest Option
- Stream producer: a sample producer for Kinesis Data Firehose. For simplicity, instead of a stream producer based on the Lambda service, we can just use the Firehose Test Generator, which is available when creating a Firehose stream.
- Kinesis Data Firehose as the stream delivery service.
- An S3 bucket as an external Snowflake stage.
- The following Snowflake services:
  - Snowpipe
  - The Snowflake data warehouse
  - The Snowflake console
First, we have to create an external stage and a pipe using the auto_ingest option. When we execute this DDL, we get a unique identifier for an internal queue service (with AWS, it is based on SQS) that is already linked to the Snowpipe serverless loader.
Second, we must create a new S3 bucket and configure an S3 event notification that sends notification events to the Snowpipe SQS queue. The Snowpipe loader receives events about new files in the S3 bucket and queues the pipe’s COPY commands for execution. Snowflake compute services scale fully automatically as statements from the pipe queue are executed. The last step is to create and configure a stream producer that generates a steady flow of events.
Caution
You cannot control transaction boundaries for load with Snowpipe.
Exercise 1. Building a Data Pipeline Using the Snowpipe Auto-Ingest Option
1. Log into your Snowflake account and choose Worksheet.
2. Create Snowflake external stages based on an S3 bucket. Replace <your_AWS_KEY_ID> with your AWS credentials, and replace <your_s3_bucket> with your S3 bucket URL. Run the DDL statements on the worksheet, as shown in Listing 6-1.
Listing 6-1. Creating External Stages
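The listing body is not reproduced here; based on the stage, table, and pipe described in the surrounding text, the DDL looks roughly like the following sketch (the bucket URL, credentials, and secret-key placeholder are assumptions):

```sql
-- Sketch of Listing 6-1's DDL; URL and credentials are placeholders.
create or replace stage snowpipe.public.snowstage
  url = 's3://<your_s3_bucket>'
  credentials = (aws_key_id = '<your_AWS_KEY_ID>'
                 aws_secret_key = '<your_AWS_SECRET_KEY>');

-- Target table with a single VARIANT column for the incoming JSON.
create or replace table snowpipe.public.snowtable (
  jsontext variant
);

-- The pipe wraps a COPY statement; auto_ingest enables S3 event-driven loads.
create or replace pipe snowpipe.public.snowpipe
  auto_ingest = true
  as copy into snowpipe.public.snowtable
     from @snowpipe.public.snowstage
     file_format = (type = 'JSON');
```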
Note VARIANT is Snowflake’s universal semistructured data type for loading data in formats such as JSON,11 Avro,12 ORC,13 Parquet,14 or XML.15 For more information, refer to the references given.16
In the first part of Listing 6-1, we create a new external stage17 called snowpipe.public.snowstage based on an S3 bucket, providing the S3 bucket URL and the credentials.18 Additionally, you can set encryption options.19
The next step is to define a target table called snowpipe.public.snowtable for the data that we want to load continuously. The table takes a variant column as input for the JSON data.
The last part of the script is a definition of a new pipe called snowpipe.public.snowpipe. You can see the pipe is set to auto_ingest=true, which means that we are using notifications from S3 into SQS to notify Snowflake about newly arrived data that is ready to load. Also, you can see that the pipe wraps a familiar COPY statement that defines the transformations and the data loading operations that we want to perform on the data as it becomes available.
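Once the pipe exists, you can also inspect its load status directly. A quick check, using the pipe name from this chapter, might look like this:

```sql
-- Returns a JSON document describing the pipe's state
-- (execution state, pending file count, and so on).
select system$pipe_status('snowpipe.public.snowpipe');
```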
3. Check the correctness of the configuration using the following commands. Using show statements, you can see the status of any pipes and stages.

```sql
-- check existing pipes and stages
show pipes;
show stages;
```
4. Copy the SQS ARN link from the NotificationChannel field.
5. Using a simple select statement, we can check the count of loaded rows.

```sql
-- check count of rows in target table
select count(*) from snowpipe.public.snowtable;
```
6. Log into your AWS account.
7. Create an AWS S3 bucket called snowpipebucket, as shown in Figure 6-5.
8. Set notification events for S3 for Snowpipe using the path S3 ➤ snowpipebucket ➤ Properties ➤ Advanced settings ➤ Events, as shown in Figure 6-6.
9. Create a new Kinesis Data Firehose stream using the path Amazon Kinesis ➤ Data Firehose ➤ Create Delivery Stream. You can see what this looks like in Figure 6-7.
10. Set the source to a direct PUT command, as shown in Figure 6-8.
11. Choose your S3 bucket as the destination, as shown in Figure 6-9.
12. Enable logging using the CloudWatch service, as shown in Figure 6-10.
13. Create an IAM role with a policy, as follows:

```json
...
{
  "Sid": "",
  "Effect": "Allow",
  "Action": [
    "s3:AbortMultipartUpload",
    "s3:GetBucketLocation",
    "s3:GetObject",
    "s3:ListBucket",
    "s3:ListBucketMultipartUploads",
    "s3:PutObject"
  ],
  "Resource": [
    "arn:aws:s3:::snowpipebucket",
    "arn:aws:s3:::snowpipebucket/*"
  ]
},
...
```

14. Run the testing stream, as shown in Figure 6-11.
15. Check the file in the S3 bucket.
16. Check the count of loaded rows.

```sql
-- check count of rows in target table
select count(*) from snowpipe.public.snowtable;
```
Snowpipe REST API Using AWS Lambda
If the Auto-Ingest option is not available on your account for some reason, you still need a flexible way to integrate with other services; the Snowpipe REST API lets you implement continuous loading in your own code.
Figure 6-12 shows the second option. On the left side is your application. This can be an actual application running on a virtual machine or in a Docker container, but it can also be code running on AWS Lambda. Your Lambda function or application takes care of placing the load files in the S3 bucket and notifying Snowpipe as soon as each file is persisted there.
Snowpipe then adds these files to a queue behind the REST API endpoint. You invoke the REST API, which invokes the Snowpipe loader service; the loader works off that queue to load the data into the target tables you have defined. For step-by-step instructions, refer to the official documentation.20
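As a rough illustration of the call your application or Lambda function makes, the sketch below composes the URL of the Snowpipe insertFiles endpoint. The account and pipe names are hypothetical placeholders, and the key-pair (JWT) authentication that a real request requires is omitted:

```python
# Sketch only: compose the Snowpipe REST insertFiles endpoint for a pipe.
# "myaccount" and the pipe name are placeholders; a real call must also send
# a JWT signed with your key pair in the Authorization header.
def insert_files_url(account: str, pipe: str) -> str:
    # pipe is the fully qualified name: database.schema.pipe_name
    host = f"{account}.snowflakecomputing.com"
    return f"https://{host}/v1/data/pipes/{pipe}/insertFiles"

print(insert_files_url("myaccount", "snowpipe.public.snowpipe"))
```

In practice you rarely build this request by hand; the Java and Python SDKs mentioned earlier wrap this endpoint and handle authentication for you.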
Summary
In this chapter, we covered the Snowpipe features that allow you to build a continuous data pipeline. In addition, you learned about billing and considered several basic options for using the service. Finally, you built data pipelines based on Snowpipe integrations.
In the next chapter, we will discuss Snowflake administration and cover the primary Snowflake objects in more detail.