Threaded crawler

Now we will extend the sequential crawler to download the web pages in parallel. Note that if misused, a threaded crawler could request content too fast and overload a web server or cause your IP address to be blocked. To avoid this, our crawlers will have a delay flag to set the minimum number of seconds between requests to the same domain.

The Alexa list example used in this chapter covers 1 million separate domains, so this problem does not apply here. However, a delay of at least one second between downloads should be used when crawling many web pages from a single domain in the future.
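
As an illustration of how such a per-domain delay can be enforced, here is a minimal sketch of a throttle that records when each domain was last requested and sleeps if the next request would arrive too soon. The Throttle name and its wait() method are illustrative assumptions for this sketch, not necessarily the exact mechanism used inside the downloader:

import time
try:
    from urlparse import urlparse # Python 2
except ImportError:
    from urllib.parse import urlparse # Python 3

class Throttle:
    """Sleep between requests to the same domain (illustrative sketch)"""
    def __init__(self, delay):
        # minimum number of seconds between downloads to the same domain
        self.delay = delay
        # timestamp of the last request to each domain
        self.domains = {}

    def wait(self, url):
        domain = urlparse(url).netloc
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (time.time() - last_accessed)
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = time.time()

The crawler would then call wait(url) immediately before each download so that requests to the same domain are spaced out by at least delay seconds.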

How threads and processes work

Here is a diagram of a process containing multiple threads of execution:

When a Python script or other computer program is run, a process is created containing the code and state. These processes are executed by the CPU(s) of a computer. However, each CPU can only execute a single process at a time and will quickly switch between them to give the impression that multiple programs are running simultaneously. Similarly, within a process, the program execution can switch between multiple threads, with each thread executing different parts of the program. This means that when one thread is waiting for a web page to download, the process can switch and execute another thread to avoid wasting CPU time. So, using all the resources of our computer to download data as fast as possible requires distributing our downloads across multiple threads and processes.
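
As a quick illustration (separate from the crawler itself), the following sketch starts five threads that each simulate a one-second download with time.sleep(); because the waits overlap, the total elapsed time is close to one second rather than five. The fake_download function and the example URLs are purely illustrative:

import threading
import time

def fake_download(url):
    # simulate waiting one second for a response
    time.sleep(1)
    print('downloaded %s' % url)

start = time.time()
threads = []
for i in range(5):
    thread = threading.Thread(target=fake_download, args=('http://example.com/%d' % i,))
    thread.start()
    threads.append(thread)
for thread in threads:
    thread.join()
print('Elapsed: %.1f seconds' % (time.time() - start))

Running these downloads sequentially would instead take around five seconds, which is the effect we want to exploit when downloading real web pages.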

Implementation

Fortunately, Python makes threading relatively straightforward. This means we can keep a similar queuing structure to the link crawler developed in Chapter 1, Introduction to Web Scraping, but start the crawl loop in multiple threads to download these links in parallel. Here is a modified version of the start of the link crawler with the crawl loop moved into a function:

import time
import threading
from downloader import Downloader
SLEEP_TIME = 1

def threaded_crawler(..., max_threads=10):
    # the queue of URLs that still need to be crawled
    crawl_queue = [seed_url]
    # the URLs that have been seen
    seen = set([seed_url])
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            try:
                url = crawl_queue.pop()
            except IndexError:
                # crawl queue is empty
                break
            else:
                html = D(url)
                ...

Here is the remainder of the threaded_crawler function to start process_queue in multiple threads and wait until they have completed:

    threads = []
    while threads or crawl_queue:
        # the crawl is still active
        for thread in threads:
            if not thread.is_alive():
                # remove the stopped threads
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue:
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            # set daemon so main thread can exit when receives ctrl-c
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)

        # all threads have been processed
        # sleep temporarily so CPU can focus execution elsewhere
        time.sleep(SLEEP_TIME)

The loop in the preceding code will keep creating threads while there are URLs to crawl, until it reaches the maximum number of threads set. During the crawl, threads may also shut down prematurely when there are currently no more URLs in the queue. For example, consider a situation where there are two threads and two URLs to download. When the first thread finishes its download, the crawl queue is empty, so that thread exits. However, the second thread may then complete its download and discover additional URLs to download. The thread loop will then notice that there are still more URLs to download and that the maximum number of threads has not been reached, so it will create a new download thread.

Now, let's test the performance of this threaded version of the link crawler with the following command:

$ time python threaded_test.py 5
...
4m50.465s

Since there are five threads, downloading is approximately five times faster! Further analysis of the threaded performance will be covered in the Performance section.

Cross-process crawler

To improve the performance further, the threaded example can be extended to support multiple processes. Currently, the crawl queue is held in local memory, which means other processes cannot contribute to the same crawl. To address this, the crawl queue will be transferred to MongoDB. Storing the queue independently means that even crawlers on separate servers could collaborate on the same crawl.

Note that for more robust queuing, a dedicated message passing tool such as Celery should be considered; however, MongoDB will be reused here to minimize the number of technologies introduced. Here is an implementation of the new MongoDB-backed queue:

from datetime import datetime, timedelta
from pymongo import MongoClient, errors

class MongoQueue:
    # possible states of a download
    OUTSTANDING, PROCESSING, COMPLETE = range(3)

    def __init__(self, client=None, timeout=300):
        self.client = MongoClient() if client is None else client
        self.db = self.client.cache
        self.timeout = timeout

    def __nonzero__(self):
        """Returns True if there are more jobs to process
        """
        record = self.db.crawl_queue.find_one(
            {'status': {'$ne': self.COMPLETE}}
        )
        return True if record else False

    def push(self, url):
        """Add new URL to queue if does not exist
        """
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.OUTSTANDING})
        except errors.DuplicateKeyError as e:
            pass # this is already in the queue

    def pop(self):
        """Get an outstanding URL from the queue and set its status to processing. If the queue is empty a KeyError exception is raised.
        """
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.OUTSTANDING},
            update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
        )
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def complete(self, url):
        self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
        """Release stalled jobs
        """
        record = self.db.crawl_queue.find_and_modify(
            query={
                'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                'status': {'$ne': self.COMPLETE}
            },
            update={'$set': {'status': self.OUTSTANDING}}
        )
        if record:
            print 'Released:', record['_id']

The queue in the preceding code defines three states: OUTSTANDING, PROCESSING, and COMPLETE. When a new URL is added, its state is OUTSTANDING; when a URL is popped from the queue for downloading, its state becomes PROCESSING; and when the downloading is complete, its state becomes COMPLETE. Much of this implementation is concerned with handling the case where a URL is popped from the queue but its processing is never completed, for example, because the process handling it was terminated. To avoid losing those URLs, this class takes a timeout argument, which is set to 300 seconds by default. In the repair() method, if the processing of a URL is found to have taken longer than this timeout, we assume that there has been an error and the URL's state is returned to OUTSTANDING so it can be processed again.
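
To make these state transitions concrete, here is a short usage sketch of the queue; it assumes a local MongoDB instance is running and uses a placeholder URL:

queue = MongoQueue(timeout=300)
queue.push('http://example.webscraping.com')  # inserted with the OUTSTANDING state
url = queue.pop()                             # state set to PROCESSING and timestamped
# ... download and parse the page here ...
queue.complete(url)                           # state set to COMPLETE
# If a crawler dies after pop(), the record stays in PROCESSING. The next time
# pop() finds no OUTSTANDING records, it calls repair(), which resets a stalled
# record older than the timeout back to OUTSTANDING so it can be retried.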

Some minor changes are required to the threaded crawler to support this new queue type, as shown here:

def threaded_crawler(...):
    ...
    # the queue of URL's that still need to be crawled
    crawl_queue = MongoQueue()
    crawl_queue.push(seed_url)

    def process_queue():
        while True:
            # pop a url from the queue, which marks it as being processed
            try:
                url = crawl_queue.pop()
            except KeyError:
                # currently no urls to process
                break
            else:
                ...
                crawl_queue.complete(url)

The first change is replacing the in-memory crawl queue with the new MongoDB-based queue, named MongoQueue. This queue handles duplicate URLs internally, so the seen variable is no longer required. Finally, the complete() method is called after processing a URL to record that it has been successfully parsed.

This updated version of the threaded crawler can then be started in multiple processes with this snippet:

import multiprocessing

def process_link_crawler(args, **kwargs):
    num_cpus = multiprocessing.cpu_count()
    print 'Starting {} processes'.format(num_cpus)
    processes = []
    for i in range(num_cpus):
        p = multiprocessing.Process(target=threaded_crawler, 
            args=[args], kwargs=kwargs)
        p.start()
        processes.append(p)
    # wait for processes to complete
    for p in processes:
        p.join()

This structure might look familiar now because the multiprocessing module follows a similar interface to the threading module used earlier. This code simply finds the number of CPUs available, starts the threaded crawler in a new process for each, and then waits for all the processes to complete execution.
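
For reference, a call to this function might look like the following snippet. The keyword argument names are assumptions based on the Downloader parameters shown earlier, since the full threaded_crawler signature was elided; the __main__ guard prevents multiprocessing from re-running the spawning code in child processes on platforms that use the spawn start method:

# hypothetical invocation; the seed URL and keyword arguments are placeholders
# that assume threaded_crawler accepts them and forwards them to Downloader
if __name__ == '__main__':
    process_link_crawler('http://example.webscraping.com',
                         delay=1, num_retries=1, max_threads=10, timeout=60)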

The interface to process_link_crawler is the same as that of the threaded crawler, and the script is available at https://bitbucket.org/wswp/code/src/tip/chapter04/process_test.py. Now, let's test the performance of this multiprocess version of the link crawler using the following command:

$ time python process_test.py 5
Starting 2 processes
...
2m5.405s

As detected by the script, the server on which this was tested has two CPUs, and the running time is approximately half that of the previous threaded crawler running in a single process. In the next section, we will further investigate the relative performance of these three approaches.
