Solicitar descarga
This commit is contained in:
parent
d8d9b8bab6
commit
48155488c3
|
@ -1,3 +1,4 @@
|
||||||
httpx
|
httpx
|
||||||
peewee
|
peewee
|
||||||
cryptography
|
cryptography
|
||||||
|
lxml
|
||||||
|
|
|
@ -17,7 +17,6 @@ def main(args):
|
||||||
def _process_command_line_arguments():
|
def _process_command_line_arguments():
|
||||||
now = util.today()
|
now = util.today()
|
||||||
year = now.year
|
year = now.year
|
||||||
month = now.month
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description='CFDI Descarga SAT')
|
parser = argparse.ArgumentParser(description='CFDI Descarga SAT')
|
||||||
|
|
||||||
|
@ -34,7 +33,7 @@ def _process_command_line_arguments():
|
||||||
help = "Año de la descarga entre 2014 y el año actual (predeterminado)."
|
help = "Año de la descarga entre 2014 y el año actual (predeterminado)."
|
||||||
parser.add_argument('-a', '--año', help=help, dest='year', default=year, type=int, choices=range(2014, year+1))
|
parser.add_argument('-a', '--año', help=help, dest='year', default=year, type=int, choices=range(2014, year+1))
|
||||||
help = "Mes de la descarga, el mes actual es el predeterminado"
|
help = "Mes de la descarga, el mes actual es el predeterminado"
|
||||||
parser.add_argument('-m', '--mes', help=help, dest='month', default=month, type=int, choices=range(13))
|
parser.add_argument('-m', '--mes', help=help, dest='month', default=0, type=int, choices=range(13))
|
||||||
help = "Día de la descarga, de forma predeterminada no se usa"
|
help = "Día de la descarga, de forma predeterminada no se usa"
|
||||||
parser.add_argument('-d', '--dia', help=help, dest='day', default=0, type=int, choices=range(32))
|
parser.add_argument('-d', '--dia', help=help, dest='day', default=0, type=int, choices=range(32))
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ class SATCertificate(object):
|
||||||
self._p12 = b''
|
self._p12 = b''
|
||||||
self._cer_modulus = 0
|
self._cer_modulus = 0
|
||||||
self._key_modulus = 0
|
self._key_modulus = 0
|
||||||
|
self._issuer = ''
|
||||||
return
|
return
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
|
@ -68,6 +69,8 @@ class SATCertificate(object):
|
||||||
self._serial_number = '{0:x}'.format(obj.serial_number)[1::2]
|
self._serial_number = '{0:x}'.format(obj.serial_number)[1::2]
|
||||||
self._not_before = obj.not_valid_before
|
self._not_before = obj.not_valid_before
|
||||||
self._not_after = obj.not_valid_after
|
self._not_after = obj.not_valid_after
|
||||||
|
self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer])
|
||||||
|
|
||||||
now = datetime.datetime.utcnow()
|
now = datetime.datetime.utcnow()
|
||||||
self._is_valid_time = (now > self.not_before) and (now < self.not_after)
|
self._is_valid_time = (now > self.not_before) and (now < self.not_after)
|
||||||
if not self._is_valid_time:
|
if not self._is_valid_time:
|
||||||
|
@ -80,6 +83,7 @@ class SATCertificate(object):
|
||||||
self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
|
self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
|
||||||
self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
|
self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
|
||||||
self._cer_modulus = obj.public_key().public_numbers().n
|
self._cer_modulus = obj.public_key().public_numbers().n
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def _get_data_key(self, key, password):
|
def _get_data_key(self, key, password):
|
||||||
|
@ -203,6 +207,10 @@ class SATCertificate(object):
|
||||||
def key_enc(self):
|
def key_enc(self):
|
||||||
return self._key_enc
|
return self._key_enc
|
||||||
|
|
||||||
|
@property
|
||||||
|
def issuer(self):
|
||||||
|
return self._issuer
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def p12(self):
|
def p12(self):
|
||||||
return self._get_p12()
|
return self._get_p12()
|
||||||
|
@ -210,52 +218,3 @@ class SATCertificate(object):
|
||||||
@property
|
@property
|
||||||
def error(self):
|
def error(self):
|
||||||
return self._error
|
return self._error
|
||||||
|
|
||||||
|
|
||||||
def main(args):
|
|
||||||
contra = getpass.getpass('Introduce la contraseña del archivo KEY: ')
|
|
||||||
#contra = '12345678a'
|
|
||||||
if not contra.strip():
|
|
||||||
msg = 'La contraseña es requerida'
|
|
||||||
print(msg)
|
|
||||||
return
|
|
||||||
|
|
||||||
path_cer = Path(args.cer)
|
|
||||||
path_key = Path(args.key)
|
|
||||||
|
|
||||||
if not path_cer.is_file():
|
|
||||||
msg = 'El archivo CER es necesario'
|
|
||||||
print(msg)
|
|
||||||
return
|
|
||||||
|
|
||||||
if not path_key.is_file():
|
|
||||||
msg = 'El archivo KEY es necesario'
|
|
||||||
print(msg)
|
|
||||||
return
|
|
||||||
|
|
||||||
cer = path_cer.read_bytes()
|
|
||||||
key = path_key.read_bytes()
|
|
||||||
cert = SATCertificate(cer, key, contra)
|
|
||||||
|
|
||||||
if cert.error:
|
|
||||||
print(cert.error)
|
|
||||||
else:
|
|
||||||
print(cert)
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
def _process_command_line_arguments():
|
|
||||||
parser = argparse.ArgumentParser(description='CFDI Certificados')
|
|
||||||
|
|
||||||
help = 'Archivo CER'
|
|
||||||
parser.add_argument('-c', '--cer', help=help, default='')
|
|
||||||
help = 'Archivo KEY'
|
|
||||||
parser.add_argument('-k', '--key', help=help, default='')
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
return args
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
args = _process_command_line_arguments()
|
|
||||||
main(args)
|
|
||||||
|
|
|
@ -28,10 +28,11 @@ class SATWebService():
|
||||||
NS = {
|
NS = {
|
||||||
's': 'http://schemas.xmlsoap.org/soap/envelope/',
|
's': 'http://schemas.xmlsoap.org/soap/envelope/',
|
||||||
'u': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd',
|
'u': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd',
|
||||||
'o': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd'
|
'o': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd',
|
||||||
'des': 'http://DescargaMasivaTerceros.sat.gob.mx',
|
'des': 'http://DescargaMasivaTerceros.sat.gob.mx',
|
||||||
'xd': 'http://www.w3.org/2000/09/xmldsig#',
|
'xd': 'http://www.w3.org/2000/09/xmldsig#',
|
||||||
}
|
}
|
||||||
|
NS_RESULT = {'s': NS['s'], None: XMLNS}
|
||||||
|
|
||||||
def __init__(self, cert):
|
def __init__(self, cert):
|
||||||
self._cert = cert
|
self._cert = cert
|
||||||
|
@ -120,8 +121,7 @@ class SATWebService():
|
||||||
|
|
||||||
node_name = f"{{{self.NS['s']}}}Body"
|
node_name = f"{{{self.NS['s']}}}Body"
|
||||||
body = ET.SubElement(root, node_name)
|
body = ET.SubElement(root, node_name)
|
||||||
nsmap = {None: self.XMLNS}
|
ET.SubElement(body, 'Autentica', nsmap=self.NS_RESULT)
|
||||||
ET.SubElement(body, 'Autentica', nsmap=nsmap)
|
|
||||||
|
|
||||||
# ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8')
|
# ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8')
|
||||||
soap = ET.tostring(root)
|
soap = ET.tostring(root)
|
||||||
|
@ -146,10 +146,77 @@ class SATWebService():
|
||||||
return token
|
return token
|
||||||
|
|
||||||
def _get_data_req(self, args):
|
def _get_data_req(self, args):
|
||||||
|
NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
|
||||||
|
FORMAT = '%Y-%m-%dT%H:%M:%S'
|
||||||
|
|
||||||
return
|
date_start = args['date_start']
|
||||||
|
date_end = args['date_end']
|
||||||
|
|
||||||
def _get_request(self, args):
|
node_name = f"{{{self.NS['s']}}}Envelope"
|
||||||
|
root = ET.Element(node_name, nsmap=NSMAP)
|
||||||
|
|
||||||
|
node_name = f"{{{self.NS['s']}}}Header"
|
||||||
|
body = ET.SubElement(root, node_name)
|
||||||
|
|
||||||
|
node_name = f"{{{self.NS['s']}}}Body"
|
||||||
|
body = ET.SubElement(root, node_name)
|
||||||
|
|
||||||
|
node_name = f"{{{self.NS['des']}}}SolicitaDescarga"
|
||||||
|
request_down = ET.SubElement(body, node_name)
|
||||||
|
|
||||||
|
node_name = f"{{{self.NS['des']}}}solicitud"
|
||||||
|
attr = {
|
||||||
|
'RfcSolicitante': self._cert.rfc,
|
||||||
|
'FechaFinal': date_end.strftime(FORMAT),
|
||||||
|
'FechaInicial': date_start.strftime(FORMAT),
|
||||||
|
'TipoSolicitud': 'cfdi',
|
||||||
|
}
|
||||||
|
request = ET.SubElement(request_down, node_name, attr)
|
||||||
|
# ~ if rfc_emisor is not None:
|
||||||
|
# ~ solicitud.set('RfcEmisor', rfc_emisor)
|
||||||
|
# ~ if rfc_receptor is not None:
|
||||||
|
# ~ solicitud.set('RfcReceptor', rfc_receptor)
|
||||||
|
|
||||||
|
nsmap = {None: self.NS['xd']}
|
||||||
|
signature = ET.SubElement(request, 'Signature', nsmap=nsmap)
|
||||||
|
signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap)
|
||||||
|
|
||||||
|
node_name = 'CanonicalizationMethod'
|
||||||
|
attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'}
|
||||||
|
canonicalization = ET.SubElement(signed_info, node_name, attr)
|
||||||
|
|
||||||
|
node_name = 'SignatureMethod'
|
||||||
|
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
|
||||||
|
signature_method = ET.SubElement(signed_info, node_name, attr)
|
||||||
|
|
||||||
|
attr = {'URI': '#_0'}
|
||||||
|
reference = ET.SubElement(signed_info, 'Reference', attr)
|
||||||
|
transforms = ET.SubElement(reference, 'Transforms')
|
||||||
|
ET.SubElement(transforms, 'Transform')
|
||||||
|
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'}
|
||||||
|
ET.SubElement(reference, 'DigestMethod', attr)
|
||||||
|
|
||||||
|
dvalue = ET.tostring(request_down, method='c14n', exclusive=1)
|
||||||
|
dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest())
|
||||||
|
ET.SubElement(reference, 'DigestValue').text = dvalue
|
||||||
|
|
||||||
|
signature_value = ET.tostring(signed_info, method='c14n', exclusive=1)
|
||||||
|
signature_value = self._cert.sign_sha1(signature_value)
|
||||||
|
ET.SubElement(signature, 'SignatureValue').text = signature_value
|
||||||
|
|
||||||
|
key_info = ET.SubElement(signature, 'KeyInfo')
|
||||||
|
x_data = ET.SubElement(key_info, 'X509Data')
|
||||||
|
x_issuer_serial = ET.SubElement(x_data, 'X509IssuerSerial')
|
||||||
|
ET.SubElement(x_issuer_serial, 'X509IssuerName').text = self._cert.issuer
|
||||||
|
ET.SubElement(x_issuer_serial, 'X509SerialNumber').text = self._cert.serial_number
|
||||||
|
ET.SubElement(x_data, 'X509Certificate').text = self._cert.cer_txt
|
||||||
|
|
||||||
|
# ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8')
|
||||||
|
soap = ET.tostring(root)
|
||||||
|
|
||||||
|
return soap
|
||||||
|
|
||||||
|
def request_download(self, args):
|
||||||
headers = self.HEADERS.copy()
|
headers = self.HEADERS.copy()
|
||||||
headers['SOAPAction'] = self.ACTIONS['REQ']
|
headers['SOAPAction'] = self.ACTIONS['REQ']
|
||||||
headers['Authorization'] = f'WRAP access_token="{self._token}"'
|
headers['Authorization'] = f'WRAP access_token="{self._token}"'
|
||||||
|
@ -160,12 +227,9 @@ class SATWebService():
|
||||||
self._error = f'Status: {response.status_code} - {response.text}'
|
self._error = f'Status: {response.status_code} - {response.text}'
|
||||||
return
|
return
|
||||||
|
|
||||||
print(response.text)
|
|
||||||
result = ET.fromstring(response.text)
|
result = ET.fromstring(response.text)
|
||||||
|
node_name = 's:Body/SolicitaDescargaResponse/SolicitaDescargaResult'
|
||||||
|
node = result.find(node_name, namespaces=self.NS_RESULT)
|
||||||
|
data = dict(node.attrib)
|
||||||
|
|
||||||
return
|
return data
|
||||||
|
|
||||||
def download(self, args):
|
|
||||||
request = self._get_request(args)
|
|
||||||
print(request)
|
|
||||||
return
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
import getpass
|
import getpass
|
||||||
import uuid
|
import uuid
|
||||||
|
from calendar import monthrange
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
@ -138,6 +139,22 @@ def _validate_download_args(args):
|
||||||
log.error(msg)
|
log.error(msg)
|
||||||
return False, {}
|
return False, {}
|
||||||
|
|
||||||
|
now = today()
|
||||||
|
|
||||||
|
month1 = month2 = data['month']
|
||||||
|
if month1 == 0:
|
||||||
|
month1 = 1
|
||||||
|
month2 = 12
|
||||||
|
|
||||||
|
if data['day']:
|
||||||
|
day1 = day2 = data['day']
|
||||||
|
else:
|
||||||
|
day1 = 1
|
||||||
|
day2 = monthrange(data['year'], month2)[1]
|
||||||
|
|
||||||
|
data['date_start'] = datetime(data['year'], month1, day1)
|
||||||
|
data['date_end'] = datetime(data['year'], month2, day2, 23, 59, 59)
|
||||||
|
|
||||||
return True, data
|
return True, data
|
||||||
|
|
||||||
|
|
||||||
|
@ -148,10 +165,12 @@ def sat_download(args):
|
||||||
|
|
||||||
sat = SATWebService(data['cert'])
|
sat = SATWebService(data['cert'])
|
||||||
|
|
||||||
if sat.is_authenticate:
|
if not sat.is_authenticate:
|
||||||
sat.download(data)
|
|
||||||
else:
|
|
||||||
log.error(sat.error)
|
log.error(sat.error)
|
||||||
|
return
|
||||||
|
|
||||||
|
result = sat.request_download(data)
|
||||||
|
print(result)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue