#!/usr/bin/env python3 # ~ Empresa Libre # ~ Copyright (C) 2016-2019 Mauricio Baeza Servin (public@elmau.net) # ~ # ~ This program is free software: you can redistribute it and/or modify # ~ it under the terms of the GNU General Public License as published by # ~ the Free Software Foundation, either version 3 of the License, or # ~ (at your option) any later version. # ~ # ~ This program is distributed in the hope that it will be useful, # ~ but WITHOUT ANY WARRANTY; without even the implied warranty of # ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # ~ GNU General Public License for more details. # ~ # ~ You should have received a copy of the GNU General Public License # ~ along with this program. If not, see . import base64 import collections import datetime import math import smtplib import zipfile from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.mime.text import MIMEText from email import encoders from email.utils import formatdate from io import BytesIO import lxml.etree as ET import requests from cryptography.fernet import Fernet from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from dateutil import parser TIMEOUT = 10 #~ https://github.com/kennethreitz/requests/blob/v1.2.3/requests/structures.py#L37 class CaseInsensitiveDict(collections.MutableMapping): """ A case-insensitive ``dict``-like object. Implements all methods and operations of ``collections.MutableMapping`` as well as dict's ``copy``. Also provides ``lower_items``. All keys are expected to be strings. The structure remembers the case of the last key to be set, and ``iter(instance)``, ``keys()``, ``items()``, ``iterkeys()``, and ``iteritems()`` will contain case-sensitive keys. However, querying and contains testing is case insensitive: cid = CaseInsensitiveDict() cid['Accept'] = 'application/json' cid['aCCEPT'] == 'application/json' # True list(cid) == ['Accept'] # True For example, ``headers['content-encoding']`` will return the value of a ``'Content-Encoding'`` response header, regardless of how the header name was originally stored. If the constructor, ``.update``, or equality comparison operations are given keys that have equal ``.lower()``s, the behavior is undefined. """ def __init__(self, data=None, **kwargs): self._store = dict() if data is None: data = {} self.update(data, **kwargs) def __setitem__(self, key, value): # Use the lowercased key for lookups, but store the actual # key alongside the value. self._store[key.lower()] = (key, value) def __getitem__(self, key): return self._store[key.lower()][1] def __delitem__(self, key): del self._store[key.lower()] def __iter__(self): return (casedkey for casedkey, mappedvalue in self._store.values()) def __len__(self): return len(self._store) def lower_items(self): """Like iteritems(), but with all lowercase keys.""" return ( (lowerkey, keyval[1]) for (lowerkey, keyval) in self._store.items() ) def __eq__(self, other): if isinstance(other, collections.Mapping): other = CaseInsensitiveDict(other) else: return NotImplemented # Compare insensitively return dict(self.lower_items()) == dict(other.lower_items()) # Copy is required def copy(self): return CaseInsensitiveDict(self._store.values()) def __repr__(self): return '%s(%r)' % (self.__class__.__name__, dict(self.items())) class SendMail(object): def __init__(self, config): self._config = config self._server = None self._error = '' self._is_connect = self._login() @property def is_connect(self): return self._is_connect @property def error(self): return self._error def _login(self): hosts = ('gmail' in self._config['server'] or 'outlook' in self._config['server']) try: if self._config['ssl'] and hosts: self._server = smtplib.SMTP( self._config['server'], self._config['port'], timeout=TIMEOUT) self._server.ehlo() self._server.starttls() self._server.ehlo() elif self._config['ssl']: self._server = smtplib.SMTP_SSL( self._config['server'], self._config['port'], timeout=TIMEOUT) self._server.ehlo() else: self._server = smtplib.SMTP( self._config['server'], self._config['port'], timeout=TIMEOUT) self._server.login(self._config['user'], self._config['pass']) return True except smtplib.SMTPAuthenticationError as e: if '535' in str(e): self._error = 'Nombre de usuario o contraseña inválidos' return False if '534' in str(e) and 'gmail' in self._config['server']: self._error = 'Necesitas activar el acceso a otras ' \ 'aplicaciones en tu cuenta de GMail' return False except smtplib.SMTPException as e: self._error = str(e) return False except Exception as e: self._error = str(e) return False return def send(self, options): try: message = MIMEMultipart() message['From'] = self._config['user'] message['To'] = options['to'] message['CC'] = options.get('copy', '') message['Subject'] = options['subject'] message['Date'] = formatdate(localtime=True) if options.get('confirm', False): message['Disposition-Notification-To'] = message['From'] message.attach(MIMEText(options['message'], 'html')) for f in options.get('files', ()): part = MIMEBase('application', 'octet-stream') if isinstance(f[0], str): part.set_payload(f[0].encode('utf-8')) else: part.set_payload(f[0]) encoders.encode_base64(part) part.add_header( 'Content-Disposition', "attachment; filename={}".format(f[1])) message.attach(part) receivers = options['to'].split(',') + message['CC'].split(',') self._server.sendmail( self._config['user'], receivers, message.as_string()) return '' except Exception as e: return str(e) def close(self): try: self._server.quit() except: pass return class CfdiToDict(object): NS = { 'cfdi': 'http://www.sat.gob.mx/cfd/3', 'divisas': 'http://www.sat.gob.mx/divisas', } def __init__(self, xml): self._values = {} self._root = ET.parse(BytesIO(xml.encode())).getroot() self._get_values() @property def values(self): return self._values def _get_values(self): self._complementos() return def _complementos(self): complemento = self._root.xpath('//cfdi:Complemento', namespaces=self.NS)[0] divisas = complemento.xpath('//divisas:Divisas', namespaces=self.NS) if divisas: d = CaseInsensitiveDict(divisas[0].attrib) d.pop('version', '') self._values.update({'divisas': d}) return def send_mail(data): msg = '' ok = True server = SendMail(data['server']) if server.is_connect: msg = server.send(data['mail']) else: msg = server.error ok = False server.close() return {'ok': ok, 'msg': msg} def round_up(value): return int(math.ceil(value)) def _get_key(password): digest = hashes.Hash(hashes.SHA256(), backend=default_backend()) digest.update(password.encode()) key = base64.urlsafe_b64encode(digest.finalize()) return key def encrypt(data, password): f = Fernet(_get_key(password)) return f.encrypt(data.encode()).decode() def decrypt(data, password): f = Fernet(_get_key(password)) return f.decrypt(data.encode()).decode() def to_bool(value): return bool(int(value)) def get_url(url): r = requests.get(url).text return r def parse_date(value, next_day=False): d = parser.parse(value) if next_day: return d + datetime.timedelta(days=1) return d def to_zip(*files): zip_buffer = BytesIO() with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file: for data, file_name in files: zip_file.writestr(file_name, data) return zip_buffer.getvalue()