#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Summarize or query hashtags in my urls file provided on standard input.

If invoked with no arguments, it just summarizes the hashtags in the
file.  If invoked with one or more arguments, those arguments are a
conjunctive query to run on the file.  You can invoke it with the
single argument --interactive to run a series of queries
interactively.

The file is formatted as a sequence of sections for different dates,
each begun by a line that reads "links from somedate:".  Each line is
a link with a paragraph of free-form text (in Markdown in my case) and
potentially some #hashtag items in there.  Hashtags can contain word
characters and arbitrary non-terminal punctuation; case and
punctuation are considered insignificant when comparing.  Hashtags in
the file must be preceded by a hyphen or whitespace.

Query terms can be of the following forms:

* #walnuts: include only items that have the hashtag #walnuts (or
  #Walnuts or #wal-nuts, etc.); you’ll have to quote this to protect
  it from your shell.
* -#raisins: exclude items that have the hashtag #raisins (or #rAiSiNs
  or whatever); probably also needs quoting.
* whatever: include only items that contain the text “whatever”
* -whatever: exclude items that contain the text “whatever”

Text searches are case-insensitive unless the term contains uppercase
letters.  These probably should be done in UTF-8 but probably aren’t.
"""
import collections
import sys
import re


def main(argv):
    """Read the urls file from stdin; summarize and/or query its hashtags.

    argv[1:] is either ['--interactive'] or a (possibly empty)
    conjunctive query; an empty query just prints the tag summary.
    """
    if argv[1:] == ['--interactive']:
        query = None
        interactive = True
    else:
        query = compile_query(argv[1:])
        interactive = False

    variant_counts = {}   # canonical tag -> Counter of raw spellings
    date = None           # date of the section currently being read
    output = Output()
    links_re = re.compile(r'^links from (\S+):$')
    # XXX don’t include 's at the end
    hashtag_re = re.compile(r'[-\s](#[!-~\w]*\w)')
    items = []

    for line in sys.stdin:
        date_mo = links_re.match(line)
        if date_mo:
            date = date_mo.group(1)
            continue

        tags = []
        for mo in hashtag_re.finditer(line):
            variant = mo.group(1)
            canonical = canonicalize_tag(variant)
            # Count every raw spelling so summarize() can show the
            # most popular variant of each canonical tag.  (This used
            # to test `canonical not in variants`, which was always
            # true -- canonical forms have no '#' -- so the Counter
            # was clobbered on every single match.)
            if canonical not in variant_counts:
                variant_counts[canonical] = collections.Counter()
            variant_counts[canonical][variant] += 1
            tags.append(canonical)

        matched = query and query.matches(line, tags)
        if matched or not query:
            items.append(Item(date, line, tags))
        if matched:
            output.write(date, line)

    canonical_counts = collections.Counter()
    for item in items:
        canonical_counts.update(item.tags)
    summarize(canonical_counts, variant_counts)

    if interactive:
        interact(items, output, variant_counts)


def summarize(canonical_counts, variant_counts):
    """Print tags by descending frequency, each shown in its most
    popular raw spelling."""
    ranked = sorted(((f, k) for k, f in canonical_counts.items()),
                    reverse=True)
    for freq, canonical in ranked:
        variants = variant_counts[canonical]
        most_popular = max(variants, key=variants.__getitem__)
        print("%6d %s" % (freq, most_popular))


def interact(items, output, variant_counts):
    """Run queries read from the terminal until EOF."""
    by_tag = index(items)
    while True:
        query_string = output.prompt()
        if not query_string:
            break
        query = compile_query(query_string.split())
        if query is None:
            # A blank input line compiles to no query at all; ignore
            # it rather than crash calling .evaluate() on None.
            continue
        canonical_counts = collections.Counter()
        for item in query.evaluate(items, by_tag):
            canonical_counts.update(item.tags)
            output.write(item.date, item.line)
        summarize(canonical_counts, variant_counts)


def index(items):
    """Build an inverted index: canonical tag -> list of Items bearing it."""
    by_tag = {}
    for item in items:
        for tag in item.tags:
            by_tag.setdefault(tag, []).append(item)
    return by_tag


Item = collections.namedtuple('Item', ('date', 'line', 'tags'))


def compile_query(terms):
    """Return a Query over terms, or None when terms is empty."""
    return Query(terms) if terms else None


class Query:
    """A conjunction of compiled query terms."""

    def __init__(self, terms):
        self.terms = [compile_term(term) for term in terms]

    def matches(self, line, tags):
        """Report whether every term accepts this line."""
        return all(term.matches(line, tags) for term in self.terms)

    def evaluate(self, items, by_tag):
        """Generate items matching the query, scanning only the
        candidate set of the cheapest term."""
        best_term = min(self.terms,
                        key=lambda term: term.plan(items, by_tag))
        for item in best_term.generate(items, by_tag):
            if all(term.matches(item.line, item.tags)
                   for term in self.terms):
                yield item


class Output:
    """Writes matched items grouped under date headers, and prompts
    for interactive queries."""

    def __init__(self):
        self.last_date = None
        self.tty = None   # opened lazily on first prompt

    def prompt(self):
        """Show a prompt on stdout and read one query line from the tty."""
        self.last_date = None   # re-print the date header next time
        if self.tty is None:
            # Read queries from the terminal; stdin is the data file.
            self.tty = open('/dev/tty')
        sys.stdout.write('☺ ')
        sys.stdout.flush()
        return self.tty.readline()

    def write(self, date, line):
        """Print a matched line, preceded by its date header when the
        date changes."""
        if date != self.last_date:
            print("\nlinks from %s:" % date)
            self.last_date = date
        print(line)


def compile_term(term):
    """Compile one query-term string into a QueryTerm object.

    '#tag' / '-#tag' match or exclude hashtags; anything else is a
    substring search, case-sensitive only if it contains uppercase.
    """
    if term.startswith('-#'):
        return LacksTag(canonicalize_tag(term[2:]))
    elif term.startswith('#'):
        return HasTag(canonicalize_tag(term[1:]))
    elif term.startswith('-') and term == term.lower():
        return Lacks(term[1:])
    elif term.startswith('-'):
        return LacksSensitively(term[1:])
    elif term == term.lower():
        return Has(term)
    else:
        return HasSensitively(term)


class QueryTerm(collections.namedtuple('QueryTerm', ['term'])):
    def plan(self, items, tags_dict):
        "Estimate the query cost."
        return len(items)

    def generate(self, items, tags_dict):
        "Generate a conservative approximation to items matching this query."
        return items


class LacksTag(QueryTerm):
    def matches(self, line, tags):
        return self.term not in tags

    # We could have a .plan() that returns a smaller number of items,
    # but that would reflect the expected output size, not the
    # expected query cost.
class HasTag(QueryTerm):
    """Require the canonical tag `term` to be present on the item."""

    def matches(self, line, tags):
        return self.term in tags

    def plan(self, items, tags_dict):
        # Cost is the size of the candidate set we would have to scan.
        candidates = tags_dict.get(self.term, items)
        return len(candidates)

    def generate(self, items, tags_dict):
        # The inverted index yields exactly the items carrying this
        # tag; fall back to everything if the tag was never indexed.
        return tags_dict.get(self.term, items)


class Lacks(QueryTerm):
    """Exclude lines containing `term`, ignoring case."""

    def matches(self, line, tags):
        return self.term not in line.lower()


class LacksSensitively(QueryTerm):
    """Exclude lines containing `term`, case-sensitively."""

    def matches(self, line, tags):
        return self.term not in line


class Has(QueryTerm):
    """Require lines to contain `term`, ignoring case."""

    def matches(self, line, tags):
        return self.term in line.lower()


class HasSensitively(QueryTerm):
    """Require lines to contain `term`, case-sensitively."""

    def matches(self, line, tags):
        return self.term in line


def canonicalize_tag(tag):
    """Reduce a tag to canonical form: lowercased word characters only."""
    lowered = tag.lower()
    return re.sub(r'\W', '', lowered)


if __name__ == '__main__':
    main(sys.argv)