220 lines
7.5 KiB
Python
220 lines
7.5 KiB
Python
from django.contrib import admin
|
|
from django.contrib import messages
|
|
from django.shortcuts import render
|
|
from django.shortcuts import HttpResponseRedirect
|
|
from django.utils.html import format_html
|
|
from django.urls import reverse
|
|
from django.db import IntegrityError
|
|
|
|
from .models import Clients
|
|
from .models import Cfdi
|
|
from .models import CfdiDetails
|
|
from .models import DetailTaxes
|
|
from .models import Taxes
|
|
from . import forms
|
|
from .util import util
|
|
|
|
|
|
@admin.register(Clients)
class AdminClients(admin.ModelAdmin):
    """Admin configuration for ``Clients`` with two CFDI import actions.

    Besides the regular change form, the changelist offers the actions
    ``upload_xml`` (import one CFDI XML file) and ``upload_zip`` (import
    every entry of a ZIP archive). Both actions first render an
    intermediate upload page and do the import when that page is posted
    back with ``apply``.
    """

    form = forms.AdminFormClients
    fields = ('rfc', 'name', 'file_key', 'file_cer', 'contra')
    list_display = (
        'rfc',
        'name',
        'serial_number',
        'date_from',
        'date_to',
    )
    search_fields = ('rfc', 'name')
    actions = ['upload_xml', 'upload_zip']
    actions_on_top = True

    # Content types accepted for each upload. The value is reported by the
    # browser; some browsers label ZIP uploads as
    # 'application/x-zip-compressed', so both spellings are accepted.
    _xml_content_types = ('text/xml', 'application/xml')
    _zip_content_types = ('application/zip', 'application/x-zip-compressed')

    def save_model(self, request, obj, form, change):
        """Persist the client, filling certificate-derived fields first.

        When ``form.cert`` is truthy its ``serial_number``, ``not_before``,
        ``not_after``, ``cer_pem`` and ``key_enc`` attributes are copied to
        ``obj.serial_number``, ``obj.date_from``, ``obj.date_to``,
        ``obj.cer`` and ``obj.key``. The RFC is always stored upper-cased.
        """
        cert = form.cert
        if cert:
            obj.serial_number = cert.serial_number
            obj.date_from = cert.not_before
            obj.date_to = cert.not_after
            obj.cer = cert.cer_pem
            obj.key = cert.key_enc
        obj.rfc = obj.rfc.upper()
        super().save_model(request, obj, form, change)

    def _message_and_redirect(self, request, msg, level=messages.ERROR):
        """Flash ``msg`` at ``level`` and redirect to the current URL."""
        self.message_user(request, msg, level=level)
        return HttpResponseRedirect(request.get_full_path())

    @staticmethod
    def _matches_any_rfc(data, rfcs):
        """Return True if the CFDI issuer or receiver RFC is in ``rfcs``."""
        return data['emisor_rfc'] in rfcs or data['receptor_rfc'] in rfcs

    def upload_xml(self, request, queryset):
        """Admin action: import a single CFDI XML for the selected clients.

        Args:
            request: current request; ``cancel`` / ``apply`` in its POST data
                select the branch, and the upload is read from
                ``request.FILES['file_xml']``.
            queryset: selected ``Clients``; the CFDI is only imported when
                its issuer or receiver RFC belongs to one of them.

        Returns:
            A redirect to the current URL (with a flash message) after
            ``cancel`` or ``apply``; otherwise the rendered upload page.
        """
        if 'cancel' in request.POST:
            return self._message_and_redirect(
                request, 'Proceso cancelado, NO se importo ningún documento')

        if 'apply' in request.POST:
            # .get() instead of [...]: a post without a file must produce a
            # message, not an unhandled MultiValueDictKeyError.
            file_xml = request.FILES.get('file_xml')
            if file_xml is None:
                return self._message_and_redirect(
                    request, 'No se seleccionó ningún archivo')

            if file_xml.content_type not in self._xml_content_types:
                return self._message_and_redirect(
                    request, 'El archivo no es XML')

            rfcs = {client.rfc for client in queryset}
            data = util.get_data_from_cfdi(file_xml.read())

            level = messages.ERROR
            if Cfdi.objects.filter(uuid=data['uuid']).exists():
                msg = 'CFDI ya existe'
            elif not self._matches_any_rfc(data, rfcs):
                msg = 'CFDI no corresponde al RFC'
            else:
                Cfdi.objects.create(**data)
                msg = 'CFDI importado correctamente'
                level = messages.INFO

            return self._message_and_redirect(request, msg, level)

        # First invocation from the changelist: show the upload page and
        # carry the selected rows along as initial form data.
        selected = request.POST.getlist(admin.helpers.ACTION_CHECKBOX_NAME)
        form = forms.AdminUploadXmlForm(initial={'_selected_action': selected})
        return render(request, 'upload_xml.html', {'form': form})
    upload_xml.short_description = 'Importar XML'

    def upload_zip(self, request, queryset):
        """Admin action: import every CFDI contained in an uploaded ZIP.

        Entries whose UUID already exists, or whose issuer/receiver RFC does
        not belong to any selected client, are skipped silently. The final
        message reports the number of entries and the number imported.

        Args:
            request: current request; ``cancel`` / ``apply`` in its POST data
                select the branch, and the upload is read from
                ``request.FILES['file_zip']``.
            queryset: selected ``Clients`` whose RFCs filter the import.

        Returns:
            A redirect to the current URL (with a flash message) after
            ``cancel`` or ``apply``; otherwise the rendered upload page.
        """
        if 'cancel' in request.POST:
            return self._message_and_redirect(
                request, 'Proceso cancelado, NO se importo ningún archivo')

        if 'apply' in request.POST:
            file_zip = request.FILES.get('file_zip')
            if file_zip is None:
                return self._message_and_redirect(
                    request, 'No se seleccionó ningún archivo')

            if file_zip.content_type not in self._zip_content_types:
                return self._message_and_redirect(
                    request, 'El archivo no es ZIP')

            rfcs = {client.rfc for client in queryset}
            import_ok = 0

            # NOTE(review): assumes util.read_zip returns a
            # zipfile.ZipFile-compatible object (it is used through
            # namelist()/open()); the with-blocks close the archive and each
            # member handle, which were previously left open.
            with util.read_zip(file_zip.read()) as zf:
                names = zf.namelist()
                total = len(names)
                for name in names:
                    with zf.open(name) as member:
                        data = util.get_data_from_cfdi(member.read())
                    if Cfdi.objects.filter(uuid=data['uuid']).exists():
                        continue
                    if not self._matches_any_rfc(data, rfcs):
                        continue
                    Cfdi.objects.create(**data)
                    import_ok += 1

            msg = f'Archivos totales: {total}\nArchivos importados: {import_ok}'
            return self._message_and_redirect(request, msg, messages.INFO)

        # First invocation from the changelist: show the upload page and
        # carry the selected rows along as initial form data.
        selected = request.POST.getlist(admin.helpers.ACTION_CHECKBOX_NAME)
        form = forms.AdminUploadZipForm(initial={'_selected_action': selected})
        return render(request, 'upload_zip.html', {'form': form})
    upload_zip.short_description = 'Importar ZIP'
