Commit 19b76fe
At the record extraction step, add to each record a service field $root holding a reference to:
* the root response object, when parsing JSON format
* the original record each processed record is extracted from, when parsing JSONL format

More service fields could be added in the future. The service fields are available in the record's filtering and transform steps.

Avoid:
* reusing the maps/dictionaries produced, thus avoiding building cyclic structures
* transforming the service fields in the Flatten transformation

Explicitly clean up the service field(s) after the transform step, thus making them:
* local to the filter and transform steps
* not visible to the subsequent mapping and store steps (as they should be)
* not visible in the tests beyond test_record_selector (as they should be)

This allows the record transformation logic to define its own "local variables" to reuse some interim calculations.

The contract of body parsing is irregular in how it represents the cases of bad JSON, no JSON, and empty JSON. It cannot be unified, as that irregularity is already relied upon.

Update the development environment setup documentation:
* to organize and present the setup steps explicitly
* to avoid misunderstandings and wasted effort

Update CONTRIBUTING.md to:
* collect and organize the knowledge on running the tests locally
* state the actual testing steps
* clarify and make the procedures and steps explicit

The unit, integration, and acceptance tests in this exact version succeed under Fedora 41, while one of them fails under Oracle Linux 8.7; this is not related to the contents of this PR. The integration tests of the CDK fail due to a missing `secrets/config.json` file for the Shopify source. See airbytehq#197.

Polish. Integrate the DpathEnhancingExtractor in the UI of Airbyte.
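The service-field lifecycle described above can be sketched standalone (an illustration only, not the CDK's actual code; the helper names mirror the ones this commit introduces):

```python
# Sketch of the $root service-field lifecycle: each extracted record carries
# a "$root" reference during filtering/transformation, and service keys are
# stripped before the mapping and store steps.
SERVICE_KEY_PREFIX = "$"

def add_service_key(record, key, value):
    # Return a copy so the original mapping is not mutated (avoids cycles).
    result = dict(record)
    result[SERVICE_KEY_PREFIX + key] = value
    return result

def strip_service_keys(record):
    # Cleanup after the transform step: service keys are local to filter/transform.
    return {k: v for k, v in record.items() if not k.startswith(SERVICE_KEY_PREFIX)}

response = {"data": [{"id": 1}, {"id": 2}]}
records = [add_service_key(r, "root", response) for r in response["data"]]
# During filter/transform, records[0]["$root"] is the whole response object.
stored = [strip_service_keys(r) for r in records]
# After cleanup: stored == [{"id": 1}, {"id": 2}]
```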
Created a DpathEnhancingExtractor. Refactored the record enhancement logic, moving it into the extracted class. Split the tests of DpathExtractor and DpathEnhancingExtractor.

Fixed the failing tests:
* FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_create_custom_components[test_create_custom_component_with_subcomponent_that_uses_parameters]
* FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_custom_components_do_not_contain_extra_fields
* FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_parse_custom_component_fields_if_subcomponent
* FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_create_page_increment
* FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_create_offset_increment
* FAILED unit_tests/sources/file_based/test_file_based_scenarios.py::test_file_based_read[simple_unstructured_scenario]
* FAILED unit_tests/sources/file_based/test_file_based_scenarios.py::test_file_based_read[no_file_extension_unstructured_scenario]

They failed because of comparing string and int values of the page_size (public) attribute. Imposed an invariant: on construction, page_size can be set to a string or an int; keep only values of one type in page_size for uniform comparison (convert values of the other type). _page_size holds the internal/working value, unless manipulated directly.
Merged:
* feat(low-code concurrent): Allow async job low-code streams that are incremental to be run by the concurrent framework (airbytehq#228)
* fix(low-code): Fix declarative low-code state migration in SubstreamPartitionRouter (airbytehq#267)
* feat: combine slash command jobs into single job steps (airbytehq#266)
* feat(low-code): add items and property mappings to dynamic schemas (airbytehq#256)
* feat: add help response for unrecognized slash commands (airbytehq#264)
* ci: post direct links to html connector test reports (airbytehq#252) (airbytehq#263)
* fix(low-code): Fix legacy state migration in SubstreamPartitionRouter (airbytehq#261)
* fix(airbyte-cdk): Fix RequestOptionsProvider for PerPartitionWithGlobalCursor (airbytehq#254)
* feat(low-code): add profile assertion flow to oauth authenticator component (airbytehq#236)
* feat(Low-Code Concurrent CDK): Add ConcurrentPerPartitionCursor (airbytehq#111)
* fix: don't mypy unit_tests (airbytehq#241)
* fix: handle backoff_strategies in CompositeErrorHandler (airbytehq#225)
* feat(concurrent cursor): attempt at clamping datetime (airbytehq#234)
* ci: use `ubuntu-24.04` explicitly (resolves CI warnings) (airbytehq#244)
* Fix(sdm): module ref issue in python components import (airbytehq#243)
* feat(low-code): add DpathFlattenFields (airbytehq#227)
* feat(source-declarative-manifest): add support for custom Python components from dynamic text input (airbytehq#174)
* chore(deps): bump avro from 1.11.3 to 1.12.0 (airbytehq#133)
* docs: comments on what the `Dockerfile` is for (airbytehq#240)
* chore: move ruff configuration to dedicated ruff.toml file (airbytehq#237)

Formatted. Update record_extractor.py. Trigger a new build; hopefully the integration test infrastructure is fixed. Update CONTRIBUTING.md. Trigger a new build.
1 parent 6260248 commit 19b76fe

33 files changed: +1665 −133 lines

airbyte_cdk/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -92,7 +92,7 @@
 from .sources.declarative.declarative_stream import DeclarativeStream
 from .sources.declarative.decoders import Decoder, JsonDecoder
 from .sources.declarative.exceptions import ReadException
-from .sources.declarative.extractors import DpathExtractor, RecordSelector
+from .sources.declarative.extractors import DpathEnhancingExtractor, DpathExtractor, RecordSelector
 from .sources.declarative.extractors.record_extractor import RecordExtractor
 from .sources.declarative.extractors.record_filter import RecordFilter
 from .sources.declarative.incremental import DatetimeBasedCursor
@@ -234,6 +234,7 @@
     "DefaultPaginator",
     "DefaultRequestOptionsProvider",
     "DpathExtractor",
+    "DpathEnhancingExtractor",
     "FieldPointer",
     "HttpMethod",
     "HttpRequester",

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 34 additions & 0 deletions
@@ -1459,6 +1459,39 @@ definitions:
       $parameters:
         type: object
         additionalProperties: true
+
+  DpathEnhancingExtractor:
+    title: Dpath Enhancing Extractor
+    description: |
+      Extract records from a response, navigating a path as an array of fields. Include $parent and $root service fields where:
+      $root holds the original response;
+      $parent holds the attributes container object, including its $parent field.
+      These service fields are local for the filter and transform phases.
+    type: object
+    required:
+      - type
+      - field_path
+    properties:
+      type:
+        type: string
+        enum: [DpathEnhancingExtractor]
+      field_path:
+        title: Field Path
+        description: List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
+        type: array
+        items:
+          - type: string
+        interpolation_context:
+          - config
+        examples:
+          - ["data"]
+          - ["data", "records"]
+          - ["data", "{{ parameters.name }}"]
+          - ["data", "*", "record"]
+      $parameters:
+        type: object
+        additionalProperties: true
+
   ResponseToFileExtractor:
     title: CSV To File Extractor
     description: A record extractor designed for handling large responses that may exceed memory limits (to prevent OOM issues). It downloads a CSV file to disk, reads the data from disk, and deletes the file once it has been fully processed.
@@ -2775,6 +2808,7 @@ definitions:
       anyOf:
         - "$ref": "#/definitions/CustomRecordExtractor"
         - "$ref": "#/definitions/DpathExtractor"
+        - "$ref": "#/definitions/DpathEnhancingExtractor"
     record_filter:
       title: Record Filter
       description: Responsible for filtering records to be emitted by the Source.
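Given the schema definition above, a record selector in a declarative manifest could reference the new extractor as sketched below (a hypothetical fragment; the field path is invented for illustration):

```yaml
record_selector:
  type: RecordSelector
  extractor:
    type: DpathEnhancingExtractor
    field_path: ["data", "*", "record"]
```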

airbyte_cdk/sources/declarative/decoders/json_decoder.py

Lines changed: 7 additions & 10 deletions
@@ -35,22 +35,19 @@ def decode(
         try:
             body_json = response.json()
             yield from self.parse_body_json(body_json)
-        except requests.exceptions.JSONDecodeError:
-            logger.warning(
-                f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
-            )
-            yield {}
+        except requests.exceptions.JSONDecodeError as ex:
+            logger.warning("Response cannot be parsed into json: %s", ex)
+            logger.debug("Response to parse: %s", response.text, exc_info=True, stack_info=True)
+            yield {}  # Keep the existing contract

     @staticmethod
     def parse_body_json(
         body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]],
     ) -> Generator[MutableMapping[str, Any], None, None]:
-        if not isinstance(body_json, list):
-            body_json = [body_json]
-        if len(body_json) == 0:
-            yield {}
-        else:
+        if isinstance(body_json, list):
             yield from body_json
+        else:
+            yield from [body_json]


 @dataclass
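The simplified parse_body_json contract from the diff can be re-implemented standalone (not the CDK import) to see its behavior, including the edge case that an empty list now yields no records, unlike the previous code which yielded {}:

```python
from typing import Any, Generator

def parse_body_json(body_json: Any) -> Generator[Any, None, None]:
    # A list body yields its elements; anything else yields the body itself.
    if isinstance(body_json, list):
        yield from body_json
    else:
        yield from [body_json]

assert list(parse_body_json([{"a": 1}, {"b": 2}])) == [{"a": 1}, {"b": 2}]
assert list(parse_body_json({"a": 1})) == [{"a": 1}]
assert list(parse_body_json([])) == []  # empty list: no records yielded
```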

airbyte_cdk/sources/declarative/extractors/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -2,6 +2,9 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #

+from airbyte_cdk.sources.declarative.extractors.dpath_enhancing_extractor import (
+    DpathEnhancingExtractor,
+)
 from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
 from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
 from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
@@ -15,6 +18,7 @@
     "TypeTransformer",
     "HttpSelector",
     "DpathExtractor",
+    "DpathEnhancingExtractor",
     "RecordFilter",
     "RecordSelector",
     "ResponseToFileExtractor",
airbyte_cdk/sources/declarative/extractors/dpath_enhancing_extractor.py

Lines changed: 142 additions & 0 deletions

@@ -0,0 +1,142 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from dataclasses import InitVar, dataclass, field
+from typing import Any, Iterable, List, Mapping, MutableMapping, Union
+
+from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
+from airbyte_cdk.sources.declarative.extractors.record_extractor import (
+    SERVICE_KEY_PREFIX,
+    add_service_key,
+    is_service_key,
+)
+
+# The name of the service key that holds a reference to the original root response
+SERVICE_KEY_ROOT = "root"
+
+# The name of the service key that holds a reference to the owner object
+SERVICE_KEY_PARENT = "parent"
+
+
+@dataclass
+class DpathEnhancingExtractor(DpathExtractor):
+    """
+    Navigate a path through the JSON structure to the records to retrieve. Extend the records with service fields
+    applicable to their filtering and transformation.
+
+    Like the DpathExtractor, extract records from a response by following a path of names of nested objects,
+    while adding specific service fields to the extracted records to facilitate further processing.
+
+    Service fields:
+    root: Binds the original response body the record was extracted from. This allows the record to access any
+        attribute in any nested object, navigating from the root field.
+
+    parent: Binds a map of the parent object's attributes, including its "parent" service field. This way the
+        extracted record has access to the attributes of any ancestor object. This is especially useful when the
+        records are extracted from nested lists.
+
+    Example:
+        body:   {"a": 1, "b": 2, "c": {"d": 4}}
+        path:   ["c"]
+        record: {"d": 4, "parent": {"a": 1, "b": 2}, "root": {"a": 1, "b": 2, "c": {"d": 4}}}
+        access: {{ record.d }}, {{ record["parent"].a }}, {{ record["parent"].b }}, {{ record["root"].a }}...
+
+    Example:
+        body:   {"a": 1, "b": 2, "c": [{"d": 4}, {"e": 5}]}
+        path:   ["c", "*"]
+        record 1: {"d": 4, "parent": {"a": 1, "b": 2}, "root": {"a": 1, "b": 2, "c": [{"d": 4}, {"e": 5}]}}
+        record 2: {"e": 5, "parent": {"a": 1, "b": 2}, "root": {"a": 1, "b": 2, "c": [{"d": 4}, {"e": 5}]}}
+        access: {{ record.d }}, {{ record["parent"].a }}, {{ record["parent"].b }}, {{ record["root"].a }}...
+
+    Example:
+        body:   {"a": 1, "b": 2, "c": {"d": 4, "e": {"f": 6}}}
+        path:   ["c", "e"]
+        record: {"f": 6, "parent": {"d": 4, "parent": {"a": 1, "b": 2}}, "root": {"a": 1, "b": 2, "c": {"d": 4, "e": {"f": 6}}}}
+        access: {{ record.f }}, {{ record["parent"].d }}, {{ record["parent"]["parent"].a }},
+                {{ record["parent"]["parent"].b }}, {{ record["root"].a }}, {{ record["root"].c.d }}...
+
+    Note:
+        The names of the service fields have a specific prefix like $ set in SERVICE_KEY_PREFIX.
+        When the result record is the body object itself, the "parent" service field is not set (as it is None).
+        When the parent contains no attributes and no parent service field, the parent field is not bound.
+        The "root" service field is always set in the result record.
+    """
+
+    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
+        """
+        See DpathExtractor
+        """
+        super().__post_init__(parameters)
+
+    def update_body(self, body: Any) -> Any:
+        """
+        In each nested object in the body, add a service key "parent" referring to the owner object.
+        For the root object/body the owner is None.
+
+        Example:
+            body   = {"a": 1, "b": 2, "c": {"d": 4}}
+            result = {"a": 1,
+                      "b": 2,
+                      "c": {"d": 4,
+                            "parent": {"a": 1, "b": 2}}}
+
+        Example:
+            body   = {"a": 1, "b": 2, "c": [{"d": 4}, {"e": 5}]}
+            result = {"a": 1,
+                      "b": 2,
+                      "c": [{"d": 4, "parent": {"a": 1, "b": 2}},
+                            {"e": 5, "parent": {"a": 1, "b": 2}}]}
+
+        Example:
+            body   = {"a": 1, "b": 2, "c": {"d": 4, "e": {"f": 6}}}
+            result = {"a": 1,
+                      "b": 2,
+                      "c": {"d": 4,
+                            "parent": {"a": 1, "b": 2},
+                            "e": {"f": 6,
+                                  "parent": {"d": 4,
+                                             "parent": {"a": 1, "b": 2}}}}}
+
+        :param body: the original response body. Not to be changed
+        :return: a copy of the body, where the nested objects have the "parent" service field bound to the map of the
+            parent object's attributes (including its "parent" service fields). This way any record that will be
+            extracted from the nested objects has access to any parent's attributes, while still avoiding loops
+            in the JSON structure.
+        """
+        return self._set_parent(body, None)
+
+    def _set_parent(self, body: Any, parent: Any) -> Any:
+        """
+        :param body: the original response body. Not to be changed
+        :param parent: None or the parent object that owns/nests the body object
+        :return: a copy of the body enhanced in a subclass-specific way. None only when body is None.
+        """
+        if isinstance(body, dict):
+            result: dict[str, Any] = dict()
+            if parent:
+                result = add_service_key(result, SERVICE_KEY_PARENT, parent)
+            attributes_only = dict(result)
+            attributes_only.update(
+                {
+                    k: v
+                    for k, v in body.items()
+                    if v and not isinstance(v, dict) and not isinstance(v, list)
+                }
+            )
+            for k, v in body.items():
+                result[k] = self._set_parent(v, attributes_only)
+            return result
+        elif isinstance(body, list):
+            return [self._set_parent(v, parent) for v in body]
+        else:
+            return body
+
+    def update_record(self, record: Any, root: Any) -> Any:
+        """
+        Change the extracted record in a subclass-specific way. Override in subclasses.
+        :param record: the original extracted record. Not to be changed. Not None.
+        :param root: the original body the record is extracted from.
+        :return: a copy of the record changed or enhanced in a subclass-specific way.
+        """
+        return add_service_key(record, SERVICE_KEY_ROOT, root)
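The parent-enrichment recursion above can be sketched as a standalone function (re-implemented here, not imported from the CDK): each nested dict receives a "$parent" key bound to a scalar-only snapshot of its owner, so records keep access to ancestor attributes without creating cycles.

```python
from typing import Any

def set_parent(body: Any, parent: Any) -> Any:
    if isinstance(body, dict):
        # Bind the owner's snapshot under the "$parent" service key, if any.
        result = {"$parent": parent} if parent else {}
        # Snapshot only scalar attributes (plus the owner's own $parent)
        # so the enriched structure stays acyclic.
        attrs = dict(result)
        attrs.update({k: v for k, v in body.items()
                      if v and not isinstance(v, (dict, list))})
        for k, v in body.items():
            result[k] = set_parent(v, attrs)
        return result
    if isinstance(body, list):
        # List elements share the list's owner as their parent.
        return [set_parent(v, parent) for v in body]
    return body

enriched = set_parent({"a": 1, "c": {"d": 4}}, None)
# enriched == {"a": 1, "c": {"$parent": {"a": 1}, "d": 4}}
```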

airbyte_cdk/sources/declarative/extractors/dpath_extractor.py

Lines changed: 39 additions & 12 deletions
@@ -70,17 +70,44 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

     def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
         for body in self.decoder.decode(response):
-            if len(self._field_path) == 0:
-                extracted = body
+            if body == {}:
+                # An empty/invalid JSON parsed, keep the contract
+                yield {}
             else:
-                path = [path.eval(self.config) for path in self._field_path]
-                if "*" in path:
-                    extracted = dpath.values(body, path)
+                root_response = body
+                body = self.update_body(root_response)
+
+                if len(self._field_path) == 0:
+                    extracted = body
                 else:
-                    extracted = dpath.get(body, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure
-            if isinstance(extracted, list):
-                yield from extracted
-            elif extracted:
-                yield extracted
-            else:
-                yield from []
+                    path = [path.eval(self.config) for path in self._field_path]
+                    if "*" in path:
+                        extracted = dpath.values(body, path)
+                    else:
+                        extracted = dpath.get(body, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure
+                if isinstance(extracted, list):
+                    for record in extracted:
+                        yield self.update_record(record, root_response)
+                elif isinstance(extracted, dict):
+                    yield self.update_record(extracted, root_response)
+                elif extracted:
+                    yield extracted
+                else:
+                    yield from []
+
+    def update_body(self, body: Any) -> Any:
+        """
+        Change the original response in a subclass-specific way. Override in subclasses.
+        :param body: the original response body. Not to be changed
+        :return: a copy of the body enhanced in a subclass-specific way. None only when body is None.
+        """
+        return body
+
+    def update_record(self, record: Any, root: Any) -> Any:
+        """
+        Change the extracted record in a subclass-specific way. Override in subclasses.
+        :param record: the original extracted record. Not to be changed. Not None.
+        :param root: the original body the record is extracted from.
+        :return: a copy of the record changed or enhanced in a subclass-specific way.
+        """
+        return record
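The field_path navigation the extractor performs can be illustrated with a dependency-free sketch (the real code uses the dpath library; this re-implementation covers only the two cases in the diff): "*" fans out over list elements, other segments index into dicts.

```python
from typing import Any, List

def extract(body: Any, path: List[str]) -> list:
    nodes = [body]
    for segment in path:
        next_nodes = []
        for node in nodes:
            if segment == "*" and isinstance(node, list):
                # Wildcard: fan out over every element of the list.
                next_nodes.extend(node)
            elif isinstance(node, dict) and segment in node:
                next_nodes.append(node[segment])
        nodes = next_nodes
    return nodes

body = {"data": [{"record": {"id": 1}}, {"record": {"id": 2}}]}
assert extract(body, ["data", "*", "record"]) == [{"id": 1}, {"id": 2}]
```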

airbyte_cdk/sources/declarative/extractors/record_extractor.py

Lines changed: 40 additions & 0 deletions
@@ -7,6 +7,46 @@


 import requests

+# Convention:
+# - The record extractors may leave service fields bound in the extracted records (mappings).
+# - The names (keys) of the service fields have the value of SERVICE_KEY_PREFIX as their prefix.
+# - The service fields are kept only during the record's filtering and transformation.
+SERVICE_KEY_PREFIX = "$"
+
+
+def add_service_key(mapping: Mapping[str, Any], key: str, value: Any) -> dict[str, Any]:
+    """
+    :param mapping: non-null mapping
+    :param key: the name of the key, not including any specific prefixes
+    :param value: the value to bind
+    :return: a non-null copy of the mapping including a new key-value pair, where the key is prefixed as a service field.
+    """
+    result = dict(mapping)
+    result[SERVICE_KEY_PREFIX + key] = value
+    return result
+
+
+def exclude_service_keys(struct: Any) -> Any:
+    """
+    :param struct: any object/JSON structure
+    :return: a copy of struct without any service fields at any level of nesting
+    """
+    if isinstance(struct, dict):
+        return {k: exclude_service_keys(v) for k, v in struct.items() if not is_service_key(k)}
+    elif isinstance(struct, list):
+        return [exclude_service_keys(v) for v in struct]
+    else:
+        return struct
+
+
+def is_service_key(key: str) -> bool:
+    return key.startswith(SERVICE_KEY_PREFIX)
+
+
+def remove_service_keys(records: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
+    for record in records:
+        yield exclude_service_keys(record)
+

 @dataclass
 class RecordExtractor:
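The exclude_service_keys helper from the diff above can be exercised standalone (re-implemented here rather than imported) to show that service keys are removed at every nesting level, not just the top one:

```python
from typing import Any

SERVICE_KEY_PREFIX = "$"

def exclude_service_keys(struct: Any) -> Any:
    # Recursively drop any "$"-prefixed keys from dicts, descending into lists.
    if isinstance(struct, dict):
        return {k: exclude_service_keys(v) for k, v in struct.items()
                if not k.startswith(SERVICE_KEY_PREFIX)}
    if isinstance(struct, list):
        return [exclude_service_keys(v) for v in struct]
    return struct

record = {"id": 1, "$root": {"id": 1}, "nested": {"x": 2, "$parent": {"id": 1}}}
assert exclude_service_keys(record) == {"id": 1, "nested": {"x": 2}}
```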

airbyte_cdk/sources/declarative/extractors/record_selector.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88
import requests
99

1010
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
11-
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
11+
from airbyte_cdk.sources.declarative.extractors.record_extractor import (
12+
RecordExtractor,
13+
remove_service_keys,
14+
)
1215
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
1316
from airbyte_cdk.sources.declarative.extractors.type_transformer import (
1417
TypeTransformer as DeclarativeTypeTransformer,
1518
)
1619
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
17-
from airbyte_cdk.sources.declarative.models import SchemaNormalization
1820
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1921
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
2022
from airbyte_cdk.sources.utils.transform import TypeTransformer
@@ -106,7 +108,8 @@ def filter_and_transform(
106108
"""
107109
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
108110
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
109-
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
111+
no_service_fields_data = remove_service_keys(transformed_data)
112+
normalized_data = self._normalize_by_schema(no_service_fields_data, schema=records_schema)
110113
for data in normalized_data:
111114
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
112115
