yasd/yasd.py

972 lines
30 KiB
Python
Executable File

#!/usr/bin/env python
# (c) 2023 perro <hi@perrotuerto.blog>.
# Founded by Mexican Academy of Language <https://academia.org.mx>.
# Licensed under GPLv3.
# Requirements: python > 3.10, pyyaml, lxml, bs4, xmlschema, rich
import sys
import yaml
import copy
import argparse
import xmlschema
import urllib.request
from pathlib import Path
from datetime import datetime
from bs4 import BeautifulSoup
from bs4.formatter import XMLFormatter
from rich.console import Console
from rich.markdown import Markdown
class YASD:
"""
YASD actions performer.
"""
def do(
action="check",
indata=None,
outfile=None,
tests=None,
quiet=False,
log=False,
stdout=False,
validate=False,
):
"""
Performs YASD actions directly.
Intented for YASDCLI, but can also be used programmatically.
:param str action: YASD action to perform; 'check' by default
:param indata: YASD input; 'None' by default
:type indata: None or Path or dict
:param outfile: YASD output file path; 'None' by default
:type outfile: None or Path
:param tests: XML tests; 'None' by default
:type tests: None or list<Path>
:param quiet: If messages are print or not; 'False' by default
:type quiet: True or False
:param log: If messages are write in a file or not; 'False' by default
:type log: True or False
:param stdout: if conversion goes to stdout or not; 'False' by default
:type stdout: True or False
:param validate: if XSD is validated; 'False' by default
:type validate: True or False
:return: Output data; str on 'document'; bs4.element.Tag on 'convert'
or 'sample'; YAML dict on 'check'
:rtype: str or bs4.element.Tag or dict
"""
yasd = YASD(indata, outfile, tests, quiet, log, stdout, validate)
yasd.msgr.run(f"action_{action}")
match action:
case "document":
return yasd.document()
case "convert":
return yasd.convert()
case "sample":
return yasd.sample()
case "check":
return yasd.yaml
def __init__(
self,
indata=None,
outfile=None,
tests=None,
quiet=False,
log=False,
stdout=False,
validate=False,
):
"""
Inits YASD object.
:param indata: YASD input; 'None' by default
:type indata: None or Path or dict
:param outfile: YASD output file path; 'None' by default
:type outfile: None or Path
:param tests: XML tests; 'None' by default
:type tests: None or list<Path>
:param quiet: If messages are print or not; 'False' by default
:type quiet: True or False
:param log: If messages are write in a file or not; 'False' by default
:type log: True or False
:param stdout: if conversion goes to stdout or not; 'False' by default
:type stdout: True or False
:param validate: if XSD is validated; 'False' by default
:type validate: True or False
"""
if outfile is None:
self.msgr = YASDMessenger(quiet, log)
self.outfile = None
else:
self.msgr = YASDMessenger(quiet, log, outfile.parent)
self.outfile = YASDCheck.file(outfile, self.msgr)
self.yaml = YASDCheck(indata, self.msgr).yaml
self.formatter = XMLFormatter(indent=2)
self.stdout = stdout
self.validate = validate
self.tests = set(tests) if tests is not None else None
def convert(self):
"""
Converts YASD to XSD.
:return: XSD element
:rtype: bs4.element.Tag
"""
self.xsd = YASDXSD(self.yaml, self.msgr).xsd
out = self.__output(self.xsd)
if self.validate:
return YASDCheck.xsd(out, self.tests, self.msgr)
else:
return {"xsd": out, "tests": []}
def sample(self):
"""
Generates XML sample from YASD.
:return: XML element
:rtype: bs4.element.Tag
"""
self.xml = YASDXML(self.yaml, self.msgr).xml
return self.__output(self.xml, extname=".xml")
def document(self):
"""
Generates RST documentation.
:return: RST document
:rtype: str
"""
self.rst = YASDRST(self.yaml, self.msgr).rst
return self.__output(self.rst, extname=".rst")
def __output(self, outdata="", extname=".xsd"):
"""
Prints in the terminal or writes into a file.
:param outdata: Data for output
:type outdata: bs4.BeautifulSoup or str
:param str extname: Extension name for file output
:return: Output data
:rtype: str
"""
if not isinstance(outdata, str):
outdata = outdata.prettify(formatter=self.formatter)
outdata = f'<?xml version="1.0"?>\n{outdata}'
if self.stdout:
if self.outfile is None:
sys.stdout.write(outdata)
else:
suffix = self.outfile.suffix
if len(suffix) > 0 and suffix == suffix.replace(" ", ""):
extname = suffix
filename = f"{self.outfile.stem}{extname}"
filename = Path(self.outfile.parent / filename)
filename.write_text(outdata)
return outdata
class YASDXSD:
    """
    YASD convertor to XSD.

    Builds a BeautifulSoup document (``self.xsd``) out of a YASD dict. The
    conversion works on a private deep copy of the input because it deletes
    and renames keys while it walks the structure.
    """

    # TODO refactor so it fits XSD grammar, cfr. reference:
    # https://www.w3schools.com/xml/schema_elements_ref.asp
    def __init__(self, yml=None, messenger=None):
        """
        Inits YASD convertor and builds the whole XSD.

        :param dict yml: YASD structure
        :param messenger: Object for print or save messages
        :type messenger: None or YASDMessenger
        """
        self.msgr = YASDCheck.messenger(messenger)
        # Deep copy: the build steps below delete and rewrite keys, which
        # must not leak into the dict owned by the caller.
        self.yaml = copy.deepcopy(YASDCheck.yaml(yml, self.msgr))
        # NOTE(review): BeautifulSoup forwards unknown keyword arguments
        # such as 'parser' to the tree builder; 'features="xml"' may be
        # what was intended -- confirm before changing.
        self.xsd = BeautifulSoup(parser="xml")
        self.__build_schema()
        self.__build_elements()
        self.__build_attributes()
        self.__build_groups()

    def __build_schema(self):
        """
        Builds root node for XSD.

        Every key under 'schema', except 'version' and 'schemaLocation',
        becomes an attribute of the root node.
        """
        schema_attrs = self.yaml.get("schema") or {}
        for key in ("version", "schemaLocation"):
            # pop() instead of del: both keys are allowed to be absent.
            schema_attrs.pop(key, None)
        schema = self.xsd.new_tag("schema", nsprefix="xs")
        schema["xmlns:xs"] = "http://www.w3.org/2001/XMLSchema"
        for key, val in schema_attrs.items():
            schema[key] = val
        self.xsd.append(schema)

    def __build_elements(self):
        """
        Builds element nodes for XSD.

        Element nodes can be simple or complex types.
        """
        for el in self.yaml.get("elements") or []:
            el = self.__sanitize(el)
            if el["type"] == "simple":
                self.__build_simple(el)
            else:
                self.__build_complex(el)

    def __build_attributes(self):
        """
        Builds attribute nodes for XSD.

        Attributes are always simple types.
        """
        for el in self.yaml.get("attributeElements") or []:
            self.__build_simple(self.__sanitize(el), tag="attribute")

    def __build_groups(self):
        """
        Builds group nodes for XSD.

        Runs after the elements so the groups created dynamically by
        '__add_group' are also emitted.
        """
        for el in self.yaml.get("groups") or []:
            if "attribute_group" in el:
                # TODO build attributeGroup
                ...
            else:
                element = self.xsd.new_tag("group", nsprefix="xs")
                element["name"] = el["name"]
                indicator = self.__build_indicator(el)
                element.append(indicator)
                self.xsd.schema.append(element)

    def __build_simple(self, el, tag="element"):
        """
        Builds simple node for XSD.

        :param dict el: YASD element
        :param str tag: tag name for XSD node
        """
        element = self.xsd.new_tag(tag, nsprefix="xs")
        # When both are given, 'default' wins over 'fixed'.
        if "default" in el and "fixed" in el:
            del el["fixed"]
        # When both are given, 'restriction' wins over 'datatype'.
        if "restriction" in el and "datatype" in el:
            del el["datatype"]
        # For elements, the YASD 'type' key is not copied as an attribute.
        if "type" in el and tag == "element":
            del el["type"]
        for key, val in el.items():
            if key == "datatype":
                element["type"] = f"xs:{val}"
            elif key == "restriction":
                self.__build_restriction(element, val)
            else:
                element[key] = val
        self.xsd.schema.append(element)

    def __build_complex(self, el):
        """
        Builds complex node for XSD.

        :param dict el: YASD element
        """
        element = self.__build_complex_root(el)
        complex_type = self.__build_complex_type(el)
        # An empty 'children' list is treated like a missing one instead of
        # failing on the first-item lookup.
        children = el.get("children")
        if children:
            if "group" in children[0]:
                self.__add_ref(complex_type, el)
                self.__add_occurs(complex_type.group, el)
            else:
                complex_type.append(self.__build_indicator(el))
        self.__add_ref(complex_type, el, is_attr=True)
        element.append(complex_type)
        self.xsd.schema.append(element)

    def __build_complex_root(self, el):
        """
        Builds root complex node for XSD.

        :param dict el: YASD element
        :return: root complex node
        :rtype: bs4.element.Tag
        """
        element = self.xsd.new_tag("element", nsprefix="xs")
        element["name"] = el["name"]
        return element

    def __build_complex_type(self, el):
        """
        Builds complex type node for XSD.

        :param dict el: YASD element
        :return: complex type node
        :rtype: bs4.element.Tag
        """
        container = self.xsd.new_tag("complexType", nsprefix="xs")
        simple_content = self.__build_simple_content(el)
        if simple_content is not None:
            container.append(simple_content)
        if el["type"] == "mixed":
            container["mixed"] = "true"
        return container

    def __build_simple_content(self, el):
        """
        Builds simple content node for XSD.

        :param dict el: YASD element
        :return: simple content node, or None if the element type is not
                 'no_elements'
        :rtype: None or bs4.element.Tag
        """
        if el["type"] != "no_elements":
            return None
        simple_content = self.xsd.new_tag("simpleContent", nsprefix="xs")
        extension = self.xsd.new_tag("extension", nsprefix="xs")
        extension["base"] = f"xs:{el['datatype']}"
        self.__add_ref(extension, el, is_attr=True)
        simple_content.append(extension)
        return simple_content

    def __build_restriction(self, root, restrs, simple=True):
        """
        Builds restriction node for XSD.

        :param bs4.element.Tag root: root node that requires restriction node
        :param list restrs: restrictions for root node, as a list of dicts
        :param simple: if the restriction is wrapped in 'simpleType'
                       (otherwise in 'complexContent'); 'True' by default
        :type simple: True or False
        """
        container_tag = "simpleType" if simple else "complexContent"
        container = self.xsd.new_tag(container_tag, nsprefix="xs")
        restriction = self.xsd.new_tag("restriction", nsprefix="xs")
        restriction["base"] = self.__get_base(restrs)
        for restr in restrs:
            for key, val in restr.items():
                constrain = self.xsd.new_tag(key, nsprefix="xs", value=val)
                restriction.append(constrain)
        container.append(restriction)
        root.append(container)

    def __build_indicator(self, el):
        """
        Builds indicator node for XSD.

        :param dict el: YASD element
        :return: indicator node named after the 'children_order' value
        :rtype: bs4.element.Tag
        """
        other_tag = el["children_order"]
        indicator = self.xsd.new_tag(other_tag, nsprefix="xs")
        self.__add_ref(indicator, el)
        return indicator

    def __get_base(self, restrictions):
        """
        Gets restriction data type.

        It uses the first restriction to get the data type. A valid
        restriction node always has the same data type for all its
        restrictions.

        :param list restrictions: restrictions as a list of dicts
        :return: 'xs:string' or 'xs:integer'
        :rtype: str
        """
        key = next(iter(restrictions[0]))
        strings = "enumeration pattern whiteSpace length minLength maxLength"
        if key in strings.split():
            return "xs:string"
        return "xs:integer"

    def __get_ref(self, el, is_attr):
        """
        Gets required variables values for references.

        :param dict el: YASD element
        :param is_attr: if it is an attribute reference
        :type is_attr: True or False
        :return: YASD key to read, XSD tag to create and the key that holds
                 the referenced name
        :rtype: tuple
        """
        key, tag = "children", "element"
        if is_attr:
            key, tag = "attributes", "attribute"
        entries = el.get(key)
        # The first entry decides if the whole list refers to groups; an
        # empty list falls back to plain references.
        if entries and "group" in entries[0]:
            tag, name = "group", "group"
        else:
            name = "ref"
        return key, tag, name

    def __get_dict_el(self, mylist, key, val):
        """
        Gets dict element inside a list.

        It gets the first dict of the list whose key is equal to val.
        For example, for l = [{"k": "v1"}, {"k": "v2"}]
        self.__get_dict_el(l, "k", "v2") will return {"k": "v2"}

        :param list mylist: list that contains the element
        :param str key: element key
        :param str val: element value
        :return: element found
        :rtype: dict
        :raises LookupError: if no element matches
        """
        for el in mylist:
            if el.get(key) == val:
                return el
        raise LookupError(f"no item with {key!r} equal to {val!r}")

    def __add_ref(self, root, el, is_attr=False):
        """
        Adds element or attribute references to root node.

        The consumed key ('children' or 'attributes') is removed from the
        element afterwards.

        :param bs4.element.Tag root: root node that requires references
        :param dict el: YASD element
        :param is_attr: if it is an attribute reference; 'False' by default
        :type is_attr: True or False
        """
        key, tag, name = self.__get_ref(el, is_attr)
        if key not in el:
            return
        if key == "children" and tag == "group":
            # Resolve dynamic groups once, before any reference is read:
            # it rewrites the group names used below. Doing it per child
            # would process already renamed groups again.
            self.__add_group(el[key])
        for element in el[key]:
            node = self.xsd.new_tag(tag, nsprefix="xs")
            self.__add_occurs(node, element)
            node["ref"] = element[name]
            if "use" in element:
                node["use"] = element["use"]
            root.append(node)
        del el[key]

    def __add_group(self, children):
        """
        Adds groups dynamically.

        The key option 'remove' allows to add groups without a list of
        elements. This allows to remove elements in groups for XMLSchema10
        or avoids the use of 'assert' in XMLSchema11.

        The key 'remove' is for this case in mind: you want to reuse a group
        but without certain elements; for example, you want to reuse the
        group 'inlines' (which includes elements 'i', 'b'…) for the element
        'b' but without itself, so the syntax '<b><b>bold</b></b>' is
        invalid.

        Each processed child gets its 'group' value replaced by the dynamic
        group name and its 'remove' key dropped, so a second pass over the
        same children does nothing.

        :param list children: element children
        :raises LookupError: if the group or an element to remove is unknown
        """
        groups = self.yaml.setdefault("groups", [])
        for group in children:
            if "remove" not in group:
                continue
            removals = group.pop("remove")
            base_name = group["group"]
            dyn_name = f"dyn_{base_name}-{'-'.join(removals)}"
            group["group"] = dyn_name
            # The same reduced group may be requested by several elements;
            # define it only once.
            if any(known.get("name") == dyn_name for known in groups):
                continue
            original = self.__get_dict_el(groups, "name", base_name)
            clone = copy.deepcopy(original)
            clone["name"] = dyn_name
            for unwanted in removals:
                el = self.__get_dict_el(clone["children"], "ref", unwanted)
                clone["children"].remove(el)
            groups.append(clone)

    def __add_occurs(self, node, el):
        """
        Adds occurrences to node.

        :param bs4.element.Tag node: node that requires occurrences
        :param dict el: element that indicates if there are occurrences
        """
        for occurs in ("minOccurs", "maxOccurs"):
            if occurs in el:
                node[occurs] = el[occurs]

    def __sanitize(self, el):
        """
        Prepares element or attribute for conversion.

        It eliminates 'description' key.

        :param dict el: Element or attribute as a dictionary
        :return: Sanitized element or attribute
        :rtype: dict
        """
        el.pop("description", None)
        return el
