diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 072a1efcd..9ab3b18d8 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2111,6 +2111,24 @@ definitions: type: type: string enum: [XmlDecoder] + CsvDecoder: + title: CSV Decoder + description: Decoder that parses CSV content from the response. + type: object + required: + - type + properties: + type: + type: string + enum: [CsvDecoder] + delimiter: + type: string + default: "," + description: The delimiter character to use when parsing CSV content. + encoding: + type: string + default: "utf-8" + description: The encoding to use when reading the CSV content. CustomDecoder: title: Custom Decoder description: Use this to implement custom decoder logic. diff --git a/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte_cdk/sources/declarative/decoders/__init__.py index 45eaf5599..168b30c4c 100644 --- a/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -8,6 +8,7 @@ JsonParser, Parser, ) +from airbyte_cdk.sources.declarative.decoders.csv_decoder import CsvDecoder from airbyte_cdk.sources.declarative.decoders.decoder import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import ( GzipJsonDecoder, @@ -33,5 +34,6 @@ "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder", + "CsvDecoder", "ZipfileDecoder", ] diff --git a/airbyte_cdk/sources/declarative/decoders/csv_decoder.py b/airbyte_cdk/sources/declarative/decoders/csv_decoder.py new file mode 100644 index 000000000..530173542 --- /dev/null +++ b/airbyte_cdk/sources/declarative/decoders/csv_decoder.py @@ -0,0 +1,70 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import io +import logging +from dataclasses import InitVar, dataclass +from typing import Any, Generator, Mapping, MutableMapping + +import pandas as pd +import requests + +from airbyte_cdk.sources.declarative.decoders.decoder import Decoder + +logger = logging.getLogger("airbyte") + + +@dataclass +class CsvDecoder(Decoder): + parameters: InitVar[Mapping[str, Any]] + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self.delimiter = parameters.get("delimiter", ",") + self.encoding = parameters.get("encoding", "utf-8") + self.chunk_size = 100 + + def is_stream_response(self) -> bool: + return True + + def decode( + self, response: requests.Response + ) -> Generator[MutableMapping[str, Any], None, None]: + try: + if not response.text.strip(): + yield {} + return + + # First validate CSV structure + lines = response.text.strip().split("\n") + if not lines: + yield {} + return + + # Check if all rows have the same number of columns + first_row_cols = len(lines[0].split(self.delimiter)) + if any(len(line.split(self.delimiter)) != first_row_cols for line in lines[1:]): + yield {} + return + + csv_data = io.StringIO(response.text) + try: + chunks = pd.read_csv( + csv_data, + chunksize=self.chunk_size, + iterator=True, + dtype=object, + delimiter=self.delimiter, + encoding=self.encoding, + ) + except (pd.errors.EmptyDataError, pd.errors.ParserError): + yield {} + return + for chunk in chunks: + for record in chunk.replace({pd.NA: None}).to_dict(orient="records"): + yield record + except Exception as exc: + logger.warning( + f"Response cannot be parsed as CSV: {response.status_code=}, {response.text=}, {exc=}" + ) + yield {} diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index fe29cee2c..70dac4a40 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -873,6 +873,16 @@ class XmlDecoder(BaseModel): type: Literal["XmlDecoder"] +class CsvDecoder(BaseModel): + type: Literal["CsvDecoder"] + delimiter: Optional[str] = Field( + ",", description="The delimiter character to use when parsing CSV content." + ) + encoding: Optional[str] = Field( + "utf-8", description="The encoding to use when reading the CSV content." + ) + + class CustomDecoder(BaseModel): class Config: extra = Extra.allow diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a664b8530..a55d67f8b 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -59,6 +59,7 @@ from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders import ( + CsvDecoder, Decoder, GzipJsonDecoder, IterableDecoder, @@ -154,6 +155,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConstantBackoffStrategy as ConstantBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CsvDecoder as CsvDecoderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CsvParser as CsvParserModel, ) @@ -2081,6 +2085,14 @@ def create_iterable_decoder( def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder: return XmlDecoder(parameters={}) + @staticmethod + def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> CsvDecoder: + parameters = { + "delimiter": model.delimiter, + "encoding": model.encoding, + } + return CsvDecoder(parameters=parameters) + @staticmethod def create_gzipjson_decoder( model: GzipJsonDecoderModel, config: Config, **kwargs: Any @@ -2909,7 +2921,7 @@ def create_config_components_resolver( ) def _is_supported_decoder_for_pagination(self, decoder: Decoder) -> bool: - if isinstance(decoder, (JsonDecoder, XmlDecoder)): + if isinstance(decoder, (JsonDecoder, XmlDecoder, CsvDecoder)): return True elif isinstance(decoder, CompositeRawDecoder): return self._is_supported_parser_for_pagination(decoder.parser) diff --git a/unit_tests/sources/declarative/decoders/test_csv_decoder.py b/unit_tests/sources/declarative/decoders/test_csv_decoder.py new file mode 100644 index 000000000..b3c0b90b7 --- /dev/null +++ b/unit_tests/sources/declarative/decoders/test_csv_decoder.py @@ -0,0 +1,39 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import pytest +import requests + +from airbyte_cdk.sources.declarative.decoders import CsvDecoder + + +@pytest.mark.parametrize( + "response_body, expected, delimiter", + [ + ("name,age\nJohn,30", [{"name": "John", "age": "30"}], ","), + ("name;age\nJohn;30", [{"name": "John", "age": "30"}], ";"), + ("", [{}], ","), # Empty response + ("invalid,csv,data\nno,columns", [{}], ","), # Malformed CSV + ( + "name,age\nJohn,30\nJane,25", + [{"name": "John", "age": "30"}, {"name": "Jane", "age": "25"}], + ",", + ), # Multiple rows + ], +) +def test_csv_decoder(requests_mock, response_body, expected, delimiter): + requests_mock.register_uri("GET", "https://airbyte.io/", text=response_body) + response = requests.get("https://airbyte.io/") + decoder = CsvDecoder(parameters={"delimiter": delimiter}) + assert list(decoder.decode(response)) == expected + + +def test_is_stream_response(): + decoder = CsvDecoder(parameters={}) + assert decoder.is_stream_response() is True + + +def test_custom_encoding(): + decoder = CsvDecoder(parameters={"encoding": "latin1"}) + assert decoder.encoding == "latin1"