From 2120a18ba759297b0cd4a12fdcf09903ab42fd95 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Tue, 4 Mar 2025 19:00:26 +0200 Subject: [PATCH 01/12] add --- .../declarative_component_schema.yaml | 4 + .../extractors/response_to_file_extractor.py | 193 +++++++++++++----- .../models/declarative_component_schema.py | 4 + .../parsers/model_to_component_factory.py | 4 +- 4 files changed, 154 insertions(+), 51 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 6cd9998c7..3ff38b051 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1678,6 +1678,10 @@ definitions: type: type: string enum: [ResponseToFileExtractor] + file_type: + title: The file type in which the response data is storred. Supported types are [csv, jsonl]. + type: string + default: csv $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 76631ee6b..f0e2ccf94 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -7,6 +7,7 @@ import zlib from contextlib import closing from dataclasses import InitVar, dataclass +from enum import Enum from typing import Any, Dict, Iterable, Mapping, Optional, Tuple import pandas as pd @@ -20,21 +21,57 @@ DOWNLOAD_CHUNK_SIZE: int = 1024 * 10 +class FileTypes(Enum): + CSV = "csv" + JSONL = "jsonl" + + @dataclass class ResponseToFileExtractor(RecordExtractor): """ - This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as - a tradeoff. + This class is used when having very big HTTP responses (usually streamed), + which would require too much memory so we use disk space as a tradeoff. + + The extractor does the following: + 1) Save the response to a temporary file + 2) Read from the temporary file by chunks to avoid OOM + 3) Remove the temporary file after reading + 4) Return the records + 5) If the response is not compressed, it will be filtered for null bytes + 6) If the response is compressed, it will be decompressed + 7) If the response is compressed and contains null bytes, it will be filtered for null bytes - Eventually, we want to support multiple file type by re-using the file based CDK parsers if possible. However, the lift is too high for - a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing. """ parameters: InitVar[Mapping[str, Any]] + file_type: Optional[str] = "csv" def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.logger = logging.getLogger("airbyte") + def extract_records( + self, response: Optional[requests.Response] = None + ) -> Iterable[Mapping[str, Any]]: + """ + Extracts records from the given response by: + 1) Saving the result to a tmp file + 2) Reading from saved file by chunks to avoid OOM + + Args: + response (Optional[requests.Response]): The response object containing the data. Defaults to None. + + Yields: + Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records. 
+ + Returns: + None + """ + if response: + file_path, encoding = self._save_to_file(response) + yield from self._read_with_chunks(file_path, encoding) + else: + yield from [] + def _get_response_encoding(self, headers: Dict[str, Any]) -> str: """ Get the encoding of the response based on the provided headers. This method is heavily inspired by the requests library @@ -42,6 +79,7 @@ def _get_response_encoding(self, headers: Dict[str, Any]) -> str: Args: headers (Dict[str, Any]): The headers of the response. + Returns: str: The encoding of the response. """ @@ -73,11 +111,28 @@ def _filter_null_bytes(self, b: bytes) -> bytes: res = b.replace(b"\x00", b"") if len(res) < len(b): - self.logger.warning( - "Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res) - ) + message = "ResponseToFileExtractor._filter_null_bytes(): Filter 'null' bytes from string, size reduced %d -> %d chars" + self.logger.warning(message, len(b), len(res)) return res + def _get_file_path(self) -> str: + """ + Get a temporary file path with a unique name. + + Returns: + str: The path to the temporary file. + + Raises: + ValueError: If the file type is not supported. + """ + + if self.file_type not in [file_type.value for file_type in FileTypes]: + raise ValueError( + f"ResponseToFileExtractor._get_file_path(): File type {self.file_type} is not supported.", + ) + + return str(uuid.uuid4()) + "." + self.file_type + def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: """ Saves the binary data from the given response to a temporary file and returns the filepath and response encoding. @@ -95,8 +150,9 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32) needs_decompression = True # we will assume at first that the response is compressed and change the flag if not - tmp_file = str(uuid.uuid4()) - with closing(response) as response, open(tmp_file, "wb") as data_file: + file_path = self._get_file_path() + # save binary data to tmp file + with closing(response) as response, open(file_path, "wb") as data_file: response_encoding = self._get_response_encoding(dict(response.headers or {})) for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE): try: @@ -110,15 +166,76 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: needs_decompression = False # check the file exists - if os.path.isfile(tmp_file): - return tmp_file, response_encoding + if os.path.isfile(file_path): + return file_path, response_encoding else: - raise ValueError( - f"The IO/Error occured while verifying binary data. Tmp file {tmp_file} doesn't exist." - ) + message = "ResponseToFileExtractor._save_to_file(): The IO/Error occured while verifying binary data." + raise ValueError(f"{message} Tmp file {file_path} doesn't exist.") + + def _read_csv( + self, + path: str, + file_encoding: str, + chunk_size: int = 100, + ) -> Iterable[Mapping[str, Any]]: + """ + Reads a CSV file and yields each row as a dictionary. + + Args: + path (str): The path to the CSV file to be read. + file_encoding (str): The encoding of the file. + + Yields: + Mapping[str, Any]: A dictionary representing each row of data. 
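As a minimal standalone sketch of what the `_get_file_path()` helper above does: the enum values and the validation mirror the patch, while the concrete `file_type` value is only illustrative.

```python
import uuid
from enum import Enum


class FileTypes(Enum):
    CSV = "csv"
    JSONL = "jsonl"


file_type = "jsonl"  # illustrative; the component defaults to "csv"

# Mirrors _get_file_path(): reject unsupported types, then build a unique tmp name.
if file_type not in [t.value for t in FileTypes]:
    raise ValueError(f"File type {file_type} is not supported.")

print(str(uuid.uuid4()) + "." + file_type)  # e.g. '9f1c3b7a-....jsonl'
```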
+ """ + + csv_read_params = { + "chunksize": chunk_size, + "iterator": True, + "dialect": "unix", + "dtype": object, + "encoding": file_encoding, + } + + for chunk in pd.read_csv(path, **csv_read_params): + # replace NaN with None + chunk = chunk.replace({nan: None}).to_dict(orient="records") + for record in chunk: + yield record + + def _read_json_lines( + self, + path: str, + file_encoding: str, + chunk_size: int = 100, + ) -> Iterable[Mapping[str, Any]]: + """ + Reads a JSON file and yields each row as a dictionary. + + Args: + path (str): The path to the JSON file to be read. + file_encoding (str): The encoding of the file. + + Yields: + Mapping[str, Any]: A dictionary representing each row of data. + """ + + json_read_params = { + "lines": True, + "chunksize": chunk_size, + "encoding": file_encoding, + "convert_dates": False, + } + + for chunk in pd.read_json(path, **json_read_params): + for record in chunk.to_dict(orient="records"): + yield record def _read_with_chunks( - self, path: str, file_encoding: str, chunk_size: int = 100 + self, + path: str, + file_encoding: str, + chunk_size: int = 100, ) -> Iterable[Mapping[str, Any]]: """ Reads data from a file in chunks and yields each row as a dictionary. @@ -132,47 +249,23 @@ def _read_with_chunks( Mapping[str, Any]: A dictionary representing each row of data. Raises: - ValueError: If an IO/Error occurs while reading the temporary data. + ValueError: If an error occurs while reading the data from the file. """ try: - # TODO: Add support for other file types, like `json`, with `pd.read_json()` - with open(path, "r", encoding=file_encoding) as data: - chunks = pd.read_csv( - data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object - ) - for chunk in chunks: - chunk = chunk.replace({nan: None}).to_dict(orient="records") - for row in chunk: - yield row + if self.file_type == FileTypes.CSV.value: + yield from self._read_csv(path, file_encoding, chunk_size) + + if self.file_type == FileTypes.JSONL.value: + yield from self._read_json_lines(path, file_encoding, chunk_size) + except pd.errors.EmptyDataError as e: - self.logger.info(f"Empty data received. {e}") + message = "ResponseToFileExtractor._read_with_chunks(): Empty data received." + self.logger.info(f"{message} {e}") yield from [] except IOError as ioe: - raise ValueError(f"The IO/Error occured while reading tmp data. Called: {path}", ioe) + message = "ResponseToFileExtractor._read_with_chunks(): The IO/Error occured while reading the data from file." + raise ValueError(f"{message} Called: {path}", ioe) finally: # remove binary tmp file, after data is read os.remove(path) - - def extract_records( - self, response: Optional[requests.Response] = None - ) -> Iterable[Mapping[str, Any]]: - """ - Extracts records from the given response by: - 1) Saving the result to a tmp file - 2) Reading from saved file by chunks to avoid OOM - - Args: - response (Optional[requests.Response]): The response object containing the data. Defaults to None. - - Yields: - Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records. 
- - Returns: - None - """ - if response: - file_path, encoding = self._save_to_file(response) - yield from self._read_with_chunks(file_path, encoding) - else: - yield from [] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index a49b66c03..1a114981d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -702,6 +702,10 @@ class DpathExtractor(BaseModel): class ResponseToFileExtractor(BaseModel): type: Literal["ResponseToFileExtractor"] + file_type: Optional[str] = Field( + "csv", + title="The file type in which the response data is storred. Supported types are [csv, jsonl].", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 39058f834..39da868d7 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1992,7 +1992,9 @@ def create_response_to_file_extractor( model: ResponseToFileExtractorModel, **kwargs: Any, ) -> ResponseToFileExtractor: - return ResponseToFileExtractor(parameters=model.parameters or {}) + return ResponseToFileExtractor( + parameters=model.parameters or {}, file_type=model.file_type or "csv" + ) @staticmethod def create_exponential_backoff_strategy( From 5167da5fe990c9809a8afd37e18d39488e76ddb6 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Tue, 4 Mar 2025 19:34:41 +0200 Subject: [PATCH 02/12] fix linter issues --- .../declarative/extractors/response_to_file_extractor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index f0e2ccf94..266e10098 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -197,7 +197,7 @@ def _read_csv( "encoding": file_encoding, } - for chunk in pd.read_csv(path, **csv_read_params): + for chunk in pd.read_csv(path, **csv_read_params): # type: ignore # ignoring how args are passed # replace NaN with None chunk = chunk.replace({nan: None}).to_dict(orient="records") for record in chunk: @@ -227,7 +227,7 @@ def _read_json_lines( "convert_dates": False, } - for chunk in pd.read_json(path, **json_read_params): + for chunk in pd.read_json(path, **json_read_params): # type: ignore # ignoring how args are passed for record in chunk.to_dict(orient="records"): yield record From 030f1065ce78f13cbc95c3ebe49f6fd9aa63d7db Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 5 Mar 2025 18:20:52 +0200 Subject: [PATCH 03/12] updated after the review --- .../declarative_component_schema.yaml | 4 - .../decoders/composite_raw_decoder.py | 25 ++- .../extractors/response_to_file_extractor.py | 192 +++++------------- .../models/declarative_component_schema.py | 4 - .../parsers/model_to_component_factory.py | 38 ++-- 5 files changed, 94 insertions(+), 169 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 3ff38b051..6cd9998c7 100644 --- 
a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1678,10 +1678,6 @@ definitions: type: type: string enum: [ResponseToFileExtractor] - file_type: - title: The file type in which the response data is storred. Supported types are [csv, jsonl]. - type: string - default: csv $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index b8e8e3315..209afb1c2 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -35,15 +35,36 @@ def parse( class GzipParser(Parser): inner_parser: Parser + def _reset_reader_pointer(self, data: BufferedIOBase) -> None: + """ + Reset the reader pointer to the beginning of the data. + + Note: + - This is necessary because the gzip decompression will consume the data stream. + """ + data.seek(0) + def parse( self, data: BufferedIOBase, ) -> Generator[MutableMapping[str, Any], None, None]: """ Decompress gzipped bytes and pass decompressed data to the inner parser. + + IMPORTANT: + - If the data is not gzipped, reset the pointer and pass the data to the inner parser as is. + + Note: + - The data is not decoded by default. """ - with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: - yield from self.inner_parser.parse(gzipobj) + + try: + with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: + yield from self.inner_parser.parse(gzipobj) + except gzip.BadGzipFile: + logger.warning(f"GzipParser(): Received non-gzipped data, parsing the data as is.") + self._reset_reader_pointer(data) + yield from self.inner_parser.parse(data) @dataclass diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 266e10098..0215ddb45 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -7,7 +7,6 @@ import zlib from contextlib import closing from dataclasses import InitVar, dataclass -from enum import Enum from typing import Any, Dict, Iterable, Mapping, Optional, Tuple import pandas as pd @@ -21,57 +20,21 @@ DOWNLOAD_CHUNK_SIZE: int = 1024 * 10 -class FileTypes(Enum): - CSV = "csv" - JSONL = "jsonl" - - @dataclass class ResponseToFileExtractor(RecordExtractor): """ - This class is used when having very big HTTP responses (usually streamed), - which would require too much memory so we use disk space as a tradeoff. - - The extractor does the following: - 1) Save the response to a temporary file - 2) Read from the temporary file by chunks to avoid OOM - 3) Remove the temporary file after reading - 4) Return the records - 5) If the response is not compressed, it will be filtered for null bytes - 6) If the response is compressed, it will be decompressed - 7) If the response is compressed and contains null bytes, it will be filtered for null bytes + This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as + a tradeoff. + Eventually, we want to support multiple file type by re-using the file based CDK parsers if possible. However, the lift is too high for + a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing. 
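The `GzipParser` change above boils down to the following runnable pattern: try gzip first and, if the payload turns out not to be gzipped, rewind the stream and parse the raw bytes instead (the sample bytes are illustrative).

```python
import gzip
import io

data = io.BytesIO(b'{"id": 1}\n')  # plain, non-gzipped bytes

try:
    with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
        print(gzipobj.read())
except gzip.BadGzipFile:
    # The failed gzip header check consumed part of the stream, so reset
    # the pointer before handing the raw bytes to the inner parser.
    data.seek(0)
    print(data.read())  # b'{"id": 1}\n'
```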
""" parameters: InitVar[Mapping[str, Any]] - file_type: Optional[str] = "csv" def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.logger = logging.getLogger("airbyte") - def extract_records( - self, response: Optional[requests.Response] = None - ) -> Iterable[Mapping[str, Any]]: - """ - Extracts records from the given response by: - 1) Saving the result to a tmp file - 2) Reading from saved file by chunks to avoid OOM - - Args: - response (Optional[requests.Response]): The response object containing the data. Defaults to None. - - Yields: - Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records. - - Returns: - None - """ - if response: - file_path, encoding = self._save_to_file(response) - yield from self._read_with_chunks(file_path, encoding) - else: - yield from [] - def _get_response_encoding(self, headers: Dict[str, Any]) -> str: """ Get the encoding of the response based on the provided headers. This method is heavily inspired by the requests library @@ -79,7 +42,6 @@ def _get_response_encoding(self, headers: Dict[str, Any]) -> str: Args: headers (Dict[str, Any]): The headers of the response. - Returns: str: The encoding of the response. """ @@ -111,27 +73,10 @@ def _filter_null_bytes(self, b: bytes) -> bytes: res = b.replace(b"\x00", b"") if len(res) < len(b): - message = "ResponseToFileExtractor._filter_null_bytes(): Filter 'null' bytes from string, size reduced %d -> %d chars" - self.logger.warning(message, len(b), len(res)) - return res - - def _get_file_path(self) -> str: - """ - Get a temporary file path with a unique name. - - Returns: - str: The path to the temporary file. - - Raises: - ValueError: If the file type is not supported. - """ - - if self.file_type not in [file_type.value for file_type in FileTypes]: - raise ValueError( - f"ResponseToFileExtractor._get_file_path(): File type {self.file_type} is not supported.", + self.logger.warning( + "Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res) ) - - return str(uuid.uuid4()) + "." + self.file_type + return res def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: """ @@ -150,9 +95,8 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32) needs_decompression = True # we will assume at first that the response is compressed and change the flag if not - file_path = self._get_file_path() - # save binary data to tmp file - with closing(response) as response, open(file_path, "wb") as data_file: + tmp_file = str(uuid.uuid4()) + with closing(response) as response, open(tmp_file, "wb") as data_file: response_encoding = self._get_response_encoding(dict(response.headers or {})) for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE): try: @@ -166,76 +110,15 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: needs_decompression = False # check the file exists - if os.path.isfile(file_path): - return file_path, response_encoding + if os.path.isfile(tmp_file): + return tmp_file, response_encoding else: - message = "ResponseToFileExtractor._save_to_file(): The IO/Error occured while verifying binary data." - raise ValueError(f"{message} Tmp file {file_path} doesn't exist.") - - def _read_csv( - self, - path: str, - file_encoding: str, - chunk_size: int = 100, - ) -> Iterable[Mapping[str, Any]]: - """ - Reads a CSV file and yields each row as a dictionary. - - Args: - path (str): The path to the CSV file to be read. 
- file_encoding (str): The encoding of the file. - - Yields: - Mapping[str, Any]: A dictionary representing each row of data. - """ - - csv_read_params = { - "chunksize": chunk_size, - "iterator": True, - "dialect": "unix", - "dtype": object, - "encoding": file_encoding, - } - - for chunk in pd.read_csv(path, **csv_read_params): # type: ignore # ignoring how args are passed - # replace NaN with None - chunk = chunk.replace({nan: None}).to_dict(orient="records") - for record in chunk: - yield record - - def _read_json_lines( - self, - path: str, - file_encoding: str, - chunk_size: int = 100, - ) -> Iterable[Mapping[str, Any]]: - """ - Reads a JSON file and yields each row as a dictionary. - - Args: - path (str): The path to the JSON file to be read. - file_encoding (str): The encoding of the file. - - Yields: - Mapping[str, Any]: A dictionary representing each row of data. - """ - - json_read_params = { - "lines": True, - "chunksize": chunk_size, - "encoding": file_encoding, - "convert_dates": False, - } - - for chunk in pd.read_json(path, **json_read_params): # type: ignore # ignoring how args are passed - for record in chunk.to_dict(orient="records"): - yield record + raise ValueError( + f"The IO/Error occured while verifying binary data. Tmp file {tmp_file} doesn't exist." + ) def _read_with_chunks( - self, - path: str, - file_encoding: str, - chunk_size: int = 100, + self, path: str, file_encoding: str, chunk_size: int = 100 ) -> Iterable[Mapping[str, Any]]: """ Reads data from a file in chunks and yields each row as a dictionary. @@ -249,23 +132,46 @@ def _read_with_chunks( Mapping[str, Any]: A dictionary representing each row of data. Raises: - ValueError: If an error occurs while reading the data from the file. + ValueError: If an IO/Error occurs while reading the temporary data. """ try: - if self.file_type == FileTypes.CSV.value: - yield from self._read_csv(path, file_encoding, chunk_size) - - if self.file_type == FileTypes.JSONL.value: - yield from self._read_json_lines(path, file_encoding, chunk_size) - + with open(path, "r", encoding=file_encoding) as data: + chunks = pd.read_csv( + data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object + ) + for chunk in chunks: + chunk = chunk.replace({nan: None}).to_dict(orient="records") + for row in chunk: + yield row except pd.errors.EmptyDataError as e: - message = "ResponseToFileExtractor._read_with_chunks(): Empty data received." - self.logger.info(f"{message} {e}") + self.logger.info(f"Empty data received. {e}") yield from [] except IOError as ioe: - message = "ResponseToFileExtractor._read_with_chunks(): The IO/Error occured while reading the data from file." - raise ValueError(f"{message} Called: {path}", ioe) + raise ValueError(f"The IO/Error occured while reading tmp data. Called: {path}", ioe) finally: # remove binary tmp file, after data is read os.remove(path) + + def extract_records( + self, response: Optional[requests.Response] = None + ) -> Iterable[Mapping[str, Any]]: + """ + Extracts records from the given response by: + 1) Saving the result to a tmp file + 2) Reading from saved file by chunks to avoid OOM + + Args: + response (Optional[requests.Response]): The response object containing the data. Defaults to None. + + Yields: + Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records. 
+ + Returns: + None + """ + if response: + file_path, encoding = self._save_to_file(response) + yield from self._read_with_chunks(file_path, encoding) + else: + yield from [] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 1a114981d..a49b66c03 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -702,10 +702,6 @@ class DpathExtractor(BaseModel): class ResponseToFileExtractor(BaseModel): type: Literal["ResponseToFileExtractor"] - file_type: Optional[str] = Field( - "csv", - title="The file type in which the response data is storred. Supported types are [csv, jsonl].", - ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 39da868d7..da9b018a3 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1992,9 +1992,7 @@ def create_response_to_file_extractor( model: ResponseToFileExtractorModel, **kwargs: Any, ) -> ResponseToFileExtractor: - return ResponseToFileExtractor( - parameters=model.parameters or {}, file_type=model.file_type or "csv" - ) + return ResponseToFileExtractor(parameters=model.parameters or {}) @staticmethod def create_exponential_backoff_strategy( @@ -2195,10 +2193,12 @@ def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: A stream_response=False if self._emit_connector_builder_messages else True, ) - @staticmethod - def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any) -> Decoder: + def create_jsonl_decoder( + self, model: JsonlDecoderModel, config: Config, **kwargs: Any + ) -> Decoder: return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, ) def create_gzip_decoder( @@ -2755,7 +2755,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie ) paginator = ( self._create_component_from_model( - model=model.download_paginator, decoder=decoder, config=config, url_base="" + model=model.download_paginator, + decoder=operational_decoder, + config=config, + url_base="", ) if model.download_paginator else NoPagination(parameters={}) @@ -2784,7 +2787,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie parameters={}, ) - decoder = ( + operational_decoder = ( self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={}) @@ -2792,7 +2795,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie record_selector = self._create_component_from_model( model=model.record_selector, config=config, - decoder=decoder, + decoder=operational_decoder, name=name, transformations=transformations, client_side_incremental_sync=client_side_incremental_sync, @@ -2800,13 +2803,13 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie stream_slicer = stream_slicer or SinglePartitionRouter(parameters={}) creation_requester = self._create_component_from_model( 
model=model.creation_requester, - decoder=decoder, + decoder=operational_decoder, config=config, name=f"job creation - {name}", ) polling_requester = self._create_component_from_model( model=model.polling_requester, - decoder=decoder, + decoder=operational_decoder, config=config, name=f"job polling - {name}", ) @@ -2841,7 +2844,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie abort_requester = ( self._create_component_from_model( model=model.abort_requester, - decoder=decoder, + decoder=operational_decoder, config=config, name=f"job abort - {name}", ) @@ -2851,7 +2854,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie delete_requester = ( self._create_component_from_model( model=model.delete_requester, - decoder=decoder, + decoder=operational_decoder, config=config, name=f"job delete - {name}", ) @@ -2861,7 +2864,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie download_target_requester = ( self._create_component_from_model( model=model.download_target_requester, - decoder=decoder, + decoder=operational_decoder, config=config, name=f"job extract_url - {name}", ) @@ -2869,10 +2872,13 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie else None ) status_extractor = self._create_component_from_model( - model=model.status_extractor, decoder=decoder, config=config, name=name + model=model.status_extractor, decoder=operational_decoder, config=config, name=name ) download_target_extractor = self._create_component_from_model( - model=model.download_target_extractor, decoder=decoder, config=config, name=name + model=model.download_target_extractor, + decoder=operational_decoder, + config=config, + name=name, ) job_repository: AsyncJobRepository = AsyncHttpJobRepository( creation_requester=creation_requester, From 0d85176c01998222af7ddc56e7965b33a025ac35 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 5 Mar 2025 18:31:22 +0200 Subject: [PATCH 04/12] removed non-used constant --- .../sources/declarative/extractors/response_to_file_extractor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 0215ddb45..c7fd98c17 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -15,7 +15,6 @@ from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor -EMPTY_STR: str = "" DEFAULT_ENCODING: str = "utf-8" DOWNLOAD_CHUNK_SIZE: int = 1024 * 10 From f0be859b6018a7f52c7be61e33109c14833d90d5 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 6 Mar 2025 11:38:23 +0200 Subject: [PATCH 05/12] fixed stripping trailing slash when the path provides values --- .../declarative/requesters/http_requester.py | 40 +++++++++++++++---- .../requesters/test_http_requester.py | 2 +- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index 8a64fae60..45671fc59 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -16,7 +16,9 @@ ) from airbyte_cdk.sources.declarative.decoders import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import 
JsonDecoder -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import ( + InterpolatedString, +) from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) @@ -26,7 +28,10 @@ from airbyte_cdk.sources.streams.http import HttpClient from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler from airbyte_cdk.sources.types import Config, EmptyString, StreamSlice, StreamState -from airbyte_cdk.utils.mapping_helpers import combine_mappings, get_interpolation_context +from airbyte_cdk.utils.mapping_helpers import ( + combine_mappings, + get_interpolation_context, +) @dataclass @@ -155,7 +160,9 @@ def get_request_params( next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: return self._request_options_provider.get_request_params( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) def get_request_headers( @@ -166,7 +173,9 @@ def get_request_headers( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: return self._request_options_provider.get_request_headers( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) # fixing request options provider types has a lot of dependencies @@ -195,7 +204,9 @@ def get_request_body_json( # type: ignore next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping[str, Any]]: return self._request_options_provider.get_request_body_json( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) @property @@ -350,9 +361,24 @@ def _join_url(cls, url_base: str, path: str) -> str: path (str): The path to join with the base URL. Returns: - str: The concatenated URL with the trailing slash (if any) removed. + str: The resulting joined URL. + + Note: + Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869 + - If the path is an empty string or None, the method returns the base URL with any trailing slash removed. 
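Since the fix leans entirely on the standard library's `urljoin` semantics, a quick sanity check of the relevant cases (note that a path with a leading slash replaces the base path rather than appending to it):

```python
from urllib.parse import urljoin

# Trailing slashes on the path now survive; the old rstrip("/") destroyed them.
assert urljoin("https://example.com/api/", "endpoint") == "https://example.com/api/endpoint"
assert urljoin("https://example.com/api/", "my_endpoint/") == "https://example.com/api/my_endpoint/"

# An absolute path replaces the base path entirely.
assert urljoin("https://example.com/api", "/endpoint") == "https://example.com/endpoint"

# Empty paths are short-circuited by _join_url before urljoin is ever called.
assert "https://example.com/api/".rstrip("/") == "https://example.com/api"
```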
+ + Example: + 1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint' + 2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/api/endpoint' + 3) _join_url("https://example.com/api/", "") >> 'https://example.com/api' + 4) _join_url("https://example.com/api", None) >> 'https://example.com/api' """ - return urljoin(url_base, path).rstrip("/") + + # return a full-url if provided directly from interpolation context + if path == EmptyString or path is None: + return url_base.rstrip("/") + + return urljoin(url_base, path) def send_request( self, diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py index a1229579f..dfe78011a 100644 --- a/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -825,7 +825,7 @@ def test_send_request_stream_slice_next_page_token(): "test_trailing_slash_on_path", "https://airbyte.io", "/my_endpoint/", - "https://airbyte.io/my_endpoint", + "https://airbyte.io/my_endpoint/", ), ( "test_nested_path_no_leading_slash", From 559b280ebb9a4c46b115a503225acc3eb3409061 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 6 Mar 2025 23:11:22 +0200 Subject: [PATCH 06/12] updated after the review --- .../decoders/composite_raw_decoder.py | 59 +++++++++++++------ .../decoders/test_composite_decoder.py | 27 ++++++++- 2 files changed, 66 insertions(+), 20 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 209afb1c2..857d2f0e1 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -1,3 +1,7 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + import csv import gzip import io @@ -18,12 +22,21 @@ logger = logging.getLogger("airbyte") +COMPRESSION_TYPES = [ + "gzip", + "x-gzip", + "gzip, deflate", + "x-gzip, deflate", +] + + @dataclass class Parser(ABC): @abstractmethod def parse( self, data: BufferedIOBase, + compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: """ Parse data and yield dictionaries. @@ -35,18 +48,10 @@ def parse( class GzipParser(Parser): inner_parser: Parser - def _reset_reader_pointer(self, data: BufferedIOBase) -> None: - """ - Reset the reader pointer to the beginning of the data. - - Note: - - This is necessary because the gzip decompression will consume the data stream. - """ - data.seek(0) - def parse( self, data: BufferedIOBase, + compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: """ Decompress gzipped bytes and pass decompressed data to the inner parser. @@ -58,12 +63,10 @@ def parse( - The data is not decoded by default. 
""" - try: + if compressed: with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: yield from self.inner_parser.parse(gzipobj) - except gzip.BadGzipFile: - logger.warning(f"GzipParser(): Received non-gzipped data, parsing the data as is.") - self._reset_reader_pointer(data) + else: yield from self.inner_parser.parse(data) @@ -71,7 +74,11 @@ def parse( class JsonParser(Parser): encoding: str = "utf-8" - def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: + def parse( + self, + data: BufferedIOBase, + compressed: Optional[bool] = False, + ) -> Generator[MutableMapping[str, Any], None, None]: """ Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. """ @@ -114,6 +121,7 @@ class JsonLineParser(Parser): def parse( self, data: BufferedIOBase, + compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: for line in data: try: @@ -141,6 +149,7 @@ def _get_delimiter(self) -> Optional[str]: def parse( self, data: BufferedIOBase, + compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: """ Parse CSV data from decompressed bytes. @@ -156,10 +165,15 @@ class CompositeRawDecoder(Decoder): """ Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None] passed response.raw to parser(s). - Note: response.raw is not decoded/decompressed by default. - parsers should be instantiated recursively. + + Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively. + Example: - composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1"))) + composite_raw_decoder = CompositeRawDecoder( + parser=GzipParser( + inner_parser=JsonLineParser(encoding="iso-8859-1") + ) + ) """ parser: Parser @@ -168,10 +182,17 @@ class CompositeRawDecoder(Decoder): def is_stream_response(self) -> bool: return self.stream_response + def is_compressed(self, response: requests.Response) -> bool: + """ + Check if the response is compressed based on the Content-Encoding header. + """ + return response.headers.get("Content-Encoding") in COMPRESSION_TYPES + def decode( - self, response: requests.Response + self, + response: requests.Response, ) -> Generator[MutableMapping[str, Any], None, None]: if self.is_stream_response(): - yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] + yield from self.parser.parse(data=response.raw, compressed=self.is_compressed(response)) # type: ignore[arg-type] else: yield from self.parser.parse(data=io.BytesIO(response.content)) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 745113925..26cbae613 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -110,6 +110,30 @@ def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str assert counter == 3 +@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) +def test_composite_raw_decoder_gzip_jsonline_parser_decodes_non_gzipped_raw_response( + requests_mock, encoding: str +) -> None: + """ + Test the GzipParser with a non-compressed response. 
+ """ + + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + # we encode the jsonl content as bytes here + content="".join(generate_jsonlines()).encode(encoding), + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding)) + composite_raw_decoder = CompositeRawDecoder(parser=parser) + counter = 0 + for _ in composite_raw_decoder.decode(response): + counter += 1 + assert counter == 3 + + @pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): response_content = "".join(generate_jsonlines()) @@ -224,7 +248,8 @@ def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_ ) response = requests.get("https://airbyte.io/") composite_raw_decoder = CompositeRawDecoder( - parser=JsonParser(encoding="utf-8"), stream_response=False + parser=JsonParser(encoding="utf-8"), + stream_response=False, ) content = list(composite_raw_decoder.decode(response)) From dbed4d6b6f7a773a3e8d8a819fae5305cde3f7a3 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 7 Mar 2025 00:09:22 +0200 Subject: [PATCH 07/12] updated COMPRESSED_TYPES and fixed the tests --- .../declarative/decoders/composite_raw_decoder.py | 11 ++++++++--- .../declarative/decoders/zipfile_decoder.py | 15 +++++++++++---- .../decoders/test_composite_decoder.py | 11 ++++++++++- .../declarative/decoders/test_zipfile_decoder.py | 7 ++++++- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 4bf38f869..1162afa09 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -27,6 +27,10 @@ "x-gzip", "gzip, deflate", "x-gzip, deflate", + "application/zip", + "application/gzip", + "application/x-gzip", + "application/x-zip-compressed", ] @@ -64,6 +68,7 @@ def parse( """ if compressed: + print(f"\n\nHERE\n\n") with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: yield from self.inner_parser.parse(gzipobj) else: @@ -193,9 +198,9 @@ def decode( response: requests.Response, ) -> Generator[MutableMapping[str, Any], None, None]: if self.is_stream_response(): - # urllib mentions that some interfaces don't play nice with auto_close - # [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content) - # We have indeed observed some issues with CSV parsing. + # urllib mentions that some interfaces don't play nice with auto_close + # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content + # We have indeed observed some issues with CSV parsing. # Hence, we will manage the closing of the file ourselves until we find a better solution. 
response.raw.auto_close = False yield from self.parser.parse(data=response.raw, compressed=self.is_compressed(response)) # type: ignore[arg-type] diff --git a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py index a937a1e4d..637f742b1 100644 --- a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py @@ -13,9 +13,7 @@ from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.decoders import Decoder -from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import ( - Parser, -) +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import COMPRESSION_TYPES, Parser from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") @@ -28,6 +26,12 @@ class ZipfileDecoder(Decoder): def is_stream_response(self) -> bool: return False + def is_compressed(self, response: requests.Response) -> bool: + """ + Check if the response is compressed based on the Content-Encoding header. + """ + return response.headers.get("Content-Encoding") in COMPRESSION_TYPES + def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: @@ -37,7 +41,10 @@ def decode( unzipped_content = zip_file.read(file_name) buffered_content = BytesIO(unzipped_content) try: - yield from self.parser.parse(buffered_content) + yield from self.parser.parse( + buffered_content, + compressed=self.is_compressed(response), + ) except Exception as e: logger.error( f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}." diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 5aba44dc2..98079686c 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -68,6 +68,7 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): "GET", "https://airbyte.io/", content=generate_csv(encoding=encoding, delimiter="\t", should_compress=True), + headers={"Content-Encoding": "gzip"}, ) response = requests.get("https://airbyte.io/", stream=True) @@ -107,7 +108,10 @@ def generate_compressed_jsonlines(encoding: str = "utf-8") -> bytes: @pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str): requests_mock.register_uri( - "GET", "https://airbyte.io/", content=generate_compressed_jsonlines(encoding=encoding) + "GET", + "https://airbyte.io/", + content=generate_compressed_jsonlines(encoding=encoding), + headers={"Content-Encoding": "gzip"}, ) response = requests.get("https://airbyte.io/", stream=True) @@ -132,6 +136,11 @@ def test_composite_raw_decoder_gzip_jsonline_parser_decodes_non_gzipped_raw_resp "https://airbyte.io/", # we encode the jsonl content as bytes here content="".join(generate_jsonlines()).encode(encoding), + # we don't specify the `Content-Encoding` header here + # to simulate a non-compressed response + # but we still use the GzipParser to decode it + # to test the GzipParser's behavior with non-compressed data + # and to ensure it doesn't raise an error. 
) response = requests.get("https://airbyte.io/", stream=True) diff --git a/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py b/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py index 731895e2e..f5c988d0f 100644 --- a/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py @@ -43,7 +43,12 @@ def test_zipfile_decoder_with_single_file_response(requests_mock, json_data): zipfile_decoder = ZipfileDecoder(parser=GzipParser(inner_parser=JsonParser())) compressed_data = gzip.compress(json.dumps(json_data).encode()) zipped_data = create_zip_from_dict(compressed_data) - requests_mock.register_uri("GET", "https://airbyte.io/", content=zipped_data) + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=zipped_data, + headers={"Content-Encoding": "application/zip"}, + ) response = requests.get("https://airbyte.io/") if isinstance(json_data, list): From ead326dc20e5e85346f37baa70fa75ab7d8a6954 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 7 Mar 2025 00:25:23 +0200 Subject: [PATCH 08/12] reverted operational_decoder to decoder --- .../parsers/model_to_component_factory.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d270750c5..09f42282e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2756,7 +2756,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie paginator = ( self._create_component_from_model( model=model.download_paginator, - decoder=operational_decoder, + decoder=decoder, config=config, url_base="", ) @@ -2787,7 +2787,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie parameters={}, ) - operational_decoder = ( + decoder = ( self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={}) @@ -2795,7 +2795,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie record_selector = self._create_component_from_model( model=model.record_selector, config=config, - decoder=operational_decoder, + decoder=decoder, name=name, transformations=transformations, client_side_incremental_sync=client_side_incremental_sync, @@ -2803,13 +2803,13 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie stream_slicer = stream_slicer or SinglePartitionRouter(parameters={}) creation_requester = self._create_component_from_model( model=model.creation_requester, - decoder=operational_decoder, + decoder=decoder, config=config, name=f"job creation - {name}", ) polling_requester = self._create_component_from_model( model=model.polling_requester, - decoder=operational_decoder, + decoder=decoder, config=config, name=f"job polling - {name}", ) @@ -2844,7 +2844,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie abort_requester = ( self._create_component_from_model( model=model.abort_requester, - decoder=operational_decoder, + decoder=decoder, config=config, name=f"job abort - {name}", ) @@ -2854,7 +2854,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie delete_requester = ( self._create_component_from_model( model=model.delete_requester, - 
decoder=operational_decoder, + decoder=decoder, config=config, name=f"job delete - {name}", ) @@ -2864,7 +2864,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie download_target_requester = ( self._create_component_from_model( model=model.download_target_requester, - decoder=operational_decoder, + decoder=decoder, config=config, name=f"job extract_url - {name}", ) @@ -2872,11 +2872,11 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie else None ) status_extractor = self._create_component_from_model( - model=model.status_extractor, decoder=operational_decoder, config=config, name=name + model=model.status_extractor, decoder=decoder, config=config, name=name ) download_target_extractor = self._create_component_from_model( model=model.download_target_extractor, - decoder=operational_decoder, + decoder=decoder, config=config, name=name, ) From f1a71a66810b6c7d8356e6f67f64a29ed6eaca5f Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 7 Mar 2025 00:29:55 +0200 Subject: [PATCH 09/12] removed print --- .../sources/declarative/decoders/composite_raw_decoder.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 1162afa09..30b8c449f 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -68,7 +68,6 @@ def parse( """ if compressed: - print(f"\n\nHERE\n\n") with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: yield from self.inner_parser.parse(gzipobj) else: From 3750f6d218261e9c83940b9b00956ac96560875d Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 7 Mar 2025 08:34:13 +0200 Subject: [PATCH 10/12] updated compression types checks --- .../decoders/composite_raw_decoder.py | 23 ++++--------------- .../sources/declarative/decoders/decoder.py | 20 ++++++++++++++++ .../declarative/decoders/zipfile_decoder.py | 11 ++------- 3 files changed, 26 insertions(+), 28 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 30b8c449f..60636b845 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -22,18 +22,6 @@ logger = logging.getLogger("airbyte") -COMPRESSION_TYPES = [ - "gzip", - "x-gzip", - "gzip, deflate", - "x-gzip, deflate", - "application/zip", - "application/gzip", - "application/x-gzip", - "application/x-zip-compressed", -] - - @dataclass class Parser(ABC): @abstractmethod @@ -186,12 +174,6 @@ class CompositeRawDecoder(Decoder): def is_stream_response(self) -> bool: return self.stream_response - def is_compressed(self, response: requests.Response) -> bool: - """ - Check if the response is compressed based on the Content-Encoding header. - """ - return response.headers.get("Content-Encoding") in COMPRESSION_TYPES - def decode( self, response: requests.Response, @@ -202,7 +184,10 @@ def decode( # We have indeed observed some issues with CSV parsing. # Hence, we will manage the closing of the file ourselves until we find a better solution. 
response.raw.auto_close = False - yield from self.parser.parse(data=response.raw, compressed=self.is_compressed(response)) # type: ignore[arg-type] + yield from self.parser.parse( + data=response.raw, # type: ignore[arg-type] + compressed=self.is_compressed_response(response), + ) response.raw.close() else: yield from self.parser.parse(data=io.BytesIO(response.content)) diff --git a/airbyte_cdk/sources/declarative/decoders/decoder.py b/airbyte_cdk/sources/declarative/decoders/decoder.py index 5fa9dc8f6..44445034a 100644 --- a/airbyte_cdk/sources/declarative/decoders/decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/decoder.py @@ -8,6 +8,17 @@ import requests +COMPRESSION_RESPONSE_TYPES = [ + "gzip", + "x-gzip", + "gzip, deflate", + "x-gzip, deflate", + "application/zip", + "application/gzip", + "application/x-gzip", + "application/x-zip-compressed", +] + @dataclass class Decoder: @@ -30,3 +41,12 @@ def decode( :param response: the response to decode :return: Generator of Mapping describing the response """ + + def is_compressed_response(self, response: requests.Response) -> bool: + """ + Check if the response is compressed based on the Content-Encoding header. + """ + return ( + response.headers.get("Content-Encoding") in COMPRESSION_RESPONSE_TYPES + or response.headers.get("Content-Type") in COMPRESSION_RESPONSE_TYPES + ) diff --git a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py index 637f742b1..4423519f9 100644 --- a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py @@ -8,12 +8,11 @@ from io import BytesIO from typing import Any, Generator, MutableMapping -import orjson import requests from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.decoders import Decoder -from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import COMPRESSION_TYPES, Parser +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") @@ -26,12 +25,6 @@ class ZipfileDecoder(Decoder): def is_stream_response(self) -> bool: return False - def is_compressed(self, response: requests.Response) -> bool: - """ - Check if the response is compressed based on the Content-Encoding header. 
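A standalone sketch of the widened check that `is_compressed_response()` now performs on the base `Decoder`; the list is copied from the patch, while the constructed response is illustrative.

```python
import requests

COMPRESSION_RESPONSE_TYPES = [
    "gzip",
    "x-gzip",
    "gzip, deflate",
    "x-gzip, deflate",
    "application/zip",
    "application/gzip",
    "application/x-gzip",
    "application/x-zip-compressed",
]


def is_compressed_response(response: requests.Response) -> bool:
    # Exact-match membership test on either header, as in the patch.
    return (
        response.headers.get("Content-Encoding") in COMPRESSION_RESPONSE_TYPES
        or response.headers.get("Content-Type") in COMPRESSION_RESPONSE_TYPES
    )


resp = requests.Response()
resp.headers["Content-Type"] = "application/zip"
assert is_compressed_response(resp)
```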
- """ - return response.headers.get("Content-Encoding") in COMPRESSION_TYPES - def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: @@ -43,7 +36,7 @@ def decode( try: yield from self.parser.parse( buffered_content, - compressed=self.is_compressed(response), + compressed=self.is_compressed_response(response), ) except Exception as e: logger.error( From 96f41dac7dd666266b3f4541de27ff5a1cb7da58 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 7 Mar 2025 14:09:05 +0200 Subject: [PATCH 11/12] nit --- airbyte_cdk/sources/declarative/decoders/decoder.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/decoder.py b/airbyte_cdk/sources/declarative/decoders/decoder.py index 44445034a..d195caeac 100644 --- a/airbyte_cdk/sources/declarative/decoders/decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/decoder.py @@ -8,7 +8,7 @@ import requests -COMPRESSION_RESPONSE_TYPES = [ +COMPRESSSED_RESPONSE_TYPES = [ "gzip", "x-gzip", "gzip, deflate", @@ -44,9 +44,9 @@ def decode( def is_compressed_response(self, response: requests.Response) -> bool: """ - Check if the response is compressed based on the Content-Encoding header. + Check if the response is compressed based on the `Content-Encoding` or `Content-Type` header. """ return ( - response.headers.get("Content-Encoding") in COMPRESSION_RESPONSE_TYPES - or response.headers.get("Content-Type") in COMPRESSION_RESPONSE_TYPES + response.headers.get("Content-Encoding") in COMPRESSSED_RESPONSE_TYPES + or response.headers.get("Content-Type") in COMPRESSSED_RESPONSE_TYPES ) From bcc2c36502bc9b86685b1f0d34d42dcb82fda8c0 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 7 Mar 2025 19:31:33 +0200 Subject: [PATCH 12/12] updated after the review + maximes proposal --- .../decoders/composite_raw_decoder.py | 133 +++++++++++------- .../sources/declarative/decoders/decoder.py | 24 +--- .../declarative/decoders/decoder_parser.py | 30 ++++ .../declarative/decoders/zipfile_decoder.py | 7 +- .../parsers/model_to_component_factory.py | 26 +++- .../decoders/test_composite_decoder.py | 54 ++++--- 6 files changed, 170 insertions(+), 104 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/decoders/decoder_parser.py diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 60636b845..2fc26c43a 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -7,44 +7,31 @@ import io import json import logging -from abc import ABC, abstractmethod from dataclasses import dataclass from io import BufferedIOBase, TextIOWrapper -from typing import Any, Generator, MutableMapping, Optional +from typing import Any, Optional import orjson import requests from airbyte_cdk.models import FailureType -from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE, Decoder +from airbyte_cdk.sources.declarative.decoders.decoder_parser import ( + PARSER_OUTPUT_TYPE, + PARSERS_BY_HEADER_TYPE, + PARSERS_TYPE, + Parser, +) from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") -@dataclass -class Parser(ABC): - @abstractmethod - def parse( - self, - data: BufferedIOBase, - compressed: Optional[bool] = False, - ) -> Generator[MutableMapping[str, Any], 
From bcc2c36502bc9b86685b1f0d34d42dcb82fda8c0 Mon Sep 17 00:00:00 2001
From: Oleksandr Bazarnov
Date: Fri, 7 Mar 2025 19:31:33 +0200
Subject: [PATCH 12/12] updated after the review + Maxime's proposal

---
 .../decoders/composite_raw_decoder.py | 133 +++++++++++-------
 .../sources/declarative/decoders/decoder.py | 24 +---
 .../declarative/decoders/decoder_parser.py | 30 ++++
 .../declarative/decoders/zipfile_decoder.py | 7 +-
 .../parsers/model_to_component_factory.py | 26 +++-
 .../decoders/test_composite_decoder.py | 54 ++++---
 6 files changed, 170 insertions(+), 104 deletions(-)
 create mode 100644 airbyte_cdk/sources/declarative/decoders/decoder_parser.py

diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
index 60636b845..2fc26c43a 100644
--- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
+++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
@@ -7,44 +7,31 @@
 import io
 import json
 import logging
-from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from io import BufferedIOBase, TextIOWrapper
-from typing import Any, Generator, MutableMapping, Optional
+from typing import Any, Optional

 import orjson
 import requests

 from airbyte_cdk.models import FailureType
-from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
+from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE, Decoder
+from airbyte_cdk.sources.declarative.decoders.decoder_parser import (
+ PARSER_OUTPUT_TYPE,
+ PARSERS_BY_HEADER_TYPE,
+ PARSERS_TYPE,
+ Parser,
+)
 from airbyte_cdk.utils import AirbyteTracedException

 logger = logging.getLogger("airbyte")


-@dataclass
-class Parser(ABC):
- @abstractmethod
- def parse(
- self,
- data: BufferedIOBase,
- compressed: Optional[bool] = False,
- ) -> Generator[MutableMapping[str, Any], None, None]:
- """
- Parse data and yield dictionaries.
- """
- pass
-
-
 @dataclass
 class GzipParser(Parser):
 inner_parser: Parser

- def parse(
- self,
- data: BufferedIOBase,
- compressed: Optional[bool] = False,
- ) -> Generator[MutableMapping[str, Any], None, None]:
+ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
 """
 Decompress gzipped bytes and pass decompressed data to the inner parser.
@@ -55,22 +42,15 @@ def parse(
 - The data is not decoded by default.
 """
- if compressed:
- with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
- yield from self.inner_parser.parse(gzipobj)
- else:
- yield from self.inner_parser.parse(data)
+ with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
+ yield from self.inner_parser.parse(gzipobj)


 @dataclass
 class JsonParser(Parser):
 encoding: str = "utf-8"

- def parse(
- self,
- data: BufferedIOBase,
- compressed: Optional[bool] = False,
- ) -> Generator[MutableMapping[str, Any], None, None]:
+ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
 """
 Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
 """
@@ -110,11 +90,7 @@ def _parse_json(self, raw_data: bytes) -> Optional[Any]:
 class JsonLineParser(Parser):
 encoding: Optional[str] = "utf-8"

- def parse(
- self,
- data: BufferedIOBase,
- compressed: Optional[bool] = False,
- ) -> Generator[MutableMapping[str, Any], None, None]:
+ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
 for line in data:
 try:
 yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
@@ -138,11 +114,7 @@ def _get_delimiter(self) -> Optional[str]:

 return self.delimiter

- def parse(
- self,
- data: BufferedIOBase,
- compressed: Optional[bool] = False,
- ) -> Generator[MutableMapping[str, Any], None, None]:
+ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
 """
 Parse CSV data from decompressed bytes.
 """
@@ -152,10 +124,9 @@ def parse(
 yield row


-@dataclass
 class CompositeRawDecoder(Decoder):
 """
- Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
+ Decoder strategy to transform a requests.Response into a PARSER_OUTPUT_TYPE
 passed response.raw to parser(s).
 Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively.
@@ -168,26 +139,80 @@ class CompositeRawDecoder(Decoder):
 )
 """

- parser: Parser
- stream_response: bool = True
+ def __init__(
+ self,
+ parser: Parser,
+ stream_response: bool = True,
+ parsers_by_header: PARSERS_BY_HEADER_TYPE = None,
+ ) -> None:
+ # since we moved from using `dataclass` to `__init__` method,
+ # we need to keep using the `parser` to be able to resolve the dependencies
+ # between the parsers correctly.
+ self.parser = parser
+
+ self._parsers_by_header = parsers_by_header if parsers_by_header else {}
+ self._stream_response = stream_response
+
+ @classmethod
+ def by_headers(
+ cls,
+ parsers: PARSERS_TYPE,
+ stream_response: bool,
+ fallback_parser: Parser,
+ ) -> "CompositeRawDecoder":
+ """
+ Create a CompositeRawDecoder instance based on header values.
+
+ Args:
+ parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
+ stream_response (bool): A flag indicating whether the response should be streamed.
+ fallback_parser (Parser): A parser to use if no matching header is found.
+
+ Returns:
+ CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
+ """
+ parsers_by_header = {}
+ for headers, header_values, parser in parsers:
+ for header in headers:
+ parsers_by_header[header] = {header_value: parser for header_value in header_values}
+ return cls(fallback_parser, stream_response, parsers_by_header)
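For orientation, here is a rough usage sketch of the classmethod above; the parser pairing is illustrative (it mirrors the unit tests further down in this patch) and is not part of the commit itself:

    from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
        CompositeRawDecoder,
        GzipParser,
        JsonLineParser,
    )

    decoder = CompositeRawDecoder.by_headers(
        parsers=[
            # responses marked as gzip by either header are decompressed, then parsed as JSON Lines
            ({"Content-Encoding", "Content-Type"}, {"gzip", "application/gzip"}, GzipParser(inner_parser=JsonLineParser())),
        ],
        stream_response=True,
        # anything without a matching header is treated as plain JSON Lines
        fallback_parser=JsonLineParser(),
    )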
+ """ + parsers_by_header = {} + for headers, header_values, parser in parsers: + for header in headers: + parsers_by_header[header] = {header_value: parser for header_value in header_values} + return cls(fallback_parser, stream_response, parsers_by_header) def is_stream_response(self) -> bool: - return self.stream_response + return self._stream_response - def decode( - self, - response: requests.Response, - ) -> Generator[MutableMapping[str, Any], None, None]: + def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: + parser = self._select_parser(response) if self.is_stream_response(): # urllib mentions that some interfaces don't play nice with auto_close # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content # We have indeed observed some issues with CSV parsing. # Hence, we will manage the closing of the file ourselves until we find a better solution. response.raw.auto_close = False - yield from self.parser.parse( + yield from parser.parse( data=response.raw, # type: ignore[arg-type] - compressed=self.is_compressed_response(response), ) response.raw.close() else: - yield from self.parser.parse(data=io.BytesIO(response.content)) + yield from parser.parse(data=io.BytesIO(response.content)) + + def _select_parser(self, response: requests.Response) -> Parser: + """ + Selects the appropriate parser based on the response headers. + + This method iterates through the `_parsers_by_header` dictionary to find a matching parser + based on the headers in the response. If a matching header and header value are found, + the corresponding parser is returned. If no match is found, the default parser is returned. + + Args: + response (requests.Response): The HTTP response object containing headers to check. + + Returns: + Parser: The parser corresponding to the matched header value, or the default parser if no match is found. + """ + for header, parser_by_header_value in self._parsers_by_header.items(): + if ( + header in response.headers + and response.headers[header] in parser_by_header_value.keys() + ): + return parser_by_header_value[response.headers[header]] + return self.parser diff --git a/airbyte_cdk/sources/declarative/decoders/decoder.py b/airbyte_cdk/sources/declarative/decoders/decoder.py index d195caeac..34d99db1f 100644 --- a/airbyte_cdk/sources/declarative/decoders/decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/decoder.py @@ -8,16 +8,7 @@ import requests -COMPRESSSED_RESPONSE_TYPES = [ - "gzip", - "x-gzip", - "gzip, deflate", - "x-gzip, deflate", - "application/zip", - "application/gzip", - "application/x-gzip", - "application/x-zip-compressed", -] +DECODER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None] @dataclass @@ -33,20 +24,9 @@ def is_stream_response(self) -> bool: """ @abstractmethod - def decode( - self, response: requests.Response - ) -> Generator[MutableMapping[str, Any], None, None]: + def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: """ Decodes a requests.Response into a Mapping[str, Any] or an array :param response: the response to decode :return: Generator of Mapping describing the response """ - - def is_compressed_response(self, response: requests.Response) -> bool: - """ - Check if the response is compressed based on the `Content-Encoding` or `Content-Type` header. 
- """ - return ( - response.headers.get("Content-Encoding") in COMPRESSSED_RESPONSE_TYPES - or response.headers.get("Content-Type") in COMPRESSSED_RESPONSE_TYPES - ) diff --git a/airbyte_cdk/sources/declarative/decoders/decoder_parser.py b/airbyte_cdk/sources/declarative/decoders/decoder_parser.py new file mode 100644 index 000000000..d1401f54f --- /dev/null +++ b/airbyte_cdk/sources/declarative/decoders/decoder_parser.py @@ -0,0 +1,30 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from io import BufferedIOBase +from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple + +logger = logging.getLogger("airbyte") + + +PARSER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None] + + +@dataclass +class Parser(ABC): + @abstractmethod + def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: + """ + Parse data and yield dictionaries. + """ + pass + + +# reusable parser types +PARSERS_TYPE = List[Tuple[Set[str], Set[str], Parser]] +PARSERS_BY_HEADER_TYPE = Optional[Dict[str, Dict[str, Parser]]] diff --git a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py index 4423519f9..5e9ba5788 100644 --- a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py @@ -6,13 +6,13 @@ import zipfile from dataclasses import dataclass from io import BytesIO -from typing import Any, Generator, MutableMapping import requests from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.decoders import Decoder from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser +from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") @@ -25,9 +25,7 @@ class ZipfileDecoder(Decoder): def is_stream_response(self) -> bool: return False - def decode( - self, response: requests.Response - ) -> Generator[MutableMapping[str, Any], None, None]: + def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: try: with zipfile.ZipFile(BytesIO(response.content)) as zip_file: for file_name in zip_file.namelist(): @@ -36,7 +34,6 @@ def decode( try: yield from self.parser.parse( buffered_content, - compressed=self.is_compressed_response(response), ) except Exception as e: logger.error( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 09f42282e..9a0c66af2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2204,9 +2204,29 @@ def create_jsonl_decoder( def create_gzip_decoder( self, model: GzipDecoderModel, config: Config, **kwargs: Any ) -> Decoder: - return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), - stream_response=False if self._emit_connector_builder_messages else True, + _compressed_response_types = { + "gzip", + "x-gzip", + "gzip, deflate", + "x-gzip, deflate", + "application/zip", + "application/gzip", + "application/x-gzip", + "application/x-zip-compressed", + } + + gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser + + if 
diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 09f42282e..9a0c66af2 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -2204,9 +2204,29 @@ def create_jsonl_decoder(
 def create_gzip_decoder(
 self, model: GzipDecoderModel, config: Config, **kwargs: Any
 ) -> Decoder:
- return CompositeRawDecoder(
- parser=ModelToComponentFactory._get_parser(model, config),
- stream_response=False if self._emit_connector_builder_messages else True,
+ _compressed_response_types = {
+ "gzip",
+ "x-gzip",
+ "gzip, deflate",
+ "x-gzip, deflate",
+ "application/zip",
+ "application/gzip",
+ "application/x-gzip",
+ "application/x-zip-compressed",
+ }
+
+ gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser
+
+ if self._emit_connector_builder_messages:
+ # This is very surprising, but if the response is not streamed,
+ # CompositeRawDecoder calls response.content and the requests library actually uncompresses the data, as opposed to response.raw,
+ # which uses urllib3 directly and does not uncompress it.
+ return CompositeRawDecoder(gzip_parser.inner_parser, False)
+
+ return CompositeRawDecoder.by_headers(
+ [({"Content-Encoding", "Content-Type"}, _compressed_response_types, gzip_parser)],
+ stream_response=True,
+ fallback_parser=gzip_parser.inner_parser,
 )

 @staticmethod
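The comment in create_gzip_decoder leans on a requests/urllib3 behavior that is easy to miss. A short illustration (the endpoint is hypothetical; the behavior holds for any response served with `Content-Encoding: gzip`):

    import requests

    response = requests.get("https://example.com/records.gz", stream=True)

    # .content goes through requests' content decoding: gzip bodies arrive decompressed
    decompressed = response.content

    # .raw is the bare urllib3 stream: it yields the still-compressed bytes unless
    # decoding is requested explicitly, e.g. response.raw.read(decode_content=True)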
+ content="".join(generate_jsonlines()).encode("utf-8"), + headers={"Content-Encoding": "not gzip in order to expect fallback"}, ) response = requests.get("https://airbyte.io/", stream=True) - parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding)) - composite_raw_decoder = CompositeRawDecoder(parser=parser) + unused_parser = GzipParser(inner_parser=Mock()) + composite_raw_decoder = CompositeRawDecoder.by_headers( + [({"Content-Encoding"}, {"gzip"}, unused_parser)], + stream_response=True, + fallback_parser=JsonLineParser(), + ) counter = 0 for _ in composite_raw_decoder.decode(response): counter += 1