Skip to content

Commit 53b2980

Browse files
authored
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
2 parents b78cc6e + fdcded3 commit 53b2980

27 files changed

+1475
-404
lines changed

.github/workflows/pypi_publish.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ jobs:
146146
(github.event_name == 'push' &&
147147
startsWith(github.ref, 'refs/tags/v')
148148
) || github.event.inputs.publish_to_pypi == 'true'
149-
uses: pypa/[email protected].3
149+
uses: pypa/[email protected].4
150150

151151
publish_sdm:
152152
name: Publish SDM to DockerHub

airbyte_cdk/sources/declarative/auth/oauth.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass, field
6-
from datetime import timedelta
6+
from datetime import datetime, timedelta
77
from typing import Any, List, Mapping, MutableMapping, Optional, Union
88

99
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
@@ -232,8 +232,13 @@ def get_refresh_request_headers(self) -> Mapping[str, Any]:
232232
return self._refresh_request_headers.eval(self.config)
233233

234234
def get_token_expiry_date(self) -> AirbyteDateTime:
235+
if not self._has_access_token_been_initialized():
236+
return AirbyteDateTime.from_datetime(datetime.min)
235237
return self._token_expiry_date # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
236238

239+
def _has_access_token_been_initialized(self) -> bool:
240+
return self._access_token is not None
241+
237242
def set_token_expiry_date(self, value: Union[str, int]) -> None:
238243
self._token_expiry_date = self._parse_token_expiration_date(value)
239244

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44

55
import logging
6-
from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple
6+
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple
77

88
from airbyte_cdk.models import (
99
AirbyteCatalog,
@@ -224,6 +224,7 @@ def _group_streams(
224224
stream_state = self._connector_state_manager.get_stream_state(
225225
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
226226
)
227+
stream_state = self._migrate_state(declarative_stream, stream_state)
227228

228229
retriever = self._get_retriever(declarative_stream, stream_state)
229230

@@ -331,6 +332,8 @@ def _group_streams(
331332
stream_state = self._connector_state_manager.get_stream_state(
332333
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
333334
)
335+
stream_state = self._migrate_state(declarative_stream, stream_state)
336+
334337
partition_router = declarative_stream.retriever.stream_slicer._partition_router
335338

336339
perpartition_cursor = (
@@ -521,3 +524,14 @@ def _remove_concurrent_streams_from_catalog(
521524
if stream.stream.name not in concurrent_stream_names
522525
]
523526
)
527+
528+
@staticmethod
529+
def _migrate_state(
530+
declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
531+
) -> MutableMapping[str, Any]:
532+
for state_migration in declarative_stream.state_migrations:
533+
if state_migration.should_migrate(stream_state):
534+
# The state variable is expected to be mutable but the migrate method returns an immutable mapping.
535+
stream_state = dict(state_migration.migrate(stream_state))
536+
537+
return stream_state

0 commit comments

Comments
 (0)