In the declarative component schema, `download_target_extractor` is removed from the `required` list of the async retriever definition:

```diff
@@ -3797,7 +3797,6 @@ definitions:
       - polling_requester
       - download_requester
       - status_extractor
-      - download_target_extractor
     properties:
       type:
         type: string
```
The generated model makes the field optional with a `None` default:

```diff
@@ -2852,8 +2852,8 @@ class AsyncRetriever(BaseModel):
     status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
         ..., description="Responsible for fetching the actual status of the async job."
     )
-    download_target_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
-        ...,
+    download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field(
+        None,
         description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
     )
     download_extractor: Optional[
```
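For illustration, here is a minimal standalone sketch of that pattern, using stand-in classes rather than the real generated models: a pydantic field declared with `...` (Ellipsis) is required, while `Optional[...]` with a `None` default validates when omitted.

```python
# Minimal sketch of the required -> optional change; DpathExtractor and
# CustomRecordExtractor here are stand-ins, not the real CDK models.
from typing import List, Optional, Union

from pydantic import BaseModel, Field


class DpathExtractor(BaseModel):  # stand-in
    field_path: List[str]


class CustomRecordExtractor(BaseModel):  # stand-in
    class_name: str


class AsyncRetrieverSketch(BaseModel):
    # `...` marks the field as required
    status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(...)
    # Optional with a None default: omitting the field no longer fails validation
    download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field(None)


retriever = AsyncRetrieverSketch(status_extractor=DpathExtractor(field_path=["status"]))
assert retriever.download_target_extractor is None
```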
In the component factory, `create_async_retriever` gains a fail-fast validation step (the raised message is a plain string; the original added line carried a stray `f` prefix with no placeholders):

```diff
@@ -3446,6 +3446,11 @@ def create_async_retriever(
         transformations: List[RecordTransformation],
         **kwargs: Any,
     ) -> AsyncRetriever:
+        if model.download_target_requester and not model.download_target_extractor:
+            raise ValueError(
+                "`download_target_extractor` is required when using a `download_target_requester`"
+            )
+
         def _get_download_retriever(
             requester: Requester, extractor: RecordExtractor, _decoder: Decoder
         ) -> SimpleRetriever:
```
Further down in the same factory method, the extractor component is only built when the model defines one:

```diff
@@ -3603,11 +3608,15 @@ def _get_job_timeout() -> datetime.timedelta:
         status_extractor = self._create_component_from_model(
             model=model.status_extractor, decoder=decoder, config=config, name=name
         )
-        download_target_extractor = self._create_component_from_model(
-            model=model.download_target_extractor,
-            decoder=decoder,
-            config=config,
-            name=name,
+        download_target_extractor = (
+            self._create_component_from_model(
+                model=model.download_target_extractor,
+                decoder=decoder,
+                config=config,
+                name=name,
+            )
+            if model.download_target_extractor
+            else None
         )
 
         job_repository: AsyncJobRepository = AsyncHttpJobRepository(
```
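Taken together, the guard plus the conditional creation define the build-time behavior for all four configuration combinations. A small sketch, where `build` is a stand-in for `create_async_retriever` and only these two pieces are modeled:

```python
# Build-time outcomes for the four extractor/requester combinations.
from typing import Optional


def build(extractor: Optional[dict], requester: Optional[dict]) -> Optional[str]:
    # the new fail-fast guard
    if requester and not extractor:
        raise ValueError(
            "`download_target_extractor` is required when using a `download_target_requester`"
        )
    # mirrors `download_target_extractor = (... if model.download_target_extractor else None)`
    return "extractor-component" if extractor else None


build(extractor={"type": "DpathExtractor"}, requester={"type": "HttpRequester"})  # ok
build(extractor={"type": "DpathExtractor"}, requester=None)  # ok: extract from polling response
build(extractor=None, requester=None)  # ok: single download, no download_target
# build(extractor=None, requester={"type": "HttpRequester"})  # raises ValueError
```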
airbyte_cdk/sources/declarative/requesters/README.md (14 additions, 3 deletions). The README is restructured to document the download-target behavior:

````diff
@@ -1,8 +1,19 @@
+# Download Target and Download Requester
+
+- The `creation_response` and `polling_response` interpolation contexts are always available during the job download step of the process.
+- The `download_target` interpolation context is generated by the `download_target_extractor` and made available to the job download step as well.
+  - If `download_target_requester` is not provided, `download_target_extractor` extracts urls from the `polling_response`.
+  - If `download_target_requester` is provided, an additional request is made to fetch the job download targets, and `download_target_extractor` operates on that response.
+
+## Some important considerations
+
+- **Note:** If neither `download_target_extractor` nor `download_target_requester` is defined, a single job download request is made without the `download_target` context.
+- **Note:** The `download_target_extractor` is required (not optional) when using a `download_target_requester`.
+
 # AsyncHttpJobRepository sequence diagram
 
 - Components marked as optional are not required and can be ignored.
-- if `download_target_requester` is not provided, `download_target_extractor` will get urls from the `polling_response`
 - interpolation_context, e.g. `creation_response` or `polling_response` can be obtained from stream_slice
 
 ```mermaid
 ---
@@ -37,7 +48,7 @@ sequenceDiagram
     UrlRequester -->> AsyncHttpJobRepository: Download URLs
 
     AsyncHttpJobRepository ->> DownloadRetriever: Download reports
-    DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `url`)
+    DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `download_target`, `creation_response`, `polling_response`)
     Reporting Server -->> DownloadRetriever: Report data
     DownloadRetriever -->> AsyncHttpJobRepository: Report data
 else Status: Failed
````
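To make that decision table concrete, here are hypothetical manifest fragments for the three supported shapes, shown as Python dicts. Field names follow the declarative schema from the diff above; the URLs, paths, and extractor field paths are invented for illustration.

```python
# Hypothetical AsyncRetriever fragments for the three supported shapes.

# 1. Extractor only: download targets are extracted from the polling response.
polling_only = {
    "type": "AsyncRetriever",
    "download_target_extractor": {"type": "DpathExtractor", "field_path": ["result_url"]},
}

# 2. Extractor + requester: an extra request fetches the targets, and the
#    extractor runs on that response instead of the polling response.
with_target_requester = {
    "type": "AsyncRetriever",
    "download_target_requester": {
        "type": "HttpRequester",
        "url_base": "https://api.example.com",  # invented endpoint
        "path": "jobs/{{ creation_response['id'] }}/targets",
    },
    "download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
}

# 3. Neither: a single download request is made without a `download_target`.
no_target = {
    "type": "AsyncRetriever",
}

# A requester without an extractor is the one invalid shape and now fails fast
# in create_async_retriever.
```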
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (29 additions, 6 deletions). The repository's field becomes optional as well:

```diff
@@ -43,7 +43,7 @@ class AsyncHttpJobRepository(AsyncJobRepository):
     delete_requester: Optional[Requester]
     status_extractor: DpathExtractor
     status_mapping: Mapping[str, AsyncJobStatus]
-    download_target_extractor: DpathExtractor
+    download_target_extractor: Optional[DpathExtractor]
 
     # timeout for the job to be completed, passed from `polling_job_timeout`
     job_timeout: Optional[timedelta] = None
```
In `fetch_records`, the loop variable is renamed and the two response contexts are added to each slice's `extra_fields`:

```diff
@@ -213,14 +213,16 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
 
         """
 
-        for target_url in self._get_download_targets(job):
+        for download_target in self._get_download_targets(job):
             job_slice = job.job_parameters()
             stream_slice = StreamSlice(
                 partition=job_slice.partition,
                 cursor_slice=job_slice.cursor_slice,
                 extra_fields={
                     **job_slice.extra_fields,
-                    "download_target": target_url,
+                    "download_target": download_target,
+                    "creation_response": self._get_creation_response_interpolation_context(job),
+                    "polling_response": self._get_polling_response_interpolation_context(job),
                 },
             )
             for message in self.download_retriever.read_records({}, stream_slice):
```
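In isolation, the effect of this hunk is that each download request's stream slice now carries both response contexts alongside the target. A sketch with invented response payloads (in the repository these come from the job's stored creation and polling responses):

```python
# Sketch of the per-target stream slice fetch_records now builds.
from airbyte_cdk.sources.types import StreamSlice

creation_response = {"id": "job-123"}  # hypothetical payload
polling_response = {"status": "ready", "result_url": "https://example.com/report/1"}  # hypothetical

stream_slice = StreamSlice(
    partition={},
    cursor_slice={},
    extra_fields={
        "download_target": polling_response["result_url"],
        "creation_response": creation_response,
        "polling_response": polling_response,
    },
)

# Per the README above, these values can be obtained from the stream slice
# during interpolation, e.g. via its extra_fields.
assert stream_slice.extra_fields["download_target"] == "https://example.com/report/1"
```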
`_get_download_targets` now handles the optional extractor explicitly (typos in the added comments, e.g. "donload_target" and "if a download_target_requester if defined", are corrected here):

```diff
@@ -330,9 +332,27 @@ def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
         )
 
     def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
-        if not self.download_target_requester:
-            url_response = self._polling_job_response_by_id[job.api_job_id()]
-        else:
+        """Returns an iterable of strings to help target requests for downloading async jobs."""
+        # If neither download_target_extractor nor download_target_requester is provided, yield a single
+        # empty string to signal that one download request should be made without any download_target value.
+        if not self.download_target_extractor:
+            if not self.download_target_requester:
+                lazy_log(
+                    LOGGER,
+                    logging.DEBUG,
+                    lambda: "No download_target_extractor or download_target_requester provided. Will attempt a single download request without a `download_target`.",
+                )
+                yield ""
+                return
+            else:
+                raise AirbyteTracedException(
+                    internal_message="Must define a `download_target_extractor` when using a `download_target_requester`.",
+                    failure_type=FailureType.config_error,
+                )
+
+        # We have a download_target_extractor; use it to extract the download_target.
+        if self.download_target_requester:
+            # If a download_target_requester is defined, extract from the response to a request made specifically for download targets.
             stream_slice: StreamSlice = StreamSlice(
                 partition={},
                 cursor_slice={},
```
And the final hunk moves the polling-response fallback into an `else` branch:

```diff
@@ -346,5 +366,8 @@ def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
                     internal_message="Always expect a response or an exception from download_target_requester",
                     failure_type=FailureType.system_error,
                 )
+        else:
+            # If no download_target_requester is defined, extract from the polling response.
+            url_response = self._polling_job_response_by_id[job.api_job_id()]
 
         yield from self.download_target_extractor.extract_records(url_response)  # type: ignore # we expect download_target_extractor to always return list of strings
```
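Putting the two hunks together, the full branching of `_get_download_targets` reduces to the following standalone sketch, where plain callables stand in for the extractor and requester components:

```python
# Standalone distillation of _get_download_targets' branching.
from typing import Callable, Iterable, Mapping, Optional


def get_download_targets(
    extractor: Optional[Callable[[Mapping], Iterable[str]]],
    requester: Optional[Callable[[], Mapping]],
    polling_response: Mapping,
) -> Iterable[str]:
    if extractor is None:
        if requester is None:
            # No way to produce targets: one download request, empty target.
            yield ""
            return
        # A requester without an extractor is a configuration error.
        raise ValueError("`download_target_extractor` is required with a `download_target_requester`")
    # Extractor present: run it over the dedicated response if a requester
    # exists, otherwise over the polling response.
    response = requester() if requester is not None else polling_response
    yield from extractor(response)


# Example: the extractor pulls urls straight from the polling response.
targets = list(get_download_targets(
    extractor=lambda r: r.get("urls", []),
    requester=None,
    polling_response={"urls": ["https://example.com/a", "https://example.com/b"]},
))
assert targets == ["https://example.com/a", "https://example.com/b"]
```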