Skip to content
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ __pycache__/
# C extensions
*.so

.DS_Store


# Distribution / packaging
.Python
build/
Expand Down
Empty file added src/__init__.py
Empty file.
75 changes: 75 additions & 0 deletions src/cell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
File Name: cell.py

Copyright (c) 2023 - 2024 IndoorJson

Author: Ziwei Xiang <[email protected]>
Create Date: 2024/3/14
"""

import json
from functools import wraps
from typing import Dict
from shapely.wkt import loads
from shapely.geometry.base import BaseGeometry


def type_check(func):
@wraps(func)
def wrapper(self, cell_id: str, properties: Dict, space: BaseGeometry, node: BaseGeometry, *args, **kwargs):
if not isinstance(cell_id, str):
raise TypeError("cell_id must be a string")
if not isinstance(properties, dict):
raise TypeError("properties must be a dictionary")
if not isinstance(space, BaseGeometry):
raise TypeError("space must be a BaseGeometry instance")
if not isinstance(node, BaseGeometry):
raise TypeError("node must be a BaseGeometry instance")
return func(self, cell_id, properties, space, node, *args, **kwargs)
return wrapper


class Cell:

@type_check
def __init__(self, cell_id: str, properties: Dict, space: BaseGeometry, node: BaseGeometry):
self.__id: str = cell_id
self.__properties: Dict = properties
self.__space: BaseGeometry = space
self.__node: BaseGeometry = node

@property
def id(self) -> str:
return self.__id

@property
def properties(self) -> Dict:
return self.__properties

@property
def space(self) -> BaseGeometry:
return self.__space

@property
def node(self) -> BaseGeometry:
return self.__node

def to_json(self) -> Dict:
result = {}
for key, value in self.__dict__.items():
json_key = key.split('__')[-1]
if isinstance(value, BaseGeometry):
result[json_key] = value.wkt
elif json_key == 'id':
json_key = '$id'
result[json_key] = value
else:
result[json_key] = value
return result

@classmethod
def from_json(cls, json_dict: Dict) -> 'Cell':
json_dict['cell_id'] = json_dict.pop('$id')
json_dict['space'] = loads(json_dict['space'])
json_dict['node'] = loads(json_dict['node'])
return cls(**json_dict)
91 changes: 91 additions & 0 deletions src/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""
File Name: connection.py

Copyright (c) 2023 - 2024 IndoorJson

Author: Ziwei Xiang <[email protected]>
Create Date: 2024/3/14
"""

import json
from functools import wraps
from typing import Dict
from shapely.wkt import loads
from shapely.geometry.base import BaseGeometry


def type_check(func):
@wraps(func)
def wrapper(self, connection_id: str, properties: dict, fr: str, to: str,
bound: BaseGeometry, edge: BaseGeometry, *args, **kwargs):
if not isinstance(connection_id, str):
raise TypeError("connection_id must be a string")
if not isinstance(properties, dict):
raise TypeError("properties must be a dictionary")
if not isinstance(fr, str):
raise TypeError("source must be a string")
if not isinstance(to, str):
raise TypeError("target must be a string")
if not isinstance(bound, BaseGeometry):
raise TypeError("bound must be a BaseGeometry instance")
if not isinstance(edge, BaseGeometry):
raise TypeError("edge must be a BaseGeometry instance")
return func(self, connection_id, properties, fr, to, bound, edge, *args, **kwargs)
return wrapper


class Connection:

@type_check
def __init__(self, connections_id: str, properties: Dict, fr: str, to: str,
bound: BaseGeometry, edge: BaseGeometry):
self.__id: str = connections_id
self.__properties: Dict = properties
self.__fr: str = fr
self.__to: str = to
self.__bound: BaseGeometry = bound
self.__edge: BaseGeometry = edge

@property
def id(self) -> str:
return self.__id

@property
def properties(self) -> dict:
return self.__properties

@property
def source(self) -> str:
return self.__fr

@property
def target(self) -> str:
return self.__to

@property
def bound(self) -> BaseGeometry:
return self.__bound

@property
def edge(self) -> BaseGeometry:
return self.__edge

def to_json(self) -> Dict:
result = {}
for key, value in self.__dict__.items():
json_key = key.split('__')[-1]
if isinstance(value, BaseGeometry):
result[json_key] = value.wkt
elif json_key == 'id':
json_key = '$id'
result[json_key] = value
else:
result[json_key] = value
return result

@classmethod
def from_json(cls, json_dict: Dict) -> 'Connection':
json_dict['connection_id'] = json_dict.pop('$id')
json_dict['bound'] = loads(json_dict['bound'])
json_dict['edge'] = loads(json_dict['edge'])
return cls(**json_dict)
183 changes: 183 additions & 0 deletions src/indoorspace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
"""
File Name: indoorspace.py

Copyright (c) 2023 - 2024 IndoorJson

Author: Ziwei Xiang <[email protected]>
Create Date: 2024/3/14
"""

