© The Author(s), under exclusive license to APress Media, LLC, part of Springer Nature 2022
A. Carruthers, Building the Snowflake Data Cloud, https://doi.org/10.1007/978-1-4842-8593-0_10

10. Semi-Structured and Unstructured Data

Andrew Carruthers, Birmingham, UK

This chapter is the one I have been looking forward to the most. Although all the chapters have been very enjoyable to write, and setting the right tone and content is technically challenging, this one is set to stretch our boundaries the most.

Note

While writing this chapter, some differences were observed between the Snowflake documentation and AWS Management Console layout; hence some sections have textual descriptions.

What is semi-structured and unstructured data?

Semi-structured data is typically a document format where all the relationships are described in the document. In other words, a single document containing semi-structured data is self-contained, self-describing, and not reliant upon external references to be complete. While the content is usually human-readable, we often use external tools to display the content in an easily understood format. Don’t worry. All will become clear when we dive into one of the semi-structured formats (JSON).

Unstructured data is all around us. We rarely think of our daily interaction with media in general as unstructured data, but it is. We use unstructured data whenever we watch TV, read a book, or open an email. Learning how to interact with the emerging—and for some of us, the very exciting—realm of unstructured data is where the “art of the possible” has become a reality. This chapter demonstrates how to load images into Snowflake, allowing programmatic content interrogation. I can’t wait to get into this part. I never thought I would get excited over invoices.

Figure 10-1, used with the kind permission of Snowflake Inc., illustrates the three layers of data management available in Snowflake. I assume structured data is well known and understood by most readers; therefore, I do not dwell on what is considered common knowledge.
Figure 10-1

Evolution of data management

There are plenty of books available on structured data. Instead, we start with semi-structured data, JSON, and other familiar formats. I hope to offer fresh insights, tools, and techniques to address the content before moving on to unstructured data, where the various data formats bring their own challenges. I offer ways to extract the content into Snowflake, taking advantage of the latest (at the time of writing) features and providing a framework for extending unstructured support according to your needs.

As Saurin Shah, a Snowflake product manager, says, “It doesn’t matter how we store the data. It matters how data is stored and consumed.” Integrating semi-structured and unstructured data into Snowflake has significant potential to unlock and transform value for our organizations, particularly when enriched with our relational data. Imagine the possibilities that become a reality if we can readily access content locked up inside our documents, PDFs, and images of various formats and then add real-time streaming data.

Semi-Structured Data

This chapter focuses on JSON. Snowflake supplies the tutorial at https://docs.snowflake.com/en/user-guide/json-basics-tutorial.html . Snowflake also supports other semi-structured file formats, including Avro, ORC, Parquet, and XML to a lesser degree. Returning to JSON, you may find the tool at https://jsonformatter.org useful.

AWS allows a single SQS queue per S3 bucket. Using AWS Management Console, create a new S3 bucket, btsdc-json-bucket, copying settings from the existing btsdc-test-bucket.

Begin by creating a new storage integration named json_integration.
USE ROLE     accountadmin;
USE DATABASE TEST;
USE SCHEMA   public;
CREATE OR REPLACE STORAGE INTEGRATION json_integration
TYPE                      = EXTERNAL_STAGE
STORAGE_PROVIDER          = S3
ENABLED                   = TRUE
STORAGE_AWS_ROLE_ARN      = 'arn:aws:iam::616701129608:role/test_role'
STORAGE_ALLOWED_LOCATIONS = ( 's3://btsdc-json-bucket/' );
USE ROLE securityadmin;
GRANT USAGE ON INTEGRATION json_integration TO ROLE sysadmin;
DESC INTEGRATION json_integration;
Make a note of your values as they will differ. The following are mine.
  • STORAGE_AWS_IAM_USER_ARN: arn:aws:iam::321333230101:user/vnki-s-ukst5070

  • STORAGE_AWS_EXTERNAL_ID: GH06274_SFCRole=3_Vf/LKebJXaqzvO+unFpnT5OzqGM=

In AWS Management Console , we must edit our role trust relationship to add the preceding STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values. Navigate to IAM ➤ Roles. In the drop-down list, select S3, and then “Allow S3 to call AWS services on your behalf,” and click Next.

Click Edit policy and modify your JSON as follows.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::321333230101:user/vnki-s-ukst5070"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "GH06274_SFCRole=3_Vf/LKebJXaqzvO+unFpnT5OzqGM="
        }
      }
    }
  ]
}
Update test_policy to include a new bucket.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObjectVersion",
                "s3:DeleteObject",
                "s3:GetObjectVersion"
            ],
            "Resource": [ "arn:aws:s3:::btsdc-test-bucket/*",
                          "arn:aws:s3:::btsdc-ingest-bucket/*",
                          "arn:aws:s3:::btsdc-json-bucket/*"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::*"
        }
    ]
}
When we upload files into our S3 buckets, we should be able to select their contents.
 SELECT $1 FROM @TEST.public.json_stage/json_test.json;
Having resolved AWS security dependencies for our new S3 bucket and storage integration, we can rely upon Snowflake structures built in previous chapters; therefore, assume familiarity and set our context accordingly.
USE ROLE      sysadmin;
USE DATABASE  TEST;
USE WAREHOUSE COMPUTE_WH;
USE SCHEMA    public;
To automate, we create a new stage and address two parameters: DIRECTORY and ENCRYPTION. These were previously introduced but not explained.
CREATE OR REPLACE STAGE TEST.public.json_stage
STORAGE_INTEGRATION = json_integration
DIRECTORY           = ( ENABLE = TRUE AUTO_REFRESH = TRUE )
ENCRYPTION          = ( TYPE = 'SNOWFLAKE_SSE' )
URL                 = 's3://btsdc-json-bucket/'
FILE_FORMAT         = TEST.public.test_pipe_format;

ENCRYPTION addresses where in the data upload process our files are encrypted. The documentation notes that we should specify server-side encryption (SNOWFLAKE_SSE) for unstructured data files; see https://docs.snowflake.com/en/sql-reference/sql/create-stage.html#internal-stage-parameters-internalstageparams .

DIRECTORY has several parameters, all documented at https://docs.snowflake.com/en/sql-reference/sql/create-stage.html#directory-table-parameters-directorytableparams . At the time of writing, the Snowflake documentation does not explain how AUTO_REFRESH is enabled; this document explains how SQS should be configured: https://docs.snowflake.com/en/user-guide/data-load-dirtables-auto-s3.html#step-2-configure-event-notifications . First, we must identify the ARN of the SQS queue for the directory table in the directory_notification_channel field.
DESC STAGE TEST.public.json_stage;
SELECT "property_value"
FROM   TABLE ( RESULT_SCAN ( last_query_id()))
WHERE  "property" = 'DIRECTORY_NOTIFICATION_CHANNEL';
Your results will differ from mine; an example response follows.
 arn:aws:sqs:eu-west-2:321333230101:sf-snowpipe-AIDAUVUHT7IKQ7JAXV7BV-NcSl4ctKOyISNW9XBbvhOQ
Switching to AWS Management Console , navigate to S3 ➤ btsdc-json-bucket ➤ Properties ➤ Event notification. Create an S3_json_directory_update event notification. Check the event notifications, as shown in Figure 10-2.
Figure 10-2

Setting event notification

At the foot of the page, select the SQS queue, and enter the SQS queue ARN as shown in Figure 10-3.
Figure 10-3

Setting SQS queue

Then save the changes.

If SQS is not configured, we may need to manually refresh our stage directory.
ALTER STAGE TEST.public.json_stage REFRESH;

File Format

In previous chapters, we encountered FILE FORMAT objects, but we did not pay much attention to them. We used simple declarations to skip the first record since it is typically a header record used to convey attribute names that already exist in our staging tables.

But there are many other uses for FILE FORMAT objects, and some relate to semi-structured data where we might use a FILE FORMAT to interact with a stage, including setting the TYPE to JSON, as this example illustrates.
CREATE OR REPLACE FILE FORMAT TEST.public.test_json_format
TYPE = JSON;

At the time of writing, XML is supported only as a Public Preview feature.

Other options include setting the compression type. While Snowflake automatically detects several compression types by default, some are not yet natively supported; see https://docs.snowflake.com/en/user-guide/intro-summary-loading.html#compression-of-staged-files . This example shows how to manually set the compression type.
CREATE OR REPLACE FILE FORMAT TEST.public.test_json_format_brotli
TYPE        = JSON
COMPRESSION = Brotli;

Please also refer to Snowflake documentation at https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html#create-file-format .

The order of precedence for a FILE FORMAT application varies according to where FILE FORMAT is declared. Please refer to https://docs.snowflake.com/en/user-guide/data-load-prepare.html#overriding-default-file-format-options .
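To illustrate the precedence, here is a minimal sketch (not taken from the chapter’s build script) showing file format options supplied directly on a COPY INTO statement, which override any file format attached to the stage; <target_table> is a placeholder for whichever staging table you are loading.
COPY INTO <target_table> FROM @TEST.public.json_stage
FILE_FORMAT = ( TYPE = JSON  STRIP_OUTER_ARRAY = TRUE );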

JSON

JSON is short-hand for JavaScript Object Notation, a human-readable text format for storing and transporting information. JSON content is self-describing and, due to the nested nature of relationships in parent-child form, easy to understand.

This JSON fragment expands upon the data set used in the object tagging discussion in Chapter 9.
{
   "employee":[
   {
      "firstName": "John","lastName": "Doe",
      "gender": "Male",
      "socialSecurityNumber": "12345678",
      "dateOfBirth": "12-Oct-2001",
      "address": {
          "streetAddress": "43 Kingston Drive","city": "Doncaster","state": "South Yorkshire",
          "postalCode": "DN9 4BS"
      },
      "phoneNumbers": [
        { "type": "home", "number": "9012345678" } ]
   },
   {
      "firstName": "Jane",
      "lastName": "Jones",
      "gender": "Female",
      "socialSecurityNumber": "23456789",
      "dateOfBirth": "23-Jan-2005",
      "address": {
          "streetAddress": "22 St James Drive","city": "Leeds",
          "state": "West Yorkshire","postalCode": "DN9 4BX"
       },
       "phoneNumbers": [{ "type": "home", "number": "8901234567" },{ "type": "work", "number": "7890123456" }]
   }]
}

Immediately we can read the JSON structure and recognize meaningful information, but what is less clear (this example has deliberately been structured to illustrate the point) is the built-in structured nature of JSON. To view it in a more readable format, navigate to https://jsonformatter.org , then type or cut and paste the preceding content from the accompanying file into the browser, where a cleaner, easier-to-read version is presented in the right-hand pane.

Please experiment with changing the content and then “validate” your changes. Note the use of curly braces ({ }) that hold objects resulting in key/value pairs and square brackets ([ ]) that hold lists and arrays. These concepts are important to grasp for later use in materialized views explained next.

Variant Data Type

Now that you have a basic grasp of JSON, let’s prepare a file containing the preceding content (or the reformatted version from https://jsonformatter.org ) and load it into S3. For this demonstration, using your favorite text editor, create json_test.json and upload it into S3.

Another way to view JSON, assuming your browser opens JSON files, is to associate the file type with your browser and then open json_test.json. You should see a stylized representation of the file content and tabs offering different views. In Firefox, these look like Figure 10-4. Note that a partial screenshot is shown.
Figure 10-4

Firefox JSON sample display

With our sample file json_test.json uploaded to S3, we can see the json_test.json file contents.

SELECT $1 FROM @TEST.public.json_stage/json_test.json;

For troubleshooting external stage issues, please see Chapter 7, where a step-by-step configuration guide is available.

For our semi-structured data , we use the VARIANT data type. There are others, including OBJECT and ARRAY. All are described at https://docs.snowflake.com/en/sql-reference/data-types-semistructured.html#semi-structured-data-types .

Note

VARIANT is limited to 16 MB in size.

Declare our staging table to ingest the json_test.json file.
CREATE OR REPLACE TABLE stg_json_test
(
json_test   VARIANT
);
Now load json_test.json into stg_json_test using FILE FORMAT declared earlier.
COPY INTO stg_json_test FROM @TEST.public.json_stage/json_test.json
FILE_FORMAT = test_json_format;
Confirm data has loaded as expected.
SELECT * FROM stg_json_test;
We should see our JSON data set as shown in Figure 10-5. When you click the record, the pop-up window displays nicely formatted JSON, as you saw in the right-hand pane at https://jsonformatter.org .
Figure 10-5

Staged JSON data

Handling Large Files

What if our JSON file exceeds the maximum variant size of 16 MB? What options do we have to load larger files?

Depending upon how our JSON is constructed, and remembering the earlier comments regarding square brackets holding lists and arrays, one option is to strip the outer array by redefining the file format, truncating the staging table, and then reloading. This follows Snowflake’s recommendation at https://docs.snowflake.com/en/user-guide/data-load-considerations-prepare.html#semi-structured-data-size-limitations .
CREATE OR REPLACE FILE FORMAT TEST.public.test_json_format
TYPE              = JSON
STRIP_OUTER_ARRAY = TRUE;
TRUNCATE TABLE stg_json_test;
COPY INTO stg_json_test FROM @TEST.public.json_stage/json_test.json
FILE_FORMAT = test_json_format;

But this approach does not work for JSON without an outer array to strip. We need to consider other options.

While researching this chapter, I came across a technique worthy of consideration. I have not tested or endorsed the next approach. I simply offer it as a novel way forward; see https://medium.com/snowflake/breaking-the-16-mb-limit-kinda-fe8d36658b .

Other approaches to splitting large JSON files include utilizing an ELT tool, which may offer built-in capabilities, or developing a custom process using a third-party tool to pre-process JSON into manageable chunks, as jq purports to do; see https://stedolan.github.io/jq/ . Also, consider using an external table to access your JSON file and pre-processing data before loading it into Snowflake.
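The external table suggestion deserves a short sketch. Assuming a large file whose outer element is an array of employee objects similar to our sample, and using the hypothetical name ext_json_test, the file can be queried in place and only the required fragments loaded.
CREATE OR REPLACE EXTERNAL TABLE ext_json_test
WITH LOCATION = @TEST.public.json_stage
AUTO_REFRESH  = FALSE
FILE_FORMAT   = ( TYPE = JSON  STRIP_OUTER_ARRAY = TRUE );
ALTER EXTERNAL TABLE ext_json_test REFRESH;
-- Each array element arrives as a VARIANT in the VALUE column
SELECT value:firstName::STRING AS first_name,
       value:lastName::STRING  AS last_name
FROM   ext_json_test;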

Regardless of the approach chosen, remember that the S3 bucket where your JSON file lands could, via SQS, invoke a Lambda function to pre-process the file and then copy it to the S3 bucket where Snowpipe or other auto-ingestion processing occurs. Unfortunately, a fuller investigation is beyond the scope of this book.

Materialized Views

Having successfully landed our test file into our staging table, we must consider how to access the JSON content. We now discuss the merits of directly accessing JSON content versus creating materialized views that overlay the JSON structure.

We aim to normalize the JSON structure into a relational format to facilitate joining with our structured data content. We may choose to implement a suite of third normal form materialized views, which would extend the pattern explained here; this chapter stops at a single denormalized materialized view.

If we consider the structure of our sample JSON document and attempt to directly access elements, we soon identify reasons to consider adopting a more sophisticated approach. Referencing individual elements in an array does not provide a scalable delivery pattern.
SELECT json_test:employee[0].firstName::STRING,
       json_test:employee[1].firstName::STRING
FROM   stg_json_test;
To explain the notation used in the code sample: the canonical reference is the table, then attribute:top-level array[n].leaf node::datatype. Note the differences in the field delimiters (colon, dot, and double colon). Figure 10-6 shows the output. As we expect, both attributes are in the same record.
Figure 10-6

Direct attribute access output

We need tools to make JSON content accessible by turning the semi-structured format into a more familiar relational format. In other words, extract each nested section embedded in the JSON document into its constituent structured component, ready for consumption. We now walk through how to do this using Snowflake functions as mechanisms to turn an array into a nested result set.
SELECT e.value:firstName::STRING
FROM   stg_json_test,
       LATERAL FLATTEN ( json_test:employee ) e;
Figure 10-7 shows the effect of using both LATERAL and FLATTEN . We can extend the attribute list using the same pattern to all attributes at the same level in the JSON record, which we do next.
Figure 10-7

LATERAL and FLATTEN output

By referencing nested object attributes we can flatten the address object too.
SELECT e.value:firstName::STRING,
       e.value:lastName::STRING,
       e.value:gender::STRING,
       e.value:socialSecurityNumber::STRING,
       e.value:dateOfBirth::STRING,
       e.value:address.streetAddress::STRING,
       e.value:address.city::STRING,
       e.value:address.state::STRING,
       e.value:address.postalCode::STRING,
       p.value:type::STRING,
       p.value:number::STRING
FROM   stg_json_test,
       LATERAL FLATTEN ( json_test:employee )                  e,
       LATERAL FLATTEN ( e.value:phoneNumbers, OUTER => TRUE ) p;
Note

Nested records in square brackets should be lateral flattened.

With the preceding query, we can create a view and/or a materialized view.
CREATE OR REPLACE MATERIALIZED VIEW mv_stg_json_test
AS
SELECT e.value:firstName::STRING              AS first_name,
       e.value:lastName::STRING               AS last_name,
       e.value:gender::STRING                 AS gender,
       e.value:socialSecurityNumber::STRING   AS social_security_number,
       e.value:dateOfBirth::STRING            AS date_of_birth,
       e.value:address.streetAddress::STRING  AS street_address,
       e.value:address.city::STRING           AS city,
       e.value:address.state::STRING          AS state,
       e.value:address.postalCode::STRING     AS post_code,
       p.value:type::STRING                   AS phone_type,
       p.value:number::STRING                 AS phone_number
FROM   stg_json_test,
       LATERAL FLATTEN ( json_test:employee )                  e,
       LATERAL FLATTEN ( e.value:phoneNumbers, OUTER => TRUE ) p;
The following identifies materialized views in the current schema.
SHOW MATERIALIZED VIEWS;
The following demonstrates there are records in the new materialized view.
SELECT * FROM mv_stg_json_test;

Direct Query or Materialized View

Now we have built code to perform both direct query access against our staged data and a materialized view. We should consider the relative merits of both approaches.

With direct query access, we can easily address new elements as they appear in the source JSON document, but at the cost of explicit attribute referencing. As you can see, the syntax is somewhat convoluted and labor intensive to implement, it does not lend itself to integration with other Snowflake objects, and JSON does not enforce data types: dates, timestamps, and numbers are treated as strings. The advantages of direct query access are no additional disk storage charges and end-user access to the raw JSON.

The materialized view approach requires up-front configuration. It is relatively inflexible in the event of changes to the underlying JSON document structure and incurs additional storage costs. The advantages materialized views offer include the following.
  • Abstraction from the JSON pathing by renaming attributes into more user-friendly conventions

  • Improved performance when accessing deeply buried information in the JSON

  • Cleaner syntax when integrating with other Snowflake objects, plus the possibility of micro-partition pruning improving performance through both micro-partitioning and cluster key definition

  • Materialized views can be used for summarizations and aggregations and are subject to RBAC

  • Conversion from JSON date and timestamp strings into their corresponding Snowflake data type provides an opportunity for performant temporal queries.

  • Materialized views can be used for query rewrite even if not referenced in the SQL

The general use cases for direct query access are when results change often, results are not used often, and queries are not resource-intensive. The general use cases for materialized views are when results don’t change often, results are often used, and queries are resource intensive.

In summary, our preferred approach is to retain our raw JSON and build materialized views on top, providing the best of both approaches but at the cost of additional (cheap) storage for the materialized view. Note that materialized views can also be used on external tables.

MV Limitations

While materialized views offer significant benefits over direct query access, there are some limitations.
  • One or more materialized views can be declared, but only against a single table.

  • For the best performance, the cluster key should differ from that of the underlying table.

  • The number of cluster columns should be kept low, with the cluster key defined from lowest to highest cardinality.

  • Temporal functions such as current_timestamp() cannot be used in a materialized view declaration.

  • For replicated materialized views, only the definition is carried across to the secondary, not the results; to maintain data on the secondary, set the following parameter.
SET AUTO_REFRESH_MATERIALIZED_VIEWS_ON_SECONDARY = TRUE;
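To illustrate the cluster key guidance in the list above, and purely as a sketch, a materialized view can declare its own cluster key; mv_stg_json_test_by_city is a hypothetical name, and clustering adds its own maintenance cost.
CREATE OR REPLACE MATERIALIZED VIEW mv_stg_json_test_by_city
CLUSTER BY ( state, city )
AS
SELECT e.value:address.state::STRING AS state,
       e.value:address.city::STRING  AS city,
       e.value:lastName::STRING      AS last_name
FROM   stg_json_test,
       LATERAL FLATTEN ( json_test:employee ) e;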

Serverless compute is used for maintenance and therefore incurs both storage and compute costs for automatic management, as the materialized view stores the results of the declaring query. Costs are a function of the amount of data that changes in each base table and the number of materialized views created on each table, with compute costs calculated in 1-second increments.
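One way to keep an eye on these maintenance costs, assuming the materialized view created earlier, is the MATERIALIZED_VIEW_REFRESH_HISTORY table function.
SELECT materialized_view_name,
       credits_used,
       start_time,
       end_time
FROM   TABLE ( information_schema.materialized_view_refresh_history (
               date_range_start       => dateadd ( 'day', -7, current_timestamp()),
               materialized_view_name => 'MV_STG_JSON_TEST' ));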

Note

The Time Travel feature is not supported in materialized views.

Automation

In Chapter 8, we implemented Snowpipe, recognizing that materialized views, by their nature, are limited to the current version of data only. We may wish to historize ingested JSON documents.

We can now take advantage of the stage directory, which provides a much richer and more natural SQL interface than the LIST command.
LIST @TEST.public.json_stage;
SELECT * FROM DIRECTORY ( @TEST.public.json_stage );
We can build a stream on top of the stage directory for change data capture.
CREATE OR REPLACE STREAM strm_test_public_json_stage ON STAGE TEST.public.json_stage;
And query the stage.
SELECT * FROM strm_test_public_json_stage;

However, we can do more with a stage directory where Snowflake supplies several file functions documented at https://docs.snowflake.com/en/sql-reference/functions-file.html#file-functions .

Combining the file functions with stream attributes, we can build a view. Note that metadata$row_id is always blank and is therefore omitted from the view declaration.
CREATE OR REPLACE SECURE VIEW v_strm_test_public_json_stage
AS
SELECT '@TEST.public.json_stage'                         AS stage_name,
       get_stage_location    ( @TEST.public.json_stage ) AS stage_location,
       relative_path,
       get_absolute_path     ( @TEST.public.json_stage,
                             relative_path )             AS absolute_path,
       get_presigned_url     ( @TEST.public.json_stage,
                             relative_path )             AS presigned_url,
       build_scoped_file_url ( @TEST.public.json_stage,
                             relative_path )             AS scoped_file_url,
       build_stage_file_url  ( @TEST.public.json_stage,
                             relative_path )             AS stage_file_url,
       size,
       last_modified,
       md5,
       etag,
       file_url,
       metadata$action,
       metadata$isupdate
FROM   strm_test_public_json_stage;
We can now query the view, noting the contents are empty because the creation time was after the directory refresh time; therefore, to test, we must reload files into S3 and then refresh.
SELECT * FROM v_strm_test_public_json_stage;
And finally, add a task to detect the presence of new data in our stream and trigger a stored procedure (not shown) to historize.
 CREATE OR REPLACE TASK task_load_json_data
WAREHOUSE = COMPUTE_WH
SCHEDULE  = '1 minute'
WHEN system$stream_has_data ( 'strm_test_public_json_stage' )
AS
CALL sp_load_json_data();

Note the task has not resumed because the stored procedure has not been declared; this is an exercise for you to complete.
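To get you started, here is a minimal sketch of what sp_load_json_data might look like, written in Snowflake Scripting and assuming a hypothetical json_history table that records each new arrival; extend it to retain the document content according to your requirements.
CREATE OR REPLACE TABLE json_history
(
loaded_at     TIMESTAMP_LTZ,
relative_path VARCHAR,
size          NUMBER
);
CREATE OR REPLACE PROCEDURE sp_load_json_data()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
   -- Load newly arrived files; COPY INTO skips files already recorded in load history
   COPY INTO stg_json_test FROM @TEST.public.json_stage
   FILE_FORMAT = ( FORMAT_NAME = 'TEST.public.test_json_format' );
   -- Record the arrivals captured by the stream, which also consumes the stream
   INSERT INTO json_history ( loaded_at, relative_path, size )
   SELECT current_timestamp(), relative_path, size
   FROM   strm_test_public_json_stage
   WHERE  metadata$action = 'INSERT';
   RETURN 'Success';
END;
$$;
Once the procedure exists, resume the task.
ALTER TASK task_load_json_data RESUME;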

Unstructured Data

With a little thought, we can leverage AWS to pre-process our documents in S3 and deliver content ready for ingestion in JSON format. Figure 10-8 outlines the steps required: (1) files land in a new S3 bucket; (2) SQS detects the file’s presence and calls (3) Lambda, which detects the file type and converts it to JSON; (4) the file is written to the existing S3 bucket mapped via an external stage.
Figure 10-8

Integrated approach

Note

If using the AWS Free Tier, be aware that Textract is limited to 100 invocations before charges apply.

Once a JSON file has landed in S3, we can treat the content as semi-structured data and ingest it according to the preceding pattern.

If our file exceeds 16 MB, we may also pre-process it to split into multiple source files or strip the outer array according to need.

File Preparation

In AWS Management Console , navigate to S3 and create a new S3 bucket: btsdc-ingest-bucket. We require a second bucket to prevent circular writes when the Lambda function is triggered.

For testing purposes, we use a simple PNG file created from a screenshot of an invoice.

We will later upload the test files into btsdc-ingest-bucket. The Lambda function is triggered, and files are copied to btsdc-test-bucket.

Lambda

We now focus on creating a Lambda function configured using our AWS account.

Note

This is test code and test configuration. It should not be relied on for production implementation.

In AWS Management Console , search for “lambda” and select it, as shown in Figure 10-9.
Figure 10-9

AWS Lambda

Click “Create function” and ensure “Author from scratch” is selected. Populate the “Basic information” fields, as shown in Figure 10-10.
Figure 10-10

AWS Lambda basic information

Click the “Create function ” button. You are presented with more configuration options, as shown in Figure 10-11. Note the Copy ARN, Add trigger and Upload from buttons.
Figure 10-11

AWS Lambda function overview

A basic Lambda function is available in the Code source ➤ lambda_function tab. Note that the Deploy button is grayed out. Whenever we change the Python code, the Deploy button is enabled.

Click “Add trigger” and populate S3. Select your ingestion bucket name, and the “All object create events” event type. Select the “Recursive invocation” checkbox and click Add. See Figure 10-12.
Figure 10-12

AWS Lambda add trigger

If an error is received during your testing, it may be due to prior event registration. Navigate to S3 ➤ Properties ➤ Event notification and remove any events from file_to_json, and then retry.

Triggers can be viewed under the Configuration ➤ Triggers tab.

The execution role can be viewed under the Configuration ➤ Execution role tab, which needs amending. Click Edit and select the “Create a new role from AWS policy templates” option. Enter a name: S3-ingest-to-S3-stage. Then select “Amazon S3 object read-only permissions” from the drop-down list. The screen automatically returns to the Lambda summary screen.

This URL navigates directly to the page to edit; see https://console.aws.amazon.com/iamv2/home?#/roles/details/S3-ingest-to-S3-stage?section=permissions .

Alternatively, navigate to Lambda ➤ Functions ➤ file_to_json ➤ Configuration ➤ Execution role.

Note

Your organization’s security profile may restrict access; do not assume the following suggested policy meets your requirements.

Click the policy name beginning with AWSLambdaS3ExecutionRole-nnnnnnn. Then navigate to the Edit policy ➤ JSON tab and paste the following. The permissions apply to all S3 buckets and include those required for Amazon Textract.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "textract:DetectDocumentText",
                "textract:AnalyzeDocument",
                "textract:StartDocumentTextDetection",
                "textract:StartDocumentAnalysis",
                "textract:GetDocumentTextDetection",
                "textract:GetDocumentAnalysis"
            ],
            "Resource": "*"
        }
    ]
}

Click “Review policy” and then the “Save changes” button, which returns to the Lambda function overview.

This code copies a file from the source S3 bucket into the destination bucket. Cut and paste it into the Code tab to test if the policy works correctly.

Note

destination_bucket_name must be changed, as AWS S3 bucket names are global in scope across all AWS accounts.

The following code is intended to be cut and pasted. Python is indentation sensitive, and the printed code format incorrectly implies a line break after the “# Specify source bucket” and “# Write copy statement” comments.

Note

The AWS Lambda console implements alignment markers that must be adhered to; otherwise, an indentation error occurs.

import time
import json
import boto3
s3_client=boto3.client('s3')
# lambda function to copy file from one s3 bucket to another s3 bucket
def lambda_handler(event, context):
    # Specify source bucket
    source_bucket_name=event['Records'][0]['s3']['bucket']['name']
    # Get file that has been uploaded
    file_name=event['Records'][0]['s3']['object']['key']
    # Specify destination bucket – this must be changed as AWS S3 bucket names are global in scope
    destination_bucket_name='btsdc-test-bucket'
    # Specify from where file needs to be copied
    copy_object={'Bucket':source_bucket_name,'Key':file_name}
    # Write copy statement
    s3_client.copy_object(CopySource=copy_object,Bucket=destination_bucket_name,Key=file_name)
    return {
        'statusCode': 200,
        'body': json.dumps('File has been Successfully Copied')
    }

Then click Deploy.

Upload files to btsdc-ingest-bucket and check they appear in btsdc-test-bucket. We may also check the file has been copied across correctly by downloading it from the target S3 bucket.

Delete files from both btsdc-ingest-bucket and btsdc-test-bucket.

Extracting File Content

This section has the potential to expand into a separate book itself. One challenge with authoring code samples is keeping content to manageable sections while conveying enough information to inform and equip the audience. Cue one epic fail!

Let’s start by investigating Amazon Textract, a file processing utility. Its documentation is at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/textract.html#Textract.Client.analyze_document . Note that Textract has two modes of operation: synchronous for small files and asynchronous for larger files. In this chapter, we investigate synchronous operation using textract.analyze_document while integrating with our S3 copy code; documentation is at https://docs.aws.amazon.com/textract/latest/dg/analyzing-document-text.html .

I confess. I did not author all of the following code. It is based upon code supplied and used with the kind permission of Saurin Shah of Snowflake Inc., extended to showcase the “art of the possible” with apologies for removing comments and blank lines to reduce the page count. The accompanying file has a complete version.

Amend our Lambda function by cutting and pasting the following code into the body, and then deploy. Testing should be straightforward: upload a PNG or JPG file into your S3 ingest bucket, and assuming all is well, a JSON file containing all the extracted text appears in the S3 output bucket.
import time
import pathlib
import json
import boto3
s3_client         = boto3.client('s3')
json_return_array = [ ]
def textract_analyze_document(source_bucket_name, file_name):
   doc_client = boto3.client('textract')
   response = doc_client.analyze_document(Document={'S3Object': { 'Bucket': source_bucket_name, 'Name': file_name}}, FeatureTypes=["TABLES"])
   blocks = response['Blocks']
   key_map = {}
   value_map = {}
   block_map = {}
   for block in blocks:
      block_id = block['Id']
      block_map[block_id] = block
      if block['BlockType'] == "KEY_VALUE_SET":
         if 'KEY' in block['EntityTypes']:
            key_map[block_id] = block
         else:
            value_map[block_id] = block
      elif block['BlockType'] == "LINE":
         key_map[block_id] = block
   return key_map, value_map, block_map
def find_value_block(key_block, value_map):
   value_block = {}
   for relationship in key_block['Relationships']:
      if relationship['Type'] == 'VALUE':
         for value_id in relationship['Ids']:
            value_block = value_map[value_id]
   return value_block
def get_kv_relationship(key_map, value_map, block_map):
   kvs = {}
   for block_id, key_block in key_map.items():
      value_block = find_value_block(key_block, value_map)
      key = get_text(key_block, block_map)
      val = get_text(value_block, block_map)
      kvs[key] = val
   return kvs
def get_text(result, blocks_map):
   text = ''
   if 'Relationships' in result:
      for relationship in result['Relationships']:
         if relationship['Type'] == 'CHILD':
            for child_id in relationship['Ids']:
               word = blocks_map[child_id]
               if word['BlockType'] == 'WORD':
                  text += word['Text'] + ' '
               if word['BlockType'] == 'SELECTION_ELEMENT':
                  if word['SelectionStatus'] == 'SELECTED':
                     text += 'X '
   return text
def lambda_handler(event, context):
   source_bucket_name = event['Records'][0]['s3']['bucket']['name']
   file_name = event['Records'][0]['s3']['object']['key']
   target_bucket_name='btsdc-test-bucket'
   file_extension = pathlib.Path(file_name).suffix
   file_extension = file_extension.lower()
   copy_object = {'Bucket':source_bucket_name, 'Key':file_name}
   if file_extension == '.png' or file_extension == '.jpg':
      key_map, value_map, block_map = textract_analyze_document(source_bucket_name, file_name)
      kvs = get_kv_relationship(key_map, value_map, block_map)
      json_return_array.append(kvs)
      json_return_string = json.dumps({"data" : json_return_array})
      s3_client.put_object(Bucket = target_bucket_name, Body = json_return_string, Key = file_name + '.json')
      s3_client.copy_object(CopySource = copy_object, Bucket = target_bucket_name, Key=file_name)
      s3_client.delete_object(Bucket = source_bucket_name, Key=file_name)
   return {
      'statusCode': 200,
      'body': json.dumps('File has been Successfully Copied')
   }

Mapping to JSON

With our PNG document converted to JSON , we can use the semi-structured data integration pattern articulated at the start of the chapter to ingest content into Snowflake.
 LIST @TEST.public.json_stage;
SELECT $1 FROM @TEST.public.json_stage/test_json.png.json;

There is one caveat. During testing, we encountered the following error.

Invalid UTF8 detected in string ‘ “0xA3 ”: “”,’ File ‘test_json.png.json’, line 38, character 1, row 37, column “TRANSIENT_STAGE_TABLE”[“$1”:1].

The fix was to delete the offending line (38) in the JSON file and then reload and retry.

You may also find it necessary to load your file into https://jsonformatter.org and save the output before uploading .

With the data accessible, I leave further processing to you.
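As a sketch of that further processing, and assuming the output shape produced by our Lambda function (a data array holding a single object of key/value pairs), the extracted fields can be flattened into rows; stg_image_json is a hypothetical staging table.
CREATE OR REPLACE TABLE stg_image_json
(
image_json   VARIANT
);
COPY INTO stg_image_json FROM @TEST.public.json_stage/test_json.png.json
FILE_FORMAT = ( TYPE = JSON );
SELECT f.key           AS extracted_key,
       f.value::STRING AS extracted_value
FROM   stg_image_json,
       LATERAL FLATTEN ( image_json:data[0] ) f;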

Troubleshooting

If files have not been copied across, navigate to Lambda ➤ Functions ➤ file_to_json ➤ Monitor ➤ Logs. Click the “View logs in CloudWatch” option, which opens a new browser tab showing all available log streams in date order, the most recent first. Refresh, and then click the most recent log, where information similar to what is shown in Figure 10-13 is displayed. Note that the example shown covers two Lambda invocations: the first failed, and the second, most recent invocation succeeded.
Figure 10-13

CloudWatch sample output

Further information can be found by opening up each section where the Lambda error line and reason appear, as shown in Figure 10-14.
Figure 10-14

CloudWatch error example

SQL Integration

The previous section demonstrated how to automatically process files on receipt and convert them to JSON, and this chapter has built all the required Snowflake capability to stage and interact with JSON. Let’s now focus on directly accessing the content of files held on S3 using SQL. To do so, we must create an external function to call a remote service (in our example, an AWS Python Lambda) using an API integration, for which documentation is at https://docs.snowflake.com/en/sql-reference/sql/create-api-integration.html#for-amazon-api-gateway and https://docs.snowflake.com/en/sql-reference/external-functions-creating-aws.html . Note the very helpful embedded YouTube tutorial.

The remote service must expose an HTTPS endpoint that both accepts and returns a single JSON record. Figure 10-15 illustrates the proposed approach, where (1) represents our file delivered into S3; (2) is our new Lambda function, which extracts the file content; (3) is the API integration that maps the remote service endpoint into Snowflake; and (4) is the external function and SQL call that extract the file content.
Figure 10-15

Remote service and API integration

Tracking Template

We populate the following template copied from https://docs.snowflake.com/en/sql-reference/external-functions-creating-aws-planning.html#preparing-to-use-the-aws-management-console . You may also wish to add this template to your support documentation for each external function created.

Note

Using the template saves you time and prevents troubleshooting and rework. I found out the hard way!

===========================================================================
================ Tracking Worksheet: AWS Management Console ===============
===========================================================================
****** Step 1: Information about the Lambda Function (remote service) *****
Your AWS Account ID: 616701129608
Lambda Function Name: parse_image
******** Step 2: Information about the API Gateway (proxy Service) ********
New IAM Role Name: Snowflake_External_Function
New IAM Role ARN: arn:aws:iam::616701129608:role/Snowflake_External_Function
Snowflake VPC ID (optional): com.amazonaws.vpce.eu-west-2.vpce-svc-0839061a5300e5ac1
New API Name: get_image_content
API Gateway Resource Name: snowflake_proxy
Resource Invocation URL: https://8qdfb0w8fh.execute-api.eu-west-2.amazonaws.com/test/snowflake_proxy
Method Request ARN: arn:aws:execute-api:eu-west-2:616701129608:8qdfb0w8fh/*/POST/snowflake_proxy
API Key (optional): uqkOKvci6OajRNCYXvohX9WTY8HpxGzt5vj9eDHV
*** Step 3: Information about the API integration and External Function ***
API Integration Name: document_integration
API_AWS_IAM_USER_ARN: arn:aws:iam::321333230101:user/vnki-s-ukst5070
API_AWS_EXTERNAL_ID: GH06274_SFCRole=3_8GebOvugHdY0QcaNVeO5Ki/H2+Q=
External Function Name: get_image_data

Create a Python Lambda Function

Repeating the steps from Figure 10-9 and Figure 10-10, create a new Lambda function called parse_image. Select Python 3.9 as the runtime, then leave the default execution role as “Create a new role with basic Lambda permissions” before clicking the “Create function” button. After a minute or so, our new function appears similar to Figure 10-11.

Cut and paste the sample code from https://docs.snowflake.com/en/sql-reference/external-functions-creating-aws-sample-synchronous.html#sample-synchronous-lambda-function (as follows) and then deploy. This proves that end-to-end configuration works and is replaced later in this chapter.
import json
def lambda_handler(event, context):
    # 200 is the HTTP status code for "ok".
    status_code = 200
    # The return value will contain an array of arrays (one inner array per input row).
    array_of_rows_to_return = [ ]
    try:
        # From the input parameter named "event", get the body, which contains
        # the input rows.
        event_body = event["body"]
        # Convert the input from a JSON string into a JSON object.
        payload = json.loads(event_body)
        # This is basically an array of arrays. The inner array contains the
        # row number, and a value for each parameter passed to the function.
        rows = payload["data"]
        # For each input row in the JSON object...
        for row in rows:
            # Read the input row number (the output row number will be the same).
            row_number = row[0]
            # Read the first input parameter's value. For example, this can be a
            # numeric value or a string, or it can be a compound value such as
            # a JSON structure.
            input_value_1 = row[1]
            # Read the second input parameter's value.
            input_value_2 = row[2]
            # Compose the output based on the input. This simple example
            # merely echoes the input by collecting the values into an array that
            # will be treated as a single VARIANT value.
            output_value = ["Echoing inputs:", input_value_1, input_value_2]
            # Put the returned row number and the returned value into an array.
            row_to_return = [row_number, output_value]
            # ... and add that array to the main array.
            array_of_rows_to_return.append(row_to_return)
        json_compatible_string_to_return = json.dumps({"data" : array_of_rows_to_return})
    except Exception as err:
         # 400 implies some type of error.
        status_code = 400
        # Tell caller what this function could not handle.
        json_compatible_string_to_return = event_body
    # Return the return value and HTTP status code.
    return {
        'statusCode': status_code,
        'body': json_compatible_string_to_return
    }
Now test the Lambda function using the supplied test case; cut and paste it into the Test tab. Then click the Test button, after which you can see the execution result. Use CloudWatch to investigate any failures.
{
  "body":
    "{ "data": [ [ 0, 43, "page" ], [ 1, 42, "life, the universe, and everything" ] ] }"
}

You should see “Execution result: succeeded.”

Create AWS IAM Role

We now use AWS Management Console to configure the IAM role for Snowflake use. Click Roles ➤ Create role. Our role is for another AWS account which is a trusted entity. Select “AWS account” and use your account number recorded in the tracking template (mine is 616701129608). Then click Next ➤ Next. Add a meaningful name such as Snowflake_External_Function, and then click Create role.

Once created, click the new role and record the ARN. (Mine is arn:aws:iam::616701129608:role/Snowflake_External_Function.)

Get Snowflake VPC Information

We must now retrieve the Snowflake VPC information, which relates to the account provisioned by Snowflake, and record the information in our tracking template. To do this, from the Snowflake user interface, issue the following commands.
USE ROLE accountadmin;
SELECT system$get_privatelink_config();
Assuming no private links have been configured, and after reformatting at https://jsonformatter.org , the returned JSON document should look like the following.
{
  "privatelink-account-name": "gh06274eu-west-2.privatelink",
  "privatelink-vpce-id": "com.amazonaws.vpce.eu-west-2.vpce-svc-0839061a5300e5ac1",
  "privatelink-account-url": "gh06274eu-west-2.privatelink.snowflakecomputing.com",
  "regionless-privatelink-account-url": "ewaomog-bx93955.privatelink.snowflakecomputing.com",
  "privatelink_ocsp-url": "ocsp.gh06274eu-west-2.privatelink.snowflakecomputing.com",
  "privatelink-connection-urls": "[]"
}

From this output, we add the privatelink-vpce-id value (com.amazonaws.vpce.eu-west-2.vpce-svc-0839061a5300e5ac1) to our tracking template. Note that yours will differ.

Create API Gateway Endpoint

In AWS Management Console , search for “API Gateway ” and select the service. From the options presented, find Rest API and click the Build button. Then select New API, at which point you see the Settings screen.

Enter the API name: get_image_content. Optionally, you can populate the Description field. Leave the Endpoint Type set to Regional, and then click Create API.

In the Methods screen, click Actions and select Create Resource. Set the resource name to snowflake_proxy before clicking the “Create resource” button.

Next, click Actions and select Create Method. From the drop-down list, select POST, and click the tick mark. Leave Lambda Function selected as the integration type. Select the “Use Lambda Proxy integration” option. In the Lambda Function dialog box, enter parse_image and click Save. In the pop-up box, click OK. The screen now presents a schematic showing request/response. This can be ignored.

From the Actions drop-down, select Deploy API action. Set the Deployment Stage to [New Stage], set the stage name to “test”, and then click Deploy.

The newly created “test” stage can now be selected and the tree opens. Click POST and record the Invoke URL (in my example, https://8qdfb0w8fh.execute-api.eu-west-2.amazonaws.com/test/snowflake_proxy ). Record this in the tracking template. Note that yours will differ.

Secure API Gateway Proxy

Click Amazon API Gateway ➤ get_image_content ➤ POST ➤ Method Request. Next, click the Edit symbol in Authorization , and in the drop-down list, select AWS IAM. Click the tick mark to save. From the Actions drop-down list, select the Deploy API action. Set the deployment stage to “test” and click Deploy.

Then click Method Execution and retrieve the ARN; in my example, it is arn:aws:execute-api:eu-west-2:616701129608:8qdfb0w8fh/*/POST/snowflake_proxy. Record this in the tracking template. Note that yours will differ.

On the left of the screen, select Resource Policy and paste the example resource policy from the documentation at https://docs.snowflake.com/en/sql-reference/external-functions-creating-aws-ui-proxy-service.html#secure-your-amazon-api-gateway-endpoint .

Replace <12-digit-number> with your AWS account ID; in our example, this is 616701129608. Replace <external_function_role> with Snowflake_External_Function. Then replace <method_request_ARN> with your method execution ARN.

The following code uses my settings; yours will differ.
{
    "Version": "2012-10-17",
    "Statement":
    [
        {
        "Effect": "Allow",
        "Principal":
            {
            "AWS": "arn:aws:sts::616701129608:assumed-role/Snowflake_External_Function/snowflake"
            },
        "Action": "execute-api:Invoke",
        "Resource": "arn:aws:execute-api:eu-west-2:616701129608:8qdfb0w8fh/*/POST/snowflake_proxy"
        }
    ]
}

Click the Save button before clicking get_image_content in the header bar. Click Actions ➤ Deploy API. In the deployment stage, select “test” and then click Deploy.

Snowflake API Integration

In this section, I have substituted values from my AWS account; yours will differ and must be replaced.

Create API Key

This section is included for completeness only. You may not need an API key, but if your organization requires API keys, here is how to create them. Documentation is at https://docs.snowflake.com/en/sql-reference/external-functions-security.html#using-the-api-key-option-in-create-api-integration .

From AWS Management Console, navigate to API Gateway ➤ get_image_content, and on the left-hand panel, select API Keys ➤ Actions ➤ Create API key.

In the Name dialog, enter get_image_content_key. Leave the API key set to Auto Generate, and optionally add a description before clicking Save.

To see the generated key, navigate to API Gateway ➤ get_image_content. On the left-hand panel, select API Keys ➤ get_image_content_key ➤ API key ➤ show. My key looks like this: uqkOKvci6OajRNCYXvohX9WTY8HpxGzt5vj9eDHV. Add this to the tracking template.

Navigate to API Gateway ➤ get_image_content and on the left hand panel select Resources ➤ Method Request ➤ Settings ➤ API Key Required. Ensure it is set to “true”. If not, edit and click the checkbox then Actions ➤ Deploy API.

AWS documentation is at https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-setup-api-key-with-console.html .

Create API Integration

An API integration stores information about an HTTPS proxy service. We create our API integration to access the Lambda function created.
USE ROLE accountadmin;

The use of the AWS API key is not covered in the online video tutorial, hence the preceding section. The API key section is commented out in the following command, which serves as a placeholder. Your API key will differ from mine.

Define our API integration—document_integration.
CREATE OR REPLACE API INTEGRATION document_integration
API_PROVIDER         = aws_api_gateway
API_AWS_ROLE_ARN     = 'arn:aws:iam::616701129608:role/Snowflake_External_Function'
//API_KEY              = 'uqkOKvci6OajRNCYXvohX9WTY8HpxGzt5vj9eDHV'
ENABLED              = TRUE
API_ALLOWED_PREFIXES = ('https://8qdfb0w8fh.execute-api.eu-west-2.amazonaws.com/test/snowflake_proxy');
As we might expect, when creating a new integration, the trust relationship must be established, for which we need some information .
DESC INTEGRATION document_integration;

Record these settings in the tracking template. Yours will differ.

API_AWS_IAM_USER_ARN: arn:aws:iam::321333230101:user/vnki-s-ukst5070

API_AWS_EXTERNAL_ID: GH06274_SFCRole=3_8GebOvugHdY0QcaNVeO5Ki/H2+Q=

Establish Trust Relationship

In AWS Management Console, navigate to IAM and select the Snowflake_External_Function role created earlier. Select the Trust relationships tab and click “Edit trust policy”. Replace the AWS ARN with your API_AWS_IAM_USER_ARN. Expand the Condition braces to include the following text, substituting your own API_AWS_EXTERNAL_ID: {"StringEquals": {"sts:ExternalId": "GH06274_SFCRole=3_8GebOvugHdY0QcaNVeO5Ki/H2+Q="}}. My completed policy is shown next.
{
        "Version": "2012-10-17",
        "Statement": [
                {
                        "Effect": "Allow",
                        "Principal": {
                                "AWS": "arn:aws:iam::321333230101:user/vnki-s-ukst5070"
                        },
                        "Action": "sts:AssumeRole",
                        "Condition": {"StringEquals": {"sts:ExternalId": " GH06274_SFCRole=3_8GebOvugHdY0QcaNVeO5Ki/H2+Q=}}
                }
        ]
}

Then click the “Update policy” button. If we later update the API integration, the trust relationship will also need updating.
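If the API integration is recreated, re-run the describe command and copy the refreshed values back into the trust policy; this mirrors the result-scan pattern used earlier for the storage integration.
DESC INTEGRATION document_integration;
SELECT "property", "property_value"
FROM   TABLE ( RESULT_SCAN ( last_query_id()))
WHERE  "property" IN ( 'API_AWS_IAM_USER_ARN', 'API_AWS_EXTERNAL_ID' );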

Create Snowflake External Function

After updating the trust relationship , we can create our get_image_data external function.
CREATE OR REPLACE EXTERNAL FUNCTION get_image_data ( n INTEGER, v VARCHAR )
RETURNS VARIANT
API_INTEGRATION = document_integration
AS 'https://8qdfb0w8fh.execute-api.eu-west-2.amazonaws.com/test/snowflake_proxy';

Calling External Function

We can test our function by calling with appropriate values for INTEGER and VARCHAR parameters.
SELECT get_image_data ( 1, 'name' );
The following grant is required for the next section.
GRANT USAGE ON FUNCTION get_image_data(INTEGER, VARCHAR) TO ROLE sysadmin;
The successful response is to echo the input parameters, as shown in Figure 10-16.
Figure 10-16

Successful Lambda invocation

Deploy Lambda Image

This section modifies parse_image to scrape content from an image file.

Prove we can execute the existing external function as sysadmin, returning the same results shown in Figure 10-16.
USE ROLE      sysadmin;
USE DATABASE  TEST;
USE WAREHOUSE COMPUTE_WH;
USE SCHEMA    public;
SELECT get_image_data ( 1, 'name' );
In AWS Management Console , navigate to parse_image. If you have made changes to the template code, consider saving a copy before deploying this code. Note that spaces and comments have been removed to reduce page count.
import time
import json
import boto3
from urllib.request import urlopen
def textract_analyze_document(contents):
   doc_client = boto3.client('textract')
   response = doc_client.analyze_document(Document={'Bytes': contents}, FeatureTypes=["FORMS"])
   blocks = response['Blocks']
   key_map = {}
   value_map = {}
   block_map = {}
   for block in blocks:
      block_id = block['Id']
      block_map[block_id] = block
      if block['BlockType'] == "KEY_VALUE_SET":
         if 'KEY' in block['EntityTypes']:
            key_map[block_id] = block
         else:
            value_map[block_id] = block
      elif block['BlockType'] == "LINE":
         key_map[block_id] = block
   return key_map, value_map, block_map
def find_value_block(key_block, value_map):
   value_block = {}
   for relationship in key_block['Relationships']:
      if relationship['Type'] == 'VALUE':
         for value_id in relationship['Ids']:
            value_block = value_map[value_id]
   return value_block
def get_kv_relationship(key_map, value_map, block_map):
   kvs = {}
   for block_id, key_block in key_map.items():
      value_block = find_value_block(key_block, value_map)
      key = get_text(key_block, block_map)
      val = get_text(value_block, block_map)
      kvs[key] = val
   return kvs
def get_text(result, blocks_map):
   text = ''
   if 'Relationships' in result:
      for relationship in result['Relationships']:
         if relationship['Type'] == 'CHILD':
            for child_id in relationship['Ids']:
               word = blocks_map[child_id]
               if word['BlockType'] == 'WORD':
                  text += word['Text'] + ' '
               if word['BlockType'] == 'SELECTION_ELEMENT':
                  if word['SelectionStatus'] == 'SELECTED':
                     text += 'X '
   return text
def lambda_handler(event, context):
   array_of_rows_to_return = [ ]
   event_body = event["body"]
   payload = json.loads(event_body)
   rows = payload["data"]
   for row in rows:
      row_number    = row[0]
      param_1       = row[1]
      presigned_url = row[2]
      contents = urlopen(presigned_url).read()
      key_map, value_map, block_map = textract_analyze_document(contents)
      kvs = get_kv_relationship(key_map, value_map, block_map)
      row_to_return = [row_number, kvs]
      array_of_rows_to_return.append(row_to_return)
      json_compatible_string_to_return = json.dumps({"data" : array_of_rows_to_return})
   return {
      'statusCode': 200,
      'body': json_compatible_string_to_return
   }

Assign IAM Policy

We must assign entitlement for parse_image to access S3 bucket btsdc-json-bucket and use Textract. We implemented the same entitlement previously. First, edit test_policy.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObjectVersion",
                "s3:DeleteObject",
                "s3:GetObjectVersion",
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "textract:DetectDocumentText",
                "textract:StartDocumentTextDetection",
                "textract:StartDocumentAnalysis",
                "textract:AnalyzeDocument",
                "textract:GetDocumentTextDetection",
                "textract:GetDocumentAnalysis"
            ],
            "Resource": "*"
        }
    ]
}

Click the “Review policy” button and then save the changes. You may be prompted to remove an old version of test_policy.

Next, attach the policy. Navigate to IAM ➤ Roles ➤ Snowflake_External_Function ➤ Add permissions ➤ Attach policies. You may also find a default role created for parse_image, named something like parse_image-role-eu6asv26, to which test_policy should be attached.

Set the checkbox for test_policy and then click the “Attach policies” button. Do not forget to check that the policy has been attached. Navigate to IAM ➤ Roles ➤ Snowflake_External_Function and check that test_policy appears under the “Permissions policies” section.

Invoke Lambda Image

Let’s reuse btsdc-json-bucket; your S3 bucket name will differ. Upload a PNG file to your corresponding S3 bucket, then from the Snowflake workspace, refresh the stage if required and ensure the file is visible.
ALTER STAGE TEST.public.json_stage REFRESH;
SELECT * FROM DIRECTORY ( @TEST.public.json_stage );
SELECT * FROM v_strm_test_public_json_stage;

There is a limitation with the get_presigned_url function insofar as the stage must be referenced as a fully qualified literal stage name declared as an object, not passed through as a quoted string. For more information, see the documentation at https://docs.snowflake.com/en/sql-reference/functions/get_presigned_url.html#get-presigned-url .

Using the get_presigned_url function, we now invoke parse_image. Replace <your_file_here> with the file loaded into btsdc-json-bucket.
SELECT get_image_data(0, get_presigned_url(@TEST.public.json_stage, get_relative_path(@TEST.public.json_stage, 's3://btsdc-json-bucket/<your_file_here>.png')));
Figure 10-17 shows the expected results; yours will differ, and the address has been deliberately blanked out.
Figure 10-17

Sample PNG to JSON

The JSON document returns fields and values, but due to the way Textract operates, not every key/value pair is returned. A full explanation is beyond the scope of this chapter, but briefly, text in images is bound by boxes before extraction, and it is the content of each box that is returned. Examine the returned JSON for information. Note that trailing spaces are significant when matching specific fields.

In the following code sample , replace <your_string> with the text to search for and <your_file_here> with the file loaded into btsdc-json-bucket.
SELECT parse_json(get_image_data(0, get_presigned_url(@TEST.public.json_stage, relative_path))):"<your_string>"::string
FROM   v_strm_test_public_json_stage
WHERE  relative_path = '<your_file_here>.png';

Automation

Our next step is to automate document JSON extraction and parsing, an exercise left for you to complete using your knowledge of streams and tasks. A starting sketch follows.
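Purely as a sketch, and using hypothetical names (strm_image_stage, img_text_results, task_parse_images), a second stream on the stage can feed a task that calls get_image_data for each new PNG file; remember that each call invokes Textract and counts toward any Free Tier limits.
CREATE OR REPLACE STREAM strm_image_stage ON STAGE TEST.public.json_stage;
CREATE OR REPLACE TABLE img_text_results
(
relative_path VARCHAR,
parsed_at     TIMESTAMP_LTZ,
image_data    VARIANT
);
CREATE OR REPLACE TASK task_parse_images
WAREHOUSE = COMPUTE_WH
SCHEDULE  = '1 minute'
WHEN system$stream_has_data ( 'STRM_IMAGE_STAGE' )
AS
INSERT INTO img_text_results ( relative_path, parsed_at, image_data )
SELECT relative_path,
       current_timestamp(),
       get_image_data ( 0, get_presigned_url ( @TEST.public.json_stage, relative_path ))
FROM   strm_image_stage
WHERE  relative_path ILIKE '%.png';
ALTER TASK task_parse_images RESUME;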

Monitoring

In AWS Management Console , navigate to CloudWatch ➤ Log groups, where you see the /aws/lambda/parse_image log group. Click it to show the latest logs.

Note that upstream errors (i.e., those that do not cause Lambda to be invoked) are not recorded. Only Lambda invocations create log entries, and a logged invocation may still have failed.

Troubleshooting

Inevitably there will be times when we need to fix our code, and during the creation of this example, I made several mistakes, hence my note to use the tracking template. One “gotcha” stands out: in the Method Request settings, I failed to redeploy via Actions ➤ Deploy API, which is easily missed. After that, I encountered a further error relating to the API key; see https://docs.snowflake.com/en/sql-reference/external-functions-creating-aws-troubleshooting.html#request-failed-for-external-function-ext-func-with-remote-service-error-403-message-forbidden , to which the answer was to disable the API key requirement in the Method Request.

The last error encountered related to Lambda not having an entitlement to access Textract, specifically analyze_document, as identified in the CloudWatch logs. The solution was to add a policy containing appropriate entitlement to the Lambda function.

The following document offers solutions to common issues: https://docs.snowflake.com/en/sql-reference/external-functions-creating-aws-troubleshooting.html#troubleshooting-external-functions-for-aws .

And for AWS-specific issues, see https://docs.snowflake.com/en/sql-reference/external-functions-creating-aws-troubleshooting.html#label-external-functions-creating-aws-troubleshooting .

Cleanup

Apart from running cleanup scripts to remove our test code, we should also be mindful to periodically remove redundant files uploaded to stages, remembering that each file contributes toward storage costs.
USE ROLE      sysadmin;
USE DATABASE  TEST;
USE SCHEMA    public;
DROP TASK     task_load_json_data;
DROP DATABASE test;
USE ROLE      accountadmin;
DROP STORAGE INTEGRATION json_integration;
DROP API INTEGRATION document_integration;
REVOKE EXECUTE TASK ON ACCOUNT FROM ROLE sysadmin;

Summary

This chapter introduced semi-structured data, focusing on JSON, and loaded a sample file into the Snowflake VARIANT data type. We discussed techniques to handle source files greater than 16 MB, the maximum size a VARIANT can accept.

Noting the self-contained and complex nature of semi-structured data, we utilized materialized views to normalize it into a more easily understood format, leading to natural integration with our relational data model. A summary of use cases for direct query versus materialized view creation was also provided. Materialized view limitations were discussed before touching upon automation of semi-structured data ingestion.

Moving on to unstructured data, we dived into unfamiliar territory, looking inside AWS Management Console, configuring a Python Lambda function to be invoked automatically on file arrival, and copying and processing an image file into JSON format, thus demonstrating the capability to convert an unstructured image file into a semi-structured format.

Our next example delivered a “hello world” style Lambda function exposed as an external function accessible via Snowflake SQL, proving that end-to-end connectivity is not a trivial undertaking. We then adapted our working Lambda function to scrape content out of an image document, proving we can convert an image file into a JSON document, albeit with some limitations imposed by Amazon Textract, and then extract specific key/value pairs using SQL. I left automation as an exercise for you to implement using streams and tasks.

Having worked through semi-structured and unstructured data, we now look at the query optimizer and performance tuning.
