Skip to content

Commit 33e9f5e

Browse files
authored
fix(concurrent-partition-cursor): Fix cursor comparison error (#298)
1 parent 0b14379 commit 33e9f5e

File tree

3 files changed

+92
-16
lines changed

3 files changed

+92
-16
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
)
2323
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField
2424
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
25+
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
26+
AbstractStreamStateConverter,
27+
)
2528
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState
2629

2730
logger = logging.getLogger("airbyte")
@@ -72,13 +75,15 @@ def __init__(
7275
stream_state: Any,
7376
message_repository: MessageRepository,
7477
connector_state_manager: ConnectorStateManager,
78+
connector_state_converter: AbstractStreamStateConverter,
7579
cursor_field: CursorField,
7680
) -> None:
7781
self._global_cursor: Optional[StreamState] = {}
7882
self._stream_name = stream_name
7983
self._stream_namespace = stream_namespace
8084
self._message_repository = message_repository
8185
self._connector_state_manager = connector_state_manager
86+
self._connector_state_converter = connector_state_converter
8287
self._cursor_field = cursor_field
8388

8489
self._cursor_factory = cursor_factory
@@ -301,8 +306,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
301306
):
302307
# We assume that `stream_state` is in a global format that can be applied to all partitions.
303308
# Example: {"global_state_format_key": "global_state_format_value"}
304-
self._global_cursor = deepcopy(stream_state)
305-
self._new_global_cursor = deepcopy(stream_state)
309+
self._set_global_state(stream_state)
306310

307311
else:
308312
self._use_global_cursor = stream_state.get("use_global_cursor", False)
@@ -319,8 +323,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
319323

320324
# set default state for missing partitions if it is per partition with fallback to global
321325
if self._GLOBAL_STATE_KEY in stream_state:
322-
self._global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY])
323-
self._new_global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY])
326+
self._set_global_state(stream_state[self._GLOBAL_STATE_KEY])
324327

325328
# Set initial parent state
326329
if stream_state.get("parent_state"):
@@ -329,6 +332,27 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
329332
# Set parent state for partition routers based on parent streams
330333
self._partition_router.set_initial_state(stream_state)
331334

335+
def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
336+
"""
337+
Initializes the global cursor state from the provided stream state.
338+
339+
If the cursor field key is present in the stream state, its value is parsed,
340+
formatted, and stored as the global cursor. This ensures consistency in state
341+
representation across partitions.
342+
"""
343+
if self.cursor_field.cursor_field_key in stream_state:
344+
global_state_value = stream_state[self.cursor_field.cursor_field_key]
345+
final_format_global_state_value = self._connector_state_converter.output_format(
346+
self._connector_state_converter.parse_value(global_state_value)
347+
)
348+
349+
fixed_global_state = {
350+
self.cursor_field.cursor_field_key: final_format_global_state_value
351+
}
352+
353+
self._global_cursor = deepcopy(fixed_global_state)
354+
self._new_global_cursor = deepcopy(fixed_global_state)
355+
332356
def observe(self, record: Record) -> None:
333357
if not self._use_global_cursor and self.limit_reached():
334358
self._use_global_cursor = True

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,22 @@ def create_concurrent_cursor_from_perpartition_cursor(
12101210
)
12111211
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))
12121212

