Crear, obtener y eliminar cliente

This commit is contained in:
Mauricio Baeza 2021-04-28 23:37:11 -05:00
parent 8da88cda1c
commit bb3e921a54
22 changed files with 769 additions and 0 deletions

View File

@ -2,3 +2,4 @@ xmlsec
cryptography
lxml
httpx
django

0
source/api/__init__.py Normal file
View File

8
source/api/admin.py Normal file
View File

@ -0,0 +1,8 @@
from django.contrib import admin
from .models import Clients
@admin.register(Clients)
class AdminClients(admin.ModelAdmin):
actions_on_top = True

6
source/api/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class ApiConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'api'

View File

@ -0,0 +1,10 @@
#!/usr/bin/env python3
# ~ Establece un token personalizado para encriptar las claves
# ~ from secrets import token_hex
# ~ token_hex(32)
TOKEN = ''
# ~ Token maestro
API_TOKEN = ''

View File

@ -0,0 +1,33 @@
# Generated by Django 3.2 on 2021-04-28 18:42
import api.models
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Clients',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('rfc', api.models.RFCField(max_length=13, unique=True, verbose_name='RFC')),
('token', models.CharField(max_length=100, verbose_name='Token')),
('key', models.TextField(blank=True, default='', verbose_name='Key')),
('cer', models.TextField(blank=True, default='', verbose_name='Cer')),
('serial_number', models.CharField(blank=True, default='', max_length=100, verbose_name='Fiel Serie')),
('date_from', models.DateTimeField(blank=True, null=True, verbose_name='Desde')),
('date_to', models.DateTimeField(blank=True, null=True, verbose_name='Hasta')),
],
options={
'verbose_name': 'Cliente',
'verbose_name_plural': 'Clientes',
'ordering': ['rfc'],
},
),
]

View File

62
source/api/models.py Normal file
View File

@ -0,0 +1,62 @@
import re
from datetime import datetime
from django.db import models
from django.core.validators import MinLengthValidator
from django.core.exceptions import ValidationError
def validate_rfc(value):
l = 4
if len(value)==12:
l = 3
s = value[0:l]
r = re.match('[A-ZÑ&]{%s}' % l, s)
if not r:
raise ValidationError('Caracteres inválidos al inicio del RFC')
s = value[-3:]
r = re.match('[A-Z0-9]{3}', s)
if not r:
raise ValidationError('Caracteres inválidos al final del RFC')
s = value[l:l+6]
r = re.match('[0-9]{6}', s)
msg = 'Fecha inválida en el RFC'
if not r:
raise ValidationError(msg)
try:
datetime.strptime(s,"%y%m%d")
except:
raise ValidationError(msg)
class RFCField(models.CharField):
description = 'Field to RFC of México'
default_validators = [MinLengthValidator(12), validate_rfc]
def __init__(self, *args, **kwargs):
kwargs['max_length'] = 13
super().__init__(*args, **kwargs)
def to_python(self, value):
return value.upper()
class Clients(models.Model):
rfc = RFCField('RFC', unique=True)
token = models.CharField('Token', max_length=100)
key = models.TextField('Key', default='', blank=True)
cer = models.TextField('Cer', default='', blank=True)
serial_number = models.CharField('Fiel Serie', default='', blank=True, max_length=100)
date_from = models.DateTimeField('Desde', null=True, blank=True)
date_to = models.DateTimeField('Hasta', null=True, blank=True)
class Meta:
ordering = ['rfc']
verbose_name = 'Cliente'
verbose_name_plural = 'Clientes'
def __str__(self):
return self.rfc

3
source/api/tests.py Normal file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

10
source/api/urls.py Normal file
View File

@ -0,0 +1,10 @@
#!/usr/bin/env python3
from django.urls import path
from api.views import ViewClients, ViewCfdi
urlpatterns = [
path('clients/', ViewClients.as_view()),
path('cfdi/', ViewCfdi.as_view()),
]

View File

