We saw the power of the simple print statement in the previous example. In a similar way, other simple techniques can be used to debug programs without resorting to a debugger.
Debugging can be thought of as a step-wise process of exclusion until the programmer arrives at the truth—the cause of the bug. It essentially involves the following steps:
In this section, we will look at some simple debugging techniques one by one using examples. We will start with the example of a word searcher program that looks for lines containing a specific word in a list of files—and appends and returns the lines in a list.
Here is the listing of the code for the word searcher program:
import os
import glob

def grep_word(word, filenames):
    """ Open the given files and look for a specific word.
    Append lines containing word to a list and return it """

    lines, words = [], []

    for filename in filenames:
        print('Processing',filename)
        lines += open(filename).readlines()

    word = word.lower()
    for line in lines:
        if word in line.lower():
            lines.append(line.strip())

    # Now sort the list according to length of lines
    return sorted(words, key=len)
You may have noticed a subtle bug in the preceding code—it appends to the wrong list. It reads from the list "lines," and appends to the same list, which will cause the list to grow forever; the program will go into an infinite loop when it encounters even a single line containing the given word.
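The runaway growth can be reproduced safely in isolation. The following is a minimal sketch, not part of the original program: the count_appends helper and its limit parameter are hypothetical, and the limit exists only so that the demonstration terminates.

```python
def count_appends(lines, word, limit=20):
    """Mimic the buggy loop, but stop after `limit` appends."""
    appended = 0
    for line in lines:
        if word in line.lower():
            # Appending to the list being iterated over: every appended
            # line also matches, so the loop never runs out of items
            lines.append(line.strip())
            appended += 1
            if appended >= limit:
                # Safety cap (hypothetical, not in the original code)
                break
    return appended


lines = ["no match here\n", "two lines\n"]
print(count_appends(lines, "lines"))  # 20: the cap was reached
print(len(lines))                     # 22: one match produced 20 copies
```

A single matching line is enough to hit the cap, which is why the real program never returns.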
Let's run the program on the current directory:
>>> grep_word('lines', glob.glob('*.py'))
(hangs)
On a good day, you may find this bug easily. On a bad day, you may be stuck on it for a while, not noticing that the list being read from is also the one being appended to.
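Here are a few things that you can do:
- Since the code hangs and there are two loops, find out which loop causes the problem by placing a call to the sys.exit function between them, which will cause the interpreter to exit at that point.
- A sys.exit call can never be missed, of course.

The code is rewritten as follows to insert a specific sys.exit(…) call between the two loops: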
import os
import sys
import glob

def grep_word(word, filenames):
    """ Open the given files and look for a specific word.
    Append lines containing word to a list and return it """

    lines, words = [], []

    for filename in filenames:
        print('Processing',filename)
        lines += open(filename).readlines()

    sys.exit('Exiting after first loop')

    word = word.lower()
    for line in lines:
        if word in line.lower():
            lines.append(line.strip())

    # Now sort the list according to length of lines
    return sorted(words, key=len)
When trying it out a second time, we get this output:
>>> grep_word('lines', glob.glob('*.py'))
Exiting after first loop
Now it's pretty clear that the problem is not in the first loop. You can now proceed to debug the second loop (we are assuming that you are totally blind to the wrong variable usage, so you are figuring out the issue the hard way, by debugging).
Whenever you suspect a block of code inside a loop to be causing a bug, there are a few tricks to debug this and confirm your suspicion. These include the following:
- Skip the suspect block with a continue statement, moving the continue statement until you identify the specific block of code that is causing the issue.
- Make Python skip the block by placing it under if 0:. This is more useful if the block is a line of code or a few lines of code.

In this case, we will use the first trick to figure out the issue. Here is the modified code:
def grep_word(word, filenames):
    """ Open the given files and look for a specific word.
    Append lines containing word to a list and return it """

    lines, words = [], []

    for filename in filenames:
        print('Processing',filename)
        lines += open(filename).readlines()

    # Debugging steps
    # 1. sys.exit
    # sys.exit('Exiting after first loop')

    word = word.lower()
    for line in lines:
        if word in line.lower():
            # 2. continue: skip the suspect statement below
            continue
            lines.append(line.strip())

    # Now sort the list according to length of lines
    return sorted(words, key=len)

>>> grep_word('lines', glob.glob('*.py'))
[]
Now the code runs to completion, making it pretty clear that the problem is in the statement that was skipped in the processing step. Hopefully, from there it is just one step to figure out the bug, as the programmer has finally got their eye on the line causing the issue.
We have spent some time figuring out issues in the program by following a couple of debugging steps documented in the previous sections. With this, our hypothetical programmer was able to find the issue in the code and solve it.
Here is the final code with the bug fixed:
def grep_word(word, filenames):
    """ Open the given files and look for a specific word.
    Append lines containing word to a list and return it """

    lines, words = [], []

    for filename in filenames:
        print('Processing',filename)
        lines += open(filename).readlines()

    word = word.lower()
    for line in lines:
        if word in line.lower():
            words.append(line.strip())

    # Now sort the list according to length of lines
    return sorted(words, key=len)
The output is as follows:
>>> grep_word('lines', glob.glob('*.py'))
['for line in lines:',
 'lines, words = [], []',
 '#lines.append(line.strip())',
 'lines += open(filename).readlines()',
 'Append lines containing word to a list and',
 'and return list of lines containing the word.',
 '# Now sort the list according to length of lines',
 "print('Lines => ', grep_word('lines', glob.glob('*.py')))"]
Let's summarize the simple debugging tricks that we've learned so far in this section, and also look at a few related tricks and techniques.
A programmer can skip code blocks that they suspect of causing a bug during debugging. If the block is inside a loop, this can be done by skipping execution with a continue statement. We've seen an example of this already.
If the block is outside of a loop, this can be done by using if 0, and moving the suspect code to the dependent block, as follows:
if 0:
    # Suspected code block
    perform_suspect_operation1(args1, args2, ...)
    perform_suspect_operation2(…)
If the bug disappears after this, then you're sure that the problem lies in the suspected block of code.
This trick has its own deficiency: it requires indenting blocks of code to the right, which must be indented back once the debugging is finished. Hence, it is not advised for anything more than five or six lines of code.
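As a small runnable illustration of the if 0 trick, consider the following sketch. The summarize function and its suspect division are hypothetical and stand in for whatever block is under suspicion.

```python
def summarize(values):
    """Return the total of values (the averaging step is disabled)."""
    total = sum(values)
    if 0:
        # Suspect block, skipped while debugging. Change to `if 1:`
        # to restore it. It would fail on an empty list.
        total = total / len(values)
    return total


print(summarize([]))         # 0: no ZeroDivisionError while skipped
print(summarize([1, 2, 3]))  # 6
```

With the block disabled the error disappears, which points at the division as the culprit.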
If you're in the middle of a hectic programming session, trying to figure out an elusive bug, and have already tried print statements, the debugger, and other approaches, a rather drastic but often fantastically useful approach is to stop the execution just before or at the suspected code path using a sys.exit call.
A sys.exit(<strategic message>) call stops the program dead in its tracks, so it can't be missed by the programmer. This is often very useful in the following scenarios:
- Exiting with sys.exit at the suspect point will allow you to pinpoint the problem. The programmer can then decide to fix the issue by correcting the input or variable processing code.
- When writing concurrent programs, wrong usage of resource locking or other issues can make it difficult to track bugs like deadlocks, race conditions, and others. Since debugging multithreaded or multiprocess programs via the debugger is very difficult, a simple technique is to put sys.exit in the suspect function after implementing the correct exception-handling code.
- Moving the sys.exit(<message>) line from one line of code to the next until you identify the problem can be used as a last resort.

In cases where you suspect the problem is not inside your function, but in a function that you are calling from your code, the following approach can be used.
Since the function is outside of your control, you can try and replace it with a wrapper function in a module where you have control.
For example, the following is generic code for processing serial JSON data. Let's assume that the programmer finds a bug with processing of certain data (maybe having a certain key-value pair), and suspects the external API to be the source of the bug. The bug may be that the API times out, returns a corrupt response, or in the worst case, causes a crash:
import external_api

def process_data(data):
    """ Process data using external API """

    # Clean up data (local function)
    data = clean_up(data)
    # Drop duplicates from data (local function)
    data = drop_duplicates(data)

    # Process line by line JSON
    for json_elem in data:
        # Bug ?
        external_api.process(json_elem)
One way to verify this is to dummy or fake the API for the specific ranges or values of the data. In this case, it can be done by creating a wrapper function as follows:
def process(json_data, skey='suspect_key',svalue='suspect_value'):
    """ Fake the external API except for the suspect key & value """

    # Assume each JSON element maps to a Python dictionary
    for json_elem in json_data:
        skip = False

        for key in json_elem:
            if key == skey:
                if json_elem[key] == svalue:
                    # Suspect key,value combination - don't process
                    # this JSON element
                    skip = True
                    break

        # Pass on to the API
        if not skip:
            external_api.process(json_elem)

def process_data(data):
    """ Process data using external API """

    # Clean up data (local function)
    data = clean_up(data)
    # Drop duplicates from data (local function)
    data = drop_duplicates(data)

    # Process line by line JSON using local wrapper
    process(data)
If your suspicion is indeed correct, this will cause the problem to disappear. You can then use this as a test code, and communicate with the stakeholders of the external API to get the problem fixed, or write code to make sure that the problem key-value pair is skipped in data sent to the API.
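To try the wrapper idea without access to a real external API, a stand-in can be used. The following is a self-contained sketch under stated assumptions: external_process is a hypothetical replacement for external_api.process, and the "status"/"legacy" key-value pair is invented for illustration. Unlike the wrapper shown earlier, this variant also returns the skipped elements so they can be inspected.

```python
def external_process(elem):
    """Hypothetical stand-in for external_api.process.

    It fails on one specific key-value pair, which is the kind of
    behavior we suspect in the real API.
    """
    if elem.get("status") == "legacy":
        raise RuntimeError("external API crashed")
    return elem["id"]


def process(json_data, skey="status", svalue="legacy"):
    """Call the API for every element except the suspect pair."""
    processed, skipped = [], []
    for elem in json_data:
        if elem.get(skey) == svalue:
            # Suspect combination: keep it aside instead of sending it
            skipped.append(elem)
            continue
        processed.append(external_process(elem))
    return processed, skipped


data = [{"id": 1, "status": "new"}, {"id": 2, "status": "legacy"}, {"id": 3}]
processed, skipped = process(data)
print(processed)  # [1, 3]
print(skipped)    # [{'id': 2, 'status': 'legacy'}]
```

The skipped list doubles as a ready-made reproduction case to send to the maintainers of the API.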
In modern web application programming, you are never too far away from a blocking I/O call in your program. This can be a simple URL request, a slightly involved external API request, or maybe a costly database query. Such calls can be sources of bugs.
You may find either of the following situations:
When you encounter problems with costly I/O, replicating them can often be a problem. This is because of the following reasons:
A common technique that is very useful in these cases is to save the return data of these APIs/functions, and then mock the functions by using their return data to replace the functions/APIs themselves. This is an approach similar to mock testing, but it is used in the context of debugging.
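The record-and-replay idea can be sketched with the standard library's unittest.mock module. Everything named here is hypothetical: ListingAPI.search stands in for a slow or unreliable call, and the saved payload is made-up sample data that would normally come from an earlier successful call.

```python
import json
from unittest import mock


class ListingAPI:
    """Hypothetical client whose call is slow or unreliable."""

    @staticmethod
    def search(name):
        raise ConnectionError("network unavailable")


def count_listings(name):
    """Code being debugged; it depends on the external call."""
    return len(ListingAPI.search(name)["results"])


# Return data saved from an earlier call (made-up sample)
saved = json.loads('{"results": [{"site": "a"}, {"site": "b"}]}')

# Replace the call with its saved return data while debugging
with mock.patch.object(ListingAPI, "search", return_value=saved):
    print(count_listings("Acme"))  # 2
```

Outside the with block the original function is restored, so the mock cannot leak into the rest of the program.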
Let's look at an example of an API that returns business listings on websites, given a business address including details like its name, street address, city, and so on. The code looks like this:
import requests
import config

search_api = 'http://api.%(site)s/listings/search'

def get_api_key(site):
    """ Return API key for a site """

    # Assumes the configuration is available via a config module
    return config.get_key(site)

def api_search(address, site='yellowpages.com'):
    """ API to search for a given business address
    on a site and return results """

    req_params = {}
    req_params.update({
        'key': get_api_key(site),
        'term': address['name'],
        'searchloc': '{0}, {1}, {2}'.format(address['street'],
                                            address['city'],
                                            address['state'])})
    return requests.post(search_api % locals(),
                         params=req_params)

def parse_listings(addresses, sites):
    """ Given a list of addresses, fetch their listings
    for a given set of sites, process them """

    for site in sites:
        for address in addresses:
            listing = api_search(address, site)
            # Process the listing
            process_listings(listing, site)

def process_listings(listing, site):
    """ Process a listing and analyze it """

    # Some heavy computational code
    # whose details we are not interested in.
    pass
Note that in this last piece of code, the actual work is being done in the process_listings function, the code for which is not shown, as the example is illustrative.
Let's say you are trying to debug this function. However, due to a delay or error in the API calls, you find you are wasting a lot of valuable time in fetching the listings themselves. What are some of the techniques that you can use to avoid this dependency? Here are a few things that you can do:
- Cache the return value of the api_search function via a caching or memoize pattern, so that calls after the first one return data from memory.

We will look at each of these in turn.
In this technique, you construct a filename using unique keys from the input data. If a matching file exists on disk, it is opened and the data is returned; otherwise, the call is made and the data is written. This can be achieved by using a file caching decorator as the following code illustrates:
import hashlib
import json
import os

def unique_key(address, site):
    """ Return a unique key for the given arguments """

    return hashlib.md5(''.join((address['name'],
                                address['street'],
                                address['city'],
                                site)).encode('utf-8')).hexdigest()

def filecache(func):
    """ A file caching decorator """

    def wrapper(*args, **kwargs):
        # Construct a unique cache filename
        filename = unique_key(args[0], args[1]) + '.data'

        if os.path.isfile(filename):
            print('=>from file<=')
            # Return cached data from file
            with open(filename) as cache_file:
                return json.load(cache_file)

        # Else compute and write into file
        result = func(*args, **kwargs)
        with open(filename, 'w') as cache_file:
            json.dump(result, cache_file)
        return result

    return wrapper

@filecache
def api_search(address, site='yellowpages.com'):
    """ API to search for a given business address
    on a site and return results """

    req_params = {}
    req_params.update({
        'key': get_api_key(site),
        'term': address['name'],
        'searchloc': '{0}, {1}, {2}'.format(address['street'],
                                            address['city'],
                                            address['state'])})
    # Return the decoded body rather than the Response object, which
    # json.dump cannot serialize (assumes the API responds with JSON)
    return requests.post(search_api % locals(),
                         params=req_params).json()
Here's how the preceding code works:
- The api_search function is decorated with filecache.
- filecache uses unique_key as the function to calculate the unique filename for storing the results of an API call. In this case, the unique_key function uses the hash of a combination of the business name, street, and city, plus the site queried, to build the unique value.

This works pretty well in most cases. Most data is loaded just once, and on further calls, returned from the file cache. However, this suffers from the problem of stale data, as once the file is created, the data is always returned from it. Meanwhile, the data on the server may have changed.
This can be solved by using an in-memory key-value store and saving the data there instead of in files on disk. One can use well-known key-value stores such as Memcached, MongoDB, or Redis for this purpose. In the following example, we'll show you how to replace the filecache decorator with a memory cached decorator using Redis.
In this technique, a unique in-memory cache key is constructed using unique values from the input arguments. If the cache is found on the cache store by querying using the key, its value is returned from the store; or else the call is made and the cache is written. To ensure that data is not too stale, a fixed time-to-live (TTL) is used. We use Redis as the cache store engine:
import json
from redis import StrictRedis

def memoize(func, ttl=86400):
    """ A memory caching decorator """

    # Local redis as in-memory cache
    cache = StrictRedis(host='localhost', port=6379)

    def wrapper(*args, **kwargs):
        # Construct a unique key
        key = unique_key(args[0], args[1])
        # Check if it's in redis
        cached_data = cache.get(key)
        if cached_data is not None:
            print('=>from cache<=')
            return json.loads(cached_data)
        # Else calculate and store while putting a TTL (in seconds)
        result = func(*args, **kwargs)
        cache.set(key, json.dumps(result), ex=ttl)
        return result

    return wrapper
The only thing that changes in the rest of the code is that we replace the filecache decorator with the memoize one:
@memoize
def api_search(address, site='yellowpages.com'):
    """ API to search for a given business address
    on a site and return results """

    req_params = {}
    req_params.update({
        'key': get_api_key(site),
        'term': address['name'],
        'searchloc': '{0}, {1}, {2}'.format(address['street'],
                                            address['city'],
                                            address['state'])})
    # Return the decoded body rather than the Response object, which
    # json.dumps cannot serialize (assumes the API responds with JSON)
    return requests.post(search_api % locals(),
                         params=req_params).json()
The advantages of this version over the previous one are as follows:
There are a few other techniques for mocking external API calls and similar dependencies. Some of these are listed as follows:
- Using a StringIO object in Python to read/write data, instead of using a file. For example, the filecache or memoize decorators can be easily modified to use a StringIO object.
- Redirecting calls for the API host to the local machine (the 127.0.0.1 IP address) by editing the system's hosts file, adding an entry for the host in question, and putting its IP as 127.0.0.1. The call to localhost can always return a standard (canned) response.

For example, on Linux and other POSIX systems, you can add a line like this in the /etc/hosts file:
# Only for testing: comment out after that!
127.0.0.1 api.website.com
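Something still has to answer on the local machine once the host name points to it. The following is a minimal sketch of such a responder using only the standard library. The CANNED payload and the /listings/search path are made up for illustration, and the server binds to an ephemeral port, whereas a real redirect through /etc/hosts would need the port the client actually connects to (typically 80, which requires elevated privileges).

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

# Made-up canned payload returned for every request
CANNED = {"listings": [{"name": "Acme", "city": "Springfield"}]}


class CannedHandler(BaseHTTPRequestHandler):
    """Answer every GET or POST with the same canned JSON body."""

    def _reply(self):
        body = json.dumps(CANNED).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        self._reply()

    def do_POST(self):
        # Drain the request body before replying
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)
        self._reply()

    def log_message(self, *args):
        pass  # keep the console quiet while debugging


def start_canned_server(port=0):
    """Start the server in a daemon thread; port 0 picks a free port."""
    server = HTTPServer(("127.0.0.1", port), CannedHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def fetch_json(url):
    """Fetch a URL directly, bypassing any configured proxy."""
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    with opener.open(url) as resp:
        return json.load(resp)


server = start_canned_server()
url = "http://127.0.0.1:%d/listings/search" % server.server_port
print(fetch_json(url))
server.shutdown()
server.server_close()
```

Because every request gets the same body, the code under debugging sees stable input regardless of what the real service would return.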
Another technique, which is mostly useful for performance testing and debugging, is to feed functions with data that is similar, but not the same as the original data.
Let's say, for example, that you are working on an application that works with patient/doctor data for patients under a specific insurance scheme (say Medicare/Medicaid in the US, ESI in India) to analyze and find out patterns such as common ailments, top 10 health issues in terms of government expenses, and so on.
Let's say that your application is expected to load and analyze tens of thousands of rows of patient data from a database at one time, which is expected to scale to 1-2 million under peak load. You want to debug the application and find out performance characteristics under such a load, but you don't have any real data, as the data is in the collection stage.
In such scenarios, libraries or functions that generate and return mock data are very useful. In this section, we will use a third-party Python library to accomplish this.
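Let's assume that for a patient we need the following basic fields:
- Name
- Age
- Gender
- Health condition
- Doctor's name
- Blood group
- Whether the patient is insured
- Date of last visit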
The schematics library in Python provides a way to generate such data structures using simple types, which can then be validated, transformed, and also mocked.
The schematics library is installable via pip using the following command:
$ pip install schematics
Generating a model of a person with just their name and age is as simple as writing a class in schematics:
from schematics.models import Model
from schematics.types import StringType, DecimalType

class Person(Model):
    name = StringType()
    age = DecimalType()
To generate mock data, get a mock object from the model and convert it to a primitive:
>>> Person.get_mock_object().to_primitive()
{'age': u'12', 'name': u'Y7bnqRt'}
>>> Person.get_mock_object().to_primitive()
{'age': u'1', 'name': u'xyrh40EO3'}
One can create custom types using schematics. For the Patient model, for example, let's say that we are only interested in the age group 18-80, so we need to return age data in that range.
The following custom type does that for us:
import random
from schematics.types import IntType

class AgeType(IntType):
    """ An age type for schematics """

    def __init__(self, **kwargs):
        kwargs['default'] = 18
        IntType.__init__(self, **kwargs)

    def to_primitive(self, value, context=None):
        # randrange excludes the upper bound, so ages fall in 18..79
        return random.randrange(18, 80)
Also, since the names returned by the schematics library are just random strings, they have some room for improvement. The following NameType class improves upon it by returning names containing a clever mix of vowels and consonants:
import string
import random
from schematics.types import StringType

class NameType(StringType):
    """ A schematics custom name type """

    vowels='aeiou'
    consonants = ''.join(set(string.ascii_lowercase) - set(vowels))

    def __init__(self, **kwargs):
        kwargs['default'] = ''
        StringType.__init__(self, **kwargs)

    def get_name(self):
        """ A random name generator which generates
        names by clever placing of vowels and consonants """

        items = ['']*4

        # Consonants at positions 0 and 2, vowels at positions 1 and 3
        items[0] = random.choice(self.consonants)
        items[2] = random.choice(self.consonants)

        for i in (1, 3):
            items[i] = random.choice(self.vowels)

        return ''.join(items).capitalize()

    def to_primitive(self, value, context=None):
        return self.get_name()
When combining both of these new types, our Person class looks much better when returning mock data:
class Person(Model):
    name = NameType()
    age = AgeType()

>>> Person.get_mock_object().to_primitive()
{'age': 36, 'name': 'Qixi'}
>>> Person.get_mock_object().to_primitive()
{'age': 58, 'name': 'Ziru'}
>>> Person.get_mock_object().to_primitive()
{'age': 32, 'name': 'Zanu'}
In a similar way, it is rather easy to come up with a set of custom types and standard types to satisfy all the fields required for a Patient model:
import itertools
import random
from schematics.types import BaseType, StringType

class GenderType(BaseType):
    """ A gender type for schematics """

    def __init__(self, **kwargs):
        kwargs['choices'] = ['male','female']
        kwargs['default'] = 'male'
        BaseType.__init__(self, **kwargs)

class ConditionType(StringType):
    """ A type for a health condition """

    def __init__(self, **kwargs):
        kwargs['default'] = 'cardiac'
        StringType.__init__(self, **kwargs)

    def to_primitive(self, value, context=None):
        return random.choice(('cardiac',
                              'respiratory',
                              'nasal',
                              'gynec',
                              'urinal',
                              'lungs',
                              'thyroid',
                              'tumour'))

class BloodGroupType(StringType):
    """ A blood group type for schematics """

    def __init__(self, **kwargs):
        kwargs['default'] = 'AB+'
        StringType.__init__(self, **kwargs)

    def to_primitive(self, value, context=None):
        # Pick one (group, rhesus sign) pair and join it, e.g. 'O-'
        return ''.join(random.choice(list(itertools.product(['AB','A','O','B'],['+','-']))))
Now, combining all these with some standard types and default values into a Patient model, we get the following code:
from schematics.models import Model
from schematics.types import BooleanType, DateTimeType

class Patient(Model):
    """ A model class for patients """

    name = NameType()
    age = AgeType()
    gender = GenderType()
    condition = ConditionType()
    doctor = NameType()
    blood_group = BloodGroupType()
    insured = BooleanType(default=True)
    last_visit = DateTimeType(default='2000-01-01T13:30:30')
Now, creating random data of any size is as easy as invoking the get_mock_object method on the Patient class for any number n:
patients = map(lambda x: Patient.get_mock_object().to_primitive(), range(n))
For example, to create 10,000 random sets of patient data, we use the following:
>>> patients = map(lambda x: Patient.get_mock_object().to_primitive(), range(10000))
This data can be input to the processing functions as mock data until the real data is made available.
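If a third-party library is not available while debugging, a rough equivalent can be put together with the standard library alone. The following sketch is an assumption-laden stand-in, not the schematics approach: mock_patient and top_conditions are hypothetical helpers, only a few of the Patient fields are generated, and the list of conditions is copied from the ConditionType class above.

```python
import random
from collections import Counter

CONDITIONS = ('cardiac', 'respiratory', 'nasal', 'gynec',
              'urinal', 'lungs', 'thyroid', 'tumour')


def mock_patient(rng):
    """Return one random patient record as a plain dictionary."""
    return {
        'age': rng.randint(18, 80),  # inclusive on both ends
        'gender': rng.choice(('male', 'female')),
        'condition': rng.choice(CONDITIONS),
        'insured': True,
    }


def top_conditions(patients, n=3):
    """Example processing function: the n most common conditions."""
    return Counter(p['condition'] for p in patients).most_common(n)


rng = random.Random(42)  # fixed seed so debugging runs are repeatable
patients = [mock_patient(rng) for _ in range(10000)]
print(top_conditions(patients))
```

Seeding the generator is a deliberate choice here: the same mock data is produced on every run, so a bug seen once can be reproduced.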
Let's now move on from these simple tricks and techniques to something more involved, namely configuring logging in your applications.