# Source code for asyncqlio.orm.schema.table

import collections
import itertools
import logging
import sys
import typing
from collections import OrderedDict

from asyncqlio import db as md_db
from asyncqlio.exc import SchemaError
from asyncqlio.orm import inspection as md_inspection, session as md_session
from asyncqlio.orm.schema import column as md_column, relationship as md_relationship
from asyncqlio.sentinels import NO_DEFAULT, NO_VALUE

# True on Python 3.6 and newer. TableMeta.__init__ consults this flag to decide
# whether it has to emulate ``__set_name__`` for class-body members itself.
PY36 = sys.version_info >= (3, 6)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

class TableMetadata(object):
    """
    The root class for table metadata.
    This stores a registry of tables, and is responsible for calculating relationships etc.

    .. code-block:: python3

        meta = TableMetadata()
        Table = table_base(meta=meta)
    """

    def __init__(self):
        #: A registry of table name -> table object for this metadata.
        self.tables = {}

        #: The DB object bound to this metadata.
        self.bind = None  # type: md_db.DatabaseInterface

    def register_table(self, tbl: 'TableMeta', *,
                       autosetup_tables: bool = False) -> 'TableMeta':
        """
        Registers a new table object.

        The table's ``metadata`` attribute is pointed at this object and the table is stored
        in :attr:`tables` under its ``__tablename__``.

        :param tbl: The table to register.
        :param autosetup_tables: Should tables be setup again?
        :return: The table that was passed in.
        """
        tbl.metadata = self
        self.tables[tbl.__tablename__] = tbl
        if autosetup_tables:
            self.setup_tables()

        return tbl

    def get_table(self, table_name: str) -> 'typing.Type[Table]':
        """
        Gets a table from the current metadata.

        The registry key (the table name) is tried first; if that fails, the registered
        tables are searched for one whose ``__name__`` matches.

        :param table_name: The name of the table to get.
        :return: A :class:`.Table` object, or None if nothing matched.
        """
        try:
            return self.tables[table_name]
        except KeyError:
            pass

        # we can load this from the class name instead
        for table in self.tables.values():
            if table.__name__ == table_name:
                return table

        return None

    def setup_tables(self):
        """
        Sets up the tables for usage in the ORM.

        Runs the three resolution passes in order: floating relationships, aliases, then
        back-references.
        """
        self.resolve_floating_relationships()
        self.resolve_aliases()
        self.resolve_backrefs()

    def resolve_aliases(self):
        """
        Resolves all alias tables on relationship objects.

        Every relationship ends up with an :class:`.AliasedTable` in ``_table_alias``, and that
        alias is registered in :attr:`tables` under its alias name.
        """
        # iterate over a copy, because aliases are added to the registry as we go
        for tbl in self.tables.copy().values():
            if isinstance(tbl, AliasedTable):
                continue

            for relationship in tbl.iter_relationships():
                if relationship._table_alias is None:
                    # auto-create the alias name
                    # using the relationship name
                    relationship._table_alias = "r_{}_{}".format(
                        relationship.owner_table.__name__, relationship._name.lower()
                    )

                if not isinstance(relationship._table_alias, AliasedTable):
                    relationship._table_alias = AliasedTable(relationship._table_alias,
                                                             relationship.foreign_table)

                self.tables[relationship._table_alias.alias_name] = relationship._table_alias

    def resolve_backrefs(self):
        """
        Resolves back-references.

        A back-reference is given as a ``"table.name"`` string. It is replaced by a new
        relationship object that is attached to the named table under the given name.

        This method is safe to call repeatedly: back-references that were already resolved
        by an earlier pass are skipped.

        :raises SchemaError: If the table named by a back-reference does not exist.
        """
        for tbl in list(self.tables.values()):  # type: TableMeta
            if isinstance(tbl, AliasedTable):
                # don't try and setup aliased tables
                continue

            # snapshot the relationships: a back-reference that points at this same table
            # inserts into ``_relationships`` while we are walking it
            for relationship in list(tbl.iter_relationships()):
                back_ref = relationship.back_reference
                if not isinstance(back_ref, str):
                    # either there is no back-reference (None), or it has already been
                    # swapped for a relationship object by a previous pass
                    continue

                table_name, name = back_ref.split(".")
                table = self.get_table(table_name)
                if table is None:
                    raise SchemaError("No such table '{}' exists (from back-reference {})"
                                      .format(table_name, back_ref))

                # create the new relationship object
                # this flips the two columns so that the join path is correct
                new_rel = md_relationship.Relationship(relationship.right_column,
                                                       relationship.left_column,
                                                       load="joined", use_iter=False)

                # call `__set_name__` so that it knows what table it's assigned to
                new_rel.__set_name__(table, name)

                table._relationships[name] = new_rel
                relationship.back_reference = new_rel

    def resolve_floating_relationships(self):
        """
        Resolves any "floating" relationships - i.e any relationship/foreign keys that don't
        directly reference a column object.

        String references of the form ``"table.column"`` on foreign keys and on either side
        of a relationship are replaced with the column object they name.

        :raises SchemaError: If a referenced table or column does not exist.
        """
        for tbl in self.tables.values():
            if isinstance(tbl, AliasedTable):
                # don't try and resolve relationships on aliases
                continue

            for column in tbl._columns.values():
                # if the fk is none we don't need to set it up
                foreignkey = column.foreign_key
                if foreignkey is None:
                    continue

                if foreignkey.foreign_column is None:
                    table_name, column_name = foreignkey._f_name.split(".")
                    table_obb = self.get_table(table_name)
                    if table_obb is None:
                        raise SchemaError("No such table '{}' exists in FK {}"
                                          .format(table_name, foreignkey))

                    col = table_obb.get_column(column_name)
                    if col is None:
                        raise SchemaError("No such column '{}' exists on table '{}'"
                                          "(from FK {})"
                                          .format(column_name, table_name, foreignkey))

                    foreignkey.foreign_column = col

            for relation in tbl._relationships.values():
                assert isinstance(relation, md_relationship.Relationship)

                # only the sides that are still plain strings need resolving
                resolving_columns = [col for col in [relation.left_column, relation.right_column]
                                     if isinstance(col, str)]

                for to_resolve in resolving_columns:
                    table_name, column_name = to_resolve.split(".")
                    table_obb = self.get_table(table_name)

                    if table_obb is None:
                        raise SchemaError("No such table '{}' exists"
                                          "(from relationship {})".format(table_name, relation))

                    col = table_obb.get_column(column_name)
                    if col is None:
                        raise SchemaError("No such column '{}' exists on table '{}'"
                                          .format(column_name, table_name))

                    # NOTE(review): the explicit ``is True`` is kept from the original code;
                    # presumably comparing against an already-resolved column object does not
                    # produce a plain bool - confirm against the Column implementation.
                    if (to_resolve == relation.left_column) is True:
                        relation.left_column = col
                        logger.debug("Resolved %s to %s", to_resolve, col)
                    elif (to_resolve == relation.right_column) is True:
                        relation.right_column = col
                        logger.debug("Resolved %s to %s", to_resolve, col)
                    else:
                        raise SchemaError("Could not resolve column '{}' - it did not match the "
                                          "left or right column!".format(to_resolve))
