Refactory stamp and cancel

This commit is contained in:
Mauricio Baeza 2021-01-05 17:32:38 -06:00
commit 21fd903943
38 changed files with 2013 additions and 2375 deletions

View File

@ -1,3 +1,32 @@
v 1.40.0 [05-ene-2021]
----------------------
- Error: Al parsear XML en Python 3.9+
- Mejora: Agregar versión de Empresa Libre a plantilla.
- Mejora: Sellado en memoria
- Mejora: Se agrega un segundo PAC y se refactoriza el timbrado.
* **IMPORTANTE**
Es necesario seguir una serie de pasos **obligatorios** para migrar a esta
versión, **no continues hasta seguir paso a paso** estas instrucciones.
**Antes** de comenzar ten a la mano tus certificados de sello para timbrar, es
necesario subirlos de nuevo. **NO actualices si no tienes tus certificados**
con su respectiva contraseña, te quedarás sin poder timbrar.
1. Entra a la parte administrativa y toma de tus credenciales de timbrado en el
menú "Emisor" ficha "Otros Datos", usuario y token de timbrado.
1. Agregar nuevo requerimiento `pip install xmlsec`
1. Actualizar `git pull origin master`
1. Entrar a `source/app/controllers/pacs` y copiar `conf.py.example` a `conf.py`
1. Reiniciar el servicio: `sudo systemctl restart empresalibre`
1. Sube de nuevo tus certificados en el menú "Emisor" ficha "Certificado".
1. Ve al menú "Opciones", ficha "Otros".
1. Selecciona tu PAC, si tu usuario es un correo electrónico, invariablemente
debes seleccionar Finkok.
1. Establece las credenciales del punto 1.
1. Guarda los datos.
v 1.39.1 [17-sep-2020]
----------------------
- Error: Esquema para complemento IEDU

View File

@ -10,16 +10,17 @@ Este proyecto está en continuo desarrollo, contratar un esquema de soporte,
nos ayuda a continuar su desarrollo. Ponte en contacto con nosotros para
contratar: administracion ARROBA empresalibre.net
#### Ahora también puede aportar con Bitcoin Cash (BCH):
#### Ahora también puede aportar con criptomonedas:
`pq763fj7kxxf2wtf360lfsy5ydw84yz72q76hanhxq`
BCH: `qztd3l00xle5tffdqvh2snvadkuau2ml0uqm4n875d`
BTC: `3FhiXcXmAesmQzrNEngjHFnvaJRhU1AGWV`
### Requerimientos:
* Servidor web, recomendado Nginx
* uwsgi
* python3.6+
* python3.7+
* xsltproc
* openssl
* xmlsec

View File

@ -1 +1 @@
1.39.1
1.40.0

View File

@ -13,3 +13,10 @@ pypng
reportlab
psycopg2-binary
cryptography
xmlsec
# escpos
# pyusb
# pyserial
# qrcode

View File

@ -139,8 +139,9 @@ class CFDI(object):
return self._to_pretty_xml(ET.tostring(self._cfdi, encoding='utf-8'))
def add_sello(self, sello):
def add_sello(self, sello, cert_txt):
self._cfdi.attrib['Sello'] = sello
self._cfdi.attrib['Certificado'] = cert_txt
return self._to_pretty_xml(ET.tostring(self._cfdi, encoding='utf-8'))
def _to_pretty_xml(self, source):

View File

@ -1,13 +0,0 @@
#!/usr/bin/env python3
DEBUG = False
#~ Ecodex
ID_INTEGRADOR = ''
#~ Finkok
FINKOK= {
'USER': '',
'PASS': '',
}

View File

@ -1,90 +0,0 @@
#!/usr/bin/env python3
from .conf import DEBUG, ID_INTEGRADOR, FINKOK
DEBUG = DEBUG
TIMEOUT = 10
#~ PACs que han proporcionado un entorno de pruebas libre y abierto
#~ ecodex, finkok
PAC = 'finkok'
def ecodex(debug):
NEW_SERVER = True
auth = {'ID': ID_INTEGRADOR}
if debug:
#~ No cambies este ID de pruebas
auth = {'ID': '2b3a8764-d586-4543-9b7e-82834443f219'}
base_url = 'https://servicios.ecodex.com.mx:4043/Servicio{}.svc?wsdl'
if NEW_SERVER:
base_url = 'https://serviciosnominas.ecodex.com.mx:4043/Servicio{}.svc?wsdl'
base_api = 'https://api.ecodex.com.mx/{}'
if debug:
base_url = 'https://wsdev.ecodex.com.mx:2045/Servicio{}.svc?wsdl'
base_api = 'https://pruebasapi.ecodex.com.mx/{}'
url = {
'seguridad': base_url.format('Seguridad'),
'clients': base_url.format('Clientes'),
'timbra': base_url.format('Timbrado'),
'token': base_api.format('token?version=2'),
'docs': base_api.format('api/documentos'),
'hash': base_api.format('api/Documentos/{}'),
'codes': {
'HASH': 'DUPLICIDAD EN HASH',
}
}
return auth, url
#~ IMPORTANTE: Si quieres hacer pruebas, con tu propio correo de usuario y
#~ contraseña, ponte en contacto con Finkok para que te asignen tus datos de
#~ acceso, consulta su documentación para ver las diferentes opciones de acceso.
#~ Si solo estas haciendo pruebas de timbrado y ancelación, con estos datos debería
#~ ser suficiente.
def finkok(debug):
USER = FINKOK['USER']
PASS = FINKOK['PASS']
TOKEN = ''
auth = {
'DEBUG': debug,
'USER': '',
'PASS': TOKEN or PASS,
'RESELLER': {'USER': USER, 'PASS': PASS}
}
if debug:
USER = 'pruebas-finkok@correolibre.net'
PASS = ''
TOKEN = '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366'
auth = {
'DEBUG': debug,
'USER': USER,
'PASS': TOKEN or PASS,
'RESELLER': {
'USER': '',
'PASS': ''
}
}
base_url = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
if debug:
base_url = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
url = {
'timbra': base_url.format('stamp'),
'quick_stamp': False,
'cancel': base_url.format('cancel'),
'client': base_url.format('registration'),
'util': base_url.format('utilities'),
'codes': {
'200': 'Comprobante timbrado satisfactoriamente',
'307': 'Comprobante timbrado previamente',
'205': 'No Encontrado',
}
}
return auth, url
AUTH, URL = globals()[PAC](DEBUG)

View File

@ -632,3 +632,19 @@ class AppSociosCuentasBanco(object):
req.context['result'] = self._db.partners_accounts_bank(values)
resp.status = falcon.HTTP_200
class AppCert(object):
def __init__(self, db):
self._db = db
def on_get(self, req, resp):
values = req.params
req.context['result'] = self._db.cert_get(values)
resp.status = falcon.HTTP_200
def on_post(self, req, resp):
values = req.params
req.context['result'] = self._db.cert_post(values)
resp.status = falcon.HTTP_200

View File

