Obtener autorización

This commit is contained in:
Mauricio Baeza 2021-07-14 00:12:21 -05:00
parent 7eb599d6eb
commit d8d9b8bab6
6 changed files with 653 additions and 0 deletions

3
.gitignore vendored
View File

@ -1,4 +1,7 @@
# ---> Python
conf.py
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

47
source/cfdi-descarga.py Executable file
View File

@ -0,0 +1,47 @@
#!/usr/bin/env python3
import argparse
from sat import util
def main(args):
if args.fiel_validar:
util.fiel_validar(args)
return
util.sat_download(args)
return
def _process_command_line_arguments():
now = util.today()
year = now.year
month = now.month
parser = argparse.ArgumentParser(description='CFDI Descarga SAT')
help = 'Valida la FIEL'
parser.add_argument('-fv', '--fiel-validar', help=help,
action='store_true', default=False, required=False)
help = 'Ruta al directorio con la FIEL'
parser.add_argument('-fd', '--fiel-dir', help=help, default='')
help = "Nombre de los archivos FIEL, el predeterminado es 'fiel'"
parser.add_argument('-fn', '--fiel-nombre', help=help, default='fiel')
help = "Descargar por Tipo: t=todos(default), e=emitidas, r=recibidas"
parser.add_argument('-t', '--tipo', help=help, dest='type', default='t', choices=['t', 'e', 'r'])
help = "Año de la descarga entre 2014 y el año actual (predeterminado)."
parser.add_argument('-a', '--año', help=help, dest='year', default=year, type=int, choices=range(2014, year+1))
help = "Mes de la descarga, el mes actual es el predeterminado"
parser.add_argument('-m', '--mes', help=help, dest='month', default=month, type=int, choices=range(13))
help = "Día de la descarga, de forma predeterminada no se usa"
parser.add_argument('-d', '--dia', help=help, dest='day', default=0, type=int, choices=range(32))
args = parser.parse_args()
return args
if __name__ == '__main__':
args = _process_command_line_arguments()
main(args)

261
source/sat/cfdi_cert.py Normal file
View File