class YASDXML:
    """
    XML sample generator for YASD.

    Placeholder implementation: the sample is a fixed string for now.
    """

    # TODO XML Sample
    def __init__(self, yml=None, messenger=None):
        """
        Sets up the sampler and stores the sample in ``self.xml``.

        :param dict yml: YASD structure
        :param messenger: Object for print or save messages
        :type messenger: None or YASDMessenger
        """
        checked_messenger = YASDCheck.messenger(messenger)
        self.msgr = checked_messenger
        self.yaml = YASDCheck.yaml(yml, checked_messenger)
        self.xml = "XML sample"
class YASDRST:
    """
    RST document generator for YASD.

    Placeholder implementation: the document is a fixed string for now.
    """

    # TODO RST document
    def __init__(self, yml=None, messenger=None):
        """
        Sets up the generator and stores the document in ``self.rst``.

        :param dict yml: YASD structure
        :param messenger: Object for print or save messages
        :type messenger: None or YASDMessenger
        """
        checked_messenger = YASDCheck.messenger(messenger)
        self.msgr = checked_messenger
        self.yaml = YASDCheck.yaml(yml, checked_messenger)
        self.rst = "RST document"
class YASDCheck:
"""
YASD validator.
Validates everything related to YASD classes.
"""
def messenger(msgr=None):
"""
Verifies if messenger was initialize.
:param messenger: Messenger object
:type messenger: None or YASDMessenger
"""
if msgr is None:
return YASDMessenger()
else:
return msgr
def yaml(yml=None, msgr=None):
"""
Verifies if yaml exists.
:param dict yml: YAML object
:param msgr: Messenger object
:type msgr: None or YASDMessenger
"""
msgr = YASDCheck.messenger(msgr)
if yml is None or type(yml) is not dict:
msgr.run("no_yaml", level="error")
else:
return yml
def file(filepath=None, msgr=None):
"""
Verifies if file exists.
:param filepath: File path
:type filepath: None or Path
:param msgr: Messenger object
:type msgr: None or YASDMessenger
"""
msgr = YASDCheck.messenger(msgr)
if type(filepath).__module__ != "pathlib":
msgr.run("no_input", level="error")
elif not filepath.exists() or not filepath.is_file():
msgr.run("invalid_input", level="error", file=filepath)
return filepath.resolve()
def url(key="readme", msgr=None):
"""
Verifies if remote file exists
:param str key: YASDMessenger string key
:param msgr: Messenger object
:type msgr: None or YASDMessenger
"""
# TODO if YASD becomes pip package, the fetched files should be local
# Remove urllib import if that is the case
msgr = YASDCheck.messenger(msgr)
try:
url = YASDMessenger.keys()[key]
msgr.run("fetching", url=url)
return urllib.request.urlopen(url).read().decode("utf-8")
except Exception:
msgr.run("no_url", level="error", url=url)
def xsd(xsd, tests=None, msgr=None):
"""
Validates XSD.
:param str xsd: XSD as XML string
:param tests: XML file paths
:type tests: None or set
:param msgr: Messenger object
:type msgr: None or YASDMessenger
:return: XSD as string and results as list
:rtype: dict
"""
msgr = YASDCheck.messenger(msgr)
msgr.run("validating")
try:
valid_xsd = xmlschema.XMLSchema(xsd)
tests = YASDCheck.xsd_test(valid_xsd, tests, msgr)
return {"xsd": xsd, "tests": tests}
except xmlschema.validators.exceptions.XMLSchemaParseError as error:
error = str(error).replace("\n", "\n ")
msgr.run("no_valid", error=error, level="error")
def xsd_test(xsd, tests=None, msgr=None):
"""
Test XSD against XML files.
:param xsd: XSD for testing
:type xsd: XMLSchema10 or str
:param tests: XML file paths
:type tests: None or set
:param msgr: Messenger object
:type msgr: None or YASDMessenger
:return: Tests results
:rtype: list
"""
results = []
if isinstance(xsd, str):
xsd = xmlschema.XMLSchema(xsd)
if isinstance(tests, set):
for test in tests:
result = YASDCheck.__valid_test_path(test)
if "error" not in result.keys():
result = YASDCheck.__valid_test_xml(xsd, result)
results.append(result)
YASDCheck.__print_tests(results, msgr)
return results
def __valid_test_path(filepath=""):
"""
Validates test path.
:param filepath: Test file path
:type filepath: Path or str
"""
result = {"path": filepath}
if isinstance(filepath, str):
filepath = Path(filepath)
if not filepath.exists() or not filepath.is_file():
if not filepath.exists():
error = YASDMessenger.keys()["no_exists"]
else:
error = YASDMessenger.keys()["no_file"]
result.setdefault("msg", error)
result.setdefault("error", True)
return result
def __valid_test_xml(xsd, result):
"""
Test XSD against XML file.
The result is a temporary dict with only 'path' as key.
:param XMLSchema10 xsd: XSD for testing
:param dict result: Temporary test result
:return: Test result
:rtype: dict
"""
try:
xsd.validate(result["path"])
result.setdefault("msg", YASDMessenger.keys()["passed"])
result.setdefault("error", False)
except Exception as error:
result.setdefault("msg", error.message)
result.setdefault("error", True)
return result
def __print_tests(results=[], msgr=None):
"""
Prints XSD test results.
:param list results: Tests results
:param msgr: Messenger object
:type msgr: None or YASDMessenger
"""
for result in results:
level = "warn" if result["error"] else "info"
res = "FAILED" if result["error"] else "PASSED"
expectation = YASDCheck.__get_expectation(result["path"])
msgr.run("testing", path=result["path"])
msgr.run(
"test_result",
level=level,
path=result["path"],
expectation=expectation,
res=res,
msg=result["msg"],
)
def __get_expectation(filepath=""):
"""
Gets XML file expected result.
:param filepath: Test file path
:type filepath: Path or str
:return: 'PASSED' or 'FAILED' or '???'
:rtype: str
"""
if isinstance(filepath, str):
filepath = Path(filepath)
name = filepath.stem
if name.find("pass") >= 0:
return "PASSED"
elif name.find("fail") >= 0:
return "FAILED"
else:
return "???"
def __init__(self, indata=None, messenger=None):
"""
Inits YASD validator.
:param indata: YASD input
:type indata: None or Path or dict
:param messenger: Object for print or save messages
:type messenger: None or YASDMessenger
"""
self.msgr = YASDCheck.messenger(messenger)
if type(indata) is dict:
self.yaml = indata
else:
self.yaml = self.parse_file(YASDCheck.file(indata, self.msgr))
self.check_structure()
def parse_file(self, filepath):
"""
Attempts YASD file parsing.
:param filepath: YASD file path
:type filepath: Path
"""
raw = filepath.read_text(encoding="utf8")
try:
return yaml.safe_load(raw)
except yaml.YAMLError:
self.msgr.run("invalid_yaml", level="error")
def check_structure(self):
"""
Verifies YASD structure.
:return: YASD structure
:rtype: dict
"""
# TODO extra checks for self.yaml
...
class YASDMessenger:
    """
    YASD printer or writer.

    Holds every user-facing text of the program and sends messages to the
    terminal and, optionally, to a log file.
    """

    @staticmethod
    def keys():
        """
        Messages keys dictionary.

        Texts may contain '@name' placeholders that 'run' replaces with
        its keyword arguments.

        :return: Message texts and other strings by key
        :rtype: dict
        """
        # TODO internationalization with: https://github.com/sectasy0/pyi18n
        return {
            "prog": "yasd",
            "description": (
                "\n"
                "YASD, Yet Another Schema Definition. YASD is a YAML format for\n"
                "human writable XSDs (XML Schema Definition), humans declare what is\n"
                "indispensable, leaving the machines to do the rest of the\n"
                'unreadable <syntaxis who_can_read_this="?" />.\n'
            ),
            "epilog": (
                "\n"
                "(c) 2023 perro <hi@perrotuerto.blog>. Founded by Mexican Academy of\n"
                "Language <https://academia.org.mx>. Licensed under GPLv3.\n"
            ),
            "readme": "".join(
                [
                    "https://gitlab.com/perrotuerto_personal/codigo/yasd/",
                    "-/raw/no-masters/README.md",
                ]
            ),
            "w3": "https://www.w3.org/2001/XMLSchema.xsd",
            "help_action": "action to perform",
            "help_input": "input file in YAML format",
            "help_output": "output file",
            "help_tests": (
                "\n"
                "one or more XML test files;\n"
                "use 'pass' or 'fail' as file name prefix for expectation\n"
            ),
            "help_quiet": "enable quiet mode",
            "help_log": "write log",
            "action_convert": "creating XSD schema",
            "action_check": "checking YASD structure",
            "action_sample": "creating XML sample",
            "action_document": "creating RST documentation",
            "invalid_level": "invalid log level '@lvl'",
            "invalid_input": "invalid file '@file'",
            "invalid_yaml": "invalid YAML structure",
            "no_url": "failed to fetch '@url'",
            "no_input": "input file needed",
            "no_yaml": "YAML dict needed",
            "no_valid": "XSD schema has the following error:\n @error",
            "no_exists": "File doesn't exists",
            "no_file": "Path isn't a file",
            "fetching": "fetching '@url'",
            "validating": "validating XSD schema",
            "testing": "testing XSD against '@path'",
            "test_result": "\n".join(
                [
                    "test output:",
                    " File: @path",
                    " Expectation: @expectation",
                    " Result: @res",
                    " Message: @msg",
                ]
            ),
            "passed": "Test passed!",
        }

    def __init__(self, quiet=False, log=False, logpath=None):
        """
        Inits YASD Messenger.

        :param quiet: If messages are not printed; 'False' by default
        :type quiet: True or False
        :param log: If messages are written in the log file; 'False' by
                    default
        :type log: True or False
        :param logpath: Directory for 'log.txt'; the current working
                        directory at call time by default
        :type logpath: None or Path or str
        """
        # Local import: the module header only brings 'datetime' in scope.
        from datetime import timezone

        self.quiet = quiet
        self.log = log
        # The default is resolved per call; a 'Path.cwd()' default in the
        # signature would be frozen when the module is imported.
        directory = Path.cwd() if logpath is None else Path(logpath)
        self.logfile = directory / "log.txt"
        # The trailing 'Z' stands for UTC, so the clock is read in UTC.
        now = datetime.now(timezone.utc)
        self.timestamp = "[%s]" % now.strftime("%Y-%m-%dT%H:%M:%SZ")

    def run(self, key="", level="info", **kwargs):
        """
        Prints or writes messages.

        '**kwargs' are the keys for message text replacements. The program
        is stopped with exit status 1 on 'error' or 'fatal' levels.

        :param str key: Message key
        :param str level: Log level; 'info' by default
        """
        self.__check_level(level)
        name = YASDMessenger.keys()["prog"]
        msg = self.__get_msg(key, **kwargs)
        msg = f"{name}: {level}: {msg}\n"
        if not self.quiet:
            sys.stdout.write(msg)
        if self.log:
            self.__write(msg)
        if level in ["error", "fatal"]:
            sys.exit(1)

    def __write(self, msg):
        """
        Writes log file.

        Nothing is written if the log directory does not exist.

        :param str msg: Output message
        """
        if not self.logfile.parent.exists():
            return
        # Append mode also creates the file when it is missing.
        with open(self.logfile, "a", encoding="utf-8") as file:
            file.write(f"{self.timestamp} {msg}")

    def __check_level(self, level):
        """
        Verifies log level.

        Sends a warning if log level doesn't exist.

        :param str level: Log level
        """
        if level not in ["trace", "debug", "info", "warn", "error", "fatal"]:
            # Report through this messenger so 'quiet' and 'log' are
            # honoured; 'warn' is a valid level, so there is no recursion.
            self.run("invalid_level", level="warn", lvl=level)

    def __get_msg(self, key, **kwargs):
        """
        Gets message based on key.

        '**kwargs' are the keys for message text replacements.

        :param str key: Message key
        :return: Message or key if message key doesn't exist.
        :rtype: str
        """
        messages = YASDMessenger.keys()
        if key not in messages:
            return key
        msg = messages[key]
        for name, value in kwargs.items():
            msg = msg.replace(f"@{name}", str(value))
        return msg
