Skip to content

Commit 70e544c

Browse files
committed
fix: self cr
1 parent 8af0d99 commit 70e544c

File tree

8 files changed

+46
-66
lines changed

8 files changed

+46
-66
lines changed

src/cmd/src/extension/common.rs

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,56 +12,18 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::HashMap;
1615
use std::result::Result;
1716
use std::sync::Arc;
1817

19-
use catalog::CatalogManagerRef;
20-
use catalog::information_schema::InformationSchemaTableFactoryRef;
2118
use common_error::ext::BoxedError;
22-
use common_meta::FlownodeId;
23-
use common_meta::kv_backend::KvBackendRef;
24-
use flow::FrontendClient;
2519
use servers::grpc::builder::GrpcServerBuilder;
2620

27-
/// Provides additional information schema table factories beyond the built-in
28-
/// ones.
29-
///
30-
/// These are typically provided by enterprise or optional modules, such as:
31-
/// - `information_schema.triggers`
32-
/// - `information_schema.alerts`
33-
#[async_trait::async_trait]
34-
pub trait InformationSchemaTableFactories: Send + Sync {
35-
async fn create_factories(
36-
&self,
37-
ctx: TableFactoryContext,
38-
) -> Result<HashMap<String, InformationSchemaTableFactoryRef>, BoxedError>;
39-
}
40-
41-
pub type InformationSchemaTableFactoriesRef = Arc<dyn InformationSchemaTableFactories>;
42-
43-
/// Context for information schema table factory providers.
44-
pub struct TableFactoryContext {
45-
pub fe_client: Arc<FrontendClient>,
46-
}
47-
4821
/// Allows extending the gRPC server with additional services (e.g., enterprise
4922
/// features).
5023
#[async_trait::async_trait]
5124
pub trait GrpcExtension: Send + Sync {
52-
async fn extend_grpc_services(
53-
&self,
54-
builder: &mut GrpcServerBuilder,
55-
ctx: GrpcExtensionContext,
56-
) -> Result<(), BoxedError>;
57-
}
58-
59-
/// Context provided to gRPC service extensions during server construction.
60-
pub struct GrpcExtensionContext {
61-
pub kv_backend: KvBackendRef,
62-
pub fe_client: Arc<FrontendClient>,
63-
pub flownode_id: FlownodeId,
64-
pub catalog_manager: CatalogManagerRef,
25+
async fn extend_grpc_services(&self, builder: &mut GrpcServerBuilder)
26+
-> Result<(), BoxedError>;
6527
}
6628

6729
pub type GrpcExtensionRef = Arc<dyn GrpcExtension>;

src/cmd/src/extension/flownode.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,17 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
17+
use catalog::CatalogManagerRef;
1518
use common_error::ext::BoxedError;
19+
use common_meta::FlownodeId;
20+
use common_meta::kv_backend::KvBackendRef;
21+
use flow::FrontendClient;
1622

1723
use crate::extension::common::GrpcExtensionRef;
1824

25+
/// The extension point for flownode instance.
1926
#[derive(Default)]
2027
pub struct Extension {
2128
pub grpc: Option<GrpcExtensionRef>,
@@ -29,12 +36,19 @@ pub trait ExtensionFactory: Send + Sync {
2936
) -> impl Future<Output = Result<Extension, BoxedError>> + Send;
3037
}
3138

32-
pub struct ExtensionContext {}
39+
/// Context provided to ExtensionFactory during extension creation.
40+
pub struct ExtensionContext {
41+
pub kv_backend: KvBackendRef,
42+
pub fe_client: Arc<FrontendClient>,
43+
pub flownode_id: FlownodeId,
44+
pub catalog_manager: CatalogManagerRef,
45+
}
3346

47+
/// Default no-op implementation of ExtensionFactory.
3448
pub struct DefaultExtensionFactory;
3549

