Skip to content

Commit e18e407

Browse files
authored
feat: Async Retriever add url_requester (#211)
Signed-off-by: Artem Inzhyyants <[email protected]>
1 parent e3f904a commit e18e407

File tree

5 files changed

+126
-15
lines changed

5 files changed

+126
-15
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -2977,6 +2977,11 @@ definitions:
29772977
anyOf:
29782978
- "$ref": "#/definitions/CustomRequester"
29792979
- "$ref": "#/definitions/HttpRequester"
2980+
url_requester:
2981+
description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.
2982+
anyOf:
2983+
- "$ref": "#/definitions/CustomRequester"
2984+
- "$ref": "#/definitions/HttpRequester"
29802985
download_requester:
29812986
description: Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.
29822987
anyOf:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+26-12
Original file line numberDiff line numberDiff line change
@@ -737,33 +737,43 @@ class KeysToSnakeCase(BaseModel):
737737
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
738738

739739

740+
class FlattenFields(BaseModel):
    # Declarative model for the "FlattenFields" component; `type` is the
    # discriminator and only accepts the literal "FlattenFields".
    type: Literal["FlattenFields"]
    # Optional switch controlling list flattening; falls back to True when omitted.
    flatten_lists: Optional[bool] = Field(
        True,
        title="Flatten Lists",
        description="Whether to flatten lists or leave it as is. Default is True.",
    )
    # Optional free-form mapping, populated from the "$parameters" key via the alias.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
748+
749+
740750
class KeysReplace(BaseModel):
741751
type: Literal["KeysReplace"]
742752
old: str = Field(
743753
...,
744754
description="Old value to replace.",
745-
examples=[" ", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
755+
examples=[
756+
" ",
757+
"{{ record.id }}",
758+
"{{ config['id'] }}",
759+
"{{ stream_slice['id'] }}",
760+
],
746761
title="Old value",
747762
)
748763
new: str = Field(
749764
...,
750765
description="New value to set.",
751-
examples=["_", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
766+
examples=[
767+
"_",
768+
"{{ record.id }}",
769+
"{{ config['id'] }}",
770+
"{{ stream_slice['id'] }}",
771+
],
752772
title="New value",
753773
)
754774
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
755775

756776

757-
class FlattenFields(BaseModel):
758-
type: Literal["FlattenFields"]
759-
flatten_lists: Optional[bool] = Field(
760-
True,
761-
description="Whether to flatten lists or leave it as is. Default is True.",
762-
title="Flatten Lists",
763-
)
764-
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
765-
766-
767777
class IterableDecoder(BaseModel):
768778
type: Literal["IterableDecoder"]
769779

@@ -2040,6 +2050,10 @@ class AsyncRetriever(BaseModel):
20402050
...,
20412051
description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.",
20422052
)
2053+
url_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
2054+
None,
2055+
description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.",
2056+
)
20432057
download_requester: Union[CustomRequester, HttpRequester] = Field(
20442058
...,
20452059
description="Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+11
Original file line numberDiff line numberDiff line change
@@ -2324,6 +2324,16 @@ def create_async_retriever(
23242324
if model.delete_requester
23252325
else None
23262326
)
2327+
url_requester = (
2328+
self._create_component_from_model(
2329+
model=model.url_requester,
2330+
decoder=decoder,
2331+
config=config,
2332+
name=f"job extract_url - {name}",
2333+
)
2334+
if model.url_requester
2335+
else None
2336+
)
23272337
status_extractor = self._create_component_from_model(
23282338
model=model.status_extractor, decoder=decoder, config=config, name=name
23292339
)
@@ -2334,6 +2344,7 @@ def create_async_retriever(
23342344
creation_requester=creation_requester,
23352345
polling_requester=polling_requester,
23362346
download_retriever=download_retriever,
2347+
url_requester=url_requester,
23372348
abort_requester=abort_requester,
23382349
delete_requester=delete_requester,
23392350
status_extractor=status_extractor,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# AsyncHttpJobRepository sequence diagram
2+
3+
- Components marked as optional are not required and can be ignored.
4+
- If `url_requester` is not provided, `urls_extractor` extracts the URLs from the `polling_job_response`.
5+
- The interpolation context values, e.g. `create_job_response` or `polling_job_response`, can be obtained from the `stream_slice`.
6+
7+
8+
```mermaid
9+
---
10+
title: AsyncHttpJobRepository Sequence Diagram
11+
---
12+
sequenceDiagram
13+
participant AsyncHttpJobRepository as AsyncOrchestrator
14+
participant CreationRequester as creation_requester
15+
participant PollingRequester as polling_requester
16+
participant UrlRequester as url_requester (Optional)
17+
participant DownloadRetriever as download_retriever
18+
participant AbortRequester as abort_requester (Optional)
19+
participant DeleteRequester as delete_requester (Optional)
20+
participant Reporting Server as Async Reporting Server
21+
22+
AsyncHttpJobRepository ->> CreationRequester: Initiate job creation
23+
CreationRequester ->> Reporting Server: Create job request
24+
Reporting Server -->> CreationRequester: Job ID response
25+
CreationRequester -->> AsyncHttpJobRepository: Job ID
26+
27+
loop Poll for job status
28+
AsyncHttpJobRepository ->> PollingRequester: Check job status
29+
PollingRequester ->> Reporting Server: Status request (interpolation_context: `create_job_response`)
30+
Reporting Server -->> PollingRequester: Status response
31+
PollingRequester -->> AsyncHttpJobRepository: Job status
32+
end
33+
34+
alt Status: Ready
35+
AsyncHttpJobRepository ->> UrlRequester: Request download URLs (if applicable)
36+
UrlRequester ->> Reporting Server: URL request (interpolation_context: `polling_job_response`)
37+
Reporting Server -->> UrlRequester: Download URLs
38+
UrlRequester -->> AsyncHttpJobRepository: Download URLs
39+
40+
AsyncHttpJobRepository ->> DownloadRetriever: Download reports
41+
DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `url`)
42+
Reporting Server -->> DownloadRetriever: Report data
43+
DownloadRetriever -->> AsyncHttpJobRepository: Report data
44+
else Status: Failed
45+
AsyncHttpJobRepository ->> AbortRequester: Send abort request
46+
AbortRequester ->> Reporting Server: Abort job
47+
Reporting Server -->> AbortRequester: Abort confirmation
48+
AbortRequester -->> AsyncHttpJobRepository: Confirmation
49+
end
50+
51+
AsyncHttpJobRepository ->> DeleteRequester: Send delete job request
52+
DeleteRequester ->> Reporting Server: Delete job
53+
Reporting Server -->> DeleteRequester: Deletion confirmation
54+
DeleteRequester -->> AsyncHttpJobRepository: Confirmation
55+
56+
57+
```

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

+27-3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131

3232
@dataclass
3333
class AsyncHttpJobRepository(AsyncJobRepository):
34+
"""
35+
See Readme file for more details about flow.
36+
"""
37+
3438
creation_requester: Requester
3539
polling_requester: Requester
3640
download_retriever: SimpleRetriever
@@ -44,6 +48,9 @@ class AsyncHttpJobRepository(AsyncJobRepository):
4448
record_extractor: RecordExtractor = field(
4549
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({})
4650
)
51+
url_requester: Optional[Requester] = (
52+
None # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from
53+
)
4754

4855
def __post_init__(self) -> None:
4956
self._create_job_response_by_id: Dict[str, Response] = {}
@@ -186,9 +193,7 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
186193
187194
"""
188195

189-
for url in self.urls_extractor.extract_records(
190-
self._polling_job_response_by_id[job.api_job_id()]
191-
):
196+
for url in self._get_download_url(job):
192197
job_slice = job.job_parameters()
193198
stream_slice = StreamSlice(
194199
partition=job_slice.partition,
@@ -231,3 +236,22 @@ def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
231236
cursor_slice={},
232237
)
233238
return stream_slice
239+
240+
def _get_download_url(self, job: AsyncJob) -> Iterable[str]:
    """
    Yield the download URLs for the given job.

    Without a `url_requester`, the URLs are extracted straight from the polling
    response stored for this job. With one, the stored polling response is handed
    to `url_requester` through the `polling_job_response` partition key, and the
    URLs are extracted from the response it returns.

    :param job: the job whose `api_job_id()` keys the stored polling response
    :return: whatever `urls_extractor.extract_records` yields for the chosen response
    :raises AirbyteTracedException: when `url_requester` returns a falsy response
    """
    # Both branches start from the polling response recorded for this job.
    polling_response = self._polling_job_response_by_id[job.api_job_id()]

    if self.url_requester:
        # An extra request is required: expose the polling response to the
        # requester through the slice partition.
        url_slice: StreamSlice = StreamSlice(
            partition={"polling_job_response": polling_response},
            cursor_slice={},
        )
        url_response = self.url_requester.send_request(stream_slice=url_slice)  # type: ignore # url_requester is expected to be present here, otherwise we cannot proceed with the report
        if not url_response:
            raise AirbyteTracedException(
                internal_message="Always expect a response or an exception from url_requester",
                failure_type=FailureType.system_error,
            )
    else:
        url_response = polling_response

    yield from self.urls_extractor.extract_records(url_response)  # type: ignore # urls_extractor is expected to always return a list of strings

0 commit comments

Comments
 (0)