"""Core functionality and interface of pghistory"""
import copy
import re
import sys
import warnings
from django.apps import apps
from django.db import connections
from django.db import models
from django.db.models import sql
from django.db.models.fields.related import RelatedField
from django.db.models.sql import compiler
from django.utils.module_loading import import_string
import pgtrigger
from pghistory import config, constants, trigger, utils
if utils.psycopg_maj_version == 2:
from psycopg2.extensions import AsIs as Literal
elif utils.psycopg_maj_version == 3:
import psycopg.adapt
class Literal:
def __init__(self, val):
self.val = val
class LiteralDumper(psycopg.adapt.Dumper):
def dump(self, obj):
return obj.val.encode("utf-8")
def quote(self, obj):
return self.dump(obj)
else:
raise AssertionError
_registered_trackers = {}
def _fmt_trigger_name(label):
"""Given a history event label, generate a trigger name"""
if label:
return re.sub("[^0-9a-zA-Z]+", "_", label)
else: # pragma: no cover
return None
[docs]class Tracker:
"""For tracking an event when a condition happens on a model."""
label = None
def __init__(self, label=None):
self.label = label or self.label or self.__class__.__name__.lower()
[docs] def setup(self, event_model):
"""Set up the tracker for the event model"""
pass
[docs] def pghistory_setup(self, event_model):
"""Registers the tracker for the event model and calls user-defined setup"""
tracked_model = event_model.pgh_tracked_model
if (tracked_model, self.label) in _registered_trackers:
raise ValueError(
f'Tracker with label "{self.label}" already exists'
f' for model "{tracked_model._meta.label}". Supply a'
" different label as the first argument to the tracker."
)
_registered_trackers[(tracked_model, self.label)] = event_model
self.setup(event_model)
[docs]class ManualTracker(Tracker):
"""For manually tracking an event."""
[docs]class Event(Tracker):
"""The deprecated base class for event trackers. Use `Tracker` instead"""
def __init__(self, label=None):
warnings.warn(
"The django-pghistory 'Event' class is deprecated and renamed to 'Tracker'.",
DeprecationWarning,
)
super().__init__(label=label)
[docs]class DatabaseTracker(Tracker):
"""For tracking an event automatically based on database changes."""
when = None
condition = None
operation = None
snapshot = None
def __init__(
self,
label=None,
*,
when=None,
condition=None,
operation=None,
snapshot=None,
):
super().__init__(label=label)
self.when = when or self.when
self.condition = condition or self.condition
self.operation = operation or self.operation
self.snapshot = snapshot or self.snapshot
def add_event_trigger(
self, *, event_model, label, snapshot, when, operation, condition=None, name=None
):
pgtrigger.register(
trigger.Event(
event_model=event_model,
label=label,
name=_fmt_trigger_name(name or label),
snapshot=snapshot,
when=when,
operation=operation,
condition=condition,
)
)(event_model.pgh_tracked_model)
[docs] def setup(self, event_model):
self.add_event_trigger(
event_model=event_model,
label=self.label,
snapshot=self.snapshot,
when=self.when,
operation=self.operation,
condition=self.condition,
)
[docs]class DatabaseEvent(DatabaseTracker):
"""
The deprecated base class for all trigger-based trackers.
Use `DatabaseTracker` instead.
"""
def __init__(
self,
label=None,
*,
when=None,
condition=None,
operation=None,
snapshot=None,
): # pragma: no cover
warnings.warn(
"The django-pghistory 'DatabaseEvent' class is deprecated and renamed to"
" 'DatabaseTracker'.",
DeprecationWarning,
)
super().__init__(
label=label,
when=when,
condition=condition,
operation=operation,
snapshot=snapshot,
)
[docs]class Changed(pgtrigger.Condition):
"""A utilty to create conditions based on changes in the tracked model.
Given the event model, we create a condition as follows:
- If the event model trackes every field from the main model, we can
use a standard ``OLD.* IS DISTINCT FROM NEW.*`` condition to snapshot
every change on the main model.
- If the event model tracks a subset of the fields of the main model,
only changes to event fields will trigger a snapshot. In other words,
if the main model has an int and char field, but the event model only
tracks the char field, the condition will be
``OLD.char_field IS DISTINCT FROM NEW.char_field``.
- If one has fields on the event model and wishes to ignore them from
triggering snapshots, pass them to the ``exclude`` argument to this
utility.
"""
def __init__(self, event_model, exclude=None):
self.event_model = event_model
self.exclude = exclude or []
def resolve(self, model):
event_fields = [
field.name
for field in self.event_model._meta.fields
if not field.name.startswith("pgh_")
]
model_fields = [f.name for f in model._meta.fields]
# By default, any field in both the main model and event model that
# change will trigger the condition. You can exclude fields from
# the event model that will trigger snapshots.
conditional_fields = [f for f in event_fields if f not in self.exclude]
if set(event_fields) == set(model_fields) == set(conditional_fields):
# We're tracking every field on any change
condition = pgtrigger.Condition("OLD.* IS DISTINCT FROM NEW.*")
else:
# We're either tracking a subset of fields or we have
condition = pgtrigger.Q()
for field in conditional_fields:
if hasattr(model, field):
condition |= pgtrigger.Q(**{f"old__{field}__df": pgtrigger.F(f"new__{field}")})
return condition.resolve(model)
[docs]class Snapshot(DatabaseTracker):
"""
Tracks changes to fields.
A snapshot tracker tracks inserts and updates. It ensures that no
duplicate rows are created with a pre-configured condition.
NOTE: Two triggers are created since Insert triggers do
not allow comparison against the OLD values. We could also
place this in one trigger and do the condition in the plpgsql code.
"""
def __init__(self, label=None, delayed=False):
self.delayed = delayed
return super().__init__(label=label)
[docs] def setup(self, event_model):
self.add_event_trigger(
event_model=event_model,
label=self.label,
name=f"{self.label}_insert",
snapshot="NEW",
when=pgtrigger.After,
operation=pgtrigger.Insert,
)
self.add_event_trigger(
event_model=event_model,
label=self.label,
name=f"{self.label}_update",
snapshot="NEW",
when=pgtrigger.After,
operation=pgtrigger.Update,
condition=Changed(event_model),
)
class PreconfiguredDatabaseTracker(DatabaseTracker):
"""
A base database tracker that only takes a condition. Subclasses
preconfigure the other parameters
"""
def __init__(self, label=None, *, condition=None):
return super().__init__(label=label, condition=condition)
[docs]class AfterInsertOrUpdate(PreconfiguredDatabaseTracker):
"""
A database tracker that happens after insert/update
"""
operation = pgtrigger.Insert | pgtrigger.Update
snapshot = "NEW"
[docs]class AfterInsert(PreconfiguredDatabaseTracker):
"""For trackers that fire after a database insert"""
operation = pgtrigger.Insert
snapshot = "NEW"
[docs]class BeforeUpdate(PreconfiguredDatabaseTracker):
"""
For trackers that fire before a database update. The OLD values of the row
will be snapshot to the event model
"""
operation = pgtrigger.Update
snapshot = "OLD"
[docs]class AfterUpdate(PreconfiguredDatabaseTracker):
"""
For trackers that fire after a database update. The NEW values of the row
will be snapshot to the event model
"""
operation = pgtrigger.Update
snapshot = "NEW"
[docs]class BeforeDelete(PreconfiguredDatabaseTracker):
"""
For trackers that fire before a database deletion.
"""
operation = pgtrigger.Delete
snapshot = "OLD"
[docs]class BeforeUpdateOrDelete(PreconfiguredDatabaseTracker):
"""
A database tracker that snapshots the old row during an update or delete
"""
operation = pgtrigger.Update | pgtrigger.Delete
snapshot = "OLD"
def _pascalcase(string):
"""Convert string into pascal case."""
string = re.sub(r"^[\-_\.]", "", str(string))
if not string: # pragma: no branch
return string
return string[0].upper() + re.sub(
r"[\-_\.\s]([a-z])",
lambda matched: matched.group(1).upper(),
string[1:],
)
def _generate_event_model_name(base_model, tracked_model, fields):
"""Generates a default history model name"""
name = tracked_model._meta.object_name
if fields:
name += "_" + "_".join(fields)
name += f"_{base_model._meta.object_name.lower()}"
return _pascalcase(name)
def _get_field_construction(field):
_, _, args, kwargs = field.deconstruct()
if isinstance(field, models.ForeignKey):
default = config.foreign_key_field()
elif isinstance(field, RelatedField): # pragma: no cover
default = config.related_field()
else:
default = config.field()
kwargs.update(default.kwargs)
cls = field.__class__
if isinstance(field, models.OneToOneField):
cls = models.ForeignKey
elif isinstance(field, models.FileField):
kwargs.pop("primary_key", None)
for field_class, exclude_kwargs in config.exclude_field_kwargs().items():
if isinstance(field, field_class):
for exclude_kwarg in exclude_kwargs:
kwargs.pop(exclude_kwarg, None)
return cls, args, kwargs
def _generate_history_field(tracked_model, field):
"""
When generating a history model from a tracked model, ensure the fields
are set up properly so that related names and other information
from the tracked model do not cause errors.
"""
field = tracked_model._meta.get_field(field)
if isinstance(field, models.AutoField):
return models.IntegerField()
elif isinstance(field, models.BigAutoField): # pragma: no cover
return models.BigIntegerField()
elif not field.concrete: # pragma: no cover
# Django doesn't have any non-concrete fields that appear
# in ._meta.fields, but packages like django-prices have
# non-concrete fields
return field
# The "swappable" field causes issues during deconstruct()
# since it tries to load models. Patch it and set it back to the original
# value later
field = copy.deepcopy(field)
swappable = getattr(field, "swappable", constants.UNSET)
field.swappable = False
cls, args, kwargs = _get_field_construction(field)
field = cls(*args, **kwargs)
if swappable is not constants.UNSET:
field.swappable = swappable
return field
def _generate_related_name(base_model, fields):
"""
Generates a related name to the tracking model based on the base
model and traked fields
"""
related_name = base_model._meta.object_name.lower()
return "_".join(fields) + f"_{related_name}" if fields else related_name
def _validate_event_model_path(*, app_label, model_name, abstract):
if app_label not in apps.app_configs:
raise ValueError(f'App label "{app_label}" is invalid')
app = apps.app_configs[app_label]
models_module = app.module.__name__ + ".models"
if not abstract and hasattr(sys.modules[models_module], model_name):
raise ValueError(
f"App {app_label} already has {model_name} model. You must"
" explicitly declare an unused model name for the pghistory model."
)
elif models_module.startswith("django."):
raise ValueError(
"A history model cannot be generated under third party app"
f' "{app_label}". You must explicitly pass an app label'
" when configuring tracking."
)
def _get_obj_field(*, obj_field, tracked_model, obj_fk, related_name, base_model, fields):
if obj_fk is not constants.UNSET:
warnings.warn(
"The django-pghistory 'obj_fk' argument is deprecated. Use 'obj_field' instead.",
DeprecationWarning,
)
return obj_fk
elif obj_field is None: # pragma: no cover
return None
elif obj_field is constants.UNSET:
obj_field = config.obj_field()
if related_name is not None:
warnings.warn(
"The django-pghistory 'related_name' argument is deprecated. Use the"
" 'related_name' option of 'obj_field' instead.",
DeprecationWarning,
)
if obj_field._kwargs.get("related_name", constants.DEFAULT) == constants.DEFAULT:
obj_field._kwargs["related_name"] = related_name or _generate_related_name(
base_model, fields
)
if isinstance(obj_field, config.ObjForeignKey):
return models.ForeignKey(tracked_model, **obj_field.kwargs)
else: # pragma: no cover
raise TypeError("obj_field must be of type pghistory.ObjForeignKey.")
def _get_context_field(*, context_field, context_fk):
if context_fk is not constants.UNSET:
warnings.warn(
"The django-pghistory 'context_fk' argument is deprecated. Use "
"'context_field' instead.",
DeprecationWarning,
)
return context_fk
elif context_field is None: # pragma: no cover
return None
elif context_field is constants.UNSET:
context_field = config.context_field()
if isinstance(context_field, config.ContextForeignKey):
return models.ForeignKey("pghistory.Context", **context_field.kwargs)
elif isinstance(context_field, config.ContextJSONField):
return utils.JSONField(**context_field.kwargs)
else: # pragma: no cover
raise TypeError(
"context_field must be of type pghistory.ContextForeignKey"
" or pghistory.ContextJSONField."
)
def _get_context_id_field(*, context_id_field):
if context_id_field is None:
return None
elif context_id_field is constants.UNSET: # pragma: no branch
context_id_field = config.context_id_field()
if isinstance(context_id_field, config.ContextUUIDField):
return models.UUIDField(**context_id_field.kwargs)
else: # pragma: no cover
raise TypeError("context_id_field must be of type pghistory.ContextUUIDField.")
[docs]def create_event_model(
tracked_model,
*trackers,
fields=None,
exclude=None,
obj_fk=constants.UNSET,
context_fk=constants.UNSET,
obj_field=constants.UNSET,
context_field=constants.UNSET,
context_id_field=constants.UNSET,
related_name=None,
name=None,
model_name=None,
app_label=None,
base_model=None,
attrs=None,
meta=None,
abstract=True,
):
"""
Obtain a base event model.
Instead of using `pghistory.track`, which dynamically generates an event
model, one can instead construct a event model themselves, which
will also set up event tracking for the tracked model.
Args:
tracked_model (models.Model): The model that is being tracked.
*trackers (List[`Tracker`]): The event trackers. When using any tracker that
inherits `pghistory.DatabaseTracker`, such as
`pghistory.AfterInsert`, a Postgres trigger will be installed that
automatically tracks the event with a generated event model. Trackers
that do not inherit `pghistory.DatabaseTracker` are assumed to have
manual events created by the user.
fields (List[str], default=None): The list of fields to snapshot
when the event takes place. When no fields are provided, the entire
model is snapshot when the event happens. Note that snapshotting
of the OLD or NEW row is configured by the ``snapshot``
attribute of the `DatabaseTracker` object. Manual events must specify
these fields during manual creation.
exclude (List[str], default=None): Instead of providing a list
of fields to snapshot, a user can instead provide a list of fields
to not snapshot.
obj_field (pghistory.ObjForeignKey, default=unset): The foreign key field
configuration that references the tracked object. Defaults to an
unconstrained non-nullable foreign key. Use ``None`` to create a event model
with no reference to the tracked object.
context_field (Union[pghistory.ContextForeignKey, pghistory.ContextJSONField], default=unset):
The context field configuration. Defaults to a nullable unconstrained foreign key.
Use ``None`` to avoid attaching historical context altogether.
context_id_field (pghistory.ContextUUIDField, default=unset): The context ID field
configuration when using a ContextJSONField for the context_field. When using
a denormalized context field, the ID field is used to track the UUID of the
context. Use ``None`` to avoid using this field for denormalized context.
model_name (str, default=None): Use a custom model name
when the event model is generated. Otherwise a default
name based on the tracked model and fields will be created.
app_label (str, default=None): The app_label for the generated
event model. Defaults to the app_label of the tracked model. Note,
when tracking a Django model (User) or a model of a third-party
app, one must manually specify the app_label of an internal app to
use so that migrations work properly.
base_model (models.Model, default=pghistory.models.Event): The base model for the event
model. Must inherit pghistory.models.Event.
attrs (dict, default=None): Additional attributes to add to the event model
meta (dict, default=None): Additional attributes to add to the Meta class of the
event model.
abstract (bool, default=True): ``True`` if the generated model should
be an abstract model.
Example:
Create a manual event model::
class MyEventModel(create_event_model(
TrackedModel,
pghistory.AfterInsert('model_create'),
)):
# Add custom indices or change default field declarations...
""" # noqa
event_model = import_string("pghistory.models.Event")
base_model = base_model or config.base_model()
assert issubclass(base_model, event_model)
obj_field = _get_obj_field(
obj_field=obj_field,
tracked_model=tracked_model,
obj_fk=obj_fk,
related_name=related_name,
base_model=base_model,
fields=fields,
)
context_field = _get_context_field(context_field=context_field, context_fk=context_fk)
context_id_field = _get_context_id_field(context_id_field=context_id_field)
if name is not None: # pragma: no cover
warnings.warn(
"The 'name' argument for pghistory.create_event_model is"
" deprecated. Use the 'model_name' argument",
DeprecationWarning,
)
model_name = name
model_name = model_name or _generate_event_model_name(base_model, tracked_model, fields)
app_label = app_label or tracked_model._meta.app_label
_validate_event_model_path(app_label=app_label, model_name=model_name, abstract=abstract)
app = apps.app_configs[app_label]
models_module = app.module.__name__ + ".models"
attrs = attrs or {}
attrs.update({"pgh_trackers": trackers})
meta = meta or {}
exclude = exclude or []
fields = fields or [f.name for f in tracked_model._meta.fields if f.name not in exclude]
class_attrs = {
"__module__": models_module,
"Meta": type("Meta", (), {"abstract": abstract, "app_label": app_label, **meta}),
"pgh_tracked_model": tracked_model,
**{field: _generate_history_field(tracked_model, field) for field in fields},
**attrs,
}
if isinstance(context_field, utils.JSONField) and context_id_field:
class_attrs["pgh_context_id"] = context_id_field
if context_field:
class_attrs["pgh_context"] = context_field
if obj_field:
class_attrs["pgh_obj"] = obj_field
event_model = type(model_name, (base_model,), class_attrs)
if not abstract:
setattr(sys.modules[models_module], model_name, event_model)
return event_model
def get_event_model(*args, **kwargs):
warnings.warn(
"The django-pghistory 'get_event_model' function is deprecated. Use"
" 'create_event_model' instead.",
DeprecationWarning,
)
return create_event_model(*args, **kwargs)
[docs]def ProxyField(proxy, field):
"""
Proxies a JSON field from a model and adds it as a field in the queryset.
Args:
proxy (str): The value to proxy, e.g. "user__email"
field (Type[django.models.Field]): The field that will be used to cast
the resulting value
"""
if not isinstance(field, models.Field): # pragma: no cover
raise TypeError(f'"{field}" is not a Django model Field instace')
field.pgh_proxy = proxy
return field
[docs]def track(
*trackers,
fields=None,
exclude=None,
obj_fk=constants.UNSET,
context_fk=constants.UNSET,
obj_field=constants.UNSET,
context_field=constants.UNSET,
context_id_field=constants.UNSET,
related_name=None,
model_name=None,
app_label=None,
base_model=None,
attrs=None,
meta=None,
):
"""
A decorator for tracking events for a model.
When using this decorator, an event model is dynamically generated
that snapshots the entire model or supplied fields of the model
based on the ``events`` supplied. The snapshot is accompanied with
the label that identifies the event.
Args:
*trackers (List[`Tracker`]): The event trackers. When using any tracker that
inherits `pghistory.DatabaseTracker`, such as
`pghistory.AfterInsert`, a Postgres trigger will be installed that
automatically tracks the event with a generated event model. Trackers
that do not inherit `pghistory.DatabaseTracker` are assumed to have
manual events created by the user.
fields (List[str], default=None): The list of fields to snapshot
when the event takes place. When no fields are provided, the entire
model is snapshot when the event happens. Note that snapshotting
of the OLD or NEW row is configured by the ``snapshot``
attribute of the `DatabaseTracker` object. Manual events must specify
these fields during manual creation.
exclude (List[str], default=None): Instead of providing a list
of fields to snapshot, a user can instead provide a list of fields
to not snapshot.
obj_field (pghistory.ObjForeignKey, default=unset): The foreign key field
configuration that references the tracked object. Defaults to an
unconstrained non-nullable foreign key. Use ``None`` to create a event model
with no reference to the tracked object.
context_field (Union[pghistory.ContextForeignKey, pghistory.ContextJSONField], default=unset):
The context field configuration. Defaults to a nullable unconstrained foreign key.
Use ``None`` to avoid attaching historical context altogether.
context_id_field (pghistory.ContextUUIDField, default=unset): The context ID field
configuration when using a ContextJSONField for the context_field. When using
a denormalized context field, the ID field is used to track the UUID of the
context. Use ``None`` to avoid using this field for denormalized context.
model_name (str, default=None): Use a custom model name
when the event model is generated. Otherwise a default
name based on the tracked model and fields will be created.
app_label (str, default=None): The app_label for the generated
event model. Defaults to the app_label of the tracked model. Note,
when tracking a Django model (User) or a model of a third-party
app, one must manually specify the app_label of an internal app to
use so that migrations work properly.
base_model (models.Model, default=`pghistory.models.Event`): The base model for the event
model. Must inherit `pghistory.models.Event`.
attrs (dict, default=None): Additional attributes to add to the event model
meta (dict, default=None): Additional attributes to add to the Meta class of the
event model.
""" # noqa
def _model_wrapper(model_class):
create_event_model(
model_class,
*trackers,
fields=fields,
exclude=exclude,
obj_fk=obj_fk,
context_fk=context_fk,
obj_field=obj_field,
context_field=context_field,
context_id_field=context_id_field,
model_name=model_name,
related_name=related_name,
app_label=app_label,
abstract=False,
base_model=base_model,
attrs=attrs,
meta=meta,
)
return model_class
return _model_wrapper
class _InsertEventCompiler(compiler.SQLInsertCompiler):
def as_sql(self, *args, **kwargs):
ret = super().as_sql(*args, **kwargs)
assert len(ret) == 1
params = [
param if field.name != "pgh_context" else Literal("_pgh_attach_context()")
for field, param in zip(self.query.fields, ret[0][1])
]
return [(ret[0][0], params)]
[docs]def create_event(obj, *, label, using="default"):
"""Manually create a event for an object.
Events are automatically linked with any context being tracked
via `pghistory.context`.
Args:
obj (models.Model): An instance of a model.
label (str): The event label.
using (str): The database
Raises:
ValueError: If the event label has not been registered for the model.
Returns:
models.Model: The created event model.
"""
# Verify that the provided label is tracked
if (obj.__class__, label) not in _registered_trackers:
raise ValueError(
f'"{label}" is not a registered tracker label for model {obj._meta.object_name}.'
)
event_model = _registered_trackers[(obj.__class__, label)]
event_model_kwargs = {
"pgh_label": label,
**{
field.attname: getattr(obj, field.attname)
for field in event_model._meta.fields
if not field.name.startswith("pgh_")
},
}
if hasattr(event_model, "pgh_obj"):
event_model_kwargs["pgh_obj"] = obj
event_obj = event_model(**event_model_kwargs)
# The event model is inserted manually with a custom SQL compiler
# that attaches the context using the _pgh_attach_context
# stored procedure. Django does not allow one to use F()
# objects to reference stored procedures, so we have to
# inject it with a custom SQL compiler here.
query = sql.InsertQuery(event_model)
query.insert_values(
[field for field in event_model._meta.fields if not isinstance(field, models.AutoField)],
[event_obj],
)
if utils.psycopg_maj_version == 3:
connections[using].connection.adapters.register_dumper(Literal, LiteralDumper)
vals = _InsertEventCompiler(query, connections[using], using=using).execute_sql(
event_model._meta.fields
)
# Django <= 2.2 does not support returning fields from a bulk create,
# which requires us to fetch fields again to populate the context
if isinstance(vals, int): # pragma: no cover
return event_model.objects.get(pgh_id=vals)
else:
# Django >= 3.1 returns the values as a list of one element
if isinstance(vals, list) and len(vals) == 1: # pragma: no branch
vals = vals[0]
for field, val in zip(event_model._meta.fields, vals):
setattr(event_obj, field.attname, val)
return event_obj
def event_models(
models=None, references_model=None, tracks_model=None, include_missing_pgh_obj=False
):
"""
Retrieve and filter all events models.
Args:
models (List[Model], default=None): The starting list of event models.
references_model (Model, default=None): Filter by event models that reference this model.
tracks_model (Model, default=None): Filter by models that directly track this model
and have pgh_obj fields
including_missing_pgh_obj (bool, default=False): Return tracked models even if the pgh_obj
field is not available.
"""
from pghistory.models import Event, BaseAggregateEvent # noqa
models = models or [
model
for model in apps.get_models()
if issubclass(model, Event)
and not issubclass(model, BaseAggregateEvent)
and not model._meta.abstract
and not model._meta.proxy
and model._meta.managed
]
if references_model:
models = [
model
for model in models
if any(utils.related_model(field) == references_model for field in model._meta.fields)
]
if tracks_model and not include_missing_pgh_obj:
models = [
model
for model in models
if "pgh_obj" in (f.name for f in model._meta.fields)
and utils.related_model(model._meta.get_field("pgh_obj")) == tracks_model
]
elif tracks_model and include_missing_pgh_obj:
models = [
model
for model in models
if model.pgh_tracked_model._meta.concrete_model == tracks_model
]
return models