Skip to content

Commit

Permalink
Auto-fix lint and format issues
Browse files Browse the repository at this point in the history
  • Loading branch information
octavia-squidington-iii committed Feb 12, 2025
1 parent 53b2980 commit 14138ed
Showing 1 changed file with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1680,7 +1680,12 @@ def create_declarative_stream(

def _build_stream_slicer_from_partition_router(
self,
model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
model: Union[
AsyncRetrieverModel,
CustomRetrieverModel,
SimpleRetrieverModel,
StateDelegatingRetrieverModel,
],
config: Config,
) -> Optional[PartitionRouter]:
if (
Expand Down Expand Up @@ -1726,7 +1731,10 @@ def _build_incremental_cursor(
cursor_component = self._create_component_from_model(
model=incremental_sync_model, config=config
)
is_global_cursor = hasattr(incremental_sync_model, "global_substream_cursor") and incremental_sync_model.global_substream_cursor
is_global_cursor = (
hasattr(incremental_sync_model, "global_substream_cursor")
and incremental_sync_model.global_substream_cursor
)

if is_global_cursor:
return GlobalSubstreamCursor(
Expand Down Expand Up @@ -1756,7 +1764,12 @@ def _build_incremental_cursor(

def _build_resumable_cursor(
self,
model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
model: Union[
AsyncRetrieverModel,
CustomRetrieverModel,
SimpleRetrieverModel,
StateDelegatingRetrieverModel,
],
stream_slicer: Optional[PartitionRouter],
) -> Optional[StreamSlicer]:
if hasattr(model, "paginator") and model.paginator and not stream_slicer:
Expand All @@ -1778,12 +1791,17 @@ def _merge_stream_slicers(
if model.retriever.type == "StateDelegatingRetriever" and not model.incremental_sync:
raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled.")

if model.retriever.type == "AsyncRetriever" and model.incremental_sync.type != "DatetimeBasedCursor":
if (
model.retriever.type == "AsyncRetriever"
and model.incremental_sync.type != "DatetimeBasedCursor"
):
# We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the
# support or unordered slices (for example, when we trigger reports for January and February, the report
# in February can be completed first). Once we have support for custom concurrent cursor or have a new
# implementation available in the CDK, we can enable more cursors here.
raise ValueError("AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet.")
raise ValueError(
"AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet."
)

if model.retriever.type == "AsyncRetriever" and model.retriever.partition_router:
# Note that this development is also done in parallel to the per partition development which once merged
Expand Down Expand Up @@ -2059,9 +2077,7 @@ def create_dynamic_schema_loader(
self, model: DynamicSchemaLoaderModel, config: Config, **kwargs: Any
) -> DynamicSchemaLoader:
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
combined_slicers = self._build_resumable_cursor(
model.retriever, stream_slicer
)
combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)

schema_transformations = []
if model.schema_transformations:
Expand Down Expand Up @@ -2916,9 +2932,7 @@ def create_http_components_resolver(
self, model: HttpComponentsResolverModel, config: Config
) -> Any:
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
combined_slicers = self._build_resumable_cursor(
model.retriever, stream_slicer
)
combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)

retriever = self._create_component_from_model(
model=model.retriever,
Expand Down

0 comments on commit 14138ed

Please sign in to comment.