@ -0,0 +1,261 @@
#!/usr/bin/env python3
import argparse
import base64
import datetime
import getpass
from pathlib import Path
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from conf import TOKEN
class SATCertificate(object):
def __init__(self, cer=b'', key=b'', password=''):
self._error = ''
self._init_values()
self._get_data_cer(cer)
self._get_data_key(key, password)
def _init_values(self):
self._rfc = ''
self._serial_number = ''
self._not_before = None
self._not_after = None
self._is_fiel = False
self._are_couple = False
self._is_valid_time = False
self._cer_pem = ''
self._cer_txt = ''
self._key_enc = b''
self._p12 = b''
self._cer_modulus = 0
self._key_modulus = 0
return
def __str__(self):
msg = '\tRFC: {}\n'.format(self.rfc)
msg += '\tNo de Serie: {}\n'.format(self.serial_number)
msg += '\tVálido desde: {}\n'.format(self.not_before)
msg += '\tVálido hasta: {}\n'.format(self.not_after)
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
msg += '\tSon pareja: {}\n'.format(self.are_couple)
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
return msg
def __bool__(self):
return self.is_valid
def _get_hash(self):
digest = hashes.Hash(hashes.SHA512(), default_backend())
digest.update(self._rfc.encode())
digest.update(self._serial_number.encode())
digest.update(TOKEN.encode())
return digest.finalize()
def _get_data_cer(self, cer):
obj = x509.load_der_x509_certificate(cer, default_backend())
self._rfc = obj.subject.get_attributes_for_oid(
NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0]
self._serial_number = '{0:x}'.format(obj.serial_number)[1::2]
self._not_before = obj.not_valid_before
self._not_after = obj.not_valid_after
now = datetime.datetime.utcnow()
self._is_valid_time = (now > self.not_before) and (now < self.not_after)
if not self._is_valid_time:
msg = 'El certificado no es vigente'
self._error = msg
self._is_fiel = obj.extensions.get_extension_for_oid(
ExtensionOID.KEY_USAGE).value.key_agreement
self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
self._cer_modulus = obj.public_key().public_numbers().n
return
def _get_data_key(self, key, password):
self._key_enc = key
if not key or not password:
return
try:
obj = serialization.load_der_private_key(
key, password.encode(), default_backend())
except ValueError:
msg = 'La contraseña es incorrecta'
self._error = msg
return
p = self._get_hash()
self._key_enc = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(p)
)
self._key_modulus = obj.public_key().public_numbers().n
self._are_couple = self._cer_modulus == self._key_modulus
if not self._are_couple:
msg = 'El CER y el KEY no son pareja'
self._error = msg
return
def _get_key(self, password):
if not password:
password = self._get_hash()
private_key = serialization.load_pem_private_key(
self._key_enc, password=password, backend=default_backend())
return private_key
def _get_key_pem(self):
obj = self._get_key('')
key_pem = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
return key_pem
# Not work
def _get_p12(self):
obj = serialization.pkcs12.serialize_key_and_certificates('test',
self.key_pem, self.cer_pem, None,
encryption_algorithm=serialization.NoEncryption()
)
return obj
def sign(self, data, password=''):
private_key = self._get_key(password)
firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA256())
return base64.b64encode(firma).decode()
def sign_sha1(self, data, password=''):
private_key = self._get_key(password)
firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA1())
return base64.b64encode(firma).decode()
def sign_xml(self, tree):
import xmlsec
node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature)
ctx = xmlsec.SignatureContext()
key = xmlsec.Key.from_memory(self.key_pem, xmlsec.constants.KeyDataFormatPem)
ctx.key = key
ctx.sign(node)
node = xmlsec.tree.find_node(tree, 'X509Certificate')
node.text = self.cer_txt
return tree
@property
def rfc(self):
return self._rfc
@property
def serial_number(self):
return self._serial_number
@property
def not_before(self):
return self._not_before
@property
def not_after(self):
return self._not_after
@property
def is_fiel(self):
return self._is_fiel
@property
def are_couple(self):
return self._are_couple
@property
def is_valid(self):
return not bool(self.error)
@property
def is_valid_time(self):
return self._is_valid_time
@property
def cer_pem(self):
return self._cer_pem.encode()
@property
def cer_txt(self):
return self._cer_txt
@property
def key_pem(self):
return self._get_key_pem()
@property
def key_enc(self):
return self._key_enc
@property
def p12(self):
return self._get_p12()
@property
def error(self):
return self._error
def main(args):
contra = getpass.getpass('Introduce la contraseña del archivo KEY: ')
#contra = '12345678a'
if not contra.strip():
msg = 'La contraseña es requerida'
print(msg)
return
path_cer = Path(args.cer)
path_key = Path(args.key)
if not path_cer.is_file():
msg = 'El archivo CER es necesario'
print(msg)
return
if not path_key.is_file():
msg = 'El archivo KEY es necesario'
print(msg)
return
cer = path_cer.read_bytes()
key = path_key.read_bytes()
cert = SATCertificate(cer, key, contra)
if cert.error:
print(cert.error)
else:
print(cert)
return
def _process_command_line_arguments():
parser = argparse.ArgumentParser(description='CFDI Certificados')
help = 'Archivo CER'
parser.add_argument('-c', '--cer', help=help, default='')
help = 'Archivo KEY'
parser.add_argument('-k', '--key', help=help, default='')
args = parser.parse_args()
return args
if __name__ == '__main__':
args = _process_command_line_arguments()
main(args)

171
source/sat/sat_web.py Normal file
View File

