diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 2cb618175..b8e8e3315 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -126,7 +126,8 @@ def parse( """ text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",") - yield from reader + for row in reader: + yield row @dataclass diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d9cc3e3ef..0ad66e1b0 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2091,10 +2091,10 @@ def create_dynamic_schema_loader( def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> Decoder: return JsonDecoder(parameters={}) - @staticmethod - def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder: + def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder: return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, ) @staticmethod @@ -2103,10 +2103,12 @@ def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any parser=ModelToComponentFactory._get_parser(model, config), stream_response=True ) - @staticmethod - def create_gzip_decoder(model: GzipDecoderModel, config: Config, **kwargs: Any) -> Decoder: + def create_gzip_decoder( + self, model: GzipDecoderModel, config: Config, **kwargs: Any + ) -> Decoder: return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, ) @staticmethod