Sellar y timbrar con ISH

This commit is contained in:
Mauricio Baeza 2021-04-30 23:06:36 -05:00
parent ce57ea169a
commit cdd223ffc8
8 changed files with 567 additions and 17 deletions

View File

@ -24,6 +24,13 @@ class CFDI(object):
'ns': {'nomina12': _nomina},
'schema': f' {_nomina} http://www.sat.gob.mx/sitio_internet/cfd/nomina/nomina12.xsd',
}
_tax_locales = 'http://www.sat.gob.mx/implocal'
TAX_LOCALES = {
'version': '1.0',
'prefix': _tax_locales,
'ns': {'implocal': _tax_locales},
'schema': f' {_tax_locales} http://www.sat.gob.mx/sitio_internet/cfd/implocal/implocal.xsd',
}
def __init__(self):
self.error = ''
@ -62,12 +69,14 @@ class CFDI(object):
self._node_complement = False
self._exists_pagos = False
self._exists_nomina = False
self._exists_tax_locales = False
if not 'complementos' in data:
return
complements = data['complementos']
self._exists_pagos = 'pagos' in complements
self._exists_nomina = 'nomina' in complements
self._exists_tax_locales = 'impuestos_locales' in complements
if self._exists_pagos:
self._node_complement = True
@ -75,6 +84,9 @@ class CFDI(object):
if self._exists_nomina:
self._node_complement = True
self._schema += self.NOMINA['schema']
if self._exists_tax_locales:
self._node_complement = True
self._schema += self.TAX_LOCALES['schema']
return
def _comprobante(self, attr):
@ -86,6 +98,8 @@ class CFDI(object):
NSMAP.update(self.PAGOS['ns'])
if self._exists_nomina:
NSMAP.update(self.NOMINA['ns'])
if self._exists_tax_locales:
NSMAP.update(self.TAX_LOCALES['ns'])
attr_qname = ET.QName(
'http://www.w3.org/2001/XMLSchema-instance', 'schemaLocation')
@ -177,9 +191,10 @@ class CFDI(object):
if self._exists_pagos:
self._pagos(complemento, data['pagos'])
if self._exists_nomina:
self._nomina(complemento, data['nomina'])
if self._exists_tax_locales:
self._tax_locales(complemento, data['impuestos_locales'])
return
def _pagos(self, complemento, data):
@ -241,3 +256,16 @@ class CFDI(object):
sub_name = f"{{{self.NOMINA['prefix']}}}SubsidioAlEmpleo"
ET.SubElement(sub_node, sub_name, subsidio)
return
def _tax_locales(self, complemento, data):
traslados = data.pop('traslados', ())
retenciones = data.pop('retenciones', ())
node_name = f"{{{self.TAX_LOCALES['prefix']}}}ImpuestosLocales"
attr = {'version': self.TAX_LOCALES['version']}
attr.update(data)
node_tax = ET.SubElement(complemento, node_name, attr)
for traslado in traslados:
node_name = f"{{{self.TAX_LOCALES['prefix']}}}TrasladosLocales"
ET.SubElement(node_tax, node_name, traslado)
return

View File

@ -0,0 +1,3 @@
#!/usr/bin/env python
from .comerciodigital import PACComercioDigital

View File

@ -0,0 +1,3 @@
#!/usr/bin/env python3
from .comercio import PACComercioDigital

View File

