Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3797,7 +3797,6 @@ definitions:
- polling_requester
- download_requester
- status_extractor
- download_target_extractor
properties:
type:
type: string
Expand All @@ -3815,7 +3814,7 @@ definitions:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
download_target_extractor:
description: Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.
description: Responsible for fetching the information needed to download the completed job from the polling HTTP response.
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -2852,9 +2850,9 @@ class AsyncRetriever(BaseModel):
status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
..., description="Responsible for fetching the actual status of the async job."
)
download_target_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
...,
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field(
None,
description="Responsible for fetching the information needed to download the completed job from the polling HTTP response.",
)
download_extractor: Optional[
Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3603,11 +3603,15 @@ def _get_job_timeout() -> datetime.timedelta:
status_extractor = self._create_component_from_model(
model=model.status_extractor, decoder=decoder, config=config, name=name
)
download_target_extractor = self._create_component_from_model(
model=model.download_target_extractor,
decoder=decoder,
config=config,
name=name,
download_target_extractor = (
self._create_component_from_model(
model=model.download_target_extractor,
decoder=decoder,
config=config,
name=name,
)
if model.download_target_extractor
else None
)

job_repository: AsyncJobRepository = AsyncHttpJobRepository(
Expand Down
3 changes: 2 additions & 1 deletion airbyte_cdk/sources/declarative/requesters/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# AsyncHttpJobRepository sequence diagram

- Components marked as optional are not required and can be ignored.
- # TODO update
- If `download_target_requester` is not provided, `download_target_extractor` will get the URLs from the `polling_response`.
- interpolation_context values, e.g. `creation_response` or `polling_response`, can be obtained from the stream_slice.

Expand Down Expand Up @@ -37,7 +38,7 @@ sequenceDiagram
UrlRequester -->> AsyncHttpJobRepository: Download URLs

AsyncHttpJobRepository ->> DownloadRetriever: Download reports
DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `url`)
DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `url`, `creation_response`, `polling_response`)
Reporting Server -->> DownloadRetriever: Report data
DownloadRetriever -->> AsyncHttpJobRepository: Report data
else Status: Failed
Expand Down
28 changes: 21 additions & 7 deletions airbyte_cdk/sources/declarative/requesters/http_job_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class AsyncHttpJobRepository(AsyncJobRepository):
delete_requester: Optional[Requester]
status_extractor: DpathExtractor
status_mapping: Mapping[str, AsyncJobStatus]
download_target_extractor: DpathExtractor
download_target_extractor: Optional[DpathExtractor]

# timeout for the job to be completed, passed from `polling_job_timeout`
job_timeout: Optional[timedelta] = None
Expand Down Expand Up @@ -213,14 +213,16 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:

"""

for target_url in self._get_download_targets(job):
for download_target in self._get_download_targets(job):
job_slice = job.job_parameters()
stream_slice = StreamSlice(
partition=job_slice.partition,
cursor_slice=job_slice.cursor_slice,
extra_fields={
**job_slice.extra_fields,
"download_target": target_url,
"download_target": download_target,
"creation_response": self._get_creation_response_interpolation_context(job),
"polling_response": self._get_polling_response_interpolation_context(job),
},
)
for message in self.download_retriever.read_records({}, stream_slice):
Expand Down Expand Up @@ -330,9 +332,18 @@ def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
)

def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
if not self.download_target_requester:
url_response = self._polling_job_response_by_id[job.api_job_id()]
else:
"""Returns an iterable of strings to help target requests for downloading async jobs."""
# If neither download_target_extractor nor download_target_requester is provided, return a single empty string
# to express the need to make a single download request without any download_target value
if not self.download_target_extractor and not self.download_target_requester:
lazy_log(
LOGGER,
logging.DEBUG,
lambda: "No download_target_extractor or download_target_requester provided. Using fallback behavior for single download request without download_target.",
)
return [""]

if self.download_target_requester:
stream_slice: StreamSlice = StreamSlice(
partition={},
cursor_slice={},
Expand All @@ -346,5 +357,8 @@ def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
internal_message="Always expect a response or an exception from download_target_requester",
failure_type=FailureType.system_error,
)

else:
# if no download_target_requester is provided, we extract directly from the polling response
url_response = self._polling_job_response_by_id[job.api_job_id()]

yield from self.download_target_extractor.extract_records(url_response) # type: ignore # we expect download_target_extractor to always return list of strings
1 change: 0 additions & 1 deletion bin/generate-component-manifest-dagger.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@

set -e

pip install dagger-io==0.13.3
python bin/generate_component_manifest_files.py
6 changes: 5 additions & 1 deletion debug_manifest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ To configure the debugger in VSCode to run the `debug_manifest`, follow these st
"request": "launch",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}/debug_manifest",
"python": "<PATH_TO_CDK_ENV>/bin/python",
"python": "<PATH_TO_CDK_ENV>/bin/python", // REPLACE ME
"module": "debug_manifest",
"args": [
// SPECIFY THE COMMAND: [spec, check, discover, read]
"read",
// SPECIFY THE MANIFEST FILE
"--manifest-path",
// PATH TO THE MANIFEST FILE
"resources/manifest.yaml",
// SPECIFY THE CONFIG
"--config",
// PATH TO THE CONFIG FILE
Expand Down
9 changes: 3 additions & 6 deletions debug_manifest/debug_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,12 @@
#

import sys
from typing import Any, Mapping

from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.sources.declarative.yaml_declarative_source import (
YamlDeclarativeSource,
)

configuration: Mapping[str, Any] = {
"path_to_yaml": "resources/manifest.yaml",
}


def debug_manifest(source: YamlDeclarativeSource, args: list[str]) -> None:
"""
Expand All @@ -24,13 +19,15 @@ def debug_manifest(source: YamlDeclarativeSource, args: list[str]) -> None:

if __name__ == "__main__":
args = sys.argv[1:]
parsed_args = AirbyteEntrypoint.parse_args(args)
manifest_path = getattr(parsed_args, "manifest_path", None) or "resources/manifest.yaml"
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)

debug_manifest(
YamlDeclarativeSource(
path_to_yaml="resources/manifest.yaml",
path_to_yaml=manifest_path,
catalog=YamlDeclarativeSource.read_catalog(catalog_path) if catalog_path else None,
config=YamlDeclarativeSource.read_config(config_path) if config_path else None,
state=YamlDeclarativeSource.read_state(state_path) if state_path else None,
Expand Down
Loading
Loading