class TableMeta(type):
    """
    The metaclass for a table object. This represents the "type" of a table class.
    """

    @classmethod
    def __prepare__(mcs, *args, **kwargs):
        # use an ordered namespace for the class body, so that columns are seen in the order
        # they were written in
        return OrderedDict()

    def __new__(mcs, name: str, bases: tuple, class_body: dict,
                register: bool = True, *args, **kwargs):
        """
        Builds the new table class.

        Columns and relationships are moved out of the class body into the ``_columns`` and
        ``_relationships`` dicts, and ``__tablename__`` is filled in from the ``table_name``
        class keyword (or the lowercased class name if that keyword is absent).

        :param register: If False, the class is created as-is with no table processing.
        """
        if register is False:
            return type.__new__(mcs, name, bases, class_body)

        # hijack columns
        columns = OrderedDict()
        relationships = OrderedDict()
        for col_name, value in class_body.copy().items():
            if isinstance(value, md_column.Column):
                columns[col_name] = value
                # nuke the column
                class_body.pop(col_name)
            elif isinstance(value, md_relationship.Relationship):
                relationships[col_name] = value
                class_body.pop(col_name)

        class_body["_columns"] = columns
        class_body["_relationships"] = relationships

        try:
            class_body["__tablename__"] = kwargs["table_name"]
        except KeyError:
            class_body["__tablename__"] = name.lower()

        return type.__new__(mcs, name, bases, class_body)

    def __init__(self, tblname: str, tblbases: tuple, class_body: dict,
                 register: bool = True, *args, **kwargs):
        """
        Creates a new Table instance.

        :param register: Should this table be registered in the TableMetadata?
        :param table_name: The name for this table.
        :raises TypeError: If the table is to be registered but has no ``metadata`` attribute.
        """
        # create the new type object
        super().__init__(tblname, tblbases, class_body)

        if register is False:
            return
        elif not hasattr(self, "metadata"):
            raise TypeError("Table {} has been created but has no metadata - did you subclass Table"
                            " directly instead of a clone?".format(tblname))

        # emulate `__set_name__` on Python 3.5
        # also, set names on columns unconditionally
        it = itertools.chain(self._columns.items(), self._relationships.items())
        if not PY36:
            it = itertools.chain(class_body.items(), it)

        for name, value in it:
            if hasattr(value, "__set_name__"):
                value.__set_name__(self, name)

        # ================ #
        # TABLE ATTRIBUTES #
        # ================ #

        #: A dict of columns for this table.
        self._columns = self._columns  # type: typing.Dict[str, md_column.Column]

        #: A dict of relationships for this table.
        self._relationships = self._relationships  # type: typing.Dict[str, md_relationship.Relationship]

        #: The primary key for this table.
        #: This should be a :class:`.PrimaryKey`.
        self._primary_key = self._calculate_primary_key()

        logger.debug("Registered new table {}".format(tblname))
        self.metadata.register_table(self)

    def __getattr__(self, item):
        """
        Resolves a missing class attribute to a column, or failing that a relationship.

        Names beginning with an underscore are never resolved this way.
        """
        if item.startswith("_"):
            raise AttributeError("'{}' object has no attribute {}".format(self.__name__, item))

        col = self.get_column(item)
        if col is not None:
            return col

        try:
            return self._relationships[item]
        except KeyError:
            raise AttributeError(item) from None

    @property
    def _bind(self):
        """
        :return: The DB object bound to this table's metadata.
        """
        meta = self.metadata
        # An explicitly set ``_bind`` attribute still wins, for compatibility; otherwise use
        # ``bind``, which is the attribute TableMetadata actually defines.
        bound = getattr(meta, "_bind", None)
        if bound is None:
            bound = meta.bind

        return bound

    @property
    def __quoted_name__(self):
        """
        :return: The table name, wrapped in double quotes.
        """
        return '"{}"'.format(self.__tablename__)

    @property
    def columns(self) -> 'typing.List[md_column.Column]':
        """
        :return: A list of :class:`.Column` this Table has.
        """
        return list(self.iter_columns())

    def __repr__(self):
        try:
            return "<Table object='{}' name='{}'>".format(self.__name__, self.__tablename__)
        except AttributeError:
            # no table name available (e.g. created with register=False)
            return super().__repr__()

    def iter_relationships(self) -> 'typing.Generator[md_relationship.Relationship, None, None]':
        """
        :return: A generator that yields :class:`.Relationship` objects for this table.
        """
        yield from self._relationships.values()

    def iter_columns(self) -> 'typing.Generator[md_column.Column, None, None]':
        """
        :return: A generator that yields :class:`.Column` objects for this table.
        """
        yield from self._columns.values()

    def get_column(self, column_name: str, *,
                   raise_si: bool = False) -> 'typing.Union[md_column.Column, None]':
        """
        Gets a column by name.

        :param column_name: The column name to lookup.

            This can be one of the following:
                - The column's ``name``
                - The column's ``alias_name()`` for this table

        :param raise_si: Accepted for compatibility; not used by this method.
        :return: The :class:`.Column` associated with that name, or None if no column was found.
        """
        try:
            return self._columns[column_name]
        except KeyError:
            pass

        # fall back to matching on the aliased name
        for column in self._columns.values():
            if column.alias_name(table=self) == column_name:
                return column

        return None

    def get_relationship(self, relationship_name) -> 'typing.Union[md_relationship.Relationship, None]':
        """
        Gets a relationship by name.

        :param relationship_name: The name of the relationship to get.
        :return: The :class:`.Relationship` associated with that name, or None if it doesn't exist.
        """
        try:
            return self._relationships[relationship_name]
        except KeyError:
            return None

    def _calculate_primary_key(self) -> typing.Union['PrimaryKey', None]:
        """
        Calculates the current primary key for a table, given all the columns.

        If no columns are marked as a primary key, the key will not be generated.

        :return: A :class:`.PrimaryKey` bound to this table, or None.
        """
        pk_cols = [col for col in self.iter_columns() if col.primary_key is True]
        if not pk_cols:
            return None

        pk = PrimaryKey(*pk_cols)
        pk.table = self
        logger.debug("Calculated new primary key {}".format(pk))
        return pk

    @property
    def primary_key(self) -> 'PrimaryKey':
        """
        :getter: The :class:`.PrimaryKey` for this table.
        :setter: A new :class:`.PrimaryKey` for this table.

        .. note::
            A primary key will automatically be calculated from columns at define time, if any
            columns have ``primary_key`` set to True.
        """
        return self._primary_key

    @primary_key.setter
    def primary_key(self, key: 'PrimaryKey'):
        key.table = self
        self._primary_key = key

    def _internal_from_row(cls, values: dict, *, existed: bool = False):
        """
        Builds a row object of this table from a dict of values.

        :param values: A mapping of column name -> value, passed to ``_init_row``.
        :param existed: The value to store in the row's "existed" flag.
        :return: The new row object.
        """
        obb = object.__new__(cls)  # type: Table
        # init but dont pass any values
        obb.__init__()
        # NOTE(review): the attribute name is built from ``cls.__name__``, whereas
        # ``self.__existed`` inside ``Table.__init__`` is mangled with the name of the class
        # it is written in. Confirm which spelling the readers of this flag expect.
        setattr(obb, "_{}__existed".format(cls.__name__), existed)
        obb._init_row(**values)
        return obb
