Skip to content

Commit c9a4086

Browse files
authored
fix(concurrent-cdk): Move the grouping of concurrent and synchronous streams into the read and discover commands instead of when initializing the source (#130)
1 parent 89defe1 commit c9a4086

File tree

3 files changed

+54
-44
lines changed

3 files changed

+54
-44
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

+12-23
Original file line numberDiff line numberDiff line change
@@ -86,23 +86,10 @@ def __init__(
8686
component_factory=component_factory,
8787
)
8888

89+
# todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
90+
# no longer needs to store the original incoming state. But maybe there's an edge case?
8991
self._state = state
9092

91-
self._concurrent_streams: Optional[List[AbstractStream]]
92-
self._synchronous_streams: Optional[List[Stream]]
93-
94-
# If the connector command was SPEC, there is no incoming config, and we cannot instantiate streams because
95-
# they might depend on it. Ideally we want to have a static method on this class to get the spec without
96-
# any other arguments, but the existing entrypoint.py isn't designed to support this. Just noting this
97-
# for our future improvements to the CDK.
98-
if config:
99-
self._concurrent_streams, self._synchronous_streams = self._group_streams(
100-
config=config or {}
101-
)
102-
else:
103-
self._concurrent_streams = None
104-
self._synchronous_streams = None
105-
10693
concurrency_level_from_manifest = self._source_config.get("concurrency_level")
10794
if concurrency_level_from_manifest:
10895
concurrency_level_component = self._constructor.create_component(
@@ -136,17 +123,20 @@ def read(
136123
logger: logging.Logger,
137124
config: Mapping[str, Any],
138125
catalog: ConfiguredAirbyteCatalog,
139-
state: Optional[Union[List[AirbyteStateMessage]]] = None,
126+
state: Optional[List[AirbyteStateMessage]] = None,
140127
) -> Iterator[AirbyteMessage]:
141-
# ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of the concurrent
142-
# streams must be saved so that they can be removed from the catalog before starting synchronous streams
143-
if self._concurrent_streams:
128+
concurrent_streams, _ = self._group_streams(config=config)
129+
130+
# ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of
131+
# the concurrent streams must be saved so that they can be removed from the catalog before starting
132+
# synchronous streams
133+
if len(concurrent_streams) > 0:
144134
concurrent_stream_names = set(
145-
[concurrent_stream.name for concurrent_stream in self._concurrent_streams]
135+
[concurrent_stream.name for concurrent_stream in concurrent_streams]
146136
)
147137

148138
selected_concurrent_streams = self._select_streams(
149-
streams=self._concurrent_streams, configured_catalog=catalog
139+
streams=concurrent_streams, configured_catalog=catalog
150140
)
151141
# It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
152142
# This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
@@ -165,8 +155,7 @@ def read(
165155
yield from super().read(logger, config, filtered_catalog, state)
166156

167157
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
168-
concurrent_streams = self._concurrent_streams or []
169-
synchronous_streams = self._synchronous_streams or []
158+
concurrent_streams, synchronous_streams = self._group_streams(config=config)
170159
return AirbyteCatalog(
171160
streams=[
172161
stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams

unit_tests/sources/declarative/decoders/test_json_decoder.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def test_jsonl_decoder(requests_mock, response_body, expected_json):
5454
def large_event_response_fixture():
5555
data = {"email": "email1@example.com"}
5656
jsonl_string = f"{json.dumps(data)}\n"
57-
lines_in_response = 2 # ≈ 58 MB of response
57+
lines_in_response = 2_000_000 # ≈ 58 MB of response
5858
dir_path = os.path.dirname(os.path.realpath(__file__))
5959
file_path = f"{dir_path}/test_response.txt"
6060
with open(file_path, "w") as file:

unit_tests/sources/declarative/test_concurrent_declarative_source.py

+41-20
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import json
77
from datetime import datetime, timedelta, timezone
88
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
9+
from unittest.mock import patch
910

1011
import freezegun
1112
import isodate
@@ -647,8 +648,7 @@ def test_group_streams():
647648
source = ConcurrentDeclarativeSource(
648649
source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=state
649650
)
650-
concurrent_streams = source._concurrent_streams
651-
synchronous_streams = source._synchronous_streams
651+
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
652652

653653
# 1 full refresh stream, 2 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental
654654
assert len(concurrent_streams) == 5
@@ -705,8 +705,9 @@ def test_create_concurrent_cursor():
705705
source = ConcurrentDeclarativeSource(
706706
source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state
707707
)
708+
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
708709

709-
party_members_stream = source._concurrent_streams[0]
710+
party_members_stream = concurrent_streams[0]
710711
assert isinstance(party_members_stream, DefaultStream)
711712
party_members_cursor = party_members_stream.cursor
712713

@@ -722,7 +723,7 @@ def test_create_concurrent_cursor():
722723
assert party_members_cursor._lookback_window == timedelta(days=5)
723724
assert party_members_cursor._cursor_granularity == timedelta(days=1)
724725

725-
locations_stream = source._concurrent_streams[2]
726+
locations_stream = concurrent_streams[2]
726727
assert isinstance(locations_stream, DefaultStream)
727728
locations_cursor = locations_stream.cursor
728729

@@ -866,7 +867,21 @@ def _mock_party_members_skills_requests(http_mocker: HttpMocker) -> None:
866867
)
867868

868869

870+
def mocked_init(self, is_sequential_state: bool = True):
871+
"""
872+
This method is used to patch the existing __init__() function and always set is_sequential_state to
873+
false. This is required because we want to test the concurrent state format. And because streams are
874+
created under the hood of the read/discover/check command, we have no way of setting the field without
875+
patching __init__()
876+
"""
877+
self._is_sequential_state = False
878+
879+
869880
@freezegun.freeze_time(_NOW)
881+
@patch(
882+
"airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter.__init__",
883+
mocked_init,
884+
)
870885
def test_read_with_concurrent_and_synchronous_streams():
871886
"""
872887
Verifies that a ConcurrentDeclarativeSource processes concurrent streams followed by synchronous streams
@@ -879,7 +894,6 @@ def test_read_with_concurrent_and_synchronous_streams():
879894
source = ConcurrentDeclarativeSource(
880895
source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=None
881896
)
882-
disable_emitting_sequential_state_messages(source=source)
883897

884898
with HttpMocker() as http_mocker:
885899
_mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES)
@@ -959,6 +973,10 @@ def test_read_with_concurrent_and_synchronous_streams():
959973

960974

961975
@freezegun.freeze_time(_NOW)
976+
@patch(
977+
"airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter.__init__",
978+
mocked_init,
979+
)
962980
def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state():
963981
"""
964982
Verifies that a ConcurrentDeclarativeSource processes concurrent streams correctly using the incoming
@@ -1016,7 +1034,6 @@ def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state():
10161034
source = ConcurrentDeclarativeSource(
10171035
source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state
10181036
)
1019-
disable_emitting_sequential_state_messages(source=source)
10201037

10211038
with HttpMocker() as http_mocker:
10221039
_mock_party_members_requests(http_mocker, party_members_slices_and_responses)
@@ -1080,6 +1097,10 @@ def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state():
10801097

10811098

10821099
@freezegun.freeze_time(_NOW)
1100+
@patch(
1101+
"airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter.__init__",
1102+
mocked_init,
1103+
)
10831104
def test_read_with_concurrent_and_synchronous_streams_with_sequential_state():
10841105
"""
10851106
Verifies that a ConcurrentDeclarativeSource processes concurrent streams correctly using the incoming
@@ -1105,7 +1126,6 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state():
11051126
source = ConcurrentDeclarativeSource(
11061127
source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state
11071128
)
1108-
disable_emitting_sequential_state_messages(source=source)
11091129

11101130
party_members_slices_and_responses = _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES + [
11111131
(
@@ -1204,6 +1224,10 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state():
12041224

12051225

12061226
@freezegun.freeze_time(_NOW)
1227+
@patch(
1228+
"airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter.__init__",
1229+
mocked_init,
1230+
)
12071231
def test_read_concurrent_with_failing_partition_in_the_middle():
12081232
"""
12091233
Verify that partial state is emitted when only some partitions are successful during a concurrent sync attempt
@@ -1236,7 +1260,6 @@ def test_read_concurrent_with_failing_partition_in_the_middle():
12361260
source = ConcurrentDeclarativeSource(
12371261
source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[]
12381262
)
1239-
disable_emitting_sequential_state_messages(source=source)
12401263

12411264
location_slices = [
12421265
{"start": "2024-07-01", "end": "2024-07-31"},
@@ -1263,6 +1286,10 @@ def test_read_concurrent_with_failing_partition_in_the_middle():
12631286

12641287

12651288
@freezegun.freeze_time(_NOW)
1289+
@patch(
1290+
"airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter.__init__",
1291+
mocked_init,
1292+
)
12661293
def test_read_concurrent_skip_streams_not_in_catalog():
12671294
"""
12681295
Verifies that the ConcurrentDeclarativeSource only syncs streams that are specified in the incoming ConfiguredCatalog
@@ -1311,8 +1338,6 @@ def test_read_concurrent_skip_streams_not_in_catalog():
13111338
# palaces requests
13121339
http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE)
13131340

1314-
disable_emitting_sequential_state_messages(source=source)
1315-
13161341
messages = list(
13171342
source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[])
13181343
)
@@ -1429,11 +1454,12 @@ def test_streams_with_stream_state_interpolation_should_be_synchronous():
14291454
catalog=_CATALOG,
14301455
state=None,
14311456
)
1457+
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
14321458

14331459
# 1 full refresh stream, 2 with parent stream without incremental dependency
1434-
assert len(source._concurrent_streams) == 3
1460+
assert len(concurrent_streams) == 3
14351461
# 2 incremental stream with interpolation on state (locations and party_members), 1 incremental with parent stream (palace_enemies), 1 stream with async retriever
1436-
assert len(source._synchronous_streams) == 4
1462+
assert len(synchronous_streams) == 4
14371463

14381464

14391465
def test_given_partition_routing_and_incremental_sync_then_stream_is_not_concurrent():
@@ -1569,9 +1595,10 @@ def test_given_partition_routing_and_incremental_sync_then_stream_is_not_concurr
15691595
source = ConcurrentDeclarativeSource(
15701596
source_config=manifest, config=_CONFIG, catalog=catalog, state=state
15711597
)
1598+
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
15721599

1573-
assert len(source._concurrent_streams) == 0
1574-
assert len(source._synchronous_streams) == 1
1600+
assert len(concurrent_streams) == 0
1601+
assert len(synchronous_streams) == 1
15751602

15761603

15771604
def create_wrapped_stream(stream: DeclarativeStream) -> Stream:
@@ -1725,9 +1752,3 @@ def get_states_for_stream(
17251752
for message in messages
17261753
if message.state and message.state.stream.stream_descriptor.name == stream_name
17271754
]
1728-
1729-
1730-
def disable_emitting_sequential_state_messages(source: ConcurrentDeclarativeSource) -> None:
1731-
for concurrent_stream in source._concurrent_streams: # type: ignore # This is the easiest way to disable behavior from the test
1732-
if isinstance(concurrent_stream.cursor, ConcurrentCursor):
1733-
concurrent_stream.cursor._connector_state_converter._is_sequential_state = False # type: ignore # see above

0 commit comments

Comments
 (0)