diff --git a/.gitignore b/.gitignore index 13d1490..d684f1c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,7 @@ # ---> Python + +conf.py + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/source/comerciodigital/__init__.py b/source/comerciodigital/__init__.py new file mode 100644 index 0000000..195aadd --- /dev/null +++ b/source/comerciodigital/__init__.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python3 + +from .comercio import PACComercioDigital diff --git a/source/comerciodigital/comercio.py b/source/comerciodigital/comercio.py new file mode 100644 index 0000000..7cfe984 --- /dev/null +++ b/source/comerciodigital/comercio.py @@ -0,0 +1,338 @@ +#!/usr/bin/env python +# ~ +# ~ PAC +# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net +# ~ +# ~ This program is free software: you can redistribute it and/or modify +# ~ it under the terms of the GNU General Public License as published by +# ~ the Free Software Foundation, either version 3 of the License, or +# ~ (at your option) any later version. +# ~ +# ~ This program is distributed in the hope that it will be useful, +# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of +# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# ~ GNU General Public License for more details. +# ~ +# ~ You should have received a copy of the GNU General Public License +# ~ along with this program. If not, see . + + +import logging + +import lxml.etree as ET +import requests +from requests.exceptions import ConnectionError + +from .conf import DEBUG, AUTH + + +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) + +logging.getLogger('requests').setLevel(logging.ERROR) + + +TIMEOUT = 10 + + +class PACComercioDigital(object): + ws = 'https://{}.comercio-digital.mx/{}' + api = 'https://app2.comercio-digital.mx/{}' + URL = { + 'timbra': ws.format('ws', 'timbre/timbrarV5.aspx'), + 'cancel': ws.format('cancela', 'cancela3/cancelarUuid'), + 'cancelxml': ws.format('cancela', 'cancela3/cancelarXml'), + 'client': api.format('x3/altaEmpresa'), + 'saldo': api.format('x3/saldo'), + 'timbres': api.format('x3/altaTimbres'), + } + CODES = { + '000': '000 Exitoso', + '004': '004 RFC {} ya esta dado de alta con Estatus=A', + '704': '704 Usuario Invalido', + } + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/3', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + + if DEBUG: + ws = 'https://pruebas.comercio-digital.mx/{}' + URL = { + 'timbra': ws.format('timbre/timbrarV5.aspx'), + 'cancel': ws.format('cancela3/cancelarUuid'), + 'cancelxml': ws.format('cancela3/cancelarXml'), + 'client': api.format('x3/altaEmpresa'), + 'saldo': api.format('x3/saldo'), + 'timbres': api.format('x3/altaTimbres'), + } + + def __init__(self): + self.error = '' + self.cfdi_uuid = '' + self.date_stamped = '' + + def _error(self, msg): + self.error = str(msg) + log.error(msg) + return + + def _post(self, url, data, headers={}): + result = None + headers['host'] = url.split('/')[2] + headers['Content-type'] = 'text/plain' + headers['Connection'] = 'Keep-Alive' + + try: + result = requests.post(url, data=data, headers=headers, timeout=TIMEOUT) + except ConnectionError as e: + self._error(e) + + return result + + def _validate_cfdi(self, xml): + """ + Comercio Digital solo soporta la declaraciĆ³n con doble comilla + """ + tree = ET.fromstring(xml.encode()) + xml = ET.tostring(tree, + pretty_print=True, doctype='') + return xml + + def stamp(self, cfdi, auth={}): + if DEBUG or not auth: + auth = AUTH + + url = self.URL['timbra'] + headers = { + 'usrws': auth['user'], + 'pwdws': auth['pass'], + 'tipo': 'XML', + } + cfdi = self._validate_cfdi(cfdi) + result = self._post(url, cfdi, headers) + + if result is None: + return '' + + if result.status_code != 200: + return '' + + if 'errmsg' in result.headers: + self._error(result.headers['errmsg']) + return '' + + xml = result.content + tree = ET.fromstring(xml) + self.cfdi_uuid = tree.xpath( + 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)', + namespaces=self.NS_CFDI) + self.date_stamped = tree.xpath( + 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@FechaTimbrado)', + namespaces=self.NS_CFDI) + + return xml.decode() + + def _get_data_cancel(self, cfdi, info, auth): + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/3', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + tree = ET.fromstring(cfdi.encode()) + tipo = tree.xpath( + 'string(//cfdi:Comprobante/@TipoDeComprobante)', + namespaces=NS_CFDI) + total = tree.xpath( + 'string(//cfdi:Comprobante/@Total)', + namespaces=NS_CFDI) + rfc_emisor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)', + namespaces=NS_CFDI) + rfc_receptor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)', + namespaces=NS_CFDI) + uid = tree.xpath( + 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)', + namespaces=NS_CFDI) + data = ( + f"USER={auth['user']}", + f"PWDW={auth['pass']}", + f"RFCE={rfc_emisor}", + f"UUID={uid}", + f"PWDK={info['pass']}", + f"KEYF={info['key']}", + f"CERT={info['cer']}", + f"TIPO={info['tipo']}", + f"ACUS=SI", + f"RFCR={rfc_receptor}", + f"TIPOC={tipo}", + f"TOTAL={total}", + ) + return '\n'.join(data) + + def cancel(self, cfdi, info, auth={}): + if not auth: + auth = AUTH + url = self.URL['cancel'] + data = self._get_data_cancel(cfdi, info, auth) + + result = self._post(url, data) + + if result is None: + return '' + + if result.status_code != 200: + return '' + + if result.headers['codigo'] != '000': + self._error(result.headers['errmsg']) + return '' + + return result.text + + def _get_headers_cancel_xml(self, cfdi, info, auth): + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/3', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + tree = ET.fromstring(cfdi.encode()) + tipo = tree.xpath( + 'string(//cfdi:Comprobante/@TipoDeComprobante)', + namespaces=NS_CFDI) + total = tree.xpath( + 'string(//cfdi:Comprobante/@Total)', + namespaces=NS_CFDI) + rfc_receptor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)', + namespaces=NS_CFDI) + + headers = { + 'usrws': auth['user'], + 'pwdws': auth['pass'], + 'rfcr': rfc_receptor, + 'total': total, + 'tipocfdi': tipo, + } + headers.update(info) + + return headers + + def cancel_xml(self, cfdi, xml, info, auth={}): + if not auth: + auth = AUTH + url = self.URL['cancelxml'] + headers = self._get_headers_cancel_xml(cfdi, info, auth) + result = self._post(url, xml, headers) + + if result is None: + return '' + + if result.status_code != 200: + return '' + + if result.headers['codigo'] != '000': + self._error(result.headers['errmsg']) + return '' + + return result.text + + def _get_data_client(self, auth, values): + data = [f"usr_ws={auth['user']}", f"pwd_ws={auth['pass']}"] + fields = ( + 'rfc_contribuyente', + 'nombre_contribuyente', + 'calle', + 'noExterior', + 'noInterior', + 'colonia', + 'localidad', + 'municipio', + 'estado', + 'pais', + 'cp', + 'contacto', + 'telefono', + 'email', + 'rep_nom', + 'rep_rfc', + 'email_fact', + 'pwd_asignado', + ) + data += [f"{k}={values[k]}" for k in fields] + + return '\n'.join(data) + + def client_add(self, data): + auth = AUTH + url = self.URL['client'] + data = self._get_data_client(auth, data) + + result = self._post(url, data) + + if result is None: + return False + + if result.status_code != 200: + self._error(f'Code: {result.status_code}') + return False + + if result.text != self.CODES['000']: + self._error(result.text) + return False + + return True + + def client_balance(self, data): + url = self.URL['saldo'] + host = url.split('/')[2] + headers = { + 'Content-type': 'text/plain', + 'Host': host, + 'Connection' : 'Keep-Alive', + } + try: + result = requests.get(url, params=data, headers=headers, timeout=TIMEOUT) + except ConnectionError as e: + self._error(e) + return '' + + if result.status_code != 200: + return '' + + if result.text == self.CODES['704']: + self._error(result.text) + return '' + + return result.text + + def client_add_timbres(self, data, auth={}): + if not auth: + auth = AUTH + url = self.URL['timbres'] + data = '\n'.join(( + f"usr_ws={auth['user']}", + f"pwd_ws={auth['pass']}", + f"rfc_recibir={data['rfc']}", + f"num_timbres={data['timbres']}" + )) + + result = self._post(url, data) + + if result is None: + return False + + if result.status_code != 200: + self._error(f'Code: {result.status_code}') + return False + + if result.text != self.CODES['000']: + self._error(result.text) + return False + + return True + diff --git a/source/comerciodigital/conf.py.example b/source/comerciodigital/conf.py.example new file mode 100644 index 0000000..6006207 --- /dev/null +++ b/source/comerciodigital/conf.py.example @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# ~ +# ~ PAC +# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net +# ~ +# ~ This program is free software: you can redistribute it and/or modify +# ~ it under the terms of the GNU General Public License as published by +# ~ the Free Software Foundation, either version 3 of the License, or +# ~ (at your option) any later version. +# ~ +# ~ This program is distributed in the hope that it will be useful, +# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of +# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# ~ GNU General Public License for more details. +# ~ +# ~ You should have received a copy of the GNU General Public License +# ~ along with this program. If not, see . + + +# ~ Siempre consulta la documentaciĆ³n de PAC +# ~ AUTH = Las credenciales de timbrado proporcionadas por el PAC +# ~ NO cambies las credenciales de prueba + +DEBUG = True + + +AUTH = { + 'user': '', + 'pass': '', +} + + +if DEBUG: + AUTH = { + 'user': 'AAA010101AAA', + 'pass': 'PWD', + }