Patterns in Python – behavioral

Behavioral patterns are the last stage in the complexity and functionality of patterns. They also come last chronologically in the object life cycle in a system since objects are first created then built into larger structures, before they interact with each other.

These patterns encapsulate models of communication and interaction between objects. These patterns allow us to describe complex workflows that may be difficult to follow at runtime.

Typically, Behavioral patterns favor object composition over inheritance as usually, the interacting objects in a system would be from separate class hierarchies.

In this brief discussion, we will look at the following patterns: Iterator, Observer, and State.

The Iterator pattern

An iterator provides a way to access elements of a container object sequentially without exposing the underlying object itself. In other words, an iterator is a proxy that provides a single method of iterating over a container object.

Iterators are everywhere in Python, so there is no special need to introduce them.

All container/sequence types in Python, that is, list, tuple, str, and set, implement their own iterators. Dictionaries also implement iterators over their keys.

In Python, an iterator is any object that implements the magic method __iter__, and also responds to the iter function returning the iterator instance.

Usually, the iterator object that is created is hidden behind the scenes in Python.

For example, we iterate through a list as follows:

>>> for i in range(5):
...         print(i)
... 
0
1
2
3
4

Internally, something very similar to the following happens:

>>> I = iter(range(5))
>>> for i in I:
...         print(i)
... 
0
1
2
3
4

Every sequence type implements its own iterator type as well in Python. Examples for this are given as follows:

  • Lists:
    >>> fruits = ['apple','oranges','grapes']
    >>> iter(fruits)
    <list_iterator object at 0x7fd626bedba8>
    
  • Tuples:
    >>> prices_per_kg = (('apple', 350), ('oranges', 80), ('grapes', 120))
    >>> iter(prices_per_kg)
    <tuple_iterator object at 0x7fd626b86fd0>
    
  • Sets:
    >>> subjects = {'Maths','Chemistry','Biology','Physics'}
    >>> iter(subjects)
    <set_iterator object at 0x7fd626b91558>
    

Even dictionaries come with their own special key iterator type in Python3:

>>> iter(dict(prices_per_kg))
<dict_keyiterator object at 0x7fd626c35ae8>

We will explore a small example of implementing your own iterator class/type in Python now:

class Prime(object):
    """ An iterator for prime numbers """

    def __init__(self, initial, final=0):
        """ Initializer - accepts a number """
        # This may or may not be prime
        self.current = initial
        self.final = final
        
    def __iter__(self):
        return self

    def __next__(self):
        """ Return next item in iterator """
        return self._compute()

    def _compute(self):
        """ Compute the next prime number """

        num = self.current
        
        while True:
            is_prime = True
            
            # Check this number
            for x in range(2, int(pow(self.current, 0.5)+1)):
                if self.current%x==0:
                    is_prime = False
                    break


            num = self.current
            self.current += 1

            if is_prime:
                return num
            
            # If there is an end range, look for it
            if self.final > 0 and self.current>self.final:
                raise StopIteration

This preceding class is a prime number iterator, which returns prime numbers between two limits:

>>> p=Prime(2,10)
>>> for num in p:
... print(num)
... 
2
3
5
7
>>> list(Prime(2,50))
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47]

The prime number iterator without the end limit is an infinite iterator. For example, the following iterator will return all prime numbers starting from 2 and will never stop:

>>> p = Prime(2)

However by combining this with the itertools module, one can extract specific data that one wants from such infinite iterators.

For example here, we use it with the islice method of itertools to compute the first 100 prime numbers:

>>> import itertools
>>> list(itertools.islice(Prime(2), 100))
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541]

Similarly, the following are the first 10 prime numbers ending with 1 in the unit's place using the filterfalse method:

>>> list(itertools.islice(itertools.filterfalse(lambda x: x % 10 != 1, Prime(2)), 10))
[11, 31, 41, 61, 71, 101, 131, 151, 181, 191]

In a similar way, the following are the first 10 palindromic primes:

>>> list(itertools.islice(itertools.filterfalse(lambda x: str(x)!=str(x)[-1::-1], Prime(2)), 10))
[2, 3, 5, 7, 11, 101, 131, 151, 181, 191]

Interested readers are referred to the documentation on the itertools module and its methods to find fun and interesting ways to use and manipulate data for such infinite generators.