class YASDCLI:
    """
    YASD command-line interface.

    Creating an instance parses the arguments and runs the requested
    action right away.
    """

    @staticmethod
    def print_man():
        """
        Prints README as manual.

        The README is fetched through YASDCheck.url() and shown in a pager.
        """
        raw = YASDCheck.url()
        # The table of contents marker is dropped before rendering.
        raw = raw.replace("## Table of Contents\n\n[TOC]\n\n", "")
        md = Markdown(raw)
        console = Console()
        with console.pager(styles=True):
            console.print(md)

    def __init__(self, argv=None):
        """
        Inits YASD CLI.

        :param argv: Command-line arguments; the arguments of the running
                     program (``sys.argv[1:]``) by default
        :type argv: None or list<str>
        """
        self.__init_parser()
        args = self.parser.parse_args(argv)
        if args.action == "man":
            YASDCLI.print_man()
        else:
            # From the command line the result is always printed or
            # written, and the XSD is always validated.
            YASD.do(
                args.action,
                args.input,
                args.output,
                args.tests,
                args.quiet,
                args.log,
                stdout=True,
                validate=True,
            )

    def __init_parser(self):
        """
        Inits argument parser.

        Texts come from YASDMessenger.keys().
        """
        msg = YASDMessenger.keys()
        self.parser = argparse.ArgumentParser(
            prog=msg["prog"],
            description=msg["description"],
            epilog=msg["epilog"],
        )
        self.parser.add_argument(
            "action",
            choices=["convert", "check", "sample", "document", "man"],
            help=msg["help_action"],
        )
        # Optional positional: 'man' needs no input file.
        self.parser.add_argument(
            "input",
            type=Path,
            nargs="?",
            default=None,
            help=msg["help_input"],
        )
        self.parser.add_argument(
            "-q", "--quiet", action="store_true", help=msg["help_quiet"]
        )
        self.parser.add_argument(
            "-l", "--log", action="store_true", help=msg["help_log"]
        )
        self.parser.add_argument(
            "-o",
            "--output",
            type=Path,
            default=None,
            help=msg["help_output"],
        )
        # 'extend' lets '-t' be repeated, each time with one or more paths.
        self.parser.add_argument(
            "-t",
            "--tests",
            type=Path,
            default=None,
            help=msg["help_tests"],
            action="extend",
            nargs="+",
        )
# Script entry point: building the CLI object parses the arguments and runs
# the requested action.
if __name__ == "__main__":
    YASDCLI()