Skip to content

feat(low-code concurrent): Concurrent execution for streams without partition routers nor cursor #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
67 changes: 43 additions & 24 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ def _group_streams(
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
# so we need to treat them as synchronous
if isinstance(declarative_stream, DeclarativeStream):
if (
isinstance(declarative_stream, DeclarativeStream)
and name_to_stream_mapping[declarative_stream.name].get("retriever")["type"]
== "SimpleRetriever"
):
incremental_sync_component_definition = name_to_stream_mapping[
declarative_stream.name
].get("incremental_sync")
Expand All @@ -210,36 +214,30 @@ def _group_streams(
.get("retriever")
.get("partition_router")
)
is_without_partition_router_or_cursor = not bool(
incremental_sync_component_definition
) and not bool(partition_router_component_definition)

is_substream_without_incremental = (
partition_router_component_definition
and not incremental_sync_component_definition
)

if (
incremental_sync_component_definition
and incremental_sync_component_definition.get("type", "")
== DatetimeBasedCursorModel.__name__
and self._stream_supports_concurrent_partition_processing(
declarative_stream=declarative_stream
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
if self._is_datetime_incremental_without_partition_routing(
declarative_stream, incremental_sync_component_definition
):
stream_state = state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)

cursor, connector_state_converter = (
self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition,
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
stream_state=stream_state,
)
cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition,
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
stream_state=stream_state,
)

partition_generator = StreamSlicerPartitionGenerator(
Expand All @@ -263,14 +261,19 @@ def _group_streams(
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=cursor.cursor_field.cursor_field_key,
cursor_field=cursor.cursor_field.cursor_field_key
if hasattr(cursor, "cursor_field")
and hasattr(
cursor.cursor_field, "cursor_field_key"
) # FIXME this will need to be updated once we do the per partition
else None,
logger=self.logger,
cursor=cursor,
)
)
elif is_substream_without_incremental and hasattr(
declarative_stream.retriever, "stream_slicer"
):
elif (
is_substream_without_incremental or is_without_partition_router_or_cursor
) and hasattr(declarative_stream.retriever, "stream_slicer"):
partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
declarative_stream.name,
Expand Down Expand Up @@ -310,6 +313,22 @@ def _group_streams(

return concurrent_streams, synchronous_streams

def _is_datetime_incremental_without_partition_routing(
self,
declarative_stream: DeclarativeStream,
incremental_sync_component_definition: Mapping[str, Any],
) -> bool:
return (
bool(incremental_sync_component_definition)
and incremental_sync_component_definition.get("type", "")
== DatetimeBasedCursorModel.__name__
and self._stream_supports_concurrent_partition_processing(
declarative_stream=declarative_stream
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
)

def _stream_supports_concurrent_partition_processing(
self, declarative_stream: DeclarativeStream
) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
Mapping,
MutableMapping,
Optional,
Tuple,
Type,
Union,
get_args,
Expand Down Expand Up @@ -760,7 +759,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
config: Config,
stream_state: MutableMapping[str, Any],
**kwargs: Any,
) -> Tuple[ConcurrentCursor, DateTimeStreamStateConverter]:
) -> ConcurrentCursor:
component_type = component_definition.get("type")
if component_definition.get("type") != model_type.__name__:
raise ValueError(
Expand Down Expand Up @@ -891,23 +890,20 @@ def create_concurrent_cursor_from_datetime_based_cursor(
if evaluated_step:
step_length = parse_duration(evaluated_step)

return (
ConcurrentCursor(
stream_name=stream_name,
stream_namespace=stream_namespace,
stream_state=stream_state,
message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory
connector_state_manager=state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
slice_boundary_fields=slice_boundary_fields,
start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
lookback_window=lookback_window,
slice_range=step_length,
cursor_granularity=cursor_granularity,
),
connector_state_converter,
return ConcurrentCursor(
stream_name=stream_name,
stream_namespace=stream_namespace,
stream_state=stream_state,
message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory
connector_state_manager=state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
slice_boundary_fields=slice_boundary_fields,
start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
lookback_window=lookback_window,
slice_range=step_length,
cursor_granularity=cursor_granularity,
)

@staticmethod
Expand Down
9 changes: 8 additions & 1 deletion airbyte_cdk/sources/streams/concurrent/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@ def get_primary_key_from_stream(
elif isinstance(stream_primary_key, str):
return [stream_primary_key]
elif isinstance(stream_primary_key, list):
if len(stream_primary_key) > 0 and all(isinstance(k, str) for k in stream_primary_key):
are_all_elements_str = all(isinstance(k, str) for k in stream_primary_key)
are_all_elements_list_of_size_one = all(
isinstance(k, list) and len(k) == 1 for k in stream_primary_key
)

if are_all_elements_str:
return stream_primary_key # type: ignore # We verified all items in the list are strings
elif are_all_elements_list_of_size_one:
return list(map(lambda x: x[0], stream_primary_key))
else:
raise ValueError(f"Nested primary keys are not supported. Found {stream_primary_key}")
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3062,7 +3062,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
"lookback_window": "P3D",
}

concurrent_cursor, stream_state_converter = (
concurrent_cursor = (
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=connector_state_manager,
model_type=DatetimeBasedCursorModel,
Expand Down Expand Up @@ -3094,6 +3094,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
assert concurrent_cursor._end_provider() == expected_end
assert concurrent_cursor._concurrent_state == expected_concurrent_state

stream_state_converter = concurrent_cursor._connector_state_converter
assert isinstance(stream_state_converter, CustomFormatConcurrentStreamStateConverter)
assert stream_state_converter._datetime_format == expected_datetime_format
assert stream_state_converter._is_sequential_state
Expand Down Expand Up @@ -3194,7 +3195,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor(
stream_state={},
)
else:
concurrent_cursor, stream_state_converter = (
concurrent_cursor = (
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=connector_state_manager,
model_type=DatetimeBasedCursorModel,
Expand Down Expand Up @@ -3251,7 +3252,7 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
"lookback_window": "P3D",
}

concurrent_cursor, stream_state_converter = (
concurrent_cursor = (
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=connector_state_manager,
model_type=DatetimeBasedCursorModel,
Expand Down
Loading
Loading