@ -0,0 +1,383 @@
#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import lxml.etree as ET
import requests
from requests.exceptions import ConnectionError
from .conf import DEBUG, AUTH
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
logging.getLogger('requests').setLevel(logging.ERROR)
TIMEOUT = 10
class PACComercioDigital(object):
ws = 'https://{}.comercio-digital.mx/{}'
api = 'https://app2.comercio-digital.mx/{}'
URL = {
'timbra': ws.format('ws', 'timbre/timbrarV5.aspx'),
'cancel': ws.format('cancela', 'cancela3/cancelarUuid'),
'cancelxml': ws.format('cancela', 'cancela3/cancelarXml'),
'status': ws.format('cancela', 'arws/consultaEstatus'),
'client': api.format('x3/altaEmpresa'),
'saldo': api.format('x3/saldo'),
'timbres': api.format('x3/altaTimbres'),
}
CODES = {
'000': '000 Exitoso',
'004': '004 RFC {} ya esta dado de alta con Estatus=A',
'704': '704 Usuario Invalido',
'702': '702 Error rfc/empresa invalido',
}
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
if DEBUG:
ws = 'https://pruebas.comercio-digital.mx/{}'
ws6 = 'https://pruebas6.comercio-digital.mx/arws/{}'
URL = {
'timbra': ws.format('timbre/timbrarV5.aspx'),
'cancel': ws.format('cancela3/cancelarUuid'),
'cancelxml': ws.format('cancela3/cancelarXml'),
'status': ws6.format('consultaEstatus'),
'client': api.format('x3/altaEmpresa'),
'saldo': api.format('x3/saldo'),
'timbres': api.format('x3/altaTimbres'),
}
def __init__(self):
self.error = ''
def _error(self, msg):
self.error = str(msg)
log.error(msg)
return
def _post(self, url, data, headers={}):
result = None
headers['host'] = url.split('/')[2]
headers['Content-type'] = 'text/plain'
headers['Connection'] = 'Keep-Alive'
try:
result = requests.post(url, data=data, headers=headers, timeout=TIMEOUT)
except ConnectionError as e:
self._error(e)
return result
def _validate_cfdi(self, xml):
"""
Comercio Digital solo soporta la declaración con doble comilla
"""
tree = ET.fromstring(xml.encode())
xml = ET.tostring(tree,
pretty_print=True, doctype='<?xml version="1.0" encoding="utf-8"?>')
return xml
def stamp(self, cfdi, auth={}):
if DEBUG or not auth:
auth = AUTH
url = self.URL['timbra']
headers = {
'usrws': auth['user'],
'pwdws': auth['pass'],
'tipo': 'XML',
}
cfdi = self._validate_cfdi(cfdi)
result = self._post(url, cfdi, headers)
if result is None:
return ''
if result.status_code != 200:
return ''
if 'errmsg' in result.headers:
self._error(result.headers['errmsg'])
return ''
xml = result.content
tree = ET.fromstring(xml)
cfdi_uuid = tree.xpath(
'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)',
namespaces=self.NS_CFDI)
date_stamped = tree.xpath(
'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@FechaTimbrado)',
namespaces=self.NS_CFDI)
data = {
'xml': xml.decode(),
'uuid': cfdi_uuid,
'date': date_stamped,
}
return data
def _get_data_cancel(self, cfdi, info, auth):
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
tree = ET.fromstring(cfdi.encode())
tipo = tree.xpath(
'string(//cfdi:Comprobante/@TipoDeComprobante)',
namespaces=NS_CFDI)
total = tree.xpath(
'string(//cfdi:Comprobante/@Total)',
namespaces=NS_CFDI)
rfc_emisor = tree.xpath(
'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)',
namespaces=NS_CFDI)
rfc_receptor = tree.xpath(
'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)',
namespaces=NS_CFDI)
uid = tree.xpath(
'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)',
namespaces=NS_CFDI)
data = (
f"USER={auth['user']}",
f"PWDW={auth['pass']}",
f"RFCE={rfc_emisor}",
f"UUID={uid}",
f"PWDK={info['pass']}",
f"KEYF={info['key']}",
f"CERT={info['cer']}",
f"TIPO={info['tipo']}",
f"ACUS=SI",
f"RFCR={rfc_receptor}",
f"TIPOC={tipo}",
f"TOTAL={total}",
)
return '\n'.join(data)
def cancel(self, cfdi, info, auth={}):
if not auth:
auth = AUTH
url = self.URL['cancel']
data = self._get_data_cancel(cfdi, info, auth)
result = self._post(url, data)
if result is None:
return ''
if result.status_code != 200:
return ''
if result.headers['codigo'] != '000':
self._error(result.headers['errmsg'])
return ''
return result.text
def _get_headers_cancel_xml(self, cfdi, info, auth):
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
tree = ET.fromstring(cfdi.encode())
tipocfdi = tree.xpath(
'string(//cfdi:Comprobante/@TipoDeComprobante)',
namespaces=NS_CFDI)
total = tree.xpath(
'string(//cfdi:Comprobante/@Total)',
namespaces=NS_CFDI)
rfc_receptor = tree.xpath(
'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)',
namespaces=NS_CFDI)
headers = {
'usrws': auth['user'],
'pwdws': auth['pass'],
'rfcr': rfc_receptor,
'total': total,
'tipocfdi': tipocfdi,
}
headers.update(info)
return headers
def cancel_xml(self, xml, auth={}, cfdi='', info={'tipo': 'cfdi3.3'}):
if DEBUG or not auth:
auth = AUTH
url = self.URL['cancelxml']
headers = self._get_headers_cancel_xml(cfdi, info, auth)
result = self._post(url, xml, headers)
if result is None:
return ''
if result.status_code != 200:
return ''
if result.headers['codigo'] != '000':
self._error(result.headers['errmsg'])
return ''
tree = ET.fromstring(result.text)
date_cancel = tree.xpath('string(//Acuse/@Fecha)')[:19]
data = {
'acuse': result.text,
'date': date_cancel,
}
return data
def status(self, data, auth={}):
if not auth:
auth = AUTH
url = self.URL['status']
data = (
f"USER={auth['user']}",
f"PWDW={auth['pass']}",
f"RFCR={data['rfc_receptor']}",
f"RFCE={data['rfc_emisor']}",
f"TOTAL={data['total']}",
f"UUID={data['uuid']}",
)
data = '\n'.join(data)
result = self._post(url, data)
if result is None:
return ''
if result.status_code != 200:
self._error(result.status_code)
return self.error
return result.text
def _get_data_client(self, auth, values):
data = [f"usr_ws={auth['user']}", f"pwd_ws={auth['pass']}"]
fields = (
'rfc_contribuyente',
'nombre_contribuyente',
'calle',
'noExterior',
'noInterior',
'colonia',
'localidad',
'municipio',
'estado',
'pais',
'cp',
'contacto',
'telefono',
'email',
'rep_nom',
'rep_rfc',
'email_fact',
'pwd_asignado',
)
data += [f"{k}={values[k]}" for k in fields]
return '\n'.join(data)
def client_add(self, data):
auth = AUTH
url = self.URL['client']
data = self._get_data_client(auth, data)
result = self._post(url, data)
if result is None:
return False
if result.status_code != 200:
self._error(f'Code: {result.status_code}')
return False
if result.text != self.CODES['000']:
self._error(result.text)
return False
return True
def client_balance(self, data, rfc=''):
url = self.URL['saldo']
host = url.split('/')[2]
headers = {
'Content-type': 'text/plain',
'Host': host,
'Connection' : 'Keep-Alive',
}
data = {'usr': data['user'], 'pwd': data['pass']}
try:
result = requests.get(url, params=data, headers=headers, timeout=TIMEOUT)
except ConnectionError as e:
self._error(e)
return ''
if result.status_code != 200:
return ''
if result.text == self.CODES['704']:
self._error(result.text)
return ''
if result.text == self.CODES['702']:
self._error(result.text)
return ''
return result.text
def client_add_timbres(self, data, auth={}):
if not auth:
auth = AUTH
url = self.URL['timbres']
data = '\n'.join((
f"usr_ws={auth['user']}",
f"pwd_ws={auth['pass']}",
f"rfc_recibir={data['rfc']}",
f"num_timbres={data['timbres']}"
))
result = self._post(url, data)
if result is None:
return False
if result.status_code != 200:
self._error(f'Code: {result.status_code}')
return False
if result.text != self.CODES['000']:
self._error(result.text)
return False
return True

