Source code for pghistory.models

import uuid
import warnings

import django
from django.apps import apps
from django.conf import settings
from django.db import connections, DEFAULT_DB_ALIAS
from django.db import models
from django.db.models.functions import Cast
from django.db.models.sql import Query
from django.db.models.sql.compiler import SQLCompiler

from pghistory import core, utils


# This alias preserves backwards compatibility with migrations: historical
# migration files reference ``pghistory.models.PGHistoryJSONField``, so the
# name must keep resolving to the JSON field class in ``utils``.
PGHistoryJSONField = utils.JSONField


class Context(models.Model):
    """Stores the metadata attached to historical events.

    Rows are upserted by the ``_pgh_attach_context()`` stored procedure that
    :meth:`install_pgh_attach_context_func` creates, keyed by the value of the
    ``pghistory.context_id`` database setting.
    """

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    metadata = utils.JSONField(default=dict)

    @classmethod
    def install_pgh_attach_context_func(cls, using=DEFAULT_DB_ALIAS):
        """
        Installs a custom stored procedure for upserting context for historical events.
        The upsert is aware of when tracking is enabled in the app (i.e. using
        pghistory.context())

        This stored procedure is automatically installed in pghistory migration 0004.

        Args:
            using: Alias of the database connection the function is created on.
        """
        # The function reads the ``pghistory.context_id`` and
        # ``pghistory.context_metadata`` settings. The inner
        # BEGIN/EXCEPTION WHEN OTHERS block swallows any error raised while
        # reading them (such as the settings not being defined), which leaves
        # the variables NULL so that no context row is written and NULL is
        # returned. When both are present, the row is inserted or, on a
        # conflicting id, its metadata/updated_at are overwritten.
        # No params are passed to execute(), so the literal "%TYPE" is safe.
        with connections[using].cursor() as cursor:
            cursor.execute(
                f"""
                CREATE OR REPLACE FUNCTION _pgh_attach_context()
                RETURNS {cls._meta.db_table}.id%TYPE AS $$
                    DECLARE
                        _pgh_context_id UUID;
                        _pgh_context_metadata JSONB;
                    BEGIN
                        BEGIN
                            SELECT INTO _pgh_context_id
                                CURRENT_SETTING('pghistory.context_id');
                            SELECT INTO _pgh_context_metadata
                                CURRENT_SETTING('pghistory.context_metadata');
                            EXCEPTION WHEN OTHERS THEN
                        END;
                        IF _pgh_context_id IS NOT NULL AND _pgh_context_metadata IS NOT NULL THEN
                            INSERT INTO {cls._meta.db_table} (id, metadata, created_at, updated_at)
                                VALUES (_pgh_context_id, _pgh_context_metadata, NOW(), NOW())
                                ON CONFLICT (id) DO UPDATE
                                SET metadata = EXCLUDED.metadata,
                                    updated_at = EXCLUDED.updated_at;
                            RETURN _pgh_context_id;
                        ELSE
                            RETURN NULL;
                        END IF;
                    END;
                $$ LANGUAGE plpgsql;
                """
            )
