Skip to content

Commit dedbfb4

Browse files
authored
client-api: Make ControlStateReadAccess an async trait (#3357)
Historically, controldb reads could be treated as just a datastructure, but that became a lie when reconnections were introduced. Some tricks were employed to enter the async context when needed, but those always bear the risk of introducing a deadlock somewhere. So, just make it async. # Expected complexity level and risk 2 It may or may not be safe to use sled in an async context. We did already for the write path, but if it's a problem it'll show now. # Testing Not a functional change.
1 parent 06ccec1 commit dedbfb4

File tree

10 files changed

+92
-63
lines changed

10 files changed

+92
-63
lines changed

crates/client-api/src/lib.rs

Lines changed: 39 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -201,28 +201,29 @@ pub trait ControlStateDelegate: ControlStateReadAccess + ControlStateWriteAccess
201201
impl<T: ControlStateReadAccess + ControlStateWriteAccess + Send + Sync> ControlStateDelegate for T {}
202202

203203
/// Query API of the SpacetimeDB control plane.
204+
#[async_trait]
204205
pub trait ControlStateReadAccess {
205206
// Nodes
206-
fn get_node_id(&self) -> Option<u64>;
207-
fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>>;
208-
fn get_nodes(&self) -> anyhow::Result<Vec<Node>>;
207+
async fn get_node_id(&self) -> Option<u64>;
208+
async fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>>;
209+
async fn get_nodes(&self) -> anyhow::Result<Vec<Node>>;
209210

210211
// Databases
211-
fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>>;
212-
fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>>;
213-
fn get_databases(&self) -> anyhow::Result<Vec<Database>>;
212+
async fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>>;
213+
async fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>>;
214+
async fn get_databases(&self) -> anyhow::Result<Vec<Database>>;
214215

215216
// Replicas
216-
fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>>;
217-
fn get_replicas(&self) -> anyhow::Result<Vec<Replica>>;
218-
fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica>;
217+
async fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>>;
218+
async fn get_replicas(&self) -> anyhow::Result<Vec<Replica>>;
219+
async fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica>;
219220

220221
// Energy
221-
fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>>;
222+
async fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>>;
222223

223224
// DNS
224-
fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>>;
225-
fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>>;
225+
async fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>>;
226+
async fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>>;
226227
}
227228

228229
/// Write operations on the SpacetimeDB control plane.
@@ -281,53 +282,54 @@ pub trait ControlStateWriteAccess: Send + Sync {
281282
) -> anyhow::Result<SetDomainsResult>;
282283
}
283284

284-
impl<T: ControlStateReadAccess + ?Sized> ControlStateReadAccess for Arc<T> {
285+
#[async_trait]
286+
impl<T: ControlStateReadAccess + Send + Sync + Sync + ?Sized> ControlStateReadAccess for Arc<T> {
285287
// Nodes
286-
fn get_node_id(&self) -> Option<u64> {
287-
(**self).get_node_id()
288+
async fn get_node_id(&self) -> Option<u64> {
289+
(**self).get_node_id().await
288290
}
289-
fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
290-
(**self).get_node_by_id(node_id)
291+
async fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
292+
(**self).get_node_by_id(node_id).await
291293
}
292-
fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
293-
(**self).get_nodes()
294+
async fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
295+
(**self).get_nodes().await
294296
}
295297

296298
// Databases
297-
fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
298-
(**self).get_database_by_id(id)
299+
async fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
300+
(**self).get_database_by_id(id).await
299301
}
300-
fn get_database_by_identity(&self, identity: &Identity) -> anyhow::Result<Option<Database>> {
301-
(**self).get_database_by_identity(identity)
302+
async fn get_database_by_identity(&self, identity: &Identity) -> anyhow::Result<Option<Database>> {
303+
(**self).get_database_by_identity(identity).await
302304
}
303-
fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
304-
(**self).get_databases()
305+
async fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
306+
(**self).get_databases().await
305307
}
306308