3650
impl ExtensionFactory for DefaultExtensionFactory {
37-
async fn create(&self, _ctx: ExtensionContext) -> Result<Extension, BoxedError> {
51+
async fn create(&self, _: ExtensionContext) -> Result<Extension, BoxedError> {
3852
Ok(Extension::default())
3953
}
4054
}

src/cmd/src/extension/frontend.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use meta_client::MetaClientRef;
2121
#[cfg(feature = "enterprise")]
2222
use operator::statement::TriggerQuerierRef;
2323

24+
/// The extension point for frontend instance.
2425
#[derive(Default)]
2526
pub struct Extension {
2627
pub info_schema_factories: Option<HashMap<String, InformationSchemaTableFactoryRef>>,
@@ -36,15 +37,17 @@ pub trait ExtensionFactory: Send + Sync {
3637
) -> impl Future<Output = Result<Extension, BoxedError>> + Send;
3738
}
3839

40+
/// Context provided to ExtensionFactory during extension creation.
3941
pub struct ExtensionContext {
4042
pub kv_backend: KvBackendRef,
4143
pub meta_client: MetaClientRef,
4244
}
4345

46+
/// Default no-op implementation of ExtensionFactory.
4447
pub struct DefaultExtensionFactory;
4548

4649
impl ExtensionFactory for DefaultExtensionFactory {
47-
async fn create(&self, _ctx: ExtensionContext) -> Result<Extension, BoxedError> {
50+
async fn create(&self, _: ExtensionContext) -> Result<Extension, BoxedError> {
4851
Ok(Extension::default())
4952
}
5053
}

src/cmd/src/extension/standalone.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,22 @@ pub struct ExtensionContext {
5454
/// These are typically provided by enterprise or optional modules, such as:
5555
/// - `information_schema.triggers`
5656
/// - `information_schema.alerts`
57+
///
58+
/// It is separated from the [`ExtensionFactory`] to avoid circular dependencies,
59+
/// since the [`CatalogManagerRef`] of [`ExtensionContext`] is dependent on the
60+
/// [`InformationSchemaTableFactories`].
5761
#[async_trait::async_trait]
5862
pub trait InformationSchemaTableFactories: Send + Sync {
5963
async fn create_factories(
6064
&self,
61-
ctx: TableFactoryContext,
65+
ctx: InfoTableFactoryContext,
6266
) -> Result<HashMap<String, InformationSchemaTableFactoryRef>, BoxedError>;
6367
}
6468

6569
pub type InformationSchemaTableFactoriesRef = Arc<dyn InformationSchemaTableFactories>;
6670

6771
/// Context for information schema table factory providers.
68-
pub struct TableFactoryContext {
72+
pub struct InfoTableFactoryContext {
6973
pub fe_client: Arc<FrontendClient>,
7074
}
7175

@@ -76,7 +80,7 @@ pub struct DefaultExtensionFactory;
7680
impl InformationSchemaTableFactories for DefaultExtensionFactory {
7781
async fn create_factories(
7882
&self,
79-
_ctx: TableFactoryContext,
83+
_ctx: InfoTableFactoryContext,
8084
) -> Result<HashMap<String, InformationSchemaTableFactoryRef>, BoxedError> {
8185
Ok(HashMap::new())
8286
}

src/cmd/src/flownode.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ use crate::error::{
4747
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
4848
MissingConfigSnafu, OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
4949
};
50-
use crate::extension::common::GrpcExtensionContext;
5150
use crate::extension::flownode::{ExtensionContext, ExtensionFactory};
5251
use crate::options::{GlobalOptions, GreptimeOptions};
5352
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
@@ -403,21 +402,22 @@ impl StartCommand {
403402

404403
let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
405404

406-
let mut builder =
407-
FlownodeServiceBuilder::grpc_server_builder(&opts, flownode.flownode_server());
405+
let context = ExtensionContext {
406+
kv_backend: cached_meta_backend.clone(),
407+
fe_client: frontend_client.clone(),
408+
flownode_id: member_id,
409+
catalog_manager: catalog_manager.clone(),
410+
};
408411
let extension = extension_factory
409-
.create(ExtensionContext {})
412+
.create(context)
410413
.await
411414
.context(OtherSnafu)?;
415+
416+
let mut builder =
417+
FlownodeServiceBuilder::grpc_server_builder(&opts, flownode.flownode_server());
412418
if let Some(grpc_extension) = extension.grpc {
413-
let ctx = GrpcExtensionContext {
414-
kv_backend: cached_meta_backend.clone(),
415-
fe_client: frontend_client.clone(),
416-
flownode_id: member_id,
417-
catalog_manager: catalog_manager.clone(),
418-
};
419419
grpc_extension
420-
.extend_grpc_services(&mut builder, ctx)
420+
.extend_grpc_services(&mut builder)
421421
.await
422422
.context(OtherSnafu)?
423423
}

src/cmd/src/standalone.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ use standalone::options::StandaloneOptions;
6666
use tracing_appender::non_blocking::WorkerGuard;
6767

6868
use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
69-
use crate::extension::standalone::{ExtensionContext, ExtensionFactory, TableFactoryContext};
69+
use crate::extension::standalone::{ExtensionContext, ExtensionFactory, InfoTableFactoryContext};
7070
use crate::options::{GlobalOptions, GreptimeOptions};
7171
use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
7272

@@ -423,7 +423,7 @@ impl StartCommand {
423423
let frontend_client = Arc::new(frontend_client);
424424

425425
let info_schema_table_factories = extension_factory
426-
.create_factories(TableFactoryContext {
426+
.create_factories(InfoTableFactoryContext {
427427
fe_client: frontend_client.clone(),
428428
})
429429
.await
@@ -522,7 +522,7 @@ impl StartCommand {
522522
.context(error::InitDdlManagerSnafu)?;
523523
#[cfg(feature = "enterprise")]
524524
let ddl_manager = if let Some(trigger_ddl_manager) = extension.trigger_ddl_manager {
525-
ddl_manager.with_trigger_ddl_manager_opt(Some(trigger_ddl_manager))
525+
ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
526526
} else {
527527
ddl_manager
528528
};

src/common/meta/src/ddl_manager.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,8 @@ impl DdlManager {
148148
}
149149

150150
#[cfg(feature = "enterprise")]
151-
pub fn with_trigger_ddl_manager_opt(
152-
mut self,
153-
trigger_ddl_manager: Option<TriggerDdlManagerRef>,
154-
) -> Self {
155-
self.trigger_ddl_manager = trigger_ddl_manager;
151+
pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
152+
self.trigger_ddl_manager = Some(trigger_ddl_manager);
156153
self
157154
}
158155

src/meta-srv/src/metasrv/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ impl MetasrvBuilder {
414414
#[cfg(feature = "enterprise")]
415415
let ddl_manager =
416416
if let Some(trigger_ddl_manager) = extension.and_then(|e| e.trigger_ddl_manager) {
417-
ddl_manager.with_trigger_ddl_manager_opt(Some(trigger_ddl_manager))
417+
ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
418418
} else {
419419
ddl_manager
420420
};

0 commit comments

Comments
 (0)