From 95961196872ad2f56319f36eeacf4d06142939d5 Mon Sep 17 00:00:00 2001 From: Ryan Eno Date: Sun, 27 Jul 2025 18:00:43 -0700 Subject: [PATCH 1/6] Add axum activity feed example --- Cargo.lock | 96 +++++++++++++++++ Cargo.toml | 1 + examples/activity-feed.html | 101 ++++++++++++++++++ examples/axum-activity-feed.rs | 187 +++++++++++++++++++++++++++++++++ 4 files changed, 385 insertions(+) create mode 100644 examples/activity-feed.html create mode 100644 examples/axum-activity-feed.rs diff --git a/Cargo.lock b/Cargo.lock index ac7262a..1f9a137 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -206,6 +221,18 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" +[[package]] +name = "chrono" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "windows-link", +] + [[package]] name = "cookie" version = "0.18.1" @@ -239,6 +266,7 @@ version = "0.3.0" dependencies = [ "async-stream", "axum", + "chrono", "indexmap", "reqwest", "rocket", @@ -731,6 +759,30 @@ dependencies = [ "windows-registry", ] +[[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "2.0.0" @@ -1063,6 +1115,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.17.0" @@ -2351,6 +2412,41 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.1.3" diff --git a/Cargo.toml b/Cargo.toml index 760e7ae..06db406 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ axum = { version = "0.8.4", default-features = false, optional = true, features "tokio", "json", ] } +chrono = { version = "0.4.41", default-features = false, features = ["clock"] } rocket = { version = "0.5.1", default-features = false, optional = true } serde = { version = "1", default-features = false, optional = true, features = [ "derive", diff --git a/examples/activity-feed.html b/examples/activity-feed.html new file mode 100644 index 0000000..be9cea0 --- /dev/null +++ b/examples/activity-feed.html @@ -0,0 +1,101 @@ + + + + Datastar SDK Activity Feed Demo + + + + +
+
+

+ Datastar SDK Activity Feed Demo +

+ Rocket +
+

+ SSE events will be streamed from the backend to the frontend. +