@ -1,755 +0,0 @@
#!/usr/bin/env python3
#~ import re
#~ from xml.etree import ElementTree as ET
#~ from requests import Request, Session, exceptions
import datetime
import hashlib
import os
import requests
import time
from lxml import etree
from xml.dom.minidom import parseString
from xml.sax.saxutils import escape, unescape
from uuid import UUID
from logbook import Logger
from zeep import Client
from zeep.plugins import HistoryPlugin
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError
if __name__ == '__main__':
from configpac import DEBUG, TIMEOUT, AUTH, URL
else:
from .configpac import DEBUG, TIMEOUT, AUTH, URL
log = Logger('PAC')
#~ node = client.create_message(client.service, SERVICE, **args)
#~ print(etree.tostring(node, pretty_print=True).decode())
class Ecodex(object):
def __init__(self, auth, url):
self.auth = auth
self.url = url
self.codes = self.url['codes']
self.error = ''
self.message = ''
self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
self._plugins = None
self._history = None
if DEBUG:
self._history = HistoryPlugin()
self._plugins = [self._history]
def _get_token(self, rfc):
client = Client(self.url['seguridad'],
transport=self._transport, plugins=self._plugins)
try:
result = client.service.ObtenerToken(rfc, self._get_epoch())
except Fault as e:
self.error = str(e)
log.error(self.error)
return ''
s = '{}|{}'.format(self.auth['ID'], result.Token)
return hashlib.sha1(s.encode()).hexdigest()
def _get_token_rest(self, rfc):
data = {
'rfc': rfc,
'grant_type': 'authorization_token',
}
headers = {'Content-type': 'application/x-www-form-urlencoded'}
result = requests.post(URL['token'], data=data, headers=headers)
data = result.json()
s = '{}|{}'.format(AUTH['ID'], data['service_token'])
return hashlib.sha1(s.encode()).hexdigest(), data['access_token']
def _validate_xml(self, xml):
NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
if os.path.isfile(xml):
tree = etree.parse(xml).getroot()
else:
tree = etree.fromstring(xml.encode())
fecha = tree.get('Fecha')
rfc = tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
data = {
'ComprobanteXML': etree.tostring(tree).decode(),
'RFC': rfc,
'Token': self._get_token(rfc),
'TransaccionID': self._get_epoch(fecha),
}
return data
def _get_by_hash(self, sh, rfc):
token, access_token = self._get_token_rest(rfc)
url = URL['hash'].format(sh)
headers = {
'Authorization': 'Bearer {}'.format(access_token),
'X-Auth-Token': token,
}
result = requests.get(url, headers=headers)
if result.status_code == 200:
print (result.json())
return
def timbra_xml(self, xml):
data = self._validate_xml(xml)
client = Client(self.url['timbra'],
transport=self._transport, plugins=self._plugins)
try:
result = client.service.TimbraXML(**data)
except Fault as e:
error = str(e)
if self.codes['HASH'] in error:
sh = error.split(' ')[3]
return self._get_by_hash(sh[:40], data['RFC'])
self.error = error
return ''
tree = parseString(result.ComprobanteXML.DatosXML)
xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
return xml
def _get_epoch(self, date=None):
if isinstance(date, str):
f = '%Y-%m-%dT%H:%M:%S'
e = int(time.mktime(time.strptime(date, f)))
else:
date = datetime.datetime.now()
e = int(time.mktime(date.timetuple()))
return e
def estatus_cuenta(self, rfc):
#~ Codigos:
#~ 100 = Cuenta encontrada
#~ 101 = RFC no dado de alta en el sistema ECODEX
token = self._get_token(rfc)
if not token:
return {}
data = {
'RFC': rfc,
'Token': token,
'TransaccionID': self._get_epoch()
}
client = Client(URL['clients'],
transport=self._transport, plugins=self._plugins)
try:
result = client.service.EstatusCuenta(**data)
except Fault as e:
log.error(str(e))
return
#~ print (result)
return result.Estatus
class Finkok(object):
def __init__(self, auth={}):
self.codes = URL['codes']
self.error = ''
self.message = ''
self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
self._plugins = None
self._history = None
self.uuid = ''
self.fecha = None
if DEBUG:
self._history = HistoryPlugin()
self._plugins = [self._history]
self._auth = AUTH
else:
self._auth = auth
def _debug(self):
if not DEBUG:
return
print('SEND', self._history.last_sent)
print('RESULT', self._history.last_received)
return
def _check_result(self, method, result):
# ~ print ('CODE', result.CodEstatus)
# ~ print ('INCIDENCIAS', result.Incidencias)
self.message = ''
MSG = {
'OK': 'Comprobante timbrado satisfactoriamente',
'307': 'Comprobante timbrado previamente',
}
status = result.CodEstatus
if status is None and result.Incidencias:
for i in result.Incidencias['Incidencia']:
self.error += 'Error: {}\n{}\n{}'.format(
i['CodigoError'], i['MensajeIncidencia'], i['ExtraInfo'])
return ''
if method == 'timbra' and status in (MSG['OK'], MSG['307']):
#~ print ('UUID', result.UUID)
#~ print ('FECHA', result.Fecha)
if status == MSG['307']:
self.message = MSG['307']
tree = parseString(result.xml)
response = tree.toprettyxml(encoding='utf-8').decode('utf-8')
self.uuid = result.UUID
self.fecha = result.Fecha
return response
def _load_file(self, path):
try:
with open(path, 'rb') as f:
data = f.read()
except Exception as e:
self.error = str(e)
return
return data
def _validate_xml(self, file_xml):
if os.path.isfile(file_xml):
try:
with open(file_xml, 'rb') as f:
xml = f.read()
except Exception as e:
self.error = str(e)
return False, ''
else:
xml = file_xml.encode('utf-8')
return True, xml
def _validate_uuid(self, uuid):
try:
UUID(uuid)
return True
except ValueError:
self.error = 'UUID no válido: {}'.format(uuid)
return False
def timbra_xml(self, file_xml):
self.error = ''
if not DEBUG and not self._auth:
self.error = 'Sin datos para timbrar'
return
method = 'timbra'
ok, xml = self._validate_xml(file_xml)
if not ok:
return ''
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'xml': xml,
}
if URL['quick_stamp']:
try:
result = client.service.quick_stamp(**args)
except Fault as e:
self.error = str(e)
return
else:
try:
result = client.service.stamp(**args)
except Fault as e:
self.error = str(e)
return
except TransportError as e:
if '413' in str(e):
self.error = '413<BR><BR><b>Documento muy grande para timbrar</b>'
else:
self.error = str(e)
return
except ConnectionError as e:
msg = '502 - Error de conexión'
self.error = msg
return
return self._check_result(method, result)
def _get_xml(self, uuid):
if not self._validate_uuid(uuid):
return ''
method = 'util'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'uuid': uuid,
'taxpayer_id': self.rfc,
'invoice_type': 'I',
}
try:
result = client.service.get_xml(**args)
except Fault as e:
self.error = str(e)
return ''
except TransportError as e:
self.error = str(e)
return ''
if result.error:
self.error = result.error
return ''
tree = parseString(result.xml)
xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
return xml
def recupera_xml(self, file_xml='', uuid=''):
self.error = ''
if uuid:
return self._get_xml(uuid)
method = 'timbra'
ok, xml = self._validate_xml(file_xml)
if not ok:
return ''
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
try:
result = client.service.stamped(
xml, self._auth['user'], self._auth['pass'])
except Fault as e:
self.error = str(e)
return ''
return self._check_result(method, result)
def estatus_xml(self, uuid):
method = 'timbra'
if not self._validate_uuid(uuid):
return ''
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
try:
result = client.service.query_pending(
self._auth['USER'], self._auth['PASS'], uuid)
return result.status
except Fault as e:
self.error = str(e)
return ''
def cancel_xml(self, rfc, uuid, cer, key):
# ~ for u in uuids:
# ~ if not self._validate_uuid(u):
# ~ return ''
method = 'cancel'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
uuid_type = client.get_type('ns1:UUIDS')
sa = client.get_type('ns0:stringArray')
args = {
'UUIDS': uuid_type(uuids=sa(string=uuid)),
'username': self._auth['USER'],
'password': self._auth['PASS'],
'taxpayer_id': rfc,
'cer': cer,
'key': key,
'store_pending': False,
}
try:
result = client.service.cancel(**args)
except Fault as e:
self.error = str(e)
return ''
if result.CodEstatus and self.codes['205'] in result.CodEstatus:
self.error = result.CodEstatus
return ''
return result
def cancel_signature(self, file_xml):
method = 'cancel'
if os.path.isfile(file_xml):
root = etree.parse(file_xml).getroot()
else:
root = etree.fromstring(file_xml.encode())
xml = etree.tostring(root)
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'xml': xml,
'store_pending': False,
}
try:
result = client.service.cancel_signature(**args)
return result
except Fault as e:
self.error = str(e)
return ''
def get_acuse(self, rfc, uuids, type_acuse='C'):
for u in uuids:
if not self._validate_uuid(u):
return ''
method = 'cancel'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'taxpayer_id': rfc,
'uuid': '',
'type': type_acuse,
}
try:
result = []
for u in uuids:
args['uuid'] = u
r = client.service.get_receipt(**args)
result.append(r)
except Fault as e:
self.error = str(e)
return ''
return result
def estatus_cancel(self, uuids):
for u in uuids:
if not self._validate_uuid(u):
return ''
method = 'cancel'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'uuid': '',
}
try:
result = []
for u in uuids:
args['uuid'] = u
r = client.service.query_pending_cancellation(**args)
result.append(r)
except Fault as e:
self.error = str(e)
return ''
return result
def add_token(self, rfc, email):
"""Agrega un nuevo token al cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente, ya debe existir
email (str): El correo del cliente, funciona como USER al timbrar
Returns:
dict
'username': 'username',
'status': True or False
'name': 'name',
'success': True or False
'token': 'Token de timbrado',
'message': None
"""
auth = AUTH['RESELLER']
method = 'util'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'name': rfc,
'token_username': email,
'taxpayer_id': rfc,
'status': True,
}
try:
result = client.service.add_token(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def get_date(self):
method = 'util'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
try:
result = client.service.datetime(AUTH['USER'], AUTH['PASS'])
except Fault as e:
self.error = str(e)
return ''
if result.error:
self.error = result.error
return
return result.datetime
def add_client(self, rfc, type_user=False):
"""Agrega un nuevo cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del nuevo cliente
Kwargs:
type_user (bool): False == 'P' == Prepago or True == 'O' == On demand
Returns:
dict
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = AUTH['RESELLER']
tu = {False: 'P', True: 'O'}
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
'type_user': tu[type_user],
'added': datetime.datetime.now().isoformat()[:19],
}
try:
result = client.service.add(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def edit_client(self, rfc, status=True):
"""
Se requiere cuenta de reseller para usar este método
status = 'A' or 'S'
"""
auth = AUTH['RESELLER']
sv = {False: 'S', True: 'A'}
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
'status': sv[status],
}
try:
result = client.service.edit(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def get_client(self, rfc):
"""Regresa el estatus del cliente
.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del emisor
Returns:
dict
'message': None,
'users': {
'ResellerUser': [
{
'status': 'A',
'counter': 0,
'taxpayer_id': '',
'credit': 0
}
]
} or None si no existe
"""
auth = AUTH['RESELLER']
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
}
try:
result = client.service.get(**args)
except Fault as e:
self.error = str(e)
return ''
except TransportError as e:
self.error = str(e)
return ''
return result
def assign_client(self, rfc, credit):
"""Agregar credito a un emisor
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del emisor, debe existir
credit (int): Cantidad de folios a agregar
Returns:
dict
'success': True or False,
'credit': nuevo credito despues de agregar or None
'message':
'Success, added {credit} of credit to {RFC}'
'RFC no encontrado'
"""
auth = AUTH['RESELLER']
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'taxpayer_id': rfc,
'credit': credit,
}
try:
result = client.service.assign(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def client_get_timbres(self, rfc):
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': self._auth['USER'],
'reseller_password': self._auth['PASS'],
'taxpayer_id': rfc,
}
try:
self.result = client.service.get(**args)
except Fault as e:
self.error = str(e)
return 0
except TransportError as e:
self.error = str(e)
return 0
except ConnectionError:
self.error = 'Verifica la conexión a internet'
return 0
success = bool(self.result.users)
if not success:
self.error = self.result.message or 'RFC no existe'
return 0
return self.result.users.ResellerUser[0].credit
def _get_data_sat(path):
BF = 'string(//*[local-name()="{}"]/@{})'
NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
try:
if os.path.isfile(path):
tree = etree.parse(path).getroot()
else:
tree = etree.fromstring(path.encode())
data = {}
emisor = escape(
tree.xpath('string(//cfdi:Emisor/@rfc)', namespaces=NS_CFDI) or
tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
)
receptor = escape(
tree.xpath('string(//cfdi:Receptor/@rfc)', namespaces=NS_CFDI) or
tree.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=NS_CFDI)
)
data['total'] = tree.get('total') or tree.get('Total')
data['emisor'] = emisor
data['receptor'] = receptor
data['uuid'] = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID'))
except Exception as e:
print (e)
return {}
return '?re={emisor}&amp;rr={receptor}&amp;tt={total}&amp;id={uuid}'.format(**data)
def get_status_sat(xml):
data = _get_data_sat(xml)
if not data:
return 'XML inválido'
data = """<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body>
<Consulta xmlns="http://tempuri.org/">
<expresionImpresa>
{}
</expresionImpresa>
</Consulta>
</soap:Body>
</soap:Envelope>""".format(data)
headers = {
'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"',
'Content-type': 'text/xml; charset="UTF-8"'
}
URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc'
try:
result = requests.post(URL, data=data, headers=headers)
tree = etree.fromstring(result.text)
node = tree.xpath("//*[local-name() = 'Estado']")[0]
except Exception as e:
return 'Error: {}'.format(str(e))
return node.text
def main():
return
if __name__ == '__main__':
main()

View File

@ -0,0 +1,4 @@
#!/usr/bin/env python
from .comerciodigital import PACComercioDigital
from .finkok import PACFinkok

View File

@ -0,0 +1,261 @@
#!/usr/bin/env python3
import argparse
import base64
import datetime
import getpass
from pathlib import Path
import xmlsec
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from .conf import TOKEN
class SATCertificate(object):
def __init__(self, cer=b'', key=b'', password=''):
self._error = ''
self._init_values()
self._get_data_cer(cer)
self._get_data_key(key, password)
def _init_values(self):
self._rfc = ''
self._serial_number = ''
self._not_before = None
self._not_after = None
self._is_fiel = False
self._are_couple = False
self._is_valid_time = False
self._cer = b''
self._cer_pem = ''
self._cer_txt = ''
self._key_enc = b''
self._p12 = b''
self._cer_modulus = 0
self._key_modulus = 0
return
def __str__(self):
msg = '\tRFC: {}\n'.format(self.rfc)
msg += '\tNo de Serie: {}\n'.format(self.serial_number)
msg += '\tVálido desde: {}\n'.format(self.not_before)
msg += '\tVálido hasta: {}\n'.format(self.not_after)
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
msg += '\tSon pareja: {}\n'.format(self.are_couple)
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
return msg
def __bool__(self):
return self.is_valid
def _get_hash(self):
digest = hashes.Hash(hashes.SHA512(), default_backend())
digest.update(self._rfc.encode())
digest.update(self._serial_number.encode())
digest.update(TOKEN.encode())
return digest.finalize()
def _get_data_cer(self, cer):
self._cer = cer
obj = x509.load_der_x509_certificate(cer, default_backend())
self._rfc = obj.subject.get_attributes_for_oid(
NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0]
self._serial_number = '{0:x}'.format(obj.serial_number)[1::2]
self._not_before = obj.not_valid_before
self._not_after = obj.not_valid_after
now = datetime.datetime.utcnow()
self._is_valid_time = (now > self.not_before) and (now < self.not_after)
if not self._is_valid_time:
msg = 'El certificado no es vigente'
self._error = msg
self._is_fiel = obj.extensions.get_extension_for_oid(
ExtensionOID.KEY_USAGE).value.key_agreement
self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
self._cer_modulus = obj.public_key().public_numbers().n
return
def _get_data_key(self, key, password):
self._key_enc = key
if not key or not password:
return
try:
obj = serialization.load_der_private_key(
key, password.encode(), default_backend())
except ValueError:
msg = 'La contraseña es incorrecta'
self._error = msg
return
p = self._get_hash()
self._key_enc = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(p)
)
self._key_modulus = obj.public_key().public_numbers().n
self._are_couple = self._cer_modulus == self._key_modulus
if not self._are_couple:
msg = 'El CER y el KEY no son pareja'
self._error = msg
return
def _get_key(self, password):
if not password:
password = self._get_hash()
private_key = serialization.load_pem_private_key(
self._key_enc, password=password, backend=default_backend())
return private_key
def _get_key_pem(self):
obj = self._get_key('')
key_pem = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
return key_pem
# Not work
def _get_p12(self):
obj = serialization.pkcs12.serialize_key_and_certificates('test',
self.key_pem, self.cer_pem, None,
encryption_algorithm=serialization.NoEncryption()
)
return obj
def sign(self, data, password=''):
private_key = self._get_key(password)
firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA256())
return base64.b64encode(firma).decode()
def sign_xml(self, tree):
node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature)
ctx = xmlsec.SignatureContext()
key = xmlsec.Key.from_memory(self.key_pem, xmlsec.constants.KeyDataFormatPem)
ctx.key = key
ctx.sign(node)
node = xmlsec.tree.find_node(tree, 'X509Certificate')
node.text = self.cer_txt
return tree
@property
def rfc(self):
return self._rfc
@property
def serial_number(self):
return self._serial_number
@property
def not_before(self):
return self._not_before
@property
def not_after(self):
return self._not_after
@property
def is_fiel(self):
return self._is_fiel
@property
def are_couple(self):
return self._are_couple
@property
def is_valid(self):
return not bool(self.error)
@property
def is_valid_time(self):
return self._is_valid_time
@property
def cer(self):
return self._cer
@property
def cer_pem(self):
return self._cer_pem.encode()
@property
def cer_txt(self):
return self._cer_txt
@property
def key_pem(self):
return self._get_key_pem()
@property
def key_enc(self):
return self._key_enc
@property
def p12(self):
return self._get_p12()
@property
def error(self):
return self._error
def main(args):
# ~ contra = getpass.getpass('Introduce la contraseña del archivo KEY: ')
contra = '12345678a'
if not contra.strip():
msg = 'La contraseña es requerida'
print(msg)
return
path_cer = Path(args.cer)
path_key = Path(args.key)
if not path_cer.is_file():
msg = 'El archivo CER es necesario'
print(msg)
return
if not path_key.is_file():
msg = 'El archivo KEY es necesario'
print(msg)
return
cer = path_cer.read_bytes()
key = path_key.read_bytes()
cert = SATCertificate(cer, key, contra)
if cert.error:
print(cert.error)
else:
print(cert)
return
def _process_command_line_arguments():
parser = argparse.ArgumentParser(description='CFDI Certificados')
help = 'Archivo CER'
parser.add_argument('-c', '--cer', help=help, default='')
help = 'Archivo KEY'
parser.add_argument('-k', '--key', help=help, default='')
args = parser.parse_args()
return args
if __name__ == '__main__':
args = _process_command_line_arguments()
main(args)

View File

@ -23,6 +23,8 @@ import lxml.etree as ET
import requests
from requests.exceptions import ConnectionError
from .conf import DEBUG, AUTH
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
@ -31,16 +33,10 @@ logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
logging.getLogger('requests').setLevel(logging.ERROR)
try:
from .conf import DEBUG, AUTH
except ImportError:
DEBUG = False
log.debug('Need make conf.py')
TIMEOUT = 10
@ -51,6 +47,7 @@ class PACComercioDigital(object):
'timbra': ws.format('ws', 'timbre/timbrarV5.aspx'),
'cancel': ws.format('cancela', 'cancela3/cancelarUuid'),
'cancelxml': ws.format('cancela', 'cancela3/cancelarXml'),
'status': ws.format('cancela', 'arws/consultaEstatus'),
'client': api.format('x3/altaEmpresa'),
'saldo': api.format('x3/saldo'),
'timbres': api.format('x3/altaTimbres'),
@ -59,6 +56,7 @@ class PACComercioDigital(object):
'000': '000 Exitoso',
'004': '004 RFC {} ya esta dado de alta con Estatus=A',
'704': '704 Usuario Invalido',
'702': '702 Error rfc/empresa invalido',
}
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
@ -67,10 +65,12 @@ class PACComercioDigital(object):
if DEBUG:
ws = 'https://pruebas.comercio-digital.mx/{}'
ws6 = 'https://pruebas6.comercio-digital.mx/arws/{}'
URL = {
'timbra': ws.format('timbre/timbrarV5.aspx'),
'cancel': ws.format('cancela3/cancelarUuid'),
'cancelxml': ws.format('cancela3/cancelarXml'),
'status': ws6.format('consultaEstatus'),
'client': api.format('x3/altaEmpresa'),
'saldo': api.format('x3/saldo'),
'timbres': api.format('x3/altaTimbres'),
@ -78,8 +78,8 @@ class PACComercioDigital(object):
def __init__(self):
self.error = ''
self.cfdi_uuid = ''
self.date_stamped = ''
# ~ self.cfdi_uuid = ''
# ~ self.date_stamped = ''
def _error(self, msg):
self.error = str(msg)
@ -133,21 +133,26 @@ class PACComercioDigital(object):
xml = result.content
tree = ET.fromstring(xml)
self.cfdi_uuid = tree.xpath(
cfdi_uuid = tree.xpath(
'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)',
namespaces=self.NS_CFDI)
self.date_stamped = tree.xpath(
date_stamped = tree.xpath(
'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@FechaTimbrado)',
namespaces=self.NS_CFDI)
return xml.decode()
data = {
'xml': xml.decode(),
'uuid': cfdi_uuid,
'date': date_stamped,
}
return data
def _get_data_cancel(self, cfdi, info, auth):
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
tree = ET.fromstring(cfdi)
tree = ET.fromstring(cfdi.encode())
tipo = tree.xpath(
'string(//cfdi:Comprobante/@TipoDeComprobante)',
namespaces=NS_CFDI)
@ -197,15 +202,15 @@ class PACComercioDigital(object):
self._error(result.headers['errmsg'])
return ''
return result.content
return result.text
def _get_headers_cancel_xml(self, cfdi, info, auth):
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
tree = ET.fromstring(cfdi)
tipo = tree.xpath(
tree = ET.fromstring(cfdi.encode())
tipocfdi = tree.xpath(
'string(//cfdi:Comprobante/@TipoDeComprobante)',
namespaces=NS_CFDI)
total = tree.xpath(
@ -220,15 +225,16 @@ class PACComercioDigital(object):
'pwdws': auth['pass'],
'rfcr': rfc_receptor,
'total': total,
'tipocfdi': tipo,
'tipocfdi': tipocfdi,
}
headers.update(info)
return headers
def cancel_xml(self, cfdi, xml, info, auth={}):
if not auth:
def cancel_xml(self, xml, auth={}, cfdi='', info={'tipo': 'cfdi3.3'}):
if DEBUG or not auth:
auth = AUTH
url = self.URL['cancelxml']
headers = self._get_headers_cancel_xml(cfdi, info, auth)
result = self._post(url, xml, headers)
@ -243,7 +249,39 @@ class PACComercioDigital(object):
self._error(result.headers['errmsg'])
return ''
return result.content
tree = ET.fromstring(result.text)
date_cancel = tree.xpath('string(//Acuse/@Fecha)')[:19]
data = {
'acuse': result.text,
'date': date_cancel,
}
return data
def status(self, data, auth={}):
if not auth:
auth = AUTH
url = self.URL['status']
data = (
f"USER={auth['user']}",
f"PWDW={auth['pass']}",
f"RFCR={data['rfc_receptor']}",
f"RFCE={data['rfc_emisor']}",
f"TOTAL={data['total']}",
f"UUID={data['uuid']}",
)
data = '\n'.join(data)
result = self._post(url, data)
if result is None:
return ''
if result.status_code != 200:
self._error(result.status_code)
return self.error
return result.text
def _get_data_client(self, auth, values):
data = [f"usr_ws={auth['user']}", f"pwd_ws={auth['pass']}"]
@ -299,6 +337,7 @@ class PACComercioDigital(object):
'Host': host,
'Connection' : 'Keep-Alive',
}
data = {'usr': data['rfc'], 'pwd': data['password']}
try:
result = requests.get(url, params=data, headers=headers, timeout=TIMEOUT)
except ConnectionError as e:
@ -312,6 +351,10 @@ class PACComercioDigital(object):
self._error(result.text)
return ''
if result.text == self.CODES['702']:
self._error(result.text)
return ''
return result.text
def client_add_timbres(self, data, auth={}):

View File

@ -17,14 +17,11 @@
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
# ~ Siempre consulta la documentación de Finkok
# ~ AUTH = Puedes usar credenciales genericas para timbrar, o exclusivas para
# ~ cada emisor
# ~ RESELLER = Algunos procesos como agregar emisores, solo pueden ser usadas
# ~ con una cuenta de reseller
# ~ Siempre consulta la documentación de PAC
# ~ AUTH = Las credenciales de timbrado proporcionadas por el PAC
# ~ NO cambies las credenciales de prueba
DEBUG = False
DEBUG = True
AUTH = {

View File

@ -0,0 +1,6 @@
#!/usr/bin/env python3
DEBUG = False
TOKEN = ''

View File

@ -0,0 +1,3 @@
#!/usr/bin/env python3
from .finkok import PACFinkok

View File

@ -0,0 +1,46 @@
#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
# ~ Siempre consulta la documentación de PAC
# ~ AUTH = Las credenciales de timbrado proporcionadas por el PAC
# ~ NO cambies las credenciales de prueba
DEBUG = True
AUTH = {
'user': '',
'pass': '',
'RESELLER': {
'user': '',
'pass': ''
}
}
if DEBUG:
AUTH = {
'user': 'pruebas-finkok@correolibre.net',
'pass': '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366',
'RESELLER': {
'user': '',
'pass': ''
}
}

View File

@ -0,0 +1,559 @@
#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import logging
import os
import re
from io import BytesIO
from xml.sax.saxutils import unescape
import lxml.etree as ET
from zeep import Client
from zeep.plugins import Plugin
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError
from .conf import DEBUG, AUTH
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
logging.getLogger('requests').setLevel(logging.ERROR)
logging.getLogger('zeep').setLevel(logging.ERROR)
TIMEOUT = 10
DEBUG_SOAP = False
class DebugPlugin(Plugin):
def _to_string(self, envelope, name):
if DEBUG_SOAP:
data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode()
path = f'/tmp/soap_{name}.xml'
with open(path, 'w') as f:
f.write(data)
return
def egress(self, envelope, http_headers, operation, binding_options):
self._to_string(envelope, 'request')
return envelope, http_headers
def ingress(self, envelope, http_headers, operation):
self._to_string(envelope, 'response')
return envelope, http_headers
class PACFinkok(object):
WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
if DEBUG:
WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
URL = {
'quick_stamp': False,
'timbra': WS.format('stamp'),
'cancel': WS.format('cancel'),
'client': WS.format('registration'),
'util': WS.format('utilities'),
}
CODE = {
'200': 'Comprobante timbrado satisfactoriamente',
'205': 'No Encontrado',
'307': 'Comprobante timbrado previamente',
'702': 'No se encontro el RFC del emisor',
'IP': 'Invalid Passphrase',
'IPMSG': 'Frase de paso inválida',
'NE': 'No Encontrado',
}
def __init__(self):
self._error = ''
self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
self._plugins = [DebugPlugin()]
@property
def error(self):
return self._error
def _validate_result(self, result):
if hasattr(result, 'CodEstatus'):
ce = result.CodEstatus
if ce is None:
return result
if ce == self.CODE['IP']:
self._error = self.CODE['IPMSG']
return {}
if self.CODE['NE'] in ce:
self._error = 'UUID ' + self.CODE['NE']
return {}
if ce == 'UUID Not Found':
self._error = 'UUID ' + self.CODE['NE']
return {}
if self.CODE['200'] != ce:
log.error('CodEstatus', type(ce), ce)
return result
if hasattr(result, 'Incidencias'):
fault = result.Incidencias.Incidencia[0]
cod_error = fault.CodigoError.encode('utf-8')
msg_error = fault.MensajeIncidencia.encode('utf-8')
error = 'Error: {}\n{}'.format(cod_error, msg_error)
self._error = self.CODE.get(cod_error, error)
return {}
return result
def _get_result(self, client, method, args):
self._error = ''
try:
result = getattr(client.service, method)(**args)
except Fault as e:
self._error = str(e)
return {}
except TransportError as e:
if '413' in str(e):
self._error = '413<BR><BR><b>Documento muy grande para timbrar</b>'
else:
self._error = str(e)
return {}
except ConnectionError as e:
msg = '502 - Error de conexión'
self._error = msg
return {}
return self._validate_result(result)
def _to_string(self, data):
root = ET.parse(BytesIO(data.encode('utf-8'))).getroot()
xml = ET.tostring(root,
pretty_print=True, xml_declaration=True, encoding='utf-8')
return xml.decode('utf-8')
def stamp(self, cfdi, auth={}):
if DEBUG or not auth:
auth = AUTH
method = 'timbra'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
args = {
'username': auth['user'],
'password': auth['pass'],
'xml': cfdi.encode('utf-8'),
}
result = self._get_result(client, 'stamp', args)
if self.error:
log.error(self.error)
return ''
data = {
'xml': self._to_string(result.xml),
'uuid': result.UUID,
'date': result.Fecha,
}
return data
def _get_data_cancel(self, cfdi):
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
tree = ET.fromstring(cfdi.encode())
rfc_emisor = tree.xpath(
'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)',
namespaces=NS_CFDI)
cfdi_uuid = tree.xpath(
'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)',
namespaces=NS_CFDI)
return rfc_emisor, cfdi_uuid
def cancel(self, cfdi, info, auth={}):
if DEBUG or not auth:
auth = AUTH
rfc_emisor, cfdi_uuid = self._get_data_cancel(cfdi)
method = 'cancel'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
uuid_type = client.get_type('ns1:UUIDS')
sa = client.get_type('ns0:stringArray')
args = {
'UUIDS': uuid_type(uuids=sa(string=cfdi_uuid)),
'username': auth['user'],
'password': auth['pass'],
'taxpayer_id': rfc_emisor,
'cer': info['cer'],
'key': info['key'],
'store_pending': False,
}
result = self._get_result(client, 'cancel', args)
if self.error:
log.error(self.error)
return ''
folio = result['Folios']['Folio'][0]
status = folio['EstatusUUID']
if status != '201':
log.debug(f'Cancel status: {status} - {cfdi_uuid}')
data = {
'acuse': result['Acuse'],
'date': result['Fecha'],
}
return data
def cancel_xml(self, xml, auth={}, cfdi=''):
if DEBUG or not auth:
auth = AUTH
method = 'cancel'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
client.set_ns_prefix('can', 'http://facturacion.finkok.com/cancel')
# ~ xml = f'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n{xml}'
# ~ xml = f'<?xml version="1.0" encoding="UTF-8" standalone="true"?>\n{xml}'
args = {
'xml': xml.encode(),
'username': auth['user'],
'password': auth['pass'],
'store_pending': False,
}
result = self._get_result(client, 'cancel_signature', args)
if self.error:
log.error(self.error)
return ''
folio = result['Folios']['Folio'][0]
status = folio['EstatusUUID']
if status == '708':
self._error = 'Error 708 del SAT, intenta más tarde.'
log.error(self.error)
return ''
if status != '201':
log.debug(f'Cancel status: {status} -')
data = {
'acuse': result['Acuse'],
'date': result['Fecha'],
}
return data
def client_add(self, rfc, type_user=False):
"""Agrega un nuevo cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del nuevo cliente
Kwargs:
type_user (bool):
False == 'P' == Prepago
True == 'O' == On demand
Returns:
True or False
origin PAC
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = AUTH['RESELLER']
tu = {True: 'O', False: 'P'}
method = 'client'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['user'],
'reseller_password': auth['pass'],
'taxpayer_id': rfc,
'type_user': tu[type_user],
'added': datetime.datetime.now().isoformat()[:19],
}
result = self._get_result(client, 'add', args)
if self.error:
return False
if not result.success:
self.error = result.message
return False
# ~ PAC success debería ser False
msg = 'Account Already exists'
if result.message == msg:
self.error = msg
return True
return result.success
def client_get_token(self, rfc, email):
"""Genera un nuevo token al cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente, ya debe existir
email (str): El correo del cliente, funciona como USER al timbrar
Returns:
token (str): Es la contraseña para timbrar
origin PAC
dict
'username': 'username',
'status': True or False
'name': 'name',
'success': True or False
'token': 'Token de timbrado',
'message': None
"""
auth = AUTH['RESELLER']
method = 'util'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['user'],
'password': auth['pass'],
'name': rfc,
'token_username': email,
'taxpayer_id': rfc,
'status': True,
}
result = self._get_result(client, 'add_token', args)
if self.error:
log.error(self.error)
return ''
if not result.success:
self.error = result.message
log.error(self.error)
return ''
return result.token
def client_add_timbres(self, rfc, credit):
"""Agregar credito a un emisor
Se requiere cuenta de reseller
Args:
rfc (str): El RFC del emisor, debe existir
credit (int): Cantidad de folios a agregar
Returns:
dict
'success': True or False,
'credit': nuevo credito despues de agregar or None
'message':
'Success, added {credit} of credit to {RFC}.'
'RFC no encontrado'
"""
auth = AUTH['RESELLER']
method = 'client'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['user'],
'password': auth['pass'],
'taxpayer_id': rfc,
'credit': credit,
}
result = self._get_result(client, 'assign', args)
if self.error:
log.error(error)
return ''
if not result.success:
self.error = result.message
return 0
return result.credit
def client_balance(self, auth={}, rfc=''):
"""Regresa los timbres restantes del cliente
Se pueden usar las credenciales de relleser o las credenciales del emisor
Args:
rfc (str): El RFC del emisor
Kwargs:
auth (dict): Credenciales del emisor
Returns:
int Cantidad de timbres restantes
"""
if not auth:
auth = AUTH['RESELLER']
method = 'client'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['user'],
'reseller_password': auth['pass'],
'taxpayer_id': rfc,
}
result = self._get_result(client, 'get', args)
if self.error:
log.error(self.error)
return ''
success = bool(result.users)
if not success:
self.error = result.message or 'RFC no existe'
return 0
return result.users.ResellerUser[0].credit
def client_set_status(self, rfc, status):
"""Edita el estatus (Activo o Suspendido) de un cliente
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente
Kwargs:
status (bool):
True == 'A' == Activo
False == 'S' == Suspendido
Returns:
dict
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = AUTH['RESELLER']
ts = {True: 'A', False: 'S'}
method = 'client'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['user'],
'reseller_password': auth['pass'],
'taxpayer_id': rfc,
'status': ts[status],
}
result = self._get_result(client, 'edit', args)
if self.error:
return False
if not result.success:
self.error = result.message
return False
return True
def client_switch(self, rfc, type_user):
"""Edita el tipo de timbrado (OnDemand o Prepago) de un cliente
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente
Kwargs:
status (bool):
True == 'O' == OnDemand
False == 'P' == Prepago
Returns:
dict
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = AUTH['RESELLER']
tu = {True: 'O', False: 'P'}
method = 'client'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
args = {
'username': auth['user'],
'password': auth['pass'],
'taxpayer_id': rfc,
'type_user': tu[type_user],
}
result = self._get_result(client, 'switch', args)
if self.error:
return False
if not result.success:
self.error = result.message
return False
return True
def client_report_folios(self, rfc, date_from, date_to, invoice_type='I'):
"""Obtiene un reporte del total de facturas timbradas
"""
auth = AUTH['RESELLER']
args = {
'username': auth['user'],
'password': auth['pass'],
'taxpayer_id': rfc,
'date_from': date_from,
'date_to': date_to,
'invoice_type': invoice_type,
}
method = 'util'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
result = self._get_result(client, 'report_total', args)
if result.result is None:
# ~ PAC - Debería regresar RFC inexistente o sin registros
self.error = 'RFC no existe o no tiene registros'
return 0
total = result.result.ReportTotal[0].total
return total

