Commit 6998dd24 authored by Daniel Klaffenbach's avatar Daniel Klaffenbach 🐍

Initial commit

parents
Pipeline #3153 skipped
*~
*.pyc
*.pyo
*.swp
.__afs*
db.sqlite3
.settings/
*/settings/local.py
from __future__ import unicode_literals
USERNAME_MODEL_FIELD = '_journal_entry_username'
from django.contrib import admin
from django.db.models import Q, ForeignKey
from .models import JournalEntry
class JournalAdmin(admin.ModelAdmin):
"""
This class can be used for all models with a Journal enabled. It
provides a custom history view, which displays all `JournalEntry`
records for a model and inline models.
"""
object_history_template = 'model_journal/object_history.html'
def history_view(self, request, object_id, extra_context=None):
if not hasattr(self.model, '_MODEL_JOURNAL_ATRR'):
return super(JournalAdmin, self).history_view(request, object_id, extra_context=extra_context)
# Finds all JournalEntries for `self.model`
obj = self.get_object(request, object_id)
related_journal_manager = getattr(obj, obj._MODEL_JOURNAL_ATRR)
journal_filter = Q(
Q(object_id=object_id) &
related_journal_manager.get_content_type_filter(with_children=True, with_parents=True)
)
"""
We also want to show the journal entries of all models displayed through
inline instances. So loop through the inline instances, find all models
with a `Journal` relation and adjust the journal_filter accordingly.
"""
for inline in self.get_inline_instances(request):
model = inline.model
if not hasattr(model, '_MODEL_JOURNAL_ATRR'):
# This model does not have a Journal relation
continue
# Find the name of the ForeignKey of the related/inline model.
fk_name = inline.fk_name
if not fk_name:
# If `fk_name` was not set in the inline admin, try to guess it
for field in model._meta.fields:
if isinstance(field, ForeignKey) and field.related_model == self.model:
fk_name = field.name
break
if not fk_name:
continue
# Finds all JournalEntries for `inline.model`
inline_filter = Q(Q(object_id__in=model.objects.filter(**{fk_name: object_id}).values_list('pk', flat=True)) &
Q(content_type__app_label=model._meta.app_label) &
Q(content_type__model=model.__name__.lower()))
journal_filter = journal_filter | inline_filter
journal = JournalEntry.objects.filter(journal_filter)
extra_context = {
'journal': journal
}
return super(JournalAdmin, self).history_view(request, object_id, extra_context=extra_context)
from __future__ import unicode_literals
from django.apps import AppConfig
class ModelJournalConfig(AppConfig):
name = 'model_journal'
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db.models import signals
from django.utils.functional import curry
from . import USERNAME_MODEL_FIELD
class ModelJournalMiddleware(object):
def process_request(self, request):
if not request.method in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):
if hasattr(request, 'user') and request.user.is_authenticated():
user = request.user
username = getattr(user, user.USERNAME_FIELD)
else:
username = ''
set_pre_save_attrs = curry(self._set_pre_save_info, username)
signals.pre_save.connect(set_pre_save_attrs, dispatch_uid =(self.__class__, request,), weak=False)
def process_response(self, request, response):
signals.pre_save.disconnect(dispatch_uid=(self.__class__, request,))
return response
def _set_pre_save_info(self, username, sender, instance, **kwargs):
if hasattr(sender, 'get_log_entry_username'):
setattr(instance, USERNAME_MODEL_FIELD, username)
# -*- coding: utf-8 -*-
# Generated by Django 1.9.6 on 2016-05-31 06:33
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
import jsonfield.fields
class Migration(migrations.Migration):
initial = True
dependencies = [
('contenttypes', '0002_remove_content_type_name'),
]
operations = [
migrations.CreateModel(
name='JournalEntry',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('user', models.CharField(db_index=True, editable=False, max_length=100)),
('time', models.DateTimeField(auto_now_add=True)),
('operation', models.PositiveSmallIntegerField(choices=[(1, 'Created'), (2, 'Changed'), (3, 'Deleted')], db_index=True, editable=False)),
('object_id', models.TextField(db_index=True, editable=False)),
('attrs', jsonfield.fields.JSONField(editable=False)),
('content_type', models.ForeignKey(editable=False, on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType')),
],
options={
'ordering': ('id',),
},
),
]
from __future__ import unicode_literals
from getpass import getuser
import logging
from django.apps import apps
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ImproperlyConfigured
from django.db import models
from django.db.models import Q
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext_lazy as _, ugettext
from jsonfield import JSONField
from . import USERNAME_MODEL_FIELD
from django.utils.formats import date_format
from django.utils.html import format_html
from django.utils.safestring import mark_safe
logger = logging.getLogger(__name__)
class JournalEntryManager(models.Manager):
use_for_related_fields = True
def get_content_type_filter(self, with_children=False, with_parents=False):
if not hasattr(self, 'instance'):
return None
content_type_filter = Q(
Q(content_type__app_label=self.instance._meta.app_label) &
Q(content_type__model=self.instance.__class__.__name__.lower())
)
if with_children:
for cls in apps.get_models():
if issubclass(cls, self.instance.__class__) and cls != self.instance.__class__:
content_type_filter |= Q(
Q(content_type__app_label=cls._meta.app_label) &
Q(content_type__model=cls.__name__.lower())
)
if with_parents:
for cls in self.instance._meta.get_parent_list():
content_type_filter |= Q(
Q(content_type__app_label=cls._meta.app_label) &
Q(content_type__model=cls.__name__.lower())
)
return content_type_filter
def with_children(self):
"""
Useful for related managers to get the full journal for the model,
in case model inheritance is used.
"""
if not hasattr(self, 'instance'):
return self.all()
else:
content_type_filter = self.get_content_type_filter(with_children=True)
return self.model.objects.filter(object_id=self.instance.pk).filter(content_type_filter)
def with_parents(self):
"""
Useful for related managers to get the full journal for the model,
in case model inheritance is used.
"""
if not hasattr(self, 'instance'):
return self.all()
else:
content_type_filter = self.get_content_type_filter(with_parents=True)
return self.model.objects.filter(object_id=self.instance.pk).filter(content_type_filter)
@python_2_unicode_compatible
class JournalEntry(models.Model):
"""
Contains all logs for all models. The objects are referenced through a
GenericForeignKey and the attributes are stored in a JSON field.
"""
OPERATION_ADD = 1
OPERATION_CHANGE = 2
OPERATION_DELETE = 3
OPERATION_CHOICES = (
(OPERATION_ADD, _('Created')),
(OPERATION_CHANGE, _('Changed')),
(OPERATION_DELETE, _('Deleted'))
)
user = models.CharField(db_index=True, editable=False, max_length=100)
time = models.DateTimeField(auto_now_add=True, editable=False)
operation = models.PositiveSmallIntegerField(choices=OPERATION_CHOICES, db_index=True, editable=False)
content_type = models.ForeignKey(ContentType, editable=False)
object_id = models.TextField(db_index=True, editable=False)
obj = GenericForeignKey('content_type', 'object_id')
attrs = JSONField(editable=False)
objects = JournalEntryManager()
class Meta:
ordering = ('id', )
def __str__(self):
return "JournalEntry from %s" %date_format(self.time, "SHORT_DATETIME_FORMAT")
def _get_last_entry(self):
"""
Returns the predecessor of this JournalEntry.
:rtype: JournalEntry | None
"""
return self.__class__.objects.filter(content_type=self.content_type, object_id=self.object_id, pk__lt=self.pk).last()
def get_diff(self):
last = self._get_last_entry()
if last:
attrs_last = last.attrs
else:
attrs_last = {}
diff = {}
for attr in self.attrs:
if attr not in attrs_last:
diff[attr] = (None, self.attrs[attr])
elif attrs_last[attr] != self.attrs[attr]:
diff[attr] = (attrs_last[attr], self.attrs[attr])
return diff
def get_changed_attributes(self):
if self.operation == self.OPERATION_ADD:
return []
last = self._get_last_entry()
if not last:
return []
diff = []
for attr in self.attrs:
if attr not in last.attrs:
diff.append(attr)
elif last.attrs[attr] != self.attrs[attr]:
diff.append(attr)
return diff
def get_display_short(self):
if self.operation == self.OPERATION_CHANGE:
attrs = ', '.join(self.get_changed_attributes())
if not attrs:
return ugettext('%s was saved without any changes.' %self.obj._meta.verbose_name)
else:
return ugettext('Changed attributes: %s' %attrs)
elif self.operation == self.OPERATION_ADD:
return ugettext('%s was added.' %self.obj._meta.verbose_name)
else:
return ugettext('%s was deleted.' %self.obj._meta.verbose_name)
def get_display_full(self):
verbose_name = self.obj._meta.verbose_name
if self.operation == self.OPERATION_ADD:
label = ugettext('%s was added.' %verbose_name)
li = ''
for attr in self.attrs:
if self.attrs[attr]:
li += format_html('<li><strong>{}:</strong> "{}"</li>', attr, self.attrs[attr])
if li:
label += '<ul>%s</ul>' %li
return mark_safe(label)
elif self.operation == self.OPERATION_CHANGE:
diff = self.get_diff()
if not diff:
return ugettext('%s was saved without any changes.' %self.obj._meta.verbose_name)
else:
label = ugettext('%s was changed:' %verbose_name)
li = ''
for i in diff:
li += format_html('<li><strong>{}:</strong> "{}" &rarr; "{}"</li>', i, diff[i][0], diff[i][1])
label += li
return mark_safe(label)
else:
return ugettext('%s was deleted.' %verbose_name)
class Journal(GenericRelation):
def __init__(self, exclude=[]):
super(Journal, self).__init__(to=JournalEntry)
self._exclude = exclude
def contribute_to_class(self, cls, name):
super(Journal, self).contribute_to_class(cls, name)
if not hasattr(cls, 'get_journal_entry_username'):
setattr(cls, 'get_journal_entry_username', get_journal_entry_username)
else:
logger.info("Using get_journal_entry_username() provided by model %s." %cls.__name__)
if not hasattr(cls, '_MODEL_JOURNAL_ATRR'):
setattr(cls, '_MODEL_JOURNAL_ATRR', name)
elif cls._MODEL_JOURNAL_ATRR != name:
logger.error("Multiple Journal relations defined for model %s." %cls.__name__)
return
models.signals.post_save.connect(self.post_save, sender=cls, weak=False)
models.signals.post_delete.connect(self.post_delete, sender=cls, weak=False)
def create_journal_entry(self, instance, operation):
attrs = {}
for field in instance._meta.fields:
if field.attname not in self._exclude:
attrs[field.attname] = getattr(instance, field.attname)
JournalEntry.objects.create(
user=instance.get_journal_entry_username(),
operation=operation,
attrs=attrs,
object_id=instance.pk,
content_type=ContentType.objects.get_for_model(instance)
)
def post_save(self, instance, created, **kwargs):
if created:
operation = JournalEntry.OPERATION_ADD
else:
operation = JournalEntry.OPERATION_CHANGE
self.create_journal_entry(instance, operation)
def post_delete(self, instance, **kwargs):
self.create_journal_entry(instance, JournalEntry.OPERATION_DELETE)
def get_journal_entry_username(self):
try:
return getattr(self, USERNAME_MODEL_FIELD)
except AttributeError:
if 'model_journal.middleware.ModelJournalMiddleware' not in settings.MIDDLEWARE_CLASSES:
raise ImproperlyConfigured(
"The ModelJournalMiddleware is missing from your MIDDLEWARE_CLASSES"
)
else:
username = getuser()
logger.info("Using current system user (%s) for ModelJournal logging." %username)
return username
{% extends "admin/base_site.html" %}
{% load i18n admin_urls %}
{% block breadcrumbs %}
<div class="breadcrumbs">
<a href="{% url 'admin:index' %}">{% trans 'Home' %}</a>
&rsaquo; <a href="{% url 'admin:app_list' app_label=opts.app_label %}">{{ opts.app_config.verbose_name }}</a>
&rsaquo; <a href="{% url opts|admin_urlname:'changelist' %}">{{ module_name }}</a>
&rsaquo; <a href="{% url opts|admin_urlname:'change' object.pk|admin_urlquote %}">{{ object|truncatewords:"18" }}</a>
&rsaquo; {% trans 'History' %}
</div>
{% endblock %}
{% block content %}
<div id="content-main">
<div class="module">
{% if journal %}
<table id="change-history">
<thead>
<tr>
<th scope="col">{% trans 'Date/time' %}</th>
<th scope="col">{% trans 'User' %}</th>
<th scope="col">{% trans 'Action' %}</th>
</tr>
</thead>
<tbody>
{% for action in journal %}
<tr>
<th scope="row">{{ action.time|date:"DATETIME_FORMAT" }}</th>
<td>{{ action.user }}</td>
<td>{{ action.get_display_full }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>{% trans "This object doesn't have a change history. It probably wasn't added via this admin site." %}</p>
{% endif %}
</div>
</div>
{% endblock %}
from django.test import TestCase
# Create your tests here.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment