Skip to content

Commit a832fba

Browse files
committed
fix: not set trigger querier
1 parent 9317ad6 commit a832fba

File tree

2 files changed

+32
-11
lines changed

2 files changed

+32
-11
lines changed

src/cmd/src/frontend.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ impl StartCommand {
469469
);
470470
let heartbeat_task = Some(heartbeat_task);
471471

472-
let instance = FrontendBuilder::new(
472+
let builder = FrontendBuilder::new(
473473
opts.clone(),
474474
cached_meta_backend.clone(),
475475
layered_cache_registry.clone(),
@@ -479,10 +479,19 @@ impl StartCommand {
479479
process_manager,
480480
)
481481
.with_plugin(plugins.clone())
482-
.with_local_cache_invalidator(layered_cache_registry)
483-
.try_build()
484-
.await
485-
.context(error::StartFrontendSnafu)?;
482+
.with_local_cache_invalidator(layered_cache_registry);
483+
484+
#[cfg(feature = "enterprise")]
485+
let builder = if let Some(factory) = extension.trigger_querier_factory {
486+
builder.with_trigger_querier(factory)
487+
} else {
488+
builder
489+
};
490+
491+
let instance = builder
492+
.try_build()
493+
.await
494+
.context(error::StartFrontendSnafu)?;
486495
let instance = Arc::new(instance);
487496

488497
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))

src/frontend/src/instance/builder.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ use operator::flow::FlowServiceOperator;
3232
use operator::insert::Inserter;
3333
use operator::procedure::ProcedureServiceOperator;
3434
use operator::request::Requester;
35+
#[cfg(feature = "enterprise")]
36+
use operator::statement::TriggerQuerierFactoryRef;
3537
use operator::statement::{StatementExecutor, StatementExecutorRef};
3638
use operator::table::TableMutationOperator;
3739
use partition::manager::PartitionRuleManager;
@@ -58,6 +60,8 @@ pub struct FrontendBuilder {
5860
plugins: Option<Plugins>,
5961
procedure_executor: ProcedureExecutorRef,
6062
process_manager: ProcessManagerRef,
63+
#[cfg(feature = "enterprise")]
64+
trigger_querier_factory: Option<TriggerQuerierFactoryRef>,
6165
}
6266

6367
impl FrontendBuilder {
@@ -81,6 +85,8 @@ impl FrontendBuilder {
8185
plugins: None,
8286
procedure_executor,
8387
process_manager,
88+
#[cfg(feature = "enterprise")]
89+
trigger_querier_factory: None,
8490
}
8591
}
8692

@@ -98,6 +104,14 @@ impl FrontendBuilder {
98104
}
99105
}
100106

107+
#[cfg(feature = "enterprise")]
108+
pub fn with_trigger_querier(self, trigger_querier: TriggerQuerierFactoryRef) -> Self {
109+
Self {
110+
trigger_querier_factory: Some(trigger_querier),
111+
..self
112+
}
113+
}
114+
101115
pub async fn try_build(self) -> Result<Instance> {
102116
let kv_backend = self.kv_backend;
103117
let node_manager = self.node_manager;
@@ -188,12 +202,10 @@ impl FrontendBuilder {
188202
);
189203

190204
#[cfg(feature = "enterprise")]
191-
let statement_executor =
192-
if let Some(factory) = plugins.get::<operator::statement::TriggerQuerierFactoryRef>() {
193-
statement_executor.with_trigger_querier(factory.create(kv_backend.clone()))
194-
} else {
195-
statement_executor
196-
};
205+
if let Some(factory) = self.trigger_querier_factory.as_ref() {
206+
let trigger_querier = factory.create(kv_backend.clone());
207+
statement_executor = statement_executor.with_trigger_querier(trigger_querier);
208+
}
197209

198210
let statement_executor = Arc::new(statement_executor);
199211

0 commit comments

Comments
 (0)