import json
from functools import update_wrapper

from django.apps import apps
from django.contrib import admin
from django.contrib.admin.views.autocomplete import AutocompleteJsonView
from django.contrib.admin.widgets import AutocompleteMixin, AutocompleteSelect
from django.core.exceptions import FieldDoesNotExist, PermissionDenied
from django.db.models import Case, Max, Q, Value, When
from django.http import Http404, JsonResponse
from django.urls import path


class DrillDownAutocompleteJsonView(AutocompleteJsonView):
    """Handle AutocompleteWidget's AJAX requests for data.

    On top of the stock autocomplete view this reads the optional
    ``drilldown`` and ``filtered_by_dict`` GET parameters and annotates
    every result with ``ddok`` (1 when the row matches the drill-down
    filter, 0 otherwise).
    """

    def process_request(self, request):
        """
        Validate request integrity, extract and return request parameters.

        Since the subsequent view permission check requires the target model
        admin, which is determined here, raise PermissionDenied if the
        requested app, model or field are malformed.

        Raise Http404 if the target model admin is not configured properly
        with search_fields.

        Additionally raise PermissionDenied when the ``drilldown`` flag is
        not ``0`` or ``1`` or when ``filtered_by_dict`` fails validation.

        Returns the ``(term, model_admin, source_field, to_field_name)``
        tuple produced by the parent implementation.
        """
        (
            term,
            model_admin,
            source_field,
            to_field_name,
        ) = super().process_request(request)

        if 'drilldown' in request.GET:
            try:
                # int() raises ValueError on non-numeric input; the lookup
                # raises KeyError for any integer other than 0 or 1.
                drilldown_enabled = {1: True, 0: False}[
                    int(request.GET.get('drilldown'))
                ]
            except (ValueError, KeyError) as e:
                raise PermissionDenied from e
            self.drilldown_enabled = drilldown_enabled

            if drilldown_enabled and 'filtered_by_dict' in request.GET:
                self._load_drilldown_filter(request, source_field)

        return term, model_admin, source_field, to_field_name

    def _load_drilldown_filter(self, request, source_field):
        """Validate ``filtered_by_dict`` and store the derived filter state.

        On success sets ``target_model``, ``filtered_by_dict``,
        ``drilldown_field`` and ``drilldown_filter_data`` on the view.
        Raises PermissionDenied for every validation failure.
        """
        try:
            filtered_by_dict = json.loads(request.GET.get("filtered_by_dict"))
        except json.JSONDecodeError as e:
            raise PermissionDenied from e
        # Valid JSON that is not an object (list, number, ...) cannot be
        # used as a field -> value mapping.
        if not isinstance(filtered_by_dict, dict):
            raise PermissionDenied

        source_model = apps.get_model(
            request.GET["app_label"], request.GET["model_name"]
        )
        try:
            source_admin = self.admin_site._registry[source_model]
            drilldown_field = source_admin.get_drilldown_autocomplete_fields(
                request
            )[source_field.name]
            filtered_by = set(drilldown_field['filtered_by'])
        except KeyError as e:
            raise PermissionDenied from e

        # Every requested key must be one the admin declared in
        # 'filtered_by'; a plain superset test would let through requests
        # that mix declared and undeclared keys.
        if not set(filtered_by_dict) <= filtered_by:
            raise PermissionDenied

        rel_paths = drilldown_field['relationship_path']
        target_model = source_field.remote_field.model
        try:
            for field_name in rel_paths.values():
                target_model._meta.get_field(field_name)
        except FieldDoesNotExist as e:
            raise PermissionDenied from e

        try:
            filter_data = {
                rel_paths[key]: value
                for key, value in filtered_by_dict.items()
            }
        except KeyError as e:
            # A key listed in 'filtered_by' without a 'relationship_path'
            # entry cannot be translated into a lookup.
            raise PermissionDenied from e

        self.target_model = target_model
        self.filtered_by_dict = filtered_by_dict
        self.drilldown_field = drilldown_field
        self.drilldown_filter_data = filter_data

    def get_queryset(self):
        """Return queryset based on ModelAdmin.get_search_results().

        When drill-down is enabled the queryset is annotated with ``ddok``:

        * no filter supplied, or an empty one: every row gets ``ddok=1``;
        * ``included_only`` set: rows are restricted to those matching the
          filter and get ``ddok=1``;
        * otherwise: all rows are kept, matching rows get ``ddok=1`` and the
          rest ``ddok=0``, with matching rows ordered first.

        If the field sets ``autoselect_on_singleton`` and exactly one row has
        ``ddok=1``, ``self.autoselect`` ends up True.
        """
        qs = super().get_queryset()
        if not getattr(self, 'drilldown_enabled', False):
            return qs

        if not hasattr(self, 'filtered_by_dict'):
            return qs.annotate(ddok=Value(1))

        dd_field = self.drilldown_field
        drilldown_filter_conditions = Q(**self.drilldown_filter_data)

        if dd_field.get('autoselect_on_singleton', False):
            # False means "requested, not yet decided"; it is flipped to
            # True below only when exactly one row matches.
            self.autoselect = False

        if dd_field.get('included_only', False) or not self.drilldown_filter_data:
            # An empty Q() is a no-op in filter() but is rejected by When(),
            # so an empty filter is handled here: every row counts as a match.
            qs = qs.filter(drilldown_filter_conditions).annotate(
                ddok=Value(1))
        else:
            matches = Case(
                When(drilldown_filter_conditions, then=Value(1)),
                default=Value(0),
            )
            qs = qs.annotate(ddok=Max(matches)).order_by(
                '-ddok', *qs.query.order_by)

        if (not getattr(self, 'autoselect', True)
                and qs.filter(ddok=1).count() == 1):
            self.autoselect = True
        return qs

    def serialize_result(self, obj, to_field_name):
        """
        Convert the provided model object to a dictionary that is added to the
        results list.

        Adds ``ddok`` when the object carries that annotation, and
        ``autoselect: True`` when autoselect is active and ``ddok`` is 1.
        """
        out = super().serialize_result(obj, to_field_name)
        if hasattr(obj, 'ddok'):
            out['ddok'] = obj.ddok
            if getattr(self, 'autoselect', False) and obj.ddok == 1:
                out['autoselect'] = True
        return out


