empresa-libre/source/app/controllers/utils.py

296 lines
8.9 KiB
Python
Raw Normal View History

2019-02-03 23:32:48 -06:00
#!/usr/bin/env python3
# ~ Empresa Libre
# ~ Copyright (C) 2016-2019 Mauricio Baeza Servin (public@elmau.net)
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
2019-02-15 14:38:41 -06:00
import collections
import collections.abc
2019-02-15 17:51:13 -06:00
import datetime
2019-02-03 23:32:48 -06:00
import math
2019-02-05 22:12:19 -06:00
import smtplib
2019-02-17 13:29:10 -06:00
import zipfile
2019-02-03 23:32:48 -06:00
2019-02-05 22:12:19 -06:00
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email import encoders
from email.utils import formatdate
2019-02-15 14:38:41 -06:00
from io import BytesIO
2019-02-05 22:12:19 -06:00
2019-02-15 17:51:13 -06:00
import lxml.etree as ET
2019-02-05 22:12:19 -06:00
import requests
2019-02-15 17:51:13 -06:00
2019-02-03 23:32:48 -06:00
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
2019-02-15 17:51:13 -06:00
from dateutil import parser
2019-02-03 23:32:48 -06:00
2019-02-05 22:12:19 -06:00
# Network timeout in seconds; passed as ``timeout=`` to the SMTP
# connections opened in this module.
TIMEOUT = 10
2019-02-15 14:38:41 -06:00
#~ https://github.com/kennethreitz/requests/blob/v1.2.3/requests/structures.py#L37
class CaseInsensitiveDict(collections.abc.MutableMapping):
    """A case-insensitive ``dict``-like object.

    Implements all methods and operations of
    ``collections.abc.MutableMapping`` as well as dict's ``copy``. Also
    provides ``lower_items``.

    All keys are expected to be strings. The structure remembers the
    case of the last key to be set, and ``iter(instance)``, ``keys()``
    and ``items()`` will contain case-sensitive keys. However, querying
    and contains testing is case insensitive::

        cid = CaseInsensitiveDict()
        cid['Accept'] = 'application/json'
        cid['aCCEPT'] == 'application/json'  # True
        list(cid) == ['Accept']  # True

    For example, ``headers['content-encoding']`` will return the
    value of a ``'Content-Encoding'`` response header, regardless
    of how the header name was originally stored.

    If the constructor, ``.update``, or equality comparison
    operations are given keys that have equal ``.lower()``s, the
    behavior is undefined.
    """

    def __init__(self, data=None, **kwargs):
        # Maps lowercased key -> (original key, value).
        self._store = dict()
        if data is None:
            data = {}
        self.update(data, **kwargs)

    def __setitem__(self, key, value):
        # Use the lowercased key for lookups, but store the actual
        # key alongside the value.
        self._store[key.lower()] = (key, value)

    def __getitem__(self, key):
        return self._store[key.lower()][1]

    def __delitem__(self, key):
        del self._store[key.lower()]

    def __iter__(self):
        return (casedkey for casedkey, mappedvalue in self._store.values())

    def __len__(self):
        return len(self._store)

    def lower_items(self):
        """Like ``items()``, but with all lowercase keys."""
        return (
            (lowerkey, keyval[1])
            for (lowerkey, keyval)
            in self._store.items()
        )

    def __eq__(self, other):
        # The ABCs live in ``collections.abc``; the aliases in
        # ``collections`` were removed in Python 3.10.
        if isinstance(other, collections.abc.Mapping):
            other = CaseInsensitiveDict(other)
        else:
            return NotImplemented
        # Compare insensitively
        return dict(self.lower_items()) == dict(other.lower_items())

    # Copy is required
    def copy(self):
        """Return a new ``CaseInsensitiveDict`` with the same cased keys and values."""
        return CaseInsensitiveDict(self._store.values())

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, dict(self.items()))
2019-02-05 22:12:19 -06:00
class SendMail(object):
    """Small SMTP client that logs in on creation and sends HTML e-mail.

    ``config`` is a mapping read with the keys ``server``, ``port``,
    ``ssl``, ``user`` and ``pass``. The login attempt happens in the
    constructor; check ``is_connect`` afterwards and read ``error`` for
    the failure message when it is false.
    """

    def __init__(self, config):
        self._config = config
        self._server = None
        self._error = ''
        self._is_connect = self._login()

    @property
    def is_connect(self):
        """``True`` when the connection and login succeeded."""
        return self._is_connect

    @property
    def error(self):
        """Message describing why the login failed ('' if it did not)."""
        return self._error

    def _login(self):
        """Open the SMTP connection and authenticate.

        Returns ``True`` on success. On any failure the message is stored
        in ``self._error`` and ``False`` is returned.
        """
        host = self._config['server']
        # For these providers the "ssl" option is served with STARTTLS on
        # a plain connection instead of an implicit-TLS socket.
        use_starttls = 'gmail' in host or 'outlook' in host
        try:
            if self._config['ssl'] and use_starttls:
                self._server = smtplib.SMTP(
                    host, self._config['port'], timeout=TIMEOUT)
                self._server.ehlo()
                self._server.starttls()
                self._server.ehlo()
            elif self._config['ssl']:
                self._server = smtplib.SMTP_SSL(
                    host, self._config['port'], timeout=TIMEOUT)
                self._server.ehlo()
            else:
                self._server = smtplib.SMTP(
                    host, self._config['port'], timeout=TIMEOUT)
            self._server.login(self._config['user'], self._config['pass'])
        except smtplib.SMTPAuthenticationError as e:
            detail = str(e)
            if '535' in detail:
                self._error = 'Nombre de usuario o contraseña inválidos'
            elif '534' in detail and 'gmail' in host:
                self._error = 'Necesitas activar el acceso a otras ' \
                    'aplicaciones en tu cuenta de GMail'
            else:
                # Any other authentication failure: report it verbatim
                # rather than failing with an empty error message.
                self._error = detail
            return False
        except Exception as e:
            # Covers smtplib.SMTPException as well as socket/config errors.
            self._error = str(e)
            return False
        return True

    def send(self, options):
        """Build and send one HTML message.

        ``options`` is read with the keys ``to``, ``subject`` and
        ``message`` (required) and ``copy``, ``confirm`` and ``files``
        (optional). ``to`` and ``copy`` are comma separated address
        lists; each item of ``files`` is indexed as ``(content, name)``.

        Returns '' on success or the error text on failure.
        """
        try:
            copy = options.get('copy') or ''
            message = MIMEMultipart()
            message['From'] = self._config['user']
            message['To'] = options['to']
            if copy:
                # Only emit the header when there really is a copy address.
                message['CC'] = copy
            message['Subject'] = options['subject']
            message['Date'] = formatdate(localtime=True)
            if options.get('confirm', False):
                message['Disposition-Notification-To'] = message['From']
            message.attach(MIMEText(options['message'], 'html'))
            for f in options.get('files', ()):
                part = MIMEBase('application', 'octet-stream')
                if isinstance(f[0], str):
                    part.set_payload(f[0].encode('utf-8'))
                else:
                    part.set_payload(f[0])
                encoders.encode_base64(part)
                # Passing the name as a parameter lets the email package
                # quote/encode it (names with spaces or non-ASCII chars).
                part.add_header(
                    'Content-Disposition', 'attachment', filename=str(f[1]))
                message.attach(part)

            # Drop blank entries (e.g. no CC) and surrounding whitespace so
            # no empty recipient is handed to the server.
            candidates = options['to'].split(',') + copy.split(',')
            receivers = [a.strip() for a in candidates if a.strip()]
            self._server.sendmail(
                self._config['user'], receivers, message.as_string())
            return ''
        except Exception as e:
            return str(e)

    def close(self):
        """Close the SMTP session; never raises."""
        if self._server is None:
            return
        try:
            self._server.quit()
        except Exception:
            # Best effort: the connection may already be gone.
            pass
        return