1213+
datetime_format = datetime_based_cursor_model.datetime_format
1214+
1215+
cursor_granularity = (
1216+
parse_duration(datetime_based_cursor_model.cursor_granularity)
1217+
if datetime_based_cursor_model.cursor_granularity
1218+
else None
1219+
)
1220+
1221+
connector_state_converter: DateTimeStreamStateConverter
1222+
connector_state_converter = CustomFormatConcurrentStreamStateConverter(
1223+
datetime_format=datetime_format,
1224+
input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
1225+
is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state
1226+
cursor_granularity=cursor_granularity,
1227+
)
1228+
12131229
# Create the cursor factory
12141230
cursor_factory = ConcurrentCursorFactory(
12151231
partial(
@@ -1233,6 +1249,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
12331249
stream_state=stream_state,
12341250
message_repository=self._message_repository, # type: ignore
12351251
connector_state_manager=state_manager,
1252+
connector_state_converter=connector_state_converter,
12361253
cursor_field=cursor_field,
12371254
)
12381255

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
},
6666
"cursor_incremental_sync": {
6767
"type": "DatetimeBasedCursor",
68-
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"],
68+
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z", "%ms"],
6969
"datetime_format": "%Y-%m-%dT%H:%M:%SZ",
7070
"cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}",
7171
"start_datetime": {"datetime": "{{ config.get('start_date')}}"},
@@ -399,13 +399,16 @@ def _run_read(
399399
VOTE_200_CREATED_AT = "2024-01-12T00:00:00Z" # Latest vote in partition 20
400400
VOTE_210_CREATED_AT = "2024-01-12T00:00:15Z" # Latest vote in partition 21
401401
VOTE_300_CREATED_AT = "2024-01-10T00:00:00Z" # Latest vote in partition 30
402+
VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000 # Latest vote in partition 30
402403

403404
# Initial State Constants
404405
PARENT_COMMENT_CURSOR_PARTITION_1 = "2023-01-04T00:00:00Z" # Parent comment cursor (partition)
405406
PARENT_POSTS_CURSOR = "2024-01-05T00:00:00Z" # Parent posts cursor (expected in state)
406407

407408
INITIAL_STATE_PARTITION_10_CURSOR = "2024-01-02T00:00:01Z"
409+
INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP = 1704153601000
408410
INITIAL_STATE_PARTITION_11_CURSOR = "2024-01-03T00:00:02Z"
411+
INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP = 1704240002000
409412
INITIAL_GLOBAL_CURSOR = INITIAL_STATE_PARTITION_11_CURSOR
410413
INITIAL_GLOBAL_CURSOR_DATE = datetime.fromisoformat(
411414
INITIAL_STATE_PARTITION_11_CURSOR.replace("Z", "")
@@ -596,7 +599,7 @@ def _run_read(
596599
{
597600
"id": 300,
598601
"comment_id": 30,
599-
"created_at": VOTE_300_CREATED_AT,
602+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
600603
}
601604
]
602605
},
@@ -637,7 +640,7 @@ def _run_read(
637640
{
638641
"comment_id": 30,
639642
"comment_updated_at": COMMENT_30_UPDATED_AT,
640-
"created_at": VOTE_300_CREATED_AT,
643+
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
641644
"id": 300,
642645
},
643646
],
@@ -662,7 +665,7 @@ def _run_read(
662665
"id": 10,
663666
"parent_slice": {"id": 1, "parent_slice": {}},
664667
},
665-
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
668+
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP},
666669
},
667670
{
668671
"partition": {
@@ -672,7 +675,7 @@ def _run_read(
672675
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
673676
},
674677
],
675-
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
678+
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP},
676679
"lookback_window": 86400,
677680
},
678681
# Expected state
@@ -981,7 +984,15 @@ def run_incremental_parent_state_test(
981984
# Fetch the first page of votes for comment 30 of post 3
982985
(
983986
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}",
984-
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
987+
{
988+
"votes": [
989+
{
990+
"id": 300,
991+
"comment_id": 30,
992+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
993+
}
994+
]
995+
},
985996
),
986997
# Requests with intermediate states
987998
# Fetch votes for comment 10 of post 1
@@ -1018,7 +1029,15 @@ def run_incremental_parent_state_test(
10181029
# Fetch votes for comment 30 of post 3
10191030
(
10201031
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={VOTE_300_CREATED_AT}",
1021-
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
1032+
{
1033+
"votes": [
1034+
{
1035+
"id": 300,
1036+
"comment_id": 30,
1037+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
1038+
}
1039+
]
1040+
},
10221041
),
10231042
],
10241043
# Expected records
@@ -1056,7 +1075,7 @@ def run_incremental_parent_state_test(
10561075
{
10571076
"comment_id": 30,
10581077
"comment_updated_at": COMMENT_30_UPDATED_AT,
1059-
"created_at": VOTE_300_CREATED_AT,
1078+
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
10601079
"id": 300,
10611080
},
10621081
],
@@ -1344,7 +1363,15 @@ def test_incremental_parent_state(
13441363
(
13451364
f"https://api.example.com/community/posts/3/comments/30/votes"
13461365
f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}",
1347-
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
1366+
{
1367+
"votes": [
1368+
{
1369+
"id": 300,
1370+
"comment_id": 30,
1371+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
1372+
}
1373+
]
1374+
},
13481375
),
13491376
],
13501377
# Expected records
@@ -1382,7 +1409,7 @@ def test_incremental_parent_state(
13821409
{
13831410
"comment_id": 30,
13841411
"comment_updated_at": COMMENT_30_UPDATED_AT,
1385-
"created_at": VOTE_300_CREATED_AT,
1412+
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
13861413
"id": 300,
13871414
},
13881415
],
@@ -1896,7 +1923,15 @@ def test_incremental_parent_state_no_records(
18961923
(
18971924
f"https://api.example.com/community/posts/3/comments/30/votes"
18981925
f"?per_page=100&start_time={LOOKBACK_DATE}",
1899-
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
1926+
{
1927+
"votes": [
1928+
{
1929+
"id": 300,
1930+
"comment_id": 30,
1931+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
1932+
}
1933+
]
1934+
},
19001935
),
19011936
],
19021937
# Expected records
@@ -1928,7 +1963,7 @@ def test_incremental_parent_state_no_records(
19281963
{
19291964
"comment_id": 30,
19301965
"comment_updated_at": COMMENT_30_UPDATED_AT,
1931-
"created_at": VOTE_300_CREATED_AT,
1966+
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
19321967
"id": 300,
19331968
},
19341969
],

0 commit comments

Comments
 (0)