diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index 6d99985f5d3..8c882774461 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -201,28 +201,29 @@ pub trait ControlStateDelegate: ControlStateReadAccess + ControlStateWriteAccess impl ControlStateDelegate for T {} /// Query API of the SpacetimeDB control plane. +#[async_trait] pub trait ControlStateReadAccess { // Nodes - fn get_node_id(&self) -> Option; - fn get_node_by_id(&self, node_id: u64) -> anyhow::Result>; - fn get_nodes(&self) -> anyhow::Result>; + async fn get_node_id(&self) -> Option; + async fn get_node_by_id(&self, node_id: u64) -> anyhow::Result>; + async fn get_nodes(&self) -> anyhow::Result>; // Databases - fn get_database_by_id(&self, id: u64) -> anyhow::Result>; - fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result>; - fn get_databases(&self) -> anyhow::Result>; + async fn get_database_by_id(&self, id: u64) -> anyhow::Result>; + async fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result>; + async fn get_databases(&self) -> anyhow::Result>; // Replicas - fn get_replica_by_id(&self, id: u64) -> anyhow::Result>; - fn get_replicas(&self) -> anyhow::Result>; - fn get_leader_replica_by_database(&self, database_id: u64) -> Option; + async fn get_replica_by_id(&self, id: u64) -> anyhow::Result>; + async fn get_replicas(&self) -> anyhow::Result>; + async fn get_leader_replica_by_database(&self, database_id: u64) -> Option; // Energy - fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result>; + async fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result>; // DNS - fn lookup_identity(&self, domain: &str) -> anyhow::Result>; - fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result>; + async fn lookup_identity(&self, domain: &str) -> anyhow::Result>; + async fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result>; } /// Write operations on the SpacetimeDB control plane. @@ -281,53 +282,54 @@ pub trait ControlStateWriteAccess: Send + Sync { ) -> anyhow::Result; } -impl ControlStateReadAccess for Arc { +#[async_trait] +impl ControlStateReadAccess for Arc { // Nodes - fn get_node_id(&self) -> Option { - (**self).get_node_id() + async fn get_node_id(&self) -> Option { + (**self).get_node_id().await } - fn get_node_by_id(&self, node_id: u64) -> anyhow::Result> { - (**self).get_node_by_id(node_id) + async fn get_node_by_id(&self, node_id: u64) -> anyhow::Result> { + (**self).get_node_by_id(node_id).await } - fn get_nodes(&self) -> anyhow::Result> { - (**self).get_nodes() + async fn get_nodes(&self) -> anyhow::Result> { + (**self).get_nodes().await } // Databases - fn get_database_by_id(&self, id: u64) -> anyhow::Result> { - (**self).get_database_by_id(id) + async fn get_database_by_id(&self, id: u64) -> anyhow::Result> { + (**self).get_database_by_id(id).await } - fn get_database_by_identity(&self, identity: &Identity) -> anyhow::Result> { - (**self).get_database_by_identity(identity) + async fn get_database_by_identity(&self, identity: &Identity) -> anyhow::Result> { + (**self).get_database_by_identity(identity).await } - fn get_databases(&self) -> anyhow::Result> { - (**self).get_databases() + async fn get_databases(&self) -> anyhow::Result> { + (**self).get_databases().await } // Replicas - fn get_replica_by_id(&self, id: u64) -> anyhow::Result> { - (**self).get_replica_by_id(id) + async fn get_replica_by_id(&self, id: u64) -> anyhow::Result> { + (**self).get_replica_by_id(id).await } - fn get_replicas(&self) -> anyhow::Result> { - (**self).get_replicas() + async fn get_replicas(&self) -> anyhow::Result> { + (**self).get_replicas().await } // Energy - fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result> { - (**self).get_energy_balance(identity) + async fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result> { + (**self).get_energy_balance(identity).await } // DNS - fn lookup_identity(&self, domain: &str) -> anyhow::Result> { - (**self).lookup_identity(domain) + async fn lookup_identity(&self, domain: &str) -> anyhow::Result> { + (**self).lookup_identity(domain).await } - fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result> { - (**self).reverse_lookup(database_identity) + async fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result> { + (**self).reverse_lookup(database_identity).await } - fn get_leader_replica_by_database(&self, database_id: u64) -> Option { - (**self).get_leader_replica_by_database(database_id) + async fn get_leader_replica_by_database(&self, database_id: u64) -> Option { + (**self).get_leader_replica_by_database(database_id).await } } diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 2c1d34ac422..66018f225d2 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -447,6 +447,7 @@ where let replica = worker_ctx .get_leader_replica_by_database(database.id) + .await .ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?; let replica_id = replica.id; @@ -506,6 +507,7 @@ pub(crate) async fn worker_ctx_find_database( ) -> axum::response::Result> { worker_ctx .get_database_by_identity(database_identity) + .await .map_err(log_and_500) } @@ -594,6 +596,7 @@ pub async fn get_names( let names = ctx .reverse_lookup(&database_identity) + .await .map_err(log_and_500)? .into_iter() .filter_map(|x| String::from(x).try_into().ok()) @@ -697,7 +700,12 @@ pub async fn publish( .as_ref() .ok_or_else(|| bad_request("Clear database requires database name or identity".into()))?; if let Ok(identity) = name_or_identity.try_resolve(&ctx).await.map_err(log_and_500)? { - if ctx.get_database_by_identity(&identity).map_err(log_and_500)?.is_some() { + if ctx + .get_database_by_identity(&identity) + .await + .map_err(log_and_500)? + .is_some() + { return reset( State(ctx), Path(ResetDatabaseParams { @@ -727,7 +735,10 @@ pub async fn publish( log::trace!("Publishing to the identity: {}", database_identity.to_hex()); // Check if the database already exists. - let existing = ctx.get_database_by_identity(&database_identity).map_err(log_and_500)?; + let existing = ctx + .get_database_by_identity(&database_identity) + .await + .map_err(log_and_500)?; match existing.as_ref() { // If not, check that the we caller is sufficiently authenticated. None => { @@ -1055,7 +1066,10 @@ pub async fn set_names( let database_identity = name_or_identity.resolve(&ctx).await?; - let database = ctx.get_database_by_identity(&database_identity).map_err(log_and_500)?; + let database = ctx + .get_database_by_identity(&database_identity) + .await + .map_err(log_and_500)?; let Some(database) = database else { return Ok(( StatusCode::NOT_FOUND, @@ -1077,7 +1091,7 @@ pub async fn set_names( })?; for name in &validated_names { - if ctx.lookup_identity(name.as_str()).unwrap().is_some() { + if ctx.lookup_identity(name.as_str()).await.unwrap().is_some() { return Ok(( StatusCode::BAD_REQUEST, axum::Json(name::SetDomainsResult::OtherError(format!( diff --git a/crates/client-api/src/routes/energy.rs b/crates/client-api/src/routes/energy.rs index 34098987c68..b2ab94c0c66 100644 --- a/crates/client-api/src/routes/energy.rs +++ b/crates/client-api/src/routes/energy.rs @@ -22,7 +22,7 @@ pub async fn get_energy_balance( Path(IdentityParams { identity }): Path, ) -> axum::response::Result { let identity = Identity::from(identity); - get_budget_inner(ctx, &identity) + get_budget_inner(ctx, &identity).await } #[serde_with::serde_as] @@ -57,15 +57,20 @@ pub async fn add_energy( // TODO: is this guaranteed to pull the updated balance? let balance = ctx .get_energy_balance(&auth.claims.identity) + .await .map_err(log_and_500)? .map_or(0, |quanta| quanta.get()); Ok(axum::Json(BalanceResponse { balance })) } -fn get_budget_inner(ctx: impl ControlStateDelegate, identity: &Identity) -> axum::response::Result { +async fn get_budget_inner( + ctx: impl ControlStateDelegate, + identity: &Identity, +) -> axum::response::Result { let balance = ctx .get_energy_balance(identity) + .await .map_err(log_and_500)? .map_or(0, |quanta| quanta.get()); @@ -103,6 +108,7 @@ pub async fn set_energy_balance( .unwrap_or(0); let current_balance = ctx .get_energy_balance(&identity) + .await .map_err(log_and_500)? .map_or(0, |quanta| quanta.get()); diff --git a/crates/client-api/src/routes/health.rs b/crates/client-api/src/routes/health.rs index bf55926a660..5bb5ad14b3a 100644 --- a/crates/client-api/src/routes/health.rs +++ b/crates/client-api/src/routes/health.rs @@ -11,6 +11,7 @@ pub async fn health( ) -> axum::response::Result { let nodes: Vec = ctx .get_nodes() + .await .map_err(|_| { ( StatusCode::INTERNAL_SERVER_ERROR, @@ -23,8 +24,10 @@ pub async fn health( let schedulable = !ctx .get_node_by_id( ctx.get_node_id() + .await .ok_or((StatusCode::INTERNAL_SERVER_ERROR, "Can't get node id"))?, ) + .await .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Couldn't get node info"))? .map(|n| n.unschedulable) .unwrap_or(false); diff --git a/crates/client-api/src/routes/identity.rs b/crates/client-api/src/routes/identity.rs index c8dc44d2964..fa42b460561 100644 --- a/crates/client-api/src/routes/identity.rs +++ b/crates/client-api/src/routes/identity.rs @@ -79,7 +79,7 @@ pub async fn get_databases( ) -> axum::response::Result { let identity = identity.into(); // Linear scan for all databases that have this owner, and return their identities - let all_dbs = ctx.get_databases().map_err(|e| { + let all_dbs = ctx.get_databases().await.map_err(|e| { log::error!("Failure when retrieving databases for search: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; diff --git a/crates/client-api/src/routes/prometheus.rs b/crates/client-api/src/routes/prometheus.rs index fd8d4a53cfc..f12eb0ad820 100644 --- a/crates/client-api/src/routes/prometheus.rs +++ b/crates/client-api/src/routes/prometheus.rs @@ -14,7 +14,7 @@ pub async fn get_sd_config( State(ctx): State, ) -> axum::response::Result { // TODO(cloutiertyler): security - let nodes = ctx.get_nodes().map_err(log_and_500)?; + let nodes = ctx.get_nodes().await.map_err(log_and_500)?; let mut targets = Vec::new(); let labels = HashMap::default(); diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 42ab7d7ee57..94c152d64db 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -143,6 +143,7 @@ where let database = ctx .get_database_by_identity(&db_identity) + .await .unwrap() .ok_or(StatusCode::NOT_FOUND)?; diff --git a/crates/client-api/src/util.rs b/crates/client-api/src/util.rs index 509b891e483..676f5eaa813 100644 --- a/crates/client-api/src/util.rs +++ b/crates/client-api/src/util.rs @@ -100,7 +100,7 @@ impl NameOrIdentity { ) -> anyhow::Result> { Ok(match self { Self::Identity(identity) => Ok(Identity::from(*identity)), - Self::Name(name) => ctx.lookup_identity(name.as_ref())?.ok_or(name), + Self::Name(name) => ctx.lookup_identity(name.as_ref()).await?.ok_or(name), }) } diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 8c9ca86d5a8..bc18f5c67cf 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -148,13 +148,14 @@ impl NodeDelegate for StandaloneEnv { } } +#[async_trait] impl spacetimedb_client_api::ControlStateReadAccess for StandaloneEnv { // Nodes - fn get_node_id(&self) -> Option { + async fn get_node_id(&self) -> Option { Some(0) } - fn get_node_by_id(&self, node_id: u64) -> anyhow::Result> { + async fn get_node_by_id(&self, node_id: u64) -> anyhow::Result> { if node_id == 0 { return Ok(Some(Node { id: 0, @@ -166,46 +167,46 @@ impl spacetimedb_client_api::ControlStateReadAccess for StandaloneEnv { Ok(None) } - fn get_nodes(&self) -> anyhow::Result> { - Ok(vec![self.get_node_by_id(0)?.unwrap()]) + async fn get_nodes(&self) -> anyhow::Result> { + Ok(vec![self.get_node_by_id(0).await?.unwrap()]) } // Databases - fn get_database_by_id(&self, id: u64) -> anyhow::Result> { + async fn get_database_by_id(&self, id: u64) -> anyhow::Result> { Ok(self.control_db.get_database_by_id(id)?) } - fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result> { + async fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result> { Ok(self.control_db.get_database_by_identity(database_identity)?) } - fn get_databases(&self) -> anyhow::Result> { + async fn get_databases(&self) -> anyhow::Result> { Ok(self.control_db.get_databases()?) } // Replicas - fn get_replica_by_id(&self, id: u64) -> anyhow::Result> { + async fn get_replica_by_id(&self, id: u64) -> anyhow::Result> { Ok(self.control_db.get_replica_by_id(id)?) } - fn get_replicas(&self) -> anyhow::Result> { + async fn get_replicas(&self) -> anyhow::Result> { Ok(self.control_db.get_replicas()?) } - fn get_leader_replica_by_database(&self, database_id: u64) -> Option { + async fn get_leader_replica_by_database(&self, database_id: u64) -> Option { self.control_db.get_leader_replica_by_database(database_id) } // Energy - fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result> { + async fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result> { Ok(self.control_db.get_energy_balance(identity)?) } // DNS - fn lookup_identity(&self, domain: &str) -> anyhow::Result> { + async fn lookup_identity(&self, domain: &str) -> anyhow::Result> { Ok(self.control_db.spacetime_dns(domain)?) } - fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result> { + async fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result> { Ok(self.control_db.spacetime_reverse_dns(database_identity)?) } } @@ -446,7 +447,8 @@ impl spacetimedb_client_api::Authorization for StandaloneEnv { action: spacetimedb_client_api::Action, ) -> Result<(), spacetimedb_client_api::Unauthorized> { let database = self - .get_database_by_identity(&database)? + .get_database_by_identity(&database) + .await? .with_context(|| format!("database {database} not found")) .with_context(|| format!("Unable to authorize {subject} to perform {action:?})"))?; if subject == database.owner_identity { @@ -467,7 +469,8 @@ impl spacetimedb_client_api::Authorization for StandaloneEnv { database: Identity, ) -> Result { let database = self - .get_database_by_identity(&database)? + .get_database_by_identity(&database) + .await? .with_context(|| format!("database {database} not found")) .with_context(|| format!("Unable to authorize {subject} for SQL"))?; diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 4a6112cb167..bd08d5d6555 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -217,8 +217,8 @@ impl CompiledModule { .await .unwrap(); - let database = env.get_database_by_identity(&db_identity).unwrap().unwrap(); - let instance = env.get_leader_replica_by_database(database.id).unwrap(); + let database = env.get_database_by_identity(&db_identity).await.unwrap().unwrap(); + let instance = env.get_leader_replica_by_database(database.id).await.unwrap(); let client_id = ClientActorId { identity,