© Federico Marani 2019
Federico Marani, Practical Django 2 and Channels 2, https://doi.org/10.1007/978-1-4842-4099-1_8

8. Backend for a Mobile Customer-Service App

Federico Marani (London, UK)

In this chapter we will move away from HTML and normal HTTP. We will create the necessary infrastructure to support a mobile app, including asynchronous communication using WebSocket and HTTP server-sent events.

This chapter is centered on Django Channels, including these topics:
  • How to integrate it

  • How to build WebSocket consumers

  • How to integrate it with Redis

  • How to use it to serve content asynchronously

Django Channels

Django Channels is a recent addition to the Django ecosystem. It is officially supported by Django, and all development happens on GitHub alongside Django itself. It is not, however, included when installing Django.

Channels allows us to write asynchronous code to deal with incoming requests, which is helpful in case we have a naturally asynchronous interaction between client and server—for example, a chat session.

While it is technically possible to build a chat server with normal Django views if the real-time component is not critical, synchronous systems will not be able to scale up as well as asynchronous systems when the number of clients increases.

That said, development is much easier on synchronous systems. Do not consider asynchronous programming to be always a better paradigm because, as with anything in computing, it comes with trade-offs.

Asynchronous Code vs. Synchronous Code

Synchronous programming is a very well-understood execution model. Your code is executed from beginning to end, before returning control to Django. In asynchronous programming, on the other hand, this is not always the case. Your asynchronous code operates in coordination with an asyncio event loop. There is a lot of documentation online on this subject, which I encourage you to read (see footnote 1).

Fortunately, you do not need to understand everything about asynchronous programming before using Django Channels. Some of the underlying details are hidden by this library, which simplifies your job. One of the most important things to understand is that you cannot mix synchronous and asynchronous code freely. Every time you do so, you need to cross a boundary (see footnote 2).
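To give a rough idea of what crossing that boundary looks like, here is a minimal sketch that uses only the standard library. It is not how Channels does it internally (Channels ships its own helpers for this); the names blocking_lookup() and handler() are made up for the example. asyncio.run() enters asynchronous code from synchronous code, and run_in_executor() calls a blocking function from a coroutine without stalling the event loop.

```python
import asyncio
import time


def blocking_lookup(order_id):
    """A synchronous function, standing in for a database query (hypothetical)."""
    time.sleep(0.05)  # simulates blocking I/O
    return {"order_id": order_id, "status": "paid"}


async def handler(order_id):
    """Asynchronous code that needs the result of a synchronous call."""
    loop = asyncio.get_running_loop()
    # Async -> sync boundary: run the blocking call in a worker thread,
    # so the event loop stays free to serve other connections.
    return await loop.run_in_executor(None, blocking_lookup, order_id)


def main():
    # Sync -> async boundary: start an event loop and run the coroutine.
    return asyncio.run(handler(42))


if __name__ == "__main__":
    print(main())
```

Calling blocking_lookup() directly inside handler() would also work, but it would freeze every other coroutine on the loop for the duration of the call, which is exactly what the boundary is there to prevent.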

I encourage you to spend some time reading about asynchronous programming online. It will help you to follow the code and concepts presented in this chapter.

Installation and Configuration

To install Channels on our system, we will set it up along with Redis, an open source in-memory data structure server ( https://redis.io ). This is for the same reason that we used PostgreSQL at the start: to always work on environments that are the same as those that production deployments will use.

Go ahead and download and install Redis on your operating system. After you have done so, proceed with installing Channels:
$ pipenv install channels
$ pipenv install channels_redis

Our project will need a new file, routing.py, to be placed in the same folder as settings.py. This file, for now, will not declare any route besides the built-in ones.

The contents of booktime/routing.py are this:
from channels.routing import ProtocolTypeRouter
application = ProtocolTypeRouter({})
We will connect this file with a configuration for Channels using Redis. The channels application needs to come first in INSTALLED_APPS because it overrides the runserver command of standard Django:
INSTALLED_APPS = [
    'channels',
    ...
]
ASGI_APPLICATION = "booktime.routing.application"
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}
You are ready now to use this library. This is the output of the new runserver command:
$ ./manage.py runserver
Performing system checks...
System check identified no issues (0 silenced).
August 15, 2018 - 18:56:30
Django version 2.1, using settings 'booktime.settings'
Starting ASGI/Channels version 2.1.2 development server at...
Quit the server with CONTROL-C.
2018-08-15 18:56:30,270 - INFO - server - HTTP/2 support not enabled
2018-08-15 18:56:30,271 - INFO - server - Configuring endpoint...
2018-08-15 18:56:30,272 - INFO - server - Listening on 127.0.0.1:8000

Use of Redis

As previously mentioned, Redis is an in-memory data structure server. It can be used as a very simple database, a cache, or a message broker. It has a very different set of operations from normal databases, and its use case is very different.

In our setup, Channels will be using Redis to pass messages between different instances of the running Django application. Redis will also enable us to do message passing between instances running on different machines as well as running on a single server.

Besides requiring Redis to support the communication between processes, the code in this chapter will use Redis directly to create a simple presence mechanism. By default, Channels does not offer a way to ascertain whether or not certain users are connected to our endpoints.

Consumers

We can consider consumers in Channels as the equivalent of class-based views, with the difference that consumers are not as high level and numerous as CBVs. There are two base consumer classes, SyncConsumer and AsyncConsumer . Their interfaces are the same except that asynchronous code relies on the Python async/await syntax.

The structure of consumers is based on a combination of message handlers using class methods and the send() built-in method. Consumers can have multiple message handlers, and the routing of messages is based on the type value of the message.
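The following toy class sketches the idea of routing messages to handlers by their type value. It is an illustration written for this explanation, not the Channels implementation: ToyConsumer, its sent list, and demo() are all hypothetical, and the real consumer classes do considerably more.

```python
import asyncio


class ToyConsumer:
    """Toy stand-in for a consumer; not the Channels implementation."""

    def __init__(self):
        self.sent = []  # what would be written to the client

    async def send(self, payload):
        self.sent.append(payload)

    async def dispatch(self, message):
        # "chat.message" and "chat_message" both map to chat_message().
        handler_name = message["type"].replace(".", "_")
        if handler_name.startswith("_"):
            raise ValueError("Malformed type: %r" % message["type"])
        handler = getattr(self, handler_name, None)
        if handler is None:
            raise ValueError("No handler for type %r" % message["type"])
        await handler(message)

    async def chat_message(self, event):
        await self.send("%s: %s" % (event["username"], event["message"]))

    async def chat_join(self, event):
        await self.send("%s joined" % event["username"])


async def demo():
    consumer = ToyConsumer()
    await consumer.dispatch({"type": "chat_join", "username": "Ann"})
    await consumer.dispatch(
        {"type": "chat.message", "username": "Ann", "message": "hi"}
    )
    return consumer.sent
```

Running asyncio.run(demo()) returns the two strings that the handlers passed to send(), in the order the messages were dispatched.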

Every consumer, when the class is initialized, will receive a scope object, stored in self.scope. It contains information about the initial request such as:
  • scope["path"]: The path of the request

  • scope["user"]: The current user (only when authentication is enabled)

  • scope["url_route"]: Contains the matched route (if using a URL router)

There are several subclasses of the original consumer:
  • WebsocketConsumer and AsyncWebsocketConsumer

  • JsonWebsocketConsumer and AsyncJsonWebsocketConsumer

  • AsyncHttpConsumer

Each one of those subclasses adds a few helper methods, for example, to manage JSON encoding/decoding or WebSockets.

Channels, Channel Layers, and Groups

One of the core concepts of this library is channels. A channel is essentially a mailbox, and each consumer gets one automatically when it is instantiated as part of request handling, and it is removed when the consumer terminates. This is how messages are dispatched from the outside to a given consumer.

A channel layer is the equivalent of a mailman. It is needed to transport messages between various instances of Django. This layer can deliver messages either to a single consumer (by knowing its channel name) or to a set of consumers (by knowing their group name).

Sending messages to a single consumer is not a common use case. It is more common to send messages to all consumers associated with a particular user (useful in case he or she opened the site on multiple tabs), or to all consumers of a particular set of users (like in our chat case).

To this end, Channels offers an abstraction called groups. A group is an entity you can send messages to; each message is then forwarded to the channels of all the consumers that have joined it. Channels are added to and removed from groups through the channel layer methods group_add() and group_discard().

To send a message to a group, you can use group_send(). When a message is sent through this method, the channel layer forwards it to the channels of all the consumers in the group. Each consumer then handles the message automatically by calling the message handler for that specific message type.

It is then the responsibility of the handler to either forward the message to the connected client (by using send()) or do the required computation.
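The mailbox analogy can be made concrete with a small in-process sketch. ToyChannelLayer below is a hypothetical class written for this explanation; it only borrows the method names group_add(), group_discard(), and group_send() from the channel layer, and it uses asyncio queues where the real Redis-backed layer talks to a server.

```python
import asyncio
from collections import defaultdict


class ToyChannelLayer:
    """In-process stand-in for a channel layer (illustration only)."""

    def __init__(self):
        self.channels = {}              # channel name -> mailbox (queue)
        self.groups = defaultdict(set)  # group name -> channel names

    def new_channel(self, name):
        self.channels[name] = asyncio.Queue()
        return name

    async def group_add(self, group, channel):
        self.groups[group].add(channel)

    async def group_discard(self, group, channel):
        self.groups[group].discard(channel)

    async def send(self, channel, message):
        await self.channels[channel].put(message)

    async def group_send(self, group, message):
        # Every channel in the group gets its own copy of the message.
        for channel in self.groups[group]:
            await self.send(channel, dict(message))

    async def receive(self, channel):
        return await self.channels[channel].get()


async def demo():
    layer = ToyChannelLayer()
    tab_a = layer.new_channel("tab-a")
    tab_b = layer.new_channel("tab-b")
    await layer.group_add("customer-service_7", tab_a)
    await layer.group_add("customer-service_7", tab_b)
    await layer.group_send(
        "customer-service_7", {"type": "chat_join", "username": "Ann"}
    )
    first = await layer.receive(tab_a)
    second = await layer.receive(tab_b)
    # After leaving the group, tab-b no longer receives group messages.
    await layer.group_discard("customer-service_7", tab_b)
    await layer.group_send(
        "customer-service_7", {"type": "chat_leave", "username": "Ann"}
    )
    return first, second, layer.channels[tab_b].empty()
```

Note that nothing here reaches a browser: the messages only land in the mailboxes, and it is up to whoever reads each mailbox to decide what to do with them.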

Routing and Middleware

Channels includes various routers, which are used to route requests to specific consumers. Routing can be based on a protocol (HTTP or WebSocket), URLs, or channel names.

In our project we will initially use ProtocolTypeRouter because we need to separate WebSocket-handling code from normal Django views. We will use routers together with special middleware provided by Channels.

Middleware here is not the same as middleware in standard Django. This middleware is fully asynchronous. It offers something that in principle is similar to Django middleware: filtering, blocking, and adding additional information to scopes.

In our application we shall be using AuthMiddlewareStack , which is a combination of authentication, session, and cookie middleware components. This middleware stack will take care of loading the user session, establishing whether the connection is authenticated, and, if so, loading the user object in the consumer scope.
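To make the idea of routers and middleware wrapping each other more tangible, here is a simplified sketch. None of it is Channels code: protocol_router(), fake_auth_middleware(), the sessions dictionary, and the "cookies" scope key are assumptions made for the example, and the application signature is deliberately simplified.

```python
import asyncio


def protocol_router(routes):
    """Pick an application by scope["type"] (toy version of protocol routing)."""
    async def app(scope, receive, send):
        if scope["type"] not in routes:
            raise ValueError("No application for %r" % scope["type"])
        await routes[scope["type"]](scope, receive, send)
    return app


def fake_auth_middleware(inner, sessions):
    """Wrap an application and add scope["user"] based on a session cookie.

    `sessions` is a hypothetical dict mapping session ids to usernames.
    """
    async def app(scope, receive, send):
        session_id = scope.get("cookies", {}).get("sessionid")
        # Copy the scope so the caller's dictionary is left untouched.
        scope = dict(scope, user=sessions.get(session_id, "anonymous"))
        await inner(scope, receive, send)
    return app


async def chat_app(scope, receive, send):
    await send("hello %s on %s" % (scope["user"], scope["path"]))


async def demo():
    application = protocol_router({
        "websocket": fake_auth_middleware(chat_app, {"abc123": "ann"}),
    })
    out = []

    async def send(message):
        out.append(message)

    async def receive():
        return None

    await application(
        {"type": "websocket", "path": "/ws/customer-service/7/",
         "cookies": {"sessionid": "abc123"}},
        receive, send,
    )
    return out
```

The point of the sketch is the nesting: the router decides which application handles the connection, and the middleware enriches the scope before the innermost application ever sees it.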

Chat for Our Customer-Service Operators

Let’s start building something concrete with Channels: the internal chat page for customer-service operators. We will start by downloading a must-have, reconnecting-websocket, which will deal with unstable connections for us:
$ curl -o main/static/js/reconnecting-websocket.min.js \
    https://raw.githubusercontent.com/joewalnes/reconnecting-websocket/master/reconnecting-websocket.min.js
We will offer to our customer-service representatives a very simple chat page, which you are free to style as you wish. This is the content of main/templates/chat_room.html:
{% load static %}
<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8"/>
    <title>Chat Room</title>
    <script
      src="{% static "js/reconnecting-websocket.min.js" %}"
      charset="utf-8"></script>
  </head>
  <body>
    <textarea id="chat-log" cols="100" rows="20"></textarea><br/>
    <input id="chat-message-input" type="text" size="100"/><br/>
    <input id="chat-message-submit" type="button" value="Send"/>
  </body>
  <script>
    var roomName = {{ room_name_json }};
    var chatSocket = new ReconnectingWebSocket(
      'ws://' + window.location.host + '/ws/customer-service/' +
      roomName + '/'
    );
    chatSocket.onmessage = function (e) {
      var data = JSON.parse(e.data);
      var username = data['username'];
      var message;  // declared locally to avoid leaking a global
      if (data['type'] == "chat_join") {
        message = (username + ' joined\n');
      } else if (data['type'] == "chat_leave") {
        message = (username + ' left\n');
      } else {
        message = (username + ': ' + data['message'] + '\n');
      }
      document
        .querySelector('#chat-log')
        .value += message;
    };
    chatSocket.onclose = function (e) {
      console.error('Chat socket closed unexpectedly');
    };
    document
      .querySelector('#chat-message-input')
      .focus();
    document
      .querySelector('#chat-message-input')
      .onkeyup = function (e) {
        if (e.keyCode === 13) { // enter, return
          document
            .querySelector('#chat-message-submit')
            .click();
        }
      };
    document
      .querySelector('#chat-message-submit')
      .onclick = function (e) {
        var messageInputDom = document.querySelector(
          '#chat-message-input'
        );
        var message = messageInputDom.value;
        chatSocket.send(
          JSON.stringify({'type': 'message', 'message': message})
        );
        messageInputDom.value = '';
      };
    setInterval(function () {
      chatSocket.send(JSON.stringify({'type': 'heartbeat'}));
    }, 10000);
  </script>
</html>

This is our customized version of an example included in the Channels repository. It is a simple message visualizer/sender that sends and receives JSON messages over a WebSocket connection.

WebSocket Protocol Format

We are going to define a very simple format for WebSocket messages, server to client:
{
    type: "TYPE",
    username: "who is the originator of the event",
    message: "This is the displayed message" (optional)
}
Here, TYPE can have the following values:
  • chat_join : The username joined the chat.

  • chat_leave : The username left the chat.

  • chat_message : The username sent a message.

This is enough to define server to client. Now for the client to server:
{
    type: "TYPE",
    message: "This is the displayed message" (optional)
}
TYPE in this case can have the following values:
  • message : The username sent a message.

  • heartbeat : A ping to let the server know that the user is active.

This describes our entire WebSocket protocol. We are going to use this to build both the customer-service part and the mobile interface for our company’s customers.
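As a way of pinning the format down, the two directions of the protocol can be sketched as a pair of helper functions. These helpers are not part of the project code; encode_server_message() and decode_client_message() are hypothetical names, and the validation rules are my reading of the format described above.

```python
import json

SERVER_TYPES = {"chat_join", "chat_leave", "chat_message"}
CLIENT_TYPES = {"message", "heartbeat"}


def encode_server_message(type_, username, message=None):
    """Build a server-to-client frame as a JSON string."""
    if type_ not in SERVER_TYPES:
        raise ValueError("Unknown server message type: %r" % type_)
    payload = {"type": type_, "username": username}
    if message is not None:
        payload["message"] = message  # optional field
    return json.dumps(payload)


def decode_client_message(raw):
    """Parse and validate a client-to-server frame."""
    payload = json.loads(raw)
    if not isinstance(payload, dict):
        raise ValueError("A frame must be a JSON object")
    if payload.get("type") not in CLIENT_TYPES:
        raise ValueError("Unknown client message type: %r" % payload.get("type"))
    if payload["type"] == "message" and "message" not in payload:
        raise ValueError("A 'message' frame needs a 'message' field")
    return payload


if __name__ == "__main__":
    print(encode_server_message("chat_message", "Ann", "hello"))
    print(decode_client_message('{"type": "heartbeat"}'))
```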

Heartbeat Mechanism in Redis

Our chat is going to need a presence system that is user-centric rather than connection-centric. Any user, whether a customer-service representative or an end user, will become unavailable once all WebSocket connections initiated by them are closed or become inactive for more than 10 seconds. This will ensure a more robust way to handle network problems or browser crashes.

We are going to rely on the expiration feature of Redis. Redis has a very efficient way to set values for specific keys, only temporarily. When setting values along with an expiration time, Redis will automatically remove these in due time. It is the perfect mechanism for us, as it takes care of recycling keys automatically.

The heartbeat signals coming from clients will issue a Redis SETEX command on a key named customer-service_ORDERID_USEREMAIL setting a dummy value of 1 with an expiration of 10 seconds.

This is enough to support a page to display a dynamic list of who is connected to what customer-service chat. This page will issue a Redis KEYS command with the prefix customer-service_, and generate the information from the returned result.
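The presence logic can be tried out without a Redis server. The sketch below imitates only the two commands we rely on, with a fake clock so that the outcome is deterministic; ToyExpiringStore, presences(), and the example email addresses are invented for the illustration and say nothing about how Redis implements expiration.

```python
import fnmatch


class ToyExpiringStore:
    """Tiny in-memory imitation of Redis SETEX and KEYS (illustration only)."""

    def __init__(self, clock):
        self.clock = clock  # callable returning the current time in seconds
        self.data = {}      # key -> (value, expiry time)

    def setex(self, key, seconds, value):
        self.data[key] = (value, self.clock() + seconds)

    def keys(self, pattern):
        now = self.clock()
        # Drop expired entries, which Redis would do on its own.
        self.data = {k: v for k, v in self.data.items() if v[1] > now}
        return sorted(k for k in self.data if fnmatch.fnmatchcase(k, pattern))


def presences(keys):
    """Group "customer-service_ORDERID_USEREMAIL" keys by order id."""
    result = {}
    for key in keys:
        _, order_id, email = key.split("_", 2)
        result.setdefault(order_id, []).append(email)
    return result


now = [0.0]  # fake clock, advanced by hand
store = ToyExpiringStore(lambda: now[0])
store.setex("customer-service_7_ann@example.com", 10, "1")
now[0] = 6.0
store.setex("customer-service_7_bob@example.com", 10, "1")
now[0] = 12.0  # Ann's last heartbeat was 12 seconds ago, Bob's 6 seconds ago
active = presences(store.keys("customer-service_*"))
```

At the end, only Bob is still listed for order 7, because Ann's key was not refreshed within its 10-second lifetime.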

To use Redis directly, we need to add a new dependency to our system:
$ pipenv install aioredis

Given that we are using asynchronous consumers, we need an asynchronous network library to talk to Redis. The aioredis library is an asynchronous Redis client for Python.

Bootstrap Page

We are going to add a URL to main/urls.py and a view to main/views.py to serve the chat_room.html template presented earlier. Let’s start with the view:
...
def room(request, order_id):
    return render(
        request,
        "chat_room.html",
        {"room_name_json": str(order_id)},
    )
...
urlpatterns = [
    ...
    path(
        "customer-service/<int:order_id>/",
        views.room,
        name="cs_chat",
    ),
]

The chat bootstrap page is now complete, but its JavaScript will not work because the WebSocket endpoint does not exist yet.

WebSocket Consumer

The consumer is where most of the work happens. We will put the consumer in main/consumers.py:
import aioredis
import logging
from django.shortcuts import get_object_or_404
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from . import models
logger = logging.getLogger(__name__)
class ChatConsumer(AsyncJsonWebsocketConsumer):
    EMPLOYEE = 2
    CLIENT = 1
    def get_user_type(self, user, order_id):
        order = get_object_or_404(models.Order, pk=order_id)
        if user.is_employee:
            order.last_spoken_to = user
            order.save()
            return ChatConsumer.EMPLOYEE
        elif order.user == user:
            return ChatConsumer.CLIENT
        else:
            return None
    async def connect(self):
        self.order_id = self.scope["url_route"]["kwargs"][
            "order_id"
        ]
        self.room_group_name = (
            "customer-service_%s" % self.order_id
        )
        authorized = False
        if self.scope["user"].is_anonymous:
            await self.close()
            return  # reject anonymous users before touching the database
        user_type = await database_sync_to_async(
            self.get_user_type
        )(self.scope["user"], self.order_id)
        if user_type == ChatConsumer.EMPLOYEE:
            logger.info(
                "Opening chat stream for employee %s",
                self.scope["user"],
            )
            authorized = True
        elif user_type == ChatConsumer.CLIENT:
            logger.info(
                "Opening chat stream for client %s",
                self.scope["user"],
            )
            authorized = True
        else:
            logger.info(
                "Unauthorized connection from %s",
                self.scope["user"],
            )
            await self.close()
        if authorized:
            self.r_conn = await aioredis.create_redis(
                "redis://localhost"
            )
            await self.channel_layer.group_add(
                self.room_group_name, self.channel_name
            )
            await self.accept()
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    "type": "chat_join",
                    "username": self.scope[
                        "user"
                    ].get_full_name(),
                },
            )
    async def disconnect(self, close_code):
        if not self.scope["user"].is_anonymous:
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    "type": "chat_leave",
                    "username": self.scope[
                        "user"
                    ].get_full_name(),
                },
            )
            logger.info(
                "Closing chat stream for user %s",
                self.scope["user"],
            )
            await self.channel_layer.group_discard(
                self.room_group_name, self.channel_name
            )
    async def receive_json(self, content):
        typ = content.get("type")
        if typ == "message":
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    "type": "chat_message",
                    "username": self.scope[
                        "user"
                    ].get_full_name(),
                    "message": content["message"],
                },
            )
        elif typ == "heartbeat":
            await self.r_conn.setex(
                "%s_%s"
                % (
                    self.room_group_name,
                    self.scope["user"].email,
                ),
                10,  # expiration (in 10 seconds)
                "1", # dummy value
            )
    async def chat_message(self, event):
        await self.send_json(event)
    async def chat_join(self, event):
        await self.send_json(event)
    async def chat_leave(self, event):
        await self.send_json(event)

We derived our consumer from AsyncJsonWebsocketConsumer , which takes care of the WebSocket low-level aspects and JSON encoding. We need to implement receive_json(), connect(), and disconnect() for this class to work.

The first thing connect() does is generate a room name, which will be used as the group name for the group_*() calls. After this, we need to make sure the user has permission to be here, and this requires accessing the database.

Accessing the database from an asynchronous consumer requires wrapping the code in a synchronous function, and then calling it through the database_sync_to_async() wrapper. This is because Django itself, especially the ORM, is written in a synchronous manner.

The get_user_type() method in the preceding code, besides checking the user type, stores the last employee the customer has spoken to in the order.

The three main methods, receive_json() , connect() , and disconnect() , are using the channel layer methods group_send() , group_add() , and group_discard() to manage the communication and synchronization between all the different consumer instances of one chat room.

In the method receive_json(), we handle the two types of messages we listed in the WebSocket protocol format section. Messages of type “message” are relayed through group_send() to all connected consumers as chat_message events, with the sender’s username added, and messages of type “heartbeat” are used to update the expiration time of the Redis key (or create the key if it does not exist already).

In the methods connect() and disconnect(), we use the group_send() method to generate the join/leave messages of the various users.

It is important to remember that group_send() is not sending the data back to the browser’s WebSocket connection. It is only used to relay information between consumers, using the configured channel layer. Each consumer will receive this data through message handlers.

Finally, in ChatConsumer there are three handlers: chat_message(), chat_join(), and chat_leave(). All of them send the message straight back to the browser WebSocket connection, because all the processing will happen in the frontend.

The name of the message handler that will handle the message is derived from the type field. If group_send() is called with a message['type'] of chat_message, the receiving consumer will handle this with the chat_message() handler .

In light of these last few paragraphs, take your time to re-read the code and cross-reference the things I just mentioned about what the code does. Also consider when reading the code that asynchronous programming is not the default style in Python.

This consumer requires a new field in our database schema. Let’s quickly add it to our main/models.py:
...
class Order(models.Model):
    ...
    last_spoken_to = models.ForeignKey(
        User,
        null=True,
        related_name="cs_chats",
        on_delete=models.SET_NULL,
    )
    ...

After this addition, do not forget to run the management commands makemigrations and migrate to have this applied to the database.

After having defined the consumer and all that it requires, we’ll continue with defining the route for this consumer.

Routing

At the moment we have an empty {} routing variable inside booktime/routing.py. We need to change this.

We will manage all routes specific to our site in main/routing.py:
from django.urls import path
from . import consumers
websocket_urlpatterns = [
    path(
        "ws/customer-service/<int:order_id>/",
        consumers.ChatConsumer
    )
]
We will manage all the general routes in booktime/routing.py:
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import main.routing
application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': AuthMiddlewareStack(
        URLRouter(
            main.routing.websocket_urlpatterns
        )
    ),
})

This setup will assign a URL path to our WebSocket consumer and relay WebSocket traffic to this new “websocket” route. If we do not add the “http” protocol type to the main router, Channels will add it automatically to support the standard Django views.

You can now test your page by loading the chat URL in your browser. You should be able to load the chat and type some sentences into the chat. If you load the chat in multiple browsers, you will see that what you type in one window will appear in the other.

Automated Testing

As with everything else in this book, I will explain how to test the code presented so far. Channels offers constructs called communicators. You can think of communicators as the equivalent of the test clients that are included in the Django TestCase.

Communicators, unlike consumers, do not have a synchronous version and an asynchronous version. They only come with an asynchronous API. To use this with the standard Python unittest framework, which runs synchronously, we need to use some low-level asyncio API. That is what we are going to do in the tests.

We will put these tests in main/tests/test_consumers.py:
import asyncio
from django.contrib.auth.models import Group
from django.test import TestCase
from channels.db import database_sync_to_async
from channels.testing import WebsocketCommunicator
from main import consumers
from main import factories
class TestConsumers(TestCase):
    def test_chat_between_two_users_works(self):
        def init_db():
            user = factories.UserFactory(
                email="[email protected]",
                first_name="John",
                last_name="Smith",
            )
            order = factories.OrderFactory(user=user)
            cs_user = factories.UserFactory(
                email="[email protected]",
                first_name="Adam",
                last_name="Ford",
                is_staff=True,
            )
            employees, _ = Group.objects.get_or_create(
                name="Employees"
            )
            cs_user.groups.add(employees)
            return user, order, cs_user
        async def test_body():
            user, order, cs_user = await database_sync_to_async(
                init_db
            )()
            communicator = WebsocketCommunicator(
                consumers.ChatConsumer,
                "/ws/customer-service/%d/" % order.id,
            )
            communicator.scope["user"] = user
            communicator.scope["url_route"] = {
                "kwargs": {"order_id": order.id}
            }
            connected, _ = await communicator.connect()
            self.assertTrue(connected)
            cs_communicator = WebsocketCommunicator(
                consumers.ChatConsumer,
                "/ws/customer-service/%d/" % order.id,
            )
            cs_communicator.scope["user"] = cs_user
            cs_communicator.scope["url_route"] = {
                "kwargs": {"order_id": order.id}
            }
            connected, _ = await cs_communicator.connect()
            self.assertTrue(connected)
            await communicator.send_json_to(
                {
                    "type": "message",
                    "message": "hello customer service",
                }
            )
            await asyncio.sleep(1)
            await cs_communicator.send_json_to(
                {"type": "message", "message": "hello user"}
            )
            self.assertEqual(
                await communicator.receive_json_from(),
                {"type": "chat_join", "username": "John Smith"},
            )
            self.assertEqual(
                await communicator.receive_json_from(),
                {"type": "chat_join", "username": "Adam Ford"},
            )
            self.assertEqual(
                await communicator.receive_json_from(),
                {
                    "type": "chat_message",
                    "username": "John Smith",
                    "message": "hello customer service",
                },
            )
            self.assertEqual(
                await communicator.receive_json_from(),
                {
                    "type": "chat_message",
                    "username": "Adam Ford",
                    "message": "hello user",
                },
            )
            await communicator.disconnect()
            await cs_communicator.disconnect()
            order.refresh_from_db()
            self.assertEqual(order.last_spoken_to, cs_user)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(test_body())

In this test we check that the connection works and that the messages are relayed as they should be. To do this we use two communicators, one that represents the customer-service operator and the other the end user.

Note that the preceding test function contains two subfunctions, one for the synchronous database initialization and the other with the main asynchronous body. The main asynchronous body is then run by referencing the asyncio loop directly.
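Stripped of everything specific to Channels, the pattern looks like the sketch below. The coroutine echo_upper() is a hypothetical stand-in for the code under test; the rest uses only unittest and asyncio from the standard library.

```python
import asyncio
import unittest


async def echo_upper(text):
    """Hypothetical coroutine under test."""
    await asyncio.sleep(0)  # yield to the event loop once
    return text.upper()


class TestEcho(unittest.TestCase):
    def test_echo_upper(self):
        async def test_body():
            # Assertions work as usual inside the asynchronous body.
            self.assertEqual(await echo_upper("hello"), "HELLO")

        # Drive the coroutine to completion from synchronous test code.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(test_body())
        finally:
            loop.close()


if __name__ == "__main__":
    unittest.main()
```

This sketch creates a fresh loop for each test and closes it afterward, which keeps tests isolated from each other; on Python 3.7 and later, asyncio.run(test_body()) is a shorter way to get the same effect.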

The communicators take as arguments the consumer to connect to and a URL. The URL is not strictly necessary but it is used by the URLRouter to inject the routes in the consumer scope. The URLRouter does not support naming routes, therefore the use of reverse() when referencing URLs is not possible.

Our consumer also needs a test for blocking unauthorized users:
    ...
    def test_chat_blocks_unauthorized_users(self):
        def init_db():
            user = factories.UserFactory(
                email="[email protected]",
                first_name="John",
                last_name="Smith",
            )
            order = factories.OrderFactory()
            return user, order
        async def test_body():
            user, order = await database_sync_to_async(init_db)()
            communicator = WebsocketCommunicator(
                consumers.ChatConsumer,
                "/ws/customer-service/%d/" % order.id,
            )
            communicator.scope["user"] = user
            communicator.scope["url_route"] = {
                "kwargs": {"order_id": order.id}
            }
            connected, _ = await communicator.connect()
            self.assertFalse(connected)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(test_body())

Admittedly, the low-level asyncio functions here are simplistic. If you are going to start writing many asynchronous tests, you may want to look at pytest, which is what Django Channels uses for its own test suite.

Chat Dashboard (with Presence)

Our customer-service representatives need to be able to see if there are customers waiting to be served. We need to build a dashboard that dynamically updates with a list of chat rooms and the various people in them.

We are going to adopt a similar approach to the chat we built before, but instead of WebSockets we will use a simpler unidirectional approach. We will use HTTP server-sent events.

Server-sent events (SSE) is essentially an HTTP connection that stays open and keeps receiving chunks of information as soon as they are produced. Every chunk of information is prefixed with the word “data:” and is terminated with two newline characters.
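The framing is simple enough to sketch in a few lines. The helpers below are hypothetical (format_sse() and parse_sse() are names chosen for the example) and cover only the "data:" field described above, with a JSON payload as the dashboard will use; the full protocol has a few more fields.

```python
import json


def format_sse(payload):
    """Serialize a Python object as one server-sent event (bytes)."""
    # json.dumps() escapes newlines inside strings, so the whole payload
    # fits on a single "data:" line.
    body = json.dumps(payload)
    return ("data: %s\n\n" % body).encode("utf-8")


def parse_sse(stream):
    """Split a decoded event stream into the JSON payloads of its events."""
    events = []
    for block in stream.split("\n\n"):
        lines = [
            line[len("data:"):].strip()
            for line in block.split("\n")
            if line.startswith("data:")
        ]
        if lines:
            events.append(json.loads("\n".join(lines)))
    return events


if __name__ == "__main__":
    chunk = format_sse([{"link": "/customer-service/7/", "text": "order 7"}])
    print(chunk)
    print(parse_sse(chunk.decode("utf-8")))
```

The client side of this is handled by the browser's EventSource object, so in practice only the formatting half is needed on the server.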

I invite you to read some documentation online about the protocol format (see footnote 3). SSE is not as complex as WebSockets. It is quite easy to implement, and all major browsers support it.

Similarly to what we did before, we will use reconnecting-eventsource, which will deal with unstable connections for us:
$ curl -o main/static/js/reconnecting-eventsource.js
    https://cdn.jsdelivr.net/npm/[email protected]/
    dist/ReconnectingEventSource.js
In the same spirit of the chat page, this is our very simple dashboard, stored in main/templates/customer_service.html:
{% load static %}
<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8"/>
    <title>Chat Rooms</title>
    <script
      src="{% static "js/reconnecting-eventsource.js" %}"
      charset="utf-8"></script>
    </head>
    <body>
      <h1>Customer chats</h1>
      <div id="notification-area"></div>
      <script>
        var source = new ReconnectingEventSource('/customer-service/notify/');
        source.addEventListener('message', function (e) {
          document
            .getElementById("notification-area")
            .innerHTML = "";
          var data = JSON.parse(e.data);
          var html;
          for (var i = 0; i < data.length; i++) {
            html = '<div><a href="' + data[i]['link'] + '">' + data[i]['text'] + '</a></div>';
            document
              .getElementById("notification-area")
              .innerHTML += html;
          }
        }, false);
    </script>
  </body>
</html>
Figure 8-1. Customer-service dashboard

We are again going to add a URL to main/urls.py to serve the template above:
...
urlpatterns = [
    ...
    path(
        "customer-service/",
        TemplateView.as_view(
            template_name="customer_service.html"
        ),
        name="cs_main",
    ),
]

HTTP Server-Sent Events Consumer

The above page will receive dynamic data from a new consumer in main/consumers.py:
...
import asyncio
import json
from django.urls import reverse
from channels.exceptions import StopConsumer
from channels.generic.http import AsyncHttpConsumer
...
class ChatNotifyConsumer(AsyncHttpConsumer):
    def is_employee_func(self, user):
        return not user.is_anonymous and user.is_employee
    async def handle(self, body):
        is_employee = await database_sync_to_async(
            self.is_employee_func
        )(self.scope["user"])
        if is_employee:
            logger.info(
                "Opening notify stream for user %s and params %s",
                self.scope.get("user"),
                self.scope.get("query_string"),
            )
            await self.send_headers(
                headers=[
                    ("Cache-Control", "no-cache"),
                    ("Content-Type", "text/event-stream"),
                    ("Transfer-Encoding", "chunked"),
                ]
            )
            self.is_streaming = True
            self.no_poll = (
                self.scope.get("query_string") == "nopoll"
            )
            asyncio.get_event_loop().create_task(self.stream())
        else:
            logger.info(
                "Unauthorized notify stream for user %s and params %s",
                self.scope.get("user"),
                self.scope.get("query_string"),
            )
            raise StopConsumer("Unauthorized")
    async def stream(self):
        r_conn = await aioredis.create_redis("redis://localhost")
        while self.is_streaming:
            active_chats = await r_conn.keys(
                "customer-service_*"
            )
            presences = {}
            for i in active_chats:
                _, order_id, user_email = i.decode("utf8").split(
                    "_"
                )
                if order_id in presences:
                    presences[order_id].append(user_email)
                else:
                    presences[order_id] = [user_email]
            data = []
            for order_id, emails in presences.items():
                data.append(
                    {
                        "link": reverse(
                            "cs_chat",
                            kwargs={"order_id": order_id}
                        ),
                        "text": "%s (%s)"
                        % (order_id, ", ".join(emails)),
                    }
                )
            # Each SSE event must be terminated by a blank line
            payload = "data: %s\n\n" % json.dumps(data)
            logger.info(
                "Broadcasting presence info to user %s",
                self.scope["user"],
            )
            if self.no_poll:
                await self.send_body(payload.encode("utf-8"))
                self.is_streaming = False
            else:
                await self.send_body(
                    payload.encode("utf-8"),
                    more_body=self.is_streaming,
                )
                await asyncio.sleep(5)
    async def disconnect(self):
        logger.info(
            "Closing notify stream for user %s",
            self.scope.get("user"),
        )
        self.is_streaming = False

This consumer implements the AsyncHttpConsumer interface with the handle() and disconnect() methods. Due to the streaming nature of the endpoint we are trying to build, we need to keep the connection open. We will be calling the send_headers() method to start the HTTP response, and we will call send_body(), with a more_body argument set to True, in a separate asynchronous task that stays active.

The stream() method activation is added to the event loop, and will stay active until the disconnect() method is called (disconnection initiated by the client). When this method is called, it will set an is_streaming flag to False, which will cause the stream() inner loop, running in a different asyncio task, to exit.
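The flag-based shutdown described here can be tried in isolation, without Channels. The following is a minimal sketch under that assumption: the Streamer class and its sent list are hypothetical stand-ins for the consumer and for send_body().

```python
import asyncio


class Streamer:
    def __init__(self):
        self.is_streaming = True
        self.sent = []  # stands in for data pushed with send_body()

    async def stream(self):
        # Runs as its own task until the flag is cleared
        while self.is_streaming:
            self.sent.append("tick")
            await asyncio.sleep(0.01)

    def disconnect(self):
        # Clearing the flag lets the loop in stream() finish on its own
        self.is_streaming = False


async def main():
    streamer = Streamer()
    task = asyncio.create_task(streamer.stream())
    await asyncio.sleep(0.05)  # the "client" stays connected for a while
    streamer.disconnect()
    await task  # returns once the loop has observed the flag
    return streamer.sent


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that the loop only notices the flag after its current sleep ends, so in the real consumer the task can outlive the disconnection by up to one polling interval.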

The stream() method will periodically read from Redis the keys that are not expired, and send them back to the client. If, at connection time, the nopoll flag is passed in, it will exit the loop without waiting for a client disconnection.
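The grouping step inside stream() can also be written as a small pure function, which is easier to test on its own. This is a sketch, not the consumer's actual code: group_presences is a hypothetical name, and the key layout (prefix, order ID, email, separated by underscores) is inferred from the split() call in the consumer.

```python
from collections import defaultdict


def group_presences(keys):
    """Map order IDs to the emails found in Redis presence keys."""
    presences = defaultdict(list)
    for raw_key in keys:
        # maxsplit=2 keeps any underscore inside the email intact
        _, order_id, user_email = raw_key.decode("utf8").split("_", 2)
        presences[order_id].append(user_email)
    return dict(presences)


if __name__ == "__main__":
    sample = [
        b"customer-service_12_a@example.com",
        b"customer-service_12_b_c@example.com",
        b"customer-service_7_d@example.com",
    ]
    print(group_presences(sample))
```

Limiting the split is a deliberate deviation from the listing above: an unrestricted split("_") would raise a ValueError for an email address that itself contains an underscore.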

Streaming only happens for authorized users. For unauthorized users, the handle() method will raise a StopConsumer exception, which will stop the consumer and close the current connection with the client.

Routing

The SSE consumer is the first HTTP consumer that we have created, and it will require some additional configuration to work. We will define all non-WebSocket HTTP routes in main/routing.py:
from channels.auth import AuthMiddlewareStack
...
http_urlpatterns = [
    path(
        "customer-service/notify/",
        AuthMiddlewareStack(
            consumers.ChatNotifyConsumer
        )
    )
]
And we will override the default HTTP route in booktime/routing.py with a custom one:
from django.urls import re_path
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.http import AsgiHandler
import main.routing
application = ProtocolTypeRouter(
    {
        "websocket": AuthMiddlewareStack(
            URLRouter(main.routing.websocket_urlpatterns)
        ),
        "http": URLRouter(
            main.routing.http_urlpatterns
            + [re_path(r"", AsgiHandler)]
        ),
    }
)

The HTTP route now is associated with a URLRouter that includes our http_urlpatterns defined in main/routing.py and, for all other routes, falls back to AsgiHandler, which in turn forwards to the standard Django handlers.

AsgiHandler is an adapter between the Asynchronous Server Gateway Interface (ASGI) protocol and Django's regular request/response machinery, which has traditionally been served through the Web Server Gateway Interface (WSGI).

Automated Testing

Testing for the SSE consumer will use a different type of communicator, HttpCommunicator. We will issue the SSE request twice: once right after a WebSocket client has connected and sent a heartbeat, and once after waiting long enough for its presence entry to expire. In both cases we will check whether the right events are returned. The wait gives Redis time to remove the expired entries from memory.
...
import json
from channels.testing import HttpCommunicator
class TestConsumers(TestCase):
    ...
    def test_chat_presence_works(self):
        def init_db():
            user = factories.UserFactory(
                email="[email protected]",
                first_name="John",
                last_name="Smith",
            )
            order = factories.OrderFactory(user=user)
            cs_user = factories.UserFactory(
                email="[email protected]",
                first_name="Adam",
                last_name="Ford",
                is_staff=True,
            )
            employees, _ = Group.objects.get_or_create(
                name="Employees"
            )
            cs_user.groups.add(employees)
            return user, order, cs_user
        async def test_body():
            user, order, notify_user = await database_sync_to_async(
                init_db
            )()
            communicator = WebsocketCommunicator(
                consumers.ChatConsumer,
                "/ws/customer-service/%d/" % order.id,
            )
            communicator.scope["user"] = user
            communicator.scope["url_route"] = {
                "kwargs": {"order_id": order.id}
            }
            connected, _ = await communicator.connect()
            self.assertTrue(connected)
            await communicator.send_json_to(
                {"type": "heartbeat"}
            )
            await communicator.disconnect()
            communicator = HttpCommunicator(
                consumers.ChatNotifyConsumer,
                "GET",
                "/customer-service/notify/",
            )
            communicator.scope["user"] = notify_user
            communicator.scope["query_string"] = "nopoll"
            response = await communicator.get_response()
            self.assertTrue(
                response["body"].startswith(b"data: ")
            )
            payload = response["body"][6:]
            data = json.loads(payload.decode("utf8"))
            self.assertEqual(
                data,
                [
                    {
                        "link": "/customer-service/%d/" % order.id,
                        "text": "%d ([email protected])" % order.id,
                    }
                ],
                "expecting someone in the room but no one found",
            )
            await asyncio.sleep(10)
            communicator = HttpCommunicator(
                consumers.ChatNotifyConsumer,
                "GET",
                "/customer-service/notify/",
            )
            communicator.scope["user"] = notify_user
            communicator.scope["query_string"] = "nopoll"
            response = await communicator.get_response()
            self.assertTrue(
                response["body"].startswith(b"data: ")
            )
            payload = response["body"][6:]
            data = json.loads(payload.decode("utf8"))
            self.assertEqual(
                data,
                [],
                "expecting no one in the room but someone found",
            )
        loop = asyncio.get_event_loop()
        loop.run_until_complete(test_body())

Mobile APIs

We will stop using Django Channels for the rest of this chapter. The rest of the APIs that we are planning to use from our mobile app will be built using Django REST Framework (DRF). We will build an authentication endpoint and an API for order retrieval.

Authentication

For our mobile app, we will use token-based authentication. This is best practice for non-web apps, as it gives us a bit more security in case the client device is compromised. No credentials are stored on the device besides the token, which can easily be invalidated if need be.

To do so, we will need to enable this in Django REST Framework:
INSTALLED_APPS = [
    ...
    "rest_framework",
    "rest_framework.authtoken",
    ...
]
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": (
        "rest_framework.authentication.SessionAuthentication",
        "rest_framework.authentication.TokenAuthentication",
        "rest_framework.authentication.BasicAuthentication",
    ),
    ...
}
This will add an extra model to our system, so the previous step requires running the migrate command. After this, we will automatically generate a token for every new user with a new signal in main/signals.py:
from django.conf import settings
from rest_framework.authtoken.models import Token
...
@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def create_auth_token(
    sender, instance=None, created=False, **kwargs
):
    if created:
        Token.objects.create(user=instance)
Every new user, from now on, can access authenticated DRF endpoints with tokens, besides the already-existing methods. Next we need to create the login endpoint, which we can add at the bottom of our main/urls.py:
from rest_framework.authtoken import views as authtoken_views
...
urlpatterns = [
    ...
    path(
        "mobile-api/auth/",
        authtoken_views.obtain_auth_token,
        name="mobile_token",
    ),
]
That is it. We have a working mobile authentication endpoint. To complete this, there is the associated test (main/tests/test_endpoints.py):
from django.urls import reverse
from rest_framework.test import APITestCase
from main import models
class TestEndpoints(APITestCase):
    def test_mobile_login_works(self):
        user = models.User.objects.create_user(
            "user1", "abcabcabc"
        )
        response = self.client.post(
            reverse("mobile_token"),
            {"username": "user1", "password": "abcabcabc"},
        )
        jsonresp = response.json()
        self.assertIn("token", jsonresp)
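From the client's side, the flow can be sketched as follows: post the credentials to the login endpoint, read the "token" field from the JSON response, then send that token in an Authorization header prefixed with "Token ". The snippet only builds the requests and does not send them. BASE_URL and both helper names are assumptions made for illustration.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: local development server


def build_login_request(username, password):
    """Build the POST request that exchanges credentials for a token."""
    body = json.dumps(
        {"username": username, "password": password}
    ).encode("utf-8")
    return urllib.request.Request(
        BASE_URL + "/mobile-api/auth/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_authenticated_request(path, token):
    """Build a GET request carrying the token in the Authorization header."""
    return urllib.request.Request(
        BASE_URL + path,
        headers={"Authorization": "Token " + token},
    )


if __name__ == "__main__":
    login = build_login_request("user1", "abcabcabc")
    print(login.get_method(), login.full_url)
```

Passing either request to urllib.request.urlopen() against a running server would perform the actual call.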

Retrieving Orders

Our mobile app will need a way to retrieve the orders of the currently authenticated user. We will add a new endpoint to our main/endpoints.py:
from rest_framework.decorators import (
    api_view,
    permission_classes,
)
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
...
@api_view()
@permission_classes((IsAuthenticated,))
def my_orders(request):
    user = request.user
    orders = models.Order.objects.filter(user=user).order_by(
        "-date_added"
    )
    data = []
    for order in orders:
        data.append(
            {
                "id": order.id,
                "image": order.mobile_thumb_url,
                "summary": order.summary,
                "price": order.total_price,
            }
        )
    return Response(data)

This is a Django REST Framework API defined as a function. Just like Django, DRF allows us to define endpoints as either classes or functions. Because this is a custom read-only endpoint, it is easier to define it as a function rather than as a class.

This view uses some new properties of the Order model:
from django.db.models import Count, Sum
...
class Order(models.Model):
    ...
    @property
    def mobile_thumb_url(self):
        products = [i.product for i in self.lines.all()]
        if products:
            img = products[0].productimage_set.first()
            if img:
                return img.thumbnail.url
    @property
    def summary(self):
        product_counts = self.lines.values(
            "product__name"
        ).annotate(c=Count("product__name"))
        pieces = []
        for pc in product_counts:
            pieces.append(
                "%s x %s" % (pc["c"], pc["product__name"])
            )
        return ", ".join(pieces)
    @property
    def total_price(self):
        res = self.lines.aggregate(
            total_price=Sum("product__price")
        )
        return res["total_price"]
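To make it clearer what the two aggregations compute, here is a plain-Python sketch of the same logic over an in-memory list of order lines. The dictionary keys and function names are hypothetical; the real properties delegate this work to the database.

```python
from collections import Counter
from decimal import Decimal


def summarize(lines):
    """Build a summary such as "2 x The B Book" from order lines."""
    counts = Counter(line["product_name"] for line in lines)
    return ", ".join(
        "%s x %s" % (count, name) for name, count in counts.items()
    )


def total_price(lines):
    """Add up the product price of every line."""
    return sum((line["price"] for line in lines), Decimal("0"))


if __name__ == "__main__":
    lines = [
        {"product_name": "The B Book", "price": Decimal("14.00")},
        {"product_name": "The B Book", "price": Decimal("14.00")},
    ]
    print(summarize(lines), total_price(lines))
```

One difference worth keeping in mind: this sketch returns zero for an order without lines, whereas a database Sum over no rows yields None.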
Finally, the view needs a URL in main/urls.py:
urlpatterns = [
    ...
    path(
        "mobile-api/my-orders/",
        endpoints.my_orders,
        name="mobile_my_orders",
    ),
]
This is enough for this view to work. We are going to add the specific test to main/tests/test_endpoints.py, along with the previous one:
...
from rest_framework import status
from rest_framework.authtoken.models import Token
from main import factories
class TestEndpoints(APITestCase):
    ...
    def test_mobile_flow(self):
        user = factories.UserFactory(email="[email protected]")
        token = Token.objects.get(user=user)
        self.client.credentials(
            HTTP_AUTHORIZATION="Token " + token.key
        )
        orders = factories.OrderFactory.create_batch(
            2, user=user
        )
        a = factories.ProductFactory(
            name="The book of A", active=True, price=12.00
        )
        b = factories.ProductFactory(
            name="The B Book", active=True, price=14.00
        )
        factories.OrderLineFactory.create_batch(
            2, order=orders[0], product=a
        )
        factories.OrderLineFactory.create_batch(
            2, order=orders[1], product=b
        )
        response = self.client.get(reverse("mobile_my_orders"))
        self.assertEqual(
            response.status_code, status.HTTP_200_OK
        )
        expected = [
            {
                "id": orders[1].id,
                "image": None,
                "price": 28.0,
                "summary": "2 x The B Book",
            },
            {
                "id": orders[0].id,
                "image": None,
                "price": 24.0,
                "summary": "2 x The book of A",
            },
        ]
        self.assertEqual(response.json(), expected)

Order Shipment Tracking

BookTime relies on an external company to ship the items bought online. This company has its own tracking system, and we want to use it to deliver this extra information back to our mobile app users.

We are going to add a custom endpoint to get the shipment status of an order. Let’s pretend the delivery company has supplied BookTime with access to an HTTP API that we can use to get real-time information about the status of the shipment, and that is where this information will come from.

The API of the delivery company will not be used directly by the mobile app because we may want to add other companies in the future, without having to change the mobile app code.

This is what our system will do:
  • Receive mobile app requests for shipment information

  • Issue API requests to the third-party company

  • Forward shipment information back to the mobile app

To put it succinctly, it will look like a reverse-proxy system.

This is a good use case for Channels given its asynchronous nature. Doing this in an asynchronous manner will make our solution scale to a higher number of simultaneous requests.

If we did this in the standard synchronous way, every worker thread could end up blocked, waiting for the remote system to return shipment statuses. If that were to occur, our API would become unresponsive. We need to avoid this.

By using non-blocking, asynchronous code, our APIs will not become unresponsive, even if the third-party system that we are interfacing with goes offline.
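The effect is easy to observe with a toy example. In the sketch below, fetch_status is a hypothetical stand-in for a slow remote call, simulated with asyncio.sleep(). Several requests are awaited together, and the total time stays close to that of a single call rather than their sum.

```python
import asyncio
import time


async def fetch_status(order_id, delay=0.1):
    # Stand-in for a slow remote call; sleeping yields to the event loop
    await asyncio.sleep(delay)
    return "In Transit"


async def fetch_many(order_ids):
    # All the simulated calls wait concurrently on a single thread
    return await asyncio.gather(*(fetch_status(i) for i in order_ids))


def timed_run(order_ids):
    start = time.perf_counter()
    statuses = asyncio.run(fetch_many(order_ids))
    return statuses, time.perf_counter() - start


if __name__ == "__main__":
    statuses, elapsed = timed_run([1, 2, 3, 4, 5])
    print(statuses, "took %.2fs" % elapsed)
```

With blocking calls, five such requests would occupy a thread for about five times the delay.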

To simulate this hypothetical API, we will use Pastebin. Go to https://pastebin.com, click the New Paste button, type In Transit in the text box, and click Create New Paste. Once you have done so (see Figure 8-2), click the “raw” label to get its raw URL. Copy the URL. We will use this to simulate our API.
Figure 8-2. Click “raw” to copy the raw Pastebin URL for the API simulation

Now that we have our admittedly simple test API, we can write a client for it. This API needs to be consumed with an asynchronous network library. For the same reasons we used aioredis in the previous sections, we will use an HTTP client library called aiohttp, which needs to be installed:
$ pipenv install aiohttp
Having done so, we are ready to write our consumer. We will add this to main/consumers.py (replace put_url_here with your Pastebin URL):
import aiohttp
from django.shortcuts import get_object_or_404
...
class OrderTrackerConsumer(AsyncHttpConsumer):
    def verify_user(self, user, order_id):
        order = get_object_or_404(models.Order, pk=order_id)
        return order.user == user
    async def query_remote_server(self, order_id):
        async with aiohttp.ClientSession() as session:
            async with session.get(
                "http://pastebin.com/put_url_here"
            ) as resp:
                return await resp.read()
    async def handle(self, body):
        self.order_id = self.scope["url_route"]["kwargs"][
            "order_id"
        ]
        is_authorized = await database_sync_to_async(
            self.verify_user
        )(self.scope["user"], self.order_id)
        if is_authorized:
            logger.info(
                "Order tracking request for user %s and order %s",
                self.scope.get("user"),
                self.order_id
            )
            payload = await self.query_remote_server(self.order_id)
            logger.info(
                "Order tracking response %s for user %s and order %s",
                payload,
                self.scope.get("user"),
                self.order_id
            )
            await self.send_response(200, payload)
        else:
            raise StopConsumer("unauthorized")

This consumer is fully asynchronous, with the exception of database queries. It accepts a request with an order ID specified in the URL, then forwards the result of query_remote_server() back to the client.

query_remote_server() is using the library that we just installed to issue a GET request to the remote Pastebin we just created. The result of this will simply be passed back to the client.
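One possible refinement, not part of the consumer above, is to cap how long we are willing to wait for the remote server. The sketch below wraps any coroutine function in asyncio.wait_for() and returns a fallback payload on timeout. The helper name, the timeout value, and the fallback are all assumptions.

```python
import asyncio


async def query_with_timeout(query, order_id, timeout=2.0, fallback=b"Unknown"):
    """Await query(order_id), giving up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(query(order_id), timeout)
    except asyncio.TimeoutError:
        # The remote side is too slow or unreachable: answer anyway
        return fallback


async def slow_remote(order_id):
    # Hypothetical stand-in for an unresponsive third-party API
    await asyncio.sleep(1)
    return b"In Transit"


if __name__ == "__main__":
    print(asyncio.run(query_with_timeout(slow_remote, 1, timeout=0.05)))
```

In the consumer, the same idea could be applied by passing self.query_remote_server as the query argument.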

This consumer will need a URL, which needs to be added to main/routing.py:
...
http_urlpatterns = [
    ...
    path(
        "mobile-api/my-orders/<int:order_id>/tracker/",
        AuthMiddlewareStack(consumers.OrderTrackerConsumer),
    )
]

You should now be able to test this. Please make sure you have at least one order from your current user in the system. If you navigate to the preceding URL with the correct order ID, you should see the content of your Pastebin in your browser.

The test for this will be inside main/tests/test_consumers.py:
from unittest.mock import patch, MagicMock
...
class TestConsumers(TestCase):
    ...
    def test_order_tracker_works(self):
        def init_db():
            user = factories.UserFactory(
                email="[email protected]"
            )
            order = factories.OrderFactory(user=user)
            return user, order
        async def test_body():
            user, order = await database_sync_to_async(
                init_db
            )()
            # Async stand-in for query_remote_server()
            async def awaitable_requestor(*args, **kwargs):
                return b"SHIPPED"
            with patch.object(
                consumers.OrderTrackerConsumer, "query_remote_server"
            ) as mock_requestor:
                mock_requestor.side_effect = awaitable_requestor
                communicator = HttpCommunicator(
                    consumers.OrderTrackerConsumer,
                    "GET",
                    "/mobile-api/my-orders/%d/tracker/" % order.id,
                )
                communicator.scope["user"] = user
                communicator.scope["url_route"] = {
                    "kwargs": {"order_id": order.id}
                }
                response = await communicator.get_response()
                data = response["body"].decode("utf8")
                mock_requestor.assert_called_once()
                self.assertEqual(
                    data,
                    "SHIPPED"
                )
        loop = asyncio.get_event_loop()
        loop.run_until_complete(test_body())

In the preceding test, we are patching the whole query_remote_server() method with an asynchronous function that returns the value “SHIPPED”. This keeps the HTTP client out of our tests, which makes them faster to run. Given how simple the method is, this is a good compromise for us.
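The patching technique itself can be tried outside Django. In this minimal sketch, Tracker is a hypothetical class standing in for the consumer. Its query_remote_server() method is replaced, for the duration of the with block, by an async function that returns a fixed payload.

```python
import asyncio
from unittest.mock import patch


class Tracker:
    async def query_remote_server(self, order_id):
        raise RuntimeError("network access is not expected in tests")

    async def handle(self, order_id):
        return await self.query_remote_server(order_id)


async def fake_query(order_id):
    # Async stand-in used as the mock's side effect
    return b"SHIPPED"


def run_patched(order_id):
    with patch.object(Tracker, "query_remote_server") as mock_query:
        mock_query.side_effect = fake_query
        result = asyncio.run(Tracker().handle(order_id))
        mock_query.assert_called_once_with(order_id)
    return result


if __name__ == "__main__":
    print(run_patched(42))
```

Because the replacement is itself a coroutine function, awaiting the patched method behaves like awaiting the original.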

Bringing It All Together

There are still a few changes to apply for our mobile integration to go as smoothly as possible. First of all, our token-based authentication, which works only on DRF views at the moment, needs to include WebSocket and async HTTP routes.

We need a custom AuthMiddlewareStack, which we will place in a new file, booktime/auth.py (the same folder as settings.py). It adds token authentication on top of all the other ways of authenticating:
from urllib.parse import parse_qs
from channels.auth import AuthMiddlewareStack
from rest_framework.authtoken.models import Token
class TokenGetAuthMiddleware:
    def __init__(self, inner):
        self.inner = inner
    def __call__(self, scope):
        params = parse_qs(scope["query_string"])
        if b"token" in params:
            try:
                token_key = params[b"token"][0].decode()
                token = Token.objects.get(key=token_key)
                scope["user"] = token.user
            except Token.DoesNotExist:
                pass
        return self.inner(scope)
TokenGetAuthMiddlewareStack = lambda inner: TokenGetAuthMiddleware(
    AuthMiddlewareStack(inner)
)
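The middleware looks up b"token" rather than "token" because the query string in the scope is a byte string, and parse_qs() returns bytes keys and values for bytes input. The sketch below isolates that parsing step and shows how a client could build a matching URL. Both function names are hypothetical.

```python
from urllib.parse import parse_qs, urlencode


def extract_token(query_string):
    """Return the token from a raw (bytes) query string, or None."""
    params = parse_qs(query_string)
    values = params.get(b"token")
    return values[0].decode() if values else None


def tracker_url(order_id, token_key):
    # Path taken from the tracker route defined in main/routing.py
    return "/mobile-api/my-orders/%d/tracker/?%s" % (
        order_id,
        urlencode({"token": token_key}),
    )


if __name__ == "__main__":
    url = tracker_url(5, "abc123")
    print(url, extract_token(url.split("?", 1)[1].encode()))
```

Keep in mind that tokens passed in a URL can end up in server logs, so this approach is a convenience trade-off.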
We will use this new middleware in our routing files. These are the changes to booktime/routing.py:
from .auth import TokenGetAuthMiddlewareStack
...
application = ProtocolTypeRouter(
    {
        "websocket": TokenGetAuthMiddlewareStack(
            URLRouter(main.routing.websocket_urlpatterns)
        ),
        ...
    }
)
These are the changes to main/routing.py:
from booktime.auth import TokenGetAuthMiddlewareStack
...
http_urlpatterns = [
    ...
    path(
        "mobile-api/my-orders/<int:order_id>/tracker/",
        TokenGetAuthMiddlewareStack(consumers.OrderTrackerConsumer),
    )
]

The last thing we need during development of the mobile app, or any mobile app, is to make sure we can serve requests coming from other devices on the network, not just from our local browser.

To change this, we will add an extra setting to our booktime/settings.py:
ALLOWED_HOSTS = ['*']

This causes Django to allow requests with any “Host” header. Please make sure this setting is not enabled on the production environment (just like DEBUG).
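One way to keep the wildcard out of production is to read the list of hosts from the environment. This is a sketch of that idea, not something the project requires. The DJANGO_ALLOWED_HOSTS variable name and the helper function are assumptions.

```python
import os


def allowed_hosts_from_env(environ=os.environ):
    """Read a comma-separated host list, defaulting to local-only access."""
    raw = environ.get("DJANGO_ALLOWED_HOSTS", "localhost,127.0.0.1")
    return [host.strip() for host in raw.split(",") if host.strip()]


# In settings.py this assignment would replace the hard-coded list
ALLOWED_HOSTS = allowed_hosts_from_env()
```

During development, setting DJANGO_ALLOWED_HOSTS to * before starting the server would then reproduce the behavior described above.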

From now on we will launch the dev server with the option to listen to all available network interfaces, not just the local interface. We can do so with the following command:
$ ./manage.py runserver 0.0.0.0:8000
Performing system checks...
System check identified no issues (0 silenced).
August 22, 2018 - 15:28:01
Django version 2.1, using settings 'booktime.settings'
Starting ASGI/Channels version 2.1.2 development server at http://0.0.0.0:8000/
Quit the server with CONTROL-C.
2018-08-22 15:28:01,644 - INFO - server - HTTP/2 support not enabled (install the http2 and tls Twisted extras)
2018-08-22 15:28:01,644 - INFO - server - Configuring endpoint tcp:port=8000:interface=0.0.0.0
2018-08-22 15:28:01,645 - INFO - server - Listening on TCP address 0.0.0.0:8000

This will allow our app (running on a mobile device) to connect to our server through the local network.

Summary

The goal of this chapter was to use Channels, an extension of Django for asynchronous programming, to build a chat backend. Channels introduces many new concepts (consumers, routers, channel layers, and groups), all of which have been explained in this chapter.

In practical terms, we built a chat backend and its dashboard for our company’s customer-service representatives. We also had to reach out to Redis directly for some of the more advanced features that we required that Channels does not offer.

In the last sections, we also talked about the necessary infrastructure for a mobile app, which I will present in the next chapter.