+
+ + +
+
+ + +
+ +  |  + + + + +
+
+
+ Click Generate to create + 200 events, + 10 milliseconds apart. +
+
+ + diff --git a/examples/axum-activity-feed.rs b/examples/axum-activity-feed.rs new file mode 100644 index 0000000..d3435d2 --- /dev/null +++ b/examples/axum-activity-feed.rs @@ -0,0 +1,187 @@ +use { + async_stream::stream, + axum::{ + Router, + response::{Html, IntoResponse, Sse}, + routing::{get, post}, + }, + chrono, + core::{convert::Infallible, error::Error, time::Duration}, + datastar::{ + axum::ReadSignals, + prelude::{ElementPatchMode, PatchElements, PatchSignals}, + }, + serde::{Deserialize, Serialize}, + std::sync::Arc, + tokio::sync::RwLock, + tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}, +}; + +pub struct State { + pub feed: Vec, + pub count: Count, +} + +pub struct Count { + pub all: u32, + pub done: u32, + pub warn: u32, + pub fail: u32, + pub info: u32, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { + format!("{}=debug,tower_http=debug", env!("CARGO_CRATE_NAME")).into() + }), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let state = Arc::new(RwLock::new(State { + feed: Vec::new(), + count: Count { + all: 0, + done: 0, + warn: 0, + fail: 0, + info: 0, + }, + })); + + let state_generate = state.clone(); + let state_info = state.clone(); + let state_done = state.clone(); + let state_warn = state.clone(); + let state_fail = state.clone(); + + let event = { move |level, state| async move { event(level, state).await } }; + let generate = { move |signals| async move { generate(signals, state_generate).await } }; + + let app = Router::new() + .route("/", get(index)) + .route("/event/generate", post(generate)) + .route("/event/info", post(move || event(Status::Info, state_info))) + .route("/event/done", post(move || event(Status::Done, state_done))) + .route("/event/warn", post(move || event(Status::Warn, state_warn))) + .route("/event/fail", post(move || event(Status::Fail, state_fail))); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") + .await + .unwrap(); + + tracing::debug!("listening on {}", listener.local_addr().unwrap()); + + axum::serve(listener, app).await.unwrap(); + + Ok(()) +} + +async fn index() -> Html<&'static str> { + Html(include_str!("activity-feed.html")) +} + +#[derive(Serialize, Deserialize)] +pub struct Signals { + pub interval: u64, + pub events: u64, + pub generating: bool, +} + +#[derive(Clone, PartialEq, Eq, Deserialize)] +pub enum Status { + Info, + Done, + Warn, + Fail, +} + +async fn generate( + ReadSignals(signals): ReadSignals, + state: Arc>, +) -> impl IntoResponse { + Sse::new(stream! { + let elements = r#"
"#.to_string(); + let patch = PatchElements::new(elements); + let sse_event = patch.write_as_axum_sse_event(); + yield Ok::<_, Infallible>(sse_event); + + let signals_generating = serde_json::to_string(&Signals{ + generating: true, + interval: signals.interval, + events: signals.events, + }).unwrap(); + let patch = PatchSignals::new(signals_generating); + let sse_event = patch.write_as_axum_sse_event(); + yield Ok::<_, Infallible>(sse_event); + + for _ in 1..=signals.events { + let mut state = state.write().await; + let elements = event_entry(&mut state, &Status::Done, "Auto"); + let patch = PatchElements::new(elements).selector("#feed").mode(ElementPatchMode::After); + let sse_event = patch.write_as_axum_sse_event(); + yield Ok::<_, Infallible>(sse_event); + tokio::time::sleep(Duration::from_millis(signals.interval)).await; + } + + let signals_done = serde_json::to_string(&Signals{ + generating: false, + interval: signals.interval, + events: signals.events, + }).unwrap(); + let patch = PatchSignals::new(signals_done); + let sse_event = patch.write_as_axum_sse_event(); + yield Ok::<_, Infallible>(sse_event); + }) +} + +async fn event(status: Status, state: Arc>) -> impl IntoResponse { + Sse::new(stream! { + let mut state = state.write().await; + let elements = event_entry(&mut state, &status, "Manual"); + let patch = PatchElements::new(elements).selector("#feed").mode(ElementPatchMode::After); + let sse_event = patch.write_as_axum_sse_event(); + + yield Ok::<_, Infallible>(sse_event); + }) +} + +fn event_entry(state: &mut State, status: &Status, prefix: &str) -> String { + state.count.all += 1; + let timestamp = chrono::Utc::now() + .format("%Y-%m-%d %H:%M:%S%.3f") + .to_string(); + match status { + Status::Done => { + state.count.done += 1; + format!( + "
{} [ ✅ Done ] {} event {}
", + state.count.all, timestamp, prefix, state.count.all + ) + } + Status::Warn => { + state.count.warn += 1; + format!( + "
{} [ ⚠️ Warn ] {} event {}
", + state.count.all, timestamp, prefix, state.count.all + ) + } + Status::Fail => { + state.count.fail += 1; + format!( + "
{} [ ❌ Fail ] {} event {}
", + state.count.all, timestamp, prefix, state.count.all + ) + } + Status::Info => { + state.count.info += 1; + format!( + "
{} [ ℹ️ Info ] {} event {}
", + state.count.all, timestamp, prefix, state.count.all + ) + } + } +} From 06cda3768ca0ab646287da05e2d6bc81fe009864 Mon Sep 17 00:00:00 2001 From: Ryan Eno Date: Sun, 27 Jul 2025 21:02:07 -0700 Subject: [PATCH 2/6] Remove state, use signals --- examples/activity-feed.html | 28 ++++++- examples/axum-activity-feed.rs | 145 ++++++++++++++++----------------- 2 files changed, 97 insertions(+), 76 deletions(-) diff --git a/examples/activity-feed.html b/examples/activity-feed.html index be9cea0..b71dd55 100644 --- a/examples/activity-feed.html +++ b/examples/activity-feed.html @@ -13,6 +13,11 @@ data-signals-interval="10" data-signals-events="200" data-signals-generating="false" + data-signals-total="0" + data-signals-done="0" + data-signals-warn="0" + data-signals-fail="0" + data-signals-info="0" class="bg-white dark:bg-gray-800 text-gray-500 dark:text-gray-400 rounded-lg px-6 py-8 ring shadow-xl ring-gray-900/5 space-y-2" >
@@ -94,7 +99,28 @@
Click Generate to create 200 events, - 10 milliseconds apart. + 10 milliseconds apart.
+ + Total: 0 + + | + + Done: 0 + + | + + Warn: 0 + + | + Fail: 0 + + | + + Info: 0 + +
+ -------------------------------------------------------------
diff --git a/examples/axum-activity-feed.rs b/examples/axum-activity-feed.rs index d3435d2..019a97c 100644 --- a/examples/axum-activity-feed.rs +++ b/examples/axum-activity-feed.rs @@ -12,24 +12,9 @@ use { prelude::{ElementPatchMode, PatchElements, PatchSignals}, }, serde::{Deserialize, Serialize}, - std::sync::Arc, - tokio::sync::RwLock, tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}, }; -pub struct State { - pub feed: Vec, - pub count: Count, -} - -pub struct Count { - pub all: u32, - pub done: u32, - pub warn: u32, - pub fail: u32, - pub info: u32, -} - #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::registry() @@ -41,33 +26,25 @@ async fn main() -> Result<(), Box> { .with(tracing_subscriber::fmt::layer()) .init(); - let state = Arc::new(RwLock::new(State { - feed: Vec::new(), - count: Count { - all: 0, - done: 0, - warn: 0, - fail: 0, - info: 0, - }, - })); - - let state_generate = state.clone(); - let state_info = state.clone(); - let state_done = state.clone(); - let state_warn = state.clone(); - let state_fail = state.clone(); - - let event = { move |level, state| async move { event(level, state).await } }; - let generate = { move |signals| async move { generate(signals, state_generate).await } }; - let app = Router::new() .route("/", get(index)) .route("/event/generate", post(generate)) - .route("/event/info", post(move || event(Status::Info, state_info))) - .route("/event/done", post(move || event(Status::Done, state_done))) - .route("/event/warn", post(move || event(Status::Warn, state_warn))) - .route("/event/fail", post(move || event(Status::Fail, state_fail))); + .route( + "/event/info", + post(move |signals| event(Status::Info, signals)), + ) + .route( + "/event/done", + post(move |signals| event(Status::Done, signals)), + ) + .route( + "/event/warn", + post(move |signals| event(Status::Warn, signals)), + ) + .route( + "/event/fail", + post(move |signals| event(Status::Fail, signals)), + ); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await @@ -89,6 +66,11 @@ pub struct Signals { pub interval: u64, pub events: u64, pub generating: bool, + pub total: u64, + pub done: u64, + pub warn: u64, + pub fail: u64, + pub info: u64, } #[derive(Clone, PartialEq, Eq, Deserialize)] @@ -99,38 +81,35 @@ pub enum Status { Fail, } -async fn generate( - ReadSignals(signals): ReadSignals, - state: Arc>, -) -> impl IntoResponse { +async fn generate(ReadSignals(signals): ReadSignals) -> impl IntoResponse { Sse::new(stream! { - let elements = r#"
"#.to_string(); - let patch = PatchElements::new(elements); - let sse_event = patch.write_as_axum_sse_event(); - yield Ok::<_, Infallible>(sse_event); - - let signals_generating = serde_json::to_string(&Signals{ - generating: true, - interval: signals.interval, - events: signals.events, - }).unwrap(); - let patch = PatchSignals::new(signals_generating); - let sse_event = patch.write_as_axum_sse_event(); - yield Ok::<_, Infallible>(sse_event); - + let mut total = signals.total; + let mut done = signals.done; for _ in 1..=signals.events { - let mut state = state.write().await; - let elements = event_entry(&mut state, &Status::Done, "Auto"); + total += 1; + done += 1; + let elements = event_entry(total, &Status::Done, "Auto"); let patch = PatchElements::new(elements).selector("#feed").mode(ElementPatchMode::After); let sse_event = patch.write_as_axum_sse_event(); + yield Ok::<_, Infallible>(sse_event); + let signals_generating = serde_json::to_string(&Signals{ + generating: true, + total, + done, + ..signals + }).unwrap(); + let patch = PatchSignals::new(signals_generating); + let sse_event = patch.write_as_axum_sse_event(); + yield Ok::<_, Infallible>(sse_event); tokio::time::sleep(Duration::from_millis(signals.interval)).await; } let signals_done = serde_json::to_string(&Signals{ generating: false, - interval: signals.interval, - events: signals.events, + total, + done, + ..signals }).unwrap(); let patch = PatchSignals::new(signals_done); let sse_event = patch.write_as_axum_sse_event(); @@ -138,49 +117,65 @@ async fn generate( }) } -async fn event(status: Status, state: Arc>) -> impl IntoResponse { +async fn event(status: Status, ReadSignals(signals): ReadSignals) -> impl IntoResponse { Sse::new(stream! { - let mut state = state.write().await; - let elements = event_entry(&mut state, &status, "Manual"); + let total = signals.total + 1; + let mut done = signals.done; + let mut warn = signals.warn; + let mut fail = signals.fail; + let mut info = signals.info; + match status { + Status::Done => done += 1, + Status::Warn => warn += 1, + Status::Fail => fail += 1, + Status::Info => info += 1, + } + let elements = event_entry(total, &status, "Manual"); let patch = PatchElements::new(elements).selector("#feed").mode(ElementPatchMode::After); let sse_event = patch.write_as_axum_sse_event(); - yield Ok::<_, Infallible>(sse_event); + + let signals = serde_json::to_string(&Signals { + total, + done, + warn, + fail, + info, + ..signals + }).unwrap(); + let patch = PatchSignals::new(signals); + let sse_signal = patch.write_as_axum_sse_event(); + yield Ok::<_, Infallible>(sse_signal); }) } -fn event_entry(state: &mut State, status: &Status, prefix: &str) -> String { - state.count.all += 1; +fn event_entry(index: u64, status: &Status, prefix: &str) -> String { let timestamp = chrono::Utc::now() .format("%Y-%m-%d %H:%M:%S%.3f") .to_string(); match status { Status::Done => { - state.count.done += 1; format!( "
{} [ ✅ Done ] {} event {}
", - state.count.all, timestamp, prefix, state.count.all + index, timestamp, prefix, index ) } Status::Warn => { - state.count.warn += 1; format!( "
{} [ ⚠️ Warn ] {} event {}
", - state.count.all, timestamp, prefix, state.count.all + index, timestamp, prefix, index ) } Status::Fail => { - state.count.fail += 1; format!( "
{} [ ❌ Fail ] {} event {}
", - state.count.all, timestamp, prefix, state.count.all + index, timestamp, prefix, index ) } Status::Info => { - state.count.info += 1; format!( "
{} [ ℹ️ Info ] {} event {}
", - state.count.all, timestamp, prefix, state.count.all + index, timestamp, prefix, index ) } } From 7cc13a7a14ade301c52e768942353c32f0d5dada Mon Sep 17 00:00:00 2001 From: Ryan Eno Date: Mon, 28 Jul 2025 14:17:47 -0700 Subject: [PATCH 3/6] Make signal patches partial --- examples/axum-activity-feed.rs | 59 ++++++++++++++++------------------ 1 file changed, 28 insertions(+), 31 deletions(-) diff --git a/examples/axum-activity-feed.rs b/examples/axum-activity-feed.rs index 019a97c..2dfbd0f 100644 --- a/examples/axum-activity-feed.rs +++ b/examples/axum-activity-feed.rs @@ -85,6 +85,11 @@ async fn generate(ReadSignals(signals): ReadSignals) -> impl IntoRespon Sse::new(stream! { let mut total = signals.total; let mut done = signals.done; + let interval = signals.interval; + let patch = PatchSignals::new(format!(r#"{{"generating": true}}"#)); + let sse_event = patch.write_as_axum_sse_event(); + yield Ok::<_, Infallible>(sse_event); + for _ in 1..=signals.events { total += 1; done += 1; @@ -92,26 +97,14 @@ async fn generate(ReadSignals(signals): ReadSignals) -> impl IntoRespon let patch = PatchElements::new(elements).selector("#feed").mode(ElementPatchMode::After); let sse_event = patch.write_as_axum_sse_event(); yield Ok::<_, Infallible>(sse_event); - let signals_generating = serde_json::to_string(&Signals{ - generating: true, - total, - done, - ..signals - }).unwrap(); - let patch = PatchSignals::new(signals_generating); - let sse_event = patch.write_as_axum_sse_event(); + let patch = PatchSignals::new(format!(r#"{{"total": {total}, "done": {done}}}"#)); + let sse_event = patch.write_as_axum_sse_event(); yield Ok::<_, Infallible>(sse_event); - tokio::time::sleep(Duration::from_millis(signals.interval)).await; + tokio::time::sleep(Duration::from_millis(interval)).await; } - let signals_done = serde_json::to_string(&Signals{ - generating: false, - total, - done, - ..signals - }).unwrap(); - let patch = PatchSignals::new(signals_done); + let patch = PatchSignals::new(format!(r#"{{"generating": false}}"#)); let sse_event = patch.write_as_axum_sse_event(); yield Ok::<_, Infallible>(sse_event); }) @@ -124,26 +117,30 @@ async fn event(status: Status, ReadSignals(signals): ReadSignals) -> im let mut warn = signals.warn; let mut fail = signals.fail; let mut info = signals.info; - match status { - Status::Done => done += 1, - Status::Warn => warn += 1, - Status::Fail => fail += 1, - Status::Info => info += 1, - } + let signal = match status { + Status::Done => { + done += 1; + format!(r#"{{"total": {total}, "done": {done}}}"#) + } + Status::Warn => { + warn += 1; + format!(r#"{{"total": {total}, "warn": {warn}}}"#) + } + Status::Fail => { + fail += 1; + format!(r#"{{"total": {total}, "fail": {fail}}}"#) + } + Status::Info => { + info += 1; + format!(r#"{{"total": {total}, "info": {info}}}"#) + } + }; let elements = event_entry(total, &status, "Manual"); let patch = PatchElements::new(elements).selector("#feed").mode(ElementPatchMode::After); let sse_event = patch.write_as_axum_sse_event(); yield Ok::<_, Infallible>(sse_event); - let signals = serde_json::to_string(&Signals { - total, - done, - warn, - fail, - info, - ..signals - }).unwrap(); - let patch = PatchSignals::new(signals); + let patch = PatchSignals::new(signal); let sse_signal = patch.write_as_axum_sse_event(); yield Ok::<_, Infallible>(sse_signal); }) From 1d32a9e3a2ffdd2b4d7a1a3a2ce8a235a2f00584 Mon Sep 17 00:00:00 2001 From: Ryan Eno Date: Mon, 28 Jul 2025 18:28:36 -0700 Subject: [PATCH 4/6] UX refinements --- examples/activity-feed.html | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/examples/activity-feed.html b/examples/activity-feed.html index b71dd55..b67afe1 100644 --- a/examples/activity-feed.html +++ b/examples/activity-feed.html @@ -10,14 +10,6 @@
@@ -38,19 +30,21 @@

