Chapter 6. Working with Data

Without question, one of the greatest paradigm shifts when working with cloud computing is the near-unlimited storage now available to users. Cheap, scalable blob storage, in the form of Google Cloud Storage (GCS), allows administrators to start from a standpoint of “never delete data”. The separation of compute and storage in BigQuery, Spark on Dataproc with persistence on GCS, and similar services extends this model: users pay for only what they use of the expensive component (compute) while storing as much as they like, generally saving on engineering effort. These recipes show tips and tricks for working with the various data layers of Google Cloud, from moving data around GCS buckets faster, to extending PubSub functionality, to automatically archiving long-term data.

6.1 Speeding up GCS Transfers - Multiprocessing

Problem

You want to increase the speed of a bulk transfer, either to/from GCS or within the GCS service.

Solution

You can leverage the ‘-m’ flag to parallelize your transfer across multiple processes and threads.

  1. Create a bucket and upload data to observe normal single-process transfer speed

    BUCKET_NAME=my-bucket-4312
    gsutil mb -l us-central1 gs://$BUCKET_NAME
    # can cancel once you see transfer speed, will move ~250GB total
    gsutil cp -r gs://gcp-public-data-landsat/LC08/01/044/034/* gs://$BUCKET_NAME
  2. Add ‘-m’ to multi-process your transfer and observe greatly increased speed.

    # can cancel once you see greatly increased transfer speed
    gsutil -m cp -r gs://gcp-public-data-landsat/LC08/01/044/034/* gs://$BUCKET_NAME
  3. Delete test files

    gsutil -m rm -r gs://$BUCKET_NAME/*
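
If you are working from Python rather than the command line, recent versions of the Cloud Storage client library offer similar parallelism through their transfer_manager module. The following is a minimal sketch, assuming the google-cloud-storage package is installed; the bucket name and local file names are placeholders.

    # Minimal sketch: parallel uploads with the Cloud Storage Python client,
    # similar in spirit to `gsutil -m cp`. Names below are placeholders.
    from google.cloud import storage
    from google.cloud.storage import transfer_manager

    client = storage.Client()
    bucket = client.bucket("my-bucket-4312")  # placeholder bucket name

    filenames = ["scene1.TIF", "scene2.TIF", "scene3.TIF"]  # placeholder local files

    # Upload the files concurrently using a pool of workers; worker count and
    # type can be tuned via the transfer_manager keyword arguments.
    results = transfer_manager.upload_many_from_filenames(
        bucket, filenames, source_directory="./data"
    )

    for name, result in zip(filenames, results):
        # Each result is either None (success) or the exception that was raised.
        if isinstance(result, Exception):
            print(f"Failed to upload {name}: {result}")
        else:
            print(f"Uploaded {name} to gs://{bucket.name}/{name}")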

6.2 Speeding up GCS Transfers - Parallel Composite Uploads for large files

Problem

You want to increase the speed of bulk transfers to or from GCS, particularly for large files.

Solution

You can leverage parallel composite uploads, set at the command line or in your .boto configuration file.

  1. Create a bucket and download a single file to use for test uploads

    BUCKET_NAME=my-bucket-4312
    gsutil mb -l us-central1 gs://$BUCKET_NAME
    # copy a largish file locally (~250MB)
    gsutil cp gs://gcp-public-data-landsat/LC08/01/044/017/LC08_L1GT_044017_20200809_20200809_01_RT/LC08_L1GT_044017_20200809_20200809_01_RT_B8.TIF .
  2. Upload the file as a single chunk and observe transfer speed.

    gsutil cp LC08_L1GT_044017_20200809_20200809_01_RT_B8.TIF gs://$BUCKET_NAME
  3. Upload the file as several simultaneous chunks and observe transfer speed. If your previous upload saturated your link, you may not see a performance increase.

    gsutil -o "GSUtil:parallel_composite_upload_threshold=200M" \
        -o "GSUtil:parallel_composite_upload_component_size=50M" \
        -o "GSUtil:parallel_process_count=8" \
        cp LC08_L1GT_044017_20200809_20200809_01_RT_B8.TIF gs://$BUCKET_NAME
  4. Delete the file

    gsutil rm gs://$BUCKET_NAME/LC08_L1GT_044017_20200809_20200809_01_RT_B8.TIF

Discussion

Parallel composite uploads split a large file into chunks, upload the chunks in parallel, and then compose them into a single object, which is particularly helpful when uploading large files. You can enable them at the command line or in your .boto configuration file. However, this requires that both the uploading and downloading environments have a compiled CRC32C library (crcmod) installed for integrity checking. There are additional caveats, such as a limit of 32 components per compose operation.
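
To see roughly what gsutil is doing under the hood, the following minimal sketch uses the Cloud Storage Python client to upload two chunks as separate objects and then compose them into a single object; a single compose request accepts at most 32 source objects. The bucket and file names are placeholders, and google-cloud-storage is assumed to be installed.

    # Minimal sketch of a composite upload with the Cloud Storage Python client.
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("my-bucket-4312")  # placeholder bucket name

    # Upload two chunks of a large file as separate objects.
    part1 = bucket.blob("bigfile.part1")
    part1.upload_from_filename("bigfile.part1")  # placeholder local chunk
    part2 = bucket.blob("bigfile.part2")
    part2.upload_from_filename("bigfile.part2")  # placeholder local chunk

    # Compose the chunks into the final object (at most 32 sources per compose call).
    final = bucket.blob("bigfile")
    final.compose([part1, part2])

    # Clean up the intermediate chunk objects.
    part1.delete()
    part2.delete()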

6.3 Adding event timestamps to PubSub

Problem

The publish timestamp on a PubSub message (`publishTime`) records when the message reached the PubSub service, which is not necessarily when the event actually occurred - particularly in offline or delayed-upload scenarios. You want to process based on event time, not publish time.

Solution

You can attach the event time as a PubSub message attribute, which can then be consumed by downstream applications.

  1. Generate a timestamp as part of your payload dictionary

    import datetime
    import random

    def generate_event():
        return {
            'user_id': random.randint(1, 100),
            'action': random.choice(['start', 'stop', 'rewind', 'download']),
            # RFC 3339 format so downstream consumers (e.g. Dataflow) can parse it
            'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
        }
  2. When you publish to PubSub, add a ‘timestamp’ attribute in addition to the encoded data

    import datetime
    import json

    def publish_burst(publisher, topic_path, buffer):
        for message in buffer:
            data = json.dumps(message).encode('utf-8')
            publisher.publish(topic_path, data, timestamp=message['timestamp'])
            print('Message for event {} published at {}'.format(
                message['timestamp'],
                datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')))
  3. Your PubSub messages now carry an event ‘timestamp’ attribute in addition to the automatically set ‘publishTime’ metadata, and downstream systems can use it to order data by when events actually occurred. Dataflow, for example, lets you use the event timestamp instead of the publish time with the following (in Java)

    pipeline.apply("ReadMessage", PubsubIO.readStrings()
                            .withTimestampAttribute("timestamp")
                            .fromTopic(options.getInputTopic()))

Discussion

You can set attributes on PubSub messages that are intelligently consumed downstream - in this case an event-timestamp attribute. Dataflow not only uses this timestamp in place of the publish timestamp for time-based processing such as windowing, it also uses this metadata to update its watermark - the internal mechanism by which it tracks how up-to-date the data is. See https://stackoverflow.com/questions/42169004/what-is-the-watermark-heuristic-for-pubsubio-running-on-gcd for more background.
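
Outside of Dataflow, any subscriber can read the attribute directly from each received message. Below is a minimal sketch using the PubSub Python client; the project and subscription IDs are placeholders.

    # Minimal sketch: reading the event 'timestamp' attribute on the subscriber side.
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path("my-project", "my-subscription")  # placeholders

    def callback(message):
        event_time = message.attributes.get("timestamp")  # event time we attached
        publish_time = message.publish_time               # set by the PubSub service
        print(f"event_time={event_time} publish_time={publish_time} data={message.data}")
        message.ack()

    streaming_pull = subscriber.subscribe(subscription_path, callback=callback)
    try:
        streaming_pull.result(timeout=30)  # listen for 30 seconds, then stop
    except Exception:
        streaming_pull.cancel()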

6.4 Mounting GCS as a File-System (sort of)

Problem

You want to use traditional filesystem-based tools to interact with GCS blobs and ‘directories’.

Solution

You can use GCS Fuse (gcsfuse), a beta-quality convenience tool, to mount a GCS bucket on your VM. These instructions are for Linux.

  1. Create a test VM:

    gcloud compute --project=dhodun1 instances create gcs-fuse-vm \
        --zone=us-central1-a --machine-type=e2-medium \
        --scopes=https://www.googleapis.com/auth/cloud-platform \
        --image=ubuntu-2004-focal-v20210315 --image-project=ubuntu-os-cloud \
        --boot-disk-size=100GB
  2. Create your bucket and populate some test data

    BUCKET_NAME=my-bucket-4312
    gsutil mb -l us-central1 gs://$BUCKET_NAME
    gsutil -m cp -r gs://gcp-public-data-landsat/LC08/01/044/034/LC08_L1GT_044034_20130330_20170310_01_T2 gs://$BUCKET_NAME
  3. SSH onto the VM

  4. Install gcsfuse (Ubuntu / Debian latest releases)

    export GCSFUSE_REPO=gcsfuse-`lsb_release -c -s`
    echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
    curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
    sudo apt-get update
    sudo apt-get install -y gcsfuse
  5. Check the current gcloud credentials to see what GCS fuse will use

    gcloud auth list
  6. Create mount point and mount:

    BUCKET_NAME=my-bucket-4312
    mkdir $BUCKET_NAME
    gcsfuse --implicit-dirs $BUCKET_NAME $BUCKET_NAME
  7. Add a file and test that it’s indeed on GCS

    ls $BUCKET_NAME
    cd $BUCKET_NAME
    echo "gcs fuse is cool!" > file.txt
    cd
    gsutil cat gs://$BUCKET_NAME/file.txt
  8. Add an entry to /etc/fstab to mount the bucket automatically at boot

    sudo vim /etc/fstab
    # add a line like the following (adjust the mount point, uid, and gid for your user):
    my-bucket-4312 /home/dhodun/my-bucket-4312 gcsfuse rw,implicit_dirs,x-systemd.requires=network-online.target,allow_other,uid=1001,gid=1002
  9. Restart VM and test

    sudo shutdown -r now

Discussion

GCS Fuse is a great beta-quality convenience tool for mounting and accessing GCS blobs as if they were files. It is not a production replacement for a traditional file store (see Cloud Filestore), and it lacks many features of a true POSIX filesystem, as well as some GCS features such as object metadata. Also note that you will still incur normal GCS storage and operation charges, which can be significant for Nearline and Coldline (archival) classes. See https://cloud.google.com/storage/docs/gcs-fuse#notes.
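
Because gcsfuse simply maps filesystem calls onto object operations, anything written through the mount is an ordinary GCS object. The following minimal sketch with the Cloud Storage Python client reads back the file created above; the bucket name is a placeholder and google-cloud-storage is assumed to be installed.

    # Minimal sketch: read back a file written through the gcsfuse mount.
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("my-bucket-4312")  # placeholder bucket name

    # 'file.txt' was created through the filesystem mount, but it is a normal object.
    blob = bucket.blob("file.txt")
    print(blob.download_as_text())  # prints: gcs fuse is cool!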

6.5 Designing your Schema for Spanner with Interleaved Tables

Problem

You want to speed up certain queries in Spanner, and the child tables in your schema design contain primary keys from the parent table.

Solution

You can leverage “Interleaved Tables” in Spanner to co-locate child data on disk with its parent data, based on a shared primary key prefix. This is an alternative to foreign keys.

  1. Normally, for a basic, abbreviated star-schema setup, you would create tables with related primary keys, as follows.

    -- Schema hierarchy:
    -- + Customers (sibling table of Orders)
    -- + Orders (sibling table of Customers)
    CREATE TABLE Customers (
      CustomerId INT64 NOT NULL,
      FirstName  STRING(1024),
      LastName   STRING(1024),
      Address    STRING(1024),
    ) PRIMARY KEY (CustomerId);
    CREATE TABLE Orders (
      OrderId       INT64 NOT NULL,
      CustomerId    INT64 NOT NULL,
      OrderTotal    INT64 NOT NULL,
      QuantityItems INT64 NOT NULL,
      OrderItems    ARRAY<INT64>,
    ) PRIMARY KEY (CustomerId, OrderId);
  2. In Cloud Spanner, you can declare that one table is a child of another, so long as the child's primary key includes the parent's primary key columns as a prefix. In this case, all the orders for a given CustomerId are laid out on disk under that customer's row.

    -- Schema hierarchy:
    -- + Customers
    --   + Orders (interleaved table, child table of Customers)
    CREATE TABLE Customers (
      CustomerId INT64 NOT NULL,
      FirstName  STRING(1024),
      LastName   STRING(1024),
      Address    STRING(1024),
    ) PRIMARY KEY (CustomerId);
    CREATE TABLE Orders (
      OrderId       INT64 NOT NULL,
      CustomerId    INT64 NOT NULL,
      OrderTotal    INT64 NOT NULL,
      QuantityItems INT64 NOT NULL,
      OrderItems    ARRAY<INT64>,
    ) PRIMARY KEY (CustomerId, OrderId),
      INTERLEAVE IN PARENT Customers ON DELETE CASCADE;

Discussion

Interleaving stores child rows physically next to their associated parent row, which can greatly speed up some queries. If you want to perform analytics on a given customer - for example, their total spend last year, or how many items they have ever bought - these queries complete much faster with interleaved tables because all the necessary data is physically co-located. The alternative is for the database engine to perform a standard key join on CustomerId to find all of that customer's rows in the Orders table, which can be a more expensive (slower) operation if that table is organized by timestamp, for example.
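
To make this concrete, a query of the kind that benefits from interleaving - aggregating a single customer's orders - might look like the following minimal sketch with the Spanner Python client. The instance and database IDs are placeholders, and the schema follows the example above; google-cloud-spanner is assumed to be installed.

    # Minimal sketch: an aggregate query over one customer's orders, which reads
    # rows that interleaving has co-located under that CustomerId.
    from google.cloud import spanner

    client = spanner.Client()
    instance = client.instance("my-instance")    # placeholder instance ID
    database = instance.database("my-database")  # placeholder database ID

    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            "SELECT SUM(OrderTotal) AS total_spend, COUNT(*) AS num_orders "
            "FROM Orders WHERE CustomerId = @customer_id",
            params={"customer_id": 42},
            param_types={"customer_id": spanner.param_types.INT64},
        )
        for row in results:
            print(f"total_spend={row[0]} num_orders={row[1]}")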

6.6 Automatically archiving and deleting objects on GCS

Problem

You want to automatically handle lifecycle management of GCS objects - namely, changing the storage class to cheaper archival classes as objects get older and deleting the oldest objects according to a policy.

Solution

You can apply lifecycle management at the GCS bucket level, with policies based on an object's age (or other attributes) that change its storage class, delete it, or take other actions.

  1. Create a bucket to be managed:

    BUCKET_NAME=my-bucket-4312
    gsutil mb -l us-central1 gs://$BUCKET_NAME
  2. Create a ‘lifecycle_config.json’ file with your bucket's policy. This policy will move Standard and Durable Reduced Availability (DRA) objects to Nearline after 1 year, Nearline objects to Coldline after 3 years, and delete objects after 7 years. It will not archive Multi-Regional objects - in this case they are assumed to be serving objects.

    {
        "lifecycle": {
            "rule": [
                {
                    "condition": {
                        "age": 365,
                        "matchesStorageClass": ["STANDARD", "DURABLE_REDUCED_AVAILABILITY"]
                    },
                    "action": {
                        "type": "SetStorageClass",
                        "storageClass": "NEARLINE"
                    }
                },
                {
                    "condition": {
                        "age": 1096,
                        "matchesStorageClass": ["NEARLINE"]
                    },
                    "action": {
                        "type": "SetStorageClass",
                        "storageClass": "COLDLINE"
                    }
                },
                {
                    "condition": {
                        "age": 2555,
                        "matchesStorageClass": ["COLDLINE"]
                    },
                    "action": {
                        "type": "Delete"
                    }
                }
            ]
        }
    }
  3. Apply the policy to the bucket:

    gsutil lifecycle set lifecycle_config.json gs://$BUCKET_NAME
  4. Check the lifecycle

    gsutil lifecycle get gs://$BUCKET_NAME
  5. To remove lifecycle management, apply a lifecycle config with no rules (saved here as ‘no_lifecycle.json’):

    {
        "lifecycle": {
            "rule": []
        }
    }
  6. Apply:

    gsutil lifecycle set no_lifecycle.json gs://$BUCKET_NAME

Discussion

Lifecycle management is a powerful, simple way to save storage costs when managing a large fleet of objects. It is important to understand the different archival classes, in particular the minimum storage durations for classes like Nearline and Coldline. For example, Coldline has a minimum storage duration of 90 days - if you delete an object 1 minute after creating it, you will still be charged as if it had been stored for 90 days. For this reason, and because some lifecycle rules can permanently delete data, tight control and review of these policies is prudent.
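
If you prefer to manage lifecycle rules in code rather than JSON files, the Cloud Storage Python client exposes the same settings. The following is a minimal sketch that recreates the policy above; the bucket name is a placeholder and google-cloud-storage is assumed to be installed.

    # Minimal sketch: set the same lifecycle policy with the Cloud Storage Python client.
    from google.cloud import storage

    client = storage.Client()
    bucket = client.get_bucket("my-bucket-4312")  # placeholder bucket name

    # Standard/DRA -> Nearline after 1 year.
    bucket.add_lifecycle_set_storage_class_rule(
        "NEARLINE", age=365,
        matches_storage_class=["STANDARD", "DURABLE_REDUCED_AVAILABILITY"])
    # Nearline -> Coldline after 3 years.
    bucket.add_lifecycle_set_storage_class_rule(
        "COLDLINE", age=1096, matches_storage_class=["NEARLINE"])
    # Delete Coldline objects after 7 years.
    bucket.add_lifecycle_delete_rule(age=2555, matches_storage_class=["COLDLINE"])
    bucket.patch()

    # To remove all lifecycle rules later:
    # bucket.lifecycle_rules = []
    # bucket.patch()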

6.7 Locking down Firestore Database so a User can edit only their data

Problem

You want to secure Firestore so that an authenticated user can create a document and edit only their own documents.

Solution

Firestore security rules can not only check whether a user is authenticated, but also match their user ID against documents in the database, allowing for a simple per-user authorization model.

  1. Initialize Firebase in your main project directory

    firebase init
  2. Edit the generated “.rules” file, in our case firestore.rules. This config allows users to edit their own user document (and only theirs) by matching the authenticated userId with the userId of the document in the users collection.

    rules_version = '2';
    service cloud.firestore {
        
        // Matches to the default database in the project - currently this is the only database
        match /databases/{database}/documents {
            // Matches the userId on the authenticated request with the userId document
            // in the users collection
            // Otherwise, allows authenticated users to create their document
            match /users/{userId} {
                allow read, update, delete: if request.auth != null && request.auth.uid == userId;
                allow create: if request.auth != null;
            }
        }
    }
  3. Apply the rules file (without redeploying the entire project)

    firebase deploy --only firestore:rules

Discussion

Firestore rules and validation provide robust but concise functionality for controlling who can read or edit what in the Firestore database. This includes setting public access for certain fields, validating that newly written data is formed correctly, and the per-user model shown above. Note that Firestore rules are not filters: in the case above, you could not run a query as this user for all user documents and expect the security rule to return only this user's document. That query would fail, because most documents are not accessible to the user. You would need to query specifically for this user's document.

Recipe ideas:

  • Gsutil rsync

  • Pubsub dedupe?

  • Pubsub snapshot

  • Mysql something?
