Skip to content

Commit 327f5fc

Browse files
added unit tests
1 parent 2bd8dcf commit 327f5fc

File tree

2 files changed

+276
-0
lines changed

2 files changed

+276
-0
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from typing import Any, Mapping
6+
7+
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
8+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
9+
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
10+
from airbyte_cdk.sources.types import Config
11+
12+
13+
class CustomStateMigration(StateMigration):
14+
declarative_stream: DeclarativeStream
15+
config: Config
16+
17+
def __init__(self, declarative_stream: DeclarativeStream, config: Config):
18+
self._config = config
19+
self.declarative_stream = declarative_stream
20+
self._cursor = declarative_stream.incremental_sync
21+
self._parameters = declarative_stream.parameters
22+
self._cursor_field = InterpolatedString.create(
23+
self._cursor.cursor_field, parameters=self._parameters
24+
).eval(self._config)
25+
26+
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
27+
return True
28+
29+
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
30+
if not self.should_migrate(stream_state):
31+
return stream_state
32+
updated_at = stream_state[self._cursor.cursor_field]
33+
34+
migrated_stream_state = {
35+
"states": [
36+
{
37+
"partition": {"type": "type_1"},
38+
"cursor": {self._cursor.cursor_field: updated_at},
39+
},
40+
{
41+
"partition": {"type": "type_2"},
42+
"cursor": {self._cursor.cursor_field: updated_at},
43+
},
44+
]
45+
}
46+
47+
return migrated_stream_state

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,6 +1231,235 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state():
12311231
assert len(party_members_skills_records) == 9
12321232

12331233

