diff --git a/easygraph/functions/drawing/geometry.py b/easygraph/functions/drawing/geometry.py index b3e9981d..4e71dc63 100644 --- a/easygraph/functions/drawing/geometry.py +++ b/easygraph/functions/drawing/geometry.py @@ -25,6 +25,8 @@ def vlen(vector): def common_tangent_radian(r1, r2, d): value = abs(r2 - r1) / d + if value > 1.0: value = 1.0 + elif value < -1.0: value = -1.0 alpha = math.acos(value) alpha = alpha if r1 > r2 else pi - alpha return alpha diff --git a/easygraph/tests/test_hif.py b/easygraph/tests/test_hif.py new file mode 100644 index 00000000..a598bdc6 --- /dev/null +++ b/easygraph/tests/test_hif.py @@ -0,0 +1,95 @@ +import unittest +import json +import os +import tempfile +import easygraph as eg +from pathlib import Path + +MOCK_HIF_DATA = { + "metadata": { + "name": "test_organism", + "description": "Simulation for unit test" + }, + "network-type": "directed", + "nodes": [ + {"node": "n1", "weight": 1.0, "attrs": {"name": "Node A"}}, + {"node": "n2", "weight": 1.0, "attrs": {"name": "Node B"}} + ], + "edges": [ + {"edge": "e1", "weight": 1.0, "attrs": {"name": "Edge Alpha"}} + ], + "incidences": [ + {"edge": "e1", "node": "n1", "weight": 1.0, "direction": "tail"}, + {"edge": "e1", "node": "n2", "weight": 1.0, "direction": "head"} + ] +} + +class HIFTest(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + self.temp_dir_path = Path(self.temp_dir.name) + + self.input_file = self.temp_dir_path / "input_mock.hif.json" + self.output_file = self.temp_dir_path / "output_result.hif.json" + + with open(self.input_file, "w", encoding="utf-8") as f: + json.dump(MOCK_HIF_DATA, f) + + def tearDown(self): + self.temp_dir.cleanup() + + def test_hif_roundtrip_preservation(self): + """ + Test that custom attributes are preserved AND the generated + EasyGraph object is structurally valid. + """ + hg = eg.hif_to_hypergraph(filename=self.input_file) + + self.assertEqual(hg.num_v, 2, "Loaded graph should have 2 nodes") + self.assertEqual(hg.num_e, 1, "Loaded graph should have 1 edge") + + node_names = [props.get('name') for props in hg.v_property] + self.assertIn("n1", node_names, "Node ID 'n1' should be in v_property") + + edges = hg.e[0] + self.assertEqual(len(edges), 1, "Should have 1 edge group") + self.assertEqual(len(edges[0]), 2, "Edge e1 should connect 2 nodes") + + self.assertTrue(hasattr(hg, "custom_hif_incidences"), "Failed to attach custom incidences") + self.assertTrue(hasattr(hg, "metadata"), "Failed to attach metadata") + + eg.hypergraph_to_hif(hg, filename=self.output_file) + + with open(self.output_file, 'r', encoding="utf-8") as f: + res = json.load(f) + + first_incidence = res["incidences"][0] + self.assertIn("direction", first_incidence, "'direction' field lost in roundtrip") + self.assertIn(first_incidence["direction"], ["tail", "head"]) + + self.assertNotIn("default_attrs", res["metadata"], "'default_attrs' was forced into metadata") + self.assertEqual(res["metadata"]["name"], "test_organism") + + def test_manual_graph_export(self): + """Test exporting a manually created Hypergraph (not loaded from file).""" + hg = eg.Hypergraph( + num_v=5, + e_list=[(0, 1, 2), (2, 3), (2, 3), (0, 4)], + merge_op="sum" + ) + hg.metadata = {"created_by": "manual_test"} + + eg.hypergraph_to_hif(hg, filename=self.output_file) + + with open(self.output_file, 'r', encoding="utf-8") as f: + data = json.load(f) + self.assertEqual(len(data["nodes"]), 5) + self.assertEqual(len(data["edges"]), 3) + self.assertEqual(data["metadata"]["created_by"], "manual_test") + + + weights = [e["weight"] for e in data["edges"]] + self.assertIn(2.0, weights, "Merged edge weight should be 2.0") + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/easygraph/utils/HIF.py b/easygraph/utils/HIF.py new file mode 100644 index 00000000..ea536163 --- /dev/null +++ b/easygraph/utils/HIF.py @@ -0,0 +1,233 @@ +import json +import requests +import fastjsonschema +from copy import deepcopy +from typing import Optional, Union, List, Dict, Any +from pathlib import Path +from easygraph.classes.hypergraph import Hypergraph + +schema_url = "https://raw.githubusercontent.com/pszufe/HIF_validators/main/schemas/hif_schema_v0.1.0.json" + +class EasyGraphHIFError(Exception): + """Custom exception for HIF conversion errors.""" + pass + +_hif_validator = None + +def _get_hif_validator(): + global _hif_validator + if _hif_validator is None: + try: + resp = requests.get(schema_url, timeout=5) + if resp.status_code == 200: + schema = json.loads(resp.text) + _hif_validator = fastjsonschema.compile(schema) + except Exception: + print("Warning: HIF Schema could not be fetched. Validation skipped.") + _hif_validator = lambda x: True + + return _hif_validator if _hif_validator else (lambda x: True) + +def hypergraph_to_hif( + hg: Hypergraph, + filename: Optional[Union[str, Path]] = None, +) -> dict: + """ + Converts an EasyGraph Hypergraph to HIF JSON. + Correctly handles hg.e tuple structure ((edges), (weights), (props)). + """ + + if hasattr(hg, "custom_hif_nodes"): + nodj = hg.custom_hif_nodes + else: + nodj = [] + num_v = hg.num_v if hasattr(hg, "num_v") else len(hg.v_property) if hasattr(hg, "v_property") else 0 + v_props = getattr(hg, "v_property", [{} for _ in range(num_v)]) + if not v_props and num_v > 0: v_props = [{} for _ in range(num_v)] + + for i in range(num_v): + props = v_props[i] if i < len(v_props) and isinstance(v_props[i], dict) else {} + p = props.copy() + weight = p.pop("weight", 1.0) + node_id = p.pop("name", str(i)) + nodj.append({"node": node_id, "weight": weight, "attrs": p}) + + e_structure = [] + e_weights = [] + e_props = [] + + if hasattr(hg, "e") and isinstance(hg.e, tuple) and len(hg.e) == 3 and \ + isinstance(hg.e[0], (list, tuple)) and isinstance(hg.e[1], (list, tuple)): + e_structure = hg.e[0] + e_weights = hg.e[1] + e_props = hg.e[2] + + elif hasattr(hg, "e_list") and hg.e_list: + e_structure = hg.e_list + e_weights = getattr(hg, "e_weight", [1.0] * len(e_structure)) + e_props = getattr(hg, "e_property_full", [{} for _ in range(len(e_structure))]) + + elif hasattr(hg, "e") and isinstance(hg.e, (list, tuple)): + e_structure = hg.e + e_weights = getattr(hg, "e_weight", [1.0] * len(e_structure)) + e_props = getattr(hg, "e_property_full", [{} for _ in range(len(e_structure))]) + + num_e = len(e_structure) + + if len(e_weights) < num_e: e_weights = [1.0] * num_e + if len(e_props) < num_e: e_props = [{} for _ in range(num_e)] + + if hasattr(hg, "custom_hif_edges"): + edgj = hg.custom_hif_edges + else: + edgj = [] + for i in range(num_e): + props = e_props[i].copy() if isinstance(e_props[i], dict) else {} + edge_id = props.pop("name", str(i)) + weight = e_weights[i] + props.pop("weight", None) + edgj.append({"edge": edge_id, "weight": weight, "attrs": props}) + + if hasattr(hg, "custom_hif_incidences"): + incj = hg.custom_hif_incidences + else: + incj = [] + node_id_list = [n["node"] for n in nodj] + edge_id_list = [e["edge"] for e in edgj] + + for e_idx, nodes_in_edge in enumerate(e_structure): + if e_idx >= len(edge_id_list): break + edge_name = edge_id_list[e_idx] + + flat_nodes = [] + if isinstance(nodes_in_edge, (list, tuple)): + for item in nodes_in_edge: + if isinstance(item, (list, tuple)): + flat_nodes.extend(item) + else: + flat_nodes.append(item) + else: + flat_nodes = [nodes_in_edge] + + for n_idx in flat_nodes: + try: + n_idx_int = int(n_idx) + if 0 <= n_idx_int < len(node_id_list): + incj.append({ + "edge": edge_name, + "node": node_id_list[n_idx_int], + "weight": 1.0, + }) + except (ValueError, TypeError): + continue + + metadata = getattr(hg, "metadata", {}) + network_type = getattr(hg, "network_type", "undirected") + + hif = { + "nodes": nodj, + "edges": edgj, + "incidences": incj, + "network-type": network_type, + "metadata": metadata + } + + try: + validator = _get_hif_validator() + validator(hif) + except Exception as e: + print(f"Validation Warning: {e}") + + if filename: + with open(filename, "w", encoding='utf-8') as f: + json.dump(hif, f, indent=4, ensure_ascii=False) + + return hif + + +def hif_to_hypergraph( + hif: dict = None, + filename: Optional[Union[str, Path]] = None, +): + """ + Reads HIF JSON and returns an EasyGraph Hypergraph. + Attaches original JSON parts to 'custom_hif_*' attributes to preserve + structure during round-trips. + """ + if hif is None: + if filename is None: + raise EasyGraphHIFError("No HIF data or filename provided.") + try: + with open(filename, "r", encoding='utf-8') as f: + hif = json.load(f) + except Exception as e: + raise EasyGraphHIFError(f"Failed to load HIF file {filename}: {e}") + + nodes_list = hif.get("nodes", []) + node_name_to_idx = {rec["node"]: i for i, rec in enumerate(nodes_list)} + num_v = len(nodes_list) + + edges_list = hif.get("edges", []) + edge_name_to_idx = {rec["edge"]: i for i, rec in enumerate(edges_list)} + num_e = len(edges_list) + + v_property = [{} for _ in range(num_v)] + for rec in nodes_list: + idx = node_name_to_idx.get(rec["node"]) + if idx is not None: + + prop = rec.get("attrs", {}).copy() + prop["name"] = rec["node"] + prop["weight"] = rec.get("weight", 1.0) + v_property[idx] = prop + + e_property_full = [{} for _ in range(num_e)] + e_weight = [1.0] * num_e + + for rec in edges_list: + idx = edge_name_to_idx.get(rec["edge"]) + if idx is not None: + prop = rec.get("attrs", {}).copy() + if "name" not in prop: + prop["name"] = rec["edge"] + + prop["weight"] = rec.get("weight", 1.0) + e_property_full[idx] = prop + e_weight[idx] = prop["weight"] + + raw_groups = [[] for _ in range(num_e)] + + incidences_list = hif.get("incidences", []) + + for inc in incidences_list: + e_name = inc.get("edge") + n_name = inc.get("node") + + e_idx = edge_name_to_idx.get(e_name) + n_idx = node_name_to_idx.get(n_name) + + if e_idx is not None and n_idx is not None: + raw_groups[e_idx].append(n_idx) + + hg = Hypergraph( + num_v=num_v, + e_list=raw_groups, + e_weight=e_weight, + v_property=v_property + ) + + hg.custom_hif_nodes = deepcopy(nodes_list) + hg.custom_hif_edges = deepcopy(edges_list) + hg.custom_hif_incidences = deepcopy(incidences_list) + + if "metadata" in hif: + hg.metadata = deepcopy(hif["metadata"]) + else: + hg.metadata = {} + + if "network-type" in hif: + hg.network_type = hif["network-type"] + + hg.e_property_full = e_property_full + + return hg \ No newline at end of file diff --git a/easygraph/utils/__init__.py b/easygraph/utils/__init__.py index 1005a65d..7629c893 100644 --- a/easygraph/utils/__init__.py +++ b/easygraph/utils/__init__.py @@ -11,3 +11,4 @@ from easygraph.utils.relabel import * from easygraph.utils.sparse import * from easygraph.utils.type_change import * +from easygraph.utils.HIF import * \ No newline at end of file