diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 93775c1ec..2fc26c43a 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -1,47 +1,47 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + import csv import gzip import io import json import logging -from abc import ABC, abstractmethod from dataclasses import dataclass from io import BufferedIOBase, TextIOWrapper -from typing import Any, Generator, MutableMapping, Optional +from typing import Any, Optional import orjson import requests from airbyte_cdk.models import FailureType -from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE, Decoder +from airbyte_cdk.sources.declarative.decoders.decoder_parser import ( + PARSER_OUTPUT_TYPE, + PARSERS_BY_HEADER_TYPE, + PARSERS_TYPE, + Parser, +) from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") -@dataclass -class Parser(ABC): - @abstractmethod - def parse( - self, - data: BufferedIOBase, - ) -> Generator[MutableMapping[str, Any], None, None]: - """ - Parse data and yield dictionaries. - """ - pass - - @dataclass class GzipParser(Parser): inner_parser: Parser - def parse( - self, - data: BufferedIOBase, - ) -> Generator[MutableMapping[str, Any], None, None]: + def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: """ Decompress gzipped bytes and pass decompressed data to the inner parser. + + IMPORTANT: + - If the data is not gzipped, reset the pointer and pass the data to the inner parser as is. + + Note: + - The data is not decoded by default. """ + with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: yield from self.inner_parser.parse(gzipobj) @@ -50,7 +50,7 @@ def parse( class JsonParser(Parser): encoding: str = "utf-8" - def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: + def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: """ Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. """ @@ -90,10 +90,7 @@ def _parse_json(self, raw_data: bytes) -> Optional[Any]: class JsonLineParser(Parser): encoding: Optional[str] = "utf-8" - def parse( - self, - data: BufferedIOBase, - ) -> Generator[MutableMapping[str, Any], None, None]: + def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: for line in data: try: yield json.loads(line.decode(encoding=self.encoding or "utf-8")) @@ -117,10 +114,7 @@ def _get_delimiter(self) -> Optional[str]: return self.delimiter - def parse( - self, - data: BufferedIOBase, - ) -> Generator[MutableMapping[str, Any], None, None]: + def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: """ Parse CSV data from decompressed bytes. """ @@ -130,31 +124,95 @@ def parse( yield row -@dataclass class CompositeRawDecoder(Decoder): """ - Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None] + Decoder strategy to transform a requests.Response into a PARSER_OUTPUT_TYPE passed response.raw to parser(s). - Note: response.raw is not decoded/decompressed by default. - parsers should be instantiated recursively. + + Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively. + Example: - composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1"))) + composite_raw_decoder = CompositeRawDecoder( + parser=GzipParser( + inner_parser=JsonLineParser(encoding="iso-8859-1") + ) + ) """ - parser: Parser - stream_response: bool = True + def __init__( + self, + parser: Parser, + stream_response: bool = True, + parsers_by_header: PARSERS_BY_HEADER_TYPE = None, + ) -> None: + # since we moved from using `dataclass` to `__init__` method, + # we need to keep using the `parser` to be able to resolve the depenencies + # between the parsers correctly. + self.parser = parser + + self._parsers_by_header = parsers_by_header if parsers_by_header else {} + self._stream_response = stream_response + + @classmethod + def by_headers( + cls, + parsers: PARSERS_TYPE, + stream_response: bool, + fallback_parser: Parser, + ) -> "CompositeRawDecoder": + """ + Create a CompositeRawDecoder instance based on header values. + + Args: + parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser. + stream_response (bool): A flag indicating whether the response should be streamed. + fallback_parser (Parser): A parser to use if no matching header is found. + + Returns: + CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers. + """ + parsers_by_header = {} + for headers, header_values, parser in parsers: + for header in headers: + parsers_by_header[header] = {header_value: parser for header_value in header_values} + return cls(fallback_parser, stream_response, parsers_by_header) def is_stream_response(self) -> bool: - return self.stream_response + return self._stream_response - def decode( - self, response: requests.Response - ) -> Generator[MutableMapping[str, Any], None, None]: + def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: + parser = self._select_parser(response) if self.is_stream_response(): - # urllib mentions that some interfaces don't play nice with auto_close [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content) - # We have indeed observed some issues with CSV parsing. Hence, we will manage the closing of the file ourselves until we find a better solution. + # urllib mentions that some interfaces don't play nice with auto_close + # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content + # We have indeed observed some issues with CSV parsing. + # Hence, we will manage the closing of the file ourselves until we find a better solution. response.raw.auto_close = False - yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] + yield from parser.parse( + data=response.raw, # type: ignore[arg-type] + ) response.raw.close() else: - yield from self.parser.parse(data=io.BytesIO(response.content)) + yield from parser.parse(data=io.BytesIO(response.content)) + + def _select_parser(self, response: requests.Response) -> Parser: + """ + Selects the appropriate parser based on the response headers. + + This method iterates through the `_parsers_by_header` dictionary to find a matching parser + based on the headers in the response. If a matching header and header value are found, + the corresponding parser is returned. If no match is found, the default parser is returned. + + Args: + response (requests.Response): The HTTP response object containing headers to check. + + Returns: + Parser: The parser corresponding to the matched header value, or the default parser if no match is found. + """ + for header, parser_by_header_value in self._parsers_by_header.items(): + if ( + header in response.headers + and response.headers[header] in parser_by_header_value.keys() + ): + return parser_by_header_value[response.headers[header]] + return self.parser diff --git a/airbyte_cdk/sources/declarative/decoders/decoder.py b/airbyte_cdk/sources/declarative/decoders/decoder.py index 5fa9dc8f6..34d99db1f 100644 --- a/airbyte_cdk/sources/declarative/decoders/decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/decoder.py @@ -8,6 +8,8 @@ import requests +DECODER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None] + @dataclass class Decoder: @@ -22,9 +24,7 @@ def is_stream_response(self) -> bool: """ @abstractmethod - def decode( - self, response: requests.Response - ) -> Generator[MutableMapping[str, Any], None, None]: + def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: """ Decodes a requests.Response into a Mapping[str, Any] or an array :param response: the response to decode diff --git a/airbyte_cdk/sources/declarative/decoders/decoder_parser.py b/airbyte_cdk/sources/declarative/decoders/decoder_parser.py new file mode 100644 index 000000000..d1401f54f --- /dev/null +++ b/airbyte_cdk/sources/declarative/decoders/decoder_parser.py @@ -0,0 +1,30 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from io import BufferedIOBase +from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple + +logger = logging.getLogger("airbyte") + + +PARSER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None] + + +@dataclass +class Parser(ABC): + @abstractmethod + def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: + """ + Parse data and yield dictionaries. + """ + pass + + +# reusable parser types +PARSERS_TYPE = List[Tuple[Set[str], Set[str], Parser]] +PARSERS_BY_HEADER_TYPE = Optional[Dict[str, Dict[str, Parser]]] diff --git a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py index a937a1e4d..5e9ba5788 100644 --- a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py @@ -6,16 +6,13 @@ import zipfile from dataclasses import dataclass from io import BytesIO -from typing import Any, Generator, MutableMapping -import orjson import requests from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.decoders import Decoder -from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import ( - Parser, -) +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser +from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") @@ -28,16 +25,16 @@ class ZipfileDecoder(Decoder): def is_stream_response(self) -> bool: return False - def decode( - self, response: requests.Response - ) -> Generator[MutableMapping[str, Any], None, None]: + def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: try: with zipfile.ZipFile(BytesIO(response.content)) as zip_file: for file_name in zip_file.namelist(): unzipped_content = zip_file.read(file_name) buffered_content = BytesIO(unzipped_content) try: - yield from self.parser.parse(buffered_content) + yield from self.parser.parse( + buffered_content, + ) except Exception as e: logger.error( f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}." diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 76631ee6b..c7fd98c17 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -15,7 +15,6 @@ from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor -EMPTY_STR: str = "" DEFAULT_ENCODING: str = "utf-8" DOWNLOAD_CHUNK_SIZE: int = 1024 * 10 @@ -136,7 +135,6 @@ def _read_with_chunks( """ try: - # TODO: Add support for other file types, like `json`, with `pd.read_json()` with open(path, "r", encoding=file_encoding) as data: chunks = pd.read_csv( data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 1c2289c17..9a0c66af2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2193,18 +2193,40 @@ def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: A stream_response=False if self._emit_connector_builder_messages else True, ) - @staticmethod - def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any) -> Decoder: + def create_jsonl_decoder( + self, model: JsonlDecoderModel, config: Config, **kwargs: Any + ) -> Decoder: return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, ) def create_gzip_decoder( self, model: GzipDecoderModel, config: Config, **kwargs: Any ) -> Decoder: - return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), - stream_response=False if self._emit_connector_builder_messages else True, + _compressed_response_types = { + "gzip", + "x-gzip", + "gzip, deflate", + "x-gzip, deflate", + "application/zip", + "application/gzip", + "application/x-gzip", + "application/x-zip-compressed", + } + + gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser + + if self._emit_connector_builder_messages: + # This is very surprising but if the response is not streamed, + # CompositeRawDecoder calls response.content and the requests library actually uncompress the data as opposed to response.raw, + # which uses urllib3 directly and does not uncompress the data. + return CompositeRawDecoder(gzip_parser.inner_parser, False) + + return CompositeRawDecoder.by_headers( + [({"Content-Encoding", "Content-Type"}, _compressed_response_types, gzip_parser)], + stream_response=True, + fallback_parser=gzip_parser.inner_parser, ) @staticmethod @@ -2753,7 +2775,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie ) paginator = ( self._create_component_from_model( - model=model.download_paginator, decoder=decoder, config=config, url_base="" + model=model.download_paginator, + decoder=decoder, + config=config, + url_base="", ) if model.download_paginator else NoPagination(parameters={}) @@ -2870,7 +2895,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie model=model.status_extractor, decoder=decoder, config=config, name=name ) download_target_extractor = self._create_component_from_model( - model=model.download_target_extractor, decoder=decoder, config=config, name=name + model=model.download_target_extractor, + decoder=decoder, + config=config, + name=name, ) job_repository: AsyncJobRepository = AsyncHttpJobRepository( creation_requester=creation_requester, diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index 8a64fae60..45671fc59 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -16,7 +16,9 @@ ) from airbyte_cdk.sources.declarative.decoders import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import ( + InterpolatedString, +) from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) @@ -26,7 +28,10 @@ from airbyte_cdk.sources.streams.http import HttpClient from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler from airbyte_cdk.sources.types import Config, EmptyString, StreamSlice, StreamState -from airbyte_cdk.utils.mapping_helpers import combine_mappings, get_interpolation_context +from airbyte_cdk.utils.mapping_helpers import ( + combine_mappings, + get_interpolation_context, +) @dataclass @@ -155,7 +160,9 @@ def get_request_params( next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: return self._request_options_provider.get_request_params( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) def get_request_headers( @@ -166,7 +173,9 @@ def get_request_headers( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: return self._request_options_provider.get_request_headers( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) # fixing request options provider types has a lot of dependencies @@ -195,7 +204,9 @@ def get_request_body_json( # type: ignore next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping[str, Any]]: return self._request_options_provider.get_request_body_json( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) @property @@ -350,9 +361,24 @@ def _join_url(cls, url_base: str, path: str) -> str: path (str): The path to join with the base URL. Returns: - str: The concatenated URL with the trailing slash (if any) removed. + str: The resulting joined URL. + + Note: + Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869 + - If the path is an empty string or None, the method returns the base URL with any trailing slash removed. + + Example: + 1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint' + 2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/api/endpoint' + 3) _join_url("https://example.com/api/", "") >> 'https://example.com/api' + 4) _join_url("https://example.com/api", None) >> 'https://example.com/api' """ - return urljoin(url_base, path).rstrip("/") + + # return a full-url if provided directly from interpolation context + if path == EmptyString or path is None: + return url_base.rstrip("/") + + return urljoin(url_base, path) def send_request( self, diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 39e74d8e6..02c0993b6 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -8,7 +8,8 @@ from http.server import BaseHTTPRequestHandler, HTTPServer from io import BytesIO, StringIO from threading import Thread -from unittest.mock import patch +from typing import Iterable +from unittest.mock import Mock, patch import pytest import requests @@ -68,6 +69,7 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): "GET", "https://airbyte.io/", content=generate_csv(encoding=encoding, delimiter="\t", should_compress=True), + headers={"Content-Encoding": "gzip"}, ) response = requests.get("https://airbyte.io/", stream=True) @@ -81,7 +83,7 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): assert counter == 3 -def generate_jsonlines(): +def generate_jsonlines() -> Iterable[str]: """ Generator function to yield data in JSON Lines format. This is useful for streaming large datasets. @@ -107,12 +109,57 @@ def generate_compressed_jsonlines(encoding: str = "utf-8") -> bytes: @pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str): requests_mock.register_uri( - "GET", "https://airbyte.io/", content=generate_compressed_jsonlines(encoding=encoding) + "GET", + "https://airbyte.io/", + content=generate_compressed_jsonlines(encoding=encoding), ) response = requests.get("https://airbyte.io/", stream=True) parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding)) - composite_raw_decoder = CompositeRawDecoder(parser=parser) + composite_raw_decoder = CompositeRawDecoder(parser) + counter = 0 + for _ in composite_raw_decoder.decode(response): + counter += 1 + assert counter == 3 + + +def test_given_header_match_when_decode_then_select_parser(requests_mock): + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=generate_compressed_jsonlines(), + headers={"Content-Encoding": "gzip"}, + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = GzipParser(inner_parser=JsonLineParser()) + unused_parser = Mock() + composite_raw_decoder = CompositeRawDecoder.by_headers( + [({"Content-Encoding"}, {"gzip"}, parser)], + stream_response=True, + fallback_parser=unused_parser, + ) + counter = 0 + for _ in composite_raw_decoder.decode(response): + counter += 1 + assert counter == 3 + + +def test_given_header_does_not_match_when_decode_then_select_fallback_parser(requests_mock): + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content="".join(generate_jsonlines()).encode("utf-8"), + headers={"Content-Encoding": "not gzip in order to expect fallback"}, + ) + response = requests.get("https://airbyte.io/", stream=True) + + unused_parser = GzipParser(inner_parser=Mock()) + composite_raw_decoder = CompositeRawDecoder.by_headers( + [({"Content-Encoding"}, {"gzip"}, unused_parser)], + stream_response=True, + fallback_parser=JsonLineParser(), + ) counter = 0 for _ in composite_raw_decoder.decode(response): counter += 1 @@ -266,7 +313,8 @@ def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_ ) response = requests.get("https://airbyte.io/") composite_raw_decoder = CompositeRawDecoder( - parser=JsonParser(encoding="utf-8"), stream_response=False + parser=JsonParser(encoding="utf-8"), + stream_response=False, ) content = list(composite_raw_decoder.decode(response)) diff --git a/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py b/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py index 731895e2e..f5c988d0f 100644 --- a/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py @@ -43,7 +43,12 @@ def test_zipfile_decoder_with_single_file_response(requests_mock, json_data): zipfile_decoder = ZipfileDecoder(parser=GzipParser(inner_parser=JsonParser())) compressed_data = gzip.compress(json.dumps(json_data).encode()) zipped_data = create_zip_from_dict(compressed_data) - requests_mock.register_uri("GET", "https://airbyte.io/", content=zipped_data) + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=zipped_data, + headers={"Content-Encoding": "application/zip"}, + ) response = requests.get("https://airbyte.io/") if isinstance(json_data, list): diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py index a1229579f..dfe78011a 100644 --- a/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -825,7 +825,7 @@ def test_send_request_stream_slice_next_page_token(): "test_trailing_slash_on_path", "https://airbyte.io", "/my_endpoint/", - "https://airbyte.io/my_endpoint", + "https://airbyte.io/my_endpoint/", ), ( "test_nested_path_no_leading_slash",