-
Notifications
You must be signed in to change notification settings - Fork 715
feat(frontend): support sink backfill rate limit #24075
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces support for separate backfill rate limiting on sinks by decoupling ThrottleTarget (what to throttle) from ThrottleType (how to throttle). This allows users to independently control both the sink execution rate and the sink backfill rate.
Key Changes:
- Introduced
ThrottleTypeenum to distinguish between DML, Backfill, Source, and Sink rate limit types - Added
ALTER SINK SET BACKFILL_RATE_LIMITSQL syntax support - Updated all executors to check
throttle_typebefore applying rate limits - Modified RPC interfaces to accept both
throttle_targetandthrottle_typeparameters
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| proto/meta.proto | Added ThrottleType enum and updated ApplyThrottleRequest to include both target and type |
| proto/stream_plan.proto | Added nested ThrottleType enum in ThrottleMutation and added throttle_type field |
| src/sqlparser/src/ast/ddl.rs | Added SetBackfillRateLimit variant to AlterSinkOperation enum |
| src/sqlparser/src/parser.rs | Added parsing logic for ALTER SINK SET BACKFILL_RATE_LIMIT syntax |
| src/frontend/src/meta_client.rs | Updated apply_throttle method signature to accept both target and type |
| src/frontend/src/test_utils.rs | Updated mock implementation to match new signature |
| src/frontend/src/handler/mod.rs | Added handler routing for sink backfill rate limit operations |
| src/frontend/src/handler/alter_streaming_rate_limit.rs | Updated logic to handle different throttle target/type combinations |
| src/rpc_client/src/meta_client.rs | Updated apply_throttle to pass both target and type parameters |
| src/meta/service/src/stream_service.rs | Updated throttle application logic to match on (type, target) tuples |
| src/meta/service/src/ddl_service.rs | Updated to pass ThrottleType::Source when creating throttle commands |
| src/meta/src/barrier/command.rs | Modified Command::Throttle to include ThrottleType parameter |
| src/meta/src/barrier/info.rs | Updated pattern matching for throttle commands |
| src/meta/src/barrier/context/context_impl.rs | Updated pattern matching for throttle commands |
| src/stream/src/executor/mod.rs | Updated Mutation::Throttle to include throttle_type field and conversion logic |
| src/stream/src/executor/sink.rs | Added throttle type check to only apply sink rate limits |
| src/stream/src/executor/dml.rs | Added throttle type check to only apply DML rate limits |
| src/stream/src/executor/source/source_executor.rs | Added throttle type check to only apply source rate limits |
| src/stream/src/executor/source/source_backfill_executor.rs | Added throttle type check to only apply backfill rate limits |
| src/stream/src/executor/source/iceberg_fetch_executor.rs | Added throttle type check to only apply source rate limits |
| src/stream/src/executor/source/fs_fetch_executor.rs | Added throttle type check to only apply source rate limits |
| src/stream/src/executor/backfill/no_shuffle_backfill.rs | Added throttle type check to only apply backfill rate limits |
| src/stream/src/executor/backfill/arrangement_backfill.rs | Added throttle type check to only apply backfill rate limits |
| src/stream/src/executor/backfill/cdc/cdc_backfill.rs | Added throttle type check to only apply backfill rate limits |
| src/stream/src/executor/backfill/cdc/cdc_backill_v2.rs | Added throttle type check to only apply backfill rate limits |
| src/ctl/src/cmd_impl/throttle.rs | Added logic to infer throttle type from target for backward compatibility |
| CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl, | ||
| }; | ||
| use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw}; | ||
| use risingwave_pb::meta::ThrottleType; |
Copilot
AI
Dec 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency with other executor files in this PR, consider importing ThrottleType from crate::executor rather than directly from risingwave_pb::meta. Most other executors use use crate::executor::ThrottleType; (see source_executor.rs, sink.rs, etc.).
| use risingwave_pb::meta::ThrottleType; | |
| use crate::executor::ThrottleType; |
|
@copilot please submit a pr to fix the review comments. |
| // limitations under the License. | ||
|
|
||
| use risingwave_pb::common::PbThrottleType; | ||
| use risingwave_pb::meta::PbThrottleTarget; | ||
|
|
||
| use crate::ThrottleCommandArgs; | ||
| use crate::common::CtlContext; | ||
|
|
||
| pub async fn apply_throttle( | ||
| context: &CtlContext, | ||
| kind: PbThrottleTarget, | ||
| params: ThrottleCommandArgs, | ||
| ) -> anyhow::Result<()> { | ||
| let meta_client = context.meta_client().await?; | ||
| // Infer throttle type from target for backward compatibility. | ||
| let throttle_type = match kind { | ||
| PbThrottleTarget::Source => PbThrottleType::Source, | ||
| PbThrottleTarget::Mv => PbThrottleType::Backfill, | ||
| PbThrottleTarget::Sink => PbThrottleType::Sink, | ||
| PbThrottleTarget::Table | PbThrottleTarget::Fragment | PbThrottleTarget::Unspecified => { | ||
| // Default to Backfill for unspecified/unsupported combinations; user should use SQL for table throttling | ||
| PbThrottleType::Backfill | ||
| } | ||
| }; | ||
| meta_client | ||
| .apply_throttle(kind, params.id, params.rate) | ||
| .apply_throttle(kind, throttle_type, params.id, params.rate) | ||
| .await?; | ||
| Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be fixed, we should supply both throttle type and throttle target.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot please help to update the ctl interface to support this behaviour. Add a new cli option for this.
Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: kwannoel <[email protected]>
8757617 to
3c0155a
Compare
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
ThrottleTargetintoThrottleTargetandThrottleType.ThrottleTyperefers to the type of rate limit we want to apply.ThrottleTargetrefers to the actual object we are targeting (sink, index, fragment, etc...).Checklist
Documentation
Release note