From bb559b6c5d4cd0f2795c602a456f14aa6090ab61 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 14 Mar 2025 09:33:17 -0400 Subject: [PATCH 1/2] Ensure stream is declarative --- .../sources/declarative/concurrent_declarative_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 99cb37cd5..80de3f70b 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -206,7 +206,7 @@ def _group_streams( # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible, # so we need to treat them as synchronous - if name_to_stream_mapping[declarative_stream.name]["type"] == "StateDelegatingStream": + if isinstance(declarative_stream, DeclarativeStream) and name_to_stream_mapping[declarative_stream.name]["type"] == "StateDelegatingStream": stream_state = self._connector_state_manager.get_stream_state( stream_name=declarative_stream.name, namespace=declarative_stream.namespace ) From 865301468c545f3c6ba6183c43b68890ecd9b818 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 14 Mar 2025 13:37:37 +0000 Subject: [PATCH 2/2] Auto-fix lint and format issues --- .../sources/declarative/concurrent_declarative_source.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 80de3f70b..e212b0f2a 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -206,7 +206,11 @@ def _group_streams( # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible, # so we need to treat them as synchronous - if isinstance(declarative_stream, DeclarativeStream) and name_to_stream_mapping[declarative_stream.name]["type"] == "StateDelegatingStream": + if ( + isinstance(declarative_stream, DeclarativeStream) + and name_to_stream_mapping[declarative_stream.name]["type"] + == "StateDelegatingStream" + ): stream_state = self._connector_state_manager.get_stream_state( stream_name=declarative_stream.name, namespace=declarative_stream.namespace )