@ -0,0 +1,171 @@
#!/usr/bin/env python3
import base64
import hashlib
import uuid
from datetime import datetime, timedelta
import httpx
import lxml.etree as ET
class SATWebService():
BASE = 'https://cfdidescargamasivasolicitud.clouda.sat.gob.mx'
URL = {
'AUTH': f'{BASE}/Autenticacion/Autenticacion.svc',
'REQ': f'{BASE}/SolicitaDescargaService.svc',
}
XMLNS = 'http://DescargaMasivaTerceros.gob.mx'
ACTIONS = {
'AUTH': f'{XMLNS}/IAutenticacion/Autentica',
'REQ': f'{XMLNS}/ISolicitaDescargaService/SolicitaDescarga',
}
HEADERS = {
'Content-type': 'text/xml;charset="utf-8"',
'Accept': 'text/xml',
'Cache-Control': 'no-cache',
}
NS = {
's': 'http://schemas.xmlsoap.org/soap/envelope/',
'u': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd',
'o': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd'
'des': 'http://DescargaMasivaTerceros.sat.gob.mx',
'xd': 'http://www.w3.org/2000/09/xmldsig#',
}
def __init__(self, cert):
self._cert = cert
self._error = ''
self._token = self._get_token()
@property
def is_authenticate(self):
return bool(self._token)
@property
def error(self):
return self._error
def _get_data_auth(self):
NSMAP = {'s': self.NS['s'], 'u': self.NS['u']}
FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
UID = str(uuid.uuid4())
now = datetime.utcnow()
date_created = now.strftime(FORMAT)
date_expires = (now + timedelta(seconds=300)).strftime(FORMAT)
node_name = f"{{{self.NS['s']}}}Envelope"
root = ET.Element(node_name, nsmap=NSMAP)
node_name = f"{{{self.NS['s']}}}Header"
header = ET.SubElement(root, node_name)
node_name = f"{{{self.NS['o']}}}Security"
nsmap = {'o': self.NS['o']}
attr_name = f"{{{self.NS['s']}}}mustUnderstand"
attr = {attr_name: '1'}
security = ET.SubElement(header, node_name, attr, nsmap=nsmap)
node_name = f"{{{self.NS['u']}}}Timestamp"
attr_name = f"{{{self.NS['u']}}}Id"
attr = {attr_name: '_0'}
timestamp = ET.SubElement(security, node_name, attr)
node_name = f"{{{self.NS['u']}}}Created"
ET.SubElement(timestamp, node_name).text = date_created
node_name = f"{{{self.NS['u']}}}Expires"
ET.SubElement(timestamp, node_name).text = date_expires
node_name = f"{{{self.NS['o']}}}BinarySecurityToken"
attr = {
f"{{{self.NS['u']}}}Id": UID,
'ValueType': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3',
'EncodingType': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary'
}
ET.SubElement(security, node_name, attr).text = self._cert.cer_txt
nsmap = {None: 'http://www.w3.org/2000/09/xmldsig#'}
signature = ET.SubElement(security, 'Signature', nsmap=nsmap)
signedinfo = ET.SubElement(signature, 'SignedInfo')
attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'}
ET.SubElement(signedinfo, 'CanonicalizationMethod', attr1)
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
ET.SubElement(signedinfo, 'SignatureMethod', attr)
attr = {'URI': '#_0'}
reference = ET.SubElement(signedinfo, 'Reference', attr)
transforms = ET.SubElement(reference, 'Transforms')
ET.SubElement(transforms, 'Transform', attr1)
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'}
ET.SubElement(reference, 'DigestMethod', attr)
dvalue = ET.tostring(timestamp, method='c14n', exclusive=1)
dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest())
ET.SubElement(reference, 'DigestValue').text = dvalue
signature_value = ET.tostring(signedinfo, method='c14n', exclusive=1)
signature_value = self._cert.sign_sha1(signature_value)
ET.SubElement(signature, 'SignatureValue').text = signature_value
keyinfo = ET.SubElement(signature, 'KeyInfo')
node_name = f"{{{self.NS['o']}}}SecurityTokenReference"
security_token = ET.SubElement(keyinfo, node_name)
node_name = f"{{{self.NS['o']}}}Reference"
attr = {
'ValueType': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3',
'URI': f'#{UID}',
}
ET.SubElement(security_token, node_name, attr)
node_name = f"{{{self.NS['s']}}}Body"
body = ET.SubElement(root, node_name)
nsmap = {None: self.XMLNS}
ET.SubElement(body, 'Autentica', nsmap=nsmap)
# ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8')
soap = ET.tostring(root)
return soap
def _get_token(self):
headers = self.HEADERS.copy()
headers['SOAPAction'] = self.ACTIONS['AUTH']
data = self._get_data_auth()
response = httpx.post(self.URL['AUTH'], data=data, headers=headers)
if response.status_code != httpx.codes.OK:
self._error = f'Status: {response.status_code} - {response.text}'
return
result = ET.fromstring(response.text)
nsmap = {'s': self.NS['s'], None: self.XMLNS}
node_name = 's:Body/AutenticaResponse/AutenticaResult'
token = result.find(node_name, namespaces=nsmap).text
return token
def _get_data_req(self, args):
return
def _get_request(self, args):
headers = self.HEADERS.copy()
headers['SOAPAction'] = self.ACTIONS['REQ']
headers['Authorization'] = f'WRAP access_token="{self._token}"'
data = self._get_data_req(args)
response = httpx.post(self.URL['REQ'], data=data, headers=headers)
if response.status_code != httpx.codes.OK:
self._error = f'Status: {response.status_code} - {response.text}'
return
print(response.text)
result = ET.fromstring(response.text)
return
def download(self, args):
request = self._get_request(args)
print(request)
return