2019-02-15 14:38:41 -06:00
class CfdiToDict(object):
    """Parse a CFDI XML document and expose selected complement data.

    After construction ``values`` holds a dict; the key ``'divisas'`` is
    present (as a ``CaseInsensitiveDict`` of the node attributes, without
    ``version``) only when the document contains a ``divisas:Divisas``
    node.
    """

    # Namespace prefixes used by the XPath queries below.
    NS = {
        'cfdi': 'http://www.sat.gob.mx/cfd/3',
        'divisas': 'http://www.sat.gob.mx/divisas',
    }

    def __init__(self, xml):
        """Parse *xml*, given either as ``str`` or as already-encoded ``bytes``."""
        self._values = {}
        raw = xml.encode() if isinstance(xml, str) else xml
        self._root = ET.parse(BytesIO(raw)).getroot()
        self._get_values()

    @property
    def values(self):
        """Dict with the data extracted from the document."""
        return self._values

    def _get_values(self):
        self._complementos()
        return

    def _complementos(self):
        """Collect the attributes of the ``divisas:Divisas`` complement, if any."""
        complementos = self._root.xpath(
            '//cfdi:Complemento', namespaces=self.NS)
        if not complementos:
            # Documents without a Complemento node have nothing to collect.
            return

        # NOTE: the leading ``//`` makes this query document-wide even
        # though it is evaluated on the Complemento element.
        divisas = complementos[0].xpath(
            '//divisas:Divisas', namespaces=self.NS)
        if divisas:
            d = CaseInsensitiveDict(divisas[0].attrib)
            d.pop('version', '')
            self._values.update({'divisas': d})
        return
