Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5bf4a9d
Organize spec and design documents into docs/superpowers
aram356 Mar 25, 2026
deffcec
Enable superpowers and chrome-devtools plugins
aram356 Mar 25, 2026
baefae8
Add streaming response optimization spec for non-Next.js paths
aram356 Mar 25, 2026
de8dbfd
Expand testing strategy with Chrome DevTools MCP performance measurement
aram356 Mar 25, 2026
fa74167
Clarify flush vs drop behavior in process_through_compression
aram356 Mar 25, 2026
221d971
Add implementation plan for streaming response optimization
aram356 Mar 25, 2026
42b7b07
Merge branch 'main' into specs/streaming-response-optimization
aram356 Mar 26, 2026
6968201
Fix encoder finalization: explicit finish instead of drop
aram356 Mar 26, 2026
a4fd5c6
Convert process_gzip_to_gzip to chunk-based processing
aram356 Mar 26, 2026
a4f4a7c
Convert decompress_and_process to chunk-based processing
aram356 Mar 26, 2026
105244c
Rewrite HtmlRewriterAdapter for incremental lol_html streaming
aram356 Mar 26, 2026
d72669c
Unify compression paths into single process_chunks method
aram356 Mar 26, 2026
80e51d4
Update plan with compression refactor implementation note
aram356 Mar 26, 2026
c505c00
Accumulate output for post-processors in HtmlWithPostProcessing
aram356 Mar 26, 2026
6cae7f9
Add streaming response optimization spec for non-Next.js paths
aram356 Mar 25, 2026
930a584
Address spec review: Content-Length, streaming gate, finalization ord…
aram356 Mar 25, 2026
a2b71bf
Address deep review: header timing, error phases, process_response_st…
aram356 Mar 25, 2026
b363e56
Address deep review: remove fastly::init, fix API assumptions, add mi…
aram356 Mar 25, 2026
13366f8
Merge branch 'main' into feature/streaming-pipeline-phase1
aram356 Mar 26, 2026
b83f61c
Apply rustfmt formatting to streaming_processor
aram356 Mar 26, 2026
aeca9f6
Add debug logging, brotli round-trip test, and post-processor accumul…
aram356 Mar 26, 2026
e1c6cb8
Address deep review: imports, stale comments, brotli finalization, tests
aram356 Mar 26, 2026
9753026
Address second deep review: correctness, docs, and test robustness
aram356 Mar 26, 2026
0a4ece7
Add active post-processor test and precise flush docs per codec
aram356 Mar 26, 2026
68d11e8
Fix text node fragmentation regression for script rewriters
aram356 Mar 26, 2026
6faeea0
Gate streaming adapter on script rewriter presence
aram356 Mar 26, 2026
73c992e
Document text node fragmentation workaround and Phase 3 plan
aram356 Mar 26, 2026
75f455a
Add buffered mode guard, anti-fragmentation test, and fix stale spec
aram356 Mar 26, 2026
94f238a
Address PR review feedback on streaming response spec
aram356 Mar 31, 2026
1f2091d
Move EC coordination note to Phase 2 / Step 2 level
aram356 Mar 31, 2026
d049915
Merge branch 'main' into specs/streaming-response-optimization
aram356 Mar 31, 2026
d00fc5d
Formatting
aram356 Mar 31, 2026
76f8965
Merge branch 'specs/streaming-response-optimization' into feature/str…
aram356 Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 198 additions & 10 deletions crates/trusted-server-core/src/html_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,22 @@ use crate::settings::Settings;
use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor};
use crate::tsjs;