class EventQueryCompiler(SQLCompiler):
    """SQL compiler for event models that declare proxy fields.

    A proxy field is any field carrying a ``pgh_proxy`` attribute. When the
    event model has at least one, the compiled statement is rewritten to read
    from a ``pgh_event_cte`` CTE in which each proxy field is computed by
    casting its ``pgh_proxy`` lookup.
    """

    @property
    def proxy_fields(self):
        """Fields on the event model that declare a ``pgh_proxy`` attribute."""
        return list(
            filter(lambda fld: hasattr(fld, "pgh_proxy"), self.query.model._meta.fields)
        )

    def _get_cte(self):
        """Build the ``WITH pgh_event_cte AS (...)`` prefix.

        The CTE selects every non-proxy field by name, plus every proxy field
        as ``Cast(<pgh_proxy lookup>)`` exposed under the field's own column.
        """
        model = self.query.model
        proxied = self.proxy_fields

        # Annotations can't reuse an existing field's name, so each proxy is
        # computed under a temporary "_<column>" alias first.
        aliases = [f"_{fld.column}" for fld in proxied]
        casts = {
            alias: Cast(fld.pgh_proxy, output_field=fld)
            for alias, fld in zip(aliases, proxied)
        }
        plain_names = [
            fld.name for fld in model._meta.fields if not hasattr(fld, "pgh_proxy")
        ]

        inner = models.QuerySet(model).annotate(**casts).values(*plain_names, *aliases)
        raw_sql, raw_params = inner.query.as_sql(self, self.connection)

        # Inline the parameters so the CTE text can be prepended to the outer
        # statement without contributing params of its own.
        with self.connection.cursor() as cursor:
            rendered = cursor.mogrify(raw_sql, raw_params)
        # psycopg 2 and 3 return different types from mogrify
        if isinstance(rendered, bytes):
            rendered = rendered.decode("utf-8")

        # Rename the temporary aliases back to the real column names.
        # NOTE(review): this is a plain substring replace over the whole SQL.
        for fld, alias in zip(proxied, aliases):
            rendered = rendered.replace(alias, field_column(fld) if False else fld.column)
        # Turn JSON "->" lookups into "->>" so the cast receives text.
        rendered = rendered.replace("->", "->>")

        return "WITH pgh_event_cte AS (\n" + rendered + "\n)\n"

    def as_sql(self, *args, **kwargs):
        """
        Compile the query. If the event model has proxy fields, prefix the
        statement with the proxy CTE and point table references at it;
        otherwise return the standard compiled SQL unchanged.
        """
        sql, params = super().as_sql(*args, **kwargs)
        if not any(self.proxy_fields):
            return sql, params

        if django.VERSION < (3, 2):  # pragma: no cover
            raise RuntimeError("Must use Django 3.2 or above to proxy fields on event models")

        quoted_table = f'"{self.query.model._meta.db_table}"'
        return self._get_cte() + sql.replace(quoted_table, '"pgh_event_cte"'), params
class EventQuery(Query):
    """Query over event rows that compiles through :class:`EventQueryCompiler`.

    That compiler reads from a CTE when the event model has proxy fields.
    """

    def get_compiler(self, *args, **kwargs):
        """
        Return the compiler Django would normally build, re-classed in place
        as an ``EventQueryCompiler``.
        """
        base_compiler = super().get_compiler(*args, **kwargs)
        # Swapping __class__ keeps all state set up by the parent method.
        base_compiler.__class__ = EventQueryCompiler
        return base_compiler
class EventQuerySet(models.QuerySet):
    """QuerySet with support for proxy fields.

    It is backed by an :class:`EventQuery` unless the caller supplies a query.
    """

    def __init__(self, model=None, query=None, using=None, hints=None):
        # Chained querysets pass their existing query through; only the first
        # queryset in a chain builds a fresh EventQuery.
        super().__init__(model, EventQuery(model) if query is None else query, using, hints)
class PghEventModel:
    """A descriptor for accessing the pgh_event_model field on a tracked model.

    It resolves to the single value stored in the owner's
    ``pgh_event_models`` mapping.

    Raises:
        ValueError: If the owner has no registered event models, or more
            than one (in which case the mapping must be indexed by label).
    """

    def __get__(self, instance, owner):
        event_models = owner.pgh_event_models
        if len(event_models) == 1:
            return event_models[next(iter(event_models))]

        if not event_models:
            # Previously this case fell through to the "more than one"
            # message, which is misleading for a model with zero trackers.
            raise ValueError(f"{owner.__name__} has no trackers.")

        raise ValueError(
            f"{owner.__name__} has more than one tracker."
            " Use the pgh_event_models dictionary to retrieve the event"
            " model by label."
        )
