From 79631444439258208bf486c99d68cf6e72758f34 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Tue, 1 Apr 2025 14:19:27 +0200 Subject: [PATCH 1/3] Add streams limit to full resolve manifest command --- .../connector_builder_handler.py | 31 +++++-- airbyte_cdk/connector_builder/main.py | 6 +- .../test_connector_builder_handler.py | 93 +++---------------- 3 files changed, 40 insertions(+), 90 deletions(-) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index c80884d09..d29db762a 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -4,7 +4,7 @@ from dataclasses import asdict, dataclass, field -from typing import Any, List, Mapping +from typing import Any, List, Mapping, Dict from airbyte_cdk.connector_builder.test_reader import TestReader from airbyte_cdk.models import ( @@ -27,30 +27,34 @@ DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5 DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5 DEFAULT_MAXIMUM_RECORDS = 100 +DEFAULT_MAXIMUM_STREAMS = 100 MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice" MAX_SLICES_KEY = "max_slices" MAX_RECORDS_KEY = "max_records" +MAX_STREAMS_KEY = "max_streams" @dataclass -class TestReadLimits: +class TestLimits: max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS) max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE) max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES) + max_streams: int = field(default=DEFAULT_MAXIMUM_STREAMS) -def get_limits(config: Mapping[str, Any]) -> TestReadLimits: +def get_limits(config: Mapping[str, Any]) -> TestLimits: command_config = config.get("__test_read_config", {}) max_pages_per_slice = ( command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE ) max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES max_records = 
command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS - return TestReadLimits(max_records, max_pages_per_slice, max_slices) + max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_RECORDS + return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams) -def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource: +def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource: manifest = config["__injected_declarative_manifest"] return ManifestDeclarativeSource( config=config, @@ -71,7 +75,7 @@ def read_stream( config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, state: List[AirbyteStateMessage], - limits: TestReadLimits, + limits: TestLimits, ) -> AirbyteMessage: try: test_read_handler = TestReader( @@ -117,13 +121,24 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage: return error.as_airbyte_message() -def full_resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage: +def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage: try: + manifest = {**source.resolved_manifest} streams = manifest.get("streams", []) for stream in streams: stream["dynamic_stream_name"] = None - streams.extend(source.dynamic_streams) + + mapped_streams: Dict[str, List[Dict[str, Any]]] = {} + for stream in source.dynamic_streams: + generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], []) + + if len(generated_streams) < limits.max_streams: + generated_streams += [stream] + + for generated_streams_list in mapped_streams.values(): + streams.extend(generated_streams_list) + manifest["streams"] = streams return AirbyteMessage( type=Type.RECORD, diff --git a/airbyte_cdk/connector_builder/main.py b/airbyte_cdk/connector_builder/main.py index 525df8752..ad2d6650f 100644 --- a/airbyte_cdk/connector_builder/main.py +++ b/airbyte_cdk/connector_builder/main.py @@ -10,7 
+10,7 @@ from airbyte_cdk.connector import BaseConnector from airbyte_cdk.connector_builder.connector_builder_handler import ( - TestReadLimits, + TestLimits, create_source, full_resolve_manifest, get_limits, @@ -73,7 +73,7 @@ def handle_connector_builder_request( config: Mapping[str, Any], catalog: Optional[ConfiguredAirbyteCatalog], state: List[AirbyteStateMessage], - limits: TestReadLimits, + limits: TestLimits, ) -> AirbyteMessage: if command == "resolve_manifest": return resolve_manifest(source) @@ -83,7 +83,7 @@ def handle_connector_builder_request( ), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None." return read_stream(source, config, catalog, state, limits) elif command == "full_resolve_manifest": - return full_resolve_manifest(source) + return full_resolve_manifest(source, limits) else: raise ValueError(f"Unrecognized command {command}.") diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index 4b3d80237..5c537811b 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -20,7 +20,7 @@ DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, DEFAULT_MAXIMUM_NUMBER_OF_SLICES, DEFAULT_MAXIMUM_RECORDS, - TestReadLimits, + TestLimits, create_source, get_limits, resolve_manifest, @@ -384,6 +384,7 @@ RESOLVE_DYNAMIC_STREAM_MANIFEST_CONFIG = { "__injected_declarative_manifest": DYNAMIC_STREAM_MANIFEST, "__command": "full_resolve_manifest", + "__test_read_config": {"max_streams": 2}, } TEST_READ_CONFIG = { @@ -524,7 +525,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): command = "resolve_manifest" config["__command"] = command source = ManifestDeclarativeSource(source_config=MANIFEST) - limits = TestReadLimits() + limits = TestLimits() resolved_manifest = handle_connector_builder_request( source, command, config, create_configured_catalog("dummy_stream"), 
_A_STATE, limits ) @@ -728,7 +729,7 @@ def test_read(): emitted_at=1, ), ) - limits = TestReadLimits() + limits = TestLimits() with patch( "airbyte_cdk.connector_builder.test_reader.TestReader.run_test_read", return_value=stream_read, @@ -789,7 +790,7 @@ def test_config_update() -> None: config, ConfiguredAirbyteCatalogSerializer.load(CONFIGURED_CATALOG), _A_PER_PARTITION_STATE, - TestReadLimits(), + TestLimits(), ) assert output.record.data["latest_config_update"] @@ -825,7 +826,7 @@ def check_config_against_spec(self) -> Literal[False]: mock_from_exception.return_value = stack_trace source = MockManifestDeclarativeSource() - limits = TestReadLimits() + limits = TestLimits() response = read_stream( source, TEST_READ_CONFIG, @@ -865,7 +866,7 @@ def test_handle_429_response(): ] = {"backoff_strategies": [{"type": "ConstantBackoffStrategy", "backoff_time_in_seconds": 5}]} config = TEST_READ_CONFIG - limits = TestReadLimits() + limits = TestLimits() source = create_source(config, limits) with patch("requests.Session.send", return_value=response) as mock_send: @@ -982,7 +983,7 @@ def test_create_source(): max_records = 3 max_pages_per_slice = 2 max_slices = 1 - limits = TestReadLimits(max_records, max_pages_per_slice, max_slices) + limits = TestLimits(max_records, max_pages_per_slice, max_slices) config = {"__injected_declarative_manifest": MANIFEST} @@ -1064,7 +1065,7 @@ def test_read_source(mock_http_stream): max_records = 100 max_pages_per_slice = 2 max_slices = 3 - limits = TestReadLimits(max_records, max_pages_per_slice, max_slices) + limits = TestLimits(max_records, max_pages_per_slice, max_slices) catalog = ConfiguredAirbyteCatalog( streams=[ @@ -1111,7 +1112,7 @@ def test_read_source_single_page_single_slice(mock_http_stream): max_records = 100 max_pages_per_slice = 1 max_slices = 1 - limits = TestReadLimits(max_records, max_pages_per_slice, max_slices) + limits = TestLimits(max_records, max_pages_per_slice, max_slices) catalog = ConfiguredAirbyteCatalog( 
streams=[ @@ -1195,7 +1196,7 @@ def test_handle_read_external_requests(deployment_mode, url_base, expected_error endpoints when running on Cloud or OSS deployments """ - limits = TestReadLimits(max_records=100, max_pages_per_slice=1, max_slices=1) + limits = TestLimits(max_records=100, max_pages_per_slice=1, max_slices=1) catalog = ConfiguredAirbyteCatalog( streams=[ @@ -1281,7 +1282,7 @@ def test_handle_read_external_oauth_request(deployment_mode, token_url, expected endpoints when running on Cloud or OSS deployments """ - limits = TestReadLimits(max_records=100, max_pages_per_slice=1, max_slices=1) + limits = TestLimits(max_records=100, max_pages_per_slice=1, max_slices=1) catalog = ConfiguredAirbyteCatalog( streams=[ @@ -1339,7 +1340,7 @@ def test_read_stream_exception_with_secrets(): ] ) state = [] - limits = TestReadLimits() + limits = TestLimits() # Add the secret to be filtered update_secrets([config["api_key"]]) @@ -1367,7 +1368,7 @@ def test_full_resolve_manifest(valid_resolve_manifest_config_file): config = copy.deepcopy(RESOLVE_DYNAMIC_STREAM_MANIFEST_CONFIG) command = config["__command"] source = ManifestDeclarativeSource(source_config=DYNAMIC_STREAM_MANIFEST) - limits = TestReadLimits() + limits = TestLimits(max_streams=2) with HttpMocker() as http_mocker: http_mocker.get( HttpRequest(url="https://api.test.com/parents"), @@ -1625,72 +1626,6 @@ def test_full_resolve_manifest(valid_resolve_manifest_config_file): }, "dynamic_stream_name": "TestDynamicStream", }, - { - "type": "DeclarativeStream", - "name": "parent_2_item_1", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": {"ABC": {"type": "number"}, "AED": {"type": "number"}}, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.test.com", - "path": "2/1", - "http_method": "GET", - "authenticator": { - "type": 
"ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - "paginator": {"type": "NoPagination"}, - }, - "dynamic_stream_name": "TestDynamicStream", - }, - { - "type": "DeclarativeStream", - "name": "parent_2_item_2", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": {"ABC": {"type": "number"}, "AED": {"type": "number"}}, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.test.com", - "path": "2/2", - "http_method": "GET", - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - "paginator": {"type": "NoPagination"}, - }, - "dynamic_stream_name": "TestDynamicStream", - }, ], "check": {"type": "CheckStream", "stream_names": ["lists"]}, "spec": { From 90ca21f326dd40e7e78bba3a5e74213122132b16 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Tue, 1 Apr 2025 12:26:57 +0000 Subject: [PATCH 2/3] Auto-fix lint and format issues --- airbyte_cdk/connector_builder/connector_builder_handler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index d29db762a..0f0c71a2b 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -4,7 +4,7 @@ from dataclasses import asdict, dataclass, field -from typing import Any, List, Mapping, Dict +from typing import Any, Dict, List, Mapping from 
airbyte_cdk.connector_builder.test_reader import TestReader from airbyte_cdk.models import ( @@ -123,7 +123,6 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage: def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage: try: - manifest = {**source.resolved_manifest} streams = manifest.get("streams", []) for stream in streams: From 6a96feb47923d0c50c99d523448e03713c9136d3 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Tue, 1 Apr 2025 14:29:54 +0200 Subject: [PATCH 3/3] Fix typo --- airbyte_cdk/connector_builder/connector_builder_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 0f0c71a2b..27929dfa2 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -50,7 +50,7 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits: ) max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS - max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_RECORDS + max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams)