Skip to content

Commit 14375fe

Browse files
maxi297octavia-squidington-iiibrianjlai
authored
feat(concurrent cursor): attempt at clamping datetime (#234)
Co-authored-by: octavia-squidington-iii <[email protected]> Co-authored-by: brianjlai <[email protected]>
1 parent 83dba91 commit 14375fe

File tree

9 files changed

+623
-44
lines changed

9 files changed

+623
-44
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+23
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,29 @@ definitions:
784784
type:
785785
type: string
786786
enum: [DatetimeBasedCursor]
787+
clamping:
788+
title: Date Range Clamping
789+
description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)
790+
type: object
791+
required:
792+
- target
793+
properties:
794+
target:
795+
title: Target
796+
description: The period of time that datetime windows will be clamped by
797+
# This should ideally be an enum. However, we don't use an enum because we want to allow for connectors
798+
# to support interpolation on the connector config to get the target which is an arbitrary string
799+
type: string
800+
interpolation_context:
801+
- config
802+
examples:
803+
- "DAY"
804+
- "WEEK"
805+
- "MONTH"
806+
- "{{ config['target'] }}"
807+
target_details:
808+
type: object
809+
additionalProperties: true
787810
cursor_field:
788811
title: Cursor Field
789812
description: The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+18-6
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,16 @@ class Config:
328328
type: Optional[Literal["LegacyToPerPartitionStateMigration"]] = None
329329

330330

331+
class Clamping(BaseModel):
332+
target: str = Field(
333+
...,
334+
description="The period of time that datetime windows will be clamped by",
335+
examples=["DAY", "WEEK", "MONTH", "{{ config['target'] }}"],
336+
title="Target",
337+
)
338+
target_details: Optional[Dict[str, Any]] = None
339+
340+
331341
class Algorithm(Enum):
332342
HS256 = "HS256"
333343
HS384 = "HS384"
@@ -719,7 +729,7 @@ class HttpResponseFilter(BaseModel):
719729
class TypesMap(BaseModel):
720730
target_type: Union[str, List[str]]
721731
current_type: Union[str, List[str]]
722-
condition: Optional[str]
732+
condition: Optional[str] = None
723733

724734

725735
class SchemaTypeIdentifier(BaseModel):
@@ -797,14 +807,11 @@ class DpathFlattenFields(BaseModel):
797807
field_path: List[str] = Field(
798808
...,
799809
description="A path to field that needs to be flattened.",
800-
examples=[
801-
["data"],
802-
["data", "*", "field"],
803-
],
810+
examples=[["data"], ["data", "*", "field"]],
804811
title="Field Path",
805812
)
806813
delete_origin_value: Optional[bool] = Field(
807-
False,
814+
None,
808815
description="Whether to delete the origin value or keep it. Default is False.",
809816
title="Delete Origin Value",
810817
)
@@ -1454,6 +1461,11 @@ class AuthFlow(BaseModel):
14541461

