Skip to content

Commit ab2c827

Browse files
szubstertolik0
andauthored
fix(declarative): Pass extra_fields in global_substream_cursor (#195)
Co-authored-by: Anatolii Yatsuk <[email protected]>
1 parent 1869fa5 commit ab2c827

File tree

2 files changed

+69
-2
lines changed

2 files changed

+69
-2
lines changed

Diff for: airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,9 @@ def stream_slices(self) -> Iterable[StreamSlice]:
115115
* Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
116116
"""
117117
slice_generator = (
118-
StreamSlice(partition=partition, cursor_slice=cursor_slice)
118+
StreamSlice(
119+
partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
120+
)
119121
for partition in self._partition_router.stream_slices()
120122
for cursor_slice in self._stream_cursor.stream_slices()
121123
)
@@ -131,7 +133,9 @@ def stream_slices(self) -> Iterable[StreamSlice]:
131133

132134
def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
133135
slice_generator = (
134-
StreamSlice(partition=partition, cursor_slice=cursor_slice)
136+
StreamSlice(
137+
partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
138+
)
135139
for cursor_slice in self._stream_cursor.stream_slices()
136140
)
137141

Diff for: unit_tests/sources/declarative/incremental/test_per_partition_cursor.py

+63
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import pytest
99

1010
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
11+
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
12+
GlobalSubstreamCursor,
13+
)
1114
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import (
1215
PerPartitionCursor,
1316
PerPartitionKeySerializer,
@@ -715,3 +718,63 @@ def test_per_partition_state_when_set_initial_global_state(
715718
},
716719
]
717720
assert cursor.get_stream_state()["states"] == expected_state
721+
722+
723+
def test_per_partition_cursor_partition_router_extra_fields(
724+
mocked_cursor_factory, mocked_partition_router
725+
):
726+
first_partition = {"first_partition_key": "first_partition_value"}
727+
mocked_partition_router.stream_slices.return_value = [
728+
StreamSlice(
729+
partition=first_partition, cursor_slice={}, extra_fields={"extra_field": "extra_value"}
730+
),
731+
]
732+
cursor = (
733+
MockedCursorBuilder()
734+
.with_stream_slices([{CURSOR_SLICE_FIELD: "first slice cursor value"}])
735+
.build()
736+
)
737+
738+
mocked_cursor_factory.create.return_value = cursor
739+
cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router)
740+
741+
cursor.set_initial_state({"states": [{"partition": first_partition, "cursor": CURSOR_STATE}]})
742+
slices = list(cursor.stream_slices())
743+
744+
assert slices[0].extra_fields == {"extra_field": "extra_value"}
745+
assert slices == [
746+
StreamSlice(
747+
partition={"first_partition_key": "first_partition_value"},
748+
cursor_slice={CURSOR_SLICE_FIELD: "first slice cursor value"},
749+
extra_fields={"extra_field": "extra_value"},
750+
)
751+
]
752+
753+
754+
def test_global_cursor_partition_router_extra_fields(
755+
mocked_cursor_factory, mocked_partition_router
756+
):
757+
first_partition = {"first_partition_key": "first_partition_value"}
758+
mocked_partition_router.stream_slices.return_value = [
759+
StreamSlice(
760+
partition=first_partition, cursor_slice={}, extra_fields={"extra_field": "extra_value"}
761+
),
762+
]
763+
cursor = (
764+
MockedCursorBuilder()
765+
.with_stream_slices([{CURSOR_SLICE_FIELD: "first slice cursor value"}])
766+
.build()
767+
)
768+
769+
global_cursor = GlobalSubstreamCursor(cursor, mocked_partition_router)
770+
771+
slices = list(global_cursor.stream_slices())
772+
773+
assert slices[0].extra_fields == {"extra_field": "extra_value"}
774+
assert slices == [
775+
StreamSlice(
776+
partition=first_partition,
777+
cursor_slice={CURSOR_SLICE_FIELD: "first slice cursor value"},
778+
extra_fields={"extra_field": "extra_value"},
779+
)
780+
]

0 commit comments

Comments
 (0)