Skip to content

Commit f8cb659

Browse files
authored
feat: add Composite Raw Decoder (#179)
Signed-off-by: Artem Inzhyyants <[email protected]>
1 parent 2671c24 commit f8cb659

File tree

9 files changed

+447
-68
lines changed

9 files changed

+447
-68
lines changed

.github/workflows/pypi_publish.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ name: Packaging and Publishing
1010
on:
1111
push:
1212
tags:
13-
- 'v*'
13+
- "v*"
1414
workflow_dispatch:
1515
inputs:
1616
version:

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2766,9 +2766,64 @@ definitions:
27662766
- "$ref": "#/definitions/IterableDecoder"
27672767
- "$ref": "#/definitions/XmlDecoder"
27682768
- "$ref": "#/definitions/GzipJsonDecoder"
2769+
- "$ref": "#/definitions/CompositeRawDecoder"
27692770
$parameters:
27702771
type: object
27712772
additionalProperties: true
2773+
CompositeRawDecoder:
2774+
description: "(This is experimental, use at your own risk)"
2775+
type: object
2776+
required:
2777+
- type
2778+
- parser
2779+
properties:
2780+
type:
2781+
type: string
2782+
enum: [CompositeRawDecoder]
2783+
parser:
2784+
anyOf:
2785+
- "$ref": "#/definitions/GzipParser"
2786+
- "$ref": "#/definitions/JsonLineParser"
2787+
- "$ref": "#/definitions/CsvParser"
2788+
# PARSERS
2789+
GzipParser:
2790+
type: object
2791+
required:
2792+
- type
2793+
- inner_parser
2794+
properties:
2795+
type:
2796+
type: string
2797+
enum: [GzipParser]
2798+
inner_parser:
2799+
anyOf:
2800+
- "$ref": "#/definitions/JsonLineParser"
2801+
- "$ref": "#/definitions/CsvParser"
2802+
JsonLineParser:
2803+
type: object
2804+
required:
2805+
- type
2806+
properties:
2807+
type:
2808+
type: string
2809+
enum: [JsonLineParser]
2810+
encoding:
2811+
type: string
2812+
default: utf-8
2813+
CsvParser:
2814+
type: object
2815+
required:
2816+
- type
2817+
properties:
2818+
type:
2819+
type: string
2820+
enum: [CsvParser]
2821+
encoding:
2822+
type: string
2823+
default: utf-8
2824+
delimiter:
2825+
type: string
2826+
default: ","
27722827
AsyncJobStatusMap:
27732828
description: Matches the api job status to Async Job Status.
27742829
type: object

airbyte_cdk/sources/declarative/decoders/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5+
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import CompositeRawDecoder
56
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
67
from airbyte_cdk.sources.declarative.decoders.json_decoder import (
78
GzipJsonDecoder,
@@ -17,6 +18,7 @@
1718

1819
__all__ = [
1920
"Decoder",
21+
"CompositeRawDecoder",
2022
"JsonDecoder",
2123
"JsonlDecoder",
2224
"IterableDecoder",
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import csv
2+
import gzip
3+
import json
4+
import logging
5+
from abc import ABC, abstractmethod
6+
from dataclasses import dataclass
7+
from io import BufferedIOBase, TextIOWrapper
8+
from typing import Any, Generator, MutableMapping, Optional
9+
10+
import requests
11+
12+
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
13+
14+
logger = logging.getLogger("airbyte")
15+
16+
17+
@dataclass
18+
class Parser(ABC):
19+
@abstractmethod
20+
def parse(
21+
self,
22+
data: BufferedIOBase,
23+
) -> Generator[MutableMapping[str, Any], None, None]:
24+
"""
25+
Parse data and yield dictionaries.
26+
"""
27+
pass
28+
29+
30+
@dataclass
31+
class GzipParser(Parser):
32+
inner_parser: Parser
33+
34+
def parse(
35+
self,
36+
data: BufferedIOBase,
37+
) -> Generator[MutableMapping[str, Any], None, None]:
38+
"""
39+
Decompress gzipped bytes and pass decompressed data to the inner parser.
40+
"""
41+
with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
42+
yield from self.inner_parser.parse(gzipobj)
43+
44+
45+
@dataclass
46+
class JsonLineParser(Parser):
47+
encoding: Optional[str] = "utf-8"
48+
49+
def parse(
50+
self,
51+
data: BufferedIOBase,
52+
) -> Generator[MutableMapping[str, Any], None, None]:
53+
for line in data:
54+
try:
55+
yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
56+
except json.JSONDecodeError as e:
57+
logger.warning(f"Cannot decode/parse line {line!r} as JSON, error: {e}")
58+
59+
60+
@dataclass
61+
class CsvParser(Parser):
62+
# TODO: migrate implementation to re-use file-base classes
63+
encoding: Optional[str] = "utf-8"
64+
delimiter: Optional[str] = ","
65+
66+
def parse(
67+
self,
68+
data: BufferedIOBase,
69+
) -> Generator[MutableMapping[str, Any], None, None]:
70+
"""
71+
Parse CSV data from decompressed bytes.
72+
"""
73+
text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore
74+
reader = csv.DictReader(text_data, delimiter=self.delimiter or ",")
75+
yield from reader
76+
77+
78+
@dataclass
79+
class CompositeRawDecoder(Decoder):
80+
"""
81+
Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
82+
passed response.raw to parser(s).
83+
Note: response.raw is not decoded/decompressed by default.
84+
parsers should be instantiated recursively.
85+
Example:
86+
composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
87+
"""
88+
89+
parser: Parser
90+
91+
def is_stream_response(self) -> bool:
92+
return True
93+
94+
def decode(
95+
self, response: requests.Response
96+
) -> Generator[MutableMapping[str, Any], None, None]:
97+
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,6 +1125,17 @@ class LegacySessionTokenAuthenticator(BaseModel):
11251125
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
11261126

11271127

1128+
class JsonLineParser(BaseModel):
1129+
type: Literal["JsonLineParser"]
1130+
encoding: Optional[str] = "utf-8"
1131+
1132+
1133+
class CsvParser(BaseModel):
1134+
type: Literal["CsvParser"]
1135+
encoding: Optional[str] = "utf-8"
1136+
delimiter: Optional[str] = ","
1137+
1138+
11281139
class AsyncJobStatusMap(BaseModel):
11291140
type: Optional[Literal["AsyncJobStatusMap"]] = None
11301141
running: List[str]
@@ -1208,6 +1219,8 @@ class ComponentMappingDefinition(BaseModel):
12081219
"{{ components_values['updates'] }}",
12091220
"{{ components_values['MetaData']['LastUpdatedTime'] }}",
12101221
"{{ config['segment_id'] }}",
1222+
"{{ stream_slice['parent_id'] }}",
1223+
"{{ stream_slice['extra_fields']['name'] }}",
12111224
],
12121225
title="Value",
12131226
)
@@ -1504,6 +1517,11 @@ class RecordSelector(BaseModel):
15041517
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
15051518

15061519

1520+
class GzipParser(BaseModel):
1521+
type: Literal["GzipParser"]
1522+
inner_parser: Union[JsonLineParser, CsvParser]
1523+
1524+
15071525
class Spec(BaseModel):
15081526
type: Literal["Spec"]
15091527
connection_specification: Dict[str, Any] = Field(
@@ -1534,6 +1552,11 @@ class CompositeErrorHandler(BaseModel):
15341552
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
15351553

15361554

1555+
class CompositeRawDecoder(BaseModel):
1556+
type: Literal["CompositeRawDecoder"]
1557+
parser: Union[GzipParser, JsonLineParser, CsvParser]
1558+
1559+
15371560
class DeclarativeSource1(BaseModel):
15381561
class Config:
15391562
extra = Extra.forbid
@@ -1936,6 +1959,7 @@ class SimpleRetriever(BaseModel):
19361959
IterableDecoder,
19371960
XmlDecoder,
19381961
GzipJsonDecoder,
1962+
CompositeRawDecoder,
19391963
]
19401964
] = Field(
19411965
None,

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@
6767
PaginationDecoderDecorator,
6868
XmlDecoder,
6969
)
70+
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
71+
CompositeRawDecoder,
72+
CsvParser,
73+
GzipParser,
74+
JsonLineParser,
75+
)
7076
from airbyte_cdk.sources.declarative.extractors import (
7177
DpathExtractor,
7278
RecordFilter,
@@ -125,6 +131,9 @@
125131
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
126132
CompositeErrorHandler as CompositeErrorHandlerModel,
127133
)
134+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
135+
CompositeRawDecoder as CompositeRawDecoderModel,
136+
)
128137
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
129138
ConcurrencyLevel as ConcurrencyLevelModel,
130139
)
@@ -134,6 +143,9 @@
134143
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
135144
ConstantBackoffStrategy as ConstantBackoffStrategyModel,
136145
)
146+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
147+
CsvParser as CsvParserModel,
148+
)
137149
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
138150
CursorPagination as CursorPaginationModel,
139151
)
@@ -203,6 +215,9 @@
203215
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
204216
GzipJsonDecoder as GzipJsonDecoderModel,
205217
)
218+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
219+
GzipParser as GzipParserModel,
220+
)
206221
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
207222
HttpComponentsResolver as HttpComponentsResolverModel,
208223
)
@@ -227,6 +242,9 @@
227242
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
228243
JsonlDecoder as JsonlDecoderModel,
229244
)
245+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
246+
JsonLineParser as JsonLineParserModel,
247+
)
230248
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
231249
JwtAuthenticator as JwtAuthenticatorModel,
232250
)
@@ -455,6 +473,7 @@ def _init_mappings(self) -> None:
455473
BearerAuthenticatorModel: self.create_bearer_authenticator,
456474
CheckStreamModel: self.create_check_stream,
457475
CompositeErrorHandlerModel: self.create_composite_error_handler,
476+
CompositeRawDecoderModel: self.create_composite_raw_decoder,
458477
ConcurrencyLevelModel: self.create_concurrency_level,
459478
ConstantBackoffStrategyModel: self.create_constant_backoff_strategy,
460479
CursorPaginationModel: self.create_cursor_pagination,
@@ -485,7 +504,9 @@ def _init_mappings(self) -> None:
485504
InlineSchemaLoaderModel: self.create_inline_schema_loader,
486505
JsonDecoderModel: self.create_json_decoder,
487506
JsonlDecoderModel: self.create_jsonl_decoder,
507+
JsonLineParserModel: self.create_json_line_parser,
488508
GzipJsonDecoderModel: self.create_gzipjson_decoder,
509+
GzipParserModel: self.create_gzip_parser,
489510
KeysToLowerModel: self.create_keys_to_lower_transformation,
490511
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
491512
FlattenFieldsModel: self.create_flatten_fields,
@@ -1701,6 +1722,12 @@ def create_jsonl_decoder(
17011722
) -> JsonlDecoder:
17021723
return JsonlDecoder(parameters={})
17031724