@ -0,0 +1,255 @@
#!/usr/bin/env python3
import argparse
import base64
import datetime
import getpass
from pathlib import Path
import xmlsec
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from ..conf import TOKEN
class SATCertificate(object):
def __init__(self, cer=b'', key=b'', password=''):
self._error = ''
self._init_values()
self._get_data_cer(cer)
self._get_data_key(key, password)
def _init_values(self):
self._rfc = ''
self._serial_number = ''
self._not_before = None
self._not_after = None
self._is_fiel = False
self._are_couple = False
self._is_valid_time = False
self._cer_pem = ''
self._cer_txt = ''
self._key_enc = b''
self._p12 = b''
self._cer_modulus = 0
self._key_modulus = 0
return
def __str__(self):
msg = '\tRFC: {}\n'.format(self.rfc)
msg += '\tNo de Serie: {}\n'.format(self.serial_number)
msg += '\tVálido desde: {}\n'.format(self.not_before)
msg += '\tVálido hasta: {}\n'.format(self.not_after)
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
msg += '\tSon pareja: {}\n'.format(self.are_couple)
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
return msg
def __bool__(self):
return self.is_valid
def _get_hash(self):
digest = hashes.Hash(hashes.SHA512(), default_backend())
digest.update(self._rfc.encode())
digest.update(self._serial_number.encode())
digest.update(TOKEN.encode())
return digest.finalize()
def _get_data_cer(self, cer):
obj = x509.load_der_x509_certificate(cer, default_backend())
self._rfc = obj.subject.get_attributes_for_oid(
NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0]
self._serial_number = '{0:x}'.format(obj.serial_number)[1::2]
self._not_before = obj.not_valid_before
self._not_after = obj.not_valid_after
now = datetime.datetime.utcnow()
self._is_valid_time = (now > self.not_before) and (now < self.not_after)
if not self._is_valid_time:
msg = 'El certificado no es vigente'
self._error = msg
self._is_fiel = obj.extensions.get_extension_for_oid(
ExtensionOID.KEY_USAGE).value.key_agreement
self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
self._cer_modulus = obj.public_key().public_numbers().n
return
def _get_data_key(self, key, password):
self._key_enc = key
if not key or not password:
return
try:
obj = serialization.load_der_private_key(
key, password.encode(), default_backend())
except ValueError:
msg = 'La contraseña es incorrecta'
self._error = msg
return
p = self._get_hash()
self._key_enc = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(p)
)
self._key_modulus = obj.public_key().public_numbers().n
self._are_couple = self._cer_modulus == self._key_modulus
if not self._are_couple:
msg = 'El CER y el KEY no son pareja'
self._error = msg
return
def _get_key(self, password):
if not password:
password = self._get_hash()
private_key = serialization.load_pem_private_key(
self._key_enc, password=password, backend=default_backend())
return private_key
def _get_key_pem(self):
obj = self._get_key('')
key_pem = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
return key_pem
# Not work
def _get_p12(self):
obj = serialization.pkcs12.serialize_key_and_certificates('test',
self.key_pem, self.cer_pem, None,
encryption_algorithm=serialization.NoEncryption()
)
return obj
def sign(self, data, password=''):
private_key = self._get_key(password)
firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA256())
return base64.b64encode(firma).decode()
def sign_xml(self, tree):
node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature)
ctx = xmlsec.SignatureContext()
key = xmlsec.Key.from_memory(self.key_pem, xmlsec.constants.KeyDataFormatPem)
ctx.key = key
ctx.sign(node)
node = xmlsec.tree.find_node(tree, 'X509Certificate')
node.text = self.cer_txt
return tree
@property
def rfc(self):
return self._rfc
@property
def serial_number(self):
return self._serial_number
@property
def not_before(self):
return self._not_before
@property
def not_after(self):
return self._not_after
@property
def is_fiel(self):
return self._is_fiel
@property
def are_couple(self):
return self._are_couple
@property
def is_valid(self):
return not bool(self.error)
@property
def is_valid_time(self):
return self._is_valid_time
@property
def cer_pem(self):
return self._cer_pem.encode()
@property
def cer_txt(self):
return self._cer_txt
@property
def key_pem(self):
return self._get_key_pem()
@property
def key_enc(self):
return self._key_enc
@property
def p12(self):
return self._get_p12()
@property
def error(self):
return self._error
def main(args):
contra = getpass.getpass('Introduce la contraseña del archivo KEY: ')
#contra = '12345678a'
if not contra.strip():
msg = 'La contraseña es requerida'
print(msg)
return
path_cer = Path(args.cer)
path_key = Path(args.key)
if not path_cer.is_file():
msg = 'El archivo CER es necesario'
print(msg)
return
if not path_key.is_file():
msg = 'El archivo KEY es necesario'
print(msg)
return
cer = path_cer.read_bytes()
key = path_key.read_bytes()
cert = SATCertificate(cer, key, contra)
if cert.error:
print(cert.error)
else:
print(cert)
return
def _process_command_line_arguments():
parser = argparse.ArgumentParser(description='CFDI Certificados')
help = 'Archivo CER'
parser.add_argument('-c', '--cer', help=help, default='')
help = 'Archivo KEY'
parser.add_argument('-k', '--key', help=help, default='')
args = parser.parse_args()
return args
if __name__ == '__main__':
args = _process_command_line_arguments()
main(args)

54
source/api/util/util.py Normal file
View File

@ -0,0 +1,54 @@
#!/usr/bin/env python3
from secrets import token_hex
from ..conf import API_TOKEN
from .cfdi_cert import SATCertificate
def validate_token(token):
return token == API_TOKEN
def validate_client(post, files):
rfc = post.get('rfc', '')
if not rfc:
msg = 'El RFC es requerido'
return False, msg
contra = post.get('password', '')
if not contra:
msg = 'La contraseña es requerida'
return False, msg
file_cer = files.get('cer', b'')
if not file_cer:
msg = 'El archivo CER es requerido'
return False, msg
file_key = files.get('key', b'')
if not file_key:
msg = 'El archivo KEY es requerido'
return False, msg
cert = SATCertificate(file_cer.read(), file_key.read(), contra)
if not cert.is_valid:
return False, cert.error
if cert.is_fiel:
msg = 'El certificado es FIEL'
return False, msg
if rfc.upper() != cert.rfc:
msg = 'El certificado no corresponde al RFC'
return False, msg
data = {
'rfc': rfc,
'token': token_hex(32),
'key': cert.key_enc,
'cer': cert.cer_pem,
'serial_number': cert.serial_number,
'date_from': cert.not_before,
'date_to': cert.not_after,
}
return True, data