class Event(models.Model):
    """
    An abstract model for base elements of an event
    """

    pgh_id = models.AutoField(primary_key=True)
    pgh_created_at = models.DateTimeField(auto_now_add=True)
    pgh_label = models.TextField(help_text="The event label.")

    # None on this abstract base; expected to be overridden on concrete event
    # models. Read by pghistory_setup(), can_revert and revert().
    pgh_trackers = None
    pgh_tracked_model = None

    objects = EventQuerySet.as_manager()

    class Meta:
        abstract = True

    @property
    def can_revert(self):
        """True if the event model can revert the tracked model.

        This holds when every field name on the tracked model is also a field
        name on this event model.
        """
        model_fields = {f.name for f in self.pgh_tracked_model._meta.fields}
        tracked_fields = {f.name for f in self._meta.fields}
        return model_fields.issubset(tracked_fields)

    def revert(self, using=DEFAULT_DB_ALIAS):
        """
        Reverts the tracked model based on the event fields.

        Args:
            using: Database alias used to update or create the tracked row.

        Returns:
            The tracked model instance that was updated or created.

        Raises:
            RuntimeError: If the event model doesn't track every field of the
                tracked model (see ``can_revert``).
        """
        if not self.can_revert:
            # pgh_tracked_model is a model *class*: use its own __name__.
            # ``pgh_tracked_model.__class__.__name__`` would name the metaclass
            # (ModelBase) rather than the tracked model.
            raise RuntimeError(
                f'Event model "{self.__class__.__name__}" cannot revert'
                f' "{self.pgh_tracked_model.__name__}" because it'
                " doesn't track every field."
            )

        qset = models.QuerySet(model=self.pgh_tracked_model, using=using)
        pk = getattr(self, self.pgh_tracked_model._meta.pk.name)
        return qset.update_or_create(
            pk=pk,
            defaults={
                field.name: getattr(self, field.name)
                for field in self.pgh_tracked_model._meta.fields
                if field != self.pgh_tracked_model._meta.pk
            },
        )[0]

    @classmethod
    def pghistory_setup(cls):
        """
        Called when the class is prepared (see apps.py)
        to finalize setup of the model and register triggers
        """
        if (
            not cls._meta.abstract and cls._meta.managed and not cls._meta.proxy
        ):  # pragma: no branch
            for tracker in cls.pgh_trackers or []:
                tracker.pghistory_setup(cls)

            # Set up the event model utility properties. Don't overwrite any
            # existing attributes
            if not hasattr(cls.pgh_tracked_model, "pgh_event_models"):
                cls.pgh_tracked_model.pgh_event_models = {}

            # Use dir here, otherwise the property is evaluated
            if "pgh_event_model" not in dir(cls.pgh_tracked_model):
                cls.pgh_tracked_model.pgh_event_model = PghEventModel()

            # Only register into the mapping when it is a plain dict.
            if isinstance(cls.pgh_tracked_model.pgh_event_models, dict):  # pragma: no branch
                cls.pgh_tracked_model.pgh_event_models.update(
                    {tracker.label: cls for tracker in cls.pgh_trackers or []}
                )

    @classmethod
    def check(cls, **kwargs):
        """
        Allow proxy models to inherit this model and define their own fields
        that are dynamically pulled from context
        """
        errors = super().check(**kwargs)

        # If all local fields are proxied, ignore error E017 and allow the
        # fields to be declared
        if any(error for error in errors if error.id == "models.E017") and all(
            hasattr(field, "pgh_proxy") for field in cls._meta.local_fields
        ):
            return [error for error in errors if error.id != "models.E017"]
        else:
            return errors
