From 2ba2a12072f42314b7323885130158a36fdd9df6 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Mon, 27 Oct 2025 11:58:55 -0400 Subject: [PATCH 1/9] WASM host execution for procedures This commit builds support for executing procedures in WASM modules. This includes an HTTP endpoint, `/v1/database/:name_or_address/procedure/:name POST`, as well as an extension to the WS protocol. These new APIs are not wired up to the CLI or SDKs, but I have manually tested the HTTP endpoint via `curl`. The new WS extensions are completely untested. Several TODOs are scattered throughout the new code, most notably for sensibly tracking procedure execution time in the metrics. I also expect that we will want to remove the `procedure_sleep_until` syscall and the `ProcedureContext::sleep_until` method prior to release. --- crates/bindings-sys/src/lib.rs | 24 ++- crates/client-api-messages/src/websocket.rs | 82 +++++++ crates/client-api/src/routes/database.rs | 192 +++++++++++++---- crates/core/src/client/client_connection.rs | 26 ++- crates/core/src/client/message_handlers.rs | 23 +- crates/core/src/client/messages.rs | 95 +++++++- crates/core/src/host/host_controller.rs | 9 +- crates/core/src/host/instance_env.rs | 4 +- crates/core/src/host/mod.rs | 8 +- crates/core/src/host/module_host.rs | 146 ++++++++++++- crates/core/src/host/v8/mod.rs | 11 +- crates/core/src/host/wasm_common.rs | 10 +- .../src/host/wasm_common/module_host_actor.rs | 163 +++++++++++++- .../src/host/wasmtime/wasm_instance_env.rs | 93 +++++--- .../core/src/host/wasmtime/wasmtime_module.rs | 204 ++++++++++++++++-- .../subscription/module_subscription_actor.rs | 20 +- crates/core/src/util/jobs.rs | 3 + crates/datastore/src/execution_context.rs | 1 + sdks/rust/src/db_connection.rs | 3 +- .../src/module_bindings/mod.rs | 2 +- .../test-client/src/module_bindings/mod.rs | 2 +- 21 files changed, 1008 insertions(+), 113 deletions(-) diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 764b73387c6..c1032777ff3 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -645,6 +645,24 @@ pub mod raw { pub fn get_jwt(connection_id_ptr: *const u8, bytes_source_id: *mut BytesSource) -> u16; } + #[link(wasm_import_module = "spacetime_10.3")] + extern "C" { + /// Suspends execution of this WASM instance until approximately `wake_at_micros_since_unix_epoch`. + /// + /// Returns immediately if `wake_at_micros_since_unix_epoch` is in the past. + /// + /// Upon resuming, returns the current timestamp as microseconds since the Unix epoch. + /// + /// Not particularly useful, except for testing SpacetimeDB internals related to suspending procedure execution. + /// # Traps + /// + /// Traps if: + /// + /// - The calling WASM instance is holding open a transaction. + /// - The calling WASM instance is not executing a procedure. + pub fn procedure_sleep_until(wake_at_micros_since_unix_epoch: i64) -> i64; + } + /// What strategy does the database index use? /// /// See also: @@ -1222,7 +1240,9 @@ impl Drop for RowIter { pub mod procedure { //! Side-effecting or asynchronous operations which only procedures are allowed to perform. #[inline] - pub fn sleep_until(_wake_at_timestamp: i64) -> i64 { - todo!("Add `procedure_sleep_until` host function") + pub fn sleep_until(wake_at_timestamp: i64) -> i64 { + // Safety: Just calling an `extern "C"` function. + // Nothing weird happening here. + unsafe { super::raw::procedure_sleep_until(wake_at_timestamp) } } } diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index cf24e6e5e87..c4234ef99bd 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -105,6 +105,8 @@ pub enum ClientMessage { /// Remove a subscription to a SQL query that was added with SubscribeSingle. Unsubscribe(Unsubscribe), UnsubscribeMulti(UnsubscribeMulti), + /// Request a procedure run. + CallProcedure(CallProcedure), } impl ClientMessage { @@ -127,6 +129,17 @@ impl ClientMessage { ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x), ClientMessage::SubscribeMulti(x) => ClientMessage::SubscribeMulti(x), ClientMessage::UnsubscribeMulti(x) => ClientMessage::UnsubscribeMulti(x), + ClientMessage::CallProcedure(CallProcedure { + procedure, + args, + request_id, + flags, + }) => ClientMessage::CallProcedure(CallProcedure { + procedure, + args: f(args), + request_id, + flags, + }), } } } @@ -292,6 +305,37 @@ pub struct OneOffQuery { pub query_string: Box, } +#[derive(SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct CallProcedure { + /// The name of the procedure to call. + pub procedure: Box, + /// The arguments to the procedure. + /// + /// In the wire format, this will be a [`Bytes`], BSATN or JSON encoded according to the reducer's argument schema + /// and the enclosing message format. + pub args: Args, + /// An identifier for a client request. + /// + /// The server will include the same ID in the response [`ProcedureResult`]. + pub request_id: u32, + /// Reserved space for future extensions. + pub flags: CallProcedureFlags, +} + +#[derive(Clone, Copy, Default, PartialEq, Eq)] +pub enum CallProcedureFlags { + #[default] + Default, +} + +impl_st!([] CallProcedureFlags, AlgebraicType::U8); +impl_serialize!([] CallProcedureFlags, (self, ser) => ser.serialize_u8(*self as u8)); +impl_deserialize!([] CallProcedureFlags, de => match de.deserialize_u8()? { + 0 => Ok(Self::Default), + x => Err(D::Error::custom(format_args!("invalid call procedure flag {x}"))), +}); + /// The tag recognized by the host and SDKs to mean no compression of a [`ServerMessage`]. pub const SERVER_MSG_COMPRESSION_TAG_NONE: u8 = 0; @@ -326,6 +370,8 @@ pub enum ServerMessage { SubscribeMultiApplied(SubscribeMultiApplied), /// Sent in response to an `UnsubscribeMulti` message. This contains the matching rows. UnsubscribeMultiApplied(UnsubscribeMultiApplied), + /// Sent in response to a `ProcedureCall` message. This contains the return value. + ProcedureResult(ProcedureResult), } /// The matching rows of a subscription query. @@ -705,6 +751,42 @@ pub struct OneOffTable { pub rows: F::List, } +#[derive(SpacetimeType, Debug)] +#[sats(crate = spacetimedb_lib)] +pub struct ProcedureResult { + /// The status of the procedure run. + /// + /// Contains the return value if successful, or the error message if not. + pub status: ProcedureStatus, + /// The time when the reducer started. + /// + /// Note that [`Timestamp`] serializes as `i64` nanoseconds since the Unix epoch. + pub timestamp: Timestamp, + /// The time the procedure took to run. + pub total_host_execution_duration: TimeDuration, + /// The same same client-provided identifier as in the original [`ProcedureCall`] request. + /// + /// Clients use this to correlate the response with the original request. + pub request_id: u32, +} + +#[derive(SpacetimeType, Debug)] +#[sats(crate = spacetimedb_lib)] +pub enum ProcedureStatus { + /// The procedure ran and returned the enclosed value. + /// + /// All user error handling happens within here; + /// the returned value may be a `Result` or `Option`, + /// or any other type to which the user may ascribe arbitrary meaning. + Returned(F::Single), + /// The reducer was interrupted due to insufficient energy/funds. + /// + /// The procedure may have performed some observable side effects before being interrupted. + OutOfEnergy, + /// The call failed in the host, e.g. due to a type error or unknown procedure name. + InternalError(String), +} + /// Used whenever different formats need to coexist. #[derive(Debug, Clone)] pub enum FormatSwitch { diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 7be200d5c26..1869860cc02 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -20,10 +20,10 @@ use http::StatusCode; use serde::Deserialize; use spacetimedb::database_logger::DatabaseLogger; use spacetimedb::host::module_host::ClientConnectedError; -use spacetimedb::host::ReducerCallError; use spacetimedb::host::ReducerOutcome; use spacetimedb::host::UpdateDatabaseResult; use spacetimedb::host::{FunctionArgs, MigratePlanResult}; +use spacetimedb::host::{ProcedureCallError, ReducerCallError}; use spacetimedb::identity::Identity; use spacetimedb::messages::control_db::{Database, HostType}; use spacetimedb_client_api_messages::name::{ @@ -31,7 +31,7 @@ use spacetimedb_client_api_messages::name::{ }; use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9; use spacetimedb_lib::identity::AuthCtx; -use spacetimedb_lib::{sats, ProductValue, Timestamp}; +use spacetimedb_lib::{sats, AlgebraicValue, ProductValue, Timestamp}; use spacetimedb_schema::auto_migrate::{ MigrationPolicy as SchemaMigrationPolicy, MigrationToken, PrettyPrintStyle as AutoMigratePrettyPrintStyle, }; @@ -70,7 +70,7 @@ pub async fn call( log::error!("Could not find database: {}", db_identity.to_hex()); NO_SUCH_DATABASE })?; - let identity = database.owner_identity; + let owner_identity = database.owner_identity; let leader = worker_ctx .leader(database.id) @@ -83,35 +83,11 @@ pub async fn call( // so generate one. let connection_id = generate_random_connection_id(); - match module.call_identity_connected(auth.into(), connection_id).await { - // If `call_identity_connected` returns `Err(Rejected)`, then the `client_connected` reducer errored, - // meaning the connection was refused. Return 403 forbidden. - Err(ClientConnectedError::Rejected(msg)) => return Err((StatusCode::FORBIDDEN, msg).into()), - // If `call_identity_connected` returns `Err(OutOfEnergy)`, - // then, well, the database is out of energy. - // Return 503 service unavailable. - Err(err @ ClientConnectedError::OutOfEnergy) => { - return Err((StatusCode::SERVICE_UNAVAILABLE, err.to_string()).into()) - } - // If `call_identity_connected` returns `Err(ReducerCall)`, - // something went wrong while invoking the `client_connected` reducer. - // I (pgoldman 2025-03-27) am not really sure how this would happen, - // but we returned 404 not found in this case prior to my editing this code, - // so I guess let's keep doing that. - Err(ClientConnectedError::ReducerCall(e)) => { - return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into()) - } - // If `call_identity_connected` returns `Err(DBError)`, - // then the module didn't define `client_connected`, - // but something went wrong when we tried to insert into `st_client`. - // That's weird and scary, so return 500 internal error. - Err(e @ ClientConnectedError::DBError(_)) => { - return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into()) - } + module + .call_identity_connected(auth.into(), connection_id) + .await + .map_err(client_connected_error_to_response)?; - // If `call_identity_connected` returns `Ok`, then we can actually call the reducer we want. - Ok(()) => (), - } let result = match module .call_reducer(caller_identity, Some(connection_id), None, None, None, &reducer, args) .await @@ -139,17 +115,14 @@ pub async fn call( } }; - if let Err(e) = module.call_identity_disconnected(caller_identity, connection_id).await { - // If `call_identity_disconnected` errors, something is very wrong: - // it means we tried to delete the `st_client` row but failed. - // Note that `call_identity_disconnected` swallows errors from the `client_disconnected` reducer. - // Slap a 500 on it and pray. - return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{:#}", anyhow::anyhow!(e))).into()); - } + module + .call_identity_disconnected(caller_identity, connection_id) + .await + .map_err(client_disconnected_error_to_response)?; match result { Ok(result) => { - let (status, body) = reducer_outcome_response(&identity, &reducer, result.outcome); + let (status, body) = reducer_outcome_response(&owner_identity, &reducer, result.outcome); Ok(( status, TypedHeader(SpacetimeEnergyUsed(result.energy_used)), @@ -161,7 +134,7 @@ pub async fn call( } } -fn reducer_outcome_response(identity: &Identity, reducer: &str, outcome: ReducerOutcome) -> (StatusCode, String) { +fn reducer_outcome_response(owner_identity: &Identity, reducer: &str, outcome: ReducerOutcome) -> (StatusCode, String) { match outcome { ReducerOutcome::Committed => (StatusCode::OK, "".to_owned()), ReducerOutcome::Failed(errmsg) => { @@ -169,7 +142,7 @@ fn reducer_outcome_response(identity: &Identity, reducer: &str, outcome: Reducer (StatusCode::from_u16(530).unwrap(), errmsg) } ReducerOutcome::BudgetExceeded => { - log::warn!("Node's energy budget exceeded for identity: {identity} while executing {reducer}"); + log::warn!("Node's energy budget exceeded for identity: {owner_identity} while executing {reducer}"); ( StatusCode::PAYMENT_REQUIRED, "Module energy budget exhausted.".to_owned(), @@ -178,6 +151,38 @@ fn reducer_outcome_response(identity: &Identity, reducer: &str, outcome: Reducer } } +fn client_connected_error_to_response(err: ClientConnectedError) -> ErrorResponse { + match err { + // If `call_identity_connected` returns `Err(Rejected)`, then the `client_connected` reducer errored, + // meaning the connection was refused. Return 403 forbidden. + ClientConnectedError::Rejected(msg) => (StatusCode::FORBIDDEN, msg).into(), + // If `call_identity_connected` returns `Err(OutOfEnergy)`, + // then, well, the database is out of energy. + // Return 503 service unavailable. + ClientConnectedError::OutOfEnergy => (StatusCode::SERVICE_UNAVAILABLE, err.to_string()).into(), + // If `call_identity_connected` returns `Err(ReducerCall)`, + // something went wrong while invoking the `client_connected` reducer. + // I (pgoldman 2025-03-27) am not really sure how this would happen, + // but we returned 404 not found in this case prior to my editing this code, + // so I guess let's keep doing that. + ClientConnectedError::ReducerCall(e) => (StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into(), + // If `call_identity_connected` returns `Err(DBError)`, + // then the module didn't define `client_connected`, + // but something went wrong when we tried to insert into `st_client`. + // That's weird and scary, so return 500 internal error. + ClientConnectedError::DBError(_) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into(), + } +} + +/// If `call_identity_disconnected` errors, something is very wrong: +/// it means we tried to delete the `st_client` row but failed. +/// +/// Note that `call_identity_disconnected` swallows errors from the `client_disconnected` reducer. +/// Slap a 500 on it and pray. +fn client_disconnected_error_to_response(err: ReducerCallError) -> ErrorResponse { + (StatusCode::INTERNAL_SERVER_ERROR, format!("{:#}", anyhow::anyhow!(err))).into() +} + #[derive(Debug, derive_more::From)] pub enum DBCallErr { HandlerError(ErrorResponse), @@ -185,6 +190,107 @@ pub enum DBCallErr { InstanceNotScheduled, } +#[derive(Deserialize)] +pub struct ProcedureParams { + name_or_identity: NameOrIdentity, + procedure: String, +} + +async fn procedure( + State(worker_ctx): State, + Extension(auth): Extension, + Path(ProcedureParams { + name_or_identity, + procedure, + }): Path, + TypedHeader(content_type): TypedHeader, + ByteStringBody(body): ByteStringBody, +) -> axum::response::Result { + if content_type != headers::ContentType::json() { + return Err(axum::extract::rejection::MissingJsonContentType::default().into()); + } + let caller_identity = auth.claims.identity; + + let args = FunctionArgs::Json(body); + + let db_identity = name_or_identity.resolve(&worker_ctx).await?; + let database = worker_ctx_find_database(&worker_ctx, &db_identity) + .await? + .ok_or_else(|| { + log::error!("Could not find database: {}", db_identity.to_hex()); + NO_SUCH_DATABASE + })?; + + let leader = worker_ctx + .leader(database.id) + .await + .map_err(log_and_500)? + .ok_or(StatusCode::NOT_FOUND)?; + let module = leader.module().await.map_err(log_and_500)?; + + // HTTP callers always need a connection ID to provide to connect/disconnect, + // so generate one. + let connection_id = generate_random_connection_id(); + + // Call the database's `client_connected` reducer, if any. + // If it fails or rejects the connection, bail. + module + .call_identity_connected(auth.into(), connection_id) + .await + .map_err(client_connected_error_to_response)?; + + let result = match module + .call_procedure(caller_identity, Some(connection_id), None, &procedure, args) + .await + { + Ok(res) => Ok(res), + Err(e) => { + let status_code = match e { + ProcedureCallError::Args(_) => { + log::debug!("Attempt to call reducer with invalid arguments"); + StatusCode::BAD_REQUEST + } + ProcedureCallError::NoSuchModule(_) => StatusCode::NOT_FOUND, + ProcedureCallError::NoSuchProcedure => { + log::debug!("Attempt to call non-existent procedure {procedure}"); + StatusCode::NOT_FOUND + } + ProcedureCallError::OutOfEnergy => StatusCode::PAYMENT_REQUIRED, + ProcedureCallError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, + }; + log::error!("Error while invoking procedure {e:#}"); + Err((status_code, format!("{:#}", anyhow::anyhow!(e)))) + } + }; + + module + .call_identity_disconnected(caller_identity, connection_id) + .await + .map_err(client_disconnected_error_to_response)?; + + match result { + Ok(result) => { + // Procedures don't assign a special meaning to error returns, unlike reducers, + // as there's no transaction for them to automatically abort. + // Instead, we just pass on their return value with the OK status so long as we successfully invoked the procedure. + let (status, body) = procedure_outcome_response(result.return_val); + Ok(( + status, + TypedHeader(SpacetimeExecutionDurationMicros(result.execution_duration)), + body, + )) + } + Err(e) => Err((e.0, e.1).into()), + } +} + +fn procedure_outcome_response(return_val: AlgebraicValue) -> (StatusCode, axum::response::Response) { + ( + StatusCode::OK, + axum::Json(sats::serde::SerdeWrapper(return_val)).into_response(), + ) +} + #[derive(Deserialize)] pub struct SchemaParams { name_or_identity: NameOrIdentity, @@ -940,6 +1046,8 @@ pub struct DatabaseRoutes { pub subscribe_get: MethodRouter, /// POST: /database/:name_or_identity/call/:reducer pub call_reducer_post: MethodRouter, + /// POST: /database/:name_or_identity/procedure/:reducer + pub call_procedure_post: MethodRouter, /// GET: /database/:name_or_identity/schema pub schema_get: MethodRouter, /// GET: /database/:name_or_identity/logs @@ -969,6 +1077,7 @@ where identity_get: get(get_identity::), subscribe_get: get(handle_websocket::), call_reducer_post: post(call::), + call_procedure_post: post(procedure::), schema_get: get(schema::), logs_get: get(logs::), sql_post: post(sql::), @@ -993,6 +1102,7 @@ where .route("/identity", self.identity_get) .route("/subscribe", self.subscribe_get) .route("/call/:reducer", self.call_reducer_post) + .route("/procedure/:procedure", self.call_procedure_post) .route("/schema", self.schema_get) .route("/logs", self.logs_get) .route("/sql", self.sql_post) diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index ccc63497bc2..7f3b3b24003 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -7,13 +7,14 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Instant, SystemTime}; -use super::messages::{OneOffQueryResponseMessage, SerializableMessage}; +use super::messages::{OneOffQueryResponseMessage, ProcedureResultMessage, SerializableMessage}; use super::{message_handlers, ClientActorId, MessageHandleError}; use crate::db::relational_db::RelationalDB; use crate::error::DBError; use crate::host::module_host::ClientConnectedError; use crate::host::{FunctionArgs, ModuleHost, NoSuchModule, ReducerCallError, ReducerCallResult}; use crate::messages::websocket::Subscribe; +use crate::subscription::module_subscription_manager::BroadcastError; use crate::util::asyncify; use crate::util::prometheus_handle::IntGaugeExt; use crate::worker_metrics::WORKER_METRICS; @@ -834,6 +835,29 @@ impl ClientConnection { .await } + pub async fn call_procedure( + &self, + procedure: &str, + args: FunctionArgs, + request_id: RequestId, + timer: Instant, + ) -> Result<(), BroadcastError> { + let res = self + .module() + .call_procedure( + self.id.identity, + Some(self.id.connection_id), + Some(timer), + procedure, + args, + ) + .await; + + self.module() + .subscriptions() + .send_procedure_message(self.sender(), ProcedureResultMessage::from_result(&res, request_id)) + } + pub async fn subscribe_single( &self, subscription: SubscribeSingle, diff --git a/crates/core/src/client/message_handlers.rs b/crates/core/src/client/message_handlers.rs index 90829afa9e5..313b87d0281 100644 --- a/crates/core/src/client/message_handlers.rs +++ b/crates/core/src/client/message_handlers.rs @@ -6,6 +6,7 @@ use crate::host::{FunctionArgs, ReducerId}; use crate::identity::Identity; use crate::messages::websocket::{CallReducer, ClientMessage, OneOffQuery}; use crate::worker_metrics::WORKER_METRICS; +use spacetimedb_client_api_messages::websocket::CallProcedure; use spacetimedb_datastore::execution_context::WorkloadType; use spacetimedb_lib::de::serde::DeserializeWrapper; use spacetimedb_lib::identity::RequestId; @@ -129,9 +130,27 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst .observe(timer.elapsed().as_secs_f64()); res.map_err(|err| (None, None, err)) } + ClientMessage::CallProcedure(CallProcedure { + ref procedure, + args, + request_id, + flags: _, + }) => { + let res = client.call_procedure(procedure, args, request_id, timer).await; + WORKER_METRICS + .request_round_trip + .with_label_values(&WorkloadType::Procedure, &database_identity, procedure) + .observe(timer.elapsed().as_secs_f64()); + if let Err(e) = res { + log::warn!("Procedure call failed: {e:#}"); + } + // `ClientConnection::call_procedure` handles sending the error message to the client if the call fails, + // so we don't need to return an `Err` here. + Ok(()) + } }; - res.map_err(|(reducer, reducer_id, err)| MessageExecutionError { - reducer: reducer.cloned(), + res.map_err(|(reducer_name, reducer_id, err)| MessageExecutionError { + reducer: reducer_name.cloned(), reducer_id, caller_identity: client.id.identity, caller_connection_id: Some(client.id.connection_id), diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index 67f3b90397b..f769689061a 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -1,6 +1,6 @@ use super::{ClientConfig, DataMessage, Protocol}; -use crate::host::module_host::{EventStatus, ModuleEvent}; -use crate::host::ArgsTuple; +use crate::host::module_host::{EventStatus, ModuleEvent, ProcedureCallError}; +use crate::host::{ArgsTuple, ProcedureCallResult}; use crate::messages::websocket as ws; use crate::subscription::websocket_building::{brotli_compress, decide_compression, gzip_compress}; use bytes::{BufMut, Bytes, BytesMut}; @@ -13,7 +13,7 @@ use spacetimedb_client_api_messages::websocket::{ use spacetimedb_datastore::execution_context::WorkloadType; use spacetimedb_lib::identity::RequestId; use spacetimedb_lib::ser::serde::SerializeWrapper; -use spacetimedb_lib::{ConnectionId, TimeDuration}; +use spacetimedb_lib::{AlgebraicValue, ConnectionId, TimeDuration, Timestamp}; use spacetimedb_primitives::TableId; use spacetimedb_sats::bsatn; use std::sync::Arc; @@ -167,6 +167,7 @@ pub enum SerializableMessage { Subscribe(SubscriptionUpdateMessage), Subscription(SubscriptionMessage), TxUpdate(TransactionUpdateMessage), + ProcedureResult(ProcedureResultMessage), } impl SerializableMessage { @@ -177,7 +178,7 @@ impl SerializableMessage { Self::Subscribe(msg) => Some(msg.num_rows()), Self::Subscription(msg) => Some(msg.num_rows()), Self::TxUpdate(msg) => Some(msg.num_rows()), - Self::Identity(_) => None, + Self::Identity(_) | Self::ProcedureResult(_) => None, } } @@ -194,6 +195,7 @@ impl SerializableMessage { }, Self::TxUpdate(_) => Some(WorkloadType::Update), Self::Identity(_) => None, + Self::ProcedureResult(_) => Some(WorkloadType::Procedure), } } } @@ -208,6 +210,7 @@ impl ToProtocol for SerializableMessage { SerializableMessage::Subscribe(msg) => msg.to_protocol(protocol), SerializableMessage::TxUpdate(msg) => msg.to_protocol(protocol), SerializableMessage::Subscription(msg) => msg.to_protocol(protocol), + SerializableMessage::ProcedureResult(msg) => msg.to_protocol(protocol), } } } @@ -584,3 +587,87 @@ fn convert(msg: OneOffQueryResponseMessage) -> ws::Server total_host_execution_duration: msg.total_host_execution_duration, }) } + +#[derive(Debug)] +pub enum ProcedureStatus { + Returned(AlgebraicValue), + OutOfEnergy, + InternalError(String), +} + +#[derive(Debug)] +pub struct ProcedureResultMessage { + status: ProcedureStatus, + timestamp: Timestamp, + total_host_execution_duration: TimeDuration, + request_id: u32, +} + +impl ProcedureResultMessage { + pub fn from_result(res: &Result, request_id: RequestId) -> Self { + let (status, timestamp, execution_duration) = match res { + Ok(ProcedureCallResult { + return_val, + execution_duration, + start_timestamp, + }) => ( + ProcedureStatus::Returned(return_val.clone()), + *start_timestamp, + TimeDuration::from(*execution_duration), + ), + Err(err) => ( + match err { + ProcedureCallError::OutOfEnergy => ProcedureStatus::OutOfEnergy, + _ => ProcedureStatus::InternalError(format!("{err}")), + }, + Timestamp::UNIX_EPOCH, + TimeDuration::ZERO, + ), + }; + + ProcedureResultMessage { + status, + timestamp, + total_host_execution_duration: execution_duration, + request_id, + } + } +} + +impl ToProtocol for ProcedureResultMessage { + type Encoded = SwitchedServerMessage; + + fn to_protocol(self, protocol: Protocol) -> Self::Encoded { + fn convert( + msg: ProcedureResultMessage, + serialize_value: impl Fn(AlgebraicValue) -> F::Single, + ) -> ws::ServerMessage { + let ProcedureResultMessage { + status, + timestamp, + total_host_execution_duration, + request_id, + } = msg; + let status = match status { + ProcedureStatus::InternalError(msg) => ws::ProcedureStatus::InternalError(msg), + ProcedureStatus::OutOfEnergy => ws::ProcedureStatus::OutOfEnergy, + ProcedureStatus::Returned(val) => ws::ProcedureStatus::Returned(serialize_value(val)), + }; + ws::ServerMessage::ProcedureResult(ws::ProcedureResult { + status, + timestamp, + total_host_execution_duration, + request_id, + }) + } + + // Note that procedure returns are sent only to the caller, not broadcast to all subscribers, + // so we don't have to bother with memoizing the serialization the way we do for reducer args. + match protocol { + Protocol::Binary => FormatSwitch::Bsatn(convert(self, |val| bsatn::to_vec(&val).unwrap().into())), + Protocol::Text => FormatSwitch::Json(convert(self, |val| { + serde_json::to_string(&SerializeWrapper(val)).unwrap().into() + })), + } + } +} diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index cc266b30351..f81e5d6b018 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -29,7 +29,7 @@ use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::traits::Program; use spacetimedb_durability::{self as durability}; -use spacetimedb_lib::{hash_bytes, Identity}; +use spacetimedb_lib::{hash_bytes, AlgebraicValue, Identity, Timestamp}; use spacetimedb_paths::server::{ReplicaDir, ServerDataDir}; use spacetimedb_paths::FromPathUnchecked; use spacetimedb_sats::hash::Hash; @@ -170,6 +170,13 @@ impl From<&EventStatus> for ReducerOutcome { } } +#[derive(Clone, Debug)] +pub struct ProcedureCallResult { + pub return_val: AlgebraicValue, + pub execution_duration: Duration, + pub start_timestamp: Timestamp, +} + impl HostController { pub fn new( data_dir: Arc, diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index d7527b516b5..dda5e6fc3a2 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -179,8 +179,8 @@ impl InstanceEnv { &self.replica_ctx.database.database_identity } - /// Signal to this `InstanceEnv` that a reducer call is beginning. - pub fn start_reducer(&mut self, ts: Timestamp) { + /// Signal to this `InstanceEnv` that a reducer or procedure call is beginning. + pub fn start_funcall(&mut self, ts: Timestamp) { self.start_time = ts; } diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index d49ae054b27..8cbcab7b926 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -24,10 +24,10 @@ mod wasm_common; pub use disk_storage::DiskStorage; pub use host_controller::{ - extract_schema, ExternalDurability, ExternalStorage, HostController, MigratePlanResult, ProgramStorage, - ReducerCallResult, ReducerOutcome, + extract_schema, ExternalDurability, ExternalStorage, HostController, MigratePlanResult, ProcedureCallResult, + ProgramStorage, ReducerCallResult, ReducerOutcome, }; -pub use module_host::{ModuleHost, NoSuchModule, ReducerCallError, UpdateDatabaseResult}; +pub use module_host::{ModuleHost, NoSuchModule, ProcedureCallError, ReducerCallError, UpdateDatabaseResult}; pub use scheduler::Scheduler; /// Encoded arguments to a database function. @@ -167,4 +167,6 @@ pub enum AbiCall { GetJwt, VolatileNonatomicScheduleImmediate, + + ProcedureSleepUntil, } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 899f374cff1..f2f41b7b079 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1,5 +1,6 @@ use super::{ - ArgsTuple, FunctionArgs, InvalidReducerArguments, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, + ArgsTuple, FunctionArgs, InvalidProcedureArguments, InvalidReducerArguments, ProcedureCallResult, + ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, }; use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage}; use crate::client::{ClientActorId, ClientConnectionSender}; @@ -46,16 +47,17 @@ use spacetimedb_lib::identity::{AuthCtx, RequestId}; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Timestamp; -use spacetimedb_primitives::TableId; +use spacetimedb_primitives::{ProcedureId, TableId}; use spacetimedb_query::compile_subscription; use spacetimedb_sats::ProductValue; use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy}; use spacetimedb_schema::def::deserialize::ArgsSeed; -use spacetimedb_schema::def::{ModuleDef, ReducerDef, TableDef, ViewDef}; +use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef}; use spacetimedb_schema::schema::{Schema, TableSchema}; use spacetimedb_vm::relation::RelValue; use std::collections::VecDeque; use std::fmt; +use std::future::Future; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; @@ -400,6 +402,13 @@ impl Instance { Instance::Js(inst) => inst.call_reducer(tx, params), } } + + async fn call_procedure(&mut self, params: CallProcedureParams) -> Result { + match self { + Instance::Wasm(inst) => inst.call_procedure(params).await, + Instance::Js(inst) => inst.call_procedure(params).await, + } + } } /// Creates the table for `table_def` in `stdb`. @@ -519,6 +528,15 @@ pub struct CallReducerParams { pub args: ArgsTuple, } +pub struct CallProcedureParams { + pub timestamp: Timestamp, + pub caller_identity: Identity, + pub caller_connection_id: ConnectionId, + pub timer: Option, + pub procedure_id: ProcedureId, + pub args: ArgsTuple, +} + /// Holds a [`Module`] and a set of [`Instance`]s from it, /// and allocates the [`Instance`]s to be used for function calls. /// @@ -672,6 +690,20 @@ pub enum ReducerCallError { LifecycleReducer(Lifecycle), } +#[derive(thiserror::Error, Debug)] +pub enum ProcedureCallError { + #[error(transparent)] + Args(#[from] InvalidProcedureArguments), + #[error(transparent)] + NoSuchModule(#[from] NoSuchModule), + #[error("no such procedure")] + NoSuchProcedure, + #[error("Procedure terminated due to insufficient budget")] + OutOfEnergy, + #[error("The WASM instance encountered a fatal error: {0}")] + InternalError(String), +} + #[derive(thiserror::Error, Debug)] pub enum InitDatabaseError { #[error(transparent)] @@ -794,6 +826,37 @@ impl ModuleHost { }) } + async fn call_async_with_instance(&self, label: &str, f: Fun) -> Result + where + Fun: (FnOnce(Instance) -> Fut) + Send + 'static, + Fut: Future + Send + 'static, + R: Send + 'static, + { + self.guard_closed()?; + let timer_guard = self.start_call_timer(label); + + scopeguard::defer_on_unwind!({ + log::warn!("procedure {label} panicked"); + (self.on_panic)(); + }); + + // TODO: should we be calling and/or `await`-ing `get_instance` within the below `run_job`? + // Unclear how much overhead this call can have. + let instance = self.instance_manager.lock().await.get_instance().await; + + let (res, instance) = self + .executor + .run_job(async move { + drop(timer_guard); + f(instance).await + }) + .await; + + self.instance_manager.lock().await.return_instance(instance); + + Ok(res) + } + /// Run a function on the JobThread for this module which has access to the module instance. async fn call(&self, label: &str, f: F) -> Result where @@ -1227,6 +1290,79 @@ impl ModuleHost { res } + pub async fn call_procedure( + &self, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_name: &str, + args: FunctionArgs, + ) -> Result { + let res = async { + let (procedure_id, procedure_def) = self + .info + .module_def + .procedure_full(procedure_name) + .ok_or(ProcedureCallError::NoSuchProcedure)?; + self.call_procedure_inner( + caller_identity, + caller_connection_id, + timer, + procedure_id, + procedure_def, + args, + ) + .await + } + .await; + + let log_message = match &res { + Err(ProcedureCallError::NoSuchProcedure) => Some(format!( + "External attempt to call nonexistent procedure \"{procedure_name}\" failed. Have you run `spacetime generate` recently?" + )), + Err(ProcedureCallError::Args(_)) => Some(format!( + "External attempt to call procedure \"{procedure_name}\" failed, invalid arguments.\n\ + This is likely due to a mismatched client schema, have you run `spacetime generate` recently?" + )), + _ => None, + }; + + if let Some(log_message) = log_message { + self.inject_logs(LogLevel::Error, procedure_name, &log_message) + } + + res + } + + async fn call_procedure_inner( + &self, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_id: ProcedureId, + procedure_def: &ProcedureDef, + args: FunctionArgs, + ) -> Result { + let procedure_seed = ArgsSeed(self.info.module_def.typespace().with_type(procedure_def)); + let args = args.into_tuple(procedure_seed).map_err(InvalidProcedureArguments)?; + let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); + + self.call_async_with_instance(&procedure_def.name, async move |mut inst| { + let res = inst + .call_procedure(CallProcedureParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + timer, + procedure_id, + args, + }) + .await; + (res, inst) + }) + .await? + } + // Scheduled reducers require a different function here to call their reducer // because their reducer arguments are stored in the database and need to be fetched // within the same transaction as the reducer call. @@ -1312,11 +1448,11 @@ impl ModuleHost { self.module.scheduler().closed().await; } - pub fn inject_logs(&self, log_level: LogLevel, reducer_name: &str, message: &str) { + pub fn inject_logs(&self, log_level: LogLevel, function_name: &str, message: &str) { self.replica_ctx().logger.write( log_level, &Record { - function: Some(reducer_name), + function: Some(function_name), ..Record::injected(message) }, &(), diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 21af71e92e8..64cc58b3a73 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -7,7 +7,7 @@ use self::ser::serialize_to_js; use self::string::{str_from_ident, IntoJsString}; use self::syscall::{call_call_reducer, call_describe_module, call_reducer_fun, resolve_sys_module, FnRet}; use super::module_common::{build_common_module_from_raw, run_describer, ModuleCommon}; -use super::module_host::{CallReducerParams, Module, ModuleInfo, ModuleRuntime}; +use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime}; use super::UpdateDatabaseResult; use crate::host::instance_env::{ChunkPool, InstanceEnv}; use crate::host::module_host::Instance; @@ -200,7 +200,7 @@ impl JsInstanceEnv { fn start_reducer(&mut self, name: &str, ts: Timestamp) { self.reducer_start = Instant::now(); name.clone_into(&mut self.reducer_name); - self.instance_env.start_reducer(ts); + self.instance_env.start_funcall(ts); } /// Returns the name of the most recent reducer to be run in this environment. @@ -295,6 +295,13 @@ impl JsInstance { response } + + pub async fn call_procedure( + &mut self, + _params: CallProcedureParams, + ) -> Result { + todo!("JS/TS module procedure support") + } } /// A request for the worker in [`spawn_instance_worker`]. diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index b44976e53c3..0fbdec0d17b 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -14,6 +14,8 @@ use spacetimedb_table::table::UniqueConstraintViolation; pub const CALL_REDUCER_DUNDER: &str = "__call_reducer__"; +pub const CALL_PROCEDURE_DUNDER: &str = "__call_procedure__"; + pub const DESCRIBE_MODULE_DUNDER: &str = "__describe_module__"; /// functions with this prefix run prior to __setup__, initializing global variables and the like @@ -384,8 +386,8 @@ pub struct AbiRuntimeError { } macro_rules! abi_funcs { - ($mac:ident) => { - $mac! { + ($link_sync:ident, $link_async:ident) => { + $link_sync! { "spacetime_10.0"::table_id_from_name, "spacetime_10.0"::datastore_table_row_count, "spacetime_10.0"::datastore_table_scan_bsatn, @@ -413,6 +415,10 @@ macro_rules! abi_funcs { "spacetime_10.2"::get_jwt, } + + $link_async! { + "spacetime_10.3"::procedure_sleep_until, + } }; } pub(crate) use abi_funcs; diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 2cdf70335cd..c49b981c659 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,6 +1,10 @@ +use bytes::Bytes; use prometheus::{Histogram, IntCounter, IntGauge}; use spacetimedb_lib::db::raw_def::v9::Lifecycle; +use spacetimedb_lib::de::DeserializeSeed; +use spacetimedb_primitives::ProcedureId; use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; +use std::future::Future; use std::sync::Arc; use std::time::Duration; use tracing::span::EnteredSpan; @@ -12,9 +16,12 @@ use crate::energy::{EnergyMonitor, ReducerBudget, ReducerFingerprint}; use crate::host::instance_env::InstanceEnv; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; use crate::host::module_host::{ - CallReducerParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, + CallProcedureParams, CallReducerParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, +}; +use crate::host::{ + ArgsTuple, ProcedureCallError, ProcedureCallResult, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, + UpdateDatabaseResult, }; -use crate::host::{ArgsTuple, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, UpdateDatabaseResult}; use crate::identity::Identity; use crate::messages::control_db::HostType; use crate::module_host_context::ModuleCreationContextLimited; @@ -48,6 +55,7 @@ pub trait WasmInstancePre: Send + Sync + 'static { fn instantiate(&self, env: InstanceEnv, func_names: &FuncNames) -> Result; } +#[async_trait::async_trait] pub trait WasmInstance: Send + Sync + 'static { fn extract_descriptions(&mut self) -> Result, DescribeError>; @@ -56,6 +64,8 @@ pub trait WasmInstance: Send + Sync + 'static { fn call_reducer(&mut self, op: ReducerOp<'_>, budget: ReducerBudget) -> ExecuteResult; fn log_traceback(func_type: &str, func: &str, trap: &anyhow::Error); + + async fn call_procedure(&mut self, op: ProcedureOp, budget: ReducerBudget) -> ProcedureExecuteResult; } pub struct EnergyStats { @@ -64,6 +74,11 @@ pub struct EnergyStats { } impl EnergyStats { + pub const ZERO: Self = Self { + budget: ReducerBudget::ZERO, + remaining: ReducerBudget::ZERO, + }; + /// Returns the used energy amount. fn used(&self) -> ReducerBudget { (self.budget.get() - self.remaining.get()).into() @@ -75,6 +90,17 @@ pub struct ExecutionTimings { pub wasm_instance_env_call_times: CallTimes, } +impl ExecutionTimings { + /// Not a `const` because there doesn't seem to be any way to `const` construct an `enum_map::EnumMap`, + /// which `CallTimes` uses. + pub fn zero() -> Self { + Self { + total_duration: Duration::ZERO, + wasm_instance_env_call_times: CallTimes::new(), + } + } +} + /// The result that `__call_reducer__` produces during normal non-trap execution. pub type ReducerResult = Result<(), Box>; @@ -85,6 +111,15 @@ pub struct ExecuteResult { pub call_result: anyhow::Result, } +pub struct ProcedureExecuteResult { + #[allow(unused)] + pub energy: EnergyStats, + #[allow(unused)] + pub timings: ExecutionTimings, + pub memory_allocation: usize, + pub call_result: Result, +} + pub struct WasmModuleHostActor { module: T::InstancePre, common: ModuleCommon, @@ -238,6 +273,24 @@ impl WasmModuleInstance { pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(tx, params)) } + + pub async fn call_procedure( + &mut self, + params: CallProcedureParams, + ) -> Result { + let res = self + .common + .call_procedure( + params, + |ty, fun, err| T::log_traceback(ty, fun, err), + |op, budget| self.instance.call_procedure(op, budget), + ) + .await; + if res.is_err() { + self.trapped = true; + } + res + } } impl WasmModuleInstance { @@ -340,6 +393,102 @@ impl InstanceCommon { } } + async fn call_procedure>( + &mut self, + params: CallProcedureParams, + log_traceback: impl FnOnce(&str, &str, &anyhow::Error), + vm_call_procedure: impl FnOnce(ProcedureOp, ReducerBudget) -> F, + ) -> Result { + let CallProcedureParams { + timestamp, + caller_identity, + caller_connection_id, + timer, + procedure_id, + args, + } = params; + + // We've already validated by this point that the procedure exists, + // so it's fine to use the panicking `procedure_by_id`. + let procedure_def = self.info.module_def.procedure_by_id(procedure_id); + let procedure_name: &str = &procedure_def.name; + + // TODO(observability): Add tracing spans, energy, metrics? + // These will require further thinking once we implement procedure suspend/resume, + // and so are not worth doing yet. + + let op = ProcedureOp { + id: procedure_id, + name: procedure_name.into(), + caller_identity, + caller_connection_id, + timestamp, + arg_bytes: args.get_bsatn().clone(), + }; + + let energy_fingerprint = ReducerFingerprint { + module_hash: self.info.module_hash, + module_identity: self.info.owner_identity, + caller_identity, + reducer_name: &procedure_def.name, + }; + + // TODO: replace with call to separate function `procedure_budget`. + let budget = self.energy_monitor.reducer_budget(&energy_fingerprint); + + let result = vm_call_procedure(op, budget).await; + + let ProcedureExecuteResult { + memory_allocation, + call_result, + // TODO: Do something with timing and energy. + .. + } = result; + + if self.allocated_memory != memory_allocation { + self.metric_wasm_memory_bytes.set(memory_allocation as i64); + self.allocated_memory = memory_allocation; + } + + match call_result { + Err(err) => { + log_traceback("procedure", &procedure_def.name, &err); + + WORKER_METRICS + .wasm_instance_errors + .with_label_values( + &caller_identity, + &self.info.module_hash, + &caller_connection_id, + procedure_name, + ) + .inc(); + + // TODO(procedure-energy): + // if energy.remaining.get() == 0 { + // return Err(ProcedureCallError::OutOfEnergy); + // } else + { + Err(ProcedureCallError::InternalError(format!("{err}"))) + } + } + Ok(return_val) => { + // TODO: deserialize return_val at its appropriate type, which you get out of the procedure def, + // then return it in `Ok`. + let return_type = &procedure_def.return_type; + let seed = spacetimedb_sats::WithTypespace::new(self.info.module_def.typespace(), return_type); + let return_val = seed + .deserialize(bsatn::Deserializer::new(&mut &return_val[..])) + .map_err(|err| ProcedureCallError::InternalError(format!("{err}")))?; + Ok(ProcedureCallResult { + return_val, + execution_duration: timer.map(|timer| timer.elapsed()).unwrap_or_default(), + start_timestamp: timestamp, + }) + } + } + } + /// Execute a reducer. /// /// If `Some` [`MutTxId`] is supplied, the reducer is called within the @@ -689,3 +838,13 @@ impl From> for execution_context::ReducerContext { } } } + +#[derive(Clone, Debug)] +pub struct ProcedureOp { + pub id: ProcedureId, + pub name: Box, + pub caller_identity: Identity, + pub caller_connection_id: ConnectionId, + pub timestamp: Timestamp, + pub arg_bytes: Bytes, +} diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 9ccf8759361..d0997b66312 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -11,6 +11,7 @@ use anyhow::Context as _; use spacetimedb_data_structures::map::IntMap; use spacetimedb_lib::{ConnectionId, Timestamp}; use spacetimedb_primitives::{errno, ColId}; +use std::future::Future; use std::num::NonZeroU32; use std::time::Instant; use wasmtime::{AsContext, Caller, StoreContextMut}; @@ -102,8 +103,8 @@ pub(super) struct WasmInstanceEnv { /// Track time spent in module-defined spans. timing_spans: TimingSpanSet, - /// The point in time the last reducer call started at. - reducer_start: Instant, + /// The point in time the last, or current, reducer or procedure call started at. + funcall_start: Instant, /// Track time spent in all wasm instance env calls (aka syscall time). /// @@ -111,8 +112,8 @@ pub(super) struct WasmInstanceEnv { /// to this tracker. call_times: CallTimes, - /// The last, including current, reducer to be executed by this environment. - reducer_name: String, + /// The name of the last, including current, reducer or procedure to be executed by this environment. + funcall_name: String, /// A pool of unused allocated chunks that can be reused. // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`. @@ -129,7 +130,7 @@ type RtResult = anyhow::Result; impl WasmInstanceEnv { /// Create a new `WasmEnstanceEnv` from the given `InstanceEnv`. pub fn new(instance_env: InstanceEnv) -> Self { - let reducer_start = Instant::now(); + let funcall_start = Instant::now(); Self { instance_env, mem: None, @@ -138,9 +139,9 @@ impl WasmInstanceEnv { standard_bytes_sink: None, iters: Default::default(), timing_spans: Default::default(), - reducer_start, + funcall_start, call_times: CallTimes::new(), - reducer_name: String::from(""), + funcall_name: String::from(""), chunk_pool: <_>::default(), } } @@ -219,48 +220,54 @@ impl WasmInstanceEnv { self.standard_bytes_sink.take().unwrap_or_default() } - /// Signal to this `WasmInstanceEnv` that a reducer call is beginning. + /// Signal to this `WasmInstanceEnv` that a reducer or procedure call is beginning. /// - /// Returns the handle used by reducers to read from `args` - /// as well as the handle used to write the error message, if any. - pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes, ts: Timestamp) -> (BytesSourceId, u32) { + /// Returns the handle used by reducers and procedures to read from `args` + /// as well as the handle used to write the reducer error message or procedure return value. + pub fn start_funcall(&mut self, name: &str, args: bytes::Bytes, ts: Timestamp) -> (BytesSourceId, u32) { + // Create the output sink. + // Reducers which fail will write their error message here. + // Procedures will write their result here. let errors = self.setup_standard_bytes_sink(); let args = self.create_bytes_source(args).unwrap(); - self.reducer_start = Instant::now(); - name.clone_into(&mut self.reducer_name); - self.instance_env.start_reducer(ts); + self.funcall_start = Instant::now(); + name.clone_into(&mut self.funcall_name); + self.instance_env.start_funcall(ts); (args, errors) } - /// Returns the name of the most recent reducer to be run in this environment. - pub fn reducer_name(&self) -> &str { - &self.reducer_name + /// Returns the name of the most recent reducer or procedure to be run in this environment. + pub fn funcall_name(&self) -> &str { + &self.funcall_name } - /// Returns the name of the most recent reducer to be run in this environment, - /// or `None` if no reducer is actively being invoked. + /// Returns the name of the most recent reducer or procedure to be run in this environment, + /// or `None` if no reducer or procedure is actively being invoked. fn log_record_function(&self) -> Option<&str> { - let function = self.reducer_name(); + let function = self.funcall_name(); (!function.is_empty()).then_some(function) } - /// Returns the name of the most recent reducer to be run in this environment. - pub fn reducer_start(&self) -> Instant { - self.reducer_start + /// Returns the start time of the most recent reducer or procedure to be run in this environment. + pub fn funcall_start(&self) -> Instant { + self.funcall_start } - /// Signal to this `WasmInstanceEnv` that a reducer call is over. - /// This resets all of the state associated to a single reducer call, - /// and returns instrumentation records. - pub fn finish_reducer(&mut self) -> (ExecutionTimings, Vec) { + /// Signal to this `WasmInstanceEnv` that a reducer or procedure call is over. + /// + /// Returns time measurements which can be recorded as metrics, + /// and the errors written by the WASM code to hte standard error sink. + /// + /// This resets the call times and clears the arguments source and error sink. + pub fn finish_funcall(&mut self) -> (ExecutionTimings, Vec) { // For the moment, // we only explicitly clear the source/sink buffers and the "syscall" times. // TODO: should we be clearing `iters` and/or `timing_spans`? - let total_duration = self.reducer_start.elapsed(); + let total_duration = self.funcall_start.elapsed(); // Taking the call times record also resets timings to 0s for the next call. let wasm_instance_env_call_times = self.call_times.take(); @@ -1368,6 +1375,36 @@ impl WasmInstanceEnv { Ok(()) }) } + + pub fn procedure_sleep_until<'caller>( + mut caller: Caller<'caller, Self>, + (wake_at_micros_since_unix_epoch,): (i64,), + ) -> Box + Send + 'caller> { + Box::new(async move { + use std::time::SystemTime; + let span_start = span::CallSpanStart::new(AbiCall::ProcedureSleepUntil); + + let get_current_time = || Timestamp::now().to_micros_since_unix_epoch(); + + if wake_at_micros_since_unix_epoch < 0 { + return get_current_time(); + } + + let wake_at = Timestamp::from_micros_since_unix_epoch(wake_at_micros_since_unix_epoch); + let Ok(duration) = SystemTime::from(wake_at).duration_since(SystemTime::now()) else { + return get_current_time(); + }; + + tokio::time::sleep(duration).await; + + let res = get_current_time(); + + let span = span_start.end(); + span::record_span(&mut caller.data_mut().call_times, span); + + res + }) + } } impl BacktraceProvider for wasmtime::StoreContext<'_, T> { diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index eaf5b682e29..098ca8cd1b7 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -9,6 +9,7 @@ use crate::host::wasm_common::module_host_actor::{DescribeError, InitializationE use crate::host::wasm_common::*; use crate::util::string_from_utf8_lossy_owned; use futures_util::FutureExt; +use spacetimedb_lib::{ConnectionId, Identity}; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use wasmtime::{ AsContext, AsContextMut, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace, WasmParams, @@ -39,7 +40,7 @@ impl WasmtimeModule { WasmtimeModule { module } } - pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 2); + pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 3); pub(super) fn link_imports(linker: &mut Linker) -> anyhow::Result<()> { const { assert!(WasmtimeModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION) }; @@ -49,7 +50,13 @@ impl WasmtimeModule { linker$(.func_wrap($module, stringify!($func), WasmInstanceEnv::$func)?)*; } } - abi_funcs!(link_functions); + macro_rules! link_async_functions { + ($($module:literal :: $func:ident,)*) => { + #[allow(deprecated)] + linker$(.func_wrap_async($module, stringify!($func), WasmInstanceEnv::$func)?)*; + } + } + abi_funcs!(link_functions, link_async_functions); Ok(()) } } @@ -126,9 +133,11 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { store.epoch_deadline_callback(|store| { let env = store.data(); let database = env.instance_env().replica_ctx.database_identity; - let reducer = env.reducer_name(); - let dur = env.reducer_start().elapsed(); - tracing::warn!(reducer, ?database, "Wasm has been running for {dur:?}"); + let funcall = env.funcall_name(); + let dur = env.funcall_start().elapsed(); + // TODO(procedure-timing): This measurement is not super meaningful for procedures, + // which may (will) suspend execution and therefore may not have been continuously running since `env.funcall_start`. + tracing::warn!(funcall, ?database, "Wasm has been running for {dur:?}"); Ok(wasmtime::UpdateDeadline::Continue(EPOCH_TICKS_PER_SECOND)) }); @@ -162,22 +171,78 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { .get_typed_func(&mut store, CALL_REDUCER_DUNDER) .expect("no call_reducer"); + let call_procedure = get_call_procedure(&mut store, &instance); + Ok(WasmtimeInstance { store, instance, call_reducer, + call_procedure, }) } } -type CallReducerType = TypedFunc<(u32, u64, u64, u64, u64, u64, u64, u64, u32, u32), i32>; +/// Look up the `instance`'s export named by [`CALL_PROCEDURE_DUNDER`]. +/// +/// Return `None` if the `instance` has no such export. +/// Modules from before the introduction of procedures will not have a `__call_procedure__` export, +/// which is fine because they also won't define any procedures. +/// +/// Panicks if the `instance` has an export at the expected name, +/// but it is not a function or is a function of an inappropriate type. +/// For new modules, this will be caught during publish. +/// Old modules from before the introduction of procedures might have an export at that name, +/// but it follows the double-underscore pattern of reserved names, +/// so we're fine to break those modules. +fn get_call_procedure(store: &mut Store, instance: &Instance) -> Option { + // Wasmtime uses `anyhow` for error reporting, vexing library users the world over. + // This means we can't distinguish between the failure modes of `Instance::get_typed_func`. + // Instead, we type out the body of that method ourselves, + // but with error handling appropriate to our needs. + let export = instance.get_export(store.as_context_mut(), CALL_PROCEDURE_DUNDER)?; + + Some( + export + .into_func() + .unwrap_or_else(|| panic!("{CALL_PROCEDURE_DUNDER} export is not a function")) + .typed(store) + .unwrap_or_else(|err| panic!("{CALL_PROCEDURE_DUNDER} export is a function with incorrect type: {err}")), + ) +} + +type CallReducerType = TypedFunc< + ( + // Reducer ID, + u32, + // Sender `Identity` + u64, + u64, + u64, + u64, + // Sender `ConnectionId`, or 0 for none. + u64, + u64, + // Start timestamp. + u64, + // Args byte source. + u32, + // Errors byte sink. + u32, + ), + // Errno. + i32, +>; +// `__call_procedure__` takes the same arguments as `__call_reducer__`. +type CallProcedureType = CallReducerType; pub struct WasmtimeInstance { store: Store, instance: Instance, call_reducer: CallReducerType, + call_procedure: Option, } +#[async_trait::async_trait] impl module_host_actor::WasmInstance for WasmtimeInstance { fn extract_descriptions(&mut self) -> Result, DescribeError> { let describer_func_name = DESCRIBE_MODULE_DUNDER; @@ -206,18 +271,17 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { #[tracing::instrument(level = "trace", skip_all)] fn call_reducer(&mut self, op: ReducerOp<'_>, budget: ReducerBudget) -> module_host_actor::ExecuteResult { let store = &mut self.store; - // Set the fuel budget in WASM. - set_store_fuel(store, budget.into()); - store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND); + + prepare_store_for_call(store, budget); // Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays. - let [sender_0, sender_1, sender_2, sender_3] = bytemuck::must_cast(op.caller_identity.to_byte_array()); - let [conn_id_0, conn_id_1] = bytemuck::must_cast(op.caller_connection_id.as_le_byte_array()); + let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(*op.caller_identity); + let [conn_id_0, conn_id_1] = prepare_connection_id_for_call(*op.caller_connection_id); // Prepare arguments to the reducer + the error sink & start timings. let args_bytes = op.args.get_bsatn().clone(); - let (args_source, errors_sink) = store.data_mut().start_reducer(op.name, args_bytes, op.timestamp); + let (args_source, errors_sink) = store.data_mut().start_funcall(op.name, args_bytes, op.timestamp); let call_result = call_sync_typed_func( &self.call_reducer, @@ -239,7 +303,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { // Signal that this reducer call is finished. This gets us the timings // associated to our reducer call, and clears all of the instance state // associated to the call. - let (timings, error) = store.data_mut().finish_reducer(); + let (timings, error) = store.data_mut().finish_funcall(); let call_result = call_result.map(|code| handle_error_sink_code(code, error)); @@ -260,6 +324,77 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { fn log_traceback(func_type: &str, func: &str, trap: &anyhow::Error) { log_traceback(func_type, func, trap) } + + #[tracing::instrument(level = "trace", skip_all)] + async fn call_procedure( + &mut self, + op: module_host_actor::ProcedureOp, + budget: ReducerBudget, + ) -> module_host_actor::ProcedureExecuteResult { + let store = &mut self.store; + prepare_store_for_call(store, budget); + + // Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays. + let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(op.caller_identity); + let [conn_id_0, conn_id_1] = prepare_connection_id_for_call(op.caller_connection_id); + + // Prepare arguments to the reducer + the error sink & start timings. + let (args_source, result_sink) = store.data_mut().start_funcall(&op.name, op.arg_bytes, op.timestamp); + + let Some(call_procedure) = self.call_procedure.as_ref() else { + return module_host_actor::ProcedureExecuteResult { + energy: module_host_actor::EnergyStats::ZERO, + timings: module_host_actor::ExecutionTimings::zero(), + memory_allocation: get_memory_size(store), + call_result: Err(anyhow::anyhow!( + "Module defines procedure {} but does not export `{}`", + op.name, + CALL_PROCEDURE_DUNDER, + )), + }; + }; + let call_result = call_procedure + .call_async( + &mut *store, + ( + op.id.0, + sender_0, + sender_1, + sender_2, + sender_3, + conn_id_0, + conn_id_1, + op.timestamp.to_micros_since_unix_epoch() as u64, + args_source.0, + result_sink, + ), + ) + .await; + + // Close the timing span for this procedure and get the BSATN bytes of its result. + let (timings, result_bytes) = store.data_mut().finish_funcall(); + + let call_result = call_result.and_then(|code| { + (code == 0).then_some(result_bytes.into()).ok_or_else(|| { + anyhow::anyhow!( + "{CALL_PROCEDURE_DUNDER} returned unexpected code {code}. Procedures should return code 0 or trap." + ) + }) + }); + + let remaining_fuel = get_store_fuel(store); + let remaining = ReducerBudget::from(remaining_fuel); + + let energy = module_host_actor::EnergyStats { budget, remaining }; + let memory_allocation = get_memory_size(store); + + module_host_actor::ProcedureExecuteResult { + energy, + timings, + memory_allocation, + call_result, + } + } } fn set_store_fuel(store: &mut impl AsContextMut, fuel: WasmtimeFuel) { @@ -270,6 +405,49 @@ fn get_store_fuel(store: &impl AsContext) -> WasmtimeFuel { WasmtimeFuel(store.as_context().get_fuel().unwrap()) } +fn prepare_store_for_call(store: &mut Store, budget: ReducerBudget) { + // note that ReducerBudget being a u64 is load-bearing here - although we convert budget right back into + // EnergyQuanta at the end of this function, from_energy_quanta clamps it to a u64 range. + // otherwise, we'd return something like `used: i128::MAX - u64::MAX`, which is inaccurate. + set_store_fuel(store, budget.into()); + + // This seems odd, as we don't use epoch interruption, at least as far as I (pgoldman 2025-08-22) know. + // But this call was here prior to my last edit. + // The previous line git-blames to https://github.com/clockworklabs/spacetimeDB/pull/2738 . + store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND); +} + +/// Convert `caller_identity` to the format used by `__call_reducer__` and `__call_procedure__`, +/// i.e. an array of 4 `u64`s. +/// +/// Callers should destructure this like: +/// ```ignore +/// # let identity = Identity::ZERO; +/// let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(identity); +/// ``` +fn prepare_identity_for_call(caller_identity: Identity) -> [u64; 4] { + // Encode this as a LITTLE-ENDIAN byte array + bytemuck::must_cast(caller_identity.to_byte_array()) +} + +/// Convert `caller_connection_id` to the format used by `__call_reducer` and `__call_procedure__`, +/// i.e. an array of 2 `u64`s. +/// +/// Callers should destructure this like: +/// ```ignore +/// # let connection_id = ConnectionId::ZERO; +/// let [conn_id_0, conn_id_1] = prepare_connection_id_for_call(connection_id); +/// ``` +/// +fn prepare_connection_id_for_call(caller_connection_id: ConnectionId) -> [u64; 2] { + // Encode this as a LITTLE-ENDIAN byte array + bytemuck::must_cast(caller_connection_id.as_le_byte_array()) +} + +fn get_memory_size(store: &Store) -> usize { + store.data().get_mem().memory.data_size(store) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 41eb62e60ba..1c1f492b285 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -7,8 +7,8 @@ use super::query::compile_query_with_hashes; use super::tx::DeltaTx; use super::{collect_table_update, TableUpdateType}; use crate::client::messages::{ - SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult, - SubscriptionRows, SubscriptionUpdateMessage, TransactionUpdateMessage, + ProcedureResultMessage, SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, + SubscriptionResult, SubscriptionRows, SubscriptionUpdateMessage, TransactionUpdateMessage, }; use crate::client::{ClientActorId, ClientConnectionSender, Protocol}; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; @@ -647,6 +647,22 @@ impl ModuleSubscriptions { .send_client_message(recipient, Some(tx_offset), message) } + /// Like [`Self::send_client_message`], + /// but doesn't require a `TxId` because procedures don't hold a transaction open. + pub fn send_procedure_message( + &self, + recipient: Arc, + message: ProcedureResultMessage, + ) -> Result<(), BroadcastError> { + self.broadcast_queue.send_client_message( + recipient, + // TODO(procedure-tx): We'll need some mechanism for procedures to report their last-referenced TxOffset, + // and to pass it here. + // This is currently moot, as procedures have no way to open a transaction yet. + None, message, + ) + } + #[tracing::instrument(level = "trace", skip_all)] pub fn add_multi_subscription( &self, diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index d97acd652b7..059e2486278 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -71,6 +71,9 @@ impl CoreInfo { // However, `max_blocking_threads` will panic if passed 0, so we set a limit of 1 // and use `on_thread_start` to log an error when spawning a blocking task. .max_blocking_threads(1) + // Enable the timer system so that `procedure_sleep_until` can work. + // TODO(procedure-sleep): Remove this. + .enable_time() .on_thread_start({ use std::sync::atomic::{AtomicBool, Ordering}; let already_spawned_worker = AtomicBool::new(false); diff --git a/crates/datastore/src/execution_context.rs b/crates/datastore/src/execution_context.rs index f4fbcaf84ae..ef72cbbdf25 100644 --- a/crates/datastore/src/execution_context.rs +++ b/crates/datastore/src/execution_context.rs @@ -131,6 +131,7 @@ pub enum WorkloadType { Unsubscribe, Update, Internal, + Procedure, } impl Default for WorkloadType { diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index aa5c3f6fb27..b7f507f263b 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -1184,7 +1184,8 @@ async fn parse_loop( error: e.error.to_string(), }, ws::ServerMessage::SubscribeApplied(_) => unreachable!("Rust client SDK never sends `SubscribeSingle`, but received a `SubscribeApplied` from the host... huh?"), - ws::ServerMessage::UnsubscribeApplied(_) => unreachable!("Rust client SDK never sends `UnsubscribeSingle`, but received a `UnsubscribeApplied` from the host... huh?") + ws::ServerMessage::UnsubscribeApplied(_) => unreachable!("Rust client SDK never sends `UnsubscribeSingle`, but received a `UnsubscribeApplied` from the host... huh?"), + ws::ServerMessage::ProcedureResult(_) => todo!("Rust client SDK procedure support"), }) .expect("Failed to send ParsedMessage to main thread"); } diff --git a/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs b/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs index 77cb1bb77fd..4c763beb353 100644 --- a/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs +++ b/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs @@ -1,7 +1,7 @@ // THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE // WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. -// This was generated using spacetimedb cli version 1.6.0 (commit 11eb6b1cc9098d6b3727cef255b0c6b3dbf1df97). +// This was generated using spacetimedb cli version 1.6.0 (commit a952ba57372dfbee37da9d079a3f86704ca35611). #![allow(unused, clippy::all)] use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; diff --git a/sdks/rust/tests/test-client/src/module_bindings/mod.rs b/sdks/rust/tests/test-client/src/module_bindings/mod.rs index f7eb3a169d9..234638a8f28 100644 --- a/sdks/rust/tests/test-client/src/module_bindings/mod.rs +++ b/sdks/rust/tests/test-client/src/module_bindings/mod.rs @@ -1,7 +1,7 @@ // THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE // WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. -// This was generated using spacetimedb cli version 1.6.0 (commit 11eb6b1cc9098d6b3727cef255b0c6b3dbf1df97). +// This was generated using spacetimedb cli version 1.6.0 (commit a952ba57372dfbee37da9d079a3f86704ca35611). #![allow(unused, clippy::all)] use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; From 55fb010a78fb7f47aa2c3aa829225af7b0a25dae Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 29 Oct 2025 10:17:39 -0400 Subject: [PATCH 2/9] Docs and comments from Mazdak's review --- crates/bindings-sys/src/lib.rs | 1 + crates/client-api-messages/src/websocket.rs | 11 ++++++++++- crates/core/src/client/messages.rs | 5 +++++ crates/core/src/host/module_host.rs | 4 ++-- .../src/host/wasm_common/module_host_actor.rs | 9 ++++----- .../core/src/host/wasmtime/wasm_instance_env.rs | 16 +++++++++++++++- crates/core/src/host/wasmtime/wasmtime_module.rs | 7 +++---- 7 files changed, 40 insertions(+), 13 deletions(-) diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index c1032777ff3..fa981d5d3b5 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -660,6 +660,7 @@ pub mod raw { /// /// - The calling WASM instance is holding open a transaction. /// - The calling WASM instance is not executing a procedure. + // TODO(procedure-sleep-until): remove this pub fn procedure_sleep_until(wake_at_micros_since_unix_epoch: i64) -> i64; } diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index c4234ef99bd..4620f508809 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -307,6 +307,9 @@ pub struct OneOffQuery { #[derive(SpacetimeType)] #[sats(crate = spacetimedb_lib)] +/// Request a procedure run. +/// +/// Parametric over the argument type to enable [`ClientMessage::map_args`]. pub struct CallProcedure { /// The name of the procedure to call. pub procedure: Box, @@ -370,7 +373,7 @@ pub enum ServerMessage { SubscribeMultiApplied(SubscribeMultiApplied), /// Sent in response to an `UnsubscribeMulti` message. This contains the matching rows. UnsubscribeMultiApplied(UnsubscribeMultiApplied), - /// Sent in response to a `ProcedureCall` message. This contains the return value. + /// Sent in response to a [`CallProcedure`] message. This contains the return value. ProcedureResult(ProcedureResult), } @@ -751,6 +754,10 @@ pub struct OneOffTable { pub rows: F::List, } +/// The result of running a procedure, +/// including the return value of the procedure on success. +/// +/// Sent in response to a [`CallProcedure`] message. #[derive(SpacetimeType, Debug)] #[sats(crate = spacetimedb_lib)] pub struct ProcedureResult { @@ -770,6 +777,8 @@ pub struct ProcedureResult { pub request_id: u32, } +/// The status of a procedure call, +/// including the return value on success. #[derive(SpacetimeType, Debug)] #[sats(crate = spacetimedb_lib)] pub enum ProcedureStatus { diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index f769689061a..58934a09d53 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -588,13 +588,18 @@ fn convert(msg: OneOffQueryResponseMessage) -> ws::Server }) } +/// Result of a procedure run. #[derive(Debug)] pub enum ProcedureStatus { + /// The procedure ran to completion and returned this value. Returned(AlgebraicValue), + /// The procedure was terminated due to running out of energy. OutOfEnergy, + /// The procedure failed to run to completion. This string describes the failure. InternalError(String), } +/// Will be sent to the caller of a procedure after that procedure finishes running. #[derive(Debug)] pub struct ProcedureResultMessage { status: ProcedureStatus, diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index f2f41b7b079..ccf2c974cc5 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -696,11 +696,11 @@ pub enum ProcedureCallError { Args(#[from] InvalidProcedureArguments), #[error(transparent)] NoSuchModule(#[from] NoSuchModule), - #[error("no such procedure")] + #[error("No such procedure")] NoSuchProcedure, #[error("Procedure terminated due to insufficient budget")] OutOfEnergy, - #[error("The WASM instance encountered a fatal error: {0}")] + #[error("The module instance encountered a fatal error: {0}")] InternalError(String), } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index c49b981c659..05f9654a480 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -117,7 +117,7 @@ pub struct ProcedureExecuteResult { #[allow(unused)] pub timings: ExecutionTimings, pub memory_allocation: usize, - pub call_result: Result, + pub call_result: anyhow::Result, } pub struct WasmModuleHostActor { @@ -433,7 +433,7 @@ impl InstanceCommon { reducer_name: &procedure_def.name, }; - // TODO: replace with call to separate function `procedure_budget`. + // TODO(procedure-energy): replace with call to separate function `procedure_budget`. let budget = self.energy_monitor.reducer_budget(&energy_fingerprint); let result = vm_call_procedure(op, budget).await; @@ -441,7 +441,7 @@ impl InstanceCommon { let ProcedureExecuteResult { memory_allocation, call_result, - // TODO: Do something with timing and energy. + // TODO(procedure-energy): Do something with timing and energy. .. } = result; @@ -473,8 +473,6 @@ impl InstanceCommon { } } Ok(return_val) => { - // TODO: deserialize return_val at its appropriate type, which you get out of the procedure def, - // then return it in `Ok`. let return_type = &procedure_def.return_type; let seed = spacetimedb_sats::WithTypespace::new(self.info.module_def.typespace(), return_type); let return_val = seed @@ -839,6 +837,7 @@ impl From> for execution_context::ReducerContext { } } +/// Describes a procedure call in a cheaply shareable way. #[derive(Clone, Debug)] pub struct ProcedureOp { pub id: ProcedureId, diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index d0997b66312..a9792dec042 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -259,7 +259,7 @@ impl WasmInstanceEnv { /// Signal to this `WasmInstanceEnv` that a reducer or procedure call is over. /// /// Returns time measurements which can be recorded as metrics, - /// and the errors written by the WASM code to hte standard error sink. + /// and the errors written by the WASM code to the standard error sink. /// /// This resets the call times and clears the arguments source and error sink. pub fn finish_funcall(&mut self) -> (ExecutionTimings, Vec) { @@ -1376,6 +1376,20 @@ impl WasmInstanceEnv { }) } + /// Suspends execution of this WASM instance until approximately `wake_at_micros_since_unix_epoch`. + /// + /// Returns immediately if `wake_at_micros_since_unix_epoch` is in the past. + /// + /// Upon resuming, returns the current timestamp as microseconds since the Unix epoch. + /// + /// Not particularly useful, except for testing SpacetimeDB internals related to suspending procedure execution. + /// # Traps + /// + /// Traps if: + /// + /// - The calling WASM instance is holding open a transaction. + /// - The calling WASM instance is not executing a procedure. + // TODO(procedure-sleep-until): remove this pub fn procedure_sleep_until<'caller>( mut caller: Caller<'caller, Self>, (wake_at_micros_since_unix_epoch,): (i64,), diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 098ca8cd1b7..7cae069db93 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -188,7 +188,7 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { /// Modules from before the introduction of procedures will not have a `__call_procedure__` export, /// which is fine because they also won't define any procedures. /// -/// Panicks if the `instance` has an export at the expected name, +/// Panics if the `instance` has an export at the expected name, /// but it is not a function or is a function of an inappropriate type. /// For new modules, this will be caught during publish. /// Old modules from before the introduction of procedures might have an export at that name, @@ -411,9 +411,8 @@ fn prepare_store_for_call(store: &mut Store, budget: ReducerBud // otherwise, we'd return something like `used: i128::MAX - u64::MAX`, which is inaccurate. set_store_fuel(store, budget.into()); - // This seems odd, as we don't use epoch interruption, at least as far as I (pgoldman 2025-08-22) know. - // But this call was here prior to my last edit. - // The previous line git-blames to https://github.com/clockworklabs/spacetimeDB/pull/2738 . + // We enable epoch interruption only to log on long-running WASM functions. + // Our epoch interrupt callback logs and then immediately resumes execution. store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND); } From bf8e41a348c8e64d6c552d5fb6ded0120a7e85c5 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 29 Oct 2025 10:19:35 -0400 Subject: [PATCH 3/9] unwrap -> expect --- crates/core/src/client/messages.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index 58934a09d53..eb2ace727e5 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -669,9 +669,15 @@ impl ToProtocol for ProcedureResultMessage { // Note that procedure returns are sent only to the caller, not broadcast to all subscribers, // so we don't have to bother with memoizing the serialization the way we do for reducer args. match protocol { - Protocol::Binary => FormatSwitch::Bsatn(convert(self, |val| bsatn::to_vec(&val).unwrap().into())), + Protocol::Binary => FormatSwitch::Bsatn(convert(self, |val| { + bsatn::to_vec(&val) + .expect("Procedure return value failed to serialize to BSATN") + .into() + })), Protocol::Text => FormatSwitch::Json(convert(self, |val| { - serde_json::to_string(&SerializeWrapper(val)).unwrap().into() + serde_json::to_string(&SerializeWrapper(val)) + .expect("Procedure return value failed to serialize to JSON") + .into() })), } } From eb1fcb2080a5e70969962c974ce0c9754877ecff Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 29 Oct 2025 10:23:06 -0400 Subject: [PATCH 4/9] Mark sleep_until as unstable --- crates/bindings-sys/src/lib.rs | 2 ++ crates/bindings/src/lib.rs | 3 ++- crates/core/src/host/wasmtime/wasm_instance_env.rs | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index fa981d5d3b5..0e1c786eb39 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -661,6 +661,7 @@ pub mod raw { /// - The calling WASM instance is holding open a transaction. /// - The calling WASM instance is not executing a procedure. // TODO(procedure-sleep-until): remove this + #[cfg(feature = "unstable")] pub fn procedure_sleep_until(wake_at_micros_since_unix_epoch: i64) -> i64; } @@ -1241,6 +1242,7 @@ impl Drop for RowIter { pub mod procedure { //! Side-effecting or asynchronous operations which only procedures are allowed to perform. #[inline] + #[cfg(feature = "unstable")] pub fn sleep_until(wake_at_timestamp: i64) -> i64 { // Safety: Just calling an `extern "C"` function. // Nothing weird happening here. diff --git a/crates/bindings/src/lib.rs b/crates/bindings/src/lib.rs index 7e75d36214b..c40412b6882 100644 --- a/crates/bindings/src/lib.rs +++ b/crates/bindings/src/lib.rs @@ -1089,7 +1089,8 @@ impl ProcedureContext { /// log::info!("Slept from {prev_time} to {new_time}, a total of {actual_delta:?}"); /// # } /// ``` - // TODO(procedure-async): mark this method `async`. + // TODO(procedure-sleep-until): remove this method + #[cfg(feature = "unstable")] pub fn sleep_until(&mut self, timestamp: Timestamp) { let new_time = sys::procedure::sleep_until(timestamp.to_micros_since_unix_epoch()); let new_time = Timestamp::from_micros_since_unix_epoch(new_time); diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index a9792dec042..6f0c46ebf99 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1383,6 +1383,9 @@ impl WasmInstanceEnv { /// Upon resuming, returns the current timestamp as microseconds since the Unix epoch. /// /// Not particularly useful, except for testing SpacetimeDB internals related to suspending procedure execution. + /// + /// In our public module-facing interfaces, this function is marked as unstable. + /// /// # Traps /// /// Traps if: From dc4951dfd6126d42ba79b6e01e9ebfe7e265fcfe Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 29 Oct 2025 10:40:05 -0400 Subject: [PATCH 5/9] Deduplicate `Host` and `Database` lookup in client-api --- crates/client-api/src/routes/database.rs | 109 +++++++++++------------ 1 file changed, 50 insertions(+), 59 deletions(-) diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 1869860cc02..6fecde08bbd 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -8,7 +8,7 @@ use crate::auth::{ }; use crate::routes::subscribe::generate_random_connection_id; pub use crate::util::{ByteStringBody, NameOrIdentity}; -use crate::{log_and_500, ControlStateDelegate, DatabaseDef, NodeDelegate}; +use crate::{log_and_500, ControlStateDelegate, DatabaseDef, Host, NodeDelegate}; use axum::body::{Body, Bytes}; use axum::extract::{Path, Query, State}; use axum::response::{ErrorResponse, IntoResponse}; @@ -20,9 +20,9 @@ use http::StatusCode; use serde::Deserialize; use spacetimedb::database_logger::DatabaseLogger; use spacetimedb::host::module_host::ClientConnectedError; -use spacetimedb::host::ReducerOutcome; use spacetimedb::host::UpdateDatabaseResult; use spacetimedb::host::{FunctionArgs, MigratePlanResult}; +use spacetimedb::host::{ModuleHost, ReducerOutcome}; use spacetimedb::host::{ProcedureCallError, ReducerCallError}; use spacetimedb::identity::Identity; use spacetimedb::messages::control_db::{Database, HostType}; @@ -56,33 +56,18 @@ pub async fn call( TypedHeader(content_type): TypedHeader, ByteStringBody(body): ByteStringBody, ) -> axum::response::Result { - if content_type != headers::ContentType::json() { - return Err(axum::extract::rejection::MissingJsonContentType::default().into()); - } + assert_content_type_json(content_type)?; + let caller_identity = auth.claims.identity; let args = FunctionArgs::Json(body); - let db_identity = name_or_identity.resolve(&worker_ctx).await?; - let database = worker_ctx_find_database(&worker_ctx, &db_identity) - .await? - .ok_or_else(|| { - log::error!("Could not find database: {}", db_identity.to_hex()); - NO_SUCH_DATABASE - })?; - let owner_identity = database.owner_identity; - - let leader = worker_ctx - .leader(database.id) - .await - .map_err(log_and_500)? - .ok_or(StatusCode::NOT_FOUND)?; - let module = leader.module().await.map_err(log_and_500)?; - // HTTP callers always need a connection ID to provide to connect/disconnect, // so generate one. let connection_id = generate_random_connection_id(); + let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?; + module .call_identity_connected(auth.into(), connection_id) .await @@ -134,6 +119,14 @@ pub async fn call( } } +fn assert_content_type_json(content_type: headers::ContentType) -> axum::response::Result<()> { + if content_type != headers::ContentType::json() { + Err(axum::extract::rejection::MissingJsonContentType::default().into()) + } else { + Ok(()) + } +} + fn reducer_outcome_response(owner_identity: &Identity, reducer: &str, outcome: ReducerOutcome) -> (StatusCode, String) { match outcome { ReducerOutcome::Committed => (StatusCode::OK, "".to_owned()), @@ -183,6 +176,37 @@ fn client_disconnected_error_to_response(err: ReducerCallError) -> ErrorResponse (StatusCode::INTERNAL_SERVER_ERROR, format!("{:#}", anyhow::anyhow!(err))).into() } +async fn find_leader_and_database( + worker_ctx: &S, + name_or_identity: NameOrIdentity, +) -> axum::response::Result<(Host, Database)> { + let db_identity = name_or_identity.resolve(worker_ctx).await?; + let database = worker_ctx_find_database(worker_ctx, &db_identity) + .await? + .ok_or_else(|| { + log::error!("Could not find database: {}", db_identity.to_hex()); + NO_SUCH_DATABASE + })?; + + let leader = worker_ctx + .leader(database.id) + .await + .map_err(log_and_500)? + .ok_or(StatusCode::NOT_FOUND)?; + + Ok((leader, database)) +} + +async fn find_module_and_database( + worker_ctx: &S, + name_or_identity: NameOrIdentity, +) -> axum::response::Result<(ModuleHost, Database)> { + let (leader, database) = find_leader_and_database(worker_ctx, name_or_identity).await?; + let module = leader.module().await.map_err(log_and_500)?; + + Ok((module, database)) +} + #[derive(Debug, derive_more::From)] pub enum DBCallErr { HandlerError(ErrorResponse), @@ -206,27 +230,13 @@ async fn procedure( TypedHeader(content_type): TypedHeader, ByteStringBody(body): ByteStringBody, ) -> axum::response::Result { - if content_type != headers::ContentType::json() { - return Err(axum::extract::rejection::MissingJsonContentType::default().into()); - } + assert_content_type_json(content_type)?; + let caller_identity = auth.claims.identity; let args = FunctionArgs::Json(body); - let db_identity = name_or_identity.resolve(&worker_ctx).await?; - let database = worker_ctx_find_database(&worker_ctx, &db_identity) - .await? - .ok_or_else(|| { - log::error!("Could not find database: {}", db_identity.to_hex()); - NO_SUCH_DATABASE - })?; - - let leader = worker_ctx - .leader(database.id) - .await - .map_err(log_and_500)? - .ok_or(StatusCode::NOT_FOUND)?; - let module = leader.module().await.map_err(log_and_500)?; + let (module, _) = find_module_and_database(&worker_ctx, name_or_identity).await?; // HTTP callers always need a connection ID to provide to connect/disconnect, // so generate one. @@ -315,17 +325,7 @@ pub async fn schema( where S: ControlStateDelegate + NodeDelegate, { - let db_identity = name_or_identity.resolve(&worker_ctx).await?; - let database = worker_ctx_find_database(&worker_ctx, &db_identity) - .await? - .ok_or(NO_SUCH_DATABASE)?; - - let leader = worker_ctx - .leader(database.id) - .await - .map_err(log_and_500)? - .ok_or(StatusCode::NOT_FOUND)?; - let module = leader.module().await.map_err(log_and_500)?; + let (module, _) = find_module_and_database(&worker_ctx, name_or_identity).await?; let module_def = &module.info.module_def; let response_json = match version { @@ -513,20 +513,11 @@ where // Anyone is authorized to execute SQL queries. The SQL engine will determine // which queries this identity is allowed to execute against the database. - let db_identity = name_or_identity.resolve(&worker_ctx).await?; - let database = worker_ctx_find_database(&worker_ctx, &db_identity) - .await? - .ok_or(NO_SUCH_DATABASE)?; + let (host, database) = find_leader_and_database(&worker_ctx, name_or_identity).await?; let auth = AuthCtx::new(database.owner_identity, caller_identity); log::debug!("auth: {auth:?}"); - let host = worker_ctx - .leader(database.id) - .await - .map_err(log_and_500)? - .ok_or(StatusCode::NOT_FOUND)?; - host.exec_sql(auth, database, confirmed, sql).await } From 1ea5c038ef3f136c19501a2581b7d022fb951c3c Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 29 Oct 2025 10:58:12 -0400 Subject: [PATCH 6/9] Deduplicate log messages for function call errors --- crates/core/src/host/module_host.rs | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index ccf2c974cc5..141f62d5721 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1274,13 +1274,8 @@ impl ModuleHost { .await; let log_message = match &res { - Err(ReducerCallError::NoSuchReducer) => Some(format!( - "External attempt to call nonexistent reducer \"{reducer_name}\" failed. Have you run `spacetime generate` recently?" - )), - Err(ReducerCallError::Args(_)) => Some(format!( - "External attempt to call reducer \"{reducer_name}\" failed, invalid arguments.\n\ - This is likely due to a mismatched client schema, have you run `spacetime generate` recently?", - )), + Err(ReducerCallError::NoSuchReducer) => Some(no_such_function_log_message("reducer", reducer_name)), + Err(ReducerCallError::Args(_)) => Some(args_error_log_message("reducer", reducer_name)), _ => None, }; if let Some(log_message) = log_message { @@ -1317,13 +1312,8 @@ impl ModuleHost { .await; let log_message = match &res { - Err(ProcedureCallError::NoSuchProcedure) => Some(format!( - "External attempt to call nonexistent procedure \"{procedure_name}\" failed. Have you run `spacetime generate` recently?" - )), - Err(ProcedureCallError::Args(_)) => Some(format!( - "External attempt to call procedure \"{procedure_name}\" failed, invalid arguments.\n\ - This is likely due to a mismatched client schema, have you run `spacetime generate` recently?" - )), + Err(ProcedureCallError::NoSuchProcedure) => Some(no_such_function_log_message("procedure", procedure_name)), + Err(ProcedureCallError::Args(_)) => Some(args_error_log_message("procedure", procedure_name)), _ => None, }; @@ -1628,3 +1618,14 @@ impl WeakModuleHost { }) } } + +fn no_such_function_log_message(function_kind: &str, function_name: &str) -> String { + format!("External attempt to call nonexistent {function_kind} \"{function_name}\" failed. Have you run `spacetime generate` recently?") +} + +fn args_error_log_message(function_kind: &str, function_name: &str) -> String { + format!( + "External attempt to call {function_kind} \"{function_name}\" failed, invalid arguments.\n\ + This is likely due to a mismatched client schema, have you run `spacetime generate` recently?" + ) +} From 16ce3a74280a083ff6270f49ed0f36b7e5b38a1f Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 29 Oct 2025 10:59:46 -0400 Subject: [PATCH 7/9] Add TODO for shub per review --- crates/core/src/host/wasm_common/module_host_actor.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 05f9654a480..6bf94421f2a 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -445,6 +445,7 @@ impl InstanceCommon { .. } = result; + // TODO(shub): deduplicate with reducer and view logic. if self.allocated_memory != memory_allocation { self.metric_wasm_memory_bytes.set(memory_allocation as i64); self.allocated_memory = memory_allocation; From 9f30761910dc7375b0dca93beaf986f04293204f Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 29 Oct 2025 11:01:17 -0400 Subject: [PATCH 8/9] Remove doc link and example with unstable method --- crates/bindings/src/lib.rs | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/crates/bindings/src/lib.rs b/crates/bindings/src/lib.rs index c40412b6882..e1ff3ea4699 100644 --- a/crates/bindings/src/lib.rs +++ b/crates/bindings/src/lib.rs @@ -721,24 +721,9 @@ pub use spacetimedb_bindings_macro::reducer; /// Procedures are allowed to perform certain operations which take time. /// During the execution of these operations, the procedure's execution will be suspended, /// allowing other database operations to run in parallel. -/// The simplest (and least useful) of these operators is [`ProcedureContext::sleep_until`]. /// /// Procedures must not hold open a transaction while performing a blocking operation. -/// -/// ```no_run -/// # use std::time::Duration; -/// # use spacetimedb::{procedure, ProcedureContext}; -/// #[procedure] -/// fn sleep_one_second(ctx: &mut ProcedureContext) { -/// let prev_time = ctx.timestamp; -/// let target = prev_time + Duration::from_secs(1); -/// ctx.sleep_until(target); -/// let new_time = ctx.timestamp; -/// let actual_delta = new_time.duration_since(prev_time).unwrap(); -/// log::info!("Slept from {prev_time} to {new_time}, a total of {actual_delta:?}"); -/// } -/// ``` -// TODO(procedure-http): replace this example with an HTTP request. +// TODO(procedure-http): add example with an HTTP request. // TODO(procedure-transaction): document obtaining and using a transaction within a procedure. /// /// # Scheduled procedures From d2626d8af7293a1ce0d6fc9ec69a02689f13e541 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 29 Oct 2025 18:05:22 -0400 Subject: [PATCH 9/9] Add simple test for calling a procedure --- .../tests/standalone_integration_test.rs | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/crates/testing/tests/standalone_integration_test.rs b/crates/testing/tests/standalone_integration_test.rs index 2df76d7d628..7f19bd185ce 100644 --- a/crates/testing/tests/standalone_integration_test.rs +++ b/crates/testing/tests/standalone_integration_test.rs @@ -130,6 +130,50 @@ fn test_calling_a_reducer_with_private_table() { ); } +fn test_calling_a_procedure_in_module(module_name: &'static str) { + init(); + + CompiledModule::compile(module_name, CompilationMode::Debug).with_module_async( + DEFAULT_CONFIG, + |module| async move { + let json = r#" +{ + "CallProcedure": { + "procedure": "sleep_one_second", + "args": "[]", + "request_id": 0, + "flags": 0 + } +}"# + .to_string(); + module.send(json).await.unwrap(); + + // It sleeps one second, but we'll wait two just to be safe. + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + let logs = read_logs(&module).await; + let logs = logs + .into_iter() + // Filter out log lines from the `repeating_test` reducer, + // which runs frequently enough to appear in our logs after we've slept a second. + .filter(|line| !line.starts_with("Timestamp: Timestamp { __timestamp_micros_since_unix_epoch__: ")) + .collect::>(); + let [log_sleep] = &logs[..] else { + panic!("Expected a single log message but found {logs:#?}"); + }; + + assert!(log_sleep.starts_with("Slept from ")); + assert!(log_sleep.contains("a total of")); + }, + ) +} + +#[test] +#[serial] +fn test_calling_a_procedure() { + test_calling_a_procedure_in_module("module-test"); +} + /// Invoke the `module-test` module, /// use `caller` to invoke its `test` reducer, /// and assert that its logs look right.