185 lines
7.5 KiB
Python
185 lines
7.5 KiB
Python
import json
|
|
from functools import update_wrapper
|
|
|
|
from django.apps import apps
|
|
from django.contrib import admin
|
|
from django.contrib.admin.views.autocomplete import AutocompleteJsonView
|
|
from django.contrib.admin.widgets import AutocompleteMixin, AutocompleteSelect
|
|
from django.core.exceptions import FieldDoesNotExist, PermissionDenied
|
|
from django.db.models import Case, Max, Q, Value, When
|
|
from django.http import Http404, JsonResponse
|
|
from django.urls import path
|
|
|
|
|
|
class DrillDownAutocompleteJsonView(AutocompleteJsonView):
|
|
"""Handle AutocompleteWidget's AJAX requests for data."""
|
|
admin_context = None
|
|
|
|
def process_request(self, request):
|
|
"""
|
|
Validate request integrity, extract and return request parameters.
|
|
|
|
Since the subsequent view permission check requires the target model
|
|
admin, which is determined here, raise PermissionDenied if the
|
|
requested app, model or field are malformed.
|
|
|
|
Raise Http404 if the target model admin is not configured properly with
|
|
search_fields.
|
|
"""
|
|
(
|
|
term,
|
|
model_admin,
|
|
source_field,
|
|
to_field_name
|
|
) = super().process_request(request)
|
|
|
|
if 'filtered_by_dict' in request.GET:
|
|
try:
|
|
filtered_by_dict = json.loads(
|
|
request.GET.get("filtered_by_dict"))
|
|
except json.decoder.JSONDecodeError as e:
|
|
raise PermissionDenied from e
|
|
app_label = request.GET["app_label"]
|
|
model_name = request.GET["model_name"]
|
|
source_model = apps.get_model(app_label, model_name)
|
|
try:
|
|
drilldown_field = self.admin_context.get_drilldown_autocomplete_fields(request)[source_field.name]
|
|
filtered_by = set(drilldown_field['filtered_by'])
|
|
except KeyError as e:
|
|
raise PermissionDenied from e
|
|
if (set(filtered_by_dict.keys()) > filtered_by):
|
|
raise PermissionDenied
|
|
rel_paths = drilldown_field['relationship_path']
|
|
target_model = source_field.remote_field.model
|
|
try:
|
|
for v in rel_paths.values():
|
|
target_model._meta.get_field(v)
|
|
except FieldDoesNotExist as e:
|
|
raise PermissionDenied from e
|
|
self.target_model = target_model
|
|
self.filtered_by_dict = filtered_by_dict
|
|
self.drilldown_field = drilldown_field
|
|
self.drilldown_filter_data = {
|
|
rel_paths[k]: v for k, v in filtered_by_dict.items()}
|
|
|
|
return term, model_admin, source_field, to_field_name
|
|
|
|
def get_queryset(self):
|
|
"""Return queryset based on ModelAdmin.get_search_results()."""
|
|
qs = super().get_queryset()
|
|
if hasattr(self, 'filtered_by_dict'):
|
|
dd_field = self.drilldown_field
|
|
drilldown_filter_conditions = Q(**self.drilldown_filter_data)
|
|
if dd_field.get('autoselect_on_singleton', False):
|
|
self.autoselect = False
|
|
if dd_field.get('included_only', False):
|
|
qs = qs.filter(drilldown_filter_conditions).annotate(
|
|
ddok=Value(1))
|
|
else:
|
|
qs = qs.annotate(ddok=Max(Case(When(drilldown_filter_conditions, then=Value(
|
|
1)), default=Value(0)))).order_by('-ddok', *qs.query.order_by)
|
|
if not getattr(self, 'autoselect', True) and qs.filter(ddok=1).count() == 1:
|
|
self.autoselect = True
|
|
else:
|
|
qs = qs.annotate(ddok=Value(1))
|
|
return qs
|
|
|
|
def serialize_result(self, obj, to_field_name):
|
|
"""
|
|
Convert the provided model object to a dictionary that is added to the
|
|
results list.
|
|
"""
|
|
out = super().serialize_result(obj, to_field_name)
|
|
if hasattr(obj, 'ddok'):
|
|
out['ddok'] = obj.ddok
|
|
if getattr(self, 'autoselect', False) and obj.ddok == 1:
|
|
out['autoselect'] = True
|
|
return out
|
|
|
|
|
|
class DrillDownAutocompleteMixin(AutocompleteMixin):
|
|
url_name = "%s:drilldown_autocomplete"
|
|
|
|
|
|
class DrillDownAutocompleteSelect(AutocompleteSelect, DrillDownAutocompleteMixin):
|
|
pass
|
|
|
|
|
|
class DrillDownAutocompleteAdmin(admin.options.BaseModelAdmin):
|
|
drilldown_autocomplete_fields = dict()
|
|
|
|
class Media:
|
|
css = {
|
|
'all': ('admin/css/drilldown_autocomplete.css',)
|
|
}
|
|
|
|
def get_drilldown_autocomplete_fields(self, request):
|
|
return self.drilldown_autocomplete_fields
|
|
|
|
def formfield_for_foreignkey(self, db_field, request, **kwargs):
|
|
"""
|
|
Get a form Field for a ForeignKey.
|
|
"""
|
|
db = kwargs.get("using")
|
|
|
|
if "widget" not in kwargs:
|
|
daf = self.get_drilldown_autocomplete_fields(request)
|
|
if db_field.name in daf:
|
|
def get_fields_to_reset_recursive(daf, fields_to_reset, seen=None):
|
|
to_reset = set()
|
|
if seen is None:
|
|
seen = set()
|
|
for f in fields_to_reset:
|
|
if f in daf:
|
|
to_reset.add(f)
|
|
if f not in seen and 'reset_on_reset' in daf[f]:
|
|
seen.add(f)
|
|
to_reset.update(get_fields_to_reset_recursive(
|
|
daf, daf[f]['reset_on_reset'], seen))
|
|
return to_reset
|
|
|
|
attrs = {'data-drilldown_enabled': 1}
|
|
if 'filtered_by' in daf[db_field.name]:
|
|
attrs["data-filtered_by"] = json.dumps(
|
|
list(daf[db_field.name]['filtered_by']))
|
|
if daf[db_field.name].get('included_only', False):
|
|
attrs['data-included_only'] = 1
|
|
else:
|
|
if 'reset_on_excluded' in daf[db_field.name]:
|
|
reset_on_excluded = set(
|
|
daf[db_field.name]['reset_on_excluded'])
|
|
reset_on_excluded = get_fields_to_reset_recursive(
|
|
daf, reset_on_excluded)
|
|
attrs['data-reset_on_excluded'] = json.dumps(
|
|
list(reset_on_excluded))
|
|
|
|
if 'reset_on_included' in daf[db_field.name]:
|
|
reset_on_included = set(
|
|
daf[db_field.name]['reset_on_included'])
|
|
reset_on_included = get_fields_to_reset_recursive(
|
|
daf, reset_on_included)
|
|
attrs['data-reset_on_included'] = json.dumps(
|
|
list(reset_on_included))
|
|
kwargs["widget"] = DrillDownAutocompleteSelect(
|
|
db_field, self.admin_site, attrs=attrs, using=db
|
|
)
|
|
return super().formfield_for_foreignkey(db_field, request, **kwargs)
|
|
|
|
def drilldown_autocomplete_view(self, request):
|
|
return DrillDownAutocompleteJsonView.as_view(admin_site=self.admin_site, admin_context=self)(request)
|
|
|
|
def get_urls(self):
|
|
def wrap(view, cacheable=False):
|
|
def wrapper(*args, **kwargs):
|
|
return self.admin_site.admin_view(view, cacheable)(*args, **kwargs)
|
|
|
|
wrapper.admin_site = self
|
|
return update_wrapper(wrapper, view)
|
|
|
|
urls = super().get_urls()
|
|
drilldown_urls = [
|
|
path("drilldown/", wrap(self.drilldown_autocomplete_view),
|
|
name="drilldown_autocomplete"),
|
|
]
|
|
return drilldown_urls + urls
|