Chapter 7. BigQuery and Data Warehousing

With more enterprises leaning on real-time data and analytics to drive business decisions, BigQuery and data warehousing techniques are critical. As Google Cloud’s serverless, petabyte-scale data warehouse, BigQuery is often your first and last stop for data storage, large-scale analytics, and even SQL-based ML models.

The following recipes show examples of implementing data loading, scalable data querying, and streaming in BigQuery. Included are tips and tricks that go above and beyond standard SQL and are specific to the BigQuery service and its implementation.

All code samples for this chapter are located at https://github.com/ruiscosta/google-cloud-cookbook/chapter-8. You can follow along and copy the code for each individual recipe by going to the folder with that recipe’s number.

7.1 Building a Pivot Table in BigQuery

Problem

You want to build a traditional pivot table in BigQuery, but need to do so in SQL.

Solution

You can build a query with conditional aggregation, using IF() or CASE statements. Alternatively, you can use a publicly available stored procedure, fhoffa.x.pivot().

  1. Implement a series of IF statements, in this case wrapped in the SUM() aggregation function since we want a raw count.

    #standardSQL
    SELECT
      datehour,
      SUM(IF(title LIKE '%google%', 1, NULL)) google_views,
      SUM(IF(title LIKE '%cloud%', 1, NULL)) cloud_views,
      SUM(IF(title LIKE '%maps%', 1, NULL)) maps_views
    FROM
      `bigquery-public-data`.wikipedia.pageviews_2020
    WHERE
      EXTRACT(date FROM datehour) = "2020-01-02"
    GROUP BY
      datehour
    ORDER BY
      datehour ASC
  2. Outputs:

Fig caption
  3. Alternatively, you can use the publicly shared stored procedure fhoffa.x.pivot().

    CALL fhoffa.x.pivot(
      'bigquery-public-data.iowa_liquor_sales.sales' # source table
      , 'fh-bigquery.temp.test' # destination table
      , ['date'] # row_ids
      , 'store_number' # pivot_col_name
      , 'sale_dollars' # pivot_col_value
      , 30 # max_columns
      , 'SUM' # aggregation
      , '' # optional_limit
    );

Discussion

If you are summing across a particular column, use the SUM() function as described above. If you’re pulling out the values themselves, you would use AVG(), MAX(), or ANY_VALUE(). For example, if you wanted to surface an article title containing “google” and put it in the “google_views” column, you would use MAX(IF(title LIKE '%google%', title, NULL)) google_views, as in the sketch below.
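The following is a minimal sketch of that variant, reusing the Wikipedia pageviews query from the Solution; the column names are kept from the example above even though they now hold titles rather than counts.

    #standardSQL
    SELECT
      datehour,
      -- keep a matching article title per hour instead of a count
      MAX(IF(title LIKE '%google%', title, NULL)) google_views,
      MAX(IF(title LIKE '%cloud%', title, NULL)) cloud_views
    FROM
      `bigquery-public-data`.wikipedia.pageviews_2020
    WHERE
      EXTRACT(date FROM datehour) = "2020-01-02"
    GROUP BY
      datehour
    ORDER BY
      datehour ASC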

Many pivot use cases can be handled with the public stored procedure fhoffa.x.pivot(). More information can be found at https://towardsdatascience.com/easy-pivot-in-bigquery-one-step-5a1f13c6c710.

7.2 Adding Partitioned and Clustered Columns to an Existing Table

Problem

You have an existing BigQuery table, and you need to add a partitioning column and clustering columns to it to increase query performance and decrease query costs.

Solution

BigQuery standard SQL allows you to create a new table with the partitioning and clustering columns and populate it in the same statement with an AS clause. Note that this creates a new table rather than modifying the original in place.

  1. Validate that the source table doesn’t have partitioning or clustering columns. Notice the lack of those columns on the right of the output:

    bq show bigquery-public-data:london_bicycles.cycle_hire
  2. Outputs:

Fig caption
  3. Dry-run a time-based query to see how much data is scanned.

    # pre-partitioned dry run
    QUERY='
    SELECT
      duration,
      bike_id,
      start_date,
      start_station_name,
      end_date,
      end_station_name
    FROM
      `bigquery-public-data.london_bicycles.cycle_hire`
    WHERE
      EXTRACT(DATE FROM start_date ) = "2016-04-03"
      AND EXTRACT(DATE FROM end_date ) = "2016-04-03"
    ORDER BY
      duration DESC
    LIMIT
      5
    '
    bq query \
    --use_legacy_sql=false \
    --location=EU \
    --dry_run \
    "$QUERY"
  4. Create a new table in your dataset, specifying the partitioning and clustering columns.

    CREATE OR REPLACE TABLE
      mydataset.cycle_hire_partitioned_clustered
    PARTITION BY
      DATE(start_date)
    CLUSTER BY
      bike_id
    OPTIONS( expiration_timestamp=TIMESTAMP "2025-01-01 00:00:00 UTC",
         description="My partitioned and clustered cycle_hire table",
         labels=[("cookbook_query", "development")] )
    AS
    SELECT
      *
    FROM
      `bigquery-public-data.london_bicycles.cycle_hire`
  5. Validate that the destination table does have partitioning and clustering columns.

    bq show mydataset.cycle_hire_partitioned_clustered
Fig caption
  6. Dry-run the same time-based query on the new table.

    # post-partitioned dry run
    QUERY='
    SELECT
      duration,
      bike_id,
      start_date,
      start_station_name,
      end_date,
      end_station_name
    FROM
      `mydataset.cycle_hire_partitioned_clustered`
    WHERE
      EXTRACT(DATE FROM start_date ) = "2016-04-03"
      AND EXTRACT(DATE FROM end_date ) = "2016-04-03"
    ORDER BY
      duration DESC
    LIMIT
      5
    '
    bq query \
    --use_legacy_sql=false \
    --location=EU \
    --dry_run \
    "$QUERY"
    # Should output that the query will process about 2 MB

7.3 Selecting the Top-1 Result (Scalably)

Problem

You want to return the top item from a sorted list in a BigQuery query, but your dataset is particularly large and is causing issues with the ROW_NUMBER() OVER() window function, since it forces a full sort of the dataset.

Solution

You can apply a trick with ARRAY_AGG(x LIMIT 1)[OFFSET(0)], which allows BigQuery to drop all data that isn’t the number-one row, increasing query performance.

  1. Run a query using the ROW_NUMBER() OVER() window function. This query should succeed but will take a while.

    SELECT
      rental_id,
      duration,
      bike_id,
      end_date
    FROM (
      SELECT
        rental_id,
        duration,
        bike_id,
        end_date,
        ROW_NUMBER() OVER (ORDER BY end_date ASC) rental_num
    
      FROM
        `bigquery-public-data`.london_bicycles.cycle_hire )
    WHERE
      rental_num = 1
  2. Run the equivalent query using the ARRAY_AGG(... LIMIT 1)[OFFSET(0)] trick. This query should succeed more quickly.

    SELECT
      rental.*
    FROM (
      SELECT
        ARRAY_AGG( rentals
        ORDER BY rentals.end_date ASC LIMIT 1)[OFFSET(0)] rental
      FROM (
        SELECT
          rental_id,
          duration,
          bike_id,
          end_date
        FROM
          `bigquery-public-data`.london_bicycles.cycle_hire) rentals )

Discussion

While ROW_NUMBER() has a scalable implementation that can perform distributed sorts across multiple BigQuery nodes, the entire sort still needs to be calculated. On particularly large datasets, using ARRAY_AGG with ORDER BY and LIMIT instead allows BigQuery to drop unneeded data in flight.
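The same pattern also generalizes beyond a single top row: as a hedged sketch (not part of the original recipe), you can keep the top N rows by raising the LIMIT inside ARRAY_AGG and flattening the result with UNNEST.

    -- Sketch: keep the three earliest rentals instead of only the first;
    -- BigQuery can still drop everything outside the top three in flight.
    SELECT
      rental.*
    FROM (
      SELECT
        ARRAY_AGG( rentals
        ORDER BY rentals.end_date ASC LIMIT 3) top_rentals
      FROM (
        SELECT
          rental_id,
          duration,
          bike_id,
          end_date
        FROM
          `bigquery-public-data`.london_bicycles.cycle_hire) rentals ),
      UNNEST(top_rentals) rental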

7.4 De-duplicating Rows in BigQuery with Row Key

Problem

You are loading data into an existing BigQuery table that might introduce duplicate rows. This is common with repeated batch uploads, or when consolidating from a streaming ingest table that might contain “newer” versions of a given record or aggregation.

Solution

Using a MERGE statement with an ON join key and a WHEN NOT MATCHED...THEN INSERT clause, you can easily indicate to BigQuery which key to de-duplicate on and insert only the new rows into your final table.

  1. Create a production table.

    bq cp -f bigquery-public-data:london_bicycles.cycle_hire mydataset.cycle_hire
  2. Create a loading table with new and duplicate data:

    CREATE OR REPLACE TABLE
      mydataset.temp_loading_table AS (
      --Grab 5 duplicate rows
      SELECT
        *
      FROM
        mydataset.cycle_hire
      LIMIT
        5)
    UNION ALL (
    
      --Add a new unique row
      SELECT
        111147469109,
        3180,
        7054,
        '2015-09-03 12:45:00 UTC',
        111,
        'Park Lane, Hyde Park',
        '2015-09-03 11:52:00 UTC',
        300,
        'Serpentine Car Park, Hyde Park',
        NULL,
        NULL,
        NULL)
  3. Verify the counts in both tables:

    --Number of Rows in base table
    SELECT COUNT(*) FROM mydataset.cycle_hire;
    --Number of Rows in loading table
    SELECT COUNT(*) FROM mydataset.temp_loading_table;
  4. Merge the data into your production table with an ON clause

    MERGE
      mydataset.cycle_hire rentals
    USING
      mydataset.temp_loading_table temp
    ON
      temp.rental_id = rentals.rental_id
      WHEN NOT MATCHED
      THEN
        INSERT ROW
  5. Verify only 1 row has been inserted:

    --Number of rows now in base table
    SELECT COUNT(*) FROM mydataset.cycle_hire;

Discussion

This recipe shows how to insert data in a way that doesn’t introduce duplicates. For an example of how to filter out duplicates inline in a query, use the ARRAY_AGG pattern from the next recipe, but without any need for sorting in the ARRAY_AGG function (see the sketch at the end of this discussion).

It’s also worth noting that you can accomplish this with SELECT DISTINCT * FROM mydataset.mytable, which will ensure every record is entirely unique (not just the key), but this is often a more costly query.
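As a minimal sketch (assuming the mydataset.cycle_hire table used in this recipe), the inline de-duplication mentioned above looks like the following; there is no ORDER BY inside ARRAY_AGG because any one of the identical duplicates will do.

    -- Sketch: keep exactly one row per rental_id, dropping duplicate rows.
    SELECT
      rental.*
    FROM (
      SELECT
        rental_id,
        ARRAY_AGG(rentals LIMIT 1)[OFFSET(0)] rental
      FROM
        mydataset.cycle_hire rentals
      GROUP BY
        rental_id )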

7.5 De-duplicating Rows in BigQuery with Timestamp

Problem

Your application is inserting updated rows or aggregations into a BigQuery table rather than updating existing ones for performance reasons, and you need to select just the newest data. This is common in streaming applications and frameworks, such as Dataflow.

Solution

You could do this by grouping on the key(s) and keeping the first row per group ordered by timestamp (for example, with the ROW_NUMBER() approach from Recipe 7.3), but for a more scalable approach we apply the ARRAY_AGG(x LIMIT 1)[OFFSET(0)] trick from that recipe so that older data is simply dropped by BigQuery workers in-flight.

  1. Create a production table.

    bq --location=eu cp -f bigquery-public-data:london_bicycles.cycle_hire mydataset.cycle_hire
  2. Insert two “new” rows with the same rental_id but updated duration and end_date values:

    INSERT INTO
      mydataset.cycle_hire
    VALUES
      ( 47469109, 3300, 7054, '2015-09-03 12:47:00 UTC', 111, 'Park Lane, Hyde Park', '2015-09-03 11:52:00 UTC', 300, 'Serpentine Car Park, Hyde Park', NULL, NULL, NULL ),
      ( 47469109, 3660, 7054, '2015-09-03 12:53:00 UTC', 111, 'Park Lane, Hyde Park', '2015-09-03 11:52:00 UTC', 300, 'Serpentine Car Park, Hyde Park', NULL, NULL, NULL )
  3. View all three records

    SELECT
      *
    FROM
      mydataset.cycle_hire
    WHERE
      rental_id = 47469109
    ORDER BY
      end_date DESC
  4. Use ARRAY_AGG to get the latest one

    SELECT
      latest_record.*
    FROM (
      SELECT
        rental_id,
        ARRAY_AGG(rentals ORDER BY end_date DESC LIMIT 1)[OFFSET(0)] latest_record
      FROM
        mydataset.cycle_hire rentals
      WHERE
        rental_id = 47469109
      GROUP BY
        rental_id )
  5. Remove the WHERE clause to query over the entire dataset.

Discussion

This trick adds an outer SELECT clause, mostly to clean up the nested output from ARRAY_AGG, but it is a scalable way of cleaning up data when newer records exist. It can be hidden from the user by placing the query in a view or materialized view for end-user analysts or downstream programs to consume from.
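For example, a view along the following lines would do this; the view name cycle_hire_latest is an assumption for illustration.

    -- Sketch: expose only the latest record per rental_id behind a view.
    CREATE OR REPLACE VIEW
      mydataset.cycle_hire_latest AS
    SELECT
      latest_record.*
    FROM (
      SELECT
        rental_id,
        ARRAY_AGG(rentals ORDER BY end_date DESC LIMIT 1)[OFFSET(0)] latest_record
      FROM
        mydataset.cycle_hire rentals
      GROUP BY
        rental_id )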

This use case is particularly common with streaming systems or any high-throughput systems that produce newer versions of data or aggregations. While BigQuery does support ACID-compliant UPDATE and DELETE DML statements, so that individual records can be mutated and updated, these are generally costly operations since the files backing BigQuery are immutable (an update triggers entire file rewrites) and the system overall is tuned for OLAP. For this reason, some scalable streaming systems opt to simply write new records to BigQuery and let BigQuery perform the de-duplication as above.

7.6 Un-deleting a Table in BigQuery

Problem

You have accidentally deleted a table in BigQuery and need to recover it.

Solution

You can use snapshot decorators with the bq tool to recover the data into a new table. This also works if you have already re-created the table; in that case you can alternatively use time travel in SQL, as shown in the Discussion.

  1. Create a dummy table:

    CREATE OR REPLACE TABLE
      mydataset.cycle_hire AS
    SELECT
      *
    FROM
      `bigquery-public-data`.london_bicycles.cycle_hire
  2. Note the current UTC time in Unix seconds:

    date -u +%s
  3. “Accidentally” delete the table:

    DROP TABLE mydataset.cycle_hire
  4. Restore the data into a temporary staging table. Note that we multiply the Unix seconds by 1000 since the snapshot decorator needs milliseconds.

    # Replace TIMESTAMP_MS with the Unix seconds from step 2, multiplied by 1000
    bq --location=eu cp mydataset.cycle_hire@TIMESTAMP_MS mydataset.cycle_hire_restored

Discussion

This fix should also work if you have already replaced the table, which occasionally happens if you have automated pipelines configured to create the table if it doesn’t exist. In this situation, you can also use BigQuery “time travel” with FOR SYSTEM_TIME AS OF in SQL to recover the data:

SELECT
  *
FROM
  mydataset.cycle_hire
FOR SYSTEM_TIME AS OF '2020-12-29 21:44:09.413928 UTC'
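If you want to materialize that historical state into its own table directly from SQL, a sketch like the following should work; the table name cycle_hire_recovered is an assumption, and you would substitute your own timestamp.

CREATE OR REPLACE TABLE
  mydataset.cycle_hire_recovered AS
SELECT
  *
FROM
  mydataset.cycle_hire
FOR SYSTEM_TIME AS OF '2020-12-29 21:44:09.413928 UTC'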

7.7 Streaming JSON or Avro Data into BigQuery with a Dataflow Template

Problem

You have well-formed JSON or Avro data on a Pub/Sub topic, and you want to ingest it in real time into a BigQuery table.

Solution

Configuring and executing a Dataflow template quickly spins up a persistent streaming Dataflow job that achieves this with little to no code.

  1. Sample a message from the public taxi data topic:

    gcloud pubsub subscriptions create taxi-test-sub --topic projects/pubsub-public-data/topics/taxirides-realtime
    gcloud pubsub subscriptions pull projects/<your-project-id>/subscriptions/taxi-test-sub
  2. This returns JSON message data like the following:

    {"ride_id":"7128ff90-62f7-42de-a93f-67d1f9e7c713","point_idx":2401,"latitude":40.74163,"longitude":-73.95092000000001,"timestamp":"2021-01-06T20:56:11.93239-05:00","meter_reading":55.330975,"meter_increment":0.02304497,"ride_status":"enroute","passenger_count":2}
  3. Create the destination BigQuery tables: one for well-formed data and one dead-letter table. The main table needs a schema that matches the JSON data on the Pub/Sub topic.

    cat <<EOF > schema.json
    [
          {
            "name": "ride_id",
            "type": "STRING"
          },
          {
            "name": "point_idx",
            "type": "INTEGER"
          },
          {
            "name": "latitude",
            "type": "FLOAT"
          },
          {
            "name": "longitude",
            "type": "FLOAT"
          },
          {
            "name": "timestamp",
            "type": "TIMESTAMP"
          },
          {
            "name": "meter_reading",
            "type": "FLOAT"
          },
          {
            "name": "meter_increment",
            "type": "FLOAT"
          },
          {
            "name": "ride_status",
            "type": "STRING"
          },
          {
            "name": "passenger_count",
            "type": "INTEGER"
          }
        ]
    EOF
    bq mk --table mydataset.taxi_data schema.json
    bq mk --table mydataset.taxi_deadletter
  4. Create a temporary GCS bucket for the Dataflow Job:

    BUCKET=gs://my-bucket
    gsutil mb $BUCKET
  5. In the Google Cloud Console, open the Dataflow page and click Create Job From Template:

  6. Add a job name and choose “Pub/Sub Topic to BigQuery”. Note the other templates that are available.

Fig caption
  7. For Input Pub/Sub Topic, point to the topic:

projects/pubsub-public-data/topics/taxirides-realtime
  8. Add the BigQuery output table and the temporary GCS bucket for the Dataflow job:

Fig caption
  9. Optionally, click SHOW OPTIONAL PARAMETERS and fill in the dead-letter BigQuery table. Also take note of other parameters that are available.

Fig caption
  10. Click RUN JOB.

  11. Give the job a couple of minutes to spin up, then click the “ReadPubSubTopic” and “WriteSuccessfulRecords” graph nodes to confirm that elements are flowing through the Dataflow job:

Fig caption
  12. Validate that data is being written to BigQuery:

    bq head mydataset.taxi_data
  13. Make sure to stop the Dataflow job when you are finished.

Discussion

This recipe shows how to quickly get up and running with your first streaming pipeline. You can extend the functionality to Avro data by using the Avro template. You can also provide a JavaScript transform in the additional parameters for lightweight transformations that don’t require you to write the entire pipeline code. Any records that fail to be transformed or processed will be written to the dead-letter table for later analysis (you can test this with your own topic by publishing a malformed message; it should show up quickly in this table). Lastly, if you inspect the details of the BigQuery table in the UI, you’ll note that some of your data has been committed to the “streaming buffer” but is still immediately available for queries.
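If you would rather launch the template from the command line than from the Console, something like the following gcloud command should work as a sketch; the template path and the parameter names (inputTopic, outputTableSpec, outputDeadletterTable) are assumptions based on the classic “Pub/Sub Topic to BigQuery” template, so verify them against the template documentation for your region.

    # Sketch: launch the streaming template with gcloud instead of the Console.
    # Substitute your own project, dataset, and bucket names.
    gcloud dataflow jobs run taxi-to-bq \
      --region=us-central1 \
      --gcs-location=gs://dataflow-templates/latest/PubSub_to_BigQuery \
      --staging-location=gs://my-bucket/temp \
      --parameters=inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec=<your-project-id>:mydataset.taxi_data,outputDeadletterTable=<your-project-id>:mydataset.taxi_deadletter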