import json
import numpy as np
from cell import Cell
from connection import Connection
from layer import Layer
from rlines import Rlines
from typing import List, Dict


class IndoorSpace:

def __init__(self):
self._properties: Dict = {}
self._cells: List[Cell] = []
self._connections: List[Connection] = []
self._layers: List[Layer] = []
self._rlineses: List[Rlines] = []
self._hypergraph: Dict = {}

@property
def properties(self) -> Dict:
return self._properties

@property
def cells(self) -> List[Cell]:
return self._cells

@property
def connections(self) -> List[Connection]:
return self._connections

@property
def layers(self) -> List[Layer]:
return self._layers

@property
def rlineses(self) -> List[Rlines]:
return self._rlineses

@property
def hypergraph(self) -> Dict:
return self._hypergraph

def set_properties(self, properties: Dict):
self._properties = properties

def add_cell(self, cell: Cell):
if cell.id not in [c.id for c in self._cells]:
self._cells.append(cell)
else:
raise ValueError('Cell id already exists')

def add_connection(self, connection: Connection):
if connection.id not in [c.id for c in self._connections]:
if connection.source in [
c.id for c in self._cells
] and connection.target in [c.id for c in self._cells]:
self._connections.append(connection)
elif connection.source not in [
c.id for c in self._cells
] and connection.target in [c.id for c in self._cells]:
raise ValueError('Source cell does not exist')
elif connection.source in [
c.id for c in self._cells
] and connection.target not in [c.id for c in self._cells]:
raise ValueError('Target cell does not exist')
else:
raise ValueError('Source and target cell do not exist')
else:
raise ValueError('Connection id already exists')

def set_layers(self, layers: Layer):
self._layers.append(layers)

def set_rlineses(self, rlineses: Rlines):
self._rlineses.append(rlineses)

def get_incident_matrix(self):
cells = self.cells
connections = self.connections
incident_matrix = np.zeros((len(cells), len(connections)), dtype=int)
for j, connection in enumerate(connections):
source = self.get_cell_from_id(connection.source)
target = self.get_cell_from_id(connection.target)
source_index = cells.index(source)
target_index = cells.index(target)
incident_matrix[source_index, j] = 1
incident_matrix[target_index, j] = -1
return incident_matrix

def get_hypergraph_incidence_matrix(self):
return self.get_incident_matrix().T

def get_hypergraph(self):
cells = self.cells
connections = self.connections
rlineses = self.rlineses
hypergraph = self._hypergraph
hypergraph['hyperNodes'] = []
hypergraph['hyperEdges'] = []
incident_matrix = self.get_incident_matrix()
incident_matrix_transpose = incident_matrix.T

for hyperNode in connections:
hypergraph['hyperNodes'].append(hyperNode.to_json())

for j in range(incident_matrix_transpose.shape[1]):
hyperEdge = {}
inner_edge_id = {'ins': [], 'outs': []}
for i in range(incident_matrix_transpose.shape[0]):
if incident_matrix_transpose[i, j] != 0:
if incident_matrix_transpose[i, j] == -1:
inner_edge_ins_id = connections[i].id
inner_edge_id['ins'].append(inner_edge_ins_id)
elif incident_matrix_transpose[i, j] == 1:
inner_edge_outs_id = connections[i].id
inner_edge_id['outs'].append(inner_edge_outs_id)
else:
raise ValueError('Incident matrix error')
hyperEdge['id'] = cells[j].id
hyperEdge['properties'] = cells[j].properties
hyperEdge['space'] = cells[j].space.wkt
hyperEdge['node'] = cells[j].node.wkt
hyperEdge['inner_nodeset'] = inner_edge_id

for rlines in rlineses:
if rlines.cell == cells[j].id:
hyperEdge['closure'] = rlines.closure
break

hypergraph['hyperEdges'].append(hyperEdge)

self.set_hypergraph(hypergraph)

return hypergraph

def set_hypergraph(self, hypergraph):
self._hypergraph = hypergraph

def get_cell_from_id(self, cell_id):
for cell in self.cells:
if cell.id == cell_id:
return cell
return None

def get_connection_from_id(self, connection_id):
for connection in self.connections:
if connection.id == connection_id:
return connection
return None

def to_json(self) -> Dict:
result = {}
for key, value in self.__dict__.items():
if key == '_hypergraph':
continue
elif key == '_properties':
result[key.strip('_')] = value
else:
result[key.strip('_')] = [item.to_json() for item in value]
return result

@classmethod
def from_json(cls, json_str: str) -> 'IndoorSpace':
json_data = json.loads(json_str)
instance = cls()
for key, value in json_data.items():
if key == 'properties':
setattr(instance, f"_{key}", value)
elif key == 'rlineses':
setattr(instance, f"_{key}", [eval(key.capitalize()[:-2]).from_json(item) for item in value])
else:
setattr(instance, f"_{key}", [eval(key.capitalize()[:-1]).from_json(item) for item in value])
return instance
Loading