Skip to content

Commit 2cc6e17

Browse files
committed
add cmdline flag to disable broadcasted reducer arguments
1 parent d4021b6 commit 2cc6e17

File tree

4 files changed

+46
-17
lines changed

4 files changed

+46
-17
lines changed

crates/core/src/host/host_controller.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ pub struct HostController {
9292
pub page_pool: PagePool,
9393
/// The runtimes for running our modules.
9494
runtimes: Arc<HostRuntimes>,
95+
/// Flag for hiding reducer callback arguments from broadcasting to non-caller
96+
disable_reducer_args: bool,
9597
}
9698

9799
struct HostRuntimes {
@@ -166,6 +168,7 @@ impl HostController {
166168
program_storage: ProgramStorage,
167169
energy_monitor: Arc<impl EnergyMonitor>,
168170
durability: Arc<dyn DurabilityProvider>,
171+
disable_reducer_args: bool,
169172
) -> Self {
170173
Self {
171174
hosts: <_>::default(),
@@ -176,6 +179,7 @@ impl HostController {
176179
runtimes: HostRuntimes::new(&data_dir),
177180
data_dir,
178181
page_pool: PagePool::new(default_config.page_pool_max_size),
182+
disable_reducer_args,
179183
}
180184
}
181185

@@ -523,9 +527,11 @@ async fn make_replica_ctx(
523527
database: Database,
524528
replica_id: u64,
525529
relational_db: Arc<RelationalDB>,
530+
disable_reducer_args: bool,
526531
) -> anyhow::Result<ReplicaContext> {
527532
let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
528533
let subscriptions = Arc::new(RwLock::new(SubscriptionManager::default()));
534+
subscriptions.write_arc().set_disable_reducer_args(disable_reducer_args);
529535
let downgraded = Arc::downgrade(&subscriptions);
530536
let subscriptions = ModuleSubscriptions::new(relational_db.clone(), subscriptions, database.owner_identity);
531537

@@ -611,11 +617,12 @@ async fn launch_module(
611617
energy_monitor: Arc<dyn EnergyMonitor>,
612618
replica_dir: ReplicaDir,
613619
runtimes: Arc<HostRuntimes>,
620+
disable_reducer_args: bool,
614621
) -> anyhow::Result<(Program, LaunchedModule)> {
615622
let db_identity = database.database_identity;
616623
let host_type = database.host_type;
617624

618-
let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db)
625+
let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db, disable_reducer_args)
619626
.await
620627
.map(Arc::new)?;
621628
let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone());
@@ -768,6 +775,7 @@ impl Host {
768775
energy_monitor.clone(),
769776
replica_dir,
770777
runtimes.clone(),
778+
host_controller.disable_reducer_args,
771779
)
772780
.await?;
773781

@@ -858,6 +866,7 @@ impl Host {
858866
Arc::new(NullEnergyMonitor),
859867
phony_replica_dir,
860868
runtimes.clone(),
869+
host_controller.disable_reducer_args,
861870
)
862871
.await?;
863872

crates/core/src/subscription/module_subscription_manager.rs

+24-13
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,9 @@ pub struct SubscriptionManager {
317317
// For queries that have simple equality filters,
318318
// we map the filter values to the query in this lookup table.
319319
search_args: SearchArguments,
320+
321+
// Changes Reducer callbacks to send back empty arguments to everyone but the caller
322+
disable_reducer_args: bool,
320323
}
321324

322325
// Tracks some gauges related to subscriptions.
@@ -999,20 +1002,24 @@ impl SubscriptionManager {
9991002
send_to_client(caller, message);
10001003
}
10011004

1002-
// Censor Reducer args
1003-
let mut event = ModuleEvent {
1004-
timestamp: event.timestamp,
1005-
caller_identity: event.caller_identity,
1006-
caller_connection_id: event.caller_connection_id,
1007-
function_call: event.function_call.clone(),
1008-
status: event.status.clone(),
1009-
energy_quanta_used: event.energy_quanta_used,
1010-
host_execution_duration: event.host_execution_duration,
1011-
request_id: event.request_id,
1012-
timer: event.timer,
1005+
// Disable Reducer args
1006+
let event = if self.disable_reducer_args {
1007+
let mut new_event = ModuleEvent {
1008+
timestamp: event.timestamp,
1009+
caller_identity: event.caller_identity,
1010+
caller_connection_id: event.caller_connection_id,
1011+
function_call: event.function_call.clone(),
1012+
status: event.status.clone(),
1013+
energy_quanta_used: event.energy_quanta_used,
1014+
host_execution_duration: event.host_execution_duration,
1015+
request_id: event.request_id,
1016+
timer: event.timer,
1017+
};
1018+
new_event.function_call.args = ArgsTuple::nullary();
1019+
Arc::new(new_event)
1020+
} else {
1021+
event.clone()
10131022
};
1014-
event.function_call.args = ArgsTuple::nullary();
1015-
let event = Arc::new(event);
10161023

10171024
// Send all the other updates.
10181025
for (id, update) in eval {
@@ -1046,6 +1053,10 @@ impl SubscriptionManager {
10461053
metrics
10471054
})
10481055
}
1056+
1057+
pub fn set_disable_reducer_args(&mut self, disable: bool) {
1058+
self.disable_reducer_args = disable;
1059+
}
10491060
}
10501061

