Skip to content

Commit 857c3e7

Browse files
committed
fix: cr
1 parent bd8b0b9 commit 857c3e7

File tree

14 files changed

+236
-192
lines changed

14 files changed

+236
-192
lines changed

src/cmd/src/bin/greptime.rs

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717
use clap::{Parser, Subcommand};
1818
use cmd::datanode::builder::InstanceBuilder;
1919
use cmd::error::{InitTlsProviderSnafu, Result};
20+
use cmd::extension::flownode::DefaultExtensionFactory as FlowExtensionFactory;
21+
use cmd::extension::frontend::DefaultExtensionFactory as FrontendExtensionFactory;
22+
use cmd::extension::standalone::DefaultExtensionFactory as StandaloneExtensionFactory;
2023
use cmd::options::{EmptyOptions, GlobalOptions};
2124
use cmd::{App, cli, datanode, flownode, frontend, metasrv, standalone};
2225
use common_base::Plugins;
2326
use common_version::{verbose_version, version};
27+
use meta_srv::bootstrap::extension::DefaultExtensionFactory as MetaExtensionFactory;
2428
use servers::install_ring_crypto_provider;
2529

2630
#[derive(Parser)]
@@ -113,40 +117,23 @@ async fn start(cli: Command) -> Result<()> {
113117
datanode::SubCommand::Objbench(ref bench) => bench.run().await,
114118
},
115119
SubCommand::Flownode(cmd) => {
116-
cmd.build(
117-
cmd.load_options::<EmptyOptions>(&cli.global_options)?,
118-
Default::default(),
119-
)
120-
.await?
121-
.run()
122-
.await
120+
let opts = cmd.load_options::<EmptyOptions>(&cli.global_options)?;
121+
cmd.build(opts, FlowExtensionFactory).await?.run().await
123122
}
124123
SubCommand::Frontend(cmd) => {
125-
cmd.build(
126-
cmd.load_options::<EmptyOptions>(&cli.global_options)?,
127-
Default::default(),
128-
)
129-
.await?
130-
.run()
131-
.await
124+
let opts = cmd.load_options::<EmptyOptions>(&cli.global_options)?;
125+
cmd.build(opts, FrontendExtensionFactory).await?.run().await
132126
}
133127
SubCommand::Metasrv(cmd) => {
134-
cmd.build(
135-
cmd.load_options::<EmptyOptions>(&cli.global_options)?,
136-
Default::default(),
137-
)
138-
.await?
139-
.run()
140-
.await
128+
let opts = cmd.load_options::<EmptyOptions>(&cli.global_options)?;
129+
cmd.build(opts, MetaExtensionFactory).await?.run().await
141130
}
142131
SubCommand::Standalone(cmd) => {
143-
cmd.build(
144-
cmd.load_options::<EmptyOptions>(&cli.global_options)?,
145-
Default::default(),
146-
)
147-
.await?
148-
.run()
149-
.await
132+
let opts = cmd.load_options::<EmptyOptions>(&cli.global_options)?;
133+
cmd.build(opts, StandaloneExtensionFactory)
134+
.await?
135+
.run()
136+
.await
150137
}
151138
SubCommand::Cli(cmd) => {
152139
cmd.build(cmd.load_options(&cli.global_options)?)

src/cmd/src/cli.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ mod tests {
108108
use common_telemetry::logging::LoggingOptions;
109109

110110
use crate::error::Result as CmdResult;
111+
use crate::extension::standalone::DefaultExtensionFactory as StandaloneExtensionFactory;
111112
use crate::options::{EmptyOptions, GlobalOptions};
112113
use crate::{App, cli, standalone};
113114

@@ -126,7 +127,7 @@ mod tests {
126127
.load_options::<EmptyOptions>(&GlobalOptions::default())
127128
.unwrap();
128129
let mut instance = standalone
129-
.build(standalone_opts, Default::default())
130+
.build(standalone_opts, StandaloneExtensionFactory)
130131
.await?;
131132
instance.start().await?;
132133

src/cmd/src/extension/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub type InformationSchemaTableFactoriesRef = Arc<dyn InformationSchemaTableFact
4242

4343
/// Context for information schema table factory providers.
4444
pub struct TableFactoryContext {
45-
pub fe_client: Option<Arc<FrontendClient>>,
45+
pub fe_client: Arc<FrontendClient>,
4646
}
4747

4848
/// Allows extending the gRPC server with additional services (e.g., enterprise

src/cmd/src/extension/flownode.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,29 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use common_error::ext::BoxedError;
16+
1517
use crate::extension::common::GrpcExtensionRef;
1618

1719
#[derive(Default)]
1820
pub struct Extension {
1921
pub grpc: Option<GrpcExtensionRef>,
2022
}
23+
24+
/// Factory trait to create Extension instances.
25+
pub trait ExtensionFactory: Send + Sync {
26+
fn create(
27+
&self,
28+
ctx: ExtensionContext,
29+
) -> impl Future<Output = Result<Extension, BoxedError>> + Send;
30+
}
31+
32+
pub struct ExtensionContext {}
33+
34+
pub struct DefaultExtensionFactory;
35+
36+
impl ExtensionFactory for DefaultExtensionFactory {
37+
async fn create(&self, _ctx: ExtensionContext) -> Result<Extension, BoxedError> {
38+
Ok(Extension::default())
39+
}
40+
}

src/cmd/src/extension/frontend.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,39 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
#[cfg(feature = "enterprise")]
16-
use operator::statement::TriggerQuerierFactoryRef;
15+
use std::collections::HashMap;
1716

18-
use crate::extension::common::InformationSchemaTableFactoriesRef;
17+
use catalog::information_schema::InformationSchemaTableFactoryRef;
18+
use common_error::ext::BoxedError;
19+
use common_meta::kv_backend::KvBackendRef;
20+
use meta_client::MetaClientRef;
21+
#[cfg(feature = "enterprise")]
22+
use operator::statement::TriggerQuerierRef;
1923

2024
#[derive(Default)]
2125
pub struct Extension {
22-
pub info_schema_factories: Option<InformationSchemaTableFactoriesRef>,
26+
pub info_schema_factories: Option<HashMap<String, InformationSchemaTableFactoryRef>>,
2327
#[cfg(feature = "enterprise")]
24-
pub trigger_querier_factory: Option<TriggerQuerierFactoryRef>,
28+
pub trigger_querier: Option<TriggerQuerierRef>,
29+
}
30+
31+
/// Factory trait to create Extension instances.
32+
pub trait ExtensionFactory: Send + Sync {
33+
fn create(
34+
&self,
35+
ctx: ExtensionContext,
36+
) -> impl Future<Output = Result<Extension, BoxedError>> + Send;
37+
}
38+
39+
pub struct ExtensionContext {
40+
pub kv_backend: KvBackendRef,
41+
pub meta_client: MetaClientRef,
42+
}
43+
44+
pub struct DefaultExtensionFactory;
45+
46+
impl ExtensionFactory for DefaultExtensionFactory {
47+
async fn create(&self, _ctx: ExtensionContext) -> Result<Extension, BoxedError> {
48+
Ok(Extension::default())
49+
}
2550
}

src/cmd/src/extension/standalone.rs

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,45 +12,57 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
16+
use std::sync::Arc;
17+
18+
use catalog::CatalogManagerRef;
19+
use catalog::information_schema::InformationSchemaTableFactoryRef;
20+
use common_error::ext::BoxedError;
1521
#[cfg(feature = "enterprise")]
16-
pub use ee::*;
22+
use common_meta::ddl_manager::TriggerDdlManagerRef;
23+
use common_meta::kv_backend::KvBackendRef;
24+
use flow::FrontendClient;
1725
#[cfg(feature = "enterprise")]
18-
use operator::statement::TriggerQuerierFactoryRef;
26+
use operator::statement::TriggerQuerierRef;
1927

20-
use crate::extension::common::InformationSchemaTableFactoriesRef;
28+
use crate::extension::common::{InformationSchemaTableFactories, TableFactoryContext};
2129

22-
#[cfg(feature = "enterprise")]
23-
mod ee {
24-
use std::sync::Arc;
25-
26-
use catalog::CatalogManagerRef;
27-
use common_error::ext::BoxedError;
28-
use common_meta::ddl_manager::TriggerDdlManagerRef;
29-
use common_meta::kv_backend::KvBackendRef;
30-
use flow::FrontendClient;
31-
32-
#[async_trait::async_trait]
33-
pub trait TriggerDdlManagerFactory: Send + Sync {
34-
async fn create(
35-
&self,
36-
ctx: TriggerDdlManagerRequest,
37-
) -> Result<TriggerDdlManagerRef, BoxedError>;
38-
}
30+
#[derive(Default)]
31+
pub struct Extension {
32+
#[cfg(feature = "enterprise")]
33+
pub trigger_ddl_manager: Option<TriggerDdlManagerRef>,
34+
#[cfg(feature = "enterprise")]
35+
pub trigger_querier: Option<TriggerQuerierRef>,
36+
}
3937

40-
pub type TriggerDdlManagerFactoryRef = Arc<dyn TriggerDdlManagerFactory>;
38+
/// Factory trait to create Extension instances.
39+
pub trait ExtensionFactory: InformationSchemaTableFactories + Send + Sync {
40+
fn create(
41+
&self,
42+
ctx: ExtensionContext,
43+
) -> impl Future<Output = Result<Extension, BoxedError>> + Send;
44+
}
4145

42-
pub struct TriggerDdlManagerRequest {
43-
pub kv_backend: KvBackendRef,
44-
pub catalog_manager: CatalogManagerRef,
45-
pub fe_client: Arc<FrontendClient>,
46+
pub struct ExtensionContext {
47+
pub kv_backend: KvBackendRef,
48+
pub catalog_manager: CatalogManagerRef,
49+
pub frontend_client: Arc<FrontendClient>,
50+
}
51+
52+
pub struct DefaultExtensionFactory;
53+
54+
#[async_trait::async_trait]
55+
impl InformationSchemaTableFactories for DefaultExtensionFactory {
56+
async fn create_factories(
57+
&self,
58+
_ctx: TableFactoryContext,
59+
) -> Result<HashMap<String, InformationSchemaTableFactoryRef>, BoxedError> {
60+
Ok(HashMap::new())
4661
}
4762
}
4863

49-
#[derive(Default)]
50-
pub struct Extension {
51-
pub info_schema_factories: Option<InformationSchemaTableFactoriesRef>,
52-
#[cfg(feature = "enterprise")]
53-
pub trigger_ddl_manager_factory: Option<TriggerDdlManagerFactoryRef>,
54-
#[cfg(feature = "enterprise")]
55-
pub trigger_querier_factory: Option<TriggerQuerierFactoryRef>,
64+
impl ExtensionFactory for DefaultExtensionFactory {
65+
async fn create(&self, _ctx: ExtensionContext) -> Result<Extension, BoxedError> {
66+
Ok(Extension::default())
67+
}
5668
}

src/cmd/src/flownode.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::error::{
4848
MissingConfigSnafu, OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
4949
};
5050
use crate::extension::common::GrpcExtensionContext;
51-
use crate::extension::flownode::Extension;
51+
use crate::extension::flownode::{ExtensionContext, ExtensionFactory};
5252
use crate::options::{GlobalOptions, GreptimeOptions};
5353
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
5454

@@ -109,12 +109,12 @@ pub struct Command {
109109
}
110110

111111
impl Command {
112-
pub async fn build<E: Debug>(
112+
pub async fn build<E: Debug, F: ExtensionFactory>(
113113
&self,
114114
opts: FlownodeOptions<E>,
115-
extension: Extension,
115+
extension_factory: F,
116116
) -> Result<Instance> {
117-
self.subcmd.build(opts, extension).await
117+
self.subcmd.build(opts, extension_factory).await
118118
}
119119

120120
pub fn load_options<E: Configurable>(
@@ -133,13 +133,13 @@ enum SubCommand {
133133
}
134134

135135
impl SubCommand {
136-
async fn build<E: Debug>(
136+
async fn build<E: Debug, F: ExtensionFactory>(
137137
&self,
138138
opts: FlownodeOptions<E>,
139-
extension: Extension,
139+
extension_factory: F,
140140
) -> Result<Instance> {
141141
match self {
142-
SubCommand::Start(cmd) => cmd.build(opts, extension).await,
142+
SubCommand::Start(cmd) => cmd.build(opts, extension_factory).await,
143143
}
144144
}
145145
}
@@ -262,10 +262,10 @@ impl StartCommand {
262262
Ok(())
263263
}
264264

265-
async fn build<E: Debug>(
265+
async fn build<E: Debug, F: ExtensionFactory>(
266266
&self,
267267
opts: FlownodeOptions<E>,
268-
extension: Extension,
268+
extension_factory: F,
269269
) -> Result<Instance> {
270270
common_runtime::init_global_runtimes(&opts.runtime);
271271

@@ -405,14 +405,18 @@ impl StartCommand {
405405

406406
let mut builder =
407407
FlownodeServiceBuilder::grpc_server_builder(&opts, flownode.flownode_server());
408-
if let Some(extension) = extension.grpc.as_ref() {
408+
let extension = extension_factory
409+
.create(ExtensionContext {})
410+
.await
411+
.context(OtherSnafu)?;
412+
if let Some(grpc_extension) = extension.grpc {
409413
let ctx = GrpcExtensionContext {
410414
kv_backend: cached_meta_backend.clone(),
411415
fe_client: frontend_client.clone(),
412416
flownode_id: member_id,
413417
catalog_manager: catalog_manager.clone(),
414418
};
415-
extension
419+
grpc_extension
416420
.extend_grpc_services(&mut builder, ctx)
417421
.await
418422
.context(OtherSnafu)?

0 commit comments

Comments
 (0)