diff --git a/end_to_end_tests/golden-record/my_test_api_client/models/__init__.py b/end_to_end_tests/golden-record/my_test_api_client/models/__init__.py index 0efd7dcb6..c0d59cfc2 100644 --- a/end_to_end_tests/golden-record/my_test_api_client/models/__init__.py +++ b/end_to_end_tests/golden-record/my_test_api_client/models/__init__.py @@ -2,7 +2,6 @@ from .a_form_data import AFormData from .a_model import AModel -from .a_model_with_properties_reference_that_are_not_object import AModelWithPropertiesReferenceThatAreNotObject from .a_model_with_indirect_reference_property import AModelWithIndirectReferenceProperty from .a_model_with_indirect_self_reference_property import AModelWithIndirectSelfReferenceProperty from .a_model_with_properties_reference_that_are_not_object import AModelWithPropertiesReferenceThatAreNotObject diff --git a/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_indirect_reference_property.py b/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_indirect_reference_property.py index 14058761d..808bb08b7 100644 --- a/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_indirect_reference_property.py +++ b/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_indirect_reference_property.py @@ -31,9 +31,11 @@ def to_dict(self) -> Dict[str, Any]: @classmethod def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: d = src_dict.copy() - an_enum_indirect_ref: Union[Unset, AnEnum] = UNSET _an_enum_indirect_ref = d.pop("an_enum_indirect_ref", UNSET) - if not isinstance(_an_enum_indirect_ref, Unset): + an_enum_indirect_ref: Union[Unset, AnEnum] + if isinstance(_an_enum_indirect_ref, Unset): + an_enum_indirect_ref = UNSET + else: an_enum_indirect_ref = AnEnum(_an_enum_indirect_ref) a_model_with_indirect_reference_property = cls( diff --git a/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_indirect_self_reference_property.py 
b/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_indirect_self_reference_property.py index 2c101f264..61bf214f6 100644 --- a/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_indirect_self_reference_property.py +++ b/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_indirect_self_reference_property.py @@ -44,9 +44,11 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: d = src_dict.copy() required_self_ref = d.pop("required_self_ref") - an_enum: Union[Unset, AnEnum] = UNSET _an_enum = d.pop("an_enum", UNSET) - if not isinstance(_an_enum, Unset): + an_enum: Union[Unset, AnEnum] + if isinstance(_an_enum, Unset): + an_enum = UNSET + else: an_enum = AnEnum(_an_enum) optional_self_ref = d.pop("optional_self_ref", UNSET) diff --git a/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_properties_reference_that_are_not_object.py b/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_properties_reference_that_are_not_object.py index 4b95ec80e..515ee649e 100644 --- a/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_properties_reference_that_are_not_object.py +++ b/end_to_end_tests/golden-record/my_test_api_client/models/a_model_with_properties_reference_that_are_not_object.py @@ -6,7 +6,7 @@ from dateutil.parser import isoparse from ..models.an_enum import AnEnum -from ..types import File +from ..types import UNSET, File, Unset T = TypeVar("T", bound="AModelWithPropertiesReferenceThatAreNotObject") @@ -15,165 +15,199 @@ class AModelWithPropertiesReferenceThatAreNotObject: """ """ - enum_properties_ref: List[AnEnum] - str_properties_ref: List[str] - date_properties_ref: List[datetime.date] - datetime_properties_ref: List[datetime.datetime] - int_32_properties_ref: List[int] - int_64_properties_ref: List[int] - float_properties_ref: List[float] - double_properties_ref: List[float] - file_properties_ref: List[File] - bytestream_properties_ref: 
List[str] - enum_properties: List[AnEnum] - str_properties: List[str] - date_properties: List[datetime.date] - datetime_properties: List[datetime.datetime] - int_32_properties: List[int] - int_64_properties: List[int] - float_properties: List[float] - double_properties: List[float] - file_properties: List[File] - bytestream_properties: List[str] - enum_property_ref: AnEnum - str_property_ref: str - date_property_ref: datetime.date - datetime_property_ref: datetime.datetime - int_32_property_ref: int - int_64_property_ref: int - float_property_ref: float - double_property_ref: float - file_property_ref: File - bytestream_property_ref: str + enum_properties_ref: Union[Unset, List[AnEnum]] = UNSET + str_properties_ref: Union[Unset, List[str]] = UNSET + date_properties_ref: Union[Unset, List[datetime.date]] = UNSET + datetime_properties_ref: Union[Unset, List[datetime.datetime]] = UNSET + int_32_properties_ref: Union[Unset, List[int]] = UNSET + int_64_properties_ref: Union[Unset, List[int]] = UNSET + float_properties_ref: Union[Unset, List[float]] = UNSET + double_properties_ref: Union[Unset, List[float]] = UNSET + file_properties_ref: Union[Unset, List[File]] = UNSET + bytestream_properties_ref: Union[Unset, List[str]] = UNSET + enum_properties: Union[Unset, List[AnEnum]] = UNSET + str_properties: Union[Unset, List[str]] = UNSET + date_properties: Union[Unset, List[datetime.date]] = UNSET + datetime_properties: Union[Unset, List[datetime.datetime]] = UNSET + int_32_properties: Union[Unset, List[int]] = UNSET + int_64_properties: Union[Unset, List[int]] = UNSET + float_properties: Union[Unset, List[float]] = UNSET + double_properties: Union[Unset, List[float]] = UNSET + file_properties: Union[Unset, List[File]] = UNSET + bytestream_properties: Union[Unset, List[str]] = UNSET + enum_property_ref: Union[Unset, AnEnum] = UNSET + str_property_ref: Union[Unset, str] = UNSET + date_property_ref: Union[Unset, datetime.date] = UNSET + datetime_property_ref: Union[Unset, 
datetime.datetime] = UNSET + int_32_property_ref: Union[Unset, int] = UNSET + int_64_property_ref: Union[Unset, int] = UNSET + float_property_ref: Union[Unset, float] = UNSET + double_property_ref: Union[Unset, float] = UNSET + file_property_ref: Union[Unset, File] = UNSET + bytestream_property_ref: Union[Unset, str] = UNSET additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict) def to_dict(self) -> Dict[str, Any]: - enum_properties_ref = [] - for componentsschemas_an_other_array_of_enum_item_data in self.enum_properties_ref: - componentsschemas_an_other_array_of_enum_item = componentsschemas_an_other_array_of_enum_item_data.value + enum_properties_ref: Union[Unset, List[str]] = UNSET + if not isinstance(self.enum_properties_ref, Unset): + enum_properties_ref = [] + for componentsschemas_an_other_array_of_enum_item_data in self.enum_properties_ref: + componentsschemas_an_other_array_of_enum_item = componentsschemas_an_other_array_of_enum_item_data.value - enum_properties_ref.append(componentsschemas_an_other_array_of_enum_item) + enum_properties_ref.append(componentsschemas_an_other_array_of_enum_item) str_properties_ref = self.str_properties_ref - - date_properties_ref = [] - for componentsschemas_an_other_array_of_date_item_data in self.date_properties_ref: - componentsschemas_an_other_array_of_date_item = ( - componentsschemas_an_other_array_of_date_item_data.isoformat() - ) - date_properties_ref.append(componentsschemas_an_other_array_of_date_item) - - datetime_properties_ref = [] - for componentsschemas_an_other_array_of_date_time_item_data in self.datetime_properties_ref: - componentsschemas_an_other_array_of_date_time_item = ( - componentsschemas_an_other_array_of_date_time_item_data.isoformat() - ) - - datetime_properties_ref.append(componentsschemas_an_other_array_of_date_time_item) - + date_properties_ref = self.date_properties_ref + datetime_properties_ref = self.datetime_properties_ref int_32_properties_ref = 
self.int_32_properties_ref - int_64_properties_ref = self.int_64_properties_ref - float_properties_ref = self.float_properties_ref - double_properties_ref = self.double_properties_ref - - file_properties_ref = [] - for componentsschemas_an_other_array_of_file_item_data in self.file_properties_ref: - componentsschemas_an_other_array_of_file_item = ( - componentsschemas_an_other_array_of_file_item_data.to_tuple() - ) - - file_properties_ref.append(componentsschemas_an_other_array_of_file_item) - + file_properties_ref = self.file_properties_ref bytestream_properties_ref = self.bytestream_properties_ref - - enum_properties = [] - for componentsschemas_an_array_of_enum_item_data in self.enum_properties: - componentsschemas_an_array_of_enum_item = componentsschemas_an_array_of_enum_item_data.value - - enum_properties.append(componentsschemas_an_array_of_enum_item) - - str_properties = self.str_properties - - date_properties = [] - for componentsschemas_an_array_of_date_item_data in self.date_properties: - componentsschemas_an_array_of_date_item = componentsschemas_an_array_of_date_item_data.isoformat() - date_properties.append(componentsschemas_an_array_of_date_item) - - datetime_properties = [] - for componentsschemas_an_array_of_date_time_item_data in self.datetime_properties: - componentsschemas_an_array_of_date_time_item = componentsschemas_an_array_of_date_time_item_data.isoformat() - - datetime_properties.append(componentsschemas_an_array_of_date_time_item) - - int_32_properties = self.int_32_properties - - int_64_properties = self.int_64_properties - - float_properties = self.float_properties - - double_properties = self.double_properties - - file_properties = [] - for componentsschemas_an_array_of_file_item_data in self.file_properties: - componentsschemas_an_array_of_file_item = componentsschemas_an_array_of_file_item_data.to_tuple() - - file_properties.append(componentsschemas_an_array_of_file_item) - - bytestream_properties = self.bytestream_properties - - 
enum_property_ref = self.enum_property_ref.value + enum_properties: Union[Unset, List[str]] = UNSET + if not isinstance(self.enum_properties, Unset): + enum_properties = [] + for componentsschemas_an_array_of_enum_item_data in self.enum_properties: + componentsschemas_an_array_of_enum_item = componentsschemas_an_array_of_enum_item_data.value + + enum_properties.append(componentsschemas_an_array_of_enum_item) + + str_properties: Union[Unset, List[str]] = UNSET + if not isinstance(self.str_properties, Unset): + str_properties = self.str_properties + + date_properties: Union[Unset, List[str]] = UNSET + if not isinstance(self.date_properties, Unset): + date_properties = [] + for componentsschemas_an_array_of_date_item_data in self.date_properties: + componentsschemas_an_array_of_date_item = componentsschemas_an_array_of_date_item_data.isoformat() + date_properties.append(componentsschemas_an_array_of_date_item) + + datetime_properties: Union[Unset, List[str]] = UNSET + if not isinstance(self.datetime_properties, Unset): + datetime_properties = [] + for componentsschemas_an_array_of_date_time_item_data in self.datetime_properties: + componentsschemas_an_array_of_date_time_item = ( + componentsschemas_an_array_of_date_time_item_data.isoformat() + ) + + datetime_properties.append(componentsschemas_an_array_of_date_time_item) + + int_32_properties: Union[Unset, List[int]] = UNSET + if not isinstance(self.int_32_properties, Unset): + int_32_properties = self.int_32_properties + + int_64_properties: Union[Unset, List[int]] = UNSET + if not isinstance(self.int_64_properties, Unset): + int_64_properties = self.int_64_properties + + float_properties: Union[Unset, List[float]] = UNSET + if not isinstance(self.float_properties, Unset): + float_properties = self.float_properties + + double_properties: Union[Unset, List[float]] = UNSET + if not isinstance(self.double_properties, Unset): + double_properties = self.double_properties + + file_properties: Union[Unset, 
List[Tuple[Optional[str], Union[BinaryIO, TextIO], Optional[str]]]] = UNSET + if not isinstance(self.file_properties, Unset): + file_properties = [] + for componentsschemas_an_array_of_file_item_data in self.file_properties: + componentsschemas_an_array_of_file_item = componentsschemas_an_array_of_file_item_data.to_tuple() + + file_properties.append(componentsschemas_an_array_of_file_item) + + bytestream_properties: Union[Unset, List[str]] = UNSET + if not isinstance(self.bytestream_properties, Unset): + bytestream_properties = self.bytestream_properties + + enum_property_ref: Union[Unset, str] = UNSET + if not isinstance(self.enum_property_ref, Unset): + enum_property_ref = self.enum_property_ref.value str_property_ref = self.str_property_ref - date_property_ref = self.date_property_ref.isoformat() - datetime_property_ref = self.datetime_property_ref.isoformat() + date_property_ref: Union[Unset, str] = UNSET + if not isinstance(self.date_property_ref, Unset): + date_property_ref = self.date_property_ref.isoformat() + + datetime_property_ref: Union[Unset, str] = UNSET + if not isinstance(self.datetime_property_ref, Unset): + datetime_property_ref = self.datetime_property_ref.isoformat() int_32_property_ref = self.int_32_property_ref int_64_property_ref = self.int_64_property_ref float_property_ref = self.float_property_ref double_property_ref = self.double_property_ref - file_property_ref = self.file_property_ref.to_tuple() + file_property_ref: Union[Unset, Tuple[Optional[str], Union[BinaryIO, TextIO], Optional[str]]] = UNSET + if not isinstance(self.file_property_ref, Unset): + file_property_ref = self.file_property_ref.to_tuple() bytestream_property_ref = self.bytestream_property_ref field_dict: Dict[str, Any] = {} field_dict.update(self.additional_properties) - field_dict.update( - { - "enum_properties_ref": enum_properties_ref, - "str_properties_ref": str_properties_ref, - "date_properties_ref": date_properties_ref, - "datetime_properties_ref": 
datetime_properties_ref, - "int32_properties_ref": int_32_properties_ref, - "int64_properties_ref": int_64_properties_ref, - "float_properties_ref": float_properties_ref, - "double_properties_ref": double_properties_ref, - "file_properties_ref": file_properties_ref, - "bytestream_properties_ref": bytestream_properties_ref, - "enum_properties": enum_properties, - "str_properties": str_properties, - "date_properties": date_properties, - "datetime_properties": datetime_properties, - "int32_properties": int_32_properties, - "int64_properties": int_64_properties, - "float_properties": float_properties, - "double_properties": double_properties, - "file_properties": file_properties, - "bytestream_properties": bytestream_properties, - "enum_property_ref": enum_property_ref, - "str_property_ref": str_property_ref, - "date_property_ref": date_property_ref, - "datetime_property_ref": datetime_property_ref, - "int32_property_ref": int_32_property_ref, - "int64_property_ref": int_64_property_ref, - "float_property_ref": float_property_ref, - "double_property_ref": double_property_ref, - "file_property_ref": file_property_ref, - "bytestream_property_ref": bytestream_property_ref, - } - ) + field_dict.update({}) + if enum_properties_ref is not UNSET: + field_dict["enum_properties_ref"] = enum_properties_ref + if str_properties_ref is not UNSET: + field_dict["str_properties_ref"] = str_properties_ref + if date_properties_ref is not UNSET: + field_dict["date_properties_ref"] = date_properties_ref + if datetime_properties_ref is not UNSET: + field_dict["datetime_properties_ref"] = datetime_properties_ref + if int_32_properties_ref is not UNSET: + field_dict["int32_properties_ref"] = int_32_properties_ref + if int_64_properties_ref is not UNSET: + field_dict["int64_properties_ref"] = int_64_properties_ref + if float_properties_ref is not UNSET: + field_dict["float_properties_ref"] = float_properties_ref + if double_properties_ref is not UNSET: + field_dict["double_properties_ref"] = 
double_properties_ref + if file_properties_ref is not UNSET: + field_dict["file_properties_ref"] = file_properties_ref + if bytestream_properties_ref is not UNSET: + field_dict["bytestream_properties_ref"] = bytestream_properties_ref + if enum_properties is not UNSET: + field_dict["enum_properties"] = enum_properties + if str_properties is not UNSET: + field_dict["str_properties"] = str_properties + if date_properties is not UNSET: + field_dict["date_properties"] = date_properties + if datetime_properties is not UNSET: + field_dict["datetime_properties"] = datetime_properties + if int_32_properties is not UNSET: + field_dict["int32_properties"] = int_32_properties + if int_64_properties is not UNSET: + field_dict["int64_properties"] = int_64_properties + if float_properties is not UNSET: + field_dict["float_properties"] = float_properties + if double_properties is not UNSET: + field_dict["double_properties"] = double_properties + if file_properties is not UNSET: + field_dict["file_properties"] = file_properties + if bytestream_properties is not UNSET: + field_dict["bytestream_properties"] = bytestream_properties + if enum_property_ref is not UNSET: + field_dict["enum_property_ref"] = enum_property_ref + if str_property_ref is not UNSET: + field_dict["str_property_ref"] = str_property_ref + if date_property_ref is not UNSET: + field_dict["date_property_ref"] = date_property_ref + if datetime_property_ref is not UNSET: + field_dict["datetime_property_ref"] = datetime_property_ref + if int_32_property_ref is not UNSET: + field_dict["int32_property_ref"] = int_32_property_ref + if int_64_property_ref is not UNSET: + field_dict["int64_property_ref"] = int_64_property_ref + if float_property_ref is not UNSET: + field_dict["float_property_ref"] = float_property_ref + if double_property_ref is not UNSET: + field_dict["double_property_ref"] = double_property_ref + if file_property_ref is not UNSET: + field_dict["file_property_ref"] = file_property_ref + if 
bytestream_property_ref is not UNSET: + field_dict["bytestream_property_ref"] = bytestream_property_ref return field_dict @@ -181,112 +215,111 @@ def to_dict(self) -> Dict[str, Any]: def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: d = src_dict.copy() enum_properties_ref = [] - _enum_properties_ref = d.pop("enum_properties_ref") - for componentsschemas_an_other_array_of_enum_item_data in _enum_properties_ref: + _enum_properties_ref = d.pop("enum_properties_ref", UNSET) + for componentsschemas_an_other_array_of_enum_item_data in _enum_properties_ref or []: componentsschemas_an_other_array_of_enum_item = AnEnum(componentsschemas_an_other_array_of_enum_item_data) enum_properties_ref.append(componentsschemas_an_other_array_of_enum_item) - str_properties_ref = cast(List[str], d.pop("str_properties_ref")) + str_properties_ref = d.pop("str_properties_ref", UNSET) - date_properties_ref = [] - _date_properties_ref = d.pop("date_properties_ref") - for componentsschemas_an_other_array_of_date_item_data in _date_properties_ref: - componentsschemas_an_other_array_of_date_item = isoparse( - componentsschemas_an_other_array_of_date_item_data - ).date() + date_properties_ref = d.pop("date_properties_ref", UNSET) - date_properties_ref.append(componentsschemas_an_other_array_of_date_item) + datetime_properties_ref = d.pop("datetime_properties_ref", UNSET) - datetime_properties_ref = [] - _datetime_properties_ref = d.pop("datetime_properties_ref") - for componentsschemas_an_other_array_of_date_time_item_data in _datetime_properties_ref: - componentsschemas_an_other_array_of_date_time_item = isoparse( - componentsschemas_an_other_array_of_date_time_item_data - ) - - datetime_properties_ref.append(componentsschemas_an_other_array_of_date_time_item) - - int_32_properties_ref = cast(List[int], d.pop("int32_properties_ref")) + int_32_properties_ref = d.pop("int32_properties_ref", UNSET) - int_64_properties_ref = cast(List[int], d.pop("int64_properties_ref")) + 
int_64_properties_ref = d.pop("int64_properties_ref", UNSET) - float_properties_ref = cast(List[float], d.pop("float_properties_ref")) + float_properties_ref = d.pop("float_properties_ref", UNSET) - double_properties_ref = cast(List[float], d.pop("double_properties_ref")) - - file_properties_ref = [] - _file_properties_ref = d.pop("file_properties_ref") - for componentsschemas_an_other_array_of_file_item_data in _file_properties_ref: - componentsschemas_an_other_array_of_file_item = File( - payload=BytesIO(componentsschemas_an_other_array_of_file_item_data) - ) + double_properties_ref = d.pop("double_properties_ref", UNSET) - file_properties_ref.append(componentsschemas_an_other_array_of_file_item) + file_properties_ref = d.pop("file_properties_ref", UNSET) - bytestream_properties_ref = cast(List[str], d.pop("bytestream_properties_ref")) + bytestream_properties_ref = d.pop("bytestream_properties_ref", UNSET) enum_properties = [] - _enum_properties = d.pop("enum_properties") - for componentsschemas_an_array_of_enum_item_data in _enum_properties: + _enum_properties = d.pop("enum_properties", UNSET) + for componentsschemas_an_array_of_enum_item_data in _enum_properties or []: componentsschemas_an_array_of_enum_item = AnEnum(componentsschemas_an_array_of_enum_item_data) enum_properties.append(componentsschemas_an_array_of_enum_item) - str_properties = cast(List[str], d.pop("str_properties")) + str_properties = cast(List[str], d.pop("str_properties", UNSET)) date_properties = [] - _date_properties = d.pop("date_properties") - for componentsschemas_an_array_of_date_item_data in _date_properties: + _date_properties = d.pop("date_properties", UNSET) + for componentsschemas_an_array_of_date_item_data in _date_properties or []: componentsschemas_an_array_of_date_item = isoparse(componentsschemas_an_array_of_date_item_data).date() date_properties.append(componentsschemas_an_array_of_date_item) datetime_properties = [] - _datetime_properties = d.pop("datetime_properties") - 
for componentsschemas_an_array_of_date_time_item_data in _datetime_properties: + _datetime_properties = d.pop("datetime_properties", UNSET) + for componentsschemas_an_array_of_date_time_item_data in _datetime_properties or []: componentsschemas_an_array_of_date_time_item = isoparse(componentsschemas_an_array_of_date_time_item_data) datetime_properties.append(componentsschemas_an_array_of_date_time_item) - int_32_properties = cast(List[int], d.pop("int32_properties")) + int_32_properties = cast(List[int], d.pop("int32_properties", UNSET)) - int_64_properties = cast(List[int], d.pop("int64_properties")) + int_64_properties = cast(List[int], d.pop("int64_properties", UNSET)) - float_properties = cast(List[float], d.pop("float_properties")) + float_properties = cast(List[float], d.pop("float_properties", UNSET)) - double_properties = cast(List[float], d.pop("double_properties")) + double_properties = cast(List[float], d.pop("double_properties", UNSET)) file_properties = [] - _file_properties = d.pop("file_properties") - for componentsschemas_an_array_of_file_item_data in _file_properties: + _file_properties = d.pop("file_properties", UNSET) + for componentsschemas_an_array_of_file_item_data in _file_properties or []: componentsschemas_an_array_of_file_item = File( payload=BytesIO(componentsschemas_an_array_of_file_item_data) ) file_properties.append(componentsschemas_an_array_of_file_item) - bytestream_properties = cast(List[str], d.pop("bytestream_properties")) + bytestream_properties = cast(List[str], d.pop("bytestream_properties", UNSET)) - enum_property_ref = AnEnum(d.pop("enum_property_ref")) + _enum_property_ref = d.pop("enum_property_ref", UNSET) + enum_property_ref: Union[Unset, AnEnum] + if isinstance(_enum_property_ref, Unset): + enum_property_ref = UNSET + else: + enum_property_ref = AnEnum(_enum_property_ref) - str_property_ref = d.pop("str_property_ref") + str_property_ref = d.pop("str_property_ref", UNSET) - date_property_ref = 
isoparse(d.pop("date_property_ref")).date() + _date_property_ref = d.pop("date_property_ref", UNSET) + date_property_ref: Union[Unset, datetime.date] + if isinstance(_date_property_ref, Unset): + date_property_ref = UNSET + else: + date_property_ref = isoparse(_date_property_ref).date() - datetime_property_ref = isoparse(d.pop("datetime_property_ref")) + _datetime_property_ref = d.pop("datetime_property_ref", UNSET) + datetime_property_ref: Union[Unset, datetime.datetime] + if isinstance(_datetime_property_ref, Unset): + datetime_property_ref = UNSET + else: + datetime_property_ref = isoparse(_datetime_property_ref) - int_32_property_ref = d.pop("int32_property_ref") + int_32_property_ref = d.pop("int32_property_ref", UNSET) - int_64_property_ref = d.pop("int64_property_ref") + int_64_property_ref = d.pop("int64_property_ref", UNSET) - float_property_ref = d.pop("float_property_ref") + float_property_ref = d.pop("float_property_ref", UNSET) - double_property_ref = d.pop("double_property_ref") + double_property_ref = d.pop("double_property_ref", UNSET) - file_property_ref = File(payload=BytesIO(d.pop("file_property_ref"))) + _file_property_ref = d.pop("file_property_ref", UNSET) + file_property_ref: Union[Unset, File] + if isinstance(_file_property_ref, Unset): + file_property_ref = UNSET + else: + file_property_ref = File(payload=BytesIO(_file_property_ref)) - bytestream_property_ref = d.pop("bytestream_property_ref") + bytestream_property_ref = d.pop("bytestream_property_ref", UNSET) a_model_with_properties_reference_that_are_not_object = cls( enum_properties_ref=enum_properties_ref, diff --git a/openapi_python_client/__init__.py b/openapi_python_client/__init__.py index 43f813483..fe5e42f13 100644 --- a/openapi_python_client/__init__.py +++ b/openapi_python_client/__init__.py @@ -3,13 +3,13 @@ import shutil import subprocess import sys +import urllib from enum import Enum from pathlib import Path -from typing import Any, Dict, Optional, Sequence, Union +from 
typing import Any, Dict, Optional, Sequence, Union, cast import httpcore import httpx -import yaml from jinja2 import BaseLoader, ChoiceLoader, Environment, FileSystemLoader, PackageLoader from openapi_python_client import utils @@ -17,6 +17,7 @@ from .config import Config from .parser import GeneratorData, import_string_from_class from .parser.errors import GeneratorError +from .resolver.schema_resolver import SchemaResolver from .utils import snake_case if sys.version_info.minor < 8: # version did not exist before 3.8, need to use a backport @@ -351,20 +352,21 @@ def update_existing_client( def _get_document(*, url: Optional[str], path: Optional[Path]) -> Union[Dict[str, Any], GeneratorError]: - yaml_bytes: bytes if url is not None and path is not None: return GeneratorError(header="Provide URL or Path, not both.") - if url is not None: - try: - response = httpx.get(url) - yaml_bytes = response.content - except (httpx.HTTPError, httpcore.NetworkError): - return GeneratorError(header="Could not get OpenAPI document from provided URL") - elif path is not None: - yaml_bytes = path.read_bytes() - else: + + if url is None and path is None: return GeneratorError(header="No URL or Path provided") + + source = cast(Union[str, Path], (url if url is not None else path)) try: - return yaml.safe_load(yaml_bytes) - except yaml.YAMLError: + resolver = SchemaResolver(source) + result = resolver.resolve() + if len(result.errors) > 0: + return GeneratorError(header="; ".join(result.errors)) + except (httpx.HTTPError, httpcore.NetworkError, urllib.error.URLError): + return GeneratorError(header="Could not get OpenAPI document from provided URL") + except Exception: return GeneratorError(header="Invalid YAML from provided source") + + return result.schema diff --git a/openapi_python_client/parser/properties/schemas.py b/openapi_python_client/parser/properties/schemas.py index 6e369e6ab..322b15a45 100644 --- a/openapi_python_client/parser/properties/schemas.py +++ 
b/openapi_python_client/parser/properties/schemas.py @@ -61,12 +61,10 @@ def from_string(*, string: str, config: Config) -> "Class": class Schemas: """Structure for containing all defined, shareable, and reusable schemas (attr classes and Enums)""" - classes_by_reference: Dict[ - _ReferencePath, _Holder[Union[Property, RecursiveReferenceInterupt]] - ] = attr.ib(factory=dict) - classes_by_name: Dict[ - _ClassName, _Holder[Union[Property, RecursiveReferenceInterupt]] - ] = attr.ib(factory=dict) + classes_by_reference: Dict[_ReferencePath, _Holder[Union[Property, RecursiveReferenceInterupt]]] = attr.ib( + factory=dict + ) + classes_by_name: Dict[_ClassName, _Holder[Union[Property, RecursiveReferenceInterupt]]] = attr.ib(factory=dict) errors: List[ParseError] = attr.ib(factory=list) diff --git a/openapi_python_client/parser/responses.py b/openapi_python_client/parser/responses.py index ffa703d6f..b1d5432f2 100644 --- a/openapi_python_client/parser/responses.py +++ b/openapi_python_client/parser/responses.py @@ -21,6 +21,7 @@ class Response: _SOURCE_BY_CONTENT_TYPE = { "application/json": "response.json()", + "application/problem+json": "response.json()", "application/vnd.api+json": "response.json()", "application/octet-stream": "response.content", "text/html": "response.text", diff --git a/openapi_python_client/resolver/__init__.py b/openapi_python_client/resolver/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/openapi_python_client/resolver/collision_resolver.py b/openapi_python_client/resolver/collision_resolver.py new file mode 100644 index 000000000..66cb05fa7 --- /dev/null +++ b/openapi_python_client/resolver/collision_resolver.py @@ -0,0 +1,148 @@ +import hashlib +import re +from typing import Any, Dict, List, Tuple + +from .reference import Reference +from .resolver_types import SchemaData + + +class CollisionResolver: + def __init__(self, root: SchemaData, refs: Dict[str, SchemaData], errors: List[str], parent: str): + self._root: 
SchemaData = root + self._refs: Dict[str, SchemaData] = refs + self._errors: List[str] = errors + self._parent = parent + self._refs_index: Dict[str, str] = dict() + self._schema_index: Dict[str, Reference] = dict() + self._keys_to_replace: Dict[str, Tuple[int, SchemaData, List[str]]] = dict() + + def _browse_schema(self, attr: Any, root_attr: Any) -> None: + if isinstance(attr, dict): + attr_copy = {**attr} # Create a shallow copy + for key, val in attr_copy.items(): + if key == "$ref": + ref = Reference(val, self._parent) + value = ref.pointer.value + + assert value + + schema = self._get_from_ref(ref, root_attr) + hashed_schema = self._reference_schema_hash(schema) + + if value in self._refs_index.keys(): + if self._refs_index[value] != hashed_schema: + if ref.is_local(): + self._increment_ref(ref, root_attr, hashed_schema, attr, key) + else: + assert ref.abs_path in self._refs.keys() + self._increment_ref(ref, self._refs[ref.abs_path], hashed_schema, attr, key) + else: + self._refs_index[value] = hashed_schema + + if hashed_schema in self._schema_index.keys(): + existing_ref = self._schema_index[hashed_schema] + if ( + existing_ref.pointer.value != ref.pointer.value + and ref.pointer.tokens()[-1] == existing_ref.pointer.tokens()[-1] + ): + self._errors.append(f"Found a duplicate schema in {existing_ref.value} and {ref.value}") + else: + self._schema_index[hashed_schema] = ref + + else: + self._browse_schema(val, root_attr) + + elif isinstance(attr, list): + for val in attr: + self._browse_schema(val, root_attr) + + def _get_from_ref(self, ref: Reference, attr: SchemaData) -> SchemaData: + if ref.is_remote(): + assert ref.abs_path in self._refs.keys() + attr = self._refs[ref.abs_path] + cursor = attr + query_parts = ref.pointer.tokens() + + for key in query_parts: + if key == "": + continue + + if isinstance(cursor, dict) and key in cursor: + cursor = cursor[key] + else: + self._errors.append(f"Did not find data corresponding to the reference {ref.value}") + + 
if list(cursor) == ["$ref"]: + ref2 = cursor["$ref"] + ref2 = re.sub(r"(.*)_\d", r"\1", ref2) + ref2 = Reference(ref2, self._parent) + if ref2.is_remote(): + attr = self._refs[ref2.abs_path] + return self._get_from_ref(ref2, attr) + + return cursor + + def _increment_ref( + self, ref: Reference, schema: SchemaData, hashed_schema: str, attr: Dict[str, Any], key: str + ) -> None: + i = 2 + value = ref.pointer.value + incremented_value = value + "_" + str(i) + + while incremented_value in self._refs_index.keys(): + if self._refs_index[incremented_value] == hashed_schema: + if ref.value not in self._keys_to_replace.keys(): + break # have to increment target key as well + else: + attr[key] = ref.value + "_" + str(i) + return + else: + i = i + 1 + incremented_value = value + "_" + str(i) + + attr[key] = ref.value + "_" + str(i) + self._refs_index[incremented_value] = hashed_schema + self._keys_to_replace[ref.value] = (i, schema, ref.pointer.tokens()) + + def _modify_root_ref_name(self, query_parts: List[str], i: int, attr: SchemaData) -> None: + cursor = attr + last_key = query_parts[-1] + + for key in query_parts: + if key == "": + continue + + if key == last_key and key + "_" + str(i) not in cursor: + assert key in cursor, "Didnt find %s in %s" % (key, attr) + cursor[key + "_" + str(i)] = cursor.pop(key) + return + + if isinstance(cursor, dict) and key in cursor: + cursor = cursor[key] + else: + return + + def resolve(self) -> None: + self._browse_schema(self._root, self._root) + for file, schema in self._refs.items(): + self._browse_schema(schema, schema) + for a, b in self._keys_to_replace.items(): + self._modify_root_ref_name(b[2], b[0], b[1]) + + def _reference_schema_hash(self, schema: Dict[str, Any]) -> str: + md5 = hashlib.md5() + hash_elms = [] + for key in schema.keys(): + if key == "description": + hash_elms.append(schema[key]) + if key == "type": + hash_elms.append(schema[key]) + if key == "allOf": + for item in schema[key]: + hash_elms.append(str(item)) + 
hash_elms.append(key) + + hash_elms.sort() + md5.update(";".join(hash_elms).encode("utf-8")) + return md5.hexdigest() diff --git a/openapi_python_client/resolver/data_loader.py b/openapi_python_client/resolver/data_loader.py new file mode 100644 index 000000000..df6677020 --- /dev/null +++ b/openapi_python_client/resolver/data_loader.py @@ -0,0 +1,24 @@ +import json + +import yaml + +from .resolver_types import SchemaData + + +class DataLoader: + @classmethod + def load(cls, path: str, data: bytes) -> SchemaData: + data_type = path.split(".")[-1].casefold() + + if data_type == "json": + return cls.load_json(data) + else: + return cls.load_yaml(data) + + @classmethod + def load_json(cls, data: bytes) -> SchemaData: + return json.loads(data) + + @classmethod + def load_yaml(cls, data: bytes) -> SchemaData: + return yaml.safe_load(data) diff --git a/openapi_python_client/resolver/pointer.py b/openapi_python_client/resolver/pointer.py new file mode 100644 index 000000000..911f36243 --- /dev/null +++ b/openapi_python_client/resolver/pointer.py @@ -0,0 +1,48 @@ +import urllib.parse +from typing import List, Union + + +class Pointer: + """https://tools.ietf.org/html/rfc6901""" + + def __init__(self, pointer: str) -> None: + if pointer is None or pointer != "" and not pointer.startswith("/"): + raise ValueError(f'Invalid pointer value {pointer}, it must match: *( "/" reference-token )') + + self._pointer = pointer + + @property + def value(self) -> str: + return self._pointer + + @property + def parent(self) -> Union["Pointer", None]: + tokens = self.tokens(False) + + if len(tokens) > 1: + tokens.pop() + return Pointer("/".join(tokens)) + else: + assert tokens[-1] == "" + return None + + def tokens(self, unescape: bool = True) -> List[str]: + tokens = [] + + if unescape: + for token in self._pointer.split("/"): + tokens.append(self._unescape(token)) + else: + tokens = self._pointer.split("/") + + return tokens + + @property + def unescapated_value(self) -> str: + return 
self._unescape(self._pointer) + + def _unescape(self, data: str) -> str: + data = urllib.parse.unquote(data) + data = data.replace("~1", "/") + data = data.replace("~0", "~") + return data diff --git a/openapi_python_client/resolver/reference.py b/openapi_python_client/resolver/reference.py new file mode 100644 index 000000000..019def096 --- /dev/null +++ b/openapi_python_client/resolver/reference.py @@ -0,0 +1,68 @@ +import urllib.parse +from pathlib import Path +from typing import Union + +from .pointer import Pointer + + +class Reference: + """https://tools.ietf.org/html/draft-pbryan-zyp-json-ref-03""" + + def __init__(self, reference: str, parent: str = None): + self._ref = reference + self._parsed_ref = urllib.parse.urlparse(reference) + self._parent = parent + + @property + def path(self) -> str: + return urllib.parse.urldefrag(self._parsed_ref.geturl()).url + + @property + def abs_path(self) -> str: + if self._parent: + parent_dir = Path(self._parent) + abs_path = parent_dir.joinpath(self.path) + abs_path = abs_path.resolve() + return str(abs_path) + else: + return self.path + + @property + def parent(self) -> Union[str, None]: + return self._parent + + @property + def pointer(self) -> Pointer: + frag = self._parsed_ref.fragment + if self.is_url() and frag != "" and not frag.startswith("/"): + frag = f"/{frag}" + + return Pointer(frag) + + def is_relative(self) -> bool: + """return True if reference path is a relative path""" + return not self.is_absolute() + + def is_absolute(self) -> bool: + """return True is reference path is an absolute path""" + return self._parsed_ref.netloc != "" + + @property + def value(self) -> str: + return self._ref + + def is_url(self) -> bool: + """return True if the reference path is pointing to an external url location""" + return self.is_remote() and self._parsed_ref.netloc != "" + + def is_remote(self) -> bool: + """return True if the reference pointer is pointing to a remote document""" + return not self.is_local() + + def 
is_local(self) -> bool: + """return True if the reference pointer is pointing to the current document""" + return self._parsed_ref.path == "" + + def is_full_document(self) -> bool: + """return True if the reference pointer is pointing to the whole document content""" + return self.pointer.parent is None diff --git a/openapi_python_client/resolver/resolved_schema.py b/openapi_python_client/resolver/resolved_schema.py new file mode 100644 index 000000000..2612e0362 --- /dev/null +++ b/openapi_python_client/resolver/resolved_schema.py @@ -0,0 +1,175 @@ +from typing import Any, Dict, Generator, List, Tuple, Union, cast + +from .reference import Reference +from .resolver_types import SchemaData + + +class ResolvedSchema: + def __init__(self, root: SchemaData, refs: Dict[str, SchemaData], errors: List[str], parent: str): + self._root: SchemaData = root + self._refs: Dict[str, SchemaData] = refs + self._errors: List[str] = errors + self._resolved_remotes_components: SchemaData = cast(SchemaData, {}) + self._parent = parent + + self._resolved_schema: SchemaData = cast(SchemaData, {}) + if len(self._errors) == 0: + self._process() + + @property + def schema(self) -> SchemaData: + return self._root + + @property + def errors(self) -> List[str]: + return self._errors.copy() + + def _dict_deep_update(self, d: Dict[str, Any], u: Dict[str, Any]) -> Dict[str, Any]: + for k, v in u.items(): + if isinstance(d, Dict) and list(d) == ["$ref"]: + d.pop("$ref") + if isinstance(v, Dict): + d[k] = self._dict_deep_update(d.get(k, {}), v) + else: + d[k] = v + return d + + def _process(self) -> None: + self._process_remote_paths() + self._process_remote_components(self._root, parent_path=self._parent) + self._dict_deep_update(self._root, self._resolved_remotes_components) + + def _process_remote_paths(self) -> None: + refs_to_replace = [] + refs_to_remove = [] + for owner, ref_key, ref_val in self._lookup_schema_references_in(self._root, "paths"): + ref = Reference(ref_val, self._parent) + 
+ if ref.is_local(): + continue + + remote_path = ref.abs_path + path = ref.pointer.unescapated_value + tokens = ref.pointer.tokens() + + if remote_path not in self._refs: + self._errors.append("Failed to resolve remote reference > {0}".format(remote_path)) + else: + remote_schema = self._refs[remote_path] + remote_value = self._lookup_dict(remote_schema, tokens) + if not remote_value: + self._errors.append("Failed to read remote value {}, in remote ref {}".format(path, remote_path)) + refs_to_remove.append((owner, ref_key)) + else: + refs_to_replace.append((owner, remote_schema, remote_value)) + + for owner, remote_schema, remote_value in refs_to_replace: + self._process_remote_components(remote_schema, remote_value, 1, self._parent) + self._replace_reference_with(owner, remote_value) + + for owner, ref_key in refs_to_remove: + owner.pop(ref_key) + + def _process_remote_components( + self, owner: SchemaData, subpart: Union[SchemaData, None] = None, depth: int = 0, parent_path: str = None + ) -> None: + target = subpart if subpart else owner + + for parent, ref_key, ref_val in self._lookup_schema_references(target): + ref = Reference(ref_val, parent_path) + + if ref.is_local(): + # print('Found local reference >> {0}'.format(ref.value)) + if depth > 0: + self._transform_to_local_components(owner, ref) + else: + remote_path = ref.abs_path + if remote_path not in self._refs: + self._errors.append("Failed to resolve remote reference > {0}".format(remote_path)) + else: + remote_owner = self._refs[remote_path] + self._transform_to_local_components(remote_owner, ref) + self._transform_to_local_ref(parent, ref) + + def _transform_to_local_components(self, owner: SchemaData, ref: Reference) -> None: + self._ensure_components_dir_exists(ref) + + # print('Processing remote component > {0}'.format(ref.value)) + remote_component = self._lookup_dict(owner, ref.pointer.tokens()) + pointer_parent = ref.pointer.parent + + if pointer_parent is not None: + root_components_dir = 
self._lookup_dict(self._resolved_remotes_components, pointer_parent.tokens()) + component_name = ref.pointer.value.split("/")[-1] + + if remote_component is None: + print("Weird relookup of >> {0}".format(ref.value)) + assert ref.is_local() and self._lookup_dict(self._resolved_remotes_components, ref.pointer.tokens()) + return + + if "$ref" in remote_component: + subref = Reference(remote_component["$ref"], ref.parent) + if not subref.is_local(): + print("Lookup remote ref >>> {0}".format(subref.value)) + self._process_remote_components(remote_component, parent_path=ref.parent) + + if root_components_dir is not None: + if component_name not in root_components_dir: + root_components_dir[component_name] = remote_component + self._process_remote_components(owner, remote_component, 2, ref.parent) + + def _ensure_components_dir_exists(self, ref: Reference) -> None: + cursor = self._resolved_remotes_components + pointer_dir = ref.pointer.parent + assert pointer_dir is not None + + for key in pointer_dir.value.split("/"): # noqa + if key == "": + continue + + if key not in cursor: + cursor[key] = {} + + cursor = cursor[key] + + def _transform_to_local_ref(self, owner: Dict[str, Any], ref: Reference) -> None: + owner["$ref"] = "#{0}".format(ref.pointer.value) + + def _lookup_dict(self, attr: SchemaData, query_parts: List[str]) -> Union[SchemaData, None]: + cursor = attr + + for key in query_parts: + if key == "": + continue + + if isinstance(cursor, dict) and key in cursor: + cursor = cursor[key] + else: + return None + return cursor + + def _replace_reference_with(self, root: Dict[str, Any], new_value: Dict[str, Any]) -> None: + for key in new_value: + root[key] = new_value[key] + + root.pop("$ref") + + def _lookup_schema_references_in( + self, attr: SchemaData, path: str + ) -> Generator[Tuple[SchemaData, str, Any], None, None]: + if not isinstance(attr, dict) or path not in attr: + return + + yield from self._lookup_schema_references(attr[path]) + + def 
_lookup_schema_references(self, attr: Any) -> Generator[Tuple[SchemaData, str, str], None, None]: + if isinstance(attr, dict): + for key, val in attr.items(): + if key == "$ref": + yield cast(SchemaData, attr), cast(str, key), cast(str, val) + else: + yield from self._lookup_schema_references(val) + + elif isinstance(attr, list): + for val in attr: + yield from self._lookup_schema_references(val) diff --git a/openapi_python_client/resolver/resolver_types.py b/openapi_python_client/resolver/resolver_types.py new file mode 100644 index 000000000..84f6cea5b --- /dev/null +++ b/openapi_python_client/resolver/resolver_types.py @@ -0,0 +1,3 @@ +from typing import Any, Dict, NewType + +SchemaData = NewType("SchemaData", Dict[str, Any]) diff --git a/openapi_python_client/resolver/schema_resolver.py b/openapi_python_client/resolver/schema_resolver.py new file mode 100644 index 000000000..3521842fb --- /dev/null +++ b/openapi_python_client/resolver/schema_resolver.py @@ -0,0 +1,147 @@ +import logging +import urllib +from pathlib import Path +from typing import Any, Dict, Generator, List, Union, cast + +import httpx + +from .collision_resolver import CollisionResolver +from .data_loader import DataLoader +from .reference import Reference +from .resolved_schema import ResolvedSchema +from .resolver_types import SchemaData + + +class SchemaResolver: + def __init__(self, url_or_path: Union[str, Path]): + if not url_or_path: + raise ValueError("Invalid document root reference, it shall be an remote url or local file path") + + self._root_path: Union[Path, None] = None + self._root_url: Union[str, None] = None + self._root_url_scheme: Union[str, None] = None + self._parent_path: str + + if self._isapath(url_or_path): + url_or_path = cast(Path, url_or_path) + self._root_path = url_or_path.absolute() + self._parent_path = str(self._root_path.parent) + else: + url_or_path = cast(str, url_or_path) + self._root_url = url_or_path + self._parent_path = url_or_path + try: + 
self._root_url_scheme = urllib.parse.urlparse(url_or_path).scheme + if self._root_url_scheme not in ["http", "https"]: + raise ValueError(f"Unsupported URL scheme '{self._root_url_scheme}', expecting http or https") + except (TypeError, AttributeError): + raise urllib.error.URLError(f"Coult not parse URL > {url_or_path}") + + def _isapath(self, url_or_path: Union[str, Path]) -> bool: + return isinstance(url_or_path, Path) + + def resolve(self, recursive: bool = True) -> ResolvedSchema: + assert self._root_path or self._root_url + + root_schema: SchemaData + external_schemas: Dict[str, SchemaData] = {} + errors: List[str] = [] + parent: str + + if self._root_path: + root_schema = self._fetch_remote_file_path(self._root_path) + elif self._root_url: + root_schema = self._fetch_url_reference(self._root_url) + + self._resolve_schema_references(self._parent_path, root_schema, external_schemas, errors, recursive) + CollisionResolver(root_schema, external_schemas, errors, self._parent_path).resolve() + return ResolvedSchema(root_schema, external_schemas, errors, self._parent_path) + + def _resolve_schema_references( + self, + parent: str, + root: SchemaData, + external_schemas: Dict[str, SchemaData], + errors: List[str], + recursive: bool, + ) -> None: + + for ref in self._lookup_schema_references(root): + if ref.is_local(): + continue + + try: + path = self._absolute_path(ref.path, parent) + parent = self._parent(path) + + if path in external_schemas: + continue + + external_schemas[path] = self._fetch_remote_reference(path) + + if recursive: + self._resolve_schema_references(parent, external_schemas[path], external_schemas, errors, recursive) + + except Exception: + errors.append(f"Failed to gather external reference data of {ref.value} from {path}") + logging.exception(f"Failed to gather external reference data of {ref.value} from {path}") + + def _parent(self, abs_path: str) -> str: + if abs_path.startswith("http", 0): + return urllib.parse.urljoin(f"{abs_path}/", 
"..") + else: + path = Path(abs_path) + return str(path.parent) + + def _absolute_path(self, relative_path: str, parent: str) -> str: + if relative_path.startswith("http", 0): + return relative_path + + if relative_path.startswith("//"): + if parent.startswith("http"): + scheme = urllib.parse.urlparse(parent).scheme + return f"{scheme}:{relative_path}" + else: + scheme = self._root_url_scheme or "http" + return f"{scheme}:{relative_path}" + + if parent.startswith("http"): + return urllib.parse.urljoin(parent, relative_path) + else: + parent_dir = Path(parent) + abs_path = parent_dir.joinpath(relative_path) + abs_path = abs_path.resolve() + return str(abs_path) + + def _fetch_remote_reference(self, abs_path: str) -> SchemaData: + res: SchemaData + + if abs_path.startswith("http"): + res = self._fetch_url_reference(abs_path) + else: + res = self._fetch_remote_file_path(Path(abs_path)) + + return res + + def _fetch_remote_file_path(self, path: Path) -> SchemaData: + logging.info(f"Fetching remote ref file path > {path}") + return DataLoader.load(str(path), path.read_bytes()) + + def _fetch_url_reference(self, url: str) -> SchemaData: + if url.startswith("//", 0): + url = "{0}:{1}".format((self._root_url_scheme or "http"), url) + + logging.info(f"Fetching remote ref url > {url}") + return DataLoader.load(url, httpx.get(url).content) + + def _lookup_schema_references(self, attr: Any) -> Generator[Reference, None, None]: + if isinstance(attr, dict): + for key, val in attr.items(): + if key == "$ref": + yield Reference(val) + else: + yield from self._lookup_schema_references(val) + + elif isinstance(attr, list): + for val in attr: + yield from self._lookup_schema_references(val) diff --git a/openapi_python_client/utils.py b/openapi_python_client/utils.py index 7a7c84185..c2f5ed047 100644 --- a/openapi_python_client/utils.py +++ b/openapi_python_client/utils.py @@ -67,7 +67,7 @@ def to_valid_python_identifier(value: str) -> str: See: 
https://docs.python.org/3/reference/lexical_analysis.html#identifiers """ - new_value = fix_reserved_words(fix_keywords(sanitize(value))) + new_value = fix_reserved_words(fix_keywords(sanitize(value))).lstrip("_") if new_value.isidentifier(): return new_value diff --git a/tests/test___init__.py b/tests/test___init__.py index 0579e83f0..bd8f12d08 100644 --- a/tests/test___init__.py +++ b/tests/test___init__.py @@ -1,4 +1,5 @@ import pathlib +from urllib.parse import ParseResult import httpcore import jinja2 @@ -175,7 +176,7 @@ def test__get_document_url_and_path(self, mocker): loads.assert_not_called() def test__get_document_bad_url(self, mocker): - get = mocker.patch("httpx.get", side_effect=httpcore.NetworkError) + get = mocker.patch("httpx.get") Path = mocker.patch("openapi_python_client.Path") loads = mocker.patch("yaml.safe_load") @@ -185,7 +186,7 @@ def test__get_document_bad_url(self, mocker): result = _get_document(url=url, path=None) assert result == GeneratorError(header="Could not get OpenAPI document from provided URL") - get.assert_called_once_with(url) + get.assert_not_called() Path.assert_not_called() loads.assert_not_called() @@ -196,7 +197,7 @@ def test__get_document_url_no_path(self, mocker): from openapi_python_client import _get_document - url = mocker.MagicMock() + url = "http://localhost/" _get_document(url=url, path=None) get.assert_called_once_with(url) @@ -206,6 +207,7 @@ def test__get_document_url_no_path(self, mocker): def test__get_document_path_no_url(self, mocker): get = mocker.patch("httpx.get") loads = mocker.patch("yaml.safe_load") + mocker.patch("openapi_python_client.resolver.schema_resolver.SchemaResolver._isapath", return_value=True) from openapi_python_client import _get_document @@ -213,12 +215,13 @@ def test__get_document_path_no_url(self, mocker): _get_document(url=None, path=path) get.assert_not_called() - path.read_bytes.assert_called_once() - loads.assert_called_once_with(path.read_bytes()) + 
path.absolute().read_bytes.assert_called_once() + loads.assert_called_once_with(path.absolute().read_bytes()) def test__get_document_bad_yaml(self, mocker): get = mocker.patch("httpx.get") loads = mocker.patch("yaml.safe_load", side_effect=yaml.YAMLError) + mocker.patch("openapi_python_client.resolver.schema_resolver.SchemaResolver._isapath", return_value=True) from openapi_python_client import _get_document @@ -226,8 +229,8 @@ def test__get_document_bad_yaml(self, mocker): result = _get_document(url=None, path=path) get.assert_not_called() - path.read_bytes.assert_called_once() - loads.assert_called_once_with(path.read_bytes()) + path.absolute().read_bytes.assert_called_once() + loads.assert_called_once_with(path.absolute().read_bytes()) assert result == GeneratorError(header="Invalid YAML from provided source") diff --git a/tests/test_parser/test_properties/test_init.py b/tests/test_parser/test_properties/test_init.py index 1f5646d74..4a593be8d 100644 --- a/tests/test_parser/test_properties/test_init.py +++ b/tests/test_parser/test_properties/test_init.py @@ -1,10 +1,6 @@ -from unittest.mock import MagicMock, call - -import attr import pytest import openapi_python_client.schema as oai -from openapi_python_client import Config from openapi_python_client.parser.errors import PropertyError, ValidationError from openapi_python_client.parser.properties import BooleanProperty, FloatProperty, IntProperty @@ -402,35 +398,110 @@ def test_get_imports(self, mocker): class TestEnumProperty: - @pytest.mark.parametrize( - "required, nullable, expected", - ( - (False, False, "Union[Unset, {}]"), - (True, False, "{}"), - (False, True, "Union[Unset, None, {}]"), - (True, True, "Optional[{}]"), - ), - ) - def test_get_type_string(self, mocker, enum_property_factory, required, nullable, expected): - fake_class = mocker.MagicMock() - fake_class.name = "MyTestEnum" + def test_get_type_string(self, mocker): + fake_reference = mocker.MagicMock(class_name="MyTestEnum") + + from 
openapi_python_client.parser import properties + + p = properties.EnumProperty( + name="test", + required=True, + default=None, + values={}, + nullable=False, + reference=fake_reference, + value_type=str, + ) + + base_type_string = f"MyTestEnum" + + assert p.get_type_string() == base_type_string + assert p.get_type_string(json=True) == "str" - p = enum_property_factory(class_info=fake_class, required=required, nullable=nullable) + p = properties.EnumProperty( + name="test", + required=True, + default=None, + values={}, + nullable=True, + reference=fake_reference, + value_type=str, + ) + assert p.get_type_string() == f"Optional[{base_type_string}]" + assert p.get_type_string(no_optional=True) == base_type_string - assert p.get_type_string() == expected.format(fake_class.name) - assert p.get_type_string(no_optional=True) == fake_class.name - assert p.get_type_string(json=True) == expected.format("str") + p = properties.EnumProperty( + name="test", + required=False, + default=None, + values={}, + nullable=True, + reference=fake_reference, + value_type=str, + ) + assert p.get_type_string() == f"Union[Unset, None, {base_type_string}]" + assert p.get_type_string(no_optional=True) == base_type_string - def test_get_imports(self, mocker, enum_property_factory): - fake_class = mocker.MagicMock(module_name="my_test_enum") - fake_class.name = "MyTestEnum" + p = properties.EnumProperty( + name="test", + required=False, + default=None, + values={}, + nullable=False, + reference=fake_reference, + value_type=str, + ) + assert p.get_type_string() == f"Union[Unset, {base_type_string}]" + assert p.get_type_string(no_optional=True) == base_type_string + + def test_get_imports(self, mocker): + fake_reference = mocker.MagicMock(class_name="MyTestEnum", module_name="my_test_enum") prefix = "..." 
- enum_property = enum_property_factory(class_info=fake_class, required=False) + from openapi_python_client.parser import properties + + enum_property = properties.EnumProperty( + name="test", + required=True, + default=None, + values={}, + nullable=False, + reference=fake_reference, + value_type=str, + ) + + assert enum_property.get_imports(prefix=prefix) == { + f"from {prefix}models.{fake_reference.module_name} import {fake_reference.class_name}", + } + enum_property = properties.EnumProperty( + name="test", + required=False, + default=None, + values={}, + nullable=False, + reference=fake_reference, + value_type=str, + ) assert enum_property.get_imports(prefix=prefix) == { - f"from {prefix}models.{fake_class.module_name} import {fake_class.name}", - "from typing import Union", # Makes sure unset is handled via base class + f"from {prefix}models.{fake_reference.module_name} import {fake_reference.class_name}", + "from typing import Union", + "from ...types import UNSET, Unset", + } + + enum_property = properties.EnumProperty( + name="test", + required=False, + default=None, + values={}, + nullable=True, + reference=fake_reference, + value_type=str, + ) + assert enum_property.get_imports(prefix=prefix) == { + f"from {prefix}models.{fake_reference.module_name} import {fake_reference.class_name}", + "from typing import Union", + "from typing import Optional", "from ...types import UNSET, Unset", } @@ -463,7 +534,7 @@ def test_values_from_list_duplicate(self): class TestPropertyFromData: def test_property_from_data_str_enum(self, mocker): - from openapi_python_client.parser.properties import Class, EnumProperty + from openapi_python_client.parser.properties import EnumProperty, Reference from openapi_python_client.schema import Schema data = Schema(title="AnEnum", enum=["A", "B", "C"], nullable=False, default="B") @@ -472,10 +543,10 @@ def test_property_from_data_str_enum(self, mocker): from openapi_python_client.parser.properties import Schemas, property_from_data - 
schemas = Schemas(classes_by_name={"AnEnum": mocker.MagicMock()}) + schemas = Schemas(enums={"AnEnum": mocker.MagicMock()}) prop, new_schemas = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=Config() + name=name, required=required, data=data, schemas=schemas, parent_name="parent" ) assert prop == EnumProperty( @@ -483,18 +554,18 @@ def test_property_from_data_str_enum(self, mocker): required=True, nullable=False, values={"A": "A", "B": "B", "C": "C"}, - class_info=Class(name="ParentAnEnum", module_name="parent_an_enum"), + reference=Reference(class_name="ParentAnEnum", module_name="parent_an_enum"), value_type=str, default="ParentAnEnum.B", ) assert schemas != new_schemas, "Provided Schemas was mutated" - assert new_schemas.classes_by_name == { - "AnEnum": schemas.classes_by_name["AnEnum"], + assert new_schemas.enums == { + "AnEnum": schemas.enums["AnEnum"], "ParentAnEnum": prop, } def test_property_from_data_int_enum(self, mocker): - from openapi_python_client.parser.properties import Class, EnumProperty + from openapi_python_client.parser.properties import EnumProperty, Reference from openapi_python_client.schema import Schema data = Schema.construct(title="anEnum", enum=[1, 2, 3], nullable=False, default=3) @@ -503,10 +574,10 @@ def test_property_from_data_int_enum(self, mocker): from openapi_python_client.parser.properties import Schemas, property_from_data - schemas = Schemas(classes_by_name={"AnEnum": mocker.MagicMock()}) + schemas = Schemas(enums={"AnEnum": mocker.MagicMock()}) prop, new_schemas = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=Config() + name=name, required=required, data=data, schemas=schemas, parent_name="parent" ) assert prop == EnumProperty( @@ -514,21 +585,21 @@ def test_property_from_data_int_enum(self, mocker): required=True, nullable=False, values={"VALUE_1": 1, "VALUE_2": 2, "VALUE_3": 3}, - 
class_info=Class(name="ParentAnEnum", module_name="parent_an_enum"), + reference=Reference(class_name="ParentAnEnum", module_name="parent_an_enum"), value_type=int, default="ParentAnEnum.VALUE_3", ) assert schemas != new_schemas, "Provided Schemas was mutated" - assert new_schemas.classes_by_name == { - "AnEnum": schemas.classes_by_name["AnEnum"], + assert new_schemas.enums == { + "AnEnum": schemas.enums["AnEnum"], "ParentAnEnum": prop, } def test_property_from_data_ref_enum(self): - from openapi_python_client.parser.properties import Class, EnumProperty, Schemas, property_from_data + from openapi_python_client.parser.properties import EnumProperty, Reference, Schemas, property_from_data name = "some_enum" - data = oai.Reference.construct(ref="#/components/schemas/MyEnum") + data = oai.Reference.construct(ref="MyEnum") existing_enum = EnumProperty( name="an_enum", required=True, @@ -536,13 +607,11 @@ def test_property_from_data_ref_enum(self): default=None, values={"A": "a"}, value_type=str, - class_info=Class(name="MyEnum", module_name="my_enum"), + reference=Reference(class_name="MyEnum", module_name="my_enum"), ) - schemas = Schemas(classes_by_reference={"/components/schemas/MyEnum": existing_enum}) + schemas = Schemas(enums={"MyEnum": existing_enum}) - prop, new_schemas = property_from_data( - name=name, required=False, data=data, schemas=schemas, parent_name="", config=Config() - ) + prop, new_schemas = property_from_data(name=name, required=False, data=data, schemas=schemas, parent_name="") assert prop == EnumProperty( name="some_enum", @@ -551,95 +620,39 @@ def test_property_from_data_ref_enum(self): default=None, values={"A": "a"}, value_type=str, - class_info=Class(name="MyEnum", module_name="my_enum"), - ) - assert schemas == new_schemas - - def test_property_from_data_ref_enum_with_overridden_default(self): - from openapi_python_client.parser.properties import Class, EnumProperty, Schemas, property_from_data - - name = "some_enum" - data = 
oai.Schema.construct(default="b", allOf=[oai.Reference.construct(ref="#/components/schemas/MyEnum")]) - existing_enum = EnumProperty( - name="an_enum", - required=True, - nullable=False, - default="MyEnum.A", - values={"A": "a", "B": "b"}, - value_type=str, - class_info=Class(name="MyEnum", module_name="my_enum"), - ) - schemas = Schemas(classes_by_reference={"/components/schemas/MyEnum": existing_enum}) - - prop, new_schemas = property_from_data( - name=name, required=False, data=data, schemas=schemas, parent_name="", config=Config() - ) - - assert prop == EnumProperty( - name="some_enum", - required=False, - nullable=False, - default="MyEnum.B", - values={"A": "a", "B": "b"}, - value_type=str, - class_info=Class(name="MyEnum", module_name="my_enum"), - ) - assert schemas == new_schemas - - def test_property_from_data_ref_enum_with_invalid_default(self): - from openapi_python_client.parser.properties import Class, EnumProperty, Schemas, property_from_data - - name = "some_enum" - data = oai.Schema.construct(default="x", allOf=[oai.Reference.construct(ref="#/components/schemas/MyEnum")]) - existing_enum = EnumProperty( - name="an_enum", - required=True, - nullable=False, - default="MyEnum.A", - values={"A": "a", "B": "b"}, - value_type=str, - class_info=Class(name="MyEnum", module_name="my_enum"), + reference=Reference(class_name="MyEnum", module_name="my_enum"), ) - schemas = Schemas(classes_by_reference={"/components/schemas/MyEnum": existing_enum}) - - prop, new_schemas = property_from_data( - name=name, required=False, data=data, schemas=schemas, parent_name="", config=Config() - ) - assert schemas == new_schemas - assert prop == PropertyError(data=data, detail="x is an invalid default for enum MyEnum") def test_property_from_data_ref_model(self): - from openapi_python_client.parser.properties import Class, ModelProperty, Schemas, property_from_data + from openapi_python_client.parser.properties import ModelProperty, Reference, Schemas, property_from_data name 
= "new_name" required = False class_name = "MyModel" - data = oai.Reference.construct(ref=f"#/components/schemas/{class_name}") + data = oai.Reference.construct(ref=class_name) existing_model = ModelProperty( name="old_name", required=True, nullable=False, default=None, - class_info=Class(name=class_name, module_name="my_model"), + reference=Reference(class_name=class_name, module_name="my_model"), required_properties=[], optional_properties=[], description="", relative_imports=set(), additional_properties=False, ) - schemas = Schemas(classes_by_reference={f"/components/schemas/{class_name}": existing_model}) + schemas = Schemas(models={class_name: existing_model}) - prop, new_schemas = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="", config=Config() - ) + prop, new_schemas = property_from_data(name=name, required=required, data=data, schemas=schemas, parent_name="") assert prop == ModelProperty( name=name, required=required, nullable=False, default=None, - class_info=Class(name=class_name, module_name="my_model"), + reference=Reference(class_name=class_name, module_name="my_model"), required_properties=[], optional_properties=[], description="", @@ -654,37 +667,18 @@ def test_property_from_data_ref_not_found(self, mocker): name = mocker.MagicMock() required = mocker.MagicMock() data = oai.Reference.construct(ref=mocker.MagicMock()) - parse_reference_path = mocker.patch(f"{MODULE_NAME}.parse_reference_path") + from_ref = mocker.patch(f"{MODULE_NAME}.Reference.from_ref") mocker.patch("openapi_python_client.utils.remove_string_escapes", return_value=name) schemas = Schemas() prop, new_schemas = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=mocker.MagicMock() + name=name, required=required, data=data, schemas=schemas, parent_name="parent" ) - parse_reference_path.assert_called_once_with(data.ref) + from_ref.assert_called_once_with(data.ref) assert prop == 
PropertyError(data=data, detail="Could not find reference in parsed models or enums") assert schemas == new_schemas - def test_property_from_data_invalid_ref(self, mocker): - from openapi_python_client.parser.properties import PropertyError, Schemas, property_from_data - - name = mocker.MagicMock() - required = mocker.MagicMock() - data = oai.Reference.construct(ref=mocker.MagicMock()) - parse_reference_path = mocker.patch( - f"{MODULE_NAME}.parse_reference_path", return_value=PropertyError(detail="bad stuff") - ) - schemas = Schemas() - - prop, new_schemas = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=mocker.MagicMock() - ) - - parse_reference_path.assert_called_once_with(data.ref) - assert prop == PropertyError(data=data, detail="bad stuff") - assert schemas == new_schemas - def test_property_from_data_string(self, mocker): from openapi_python_client.parser.properties import Schemas, property_from_data @@ -696,7 +690,7 @@ def test_property_from_data_string(self, mocker): schemas = Schemas() p, new_schemas = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=mocker.MagicMock() + name=name, required=required, data=data, schemas=schemas, parent_name="parent" ) assert p == _string_based_property.return_value @@ -720,7 +714,7 @@ def test_property_from_data_simple_types(self, openapi_type, prop_type, python_t schemas = Schemas() p, new_schemas = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=MagicMock() + name=name, required=required, data=data, schemas=schemas, parent_name="parent" ) assert p == prop_type(name=name, required=required, default=python_type(data.default), nullable=False) @@ -730,16 +724,12 @@ def test_property_from_data_simple_types(self, openapi_type, prop_type, python_t data.default = 0 data.nullable = True - p, _ = property_from_data( - name=name, required=required, 
data=data, schemas=schemas, parent_name="parent", config=MagicMock() - ) + p, _ = property_from_data(name=name, required=required, data=data, schemas=schemas, parent_name="parent") assert p == prop_type(name=name, required=required, default=python_type(data.default), nullable=True) # Test bad default value data.default = "a" - p, _ = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=MagicMock() - ) + p, _ = property_from_data(name=name, required=required, data=data, schemas=schemas, parent_name="parent") assert python_type is bool or isinstance(p, PropertyError) def test_property_from_data_array(self, mocker): @@ -754,15 +744,12 @@ def test_property_from_data_array(self, mocker): build_list_property = mocker.patch(f"{MODULE_NAME}.build_list_property") mocker.patch("openapi_python_client.utils.remove_string_escapes", return_value=name) schemas = Schemas() - config = MagicMock() - response = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=config - ) + response = property_from_data(name=name, required=required, data=data, schemas=schemas, parent_name="parent") assert response == build_list_property.return_value build_list_property.assert_called_once_with( - data=data, name=name, required=required, schemas=schemas, parent_name="parent", config=config + data=data, name=name, required=required, schemas=schemas, parent_name="parent" ) def test_property_from_data_object(self, mocker): @@ -776,15 +763,12 @@ def test_property_from_data_object(self, mocker): build_model_property = mocker.patch(f"{MODULE_NAME}.build_model_property") mocker.patch("openapi_python_client.utils.remove_string_escapes", return_value=name) schemas = Schemas() - config = MagicMock() - response = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=config - ) + response = property_from_data(name=name, required=required, 
data=data, schemas=schemas, parent_name="parent") assert response == build_model_property.return_value build_model_property.assert_called_once_with( - data=data, name=name, required=required, schemas=schemas, parent_name="parent", config=config + data=data, name=name, required=required, schemas=schemas, parent_name="parent" ) def test_property_from_data_union(self, mocker): @@ -801,38 +785,13 @@ def test_property_from_data_union(self, mocker): build_union_property = mocker.patch(f"{MODULE_NAME}.build_union_property") mocker.patch("openapi_python_client.utils.remove_string_escapes", return_value=name) schemas = Schemas() - config = MagicMock() - response = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=config - ) + response = property_from_data(name=name, required=required, data=data, schemas=schemas, parent_name="parent") assert response == build_union_property.return_value build_union_property.assert_called_once_with( - data=data, name=name, required=required, schemas=schemas, parent_name="parent", config=config - ) - - def test_property_from_data_union_of_one_element(self, mocker, model_property_factory): - from openapi_python_client.parser.properties import Class, ModelProperty, Schemas, property_from_data - - name = "new_name" - required = False - class_name = "MyModel" - existing_model = model_property_factory() - schemas = Schemas(classes_by_reference={f"/{class_name}": existing_model}) - - data = oai.Schema.construct( - allOf=[oai.Reference.construct(ref=f"#/{class_name}")], - nullable=True, + data=data, name=name, required=required, schemas=schemas, parent_name="parent" ) - build_union_property = mocker.patch(f"{MODULE_NAME}.build_union_property") - - prop, schemas = property_from_data( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=Config() - ) - - assert prop == attr.evolve(existing_model, name=name, required=required) - 
build_union_property.assert_not_called() def test_property_from_data_unsupported_type(self, mocker): name = mocker.MagicMock() @@ -842,9 +801,7 @@ def test_property_from_data_unsupported_type(self, mocker): from openapi_python_client.parser.errors import PropertyError from openapi_python_client.parser.properties import Schemas, property_from_data - assert property_from_data( - name=name, required=required, data=data, schemas=Schemas(), parent_name="parent", config=MagicMock() - ) == ( + assert property_from_data(name=name, required=required, data=data, schemas=Schemas(), parent_name="parent") == ( PropertyError(data=data, detail=f"unknown type {data.type}"), Schemas(), ) @@ -856,7 +813,7 @@ def test_property_from_data_no_valid_props_in_data(self): data = oai.Schema() prop, new_schemas = property_from_data( - name="blah", required=True, data=data, schemas=schemas, parent_name="parent", config=MagicMock() + name="blah", required=True, data=data, schemas=schemas, parent_name="parent" ) assert prop == NoneProperty(name="blah", required=True, nullable=False, default=None) @@ -871,7 +828,7 @@ def test_property_from_data_validation_error(self, mocker): data = oai.Schema() err, new_schemas = property_from_data( - name="blah", required=True, data=data, schemas=schemas, parent_name="parent", config=MagicMock() + name="blah", required=True, data=data, schemas=schemas, parent_name="parent" ) assert err == PropertyError(detail="Failed to validate default value", data=data) assert new_schemas == schemas @@ -888,7 +845,7 @@ def test_build_list_property_no_items(self, mocker): schemas = properties.Schemas() p, new_schemas = properties.build_list_property( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=MagicMock() + name=name, required=required, data=data, schemas=schemas, parent_name="parent" ) assert p == PropertyError(data=data, detail="type array must have items defined") @@ -909,17 +866,16 @@ def 
test_build_list_property_invalid_items(self, mocker): property_from_data = mocker.patch.object( properties, "property_from_data", return_value=(properties.PropertyError(data="blah"), second_schemas) ) - config = MagicMock() p, new_schemas = properties.build_list_property( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=config + name=name, required=required, data=data, schemas=schemas, parent_name="parent" ) assert p == PropertyError(data="blah", detail=f"invalid data in items of array {name}") assert new_schemas == second_schemas assert schemas != new_schemas, "Schema was mutated" property_from_data.assert_called_once_with( - name=f"{name}_item", required=True, data=data.items, schemas=schemas, parent_name="parent", config=config + name=f"{name}_item", required=True, data=data.items, schemas=schemas, parent_name="parent" ) def test_build_list_property(self, mocker): @@ -938,10 +894,9 @@ def test_build_list_property(self, mocker): ) mocker.patch("openapi_python_client.utils.snake_case", return_value=name) mocker.patch("openapi_python_client.utils.to_valid_python_identifier", return_value=name) - config = MagicMock() p, new_schemas = properties.build_list_property( - name=name, required=required, data=data, schemas=schemas, parent_name="parent", config=config + name=name, required=required, data=data, schemas=schemas, parent_name="parent" ) assert isinstance(p, properties.ListProperty) @@ -949,7 +904,7 @@ def test_build_list_property(self, mocker): assert new_schemas == second_schemas assert schemas != new_schemas, "Schema was mutated" property_from_data.assert_called_once_with( - name=f"{name}_item", required=True, data=data.items, schemas=schemas, parent_name="parent", config=config + name=f"{name}_item", required=True, data=data.items, schemas=schemas, parent_name="parent" ) @@ -970,9 +925,7 @@ def test_property_from_data_union(self, mocker): from openapi_python_client.parser.properties import Schemas, property_from_data - 
p, s = property_from_data( - name=name, required=required, data=data, schemas=Schemas(), parent_name="parent", config=MagicMock() - ) + p, s = property_from_data(name=name, required=required, data=data, schemas=Schemas(), parent_name="parent") FloatProperty.assert_called_once_with(name=name, required=required, default=0.0, nullable=False) IntProperty.assert_called_once_with(name=name, required=required, default=0, nullable=False) @@ -994,9 +947,7 @@ def test_property_from_data_union_bad_type(self, mocker): from openapi_python_client.parser.properties import Schemas, property_from_data - p, s = property_from_data( - name=name, required=required, data=data, schemas=Schemas(), parent_name="parent", config=MagicMock() - ) + p, s = property_from_data(name=name, required=required, data=data, schemas=Schemas(), parent_name="parent") assert p == PropertyError(detail=f"Invalid property in union {name}", data=oai.Schema(type="garbage")) @@ -1085,86 +1036,63 @@ def test__string_based_property_unsupported_format(self, mocker): assert p == StringProperty(name=name, required=required, nullable=True, default=None) -class TestBuildSchemas: - def test_skips_references_and_keeps_going(self, mocker): - from openapi_python_client.parser.properties import Schemas, build_schemas - from openapi_python_client.schema import Reference, Schema - - components = {"a_ref": Reference.construct(), "a_schema": Schema.construct()} - update_schemas_with_data = mocker.patch(f"{MODULE_NAME}.update_schemas_with_data") - parse_reference_path = mocker.patch(f"{MODULE_NAME}.parse_reference_path") - config = Config() - - result = build_schemas(components=components, schemas=Schemas(), config=config) - # Should not even try to parse a path for the Reference - parse_reference_path.assert_called_once_with("#/components/schemas/a_schema") - update_schemas_with_data.assert_called_once_with( - ref_path=parse_reference_path.return_value, - config=config, - data=components["a_schema"], - schemas=Schemas( - 
errors=[PropertyError(detail="Reference schemas are not supported.", data=components["a_ref"])] - ), - ) - assert result == update_schemas_with_data.return_value +def test_build_schemas(mocker): + build_model_property = mocker.patch(f"{MODULE_NAME}.build_model_property") + in_data = {"1": mocker.MagicMock(enum=None), "2": mocker.MagicMock(enum=None), "3": mocker.MagicMock(enum=None)} + model_1 = mocker.MagicMock() + schemas_1 = mocker.MagicMock() + model_2 = mocker.MagicMock() + schemas_2 = mocker.MagicMock(errors=[]) + error = PropertyError() + schemas_3 = mocker.MagicMock() - def test_records_bad_uris_and_keeps_going(self, mocker): - from openapi_python_client.parser.properties import Schemas, build_schemas - from openapi_python_client.schema import Schema + # This loops through one for each, then again to retry the error + build_model_property.side_effect = [ + (model_1, schemas_1), + (model_2, schemas_2), + (error, schemas_3), + (error, schemas_3), + ] - components = {"first": Schema.construct(), "second": Schema.construct()} - update_schemas_with_data = mocker.patch(f"{MODULE_NAME}.update_schemas_with_data") - parse_reference_path = mocker.patch( - f"{MODULE_NAME}.parse_reference_path", side_effect=[PropertyError(detail="some details"), "a_path"] - ) - config = Config() - - result = build_schemas(components=components, schemas=Schemas(), config=config) - parse_reference_path.assert_has_calls( - [ - call("#/components/schemas/first"), - call("#/components/schemas/second"), - ] - ) - update_schemas_with_data.assert_called_once_with( - ref_path="a_path", - config=config, - data=components["second"], - schemas=Schemas(errors=[PropertyError(detail="some details", data=components["first"])]), - ) - assert result == update_schemas_with_data.return_value + from openapi_python_client.parser.properties import Schemas, build_schemas - def test_retries_failing_properties_while_making_progress(self, mocker): - from openapi_python_client.parser.properties import Schemas, 
build_schemas - from openapi_python_client.schema import Schema + result = build_schemas(components=in_data) - components = {"first": Schema.construct(), "second": Schema.construct()} - update_schemas_with_data = mocker.patch( - f"{MODULE_NAME}.update_schemas_with_data", side_effect=[PropertyError(), Schemas(), PropertyError()] - ) - parse_reference_path = mocker.patch(f"{MODULE_NAME}.parse_reference_path") - config = Config() - - result = build_schemas(components=components, schemas=Schemas(), config=config) - parse_reference_path.assert_has_calls( - [ - call("#/components/schemas/first"), - call("#/components/schemas/second"), - call("#/components/schemas/first"), - ] - ) - assert update_schemas_with_data.call_count == 3 - assert result.errors == [PropertyError()] + build_model_property.assert_has_calls( + [ + mocker.call(data=in_data["1"], name="1", schemas=Schemas(), required=True, parent_name=None), + mocker.call(data=in_data["2"], name="2", schemas=schemas_1, required=True, parent_name=None), + mocker.call(data=in_data["3"], name="3", schemas=schemas_2, required=True, parent_name=None), + mocker.call(data=in_data["3"], name="3", schemas=schemas_2, required=True, parent_name=None), + ] + ) + # schemas_3 was the last to come back from build_model_property, but it should be ignored because it's an error + assert result == schemas_2 + assert result.errors == [error] + + +def test_build_enums(mocker): + from openapi_python_client.parser.openapi import build_schemas + + build_model_property = mocker.patch(f"{MODULE_NAME}.build_model_property") + schemas = mocker.MagicMock() + build_enum_property = mocker.patch(f"{MODULE_NAME}.build_enum_property", return_value=(mocker.MagicMock(), schemas)) + in_data = {"1": mocker.MagicMock(enum=["val1", "val2", "val3"])} + + build_schemas(components=in_data) + + build_enum_property.assert_called() + build_model_property.assert_not_called() def test_build_enum_property_conflict(mocker): from openapi_python_client.parser.properties 
import Schemas, build_enum_property data = oai.Schema() - schemas = Schemas(classes_by_name={"Existing": mocker.MagicMock()}) + schemas = Schemas(enums={"Existing": mocker.MagicMock()}) err, schemas = build_enum_property( - data=data, name="Existing", required=True, schemas=schemas, enum=[], parent_name=None, config=Config() + data=data, name="Existing", required=True, schemas=schemas, enum=[], parent_name=None ) assert schemas == schemas @@ -1178,7 +1106,7 @@ def test_build_enum_property_no_values(): schemas = Schemas() err, schemas = build_enum_property( - data=data, name="Existing", required=True, schemas=schemas, enum=[], parent_name=None, config=Config() + data=data, name="Existing", required=True, schemas=schemas, enum=[], parent_name=None ) assert schemas == schemas @@ -1192,7 +1120,7 @@ def test_build_enum_property_bad_default(): schemas = Schemas() err, schemas = build_enum_property( - data=data, name="Existing", required=True, schemas=schemas, enum=["A"], parent_name=None, config=Config() + data=data, name="Existing", required=True, schemas=schemas, enum=["A"], parent_name=None ) assert schemas == schemas diff --git a/tests/test_resolver/test_resolver_collision_resolver.py b/tests/test_resolver/test_resolver_collision_resolver.py new file mode 100644 index 000000000..0d9191a1c --- /dev/null +++ b/tests/test_resolver/test_resolver_collision_resolver.py @@ -0,0 +1,163 @@ +import pathlib +import urllib +import urllib.parse + +import pytest + + +def test__collision_resolver_get_schema_from_ref(): + + from openapi_python_client.resolver.collision_resolver import CollisionResolver + + root_schema = {"foo": {"$ref": "first_instance.yaml#/foo"}} + + external_schemas = {"/home/user/first_instance.yaml": {"food": {"description": "food_first_description"}}} + + errors = [] + + CollisionResolver(root_schema, external_schemas, errors, "/home/user").resolve() + + assert len(errors) == 1 + assert errors == ["Did not find data corresponding to the reference 
first_instance.yaml#/foo"] + + +def test__collision_resolver_duplicate_schema(): + + from openapi_python_client.resolver.collision_resolver import CollisionResolver + + root_schema = { + "foo": {"$ref": "first_instance.yaml#/foo"}, + "bar": {"$ref": "second_instance.yaml#/bar/foo"}, + } + + external_schemas = { + "/home/user/first_instance.yaml": {"foo": {"description": "foo_first_description"}}, + "/home/user/second_instance.yaml": {"bar": {"foo": {"description": "foo_first_description"}}}, + } + + errors = [] + + CollisionResolver(root_schema, external_schemas, errors, "/home/user").resolve() + + assert len(errors) == 1 + assert errors == ["Found a duplicate schema in first_instance.yaml#/foo and second_instance.yaml#/bar/foo"] + + +def test__collision_resolver(): + + from openapi_python_client.resolver.collision_resolver import CollisionResolver + + root_schema = { + "foobar": {"$ref": "first_instance.yaml#/foo"}, + "barfoo": {"$ref": "second_instance.yaml#/foo"}, + "barbarfoo": {"$ref": "third_instance.yaml#/foo"}, + "foobarfoo": {"$ref": "second_instance.yaml#/foo"}, + "barfoobar": {"$ref": "first_instance.yaml#/bar/foo"}, + "localref": {"$ref": "#/local_ref"}, + "local_ref": {"description": "a local ref"}, + "array": ["array_item_one", "array_item_two"], + "last": {"$ref": "first_instance.yaml#/fourth_instance"}, + "baz": {"$ref": "fifth_instance.yaml#/foo"}, + } + + external_schemas = { + "/home/user/first_instance.yaml": { + "foo": {"description": "foo_first_description"}, + "bar": {"foo": {"description": "nested foo"}}, + "fourth_instance": {"$ref": "fourth_instance.yaml#/foo"}, + }, + "/home/user/second_instance.yaml": { + "foo": {"description": "foo_second_description"}, + "another_local_ref": {"$ref": "#/foo"}, + }, + "/home/user/third_instance.yaml": {"foo": {"description": "foo_third_description"}}, + "/home/user/fourth_instance.yaml": {"foo": {"description": "foo_fourth_description"}}, + "/home/user/fifth_instance.yaml": {"foo": {"description": 
"foo_second_description"}}, + } + + root_schema_result = { + "foobar": {"$ref": "first_instance.yaml#/foo"}, + "barfoo": {"$ref": "second_instance.yaml#/foo_2"}, + "barbarfoo": {"$ref": "third_instance.yaml#/foo_3"}, + "foobarfoo": {"$ref": "second_instance.yaml#/foo_2"}, + "barfoobar": {"$ref": "first_instance.yaml#/bar/foo"}, + "localref": {"$ref": "#/local_ref"}, + "local_ref": {"description": "a local ref"}, + "array": ["array_item_one", "array_item_two"], + "last": {"$ref": "first_instance.yaml#/fourth_instance"}, + "baz": {"$ref": "fifth_instance.yaml#/foo_2"}, + } + + external_schemas_result = { + "/home/user/first_instance.yaml": { + "foo": {"description": "foo_first_description"}, + "bar": {"foo": {"description": "nested foo"}}, + "fourth_instance": {"$ref": "fourth_instance.yaml#/foo_4"}, + }, + "/home/user/second_instance.yaml": { + "foo_2": {"description": "foo_second_description"}, + "another_local_ref": {"$ref": "#/foo_2"}, + }, + "/home/user/third_instance.yaml": {"foo_3": {"description": "foo_third_description"}}, + "/home/user/fourth_instance.yaml": {"foo_4": {"description": "foo_fourth_description"}}, + "/home/user/fifth_instance.yaml": {"foo_2": {"description": "foo_second_description"}}, + } + + errors = [] + + CollisionResolver(root_schema, external_schemas, errors, "/home/user").resolve() + + assert len(errors) == 0 + assert root_schema == root_schema_result + assert external_schemas == external_schemas_result + + +def test__collision_resolver_deep_root_keys(): + + from openapi_python_client.resolver.collision_resolver import CollisionResolver + + root_schema = { + "foobar": {"$ref": "first_instance.yaml#/bar/foo"}, + "barfoo": {"$ref": "second_instance.yaml#/bar/foo"}, + "barfoobar": {"$ref": "second_instance.yaml#/barfoobar"}, + } + + external_schemas = { + "/home/user/first_instance.yaml": { + "bar": {"foo": {"description": "foo_first_description"}}, + }, + "/home/user/second_instance.yaml": { + "bar": {"foo": {"description": 
"foo_second_description"}}, + "barfoobar": { + "type": "object", + "allOf": [{"description": "first_description"}, {"description": "second_description"}], + }, + }, + } + + root_schema_result = { + "foobar": {"$ref": "first_instance.yaml#/bar/foo"}, + "barfoo": {"$ref": "second_instance.yaml#/bar/foo_2"}, + "barfoobar": {"$ref": "second_instance.yaml#/barfoobar"}, + } + + external_schemas_result = { + "/home/user/first_instance.yaml": { + "bar": {"foo": {"description": "foo_first_description"}}, + }, + "/home/user/second_instance.yaml": { + "bar": {"foo_2": {"description": "foo_second_description"}}, + "barfoobar": { + "type": "object", + "allOf": [{"description": "first_description"}, {"description": "second_description"}], + }, + }, + } + + errors = [] + + CollisionResolver(root_schema, external_schemas, errors, "/home/user").resolve() + + assert len(errors) == 0 + assert root_schema == root_schema_result + assert external_schemas == external_schemas_result diff --git a/tests/test_resolver/test_resolver_data_loader.py b/tests/test_resolver/test_resolver_data_loader.py new file mode 100644 index 000000000..271067ccd --- /dev/null +++ b/tests/test_resolver/test_resolver_data_loader.py @@ -0,0 +1,52 @@ +import pytest + + +def test_load(mocker): + from openapi_python_client.resolver.data_loader import DataLoader + + dl_load_json = mocker.patch("openapi_python_client.resolver.data_loader.DataLoader.load_json") + dl_load_yaml = mocker.patch("openapi_python_client.resolver.data_loader.DataLoader.load_yaml") + + content = mocker.MagicMock() + DataLoader.load("foobar.json", content) + dl_load_json.assert_called_once_with(content) + + content = mocker.MagicMock() + DataLoader.load("foobar.jSoN", content) + dl_load_json.assert_called_with(content) + + content = mocker.MagicMock() + DataLoader.load("foobar.yaml", content) + dl_load_yaml.assert_called_once_with(content) + + content = mocker.MagicMock() + DataLoader.load("foobar.yAmL", content) + 
dl_load_yaml.assert_called_with(content) + + content = mocker.MagicMock() + DataLoader.load("foobar.ymL", content) + dl_load_yaml.assert_called_with(content) + + content = mocker.MagicMock() + DataLoader.load("foobar", content) + dl_load_yaml.assert_called_with(content) + + +def test_load_yaml(mocker): + from openapi_python_client.resolver.data_loader import DataLoader + + yaml_safeload = mocker.patch("yaml.safe_load") + + content = mocker.MagicMock() + DataLoader.load_yaml(content) + yaml_safeload.assert_called_once_with(content) + + +def test_load_json(mocker): + from openapi_python_client.resolver.data_loader import DataLoader + + json_loads = mocker.patch("json.loads") + + content = mocker.MagicMock() + DataLoader.load_json(content) + json_loads.assert_called_once_with(content) diff --git a/tests/test_resolver/test_resolver_pointer.py b/tests/test_resolver/test_resolver_pointer.py new file mode 100644 index 000000000..92e1ded35 --- /dev/null +++ b/tests/test_resolver/test_resolver_pointer.py @@ -0,0 +1,97 @@ +import pytest + + +def get_data_set(): + # https://tools.ietf.org/html/rfc6901 + return { + "valid_pointers": [ + "/myElement", + "/definitions/myElement", + "", + "/foo", + "/foo/0", + "/", + "/a~1b", + "/c%d", + "/e^f", + "/g|h", + "/i\\j" '/k"l', + "/ ", + "/m~0n", + "/m~01", + ], + "invalid_pointers": ["../foo", "foobar", None], + "tokens_by_pointer": { + "/myElement": ["", "myElement"], + "/definitions/myElement": ["", "definitions", "myElement"], + "": [""], + "/foo": ["", "foo"], + "/foo/0": ["", "foo", "0"], + "/": ["", ""], + "/a~1b": ["", "a/b"], + "/c%d": ["", "c%d"], + "/e^f": ["", "e^f"], + "/g|h": ["", "g|h"], + "/i\\j": ["", "i\\j"], + '/k"l': ["", 'k"l'], + "/ ": ["", " "], + "/m~0n": ["", "m~n"], + "/m~01": ["", "m~1"], + }, + } + + +def test___init__(): + from openapi_python_client.resolver.pointer import Pointer + + data_set = get_data_set() + + for pointer_str in data_set["valid_pointers"]: + p = Pointer(pointer_str) + assert p.value != 
None + assert p.value == pointer_str + + for pointer_str in data_set["invalid_pointers"]: + with pytest.raises(ValueError): + p = Pointer(pointer_str) + + +def test_token(): + from openapi_python_client.resolver.pointer import Pointer + + data_set = get_data_set() + + for pointer_str in data_set["tokens_by_pointer"].keys(): + p = Pointer(pointer_str) + expected_tokens = data_set["tokens_by_pointer"][pointer_str] + + for idx, token in enumerate(p.tokens()): + assert expected_tokens[idx] == token + + +def test_parent(): + from openapi_python_client.resolver.pointer import Pointer + + data_set = get_data_set() + + for pointer_str in data_set["tokens_by_pointer"].keys(): + p = Pointer(pointer_str) + expected_tokens = data_set["tokens_by_pointer"][pointer_str] + + while p.parent is not None: + p = p.parent + expected_tokens.pop() + assert p.tokens()[-1] == expected_tokens[-1] + assert len(p.tokens()) == len(expected_tokens) + + assert len(expected_tokens) == 1 + assert expected_tokens[-1] == "" + + +def test__unescape_and__escape(): + from openapi_python_client.resolver.pointer import Pointer + + escaped_unescaped_values = [("/m~0n", "/m~n"), ("/m~01", "/m~1"), ("/a~1b", "/a/b"), ("/foobar", "/foobar")] + + for escaped, unescaped in escaped_unescaped_values: + assert Pointer(escaped).unescapated_value == unescaped diff --git a/tests/test_resolver/test_resolver_reference.py b/tests/test_resolver/test_resolver_reference.py new file mode 100644 index 000000000..bc13266b2 --- /dev/null +++ b/tests/test_resolver/test_resolver_reference.py @@ -0,0 +1,223 @@ +import pytest + + +def get_data_set(): + # https://swagger.io/docs/specification/using-ref/ + return { + "local_references": ["#/definitions/myElement"], + "remote_references": [ + "document.json#/myElement", + "../document.json#/myElement", + "../another-folder/document.json#/myElement", + ], + "url_references": [ + "http://path/to/your/resource", + "http://path/to/your/resource.json#myElement", + 
"//anotherserver.com/files/example.json", + ], + "relative_references": [ + "#/definitions/myElement", + "document.json#/myElement", + "../document.json#/myElement", + "../another-folder/document.json#/myElement", + ], + "absolute_references": [ + "http://path/to/your/resource", + "http://path/to/your/resource.json#myElement", + "//anotherserver.com/files/example.json", + ], + "full_document_references": [ + "http://path/to/your/resource", + "//anotherserver.com/files/example.json", + ], + "not_full_document_references": [ + "#/definitions/myElement", + "document.json#/myElement", + "../document.json#/myElement", + "../another-folder/document.json#/myElement", + "http://path/to/your/resource.json#myElement", + ], + "path_by_reference": { + "#/definitions/myElement": "", + "document.json#/myElement": "document.json", + "../document.json#/myElement": "../document.json", + "../another-folder/document.json#/myElement": "../another-folder/document.json", + "http://path/to/your/resource": "http://path/to/your/resource", + "http://path/to/your/resource.json#myElement": "http://path/to/your/resource.json", + "//anotherserver.com/files/example.json": "//anotherserver.com/files/example.json", + }, + "pointer_by_reference": { + "#/definitions/myElement": "/definitions/myElement", + "document.json#/myElement": "/myElement", + "../document.json#/myElement": "/myElement", + "../another-folder/document.json#/myElement": "/myElement", + "http://path/to/your/resource": "", + "http://path/to/your/resource.json#myElement": "/myElement", + "//anotherserver.com/files/example.json": "", + }, + "pointerparent_by_reference": { + "#/definitions/myElement": "/definitions", + "document.json#/myElement": "", + "../document.json#/myElement": "", + "../another-folder/document.json#/myElement": "", + "http://path/to/your/resource": None, + "http://path/to/your/resource.json#myElement": "", + "//anotherserver.com/files/example.json": None, + }, + } + + +def test_is_local(): + from 
openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["local_references"]: + ref = Reference(ref_str) + assert ref.is_local() == True + + for ref_str in data_set["remote_references"]: + ref = Reference(ref_str) + assert ref.is_local() == False + + for ref_str in data_set["url_references"]: + ref = Reference(ref_str) + assert ref.is_local() == False + + +def test_is_remote(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["local_references"]: + ref = Reference(ref_str) + assert ref.is_remote() == False + + for ref_str in data_set["remote_references"]: + ref = Reference(ref_str) + assert ref.is_remote() == True + + for ref_str in data_set["url_references"]: + ref = Reference(ref_str) + assert ref.is_remote() == True + + +def test_is_url(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["local_references"]: + ref = Reference(ref_str) + assert ref.is_url() == False + + for ref_str in data_set["remote_references"]: + ref = Reference(ref_str) + assert ref.is_url() == False + + for ref_str in data_set["url_references"]: + ref = Reference(ref_str) + assert ref.is_url() == True + + +def test_is_absolute(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["absolute_references"]: + ref = Reference(ref_str) + assert ref.is_absolute() == True + + for ref_str in data_set["relative_references"]: + ref = Reference(ref_str) + assert ref.is_absolute() == False + + +def test_is_relative(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["absolute_references"]: + ref = Reference(ref_str) + assert ref.is_relative() == False + + for ref_str in data_set["relative_references"]: + ref = Reference(ref_str) + assert ref.is_relative() 
== True + + +def test_pointer(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["pointer_by_reference"].keys(): + ref = Reference(ref_str) + pointer = data_set["pointer_by_reference"][ref_str] + assert ref.pointer.value == pointer + + +def test_pointer_parent(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["pointerparent_by_reference"].keys(): + ref = Reference(ref_str) + pointer_parent = data_set["pointerparent_by_reference"][ref_str] + + if pointer_parent is not None: + assert ref.pointer.parent.value == pointer_parent + else: + assert ref.pointer.parent == None + + +def test_path(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["path_by_reference"].keys(): + ref = Reference(ref_str) + path = data_set["path_by_reference"][ref_str] + assert ref.path == path + + +def test_abs_path(): + + from openapi_python_client.resolver.reference import Reference + + ref = Reference("foo.yaml#/foo") + ref_with_parent = Reference("foo.yaml#/foo", "/home/user") + + assert ref.abs_path == "foo.yaml" + assert ref_with_parent.abs_path == "/home/user/foo.yaml" + + +def test_is_full_document(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["full_document_references"]: + ref = Reference(ref_str) + assert ref.is_full_document() == True + assert ref.pointer.parent == None + + for ref_str in data_set["not_full_document_references"]: + ref = Reference(ref_str) + assert ref.is_full_document() == False + assert ref.pointer.parent != None + + +def test_value(): + from openapi_python_client.resolver.reference import Reference + + ref = Reference("fooBaR") + assert ref.value == "fooBaR" + + ref = Reference("FooBAR") + assert ref.value == "FooBAR" diff --git 
a/tests/test_resolver/test_resolver_resolved_schema.py b/tests/test_resolver/test_resolver_resolved_schema.py new file mode 100644 index 000000000..3b3e6b9d8 --- /dev/null +++ b/tests/test_resolver/test_resolver_resolved_schema.py @@ -0,0 +1,170 @@ +import pathlib +import urllib +import urllib.parse + +import pytest + + +def test__resolved_schema_with_resolved_external_references(): + + from openapi_python_client.resolver.resolved_schema import ResolvedSchema + + root_schema = {"foobar": {"$ref": "foobar.yaml#/foo"}} + + external_schemas = { + "/home/user/foobar.yaml": {"foo": {"$ref": "/home/user/foobar_2.yaml#/foo"}}, + "/home/user/foobar_2.yaml": {"foo": {"description": "foobar_description"}}, + } + errors = [] + + resolved_schema = ResolvedSchema(root_schema, external_schemas, errors, "/home/user").schema + + assert len(errors) == 0 + assert "foo" in resolved_schema + assert "foobar" in resolved_schema + assert "$ref" in resolved_schema["foobar"] + assert "#/foo" in resolved_schema["foobar"]["$ref"] + assert "description" in resolved_schema["foo"] + assert "foobar_description" in resolved_schema["foo"]["description"] + + +def test__resolved_schema_with_depth_refs(): + + from openapi_python_client.resolver.resolved_schema import ResolvedSchema + + root_schema = {"foo": {"$ref": "foo.yaml#/foo"}, "bar": {"$ref": "bar.yaml#/bar"}} + + external_schemas = { + "/home/user/foo.yaml": {"foo": {"$ref": "bar.yaml#/bar"}}, + "/home/user/bar.yaml": {"bar": {"description": "bar"}}, + } + + errors = [] + + expected_result = {"foo": {"$ref": "#/bar"}, "bar": {"description": "bar"}} + + resolved_schema = ResolvedSchema(root_schema, external_schemas, errors, "/home/user").schema + + assert len(errors) == 0 + assert resolved_schema == expected_result + + +def test__resolved_schema_with_duplicate_ref(): + + from openapi_python_client.resolver.resolved_schema import ResolvedSchema + + root_schema = { + "foo": {"$ref": "foobar.yaml#/foo"}, + "bar": {"$ref": "foobar.yaml#/foo"}, + 
"list": [{"foobar": {"$ref": "foobar.yaml#/bar"}}, {"barfoo": {"$ref": "foobar.yaml#/bar2/foo"}}], + } + + external_schemas = { + "/home/user/foobar.yaml": { + "foo": {"description": "foo_description"}, + "bar": {"$ref": "#/foo"}, + "bar2": {"foo": {"description": "foo_second_description"}}, + }, + } + + errors = [] + + resolved_schema = ResolvedSchema(root_schema, external_schemas, errors, "/home/user").schema + + assert len(errors) == 0 + + +def test__resolved_schema_with_malformed_schema(): + + from openapi_python_client.resolver.resolved_schema import ResolvedSchema + + root_schema = { + "paths": { + "/foo/bar": {"$ref": "inexistant.yaml#/paths/~1foo~1bar"}, + "/bar": {"$ref": "foobar.yaml#/paths/~1bar"}, + }, + "foo": {"$ref": "inexistant.yaml#/foo"}, + } + + external_schemas = { + "/home/user/foobar.yaml": { + "paths": { + "/foo/bar": {"description": "foobar_description"}, + }, + }, + } + + errors = [] + + resolved_schema = ResolvedSchema(root_schema, external_schemas, errors, "/home/user").schema + + assert len(errors) == 4 + assert errors == [ + "Failed to resolve remote reference > /home/user/inexistant.yaml", + "Failed to read remote value /paths//bar, in remote ref /home/user/foobar.yaml", + "Failed to resolve remote reference > /home/user/inexistant.yaml", + "Failed to resolve remote reference > /home/user/inexistant.yaml", + ] + + +def test__resolved_schema_with_remote_paths(): + + from openapi_python_client.resolver.resolved_schema import ResolvedSchema + + root_schema = { + "paths": { + "/foo/bar": {"$ref": "foobar.yaml#/paths/~1foo~1bar"}, + "/foo/bar2": {"$ref": "#/bar2"}, + }, + "bar2": {"description": "bar2_description"}, + } + + external_schemas = { + "/home/user/foobar.yaml": { + "paths": { + "/foo/bar": {"description": "foobar_description"}, + }, + }, + } + + expected_result = { + "paths": {"/foo/bar": {"description": "foobar_description"}, "/foo/bar2": {"$ref": "#/bar2"}}, + "bar2": {"description": "bar2_description"}, + } + + errors = [] + + 
resolved_schema = ResolvedSchema(root_schema, external_schemas, errors, "/home/user").schema + + assert len(errors) == 0 + assert resolved_schema == expected_result + + +def test__resolved_schema_with_absolute_paths(): + + from openapi_python_client.resolver.resolved_schema import ResolvedSchema + + root_schema = {"foobar": {"$ref": "foobar.yaml#/foo"}, "barfoo": {"$ref": "../barfoo.yaml#/bar"}} + + external_schemas = { + "/home/user/foobar.yaml": {"foo": {"description": "foobar_description"}}, + "/home/barfoo.yaml": {"bar": {"description": "barfoo_description"}}, + } + + errors = [] + + resolved_schema = ResolvedSchema(root_schema, external_schemas, errors, "/home/user").schema + + assert len(errors) == 0 + assert "foo" in resolved_schema + assert "bar" in resolved_schema + assert "foobar" in resolved_schema + assert "barfoo" in resolved_schema + assert "$ref" in resolved_schema["foobar"] + assert "#/foo" in resolved_schema["foobar"]["$ref"] + assert "$ref" in resolved_schema["barfoo"] + assert "#/bar" in resolved_schema["barfoo"]["$ref"] + assert "description" in resolved_schema["foo"] + assert "foobar_description" in resolved_schema["foo"]["description"] + assert "description" in resolved_schema["bar"] + assert "barfoo_description" in resolved_schema["bar"]["description"] diff --git a/tests/test_resolver/test_resolver_schema_resolver.py b/tests/test_resolver/test_resolver_schema_resolver.py new file mode 100644 index 000000000..36caa3d7e --- /dev/null +++ b/tests/test_resolver/test_resolver_schema_resolver.py @@ -0,0 +1,267 @@ +import pathlib +import urllib +import urllib.parse + +import pytest + + +def test___init__invalid_data(mocker): + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + with pytest.raises(ValueError): + SchemaResolver(None) + + invalid_url = "foobar" + with pytest.raises(ValueError): + SchemaResolver(invalid_url) + + invalid_url = 42 + with pytest.raises(urllib.error.URLError): + SchemaResolver(invalid_url) + + 
invalid_url = mocker.Mock() + with pytest.raises(urllib.error.URLError): + SchemaResolver(invalid_url) + + +def test__init_with_filepath(mocker): + mocker.patch("openapi_python_client.resolver.schema_resolver.SchemaResolver._isapath", return_value=True) + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", return_value={}) + path = mocker.MagicMock() + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + resolver = SchemaResolver(path) + resolver.resolve() + + path.absolute().read_bytes.assert_called_once() + + +def test__init_with_url(mocker): + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", return_value={}) + url_parse = mocker.patch( + "urllib.parse.urlparse", + return_value=urllib.parse.ParseResult( + scheme="http", netloc="foobar.io", path="foo", params="", query="", fragment="/bar" + ), + ) + get = mocker.patch("httpx.get") + + url = mocker.MagicMock() + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + resolver = SchemaResolver(url) + resolver.resolve() + + url_parse.assert_called_once_with(url) + get.assert_called_once() + + +def test__resolve_schema_references_with_path(mocker): + read_bytes = mocker.patch("pathlib.Path.read_bytes") + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + path = pathlib.Path("/foo/bar/foobar") + path_parent = str(path.parent) + schema = {"foo": {"$ref": "foobar#/foobar"}} + external_schemas = {} + errors = [] + + def _datalaod_mocked_result(path, data): + if path == "/foo/bar/foobar": + return {"foobar": "bar", "bar": {"$ref": "bar#/foobar"}, "local": {"$ref": "#/toto"}} + + if path == "/foo/bar/bar": + return {"foobar": "bar", "bar": {"$ref": "../bar#/foobar"}} + + if path == "/foo/bar": + return {"foobar": "bar/bar", "bar": {"$ref": "/barfoo.io/foobar#foobar"}} + + if path == "/barfoo.io/foobar": + return {"foobar": "barfoo.io/foobar", "bar": {"$ref": "./bar#foobar"}} + + if path == 
"/barfoo.io/bar": + return {"foobar": "barfoo.io/bar", "bar": {"$ref": "/bar.foo/foobar"}} + + if path == "/bar.foo/foobar": + return {"foobar": "bar.foo/foobar", "bar": {"$ref": "/foo.bar/foobar"}} + + if path == "/foo.bar/foobar": + return {"foobar": "foo.bar/foobar", "bar": {"$ref": "/foo/bar/foobar"}} # Loop to first path + + raise ValueError(f"Unexpected path {path}") + + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", _datalaod_mocked_result) + resolver = SchemaResolver(path) + resolver._resolve_schema_references(path_parent, schema, external_schemas, errors, True) + + assert len(errors) == 0 + assert "/foo/bar/foobar" in external_schemas + assert "/foo/bar/bar" in external_schemas + assert "/foo/bar" in external_schemas + assert "/barfoo.io/foobar" in external_schemas + assert "/barfoo.io/bar" in external_schemas + assert "/bar.foo/foobar" in external_schemas + assert "/foo.bar/foobar" in external_schemas + + +def test__resolve_schema_references_with_url(mocker): + get = mocker.patch("httpx.get") + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + url = "http://foobar.io/foo/bar/foobar" + url_parent = "http://foobar.io/foo/bar/" + schema = {"foo": {"$ref": "foobar#/foobar"}} + external_schemas = {} + errors = [] + + def _datalaod_mocked_result(url, data): + if url == "http://foobar.io/foo/bar/foobar": + return {"foobar": "bar", "bar": {"$ref": "bar#/foobar"}, "local": {"$ref": "#/toto"}} + + if url == "http://foobar.io/foo/bar/bar": + return {"foobar": "bar", "bar": {"$ref": "../bar#/foobar"}} + + if url == "http://foobar.io/foo/bar": + return {"foobar": "bar/bar", "bar": {"$ref": "//barfoo.io/foobar#foobar"}} + + if url == "http://barfoo.io/foobar": + return {"foobar": "barfoo.io/foobar", "bar": {"$ref": "./bar#foobar"}} + + if url == "http://barfoo.io/bar": + return {"foobar": "barfoo.io/bar", "bar": {"$ref": "https://bar.foo/foobar"}} + + if url == "https://bar.foo/foobar": + return {"foobar": 
"bar.foo/foobar", "bar": {"$ref": "//foo.bar/foobar"}} + + if url == "https://foo.bar/foobar": + return {"foobar": "foo.bar/foobar", "bar": {"$ref": "http://foobar.io/foo/bar/foobar"}} # Loop to first uri + + raise ValueError(f"Unexpected url {url}") + + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", _datalaod_mocked_result) + + resolver = SchemaResolver(url) + resolver._resolve_schema_references(url_parent, schema, external_schemas, errors, True) + + assert len(errors) == 0 + assert "http://foobar.io/foo/bar/bar" in external_schemas + assert "http://foobar.io/foo/bar" in external_schemas + assert "http://barfoo.io/foobar" in external_schemas + assert "http://barfoo.io/foobar" in external_schemas + assert "http://barfoo.io/bar" in external_schemas + assert "https://bar.foo/foobar" in external_schemas + assert "https://foo.bar/foobar" in external_schemas + + +def test__resolve_schema_references_mix_path_and_url(mocker): + read_bytes = mocker.patch("pathlib.Path.read_bytes") + get = mocker.patch("httpx.get") + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + path = pathlib.Path("/foo/bar/foobar") + path_parent = str(path.parent) + schema = {"foo": {"$ref": "foobar#/foobar"}} + external_schemas = {} + errors = [] + + def _datalaod_mocked_result(path, data): + if path == "/foo/bar/foobar": + return {"foobar": "bar", "bar": {"$ref": "bar#/foobar"}, "local": {"$ref": "#/toto"}} + + if path == "/foo/bar/bar": + return {"foobar": "bar", "bar": {"$ref": "../bar#/foobar"}} + + if path == "/foo/bar": + return {"foobar": "bar/bar", "bar": {"$ref": "//barfoo.io/foobar#foobar"}} + + if path == "http://barfoo.io/foobar": + return {"foobar": "barfoo.io/foobar", "bar": {"$ref": "./bar#foobar"}} + + if path == "http://barfoo.io/bar": + return {"foobar": "barfoo.io/bar", "bar": {"$ref": "https://bar.foo/foobar"}} + + if path == "https://bar.foo/foobar": + return {"foobar": "bar.foo/foobar", "bar": {"$ref": 
"//foo.bar/foobar"}} + + if path == "https://foo.bar/foobar": + return {"foobar": "foo.bar/foobar"} + + raise ValueError(f"Unexpected path {path}") + + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", _datalaod_mocked_result) + resolver = SchemaResolver(path) + resolver._resolve_schema_references(path_parent, schema, external_schemas, errors, True) + + assert len(errors) == 0 + assert "/foo/bar/foobar" in external_schemas + assert "/foo/bar/bar" in external_schemas + assert "/foo/bar" in external_schemas + assert "http://barfoo.io/foobar" in external_schemas + assert "http://barfoo.io/bar" in external_schemas + assert "https://bar.foo/foobar" in external_schemas + assert "https://foo.bar/foobar" in external_schemas + + +def test__resolve_schema_references_with_error(mocker): + get = mocker.patch("httpx.get") + + import httpcore + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + url = "http://foobar.io/foo/bar/foobar" + url_parent = "http://foobar.io/foo/bar/" + schema = {"foo": {"$ref": "foobar#/foobar"}} + external_schemas = {} + errors = [] + + def _datalaod_mocked_result(url, data): + if url == "http://foobar.io/foo/bar/foobar": + return { + "foobar": "bar", + "bar": {"$ref": "bar#/foobar"}, + "barfoor": {"$ref": "barfoo#foobar"}, + "local": {"$ref": "#/toto"}, + } + + if url == "http://foobar.io/foo/bar/bar": + raise httpcore.NetworkError("mocked error") + + if url == "http://foobar.io/foo/bar/barfoo": + return {"foobar": "foo/bar/barfoo", "bar": {"$ref": "//barfoo.io/foobar#foobar"}} + + if url == "http://barfoo.io/foobar": + return {"foobar": "foobar"} + + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", _datalaod_mocked_result) + resolver = SchemaResolver(url) + resolver._resolve_schema_references(url_parent, schema, external_schemas, errors, True) + + assert len(errors) == 1 + assert errors[0] == "Failed to gather external reference data of bar#/foobar from 
http://foobar.io/foo/bar/bar" + assert "http://foobar.io/foo/bar/bar" not in external_schemas + assert "http://foobar.io/foo/bar/foobar" in external_schemas + assert "http://foobar.io/foo/bar/barfoo" in external_schemas + assert "http://barfoo.io/foobar" in external_schemas + + +def test___lookup_schema_references(): + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + data_set = { + "foo": {"$ref": "#/ref_1"}, + "bar": {"foobar": {"$ref": "#/ref_2"}}, + "foobar": [{"foo": {"$ref": "#/ref_3"}}, {"bar": [{"foobar": {"$ref": "#/ref_4"}}]}], + } + + resolver = SchemaResolver("http://foobar.io") + expected_references = sorted([f"#/ref_{x}" for x in range(1, 5)]) + references = sorted([x.value for x in resolver._lookup_schema_references(data_set)]) + + for idx, ref in enumerate(references): + assert expected_references[idx] == ref