2019-02-05 22:12:19 -06:00
def send_mail(data):
    """Send one e-mail and report the outcome.

    ``data['server']`` is the SMTP configuration handed to ``SendMail``
    and ``data['mail']`` the message options handed to ``SendMail.send``.

    Returns ``{'ok': bool, 'msg': str}``. ``ok`` is ``False`` both when
    the login fails and when sending returns an error message; ``msg``
    carries that message ('' on success).
    """
    server = SendMail(data['server'])
    try:
        if not server.is_connect:
            return {'ok': False, 'msg': server.error}
        # ``send`` returns '' on success and the error text otherwise.
        msg = server.send(data['mail'])
    finally:
        # Always release the connection, even if something above raised.
        server.close()
    return {'ok': not msg, 'msg': msg}
2019-02-03 23:32:48 -06:00
def round_up(value):
    """Return *value* rounded up to the nearest whole number, as an ``int``."""
    ceiling = math.ceil(value)
    return int(ceiling)
def _get_key(password):
    """Derive the Fernet key for *password*.

    The key is the URL-safe base64 encoding of the SHA-256 digest of the
    encoded password string.
    """
    hasher = hashes.Hash(hashes.SHA256(), backend=default_backend())
    hasher.update(password.encode())
    return base64.urlsafe_b64encode(hasher.finalize())
2019-02-04 22:13:11 -06:00
def encrypt(data, password):
    """Encrypt the string *data* with a key derived from *password*.

    Returns the Fernet token as a ``str``.
    """
    cipher = Fernet(_get_key(password))
    token = cipher.encrypt(data.encode())
    return token.decode()
2019-02-04 22:13:11 -06:00
def decrypt(data, password):
    """Decrypt the Fernet token string *data* with a key derived from *password*.

    Returns the original text as a ``str``.
    """
    cipher = Fernet(_get_key(password))
    plain = cipher.decrypt(data.encode())
    return plain.decode()
2019-02-05 22:12:19 -06:00
def to_bool(value):
    """Convert *value* to ``int`` and return whether it is non-zero."""
    number = int(value)
    return number != 0
def get_url(url):
    """Fetch *url* with an HTTP GET and return the response body as text.

    The request uses the module-wide ``TIMEOUT`` so an unresponsive
    server cannot block the caller indefinitely; ``requests`` raises its
    own exceptions on timeout or connection failure.
    """
    response = requests.get(url, timeout=TIMEOUT)
    return response.text
2019-02-15 14:38:41 -06:00
2019-02-15 17:51:13 -06:00
def parse_date(value, next_day=False):
    """Parse *value* with ``dateutil`` and return the resulting datetime.

    When *next_day* is true the result is shifted forward by one day.
    """
    parsed = parser.parse(value)
    return parsed + datetime.timedelta(days=1) if next_day else parsed
2019-02-17 13:29:10 -06:00
2019-02-17 22:12:01 -06:00
def to_zip(files):
    """Pack *files* into an in-memory ZIP archive and return its bytes.

    *files* maps each archive member name to its content; members are
    written with DEFLATE compression.
    """
    buffer = BytesIO()
    archive = zipfile.ZipFile(
        buffer,
        mode='a',
        compression=zipfile.ZIP_DEFLATED,
        allowZip64=False,
    )
    with archive:
        for entry in files.items():
            # entry is (member name, content)
            archive.writestr(*entry)
    return buffer.getvalue()