View File

@ -0,0 +1,38 @@
#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
# ~ Siempre consulta la documentación de PAC
# ~ AUTH = Las credenciales de timbrado proporcionadas por el PAC
# ~ NO cambies las credenciales de prueba
DEBUG = False
AUTH = {
'user': '',
'pass': '',
}
if DEBUG:
AUTH = {
'user': 'AAA010101AAA',
'pass': 'PWD',
}

View File

@ -6,14 +6,17 @@ from pathlib import Path
from secrets import token_hex
from django.core.exceptions import ObjectDoesNotExist
from django.utils.timezone import now
from ..conf import API_TOKEN
from .cfdi_cert import SATCertificate
from .cfdi_xml import CFDI
from .pacs import PACComercioDigital
from ..models import Clients
CURRENT_DIR = Path(__file__).resolve().parent #.parent
CURRENT_DIR = Path(__file__).resolve().parent
CADENA = 'xslt/cadena.xslt'
def validate_token(token):
@ -89,15 +92,30 @@ def validate_cfdi(post):
return True, data
def send_to_pac(data):
pac = PACComercioDigital()
xml = pac.stamp(data)
if pac.error:
print(pac.error)
return '', pac.error
return xml, ''
def send_stamp(data):
msg = ''
emisor = data['emisor']
path_xslt = CURRENT_DIR / 'xslt/cadena.xslt'
path_xslt = CURRENT_DIR / CADENA
cert = SATCertificate(emisor.cer, emisor.key)
cfdi = CFDI()
cfdi.make_xml(data['cfdi'])
cfdi.stamp(cert, path_xslt)
try:
cfdi.make_xml(data['cfdi'])
cfdi.stamp(cert, path_xslt)
except Exception as e:
return '', str(e)
return cfdi.xml, msg
xml, msg = send_to_pac(cfdi.xml)
return xml, msg