The Observer pattern

The Observer pattern decouples objects, but at the same time allows one set of objects (Subscribers) to keep track of the changes in another object (the Publisher). This avoids one-to-many dependency and references while keeping their interaction alive.

This pattern is also called Publish-Subscribe.

The following is a rather simple example using an Alarm class, which runs in its own thread and generates periodic alarms every second (by default). It also works as a Publisher class, notifying its subscribers whenever the alarm happens.

import threading
import time

from datetime import datetime

class Alarm(threading.Thread):
    """ A class which generates periodic alarms """

    def __init__(self, duration=1):
        self.duration = duration
        # Subscribers
        self.subscribers = []
        self.flag = True
        threading.Thread.__init__(self, None, None)

    def register(self, subscriber):
        """ Register a subscriber for alarm notifications """

        self.subscribers.append(subscriber)

    def notify(self):
        """ Notify all the subscribers """

        for subscriber in self.subscribers:
            subscriber.update(self.duration)
        
    def stop(self):
        """ Stop the thread """

        self.flag = False
        
    def run(self):
        """ Run the alarm generator """

        while self.flag:
            time.sleep(self.duration)
            # Notify
            self.notify()

Our subscriber is a simple DumbClock class, which subscribes to the Alarm object for its notifications and, using that, updates its time:

class DumbClock(object):
    """ A dumb clock class using an Alarm object """

    def __init__(self):
        # Start time
        self.current = time.time()

    def update(self, *args):
        """ Callback method from publisher """

        self.current += args[0]

    def __str__(self):
        """ Display local time """
        
        return datetime.fromtimestamp(self.current).strftime('%H:%M:%S')

Let's get these objects ticking:

  1. First create the alarm with a notification period of 1 second. This allows:
    >>> alarm=Alarm(duration=1)
  2. Next create the DumbClock object:
    >>> clock=DumbClock()
  3. Finally, register the clock object on the alarm object as an observer so that it can receive notifications:
    >>> alarm.register(clock)
  4. Now the clock will keep receiving updates from the alarm. Every time you print the clock, it will show the current time correct to the second:
    >>> print(clock)
    10:04:27

    After a while, it will show you the following:

    >>> print(clock)
    10:08:20
  5. Then it will sleep for a while and print:
    >>> print(clock);time.sleep(20);print(clock)
    10:08:23
    10:08:43

The following are some aspects to keep in mind when implementing observers:

  • References to subscribers: Publishers can choose to keep a reference to subscribers or use a Mediator pattern to get a reference when required. A Mediator pattern decouples many objects in a system from strongly referencing each other. In Python, for example, this could be a collection of weak references or proxies or an object managing such a collection if both publisher and subscriber objects are in the same Python runtime. For remote references, one can use a remote proxy.
  • Implementing Callbacks: In this example, the Alarm class directly updates the state of the subscriber by calling its update method. An alternate implementation is for the publisher to simply notify the subscribers, at which point they query the state of the Publisher using a get_state type of method to implement their own state change:

    This is the preferred option for a Publisher which may be interacting with subscribers of different types/classes. This also allows for decoupling code from the Publisher to the Subscriber as the publisher doesn't have to change its code if the update or notify method of the Subscriber changes.

  • Synchronous versus Asynchronous: In this example, the notify is called in the same thread as the Publisher when the state is changed since the clock needs reliable and immediate notifications to be accurate. In an asynchronous implementation, this could be done asynchronously so that the main thread of the Publisher continues running; for example this may be the preferred approach in systems using asynchronous execution, which returns a future object upon notification, but the actual notification may occur sometime later.

Since we've already encountered asynchronous processing in Chapter 5, Writing Applications That Scale, we will conclude our discussion on the Observer pattern with one more example, showing an asynchronous example where the Publisher and Subscriber interact asynchronously. We will be using the asyncio module in Python for this.

For this example, we will be using the domain of news publishing. Our publisher gets news stories from various sources as news URLs which are tagged to certain specific news channels. Examples of such channels could be—"sports", "international", "technology", "India", and so on.

News subscribers register for news channels they're interested in, consuming news stories as URLs. Once they get a URL they fetch the data of the URL asynchronously. The publisher-to-subscriber notification also happens asynchronously.

