Skip to content
27 changes: 25 additions & 2 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,26 @@ pub mod raw {
pub fn get_jwt(connection_id_ptr: *const u8, bytes_source_id: *mut BytesSource) -> u16;
}

#[link(wasm_import_module = "spacetime_10.3")]
extern "C" {
/// Suspends execution of this WASM instance until approximately `wake_at_micros_since_unix_epoch`.
///
/// Returns immediately if `wake_at_micros_since_unix_epoch` is in the past.
///
/// Upon resuming, returns the current timestamp as microseconds since the Unix epoch.
///
/// Not particularly useful, except for testing SpacetimeDB internals related to suspending procedure execution.
/// # Traps
///
/// Traps if:
///
/// - The calling WASM instance is holding open a transaction.
/// - The calling WASM instance is not executing a procedure.
// TODO(procedure-sleep-until): remove this
#[cfg(feature = "unstable")]
pub fn procedure_sleep_until(wake_at_micros_since_unix_epoch: i64) -> i64;
}

/// What strategy does the database index use?
///
/// See also: <https://www.postgresql.org/docs/current/sql-createindex.html>
Expand Down Expand Up @@ -1222,7 +1242,10 @@ impl Drop for RowIter {
pub mod procedure {
//! Side-effecting or asynchronous operations which only procedures are allowed to perform.
#[inline]
pub fn sleep_until(_wake_at_timestamp: i64) -> i64 {
todo!("Add `procedure_sleep_until` host function")
#[cfg(feature = "unstable")]
pub fn sleep_until(wake_at_timestamp: i64) -> i64 {
// Safety: Just calling an `extern "C"` function.
// Nothing weird happening here.
unsafe { super::raw::procedure_sleep_until(wake_at_timestamp) }
}
}
20 changes: 3 additions & 17 deletions crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,24 +721,9 @@ pub use spacetimedb_bindings_macro::reducer;
/// Procedures are allowed to perform certain operations which take time.
/// During the execution of these operations, the procedure's execution will be suspended,
/// allowing other database operations to run in parallel.
/// The simplest (and least useful) of these operators is [`ProcedureContext::sleep_until`].
///
/// Procedures must not hold open a transaction while performing a blocking operation.
///
/// ```no_run
/// # use std::time::Duration;
/// # use spacetimedb::{procedure, ProcedureContext};
/// #[procedure]
/// fn sleep_one_second(ctx: &mut ProcedureContext) {
/// let prev_time = ctx.timestamp;
/// let target = prev_time + Duration::from_secs(1);
/// ctx.sleep_until(target);
/// let new_time = ctx.timestamp;
/// let actual_delta = new_time.duration_since(prev_time).unwrap();
/// log::info!("Slept from {prev_time} to {new_time}, a total of {actual_delta:?}");
/// }
/// ```
// TODO(procedure-http): replace this example with an HTTP request.
// TODO(procedure-http): add example with an HTTP request.
// TODO(procedure-transaction): document obtaining and using a transaction within a procedure.
///
/// # Scheduled procedures
Expand Down Expand Up @@ -1089,7 +1074,8 @@ impl ProcedureContext {
/// log::info!("Slept from {prev_time} to {new_time}, a total of {actual_delta:?}");
/// # }
/// ```
// TODO(procedure-async): mark this method `async`.
// TODO(procedure-sleep-until): remove this method
#[cfg(feature = "unstable")]
pub fn sleep_until(&mut self, timestamp: Timestamp) {
let new_time = sys::procedure::sleep_until(timestamp.to_micros_since_unix_epoch());
let new_time = Timestamp::from_micros_since_unix_epoch(new_time);
Expand Down
91 changes: 91 additions & 0 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ pub enum ClientMessage<Args> {
/// Remove a subscription to a SQL query that was added with SubscribeSingle.
Unsubscribe(Unsubscribe),
UnsubscribeMulti(UnsubscribeMulti),
/// Request a procedure run.
CallProcedure(CallProcedure<Args>),
}

impl<Args> ClientMessage<Args> {
Expand All @@ -127,6 +129,17 @@ impl<Args> ClientMessage<Args> {
ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
ClientMessage::SubscribeMulti(x) => ClientMessage::SubscribeMulti(x),
ClientMessage::UnsubscribeMulti(x) => ClientMessage::UnsubscribeMulti(x),
ClientMessage::CallProcedure(CallProcedure {
procedure,
args,
request_id,
flags,
}) => ClientMessage::CallProcedure(CallProcedure {
procedure,
args: f(args),
request_id,
flags,
}),
}
}
}
Expand Down Expand Up @@ -292,6 +305,40 @@ pub struct OneOffQuery {
pub query_string: Box<str>,
}

#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
/// Request a procedure run.
///
/// Parametric over the argument type to enable [`ClientMessage::map_args`].
pub struct CallProcedure<Args> {
/// The name of the procedure to call.
pub procedure: Box<str>,
/// The arguments to the procedure.
///
/// In the wire format, this will be a [`Bytes`], BSATN or JSON encoded according to the reducer's argument schema
/// and the enclosing message format.
pub args: Args,
/// An identifier for a client request.
///
/// The server will include the same ID in the response [`ProcedureResult`].
pub request_id: u32,
/// Reserved space for future extensions.
pub flags: CallProcedureFlags,
}

#[derive(Clone, Copy, Default, PartialEq, Eq)]
pub enum CallProcedureFlags {
#[default]
Default,
}

impl_st!([] CallProcedureFlags, AlgebraicType::U8);
impl_serialize!([] CallProcedureFlags, (self, ser) => ser.serialize_u8(*self as u8));
impl_deserialize!([] CallProcedureFlags, de => match de.deserialize_u8()? {
0 => Ok(Self::Default),
x => Err(D::Error::custom(format_args!("invalid call procedure flag {x}"))),
});

/// The tag recognized by the host and SDKs to mean no compression of a [`ServerMessage`].
pub const SERVER_MSG_COMPRESSION_TAG_NONE: u8 = 0;

Expand Down Expand Up @@ -326,6 +373,8 @@ pub enum ServerMessage<F: WebsocketFormat> {
SubscribeMultiApplied(SubscribeMultiApplied<F>),
/// Sent in response to an `UnsubscribeMulti` message. This contains the matching rows.
UnsubscribeMultiApplied(UnsubscribeMultiApplied<F>),
/// Sent in response to a [`CallProcedure`] message. This contains the return value.
ProcedureResult(ProcedureResult<F>),
}

/// The matching rows of a subscription query.
Expand Down Expand Up @@ -705,6 +754,48 @@ pub struct OneOffTable<F: WebsocketFormat> {
pub rows: F::List,
}

/// The result of running a procedure,
/// including the return value of the procedure on success.
///
/// Sent in response to a [`CallProcedure`] message.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub struct ProcedureResult<F: WebsocketFormat> {
/// The status of the procedure run.
///
/// Contains the return value if successful, or the error message if not.
pub status: ProcedureStatus<F>,
/// The time when the reducer started.
///
/// Note that [`Timestamp`] serializes as `i64` nanoseconds since the Unix epoch.
pub timestamp: Timestamp,
/// The time the procedure took to run.
pub total_host_execution_duration: TimeDuration,
/// The same same client-provided identifier as in the original [`ProcedureCall`] request.
///
/// Clients use this to correlate the response with the original request.
pub request_id: u32,
}

/// The status of a procedure call,
/// including the return value on success.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub enum ProcedureStatus<F: WebsocketFormat> {
/// The procedure ran and returned the enclosed value.
///
/// All user error handling happens within here;
/// the returned value may be a `Result` or `Option`,
/// or any other type to which the user may ascribe arbitrary meaning.
Returned(F::Single),
/// The reducer was interrupted due to insufficient energy/funds.
///
/// The procedure may have performed some observable side effects before being interrupted.
OutOfEnergy,
/// The call failed in the host, e.g. due to a type error or unknown procedure name.
InternalError(String),
}

/// Used whenever different formats need to coexist.
#[derive(Debug, Clone)]
pub enum FormatSwitch<B, J> {
Expand Down
Loading
Loading