Skip to content

feat: AsyncRetriever: Enable configurability of max concurrent job count #391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions airbyte_cdk/sources/declarative/async_job/job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import logging
import threading
import uuid
from typing import Set
from typing import Any, Mapping, Set, Union

from airbyte_cdk.logger import lazy_log
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString

LOGGER = logging.getLogger("airbyte")

Expand All @@ -15,7 +16,13 @@ class ConcurrentJobLimitReached(Exception):


class JobTracker:
def __init__(self, limit: int):
def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = {}):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, the assignment of config: Mapping[str, Any] = {} is not necessary in this context. Instead, I suggest we instantiate the config object within the __post_init__() method and make this field optional, setting its default value to None. Please share your thoughts on this suggestion.

if isinstance(limit, str):
limit = int(InterpolatedString(limit, parameters={}).eval(config=config))

if limit < 1:
raise ValueError(f"Invalid max concurrent jobs limit: {limit}. Minimum value is 1.")

self._jobs: Set[str] = set()
self._limit = limit
self._lock = threading.Lock()
Expand Down
10 changes: 10 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3376,6 +3376,16 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
max_concurrent_jobs:
title: Maximum Concurrent Job Count
description: Maximum number of concurrent jobs to run.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it'd be useful to explain what those jobs do — are those parsing already downloaded archives, or are those threads that request multiple reports in parallel? etc

anyOf:
- type: integer
- type: string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced you really need this in config, and therefore you need strings here. But, up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think you're right. Looks like Salesforce defines a static max across all accounts.

default: 1
examples:
- 2
- "{{ config['max_concurrent_jobs'] }}"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2368,6 +2368,12 @@ class AsyncRetriever(BaseModel):
description="Component decoding the download response so records can be extracted.",
title="Download Decoder",
)
max_concurrent_jobs: Optional[Union[int, str]] = Field(
1,
description="Maximum number of concurrent jobs to run.",
examples=[2, "{{ config['max_concurrent_jobs'] }}"],
title="Maximum Concurrent Job Count",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2888,8 +2888,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
job_repository,
stream_slices,
JobTracker(1),
# FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
JobTracker(model.max_concurrent_jobs or 1, config),
self._message_repository,
has_bulk_parent=False,
# FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
MockAsyncJobRepository(),
stream_slices,
JobTracker(_NO_LIMIT),
JobTracker(_NO_LIMIT, config={}),
self._message_repository,
),
config={},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def test_given_one_job_still_running_when_create_and_get_completed_partitions_th
def test_given_timeout_when_create_and_get_completed_partitions_then_free_budget_and_raise_exception(
self, mock_sleep: MagicMock
) -> None:
job_tracker = JobTracker(1)
job_tracker = JobTracker(1, config={})
self._job_repository.start.return_value = self._job_for_a_slice
self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
{self._job_for_a_slice: [AsyncJobStatus.TIMED_OUT]}
Expand Down Expand Up @@ -184,15 +184,15 @@ def test_when_fetch_records_then_yield_records_from_each_job(self) -> None:
def _orchestrator(
self, slices: List[StreamSlice], job_tracker: Optional[JobTracker] = None
) -> AsyncJobOrchestrator:
job_tracker = job_tracker if job_tracker else JobTracker(_NO_JOB_LIMIT)
job_tracker = job_tracker if job_tracker else JobTracker(_NO_JOB_LIMIT, config={})
return AsyncJobOrchestrator(
self._job_repository, slices, job_tracker, self._message_repository
)

