diff --git a/backend/.sqlx/query-046147987d8bbc914365f4e525a80c322bbc80c261b4d958263dadc030da673a.json b/backend/.sqlx/query-046147987d8bbc914365f4e525a80c322bbc80c261b4d958263dadc030da673a.json new file mode 100644 index 0000000000000..54bbd90edc6dc --- /dev/null +++ b/backend/.sqlx/query-046147987d8bbc914365f4e525a80c322bbc80c261b4d958263dadc030da673a.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM mailbox \n WHERE message_id = ( SELECT message_id ║\n FROM mailbox \n WHERE type = $1 AND mailbox_id = $2 AND workspace_id = $3 \n LIMIT 1 \n ) \n RETURNING payload, created_at, message_id as id;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "payload", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + { + "Custom": { + "name": "mailbox_type", + "kind": { + "Enum": [ + "trigger", + "debouncing_stale_data" + ] + } + } + }, + "Text", + "Text" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "046147987d8bbc914365f4e525a80c322bbc80c261b4d958263dadc030da673a" +} diff --git a/backend/.sqlx/query-1a8ddf380eda978b71035a2774b921e576a351384a5aef10e0d9a361c583af05.json b/backend/.sqlx/query-1a8ddf380eda978b71035a2774b921e576a351384a5aef10e0d9a361c583af05.json new file mode 100644 index 0000000000000..f52df8df80748 --- /dev/null +++ b/backend/.sqlx/query-1a8ddf380eda978b71035a2774b921e576a351384a5aef10e0d9a361c583af05.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM mailbox \n WHERE type = $1 AND mailbox_id IS NOT DISTINCT FROM $2 AND workspace_id = $3\n RETURNING payload, created_at, message_id as id;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "payload", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + { + "Custom": { + "name": "mailbox_type", + "kind": { + "Enum": [ + "trigger", + "debouncing_stale_data" + ] + } + } + }, + "Text", + "Text" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "1a8ddf380eda978b71035a2774b921e576a351384a5aef10e0d9a361c583af05" +} diff --git a/backend/.sqlx/query-645b5e65784dda5eff44d9421bac1c626be718b9d156207c134cda09fa89c23d.json b/backend/.sqlx/query-645b5e65784dda5eff44d9421bac1c626be718b9d156207c134cda09fa89c23d.json new file mode 100644 index 0000000000000..411a7d3e25be7 --- /dev/null +++ b/backend/.sqlx/query-645b5e65784dda5eff44d9421bac1c626be718b9d156207c134cda09fa89c23d.json @@ -0,0 +1,27 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM mailbox \n WHERE message_id = ANY($1)\n AND workspace_id = $2\n AND type = $3\n AND mailbox_id IS NOT DISTINCT FROM $4 \n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array", + "Text", + { + "Custom": { + "name": "mailbox_type", + "kind": { + "Enum": [ + "trigger", + "debouncing_stale_data" + ] + } + } + }, + "Text" + ] + }, + "nullable": [] + }, + "hash": "645b5e65784dda5eff44d9421bac1c626be718b9d156207c134cda09fa89c23d" +} diff --git a/backend/.sqlx/query-8e1ab979711175080198cab4ac813914ee55cf00f85991b4de28e25c9e9b7ca9.json b/backend/.sqlx/query-8e1ab979711175080198cab4ac813914ee55cf00f85991b4de28e25c9e9b7ca9.json new file mode 100644 index 0000000000000..14709dffe466a --- /dev/null +++ b/backend/.sqlx/query-8e1ab979711175080198cab4ac813914ee55cf00f85991b4de28e25c9e9b7ca9.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT payload, created_at, message_id as id\n FROM mailbox \n WHERE type = $1 AND mailbox_id IS NOT DISTINCT FROM $2 AND workspace_id = $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "payload", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + { + "Custom": { + "name": "mailbox_type", + "kind": { + "Enum": [ + "trigger", + "debouncing_stale_data" + ] + } + } + }, + "Text", + "Text" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "8e1ab979711175080198cab4ac813914ee55cf00f85991b4de28e25c9e9b7ca9" +} diff --git a/backend/.sqlx/query-93a2722f125830ef0b5cee7698983737a987f2003a1ecb2a9b647eb652502c76.json b/backend/.sqlx/query-93a2722f125830ef0b5cee7698983737a987f2003a1ecb2a9b647eb652502c76.json new file mode 100644 index 0000000000000..10ad147b2be91 --- /dev/null +++ b/backend/.sqlx/query-93a2722f125830ef0b5cee7698983737a987f2003a1ecb2a9b647eb652502c76.json @@ -0,0 +1,27 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO mailbox(mailbox_id, type, payload, workspace_id) VALUES ($1, $2, $3, $4)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + { + "Custom": { + "name": "mailbox_type", + "kind": { + "Enum": [ + "trigger", + "debouncing_stale_data" + ] + } + } + }, + "Jsonb", + "Varchar" + ] + }, + "nullable": [] + }, + "hash": "93a2722f125830ef0b5cee7698983737a987f2003a1ecb2a9b647eb652502c76" +} diff --git a/backend/.sqlx/query-a95aed7462a5915c8204abf422e9f2cd9c347c7984265e4d9c60069d7f46ade0.json b/backend/.sqlx/query-a95aed7462a5915c8204abf422e9f2cd9c347c7984265e4d9c60069d7f46ade0.json new file mode 100644 index 0000000000000..6449f9e1683ed --- /dev/null +++ b/backend/.sqlx/query-a95aed7462a5915c8204abf422e9f2cd9c347c7984265e4d9c60069d7f46ade0.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT payload, created_at, message_id as id\n FROM mailbox \n WHERE type = $1 AND mailbox_id IS NOT DISTINCT FROM $2 AND workspace_id = $3 \n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "payload", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + { + "Custom": { + "name": "mailbox_type", + "kind": { + "Enum": [ + "trigger", + "debouncing_stale_data" + ] + } + } + }, + "Text", + "Text" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "a95aed7462a5915c8204abf422e9f2cd9c347c7984265e4d9c60069d7f46ade0" +} diff --git a/backend/.sqlx/query-fe7157fc9cfcc4af5329b44b99c79dc27896bc9c5a208f9eb4634fdfc3171e3a.json b/backend/.sqlx/query-fe7157fc9cfcc4af5329b44b99c79dc27896bc9c5a208f9eb4634fdfc3171e3a.json new file mode 100644 index 0000000000000..fd922489a5c08 --- /dev/null +++ b/backend/.sqlx/query-fe7157fc9cfcc4af5329b44b99c79dc27896bc9c5a208f9eb4634fdfc3171e3a.json @@ -0,0 +1,27 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM mailbox \n WHERE message_id = $1\n AND workspace_id = $2\n AND type = $3\n AND mailbox_id IS NOT DISTINCT FROM $4 \n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + { + "Custom": { + "name": "mailbox_type", + "kind": { + "Enum": [ + "trigger", + "debouncing_stale_data" + ] + } + } + }, + "Text" + ] + }, + "nullable": [] + }, + "hash": "fe7157fc9cfcc4af5329b44b99c79dc27896bc9c5a208f9eb4634fdfc3171e3a" +} diff --git a/backend/migrations/20251028105101_mailbox.down.sql b/backend/migrations/20251028105101_mailbox.down.sql new file mode 100644 index 0000000000000..84fb1b5c6a53d --- /dev/null +++ b/backend/migrations/20251028105101_mailbox.down.sql @@ -0,0 +1,5 @@ +-- Add down migration script here +DROP TABLE IF EXISTS mailbox; +DROP TYPE IF EXISTS mailbox_type; +DROP SEQUENCE IF EXISTS mailbox_id_seq; +DROP INDEX IF EXISTS idx_mailbox_type_mailbox_id_message_id; diff --git a/backend/migrations/20251028105101_mailbox.up.sql b/backend/migrations/20251028105101_mailbox.up.sql new file mode 100644 index 0000000000000..a20bb72284fb0 --- /dev/null +++ b/backend/migrations/20251028105101_mailbox.up.sql @@ -0,0 +1,19 @@ +-- Add up migration script here +CREATE SEQUENCE IF NOT EXISTS mailbox_id_seq; + +CREATE TYPE mailbox_type AS ENUM ( + 'trigger', + 'debouncing_stale_data' +); + +CREATE TABLE mailbox( + message_id BIGINT DEFAULT nextval('mailbox_id_seq') PRIMARY KEY, -- Also indicates position in stack + mailbox_id TEXT, -- Can be NULL + workspace_id character varying(50) NOT NULL, + type mailbox_type NOT NULL, -- Type of mailbox + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), + payload JSONB NOT NULL -- Payload of specific message +); + +CREATE INDEX idx_mailbox_type_mailbox_id_message_id +ON mailbox(type, mailbox_id, message_id ASC); diff --git a/backend/windmill-common/src/lib.rs b/backend/windmill-common/src/lib.rs index 53675c6ef4e92..64ffa66776171 100644 --- a/backend/windmill-common/src/lib.rs +++ b/backend/windmill-common/src/lib.rs @@ -59,6 +59,7 @@ pub mod git_sync_ee; pub mod git_sync_oss; pub mod jobs; pub mod jwt; +pub mod mailbox; pub mod mcp_client; pub mod more_serde; pub mod oauth2; diff --git a/backend/windmill-common/src/mailbox.rs b/backend/windmill-common/src/mailbox.rs new file mode 100644 index 0000000000000..fabccd204be9a --- /dev/null +++ b/backend/windmill-common/src/mailbox.rs @@ -0,0 +1,302 @@ +use sqlx::Postgres; + +use crate::error; + +#[derive(Clone)] +pub struct Mailbox { + mailbox_id: Option, + mailbox_type: MailboxType, + workspace_id: String, +} + +pub type MsgPayload = serde_json::Value; + +#[derive(sqlx::FromRow, Debug, Clone)] +pub struct MailboxMsg { + pub id: i64, + pub payload: MsgPayload, + pub created_at: chrono::DateTime, +} + +#[derive(sqlx::Type, Clone, Copy)] +#[sqlx(rename_all = "snake_case", type_name = "mailbox_type")] +pub enum MailboxType { + Trigger, + DebouncingStaleData, +} + +impl Mailbox { + pub fn open(mailbox_id: Option<&str>, mailbox_type: MailboxType, workspace_id: &str) -> Self { + Self { + mailbox_id: mailbox_id.map(str::to_owned), + mailbox_type, + workspace_id: workspace_id.to_owned(), + } + } + + pub async fn push<'c>( + &self, + payload: MsgPayload, + e: impl sqlx::Executor<'c, Database = Postgres>, + ) -> error::Result<()> { + sqlx::query!( + r#"INSERT INTO mailbox(mailbox_id, type, payload, workspace_id) VALUES ($1, $2, $3, $4)"#, + self.mailbox_id.as_ref(), + self.mailbox_type as MailboxType, + payload, + self.workspace_id + ) + .execute(e) + .await?; + + Ok(()) + } + + pub async fn pull<'c>( + &self, + e: impl sqlx::Executor<'c, Database = Postgres>, + ) -> error::Result> { + sqlx::query_as!( + MailboxMsg, + r#" + DELETE FROM mailbox + WHERE message_id = ( SELECT message_id ║ + FROM mailbox + WHERE type = $1 AND mailbox_id = $2 AND workspace_id = $3 + LIMIT 1 + ) + RETURNING payload, created_at, message_id as id; + "#, + self.mailbox_type as MailboxType, + self.mailbox_id.as_ref(), + &self.workspace_id, + ) + .fetch_optional(e) + .await + .map_err(error::Error::from) + } + + pub async fn pull_all<'c>( + &self, + e: impl sqlx::Executor<'c, Database = Postgres>, + ) -> error::Result> { + sqlx::query_as!( + MailboxMsg, + r#" + DELETE FROM mailbox + WHERE type = $1 AND mailbox_id IS NOT DISTINCT FROM $2 AND workspace_id = $3 + RETURNING payload, created_at, message_id as id; + "#, + self.mailbox_type as MailboxType, + self.mailbox_id.as_ref(), + &self.workspace_id, + ) + .fetch_all(e) + .await + .map_err(error::Error::from) + } + + pub async fn delete<'c>( + &self, + message_id: i64, + e: impl sqlx::Executor<'c, Database = Postgres>, + ) -> error::Result<()> { + sqlx::query!( + r#" + DELETE FROM mailbox + WHERE message_id = $1 + AND workspace_id = $2 + AND type = $3 + AND mailbox_id IS NOT DISTINCT FROM $4 + "#, + message_id, + &self.workspace_id, + self.mailbox_type as MailboxType, + self.mailbox_id.as_ref(), + ) + .fetch_all(e) + .await?; + Ok(()) + } + + pub async fn delete_batch<'c>( + &self, + message_ids: Vec, + e: impl sqlx::Executor<'c, Database = Postgres>, + ) -> error::Result<()> { + sqlx::query!( + r#" + DELETE FROM mailbox + WHERE message_id = ANY($1) + AND workspace_id = $2 + AND type = $3 + AND mailbox_id IS NOT DISTINCT FROM $4 + "#, + &message_ids, + &self.workspace_id, + self.mailbox_type as MailboxType, + self.mailbox_id.as_ref(), + ) + .fetch_all(e) + .await?; + Ok(()) + } + + pub async fn read<'c>( + &self, + e: impl sqlx::Executor<'c, Database = Postgres>, + ) -> error::Result> { + sqlx::query_as!( + MailboxMsg, + r#" + SELECT payload, created_at, message_id as id + FROM mailbox + WHERE type = $1 AND mailbox_id IS NOT DISTINCT FROM $2 AND workspace_id = $3 + "#, + self.mailbox_type as MailboxType, + self.mailbox_id.as_ref(), + &self.workspace_id, + ) + .fetch_optional(e) + .await + .map_err(error::Error::from) + } + + pub async fn read_all<'c>( + &self, + e: impl sqlx::Executor<'c, Database = Postgres>, + ) -> error::Result> { + sqlx::query_as!( + MailboxMsg, + r#" + SELECT payload, created_at, message_id as id + FROM mailbox + WHERE type = $1 AND mailbox_id IS NOT DISTINCT FROM $2 AND workspace_id = $3 + "#, + self.mailbox_type as MailboxType, + self.mailbox_id.as_ref(), + &self.workspace_id + ) + .fetch_all(e) + .await + .map_err(error::Error::from) + } +} + +#[cfg(test)] +mod mailbox_tests { + use serde_json::json; + + use crate::mailbox::Mailbox; + + #[sqlx::test(fixtures("../../migrations/20251028105101_mailbox.up.sql"))] + async fn test_mailbox(db: sqlx::Pool) -> anyhow::Result<()> { + let db = &db; + let push = async |mbox: Mailbox| { + mbox.push(json!(1), db).await.unwrap(); + mbox.push(json!(2), db).await.unwrap(); + mbox.push(json!(3), db).await.unwrap(); + }; + + let assert_read = async |mbox: Mailbox| { + assert_eq!(mbox.read(db).await.unwrap().unwrap().payload, json!(1)); + }; + + let assert_read_all = async |mbox: Mailbox| { + let all = mbox.read_all(db).await.unwrap(); + assert_eq!(all.len(), 3); + assert_eq!(all[0].payload, json!(1)); + assert_eq!(all[1].payload, json!(2)); + assert_eq!(all[2].payload, json!(3)); + }; + + let assert_pull = async |mbox: Mailbox| { + assert_eq!(mbox.pull(db).await.unwrap().unwrap().payload, json!(1)); + assert_eq!(mbox.pull(db).await.unwrap().unwrap().payload, json!(2)); + assert_eq!(mbox.pull(db).await.unwrap().unwrap().payload, json!(3)); + assert!(mbox.pull(db).await.unwrap().is_none()); + }; + + let assert_pull_all = async |mbox: Mailbox| { + let all = mbox.pull_all(db).await.unwrap(); + assert_eq!(all.len(), 3); + assert_eq!(all[0].payload, json!(1)); + assert_eq!(all[1].payload, json!(2)); + assert_eq!(all[2].payload, json!(3)); + }; + + // Run those in parallel to make sure they are not conflicting + tokio::join!( + // Main body + // All others will be small deviations from this one + async { + let mbox = Mailbox::open( + Some("mymailbox"), + crate::mailbox::MailboxType::Trigger, + "test-workspace_id", + ); + push(mbox.clone()).await; + assert_read(mbox.clone()).await; + assert_read_all(mbox.clone()).await; + assert_pull(mbox.clone()).await; + }, + // Same as above, but different workspace_id + async { + let mbox = Mailbox::open( + Some("mymailbox"), + crate::mailbox::MailboxType::Trigger, + "another-workspace_id", + ); + push(mbox.clone()).await; + assert_read(mbox.clone()).await; + assert_read_all(mbox.clone()).await; + assert_pull(mbox.clone()).await; + }, + // Different id + async { + let mbox = Mailbox::open( + Some("another id"), + crate::mailbox::MailboxType::Trigger, + "test-workspace_id", + ); + push(mbox.clone()).await; + assert_read(mbox.clone()).await; + assert_read_all(mbox.clone()).await; + assert_pull(mbox.clone()).await; + }, + // Different kind + async { + let mbox = Mailbox::open( + Some("mymailbox"), + crate::mailbox::MailboxType::DebouncingStaleData, + "test-workspace_id", + ); + push(mbox.clone()).await; + assert_read(mbox.clone()).await; + assert_read_all(mbox.clone()).await; + assert_pull(mbox.clone()).await; + }, + // Global mailboix + async { + let mbox = Mailbox::open( + None, + crate::mailbox::MailboxType::Trigger, + "test-workspace_id", + ); + push(mbox.clone()).await; + dbg!( + sqlx::query!("SELECT mailbox_id, payload, workspace_id FROM mailbox") + .fetch_all(db) + .await + .unwrap() + ); + assert_read(mbox.clone()).await; + assert_read_all(mbox.clone()).await; + // Also test pull_all + assert_pull_all(mbox.clone()).await; + }, + ); + + Ok(()) + } +}