View File

@ -68,11 +68,13 @@ from settings import DEBUG, MV, log, template_lookup, COMPANIES, DB_SAT, \
PATH_XSLT, PATH_XSLTPROC, PATH_OPENSSL, PATH_TEMPLATES, PATH_MEDIA, PRE, \
PATH_XMLSEC, TEMPLATE_CANCEL, DEFAULT_SAT_PRODUCTO, DECIMALES, DIR_FACTURAS
from settings import SEAFILE_SERVER, USAR_TOKEN, API, DECIMALES_TAX
from .configpac import AUTH
from settings import USAR_TOKEN, API, DECIMALES_TAX
# ~ from .configpac import AUTH
# ~ v2
from .pacs.cfdi_cert import SATCertificate
from settings import (
EXT,
MXN,
@ -395,190 +397,34 @@ def to_slug(string):
return value.replace(' ', '_')
class Certificado(object):
# ~ def make_xml(data, certificado):
# ~ from .cfdi_xml import CFDI
def __init__(self, paths):
self._path_key = paths['path_key']
self._path_cer = paths['path_cer']
self._modulus = ''
self.error = ''
# ~ cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
# ~ if DEBUG:
# ~ data['emisor']['Rfc'] = certificado.rfc
# ~ data['emisor']['RegimenFiscal'] = '603'
def _kill(self, path):
try:
os.remove(path)
except:
pass
return
# ~ cfdi = CFDI()
# ~ xml = cfdi.get_xml(data)
def _get_info_cer(self, session_rfc):
data = {}
args = 'openssl x509 -inform DER -in {}'
try:
cer_pem = _call(args.format(self._path_cer))
except Exception as e:
self.error = 'No se pudo convertir el CER en PEM'
return data
# ~ data = {
# ~ 'xsltproc': PATH_XSLTPROC,
# ~ 'xslt': _join(PATH_XSLT, 'cadena.xslt'),
# ~ 'xml': save_temp(xml, 'w'),
# ~ 'openssl': PATH_OPENSSL,
# ~ 'key': save_temp(certificado.key_enc, 'w'),
# ~ 'pass': token,
# ~ }
# ~ args = '"{xsltproc}" "{xslt}" "{xml}" | ' \
# ~ '"{openssl}" dgst -sha256 -sign "{key}" -passin pass:"{pass}" | ' \
# ~ '"{openssl}" enc -base64 -A'.format(**data)
# ~ sello = _call(args)
args = 'openssl enc -base64 -in {}'
try:
cer_txt = _call(args.format(self._path_cer))
except Exception as e:
self.error = 'No se pudo convertir el CER en TXT'
return data
# ~ _kill(data['xml'])
# ~ _kill(data['key'])
args = 'openssl x509 -inform DER -in {} -noout -{}'
try:
result = _call(args.format(self._path_cer, 'purpose')).split('\n')[3]
except Exception as e:
self.error = 'No se puede saber si es FIEL'
return data
if result == 'SSL server : No':
self.error = 'El certificado es FIEL'
return data
result = _call(args.format(self._path_cer, 'serial'))
serie = result.split('=')[1].split('\n')[0][1::2]
result = _call(args.format(self._path_cer, 'subject'))
#~ Verificar si es por la version de OpenSSL
t1 = 'x500UniqueIdentifier = '
t2 = 'x500UniqueIdentifier='
if t1 in result:
rfc = result.split(t1)[1][:13].strip()
elif t2 in result:
rfc = result.split(t2)[1][:13].strip()
else:
self.error = 'No se pudo obtener el RFC del certificado'
print ('\n', result)
return data
if not DEBUG:
if not rfc == session_rfc:
self.error = 'El RFC del certificado no corresponde.'
return data
dates = _call(args.format(self._path_cer, 'dates')).split('\n')
desde = parser.parse(dates[0].split('=')[1])
hasta = parser.parse(dates[1].split('=')[1])
self._modulus = _call(args.format(self._path_cer, 'modulus'))
data['cer'] = read_file(self._path_cer)
data['cer_pem'] = cer_pem
data['cer_txt'] = cer_txt.replace('\n', '')
data['serie'] = serie
data['rfc'] = rfc
data['desde'] = desde.replace(tzinfo=None)
data['hasta'] = hasta.replace(tzinfo=None)
return data
def _get_p12(self, password, rfc, token):
tmp_cer = tempfile.mkstemp()[1]
tmp_key = tempfile.mkstemp()[1]
tmp_p12 = tempfile.mkstemp()[1]
args = 'openssl x509 -inform DER -in "{}" -out "{}"'
_call(args.format(self._path_cer, tmp_cer))
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}" -out "{}"'
_call(args.format(self._path_key, password, tmp_key))
args = 'openssl pkcs12 -export -in "{}" -inkey "{}" -name "{}" ' \
'-passout pass:"{}" -out "{}"'
_call(args.format(tmp_cer, tmp_key, rfc, token, tmp_p12))
data = read_file(tmp_p12)
self._kill(tmp_cer)
self._kill(tmp_key)
self._kill(tmp_p12)
return data
def _get_info_key(self, password, rfc, token):
data = {}
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}"'
try:
result = _call(args.format(self._path_key, password))
except Exception as e:
self.error = 'Contraseña incorrecta'
return data
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}" | ' \
'openssl rsa -noout -modulus'
mod_key = _call(args.format(self._path_key, password))
if self._modulus != mod_key:
self.error = 'Los archivos no son pareja'
return data
args = "openssl pkcs8 -inform DER -in '{}' -passin pass:'{}' | " \
"openssl rsa -des3 -passout pass:'{}'".format(
self._path_key, password, token)
key_enc = _call(args)
data['key'] = read_file(self._path_key)
data['key_enc'] = key_enc
data['p12'] = self._get_p12(password, rfc, token)
return data
def validate(self, password, rfc, auth):
token = _get_md5(rfc)
if USAR_TOKEN:
token = auth['PASS']
if AUTH['DEBUG']:
token = AUTH['PASS']
if not self._path_key or not self._path_cer:
self.error = 'Error en las rutas temporales del certificado'
return {}
data = self._get_info_cer(rfc)
if not data:
return {}
llave = self._get_info_key(password, rfc, token)
if not llave:
return {}
data.update(llave)
self._kill(self._path_key)
self._kill(self._path_cer)
return data
def make_xml(data, certificado, auth):
from .cfdi_xml import CFDI
token = _get_md5(certificado.rfc)
if USAR_TOKEN:
token = auth['PASS']
if AUTH['DEBUG']:
token = AUTH['PASS']
if DEBUG:
data['emisor']['Rfc'] = certificado.rfc
data['emisor']['RegimenFiscal'] = '603'
cfdi = CFDI()
xml = cfdi.get_xml(data)
data = {
'xsltproc': PATH_XSLTPROC,
'xslt': _join(PATH_XSLT, 'cadena.xslt'),
'xml': save_temp(xml, 'w'),
'openssl': PATH_OPENSSL,
'key': save_temp(certificado.key_enc, 'w'),
'pass': token,
}
args = '"{xsltproc}" "{xslt}" "{xml}" | ' \
'"{openssl}" dgst -sha256 -sign "{key}" -passin pass:"{pass}" | ' \
'"{openssl}" enc -base64 -A'.format(**data)
sello = _call(args)
_kill(data['xml'])
_kill(data['key'])
return cfdi.add_sello(sello)
# ~ return cfdi.add_sello(sello)
def timbra_xml(xml, auth):
@ -1339,9 +1185,15 @@ class LIBO(object):
self._leyendas(data.get('leyendas', ''))
self._cancelado(data['cancelada'])
self._others_values(data)
self._clean()
return
def _others_values(self, data):
version = data['version']
self._set_cell('{version}', version)
return
def pdf(self, path, data, ods=False):
options = {'AsTemplate': True, 'Hidden': True}
log.debug('Abrir plantilla...')
@ -1818,7 +1670,7 @@ def _get_relacionados(doc, version):
if node is None:
return ''
uuids = ['UUID: {}'.format(n.attrib['UUID']) for n in node.getchildren()]
uuids = ['UUID: {}'.format(n.attrib['UUID']) for n in list(node)]
return '\n'.join(uuids)
@ -1933,7 +1785,8 @@ def _conceptos(doc, version, options):
data = []
conceptos = doc.find('{}Conceptos'.format(PRE[version]))
for c in conceptos.getchildren():
# ~ for c in conceptos.getchildren():
for c in list(conceptos):
values = CaseInsensitiveDict(c.attrib.copy())
if is_nomina:
values['noidentificacion'] = values['ClaveProdServ']
@ -2002,7 +1855,8 @@ def _totales(doc, cfdi, version):
node = imp.find('{}Traslados'.format(PRE[version]))
if node is not None:
for n in node.getchildren():
# ~ for n in node.getchildren():
for n in list(node):
tmp = CaseInsensitiveDict(n.attrib.copy())
if version == '3.3':
tasa = round(float(tmp['tasaocuota']), DECIMALES)
@ -2013,7 +1867,8 @@ def _totales(doc, cfdi, version):
node = imp.find('{}Retenciones'.format(PRE[version]))
if node is not None:
for n in node.getchildren():
# ~ for n in node.getchildren():
for n in list(node):
tmp = CaseInsensitiveDict(n.attrib.copy())
if version == '3.3':
title = 'Retención {} {}'.format(
@ -2119,20 +1974,20 @@ def _nomina(doc, data, values, version_cfdi):
if not node is None:
data['comprobante'].update(CaseInsensitiveDict(node.attrib.copy()))
info['percepciones'] = []
for p in node.getchildren():
for p in list(node):
info['percepciones'].append(CaseInsensitiveDict(p.attrib.copy()))
node = node_nomina.find('{}Deducciones'.format(PRE['NOMINA'][version]))
if not node is None:
data['comprobante'].update(CaseInsensitiveDict(node.attrib.copy()))
info['deducciones'] = []
for d in node.getchildren():
for d in list(node):
info['deducciones'].append(CaseInsensitiveDict(d.attrib.copy()))
node = node_nomina.find('{}OtrosPagos'.format(PRE['NOMINA'][version]))
if not node is None:
info['otrospagos'] = []
for o in node.getchildren():
for o in list(node):
info['otrospagos'].append(CaseInsensitiveDict(o.attrib.copy()))
n = o.find('{}SubsidioAlEmpleo'.format(PRE['NOMINA'][version]))
if not n is None:
@ -2141,7 +1996,7 @@ def _nomina(doc, data, values, version_cfdi):
node = node_nomina.find('{}Incapacidades'.format(PRE['NOMINA'][version]))
if not node is None:
info['incapacidades'] = []
for i in node.getchildren():
for i in list(node):
info['incapacidades'].append(CaseInsensitiveDict(i.attrib.copy()))
return info
@ -2196,6 +2051,8 @@ def get_data_from_xml(invoice, values):
if data['pagos']:
data['pays'] = _cfdipays(doc, data, version)
data['pakings'] = values.get('pakings', [])
# ~ data['version'] = values['version']
data['version'] = version
return data
@ -2692,12 +2549,12 @@ def local_copy(files):
log.error(msg)
return
args = 'df -P {} | tail -1 | cut -d" " -f 1'.format(path_bk)
try:
result = _call(args)
# ~ args = 'df -P {} | tail -1 | cut -d" " -f 1'.format(path_bk)
# ~ try:
# ~ result = _call(args)
# ~ log.info(result)
except:
pass
# ~ except:
# ~ pass
# ~ if result != 'empresalibre\n':
# ~ log.info(result)
# ~ msg = 'Asegurate de que exista la carpeta para sincronizar'
@ -2742,20 +2599,20 @@ def sync_files(files, auth={}):
return
def sync_cfdi(auth, files):
def sync_cfdi(files):
local_copy(files)
if DEBUG:
return
if not auth['REPO'] or not SEAFILE_SERVER:
return
# ~ if not auth['REPO'] or not SEAFILE_SERVER:
# ~ return
seafile = SeaFileAPI(SEAFILE_SERVER['URL'], auth['USER'], auth['PASS'])
if seafile.is_connect:
for f in files:
seafile.update_file(
f, auth['REPO'], 'Facturas/{}/'.format(f[2]), auth['PASS'])
# ~ seafile = SeaFileAPI(SEAFILE_SERVER['URL'], auth['USER'], auth['PASS'])
# ~ if seafile.is_connect:
# ~ for f in files:
# ~ seafile.update_file(
# ~ f, auth['REPO'], 'Facturas/{}/'.format(f[2]), auth['PASS'])
return
@ -3158,7 +3015,7 @@ class ImportCFDI(object):
def _conceptos(self):
data = []
conceptos = self._doc.find('{}Conceptos'.format(self._pre))
for c in conceptos.getchildren():
for c in list(conceptos):
values = CaseInsensitiveDict(c.attrib.copy())
data.append(values)
return data

View File

@ -48,11 +48,13 @@ from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from dateutil import parser
import seafileapi
from .cfdi_xml import CFDI
from settings import DEBUG, DB_COMPANIES, PATHS
from .comercio import PACComercioDigital
# ~ from .finkok import PACFinkok
from settings import DEBUG, DB_COMPANIES, PATHS, TEMPLATE_CANCEL, RFCS
from .pacs.cfdi_cert import SATCertificate
from .pacs import PACComercioDigital
from .pacs import PACFinkok
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
@ -74,9 +76,14 @@ if DEBUG:
PSQL = 'psql -h localhost -U postgres'
PACS = {
# ~ '': PACFinkok,
'finkok': PACFinkok,
'comercio': PACComercioDigital,
}
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
#~ https://github.com/kennethreitz/requests/blob/v1.2.3/requests/structures.py#L37
class CaseInsensitiveDict(collections.MutableMapping):
@ -491,29 +498,6 @@ def _backup_db(rfc, is_mv, url_seafile):
shutil.copy(path, path_target)
else:
log.error('\tNo existe la carpeta compartida...')
# ~ sql = 'select correo_timbrado, token_soporte from emisor;'
# ~ args = 'psql -U postgres -d {} -Atc "{}"'.format(data['name'], sql)
# ~ result = _call(args)
# ~ if not result:
# ~ log.error('\tSin datos para backup remoto')
# ~ return
# ~ data = result.strip().split('|')
# ~ if not data[1]:
# ~ log.error('\tSin token de soporte')
# ~ return
# ~ email = data[0]
# ~ uuid = data[1]
# ~ email = 'hola@elmau.net'
# ~ uuid = 'cc42c591-cf66-499a-ae70-c09df5646be9'
# ~ log.debug(url_seafile, email, _get_pass(rfc))
# ~ client = seafileapi.connect(url_seafile, email, _get_pass(rfc))
# ~ repo = client.repos.get_repo(uuid)
# ~ print(repo)
return
@ -583,26 +567,24 @@ def get_pass():
return True, password
def xml_stamp(xml, auth, name):
def xml_stamp(xml, auth):
if not DEBUG and not auth:
msg = 'Sin datos para timbrar'
result = {'ok': False, 'error': msg}
return result
result = {'ok': True, 'error': ''}
auth = {'user': auth['USER'], 'pass': auth['PASS']}
pac = PACS[name]()
xml_stamped = pac.stamp(xml, auth)
pac = PACS[auth['pac']]()
response = pac.stamp(xml, auth)
if not xml_stamped:
if not response:
result['ok'] = False
result['error'] = pac.error
return result
result['xml'] = xml_stamped
result['uuid'] = pac.cfdi_uuid
result['fecha'] = pac.date_stamped
result.update(response)
return result
@ -630,14 +612,89 @@ def xml_cancel(xml, auth, cert, name):
return data, result
def get_client_balance(auth, name):
def get_client_balance(auth):
if DEBUG:
return '-d'
pac = PACS[name]()
auth = {'usr': auth['USER'], 'pwd': auth['PASS']}
pac = PACS[auth['pac']]()
balance = pac.client_balance(auth)
if pac.error:
balance = '-e'
balance = 'p/e'
return balance
def get_cert(args):
cer = base64.b64decode(args['cer'].split(',')[1])
key = base64.b64decode(args['key'].split(',')[1])
cert = SATCertificate(cer, key, args['contra'])
return cert
def make_xml(data, certificado):
cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
if DEBUG:
data['emisor']['Rfc'] = certificado.rfc
data['emisor']['RegimenFiscal'] = '603'
cfdi = CFDI()
xml = ET.parse(BytesIO(cfdi.get_xml(data).encode()))
path_xslt = _join(PATHS['xslt'], 'cadena.xslt')
xslt = open(path_xslt, 'rb')
transfor = ET.XSLT(ET.parse(xslt))
cadena = str(transfor(xml)).encode()
stamp = cert.sign(cadena)
xslt.close()
return cfdi.add_sello(stamp, cert.cer_txt)
def get_pac_by_rfc(cfdi):
tree = ET.fromstring(cfdi.encode())
path = 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@RfcProvCertif)'
rfc_pac = tree.xpath(path, namespaces=NS_CFDI)
return RFCS[rfc_pac]
def _cancel_finkok(invoice, auth, certificado):
cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
pac = PACS[auth['pac']]()
info = {'cer': cert.cer_pem, 'key': cert.key_pem}
result = pac.cancel(invoice.xml, info, auth)
if pac.error:
data = {'ok': False, 'msg': pac.error, 'row': {}}
return data
msg = 'Factura cancelada correctamente'
data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'},
'date': result['date'], 'acuse': result['acuse']}
return data
def cancel_xml_sign(invoice, auth, certificado):
if auth['pac'] == 'finkok':
return _cancel_finkok(invoice, auth, certificado)
cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
pac = PACS[auth['pac']]()
data = {
'rfc': certificado.rfc,
'fecha': now().isoformat()[:19],
'uuid': str(invoice.uuid).upper(),
}
template = TEMPLATE_CANCEL.format(**data)
tree = ET.fromstring(template.encode())
tree = cert.sign_xml(tree)
sign_xml = ET.tostring(tree).decode()
result = pac.cancel_xml(sign_xml, auth, invoice.xml)
if pac.error:
data = {'ok': False, 'msg': pac.error, 'row': {}}
return data
msg = 'Factura cancelada correctamente'
data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'},
'date': result['date'], 'acuse': result['acuse']}
return data

