Implementation of the encoder driver

The encoder driver is implemented on top of the sox library. Its API is simple, which makes it well suited to this study. However, it works only on files, not on in-memory buffers, which would be a problem for a production-grade asynchronous application. This chapter ignores the problem in order to focus on the overall structure of the application; the next chapter explains how to deal with it.

Ensure that the sox package is installed on your system before running the server. The installation instructions are available in the Technical requirements section of this chapter.

Before going into the driver implementation, let's start by writing the function that actually performs the audio conversion. Here is its code:

import os

import sox

def mp3_to_flac(data, dest_path, name):
    tmp_filename = '/tmp/transcode-tmp.mp3'
    dest_filename = os.path.join(dest_path, name + '.flac')
    # Store the MP3 data in a file, since sox only works on files
    with open(tmp_filename, 'wb') as content_file:
        size = content_file.write(data)
        status = 0 if size == len(data) else -1  # computed but unused here
    # Convert to 16 kHz, stereo, 16 bits per sample
    transformer = sox.Transformer()
    transformer.convert(samplerate=16000, n_channels=2, bitdepth=16)
    transformer.build(tmp_filename, dest_filename)
    return dest_filename

The sox library is available in the sox module. To use the sox library, one has to create a transformer object, associate audio manipulations to it, and finally apply these manipulations to an input file. The result is written to an output file. The mp3_to_flac function takes three parameters as input:

  • The data of the MP3 file to convert
  • The path where the result must be saved
  • The name of the converted file

The first two lines of the conversion function build a temporary file path and the destination file path. The temporary file stores the MP3 data so that sox can convert it, so the data is written to it first. After that comes the encoding part. A transformer object is created and a conversion is associated with it. In this case, the audio is converted to 16 kHz, stereo, with 16 bits per sample. Finally, calling the build method on the transformer performs the audio conversion. The converted audio is then available in the dest_filename file, and this name is returned by mp3_to_flac. The sox library determines the format of the input and output files from their extensions. This is why the filenames are constructed with the .mp3 and .flac extensions.
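The fixed temporary filename used above would be overwritten if two conversions ever ran at the same time. One possible variation, shown here only as a sketch and not as the implementation used in this chapter, is to let the standard tempfile module pick a unique name while keeping the .mp3 suffix that sox relies on. The helper name write_temp_mp3 is hypothetical.

```python
import os
import tempfile


def write_temp_mp3(data):
    """Write MP3 bytes to a uniquely named temporary file and return its path.

    Hypothetical helper: the caller is responsible for deleting the file
    once the conversion is done.
    """
    # delete=False keeps the file on disk after the with block closes it,
    # so that a file-based tool such as sox can open it by name.
    with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as tmp_file:
        tmp_file.write(data)
        return tmp_file.name


if __name__ == '__main__':
    path = write_temp_mp3(b'fake mp3 payload')
    try:
        print(path)
    finally:
        os.remove(path)
```

The returned path could then be passed to the transformer's build method in place of the fixed tmp_filename.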

The next step is to define the APIs of the drivers. This means defining the Source and Sink objects that they deal with. Here is what is needed in this driver:

from collections import namedtuple

Source = namedtuple('Source', ['response'])
Sink = namedtuple('Sink', ['request'])

# Sink events
Initialize = namedtuple('Initialize', ['storage_path'])
EncodeMp3 = namedtuple('EncodeMp3', ['id', 'data', 'key'])

# Source events
EncodeResult = namedtuple('EncodeResult', ['id', 'file'])

The Source and Sink objects are only composed of one observable each:

  • A response observable for Source
  • A request observable for Sink

There are two possible Sink request items:

  • The initialization of the driver
  • A request to encode some data

The initialization item contains the path in which to store the encoded files. The encoding request contains the id of the request, the data of the audio file, and a key. The id of the request is echoed back in the response, which allows the user of the encoding driver to match responses with previous requests. The key is the name to use when encoding the file.

There is only one source item, which is sent each time a file has been converted. This item contains two fields: the id of the request and the name of the converted file, including its absolute storage path.
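To illustrate how the id field lets a caller pair results with requests, here is a small sketch. The pending dictionary and the send_request and handle_result helpers are hypothetical application-side bookkeeping, not part of the driver.

```python
from collections import namedtuple

EncodeMp3 = namedtuple('EncodeMp3', ['id', 'data', 'key'])
EncodeResult = namedtuple('EncodeResult', ['id', 'file'])

# Hypothetical bookkeeping on the application side: remember what each
# request id was for until the matching result arrives.
pending = {}


def send_request(request_id, data, key):
    item = EncodeMp3(id=request_id, data=data, key=key)
    pending[item.id] = key
    return item


def handle_result(result):
    # Pop the entry so that each request is matched at most once.
    key = pending.pop(result.id)
    return key, result.file


if __name__ == '__main__':
    send_request(1, b'...', 'song-a')
    send_request(2, b'...', 'song-b')
    # Even if results arrived out of order, the id tells them apart.
    print(handle_result(EncodeResult(id=2, file='/data/song-b.flac')))
    print(handle_result(EncodeResult(id=1, file='/data/song-a.flac')))
```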

Now the skeleton of the driver can be written:

def make_driver():
    def encoder(sink):
        return Source(
            response=Observable.create(on_subscribe)
        )

    return Component(call=encoder, input=Sink)

The make_driver function is a factory to create the driver object. This function returns a Component object whose entry point is the encoder function, with input parameters of type Sink. The encoder function is the actual implementation of the driver. It takes a Sink object as input and returns a Source object. The response observable is created with a subscription callback whose implementation is nested in the encoder function:

def on_subscribe(observer):
    storage_path = None

    def on_next(item):
        if type(item) is Initialize:
            nonlocal storage_path
            storage_path = item.storage_path
        elif type(item) is EncodeMp3:
            encoded_file = mp3_to_flac(
                item.data, storage_path, name=item.key)
            observer.on_next(
                EncodeResult(id=item.id, file=encoded_file))
        else:
            observer.on_error("unknown item: {}".format(type(item)))

    sink.request.subscribe(
        on_next=on_next,
        on_error=lambda e: observer.on_error(e),
        on_completed=lambda: observer.on_completed(),
    )

Let's start by studying the end of this function. When an observer subscribes to the response observable, the driver subscribes to the request observable. Subscriptions are therefore chained upstream: when the final observer subscribes, this starts a chain of subscriptions all the way up to the source observable. The request subscription also forwards errors and completion to the response observer. Since the driver has no special action to take in these cases, it simply propagates these events.
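The upstream chaining of subscriptions can be sketched without any Rx dependency. MiniObservable below is a deliberately tiny, hypothetical stand-in for a real observable class. It only shows that nothing runs until the final subscription, and that errors and completion are forwarded unchanged.

```python
class MiniObservable:
    """Tiny stand-in for an Rx observable: it only stores a subscribe function."""

    def __init__(self, on_subscribe):
        self._on_subscribe = on_subscribe

    def subscribe(self, on_next, on_error, on_completed):
        self._on_subscribe(on_next, on_error, on_completed)


log = []


def request_subscribe(on_next, on_error, on_completed):
    log.append('request subscribed')
    on_next(1)
    on_next(2)
    on_completed()


request = MiniObservable(request_subscribe)


def response_subscribe(on_next, on_error, on_completed):
    log.append('response subscribed')
    # Subscribing to the response triggers the upstream subscription.
    request.subscribe(
        on_next=lambda i: on_next(i * 10),
        on_error=on_error,          # forwarded unchanged
        on_completed=on_completed,  # forwarded unchanged
    )


response = MiniObservable(response_subscribe)

if __name__ == '__main__':
    response.subscribe(
        on_next=log.append,
        on_error=log.append,
        on_completed=lambda: log.append('completed'),
    )
    print(log)
```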

At the beginning of the function, a variable is declared to hold the storage path of the encoded files. The request items themselves are handled in the on_next function. Both item types are handled, and any other object causes an error to be signaled to the observer. On receipt of an Initialize item, the storage path is updated. On receipt of an EncodeMp3 item, the mp3_to_flac function is called, and an EncodeResult object is sent to the observer. The driver is now ready to be used by the application!
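The item handling can be exercised in isolation, without sox or an Rx library, by injecting the conversion function. The following is a sketch under these assumptions: RecordingObserver, fake_encode, and make_on_next are hypothetical names introduced for illustration, and fake_encode only builds a path instead of converting audio.

```python
from collections import namedtuple

Initialize = namedtuple('Initialize', ['storage_path'])
EncodeMp3 = namedtuple('EncodeMp3', ['id', 'data', 'key'])
EncodeResult = namedtuple('EncodeResult', ['id', 'file'])


class RecordingObserver:
    """Hypothetical test double that records what the driver emits."""

    def __init__(self):
        self.items = []
        self.errors = []

    def on_next(self, item):
        self.items.append(item)

    def on_error(self, error):
        self.errors.append(error)


def fake_encode(data, dest_path, name):
    # Stand-in for mp3_to_flac: builds the result path without touching sox.
    return '{}/{}.flac'.format(dest_path, name)


def make_on_next(observer, encode):
    storage_path = None

    def on_next(item):
        nonlocal storage_path
        if type(item) is Initialize:
            storage_path = item.storage_path
        elif type(item) is EncodeMp3:
            encoded_file = encode(item.data, storage_path, name=item.key)
            observer.on_next(EncodeResult(id=item.id, file=encoded_file))
        else:
            observer.on_error("unknown item: {}".format(type(item)))

    return on_next


if __name__ == '__main__':
    observer = RecordingObserver()
    on_next = make_on_next(observer, fake_encode)
    on_next(Initialize(storage_path='/srv/audio'))
    on_next(EncodeMp3(id=1, data=b'...', key='track'))
    on_next('bogus')
    print(observer.items)
    print(observer.errors)
```

Passing the conversion function as a parameter is a design choice of this sketch only; it keeps the dispatch logic testable on a machine where sox is not installed.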
