sangue/django/contatti_app/views.py

308 lines
12 KiB
Python

# def index(request):
# return HttpResponse("Hello, %s!" % (request.user.username if request.user.is_authenticated else 'World'))
import csv
import collections
import json
import re
from copy import deepcopy
from io import TextIOWrapper
from contatti_app.models import Email, PersonaFisica, Telefono, Recapito
from dati_geo_app.models import CAP, Comune
from django_auto_prefetching import AutoPrefetchViewSetMixin
from rest_framework import viewsets
from rest_framework.authentication import (BasicAuthentication,
SessionAuthentication)
from rest_framework.permissions import DjangoModelPermissions, IsAuthenticated
from django.contrib import messages
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.forms import AuthenticationForm
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
from django.http import HttpResponse, JsonResponse
from django.shortcuts import redirect, render
from django.template.response import TemplateResponse
from django.utils.http import url_has_allowed_host_and_scheme
from . import models, serializers
def iterable_but_not_str(obj):
return isinstance(obj, collections.abc.Iterable) and not isinstance(obj, (str, bytes))
def googleimport_preview(request):
def normalizza_generico(*x):
while iterable_but_not_str(x):
if len(x) == 1:
x = x[0]
else:
return tuple(normalizza_generico(y) for y in x)
return x.strip().lower()
normalizza_soggetto = normalizza_generico
normalizza_persona = normalizza_generico
normalizza_email = normalizza_generico
def normalizza_telefono(x):
if iterable_but_not_str(x):
x = x[0]
x=re.sub(r'[^0-9+]', '', x.strip())
if x.startswith('00'):
x='+'+x[2:]
if not x.startswith('+'):
x='+39'+x
return x
csv_file = request.FILES['csv_file']
csv_file_text = TextIOWrapper(csv_file, encoding='utf-8')
reader = csv.DictReader(csv_file_text)
contacts = []
recapiti_db = Recapito.objects.select_related(
'polymorphic_ctype',
'soggetto__personafisica',
'soggetto__personagiuridica'
).filter(
polymorphic_ctype__model__in = {'email', 'pec', 'telefono', }
)
def new_contatto():
return {'emails': set(), 'telefoni': set()}
contatti_db = collections.defaultdict(
lambda: collections.defaultdict(new_contatto))
soggetto_pk_by_recapito = collections.defaultdict(set)
for recapito_valore in recapiti_db:
soggetto = recapito_valore.soggetto
soggetto_pk = soggetto.pk
tipo_sogg = recapito_valore.soggetto.polymorphic_ctype.name
if tipo_sogg == 'persona fisica':
nome = soggetto.personafisica.nome
cognome = soggetto.personafisica.cognome
soggetto_descr = normalizza_persona(nome, cognome)
soggetto_descr_alt = normalizza_soggetto(f'{nome} {cognome}')
contatto = contatti_db[soggetto_descr]
contatti_db[soggetto_descr_alt] = contatto
elif tipo_sogg == 'persona giuridica':
soggetto_descr = normalizza_soggetto(
soggetto.personagiuridica.denominazione)
contatto = contatti_db[soggetto_descr]
else:
raise NotImplementedError
contatto_univoco = contatto[soggetto_pk]
if recapito_valore.polymorphic_ctype.model in {'email', 'pec', }:
recapito_descr = recapito_valore.email.indirizzo_email
contatto_univoco['emails'].add(recapito_descr)
elif recapito_valore.polymorphic_ctype.model in {'telefono', }:
recapito_descr = recapito_valore.telefono.numero
contatto_univoco['telefoni'].add(recapito_descr)
soggetto_pk_by_recapito[recapito_descr] = soggetto_pk
soggetto_pk_by_recapito = dict(soggetto_pk_by_recapito)
contatti_db = {k: dict(v) for k, v in contatti_db.items()}
warning_altro_sogg_emails = set()
warning_altro_sogg_telefoni = set()
contacts = []
for row in reader:
nome = row['Given Name']
cognome = row['Family Name']
contatto = normalizza_persona(nome, cognome)
contatto_alt = normalizza_soggetto(row['Name'])
soggetti_gia_presenti = {k:v for c in (contatto, contatto_alt) if c in contatti_db for k,v in contatti_db[c].items()}
emails = [row[f'E-mail {n} - Value'] for n in range(1, 4)]
emails = [normalizza_email(y)
for x in emails for y in x.split(':::') if y.strip()]
telefoni = [row[f'Phone {n} - Value'] for n in range(1, 4)]
telefoni = [normalizza_telefono(y)
for x in telefoni for y in x.split(':::') if y.strip()]
for recapiti,tipo_recapito in (
(emails,'email',),
(telefoni,'telefono')
):
for recapito_valore in recapiti:
contact = dict()
contacts.append(contact)
contact['recapito']=tipo_recapito
contact['nuovo_contatto'] = not soggetti_gia_presenti
contact['nome'], contact['cognome'] = nome, cognome
contact['full_name'] = contatto_alt
contact['valore']=recapito_valore
if recapito_valore in soggetto_pk_by_recapito:
pk_recapito = soggetto_pk_by_recapito[recapito_valore]
if pk_recapito in soggetti_gia_presenti:
contact['warning']=False
contact['import_status']=False
else:
contact['warning']=True
contact['import_status']='warning'
else:
contact['import_status']=True
# TODO: considerare anche la casistica in cui esiste lo stesso contatto su diversi soggetti
# già direttamente nel file da importare.
# if warning_altro_sogg_emails:
# emails = Email.objects.all().select_related('soggetto')
# emails = {normalizza_email(x['indirizzo_email']): str(
# x.soggetto) for x in emails}
# for soggetto in contacts:
# for r in soggetto:
# if r['recapito'] == 'email' and r['valore'] in emails:
# r['warning_altro_soggetto'] = emails[r['valore']]
# if warning_altro_sogg_telefoni:
# telefoni = Telefono.objects.all().select_related('soggetto')
# telefoni = {normalizza_telefono(x['numero']): str(
# x.soggetto) for x in telefoni}
# for soggetto in contacts:
# for r in soggetto:
# if r['recapito'] == 'telefono' and r['valore'] in telefoni:
# r['warning_altro_soggetto'] = telefoni[r['valore']]
# contacts = [{**{k: v for k, v in soggetto.items() if k != 'recapiti'}, **recapito}
# for soggetto in contacts for recapito in soggetto['recapiti']]
context = {'contacts': json.dumps(contacts)}
return context
def import_google_contacts_confirm(request):
if request.method == 'POST':
selected_indices = [int(key.split('_')[1])
for key in request.POST if key.startswith('import_')]
csv_file = request.session.get('csv_file')
if csv_file and selected_indices:
reader = csv.DictReader(csv_file)
contacts = list(reader)
selected_contacts = [contacts[i] for i in selected_indices]
for row in selected_contacts:
nome = row['Name']
email = row['E-mail 1 - Value']
telefono = row['Phone 1 - Value']
# Importa il soggetto solo se l'utente lo ha selezionato
if nome and row.get('import') == '1':
try:
soggetto = SoggettoContattabile.objects.get(nome=nome)
# Effettua l'aggiornamento del soggetto
soggetto.email = email
soggetto.telefono = telefono
soggetto.save()
messages.success(request, f'Updated contact: {nome}')
except SoggettoContattabile.DoesNotExist:
# Crea un nuovo soggetto
soggetto = SoggettoContattabile.objects.create(
nome=nome)
if email:
Email.objects.create(
soggetto=soggetto, indirizzo_email=email)
if telefono:
Telefono.objects.create(
soggetto=soggetto, numero=telefono)
messages.success(request, f'Imported contact: {nome}')
# Reindirizza a una pagina dopo l'importazione
return redirect('admin')
# Reindirizza se si verifica un errore o non sono stati selezionati contatti
return redirect('import_google_contacts')
# --------------- FINE PREFISSO TEMPLATE ---------------
class ContattoAziendale_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.ContattoAziendale.objects.all()
serializer_class = serializers.ContattoAziendaleSerializer
class Recapito_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.Recapito.objects.all()
serializer_class = serializers.RecapitoSerializer
class Telefono_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.Telefono.objects.all()
serializer_class = serializers.TelefonoSerializer
class Email_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.Email.objects.all()
serializer_class = serializers.EmailSerializer
class PersonaFisica_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.PersonaFisica.objects.all()
serializer_class = serializers.PersonaFisicaSerializer
class PersonaGiuridica_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.PersonaGiuridica.objects.all()
serializer_class = serializers.PersonaGiuridicaSerializer
class Sede_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.Sede.objects.all()
serializer_class = serializers.SedeSerializer
class Fax_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.Fax.objects.all()
serializer_class = serializers.FaxSerializer
class Pec_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.Pec.objects.all()
serializer_class = serializers.PecSerializer
class SoggettoContattabile_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.SoggettoContattabile.objects.all()
serializer_class = serializers.SoggettoContattabileSerializer
class Indirizzo_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.Indirizzo.objects.all()
serializer_class = serializers.IndirizzoSerializer
class Societa_View(viewsets.ModelViewSet):
# authentication_classes = [BasicAuthentication, SessionAuthentication, TokenAuthentication]
# permission_classes = [DjangoModelPermissions]
queryset = models.Societa.objects.all()
serializer_class = serializers.SocietaSerializer