class Table(metaclass=TableMeta, register=False):
    """
    The "base" class for all tables. This class is not actually directly used; instead
    :meth:`.table_base` should be called to get a fresh clone.
    """

    def __init__(self, **kwargs):
        #: The actual table that this object is an instance of.
        self.table = type(self)

        #: If this row existed before.
        #: If this is True, this row was fetched from the DB previously.
        #: Otherwise, it is a fresh row.
        self.__existed = False

        #: If this row is marked as "deleted".
        #: This means that the row cannot be updated.
        self.__deleted = False

        #: The session this row is attached to.
        self._session = None  # type: md_session.Session

        #: A mapping of Column -> Previous values for this row.
        #: Used in update generation.
        self._previous_values = {}

        #: A mapping of relationship -> rows for this row.
        self._relationship_mapping = collections.defaultdict(list)

        #: A mapping of Column -> Current value for this row.
        # This must be assigned last: ``__setattr__`` treats the presence of ``_values`` as
        # the signal that column-aware attribute setting may begin.
        self._values = {}

        if kwargs:
            self._init_row(**kwargs)

    def _init_row(self, **values):
        """
        Initializes the rows for this table, setting the values of the object.

        :param values: The values to pass into this column.
        :raises TypeError: If a name does not match any column on the table.
        :return: This row.
        """
        for name, value in values.items():
            column = self.table.get_column(name)
            if column is None:
                raise TypeError("Unexpected row parameter: '{}'".format(name))

            self._values[column] = value

        return self

    def __repr__(self):
        gen = ("{}={}".format(col.name, self.get_column_value(col))
               for col in self.table.columns)
        return "<{} {}>".format(self.table.__name__, " ".join(gen))

    def __eq__(self, other):
        if not isinstance(other, Table):
            return NotImplemented

        if other.table != self.table:
            raise ValueError("Rows to compare must be on the same table")

        return self.primary_key == other.primary_key

    def __le__(self, other):
        if not isinstance(other, Table):
            return NotImplemented

        if other.table != self.table:
            raise ValueError("Rows to compare must be on the same table")

        return self.primary_key <= other.primary_key

    def __setattr__(self, key, value):
        # until ``_values`` exists the row is still being constructed, so set plainly
        try:
            object.__getattribute__(self, "_values")
        except AttributeError:
            return super().__setattr__(key, value)

        # micro optimization
        # if it's in our __dict__, it's probably not a column
        # so bypass the column check and set it directly
        if key in self.__dict__:
            return super().__setattr__(key, value)

        col = self.table.get_column(column_name=key)
        if col is None:
            return super().__setattr__(key, value)

        # call on_set for the column
        return col.type.on_set(self, value)

    @property
    def primary_key(self) -> typing.Union[typing.Any, typing.Iterable[typing.Any]]:
        """
        Gets the primary key for this row.

        If this table only has one primary key column, this property will be a single value.
        If this table has multiple columns in a primary key, this property will be a tuple.
        """
        pk = self.table.primary_key  # type: PrimaryKey
        result = [self.get_column_value(col) for col in pk.columns]

        if len(result) == 1:
            return result[0]

        return tuple(result)

    def __getattr__(self, item: str):
        # Resolution goes through ``self.table``. If that is not in the instance dict yet
        # (e.g. an object created without ``__init__`` by copy/pickle), looking it up would
        # re-enter this method without end, so fail fast instead.
        if "table" not in self.__dict__:
            raise AttributeError(item)

        return self._resolve_item(item)

    __hash__ = object.__hash__

    # sql generation methods
    def _get_insert_sql(self, emitter: typing.Callable[[], str], session: 'md_session.Session'):
        """
        Gets the INSERT into statement SQL for this row.

        :param emitter: A callable that returns a fresh parameter name on each call.
        :param session: The session used to emit parameters; stored on the row if the row has \
            no session yet.
        :return: A two-item tuple of (sql, params dict).
        """
        if self._session is None:
            self._session = session

        q = "INSERT INTO {} ".format(self.table.__quoted_name__)
        params = {}
        column_names = []
        sql_params = []

        for column in self.table.iter_columns():
            column_names.append(column.quoted_name)
            value = self.get_column_value(column)
            if value is NO_VALUE or (value is None and column.default is NO_DEFAULT):
                sql_params.append("DEFAULT")
            else:
                # emit a new param
                name = emitter()
                param_name = session.bind.emit_param(name)
                # set the params to value
                # then add the {param_name} to the VALUES
                params[name] = value
                sql_params.append(param_name)

        q += "({}) ".format(", ".join(column_names))
        q += "VALUES "
        q += "({}) ".format(", ".join(sql_params))

        # check if we support RETURNS
        if session.bind.dialect.has_returns:
            # always return every column
            # this allows filling in of autoincrement + defaults
            to_return = ", ".join(column.quoted_name for column in self.table.iter_columns())
            q += " RETURNING {}".format(to_return)

        q += ";"
        return q, params

    def _get_update_sql(self, emitter: typing.Callable[[], str], session: 'md_session.Session'):
        """
        Gets the UPDATE statement SQL for this row.

        :param emitter: A callable that returns a fresh parameter name on each call.
        :param session: The session used to emit parameters; stored on the row if the row has \
            no session yet.
        :return: A two-item tuple of (sql, params dict), or (None, None) if there is nothing \
            to update.
        """
        if self._session is None:
            self._session = session

        params = {}
        base_query = "UPDATE {} SET ".format(self.table.__quoted_name__)
        # the params to "set"
        sets = []

        # first, get our row history
        history = md_inspection.get_row_history(self)
        # ensure the row actually has some history
        # otherwise, ignore it
        if not history:
            return None, None

        for col, d in history.items():
            # minor optimization
            if d["old"] == d["new"]:
                continue

            # get the next param from the counter
            # then store the name and the value in the row
            p = emitter()
            params[p] = d["new"]
            sets.append("{} = {}".format(col.quoted_name, session.bind.emit_param(p)))

        # ensure there are actually fields to set
        if not sets:
            return None, None

        base_query += ", ".join(sets)

        wheres = []
        for col in self.table.primary_key.columns:
            # get the param name
            # then store it in the params counter
            # and build a new condition for the WHERE clause
            p = emitter()
            params[p] = history[col]["old"]
            wheres.append("{} = {}".format(col.quoted_name, session.bind.emit_param(p)))

        base_query += " WHERE ({});".format(" AND ".join(wheres))

        return base_query, params

    def _get_delete_sql(self, emitter: typing.Callable[[], str], session: 'md_session.Session') \
            -> typing.Tuple[str, typing.Any]:
        """
        Gets the DELETE sql for this row.

        :param emitter: A callable that returns a fresh parameter name on each call.
        :param session: The session used to emit parameters; stored on the row if the row has \
            no session yet.
        :return: A two-item tuple of (sql, params dict).
        """
        if self._session is None:
            self._session = session

        query = "DELETE FROM {} ".format(self.table.__quoted_name__)
        # generate the where clauses
        wheres = []
        params = {}

        for col, value in zip(self.table.primary_key.columns,
                              md_inspection.get_pk(self, as_tuple=True)):
            name = emitter()
            params[name] = value
            wheres.append("{} = {}".format(col.quoted_fullname, session.bind.emit_param(name)))

        query += "WHERE ({}) ".format(" AND ".join(wheres))
        return query, params

    # value loading methods
    def _resolve_item(self, name: str):
        """
        Resolves an item on this row.

        This will check, in order:

            - Relationships on the table
            - Columns on the table

        :param name: The name to resolve.
        :return: The relationship instance, or the result of the column type's ``on_get``.
        :raises AttributeError: If the name is neither a relationship nor a column.
        """
        # try and load a relationship loader object
        try:
            return self.get_relationship_instance(name)
        except ValueError:
            pass

        # failed to load relationship, too, so load a column value instead
        col = self.table.get_column(name)
        if col is None:
            raise AttributeError("{} was not a function or attribute on the associated table, "
                                 "and was not a column".format(name)) from None

        return col.type.on_get(self)

    def get_old_value(self, column: 'md_column.Column'):
        """
        Gets the old value from the specified column in this row.

        :param column: The column.
        :return: The previously stored value, or NO_VALUE if no previous value was recorded.
        :raises ValueError: If the column does not belong to this row's table.
        """
        if column.table != self.table:
            raise ValueError("Column table must match row table")

        try:
            return self._previous_values[column]
        except KeyError:
            return NO_VALUE

    def get_column_value(self, column: 'md_column.Column', return_default: bool = True):
        """
        Gets the value from the specified column in this row.

        :param column: The column.
        :param return_default: If this should return the column default, or NO_VALUE.
        :return: The stored value. If nothing is stored: the column default (None when the \
            column has no default) if ``return_default`` is True, otherwise NO_VALUE.
        :raises ValueError: If the column does not belong to this row's table.
        """
        if column.table != self.table:
            raise ValueError("Column table must match row table")

        try:
            return self._values[column]
        except KeyError:
            if not return_default:
                return NO_VALUE

            default = column.default
            if default is NO_DEFAULT:
                return None

            return default

    def store_column_value(self, column: 'md_column.Column', value: typing.Any,
                           track_history: bool = True):
        """
        Updates the value of a column in this row.

        This will also update the history of the value, if applicable.

        :param column: The column to store the value for.
        :param value: The new value.
        :param track_history: If the current value should be remembered as the previous one.
        :return: This row.
        :raises RuntimeError: If the row is marked as deleted.
        """
        if self.__deleted:
            raise RuntimeError("This row is marked as deleted")

        # only the first overwrite is recorded, so the history keeps the original value
        if column not in self._previous_values and track_history:
            if column in self._values:
                self._previous_values[column] = self._values[column]

        self._values[column] = value

        return self

    def get_relationship_instance(self, relation_name: str):
        """
        Gets a 'relationship instance'.

        :param relation_name: The name of the relationship to load.
        :return: The object returned by the relationship's ``get_instance``, loaded with the \
            rows stored for that relationship.
        :raises ValueError: If the table has no relationship with that name.
        """
        relation = next(
            (rel for rel in self.table.iter_relationships() if rel._name == relation_name),
            None
        )
        if relation is None:
            raise ValueError("No such relationship '{}'".format(relation_name))

        rel = relation.get_instance(self, self._session)
        rel.set_rows(self._relationship_mapping[relation])
        rel._update_sub_relationships(self._relationship_mapping)
        return rel

    def _load_columns_using_table(self, table: 'TableMeta', record: dict, buckets: dict,
                                  seen: list):
        """
        Recursively organizes columns in a record into table buckets by scanning the
        relationships inside the table.

        :param table: The :class:`.TableMeta` to use to load the table.
        :param record: The dict-like record to read from.
        :param buckets: The dict of buckets to store tables in.
        :param seen: A list of relationships that have already been seen. This prevents infinite \
            loops.

            Outside of internal code, this should be passed in as an empty list.
        """
        for relationship in table.iter_relationships():
            if relationship in seen:
                continue

            seen.append(relationship)
            self._load_columns_using_relationship(relationship, record, buckets)
            self._load_columns_using_table(relationship.foreign_table, record, buckets, seen)

    def _load_columns_using_relationship(self, relationship, record: dict, buckets: dict):
        """
        Loads columns from a record dict using a relationship object.

        Every record entry that matches a column of the relationship's foreign table is moved
        out of ``record`` and into ``buckets[relationship]``, keyed by the column's name.
        """
        if relationship not in buckets:
            buckets[relationship] = {}

        # iterate over every column in the record
        # checking to see if the column adds up
        for cname, value in record.copy().items():
            # this will load using cname too, thankfully
            # use the foreign column to load the columns
            # since this is the one we're joining on
            column = relationship.foreign_table.get_column(cname)
            if column is not None:
                # use the actual name
                # if we use the cname, it won't expand into the row correctly
                actual_name = column.name
                buckets[relationship][actual_name] = value

                # get rid of the record
                # so it doesn't come around in the next relationship check
                record.pop(cname)

    def _update_relationships(self, record: dict):
        """
        Updates relationship data for this row, storing any extra rows that are needed.

        :param record: The dict record of extra data to store.
        :raises RuntimeError: If the row is marked as deleted.
        """
        if self.__deleted:
            raise RuntimeError("This row is marked as deleted")

        if self.table not in self._relationship_mapping:
            self._relationship_mapping[self.table] = [self]

        buckets = {}
        seen = []
        # this will load columns recursively
        self._load_columns_using_table(self.table, record, buckets, seen)

        # store the new relationship data
        for relationship, subdict in buckets.items():
            # Prevent null values from showing up
            if all(i is None for i in subdict.values()):
                continue

            row = relationship.foreign_table._internal_from_row(subdict, existed=True)
            known_rows = self._relationship_mapping[relationship]

            # ensure the row doesn't already exist with the PK
            if any(r.primary_key == row.primary_key for r in known_rows):
                # NOTE(review): kept as in the original - the session is assigned to the
                # duplicate row, which is then dropped, while newly stored rows do not get
                # one. This looks unintentional; confirm before changing.
                row._session = self._session
            else:
                # only append if the row didn't exist earlier
                known_rows.append(row)

    def to_dict(self, *, include_attrs: bool = False) -> dict:
        """
        Converts this row to a dict, indexed by Column.

        :param include_attrs: Should this include row_attrs? Currently has no effect.
        """
        # todo: include row attrs
        return {col: self.get_column_value(col) for col in self.table.columns}
