Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CsvDecoder to support streaming CSV parsing #321

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2111,6 +2111,24 @@ definitions:
type:
type: string
enum: [XmlDecoder]
CsvDecoder:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good, but you also need to add CsvDecoder as an option to all the spots in the schema that accept XmlDecoder as an option.

title: CSV Decoder
description: Decoder that parses CSV content from the response.
type: object
required:
- type
properties:
type:
type: string
enum: [CsvDecoder]
delimiter:
type: string
default: ","
description: The delimiter character to use when parsing CSV content.
encoding:
type: string
default: "utf-8"
description: The encoding to use when reading the CSV content.
CustomDecoder:
title: Custom Decoder
description: Use this to implement custom decoder logic.
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
JsonParser,
Parser,
)
from airbyte_cdk.sources.declarative.decoders.csv_decoder import CsvDecoder
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import (
GzipJsonDecoder,
Expand All @@ -33,5 +34,6 @@
"NoopDecoder",
"PaginationDecoderDecorator",
"XmlDecoder",
"CsvDecoder",
"ZipfileDecoder",
]
70 changes: 70 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/csv_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import io
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping, MutableMapping

import pandas as pd
import requests

from airbyte_cdk.sources.declarative.decoders.decoder import Decoder

logger = logging.getLogger("airbyte")


@dataclass
class CsvDecoder(Decoder):
    """Decoder that turns a CSV response body into one mapping per data row.

    The first row of the body is used as the header. Every value is read as an
    object (no type inference), so cells come back as strings.

    Recognised keys in ``parameters``:
        delimiter: field separator, defaults to ``","``.
        encoding: encoding name handed to pandas, defaults to ``"utf-8"``.

    Failure contract: an empty body, a body whose rows do not all have the same
    number of columns, or any parsing error results in a single empty mapping
    ``{}`` being yielded instead of an exception being raised.
    """

    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Use `or` instead of a `.get` default: a key may be present with an
        # explicit None (or empty) value, which must still fall back to the
        # default. A None delimiter would otherwise make `str.split` split on
        # whitespace during validation.
        self.delimiter = parameters.get("delimiter") or ","
        self.encoding = parameters.get("encoding") or "utf-8"
        # Number of rows pandas materialises per chunk while iterating.
        self.chunk_size = 100

    def is_stream_response(self) -> bool:
        """Always returns True."""
        return True

    def _has_uniform_column_count(self, text: str) -> bool:
        """Return True when every row of ``text`` has the same number of fields.

        Single-character delimiters are counted with ``csv.reader`` so that
        quoted fields containing the delimiter or a line break are not
        miscounted. ``csv.reader`` only accepts one-character delimiters, so
        longer delimiters fall back to a plain per-line split.
        """
        import csv

        if len(self.delimiter) == 1:
            reader = csv.reader(io.StringIO(text, newline=""), delimiter=self.delimiter)
            # csv.reader reports a blank line as an empty row; count it as one
            # (empty) field, which is what a plain split would report.
            counts = {max(len(row), 1) for row in reader}
        else:
            counts = {len(line.split(self.delimiter)) for line in text.split("\n")}
        return len(counts) <= 1

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """Yield one mapping per CSV data row found in ``response``.

        Args:
            response: HTTP response whose text body holds the CSV document.

        Yields:
            A mapping of column name to cell value for each row, or a single
            ``{}`` when the body is empty, has ragged rows, or cannot be parsed.
        """
        try:
            body = response.text
            stripped = body.strip()
            if not stripped or not self._has_uniform_column_count(stripped):
                yield {}
                return

            try:
                # NOTE(review): the body is already decoded text, so `encoding`
                # presumably has no effect on an in-memory text buffer - confirm.
                chunks = pd.read_csv(
                    io.StringIO(body),
                    chunksize=self.chunk_size,
                    iterator=True,
                    dtype=object,
                    delimiter=self.delimiter,
                    encoding=self.encoding,
                )
            except (pd.errors.EmptyDataError, pd.errors.ParserError):
                yield {}
                return
            for chunk in chunks:
                for record in chunk.replace({pd.NA: None}).to_dict(orient="records"):
                    yield record
        except Exception as exc:
            # Best effort by design: log and emit an empty record rather than
            # failing on an unparseable response.
            logger.warning(
                f"Response cannot be parsed as CSV: {response.status_code=}, {response.text=}, {exc=}"
            )
            yield {}
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,16 @@ class XmlDecoder(BaseModel):
type: Literal["XmlDecoder"]


