© Matthew Wilkes 2020
M. Wilkes, Advanced Python Development, https://doi.org/10.1007/978-1-4842-5793-7_9

9. Viewing the data

Matthew Wilkes, Leeds, West Yorkshire, UK
 

We started investigating the types of queries we might be interested in at the end of the previous chapter, but we’ve not yet written any routines to help us make sense of the data we’re collecting. In this chapter, we return to Jupyter notebooks, this time as a data analysis tool rather than a prototyping aid.

IPython and Jupyter seamlessly support both synchronous and asynchronous function calls. We have a (mostly) free choice between the two types of API. As the rest of the apd.aggregation package is asynchronous, I recommend that we create some utility coroutines to extract and analyze data.

Query functions

A Jupyter notebook would be able to import and use SQLAlchemy functions freely, but that would require users to understand a lot about the internals of the aggregation system’s data structures. It would effectively mean that the tables and models that we’ve created become part of the public API, and any changes to them may mean incrementing the major version number and documenting changes for end-users.

Instead, let’s create some functions that return DataPoint records for users to interact with. This way, only the DataPoint objects and the function signatures are part of the API that we must maintain for people. We can always add more functions later, as we discover additional requirements.

To begin with, the most important feature that we need is the ability to find data records, ordered by the time they were collected. This lets users write some analysis code to analyze the values of the sensors over time. We may also want to filter this by the sensor type, the deployment identifier, and a date range.

We have to decide what form we want the function to have. Should it return a list or tuple of objects or an iterator? A tuple would allow us to easily count the number of items we retrieved and to iterate over the list multiple times. On the other hand, an iterator would allow us to minimize RAM use, which may help us support much larger data sets, but restricts us to only being able to iterate over the data once. We’ll create iterator functions, as they allow for more efficient code. The iterators can be converted to tuples by the calling code, so our users are able to choose to iterate over a tuple if they prefer.
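The trade-off between the two return types can be seen in a small sketch. The readings() generator below is a hypothetical stand-in for a query function and is not part of apd.aggregation; it only illustrates that an iterator is exhausted after one pass, while the calling code can opt in to a tuple when it needs a length or repeated iteration.

```python
import typing as t


def readings() -> t.Iterator[float]:
    """Hypothetical stand-in for a query function that yields values lazily."""
    for value in (21.0, 21.5, 22.0):
        yield value


# An iterator can only be consumed once: the second pass sees nothing.
lazy = readings()
first_pass = list(lazy)
second_pass = list(lazy)

# The caller can opt in to a tuple, which supports len() and repeated iteration.
eager = tuple(readings())
total = sum(eager)
count = len(eager)
```

Here second_pass is empty because the generator was already exhausted, whereas the tuple can be summed and counted independently.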

Before we can write this function, we need a way for users to set up a database connection. As one of our aims is to hide the details of the database from our end-users, we don’t want to require using a SQLAlchemy function for this. The custom function we create (Listing 9-1) for connecting to the database can also set up context variables to represent our connection and avoid the need for an explicit session argument to all of our search functions.
import contextlib
from contextvars import ContextVar
import functools
import typing as t
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.session import Session
db_session_var: ContextVar[Session] = ContextVar("db_session")
@contextlib.contextmanager
def with_database(uri: t.Optional[str] = None) -> t.Iterator[Session]:
    """Given a URI, set up a DB connection, and return a Session as a context manager """
    if uri is None:
        uri = "postgresql+psycopg2://localhost/apd"
    engine = create_engine(uri)
    sm = sessionmaker(engine)
    Session = sm()
    token = db_session_var.set(Session)
    try:
        yield Session
        Session.commit()
    finally:
        db_session_var.reset(token)
        Session.close()
Listing 9-1

query.py with a context manager to connect to the database

This function acts as a (synchronous) context manager. It sets up a database connection and an associated session, then both returns that session and sets it as the value of the db_session_var context variable before entering the body of the associated with block. When the context manager exits, it commits any changes (provided the body did not raise an exception), resets the context variable, and closes the session. This ensures that there are no lingering locks in the database, that data is persisted, and that functions that use the db_session_var variable can only be used inside the body of this context manager.
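The set-and-reset pattern can be tried without a database. The following is a minimal sketch under stated assumptions: connection_var, with_connection, and current_connection are hypothetical names invented for this illustration, and a plain string plays the role of the session.

```python
import contextlib
from contextvars import ContextVar
import typing as t

# Hypothetical stand-in for db_session_var; a plain string plays the session.
connection_var: ContextVar[str] = ContextVar("connection")


@contextlib.contextmanager
def with_connection(name: str) -> t.Iterator[str]:
    token = connection_var.set(name)
    try:
        yield name
    finally:
        # Restore whatever value (or lack of one) was there before.
        connection_var.reset(token)


def current_connection() -> str:
    """Helper that relies on the context variable instead of an argument."""
    return connection_var.get()


with with_connection("example"):
    inside = current_connection()

try:
    current_connection()
except LookupError:
    # No default was given to the ContextVar, so get() fails outside the block.
    outside_failed = True
else:
    outside_failed = False
```

Because the context variable has no default value, calling the helper outside the with block raises LookupError, which is the behavior that keeps such functions tied to the context manager.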

If we ensure that the environment that we’ve installed the aggregation package into is registered as a kernel with Jupyter, we can start to experiment with writing utility functions in a notebook. I’d also recommend installing some helper packages so we can more easily visualize the results.
> pipenv install ipython matplotlib
> pipenv run ipython kernel install --user --name="apd.aggregation"
We can now start a new Jupyter notebook (Listing 9-2), select the apd.aggregation kernel and connect to the database, using the new with_database(...) context manager. To test the connection, we can manually query the database using the resulting session and our datapoint_table object.
from apd.aggregation.query import with_database
from apd.aggregation.database import datapoint_table
with with_database("postgresql+psycopg2://apd@localhost/apd") as session:
    print(session.query(datapoint_table).count())
Listing 9-2

Jupyter cell to find number of sensor records

We also need to write the function that returns DataPoint objects for the user to analyze. Eventually, we’ll have to deal with performance issues due to processing large amounts of data, but the first code you write to solve a problem should not be optimized. A naïve implementation is both easier to understand and less likely to suffer from being too clever. We’ll look at some techniques for optimization in the next chapter.

Premature Optimization

Debugging is twice as hard as writing the code in the first place. Therefore, if you write the code as cleverly as possible, you are, by definition, not smart enough to debug it.

—Brian Kernighan

Python is not the fastest programming language; it can be tempting to write your code to minimize the inherent slowness, but I would strongly recommend fighting this urge. I’ve seen “highly optimized” code that takes an hour to execute, which, when replaced with a naïve implementation of the same logic, takes two minutes to complete.

Cases that extreme aren’t common, but whenever you make your code more elaborate, you make your job harder when it comes to improving it.

If you write the simplest version of a method, you can compare it to subsequent versions to determine if you’re making code faster or just more complex.
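One way to make that comparison, sketched here with the standard library’s timeit module, is to keep the simple version around, check that a candidate replacement returns the same result, and only then compare timings. Both functions are hypothetical examples invented for this sketch, and the timings will differ from machine to machine.

```python
import timeit


def total_naive(values):
    """Simplest version: a plain loop."""
    total = 0
    for value in values:
        total += value
    return total


def total_candidate(values):
    """A proposed replacement; here it simply uses the built-in sum()."""
    return sum(values)


sample = list(range(10_000))

# First check that the two versions agree, then compare their timings.
assert total_naive(sample) == total_candidate(sample)
naive_time = timeit.timeit(lambda: total_naive(sample), number=100)
candidate_time = timeit.timeit(lambda: total_candidate(sample), number=100)
print(f"naive: {naive_time:.4f}s candidate: {candidate_time:.4f}s")
```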

The first version of get_data() that we’ll implement is one that returns all the DataPoint objects in the database, without having to worry about dealing with any SQLAlchemy objects. We already decided that we would create a generator coroutine, rather than a function (or coroutine) that returns a list of DataPoint objects, so our initial implementation is the one in Listing 9-3.
async def get_data() -> t.AsyncIterator[DataPoint]:
    db_session = db_session_var.get()
    loop = asyncio.get_running_loop()
    query = db_session.query(datapoint_table)
    rows = await loop.run_in_executor(None, query.all)
    for row in rows:
        yield DataPoint.from_sql_result(row)
Listing 9-3

Simplest implementation of get_data()

This function gets the session from the context variable set up by with_database(...), builds a query object, and then runs that object’s all method using an executor, giving way to other tasks while the all method runs. Iterating over the query object rather than calling query.all() would cause database operations to be triggered as the loop runs, so we must be careful to only set up the query in asynchronous code and delegate the all() function call to the executor. The result of this is a list of SQLAlchemy’s lightweight result named tuples in the rows variable, which we can then iterate over yielding the matching DataPoint object.
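The same delegation pattern can be run without a database. In this sketch, blocking_fetch() is a hypothetical stand-in for query.all and get_numbers() is an invented name; the point is only that the blocking callable is passed to run_in_executor uncalled, and the async generator yields from the completed list.

```python
import asyncio
import time
import typing as t


def blocking_fetch() -> t.List[int]:
    """Hypothetical stand-in for query.all: blocks, then returns every row."""
    time.sleep(0.05)
    return [1, 2, 3]


async def get_numbers() -> t.AsyncIterator[int]:
    loop = asyncio.get_running_loop()
    # Pass the callable itself (no parentheses) so it runs in the default
    # thread pool while the event loop stays free for other tasks.
    rows = await loop.run_in_executor(None, blocking_fetch)
    for row in rows:
        yield row


async def main() -> t.List[int]:
    return [number async for number in get_numbers()]


result = asyncio.run(main())
```

In a Jupyter cell, the final line would instead be a bare await main(), since the notebook already has a running event loop.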

As the rows variable contains a list of all the result objects, we know that all the data has been processed by the database and parsed by SQLAlchemy in the executor before execution passes back to our get_data() function. This means that we’re using all the RAM needed to store the full result set before the first DataPoint object is available to the end-user. Storing all this data when we don’t know that we need all of it is somewhat inefficient in both memory and time, but elaborate methods to paginate the data in the iterator would be an example of premature optimization. Don’t change this from the naïve approach until it becomes a problem.

We always have to deal with the memory and time overheads of retrieving the SQLAlchemy row objects, but the numbers in Table 9-1 give us an idea of how much overhead we are adding to the system by converting them to DataPoint classes. A million rows would involve an extra 152 megabytes of RAM and an additional 1.5 seconds of processing time. Both of these are well within the capacity of modern computers and appropriate for infrequent tasks, so they are not of immediate concern.
Table 9-1

Comparison of RAM usage and instantiation time for the SQLAlchemy row and our DataPoint class

Object                  Size        Time to instantiate
SQLAlchemy result row   80 bytes    0.4 microseconds
DataPoint               152 bytes   1.5 microseconds

*Results may vary between Python implementations and processing power available

However, because we’re creating an iterator, there is no guarantee that our DataPoint objects will all be resident in memory at once. If the consuming code does not keep a reference to them, then they can be garbage collected immediately after they’re used. For example, in Listing 9-4 we use our two new helper functions to count the rows without any data point objects being resident in memory.
from apd.aggregation.query import with_database, get_data
with with_database("postgresql+psycopg2://apd@localhost/apd") as session:
    count = 0
    async for datapoint in get_data():
        count += 1
    print(count)
Listing 9-4

Jupyter cell to count data points using our helper context manager

Merely counting the data points isn’t an interesting way of analyzing the data. We can start trying to make sense of the data by plotting values on a scatter plot. Let’s start with a simple sanity check, plotting the value of the RelativeHumidity sensor against date (Listing 9-5). This is a good one to start with, as the stored data is a floating-point number rather than a dictionary-based structure, so we don’t need to parse the values.

The matplotlib library is perhaps the most popular plotting library in Python. Its plot_date(...) function is a great fit for plotting a series of values against time. It takes a list of values for the x axis and a corresponding list of values for the y axis, as well as the style to be used when plotting a point and a flag to set which axis contains the date values. Our get_data(...) function doesn’t return what we need for the x and y parameters directly; it returns an async iterator of data point objects.

We can convert an async iterable of data point objects to a list of tuples containing date and value pairs from a single sensor using a list comprehension. At that point, we have a list of date and value pairs and can use the built-in zip(...) function to invert the grouping to a pair of lists, one for date and the other for value.
from apd.aggregation.query import with_database, get_data
from matplotlib import pyplot as plt
async def plot():
    points = [
        (dp.collected_at, dp.data)
        async for dp in get_data()
        if dp.sensor_name=="RelativeHumidity"
    ]
    x, y = zip(*points)
    plt.plot_date(x, y, "o", xdate=True)
with with_database("postgresql+psycopg2://apd@localhost/apd") as session:
    await plot()
plt.show()
Listing 9-5

Relative humidity plotting jupyter cell, with the output chart it generates

../images/481001_1_En_9_Chapter/481001_1_En_9_Figa_HTML.jpg
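The zip(*points) step in Listing 9-5 can be examined in isolation. This sketch uses a hypothetical, hand-written list of (collected_at, value) pairs in place of database results.

```python
import datetime

# Hypothetical sample of (collected_at, value) pairs.
points = [
    (datetime.datetime(2020, 4, 1, 12, 0), 51.2),
    (datetime.datetime(2020, 4, 1, 13, 0), 49.8),
    (datetime.datetime(2020, 4, 1, 14, 0), 50.5),
]

# Unpacking the list into zip() regroups the pairs by position:
# x receives every first element and y every second element.
x, y = zip(*points)
```

Both x and y are tuples of the same length as points. Note that unpacking into two names raises a ValueError if points is empty, so a plotting function may want to check for that case when a filter could match no rows.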

Filtering data

It would be nice to filter the data in the query stage, rather than just discarding all sensor data that doesn’t meet our criteria when we are iterating through them. Right now, every piece of data is selected, a result object is created, then a DataPoint object, and only then are irrelevant entries skipped. To this end, we can add an additional parameter to the get_data(...) function that determines whether a filter on sensor_name will be applied to the generated query.
async def get_data(sensor_name: t.Optional[str] = None) -> t.AsyncIterator[DataPoint]:
    db_session = db_session_var.get()
    loop = asyncio.get_running_loop()
    query = db_session.query(datapoint_table)
    if sensor_name:
        query = query.filter(datapoint_table.c.sensor_name == sensor_name)
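    query = query.order_by(datapoint_table.c.collected_at)
    # The rest is unchanged from Listing 9-3: run the query in an executor
    rows = await loop.run_in_executor(None, query.all)
    for row in rows:
        yield DataPoint.from_sql_result(row)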
This approach saves a lot of overhead, as only the relevant sensor data points are passed to the end-user, and it’s also a more natural interface. Users expect to be able to specify what data they want, not to get absolutely all data and manually filter it. The version of the function in Listing 9-6 takes less than one second to execute with my sample data set (compared to over 3 seconds for the previous version) but shows the same chart.
from apd.aggregation.query import with_database, get_data
from matplotlib import pyplot as plt
async def plot():
    points = [(dp.collected_at, dp.data) async for dp in get_data(sensor_name="RelativeHumidity")]
    x, y = zip(*points)
    plt.plot_date(x, y, "o", xdate=True)
with with_database("postgresql+psycopg2://apd@localhost/apd") as session:
    await plot()
plt.show()
Listing 9-6

Delegating filtering to the get_data function

This plotting function is short and not overly complex; it represents quite a natural interface for loading data from the database. The downside is that having multiple deployments mixed in together results in unclear charts, where there are multiple data points for a given time. Matplotlib supports calling plot_date(...) multiple times with different logical result sets, which are then displayed using different colors. Our users can achieve this by creating multiple point lists as they iterate over the results of the get_data(...) call, as shown in Listing 9-7.
import collections
from apd.aggregation.query import with_database, get_data
from matplotlib import pyplot as plt
async def plot():
    legends = collections.defaultdict(list)
    async for dp in get_data(sensor_name="RelativeHumidity"):
        legends[dp.deployment_id].append((dp.collected_at, dp.data))
    for deployment_id, points in legends.items():
        x, y = zip(*points)
        plt.plot_date(x, y, "o", xdate=True)
with with_database("postgresql+psycopg2://apd@localhost/apd") as session:
    await plot()
plt.show()
Listing 9-7

Plotting all sensor deployments independently

../images/481001_1_En_9_Chapter/481001_1_En_9_Figb_HTML.jpg
This approach again makes the interface unnatural; it would be more logical for end-users to iterate over deployments and then iterate over sensor data values, rather than iterate over all data points and organize them into lists manually. An alternative would be to create a new function that lists all the deployment ids, then allow get_data(...) to filter by deployment_id. This would allow us to loop over the individual deployments and make a new get_data(...) call to get only that deployment’s data. This is demonstrated in Listing 9-8.
async def get_deployment_ids():
    db_session = db_session_var.get()
    loop = asyncio.get_running_loop()
    query = db_session.query(datapoint_table.c.deployment_id).distinct()
    return [row.deployment_id for row in await loop.run_in_executor(None, query.all)]
async def get_data(
    sensor_name: t.Optional[str] = None,
    deployment_id: t.Optional[UUID] = None,
) -> t.AsyncIterator[DataPoint]:
    db_session = db_session_var.get()
    loop = asyncio.get_running_loop()
    query = db_session.query(datapoint_table)
    if sensor_name:
        query = query.filter(datapoint_table.c.sensor_name == sensor_name)
    if deployment_id:
        query = query.filter(datapoint_table.c.deployment_id == deployment_id)
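    query = query.order_by(
        datapoint_table.c.collected_at,
    )
    # As before, run the query in an executor and convert each row
    rows = await loop.run_in_executor(None, query.all)
    for row in rows:
        yield DataPoint.from_sql_result(row)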
Listing 9-8

Extended data collection functions for deployment_id filtering

This new function can be used to loop over multiple calls to get_data(...), rather than the plot function looping and sorting the resulting data points into independent lists. Listing 9-9 demonstrates a very natural interface to looping over all the deployments for a single sensor, which behaves identically to the previous version.
from apd.aggregation.query import with_database, get_data, get_deployment_ids
from matplotlib import pyplot as plt
async def plot(deployment_id):
    points = []
    async for dp in get_data(sensor_name="RelativeHumidity", deployment_id=deployment_id):
        points.append((dp.collected_at, dp.data))
    x, y = zip(*points)
    plt.plot_date(x, y, "o", xdate=True)
with with_database("postgresql+psycopg2://apd@localhost/apd") as session:
    deployment_ids = await get_deployment_ids()
    for deployment in deployment_ids:
        await plot(deployment)
plt.show()
Listing 9-9

Plotting all deployments using the new helper functions

This approach allows the end-user to interrogate each deployment individually, so only the relevant data for a combination of sensor and deployment is loaded into RAM at once. It’s a perfectly appropriate API to offer the end-user.

Multilevel iterators

We previously reworked the interface for filtering by sensor name to do the filtering in the database to avoid iterating over unnecessary data. Our new deployment id filter isn’t used to exclude data we don’t need; it’s used to make it easier to loop over each logical group independently. We don’t strictly need a filter here; we’re using one to make the interface more natural.

If you’ve worked with the itertools module in the standard library much, you may have used the groupby(...) function. This takes an iterator and a key function and returns an iterator of two-item tuples, the first item being the value of the key function and the second being an iterator over a run of consecutive values that produce that key value. This is the same problem we’ve been trying to solve by listing our deployments and then filtering the database query.

The key function given to groupby(...) is often a simple lambda expression, but it can be any function, such as one of the functions from the operator module. For example, operator.attrgetter("deployment_id") is equivalent to lambda obj: obj.deployment_id, and operator.itemgetter(2) is equivalent to lambda obj: obj[2].
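These equivalences can be checked directly. The Reading named tuple below is a hypothetical record type used only for this illustration, chosen because it supports both attribute and index access.

```python
import collections
import operator

# Hypothetical record type, used only for this illustration.
Reading = collections.namedtuple(
    "Reading", ["sensor_name", "value", "deployment_id"]
)
reading = Reading("Temperature", 21.5, "deployment-a")

by_attribute = operator.attrgetter("deployment_id")
by_index = operator.itemgetter(2)

# Each operator helper returns the same value as the matching lambda.
same_as_attr_lambda = by_attribute(reading) == (lambda obj: obj.deployment_id)(reading)
same_as_item_lambda = by_index(reading) == (lambda obj: obj[2])(reading)
```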

For this example, we’ll define a key function that returns the value of an integer modulo 3 and a data() generator function that yields a fixed series of numbers, printing its status as it goes. This allows us to see clearly when the underlying iterator is advanced.
import itertools
import typing as t
def mod3(n: int) -> int:
    return n % 3
def data() -> t.Iterable[int]:
    print("data() is starting")
    for number in [0, 1, 4, 7, 2, 6, 9]:
        print(f"Yielding {number}")
        yield number
    print("data() is complete")
We can loop over the contents of the data() generator and print the value of the mod3 function, which lets us see that the first group has one item, then there’s a group of three items, then a group of one, then a group of two.
>>> print([mod3(number) for number in data()])
data() is starting
Yielding 0
Yielding 1
Yielding 4
Yielding 7
Yielding 2
Yielding 6
Yielding 9
data() is complete
[0, 1, 1, 1, 2, 0, 0]

Setting up a groupby does not consume the underlying iterable; each item it generates is processed as the groupby is iterated over. To work correctly, the groupby only needs to decide if the current item is in the same group as the previous one or if a new group has started, it doesn’t analyze the iterable as a whole. Items with the same value for the key function are only grouped together if they are a contiguous block in the input iterator, so it’s common to ensure that the underlying iterator is sorted to avoid splitting groups up.
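The effect of contiguity can be seen by grouping the same numbers with and without sorting them first. This is a small sketch that redefines mod3 so that it is self-contained; materializing each group with list(...) is done only to make the result easy to inspect.

```python
import itertools


def mod3(n: int) -> int:
    return n % 3


numbers = [0, 1, 4, 7, 2, 6, 9]

# Unsorted input: the key 0 appears in two separate runs, so it is
# reported as two separate groups.
unsorted_groups = [
    (key, list(group)) for key, group in itertools.groupby(numbers, mod3)
]

# Sorting by the same key first makes each key's items contiguous.
sorted_groups = [
    (key, list(group))
    for key, group in itertools.groupby(sorted(numbers, key=mod3), mod3)
]
```

With the unsorted input there are four groups, two of them for the key 0. After sorting there are three groups, one per key, and because sorted(...) is stable the items keep their original relative order within each group.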

By creating a groupby over our data with the mod3(...) key function, we can create a two-level loop, first iterating over the values of the key function, then iterating over the values from data() that produce that key value.
>>> for val, group in itertools.groupby(data(), mod3):
...     print(f"Starting new group where mod3(x)=={val}")
...     for number in group:
...         print(f"x=={number} mod3(x)=={mod3(number)}")
...     print(f"Group with mod3(x)=={val} is complete")
...
data() is starting
Yielding 0
Starting new group where mod3(x)==0
x==0 mod3(x)==0
Yielding 1
Group with mod3(x)==0 is complete
Starting new group where mod3(x)==1
x==1 mod3(x)==1
Yielding 4
x==4 mod3(x)==1
Yielding 7
x==7 mod3(x)==1
Yielding 2
Group with mod3(x)==1 is complete
Starting new group where mod3(x)==2
x==2 mod3(x)==2
Yielding 6
Group with mod3(x)==2 is complete
Starting new group where mod3(x)==0
x==6 mod3(x)==0
Yielding 9
x==9 mod3(x)==0
data() is complete
Group with mod3(x)==0 is complete

From the output of the print statements, we can see that the groupby only ever pulls one item at a time, but manages the iterators it provides in such a way that looping over the values is natural. Whenever the inner loop requests a new item, the groupby function requests a new item from the underlying iterator and then decides its behavior based on that value. If the key function reports the same value as the previous item, it yields the new value to the inner loop; otherwise, it signals that the inner loop is complete and holds the value until the next inner loop starts.

The iterators behave just as we’d expect if we had concrete lists of items; there is no requirement to iterate over the inner loop if we don’t need to. If we don’t iterate over the inner loop completely before advancing the outer loop, the groupby object will transparently advance the source iterable as though we had. In the following example, we skip the group of three where mod3(...)==1, and we can see that the underlying iterator is advanced three times by the groupby object:
>>> for val, group in itertools.groupby(data(), mod3):
...     print(f"Starting new group where mod3(x)=={val}")
...     if val == 1:
...         # Skip the ones
...         print("Skipping group")
...         continue
...     for number in group:
...         print(f"x=={number} mod3(x)=={mod3(number)}")
...     print(f"Group with mod3(x)=={val} is complete")
...
data() is starting
Yielding 0
Starting new group where mod3(x)==0
x==0 mod3(x)==0
Yielding 1
Group with mod3(x)==0 is complete
Starting new group where mod3(x)==1
Skipping group
Yielding 4
Yielding 7
Yielding 2
Starting new group where mod3(x)==2
x==2 mod3(x)==2
Yielding 6
Group with mod3(x)==2 is complete
Starting new group where mod3(x)==0
x==6 mod3(x)==0
Yielding 9
x==9 mod3(x)==0
data() is complete
Group with mod3(x)==0 is complete
The behavior is intuitive when we’re using it, but it can be hard to follow how it’s implemented. Figure 9-1 shows a pair of flow charts, one for the outer loop and one for each individual inner loop.
../images/481001_1_En_9_Chapter/481001_1_En_9_Fig1_HTML.jpg
Figure 9-1

Flow chart demonstrating how groupby works

If we had a standard iterator (as opposed to an asynchronous iterator), we could sort the data by deployment_id and use itertools.groupby(...) to simplify our code to handle multiple deployments without needing to query for the individual deployments. Rather than making a new get_data(...) call for each, we could iterate over the groups and handle the internal iterator in the same way we already do, using list comprehensions and zip(...).

Unfortunately, there is no fully asynchronous equivalent of groupby at the time of writing. While we can write a function that returns an async iterator whose values are pairs of a UUID and an async iterator of DataPoint objects, there is no way of grouping these automatically.

At the risk of writing clever code, we can write an implementation of groupby that works with asynchronous code ourselves using closures. It would expose multiple iterators to the end-user that work on the same underlying iterator, in just the same way as itertools.groupby(...). It would be better to use a library function for this if one were available.

Each time we find a new value of the key function, we need to return a new generator function that maintains a reference to the underlying source iterator. This way, when someone advances an item iterator, it can choose to either yield the data point it receives or to indicate that it’s the end of the item iterator, as the groupby function does. Equally, if we advance the outer iterator before an item iterator has been consumed, it needs to “fast-forward” through the underlying iterator until the start of a new group is found.

The code in Listing 9-10 is a single function that delegates to our get_data(...) function and wraps it in the appropriate groupby logic, as opposed to a generic function that can adapt any iterator.
async def get_data_by_deployment(
    *args, **kwargs
) -> t.AsyncIterator[t.Tuple[UUID, t.AsyncIterator[DataPoint]]]:
    """Return an Async Iterator that contains two-item pairs.
    These pairs are a UUID (deployment_id), and an async iterator that contains
    the datapoints with that deployment_id.
    Usage example:
        async for deployment_id, datapoints in get_data_by_deployment():
            print(deployment_id)
            async for datapoint in datapoints:
                print(datapoint)
            print()
    """
    # Get the data, using the arguments to this function as filters
    data = get_data(*args, **kwargs)
    # The two levels of iterator share the item variable, initialise it
    # with the first item from the iterator. Also set last_deployment_id
    # to None, so the outer iterator knows to start a new group.
    last_deployment_id: t.Optional[UUID] = None
    try:
        item = await data.__anext__()
    except StopAsyncIteration:
        # There were no items in the underlying query, return immediately
        return
    async def subiterator(group_id: UUID) -> t.AsyncIterator[DataPoint]:
        """Using a closure, create an iterator that yields the current
        item, then yields all items from data while the deployment_id matches
        group_id, leaving the first that doesn't match as item in the enclosing
        scope."""
        # item is from the enclosing scope
        nonlocal item
        while item.deployment_id == group_id:
            # yield items from data while they match the group_id this iterator represents
            yield item
            try:
                # Advance the underlying iterator
                item = await data.__anext__()
            except StopAsyncIteration:
                # The underlying iterator came to an end, so end the subiterator too
                return
    while True:
        while item.deployment_id == last_deployment_id:
            # We are trying to advance the outer iterator while the
            # underlying iterator is still part-way through a group.
            # Speed through the underlying until we hit an item where
            # the deployment_id is different to the last one (or,
            # is not None, in the case of the start of the iterator)
            try:
                item = await data.__anext__()
            except StopAsyncIteration:
                # We hit the end of the underlying iterator: end this
                # iterator too
                return
        last_deployment_id = item.deployment_id
        # Instantiate a subiterator for this group
        yield last_deployment_id, subiterator(last_deployment_id)
Listing 9-10

An implementation of get_data_by_deployment that acts like an asynchronous groupby

This uses await data.__anext__() to advance the underlying data iterator, rather than an async for loop, to make the fact that the iterator is consumed in multiple places more obvious.

An implementation of this generator coroutine is in the code for this chapter. I’d encourage you to try adding print statements and breakpoints to it, to help understand the control flow. This code is more complex than most Python code you’ll need to write (and I’d caution you against introducing this level of complexity into production code; having it as a self-contained dependency is better), but if you can understand how it works, you’ll have a thorough grasp on the details of generator functions, asynchronous iterators, and closures. As asynchronous code is used more in production code, libraries to offer this kind of complex manipulation of iterators are sure to become available.
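For comparison, a much simpler grouping helper is possible if each group is allowed to be held in memory as a list. To be clear, this is a different technique from Listing 9-10 and not the chapter’s implementation: it gives up the lazy inner iterator in exchange for code with no shared state. The names buffered_groupby and numbers are hypothetical, and numbers() merely stands in for an async source such as get_data().

```python
import asyncio
import typing as t

T = t.TypeVar("T")
K = t.TypeVar("K")


async def buffered_groupby(
    source: t.AsyncIterator[T], key: t.Callable[[T], K]
) -> t.AsyncIterator[t.Tuple[K, t.List[T]]]:
    """Yield (key, list of items) for each contiguous run of equal keys."""
    group: t.List[T] = []
    current_key: t.Optional[K] = None
    async for item in source:
        item_key = key(item)
        if group and item_key != current_key:
            # The run has ended: hand over the finished group.
            yield t.cast(K, current_key), group
            group = []
        current_key = item_key
        group.append(item)
    if group:
        # Emit the final run once the source is exhausted.
        yield t.cast(K, current_key), group


async def numbers() -> t.AsyncIterator[int]:
    """Hypothetical async source standing in for get_data()."""
    for number in [0, 1, 4, 7, 2, 6, 9]:
        yield number


async def main() -> t.List[t.Tuple[int, t.List[int]]]:
    return [pair async for pair in buffered_groupby(numbers(), lambda n: n % 3)]


grouped = asyncio.run(main())
```

The cost of this design is that a whole group must be read before the caller sees its first item, so it suits many small groups better than a few very large ones.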

Additional filters

We’ve added get_data(...) filters for sensor_name and deployment_id, but it’s also useful to choose the range of time that’s being displayed. We can implement this with two datetime filters which are used to filter on the collected_at field. The implementation of get_data(...) that supports this is shown in Listing 9-11, but because get_data_by_deployment(...) passes all arguments through to get_data(...) unchanged, we don’t need to modify that function to allow date windows in our analysis.
async def get_data(
    sensor_name: t.Optional[str] = None,
    deployment_id: t.Optional[UUID] = None,
    collected_before: t.Optional[datetime.datetime] = None,
    collected_after: t.Optional[datetime.datetime] = None,
) -> t.AsyncIterator[DataPoint]:
    db_session = db_session_var.get()
    loop = asyncio.get_running_loop()
    query = db_session.query(datapoint_table)
    if sensor_name:
        query = query.filter(datapoint_table.c.sensor_name == sensor_name)
    if deployment_id:
        query = query.filter(datapoint_table.c.deployment_id == deployment_id)
    if collected_before:
        query = query.filter(datapoint_table.c.collected_at < collected_before)
    if collected_after:
        query = query.filter(datapoint_table.c.collected_at > collected_after)
    query = query.order_by(
        datapoint_table.c.deployment_id,
        datapoint_table.c.sensor_name,
        datapoint_table.c.collected_at,
    )
    rows = await loop.run_in_executor(None, query.all)
    for row in rows:
        yield DataPoint.from_sql_result(row)
Listing 9-11

get_data function with sensor, deployment, and date filters
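Because the date filters use strict less-than and greater-than comparisons, both ends of the window are exclusive. The sketch below mirrors that behavior in memory so it can be run without a database; in_window is a hypothetical helper written for this illustration, not part of apd.aggregation.

```python
import datetime
import typing as t


def in_window(
    collected_at: datetime.datetime,
    collected_before: t.Optional[datetime.datetime] = None,
    collected_after: t.Optional[datetime.datetime] = None,
) -> bool:
    """In-memory mirror of the two date filters; both bounds are exclusive."""
    if collected_before and not collected_at < collected_before:
        return False
    if collected_after and not collected_at > collected_after:
        return False
    return True


start = datetime.datetime(2020, 4, 1, 12, 0, 0)
end = datetime.datetime(2020, 4, 1, 13, 0, 0)
stamps = [start, start + datetime.timedelta(minutes=30), end]

# Only the middle timestamp is strictly inside the window.
selected = [
    stamp
    for stamp in stamps
    if in_window(stamp, collected_before=end, collected_after=start)
]
```

A data point collected at exactly the start or end time is therefore left out, which is worth keeping in mind when choosing window boundaries for a chart.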

Testing our query functions

The query functions need to be tested, just like any others. Unlike most of the functions we’ve written so far, the query functions take lots of optional arguments that significantly change the data they return. Although we don’t need to test a wide range of values for each filter (we can trust that our database’s query support works correctly), we need to test that each option works as intended.

We need some setup fixtures to enable us to test functions that depend on a database being present. While we could mock the database connection out, I wouldn’t recommend this, as databases are very complex pieces of software and not well suited to being mocked out.

The most common approach to testing database applications is to create a new, empty database and allow the tests to control the creation of tables and data. Some database software, like SQLite, allows for new databases to be created on the fly, but most require the database to be set up in advance.
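As an illustration of creating a database on the fly, the standard library’s sqlite3 module can open a fresh in-memory database with the special name ":memory:". This is only a sketch of that SQLite feature: the datapoints table and its columns are hypothetical, and the fixtures in this chapter target a PostgreSQL database that is set up in advance.

```python
import sqlite3

# ":memory:" asks SQLite for a brand new, empty database that exists only
# for the lifetime of this connection.
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE datapoints (id INTEGER PRIMARY KEY, sensor_name TEXT, data TEXT)"
)
connection.execute(
    "INSERT INTO datapoints (sensor_name, data) VALUES (?, ?)", ("Test", "1")
)
row_count = connection.execute("SELECT COUNT(*) FROM datapoints").fetchone()[0]
connection.close()
```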

Given that we’re assuming there’s an empty database available to us, we need a fixture to connect to it, a fixture to set up the tables, and a fixture to set up the data. The connect fixture is very similar to the with_database context manager, and the function to populate the database will include sample data that we can insert using db_session.execute(datapoint_table.insert().values(...)).

The fixture to set up the database tables is the most difficult one. The easiest approach is to use metadata.create_all(...), as we did before we introduced Alembic for database migrations. This works fine for most applications, so it’s the best choice in general. However, our application includes a database view that’s not managed by SQLAlchemy, but by a custom migration in Alembic. Therefore, we need to use Alembic’s upgrade functionality to set up our database tables. The relevant fixtures we need are given in Listing 9-12.
import datetime
from uuid import UUID
from apd.aggregation.database import datapoint_table
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.environment import EnvironmentContext
import pytest
@pytest.fixture
def db_uri():
    return "postgresql+psycopg2://apd@localhost/apd-test"
@pytest.fixture
def db_session(db_uri):
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    engine = create_engine(db_uri, echo=True)
    sm = sessionmaker(engine)
    Session = sm()
    yield Session
    Session.close()
@pytest.fixture
def migrated_db(db_uri, db_session):
    config = Config()
    config.set_main_option("script_location", "apd.aggregation:alembic")
    config.set_main_option("sqlalchemy.url", db_uri)
    script = ScriptDirectory.from_config(config)
    def upgrade(rev, context):
        return script._upgrade_revs(script.get_current_head(), rev)
    def downgrade(rev, context):
        return script._downgrade_revs(None, rev)
    with EnvironmentContext(config, script, fn=upgrade):
        script.run_env()
    try:
        yield
    finally:
        # Clear any pending work from the db_session connection
        db_session.rollback()
        with EnvironmentContext(config, script, fn=downgrade):
            script.run_env()
@pytest.fixture
def populated_db(migrated_db, db_session):
    datas = [
        {
            "id": 1,
            "sensor_name": "Test",
            "data": "1",
            "collected_at": datetime.datetime(2020, 4, 1, 12, 0, 1),
            "deployment_id": UUID("b4c68905-b1e4-4875-940e-69e5d27730fd"),
        },
        # Additional sample data omitted from listing for brevity's sake
    ]
    for data in datas:
        insert = datapoint_table.insert().values(**data)
        db_session.execute(insert)
Listing 9-12

Database setup fixtures

This gives us an environment where we can write tests that query a database that contains only known values, so we can write meaningful assertions.

Parameterized tests

Pytest has a special piece of functionality for generating multiple tests that do something very similar: the parametrize mark. If a test function is marked with parametrize, it can have additional arguments that do not correspond to fixtures, as well as a series of values for these parameters. The test function will be run multiple times, once for each set of argument values. We can use this feature to write functions that test various filtering methods of our functions without lots of duplication, as shown in Listing 9-13.
class TestGetData:
    @pytest.fixture
    def mut(self):
        return get_data
    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "filter,num_items_expected",
        [
        ({}, 9),
        ({"sensor_name": "Test"}, 7),
        ({"deployment_id": UUID("b4c68905-b1e4-4875-940e-69e5d27730fd")}, 5),
        ({"collected_after": datetime.datetime(2020, 4, 1, 12, 2, 1),}, 3),
        ({"collected_before": datetime.datetime(2020, 4, 1, 12, 2, 1),}, 4),
        (
            {
                "collected_after": datetime.datetime(2020, 4, 1, 12, 2, 1),
                "collected_before": datetime.datetime(2020, 4, 1, 12, 3, 5),
            },
            2,
        ),
        ],
    )
    async def test_iterate_over_items(
        self, mut, db_session, populated_db, filter, num_items_expected
    ):
        db_session_var.set(db_session)
        points = [dp async for dp in mut(**filter)]
        assert len(points) == num_items_expected
Listing 9-13

A parameterized get_data test to verify different filters

The first time this test is run, it has filter={}, num_items_expected=9 as parameters. The second run has filter={"sensor_name": "Test"}, num_items_expected=7, and so on. Each of these test functions will run independently and will be counted as a new passing or failing test, as appropriate.

This will result in six tests being generated, with names like TestGetData.test_iterate_over_items[filter5-2]. This name is based on the parameters, with complex parameter values (like filter) being represented by their name and the zero-based index into the list, and simpler parameters (like num_items_expected) included directly. Most of the time, you won’t need to care about the name, but it can be very helpful to identify which variant of a test is failing.

Displaying multiple sensors

We’ve now got three functions that help us connect to the database and iterate over DataPoint objects in a sensible order and with optional filtering. So far we’ve been using the matplotlib.pyplot.plot_date(...) function to convert pairs of sensor values and dates to a single chart. This is a helper function that makes it easier to generate a plot by making various drawing functions available in a global namespace. It is not the recommended approach when making multiple charts.

We want to be able to loop over each of our sensor types and generate a chart for each. If we were to use the pyplot API, we would be constrained to using a single plot, with the highest values skewing the axes to make the lowest impossible to read. Instead, we want to generate an independent plot for each and show them side by side. For this, we can use the matplotlib.pyplot.figure(...) and figure.add_subplot(...) functions. A subplot is an object which behaves broadly like matplotlib.pyplot but represents a single plot inside a larger grid of plots. For example, figure.add_subplot(3,2,4) would be the fourth plot in a three-row, two-column grid of plots.

Right now, our plot(...) function assumes that the data it is working with is a number, which can be passed directly to matplotlib for display on our chart. Many of our sensors have different data formats though, such as the temperature sensor which has a dictionary of temperature and the unit being used as its value attribute. These different values need to be converted to numbers before they can be plotted.

We can refactor our plotting function out to a utility function in apd.aggregation to vastly simplify our Jupyter notebooks, but we need to ensure that it can be used with other formats of sensor data. Each plot needs to provide some configuration for the sensor to be graphed, a subplot object to draw the plot in, and a mapping from deployment ids to a user-facing name for populating the plot’s legend. It should also accept the same filtering arguments as get_data(...), to allow users to constrain their charts by date or deployment id.

We’ll pass this config data as an instance of a data class, which also contains a reference to a “clean” function. This clean function is what’s responsible for converting a DataPoint instance to a pair of values that can be plotted by matplotlib. The clean function must transform an iterable of DataPoint objects to an iterable of (x, y) pairs that matplotlib can understand. For RelativeHumidity and RAMAvailable sensors, this is a simple matter of yielding the date/float tuple, like our code has done so far.
async def clean_passthrough(
    datapoints: t.AsyncIterator[DataPoint],
) -> t.AsyncIterator[t.Tuple[datetime.datetime, float]]:
    async for datapoint in datapoints:
        if datapoint.data is None:
            continue
        else:
            yield datapoint.collected_at, datapoint.data
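To see what a clean function produces, it can be driven by hand with a small asynchronous generator. The following is a minimal sketch: FakeDataPoint and fake_datapoints are hypothetical stand-ins that provide only the two attributes the cleaner reads, and the cleaner is repeated so the example runs on its own.

```python
import asyncio
import dataclasses
import datetime
import typing as t


@dataclasses.dataclass
class FakeDataPoint:
    # Hypothetical stand-in with only the attributes the cleaner reads
    collected_at: datetime.datetime
    data: t.Optional[float]


async def fake_datapoints() -> t.AsyncIterator[FakeDataPoint]:
    start = datetime.datetime(2020, 4, 1, 12, 0, 0)
    for offset, value in enumerate([45.5, None, 46.0]):
        yield FakeDataPoint(start + datetime.timedelta(minutes=offset), value)


async def clean_passthrough(datapoints):
    # Same logic as the cleaner above, repeated so this sketch runs alone
    async for datapoint in datapoints:
        if datapoint.data is None:
            continue
        yield datapoint.collected_at, datapoint.data


async def collect() -> t.List[t.Tuple[datetime.datetime, float]]:
    return [pair async for pair in clean_passthrough(fake_datapoints())]


pairs = asyncio.run(collect())
print(pairs)
```

The reading with no data is dropped, leaving two (date, value) pairs in the order they were collected.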

The config data class also needs some string parameters, such as the title of the chart, the axis labels, and the sensor_name that needs to be passed to get_data(...) in order to find the data needed for this chart. Once we have the Config class defined, we can create two config objects that represent the two sensors which use raw floating-point numbers as their value type and a function to return all registered configs.

Combining the figure functions from matplotlib with our new config system allows us to write a new plot_sensor(...) function (Listing 9-14) that can generate any number of charts using only a few simple lines of code in the Jupyter notebook.
@dataclasses.dataclass(frozen=True)
class Config:
    title: str
    sensor_name: str
    clean: t.Callable[[t.AsyncIterator[DataPoint]], t.AsyncIterator[t.Tuple[datetime.datetime, float]]]
    ylabel: str
configs = (
    Config(
        sensor_name="RAMAvailable",
        clean=clean_passthrough,
        title="RAM available",
        ylabel="Bytes",
    ),
    Config(
        sensor_name="RelativeHumidity",
        clean=clean_passthrough,
        title="Relative humidity",
        ylabel="Percent",
    ),
)
def get_known_configs() -> t.Dict[str, Config]:
    return {config.title: config for config in configs}
async def plot_sensor(config: Config, plot: t.Any, location_names: t.Dict[UUID,str], **kwargs) -> t.Any:
    locations = []
    async for deployment, query_results in get_data_by_deployment(sensor_name=config.sensor_name, **kwargs):
        points = [dp async for dp in config.clean(query_results)]
        if not points:
            continue
        locations.append(deployment)
        x, y = zip(*points)
        plot.set_title(config.title)
        plot.set_ylabel(config.ylabel)
        plot.plot_date(x, y, "-", xdate=True)
    plot.legend([location_names.get(l, l) for l in locations])
    return plot
Listing 9-14

New config objects and plot function that uses it

With these new functions in place, we can modify the Jupyter notebook cell to call the plot_sensor(...) function instead of writing our own plotting function in Jupyter. The code that an end-user of apd.aggregation needs to write to connect to the database and render two charts (shown as Listing 9-15) is significantly shorter, thanks to these helper functions.
import asyncio
from matplotlib import pyplot as plt
from apd.aggregation.query import with_database
from apd.aggregation.analysis import get_known_configs, plot_sensor
with with_database("postgresql+psycopg2://apd@localhost/apd") as session:
    coros = []
    figure = plt.figure(figsize = (20, 5), dpi=300)
    configs = get_known_configs()
    to_display = configs["Relative humidity"], configs["RAM available"]
    for i, config in enumerate(to_display, start=1):
        plot = figure.add_subplot(1, 2, i)
        coros.append(plot_sensor(config, plot, {}))
    await asyncio.gather(*coros)
display(figure)
Listing 9-15

Jupyter cell to plot both Humidity and RAM Available, and their output

../images/481001_1_En_9_Chapter/481001_1_En_9_Figc_HTML.jpg
As the Temperature and SolarCumulativeOutput sensors return serialized objects from the pint package in the format {'unit': 'degC', 'magnitude': 8.4}, we can’t use these with our existing clean_passthrough() function; we need to create a new one. The simplest is to assume that the units are always the same and extract the magnitude value only. This would chart any temperatures in a different scale incorrectly, as the units are not being corrected. For now, all of our sensors return values in degrees centigrade, so this isn’t a serious concern.
async def clean_magnitude(datapoints):
    async for datapoint in datapoints:
        if datapoint.data is None:
            continue
        yield datapoint.collected_at, datapoint.data["magnitude"]
If we use this new cleaner function to add a new config object for temperature, we see the chart in Figure 9-2. It’s clear from this data that the temperature sensor is not entirely reliable: the temperature in my office rarely exceeds the melting point of steel.
../images/481001_1_En_9_Chapter/481001_1_En_9_Fig2_HTML.jpg
Figure 9-2

Temperature sensor output with obvious errors skewing the data

Processing data

An advantage of the approach that we’ve taken is that we can perform relatively arbitrary transforms on the data that we’re given, allowing us to discard data points that we consider to be incorrect. It’s often better to discard data when analyzing than during collection, as bugs in the function to check a data point’s validity won’t cause data loss if it’s only checked during analysis. We can always delete incorrect data after the fact, but we can never recollect data that we chose to ignore.

One way of fixing this problem with the temperature sensor would be to make the clean iterator look at a moving window on the underlying data rather than just one DataPoint at a time. This way, it can use the neighbors of a sensor value to discard values that are too different.

The collections.deque type is useful for this, as it offers a structure with an optional maximum size, so we can add each temperature we find to the deque, but when reading it, we only see the last n entries that were added. A deque can have items added or removed from either the left or right edges, so it’s essential to always add items at the same end when using it as a limited window; once the deque is full, the oldest item is discarded from the opposite end.
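The windowing behavior can be seen in isolation with a few plain numbers. This is a small sketch using made-up temperature values rather than DataPoint objects.

```python
import collections

window = collections.deque(maxlen=3)
for temperature in [20.5, 21.0, 21.5, 22.0, 22.5]:
    # append() adds to the right-hand end; once the deque is full, the
    # oldest item is silently dropped from the left-hand end.
    window.append(temperature)

print(list(window))
```

Only the three most recent values remain, with window[1] as the middle item. Mixing append() with appendleft() would interleave old and new values and break that ordering.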

We can begin by filtering out any values that are out of the supported range of the DHT22 sensors,6 to remove the most egregious incorrect data. This removes many, but not all, of the incorrect readings. A simple way of filtering out single item peaks is to have a three-item window and yield the middle item unless it is too different to the average of the temperatures on either side, as shown in Listing 9-16. We don’t want to remove all legitimate fluctuations, so our definition of “not too different” must take into account that a run of readings such as 21c, 22c, 21c are legitimate while excluding runs such as 20c, 60c, 23c.
async def clean_temperature_fluctuations(
    datapoints: t.AsyncIterator[DataPoint],
) -> t.AsyncIterator[t.Tuple[datetime.datetime, float]]:
    allowed_jitter = 2.5
    allowed_range = (-40, 80)
    window_datapoints: t.Deque[DataPoint] = collections.deque(maxlen=3)
    def datapoint_ok(datapoint: DataPoint) -> bool:
        """Return False if this data point does not contain a valid temperature"""
        if datapoint.data is None:
            return False
        elif datapoint.data["unit"] != "degC":
            # This point is in a different temperature system. While it
            # could be converted, this cleaner is not yet doing that.
            return False
        elif not allowed_range[0] < datapoint.data["magnitude"] < allowed_range[1]:
            return False
        return True
    async for datapoint in datapoints:
        if not datapoint_ok(datapoint):
            # If the datapoint is invalid then skip directly to the next item
            continue
        window_datapoints.append(datapoint)
        if len(window_datapoints) == 3:
            # Find the temperatures of the datapoints in the window, then
            # average the first and last and compare that to the middle point.
            window_temperatures = [dp.data["magnitude"] for dp in window_datapoints]
            avg_first_last = (window_temperatures[0] + window_temperatures[2]) / 2
            diff_middle_avg = abs(window_temperatures[1] - avg_first_last)
            if diff_middle_avg > allowed_jitter:
                pass
            else:
                yield window_datapoints[1].collected_at, window_temperatures[1]
        else:
            # The first two items in the iterator can't be compared to both
            # neighbors so they should be yielded
            yield datapoint.collected_at, datapoint.data["magnitude"]
    # When the iterator ends the final item is not yet in the middle
    # of the window, so the last item must be explicitly yielded
    if datapoint_ok(datapoint):
        yield datapoint.collected_at, datapoint.data["magnitude"]
Listing 9-16

An example implementation of a cleaner function for temperature

This cleaner function produces a much smoother temperature trend, as demonstrated in Figure 9-3. The cleaner filters out any data points where the temperature could not be found as well as any severe errors. It retains fine detail of temperature trends: as the window contains the last three valid data points recorded (even those that were excluded from the output as outliers), a sudden change in temperature will start to be reflected in the output data so long as it persists for at least two consecutive readings.
../images/481001_1_En_9_Chapter/481001_1_En_9_Fig3_HTML.jpg
Figure 9-3

Result of the same data with an appropriate cleaner

Exercise 9-1: Add a Cleaner for SolarCumulativeOutput

The SolarCumulativeOutput sensor returns a number of watt-hours, serialized in the same way as the temperature sensor. If we chart this, we see an upward trending line that moves in irregular steps. It would be much more useful to see the power generated at a moment in time rather than the total up until that time.

To achieve this, we need to convert watt-hours to watts, which means dividing the change in watt-hours by the amount of time, in hours, between data points.

Write a clean_watthours_to_watts(...) iterator coroutine that keeps track of the last time and watt-hour readings, finds the difference, and then returns the watt-hours gained divided by the time elapsed.

For example, the following two date and value pairs should result in a single output entry at 1pm with a value of 5.0.
[
    (datetime.datetime(2020, 4, 1, 12, 0, 0), {"magnitude": 1.0, "unit": "watt_hour"}),
    (datetime.datetime(2020, 4, 1, 13, 0, 0), {"magnitude": 6.0, "unit": "watt_hour"})
]
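The arithmetic behind that expected value can be checked by hand. The following sketch covers only the calculation for the two sample readings, using plain tuples of time and magnitude; it is not the iterator coroutine that the exercise asks for.

```python
import datetime

earlier = (datetime.datetime(2020, 4, 1, 12, 0, 0), 1.0)  # watt-hours
later = (datetime.datetime(2020, 4, 1, 13, 0, 0), 6.0)  # watt-hours

# Time between the two readings, expressed in hours
elapsed_hours = (later[0] - earlier[0]).total_seconds() / 3600
# Energy generated between the two readings, in watt-hours
energy_gained = later[1] - earlier[1]
# Watt-hours divided by hours gives the average power in watts
average_watts = energy_gained / elapsed_hours
print(later[0], average_watts)
```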

The code accompanying this chapter contains a work environment for this exercise, consisting of a test setup with a series of unit tests for this function but no implementation. There is also an implementation of the cleaner as part of the final code for this chapter.

With these cleaners and config entries in place for solar power and temperature, we can draw a 2x2 grid of charts. As the charts are now showing the desired data, it’s a good time to improve readability by adding in values for deployment names, which are passed as the final argument to plot_sensor(...) in Listing 9-17.
import asyncio
from uuid import UUID
from matplotlib import pyplot as plt
from apd.aggregation.query import with_database
from apd.aggregation.analysis import get_known_configs, plot_sensor
location_names = {
 UUID('53998a51-60de-48ae-b71a-5c37cd1455f2'): "Loft",
 UUID('1bc63cda-e223-48bc-93c2-c1f651779d69'): "Living Room",
 UUID('ea0683de-6772-4678-bfe7-6014f54ffc8e'): "Office",
 UUID('5aaa901a-7564-41fb-8eba-50cdd6fe9f80'): "Outside",
}
with with_database("postgresql+psycopg2://apd@localhost/apd") as session:
    coros = []
    figure = plt.figure(figsize = (20, 10), dpi=300)
    configs = get_known_configs().values()
    for i, config in enumerate(configs, start=1):
        plot = figure.add_subplot(2, 2, i)
        coros.append(plot_sensor(config, plot, location_names))
    await asyncio.gather(*coros)
display(figure)
Listing 9-17

Final Jupyter cell to display 2x2 grid of charts

../images/481001_1_En_9_Chapter/481001_1_En_9_Figd_HTML.jpg

Interactivity with Jupyter widgets

So far, our code to generate the charts has no interactivity available to the end-user. We are currently displaying all data points ever recorded, but it would be handy to be able to filter to only show a time period without needing to modify the code to generate the chart.

To do this, we add an optional dependency on ipywidgets, using the extras_require functionality of setup.cfg, and reinstall the apd.aggregation package in our environment using pipenv install -e .[jupyter].

You may also need to run the following, to ensure that the system-wide Jupyter installation has the support functionality for widgets enabled:
> pip install --user widgetsnbextension
> jupyter nbextension enable --py widgetsnbextension

With this installed, we can request that Jupyter create interactive widgets for each argument and call the function with the user-selected values. Interactivity allows the person viewing the notebook to choose arbitrary input values without needing to modify the code for the cell or even understand the code.

Figure 9-4 shows an example of a function which adds two integers and which has been connected to Jupyter’s interactivity support. In this case, the two integer arguments are given a default value of 100 and are rendered as sliders. Users can manipulate these sliders, and the result of the function is recomputed automatically.
../images/481001_1_En_9_Chapter/481001_1_En_9_Fig4_HTML.jpg
Figure 9-4

Interactive view of an addition function

Multiply nested synchronous and asynchronous code

We can’t pass coroutines to the interactive(...) function as it’s defined to expect a standard, synchronous function. It's a synchronous function itself, so it’s not even possible for it to await the result of a coroutine call. Although IPython and Jupyter allow await constructs in places where they aren’t usually permitted, this is done by wrapping the cell in a coroutine7 and scheduling it as a task; it is not deep magic that truly marries synchronous and asynchronous code, it’s a hack for convenience.

Our plotting code involves awaiting the plot_sensor(...) coroutine, so Jupyter must wrap the cell into a coroutine. Coroutines can only be awaited by other coroutines or run directly by an event loop, for example through asyncio.run(...), so asynchronous code generally grows to the point that the entire application is asynchronous. It’s a lot easier to have a group of functions that are all synchronous or all asynchronous than it is to mix the two approaches.

We can’t do that here because we need to provide a function to interactive(...), and we have no control over its implementation. We get around this problem by converting the coroutine into a new synchronous function. We don’t want to rewrite all the code to a synchronous style just to accommodate the interactive(...) function, so a wrapper function to bridge the gap is a better fit.

The coroutine requires access to an event loop that it can use to schedule tasks and which is responsible for scheduling it. The existing event loop we have won’t do, as it is busy executing the coroutine that’s waiting for interactive(...) to return. If you recall, it’s the await keyword that implements cooperative multitasking in asyncio, so our code will only ever switch between different tasks when it hits an await expression.

If we are running a coroutine, we can await another coroutine or task, which allows the event loop to execute other code. Execution won’t return to our code until the function that was being awaited has completed execution, but other coroutines can run in the meantime. We can call synchronous code like interactive(...) from an asynchronous context, but that code can introduce blocking. As this blocking is not blocking on an await statement, execution cannot be passed to another coroutine during this period. Calling any synchronous function from an asynchronous function is equivalent to guaranteeing that a block of code does not contain an await statement, which guarantees that no other coroutine’s code will be run.
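This scheduling behavior can be observed with a short experiment. In the sketch below, blocking_call is a hypothetical synchronous function that stands in for any blocking code, and the events list records the order in which things happen.

```python
import asyncio
import time

events = []


async def background() -> None:
    events.append("background started")
    await asyncio.sleep(0)
    events.append("background resumed")


def blocking_call() -> None:
    # Synchronous: there is no await here, so the event loop cannot
    # switch to another task until this function returns.
    events.append("blocking started")
    time.sleep(0.1)
    events.append("blocking finished")


async def main() -> None:
    task = asyncio.create_task(background())
    blocking_call()
    events.append("main awaiting")
    # Only now does main() give the event loop a chance to run the task
    await task


asyncio.run(main())
print(events)
```

Although the background task is created before the blocking call, it does not start until main() reaches its await expression.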

Until now, we have used the asyncio.run(...) function to start a coroutine from synchronous code and block waiting for its result, but we’re already inside a call to asyncio.run(main()) so we cannot do this again.8 As the interactive(...) call is blocking without an await expression, our wrapper will be running in a context where it’s guaranteed that no coroutine code can run. Although the wrapper function that we use to convert our asynchronous coroutine to a synchronous function must arrange for that coroutine to be executed, it cannot rely on the existing event loop to do this.

To make this explicit, imagine a function that takes two functions as arguments, as shown in Listing 9-18. These functions both return an integer. This function invokes both of the functions that were passed as arguments, adds the results, and then returns the sum of those integers. If all the functions involved are synchronous, there are no problems.
import typing as t
def add_number_from_callback(a: t.Callable[[], int], b: t.Callable[[], int]) -> int:
    return a() + b()
def constant() -> int:
    return 5
print(add_number_from_callback(constant, constant))
Listing 9-18

Example of calling only synchronous functions from a synchronous context

We can even call this add_number_from_callback(...) function from an asynchronous context and get the right result, with the caveat that add_number_from_callback(...) blocks the entire process, potentially negating the benefits of asynchronous code.
async def main() -> None:
    print(add_number_from_callback(constant, constant))
asyncio.run(main())
Our particular invocation is low risk because we know that there are no IO requests which could potentially block for a long time. However, we might want to add a new function that returns a number from a HTTP request. If we already had a coroutine to get the result of a HTTP request, we might want to use this rather than reimplementing this as a synchronous function. An example of a coroutine to get the number (in this case from the random.org random number generator service) is as follows:
import aiohttp
async def async_get_number_from_HTTP_request() -> int:
    uri = "https://www.random.org/integers/?num=1&min=1&max=100&col=1" "&base=10&format=plain"
    async with aiohttp.ClientSession() as http:
        response = await http.get(uri)
        return int(await response.text())

As this is a coroutine, we can’t pass it directly to the add_number_from_callback(...) function. If we were to try, we’d see the Python error TypeError: unsupported operand type(s) for +: 'int' and 'coroutine'.9

You might write a wrapper function for async_get_number_from_HTTP_request to create a new task that we can wait for, but that would submit the coroutine to the existing event loop, which we’ve already decided isn’t a possible solution. We would have no way of awaiting this task, as it’s not valid to use await in a synchronous function, and it’s not valid to call asyncio.run(...) in a nested fashion. The only way of waiting for this would be to loop doing nothing until the task is complete, but this loop prevents the event loop from scheduling the task, resulting in a contradiction.
def get_number_from_HTTP_request() -> int:
    task = asyncio.create_task(async_get_number_from_HTTP_request())
    while not task.done():
        pass
    return task.result()

The main() task constantly loops over the task.done() check, never hitting an await statement and so never giving way to the async_get_number_from_HTTP_request() task. This function results in a deadlock.

Tip

It’s also possible to create blocking asynchronous code with any long-running loop that doesn’t contain an explicit await statement or an implicit one such as async for and async with.

You shouldn’t need to write a loop that checks for another coroutine’s data, as we’ve done here. You should await that coroutine rather than looping. If you do ever need a loop with no awaits inside, you can explicitly give the event loop a chance to switch into other tasks by awaiting a function that does nothing, such as await asyncio.sleep(0), so long as you’re looping in a coroutine rather than a synchronous function that a coroutine called.
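As a small sketch of that last point, the two hypothetical counter tasks below take turns because each loop iteration awaits asyncio.sleep(0).

```python
import asyncio

events = []


async def counter(name: str, steps: int) -> None:
    for step in range(steps):
        events.append(f"{name} {step}")
        # No real waiting happens; this only hands control back to the
        # event loop so that other ready tasks get a turn.
        await asyncio.sleep(0)


async def main() -> None:
    await asyncio.gather(counter("a", 2), counter("b", 2))


asyncio.run(main())
print(events)
```

Without the await inside the loop, the first counter would run to completion before the second one started.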

We can’t convert the entire call stack to the asynchronous idiom, so the only remaining way around this problem is to start a second event loop, allowing the two tasks to run in parallel. We’ve blocked our current event loop, but we can start a second one to execute the asynchronous HTTP code.

This approach makes it possible to call async code from synchronous contexts, but all tasks scheduled in the main event loop are still blocked waiting for the HTTP response. This only solves the problem of deadlocks when mixing synchronous and asynchronous code; the performance penalty is still in place. You should avoid mixing synchronous and asynchronous code wherever possible. The resulting code is difficult to understand, can introduce deadlocks, and negates the performance benefits of asyncio.

A helper function that takes a coroutine and executes it in a new thread, without involving the currently running event loop, is given as Listing 9-19. This also includes a coroutine that makes use of this wrapper to pass the HTTP coroutine as though it were a synchronous function.
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor
def wrap_coroutine(f):
    @functools.wraps(f)
    def run_in_thread(*args, **kwargs):
        loop = asyncio.new_event_loop()
        wrapped = f(*args, **kwargs)
        with ThreadPoolExecutor(max_workers=1) as pool:
            task = pool.submit(loop.run_until_complete, wrapped)
        return task.result()
    return run_in_thread
async def main() -> None:
    print(
        add_number_from_callback(
            constant, wrap_coroutine(async_get_number_from_HTTP_request)
        )
    )
Listing 9-19

Wrapper function to start a second event loop and delegate new async tasks there
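To try this approach without network access, the HTTP coroutine can be swapped for one that merely sleeps. The following is a self-contained sketch under that assumption: async_get_number is a hypothetical stand-in, and this variant of the wrapper also closes the second event loop once it has finished, which is an addition to the listing.

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor


def wrap_coroutine(f):
    @functools.wraps(f)
    def run_in_thread(*args, **kwargs):
        loop = asyncio.new_event_loop()
        try:
            with ThreadPoolExecutor(max_workers=1) as pool:
                # The worker thread has no running event loop of its
                # own, so it is free to drive the new one.
                task = pool.submit(loop.run_until_complete, f(*args, **kwargs))
            return task.result()
        finally:
            loop.close()

    return run_in_thread


async def async_get_number() -> int:
    # Hypothetical stand-in for the HTTP coroutine
    await asyncio.sleep(0.01)
    return 7


def add_number_from_callback(a, b) -> int:
    return a() + b()


def constant() -> int:
    return 5


async def main() -> int:
    return add_number_from_callback(constant, wrap_coroutine(async_get_number))


total = asyncio.run(main())
print(total)
```

The main event loop is still blocked while the worker thread runs, so this avoids the deadlock but not the performance penalty.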

We can use this same approach to allow our plot_sensor(...) coroutine to be used in an interactive(...) function call, as shown in Listing 9-20.
import asyncio
from uuid import UUID
import ipywidgets as widgets
from matplotlib import pyplot as plt
from apd.aggregation.query import with_database
from apd.aggregation.analysis import (get_known_configs, plot_sensor, wrap_coroutine)
@wrap_coroutine
async def plot(*args, **kwargs):
    location_names = {
     UUID('53998a51-60de-48ae-b71a-5c37cd1455f2'): "Loft",
     UUID('1bc63cda-e223-48bc-93c2-c1f651779d69'): "Living Room",
     UUID('ea0683de-6772-4678-bfe7-6014f54ffc8e'): "Office",
     UUID('5aaa901a-7564-41fb-8eba-50cdd6fe9f80'): "Outside",
    }
    with with_database("postgresql+psycopg2://apd@localhost/apd") as session:
        coros = []
        figure = plt.figure(figsize = (20, 10), dpi=300)
        configs = get_known_configs().values()
        for i, config in enumerate(configs, start=1):
            plot = figure.add_subplot(2, 2, i)
            coros.append(plot_sensor(config, plot, location_names, *args, **kwargs))
        await asyncio.gather(*coros)
    return figure
start = widgets.DatePicker(
    description='Start date',
)
end = widgets.DatePicker(
    description='End date',
)
out = widgets.interactive(plot, collected_after=start, collected_before=end)
display(out)
Listing 9-20

Interactive chart filtering example, with output shown

../images/481001_1_En_9_Chapter/481001_1_En_9_Fige_HTML.jpg

Tidying up

We now have lots of complex logic in the Jupyter cell. We should move this to some more general utility functions so that end-users don’t need to deal with the details of how to plot charts, or of converting coroutines to wrapped functions to pass to the interactive system. We can provide a helper function for them to use, as in Listing 9-21.
async def plot_multiple_charts(*args: t.Any, **kwargs: t.Any) -> Figure:
    # These parameters are pulled from kwargs to avoid confusing function
    # introspection code in IPython widgets
    location_names = kwargs.pop("location_names", None)
    configs = kwargs.pop("configs", None)
    dimensions = kwargs.pop("dimensions", None)
    db_uri = kwargs.pop("db_uri", "postgresql+psycopg2://apd@localhost/apd")
    with with_database(db_uri):
        coros = []
        if configs is None:
            # If no configs are supplied, use all known configs
            configs = get_known_configs().values()
        if dimensions is None:
            # If no dimensions are supplied, get the square root of the
            # number of configs and round it to find a number of columns.
            # This will keep the arrangement approximately square. Find rows
            # by dividing the total by the columns and rounding up.
            total_configs = len(configs)
            columns = round(math.sqrt(total_configs))
            rows = math.ceil(total_configs / columns)
        else:
            # Assumption: dimensions is supplied as a (columns, rows) pair
            columns, rows = dimensions
        figure = plt.figure(figsize=(10 * columns, 5 * rows), dpi=300)
        for i, config in enumerate(configs, start=1):
            # add_subplot takes the number of rows first, then columns
            plot = figure.add_subplot(rows, columns, i)
            coros.append(plot_sensor(config, plot, location_names, *args, **kwargs))
        await asyncio.gather(*coros)
    return figure
def interactable_plot_multiple_charts(
    *args: t.Any, **kwargs: t.Any
) -> t.Callable[..., Figure]:
    with_config = functools.partial(plot_multiple_charts, *args, **kwargs)
    return wrap_coroutine(with_config)
Listing 9-21

Genericized versions of the plot functions
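The grid arithmetic used when no dimensions are supplied can be tried in isolation. The following is a minimal sketch: grid_dimensions is a hypothetical helper that is not part of apd.aggregation, and the guard against an empty list of configs is an addition made for this sketch only.

```python
import math
import typing as t


def grid_dimensions(total: int) -> t.Tuple[int, int]:
    """Return (rows, columns) for an approximately square grid."""
    # Hypothetical helper: always use at least one column, so that an
    # empty input does not cause a division by zero.
    columns = max(1, round(math.sqrt(total)))
    rows = math.ceil(total / columns)
    return rows, columns


for total in (1, 2, 5, 7):
    rows, columns = grid_dimensions(total)
    print(f"{total} charts -> {rows} rows x {columns} columns")
```

Rounding the square root rather than always rounding up keeps the grid from gaining an extra, mostly empty column when the number of charts is only slightly more than a perfect square.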

This leaves us with Jupyter code that instantiates the widgets and the location names, then calls interactable_plot_multiple_charts(...) to generate the function to pass to the interactive(...) function. The resulting Jupyter cell, which is equivalent to the previous implementation but significantly shorter, is as follows:
import ipywidgets as widgets
from apd.aggregation.analysis import interactable_plot_multiple_charts
plot = interactable_plot_multiple_charts(location_names=location_names)
out = widgets.interactive(plot, collected_after=start, collected_before=end)
display(out)
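The helper in Listing 9-21 relies on two steps: binding fixed arguments with functools.partial and then converting the coroutine function into a plain function. The sketch below illustrates that combination using only the standard library. Here run_in_thread is a hypothetical stand-in written for this example, not the wrap_coroutine function from apd.aggregation, and describe is an invented coroutine.

```python
import asyncio
import functools
import threading
import typing as t


def run_in_thread(
    func: t.Callable[..., t.Coroutine[t.Any, t.Any, t.Any]]
) -> t.Callable[..., t.Any]:
    """Hypothetical stand-in for wrap_coroutine: run a coroutine synchronously."""

    def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
        results: t.List[t.Any] = []

        def runner() -> None:
            # asyncio.run creates a fresh event loop; doing so in a new
            # thread avoids clashing with a loop that is already running.
            results.append(asyncio.run(func(*args, **kwargs)))

        thread = threading.Thread(target=runner)
        thread.start()
        thread.join()
        return results[0]

    return wrapper


async def describe(name: str, prefix: str = "") -> str:
    await asyncio.sleep(0)
    return f"{prefix}{name}"


# Bind the fixed arguments first, then convert to a plain function
with_prefix = functools.partial(describe, prefix="Sensor: ")
sync_describe = run_in_thread(with_prefix)
print(sync_describe("Temperature"))
```

This sketch does not forward exceptions raised inside the coroutine; a fuller version would need to capture them in the thread and re-raise them in the caller.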

Persistent endpoints

The next logical piece of cleanup we could do is to move the configuration of endpoints to a new database table. This would allow us to automatically generate the location_names variable, ensure the colors used on each chart are consistent across invocations, and also let us update all sensor endpoints without having to pass their URLs each time.

To do this, we’ll create a new database table and data class to represent a deployment of apd.sensors. We also need command-line utilities to add and edit the deployment metadata, utility functions to get the data, and tests for all of this.

Exercise 9-2: Implement Stored Deployments

The changes involved in storing deployments in the database require creating new tables, new console scripts, migrations, and some work on tests.

Implement any or all of the following features, according to what you would find useful:
  • Deployment object and table that contains id, name, URI, and API key

  • Command-line scripts to add, edit, and list deployments

  • Tests for the command-line scripts

  • Make servers and api_key arguments to collect_sensor_data optional, using the stored values if omitted

  • Helper function to get a deployment record by its ID

  • An additional field for the deployment table for the color that should be used to plot its data

  • Modifications to plot functions to use a deployment’s name and line color directly from its database record

All of these are included in the sample implementation that accompanies this chapter.

Charting maps and geographic data

We’ve been focused on xy plots of value against time in this chapter, as it represents the test data we’ve been retrieving. Sometimes we need to plot data against other axes. The most common of these is latitude against longitude, so the plot resembles a map.

If we extract the latitude and longitude items from the data set (say, a dictionary mapping coordinates to a temperature record for places around Great Britain), we can pass these as the arguments to plot(...) to see them visualized, as shown in Listing 9-22.
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
lats = [ll[0] for ll in datapoints.keys()]
lons = [ll[1] for ll in datapoints.keys()]
ax.plot(lons, lats, "o")
plt.show()
Listing 9-22

Plotting lat/lons using matplotlib, and the resulting chart

../images/481001_1_En_9_Chapter/481001_1_En_9_Figf_HTML.jpg
The shape of the data is only very approximately like an outline of Great Britain, which is shown in Figure 9-5. Most people who look at this plot would not recognize it as such.
../images/481001_1_En_9_Chapter/481001_1_En_9_Fig5_HTML.jpg
Figure 9-5

Outline of Great Britain, the island that comprises England, Wales, and Scotland

The distortion is because we’ve plotted this according to the equirectangular map projection, where latitude and longitude are an equally spaced grid that does not take the shape of the earth into account. There is no one correct map projection; it very much depends on what the map’s intended use is.

We need the map to look familiar to most people, who will be very familiar with the outline of whatever country they live in. We want people who look at it to look at the data, not the unusual projection. The most commonly used projection is the Mercator projection, which the OpenStreetMap (OSM) project provides implementations for in many programming languages, including Python.10 The merc_x(...) and merc_y(...) functions to implement the projection won’t be included in the listings, as they’re rather complex mathematical functions.
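For a sense of what such projection functions do, here is a minimal sketch of the simpler spherical form of the Mercator projection. These are not the OSM merc_x(...) and merc_y(...) implementations, which are more involved; the function names, the choice of radius, and the latitude clamp are all assumptions made for illustration.

```python
import math

# Assumption: use the WGS 84 equatorial radius, in metres
EARTH_RADIUS_M = 6378137.0


def spherical_merc_x(lon: float) -> float:
    """Project a longitude in degrees to an x coordinate in metres."""
    return EARTH_RADIUS_M * math.radians(lon)


def spherical_merc_y(lat: float) -> float:
    """Project a latitude in degrees to a y coordinate in metres."""
    # The projection is undefined at the poles, so clamp the input
    lat = max(min(lat, 89.5), -89.5)
    return EARTH_RADIUS_M * math.log(math.tan(math.pi / 4 + math.radians(lat) / 2))


# One degree of latitude covers more vertical distance on the map the
# further it is from the equator
near_equator = spherical_merc_y(1) - spherical_merc_y(0)
northern_britain = spherical_merc_y(58) - spherical_merc_y(57)
print(northern_britain / near_equator)
```

The x coordinate is a simple scaling of the longitude, while the y coordinate stretches with latitude. That stretching is what makes the plotted points resemble the familiar outline.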

Tip

When drawing maps that show areas of hundreds of square kilometers or more, it becomes more and more important to use a projection function, but for maps of smaller areas, it’s possible to provide a more familiar view using the ax.set_aspect(...) function. Changing the aspect ratio moves the point where distortion is at a minimum from the equator to another latitude; it doesn’t correct for the distortion. For example, ax.set_aspect(1.7) would move the point of least distortion to 54 degrees latitude, as 1.7 is approximately equal to 1 / cos(54°).
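The aspect ratio for any other latitude follows from the same relationship. As a small sketch, with aspect_for_latitude being a hypothetical helper name:

```python
import math


def aspect_for_latitude(latitude: float) -> float:
    """Aspect ratio that minimizes distortion at a latitude given in degrees."""
    # math.cos works in radians, so convert from degrees first
    return 1 / math.cos(math.radians(latitude))


print(round(aspect_for_latitude(54), 2))
```

The result could then be passed to ax.set_aspect(...) for a map centered near that latitude.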

With the projection functions available, we can re-run the plotting function and see that the points match up much more closely with the outline that we expect, as shown in Figure 9-6. In this case, the labels on the axes no longer show coordinates; they show meaningless numbers. We should ignore these labels for now.
../images/481001_1_En_9_Chapter/481001_1_En_9_Fig6_HTML.jpg
Figure 9-6

Map using the merc_x and merc_y projections from OSM

New plot types

This only shows us the position of each data point, not the value associated with it. The plotting functions we’ve used so far all plot two values, the x and y coordinates. While we could label the plot points with temperatures, or color code them with a scale, the resulting chart isn’t very easy to read. Instead, there are some other plot types in matplotlib that can help us, specifically tricontourf(...). The tricontour family of plotting functions takes three-dimensional input of (x, y, value) and interpolates between the points to create a plot with areas of color representing a range of values.

While the tricontour functions plot the color areas, we should also plot the points where the measurements were taken, albeit less prominently (Listing 9-23). This works the same way as plotting multiple data sets on a chart; we can call the various plot functions as many times as needed to display all the data; they do not need to be the same type of plot, so long as the axes are compatible.
fig, ax = plt.subplots()
lats = [ll[0] for ll in datapoints.keys()]
lons = [ll[1] for ll in datapoints.keys()]
temperatures = tuple(datapoints.values())
x = tuple(map(merc_x, lons))
y = tuple(map(merc_y, lats))
ax.tricontourf(x, y, temperatures)
ax.plot(x, y, 'wo', ms=3)
ax.set_aspect(1.0)
plt.show()
Listing 9-23

Color contours and scatter on the same plot

../images/481001_1_En_9_Chapter/481001_1_En_9_Figg_HTML.jpg
This is understandable once we know what we’re looking at, but we can improve it further by plotting the coastline of the island of Great Britain on the map. Given the list of coordinates representing the coastline of Great Britain,11 we can make a final call to the plot function, this time specifying that we want to draw a line rather than dots. The final version (Figure 9-7) of our plot is much easier to read, especially if we enable drawing a legend by calling plt.colorbar(tcf) where tcf is the result of the ax.tricontourf(...) function call.
../images/481001_1_En_9_Chapter/481001_1_En_9_Fig7_HTML.jpg
Figure 9-7

Plot of temperatures around Great Britain on a typical winter’s day

Tip

There are lots of GIS libraries for Python and Matplotlib that make more complex maps easier. If you’re planning on drawing lots of maps, I’d encourage you to look at Fiona and Shapely for manipulating points and polygons easily. I strongly recommend these libraries to anyone working with geographic information in Python; they’re very powerful indeed.

The basemap toolkit for matplotlib offers very flexible map drawing tools, but the maintainers have decided against distributing it like a standard Python package so I am unable to recommend it as a general solution to map drawing.

Supporting map type charts in apd.aggregation

We need to make some changes to our config object to support these maps, as they behave differently to all the other plots we’ve made so far. Previously, we’ve iterated over deployments and drawn a single plot for each deployment, representing a single sensor. To draw a map, we’d need to combine two values (coordinate and temperature) and draw a single plot representing all deployments. It’s possible that our individual deployments would move around and would provide a coordinate sensor to record where they were at a given time. A custom cleaner function alone would not be sufficient to combine the values of multiple datapoints.

Backward compatibility in data classes

Our Config object contains a sensor_name parameter, which filters the output of the get_data_by_deployment(...) function call as part of the drawing process. We need to override this part of the system: rather than passing a single parameter to the get_data_by_deployment(...) function, we want to be able to replace the entire call with custom filtering.

The sensor_name= parameter has been made optional and the type changed to an InitVar. We’ve also added a new get_data parameter, which is an optional callable with the same shape as get_data_by_deployment(...). InitVars are another useful feature of data classes, allowing parameters to be specified which are not stored but are available in a post-creation hook called __post_init__(...). In our case, shown in Listing 9-24, we can define such a hook to set up the new get_data= variable based on sensor_name=, maintaining backward compatibility with implementations that only pass a sensor_name=.
@dataclasses.dataclass
class Config:
    title: str
    clean: t.Callable[
        [t.AsyncIterator[DataPoint]],
        t.AsyncIterator[t.Tuple[datetime.datetime, float]],
    ]
    # Fields without defaults must come before fields with defaults
    ylabel: str
    get_data: t.Optional[
        t.Callable[..., t.AsyncIterator[t.Tuple[UUID, t.AsyncIterator[DataPoint]]]]
    ] = None
    sensor_name: dataclasses.InitVar[t.Optional[str]] = None
    def __post_init__(self, sensor_name=None):
        if self.get_data is None:
            if sensor_name is None:
                raise ValueError("You must specify either get_data or sensor_name")
            self.get_data = get_one_sensor_by_deployment(sensor_name)
def get_one_sensor_by_deployment(sensor_name):
    return functools.partial(get_data_by_deployment, sensor_name=sensor_name)
Listing 9-24

Data class with get_data parameter and backward compatibility hook

The __post_init__(...) function is called automatically, passing any InitVar attributes to it. As we are setting get_data in the __post_init__ method, we need to ensure that the data class is not frozen, as this counts as a modification.
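The InitVar behavior can be seen in a cut-down form that runs without the rest of apd.aggregation. This is a minimal sketch: Source is a hypothetical, simplified stand-in for Config, and its get_data callables return strings rather than iterators of DataPoint objects.

```python
import dataclasses
import typing as t


@dataclasses.dataclass
class Source:
    # Hypothetical, simplified stand-in for Config
    get_data: t.Optional[t.Callable[[], str]] = None
    sensor_name: dataclasses.InitVar[t.Optional[str]] = None

    def __post_init__(self, sensor_name: t.Optional[str]) -> None:
        if self.get_data is None:
            if sensor_name is None:
                raise ValueError("You must specify either get_data or sensor_name")
            # Build the new-style callable from the old-style parameter
            self.get_data = lambda: f"data for {sensor_name}"


old_style = Source(sensor_name="Temperature")
new_style = Source(get_data=lambda: "custom data")
print(old_style.get_data())
print(new_style.get_data())
# InitVar parameters are not recorded as fields of the data class
print([field.name for field in dataclasses.fields(old_style)])
```

Adding frozen=True to the decorator would make the assignment inside __post_init__ raise dataclasses.FrozenInstanceError, which is why the class has to remain unfrozen.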

This change allows us to change which data is passed to the clean(...) function, but that function is still expected to return tuples of a time and a float, to be passed into the plot_date(...) function. We need to change the shape of the clean(...) function.

We will no longer only be using plot_date(...) to draw our points; some types of chart require contours and points, so we must also add another customization point to choose how data are plotted. The new draw attribute of the Config class provides this function.

To support these new function call signatures, we need to make Config a generic class, as shown in Listing 9-25. This makes it possible to specify the underlying data of the Config object (or have the type system infer it from context). The existing data types are of the type Config[datetime.datetime, float], but our map Config will be Config[t.Tuple[float, float], float]. That is, some configs plot a float against a date, others plot a float against a pair of floats.
plot_key = t.TypeVar("plot_key")
plot_value = t.TypeVar("plot_value")
@dataclasses.dataclass
class Config(t.Generic[plot_key, plot_value]):
    title: str
    clean: t.Callable[
        [t.AsyncIterator[DataPoint]], t.AsyncIterator[t.Tuple[plot_key, plot_value]]
    ]
    draw: t.Optional[
        t.Callable[
            [t.Any, t.Iterable[plot_key], t.Iterable[plot_value], t.Optional[str]], None
        ]
    ] = None
    get_data: t.Optional[
        t.Callable[..., t.AsyncIterator[t.Tuple[UUID, t.AsyncIterator[DataPoint]]]]
    ] = None
    ylabel: t.Optional[str] = None
    sensor_name: dataclasses.InitVar[t.Optional[str]] = None
    def __post_init__(self, sensor_name=None):
        if self.draw is None:
            self.draw = draw_date
        if self.get_data is None:
            if sensor_name is None:
                raise ValueError("You must specify either get_data or sensor_name")
            self.get_data = get_one_sensor_by_deployment(sensor_name)
Listing 9-25

A generic Config type

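The Config class has lots of complex typing information in it now. This does have real benefits, though: a type checker reports the following code as an error, because the clean function yields dates as plot keys while draw_map(...) expects coordinate pairs: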
Config(
    sensor_name="Temperature",
    clean=clean_temperature_fluctuations,
    title="Ambient temperature",
    ylabel="Degrees C",
    draw=draw_map,
)

It also gives us confidence when we read the code; we know that the argument and return types of functions as specified match up. As this code involves lots of manipulating of data structures into iterators of iterators of tuples (etc.), it is easy to get confused about exactly what’s required. This is a perfect use case for typing hints.
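The same idea can be shown with a much smaller generic data class. In this sketch, Chart and the clean and draw functions are hypothetical simplifications invented for illustration; they are synchronous and return strings rather than drawing anything.

```python
import dataclasses
import typing as t

Key = t.TypeVar("Key")
Value = t.TypeVar("Value")


@dataclasses.dataclass
class Chart(t.Generic[Key, Value]):
    # Hypothetical, cut-down version of the generic Config
    clean: t.Callable[[t.Iterable[float]], t.Iterable[t.Tuple[Key, Value]]]
    draw: t.Callable[[t.Iterable[Key], t.Iterable[Value]], str]


def clean_by_index(values: t.Iterable[float]) -> t.Iterable[t.Tuple[int, float]]:
    return list(enumerate(values))


def draw_series(x: t.Iterable[int], y: t.Iterable[float]) -> str:
    return f"series with {len(list(x))} points"


def draw_points(x: t.Iterable[t.Tuple[float, float]], y: t.Iterable[float]) -> str:
    return f"map with {len(list(x))} points"


# Key is int and Value is float in both callables, so this is consistent
series: Chart[int, float] = Chart(clean=clean_by_index, draw=draw_series)

# A type checker should flag this line: clean produces int keys but
# draw_points expects coordinate pairs. It still runs, as these hints
# are not enforced at runtime.
mismatched = Chart(clean=clean_by_index, draw=draw_points)

x, y = zip(*series.clean([20.5, 21.0, 21.5]))
print(series.draw(x, y))
```

Because the two type variables tie the output of clean to the input of draw, the mismatch is caught by static analysis before the code is ever run against real data.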

We expect users to be creating custom configuration objects with custom draw and clean methods. Having reliable typing information lets them find subtle errors much more quickly.

The config.get_data(...) and config.draw(...) functions we need to handle our existing two plot types are refactorings of code that we’ve already examined in depth in this chapter, but they are available to view in the code that accompanies this chapter for those who are interested in the details.

Drawing a custom map using the new configs

The changes to Config allow us to define map-based configurations, but our current data doesn’t include any data that can be drawn as a map because none of our deployments includes a location sensor. We can use the new config.get_data(...) option to generate some static data rather than real, aggregated data to demonstrate the functionality. We can also add the custom coastline line by extending the draw_map(...) function (Listing 9-26).
def get_literal_data():
    # Get manually entered temperature data, as our particular deployment
    # does not contain data of this shape
    raw_data = {...}
    now = datetime.datetime.now()
    async def points():
        for (coord, temp) in raw_data.items():
            deployment_id = uuid.uuid4()
            yield DataPoint(sensor_name="Location", deployment_id=deployment_id,
                            collected_at=now, data=coord)
            yield DataPoint(sensor_name="Temperature", deployment_id=deployment_id,
                            collected_at=now, data=temp)
    async def deployments(*args, **kwargs):
        yield None, points()
    return deployments
def draw_map_with_gb(plot, x, y, colour):
    # Draw the map and add an explicit coastline
    gb_boundary = [...]
    draw_map(plot, x, y, colour)
    plot.plot(
        [merc_x(coord[0]) for coord in gb_boundary],
        [merc_y(coord[1]) for coord in gb_boundary],
        "k-",
    )
country = Config(
    get_data=get_literal_data(),
    clean=get_map_cleaner_for("Temperature"),
    title="Country wide temperature",
    ylabel="",
    draw=draw_map_with_gb,
)
plot = interactable_plot_multiple_charts(configs=configs + (country,))
out = widgets.interactive(plot, collected_after=start, collected_before=end)
display(out)
Listing 9-26

Jupyter function to draw a custom map chart along with the registered charts
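The shape of the callable returned by get_literal_data() is easier to follow in a reduced form. The sketch below uses only the standard library: get_static_data and collect are hypothetical names, the place names and values are invented, and plain tuples stand in for DataPoint objects.

```python
import asyncio
import typing as t


def get_static_data(raw: t.Dict[str, float]):
    """Hypothetical factory that mimics the shape of get_data callables."""

    async def points() -> t.AsyncIterator[t.Tuple[str, float]]:
        for name, value in raw.items():
            yield name, value

    async def deployments(*args: t.Any, **kwargs: t.Any):
        # Filters such as date ranges are accepted but ignored, and all
        # data is reported under a single deployment with no identifier
        yield None, points()

    return deployments


async def collect(get_data) -> t.List[t.Tuple[t.Any, str, float]]:
    found = []
    # The outer iterator yields one (id, iterator) pair per deployment;
    # the inner iterator yields that deployment's data
    async for deployment_id, datapoints in get_data(collected_after=None):
        async for name, value in datapoints:
            found.append((deployment_id, name, value))
    return found


get_data = get_static_data({"Leeds": 4.0, "London": 7.5})
print(asyncio.run(collect(get_data)))
```

Returning a function rather than an iterator means a fresh pair of generators is created each time the chart is redrawn, so the static data can be consumed more than once.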

../images/481001_1_En_9_Chapter/481001_1_En_9_Figh_HTML.jpg
Exercise 9-3: Add a Bar Chart For Cumulative Solar Power

We wrote a cleaner for the solar generation data to convert it to momentary power instead of cumulative power. This makes it much more evident when power is being generated over time, but it makes understanding the amount generated each day harder.

Write a new cleaner that returns cumulative power per day and a new draw function that displays this as a bar chart.

As always, the code accompanying this chapter includes a starting point and a sample completed version.

Summary

In this chapter, we’ve returned to Jupyter for the purpose that people are most familiar with, rather than purely as a prototyping tool. We’ve also used Matplotlib here, which many users of Jupyter will have come across already. Together, these two make a formidable tool for communicating data analysis outcomes.

We’ve written lots of helper functions to make it easy for people to build custom interfaces in Jupyter to view the data we are aggregating. This has allowed us to define a public-facing API while allowing us lots of flexibility to change the way things are implemented. A good API for end-users is vital for retaining users, so it’s worth spending the time on.

The final version of the accompanying code for this chapter includes all the functions we’ve built up, many of which contain long blocks of sample data. Some of these were too long to include in print, so I recommend that you take a look at the code samples and try them out.

Finally, we’ve looked at some more advanced uses of some technologies we’ve used already, including using the __post_init__(...) hook of data classes to preserve backward compatibility when default arguments do not suffice, and more complex combinations of synchronous and asynchronous code.

Additional resources

The following links provide additional background information on the subjects covered in this chapter: