Skip to content

Commit

Permalink
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
Browse files Browse the repository at this point in the history
…airbytehq/airbyte-python-cdk into lazebnyi/add-state-delegating-retriever
  • Loading branch information
lazebnyi committed Feb 12, 2025
2 parents bb3b176 + 14138ed commit 66001f1
Show file tree
Hide file tree
Showing 27 changed files with 1,500 additions and 415 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pypi_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ jobs:
(github.event_name == 'push' &&
startsWith(github.ref, 'refs/tags/v')
) || github.event.inputs.publish_to_pypi == 'true'
uses: pypa/[email protected].3
uses: pypa/[email protected].4

publish_sdm:
name: Publish SDM to DockerHub
Expand Down
7 changes: 6 additions & 1 deletion airbyte_cdk/sources/declarative/auth/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass, field
from datetime import timedelta
from datetime import datetime, timedelta
from typing import Any, List, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
Expand Down Expand Up @@ -232,8 +232,13 @@ def get_refresh_request_headers(self) -> Mapping[str, Any]:
return self._refresh_request_headers.eval(self.config)

def get_token_expiry_date(self) -> AirbyteDateTime:
if not self._has_access_token_been_initialized():
return AirbyteDateTime.from_datetime(datetime.min)
return self._token_expiry_date # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks

def _has_access_token_been_initialized(self) -> bool:
return self._access_token is not None

def set_token_expiry_date(self, value: Union[str, int]) -> None:
self._token_expiry_date = self._parse_token_expiration_date(value)

Expand Down
16 changes: 15 additions & 1 deletion airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

import logging
from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.models import (
AirbyteCatalog,
Expand Down Expand Up @@ -224,6 +224,7 @@ def _group_streams(
stream_state = self._connector_state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)
stream_state = self._migrate_state(declarative_stream, stream_state)

retriever = self._get_retriever(declarative_stream, stream_state)

Expand Down Expand Up @@ -331,6 +332,8 @@ def _group_streams(
stream_state = self._connector_state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)
stream_state = self._migrate_state(declarative_stream, stream_state)

partition_router = declarative_stream.retriever.stream_slicer._partition_router

perpartition_cursor = (
Expand Down Expand Up @@ -521,3 +524,14 @@ def _remove_concurrent_streams_from_catalog(
if stream.stream.name not in concurrent_stream_names
]
)

@staticmethod
def _migrate_state(
declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
) -> MutableMapping[str, Any]:
for state_migration in declarative_stream.state_migrations:
if state_migration.should_migrate(stream_state):
# The state variable is expected to be mutable but the migrate method returns an immutable mapping.
stream_state = dict(state_migration.migrate(stream_state))

return stream_state
Loading

0 comments on commit 66001f1

Please sign in to comment.