Implementation of the S3 driver

Storing files on a database is a side effect: the action of storing an object on a database is not deterministic, and its result does not depend only on the input arguments (it can fail for many reasons, such as an exceeded quota or a network error). So, this feature must be implemented as a driver. This driver needs some new imports, as can be seen in the following example:

from collections import namedtuple
from io import BytesIO
from rx import Observable
import boto3
from boto3.session import Session

from cyclotron import Component

The boto3 package contains the AWS SDK, and the Session object will be used to configure the connection with the S3 database. The BytesIO class is needed to call the S3 upload function with an in-memory buffer as input. The other imports are the usual RxPY and Cyclotron ones. The next step consists of defining the API of this driver, that is, its Source and Sink definitions. This can be seen in the following example:

Source = namedtuple('Source', ['response'])
Sink = namedtuple('Sink', ['request'])

# Sink objects
Configure = namedtuple('Configure', [
    'access_key', 'secret_key',
    'bucket', 'endpoint_url', 'region_name'])

UploadObject = namedtuple('UploadObject', ['key', 'data', 'id'])

# Source objects
UploadReponse = namedtuple('UploadReponse', ['key', 'id'])

This driver uses only one observable as input and returns a single observable. So, the Source object contains only one field, named response, and the Sink object contains only one field, named request. The S3 driver takes a Sink object as input and returns a Source object.

There are two possible types of item in the Sink request observable:

  • The Configure item contains all the information needed to connect to the S3 database. The access and secret keys are used to authenticate the client with the S3 server. The bucket name is fixed and provided during configuration. The endpoint_url parameter contains the URL of the S3 server, and the region name must match the region in which the server operates.
  • The second type of item is UploadObject. It corresponds to a request to upload a new file to the database. Its fields are key, which identifies the object; data, a buffer containing the content of the audio file; and id, a request identifier.

There is only one possible type of item in the Source response observable: UploadResponse. This item is emitted when an upload request has completed. It contains the key of the uploaded object and the request id provided in the corresponding UploadObject item.
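As an illustration, the items of this API are plain namedtuples and can be built directly. The sketch below redefines them so that it runs on its own (keeping the UploadReponse spelling used in the driver code); the credentials, bucket name, endpoint URL, key, and id are made-up placeholder values, not values from the book.

```python
from collections import namedtuple

# Redefinitions of the driver's item types, so the sketch is self-contained
Configure = namedtuple('Configure', [
    'access_key', 'secret_key',
    'bucket', 'endpoint_url', 'region_name'])
UploadObject = namedtuple('UploadObject', ['key', 'data', 'id'])
UploadReponse = namedtuple('UploadReponse', ['key', 'id'])

# Placeholder configuration for a hypothetical S3-compatible server
config = Configure(
    access_key='my-access-key',
    secret_key='my-secret-key',
    bucket='audio',
    endpoint_url='http://localhost:9000',
    region_name='us-east-1')

# An upload request: the key identifies the object, data holds the raw bytes
request = UploadObject(key='song.flac', data=b'\x00\x01\x02', id=42)

# The response echoes the key and the request id back to the caller
response = UploadReponse(key=request.key, id=request.id)
print(response)  # UploadReponse(key='song.flac', id=42)
```

Echoing the request id in the response lets the consumer match each completion with the request that triggered it.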

The structure of the driver can now be written, following the same pattern as the drivers implemented previously. This can be seen in the following example:

def make_driver():
    def driver(sink):

        def on_subscribe(observer):
            # on_next is defined inside on_subscribe, as shown in the next step
            sink.request.subscribe(
                on_next=on_next,
                on_error=lambda e: observer.on_error(e),
                on_completed=lambda: observer.on_completed())

        return Source(
            response=Observable.create(on_subscribe)
        )

    return Component(call=driver, input=Sink)

The make_driver function is a factory that returns a Component object. The entry point of this Component is the driver function, which takes a sink object as input and contains the actual driver implementation. It returns a Source object whose response field is a custom observable. The on_subscribe function is the subscription function of this observable: when an observer subscribes to the response observable, the driver subscribes to the sink request observable. Once again, this pattern chains the subscriptions. Note that errors and completion coming from the request observable are forwarded to the response observable.

This means that, if the request observable completes or fails, the response observable also completes or fails in the same way. Finally, the items received on the request observable are handled by an on_next function nested in on_subscribe. This can be seen in the following example:
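The subscription chaining and the forwarding of completion can be illustrated without RxPY. The sketch below uses a hypothetical, minimal ToyObservable class as a stand-in for Observable.create; it only models the subscribe/on_next/on_error/on_completed protocol and is not RxPY's implementation. The from_list and driver functions are illustrative names.

```python
class Observer:
    """Minimal observer built from three callbacks (hypothetical helper)."""
    def __init__(self, on_next=None, on_error=None, on_completed=None):
        self.on_next = on_next or (lambda item: None)
        self.on_error = on_error or (lambda error: None)
        self.on_completed = on_completed or (lambda: None)


class ToyObservable:
    """Stand-in for Observable.create: subscribing runs the subscribe function."""
    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    def subscribe(self, on_next=None, on_error=None, on_completed=None):
        self._subscribe_fn(Observer(on_next, on_error, on_completed))


def from_list(items, log):
    """Source observable: emits the items, then completes."""
    def subscribe_fn(observer):
        log.append('subscribed')
        for item in items:
            observer.on_next(item)
        observer.on_completed()
    return ToyObservable(subscribe_fn)


def driver(request):
    """Same shape as the S3 driver: subscribe upstream only on subscription."""
    def on_subscribe(observer):
        request.subscribe(
            on_next=lambda item: observer.on_next(('done', item)),
            on_error=lambda e: observer.on_error(e),
            on_completed=lambda: observer.on_completed())
    return ToyObservable(on_subscribe)


log = []
events = []
response = driver(from_list(['a', 'b'], log))
before = list(log)  # creating the driver did not subscribe upstream yet

response.subscribe(
    on_next=events.append,
    on_completed=lambda: events.append('completed'))
print(events)  # [('done', 'a'), ('done', 'b'), 'completed']
```

The upstream source is only subscribed when the response observable is, and its completion propagates to the downstream observer.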

def on_subscribe(observer):
    client = None
    bucket = None

    def on_next(item):
        nonlocal client
        nonlocal bucket

        if type(item) is Configure:
            ...
        elif type(item) is UploadObject:
            ...
        else:
            # Errors are reported as Exception instances rather than strings
            observer.on_error(TypeError(
                "unknown item: {}".format(type(item))))

    sink.request.subscribe(
        on_next=on_next,
        on_error=lambda e: observer.on_error(e),
        on_completed=lambda: observer.on_completed())

Two variables are defined in the scope of the on_subscribe function: client and bucket. They are set when a Configure item is received and used when an UploadObject item is received. Any other type of item causes an error to be emitted on the response observable. When a Configure item is received, a boto3 client is created, as can be seen in the following example:
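The state handling of on_next can be exercised in isolation. The sketch below assumes a hypothetical make_on_next helper that receives the observer and a create_client callable, so that the logic (client and bucket set by Configure, used by UploadObject, an error for anything else) can be tested with a stub instead of boto3. Unlike the book's code, it also reports an UploadObject received before any Configure item. RecordingObserver and FakeClient are illustrative names, not part of the book's code.

```python
from collections import namedtuple
from io import BytesIO

Configure = namedtuple('Configure', [
    'access_key', 'secret_key',
    'bucket', 'endpoint_url', 'region_name'])
UploadObject = namedtuple('UploadObject', ['key', 'data', 'id'])
UploadReponse = namedtuple('UploadReponse', ['key', 'id'])


def make_on_next(observer, create_client):
    """Hypothetical helper: returns an on_next closure holding client/bucket."""
    client = None
    bucket = None

    def on_next(item):
        nonlocal client, bucket
        if type(item) is Configure:
            client = create_client(item)
            bucket = item.bucket
        elif type(item) is UploadObject:
            if client is None:  # extra guard, not in the book's code
                observer.on_error(RuntimeError('upload before configure'))
                return
            client.upload_fileobj(BytesIO(item.data), bucket, item.key)
            observer.on_next(UploadReponse(key=item.key, id=item.id))
        else:
            observer.on_error(TypeError('unknown item: {}'.format(type(item))))

    return on_next


class RecordingObserver:
    """Collects emitted items and errors for inspection."""
    def __init__(self):
        self.items, self.errors = [], []

    def on_next(self, item):
        self.items.append(item)

    def on_error(self, error):
        self.errors.append(error)


class FakeClient:
    """Stub mimicking the upload_fileobj(fileobj, bucket, key) call."""
    def __init__(self):
        self.store = {}

    def upload_fileobj(self, fileobj, bucket, key):
        self.store[(bucket, key)] = fileobj.read()


fake = FakeClient()
observer = RecordingObserver()
on_next = make_on_next(observer, create_client=lambda cfg: fake)

on_next(UploadObject(key='a.flac', data=b'x', id=0))  # rejected: not configured
on_next(Configure('ak', 'sk', 'audio', 'http://localhost:9000', 'us-east-1'))
on_next(UploadObject(key='b.flac', data=b'abc', id=1))
on_next('not an item')  # rejected: unknown item type
```

Injecting create_client is a design choice made for testability; the book's driver creates the boto3 client inline.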

if type(item) is Configure:
    session = Session(aws_access_key_id=item.access_key,
                      aws_secret_access_key=item.secret_key)
    client = session.client('s3',
                            endpoint_url=item.endpoint_url,
                            region_name=item.region_name)
    bucket = item.bucket

First, a session object is created from the received credentials. Then, a client for the S3 service is created from this session. Since custom (non-AWS) endpoints must be supported, the endpoint URL is provided here, along with the region name. The last line stores the bucket name for later use.
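The client creation step can be factored into a small function. This is a sketch, assuming a hypothetical create_client function with an optional session_factory parameter: by default it uses boto3's Session, and a test can pass a fake factory instead. FakeSession and the configuration values are placeholders for illustration only.

```python
from collections import namedtuple

Configure = namedtuple('Configure', [
    'access_key', 'secret_key',
    'bucket', 'endpoint_url', 'region_name'])


def create_client(config, session_factory=None):
    """Build an S3 client from a Configure item (hypothetical helper)."""
    if session_factory is None:
        # Imported lazily, so the function can be tested without boto3
        from boto3.session import Session
        session_factory = Session
    session = session_factory(
        aws_access_key_id=config.access_key,
        aws_secret_access_key=config.secret_key)
    # A custom endpoint_url lets the same code target non-AWS S3 servers
    return session.client(
        's3',
        endpoint_url=config.endpoint_url,
        region_name=config.region_name)


class FakeSession:
    """Records the arguments instead of talking to a server."""
    def __init__(self, **credentials):
        self.credentials = credentials

    def client(self, service, **options):
        return service, self.credentials, options


config = Configure('ak', 'sk', 'audio', 'http://localhost:9000', 'us-east-1')
service, credentials, options = create_client(config, session_factory=FakeSession)
print(service, options)
```

With a real server, calling create_client(config) without a factory returns a boto3 S3 client configured for the given endpoint.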

When an UploadObject item is received, the corresponding audio file is uploaded to the database, as can be seen in the following example:

elif type(item) is UploadObject:
    data = BytesIO(item.data)
    client.upload_fileobj(data, bucket, item.key)
    observer.on_next(UploadReponse(key=item.key, id=item.id))

The upload_fileobj method of the client object uploads a file to the database. It takes as input a file-like object, the name of the bucket, and the key used to identify the object. The data field of the UploadObject item contains raw bytes, so it must first be converted to a file-like object. This is done with the BytesIO class, which takes a bytes-like object as input and returns a file-like object. BytesIO is convenient here because it creates a file-like object in memory, without having to write a real file to disk. Once the file is uploaded, an UploadResponse item is emitted on the response observable.
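To see why BytesIO is enough here: upload_fileobj only requires a binary file-like object that implements read and returns bytes. The sketch below reads an in-memory buffer in fixed-size chunks, the way an uploader consumes a file object; the payload is placeholder data and the 4-byte chunk size is arbitrary (boto3's real chunking is controlled by its transfer configuration).

```python
from io import BytesIO

# Placeholder payload standing in for encoded audio data
payload = bytes(range(10))
fileobj = BytesIO(payload)

# Consume the buffer like a file, chunk by chunk, until read() returns b''
chunks = []
while True:
    chunk = fileobj.read(4)
    if not chunk:
        break
    chunks.append(chunk)

print([len(c) for c in chunks])  # [4, 4, 2]

# Nothing was written to disk: getvalue() returns the whole in-memory buffer
print(fileobj.getvalue() == payload)  # True

# The same object can be rewound with seek(0) and read again
fileobj.seek(0)
first = fileobj.read(4)
```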

The full code for the S3 driver is available in the GitHub repository (https://github.com/PacktPublishing/Hands-On-Reactive-Programming-with-Python) of this book, in the file audio-encode-server-2/audio_encode_server/s3.py.