class EventsQueryCompiler(SQLCompiler):
    """Compiles queries over an aggregate ``Events`` model.

    The compiled statement is prefixed with a CTE named after the ``Events``
    model's table. The CTE is a ``UNION ALL`` of one ``SELECT`` per event
    model in :attr:`across`, or an empty placeholder select when there are
    none.
    """

    def _get_empty_select(self):
        """
        When targetting a model that has no event tables, there are no valid
        tables from which a CTE can be generated. This method generates a CTE
        that returns an empty table in the schema of the Events table.

        Note that it's impossible to create an empty CTE, so we select NULL
        VALUES and LIMIT to 0.
        """
        col_name_clause = ", ".join([field.column for field in self.query.model._meta.fields])
        col_select_clause = ",\n".join(
            [
                f"_pgh_obj_event.{field.column}::"
                f"{field.rel_db_type(self.connection)} AS {field.attname}"
                for field in self.query.model._meta.fields
            ]
        )
        values_list = ["(NULL)" for field in self.query.model._meta.fields]
        return f"""
            SELECT
                {col_select_clause}
            FROM (
                VALUES ({', '.join(values_list)}) LIMIT 0
            ) AS _pgh_obj_event({col_name_clause})
            WHERE pgh_model IS NOT NULL
        """

    def _validate(self):
        """Validate the references()/tracks() filters.

        Raises:
            ValueError: If either filter mixes object types, or if both
                filters are used together.
        """
        if (
            isinstance(self.references, (list, tuple))
            and len({r.__class__ for r in self.references}) > 1
        ):
            raise ValueError("The objects passed to references() are not of the same type.")
        elif (
            isinstance(self.tracks, (list, tuple))
            and len({o.__class__ for o in self.tracks}) > 1
        ):
            raise ValueError("The objects passed to tracks() are not of the same type.")
        elif self.references_model and self.tracks_model:
            raise ValueError("Cannot use both tracks() and references().")

    @property
    def references_model(self):
        """Concrete model of the references() objects, or None if unset."""
        if isinstance(self.references, models.QuerySet):
            return self.references.model._meta.concrete_model
        elif isinstance(self.references, (list, tuple)) and self.references:
            return self.references[0].__class__._meta.concrete_model

    @property
    def references(self):
        """Objects passed to ``EventsQuerySet.references()`` (stored on the query)."""
        return self.query.references

    @property
    def tracks_model(self):
        """Concrete model of the tracks() objects, or None if unset."""
        if isinstance(self.tracks, models.QuerySet):
            return self.tracks.model._meta.concrete_model
        elif isinstance(self.tracks, (list, tuple)) and self.tracks:
            return self.tracks[0].__class__._meta.concrete_model

    @property
    def tracks(self):
        """Objects passed to ``EventsQuerySet.tracks()`` (stored on the query)."""
        return self.query.tracks

    @property
    def across(self):
        """Event models to aggregate, as resolved by ``core.event_models``.

        NOTE(review): presumably narrowed by the references/tracks model when
        one is set; see ``core.event_models``.
        """
        return core.event_models(
            models=self.query.across,
            references_model=self.references_model,
            tracks_model=self.tracks_model,
        )

    def _get_context_clauses(self, event_model):
        """
        Get the clauses for obtaining context based on the event model

        We have the following cases to handle:
        1. No pgh_context
        2. A pgh_context foreign key is used
        3. A pgh_context JSON is used with pgh_context_id
        4. A pgh_context JSON is used without pgh_context_id

        Returns:
            A 5-tuple of SQL fragments: (final context columns, context column,
            context id column, context join, annotated context columns).

        Raises:
            RuntimeError: If a proxy field proxies anything but pgh_context.
            AssertionError: If pgh_context is neither a ForeignKey nor a JSON field.
        """
        proxy_fields = []
        for field in self.query.model._meta.fields:
            if hasattr(field, "pgh_proxy"):
                if not field.pgh_proxy.startswith("pgh_context__"):  # pragma: no cover
                    raise RuntimeError(
                        "Proxy fields on Events models can only proxy the pgh_context field."
                        " E.g. pgh_context__url"
                    )

                # "pgh_context__url" -> context key "url"
                proxy_fields.append((field, field.pgh_proxy.split("__", 1)[1]))
            elif not field.attname.startswith("pgh_"):
                warnings.warn(
                    f"django-pghistory extra field '{field}' in event model"
                    f" '{self.query.model._meta.label}' declared. Use"
                    " 'pghistory.ProxyField' to define a proxy fields instead.",
                    DeprecationWarning,
                )
                # Legacy extra fields are looked up in context by field name.
                proxy_fields.append((field, field.name))

        context_join_clause = ""
        final_context_columns_clause = "".join(
            [f"_pgh_obj_event.{field.column},\n" for field, _ in proxy_fields]
        )
        if not hasattr(event_model, "pgh_context"):
            context_id_column_clause = "NULL::UUID AS pgh_context_id"
            context_column_clause = "NULL::JSONB AS pgh_context"

            # If the aggregate event model has any proxy fields,
            # make them null since there is no context on this event
            annotated_context_columns_clause = "".join(
                [
                    f"NULL::{field.rel_db_type(self.connection)} AS {field.column},\n"
                    for field, _ in proxy_fields
                ]
            )
        elif isinstance(event_model._meta.get_field("pgh_context"), models.ForeignKey):
            context_id_column_clause = "pgh_context_id"
            context_column_clause = "_pgh_context.metadata AS pgh_context"

            # If the aggregate event model has any proxy fields,
            # pull these directly from the context metadata
            annotated_context_columns_clause = "".join(
                [
                    f"(_pgh_context.metadata->>'{attr}')::"
                    f"{field.rel_db_type(self.connection)} AS {field.column},\n"
                    for field, attr in proxy_fields
                ]
            )
            context_join_clause = f"""
                LEFT OUTER JOIN {Context._meta.db_table} _pgh_context
                    ON _pgh_context.id = _event.pgh_context_id
            """
        elif isinstance(event_model._meta.get_field("pgh_context"), utils.DjangoJSONField):
            context_column_clause = "pgh_context"
            annotated_context_columns_clause = "".join(
                [
                    f"(pgh_context->>'{attr}')::"
                    f"{field.rel_db_type(self.connection)} AS {field.column},\n"
                    for field, attr in proxy_fields
                ]
            )

            if hasattr(event_model, "pgh_context_id"):
                context_id_column_clause = "pgh_context_id"
            else:
                context_id_column_clause = "NULL::UUID AS pgh_context_id"
        else:
            raise AssertionError

        return (
            final_context_columns_clause,
            context_column_clause,
            context_id_column_clause,
            context_join_clause,
            annotated_context_columns_clause,
        )

    @staticmethod
    def _quote_pk(value):
        """Render ``value`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled so they cannot end the literal.
        ``%`` is doubled because the finished SQL is handed to the driver
        together with the outer query's params (see ``as_sql``), and the
        driver treats ``%`` as a placeholder. This is the same reason the
        LIKE patterns below are written as ``'pgh_%%'``.
        """
        text = f"{value}".replace("'", "''").replace("%", "%%")
        return f"'{text}'"

    def _get_where_clause(self, event_model):
        """Build the WHERE clause restricting ``event_model`` rows.

        Returns:
            ``""`` when neither references() nor tracks() was given, or
            ``"WHERE FALSE"`` when an empty QuerySet was given (nothing can
            match). Otherwise, a clause that ORs a pk comparison across every
            relevant column.
        """
        # An explicitly-passed QuerySet is a filter even when it is empty.
        # Testing it only for truthiness would drop the filter entirely and
        # return every event of the matching tables.
        if isinstance(self.references, models.QuerySet) or self.references:
            rows = self.references
            cols = [
                field.column
                for field in event_model._meta.fields
                if utils.related_model(field) == self.references_model
            ]
        elif isinstance(self.tracks, models.QuerySet) or self.tracks:
            rows = self.tracks
            cols = [event_model._meta.get_field("pgh_obj").column]
        else:
            return ""

        pk_literals = [self._quote_pk(o.pk) for o in rows]
        if not pk_literals:
            return "WHERE FALSE"

        if isinstance(rows, models.QuerySet) or len(rows) > 1:
            opt = "IN"
            # TODO: Use a subquery
            pks = f"({','.join(pk_literals)})"
        else:
            opt = "="
            pks = pk_literals[0]

        return "WHERE " + " OR ".join(f"_event.{col} {opt} {pks}" for col in cols)

    def _get_select(self, event_model):
        """Build the SELECT that maps one event model's table onto the Events schema.

        The SELECT computes ``pgh_data`` (the row as JSON minus ``pgh_*``
        keys) and ``pgh_diff`` (changed keys mapped to ``[prev, curr]``).
        The previous row comes from ``LAG`` partitioned by object id and label.
        """
        where_clause = self._get_where_clause(event_model)
        (
            final_context_columns_clause,
            context_column_clause,
            context_id_column_clause,
            context_join_clause,
            annotated_context_columns_clause,
        ) = self._get_context_clauses(event_model)

        prev_data_clause = """
            LAG(row_to_json(_event))
              OVER (
                PARTITION BY _event.pgh_obj_id, _event.pgh_label
                ORDER BY _event.pgh_id
              ) AS _prev_data
        """
        pgh_obj_id_column_clause = "pgh_obj_id::TEXT"
        if not hasattr(event_model, "pgh_obj_id"):
            # Without a tracked object there is no history to diff against.
            prev_data_clause = "NULL::JSONB AS _prev_data"
            pgh_obj_id_column_clause = "NULL::TEXT AS pgh_obj_id"

        event_table = event_model._meta.db_table
        return f"""
            SELECT
                CONCAT('{event_model._meta.label}', ':', _pgh_obj_event.pgh_id) AS pgh_slug,
                _pgh_obj_event.pgh_id,
                _pgh_obj_event.pgh_created_at,
                _pgh_obj_event.pgh_label,
                {final_context_columns_clause}
                _pgh_obj_event.pgh_obj_id,
                '{event_model._meta.label}' AS pgh_model,
                '{event_model.pgh_tracked_model._meta.label}' AS pgh_obj_model,
                (
                    SELECT JSONB_OBJECT_AGG(filtered.key, filtered.value)
                    FROM
                    (
                        SELECT key, value
                        FROM JSONB_EACH(_pgh_obj_event._curr_data::JSONB)
                    ) filtered
                    WHERE filtered.key NOT LIKE 'pgh_%%'
                ) AS pgh_data,
                (
                    SELECT JSONB_OBJECT_AGG(curr.key, array[prev.value, curr.value])
                    FROM
                    (
                        SELECT key, value
                        FROM JSONB_EACH(_pgh_obj_event._curr_data::JSONB)
                    ) curr
                    LEFT OUTER JOIN
                    (
                        SELECT key, value
                        FROM JSONB_EACH(_pgh_obj_event._prev_data::JSONB)
                    ) prev
                    ON curr.key = prev.key
                    WHERE curr.key NOT LIKE 'pgh_%%'
                        AND curr.value != prev.value
                        AND prev IS NOT NULL
                ) AS pgh_diff,
                _pgh_obj_event.pgh_context_id,
                _pgh_obj_event.pgh_context
            FROM (
                SELECT
                    pgh_id,
                    pgh_created_at,
                    pgh_label,
                    row_to_json(_event) AS _curr_data,
                    {annotated_context_columns_clause}
                    {prev_data_clause},
                    {context_id_column_clause},
                    {context_column_clause},
                    {pgh_obj_id_column_clause}
                FROM {event_table} _event
                {context_join_clause}
                {where_clause}
                ORDER BY _event.pgh_id
            ) _pgh_obj_event
        """

    def _get_cte(self):
        """
        Returns the CTE clause for the aggregate event query
        """
        events_table = self.query.model._meta.db_table
        inner_cte = "UNION ALL ".join(
            [self._get_select(event_model) for event_model in self.across]
        )
        if not inner_cte:
            inner_cte = self._get_empty_select()

        return f"WITH {events_table} AS (\n" + inner_cte + "\n)\n"

    def as_sql(self, *args, **kwargs):
        """Validate the filters, then prepend the aggregate CTE to the compiled SQL."""
        self._validate()
        base_sql, base_params = super().as_sql(*args, **kwargs)

        # Create the CTE that will be queried and insert it into the
        # main query
        cte = self._get_cte()

        return cte + base_sql, base_params
