Building a serverless function as a data pipeline

So far, we have only used serverless functions as API endpoints, but they can serve in many other ways as well. For example, they can be triggered to run for each new file uploaded to a specific folder on S3, or scheduled to run at a specific time. 

Let's create one more application for data collection. We can specify that we need the requests library in requirements.txt. We can also copy and paste the _get_data function from Chapter 15, Packaging and Testing with Poetry and PyTest, along with the resource and time columns. One part of the code that we are still missing is that for uploading data to S3. Here is the code:

def _upload_json(obj, filename, bucket, key):
S3 = boto3.client('s3', region_name='us-east-1')
key += ('/' + filename)

S3.Object(Bucket=bucket, Key=key).put(Body=json.dumps(obj))

Having all the necessary pieces, let's pull them together as one function. Here is the code:

from datetime import date, timedelta

def get_data(event):
yesterday = date.today() - timedelta(days=1)

data = _get_data(resource, time_col, yesterday, offset=0)
_upload_json(data, f'{yesterday:%Y-%m-%d}.json', bucket=BUCKET, key=KEY)

Finally, all we need now is to add a chalice decorator, @app.schedule('rate(1 day)'). Here, instead of the app.route decorator, we use app.schedule and define a corresponding frequency. Here is how it will look as a whole:

import json
from datetime import date, timedelta
BUCKET, FOLDER = 'philipp-packt', '311/raw_data'
resource = 'fhrw-4uyv'
time_col = 'Created Date'
app = Chalice(app_name='collect-311')

def _upload_json(obj, filename, bucket, key):
S3 = boto3.client('s3', region_name='us-east-1')
key += ('/' + filename)

S3.Object(Bucket=bucket, Key=key).put(Body=json.dumps(obj))


def _get_data(resource, time_col, date, offset=0):
'''collect data from NYC open data
'''

Q = f"where=created_date between '{date:%Y-%m-%d}' AND '{date:%Y-%m-%d}T23:59:59.000'"
url = f'https://data.cityofnewyork.us/resource/{resource}.json?$limit=50000&$offset={offset}&${Q}'
r = rq.get(url)
r.raise_for_status()

data = r.json()
if len(data) == 50_000:
offset2 = offset + 50000
data2 = _get_data(resource, time_col, date, offset=offset2)
data.extend(data2)

return data

@app.schedule('rate(1 day)')
def get_data(event):
yesterday = date.today() - timedelta(days=1)
data = _get_data(resource, time_col, yesterday, offset=0)
_upload_json(data, f'{yesterday:%Y-%m-%d}.json', bucket=BUCKET, key=FOLDER)

Once deployed, this function will run every day, collecting data for the previous date and storing it as JSON in our S3 bucket. But can we go further and prepare medians for our prediction model, automatically? Let's find out in the next section.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.135.204.216