diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 2c8931383..9da1b8148 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -30,35 +30,35 @@ def compress_with_gzip(data: str, encoding: str = "utf-8"): return buf.getvalue() -def generate_csv(encoding: str) -> bytes: - """ - Generate CSV data with tab-separated values (\t). - """ +def generate_csv( + encoding: str = "utf-8", delimiter: str = ",", should_compress: bool = False +) -> bytes: data = [ - {"id": 1, "name": "John", "age": 28}, - {"id": 2, "name": "Alice", "age": 34}, - {"id": 3, "name": "Bob", "age": 25}, + {"id": "1", "name": "John", "age": "28"}, + {"id": "2", "name": "Alice", "age": "34"}, + {"id": "3", "name": "Bob", "age": "25"}, ] output = StringIO() - writer = csv.DictWriter(output, fieldnames=["id", "name", "age"], delimiter="\t") + writer = csv.DictWriter(output, fieldnames=["id", "name", "age"], delimiter=delimiter) writer.writeheader() for row in data: writer.writerow(row) - # Ensure the pointer is at the beginning of the buffer before compressing output.seek(0) + csv_data = output.read() - # Compress the CSV data with Gzip - compressed_data = compress_with_gzip(output.read(), encoding=encoding) - - return compressed_data + if should_compress: + return compress_with_gzip(csv_data, encoding=encoding) + return csv_data.encode(encoding) @pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): requests_mock.register_uri( - "GET", "https://airbyte.io/", content=generate_csv(encoding=encoding) + "GET", + "https://airbyte.io/", + content=generate_csv(encoding=encoding, delimiter="\t", should_compress=True), ) response = requests.get("https://airbyte.io/", stream=True) @@ -175,3 +175,26 @@ def test_composite_raw_decoder_raises_traced_exception_when_both_parsers_fail(re with patch("json.loads", side_effect=Exception("test")): with pytest.raises(AirbyteTracedException): list(composite_raw_decoder.decode(response)) + + +@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) +@pytest.mark.parametrize("delimiter", [",", "\t", ";"]) +def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, delimiter: str): + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=generate_csv(encoding=encoding, delimiter=delimiter, should_compress=False), + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = CsvParser(encoding=encoding, delimiter=delimiter) + composite_raw_decoder = CompositeRawDecoder(parser=parser) + + expected_data = [ + {"id": "1", "name": "John", "age": "28"}, + {"id": "2", "name": "Alice", "age": "34"}, + {"id": "3", "name": "Bob", "age": "25"}, + ] + + parsed_records = list(composite_raw_decoder.decode(response)) + assert parsed_records == expected_data