The following is the source code for our publisher:

  import weakref
  import asyncio

  from collections import defaultdict, deque

  class NewsPublisher(object):
    """ A news publisher class with asynchronous notifications """

    def __init__(self):
        # News channels
        self.channels = defaultdict(deque)
        self.subscribers = defaultdict(list)
        self.flag = True

    def add_news(self, channel, url):
        """ Add a news story """

        self.channels[channel].append(url)
        
    def register(self, subscriber, channel):
        """ Register a subscriber for a news channel """
        
        self.subscribers[channel].append(weakref.proxy(subscriber))

    def stop(self):
        """ Stop the publisher """

        self.flag = False
        
    async def notify(self):
        """ Notify subscribers """

        self.data_null_count = 0

        while self.flag:
            # Subscribers who were notified
            subs = []
            
            for channel in self.channels:
                try:
                    data = self.channels[channel].popleft()
                except IndexError:
                    self.data_null_count += 1
                    continue

                subscribers = self.subscribers[channel]
                for sub in subscribers:
                    print('Notifying',sub,'on channel',channel,'with data=>',data)
                    response = await sub.callback(channel, data)
                    print('Response from',sub,'for channel',channel,'=>',response)
                    subs.append(sub)

            await asyncio.sleep(2.0)

The publisher's notify method is asynchronous. It goes through list of channels, finds the subscribers to each of them, and calls back to the subscriber using its callback method, supplying it with the most recent data from the channel.

The callback method itself being asynchronous, it returns a future and no final processed result. Further processing of this future occurs asynchronously inside the fetch_urls method of the subscriber.

The following is the source code for the subscriber:

import aiohttp

class NewsSubscriber(object):
    """ A news subscriber class with asynchronous callbacks """
    
    def __init__(self):
        self.stories = {}
        self.futures = []
        self.future_status = {}
        self.flag = True

    async def callback(self, channel, data):
        """ Callback method """

        # The data is a URL
        url = data
        # We return the response immediately
        print('Fetching URL',url,'...')
        future = aiohttp.request('GET', url)
        self.futures.append(future)
            
        return future
            
    async def fetch_urls(self):

        while self.flag:

            for future in self.futures:
                # Skip processed futures
                if self.future_status.get(future):
                    continue

                response = await future

                # Read data
                data = await response.read()

                print('	',self,'Got data for URL',response.url,'length:',len(data))
                self.stories[response.url] = data
                # Mark as such
                self.future_status[future] = 1

            await asyncio.sleep(2.0)

Notice how both the callback and fetch_urls methods are both declared as asynchronous. The callback method passes the URL from the publisher to the aiohttp module's GET method, which simply returns a future.

The future is appended as a local list of futures, which is processed again asynchronously by the fetch_urls method to get the URL data, which is then appended to the local stories dictionary with the URL as the key.

The following is the asynchronous loop part of the code.

Take a look at the following steps:

  1. To get things started, we create a publisher and add some news stories via specific URLs to couple of channels on the publisher:
          publisher = NewsPublisher()
        
          # Append some stories to the 'sports' and 'india' channel
    
          publisher.add_news('sports', 'http://www.cricbuzz.com/cricket-news/94018/collective-dd-show-hands-massive-loss-to-kings-xi-punjab')
    
          publisher.add_news('sports', 'https://sports.ndtv.com/indian-premier-league-2017/ipl-2017-this-is-how-virat-kohli-recovered-from-the-loss-against-mumbai-indians-1681955')
    
    publisher.add_news('india','http://www.business-standard.com/article/current-affairs/mumbai-chennai-and-hyderabad-airports-put-on-hijack-alert-report-117041600183_1.html')
        publisher.add_news('india','http://timesofindia.indiatimes.com/india/pakistan-to-submit-new-dossier-on-jadhav-to-un-report/articleshow/58204955.cms')
  2. We then create two subscribers, one listening to the sports channel and the other to the india channel:
        subscriber1 = NewsSubscriber()
        subscriber2 = NewsSubscriber()  
        publisher.register(subscriber1, 'sports')
        publisher.register(subscriber2, 'india') 
  3. Now we create the asynchronous event loop:
        loop = asyncio.get_event_loop()
  4. Next, we add the tasks as co-routines to the loop to get the asynchronous loop to start its processing. We need to add the following three tasks:
    • publisher.notify():
    • subscriber.fetch_urls(): (one for each of the two subscribers)
  5. Since both the publisher and subscriber processing loops never exit, we add a timeout to processing via its wait method:
        tasks = map(lambda x: x.fetch_urls(), (subscriber1, subscriber2))
        loop.run_until_complete(asyncio.wait([publisher.notify(), *tasks],                                    timeout=120))
    
        print('Ending loop')
        loop.close()

    The following is our asynchronous Publisher and Subscriber(s) in action, on the console.

    The Observer pattern

