diff --git a/src/ctl/src/cmd_impl/throttle.rs b/src/ctl/src/cmd_impl/throttle.rs index c6eaddb4de9d0..f1cb1109fdeff 100644 --- a/src/ctl/src/cmd_impl/throttle.rs +++ b/src/ctl/src/cmd_impl/throttle.rs @@ -15,8 +15,8 @@ use risingwave_pb::common::PbThrottleType; use risingwave_pb::meta::PbThrottleTarget; -use crate::ThrottleCommandArgs; use crate::common::CtlContext; +use crate::{ThrottleCommandArgs, ThrottleTypeArg}; pub async fn apply_throttle( context: &CtlContext, @@ -24,16 +24,13 @@ pub async fn apply_throttle( params: ThrottleCommandArgs, ) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; - // Infer throttle type from target for backward compatibility. - let throttle_type = match kind { - PbThrottleTarget::Source => PbThrottleType::Source, - PbThrottleTarget::Mv => PbThrottleType::Backfill, - PbThrottleTarget::Sink => PbThrottleType::Sink, - PbThrottleTarget::Table | PbThrottleTarget::Fragment | PbThrottleTarget::Unspecified => { - // Default to Backfill for unspecified/unsupported combinations; user should use SQL for table throttling - PbThrottleType::Backfill - } + let throttle_type = match params.throttle_type { + ThrottleTypeArg::Dml => PbThrottleType::Dml, + ThrottleTypeArg::Backfill => PbThrottleType::Backfill, + ThrottleTypeArg::Source => PbThrottleType::Source, + ThrottleTypeArg::Sink => PbThrottleType::Sink, }; + meta_client .apply_throttle(kind, throttle_type, params.id, params.rate) .await?; diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index ea2acbf005801..b3c05d5868e11 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -506,12 +506,28 @@ enum TestCommands { enum ThrottleCommands { Source(ThrottleCommandArgs), Mv(ThrottleCommandArgs), + Sink(ThrottleCommandArgs), +} + +#[derive(Clone, Debug, clap::ValueEnum)] +pub enum ThrottleTypeArg { + Dml, + Backfill, + Source, + Sink, } #[derive(Clone, Debug, Args)] pub struct ThrottleCommandArgs { + /// The ID of the object to throttle + #[clap(long, required = true)] id: u32, + /// The rate limit to apply + #[clap(long)] rate: Option, + /// The type of throttle to apply + #[clap(long, value_enum, required = true)] + throttle_type: ThrottleTypeArg, } #[derive(Subcommand, Clone, Debug)] @@ -921,6 +937,9 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Throttle(ThrottleCommands::Mv(args)) => { apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?; } + Commands::Throttle(ThrottleCommands::Sink(args)) => { + apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?; + } Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism { table_id, parallelism,