Merge pull request #217 from Corvia/master

#216 Allow ContentType queries to be performed on non-default databases.
fix_request_path_info
Diederik van der Boor 2016-05-31 17:18:06 +02:00
commit 966776aea5
5 changed files with 73 additions and 24 deletions

View File

@ -16,6 +16,7 @@ Please see LICENSE and AUTHORS for more information.
from __future__ import absolute_import from __future__ import absolute_import
from django.db import models from django.db import models
from django.db.utils import DEFAULT_DB_ALIAS
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.utils import six from django.utils import six
@ -71,7 +72,7 @@ class PolymorphicModel(six.with_metaclass(PolymorphicModelBase, models.Model)):
def translate_polymorphic_Q_object(self_class, q): def translate_polymorphic_Q_object(self_class, q):
return translate_polymorphic_Q_object(self_class, q) return translate_polymorphic_Q_object(self_class, q)
def pre_save_polymorphic(self): def pre_save_polymorphic(self, using=DEFAULT_DB_ALIAS):
"""Normally not needed. """Normally not needed.
This function may be called manually in special use-cases. When the object This function may be called manually in special use-cases. When the object
is saved for the first time, we store its real class in polymorphic_ctype. is saved for the first time, we store its real class in polymorphic_ctype.
@ -80,13 +81,14 @@ class PolymorphicModel(six.with_metaclass(PolymorphicModelBase, models.Model)):
(used by PolymorphicQuerySet._get_real_instances) (used by PolymorphicQuerySet._get_real_instances)
""" """
if not self.polymorphic_ctype_id: if not self.polymorphic_ctype_id:
self.polymorphic_ctype = ContentType.objects.get_for_model(self, for_concrete_model=False) self.polymorphic_ctype = ContentType.objects.db_manager(using).get_for_model(self, for_concrete_model=False)
pre_save_polymorphic.alters_data = True pre_save_polymorphic.alters_data = True
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
"""Overridden model save function which supports the polymorphism """Overridden model save function which supports the polymorphism
functionality (through pre_save_polymorphic).""" functionality (through pre_save_polymorphic)."""
self.pre_save_polymorphic() using = kwargs.get('using', self._state.db or DEFAULT_DB_ALIAS)
self.pre_save_polymorphic(using=using)
return super(PolymorphicModel, self).save(*args, **kwargs) return super(PolymorphicModel, self).save(*args, **kwargs)
save.alters_data = True save.alters_data = True
@ -103,7 +105,7 @@ class PolymorphicModel(six.with_metaclass(PolymorphicModelBase, models.Model)):
# Note that model_class() can return None for stale content types; # Note that model_class() can return None for stale content types;
# when the content type record still exists but no longer refers to an existing model. # when the content type record still exists but no longer refers to an existing model.
try: try:
model = ContentType.objects.get_for_id(self.polymorphic_ctype_id).model_class() model = ContentType.objects.db_manager(self._state.db).get_for_id(self.polymorphic_ctype_id).model_class()
except AttributeError: except AttributeError:
# Django <1.6 workaround # Django <1.6 workaround
return None return None
@ -122,13 +124,13 @@ class PolymorphicModel(six.with_metaclass(PolymorphicModelBase, models.Model)):
model_class = self.get_real_instance_class() model_class = self.get_real_instance_class()
if model_class is None: if model_class is None:
return None return None
return ContentType.objects.get_for_model(model_class, for_concrete_model=True).pk return ContentType.objects.db_manager(self._state.db).get_for_model(model_class, for_concrete_model=True).pk
def get_real_concrete_instance_class(self): def get_real_concrete_instance_class(self):
model_class = self.get_real_instance_class() model_class = self.get_real_instance_class()
if model_class is None: if model_class is None:
return None return None
return ContentType.objects.get_for_model(model_class, for_concrete_model=True).model_class() return ContentType.objects.db_manager(self._state.db).get_for_model(model_class, for_concrete_model=True).model_class()
def get_real_instance(self): def get_real_instance(self):
"""Normally not needed. """Normally not needed.
@ -139,7 +141,7 @@ class PolymorphicModel(six.with_metaclass(PolymorphicModelBase, models.Model)):
real_model = self.get_real_instance_class() real_model = self.get_real_instance_class()
if real_model == self.__class__: if real_model == self.__class__:
return self return self
return real_model.objects.get(pk=self.pk) return real_model.objects.db_manager(self._state.db).get(pk=self.pk)
def __init__(self, * args, ** kwargs): def __init__(self, * args, ** kwargs):
"""Replace Django's inheritance accessor member functions for our model """Replace Django's inheritance accessor member functions for our model

