Understanding time-based tasks

Pipelines are especially useful for scheduling data collection, such as downloading new data every night.

Say we want to collect new data on 311 calls in NYC for the previous day, every morning. First, let's write the pulling function itself. The code is fairly straightforward; you can look at the API documentation for Socrata (the data-sharing platform New York uses) at https://dev.socrata.com/consumers/getting-started.html. The only tricky part is that the dataset can be large, but Socrata won't give us more than 50,000 rows at once. Hence, if a response contains exactly 50,000 rows, the data was most likely capped, and we need to make another pull with an offset, repeating until a response comes back with fewer rows. The resource argument is the unique ID of the dataset; you can obtain it from the dataset's web page:

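    import requests as rq

    # optional request headers (assumed placeholder); a Socrata app token,
    # e.g. {'X-App-Token': '<your token>'}, raises the rate limits
    headers = {}

    def _get_data(resource, time_col, date, offset=0):
        # time_col must be the API field name (e.g. created_date), not the display name
        Q = f"where={time_col} between '{date}' AND '{date}T23:59:59.000'"
        url = f'https://data.cityofnewyork.us/resource/{resource}.json?$limit=50000&$offset={offset}&${Q}'

        r = rq.get(url, headers=headers)
        r.raise_for_status()  # fail loudly on HTTP errors

        data = r.json()
        if len(data) == 50_000:
            # the page was probably capped: fetch the next page and append it
            data2 = _get_data(resource, time_col, date, offset=(offset + 50000))
            data.extend(data2)

        return data

Now, let's write the task itself. It is actually fairly short: all we do is define a resource, a time_col (the column we'll query the API on), and the date to pull for:

    import luigi
    import pandas as pd

    folder = 'data'  # placeholder root directory (hypothetical; adjust as needed)

    class Collect311(luigi.Task):
        time_col = 'created_date'  # API field name of the "Created Date" column
        date = luigi.DateParameter()
        resource = 'fhrw-4uyv'

        def output(self):
            # one file per day, e.g. data/311/2019/06/07.csv
            path = f'{folder}/311/{self.date:%Y/%m/%d}.csv'
            return luigi.LocalTarget(path)

        def run(self):
            data = _get_data(self.resource, self.time_col, self.date, offset=0)
            df = pd.DataFrame(data)

            self.output().makedirs()
            # write through the target so the file appears only once it is complete
            with self.output().open('w') as f:
                df.to_csv(f)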
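The recursion in _get_data works, but the same paging logic can also be written as a loop, which keeps the call stack flat and makes the stopping rule easy to test. This is an alternative sketch, not the book's code: build_params, fetch_all, and fake_fetch are hypothetical names, and the page fetcher is passed in as a function so the logic can be checked without a network connection.

```python
PAGE_SIZE = 50_000  # per-request cap, as described in the text


def build_params(time_col, date, offset, limit=PAGE_SIZE):
    """SoQL parameters for one page of one day's records."""
    return {
        "$where": f"{time_col} between '{date}' AND '{date}T23:59:59.000'",
        "$limit": limit,
        "$offset": offset,
    }


def fetch_all(fetch_page, time_col, date, limit=PAGE_SIZE):
    """Request pages until one comes back shorter than `limit`.

    `fetch_page` takes a params dict and returns a list of records; with
    requests it could be: lambda p: rq.get(base_url, params=p).json()
    """
    rows, offset = [], 0
    while True:
        page = fetch_page(build_params(time_col, date, offset, limit))
        rows.extend(page)
        if len(page) < limit:  # short (or empty) page: nothing left
            return rows
        offset += limit


# Offline check: a fake "API" holding 7 records, served 3 at a time
FAKE_ROWS = list(range(7))


def fake_fetch(params):
    start = params["$offset"]
    return FAKE_ROWS[start:start + params["$limit"]]


print(fetch_all(fake_fetch, "created_date", "2019-06-07", limit=3))  # [0, 1, 2, 3, 4, 5, 6]
```

Passing the parameters as a dict (for example, via the params argument of requests.get) also leaves the URL encoding of the spaces and quotes in the $where clause to the HTTP library.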

The task can be used on essentially any dataset on NYC's Socrata portal (and, with a different domain in the URL, on other Socrata portals): all you need is to specify the resource and the column to query on. The date parameter also defines the output path of the file. Now, we can run the task from the command line:
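The {self.date:%Y/%m/%d} part of output() works because datetime.date objects accept strftime-style format specs inside f-strings, so every day gets its own file under a year/month folder. A quick standalone check; output_path is a hypothetical helper that mirrors output(), and 'data' is a placeholder root folder:

```python
import datetime

folder = "data"  # placeholder root directory (hypothetical)


def output_path(date):
    """Mirror of the path built in Collect311.output()."""
    return f"{folder}/311/{date:%Y/%m/%d}.csv"


print(output_path(datetime.date(2019, 6, 7)))  # data/311/2019/06/07.csv
```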

$ python -m luigi --module luigi_311 Collect311 --date 2019-06-07 --local-scheduler
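Since the goal is to pull the previous day's data every morning, whatever triggers the run (cron, for instance) has to supply that date. A minimal sketch of computing it in Python, assuming the module is saved as luigi_311.py as in the command above; yesterday_command is a hypothetical helper that assembles the same command for yesterday's date, and the subprocess.run call is left commented out.

```python
import datetime
# import subprocess  # needed only for the commented-out call below


def yesterday_command(today=None):
    """Return the luigi CLI call for the day before `today` (default: today)."""
    today = today or datetime.date.today()
    day = today - datetime.timedelta(days=1)
    return ["python", "-m", "luigi", "--module", "luigi_311", "Collect311",
            "--date", day.isoformat(), "--local-scheduler"]


print(" ".join(yesterday_command(datetime.date(2019, 6, 8))))
# -> python -m luigi --module luigi_311 Collect311 --date 2019-06-07 --local-scheduler

# subprocess.run(yesterday_command(), check=True)
```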

We can even run this task in batches, using Luigi's RangeDaily wrapper (part of luigi.tools.range):

$ python -m luigi --module luigi_311 RangeDaily --of Collect311 --start 2019-06-01 --days-back 10 --local-scheduler

The preceding command schedules up to 10 tasks, one for each of the last 10 days, but never for a date earlier than June 1, 2019. Only days whose output file is still missing get scheduled, which makes this trick especially nice for scheduling: if something prevents the pull for a given date, the next run will notice the missing file and retry it, as long as that date is still within the 10-day window.
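The catch-up behavior can be modeled in a few lines. This is a simplified sketch of the idea, not Luigi's actual RangeDaily implementation (its exact boundary handling may differ): dates_to_run is a hypothetical helper, the data/311 path reuses the placeholder folder from the task, and a day is kept only if its output file does not exist yet.

```python
import datetime
import os


def dates_to_run(today, days_back, start, exists=os.path.exists):
    """Simplified model of the catch-up window (not Luigi's implementation):
    the `days_back` days ending with `today`, never before `start`,
    keeping only days whose output file does not exist yet."""
    window = [today - datetime.timedelta(days=i) for i in range(days_back)]
    return sorted(d for d in window
                  if d >= start and not exists(f"data/311/{d:%Y/%m/%d}.csv"))


# Pretend the last two days were already downloaded
done = {"data/311/2019/06/09.csv", "data/311/2019/06/10.csv"}
todo = dates_to_run(datetime.date(2019, 6, 10), 10, datetime.date(2019, 6, 1),
                    exists=done.__contains__)
print(todo[0], todo[-1], len(todo))  # 2019-06-01 2019-06-08 8
```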
