S3-triggered events

So far, we have written Lambda functions that are driven by API requests or run on a schedule. Now let's complete our application's data cycle by adding an S3-triggered task that will compute the medians for each category every time a new dataset is collected (in other words, once the scheduled task has finished).

In order to do so, we'll add one more operation to the same data collection application that we've been working on:

  1. First, let's redefine the median computation. The original code also used pandas, so we have two options: use NumPy, as we just did for the ML part, or use vanilla Python. As our raw data collection doesn't need NumPy, let's stick with the second option. Here is the no-dependency code for the medians (we'll run a quick sanity check on it after the full listing at the end of this section):
from statistics import median
from datetime import datetime

parsestr = '%Y-%m-%d %H:%M:%S'

def _calc_medians(data):
    results = {}
    for record in data:
        ct = record['complaint_type']
        if ct not in results:
            results[ct] = []

        spent = (datetime.strptime(record['closed_date'], parsestr) -
                 datetime.strptime(record['created_date'], parsestr))
        # total_seconds covers requests that stay open for more than a day
        spent = spent.total_seconds() / 3600  # hours
        results[ct].append(spent)

    return {k: median(v) for k, v in results.items()}
  2. Now we'll add two more functions:
    • One will check whether a new object fits the pattern for the data we're interested in (the trigger will be executed on any file creation event in the bucket).
    • The second one will pull the data using the requests library.

In both cases, the code is fairly trivial:

import requests as rq

def _is_dataset(key):
    '''Check whether the trigger was fired by data we're interested in.'''
    return ('311data' in key) and key.endswith('.json')

def _get_raw_data(bucket, key):
    r = rq.get(f'https://{bucket}.s3.amazonaws.com/{key}')
    r.raise_for_status()
    return r.json()
  3. Now we can pull all three functions together under one overarching function. For data upload, we'll reuse the _upload_json function that we wrote for the scheduled data collection (a minimal sketch of such a helper is shown after the full listing below):
MEDIANS_FOLDER = '311/'

def compute_medians(event):
    if _is_dataset(event.key):
        data = _get_raw_data(bucket=event.bucket, key=event.key)
        medians = _calc_medians(data)
        _upload_json(medians, 'medians.json',
                     bucket=BUCKET, key=MEDIANS_FOLDER)
  4. Lastly, we need to add a decorator that sets the trigger. Here is how the code looks as a whole:
from statistics import median
from datetime import datetime

import requests as rq

parsestr = '%Y-%m-%d %H:%M:%S'
MEDIANS_FOLDER = '311/'
# BUCKET, app, and _upload_json are already defined earlier in app.py


def _is_dataset(key):
    '''Check whether the trigger was fired by data we're interested in.'''
    return ('311data' in key) and key.endswith('.json')


def _calc_medians(data):
    results = {}
    for record in data:
        ct = record['complaint_type']
        if ct not in results:
            results[ct] = []

        spent = (datetime.strptime(record['closed_date'], parsestr) -
                 datetime.strptime(record['created_date'], parsestr))
        spent = spent.total_seconds() / 3600  # hours
        results[ct].append(spent)

    return {k: median(v) for k, v in results.items()}


def _get_raw_data(bucket, key):
    r = rq.get(f'https://{bucket}.s3.amazonaws.com/{key}')
    r.raise_for_status()
    return r.json()


@app.on_s3_event(bucket=BUCKET,
                 events=['s3:ObjectCreated:*'])
def compute_medians(event):
    if _is_dataset(event.key):
        data = _get_raw_data(bucket=event.bucket, key=event.key)
        medians = _calc_medians(data)
        _upload_json(medians, 'medians.json',
                     bucket=BUCKET, key=MEDIANS_FOLDER)
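
The listing assumes that the _upload_json helper written for the scheduled data collection is already present in the same app.py. For reference, here is a minimal sketch of what such a helper might look like, assuming boto3 and the call signature used in compute_medians above; it is an illustration, not necessarily identical to the version written earlier:

import json
import boto3

def _upload_json(obj, filename, bucket, key):
    '''Illustrative sketch only: serialize obj to JSON and store it
    in the bucket under the given key prefix plus filename.'''
    client = boto3.client('s3')
    client.put_object(Body=json.dumps(obj).encode('utf-8'),
                      Bucket=bucket,
                      Key=key + filename)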

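Before deploying, we can also sanity-check the pure Python helpers locally. The records below are made-up examples rather than real 311 data, and the keys passed to _is_dataset are only illustrative:

sample = [
    {'complaint_type': 'Noise', 'created_date': '2019-05-01 10:00:00',
     'closed_date': '2019-05-01 16:00:00'},
    {'complaint_type': 'Noise', 'created_date': '2019-05-02 09:00:00',
     'closed_date': '2019-05-02 11:00:00'},
    {'complaint_type': 'Heating', 'created_date': '2019-05-01 08:00:00',
     'closed_date': '2019-05-03 08:00:00'},
]

print(_is_dataset('311data/2019-05-01.json'))   # True
print(_is_dataset('some/other/file.csv'))       # False
print(_calc_medians(sample))                    # {'Noise': 4.0, 'Heating': 48.0}

If you want to exercise the whole handler without deploying, recent Chalice versions also provide a chalice.test.Client that can generate a fake S3 event and invoke compute_medians locally.
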
Chalice can define several other event types that will trigger your functions. Alternatively, you can configure a trigger via the AWS web console, which offers even more options. For example, you can trigger your Lambda by talking to Alexa; in fact, every conversation you have had with Alexa was running as a Lambda function!
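
For instance, the same decorator pattern works for other AWS event sources. The snippet below is only an illustration, with data-alerts being a made-up SNS topic name:

@app.on_sns_message(topic='data-alerts')
def handle_notification(event):
    # Log the subject and body of the incoming SNS notification
    app.log.info('Received %s: %s', event.subject, event.message)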

Just a quick reminder: if you deployed your own versions of the APIs, and especially the scheduled pipeline, don't forget to stop them (running chalice delete removes the deployed resources), or you will eventually start receiving bills.
