Skip to content

Commit 3f1de9e

Browse files
gefjonCentril
andauthored
WASM host execution for procedures (#3498)
# Description of Changes This commit builds support for executing procedures in WASM modules. This includes an HTTP endpoint, `/v1/database/:name_or_address/procedure/:name POST`, as well as an extension to the WS protocol. These new APIs are not wired up to the CLI or SDKs, but I have manually tested the HTTP endpoint via `curl`. The new WS extensions are completely untested. Several TODOs are scattered throughout the new code, most notably for sensibly tracking procedure execution time in the metrics. I also expect that we will want to remove the `procedure_sleep_until` syscall and the `ProcedureContext::sleep_until` method prior to release. # API and ABI breaking changes Adds new APIs and ABIs. # Expected complexity level and risk 3? 4? Unlikely to break existing stuff, 'cause it's mostly additive, but adds plenty of potentially-fragile new stuff. Notably is the first time we're doing anything actually `async`hronous on a database core Tokio worker, and we don't yet have strong evidence of how that will affect reducer execution. # Testing - [x] Manually published `modules/module-test` and executed procedures with the following `curl` invocations: - `curl -X POST -H "Content-Type:application/json" -d '[]' http://localhost:3000/v1/database/module-test/procedure/sleep_one_second` - `curl -X POST -H "Content-Type:application/json" -d '[1223]' http://localhost:3000/v1/database/module-test/procedure/return_value` - [ ] Need to write automated tests. --------- Co-authored-by: Mazdak Farrokhzad <[email protected]>
1 parent 647be7e commit 3f1de9e

File tree

23 files changed

+1134
-178
lines changed

23 files changed

+1134
-178
lines changed

crates/bindings-sys/src/lib.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,26 @@ pub mod raw {
645645
pub fn get_jwt(connection_id_ptr: *const u8, bytes_source_id: *mut BytesSource) -> u16;
646646
}
647647

648+
#[link(wasm_import_module = "spacetime_10.3")]
649+
extern "C" {
650+
/// Suspends execution of this WASM instance until approximately `wake_at_micros_since_unix_epoch`.
651+
///
652+
/// Returns immediately if `wake_at_micros_since_unix_epoch` is in the past.
653+
///
654+
/// Upon resuming, returns the current timestamp as microseconds since the Unix epoch.
655+
///
656+
/// Not particularly useful, except for testing SpacetimeDB internals related to suspending procedure execution.
657+
/// # Traps
658+
///
659+
/// Traps if:
660+
///
661+
/// - The calling WASM instance is holding open a transaction.
662+
/// - The calling WASM instance is not executing a procedure.
663+
// TODO(procedure-sleep-until): remove this
664+
#[cfg(feature = "unstable")]
665+
pub fn procedure_sleep_until(wake_at_micros_since_unix_epoch: i64) -> i64;
666+
}
667+
648668
/// What strategy does the database index use?
649669
///
650670
/// See also: <https://www.postgresql.org/docs/current/sql-createindex.html>
@@ -1222,7 +1242,10 @@ impl Drop for RowIter {
12221242
pub mod procedure {
12231243
//! Side-effecting or asynchronous operations which only procedures are allowed to perform.
12241244
#[inline]
1225-
pub fn sleep_until(_wake_at_timestamp: i64) -> i64 {
1226-
todo!("Add `procedure_sleep_until` host function")
1245+
#[cfg(feature = "unstable")]
1246+
pub fn sleep_until(wake_at_timestamp: i64) -> i64 {
1247+
// Safety: Just calling an `extern "C"` function.
1248+
// Nothing weird happening here.
1249+
unsafe { super::raw::procedure_sleep_until(wake_at_timestamp) }
12271250
}
12281251
}

crates/bindings/src/lib.rs

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -720,24 +720,9 @@ pub use spacetimedb_bindings_macro::reducer;
720720
/// Procedures are allowed to perform certain operations which take time.
721721
/// During the execution of these operations, the procedure's execution will be suspended,
722722
/// allowing other database operations to run in parallel.
723-
/// The simplest (and least useful) of these operators is [`ProcedureContext::sleep_until`].
724723
///
725724
/// Procedures must not hold open a transaction while performing a blocking operation.
726-
///
727-
/// ```no_run
728-
/// # use std::time::Duration;
729-
/// # use spacetimedb::{procedure, ProcedureContext};
730-
/// #[procedure]
731-
/// fn sleep_one_second(ctx: &mut ProcedureContext) {
732-
/// let prev_time = ctx.timestamp;
733-
/// let target = prev_time + Duration::from_secs(1);
734-
/// ctx.sleep_until(target);
735-
/// let new_time = ctx.timestamp;
736-
/// let actual_delta = new_time.duration_since(prev_time).unwrap();
737-
/// log::info!("Slept from {prev_time} to {new_time}, a total of {actual_delta:?}");
738-
/// }
739-
/// ```
740-
// TODO(procedure-http): replace this example with an HTTP request.
725+
// TODO(procedure-http): add example with an HTTP request.
741726
// TODO(procedure-transaction): document obtaining and using a transaction within a procedure.
742727
///
743728
/// # Scheduled procedures
@@ -1088,7 +1073,8 @@ impl ProcedureContext {
10881073
/// log::info!("Slept from {prev_time} to {new_time}, a total of {actual_delta:?}");
10891074
/// # }
10901075
/// ```
1091-
// TODO(procedure-async): mark this method `async`.
1076+
// TODO(procedure-sleep-until): remove this method
1077+
#[cfg(feature = "unstable")]
10921078
pub fn sleep_until(&mut self, timestamp: Timestamp) {
10931079
let new_time = sys::procedure::sleep_until(timestamp.to_micros_since_unix_epoch());
10941080
let new_time = Timestamp::from_micros_since_unix_epoch(new_time);

crates/client-api-messages/src/websocket.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ pub enum ClientMessage<Args> {
105105
/// Remove a subscription to a SQL query that was added with SubscribeSingle.
106106
Unsubscribe(Unsubscribe),
107107
UnsubscribeMulti(UnsubscribeMulti),
108+
/// Request a procedure run.
109+
CallProcedure(CallProcedure<Args>),
108110
}
109111

110112
impl<Args> ClientMessage<Args> {
@@ -127,6 +129,17 @@ impl<Args> ClientMessage<Args> {
127129
ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
128130
ClientMessage::SubscribeMulti(x) => ClientMessage::SubscribeMulti(x),
129131
ClientMessage::UnsubscribeMulti(x) => ClientMessage::UnsubscribeMulti(x),
132+
ClientMessage::CallProcedure(CallProcedure {
133+
procedure,
134+
args,
135+
request_id,
136+
flags,
137+
}) => ClientMessage::CallProcedure(CallProcedure {
138+
procedure,
139+
args: f(args),
140+
request_id,
141+
flags,
142+
}),
130143
}
131144
}
132145
}
@@ -292,6 +305,40 @@ pub struct OneOffQuery {
292305
pub query_string: Box<str>,
293306
}
294307

308+
#[derive(SpacetimeType)]
309+
#[sats(crate = spacetimedb_lib)]
310+
/// Request a procedure run.
311+
///
312+
/// Parametric over the argument type to enable [`ClientMessage::map_args`].
313+
pub struct CallProcedure<Args> {
314+
/// The name of the procedure to call.
315+
pub procedure: Box<str>,
316+
/// The arguments to the procedure.
317+
///
318+
/// In the wire format, this will be a [`Bytes`], BSATN or JSON encoded according to the reducer's argument schema
319+
/// and the enclosing message format.
320+
pub args: Args,
321+
/// An identifier for a client request.
322+
///
323+
/// The server will include the same ID in the response [`ProcedureResult`].
324+
pub request_id: u32,
325+
/// Reserved space for future extensions.
326+
pub flags: CallProcedureFlags,
327+
}
328+
329+
#[derive(Clone, Copy, Default, PartialEq, Eq)]
330+
pub enum CallProcedureFlags {
331+
#[default]
332+
Default,
333+
}
334+
335+
impl_st!([] CallProcedureFlags, AlgebraicType::U8);
336+
impl_serialize!([] CallProcedureFlags, (self, ser) => ser.serialize_u8(*self as u8));
337+
impl_deserialize!([] CallProcedureFlags, de => match de.deserialize_u8()? {
338+
0 => Ok(Self::Default),
339+
x => Err(D::Error::custom(format_args!("invalid call procedure flag {x}"))),
340+
});
341+
295342
/// The tag recognized by the host and SDKs to mean no compression of a [`ServerMessage`].
296343
pub const SERVER_MSG_COMPRESSION_TAG_NONE: u8 = 0;
297344

@@ -326,6 +373,8 @@ pub enum ServerMessage<F: WebsocketFormat> {
326373
SubscribeMultiApplied(SubscribeMultiApplied<F>),
327374
/// Sent in response to an `UnsubscribeMulti` message. This contains the matching rows.
328375
UnsubscribeMultiApplied(UnsubscribeMultiApplied<F>),
376+
/// Sent in response to a [`CallProcedure`] message. This contains the return value.
377+
ProcedureResult(ProcedureResult<F>),
329378
}
330379

331380
/// The matching rows of a subscription query.
@@ -705,6 +754,48 @@ pub struct OneOffTable<F: WebsocketFormat> {
705754
pub rows: F::List,
706755
}
707756

757+
/// The result of running a procedure,
758+
/// including the return value of the procedure on success.
759+
///
760+
/// Sent in response to a [`CallProcedure`] message.
761+
#[derive(SpacetimeType, Debug)]
762+
#[sats(crate = spacetimedb_lib)]
763+
pub struct ProcedureResult<F: WebsocketFormat> {
764+
/// The status of the procedure run.
765+
///
766+
/// Contains the return value if successful, or the error message if not.
767+
pub status: ProcedureStatus<F>,
768+
/// The time when the reducer started.
769+
///
770+
/// Note that [`Timestamp`] serializes as `i64` nanoseconds since the Unix epoch.
771+
pub timestamp: Timestamp,
772+
/// The time the procedure took to run.
773+
pub total_host_execution_duration: TimeDuration,
774+
/// The same same client-provided identifier as in the original [`ProcedureCall`] request.
775+
///
776+
/// Clients use this to correlate the response with the original request.
777+
pub request_id: u32,
778+
}
779+
780+
/// The status of a procedure call,
781+
/// including the return value on success.
782+
#[derive(SpacetimeType, Debug)]
783+
#[sats(crate = spacetimedb_lib)]
784+
pub enum ProcedureStatus<F: WebsocketFormat> {
785+
/// The procedure ran and returned the enclosed value.
786+
///
787+
/// All user error handling happens within here;
788+
/// the returned value may be a `Result` or `Option`,
789+
/// or any other type to which the user may ascribe arbitrary meaning.
790+
Returned(F::Single),
791+
/// The reducer was interrupted due to insufficient energy/funds.
792+
///
793+
/// The procedure may have performed some observable side effects before being interrupted.
794+
OutOfEnergy,
795+
/// The call failed in the host, e.g. due to a type error or unknown procedure name.
796+
InternalError(String),
797+
}
798+
708799
/// Used whenever different formats need to coexist.
709800
#[derive(Debug, Clone)]
710801
pub enum FormatSwitch<B, J> {

0 commit comments

Comments
 (0)