Skip to content

Commit

Permalink
Move async retriever validation to quit faster
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi committed Feb 12, 2025
1 parent 9706535 commit 63e9951
Showing 1 changed file with 14 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1686,15 +1686,23 @@ def _merge_stream_slicers(
if model.retriever.type == "StateDelegatingRetriever" and not model.incremental_sync:
raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled.")

if model.retriever.type == "AsyncRetriever" and model.incremental_sync.type != "DatetimeBasedCursor":
# We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the
# support or unordered slices (for example, when we trigger reports for January and February, the report
# in February can be completed first). Once we have support for custom concurrent cursor or have a new
# implementation available in the CDK, we can enable more cursors here.
raise ValueError(
"AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
)

if model.retriever.type == "AsyncRetriever" and model.retriever.partition_router:
# Note that this development is also done in parallel to the per partition development which once merged we could support here by calling `create_concurrent_cursor_from_perpartition_cursor`
raise ValueError("Per partition state is not supported yet for AsyncRetriever")

stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)

if model.incremental_sync and stream_slicer:
if model.retriever.type == "AsyncRetriever":
if model.incremental_sync.type != "DatetimeBasedCursor":
# We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support or unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursor or have a new implementation available in the CDK, we can enable more cursors here.
raise ValueError(
"AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
)
if stream_slicer:
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
state_manager=self._connector_state_manager,
Expand Down Expand Up @@ -1740,26 +1748,14 @@ def _merge_stream_slicers(
)
elif model.incremental_sync:
if model.retriever.type == "AsyncRetriever":
if model.incremental_sync.type != "DatetimeBasedCursor":
# We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support or unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursor or have a new implementation available in the CDK, we can enable more cursors here.
raise ValueError(
"AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
)
if model.retriever.partition_router:
# Note that this development is also done in parallel to the per partition development which once merged we could support here by calling `create_concurrent_cursor_from_perpartition_cursor`
raise ValueError("Per partition state is not supported yet for AsyncRetriever")
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
model_type=DatetimeBasedCursorModel,
component_definition=model.incremental_sync.__dict__,
stream_name=model.name or "",
stream_namespace=None,
config=config or {},
)
return (
self._create_component_from_model(model=model.incremental_sync, config=config)
if model.incremental_sync
else None
)
return self._create_component_from_model(model=model.incremental_sync, config=config)
elif self._disable_resumable_full_refresh:
return stream_slicer
elif stream_slicer:
Expand Down

0 comments on commit 63e9951

Please sign in to comment.