10511062
fn send_to_client(client: &ClientConnectionSender, message: impl Into<SerializableMessage>) {

crates/standalone/src/lib.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ impl StandaloneEnv {
4646
config: Config,
4747
certs: &CertificateAuthority,
4848
data_dir: Arc<ServerDataDir>,
49+
disable_reducer_args: bool,
4950
) -> anyhow::Result<Arc<Self>> {
5051
let _pid_file = data_dir.pid_file()?;
5152
let meta_path = data_dir.metadata_toml();
@@ -68,6 +69,7 @@ impl StandaloneEnv {
6869
program_store.clone(),
6970
energy_monitor,
7071
durability_provider,
72+
disable_reducer_args,
7173
);
7274
let client_actor_index = ClientActorIndex::new();
7375
let jwt_keys = certs.get_or_create_keys()?;
@@ -513,9 +515,9 @@ mod tests {
513515
page_pool_max_size: None,
514516
};
515517

516-
let _env = StandaloneEnv::init(config, &ca, data_dir.clone()).await?;
518+
let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), false).await?;
517519
// Ensure that we have a lock.
518-
assert!(StandaloneEnv::init(config, &ca, data_dir.clone()).await.is_err());
520+
assert!(StandaloneEnv::init(config, &ca, data_dir.clone(), false).await.is_err());
519521

520522
Ok(())
521523
}

crates/standalone/src/subcommands/start.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ pub fn cli() -> clap::Command {
7272
"The maximum size of the page pool in bytes. Should be a multiple of 64KiB. The default is 8GiB.",
7373
),
7474
)
75+
.arg(
76+
Arg::new("no_reducer_args")
77+
.long("no-reducer-args")
78+
.help("This flag disables reducer callbacks from broadcasting their arguments to anyone but the caller")
79+
.action(SetTrue),
80+
)
7581
// .after_help("Run `spacetime help start` for more detailed information.")
7682
}
7783

@@ -103,6 +109,7 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> {
103109
storage,
104110
page_pool_max_size,
105111
};
112+
let no_reducer_args = args.get_flag("no_reducer_args");
106113

107114
banner();
108115
let exe_name = std::env::current_exe()?;
@@ -147,7 +154,7 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> {
147154
.context("cannot omit --jwt-{pub,priv}-key-path when those options are not specified in config.toml")?;
148155

149156
let data_dir = Arc::new(data_dir.clone());
150-
let ctx = StandaloneEnv::init(db_config, &certs, data_dir).await?;
157+
let ctx = StandaloneEnv::init(db_config, &certs, data_dir, no_reducer_args).await?;
151158
worker_metrics::spawn_jemalloc_stats(listen_addr.clone());
152159
worker_metrics::spawn_tokio_stats(listen_addr.clone());
153160
worker_metrics::spawn_page_pool_stats(listen_addr.clone(), ctx.page_pool().clone());

0 commit comments

Comments
 (0)