Threading Together Conversations

As a first attempt at threading conversations, you might start with some basic string heuristics on the Subject header of a message and eventually get to the point where you’re inspecting senders, recipients, and date stamps in an attempt to piece things together. Fortunately, mail headers are more helpful than that: as you know from mbox: The Quick and Dirty on Unix Mailboxes, there are Message-ID, In-Reply-To, and References headers that can be used to extract conversations from the messages in a mailbox. A message threading algorithm commonly known as “jwz threading”[20] takes all of this into account and provides a reasonable approach to parsing out message threads. All of the specifics of the algorithm can be found online at http://www.jwz.org/doc/threading.html. The implementation we’ll be using is a fairly straightforward modification[21] of the one found in the Mail Trends project, which provides some other useful out-of-the-box tools. Given that no checkins for the project hosted on Google Code have occurred since early 2008, it’s unclear whether Mail Trends is still being actively maintained, but the project nonetheless provides a useful starting point for mail analysis, as evidenced by the salvaging of jwz threading.
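
To make the header relationships concrete, here is a minimal sketch, independent of the jwz implementation, that uses only the standard library to print the identifiers a threading algorithm chains together (the mbox filename is hypothetical):

# A minimal sketch (not part of the jwz implementation) that prints the
# headers a threading algorithm links together. 'enron.mbox' is a
# hypothetical filename.

import mailbox

mbox = mailbox.mbox('enron.mbox')
for msg in mbox:
    print 'Message-ID: ', msg['Message-ID']
    print 'In-Reply-To:', msg['In-Reply-To']  # the direct parent, if any
    print 'References: ', msg['References']   # the full ancestry, oldest first
    print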

Let’s go ahead and take a look at the overall workflow in Example 3-14, and then we’ll dive into a few more of the details.

Example 3-14. Creating discussion threads from mbox data via “jwz threading” (mailboxes__threading.py)

# -*- coding: utf-8 -*-

import sys
import couchdb
from mailboxes__jwzthreading import thread, Message
from mailboxes__CouchDBBulkReader import CouchDBBulkReader
from datetime import datetime as dt
from prettytable import PrettyTable

try:
    import jsonlib2 as json
except:
    import json

DB = sys.argv[1]
NUM_PROC_THREADS = 3 # Recommendation: ~1 thread/core

# Pull the data as efficiently as possible from CouchDB by using a thread
# pool to get as close as possible to being I/O bound.
# A single request to _all_docs would work, except that it's CPU-bound to a single core

now = dt.now()
print >> sys.stderr, 'Bulk reading from CouchDB...'
br = CouchDBBulkReader(DB, NUM_PROC_THREADS)
docs = br.read()
print >> sys.stderr, '\t%s' % (dt.now() - now, )

now = dt.now()
print >> sys.stderr, 'Threading in Memory...'
threads = thread([Message(doc) for doc in docs])
print >> sys.stderr, '\t%s' % (dt.now() - now, )

# Write out threading info into another database.
# Note that writes to CouchDB are serialized to append-only
# databases, so threading is unlikely to help here, and besides,
# the average document size is very small, making this a quick operation

now = dt.now()
print >> sys.stderr, 'Bulk writing to CouchDB...'
server = couchdb.Server('http://localhost:5984')
db = server.create(DB + '-threads')
results = db.update([{'thread': thread} for thread in threads],
                    all_or_nothing=True)
print >> sys.stderr, '\t%s' % (dt.now() - now, )

# Some basic stats

print >> sys.stderr, 'Total number of threads: %s' % len(threads)
print >> sys.stderr

# Compute (_id, len(thread)) tuples
# You could also compute thread length directly in CouchDB using a simple reducer function

stats = sorted(zip([result[1] for result in results], [len(t) for t in threads]),
               key=lambda x: x[1])

fields = ['Thread Id', 'Thread Length']
pt = PrettyTable(fields=fields)
[pt.set_field_align(f, 'l') for f in fields]

for stat in stats:
    pt.add_row(stat)

pt.printt()

The overall flow is that we bulk-read messages out of CouchDB, perform threading in memory, and then write out the thread information to a separate database where each thread is a document that contains references back to the original message’s content. Finally, some basic statistics about the length of discussion threads are printed out. You could just as easily use a map/reduce approach to calculating statistics about thread length (see Map/Reduce-Inspired Frequency Analysis). A discussion thread is a document that looks like Example 3-15.

Example 3-15. Sample threading results

{
   "_id": "b6d4f96224bc546acd34c405e6fff62f",
   "_rev": "1-1bf63dcdd94067ad647afe2ea3ade63c",
   "thread": [
       {
           "external_id": "24a30d62545728e26eb3311d63ae6e02",
           "subject": "FW: Sitara EOL Bridge Problem Today"
       },
       {
           "external_id": "bb808c9081912f5861295bf1d105dd02",
           "subject": "FW: Sitara EOL Bridge Problem Today"
       },
       {
           "external_id": "3b2c340c28782c8986737c35a332cd88",
           "subject": "FW: Sitara EOL Bridge Problem Today"
       }
   ]
}
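
As noted above, the thread-length statistics could just as easily come from a map/reduce view instead of being computed client-side. Here is a minimal sketch of that alternative, assuming the couchdb module and the Python view server are configured as in Map/Reduce-Inspired Frequency Analysis, and that the database name matches whatever Example 3-14 created (shown here as 'enron-threads'):

# A minimal sketch of computing thread lengths with a map/reduce view.
# Assumes the Python view server is configured as in
# Map/Reduce-Inspired Frequency Analysis; 'enron-threads' is the database
# created by Example 3-14 when run against the 'enron' database.

import couchdb
from couchdb.design import ViewDefinition

server = couchdb.Server('http://localhost:5984')
db = server['enron-threads']

def threadLengthMapper(doc):
    yield (doc['_id'], len(doc['thread']))

view = ViewDefinition('index', 'thread_lengths', threadLengthMapper,
                      language='python')
view.sync(db)

for row in db.view('index/thread_lengths'):
    print row.key, row.value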

The most interesting parts of Example 3-14 are the references to mailboxes__jwzthreading and CouchDBBulkReader. The API exposed by mailboxes__jwzthreading simply converts each of the message documents fetched from CouchDB into a slightly different form and then passes them into the threading algorithm, which takes care of the core work and returns a convenient JSON format we can ingest back into CouchDB. The details of mailboxes__jwzthreading are available at http://github.com/ptwobrussell/Mining-the-Social-Web/blob/master/python_code/mailboxes__jwzthreading.py.

The use of CouchDBBulkReader could have been omitted altogether in favor of a bulk read through the couchdb module, but CouchDBBulkReader provides a significant performance boost by using an internal thread pool to dispatch multiple requests to CouchDB at the same time. The underlying issue is that CouchDB only uses a single core for a single read or a single write request to the server. This might initially strike you as odd given all of the hubbub about how Erlang—the underlying implementation language—has such extensive support for concurrency, but it’s a design decision made by the maintainers of the project[22] for sensible reasons. The good news is that you can divvy out multiple requests to the server all at the same time and heat up multiple cores fairly easily. This is the approach taken by CouchDBBulkReader, which is introduced in Example 3-16. Briefly, an initial request is made that fetches only the ID values for every document in the database (a relatively fast operation since the amount of data that ends up being marshalled is very small), and then these IDs are sorted and chunked into equal parts. Each chunk is assigned to a thread that fetches the full document for that particular range.
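
The sketch below illustrates just the sort-and-chunk step with a handful of hypothetical document IDs; the full implementation follows in Example 3-16:

# A standalone illustration of the sort-and-chunk strategy described above,
# using hypothetical document IDs. Each bucket ultimately becomes one ranged
# request of the form:
#   /db/_all_docs?startkey="<first id>"&endkey="<last id>"&include_docs=true

from math import ceil

ids = sorted(['doc%02d' % i for i in range(10)])
num_threads = 3

partition_size = int(ceil(1.0 * len(ids) / num_threads))
buckets = [ids[i:i + partition_size]
           for i in range(0, len(ids), partition_size)]

for bucket in buckets:
    print '%s .. %s (%d ids)' % (bucket[0], bucket[-1], len(bucket))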

As a rule of thumb, don’t use more than one processing thread per core for the thread pool, and use performance-monitoring tools like top on a *nix system or Task Manager on Windows to track just how much you are taxing your system. Ideally, you’d see the beam.smp daemon process associated with CouchDB pegged at around 200% if you were working on a machine with two cores, which effectively makes you “CPU-bound,” but anything above 150% is still a substantial improvement. You should observe that the bulk read consumes nearly all of the overall runtime, while the in-memory threading and the writing of the thread documents back to CouchDB take virtually no time at all. An interesting exercise would be to port the threading algorithm into the map/reduce paradigm that CouchDB supports, or even to rewrite it in JavaScript.

Note

The threadpool package used in Example 3-16 is available via the usual means: easy_install threadpool.
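
In case the threadpool API is unfamiliar, the following is a minimal sketch of the small slice of it that Example 3-16 relies on: ThreadPool, makeRequests, putRequest, and wait. The work function and data are purely illustrative:

# A minimal sketch of the threadpool API used in Example 3-16: workers pull
# requests from a queue, and callbacks fire on the thread that calls wait()
# or poll(). The work function and inputs here are purely illustrative.

import threadpool

def work(chunk):                 # dispatched to a worker thread
    return sum(chunk)

results = []

def on_result(request, result):  # invoked as wait()/poll() collects results
    results.append(result)

pool = threadpool.ThreadPool(3)
for req in threadpool.makeRequests(work, [range(10), range(20), range(30)],
                                   on_result):
    pool.putRequest(req)
pool.wait()                      # block until all requests have completed

print results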

Example 3-16. Using a thread pool to maximize read throughput from CouchDB (mailboxes__CouchDBBulkReader.py)

# -*- coding: utf-8 -*-

from datetime import datetime as dt
from math import ceil
import sys
import httplib
import urllib
import time
import threadpool
try:
    import jsonlib2 as json
except:
    import json


class CouchDBBulkReader:

    def __init__(
        self,
        db,
        num_threads,
        host='localhost',
        port=5984,
        ):

        self.db = db
        self.num_threads = num_threads
        self.host = host
        self.port = port
        self.results = []

        id_buckets = self._getDocIds()
        self.pool = threadpool.ThreadPool(self.num_threads)
        requests = threadpool.makeRequests(self._getDocs, id_buckets,
                self._callback, self._errCallback)
        [self.pool.putRequest(req) for req in requests]
        self.pool.wait()

    def read(self):
        while True:
            try:
                time.sleep(0.5)
                self.pool.poll()
            except threadpool.NoResultsPending:
                return self.results
            except KeyboardInterrupt:
                print 'Keyboard interrupt. Exiting'
                sys.exit()
            finally:
                self.pool.joinAllDismissedWorkers()

    # Called to quickly get all of the document IDs, which can then be sorted and divvied out

    def _getDocIds(self):

        # Helper function to partition a list into consecutive slices
        # at the given indices

        def partition(alist, indices):
            return [alist[i:j] for (i, j) in zip([0] + indices, indices
                    + [None])][:-1]

        try:
            conn = httplib.HTTPConnection(self.host, self.port)
            conn.request('GET', '/%s/_all_docs' % (self.db, ))
            response = conn.getresponse()
            if response.status != 200:  #  OK
                print 'Unable to get docs: %s %s' % (response.status,
                        response.reason)
                sys.exit()

            ids = [i['id'] for i in json.loads(response.read())['rows']
                   if not i['id'].startswith('_')]
            ids.sort()
        finally:
            conn.close()

        partition_size = int(ceil(1.0 * len(ids) / self.num_threads))
        indices = []

        _len = len(ids)
        idx = 0
        while idx < _len:
            idx += partition_size
            indices.append(idx)

        return partition(ids, indices)

    def _getDocs(self, ids):
        try:
            (startkey, endkey) = (ids[0], ids[-1])
            conn = httplib.HTTPConnection(self.host, self.port)
            conn.request('GET',
                         '/%s/_all_docs?startkey="%s"&endkey="%s"&include_docs=true'
                          % (self.db, startkey, endkey))
            response = conn.getresponse()
            if response.status != 200:  #  OK
                print 'Unable to get docs: %s %s' % (response.status,
                        response.reason)
                sys.exit()
            return response.read()
        finally:
            conn.close()

    def _errCallback(self, request, result):
        print 'An Error occurred:', request, result
        sys.exit()

    def _callback(self, request, result):
        rows = json.loads(result)['rows']
        self.results.extend([row['doc'] for row in rows])

With tools in hand to compute discussion threads, let’s now turn back to the Enron data.

Look Who’s Talking

Running the code in Example 3-14 creates a database that provides rapid access to message ID values grouped as discussion threads. With one database already containing the original messages themselves and another providing Lucene-backed keyword search capabilities, you can really begin to do some very interesting things. Let’s now revisit Raptor from the previous section by considering the task of computing the individual sets of email addresses associated with any discussion thread in which Raptor was mentioned, or, to put it another way, the various sets of people involved in discussions using this term. The resulting data structure we want to get to would look something like what’s shown in Example 3-17.

Example 3-17. Ideal results from threading together discussions

{
    "participants" : [  "[email protected]",
                        "[email protected]",
                        ...
                     ],
    "message_ids"  : [  "id1",
                        "id2",
                        ...
                     ],
    "subject"      : "subject"
}

The approach we’ll take involves the following three steps:

  • Query couchdb-lucene for message IDs associated with a term of interest such as Raptor.

  • Look up discussion threads associated with any of those message IDs.

  • Compute the unique set of email addresses that appear in any of the header fields associated with messages in any of the threads.

Example 3-18 recycles some previous code and demonstrates one possible implementation that ties it all together.

Example 3-18. A robust approach for threading together discussion threads from mbox data (mailboxes__participants_in_conversations.py)

# -*- coding: utf-8 -*-

import sys
import httplib
from urllib import quote
from urllib import urlencode
import json

DB = sys.argv[1]  # enron
QUERY = sys.argv[2]

# Query couchdb-lucene by_subject and by_content

message_ids_of_interest = []
for idx in ['by_subject', 'by_content']:

    try:
        conn = httplib.HTTPConnection('localhost', 5984)
        conn.request('GET', '/%s/_fti/_design/lucene/%s?q=%s&limit=50000' % (DB,
                     idx, quote(QUERY)))
        response = conn.getresponse()
        if response.status == 200:
            response_body = json.loads(response.read())
            message_ids_of_interest.extend([row['id'] for row in
                    response_body['rows']])
        else:
            print 'An error occurred fetching the response: %s %s' \
                % (response.status, response.reason)
            sys.exit()
    finally:
        conn.close()

# Remove any duplicates

message_ids_of_interest = list(set(message_ids_of_interest))

# Perform discussion thread filtering in memory. It's a relatively
# small amount of data

try:
    conn = httplib.HTTPConnection('localhost', 5984)
    conn.request('GET', '/%s/_all_docs?include_docs=true' % (DB + '-threads', ))
    response = conn.getresponse()
    if response.status != 200:  #  OK
        print 'Unable to get docs: %s %s' % (response.status, response.reason)
        sys.exit()

    threads = [dict([('thread_id', row['doc']['_id']), ('message_ids',
               [t['external_id'] for t in row['doc']['thread']])]) for row in
               json.loads(response.read())['rows']]
finally:
    conn.close()

# Find only the threads that have a message_id appearing in the list of message ids 
# fetched from the Lucene index

threads_of_interest = [t for t in threads for message_id in t['message_ids']
                       if message_id in message_ids_of_interest]

# Remove duplicates

seen = []
idx = 0
while idx < len(threads_of_interest):
    if threads_of_interest[idx]['thread_id'] in seen:
        threads_of_interest.pop(idx)
    else:
        seen.append(threads_of_interest[idx]['thread_id'])
        idx += 1

# Cull out message ids for threads of interest

message_ids_for_threads_of_interest = [t['message_ids'] for t in
                                       threads_of_interest]

# Flatten out the list of lists into just a list and remove duplicates

message_ids_for_threads_of_interest = list(set([message_id for message_ids in
        message_ids_for_threads_of_interest for message_id in message_ids]))

# Query CouchDB for the email addresses in various headers of interest using a bulk 
# request

try:
    conn = httplib.HTTPConnection('localhost', 5984)
    post_params = json.dumps({'keys': message_ids_for_threads_of_interest})
    conn.request('POST', '/%s/_all_docs?include_docs=true' % (DB, ), post_params)
    response = conn.getresponse()
    if response.status != 200:  #  OK
        print 'Unable to get docs: %s %s' % (response.status, response.reason)
        sys.exit()

    full_docs = [row['doc'] for row in json.loads(response.read())['rows']]
finally:
    conn.close()

# Finally, with full messages of interest on hand, parse out headers of interest
# and compute unique sets of email addresses for each thread by decorating
# threads_of_interest

for thread in threads_of_interest:
    participants = []
    for message_id in thread['message_ids']:
        doc = [d for d in full_docs if d['_id'] == message_id][0]

        try:
            participants.append(doc.get('From'))
            participants.extend(doc.get('To'))
            if doc.get('Cc'):
                participants.extend(doc.get('Cc'))
            if doc.get('Bcc'):
                participants.extend(doc.get('Bcc'))
        except:
            pass  # Maybe an X-To header, etc., as opposed to To?

    thread['participants'] = list(set(participants))
    thread['subject'] = doc['Subject']
print json.dumps(threads_of_interest, indent=4)

Sample output from the script is shown in Example 3-19.

Example 3-19. Sample results from threading discussions for a search query of “Raptor”

[
    {
        "thread_id": "b6d4f96224bc546acd34c405e6c471c5", 
        "participants": [
            "[email protected]", 
            "[email protected]"
        ], 
        "message_ids": [
            "24a30d62545728e26eb3311d63effb47"
        ], 
        "subject": "FW: Note on Valuation"
    }, 
    {
        "thread_id": "b6d4f96224bc546acd34c405e6dbc0d4", 
        "participants": [
            "[email protected]", 
            "[email protected]", 
            "[email protected]", 
            "[email protected]", 
            "[email protected]"
        ], 
        "message_ids": [
            "24a30d62545728e26eb3311d633cf6b3"
        ], 
        "subject": "Tax Accruals on the Raptor Companies"
    }, 
    {
        "thread_id": "b6d4f96224bc546acd34c405e6eb7adf", 
        "participants": [
            "[email protected]", 
            "[email protected]"
        ], 
        "message_ids": [
            "3b2c340c28782c8986737c35a357c6ae"
        ], 
        "subject": "FW: E-11 Raptor"
    },

    ... output truncated ...

    ]

In short, the script has provided some tools for determining who participated in what conversations based on a keyword heuristic. You could load an mbox file into a decent mail client and search out this information via a manual process, but the demonstrated approach is generally useful and could be adapted for many automated or semiautomated analyses.



[20] The “jwz” is a reference to its author, Jamie Zawinski.

[21] Namely, there were some modifications to make the code a little more object-oriented, the input/output formats were changed to consume our JSONified message objects, and the memory profile was dramatically decreased by considering only the few fields needed by the threading algorithm.

[22] Fundamentally, it’s unlikely that CouchDB will ever support utilizing multiple cores for a bulk write operation because of the way writes are serialized to disk. However, it’s not hard to imagine a patch that takes advantage of multiple cores for certain kinds of read operations, given that the underlying data structure is a tree, which inherently lends itself to being traversed at multiple nodes.
