#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ sqldes2django. Usage: sqldes2django.py write [--tgt-dir=] [--unmanaged=] [--models-file=] [--serializers-file=] [--views-file=] [--urls-file=] [--admin-file=] [--resources-file=] [--no-format] [--xml=] sqldes2django.py (-h | --help) sqldes2django.py (--version) Options: --tgt-dir= Directory di output [default: .]. --xml= Directory di output [default: modello_database.xml]. --models-file= Nome del file dei modelli [default: /models.py]. --serializers-file= Nome del file dei serializers [default: /serializers.py]. --views-file= Nome del file delle views [default: /views.py]. --urls-file= Nome del file degli url [default: /urls.py]. --admin-file= Nome del file dell'admin [default: /admin.py]. --resources-file= Nome del file delle resources [default: /resources.py]. --no-format Non formattare l'output con autopep8. --unmanaged= Crea modelli unmanaged (senza migrazioni): supporta pk e fk multiple, setta database= --version Show version. -h --help Show this screen. """ import xml.etree.ElementTree as ET from collections import defaultdict import lark import os import sys import autopep8 import re from docopt import docopt """ Created on Thu Feb 28 15:28:17 2019 @author: guido """ # TODO: Validare nomi per uso con Django # TODO: Trattare le chiavi primarie manuali e le chiavi primarie composite # TODO: Aggiungere tipi mancanti alla mappa if __name__ == '__main__': arguments = docopt(__doc__, version='Sqldes2django 0.1') for arg in arguments: if isinstance(arguments[arg], str): arguments[arg] = arguments[arg].replace("", arguments['--tgt-dir']) tgtdir = arguments['--tgt-dir'] unmanaged = False if arguments['--unmanaged']: unmanaged = True nome_db = arguments['--unmanaged'] print(arguments) tgtfil = {x: arguments["--" + x + "-file"] for x in ('models', 'admin', 'serializers', 'views', 'urls', 'resources',)} sql_type_grammar = lark.Lark(r""" %import common.ESCAPED_STRING %import common.SIGNED_NUMBER %import common.WS %import common.CNAME %ignore WS tipo: costruttore | costruttore "(" [arg ("," arg)* ] ")" ?costruttore: CNAME ?arg: val | keyword_argument val: SIGNED_NUMBER | ESCAPED_STRING keyword_argument: CNAME "=" val""", parser="lalr", start="tipo") # TODO: leggere lo standard vero sql def tutti_parametri(stringa_orig): params = sql_type_grammar.parse(stringa_orig) parametri = params.children[1:] posizionali = [] keyword_args = {} trovato_kw = False for x in parametri: if x.data == "val": assert not trovato_kw, f"Argomento posizionale non può seguire keyword argument in '{stringa_orig}'" posizionali.append(x.children[0].value) if x.data == "keyword_argument": trovato_kw = True keyword_args[x.children[0].value] = x.children[1].children[0].value return {"costruttore": params.children[0].value, "posizionali": posizionali, "keyword_args": keyword_args, "stringa": stringa_orig} def get_tipo_mappato(parsato): tipodato = parsato['costruttore'].upper() return mappa_tipi[tipodato] def is_parametrico(parsato): return len(parsato["posizionali"]) > 0 or len(parsato["keyword_args"]) > 0 def get_parametro(parametri, positional=None, keyword=None): if keyword is not None and keyword in parametri['keyword_args']: val = parametri['keyword_args'][keyword] del parametri['keyword_args'][keyword] return val if positional is not None: try: return parametri['posizionali'][positional] except IndexError as e: raise Exception( f"Richiesti almeno {positional + 1} argomenti posizionali, trovati {len(parametri['posizionali'])} in {parametri['stringa']}") from e raise Exception("Occorre specificare positional o keyword") fine_prefisso_RE = re.compile(r'# +-+ +FINE PREFISSO TEMPLATE +-+') def leggi_prefisso(nome_modulo): tmpl_prefisso = [] with open(tgtfil[nome_modulo]) as fp: for l in fp.readlines(): tmpl_prefisso.append(l) if fine_prefisso_RE.search(l): break return ''.join(tmpl_prefisso) multifkimport = "from compositefk.fields import CompositeForeignKey # Installa django-composite-foreignkey" multipkmodel = """ class MultiPkModel(models.Model): class Meta: abstract = True def save(self, *args, **kwargs): raise NotImplementedError("È un MultiPkModel") # TODO def __hash__(self): attributi_chiave_multipla = self._meta.unique_together[0] parti = [] for x in attributi_chiave_multipla: try: parte = getattr(self, x + '_id') # se oltre che parte di primary è foreign, non vogliamo fetchare tutto l'oggetto except AttributeError: parte = getattr(self, x) parti.append(parte) return hash(tuple(parti)) class MultiPkQuerySet(models.QuerySet): def get(self, *args, **kwargs): attributi_chiave_multipla = self.model._meta.unique_together[0] if any(key not in kwargs for key in attributi_chiave_multipla): raise Exception("È un MultiPkModel. Fornire anche i seguenti keywoard arguments: " + str(set(attributi_chiave_multipla) - set(kwargs.keys()))) filtro = {k : kwargs[k] for k in attributi_chiave_multipla} risultati = list(self.filter(**filtro).all()) if len(risultati) == 1: return risultati[0] elif len(risultati) == 0: raise self.model.DoesNotExist(self.model._meta.object_name + " with " + str(filtro)) else: raise Exception("MultiPkQuerySet.get: righe multiple con " + str(filtro)) class MultiPkManager(models.Manager): def get_queryset(self): return MultiPkQuerySet(self.model, using=self._db) """ tmpl_models_prefisso = leggi_prefisso('models') tmpl_models = ''' {multifkimport} {multipkmodel} {corpo} ''' tmpl_model_class = ''' class {nometab}({classe_model}):{db_name} class Meta: verbose_name = '{nometab_lower}' verbose_name_plural = '{nometab_lower}' {is_managed}{unique_together} def __str__(self): return f"{nometab} ({chiavi})" {righetxt} ''' tmpl_riga = '''{nomeriga} = models.{tiporiga}({opzioniriga})''' tmpl_chiave_fk = '''{nomeriga} = models.ForeignKey('{altratab}', {opzioniriga})''' tmpl_admin_prefisso = leggi_prefisso('admin') tmpl_admin = ''' {corpo} ''' tmpl_admin_class = ''' @admin.register(models.{nometab}) class {nometab}Admin(ImportExportModelAdmin): # resource = resources.{nometab}Resource # list_per_page = 15 # paginator = CachingPaginator # show_full_result_count = False pass ''' tmpl_serializers_prefisso = leggi_prefisso('serializers') tmpl_serializers = ''' {corpo} ''' tmpl_serializer_class = ''' class {nometab}Serializer(serializers.ModelSerializer): class Meta: model = models.{nometab} fields = ('{campi}') ''' tmpl_views_prefisso = leggi_prefisso('views') tmpl_views = ''' {corpo} ''' tmpl_view_class = ''' class {nometab}_View(viewsets.ModelViewSet): # authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication] # permission_classes = [DjangoModelPermissions] queryset = models.{nometab}.objects.all() serializer_class = serializers.{nometab}Serializer ''' tmpl_urls_prefisso = leggi_prefisso('urls') tmpl_urls = ''' {urls} urlpatterns += router.urls ''' tmpl_url_route = '''router.register(r'{nometaburl}', views.{nometab}_View)''' tmpl_resources_prefisso = leggi_prefisso('resources') tmpl_resources = ''' {corpo} ''' tmpl_resource_class = ''' class {nometab}Resource(resources.ModelResource): class Meta: model = models.{nometab} ''' mappa_tipi = { 'INTEGER': 'IntegerField', 'INT': 'IntegerField', 'NUMERIC': 'DecimalField', 'DECIMAL': 'DecimalField', 'SMALLINT': 'SmallIntegerField', 'FLOAT': 'FloatField', 'TEXT': 'TextField', 'VARCHAR': 'CharField', 'CHAR': 'CharField', 'MEDIUMTEXT': 'TextField', 'DATE': 'DateField', 'TIME': 'TimeField', 'DATETIME': 'DateTimeField', 'TIMESTAMP': 'DateTimeField', 'DECIMAL': 'DecimalField', 'BIT': 'BooleanField', 'BINARY': 'BooleanField', 'JSONB': 'JSONField' } tabelle = list(ET.ElementTree(file=arguments['--xml']) .findall('.//table')) tabelle2 = (ET.ElementTree(file=arguments['--xml']) .findall('.//table')) models = [] admin = [] serializers = [] views = [] urls = [] resources = [] nomi_tabelle = [] any_multiple_fk = False any_multiple_pk = False tabelle_multi_pk = set() for t in tabelle2: nome = t.get('name') chiave = t.findall('.//key/part') if len(chiave) > 1: tabelle_multi_pk.add(nome) any_multiple_pk = True renames = {} is_fk_and_pk = set() for t in tabelle: righe = t.findall('.//row') n_tab = t.get('name') chiave = t.findall('.//key/part') # prima decido i rename for r in righe: fk = r.find('relation') n = r.get('name') is_simple_fk = fk is not None and fk.get( 'table') not in tabelle_multi_pk if is_simple_fk and unmanaged: # renames[(n_tab, n)] = 'fk_' + n # poi ci pensiamo renames[(n_tab, n)] = n else: renames[(n_tab, n)] = n if is_simple_fk and n in [x.text for x in chiave]: is_fk_and_pk.add((n_tab, n)) for t in tabelle: rec_tabella = dict() rec_serializers = dict() rec_tabella['nometab'] = t.get('name') rec_tabella['nometab_lower'] = rec_tabella['nometab'].lower() n_tab = t.get('name') rec_tabella['classe_model'] = 'models.Model' nomi_tabelle.append(rec_tabella['nometab']) rec_serializers['nometab'] = t.get('name') rec_tabella['is_managed'] = '' rec_tabella['db_name'] = '' rec_tabella['unique_together'] = '' if unmanaged: rec_tabella['is_managed'] = f'managed = False\n db_table = "{rec_tabella["nometab"]}"\n ' rec_tabella['db_name'] = f'\n database = "{nome_db}"' righe = t.findall('.//row') chiave = t.findall('.//key/part') pk_singola = None righetxt = [] if len(chiave) == 1: if not unmanaged: righe = [r for r in righe if r.get('name') != chiave[0].text] pk_singola = chiave[0].text if unmanaged: rec_tabella['chiavi'] = renames[(n_tab, pk_singola)] + ': {self.' + (renames[(n_tab, pk_singola)]+'_id' if ( n_tab, pk_singola) in is_fk_and_pk else renames[(n_tab, pk_singola)]) + '}' else: rec_tabella['chiavi'] = "id: {self.id}" elif len(chiave) > 1: multi_pk = True rec_tabella['classe_model'] = "MultiPkModel" nomi = ", ".join(f"'{renames[(n_tab, x.text)]}'" for x in chiave) rec_tabella['unique_together'] = f"unique_together=[({nomi})]" if unmanaged: rec_tabella['chiavi'] = ", ".join(renames[(n_tab, x.text)] + ': {self.' + (renames[(n_tab, x.text)]+'_id' if ( n_tab, x.text) in is_fk_and_pk else renames[(n_tab, x.text)]) + '}' for x in chiave) else: rec_tabella['chiavi'] = "non puoi fare una pk multipla in django" righetxt.append('objects = MultiPkManager()') righeserializerstxt = [] # TODO: implementare eventuali tipi multiparametro e con spazi datoparametricoRE = re.compile( r'(?P[^(]*)\((?P[^)]*)\)') fk_composite = defaultdict(list) is_first = True for r in righe: rec_riga = dict() rec_chiave_fk = dict() rec_chiave_fk['opzioniriga'] = dict() rec_riga['opzioniriga'] = dict() comm = r.find('comment') if comm is not None and 'django_ignore' in comm.text: continue riga_nullable = r.get('null', "1") == "1" if is_first and unmanaged and rec_tabella['nometab'] in tabelle_multi_pk: if not riga_nullable: # il primo field diventa chiave primaria fittizia. Attenzione a non usarlo davvero! righetxt.append("") righetxt.append( "# chiave primaria fittizia. Attenzione a non usarla! Fa fede unique_together.") rec_riga['opzioniriga']['primary_key'] = True rec_chiave_fk['opzioniriga']['primary_key'] = True is_first = False fk = r.find('relation') is_simple_fk = fk is not None and fk.get( 'table') not in tabelle_multi_pk if is_simple_fk: rec_chiave_fk['nomeriga'] = renames[(n_tab, r.get('name'))] n_altra_tab = rec_chiave_fk['altratab'] = fk.get('table') rec_chiave_fk['altracol'] = renames[( n_altra_tab, fk.get('row'))] rec_chiave_fk['opzioniriga']['on_delete'] = 'models.CASCADE' rec_chiave_fk['opzioniriga']['null'] = riga_nullable rec_chiave_fk['opzioniriga']['blank'] = riga_nullable if unmanaged: rec_chiave_fk['opzioniriga']['to_field'] = f"'{rec_chiave_fk['altracol']}'" rec_chiave_fk['opzioniriga']['db_column'] = f"'{r.get('name')}'" rec_chiave_fk['opzioniriga']['related_name'] = ( f""""{rec_tabella['nometab']}_da_{rec_chiave_fk['altratab']}_{rec_chiave_fk['nomeriga']}" """.strip()) if not rec_chiave_fk['opzioniriga']['null']: del rec_chiave_fk['opzioniriga']['null'] del rec_chiave_fk['opzioniriga']['blank'] rec_chiave_fk['opzioniriga'] = ', '.join( f'{k}={v}' for k, v in rec_chiave_fk['opzioniriga'].items()) righetxt.append(tmpl_chiave_fk.format(**rec_chiave_fk)) else: rec_riga['opzioniriga']['null'] = riga_nullable rec_riga['opzioniriga']['blank'] = riga_nullable if fk is not None: # è composita tab_target = fk.get('table') fk_composite[tab_target].append({"campo": r.get('name'), "campo_tgt": renames[(tab_target, fk.get("row"))], "nullabile": rec_riga['opzioniriga']['null']}) rec_riga['nomeriga'] = r.get('name') orig = tipodato = str(r.find('datatype').text) parsato = tutti_parametri(tipodato) #parametrico = datoparametricoRE.match(tipodato) parametrico = is_parametrico(parsato) # parametrico = tipo rec_riga['tiporiga'] = get_tipo_mappato(parsato) if parametrico: if rec_riga['tiporiga'] in ('CharField',): lunghezza = get_parametro( parsato, positional=0, keyword="max_length") rec_riga['opzioniriga']['max_length'] = int(lunghezza) elif rec_riga['tiporiga'] == "DecimalField": max_digits = get_parametro( parsato, positional=0, keyword="max_digits") decimal_places = get_parametro( parsato, positional=1, keyword="decimal_places") rec_riga['opzioniriga']['max_digits'] = int(max_digits) rec_riga['opzioniriga']['decimal_places'] = int( decimal_places) arg_avanzati = parsato["keyword_args"] for x in arg_avanzati: rec_riga['opzioniriga'][x] = arg_avanzati[x] elif unmanaged: if tipodato in ('CHAR', 'VARCHAR'): rec_riga['opzioniriga']['max_length'] = 1000000 if r.get('name') == pk_singola: # chiave singola rec_riga['opzioniriga']['primary_key'] = True if not rec_riga['opzioniriga']['null']: del rec_riga['opzioniriga']['null'] del rec_riga['opzioniriga']['blank'] if rec_riga['tiporiga'] in ('CharField', 'TextField') and 'blank' in rec_riga['opzioniriga']: del rec_riga['opzioniriga']['blank'] rec_riga['opzioniriga'] = ','.join( f'{k}={v}' for k, v in rec_riga['opzioniriga'].items()) righetxt.append(tmpl_riga.format(**rec_riga)) rec_riga_serializers = dict() rec_riga_serializers['nomecampo'] = r.get('name') righeserializerstxt.append(rec_riga_serializers['nomecampo']) for tgt in fk_composite: nullabile = any(x['nullabile'] for x in fk_composite[tgt]) fk_fields = ",\n ".join( f"'{x['campo_tgt']}': '{x['campo']}'" for x in fk_composite[tgt]) asd = f"fk_{tgt} = CompositeForeignKey('{tgt}', on_delete=models.CASCADE, null={nullabile}, to_fields=" + "{\n " + fk_fields + "\n })" righetxt.append(asd) any_multiple_fk = True rec_tabella['righetxt'] = '\n '.join(righetxt) rec_serializers['campi'] = "','".join(righeserializerstxt) models.append(tmpl_model_class.format(**rec_tabella)) admin.append(tmpl_admin_class.format(**rec_tabella)) serializers.append(tmpl_serializer_class.format(**rec_serializers)) views.append(tmpl_view_class.format(nometab=rec_tabella['nometab'])) urls.append(tmpl_url_route.format( nometaburl=rec_tabella['nometab'].lower(), nometab=rec_tabella['nometab'])) resources.append(tmpl_resource_class.format( nometab=rec_tabella['nometab'])) models = tmpl_models_prefisso + tmpl_models.format(corpo='\n'.join(models), multipkmodel=multipkmodel if any_multiple_pk else "", multifkimport=multifkimport if any_multiple_fk else "") admin = tmpl_admin_prefisso + tmpl_admin.format(corpo='\n'.join(admin)) serializers = tmpl_serializers_prefisso + tmpl_serializers.format(corpo='\n'.join(serializers)) views = tmpl_views_prefisso + tmpl_views.format(corpo='\n'.join(views)) urls = tmpl_urls_prefisso + tmpl_urls.format(urls='\n'.join(urls)) resources = tmpl_resources_prefisso + tmpl_resources.format(corpo='\n'.join(resources)) for tgt_name, tgt_path in tgtfil.items(): with open(tgt_path, 'w') as f: print( f"Writing {tgt_name} to {tgt_path}, autopep8: {not arguments['--no-format']}...") if arguments['--no-format']: f.write(globals()[tgt_name]) else: f.write(autopep8.fix_code(globals()[tgt_name]))