Expanding Luigi with custom template classes

In the previous section, we used the CopyToTable class as the template instead of luigi.Task. In fact, this is a good pattern to use! If there is any custom configuration or code you can reuse across tasks, feel free to create a custom task class of your own. For example, in our practice, we use a custom S3Task class, similar to the one that follows:

from io import BytesIO

import luigi
import pandas as pd
from luigi.contrib.s3 import S3Client, S3Target


class S3Task(luigi.Task):
    # One S3 client shared by every task built on this class
    client = S3Client()

    def _upload_csv(self, df, path):
        # Serialize the dataframe to CSV text and upload it as a string
        content = df.to_csv(float_format="%.3f", index=False)
        self.client.put_string(
            content=content, destination_s3_path=path,
            ContentType="text/csv"
        )

    def _upload_binary(self, df, path, **kwargs):
        # Pick the pandas writer from the file extension.
        # Note: to_msgpack was removed in pandas 1.0; drop it on newer versions.
        format_ = path.split(".")[-1]
        funcs = {"msg": "to_msgpack", "fth": "to_feather", "pkl": "to_pickle"}

        if format_ not in funcs:
            raise ValueError(
                f"format {format_} is not supported yet, should be one "
                f"of {list(funcs)}"
            )

        # Write into an in-memory buffer, then rewind it before uploading
        buffer = BytesIO()
        getattr(df, funcs[format_])(buffer, **kwargs)
        buffer.seek(0)

        bucket, key = self.client._path_to_bucket_and_key(path)
        self.client.s3.meta.client.upload_fileobj(
            Fileobj=buffer,
            Bucket=bucket,
            Key=key,
            ExtraArgs={"ContentType": "application/octet-stream"},
        )

This class has an S3 client by default and can easily write a dataframe to the cloud in either CSV or binary format. You might want to take customization even further. For example, for a special type of task, you can make sure the data lands on a proper path and keep the need for task-specific code to a minimum:

from datetime import date


class NYCOD(S3Task):
    resource: str = None  # resource to pull
    time_col: str = "CreationDate"
    project: str = "Undefined"
    date = luigi.DateParameter(default=date.today())
    s3_path: str = "s3://mybucket/{project}/{date:%Y/%m/%d}.csv"

    def output(self):
        path = self.s3_path.format(project=self.project, date=self.date)
        return S3Target(path, client=self.client)

    def run(self):
        # _get_data is the data-pulling helper, assumed to be defined elsewhere
        data = _get_data(self.resource, self.time_col, self.date, offset=0)
        df = pd.DataFrame(data)
        self._upload_csv(df, self.output().path)

The NYCOD class fits the case where you have a few tasks collecting data from the NYC OpenData portal. All of them are scheduled, and you want to store a CSV file for each day and for each project. The code they share is wrapped in one class on top of our S3Task.
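To see how the per-project, per-day layout comes out of the s3_path template, the formatting step can be tried on its own, without Luigi or S3. This is a minimal sketch: build_path is a hypothetical helper that only mirrors what output() does with str.format, and the date used is an arbitrary example.

```python
from datetime import date

# Same template string as the s3_path attribute of NYCOD
S3_PATH = "s3://mybucket/{project}/{date:%Y/%m/%d}.csv"


def build_path(project: str, day: date) -> str:
    """Hypothetical helper: fill the template the way NYCOD.output() does."""
    # The part after the colon is passed to the date as a strftime pattern
    return S3_PATH.format(project=project, date=day)


print(build_path("311", date(2020, 1, 15)))
# s3://mybucket/311/2020/01/15.csv
```

Because the path is built from class attributes and the date parameter, each subclass gets its own folder and one file per day without any extra code.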

With the NYCOD template, our 311 complaint collection task will be quite short:

class Collect311(NYCOD):
    time_col = "Created Date"
    resource = "fhrw-4uyv"
    project = "311"

And, even better, we can create tasks for other datasets from the portal with the same four lines! For example, here is a task that will collect and properly store building permits:

class CollectBuildingPermits(NYCOD):
    time_col = "Issued Date"
    resource = "rbx6-tga4"
    project = "building_permits"

As you can see, these tasks are now ridiculously short and simple to write, all thanks to the layers of class inheritance. The benefit is not only the concise form: unifying the tasks lets us concentrate solutions in one place. That makes the code easy to test and maintain, keeps it DRY, and lets us change the behavior of all of the corresponding pipelines at once.
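The effect of keeping shared behavior in one place can be shown with plain Python classes. The following is a Luigi-free sketch: Collector, Complaints, Permits, and describe are hypothetical stand-ins for the task classes above, used only to show that subclasses which override nothing but attributes all pick up the single shared method.

```python
class Collector:
    """Hypothetical stand-in for a template task such as NYCOD (no Luigi)."""

    resource = None
    time_col = "CreationDate"
    project = "Undefined"

    def describe(self) -> str:
        # Shared behavior lives here; editing it changes every subclass
        return f"{self.project}: pull {self.resource} filtered by '{self.time_col}'"


class Complaints(Collector):
    time_col = "Created Date"
    resource = "fhrw-4uyv"
    project = "311"


class Permits(Collector):
    time_col = "Issued Date"
    resource = "rbx6-tga4"
    project = "building_permits"


for task in (Complaints(), Permits()):
    print(task.describe())
# 311: pull fhrw-4uyv filtered by 'Created Date'
# building_permits: pull rbx6-tga4 filtered by 'Issued Date'
```

A test written against the base class method covers both subclasses, since neither defines any logic of its own.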

Ease of customization is one of the advantages of Luigi. A well-stocked arsenal of custom tasks will significantly boost your development pace. All in all, adopting Luigi will solidify your processes and let you work on new things instead of plumbing the same leaky data pipes, which makes it true to its name.
