When do messages get sent?

Up to this point, we've spent a fair amount of time digging into how the relevant messages will be generated and sent, but very little on when that happens, apart from the examination of Order creation and cancellation. Since messages correspond directly to various local CRUD operations, it's tempting to simply add the messaging calls to the _create and _update methods that the data objects already have, making sure to account for the is_dirty and is_new flags that we defined in BaseDataObject. Before going down that path, though, it would be a good idea to look at all of the messaging processes, from origination to completion, to make sure that each has a clear termination point. The scenario that we need to avoid, using a Product update process as an example, runs as follows:

  1. An Artisan makes a change to one of their products:
    • The local data change is executed
    • Their Artisan Application sends a message to the Artisan Gateway: Update Product "X"
  2. The Artisan Gateway receives the message:
    • The local data change is executed
    • A message is sent to the corresponding Artisan Application: Update Product "X"
  3. The Artisan Application receives the message:
    • The local data change, which likely doesn't have any updated data, is executed
    • A message is sent to the Artisan Gateway: Update Product "X"

At the end of the last step, the process would, without some check process or exit condition, jump back to the second step, and into an infinite loop of update messages that don't actually do anything. The same scenario could occur with any of the update processes where more than one origin point for data changes could be in play: the Artisan objects can be updated by the artisans they represent and by Central Office staff. Order objects are currently exempt, but it's not difficult to imagine a future need for a customer to alter an Order after it's been transmitted to the artisans who'd be fulfilling items in it.
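
The loop described above can be reproduced with a toy simulation. This is only a sketch: the simulate function, its (origin, destination) tuples, and the max_deliveries cap are illustrative assumptions, not part of the system's code. It counts how many update messages are delivered after a single local change, with and without an exit condition based on where the message originated:

```python
from collections import deque


def simulate(check_origin: bool, max_deliveries: int = 10) -> int:
    """
    Counts the update messages delivered between an Artisan
    Application and the Gateway after one local Product change.
    """
    # - Each message is an (origin, destination) pair. The first one
    #   is sent by the Artisan Application after the local change.
    queue = deque([('artisan', 'gateway')])
    delivered = 0
    # - max_deliveries only exists to stop the unchecked loop
    while queue and delivered < max_deliveries:
        origin, destination = queue.popleft()
        delivered += 1
        # - The receiver executes its local data change here...
        if check_origin and origin != destination:
            # ...and stops: the change did not originate locally,
            # so there is nothing new to tell anyone about
            continue
        # - Without the check, the receiver echoes the update back
        sender = destination
        receiver = 'artisan' if sender == 'gateway' else 'gateway'
        queue.append((sender, receiver))
    return delivered
```

With check_origin set to True, the process ends after the first delivery. With it set to False, messages keep bouncing back and forth until the artificial max_deliveries cap is reached.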

Ultimately, because the save methods of the various data object classes have no awareness of where the data change they're executing came from, they cannot make any decisions about whether or not it's appropriate to send a message out after the data change has been executed. A possible solution, then, would be to allow (or even require) an additional argument in each save that provides that information, and that could be used to determine whether a message needs to be sent or not. The structure of this modification might look something like this (for a data object living in the Artisan Application's code-base):

def save(self, origin:(str,)):
    """
Saves the instance's state-data to the back-end data-store by 
creating it if the instance is new, or updating it if the 
instance is dirty
"""
    # - Perform the data-save process as it currently exists
    # - Only changes that originated locally ('artisan', in the 
    #   Artisan Application) need to be sent on. Changes that 
    #   arrived in a message from elsewhere must not be echoed back
    if self.is_new and origin == 'artisan':
        # - Send "create" message
        pass
    elif self.is_dirty and origin == 'artisan':
        # - Send "update" message
        pass
    self._set_is_new(False)
    self._set_is_dirty(False)

It would be feasible to add an additional abstraction layer between BaseDataObject (where save is defined currently) and each of the concrete data objects that would override the BaseDataObject.save method. This abstraction (an additional ABC) would need to be created in the Artisan Application and Artisan Gateway code-bases, at a minimum, and another variant might be needed in the Central Office application as well, depending on implementation details that haven't been fully explored yet.
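
A minimal sketch of what that intermediate ABC might look like in the Artisan Application follows. Everything here is an assumption made for illustration: the BaseDataObject shown is a cut-down stand-in with plain is_new and is_dirty attributes rather than the real class, and OriginAwareDataObject, LOCAL_ORIGIN and _send_message are hypothetical names. It also assumes that a message should be sent only when the change originated locally:

```python
import abc


class BaseDataObject(metaclass=abc.ABCMeta):
    """Cut-down stand-in for the real BaseDataObject."""
    def __init__(self, is_new=True, is_dirty=False):
        self.is_new = is_new
        self.is_dirty = is_dirty

    def save(self):
        if self.is_new:
            self._create()
        elif self.is_dirty:
            self._update()
        self.is_new = False
        self.is_dirty = False

    @abc.abstractmethod
    def _create(self):
        pass

    @abc.abstractmethod
    def _update(self):
        pass


class OriginAwareDataObject(BaseDataObject, metaclass=abc.ABCMeta):
    """Hypothetical ABC that adds origin-awareness to save."""
    LOCAL_ORIGIN = 'artisan'

    def save(self, origin: str):
        # - Decide what to send *before* the base save resets the flags
        operation = None
        if origin == self.LOCAL_ORIGIN:
            if self.is_new:
                operation = 'create'
            elif self.is_dirty:
                operation = 'update'
        BaseDataObject.save(self)
        if operation:
            self._send_message(operation)

    @abc.abstractmethod
    def _send_message(self, operation: str):
        pass


class Product(OriginAwareDataObject):
    """Toy concrete class that records messages instead of sending."""
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.sent = []

    def _create(self):
        pass

    def _update(self):
        pass

    def _send_message(self, operation):
        self.sent.append(operation)
```

Even in this reduced form, every call to save has to know and pass along the origin of the change, which is the maintenance burden that makes this option unattractive.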

The trade-off is that all data objects would have to pay attention to where their data changes originated from. This feels… messy, complicated, and potentially difficult to maintain, at least at first blush.

Another possibility would be to alter DaemonMessage: if the messages themselves carry data that indicates where they originated, then the handlers for those messages would be able to tell whether or not a message needs to be sent after the data change has been dealt with. In that design scenario, a Product update message that originated with an Artisan, including an origin specification, might look like this (before being converted to JSON):

{
    'operation':'update',
    'origin':'artisan',
    'data': {
        'target':'product',
        'properties':{
            'oid':str(product.oid),
            'name':'Revised Product Name',
            # - Other product-data skipped for brevity
        },
    },
    'signature':'signature hex-string'
}

The corresponding update_product handler method in the ArtisanGatewayDaemon service class, along with other handler methods, currently expects a dict (properties) to act upon, and is called by ArtisanGatewayDaemon._handle_message as the main loop of the service reads messages to be acted upon. We could change what the individual handler methods expect, passing the original message (a DaemonMessage instance) instead, making the handler methods responsible for breaking down the incoming message into the properties and acting upon them as they already do, and giving them the responsibility for determining whether a message needs to be sent and sending it.

Given a DaemonMessage with an origin, and a globally accessible value to compare that origin with, the decision to send a message or not, and sending it if needed, isn't complex. If it were anywhere in the Gateway service (that is, self is the service instance), it would look more or less like this:

# self.message_origin is an attribute containing 'gateway'
# - message is the incoming DaemonMessage instance
# - message.origin is 'artisan'
# - artisan is the relevant Artisan object
if message.origin == self.message_origin:
    sender = RabbitMQSender()
    outbound_message = DaemonMessage(
        operation=message.operation,
        origin=self.message_origin,
        data=message.data,
        signing_key=self.signing_key
    )
    sender.send_message(outbound_message, artisan.queue_id)

The data used to create the outbound_message might differ, depending on whether the data dictionary or message dictionary of the newly created or recently updated object was used instead.

So, when an incoming message is acted upon:

  • Its origin is checked
  • If that origin is local, then a corresponding outbound_message is created and sent, using the original operation of the incoming message, the local origin and signing_key, and whatever data is appropriate
  • Otherwise, that entire branch is skipped

That's not a lot of code to add—a mere nine lines, assuming that the sender isn't created elsewhere. The changes to DaemonMessage are pretty trivial: adding the origin property and making sure it's accounted for everywhere (basically, anywhere that the operation property is already in use). At this point, this doesn't represent a major change to existing code either—we've only created outbound messages for Order creation and updates so far.
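
To give a sense of how small the DaemonMessage change is, here is a simplified stand-in for the class with an origin added alongside operation. This is a sketch under stated assumptions, not the real class: the list of allowed origins, the method names, and the HMAC-SHA256 signature over the sorted JSON payload are all illustrative choices.

```python
import hashlib
import hmac
import json


class DaemonMessage:
    """Simplified stand-in for the real DaemonMessage class."""
    # - Assumed set of origins; the real list may differ
    _origins = ('artisan', 'gateway', 'central-office')

    def __init__(self, operation: str, origin: str, data: dict,
                 signing_key: bytes):
        if origin not in self._origins:
            raise ValueError('"%s" is not a recognized origin' % origin)
        self.operation = operation
        self.origin = origin
        self.data = data
        self.signing_key = signing_key

    @property
    def signature(self) -> str:
        # - Assumption: HMAC-SHA256 over the sorted JSON payload, so
        #   the origin is covered by the signature like everything else
        payload = json.dumps(
            {
                'operation': self.operation,
                'origin': self.origin,
                'data': self.data,
            },
            sort_keys=True
        ).encode('utf-8')
        return hmac.new(
            self.signing_key, payload, hashlib.sha256
        ).hexdigest()

    def to_message_json(self) -> str:
        return json.dumps({
            'operation': self.operation,
            'origin': self.origin,
            'data': self.data,
            'signature': self.signature,
        })

    @classmethod
    def from_message_json(cls, message_json: str, signing_key: bytes):
        raw = json.loads(message_json)
        message = cls(
            raw['operation'], raw['origin'], raw['data'], signing_key
        )
        # - Reject messages whose content doesn't match the signature
        if not hmac.compare_digest(message.signature, raw['signature']):
            raise ValueError('Message signature does not match')
        return message
```

Because the origin is part of the signed payload in this sketch, a message whose origin has been altered in transit fails verification when it is read back.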

If there is a sticking point, it's in the need to acquire the Artisan instance that relates to the operation so that the outbound message can use the appropriate message queue (artisan.queue_id). This would be necessary no matter what approach we decide to pursue, though, so it's probably a wash in this case (and it would complicate the idea of modifying save, which we saw previously, even more).

Even with that, this feels like a solid approach. The changes to _handle_message are mostly argument and variable name changes at this point:

def _handle_message(self, message:(DaemonMessage,)) -> None:
    self.info(
        '%s._handle_message called:' % self.__class__.__name__
    )
    target = message.data.get('target')
    self.debug('+- target ....... (%s) %s' % (
        type(target).__name__, target)
    )
    self.debug('+- operation .... (%s) %s' % (
        type(message.operation).__name__, message.operation)
    )
    if message.operation == 'create':
        if target == 'artisan':
            self.create_artisan(message)

    # ... removed for brevity

    elif message.operation == 'update':
        if target == 'artisan':
            self.update_artisan(message)
        elif target == 'customer':
            self.update_customer(message)
        elif target == 'order':
            self.update_order(message)
        elif target == 'product':
            self.update_product(message)
        else:
            raise RuntimeError(
                '%s error: "%s" (%s) is not a recognized '
                'object-type/target' % 
                (
                    self.__class__.__name__, target, 
                    type(target).__name__
                )
            )

    # ... removed for brevity

    else:
        raise RuntimeError(
            '%s error: "%s" (%s) is not a recognized '
            'operation' % 
            (
                self.__class__.__name__, message.operation, 
                type(message.operation).__name__
            )
        )
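
Since every handler now takes the same single argument, and the handler names shown follow an operation_target pattern (create_artisan, update_product, and so on), the same routing could also be expressed as a lookup. This is an alternative sketch, not the approach used in the service: DispatchSketch is a hypothetical stand-in class, and only the operations and targets visible in the branches above are listed.

```python
from types import SimpleNamespace


class DispatchSketch:
    """Hypothetical stand-in for the service class."""
    # - Only names that appear in the branches shown above
    _operations = ('create', 'update')
    _targets = ('artisan', 'customer', 'order', 'product')

    def __init__(self):
        self.handled = []

    def _handle_message(self, message) -> None:
        target = message.data.get('target')
        if message.operation not in self._operations:
            raise RuntimeError(
                '%s error: "%s" (%s) is not a recognized '
                'operation' % (
                    self.__class__.__name__, message.operation,
                    type(message.operation).__name__
                )
            )
        if target not in self._targets:
            raise RuntimeError(
                '%s error: "%s" (%s) is not a recognized '
                'object-type/target' % (
                    self.__class__.__name__, target,
                    type(target).__name__
                )
            )
        # - Handlers are named <operation>_<target>
        handler = getattr(
            self, '%s_%s' % (message.operation, target), None
        )
        if handler is None:
            raise RuntimeError(
                '%s error: no handler for "%s" of "%s"' % (
                    self.__class__.__name__, message.operation, target
                )
            )
        handler(message)

    def update_product(self, message) -> None:
        # - Toy handler: records the call instead of changing data
        self.handled.append(('update_product', message))


# - A SimpleNamespace stands in for a DaemonMessage instance here
service = DispatchSketch()
incoming = SimpleNamespace(
    operation='update', origin='artisan',
    data={'target': 'product', 'properties': {}}
)
service._handle_message(incoming)
```

The explicit if/elif structure is more verbose but easier to read and to search, so which form is preferable is largely a matter of taste.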

The handler methods (using update_product, as an example) remain largely unchanged:

def update_product(self, message:(DaemonMessage,)) -> None:
    self.info('%s.update_product called' % self.__class__.__name__)
    if not isinstance(message, DaemonMessage):
        raise TypeError(
            '%s.update_product expects a DaemonMessage '
            'instance, but was passed "%s" (%s)' % 
            (
                self.__class__.__name__, message, 
                type(message).__name__
            )
        )

We still need the properties; we're just acquiring them in the individual handler methods instead of in _handle_message:

properties = message.data.get('properties')
self.debug('properties ... %s:' % (type(properties)))
self.debug(str(properties))

The code, from that point until the modified object is saved, remains unchanged:

#   ... and save it.
new_object.save()
self.info('Product %s updated successfully' % new_object.oid)

And then we can check to see if an outbound message needs to be sent, acquire the relevant Artisan, create the message, and send it:

if message.origin == self.message_origin:
    # - Acquire the Artisan whose Product this is
    artisan = self.get_artisan_from_product(new_object)
    sender = RabbitMQSender()
    outbound_message = DaemonMessage(
        operation=message.operation,
        origin=self.message_origin,
        data=message.data,
        signing_key=self.signing_key
    )
    sender.send_message(outbound_message, artisan.queue_id)

Since acquiring an Artisan from a Product is going to be a recurring theme, a helper method (get_artisan_from_product) was created to streamline that process. It also highlights the eventual need for a more direct association between products and artisans, but a data object query-based process will suffice for now:

def get_artisan_from_product(
    self, product:(UUID,str,BaseProduct)
) -> (Artisan,None):
    # TODO: Add artisan (owner) to Product classes, and use 
    #       that instead. For now, use this approach
    all_artisans = Artisan.get()
    # - Normalize whatever was passed to the product's UUID
    if isinstance(product, BaseProduct):
        product = product.oid
    elif isinstance(product, str):
        product = UUID(product)
    for artisan in all_artisans:
        if product in [p.oid for p in artisan.products]:
            return artisan
    # - No Artisan has this product
    return None

A final consideration before ending this chapter: when we started this chunk of development, there was still a decision pending with respect to whether message queues were going to be implemented as a "one for all artisans" or "one for each Artisan." No formal decision was made, but there are other considerations that may have arisen as the messaging processes were being thought out:

  • Each Artisan needs at least two separate message queues: one for traffic to the Artisan, and one for traffic from them. If a single queue for all traffic is implemented, then:
    • The code would have to be altered to include both an origin (already done) and a destination in order to assure that, for example, messages dropped in a queue by the Gateway weren't also read by the Gateway
    • Even with that in place, a message that hasn't been read and acted upon by the appropriate destination would almost certainly block other messages in the queue from being read and acted upon, without still more code changes and the attendant complexity
  • If each Artisan has a distinct message queue for inbound and outbound messages, that entire set of complications will simply go away. There is some additional work that will be necessary—providing some means of identifying individual inbound and outbound queues—but if each queue handles only traffic in one direction, to or from one Artisan Application and the Gateway service, this simplifies things considerably, and the development cost should be pretty minimal.
  • As a side benefit, each message in a queue would, simply because it came from that queue, be immediately associable with the Artisan that the queue belongs to.

The only remaining cost in having multiple message queues is that multiple queues would exist—and that, in the main, is a cost that will be borne by the message queue server.
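
As a rough idea of what identifying individual inbound and outbound queues could involve, the following sketch derives a queue identifier from an Artisan's oid and a direction, and recovers both from an identifier. The naming scheme, the function names, and the choice of "inbound" to mean traffic to the Artisan are all assumptions made for illustration:

```python
from uuid import UUID

# - Assumption: directions are named from the Artisan's point of view
#   'inbound' is Gateway-to-Artisan, 'outbound' is Artisan-to-Gateway
DIRECTIONS = ('inbound', 'outbound')


def queue_id(artisan_oid, direction: str) -> str:
    """Builds a queue identifier such as artisan.<oid>.inbound"""
    if direction not in DIRECTIONS:
        raise ValueError(
            '"%s" is not a recognized direction' % direction
        )
    # - Accepts a UUID or its string form, and normalizes it
    return 'artisan.%s.%s' % (UUID(str(artisan_oid)), direction)


def parse_queue_id(identifier: str) -> tuple:
    """Returns the (artisan oid, direction) for a queue identifier"""
    parts = identifier.split('.')
    if (
        len(parts) != 3 or parts[0] != 'artisan'
        or parts[2] not in DIRECTIONS
    ):
        raise ValueError(
            '"%s" is not a recognized queue identifier' % identifier
        )
    return UUID(parts[1]), parts[2]
```

With a scheme along these lines, the Gateway could tell which Artisan any message belongs to from the queue identifier alone, which is the side benefit noted in the list above.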
