Skip to content

Commit 9de6cef

Browse files
darynaishchenkooctavia-squidington-iii
and
octavia-squidington-iii
authored
feat(low-code): added key_transformation to DpathFlattenFields (#472)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent a42ad33 commit 9de6cef

File tree

5 files changed

+163
-4
lines changed

5 files changed

+163
-4
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+26
Original file line numberDiff line numberDiff line change
@@ -2307,6 +2307,27 @@ definitions:
23072307
$parameters:
23082308
type: object
23092309
additionalProperties: true
2310+
KeyTransformation:
2311+
title: Transformation to apply for extracted object keys by Dpath Flatten Fields
2312+
type: object
2313+
required:
2314+
- type
2315+
properties:
2316+
type:
2317+
type: string
2318+
enum: [ KeyTransformation ]
2319+
prefix:
2320+
title: Key Prefix
2321+
description: Prefix to add for object keys. If not provided original keys remain unchanged.
2322+
type: string
2323+
examples:
2324+
- flattened_
2325+
suffix:
2326+
title: Key Suffix
2327+
description: Suffix to add for object keys. If not provided original keys remain unchanged.
2328+
type: string
2329+
examples:
2330+
- _flattened
23102331
DpathFlattenFields:
23112332
title: Dpath Flatten Fields
23122333
description: A transformation that flatten field values to the to top of the record.
@@ -2335,6 +2356,11 @@ definitions:
23352356
title: Replace Origin Record
23362357
description: Whether to replace the origin record or not. Default is False.
23372358
type: boolean
2359+
key_transformation:
2360+
title: Key transformation
2361+
description: Transformation for object keys. If not provided, original key will be used.
2362+
type: object
2363+
"$ref": "#/definitions/KeyTransformation"
23382364
$parameters:
23392365
type: object
23402366
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+24
Original file line numberDiff line numberDiff line change
@@ -879,6 +879,25 @@ class FlattenFields(BaseModel):
879879
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
880880

881881

882+
class KeyTransformation(BaseModel):
883+
prefix: Optional[Union[str, None]] = Field(
884+
None,
885+
description="Prefix to add for object keys. If not provided original keys remain unchanged.",
886+
examples=[
887+
"flattened_",
888+
],
889+
title="Key Prefix",
890+
)
891+
suffix: Optional[Union[str, None]] = Field(
892+
None,
893+
description="Suffix to add for object keys. If not provided original keys remain unchanged.",
894+
examples=[
895+
"_flattened",
896+
],
897+
title="Key Suffix",
898+
)
899+
900+
882901
class DpathFlattenFields(BaseModel):
883902
type: Literal["DpathFlattenFields"]
884903
field_path: List[str] = Field(
@@ -897,6 +916,11 @@ class DpathFlattenFields(BaseModel):
897916
description="Whether to replace the origin record or not. Default is False.",
898917
title="Replace Origin Record",
899918
)
919+
key_transformation: Optional[Union[KeyTransformation, None]] = Field(
920+
None,
921+
description="Transformation for object keys. If not provided, original key will be used.",
922+
title="Key transformation",
923+
)
900924
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
901925

902926

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+12
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@
498498
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
499499
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
500500
DpathFlattenFields,
501+
KeyTransformation,
501502
)
502503
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
503504
FlattenFields,
@@ -790,13 +791,24 @@ def create_dpath_flatten_fields(
790791
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
791792
) -> DpathFlattenFields:
792793
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
794+
key_transformation = (
795+
KeyTransformation(
796+
config=config,
797+
prefix=model.key_transformation.prefix,
798+
suffix=model.key_transformation.suffix,
799+
parameters=model.parameters or {},
800+
)
801+
if model.key_transformation is not None
802+
else None
803+
)
793804
return DpathFlattenFields(
794805
config=config,
795806
field_path=model_field_path,
796807
delete_origin_value=model.delete_origin_value
797808
if model.delete_origin_value is not None
798809
else False,
799810
replace_record=model.replace_record if model.replace_record is not None else False,
811+
key_transformation=key_transformation,
800812
parameters=model.parameters or {},
801813
)
802814

airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py

+41-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,24 @@
88
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
99

1010