- +
Generate @@ -100,23 +98,23 @@ Click Generate to create 200 events, 10 milliseconds apart.
- + Total: 0 | - + Done: 0 | - + Warn: 0 | - Fail: 0 + + Fail: 0 | - + Info: 0
From 227e6c1073e7d8c3caf9818293471402185490f0 Mon Sep 17 00:00:00 2001 From: Ryan Eno Date: Mon, 28 Jul 2025 19:52:22 -0700 Subject: [PATCH 5/6] Update formatting --- examples/activity-feed.html | 45 +++++++++---------------------------- 1 file changed, 11 insertions(+), 34 deletions(-) diff --git a/examples/activity-feed.html b/examples/activity-feed.html index b67afe1..87069a3 100644 --- a/examples/activity-feed.html +++ b/examples/activity-feed.html @@ -13,21 +13,10 @@ class="bg-white dark:bg-gray-800 text-gray-500 dark:text-gray-400 rounded-lg px-6 py-8 ring shadow-xl ring-gray-900/5 space-y-2" >
-

- Datastar SDK Activity Feed Demo -

- Rocket +

Datastar SDK Activity Feed Demo

+ Rocket
-

- SSE events will be streamed from the backend to the frontend. -

+

SSE events will be streamed from the backend to the frontend.

-
+
Click Generate to create - 200 events, - 10 milliseconds apart.
- - Total: 0 - + 200 events, 10 milliseconds + apart.
+ Total: 0 | - - Done: 0 - + Done: 0 | - - Warn: 0 - + Warn: 0 | - - Fail: 0 - + Fail: 0 | - - Info: 0 - + Info: 0
-------------------------------------------------------------
From c2e13bde8fc56226ba03af44d585e26307c24421 Mon Sep 17 00:00:00 2001 From: Ryan Eno Date: Mon, 28 Jul 2025 21:24:17 -0700 Subject: [PATCH 6/6] Refactoring and comments --- examples/axum-activity-feed.rs | 176 ++++++++++++++------------------- 1 file changed, 76 insertions(+), 100 deletions(-) diff --git a/examples/axum-activity-feed.rs b/examples/axum-activity-feed.rs index 2dfbd0f..e3ecc52 100644 --- a/examples/axum-activity-feed.rs +++ b/examples/axum-activity-feed.rs @@ -2,6 +2,7 @@ use { async_stream::stream, axum::{ Router, + extract::Path, response::{Html, IntoResponse, Sse}, routing::{get, post}, }, @@ -15,6 +16,34 @@ use { tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}, }; +/// All `data-signals-*` defined in activity-feed.html +#[derive(Serialize, Deserialize)] +pub struct Signals { + // Form inputs + pub interval: u64, + pub events: u64, + // Activity flags + pub generating: bool, + // Output counters + pub total: u64, + pub done: u64, + pub warn: u64, + pub fail: u64, + pub info: u64, +} + +/// All valid event statuses. +// Normalizing variants to lowercase allows parsing routes from `/event/{status}` +// with a `Path` extractor. +#[derive(Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum Status { + Done, + Fail, + Info, + Warn, +} + #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::registry() @@ -29,151 +58,98 @@ async fn main() -> Result<(), Box> { let app = Router::new() .route("/", get(index)) .route("/event/generate", post(generate)) - .route( - "/event/info", - post(move |signals| event(Status::Info, signals)), - ) - .route( - "/event/done", - post(move |signals| event(Status::Done, signals)), - ) - .route( - "/event/warn", - post(move |signals| event(Status::Warn, signals)), - ) - .route( - "/event/fail", - post(move |signals| event(Status::Fail, signals)), - ); + .route("/event/{status}", post(event)); - let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") - .await - .unwrap(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?; - tracing::debug!("listening on {}", listener.local_addr().unwrap()); + tracing::debug!("listening on {}", listener.local_addr()?); - axum::serve(listener, app).await.unwrap(); + axum::serve(listener, app).await?; Ok(()) } +/// Simple handler returning a static HTML page async fn index() -> Html<&'static str> { Html(include_str!("activity-feed.html")) } -#[derive(Serialize, Deserialize)] -pub struct Signals { - pub interval: u64, - pub events: u64, - pub generating: bool, - pub total: u64, - pub done: u64, - pub warn: u64, - pub fail: u64, - pub info: u64, -} - -#[derive(Clone, PartialEq, Eq, Deserialize)] -pub enum Status { - Info, - Done, - Warn, - Fail, -} - +/// Generates a number of "done" events with a specified interval. async fn generate(ReadSignals(signals): ReadSignals) -> impl IntoResponse { + // Values we will update in a loop + let mut total = signals.total; + let mut done = signals.done; + + // Start the SSE stream Sse::new(stream! { - let mut total = signals.total; - let mut done = signals.done; - let interval = signals.interval; + // Signal event generation start let patch = PatchSignals::new(format!(r#"{{"generating": true}}"#)); let sse_event = patch.write_as_axum_sse_event(); yield Ok::<_, Infallible>(sse_event); + // Yield the events elements and signals to the stream for _ in 1..=signals.events { total += 1; done += 1; - let elements = event_entry(total, &Status::Done, "Auto"); + // Append a new entry to the activity feed + let elements = event_entry(&Status::Done, total, "Auto"); let patch = PatchElements::new(elements).selector("#feed").mode(ElementPatchMode::After); let sse_event = patch.write_as_axum_sse_event(); yield Ok::<_, Infallible>(sse_event); + // Update the event counts let patch = PatchSignals::new(format!(r#"{{"total": {total}, "done": {done}}}"#)); let sse_event = patch.write_as_axum_sse_event(); yield Ok::<_, Infallible>(sse_event); - tokio::time::sleep(Duration::from_millis(interval)).await; + tokio::time::sleep(Duration::from_millis(signals.interval)).await; } + // Signal event generation end let patch = PatchSignals::new(format!(r#"{{"generating": false}}"#)); let sse_event = patch.write_as_axum_sse_event(); yield Ok::<_, Infallible>(sse_event); }) } -async fn event(status: Status, ReadSignals(signals): ReadSignals) -> impl IntoResponse { +/// Creates one event with a given status +async fn event( + Path(status): Path, + ReadSignals(signals): ReadSignals, +) -> impl IntoResponse { + // Create the event stream, since we're patching both an element and a signal. Sse::new(stream! { + // Signal the updated event counts let total = signals.total + 1; - let mut done = signals.done; - let mut warn = signals.warn; - let mut fail = signals.fail; - let mut info = signals.info; - let signal = match status { - Status::Done => { - done += 1; - format!(r#"{{"total": {total}, "done": {done}}}"#) - } - Status::Warn => { - warn += 1; - format!(r#"{{"total": {total}, "warn": {warn}}}"#) - } - Status::Fail => { - fail += 1; - format!(r#"{{"total": {total}, "fail": {fail}}}"#) - } - Status::Info => { - info += 1; - format!(r#"{{"total": {total}, "info": {info}}}"#) - } + let signals = match status { + Status::Done => format!(r#"{{"total": {total}, "done": {}}}"#, signals.done + 1), + Status::Warn => format!(r#"{{"total": {total}, "warn": {}}}"#, signals.warn + 1), + Status::Fail => format!(r#"{{"total": {total}, "fail": {}}}"#, signals.fail + 1), + Status::Info => format!(r#"{{"total": {total}, "info": {}}}"#, signals.info + 1), }; - let elements = event_entry(total, &status, "Manual"); + let patch = PatchSignals::new(signals); + let sse_signal = patch.write_as_axum_sse_event(); + yield Ok::<_, Infallible>(sse_signal); + + // Patch an element and append it to the feed + let elements = event_entry(&status, total, "Manual"); let patch = PatchElements::new(elements).selector("#feed").mode(ElementPatchMode::After); let sse_event = patch.write_as_axum_sse_event(); yield Ok::<_, Infallible>(sse_event); - - let patch = PatchSignals::new(signal); - let sse_signal = patch.write_as_axum_sse_event(); - yield Ok::<_, Infallible>(sse_signal); }) } -fn event_entry(index: u64, status: &Status, prefix: &str) -> String { +/// Returns an HTML string for the entry +fn event_entry(status: &Status, index: u64, source: &str) -> String { let timestamp = chrono::Utc::now() .format("%Y-%m-%d %H:%M:%S%.3f") .to_string(); - match status { - Status::Done => { - format!( - "
{} [ ✅ Done ] {} event {}
", - index, timestamp, prefix, index - ) - } - Status::Warn => { - format!( - "
{} [ ⚠️ Warn ] {} event {}
", - index, timestamp, prefix, index - ) - } - Status::Fail => { - format!( - "
{} [ ❌ Fail ] {} event {}
", - index, timestamp, prefix, index - ) - } - Status::Info => { - format!( - "
{} [ ℹ️ Info ] {} event {}
", - index, timestamp, prefix, index - ) - } - } + let (color, indicator) = match status { + Status::Done => ("green", "✅ Done"), + Status::Warn => ("yellow", "⚠️ Warn"), + Status::Fail => ("red", "❌ Fail"), + Status::Info => ("blue", "ℹ️ Info"), + }; + format!( + "
{timestamp} [ {indicator} ] {source} event {index}
" + ) }