Skip to content

fix: (CDK) (AsyncRetriever) - Use the Nested Decoders to decode the streaming responses, instead of ResponseToFileExtractor #378

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
146 changes: 102 additions & 44 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,47 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import csv
import gzip
import io
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from io import BufferedIOBase, TextIOWrapper
from typing import Any, Generator, MutableMapping, Optional
from typing import Any, Optional

import orjson
import requests

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE, Decoder
from airbyte_cdk.sources.declarative.decoders.decoder_parser import (
PARSER_OUTPUT_TYPE,
PARSERS_BY_HEADER_TYPE,
PARSERS_TYPE,
Parser,
)
from airbyte_cdk.utils import AirbyteTracedException

logger = logging.getLogger("airbyte")


@dataclass
class Parser(ABC):
@abstractmethod
def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Parse data and yield dictionaries.
"""
pass


@dataclass
class GzipParser(Parser):
    """Parser that gunzips a raw byte stream and delegates the decompressed bytes to an inner parser."""

    inner_parser: Parser

    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
        """
        Decompress gzipped bytes and pass the decompressed stream to the inner parser.

        Note:
            - The data is not decoded (text-wise) by default; the inner parser owns decoding.
            - NOTE(review): an earlier docstring claimed non-gzipped input is passed through
              unchanged after a pointer reset, but no such fallback exists in this body:
              `gzip.GzipFile` will raise on non-gzip data on first read — confirm intended behavior.
        """
        with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
            yield from self.inner_parser.parse(gzipobj)
Expand All @@ -50,7 +50,7 @@ def parse(
class JsonParser(Parser):
encoding: str = "utf-8"

def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]:
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
"""
Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
"""
Expand Down Expand Up @@ -90,10 +90,7 @@ def _parse_json(self, raw_data: bytes) -> Optional[Any]:
class JsonLineParser(Parser):
encoding: Optional[str] = "utf-8"

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
for line in data:
try:
yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
Expand All @@ -117,10 +114,7 @@ def _get_delimiter(self) -> Optional[str]:

return self.delimiter

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
"""
Parse CSV data from decompressed bytes.
"""
Expand All @@ -130,31 +124,95 @@ def parse(
yield row


@dataclass
class CompositeRawDecoder(Decoder):
"""
Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
Decoder strategy to transform a requests.Response into a PARSER_OUTPUT_TYPE
passed response.raw to parser(s).
Note: response.raw is not decoded/decompressed by default.
parsers should be instantiated recursively.

Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively.

Example:
composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
composite_raw_decoder = CompositeRawDecoder(
parser=GzipParser(
inner_parser=JsonLineParser(encoding="iso-8859-1")
)
)
"""

parser: Parser
stream_response: bool = True
def __init__(
    self,
    parser: Parser,
    stream_response: bool = True,
    parsers_by_header: PARSERS_BY_HEADER_TYPE = None,
) -> None:
    """
    :param parser: fallback parser used when no header-specific parser matches.
    :param stream_response: when True, parse `response.raw` lazily; otherwise buffer `response.content`.
    :param parsers_by_header: optional mapping of header name -> {header value -> parser}
        consulted by `_select_parser` before falling back to `parser`.
    """
    # Since we moved from using `dataclass` to an explicit `__init__` method,
    # we need to keep exposing `parser` publicly to be able to resolve the dependencies
    # between the parsers correctly.
    self.parser = parser

    self._parsers_by_header = parsers_by_header if parsers_by_header else {}
    self._stream_response = stream_response

@classmethod
def by_headers(
    cls,
    parsers: PARSERS_TYPE,
    stream_response: bool,
    fallback_parser: Parser,
) -> "CompositeRawDecoder":
    """
    Create a CompositeRawDecoder instance that picks a parser based on response header values.

    Args:
        parsers (PARSERS_TYPE): A list of tuples where each tuple contains header names,
            header values, and the parser to use when one of those header/value pairs matches.
        stream_response (bool): A flag indicating whether the response should be streamed.
        fallback_parser (Parser): A parser to use if no matching header is found.

    Returns:
        CompositeRawDecoder: An instance configured with the provided parsers.
    """
    parsers_by_header: PARSERS_BY_HEADER_TYPE = {}
    for headers, header_values, parser in parsers:
        for header in headers:
            # Merge into any existing mapping for this header: a plain assignment would
            # silently discard parsers registered by an earlier tuple for the same header.
            parsers_by_header.setdefault(header, {}).update(
                {header_value: parser for header_value in header_values}
            )
    return cls(fallback_parser, stream_response, parsers_by_header)

def is_stream_response(self) -> bool:
    # True when this decoder parses `response.raw` lazily instead of buffering `response.content`.
    return self._stream_response

def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
    """
    Transform a `requests.Response` into a generator of record mappings, using the parser
    selected by `_select_parser` (header-based, falling back to `self.parser`).
    """
    parser = self._select_parser(response)
    if self.is_stream_response():
        # urllib3 mentions that some interfaces don't play nice with auto_close.
        # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
        # We have indeed observed some issues with CSV parsing.
        # Hence, we will manage the closing of the file ourselves until we find a better solution.
        response.raw.auto_close = False
        try:
            yield from parser.parse(
                data=response.raw,  # type: ignore[arg-type]
            )
        finally:
            # Close even when the parser raises or the consumer abandons the generator,
            # so the underlying connection is not leaked.
            response.raw.close()
    else:
        yield from parser.parse(data=io.BytesIO(response.content))

def _select_parser(self, response: requests.Response) -> Parser:
    """
    Select the appropriate parser based on the response headers.

    Iterates through `_parsers_by_header` looking for a header whose value in the
    response matches a registered parser. If no header/value pair matches, the
    fallback `self.parser` is returned.

    Args:
        response (requests.Response): The HTTP response object containing headers to check.

    Returns:
        Parser: The parser for the matched header value, or the fallback parser.
    """
    for header, parsers_by_value in self._parsers_by_header.items():
        # Single lookup instead of `header in headers` followed by re-indexing.
        header_value = response.headers.get(header)
        if header_value in parsers_by_value:
            return parsers_by_value[header_value]
    return self.parser
6 changes: 3 additions & 3 deletions airbyte_cdk/sources/declarative/decoders/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import requests

DECODER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None]


@dataclass
class Decoder:
Expand All @@ -22,9 +24,7 @@ def is_stream_response(self) -> bool:
"""

@abstractmethod
def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
"""
Decodes a requests.Response into a Mapping[str, Any] or an array
:param response: the response to decode
Expand Down
30 changes: 30 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/decoder_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from io import BufferedIOBase
from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple

logger = logging.getLogger("airbyte")


PARSER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None]


@dataclass
class Parser(ABC):
    """Abstract base for parsers that turn a raw byte stream into record dictionaries."""

    @abstractmethod
    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
        """
        Parse data and yield dictionaries.

        :param data: undecoded byte stream (e.g. `response.raw` or a BytesIO buffer).
        """
        pass


# reusable parser types
PARSERS_TYPE = List[Tuple[Set[str], Set[str], Parser]]
PARSERS_BY_HEADER_TYPE = Optional[Dict[str, Dict[str, Parser]]]
15 changes: 6 additions & 9 deletions airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
import zipfile
from dataclasses import dataclass
from io import BytesIO
from typing import Any, Generator, MutableMapping

import orjson
import requests

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
Parser,
)
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser
from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE
from airbyte_cdk.utils import AirbyteTracedException

logger = logging.getLogger("airbyte")
Expand All @@ -28,16 +25,16 @@ class ZipfileDecoder(Decoder):
def is_stream_response(self) -> bool:
    # Zip archives must be fully buffered before members can be listed/extracted,
    # so this decoder never streams.
    return False

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
try:
with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
for file_name in zip_file.namelist():
unzipped_content = zip_file.read(file_name)
buffered_content = BytesIO(unzipped_content)
try:
yield from self.parser.parse(buffered_content)
yield from self.parser.parse(
buffered_content,
)
except Exception as e:
logger.error(
f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor

EMPTY_STR: str = ""
DEFAULT_ENCODING: str = "utf-8"
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10

Expand Down Expand Up @@ -136,7 +135,6 @@ def _read_with_chunks(
"""

try:
# TODO: Add support for other file types, like `json`, with `pd.read_json()`
with open(path, "r", encoding=file_encoding) as data:
chunks = pd.read_csv(
data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2193,18 +2193,40 @@ def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: A
stream_response=False if self._emit_connector_builder_messages else True,
)

def create_jsonl_decoder(
    self, model: JsonlDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
    """Build a JSON-Lines decoder; streaming is disabled for connector-builder test reads."""
    return CompositeRawDecoder(
        parser=ModelToComponentFactory._get_parser(model, config),
        # Connector-builder message emission needs the whole payload buffered.
        stream_response=not self._emit_connector_builder_messages,
    )

def create_gzip_decoder(
    self, model: GzipDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
    """
    Build a decoder that gunzips responses whose Content-Encoding/Content-Type headers
    indicate compression, and parses them as-is otherwise.
    """
    _compressed_response_types = {
        "gzip",
        "x-gzip",
        "gzip, deflate",
        "x-gzip, deflate",
        "application/zip",
        "application/gzip",
        "application/x-gzip",
        "application/x-zip-compressed",
    }

    gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config)  # type: ignore # based on the model, we know this will be a GzipParser

    if self._emit_connector_builder_messages:
        # This is very surprising but if the response is not streamed,
        # CompositeRawDecoder calls response.content and the requests library actually
        # uncompresses the data as opposed to response.raw, which uses urllib3 directly
        # and does not uncompress the data. Hence the inner parser alone suffices here.
        return CompositeRawDecoder(gzip_parser.inner_parser, False)

    return CompositeRawDecoder.by_headers(
        [({"Content-Encoding", "Content-Type"}, _compressed_response_types, gzip_parser)],
        stream_response=True,
        fallback_parser=gzip_parser.inner_parser,
    )

@staticmethod
Expand Down Expand Up @@ -2753,7 +2775,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
)
paginator = (
self._create_component_from_model(
model=model.download_paginator, decoder=decoder, config=config, url_base=""
model=model.download_paginator,
decoder=decoder,
config=config,
url_base="",
)
if model.download_paginator
else NoPagination(parameters={})
Expand Down Expand Up @@ -2870,7 +2895,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
model=model.status_extractor, decoder=decoder, config=config, name=name
)
download_target_extractor = self._create_component_from_model(
model=model.download_target_extractor, decoder=decoder, config=config, name=name
model=model.download_target_extractor,
decoder=decoder,
config=config,
name=name,
)
job_repository: AsyncJobRepository = AsyncHttpJobRepository(
creation_requester=creation_requester,
Expand Down
Loading
Loading