Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: (low code) run state migrations for concurrent streams #316

Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ def _group_streams(
stream_state = self._connector_state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)
for state_migration in declarative_stream.state_migrations:
darynaishchenko marked this conversation as resolved.
Show resolved Hide resolved
if state_migration.should_migrate(stream_state):
stream_state = state_migration.migrate(stream_state)

darynaishchenko marked this conversation as resolved.
Show resolved Hide resolved
retriever = self._get_retriever(declarative_stream, stream_state)

Expand Down Expand Up @@ -331,6 +334,10 @@ def _group_streams(
stream_state = self._connector_state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)
for state_migration in declarative_stream.state_migrations:
if state_migration.should_migrate(stream_state):
stream_state = state_migration.migrate(stream_state)

darynaishchenko marked this conversation as resolved.
Show resolved Hide resolved
partition_router = declarative_stream.retriever.stream_slicer._partition_router

perpartition_cursor = (
Expand Down
47 changes: 47 additions & 0 deletions unit_tests/sources/declarative/custom_state_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping

from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.types import Config


class CustomStateMigration(StateMigration):
    """Test fixture migrating a legacy single-cursor state into per-partition state.

    The legacy state is expected to look like ``{<cursor_field>: <value>}``. The
    migrated state repeats that cursor value for each of the two hard-coded
    partitions (``type_1`` and ``type_2``) under a top-level ``"states"`` list.
    """

    # NOTE(review): these class-level annotations are presumably what the
    # declarative component factory inspects to decide which constructor
    # arguments to inject -- confirm before removing or renaming them.
    declarative_stream: DeclarativeStream
    config: Config

    def __init__(self, declarative_stream: DeclarativeStream, config: Config):
        """Capture the stream's cursor definition and resolve its cursor field name.

        Args:
            declarative_stream: Object exposing ``incremental_sync`` and ``parameters``.
            config: Connector configuration used to evaluate the cursor field template.
        """
        self._config = config
        self.declarative_stream = declarative_stream
        self._cursor = declarative_stream.incremental_sync
        self._parameters = declarative_stream.parameters
        # Evaluate the (possibly templated) cursor field once, so state keys use
        # the resolved name rather than the raw template string.
        self._cursor_field = InterpolatedString.create(
            self._cursor.cursor_field, parameters=self._parameters
        ).eval(self._config)

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """Return True only when ``stream_state`` holds the legacy top-level cursor key.

        Empty state and state without that key (for example an already migrated
        ``{"states": [...]}`` mapping) must not be migrated, because ``migrate``
        reads the legacy cursor value by key.
        """
        return bool(stream_state) and self._cursor_field in stream_state

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """Return the per-partition form of ``stream_state``.

        Args:
            stream_state: The state previously saved for the stream.

        Returns:
            ``stream_state`` unchanged when no migration is needed; otherwise a new
            mapping with one ``{"partition": ..., "cursor": ...}`` entry per partition.
        """
        if not self.should_migrate(stream_state):
            return stream_state

        updated_at = stream_state[self._cursor_field]
        return {
            "states": [
                {
                    "partition": {"type": partition_type},
                    "cursor": {self._cursor_field: updated_at},
                }
                for partition_type in ("type_1", "type_2")
            ]
        }
229 changes: 229 additions & 0 deletions unit_tests/sources/declarative/test_concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,235 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state():
assert len(party_members_skills_records) == 9


@freezegun.freeze_time(_NOW)
def test_read_with_state_when_state_migration_was_provided():
    """Read a partitioned incremental stream whose manifest declares a custom state migration.

    The incoming state uses the legacy single-cursor shape (``{"updated_at": ...}``).
    The test reads the stream against mocked HTTP responses and checks that state
    is emitted and that none of the emitted stream states equals the legacy input.
    """
    manifest = {
        "version": "5.0.0",
        "definitions": {
            "selector": {
                "type": "RecordSelector",
                "extractor": {"type": "DpathExtractor", "field_path": []},
            },
            "requester": {
                "type": "HttpRequester",
                "url_base": "https://persona.metaverse.com",
                "http_method": "GET",
                "authenticator": {
                    "type": "BasicHttpAuthenticator",
                    "username": "{{ config['api_key'] }}",
                    "password": "{{ config['secret_key'] }}",
                },
                "error_handler": {
                    "type": "DefaultErrorHandler",
                    "response_filters": [
                        {
                            "http_codes": [403],
                            "action": "FAIL",
                            "failure_type": "config_error",
                            "error_message": "Access denied due to lack of permission or invalid API/Secret key or wrong data region.",
                        },
                        {
                            "http_codes": [404],
                            "action": "IGNORE",
                            "error_message": "No data available for the time range requested.",
                        },
                    ],
                },
            },
            "retriever": {
                "type": "SimpleRetriever",
                "record_selector": {"$ref": "#/definitions/selector"},
                "paginator": {"type": "NoPagination"},
                "requester": {"$ref": "#/definitions/requester"},
            },
            "incremental_cursor": {
                "type": "DatetimeBasedCursor",
                "start_datetime": {
                    "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
                },
                "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
                "datetime_format": "%Y-%m-%d",
                "cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
                "cursor_granularity": "P1D",
                "step": "P15D",
                "cursor_field": "updated_at",
                "lookback_window": "P5D",
                "start_time_option": {
                    "type": "RequestOption",
                    "field_name": "start",
                    "inject_into": "request_parameter",
                },
                "end_time_option": {
                    "type": "RequestOption",
                    "field_name": "end",
                    "inject_into": "request_parameter",
                },
            },
            "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}},
            "base_incremental_stream": {
                "retriever": {
                    "$ref": "#/definitions/retriever",
                    "requester": {"$ref": "#/definitions/requester"},
                },
                "incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
            },
            "party_members_stream": {
                "$ref": "#/definitions/base_incremental_stream",
                "retriever": {
                    "$ref": "#/definitions/base_incremental_stream/retriever",
                    "requester": {
                        "$ref": "#/definitions/requester",
                        "request_parameters": {"filter": "{{stream_partition['type']}}"},
                    },
                    "record_selector": {"$ref": "#/definitions/selector"},
                    "partition_router": [
                        {
                            "type": "ListPartitionRouter",
                            "values": ["type_1", "type_2"],
                            "cursor_field": "type",
                        }
                    ],
                },
                "$parameters": {
                    "name": "party_members",
                    "primary_key": "id",
                    "path": "/party_members",
                },
                # The migration under test: turns the legacy state into per-partition state.
                "state_migrations": [
                    {
                        "type": "CustomStateMigration",
                        "class_name": "unit_tests.sources.declarative.custom_state_migration.CustomStateMigration",
                    }
                ],
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$schema": "https://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "properties": {
                            "id": {
                                "description": "The identifier",
                                "type": ["null", "string"],
                            },
                            "name": {
                                "description": "The name of the party member",
                                "type": ["null", "string"],
                            },
                        },
                    },
                },
            },
        },
        "streams": [
            "#/definitions/party_members_stream",
        ],
        "check": {"stream_names": ["party_members", "locations"]},
        "concurrency_level": {
            "type": "ConcurrencyLevel",
            "default_concurrency": "{{ config['num_workers'] or 10 }}",
            "max_concurrency": 25,
        },
    }
    # Legacy (pre-migration) state: a single cursor value with no partition information.
    state = [
        AirbyteStateMessage(
            type=AirbyteStateType.STREAM,
            stream=AirbyteStreamState(
                stream_descriptor=StreamDescriptor(name="party_members", namespace=None),
                stream_state=AirbyteStateBlob(updated_at="2024-08-21"),
            ),
        ),
    ]
    catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=AirbyteStream(
                    name="party_members",
                    json_schema={},
                    supported_sync_modes=[SyncMode.incremental],
                ),
                sync_mode=SyncMode.incremental,
                destination_sync_mode=DestinationSyncMode.append,
            )
        ]
    )
    # NOTE(review): the source is built with the module-level _CATALOG while read()
    # below receives the local single-stream `catalog` -- confirm this is intentional.
    source = ConcurrentDeclarativeSource(
        source_config=manifest, config=_CONFIG, catalog=_CATALOG, state=state
    )
    # One mocked response per (date window, partition) request.
    party_members_slices_and_responses = [
        (
            {"start": "2024-08-16", "end": "2024-08-30", "filter": "type_1"},
            HttpResponse(
                json.dumps(
                    [
                        {
                            "id": "nijima",
                            "first_name": "makoto",
                            "last_name": "nijima",
                            "updated_at": "2024-08-10",
                            "type": 1,
                        }
                    ]
                )
            ),
        ),
        (
            {"start": "2024-08-16", "end": "2024-08-30", "filter": "type_2"},
            HttpResponse(
                json.dumps(
                    [
                        {
                            "id": "nijima",
                            "first_name": "makoto",
                            "last_name": "nijima",
                            "updated_at": "2024-08-10",
                            "type": 2,
                        }
                    ]
                )
            ),
        ),
        (
            {"start": "2024-08-31", "end": "2024-09-10", "filter": "type_1"},
            HttpResponse(
                json.dumps(
                    [
                        {
                            "id": "yoshizawa",
                            "first_name": "sumire",
                            "last_name": "yoshizawa",
                            "updated_at": "2024-09-10",
                            "type": 1,
                        }
                    ]
                )
            ),
        ),
        (
            {"start": "2024-08-31", "end": "2024-09-10", "filter": "type_2"},
            HttpResponse(
                json.dumps(
                    [
                        {
                            "id": "yoshizawa",
                            "first_name": "sumire",
                            "last_name": "yoshizawa",
                            "updated_at": "2024-09-10",
                            "type": 2,
                        }
                    ]
                )
            ),
        ),
    ]
    with HttpMocker() as http_mocker:
        _mock_party_members_requests(http_mocker, party_members_slices_and_responses)
        messages = list(
            source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state)
        )
    final_state = get_states_for_stream(stream_name="party_members", messages=messages)

    # `state` is a list of state messages, so `state not in final_state` could never
    # fail. Compare the input stream state with the emitted entries instead.
    # NOTE(review): presumably get_states_for_stream returns per-stream state objects
    # of the same type as `state[0].stream` -- verify against the helper.
    legacy_stream_state = state[0].stream
    assert final_state, "expected at least one state message for party_members"
    assert legacy_stream_state not in final_state
darynaishchenko marked this conversation as resolved.
Show resolved Hide resolved


@freezegun.freeze_time(_NOW)
@patch(
"airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter.__init__",
Expand Down
Loading