class EventsQuery(Query):
    """Query over the aggregate events CTE.

    It carries the ``references`` / ``tracks`` / ``across`` filters that
    :class:`EventsQueryCompiler` reads when building the CTE.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set by EventsQuerySet.references() / tracks() / across().
        self.references = []
        self.tracks = []
        self.across = []

    def get_compiler(self, *args, **kwargs):
        """Return the parent's compiler, re-classed as an ``EventsQueryCompiler``."""
        base = super().get_compiler(*args, **kwargs)
        base.__class__ = EventsQueryCompiler
        return base

    def __chain(self, _name, klass=None, *args, **kwargs):
        # NOTE(review): ``klass`` is not forwarded; the parent method is
        # always called with self.__class__, so the clone stays an EventsQuery
        # (presumably to keep the CTE compiler).
        parent_method = getattr(super(), _name)
        clone = parent_method(self.__class__, *args, **kwargs)
        for attr in ("references", "tracks", "across"):
            setattr(clone, attr, getattr(self, attr))
        return clone

    def chain(self, klass=None):
        """Clone the query via ``Query.chain`` while carrying over the event filters."""
        return self.__chain("chain", klass)
class EventsQuerySet(models.QuerySet):
    """QuerySet with support for Common Table Expressions.

    Filters set by :meth:`across`, :meth:`references` and :meth:`tracks` are
    stored on the underlying :class:`EventsQuery`.
    """

    def __init__(self, model=None, query=None, using=None, hints=None):
        # Chained querysets hand in their existing query; only the first
        # queryset in a chain builds a fresh EventsQuery.
        super().__init__(model, EventsQuery(model) if query is None else query, using, hints)

    @staticmethod
    def _flatten_objs(objs):
        """Normalize ``*objs`` varargs into the collection to store on the query.

        Accepts one or more model instances, or exactly one list/tuple/QuerySet
        of them.
        """
        assert len(objs) >= 1
        first = objs[0]
        if not isinstance(first, (list, tuple, models.QuerySet)):
            return objs
        assert len(objs) == 1
        return first

    def across(self, *event_models):
        """Aggregates events across the provided event models.

        Each entry is either a model class or a string resolved with
        ``apps.get_model``.
        """
        resolved = []
        for entry in event_models:
            resolved.append(apps.get_model(entry) if isinstance(entry, str) else entry)
        clone = self._clone()
        clone.query.across = resolved
        return clone

    def references(self, *objs):
        """Query any rows that reference the objs.

        If, for example, a foreign key or pgh_obj field points to the object,
        it will be aggregated.
        """
        targets = self._flatten_objs(objs)
        clone = self._clone()
        clone.query.references = targets
        return clone

    def tracks(self, *objs):
        """Query any rows with pgh_obj equal to the objs."""
        targets = self._flatten_objs(objs)
        clone = self._clone()
        clone.query.tracks = targets
        return clone
