Proposal for generic composite raw decoder #395

Closed
80 changes: 66 additions & 14 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
@@ -1,3 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import csv
import gzip
import io
@@ -6,7 +10,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from io import BufferedIOBase, TextIOWrapper
from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple

import orjson
import requests
@@ -41,7 +45,14 @@ def parse(
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Decompress gzipped bytes and pass decompressed data to the inner parser.

IMPORTANT:
- If the data is not gzipped, reset the pointer and pass the data to the inner parser as is.

Note:
- The data is not decoded by default.
"""

with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
yield from self.inner_parser.parse(gzipobj)
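
As a quick illustration of this flow, a minimal sketch that feeds gzip-compressed JSON Lines through a GzipParser wrapping the JsonLineParser referenced in the CompositeRawDecoder docstring below:

import gzip
import io

payload = gzip.compress(b'{"id": 1}\n{"id": 2}\n')  # gzip-compressed JSON Lines

# GzipParser decompresses the stream and hands it to the inner parser
parser = GzipParser(inner_parser=JsonLineParser(encoding="utf-8"))
for record in parser.parse(io.BytesIO(payload)):
    print(record)  # {'id': 1} then {'id': 2}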

@@ -50,7 +61,10 @@
class JsonParser(Parser):
encoding: str = "utf-8"

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Attempts to deserialize data using the orjson library. As an extra layer of safety, we fall back on the json library to deserialize the data.
"""
@@ -130,31 +144,69 @@ def parse(
yield row


_HEADER = str
_HEADER_VALUE = str


class CompositeRawDecoder(Decoder):
"""
Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
by passing response.raw to the parser(s).

Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively.

Example:
composite_raw_decoder = CompositeRawDecoder(
parser=GzipParser(
inner_parser=JsonLineParser(encoding="iso-8859-1")
)
)
"""

@classmethod
def by_headers(
cls,
parsers: List[Tuple[Set[_HEADER], Set[_HEADER_VALUE], Parser]],
stream_response: bool,
fallback_parser: Parser,
) -> "CompositeRawDecoder":
parsers_by_header: Dict[_HEADER, Dict[_HEADER_VALUE, Parser]] = {}
for headers, header_values, parser in parsers:
for header in headers:
parsers_by_header[header] = {header_value: parser for header_value in header_values}
return cls(fallback_parser, stream_response, parsers_by_header)

def __init__(
self,
parser: Parser,
stream_response: bool = True,
parsers_by_header: Optional[Dict[_HEADER, Dict[_HEADER_VALUE, Parser]]] = None,
) -> None:
self._parsers_by_header = parsers_by_header if parsers_by_header else {}
self._fallback_parser = parser
self._stream_response = stream_response

def is_stream_response(self) -> bool:
return self._stream_response

def decode(
self,
response: requests.Response,
) -> Generator[MutableMapping[str, Any], None, None]:
parser = self._select_parser(response)
if self.is_stream_response():
# urllib3 mentions that some interfaces don't play nice with auto_close
# More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
# We have indeed observed some issues with CSV parsing.
# Hence, we will manage the closing of the file ourselves until we find a better solution.
response.raw.auto_close = False
yield from parser.parse(
data=response.raw, # type: ignore[arg-type]
)
response.raw.close()
else:
yield from parser.parse(data=io.BytesIO(response.content))

def _select_parser(self, response: requests.Response) -> Parser:
for header, parser_by_header_value in self._parsers_by_header.items():
if (
header in response.headers
and response.headers[header] in parser_by_header_value
):
return parser_by_header_value[response.headers[header]]
return self._fallback_parser
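
For reference, a minimal usage sketch of the header-based dispatch above (the response is hand-built, the header values are illustrative, and _select_parser is called directly only for demonstration):

import requests

inner = JsonLineParser(encoding="utf-8")
decoder = CompositeRawDecoder.by_headers(
    parsers=[
        # any of these headers carrying any of these values selects the GzipParser
        ({"Content-Encoding", "Content-Type"}, {"gzip", "application/gzip"}, GzipParser(inner_parser=inner)),
    ],
    stream_response=True,
    fallback_parser=inner,
)

response = requests.Response()
response.headers["Content-Encoding"] = "gzip"
assert isinstance(decoder._select_parser(response), GzipParser)  # matched by header value

del response.headers["Content-Encoding"]
assert decoder._select_parser(response) is inner  # no match, so the fallback parser is used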
20 changes: 20 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/decoder.py
@@ -8,6 +8,17 @@

import requests

COMPRESSED_RESPONSE_TYPES = [
"gzip",
"x-gzip",
"gzip, deflate",
"x-gzip, deflate",
"application/zip",
"application/gzip",
"application/x-gzip",
"application/x-zip-compressed",
]


@dataclass
class Decoder:
@@ -30,3 +41,12 @@ def decode(
:param response: the response to decode
:return: Generator of Mapping describing the response
"""

def is_compressed_response(self, response: requests.Response) -> bool:
"""
Check if the response is compressed based on the `Content-Encoding` or `Content-Type` header.
"""
return (
response.headers.get("Content-Encoding") in COMPRESSSED_RESPONSE_TYPES
or response.headers.get("Content-Type") in COMPRESSSED_RESPONSE_TYPES
)
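
A quick sketch of this check in isolation (the response is hand-built; JsonDecoder is used only as a convenient concrete Decoder subclass, and its parameters={} constructor is assumed from the CDK):

import requests

from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder

decoder = JsonDecoder(parameters={})

response = requests.Response()
response.headers["Content-Encoding"] = "gzip"
assert decoder.is_compressed_response(response)

response.headers["Content-Encoding"] = "identity"  # not in the compressed-types list
assert not decoder.is_compressed_response(response)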
9 changes: 4 additions & 5 deletions airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py
@@ -8,14 +8,11 @@
from io import BytesIO
from typing import Any, Generator, MutableMapping

import requests

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser
from airbyte_cdk.utils import AirbyteTracedException

logger = logging.getLogger("airbyte")
@@ -37,7 +34,9 @@ def decode(
unzipped_content = zip_file.read(file_name)
buffered_content = BytesIO(unzipped_content)
try:
yield from self.parser.parse(
buffered_content,
)
except Exception as e:
logger.error(
f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
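For context, a minimal sketch of the read-and-parse loop above against an in-memory archive (assumes the JsonParser from composite_raw_decoder.py is importable):

import io
import zipfile

from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import JsonParser

# build a small zip archive in memory
archive = io.BytesIO()
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("data.json", b'{"id": 1}')

# mirror the decode loop: read each entry and hand its bytes to the parser
with zipfile.ZipFile(io.BytesIO(archive.getvalue())) as zip_file:
    for file_name in zip_file.namelist():
        buffered_content = io.BytesIO(zip_file.read(file_name))
        for record in JsonParser().parse(buffered_content):
            print(record)  # {'id': 1}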
@@ -15,7 +15,6 @@

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor

DEFAULT_ENCODING: str = "utf-8"
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10

@@ -136,7 +135,6 @@ def _read_with_chunks(
"""

try:
with open(path, "r", encoding=file_encoding) as data:
chunks = pd.read_csv(
data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
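As a standalone illustration of the chunked read above (pandas only; the inline CSV stands in for the temporary file the extractor reads):

import io

import pandas as pd

csv_buffer = io.StringIO("id,name\n1,a\n2,b\n3,c\n")
chunks = pd.read_csv(csv_buffer, chunksize=2, iterator=True, dialect="unix", dtype=object)
for chunk in chunks:
    # each chunk is a DataFrame of at most `chunksize` rows
    for record in chunk.to_dict(orient="records"):
        print(record)  # e.g. {'id': '1', 'name': 'a'}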
@@ -513,6 +513,17 @@
SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization,
}

_COMPRESSED_RESPONSE_TYPES = {
"gzip",
"x-gzip",
"gzip, deflate",
"x-gzip, deflate",
"application/zip",
"application/gzip",
"application/x-gzip",
"application/x-zip-compressed",
}


class ModelToComponentFactory:
EPOCH_DATETIME_FORMAT = "%s"
@@ -2193,18 +2204,29 @@ def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: A
stream_response=False if self._emit_connector_builder_messages else True,
)

def create_jsonl_decoder(
self, model: JsonlDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
return CompositeRawDecoder(
parser=ModelToComponentFactory._get_parser(model, config),
stream_response=False if self._emit_connector_builder_messages else True,
)

def create_gzip_decoder(
self, model: GzipDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser

if self._emit_connector_builder_messages:
# This is surprising, but when the response is not streamed, CompositeRawDecoder calls response.content,
# and the requests library decompresses the data automatically. response.raw, by contrast, uses urllib3
# directly and does not decompress the data, so the GzipParser is still needed on that path.
return CompositeRawDecoder(gzip_parser.inner_parser, False)

return CompositeRawDecoder.by_headers(
[
({"Content-Encoding", "Content-Type"}, _COMPRESSED_RESPONSE_TYPES, gzip_parser)
],
stream_response=True,
fallback_parser=gzip_parser.inner_parser,
)
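
The difference the comment above describes can be reproduced with the gzip module alone; a standalone sketch:

import gzip
import io
import json

raw_body = gzip.compress(b'{"id": 1}')  # like response.raw: bytes still compressed
content = gzip.decompress(raw_body)     # like response.content: already decompressed by requests

# non-streamed path: the inner parser alone is enough
print(json.loads(content))

# streamed path: still gzipped, so a GzipParser-style wrapper must decompress first
with gzip.GzipFile(fileobj=io.BytesIO(raw_body)) as decompressed:
    print(json.loads(decompressed.read()))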

@staticmethod
@@ -2753,7 +2775,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
)
paginator = (
self._create_component_from_model(
model=model.download_paginator, decoder=decoder, config=config, url_base=""
model=model.download_paginator,
decoder=decoder,
config=config,
url_base="",
)
if model.download_paginator
else NoPagination(parameters={})
@@ -2870,7 +2895,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
model=model.status_extractor, decoder=decoder, config=config, name=name
)
download_target_extractor = self._create_component_from_model(
model=model.download_target_extractor,
decoder=decoder,
config=config,
name=name,
)
job_repository: AsyncJobRepository = AsyncHttpJobRepository(
creation_requester=creation_requester,
40 changes: 33 additions & 7 deletions airbyte_cdk/sources/declarative/requesters/http_requester.py
@@ -16,7 +16,9 @@
)
from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
InterpolatedString,
)
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
@@ -26,7 +28,10 @@
from airbyte_cdk.sources.streams.http import HttpClient
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.types import Config, EmptyString, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import (
combine_mappings,
get_interpolation_context,
)


@dataclass
@@ -155,7 +160,9 @@ def get_request_params(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return self._request_options_provider.get_request_params(
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

def get_request_headers(
@@ -166,7 +173,9 @@ def get_request_headers(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._request_options_provider.get_request_headers(
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

# fixing request options provider types has a lot of dependencies
@@ -195,7 +204,9 @@ def get_request_body_json( # type: ignore
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
return self._request_options_provider.get_request_body_json(
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

@property
@@ -350,9 +361,24 @@ def _join_url(cls, url_base: str, path: str) -> str:
path (str): The path to join with the base URL.

Returns:
str: The resulting joined URL.

Note:
Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869
- If the path is an empty string or None, the method returns the base URL with any trailing slash removed.

Example:
1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/api/endpoint'
3) _join_url("https://example.com/api/", "") >> 'https://example.com/api'
4) _join_url("https://example.com/api", None) >> 'https://example.com/api'
"""

# if no path is provided, url_base is already the full URL (e.g. provided directly from the interpolation context)
if path == EmptyString or path is None:
return url_base.rstrip("/")

return urljoin(url_base, path)
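
A quick sanity check of the branches above (assumes EmptyString is the empty string and HttpRequester is the enclosing class):

assert HttpRequester._join_url("https://example.com/api/", "endpoint") == "https://example.com/api/endpoint"
assert HttpRequester._join_url("https://example.com/api/", "") == "https://example.com/api"
assert HttpRequester._join_url("https://example.com/api", None) == "https://example.com/api"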

def send_request(
self,
Expand Down