11+
@dataclass
12+
class KeyTransformation:
13+
config: Config
14+
parameters: InitVar[Mapping[str, Any]]
15+
prefix: Optional[str] = None
16+
suffix: Optional[str] = None
17+
18+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
19+
if self.prefix is not None:
20+
self.prefix = InterpolatedString.create(self.prefix, parameters=parameters).eval(
21+
self.config
22+
)
23+
if self.suffix is not None:
24+
self.suffix = InterpolatedString.create(self.suffix, parameters=parameters).eval(
25+
self.config
26+
)
27+
28+
1129
@dataclass
1230
class DpathFlattenFields(RecordTransformation):
1331
"""
@@ -16,6 +34,7 @@ class DpathFlattenFields(RecordTransformation):
1634
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
1735
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
1836
replace_record: bool = False whether to replace origin record or not. Default is False.
37+
key_transformation: KeyTransformation = None how to transform extracted object keys
1938
2039
"""
2140

@@ -24,17 +43,35 @@ class DpathFlattenFields(RecordTransformation):
2443
parameters: InitVar[Mapping[str, Any]]
2544
delete_origin_value: bool = False
2645
replace_record: bool = False
46+
key_transformation: Optional[KeyTransformation] = None
2747

2848
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
49+
self._parameters = parameters
2950
self._field_path = [
30-
InterpolatedString.create(path, parameters=parameters) for path in self.field_path
51+
InterpolatedString.create(path, parameters=self._parameters) for path in self.field_path
3152
]
3253
for path_index in range(len(self.field_path)):
3354
if isinstance(self.field_path[path_index], str):
3455
self._field_path[path_index] = InterpolatedString.create(
35-
self.field_path[path_index], parameters=parameters
56+
self.field_path[path_index], parameters=self._parameters
3657
)
3758

