Implementing an inotify driver

The structure of this driver is the same as the previous ones. The first step is to declare the types used in it:

Source = namedtuple('Source', ['response'])
Sink = namedtuple('Sink', ['request'])

# Sink objects
AddWatch = namedtuple('AddWatch', ['id', 'path', 'flags'])
Start = namedtuple('Start', [])

# Source objects
Event = namedtuple('Event', ['id', 'path'])

This driver has one source observable and one sink observable. However, the API of this driver is not based on request/response. The inputs are requests, but the output is a continuous observable of inotify events. So a single request triggers many item emissions on the sink observable.

There are two possible sink items:

  • The AddWatch item adds a new file to watch on the watcher associated with the driver
  • The Start item starts the watcher

There is a single Event sink item that is emitted each time an inotify event is received.

The driver factory accepts a loop argument because the reference to the asyncio event loop is needed in this driver. When no loop is provided, then the current event loop is being used:

def make_driver(loop = None):
loop = asyncio.get_event_loop() if loop is None else loop

Then the subscription function of the driver follows the same pattern as was previously used:

def on_subscribe(observer):
watcher = aionotify.Watcher()

def on_next(item):
if type(item) is AddWatch:
watcher.watch(alias=item.id, path=item.path,
flags=item.flags)
elif type(item) is Start:
asyncio.ensure_future(read_events())
else:
observer.on_error("unknown item: {}".format(type(item)))

sink.request.subscribe(
on_next=on_next,
on_error=lambda e: observer.on_error(e)

The watcher object is created on subscription to the source observable. This object is configured each time an AddWatch item is received, and the driver starts waiting for events when the Start item is received. Note that there is no on_completed handler on the sink observer. This driver should continue to send events even if the sink observable completes. Stopping the emission should be done when the source observable is disposed of (this is not implemented here to simplify the code). Waiting for the watcher events occurs in the read_events coroutine. Here is its implementation:

async def read_events():
nonlocal observer
await watcher.setup(loop)
while True:
event = await watcher.get_event()
loop.call_soon(observer.on_next, Event(id=event.alias, path=event.name))
watcher.close()

This coroutine is also nested in on_subscribe. It first starts by enabling the watcher object by calling the setup method. This method requires the event loop as a parameter. Then an infinite loop waits for events and forwards events to the source observer. In this example, the coroutine contains an infinite loop. This loop should stop when the source observable is disposed of so that the coroutine can complete.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.188.119.81