diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 888989b13..50e80b601 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3797,7 +3797,6 @@ definitions: - polling_requester - download_requester - status_extractor - - download_target_extractor properties: type: type: string diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index ccd2e9e8d..e207e18f4 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2852,8 +2852,8 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." 
) - download_target_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( - ..., + download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( + None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4a73dced3..cf792a67e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3446,6 +3446,11 @@ def create_async_retriever( transformations: List[RecordTransformation], **kwargs: Any, ) -> AsyncRetriever: + if model.download_target_requester and not model.download_target_extractor: + raise ValueError( + "`download_target_extractor` required if using a `download_target_requester`" + ) + def _get_download_retriever( requester: Requester, extractor: RecordExtractor, _decoder: Decoder ) -> SimpleRetriever: @@ -3603,11 +3608,15 @@ def _get_job_timeout() -> datetime.timedelta: status_extractor = self._create_component_from_model( model=model.status_extractor, decoder=decoder, config=config, name=name ) - download_target_extractor = self._create_component_from_model( - model=model.download_target_extractor, - decoder=decoder, - config=config, - name=name, + download_target_extractor = ( + self._create_component_from_model( + model=model.download_target_extractor, + decoder=decoder, + config=config, + name=name, + ) + if model.download_target_extractor + else None ) job_repository: AsyncJobRepository = AsyncHttpJobRepository( diff --git a/airbyte_cdk/sources/declarative/requesters/README.md b/airbyte_cdk/sources/declarative/requesters/README.md index cfeaf7e76..096081dc7 100644 --- a/airbyte_cdk/sources/declarative/requesters/README.md +++ 
b/airbyte_cdk/sources/declarative/requesters/README.md @@ -1,8 +1,19 @@ +# Download Target and Download Requester + +- The `creation_response` and `polling_response` interpolation contexts are always available during the job download step of the process. + +- The `download_target` interpolation context is generated by the `download_target_extractor` and made available to the job download step as well. + - if `download_target_requester` is not provided, `download_target_extractor` will get urls from the `polling_response` + - if `download_target_requester` is provided, an additional request will be made to fetch job download targets and `download_target_extractor` will operate on that response + +## Some important considerations + +- **Note:** If neither the `download_target_extractor` nor the `download_target_requester` is defined, a single job download request will be made without the `download_target` context. +- **Note:** The `download_target_extractor` is required (not optional) if using a `download_target_requester` + # AsyncHttpJobRepository sequence diagram - Components marked as optional are not required and can be ignored. -- if `download_target_requester` is not provided, `download_target_extractor` will get urls from the `polling_response` -- interpolation_context, e.g. 
`creation_response` or `polling_response` can be obtained from stream_slice ```mermaid --- @@ -37,7 +48,7 @@ sequenceDiagram UrlRequester -->> AsyncHttpJobRepository: Download URLs AsyncHttpJobRepository ->> DownloadRetriever: Download reports - DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `url`) + DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `download_target`, `creation_response`, `polling_response`) Reporting Server -->> DownloadRetriever: Report data DownloadRetriever -->> AsyncHttpJobRepository: Report data else Status: Failed diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index 2ca38494e..d837ed902 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -43,7 +43,7 @@ class AsyncHttpJobRepository(AsyncJobRepository): delete_requester: Optional[Requester] status_extractor: DpathExtractor status_mapping: Mapping[str, AsyncJobStatus] - download_target_extractor: DpathExtractor + download_target_extractor: Optional[DpathExtractor] # timeout for the job to be completed, passed from `polling_job_timeout` job_timeout: Optional[timedelta] = None @@ -213,14 +213,16 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: """ - for target_url in self._get_download_targets(job): + for download_target in self._get_download_targets(job): job_slice = job.job_parameters() stream_slice = StreamSlice( partition=job_slice.partition, cursor_slice=job_slice.cursor_slice, extra_fields={ **job_slice.extra_fields, - "download_target": target_url, + "download_target": download_target, + "creation_response": self._get_creation_response_interpolation_context(job), + "polling_response": self._get_polling_response_interpolation_context(job), }, ) for message in 
self.download_retriever.read_records({}, stream_slice): @@ -330,9 +332,27 @@ def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice: ) def _get_download_targets(self, job: AsyncJob) -> Iterable[str]: - if not self.download_target_requester: - url_response = self._polling_job_response_by_id[job.api_job_id()] - else: + """Returns an iterable of strings to help target requests for downloading async jobs.""" + # If neither download_target_extractor nor download_target_requester is provided, yield a single empty string + # to express the need to make a single download request without any download_target value + if not self.download_target_extractor: + if not self.download_target_requester: + lazy_log( + LOGGER, + logging.DEBUG, + lambda: "No download_target_extractor or download_target_requester provided. Will attempt a single download request without a `download_target`.", + ) + yield "" + return + else: + raise AirbyteTracedException( + internal_message="Must define a `download_target_extractor` when using a `download_target_requester`.", + failure_type=FailureType.config_error, + ) + + # We have a download_target_extractor, use it to extract the download_target + if self.download_target_requester: + # if a download_target_requester is defined, we extract from the response of a request specifically for download targets. stream_slice: StreamSlice = StreamSlice( partition={}, cursor_slice={}, @@ -346,5 +366,8 @@ internal_message="Always expect a response or an exception from download_target_requester", failure_type=FailureType.system_error, ) + else: + # if no download_target_requester is defined, we extract from the polling response + url_response = self._polling_job_response_by_id[job.api_job_id()] yield from self.download_target_extractor.extract_records(url_response) # type: ignore # we expect download_target_extractor to always return list of strings