diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..b2a7d2a --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,6 @@ +# Lista de cambios + + +## v 0.1.0 [00-Jun-22] +--- +* Versión inicial diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..8acdd82 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.0.1 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ab90481 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +lxml diff --git a/source/cfdi-txt.py b/source/cfdi-txt.py new file mode 100755 index 0000000..43b5ff7 --- /dev/null +++ b/source/cfdi-txt.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 + +import argparse +from helper import util + + +def main(args): + if args.dir_trabajo: + entrada = util.join(args.dir_trabajo, 'entrada') + sellados = util.join(args.dir_trabajo, 'sellados') + timbrados = util.join(args.dir_trabajo, 'timbrados') + else: + entrada = args.dir_entrada + sellados = args.dir_sellados + timbrados = args.dir_timbrados + + if args.generar: + util.make_cfdi(entrada, sellados, args.dir_cert, args.nombre) + + if args.timbrar: + util.stamp_pac(sellados, timbrados) + return + + +def _process_command_line_arguments(): + parser = argparse.ArgumentParser( + description='CFDI Test') + + parser.add_argument('-dc', '--dir-cert', default='') + help = "Nombre de los certificados, el predeterminado es 'cert'" + parser.add_argument('-n', '--nombre', help=help, default='cert') + + parser.add_argument('-d', '--dir-trabajo', dest='dir_trabajo', default='') + parser.add_argument('-de', '--dir-entrada', dest='dir_entrada', default='') + parser.add_argument('-ds', '--dir-sellados', dest='dir_sellados', default='') + parser.add_argument('-dt', '--dir-timbrados', dest='dir_timbrados', default='') + + parser.add_argument('-g', '--generar', dest='generar', + action='store_true', default=False, required=False) + parser.add_argument('-t', '--timbrar', dest='timbrar', + action='store_true', default=False, required=False) + + return parser.parse_args() + + +if __name__ == '__main__': + args = _process_command_line_arguments() + main(args) diff --git a/source/conf.py b/source/conf.py new file mode 100644 index 0000000..a2b6ccb --- /dev/null +++ b/source/conf.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 + +DEBUG = True + +TIMEOUT = 10 + +DELETE_FILES = False + +PAC_AUTH = { + 'user': 'AAA010101AAA', + 'pass': 'PWD', +} diff --git a/source/conf.py.ejemplo b/source/conf.py.ejemplo new file mode 100644 index 0000000..41128aa --- /dev/null +++ b/source/conf.py.ejemplo @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 + +DEBUG = True + +TIMEOUT = 10 diff --git a/source/helper/comercio.py b/source/helper/comercio.py new file mode 100644 index 0000000..295eca1 --- /dev/null +++ b/source/helper/comercio.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python +# ~ +# ~ PAC +# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net +# ~ +# ~ This program is free software: you can redistribute it and/or modify +# ~ it under the terms of the GNU General Public License as published by +# ~ the Free Software Foundation, either version 3 of the License, or +# ~ (at your option) any later version. +# ~ +# ~ This program is distributed in the hope that it will be useful, +# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of +# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# ~ GNU General Public License for more details. +# ~ +# ~ You should have received a copy of the GNU General Public License +# ~ along with this program. If not, see . + + +import base64 +import logging + +import lxml.etree as ET +from . import mureq + +from conf import DEBUG + + +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) + +logging.getLogger('requests').setLevel(logging.ERROR) + + +TIMEOUT = 10 + + +def pretty_print_POST(req): + """ + At this point it is completely built and ready + to be fired; it is "prepared". + + However pay attention at the formatting used in + this function because it is programmed to be pretty + printed and may differ from the actual request. + """ + print('{}\n{}\r\n{}\r\n\r\n{}'.format( + '-----------START-----------', + req.method + ' ' + req.url, + '\r\n'.join('{}: {}'.format(k, v) for k, v in req.headers.items()), + req.body, + )) + + +class PACComercioDigital(object): + ws = 'https://{}.comercio-digital.mx/{}' + api = 'https://app2.comercio-digital.mx/{}' + URL = { + 'timbra': ws.format('ws', 'timbre4/timbrarV5'), + 'cancel': ws.format('cancela', 'cancela4/cancelarUuid'), + 'cancelxml': ws.format('cancela', 'cancela4/cancelarXml'), + 'status': ws.format('cancela', 'arws/consultaEstatus'), + 'client': api.format('x3/altaEmpresa'), + 'saldo': api.format('x3/saldo'), + 'timbres': api.format('x3/altaTimbres'), + } + CODES = { + '000': '000 Exitoso', + '004': '004 RFC {} ya esta dado de alta con Estatus=A', + '704': '704 Usuario Invalido', + '702': '702 Error rfc/empresa invalido', + } + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/4', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + + if DEBUG: + ws = 'https://pruebas.comercio-digital.mx/{}' + ws6 = 'https://pruebas6.comercio-digital.mx/arws/{}' + URL = { + 'timbra': ws.format('timbre4/timbrarV5'), + 'cancel': ws.format('cancela4/cancelarUuid'), + 'cancelxml': ws.format('cancela4/cancelarXml'), + 'status': ws6.format('consultaEstatus'), + 'client': api.format('x3/altaEmpresa'), + 'saldo': api.format('x3/saldo'), + 'timbres': api.format('x3/altaTimbres'), + } + + def __init__(self): + self.error = '' + + def _error(self, msg): + self.error = str(msg) + # ~ log.error(msg) + return + + def _post(self, url, data, headers={}): + result = None + headers['host'] = url.split('/')[2] + headers['Content-type'] = 'text/plain' + headers['Connection'] = 'Keep-Alive' + headers['Expect'] = '100-continue' + + # ~ if DEBUG: + # ~ req = requests.Request('POST', url, headers=headers, data=data) + # ~ prepared = req.prepare() + # ~ pretty_print_POST(prepared) + + try: + result = mureq.post(url, body=data, headers=headers, timeout=TIMEOUT) + except Exception as e: + self._error(e) + + return result + + def _validate_cfdi(self, xml): + """ + Comercio Digital solo soporta la declaración con doble comilla + """ + # ~ tree = ET.fromstring(xml.encode()) + # ~ xml = ET.tostring(tree, + # ~ pretty_print=True, doctype='') + return xml.encode('utf-8') + + def stamp(self, cfdi, auth): + url = self.URL['timbra'] + headers = { + 'usrws': auth['user'], + 'pwdws': auth['pass'], + 'tipo': 'XML', + } + cfdi = self._validate_cfdi(cfdi) + result = self._post(url, cfdi, headers) + + if result is None: + return '' + + if result.status_code != 200: + return '' + + if 'errmsg' in result.headers: + self._error(result.headers['errmsg']) + return '' + + xml = result.content + tree = ET.fromstring(xml) + cfdi_uuid = tree.xpath( + 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)', + namespaces=self.NS_CFDI) + date_stamped = tree.xpath( + 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@FechaTimbrado)', + namespaces=self.NS_CFDI) + + data = { + 'xml': xml.decode(), + 'uuid': cfdi_uuid, + 'date': date_stamped, + } + return data + + def _get_data_cancel(self, cfdi, info, auth): + info['tipo'] = 'cfdi' + info['key'] = base64.b64encode(info['key_enc']).decode() + info['cer'] = base64.b64encode(info['cer_ori']).decode() + + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/3', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + tree = ET.fromstring(cfdi.encode()) + tipo = tree.xpath( + 'string(//cfdi:Comprobante/@TipoDeComprobante)', + namespaces=NS_CFDI) + total = tree.xpath( + 'string(//cfdi:Comprobante/@Total)', + namespaces=NS_CFDI) + rfc_emisor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)', + namespaces=NS_CFDI) + rfc_receptor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)', + namespaces=NS_CFDI) + uid = tree.xpath( + 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)', + namespaces=NS_CFDI) + data = ( + f"USER={auth['user']}", + f"PWDW={auth['pass']}", + f"RFCE={rfc_emisor}", + f"UUID={uid}", + f"PWDK={info['pass']}", + f"KEYF={info['key']}", + f"CERT={info['cer']}", + f"TIPO1={info['tipo']}", + f"ACUS=SI", + f"RFCR={rfc_receptor}", + f"TIPOC={tipo}", + f"TOTAL={total}", + f"UUIDREL={info['args']['uuid']}", + f"MOTIVO={info['args']['reason']}", + ) + return '\n'.join(data) + + def cancel(self, cfdi, info, auth): + # ~ if DEBUG or not auth: + # ~ auth = AUTH + url = self.URL['cancel'] + data = self._get_data_cancel(cfdi, info, auth) + + result = self._post(url, data) + + if result is None: + return '' + + if result.status_code != 200: + return '' + + if result.headers['codigo'] != '000': + self._error(result.headers['errmsg']) + return '' + + tree = ET.fromstring(result.text) + date_cancel = tree.xpath('string(//Acuse/@Fecha)')[:19] + + data = { + 'acuse': result.text, + 'date': date_cancel, + } + + return data + + def _get_headers_cancel_xml(self, cfdi, info, auth): + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/3', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + tree = ET.fromstring(cfdi.encode()) + tipocfdi = tree.xpath( + 'string(//cfdi:Comprobante/@TipoDeComprobante)', + namespaces=NS_CFDI) + total = tree.xpath( + 'string(//cfdi:Comprobante/@Total)', + namespaces=NS_CFDI) + rfc_receptor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)', + namespaces=NS_CFDI) + + headers = { + 'usrws': auth['user'], + 'pwdws': auth['pass'], + 'rfcr': rfc_receptor, + 'total': total, + 'tipocfdi': tipocfdi, + } + headers.update(info) + + return headers + + def cancel_xml(self, xml, auth, cfdi='', info={'tipo': 'cfdi'}): + # ~ if DEBUG or not auth: + # ~ auth = AUTH + + url = self.URL['cancelxml'] + headers = self._get_headers_cancel_xml(cfdi, info, auth) + result = self._post(url, xml, headers) + + if result is None: + return '' + + if result.status_code != 200: + return '' + + if result.headers['codigo'] != '000': + self._error(result.headers['errmsg']) + return '' + + tree = ET.fromstring(result.text) + date_cancel = tree.xpath('string(//Acuse/@Fecha)')[:19] + + data = { + 'acuse': result.text, + 'date': date_cancel, + } + return data + + def status(self, data, auth): + # ~ if not auth: + # ~ auth = AUTH + url = self.URL['status'] + + data = ( + f"USER={auth['user']}", + f"PWDW={auth['pass']}", + f"RFCR={data['rfc_receptor']}", + f"RFCE={data['rfc_emisor']}", + f"TOTAL={data['total']}", + f"UUID={data['uuid']}", + ) + data = '\n'.join(data) + result = self._post(url, data) + + if result is None: + return '' + + if result.status_code != 200: + self._error(result.status_code) + return self.error + + return result.text + + def _get_data_client(self, auth, values): + data = [f"usr_ws={auth['user']}", f"pwd_ws={auth['pass']}"] + fields = ( + 'rfc_contribuyente', + 'nombre_contribuyente', + 'calle', + 'noExterior', + 'noInterior', + 'colonia', + 'localidad', + 'municipio', + 'estado', + 'pais', + 'cp', + 'contacto', + 'telefono', + 'email', + 'rep_nom', + 'rep_rfc', + 'email_fact', + 'pwd_asignado', + ) + data += [f"{k}={values[k]}" for k in fields] + + return '\n'.join(data) + + def client_add(self, data, auth): + # ~ auth = AUTH + url = self.URL['client'] + data = self._get_data_client(auth, data) + + result = self._post(url, data) + + if result is None: + return False + + if result.status_code != 200: + self._error(f'Code: {result.status_code}') + return False + + if result.text != self.CODES['000']: + self._error(result.text) + return False + + return True + + def client_balance(self, data, rfc=''): + url = self.URL['saldo'] + host = url.split('/')[2] + headers = { + 'Content-type': 'text/plain', + 'Host': host, + 'Connection' : 'Keep-Alive', + } + data = {'usr': data['user'], 'pwd': data['pass']} + try: + result = requests.get(url, params=data, headers=headers, timeout=TIMEOUT) + except ConnectionError as e: + self._error(e) + return '' + + if result.status_code != 200: + return '' + + if result.text == self.CODES['704']: + self._error(result.text) + return '' + + if result.text == self.CODES['702']: + self._error(result.text) + return '' + + return result.text + + def client_add_timbres(self, data, auth): + # ~ if not auth: + # ~ auth = AUTH + url = self.URL['timbres'] + data = '\n'.join(( + f"usr_ws={auth['user']}", + f"pwd_ws={auth['pass']}", + f"rfc_recibir={data['rfc']}", + f"num_timbres={data['timbres']}" + )) + + result = self._post(url, data) + + if result is None: + return False + + if result.status_code != 200: + self._error(f'Code: {result.status_code}') + return False + + if result.text != self.CODES['000']: + self._error(result.text) + return False + + return True + diff --git a/source/helper/mureq.py b/source/helper/mureq.py new file mode 100644 index 0000000..49547e4 --- /dev/null +++ b/source/helper/mureq.py @@ -0,0 +1,393 @@ +""" +mureq is a replacement for python-requests, intended to be vendored +in-tree by Linux systems software and other lightweight applications. + +mureq is copyright 2021 by its contributors and is released under the +0BSD ("zero-clause BSD") license. +""" +import contextlib +import io +import os.path +import socket +import ssl +import sys +import urllib.parse +from http.client import HTTPConnection, HTTPSConnection, HTTPMessage, HTTPException + +__version__ = '0.2.0' + +__all__ = ['HTTPException', 'TooManyRedirects', 'Response', + 'yield_response', 'request', 'get', 'post', 'head', 'put', 'patch', 'delete'] + +DEFAULT_TIMEOUT = 15.0 + +# e.g. "Python 3.8.10" +DEFAULT_UA = "Python " + sys.version.split()[0] + + +def request(method, url, *, read_limit=None, **kwargs): + """request performs an HTTP request and reads the entire response body. + + :param str method: HTTP method to request (e.g. 'GET', 'POST') + :param str url: URL to request + :param read_limit: maximum number of bytes to read from the body, or None for no limit + :type read_limit: int or None + :param kwargs: optional arguments defined by yield_response + :return: Response object + :rtype: Response + :raises: HTTPException + """ + with yield_response(method, url, **kwargs) as response: + try: + body = response.read(read_limit) + except HTTPException: + raise + except IOError as e: + raise HTTPException(str(e)) from e + return Response(response.url, response.status, _prepare_incoming_headers(response.headers), body) + + +def get(url, **kwargs): + """get performs an HTTP GET request.""" + return request('GET', url=url, **kwargs) + + +def post(url, body=None, **kwargs): + """post performs an HTTP POST request.""" + return request('POST', url=url, body=body, **kwargs) + + +def head(url, **kwargs): + """head performs an HTTP HEAD request.""" + return request('HEAD', url=url, **kwargs) + + +def put(url, body=None, **kwargs): + """put performs an HTTP PUT request.""" + return request('PUT', url=url, body=body, **kwargs) + + +def patch(url, body=None, **kwargs): + """patch performs an HTTP PATCH request.""" + return request('PATCH', url=url, body=body, **kwargs) + + +def delete(url, **kwargs): + """delete performs an HTTP DELETE request.""" + return request('DELETE', url=url, **kwargs) + + +@contextlib.contextmanager +def yield_response(method, url, *, unix_socket=None, timeout=DEFAULT_TIMEOUT, headers=None, + params=None, body=None, form=None, json=None, verify=True, source_address=None, + max_redirects=None, ssl_context=None): + """yield_response is a low-level API that exposes the actual + http.client.HTTPResponse via a contextmanager. + + Note that unlike mureq.Response, http.client.HTTPResponse does not + automatically canonicalize multiple appearances of the same header by + joining them together with a comma delimiter. To retrieve canonicalized + headers from the response, use response.getheader(): + https://docs.python.org/3/library/http.client.html#http.client.HTTPResponse.getheader + + :param str method: HTTP method to request (e.g. 'GET', 'POST') + :param str url: URL to request + :param unix_socket: path to Unix domain socket to query, or None for a normal TCP request + :type unix_socket: str or None + :param timeout: timeout in seconds, or None for no timeout (default: 15 seconds) + :type timeout: float or None + :param headers: HTTP headers as a mapping or list of key-value pairs + :param params: parameters to be URL-encoded and added to the query string, as a mapping or list of key-value pairs + :param body: payload body of the request + :type body: bytes or None + :param form: parameters to be form-encoded and sent as the payload body, as a mapping or list of key-value pairs + :param json: object to be serialized as JSON and sent as the payload body + :param bool verify: whether to verify TLS certificates (default: True) + :param source_address: source address to bind to for TCP + :type source_address: str or tuple(str, int) or None + :param max_redirects: maximum number of redirects to follow, or None (the default) for no redirection + :type max_redirects: int or None + :param ssl_context: TLS config to control certificate validation, or None for default behavior + :type ssl_context: ssl.SSLContext or None + :return: http.client.HTTPResponse, yielded as context manager + :rtype: http.client.HTTPResponse + :raises: HTTPException + """ + method = method.upper() + headers = _prepare_outgoing_headers(headers) + enc_params = _prepare_params(params) + body = _prepare_body(body, form, json, headers) + + visited_urls = [] + + while max_redirects is None or len(visited_urls) <= max_redirects: + url, conn, path = _prepare_request(method, url, enc_params=enc_params, timeout=timeout, unix_socket=unix_socket, verify=verify, source_address=source_address, ssl_context=ssl_context) + enc_params = '' # don't reappend enc_params if we get redirected + visited_urls.append(url) + try: + try: + conn.request(method, path, headers=headers, body=body) + response = conn.getresponse() + except HTTPException: + raise + except IOError as e: + # wrap any IOError that is not already an HTTPException + # in HTTPException, exposing a uniform API for remote errors + raise HTTPException(str(e)) from e + redirect_url = _check_redirect(url, response.status, response.headers) + if max_redirects is None or redirect_url is None: + response.url = url # https://bugs.python.org/issue42062 + yield response + return + else: + url = redirect_url + if response.status == 303: + # 303 See Other: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/303 + method = 'GET' + finally: + conn.close() + + raise TooManyRedirects(visited_urls) + + +class Response: + """Response contains a completely consumed HTTP response. + + :ivar str url: the retrieved URL, indicating whether a redirection occurred + :ivar int status_code: the HTTP status code + :ivar http.client.HTTPMessage headers: the HTTP headers + :ivar bytes body: the payload body of the response + """ + + __slots__ = ('url', 'status_code', 'headers', 'body') + + def __init__(self, url, status_code, headers, body): + self.url, self.status_code, self.headers, self.body = url, status_code, headers, body + + def __repr__(self): + return f"Response(status_code={self.status_code:d})" + + @property + def ok(self): + """ok returns whether the response had a successful status code + (anything other than a 40x or 50x).""" + return not (400 <= self.status_code < 600) + + @property + def content(self): + """content returns the response body (the `body` member). This is an + alias for compatibility with requests.Response.""" + return self.body + + def raise_for_status(self): + """raise_for_status checks the response's success code, raising an + exception for error codes.""" + if not self.ok: + raise HTTPErrorStatus(self.status_code) + + def json(self): + """Attempts to deserialize the response body as UTF-8 encoded JSON.""" + import json as jsonlib + return jsonlib.loads(self.body) + + def _debugstr(self): + buf = io.StringIO() + print("HTTP", self.status_code, file=buf) + for k, v in self.headers.items(): + print(f"{k}: {v}", file=buf) + print(file=buf) + try: + print(self.body.decode('utf-8'), file=buf) + except UnicodeDecodeError: + print(f"<{len(self.body)} bytes binary data>", file=buf) + return buf.getvalue() + + +class TooManyRedirects(HTTPException): + """TooManyRedirects is raised when automatic following of redirects was + enabled, but the server redirected too many times without completing.""" + pass + + +class HTTPErrorStatus(HTTPException): + """HTTPErrorStatus is raised by Response.raise_for_status() to indicate an + HTTP error code (a 40x or a 50x). Note that a well-formed response with an + error code does not result in an exception unless raise_for_status() is + called explicitly. + """ + + def __init__(self, status_code): + self.status_code = status_code + + def __str__(self): + return f"HTTP response returned error code {self.status_code:d}" + + +# end public API, begin internal implementation details + +_JSON_CONTENTTYPE = 'application/json' +_FORM_CONTENTTYPE = 'application/x-www-form-urlencoded' + + +class UnixHTTPConnection(HTTPConnection): + """UnixHTTPConnection is a subclass of HTTPConnection that connects to a + Unix domain stream socket instead of a TCP address. + """ + + def __init__(self, path, timeout=DEFAULT_TIMEOUT): + super(UnixHTTPConnection, self).__init__('localhost', timeout=timeout) + self._unix_path = path + + def connect(self): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + sock.settimeout(self.timeout) + sock.connect(self._unix_path) + except Exception: + sock.close() + raise + self.sock = sock + + +def _check_redirect(url, status, response_headers): + """Return the URL to redirect to, or None for no redirection.""" + if status not in (301, 302, 303, 307, 308): + return None + location = response_headers.get('Location') + if not location: + return None + parsed_location = urllib.parse.urlparse(location) + if parsed_location.scheme: + # absolute URL + return location + + old_url = urllib.parse.urlparse(url) + if location.startswith('/'): + # absolute path on old hostname + return urllib.parse.urlunparse((old_url.scheme, old_url.netloc, + parsed_location.path, parsed_location.params, + parsed_location.query, parsed_location.fragment)) + + # relative path on old hostname + old_dir, _old_file = os.path.split(old_url.path) + new_path = os.path.join(old_dir, location) + return urllib.parse.urlunparse((old_url.scheme, old_url.netloc, + new_path, parsed_location.params, + parsed_location.query, parsed_location.fragment)) + + +def _prepare_outgoing_headers(headers): + if headers is None: + headers = HTTPMessage() + elif not isinstance(headers, HTTPMessage): + new_headers = HTTPMessage() + if hasattr(headers, 'items'): + iterator = headers.items() + else: + iterator = iter(headers) + for k, v in iterator: + new_headers[k] = v + headers = new_headers + _setdefault_header(headers, 'User-Agent', DEFAULT_UA) + return headers + + +# XXX join multi-headers together so that get(), __getitem__(), +# etc. behave intuitively, then stuff them back in an HTTPMessage. +def _prepare_incoming_headers(headers): + headers_dict = {} + for k, v in headers.items(): + headers_dict.setdefault(k, []).append(v) + result = HTTPMessage() + # note that iterating over headers_dict preserves the original + # insertion order in all versions since Python 3.6: + for k, vlist in headers_dict.items(): + result[k] = ','.join(vlist) + return result + + +def _setdefault_header(headers, name, value): + if name not in headers: + headers[name] = value + + +def _prepare_body(body, form, json, headers): + if body is not None: + if not isinstance(body, bytes): + raise TypeError('body must be bytes or None', type(body)) + return body + + if json is not None: + _setdefault_header(headers, 'Content-Type', _JSON_CONTENTTYPE) + import json as jsonlib + return jsonlib.dumps(json).encode('utf-8') + + if form is not None: + _setdefault_header(headers, 'Content-Type', _FORM_CONTENTTYPE) + return urllib.parse.urlencode(form, doseq=True) + + return None + + +def _prepare_params(params): + if params is None: + return '' + return urllib.parse.urlencode(params, doseq=True) + + +def _prepare_request(method, url, *, enc_params='', timeout=DEFAULT_TIMEOUT, source_address=None, unix_socket=None, verify=True, ssl_context=None): + """Parses the URL, returns the path and the right HTTPConnection subclass.""" + parsed_url = urllib.parse.urlparse(url) + + is_unix = (unix_socket is not None) + scheme = parsed_url.scheme.lower() + if scheme.endswith('+unix'): + scheme = scheme[:-5] + is_unix = True + if scheme == 'https': + raise ValueError("https+unix is not implemented") + + if scheme not in ('http', 'https'): + raise ValueError("unrecognized scheme", scheme) + + is_https = (scheme == 'https') + host = parsed_url.hostname + port = 443 if is_https else 80 + if parsed_url.port: + port = parsed_url.port + + if is_unix and unix_socket is None: + unix_socket = urllib.parse.unquote(parsed_url.netloc) + + path = parsed_url.path + if parsed_url.query: + if enc_params: + path = f'{path}?{parsed_url.query}&{enc_params}' + else: + path = f'{path}?{parsed_url.query}' + else: + if enc_params: + path = f'{path}?{enc_params}' + else: + pass # just parsed_url.path in this case + + if isinstance(source_address, str): + source_address = (source_address, 0) + + if is_unix: + conn = UnixHTTPConnection(unix_socket, timeout=timeout) + elif is_https: + if ssl_context is None: + ssl_context = ssl.create_default_context() + if not verify: + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + conn = HTTPSConnection(host, port, source_address=source_address, timeout=timeout, + context=ssl_context) + else: + conn = HTTPConnection(host, port, source_address=source_address, timeout=timeout) + + munged_url = urllib.parse.urlunparse((parsed_url.scheme, parsed_url.netloc, + path, parsed_url.params, + '', parsed_url.fragment)) + return munged_url, conn, path diff --git a/source/helper/util.py b/source/helper/util.py new file mode 100644 index 0000000..d3a3798 --- /dev/null +++ b/source/helper/util.py @@ -0,0 +1,517 @@ +#!/usr/bin/env python3 + +import os +import re +import subprocess + +import lxml.etree as ET + +from settings import DEBUG, log, PATH_XSLT, DELETE_FILES, PAC_AUTH +from helper.comercio import PACComercioDigital as PAC + + +def _call(args): + return subprocess.check_output(args, shell=True).decode() + + +def join(*paths): + return os.path.join(*paths) + + +def kill(path): + try: + os.remove(path) + except: + pass + return + + +class Cert(): + OPENSSL = 'openssl' + + def __init__ (self, path, name): + self._get_data(path, name) + + @property + def serial_number(self): + return self._serial_number + + @property + def txt(self): + return self._cert_txt + + def _get_data(self, path, name): + path_cer = join(path, f'{name}.cer') + self._path_key = join(path, f'{name}.pem') + self._serial_number = self._get_serial_number(path_cer) + self._cert_txt = self._get_cert_txt(path_cer) + return + + def _get_serial_number(self, path_cer): + args = f'"{self.OPENSSL}" x509 -inform DER -in "{path_cer}" -noout -serial' + serial_number = _call(args) + serial_number = serial_number.split('=')[1].split('\n')[0][1::2] + return serial_number + + def _get_cert_txt(self, path_cer): + args = f'"{self.OPENSSL}" enc -base64 -in "{path_cer}"' + data = _call(args).replace('\n', '') + return data + + def sign(self, data): + args = f'echo -n -e "{data.decode()}" | "{self.OPENSSL}" dgst -sha256 -sign "{self._path_key}" | "{self.OPENSSL}" enc -base64' + data = _call(args).replace('\n', '') + return data + + +class DictToCfdi(): + _PREFIX = 'cfdi' + _XMLNS = 'http://www.sat.gob.mx/cfd/4' + _SCHEMA = f'{_XMLNS} http://www.sat.gob.mx/sitio_internet/cfd/4/cfdv40.xsd' + _LEYENDAS = { + 'version': '1.0', + 'prefix': 'leyendasFisc', + 'xmlns': 'http://www.sat.gob.mx/leyendasFiscales', + 'schema': ' http://www.sat.gob.mx/leyendasFiscales http://www.sat.gob.mx/sitio_internet/cfd/leyendasFiscales/leyendasFisc.xsd', + } + + def __init__ (self, data): + self._data = data + self._cfdi = None + self._root = None + self._attr_complementos = {} + self._make_cfdi() + + @property + def cfdi(self): + return self._cfdi + + def _make_cfdi(self): + self._validate_data() + self._comprobante() + self._relacionados() + self._emisor() + self._receptor() + self._conceptos() + self._impuestos() + self._complementos() + + xml = ET.tostring(self._root, + pretty_print=True, xml_declaration=True, encoding='utf-8') + self._cfdi = xml.decode() + return + + def _validate_data(self): + self._schema = self._SCHEMA + + if 'leyendas' in self._data['complementos']: + self._schema += self._LEYENDAS['schema'] + self._attr_complementos['leyendas'] = { + self._LEYENDAS['prefix']: self._LEYENDAS['xmlns'] + } + return + + def _comprobante(self): + attr = self._data['comprobante'] + NSMAP = { + self._PREFIX: self._XMLNS, + 'xsi': 'http://www.w3.org/2001/XMLSchema-instance', + } + for k, value in self._attr_complementos.items(): + NSMAP.update(value) + + attr_qname = ET.QName( + 'http://www.w3.org/2001/XMLSchema-instance', 'schemaLocation') + schema = {attr_qname: self._schema} + + node_name = f'{{{self._XMLNS}}}Comprobante' + self._root = ET.Element(node_name, schema, **attr, nsmap=NSMAP) + return + + def _relacionados(self): + data = self._data['relacionados'] + if not data: + return + + node_name = f'{{{self._XMLNS}}}CfdiRelacionados' + attr = {'TipoRelacion': data['TipoRelacion']} + node = ET.SubElement(self._root, node_name, attr) + for uuid in data['UUID']: + node_name = f'{{{self._XMLNS}}}CfdiRelacionado' + attr = {'UUID': uuid} + ET.SubElement(node, node_name, attr) + return + + def _emisor(self): + attr = self._data['emisor'] + node_name = f'{{{self._XMLNS}}}Emisor' + emisor = ET.SubElement(self._root, node_name, attr) + return + + def _receptor(self): + attr = self._data['receptor'] + node_name = f'{{{self._XMLNS}}}Receptor' + emisor = ET.SubElement(self._root, node_name, attr) + return + + def _conceptos(self): + products = self._data['conceptos'] + node_name = f'{{{self._XMLNS}}}Conceptos' + node = ET.SubElement(self._root, node_name) + for product in products: + complemento = product.pop('complemento', {}) + taxes = product.pop('impuestos', {}) + node_name = f'{{{self._XMLNS}}}Concepto' + node_product = ET.SubElement(node, node_name, product) + if not taxes: + continue + + node_name = f'{{{self._XMLNS}}}Impuestos' + node_taxes = ET.SubElement(node_product, node_name) + traslados = taxes.get('traslados', []) + retenciones = taxes.get('retenciones', []) + + if traslados: + node_name = f'{{{self._XMLNS}}}Traslados' + node_tmp = ET.SubElement(node_taxes, node_name) + for tax in traslados: + node_name = f'{{{self._XMLNS}}}Traslado' + ET.SubElement(node_tmp, node_name, tax) + + if retenciones: + node_name = f'{{{self._XMLNS}}}Retenciones' + node_tmp = ET.SubElement(node_taxes, node_name) + for tax in retenciones: + node_name = f'{{{self._XMLNS}}}Retencion' + ET.SubElement(node_tmp, node_name, tax) + + return + + def _impuestos(self): + taxes = self._data['impuestos'] + if not taxes: + return + + node_name = f'{{{self._XMLNS}}}Impuestos' + retenciones = taxes.pop('retenciones', ()) + traslados = taxes.pop('traslados', ()) + node = ET.SubElement(self._root, node_name, taxes) + + if retenciones: + node_name = f'{{{self._XMLNS}}}Retenciones' + sub_node = ET.SubElement(node, node_name) + node_name = f'{{{self._XMLNS}}}Retencion' + for tax in retenciones: + ET.SubElement(sub_node, node_name, tax) + + if traslados: + node_name = f'{{{self._XMLNS}}}Traslados' + sub_node = ET.SubElement(node, node_name) + node_name = f'{{{self._XMLNS}}}Traslado' + for tax in traslados: + ET.SubElement(sub_node, node_name, tax) + return + + def _complementos(self): + if not self._data['complementos']: + return + + node_name = f'{{{self._XMLNS}}}Complemento' + node = ET.SubElement(self._root, node_name) + + if 'leyendas' in self._data['complementos']: + self._complemento_leyendas(self._data['complementos']['leyendas'], node) + return + + def _complemento_leyendas(self, data, node): + attr = {'version': self._LEYENDAS['version']} + node_name = f"{{{self._LEYENDAS['xmlns']}}}LeyendasFiscales" + node_leyendas = ET.SubElement(node, node_name, attr) + + for leyenda in data: + node_name = f"{{{self._LEYENDAS['xmlns']}}}Leyenda" + ET.SubElement(node_leyendas, node_name, leyenda) + return + + +class DataToDict(): + TRASLADO = 'T' + RETENCION = 'R' + NODES = { + '01': '_comprobante', + '02': '_relacionados', + '03': '_emisor', + '04': '_receptor', + '05': '_conceptos', + '06': '_impuestos', + '10': '_leyendas', + } + + def __init__ (self, data): + self._data = data + self._cfdi = {'conceptos': [], 'complementos': {}} + self._process_data() + + @property + def cfdi(self): + return self._cfdi + + def _process_data(self): + lines = self._data.split('\n') + for line in lines: + parts = line.split('|') + if not parts[0]: + continue + header = self.NODES.get(parts[0], '') + if not header: + log.debug(f'No existe: {parts[0]}') + continue + if hasattr(self, header): + getattr(self, header)(parts[2:]) + return + + def _comprobante(self, data): + self._cfdi['comprobante'] = {} + fields = ( + 'Version', + 'Serie', + 'Folio', + 'Fecha', + 'FormaPago', + 'CondicionesDePago', + 'SubTotal', + 'Descuento', + 'Moneda', + 'TipoCambio', + 'Total', + 'TipoDeComprobante', + 'MetodoPago', + 'LugarExpedicion', + 'Confirmacion', + 'Exportacion', + ) + for index, field in enumerate(fields): + if not data[index]: + continue + self._cfdi['comprobante'][field] = data[index] + return + + def _relacionados(self, data): + self._cfdi['relacionados'] = {} + if data[0]: + self._cfdi['relacionados']['TipoRelacion'] = data[0] + self._cfdi['relacionados']['UUID'] = data[1:] + return + + def _emisor(self, data): + self._cfdi['emisor'] = {} + fields = ( + 'Rfc', + 'Nombre', + 'RegimenFiscal', + ) + for index, field in enumerate(fields): + self._cfdi['emisor'][field] = data[index] + return + + def _receptor(self, data): + self._cfdi['receptor'] = {} + fields = ( + 'Rfc', + 'Nombre', + 'DomicilioFiscalReceptor', + 'ResidenciaFiscal', + 'NumRegIdTrib', + 'RegimenFiscalReceptor', + 'UsoCFDI', + ) + for index, field in enumerate(fields): + if not data[index]: + continue + self._cfdi['receptor'][field] = data[index] + return + + def _get_taxes_by_concept(self, data): + taxes = {} + traslados = [] + retenciones = [] + for i in range(0, len(data), 6): + type_tax = data[i] + tax = { + 'Base': data[i + 1], + 'Impuesto': data[i + 2], + 'TipoFactor': data[i + 3], + 'TasaOCuota': data[i + 4], + 'Importe': data[i + 5], + } + if type_tax == self.TRASLADO: + traslados.append(tax) + elif type_tax == self.RETENCION: + retenciones.append(tax) + if traslados: + taxes['traslados'] = traslados + if retenciones: + taxes['retenciones'] = retenciones + return taxes + + def _conceptos(self, data): + concepto = {} + fields = ( + 'ClaveProdServ', + 'NoIdentificacion', + 'Cantidad', + 'ClaveUnidad', + 'Unidad', + 'Descripcion', + 'ValorUnitario', + 'Importe', + 'Descuento', + 'ObjetoImp', + ) + for index, field in enumerate(fields): + if not data[index]: + continue + concepto[field] = data[index] + pedimento = data[index + 1] + if pedimento: + concepto['pedimento'] = pedimento + concepto['impuestos'] = self._get_taxes_by_concept(data[index + 2:]) + self._cfdi['conceptos'].append(concepto) + return + + def _get_taxes(self, data): + traslados = [] + retenciones = [] + for i in range(0, len(data), 6): + type_tax = data[i] + if type_tax == self.TRASLADO: + tax = { + 'Base': data[i + 1], + 'Impuesto': data[i + 2], + 'TipoFactor': data[i + 3], + 'TasaOCuota': data[i + 4], + 'Importe': data[i + 5], + } + traslados.append(tax) + elif type_tax == self.RETENCION: + tax = { + 'Impuesto': data[i + 2], + 'Importe': data[i + 5], + } + retenciones.append(tax) + if traslados: + self._cfdi['impuestos']['traslados'] = traslados + if retenciones: + self._cfdi['impuestos']['retenciones'] = retenciones + return + + def _impuestos(self, data): + self._cfdi['impuestos'] = {} + fields = ( + 'TotalImpuestosRetenidos', + 'TotalImpuestosTrasladados', + ) + for index, field in enumerate(fields): + if not data[index]: + continue + self._cfdi['impuestos'][field] = data[index] + self._get_taxes(data[index + 1:]) + return + + def _leyendas(self, data): + if not data: + return + + leyendas = [] + for i in range(0, len(data), 3): + leyenda = { + 'disposicionFiscal': data[i], + 'norma': data[i+1], + 'textoLeyenda': data[i+2], + } + leyendas.append(leyenda) + + self._cfdi['complementos']['leyendas'] = leyendas + return + + +def stamp_cfdi(cfdi, cert): + xslt = open(PATH_XSLT, 'rb') + root = ET.fromstring(cfdi.encode()) + root.attrib['NoCertificado'] = cert.serial_number + root.attrib['Certificado'] = cert.txt + + transfor = ET.XSLT(ET.parse(xslt)) + cadena = str(transfor(root)).encode() + root.attrib['Sello'] = cert.sign(cadena) + xslt.close() + + xml = ET.tostring(root, pretty_print=True, encoding='utf-8') + + return xml.decode() + + +def _get_files(path, ext='xml'): + paths = [] + for folder, _, files in os.walk(path): + pattern = re.compile('\.{}'.format(ext), re.IGNORECASE) + paths += [join(folder, f) for f in files if pattern.search(f)] + return paths + + +def _read_file(path, encoding='utf-8'): + # ~ CODEC_WIN = 'ISO-8859-1' + with open(path, 'r', encoding=encoding) as f: + data = f.read() + if DEBUG: + msg = f'Archivo leido: {path}' + log.debug(msg) + return data + + +def _save_file(path, target, data): + _, filename = os.path.split(path) + name, _ = os.path.splitext(filename) + path_new = join(target, f'{name}.xml') + data = f'\n{data}' + with open(path_new, 'w', encoding='utf-8') as f: + f.write(data) + if DEBUG: + msg = f'Archivo sellado: {path}' + log.debug(msg) + return path_new + + +def make_cfdi(source, target, dir_cert, nombre): + cert = Cert(dir_cert, nombre) + paths = _get_files(source, 'txt') + for path in paths: + data = _read_file(path) + data = DataToDict(data).cfdi + cfdi = DictToCfdi(data).cfdi + cfdi = stamp_cfdi(cfdi, cert) + path_xml = _save_file(path, target, cfdi) + msg = f'CFDI: {path_xml}' + log.info(msg) + return + + +def stamp_pac(source, target): + pac = PAC() + paths = _get_files(source) + for path in paths: + log.info(f'\tEnviar: {path}') + _, filename = os.path.split(path) + data = open(path, 'r').read() + result = pac.stamp(data, PAC_AUTH) + + if pac.error: + log.error(pac.error) + continue + + new_path = f'{target}/{filename}' + + with open(new_path, 'w') as f: + f.write(result['xml']) + log.info(f'\tTimbrada: {new_path}') + return diff --git a/source/settings.py b/source/settings.py new file mode 100644 index 0000000..153dc85 --- /dev/null +++ b/source/settings.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 + +import logging +import os +from conf import DEBUG, DELETE_FILES, PAC_AUTH + + +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) + +current_path = os.path.dirname(__file__) +PATH_XSLT = os.path.join(current_path, 'xslt', 'cadena.xslt') diff --git a/source/xslt/cadena.xslt b/source/xslt/cadena.xslt new file mode 100644 index 0000000..44de891 --- /dev/null +++ b/source/xslt/cadena.xslt @@ -0,0 +1,403 @@ + + + + + + + + + + + + + ||| + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/source/xslt/leyendasFisc.xslt b/source/xslt/leyendasFisc.xslt new file mode 100644 index 0000000..e0587a2 --- /dev/null +++ b/source/xslt/leyendasFisc.xslt @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/source/xslt/utilerias.xslt b/source/xslt/utilerias.xslt new file mode 100644 index 0000000..4ae4bf4 --- /dev/null +++ b/source/xslt/utilerias.xslt @@ -0,0 +1,22 @@ + + + + + + | + + + + + + + + | + + + + + + + +