We now move on to the last pattern in our discussion of design patterns, namely the State pattern.

The State pattern

A State pattern encapsulates the internal state of an object in another class (state object). The object changes its state by switching the internally encapsulated state object to different values.

A State object and its related cousin, Finite State Machine (FSM) allow a programmer to implement state transitions seamlessly across different states for the object without requiring complex code.

In Python, the State pattern can be implemented easily, since Python has a magic attribute for an object's class: the __class__ attribute.

It may sound a bit strange, but in Python this attribute can be modified on the dictionary of the instance! This allows the instance to dynamically change its class, something which we can take advantage of to implement this pattern in Python.

The following is a simple example showing this:

>>> class C(object):
...     def f(self): return 'hi'
... 
>>> class D(object): pass
... 
>>> c = C()
>>> c
<__main__.C object at 0x7fa026ac94e0>
>>> c.f()
'hi'
>>> c.__class__=D
>>> c
<__main__.D object at 0x7fa026ac94e0>
>>> c.f()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'D' object has no attribute 'f'

We were able to change the class of the object c at runtime. In this example, this proved dangerous, since C and D are unrelated classes, so this is never a smart thing to do in such cases. This is evident in the way c forgot its f method when it changed to an instance of class D (D has no f method).

However, for related classes, and more specifically, subclasses of a parent class implementing the same interface, this gives a lot of power, and can be used to implement patterns such as State.

In the following example, we have used this technique to implement the State pattern. It shows a computer which can switch from one state to another.

Notice how we are using an iterator to define this class since an iterator defines movement to the next position naturally according to its nature. We are taking advantage of this fact to implement our State pattern:

import random

class ComputerState(object):
    """ Base class for state of a computer """
    
    # This is an iterator
    name = "state"
    next_states = []
    random_states = []

    def __init__(self):
        self.index = 0
        
    def __str__(self):
        return self.__class__.__name__

    def __iter__(self):
        return self

    def change(self):
        return self.__next__()

    def set(self, state):
        """ Set a state """

        if self.index < len(self.next_states):
            if state in self.next_states:
                # Set index
                self.index = self.next_states.index(state)
                self.__class__ = eval(state)
                return self.__class__
            else:
                # Raise an exception for invalid state change    
              current = self.__class__
                new = eval(state)
                raise Exception('Illegal transition from %s to %s' % (current, new))
        else:
            self.index = 0
            if state in self.random_states:
                self.__class__ = eval(state)
                return self.__class__
            
    def __next__(self):
        """ Switch to next state """

        if self.index < len(self.next_states):
            # Always move to next state first
            self.__class__ = eval(self.next_states[self.index])
            # Keep track of the iterator position
            self.index += 1
            return self.__class__
        else:
             # Can switch to a random state once it completes
            # list of mandatory next states.
            # Reset index
            self.index = 0
            if len(self.random_states):
                state = random.choice(self.random_states)
                self.__class__ = eval(state)
                return self.__class__
            else:
                raise StopIteration

Now let's define some concrete subclasses of the ComputerState class.

Each class can define a list of next_states which is a set of legal states the current state can switch to. It can also define a list of random states which are random legal states it can switch to once it has switched to the next state.

For example, the following is the first state: the off state of the computer. The next compulsory state is of course the on state. Once the computer is on, this state can move off to any of the other random states.

Hence the definition is as follows:

class ComputerOff(ComputerState):
    next_states = ['ComputerOn']
    random_states = ['ComputerSuspend', 'ComputerHibernate', 'ComputerOff']

Similarly, the following are the definitions of the other state classes:

class ComputerOn(ComputerState):
    # No compulsory next state    
    random_states = ['ComputerSuspend', 'ComputerHibernate', 'ComputerOff']

class ComputerWakeUp(ComputerState):
    # No compulsory next state
    random_states = ['ComputerSuspend', 'ComputerHibernate', 'ComputerOff']
    
class ComputerSuspend(ComputerState):
    next_states = ['ComputerWakeUp']  
    random_states = ['ComputerSuspend', 'ComputerHibernate', 'ComputerOff']

class ComputerHibernate(ComputerState):
    next_states = ['ComputerOn']  
    random_states = ['ComputerSuspend', 'ComputerHibernate', 'ComputerOff']

Finally, the following is the class for the Computer which uses the state classes to set its internal state.

class Computer(object):
    """ A class representing a computer """

    def __init__(self, model):
        self.model = model
        # State of the computer - default is off.
        self.state = ComputerOff()

    def change(self, state=None):
        """ Change state """

        if state==None:
            return self.state.change()
        else:
            return self.state.set(state)

    def __str__(self):
        """ Return state """
        return str(self.state)

The following are some interesting aspects of this implementation:

  • State as an iterator: We have implemented the ComputerState class as an iterator. This is because a state has, naturally, a list of immediate future states it can switch to and nothing else. For example, a computer in an Off state can move only to the On state next. Defining it as an iterator allows us to take advantage of the natural progression of an iterator from one state to next.
  • Random States: We have implemented the concept of random states in this example. Once a computer moves from one state to its mandatory next state (On to Off, Suspend to WakeUp), it has a list of random states available to move on to. A computer that is On need not always be switched off. It can also go to Sleep (Suspend) or Hibernate.
  • Manual Change: The computer can move to a specific state via the second optional argument of the change method. However, this is possible only if the state change is valid; otherwise an exception is raised.

We will now see our State pattern in action.

The computer is off to start with, of course:

>>> c = Computer('ASUS')
>>> print(c)
ComputerOff

Let's see some automatic state changes:

>>> c.change()
<class 'state.ComputerOn'>

And now, let the state machine decide its next states—note these are random states till the computer enters a state where it has to mandatorily move on to the next state:

>>> c.change()
<class 'state.ComputerHibernate'>

Now the state is Hibernate, which means the next state has to be On as it is a compulsory next state:

>>> c.change()
<class 'state.ComputerOn'>
>>> c.change()
<class 'state.ComputerOff'>

Now the state is Off, which means the next state has to be On:

>>> c.change()
<class 'state.ComputerOn'>

The following are all random state changes:

>>> c.change()
<class 'state.ComputerSuspend'>
>>> c.change()
<class 'state.ComputerWakeUp'>
>> c.change()
<class 'state.ComputerHibernate'>

Now, since the underlying state is an iterator, one can even iterate on the state using a module such as itertools.

The following is an example of this – iterating on the next five states of the computer:

>>> import itertools
>>> for s in itertools.islice(c.state, 5):
... print (s)
... 
<class 'state.ComputerOn'>
<class 'state.ComputerOff'>
<class 'state.ComputerOn'>
<class 'state.ComputerOff'>
<class 'state.ComputerOn'>

Now let's try some manual state changes:

>>> c.change('ComputerOn')
<class 'state.ComputerOn'>
>>> c.change('ComputerSuspend')
<class 'state.ComputerSuspend'>

>>> c.change('ComputerHibernate')
Traceback (most recent call last):
  File "state.py", line 133, in <module>
      print(c.change('ComputerHibernate'))        
  File "state.py", line 108, in change
      return self.state.set(state)
  File "state.py", line 45, in set
      raise Exception('Illegal transition from %s to %s' % (current, new))
Exception: Illegal transition from <class '__main__.ComputerSuspend'> to <class '__main__.ComputerHibernate'>

We get an exception when we try an invalid state transition, as the computer cannot go directly from Suspend to Hibernate. It has to wake up first!

>>> c.change('ComputerWakeUp')
<class 'state.ComputerWakeUp'>
>>> c.change('ComputerHibernate')
<class 'state.ComputerHibernate'>

All good now.

We have completed our discussion of design patterns in Python, so it is time to summarize what we've learned so far.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.147.84.157