59+
def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
60+
if self.key_transformation:
61+
if self.key_transformation.prefix:
62+
extracted = {
63+
f"{self.key_transformation.prefix}{key}": value
64+
for key, value in extracted.items()
65+
}
66+
67+
if self.key_transformation.suffix:
68+
extracted = {
69+
f"{key}{self.key_transformation.suffix}": value
70+
for key, value in extracted.items()
71+
}
72+
73+
return extracted
74+
3875
def transform(
3976
self,
4077
record: Dict[str, Any],
@@ -50,6 +87,8 @@ def transform(
5087
extracted = dpath.get(record, path, default=[])
5188

5289
if isinstance(extracted, dict):
90+
extracted = self._apply_key_transformation(extracted)
91+
5392
if self.replace_record and extracted:
5493
dpath.delete(record, "**")
5594
record.update(extracted)

unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py

+60-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
import pytest
22

3-
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import DpathFlattenFields
3+
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
4+
DpathFlattenFields,
5+
KeyTransformation,
6+
)
47

58
_ANY_VALUE = -1
69
_DELETE_ORIGIN_VALUE = True
710
_REPLACE_WITH_VALUE = True
811
_DO_NOT_DELETE_ORIGIN_VALUE = False
912
_DO_NOT_REPLACE_WITH_VALUE = False
13+
_NO_KEY_PREFIX = None
14+
_NO_KEY_SUFFIX = None
15+
_NO_KEY_TRANSFORMATIONS = None
1016

1117

1218
@pytest.mark.parametrize(
@@ -16,6 +22,7 @@
1622
"field_path",
1723
"delete_origin_value",
1824
"replace_record",
25+
"key_transformation",
1926
"expected_record",
2027
],
2128
[
@@ -25,6 +32,7 @@
2532
["field2"],
2633
_DO_NOT_DELETE_ORIGIN_VALUE,
2734
_DO_NOT_REPLACE_WITH_VALUE,
35+
_NO_KEY_TRANSFORMATIONS,
2836
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
2937
id="flatten by dpath, don't delete origin value",
3038
),
@@ -34,6 +42,7 @@
3442
["field2"],
3543
_DELETE_ORIGIN_VALUE,
3644
_DO_NOT_REPLACE_WITH_VALUE,
45+
_NO_KEY_TRANSFORMATIONS,
3746
{"field1": _ANY_VALUE, "field3": _ANY_VALUE},
3847
id="flatten by dpath, delete origin value",
3948
),
@@ -46,6 +55,7 @@
4655
["field2", "*", "field4"],
4756
_DO_NOT_DELETE_ORIGIN_VALUE,
4857
_DO_NOT_REPLACE_WITH_VALUE,
58+
_NO_KEY_TRANSFORMATIONS,
4959
{
5060
"field1": _ANY_VALUE,
5161
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
@@ -62,6 +72,7 @@
6272
["field2", "*", "field4"],
6373
_DELETE_ORIGIN_VALUE,
6474
_DO_NOT_REPLACE_WITH_VALUE,
75+
_NO_KEY_TRANSFORMATIONS,
6576
{"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
6677
id="flatten by dpath with *, delete origin value",
6778
),
@@ -71,6 +82,7 @@
7182
["{{ config['field_path'] }}"],
7283
_DO_NOT_DELETE_ORIGIN_VALUE,
7384
_DO_NOT_REPLACE_WITH_VALUE,
85+
_NO_KEY_TRANSFORMATIONS,
7486
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
7587
id="flatten by dpath from config, don't delete origin value",
7688
),
@@ -80,6 +92,7 @@
8092
["non-existing-field"],
8193
_DO_NOT_DELETE_ORIGIN_VALUE,
8294
_DO_NOT_REPLACE_WITH_VALUE,
95+
_NO_KEY_TRANSFORMATIONS,
8396
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
8497
id="flatten by non-existing dpath, don't delete origin value",
8598
),
@@ -89,6 +102,7 @@
89102
["*", "non-existing-field"],
90103
_DO_NOT_DELETE_ORIGIN_VALUE,
91104
_DO_NOT_REPLACE_WITH_VALUE,
105+
_NO_KEY_TRANSFORMATIONS,
92106
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
93107
id="flatten by non-existing dpath with *, don't delete origin value",
94108
),
@@ -98,6 +112,7 @@
98112
["field2"],
99113
_DO_NOT_DELETE_ORIGIN_VALUE,
100114
_DO_NOT_REPLACE_WITH_VALUE,
115+
_NO_KEY_TRANSFORMATIONS,
101116
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
102117
id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
103118
),
@@ -107,6 +122,7 @@
107122
["field2"],
108123
_DO_NOT_DELETE_ORIGIN_VALUE,
109124
_DO_NOT_REPLACE_WITH_VALUE,
125+
_NO_KEY_TRANSFORMATIONS,
110126
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
111127
id="flatten by dpath, not to update when record has field conflicts, delete origin value",
112128
),
@@ -116,6 +132,7 @@
116132
["field2"],
117133
_DO_NOT_DELETE_ORIGIN_VALUE,
118134
_REPLACE_WITH_VALUE,
135+
_NO_KEY_TRANSFORMATIONS,
119136
{"field3": _ANY_VALUE},
120137
id="flatten by dpath, replace with value",
121138
),
@@ -125,20 +142,61 @@
125142
["field2"],
126143
_DELETE_ORIGIN_VALUE,
127144
_REPLACE_WITH_VALUE,
145+
_NO_KEY_TRANSFORMATIONS,
128146
{"field3": _ANY_VALUE},
129147
id="flatten by dpath, delete_origin_value do not affect to replace_record",
130148
),
149+
pytest.param(
150+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
151+
{},
152+
["field2"],
153+
_DO_NOT_DELETE_ORIGIN_VALUE,
154+
_REPLACE_WITH_VALUE,
155+
("prefix_", _NO_KEY_SUFFIX),
156+
{"prefix_field3": _ANY_VALUE},
157+
id="flatten by dpath, not delete origin value, replace record, add keys prefix",
158+
),
159+
pytest.param(
160+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
161+
{},
162+
["field2"],
163+
_DO_NOT_DELETE_ORIGIN_VALUE,
164+
_REPLACE_WITH_VALUE,
165+
(_NO_KEY_PREFIX, "_suffix"),
166+
{"field3_suffix": _ANY_VALUE},
167+
id="flatten by dpath, not delete origin value, replace record, add keys suffix",
168+
),
169+
pytest.param(
170+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
171+
{},
172+
["field2"],
173+
_DO_NOT_DELETE_ORIGIN_VALUE,
174+
_REPLACE_WITH_VALUE,
175+
("prefix_", "_suffix"),
176+
{"prefix_field3_suffix": _ANY_VALUE},
177+
id="flatten by dpath, not delete origin value, replace record, add keys prefix and suffix",
178+
),
131179
],
132180
)
133181
def test_dpath_flatten_lists(
134-
input_record, config, field_path, delete_origin_value, replace_record, expected_record
182+
input_record,
183+
config,
184+
field_path,
185+
delete_origin_value,
186+
replace_record,
187+
key_transformation,
188+
expected_record,
135189
):
190+
if key_transformation:
191+
key_transformation = KeyTransformation(config, {}, *key_transformation)
192+
136193
flattener = DpathFlattenFields(
137194
field_path=field_path,
138195
parameters={},
139196
config=config,
140197
delete_origin_value=delete_origin_value,
141198
replace_record=replace_record,
199+
key_transformation=key_transformation,
142200
)
143201
flattener.transform(input_record)
144202
assert input_record == expected_record

0 commit comments

Comments
 (0)