class CsvDecoder(BaseModel):
    # Declarative model of the CSV decoder component: a `type` discriminator
    # plus two optional settings, each with a default.
    type: Literal["CsvDecoder"]
    delimiter: Optional[str] = Field(
        default=",",
        description="The delimiter character to use when parsing CSV content.",
    )
    encoding: Optional[str] = Field(
        default="utf-8",
        description="The encoding to use when reading the CSV content.",
    )


class CustomDecoder(BaseModel):
class Config:
extra = Extra.allow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import (
CsvDecoder,
Decoder,
GzipJsonDecoder,
IterableDecoder,
Expand Down Expand Up @@ -154,6 +155,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConstantBackoffStrategy as ConstantBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CsvDecoder as CsvDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CsvParser as CsvParserModel,
)
Expand Down Expand Up @@ -2081,6 +2085,14 @@ def create_iterable_decoder(
def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder:
return XmlDecoder(parameters={})

@staticmethod
def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> CsvDecoder:
    """Build a `CsvDecoder` from its declarative model.

    The model's `delimiter` and `encoding` values are forwarded through the
    decoder's `parameters` mapping; `config` and `kwargs` are not used.
    """
    return CsvDecoder(
        parameters={
            "delimiter": model.delimiter,
            "encoding": model.encoding,
        }
    )

@staticmethod
def create_gzipjson_decoder(
model: GzipJsonDecoderModel, config: Config, **kwargs: Any
Expand Down Expand Up @@ -2909,7 +2921,7 @@ def create_config_components_resolver(
)

def _is_supported_decoder_for_pagination(self, decoder: Decoder) -> bool:
if isinstance(decoder, (JsonDecoder, XmlDecoder)):
if isinstance(decoder, (JsonDecoder, XmlDecoder, CsvDecoder)):
return True
elif isinstance(decoder, CompositeRawDecoder):
return self._is_supported_parser_for_pagination(decoder.parser)
Expand Down
39 changes: 39 additions & 0 deletions unit_tests/sources/declarative/decoders/test_csv_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import pytest
import requests

from airbyte_cdk.sources.declarative.decoders import CsvDecoder


@pytest.mark.parametrize(
    "response_body, expected, delimiter",
    [
        pytest.param(
            "name,age\nJohn,30",
            [{"name": "John", "age": "30"}],
            ",",
            id="single_row_comma",
        ),
        pytest.param(
            "name;age\nJohn;30",
            [{"name": "John", "age": "30"}],
            ";",
            id="single_row_semicolon",
        ),
        pytest.param("", [{}], ",", id="empty_response"),
        pytest.param("invalid,csv,data\nno,columns", [{}], ",", id="malformed_csv"),
        pytest.param(
            "name,age\nJohn,30\nJane,25",
            [{"name": "John", "age": "30"}, {"name": "Jane", "age": "25"}],
            ",",
            id="multiple_rows",
        ),
    ],
)
def test_csv_decoder(requests_mock, response_body, expected, delimiter):
    """Decoding a mocked CSV response yields the expected list of records."""
    # Arrange: serve the CSV body from a mocked endpoint.
    url = "https://airbyte.io/"
    requests_mock.register_uri("GET", url, text=response_body)
    csv_decoder = CsvDecoder(parameters={"delimiter": delimiter})

    # Act: fetch the response and drain the decoder's generator.
    records = list(csv_decoder.decode(requests.get(url)))

    # Assert
    assert records == expected


def test_is_stream_response():
    """A decoder built with no parameters reports a streamed response."""
    assert CsvDecoder(parameters={}).is_stream_response() is True


def test_custom_encoding():
    """An `encoding` entry in the parameters is stored on the decoder."""
    custom_params = {"encoding": "latin1"}
    csv_decoder = CsvDecoder(parameters=custom_params)
    assert csv_decoder.encoding == "latin1"
Loading