Testing AsyncIO code

Python does not contain any specific features or modules to test asynchronous code. However, this is not really needed: it is possible to use the unittest module to test AsyncIO code quite easily. Each test can instantiate a new event loop, do the testing on it, and stop the event loop. Once the event loop is stopped, assertions can be done to check whether the test has passed or failed, as can be seen in the following example:

class AsyncIOTestCase(TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def tearDown(self):
self.loop.stop()
self.loop.close()

def test_example(self):
future = self.loop.create_future()

expected_result = [42]
expected_error = None
actual_result = []
actual_error = None

def on_next(i):
actual_result.append(i)

def on_error(e):
nonlocal actual_error
actual_error = e

def on_completed():
self.loop.stop()

Observable.from_future(future).subscribe(
on_next=on_next,
on_error=on_error,
on_completed=on_completed
)

self.assertEqual([], actual_result)
self.loop.call_soon(lambda: future.set_result(42))
self.loop.run_forever()
self.assertEqual(expected_result, actual_result)

The setUp and tearDown methods are called respectively before and after each test has been executed. The setUp method creates a new event loop. The tearDown method stops and closes the event loop. The event loop should already be stopped by the test at this step, but this is a safeguard to ensure that the event loop is stopped even if a test fails.

In this test, the event loop runs forever. It is, however, stopped as soon as the observable completes, which is an easy way to test that the on_completed event has been received. Before the event loop starts, a coroutine is scheduled from lambda wrapping future. When future completes, then the observable completes and the test can continue. Also, note the presence of a first assertion after the subscription: since nothing should have happened before future is completed, this assert checks that nothing has happened yet.

This example is available in the GitHub repository (https://github.com/PacktPublishing/Hands-On-Reactive-Programming-with-Python) of this book, in the test_asyncio.py script.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.119.131.10