#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Import an mbox file into LevelDB.

Email is kind of unstructured, so I thought I’d try using LevelDB as a
triple store: a set of entity-attribute-value triples.  In this case
the entities are message numbers, the attributes are either header
names (lowercased and ending with ':', such as 'subject:' or 'from:')
or numbers for chunks of the body of about 2K.  The numbers are
prefixed by a single digit and colon indicating how many digits they
are, so chunk 11 is attribute '2:11'.

Because LevelDB doesn’t index, we have to organize the keyspace for
efficient retrieval.  For attributes other than the body, we store the
following pairs:

    entity attribute -> value
    attribute value entity -> ''

The body is only stored in the first form.

NOTE(review): an earlier design prefixed message numbers with '#' to
avoid ambiguity, but the code below stores the bare integer — the keys
actually written are e.g. 'eav 42 subject:'.

Python can store a few hundred thousand key-value pairs in LevelDB per
second, so, if each message creates less than a hundred entries, it
should be able to store a few thousand messages per second in LevelDB.
I did import 914 messages and 10MiB in 1.8 s, which is about 500
messages and 5.7MB per second, a bit short of what I was hoping for.
The 3.8 G of mail in my current mail spool would still only take 11
minutes.

(It imported 1G of mail, 41631 messages, in 4'16".  Retrieving the
headers of a random mail takes consistently 90ms, but 38ms of that is
just starting the program.  The resulting LevelDB is 897MB, containing
2.50 million key-value pairs, and compressed very poorly despite the
use of Snappy — just dumping out the database with leveldump.py
generates 1.2 GB, about 35% larger.)

"""
import sys

import leveldb


def main(argv):
    """Read an mbox on stdin and store its triples in the LevelDB at argv[1]."""
    msgno = 0
    db = leveldb.LevelDB(argv[1])
    current_header = None       # (name, value) of the header being accumulated
    # Body state: list of pending lines, current chunk number, and the byte
    # count of the pending lines.  All None while we are still in the headers.
    body = body_chunk_no = body_chunk_size = None
    in_header = True
    batch = write_queue(db)
    while True:
        line = sys.stdin.readline()
        if line == '':          # EOF
            break
        elif line.startswith('From '):
            # Start of a new message: flush whatever state is left over
            # from the previous one before resetting.
            flush_header(msgno, batch, current_header)
            current_header = None
            flush_body(msgno, batch, body, body_chunk_no)
            # BUGFIX: also reset body_chunk_size (previously left stale).
            body = body_chunk_no = body_chunk_size = None
            msgno += 1
            in_header = True
            # BUGFIX: strip the trailing newline so the envelope-from value
            # gets the same .strip() treatment as every other header.  The
            # raw value is interpolated into the 'ave ...' index key, so an
            # unstripped value embedded a literal newline inside a LevelDB
            # key.
            flush_header(msgno, batch, ('envelope-from:', line[5:].strip()))
        elif not in_header:
            body.append(line)
            body_chunk_size += len(line)
            if body_chunk_size > 2048:
                # Chunk boundary reached: write this ~2K chunk and start
                # accumulating the next one.
                flush_body(msgno, batch, body, body_chunk_no)
                del body[:]
                body_chunk_no += 1
                body_chunk_size = 0
        elif line == '\n':
            # Blank line terminates the header section; switch to body mode.
            flush_header(msgno, batch, current_header)
            current_header = None
            body = []
            body_chunk_no = 0
            body_chunk_size = 0
            in_header = False
        elif line[0] in ' \t':
            # Continuation line of a folded (RFC 5322) header.
            if current_header is None:
                # Continuation with no preceding header line — file it under
                # a synthetic name rather than crashing.
                current_header = ('unknown-header:', '')
            current_header = (current_header[0],
                              current_header[1] + ' ' + line.strip())
        elif ':' in line:
            colidx = line.index(':')
            # Attribute name includes the trailing ':' to distinguish header
            # attributes from numeric body-chunk attributes.
            header_name = line[:colidx+1].lower().replace(' ', '_')
            flush_header(msgno, batch, current_header)
            current_header = (header_name, line[colidx+1:].strip())
        # (Malformed header lines with no colon are silently ignored.)

        # BUGFIX: this check used to run only on ':' header lines, so the
        # batch could grow without bound while reading a long message body.
        if len(batch) > 1024:
            batch.flush()
    # EOF: flush the final message's pending state.
    flush_header(msgno, batch, current_header)
    flush_body(msgno, batch, body, body_chunk_no)
    batch.flush()


def flush_header(msgno, db, header):
    """Store one (name, value) header pair for message `msgno`.

    Writes both the forward 'eav' record and the 'ave' index record.
    `header` may be None (nothing accumulated yet), in which case this is
    a no-op.  `db` is anything with a Put(key, value) method.
    """
    if header is None:
        return
    attribute, value = header
    db.Put('eav %s %s' % (msgno, attribute), value)
    db.Put('ave %s %s %s' % (attribute, value, str(msgno)), '')


def flush_body(msgno, db, body, chunk_no):
    """Store the accumulated body lines as chunk `chunk_no` of message `msgno`.

    No-op when `chunk_no` is None (we never entered the body).  The
    attribute is the chunk number prefixed by its digit count, e.g. '2:11',
    so that chunk attributes sort correctly and never collide with header
    names (which always contain ':'. at the end).
    """
    if chunk_no is None:
        return
    value = ''.join(body)
    cn = str(chunk_no)
    assert len(cn) < 10     # single digit-count prefix supports < 10 digits
    attribute = '%d:%s' % (len(cn), cn)
    db.Put('eav %s %s' % (msgno, attribute), value)


class write_queue(object):
    """Buffer Puts into a LevelDB WriteBatch, flushed explicitly.

    Batching writes is what makes the import fast; the caller decides
    when to flush (here, roughly every 1024 entries).
    """

    def __init__(self, db):
        """
        Arguments:
        - `db`: a LevelDB
        """
        self._db = db
        self.items = leveldb.WriteBatch()
        self.n = 0              # number of Puts since the last flush

    def Put(self, key, value):
        """Queue a key-value pair for the next flush."""
        self.items.Put(key, value)
        self.n += 1

    def __len__(self):
        return self.n

    def flush(self):
        """Atomically write the queued pairs and start a fresh batch."""
        self._db.Write(self.items)
        self.items = leveldb.WriteBatch()
        self.n = 0


if __name__ == '__main__':
    main(sys.argv)