Compare commits
8 Commits
Author | SHA1 | Date |
---|---|---|
El Mau | 4ecaf7126d | |
El Mau | b9e1c3332a | |
Mauricio Baeza | 5adf7086aa | |
Mauricio Baeza | f68d4a898a | |
Mauricio Baeza | 315decd98a | |
Mauricio Baeza | 482a10aa5d | |
Mauricio Baeza | 3aee1aacfb | |
Mauricio Baeza | 7d3a7f404c |
|
@ -1,6 +1,11 @@
|
|||
# Lista de cambios
|
||||
|
||||
|
||||
## v 0.3.0 [31-Mar-22]
|
||||
---
|
||||
* Cambios en el SAT
|
||||
|
||||
|
||||
## v 0.2.1 [16-Jul-21]
|
||||
---
|
||||
* Fix - Issue #1
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
httpx
|
||||
peewee
|
||||
cryptography
|
||||
#cryptography
|
||||
python-dateutil
|
||||
lxml
|
||||
|
|
|
@ -41,13 +41,26 @@ def _process_command_line_arguments():
|
|||
parser.add_argument('-fn', '--fiel-nombre', help=help, default='fiel')
|
||||
|
||||
help = "Descargar por Tipo: t=todos(default), e=emitidas, r=recibidas"
|
||||
parser.add_argument('-t', '--tipo', help=help, dest='type', default='t', choices=['t', 'e', 'r'])
|
||||
parser.add_argument('-t', '--tipo', help=help,
|
||||
dest='type', default='t', choices=['t', 'e', 'r'])
|
||||
help = "Año de la descarga entre 2014 y el año actual (predeterminado)."
|
||||
parser.add_argument('-a', '--año', help=help, dest='year', default=year, type=int, choices=range(2014, year+1))
|
||||
parser.add_argument('-a', '--año', help=help,
|
||||
dest='year', default=year, type=int, choices=range(2014, year+1))
|
||||
help = "Mes de la descarga, el mes actual es el predeterminado"
|
||||
parser.add_argument('-m', '--mes', help=help, dest='month', default=0, type=int, choices=range(13))
|
||||
parser.add_argument('-m', '--mes', help=help,
|
||||
dest='month', default=0, type=int, choices=range(13))
|
||||
help = "Día de la descarga, de forma predeterminada no se usa"
|
||||
parser.add_argument('-d', '--dia', help=help, dest='day', default=0, type=int, choices=range(32))
|
||||
parser.add_argument('-d', '--dia', help=help,
|
||||
dest='day', default=0, type=int, choices=range(31))
|
||||
help = "Intervalo de días a partir de la fecha actual y hacia a atras"
|
||||
parser.add_argument('-ud', '--ultimos-dias', help=help,
|
||||
dest='last_days', default=0, type=int, choices=range(30))
|
||||
help = "Fecha inicial AAAA-MM-DD"
|
||||
parser.add_argument('-fi', '--fecha-inicial', help=help,
|
||||
dest='date_start', default='')
|
||||
help = "Fecha final AAAA-MM-DD"
|
||||
parser.add_argument('-ff', '--fecha-final', help=help,
|
||||
dest='date_end', default='')
|
||||
|
||||
help = 'Solicitar descarga'
|
||||
parser.add_argument('-sd', '--solicitar-descarga', help=help,
|
||||
|
@ -59,13 +72,19 @@ def _process_command_line_arguments():
|
|||
parser.add_argument('-da', '--descargar-archivos', help=help,
|
||||
action='store_true', default=False, required=False)
|
||||
help = 'ID de solicitud'
|
||||
parser.add_argument('-id', '--id-solicitud', dest='id_request', help=help, default='')
|
||||
parser.add_argument('-id', '--id-solicitud', dest='id_request',
|
||||
help=help, default='')
|
||||
help = 'ID archivo'
|
||||
parser.add_argument('-ida', '--id-archivo', dest='id_file', help=help, default='')
|
||||
parser.add_argument('-ida', '--id-archivo', dest='id_file',
|
||||
help=help, default='')
|
||||
help = 'Ruta de descarga de archivos'
|
||||
parser.add_argument('-dd', '--directorio-descargas', dest='path_download',
|
||||
help=help, default='')
|
||||
|
||||
help = 'Descargar solo metadatos'
|
||||
parser.add_argument('-md', '--metadata', help=help,
|
||||
action='store_true', default=False, required=False)
|
||||
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
|
||||
|
|
|
@ -5,3 +5,5 @@ DEBUG = False
|
|||
# ~ Este valor se usa para cifrar la FIEL
|
||||
# ~ Si la cambias en producción, debes de validar de nuevo las FIELs
|
||||
TOKEN = ''
|
||||
|
||||
TIMEOUT = 10
|
||||
|
|
|
@ -0,0 +1,253 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import base64
|
||||
import datetime
|
||||
import re
|
||||
from subprocess import check_output
|
||||
from dateutil.parser import parse
|
||||
from conf import TOKEN
|
||||
|
||||
|
||||
class SATCertificate(object):
|
||||
OPENSSL = 'openssl'
|
||||
|
||||
def __init__(self, cer=b'', key=b'', pem=b'', password=''):
|
||||
self._error = ''
|
||||
self._init_values()
|
||||
self._get_data_cer(cer)
|
||||
self._get_data_key(key, password)
|
||||
self._get_data_pem(pem)
|
||||
|
||||
def _init_values(self):
|
||||
self._rfc = ''
|
||||
self._serial_number = ''
|
||||
self._serial_number2 = ''
|
||||
self._not_before = None
|
||||
self._not_after = None
|
||||
self._is_fiel = False
|
||||
self._are_couple = False
|
||||
self._is_valid_time = False
|
||||
self._cer_pem = ''
|
||||
self._cer_txt = ''
|
||||
self._key_enc = b''
|
||||
self._key_pem = b''
|
||||
self._p12 = b''
|
||||
self._cer_modulus = 0
|
||||
self._key_modulus = 0
|
||||
self._issuer = ''
|
||||
return
|
||||
|
||||
def __str__(self):
|
||||
msg = '\tRFC: {}\n'.format(self.rfc)
|
||||
msg += '\tNo de Serie: {}\n'.format(self.serial_number)
|
||||
msg += '\tVálido desde: {}\n'.format(self.not_before)
|
||||
msg += '\tVálido hasta: {}\n'.format(self.not_after)
|
||||
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
|
||||
msg += '\tSon pareja: {}\n'.format(self.are_couple)
|
||||
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
|
||||
return msg
|
||||
|
||||
def __bool__(self):
|
||||
return self.is_valid
|
||||
|
||||
def _get_hash(self):
|
||||
digest = hashes.Hash(hashes.SHA512(), default_backend())
|
||||
digest.update(self._rfc.encode())
|
||||
digest.update(self._serial_number.encode())
|
||||
digest.update(TOKEN.encode())
|
||||
return digest.finalize()
|
||||
|
||||
def _get_data_cer(self, cer):
|
||||
# ~ RFC
|
||||
cmd = f'{self.OPENSSL} x509 -inform der -in "{cer}" -noout -subject'
|
||||
result = check_output(cmd, shell=True).decode()
|
||||
pattern = r'[A-Z]{3,4}[0-9]{6}[A-Z0-9]{3}'
|
||||
self._rfc = re.search(pattern, result)[0]
|
||||
# ~ Serial number
|
||||
sep = '='
|
||||
cmd = f'{self.OPENSSL} x509 -inform der -in "{cer}" -noout -serial'
|
||||
result = check_output(cmd, shell=True).decode()
|
||||
self._serial_number2 = result.split(sep)[1]
|
||||
self._serial_number = self._serial_number2[1::2]
|
||||
# ~ Dates
|
||||
cmd = f'{self.OPENSSL} x509 -inform der -in "{cer}" -noout -dates'
|
||||
result = check_output(cmd, shell=True).decode()
|
||||
data = result.split('\n')
|
||||
self._not_before = parse(data[0].split('=')[1])
|
||||
self._not_after = parse(data[1].split('=')[1])
|
||||
# ~ Issuer
|
||||
cmd = f'{self.OPENSSL} x509 -inform der -in "{cer}" -noout -issuer'
|
||||
self._issuer = check_output(cmd, shell=True).decode()[7:]
|
||||
|
||||
now = datetime.datetime.utcnow().timestamp()
|
||||
self._is_valid_time = (now > self.not_before.timestamp()) and (now < self.not_after.timestamp())
|
||||
if not self._is_valid_time:
|
||||
msg = 'El certificado no es vigente'
|
||||
self._error = msg
|
||||
|
||||
# ~ self._is_fiel = obj.extensions.get_extension_for_oid(
|
||||
# ~ ExtensionOID.KEY_USAGE).value.key_agreement
|
||||
|
||||
cmd = f'{self.OPENSSL} x509 -inform der -in "{cer}"'
|
||||
result = check_output(cmd, shell=True).decode()
|
||||
self._cer_txt = ''.join(result.split('\n')[1:-2])
|
||||
|
||||
# ~ self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
|
||||
# ~ self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
|
||||
# ~ self._cer_modulus = obj.public_key().public_numbers().n
|
||||
return
|
||||
|
||||
def _get_data_key(self, key, password):
|
||||
self._key_enc = key
|
||||
if not key or not password:
|
||||
return
|
||||
|
||||
# ~ try:
|
||||
# ~ obj = serialization.load_der_private_key(
|
||||
# ~ key, password.encode(), default_backend())
|
||||
# ~ except ValueError:
|
||||
# ~ msg = 'La contraseña es incorrecta'
|
||||
# ~ self._error = msg
|
||||
# ~ return
|
||||
|
||||
# ~ p = self._get_hash()
|
||||
# ~ self._key_enc = obj.private_bytes(
|
||||
# ~ encoding=serialization.Encoding.PEM,
|
||||
# ~ format=serialization.PrivateFormat.PKCS8,
|
||||
# ~ encryption_algorithm=serialization.BestAvailableEncryption(p)
|
||||
# ~ )
|
||||
|
||||
# ~ self._key_modulus = obj.public_key().public_numbers().n
|
||||
# ~ self._are_couple = self._cer_modulus == self._key_modulus
|
||||
# ~ if not self._are_couple:
|
||||
# ~ msg = 'El CER y el KEY no son pareja'
|
||||
# ~ self._error = msg
|
||||
return
|
||||
|
||||
def _get_key(self, password):
|
||||
if not password:
|
||||
password = self._get_hash()
|
||||
private_key = serialization.load_pem_private_key(
|
||||
self._key_enc, password=password, backend=default_backend())
|
||||
return private_key
|
||||
|
||||
def _get_key_pem(self):
|
||||
obj = self._get_key('')
|
||||
key_pem = obj.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
)
|
||||
return key_pem
|
||||
|
||||
def _get_data_pem(self, pem):
|
||||
if not pem:
|
||||
return
|
||||
|
||||
self._key_pem = pem
|
||||
|
||||
# ~ self._key_pem = serialization.load_pem_private_key(
|
||||
# ~ pem, None, backend=default_backend())
|
||||
return
|
||||
|
||||
# Not work
|
||||
def _get_p12(self):
|
||||
obj = serialization.pkcs12.serialize_key_and_certificates('test',
|
||||
self.key_pem, self.cer_pem, None,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
)
|
||||
return obj
|
||||
|
||||
def sign(self, data, password=''):
|
||||
private_key = self._get_key(password)
|
||||
firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA256())
|
||||
return base64.b64encode(firma).decode()
|
||||
|
||||
def _sign_with_pem(self, data, name_hash):
|
||||
cmd = f"echo -n -e '{data.decode()}' | {self.OPENSSL} dgst -{name_hash} -sign '{self._key_pem}' | {self.OPENSSL} enc -base64"
|
||||
result = check_output(cmd, shell=True).decode().replace('\n', '')
|
||||
return result
|
||||
|
||||
def sign_sha1(self, data, password=''):
|
||||
if self._key_pem:
|
||||
return self._sign_with_pem(data, 'sha1')
|
||||
|
||||
private_key = self._get_key(password)
|
||||
firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA1())
|
||||
return base64.b64encode(firma).decode()
|
||||
|
||||
def sign_xml(self, tree):
|
||||
import xmlsec
|
||||
|
||||
node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature)
|
||||
ctx = xmlsec.SignatureContext()
|
||||
key = xmlsec.Key.from_memory(self.key_pem, xmlsec.constants.KeyDataFormatPem)
|
||||
ctx.key = key
|
||||
ctx.sign(node)
|
||||
node = xmlsec.tree.find_node(tree, 'X509Certificate')
|
||||
node.text = self.cer_txt
|
||||
return tree
|
||||
|
||||
@property
|
||||
def rfc(self):
|
||||
return self._rfc
|
||||
|
||||
@property
|
||||
def serial_number(self):
|
||||
return self._serial_number
|
||||
|
||||
@property
|
||||
def serial_number2(self):
|
||||
return self._serial_number2
|
||||
|
||||
@property
|
||||
def not_before(self):
|
||||
return self._not_before
|
||||
|
||||
@property
|
||||
def not_after(self):
|
||||
return self._not_after
|
||||
|
||||
@property
|
||||
def is_fiel(self):
|
||||
return self._is_fiel
|
||||
|
||||
@property
|
||||
def are_couple(self):
|
||||
return self._are_couple
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
return not bool(self.error)
|
||||
|
||||
@property
|
||||
def is_valid_time(self):
|
||||
return self._is_valid_time
|
||||
|
||||
@property
|
||||
def cer_pem(self):
|
||||
return self._cer_pem.encode()
|
||||
|
||||
@property
|
||||
def cer_txt(self):
|
||||
return self._cer_txt
|
||||
|
||||
@property
|
||||
def key_pem(self):
|
||||
return self._get_key_pem()
|
||||
|
||||
@property
|
||||
def key_enc(self):
|
||||
return self._key_enc
|
||||
|
||||
@property
|
||||
def issuer(self):
|
||||
return self._issuer
|
||||
|
||||
@property
|
||||
def p12(self):
|
||||
return self._get_p12()
|
||||
|
||||
@property
|
||||
def error(self):
|
||||
return self._error
|
|
@ -8,6 +8,8 @@ from datetime import datetime, timedelta
|
|||
import httpx
|
||||
import lxml.etree as ET
|
||||
|
||||
from conf import TIMEOUT
|
||||
|
||||
|
||||
class SATWebService():
|
||||
BASE = 'https://cfdidescargamasivasolicitud.clouda.sat.gob.mx'
|
||||
|
@ -29,6 +31,8 @@ class SATWebService():
|
|||
'Content-type': 'text/xml;charset="utf-8"',
|
||||
'Accept': 'text/xml',
|
||||
'Cache-Control': 'no-cache',
|
||||
'Expect': '100-continue',
|
||||
'Accept-Encoding': 'gzip, deflate',
|
||||
}
|
||||
NS = {
|
||||
's': 'http://schemas.xmlsoap.org/soap/envelope/',
|
||||
|
@ -148,38 +152,69 @@ class SATWebService():
|
|||
nsmap = {'s': self.NS['s'], None: self.XMLNS}
|
||||
node_name = 's:Body/AutenticaResponse/AutenticaResult'
|
||||
token = result.find(node_name, namespaces=nsmap).text
|
||||
# ~ print(f'Token: {token}')
|
||||
|
||||
return token
|
||||
|
||||
def _get_data_req(self, args):
|
||||
NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
|
||||
# ~ NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
|
||||
# ~ NSMAP = {'s': self.NS['s'], 'xd': self.NS['xd']}
|
||||
NSMAP = {'s': self.NS['s']}
|
||||
FORMAT = '%Y-%m-%dT%H:%M:%S'
|
||||
|
||||
date_start = args['date_start']
|
||||
date_end = args['date_end']
|
||||
msg = f'Descarga desde: {date_start} hasta: {date_end}'
|
||||
print(msg)
|
||||
|
||||
node_name = f"{{{self.NS['s']}}}Envelope"
|
||||
root = ET.Element(node_name, nsmap=NSMAP)
|
||||
|
||||
node_name = f"{{{self.NS['s']}}}Header"
|
||||
body = ET.SubElement(root, node_name)
|
||||
header = ET.SubElement(root, node_name)
|
||||
|
||||
node_name = 'ActivityId'
|
||||
attr = {'CorrelationId': '806aad0d-ef46-443b-9741-040c8e8e8c7d'}
|
||||
nsmap = {None: 'http://schemas.microsoft.com/2004/09/ServiceModel/Diagnostics'}
|
||||
activity = ET.SubElement(header, node_name, attr, nsmap=nsmap)
|
||||
activity.text = 'e906cfb4-f706-43de-94d0-5cc935be1aaa'
|
||||
|
||||
node_name = f"{{{self.NS['s']}}}Body"
|
||||
body = ET.SubElement(root, node_name)
|
||||
|
||||
node_name = f"{{{self.NS['des']}}}SolicitaDescarga"
|
||||
request_down = ET.SubElement(body, node_name)
|
||||
|
||||
node_name = f"{{{self.NS['des']}}}solicitud"
|
||||
attr = {
|
||||
'RfcSolicitante': self._cert.rfc,
|
||||
'FechaFinal': date_end.strftime(FORMAT),
|
||||
'FechaInicial': date_start.strftime(FORMAT),
|
||||
'TipoSolicitud': 'CFDI',
|
||||
args['rfc']: self._cert.rfc,
|
||||
nsmap = {
|
||||
'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
|
||||
'xsd': 'http://www.w3.org/2001/XMLSchema',
|
||||
}
|
||||
body = ET.SubElement(root, node_name, nsmap=nsmap)
|
||||
|
||||
# ~ node_name = f"{{{self.NS['des']}}}SolicitaDescarga"
|
||||
node_name = "SolicitaDescarga"
|
||||
nsmap = {None: 'http://DescargaMasivaTerceros.sat.gob.mx'}
|
||||
request_down = ET.SubElement(body, node_name, nsmap=nsmap)
|
||||
|
||||
# ~ node_name = f"{{{self.NS['des']}}}solicitud"
|
||||
node_name = "solicitud"
|
||||
|
||||
type_request = 'CFDI'
|
||||
if args['metadata']:
|
||||
type_request = 'Metadata'
|
||||
attr = {
|
||||
'FechaInicial': date_start.strftime(FORMAT),
|
||||
'FechaFinal': date_end.strftime(FORMAT),
|
||||
'RfcSolicitante': self._cert.rfc,
|
||||
'TipoSolicitud': type_request,
|
||||
# ~ 'RfcACuentaTerceros': '',
|
||||
}
|
||||
if args['rfc'] == 'RfcEmisor':
|
||||
attr['RfcEmisor'] = self._cert.rfc
|
||||
request = ET.SubElement(request_down, node_name, attr)
|
||||
|
||||
node_name = 'RfcReceptores'
|
||||
node_receptores = ET.SubElement(request, node_name)
|
||||
node_name = 'RfcReceptor'
|
||||
node_receptor = ET.SubElement(node_receptores, node_name)
|
||||
if args['rfc'] == 'RfcReceptor':
|
||||
node_receptor.text = self._cert.rfc
|
||||
|
||||
nsmap = {None: self.NS['xd']}
|
||||
signature = ET.SubElement(request, 'Signature', nsmap=nsmap)
|
||||
signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap)
|
||||
|
@ -192,7 +227,8 @@ class SATWebService():
|
|||
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
|
||||
signature_method = ET.SubElement(signed_info, node_name, attr)
|
||||
|
||||
attr = {'URI': '#_0'}
|
||||
# ~ attr = {'URI': '#_0'}
|
||||
attr = {'URI': ''}
|
||||
reference = ET.SubElement(signed_info, 'Reference', attr)
|
||||
transforms = ET.SubElement(reference, 'Transforms')
|
||||
ET.SubElement(transforms, 'Transform', attr1)
|
||||
|
@ -220,18 +256,25 @@ class SATWebService():
|
|||
x_serial_number.text = str(self._cert.serial_number2)
|
||||
x_cert.text = self._cert.cer_txt
|
||||
|
||||
# ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8')
|
||||
soap = ET.tostring(root)
|
||||
|
||||
# ~ soap = b'<?xml version="1.0"?>\n' + ET.tostring(root, pretty_print=True, encoding='utf-8')
|
||||
soap = ET.tostring(root, pretty_print=True)
|
||||
# ~ print(soap.decode())
|
||||
return soap
|
||||
|
||||
def request_download(self, args):
|
||||
headers = self.HEADERS.copy()
|
||||
headers['SOAPAction'] = self.ACTIONS['REQ']
|
||||
headers['Host'] = 'srvsolicituddescargamaster.cloudapp.net'
|
||||
headers['Authorization'] = f'WRAP access_token="{self._token}"'
|
||||
data = self._get_data_req(args)
|
||||
|
||||
response = httpx.post(self.URL['REQ'], data=data, headers=headers)
|
||||
try:
|
||||
response = httpx.post(self.URL['REQ'],
|
||||
data=data, headers=headers, timeout=TIMEOUT)
|
||||
except httpx.TimeoutException as exc:
|
||||
print(exc)
|
||||
return
|
||||
|
||||
if response.status_code != httpx.codes.OK:
|
||||
self._error = f'Status: {response.status_code} - {response.text}'
|
||||
return
|
||||
|
@ -244,7 +287,7 @@ class SATWebService():
|
|||
return data
|
||||
|
||||
def _get_data_verify(self, args):
|
||||
NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
|
||||
NSMAP = {'soapenv': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
|
||||
|
||||
node_name = f"{{{self.NS['s']}}}Envelope"
|
||||
root = ET.Element(node_name, nsmap=NSMAP)
|
||||
|
@ -277,7 +320,8 @@ class SATWebService():
|
|||
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
|
||||
signature_method = ET.SubElement(signed_info, node_name, attr)
|
||||
|
||||
attr = {'URI': '#_0'}
|
||||
# ~ attr = {'URI': '#_0'}
|
||||
attr = {'URI': ''}
|
||||
reference = ET.SubElement(signed_info, 'Reference', attr)
|
||||
transforms = ET.SubElement(reference, 'Transforms')
|
||||
ET.SubElement(transforms, 'Transform', attr1)
|
||||
|
@ -330,7 +374,7 @@ class SATWebService():
|
|||
return data
|
||||
|
||||
def _get_data_download(self, args):
|
||||
NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
|
||||
NSMAP = {'soapenv': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
|
||||
|
||||
node_name = f"{{{self.NS['s']}}}Envelope"
|
||||
root = ET.Element(node_name, nsmap=NSMAP)
|
||||
|
@ -363,7 +407,8 @@ class SATWebService():
|
|||
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
|
||||
signature_method = ET.SubElement(signed_info, node_name, attr)
|
||||
|
||||
attr = {'URI': '#_0'}
|
||||
# ~ attr = {'URI': '#_0'}
|
||||
attr = {'URI': ''}
|
||||
reference = ET.SubElement(signed_info, 'Reference', attr)
|
||||
transforms = ET.SubElement(reference, 'Transforms')
|
||||
ET.SubElement(transforms, 'Transform', attr1)
|
||||
|
@ -402,7 +447,13 @@ class SATWebService():
|
|||
headers['Authorization'] = f'WRAP access_token="{self._token}"'
|
||||
data = self._get_data_download(args)
|
||||
|
||||
response = httpx.post(self.URL['DOWN'], data=data, headers=headers)
|
||||
try:
|
||||
response = httpx.post(self.URL['DOWN'],
|
||||
data=data, headers=headers, timeout=TIMEOUT)
|
||||
except httpx.TimeoutException as exc:
|
||||
print(exc)
|
||||
return
|
||||
|
||||
if response.status_code != httpx.codes.OK:
|
||||
self._error = f'Status: {response.status_code} - {response.text}'
|
||||
return
|
||||
|
|
|
@ -3,11 +3,11 @@
|
|||
import getpass
|
||||
import uuid
|
||||
from calendar import monthrange
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
|
||||
from .cfdi_cert import SATCertificate
|
||||
from .cfdi_openssl import SATCertificate
|
||||
from .sat_web import SATWebService
|
||||
from settings import log
|
||||
|
||||
|
@ -76,7 +76,13 @@ def fiel_validar(args):
|
|||
if not result:
|
||||
return
|
||||
|
||||
password = getpass.getpass('Introduce la contraseña del archivo KEY: ')
|
||||
try:
|
||||
password = getpass.getpass('Introduce la contraseña del archivo KEY: ')
|
||||
except KeyboardInterrupt:
|
||||
msg = 'Proceso cancelado'
|
||||
log.info(msg)
|
||||
return
|
||||
|
||||
if not password:
|
||||
msg = 'La contraseña es requerida para validar la FIEL'
|
||||
log.error(msg)
|
||||
|
@ -113,17 +119,28 @@ def base_datos():
|
|||
|
||||
|
||||
def _get_cert(data):
|
||||
key = b''
|
||||
pem = b''
|
||||
cer = data['path_cer'].read_bytes()
|
||||
if data['path_enc'].is_file():
|
||||
key = data['path_enc'].read_bytes()
|
||||
elif data['path_pem'].is_file():
|
||||
pem = data['path_pem'].read_bytes()
|
||||
key = data['path_enc']
|
||||
pem = data['path_pem']
|
||||
cer = data['path_cer']
|
||||
cert = SATCertificate(cer, key, pem)
|
||||
return cert
|
||||
|
||||
|
||||
def _to_date(str_date, end=False):
|
||||
error = ''
|
||||
dt = None
|
||||
try:
|
||||
parts = str_date.split('-')
|
||||
if end:
|
||||
dt = datetime(int(parts[0]), int(parts[1]), int(parts[2]), 23, 59, 59)
|
||||
else:
|
||||
dt = datetime(int(parts[0]), int(parts[1]), int(parts[2]), 0, 0, 0)
|
||||
except Exception as e:
|
||||
error = 'Fecha inválida'
|
||||
|
||||
return error, dt
|
||||
|
||||
|
||||
def _validate_requests_args(args):
|
||||
result, data = _validate_fiel_args(args)
|
||||
if not result:
|
||||
|
@ -134,9 +151,6 @@ def _validate_requests_args(args):
|
|||
log.error(msg)
|
||||
return False, {}
|
||||
|
||||
# ~ cer = data['path_cer'].read_bytes()
|
||||
# ~ key = data['path_enc'].read_bytes()
|
||||
# ~ cert = SATCertificate(cer, key)
|
||||
cert = _get_cert(data)
|
||||
|
||||
if not cert.is_valid_time:
|
||||
|
@ -149,6 +163,32 @@ def _validate_requests_args(args):
|
|||
data['year'] = args.year
|
||||
data['month'] = args.month
|
||||
data['day'] = args.day
|
||||
data['metadata'] = args.metadata
|
||||
|
||||
now = today()
|
||||
|
||||
if args.last_days:
|
||||
date_start = now.replace(hour=0, minute=0, second=0, microsecond=0) \
|
||||
- timedelta(days=args.last_days)
|
||||
date_end = now.replace(hour=23, minute=59, second=59, microsecond=0)
|
||||
data['date_start'] = date_start
|
||||
data['date_end'] = date_end
|
||||
return True, data
|
||||
|
||||
if args.date_start and args.date_end:
|
||||
error, date_start = _to_date(args.date_start)
|
||||
if error:
|
||||
log.error(error)
|
||||
return False, {}
|
||||
|
||||
error, date_end = _to_date(args.date_end, True)
|
||||
if error:
|
||||
log.error(error)
|
||||
return False, {}
|
||||
|
||||
data['date_start'] = date_start
|
||||
data['date_end'] = date_end
|
||||
return True, data
|
||||
|
||||
if data['day']:
|
||||
if not validate_date(data['year'], data['month'], data['day']):
|
||||
|
@ -156,8 +196,6 @@ def _validate_requests_args(args):
|
|||
log.error(msg)
|
||||
return False, {}
|
||||
|
||||
now = today()
|
||||
|
||||
month1 = month2 = data['month']
|
||||
if month1 == 0:
|
||||
month1 = 1
|
||||
|
@ -180,14 +218,12 @@ def _validate_verificar_args(args):
|
|||
if not result:
|
||||
return False, {}
|
||||
|
||||
if not data['path_enc'].is_file():
|
||||
msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}"
|
||||
if not data['path_enc'].is_file() and not data['path_pem'].is_file():
|
||||
msg = f"No se encontró la FIEL [enc|pem].\nRuta: {data['path_enc']}"
|
||||
log.error(msg)
|
||||
return False, {}
|
||||
|
||||
cer = data['path_cer'].read_bytes()
|
||||
key = data['path_enc'].read_bytes()
|
||||
cert = SATCertificate(cer, key)
|
||||
cert = _get_cert(data)
|
||||
|
||||
if not cert.is_valid_time:
|
||||
msg = 'La FIEL no es vigente'
|
||||
|
@ -210,14 +246,12 @@ def _validate_download_args(args):
|
|||
if not result:
|
||||
return False, {}
|
||||
|
||||
if not data['path_enc'].is_file():
|
||||
msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}"
|
||||
if not data['path_enc'].is_file() and not data['path_pem'].is_file():
|
||||
msg = f"No se encontró la FIEL [enc|pem].\nRuta: {data['path_enc']}"
|
||||
log.error(msg)
|
||||
return False, {}
|
||||
|
||||
cer = data['path_cer'].read_bytes()
|
||||
key = data['path_enc'].read_bytes()
|
||||
cert = SATCertificate(cer, key)
|
||||
cert = _get_cert(data)
|
||||
|
||||
if not cert.is_valid_time:
|
||||
msg = 'La FIEL no es vigente'
|
||||
|
@ -305,6 +339,10 @@ def descargar_archivos(args):
|
|||
else:
|
||||
result = sat.verify(data)
|
||||
files = result['files']
|
||||
if result['EstadoSolicitud'] in ('1', '2'):
|
||||
msg = 'Solicitud aún no aceptada...'
|
||||
log.error(msg)
|
||||
return
|
||||
|
||||
for f in files:
|
||||
data['id_file'] = f
|
||||
|
@ -337,9 +375,11 @@ def _validate_args(args):
|
|||
return True, data
|
||||
|
||||
|
||||
def _download(sat, data, key):
|
||||
def _download(data, key):
|
||||
OK = '5000'
|
||||
|
||||
sat = SATWebService(data['cert'])
|
||||
|
||||
result = _request_download(sat, data, key)
|
||||
if result['CodEstatus'] != OK:
|
||||
log.error(result)
|
||||
|
@ -354,7 +394,9 @@ def _download(sat, data, key):
|
|||
sleep(1)
|
||||
|
||||
while True:
|
||||
sat = SATWebService(data['cert'])
|
||||
result = sat.verify(data)
|
||||
|
||||
if result['EstadoSolicitud'] in ('1', '2'):
|
||||
msg = 'Esperando un minuto más para volver a verificar...'
|
||||
log.info(msg)
|
||||
|
@ -397,12 +439,12 @@ def descargar(args):
|
|||
return
|
||||
|
||||
if data['type'] == 'e':
|
||||
_download(sat, data, 'RfcEmisor')
|
||||
_download(data, 'RfcEmisor')
|
||||
elif data['type'] == 'r':
|
||||
_download(sat, data, 'RfcReceptor')
|
||||
_download(data, 'RfcReceptor')
|
||||
else:
|
||||
_download(sat, data, 'RfcEmisor')
|
||||
_download(sat, data, 'RfcReceptor')
|
||||
_download(data, 'RfcEmisor')
|
||||
_download(data, 'RfcReceptor')
|
||||
|
||||
return
|
||||
|
||||
|
|
Loading…
Reference in New Issue