From f99970bb82477a0bedf74f4c44ed5d621c44b4e8 Mon Sep 17 00:00:00 2001
From: pnilan
Date: Thu, 6 Mar 2025 09:27:09 -0800
Subject: [PATCH 1/8] add job tracker field

---
 .../sources/declarative/declarative_component_schema.yaml | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
index 6cd9998c7..6c25a2db5 100644
--- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
+++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -3376,6 +3376,12 @@ definitions:
           - "$ref": "#/definitions/IterableDecoder"
           - "$ref": "#/definitions/XmlDecoder"
           - "$ref": "#/definitions/ZipfileDecoder"
+      maximum_job_count:
+        title: Maximum Job Count
+        description: Maximum number of asynchronous jobs to run concurrently.
+        anyOf:
+          - type: number
+        default: 1
       $parameters:
         type: object
         additionalProperties: true

From 463ad81a2b11ee2def9832bfbe36e665eb02a828 Mon Sep 17 00:00:00 2001
From: pnilan
Date: Thu, 6 Mar 2025 11:01:16 -0800
Subject: [PATCH 2/8] adds max job count for async retriever

---
 .../declarative/models/declarative_component_schema.py | 5 +++++
 .../declarative/parsers/model_to_component_factory.py  | 3 +--
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
index a49b66c03..20e055c1b 100644
--- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
+++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -2368,6 +2368,11 @@ class AsyncRetriever(BaseModel):
         description="Component decoding the download response so records can be extracted.",
         title="Download Decoder",
     )
+    maximum_job_count: Optional[float] = Field(
+        1,
+        description="Maximum number of asynchronous jobs to run concurrently.",
+        title="Maximum Job Count",
+    )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 39058f834..703acccba 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -2888,8 +2888,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
             job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
                 job_repository,
                 stream_slices,
-                JobTracker(1),
-                # FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
+                JobTracker(model.maximum_job_count),
                 self._message_repository,
                 has_bulk_parent=False,
                 # FIXME work would need to be done here in order to detect if a stream has a parent stream that is bulk
From ed6bac51e20336584a9e138019d687a5cd4dd843 Mon Sep 17 00:00:00 2001
From: pnilan
Date: Thu, 6 Mar 2025 12:30:46 -0800
Subject: [PATCH 3/8] add max_concurrent_jobs to async retriever

---
 .../sources/declarative/async_job/job_tracker.py   | 12 ++++++++++--
 .../declarative/declarative_component_schema.yaml  | 12 ++++++++----
 .../models/declarative_component_schema.py         |  7 ++++---
 .../parsers/model_to_component_factory.py          |  2 +-
 4 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/async_job/job_tracker.py b/airbyte_cdk/sources/declarative/async_job/job_tracker.py
index b47fc4cad..d9d587562 100644
--- a/airbyte_cdk/sources/declarative/async_job/job_tracker.py
+++ b/airbyte_cdk/sources/declarative/async_job/job_tracker.py
@@ -3,9 +3,11 @@
 import logging
 import threading
 import uuid
-from typing import Set
+from typing import Any, Mapping, Set, Union
+import json
 
 from airbyte_cdk.logger import lazy_log
+from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
 
 LOGGER = logging.getLogger("airbyte")
 
@@ -15,7 +17,13 @@ class ConcurrentJobLimitReached(Exception):
 
 
 class JobTracker:
-    def __init__(self, limit: int):
+    def __init__(self, limit: Union[int, InterpolatedString], config: Mapping[str, Any]):
+        if isinstance(limit, InterpolatedString):
+            limit = int(limit.eval(config=config, json_loads=json.loads))
+
+        if limit < 1:
+            raise ValueError(f"Invalid max concurrent jobs limit: {limit}. Minimum value is 1.")
+
         self._jobs: Set[str] = set()
         self._limit = limit
         self._lock = threading.Lock()

diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
index 6c25a2db5..e264b0483 100644
--- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
+++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -3376,12 +3376,16 @@ definitions:
           - "$ref": "#/definitions/IterableDecoder"
           - "$ref": "#/definitions/XmlDecoder"
           - "$ref": "#/definitions/ZipfileDecoder"
-      maximum_job_count:
-        title: Maximum Job Count
-        description: Maximum number of asynchronous jobs to run concurrently.
+      max_concurrent_jobs:
+        title: Maximum Concurrent Job Count
+        description: Maximum number of concurrent jobs to run.
         anyOf:
-          - type: number
+          - type: integer
+          - type: string
         default: 1
+        examples:
+          - 2
+          - "{{ config['max_concurrent_jobs'] }}"
       $parameters:
         type: object
         additionalProperties: true

diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
index 20e055c1b..11114900a 100644
--- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
+++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -2368,10 +2368,11 @@ class AsyncRetriever(BaseModel):
         description="Component decoding the download response so records can be extracted.",
         title="Download Decoder",
     )
-    maximum_job_count: Optional[float] = Field(
+    max_concurrent_jobs: Optional[Union[int, str]] = Field(
         1,
-        description="Maximum number of asynchronous jobs to run concurrently.",
-        title="Maximum Job Count",
+        description="Maximum number of concurrent jobs to run.",
+        examples=[2, "{{ config['max_concurrent_jobs'] }}"],
+        title="Maximum Concurrent Job Count",
     )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 703acccba..7d19ac6e1 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -2888,7 +2888,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
             job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
                 job_repository,
                 stream_slices,
-                JobTracker(model.maximum_job_count),
+                JobTracker(model.max_concurrent_jobs, config),
                 self._message_repository,
                 has_bulk_parent=False,
                 # FIXME work would need to be done here in order to detect if a stream has a parent stream that is bulk
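
With PATCH 3 applied, a declarative manifest can set the new field either as a literal integer or as an interpolated string resolved against the connector config. A minimal sketch of an AsyncRetriever fragment in the dict form the unit tests use (the requester/extractor keys are elided, and the "max_concurrent_jobs" config key is only an example, not something this patch defines):

    # Sketch of an AsyncRetriever manifest fragment (dict form, as in the tests).
    async_retriever_fragment = {
        "type": "AsyncRetriever",
        # ... "creation_requester", "polling_requester", "download_requester" ...
        # Either a literal integer:
        "max_concurrent_jobs": 2,
        # or an interpolated string resolved against the connector config:
        # "max_concurrent_jobs": "{{ config['max_concurrent_jobs'] }}",
    }
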
From 0a533919ede12f491da84142b71a220d2cea0f6e Mon Sep 17 00:00:00 2001
From: pnilan
Date: Thu, 6 Mar 2025 13:52:00 -0800
Subject: [PATCH 4/8] update unit tests for additional job tracker argument

---
 .../declarative/async_job/test_integration.py      |  2 +-
 .../declarative/async_job/test_job_orchestrator.py | 14 +++++++-------
 .../declarative/async_job/test_job_tracker.py      |  2 +-
 .../declarative/interpolation/test_macros.py       |  4 ++--
 .../test_async_job_partition_router.py             |  4 ++--
 .../test_concurrent_declarative_source.py          |  1 +
 6 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/unit_tests/sources/declarative/async_job/test_integration.py b/unit_tests/sources/declarative/async_job/test_integration.py
index a0b6195b7..a72f4d29e 100644
--- a/unit_tests/sources/declarative/async_job/test_integration.py
+++ b/unit_tests/sources/declarative/async_job/test_integration.py
@@ -87,7 +87,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
                     job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
                         MockAsyncJobRepository(),
                         stream_slices,
-                        JobTracker(_NO_LIMIT),
+                        JobTracker(_NO_LIMIT, config={}),
                         self._message_repository,
                     ),
                     config={},

diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py
index dc81eacbc..63faff4c8 100644
--- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py
+++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py
@@ -137,7 +137,7 @@ def test_given_one_job_still_running_when_create_and_get_completed_partitions_th
     def test_given_timeout_when_create_and_get_completed_partitions_then_free_budget_and_raise_exception(
         self, mock_sleep: MagicMock
     ) -> None:
-        job_tracker = JobTracker(1)
+        job_tracker = JobTracker(1, config={})
         self._job_repository.start.return_value = self._job_for_a_slice
         self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
             {self._job_for_a_slice: [AsyncJobStatus.TIMED_OUT]}
@@ -184,7 +184,7 @@ def test_when_fetch_records_then_yield_records_from_each_job(self) -> None:
     def _orchestrator(
         self, slices: List[StreamSlice], job_tracker: Optional[JobTracker] = None
     ) -> AsyncJobOrchestrator:
-        job_tracker = job_tracker if job_tracker else JobTracker(_NO_JOB_LIMIT)
+        job_tracker = job_tracker if job_tracker else JobTracker(_NO_JOB_LIMIT, config={})
         return AsyncJobOrchestrator(
             self._job_repository, slices, job_tracker, self._message_repository
         )
@@ -192,7 +192,7 @@ def _orchestrator(
     def test_given_more_jobs_than_limit_when_create_and_get_completed_partitions_then_still_return_all_slices_and_free_job_budget(
         self,
     ) -> None:
-        job_tracker = JobTracker(1)
+        job_tracker = JobTracker(1, config={})
         self._job_repository.start.side_effect = [
             self._job_for_a_slice,
             self._job_for_another_slice,
@@ -220,7 +220,7 @@ def test_given_exception_to_break_when_start_job_and_raise_this_exception_and_ab
         orchestrator = AsyncJobOrchestrator(
             self._job_repository,
             [_A_STREAM_SLICE, _ANOTHER_STREAM_SLICE],
-            JobTracker(_NO_JOB_LIMIT),
+            JobTracker(_NO_JOB_LIMIT, config={}),
             self._message_repository,
             exceptions_to_break_on=[ValueError],
         )
@@ -241,7 +241,7 @@ def test_given_traced_config_error_when_start_job_and_raise_this_exception_and_a
         """
         Since this is a config error, we assume the other jobs will fail for the same reasons.
         """
-        job_tracker = JobTracker(1)
+        job_tracker = JobTracker(1, config={})
         self._job_repository.start.side_effect = MessageRepresentationAirbyteTracedErrors(
             "Can't create job", failure_type=FailureType.config_error
         )
@@ -318,7 +318,7 @@ def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_
     def given_budget_already_taken_before_start_when_create_and_get_completed_partitions_then_wait_for_budget_to_be_freed(
         self,
     ) -> None:
-        job_tracker = JobTracker(1)
+        job_tracker = JobTracker(1, config={})
         intent_to_free = job_tracker.try_to_get_intent()
 
         def wait_and_free_intent(_job_tracker: JobTracker, _intent_to_free: str) -> None:
@@ -341,7 +341,7 @@ def wait_and_free_intent(_job_tracker: JobTracker, _intent_to_free: str) -> None
     def test_given_start_job_raise_when_create_and_get_completed_partitions_then_free_budget(
         self,
     ) -> None:
-        job_tracker = JobTracker(1)
+        job_tracker = JobTracker(1, config={})
         self._job_repository.start.side_effect = ValueError("Can't create job")
 
         orchestrator = AsyncJobOrchestrator(
diff --git a/unit_tests/sources/declarative/async_job/test_job_tracker.py b/unit_tests/sources/declarative/async_job/test_job_tracker.py
index 1202c663d..c7a91483b 100644
--- a/unit_tests/sources/declarative/async_job/test_job_tracker.py
+++ b/unit_tests/sources/declarative/async_job/test_job_tracker.py
@@ -15,7 +15,7 @@
 
 class JobTrackerTest(TestCase):
     def setUp(self) -> None:
-        self._tracker = JobTracker(_LIMIT)
+        self._tracker = JobTracker(_LIMIT, config={})
 
     def test_given_limit_reached_when_remove_job_then_can_get_intent_again(self) -> None:
         intents = self._reach_limit()

diff --git a/unit_tests/sources/declarative/interpolation/test_macros.py b/unit_tests/sources/declarative/interpolation/test_macros.py
index 8543cd507..258fe888f 100644
--- a/unit_tests/sources/declarative/interpolation/test_macros.py
+++ b/unit_tests/sources/declarative/interpolation/test_macros.py
@@ -75,13 +75,13 @@ def test_macros_export(test_name, fn_name, found_in_macros):
             "2022-01-01T01:01:01Z",
             "%s",
             "%Y-%m-%dT%H:%M:%SZ",
-            "1640998861",
+            "1641027661",
         ),
         (
             "2022-01-01T01:01:01Z",
             "%ms",
             "%Y-%m-%dT%H:%M:%SZ",
-            "1640998861000000",
+            "1641027661000000",
         ),
     ],
     ids=[

diff --git a/unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py
index 2a5ac3277..f3af80d28 100644
--- a/unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py
+++ b/unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py
@@ -26,7 +26,7 @@ def test_stream_slices_with_single_partition_router():
         job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
             MockAsyncJobRepository(),
             stream_slices,
-            JobTracker(_NO_LIMIT),
+            JobTracker(_NO_LIMIT, config={}),
            NoopMessageRepository(),
         ),
         config={},
@@ -58,7 +58,7 @@ def test_stream_slices_with_parent_slicer():
         job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
             MockAsyncJobRepository(),
             stream_slices,
-            JobTracker(_NO_LIMIT),
+            JobTracker(_NO_LIMIT, config={}),
             NoopMessageRepository(),
         ),
         config={},

diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py
index 4a043ac82..045158e42 100644
--- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py
+++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py
@@ -349,6 +349,7 @@
                 "path": "{{stream_slice['url']}}",
                 "http_method": "GET",
             },
+            "max_concurrent_jobs": 1,
         },
         "incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
         "schema_loader": {
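
A note on the test_macros.py change in this commit: the two new expected epoch values are each exactly 28,800 seconds (8 hours) larger than the old ones, which matches the author's -0800 offset. That suggests the new expectations only hold if the macro renders "2022-01-01T01:01:01Z" in local Pacific time rather than UTC; PATCH 8 below reverts them to the UTC values. A quick check of the arithmetic:

    from datetime import datetime, timezone

    # 2022-01-01T01:01:01Z as a true UTC epoch timestamp:
    utc = datetime(2022, 1, 1, 1, 1, 1, tzinfo=timezone.utc)
    assert int(utc.timestamp()) == 1640998861

    # The value introduced here is exactly 8 hours later, i.e. the result of
    # treating the same wall-clock time as UTC-8 (the author's local offset):
    assert 1641027661 - 1640998861 == 8 * 3600
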
From a73ea65f791657ce49a9ad806cf84a8b664567cf Mon Sep 17 00:00:00 2001
From: pnilan
Date: Thu, 6 Mar 2025 14:11:28 -0800
Subject: [PATCH 5/8] add additional tests for interpolated string and positive int validation

---
 .../sources/declarative/async_job/job_tracker.py |  6 +++---
 .../async_job/test_job_orchestrator.py           |  2 +-
 .../declarative/async_job/test_job_tracker.py    | 16 +++++++++++++++-
 3 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/async_job/job_tracker.py b/airbyte_cdk/sources/declarative/async_job/job_tracker.py
index d9d587562..d322e9811 100644
--- a/airbyte_cdk/sources/declarative/async_job/job_tracker.py
+++ b/airbyte_cdk/sources/declarative/async_job/job_tracker.py
@@ -17,9 +17,9 @@ class ConcurrentJobLimitReached(Exception):
 
 class JobTracker:
-    def __init__(self, limit: Union[int, InterpolatedString], config: Mapping[str, Any]):
-        if isinstance(limit, InterpolatedString):
-            limit = int(limit.eval(config=config, json_loads=json.loads))
+    def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = {}):
+        if isinstance(limit, str):
+            limit = int(InterpolatedString(limit, parameters={}).eval(config=config))
 
         if limit < 1:
             raise ValueError(f"Invalid max concurrent jobs limit: {limit}. Minimum value is 1.")

diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py
index 63faff4c8..263520dfe 100644
--- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py
+++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py
@@ -301,7 +301,7 @@ def test_given_exception_when_start_job_and_skip_this_exception(
     def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget(
         self, mock_sleep: MagicMock
     ) -> None:
-        job_tracker = JobTracker(1)
+        job_tracker = JobTracker(1, config={})
         jobs = [self._an_async_job(str(i), _A_STREAM_SLICE) for i in range(_MAX_NUMBER_OF_ATTEMPTS)]
         self._job_repository.start.side_effect = jobs
         self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(

diff --git a/unit_tests/sources/declarative/async_job/test_job_tracker.py b/unit_tests/sources/declarative/async_job/test_job_tracker.py
index c7a91483b..25614a467 100644
--- a/unit_tests/sources/declarative/async_job/test_job_tracker.py
+++ b/unit_tests/sources/declarative/async_job/test_job_tracker.py
@@ -15,7 +15,9 @@
 
 class JobTrackerTest(TestCase):
     def setUp(self) -> None:
-        self._tracker = JobTracker(_LIMIT, config={})
+        self._tracker = JobTracker(
+            limit="{{config['max_concurrent_jobs']}}", config={"max_concurrent_jobs": _LIMIT}
+        )
 
     def test_given_limit_reached_when_remove_job_then_can_get_intent_again(self) -> None:
         intents = self._reach_limit()
@@ -39,3 +41,15 @@ def test_given_limit_reached_when_add_job_then_limit_is_still_reached(self) -> N
 
     def _reach_limit(self) -> List[str]:
         return [self._tracker.try_to_get_intent() for i in range(_LIMIT)]
+
+
+def test_given_limit_is_interpolated_string_when_init_then_limit_is_int():
+    tracker = JobTracker(
+        limit="{{config['max_concurrent_jobs']}}", config={"max_concurrent_jobs": _LIMIT}
+    )
+    assert tracker._limit == _LIMIT
+
+
+def test_given_limit_is_less_than_1_when_init_then_raise_value_error():
+    with pytest.raises(ValueError):
+        JobTracker(limit="-1", config={})
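
After PATCH 5, JobTracker accepts either a plain int or an interpolation string and rejects any resolved value below 1. A minimal usage sketch of the behavior the hunks above define (the "max_concurrent_jobs" config key is illustrative). One caveat worth noting: the new signature's config: Mapping[str, Any] = {} is a mutable default argument; it is harmless here because the tracker never mutates it, but Optional[Mapping[str, Any]] = None with a config or {} fallback would be the more conventional pattern.

    from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker

    # A literal limit:
    tracker = JobTracker(3, config={})

    # An interpolated limit resolved against the connector config:
    tracker = JobTracker(
        "{{ config['max_concurrent_jobs'] }}",
        config={"max_concurrent_jobs": 3},
    )

    # A resolved limit below 1 raises ValueError, per the validation above:
    try:
        JobTracker("-1", config={})
    except ValueError as exc:
        print(exc)  # Invalid max concurrent jobs limit: -1. Minimum value is 1.
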
From f42fdab5909c896a54c794ed0610e31455caacbe Mon Sep 17 00:00:00 2001
From: pnilan
Date: Thu, 6 Mar 2025 14:18:59 -0800
Subject: [PATCH 6/8] chore: mypy

---
 .../sources/declarative/parsers/model_to_component_factory.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 7d19ac6e1..51cafc8c0 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -2888,7 +2888,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
             job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
                 job_repository,
                 stream_slices,
-                JobTracker(model.max_concurrent_jobs, config),
+                JobTracker(model.max_concurrent_jobs or 1, config),
                 self._message_repository,
                 has_bulk_parent=False,
                 # FIXME work would need to be done here in order to detect if a stream has a parent stream that is bulk

From ab227b28f831a8cccf4a03a285c27d4a65bc9506 Mon Sep 17 00:00:00 2001
From: pnilan
Date: Thu, 6 Mar 2025 14:20:08 -0800
Subject: [PATCH 7/8] chore: format

---
 airbyte_cdk/sources/declarative/async_job/job_tracker.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/airbyte_cdk/sources/declarative/async_job/job_tracker.py b/airbyte_cdk/sources/declarative/async_job/job_tracker.py
index d322e9811..bc8698180 100644
--- a/airbyte_cdk/sources/declarative/async_job/job_tracker.py
+++ b/airbyte_cdk/sources/declarative/async_job/job_tracker.py
@@ -5,7 +5,6 @@
 import uuid
 from typing import Any, Mapping, Set, Union
-import json
 
 from airbyte_cdk.logger import lazy_log
 from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
From 5215f3c57f18922a6a3ac75abd9b80fe25fd3114 Mon Sep 17 00:00:00 2001
From: pnilan
Date: Thu, 6 Mar 2025 15:17:48 -0800
Subject: [PATCH 8/8] update timestamp

---
 unit_tests/sources/declarative/interpolation/test_macros.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/unit_tests/sources/declarative/interpolation/test_macros.py b/unit_tests/sources/declarative/interpolation/test_macros.py
index 258fe888f..8543cd507 100644
--- a/unit_tests/sources/declarative/interpolation/test_macros.py
+++ b/unit_tests/sources/declarative/interpolation/test_macros.py
@@ -75,13 +75,13 @@ def test_macros_export(test_name, fn_name, found_in_macros):
             "2022-01-01T01:01:01Z",
             "%s",
             "%Y-%m-%dT%H:%M:%SZ",
-            "1641027661",
+            "1640998861",
         ),
         (
             "2022-01-01T01:01:01Z",
             "%ms",
             "%Y-%m-%dT%H:%M:%SZ",
-            "1641027661000000",
+            "1640998861000000",
         ),
     ],
     ids=[