Skip to content

Commit

Permalink
Auto-fix lint and format issues
Browse files Browse the repository at this point in the history
  • Loading branch information
octavia-squidington-iii committed Feb 12, 2025
1 parent d3a83a4 commit 666c4fa
Showing 1 changed file with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1784,21 +1784,29 @@ def _build_resumable_cursor(
return None

def _merge_stream_slicers(
self, model: DeclarativeStreamModel, config: Config
self, model: DeclarativeStreamModel, config: Config
) -> Optional[StreamSlicer]:
if model.retriever.type == "StateDelegatingRetriever" and not model.incremental_sync:
raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled.")

if model.retriever.type == "AsyncRetriever":
is_not_datetime_cursor = model.incremental_sync.type != "DatetimeBasedCursor" if model.incremental_sync else None
is_partition_router = bool(model.retriever.partition_router) if model.incremental_sync else None
is_not_datetime_cursor = (
model.incremental_sync.type != "DatetimeBasedCursor"
if model.incremental_sync
else None
)
is_partition_router = (
bool(model.retriever.partition_router) if model.incremental_sync else None
)

if is_not_datetime_cursor:
# We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the
# support or unordered slices (for example, when we trigger reports for January and February, the report
# in February can be completed first). Once we have support for custom concurrent cursor or have a new
# implementation available in the CDK, we can enable more cursors here.
raise ValueError("AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet.")
raise ValueError(
"AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet."
)

if is_partition_router:
# Note that this development is also done in parallel to the per partition development which once merged
Expand All @@ -1810,7 +1818,11 @@ def _merge_stream_slicers(
if model.incremental_sync:
return self._build_incremental_cursor(model, stream_slicer, config)

return stream_slicer if self._disable_resumable_full_refresh else self._build_resumable_cursor(model.retriever, stream_slicer)
return (
stream_slicer
if self._disable_resumable_full_refresh
else self._build_resumable_cursor(model.retriever, stream_slicer)
)

def create_default_error_handler(
self, model: DefaultErrorHandlerModel, config: Config, **kwargs: Any
Expand Down

0 comments on commit 666c4fa

Please sign in to comment.