#!/usr/bin/python3
# -*- coding: utf-8 -*-
r"""μSQL.

How much of SQL can you implement in a couple of hours?  About this
much.  This is a crude hack with major limitations.  It’s easy to hit
them.

    >>> sql = DB()
    >>> sql.tables
    {}
    >>> sql("insert into parents (child, parent) values ('kragen', 'carolyn'), ('serafina', 'carolyn'), ('carolyn', 'john')")
    >>> sql.tables
    {'parents': [{'child': 'kragen', 'parent': 'carolyn'}, {'child': 'serafina', 'parent': 'carolyn'}, {'child': 'carolyn', 'parent': 'john'}]}
    >>> sql("select * from parents")
    [{'child': 'kragen', 'parent': 'carolyn'}, {'child': 'serafina', 'parent': 'carolyn'}, {'child': 'carolyn', 'parent': 'john'}]
    >>> sql("select parent from parents")
    [{'parent': 'carolyn'}, {'parent': 'carolyn'}, {'parent': 'john'}]
    >>> sql("select parent from parents parents where child = 'kragen'")
    [{'parent': 'carolyn'}]
    >>> sql("select parent from parents where child = 'kragen'")
    [{'parent': 'carolyn'}]
    >>> sql("select a.parent, b.child from parents a, parents b where a.child = b.parent")
    [{'parent': 'john', 'child': 'kragen'}, {'parent': 'john', 'child': 'serafina'}]

Note that the expected output above assumes dicts that preserve
insertion order (Python 3.7+).  On older interpreters the key order
depends on hash ordering and the tests spuriously fail in doctest.
That isn’t really okay but it’s good enough for a quick hack.

There’s a simple minimal shell for interactive use:

    $ ./usql.py
    » insert into parents(child, parent) values('kragen', 'carolyn'), ('serafina', 'carolyn'), ('carolyn', 'john')
    » select * from parents
    [{'child': 'kragen', 'parent': 'carolyn'}, {'child': 'serafina', 'parent': 'carolyn'}, {'child': 'carolyn', 'parent': 'john'}]
    » select a.child, b.parent from parents a, parents b where a.parent = b.child
    [{'child': 'kragen', 'parent': 'john'}, {'child': 'serafina', 'parent': 'john'}]
    » \w parents.db
    » \q
    $ ./usql.py
    » \r parents.db
    » select * from parents where child='serafina'
    [{'child': 'serafina', 'parent': 'carolyn'}]
    » quit

Limitations include:

- Although you can select from an arbitrary number of tables, the
  WHERE clause must be a single equality test, so you can’t join them
  usefully.
- There are a lot of cases where random junk text added to your
  queries will be ignored.
- The only query plan is a nested loop join.  Just be thankful it
  doesn’t materialize the Cartesian product in RAM before applying
  your WHERE clauses.
- But it does materialize the whole query result set.
- No dirty bit is kept, so it’s easy to accidentally exit the CLI
  without saving your modified database.
- The only infix operator supported is `=`, and that only in the WHERE
  clause.
- No nested queries.
- Results are displayed as Python objects.
- There is no DDL and consequently no type-checking.
- UPDATE and DELETE are not supported.

"""

import doctest
import pickle
import re
import traceback

# readline is imported only for its side effect on input() (line
# editing in the interactive shell); it is missing on some platforms,
# and the engine itself does not need it.
try:
    import readline
except ImportError:
    readline = None

# Python 2 compatibility: there, input() evaluates what it reads, so
# the shell must use raw_input() instead.
try:
    input = raw_input
except NameError:
    pass


class QueryNotUnderstood(Exception):
    """Raised when a statement cannot be parsed as a supported query."""


class Bug(Exception):
    """Reserved for internal consistency failures."""


# SELECT <fields> FROM <tables> [WHERE <where>]
# <fields> is `*` or a comma-separated list of (optionally dotted) names;
# <tables> is a comma-separated list of names, each with an optional alias.
select_re = re.compile(r'''(?xi) select \s+
    (?P<fields> \* | (?:[a-z._]+\s*, \s*)* [a-z._]+) \s*
    from \s+
    (?P<tables>(?:[a-z._]+(?:\s+[a-z_]+)?\s*, \s*)* [a-z._]+(?:\s+[a-z_]+)?)
    \s*
    (?:where \s+ (?P<where>.*))?$''')