View File

@ -17,7 +17,7 @@ from controllers.main import (AppEmpresas,
AppDocumentos, AppFiles, AppPreInvoices, AppCuentasBanco,
AppMovimientosBanco, AppTickets, AppStudents, AppEmployees, AppNomina,
AppInvoicePay, AppCfdiPay, AppSATBancos, AppSociosCuentasBanco,
AppSATFormaPago, AppSATLeyendaFiscales
AppSATFormaPago, AppSATLeyendaFiscales, AppCert
)
@ -62,6 +62,7 @@ api.add_route('/satbancos', AppSATBancos(db))
api.add_route('/satformapago', AppSATFormaPago(db))
api.add_route('/socioscb', AppSociosCuentasBanco(db))
api.add_route('/leyendasfiscales', AppSATLeyendaFiscales(db))
api.add_route('/cert', AppCert(db))
session_options = {

View File

@ -471,6 +471,13 @@ class StorageEngine(object):
def sat_leyendas_fiscales_delete(self, values):
return main.SATLeyendasFiscales.remove(values)
# ~ v2
def cert_get(self, values):
return main.Certificado.get_data(values)
def cert_post(self, values):
return main.Certificado.post(values)
# Companies only in MV
def _get_empresas(self, values):
return main.companies_get()

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +0,0 @@
from seafileapi.client import SeafileApiClient
def connect(server, username, password):
client = SeafileApiClient(server, username, password)
return client

View File

@ -1,7 +0,0 @@
class SeafileAdmin(object):
def lists_users(self, maxcount=100):
pass
def list_user_repos(self, username):
pass

View File

@ -1,77 +0,0 @@
import requests
from seafileapi.utils import urljoin
from seafileapi.exceptions import ClientHttpError
from seafileapi.repos import Repos
class SeafileApiClient(object):
"""Wraps seafile web api"""
def __init__(self, server, username=None, password=None, token=None):
"""Wraps various basic operations to interact with seahub http api.
"""
self.server = server
self.username = username
self.password = password
self._token = token
self.repos = Repos(self)
self.groups = Groups(self)
if token is None:
self._get_token()
def _get_token(self):
data = {
'username': self.username,
'password': self.password,
}
url = urljoin(self.server, '/api2/auth-token/')
res = requests.post(url, data=data)
if res.status_code != 200:
raise ClientHttpError(res.status_code, res.content)
token = res.json()['token']
assert len(token) == 40, 'The length of seahub api auth token should be 40'
self._token = token
def __str__(self):
return 'SeafileApiClient[server=%s, user=%s]' % (self.server, self.username)
__repr__ = __str__
def get(self, *args, **kwargs):
return self._send_request('GET', *args, **kwargs)
def post(self, *args, **kwargs):
return self._send_request('POST', *args, **kwargs)
def put(self, *args, **kwargs):
return self._send_request('PUT', *args, **kwargs)
def delete(self, *args, **kwargs):
return self._send_request('delete', *args, **kwargs)
def _send_request(self, method, url, *args, **kwargs):
if not url.startswith('http'):
url = urljoin(self.server, url)
headers = kwargs.get('headers', {})
headers.setdefault('Authorization', 'Token ' + self._token)
kwargs['headers'] = headers
expected = kwargs.pop('expected', 200)
if not hasattr(expected, '__iter__'):
expected = (expected, )
resp = requests.request(method, url, *args, **kwargs)
if resp.status_code not in expected:
msg = 'Expected %s, but get %s' % \
(' or '.join(map(str, expected)), resp.status_code)
raise ClientHttpError(resp.status_code, msg)
return resp
class Groups(object):
def __init__(self, client):
pass
def create_group(self, name):
pass

View File

@ -1,25 +0,0 @@
class ClientHttpError(Exception):
"""This exception is raised if the returned http response is not as
expected"""
def __init__(self, code, message):
super(ClientHttpError, self).__init__()
self.code = code
self.message = message
def __str__(self):
return 'ClientHttpError[%s: %s]' % (self.code, self.message)
class OperationError(Exception):
"""Expcetion to raise when an opeartion is failed"""
pass
class DoesNotExist(Exception):
"""Raised when not matching resource can be found."""
def __init__(self, msg):
super(DoesNotExist, self).__init__()
self.msg = msg
def __str__(self):
return 'DoesNotExist: %s' % self.msg

View File

@ -1,250 +0,0 @@
import io
import os
import posixpath
import re
from seafileapi.utils import querystr
ZERO_OBJ_ID = '0000000000000000000000000000000000000000'
class _SeafDirentBase(object):
"""Base class for :class:`SeafFile` and :class:`SeafDir`.
It provides implementation of their common operations.
"""
isdir = None
def __init__(self, repo, path, object_id, size=0):
"""
:param:`path` the full path of this entry within its repo, like
"/documents/example.md"
:param:`size` The size of a file. It should be zero for a dir.
"""
self.client = repo.client
self.repo = repo
self.path = path
self.id = object_id
self.size = size
@property
def name(self):
return posixpath.basename(self.path)
def list_revisions(self):
pass
def delete(self):
suffix = 'dir' if self.isdir else 'file'
url = '/api2/repos/%s/%s/' % (self.repo.id, suffix) + querystr(p=self.path)
resp = self.client.delete(url)
return resp
def rename(self, newname):
"""Change file/folder name to newname
"""
suffix = 'dir' if self.isdir else 'file'
url = '/api2/repos/%s/%s/' % (self.repo.id, suffix) + querystr(p=self.path, reloaddir='true')
postdata = {'operation': 'rename', 'newname': newname}
resp = self.client.post(url, data=postdata)
succeeded = resp.status_code == 200
if succeeded:
if self.isdir:
new_dirent = self.repo.get_dir(os.path.join(os.path.dirname(self.path), newname))
else:
new_dirent = self.repo.get_file(os.path.join(os.path.dirname(self.path), newname))
for key in list(self.__dict__.keys()):
self.__dict__[key] = new_dirent.__dict__[key]
return succeeded
def _copy_move_task(self, operation, dirent_type, dst_dir, dst_repo_id=None):
url = '/api/v2.1/copy-move-task/'
src_repo_id = self.repo.id
src_parent_dir = os.path.dirname(self.path)
src_dirent_name = os.path.basename(self.path)
dst_repo_id = dst_repo_id
dst_parent_dir = dst_dir
operation = operation
dirent_type = dirent_type
postdata = {'src_repo_id': src_repo_id, 'src_parent_dir': src_parent_dir,
'src_dirent_name': src_dirent_name, 'dst_repo_id': dst_repo_id,
'dst_parent_dir': dst_parent_dir, 'operation': operation,
'dirent_type': dirent_type}
return self.client.post(url, data=postdata)
def copyTo(self, dst_dir, dst_repo_id=None):
"""Copy file/folder to other directory (also to a different repo)
"""
if dst_repo_id is None:
dst_repo_id = self.repo.id
dirent_type = 'dir' if self.isdir else 'file'
resp = self._copy_move_task('copy', dirent_type, dst_dir, dst_repo_id)
return resp.status_code == 200
def moveTo(self, dst_dir, dst_repo_id=None):
"""Move file/folder to other directory (also to a different repo)
"""
if dst_repo_id is None:
dst_repo_id = self.repo.id
dirent_type = 'dir' if self.isdir else 'file'
resp = self._copy_move_task('move', dirent_type, dst_dir, dst_repo_id)
succeeded = resp.status_code == 200
if succeeded:
new_repo = self.client.repos.get_repo(dst_repo_id)
dst_path = os.path.join(dst_dir, os.path.basename(self.path))
if self.isdir:
new_dirent = new_repo.get_dir(dst_path)
else:
new_dirent = new_repo.get_file(dst_path)
for key in list(self.__dict__.keys()):
self.__dict__[key] = new_dirent.__dict__[key]
return succeeded
def get_share_link(self):
pass
class SeafDir(_SeafDirentBase):
isdir = True
def __init__(self, *args, **kwargs):
super(SeafDir, self).__init__(*args, **kwargs)
self.entries = None
self.entries = kwargs.pop('entries', None)
def ls(self, force_refresh=False):
"""List the entries in this dir.
Return a list of objects of class :class:`SeafFile` or :class:`SeafDir`.
"""
if self.entries is None or force_refresh:
self.load_entries()
return self.entries
def share_to_user(self, email, permission):
url = '/api2/repos/%s/dir/shared_items/' % self.repo.id + querystr(p=self.path)
putdata = {
'share_type': 'user',
'username': email,
'permission': permission
}
resp = self.client.put(url, data=putdata)
return resp.status_code == 200
def create_empty_file(self, name):
"""Create a new empty file in this dir.
Return a :class:`SeafFile` object of the newly created file.
"""
# TODO: file name validation
path = posixpath.join(self.path, name)
url = '/api2/repos/%s/file/' % self.repo.id + querystr(p=path, reloaddir='true')
postdata = {'operation': 'create'}
resp = self.client.post(url, data=postdata)
self.id = resp.headers['oid']
self.load_entries(resp.json())
return SeafFile(self.repo, path, ZERO_OBJ_ID, 0)
def mkdir(self, name):
"""Create a new sub folder right under this dir.
Return a :class:`SeafDir` object of the newly created sub folder.
"""
path = posixpath.join(self.path, name)
url = '/api2/repos/%s/dir/' % self.repo.id + querystr(p=path, reloaddir='true')
postdata = {'operation': 'mkdir'}
resp = self.client.post(url, data=postdata)
self.id = resp.headers['oid']
self.load_entries(resp.json())
return SeafDir(self.repo, path, ZERO_OBJ_ID)
def upload(self, fileobj, filename):
"""Upload a file to this folder.
:param:fileobj :class:`File` like object
:param:filename The name of the file
Return a :class:`SeafFile` object of the newly uploaded file.
"""
if isinstance(fileobj, str):
fileobj = io.BytesIO(fileobj)
upload_url = self._get_upload_link()
files = {
'file': (filename, fileobj),
'parent_dir': self.path,
}
self.client.post(upload_url, files=files)
return self.repo.get_file(posixpath.join(self.path, filename))
def upload_local_file(self, filepath, name=None):
"""Upload a file to this folder.
:param:filepath The path to the local file
:param:name The name of this new file. If None, the name of the local file would be used.
Return a :class:`SeafFile` object of the newly uploaded file.
"""
name = name or os.path.basename(filepath)
with open(filepath, 'r') as fp:
return self.upload(fp, name)
def _get_upload_link(self):
url = '/api2/repos/%s/upload-link/' % self.repo.id
resp = self.client.get(url)
return re.match(r'"(.*)"', resp.text).group(1)
def get_uploadable_sharelink(self):
"""Generate a uploadable shared link to this dir.
Return the url of this link.
"""
pass
def load_entries(self, dirents_json=None):
if dirents_json is None:
url = '/api2/repos/%s/dir/' % self.repo.id + querystr(p=self.path)
dirents_json = self.client.get(url).json()
self.entries = [self._load_dirent(entry_json) for entry_json in dirents_json]
def _load_dirent(self, dirent_json):
path = posixpath.join(self.path, dirent_json['name'])
if dirent_json['type'] == 'file':
return SeafFile(self.repo, path, dirent_json['id'], dirent_json['size'])
else:
return SeafDir(self.repo, path, dirent_json['id'], 0)
@property
def num_entries(self):
if self.entries is None:
self.load_entries()
return len(self.entries) if self.entries is not None else 0
def __str__(self):
return 'SeafDir[repo=%s,path=%s,entries=%s]' % \
(self.repo.id[:6], self.path, self.num_entries)
__repr__ = __str__
class SeafFile(_SeafDirentBase):
isdir = False
def update(self, fileobj):
"""Update the content of this file"""
pass
def __str__(self):
return 'SeafFile[repo=%s,path=%s,size=%s]' % \
(self.repo.id[:6], self.path, self.size)
def _get_download_link(self):
url = '/api2/repos/%s/file/' % self.repo.id + querystr(p=self.path)
resp = self.client.get(url)
return re.match(r'"(.*)"', resp.text).group(1)
def get_content(self):
"""Get the content of the file"""
url = self._get_download_link()
return self.client.get(url).content
__repr__ = __str__

View File

@ -1,22 +0,0 @@
class Group(object):
def __init__(self, client, group_id, group_name):
self.client = client
self.group_id = group_id
self.group_name = group_name
def list_memebers(self):
pass
def delete(self):
pass
def add_member(self, username):
pass
def remove_member(self, username):
pass
def list_group_repos(self):
pass

View File

@ -1,99 +0,0 @@
from urllib.parse import urlencode
from seafileapi.files import SeafDir, SeafFile
from seafileapi.utils import raise_does_not_exist
class Repo(object):
"""
A seafile library
"""
def __init__(self, client, repo_id, repo_name,
encrypted, owner, perm):
self.client = client
self.id = repo_id
self.name = repo_name
self.encrypted = encrypted
self.owner = owner
self.perm = perm
@classmethod
def from_json(cls, client, repo_json):
repo_id = repo_json['id']
repo_name = repo_json['name']
encrypted = repo_json['encrypted']
perm = repo_json['permission']
owner = repo_json['owner']
return cls(client, repo_id, repo_name, encrypted, owner, perm)
def is_readonly(self):
return 'w' not in self.perm
@raise_does_not_exist('The requested file does not exist')
def get_file(self, path):
"""Get the file object located in `path` in this repo.
Return a :class:`SeafFile` object
"""
assert path.startswith('/')
url = '/api2/repos/%s/file/detail/' % self.id
query = '?' + urlencode(dict(p=path))
file_json = self.client.get(url + query).json()
return SeafFile(self, path, file_json['id'], file_json['size'])
@raise_does_not_exist('The requested dir does not exist')
def get_dir(self, path):
"""Get the dir object located in `path` in this repo.
Return a :class:`SeafDir` object
"""
assert path.startswith('/')
url = '/api2/repos/%s/dir/' % self.id
query = '?' + urlencode(dict(p=path))
resp = self.client.get(url + query)
dir_id = resp.headers['oid']
dir_json = resp.json()
dir = SeafDir(self, path, dir_id)
dir.load_entries(dir_json)
return dir
def delete(self):
"""Remove this repo. Only the repo owner can do this"""
self.client.delete('/api2/repos/' + self.id)
def list_history(self):
"""List the history of this repo
Returns a list of :class:`RepoRevision` object.
"""
pass
## Operations only the repo owner can do:
def update(self, name=None):
"""Update the name of this repo. Only the repo owner can do
this.
"""
pass
def get_settings(self):
"""Get the settings of this repo. Returns a dict containing the following
keys:
`history_limit`: How many days of repo history to keep.
"""
pass
def restore(self, commit_id):
pass
class RepoRevision(object):
def __init__(self, client, repo, commit_id):
self.client = client
self.repo = repo
self.commit_id = commit_id
def restore(self):
"""Restore the repo to this revision"""
self.repo.revert(self.commit_id)

View File

@ -1,26 +0,0 @@
from seafileapi.repo import Repo
from seafileapi.utils import raise_does_not_exist
class Repos(object):
def __init__(self, client):
self.client = client
def create_repo(self, name, password=None):
data = {'name': name}
if password:
data['passwd'] = password
repo_json = self.client.post('/api2/repos/', data=data).json()
return self.get_repo(repo_json['repo_id'])
@raise_does_not_exist('The requested library does not exist')
def get_repo(self, repo_id):
"""Get the repo which has the id `repo_id`.
Raises :exc:`DoesNotExist` if no such repo exists.
"""
repo_json = self.client.get('/api2/repos/' + repo_id).json()
return Repo.from_json(self.client, repo_json)
def list_repos(self):
repos_json = self.client.get('/api2/repos/').json()
return [Repo.from_json(self.client, j) for j in repos_json]

View File

@ -1,57 +0,0 @@
import string
import random
from functools import wraps
from urllib.parse import urlencode
from seafileapi.exceptions import ClientHttpError, DoesNotExist
def randstring(length=0):
if length == 0:
length = random.randint(1, 30)
return ''.join(random.choice(string.lowercase) for i in range(length))
def urljoin(base, *args):
url = base
if url[-1] != '/':
url += '/'
for arg in args:
arg = arg.strip('/')
url += arg + '/'
if '?' in url:
url = url[:-1]
return url
def raise_does_not_exist(msg):
"""Decorator to turn a function that get a http 404 response to a
:exc:`DoesNotExist` exception."""
def decorator(func):
@wraps(func)
def wrapped(*args, **kwargs):
try:
return func(*args, **kwargs)
except ClientHttpError as e:
if e.code == 404:
raise DoesNotExist(msg)
else:
raise
return wrapped
return decorator
def to_utf8(obj):
if isinstance(obj, str):
return obj.encode('utf-8')
return obj
def querystr(**kwargs):
return '?' + urlencode(kwargs)
def utf8lize(obj):
if isinstance(obj, dict):
return {k: to_utf8(v) for k, v in obj.items()}
if isinstance(obj, list):
return [to_utf8(x) for x in ob]
if instance(obj, str):
return obj.encode('utf-8')
return obj

View File

@ -30,11 +30,6 @@ try:
except ImportError:
DEFAULT_PASSWORD = 'salgueiro3.3'
try:
from conf import SEAFILE_SERVER
except ImportError:
SEAFILE_SERVER = {}
try:
from conf import TITLE_APP
except ImportError:
@ -47,7 +42,7 @@ except ImportError:
DEBUG = DEBUG
VERSION = '1.39.1'
VERSION = '1.40.0'
EMAIL_SUPPORT = ('soporte@empresalibre.mx',)
TITLE_APP = '{} v{}'.format(TITLE_APP, VERSION)
@ -78,8 +73,8 @@ PATH_SESSIONS = {
IV = 'valores_iniciales.json'
INIT_VALUES = os.path.abspath(os.path.join(BASE_DIR, '..', 'db', IV))
CT = 'cancel_template.xml'
TEMPLATE_CANCEL = os.path.abspath(os.path.join(PATH_TEMPLATES, CT))
# ~ CT = 'cancel_template.xml'
# ~ TEMPLATE_CANCEL = os.path.abspath(os.path.join(PATH_TEMPLATES, CT))
PATH_XSLT = os.path.abspath(os.path.join(BASE_DIR, '..', 'xslt'))
PATH_BIN = os.path.abspath(os.path.join(BASE_DIR, '..', 'bin'))
@ -222,6 +217,7 @@ PATHS = {
'BK': path_bk,
'LOCAL': path_local,
'SAT': path_sat,
'xslt': PATH_XSLT,
}
VALUES_PDF = {
@ -237,6 +233,8 @@ VALUES_PDF = {
RFCS = {
'PUBLIC': 'XAXX010101000',
'FOREIGN': 'XEXX010101000',
'CVD110412TF6': 'finkok',
'SCD110105654': 'comercio',
}
URL = {
@ -249,3 +247,32 @@ DEFAULT_GLOBAL = {
'descripcion': 'Venta',
'clave_sat': '01010101',
}
TEMPLATE_CANCEL = """<Cancelacion RfcEmisor="{rfc}" Fecha="{fecha}" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://cancelacfd.sat.gob.mx">
<Folios>
<UUID>{uuid}</UUID>
</Folios>
<Signature xmlns="http://www.w3.org/2000/09/xmldsig#">
<SignedInfo>
<CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315" />
<SignatureMethod Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1" />
<Reference URI="">
<Transforms>
<Transform Algorithm="http://www.w3.org/2000/09/xmldsig#enveloped-signature" />
</Transforms>
<DigestMethod Algorithm="http://www.w3.org/2000/09/xmldsig#sha1" />
<DigestValue />
</Reference>
</SignedInfo>
<SignatureValue />
<KeyInfo>
<X509Data>
<X509SubjectName />
<X509IssuerSerial />
<X509Certificate />
</X509Data>
<KeyValue />
</KeyInfo>
</Signature>
</Cancelacion>
"""

View File

@ -19,6 +19,9 @@ var msg = ''
var tb_options = null
var tb_sat = null
var file_cer = null
var file_key = null
var controllers = {
init: function(){
@ -32,7 +35,9 @@ var controllers = {
$$('chk_escuela').attachEvent('onChange', chk_escuela_change)
$$('chk_ong').attachEvent('onChange', chk_ong_change)
$$('cmd_subir_certificado').attachEvent('onItemClick', cmd_subir_certificado_click)
$$('up_cert').attachEvent('onUploadComplete', up_cert_upload_complete)
//~ $$('up_cert').attachEvent('onUploadComplete', up_cert_upload_complete)
//~ $$('up_cert').attachEvent('onAfterFileAdd', up_cert_after_file_add)
$$('up_cert').attachEvent('onBeforeFileAdd', up_cert_before_file_add)
$$('cmd_agregar_serie').attachEvent('onItemClick', cmd_agregar_serie_click)
$$('grid_folios').attachEvent('onItemClick', grid_folios_click)
$$('chk_folio_custom').attachEvent('onItemClick', chk_config_item_click)
@ -135,6 +140,7 @@ var controllers = {
$$('chk_ticket_user_show_doc').attachEvent('onItemClick', chk_config_item_click)
$$('txt_ticket_printer').attachEvent('onKeyPress', txt_ticket_printer_key_press)
$$('lst_pac').attachEvent('onChange', lst_pac_on_change)
$$('cmd_save_pac').attachEvent('onItemClick', cmd_save_pac_click)
$$('cmd_subir_bdfl').attachEvent('onItemClick', cmd_subir_bdfl_click)
$$('cmd_subir_cfdixml').attachEvent('onItemClick', cmd_subir_cfdixml_click)
@ -279,7 +285,7 @@ function get_emisor(){
function get_certificado(){
var form = $$('form_cert')
webix.ajax().get("/values/cert", {}, {
webix.ajax().get("/cert", {'opt': 'cert'}, {
error: function(text, data, xhr) {
msg = 'Error al consultar'
msg_error(msg)
@ -480,7 +486,7 @@ function get_config_values(opt){
var values = data.json()
Object.keys(values).forEach(function(key){
if(key=='lst_pac'){
set_value(key, values[key])
$$('lst_pac').setValue(values[key])
}else{
$$(key).setValue(values[key])
if(key=='chk_config_leyendas_fiscales'){
@ -601,105 +607,6 @@ function chk_ong_change(new_value, old_value){
}
function cmd_subir_certificado_click(){
var form = $$('form_upload')
if (!form.validate()){
msg = 'Valores inválidos'
msg_error(msg)
return
}
var values = form.getValues()
if(!values.contra.trim()){
msg = 'La contraseña no puede estar vacía'
msg_error(msg)
return
}
if($$('lst_cert').count() < 2){
msg = 'Selecciona al menos dos archivos: CER y KEY del certificado.'
msg_error(msg)
return
}
if($$('lst_cert').count() > 2){
msg = 'Selecciona solo dos archivos: CER y KEY del certificado.'
msg_error(msg)
return
}
var fo1 = $$('up_cert').files.getItem($$('up_cert').files.getFirstId())
var fo2 = $$('up_cert').files.getItem($$('up_cert').files.getLastId())
var ext = ['key', 'cer']
if(ext.indexOf(fo1.type.toLowerCase()) == -1 || ext.indexOf(fo2.type.toLowerCase()) == -1){
msg = 'Archivos inválidos, se requiere un archivo CER y un KEY.'
msg_error(msg)
return
}
if(fo1.type == fo2.type && fo1.size == fo2.size){
msg = 'Selecciona archivos diferentes: un archivo CER y un KEY.'
msg_error(msg)
return
}
var serie = $$('form_cert').getValues()['cert_serie']
if(serie){
msg = 'Ya existe un certificado guardado<BR><BR>¿Deseas reemplazarlo?'
webix.confirm({
title: 'Certificado Existente',
ok: 'Si',
cancel: 'No',
type: 'confirm-error',
text: msg,
callback:function(result){
if(result){
$$('up_cert').send()
}
}
})
}else{
$$('up_cert').send()
}
}
function up_cert_upload_complete(response){
if(response.status != 'server'){
msg = 'Ocurrio un error al subir los archivos'
msg_error(msg)
return
}
msg = 'Archivos subidos correctamente. Esperando validación'
msg_ok(msg)
var values = $$('form_upload').getValues()
$$('form_upload').setValues({})
$$('up_cert').files.data.clearAll()
webix.ajax().post('/values/cert', values, {
error:function(text, data, XmlHttpRequest){
msg = 'Ocurrio un error, consulta a soporte técnico'
msg_error(msg)
},
success:function(text, data, XmlHttpRequest){
var values = data.json()
if(values.ok){
$$('form_cert').setValues(values.data)
msg_ok(values.msg)
}else{
msg_error(values.msg)
}
}
})
}
function cmd_agregar_serie_click(){
var form = $$('form_folios')
var grid = $$('grid_folios')
@ -2558,37 +2465,6 @@ function opt_make_pdf_from_on_change(new_value, old_value){
}
function lst_pac_on_change(nv, ov){
if(nv=='default'){
webix.ajax().del('/config', {id: 'lst_pac'}, function(text, xml, xhr){
var msg = 'PAC predeterminado establecido correctamente'
if(xhr.status == 200){
msg_ok(msg)
}else{
msg = 'No se pudo eliminar'
msg_error(msg)
}
})
}else{
webix.ajax().post('/config', {'lst_pac': nv}, {
error: function(text, data, xhr) {
msg = 'Error al guardar la configuración'
msg_error(msg)
},
success: function(text, data, xhr) {
var values = data.json();
if (values.ok){
msg = 'PAC establecido correctamente'
msg_ok(msg)
}else{
msg_error(values.msg)
}
}
})
}
}
function admin_config_other_options(id){
if(id=='chk_config_leyendas_fiscales'){
var value = Boolean($$(id).getValue())
@ -2693,3 +2569,171 @@ function delete_leyenda_fiscal(id){
}
})
}
function lst_pac_on_change(nv, ov){
webix.ajax().get('/config', {'fields': 'pac', 'pac': nv}, {
error: function(text, data, xhr) {
msg = 'Error al consultar'
msg_error(msg)
},
success: function(text, data, xhr) {
var values = data.json()
Object.keys(values).forEach(function(key){
set_value(key, values[key])
})
}
})
}
function cmd_save_pac_click(){
var pac = $$('lst_pac').getValue()
var user = $$('user_timbrado').getValue()
var token = $$('token_timbrado').getValue()
if(!pac.trim()){
msg = 'Selecciona un PAC'
msg_error(msg)
return
}
if(!user.trim()){
msg = 'El Usuario es requerido'
msg_error(msg)
return
}
if(!token.trim()){
msg = 'El Token es requerido'
msg_error(msg)
return
}
var values = {
opt: 'save_pac',
lst_pac: pac,
user_timbrado: user,
token_timbrado: token,
}
webix.ajax().post('/config', values, {
error: function(text, data, xhr) {
msg = 'Error al guardar el PAC'
msg_error(msg)
},
success: function(text, data, xhr) {
var values = data.json();
if (values.ok){
msg = 'PAC guardado correctamente'
msg_ok(msg)
}else{
msg_error(values.msg)
}
}
})
}
function cmd_subir_certificado_click(){
var form = $$('form_upload')
if (!form.validate()){
msg = 'Valores inválidos'
msg_error(msg)
return
}
var values = form.getValues()
if(!values.contra.trim()){
msg = 'La contraseña no puede estar vacía'
msg_error(msg)
return
}
var serie = $$('form_cert').getValues()['cert_serie']
if(serie){
msg = 'Ya existe un certificado guardado<BR><BR>¿Deseas reemplazarlo?'
webix.confirm({
title: 'Certificado Existente',
ok: 'Si',
cancel: 'No',
type: 'confirm-error',
text: msg,
callback:function(result){
if(!result){
return
}
}
})
}
$$('form_upload').setValues({})
$$('up_cert').files.data.clearAll()
values['cer'] = file_cer
values['key'] = file_key
validate_cert(values)
}
function up_cert_before_file_add(file){
if (file.type.toLowerCase() != 'cer' && file.type.toLowerCase() != 'key'){
msg_error('Selecciona un archivo CER o KEY')
return false
}
var count = $$('lst_cert').count()
if (count > 1){
msg = 'Selecciona solo dos archivos: CER y KEY del certificado.'
msg_error(msg)
return false
}
if (count > 0){
var f = $$('up_cert').files.getItem($$('up_cert').files.getFirstId())
if (f.type.toLowerCase() == file.type.toLowerCase()){
msg = 'Selecciona archivos diferentes: un archivo CER y un KEY.'
msg_error(msg)
return false
}
}
var reader = new FileReader();
if (file.type.toLowerCase() == 'cer'){
reader.addEventListener('load', (event) => {
file_cer = event.target.result;
});
reader.readAsDataURL(file.file);
} else {
reader.addEventListener('load', (event) => {
file_key = event.target.result;
});
reader.readAsDataURL(file.file);
}
}
function validate_cert(values){
msg = 'Archivos recibidos correctamente. Esperando validación'
msg_ok(msg)
values['opt'] = 'validate_cert'
webix.ajax().post('/cert', values, {
error:function(text, data, XmlHttpRequest){
msg = 'Ocurrio un error, consulta a soporte técnico'
msg_error(msg)
},
success:function(text, data, XmlHttpRequest){
var values = data.json()
if(values.ok){
$$('form_cert').setValues(values.data)
msg_ok(values.msg)
}else{
msg_error(values.msg)
}
}
})
}

View File

@ -1366,6 +1366,7 @@ function send_cancel(id){
msg_ok(values.msg)
gi.updateItem(id, values.row)
}else{
msg_error('No fue posible cancelar')
webix.alert({
title: 'Error al Cancelar',
text: values.msg,

View File

@ -238,11 +238,11 @@ var emisor_otros_datos= [
{cols: [{view: 'datepicker', id: 'ong_fecha_dof', name: 'ong_fecha_dof',
label: 'Fecha de DOF: ', disabled: true, format: '%d-%M-%Y',
placeholder: 'Fecha de publicación en el DOF'}, {}]},
{template: 'Timbrado y Soporte', type: 'section'},
{view: 'text', id: 'correo_timbrado',
name: 'correo_timbrado', label: 'Usuario para Timbrado: '},
{view: 'text', id: 'token_timbrado',
name: 'token_timbrado', label: 'Token de Timbrado: '},
{template: 'Soporte', type: 'section'},
//~ {view: 'text', id: 'correo_timbrado',
//~ name: 'correo_timbrado', label: 'Usuario para Timbrado: '},
//~ {view: 'text', id: 'token_timbrado',
//~ name: 'token_timbrado', label: 'Token de Timbrado: '},
{view: 'text', id: 'token_soporte',
name: 'token_soporte', label: 'Token de Soporte: '},
]
@ -278,13 +278,16 @@ var col_fiel = {rows: [
]}
//~ {view: 'uploader', id: 'up_cert', autosend: false, link: 'lst_cert',
//~ value: 'Seleccionar certificado', upload: '/values/files'}, {}]},
var emisor_certificado = [
{cols: [col_sello, col_fiel]},
{template: 'Cargar Certificado', type: 'section'},
{view: 'form', id: 'form_upload', rows: [
{cols: [{},
{view: 'uploader', id: 'up_cert', autosend: false, link: 'lst_cert',
value: 'Seleccionar certificado', upload: '/values/files'}, {}]},
value: 'Seleccionar certificado'}, {}]},
{cols: [{},
{view: 'list', id: 'lst_cert', name: 'certificado',
type: 'uploader', autoheight:true, borderless: true}, {}]},
@ -644,7 +647,7 @@ var options_templates = [
var options_pac = [
{id: 'default', value: 'Predeterminado'},
{id: 'finkok', value: 'Finkok'},
{id: 'comercio', value: 'Comercio Digital'},
]
@ -690,12 +693,26 @@ var options_admin_otros = [
{},
]},
{maxHeight: 15},
{template: 'Timbrado', type: 'section'},
{cols: [{maxWidth: 15},
{view: 'richselect', id: 'lst_pac', name: 'lst_pac', width: 300,
label: 'PAC: ', value: 'default', required: false,
options: options_pac}, {view: 'label', label: 'NO cambies este valor, a menos que se te haya indicado'},
label: 'PAC: ', value: '', required: true,
labelAlign: 'right', options: options_pac}, {view: 'label',
label: ' NO cambies este valor, a menos que se te haya indicado'},
]},
{cols: [{maxWidth: 15},
{view: 'text', id: 'user_timbrado', name: 'user_timbrado',
label: 'Usuario: ', labelAlign: 'right', required: true},
{view: 'text', id: 'token_timbrado', name: 'token_timbrado',
label: 'Token: ', labelAlign: 'right', required: true},
]},
{cols: [{maxWidth: 15}, {},
{view: 'button', id: 'cmd_save_pac', label: 'Guardar',
autowidth: true, type: 'form'}, {},
]},
{maxHeight: 20},
{template: 'Ayudas varias', type: 'section'},
{cols: [{maxWidth: 15},
{view: 'checkbox', id: 'chk_config_anticipo', labelWidth: 0,

View File

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:fn="http://www.w3.org/2005/xpath-functions" xmlns:servicioparcial="http://www.sat.gob.mx/servicioparcialconstruccion">
<xsl:stylesheet version="1.1" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:fn="http://www.w3.org/2005/xpath-functions" xmlns:servicioparcial="http://www.sat.gob.mx/servicioparcialconstruccion">
<xsl:template match="servicioparcial:parcialesconstruccion">
<!--Manejador de nodos tipo parcialesconstruccion-->
<xsl:call-template name="Requerido">