From 3210e7aeb33df8c3cfbb64c5af8e778315b53b5d Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 21 Mar 2025 22:11:16 +0100 Subject: [PATCH 1/2] Add type and parameters resolving for dynamic streams --- .../manifest_declarative_source.py | 8 +++++--- .../parsers/manifest_component_transformer.py | 10 ++++++---- .../test_http_components_resolver.py | 20 +++++++++++++++---- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 23d41b174..b822d9faa 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -87,9 +87,9 @@ def __init__( self.components_module: ModuleType | None = get_registered_components_module(config=config) resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest) - propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters( - "", resolved_source_config, {} - ) + propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters("", + resolved_source_config, + {}) self._source_config = propagated_source_config self._debug = debug self._emit_connector_builder_messages = emit_connector_builder_messages @@ -387,6 +387,8 @@ def _dynamic_stream_configs( for dynamic_stream in components_resolver.resolve_components( stream_template_config=stream_template_config ): + dynamic_stream = {**ManifestComponentTransformer().propagate_types_and_parameters("", dynamic_stream, {}, use_parent_parameters=True)} + if "type" not in dynamic_stream: dynamic_stream["type"] = "DeclarativeStream" diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index f2719bb14..6ff92712d 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -4,7 +4,7 @@ import copy import typing -from typing import Any, Mapping +from typing import Any, Mapping, Optional PARAMETERS_STR = "$parameters" @@ -94,6 +94,7 @@ def propagate_types_and_parameters( parent_field_identifier: str, declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any], + use_parent_parameters: Optional[bool] = None, ) -> Mapping[str, Any]: """ Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the @@ -103,6 +104,7 @@ def propagate_types_and_parameters( :param declarative_component: The current component that is having type and parameters added :param parent_field_identifier: The name of the field of the current component coming from the parent component :param parent_parameters: The parameters set on parent components defined before the current component + :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same :return: A deep copy of the transformed component with types and parameters persisted to it """ propagated_component = dict(copy.deepcopy(declarative_component)) @@ -130,7 +132,7 @@ def propagate_types_and_parameters( # level take precedence current_parameters = dict(copy.deepcopy(parent_parameters)) component_parameters = propagated_component.pop(PARAMETERS_STR, {}) - current_parameters = {**current_parameters, **component_parameters} + current_parameters = {**component_parameters, **current_parameters} if use_parent_parameters else {**current_parameters, **component_parameters} # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if # both exist @@ -145,7 +147,7 @@ def propagate_types_and_parameters( excluded_parameter = current_parameters.pop(field_name, None) parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}" propagated_component[field_name] = self.propagate_types_and_parameters( - parent_type_field_identifier, field_value, current_parameters + parent_type_field_identifier, field_value, current_parameters, use_parent_parameters=use_parent_parameters ) if excluded_parameter: current_parameters[field_name] = excluded_parameter @@ -158,7 +160,7 @@ def propagate_types_and_parameters( f"{propagated_component.get('type')}.{field_name}" ) field_value[i] = self.propagate_types_and_parameters( - parent_type_field_identifier, element, current_parameters + parent_type_field_identifier, element, current_parameters, use_parent_parameters=use_parent_parameters ) if excluded_parameter: current_parameters[field_name] = excluded_parameter diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 09d069bff..0e25d8a74 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -59,7 +59,9 @@ def to_configured_catalog( "type": "DynamicDeclarativeStream", "stream_template": { "type": "DeclarativeStream", - "name": "", + "$parameters": { + "name": "", + }, "primary_key": [], "schema_loader": { "type": "InlineSchemaLoader", @@ -145,7 +147,9 @@ def to_configured_catalog( "type": "DynamicDeclarativeStream", "stream_template": { "type": "DeclarativeStream", - "name": "", + "$parameters": { + "name": "", + }, "primary_key": [], "schema_loader": { "type": "InlineSchemaLoader", @@ -231,7 +235,9 @@ def to_configured_catalog( "type": "DynamicDeclarativeStream", "stream_template": { "type": "DeclarativeStream", - "name": "", + "$parameters": { + "name": "", + }, "primary_key": [], "schema_loader": { "type": "InlineSchemaLoader", @@ -331,7 +337,7 @@ def to_configured_catalog( "components_mapping": [ { "type": "ComponentMappingDefinition", - "field_path": ["name"], + "field_path": ["$parameters", "name"], "value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}", }, { @@ -563,6 +569,10 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str catalog=None, state=None, ) + dynamic_streams = source._dynamic_stream_configs(source.resolved_manifest, _CONFIG) + + assert len(dynamic_streams) == 4 + assert dynamic_streams[0]["retriever"]["name"] == "parent_1_item_1" actual_catalog = source.discover(logger=source.logger, config=_CONFIG) @@ -586,3 +596,5 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str actual_record_stream_names.sort() assert actual_record_stream_names == expected_stream_names + + From 70e66650226c1bd72cd20b61586b2a718df1f2a1 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 21 Mar 2025 21:13:10 +0000 Subject: [PATCH 2/2] Auto-fix lint and format issues --- .../declarative/manifest_declarative_source.py | 12 ++++++++---- .../parsers/manifest_component_transformer.py | 16 +++++++++++++--- .../resolvers/test_http_components_resolver.py | 2 -- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index b822d9faa..ae0fdcf6e 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -87,9 +87,9 @@ def __init__( self.components_module: ModuleType | None = get_registered_components_module(config=config) resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest) - propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters("", - resolved_source_config, - {}) + propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters( + "", resolved_source_config, {} + ) self._source_config = propagated_source_config self._debug = debug self._emit_connector_builder_messages = emit_connector_builder_messages @@ -387,7 +387,11 @@ def _dynamic_stream_configs( for dynamic_stream in components_resolver.resolve_components( stream_template_config=stream_template_config ): - dynamic_stream = {**ManifestComponentTransformer().propagate_types_and_parameters("", dynamic_stream, {}, use_parent_parameters=True)} + dynamic_stream = { + **ManifestComponentTransformer().propagate_types_and_parameters( + "", dynamic_stream, {}, use_parent_parameters=True + ) + } if "type" not in dynamic_stream: dynamic_stream["type"] = "DeclarativeStream" diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 6ff92712d..6779b54ab 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -132,7 +132,11 @@ def propagate_types_and_parameters( # level take precedence current_parameters = dict(copy.deepcopy(parent_parameters)) component_parameters = propagated_component.pop(PARAMETERS_STR, {}) - current_parameters = {**component_parameters, **current_parameters} if use_parent_parameters else {**current_parameters, **component_parameters} + current_parameters = ( + {**component_parameters, **current_parameters} + if use_parent_parameters + else {**current_parameters, **component_parameters} + ) # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if # both exist @@ -147,7 +151,10 @@ def propagate_types_and_parameters( excluded_parameter = current_parameters.pop(field_name, None) parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}" propagated_component[field_name] = self.propagate_types_and_parameters( - parent_type_field_identifier, field_value, current_parameters, use_parent_parameters=use_parent_parameters + parent_type_field_identifier, + field_value, + current_parameters, + use_parent_parameters=use_parent_parameters, ) if excluded_parameter: current_parameters[field_name] = excluded_parameter @@ -160,7 +167,10 @@ def propagate_types_and_parameters( f"{propagated_component.get('type')}.{field_name}" ) field_value[i] = self.propagate_types_and_parameters( - parent_type_field_identifier, element, current_parameters, use_parent_parameters=use_parent_parameters + parent_type_field_identifier, + element, + current_parameters, + use_parent_parameters=use_parent_parameters, ) if excluded_parameter: current_parameters[field_name] = excluded_parameter diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 0e25d8a74..e94f007e2 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -596,5 +596,3 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str actual_record_stream_names.sort() assert actual_record_stream_names == expected_stream_names - -