View File

@ -112,8 +112,8 @@ class PolymorphicQuerySet(QuerySet):
def _filter_or_exclude(self, negate, *args, **kwargs): def _filter_or_exclude(self, negate, *args, **kwargs):
"We override this internal Django functon as it is used for all filter member functions." "We override this internal Django functon as it is used for all filter member functions."
translate_polymorphic_filter_definitions_in_args(self.model, args) # the Q objects translate_polymorphic_filter_definitions_in_args(self.model, args, using=self._db) # the Q objects
additional_args = translate_polymorphic_filter_definitions_in_kwargs(self.model, kwargs) # filter_field='data' additional_args = translate_polymorphic_filter_definitions_in_kwargs(self.model, kwargs, using=self._db) # filter_field='data'
return super(PolymorphicQuerySet, self)._filter_or_exclude(negate, *(list(args) + additional_args), **kwargs) return super(PolymorphicQuerySet, self)._filter_or_exclude(negate, *(list(args) + additional_args), **kwargs)
def order_by(self, *args, **kwargs): def order_by(self, *args, **kwargs):
@ -309,8 +309,9 @@ class PolymorphicQuerySet(QuerySet):
# - also record the correct result order in "ordered_id_list" # - also record the correct result order in "ordered_id_list"
# - store objects that already have the correct class into "results" # - store objects that already have the correct class into "results"
base_result_objects_by_id = {} base_result_objects_by_id = {}
self_model_class_id = ContentType.objects.get_for_model(self.model, for_concrete_model=False).pk content_type_manager = ContentType.objects.db_manager(self._db)
self_concrete_model_class_id = ContentType.objects.get_for_model(self.model, for_concrete_model=True).pk self_model_class_id = content_type_manager.get_for_model(self.model, for_concrete_model=False).pk
self_concrete_model_class_id = content_type_manager.get_for_model(self.model, for_concrete_model=True).pk
for base_object in base_result_objects: for base_object in base_result_objects:
ordered_id_list.append(base_object.pk) ordered_id_list.append(base_object.pk)
@ -335,7 +336,7 @@ class PolymorphicQuerySet(QuerySet):
# upcast it and put it in the results # upcast it and put it in the results
results[base_object.pk] = transmogrify(real_concrete_class, base_object) results[base_object.pk] = transmogrify(real_concrete_class, base_object)
else: else:
real_concrete_class = ContentType.objects.get_for_id(real_concrete_class_id).model_class() real_concrete_class = content_type_manager.get_for_id(real_concrete_class_id).model_class()
idlist_per_model[real_concrete_class].append(getattr(base_object, pk_name)) idlist_per_model[real_concrete_class].append(getattr(base_object, pk_name))
# For each model in "idlist_per_model" request its objects (the real model) # For each model in "idlist_per_model" request its objects (the real model)
@ -344,7 +345,7 @@ class PolymorphicQuerySet(QuerySet):
# Then we copy the extra() select fields from the base objects to the real objects. # Then we copy the extra() select fields from the base objects to the real objects.
# TODO: defer(), only(): support for these would be around here # TODO: defer(), only(): support for these would be around here
for real_concrete_class, idlist in idlist_per_model.items(): for real_concrete_class, idlist in idlist_per_model.items():
real_objects = real_concrete_class.base_objects.filter(**{ real_objects = real_concrete_class.base_objects.db_manager(self._db).filter(**{
('%s__in' % pk_name): idlist, ('%s__in' % pk_name): idlist,
}) })
real_objects.query.select_related = self.query.select_related # copy select related configuration to new qs real_objects.query.select_related = self.query.select_related # copy select related configuration to new qs

View File

