Skip to content

Commit ad20739

Browse files
devin-ai-integration[bot]aaronsteersoctavia-squidington-iii
authored
chore: remove pendulum dependency (#258)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Aaron <AJ> Steers <[email protected]> Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 128b678 commit ad20739

File tree

20 files changed

+1036
-2000192
lines changed

20 files changed

+1036
-2000192
lines changed

airbyte_cdk/cli/source_declarative_manifest/_run.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import sys
2222
import traceback
2323
from collections.abc import Mapping
24-
from datetime import datetime
2524
from pathlib import Path
2625
from typing import Any, cast
2726

@@ -44,6 +43,7 @@
4443
)
4544
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
4645
from airbyte_cdk.sources.source import TState
46+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
4747

4848

4949
class SourceLocalYaml(YamlDeclarativeSource):
@@ -101,7 +101,7 @@ def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:
101101
type=Type.TRACE,
102102
trace=AirbyteTraceMessage(
103103
type=TraceType.ERROR,
104-
emitted_at=int(datetime.now().timestamp() * 1000),
104+
emitted_at=ab_datetime_now().to_epoch_millis(),
105105
error=AirbyteErrorTraceMessage(
106106
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
107107
stack_trace=traceback.format_exc(),
@@ -191,7 +191,7 @@ def create_declarative_source(
191191
type=Type.TRACE,
192192
trace=AirbyteTraceMessage(
193193
type=TraceType.ERROR,
194-
emitted_at=int(datetime.now().timestamp() * 1000),
194+
emitted_at=ab_datetime_now().to_epoch_millis(),
195195
error=AirbyteErrorTraceMessage(
196196
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
197197
stack_trace=traceback.format_exc(),

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
#
44

55
import dataclasses
6-
from datetime import datetime
76
from typing import Any, List, Mapping
87

98
from airbyte_cdk.connector_builder.message_grouper import MessageGrouper
@@ -21,6 +20,7 @@
2120
ModelToComponentFactory,
2221
)
2322
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
23+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
2424
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
2525

2626
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
@@ -114,4 +114,4 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
114114

115115

116116
def _emitted_at() -> int:
117-
return int(datetime.now().timestamp()) * 1000
117+
return ab_datetime_now().to_epoch_millis()

airbyte_cdk/sources/declarative/auth/oauth.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@
33
#
44

55
from dataclasses import InitVar, dataclass, field
6+
from datetime import timedelta
67
from typing import Any, List, Mapping, MutableMapping, Optional, Union
78

8-
import pendulum
9-
109
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
1110
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1211
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
@@ -18,6 +17,7 @@
1817
from airbyte_cdk.sources.streams.http.requests_native_auth.oauth import (
1918
SingleUseRefreshTokenOauth2Authenticator,
2019
)
20+
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse
2121

2222

2323
@dataclass
@@ -53,7 +53,7 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAut
5353
refresh_token: Optional[Union[InterpolatedString, str]] = None
5454
scopes: Optional[List[str]] = None
5555
token_expiry_date: Optional[Union[InterpolatedString, str]] = None
56-
_token_expiry_date: Optional[pendulum.DateTime] = field(init=False, repr=False, default=None)
56+
_token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None)
5757
token_expiry_date_format: Optional[str] = None
5858
token_expiry_is_time_of_expiration: bool = False
5959
access_token_name: Union[InterpolatedString, str] = "access_token"
@@ -122,15 +122,24 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
122122
self._refresh_request_headers = InterpolatedMapping(
123123
self.refresh_request_headers or {}, parameters=parameters
124124
)
125-
self._token_expiry_date: pendulum.DateTime = (
126-
pendulum.parse(
127-
InterpolatedString.create(self.token_expiry_date, parameters=parameters).eval(
128-
self.config
125+
try:
126+
if (
127+
isinstance(self.token_expiry_date, (int, str))
128+
and str(self.token_expiry_date).isdigit()
129+
):
130+
self._token_expiry_date = ab_datetime_parse(self.token_expiry_date)
131+
else:
132+
self._token_expiry_date = (
133+
ab_datetime_parse(
134+
InterpolatedString.create(
135+
self.token_expiry_date, parameters=parameters
136+
).eval(self.config)
137+
)
138+
if self.token_expiry_date
139+
else ab_datetime_now() - timedelta(days=1)
129140
)
130-
) # type: ignore # pendulum.parse returns a datetime in this context
131-
if self.token_expiry_date
132-
else pendulum.now().subtract(days=1) # type: ignore # substract does not have type hints
133-
)
141+
except ValueError as e:
142+
raise ValueError(f"Invalid token expiry date format: {e}")
134143
self.use_profile_assertion = (
135144
InterpolatedBoolean(self.use_profile_assertion, parameters=parameters)
136145
if isinstance(self.use_profile_assertion, str)
@@ -222,8 +231,8 @@ def get_refresh_request_body(self) -> Mapping[str, Any]:
222231
def get_refresh_request_headers(self) -> Mapping[str, Any]:
223232
return self._refresh_request_headers.eval(self.config)
224233

225-
def get_token_expiry_date(self) -> pendulum.DateTime:
226-
return self._token_expiry_date # type: ignore # _token_expiry_date is a pendulum.DateTime. It is never None despite what mypy thinks
234+
def get_token_expiry_date(self) -> AirbyteDateTime:
235+
return self._token_expiry_date # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
227236

228237
def set_token_expiry_date(self, value: Union[str, int]) -> None:
229238
self._token_expiry_date = self._parse_token_expiration_date(value)

airbyte_cdk/sources/declarative/auth/token_provider.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
from typing import Any, List, Mapping, Optional, Union
1010

1111
import dpath
12-
import pendulum
1312
from isodate import Duration
14-
from pendulum import DateTime
1513

1614
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
1715
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
@@ -21,6 +19,7 @@
2119
from airbyte_cdk.sources.http_logger import format_http_message
2220
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
2321
from airbyte_cdk.sources.types import Config
22+
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now
2423

2524

2625
class TokenProvider:
@@ -38,7 +37,7 @@ class SessionTokenProvider(TokenProvider):
3837
message_repository: MessageRepository = NoopMessageRepository()
3938
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
4039

41-
_next_expiration_time: Optional[DateTime] = None
40+
_next_expiration_time: Optional[AirbyteDateTime] = None
4241
_token: Optional[str] = None
4342

4443
def get_token(self) -> str:
@@ -48,7 +47,7 @@ def get_token(self) -> str:
4847
return self._token
4948

5049
def _refresh_if_necessary(self) -> None:
51-
if self._next_expiration_time is None or self._next_expiration_time < pendulum.now():
50+
if self._next_expiration_time is None or self._next_expiration_time < ab_datetime_now():
5251
self._refresh()
5352

5453
def _refresh(self) -> None:
@@ -65,7 +64,7 @@ def _refresh(self) -> None:
6564
raise ReadException("Failed to get session token, response got ignored by requester")
6665
session_token = dpath.get(next(self.decoder.decode(response)), self.session_token_path)
6766
if self.expiration_duration is not None:
68-
self._next_expiration_time = pendulum.now() + self.expiration_duration
67+
self._next_expiration_time = ab_datetime_now() + self.expiration_duration
6968
self._token = session_token # type: ignore # Returned decoded response will be Mapping and therefore session_token will be str or None
7069

7170

airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66
from datetime import datetime, timedelta, timezone
77
from typing import Any, Callable, List, MutableMapping, Optional, Tuple
88

9-
import pendulum
10-
from pendulum.datetime import DateTime
11-
129
# FIXME We would eventually like the Concurrent package do be agnostic of the declarative package. However, this is a breaking change and
1310
# the goal in the short term is only to fix the issue we are seeing for source-declarative-manifest.
1411
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
@@ -17,6 +14,7 @@
1714
AbstractStreamStateConverter,
1815
ConcurrencyCompatibleStateType,
1916
)
17+
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse
2018

2119

2220
class DateTimeStreamStateConverter(AbstractStreamStateConverter):
@@ -36,7 +34,7 @@ def zero_value(self) -> datetime:
3634

3735
@classmethod
3836
def get_end_provider(cls) -> Callable[[], datetime]:
39-
return lambda: datetime.now(timezone.utc)
37+
return ab_datetime_now
4038

4139
@abstractmethod
4240
def increment(self, timestamp: datetime) -> datetime: ...
@@ -136,10 +134,10 @@ def output_format(self, timestamp: datetime) -> int:
136134
return int(timestamp.timestamp())
137135

138136
def parse_timestamp(self, timestamp: int) -> datetime:
139-
dt_object = pendulum.from_timestamp(timestamp)
140-
if not isinstance(dt_object, DateTime):
137+
dt_object = AirbyteDateTime.fromtimestamp(timestamp, timezone.utc)
138+
if not isinstance(dt_object, AirbyteDateTime):
141139
raise ValueError(
142-
f"DateTime object was expected but got {type(dt_object)} from pendulum.parse({timestamp})"
140+
f"AirbyteDateTime object was expected but got {type(dt_object)} from AirbyteDateTime.fromtimestamp({timestamp})"
143141
)
144142
return dt_object
145143

@@ -169,22 +167,33 @@ def __init__(
169167
def increment(self, timestamp: datetime) -> datetime:
170168
return timestamp + self._cursor_granularity
171169

172-
def output_format(self, timestamp: datetime) -> Any:
173-
return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
170+
def output_format(self, timestamp: datetime) -> str:
171+
"""Format datetime with milliseconds always included.
172+
173+
Args:
174+
timestamp: The datetime to format.
175+
176+
Returns:
177+
str: ISO8601/RFC3339 formatted string with milliseconds.
178+
"""
179+
dt = AirbyteDateTime.from_datetime(timestamp)
180+
# Always include milliseconds, even if zero
181+
millis = dt.microsecond // 1000 if dt.microsecond else 0
182+
return f"{dt.year:04d}-{dt.month:02d}-{dt.day:02d}T{dt.hour:02d}:{dt.minute:02d}:{dt.second:02d}.{millis:03d}Z"
174183

175184
def parse_timestamp(self, timestamp: str) -> datetime:
176-
dt_object = pendulum.parse(timestamp)
177-
if not isinstance(dt_object, DateTime):
185+
dt_object = ab_datetime_parse(timestamp)
186+
if not isinstance(dt_object, AirbyteDateTime):
178187
raise ValueError(
179-
f"DateTime object was expected but got {type(dt_object)} from pendulum.parse({timestamp})"
188+
f"AirbyteDateTime object was expected but got {type(dt_object)} from parse({timestamp})"
180189
)
181190
return dt_object
182191

183192

184193
class CustomFormatConcurrentStreamStateConverter(IsoMillisConcurrentStreamStateConverter):
185194
"""
186195
Datetime State converter that emits state according to the supplied datetime format. The converter supports reading
187-
incoming state in any valid datetime format via Pendulum.
196+
incoming state in any valid datetime format using AirbyteDateTime parsing utilities.
188197
"""
189198

190199
def __init__(

airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
import logging
66
from abc import abstractmethod
7+
from datetime import timedelta
78
from json import JSONDecodeError
89
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union
910

1011
import backoff
11-
import pendulum
1212
import requests
1313
from requests.auth import AuthBase
1414

@@ -17,6 +17,7 @@
1717
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
1818
from airbyte_cdk.utils import AirbyteTracedException
1919
from airbyte_cdk.utils.airbyte_secrets_utils import add_to_secrets
20+
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse
2021

2122
from ..exceptions import DefaultBackoffException
2223

@@ -72,7 +73,7 @@ def get_access_token(self) -> str:
7273

7374
def token_has_expired(self) -> bool:
7475
"""Returns True if the token is expired"""
75-
return pendulum.now() > self.get_token_expiry_date() # type: ignore # this is always a bool despite what mypy thinks
76+
return ab_datetime_now() > self.get_token_expiry_date()
7677

7778
def build_refresh_request_body(self) -> Mapping[str, Any]:
7879
"""
@@ -179,7 +180,7 @@ def refresh_access_token(self) -> Tuple[str, Union[str, int]]:
179180
self.get_expires_in_name()
180181
]
181182

182-
def _parse_token_expiration_date(self, value: Union[str, int]) -> pendulum.DateTime:
183+
def _parse_token_expiration_date(self, value: Union[str, int]) -> AirbyteDateTime:
183184
"""
184185
Return the expiration datetime of the refresh token
185186
@@ -191,9 +192,19 @@ def _parse_token_expiration_date(self, value: Union[str, int]) -> pendulum.DateT
191192
raise ValueError(
192193
f"Invalid token expiry date format {self.token_expiry_date_format}; a string representing the format is required."
193194
)
194-
return pendulum.from_format(str(value), self.token_expiry_date_format)
195+
try:
196+
return ab_datetime_parse(str(value))
197+
except ValueError as e:
198+
raise ValueError(f"Invalid token expiry date format: {e}")
195199
else:
196-
return pendulum.now().add(seconds=int(float(value)))
200+
try:
201+
# Only accept numeric values (as int/float/string) when no format specified
202+
seconds = int(float(str(value)))
203+
return ab_datetime_now() + timedelta(seconds=seconds)
204+
except (ValueError, TypeError):
205+
raise ValueError(
206+
f"Invalid expires_in value: {value}. Expected number of seconds when no format specified."
207+
)
197208

198209
@property
199210
def token_expiry_is_time_of_expiration(self) -> bool:
@@ -244,7 +255,7 @@ def get_scopes(self) -> List[str]:
244255
"""List of requested scopes"""
245256

246257
@abstractmethod
247-
def get_token_expiry_date(self) -> pendulum.DateTime:
258+
def get_token_expiry_date(self) -> AirbyteDateTime:
248259
"""Expiration date of the access token"""
249260

250261
@abstractmethod

0 commit comments

Comments
 (0)