# INSERT INTO <table> (<fields>) VALUES <values>
# One or more column names are accepted.
insert_re = re.compile(r'''(?xi) insert \s* into \s*
    (?P<table> [a-z_]+) \s*
    \( (?P<fields> (?:[a-z_]+\s*,\s*)* [a-z_]+) \s* \) \s*
    values \s* (?P<values>.*)''')

# One parenthesized row of values.  String literals are matched as a
# unit so that a `)` inside a quoted value does not end the row early.
values_re = re.compile(r"\(((?:'(?:[^']|'')*'|[^)'])*)\)")

# A single literal: a '...' string (with '' as an escaped quote) or a number.
value_re = re.compile(r"'(?:[^']|'')*'|[\d.]+")


class DB(object):
    """An in-memory database: a dict mapping table names to lists of rows.

    Each row is a dict from column name to value.  Instances are
    callable; calling one with a query string executes the query.
    """

    def __init__(self):
        self.tables = {}

    def __call__(self, query):
        return self.exec_sql(query)

    def exec_sql(self, query):
        """Execute one statement.

        Returns None for INSERT and a list of row dicts for SELECT.
        Raises QueryNotUnderstood if the statement matches neither form.
        """
        mo = insert_re.match(query)
        if mo:
            return self.insert(mo)

        mo = select_re.match(query)
        if mo:
            return self.select(mo)

        raise QueryNotUnderstood(query)

    def insert(self, mo):
        """Append the rows described by an ``insert_re`` match object.

        The target table is created if it does not exist yet.  Raises
        QueryNotUnderstood if any row does not have exactly one value
        per listed column; in that case nothing is inserted.
        """
        fields = [field.strip() for field in mo.group('fields').split(',')]
        # Super sloppy!  Will ignore any garbage in between things.
        rows = [[eval_value(val) for val in value_re.findall(row)]
                for row in values_re.findall(mo.group('values'))]

        # zip() would silently truncate a row whose arity is wrong, so
        # validate every row before touching the table.
        for row in rows:
            if len(row) != len(fields):
                raise QueryNotUnderstood(mo.string)

        table = mo.group('table')
        if table not in self.tables:
            self.tables[table] = []
        self.tables[table].extend([dict(zip(fields, row)) for row in rows])

    def select(self, mo):
        """Evaluate a ``select_re`` match object and return the result rows.

        Returns a list of dicts, one per combination of source rows
        that satisfies the WHERE clause (all combinations if there is
        none).  Raises QueryNotUnderstood for a malformed WHERE clause
        and KeyError if a named table does not exist.
        """
        tables = []
        for spec in mo.group('tables').split(','):
            # "name" or "name alias", separated by any whitespace.
            parts = spec.split()
            name, alias = parts[0], parts[-1]
            if alias.lower() == 'where':
                # The regex can swallow a dangling WHERE keyword as an
                # alias (e.g. "select * from t where").
                raise QueryNotUnderstood(mo.string)
            tables.append((name, alias))

        where = mo.group('where')
        if where is None:
            predicate = lambda tuples: True
        else:
            sides = _split_equality(where)
            if sides is None:
                raise QueryNotUnderstood(mo.string)
            left, right = [compile_expr(side) for side in sides]
            predicate = lambda tuples: left(tuples) == right(tuples)

        projection = compile_projection(mo.group('fields'))

        return [projection(tuples)
                for tuples in self.generate(tables)
                if predicate(tuples)]

    def generate(self, table_spec):
        """Yield every combination of rows from the given tables.

        ``table_spec`` is a sequence of (table name, alias) pairs.
        Each yielded item is a dict mapping alias to a row; the first
        table in ``table_spec`` is the outermost loop.
        """
        if not table_spec:
            yield {}
            return

        table, alias = table_spec[0]
        assert alias.lower() != 'where'  # parser bug, maybe not fixed
        for t in self.tables[table]:
            for rest_tuples in self.generate(table_spec[1:]):
                rest_tuples[alias] = t
                yield rest_tuples


