feat: Async Retriever add url_requester #211


Merged · 8 commits · Jan 14, 2025
@@ -2977,6 +2977,11 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRequester"
- "$ref": "#/definitions/HttpRequester"
url_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the download URLs from the polling response of the completed async job.
anyOf:
- "$ref": "#/definitions/CustomRequester"
- "$ref": "#/definitions/HttpRequester"
download_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.
anyOf:
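For orientation only (not part of this diff), here is a minimal sketch of where the new `url_requester` key would sit inside an `AsyncRetriever` definition of a declarative manifest; the endpoint and path below are hypothetical:

```yaml
retriever:
  type: AsyncRetriever
  # creation_requester, polling_requester, download_requester, status_extractor,
  # status_mapping, urls_extractor, record_selector, ... configured as before
  url_requester:
    type: HttpRequester
    url_base: "https://api.example.com"   # hypothetical source API
    path: "files"
    http_method: GET
```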
@@ -737,33 +737,43 @@ class KeysToSnakeCase(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
flatten_lists: Optional[bool] = Field(
True,
description="Whether to flatten lists or leave it as is. Default is True.",
title="Flatten Lists",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysReplace(BaseModel):
type: Literal["KeysReplace"]
old: str = Field(
...,
description="Old value to replace.",
examples=[" ", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
examples=[
" ",
"{{ record.id }}",
"{{ config['id'] }}",
"{{ stream_slice['id'] }}",
],
title="Old value",
)
new: str = Field(
...,
description="New value to set.",
examples=["_", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
examples=[
"_",
"{{ record.id }}",
"{{ config['id'] }}",
"{{ stream_slice['id'] }}",
],
title="New value",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
flatten_lists: Optional[bool] = Field(
True,
description="Whether to flatten lists or leave it as is. Default is True.",
title="Flatten Lists",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class IterableDecoder(BaseModel):
type: Literal["IterableDecoder"]

@@ -2040,6 +2050,10 @@ class AsyncRetriever(BaseModel):
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.",
)
url_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
None,
description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.",
)
download_requester: Union[CustomRequester, HttpRequester] = Field(
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.",
@@ -2324,6 +2324,16 @@ def create_async_retriever(
if model.delete_requester
else None
)
url_requester = (
self._create_component_from_model(
model=model.url_requester,
decoder=decoder,
config=config,
name=f"job extract_url - {name}",
)
if model.url_requester
else None
)
status_extractor = self._create_component_from_model(
model=model.status_extractor, decoder=decoder, config=config, name=name
)
@@ -2334,6 +2344,7 @@
creation_requester=creation_requester,
polling_requester=polling_requester,
download_retriever=download_retriever,
url_requester=url_requester,
abort_requester=abort_requester,
delete_requester=delete_requester,
status_extractor=status_extractor,
57 changes: 57 additions & 0 deletions airbyte_cdk/sources/declarative/requesters/README.md
@@ -0,0 +1,57 @@
# AsyncHttpJobRepository sequence diagram

- Components marked as optional are not required and can be omitted.
- If `url_requester` is not provided, `urls_extractor` will extract the download URLs directly from the `polling_job_response`.
- The interpolation context, e.g. `create_job_response` or `polling_job_response`, can be accessed from the `stream_slice`; see the example manifest snippet after the diagram.


```mermaid
---
title: AsyncHttpJobRepository Sequence Diagram
---
sequenceDiagram
participant AsyncHttpJobRepository as AsyncOrchestrator
participant CreationRequester as creation_requester
participant PollingRequester as polling_requester
participant UrlRequester as url_requester (Optional)
participant DownloadRetriever as download_retriever
participant AbortRequester as abort_requester (Optional)
participant DeleteRequester as delete_requester (Optional)
participant Reporting Server as Async Reporting Server

AsyncHttpJobRepository ->> CreationRequester: Initiate job creation
CreationRequester ->> Reporting Server: Create job request
Reporting Server -->> CreationRequester: Job ID response
CreationRequester -->> AsyncHttpJobRepository: Job ID

loop Poll for job status
AsyncHttpJobRepository ->> PollingRequester: Check job status
PollingRequester ->> Reporting Server: Status request (interpolation_context: `create_job_response`)
Reporting Server -->> PollingRequester: Status response
PollingRequester -->> AsyncHttpJobRepository: Job status
end

alt Status: Ready
AsyncHttpJobRepository ->> UrlRequester: Request download URLs (if applicable)
UrlRequester ->> Reporting Server: URL request (interpolation_context: `polling_job_response`)
Reporting Server -->> UrlRequester: Download URLs
UrlRequester -->> AsyncHttpJobRepository: Download URLs

AsyncHttpJobRepository ->> DownloadRetriever: Download reports
DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `url`)
Reporting Server -->> DownloadRetriever: Report data
DownloadRetriever -->> AsyncHttpJobRepository: Report data
else Status: Failed
AsyncHttpJobRepository ->> AbortRequester: Send abort request
AbortRequester ->> Reporting Server: Abort job
Reporting Server -->> AbortRequester: Abort confirmation
AbortRequester -->> AsyncHttpJobRepository: Confirmation
end

AsyncHttpJobRepository ->> DeleteRequester: Send delete job request
DeleteRequester ->> Reporting Server: Delete job
Reporting Server -->> DeleteRequester: Deletion confirmation
DeleteRequester -->> AsyncHttpJobRepository: Confirmation


```
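To make the interpolation contexts in the diagram concrete, the sketch below shows how a manifest might wire them into the polling and URL requesters. This is an illustration under assumptions: the endpoint paths, the `job_id` field, the `.json()` accessor, and the response shape read by `urls_extractor` are all hypothetical and will differ per source.

```yaml
polling_requester:
  type: HttpRequester
  url_base: "https://api.example.com"
  # `create_job_response` is exposed through the stream_slice while polling (assumed job_id field)
  path: "jobs/{{ stream_slice['create_job_response'].json()['job_id'] }}"
  http_method: GET

url_requester:
  type: HttpRequester
  url_base: "https://api.example.com"
  # `polling_job_response` is exposed through the stream_slice once the job is ready
  path: "jobs/{{ stream_slice['polling_job_response'].json()['job_id'] }}/files"
  http_method: GET

urls_extractor:
  type: DpathExtractor
  # applied to the url_requester response when url_requester is set,
  # otherwise to the polling response itself
  field_path: ["files", "*", "url"]
```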
30 changes: 27 additions & 3 deletions airbyte_cdk/sources/declarative/requesters/http_job_repository.py
@@ -31,6 +31,10 @@

@dataclass
class AsyncHttpJobRepository(AsyncJobRepository):
"""
See the README file in this directory for more details about the flow.
"""

creation_requester: Requester
polling_requester: Requester
download_retriever: SimpleRetriever
Expand All @@ -44,6 +48,9 @@ class AsyncHttpJobRepository(AsyncJobRepository):
record_extractor: RecordExtractor = field(
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({})
)
url_requester: Optional[Requester] = (
None  # use it when the polling_requester only provides an <id> and an extra request is needed to obtain the list of URLs to download from
)

def __post_init__(self) -> None:
self._create_job_response_by_id: Dict[str, Response] = {}
@@ -186,9 +193,7 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:

"""

for url in self.urls_extractor.extract_records(
self._polling_job_response_by_id[job.api_job_id()]
):
for url in self._get_download_url(job):
job_slice = job.job_parameters()
stream_slice = StreamSlice(
partition=job_slice.partition,
@@ -231,3 +236,22 @@ def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
cursor_slice={},
)
return stream_slice

def _get_download_url(self, job: AsyncJob) -> Iterable[str]:
if not self.url_requester:
url_response = self._polling_job_response_by_id[job.api_job_id()]
else:
stream_slice: StreamSlice = StreamSlice(
partition={
"polling_job_response": self._polling_job_response_by_id[job.api_job_id()]
},
cursor_slice={},
)
url_response = self.url_requester.send_request(stream_slice=stream_slice)  # type: ignore # url_requester is expected to be present here; otherwise we raise an exception because we cannot proceed with the report
if not url_response:
raise AirbyteTracedException(
internal_message="Always expect a response or an exception from url_requester",
failure_type=FailureType.system_error,
)

yield from self.urls_extractor.extract_records(url_response)  # type: ignore # we expect urls_extractor to always return a list of strings
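A hedged manifest-level reading of the branch above: the same `urls_extractor` runs against the polling response when no `url_requester` is configured, and against the `url_requester` response otherwise. The field paths and endpoint below are assumptions for illustration only:

```yaml
# Variant A: no url_requester, so urls_extractor reads the polling response directly
urls_extractor:
  type: DpathExtractor
  field_path: ["result", "download_urls"]   # assumed shape of the polling response
---
# Variant B: url_requester configured, so urls_extractor reads its response instead
url_requester:
  type: HttpRequester
  url_base: "https://api.example.com"       # hypothetical source API
  path: "files"
  http_method: GET
urls_extractor:
  type: DpathExtractor
  field_path: ["files", "*", "url"]         # assumed shape of the url_requester response
```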