|
|
|
|
|
|
@admin.register(Cfdi)
class AdminCfdi(admin.ModelAdmin):
    """Admin configuration for ``Cfdi`` documents.

    New documents are added by uploading an XML file (the add form exposes
    only ``file_xml``). Existing documents cannot be changed; their detail
    page lists every model field except ``xml``. The changelist shows
    download links and right-aligned monetary columns.
    """

    form = forms.AdminFormCfdi
    fields = ('file_xml',)
    list_display = (
        'uuid',
        'link_xml',
        'link_pdf',
        'serie',
        'folio',
        'date_cfdi',
        'type_cfdi',
        'emisor_rfc',
        'receptor_rfc',
        'currency',
        'type_change',
        'subtotal_format',
        'discount_format',
        'total_format',
    )
    search_fields = ('uuid', 'emisor', 'receptor')
    date_hierarchy = 'date_cfdi'
    # Kept unchanged for any external code that reads it; rendering now goes
    # through _span_right_html so values pass through format_html's escaping.
    span_right = '<span style="float:right;">{:0,.2f}</span>'
    _span_right_html = '<span style="float:right;">{}</span>'

    def link_xml(self, obj):
        """Return an ``<a>`` tag pointing at the 'down-xml' URL for ``obj``."""
        link = reverse('down-xml', args=[obj.id])
        # The URL is passed as an argument, not pre-interpolated, so
        # format_html escapes it; calling format_html without arguments is
        # deprecated by Django.
        return format_html("<a href='{}'>XML</a>", link)
    link_xml.short_description = 'XML'

    def link_pdf(self, obj):
        """Return an ``<a>`` tag pointing at the 'down-pdf' URL for ``obj``."""
        link = reverse('down-pdf', args=[obj.id])
        return format_html("<a href='{}'>PDF</a>", link)
    link_pdf.short_description = 'PDF'

    def _format(self, value):
        """Render ``value`` right-aligned with thousands separators, 2 decimals."""
        # The number is formatted first because format_html converts its
        # arguments to escaped strings, which cannot take a numeric format
        # spec such as ',.2f'.
        return format_html(self._span_right_html, format(value, '0,.2f'))

    def subtotal_format(self, obj):
        """Changelist column: formatted ``obj.subtotal``."""
        return self._format(obj.subtotal)
    subtotal_format.short_description = 'SubTotal'
    subtotal_format.admin_order_field = 'subtotal'

    def discount_format(self, obj):
        """Changelist column: formatted ``obj.discount``, or a dash if falsy."""
        if obj.discount:
            return self._format(obj.discount)
        return format_html(self._span_right_html, ' - ')
    discount_format.short_description = 'Descuento'
    discount_format.admin_order_field = 'discount'

    def total_format(self, obj):
        """Changelist column: formatted ``obj.total``."""
        return self._format(obj.total)
    total_format.short_description = 'Total'
    total_format.admin_order_field = 'total'

    def has_change_permission(self, request, obj=None):
        """Deny change permission unconditionally."""
        return False

    def get_fields(self, request, obj=None):
        """Return the fields to display.

        Without ``obj`` this is ``self.fields``; with ``obj`` it is every
        concrete model field name except ``xml``.
        """
        if not obj:
            return self.fields
        # Filtering (rather than list.remove) avoids a ValueError if the
        # model ever stops having an 'xml' field.
        return [f.name for f in obj.__class__._meta.fields if f.name != 'xml']

    def save_model(self, request, obj, form, change):
        """Create the CFDI together with its detail lines and taxes.

        ``form.data`` is used as the keyword arguments for a new ``Cfdi``
        after its ``details`` and ``taxes`` entries are popped off. Each
        detail becomes a ``CfdiDetails`` row, each detail's own ``taxes``
        become ``DetailTaxes`` rows, and the document-level taxes become
        ``Taxes`` rows.

        NOTE(review): ``form.data`` is mutated in place by the ``pop`` calls,
        and the ``obj`` passed in by the admin is replaced by a new instance
        built from ``form.data`` -- presumably AdminFormCfdi fills ``data``
        with the parsed XML; confirm against the form.
        """
        data = form.data
        details = data.pop('details', [])
        taxes = data.pop('taxes', [])

        obj = Cfdi(**data)
        super().save_model(request, obj, form, change)

        for detail in details:
            detail_taxes = detail.pop('taxes', [])
            obj_detail = CfdiDetails.objects.create(cfdi=obj, **detail)
            if detail_taxes:
                DetailTaxes.objects.bulk_create(
                    [DetailTaxes(detail=obj_detail, **t) for t in detail_taxes]
                )

        if taxes:
            Taxes.objects.bulk_create([Taxes(cfdi=obj, **t) for t in taxes])
|