8

Creating Coroutines, Events, and Message-Driven Transactions

The FastAPI framework is an asynchronous framework that runs on the asyncio platform and utilizes the ASGI protocol. It is well known for its 100% support for asynchronous endpoints and non-blocking tasks. This chapter will focus on how we create highly scalable applications with asynchronous tasks and event-driven and message-driven transactions.

We learned in Chapter 2, Exploring the Core Features, that Async/Await or asynchronous programming is a design pattern that enables other services or transactions to run without blocking the main thread. The framework uses the async keyword to create asynchronous processes, which are awaited instead of being invoked directly. The number of worker processes is defined during Uvicorn server startup through the --workers option.
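As a quick sketch, assuming the application module path is main:app (the same path used with Hypercorn later in this chapter), starting Uvicorn with four worker processes looks like this:

uvicorn main:app --port 8000 --workers 4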

In this chapter, we will delve into the framework and scrutinize the various components of the FastAPI Framework that can run asynchronously using multiple threads. The following highlights will help us understand how asynchronous FastAPI is:

  • Implementing coroutines
  • Creating asynchronous background tasks
  • Understanding Celery tasks
  • Building message-driven transactions using RabbitMQ
  • Building publish/subscribe messaging using Kafka
  • Applying reactive programming in tasks
  • Customizing events
  • Implementing asynchronous Server-Sent Events (SSE)
  • Building an asynchronous WebSocket

Technical requirements

This chapter will cover asynchronous features, software specifications, and the components of a newsstand management system prototype. The discussions will use this online newspaper management system prototype as a specimen to understand, explore, and implement asynchronous transactions that will manage the newspaper content, subscription, billing, user profiles, customers, and other business-related transactions. The code has all been uploaded to https://github.com/PacktPublishing/Building-Python-Microservices-with-FastAPI under the ch08 project.

Implementing coroutines

In the FastAPI framework, a thread pool is always present to execute both synchronous API and non-API transactions for every request. For ideal cases where both kinds of transaction carry minimal CPU-bound and I/O-bound overhead, the overall performance of the FastAPI framework is still better than that of frameworks built on non-ASGI platforms. However, when contention occurs due to high CPU-bound traffic or heavy CPU workloads, the performance of FastAPI starts to wane due to thread switching.

Thread switching is a context switch from one thread to another within the same process. So, if we have several transactions with varying workloads running in the background and on the browser, FastAPI will run these transactions in the thread pool with several context switches. This scenario will cause contention and degradation to lighter workloads. To avoid performance issues, we apply coroutine switching instead of threads.

Applying coroutine switching

The FastAPI framework works at the optimum speed with a mechanism called coroutine switching. This approach allows transaction-tuned tasks to work cooperatively by allowing other running processes to pause so that the thread can execute and finish more urgent tasks, and resume "awaited" transactions without preempting the thread. These coroutine switches are programmer-defined components and not kernel-related or memory-related features. In FastAPI, there are two ways of implementing coroutines: (a) applying the @asyncio.coroutine decorator, and (b) using the async/await construct.

Applying @asyncio.coroutine

asyncio is a Python extension that implements the Python concurrency paradigm using a single-threaded and single-process model and provides API classes and methods for running and managing coroutines. This extension provides an @asyncio.coroutine decorator that transforms API and native services into generator-based coroutines. However, this is an old approach and can only be used in FastAPI that uses Python 3.9 and below. The following is a login service transaction of our newsstand management system prototype implemented as a coroutine:

import asyncio

@asyncio.coroutine
def build_user_list(query_list):
    user_list = []
    for record in query_list:
        # Pauses this coroutine and yields control to the event loop
        yield from asyncio.sleep(2)
        user_list.append(" ".join([str(record.id),
            record.username, record.password]))
    return user_list

build_user_list() is a native service that converts all login records into the str format. It is decorated with the @asyncio.coroutine decorator to transform the transaction into an asynchronous task or coroutine. A coroutine can invoke another coroutine function or method using only the yield from clause. This construct pauses the coroutine and passes control of the thread to the invoked coroutine function. The asyncio.sleep() method, one of the most widely used asynchronous utilities of the asyncio module, suspends the current coroutine for the given number of seconds; it appears here only to simulate a slow operation, not because it is the ideal tool. On the other hand, the following code is an API service implemented as a coroutine that can minimize contention and performance degradation in client-side executions:

@router.get("/login/list/all")
@asyncio.coroutine
def list_login():
    repo = LoginRepository()
    result = yield from repo.get_all_login()
    data = jsonable_encoder(result)
    return data

The list_login() API service retrieves all the login details of the application’s users through a coroutine CRUD transaction implemented in GINO ORM. The API service again uses the yield from clause to run and execute the get_all_login() coroutine function.

A coroutine function can invoke and await multiple coroutines concurrently using the asyncio.gather() utility. This asyncio method manages a list of coroutines and waits until all its coroutines have completed their tasks. Then, it will return a list of results from the corresponding coroutines. The following code is an API that retrieves login records through an asynchronous CRUD transaction and then invokes count_login() and build_user_list() concurrently to process these records:

@router.get("/login/list/records")
@asyncio.coroutine
def list_login_records():
    repo = LoginRepository()
    login_data = yield from repo.get_all_login()
    result = yield from asyncio.gather(
        count_login(login_data),
        build_user_list(login_data))
    data = jsonable_encoder(result[1])
    return {'num_rec': result[0], 'user_list': data}

list_login_records() uses asyncio.gather() to run the count_login() and build_user_list() tasks and later extract their corresponding returned values for processing.

Using the async/await construct

Another way of implementing a coroutine is using async/await constructs. As with the previous approach, this syntax creates a task that can pause anytime during its operation before it reaches the end. But the kind of coroutine that this approach produces is called a native coroutine, which is not iterable in the way that the generator type is. The async/await syntax also allows the creation of other asynchronous components such as the async with context managers and async for iterators. The following code is the count_login() task previously invoked in the generator-based coroutine service, list_login_records():

async def count_login(query_list):
    await asyncio.sleep(2)
    return len(query_list)

The count_login() native service is a native coroutine because of the async keyword placed before its method definition. It only uses await to invoke other coroutines. The await keyword suspends the execution of the current coroutine and passes the control of the thread to the invoked coroutine function. After the invoked coroutine finishes its process, the thread control will yield back to the caller coroutine. Using the yield from construct instead of await will raise an error because our coroutine here is not generator-based. The following is an API service implemented as a native coroutine that manages data entry for the new administrator profiles:

@router.post("/admin/add")
async def add_admin(req: AdminReq):
    admin_dict = req.dict(exclude_unset=True)
    repo = AdminRepository()
    result = await repo.insert_admin(admin_dict)
    if result == True:
        return req
    else:
        return JSONResponse(
            content={'message': 'add admin profile problem encountered'},
            status_code=500)

Both generator-based and native coroutines are monitored and managed by an event loop, which is an infinite loop inside a thread. Technically, it is an object found in the thread, and each thread in the thread pool can have only one event loop, which contains a list of helper objects called tasks. Each task, pre-generated or manually created, executes one coroutine. For instance, when the previous add_admin() API service invokes the insert_admin() coroutine transaction, the event loop will suspend add_admin() and tag its task as an awaited task. Afterward, the event loop will assign a task to run the insert_admin() transaction. Once that task has completed its execution, it will yield control back to add_admin(). The thread that manages the FastAPI application is not interrupted during these shifts of execution, since it is the event loop and its tasks that participate in the coroutine switching mechanism.
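The switching just described can be observed with plain asyncio, independent of FastAPI. The following minimal sketch uses hypothetical stand-ins for the services above; while the simulated CRUD coroutine is awaited, the event loop is free to run any other scheduled task:

import asyncio

async def insert_admin_sim():
    # Stands in for the awaited CRUD coroutine; during the sleep,
    # control returns to the event loop
    await asyncio.sleep(0.5)
    return True

async def add_admin_sim():
    # While insert_admin_sim() is awaited, the event loop runs
    # any other scheduled task, such as this side job
    side_job = asyncio.create_task(asyncio.sleep(0.1))
    result = await insert_admin_sim()
    await side_job
    return result

print(asyncio.run(add_admin_sim()))

Let us now use these coroutines to build our application.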

Designing asynchronous transactions

There are a few programming paradigms that we can follow when creating coroutines for our application. Utilizing more coroutine switching in the process can help improve the software performance. In our newsstand application, there is an endpoint, /admin/login/list/enc, in the admin.py router that returns a list of encrypted user details. In its API service, shown in the following code, each record is managed by an extract_enc_admin_profile() transaction call instead of passing the whole data record to a single call, thus allowing the concurrent executions of tasks. This strategy is better than running the bulk of transactions in a thread without context switches:

@router.get("/admin/login/list/enc")
async def generate_encypted_profile():
    repo = AdminLoginRepository()
    result = await repo.join_login_admin()
    encoded_data = await asyncio.gather(
       *(extract_enc_admin_profile(rec) for rec in result))
    return encoded_data

Now, the extract_enc_admin_profile() coroutine, shown in the following code, implements a chaining design pattern, where it calls other smaller coroutines through a chain. Simplifying and breaking down monolithic and complex processes into smaller but more robust coroutines will improve the application’s performance by utilizing more context switches. In this API, extract_enc_admin_profile() makes three context switches along the chain, which are cheaper than thread switches:

async def extract_enc_admin_profile(admin_rec):
    p = await extract_profile(admin_rec)
    pinfo = await extract_condensed(p)
    encp = await decrypt_profile(pinfo)
    return encp

On the other hand, the following are the smaller coroutines awaited and executed by extract_enc_admin_profile():

from cryptography.fernet import Fernet

async def extract_profile(admin_details):
    profile = {}
    login = admin_details.parent
    profile['firstname'] = admin_details.firstname
    … … … … … …
    profile['password'] = login.password
    await asyncio.sleep(1)
    return profile

async def extract_condensed(profiles):
    profile_info = " ".join([profiles['firstname'],
       profiles['lastname'], profiles['username'],
       profiles['password']])
    await asyncio.sleep(1)
    return profile_info

async def decrypt_profile(profile_info):
    # Despite its name, this coroutine encrypts the condensed profile
    key = Fernet.generate_key()
    fernet = Fernet(key)
    encoded_profile = fernet.encrypt(profile_info.encode())
    return encoded_profile

These three subroutines will give the main coroutine the encrypted str that contains the details of an administrator profile. All these encrypted strings will be collated by the API service using the asyncio.gather() utility.

Another programming approach to utilizing the coroutine switching is the use of pipelines created by asyncio.Queue. In this programming design, the queue structure is the common point between two tasks: (a) the task that will place a value to the queue called the producer, and (b) the task that will fetch the item from the queue, the consumer. We can implement a one producer/one consumer interaction or a multiple producers/multiple consumers setup with this approach.

The following code highlights the process_billing() native service that builds a producer/consumer transaction flow. The extract_billing() coroutine is the producer that retrieves the billing records from the database and passes each record one at a time to the queue. build_billing_sheet(), on the other hand, is the consumer that fetches the record from the queue structure and generates the billing sheet:

async def process_billing(query_list):
    billing_list = []

    async def extract_billing(qlist, q: Queue):
        assigned_billing = {}
        for record in qlist:
            await asyncio.sleep(2)
            assigned_billing['admin_name'] = "{} {}".format(
                record.firstname, record.lastname)
            if not len(record.children) == 0:
                assigned_billing['billing_items'] = record.children
            else:
                assigned_billing['billing_items'] = None

            await q.put(assigned_billing)

    async def build_billing_sheet(q: Queue):
        while True:
            await asyncio.sleep(2)
            assigned_billing = await q.get()
            name = assigned_billing['admin_name']
            billing_items = assigned_billing['billing_items']
            if not billing_items == None:
                for item in billing_items:
                    billing_list.append(
                        {'admin_name': name, 'billing': item})
            else:
                billing_list.append(
                    {'admin_name': name, 'billing': None})
            q.task_done()

In this programming design, the build_billing_sheet() coroutine will explicitly wait for the records queued by extract_billing(). This setup is possible due to the asyncio.create_task() utility, which directly assigns and schedules a task to each coroutine.

The queue is the only method parameter common to the two coroutines because it is their meeting point. The join() method of asyncio.Queue blocks until every item placed in the pipeline by extract_billing() has been fetched and processed by build_billing_sheet() and marked done through task_done(). The following code shows how to create the asyncio.Queue and schedule the tasks for execution:

    q = asyncio.Queue()
    build_sheet = asyncio.create_task(
               build_billing_sheet(q))
    await asyncio.gather(asyncio.create_task(
             extract_billing(query_list, q)))
    
    await q.join()
    build_sheet.cancel()
    return billing_list

By the way, always call cancel() on a task right after its coroutine has completed its work. There are also other ways to improve the performance of our coroutines.

Using the HTTP/2 protocol

Coroutine execution can be faster in applications running on the HTTP/2 protocol. We can replace the Uvicorn server with Hypercorn, which now supports ASGI-based frameworks such as FastAPI. But first, we need to install hypercorn using pip:

pip install hypercorn

For HTTP/2 to work, we need to create an SSL certificate. Using OpenSSL, our app has two PEM files for our newsstand prototype: (a) the private key (key.pem) and (b) the certificate information (cert.pem).
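A typical OpenSSL invocation for generating a self-signed pair with these filenames might look like the following sketch; the subject and validity period are placeholder values to adjust for your environment:

openssl req -x509 -newkey rsa:4096 -nodes -keyout key.pem -out cert.pem -days 365 -subj "/CN=localhost"

We place these files in the main project folder before executing the following hypercorn command to run our FastAPI application: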

hypercorn --keyfile key.pem --certfile cert.pem main:app --bind 'localhost:8000' --reload

Now, let us explore other FastAPI tasks that can also use coroutines.

Creating asynchronous background tasks

In Chapter 2, Exploring the Core Features, we first showcased the BackgroundTasks injectable API class, but we didn’t mention creating asynchronous background tasks. In this discussion, we will be focusing on creating asynchronous background tasks using the asyncio module and coroutines.

Using the coroutines

The framework supports the creation and execution of asynchronous background processes using the async/await structure. The following native service is an asynchronous transaction that generates a billing sheet in CSV format in the background:

async def generate_billing_sheet(billing_date, query_list):
    filepath = os.getcwd() + '/data/billing-' + str(billing_date) + '.csv'
    with open(filepath, mode="a") as sheet:
        for vendor in query_list:
            billing = vendor.children
            for record in billing:
                if billing_date == record.date_billed:
                    entry = ";".join(
                        [str(record.date_billed), vendor.account_name,
                         vendor.account_number, str(record.payable),
                         str(record.total_issues)])
                    sheet.write(entry)
                await asyncio.sleep(1)

This generate_billing_sheet() coroutine service will be executed as a background task in the following API service, save_vendor_billing():

@router.post("/billing/save/csv")
async def save_vendor_billing(billing_date:date, 
              tasks: BackgroundTasks):
    repo = BillingVendorRepository()
    result = await repo.join_vendor_billing()
    tasks.add_task(generate_billing_sheet, 
            billing_date, result)
    tasks.add_task(create_total_payables_year, 
            billing_date, result)
    return {"message" : "done"}

Now, nothing has changed when it comes to defining background processes. We usually inject BackgroundTasks into the API service method and apply add_task() to provide task schedules, assignments, and execution for a specific process. But since the approach is now to utilize coroutines, the background task will use the event loop instead of waiting for the current thread to finish its jobs.

If the background process requires arguments, we pass these arguments to add_task() right after its first parameter. For instance, the arguments for the billing_date and query_list parameters of generate_billing_sheet() should be placed after the generate_billing_sheet injection into add_task(). Moreover, the billing_date value should be passed before the result argument because add_task() still follows the order of parameter declaration in generate_billing_sheet() to avoid a type mismatch.

All asynchronous background tasks continue to execute without being awaited, even after their coroutine API service has returned a response to the client.

Creating multiple tasks

BackgroundTasks allows the creation of multiple asynchronous transactions that will execute concurrently in the background. In the save_vendor_billing() service, there is another task created for a new transaction called the create_total_payables_year() transaction, which requires the same arguments as generate_billing_sheet(). Again, this newly created task will be utilizing the event loop instead of the thread.

The application always encounters performance issues when the background processes have high-CPU workloads. Also, tasks generated by BackgroundTasks are not capable of returning values from the transactions. Let us look for another solution where tasks can manage high workloads and execute processes with returned values.

Understanding Celery tasks

Celery is a non-blocking task queue that runs on a distributed system. It can manage asynchronous background processes that are huge and heavy with CPU workloads. It is a third-party tool, so we need to install it first through pip:

pip install celery

It schedules and runs tasks concurrently on a single server or in a distributed environment, but it requires a message transport to send and receive messages. One such transport is Redis, an in-memory data store that can broker messages of the string, dictionary, list, set, bitmap, and stream types, and it can be installed on Linux, macOS, and Windows. After the installation, run the redis-server command (redis-server.exe on Windows) to start the server. On Windows, the Redis service is set to run by default after installation, which causes a TCP bind listener error, so we need to stop the service before running the startup command. Figure 8.1 shows Windows Task Manager with the Redis service in the Stopped status:

Figure 8.1 – Stopping the Redis service


After stopping the service, we should now see Redis running as shown in Figure 8.2:

Figure 8.2 – A running Redis server

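Before wiring Redis into Celery, a quick reachability check with the bundled CLI (assuming the default port of 6379) can save debugging time later:

redis-cli ping

The server replies with PONG once it is up and accepting connections.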

Creating and configuring the Celery instance

Before creating Celery tasks, we need a Celery instance placed in a dedicated module of our application. The newsstand prototype has the Celery instance in the /services/billing.py module, and the following is part of the code that shows the process of Celery instantiation:

from celery import Celery
from celery.utils.log import get_task_logger

celery = Celery("services.billing",
   broker='redis://localhost:6379/0',
   backend='redis://localhost',
   include=["services.billing", "models", "config"])

class CeleryConfig:
    task_create_missing_queues = True
    celery_store_errors_even_if_ignored = True
    task_store_errors_even_if_ignored = True
    task_ignore_result = False
    task_serializer = "pickle"
    result_serializer = "pickle"
    event_serializer = "json"
    accept_content = ["pickle", "application/json",
          "application/x-python-serialize"]
    result_accept_content = ["pickle", "application/json",
          "application/x-python-serialize"]

celery.config_from_object(CeleryConfig)
celery_log = get_task_logger(__name__)

To create the Celery instance, we need the following details:

  • The name of the current module containing the Celery instance (the first argument)
  • The URL of Redis as our message broker (broker)
  • The backend result where the results of tasks are stored and monitored (backend)
  • The list of other modules used in the message body or by the Celery task (include)

After the instantiation, we need to set the appropriate serializer and content types to process the incoming and outgoing message bodies of the tasks involved, if there are any. To allow the passing of full Python objects with non-JSON-able values, we need to include pickle as a supported content type, and then declare a default task and result serializer for the object stream. However, the pickle serializer poses security risks because unpickling untrusted data can execute arbitrary code and expose transaction data. To avoid compromising the app, apply sanitation to message objects, such as removing sensitive values or credentials, before pursuing the messaging operation.
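As a minimal sketch of such sanitation, assuming hypothetical names for the sensitive fields, a small helper can strip credentials from a record before it enters a message body:

SENSITIVE_KEYS = {"password", "token"}  # hypothetical field names

def sanitize(record: dict) -> dict:
    # Drop sensitive values before the dict is handed to a task
    return {k: v for k, v in record.items()
            if k not in SENSITIVE_KEYS}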

Apart from the serialization options, other important properties such as task_create_missing_queues, task_ignore_result, and error-related configuration should also be part of the CeleryConfig class. Now, we declare all these details in a custom class, which we will inject into the config_from_object() method of the Celery instance.

Additionally, we can create a Celery logger through get_task_logger(), passing the name of the current module.

Creating the task

The main goal of the Celery instance is to annotate Python methods to become tasks. The Celery instance has a task() decorator that we can apply to all callable procedures we want to define as asynchronous tasks. Part of the task() decorator is the task’s name, an optional unique name composed of the package, module name(s), and method name of the transaction. The decorator has other attributes that add more refinement to the task definition, such as autoretry_for, which registers the Exception classes that trigger automatic retries when raised, and max_retries, which limits the number of retry executions of a task. By the way, Celery 5.2.3 and below can only define tasks from non-coroutine methods.

The services.billing.tasks.create_total_payables_year_celery task shown here adds all the payable amounts per date and returns the total amount:

@celery.task(
    name="services.billing.tasks.create_total_payables_year_celery",
    autoretry_for=(ValueError, TypeError),
    max_retries=5)
def create_total_payables_year_celery(billing_date, query_list):
    total = 0.0
    for vendor in query_list:
        billing = vendor.children
        for record in billing:
            if billing_date == record.date_billed:
                total += record.payable
    celery_log.info('computed result: ' + str(total))
    return total

The given task has only five (5) retries to recover when it encounters either ValueError or TypeError at runtime. It is also a function that returns a computed amount, something that is impossible with BackgroundTasks. All functional tasks use the Redis database as temporary storage for their returned values, which is why there is a backend parameter in the Celery constructor.

Calling the task

FastAPI services can call these tasks using the apply_async() or delay() function. The latter is the easier option since it is preconfigured and only needs the task’s parameters to produce a result. The apply_async() function is the more flexible option since it accepts additional details that can optimize the task execution, such as queue, time_limit, retry, ignore_result, expires, and some kwargs. Both functions return an AsyncResult object, which exposes resources such as the task’s state, the wait() function to block until the task finishes its operation, and the get() function to return its computed value or raise its exception. The following code is a coroutine API service that calls the services.billing.tasks.create_total_payables_year_celery task using the apply_async() method:

@router.post("/billing/total/payable")
async def compute_payables_yearly(billing_date:date):
    repo = BillingVendorRepository()
    result = await repo.join_vendor_billing()
    total_result = create_total_payables_year_celery.apply_async(
        queue='default', args=(billing_date, result))
    total_payable = total_result.get(timeout=1)
    return {"total_payable": total_payable }

Setting task_create_missing_queues to True at the CeleryConfig setup is always recommended because it automatically creates the task queue, default or not, once the worker server starts. The worker server places all the loaded tasks in a task queue for execution, monitoring, and result retrieval. Thus, we should always define a task queue in the apply_async() function’s argument before extracting AsyncResult.

The AsyncResult object has a get() method that releases the returned value of the task, with or without a timeout. In the compute_payables_yearly() service, the amount payable in AsyncResult is retrieved by the get() function with a timeout of 1 second. Let us now deploy and run our tasks using the Celery worker server.

Starting the worker server

Running the Celery worker creates a single process that handles and manages all the queued tasks. The worker needs to know in which module the Celery instance is created, together with the tasks to establish the server process. In our prototype, the services.billing module is where we place our Celery application. Thus, the complete command to start the worker is the following:

celery -A services.billing worker -Q default -P solo -c 2 -l info

Here, -A specifies the module containing our Celery object and tasks. The -Q option names the queue that the worker will consume from, whether a low-, normal-, or high-priority one; for the queue to be created automatically, task_create_missing_queues must be set to True in the Celery setup. The -c option indicates the level of concurrency, that is, how many threads or processes the worker devotes to task execution. The -P option specifies the type of pool that the worker will utilize. By default, the Celery worker uses the prefork pool, which is applicable to most CPU-bound transactions. Other options are solo, eventlet, and gevent; our setup utilizes solo, which runs tasks in the worker’s own process and is the most suitable choice on Windows and in many microservice environments. The -l option sets the level of the logger we created using get_task_logger() during the setup. Now, there are also ways to monitor our running tasks, and one of those options is to use the Flower tool.

Monitoring the tasks

Flower is Celery’s monitoring tool that observes all task executions by generating a real-time audit on a web-based platform. But first, we need to install it using pip:

pip install flower

And then, we run the following celery command with the flower option:

celery -A services.billing flower

To view the audit, we open http://localhost:5555/tasks in a browser. Figure 8.3 shows a Flower snapshot of an execution log incurred by the services.billing.tasks.create_total_payables_year_celery task:

Figure 8.3 – The Flower monitoring tool


So far, we have used Redis as our in-memory backend database for task results and a message broker. Let us now use another asynchronous message broker that can replace Redis, RabbitMQ.

Building message-driven transactions using RabbitMQ

RabbitMQ is a lightweight asynchronous message broker that supports multiple messaging protocols such as AMQP, STOMP, WebSocket, and MQTT. It requires Erlang to be installed before it works properly on Windows, Linux, or macOS. Its installer can be downloaded from https://www.rabbitmq.com/download.html.

Creating the Celery instance

Instead of Redis, RabbitMQ can serve as the message broker that mediates messages between the client and the Celery worker. For multiple tasks, RabbitMQ queues the tasks so that the Celery worker processes them one at a time. The RabbitMQ broker handles large messages well and persists them to disk.

To start, we need to set up a new Celery instance that will utilize the RabbitMQ message broker using its guest account. We will use the AMQP protocol as the mechanism for a producer/consumer type of messaging setup. Here is the snippet that will replace the previous Celery configuration:

celery = Celery("services.billing",   
    broker='amqp://guest:[email protected]:5672',   
    result_backend='redis://localhost:6379/0', 
    include=["services.billing", "models", "config"])

Redis will still be the backend resource, as indicated in Celery’s backend argument, since it remains simple and easy to control and manage when message traffic increases. Let us now use RabbitMQ to create and manage message-driven transactions.

Monitoring AMQP messaging

We can configure the RabbitMQ management dashboard to monitor the messages handled by RabbitMQ. After the setup, we can log in to the dashboard using the same account details configured for the broker. Figure 8.4 shows a screenshot of RabbitMQ’s analytics for a situation where the API services called the services.billing.tasks.create_total_payables_year_celery task several times:

Figure 8.4 – The RabbitMQ management tool


If the RabbitMQ dashboard fails to capture the behavior of the tasks, the Flower tool will always be an option for gathering the details about the arguments, kwargs, UUID, state, and processing date of the tasks. And if RabbitMQ is not the right messaging tool, we can always resort to Apache Kafka.

Building publish/subscribe messaging using Kafka

Like RabbitMQ, Apache Kafka is an asynchronous messaging tool that applications use to send and store messages between producers and consumers. However, it is faster than RabbitMQ because it uses topics with partitions, minute folder-like structures across which producers can append various types of messages. In this architecture, consumers can consume the messages in parallel, unlike in queue-based messaging, where producers send messages to a queue from which they can only be consumed sequentially. Within this publish/subscribe architecture, Kafka can handle the exchange of large quantities of data per second in a continuous, real-time mode.

There are three Python extensions that we can use to integrate the FastAPI services with Kafka, namely the kafka-python, confluent-kafka, and pykafka extensions. Our online newsstand prototype will use kafka-python, so we need to install it using the pip command:

pip install kafka-python

Among the three extensions, kafka-python offers a client interface designed to work much like the official Java client, with a Pythonic flavor. We can download Kafka from https://kafka.apache.org/downloads.

Running the Kafka broker and server

Kafka has a ZooKeeper server that manages and synchronizes the exchange of messages within Kafka’s distributed system. The ZooKeeper server runs as the broker that monitors and maintains the Kafka nodes and topics. The following command starts the server:

C:\..\kafka\bin\windows\zookeeper-server-start.bat C:\..\kafka\config\zookeeper.properties

Now, we can start the Kafka server by running the following console command:

C:\..\kafka\bin\windows\kafka-server-start.bat C:\..\kafka\config\server.properties

By default, the server will run on localhost at port 9092.

Creating the topic

When the two servers have started, we can now create a topic called newstopic through the following command:

C:\..\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic newstopic

The newstopic topic has three (3) partitions that will hold all the appended messages of our FastAPI services. These are also the points where the consumers will simultaneously access all the published messages.
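To verify the topic and its partition layout, the same script can describe it (using the same abbreviated path as above):

C:\..\kafka-topics.bat --describe --topic newstopic --bootstrap-server localhost:9092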

Implementing the publisher

After creating the topic, we can now implement a producer that publishes messages to the Kafka cluster. The kafka-python extension has a KafkaProducer class that instantiates a single thread-safe producer for all the running FastAPI threads. The following is an API service that sends a newspaper messenger record to the Kafka newstopic topic for the consumer to access and process:

import json
from datetime import datetime, date

from kafka import KafkaProducer

producer = KafkaProducer(
     bootstrap_servers='localhost:9092')

def json_date_serializer(obj):
    # Serializes date/datetime values that json.dumps() cannot handle natively
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError("Data %s not serializable" % type(obj))

@router.post("/messenger/kafka/send")
async def send_messenger_details(req: MessengerReq):
    messenger_dict = req.dict(exclude_unset=True)
    producer.send("newstopic",
       bytes(str(json.dumps(messenger_dict,
          default=json_date_serializer)), 'utf-8'))
    return {"content": "messenger details sent"}

The coroutine API service, send_messenger_details(), asks for details about a newspaper messenger and stores them in a BaseModel object. And then, it sends the dictionary of profile details to the cluster in byte format. Now, one of the options to consume Kafka tasks is to run its built-in kafka-console-consumer.bat command.

Running a consumer on a console

Running the following command from the console is one way to consume the current messages from the newstopic topic:

kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic newstopic

This command creates a consumer that will connect to the Kafka cluster to read, in real time, the current messages from newstopic sent by the producer. Figure 8.5 shows a capture of the consumer while it is running on the console:

Figure 8.5 – The Kafka consumer


If we want the consumer to read all the messages sent by the producer starting from the point where the Kafka server and broker began running, we need to add the --from-beginning option to the command. The following will read all the messages from newstopic and continuously capture incoming messages in real time:

kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic newstopic --from-beginning

Another way of implementing a consumer using the FastAPI framework is through SSE. A typical API service implementation will not satisfy the Kafka consumer requirement, since we need a continuously running service that subscribes to newstopic for real-time data. So, let us now explore how we create SSE in the FastAPI framework and how it can consume Kafka messages.

Implementing asynchronous Server-Sent Events (SSE)

SSE is a server push mechanism that sends data to the browser without reloading the page. Once subscribed, it generates event-driven streams in real time for various purposes.

Creating SSE in the FastAPI framework only requires the following:

  • The EventSourceResponse class from the sse_starlette.sse module
  • An event generator

Above all, the framework also allows non-blocking implementation of the whole server push mechanism using coroutines that can run even on HTTP/2. The following is a coroutine API service that implements a Kafka consumer using SSE’s open and lightweight protocol:

from sse_starlette.sse import EventSourceResponse

@router.get('/messenger/sse/add')
async def send_message_stream(request: Request):

    async def event_provider():
        while True:
            if await request.is_disconnected():
                break
            message = consumer.poll()
            if not len(message.items()) == 0:
                for tp, records in message.items():
                    for rec in records:
                        messenger_dict = json.loads(
                            rec.value.decode('utf-8'),
                            object_hook=date_hook_deserializer)

                        repo = MessengerRepository()
                        result = await repo.insert_messenger(
                            messenger_dict)
                        id = uuid4()
                        yield {
                            "event": "Added … status: {}, Received: {}".format(
                                result,
                                datetime.utcfromtimestamp(
                                    rec.timestamp // 1000).strftime(
                                    "%B %d, %Y [%I:%M:%S %p]")),
                            "id": str(id),
                            "retry": SSE_RETRY_TIMEOUT,
                            "data": rec.value.decode('utf-8')
                        }

            await asyncio.sleep(SSE_STREAM_DELAY)
    return EventSourceResponse(event_provider())

send_message_stream() is a coroutine API service that implements the whole SSE mechanism. It returns a special response generated by the EventSourceResponse class. While the HTTP stream is open, it continuously retrieves data from its source and converts any internal events into SSE signals until the connection is closed.

On the other hand, event generator functions create internal events, and they can also be asynchronous. send_message_stream(), for instance, has a nested generator function, event_provider(), which consumes the last messages sent by the producer service using the consumer.poll() method. If a message is valid, the generator converts it into a dict object and inserts all its details into the database through MessengerRepository. Then, it yields the internal details for EventSourceResponse to convert into SSE signals. Figure 8.6 shows the data streams generated by send_message_stream() rendered in the browser:

Figure 8.6 – The SSE data streams

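One quick way to watch the stream outside the browser is curl with response buffering disabled; the path below assumes the same /ch08 prefix used elsewhere in the project (add -k when running Hypercorn with the self-signed certificate):

curl -N http://localhost:8000/ch08/messenger/sse/add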

Another way to implement a Kafka consumer is through WebSocket. But this time, we will focus on the general procedure of how to create an asynchronous WebSocket application using the FastAPI framework.

Building an asynchronous WebSocket

Unlike SSE, communication in WebSocket is always bi-directional: the server and client communicate with each other over a long-lived TCP socket connection. The communication is in real time, and it doesn’t require the client or the server to reply to every event sent.

Implementing the asynchronous WebSocket endpoint

The FastAPI framework allows the implementation of an asynchronous WebSocket that can also run on the HTTP/2 protocol. The following is an example of an asynchronous WebSocket created using the coroutine block:

import asyncio
from fastapi import WebSocket
@router.websocket("/customer/list/ws")
async def customer_list_ws(websocket: WebSocket):
    await websocket.accept()
    repo = CustomerRepository()
    result = await repo.get_all_customer()
    
    for rec in result:
        data = rec.to_dict()
        await websocket.send_json(json.dumps(data, 
           default=json_date_serializer))
        await asyncio.sleep(0.01)
        client_resp = await websocket.receive_json()
        print("Acknowledging receipt of record id 
           {}.".format(client_resp['rec_id']))
    await websocket.close()    

First, we decorate a coroutine function with @router.websocket() when using APIRouter, or @api.websocket() when using the FastAPI instance, to declare a WebSocket component. The decorator must also define a unique endpoint URL for the WebSocket. Then, the WebSocket function must have an injected WebSocket as its first method argument. It can also include other parameters such as query and header parameters.

The WebSocket injectable has four methods for sending messages, namely send(), send_text(), send_json(), and send_bytes(). Applying send() will manage every message as plain text by default. The previous customer_list_ws() coroutine is a WebSocket endpoint that sends every customer record in JSON format.

On the other hand, the WebSocket injectable also provides four methods for receiving messages: receive(), receive_text(), receive_json(), and receive_bytes(). The receive() method expects the message to be in plain-text format by default. Our customer_list_ws() endpoint expects a JSON reply from the client because it invokes receive_json() after each send operation.

The WebSocket endpoint must close the connection right after its transaction is done.
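Before writing a Python client, which the next section covers, the endpoint can be probed with a generic WebSocket client such as wscat, an npm tool; the path below assumes the same /ch08 prefix used elsewhere in the project:

wscat -c ws://localhost:8000/ch08/customer/list/ws

Because this endpoint calls receive_json() after every record, each received message must be answered with a JSON acknowledgment such as {"rec_id": "<id>"} before the next record arrives.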

Implementing the WebSocket client

There are many ways to create a WebSocket client, but this chapter will focus on utilizing a coroutine API service that performs a handshake with the asynchronous customer_list_ws() endpoint once called from a browser or a curl command. Here is the code of our WebSocket client implemented using the websockets library, which runs on top of the asyncio framework:

import websockets
@router.get("/customer/wsclient/list/")  
async def customer_list_ws_client():
    uri = "ws://localhost:8000/ch08/customer/list/ws"
    async with websockets.connect(uri) as websocket:
        while True:
           try:
             res = await websocket.recv()
             data_json = json.loads(res, 
                object_hook=date_hook_deserializer)
                   
             print("Received record: 
                       {}.".format(data_json))
                   
             data_dict = json.loads(data_json)
             client_resp = {"rec_id": data_dict['id'] }
             await websocket.send(json.dumps(client_resp))
                    
           except websockets.ConnectionClosed:
                 break
        return {"message": "done"}

After a successful handshake established by the websockets.connect() method, customer_list_ws_client() runs a loop that continuously fetches the incoming customer details from the WebSocket endpoint. Each message received is converted into the dictionary form needed by other processes. Our client also sends an acknowledgment message back to the WebSocket coroutine, with JSON data containing the customer ID of the profile. The loop stops once the WebSocket endpoint closes the connection.

Let us now explore other asynchronous programming features that can work with the FastAPI framework.

Applying reactive programming in tasks

Reactive programming is a paradigm that involves the generation of streams that undergo a series of operations to propagate some changes during the process. Python has an RxPY library that offers several methods that we can apply to these streams asynchronously to extract the terminal result as desired by the subscribers.

In the reactive programming paradigm, all intermediate operators working along the streams will execute to propagate some changes if there is an Observable instance beforehand and an Observer that subscribes to this instance. The main goal of this paradigm is to achieve the desired result at the end of the propagation process using functional programming.

Creating the Observable data using coroutines

It all starts with the implementation of a coroutine function that will emit these streams of data based on a business process. The following is an Observable function that emits publication details in str format for those publications that did well in sales:

import asyncio
from rx.disposable import Disposable
async def process_list(observer):
      repo = SalesRepository()
      result = await repo.get_all_sales()
      
      for item in result:
        record = " ".join([str(item.publication_id),  
          str(item.copies_issued), str(item.date_issued), 
          str(item.revenue), str(item.profit), 
          str(item.copies_sold)])
        cost = item.copies_issued * 5.0
        projected_profit = cost - item.revenue
        diff_err = projected_profit - item.profit
        if (diff_err <= 0):
            observer.on_next(record)
        else:
            observer.on_error(record)
      observer.on_completed()

An Observable function can be synchronous or asynchronous. Our target is to create an asynchronous one such as process_list(). The coroutine function should have the following callback methods to qualify as an Observable function:

  • An on_next() method that emits items given a certain condition
  • An on_completed() method that is executed once when the function has completed the operation
  • An on_error() method that is called when an error occurs on Observable

Our process_list() emits the details of the publications that gained some profit. To run it, we create an asyncio task for the call to the process_list() coroutine inside a nested function, evaluate_profit(), which returns the Disposable task required by RxPY’s create() method for producing the Observable stream. The task is cancelled once the Observable stream has been fully consumed. The following is the complete implementation of the execution of the asynchronous Observable function and the use of the create() method to generate streams of data from it:

def create_observable(loop):
    def evaluate_profit(observer, scheduler):
        task = asyncio.ensure_future(
            process_list(observer), loop=loop)
        return Disposable(lambda: task.cancel())
    return rx.create(evaluate_profit)

The Observable created by create_observable() is subscribed to by our application’s list_sales_by_quota() API service. The service needs the currently running event loop for the method to generate the Observable. Afterward, it invokes the Observable’s subscribe() method, through which a client subscribes to the stream and observes the occurring propagations:

@router.get("/sales/list/quota")
async def list_sales_by_quota():
    loop = asyncio.get_event_loop()
    observer = create_observable(loop)
    
    observer.subscribe(
        on_next=lambda value: print(
            "Received Instruction to buy {0}".format(value)),
        on_completed=lambda: print("Completed trades"),
        on_error=lambda e: print(e),
        scheduler=AsyncIOScheduler(loop)
    )
    return {"message": "Notification sent to the background"}

The list_sales_by_quota() coroutine service shows us how to subscribe to an Observable. A subscriber should utilize the following callback methods:

  • An on_next() method to consume all the items from the stream
  • An on_completed() method to indicate the end of the subscription
  • An on_error() method to flag an error during the subscription process

And since the Observable processes run asynchronously, the scheduler is an optional argument that provides the right manager to schedule and run these processes. The API service used AsyncIOScheduler as the appropriate scheduler for the subscription. But there are also shortcuts to generating Observables that do not use a custom function, as sketched below.
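For instance, rx.of() wraps a fixed set of values in an Observable without a custom emitter function; here is a minimal illustration:

import rx

rx.of(1, 2, 3).subscribe(
    on_next=lambda value: print("Received {0}".format(value)),
    on_completed=lambda: print("Done"))

The next sections use two of these shortcuts, interval() and from_().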

Creating a background process

Some Observables are designed to end successfully, but some are created to run continuously in the background. For continuously running Observables, we use the interval() function instead of a custom Observable function. The following Observable runs in the background periodically to provide updates on the total amount received from newspaper subscriptions:

import asyncio
import rx
import rx.operators as ops
async def compute_subscriptions():
    total = 0.0
    repo = SubscriptionCustomerRepository()
    result = await repo.join_customer_subscription_total()
    
    for customer in result:
        subscription = customer.children
        for item in subscription:
            total = total + (item.price * item.qty)
    await asyncio.sleep(1)
    return total
def fetch_records(rate, loop) -> rx.Observable:
    return rx.interval(rate).pipe(
        ops.map(lambda i: rx.from_future(
          loop.create_task(compute_subscriptions()))),
        ops.merge_all()
    )

The interval() method creates a stream of data periodically, in seconds. This Observable then imposes some propagations on its stream through the execution of the pipe() method. The Observable’s pipe() method creates a pipeline of reactive operators, the so-called intermediate operators, which run one at a time to change items from the stream. Because the map() operator here wraps each result of the compute_subscriptions() coroutine in a new Observable, the pipeline produces multiple substreams; fetch_records() therefore uses merge_all() at the end of the pipeline to merge and flatten all the substreams into the single final stream expected by the subscriber. We can also generate Observable data from files or API responses.

Accessing API resources

Another way of creating an Observable is using the from_() method, which builds streams from existing resources such as files, databases, or API endpoints. Here, the Observable function retrieves its data from a JSON document generated by an API endpoint of our application. The assumption is that we are running the application with Hypercorn over HTTP/2, so we need to bypass TLS certificate verification by setting the verify parameter of httpx.AsyncClient() to False.

The following code highlights the from_() in the fetch_subscription() operation, which creates an Observable that emits streams of str data from the https://localhost:8000/ch08/subscription/list/all endpoint. These reactive operators of the Observable, namely filter(), map(), and merge_all(), are used to propagate the needed contexts along the stream:

async def fetch_subscription(min_date:date, 
         max_date:date, loop) -> rx.Observable:
    headers = {
            "Accept": "application/json",
            "Content-Type": "application/json"
        }
    async with httpx.AsyncClient(http2=True, 
             verify=False) as client:
        content = await client.get(
            'https://localhost:8000/ch08/subscription/list/all',
            headers=headers)
    y = json.loads(content.text)
    source = rx.from_(y)
    observable = source.pipe(
      ops.filter(lambda c: filter_within_dates(
               c, min_date, max_date)),
      ops.map(lambda a: rx.from_future(loop.create_task(
            convert_str(a)))),
      ops.merge_all(),
    )
    return observable

The filter() method is another pipeline operator that returns Boolean values from a validation rule. It executes the following filter_within_dates() to verify whether the record retrieved from the JSON document is within the date range specified by the subscriber:

def filter_within_dates(rec, min_date: date, max_date: date):
    date_pur = datetime.strptime(
        rec['date_purchased'], '%Y-%m-%d')
    return min_date <= date_pur.date() <= max_date

On the other hand, the following convert_str() is a coroutine function executed by the map() operator to generate a concise profile detail of the newspaper subscribers derived from the JSON data:

async def convert_str(rec):
    if not rec == None:
        total = rec['qty'] * rec['price']
        record = " ".join([rec['branch'], 
            str(total), rec['date_purchased']])
        await asyncio.sleep(1)
        return record

Running these two functions modifies the original emitted data stream from JSON to a date-filtered stream of str data. The coroutine list_dated_subscription() API service, on the other hand, subscribes to fetch_subscription() to extract the newspaper subscriptions within the min_date and max_date range:

@router.post("/subscription/dated")
async def list_dated_subscription(min_date:date, 
            max_date:date):
     
    loop = asyncio.get_event_loop()
    observable = await fetch_subscription(min_date, 
             max_date, loop)
    
    observable.subscribe(
       on_next=lambda item: 
         print("Subscription details: {}.".format(item)),
       scheduler=AsyncIOScheduler(loop)
    )

Although the FastAPI framework does not yet fully support reactive programming, we can still create coroutines that can work with various RxPY utilities. Now, we will explore how coroutines are not only for background processes but also for FastAPI event handlers.

Customizing events

The FastAPI framework has special functions called event handlers that execute before the application starts up and during shutdown. These events are activated every time the uvicorn or hypercorn server reloads. Event handlers can also be coroutines.

Defining the startup event

The startup event is an event handler that the server executes when it starts up. We decorate the function with the @app.on_event("startup") decorator to create a startup event. Applications may require a startup event to centralize some transactions, such as the initial configuration of components or the setup of data-related resources. The following example is the application startup event that opens a database connection for the GINO repository transactions:

app = FastAPI()
@app.on_event("startup")
async def initialize():
    engine = await db.set_bind(
        "postgresql+asyncpg://postgres:admin2255@localhost:5433/nsms")

This initialize() event is defined in our application’s main.py file so that GINO can only create the connection once every server reload or restart.

Defining shutdown events

Meanwhile, the shutdown event cleans up unwanted memory, destroys unwanted connections, and logs the reason for shutting down the application. The following is the shutdown event of our application that closes the GINO database connection:

@app.on_event("shutdown")
async def destroy():
    engine, db.bind = db.bind, None
    await engine.close()

We can define startup and shutdown events in an APIRouter, as in the sketch below, but we must be sure this does not cause transaction overlapping or collision with other routers. Moreover, event handlers do not work in mounted sub-applications.
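The following is a minimal sketch of router-level handlers; the print statements stand in for whatever router-scoped resources need initializing and releasing:

from fastapi import APIRouter

router = APIRouter()

@router.on_event("startup")
async def router_init():
    # Initialize resources used only by this router's endpoints
    print("router resources ready")

@router.on_event("shutdown")
async def router_destroy():
    print("router resources released")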

Summary

The use of coroutines is one of the factors that makes a FastAPI microservice application fast, aside from its use of an ASGI-based server. This chapter has shown that implementing API services with coroutines improves performance more than simply adding threads to the thread pool. Since the framework runs on the asyncio platform, we can use asyncio utilities to design various patterns for managing CPU-bound and I/O-bound services.

This chapter used Celery and Redis for creating and managing asynchronous background tasks for behind-the-scenes transactions such as logging, system monitoring, time-sliced computations, and batch jobs. We learned that RabbitMQ and Apache Kafka provide an integrated solution for building asynchronous and loosely coupled communication between FastAPI components, especially for the message-passing part of these interactions. Most importantly, coroutines were applied to create these asynchronous and non-blocking background processes and message-passing solutions to enhance performance. Reactive programming was also introduced in this chapter through the RxPY extension module.

This chapter, in general, concludes that the FastAPI framework is ready to build a microservice application that has a reliable, asynchronous, message-driven, real-time message-passing, and distributed core system. The next chapter will highlight other FastAPI features that provide integrations with UI-related tools and frameworks, API documentation using OpenAPI Specification, session handling, and circumventing CORS.
