Skip to content

Disable reducer arg broadcast #2694

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub struct HostController {
pub page_pool: PagePool,
/// The runtimes for running our modules.
runtimes: Arc<HostRuntimes>,
/// Flag for hiding reducer callback arguments from broadcasting to non-caller
disable_reducer_args: bool,
}

struct HostRuntimes {
Expand Down Expand Up @@ -166,6 +168,7 @@ impl HostController {
program_storage: ProgramStorage,
energy_monitor: Arc<impl EnergyMonitor>,
durability: Arc<dyn DurabilityProvider>,
disable_reducer_args: bool,
) -> Self {
Self {
hosts: <_>::default(),
Expand All @@ -176,6 +179,7 @@ impl HostController {
runtimes: HostRuntimes::new(&data_dir),
data_dir,
page_pool: PagePool::new(default_config.page_pool_max_size),
disable_reducer_args,
}
}

Expand Down Expand Up @@ -523,9 +527,11 @@ async fn make_replica_ctx(
database: Database,
replica_id: u64,
relational_db: Arc<RelationalDB>,
disable_reducer_args: bool,
) -> anyhow::Result<ReplicaContext> {
let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
let subscriptions = Arc::new(RwLock::new(SubscriptionManager::default()));
subscriptions.write_arc().set_disable_reducer_args(disable_reducer_args);
let downgraded = Arc::downgrade(&subscriptions);
let subscriptions = ModuleSubscriptions::new(relational_db.clone(), subscriptions, database.owner_identity);

Expand Down Expand Up @@ -611,11 +617,12 @@ async fn launch_module(
energy_monitor: Arc<dyn EnergyMonitor>,
replica_dir: ReplicaDir,
runtimes: Arc<HostRuntimes>,
disable_reducer_args: bool,
) -> anyhow::Result<(Program, LaunchedModule)> {
let db_identity = database.database_identity;
let host_type = database.host_type;

let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db)
let replica_ctx = make_replica_ctx(replica_dir, database, replica_id, relational_db, disable_reducer_args)
.await
.map(Arc::new)?;
let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone());
Expand Down Expand Up @@ -768,6 +775,7 @@ impl Host {
energy_monitor.clone(),
replica_dir,
runtimes.clone(),
host_controller.disable_reducer_args,
)
.await?;

Expand Down Expand Up @@ -858,6 +866,7 @@ impl Host {
Arc::new(NullEnergyMonitor),
phony_replica_dir,
runtimes.clone(),
host_controller.disable_reducer_args,
)
.await?;

Expand Down
27 changes: 27 additions & 0 deletions crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::client::{ClientConnectionSender, Protocol};
use crate::error::DBError;
use crate::execution_context::WorkloadType;
use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue};
use crate::host::ArgsTuple;
use crate::messages::websocket::{self as ws, TableUpdate};
use crate::subscription::delta::eval_delta;
use crate::subscription::record_exec_metrics;
Expand Down Expand Up @@ -316,6 +317,9 @@ pub struct SubscriptionManager {
// For queries that have simple equality filters,
// we map the filter values to the query in this lookup table.
search_args: SearchArguments,

// Changes Reducer callbacks to send back empty arguments to everyone but the caller
disable_reducer_args: bool,
}

// Tracks some gauges related to subscriptions.
Expand Down Expand Up @@ -998,6 +1002,25 @@ impl SubscriptionManager {
send_to_client(caller, message);
}

// Disable Reducer args
let event = if self.disable_reducer_args {
let mut new_event = ModuleEvent {
timestamp: event.timestamp,
caller_identity: event.caller_identity,
caller_connection_id: event.caller_connection_id,
function_call: event.function_call.clone(),
status: event.status.clone(),
energy_quanta_used: event.energy_quanta_used,
host_execution_duration: event.host_execution_duration,
request_id: event.request_id,
timer: event.timer,
};
new_event.function_call.args = ArgsTuple::nullary();
Arc::new(new_event)
} else {
event.clone()
};

// Send all the other updates.
for (id, update) in eval {
let database_update = SubscriptionUpdateMessage::from_event_and_update(&event, update);
Expand Down Expand Up @@ -1030,6 +1053,10 @@ impl SubscriptionManager {
metrics
})
}

pub fn set_disable_reducer_args(&mut self, disable: bool) {
self.disable_reducer_args = disable;
}
}

fn send_to_client(client: &ClientConnectionSender, message: impl Into<SerializableMessage>) {
Expand Down
6 changes: 4 additions & 2 deletions crates/standalone/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl StandaloneEnv {
config: Config,
certs: &CertificateAuthority,
data_dir: Arc<ServerDataDir>,
disable_reducer_args: bool,
) -> anyhow::Result<Arc<Self>> {
let _pid_file = data_dir.pid_file()?;
let meta_path = data_dir.metadata_toml();
Expand All @@ -68,6 +69,7 @@ impl StandaloneEnv {
program_store.clone(),
energy_monitor,
durability_provider,
disable_reducer_args,
);
let client_actor_index = ClientActorIndex::new();
let jwt_keys = certs.get_or_create_keys()?;
Expand Down Expand Up @@ -513,9 +515,9 @@ mod tests {
page_pool_max_size: None,
};

let _env = StandaloneEnv::init(config, &ca, data_dir.clone()).await?;
let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), false).await?;
// Ensure that we have a lock.
assert!(StandaloneEnv::init(config, &ca, data_dir.clone()).await.is_err());
assert!(StandaloneEnv::init(config, &ca, data_dir.clone(), false).await.is_err());

Ok(())
}
Expand Down
9 changes: 8 additions & 1 deletion crates/standalone/src/subcommands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ pub fn cli() -> clap::Command {
"The maximum size of the page pool in bytes. Should be a multiple of 64KiB. The default is 8GiB.",
),
)
.arg(
Arg::new("no_reducer_args")
.long("no-reducer-args")
.help("This flag disables reducer callbacks from broadcasting their arguments to anyone but the caller")
.action(SetTrue),
)
// .after_help("Run `spacetime help start` for more detailed information.")
}

Expand Down Expand Up @@ -103,6 +109,7 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> {
storage,
page_pool_max_size,
};
let no_reducer_args = args.get_flag("no_reducer_args");

banner();
let exe_name = std::env::current_exe()?;
Expand Down Expand Up @@ -147,7 +154,7 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> {
.context("cannot omit --jwt-{pub,priv}-key-path when those options are not specified in config.toml")?;

let data_dir = Arc::new(data_dir.clone());
let ctx = StandaloneEnv::init(db_config, &certs, data_dir).await?;
let ctx = StandaloneEnv::init(db_config, &certs, data_dir, no_reducer_args).await?;
worker_metrics::spawn_jemalloc_stats(listen_addr.clone());
worker_metrics::spawn_tokio_stats(listen_addr.clone());
worker_metrics::spawn_page_pool_stats(listen_addr.clone(), ctx.page_pool().clone());
Expand Down