Receiving messages in Python

We will use the recently installed paho-mqtt version 1.3.1 module to subscribe to a specific topic and run code when we receive messages in that topic. We will create a VehicleCommandProcessor class in the same Python file, named vehicle_mqtt_client.py, in the main virtual environment folder. This class will represent a command processor associated with an instance of the previously coded Vehicle class. It will configure the MQTT client and its subscription, and declare the callbacks that are executed when certain MQTT events are fired.

We will split the code for the VehicleCommandProcessor class into many code snippets to make it easier to understand each code section. You have to add the next lines to the existing vehicle_mqtt_client.py Python file. The following lines declare the VehicleCommandProcessor class and its constructor, that is, the __init__ method. The code file for the sample is included in the mqtt_python_gaston_hillar_04_01 folder, in the vehicle_mqtt_client.py file:

class VehicleCommandProcessor:
    commands_topic = ""
    processed_commands_topic = ""
    active_instance = None

    def __init__(self, name, vehicle):
        self.name = name
        self.vehicle = vehicle
        VehicleCommandProcessor.commands_topic = \
            "vehicles/{}/commands".format(self.name)
        VehicleCommandProcessor.processed_commands_topic = \
            "vehicles/{}/executedcommands".format(self.name)
        self.client = mqtt.Client(protocol=mqtt.MQTTv311)
        VehicleCommandProcessor.active_instance = self
        self.client.on_connect = VehicleCommandProcessor.on_connect
        self.client.on_subscribe = VehicleCommandProcessor.on_subscribe
        self.client.on_message = VehicleCommandProcessor.on_message
        self.client.tls_set(ca_certs=ca_certificate,
            certfile=client_certificate,
            keyfile=client_key)
        self.client.connect(host=mqtt_server_host,
            port=mqtt_server_port,
            keepalive=mqtt_keepalive)

We have to specify a name for the command processor and the Vehicle instance that the command processor will control in the name and vehicle required arguments. The constructor, that is, the __init__ method, saves the received name and vehicle in attributes with the same names. Then, the constructor sets the values for the commands_topic and processed_commands_topic class attributes. The constructor uses the received name to build the topic name for the commands and the topic name for the successfully processed commands, based on the specifications we discussed earlier. The MQTT client will receive messages in the topic name saved in the commands_topic class attribute, and will publish messages to the topic name saved in the processed_commands_topic class attribute.
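As a quick illustration of the topic naming described above, the following sketch builds both topic names with the same format strings the constructor uses. The vehicle name vehiclepi01 is an assumption made only for this example:

```python
# Hypothetical vehicle name, used only for illustration
name = "vehiclepi01"

# Same format strings the constructor uses
commands_topic = "vehicles/{}/commands".format(name)
processed_commands_topic = "vehicles/{}/executedcommands".format(name)

print(commands_topic)
print(processed_commands_topic)
```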

Then, the constructor creates an instance of the mqtt.Client class (paho.mqtt.client.Client) that represents an MQTT client and that we will use to communicate with an MQTT server. The code assigns this instance to the client attribute (self.client). As in our previous example, we want to work with MQTT version 3.1.1, and therefore, we specified mqtt.MQTTv311 as the value for the protocol argument.

The code also saves a reference to this instance in the active_instance class attribute because we have to access the instance in the static methods that the constructor will specify as callbacks for the different events that the MQTT client fires. We want to have all the methods related to the vehicle command processor in the VehicleCommandProcessor class.

Then, the code assigns static methods to attributes of the self.client instance. The following table summarizes these assignments:

Attribute              Assigned static method
client.on_connect      VehicleCommandProcessor.on_connect
client.on_message      VehicleCommandProcessor.on_message
client.on_subscribe    VehicleCommandProcessor.on_subscribe

 

Static methods do not receive either self or cls as the first argument, and therefore, we can use them as callbacks with the required number of arguments. Note that we will code and analyze these static methods in the next paragraphs.
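The following minimal sketch shows why this pattern works, without requiring an MQTT server. FakeClient and Processor are hypothetical stand-ins invented for this example; they are not part of paho-mqtt or of the sample code. The fake client calls its callback with three arguments, and the static method reaches the instance through the class attribute:

```python
class FakeClient:
    """Stand-in for an MQTT client; it only stores and fires a callback."""

    def __init__(self):
        self.on_message = None

    def fire_message(self, payload):
        # The client calls the callback with exactly three arguments
        self.on_message(self, None, payload)


class Processor:
    active_instance = None

    def __init__(self, name):
        self.name = name
        self.received = []
        Processor.active_instance = self
        self.client = FakeClient()
        # A static method accessed through the class is a plain function
        self.client.on_message = Processor.on_message

    @staticmethod
    def on_message(client, userdata, msg):
        # There is no self here, so we use the class attribute instead
        Processor.active_instance.received.append(msg)


processor = Processor("demo")
processor.client.fire_message("hello")
print(processor.received)
```

Because active_instance is a class attribute, creating a second instance replaces the reference. The pattern therefore assumes a single active processor per Python process.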

The call to the self.client.tls_set method configures encryption and authentication options. Finally, the constructor calls the client.connect method and specifies the values for the host, port, and keepalive arguments. This way, the code asks the MQTT client to establish a connection to the specified MQTT server. Remember that the connect method returns before the MQTT server's response to the connection request has been processed: the client handles the CONNACK in its network loop, and the on_connect callback runs only then.

If you want to establish a connection with an MQTT server that isn't using TLS, you need to remove the call to the self.client.tls_set method. In addition, you need to use the appropriate port instead of the 8883 port that is specified when working with TLS. Remember that the default port when you don't work with TLS is 1883.
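One possible way to keep both configurations in a single place is sketched below. The connect_client helper and the RecordingClient stand-in are hypothetical names introduced for this example. With the real library, you would pass an mqtt.Client instance, whose tls_set and connect methods accept the keyword arguments used here:

```python
def connect_client(client, host, use_tls, keepalive=60,
                   ca_certs=None, certfile=None, keyfile=None):
    """Configure TLS only when requested, then connect on the matching port."""
    if use_tls:
        client.tls_set(ca_certs=ca_certs, certfile=certfile, keyfile=keyfile)
        port = 8883  # port used in this chapter when working with TLS
    else:
        port = 1883  # default port when TLS is not used
    client.connect(host=host, port=port, keepalive=keepalive)
    return port


class RecordingClient:
    """Stand-in that records calls instead of opening a network connection."""

    def __init__(self):
        self.calls = []

    def tls_set(self, **kwargs):
        self.calls.append(("tls_set", kwargs))

    def connect(self, **kwargs):
        self.calls.append(("connect", kwargs))


client = RecordingClient()
print(connect_client(client, "localhost", use_tls=False))
print([call_name for call_name, _ in client.calls])
```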

The following lines declare the on_connect static method that is part of the VehicleCommandProcessor class. You have to add these lines to the existing vehicle_mqtt_client.py Python file. The code file for the sample is included in the mqtt_python_gaston_hillar_04_01 folder, in the vehicle_mqtt_client.py file:

    @staticmethod
    def on_connect(client, userdata, flags, rc):
        print("Result from connect: {}".format(
            mqtt.connack_string(rc)))
        # Check whether the result from connect is the CONNACK_ACCEPTED
        # connack code
        if rc == mqtt.CONNACK_ACCEPTED:
            # Subscribe to the commands topic filter
            client.subscribe(
                VehicleCommandProcessor.commands_topic,
                qos=2)

After a connection has been successfully established with the MQTT server, the specified callback in the self.client.on_connect attribute will be executed, that is, the on_connect static method (marked with the @staticmethod decorator). This static method receives the mqtt.Client instance that established the connection with the MQTT server in the client argument.

The code checks the value of the rc argument that provides the CONNACK code returned by the MQTT server. If this value matches mqtt.CONNACK_ACCEPTED, it means that the MQTT server accepted the connection request, and therefore, the code calls the client.subscribe method with VehicleCommandProcessor.commands_topic as an argument to subscribe to the topic specified in the commands_topic class attribute and specifies a QoS level of 2 for the subscription.
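To make the possible values of rc more concrete, the sketch below lists the CONNACK return codes defined by the MQTT 3.1.1 specification and subscribes only when the connection is accepted. The handle_connack function and the CONNACK_REASONS dictionary are hypothetical names for this example; in the real callback, mqtt.connack_string(rc) provides the description:

```python
# CONNACK return codes defined by the MQTT 3.1.1 specification
CONNACK_ACCEPTED = 0  # same value as mqtt.CONNACK_ACCEPTED in paho-mqtt
CONNACK_REASONS = {
    0: "Connection accepted",
    1: "Refused: unacceptable protocol version",
    2: "Refused: identifier rejected",
    3: "Refused: server unavailable",
    4: "Refused: bad user name or password",
    5: "Refused: not authorized",
}


def handle_connack(rc, subscribe):
    """Print the meaning of rc and subscribe only on success."""
    print(CONNACK_REASONS.get(rc, "Unknown return code"))
    if rc == CONNACK_ACCEPTED:
        subscribe()
        return True
    return False


subscriptions = []
handle_connack(0, lambda: subscriptions.append("vehicles/vehiclepi01/commands"))
handle_connack(5, lambda: subscriptions.append("never reached"))
print(subscriptions)
```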

The following lines declare the on_subscribe static method that is part of the VehicleCommandProcessor class. You have to add these lines to the existing vehicle_mqtt_client.py Python file. The code file for the sample is included in the mqtt_python_gaston_hillar_04_01 folder, in the vehicle_mqtt_client.py file:

    @staticmethod
    def on_subscribe(client, userdata, mid, granted_qos):
        print("I've subscribed with QoS: {}".format(
            granted_qos[0]))

The on_subscribe static method displays the QoS level granted by the MQTT server for the topic filter we specified. In this case, we just subscribed to a single topic filter, and therefore, the code grabs the first value from the received granted_qos array.
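If we subscribed to several topic filters in one call, granted_qos would contain one value per filter, in the same order. The sketch below shows one way to report them. The describe_granted_qos function and the second topic filter are assumptions for this example; the callback itself does not receive the filters, so the caller has to remember them. The value 128 (0x80) is the failure return code defined for SUBACK in the MQTT 3.1.1 specification:

```python
SUBACK_FAILURE = 128  # 0x80 in the MQTT 3.1.1 specification


def describe_granted_qos(topic_filters, granted_qos):
    """Pair each requested topic filter with the result the server returned."""
    lines = []
    for topic_filter, qos in zip(topic_filters, granted_qos):
        if qos == SUBACK_FAILURE:
            lines.append("{}: subscription refused".format(topic_filter))
        else:
            lines.append("{}: granted QoS {}".format(topic_filter, qos))
    return lines


# Hypothetical filters and results, used only for illustration
filters = ["vehicles/vehiclepi01/commands", "vehicles/vehiclepi01/status"]
for line in describe_granted_qos(filters, [2, 128]):
    print(line)
```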

The following lines declare the on_message static method that is part of the VehicleCommandProcessor class. You have to add these lines to the existing vehicle_mqtt_client.py Python file. The code file for the sample is included in the mqtt_python_gaston_hillar_04_01 folder, in the vehicle_mqtt_client.py file:

    @staticmethod
    def on_message(client, userdata, msg):
        if msg.topic == VehicleCommandProcessor.commands_topic:
            print("Received message payload: {0}".format(str(msg.payload)))
            try:
                message_dictionary = json.loads(msg.payload)
                if COMMAND_KEY in message_dictionary:
                    command = message_dictionary[COMMAND_KEY]
                    vehicle = VehicleCommandProcessor.active_instance.vehicle
                    is_command_executed = False
                    if KEY_MPH in message_dictionary:
                        mph = message_dictionary[KEY_MPH]
                    else:
                        mph = 0
                    if KEY_DEGREES in message_dictionary:
                        degrees = message_dictionary[KEY_DEGREES]
                    else:
                        degrees = 0
                    command_methods_dictionary = {
                        CMD_TURN_ON_ENGINE: lambda:
                            vehicle.turn_on_engine(),
                        CMD_TURN_OFF_ENGINE: lambda:
                            vehicle.turn_off_engine(),
                        CMD_LOCK_DOORS: lambda: vehicle.lock_doors(),
                        CMD_UNLOCK_DOORS: lambda:
                            vehicle.unlock_doors(),
                        CMD_PARK: lambda: vehicle.park(),
                        CMD_PARK_IN_SAFE_PLACE: lambda:
                            vehicle.park_in_safe_place(),
                        CMD_TURN_ON_HEADLIGHTS: lambda:
                            vehicle.turn_on_headlights(),
                        CMD_TURN_OFF_HEADLIGHTS: lambda:
                            vehicle.turn_off_headlights(),
                        CMD_TURN_ON_PARKING_LIGHTS: lambda:
                            vehicle.turn_on_parking_lights(),
                        CMD_TURN_OFF_PARKING_LIGHTS: lambda:
                            vehicle.turn_off_parking_lights(),
                        CMD_ACCELERATE: lambda: vehicle.accelerate(),
                        CMD_BRAKE: lambda: vehicle.brake(),
                        CMD_ROTATE_RIGHT: lambda:
                            vehicle.rotate_right(degrees),
                        CMD_ROTATE_LEFT: lambda:
                            vehicle.rotate_left(degrees),
                        CMD_SET_MIN_SPEED: lambda:
                            vehicle.set_min_speed(mph),
                        CMD_SET_MAX_SPEED: lambda:
                            vehicle.set_max_speed(mph),
                    }
                    if command in command_methods_dictionary:
                        method = command_methods_dictionary[command]
                        # Call the method
                        method()
                        is_command_executed = True
                    if is_command_executed:
                        processor = VehicleCommandProcessor.active_instance
                        processor.publish_executed_command_message(
                            message_dictionary)
                    else:
                        print("I've received a message with an "
                              "unsupported command.")
            except ValueError:
                # msg is not a dictionary
                # No JSON object could be decoded
                print("I've received an invalid message.")

Whenever there is a new message received in the topic saved in the commands_topic class attribute to which we have subscribed, the specified callback in the self.client.on_message attribute will be executed, that is, the previously coded on_message static method (marked with the @staticmethod decorator). This static method receives the mqtt.Client instance that established the connection with the MQTT server in the client argument and an mqtt.MQTTMessage instance in the msg argument.

The mqtt.MQTTMessage class describes an incoming message.

 

The msg.topic attribute indicates the topic in which the message has been received. Thus, the static method checks whether the msg.topic attribute matches the value in the commands_topic class attribute. In this case, whenever the on_message method is executed, the value in msg.topic will always match the value in the commands_topic class attribute because we subscribed to just one topic. However, if we subscribe to more than one topic, we must always check the topic in which each message arrived. Hence, we included this check to show clearly how to inspect the topic of a received message.
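When a client subscribes to more than one topic, the topic check can grow into a small routing table. The following sketch is one possible approach; the status topic and the handler functions are hypothetical and are not part of the sample code:

```python
COMMANDS_TOPIC = "vehicles/vehiclepi01/commands"
STATUS_TOPIC = "vehicles/vehiclepi01/status"  # hypothetical second topic


def handle_command(payload):
    return "command: {}".format(payload)


def handle_status(payload):
    return "status: {}".format(payload)


# Map each subscribed topic to the function that processes its messages
HANDLERS = {
    COMMANDS_TOPIC: handle_command,
    STATUS_TOPIC: handle_status,
}


def route_message(topic, payload):
    handler = HANDLERS.get(topic)
    if handler is None:
        return None  # message from a topic we do not handle
    return handler(payload)


print(route_message(COMMANDS_TOPIC, "LOCK_DOORS"))
```

Note that an exact string comparison like this one only works when the subscriptions do not use wildcards, because msg.topic holds the concrete topic of each message rather than the topic filter.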

The code prints the payload of the received message, that is, the msg.payload attribute. Then, the code calls the json.loads function to deserialize msg.payload to a Python object and assigns the result to the message_dictionary local variable. If the contents of msg.payload are not valid JSON, json.loads raises a ValueError exception; the code captures it, prints a message indicating that the received message is invalid, and executes no more code in the static method. If the contents of msg.payload are a JSON object, we will have a dictionary in the message_dictionary local variable.
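The behavior of json.loads with different payloads can be tried in isolation. The sketch below uses a hypothetical parse_command_payload helper and assumes "CMD" as the command key. It also rejects valid JSON that is not an object, such as a list, which is one of the extra validations a real-life solution might add:

```python
import json


def parse_command_payload(payload):
    """Return a dictionary for a JSON object payload, or None otherwise."""
    try:
        # json.loads accepts bytes as well as str in Python 3.6 or later
        message = json.loads(payload)
    except ValueError:
        # Raised when the payload is not valid JSON
        return None
    if not isinstance(message, dict):
        # Valid JSON, but not a JSON object (for example, a list)
        return None
    return message


print(parse_command_payload(b'{"CMD": "LOCK_DOORS"}'))
print(parse_command_payload(b'not json'))
print(parse_command_payload(b'[1, 2, 3]'))
```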

Then, the code checks whether the key stored in the COMMAND_KEY string is included in the message_dictionary dictionary. If the expression evaluates to True, the JSON message converted to a dictionary includes a command that we have to process. Before we can process the command, we have to determine which command it is, and therefore, the code retrieves the value associated with the COMMAND_KEY key. The code then looks this value up in the command_methods_dictionary dictionary, which maps each of the commands we analyzed as requirements to a lambda that calls the matching Vehicle method.

The code uses the active_instance class attribute, which holds a reference to the active VehicleCommandProcessor instance, to call the necessary methods for the associated vehicle based on the command that has to be processed. We had to declare the callbacks as static methods, and therefore, we use this class attribute to access the active instance. Once the command has been successfully processed, the code sets the is_command_executed flag to True. Finally, the code checks the value of this flag, and if it is equal to True, the code calls the publish_executed_command_message method for the VehicleCommandProcessor instance saved in the active_instance class attribute.
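The dictionary-of-lambdas dispatch can be reduced to a small, self-contained sketch. StubVehicle, run_command, and the key and command strings below are assumptions made for this example; the real code uses the constants and the Vehicle class defined earlier in the file:

```python
class StubVehicle:
    """Minimal stand-in for the Vehicle class; it records each call."""

    def __init__(self):
        self.log = []

    def lock_doors(self):
        self.log.append("lock_doors")

    def rotate_right(self, degrees):
        self.log.append("rotate_right {}".format(degrees))


def run_command(vehicle, message):
    """Run the command in message; return True only if it is supported."""
    degrees = message.get("DEGREES", 0)
    # Each lambda defers the call until we know which command to run
    command_methods = {
        "LOCK_DOORS": lambda: vehicle.lock_doors(),
        "ROTATE_RIGHT": lambda: vehicle.rotate_right(degrees),
    }
    method = command_methods.get(message.get("CMD"))
    if method is None:
        return False
    method()
    return True


vehicle = StubVehicle()
print(run_command(vehicle, {"CMD": "ROTATE_RIGHT", "DEGREES": 45}))
print(run_command(vehicle, {"CMD": "FLY"}))
print(vehicle.log)
```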

Of course, in a real-life example, we should add more validations. The previous code is simplified to allow us to keep our focus on MQTT.

The following lines declare the publish_executed_command_message method that is part of the VehicleCommandProcessor class. You have to add these lines to the existing vehicle_mqtt_client.py Python file. The code file for the sample is included in the mqtt_python_gaston_hillar_04_01 folder, in the vehicle_mqtt_client.py file:

    def publish_executed_command_message(self, message):
        response_message = json.dumps({
            SUCCESFULLY_PROCESSED_COMMAND_KEY:
                message[COMMAND_KEY]})
        result = self.client.publish(
            topic=self.__class__.processed_commands_topic,
            payload=response_message)
        return result

The publish_executed_command_message method receives, in the message argument, the dictionary built from the message that carried the command. The method calls the json.dumps function to serialize a dictionary to a JSON-formatted string with the response message that indicates the command has been successfully processed. Finally, the code calls the client.publish method with the processed_commands_topic class attribute as the topic argument and the JSON-formatted string (response_message) in the payload argument.
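To see what the serialized response looks like, the sketch below builds it without publishing anything. The values assigned to the two key constants are assumptions for this example, and build_response is a hypothetical helper; the real values are the ones declared earlier in vehicle_mqtt_client.py:

```python
import json

# Assumed values for the key constants, used only for illustration
COMMAND_KEY = "CMD"
SUCCESFULLY_PROCESSED_COMMAND_KEY = "SUCCESSFULLY_PROCESSED_COMMAND"


def build_response(message):
    """Serialize the response for a successfully processed command."""
    return json.dumps({
        SUCCESFULLY_PROCESSED_COMMAND_KEY: message[COMMAND_KEY]})


print(build_response({"CMD": "LOCK_DOORS"}))
```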

In this case, we are not evaluating the response received from the publish method. In addition, we are using the default value for the qos argument that specifies the desired quality of service. Thus, we will publish this message with a QoS level equal to 0. In Chapter 5, Testing and Improving our Vehicle Control Solution in Python, we will work with more advanced scenarios in which we will add code to check the results of the method and we will add code to the on_publish callback that is fired when a message is successfully published, as we did in our previous example. In this case, we use QoS level 2 only for the messages that we receive with the commands.

 