def _split_equality(where):
    """Split a WHERE clause at its single `=` outside string literals.

    Returns a (left, right) pair of stripped, non-empty operand
    strings, or None if the clause is not of that form.
    """
    in_string = False
    positions = []
    for i, ch in enumerate(where):
        if ch == "'":
            # An escaped quote ('') toggles twice, leaving the state as is.
            in_string = not in_string
        elif ch == '=' and not in_string:
            positions.append(i)

    if len(positions) != 1:
        return None
    left = where[:positions[0]].strip()
    right = where[positions[0] + 1:].strip()
    if not left or not right:
        return None
    return left, right


def compile_projection(s):
    """Compile a SELECT field list into a function of the joined rows.

    ``s`` is either ``*`` or a comma-separated list of expressions.
    The returned function takes a dict mapping alias to row and
    returns one result row as a dict.
    """
    if s == '*':
        def projection(tuples):
            # Map each column name to the aliases whose rows contain it.
            sources = {}
            for alias, row in tuples.items():
                for field in row:
                    sources.setdefault(field, []).append(alias)

            result = {}
            for field, aliases in sources.items():
                if len(aliases) == 1:
                    result[field] = tuples[aliases[0]][field]
                else:
                    # Ambiguous column: qualify it with each alias.
                    for alias in aliases:
                        key = '{}.{}'.format(alias, field)
                        result[key] = tuples[alias][field]
            return result
    else:
        fields = [f.strip() for f in s.split(',')]
        # Output columns are named by the part after the last dot, so
        # `a.child, b.child` collide on `child`.
        names = [f.split('.')[-1] for f in fields]
        fns = [compile_expr(f) for f in fields]

        def projection(tuples):
            return dict(zip(names, [fn(tuples) for fn in fns]))

    return projection


def compile_expr(expr):
    """Compile a literal or column reference into a function of the rows.

    The returned function takes a dict mapping alias to row.  A
    reference of the form ``alias.column`` looks the column up in that
    alias's row; any other name is looked up in each row in turn and
    evaluates to None if no row has it.  Raises QueryNotUnderstood for
    an empty expression.
    """
    if not expr:
        raise QueryNotUnderstood(expr)

    if expr.startswith("'") or expr[0].isdigit():
        val = eval_value(expr)
        return lambda tuples: val

    parts = expr.split('.')
    if len(parts) == 2:
        table, field = parts
        return lambda tuples: tuples[table][field]

    def lookup(tuples):
        for t in tuples.values():
            if expr in t:
                return t[expr]
        return None

    return lookup


def eval_value(s):
    """Convert the source text of a literal to a Python value.

    Quoted text becomes a str (with '' unescaped to '), text
    containing a dot becomes a float, and anything else an int.
    """
    if s.startswith("'"):
        return s[1:-1].replace("''", "'")
    elif '.' in s:
        return float(s)
    else:
        return int(s)


def main():
    """Run the interactive shell until EOF or a quit command."""
    # Uncomment to run the examples in the module docstring:
    #doctest.testmod()
    sql = DB()
    while True:
        try:
            statement = input("» ")
        except EOFError:
            break
        except KeyboardInterrupt:
            print("Use ^D, quit, exit, or \\q to quit.")
            continue

        if statement in ['quit', 'exit', r'\q']:
            break

        try:
            if statement.startswith(r'\w '):
                with open(statement[3:], 'wb') as f:
                    pickle.dump(sql, f, protocol=2)
            elif statement.startswith(r'\r '):
                # SECURITY: unpickling can execute arbitrary code; only
                # read database files that you wrote yourself.
                with open(statement[3:], 'rb') as f:
                    loaded = pickle.load(f)
                if not isinstance(loaded, DB):
                    raise TypeError('{} does not contain a database'
                                    .format(statement[3:]))
                sql = loaded
            else:
                result = sql(statement)
                if result is not None:
                    print(result)
        except Exception:
            # Top-level boundary of the shell: report and keep going.
            traceback.print_exc()
        except KeyboardInterrupt:
            print('Interrupted!')


if __name__ == '__main__':
    main()