69
source/api/views.py Normal file
View File

@ -0,0 +1,69 @@
from django.http import HttpResponse, HttpResponseNotFound
from django.http import JsonResponse
from django.views import View
from .util import util
from .models import Clients
def _validate_token(request):
if not 'Token' in request.headers:
return False
token = request.headers['Token']
if not util.validate_token(token):
return False
return True
class ViewClients(View):
def get(self, request):
if not _validate_token(request):
return HttpResponse(status=401)
rfc = request.GET['rfc']
try:
obj = Clients.objects.filter(rfc=rfc).values()[0]
except IndexError:
msg = {'error': 'Cliente no existe'}
return JsonResponse(msg, safe=False, status=202)
del obj['key']
del obj['cer']
return JsonResponse(obj, safe=False)
def post(self, request):
if not _validate_token(request):
return HttpResponse(status=401)
post = request.POST
files = request.FILES
result, data = util.validate_client(post, files)
if not result:
return HttpResponse(data, status=202)
if Clients.objects.filter(rfc=data['rfc']).exists():
msg = 'Cliente ya existe'
return HttpResponse(msg, status=202)
obj = Clients.objects.create(**data)
return HttpResponse(status=201)
def delete(self, request, *args, **kwargs):
if not _validate_token(request):
return HttpResponse(status=401)
rfc = request.GET['rfc']
obj = Clients.objects.get(rfc=rfc)
obj.delete()
return HttpResponse()
class ViewCfdi(View):
def get(self, request):
return HttpResponse('ok')

View File

16
source/cfditimbra/asgi.py Normal file
View File

@ -0,0 +1,16 @@
"""
ASGI config for cfditimbra project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/3.2/howto/deployment/asgi/
"""
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cfditimbra.settings')
application = get_asgi_application()

View File

@ -0,0 +1,126 @@
"""
Django settings for cfditimbra project.
Generated by 'django-admin startproject' using Django 3.2.
For more information on this file, see
https://docs.djangoproject.com/en/3.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/3.2/ref/settings/
"""
from pathlib import Path
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/3.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-0(14)u68lkquc^pw+dpq^_^q_*uh+ho&g087)88#aq_ms=vue#'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'api',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
# ~ 'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'cfditimbra.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'cfditimbra.wsgi.application'
# Database
# https://docs.djangoproject.com/en/3.2/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}
# Password validation
# https://docs.djangoproject.com/en/3.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/3.2/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_L10N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/3.2/howto/static-files/
STATIC_URL = '/static/'
# Default primary key field type
# https://docs.djangoproject.com/en/3.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'

22
source/cfditimbra/urls.py Normal file
View File

@ -0,0 +1,22 @@
"""cfditimbra URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/3.2/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import include, path
urlpatterns = [
path('admin/', admin.site.urls),
path('api/', include('api.urls')),
]

16
source/cfditimbra/wsgi.py Normal file
View File

@ -0,0 +1,16 @@
"""
WSGI config for cfditimbra project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/3.2/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cfditimbra.settings')
application = get_wsgi_application()

22
source/manage.py Executable file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cfditimbra.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()

Binary file not shown.

Binary file not shown.

56
source/tests/tests.py Normal file
View File

@ -0,0 +1,56 @@
#!/usr/bin/env python3
import unittest
import httpx
URL_API = 'http://127.0.0.1:8000/api/{}'
PATH_CERT = 'certificados/comercio.{}'
class TestClients(unittest.TestCase):
def setUp(self):
self.url = URL_API.format('clients/')
def test_unauthorized_without_token(self):
expected = 401
result = httpx.post(self.url)
self.assertEqual(expected, result.status_code)
def test_unauthorized_with_token(self):
expected = 401
result = httpx.post(self.url, headers={'Token': '123'})
self.assertEqual(expected, result.status_code)
def test_01_add_client(self):
expected = 201
headers = {'Token': '12345'}
data = {
'rfc': 'EKU9003173C9',
'password': '12345678a',
}
files = {
'cer': open(PATH_CERT.format('cer'), 'rb'),
'key': open(PATH_CERT.format('key'), 'rb'),
}
result = httpx.post(self.url, headers=headers, data=data, files=files)
self.assertEqual(expected, result.status_code)
def test_02_get_client(self):
expected = 200
headers = {'Token': '12345'}
params = {'rfc': 'EKU9003173C9'}
result = httpx.get(self.url, headers=headers, params=params)
self.assertEqual(expected, result.status_code)
def test_03_delete_client(self):
expected = 200
headers = {'Token': '12345'}
params = {'rfc': 'EKU9003173C9'}
result = httpx.delete(self.url, headers=headers, params=params)
self.assertEqual(expected, result.status_code)
if __name__ == '__main__':
unittest.main()