307309
// Replicas
308-
fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
309-
(**self).get_replica_by_id(id)
310+
async fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
311+
(**self).get_replica_by_id(id).await
310312
}
311-
fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
312-
(**self).get_replicas()
313+
async fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
314+
(**self).get_replicas().await
313315
}
314316

315317
// Energy
316-
fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
317-
(**self).get_energy_balance(identity)
318+
async fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
319+
(**self).get_energy_balance(identity).await
318320
}
319321

320322
// DNS
321-
fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
322-
(**self).lookup_identity(domain)
323+
async fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
324+
(**self).lookup_identity(domain).await
323325
}
324326

325-
fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
326-
(**self).reverse_lookup(database_identity)
327+
async fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
328+
(**self).reverse_lookup(database_identity).await
327329
}
328330

329-
fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
330-
(**self).get_leader_replica_by_database(database_id)
331+
async fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
332+
(**self).get_leader_replica_by_database(database_id).await
331333
}
332334
}
333335

crates/client-api/src/routes/database.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ where
447447

448448
let replica = worker_ctx
449449
.get_leader_replica_by_database(database.id)
450+
.await
450451
.ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?;
451452
let replica_id = replica.id;
452453

@@ -506,6 +507,7 @@ pub(crate) async fn worker_ctx_find_database(
506507
) -> axum::response::Result<Option<Database>> {
507508
worker_ctx
508509
.get_database_by_identity(database_identity)
510+
.await
509511
.map_err(log_and_500)
510512
}
511513

