#!/usr/bin/python import re import StringIO import warnings from backwardlines import BackwardLines class CouldntParse(Warning): pass class Address: def __init__(self, addr): self.text = addr mo = re.search(r'<(.*?)>', addr) # XXX rre trucho if not mo: mo = re.match(r'\s*(?:\(.*\)\s*)?(\S+@\S+)\s*(?:\(.*\)\s*)?', addr) if mo: self.address = mo.group(1) else: warnings.warn(addr, CouldntParse, 0) self.address = addr def parse_address_list(s): return [Address(addr) for addr in s.split(',')] class BackwardMessage: def __init__(self): self.headers = {} self.continuations = [] self.header_lines = [] def get(self, name, default=None): return self.headers.get(name, default) def __getitem__(self, name): return self.headers[name] def parse(self, line): if line == '\n': self.header_lines = [] elif line.startswith('From '): self.parse_header_lines() else: self.header_lines.append(line) def parse_header_lines(self): for line in self.header_lines: self.parse_header_line(line) def parse_header_line(self, line): if line[0] in ' \t': self.continuations.append(line) else: mo = re.match(r'(.*?):\s*(.*)', line) if mo: header_name = mo.group(1).lower() header_contents = mo.group(2) header_contents += ''.join(reversed(self.continuations)) if header_name in ['from', 'to']: header_contents = parse_address_list(header_contents) self.headers[header_name] = header_contents self.continuations = [] class BackwardReadingMailbox: def __init__(self, mailbox): self.mailbox = mailbox def __iter__(self): current_message = BackwardMessage() for line in BackwardLines(self.mailbox): current_message.parse(line) if line.startswith('From '): yield current_message current_message = BackwardMessage() def recipients(mailbox): return set(addr.address for message in BackwardReadingMailbox(mailbox) for addr in message.get('to', [])) def rendition(number): return '\033[%sm' % number def message_summary(message): address = message.get('from', ['nobody'])[0].address[:25] subject = message.get('subject', '')[:79 - len(address)] return '%s%s %s%s%s' % (rendition(2), address, rendition('0;31'), subject, rendition(0)) test_message = """From somebody From: a To: c Subject: hello hi there you: go! """ test_mbox_2 = """From a To: e , g ! From d To: you , kragen-journal@canonical.org """ def test(): sio = StringIO.StringIO(test_message) i = 0 for message in BackwardReadingMailbox(sio): i += 1 ok(message['subject'], 'hello') ok(message['from'][0].address, 'b@example.org') ok(message['to'][0].address, 'd@example.org') ok(message_summary(message), '\033[2mb@example.org \033[0;31mhello\033[0m') ok(i, 1) sio = StringIO.StringIO(test_mbox_2) messages = list(BackwardReadingMailbox(sio)) ok(len(messages), 2) ok(messages[0]['to'][0].text, 'you ') ok([addr.address for addr in messages[1]['to']], ['f@example.org', 'h@example.org']) ok(recipients(sio), set(['you@example.org', 'f@example.org', 'h@example.org', 'kragen-journal@canonical.org', ])) ok(Address('MAILER-DAEMON@canonical.org (Mail Delivery Subsystem)').address, 'MAILER-DAEMON@canonical.org') def ok(a, b): assert a == b, (a, b) test() def main(outbox, inbox): important_senders = recipients(outbox) i = 0 for message in BackwardReadingMailbox(inbox): if message['from'][0].address in important_senders: print_message_summary(message) i += 1 if i > 20: break if __name__ == '__main__': main(open('/home/kragen/mail/panacea-sent-mail'), open('/home/kragen/mbox'))