Let's start our discussion of multi-threading in Python with the example of a program used to generate thumbnails of image URLs.
In the example, we are using Pillow, a fork of the Python Imaging Library (PIL), to perform this operation:
# thumbnail_converter.py
from PIL import Image
import urllib.request

def thumbnail_image(url, size=(64, 64), format='.png'):
    """ Save thumbnail of an image URL """

    im = Image.open(urllib.request.urlopen(url))
    # filename is last two parts of the URL minus extension + '.format'
    pieces = url.split('/')
    filename = ''.join((pieces[-2], '_', pieces[-1].split('.')[0], '_thumb', format))
    # LANCZOS is the filter formerly exposed as Image.ANTIALIAS,
    # which was removed in Pillow 10
    im.thumbnail(size, Image.LANCZOS)
    im.save(filename)
    print('Saved', filename)
The preceding code works very well for single URLs.
Let's say we want to convert five image URLs to their thumbnails:
img_urls = ['https://dummyimage.com/256x256/000/fff.jpg',
            'https://dummyimage.com/320x240/fff/00.jpg',
            'https://dummyimage.com/640x480/ccc/aaa.jpg',
            'https://dummyimage.com/128x128/ddd/eee.jpg',
            'https://dummyimage.com/720x720/111/222.jpg']

for url in img_urls:
    thumbnail_image(url)
Let's see how such a function performs with respect to time taken in the following screenshot:
The function took approximately 1.7 seconds per URL.
Let's now scale the program to multiple threads so we can perform the conversions concurrently. Here is the rewritten code to run each conversion in its own thread:
import threading

for url in img_urls:
    t = threading.Thread(target=thumbnail_image, args=(url,))
    t.start()
The timing that this last program now gives is shown in this screenshot:
With this change, the program returns in 1.76 seconds, almost equal to the time taken by a single URL in serial execution before. In other words, the program now scales linearly with the number of threads. Note that we did not have to change the function itself to get this scalability boost.
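Why does this scale so well? Each conversion spends most of its time waiting on the network, and a thread that is waiting doesn't hold up the others. The following is a minimal sketch that shows the effect without network access, assuming time.sleep as a stand-in for fetching and saving an image; fake_thumbnail, run_serial, and run_threaded are hypothetical helpers, not part of the program above. Unlike the snippet above, it joins the threads so that the timing can be measured inside the program.

```python
import threading
import time

def fake_thumbnail(url, delay=0.2):
    """Hypothetical stand-in for thumbnail_image: sleeps to mimic I/O."""
    time.sleep(delay)

def run_serial(urls):
    start = time.perf_counter()
    for url in urls:
        fake_thumbnail(url)
    return time.perf_counter() - start

def run_threaded(urls):
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_thumbnail, args=(url,))
               for url in urls]
    for t in threads:
        t.start()
    # Wait for every thread, so the timing covers all the conversions
    for t in threads:
        t.join()
    return time.perf_counter() - start

urls = ['image-%d.jpg' % i for i in range(5)]
serial = run_serial(urls)
threaded = run_threaded(urls)
print('serial: %.2fs, threaded: %.2fs' % (serial, threaded))
```

On a typical machine the serial run takes about one second (five sleeps of 0.2 s) and the threaded run about 0.2 s, mirroring the near-linear speedup seen with real URLs.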
In the previous example, we saw a set of image URLs being processed by a thumbnail generator function concurrently by using multiple threads. With the use of multiple threads, we were able to achieve near-linear scalability as compared to serial execution.
However, in real life, rather than processing a fixed list of URLs, it is more common for the URL data to be produced by some kind of URL producer. It could be fetching this data from a database, a comma-separated values (CSV) file, or a TCP socket, for example.
In such a scenario, creating one thread per URL would be a tremendous waste of resources, since creating a thread carries a certain overhead in the system. We need some way to reuse the threads we create.
For such systems that involve a certain set of threads producing data and another set of threads consuming or processing data, the producer/consumer model is an ideal fit. In such a system, producers add data to a shared, synchronized queue, and consumers wait on that queue to get the data and process it. In Python, this queue is provided by the Queue class in the aptly named queue module.
We have rewritten our thumbnail generator to use a producer/consumer architecture. The resulting code is given next. Since it is a bit detailed, we will discuss each class one by one.
First, let's look at the imports—these are pretty self-explanatory:
# thumbnail_pc.py
import threading
import time
import string
import random
import urllib.request
from PIL import Image
from queue import Queue
Next is the code for the producer class:
class ThumbnailURL_Generator(threading.Thread):
    """ Worker class that generates image URLs """

    def __init__(self, queue, sleep_time=1):
        self.sleep_time = sleep_time
        self.queue = queue
        # A flag for stopping
        self.flag = True
        # choice of sizes
        self._sizes = (240, 320, 360, 480, 600, 720)
        # URL scheme
        self.url_template = 'https://dummyimage.com/%s/%s/%s.jpg'
        threading.Thread.__init__(self, name='producer')

    def __str__(self):
        return 'Producer'

    def get_size(self):
        return '%dx%d' % (random.choice(self._sizes),
                          random.choice(self._sizes))

    def get_color(self):
        return ''.join(random.sample(string.hexdigits[:-6], 3))

    def run(self):
        """ Main thread function """

        while self.flag:
            # generate image URLs of random sizes and fg/bg colors
            url = self.url_template % (self.get_size(),
                                       self.get_color(),
                                       self.get_color())
            # Add to queue
            print(self, 'Put', url)
            self.queue.put(url)
            time.sleep(self.sleep_time)

    def stop(self):
        """ Stop the thread """

        self.flag = False
Let's analyze the producer class code:
The class is named ThumbnailURL_Generator. It generates URLs of images of different sizes and foreground and background colors, using the service of a website named http://dummyimage.com. It inherits from the threading.Thread class.
It has a run method, which goes in a loop, generates a random image URL, and pushes it to the shared queue. After each push, the thread sleeps for a fixed time, as configured by the sleep_time parameter.
It has a stop method, which sets the internal flag to False, causing the loop to break and the thread to finish its processing. This can be called externally by another thread, typically the main thread.
Now we have the URL consumer class that consumes the thumbnail URLs and creates the thumbnails:
class ThumbnailURL_Consumer(threading.Thread):
    """ Worker class that consumes URLs and generates thumbnails """

    def __init__(self, queue):
        self.queue = queue
        self.flag = True
        threading.Thread.__init__(self, name='consumer')

    def __str__(self):
        return 'Consumer'

    def thumbnail_image(self, url, size=(64, 64), format='.png'):
        """ Save image thumbnails, given a URL """

        im = Image.open(urllib.request.urlopen(url))
        # filename is last part of URL minus extension + '.format'
        filename = url.split('/')[-1].split('.')[0] + '_thumb' + format
        # Image.LANCZOS replaces Image.ANTIALIAS (removed in Pillow 10)
        im.thumbnail(size, Image.LANCZOS)
        im.save(filename)
        print(self, 'Saved', filename)

    def run(self):
        """ Main thread function """

        while self.flag:
            url = self.queue.get()
            print(self, 'Got', url)
            self.thumbnail_image(url)

    def stop(self):
        """ Stop the thread """

        self.flag = False
Here's the analysis of the consumer class:
The class is named ThumbnailURL_Consumer, as it consumes URLs from the queue and creates thumbnail images of them.
The run method of this class goes in a loop, gets a URL from the queue, and converts it to a thumbnail by passing it to the thumbnail_image method. (Note that this method is essentially the same as the thumbnail_image function we created earlier; the only difference is that the filename is built from the last part of the URL alone.)
The stop method is similar to the producer's: the loop checks the stop flag on every iteration and ends once the flag has been unset. (Because the get call on the queue blocks, a consumer waiting on an empty queue only notices the flag after it receives its next URL.)
Here is the main part of the code, setting up a couple of producers and consumers each, and running them:
q = Queue(maxsize=200)
producers, consumers = [], []

for i in range(2):
    t = ThumbnailURL_Generator(q)
    producers.append(t)
    t.start()

for i in range(2):
    t = ThumbnailURL_Consumer(q)
    consumers.append(t)
    t.start()
Here is a screenshot of the program in action:
In the above program, since the producers keep generating random data without any end, the consumers will keep consuming it without any end. Our program has no proper end condition.
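Hence, this program will keep running until network requests are denied or time out, or until the machine runs out of disk space because of the thumbnails.
However, a program solving a real-world problem should end in some way that is predictable.
This could be due to a number of external constraints. For example, consumers could wait for data for a certain maximum time and exit if none arrives during that time; this can be configured as a timeout in the get method of the queue. Alternatively, the program could end once a certain number of resources have been consumed or created, such as a fixed number of thumbnails.
In the following section, we will see how to enforce such resource limits by using threading synchronization primitives such as Locks and Semaphores.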
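A timeout on the queue's get method needs no extra synchronization primitive: Queue.get(timeout=...) raises queue.Empty if nothing arrives in time. The following is a minimal sketch of a consumer that uses this as its end condition, assuming a toy consumer function (hypothetical) that upper-cases strings in place of creating thumbnails.

```python
import queue
import threading

def consumer(q, results, timeout=0.2):
    """Process items until no new item arrives within `timeout` seconds."""
    while True:
        try:
            item = q.get(timeout=timeout)
        except queue.Empty:
            # No data within the timeout window: treat it as the end condition
            break
        results.append(item.upper())  # stand-in for creating a thumbnail

q = queue.Queue()
for url in ('a.jpg', 'b.jpg', 'c.jpg'):
    q.put(url)

results = []
t = threading.Thread(target=consumer, args=(q, results))
t.start()
t.join()  # returns roughly `timeout` seconds after the queue runs dry
print(results)  # ['A.JPG', 'B.JPG', 'C.JPG']
```

The trade-off is that a slow producer can be mistaken for a finished one, so the timeout has to be longer than the longest expected gap between items.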
You may have observed that we start a thread using its start method, though the method overridden in the Thread subclass is run. This is because, in the parent Thread class, the start method sets up some state, creates a new thread of execution, and then calls the run method from within that new thread. This is the right way to get a thread's run method executed. It should never be called directly: calling run yourself simply executes it in the calling thread, and no new thread is created.
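The difference is easy to observe by recording which thread actually executes run. A minimal sketch, using a hypothetical Worker subclass:

```python
import threading

class Worker(threading.Thread):
    def __init__(self):
        super().__init__(name='worker')
        self.ran_in = None

    def run(self):
        # Record the name of the thread that actually executes run()
        self.ran_in = threading.current_thread().name

w1 = Worker()
w1.run()          # plain method call: executes in the calling thread
print(w1.ran_in)  # MainThread

w2 = Worker()
w2.start()        # start() creates a new thread, which then invokes run()
w2.join()
print(w2.ran_in)  # worker
```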
In this section, we will see how to modify the program to use a Lock, a synchronization primitive, to implement a counter that limits the number of images created, as a way to end the program.
Lock objects in Python allow exclusive access by threads to a shared resource: only one thread can hold the lock at a time, and any other thread trying to acquire it blocks until it is released.
The pseudo-code would be as follows:
lock.acquire()
try:
    # Do some modification on a shared, mutable resource
    mutable_object.modify()
finally:
    lock.release()
However, Lock objects support the context manager protocol, so this is more commonly written using the with statement:
with lock:
    mutable_object.modify()
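As a concrete, runnable instance of this pattern, the following sketch has several threads increment a shared counter; counter, lock, and increment are hypothetical names chosen for the example. Because each read-modify-write happens inside the with block, no increments are lost.

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # Only one thread at a time can run this read-modify-write
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(10000,))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000
```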
To save a fixed number of images per run, our code needs to maintain a counter. However, since multiple threads would check and increment this counter, it needs to be synchronized via a Lock object.
This is our first implementation of the resource counter class using Locks.
class ThumbnailImageSaver(object):
    """ Class which saves URLs to thumbnail images and keeps a counter """

    def __init__(self, limit=10):
        self.limit = limit
        self.lock = threading.Lock()
        self.counter = {}

    def thumbnail_image(self, url, size=(64, 64), format='.png'):
        """ Save image thumbnails, given a URL """

        im = Image.open(urllib.request.urlopen(url))
        # filename is last two parts of URL minus extension + '.format'
        pieces = url.split('/')
        filename = ''.join((pieces[-2], '_', pieces[-1].split('.')[0], '_thumb', format))
        # Image.LANCZOS replaces Image.ANTIALIAS (removed in Pillow 10)
        im.thumbnail(size, Image.LANCZOS)
        im.save(filename)
        print('Saved', filename)
        self.counter[filename] = 1
        return True

    def save(self, url):
        """ Save a URL as thumbnail """

        with self.lock:
            if len(self.counter) >= self.limit:
                return False
            self.thumbnail_image(url)
            print('Count=>', len(self.counter))
            return True
Since this modifies the consumer class as well, it makes sense to discuss both changes together. Here is the modified consumer class to accommodate the extra counter needed to keep track of the images:
import uuid

class ThumbnailURL_Consumer(threading.Thread):
    """ Worker class that consumes URLs and generates thumbnails """

    def __init__(self, queue, saver):
        self.queue = queue
        self.flag = True
        self.saver = saver
        # Internal id
        self._id = uuid.uuid4().hex
        threading.Thread.__init__(self, name='Consumer-' + self._id)

    def __str__(self):
        return 'Consumer-' + self._id

    def run(self):
        """ Main thread function """

        while self.flag:
            url = self.queue.get()
            print(self, 'Got', url)
            if not self.saver.save(url):
                # Limit reached, break out
                print(self, 'Set limit reached, quitting')
                break

    def stop(self):
        """ Stop the thread """

        self.flag = False
Let's analyze both of these classes. First, we'll look at the new class, ThumbnailImageSaver:
This class derives from object. In other words, it is not a Thread, and it is not meant to be one.
It is initialized with a limit parameter equal to the number of images it should save.
The thumbnail_image method moves here from the consumer class. It is called from a save method, which encloses the call in a synchronized context using the lock.
The save method first checks if the count has reached the configured limit; when this happens, the method returns False. Otherwise, the image is saved with a call to thumbnail_image, and the image filename is added to the counter, effectively incrementing the count.
Next, we'll consider the modified ThumbnailURL_Consumer class:
The class's initializer now accepts an instance of ThumbnailImageSaver as a saver argument. The rest of the arguments remain the same.
The thumbnail_image method no longer exists in this class, as it has moved to the new class.
The run method is much simplified. It makes a call to the save method of the saver instance. If it returns False, it means the limit has been reached, the loop breaks, and the consumer thread exits.
We also modified the __str__ method to return a unique ID per thread, which is set in the initializer using the uuid module. This helps to debug threads in a real-life example.
The calling code also changes a bit, as it needs to set up the new object and configure the consumer threads with it:
import glob

q = Queue(maxsize=2000)
# Create an instance of the saver object
saver = ThumbnailImageSaver(limit=100)

producers, consumers = [], []
for i in range(3):
    t = ThumbnailURL_Generator(q)
    producers.append(t)
    t.start()

for i in range(5):
    t = ThumbnailURL_Consumer(q, saver)
    consumers.append(t)
    t.start()

for t in consumers:
    t.join()
    print('Joined', t, flush=True)

# To make sure producers don't block on a full queue
while not q.empty():
    item = q.get()

for t in producers:
    t.stop()
    print('Stopped', t, flush=True)

print('Total number of PNG images', len(glob.glob('*.png')))
The following are the main points to be noted here:
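We create an instance of the new ThumbnailImageSaver class and pass it on to the consumer threads when creating them.
We wait on the consumers first. Note that the main thread doesn't call stop on them, but join. This is because the consumers exit automatically when the limit is reached, so the main thread should just wait for them to finish.
After the consumers have exited, the main thread drains the queue, so that no producer stays blocked on a full queue, and then stops the producers.
We use a dictionary instead of an integer for the counter because of the nature of the data.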
Since the images are randomly generated, there is a minor chance of one image URL being the same as another one created previously, causing the filenames to clash. Using a dictionary takes care of such possible duplicates.
The following screenshot shows a run of the program with a limit of 100 images. Note that we can only show the last few lines of the console log, since it produces a lot of output:
You can configure this program with any limit on the number of images, and it will always save exactly that many, nothing more and nothing less.
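To see why the count comes out exact, here is a stripped-down, self-contained simulation of the lock-based design. It assumes a hypothetical FakeSaver class that keeps the same check-save-count logic as ThumbnailImageSaver but replaces the download and save with a short sleep; each name is queued twice to mimic clashing filenames, which the dictionary absorbs.

```python
import queue
import threading
import time

class FakeSaver:
    """Lock-based counter like ThumbnailImageSaver, minus network and disk."""
    def __init__(self, limit=10):
        self.limit = limit
        self.lock = threading.Lock()
        self.counter = {}

    def save(self, name):
        with self.lock:
            if len(self.counter) >= self.limit:
                return False
            time.sleep(0.001)        # stand-in for fetching and saving
            self.counter[name] = 1   # a duplicate name just overwrites its entry
            return True

def consumer(q, saver):
    while True:
        if not saver.save(q.get()):
            break  # limit reached, this consumer quits

q = queue.Queue()
for i in range(100):
    q.put('img-%d' % (i // 2))       # every name appears twice
saver = FakeSaver(limit=25)
threads = [threading.Thread(target=consumer, args=(q, saver))
           for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(saver.counter))  # 25
```

Because the limit check and the counter update happen under the same lock, no two consumers can both pass the check for the last free slot.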
In the next section, we will familiarize ourselves with another synchronization primitive, namely the semaphore, and learn how to implement a resource-limiting class in a similar way using it.
Locks aren't the only way to implement synchronization constraints and write logic on top of them in order to limit resources used/generated by a system.
A semaphore, one of the oldest synchronization primitives in computer science, is ideally suited for such use cases.
A semaphore is initialized with a value greater than zero:
When a thread calls acquire on a semaphore that has a positive internal value, the value gets decremented by one, and the thread continues on its way.
When a thread calls release on the semaphore, the value is incremented by one.
Any thread calling acquire once the value has reached zero is blocked on the semaphore until it is woken up by another thread calling release (unless it asks for a non-blocking acquire, in which case the call simply fails).
Due to this behavior, a semaphore is perfectly suited for implementing a fixed limit on shared resources.
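The following sketch exercises these rules on Python's threading.BoundedSemaphore, the variant used in the next example, including the non-blocking form of acquire; the limit of 3 is arbitrary.

```python
import threading

sem = threading.BoundedSemaphore(value=3)

# Non-blocking acquire: returns True while the internal value is
# positive (decrementing it), and False once it has reached zero
first_round = [sem.acquire(blocking=False) for _ in range(4)]
print(first_round)      # [True, True, True, False]

sem.release()           # the value goes from 0 back to 1
after_release = sem.acquire(blocking=False)
print(after_release)    # True

# Give back all three slots, then try one release too many: a
# BoundedSemaphore refuses to exceed its initial value
for _ in range(3):
    sem.release()
try:
    sem.release()
    over_released = False
except ValueError:
    over_released = True
print(over_released)    # True
```

The bounded variant turns an accidental extra release, which would silently raise the limit on a plain Semaphore, into an error.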
In the following code example, we will implement another class for resource limiting our thumbnail generator program, this time using a semaphore:
class ThumbnailImageSemaSaver(object):
    """ Class which keeps an exact counter of saved images
    and restricts the total count using a semaphore """

    def __init__(self, limit=10):
        self.limit = limit
        self.counter = threading.BoundedSemaphore(value=limit)
        self.count = 0

    def acquire(self):
        # Acquire counter, if limit is exhausted, it
        # returns False
        return self.counter.acquire(blocking=False)

    def release(self):
        # Release counter, incrementing count
        return self.counter.release()

    def thumbnail_image(self, url, size=(64, 64), format='.png'):
        """ Save image thumbnails, given a URL """

        # filename is last two parts of URL minus extension + '.format'
        pieces = url.split('/')
        filename = ''.join((pieces[-2], '_', pieces[-1].split('.')[0], format))

        try:
            # Fetching is inside the try too, so a network error
            # also gives the semaphore slot back
            im = Image.open(urllib.request.urlopen(url))
            # Image.LANCZOS replaces Image.ANTIALIAS (removed in Pillow 10)
            im.thumbnail(size, Image.LANCZOS)
            im.save(filename)
            print('Saved', filename)
            self.count += 1
        except Exception as e:
            print('Error saving URL', url, e)
            # Image can't be counted, increment semaphore
            self.release()

        return True

    def save(self, url):
        """ Save a URL as thumbnail """

        if self.acquire():
            self.thumbnail_image(url)
            return True
        else:
            print('Semaphore limit reached, returning False')
            return False
Since the new semaphore-based class keeps exactly the same interface as the previous lock-based class, namely a save method, there is no need to change any code in the consumer!
Only the calling code needs to be changed.
This line in the previous code initialized the ThumbnailImageSaver instance:
saver = ThumbnailImageSaver(limit=100)
The preceding line needs to be replaced with the following one:
saver = ThumbnailImageSemaSaver(limit=100)
The rest of the code remains exactly the same.
Let's quickly discuss the new class using the semaphore before seeing this code in action:
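The acquire and release methods are simple wrappers over the same methods on the semaphore. Note that acquire is called with blocking=False, so it returns False immediately instead of blocking once the limit is exhausted.
In the save method, the calling thread first tries to acquire the semaphore via the acquire method. If the semaphore's limit has been reached, it returns False. Otherwise, the thread saves the image and returns True. In the former case, the calling thread quits.
If saving an image fails, the thumbnail_image method releases the semaphore again, so that the failed image is not counted.
This class behaves in a similar way to the previous one, and limits resources exactly. The following is an example with a limit of 200 images: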
We saw two competing versions of implementing a fixed resource constraint in the previous two examples, one using Lock and another using Semaphore.
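The differences between the two versions are as follows:
The Lock version holds the lock for the whole of save: checking the counter, downloading and saving the thumbnail, and updating the counter. Only one thread can be fetching and saving an image at any given time, so the consumers effectively run one after the other.
The Semaphore version uses the semaphore only as a gate on the count. As long as the limit has not been reached, any number of threads can pass through and download and save their images concurrently; only when the limit is exhausted is the gate shut.
Hence, the effect is that the semaphore version would be faster than the version using Lock.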
How much faster? The following timing example for a run of 100 images gives an idea.
This screenshot shows the time it takes for the Lock version to save 100 images:
The following screenshot shows the time for the semaphore version to save a similar number:
By a quick calculation you can see that the semaphore version is about 4 times faster than the lock version for the same logic. In other words, it scales 4 times better.
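The gap comes from how long each version holds its primitive, not from the primitives themselves. The sketch below isolates that effect, assuming a 0.05 s sleep as a stand-in for downloading and saving one image; save_with_lock, save_with_semaphore, and timed are hypothetical helpers. The ratio it prints depends on the number of threads, so it is an illustration rather than a reproduction of the timings in the screenshots.

```python
import threading
import time

DELAY = 0.05  # stand-in for downloading and saving one image

lock = threading.Lock()
def save_with_lock():
    # Lock version: the slow I/O happens while holding the lock,
    # so only one thread can be saving at any moment
    with lock:
        time.sleep(DELAY)

sem = threading.BoundedSemaphore(value=100)
def save_with_semaphore():
    # Semaphore version: the semaphore only gates the count;
    # the slow I/O itself runs concurrently across threads
    if sem.acquire(blocking=False):
        time.sleep(DELAY)

def timed(save, n_threads=5, saves_per_thread=4):
    def worker():
        for _ in range(saves_per_thread):
            save()
    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

t_lock = timed(save_with_lock)
t_sem = timed(save_with_semaphore)
print('lock: %.2fs, semaphore: %.2fs' % (t_lock, t_sem))
```

With 5 threads doing 4 saves each, the lock version needs about 20 × 0.05 = 1 s, while the semaphore version needs about 4 × 0.05 = 0.2 s.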
In this section, we will briefly see the application of another important synchronization primitive in threading, namely the Condition object.
First, we will look at a real-life example of using a Condition object. We will implement a throttler for our thumbnail generator to manage the rate of URL generation.
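In producer/consumer systems in real life, the following three kinds of scenario can occur with respect to the rate of data production and consumption: the producers may produce data faster than the consumers can consume it, so the queue keeps growing; the consumers may consume data faster than the producers produce it, so the consumers are starved and keep waiting on the queue; or the two rates may be roughly matched, which is the ideal case.
There are many ways to solve the problem of mismatched rates. In the following example, we will limit the production rate of URLs to a fixed limit using Condition objects.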
A Condition object is a sophisticated synchronization primitive that comes with an implicit built-in lock. A thread can use it to wait until an arbitrary condition becomes true. The thread must hold the condition's lock to call wait; the moment it calls wait, the internal lock is released and the thread itself becomes blocked. When it is woken up, it reacquires the lock before wait returns:
cond = threading.Condition()

# In thread #1
with cond:
    while not some_condition_is_satisfied():
        # this thread is now blocked
        cond.wait()
Now, another thread can wake up the preceding thread by making the condition true and then calling notify or notify_all on the condition object while holding its lock. At this point, the preceding blocked thread is woken up, rechecks the condition in its while loop, and continues on its way:
# In thread #2
with cond:
    # Condition is satisfied
    if some_condition_is_satisfied():
        # Notify all threads waiting on the condition
        cond.notify_all()
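Putting the two halves together, here is a minimal runnable sketch in which the condition is a shared boolean; ready, log, waiter, and notifier are hypothetical names. The while loop also covers the case where the notifier runs first: the waiter then sees ready is already True and never waits.

```python
import threading

cond = threading.Condition()
ready = False
log = []

def waiter():
    with cond:
        # Re-check in a loop: only proceed once the condition holds
        while not ready:
            cond.wait()
        log.append('waiter woke up, ready=%s' % ready)

def notifier():
    global ready
    with cond:
        ready = True         # make the condition true while holding the lock
        cond.notify_all()    # then wake every thread waiting on it

t = threading.Thread(target=waiter)
t.start()
notifier()
t.join()
print(log)  # ['waiter woke up, ready=True']
```

Condition.wait_for(predicate), available since Python 3.2, wraps the same while loop, so the waiter could also be written as cond.wait_for(lambda: ready).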
Here is our new class, ThumbnailURLController, which implements rate control of URL production using a condition object:
class ThumbnailURLController(threading.Thread):
    """ A rate limiting controller thread for URLs using conditions """

    def __init__(self, rate_limit=0, nthreads=0):
        # Configured rate limit
        self.rate_limit = rate_limit
        # Number of producer threads
        self.nthreads = nthreads
        self.count = 0
        self.start_t = time.time()
        self.flag = True
        self.cond = threading.Condition()
        threading.Thread.__init__(self)

    def increment(self):
        # Increment count of URLs
        self.count += 1

    def calc_rate(self):
        # Rate of URL generation, in URLs per minute
        rate = 60.0 * self.count / (time.time() - self.start_t)
        return rate

    def run(self):
        while self.flag:
            rate = self.calc_rate()
            if rate <= self.rate_limit:
                with self.cond:
                    # print('Notifying all...')
                    self.cond.notify_all()

    def stop(self):
        self.flag = False

    def throttle(self, thread):
        """ Throttle threads to manage rate """
        # Current total rate
        rate = self.calc_rate()
        print('Current Rate', rate)
        # If rate > limit, add more sleep time to thread
        diff = abs(rate - self.rate_limit)
        sleep_diff = diff / (self.nthreads * 60.0)

        if rate > self.rate_limit:
            # Adjust threads sleep_time
            thread.sleep_time += sleep_diff
            # Hold this thread till the rate settles down to the limit
            with self.cond:
                print('Controller, rate is high, sleep more by', rate, sleep_diff)
                while self.calc_rate() > self.rate_limit:
                    self.cond.wait()
        elif rate < self.rate_limit:
            print('Controller, rate is low, sleep less by', rate, sleep_diff)
            # Decrease sleep time
            sleep_time = thread.sleep_time
            sleep_time -= sleep_diff
            # If this goes off < zero, make it zero
            thread.sleep_time = max(0, sleep_time)
Let's discuss the preceding code before we discuss the changes in the producer class that will make use of this class:
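The class is a subclass of Thread, so it runs in its own thread of execution. It also holds a Condition object.
It has a calc_rate method, which calculates the rate of generation of URLs, in URLs per minute, by keeping a counter and using timestamps.
In the run method, the rate is checked. If it's below the configured limit, the condition object notifies all threads waiting on it.
The class also implements a throttle method. This method uses the current rate, calculated via calc_rate, to throttle and adjust the sleep times of the producers. It mainly does these two things:
If the rate is above the configured limit, it increases the calling producer's sleep time by a small amount, and then blocks the producer on the condition object until the rate comes down to the limit.
If the rate is below the limit, it decreases the producer's sleep time by the same kind of amount, never letting it go below zero.
Here is the code of the producer class to incorporate the changes: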
class ThumbnailURL_Generator(threading.Thread):
    """ Worker class that generates image URLs and supports throttling
    via an external controller """

    def __init__(self, queue, controller=None, sleep_time=1):
        self.sleep_time = sleep_time
        self.queue = queue
        # A flag for stopping
        self.flag = True
        # sizes
        self._sizes = (240, 320, 360, 480, 600, 720)
        # URL scheme
        self.url_template = 'https://dummyimage.com/%s/%s/%s.jpg'
        # Rate controller
        self.controller = controller
        # Internal id
        self._id = uuid.uuid4().hex
        threading.Thread.__init__(self, name='Producer-' + self._id)

    def __str__(self):
        return 'Producer-' + self._id

    def get_size(self):
        return '%dx%d' % (random.choice(self._sizes),
                          random.choice(self._sizes))

    def get_color(self):
        return ''.join(random.sample(string.hexdigits[:-6], 3))

    def run(self):
        """ Main thread function """

        while self.flag:
            # generate image URLs of random sizes and fg/bg colors
            url = self.url_template % (self.get_size(),
                                       self.get_color(),
                                       self.get_color())
            # Add to queue
            print(self, 'Put', url)
            self.queue.put(url)
            # The controller is optional; skip rate control without one
            if self.controller is not None:
                self.controller.increment()
                # Throttle after putting a few images
                if self.controller.count > 5:
                    self.controller.throttle(self)

            time.sleep(self.sleep_time)

    def stop(self):
        """ Stop the thread """

        self.flag = False
Let's see how the preceding code works:
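The class now accepts an optional controller object in its initializer. This is an instance of the controller class given earlier.
After putting a URL on the queue, it increments the count on the controller.
Once more than 5 URLs have been produced, it calls throttle on the controller, passing itself as the argument.
The calling code also needs quite a few changes. The modified code is shown as follows: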
import glob

q = Queue(maxsize=2000)
# The controller needs to be configured with the exact number of
# producers
controller = ThumbnailURLController(rate_limit=50, nthreads=3)
saver = ThumbnailImageSemaSaver(limit=200)

controller.start()

producers, consumers = [], []
for i in range(3):
    t = ThumbnailURL_Generator(q, controller)
    producers.append(t)
    t.start()

for i in range(5):
    t = ThumbnailURL_Consumer(q, saver)
    consumers.append(t)
    t.start()

for t in consumers:
    t.join()
    print('Joined', t, flush=True)

# To make sure producers don't block on a full queue
while not q.empty():
    item = q.get()

controller.stop()

for t in producers:
    t.stop()
    print('Stopped', t, flush=True)

print('Total number of PNG images', len(glob.glob('*.png')))
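The main changes here are as follows: the controller is created with a rate limit of 50 URLs per minute and the exact number of producer threads (3), and is started before the producers; each producer is given the controller when it is created; the semaphore-based saver is used, with a limit of 200 images; and once the consumers have finished, the controller is stopped along with the producers.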
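To make the throttle arithmetic concrete for this configuration (rate_limit=50, nthreads=3), here is the sleep-time update from the throttle method pulled out into two standalone helpers. The names rate_per_minute and adjusted_sleep are hypothetical, the sample rates are made up, and the blocking wait on the condition is deliberately left out.

```python
def rate_per_minute(count, elapsed_seconds):
    """Same formula as calc_rate: URLs produced per minute."""
    return 60.0 * count / elapsed_seconds

def adjusted_sleep(sleep_time, rate, rate_limit, nthreads):
    """Sleep-time update from throttle (the wait on the condition is omitted)."""
    # The gap between current and target rate is split across the
    # producers and converted from per-minute to per-second terms
    sleep_diff = abs(rate - rate_limit) / (nthreads * 60.0)
    if rate > rate_limit:
        return sleep_time + sleep_diff
    elif rate < rate_limit:
        return max(0, sleep_time - sleep_diff)
    return sleep_time

print(rate_per_minute(10, 6))                     # 100.0
print(round(adjusted_sleep(1.0, 80, 50, 3), 4))   # 1.1667
print(adjusted_sleep(0.1, 20, 50, 3))             # 0
```

For example, at 80 URLs per minute each producer that calls throttle sleeps 30 / (3 × 60) ≈ 0.17 s longer per iteration, and at 20 URLs per minute a producer with a 0.1 s sleep is clamped to no sleep at all.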
Here is a run of the program configured with 200 images at the rate of 50 images per minute. We show two images of the running program's output, one at the beginning of the program and one towards the end.
You will find that, when the program starts, it almost immediately slows down and nearly comes to a halt, since the initial rate is high. What happens here is that the producers call the throttle method, and since the rate is high, they all get blocked on the condition object.
After a few seconds, the rate comes down to the prescribed limit, since no URLs are being generated. This is detected by the controller in its loop, and it calls notify_all on the condition object, waking up the waiting producers.
After a while you will see that the rate is getting settled around the set limit of 50 URLs per minute.
Towards the end of the program, you will see that the rate has almost settled to the exact limit:
We are coming towards the end of our discussion on threading primitives and how to use them in improving the concurrency of your programs and in implementing shared resource constraints and controls.
Before we conclude, we will look at an aspect of Python threads which prevents multi-threaded programs from making full use of the CPU in Python—namely the GIL or Global Interpreter Lock.