Skip to content

Commit

Permalink
chore(decoder): clean decoders and make csvdecoder available (#326)
Browse files Browse the repository at this point in the history
Co-authored-by: octavia-squidington-iii <[email protected]>
  • Loading branch information
maxi297 and octavia-squidington-iii authored Feb 12, 2025
1 parent 74631d8 commit cb5a921
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 330 deletions.
102 changes: 22 additions & 80 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,6 @@ definitions:
anyOf:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -2133,43 +2132,26 @@ definitions:
$parameters:
type: object
additionalProperties: true
GzipJsonDecoder:
title: GzipJson Decoder
description: Use this if the response is Gzip compressed Json.
type: object
additionalProperties: true
required:
- type
properties:
type:
type: string
enum: [GzipJsonDecoder]
encoding:
type: string
default: utf-8
$parameters:
type: object
additionalProperties: true
ZipfileDecoder:
title: Zipfile Decoder
description: Decoder for response data that is returned as zipfile(s).
type: object
additionalProperties: true
required:
- type
- parser
- decoder
properties:
type:
type: string
enum: [ZipfileDecoder]
parser:
decoder:
title: Parser
description: Parser to parse the decompressed data from the zipfile(s).
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonParser"
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
ListPartitionRouter:
title: List Partition Router
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
Expand Down Expand Up @@ -3002,79 +2984,39 @@ definitions:
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
$parameters:
type: object
additionalProperties: true
CompositeRawDecoder:
description: "(This is experimental, use at your own risk)"
type: object
required:
- type
- parser
properties:
type:
type: string
enum: [CompositeRawDecoder]
parser:
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonParser"
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
# PARSERS
GzipParser:
GzipDecoder:
type: object
required:
- type
- inner_parser
- decoder
properties:
type:
type: string
enum: [GzipParser]
inner_parser:
enum: [GzipDecoder]
decoder:
anyOf:
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
- "$ref": "#/definitions/JsonParser"
JsonParser:
title: JsonParser
description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format.
type: object
required:
- type
properties:
type:
type: string
enum: [JsonParser]
encoding:
type: string
default: utf-8
JsonLineParser:
type: object
required:
- type
properties:
type:
type: string
enum: [JsonLineParser]
encoding:
type: string
default: utf-8
CsvParser:
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
CsvDecoder:
type: object
required:
- type
properties:
type:
type: string
enum: [CsvParser]
enum: [CsvDecoder]
encoding:
type: string
default: utf-8
Expand Down Expand Up @@ -3202,24 +3144,24 @@ definitions:
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
download_decoder:
title: Download Decoder
description: Component decoding the download response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
$parameters:
type: object
Expand Down
4 changes: 0 additions & 4 deletions airbyte_cdk/sources/declarative/decoders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
)
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import (
GzipJsonDecoder,
IterableDecoder,
JsonDecoder,
JsonlDecoder,
)
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import (
Expand All @@ -27,9 +25,7 @@
"CompositeRawDecoder",
"JsonDecoder",
"JsonParser",
"JsonlDecoder",
"IterableDecoder",
"GzipJsonDecoder",
"NoopDecoder",
"PaginationDecoderDecorator",
"XmlDecoder",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import csv
import gzip
import io
import json
import logging
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -130,11 +131,15 @@ class CompositeRawDecoder(Decoder):
"""

parser: Parser
stream_response: bool = True

def is_stream_response(self) -> bool:
return True
return self.stream_response

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
if self.is_stream_response():
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
else:
yield from self.parser.parse(data=io.BytesIO(response.content))
70 changes: 12 additions & 58 deletions airbyte_cdk/sources/declarative/decoders/json_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,41 @@
import orjson
import requests

from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder, JsonParser
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder

logger = logging.getLogger("airbyte")


@dataclass
class JsonDecoder(Decoder):
"""
Decoder strategy that returns the json-encoded content of a response, if any.
Usually, we would try to instantiate the equivalent `CompositeRawDecoder(parser=JsonParser(), stream_response=False)` but there were specific historical behaviors related to the JsonDecoder that we didn't know if we could remove like the fallback on {} in case of errors.
"""

parameters: InitVar[Mapping[str, Any]]
def __init__(self, parameters: Mapping[str, Any]):
self._decoder = CompositeRawDecoder(parser=JsonParser(), stream_response=False)

def is_stream_response(self) -> bool:
return False
return self._decoder.is_stream_response()

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Given the response is an empty string or an emtpy list, the function will return a generator with an empty mapping.
"""
has_yielded = False
try:
body_json = response.json()
yield from self.parse_body_json(body_json)
except requests.exceptions.JSONDecodeError:
logger.warning(
f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
)
for element in self._decoder.decode(response):
yield element
has_yielded = True
except Exception:
yield {}

@staticmethod
def parse_body_json(
body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]],
) -> Generator[MutableMapping[str, Any], None, None]:
if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
if not has_yielded:
yield {}
else:
yield from body_json


@dataclass
Expand All @@ -69,43 +63,3 @@ def decode(
) -> Generator[MutableMapping[str, Any], None, None]:
for line in response.iter_lines():
yield {"record": line.decode()}


@dataclass
class JsonlDecoder(Decoder):
"""
Decoder strategy that returns the json-encoded content of the response, if any.
"""

parameters: InitVar[Mapping[str, Any]]

def is_stream_response(self) -> bool:
return True

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
# TODO???: set delimiter? usually it is `\n` but maybe it would be useful to set optional?
# https://github.com/airbytehq/airbyte-internal-issues/issues/8436
for record in response.iter_lines():
yield orjson.loads(record)


@dataclass
class GzipJsonDecoder(JsonDecoder):
encoding: Optional[str]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if self.encoding:
try:
codecs.lookup(self.encoding)
except LookupError:
raise ValueError(
f"Invalid encoding '{self.encoding}'. Please check provided encoding"
)

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
raw_string = decompress(response.content).decode(encoding=self.encoding or "utf-8")
yield from self.parse_body_json(orjson.loads(raw_string))
Loading

0 comments on commit cb5a921

Please sign in to comment.