Skip to content

Commit e3f904a

Browse files
authored
fix: Async Retriever change url path for download retriever (#192)
Signed-off-by: Artem Inzhyyants <[email protected]>
1 parent 7569db0 commit e3f904a

File tree

4 files changed

+11
-3
lines changed

4 files changed

+11
-3
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2287,7 +2287,7 @@ def create_async_retriever(
22872287
extractor=download_extractor,
22882288
name=name,
22892289
record_filter=None,
2290-
transformations=[],
2290+
transformations=transformations,
22912291
schema_normalization=TypeTransformer(TransformConfig.NoTransform),
22922292
config=config,
22932293
parameters={},

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,12 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
189189
for url in self.urls_extractor.extract_records(
190190
self._polling_job_response_by_id[job.api_job_id()]
191191
):
192-
stream_slice: StreamSlice = StreamSlice(partition={"url": url}, cursor_slice={})
192+
job_slice = job.job_parameters()
193+
stream_slice = StreamSlice(
194+
partition=job_slice.partition,
195+
cursor_slice=job_slice.cursor_slice,
196+
extra_fields={**job_slice.extra_fields, "url": url},
197+
)
193198
for message in self.download_retriever.read_records({}, stream_slice):
194199
if isinstance(message, Record):
195200
yield message.data

airbyte_cdk/sources/types.py

+3
Original file line numberDiff line numberDiff line change
@@ -152,3 +152,6 @@ def __json_serializable__(self) -> Any:
152152

153153
def __hash__(self) -> int:
154154
return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS))
155+
156+
def __bool__(self) -> bool:
157+
return bool(self._stream_slice) or bool(self._extra_fields)

unit_tests/sources/declarative/requesters/test_http_job_repository.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def setUp(self) -> None:
8484
requester=HttpRequester(
8585
name="stream <name>: fetch_result",
8686
url_base="",
87-
path="{{stream_slice['url']}}",
87+
path="{{stream_slice.extra_fields['url']}}",
8888
error_handler=error_handler,
8989
http_method=HttpMethod.GET,
9090
config=_ANY_CONFIG,

0 commit comments

Comments
 (0)