def table_base(name: str = "Table", meta: 'TableMetadata' = None):
    """
    Gets a new base object to use for OO-style tables.

    The returned class is meant to be the parent of every table written in the
    object-oriented style. It carries the metadata object that the tables are attached to.

    Call this function once, then subclass the result in your table classes:

    .. code-block:: python3

        Table = table_base()

        class User(Table):
            ...

    Binding the base object to the database object is essential for querying:

    .. code-block:: python3

        # ensure the table is bound to that database
        db.bind_tables(Table)

        # now we can do queries
        sess = db.get_session()
        user = await sess.select(User).where(User.id == 2).first()

    Each Table object is associated with a database interface, which it uses for special
    querying inside the object, such as :meth:`.Table.get`.

    .. code-block:: python3

        class User(Table):
            id = Column(Integer, primary_key=True)
            ...

        db.bind_tables(Table)
        # later on, in some worker code
        user = await User.get(1)

    :param name: The name of the new class to produce. By default, it is ``Table``.
    :param meta: The :class:`.TableMetadata` to use as metadata. A new one is created if \
        this is None.
    :return: A new Table class that can be used for OO tables.
    """
    metadata = TableMetadata() if meta is None else meta
    namespace = {"metadata": metadata}

    # The metaclass is invoked directly rather than going through `type()`; with
    # register=False the clone is built without any table processing.
    return TableMeta.__new__(TableMeta, name, (Table,), namespace, register=False)