class DrillDownAutocompleteMixin(AutocompleteMixin):
    """Autocomplete mixin that uses the drill-down URL name."""

    url_name = "%s:drilldown_autocomplete"


class DrillDownAutocompleteSelect(AutocompleteSelect, DrillDownAutocompleteMixin):
    """Select widget combining AutocompleteSelect with the drill-down mixin."""


class DrillDownAutocompleteModelAdmin(admin.options.BaseModelAdmin):
    """Admin base class adding drill-down autocomplete to foreign keys.

    ``drilldown_autocomplete_fields`` maps a foreign key field name to an
    options dict. Keys read by this module: ``filtered_by``,
    ``relationship_path``, ``included_only``, ``autoselect_on_singleton``,
    ``reset_on_excluded`` and ``reset_on_included``.
    """

    drilldown_autocomplete_fields = {}

    class Media:
        css = {
            'all': ('admin/css/drilldown_autocomplete.css',)
        }

    def get_drilldown_autocomplete_fields(self, request):
        """Return the drill-down configuration dict for this admin."""
        return self.drilldown_autocomplete_fields

    def formfield_for_foreignkey(self, db_field, request, **kwargs):
        """
        Get a form Field for a ForeignKey.

        When no widget was passed and the field has a drill-down
        configuration, a DrillDownAutocompleteSelect carrying that
        configuration as ``data-*`` attributes is used.
        """
        db = kwargs.get("using")

        if "widget" not in kwargs:
            daf = self.get_drilldown_autocomplete_fields(request)
            if db_field.name in daf:
                options = daf[db_field.name]
                attrs = {'data-drilldown_enabled': 1}
                if 'filtered_by' in options:
                    attrs["data-filtered_by"] = json.dumps(
                        list(options['filtered_by']))
                if options.get('included_only', False):
                    attrs['data-included_only'] = 1
                elif 'reset_on_excluded' in options:
                    attrs['data-reset_on_excluded'] = json.dumps(
                        list(options['reset_on_excluded']))
                if 'reset_on_included' in options:
                    attrs['data-reset_on_included'] = json.dumps(
                        list(options['reset_on_included']))
                kwargs["widget"] = DrillDownAutocompleteSelect(
                    db_field, self.admin_site, attrs=attrs, using=db
                )

        return super().formfield_for_foreignkey(db_field, request, **kwargs)

    def drilldown_autocomplete_view(self, request):
        """Dispatch the request to DrillDownAutocompleteJsonView."""
        return DrillDownAutocompleteJsonView.as_view(
            admin_site=self.admin_site)(request)

    def get_urls(self):
        """Return the parent's URLs preceded by the ``drilldown/`` route."""
        def wrap(view, cacheable=False):
            def wrapper(*args, **kwargs):
                return self.admin_site.admin_view(view, cacheable)(*args, **kwargs)
            # NOTE(review): this stores the model admin under the name
            # ``admin_site``; confirm whether ``self.admin_site`` was meant.
            wrapper.admin_site = self
            return update_wrapper(wrapper, view)

        urls = super().get_urls()

        drilldown_urls = [
            path(
                "drilldown/",
                wrap(self.drilldown_autocomplete_view),
                name="drilldown_autocomplete",
            ),
        ]
        # The extra route goes first so it is matched before the parent's.
        return drilldown_urls + urls