class NoObjectsManager(models.Manager):
    """
    Manager whose querysets are always empty.

    Django's dumpdata and other management commands do not work with Events
    models out of the box, because those models aggregate multiple tables
    based on objects. Aggregate event models use this as their default
    manager so that dumpdata and similar commands still run against them.
    """

    def get_queryset(self, *args, **kwargs):
        """Return an empty plain ``QuerySet`` for the model (arguments are ignored)."""
        empty = models.QuerySet(self.model, using=self._db)
        return empty.none()
class Events(models.Model):
    """
    An unmanaged model for aggregating events together across tables and
    rendering diffs
    """

    pgh_slug = models.TextField(
        primary_key=True, help_text="The unique identifier across all event tables."
    )
    pgh_model = models.CharField(max_length=64, help_text="The event model.")
    pgh_id = models.BigIntegerField(help_text="The primary key of the event.")
    pgh_created_at = models.DateTimeField(
        auto_now_add=True, help_text="When the event was created."
    )
    pgh_label = models.TextField(help_text="The event label.")
    pgh_data = utils.JSONField(help_text="The raw data of the event.")
    pgh_diff = utils.JSONField(help_text="The diff between the previous event of the same label.")
    pgh_context_id = models.UUIDField(null=True, help_text="The context UUID.")
    pgh_context = utils.JSONField(
        null=True,
        help_text="The context associated with the event.",
    )
    pgh_obj_model = models.CharField(max_length=64, help_text="The object model.")
    pgh_obj_id = models.TextField(null=True, help_text="The primary key of the object.")

    objects = EventsQuerySet.as_manager()
    no_objects = NoObjectsManager()

    class Meta:
        managed = False
        verbose_name_plural = "events"
        # See the docs for NoObjectsManager about why this is the default
        # manager
        default_manager_name = "no_objects"

    @classmethod
    def check(cls, **kwargs):
        """
        Run Django's model checks, minus ``models.E017``, so that proxy
        subclasses may declare their own context-backed fields.
        """
        inherited = super().check(**kwargs)
        return [issue for issue in inherited if issue.id != "models.E017"]