class AliasedTable(object):
    """
    Represents an "aliased table". This is a transparent proxy to a :class:`.TableMeta` table,
    and will create the right Table objects when called.

    .. code-block:: python3

        class User(Table):
            id = Column(Integer, primary_key=True, autoincrement=True)
            username = Column(String, nullable=False, unique=True)
            password = Column(String, nullable=False)

        NotUser = AliasedTable("not_user", User)
    """

    def __init__(self, alias_name: str, table: 'typing.Type[Table]'):
        """
        :param alias_name: The name of the alias for this table.
        :param table: The :class:`.TableMeta` used to alias this table.
        """
        self.alias_name = alias_name
        self.alias_table = table

    # proxy getattr
    def __getattr__(self, item):
        # Read the target straight from the instance dict. On an object that has not been
        # initialised yet (copy/pickle create one without calling ``__init__``),
        # ``self.alias_table`` would re-enter this method without end.
        try:
            target = self.__dict__["alias_table"]
        except KeyError:
            raise AttributeError(item) from None

        return getattr(target, item)

    # proxy call to the alias table
    # so it makes new rows
    def __call__(self, *args, **kwargs):
        return self.alias_table(*args, **kwargs)

    def __repr__(self):
        return "<Alias {} for {}>".format(self.alias_name, self.alias_table)

    def get_column(self, column_name: str) -> 'md_column.Column':
        """
        Gets a column by name from the specified table.

        This will use the base :meth:`.TableMeta.get_column`, and then search for columns via
        their alias name using this table.

        :param column_name: The column name to look up.
        :return: The matching column, or None if no column was found.
        """
        c = self.alias_table.get_column(column_name)
        if c is not None:
            return c

        for column in self.alias_table.iter_columns():
            if column.alias_name(self) == column_name:
                return column

        return None

    # override some attributes
    @property
    def __tablename__(self) -> str:
        return self.alias_name

    @property
    def __quoted_name__(self):
        return '"{}"'.format(self.alias_name)
class PrimaryKey(object):
    """
    Represents the primary key of a table.

    A primary key can be on any 1 to N columns in a table.

    .. code-block:: python3

        class Something(Table):
            first_id = Column(Integer)
            second_id = Column(Integer)

        pkey = PrimaryKey(Something.first_id, Something.second_id)
        Something.primary_key = pkey

    Alternatively, the primary key can be automatically calculated by passing
    ``primary_key=True`` to columns in their constructor:

    .. code-block:: python3

        class Something(Table):
            id = Column(Integer, primary_key=True)

        print(Something.primary_key)
    """

    def __init__(self, *cols: 'md_column.Column'):
        #: The table this primary key is bound to.
        self.table = None

        #: A list of :class:`.Column` that this primary key encompasses.
        self.columns = [*cols]  # type: typing.List[md_column.Column]

    def __repr__(self):
        template = "<PrimaryKey table='{}' columns='{}'>"
        return template.format(self.table, self.columns)