As a first attempt at threading conversations, you might start with some basic string heuristics on the Subject header of the message and eventually get to the point where you’re inspecting senders, recipients, and date stamps in an attempt to piece things together. Fortunately, mail servers are slightly more sophisticated than you might think and, as you know from mbox: The Quick and Dirty on Unix Mailboxes, there are Message-ID, In-Reply-To, and References headers that can be used to extract conversations from messages in a mailbox. A message threading algorithm commonly known as “jwz threading”[20] takes all of this into account and provides a reasonable approach to parsing out message threads. All of the specifics of the algorithm can be found online at http://www.jwz.org/doc/threading.html. The implementation we’ll be using is a fairly straightforward modification[21] of the one found in the Mail Trends project, which provides some other useful out-of-the-box tools. Given that no check-ins to the project hosted on Google Code have occurred since early 2008, it’s unclear whether Mail Trends is being actively maintained anywhere, but the project nonetheless provides a useful starting point for mail analysis, as evidenced by the salvaging of jwz threading.
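The gist of those headers is easy to demonstrate: each reply carries an In-Reply-To header pointing at the Message-ID of its parent, so walking those links backward groups messages into threads. Here is a minimal, hypothetical sketch of that core idea in Python 3 (the real jwz algorithm does considerably more, handling missing parents, References chains, and subject-based grouping):

```python
# Group messages into threads by following In-Reply-To links back to a root.
# Sample data is hypothetical; this is only the kernel of the idea.

def build_threads(messages):
    """Return {root Message-ID: [Message-IDs in that thread]}."""
    by_id = {m['Message-ID']: m for m in messages}

    def find_root(msg):
        seen = set()
        # Walk parent links until we reach a message with no known parent
        while msg.get('In-Reply-To') in by_id and msg['Message-ID'] not in seen:
            seen.add(msg['Message-ID'])  # guard against reference cycles
            msg = by_id[msg['In-Reply-To']]
        return msg['Message-ID']

    threads = {}
    for m in messages:
        threads.setdefault(find_root(m), []).append(m['Message-ID'])
    return threads

msgs = [
    {'Message-ID': '<a>'},
    {'Message-ID': '<b>', 'In-Reply-To': '<a>'},
    {'Message-ID': '<c>', 'In-Reply-To': '<b>'},
    {'Message-ID': '<d>'},
]
print(build_threads(msgs))  # {'<a>': ['<a>', '<b>', '<c>'], '<d>': ['<d>']}
```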
Let’s go ahead and take a look at the overall workflow in Example 3-14, and then we’ll dive into a few more of the details.
Example 3-14. Creating discussion threads from mbox data via “jwz threading” (mailboxes_threading.py)
# -*- coding: utf-8 -*-

import sys
import couchdb
from mailboxes__jwzthreading import thread, Message
from mailboxes__CouchDBBulkReader import CouchDBBulkReader
from datetime import datetime as dt
from prettytable import PrettyTable

try:
    import jsonlib2 as json
except ImportError:
    import json

DB = sys.argv[1]
NUM_PROC_THREADS = 3  # Recommendation: ~1 thread/core

# Pull the data as efficiently as possible from CouchDB by using a thread
# pool to get as close as possible to being I/O bound.
# A single request to _all_docs works, except that it's CPU-bound to a
# single core.

now = dt.now()
print >> sys.stderr, 'Bulk reading from CouchDB...'
br = CouchDBBulkReader(DB, NUM_PROC_THREADS)
docs = br.read()
print >> sys.stderr, '\t%s' % (dt.now() - now, )

now = dt.now()
print >> sys.stderr, 'Threading in memory...'
threads = thread([Message(doc) for doc in docs])
print >> sys.stderr, '\t%s' % (dt.now() - now, )

# Write out the threading info into another database.
# Note that writes to CouchDB are serialized to an append-only
# database, so a thread pool is unlikely to help here; besides,
# the average document size is very small, making this a quick operation.

now = dt.now()
print >> sys.stderr, 'Bulk writing to CouchDB...'
server = couchdb.Server('http://localhost:5984')
db = server.create(DB + '-threads')
results = db.update([{'thread': t} for t in threads], all_or_nothing=True)
print >> sys.stderr, '\t%s' % (dt.now() - now, )

# Some basic stats

print >> sys.stderr, 'Total number of threads: %s' % len(threads)
print >> sys.stderr

# Compute (_id, len(thread)) tuples.
# You could also compute thread length directly in CouchDB with a simple
# reducer function.

stats = sorted(zip([result[1] for result in results],
                   [len(t) for t in threads]), key=lambda x: x[1])

fields = ['Thread Id', 'Thread Length']
pt = PrettyTable(fields=fields)
[pt.set_field_align(f, 'l') for f in fields]
for stat in stats:
    pt.add_row(stat)
pt.printt()
The overall flow is that we bulk-read messages out of CouchDB, perform the threading in memory, and then write the thread information out to a separate database in which each thread is a document containing references back to the original messages’ content. Finally, some basic statistics about the lengths of the discussion threads are printed out. You could just as easily use a map/reduce approach to calculate statistics about thread length (see Map/Reduce-Inspired Frequency Analysis). A discussion thread is a document that looks like Example 3-15.
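Given thread documents shaped like Example 3-15, the thread-length statistics amount to nothing more than a sort over (id, length) pairs. A short Python 3 sketch with hypothetical sample documents:

```python
# Compute (thread_id, thread_length) stats from thread documents shaped like
# the one in Example 3-15. The sample documents below are hypothetical.

thread_docs = [
    {'_id': 'thread1', 'thread': [{'external_id': 'm1'}, {'external_id': 'm2'}]},
    {'_id': 'thread2', 'thread': [{'external_id': 'm3'}]},
]

# Sort ascending by thread length, mirroring Example 3-14's table output
stats = sorted(((d['_id'], len(d['thread'])) for d in thread_docs),
               key=lambda pair: pair[1])

for thread_id, length in stats:
    print('%s\t%d' % (thread_id, length))
```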
Example 3-15. Sample threading results
{
    "_id": "b6d4f96224bc546acd34c405e6fff62f",
    "_rev": "1-1bf63dcdd94067ad647afe2ea3ade63c",
    "thread": [
        {
            "external_id": "24a30d62545728e26eb3311d63ae6e02",
            "subject": "FW: Sitara EOL Bridge Problem Today"
        },
        {
            "external_id": "bb808c9081912f5861295bf1d105dd02",
            "subject": "FW: Sitara EOL Bridge Problem Today"
        },
        {
            "external_id": "3b2c340c28782c8986737c35a332cd88",
            "subject": "FW: Sitara EOL Bridge Problem Today"
        }
    ]
}
The most interesting parts of this listing are the references to mailboxes__jwzthreading and CouchDBBulkReader. The API exposed by mailboxes__jwzthreading simply converts each of the message documents fetched from CouchDB into a slightly different form and then passes them into the threading algorithm, which takes care of the core work and returns a convenient JSON format we can ingest back into CouchDB. The details of mailboxes__jwzthreading are available at http://github.com/ptwobrussell/Mining-the-Social-Web/blob/master/python_code/mailboxes__jwzthreading.py.
The use of CouchDBBulkReader could have been omitted altogether in favor of a bulk read through the couchdb module, but CouchDBBulkReader provides a significant performance boost by using an internal thread pool to dispatch multiple requests to CouchDB at the same time. The underlying issue is that CouchDB uses only a single core for any single read or write request to the server. This might initially strike you as odd, given all of the hubbub about how Erlang (the underlying implementation language) has such extensive support for concurrency, but it’s a design decision made by the maintainers of the project[22] for sensible reasons. The good news is that you can divvy out multiple requests to the server all at the same time and heat up multiple cores fairly easily. This is the approach taken by CouchDBBulkReader, which is introduced in Example 3-16. Briefly, an initial request fetches only the ID values for every document in the database (a relatively fast operation, since the amount of data that ends up being marshalled is very small); these IDs are then sorted and chunked into equal parts, and each chunk is assigned to a thread that fetches the full documents for that particular range.
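The sort-and-chunk step can be sketched in a few lines. This is an illustrative Python 3 approximation (the helper name partition_ids is ours, not part of Example 3-16): each chunk’s first and last IDs become the startkey/endkey of one ranged _all_docs request.

```python
from math import ceil

def partition_ids(ids, num_threads):
    """Sort document IDs and split them into roughly equal contiguous chunks,
    one per worker thread."""
    ids = sorted(ids)
    size = ceil(len(ids) / num_threads)
    return [ids[i:i + size] for i in range(0, len(ids), size)]

# 10 hypothetical document IDs split across 3 workers
chunks = partition_ids(['doc%02d' % i for i in range(10)], 3)

# The (startkey, endkey) range each worker would request
print([(c[0], c[-1]) for c in chunks])
# [('doc00', 'doc03'), ('doc04', 'doc07'), ('doc08', 'doc09')]
```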
As a rule of thumb, don’t use more than one processing thread per core for the thread pool, and use performance-monitoring tools like top on a *nix system or Task Manager on Windows to track just how much you are taxing your system. Ideally, you’d see the beam.smp daemon process associated with CouchDB pegged at around 200% on a machine with two cores, which effectively makes you “CPU-bound,” but anything above 150% is still a substantial improvement. You should observe that the bulk read consumes nearly all of the time required to retrieve the data, while the actual threading and writing the thread documents back to CouchDB take virtually no time at all. An interesting exercise would be to port the threading algorithm into the map/reduce paradigm used by the couchdb module, or even to rewrite it in JavaScript.
The threadpool package used in Example 3-16 is available via the usual means: easy_install threadpool.
Example 3-16. Using a thread pool to maximize read throughput from CouchDB (mailboxes__CouchDBBulkReader.py)
# -*- coding: utf-8 -*-

import sys
from datetime import datetime as dt
from math import ceil
import httplib
import urllib
import time
import threadpool

try:
    import jsonlib2 as json
except ImportError:
    import json

class CouchDBBulkReader:

    def __init__(
        self,
        db,
        num_threads,
        host='localhost',
        port=5984,
        ):

        self.db = db
        self.num_threads = num_threads
        self.host = host
        self.port = port
        self.results = []

        id_buckets = self._getDocIds()
        self.pool = threadpool.ThreadPool(self.num_threads)
        requests = threadpool.makeRequests(self._getDocs, id_buckets,
                self._callback, self._errCallback)
        [self.pool.putRequest(req) for req in requests]
        self.pool.wait()

    def read(self):
        while True:
            try:
                time.sleep(0.5)
                self.pool.poll()
            except threadpool.NoResultsPending:
                return self.results
            except KeyboardInterrupt:
                print 'Keyboard interrupt. Exiting'
                sys.exit()
            finally:
                self.pool.joinAllDismissedWorkers()

    # Called to quickly get all of the document ids, which can be sorted
    # and divvied out

    def _getDocIds(self):

        # Helper function to tersely compute a list of indices that evenly
        # distribute the items in a list

        def partition(alist, indices):
            return [alist[i:j] for (i, j) in zip([0] + indices,
                    indices + [None])][:-1]

        try:
            conn = httplib.HTTPConnection(self.host, self.port)
            conn.request('GET', '/%s/_all_docs' % (self.db, ))
            response = conn.getresponse()
            if response.status != 200:  # OK
                print 'Unable to get docs: %s %s' % (response.status,
                        response.reason)
                sys.exit()
            ids = [i['id'] for i in json.loads(response.read())['rows']
                   if not i['id'].startswith('_')]
            ids.sort()
        finally:
            conn.close()

        partition_size = int(ceil(1.0 * len(ids) / self.num_threads))
        indices = []

        _len = len(ids)
        idx = 0
        while idx < _len:
            idx += partition_size
            indices.append(idx)

        return partition(ids, indices)

    def _getDocs(self, ids):
        try:
            (startkey, endkey) = (ids[0], ids[-1])
            conn = httplib.HTTPConnection(self.host, self.port)
            conn.request('GET',
                         '/%s/_all_docs?startkey="%s"&endkey="%s"&include_docs=true'
                         % (self.db, startkey, endkey))
            response = conn.getresponse()
            if response.status != 200:  # OK
                print 'Unable to get docs: %s %s' % (response.status,
                        response.reason)
                sys.exit()
            return response.read()
        finally:
            conn.close()

    def _errCallback(self, request, result):
        print 'An error occurred:', request, result
        sys.exit()

    def _callback(self, request, result):
        rows = json.loads(result)['rows']
        self.results.extend([row['doc'] for row in rows])
With tools in hand to compute discussion threads, let’s now turn back to the Enron data.
Running the code in Example 3-14 creates a database that provides rapid access to message ID values grouped as discussion threads. With a database already containing the original messages themselves and another Lucene-backed database providing keyword search capabilities, you can really begin to do some very interesting things. Let’s now revisit Raptor from the previous section by considering the task of computing the individual sets of email addresses associated with any discussion thread in which Raptor was mentioned, or, to put it another way, the various sets of people involved in discussions using this term. The resulting data structure we want to get to would look something like what’s shown in Example 3-17.
Example 3-17. Ideal results from threading together discussions
{
    "participants" : [ "[email protected]", "[email protected]", ... ],
    "message_ids" : [ "id1", "id2", ... ],
    "subject" : "subject"
}
The approach we’ll take involves the following three steps:
1. Query couchdb-lucene for message IDs associated with a term of interest, such as Raptor.
2. Look up the discussion threads associated with any of those message IDs.
3. Compute the unique set of email addresses that appear in any of the header fields associated with messages in any of those threads.
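Step 3 in miniature, with hypothetical sample messages (Example 3-18 performs the same set computation against real CouchDB documents, where headers may be missing or appear as X-To variants):

```python
# Collect the unique participants across the From/To/Cc/Bcc headers of a
# thread's messages. Sample data below is hypothetical (Python 3 sketch).

def participants_for_thread(messages):
    """Return a sorted list of unique email addresses in the given messages."""
    people = set()
    for msg in messages:
        if msg.get('From'):
            people.add(msg['From'])
        for header in ('To', 'Cc', 'Bcc'):
            # "or []" tolerates headers that are missing or None
            people.update(msg.get(header) or [])
    return sorted(people)

thread = [
    {'From': 'a@example.com', 'To': ['b@example.com', 'c@example.com']},
    {'From': 'b@example.com', 'To': ['a@example.com'], 'Cc': ['d@example.com']},
]
print(participants_for_thread(thread))
# ['a@example.com', 'b@example.com', 'c@example.com', 'd@example.com']
```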
Example 3-18 recycles some previous code and demonstrates one possible implementation that ties it all together.
Example 3-18. A robust approach for threading together discussion threads from mbox data (mailboxes__participants_in_conversations.py)
# -*- coding: utf-8 -*-

import sys
import httplib
from urllib import quote
from urllib import urlencode
import json

DB = sys.argv[1]  # enron
QUERY = sys.argv[2]

# Query couchdb-lucene by_subject and by_content

message_ids_of_interest = []
for idx in ['by_subject', 'by_content']:
    try:
        conn = httplib.HTTPConnection('localhost', 5984)
        conn.request('GET', '/%s/_fti/_design/lucene/%s?q=%s&limit=50000'
                     % (DB, idx, quote(QUERY)))
        response = conn.getresponse()
        if response.status == 200:
            response_body = json.loads(response.read())
            message_ids_of_interest.extend([row['id'] for row in
                    response_body['rows']])
        else:
            print 'An error occurred fetching the response: %s %s' \
                % (response.status, response.reason)
            sys.exit()
    finally:
        conn.close()

# Remove any duplicates

message_ids_of_interest = list(set(message_ids_of_interest))

# Perform discussion thread filtering in memory. It's a relatively
# small amount of data.

try:
    conn = httplib.HTTPConnection('localhost', 5984)
    conn.request('GET', '/%s/_all_docs?include_docs=true' % (DB + '-threads', ))
    response = conn.getresponse()
    if response.status != 200:  # OK
        print 'Unable to get docs: %s %s' % (response.status, response.reason)
        sys.exit()
    threads = [dict([('thread_id', row['doc']['_id']),
               ('message_ids', [t['external_id'] for t in
               row['doc']['thread']])]) for row in
               json.loads(response.read())['rows']]
finally:
    conn.close()

# Find only the threads that have a message_id appearing in the list of
# message ids fetched from the Lucene index

threads_of_interest = [t for t in threads for message_id in
                       t['message_ids'] if message_id
                       in message_ids_of_interest]

# Remove duplicates

seen = []
idx = 0
while idx < len(threads_of_interest):
    if threads_of_interest[idx]['thread_id'] in seen:
        threads_of_interest.pop(idx)
    else:
        seen.append(threads_of_interest[idx]['thread_id'])
        idx += 1

# Cull out message ids for threads of interest

message_ids_for_threads_of_interest = [t['message_ids'] for t in
        threads_of_interest]

# Flatten out the list of lists into just a list and remove duplicates

message_ids_for_threads_of_interest = list(set([message_id for message_ids
        in message_ids_for_threads_of_interest for message_id in
        message_ids]))

# Query CouchDB for the email addresses in various headers of interest
# using a bulk request

try:
    conn = httplib.HTTPConnection('localhost', 5984)
    post_params = json.dumps({'keys': message_ids_for_threads_of_interest})
    conn.request('POST', '/%s/_all_docs?include_docs=true' % (DB, ),
                 post_params)
    response = conn.getresponse()
    if response.status != 200:  # OK
        print 'Unable to get docs: %s %s' % (response.status, response.reason)
        sys.exit()
    full_docs = [row['doc'] for row in json.loads(response.read())['rows']]
finally:
    conn.close()

# Finally, with full messages of interest on hand, parse out headers of
# interest and compute unique sets of email addresses for each thread by
# decorating threads_of_interest

for thread in threads_of_interest:
    participants = []
    for message_id in thread['message_ids']:
        doc = [d for d in full_docs if d['_id'] == message_id][0]
        try:
            participants.append(doc.get('From'))
            participants.extend(doc.get('To'))
            if doc.get('Cc'):
                participants.extend(doc.get('Cc'))
            if doc.get('Bcc'):
                participants.extend(doc.get('Bcc'))
        except:
            pass  # Maybe an X-To header, etc., as opposed to To?

    thread['participants'] = list(set(participants))
    thread['subject'] = doc['Subject']

print json.dumps(threads_of_interest, indent=4)
Sample output from the script is shown in Example 3-19.
Example 3-19. Sample results from threading discussions for a search query of “Raptor”
[
    {
        "thread_id": "b6d4f96224bc546acd34c405e6c471c5",
        "participants": [
            "[email protected]",
            "[email protected]"
        ],
        "message_ids": [
            "24a30d62545728e26eb3311d63effb47"
        ],
        "subject": "FW: Note on Valuation"
    },
    {
        "thread_id": "b6d4f96224bc546acd34c405e6dbc0d4",
        "participants": [
            "[email protected]",
            "[email protected]",
            "[email protected]",
            "[email protected]",
            "[email protected]"
        ],
        "message_ids": [
            "24a30d62545728e26eb3311d633cf6b3"
        ],
        "subject": "Tax Accruals on the Raptor Companies"
    },
    {
        "thread_id": "b6d4f96224bc546acd34c405e6eb7adf",
        "participants": [
            "[email protected]",
            "[email protected]"
        ],
        "message_ids": [
            "3b2c340c28782c8986737c35a357c6ae"
        ],
        "subject": "FW: E-11 Raptor"
    },

    ... output truncated ...
]
In short, the script provides some tools for determining who participated in which conversations, based on a keyword heuristic. You could load an mbox file into a decent mail client and dig out this information manually, but the approach demonstrated here is generally useful and could be adapted for many automated or semiautomated analyses.
[20] The “jwz” is a reference to its author, Jamie Zawinski.
[21] Namely, there were some modifications to make the code a little more object-oriented, the input/output formats were changed to consume our JSONified message objects, and the memory profile was dramatically decreased by considering only the few fields needed by the threading algorithm.
[22] Fundamentally, it’s unlikely that CouchDB will ever support utilizing multiple cores for a bulk write operation because of the way writes are serialized to disk. However, it’s not hard to imagine a patch that takes advantage of multiple cores for certain kinds of read operations, given that the underlying data structure is a tree, which inherently lends itself to being traversed at multiple nodes.