-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
Copy pathcomponents.py
56 lines (47 loc) · 1.97 KB
/
components.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Union
import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
@dataclass
class NullCheckedDpathExtractor(RecordExtractor):
    """
    Pipedrive requires a custom extractor because the format of its API responses is inconsistent.

    Records are typically found in a nested "data" field, but sometimes the "data" field is null.
    This extractor checks for null "data" fields and returns the parent object, which contains the
    record ID, instead.

    Example faulty records:
    ```
    {
        "item": "file",
        "id": <an_id>,
        "data": null
    },
    {
        "item": "file",
        "id": <another_id>,
        "data": null
    }
    ```

    Attributes:
        field_path: Path forwarded unchanged to the wrapped DpathExtractor to locate the records.
        nullable_nested_field: Key of the nested field expected to hold the real record
            (e.g. "data"). If an InterpolatedString is given, it is evaluated once against
            ``config`` during ``__post_init__``; a plain string is used verbatim.
        config: Connector config, forwarded to the DpathExtractor and used for interpolation.
        parameters: Component parameters, forwarded to the DpathExtractor.
        decoder: Decoder forwarded to the DpathExtractor; defaults to a JsonDecoder.
    """

    field_path: List[Union[InterpolatedString, str]]
    nullable_nested_field: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    # default_factory builds a separate JsonDecoder for every instance instead of sharing one
    # default object across all instances of the dataclass.
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def __post_init__(self, parameters: Mapping[str, Any]):
        # Delegate the actual path walking to the stock DpathExtractor.
        self._dpath_extractor = DpathExtractor(
            field_path=self.field_path,
            config=self.config,
            parameters=parameters,
            decoder=self.decoder,
        )
        # extract_records looks the nested field up with dict.get, and an InterpolatedString
        # object can never match a plain string key in a decoded record (and, being a mutable
        # dataclass, is not hashable at all). Resolve it to a concrete key once here; plain
        # strings are kept verbatim so existing configurations behave exactly as before.
        if isinstance(self.nullable_nested_field, InterpolatedString):
            self._nullable_nested_key = self.nullable_nested_field.eval(self.config)
        else:
            self._nullable_nested_key = self.nullable_nested_field

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """
        Extract records from ``response``, unwrapping the nested field when it is populated.

        :param response: HTTP response whose body is decoded and walked by the DpathExtractor.
        :return: For each record found at ``field_path``, the value under the nested field, or
            the record itself when that value is missing or falsy.
        """
        records = self._dpath_extractor.extract_records(response)
        # `or` falls back to the parent record not only for null/missing nested values but for
        # any falsy one (e.g. an empty dict), so such records still carry their parent's ID.
        return [record.get(self._nullable_nested_key) or record for record in records]