Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: (low code)run state migrations for concurrent streams #316

16 changes: 15 additions & 1 deletion airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

import logging
from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple
from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple, MutableMapping

from airbyte_cdk.models import (
AirbyteCatalog,
Expand Down Expand Up @@ -224,6 +224,7 @@ def _group_streams(
stream_state = self._connector_state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)
stream_state = self._migrate_state(declarative_stream, stream_state)
darynaishchenko marked this conversation as resolved.
Show resolved Hide resolved

retriever = self._get_retriever(declarative_stream, stream_state)

Expand Down Expand Up @@ -331,6 +332,8 @@ def _group_streams(
stream_state = self._connector_state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)
stream_state = self._migrate_state(declarative_stream, stream_state)

partition_router = declarative_stream.retriever.stream_slicer._partition_router

perpartition_cursor = (
Expand Down Expand Up @@ -521,3 +524,14 @@ def _remove_concurrent_streams_from_catalog(
if stream.stream.name not in concurrent_stream_names
]
)

@staticmethod
def _migrate_state(
declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
) -> MutableMapping[str, Any]:
for state_migration in declarative_stream.state_migrations:
if state_migration.should_migrate(stream_state):
# The state variable is expected to be mutable but the migrate method returns an immutable mapping.
stream_state = dict(state_migration.migrate(stream_state))

return stream_state
47 changes: 47 additions & 0 deletions unit_tests/sources/declarative/custom_state_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping

from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.types import Config


class CustomStateMigration(StateMigration):
declarative_stream: DeclarativeStream
config: Config

def __init__(self, declarative_stream: DeclarativeStream, config: Config):
self._config = config
self.declarative_stream = declarative_stream
self._cursor = declarative_stream.incremental_sync
self._parameters = declarative_stream.parameters
self._cursor_field = InterpolatedString.create(
self._cursor.cursor_field, parameters=self._parameters
).eval(self._config)

def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
return True

def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
if not self.should_migrate(stream_state):
return stream_state
updated_at = stream_state[self._cursor.cursor_field]

migrated_stream_state = {
"states": [
{
"partition": {"type": "type_1"},
"cursor": {self._cursor.cursor_field: updated_at},
},
{
"partition": {"type": "type_2"},
"cursor": {self._cursor.cursor_field: updated_at},
},
]
}

return migrated_stream_state
153 changes: 153 additions & 0 deletions unit_tests/sources/declarative/test_concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,159 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state():
assert len(party_members_skills_records) == 9


def test_concurrent_declarative_source_runs_state_migrations_provided_in_manifest():
manifest = {
"version": "5.0.0",
"definitions": {
"selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"requester": {
"type": "HttpRequester",
"url_base": "https://persona.metaverse.com",
"http_method": "GET",
"authenticator": {
"type": "BasicHttpAuthenticator",
"username": "{{ config['api_key'] }}",
"password": "{{ config['secret_key'] }}",
},
"error_handler": {
"type": "DefaultErrorHandler",
"response_filters": [
{
"http_codes": [403],
"action": "FAIL",
"failure_type": "config_error",
"error_message": "Access denied due to lack of permission or invalid API/Secret key or wrong data region.",
},
{
"http_codes": [404],
"action": "IGNORE",
"error_message": "No data available for the time range requested.",
},
],
},
},
"retriever": {
"type": "SimpleRetriever",
"record_selector": {"$ref": "#/definitions/selector"},
"paginator": {"type": "NoPagination"},
"requester": {"$ref": "#/definitions/requester"},
},
"incremental_cursor": {
"type": "DatetimeBasedCursor",
"start_datetime": {
"datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
},
"end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
"datetime_format": "%Y-%m-%d",
"cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
"cursor_granularity": "P1D",
"step": "P15D",
"cursor_field": "updated_at",
"lookback_window": "P5D",
"start_time_option": {
"type": "RequestOption",
"field_name": "start",
"inject_into": "request_parameter",
},
"end_time_option": {
"type": "RequestOption",
"field_name": "end",
"inject_into": "request_parameter",
},
},
"base_stream": {"retriever": {"$ref": "#/definitions/retriever"}},
"base_incremental_stream": {
"retriever": {
"$ref": "#/definitions/retriever",
"requester": {"$ref": "#/definitions/requester"},
},
"incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
},
"party_members_stream": {
"$ref": "#/definitions/base_incremental_stream",
"retriever": {
"$ref": "#/definitions/base_incremental_stream/retriever",
"requester": {
"$ref": "#/definitions/requester",
"request_parameters": {"filter": "{{stream_partition['type']}}"},
},
"record_selector": {"$ref": "#/definitions/selector"},
"partition_router": [
{
"type": "ListPartitionRouter",
"values": ["type_1", "type_2"],
"cursor_field": "type",
}
],
},
"$parameters": {
"name": "party_members",
"primary_key": "id",
"path": "/party_members",
},
"state_migrations": [
{
"type": "CustomStateMigration",
"class_name": "unit_tests.sources.declarative.custom_state_migration.CustomStateMigration",
}
],
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "https://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"description": "The identifier",
"type": ["null", "string"],
},
"name": {
"description": "The name of the party member",
"type": ["null", "string"],
},
},
},
},
},
},
"streams": [
"#/definitions/party_members_stream",
],
"check": {"stream_names": ["party_members", "locations"]},
"concurrency_level": {
"type": "ConcurrencyLevel",
"default_concurrency": "{{ config['num_workers'] or 10 }}",
"max_concurrency": 25,
},
}
state_blob = AirbyteStateBlob(updated_at="2024-08-21")
state = [
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="party_members", namespace=None),
stream_state=state_blob,
),
),
]
source = ConcurrentDeclarativeSource(
source_config=manifest, config=_CONFIG, catalog=_CATALOG, state=state
)
concurrent_streams, synchronous_streams = source._group_streams(_CONFIG)
assert concurrent_streams[0].cursor.state != state_blob.__dict__
assert concurrent_streams[0].cursor.state == {
"lookback_window": 0,
"states": [
{"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}},
{"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}},
],
"use_global_cursor": False,
}


@freezegun.freeze_time(_NOW)
@patch(
"airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter.__init__",
Expand Down
Loading