def test_given_more_jobs_than_limit_when_create_and_get_completed_partitions_then_still_return_all_slices_and_free_job_budget(
self,
) -> None:
job_tracker = JobTracker(1)
job_tracker = JobTracker(1, config={})
self._job_repository.start.side_effect = [
self._job_for_a_slice,
self._job_for_another_slice,
Expand Down Expand Up @@ -220,7 +220,7 @@ def test_given_exception_to_break_when_start_job_and_raise_this_exception_and_ab
orchestrator = AsyncJobOrchestrator(
self._job_repository,
[_A_STREAM_SLICE, _ANOTHER_STREAM_SLICE],
JobTracker(_NO_JOB_LIMIT),
JobTracker(_NO_JOB_LIMIT, config={}),
self._message_repository,
exceptions_to_break_on=[ValueError],
)
Expand All @@ -241,7 +241,7 @@ def test_given_traced_config_error_when_start_job_and_raise_this_exception_and_a
"""
Since this is a config error, we assume the other jobs will fail for the same reasons.
"""
job_tracker = JobTracker(1)
job_tracker = JobTracker(1, config={})
self._job_repository.start.side_effect = MessageRepresentationAirbyteTracedErrors(
"Can't create job", failure_type=FailureType.config_error
)
Expand Down Expand Up @@ -301,7 +301,7 @@ def test_given_exception_when_start_job_and_skip_this_exception(
def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget(
self, mock_sleep: MagicMock
) -> None:
job_tracker = JobTracker(1)
job_tracker = JobTracker(1, config={})
jobs = [self._an_async_job(str(i), _A_STREAM_SLICE) for i in range(_MAX_NUMBER_OF_ATTEMPTS)]
self._job_repository.start.side_effect = jobs
self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
Expand All @@ -318,7 +318,7 @@ def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_
def given_budget_already_taken_before_start_when_create_and_get_completed_partitions_then_wait_for_budget_to_be_freed(
self,
) -> None:
job_tracker = JobTracker(1)
job_tracker = JobTracker(1, config={})
intent_to_free = job_tracker.try_to_get_intent()

def wait_and_free_intent(_job_tracker: JobTracker, _intent_to_free: str) -> None:
Expand All @@ -341,7 +341,7 @@ def wait_and_free_intent(_job_tracker: JobTracker, _intent_to_free: str) -> None
def test_given_start_job_raise_when_create_and_get_completed_partitions_then_free_budget(
self,
) -> None:
job_tracker = JobTracker(1)
job_tracker = JobTracker(1, config={})
self._job_repository.start.side_effect = ValueError("Can't create job")

orchestrator = AsyncJobOrchestrator(
Expand Down
16 changes: 15 additions & 1 deletion unit_tests/sources/declarative/async_job/test_job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

class JobTrackerTest(TestCase):
def setUp(self) -> None:
    """Create the tracker under test with a limit interpolated from config.

    Exercises the string-limit code path: the jinja template is resolved
    against the provided config mapping and coerced to an int by JobTracker.
    """
    limit_template = "{{config['max_concurrent_jobs']}}"
    self._tracker = JobTracker(limit=limit_template, config={"max_concurrent_jobs": _LIMIT})

def test_given_limit_reached_when_remove_job_then_can_get_intent_again(self) -> None:
intents = self._reach_limit()
Expand All @@ -39,3 +41,15 @@ def test_given_limit_reached_when_add_job_then_limit_is_still_reached(self) -> N

def _reach_limit(self) -> List[str]:
    """Consume the tracker's entire concurrency budget.

    Returns:
        The list of intent ids granted, one per available slot (_LIMIT total),
        so callers can later free them individually.
    """
    # `_` replaces the previously unused loop variable `i` (idiomatic for a
    # counter whose value is never read).
    return [self._tracker.try_to_get_intent() for _ in range(_LIMIT)]


def test_given_limit_is_interpolated_string_when_init_then_limit_is_int():
    """A jinja-templated string limit is evaluated against config and stored as an int."""
    config = {"max_concurrent_jobs": _LIMIT}
    job_tracker = JobTracker(limit="{{config['max_concurrent_jobs']}}", config=config)
    # The constructor must have resolved the template to the configured integer.
    assert job_tracker._limit == _LIMIT


def test_given_limit_is_less_than_1_when_init_then_raise_value_error():
    """Constructing a tracker with a sub-minimum limit must fail fast."""
    invalid_limit = "-1"  # string form, so the interpolation/coercion path is exercised too
    with pytest.raises(ValueError):
        JobTracker(limit=invalid_limit, config={})
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_stream_slices_with_single_partition_router():
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
MockAsyncJobRepository(),
stream_slices,
JobTracker(_NO_LIMIT),
JobTracker(_NO_LIMIT, config={}),
NoopMessageRepository(),
),
config={},
Expand Down Expand Up @@ -58,7 +58,7 @@ def test_stream_slices_with_parent_slicer():
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
MockAsyncJobRepository(),
stream_slices,
JobTracker(_NO_LIMIT),
JobTracker(_NO_LIMIT, config={}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like JobTracker already has config={} default in the initializer? Could avoid changes here I think?

NoopMessageRepository(),
),
config={},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@
"path": "{{stream_slice['url']}}",
"http_method": "GET",
},
"max_concurrent_jobs": 1,
},
"incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
"schema_loader": {
Expand Down
Loading