sangue/django/offerte_app/sqldes2django.py

481 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
sqldes2django.
Usage:
sqldes2django.py write [--tgt-dir=<tgt-dir>] [--unmanaged=<unmanaged-name>] [--models-file=<models-file>] [--serializers-file=<serializers-file>] [--views-file=<views-file>] [--urls-file=<urls-file>] [--admin-file=<admin-file>] [--resources-file=<resources-file>] [--no-format] [--xml=<xml-path>]
sqldes2django.py (-h | --help)
sqldes2django.py (--version)
Options:
--tgt-dir=<tgt-dir> Directory di output [default: .].
--xml=<xml-path> Directory di output [default: modello_database.xml].
--models-file=<models-file> Nome del file dei modelli [default: <tgt-dir>/models.py].
--serializers-file=<serializers-file> Nome del file dei serializers [default: <tgt-dir>/serializers.py].
--views-file=<views-file> Nome del file delle views [default: <tgt-dir>/views.py].
--urls-file=<urls-file> Nome del file degli url [default: <tgt-dir>/urls.py].
--admin-file=<admin-file> Nome del file dell'admin [default: <tgt-dir>/admin.py].
--resources-file=<resources-file> Nome del file delle resources [default: <tgt-dir>/resources.py].
--no-format Non formattare l'output con autopep8.
--unmanaged=<unmanaged-name> Crea modelli unmanaged (senza migrazioni): supporta pk e fk multiple, setta database=<unmanaged-name>
--version Show version.
-h --help Show this screen.
"""
import xml.etree.ElementTree as ET
from collections import defaultdict
import lark
import os
import sys
import autopep8
import re
from docopt import docopt
"""
Created on Thu Feb 28 15:28:17 2019
@author: guido
"""
# TODO: Validare nomi per uso con Django
# TODO: Trattare le chiavi primarie manuali e le chiavi primarie composite
# TODO: Aggiungere tipi mancanti alla mappa
if __name__ == '__main__':
arguments = docopt(__doc__, version='Sqldes2django 0.1')
for arg in arguments:
if isinstance(arguments[arg], str):
arguments[arg] = arguments[arg].replace("<tgt-dir>",
arguments['--tgt-dir'])
tgtdir = arguments['--tgt-dir']
unmanaged = False
if arguments['--unmanaged']:
unmanaged = True
nome_db = arguments['--unmanaged']
print(arguments)
tgtfil = {x: arguments["--" + x + "-file"]
for x in ('models', 'admin', 'serializers', 'views', 'urls', 'resources',)}
sql_type_grammar = lark.Lark(r"""
%import common.ESCAPED_STRING
%import common.SIGNED_NUMBER
%import common.WS
%import common.CNAME
%ignore WS
tipo: costruttore | costruttore "(" [arg ("," arg)* ] ")"
?costruttore: CNAME
?arg: val | keyword_argument
val: SIGNED_NUMBER | ESCAPED_STRING
keyword_argument: CNAME "=" val""",
parser="lalr",
start="tipo") # TODO: leggere lo standard vero sql
def tutti_parametri(stringa_orig):
params = sql_type_grammar.parse(stringa_orig)
parametri = params.children[1:]
posizionali = []
keyword_args = {}
trovato_kw = False
for x in parametri:
if x.data == "val":
assert not trovato_kw, f"Argomento posizionale non può seguire keyword argument in '{stringa_orig}'"
posizionali.append(x.children[0].value)
if x.data == "keyword_argument":
trovato_kw = True
keyword_args[x.children[0].value] = x.children[1].children[0].value
return {"costruttore": params.children[0].value, "posizionali": posizionali, "keyword_args": keyword_args, "stringa": stringa_orig}
def get_tipo_mappato(parsato):
tipodato = parsato['costruttore'].upper()
return mappa_tipi[tipodato]
def is_parametrico(parsato):
return len(parsato["posizionali"]) > 0 or len(parsato["keyword_args"]) > 0
def get_parametro(parametri, positional=None, keyword=None):
if keyword is not None and keyword in parametri['keyword_args']:
val = parametri['keyword_args'][keyword]
del parametri['keyword_args'][keyword]
return val
if positional is not None:
try:
return parametri['posizionali'][positional]
except IndexError as e:
raise Exception(
f"Richiesti almeno {positional + 1} argomenti posizionali, trovati {len(parametri['posizionali'])} in {parametri['stringa']}") from e
raise Exception("Occorre specificare positional o keyword")
fine_prefisso_RE = re.compile(r'# +-+ +FINE PREFISSO TEMPLATE +-+')
def leggi_prefisso(nome_modulo):
tmpl_prefisso = []
with open(tgtfil[nome_modulo]) as fp:
for l in fp.readlines():
tmpl_prefisso.append(l)
if fine_prefisso_RE.search(l):
break
return ''.join(tmpl_prefisso)
multifkimport = "from compositefk.fields import CompositeForeignKey # Installa django-composite-foreignkey"
multipkmodel = """
class MultiPkModel(models.Model):
class Meta:
abstract = True
def save(self, *args, **kwargs):
raise NotImplementedError("È un MultiPkModel")
# TODO
def __hash__(self):
attributi_chiave_multipla = self._meta.unique_together[0]
parti = []
for x in attributi_chiave_multipla:
try:
parte = getattr(self, x + '_id') # se oltre che parte di primary è foreign, non vogliamo fetchare tutto l'oggetto
except AttributeError:
parte = getattr(self, x)
parti.append(parte)
return hash(tuple(parti))
class MultiPkQuerySet(models.QuerySet):
def get(self, *args, **kwargs):
attributi_chiave_multipla = self.model._meta.unique_together[0]
if any(key not in kwargs for key in attributi_chiave_multipla):
raise Exception("È un MultiPkModel. Fornire anche i seguenti keywoard arguments: " + str(set(attributi_chiave_multipla) - set(kwargs.keys())))
filtro = {k : kwargs[k] for k in attributi_chiave_multipla}
risultati = list(self.filter(**filtro).all())
if len(risultati) == 1:
return risultati[0]
elif len(risultati) == 0:
raise self.model.DoesNotExist(self.model._meta.object_name + " with " + str(filtro))
else:
raise Exception("MultiPkQuerySet.get: righe multiple con " + str(filtro))
class MultiPkManager(models.Manager):
def get_queryset(self):
return MultiPkQuerySet(self.model, using=self._db)
"""
tmpl_models_prefisso = leggi_prefisso('models')
tmpl_models = '''
{multifkimport}
{multipkmodel}
{corpo}
'''
tmpl_model_class = '''
class {nometab}({classe_model}):{db_name}
class Meta:
verbose_name = '{nometab_lower}'
verbose_name_plural = '{nometab_lower}'
{is_managed}{unique_together}
def __str__(self):
return f"{nometab} ({chiavi})"
{righetxt}
'''
tmpl_riga = '''{nomeriga} = models.{tiporiga}({opzioniriga})'''
tmpl_chiave_fk = '''{nomeriga} = models.ForeignKey('{altratab}', {opzioniriga})'''
tmpl_admin_prefisso = leggi_prefisso('admin')
tmpl_admin = '''
{corpo}
'''
tmpl_admin_class = '''
@admin.register(models.{nometab})
class {nometab}Admin(ImportExportModelAdmin):
# resource = resources.{nometab}Resource
# list_per_page = 15
# paginator = CachingPaginator
# show_full_result_count = False
pass
'''
tmpl_serializers_prefisso = leggi_prefisso('serializers')
tmpl_serializers = '''
{corpo}
'''
tmpl_serializer_class = '''
class {nometab}Serializer(serializers.ModelSerializer):
class Meta:
model = models.{nometab}
fields = ('{campi}')
'''
tmpl_views_prefisso = leggi_prefisso('views')
tmpl_views = '''
{corpo}
'''
tmpl_view_class = '''
class {nometab}_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.{nometab}.objects.all()
serializer_class = serializers.{nometab}Serializer
'''
tmpl_urls_prefisso = leggi_prefisso('urls')
tmpl_urls = '''
{urls}
urlpatterns += router.urls
'''
tmpl_url_route = '''router.register(r'{nometaburl}', views.{nometab}_View)'''
tmpl_resources_prefisso = leggi_prefisso('resources')
tmpl_resources = '''
{corpo}
'''
tmpl_resource_class = '''
class {nometab}Resource(resources.ModelResource):
class Meta:
model = models.{nometab}
'''
mappa_tipi = {
'INTEGER': 'IntegerField',
'INT': 'IntegerField',
'NUMERIC': 'DecimalField',
'DECIMAL': 'DecimalField',
'SMALLINT': 'SmallIntegerField',
'FLOAT': 'FloatField',
'TEXT': 'TextField',
'VARCHAR': 'CharField',
'CHAR': 'CharField',
'MEDIUMTEXT': 'TextField',
'DATE': 'DateField',
'TIME': 'TimeField',
'DATETIME': 'DateTimeField',
'TIMESTAMP': 'DateTimeField',
'DECIMAL': 'DecimalField',
'BIT': 'BooleanField',
'BINARY': 'BooleanField',
'JSONB': 'JSONField'
}
tabelle = list(ET.ElementTree(file=arguments['--xml'])
.findall('.//table'))
tabelle2 = (ET.ElementTree(file=arguments['--xml'])
.findall('.//table'))
models = []
admin = []
serializers = []
views = []
urls = []
resources = []
nomi_tabelle = []
any_multiple_fk = False
any_multiple_pk = False
tabelle_multi_pk = set()
for t in tabelle2:
nome = t.get('name')
chiave = t.findall('.//key/part')
if len(chiave) > 1:
tabelle_multi_pk.add(nome)
any_multiple_pk = True
renames = {}
is_fk_and_pk = set()
for t in tabelle:
righe = t.findall('.//row')
n_tab = t.get('name')
chiave = t.findall('.//key/part')
# prima decido i rename
for r in righe:
fk = r.find('relation')
n = r.get('name')
is_simple_fk = fk is not None and fk.get(
'table') not in tabelle_multi_pk
if is_simple_fk and unmanaged:
# renames[(n_tab, n)] = 'fk_' + n # poi ci pensiamo
renames[(n_tab, n)] = n
else:
renames[(n_tab, n)] = n
if is_simple_fk and n in [x.text for x in chiave]:
is_fk_and_pk.add((n_tab, n))
for t in tabelle:
rec_tabella = dict()
rec_serializers = dict()
rec_tabella['nometab'] = t.get('name')
rec_tabella['nometab_lower'] = rec_tabella['nometab'].lower()
n_tab = t.get('name')
rec_tabella['classe_model'] = 'models.Model'
nomi_tabelle.append(rec_tabella['nometab'])
rec_serializers['nometab'] = t.get('name')
rec_tabella['is_managed'] = ''
rec_tabella['db_name'] = ''
rec_tabella['unique_together'] = ''
if unmanaged:
rec_tabella['is_managed'] = f'managed = False\n db_table = "{rec_tabella["nometab"]}"\n '
rec_tabella['db_name'] = f'\n database = "{nome_db}"'
righe = t.findall('.//row')
chiave = t.findall('.//key/part')
pk_singola = None
righetxt = []
if len(chiave) == 1:
if not unmanaged:
righe = [r for r in righe if r.get('name') != chiave[0].text]
pk_singola = chiave[0].text
if unmanaged:
rec_tabella['chiavi'] = renames[(n_tab, pk_singola)] + ': {self.' + (renames[(n_tab, pk_singola)]+'_id' if (
n_tab, pk_singola) in is_fk_and_pk else renames[(n_tab, pk_singola)]) + '}'
else:
rec_tabella['chiavi'] = "id: {self.id}"
elif len(chiave) > 1:
multi_pk = True
rec_tabella['classe_model'] = "MultiPkModel"
nomi = ", ".join(f"'{renames[(n_tab, x.text)]}'" for x in chiave)
rec_tabella['unique_together'] = f"unique_together=[({nomi})]"
if unmanaged:
rec_tabella['chiavi'] = ", ".join(renames[(n_tab, x.text)] + ': {self.' + (renames[(n_tab, x.text)]+'_id' if (
n_tab, x.text) in is_fk_and_pk else renames[(n_tab, x.text)]) + '}' for x in chiave)
else:
rec_tabella['chiavi'] = "non puoi fare una pk multipla in django"
righetxt.append('objects = MultiPkManager()')
righeserializerstxt = []
# TODO: implementare eventuali tipi multiparametro e con spazi
datoparametricoRE = re.compile(
r'(?P<tipo>[^(]*)\((?P<lunghezza>[^)]*)\)')
fk_composite = defaultdict(list)
is_first = True
for r in righe:
rec_riga = dict()
rec_chiave_fk = dict()
rec_chiave_fk['opzioniriga'] = dict()
rec_riga['opzioniriga'] = dict()
comm = r.find('comment')
if comm is not None and 'django_ignore' in comm.text:
continue
riga_nullable = r.get('null', "1") == "1"
if is_first and unmanaged and rec_tabella['nometab'] in tabelle_multi_pk:
if not riga_nullable:
# il primo field diventa chiave primaria fittizia. Attenzione a non usarlo davvero!
righetxt.append("")
righetxt.append(
"# chiave primaria fittizia. Attenzione a non usarla! Fa fede unique_together.")
rec_riga['opzioniriga']['primary_key'] = True
rec_chiave_fk['opzioniriga']['primary_key'] = True
is_first = False
fk = r.find('relation')
is_simple_fk = fk is not None and fk.get(
'table') not in tabelle_multi_pk
if is_simple_fk:
rec_chiave_fk['nomeriga'] = renames[(n_tab, r.get('name'))]
n_altra_tab = rec_chiave_fk['altratab'] = fk.get('table')
rec_chiave_fk['altracol'] = renames[(
n_altra_tab, fk.get('row'))]
rec_chiave_fk['opzioniriga']['on_delete'] = 'models.CASCADE'
rec_chiave_fk['opzioniriga']['null'] = riga_nullable
rec_chiave_fk['opzioniriga']['blank'] = riga_nullable
if unmanaged:
rec_chiave_fk['opzioniriga']['to_field'] = f"'{rec_chiave_fk['altracol']}'"
rec_chiave_fk['opzioniriga']['db_column'] = f"'{r.get('name')}'"
rec_chiave_fk['opzioniriga']['related_name'] = (
f""""{rec_tabella['nometab']}_da_{rec_chiave_fk['altratab']}_{rec_chiave_fk['nomeriga']}" """.strip())
if not rec_chiave_fk['opzioniriga']['null']:
del rec_chiave_fk['opzioniriga']['null']
del rec_chiave_fk['opzioniriga']['blank']
rec_chiave_fk['opzioniriga'] = ', '.join(
f'{k}={v}' for k, v in rec_chiave_fk['opzioniriga'].items())
righetxt.append(tmpl_chiave_fk.format(**rec_chiave_fk))
else:
rec_riga['opzioniriga']['null'] = riga_nullable
rec_riga['opzioniriga']['blank'] = riga_nullable
if fk is not None: # è composita
tab_target = fk.get('table')
fk_composite[tab_target].append({"campo": r.get('name'),
"campo_tgt": renames[(tab_target, fk.get("row"))],
"nullabile": rec_riga['opzioniriga']['null']})
rec_riga['nomeriga'] = r.get('name')
orig = tipodato = str(r.find('datatype').text)
parsato = tutti_parametri(tipodato)
#parametrico = datoparametricoRE.match(tipodato)
parametrico = is_parametrico(parsato)
# parametrico = tipo
rec_riga['tiporiga'] = get_tipo_mappato(parsato)
if parametrico:
if rec_riga['tiporiga'] in ('CharField',):
lunghezza = get_parametro(
parsato, positional=0, keyword="max_length")
rec_riga['opzioniriga']['max_length'] = int(lunghezza)
elif rec_riga['tiporiga'] == "DecimalField":
max_digits = get_parametro(
parsato, positional=0, keyword="max_digits")
decimal_places = get_parametro(
parsato, positional=1, keyword="decimal_places")
rec_riga['opzioniriga']['max_digits'] = int(max_digits)
rec_riga['opzioniriga']['decimal_places'] = int(
decimal_places)
arg_avanzati = parsato["keyword_args"]
for x in arg_avanzati:
rec_riga['opzioniriga'][x] = arg_avanzati[x]
elif unmanaged:
if tipodato in ('CHAR', 'VARCHAR'):
rec_riga['opzioniriga']['max_length'] = 1000000
if r.get('name') == pk_singola:
# chiave singola
rec_riga['opzioniriga']['primary_key'] = True
if not rec_riga['opzioniriga']['null']:
del rec_riga['opzioniriga']['null']
del rec_riga['opzioniriga']['blank']
if rec_riga['tiporiga'] in ('CharField', 'TextField') and 'blank' in rec_riga['opzioniriga']:
del rec_riga['opzioniriga']['blank']
rec_riga['opzioniriga'] = ','.join(
f'{k}={v}' for k, v in rec_riga['opzioniriga'].items())
righetxt.append(tmpl_riga.format(**rec_riga))
rec_riga_serializers = dict()
rec_riga_serializers['nomecampo'] = r.get('name')
righeserializerstxt.append(rec_riga_serializers['nomecampo'])
for tgt in fk_composite:
nullabile = any(x['nullabile'] for x in fk_composite[tgt])
fk_fields = ",\n ".join(
f"'{x['campo_tgt']}': '{x['campo']}'" for x in fk_composite[tgt])
asd = f"fk_{tgt} = CompositeForeignKey('{tgt}', on_delete=models.CASCADE, null={nullabile}, to_fields=" + "{\n " + fk_fields + "\n })"
righetxt.append(asd)
any_multiple_fk = True
rec_tabella['righetxt'] = '\n '.join(righetxt)
rec_serializers['campi'] = "','".join(righeserializerstxt)
models.append(tmpl_model_class.format(**rec_tabella))
admin.append(tmpl_admin_class.format(**rec_tabella))
serializers.append(tmpl_serializer_class.format(**rec_serializers))
views.append(tmpl_view_class.format(nometab=rec_tabella['nometab']))
urls.append(tmpl_url_route.format(
nometaburl=rec_tabella['nometab'].lower(), nometab=rec_tabella['nometab']))
resources.append(tmpl_resource_class.format(
nometab=rec_tabella['nometab']))
models = tmpl_models_prefisso + tmpl_models.format(corpo='\n'.join(models),
multipkmodel=multipkmodel if any_multiple_pk else "",
multifkimport=multifkimport if any_multiple_fk else "")
admin = tmpl_admin_prefisso + tmpl_admin.format(corpo='\n'.join(admin))
serializers = tmpl_serializers_prefisso + tmpl_serializers.format(corpo='\n'.join(serializers))
views = tmpl_views_prefisso + tmpl_views.format(corpo='\n'.join(views))
urls = tmpl_urls_prefisso + tmpl_urls.format(urls='\n'.join(urls))
resources = tmpl_resources_prefisso + tmpl_resources.format(corpo='\n'.join(resources))
for tgt_name, tgt_path in tgtfil.items():
with open(tgt_path, 'w') as f:
print(
f"Writing {tgt_name} to {tgt_path}, autopep8: {not arguments['--no-format']}...")
if arguments['--no-format']:
f.write(globals()[tgt_name])
else:
f.write(autopep8.fix_code(globals()[tgt_name]))