class MiddlewareEvents(Events):
    """
    A proxy model for aggregating events. Includes additional fields that
    are captured by the pghistory middleware
    """

    # Each ProxyField names a ``pgh_context__<key>`` lookup. The aggregate
    # query reads these values from the event context under the keys "user"
    # and "url".
    user = core.ProxyField(
        "pgh_context__user",
        models.ForeignKey(
            settings.AUTH_USER_MODEL,
            on_delete=models.DO_NOTHING,
            help_text="The user associated with the event.",
        ),
    )
    url = core.ProxyField(
        "pgh_context__url",
        models.TextField(help_text="The url associated with the event."),
    )

    class Meta:
        proxy = True
        verbose_name_plural = "middleware events"
# These models are deprecated from pghistory import deprecated # noqa
class BaseAggregateEvent(Event):
    """
    An abstract base model for aggregating events together across tables and
    rendering diffs

    Part of the deprecated aggregate-event API: its queryset and managers
    come from ``pghistory.deprecated``.
    """

    pgh_table = models.CharField(
        max_length=64, help_text="The table under which the event is stored"
    )
    pgh_data = utils.JSONField(help_text="The raw data of the event row")
    pgh_diff = utils.JSONField(
        help_text="The diff between the previous event and the current event"
    )
    pgh_context = models.ForeignKey(
        "pghistory.Context",
        null=True,
        help_text="The context, if any, associated with the event",
        on_delete=models.DO_NOTHING,
    )

    objects = deprecated.AggregateEventQuerySet.as_manager()
    no_objects = deprecated.NoObjectsManager()

    class Meta:
        abstract = True
        # See the docs for NoObjectsManager about why this is the default
        # manager
        default_manager_name = "no_objects"
class AggregateEvent(BaseAggregateEvent):
    """
    An unmanaged model for aggregating events together across tables and
    rendering diffs (deprecated aggregate-event API)
    """

    class Meta:
        # Django does not create or migrate a table for this model.
        managed = False