14551462
class DatetimeBasedCursor(BaseModel):
14561463
type: Literal["DatetimeBasedCursor"]
1464+
clamping: Optional[Clamping] = Field(
1465+
None,
1466+
description="This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)",
1467+
title="Date Range Clamping",
1468+
)
14571469
cursor_field: str = Field(
14581470
...,
14591471
description="The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+78-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import importlib
99
import inspect
1010
import re
11-
import sys
1211
from functools import partial
1312
from typing import (
1413
Any,
@@ -102,6 +101,7 @@
102101
LegacyToPerPartitionStateMigration,
103102
)
104103
from airbyte_cdk.sources.declarative.models import (
104+
Clamping,
105105
CustomStateMigration,
106106
)
107107
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
@@ -462,6 +462,15 @@
462462
LogAppenderMessageRepositoryDecorator,
463463
MessageRepository,
464464
)
465+
from airbyte_cdk.sources.streams.concurrent.clamping import (
466+
ClampingEndProvider,
467+
ClampingStrategy,
468+
DayClampingStrategy,
469+
MonthClampingStrategy,
470+
NoClamping,
471+
WeekClampingStrategy,
472+
Weekday,
473+
)
465474
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
466475
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
467476
CustomFormatConcurrentStreamStateConverter,
@@ -1048,6 +1057,53 @@ def create_concurrent_cursor_from_datetime_based_cursor(
10481057
if evaluated_step:
10491058
step_length = parse_duration(evaluated_step)
10501059

1060+
clamping_strategy: ClampingStrategy = NoClamping()
1061+
if datetime_based_cursor_model.clamping:
1062+
# While it is undesirable to interpolate within the model factory (as opposed to at runtime),
1063+
# it is still better than shifting interpolation low-code concept into the ConcurrentCursor runtime
1064+
# object which we want to keep agnostic of being low-code
1065+
target = InterpolatedString(
1066+
string=datetime_based_cursor_model.clamping.target,
1067+
parameters=datetime_based_cursor_model.parameters or {},
1068+
)
1069+
evaluated_target = target.eval(config=config)
1070+
match evaluated_target:
1071+
case "DAY":
1072+
clamping_strategy = DayClampingStrategy()
1073+
end_date_provider = ClampingEndProvider(
1074+
DayClampingStrategy(is_ceiling=False),
1075+
end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
1076+
granularity=cursor_granularity or datetime.timedelta(seconds=1),
1077+
)
1078+
case "WEEK":
1079+
if (
1080+
not datetime_based_cursor_model.clamping.target_details
1081+
or "weekday" not in datetime_based_cursor_model.clamping.target_details
1082+
):
1083+
raise ValueError(
1084+
"Given WEEK clamping, weekday needs to be provided as target_details"
1085+
)
1086+
weekday = self._assemble_weekday(
1087+
datetime_based_cursor_model.clamping.target_details["weekday"]
1088+
)
1089+
clamping_strategy = WeekClampingStrategy(weekday)
1090+
end_date_provider = ClampingEndProvider(
1091+
WeekClampingStrategy(weekday, is_ceiling=False),
1092+
end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
1093+
granularity=cursor_granularity or datetime.timedelta(days=1),
1094+
)
1095+
case "MONTH":
1096+
clamping_strategy = MonthClampingStrategy()
1097+
end_date_provider = ClampingEndProvider(
1098+
MonthClampingStrategy(is_ceiling=False),
1099+
end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
1100+
granularity=cursor_granularity or datetime.timedelta(days=1),
1101+
)
1102+
case _:
1103+
raise ValueError(
1104+
f"Invalid clamping target {evaluated_target}, expected DAY, WEEK, MONTH"
1105+
)
1106+
10511107
return ConcurrentCursor(
10521108
stream_name=stream_name,
10531109
stream_namespace=stream_namespace,
@@ -1062,7 +1118,27 @@ def create_concurrent_cursor_from_datetime_based_cursor(
10621118
lookback_window=lookback_window,
10631119
slice_range=step_length,
10641120
cursor_granularity=cursor_granularity,
1065-
)
1121+
clamping_strategy=clamping_strategy,
1122+
)
1123+
1124+
def _assemble_weekday(self, weekday: str) -> Weekday:
1125+
match weekday:
1126+
case "MONDAY":
1127+
return Weekday.MONDAY
1128+
case "TUESDAY":
1129+
return Weekday.TUESDAY
1130+
case "WEDNESDAY":
1131+
return Weekday.WEDNESDAY
1132+
case "THURSDAY":
1133+
return Weekday.THURSDAY
1134+
case "FRIDAY":
1135+
return Weekday.FRIDAY
1136+
case "SATURDAY":
1137+
return Weekday.SATURDAY
1138+
case "SUNDAY":
1139+
return Weekday.SUNDAY
1140+
case _:
1141+
raise ValueError(f"Unknown weekday {weekday}")
10661142

10671143
@staticmethod
10681144
def create_constant_backoff_strategy(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
from abc import ABC
2+
from datetime import datetime, timedelta
3+
from enum import Enum
4+
from typing import Callable
5+
6+
from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType
7+
8+
9+
class ClampingStrategy(ABC):
10+
def clamp(self, value: CursorValueType) -> CursorValueType:
11+
raise NotImplementedError()
12+
13+
14+
class NoClamping(ClampingStrategy):
15+
def clamp(self, value: CursorValueType) -> CursorValueType:
16+
return value
17+
18+
19+
class ClampingEndProvider:
20+
def __init__(
21+
self,
22+
clamping_strategy: ClampingStrategy,
23+
end_provider: Callable[[], CursorValueType],
24+
granularity: timedelta,
25+
) -> None:
26+
self._clamping_strategy = clamping_strategy
27+
self._end_provider = end_provider
28+
self._granularity = granularity
29+
30+
def __call__(self) -> CursorValueType:
31+
return self._clamping_strategy.clamp(self._end_provider()) - self._granularity
32+
33+
34+
class DayClampingStrategy(ClampingStrategy):
35+
def __init__(self, is_ceiling: bool = True) -> None:
36+
self._is_ceiling = is_ceiling
37+
38+
def clamp(self, value: datetime) -> datetime: # type: ignore # datetime implements method from CursorValueType
39+
return_value = value.replace(hour=0, minute=0, second=0, microsecond=0)
40+
if self._is_ceiling:
41+
return return_value + timedelta(days=1)
42+
return return_value
43+
44+
45+
class MonthClampingStrategy(ClampingStrategy):
46+
def __init__(self, is_ceiling: bool = True) -> None:
47+
self._is_ceiling = is_ceiling
48+
49+
def clamp(self, value: datetime) -> datetime: # type: ignore # datetime implements method from CursorValueType
50+
return_value = value.replace(hour=0, minute=0, second=0, microsecond=0)
51+
needs_to_round = value.day != 1
52+
if not needs_to_round:
53+
return return_value
54+
55+
return self._ceil(return_value) if self._is_ceiling else return_value.replace(day=1)
56+
57+
def _ceil(self, value: datetime) -> datetime:
58+
return value.replace(
59+
year=value.year + 1 if value.month == 12 else value.year,
60+
month=(value.month % 12) + 1,
61+
day=1,
62+
hour=0,
63+
minute=0,
64+
second=0,
65+
microsecond=0,
66+
)
67+
68+
69+
class Weekday(Enum):
70+
"""
71+
These integer values map to the same ones used by the Datetime.date.weekday() implementation
72+
"""
73+
74+
MONDAY = 0
75+
TUESDAY = 1
76+
WEDNESDAY = 2
77+
THURSDAY = 3
78+
FRIDAY = 4
79+
SATURDAY = 5
80+
SUNDAY = 6
81+
82+
83+
class WeekClampingStrategy(ClampingStrategy):
84+
def __init__(self, day_of_week: Weekday, is_ceiling: bool = True) -> None:
85+
self._day_of_week = day_of_week.value
86+
self._is_ceiling = is_ceiling
87+
88+
def clamp(self, value: datetime) -> datetime: # type: ignore # datetime implements method from CursorValueType
89+
days_diff_to_ceiling = (
90+
7 - (value.weekday() - self._day_of_week)
91+
if value.weekday() > self._day_of_week
92+
else abs(value.weekday() - self._day_of_week)
93+
)
94+
delta = (
95+
timedelta(days_diff_to_ceiling)
96+
if self._is_ceiling
97+
else timedelta(days_diff_to_ceiling - 7)
98+
)
99+
return value.replace(hour=0, minute=0, second=0, microsecond=0) + delta

0 commit comments

Comments
 (0)