Skip to content

Commit 2bd8dcf

Browse files
call state migration in ConcurrentDeclarativeSource
1 parent ca68c5c commit 2bd8dcf

File tree

1 file changed

+7
-0
lines changed

1 file changed

+7
-0
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,9 @@ def _group_streams(
224224
stream_state = self._connector_state_manager.get_stream_state(
225225
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
226226
)
227+
for state_migration in declarative_stream.state_migrations:
228+
if state_migration.should_migrate(stream_state):
229+
stream_state = state_migration.migrate(stream_state)
227230

228231
retriever = self._get_retriever(declarative_stream, stream_state)
229232

@@ -331,6 +334,10 @@ def _group_streams(
331334
stream_state = self._connector_state_manager.get_stream_state(
332335
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
333336
)
337+
for state_migration in declarative_stream.state_migrations:
338+
if state_migration.should_migrate(stream_state):
339+
stream_state = state_migration.migrate(stream_state)
340+
334341
partition_router = declarative_stream.retriever.stream_slicer._partition_router
335342

336343
perpartition_cursor = (

0 commit comments

Comments
 (0)