@@ -594,6 +596,7 @@ pub async fn get_names<S: ControlStateDelegate>(
594596

595597
let names = ctx
596598
.reverse_lookup(&database_identity)
599+
.await
597600
.map_err(log_and_500)?
598601
.into_iter()
599602
.filter_map(|x| String::from(x).try_into().ok())
@@ -697,7 +700,12 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate + Authorization>(
697700
.as_ref()
698701
.ok_or_else(|| bad_request("Clear database requires database name or identity".into()))?;
699702
if let Ok(identity) = name_or_identity.try_resolve(&ctx).await.map_err(log_and_500)? {
700-
if ctx.get_database_by_identity(&identity).map_err(log_and_500)?.is_some() {
703+
if ctx
704+
.get_database_by_identity(&identity)
705+
.await
706+
.map_err(log_and_500)?
707+
.is_some()
708+
{
701709
return reset(
702710
State(ctx),
703711
Path(ResetDatabaseParams {
@@ -727,7 +735,10 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate + Authorization>(
727735
log::trace!("Publishing to the identity: {}", database_identity.to_hex());
728736

729737
// Check if the database already exists.
730-
let existing = ctx.get_database_by_identity(&database_identity).map_err(log_and_500)?;
738+
let existing = ctx
739+
.get_database_by_identity(&database_identity)
740+
.await
741+
.map_err(log_and_500)?;
731742
match existing.as_ref() {
732743
// If not, check that the we caller is sufficiently authenticated.
733744
None => {
@@ -1055,7 +1066,10 @@ pub async fn set_names<S: ControlStateDelegate + Authorization>(
10551066

10561067
let database_identity = name_or_identity.resolve(&ctx).await?;
10571068

1058-
let database = ctx.get_database_by_identity(&database_identity).map_err(log_and_500)?;
1069+
let database = ctx
1070+
.get_database_by_identity(&database_identity)
1071+
.await
1072+
.map_err(log_and_500)?;
10591073
let Some(database) = database else {
10601074
return Ok((
10611075
StatusCode::NOT_FOUND,
@@ -1077,7 +1091,7 @@ pub async fn set_names<S: ControlStateDelegate + Authorization>(
10771091
})?;
10781092

10791093
for name in &validated_names {
1080-
if ctx.lookup_identity(name.as_str()).unwrap().is_some() {
1094+
if ctx.lookup_identity(name.as_str()).await.unwrap().is_some() {
10811095
return Ok((
10821096
StatusCode::BAD_REQUEST,
10831097
axum::Json(name::SetDomainsResult::OtherError(format!(

crates/client-api/src/routes/energy.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub async fn get_energy_balance<S: ControlStateDelegate>(
2222
Path(IdentityParams { identity }): Path<IdentityParams>,
2323
) -> axum::response::Result<impl IntoResponse> {
2424
let identity = Identity::from(identity);
25-
get_budget_inner(ctx, &identity)
25+
get_budget_inner(ctx, &identity).await
2626
}
2727

2828
#[serde_with::serde_as]
@@ -57,15 +57,20 @@ pub async fn add_energy<S: ControlStateDelegate>(
5757
// TODO: is this guaranteed to pull the updated balance?
5858
let balance = ctx
5959
.get_energy_balance(&auth.claims.identity)
60+
.await
6061
.map_err(log_and_500)?
6162
.map_or(0, |quanta| quanta.get());
6263

6364
Ok(axum::Json(BalanceResponse { balance }))
6465
}
6566

66-
fn get_budget_inner(ctx: impl ControlStateDelegate, identity: &Identity) -> axum::response::Result<impl IntoResponse> {
67+
async fn get_budget_inner(
68+
ctx: impl ControlStateDelegate,
69+
identity: &Identity,
70+
) -> axum::response::Result<impl IntoResponse> {
6771
let balance = ctx
6872
.get_energy_balance(identity)
73+
.await
6974
.map_err(log_and_500)?
7075
.map_or(0, |quanta| quanta.get());
7176

@@ -103,6 +108,7 @@ pub async fn set_energy_balance<S: ControlStateDelegate>(
103108
.unwrap_or(0);
104109
let current_balance = ctx
105110
.get_energy_balance(&identity)
111+
.await
106112
.map_err(log_and_500)?
107113
.map_or(0, |quanta| quanta.get());
108114

crates/client-api/src/routes/health.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub async fn health<S: ControlStateDelegate + NodeDelegate>(
1111
) -> axum::response::Result<impl IntoResponse> {
1212
let nodes: Vec<u64> = ctx
1313
.get_nodes()
14+
.await
1415
.map_err(|_| {
1516
(
1617
StatusCode::INTERNAL_SERVER_ERROR,
@@ -23,8 +24,10 @@ pub async fn health<S: ControlStateDelegate + NodeDelegate>(
2324
let schedulable = !ctx
2425
.get_node_by_id(
2526
ctx.get_node_id()
27+
.await
2628
.ok_or((StatusCode::INTERNAL_SERVER_ERROR, "Can't get node id"))?,
2729
)
30+
.await
2831
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Couldn't get node info"))?
2932
.map(|n| n.unschedulable)
3033
.unwrap_or(false);

crates/client-api/src/routes/identity.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ pub async fn get_databases<S: ControlStateDelegate>(
7979
) -> axum::response::Result<impl IntoResponse> {
8080
let identity = identity.into();
8181
// Linear scan for all databases that have this owner, and return their identities
82-
let all_dbs = ctx.get_databases().map_err(|e| {
82+
let all_dbs = ctx.get_databases().await.map_err(|e| {
8383
log::error!("Failure when retrieving databases for search: {e}");
8484
StatusCode::INTERNAL_SERVER_ERROR
8585
})?;

crates/client-api/src/routes/prometheus.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub async fn get_sd_config<S: ControlStateReadAccess>(
1414
State(ctx): State<S>,
1515
) -> axum::response::Result<impl IntoResponse> {
1616
// TODO(cloutiertyler): security
17-
let nodes = ctx.get_nodes().map_err(log_and_500)?;
17+
let nodes = ctx.get_nodes().await.map_err(log_and_500)?;
1818

1919
let mut targets = Vec::new();
2020
let labels = HashMap::default();

crates/client-api/src/routes/subscribe.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ where
143143

144144
let database = ctx
145145
.get_database_by_identity(&db_identity)
146+
.await
146147
.unwrap()
147148
.ok_or(StatusCode::NOT_FOUND)?;
148149

crates/client-api/src/util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ impl NameOrIdentity {
100100
) -> anyhow::Result<Result<Identity, &DatabaseName>> {
101101
Ok(match self {
102102
Self::Identity(identity) => Ok(Identity::from(*identity)),
103-
Self::Name(name) => ctx.lookup_identity(name.as_ref())?.ok_or(name),
103+
Self::Name(name) => ctx.lookup_identity(name.as_ref()).await?.ok_or(name),
104104
})
105105
}
106106

crates/standalone/src/lib.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,14 @@ impl NodeDelegate for StandaloneEnv {
148148
}
149149
}
150150

151+
#[async_trait]
151152
impl spacetimedb_client_api::ControlStateReadAccess for StandaloneEnv {
152153
// Nodes
153-
fn get_node_id(&self) -> Option<u64> {
154+
async fn get_node_id(&self) -> Option<u64> {
154155
Some(0)
155156
}
156157

157-
fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
158+
async fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
158159
if node_id == 0 {
159160
return Ok(Some(Node {
160161
id: 0,
@@ -166,46 +167,46 @@ impl spacetimedb_client_api::ControlStateReadAccess for StandaloneEnv {
166167
Ok(None)
167168
}
168169

169-
fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
170-
Ok(vec![self.get_node_by_id(0)?.unwrap()])
170+
async fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
171+
Ok(vec![self.get_node_by_id(0).await?.unwrap()])
171172
}
172173

173174
// Databases
174-
fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
175+
async fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
175176
Ok(self.control_db.get_database_by_id(id)?)
176177
}
177178

178-
fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>> {
179+
async fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>> {
179180
Ok(self.control_db.get_database_by_identity(database_identity)?)
180181
}
181182

182-
fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
183+
async fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
183184
Ok(self.control_db.get_databases()?)
184185
}
185186

186187
// Replicas
187-
fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
188+
async fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
188189
Ok(self.control_db.get_replica_by_id(id)?)
189190
}
190191

191-
fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
192+
async fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
192193
Ok(self.control_db.get_replicas()?)
193194
}
194195

195-
fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
196+
async fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
196197
self.control_db.get_leader_replica_by_database(database_id)
197198
}
198199
// Energy
199-
fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
200+
async fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
200201
Ok(self.control_db.get_energy_balance(identity)?)
201202
}
202203

203204
// DNS
204-
fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
205+
async fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
205206
Ok(self.control_db.spacetime_dns(domain)?)
206207
}
207208

208-
fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
209+
async fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
209210
Ok(self.control_db.spacetime_reverse_dns(database_identity)?)
210211
}
211212
}
@@ -446,7 +447,8 @@ impl spacetimedb_client_api::Authorization for StandaloneEnv {
446447
action: spacetimedb_client_api::Action,
447448
) -> Result<(), spacetimedb_client_api::Unauthorized> {
448449
let database = self
449-
.get_database_by_identity(&database)?
450+
.get_database_by_identity(&database)
451+
.await?
450452
.with_context(|| format!("database {database} not found"))
451453
.with_context(|| format!("Unable to authorize {subject} to perform {action:?})"))?;
452454
if subject == database.owner_identity {
@@ -467,7 +469,8 @@ impl spacetimedb_client_api::Authorization for StandaloneEnv {
467469
database: Identity,
468470
) -> Result<AuthCtx, spacetimedb_client_api::Unauthorized> {
469471
let database = self
470-
.get_database_by_identity(&database)?
472+
.get_database_by_identity(&database)
473+
.await?
471474
.with_context(|| format!("database {database} not found"))
472475
.with_context(|| format!("Unable to authorize {subject} for SQL"))?;
473476

crates/testing/src/modules.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,8 @@ impl CompiledModule {
217217
.await
218218
.unwrap();
219219

220-
let database = env.get_database_by_identity(&db_identity).unwrap().unwrap();
221-
let instance = env.get_leader_replica_by_database(database.id).unwrap();
220+
let database = env.get_database_by_identity(&db_identity).await.unwrap().unwrap();
221+
let instance = env.get_leader_replica_by_database(database.id).await.unwrap();
222222

223223
let client_id = ClientActorId {
224224
identity,

0 commit comments

Comments
 (0)