Implementing approximate counters in Python

With the concept of approximate counters in mind, let's try to implement the data structure in Python, building on our previous design for the lock-based counter. Consider the following Chapter16/example4.py file—specifically, the LockedCounter class and the ApproximateCounter class:

# Chapter16/example4.py

import threading
import time

class LockedCounter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self, x):
        with self.lock:
            new_value = self.value + x
            time.sleep(0.001)  # creating a delay
            self.value = new_value

    def get_value(self):
        with self.lock:
            value = self.value

        return value

class ApproximateCounter:
    def __init__(self, global_counter):
        self.value = 0
        self.lock = threading.Lock()
        self.global_counter = global_counter
        self.threshold = 10

    def increment(self, x):
        with self.lock:
            new_value = self.value + x
            time.sleep(0.001)  # creating a delay
            self.value = new_value

            if self.value >= self.threshold:
                self.global_counter.increment(self.value)
                self.value = 0

    def get_value(self):
        with self.lock:
            value = self.value

        return value

While the LockedCounter class remains the same as in our previous example (it will serve as our global counter), the class of interest is ApproximateCounter, which implements the approximate counter logic that we discussed previously. A newly initialized ApproximateCounter object starts with a value of 0 and, being a lock-based data structure, also has its own lock. Its two important attributes are the global counter that it reports to and the threshold that determines how often it reports; as mentioned previously, we are simply choosing 10 as an arbitrary value for the threshold.

In the increment() method of the ApproximateCounter class, we see the same increment logic: the method takes a parameter named x and adds it to the counter's value while holding the lock of the calling approximate counter object. Additionally, the method checks whether the newly incremented value has reached the threshold; if so, it increments the global counter by the current value of the local counter and resets the local counter to 0. The get_value() method, which returns the current value of the counter, is the same as the one we saw previously.
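The flushing behavior described above can be observed directly. The following is a minimal, single-threaded sketch (with the artificial delays omitted) using classes that mirror those in example4.py:

```python
import threading

class LockedCounter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self, x):
        with self.lock:
            self.value += x

    def get_value(self):
        with self.lock:
            return self.value

class ApproximateCounter:
    def __init__(self, global_counter, threshold=10):
        self.value = 0
        self.lock = threading.Lock()
        self.global_counter = global_counter
        self.threshold = threshold

    def increment(self, x):
        with self.lock:
            self.value += x
            # Flush the local count to the global counter at the threshold
            if self.value >= self.threshold:
                self.global_counter.increment(self.value)
                self.value = 0

    def get_value(self):
        with self.lock:
            return self.value

global_counter = LockedCounter()
local_counter = ApproximateCounter(global_counter)

for _ in range(12):
    local_counter.increment(1)

print(global_counter.get_value())  # 10 -- flushed once, at the threshold
print(local_counter.get_value())   # 2  -- not yet reported
```

Note how the two increments past the threshold remain in the local counter until the next flush; this deferred reporting is exactly what makes the counter "approximate" from the global counter's point of view.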

Now, let's test and compare the scalability of the new data structure in our main program. First, we will regenerate the data for the scalability of our old single-lock counter data structure:

# Chapter16/example4.py

from concurrent.futures import ThreadPoolExecutor

# Previous single-lock counter

single_counter_n_threads = []
single_counter_times = []
for n_workers in range(1, 11):
    single_counter_n_threads.append(n_workers)

    counter = LockedCounter()

    start = time.time()

    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        executor.map(counter.increment,
                     [1 for i in range(100 * n_workers)])

    single_counter_times.append(time.time() - start)

Just like in our previous example, we are using a ThreadPoolExecutor object to process tasks concurrently in separate threads, while keeping track of how long each iteration takes; there is nothing surprising here. Next, we will generate the corresponding data for the approximate counters, again varying the number of active threads across the iterations of the for loop, as follows:

# New approximate counters

def thread_increment(counter):
    counter.increment(1)

approx_counter_n_threads = []
approx_counter_times = []
for n_workers in range(1, 11):
    approx_counter_n_threads.append(n_workers)

    global_counter = LockedCounter()

    start = time.time()

    local_counters = [ApproximateCounter(global_counter)
                      for i in range(n_workers)]
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        for i in range(100):
            executor.map(thread_increment, local_counters)

    approx_counter_times.append(time.time() - start)

    print(f'Number of threads: {n_workers}')
    print(f'Final counter: {global_counter.get_value()}.')
    print('-' * 40)

Let's take some time to analyze the preceding code. First, we define a standalone thread_increment() function that takes in a counter and increments it by 1; this small helper lets us map the increment task across our local counters individually.

Again, we iterate through a for loop to analyze the performance of this new data structure with a changing number of active threads. Inside each iteration, we first initialize a LockedCounter object as our global counter, together with a list of local counters, which are instances of the ApproximateCounter class. All of them are associated with the same global counter (passed to their initialization method), as they all need to report to it.

Next, similar to how we have been scheduling tasks for multiple threads, we use a context manager to create a thread pool, inside which we distribute the tasks (incrementing the local counters) via a nested for loop. We loop 100 times so that the total number of increments matches the previous example (100 increments per counter, across n_workers counters), and so that the tasks are distributed across all of the local counters concurrently. We also print out the final value of the global counter in each iteration, to verify that our new data structure is working correctly.
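As a quick sanity check on the arithmetic behind those printed values (the concrete n_workers value below is just an illustration of one loop iteration):

```python
# Each of the n_workers local counters is incremented 100 times, and since
# 100 is a multiple of the threshold (10), every local count is fully
# flushed to the global counter by the end of the run.
n_workers = 4               # illustrative value from one loop iteration
increments_per_counter = 100
expected_final = n_workers * increments_per_counter
print(expected_final)       # 400, matching the 'Number of threads: 4' output
```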

Finally, in our main program, we will be plotting the data points that are generated from the two for loops, to compare the scalability of the two data structures via their respective performances:

# Chapter16/example4.py

import matplotlib.pyplot as plt

# Plotting

single_counter_line, = plt.plot(
    single_counter_n_threads,
    single_counter_times,
    c='blue',
    label='Single counter'
)
approx_counter_line, = plt.plot(
    approx_counter_n_threads,
    approx_counter_times,
    c='red',
    label='Approximate counter'
)
plt.legend(handles=[single_counter_line, approx_counter_line], loc=2)
plt.xlabel('Number of threads')
plt.ylabel('Time in seconds')
plt.show()

Run the script, and the first part of the output will be the individual final values of the global counter from our second for loop, as follows:

> python3 example4.py
Number of threads: 1
Final counter: 100.
----------------------------------------
Number of threads: 2
Final counter: 200.
----------------------------------------
Number of threads: 3
Final counter: 300.
----------------------------------------
Number of threads: 4
Final counter: 400.
----------------------------------------
Number of threads: 5
Final counter: 500.
----------------------------------------
Number of threads: 6
Final counter: 600.
----------------------------------------
Number of threads: 7
Final counter: 700.
----------------------------------------
Number of threads: 8
Final counter: 800.
----------------------------------------
Number of threads: 9
Final counter: 900.
----------------------------------------
Number of threads: 10
Final counter: 1000.
----------------------------------------

As you can see, the final values that we obtained from the global counters are all correct, proving that our data structure is working as intended. Additionally, you will obtain a graph similar to the following:

Scalability of single-lock counter and approximate counters

The blue line indicates the changes in speed of the single-lock counter data structure, while the red line indicates those of the approximate counter data structure. As you can see, even though the performance of the approximate counter does worsen somewhat as the number of threads increases (due to overheads such as creating individual local counters and distributing an increasing number of increment tasks), our new data structure is highly scalable, especially in comparison to our previous single-lock counter data structure.
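One caveat worth keeping in mind: because the counts in this example happen to be exact (100 is a multiple of the threshold), the output hides the approximation. In general, each local counter can hold up to threshold - 1 unreported increments at any moment, so a read of the global counter can momentarily undercount. A hypothetical back-of-the-envelope bound (the counter sizes below are illustrative, not from example4.py):

```python
# Maximum momentary undercount of the global counter: each local counter
# may hold up to (threshold - 1) increments that it has not yet flushed.
threshold = 10
n_local_counters = 8        # hypothetical number of local counters
max_undercount = n_local_counters * (threshold - 1)
print(max_undercount)       # 72
```

Raising the threshold reduces contention on the global lock but widens this worst-case gap, which is the central trade-off of the approximate counter design.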