View File

@ -81,4 +81,4 @@ class ViewCfdi(View):
if msg:
return HttpResponse(msg, status=202)
return HttpResponse(xml, status=201)
return JsonResponse(xml, status=201)

View File

@ -3,6 +3,7 @@
import unittest
import warnings
import httpx
from uuid import UUID
URL_API = 'http://127.0.0.1:8000/api/{}'
@ -60,6 +61,72 @@ CFDI_MINIMO = {
}
}
CFDI_ISH = {
"comprobante": {
"TipoCambio": "1",
"Moneda": "MXN",
"TipoDeComprobante": "I",
"LugarExpedicion": "06850",
"SubTotal": "1409.64",
"Total": "1677.47",
"FormaPago": "03",
"MetodoPago": "PUE"
},
"emisor": {
"Rfc": "EKU9003173C9",
"RegimenFiscal": "601"
},
"receptor": {
"Rfc": "BASM740115RW0",
"UsoCFDI": "G01"
},
"conceptos": [
{
"ClaveProdServ": "81112106",
"Cantidad": "1.0",
"ClaveUnidad": "18",
"Descripcion": "Proveedores de servicios de aplicación",
"ValorUnitario": "1409.64",
"Importe": "1409.64",
"impuestos": {
"traslados": [
{
"Base": "1409.64",
"Impuesto": "002",
"TipoFactor": "Tasa",
"TasaOCuota": "0.160000",
"Importe": "225.54"
}
]
}
}
],
"impuestos": {
"TotalImpuestosTrasladados": "225.54",
"traslados": [
{
"Impuesto": "002",
"TipoFactor": "Tasa",
"TasaOCuota": "0.160000",
"Importe": "225.54"
}
]
},
"complementos": {
"impuestos_locales": {
"TotaldeTraslados": "42.29",
"TotaldeRetenciones": "0.00",
"traslados": [
{
"ImpLocTrasladado": "ISH",
"TasadeTraslado": "3.00",
"Importe": "42.29"
}
]
}
}
}
def ignore_warnings(test_func):
def do_test(self, *args, **kwargs):
@ -128,22 +195,32 @@ class TestCfdi(unittest.TestCase):
print(f'In method: {self._testMethodName}')
self.url = URL_API.format('cfdi/')
# ~ def test_stamp_cfdi_emisor_not_exists(self):
# ~ expected = 202
# ~ msg = 'Emisor no existe'
# ~ headers = {'Token': '12345'}
# ~ cfdi = CFDI_MINIMO.copy()
# ~ cfdi['emisor']['Rfc'] = 'No_exists'
# ~ result = httpx.post(self.url, headers=headers, json=cfdi)
# ~ self.assertEqual(result.text, msg)
# ~ self.assertEqual(expected, result.status_code)
# ~ @unittest.skip('temp')
def test_stamp_cfdi_emisor_not_exists(self):
expected = 202
msg = 'Emisor no existe'
headers = {'Token': '12345'}
cfdi = CFDI_MINIMO.copy()
cfdi['emisor']['Rfc'] = 'No_exists'
result = httpx.post(self.url, headers=headers, json=cfdi)
self.assertEqual(result.text, msg)
self.assertEqual(expected, result.status_code)
def test_stamp_cfdi(self):
expected = 201
headers = {'Token': '12345'}
result = httpx.post(self.url, headers=headers, json=CFDI_MINIMO)
print(result.text)
data = result.json()
self.assertEqual(expected, result.status_code)
self.assertTrue(bool(UUID(data['uuid'])))
def test_stamp_cfdi_ish(self):
expected = 201
headers = {'Token': '12345'}
result = httpx.post(self.url, headers=headers, json=CFDI_ISH)
data = result.json()
self.assertEqual(expected, result.status_code)
self.assertTrue(bool(UUID(data['uuid'])))
if __name__ == '__main__':