Skip to content

Commit 9b2de4f

Browse files
darynaishchenkooctavia-squidington-iii
andauthored
fix(low-code): sync records without cursor field in ConcurrentPerPartitionCursor (#605)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 577f9a7 commit 9b2de4f

File tree

3 files changed

+285
-3
lines changed

3 files changed

+285
-3
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,8 +448,14 @@ def observe(self, record: Record) -> None:
448448
"Invalid state as stream slices that are emitted should refer to an existing cursor"
449449
)
450450

451+
# if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
452+
try:
453+
record_cursor_value = self._cursor_field.extract_value(record)
454+
except ValueError:
455+
return
456+
451457
record_cursor = self._connector_state_converter.output_format(
452-
self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
458+
self._connector_state_converter.parse_value(record_cursor_value)
453459
)
454460
self._update_global_cursor(record_cursor)
455461
if not self._use_global_cursor:

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,6 +1460,279 @@ def run_incremental_parent_state_test(
14601460
],
14611461
},
14621462
),
1463+
(
1464+
"test_incremental_parent_state_one_record_without_cursor",
1465+
SUBSTREAM_MANIFEST,
1466+
[
1467+
# Fetch the first page of posts
1468+
(
1469+
f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}",
1470+
{
1471+
"posts": [
1472+
{"id": 1, "updated_at": POST_1_UPDATED_AT},
1473+
]
1474+
},
1475+
),
1476+
# Fetch the first page of comments for post 1
1477+
(
1478+
"https://api.example.com/community/posts/1/comments?per_page=100",
1479+
{
1480+
"comments": [
1481+
{
1482+
"id": 9,
1483+
"post_id": 1,
1484+
"updated_at": COMMENT_9_OLDEST,
1485+
},
1486+
{
1487+
"id": 10,
1488+
"post_id": 1,
1489+
"updated_at": COMMENT_10_UPDATED_AT,
1490+
},
1491+
{
1492+
"id": 11,
1493+
"post_id": 1,
1494+
"updated_at": COMMENT_11_UPDATED_AT,
1495+
},
1496+
]
1497+
},
1498+
),
1499+
# Fetch the first page of votes for comment 10 of post 1 (vote without cursor field)
1500+
(
1501+
f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
1502+
{
1503+
"votes": [
1504+
{
1505+
"id": 100,
1506+
"comment_id": 10,
1507+
}
1508+
],
1509+
},
1510+
),
1511+
# Fetch the first page of votes for comment 11 of post 1
1512+
(
1513+
f"https://api.example.com/community/posts/1/comments/11/votes"
1514+
f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}",
1515+
{"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}]},
1516+
),
1517+
# Requests with intermediate states
1518+
# Fetch votes for comment 11 of post 1
1519+
(
1520+
f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={VOTE_111_CREATED_AT}",
1521+
{
1522+
"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}],
1523+
},
1524+
),
1525+
],
1526+
# Expected records
1527+
[
1528+
{"comment_id": 10, "comment_updated_at": COMMENT_10_UPDATED_AT, "id": 100},
1529+
{
1530+
"comment_id": 11,
1531+
"comment_updated_at": COMMENT_11_UPDATED_AT,
1532+
"created_at": "2024-01-13T00:00:00Z",
1533+
"id": 111,
1534+
},
1535+
],
1536+
# Number of intermediate states - 6 as number of parent partitions
1537+
2,
1538+
# Initial state
1539+
{
1540+
"parent_state": {
1541+
"post_comments": {
1542+
"states": [
1543+
{
1544+
"partition": {"id": 1, "parent_slice": {}},
1545+
"cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1},
1546+
}
1547+
],
1548+
"parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}},
1549+
}
1550+
},
1551+
"state": {"created_at": INITIAL_GLOBAL_CURSOR},
1552+
"states": [
1553+
{
1554+
"partition": {
1555+
"id": 10,
1556+
"parent_slice": {"id": 1, "parent_slice": {}},
1557+
},
1558+
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
1559+
},
1560+
{
1561+
"partition": {
1562+
"id": 11,
1563+
"parent_slice": {"id": 1, "parent_slice": {}},
1564+
},
1565+
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
1566+
},
1567+
],
1568+
"lookback_window": 86400,
1569+
},
1570+
# Expected state
1571+
{
1572+
"state": {"created_at": VOTE_111_CREATED_AT},
1573+
"parent_state": {
1574+
"post_comments": {
1575+
"use_global_cursor": False,
1576+
"state": {"updated_at": COMMENT_10_UPDATED_AT}, # 10 is the "latest"
1577+
"parent_state": {
1578+
"posts": {"updated_at": POST_1_UPDATED_AT}
1579+
}, # post 1 is the latest
1580+
"lookback_window": 1,
1581+
"states": [
1582+
{
1583+
"partition": {"id": 1, "parent_slice": {}},
1584+
"cursor": {"updated_at": COMMENT_10_UPDATED_AT},
1585+
},
1586+
],
1587+
}
1588+
},
1589+
"lookback_window": 1,
1590+
"use_global_cursor": False,
1591+
"states": [
1592+
{
1593+
"partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}},
1594+
# initial state because record doesn't have a cursor field
1595+
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
1596+
},
1597+
{
1598+
"partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}},
1599+
"cursor": {"created_at": VOTE_111_CREATED_AT},
1600+
},
1601+
],
1602+
},
1603+
),
1604+
(
1605+
"test_incremental_parent_state_all_records_without_cursor",
1606+
SUBSTREAM_MANIFEST,
1607+
[
1608+
# Fetch the first page of posts
1609+
(
1610+
f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}",
1611+
{
1612+
"posts": [
1613+
{"id": 1, "updated_at": POST_1_UPDATED_AT},
1614+
]
1615+
},
1616+
),
1617+
# Fetch the first page of comments for post 1
1618+
(
1619+
"https://api.example.com/community/posts/1/comments?per_page=100",
1620+
{
1621+
"comments": [
1622+
{
1623+
"id": 9,
1624+
"post_id": 1,
1625+
"updated_at": COMMENT_9_OLDEST,
1626+
},
1627+
{
1628+
"id": 10,
1629+
"post_id": 1,
1630+
"updated_at": COMMENT_10_UPDATED_AT,
1631+
},
1632+
{
1633+
"id": 11,
1634+
"post_id": 1,
1635+
"updated_at": COMMENT_11_UPDATED_AT,
1636+
},
1637+
]
1638+
},
1639+
),
1640+
# Fetch the first page of votes for comment 10 of post 1 (vote without cursor field)
1641+
(
1642+
f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
1643+
{
1644+
"votes": [
1645+
{
1646+
"id": 100,
1647+
"comment_id": 10,
1648+
}
1649+
],
1650+
},
1651+
),
1652+
# Fetch the first page of votes for comment 11 of post 1 (vote without cursor field)
1653+
(
1654+
f"https://api.example.com/community/posts/1/comments/11/votes"
1655+
f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}",
1656+
{"votes": [{"id": 111, "comment_id": 11}]},
1657+
),
1658+
],
1659+
# Expected records
1660+
[
1661+
{"comment_id": 10, "comment_updated_at": COMMENT_10_UPDATED_AT, "id": 100},
1662+
{
1663+
"comment_id": 11,
1664+
"comment_updated_at": COMMENT_11_UPDATED_AT,
1665+
"id": 111,
1666+
},
1667+
],
1668+
# Number of intermediate states - 6 as number of parent partitions
1669+
2,
1670+
# Initial state
1671+
{
1672+
"parent_state": {
1673+
"post_comments": {
1674+
"states": [
1675+
{
1676+
"partition": {"id": 1, "parent_slice": {}},
1677+
"cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1},
1678+
}
1679+
],
1680+
"parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}},
1681+
}
1682+
},
1683+
"state": {"created_at": INITIAL_GLOBAL_CURSOR},
1684+
"states": [
1685+
{
1686+
"partition": {
1687+
"id": 10,
1688+
"parent_slice": {"id": 1, "parent_slice": {}},
1689+
},
1690+
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
1691+
},
1692+
{
1693+
"partition": {
1694+
"id": 11,
1695+
"parent_slice": {"id": 1, "parent_slice": {}},
1696+
},
1697+
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
1698+
},
1699+
],
1700+
"lookback_window": 86400,
1701+
},
1702+
# Expected state
1703+
{
1704+
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
1705+
"parent_state": {
1706+
"post_comments": {
1707+
"use_global_cursor": False,
1708+
"state": {"updated_at": COMMENT_10_UPDATED_AT}, # 10 is the "latest"
1709+
"parent_state": {
1710+
"posts": {"updated_at": POST_1_UPDATED_AT}
1711+
}, # post 1 is the latest
1712+
"lookback_window": 1,
1713+
"states": [
1714+
{
1715+
"partition": {"id": 1, "parent_slice": {}},
1716+
"cursor": {"updated_at": COMMENT_10_UPDATED_AT},
1717+
},
1718+
],
1719+
}
1720+
},
1721+
"lookback_window": 1,
1722+
"use_global_cursor": False,
1723+
"states": [
1724+
{
1725+
"partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}},
1726+
# initial state because record doesn't have a cursor field
1727+
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
1728+
},
1729+
{
1730+
"partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}},
1731+
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
1732+
},
1733+
],
1734+
},
1735+
),
14631736
],
14641737
)
14651738
def test_incremental_parent_state(

unit_tests/sources/declarative/resolvers/test_parametrized_components_resolver.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,12 @@ def test_dynamic_streams_with_parametrized_components_resolver():
172172
if record.type == Type.RECORD
173173
]
174174

175-
assert len(records) == 3
176-
assert [dict(sorted(record.record.data.items())) for record in records] == [
175+
expected_records = [
177176
{"field1": "Customers info", "field2": "Related to customers field"},
178177
{"field1": "Refunds info", "field2": "Related to refunds field"},
179178
{"field1": "Orders info", "field2": "Related to orders field"},
180179
]
180+
assert len(records) == len(expected_records)
181+
for record in records:
182+
record_data = record.record.data
183+
assert record_data in expected_records

0 commit comments

Comments
 (0)