From 5b7724dc47fb6634d39365fd4c4338395fa44410 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 17 Dec 2024 23:07:11 +0100 Subject: [PATCH 01/26] Composite Decoder: add Composite Decoder Signed-off-by: Artem Inzhyyants --- .../declarative/decoders/composite_decoder.py | 129 ++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 airbyte_cdk/sources/declarative/decoders/composite_decoder.py diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py new file mode 100644 index 000000000..8e951467b --- /dev/null +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -0,0 +1,129 @@ + + +import gzip +import json +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Generator, Iterable, MutableMapping, Optional, Protocol + +import pandas as pd +import requests +from numpy import nan + +from airbyte_cdk.sources.declarative.decoders.decoder import Decoder + + +class BufferedDataInput(Protocol): + """ + at least read to _n bytes method should be supported to align with BufferedReader and avoid OOM. + """ + def read(self, __n: int) -> bytes: ... + + +@dataclass +class Parser(ABC): + inner_parser: Optional["Parser"] = None + + @abstractmethod + def parse( + self, data: BufferedDataInput | Iterable, *args, **kwargs + ) -> Generator[MutableMapping[str, Any], None, None]: + """ + Parse data and yield dictionaries. + """ + pass + + +@dataclass +class GzipParser(Parser): + def parse( + self, data: BufferedDataInput | Iterable, *args, **kwargs + ) -> Generator[MutableMapping[str, Any], None, None]: + """ + Decompress gzipped bytes and pass decompressed data to the inner parser. + """ + gzipobj = gzip.GzipFile(fileobj=data) + if self.inner_parser: + yield from self.inner_parser.parse(gzipobj) + else: + yield from gzipobj + + +@dataclass +class JsonLineParser(Parser): + encoding: Optional[str] = "utf-8" + + def parse( + self, data: BufferedDataInput | Iterable, *args, **kwargs + ) -> Generator[MutableMapping[str, Any], None, None]: + for line in data: + try: + yield json.loads(line.decode(self.encoding)) + except json.JSONDecodeError: + # Handle invalid JSON lines gracefully (e.g., log and skip) + pass + + +@dataclass +class CsvParser(Parser): + # TODO: add more parameters: see read_csv for more details, e.g.: quotechar, headers, + encoding: Optional[str] = "utf-8" + delimiter: str = "," + + def parse( + self, data: BufferedDataInput | Iterable, *args, **kwargs + ) -> Generator[MutableMapping[str, Any], None, None]: + """ + Parse CSV data from decompressed bytes. + """ + reader = pd.read_csv(data, sep=self.delimiter, iterator=True, dtype=object) + for chunk in reader: + chunk = chunk.replace({nan: None}).to_dict(orient="records") + for row in chunk: + yield row + + +@dataclass +class CompositeRawDecoder(Decoder): + """ + Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None] + passed response.raw to parser(s). + Note: response.raw is not decoded/decompressed by default. + parsers should be instantiated recursively. + Example: + composite_decoder = CompositeDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1"))) + """ + + parser: Parser + + def is_stream_response(self) -> bool: + return True + + def decode( + self, response: requests.Response + ) -> Generator[MutableMapping[str, Any], None, None]: + yield from self.parser.parse(data=response.raw) + + + +# Examples how to use +if __name__ == "__main__": + # SIMPLE JSONLINES + # composite_decoder = CompositeDecoder(parser=JsonLineParser()) + # response = requests.get('http://127.0.0.1:5000/jsonlines', stream=True) + # for rec in composite_decoder.decode(response): + # print(rec) + + # Gzipped JSONLINES + # parser = GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")) + # composite_decoder = CompositeDecoder(parser=parser) + # response = requests.get('http://127.0.0.1:5000/jsonlines', stream=True) + # for rec in composite_decoder.decode(response): + # print(rec) + + # Gzipped TSV + parser = GzipParser(inner_parser=CsvParser(encoding="iso-8859-1", delimiter="\t")) + composite_decoder = CompositeRawDecoder(parser=parser) + response = requests.get("http://127.0.0.1:5000/csv", stream=True) + for rec in composite_decoder.decode(response): + print(rec) From 990584e2cc721082f50fbeea6e5c3dd99aed4303 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 18 Dec 2024 11:20:37 +0100 Subject: [PATCH 02/26] Composite Decoder: add Models Signed-off-by: Artem Inzhyyants --- .../declarative_component_schema.yaml | 35 +++++++++++++++++++ .../declarative/decoders/composite_decoder.py | 4 +-- .../models/declarative_component_schema.py | 18 ++++++++++ 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 461cfa764..1b6622b23 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2719,9 +2719,44 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/GzipJsonDecoder" + - "$ref": "#/definitions/CompositeRawDecoder" $parameters: type: object additionalProperties: true + CompositeRawDecoder: + description: "(This is experimental, use at your own risk)" + type: object + required: + - parser + properties: + parser: + anyOf: + - "$ref": "#/definitions/GzipParser" + - "$ref": "#/definitions/JsonLineParser" + - "$ref": "#/definitions/CsvParser" +# PARSERS + GzipParser: + type: object + properties: + inner_parser: + anyOf: + - "$ref": "#/definitions/JsonLineParser" + - "$ref": "#/definitions/CsvParser" + JsonLineParser: + type: object + properties: + encoding: + type: string + default: utf-8 + CsvParser: + type: object + properties: + encoding: + type: string + default: utf-8 + delimiter: + type: string + default: "," AsyncJobStatusMap: description: Matches the api job status to Async Job Status. type: object diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py index 8e951467b..aa3fd55aa 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -1,5 +1,3 @@ - - import gzip import json from abc import ABC, abstractmethod @@ -17,6 +15,7 @@ class BufferedDataInput(Protocol): """ at least read to _n bytes method should be supported to align with BufferedReader and avoid OOM. """ + def read(self, __n: int) -> bytes: ... @@ -105,7 +104,6 @@ def decode( yield from self.parser.parse(data=response.raw) - # Examples how to use if __name__ == "__main__": # SIMPLE JSONLINES diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 2b4bba030..043c6006b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1109,6 +1109,15 @@ class LegacySessionTokenAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class JsonLineParser(BaseModel): + encoding: Optional[str] = "utf-8" + + +class CsvParser(BaseModel): + encoding: Optional[str] = "utf-8" + delimiter: Optional[str] = "," + + class AsyncJobStatusMap(BaseModel): type: Optional[Literal["AsyncJobStatusMap"]] = None running: List[str] @@ -1488,6 +1497,10 @@ class RecordSelector(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class GzipParser(BaseModel): + inner_parser: Optional[Union[JsonLineParser, CsvParser]] = None + + class Spec(BaseModel): type: Literal["Spec"] connection_specification: Dict[str, Any] = Field( @@ -1518,6 +1531,10 @@ class CompositeErrorHandler(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class CompositeRawDecoder(BaseModel): + parser: Union[GzipParser, JsonLineParser, CsvParser] + + class DeclarativeSource1(BaseModel): class Config: extra = Extra.forbid @@ -1895,6 +1912,7 @@ class SimpleRetriever(BaseModel): IterableDecoder, XmlDecoder, GzipJsonDecoder, + CompositeRawDecoder, ] ] = Field( None, From 50782f7d5c6178686db8a7d59d60a6176502f8dd Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 19 Dec 2024 11:29:09 +0100 Subject: [PATCH 03/26] Composite Decoder: ref to use BufferedIOBase Signed-off-by: Artem Inzhyyants --- .../declarative/decoders/composite_decoder.py | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py index aa3fd55aa..ddea5aaa3 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -1,8 +1,10 @@ import gzip import json +import logging from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, Generator, Iterable, MutableMapping, Optional, Protocol +from io import BufferedIOBase +from typing import Any, Generator, MutableMapping, Optional import pandas as pd import requests @@ -10,13 +12,7 @@ from airbyte_cdk.sources.declarative.decoders.decoder import Decoder - -class BufferedDataInput(Protocol): - """ - at least read to _n bytes method should be supported to align with BufferedReader and avoid OOM. - """ - - def read(self, __n: int) -> bytes: ... +logger = logging.getLogger("airbyte") @dataclass @@ -25,7 +21,7 @@ class Parser(ABC): @abstractmethod def parse( - self, data: BufferedDataInput | Iterable, *args, **kwargs + self, data: BufferedIOBase, *args, **kwargs ) -> Generator[MutableMapping[str, Any], None, None]: """ Parse data and yield dictionaries. @@ -36,12 +32,12 @@ def parse( @dataclass class GzipParser(Parser): def parse( - self, data: BufferedDataInput | Iterable, *args, **kwargs + self, data: BufferedIOBase, *args, **kwargs ) -> Generator[MutableMapping[str, Any], None, None]: """ Decompress gzipped bytes and pass decompressed data to the inner parser. """ - gzipobj = gzip.GzipFile(fileobj=data) + gzipobj = gzip.GzipFile(fileobj=data, mode="rb") if self.inner_parser: yield from self.inner_parser.parse(gzipobj) else: @@ -53,12 +49,13 @@ class JsonLineParser(Parser): encoding: Optional[str] = "utf-8" def parse( - self, data: BufferedDataInput | Iterable, *args, **kwargs + self, data: BufferedIOBase, *args, **kwargs ) -> Generator[MutableMapping[str, Any], None, None]: for line in data: try: yield json.loads(line.decode(self.encoding)) except json.JSONDecodeError: + logger.warning(f"Cannot decode/parse line {line} as JSON") # Handle invalid JSON lines gracefully (e.g., log and skip) pass @@ -67,10 +64,10 @@ def parse( class CsvParser(Parser): # TODO: add more parameters: see read_csv for more details, e.g.: quotechar, headers, encoding: Optional[str] = "utf-8" - delimiter: str = "," + delimiter: Optional[str] = "," def parse( - self, data: BufferedDataInput | Iterable, *args, **kwargs + self, data: BufferedIOBase, *args, **kwargs ) -> Generator[MutableMapping[str, Any], None, None]: """ Parse CSV data from decompressed bytes. From 17add9e349b65a2093d72d500fda38a99f95c604 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 19 Dec 2024 12:58:06 +0100 Subject: [PATCH 04/26] Composite Decoder: remove inner_parser from parser definition Signed-off-by: Artem Inzhyyants --- .../sources/declarative/decoders/composite_decoder.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py index ddea5aaa3..51af3c3e1 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -17,8 +17,6 @@ @dataclass class Parser(ABC): - inner_parser: Optional["Parser"] = None - @abstractmethod def parse( self, data: BufferedIOBase, *args, **kwargs @@ -31,6 +29,8 @@ def parse( @dataclass class GzipParser(Parser): + inner_parser: Parser + def parse( self, data: BufferedIOBase, *args, **kwargs ) -> Generator[MutableMapping[str, Any], None, None]: @@ -38,10 +38,7 @@ def parse( Decompress gzipped bytes and pass decompressed data to the inner parser. """ gzipobj = gzip.GzipFile(fileobj=data, mode="rb") - if self.inner_parser: - yield from self.inner_parser.parse(gzipobj) - else: - yield from gzipobj + yield from self.inner_parser.parse(gzipobj) @dataclass From 655ce355b4e2fd00323a986faa3fc2e8c5e6743d Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 19 Dec 2024 12:58:13 +0100 Subject: [PATCH 05/26] Composite Decoder: ref models Signed-off-by: Artem Inzhyyants --- .../declarative/declarative_component_schema.yaml | 15 +++++++++++++++ .../models/declarative_component_schema.py | 5 ++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 1b6622b23..00d226864 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2727,8 +2727,12 @@ definitions: description: "(This is experimental, use at your own risk)" type: object required: + - type - parser properties: + type: + type: string + enum: [ CompositeRawDecoder ] parser: anyOf: - "$ref": "#/definitions/GzipParser" @@ -2737,7 +2741,13 @@ definitions: # PARSERS GzipParser: type: object + required: + - type + - inner_parser properties: + type: + type: string + enum: [ GzipParser ] inner_parser: anyOf: - "$ref": "#/definitions/JsonLineParser" @@ -2750,7 +2760,12 @@ definitions: default: utf-8 CsvParser: type: object + required: + - type properties: + type: + type: string + enum: [ CsvParser ] encoding: type: string default: utf-8 diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 043c6006b..b33d7ceb6 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1114,6 +1114,7 @@ class JsonLineParser(BaseModel): class CsvParser(BaseModel): + type: Literal["CsvParser"] encoding: Optional[str] = "utf-8" delimiter: Optional[str] = "," @@ -1498,7 +1499,8 @@ class RecordSelector(BaseModel): class GzipParser(BaseModel): - inner_parser: Optional[Union[JsonLineParser, CsvParser]] = None + type: Literal["GzipParser"] + inner_parser: Union[JsonLineParser, CsvParser] class Spec(BaseModel): @@ -1532,6 +1534,7 @@ class CompositeErrorHandler(BaseModel): class CompositeRawDecoder(BaseModel): + type: Literal["CompositeRawDecoder"] parser: Union[GzipParser, JsonLineParser, CsvParser] From 513aa437b318d9ed9bf5a074e98fd503e8945234 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 19 Dec 2024 13:14:27 +0100 Subject: [PATCH 06/26] Composite Decoder: clean Signed-off-by: Artem Inzhyyants --- .../declarative/decoders/composite_decoder.py | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py index 51af3c3e1..f990ca0ba 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -96,26 +96,3 @@ def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: yield from self.parser.parse(data=response.raw) - - -# Examples how to use -if __name__ == "__main__": - # SIMPLE JSONLINES - # composite_decoder = CompositeDecoder(parser=JsonLineParser()) - # response = requests.get('http://127.0.0.1:5000/jsonlines', stream=True) - # for rec in composite_decoder.decode(response): - # print(rec) - - # Gzipped JSONLINES - # parser = GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")) - # composite_decoder = CompositeDecoder(parser=parser) - # response = requests.get('http://127.0.0.1:5000/jsonlines', stream=True) - # for rec in composite_decoder.decode(response): - # print(rec) - - # Gzipped TSV - parser = GzipParser(inner_parser=CsvParser(encoding="iso-8859-1", delimiter="\t")) - composite_decoder = CompositeRawDecoder(parser=parser) - response = requests.get("http://127.0.0.1:5000/csv", stream=True) - for rec in composite_decoder.decode(response): - print(rec) From eada5bf24042b4f8fba206d40b6fdd0b1792d56d Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 19 Dec 2024 15:16:26 +0100 Subject: [PATCH 07/26] Composite Decoder: ref todo Signed-off-by: Artem Inzhyyants --- airbyte_cdk/sources/declarative/decoders/composite_decoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py index f990ca0ba..9eea736b6 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -59,7 +59,7 @@ def parse( @dataclass class CsvParser(Parser): - # TODO: add more parameters: see read_csv for more details, e.g.: quotechar, headers, + # TODO: migrate implementation to re-use file-base classes encoding: Optional[str] = "utf-8" delimiter: Optional[str] = "," From 1984ef17e3fbdd62888e8642cf41510dcfeda8a0 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 19 Dec 2024 15:17:35 +0100 Subject: [PATCH 08/26] Composite Decoder: remove args & kwargs Signed-off-by: Artem Inzhyyants --- .../declarative/decoders/composite_decoder.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py index 9eea736b6..6816875ae 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -19,7 +19,8 @@ class Parser(ABC): @abstractmethod def parse( - self, data: BufferedIOBase, *args, **kwargs + self, + data: BufferedIOBase, ) -> Generator[MutableMapping[str, Any], None, None]: """ Parse data and yield dictionaries. @@ -32,7 +33,8 @@ class GzipParser(Parser): inner_parser: Parser def parse( - self, data: BufferedIOBase, *args, **kwargs + self, + data: BufferedIOBase, ) -> Generator[MutableMapping[str, Any], None, None]: """ Decompress gzipped bytes and pass decompressed data to the inner parser. @@ -46,7 +48,8 @@ class JsonLineParser(Parser): encoding: Optional[str] = "utf-8" def parse( - self, data: BufferedIOBase, *args, **kwargs + self, + data: BufferedIOBase, ) -> Generator[MutableMapping[str, Any], None, None]: for line in data: try: @@ -64,7 +67,8 @@ class CsvParser(Parser): delimiter: Optional[str] = "," def parse( - self, data: BufferedIOBase, *args, **kwargs + self, + data: BufferedIOBase, ) -> Generator[MutableMapping[str, Any], None, None]: """ Parse CSV data from decompressed bytes. From 8d3f82a6744b39235456e66fdeeeb7023caa1b2d Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 19 Dec 2024 16:28:50 +0100 Subject: [PATCH 09/26] Composite Decoder: fmt mypy Signed-off-by: Artem Inzhyyants --- .../sources/declarative/decoders/composite_decoder.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py index 6816875ae..b4e1e314a 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -53,9 +53,9 @@ def parse( ) -> Generator[MutableMapping[str, Any], None, None]: for line in data: try: - yield json.loads(line.decode(self.encoding)) + yield json.loads(line.decode(encoding=self.encoding)) except json.JSONDecodeError: - logger.warning(f"Cannot decode/parse line {line} as JSON") + logger.warning(f"Cannot decode/parse line {line!r} as JSON") # Handle invalid JSON lines gracefully (e.g., log and skip) pass @@ -73,7 +73,9 @@ def parse( """ Parse CSV data from decompressed bytes. """ - reader = pd.read_csv(data, sep=self.delimiter, iterator=True, dtype=object) + reader = pd.read_csv( + data, sep=self.delimiter, iterator=True, dtype=object, encoding=self.encoding + ) for chunk in reader: chunk = chunk.replace({nan: None}).to_dict(orient="records") for row in chunk: From f2c9b0a5fe805ac1407ea30afc3628feffba44f5 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 23 Dec 2024 13:55:02 +0100 Subject: [PATCH 10/26] Composite Decoder: add to model factory Signed-off-by: Artem Inzhyyants --- .../parsers/model_to_component_factory.py | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 215d6fff9..46ae171cc 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -26,6 +26,12 @@ from isodate import parse_duration from pydantic.v1 import BaseModel +from sources.declarative.decoders.composite_decoder import ( + CompositeRawDecoder, + CsvParser, + GzipParser, + JsonLineParser, +) from airbyte_cdk.models import FailureType, Level from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager @@ -125,6 +131,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CompositeErrorHandler as CompositeErrorHandlerModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CompositeRawDecoder as CompositeRawDecoderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConcurrencyLevel as ConcurrencyLevelModel, ) @@ -134,6 +143,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConstantBackoffStrategy as ConstantBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CsvParser as CsvParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CursorPagination as CursorPaginationModel, ) @@ -200,6 +212,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipJsonDecoder as GzipJsonDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + GzipParser as GzipParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpComponentsResolver as HttpComponentsResolverModel, ) @@ -224,6 +239,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonlDecoder as JsonlDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + JsonLineParser as JsonLineParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtAuthenticator as JwtAuthenticatorModel, ) @@ -470,7 +488,9 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, + JsonLineParserModel: self.create_jsonline_parser, GzipJsonDecoderModel: self.create_gzipjson_decoder, + GzipParserModel: self.create_gzip_parser, KeysToLowerModel: self.create_keys_to_lower_transformation, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, @@ -1666,6 +1686,12 @@ def create_jsonl_decoder( ) -> JsonlDecoder: return JsonlDecoder(parameters={}) + @staticmethod + def create_jsonline_parser( + model: JsonLineParserModel, config: Config, **kwargs: Any + ) -> JsonLineParser: + return JsonLineParser(encoding=model.encoding) + @staticmethod def create_iterable_decoder( model: IterableDecoderModel, config: Config, **kwargs: Any @@ -1682,6 +1708,20 @@ def create_gzipjson_decoder( ) -> GzipJsonDecoder: return GzipJsonDecoder(parameters={}, encoding=model.encoding) + @staticmethod + def create_gzip_parser(model: GzipParserModel, config: Config, **kwargs: Any) -> GzipParser: + return GzipParser(inner_parser=model.inner_parser) + + @staticmethod + def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser: + return CsvParser(encoding=model.encoding, delimiter=model.delimiter) + + @staticmethod + def create_composite_raw_decoder( + model: CompositeRawDecoderModel, config: Config, **kwargs: Any + ) -> CompositeRawDecoder: + return CompositeRawDecoder(parser=model.parser) + @staticmethod def create_json_file_schema_loader( model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any From 961356c1cd50b5b87168e7fe7d2fafce383d3c3d Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 23 Dec 2024 15:49:03 +0100 Subject: [PATCH 11/26] Composite Decoder: add to model factory Signed-off-by: Artem Inzhyyants --- .../parsers/model_to_component_factory.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 46ae171cc..1f98e1468 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -26,12 +26,6 @@ from isodate import parse_duration from pydantic.v1 import BaseModel -from sources.declarative.decoders.composite_decoder import ( - CompositeRawDecoder, - CsvParser, - GzipParser, - JsonLineParser, -) from airbyte_cdk.models import FailureType, Level from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager @@ -73,6 +67,12 @@ PaginationDecoderDecorator, XmlDecoder, ) +from airbyte_cdk.sources.declarative.decoders.composite_decoder import ( + CompositeRawDecoder, + CsvParser, + GzipParser, + JsonLineParser, +) from airbyte_cdk.sources.declarative.extractors import ( DpathExtractor, RecordFilter, From 5fa937c213b9c78ec625ad966ca2599036a07599 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 23 Dec 2024 19:32:08 +0100 Subject: [PATCH 12/26] Composite Raw Decoder: add unittest for parsers Signed-off-by: Artem Inzhyyants --- .../sources/declarative/decoders/__init__.py | 3 +- .../decoders/test_composite_decoder.py | 115 ++++++++++++++++++ 2 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 unit_tests/sources/declarative/decoders/test_composite_decoder.py diff --git a/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte_cdk/sources/declarative/decoders/__init__.py index 7452fe998..e83426e85 100644 --- a/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -7,5 +7,6 @@ from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder +from airbyte_cdk.sources.declarative.decoders.composite_decoder import CompositeRawDecoder -__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"] +__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder", "CompositeRawDecoder"] diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py new file mode 100644 index 000000000..325862e14 --- /dev/null +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -0,0 +1,115 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# +import csv +import gzip +import json +from io import BytesIO, StringIO + +import pytest +import requests +from sources.declarative.decoders.composite_decoder import CsvParser, GzipParser, JsonLineParser + +from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder + + +def compress_with_gzip(data: str, encoding: str = "utf-8"): + """ + Compress the data using Gzip. + """ + buf = BytesIO() + with gzip.GzipFile(fileobj=buf, mode="wb") as f: + f.write(data.encode(encoding)) + return buf.getvalue() + + +def generate_csv(encoding: str) -> bytes: + """ + Generate CSV data with tab-separated values (\t). + """ + data = [ + {"id": 1, "name": "John", "age": 28}, + {"id": 2, "name": "Alice", "age": 34}, + {"id": 3, "name": "Bob", "age": 25}, + ] + + output = StringIO() + writer = csv.DictWriter(output, fieldnames=["id", "name", "age"], delimiter="\t") + writer.writeheader() + for row in data: + writer.writerow(row) + + # Ensure the pointer is at the beginning of the buffer before compressing + output.seek(0) + + # Compress the CSV data with Gzip + compressed_data = compress_with_gzip(output.read(), encoding=encoding) + + return compressed_data + + +@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) +def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=generate_csv(encoding=encoding) + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = GzipParser(inner_parser=CsvParser(encoding=encoding, delimiter="\t")) + composite_decoder = CompositeRawDecoder(parser=parser) + counter = 0 + for _ in composite_decoder.decode(response): + counter += 1 + assert counter == 3 + + +def generate_jsonlines(): + """ + Generator function to yield data in JSON Lines format. + This is useful for streaming large datasets. + """ + data = [ + {"id": 1, "message": "Hello, World!"}, + {"id": 2, "message": "Welcome to JSON Lines"}, + {"id": 3, "message": "Streaming data is fun!"}, + ] + for item in data: + yield json.dumps(item) + "\n" # Serialize as JSON Lines + + +def generate_compressed_jsonlines(encoding: str = "utf-8") -> bytes: + """ + Generator to compress the entire response content with Gzip and encode it. + """ + json_lines_content = "".join(generate_jsonlines()) + compressed_data = compress_with_gzip(json_lines_content, encoding=encoding) + return compressed_data + + +@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) +def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str): + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=generate_compressed_jsonlines(encoding=encoding) + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding)) + composite_decoder = CompositeRawDecoder(parser=parser) + counter = 0 + for _ in composite_decoder.decode(response): + counter += 1 + assert counter == 3 + + +@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) +def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): + response_content = "".join(generate_jsonlines()) + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=response_content.encode(encoding=encoding) + ) + response = requests.get("https://airbyte.io/", stream=True) + composite_decoder = CompositeRawDecoder(parser=JsonLineParser(encoding=encoding)) + counter = 0 + for _ in composite_decoder.decode(response): + counter += 1 + assert counter == 3 From 1b85c26918c99e5dd2ee1ff20c49fec07599db13 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 23 Dec 2024 19:59:36 +0100 Subject: [PATCH 13/26] Composite Raw Decoder: fix CompositeRawDecoder creation Signed-off-by: Artem Inzhyyants --- .../declarative/parsers/model_to_component_factory.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 1f98e1468..7e80045e4 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -458,6 +458,7 @@ def _init_mappings(self) -> None: BearerAuthenticatorModel: self.create_bearer_authenticator, CheckStreamModel: self.create_check_stream, CompositeErrorHandlerModel: self.create_composite_error_handler, + CompositeRawDecoderModel: self.create_composite_raw_decoder, ConcurrencyLevelModel: self.create_concurrency_level, ConstantBackoffStrategyModel: self.create_constant_backoff_strategy, CursorPaginationModel: self.create_cursor_pagination, @@ -1716,11 +1717,11 @@ def create_gzip_parser(model: GzipParserModel, config: Config, **kwargs: Any) -> def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser: return CsvParser(encoding=model.encoding, delimiter=model.delimiter) - @staticmethod def create_composite_raw_decoder( - model: CompositeRawDecoderModel, config: Config, **kwargs: Any + self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any ) -> CompositeRawDecoder: - return CompositeRawDecoder(parser=model.parser) + parser = self._create_component_from_model(model=model.parser, config=config) + return CompositeRawDecoder(parser=parser) @staticmethod def create_json_file_schema_loader( From a181608eb4b45db40790ee0e6eaf5aeb6dac3cbb Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 23 Dec 2024 20:01:01 +0100 Subject: [PATCH 14/26] Composite Raw Decoder: ref: CompositeRawDecoder & jsonlinedecoder Signed-off-by: Artem Inzhyyants --- .../decoders/test_decoders_memory_usage.py | 106 ++++++++++++++++++ .../declarative/decoders/test_json_decoder.py | 68 +---------- 2 files changed, 107 insertions(+), 67 deletions(-) create mode 100644 unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py diff --git a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py new file mode 100644 index 000000000..e0cbe4c8c --- /dev/null +++ b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py @@ -0,0 +1,106 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# +import gzip +import json +import os + +import pytest +import requests + +from airbyte_cdk import YamlDeclarativeSource +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.decoders import GzipJsonDecoder +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder +from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel +from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( + ModelToComponentFactory, +) + + +@pytest.fixture(name="large_events_response") +def large_event_response_fixture(): + data = {"email": "email1@example.com"} + jsonl_string = f"{json.dumps(data)}\n" + lines_in_response = 2_000_000 # ≈ 58 MB of response + dir_path = os.path.dirname(os.path.realpath(__file__)) + file_path = f"{dir_path}/test_response.txt" + with open(file_path, "w") as file: + for _ in range(lines_in_response): + file.write(jsonl_string) + yield (lines_in_response, file_path) + os.remove(file_path) + + +@pytest.mark.slow +@pytest.mark.limit_memory("20 MB") +@pytest.mark.parametrize( + "decoder_yaml_definition", + [ + "type: JsonlDecoder", + """type: CompositeRawDecoder + parser: + type: JsonLineParser + """, + ], +) +def test_jsonl_decoder_memory_usage( + requests_mock, large_events_response, decoder_yaml_definition: str +): + # + lines_in_response, file_path = large_events_response + content = f""" + name: users + type: DeclarativeStream + retriever: + type: SimpleRetriever + decoder: + {decoder_yaml_definition} + paginator: + type: "NoPagination" + requester: + path: "users/{{{{ stream_slice.slice }}}}" + type: HttpRequester + url_base: "https://for-all-mankind.nasa.com/api/v1" + http_method: GET + authenticator: + type: NoAuth + request_headers: {{}} + request_body_json: {{}} + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + partition_router: + type: ListPartitionRouter + cursor_field: "slice" + values: + - users1 + - users2 + - users3 + - users4 + primary_key: [] + """ + + factory = ModelToComponentFactory() + stream_manifest = YamlDeclarativeSource._parse(content) + stream = factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_manifest, config={} + ) + + def get_body(): + return open(file_path, "rb", buffering=30) + + counter = 0 + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users1", body=get_body()) + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users2", body=get_body()) + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body()) + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body()) + + stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) + for stream_slice in stream_slices: + for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice): + counter += 1 + + assert counter == lines_in_response * len(stream_slices) diff --git a/unit_tests/sources/declarative/decoders/test_json_decoder.py b/unit_tests/sources/declarative/decoders/test_json_decoder.py index bb2dd0c9b..f61a0572e 100644 --- a/unit_tests/sources/declarative/decoders/test_json_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_json_decoder.py @@ -8,14 +8,9 @@ import pytest import requests -from airbyte_cdk import YamlDeclarativeSource -from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.decoders import GzipJsonDecoder from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder -from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel -from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( - ModelToComponentFactory, -) + @pytest.mark.parametrize( @@ -64,67 +59,6 @@ def large_event_response_fixture(): os.remove(file_path) -@pytest.mark.slow -@pytest.mark.limit_memory("20 MB") -def test_jsonl_decoder_memory_usage(requests_mock, large_events_response): - lines_in_response, file_path = large_events_response - content = """ - name: users - type: DeclarativeStream - retriever: - type: SimpleRetriever - decoder: - type: JsonlDecoder - paginator: - type: "NoPagination" - requester: - path: "users/{{ stream_slice.slice }}" - type: HttpRequester - url_base: "https://for-all-mankind.nasa.com/api/v1" - http_method: GET - authenticator: - type: NoAuth - request_headers: {} - request_body_json: {} - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - partition_router: - type: ListPartitionRouter - cursor_field: "slice" - values: - - users1 - - users2 - - users3 - - users4 - primary_key: [] - """ - - factory = ModelToComponentFactory() - stream_manifest = YamlDeclarativeSource._parse(content) - stream = factory.create_component( - model_type=DeclarativeStreamModel, component_definition=stream_manifest, config={} - ) - - def get_body(): - return open(file_path, "rb", buffering=30) - - counter = 0 - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users1", body=get_body()) - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users2", body=get_body()) - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body()) - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body()) - - stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) - for stream_slice in stream_slices: - for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice): - counter += 1 - - assert counter == lines_in_response * len(stream_slices) - - @pytest.mark.parametrize( "encoding", [ From 5276ed11d090030201c139c637e922c60ac96f7b Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 23 Dec 2024 21:07:28 +0100 Subject: [PATCH 15/26] Composite Raw Decoder: fmt Signed-off-by: Artem Inzhyyants --- .../declarative/decoders/test_composite_decoder.py | 8 ++++++-- .../sources/declarative/decoders/test_json_decoder.py | 1 - 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 325862e14..ce5f58b65 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -8,9 +8,13 @@ import pytest import requests -from sources.declarative.decoders.composite_decoder import CsvParser, GzipParser, JsonLineParser -from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder +from airbyte_cdk.sources.declarative.decoders.composite_decoder import ( + CompositeRawDecoder, + CsvParser, + GzipParser, + JsonLineParser, +) def compress_with_gzip(data: str, encoding: str = "utf-8"): diff --git a/unit_tests/sources/declarative/decoders/test_json_decoder.py b/unit_tests/sources/declarative/decoders/test_json_decoder.py index f61a0572e..087619dc9 100644 --- a/unit_tests/sources/declarative/decoders/test_json_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_json_decoder.py @@ -12,7 +12,6 @@ from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder - @pytest.mark.parametrize( "response_body, first_element", [ From adf9d3d484973016426b6d409dfdfde1a25a2550 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 24 Dec 2024 00:22:56 +0100 Subject: [PATCH 16/26] Composite Raw Decoder: fix mypy Signed-off-by: Artem Inzhyyants --- .../sources/declarative/decoders/composite_decoder.py | 2 +- .../declarative/parsers/model_to_component_factory.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py index b4e1e314a..e3d13d004 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -101,4 +101,4 @@ def is_stream_response(self) -> bool: def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: - yield from self.parser.parse(data=response.raw) + yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 7e80045e4..99801c6dd 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1709,9 +1709,11 @@ def create_gzipjson_decoder( ) -> GzipJsonDecoder: return GzipJsonDecoder(parameters={}, encoding=model.encoding) - @staticmethod - def create_gzip_parser(model: GzipParserModel, config: Config, **kwargs: Any) -> GzipParser: - return GzipParser(inner_parser=model.inner_parser) + def create_gzip_parser( + self, model: GzipParserModel, config: Config, **kwargs: Any + ) -> GzipParser: + inner_parser = self._create_component_from_model(model=model.inner_parser, config=config) + return GzipParser(inner_parser=inner_parser) @staticmethod def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser: From 7604b994d904da9ee1838ed56f183ae0b79a2336 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 24 Dec 2024 11:39:53 +0100 Subject: [PATCH 17/26] Composite Raw Decoder: fix mypy Signed-off-by: Artem Inzhyyants --- .../sources/declarative/decoders/composite_decoder.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py index e3d13d004..ba4ff5f6d 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -53,11 +53,9 @@ def parse( ) -> Generator[MutableMapping[str, Any], None, None]: for line in data: try: - yield json.loads(line.decode(encoding=self.encoding)) + yield json.loads(line.decode(encoding=self.encoding or "utf-8")) except json.JSONDecodeError: logger.warning(f"Cannot decode/parse line {line!r} as JSON") - # Handle invalid JSON lines gracefully (e.g., log and skip) - pass @dataclass @@ -73,7 +71,7 @@ def parse( """ Parse CSV data from decompressed bytes. """ - reader = pd.read_csv( + reader = pd.read_csv( # type: ignore data, sep=self.delimiter, iterator=True, dtype=object, encoding=self.encoding ) for chunk in reader: From f9a97dbb868763523f0582dde85bd76beec7e10f Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 24 Dec 2024 13:37:12 +0100 Subject: [PATCH 18/26] CDK: fix conflicts Signed-off-by: Artem Inzhyyants --- airbyte_cdk/sources/declarative/decoders/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte_cdk/sources/declarative/decoders/__init__.py index f634372e8..9fbcb1638 100644 --- a/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from airbyte_cdk.sources.declarative.decoders.composite_decoder import CompositeRawDecoder from airbyte_cdk.sources.declarative.decoders.decoder import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import ( GzipJsonDecoder, @@ -14,11 +15,10 @@ PaginationDecoderDecorator, ) from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder -from airbyte_cdk.sources.declarative.decoders.composite_decoder import CompositeRawDecoder __all__ = [ "Decoder", -"CompositeRawDecoder", + "CompositeRawDecoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", From 7fd73adba1bae0ceae3e390117024a98d4e793e0 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 26 Dec 2024 11:12:17 +0100 Subject: [PATCH 19/26] CDK: add type for JsonLineParser Signed-off-by: Artem Inzhyyants --- .../sources/declarative/declarative_component_schema.yaml | 5 +++++ .../declarative/models/declarative_component_schema.py | 3 +++ 2 files changed, 8 insertions(+) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 74726acaf..4ae1e37a0 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2801,7 +2801,12 @@ definitions: - "$ref": "#/definitions/CsvParser" JsonLineParser: type: object + required: + - type properties: + type: + type: string + enum: [ JsonLineParser ] encoding: type: string default: utf-8 diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 4c0d9f2cc..5823b34c1 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1126,6 +1126,7 @@ class LegacySessionTokenAuthenticator(BaseModel): class JsonLineParser(BaseModel): + type: Literal["JsonLineParser"] encoding: Optional[str] = "utf-8" @@ -1218,6 +1219,8 @@ class ComponentMappingDefinition(BaseModel): "{{ components_values['updates'] }}", "{{ components_values['MetaData']['LastUpdatedTime'] }}", "{{ config['segment_id'] }}", + "{{ stream_slice['parent_id'] }}", + "{{ stream_slice['extra_fields']['name'] }}", ], title="Value", ) From 693a82d0d217116f5c55b80be7c9a37d35484523 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 26 Dec 2024 11:35:07 +0100 Subject: [PATCH 20/26] CDK: rename Signed-off-by: Artem Inzhyyants --- .../sources/declarative/decoders/__init__.py | 2 +- ...mposite_decoder.py => composite_raw_decoder.py} | 2 +- .../parsers/model_to_component_factory.py | 2 +- .../declarative/decoders/test_composite_decoder.py | 14 +++++++------- 4 files changed, 10 insertions(+), 10 deletions(-) rename airbyte_cdk/sources/declarative/decoders/{composite_decoder.py => composite_raw_decoder.py} (95%) diff --git a/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte_cdk/sources/declarative/decoders/__init__.py index 9fbcb1638..bec52137d 100644 --- a/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -2,7 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from airbyte_cdk.sources.declarative.decoders.composite_decoder import CompositeRawDecoder +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import CompositeRawDecoder from airbyte_cdk.sources.declarative.decoders.decoder import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import ( GzipJsonDecoder, diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py similarity index 95% rename from airbyte_cdk/sources/declarative/decoders/composite_decoder.py rename to airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index ba4ff5f6d..df3a99f3e 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -88,7 +88,7 @@ class CompositeRawDecoder(Decoder): Note: response.raw is not decoded/decompressed by default. parsers should be instantiated recursively. Example: - composite_decoder = CompositeDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1"))) + composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1"))) """ parser: Parser diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 9fbfdec9c..ea6fac512 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -67,7 +67,7 @@ PaginationDecoderDecorator, XmlDecoder, ) -from airbyte_cdk.sources.declarative.decoders.composite_decoder import ( +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import ( CompositeRawDecoder, CsvParser, GzipParser, diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index ce5f58b65..9cd057791 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -9,7 +9,7 @@ import pytest import requests -from airbyte_cdk.sources.declarative.decoders.composite_decoder import ( +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import ( CompositeRawDecoder, CsvParser, GzipParser, @@ -60,9 +60,9 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): response = requests.get("https://airbyte.io/", stream=True) parser = GzipParser(inner_parser=CsvParser(encoding=encoding, delimiter="\t")) - composite_decoder = CompositeRawDecoder(parser=parser) + composite_raw_decoder = CompositeRawDecoder(parser=parser) counter = 0 - for _ in composite_decoder.decode(response): + for _ in composite_raw_decoder.decode(response): counter += 1 assert counter == 3 @@ -98,9 +98,9 @@ def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str response = requests.get("https://airbyte.io/", stream=True) parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding)) - composite_decoder = CompositeRawDecoder(parser=parser) + composite_raw_decoder = CompositeRawDecoder(parser=parser) counter = 0 - for _ in composite_decoder.decode(response): + for _ in composite_raw_decoder.decode(response): counter += 1 assert counter == 3 @@ -112,8 +112,8 @@ def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): "GET", "https://airbyte.io/", content=response_content.encode(encoding=encoding) ) response = requests.get("https://airbyte.io/", stream=True) - composite_decoder = CompositeRawDecoder(parser=JsonLineParser(encoding=encoding)) + composite_raw_decoder = CompositeRawDecoder(parser=JsonLineParser(encoding=encoding)) counter = 0 - for _ in composite_decoder.decode(response): + for _ in composite_raw_decoder.decode(response): counter += 1 assert counter == 3 From ca8f31eee46db45f8362808bcb709dc52e9a1bb9 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 26 Dec 2024 11:37:07 +0100 Subject: [PATCH 21/26] CDK: run prettier Signed-off-by: Artem Inzhyyants --- .github/workflows/pypi_publish.yml | 2 +- .../declarative/declarative_component_schema.yaml | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/pypi_publish.yml b/.github/workflows/pypi_publish.yml index 4df1018b2..302532051 100644 --- a/.github/workflows/pypi_publish.yml +++ b/.github/workflows/pypi_publish.yml @@ -10,7 +10,7 @@ name: Packaging and Publishing on: push: tags: - - 'v*' + - "v*" workflow_dispatch: inputs: version: diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 4ae1e37a0..89c731075 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2779,13 +2779,13 @@ definitions: properties: type: type: string - enum: [ CompositeRawDecoder ] + enum: [CompositeRawDecoder] parser: anyOf: - "$ref": "#/definitions/GzipParser" - "$ref": "#/definitions/JsonLineParser" - "$ref": "#/definitions/CsvParser" -# PARSERS + # PARSERS GzipParser: type: object required: @@ -2794,7 +2794,7 @@ definitions: properties: type: type: string - enum: [ GzipParser ] + enum: [GzipParser] inner_parser: anyOf: - "$ref": "#/definitions/JsonLineParser" @@ -2806,7 +2806,7 @@ definitions: properties: type: type: string - enum: [ JsonLineParser ] + enum: [JsonLineParser] encoding: type: string default: utf-8 @@ -2817,7 +2817,7 @@ definitions: properties: type: type: string - enum: [ CsvParser ] + enum: [CsvParser] encoding: type: string default: utf-8 From 46e80d3bb3001cb2d3335ea35f2583ec2590e93d Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 26 Dec 2024 11:38:23 +0100 Subject: [PATCH 22/26] CDK: ref Signed-off-by: Artem Inzhyyants --- .../sources/declarative/parsers/model_to_component_factory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index ea6fac512..694cb1042 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -504,7 +504,7 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, - JsonLineParserModel: self.create_jsonline_parser, + JsonLineParserModel: self.create_json_line_parser, GzipJsonDecoderModel: self.create_gzipjson_decoder, GzipParserModel: self.create_gzip_parser, KeysToLowerModel: self.create_keys_to_lower_transformation, @@ -1723,7 +1723,7 @@ def create_jsonl_decoder( return JsonlDecoder(parameters={}) @staticmethod - def create_jsonline_parser( + def create_json_line_parser( model: JsonLineParserModel, config: Config, **kwargs: Any ) -> JsonLineParser: return JsonLineParser(encoding=model.encoding) From 61a39195a557cfff0496cae269f70bbf6ee89444 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 26 Dec 2024 13:20:05 +0100 Subject: [PATCH 23/26] CDK: ref to csv.DictReader Signed-off-by: Artem Inzhyyants --- .../declarative/decoders/composite_raw_decoder.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index df3a99f3e..476f7871d 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -1,14 +1,13 @@ +import csv import gzip import json import logging from abc import ABC, abstractmethod from dataclasses import dataclass -from io import BufferedIOBase +from io import BufferedIOBase, TextIOWrapper from typing import Any, Generator, MutableMapping, Optional -import pandas as pd import requests -from numpy import nan from airbyte_cdk.sources.declarative.decoders.decoder import Decoder @@ -71,13 +70,9 @@ def parse( """ Parse CSV data from decompressed bytes. """ - reader = pd.read_csv( # type: ignore - data, sep=self.delimiter, iterator=True, dtype=object, encoding=self.encoding - ) - for chunk in reader: - chunk = chunk.replace({nan: None}).to_dict(orient="records") - for row in chunk: - yield row + text_data = TextIOWrapper(data, encoding=self.encoding) + reader = csv.DictReader(text_data, delimiter=self.delimiter) + yield from reader @dataclass From 2ff8edfa8da66112a902d2c36ef371b472542b19 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 26 Dec 2024 15:53:24 +0100 Subject: [PATCH 24/26] CDK: fix mypy Signed-off-by: Artem Inzhyyants --- .../sources/declarative/decoders/composite_raw_decoder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 476f7871d..f7a079f42 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -70,8 +70,8 @@ def parse( """ Parse CSV data from decompressed bytes. """ - text_data = TextIOWrapper(data, encoding=self.encoding) - reader = csv.DictReader(text_data, delimiter=self.delimiter) + text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore + reader = csv.DictReader(text_data, delimiter=self.delimiter or ",") yield from reader From 9641d4d4454fa65de1c07151045c9246dd4d782e Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 26 Dec 2024 15:58:25 +0100 Subject: [PATCH 25/26] CDK: apply coderabbit suggestions Signed-off-by: Artem Inzhyyants --- .../sources/declarative/decoders/composite_raw_decoder.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index f7a079f42..60f1407c0 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -38,8 +38,8 @@ def parse( """ Decompress gzipped bytes and pass decompressed data to the inner parser. """ - gzipobj = gzip.GzipFile(fileobj=data, mode="rb") - yield from self.inner_parser.parse(gzipobj) + with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: + yield from self.inner_parser.parse(gzipobj) @dataclass @@ -54,7 +54,7 @@ def parse( try: yield json.loads(line.decode(encoding=self.encoding or "utf-8")) except json.JSONDecodeError: - logger.warning(f"Cannot decode/parse line {line!r} as JSON") + logger.warning(f"Cannot decode/parse line {line!r} as JSON, error: {e}") @dataclass From 2b6cfb5fc3fee90ba2e05379c8df4daa3f048bd9 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 26 Dec 2024 16:03:43 +0100 Subject: [PATCH 26/26] CDK: fix Signed-off-by: Artem Inzhyyants --- .../sources/declarative/decoders/composite_raw_decoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 60f1407c0..49653296d 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -53,7 +53,7 @@ def parse( for line in data: try: yield json.loads(line.decode(encoding=self.encoding or "utf-8")) - except json.JSONDecodeError: + except json.JSONDecodeError as e: logger.warning(f"Cannot decode/parse line {line!r} as JSON, error: {e}")