diff --git a/.gitignore b/.gitignore index 7857b93..36ffcdb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ examples/* !examples/*.py +!examples/*.json *.so *.o diff --git a/doc/API.rst b/doc/API.rst index 561a013..082c59f 100644 --- a/doc/API.rst +++ b/doc/API.rst @@ -1,8 +1,59 @@ .. _pyXDSM_API: pyXDSM API +========== + +XDSM Class ---------- -.. currentmodule:: pyxdsm.XDSM -.. autoclass:: pyxdsm.XDSM.XDSM +The main XDSM class for creating Extended Design Structure Matrix diagrams. + +.. autopydantic_model:: pyxdsm.XDSM.XDSM + :members: + :undoc-members: + :show-inheritance: + :member-order: bysource + :exclude-members: model_config, model_fields, model_computed_fields + +Pydantic Models +--------------- + +SystemNode +^^^^^^^^^^ + +.. autopydantic_model:: pyxdsm.XDSM.SystemNode + :members: + :undoc-members: + :show-inheritance: + +ConnectionEdge +^^^^^^^^^^^^^^ + +.. autopydantic_model:: pyxdsm.XDSM.ConnectionEdge + :members: + :undoc-members: + :show-inheritance: + +OutputNode +^^^^^^^^^^ + +.. autopydantic_model:: pyxdsm.XDSM.OutputNode + :members: + :undoc-members: + :show-inheritance: + +ProcessChain +^^^^^^^^^^^^ + +.. autopydantic_model:: pyxdsm.XDSM.ProcessChain + :members: + :undoc-members: + :show-inheritance: + +AutoFadeConfig +^^^^^^^^^^^^^^ + +.. autopydantic_model:: pyxdsm.XDSM.AutoFadeConfig :members: + :undoc-members: + :show-inheritance: diff --git a/doc/conf.py b/doc/conf.py index 674e5ae..9575010 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -20,8 +20,42 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions.extend(["numpydoc"]) +extensions.extend([ + "numpydoc", + "sphinx.ext.intersphinx", + "sphinxcontrib.autodoc_pydantic", +]) numpydoc_show_class_members = False +# -- intersphinx configuration ------------------------------------------------ + +intersphinx_mapping = { + "python": ("https://docs.python.org/3", None), + "pydantic": ("https://docs.pydantic.dev/latest/", None), +} + +# -- autodoc_pydantic configuration ------------------------------------------- + +# Show all configuration options for Pydantic models +autodoc_pydantic_model_show_json = False +autodoc_pydantic_model_show_config_summary = False +autodoc_pydantic_model_show_config_member = False +autodoc_pydantic_model_show_validator_members = False +autodoc_pydantic_model_show_validator_summary = False +autodoc_pydantic_model_show_field_summary = True +autodoc_pydantic_model_members = True +autodoc_pydantic_model_undoc_members = True + +# Settings for fields +autodoc_pydantic_field_list_validators = False +autodoc_pydantic_field_doc_policy = "both" # Show both docstring and description +autodoc_pydantic_field_show_constraints = True +autodoc_pydantic_field_show_alias = True +autodoc_pydantic_field_show_default = True + +# Validator settings +autodoc_pydantic_validator_replace_signature = True +autodoc_pydantic_validator_list_fields = True + # mock import for autodoc autodoc_mock_imports = ["numpy"] diff --git a/doc/examples.rst b/doc/examples.rst index a037520..e3b0d3c 100644 --- a/doc/examples.rst +++ b/doc/examples.rst @@ -15,6 +15,42 @@ This will output ``mdf.tex``, a standalone tex document that (by default) is als :scale: 30 +This example uses the `.to_json` method to serialize the XDSM to a JSON file: + +.. literalinclude:: ../examples/mdf.json + +This can be loaded programmatically using the static :meth:`~pyxdsm.XDSM.XDSM.from_json` method. +Alternatively a command-line tool can be used to write the JSON to a PDF, a tikz file, or another JSON file. + +Command-line Usage +------------------ + +The JSON file can be used directly from the command line: + +.. code-block:: bash + + python -m pyxdsm mdf.json -o mdf.pdf + +This generates a PDF from the JSON specification. Other output formats are also supported: + +.. code-block:: bash + + # Generate only TikZ (no PDF compilation) + python -m pyxdsm mdf.json -o mdf.tikz + + # Export to a different JSON file + python -m pyxdsm mdf.json -o output.json + + # Generate PDF with default name (mdf.pdf) + python -m pyxdsm mdf.json + +For more options, use the ``--help`` flag: + +.. code-block:: bash + + python -m pyxdsm --help + + More complicated example ------------------------ diff --git a/doc/requirements.txt b/doc/requirements.txt index b03c968..c1e8477 100644 --- a/doc/requirements.txt +++ b/doc/requirements.txt @@ -1,2 +1,3 @@ numpydoc sphinx_mdolab_theme +autodoc-pydantic diff --git a/examples/kitchen_sink.json b/examples/kitchen_sink.json new file mode 100644 index 0000000..af68e59 --- /dev/null +++ b/examples/kitchen_sink.json @@ -0,0 +1,432 @@ +{ + "systems": [ + { + "node_name": "opt", + "style": "Optimization", + "label": "\\text{Optimizer}", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "opt" + }, + { + "node_name": "DOE", + "style": "DOE", + "label": "\\text{DOE}", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "DOE" + }, + { + "node_name": "MDA", + "style": "MDA", + "label": "\\text{Newton}", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "MDA" + }, + { + "node_name": "D1", + "style": "Function", + "label": "D_1", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "D1" + }, + { + "node_name": "D2", + "style": "ImplicitFunction", + "label": "D_2", + "stack": false, + "faded": true, + "label_width": null, + "spec_name": "D2" + }, + { + "node_name": "D3", + "style": "ImplicitFunction", + "label": "D_3", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "D3" + }, + { + "node_name": "subopt", + "style": "SubOptimization", + "label": "SubOpt", + "stack": false, + "faded": true, + "label_width": null, + "spec_name": "subopt" + }, + { + "node_name": "G1", + "style": "Group", + "label": "G_1", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "G1" + }, + { + "node_name": "G2", + "style": "ImplicitGroup", + "label": "G_2", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "G2" + }, + { + "node_name": "MM", + "style": "Metamodel", + "label": "MM", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "MM" + }, + { + "node_name": "F", + "style": "Function", + "label": [ + "F", + "\\text{Functional}" + ], + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "F" + }, + { + "node_name": "H", + "style": "Function", + "label": "H", + "stack": true, + "faded": false, + "label_width": null, + "spec_name": "H" + } + ], + "connections": [ + { + "src": "opt", + "target": "D1", + "label": [ + "x", + "z", + "y_2" + ], + "label_width": 2, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "opt", + "target": "D2", + "label": [ + "z", + "y_1" + ], + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": true + }, + { + "src": "opt", + "target": "D3", + "label": "z, y_1", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "opt", + "target": "subopt", + "label": "z, y_1", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": true + }, + { + "src": "D3", + "target": "G1", + "label": "y_3", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "subopt", + "target": "G1", + "label": "z_2", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": true, + "src_faded": true, + "target_faded": false + }, + { + "src": "subopt", + "target": "G2", + "label": "z_2", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": true, + "src_faded": true, + "target_faded": false + }, + { + "src": "subopt", + "target": "MM", + "label": "z_2", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": true, + "src_faded": true, + "target_faded": false + }, + { + "src": "subopt", + "target": "F", + "label": "f", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": true, + "src_faded": true, + "target_faded": false + }, + { + "src": "MM", + "target": "subopt", + "label": "f", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": true + }, + { + "src": "opt", + "target": "G2", + "label": "z", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "opt", + "target": "F", + "label": "x, z", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "opt", + "target": "F", + "label": "y_1, y_2", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "opt", + "target": "H", + "label": "y_1, y_2", + "label_width": null, + "style": "DataInter", + "stack": true, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "D1", + "target": "opt", + "label": "\\mathcal{R}(y_1)", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "D2", + "target": "opt", + "label": "\\mathcal{R}(y_2)", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": true, + "src_faded": true, + "target_faded": false + }, + { + "src": "F", + "target": "opt", + "label": "f", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "H", + "target": "opt", + "label": "h", + "label_width": null, + "style": "DataInter", + "stack": true, + "faded": false, + "src_faded": false, + "target_faded": false + } + ], + "inputs": { + "D1": { + "node_name": "output_D1", + "label": "P_1", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false + }, + "D2": { + "node_name": "output_D2", + "label": "P_2", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false + }, + "opt": { + "node_name": "output_opt", + "label": "x_0", + "label_width": null, + "style": "DataIO", + "stack": true, + "faded": false + } + }, + "outputs": { + "opt": { + "node_name": "left_output_opt", + "label": "y^*", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false, + "side": "left" + }, + "D1": { + "node_name": "left_output_D1", + "label": "y_1^*", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false, + "side": "left" + }, + "D2": { + "node_name": "left_output_D2", + "label": "y_2^*", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": true, + "side": "left" + }, + "F": { + "node_name": "right_output_F", + "label": "f^*", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false, + "side": "right" + }, + "H": { + "node_name": "right_output_H", + "label": "h^*", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false, + "side": "right" + } + }, + "processes": [ + { + "systems": [ + "opt", + "DOE", + "MDA", + "D1", + "D2", + "subopt", + "G1", + "G2", + "MM", + "F", + "H", + "opt" + ], + "arrow": true, + "faded": false + }, + { + "systems": [ + "output_opt", + "opt", + "left_output_opt" + ], + "arrow": true, + "faded": false + } + ], + "use_sfmath": true, + "optional_packages": [], + "auto_fade": { + "inputs": "none", + "outputs": "connected", + "connections": "outgoing", + "processes": "none" + } +} diff --git a/examples/kitchen_sink.py b/examples/kitchen_sink.py index 95638ab..9342ad6 100644 --- a/examples/kitchen_sink.py +++ b/examples/kitchen_sink.py @@ -1,16 +1,16 @@ from pyxdsm.XDSM import ( - XDSM, - OPT, - SUBOPT, - SOLVER, DOE, - IFUNC, FUNC, GROUP, + IFUNC, IGROUP, - METAMODEL, LEFT, + METAMODEL, + OPT, RIGHT, + SOLVER, + SUBOPT, + XDSM, ) x = XDSM( @@ -87,3 +87,4 @@ x.write("kitchen_sink", cleanup=False) x.write_sys_specs("sink_specs") +x.to_json("kitchen_sink.json") diff --git a/examples/mdf.json b/examples/mdf.json new file mode 100644 index 0000000..572619a --- /dev/null +++ b/examples/mdf.json @@ -0,0 +1,238 @@ +{ + "systems": [ + { + "node_name": "opt", + "style": "Optimization", + "label": "\\text{Optimizer}", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "opt" + }, + { + "node_name": "solver", + "style": "MDA", + "label": "\\text{Newton}", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "solver" + }, + { + "node_name": "D1", + "style": "Function", + "label": "D_1", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "D1" + }, + { + "node_name": "D2", + "style": "Function", + "label": "D_2", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "D2" + }, + { + "node_name": "F", + "style": "Function", + "label": "F", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "F" + }, + { + "node_name": "G", + "style": "Function", + "label": "G", + "stack": false, + "faded": false, + "label_width": null, + "spec_name": "G" + } + ], + "connections": [ + { + "src": "opt", + "target": "D1", + "label": "x, z", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "opt", + "target": "D2", + "label": "z", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "opt", + "target": "F", + "label": "x, z", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "solver", + "target": "D1", + "label": "y_2", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "solver", + "target": "D2", + "label": "y_1", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "D1", + "target": "solver", + "label": "\\mathcal{R}(y_1)", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "solver", + "target": "F", + "label": "y_1, y_2", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "D2", + "target": "solver", + "label": "\\mathcal{R}(y_2)", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "solver", + "target": "G", + "label": "y_1, y_2", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "F", + "target": "opt", + "label": "f", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + }, + { + "src": "G", + "target": "opt", + "label": "g", + "label_width": null, + "style": "DataInter", + "stack": false, + "faded": false, + "src_faded": false, + "target_faded": false + } + ], + "inputs": {}, + "outputs": { + "opt": { + "node_name": "left_output_opt", + "label": "x^*, z^*", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false, + "side": "left" + }, + "D1": { + "node_name": "left_output_D1", + "label": "y_1^*", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false, + "side": "left" + }, + "D2": { + "node_name": "left_output_D2", + "label": "y_2^*", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false, + "side": "left" + }, + "F": { + "node_name": "left_output_F", + "label": "f^*", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false, + "side": "left" + }, + "G": { + "node_name": "left_output_G", + "label": "g^*", + "label_width": null, + "style": "DataIO", + "stack": false, + "faded": false, + "side": "left" + } + }, + "processes": [], + "use_sfmath": true, + "optional_packages": [], + "auto_fade": { + "inputs": "none", + "outputs": "none", + "connections": "none", + "processes": "none" + } +} diff --git a/examples/mdf.py b/examples/mdf.py index 65c89f1..7359ebb 100644 --- a/examples/mdf.py +++ b/examples/mdf.py @@ -1,4 +1,4 @@ -from pyxdsm.XDSM import XDSM, OPT, SOLVER, FUNC, LEFT +from pyxdsm.XDSM import FUNC, LEFT, OPT, SOLVER, XDSM # Change `use_sfmath` to False to use computer modern x = XDSM(use_sfmath=True) @@ -29,3 +29,5 @@ x.add_output("F", "f^*", side=LEFT) x.add_output("G", "g^*", side=LEFT) x.write("mdf") + +x.to_json("mdf.json") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..a8e6f41 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,94 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "pyXDSM" +dynamic = ["version"] +description = "Python script to generate PDF XDSM diagrams using TikZ and LaTeX" +readme = "README.md" +requires-python = ">=3.8" +license = {text = "Apache License Version 2.0"} +keywords = ["optimization", "multidisciplinary", "multi-disciplinary", "analysis", "n2", "xdsm"] +authors = [ + {name = "MDO Lab"}, +] +maintainers = [ + {name = "MDO Lab"}, +] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering", +] + +dependencies = [ + "numpy>=1.21", + "pydantic>=2.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=6.0", + "pytest-cov", + "ruff", +] +docs = [ + "sphinx", + "sphinx-mdolab-theme", + "numpydoc", + "sphinxcontrib-autodoc-pydantic>=2.0", +] +all = ["pyXDSM[dev,docs]"] + +[project.urls] +Homepage = "https://github.com/mdolab/pyXDSM" +Documentation = "https://mdolab-pyxdsm.readthedocs-hosted.com" +Repository = "https://github.com/mdolab/pyXDSM" +Issues = "https://github.com/mdolab/pyXDSM/issues" + +[tool.setuptools] +packages = ["pyxdsm"] + +[tool.setuptools.dynamic] +version = {attr = "pyxdsm.__version__"} + +[tool.setuptools.package-data] +pyxdsm = ["*.tex"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] +addopts = "-v --tb=short" + +[tool.ruff] +line-length = 120 +indent-width = 4 +exclude = ["doc/conf.py"] + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort + "UP", # pyupgrade +] +ignore = [ + "E501", # line too long (handled by formatter) + "UP031", # use format specifiers instead of percent format + "UP032", # use f-string instead of format call +] + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" diff --git a/pyxdsm/XDSM.py b/pyxdsm/XDSM.py index cd3a1cc..978683d 100644 --- a/pyxdsm/XDSM.py +++ b/pyxdsm/XDSM.py @@ -1,12 +1,17 @@ -from __future__ import print_function -import os -import numpy as np +""" +pyXDSM with Pydantic models for validation and serialization +""" + import json -import subprocess -from collections import namedtuple +import os +from pathlib import Path +from typing import Literal, Optional, Union + +from pydantic import BaseModel, field_validator, model_validator -from pyxdsm import __version__ as pyxdsm_version +from pyxdsm.xdsm_latex_writer import XDSMLatexWriter +# Constants OPT = "Optimization" SUBOPT = "SubOptimization" SOLVER = "MDA" @@ -19,171 +24,210 @@ LEFT = "left" RIGHT = "right" -tikzpicture_template = r""" -%%% Preamble Requirements %%% -% \usepackage{{geometry}} -% \usepackage{{amsfonts}} -% \usepackage{{amsmath}} -% \usepackage{{amssymb}} -% \usepackage{{tikz}} - -% Optional packages such as sfmath set through python interface -% \usepackage{{{optional_packages}}} - -% \usetikzlibrary{{arrows,chains,positioning,scopes,shapes.geometric,shapes.misc,shadows}} - -%%% End Preamble Requirements %%% - -\input{{"{diagram_styles_path}"}} -\begin{{tikzpicture}} - -\matrix[MatrixSetup]{{ -{nodes}}}; - -% XDSM process chains -{process} - -\begin{{pgfonlayer}}{{data}} -\path -{edges} -\end{{pgfonlayer}} - -\end{{tikzpicture}} -""" - -tex_template = r""" -% XDSM diagram created with pyXDSM {version}. -\documentclass{{article}} -\usepackage{{geometry}} -\usepackage{{amsfonts}} -\usepackage{{amsmath}} -\usepackage{{amssymb}} -\usepackage{{tikz}} - -% Optional packages such as sfmath set through python interface -\usepackage{{{optional_packages}}} - -% Define the set of TikZ packages to be included in the architecture diagram document -\usetikzlibrary{{arrows,chains,positioning,scopes,shapes.geometric,shapes.misc,shadows}} - - -% Set the border around all of the architecture diagrams to be tight to the diagrams themselves -% (i.e. no longer need to tinker with page size parameters) -\usepackage[active,tightpage]{{preview}} -\PreviewEnvironment{{tikzpicture}} -\setlength{{\PreviewBorder}}{{5pt}} - -\begin{{document}} - -\input{{"{tikzpicture_path}"}} - -\end{{document}} -""" - - -def chunk_label(label, n_chunks): - # looping till length l - for i in range(0, len(label), n_chunks): - yield label[i : i + n_chunks] - - -def _parse_label(label, label_width=None): - if isinstance(label, (tuple, list)): - if label_width is None: - return r"$\begin{array}{c}" + r" \\ ".join(label) + r"\end{array}$" - else: - labels = [] - for chunk in chunk_label(label, label_width): - labels.append(", ".join(chunk)) - return r"$\begin{array}{c}" + r" \\ ".join(labels) + r"\end{array}$" - else: - return r"${}$".format(label) - - -def _label_to_spec(label, spec): - if isinstance(label, str): - label = [ - label, - ] - for var in label: - if var: - spec.add(var) - - -System = namedtuple("System", "node_name style label stack faded label_width spec_name") -Input = namedtuple("Input", "node_name label label_width style stack faded") -Output = namedtuple("Output", "node_name label label_width style stack faded side") -Connection = namedtuple("Connection", "src target label label_width style stack faded src_faded target_faded") -Process = namedtuple("Process", "systems arrow faded") - - -class XDSM: - def __init__(self, use_sfmath=True, optional_latex_packages=None, auto_fade=None): - """Initialize XDSM object - - Parameters - ---------- - use_sfmath : bool, optional - Whether to use the sfmath latex package, by default True - optional_latex_packages : string or list of strings, optional - Additional latex packages to use when creating the pdf and tex versions of the diagram, by default None - auto_fade : dictionary, optional - Controls the automatic fading of inputs, outputs, connections and processes based on the fading of diagonal blocks. For each key "inputs", "outputs", "connections", and "processes", the value can be one of: - - "all" : fade all blocks - - "connected" : fade all components connected to faded blocks (both source and target must be faded for a conncection to be faded) - - "none" : do not auto-fade anything - For connections there are two additional options: - - "incoming" : Fade all connections that are incoming to faded blocks. - - "outgoing" : Fade all connections that are outgoing from faded blocks. - """ - self.systems = [] - self.connections = [] - self.left_outs = {} - self.right_outs = {} - self.ins = {} - self.processes = [] - - self.use_sfmath = use_sfmath - if optional_latex_packages is None: - self.optional_packages = [] - else: - if isinstance(optional_latex_packages, str): - self.optional_packages = [optional_latex_packages] - elif isinstance(optional_latex_packages, list): - self.optional_packages = optional_latex_packages - else: - raise ValueError("optional_latex_packages must be a string or a list of strings") - - self.auto_fade = {"inputs": "none", "outputs": "none", "connections": "none", "processes": "none"} - fade_options = ["all", "connected", "none"] - if auto_fade is not None: - if any([key not in self.auto_fade for key in auto_fade.keys()]): - raise ValueError( - "The supplied 'auto_fade' dictionary contains keys that are not recognized. " - + "valid keys are 'inputs', 'outputs', 'connections', 'processes'." - ) - - self.auto_fade.update(auto_fade) - for key in self.auto_fade.keys(): - option_is_valid = self.auto_fade[key] in fade_options or ( - key == "connections" and self.auto_fade[key] in ["incoming", "outgoing"] - ) - if not option_is_valid: - raise ValueError( - f"The supplied 'auto_fade' dictionary contains an invalid value: '{key}'. " - + "valid values are 'all', 'connected', 'none', 'incoming', 'outgoing'." - ) +ConnectionStyle = Literal["DataInter", "DataIO"] +Side = Literal["left", "right"] +AutoFadeOption = Literal["all", "connected", "none", "incoming", "outgoing"] + +# Valid TikZ node styles (from diagram_styles.tikzstyles) +NodeStyle = Literal[ + "Optimization", + "SubOptimization", + "MDA", + "DOE", + "ImplicitFunction", + "Function", + "Group", + "ImplicitGroup", + "Metamodel", + "DataInter", + "DataIO", +] + + +class SystemNode(BaseModel): + """System node on the diagonal of XDSM diagram.""" + + node_name: str + style: NodeStyle + label: Union[str, list[str], tuple[str, ...]] + stack: bool = False + faded: bool = False + label_width: Optional[int] = None + spec_name: Optional[str] = None + + @field_validator("node_name") + @classmethod + def _validate_node_name(cls, v: str) -> str: + if not v or not v.strip(): + raise ValueError("Node name cannot be empty") + return v.strip() + + @model_validator(mode="after") + def set_defaults(self) -> "SystemNode": + """Set spec_name to node_name if not provided.""" + if self.spec_name is None: + self.spec_name = self.node_name + return self + + +class InputNode(BaseModel): + """Input node at top of XDSM diagram.""" + + node_name: str + label: Union[str, list[str], tuple[str, ...]] + label_width: Optional[int] = None + style: str = "DataIO" + stack: bool = False + faded: bool = False + + +class OutputNode(BaseModel): + """Output node on left or right side of XDSM diagram.""" + + node_name: str + label: Union[str, list[str], tuple[str, ...]] + label_width: Optional[int] = None + style: ConnectionStyle = "DataIO" + stack: bool = False + faded: bool = False + side: Side = "right" + + +class ConnectionEdge(BaseModel): + """Connection between two nodes.""" + + src: str + target: str + label: Union[str, list[str], tuple[str, ...]] + label_width: Optional[int] = None + style: ConnectionStyle = "DataInter" + stack: bool = False + faded: bool = False + src_faded: bool = False + target_faded: bool = False + + @model_validator(mode="after") + def _validate_no_self_connection(self): + if self.src == self.target: + raise ValueError("Cannot connect component to itself") + return self + + +class ProcessChain(BaseModel): + """Process flow chain between systems.""" + + systems: list[str] + arrow: bool = True + faded: bool = False + + @field_validator("systems") + @classmethod + def _validate_systems(cls, v: list[str]) -> list[str]: + if len(v) < 2: + raise ValueError("Process chain must contain at least 2 systems") + return v + + +class AutoFadeConfig(BaseModel): + """Configuration for automatic fading of components.""" + + inputs: AutoFadeOption = "none" + outputs: AutoFadeOption = "none" + connections: AutoFadeOption = "none" + processes: AutoFadeOption = "none" + + @field_validator("inputs", "outputs", "processes") + @classmethod + def _validate_basic_options(cls, v: str) -> str: + valid = ["all", "connected", "none"] + if v not in valid: + raise ValueError(f"Must be one of {valid}") + return v + + @field_validator("connections") + @classmethod + def _validate_connection_options(cls, v: str) -> str: + valid = ["all", "connected", "none", "incoming", "outgoing"] + if v not in valid: + raise ValueError(f"Must be one of {valid}") + return v + + +class XDSM(BaseModel): + """ + XDSM diagram specification and renderer using Pydantic validation. + """ + + systems: list[SystemNode] = [] + connections: list[ConnectionEdge] = [] + inputs: dict[str, InputNode] = {} + outputs: dict[str, OutputNode] = {} + processes: list[ProcessChain] = [] + + use_sfmath: bool = True + optional_packages: Union[str, list[str]] = [] + auto_fade: Union[dict[str, str], AutoFadeConfig] = AutoFadeConfig() + + @field_validator("optional_packages", mode="before") + @classmethod + def _validate_optional_packages(cls, v): + """Accept string or list, convert to list.""" + if v is None: + return [] + if isinstance(v, str): + return [v] + return v + + @field_validator("auto_fade", mode="before") + @classmethod + def _validate_auto_fade(cls, v): + """Accept dict or AutoFadeConfig, convert to AutoFadeConfig.""" + if v is None: + return AutoFadeConfig() + if isinstance(v, dict): + return AutoFadeConfig(**v) + return v + + @model_validator(mode="before") + @classmethod + def _set_defaults_for_missing_fields(cls, data): + """Ensure missing or null collection fields get empty defaults.""" + if not isinstance(data, dict): + return data + + # Set empty defaults for missing or null collection fields + if "inputs" not in data or data.get("inputs") is None: + data["inputs"] = {} + if "outputs" not in data or data.get("outputs") is None: + data["outputs"] = {} + if "systems" not in data or data.get("systems") is None: + data["systems"] = [] + if "connections" not in data or data.get("connections") is None: + data["connections"] = [] + if "processes" not in data or data.get("processes") is None: + data["processes"] = [] + + return data + + @model_validator(mode="after") + def _validate_unique_system_names(self): + """Ensure all system names are unique.""" + names = [sys.node_name for sys in self.systems] + duplicates = [n for n in names if names.count(n) > 1] + if duplicates: + raise ValueError(f"Duplicate system names: {set(duplicates)}") + return self def add_system( self, - node_name, - style, - label, - stack=False, - faded=False, - label_width=None, - spec_name=None, - ): + node_name: str, + style: str, + label: Union[str, list[str], tuple[str, ...]], + stack: bool = False, + faded: bool = False, + label_width: Optional[int] = None, + spec_name: Optional[str] = None, + ) -> None: r""" Add a "system" block, which will be placed on the diagonal of the XDSM diagram. @@ -219,13 +263,26 @@ def add_system( The spec name used for the spec file. """ - if spec_name is None: - spec_name = node_name - - sys = System(node_name, style, label, stack, faded, label_width, spec_name) - self.systems.append(sys) + system = SystemNode( + node_name=node_name, + style=style, + label=label, + stack=stack, + faded=faded, + label_width=label_width, + spec_name=spec_name, + ) + self.systems.append(system) - def add_input(self, name, label, label_width=None, style="DataIO", stack=False, faded=False): + def add_input( + self, + name: str, + label: Union[str, list[str], tuple[str, ...]], + label_width: Optional[int] = None, + style: str = "DataIO", + stack: bool = False, + faded: bool = False, + ) -> None: r""" Add an input, which will appear in the top row of the diagram. @@ -257,16 +314,27 @@ def add_input(self, name, label, label_width=None, style="DataIO", stack=False, faded : bool If true, the component will be faded, in order to highlight some other system. """ - sys_faded = {} - for s in self.systems: - sys_faded[s.node_name] = s.faded - if (self.auto_fade["inputs"] == "all") or ( - self.auto_fade["inputs"] == "connected" and name in sys_faded and sys_faded[name] + sys_faded = {s.node_name: s.faded for s in self.systems} + + if (self.auto_fade.inputs == "all") or ( + self.auto_fade.inputs == "connected" and name in sys_faded and sys_faded[name] ): faded = True - self.ins[name] = Input("output_" + name, label, label_width, style, stack, faded) - def add_output(self, name, label, label_width=None, style="DataIO", stack=False, faded=False, side="left"): + self.inputs[name] = InputNode( + node_name="output_" + name, label=label, label_width=label_width, style=style, stack=stack, faded=faded + ) + + def add_output( + self, + name: str, + label: Union[str, list[str], tuple[str, ...]], + label_width: Optional[int] = None, + style: str = "DataIO", + stack: bool = False, + faded: bool = False, + side: str = "left", + ) -> None: r""" Add an output, which will appear in the left or right-most column of the diagram. @@ -302,30 +370,35 @@ def add_output(self, name, label, label_width=None, style="DataIO", stack=False, Must be one of ``['left', 'right']``. This parameter controls whether the output is placed on the left-most column or the right-most column of the diagram. """ - sys_faded = {} - for s in self.systems: - sys_faded[s.node_name] = s.faded - if (self.auto_fade["outputs"] == "all") or ( - self.auto_fade["outputs"] == "connected" and name in sys_faded and sys_faded[name] + sys_faded = {s.node_name: s.faded for s in self.systems} + + if (self.auto_fade.outputs == "all") or ( + self.auto_fade.outputs == "connected" and name in sys_faded and sys_faded[name] ): faded = True - if side == "left": - self.left_outs[name] = Output("left_output_" + name, label, label_width, style, stack, faded, side) - elif side == "right": - self.right_outs[name] = Output("right_output_" + name, label, label_width, style, stack, faded, side) - else: - raise ValueError("The option 'side' must be given as either 'left' or 'right'!") + + output = OutputNode( + node_name=f"{side}_output_{name}", + label=label, + label_width=label_width, + style=style, + stack=stack, + faded=faded, + side=side, + ) + + self.outputs[name] = output def connect( self, - src, - target, - label, - label_width=None, - style="DataInter", - stack=False, - faded=False, - ): + src: str, + target: str, + label: Union[str, list[str], tuple[str, ...]], + label_width: Optional[int] = None, + style: str = "DataInter", + stack: bool = False, + faded: bool = False, + ) -> None: r""" Connects two components with a data line, and adds a label to indicate the data being transferred. @@ -361,30 +434,34 @@ def connect( faded : bool If true, the component will be faded, in order to highlight some other system. """ - if src == target: - raise ValueError("Can not connect component to itself") - - if (not isinstance(label_width, int)) and (label_width is not None): - raise ValueError("label_width argument must be an integer") + sys_faded = {s.node_name: s.faded for s in self.systems} - sys_faded = {} - for s in self.systems: - sys_faded[s.node_name] = s.faded + src_faded = src in sys_faded and sys_faded[src] + target_faded = target in sys_faded and sys_faded[target] - allFaded = self.auto_fade["connections"] == "all" - srcFaded = src in sys_faded and sys_faded[src] - targetFaded = target in sys_faded and sys_faded[target] + all_faded = self.auto_fade.connections == "all" if ( - allFaded - or (self.auto_fade["connections"] == "connected" and (srcFaded and targetFaded)) - or (self.auto_fade["connections"] == "incoming" and targetFaded) - or (self.auto_fade["connections"] == "outgoing" and srcFaded) + all_faded + or (self.auto_fade.connections == "connected" and src_faded and target_faded) + or (self.auto_fade.connections == "incoming" and target_faded) + or (self.auto_fade.connections == "outgoing" and src_faded) ): faded = True - self.connections.append(Connection(src, target, label, label_width, style, stack, faded, srcFaded, targetFaded)) + connection = ConnectionEdge( + src=src, + target=target, + label=label, + label_width=label_width, + style=style, + stack=stack, + faded=faded, + src_faded=src_faded, + target_faded=target_faded, + ) + self.connections.append(connection) - def add_process(self, systems, arrow=True, faded=False): + def add_process(self, systems: list[str], arrow: bool = True, faded: bool = False) -> None: """ Add a process line between a list of systems, to indicate process flow. @@ -398,316 +475,61 @@ def add_process(self, systems, arrow=True, faded=False): If true, arrows will be added to the process lines to indicate the direction of the process flow. """ - sys_faded = {} - for s in self.systems: - sys_faded[s.node_name] = s.faded - if (self.auto_fade["processes"] == "all") or ( - self.auto_fade["processes"] == "connected" - and any( - [sys_faded[s] for s in systems if s in sys_faded.keys()] - ) # sometimes a process may contain off-diagonal blocks + sys_faded = {s.node_name: s.faded for s in self.systems} + + if (self.auto_fade.processes == "all") or ( + self.auto_fade.processes == "connected" and any([sys_faded.get(s, False) for s in systems]) ): faded = True - self.processes.append(Process(systems, arrow, faded)) - - def _build_node_grid(self): - size = len(self.systems) - comps_rows = np.arange(size) - comps_cols = np.arange(size) + process = ProcessChain(systems=systems, arrow=arrow, faded=faded) + self.processes.append(process) - if self.ins: - size += 1 - # move all comps down one row - comps_rows += 1 - - if self.left_outs: - size += 1 - # shift all comps to the right by one, to make room for inputs - comps_cols += 1 - - if self.right_outs: - size += 1 - # don't need to shift anything in this case - - # build a map between comp node_names and row idx for ordering calculations - row_idx_map = {} - col_idx_map = {} - - node_str = r"\node [{style}] ({node_name}) {{{node_label}}};" - - grid = np.empty((size, size), dtype=object) - grid[:] = "" - - # add all the components on the diagonal - for i_row, j_col, comp in zip(comps_rows, comps_cols, self.systems): - style = comp.style - if comp.stack: - style += ",stack" - if comp.faded: - style += ",faded" - - label = _parse_label(comp.label, comp.label_width) - node = node_str.format(style=style, node_name=comp.node_name, node_label=label) - grid[i_row, j_col] = node - - row_idx_map[comp.node_name] = i_row - col_idx_map[comp.node_name] = j_col - - # add all the off diagonal nodes from components - for conn in self.connections: - # src, target, style, label, stack, faded, label_width - src_row = row_idx_map[conn.src] - target_col = col_idx_map[conn.target] - - loc = (src_row, target_col) - - style = conn.style - if conn.stack: - style += ",stack" - if conn.faded: - style += ",faded" - - label = _parse_label(conn.label, conn.label_width) - - node_name = "{}-{}".format(conn.src, conn.target) - - node = node_str.format(style=style, node_name=node_name, node_label=label) - - grid[loc] = node - - # add the nodes for left outputs - for comp_name, out in self.left_outs.items(): - style = out.style - if out.stack: - style += ",stack" - if out.faded: - style += ",faded" - - i_row = row_idx_map[comp_name] - loc = (i_row, 0) - - label = _parse_label(out.label, out.label_width) - node = node_str.format(style=style, node_name=out.node_name, node_label=label) - - grid[loc] = node - - # add the nodes for right outputs - for comp_name, out in self.right_outs.items(): - style = out.style - if out.stack: - style += ",stack" - if out.faded: - style += ",faded" - - i_row = row_idx_map[comp_name] - loc = (i_row, -1) - label = _parse_label(out.label, out.label_width) - node = node_str.format(style=style, node_name=out.node_name, node_label=label) - - grid[loc] = node - - # add the inputs to the top of the grid - for comp_name, inp in self.ins.items(): - # node_name, style, label, stack = in_data - style = inp.style - if inp.stack: - style += ",stack" - if inp.faded: - style += ",faded" - - j_col = col_idx_map[comp_name] - loc = (0, j_col) - label = _parse_label(inp.label, label_width=inp.label_width) - node = node_str.format(style=style, node_name=inp.node_name, node_label=label) - - grid[loc] = node - - # mash the grid data into a string - rows_str = "" - for i, row in enumerate(grid): - rows_str += "%Row {}\n".format(i) + "&\n".join(row) + r"\\" + "\n" - - return rows_str + def write( + self, file_name: str, build: bool = True, cleanup: bool = True, quiet: bool = False, outdir: str = "." + ) -> None: + """ + Write output files for the XDSM diagram (delegates to XDSMLatexWriter). - def _build_edges(self): - h_edges = [] - v_edges = [] + Parameters + ---------- + file_name : str + Prefix for output files + build : bool + Whether to compile the PDF + cleanup : bool + Whether to delete build files after compilation + quiet : bool + Suppress pdflatex output + outdir : str + Output directory path + """ + XDSMLatexWriter.write(self, file_name, build, cleanup, quiet, outdir) - edge_format_string = "({start}) edge [{style}] ({end})" - for conn in self.connections: - h_edge_style = "DataLine" - v_edge_style = "DataLine" - if conn.src_faded or conn.faded: - h_edge_style += ",faded" - if conn.target_faded or conn.faded: - v_edge_style += ",faded" - od_node_name = "{}-{}".format(conn.src, conn.target) - - h_edges.append(edge_format_string.format(start=conn.src, end=od_node_name, style=h_edge_style)) - v_edges.append(edge_format_string.format(start=od_node_name, end=conn.target, style=v_edge_style)) - - for comp_name, out in self.left_outs.items(): - style = "DataLine" - if out.faded: - style += ",faded" - node_name = out.node_name - h_edges.append(edge_format_string.format(start=comp_name, end=node_name, style=style)) - - for comp_name, out in self.right_outs.items(): - style = "DataLine" - if out.faded: - style += ",faded" - node_name = out.node_name - h_edges.append(edge_format_string.format(start=comp_name, end=node_name, style=style)) - - for comp_name, inp in self.ins.items(): - style = "DataLine" - if inp.faded: - style += ",faded" - node_name = inp.node_name - v_edges.append(edge_format_string.format(start=comp_name, end=node_name, style=style)) - - h_edges = sorted(h_edges, key=lambda s: "faded" in s) - v_edges = sorted(v_edges, key=lambda s: "faded" in s) - - paths_str = "% Horizontal edges\n" + "\n".join(h_edges) + "\n" - paths_str += "% Vertical edges\n" + "\n".join(v_edges) + ";" - - return paths_str - - def _build_process_chain(self): - sys_names = [s.node_name for s in self.systems] - output_names = ( - [data[0] for _, data in self.ins.items()] - + [data[0] for _, data in self.left_outs.items()] - + [data[0] for _, data in self.right_outs.items()] - ) - # comp_name, in_data in self.ins.items(): - # node_name, style, label, stack = in_data - chain_str = "" - - for proc in self.processes: - chain_str += "{ [start chain=process]\n \\begin{pgfonlayer}{process} \n" - start_tip = False - for i, sys in enumerate(proc.systems): - if sys not in sys_names and sys not in output_names: - raise ValueError( - 'process includes a system named "{}" but no system with that name exists.'.format(sys) - ) - if sys in output_names and i == 0: - start_tip = True - if i == 0: - chain_str += "\\chainin ({});\n".format(sys) - else: - if sys in output_names or (i == 1 and start_tip): - if proc.arrow: - style = "ProcessTipA" - else: - style = "ProcessTip" - else: - if proc.arrow: - style = "ProcessHVA" - else: - style = "ProcessHV" - if proc.faded: - style = "Faded" + style - chain_str += "\\chainin ({}) [join=by {}];\n".format(sys, style) - chain_str += "\\end{pgfonlayer}\n}" - - return chain_str - - def _compose_optional_package_list(self): - # Check for optional LaTeX packages - optional_packages_list = self.optional_packages - if self.use_sfmath: - optional_packages_list.append("sfmath") - - # Join all packages into one string separated by comma - optional_packages_str = ",".join(optional_packages_list) - - return optional_packages_str - - def write(self, file_name, build=True, cleanup=True, quiet=False, outdir="."): + def to_latex( + self, file_name: str, build: bool = True, cleanup: bool = True, quiet: bool = False, outdir: str = "." + ) -> None: """ - Write output files for the XDSM diagram. This produces the following: + Export XDSM diagram to LaTeX/TikZ format. - - {file_name}.tikz - A file containing the TikZ definition of the XDSM diagram. - - {file_name}.tex - A standalone document wrapped around an include of the TikZ file which can - be compiled to a pdf. - - {file_name}.pdf - An optional compiled version of the standalone tex file. + Alias for write() method for clarity when exporting to LaTeX. Parameters ---------- file_name : str - The prefix to be used for the output files + Prefix for output files build : bool - Flag that determines whether the standalone PDF of the XDSM will be compiled. - Default is True. + Whether to compile the PDF cleanup : bool - Flag that determines if pdflatex build files will be deleted after build is complete + Whether to delete build files after compilation quiet : bool - Set to True to suppress output from pdflatex. + Suppress pdflatex output outdir : str - Path to an existing directory in which to place output files. If a relative - path is given, it is interpreted relative to the current working directory. + Output directory path """ - nodes = self._build_node_grid() - edges = self._build_edges() - process = self._build_process_chain() - - module_path = os.path.dirname(__file__) - diagram_styles_path = os.path.join(module_path, "diagram_styles") - # Hack for Windows. MiKTeX needs Linux style paths. - diagram_styles_path = diagram_styles_path.replace("\\", "/") - - optional_packages_str = self._compose_optional_package_list() - - tikzpicture_str = tikzpicture_template.format( - nodes=nodes, - edges=edges, - process=process, - diagram_styles_path=diagram_styles_path, - optional_packages=optional_packages_str, - ) - - base_output_fp = os.path.join(outdir, file_name) - with open(base_output_fp + ".tikz", "w") as f: - f.write(tikzpicture_str) - - tex_str = tex_template.format( - nodes=nodes, - edges=edges, - tikzpicture_path=file_name + ".tikz", - diagram_styles_path=diagram_styles_path, - optional_packages=optional_packages_str, - version=pyxdsm_version, - ) + XDSMLatexWriter.write(self, file_name, build, cleanup, quiet, outdir) - with open(base_output_fp + ".tex", "w") as f: - f.write(tex_str) - - if build: - command = [ - "pdflatex", - "-halt-on-error", - "-interaction=nonstopmode", - "-output-directory={}".format(outdir), - ] - if quiet: - command += ["-interaction=batchmode", "-halt-on-error"] - command += [f"{file_name}.tex"] - subprocess.run(command, check=True) - if cleanup: - for ext in ["aux", "fdb_latexmk", "fls", "log"]: - f_name = "{}.{}".format(base_output_fp, ext) - if os.path.exists(f_name): - os.remove(f_name) - - def write_sys_specs(self, folder_name): + def write_sys_specs(self, folder_name: str) -> None: """ Write I/O spec json files for systems to specified folder @@ -728,31 +550,36 @@ def write_sys_specs(self, folder_name): name of the folder, which will be created if it doesn't exist, to put spec files into """ - # find un-connected to each system by looking at Inputs + def _label_to_spec(label: Union[str, list[str], tuple[str, ...]], spec: set[str]) -> None: + """Add label variables to spec set.""" + if isinstance(label, str): + label = [label] + for var in label: + if var: + spec.add(var) + specs = {} for sys in self.systems: specs[sys.node_name] = {"inputs": set(), "outputs": set()} - for sys_name, inp in self.ins.items(): + # Add inputs from Input nodes + for sys_name, inp in self.inputs.items(): _label_to_spec(inp.label, specs[sys_name]["inputs"]) - # find connected inputs/outputs to each system by looking at Connections + # Add inputs/outputs from Connections for conn in self.connections: _label_to_spec(conn.label, specs[conn.target]["inputs"]) - _label_to_spec(conn.label, specs[conn.src]["outputs"]) - # find unconnected outputs to each system by looking at Outputs - for sys_name, out in self.left_outs.items(): - _label_to_spec(out.label, specs[sys_name]["outputs"]) - for sys_name, out in self.right_outs.items(): + # Add outputs from Output nodes + for sys_name, out in self.outputs.items(): _label_to_spec(out.label, specs[sys_name]["outputs"]) if not os.path.isdir(folder_name): os.mkdir(folder_name) for sys in self.systems: - if sys.spec_name is not False: + if sys.spec_name is not False and sys.spec_name is not None: path = os.path.join(folder_name, sys.spec_name + ".json") with open(path, "w") as f: spec = specs[sys.node_name] @@ -760,3 +587,40 @@ def write_sys_specs(self, folder_name): spec["outputs"] = list(spec["outputs"]) json_str = json.dumps(spec, indent=2) f.write(json_str) + + def to_json(self, filename: Optional[str] = None) -> str: + """ + Get the JSON representation of the XDSM, and optioally write to file. + + Parameters + ---------- + filename : str + The filename to which to write the JSON representation of the XDSM""" + json_str = self.model_dump_json(indent=2) + if filename: + with open(filename, "w") as f: + f.write(f"{json_str}\n") + return json_str + + @classmethod + def from_json(cls, s: str) -> "XDSM": + """Instantiate an XDSM from the given JSON data. + + Parameters + ---------- + f : str + A filename or string of JSON data from which + the XDSM should be instantiated. + """ + if Path(s).is_file(): + with open(s) as f: + try: + data = json.load(f) + except Exception as e: + raise RuntimeError("Unable to load JSON from file: {s}") from e + else: + try: + data = json.loads(s) + except (json.JSONDecodeError, TypeError) as e: + raise RuntimeError("Given string is neither an existing filename nor valid JSON.") from e + return cls.model_validate(data) diff --git a/pyxdsm/__main__.py b/pyxdsm/__main__.py new file mode 100644 index 0000000..bb95984 --- /dev/null +++ b/pyxdsm/__main__.py @@ -0,0 +1,82 @@ +""" +Command-line interface for pyXDSM. + +Allows running pyXDSM from the command line: + python -m pyxdsm input.json -o output.pdf +""" + +import argparse +import sys +from pathlib import Path + +from .XDSM import XDSM + + +def main(): + """Main entry point for the pyxdsm CLI.""" + parser = argparse.ArgumentParser( + description="Generate XDSM diagrams from JSON specification files", prog="python -m pyxdsm" + ) + + parser.add_argument("input", type=str, help="Input JSON specification file") + + parser.add_argument( + "-o", + "--output", + type=str, + required=False, + default=None, + help="Output file path (e.g., output.pdf or output.tikz). If not provided, defaults to input filename with .pdf extension", + ) + + parser.add_argument( + "-c", + "--cleanup", + action="store_true", + default=True, + help="Clean up auxiliary files after PDF build (default: True)", + ) + + parser.add_argument("-q", "--quiet", action="store_true", default=False, help="Suppress pdflatex output") + + args = parser.parse_args() + + # Load XDSM from JSON + try: + xdsm = XDSM.from_json(args.input) + except FileNotFoundError: + print(f"Error: Input file '{args.input}' not found", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"Error loading JSON: {e}", file=sys.stderr) + sys.exit(1) + + if args.output is None: + input_path = Path(args.input) + output_path = input_path.with_suffix(".pdf") + else: + output_path = Path(args.output) + + outdir = str(output_path.parent) if output_path.parent != Path(".") else "." + file_name = output_path.stem + extension = output_path.suffix.lower() + + if extension.lower() == ".json": + xdsm.to_json(output_path) + print(f"Successfully generated {output_path}") + else: + if extension.lower() not in (".pdf", ".tikz"): + print(f"Warning: Unknown output extension '{extension}'. Defaulting to PDF build.", file=sys.stderr) + extension = ".pdf" + xdsm.write( + file_name=file_name, + build=extension.lower() == ".pdf", + cleanup=args.cleanup, + quiet=args.quiet, + outdir=outdir, + ) + print(f"Successfully generated {args.output}") + + +if __name__ == "__main__": + main() diff --git a/pyxdsm/matrix_eqn.py b/pyxdsm/matrix_eqn.py index ab6957c..fadbac4 100644 --- a/pyxdsm/matrix_eqn.py +++ b/pyxdsm/matrix_eqn.py @@ -1,8 +1,8 @@ import os import subprocess from collections import namedtuple -import numpy as np +import numpy as np # color pallette link: http://paletton.com/#uid=72Q1j0kllllkS5tKC9H96KClOKC @@ -247,7 +247,7 @@ def _write_tikz(tikz, out_file, build=True, cleanup=True): os.remove(f_name) -class TotalJacobian(object): +class TotalJacobian: def __init__(self): self._variables = {} self._j_inputs = {} @@ -366,7 +366,7 @@ def write(self, out_file=None, build=True, cleanup=True): _write_tikz(jac_tikz, out_file, build, cleanup) -class MatrixEquation(object): +class MatrixEquation: def __init__(self): self._variables = {} self._ij_variables = {} diff --git a/pyxdsm/xdsm_latex_writer.py b/pyxdsm/xdsm_latex_writer.py new file mode 100644 index 0000000..89bff44 --- /dev/null +++ b/pyxdsm/xdsm_latex_writer.py @@ -0,0 +1,432 @@ +import os +import re +import subprocess +from typing import TYPE_CHECKING, List, Optional, Tuple, Union + +import numpy as np + +from pyxdsm import __version__ as pyxdsm_version + +if TYPE_CHECKING: + from pyxdsm.XDSM import XDSM + +# LaTeX templates +tikzpicture_template = r""" +%%% Preamble Requirements %%% +% \usepackage{{geometry}} +% \usepackage{{amsfonts}} +% \usepackage{{amsmath}} +% \usepackage{{amssymb}} +% \usepackage{{tikz}} + +% Optional packages such as sfmath set through python interface +% \usepackage{{{optional_packages}}} + +% \usetikzlibrary{{arrows,chains,positioning,scopes,shapes.geometric,shapes.misc,shadows}} + +%%% End Preamble Requirements %%% + +\input{{"{diagram_styles_path}"}} +\begin{{tikzpicture}} + +\matrix[MatrixSetup]{{ +{nodes}}}; + +% XDSM process chains +{process} + +\begin{{pgfonlayer}}{{data}} +\path +{edges} +\end{{pgfonlayer}} + +\end{{tikzpicture}} +""" + +tex_template = r""" +% XDSM diagram created with pyXDSM {version}. +\documentclass{{article}} +\usepackage{{geometry}} +\usepackage{{amsfonts}} +\usepackage{{amsmath}} +\usepackage{{amssymb}} +\usepackage{{tikz}} + +% Optional packages such as sfmath set through python interface +\usepackage{{{optional_packages}}} + +% Define the set of TikZ packages to be included in the architecture diagram document +\usetikzlibrary{{arrows,chains,positioning,scopes,shapes.geometric,shapes.misc,shadows}} + + +% Set the border around all of the architecture diagrams to be tight to the diagrams themselves +% (i.e. no longer need to tinker with page size parameters) +\usepackage[active,tightpage]{{preview}} +\PreviewEnvironment{{tikzpicture}} +\setlength{{\PreviewBorder}}{{5pt}} + +\begin{{document}} + +\input{{"{tikzpicture_path}"}} + +\end{{document}} +""" + + +def _chunk_label(label, n_chunks): + for i in range(0, len(label), n_chunks): + yield label[i : i + n_chunks] + + +def _sanitize_tikz_name(name: str) -> str: + """ + Sanitize a node name to be TikZ-compatible. + + TikZ node names cannot contain certain characters like periods, spaces, + and other special characters. This function replaces them with safe alternatives. + + Parameters + ---------- + name : str + Original node name (may contain periods, spaces, etc.) + + Returns + ------- + str + Sanitized name safe for use as TikZ node identifier + """ + # Replace periods with underscores + sanitized = name.replace(".", "_") + # Replace spaces with underscores + sanitized = sanitized.replace(" ", "_") + # Replace other problematic characters with underscores + sanitized = re.sub(r"[^\w\-]", "_", sanitized) + return sanitized + + +def _parse_label(label: Union[str, List[str], Tuple[str, ...]], label_width: Optional[int] = None) -> str: + """Parse label into LaTeX format.""" + if isinstance(label, (tuple, list)): + if label_width is None: + return r"$\begin{array}{c}" + r" \\ ".join(label) + r"\end{array}$" + else: + labels = [] + for chunk in _chunk_label(label, label_width): + labels.append(", ".join(chunk)) + return r"$\begin{array}{c}" + r" \\ ".join(labels) + r"\end{array}$" + else: + return rf"${label}$" + + +class XDSMLatexWriter: + """ + Writer class for generating LaTeX/TikZ output from XDSM diagrams. + """ + + @staticmethod + def _build_node_grid(xdsm: "XDSM") -> str: + """Build the TikZ node grid.""" + size = len(xdsm.systems) + comps_rows = np.arange(size) + comps_cols = np.arange(size) + + if xdsm.inputs: + size += 1 + comps_rows += 1 + + if any(out.side == "left" for out in xdsm.outputs.values()): + size += 1 + comps_cols += 1 + + if any(out.side == "right" for out in xdsm.outputs.values()): + size += 1 + + row_idx_map = {} + col_idx_map = {} + + node_str = r"\node [{style}] ({node_name}) {{{node_label}}};" + grid = np.empty((size, size), dtype=object) + grid[:] = "" + + # Add diagonal systems + for i_row, j_col, comp in zip(comps_rows, comps_cols, xdsm.systems): + style = comp.style + if comp.stack: + style += ",stack" + if comp.faded: + style += ",faded" + + label = _parse_label(comp.label, comp.label_width) + sanitized_name = _sanitize_tikz_name(comp.node_name) + node = node_str.format(style=style, node_name=sanitized_name, node_label=label) + grid[i_row, j_col] = node + + row_idx_map[comp.node_name] = i_row + col_idx_map[comp.node_name] = j_col + + # Add off-diagonal connection nodes + for conn in xdsm.connections: + src_row = row_idx_map[conn.src] + target_col = col_idx_map[conn.target] + + style = conn.style + if conn.stack: + style += ",stack" + if conn.faded: + style += ",faded" + + label = _parse_label(conn.label, conn.label_width) + node_name = f"{_sanitize_tikz_name(conn.src)}-{_sanitize_tikz_name(conn.target)}" + node = node_str.format(style=style, node_name=node_name, node_label=label) + + grid[src_row, target_col] = node + + # Add left outputs + for comp_name, out in xdsm.outputs.items(): + if out.side != "left": + continue + style = out.style + if out.stack: + style += ",stack" + if out.faded: + style += ",faded" + + i_row = row_idx_map[comp_name] + label = _parse_label(out.label, out.label_width) + sanitized_name = _sanitize_tikz_name(out.node_name) + node = node_str.format(style=style, node_name=sanitized_name, node_label=label) + grid[i_row, 0] = node + + # Add right outputs + for comp_name, out in xdsm.outputs.items(): + if out.side != "right": + continue + style = out.style + if out.stack: + style += ",stack" + if out.faded: + style += ",faded" + + i_row = row_idx_map[comp_name] + label = _parse_label(out.label, out.label_width) + sanitized_name = _sanitize_tikz_name(out.node_name) + node = node_str.format(style=style, node_name=sanitized_name, node_label=label) + grid[i_row, -1] = node + + # Add inputs + for comp_name, inp in xdsm.inputs.items(): + style = inp.style + if inp.stack: + style += ",stack" + if inp.faded: + style += ",faded" + + j_col = col_idx_map[comp_name] + label = _parse_label(inp.label, inp.label_width) + sanitized_name = _sanitize_tikz_name(inp.node_name) + node = node_str.format(style=style, node_name=sanitized_name, node_label=label) + grid[0, j_col] = node + + # Convert grid to string + rows_str = "" + for i, row in enumerate(grid): + rows_str += f"%Row {i}\n" + "&\n".join(row) + r"\\" + "\n" + + return rows_str + + @staticmethod + def _build_edges(xdsm: "XDSM") -> str: + """Build the TikZ edge definitions.""" + h_edges = [] + v_edges = [] + + edge_format = "({start}) edge [{style}] ({end})" + + for conn in xdsm.connections: + h_style = "DataLine" + v_style = "DataLine" + + if conn.src_faded or conn.faded: + h_style += ",faded" + if conn.target_faded or conn.faded: + v_style += ",faded" + + src_sanitized = _sanitize_tikz_name(conn.src) + target_sanitized = _sanitize_tikz_name(conn.target) + od_node = f"{src_sanitized}-{target_sanitized}" + h_edges.append(edge_format.format(start=src_sanitized, end=od_node, style=h_style)) + v_edges.append(edge_format.format(start=od_node, end=target_sanitized, style=v_style)) + + for comp_name, out in xdsm.outputs.items(): + if out.side != "left": + continue + style = "DataLine" + if out.faded: + style += ",faded" + comp_sanitized = _sanitize_tikz_name(comp_name) + out_sanitized = _sanitize_tikz_name(out.node_name) + h_edges.append(edge_format.format(start=comp_sanitized, end=out_sanitized, style=style)) + + for comp_name, out in xdsm.outputs.items(): + if out.side != "right": + continue + style = "DataLine" + if out.faded: + style += ",faded" + comp_sanitized = _sanitize_tikz_name(comp_name) + out_sanitized = _sanitize_tikz_name(out.node_name) + h_edges.append(edge_format.format(start=comp_sanitized, end=out_sanitized, style=style)) + + for comp_name, inp in xdsm.inputs.items(): + style = "DataLine" + if inp.faded: + style += ",faded" + comp_sanitized = _sanitize_tikz_name(comp_name) + inp_sanitized = _sanitize_tikz_name(inp.node_name) + v_edges.append(edge_format.format(start=comp_sanitized, end=inp_sanitized, style=style)) + + h_edges = sorted(h_edges, key=lambda s: "faded" in s) + v_edges = sorted(v_edges, key=lambda s: "faded" in s) + + paths_str = "% Horizontal edges\n" + "\n".join(h_edges) + "\n" + paths_str += "% Vertical edges\n" + "\n".join(v_edges) + ";" + + return paths_str + + @staticmethod + def _build_process_chain(xdsm: "XDSM") -> str: + """Build the TikZ process chain definitions.""" + sys_names = [s.node_name for s in xdsm.systems] + output_names = [inp.node_name for inp in xdsm.inputs.values()] + [ + out.node_name for out in xdsm.outputs.values() + ] + + chain_str = "" + + for proc in xdsm.processes: + chain_str += "\\begin{scope}[start chain=process]\n" + chain_str += "\\begin{pgfonlayer}{process}\n" + start_tip = False + + for i, sys in enumerate(proc.systems): + if sys not in sys_names and sys not in output_names: + raise ValueError(f'Process includes system "{sys}" but no such system exists') + + if sys in output_names and i == 0: + start_tip = True + + sys_sanitized = _sanitize_tikz_name(sys) + + if i == 0: + chain_str += f"\\chainin ({sys_sanitized});\n" + else: + if sys in output_names or (i == 1 and start_tip): + style = "ProcessTipA" if proc.arrow else "ProcessTip" + else: + style = "ProcessHVA" if proc.arrow else "ProcessHV" + + if proc.faded: + style = "Faded" + style + + chain_str += f"\\chainin ({sys_sanitized}) [join=by {style}];\n" + + chain_str += "\\end{pgfonlayer}\n" + chain_str += "\\end{scope}\n" + + return chain_str + + @staticmethod + def _compose_optional_package_list(xdsm: "XDSM") -> str: + """Compose the optional LaTeX package list.""" + # Check for optional LaTeX packages + packages = xdsm.optional_packages.copy() + + if xdsm.use_sfmath: + packages.append("sfmath") + + # Join all packages into one string separated by comma + return ",".join(packages) + + @staticmethod + def write( + xdsm: "XDSM", file_name: str, build: bool = True, cleanup: bool = True, quiet: bool = False, outdir: str = "." + ) -> None: + """ + Write latex output files for the XDSM diagram. + + This produces the following: + + - {file_name}.tikz + A file containing the TikZ definition of the XDSM diagram. + - {file_name}.tex + A standalone document wrapped around an include of the TikZ file which can + be compiled to a pdf. + - {file_name}.pdf + An optional compiled version of the standalone tex file. + + Parameters + ---------- + xdsm : XDSM + The XDSM diagram object to write + file_name : str + Prefix for output files + build : bool + Whether to compile the PDF + cleanup : bool + Whether to delete build files after compilation + quiet : bool + Suppress pdflatex output + outdir : str + Output directory path + """ + nodes = XDSMLatexWriter._build_node_grid(xdsm) + edges = XDSMLatexWriter._build_edges(xdsm) + process = XDSMLatexWriter._build_process_chain(xdsm) + + module_path = os.path.dirname(__file__) + diagram_styles_path = os.path.join(module_path, "diagram_styles") + diagram_styles_path = diagram_styles_path.replace("\\", "/") + + optional_packages_str = XDSMLatexWriter._compose_optional_package_list(xdsm) + + tikzpicture_str = tikzpicture_template.format( + nodes=nodes, + edges=edges, + process=process, + diagram_styles_path=diagram_styles_path, + optional_packages=optional_packages_str, + ) + + base_output_fp = os.path.join(outdir, file_name) + with open(base_output_fp + ".tikz", "w") as f: + f.write(tikzpicture_str) + + tex_str = tex_template.format( + nodes=nodes, + edges=edges, + tikzpicture_path=file_name + ".tikz", + diagram_styles_path=diagram_styles_path, + optional_packages=optional_packages_str, + version=pyxdsm_version, + ) + + with open(base_output_fp + ".tex", "w") as f: + f.write(tex_str) + + if build: + command = [ + "pdflatex", + "-halt-on-error", + "-interaction=nonstopmode", + f"-output-directory={outdir}", + ] + if quiet: + command += ["-interaction=batchmode", "-halt-on-error"] + command += [f"{file_name}.tex"] + subprocess.run(command, check=True) + + if cleanup: + for ext in ["aux", "fdb_latexmk", "fls", "log"]: + f_name = f"{base_output_fp}.{ext}" + if os.path.exists(f_name): + os.remove(f_name) diff --git a/setup.py b/setup.py deleted file mode 100644 index ea8d36a..0000000 --- a/setup.py +++ /dev/null @@ -1,40 +0,0 @@ -from setuptools import setup -import re -from os import path - -__version__ = re.findall( - r"""__version__ = ["']+([0-9\.]*)["']+""", - open("pyxdsm/__init__.py").read(), -)[0] - -this_directory = path.abspath(path.dirname(__file__)) -with open(path.join(this_directory, "README.md"), encoding="utf-8") as f: - long_description = f.read() - -setup( - name="pyXDSM", - version=__version__, - description="Python script to generate PDF XDSM diagrams using TikZ and LaTeX", - long_description=long_description, - long_description_content_type="text/markdown", - keywords="optimization multidisciplinary multi-disciplinary analysis n2 xdsm", - author="", - author_email="", - url="https://github.com/mdolab/pyXDSM", - license="Apache License Version 2.0", - packages=[ - "pyxdsm", - ], - package_data={"pyxdsm": ["*.tex"]}, - install_requires=["numpy>=1.21"], - python_requires=">=3", - classifiers=[ - "Operating System :: OS Independent", - "Programming Language :: Python", - "Topic :: Scientific/Engineering", - "Programming Language :: Python", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - ], -) diff --git a/tests/test_xdsm.py b/tests/test_xdsm.py index 9a2def3..3850d90 100644 --- a/tests/test_xdsm.py +++ b/tests/test_xdsm.py @@ -1,10 +1,10 @@ -import unittest import os import shutil -import tempfile import subprocess -from pyxdsm.XDSM import XDSM, OPT, FUNC, SOLVER, LEFT, RIGHT -from numpy.distutils.exec_command import find_executable +import tempfile +import unittest + +from pyxdsm.XDSM import FUNC, LEFT, OPT, RIGHT, SOLVER, XDSM basedir = os.path.dirname(os.path.abspath(__file__)) @@ -44,7 +44,7 @@ def test_examples(self): self.assertTrue(os.path.isfile(f + ".tikz")) self.assertTrue(os.path.isfile(f + ".tex")) # look for the pdflatex executable - pdflatex = find_executable("pdflatex") is not None + pdflatex = shutil.which("pdflatex") is not None # if no pdflatex, then do not assert that the pdf was compiled self.assertTrue(not pdflatex or os.path.isfile(f + ".pdf")) subprocess.run(["python", "mat_eqn.py"], check=True) @@ -52,17 +52,43 @@ def test_examples(self): # change back to previous directory os.chdir(self.tempdir) + def test_examples_json(self): + """ + This test just builds the three examples, and assert that the output files exist. + Unlike the other tests, this one requires LaTeX to be available. + """ + # we first copy the examples to the temp dir + shutil.copytree(os.path.join(basedir, "../examples"), os.path.join(self.tempdir, "examples_json")) + os.chdir(os.path.join(self.tempdir, "examples_json")) + + filenames = ["kitchen_sink", "mdf"] + for f in filenames: + subprocess.run(["python", "-m", "pyxdsm", f"{f}.json"], check=True) + self.assertTrue(os.path.isfile(f + ".tikz")) + self.assertTrue(os.path.isfile(f + ".tex")) + # look for the pdflatex executable + pdflatex = shutil.which("pdflatex") is not None + # if no pdflatex, then do not assert that the pdf was compiled + self.assertTrue(not pdflatex or os.path.isfile(f + ".pdf")) + + # change back to previous directory + os.chdir(self.tempdir) + def test_connect(self): + from pydantic import ValidationError + x = XDSM(use_sfmath=False) x.add_system("D1", FUNC, "D_1", label_width=2) x.add_system("D2", FUNC, "D_2", stack=False) - try: + # Pydantic raises ValidationError when validation fails + with self.assertRaises(ValidationError) as context: x.connect("D1", "D2", r"\mathcal{R}(y_1)", "foobar") - except ValueError as err: - self.assertEquals(str(err), "label_width argument must be an integer") - else: - self.fail("Expected ValueError") + + # Check that the error is about label_width + error_msg = str(context.exception) + self.assertIn("label_width", error_msg) + self.assertIn("Input should be a valid integer", error_msg) def test_options(self): filename = "xdsm_test_options" @@ -115,7 +141,7 @@ def test_stacked_system(self): x.write(file_name) tikz_file = file_name + ".tikz" - with open(tikz_file, "r") as f: + with open(tikz_file) as f: tikz = f.read() self.assertIn(r"\node [Optimization,stack]", tikz) @@ -279,7 +305,7 @@ def test_tikz_content(self): sample_lines = sample_txt.split("\n") sample_lines = filter_lines(sample_lines) - with open(tikz_file, "r") as f: + with open(tikz_file) as f: new_lines = filter_lines(f.readlines()) sample_no_match = [] # Sample text @@ -321,6 +347,74 @@ def test_write_outdir(self): # no files outside the subdirs self.assertFalse(any(os.path.isfile(fp) for fp in os.listdir(self.tempdir))) + def test_serialize_deserialize(self): + filename = "xdsm_test_ser_deser" + + # Change `use_sfmath` to False to use computer modern + x = XDSM(use_sfmath=False) + + x.add_system("opt", OPT, r"\text{Optimizer}") + x.add_system("solver", SOLVER, r"\text{Newton}") + x.add_system("D1", FUNC, "D_1", label_width=2) + x.add_system("D2", FUNC, "D_2", stack=False) + x.add_system("F", FUNC, "F", faded=True) + x.add_system("G", FUNC, "G", spec_name="G_spec") + + x.connect("opt", "D1", "x, z") + x.connect("opt", "D2", "z") + x.connect("opt", "F", "x, z") + x.connect("solver", "D1", "y_2") + x.connect("solver", "D2", "y_1") + x.connect("D1", "solver", r"\mathcal{R}(y_1)") + x.connect("solver", "F", "y_1, y_2") + x.connect("D2", "solver", r"\mathcal{R}(y_2)") + x.connect("solver", "G", "y_1, y_2") + + x.connect("F", "opt", "f") + x.connect("G", "opt", "g") + + x.add_output("opt", "x^*, z^*", side=RIGHT) + x.add_output("D1", "y_1^*", side=LEFT, stack=True) + x.add_output("D2", "y_2^*", side=LEFT) + x.add_output("F", "f^*", side=LEFT) + x.add_output("G", "g^*") + + # Save to JSON file + json_file = filename + ".json" + x.to_json(json_file) + + # Verify JSON file was created + self.assertTrue(os.path.isfile(json_file)) + + # Load from JSON file + x_loaded = XDSM.from_json(json_file) + + # Verify the loaded XDSM is equivalent to the original + # Compare the model_dump() output (which gives us the full state) + original_dict = x.model_dump() + loaded_dict = x_loaded.model_dump() + + self.assertEqual(original_dict, loaded_dict) + + # Corrupt the file to test a json load failure + shutil.copyfile(json_file, "corrupt.json") + with open("corrupt.json", "a") as f: + f.write(":lk23jr22091k") + + with self.assertRaises(RuntimeError) as e: + XDSM.from_json("corrupt.json") + + expected = "Unable to load JSON from file" + self.assertIn(expected, str(e.exception)) + + def test_invalid_json(self): + # Load from invalid JSON + with self.assertRaises(RuntimeError) as e: + XDSM.from_json(":11234_invalid_json") + + expected = "Given string is neither an existing filename nor valid JSON." + self.assertIn(expected, str(e.exception)) + if __name__ == "__main__": unittest.main()