@ -8,6 +8,7 @@ import django
from django.db import models from django.db import models
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.db.models import Q, FieldDoesNotExist from django.db.models import Q, FieldDoesNotExist
from django.db.utils import DEFAULT_DB_ALIAS
from django.db.models.fields.related import RelatedField from django.db.models.fields.related import RelatedField
if django.VERSION < (1, 6): if django.VERSION < (1, 6):
@ -35,7 +36,7 @@ from functools import reduce
# functionality to filters and Q objects. # functionality to filters and Q objects.
# Probably a more general queryset enhancement class could be made out of them. # Probably a more general queryset enhancement class could be made out of them.
def translate_polymorphic_filter_definitions_in_kwargs(queryset_model, kwargs): def translate_polymorphic_filter_definitions_in_kwargs(queryset_model, kwargs, using=DEFAULT_DB_ALIAS):
""" """
Translate the keyword argument list for PolymorphicQuerySet.filter() Translate the keyword argument list for PolymorphicQuerySet.filter()
@ -52,7 +53,7 @@ def translate_polymorphic_filter_definitions_in_kwargs(queryset_model, kwargs):
additional_args = [] additional_args = []
for field_path, val in kwargs.copy().items(): # Python 3 needs copy for field_path, val in kwargs.copy().items(): # Python 3 needs copy
new_expr = _translate_polymorphic_filter_definition(queryset_model, field_path, val) new_expr = _translate_polymorphic_filter_definition(queryset_model, field_path, val, using=using)
if type(new_expr) == tuple: if type(new_expr) == tuple:
# replace kwargs element # replace kwargs element
@ -66,7 +67,7 @@ def translate_polymorphic_filter_definitions_in_kwargs(queryset_model, kwargs):
return additional_args return additional_args
def translate_polymorphic_Q_object(queryset_model, potential_q_object): def translate_polymorphic_Q_object(queryset_model, potential_q_object, using=DEFAULT_DB_ALIAS):
def tree_node_correct_field_specs(my_model, node): def tree_node_correct_field_specs(my_model, node):
" process all children of this Q node " " process all children of this Q node "
for i in range(len(node.children)): for i in range(len(node.children)):
@ -75,7 +76,7 @@ def translate_polymorphic_Q_object(queryset_model, potential_q_object):
if type(child) == tuple: if type(child) == tuple:
# this Q object child is a tuple => a kwarg like Q( instance_of=ModelB ) # this Q object child is a tuple => a kwarg like Q( instance_of=ModelB )
key, val = child key, val = child
new_expr = _translate_polymorphic_filter_definition(my_model, key, val) new_expr = _translate_polymorphic_filter_definition(my_model, key, val, using=using)
if new_expr: if new_expr:
node.children[i] = new_expr node.children[i] = new_expr
else: else:
@ -88,7 +89,7 @@ def translate_polymorphic_Q_object(queryset_model, potential_q_object):
return potential_q_object return potential_q_object
def translate_polymorphic_filter_definitions_in_args(queryset_model, args): def translate_polymorphic_filter_definitions_in_args(queryset_model, args, using=DEFAULT_DB_ALIAS):
""" """
Translate the non-keyword argument list for PolymorphicQuerySet.filter() Translate the non-keyword argument list for PolymorphicQuerySet.filter()
@ -102,10 +103,10 @@ def translate_polymorphic_filter_definitions_in_args(queryset_model, args):
""" """
for q in args: for q in args:
translate_polymorphic_Q_object(queryset_model, q) translate_polymorphic_Q_object(queryset_model, q, using=using)
def _translate_polymorphic_filter_definition(queryset_model, field_path, field_val): def _translate_polymorphic_filter_definition(queryset_model, field_path, field_val, using=DEFAULT_DB_ALIAS):
""" """
Translate a keyword argument (field_path=field_val), as used for Translate a keyword argument (field_path=field_val), as used for
PolymorphicQuerySet.filter()-like functions (and Q objects). PolymorphicQuerySet.filter()-like functions (and Q objects).
@ -120,9 +121,9 @@ def _translate_polymorphic_filter_definition(queryset_model, field_path, field_v
# handle instance_of expressions or alternatively, # handle instance_of expressions or alternatively,
# if this is a normal Django filter expression, return None # if this is a normal Django filter expression, return None
if field_path == 'instance_of': if field_path == 'instance_of':
return _create_model_filter_Q(field_val) return _create_model_filter_Q(field_val, using=using)
elif field_path == 'not_instance_of': elif field_path == 'not_instance_of':
return _create_model_filter_Q(field_val, not_instance_of=True) return _create_model_filter_Q(field_val, not_instance_of=True, using=using)
elif not '___' in field_path: elif not '___' in field_path:
return None # no change return None # no change
@ -229,7 +230,7 @@ def translate_polymorphic_field_path(queryset_model, field_path):
return newpath return newpath
def _create_model_filter_Q(modellist, not_instance_of=False): def _create_model_filter_Q(modellist, not_instance_of=False, using=DEFAULT_DB_ALIAS):
""" """
Helper function for instance_of / not_instance_of Helper function for instance_of / not_instance_of
Creates and returns a Q object that filters for the models in modellist, Creates and returns a Q object that filters for the models in modellist,
@ -254,7 +255,7 @@ def _create_model_filter_Q(modellist, not_instance_of=False):
assert False, 'PolymorphicModel: instance_of expects a list of (polymorphic) models or a single (polymorphic) model' assert False, 'PolymorphicModel: instance_of expects a list of (polymorphic) models or a single (polymorphic) model'
def q_class_with_subclasses(model): def q_class_with_subclasses(model):
q = Q(polymorphic_ctype=ContentType.objects.get_for_model(model, for_concrete_model=False)) q = Q(polymorphic_ctype=ContentType.objects.db_manager(using).get_for_model(model, for_concrete_model=False))
for subclass in model.__subclasses__(): for subclass in model.__subclasses__():
q = q | q_class_with_subclasses(subclass) q = q | q_class_with_subclasses(subclass)
return q return q

View File

@ -430,6 +430,8 @@ class PolymorphicTests(TestCase):
The test suite The test suite
""" """
multi_db = True
def test_annotate_aggregate_order(self): def test_annotate_aggregate_order(self):
# create a blog of type BlogA # create a blog of type BlogA
# create two blog entries in BlogA # create two blog entries in BlogA
@ -1162,6 +1164,45 @@ class PolymorphicTests(TestCase):
result = DateModel.objects.annotate(val=DateTime('date', 'day', utc)) result = DateModel.objects.annotate(val=DateTime('date', 'day', utc))
self.assertEqual(list(result), []) self.assertEqual(list(result), [])
def test_save_to_non_default_database(self):
Model2A.objects.db_manager('secondary').create(field1='A1')
Model2C(field1='C1', field2='C2', field3='C3').save(using='secondary')
Model2B.objects.create(field1='B1', field2='B2')
Model2D(field1='D1', field2='D2', field3='D3', field4='D4').save('secondary')
default_objects = list(Model2A.objects.order_by('id'))
self.assertEqual(len(default_objects), 2)
self.assertEqual(repr(default_objects[0]), '<Model2B: id 1, field1 (CharField), field2 (CharField)>')
self.assertEqual(repr(default_objects[1]), '<Model2D: id 2, field1 (CharField), field2 (CharField), field3 (CharField), field4 (CharField)>')
secondary_objects = list(Model2A.objects.db_manager('secondary').order_by('id'))
self.assertEqual(len(secondary_objects), 2)
self.assertEqual(repr(secondary_objects[0]), '<Model2A: id 1, field1 (CharField)>')
self.assertEqual(repr(secondary_objects[1]), '<Model2C: id 2, field1 (CharField), field2 (CharField), field3 (CharField)>')
def test_instance_of_filter_on_non_default_database(self):
Base.objects.db_manager('secondary').create(field_b='B1')
ModelX.objects.db_manager('secondary').create(field_b='B', field_x='X')
ModelY.objects.db_manager('secondary').create(field_b='Y', field_y='Y')
objects = Base.objects.db_manager('secondary').filter(instance_of=Base)
self.assertEqual(len(objects), 3)
self.assertEqual(repr(objects[0]), '<Base: id 1, field_b (CharField)>')
self.assertEqual(repr(objects[1]), '<ModelX: id 2, field_b (CharField), field_x (CharField)>')
self.assertEqual(repr(objects[2]), '<ModelY: id 3, field_b (CharField), field_y (CharField)>')
objects = Base.objects.db_manager('secondary').filter(instance_of=ModelX)
self.assertEqual(len(objects), 1)
self.assertEqual(repr(objects[0]), '<ModelX: id 2, field_b (CharField), field_x (CharField)>')
objects = Base.objects.db_manager('secondary').filter(instance_of=ModelY)
self.assertEqual(len(objects), 1)
self.assertEqual(repr(objects[0]), '<ModelY: id 3, field_b (CharField), field_y (CharField)>')
objects = Base.objects.db_manager('secondary').filter(Q(instance_of=ModelX) | Q(instance_of=ModelY))
self.assertEqual(len(objects), 2)
self.assertEqual(repr(objects[0]), '<ModelX: id 2, field_b (CharField), field_x (CharField)>')
self.assertEqual(repr(objects[1]), '<ModelY: id 3, field_b (CharField), field_y (CharField)>')
class RegressionTests(TestCase): class RegressionTests(TestCase):

View File

@ -25,6 +25,10 @@ if not settings.configured:
'default': { 'default': {
'ENGINE': 'django.db.backends.sqlite3', 'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:' 'NAME': ':memory:'
},
'secondary': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:'
} }
}, },
TEMPLATE_LOADERS=( TEMPLATE_LOADERS=(