diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..b2a7d2a --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,6 @@ +# Lista de cambios + + +## v 0.1.0 [00-Jun-22] +--- +* Versión inicial diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..8acdd82 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.0.1 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ab90481 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +lxml diff --git a/source/cfdi-txt.py b/source/cfdi-txt.py new file mode 100755 index 0000000..618e995 --- /dev/null +++ b/source/cfdi-txt.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +import argparse +from helper import util + + +def main(args): + if args.dir_trabajo: + entrada = util.join(args.dir_trabajo, 'entrada') + sellados = util.join(args.dir_trabajo, 'sellados') + timbrados = util.join(args.dir_trabajo, 'timbrados') + else: + entrada = args.dir_entrada + sellados = args.dir_sellados + timbrados = args.dir_timbrados + + if args.generar: + util.make_cfdi(entrada, sellados, args.dir_cert, args.nombre) + + if args.timbrar: + util.stamp_pac(sellados, timbrados) + return + + +def _process_command_line_arguments(): + parser = argparse.ArgumentParser( + description='CFDI Test') + + parser.add_argument('-dc', '--dir-cert', default='') + help = "Nombre de los certificados, el predeterminado es 'cert'" + parser.add_argument('-n', '--nombre', help=help, default='cert') + + parser.add_argument('-d', '--dir-trabajo', dest='dir_trabajo', default='') + parser.add_argument('-de', '--dir-entrada', dest='dir_entrada', default='') + parser.add_argument('-ds', '--dir-sellados', dest='dir_sellados', default='') + parser.add_argument('-dt', '--dir-timbrados', dest='dir_timbrados', default='') + + parser.add_argument('-g', '--generar', dest='generar', + action='store_true', default=False, required=False) + parser.add_argument('-t', '--timbrar', dest='timbrar', + action='store_true', default=False, required=False) + + return parser.parse_args() + + +if __name__ == '__main__': + args = _process_command_line_arguments() + main(args) diff --git a/source/conf.py b/source/conf.py new file mode 100644 index 0000000..a2b6ccb --- /dev/null +++ b/source/conf.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 + +DEBUG = True + +TIMEOUT = 10 + +DELETE_FILES = False + +PAC_AUTH = { + 'user': 'AAA010101AAA', + 'pass': 'PWD', +} diff --git a/source/conf.py.ejemplo b/source/conf.py.ejemplo new file mode 100644 index 0000000..41128aa --- /dev/null +++ b/source/conf.py.ejemplo @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 + +DEBUG = True + +TIMEOUT = 10 diff --git a/source/helper/comercio.py b/source/helper/comercio.py new file mode 100644 index 0000000..295eca1 --- /dev/null +++ b/source/helper/comercio.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python +# ~ +# ~ PAC +# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net +# ~ +# ~ This program is free software: you can redistribute it and/or modify +# ~ it under the terms of the GNU General Public License as published by +# ~ the Free Software Foundation, either version 3 of the License, or +# ~ (at your option) any later version. +# ~ +# ~ This program is distributed in the hope that it will be useful, +# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of +# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# ~ GNU General Public License for more details. +# ~ +# ~ You should have received a copy of the GNU General Public License +# ~ along with this program. If not, see . + + +import base64 +import logging + +import lxml.etree as ET +from . import mureq + +from conf import DEBUG + + +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) + +logging.getLogger('requests').setLevel(logging.ERROR) + + +TIMEOUT = 10 + + +def pretty_print_POST(req): + """ + At this point it is completely built and ready + to be fired; it is "prepared". + + However pay attention at the formatting used in + this function because it is programmed to be pretty + printed and may differ from the actual request. + """ + print('{}\n{}\r\n{}\r\n\r\n{}'.format( + '-----------START-----------', + req.method + ' ' + req.url, + '\r\n'.join('{}: {}'.format(k, v) for k, v in req.headers.items()), + req.body, + )) + + +class PACComercioDigital(object): + ws = 'https://{}.comercio-digital.mx/{}' + api = 'https://app2.comercio-digital.mx/{}' + URL = { + 'timbra': ws.format('ws', 'timbre4/timbrarV5'), + 'cancel': ws.format('cancela', 'cancela4/cancelarUuid'), + 'cancelxml': ws.format('cancela', 'cancela4/cancelarXml'), + 'status': ws.format('cancela', 'arws/consultaEstatus'), + 'client': api.format('x3/altaEmpresa'), + 'saldo': api.format('x3/saldo'), + 'timbres': api.format('x3/altaTimbres'), + } + CODES = { + '000': '000 Exitoso', + '004': '004 RFC {} ya esta dado de alta con Estatus=A', + '704': '704 Usuario Invalido', + '702': '702 Error rfc/empresa invalido', + } + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/4', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + + if DEBUG: + ws = 'https://pruebas.comercio-digital.mx/{}' + ws6 = 'https://pruebas6.comercio-digital.mx/arws/{}' + URL = { + 'timbra': ws.format('timbre4/timbrarV5'), + 'cancel': ws.format('cancela4/cancelarUuid'), + 'cancelxml': ws.format('cancela4/cancelarXml'), + 'status': ws6.format('consultaEstatus'), + 'client': api.format('x3/altaEmpresa'), + 'saldo': api.format('x3/saldo'), + 'timbres': api.format('x3/altaTimbres'), + } + + def __init__(self): + self.error = '' + + def _error(self, msg): + self.error = str(msg) + # ~ log.error(msg) + return + + def _post(self, url, data, headers={}): + result = None + headers['host'] = url.split('/')[2] + headers['Content-type'] = 'text/plain' + headers['Connection'] = 'Keep-Alive' + headers['Expect'] = '100-continue' + + # ~ if DEBUG: + # ~ req = requests.Request('POST', url, headers=headers, data=data) + # ~ prepared = req.prepare() + # ~ pretty_print_POST(prepared) + + try: + result = mureq.post(url, body=data, headers=headers, timeout=TIMEOUT) + except Exception as e: + self._error(e) + + return result + + def _validate_cfdi(self, xml): + """ + Comercio Digital solo soporta la declaración con doble comilla + """ + # ~ tree = ET.fromstring(xml.encode()) + # ~ xml = ET.tostring(tree, + # ~ pretty_print=True, doctype='') + return xml.encode('utf-8') + + def stamp(self, cfdi, auth): + url = self.URL['timbra'] + headers = { + 'usrws': auth['user'], + 'pwdws': auth['pass'], + 'tipo': 'XML', + } + cfdi = self._validate_cfdi(cfdi) + result = self._post(url, cfdi, headers) + + if result is None: + return '' + + if result.status_code != 200: + return '' + + if 'errmsg' in result.headers: + self._error(result.headers['errmsg']) + return '' + + xml = result.content + tree = ET.fromstring(xml) + cfdi_uuid = tree.xpath( + 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)', + namespaces=self.NS_CFDI) + date_stamped = tree.xpath( + 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@FechaTimbrado)', + namespaces=self.NS_CFDI) + + data = { + 'xml': xml.decode(), + 'uuid': cfdi_uuid, + 'date': date_stamped, + } + return data + + def _get_data_cancel(self, cfdi, info, auth): + info['tipo'] = 'cfdi' + info['key'] = base64.b64encode(info['key_enc']).decode() + info['cer'] = base64.b64encode(info['cer_ori']).decode() + + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/3', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + tree = ET.fromstring(cfdi.encode()) + tipo = tree.xpath( + 'string(//cfdi:Comprobante/@TipoDeComprobante)', + namespaces=NS_CFDI) + total = tree.xpath( + 'string(//cfdi:Comprobante/@Total)', + namespaces=NS_CFDI) + rfc_emisor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)', + namespaces=NS_CFDI) + rfc_receptor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)', + namespaces=NS_CFDI) + uid = tree.xpath( + 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)', + namespaces=NS_CFDI) + data = ( + f"USER={auth['user']}", + f"PWDW={auth['pass']}", + f"RFCE={rfc_emisor}", + f"UUID={uid}", + f"PWDK={info['pass']}", + f"KEYF={info['key']}", + f"CERT={info['cer']}", + f"TIPO1={info['tipo']}", + f"ACUS=SI", + f"RFCR={rfc_receptor}", + f"TIPOC={tipo}", + f"TOTAL={total}", + f"UUIDREL={info['args']['uuid']}", + f"MOTIVO={info['args']['reason']}", + ) + return '\n'.join(data) + + def cancel(self, cfdi, info, auth): + # ~ if DEBUG or not auth: + # ~ auth = AUTH + url = self.URL['cancel'] + data = self._get_data_cancel(cfdi, info, auth) + + result = self._post(url, data) + + if result is None: + return '' + + if result.status_code != 200: + return '' + + if result.headers['codigo'] != '000': + self._error(result.headers['errmsg']) + return '' + + tree = ET.fromstring(result.text) + date_cancel = tree.xpath('string(//Acuse/@Fecha)')[:19] + + data = { + 'acuse': result.text, + 'date': date_cancel, + } + + return data + + def _get_headers_cancel_xml(self, cfdi, info, auth): + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/3', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + tree = ET.fromstring(cfdi.encode()) + tipocfdi = tree.xpath( + 'string(//cfdi:Comprobante/@TipoDeComprobante)', + namespaces=NS_CFDI) + total = tree.xpath( + 'string(//cfdi:Comprobante/@Total)', + namespaces=NS_CFDI) + rfc_receptor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)', + namespaces=NS_CFDI) + + headers = { + 'usrws': auth['user'], + 'pwdws': auth['pass'], + 'rfcr': rfc_receptor, + 'total': total, + 'tipocfdi': tipocfdi, + } + headers.update(info) + + return headers + + def cancel_xml(self, xml, auth, cfdi='', info={'tipo': 'cfdi'}): + # ~ if DEBUG or not auth: + # ~ auth = AUTH + + url = self.URL['cancelxml'] + headers = self._get_headers_cancel_xml(cfdi, info, auth) + result = self._post(url, xml, headers) + + if result is None: + return '' + + if result.status_code != 200: + return '' + + if result.headers['codigo'] != '000': + self._error(result.headers['errmsg']) + return '' + + tree = ET.fromstring(result.text) + date_cancel = tree.xpath('string(//Acuse/@Fecha)')[:19] + + data = { + 'acuse': result.text, + 'date': date_cancel, + } + return data + + def status(self, data, auth): + # ~ if not auth: + # ~ auth = AUTH + url = self.URL['status'] + + data = ( + f"USER={auth['user']}", + f"PWDW={auth['pass']}", + f"RFCR={data['rfc_receptor']}", + f"RFCE={data['rfc_emisor']}", + f"TOTAL={data['total']}", + f"UUID={data['uuid']}", + ) + data = '\n'.join(data) + result = self._post(url, data) + + if result is None: + return '' + + if result.status_code != 200: + self._error(result.status_code) + return self.error + + return result.text + + def _get_data_client(self, auth, values): + data = [f"usr_ws={auth['user']}", f"pwd_ws={auth['pass']}"] + fields = ( + 'rfc_contribuyente', + 'nombre_contribuyente', + 'calle', + 'noExterior', + 'noInterior', + 'colonia', + 'localidad', + 'municipio', + 'estado', + 'pais', + 'cp', + 'contacto', + 'telefono', + 'email', + 'rep_nom', + 'rep_rfc', + 'email_fact', + 'pwd_asignado', + ) + data += [f"{k}={values[k]}" for k in fields] + + return '\n'.join(data) + + def client_add(self, data, auth): + # ~ auth = AUTH + url = self.URL['client'] + data = self._get_data_client(auth, data) + + result = self._post(url, data) + + if result is None: + return False + + if result.status_code != 200: + self._error(f'Code: {result.status_code}') + return False + + if result.text != self.CODES['000']: + self._error(result.text) + return False + + return True + + def client_balance(self, data, rfc=''): + url = self.URL['saldo'] + host = url.split('/')[2] + headers = { + 'Content-type': 'text/plain', + 'Host': host, + 'Connection' : 'Keep-Alive', + } + data = {'usr': data['user'], 'pwd': data['pass']} + try: + result = requests.get(url, params=data, headers=headers, timeout=TIMEOUT) + except ConnectionError as e: + self._error(e) + return '' + + if result.status_code != 200: + return '' + + if result.text == self.CODES['704']: + self._error(result.text) + return '' + + if result.text == self.CODES['702']: + self._error(result.text) + return '' + + return result.text + + def client_add_timbres(self, data, auth): + # ~ if not auth: + # ~ auth = AUTH + url = self.URL['timbres'] + data = '\n'.join(( + f"usr_ws={auth['user']}", + f"pwd_ws={auth['pass']}", + f"rfc_recibir={data['rfc']}", + f"num_timbres={data['timbres']}" + )) + + result = self._post(url, data) + + if result is None: + return False + + if result.status_code != 200: + self._error(f'Code: {result.status_code}') + return False + + if result.text != self.CODES['000']: + self._error(result.text) + return False + + return True + diff --git a/source/helper/mureq.py b/source/helper/mureq.py new file mode 100644 index 0000000..49547e4 --- /dev/null +++ b/source/helper/mureq.py @@ -0,0 +1,393 @@ +""" +mureq is a replacement for python-requests, intended to be vendored +in-tree by Linux systems software and other lightweight applications. + +mureq is copyright 2021 by its contributors and is released under the +0BSD ("zero-clause BSD") license. +""" +import contextlib +import io +import os.path +import socket +import ssl +import sys +import urllib.parse +from http.client import HTTPConnection, HTTPSConnection, HTTPMessage, HTTPException + +__version__ = '0.2.0' + +__all__ = ['HTTPException', 'TooManyRedirects', 'Response', + 'yield_response', 'request', 'get', 'post', 'head', 'put', 'patch', 'delete'] + +DEFAULT_TIMEOUT = 15.0 + +# e.g. "Python 3.8.10" +DEFAULT_UA = "Python " + sys.version.split()[0] + + +def request(method, url, *, read_limit=None, **kwargs): + """request performs an HTTP request and reads the entire response body. + + :param str method: HTTP method to request (e.g. 'GET', 'POST') + :param str url: URL to request + :param read_limit: maximum number of bytes to read from the body, or None for no limit + :type read_limit: int or None + :param kwargs: optional arguments defined by yield_response + :return: Response object + :rtype: Response + :raises: HTTPException + """ + with yield_response(method, url, **kwargs) as response: + try: + body = response.read(read_limit) + except HTTPException: + raise + except IOError as e: + raise HTTPException(str(e)) from e + return Response(response.url, response.status, _prepare_incoming_headers(response.headers), body) + + +def get(url, **kwargs): + """get performs an HTTP GET request.""" + return request('GET', url=url, **kwargs) + + +def post(url, body=None, **kwargs): + """post performs an HTTP POST request.""" + return request('POST', url=url, body=body, **kwargs) + + +def head(url, **kwargs): + """head performs an HTTP HEAD request.""" + return request('HEAD', url=url, **kwargs) + + +def put(url, body=None, **kwargs): + """put performs an HTTP PUT request.""" + return request('PUT', url=url, body=body, **kwargs) + + +def patch(url, body=None, **kwargs): + """patch performs an HTTP PATCH request.""" + return request('PATCH', url=url, body=body, **kwargs) + + +def delete(url, **kwargs): + """delete performs an HTTP DELETE request.""" + return request('DELETE', url=url, **kwargs) + + +@contextlib.contextmanager +def yield_response(method, url, *, unix_socket=None, timeout=DEFAULT_TIMEOUT, headers=None, + params=None, body=None, form=None, json=None, verify=True, source_address=None, + max_redirects=None, ssl_context=None): + """yield_response is a low-level API that exposes the actual + http.client.HTTPResponse via a contextmanager. + + Note that unlike mureq.Response, http.client.HTTPResponse does not + automatically canonicalize multiple appearances of the same header by + joining them together with a comma delimiter. To retrieve canonicalized + headers from the response, use response.getheader(): + https://docs.python.org/3/library/http.client.html#http.client.HTTPResponse.getheader + + :param str method: HTTP method to request (e.g. 'GET', 'POST') + :param str url: URL to request + :param unix_socket: path to Unix domain socket to query, or None for a normal TCP request + :type unix_socket: str or None + :param timeout: timeout in seconds, or None for no timeout (default: 15 seconds) + :type timeout: float or None + :param headers: HTTP headers as a mapping or list of key-value pairs + :param params: parameters to be URL-encoded and added to the query string, as a mapping or list of key-value pairs + :param body: payload body of the request + :type body: bytes or None + :param form: parameters to be form-encoded and sent as the payload body, as a mapping or list of key-value pairs + :param json: object to be serialized as JSON and sent as the payload body + :param bool verify: whether to verify TLS certificates (default: True) + :param source_address: source address to bind to for TCP + :type source_address: str or tuple(str, int) or None + :param max_redirects: maximum number of redirects to follow, or None (the default) for no redirection + :type max_redirects: int or None + :param ssl_context: TLS config to control certificate validation, or None for default behavior + :type ssl_context: ssl.SSLContext or None + :return: http.client.HTTPResponse, yielded as context manager + :rtype: http.client.HTTPResponse + :raises: HTTPException + """ + method = method.upper() + headers = _prepare_outgoing_headers(headers) + enc_params = _prepare_params(params) + body = _prepare_body(body, form, json, headers) + + visited_urls = [] + + while max_redirects is None or len(visited_urls) <= max_redirects: + url, conn, path = _prepare_request(method, url, enc_params=enc_params, timeout=timeout, unix_socket=unix_socket, verify=verify, source_address=source_address, ssl_context=ssl_context) + enc_params = '' # don't reappend enc_params if we get redirected + visited_urls.append(url) + try: + try: + conn.request(method, path, headers=headers, body=body) + response = conn.getresponse() + except HTTPException: + raise + except IOError as e: + # wrap any IOError that is not already an HTTPException + # in HTTPException, exposing a uniform API for remote errors + raise HTTPException(str(e)) from e + redirect_url = _check_redirect(url, response.status, response.headers) + if max_redirects is None or redirect_url is None: + response.url = url # https://bugs.python.org/issue42062 + yield response + return + else: + url = redirect_url + if response.status == 303: + # 303 See Other: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/303 + method = 'GET' + finally: + conn.close() + + raise TooManyRedirects(visited_urls) + + +class Response: + """Response contains a completely consumed HTTP response. + + :ivar str url: the retrieved URL, indicating whether a redirection occurred + :ivar int status_code: the HTTP status code + :ivar http.client.HTTPMessage headers: the HTTP headers + :ivar bytes body: the payload body of the response + """ + + __slots__ = ('url', 'status_code', 'headers', 'body') + + def __init__(self, url, status_code, headers, body): + self.url, self.status_code, self.headers, self.body = url, status_code, headers, body + + def __repr__(self): + return f"Response(status_code={self.status_code:d})" + + @property + def ok(self): + """ok returns whether the response had a successful status code + (anything other than a 40x or 50x).""" + return not (400 <= self.status_code < 600) + + @property + def content(self): + """content returns the response body (the `body` member). This is an + alias for compatibility with requests.Response.""" + return self.body + + def raise_for_status(self): + """raise_for_status checks the response's success code, raising an + exception for error codes.""" + if not self.ok: + raise HTTPErrorStatus(self.status_code) + + def json(self): + """Attempts to deserialize the response body as UTF-8 encoded JSON.""" + import json as jsonlib + return jsonlib.loads(self.body) + + def _debugstr(self): + buf = io.StringIO() + print("HTTP", self.status_code, file=buf) + for k, v in self.headers.items(): + print(f"{k}: {v}", file=buf) + print(file=buf) + try: + print(self.body.decode('utf-8'), file=buf) + except UnicodeDecodeError: + print(f"<{len(self.body)} bytes binary data>", file=buf) + return buf.getvalue() + + +class TooManyRedirects(HTTPException): + """TooManyRedirects is raised when automatic following of redirects was + enabled, but the server redirected too many times without completing.""" + pass + + +class HTTPErrorStatus(HTTPException): + """HTTPErrorStatus is raised by Response.raise_for_status() to indicate an + HTTP error code (a 40x or a 50x). Note that a well-formed response with an + error code does not result in an exception unless raise_for_status() is + called explicitly. + """ + + def __init__(self, status_code): + self.status_code = status_code + + def __str__(self): + return f"HTTP response returned error code {self.status_code:d}" + + +# end public API, begin internal implementation details + +_JSON_CONTENTTYPE = 'application/json' +_FORM_CONTENTTYPE = 'application/x-www-form-urlencoded' + + +class UnixHTTPConnection(HTTPConnection): + """UnixHTTPConnection is a subclass of HTTPConnection that connects to a + Unix domain stream socket instead of a TCP address. + """ + + def __init__(self, path, timeout=DEFAULT_TIMEOUT): + super(UnixHTTPConnection, self).__init__('localhost', timeout=timeout) + self._unix_path = path + + def connect(self): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + sock.settimeout(self.timeout) + sock.connect(self._unix_path) + except Exception: + sock.close() + raise + self.sock = sock + + +def _check_redirect(url, status, response_headers): + """Return the URL to redirect to, or None for no redirection.""" + if status not in (301, 302, 303, 307, 308): + return None + location = response_headers.get('Location') + if not location: + return None + parsed_location = urllib.parse.urlparse(location) + if parsed_location.scheme: + # absolute URL + return location + + old_url = urllib.parse.urlparse(url) + if location.startswith('/'): + # absolute path on old hostname + return urllib.parse.urlunparse((old_url.scheme, old_url.netloc, + parsed_location.path, parsed_location.params, + parsed_location.query, parsed_location.fragment)) + + # relative path on old hostname + old_dir, _old_file = os.path.split(old_url.path) + new_path = os.path.join(old_dir, location) + return urllib.parse.urlunparse((old_url.scheme, old_url.netloc, + new_path, parsed_location.params, + parsed_location.query, parsed_location.fragment)) + + +def _prepare_outgoing_headers(headers): + if headers is None: + headers = HTTPMessage() + elif not isinstance(headers, HTTPMessage): + new_headers = HTTPMessage() + if hasattr(headers, 'items'): + iterator = headers.items() + else: + iterator = iter(headers) + for k, v in iterator: + new_headers[k] = v + headers = new_headers + _setdefault_header(headers, 'User-Agent', DEFAULT_UA) + return headers + + +# XXX join multi-headers together so that get(), __getitem__(), +# etc. behave intuitively, then stuff them back in an HTTPMessage. +def _prepare_incoming_headers(headers): + headers_dict = {} + for k, v in headers.items(): + headers_dict.setdefault(k, []).append(v) + result = HTTPMessage() + # note that iterating over headers_dict preserves the original + # insertion order in all versions since Python 3.6: + for k, vlist in headers_dict.items(): + result[k] = ','.join(vlist) + return result + + +def _setdefault_header(headers, name, value): + if name not in headers: + headers[name] = value + + +def _prepare_body(body, form, json, headers): + if body is not None: + if not isinstance(body, bytes): + raise TypeError('body must be bytes or None', type(body)) + return body + + if json is not None: + _setdefault_header(headers, 'Content-Type', _JSON_CONTENTTYPE) + import json as jsonlib + return jsonlib.dumps(json).encode('utf-8') + + if form is not None: + _setdefault_header(headers, 'Content-Type', _FORM_CONTENTTYPE) + return urllib.parse.urlencode(form, doseq=True) + + return None + + +def _prepare_params(params): + if params is None: + return '' + return urllib.parse.urlencode(params, doseq=True) + + +def _prepare_request(method, url, *, enc_params='', timeout=DEFAULT_TIMEOUT, source_address=None, unix_socket=None, verify=True, ssl_context=None): + """Parses the URL, returns the path and the right HTTPConnection subclass.""" + parsed_url = urllib.parse.urlparse(url) + + is_unix = (unix_socket is not None) + scheme = parsed_url.scheme.lower() + if scheme.endswith('+unix'): + scheme = scheme[:-5] + is_unix = True + if scheme == 'https': + raise ValueError("https+unix is not implemented") + + if scheme not in ('http', 'https'): + raise ValueError("unrecognized scheme", scheme) + + is_https = (scheme == 'https') + host = parsed_url.hostname + port = 443 if is_https else 80 + if parsed_url.port: + port = parsed_url.port + + if is_unix and unix_socket is None: + unix_socket = urllib.parse.unquote(parsed_url.netloc) + + path = parsed_url.path + if parsed_url.query: + if enc_params: + path = f'{path}?{parsed_url.query}&{enc_params}' + else: + path = f'{path}?{parsed_url.query}' + else: + if enc_params: + path = f'{path}?{enc_params}' + else: + pass # just parsed_url.path in this case + + if isinstance(source_address, str): + source_address = (source_address, 0) + + if is_unix: + conn = UnixHTTPConnection(unix_socket, timeout=timeout) + elif is_https: + if ssl_context is None: + ssl_context = ssl.create_default_context() + if not verify: + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + conn = HTTPSConnection(host, port, source_address=source_address, timeout=timeout, + context=ssl_context) + else: + conn = HTTPConnection(host, port, source_address=source_address, timeout=timeout) + + munged_url = urllib.parse.urlunparse((parsed_url.scheme, parsed_url.netloc, + path, parsed_url.params, + '', parsed_url.fragment)) + return munged_url, conn, path diff --git a/source/helper/util.py b/source/helper/util.py new file mode 100644 index 0000000..04dbd0e --- /dev/null +++ b/source/helper/util.py @@ -0,0 +1,1181 @@ +#!/usr/bin/env python3 + +import os +import re +import subprocess + +import lxml.etree as ET + +from settings import DEBUG, log, PATH_XSLT, DELETE_FILES, PAC_AUTH +from helper.comercio import PACComercioDigital as PAC + + +def _call(args): + return subprocess.check_output(args, shell=True).decode() + + +def join(*paths): + return os.path.join(*paths) + + +def kill(path): + try: + os.remove(path) + except: + pass + return + + +class Cert(): + OPENSSL = 'openssl' + + def __init__ (self, path, name): + self._get_data(path, name) + + @property + def serial_number(self): + return self._serial_number + + @property + def txt(self): + return self._cert_txt + + def _get_data(self, path, name): + path_cer = join(path, f'{name}.cer') + self._path_key = join(path, f'{name}.pem') + self._serial_number = self._get_serial_number(path_cer) + self._cert_txt = self._get_cert_txt(path_cer) + return + + def _get_serial_number(self, path_cer): + args = f'"{self.OPENSSL}" x509 -inform DER -in "{path_cer}" -noout -serial' + serial_number = _call(args) + serial_number = serial_number.split('=')[1].split('\n')[0][1::2] + return serial_number + + def _get_cert_txt(self, path_cer): + args = f'"{self.OPENSSL}" enc -base64 -in "{path_cer}"' + data = _call(args).replace('\n', '') + return data + + def sign(self, data): + args = f'echo -n -e "{data.decode()}" | "{self.OPENSSL}" dgst -sha256 -sign "{self._path_key}" | "{self.OPENSSL}" enc -base64' + data = _call(args).replace('\n', '') + return data + + +class DictToCfdi(): + _PREFIX = 'cfdi' + _XMLNS = 'http://www.sat.gob.mx/cfd/4' + _SCHEMA = f'{_XMLNS} http://www.sat.gob.mx/sitio_internet/cfd/4/cfdv40.xsd' + _LEYENDAS = { + 'version': '1.0', + 'prefix': 'leyendasFisc', + 'xmlns': 'http://www.sat.gob.mx/leyendasFiscales', + 'schema': ' http://www.sat.gob.mx/leyendasFiscales http://www.sat.gob.mx/sitio_internet/cfd/leyendasFiscales/leyendasFisc.xsd', + } + _PAGOS = { + 'version': '2.0', + 'prefix': 'pago20', + 'xmlns': 'http://www.sat.gob.mx/Pagos20', + 'schema': ' http://www.sat.gob.mx/Pagos20 http://www.sat.gob.mx/sitio_internet/cfd/Pagos/Pagos20.xsd', + } + _COMERCIO = { + 'version': '1.1', + 'prefix': 'cce11', + 'xmlns': 'http://www.sat.gob.mx/ComercioExterior11', + 'schema': ' http://www.sat.gob.mx/ComercioExterior11 http://www.sat.gob.mx/sitio_internet/cfd/ComercioExterior11/ComercioExterior11.xsd', + } + + def __init__ (self, data): + self._data = data + self._cfdi = None + self._root = None + self._attr_complementos = {} + self._node_addenda = None + self._make_cfdi() + + @property + def cfdi(self): + return self._cfdi + + def _make_cfdi(self): + self._validate_data() + self._comprobante() + self._relacionados() + self._emisor() + self._receptor() + self._conceptos() + self._impuestos() + self._complementos() + self._addenda() + + xml = ET.tostring(self._root, + pretty_print=True, xml_declaration=True, encoding='utf-8') + self._cfdi = xml.decode() + return + + def _validate_data(self): + self._schema = self._SCHEMA + + if 'leyendas' in self._data['complementos']: + self._schema += self._LEYENDAS['schema'] + self._attr_complementos['leyendas'] = { + self._LEYENDAS['prefix']: self._LEYENDAS['xmlns'] + } + if 'pagos' in self._data['complementos']: + self._schema += self._PAGOS['schema'] + self._attr_complementos['pagos'] = { + self._PAGOS['prefix']: self._PAGOS['xmlns'] + } + if 'comercio' in self._data['complementos']: + self._schema += self._COMERCIO['schema'] + self._attr_complementos['comercio'] = { + self._COMERCIO['prefix']: self._COMERCIO['xmlns'] + } + + return + + def _comprobante(self): + attr = self._data['comprobante'] + NSMAP = { + self._PREFIX: self._XMLNS, + 'xsi': 'http://www.w3.org/2001/XMLSchema-instance', + } + for k, value in self._attr_complementos.items(): + NSMAP.update(value) + + attr_qname = ET.QName( + 'http://www.w3.org/2001/XMLSchema-instance', 'schemaLocation') + schema = {attr_qname: self._schema} + + node_name = f'{{{self._XMLNS}}}Comprobante' + self._root = ET.Element(node_name, schema, **attr, nsmap=NSMAP) + return + + def _relacionados(self): + data = self._data['relacionados'] + if not data: + return + + node_name = f'{{{self._XMLNS}}}CfdiRelacionados' + attr = {'TipoRelacion': data['TipoRelacion']} + node = ET.SubElement(self._root, node_name, attr) + for uuid in data['UUID']: + node_name = f'{{{self._XMLNS}}}CfdiRelacionado' + attr = {'UUID': uuid} + ET.SubElement(node, node_name, attr) + return + + def _emisor(self): + attr = self._data['emisor'] + node_name = f'{{{self._XMLNS}}}Emisor' + emisor = ET.SubElement(self._root, node_name, attr) + return + + def _receptor(self): + attr = self._data['receptor'] + node_name = f'{{{self._XMLNS}}}Receptor' + emisor = ET.SubElement(self._root, node_name, attr) + return + + def _conceptos(self): + products = self._data['conceptos'] + node_name = f'{{{self._XMLNS}}}Conceptos' + node = ET.SubElement(self._root, node_name) + for product in products: + complemento = product.pop('complemento', {}) + taxes = product.pop('impuestos', {}) + node_name = f'{{{self._XMLNS}}}Concepto' + node_product = ET.SubElement(node, node_name, product) + if not taxes: + continue + + node_name = f'{{{self._XMLNS}}}Impuestos' + node_taxes = ET.SubElement(node_product, node_name) + traslados = taxes.get('traslados', []) + retenciones = taxes.get('retenciones', []) + + if traslados: + node_name = f'{{{self._XMLNS}}}Traslados' + node_tmp = ET.SubElement(node_taxes, node_name) + for tax in traslados: + node_name = f'{{{self._XMLNS}}}Traslado' + ET.SubElement(node_tmp, node_name, tax) + + if retenciones: + node_name = f'{{{self._XMLNS}}}Retenciones' + node_tmp = ET.SubElement(node_taxes, node_name) + for tax in retenciones: + node_name = f'{{{self._XMLNS}}}Retencion' + ET.SubElement(node_tmp, node_name, tax) + + return + + def _impuestos(self): + taxes = self._data['impuestos'] + if not taxes: + return + + node_name = f'{{{self._XMLNS}}}Impuestos' + retenciones = taxes.pop('retenciones', ()) + traslados = taxes.pop('traslados', ()) + node = ET.SubElement(self._root, node_name, taxes) + + if retenciones: + node_name = f'{{{self._XMLNS}}}Retenciones' + sub_node = ET.SubElement(node, node_name) + node_name = f'{{{self._XMLNS}}}Retencion' + for tax in retenciones: + ET.SubElement(sub_node, node_name, tax) + + if traslados: + node_name = f'{{{self._XMLNS}}}Traslados' + sub_node = ET.SubElement(node, node_name) + node_name = f'{{{self._XMLNS}}}Traslado' + for tax in traslados: + ET.SubElement(sub_node, node_name, tax) + return + + def _complementos(self): + if not self._data['complementos']: + return + + node_name = f'{{{self._XMLNS}}}Complemento' + node = ET.SubElement(self._root, node_name) + + if 'leyendas' in self._data['complementos']: + self._complemento_leyendas(self._data['complementos']['leyendas'], node) + + if 'pagos' in self._data['complementos']: + self._complemento_pagos(self._data['complementos']['pagos'], node) + elif 'comercio' in self._data['complementos']: + self._complemento_comercio(self._data['complementos']['comercio'], node) + + return + + def _complemento_leyendas(self, data, node): + attr = {'version': self._LEYENDAS['version']} + node_name = f"{{{self._LEYENDAS['xmlns']}}}LeyendasFiscales" + node_leyendas = ET.SubElement(node, node_name, attr) + + for leyenda in data: + node_name = f"{{{self._LEYENDAS['xmlns']}}}Leyenda" + ET.SubElement(node_leyendas, node_name, leyenda) + return + + def _complemento_pagos(self, data, node): + pago = data.pop('pago') + docs = data.pop('docs') + taxesd = data.pop('taxesd') + taxes = data.pop('taxes') + + attr = {'Version': data.pop('Version')} + node_name = f"{{{self._PAGOS['xmlns']}}}Pagos" + node_pagos = ET.SubElement(node, node_name, attr) + + attr = data + node_name = f"{{{self._PAGOS['xmlns']}}}Totales" + ET.SubElement(node_pagos, node_name, attr) + + node_name = f"{{{self._PAGOS['xmlns']}}}Pago" + node_pago = ET.SubElement(node_pagos, node_name, pago) + + for i, doc in enumerate(docs): + node_name = f"{{{self._PAGOS['xmlns']}}}DoctoRelacionado" + node_doc = ET.SubElement(node_pago, node_name, doc) + if taxesd: + node_name = f"{{{self._PAGOS['xmlns']}}}ImpuestosDR" + node_taxes_doc = ET.SubElement(node_doc, node_name) + if 'retenciones' in taxesd: + node_name = f"{{{self._PAGOS['xmlns']}}}RetencionesDR" + node_taxes_dr = ET.SubElement(node_taxes_doc, node_name) + for r in taxesd['retenciones']: + node_name = f"{{{self._PAGOS['xmlns']}}}RetencionDR" + ET.SubElement(node_taxes_dr, node_name, r) + if 'traslados' in taxesd: + node_name = f"{{{self._PAGOS['xmlns']}}}TrasladosDR" + node_taxes_dt = ET.SubElement(node_taxes_doc, node_name) + for t in taxesd['traslados']: + node_name = f"{{{self._PAGOS['xmlns']}}}TrasladoDR" + ET.SubElement(node_taxes_dt, node_name, t) + + node_name = f"{{{self._PAGOS['xmlns']}}}ImpuestosP" + node_taxes = ET.SubElement(node_pago, node_name) + + if 'retenciones' in taxes: + node_name = f"{{{self._PAGOS['xmlns']}}}RetencionesP" + node_taxes_r = ET.SubElement(node_taxes, node_name) + for r in taxes['retenciones']: + node_name = f"{{{self._PAGOS['xmlns']}}}RetencionP" + ET.SubElement(node_taxes_r, node_name, r) + if 'traslados' in taxes: + node_name = f"{{{self._PAGOS['xmlns']}}}TrasladosP" + node_taxes_t = ET.SubElement(node_taxes, node_name) + for t in taxes['traslados']: + node_name = f"{{{self._PAGOS['xmlns']}}}TrasladoP" + ET.SubElement(node_taxes_t, node_name, t) + + + return + + def _complemento_comercio(self, data, node): + mercancias = data.pop('mercancias', {}) + emisor = data.pop('emisor', {}) + propietario = data.pop('propietario', {}) + receptor = data.pop('receptor', {}) + destinatario = data.pop('destinatario', {}) + attr = data + + node_name = f"{{{self._COMERCIO['xmlns']}}}ComercioExterior" + node_comercio = ET.SubElement(node, node_name, attr) + + if emisor: + node_name = f"{{{self._COMERCIO['xmlns']}}}Emisor" + attr = {} + if 'Curp' in emisor: + attr = {'Curp': emisor.pop('Curp')} + node_emisor = ET.SubElement(node_comercio, node_name, attr) + + node_name = f"{{{self._COMERCIO['xmlns']}}}Domicilio" + ET.SubElement(node_emisor, node_name, emisor) + + if propietario: + node_name = f"{{{self._COMERCIO['xmlns']}}}Propietario" + ET.SubElement(node_comercio, node_name, propietario) + + if receptor: + node_name = f"{{{self._COMERCIO['xmlns']}}}Receptor" + attr = {} + if 'NumRegIdTrib' in receptor: + attr = {'NumRegIdTrib': emisor.pop('NumRegIdTrib')} + node_receptor = ET.SubElement(node_comercio, node_name, attr) + + node_name = f"{{{self._COMERCIO['xmlns']}}}Domicilio" + ET.SubElement(node_receptor, node_name, receptor) + + if destinatario: + node_name = f"{{{self._COMERCIO['xmlns']}}}Destinatario" + attr = {} + if 'NumRegIdTrib' in destinatario: + attr['NumRegIdTrib'] = destinatario.pop('NumRegIdTrib') + if 'Nombre' in destinatario: + attr['Nombre'] = destinatario.pop('Nombre') + node_destinatario = ET.SubElement(node_comercio, node_name, attr) + + node_name = f"{{{self._COMERCIO['xmlns']}}}Domicilio" + ET.SubElement(node_destinatario, node_name, destinatario) + + if mercancias: + node_name = f"{{{self._COMERCIO['xmlns']}}}Mercancias" + node_mercancias = ET.SubElement(node_comercio, node_name) + for mercancia in mercancias: + description = mercancia.pop('description', {}) + node_name = f"{{{self._COMERCIO['xmlns']}}}Mercancia" + node_mercancia = ET.SubElement(node_mercancias, node_name, mercancia) + if description: + node_name = f"{{{self._COMERCIO['xmlns']}}}DescripcionesEspecificas" + ET.SubElement(node_mercancia, node_name, description) + return + + def _addenda(self): + type_boveda = self._data['addenda'].get('type', '') + type_client = self._data['addenda'].get('type_client', '') + data = self._data['addenda'].get('boveda', False) + lotes = self._data['addenda']['lotes'] + self._boveda(type_boveda, data, lotes) + if type_client: + data = self._data['addenda'].get('cliente', False) + partes = self._data['addenda'].get('partes', ()) + self._addenda_client(type_client, data, partes) + return + + def _boveda(self, type_boveda, data, lotes): + if not data: + return + + XMLNS = 'http://kontender.mx/namespace/boveda' + NSMAP = { + 'bovadd': 'http://kontender.mx/namespace/boveda', + 'kon': 'http://kontender.mx/namespace', + } + + node_name = f'{{{self._XMLNS}}}Addenda' + self._node_addenda = ET.SubElement(self._root, node_name) + + schema = 'http://kontender.mx/namespace/boveda http://kontender.mx/namespace/boveda/BOVEDAFISCAL.xsd http://kontender.mx/namespace http://kontender.mx/namespace/AddendaK.xsd' + attr_qname = ET.QName( + 'http://www.w3.org/2001/XMLSchema-instance', 'schemaLocation') + schema = {attr_qname: schema} + + node_name = f'{{{XMLNS}}}BOVEDAFISCAL' + node = ET.SubElement(self._node_addenda, node_name, schema, nsmap=NSMAP) + + if type_boveda == '01': + for k, v in data.items(): + node_name = f'{{{XMLNS}}}{k}' + ET.SubElement(node, node_name, v) + if lotes: + node_name = f'{{{XMLNS}}}Lotes' + node_lotes = ET.SubElement(node, node_name) + for lote in lotes: + node_name = f'{{{XMLNS}}}Lote' + node_lote = ET.SubElement(node_lotes, node_name) + for k2, v2 in lote.items(): + node_name = f'{{{XMLNS}}}{k2}' + sn = ET.SubElement(node_lote, node_name) + sn.text = v2 + elif type_boveda == '02': + for k, v in data.items(): + node_name = f'{{{XMLNS}}}{k}' + n = ET.SubElement(node, node_name) + n.text = v + + return + + def _addenda_client(self, type_client, data, partes): + if not data: + return + + if type_client == '1': + XMLNS = 'http://www.vwnovedades.com/volkswagen/kanseilab/shcp/2009/Addenda/PMT' + NSMAP = {'PMT': XMLNS} + elif type_client == '2': + XMLNS = 'http://www.vwnovedades.com/volkswagen/kanseilab/shcp/2009/Addenda/PSV' + NSMAP = {'PSV': XMLNS} + elif type_client == '4': + XMLNS = 'http://www.sas-automative/en/locations/local-offices-and-plants/mexico/plant-puebla.html' + NSMAP = {'PMT': XMLNS} + + attr = data.pop('Factura') + node_name = f'{{{XMLNS}}}Factura' + node = ET.SubElement(self._node_addenda, node_name, **attr, nsmap=NSMAP) + + for key, attr in data.items(): + node_name = f'{{{XMLNS}}}{key}' + ET.SubElement(node, node_name, **attr) + + if not partes: + return + + node_name = f'{{{XMLNS}}}Partes' + node = ET.SubElement(node, node_name) + + for parte in partes: + referencias = parte.pop('referencias') + node_name = f'{{{XMLNS}}}Parte' + sub_node = ET.SubElement(node, node_name, **parte) + node_name = f'{{{XMLNS}}}Referencias' + ET.SubElement(sub_node, node_name, **referencias) + + return + + +class DataToDict(): + TRASLADO = 'T' + RETENCION = 'R' + NODES = { + '01': '_comprobante', + '02': '_relacionados', + '03': '_emisor', + '04': '_receptor', + '05': '_conceptos', + '06': '_impuestos', + '10': '_leyendas', + '11': '_complemento', + '12': '_complemento_12', + '13': '_complemento_13', + '14': '_complemento_14', + '15': '_complemento_15', + '16': '_complemento_16', + '17': '_complemento_17', + '49': '_addenda_lotes', + '50': '_boveda', + '51': '_addenda', + '52': '_addenda_partes', + } + + def __init__ (self, data): + self._data = data + self._cfdi = {'conceptos': [], 'impuestos': {}, 'complementos': {}, + 'addenda': {}} + self._complement = '' + self._type_header = '' + self._partes = [] + self._lotes = [] + self._ce_mercancias = [] + self._process_data() + + @property + def cfdi(self): + return self._cfdi + + def _process_data(self): + lines = self._data.split('\n') + for line in lines: + parts = line.split('|') + if not parts[0]: + continue + header = self.NODES.get(parts[0], '') + if not header: + log.debug(f'No existe: {parts[0]}') + continue + if hasattr(self, header): + getattr(self, header)(parts[2:]) + + self._cfdi['addenda']['partes'] = self._partes + self._cfdi['addenda']['lotes'] = self._lotes + + if self._ce_mercancias: + self._cfdi['complementos']['comercio']['mercancias'] = \ + self._ce_mercancias + return + + def _comprobante(self, data): + self._cfdi['comprobante'] = {} + fields = ( + 'Version', + 'Serie', + 'Folio', + 'Fecha', + 'FormaPago', + 'CondicionesDePago', + 'SubTotal', + 'Descuento', + 'Moneda', + 'TipoCambio', + 'Total', + 'TipoDeComprobante', + 'MetodoPago', + 'LugarExpedicion', + 'Confirmacion', + 'Exportacion', + ) + for index, field in enumerate(fields): + if not data[index]: + continue + self._cfdi['comprobante'][field] = data[index] + return + + def _relacionados(self, data): + self._cfdi['relacionados'] = {} + if data[0]: + self._cfdi['relacionados']['TipoRelacion'] = data[0] + self._cfdi['relacionados']['UUID'] = data[1:] + return + + def _emisor(self, data): + self._cfdi['emisor'] = {} + fields = ( + 'Rfc', + 'Nombre', + 'RegimenFiscal', + ) + for index, field in enumerate(fields): + self._cfdi['emisor'][field] = data[index] + return + + def _receptor(self, data): + self._cfdi['receptor'] = {} + fields = ( + 'Rfc', + 'Nombre', + 'DomicilioFiscalReceptor', + 'ResidenciaFiscal', + 'NumRegIdTrib', + 'RegimenFiscalReceptor', + 'UsoCFDI', + ) + for index, field in enumerate(fields): + if not data[index]: + continue + self._cfdi['receptor'][field] = data[index] + return + + def _get_taxes_by_concept(self, data): + taxes = {} + traslados = [] + retenciones = [] + for i in range(0, len(data), 6): + type_tax = data[i] + tax = { + 'Base': data[i + 1], + 'Impuesto': data[i + 2], + 'TipoFactor': data[i + 3], + 'TasaOCuota': data[i + 4], + 'Importe': data[i + 5], + } + if type_tax == self.TRASLADO: + traslados.append(tax) + elif type_tax == self.RETENCION: + retenciones.append(tax) + if traslados: + taxes['traslados'] = traslados + if retenciones: + taxes['retenciones'] = retenciones + return taxes + + def _conceptos(self, data): + concepto = {} + fields = ( + 'ClaveProdServ', + 'NoIdentificacion', + 'Cantidad', + 'ClaveUnidad', + 'Unidad', + 'Descripcion', + 'ValorUnitario', + 'Importe', + 'Descuento', + 'ObjetoImp', + ) + for index, field in enumerate(fields): + if not data[index]: + continue + concepto[field] = data[index] + pedimento = data[index + 1] + if pedimento: + concepto['pedimento'] = pedimento + concepto['impuestos'] = self._get_taxes_by_concept(data[index + 2:]) + self._cfdi['conceptos'].append(concepto) + return + + def _get_taxes(self, data): + traslados = [] + retenciones = [] + for i in range(0, len(data), 6): + type_tax = data[i] + if type_tax == self.TRASLADO: + tax = { + 'Base': data[i + 1], + 'Impuesto': data[i + 2], + 'TipoFactor': data[i + 3], + 'TasaOCuota': data[i + 4], + 'Importe': data[i + 5], + } + traslados.append(tax) + elif type_tax == self.RETENCION: + tax = { + 'Impuesto': data[i + 2], + 'Importe': data[i + 5], + } + retenciones.append(tax) + if traslados: + self._cfdi['impuestos']['traslados'] = traslados + if retenciones: + self._cfdi['impuestos']['retenciones'] = retenciones + return + + def _impuestos(self, data): + # ~ self._cfdi['impuestos'] = {} + fields = ( + 'TotalImpuestosRetenidos', + 'TotalImpuestosTrasladados', + ) + for index, field in enumerate(fields): + if not data[index]: + continue + self._cfdi['impuestos'][field] = data[index] + self._get_taxes(data[index + 1:]) + return + + def _leyendas(self, data): + if not data: + return + + leyendas = [] + for i in range(0, len(data), 3): + leyenda = { + 'disposicionFiscal': data[i], + 'norma': data[i+1], + 'textoLeyenda': data[i+2], + } + leyendas.append(leyenda) + + self._cfdi['complementos']['leyendas'] = leyendas + return + + def _fields_to_dict(self, fields, data): + attr = {} + for index, field in enumerate(fields): + if not data[index]: + continue + attr[field] = data[index] + return attr + + def _complemento(self, data): + if not data: + return + + self._complement = data[0] + version = {'Version': data[1]} + if self._complement == '1': + self._cfdi['complementos']['pagos'] = version + self._cfdi['complementos']['pagos']['docs'] = [] + self._cfdi['complementos']['pagos']['taxes'] = {} + self._cfdi['complementos']['pagos']['taxesd'] = {} + elif self._complement == '2': + self._cfdi['complementos']['comercio'] = version + + return + + def _complemento_12(self, data): + if not data: + return + + if self._complement == '1': + fields = ( + 'TotalRetencionesIVA', + 'TotalRetencionesISR', + 'TotalRetencionesIEPS', + 'TotalTrasladosBaseIVA16', + 'TotalTrasladosImpuestoIVA16', + 'TotalTrasladosBaseIVA8', + 'TotalTrasladosImpuestoIVA8', + 'TotalTrasladosBaseIVA0', + 'TotalTrasladosImpuestoIVA0', + 'TotalTrasladosBaseIVAExento', + 'MontoTotalPagos', + ) + attr = self._fields_to_dict(fields, data) + self._cfdi['complementos']['pagos'].update(attr) + elif self._complement == '2': + fields = ( + 'MotivoTraslado', + 'TipoOperacion', + 'ClaveDePedimento', + 'CertificadoOrigen', + 'NumCertificadoOrigen', + 'NumeroExportadorConfiable', + 'Incoterm', + 'Subdivision', + 'Observaciones', + 'TipoCambioUSD', + 'TotalUSD', + ) + attr = self._fields_to_dict(fields, data) + self._cfdi['complementos']['comercio'].update(attr) + + return + + def _complemento_13(self, data): + if not data: + return + + if self._complement == '1': + fields = ( + 'FechaPago', + 'FormaDePagoP', + 'MonedaP', + 'TipoCambioP', + 'Monto', + 'NumOperacion', + 'RfcEmisorCtaOrd', + 'NomBancoOrdExt', + 'CtaOrdenante', + 'RfcEmisorCtaBen', + 'CtaBeneficiario', + 'TipoCadPago', + 'CertPago', + 'CadPago', + 'SelloPago', + ) + attr = self._fields_to_dict(fields, data) + self._cfdi['complementos']['pagos']['pago'] = attr + elif self._complement == '2': + fields = ( + 'Curp', + 'Calle', + 'NumeroExterior', + 'NumeroInterior', + 'Colonia', + 'Localidad', + 'Referencia', + 'Municipio', + 'Estado', + 'Pais', + 'CodigoPostal', + ) + attr = self._fields_to_dict(fields, data) + self._cfdi['complementos']['comercio']['emisor'] = attr + return + + def _get_retenciones_by_pay(self, data): + retenciones = [] + for i in range(0, len(data), 2): + tax = { + 'ImpuestoP': data[i], + 'ImporteP': data[i + 1], + } + retenciones.append(tax) + return retenciones + + def _complemento_14(self, data): + if not data: + return + + if self._complement == '1': + attr = self._get_retenciones_by_pay(data) + self._cfdi['complementos']['pagos']['taxes']['retenciones'] = attr + elif self._complement == '2': + fields = ('NumRegIdTrib', 'ResidenciaFiscal') + attr = self._fields_to_dict(fields, data) + if attr: + self._cfdi['complementos']['comercio']['propietario'] = attr + return + + def _get_traslados_by_pay(self, data): + traslados = [] + for i in range(0, len(data), 5): + tax = { + 'BaseP': data[i], + 'ImpuestoP': data[i + 1], + 'TipoFactorP': data[i + 2], + 'TasaOCuotaP': data[i + 3], + 'ImporteP': data[i + 4], + } + traslados.append(tax) + return traslados + + def _complemento_15(self, data): + if not data: + return + + if self._complement == '1': + attr = self._get_traslados_by_pay(data) + self._cfdi['complementos']['pagos']['taxes']['traslados'] = attr + elif self._complement == '2': + fields = ( + 'NumRegIdTrib', + 'Calle', + 'NumeroExterior', + 'NumeroInterior', + 'Colonia', + 'Localidad', + 'Referencia', + 'Municipio', + 'Estado', + 'Pais', + 'CodigoPostal', + ) + attr = self._fields_to_dict(fields, data) + self._cfdi['complementos']['comercio']['receptor'] = attr + return + + def _complemento_16(self, data): + if not data: + return + + if self._complement == '1': + fields = ( + 'IdDocumento', + 'Serie', + 'Folio', + 'MonedaDR', + 'EquivalenciaDR', + 'NumParcialidad', + 'ImpSaldoAnt', + 'ImpPagado', + 'ImpSaldoInsoluto', + 'ObjetoImpDR', + ) + attr = self._fields_to_dict(fields, data) + self._cfdi['complementos']['pagos']['docs'].append(attr) + elif self._complement == '2': + fields = ( + 'NumRegIdTrib', + 'Nombre', + 'Calle', + 'NumeroExterior', + 'NumeroInterior', + 'Colonia', + 'Localidad', + 'Referencia', + 'Municipio', + 'Estado', + 'Pais', + 'CodigoPostal', + ) + attr = self._fields_to_dict(fields, data) + self._cfdi['complementos']['comercio']['destinatario'] = attr + return + + def _get_taxes_by_doc(self, data): + taxes = {} + traslados = [] + retenciones = [] + for i in range(0, len(data), 6): + type_tax = data[i] + tax = { + 'BaseDR': data[i + 1], + 'ImpuestoDR': data[i + 2], + 'TipoFactorDR': data[i + 3], + 'TasaOCuotaDR': data[i + 4], + 'ImporteDR': data[i + 5], + } + if type_tax == self.TRASLADO: + traslados.append(tax) + elif type_tax == self.RETENCION: + retenciones.append(tax) + + if traslados: + taxes['traslados'] = traslados + if retenciones: + taxes['retenciones'] = retenciones + + return taxes + + def _complemento_17(self, data): + if not data: + return + + if self._complement == '1': + attr = self._get_taxes_by_doc(data) + self._cfdi['complementos']['pagos']['taxesd'] = attr + elif self._complement == '2': + fields = ( + 'NoIdentificacion', + 'FraccionArancelaria', + 'CantidadAduana', + 'UnidadAduana', + 'ValorUnitarioAduana', + 'ValorDolares', + ) + mercancia = self._fields_to_dict(fields, data) + fields = ( + 'Marca', + 'Modelo', + 'SubModelo', + 'NumeroSerie', + ) + description = self._fields_to_dict(fields, data[6:]) + if description: + mercancia['description'] = description + self._ce_mercancias.append(mercancia) + + return + + def _boveda(self, data): + type_addenda = data[0] + if type_addenda == '01': + fields = ( + ('ImporteLetra', 'importe'), + ('UsoCFDI', 'UsoCFDI'), + ('MetodosPago', 'MetodoPagoSAT'), + ('FormaPago', 'FormaPagoSAT'), + ('TipoDoctoElectronico', 'TipoDocumento'), + ('BovedaFiscal', 'almacen', 'condicion', 'correoEmisor', 'correoReceptor', 'numeroCliente', 'razonSocialCliente', 'tipo'), + ('DireccionEmisor', 'Calle', 'CodigoPostal', 'Colonia', 'Estado', 'Localidad', 'Municipio', 'NoExterior', 'NoInterior', 'Pais', 'Referencia', 'Telefono'), + ('DireccionSucursal', 'Calle', 'Ciudad', 'CodigoPostal', 'Colonia', 'Estado', 'Localidad', 'Municipio', 'NoExterior', 'NoInterior', 'Pais', 'Referencia'), + ('DireccionReceptor', 'Calle', 'Ciudad', 'CodigoPostal', 'Colonia', 'Delegacion', 'Estado', 'Localidad', 'Municipio', 'NoExterior', 'NoInterior', 'Pais', 'Referencia'), + ('DireccionReceptorSucursal', 'Nombre', 'Calle', 'Ciudad', 'CodigoPostal', 'Estado', 'Pais', 'Comentario', 'Dato01', 'Dato02', 'Dato03', 'Dato04', 'Dato05', 'Dato06', 'Dato07', 'Dato08', 'Dato09', 'Dato10'), + ('NombreComercial', 'Nombre'), + ('ClaveTipoFolio', 'clave'), + ('TR', 'transaccion'), + ('OrdenCompra', 'folio'), + ('NotaDeVenta', 'folio'), + ) + boveda = {} + i = 1 + for f in fields: + k = f[0] + attr = {} + for a in f[1:]: + try: + attr[a] = data[i] + except IndexError: + log.error('Faltan datos en addenda Boveda') + attr[a] = '' + i += 1 + boveda[k] = attr + elif type_addenda == '02': + fields = ( + 'Razon_Social_destino', + 'Calle_Destino', + 'Colonia_Destino', + 'Ciudad_Destino', + 'Estado_Destino', + 'Pais_Destino', + 'CP_Destino_consigan', + 'RFC_Destino_consigna', + 'Telefono_Receptor', + 'Peso_Bruto', + 'Peso_Neto', + 'Incoterm', + 'leyenda_pie', + 'R.vto', + 'TIPO_CAMBIO_FACTURA', + 'R.cte', + 'RI_Solicitante', + 'R.fefa', + 'Razon_Social_facturado', + 'Calle_facturado', + 'Colonia_facturado', + 'RFC_destino', + 'Telefono_facturado', + 'NUMCTAPAGO', + ) + boveda = {} + for i, f in enumerate(fields): + boveda[f] = data[i+1] + + self._cfdi['addenda']['type'] = type_addenda + self._cfdi['addenda']['boveda'] = boveda + + return + + def _addenda(self, header): + self._type_header = header[1] + self._cfdi['addenda']['type_client'] = self._type_header + data = {} + if self._type_header == '2': + data['Factura'] = { + 'version': header[3], + 'tipoDocumentoFiscal': header[4], + 'tipoDocumentoVWM': header[5], + 'division': header[6]} + data['Moneda'] = { + 'tipoMoneda': header[7], + 'tipoCambio': header[8]} + data['Proveedor'] = { + 'codigo': header[10], + 'nombre': header[11], + 'correoContacto': header[12]} + data['Origen'] = {'codigo': header[13]} + data['Destino'] = { + 'codigo': header[14], + 'naveReciboMaterial': header[15]} + data['Referencias'] = {'referenciaProveedor': header[16]} + data['Solicitante'] = {'correo': header[17]} + data['Archivo'] = {'datos': header[20], 'tipo': 'ZIP'} + elif self._type_header == '4': + data['Factura'] = {'version': header[3]} + data['Moneda'] = {'tipoMoneda': header[7]} + data['Proveedor'] = {'codigo': header[10], 'nombre': header[11]} + data['Referencias'] = {'referenciaProveedor': header[16]} + + self._cfdi['addenda']['cliente'] = data + return + + def _addenda_lotes(self, lote): + attr = { + 'IVM_InvoiceID': '', + 'POline': lote[0], + 'NoIdentificacion': lote[1], + 'Lote': lote[2], + 'Cantidad': lote[3], + 'Fecha': lote[4], + } + self._lotes.append(attr) + return + + def _addenda_partes(self, parte): + if self._type_header == '2': + attr = {'posicion': parte[0], + 'numeroMaterial': parte[1], + 'descripcionMaterial': parte[2], + 'cantidadMaterial': parte[3], + 'unidadMedida': parte[4], + 'precioUnitario': parte[5], + 'montoLinea': parte[6], + 'codigoImpuesto': parte[8], + 'referencias': { + 'ordenCompra': parte[7], + } + } + elif self._type_header == '4': + attr = {'posicion': parte[0], + 'numeroMaterial': parte[1], + 'descripcionMaterial': parte[2], + 'referencias': { + 'ordenCompra': parte[7], + 'numeroPKN': parte[8], + 'numeroASN': parte[9], + } + } + self._partes.append(attr) + return + + +def stamp_cfdi(cfdi, cert): + xslt = open(PATH_XSLT, 'rb') + root = ET.fromstring(cfdi.encode()) + root.attrib['NoCertificado'] = cert.serial_number + root.attrib['Certificado'] = cert.txt + + transfor = ET.XSLT(ET.parse(xslt)) + cadena = str(transfor(root)).encode() + root.attrib['Sello'] = cert.sign(cadena) + xslt.close() + + xml = ET.tostring(root, pretty_print=True, encoding='utf-8') + + return xml.decode() + + +def _get_files(path, ext='xml'): + paths = [] + for folder, _, files in os.walk(path): + pattern = re.compile('\.{}'.format(ext), re.IGNORECASE) + paths += [join(folder, f) for f in files if pattern.search(f)] + return paths + + +def _read_file(path, encoding='utf-8'): + # ~ CODEC_WIN = 'ISO-8859-1' + with open(path, 'r', encoding=encoding) as f: + data = f.read() + if DEBUG: + msg = f'Archivo leido: {path}' + log.debug(msg) + return data + + +def _save_file(path, target, data): + _, filename = os.path.split(path) + name, _ = os.path.splitext(filename) + path_new = join(target, f'{name}.xml') + data = f'\n{data}' + with open(path_new, 'w', encoding='utf-8') as f: + f.write(data) + if DEBUG: + msg = f'Archivo sellado: {path}' + log.debug(msg) + return path_new + + +def make_cfdi(source, target, dir_cert, nombre): + cert = Cert(dir_cert, nombre) + paths = _get_files(source, 'txt') + for path in paths: + # ~ _version33(path, target) + # ~ continue + data = _read_file(path) + data = DataToDict(data).cfdi + cfdi = DictToCfdi(data).cfdi + cfdi = stamp_cfdi(cfdi, cert) + path_xml = _save_file(path, target, cfdi) + msg = f'CFDI: {path_xml}' + log.info(msg) + return + + +def stamp_pac(source, target): + pac = PAC() + paths = _get_files(source) + for path in paths: + log.info(f'\tEnviar: {path}') + _, filename = os.path.split(path) + data = open(path, 'r').read() + result = pac.stamp(data, PAC_AUTH) + + if pac.error: + log.error(pac.error) + continue + + new_path = f'{target}/{filename}' + + with open(new_path, 'w') as f: + f.write(result['xml']) + log.info(f'\tTimbrada: {new_path}') + return diff --git a/source/settings.py b/source/settings.py new file mode 100644 index 0000000..153dc85 --- /dev/null +++ b/source/settings.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 + +import logging +import os +from conf import DEBUG, DELETE_FILES, PAC_AUTH + + +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) + +current_path = os.path.dirname(__file__) +PATH_XSLT = os.path.join(current_path, 'xslt', 'cadena.xslt') diff --git a/source/xslt/cadena.xslt b/source/xslt/cadena.xslt new file mode 100644 index 0000000..eaab860 --- /dev/null +++ b/source/xslt/cadena.xslt @@ -0,0 +1,403 @@ + + + + + + + + + + + + + + + ||| + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/source/xslt/comercioexterior11.xslt b/source/xslt/comercioexterior11.xslt new file mode 100644 index 0000000..fd71841 --- /dev/null +++ b/source/xslt/comercioexterior11.xslt @@ -0,0 +1,181 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/source/xslt/leyendasFisc.xslt b/source/xslt/leyendasFisc.xslt new file mode 100644 index 0000000..e0587a2 --- /dev/null +++ b/source/xslt/leyendasFisc.xslt @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/source/xslt/pagos20.xslt b/source/xslt/pagos20.xslt new file mode 100644 index 0000000..1e6cf98 --- /dev/null +++ b/source/xslt/pagos20.xslt @@ -0,0 +1,233 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/source/xslt/utilerias.xslt b/source/xslt/utilerias.xslt new file mode 100644 index 0000000..4ae4bf4 --- /dev/null +++ b/source/xslt/utilerias.xslt @@ -0,0 +1,22 @@ + + + + + + | + + + + + + + + | + + + + + + + +