/// Wraps [`HtmlRewriterAdapter`] with optional post-processing.
///
/// When `post_processors` is empty (the common streaming path), chunks pass
/// through immediately with no extra copying. When post-processors are
/// registered, intermediate output is accumulated in `accumulated_output`
/// until `is_last`, then post-processors run on the full document. This adds
/// an extra copy per chunk compared to the pre-streaming adapter (which
/// accumulated raw input instead of rewriter output). The overhead is
/// acceptable because the post-processor path is already fully buffered —
/// the real streaming win comes from the empty-post-processor path in Phase 2.
struct HtmlWithPostProcessing {
inner: HtmlRewriterAdapter,
post_processors: Vec<Arc<dyn IntegrationHtmlPostProcessor>>,
/// Buffer that accumulates all intermediate output when post-processors
/// need the full document. Left empty on the streaming-only path.
accumulated_output: Vec<u8>,
origin_host: String,
request_host: String,
request_scheme: String,
Expand All @@ -29,12 +42,26 @@ struct HtmlWithPostProcessing {
impl StreamProcessor for HtmlWithPostProcessing {
fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result<Vec<u8>, io::Error> {
let output = self.inner.process_chunk(chunk, is_last)?;
if !is_last || output.is_empty() || self.post_processors.is_empty() {

// Streaming-optimized path: no post-processors, pass through immediately.
if self.post_processors.is_empty() {
return Ok(output);
}

let Ok(output_str) = std::str::from_utf8(&output) else {
return Ok(output);
// Post-processors need the full document. Accumulate until the last chunk.
self.accumulated_output.extend_from_slice(&output);
if !is_last {
return Ok(Vec::new());
}

// Final chunk: run post-processors on the full accumulated output.
let full_output = std::mem::take(&mut self.accumulated_output);
if full_output.is_empty() {
return Ok(full_output);
}

let Ok(output_str) = std::str::from_utf8(&full_output) else {
return Ok(full_output);
};

let ctx = IntegrationHtmlContext {
Expand All @@ -50,10 +77,10 @@ impl StreamProcessor for HtmlWithPostProcessing {
.iter()
.any(|p| p.should_process(output_str, &ctx))
{
return Ok(output);
return Ok(full_output);
}

let mut html = String::from_utf8(output).map_err(|e| {
let mut html = String::from_utf8(full_output).map_err(|e| {
io::Error::other(format!(
"HTML post-processing expected valid UTF-8 output: {e}"
))
Expand All @@ -77,10 +104,11 @@ impl StreamProcessor for HtmlWithPostProcessing {
Ok(html.into_bytes())
}

fn reset(&mut self) {
self.inner.reset();
self.document_state.clear();
}
/// No-op. `HtmlWithPostProcessing` wraps a single-use
/// [`HtmlRewriterAdapter`] that cannot be reset. Clearing auxiliary
/// state without resetting the rewriter would leave the processor
/// in an inconsistent state, so this method intentionally does nothing.
fn reset(&mut self) {}
}

/// Configuration for HTML processing
Expand Down Expand Up @@ -427,6 +455,7 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso
}),
];

let has_script_rewriters = !script_rewriters.is_empty();
for script_rewriter in script_rewriters {
let selector = script_rewriter.selector();
let rewriter = script_rewriter.clone();
Expand Down Expand Up @@ -464,9 +493,21 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso
..RewriterSettings::default()
};

// Use buffered mode when script rewriters are registered. lol_html fragments
// text nodes across input chunk boundaries, breaking rewriters that expect
// complete text (e.g., __NEXT_DATA__, GTM). Buffered mode feeds the entire
// document in one write() call, preserving text node integrity.
// Phase 3 will make rewriters fragment-safe, enabling streaming for all configs.
let inner = if has_script_rewriters {
HtmlRewriterAdapter::new_buffered(rewriter_settings)
} else {
HtmlRewriterAdapter::new(rewriter_settings)
};

HtmlWithPostProcessing {
inner: HtmlRewriterAdapter::new(rewriter_settings),
inner,
post_processors,
accumulated_output: Vec::new(),
origin_host: config.origin_host,
request_host: config.request_host,
request_scheme: config.request_scheme,
Expand Down Expand Up @@ -991,4 +1032,151 @@ mod tests {
.collect::<String>()
);
}

#[test]
fn post_processors_accumulate_while_streaming_path_passes_through() {
    use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor};
    use lol_html::Settings;

    /// Builds a processor wired to a fresh rewriter with an empty host/request
    /// context and the given post-processor set.
    fn make_processor(
        post_processors: Vec<Arc<dyn IntegrationHtmlPostProcessor>>,
    ) -> HtmlWithPostProcessing {
        HtmlWithPostProcessing {
            inner: HtmlRewriterAdapter::new(Settings::default()),
            post_processors,
            accumulated_output: Vec::new(),
            origin_host: String::new(),
            request_host: String::new(),
            request_scheme: String::new(),
            document_state: IntegrationDocumentState::default(),
        }
    }

    // --- Streaming path: no post-processors, so each chunk's output is
    // emitted immediately rather than buffered. ---
    let mut passthrough = make_processor(Vec::new());

    let head = passthrough
        .process_chunk(b"<html><body>", false)
        .expect("should process chunk1");
    let middle = passthrough
        .process_chunk(b"<p>hello</p>", false)
        .expect("should process chunk2");
    let tail = passthrough
        .process_chunk(b"</body></html>", true)
        .expect("should process final chunk");

    assert!(
        !head.is_empty() || !middle.is_empty(),
        "should emit intermediate output on streaming path"
    );

    let streaming_all: Vec<u8> = [head, middle, tail].concat();

    // --- Buffered path: a registered post-processor forces accumulation of
    // all rewriter output until the final chunk arrives. ---
    struct NoopPostProcessor;
    impl IntegrationHtmlPostProcessor for NoopPostProcessor {
        fn integration_id(&self) -> &'static str {
            "test-noop"
        }
        fn post_process(&self, _html: &mut String, _ctx: &IntegrationHtmlContext<'_>) -> bool {
            false
        }
    }

    let mut buffered = make_processor(vec![Arc::new(NoopPostProcessor)]);

    let first = buffered
        .process_chunk(b"<html><body>", false)
        .expect("should process chunk1");
    let second = buffered
        .process_chunk(b"<p>hello</p>", false)
        .expect("should process chunk2");
    let last = buffered
        .process_chunk(b"</body></html>", true)
        .expect("should process final chunk");

    assert!(
        first.is_empty() && second.is_empty(),
        "should return empty for intermediate chunks when post-processors are registered"
    );
    assert!(
        !last.is_empty(),
        "should emit all output in final chunk when post-processors are registered"
    );

    // With a no-op post-processor, the buffered path must reproduce the
    // streaming path's bytes exactly.
    let streaming_str =
        String::from_utf8(streaming_all).expect("streaming output should be valid UTF-8");
    let buffered_str = String::from_utf8(last).expect("buffered output should be valid UTF-8");
    assert_eq!(
        streaming_str, buffered_str,
        "streaming and buffered paths should produce identical output"
    );
}

#[test]
fn active_post_processor_receives_full_document_and_mutates_output() {
    use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor};
    use lol_html::Settings;

    /// Post-processor that activates only once the closing `</html>` tag is
    /// visible (i.e. it saw the complete document) and appends a marker.
    struct AppendCommentProcessor;
    impl IntegrationHtmlPostProcessor for AppendCommentProcessor {
        fn integration_id(&self) -> &'static str {
            "test-append"
        }
        fn should_process(&self, html: &str, _ctx: &IntegrationHtmlContext<'_>) -> bool {
            html.contains("</html>")
        }
        fn post_process(&self, html: &mut String, _ctx: &IntegrationHtmlContext<'_>) -> bool {
            html.push_str("<!-- processed -->");
            true
        }
    }

    let mut subject = HtmlWithPostProcessing {
        inner: HtmlRewriterAdapter::new(Settings::default()),
        post_processors: vec![Arc::new(AppendCommentProcessor)],
        accumulated_output: Vec::new(),
        origin_host: String::new(),
        request_host: String::new(),
        request_scheme: String::new(),
        document_state: IntegrationDocumentState::default(),
    };

    // Feed the document across three writes so accumulation is exercised.
    let first = subject
        .process_chunk(b"<html><body>", false)
        .expect("should process chunk1");
    let second = subject
        .process_chunk(b"<p>content</p>", false)
        .expect("should process chunk2");
    let last = subject
        .process_chunk(b"</body></html>", true)
        .expect("should process final chunk");

    // Non-final chunks must be withheld while a post-processor is registered.
    assert!(
        first.is_empty() && second.is_empty(),
        "should buffer intermediate chunks"
    );

    // The final emission carries the whole document plus the mutation.
    let output = String::from_utf8(last).expect("should be valid UTF-8");
    assert!(
        output.contains("<p>content</p>"),
        "should contain original content"
    );
    assert!(
        output.contains("</html>"),
        "should contain complete document"
    );
    assert!(
        output.contains("<!-- processed -->"),
        "should contain post-processor mutation"
    );
}
}
Loading
Loading