diff --git a/airbyte_cdk/sources/declarative/async_job/job_tracker.py b/airbyte_cdk/sources/declarative/async_job/job_tracker.py index b47fc4cad..bc8698180 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_tracker.py +++ b/airbyte_cdk/sources/declarative/async_job/job_tracker.py @@ -3,9 +3,10 @@ import logging import threading import uuid -from typing import Set +from typing import Any, Mapping, Set, Union from airbyte_cdk.logger import lazy_log +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString LOGGER = logging.getLogger("airbyte") @@ -15,7 +16,13 @@ class ConcurrentJobLimitReached(Exception): class JobTracker: - def __init__(self, limit: int): + def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = {}): + if isinstance(limit, str): + limit = int(InterpolatedString(limit, parameters={}).eval(config=config)) + + if limit < 1: + raise ValueError(f"Invalid max concurrent jobs limit: {limit}. Minimum value is 1.") + self._jobs: Set[str] = set() self._limit = limit self._lock = threading.Lock() diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 6cd9998c7..e264b0483 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3376,6 +3376,16 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/ZipfileDecoder" + max_concurrent_jobs: + title: Maximum Concurrent Job Count + description: Maximum number of concurrent jobs to run. 
+ anyOf: + - type: integer + - type: string + default: 1 + examples: + - 2 + - "{{ config['max_concurrent_jobs'] }}" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index a49b66c03..11114900a 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2368,6 +2368,12 @@ class AsyncRetriever(BaseModel): description="Component decoding the download response so records can be extracted.", title="Download Decoder", ) + max_concurrent_jobs: Optional[Union[int, str]] = Field( + 1, + description="Maximum number of concurrent jobs to run.", + examples=[2, "{{ config['max_concurrent_jobs'] }}"], + title="Maximum Concurrent Job Count", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 39058f834..51cafc8c0 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2888,8 +2888,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator( job_repository, stream_slices, - JobTracker(1), - # FIXME eventually make the number of concurrent jobs in the API configurable. 
Until then, we limit to 1 + JobTracker(model.max_concurrent_jobs or 1, config), self._message_repository, has_bulk_parent=False, # FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk diff --git a/unit_tests/sources/declarative/async_job/test_integration.py b/unit_tests/sources/declarative/async_job/test_integration.py index a0b6195b7..a72f4d29e 100644 --- a/unit_tests/sources/declarative/async_job/test_integration.py +++ b/unit_tests/sources/declarative/async_job/test_integration.py @@ -87,7 +87,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator( MockAsyncJobRepository(), stream_slices, - JobTracker(_NO_LIMIT), + JobTracker(_NO_LIMIT, config={}), self._message_repository, ), config={}, diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index dc81eacbc..263520dfe 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -137,7 +137,7 @@ def test_given_one_job_still_running_when_create_and_get_completed_partitions_th def test_given_timeout_when_create_and_get_completed_partitions_then_free_budget_and_raise_exception( self, mock_sleep: MagicMock ) -> None: - job_tracker = JobTracker(1) + job_tracker = JobTracker(1, config={}) self._job_repository.start.return_value = self._job_for_a_slice self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs( {self._job_for_a_slice: [AsyncJobStatus.TIMED_OUT]} @@ -184,7 +184,7 @@ def test_when_fetch_records_then_yield_records_from_each_job(self) -> None: def _orchestrator( self, slices: List[StreamSlice], job_tracker: Optional[JobTracker] = None ) -> AsyncJobOrchestrator: - job_tracker = job_tracker if job_tracker else JobTracker(_NO_JOB_LIMIT) + job_tracker = job_tracker if job_tracker else 
JobTracker(_NO_JOB_LIMIT, config={}) return AsyncJobOrchestrator( self._job_repository, slices, job_tracker, self._message_repository ) @@ -192,7 +192,7 @@ def _orchestrator( def test_given_more_jobs_than_limit_when_create_and_get_completed_partitions_then_still_return_all_slices_and_free_job_budget( self, ) -> None: - job_tracker = JobTracker(1) + job_tracker = JobTracker(1, config={}) self._job_repository.start.side_effect = [ self._job_for_a_slice, self._job_for_another_slice, @@ -220,7 +220,7 @@ def test_given_exception_to_break_when_start_job_and_raise_this_exception_and_ab orchestrator = AsyncJobOrchestrator( self._job_repository, [_A_STREAM_SLICE, _ANOTHER_STREAM_SLICE], - JobTracker(_NO_JOB_LIMIT), + JobTracker(_NO_JOB_LIMIT, config={}), self._message_repository, exceptions_to_break_on=[ValueError], ) @@ -241,7 +241,7 @@ def test_given_traced_config_error_when_start_job_and_raise_this_exception_and_a """ Since this is a config error, we assume the other jobs will fail for the same reasons. 
""" - job_tracker = JobTracker(1) + job_tracker = JobTracker(1, config={}) self._job_repository.start.side_effect = MessageRepresentationAirbyteTracedErrors( "Can't create job", failure_type=FailureType.config_error ) @@ -301,7 +301,7 @@ def test_given_exception_when_start_job_and_skip_this_exception( def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget( self, mock_sleep: MagicMock ) -> None: - job_tracker = JobTracker(1) + job_tracker = JobTracker(1, config={}) jobs = [self._an_async_job(str(i), _A_STREAM_SLICE) for i in range(_MAX_NUMBER_OF_ATTEMPTS)] self._job_repository.start.side_effect = jobs self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs( @@ -318,7 +318,7 @@ def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_ def given_budget_already_taken_before_start_when_create_and_get_completed_partitions_then_wait_for_budget_to_be_freed( self, ) -> None: - job_tracker = JobTracker(1) + job_tracker = JobTracker(1, config={}) intent_to_free = job_tracker.try_to_get_intent() def wait_and_free_intent(_job_tracker: JobTracker, _intent_to_free: str) -> None: @@ -341,7 +341,7 @@ def wait_and_free_intent(_job_tracker: JobTracker, _intent_to_free: str) -> None def test_given_start_job_raise_when_create_and_get_completed_partitions_then_free_budget( self, ) -> None: - job_tracker = JobTracker(1) + job_tracker = JobTracker(1, config={}) self._job_repository.start.side_effect = ValueError("Can't create job") orchestrator = AsyncJobOrchestrator( diff --git a/unit_tests/sources/declarative/async_job/test_job_tracker.py b/unit_tests/sources/declarative/async_job/test_job_tracker.py index 1202c663d..25614a467 100644 --- a/unit_tests/sources/declarative/async_job/test_job_tracker.py +++ b/unit_tests/sources/declarative/async_job/test_job_tracker.py @@ -15,7 +15,9 @@ class JobTrackerTest(TestCase): def setUp(self) -> None: - self._tracker = JobTracker(_LIMIT) + 
self._tracker = JobTracker( + limit="{{config['max_concurrent_jobs']}}", config={"max_concurrent_jobs": _LIMIT} + ) def test_given_limit_reached_when_remove_job_then_can_get_intent_again(self) -> None: intents = self._reach_limit() @@ -39,3 +41,15 @@ def test_given_limit_reached_when_add_job_then_limit_is_still_reached(self) -> N def _reach_limit(self) -> List[str]: return [self._tracker.try_to_get_intent() for i in range(_LIMIT)] + + +def test_given_limit_is_interpolated_string_when_init_then_limit_is_int(): + tracker = JobTracker( + limit="{{config['max_concurrent_jobs']}}", config={"max_concurrent_jobs": _LIMIT} + ) + assert tracker._limit == _LIMIT + + +def test_given_limit_is_less_than_1_when_init_then_raise_value_error(): + with pytest.raises(ValueError): + JobTracker(limit="-1", config={}) diff --git a/unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py index 2a5ac3277..f3af80d28 100644 --- a/unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py @@ -26,7 +26,7 @@ def test_stream_slices_with_single_partition_router(): job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator( MockAsyncJobRepository(), stream_slices, - JobTracker(_NO_LIMIT), + JobTracker(_NO_LIMIT, config={}), NoopMessageRepository(), ), config={}, @@ -58,7 +58,7 @@ def test_stream_slices_with_parent_slicer(): job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator( MockAsyncJobRepository(), stream_slices, - JobTracker(_NO_LIMIT), + JobTracker(_NO_LIMIT, config={}), NoopMessageRepository(), ), config={}, diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 4a043ac82..045158e42 100644 --- 
a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -349,6 +349,7 @@ "path": "{{stream_slice['url']}}", "http_method": "GET", }, + "max_concurrent_jobs": 1, }, "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, "schema_loader": {