157
source/sat/util.py Normal file
View File

@ -0,0 +1,157 @@
#!/usr/bin/env python3
import getpass
import uuid
from datetime import datetime
from pathlib import Path
from .cfdi_cert import SATCertificate
from .sat_web import SATWebService
from settings import log
def today():
return datetime.today()
def validate_date(year, month, day):
try:
datetime(year, month, day, 0, 0, 0)
result = True
except ValueError:
result = False
return result
def is_dir(path):
return Path(path).is_dir()
def join(*paths):
return Path(paths[0]).joinpath(*paths[1:])
def _validate_fiel_args(args):
fiel_path = args.fiel_dir
fiel_name = args.fiel_nombre
if not fiel_path:
msg = 'El directorio con la FIEL es requerido'
log.error(msg)
return False, {}
if not is_dir(fiel_path):
msg = f'La ruta no existe o no es un directorio. \nRuta: {fiel_path}'
log.error(msg)
return False, {}
path_cer = join(fiel_path, f'{fiel_name}.cer')
path_key = join(fiel_path, f'{fiel_name}.key')
path_enc = join(fiel_path, f'{fiel_name}.enc')
if not path_cer.is_file():
msg = f'No se encontró el archivo CER. \nRuta: {path_cer}'
log.error(msg)
return False, {}
if not path_key.is_file():
msg = f'No se encontró el archivo KEY. \nRuta: {path_cer}'
log.error(msg)
return False, {}
data = {
'path_cer': path_cer,
'path_key': path_key,
'path_enc': path_enc,
}
return True, data
def fiel_validar(args):
result, data = _validate_fiel_args(args)
if not result:
return
password = getpass.getpass('Introduce la contraseña del archivo KEY: ')
if not password:
msg = 'La contraseña es requerida para validar la FIEL'
log.error(msg)
return
cer = data['path_cer'].read_bytes()
key = data['path_key'].read_bytes()
cert = SATCertificate(cer, key, password)
if cert.error:
msg = f'{cert.error}\n\nNo podrás conectarte el SAT.'
log.error(msg)
return
if not cert.is_fiel:
msg = 'El certificado no es FIEL'
log.error(msg)
return
data['path_enc'].write_bytes(cert.key_enc)
msg = 'Los datos del certificado son:'
log.info(msg)
log.info(f'\n{cert}')
msg = 'Ya puedes descargar del SAT'
log.info(msg)
return
def base_datos():
db.create_tables()
return
def _validate_download_args(args):
result, data = _validate_fiel_args(args)
if not result:
return False, {}
if not data['path_enc'].is_file():
msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}"
log.error(msg)
return False, {}
cer = data['path_cer'].read_bytes()
key = data['path_enc'].read_bytes()
cert = SATCertificate(cer, key)
if not cert.is_valid_time:
msg = 'La FIEL no es vigente'
log.error(msg)
return False, {}
data['cert'] = cert
data['type'] = args.type
data['year'] = args.year
data['month'] = args.month
data['day'] = args.day
if data['day']:
if not validate_date(data['year'], data['month'], data['day']):
msg = 'Fecha inválida'
log.error(msg)
return False, {}
return True, data
def sat_download(args):
result, data = _validate_download_args(args)
if not result:
return
sat = SATWebService(data['cert'])
if sat.is_authenticate:
sat.download(data)
else:
log.error(sat.error)
return

14
source/settings.py Normal file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env python3
import logging
from conf import DEBUG
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)