Skip to content

Commit 4f9fd20

Browse files
author
Baz
authored
fix: (CDK) (AsyncRetriever) - Use the Nested Decoders to decode the streaming responses, instead of ResponseToFileExtractor (#378)
1 parent fe2f9a5 commit 4f9fd20

File tree

10 files changed

+270
-80
lines changed

10 files changed

+270
-80
lines changed

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 102 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,47 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
15
import csv
26
import gzip
37
import io
48
import json
59
import logging
6-
from abc import ABC, abstractmethod
710
from dataclasses import dataclass
811
from io import BufferedIOBase, TextIOWrapper
9-
from typing import Any, Generator, MutableMapping, Optional
12+
from typing import Any, Optional
1013

1114
import orjson
1215
import requests
1316

1417
from airbyte_cdk.models import FailureType
15-
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
18+
from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE, Decoder
19+
from airbyte_cdk.sources.declarative.decoders.decoder_parser import (
20+
PARSER_OUTPUT_TYPE,
21+
PARSERS_BY_HEADER_TYPE,
22+
PARSERS_TYPE,
23+
Parser,
24+
)
1625
from airbyte_cdk.utils import AirbyteTracedException
1726

1827
logger = logging.getLogger("airbyte")
1928

2029

21-
@dataclass
22-
class Parser(ABC):
23-
@abstractmethod
24-
def parse(
25-
self,
26-
data: BufferedIOBase,
27-
) -> Generator[MutableMapping[str, Any], None, None]:
28-
"""
29-
Parse data and yield dictionaries.
30-
"""
31-
pass
32-
33-
3430
@dataclass
3531
class GzipParser(Parser):
3632
inner_parser: Parser
3733

38-
def parse(
39-
self,
40-
data: BufferedIOBase,
41-
) -> Generator[MutableMapping[str, Any], None, None]:
34+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
4235
"""
4336
Decompress gzipped bytes and pass decompressed data to the inner parser.
37+
38+
IMPORTANT:
39+
- If the data is not gzipped, reset the pointer and pass the data to the inner parser as is.
40+
41+
Note:
42+
- The data is not decoded by default.
4443
"""
44+
4545
with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
4646
yield from self.inner_parser.parse(gzipobj)
4747

@@ -50,7 +50,7 @@ def parse(
5050
class JsonParser(Parser):
5151
encoding: str = "utf-8"
5252

53-
def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]:
53+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
5454
"""
5555
Attempts to deserialize data using the orjson library. As an extra layer of safety, we fall back on the json library to deserialize the data.
5656
"""
@@ -90,10 +90,7 @@ def _parse_json(self, raw_data: bytes) -> Optional[Any]:
9090
class JsonLineParser(Parser):
9191
encoding: Optional[str] = "utf-8"
9292

93-
def parse(
94-
self,
95-
data: BufferedIOBase,
96-
) -> Generator[MutableMapping[str, Any], None, None]:
93+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
9794
for line in data:
9895
try:
9996
yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
@@ -117,10 +114,7 @@ def _get_delimiter(self) -> Optional[str]:
117114

118115
return self.delimiter
119116

120-
def parse(
121-
self,
122-
data: BufferedIOBase,
123-
) -> Generator[MutableMapping[str, Any], None, None]:
117+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
124118
"""
125119
Parse CSV data from decompressed bytes.
126120
"""
@@ -130,31 +124,95 @@ def parse(
130124
yield row
131125

132126

133-
@dataclass
134127
class CompositeRawDecoder(Decoder):
135128
"""
136-
Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
129+
Decoder strategy to transform a requests.Response into a PARSER_OUTPUT_TYPE
137130
by passing response.raw to the parser(s).
138-
Note: response.raw is not decoded/decompressed by default.
139-
parsers should be instantiated recursively.
131+
132+
Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively.
133+
140134
Example:
141-
composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
135+
composite_raw_decoder = CompositeRawDecoder(
136+
parser=GzipParser(
137+
inner_parser=JsonLineParser(encoding="iso-8859-1")
138+
)
139+
)
142140
"""
143141

144-
parser: Parser
145-
stream_response: bool = True
142+
def __init__(
143+
self,
144+
parser: Parser,
145+
stream_response: bool = True,
146+
parsers_by_header: PARSERS_BY_HEADER_TYPE = None,
147+
) -> None:
148+
# since we moved from using `dataclass` to `__init__` method,
149+
# we need to keep using the `parser` to be able to resolve the dependencies
150+
# between the parsers correctly.
151+
self.parser = parser
152+
153+
self._parsers_by_header = parsers_by_header if parsers_by_header else {}
154+
self._stream_response = stream_response
155+
156+
@classmethod
157+
def by_headers(
158+
cls,
159+
parsers: PARSERS_TYPE,
160+
stream_response: bool,
161+
fallback_parser: Parser,
162+
) -> "CompositeRawDecoder":
163+
"""
164+
Create a CompositeRawDecoder instance based on header values.
165+
166+
Args:
167+
parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
168+
stream_response (bool): A flag indicating whether the response should be streamed.
169+
fallback_parser (Parser): A parser to use if no matching header is found.
170+
171+
Returns:
172+
CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
173+
"""
174+
parsers_by_header = {}
175+
for headers, header_values, parser in parsers:
176+
for header in headers:
177+
parsers_by_header[header] = {header_value: parser for header_value in header_values}
178+
return cls(fallback_parser, stream_response, parsers_by_header)
146179

147180
def is_stream_response(self) -> bool:
148-
return self.stream_response
181+
return self._stream_response
149182

150-
def decode(
151-
self, response: requests.Response
152-
) -> Generator[MutableMapping[str, Any], None, None]:
183+
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
184+
parser = self._select_parser(response)
153185
if self.is_stream_response():
154-
# urllib mentions that some interfaces don't play nice with auto_close [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content)
155-
# We have indeed observed some issues with CSV parsing. Hence, we will manage the closing of the file ourselves until we find a better solution.
186+
# urllib3 mentions that some interfaces don't play nice with auto_close
187+
# More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
188+
# We have indeed observed some issues with CSV parsing.
189+
# Hence, we will manage the closing of the file ourselves until we find a better solution.
156190
response.raw.auto_close = False
157-
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
191+
yield from parser.parse(
192+
data=response.raw, # type: ignore[arg-type]
193+
)
158194
response.raw.close()
159195
else:
160-
yield from self.parser.parse(data=io.BytesIO(response.content))
196+
yield from parser.parse(data=io.BytesIO(response.content))
197+
198+
def _select_parser(self, response: requests.Response) -> Parser:
199+
"""
200+
Selects the appropriate parser based on the response headers.
201+
202+
This method iterates through the `_parsers_by_header` dictionary to find a matching parser
203+
based on the headers in the response. If a matching header and header value are found,
204+
the corresponding parser is returned. If no match is found, the default parser is returned.
205+
206+
Args:
207+
response (requests.Response): The HTTP response object containing headers to check.
208+
209+
Returns:
210+
Parser: The parser corresponding to the matched header value, or the default parser if no match is found.
211+
"""
212+
for header, parser_by_header_value in self._parsers_by_header.items():
213+
if (
214+
header in response.headers
215+
and response.headers[header] in parser_by_header_value.keys()
216+
):
217+
return parser_by_header_value[response.headers[header]]
218+
return self.parser

airbyte_cdk/sources/declarative/decoders/decoder.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
import requests
1010

11+
DECODER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None]
12+
1113

1214
@dataclass
1315
class Decoder:
@@ -22,9 +24,7 @@ def is_stream_response(self) -> bool:
2224
"""
2325

2426
@abstractmethod
25-
def decode(
26-
self, response: requests.Response
27-
) -> Generator[MutableMapping[str, Any], None, None]:
27+
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
2828
"""
2929
Decodes a requests.Response into a Mapping[str, Any] or an array
3030
:param response: the response to decode
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
6+
import logging
7+
from abc import ABC, abstractmethod
8+
from dataclasses import dataclass
9+
from io import BufferedIOBase
10+
from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple
11+
12+
logger = logging.getLogger("airbyte")
13+
14+
15+
PARSER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None]
16+
17+
18+
@dataclass
19+
class Parser(ABC):
20+
@abstractmethod
21+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
22+
"""
23+
Parse data and yield dictionaries.
24+
"""
25+
pass
26+
27+
28+
# reusable parser types
29+
PARSERS_TYPE = List[Tuple[Set[str], Set[str], Parser]]
30+
PARSERS_BY_HEADER_TYPE = Optional[Dict[str, Dict[str, Parser]]]

airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,13 @@
66
import zipfile
77
from dataclasses import dataclass
88
from io import BytesIO
9-
from typing import Any, Generator, MutableMapping
109

11-
import orjson
1210
import requests
1311

1412
from airbyte_cdk.models import FailureType
1513
from airbyte_cdk.sources.declarative.decoders import Decoder
16-
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
17-
Parser,
18-
)
14+
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser
15+
from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE
1916
from airbyte_cdk.utils import AirbyteTracedException
2017

2118
logger = logging.getLogger("airbyte")
@@ -28,16 +25,16 @@ class ZipfileDecoder(Decoder):
2825
def is_stream_response(self) -> bool:
2926
return False
3027

31-
def decode(
32-
self, response: requests.Response
33-
) -> Generator[MutableMapping[str, Any], None, None]:
28+
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
3429
try:
3530
with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
3631
for file_name in zip_file.namelist():
3732
unzipped_content = zip_file.read(file_name)
3833
buffered_content = BytesIO(unzipped_content)
3934
try:
40-
yield from self.parser.parse(buffered_content)
35+
yield from self.parser.parse(
36+
buffered_content,
37+
)
4138
except Exception as e:
4239
logger.error(
4340
f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."

airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
1717

18-
EMPTY_STR: str = ""
1918
DEFAULT_ENCODING: str = "utf-8"
2019
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10
2120

@@ -136,7 +135,6 @@ def _read_with_chunks(
136135
"""
137136

138137
try:
139-
# TODO: Add support for other file types, like `json`, with `pd.read_json()`
140138
with open(path, "r", encoding=file_encoding) as data:
141139
chunks = pd.read_csv(
142140
data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2193,18 +2193,40 @@ def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: A
21932193
stream_response=False if self._emit_connector_builder_messages else True,
21942194
)
21952195

2196-
@staticmethod
2197-
def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any) -> Decoder:
2196+
def create_jsonl_decoder(
2197+
self, model: JsonlDecoderModel, config: Config, **kwargs: Any
2198+
) -> Decoder:
21982199
return CompositeRawDecoder(
2199-
parser=ModelToComponentFactory._get_parser(model, config), stream_response=True
2200+
parser=ModelToComponentFactory._get_parser(model, config),
2201+
stream_response=False if self._emit_connector_builder_messages else True,
22002202
)
22012203

22022204
def create_gzip_decoder(
22032205
self, model: GzipDecoderModel, config: Config, **kwargs: Any
22042206
) -> Decoder:
2205-
return CompositeRawDecoder(
2206-
parser=ModelToComponentFactory._get_parser(model, config),
2207-
stream_response=False if self._emit_connector_builder_messages else True,
2207+
_compressed_response_types = {
2208+
"gzip",
2209+
"x-gzip",
2210+
"gzip, deflate",
2211+
"x-gzip, deflate",
2212+
"application/zip",
2213+
"application/gzip",
2214+
"application/x-gzip",
2215+
"application/x-zip-compressed",
2216+
}
2217+
2218+
gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser
2219+
2220+
if self._emit_connector_builder_messages:
2221+
# This is very surprising but if the response is not streamed,
2222+
# CompositeRawDecoder calls response.content and the requests library actually decompresses the data, as opposed to response.raw,
2223+
# which uses urllib3 directly and does not decompress the data.
2224+
return CompositeRawDecoder(gzip_parser.inner_parser, False)
2225+
2226+
return CompositeRawDecoder.by_headers(
2227+
[({"Content-Encoding", "Content-Type"}, _compressed_response_types, gzip_parser)],
2228+
stream_response=True,
2229+
fallback_parser=gzip_parser.inner_parser,
22082230
)
22092231

22102232
@staticmethod
@@ -2753,7 +2775,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
27532775
)
27542776
paginator = (
27552777
self._create_component_from_model(
2756-
model=model.download_paginator, decoder=decoder, config=config, url_base=""
2778+
model=model.download_paginator,
2779+
decoder=decoder,
2780+
config=config,
2781+
url_base="",
27572782
)
27582783
if model.download_paginator
27592784
else NoPagination(parameters={})
@@ -2870,7 +2895,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
28702895
model=model.status_extractor, decoder=decoder, config=config, name=name
28712896
)
28722897
download_target_extractor = self._create_component_from_model(
2873-
model=model.download_target_extractor, decoder=decoder, config=config, name=name
2898+
model=model.download_target_extractor,
2899+
decoder=decoder,
2900+
config=config,
2901+
name=name,
28742902
)
28752903
job_repository: AsyncJobRepository = AsyncHttpJobRepository(
28762904
creation_requester=creation_requester,

0 commit comments

Comments
 (0)