1234+
@freezegun.freeze_time(_NOW)
1235+
def test_read_with_state_when_state_migration_was_provided():
1236+
manifest = {
1237+
"version": "5.0.0",
1238+
"definitions": {
1239+
"selector": {
1240+
"type": "RecordSelector",
1241+
"extractor": {"type": "DpathExtractor", "field_path": []},
1242+
},
1243+
"requester": {
1244+
"type": "HttpRequester",
1245+
"url_base": "https://persona.metaverse.com",
1246+
"http_method": "GET",
1247+
"authenticator": {
1248+
"type": "BasicHttpAuthenticator",
1249+
"username": "{{ config['api_key'] }}",
1250+
"password": "{{ config['secret_key'] }}",
1251+
},
1252+
"error_handler": {
1253+
"type": "DefaultErrorHandler",
1254+
"response_filters": [
1255+
{
1256+
"http_codes": [403],
1257+
"action": "FAIL",
1258+
"failure_type": "config_error",
1259+
"error_message": "Access denied due to lack of permission or invalid API/Secret key or wrong data region.",
1260+
},
1261+
{
1262+
"http_codes": [404],
1263+
"action": "IGNORE",
1264+
"error_message": "No data available for the time range requested.",
1265+
},
1266+
],
1267+
},
1268+
},
1269+
"retriever": {
1270+
"type": "SimpleRetriever",
1271+
"record_selector": {"$ref": "#/definitions/selector"},
1272+
"paginator": {"type": "NoPagination"},
1273+
"requester": {"$ref": "#/definitions/requester"},
1274+
},
1275+
"incremental_cursor": {
1276+
"type": "DatetimeBasedCursor",
1277+
"start_datetime": {
1278+
"datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
1279+
},
1280+
"end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
1281+
"datetime_format": "%Y-%m-%d",
1282+
"cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
1283+
"cursor_granularity": "P1D",
1284+
"step": "P15D",
1285+
"cursor_field": "updated_at",
1286+
"lookback_window": "P5D",
1287+
"start_time_option": {
1288+
"type": "RequestOption",
1289+
"field_name": "start",
1290+
"inject_into": "request_parameter",
1291+
},
1292+
"end_time_option": {
1293+
"type": "RequestOption",
1294+
"field_name": "end",
1295+
"inject_into": "request_parameter",
1296+
},
1297+
},
1298+
"base_stream": {"retriever": {"$ref": "#/definitions/retriever"}},
1299+
"base_incremental_stream": {
1300+
"retriever": {
1301+
"$ref": "#/definitions/retriever",
1302+
"requester": {"$ref": "#/definitions/requester"},
1303+
},
1304+
"incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
1305+
},
1306+
"party_members_stream": {
1307+
"$ref": "#/definitions/base_incremental_stream",
1308+
"retriever": {
1309+
"$ref": "#/definitions/base_incremental_stream/retriever",
1310+
"requester": {
1311+
"$ref": "#/definitions/requester",
1312+
"request_parameters": {"filter": "{{stream_partition['type']}}"},
1313+
},
1314+
"record_selector": {"$ref": "#/definitions/selector"},
1315+
"partition_router": [
1316+
{
1317+
"type": "ListPartitionRouter",
1318+
"values": ["type_1", "type_2"],
1319+
"cursor_field": "type",
1320+
}
1321+
],
1322+
},
1323+
"$parameters": {
1324+
"name": "party_members",
1325+
"primary_key": "id",
1326+
"path": "/party_members",
1327+
},
1328+
"state_migrations": [
1329+
{
1330+
"type": "CustomStateMigration",
1331+
"class_name": "unit_tests.sources.declarative.custom_state_migration.CustomStateMigration",
1332+
}
1333+
],
1334+
"schema_loader": {
1335+
"type": "InlineSchemaLoader",
1336+
"schema": {
1337+
"$schema": "https://json-schema.org/draft-07/schema#",
1338+
"type": "object",
1339+
"properties": {
1340+
"id": {
1341+
"description": "The identifier",
1342+
"type": ["null", "string"],
1343+
},
1344+
"name": {
1345+
"description": "The name of the party member",
1346+
"type": ["null", "string"],
1347+
},
1348+
},
1349+
},
1350+
},
1351+
},
1352+
},
1353+
"streams": [
1354+
"#/definitions/party_members_stream",
1355+
],
1356+
"check": {"stream_names": ["party_members", "locations"]},
1357+
"concurrency_level": {
1358+
"type": "ConcurrencyLevel",
1359+
"default_concurrency": "{{ config['num_workers'] or 10 }}",
1360+
"max_concurrency": 25,
1361+
},
1362+
}
1363+
state = [
1364+
AirbyteStateMessage(
1365+
type=AirbyteStateType.STREAM,
1366+
stream=AirbyteStreamState(
1367+
stream_descriptor=StreamDescriptor(name="party_members", namespace=None),
1368+
stream_state=AirbyteStateBlob(updated_at="2024-08-21"),
1369+
),
1370+
),
1371+
]
1372+
catalog = ConfiguredAirbyteCatalog(
1373+
streams=[
1374+
ConfiguredAirbyteStream(
1375+
stream=AirbyteStream(
1376+
name="party_members",
1377+
json_schema={},
1378+
supported_sync_modes=[SyncMode.incremental],
1379+
),
1380+
sync_mode=SyncMode.incremental,
1381+
destination_sync_mode=DestinationSyncMode.append,
1382+
)
1383+
]
1384+
)
1385+
source = ConcurrentDeclarativeSource(
1386+
source_config=manifest, config=_CONFIG, catalog=_CATALOG, state=state
1387+
)
1388+
party_members_slices_and_responses = [
1389+
(
1390+
{"start": "2024-08-16", "end": "2024-08-30", "filter": "type_1"},
1391+
HttpResponse(
1392+
json.dumps(
1393+
[
1394+
{
1395+
"id": "nijima",
1396+
"first_name": "makoto",
1397+
"last_name": "nijima",
1398+
"updated_at": "2024-08-10",
1399+
"type": 1,
1400+
}
1401+
]
1402+
)
1403+
),
1404+
),
1405+
(
1406+
{"start": "2024-08-16", "end": "2024-08-30", "filter": "type_2"},
1407+
HttpResponse(
1408+
json.dumps(
1409+
[
1410+
{
1411+
"id": "nijima",
1412+
"first_name": "makoto",
1413+
"last_name": "nijima",
1414+
"updated_at": "2024-08-10",
1415+
"type": 2,
1416+
}
1417+
]
1418+
)
1419+
),
1420+
),
1421+
(
1422+
{"start": "2024-08-31", "end": "2024-09-10", "filter": "type_1"},
1423+
HttpResponse(
1424+
json.dumps(
1425+
[
1426+
{
1427+
"id": "yoshizawa",
1428+
"first_name": "sumire",
1429+
"last_name": "yoshizawa",
1430+
"updated_at": "2024-09-10",
1431+
"type": 1,
1432+
}
1433+
]
1434+
)
1435+
),
1436+
),
1437+
(
1438+
{"start": "2024-08-31", "end": "2024-09-10", "filter": "type_2"},
1439+
HttpResponse(
1440+
json.dumps(
1441+
[
1442+
{
1443+
"id": "yoshizawa",
1444+
"first_name": "sumire",
1445+
"last_name": "yoshizawa",
1446+
"updated_at": "2024-09-10",
1447+
"type": 2,
1448+
}
1449+
]
1450+
)
1451+
),
1452+
),
1453+
]
1454+
with HttpMocker() as http_mocker:
1455+
_mock_party_members_requests(http_mocker, party_members_slices_and_responses)
1456+
messages = list(
1457+
source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state)
1458+
)
1459+
final_state = get_states_for_stream(stream_name="party_members", messages=messages)
1460+
assert state not in final_state
1461+
1462+
12341463
@freezegun.freeze_time(_NOW)
12351464
@patch(
12361465
"airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter.__init__",

0 commit comments

Comments
 (0)