1725+
@staticmethod
1726+
def create_json_line_parser(
1727+
model: JsonLineParserModel, config: Config, **kwargs: Any
1728+
) -> JsonLineParser:
1729+
return JsonLineParser(encoding=model.encoding)
1730+
17041731
@staticmethod
17051732
def create_iterable_decoder(
17061733
model: IterableDecoderModel, config: Config, **kwargs: Any
@@ -1717,6 +1744,22 @@ def create_gzipjson_decoder(
17171744
) -> GzipJsonDecoder:
17181745
return GzipJsonDecoder(parameters={}, encoding=model.encoding)
17191746

1747+
def create_gzip_parser(
1748+
self, model: GzipParserModel, config: Config, **kwargs: Any
1749+
) -> GzipParser:
1750+
inner_parser = self._create_component_from_model(model=model.inner_parser, config=config)
1751+
return GzipParser(inner_parser=inner_parser)
1752+
1753+
@staticmethod
1754+
def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser:
1755+
return CsvParser(encoding=model.encoding, delimiter=model.delimiter)
1756+
1757+
def create_composite_raw_decoder(
1758+
self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any
1759+
) -> CompositeRawDecoder:
1760+
parser = self._create_component_from_model(model=model.parser, config=config)
1761+
return CompositeRawDecoder(parser=parser)
1762+
17201763
@staticmethod
17211764
def create_json_file_schema_loader(
17221765
model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any

0 commit comments

Comments
 (0)