diff --git a/src/agent/infra/agent/service-initializer.ts b/src/agent/infra/agent/service-initializer.ts index 6b470f18b..7f27fdc76 100644 --- a/src/agent/infra/agent/service-initializer.ts +++ b/src/agent/infra/agent/service-initializer.ts @@ -193,12 +193,18 @@ export async function createCipherAgentServices( basePath: promptsBasePath, validateConfig: true, }) - // Register default contributors + // Register default contributors. + // + // Note: dateTime is intentionally NOT in the system prompt. Anthropic + // prompt caching does token-level prefix matching, so a per-iteration + // refreshed timestamp here would invalidate the cache for everything + // past it. dateTime is instead injected into the first user message + // by AgentLLMService, where it lives after the cache breakpoints and + // does not poison the cached prefix. systemPromptManager.registerContributors([ {enabled: true, filepath: 'system-prompt.yml', id: 'base', priority: 0, type: 'file'}, {enabled: true, id: 'env', priority: 10, type: 'environment'}, {enabled: true, id: 'memories', priority: 20, type: 'memory'}, - {enabled: true, id: 'datetime', priority: 30, type: 'dateTime'}, ]) // Register context tree structure contributor for query/curate commands diff --git a/src/agent/infra/llm/agent-llm-service.ts b/src/agent/infra/llm/agent-llm-service.ts index 51802594e..92fdeb98b 100644 --- a/src/agent/infra/llm/agent-llm-service.ts +++ b/src/agent/infra/llm/agent-llm-service.ts @@ -60,6 +60,18 @@ import {type ProcessedOutput, ToolOutputProcessor, type TruncationConfig} from ' /** Target utilization ratio for message tokens (leaves headroom for response) */ const TARGET_MESSAGE_TOKEN_UTILIZATION = 0.7 +/** + * Build a `...\n\n` prefix for a user-message body. + * + * Per-call timestamps must NOT enter the system prompt (they would poison + * the prefix cache). 
They are injected into the user message instead, at + * the boundaries where the model legitimately needs fresh time context: + * the iter-0 input, and after a rolling-checkpoint history clear. + */ +export function buildDateTimePrefix(now: Date = new Date()): string { + return `Current date and time: ${now.toISOString()}\n\n` +} + /** * Result of parallel tool execution (before adding to context). * Contains all information needed to add the result to context in order. @@ -902,8 +914,11 @@ export class AgentLLMService implements ILLMService { this.cachedBasePrompt = basePrompt this.memoryDirtyFlag = false } else { - // Cache hit: reuse base prompt, only refresh the DateTime section - basePrompt = this.refreshDateTime(this.cachedBasePrompt!) + // Cache hit: reuse base prompt verbatim. The cached prompt has no + // dateTime section to refresh — dateTime is injected into the + // first user message instead so the system prefix stays byte-stable + // across iterations and prompt caching can engage cleanly. + basePrompt = this.cachedBasePrompt! } let systemPrompt = basePrompt @@ -944,9 +959,13 @@ export class AgentLLMService implements ILLMService { // Add user message and compress context within mutex lock return this.mutex.withLock(async () => { - // Add user message to context only on the first iteration + // Add user message to context only on the first iteration. The + // dateTime block is prefixed here (not in the system prompt) so + // the cached system prefix stays byte-stable across iterations + // and Anthropic/OpenAI/Google prefix caches can engage cleanly. if (iterationCount === 0) { - await this.contextManager.addUserMessage(textInput, imageData, fileData) + const inputWithDateTime = `${buildDateTimePrefix()}${textInput}` + await this.contextManager.addUserMessage(inputWithDateTime, imageData, fileData) } // Rolling checkpoint: periodically save progress and clear history for RLM commands. 
@@ -1540,8 +1559,12 @@ export class AgentLLMService implements ILLMService { // Clear conversation history await this.contextManager.clearHistory() - // Re-inject continuation prompt with variable reference - const continuationPrompt = [ + // Re-inject continuation prompt with variable reference. + // Prepend the dateTime block: clearHistory wiped the iter-0 user + // message that originally carried it, and the iter-0 guard upstream + // prevents re-injection. Without this, every iteration after the + // first checkpoint loses time context for the rest of the run. + const continuationPrompt = buildDateTimePrefix() + [ `Continue task. Iteration checkpoint at turn ${iterationCount}.`, `Previous progress stored in variable: ${checkpointVar}`, `Original task: ${textInput.slice(0, 200)}${textInput.length > 200 ? '...' : ''}`, @@ -1555,19 +1578,6 @@ export class AgentLLMService implements ILLMService { }) } - /** - * Replace the DateTime section in a cached system prompt with a fresh timestamp. - * DateTimeContributor wraps its output in ... XML tags, - * enabling reliable regex replacement without rebuilding the entire prompt. - * - * @param cachedPrompt - Previously cached system prompt - * @returns Updated prompt with fresh DateTime - */ - private refreshDateTime(cachedPrompt: string): string { - const freshDateTime = `Current date and time: ${new Date().toISOString()}` - return cachedPrompt.replace(/[\S\s]*?<\/dateTime>/, freshDateTime) - } - /** * Check if a rolling checkpoint should trigger. * Triggers every N iterations for curate/query commands, or when token utilization is high. diff --git a/src/agent/infra/llm/generators/ai-sdk-content-generator.ts b/src/agent/infra/llm/generators/ai-sdk-content-generator.ts index 9fd961d42..1d2e66d03 100644 --- a/src/agent/infra/llm/generators/ai-sdk-content-generator.ts +++ b/src/agent/infra/llm/generators/ai-sdk-content-generator.ts @@ -5,7 +5,7 @@ * Replaces per-provider content generators with one unified implementation. 
*/ -import type {LanguageModel} from 'ai' +import type {LanguageModel, ModelMessage} from 'ai' import {generateText, streamText} from 'ai' @@ -22,6 +22,28 @@ import {toAiSdkTools, toModelMessages} from './ai-sdk-message-converter.js' const DEFAULT_CHARS_PER_TOKEN = 4 +/** + * Prepend the system prompt as a system-role message carrying + * `providerOptions.anthropic.cacheControl: ephemeral`. AI SDK's top-level + * `system: string` parameter does not propagate providerOptions, so the + * only way to attach Anthropic cache_control to the system block is to + * pass it through the messages array. Non-Anthropic providers ignore the + * `anthropic` namespace. + */ +export function prependCachedSystemMessage(systemPrompt: string | undefined, messages: ModelMessage[]): ModelMessage[] { + if (!systemPrompt) { + return messages + } + + const systemMessage: ModelMessage = { + content: systemPrompt, + providerOptions: {anthropic: {cacheControl: {type: 'ephemeral'}}}, + role: 'system', + } + + return [systemMessage, ...messages] +} + /** * Configuration for AiSdkContentGenerator. 
*/ @@ -54,7 +76,7 @@ export class AiSdkContentGenerator implements IContentGenerator { } public async generateContent(request: GenerateContentRequest): Promise { - const messages = toModelMessages(request.contents) + const messages = prependCachedSystemMessage(request.systemPrompt, toModelMessages(request.contents)) const tools = toAiSdkTools(request.tools) const result = await generateText({ @@ -63,7 +85,6 @@ export class AiSdkContentGenerator implements IContentGenerator { messages, model: this.model, temperature: request.config.temperature, - ...(request.systemPrompt && {system: request.systemPrompt}), ...(tools && {tools}), ...(request.config.topK !== undefined && {topK: request.config.topK}), ...(request.config.topP !== undefined && {topP: request.config.topP}), @@ -100,7 +121,7 @@ export class AiSdkContentGenerator implements IContentGenerator { } public async *generateContentStream(request: GenerateContentRequest): AsyncGenerator { - const messages = toModelMessages(request.contents) + const messages = prependCachedSystemMessage(request.systemPrompt, toModelMessages(request.contents)) const tools = toAiSdkTools(request.tools) const result = streamText({ @@ -109,7 +130,6 @@ export class AiSdkContentGenerator implements IContentGenerator { messages, model: this.model, temperature: request.config.temperature, - ...(request.systemPrompt && {system: request.systemPrompt}), ...(tools && {tools}), ...(request.config.topK !== undefined && {topK: request.config.topK}), ...(request.config.topP !== undefined && {topP: request.config.topP}), diff --git a/src/agent/infra/llm/generators/ai-sdk-message-converter.ts b/src/agent/infra/llm/generators/ai-sdk-message-converter.ts index 49a10e37e..cd61a8d95 100644 --- a/src/agent/infra/llm/generators/ai-sdk-message-converter.ts +++ b/src/agent/infra/llm/generators/ai-sdk-message-converter.ts @@ -63,18 +63,25 @@ export function toModelMessages(messages: InternalMessage[]): ModelMessage[] { /** * Convert our ToolSet to AI SDK tool 
definitions. * Tools are declared without `execute` — our agentic loop handles execution. + * + * The last tool gets `providerOptions.anthropic.cacheControl: ephemeral`, + * which makes Anthropic cache the entire tool block (and the system prompt + * before it). Non-Anthropic providers ignore the `anthropic` namespace. */ export function toAiSdkTools(tools?: InternalToolSet): Record> | undefined { if (!tools || Object.keys(tools).length === 0) { return undefined } + const entries = Object.entries(tools) const result: Record> = {} - for (const [name, def] of Object.entries(tools)) { + for (const [index, [name, def]] of entries.entries()) { + const isLast = index === entries.length - 1 result[name] = aiSdkTool({ description: def.description ?? '', inputSchema: jsonSchema(def.parameters as Record), + ...(isLast && {providerOptions: {anthropic: {cacheControl: {type: 'ephemeral'}}}}), }) } diff --git a/src/agent/infra/map/abstract-generator.ts b/src/agent/infra/map/abstract-generator.ts index 1576d3723..5788eb5a9 100644 --- a/src/agent/infra/map/abstract-generator.ts +++ b/src/agent/infra/map/abstract-generator.ts @@ -50,6 +50,136 @@ ${content} /** Truncate content before embedding in LLM prompts to avoid exceeding model context windows during bulk ingest. */ const MAX_ABSTRACT_CONTENT_CHARS = 20_000 +/** + * Per-file truncation when N files share a single batched call. Matches the + * non-batched cap (20 KB) so each file gets the same view of its content + * regardless of batched vs per-file mode — total batched user content scales + * linearly with N. Avoids quality regression on long-file curates that batched + * mode would otherwise see. + */ +const MAX_BATCHED_CONTENT_CHARS_PER_FILE = MAX_ABSTRACT_CONTENT_CHARS + +/** L0 batch output budget: 5 files × ~80 tokens + framing tags ≈ 600 tokens. */ +const BATCH_L0_MAX_OUTPUT_TOKENS = 800 + +/** L1 batch output budget: 5 files × ~1500 tokens + framing tags ≈ 8000 tokens. 
*/ +const BATCH_L1_MAX_OUTPUT_TOKENS = 8500 + +/** + * Result from a batched abstract generation. One entry per input item, in + * input order. Empty string fields signal the model failed to produce content + * for that path — the caller's existing fail-open semantics still apply. + */ +export interface BatchedAbstractItem { + abstractContent: string + contextPath: string + overviewContent: string +} + +const BATCHED_ABSTRACT_SYSTEM_PROMPT = `You are a technical documentation assistant. +You produce precise one-line summaries of knowledge documents in a strict XML format. +Output ONLY the XML — no preamble, no commentary, no markdown fences.` + +const BATCHED_OVERVIEW_SYSTEM_PROMPT = `You are a technical documentation assistant. +You produce structured overviews of knowledge documents in a strict XML format. +Output ONLY the XML — no preamble, no commentary, no markdown fences.` + +function escapeXmlAttr(value: string): string { + return value.replaceAll('&', '&').replaceAll('"', '"').replaceAll('<', '<').replaceAll('>', '>') +} + +/** + * Wrap raw file content in a CDATA section so XML/HTML/JSX/markdown that + * mentions `` or `` (perfectly normal for docs that describe + * those formats) cannot terminate the envelope and conflate files. The inner + * `]]>` escape is the standard CDATA-in-CDATA trick: split the sequence so it + * never appears verbatim inside the active section. + */ +function wrapCdata(content: string): string { + return `', ']]]]>')}]]>` +} + +function buildBatchedAbstractPrompt(items: ReadonlyArray<{content: string; contextPath: string;}>): string { + const filesXml = items.map((it) => ` +${wrapCdata(it.content)} +`).join('\n') + + return `For each of the following knowledge documents, produce a ONE-LINE summary (max 80 tokens) that is a complete sentence capturing the core topic and key insight. + +Output format — emit exactly one element per input file, with the same path attribute: +One-line summary. 
+ +Output only these XML elements, in any order. No preamble, no markdown fences. + + +${filesXml} +` +} + +function buildBatchedOverviewPrompt(items: ReadonlyArray<{content: string; contextPath: string;}>): string { + const filesXml = items.map((it) => ` +${wrapCdata(it.content)} +`).join('\n') + + return `For each of the following knowledge documents, produce a structured overview (markdown, under 1500 tokens) that includes: +- Key points (3-7 bullet points) +- Structure / sections summary +- Any notable entities, patterns, or decisions mentioned + +Output format — emit exactly one element per input file, with the same path attribute: + +- bullet 1 +- bullet 2 +... + + +Output only these XML elements, in any order. No preamble, no markdown fences. + + +${filesXml} +` +} + +/** + * Extract ... per from the model output. + * Tolerant: ignores extra whitespace, supports nested newlines inside the inner + * tag. Returns a Map keyed by path. Paths that don't appear are absent. + * + * Anchored on `` openers (not `` closers) so a model + * overview that mentions `` literally in prose — perfectly normal for + * docs about XML, JSX, or build systems — cannot prematurely terminate the + * outer match and orphan the inner tag. Each opener owns the response slice + * up to the next opener (or end-of-string), and the inner regex extracts + * the payload from that slice. + */ +function parseBatchedTags(response: string, innerTag: 'abstract' | 'overview'): Map { + const result = new Map() + const fileOpenerRe = /]*>/g + const innerRe = new RegExp(`<${innerTag}>([\\s\\S]*?)<\\/${innerTag}>`) + + const openers: Array<{bodyStart: number; rawPath: string}> = [] + let m: null | RegExpExecArray + while ((m = fileOpenerRe.exec(response)) !== null) { + openers.push({bodyStart: fileOpenerRe.lastIndex, rawPath: m[1]}) + } + + for (const [i, opener] of openers.entries()) { + // Each opener's slice runs from its end to the start of the next opener + // (or end-of-string). 
Within that slice, the inner regex picks up the + // payload. A literal `` in prose has no special meaning here. + const sliceEnd = i + 1 < openers.length ? openers[i + 1].bodyStart : response.length + const slice = response.slice(opener.bodyStart, sliceEnd) + const inner = innerRe.exec(slice) + if (inner) { + const path = opener.rawPath + .replaceAll('&', '&').replaceAll('"', '"').replaceAll('<', '<').replaceAll('>', '>') + result.set(path, inner[1].trim()) + } + } + + return result +} + /** * Generate L0 abstract and L1 overview for a knowledge file. * @@ -88,3 +218,68 @@ export async function generateFileAbstracts( overviewContent: overviewText.trim(), } } + +/** + * Generate L0 abstracts and L1 overviews for N knowledge files in two batched + * LLM calls (one batch for all L0s, one for all L1s) instead of 2N per-file + * calls. + * + * Two parallel calls; each call carries all input files in an XML envelope + * and the model is instructed to return one element per file. Output is + * parsed by path tag and matched back to the input order. Files the model + * fails to produce content for receive empty strings (caller's existing + * fail-open semantics still apply). + * + * Caller is responsible for capping batch size; this function does not split + * its input. Recommended cap is 5 files per call to keep the L1 batch's + * output budget under ~8K tokens. + */ +export async function generateFileAbstractsBatch( + items: ReadonlyArray<{contextPath: string; fullContent: string}>, + generator: IContentGenerator, +): Promise { + if (items.length === 0) return [] + + // Dedup by contextPath, keeping the LAST occurrence's content. The queue is + // FIFO so later items carry the most recent fullContent — and the disk file + // already reflects that write, so the abstract must summarize the latest + // state rather than an intermediate one. 
Without this dedup, duplicate paths + // emit two `` blocks the model may answer in either order; the + // tag parser keys on path and Map-collapses, leaving non-deterministic + // results for the duplicates. + const byPath = new Map() + for (const it of items) { + byPath.set(it.contextPath, { + content: it.fullContent.slice(0, MAX_BATCHED_CONTENT_CHARS_PER_FILE), + contextPath: it.contextPath, + }) + } + + const truncated = [...byPath.values()] + + const [abstractText, overviewText] = await Promise.all([ + streamToText(generator, { + config: {maxTokens: BATCH_L0_MAX_OUTPUT_TOKENS, temperature: 0}, + contents: [{content: buildBatchedAbstractPrompt(truncated), role: 'user'}], + model: 'default', + systemPrompt: BATCHED_ABSTRACT_SYSTEM_PROMPT, + taskId: randomUUID(), + }), + streamToText(generator, { + config: {maxTokens: BATCH_L1_MAX_OUTPUT_TOKENS, temperature: 0}, + contents: [{content: buildBatchedOverviewPrompt(truncated), role: 'user'}], + model: 'default', + systemPrompt: BATCHED_OVERVIEW_SYSTEM_PROMPT, + taskId: randomUUID(), + }), + ]) + + const abstracts = parseBatchedTags(abstractText, 'abstract') + const overviews = parseBatchedTags(overviewText, 'overview') + + return items.map((it) => ({ + abstractContent: (abstracts.get(it.contextPath) ?? '').trim(), + contextPath: it.contextPath, + overviewContent: (overviews.get(it.contextPath) ?? 
'').trim(), + })) +} diff --git a/src/agent/infra/map/abstract-queue.ts b/src/agent/infra/map/abstract-queue.ts index 05d17aaba..862c7b9a7 100644 --- a/src/agent/infra/map/abstract-queue.ts +++ b/src/agent/infra/map/abstract-queue.ts @@ -1,10 +1,20 @@ import {appendFileSync} from 'node:fs' import {mkdir, writeFile} from 'node:fs/promises' -import {join} from 'node:path' +import {isAbsolute, join} from 'node:path' import type {IContentGenerator} from '../../core/interfaces/i-content-generator.js' -import {generateFileAbstracts} from './abstract-generator.js' +import {generateFileAbstractsBatch} from './abstract-generator.js' + +/** + * Maximum files combined into a single batched L0/L1 LLM call. + * + * Two parallel calls fire per cycle: one L0 batch (~80 tok output × N files + + * tags), one L1 batch (~1500 tok output × N files + tags). At N=5 the L1 + * output budget caps at ~8K tokens; raising N further risks output truncation + * on smaller-context models. Lowering N reduces savings without quality gain. + */ +const BATCH_SIZE_CAP = 5 const QUEUE_TRACE_ENABLED = process.env.BRV_QUEUE_TRACE === '1' const LOG_PATH = process.env.BRV_SESSION_LOG @@ -47,6 +57,13 @@ export interface AbstractQueueStatus { * - drain() waits for all pending/processing items to complete (for graceful shutdown) */ export class AbstractGenerationQueue { + /** + * When true, scheduleNext fires the next batch even if pending is below + * BATCH_SIZE_CAP. Set by drain(); reset once the queue is fully idle. + * Without this, items below the cap would be buffered indefinitely with + * no flush trigger when a curate writes fewer files than the cap. 
+ */ + private drainRequested = false private drainResolvers: Array<() => void> = [] private failed = 0 private generator: IContentGenerator | undefined @@ -71,7 +88,13 @@ export class AbstractGenerationQueue { */ async drain(): Promise { queueLog(`drain:start idle=${this.isIdle()} pending=${this.pending.length} retrying=${this.retrying} processing=${this.processing}`) + // Force any buffered (below-cap) pending items to fire as a final batch. + // scheduleNext respects drainRequested even when pending < BATCH_SIZE_CAP. + this.drainRequested = true + this.scheduleNext() + if (this.isIdle()) { + this.drainRequested = false await this.statusWritePromise.catch(() => {}) queueLog('drain:resolved-immediate') return @@ -88,6 +111,18 @@ export class AbstractGenerationQueue { * Add a file to the abstract generation queue. */ enqueue(item: {contextPath: string; fullContent: string}): void { + // Background batch writes derive .abstract.md / .overview.md from + // contextPath via raw `writeFile`. A relative path would resolve under + // process.cwd() rather than the intended context-tree location, and the + // failure would be invisible because batch errors are catch-suppressed. + // Drop misconfigured items at the entry point with a trace breadcrumb + // rather than failing loudly — callers are internal and treat the queue + // as fail-open. 
+ if (!isAbsolute(item.contextPath)) { + queueLog(`enqueue:dropped non-absolute path=${item.contextPath}`) + return + } + // Guard against paths that must never trigger abstract generation: // - derived artifacts (.abstract.md, .overview.md) — would produce .abstract.abstract.md // - summary index files (_index.md) — domain/topic summaries, not knowledge nodes @@ -105,7 +140,13 @@ export class AbstractGenerationQueue { this.pending.push({attempts: 0, contextPath: item.contextPath, fullContent: item.fullContent}) queueLog(`enqueue path=${item.contextPath} pending=${this.pending.length} retrying=${this.retrying} processing=${this.processing}`) this.queueStatusWrite() - this.scheduleNext() + // Buffer until cap is reached; drain() will trigger the final flush + // for partial batches at curate-end. Without this gating, the first + // enqueue starts a 1-item batch before the curate finishes writing + // the rest of its files. + if (this.pending.length >= BATCH_SIZE_CAP || this.drainRequested) { + this.scheduleNext() + } } /** @@ -143,7 +184,11 @@ export class AbstractGenerationQueue { } private async processNext(): Promise { - if (!this.generator || this.processing || this.pending.length === 0) { + // Capture the generator in a local const so type narrowing survives the + // `await` boundary below — TS won't keep `this.generator` narrow across + // suspensions because another async path could reassign the property. + const {generator} = this + if (!generator || this.processing || this.pending.length === 0) { this.resolveDrainersIfIdle() return } @@ -151,8 +196,15 @@ export class AbstractGenerationQueue { this.processing = true this.queueStatusWrite() - const item = this.pending.shift()! - queueLog(`process:start path=${item.contextPath} remaining=${this.pending.length} retrying=${this.retrying}`) + // Drain up to BATCH_SIZE_CAP items into a single batch. Items beyond the + // cap stay pending for the next cycle. 
Note: `maxAttempts` counts BATCH + // attempts for this item, not individual-call attempts — a transient + // failure on attempt 1 consumes one retry token for every item in the + // batch, including ones whose content was unrelated to the failure. + // Acceptable: batches are small (cap=5) and the per-item re-enqueue on + // batch failure preserves attempts independently across cycles. + const batch = this.pending.splice(0, BATCH_SIZE_CAP) + queueLog(`process:start batchSize=${batch.length} remaining=${this.pending.length} retrying=${this.retrying}`) try { // Refresh credentials before each generation (OAuth tokens may expire) @@ -163,28 +215,45 @@ export class AbstractGenerationQueue { console.debug(`[AbstractQueue] token refresh failed, proceeding with existing generator: ${msg}`) } - const {abstractContent, overviewContent} = await generateFileAbstracts( - item.fullContent, - this.generator, + const results = await generateFileAbstractsBatch( + batch.map((it) => ({contextPath: it.contextPath, fullContent: it.fullContent})), + generator, ) - // Derive sibling paths: replace .md with .abstract.md and .overview.md - const abstractPath = item.contextPath.replace(/\.md$/, '.abstract.md') - const overviewPath = item.contextPath.replace(/\.md$/, '.overview.md') - - await Promise.all([ - writeFile(abstractPath, abstractContent, 'utf8'), - writeFile(overviewPath, overviewContent, 'utf8'), - ]) - - this.processed++ - queueLog(`process:success path=${item.contextPath} processed=${this.processed}`) + // Write all batched outputs in parallel. Empty strings are valid (model + // produced no content for that path) — preserves existing fail-open. 
+ await Promise.all(results.flatMap((r) => { + const abstractPath = r.contextPath.replace(/\.md$/, '.abstract.md') + const overviewPath = r.contextPath.replace(/\.md$/, '.overview.md') + return [ + writeFile(abstractPath, r.abstractContent, 'utf8'), + writeFile(overviewPath, r.overviewContent, 'utf8'), + ] + })) + + this.processed += batch.length + queueLog(`process:success batchSize=${batch.length} processed=${this.processed}`) } catch (error) { + // Batch-level failure → re-enqueue each item individually with its own + // attempts counter, mirroring per-item retry semantics. Items past + // maxAttempts count as failed. const msg = error instanceof Error ? error.message : String(error) - console.debug(`[AbstractQueue] ${item.contextPath} attempt ${item.attempts + 1}/${this.maxAttempts}: ${msg}`) - item.attempts++ - if (item.attempts < this.maxAttempts) { - // Exponential backoff: 500ms, 1000ms, 2000ms, ... + const failedThisCycle: QueueItem[] = [] + const retryThisCycle: QueueItem[] = [] + for (const item of batch) { + item.attempts++ + if (item.attempts < this.maxAttempts) { + retryThisCycle.push(item) + } else { + this.failed++ + failedThisCycle.push(item) + queueLog(`process:failed path=${item.contextPath} failed=${this.failed}`) + } + } + + console.debug(`[AbstractQueue] batch attempt failed (${msg}); retrying=${retryThisCycle.length}, exhausted=${failedThisCycle.length}`) + + for (const item of retryThisCycle) { const delay = 500 * 2 ** (item.attempts - 1) this.retrying++ this.queueStatusWrite() @@ -195,13 +264,10 @@ export class AbstractGenerationQueue { this.queueStatusWrite() this.scheduleNext() }, delay) - } else { - this.failed++ - queueLog(`process:failed path=${item.contextPath} failed=${this.failed}`) } } finally { this.processing = false - queueLog(`process:finally path=${item.contextPath} pending=${this.pending.length} retrying=${this.retrying} processed=${this.processed} failed=${this.failed}`) + queueLog(`process:finally batchSize=${batch.length} 
pending=${this.pending.length} retrying=${this.retrying} processed=${this.processed} failed=${this.failed}`) this.queueStatusWrite() } @@ -220,6 +286,10 @@ export class AbstractGenerationQueue { return } + // Reset drain state once the queue settles — next curate's enqueue burst + // should buffer normally up to BATCH_SIZE_CAP again. + this.drainRequested = false + queueLog(`drain:idle pending=${this.pending.length} retrying=${this.retrying} processed=${this.processed} failed=${this.failed}`) const resolvers = this.drainResolvers.splice(0) const settledStatusWrite = this.statusWritePromise.catch(() => {}) @@ -229,11 +299,22 @@ export class AbstractGenerationQueue { } private scheduleNext(): void { - if (!this.generator || this.processing || this.pending.length === 0) { + if (!this.generator || this.processing) { + return + } + + if (this.pending.length === 0) { this.resolveDrainersIfIdle() return } + // Buffer items below the cap unless drain has been requested (curate-end + // signal). This keeps the queue from firing partial 1-item batches in the + // middle of a multi-file curate. + if (this.pending.length < BATCH_SIZE_CAP && !this.drainRequested) { + return + } + // eslint-disable-next-line no-void setImmediate(() => { void this.processNext() }) } diff --git a/src/agent/infra/system-prompt/contributors/file-contributor.ts b/src/agent/infra/system-prompt/contributors/file-contributor.ts index fc5fd8970..e5fd8c83e 100644 --- a/src/agent/infra/system-prompt/contributors/file-contributor.ts +++ b/src/agent/infra/system-prompt/contributors/file-contributor.ts @@ -150,12 +150,16 @@ export class FileContributor implements SystemPromptContributor { private renderTemplateVariables(template: string, context: ContributorContext): string { let result = template - // Build variables from context + // Build variables from context. + // Note: a `datetime` template variable is intentionally NOT exposed here. 
+ // Per-call timestamps must never enter the system prompt — they would + // poison the prefix cache from that byte onward. The current date/time + // is injected once into the iter-0 user message instead (see + // agent-llm-service.ts). /* eslint-disable camelcase */ const variables: Record = { available_markers: context.availableMarkers ? Object.keys(context.availableMarkers).join(', ') : '', available_tools: context.availableTools?.join(', ') ?? '', - datetime: `Current date and time: ${new Date().toISOString()}`, } /* eslint-enable camelcase */ diff --git a/src/agent/infra/tools/tool-manager.ts b/src/agent/infra/tools/tool-manager.ts index 3ec59230c..63c2523ec 100644 --- a/src/agent/infra/tools/tool-manager.ts +++ b/src/agent/infra/tools/tool-manager.ts @@ -35,6 +35,12 @@ export class ToolManager { /** * Tools allowed for curate operations. * Uses code_exec only - curate operations available via tools.curate() in sandbox. + * + * NOTE: Insertion order is load-bearing for Anthropic prompt caching. + * `toAiSdkTools` attaches `cacheControl: ephemeral` to the LAST tool in + * iteration order, which becomes the cache breakpoint. Reordering this + * list (or the per-call sort in `filterToolsForCommand`) silently shifts + * the breakpoint and can halve cache hit-rate. Append new tools at the end. */ private static readonly CURATE_TOOL_NAMES = [ 'agentic_map', @@ -45,7 +51,10 @@ export class ToolManager { /** * Tools allowed for query operations - only code_exec for programmatic search * All search operations (searchKnowledge, glob, grep, readFile) are available - * via tools.* SDK inside the sandbox + * via tools.* SDK inside the sandbox. + * + * Same insertion-order contract as CURATE_TOOL_NAMES (Anthropic cache + * breakpoint lands on the last tool). 
*/ private static readonly QUERY_TOOL_NAMES = [ 'code_exec', diff --git a/src/server/infra/dream/dream-state-schema.ts b/src/server/infra/dream/dream-state-schema.ts index 1ab7a9fd9..48525f725 100644 --- a/src/server/infra/dream/dream-state-schema.ts +++ b/src/server/infra/dream/dream-state-schema.ts @@ -7,23 +7,40 @@ export const PendingMergeSchema = z.object({ suggestedByDreamId: z.string(), }) +/** + * One entry in the stale-summary queue drained at the next dream cycle. + * `enqueuedAt` is preserved across dedup'd re-enqueues so future telemetry + * (e.g., "oldest waiting path") can read meaningful wait times even though + * no consumer reads it today. + */ +export const StaleSummaryEntrySchema = z.object({ + enqueuedAt: z.number().int().nonnegative(), + // Empty paths indicate a bug at the call site (a malformed diff entry would + // resolve to an empty parent dir); reject them at the schema boundary so + // garbage cannot persist into dream-state.json. + path: z.string().min(1), +}) + export const DreamStateSchema = z.object({ curationsSinceDream: z.number().int().min(0), lastDreamAt: z.string().datetime().nullable(), lastDreamLogId: z.string().nullable(), pendingMerges: z.array(PendingMergeSchema).optional().default([]), + staleSummaryPaths: z.array(StaleSummaryEntrySchema).optional().default([]), totalDreams: z.number().int().min(0), version: z.literal(1), }) export type DreamState = z.infer export type PendingMerge = z.infer +export type StaleSummaryEntry = z.infer export const EMPTY_DREAM_STATE: DreamState = { curationsSinceDream: 0, lastDreamAt: null, lastDreamLogId: null, pendingMerges: [], + staleSummaryPaths: [], totalDreams: 0, version: 1, } diff --git a/src/server/infra/dream/dream-state-service.ts b/src/server/infra/dream/dream-state-service.ts index 44c2b8e53..d5ac8fe79 100644 --- a/src/server/infra/dream/dream-state-service.ts +++ b/src/server/infra/dream/dream-state-service.ts @@ -49,6 +49,56 @@ export class DreamStateService { 
this.stateFilePath = join(opts.baseDir, STATE_FILENAME) } + /** + * Atomic drain — reads the current queue and clears it in a single RMW, + * returning the deduped path list. The caller is responsible for retrying + * (re-enqueueing the returned snapshot) if the downstream work fails. + * + * Atomicity is the load-bearing property: any enqueue that runs after the + * drain returns sees an empty queue, so it always appends a fresh entry + * that survives independently of whether the downstream propagation succeeds + * or fails. Earlier "snapshot + clear-later" approaches lost same-path + * enqueues: the dedup check on enqueue saw the still-present snapshot entry + * and skipped, then `clear()` removed it. + */ + async drainStaleSummaryPaths(): Promise { + let snapshot: string[] = [] + await this.update((state) => { + snapshot = state.staleSummaryPaths.map((e) => e.path) + if (snapshot.length === 0) return state + return {...state, staleSummaryPaths: []} + }) + return snapshot + } + + /** + * Append the given file paths to the stale-summary queue, deduping by path. + * A path already in the queue keeps its original `enqueuedAt` timestamp so + * "how long has this been waiting?" telemetry stays meaningful. + * + * Serialized through {@link update} so concurrent enqueues from parallel + * curate tasks do not lose entries. Empty input is a no-op (no write). + */ + async enqueueStaleSummaryPaths(paths: string[]): Promise { + if (paths.length === 0) return + // Dedup the input itself before checking against the queue — callers may + // pass non-unique arrays (e.g. multiple changed paths within a single + // curate that round-trip through the same parent dir). 
+ const incoming = [...new Set(paths)] + const enqueuedAt = Date.now() + await this.update((state) => { + const existing = new Set(state.staleSummaryPaths.map((e) => e.path)) + const additions = incoming + .filter((p) => !existing.has(p)) + .map((p) => ({enqueuedAt, path: p})) + if (additions.length === 0) return state + return { + ...state, + staleSummaryPaths: [...state.staleSummaryPaths, ...additions], + } + }) + } + /** * Read-modify-write under a per-file mutex. Serializes concurrent increments * from parallel curate tasks within the same agent process so no updates are lost. @@ -61,10 +111,10 @@ export class DreamStateService { try { const raw = await readFile(this.stateFilePath, 'utf8') const parsed = DreamStateSchema.safeParse(JSON.parse(raw)) - if (!parsed.success) return {...EMPTY_DREAM_STATE, pendingMerges: []} + if (!parsed.success) return {...EMPTY_DREAM_STATE} return parsed.data } catch { - return {...EMPTY_DREAM_STATE, pendingMerges: []} + return {...EMPTY_DREAM_STATE} } } @@ -80,7 +130,16 @@ export class DreamStateService { return mutex.withLock(async () => { const state = await this.read() const next = updater(state) - await this.write(next) + // Skip the write when the updater returned the same state reference. + // Existing call sites (drainStaleSummaryPaths on empty queue, + // enqueueStaleSummaryPaths with all-duplicate input) already follow + // this convention by returning `state` unchanged — making the no-op + // contract observable at the disk level avoids a tmpfile + rename on + // every empty drain. + if (next !== state) { + await this.write(next) + } + return next }) } diff --git a/src/server/infra/dream/dream-trigger.ts b/src/server/infra/dream/dream-trigger.ts index df20661a0..06722510d 100644 --- a/src/server/infra/dream/dream-trigger.ts +++ b/src/server/infra/dream/dream-trigger.ts @@ -79,8 +79,12 @@ export class DreamTrigger { } } - // Gate 2: Activity - if (state.curationsSinceDream < minCurations) { + // Gate 2: Activity. 
Bypassed when the stale-summary queue has deferred + // work — leaving entries indefinitely strands `_index.md` regeneration + // in low-activity projects (the very projects ENG-2485 most affects, + // since 1–2 curates over a 12h window otherwise sit under minCurations + // forever). Dream is the canonical drain point; if it has work, run. + if (state.curationsSinceDream < minCurations && state.staleSummaryPaths.length === 0) { return { eligible: false, reason: `Not enough activity (${state.curationsSinceDream} < ${minCurations} curations)`, diff --git a/src/server/infra/executor/curate-executor.ts b/src/server/infra/executor/curate-executor.ts index 9555b9adc..dcba97530 100644 --- a/src/server/infra/executor/curate-executor.ts +++ b/src/server/infra/executor/curate-executor.ts @@ -4,6 +4,7 @@ import type {ICipherAgent} from '../../../agent/core/interfaces/i-cipher-agent.j import type {CurationStatus} from '../../core/domain/entities/curation-status.js' import type {CurateExecuteOptions, ICurateExecutor} from '../../core/interfaces/executor/i-curate-executor.js' +import {recon as reconHelper} from '../../../agent/infra/sandbox/curation-helpers.js' import {BRV_DIR} from '../../constants.js' import {FileValidationError} from '../../core/domain/errors/task-error.js' import { @@ -12,8 +13,9 @@ import { type FileReadResult, } from '../../utils/file-content-reader.js' import {validateFileForCurate} from '../../utils/file-validator.js' +import {FileContextTreeManifestService} from '../context-tree/file-context-tree-manifest-service.js' import {FileContextTreeSnapshotService} from '../context-tree/file-context-tree-snapshot-service.js' -import {propagateSummariesUnderLock} from '../context-tree/propagate-summaries.js' +import {diffStates} from '../context-tree/snapshot-diff.js' import {DreamStateService} from '../dream/dream-state-service.js' import {PreCompactionService} from './pre-compaction/pre-compaction-service.js' @@ -118,12 +120,31 @@ export class CurateExecutor 
implements ICurateExecutor { type: 'string', } - // Inject context, metadata, empty history, and taskId into the TASK session's sandbox + // Pre-pipeline the recon step (deterministic helper) so the agent loop + // doesn't spend its first iteration calling tools.curation.recon. The + // result is injected as a sandbox variable for code-exec access AND + // its key findings are surfaced inline in the prompt so the agent's + // first iteration can proceed directly to extraction. recon is pure + // JS — no LLM judgment is needed for whether to call it; the answer + // is always "yes, first thing." Surfacing it as an agent-tool meant + // paying a full LLM iteration just to invoke a deterministic helper. + const initialHistory = {entries: [], totalProcessed: 0} + // The `metadata` arg is currently unused by `recon` — the helper + // recomputes char/line/message counts from `effectiveContext` + // directly. Passed through here to match the helper's existing + // signature; do NOT assume changing `metadata` will alter + // `reconResult`. + const reconResult = reconHelper(effectiveContext, metadata, initialHistory) + const reconVar = `__recon_result_${taskIdSafe}` + + // Inject context, metadata, empty history, taskId, and pre-computed + // recon result into the TASK session's sandbox. 
const taskIdVar = `__taskId_${taskIdSafe}` agent.setSandboxVariableOnSession(taskSessionId, ctxVar, effectiveContext) - agent.setSandboxVariableOnSession(taskSessionId, histVar, {entries: [], totalProcessed: 0}) + agent.setSandboxVariableOnSession(taskSessionId, histVar, initialHistory) agent.setSandboxVariableOnSession(taskSessionId, metaVar, metadata) agent.setSandboxVariableOnSession(taskSessionId, taskIdVar, taskId) + agent.setSandboxVariableOnSession(taskSessionId, reconVar, reconResult) // Prompt with curation helpers guidance (tools.curation.* replaces manual infrastructure code) const prompt = [ @@ -132,7 +153,8 @@ export class CurateExecutor implements ICurateExecutor { `History variable: ${histVar}`, `Metadata variable: ${metaVar}`, `Task ID variable: ${taskIdVar} (pass as bare variable, not a string)`, - `IMPORTANT: Do NOT print raw context. Start with tools.curation.recon(${ctxVar}, ${metaVar}, ${histVar}) to assess.`, + `Recon already computed in ${reconVar}: suggestedMode=${reconResult.suggestedMode}, suggestedChunkCount=${reconResult.suggestedChunkCount}, charCount=${reconResult.meta.charCount}, lineCount=${reconResult.meta.lineCount}, messageCount=${reconResult.meta.messageCount}.`, + `IMPORTANT: Do NOT print raw context. Do NOT call tools.curation.recon — it has been pre-computed. Proceed directly to extraction.`, `For chunked extraction use tools.curation.mapExtract(). 
Pass taskId: ${taskIdVar} (bare variable).`, `IMPORTANT: Any code_exec call containing mapExtract MUST use timeout: 300000 on the code_exec tool call itself (not inside mapExtract options).`, `Use tools.curation.groupBySubject() and tools.curation.dedup() to organize extractions.`, @@ -156,7 +178,7 @@ export class CurateExecutor implements ICurateExecutor { const finalize = async (): Promise => { try { - await propagateSummariesUnderLock({agent, baseDir, preState, snapshotService, taskId}) + await this.propagateAndRebuild({baseDir, preState, snapshotService}) await this.incrementDreamCounter(baseDir) await (agent as BackgroundDrainAgent).drainBackgroundWork?.() } finally { @@ -344,4 +366,58 @@ export class CurateExecutor implements ICurateExecutor { // Format with actual content return this.formatFileContentsForPrompt(readResults, skippedFiles, projectRoot) } + + /** + * Phase 4: snapshot diff → enqueue stale paths for dream → rebuild manifest. + * + * Summary cascade regeneration (the LLM-driven `propagateStaleness` walk) is + * deferred to the next dream cycle to keep curate's hot path free of LLM + * calls. The manifest is rebuilt inline because it is a pure file scan (no + * LLM) and keeps newly-curated leaf files immediately discoverable via + * manifest-driven retrieval. + * + * Two independent fail-open concerns: (a) enqueue the deferred summary-cascade + * work to dream's queue; (b) rebuild the search manifest. They share + * `changedPaths` but otherwise are unrelated — a transient disk error on the + * dream-state write must not skip the pure-filesystem manifest scan. Each + * runs in its own try block so one failure cannot mask the other's work. 
+ */ + private async propagateAndRebuild(args: { + baseDir: string + preState: Map | undefined + snapshotService: FileContextTreeSnapshotService + }): Promise { + const {baseDir, preState, snapshotService} = args + if (!preState) return + + let changedPaths: string[] = [] + try { + const postState = await snapshotService.getCurrentState(baseDir) + changedPaths = diffStates(preState, postState) + } catch { + // Fail-open: snapshot errors leave changedPaths empty → no enqueue, + // no manifest rebuild. Next curate's snapshot will pick up the diff. + } + + if (changedPaths.length === 0) return + + try { + const dreamStateService = new DreamStateService({baseDir: path.join(baseDir, BRV_DIR)}) + await dreamStateService.enqueueStaleSummaryPaths(changedPaths) + } catch { + // Fail-open: queue write errors never block curation. If this write + // fails the changed paths are lost from the deferred queue; they will + // only be re-captured if the same files are modified in a later curate + // (diffStates compares a fresh pre/post snapshot pair, not a persistent + // accumulator) or picked up by dream's own snapshot diff if dream + // touches them. + } + + try { + const manifestService = new FileContextTreeManifestService({baseDirectory: baseDir}) + await manifestService.buildManifest(baseDir) + } catch { + // Fail-open: manifest rebuild is best-effort pre-warming. 
+ } + } } diff --git a/src/server/infra/executor/dream-executor.ts b/src/server/infra/executor/dream-executor.ts index d115fd877..f516c9241 100644 --- a/src/server/infra/executor/dream-executor.ts +++ b/src/server/infra/executor/dream-executor.ts @@ -55,6 +55,8 @@ export type DreamExecutorDeps = { save(entry: DreamLogEntry): Promise } dreamStateService: { + drainStaleSummaryPaths(): Promise + enqueueStaleSummaryPaths(paths: string[]): Promise read(): Promise update(updater: (state: import('../dream/dream-state-schema.js').DreamState) => import('../dream/dream-state-schema.js').DreamState): Promise write(state: import('../dream/dream-state-schema.js').DreamState): Promise @@ -142,7 +144,12 @@ export class DreamExecutor { try { preState = await snapshotService.getCurrentState(projectRoot) } catch { - // Fail-open: if snapshot fails, skip propagation + // Fail-open: leaving preState undefined skips the entire step 5 block + // (queue drain + propagation), so the stale-summary queue is left + // intact for the next successful dream cycle. Skipping drain here is + // safer than drain-then-fail: the atomic-drain design clears entries + // synchronously inside the RMW, so if we drained and then threw + // before reaching the catch's re-enqueue, the snapshot would be lost. } // Step 2: Load dream state @@ -166,18 +173,38 @@ export class DreamExecutor { }) // Step 5: Post-dream propagation (fail-open) + // Two sources of stale summary paths: + // A. The stale-summary queue, drained from dream state — paths from + // curate operations that ran since the last dream cycle (the LLM + // cascade work was deferred from curate's hot path to here). + // B. Dream's own snapshot diff — paths changed by this dream's + // consolidate/synthesize/prune operations. + // Merging A ∪ B before calling propagateStaleness lets a path touched + // by both sources regenerate exactly once. 
The queue is drained + // atomically (cleared in the same RMW that captures the snapshot) so + // any concurrent curate enqueueing during propagation appends a fresh + // entry to the now-empty queue and the next dream picks it up. if (preState) { + let drainedSnapshot: string[] = [] try { + drainedSnapshot = await this.deps.dreamStateService.drainStaleSummaryPaths() + const postState = await snapshotService.getCurrentState(projectRoot) const changedPaths = diffStates(preState, postState) - if (changedPaths.length > 0) { - const summaryService = new FileContextTreeSummaryService() - await summaryService.propagateStaleness(changedPaths, agent, projectRoot, options.taskId) - const manifestService = new FileContextTreeManifestService({baseDirectory: projectRoot}) - await manifestService.buildManifest(projectRoot) + + const merged = [...new Set([...changedPaths, ...drainedSnapshot])] + if (merged.length > 0) { + await this.runStaleSummaryPropagation({agent, parentTaskId: options.taskId, paths: merged, projectRoot}) } } catch { - // Fail-open: propagation errors never block dream + // Fail-open: propagation errors never block dream. Re-enqueue the + // drained snapshot so the next dream cycle retries — atomic drain + // already removed them, so without this they would be lost. + if (drainedSnapshot.length > 0) { + await this.deps.dreamStateService.enqueueStaleSummaryPaths(drainedSnapshot).catch(() => { + // If the re-enqueue itself fails, there is nothing more to do here. + }) + } } } @@ -336,6 +363,26 @@ export class DreamExecutor { ) } + /** + * Regenerate parent `_index.md` files for the given paths and rebuild the + * manifest. Extracted as a seam so tests can override and assert which + * paths were passed (the A ∪ B merge in step 5 is the central correctness + * invariant of the deferral). Production constructs the services here so + * the dependency surface of {@link DreamExecutorDeps} stays narrow. 
+ */ + protected async runStaleSummaryPropagation(args: { + agent: ICipherAgent + parentTaskId?: string + paths: string[] + projectRoot: string + }): Promise { + const summaryService = new FileContextTreeSummaryService() + await summaryService.propagateStaleness(args.paths, args.agent, args.projectRoot, args.parentTaskId) + const manifestService = new FileContextTreeManifestService({baseDirectory: args.projectRoot}) + await manifestService.buildManifest(args.projectRoot) + } + + /** Errors are tracked at the log level (status='error'), not per-operation — always 0 here. */ private computeSummary(operations: DreamOperation[]): DreamLogSummary { const summary: DreamLogSummary = {consolidated: 0, errors: 0, flaggedForReview: 0, pruned: 0, synthesized: 0} for (const op of operations) { diff --git a/test/unit/agent/llm/agent-llm-service.test.ts b/test/unit/agent/llm/agent-llm-service.test.ts new file mode 100644 index 000000000..ed3b21ae0 --- /dev/null +++ b/test/unit/agent/llm/agent-llm-service.test.ts @@ -0,0 +1,32 @@ +import {expect} from 'chai' + +import {buildDateTimePrefix} from '../../../../src/agent/infra/llm/agent-llm-service.js' + +describe('buildDateTimePrefix', () => { + it('renders the supplied date as an ISO-8601 dateTime block followed by a blank line', () => { + const fixed = new Date('2026-05-01T10:30:00.000Z') + const result = buildDateTimePrefix(fixed) + + expect(result).to.equal('Current date and time: 2026-05-01T10:30:00.000Z\n\n') + }) + + it('uses the current time when no date is supplied', () => { + const before = Date.now() + const result = buildDateTimePrefix() + const after = Date.now() + + const match = /Current date and time: (\S+)<\/dateTime>\n\n$/.exec(result) + expect(match).to.not.equal(null) + + const rendered = match === null ? 
0 : Date.parse(match[1]) + expect(rendered).to.be.at.least(before) + expect(rendered).to.be.at.most(after) + }) + + it('terminates with a double-newline so the prefix can be concatenated directly to a text body', () => { + const result = buildDateTimePrefix(new Date('2026-01-01T00:00:00.000Z')) + const composed = `${result}body` + + expect(composed).to.equal('Current date and time: 2026-01-01T00:00:00.000Z\n\nbody') + }) +}) diff --git a/test/unit/agent/llm/generators/ai-sdk-content-generator.test.ts b/test/unit/agent/llm/generators/ai-sdk-content-generator.test.ts new file mode 100644 index 000000000..d5244bb0a --- /dev/null +++ b/test/unit/agent/llm/generators/ai-sdk-content-generator.test.ts @@ -0,0 +1,41 @@ +import type {ModelMessage} from 'ai' + +import {expect} from 'chai' + +import {prependCachedSystemMessage} from '../../../../../src/agent/infra/llm/generators/ai-sdk-content-generator.js' + +describe('prependCachedSystemMessage', () => { + const userMsg: ModelMessage = {content: 'hi', role: 'user'} + + it('returns the input messages unchanged when systemPrompt is undefined', () => { + const result = prependCachedSystemMessage(undefined, [userMsg]) + expect(result).to.deep.equal([userMsg]) + }) + + it('returns the input messages unchanged when systemPrompt is the empty string', () => { + const result = prependCachedSystemMessage('', [userMsg]) + expect(result).to.deep.equal([userMsg]) + }) + + it('prepends a system-role message with cacheControl providerOptions when systemPrompt is non-empty', () => { + const result = prependCachedSystemMessage('You are a helpful assistant.', [userMsg]) + + expect(result).to.have.length(2) + + const [system, user] = result + expect(system.role).to.equal('system') + expect(system.content).to.equal('You are a helpful assistant.') + expect(system.providerOptions).to.deep.equal({ + anthropic: {cacheControl: {type: 'ephemeral'}}, + }) + expect(user).to.deep.equal(userMsg) + }) + + it('does not mutate the original messages array', 
() => { + const original: ModelMessage[] = [userMsg] + const result = prependCachedSystemMessage('sys', original) + + expect(original).to.have.length(1) + expect(result).to.not.equal(original) + }) +}) diff --git a/test/unit/agent/llm/generators/ai-sdk-message-converter.test.ts b/test/unit/agent/llm/generators/ai-sdk-message-converter.test.ts new file mode 100644 index 000000000..23c49f5b8 --- /dev/null +++ b/test/unit/agent/llm/generators/ai-sdk-message-converter.test.ts @@ -0,0 +1,55 @@ +import {expect} from 'chai' + +import type {ToolSet as InternalToolSet} from '../../../../../src/agent/core/domain/tools/types.js' + +import {toAiSdkTools} from '../../../../../src/agent/infra/llm/generators/ai-sdk-message-converter.js' + +function makeTool(description: string): InternalToolSet[string] { + return { + description, + parameters: {properties: {}, type: 'object'}, + } +} + +function getProviderOptions(tool: unknown): Record | undefined { + if (!tool || typeof tool !== 'object') return undefined + return (tool as {providerOptions?: Record}).providerOptions +} + +const EPHEMERAL_CACHE_CONTROL = {anthropic: {cacheControl: {type: 'ephemeral'}}} + +describe('toAiSdkTools — anthropic cache_control on last tool', () => { + it('returns undefined when tools is undefined or empty', () => { + expect(toAiSdkTools()).to.equal(undefined) + expect(toAiSdkTools({})).to.equal(undefined) + }) + + it('attaches cache_control to the single tool when only one is registered', () => { + const tools: InternalToolSet = {onlyTool: makeTool('the only one')} + const result = toAiSdkTools(tools) + expect(result).to.exist + expect(getProviderOptions(result?.onlyTool)).to.deep.equal(EPHEMERAL_CACHE_CONTROL) + }) + + it('attaches cache_control to the LAST tool only when multiple are registered', () => { + const tools: InternalToolSet = { + firstTool: makeTool('first'), + lastTool: makeTool('last'), + middleTool: makeTool('middle'), + } + const result = toAiSdkTools(tools) + expect(result).to.exist + 
+ // The cache_control marker is attached to the LAST entry by insertion + // order, NOT by name. In production, tool registration is deterministic + // (driven by getToolNamesForCommand), so the "last" entry is stable. + // In this test, the object literal is alphabetically sorted by the + // sort-objects lint rule, so iteration order is + // firstTool → lastTool → middleTool — and middleTool ends up last, + // which is what should carry cacheControl. This test pins the + // insertion-order contract, not an alphabetical or name-based one. + expect(getProviderOptions(result?.firstTool)).to.equal(undefined) + expect(getProviderOptions(result?.lastTool)).to.equal(undefined) + expect(getProviderOptions(result?.middleTool)).to.deep.equal(EPHEMERAL_CACHE_CONTROL) + }) +}) diff --git a/test/unit/agent/map/abstract-generator-batch.test.ts b/test/unit/agent/map/abstract-generator-batch.test.ts new file mode 100644 index 000000000..7bb14dd17 --- /dev/null +++ b/test/unit/agent/map/abstract-generator-batch.test.ts @@ -0,0 +1,278 @@ +import {expect} from 'chai' +import {createSandbox, type SinonSandbox} from 'sinon' + +import type {IContentGenerator} from '../../../../src/agent/core/interfaces/i-content-generator.js' + +import {generateFileAbstractsBatch} from '../../../../src/agent/infra/map/abstract-generator.js' + +/** + * Build a generator whose generateContentStream yields a fixed text response + * the n-th time it's called. Useful for stubbing the parallel L0/L1 batch + * calls with two distinct texts. + */ +function makeScriptedGenerator(sandbox: SinonSandbox, responsesByCall: string[]): IContentGenerator { + let callIndex = 0 + return { + estimateTokensSync: () => 10, + generateContent: sandbox.stub().rejects(new Error('n/a')), + generateContentStream: sandbox.stub().callsFake(async function *() { + const text = responsesByCall[callIndex++] ?? 
'' + yield {content: text, isComplete: false} + yield {isComplete: true} + }), + } as unknown as IContentGenerator +} + +describe('generateFileAbstractsBatch', () => { + const sandbox = createSandbox() + + afterEach(() => sandbox.restore()) + + it('returns one result per input file when the model responds with all paths', async () => { + const l0Response = [ + 'One-line summary of A.', + 'One-line summary of B.', + ].join('\n') + const l1Response = [ + '- bullet 1\n- bullet 2\n- bullet 3', + '- bullet 1\n- bullet 2', + ].join('\n') + + const generator = makeScriptedGenerator(sandbox, [l0Response, l1Response]) + const result = await generateFileAbstractsBatch( + [ + {contextPath: 'a.md', fullContent: 'content of A'}, + {contextPath: 'b.md', fullContent: 'content of B'}, + ], + generator, + ) + + expect(result).to.have.lengthOf(2) + expect(result[0].contextPath).to.equal('a.md') + expect(result[0].abstractContent).to.equal('One-line summary of A.') + expect(result[0].overviewContent).to.contain('bullet 1') + expect(result[1].contextPath).to.equal('b.md') + expect(result[1].abstractContent).to.equal('One-line summary of B.') + }) + + it('keeps input order when the model returns paths out of order', async () => { + const l0Response = [ + 'B summary.', + 'A summary.', + ].join('\n') + const l1Response = [ + 'B over.', + 'A over.', + ].join('\n') + + const generator = makeScriptedGenerator(sandbox, [l0Response, l1Response]) + const result = await generateFileAbstractsBatch( + [ + {contextPath: 'a.md', fullContent: 'A'}, + {contextPath: 'b.md', fullContent: 'B'}, + ], + generator, + ) + + expect(result.map((r: {contextPath: string}) => r.contextPath)).to.deep.equal(['a.md', 'b.md']) + expect(result[0].abstractContent).to.equal('A summary.') + expect(result[1].abstractContent).to.equal('B summary.') + }) + + it('returns empty strings for files the model omits', async () => { + const l0Response = 'Only A.' + const l1Response = 'Only A over.' 
+ + const generator = makeScriptedGenerator(sandbox, [l0Response, l1Response]) + const result = await generateFileAbstractsBatch( + [ + {contextPath: 'a.md', fullContent: 'A'}, + {contextPath: 'b.md', fullContent: 'B'}, + ], + generator, + ) + + expect(result).to.have.lengthOf(2) + expect(result[0].abstractContent).to.equal('Only A.') + expect(result[1].abstractContent).to.equal('') + expect(result[1].overviewContent).to.equal('') + }) + + it('returns empty strings when the model output is malformed (no matching tags)', async () => { + const generator = makeScriptedGenerator(sandbox, ['random unparseable text', 'also unparseable']) + const result = await generateFileAbstractsBatch( + [ + {contextPath: 'a.md', fullContent: 'A'}, + ], + generator, + ) + + expect(result).to.have.lengthOf(1) + expect(result[0].abstractContent).to.equal('') + expect(result[0].overviewContent).to.equal('') + }) + + it('dedups duplicate contextPath inputs, keeping the last item content (most recent state)', async () => { + // Capture the prompt the model receives so we can assert it carries the + // LATEST content (v2), not the older content (v1) for a duplicated path. + let capturedAbstractPrompt = '' + const generator: IContentGenerator = { + estimateTokensSync: () => 10, + generateContent: sandbox.stub().rejects(new Error('n/a')), + generateContentStream: sandbox.stub().callsFake(async function *(req: { + contents?: Array<{content?: string}> + systemPrompt?: string + }) { + const userContent = req.contents?.[0]?.content ?? '' + const isAbstract = (req.systemPrompt ?? '').includes('one-line') + if (isAbstract) capturedAbstractPrompt = userContent + const innerTag = isAbstract ? 
'abstract' : 'overview' + yield {content: `<${innerTag}>S`, isComplete: false} + yield {isComplete: true} + }), + } as unknown as IContentGenerator + + const result = await generateFileAbstractsBatch( + [ + {contextPath: 'auth/jwt.md', fullContent: 'v1: original draft'}, + {contextPath: 'auth/jwt.md', fullContent: 'v2: updated content'}, + ], + generator, + ) + + // Only one `` block should appear in the prompt + // (deduped at the generator boundary). Without the dedup, the model would + // see two blocks and may answer them in either order. + const pathOccurrences = (capturedAbstractPrompt.match(/ { + // Capture BOTH the L0 and L1 prompts so we assert the wrap is applied to + // each independent builder. Without separate captures, we'd only validate + // the last call's prompt — a future refactor that forgot wrapCdata in one + // builder would slip past. + const capturedPrompts: {abstract?: string; overview?: string} = {} + const generator: IContentGenerator = { + estimateTokensSync: () => 10, + generateContent: sandbox.stub().rejects(new Error('n/a')), + generateContentStream: sandbox.stub().callsFake(async function *(req: { + contents?: Array<{content?: string}> + systemPrompt?: string + }) { + const isAbstract = (req.systemPrompt ?? '').includes('one-line') + if (isAbstract) capturedPrompts.abstract = req.contents?.[0]?.content ?? '' + else capturedPrompts.overview = req.contents?.[0]?.content ?? '' + const innerTag = isAbstract ? 'abstract' : 'overview' + yield {content: `<${innerTag}>OK`, isComplete: false} + yield {isComplete: true} + }), + } as unknown as IContentGenerator + + // Content that would break the prompt envelope without CDATA: literal + // and markers, plus an XML-flavored payload. 
+ const treacherousContent = 'A doc explaining the tag and : bar' + + const result = await generateFileAbstractsBatch( + [{contextPath: 'docs/xml.md', fullContent: treacherousContent}], + generator, + ) + + // Both prompts (L0 and L1) must independently wrap content in CDATA. + for (const [label, prompt] of [['abstract', capturedPrompts.abstract], ['overview', capturedPrompts.overview]] as const) { + expect(prompt, `${label} prompt was captured`).to.be.a('string') + const promptText = prompt ?? '' + expect(promptText, `${label} prompt opens CDATA`).to.include('') + // Exactly one structural opener per file (the body's literal + // is now inert inside CDATA). + const docOpen = (promptText.match(//g) ?? []).length + expect(docOpen, `${label} prompt has exactly one envelope`).to.equal(1) + } + + // Result still parses cleanly. + expect(result[0].abstractContent).to.equal('OK') + }) + + it('parser is robust to a literal appearing inside the model overview prose', async () => { + // The model output is plain text (NOT CDATA-wrapped). A document about + // build systems / XML / JSX may legitimately produce overview prose like + // "the closing tag in Ant build files…" — a closer-anchored + // parser would stop at the premature and orphan the inner tag. + const l0Response = 'Ant build files use a closing tag.' + const l1Response = ` +- Mentions the closing tag in build prose +- Still must round-trip cleanly +` + + const generator = makeScriptedGenerator(sandbox, [l0Response, l1Response]) + const result = await generateFileAbstractsBatch( + [{contextPath: 'ant.md', fullContent: 'Ant build files'}], + generator, + ) + + // Both fields are populated despite the literal in the body — + // the parser anchors on `` openers, not `` closers. 
+ expect(result).to.have.lengthOf(1) + expect(result[0].abstractContent).to.include('Ant build files') + expect(result[0].abstractContent).to.include('') + expect(result[0].overviewContent).to.include('Mentions the ') + expect(result[0].overviewContent).to.include('Still must round-trip cleanly') + }) + + it('escapes nested CDATA terminators in content so the wrap stays valid', async () => { + let capturedPrompt = '' + const generator: IContentGenerator = { + estimateTokensSync: () => 10, + generateContent: sandbox.stub().rejects(new Error('n/a')), + generateContentStream: sandbox.stub().callsFake(async function *(req: { + contents?: Array<{content?: string}> + }) { + capturedPrompt = req.contents?.[0]?.content ?? '' + yield {content: 'OK', isComplete: false} + yield {isComplete: true} + }), + } as unknown as IContentGenerator + + // Content that contains a literal `]]>` sequence — would terminate CDATA + // prematurely without the in-CDATA escape trick. + await generateFileAbstractsBatch( + [{contextPath: 'x.md', fullContent: 'before ]]> after'}], + generator, + ) + + // The bare `]]>` must NOT appear inside the still-active CDATA section — + // it should be split via `]]]]>`. 
+ expect(capturedPrompt).to.include(']]]]>') + }) + + it('issues exactly two LLM calls regardless of batch size (one L0 batch, one L1 batch)', async () => { + const l0Response = Array.from({length: 5}, (_, i) => + `S${i}.`, + ).join('\n') + const l1Response = Array.from({length: 5}, (_, i) => + `O${i}.`, + ).join('\n') + + const generator = makeScriptedGenerator(sandbox, [l0Response, l1Response]) + await generateFileAbstractsBatch( + Array.from({length: 5}, (_, i) => ({contextPath: `${i}.md`, fullContent: `c${i}`})), + generator, + ) + + const stubbed = generator.generateContentStream as ReturnType + expect(stubbed.callCount).to.equal(2) + }) +}) diff --git a/test/unit/agent/map/abstract-queue.test.ts b/test/unit/agent/map/abstract-queue.test.ts index 9ea3a4b9d..cb65671ba 100644 --- a/test/unit/agent/map/abstract-queue.test.ts +++ b/test/unit/agent/map/abstract-queue.test.ts @@ -20,12 +20,24 @@ function makeFailingGenerator(sandbox: SinonSandbox): IContentGenerator { } as unknown as IContentGenerator } +/** + * Stream stub that responds in the XML format expected by H3's batched generator. + * Sniffs the request's user content for `generated text` per detected + * path. The L0 vs L1 branch is detected from the system prompt. + */ function makeSuccessfulGenerator(sandbox: SinonSandbox): IContentGenerator { return { estimateTokensSync: () => 10, generateContent: sandbox.stub().rejects(new Error('n/a')), - generateContentStream: sandbox.stub().callsFake(async function *() { - yield {content: 'generated text', isComplete: false} + generateContentStream: sandbox.stub().callsFake(async function *(request: {contents?: Array<{content?: string}>; systemPrompt?: string}) { + const userContent = request.contents?.[0]?.content ?? '' + const isAbstract = (request.systemPrompt ?? '').includes('one-line') + const innerTag = isAbstract ? 'abstract' : 'overview' + const pathMatches = [...userContent.matchAll(/ 0 ? 
pathMatches.map((m) => m[1]) : ['unknown'] + const xml = paths.map((p) => `<${innerTag}>generated text`).join('\n') + yield {content: xml, isComplete: false} yield {isComplete: true} }), } as unknown as IContentGenerator @@ -99,24 +111,28 @@ describe('AbstractGenerationQueue', () => { const q = new AbstractGenerationQueue(tmpDir, 2) // maxAttempts=2 → one retry q.setGenerator(generator) - q.enqueue({contextPath: join(tmpDir, 'file.md'), fullContent: 'content'}) + // Enqueue BATCH_SIZE_CAP=5 items so the batch fires immediately. + for (let i = 0; i < 5; i++) { + q.enqueue({contextPath: join(tmpDir, `file-${i}.md`), fullContent: 'content'}) + } - // scheduleNext fires via setImmediate; processNext is now awaiting generateFileAbstracts + // scheduleNext fires via setImmediate; processNext is now awaiting generateFileAbstractsBatch await new Promise((r) => { setImmediate(r) }) expect(q.getStatus().processing).to.equal(true) - // Trigger failure — processNext catch fires: retrying++, setTimeout(500ms backoff) + // Trigger failure — Promise.all over the two parallel streams rejects, + // processNext catch fires: each item retrying++, setTimeout(500ms backoff) rejectNextCall(new Error('deliberate failure')) - // Two setImmediate ticks: one for the catch block, one for the finally block + // Ticks for catch + finally + microtask queue + await new Promise((r) => { setImmediate(r) }) await new Promise((r) => { setImmediate(r) }) await new Promise((r) => { setImmediate(r) }) - // Item is now in retry backoff: retrying=1, pending=[] - // Before fix: getStatus().pending was 0 (retrying items invisible) - // After fix: getStatus().pending is 1 (retrying items folded into pending) + // All 5 items now in retry backoff: retrying=5, pending=[] + // getStatus().pending folds retrying into pending so callers don't see false-idle. 
const status = q.getStatus() expect(status.processing).to.equal(false) - expect(status.pending).to.equal(1) + expect(status.pending).to.equal(5) expect(status.failed).to.equal(0) }) }) @@ -237,4 +253,81 @@ describe('AbstractGenerationQueue', () => { expect(written.processing).to.equal(false) }) }) + + // ── batching behaviour ───────────────────────────────────────────────────── + + describe('batching behaviour', () => { + it('buffers items below BATCH_SIZE_CAP without firing LLM calls', async () => { + const successfulGenerator = makeSuccessfulGenerator(sandbox) + const q = new AbstractGenerationQueue(tmpDir) + q.setGenerator(successfulGenerator) + + // Enqueue 3 items — below BATCH_SIZE_CAP=5, so no batch should fire. + for (let i = 0; i < 3; i++) { + q.enqueue({contextPath: join(tmpDir, `f${i}.md`), fullContent: `content ${i}`}) + } + + // Give scheduleNext time to (incorrectly) fire if the buffer guard is broken. + await new Promise((r) => { setImmediate(r) }) + await new Promise((r) => { setImmediate(r) }) + + const stub = successfulGenerator.generateContentStream as ReturnType + expect(stub.callCount).to.equal(0, 'Expected 0 LLM calls while pending is below BATCH_SIZE_CAP') + expect(q.getStatus()).to.deep.equal({failed: 0, pending: 3, processed: 0, processing: false}) + + // drain() forces the partial batch to flush → exactly 2 stream calls. 
+    await q.drain()
+    expect(stub.callCount).to.equal(2, 'Expected drain() to flush the partial batch as 1×L0 + 1×L1')
+    expect(q.getStatus().processed).to.equal(3)
+  })
+
+  it('processes up to BATCH_SIZE_CAP items in a single LLM cycle', async () => {
+    const successfulGenerator = makeSuccessfulGenerator(sandbox)
+    const q = new AbstractGenerationQueue(tmpDir)
+    q.setGenerator(successfulGenerator)
+
+    const N = 5
+    for (let i = 0; i < N; i++) {
+      q.enqueue({contextPath: join(tmpDir, `f${i}.md`), fullContent: `content ${i}`})
+    }
+
+    await q.drain()
+
+    // 1 batch * 2 LLM calls (L0 + L1) = exactly 2 stream calls for N=5
+    const stub = successfulGenerator.generateContentStream as ReturnType<typeof sandbox.stub>
+    expect(stub.callCount).to.equal(2, 'Expected exactly 2 LLM stream calls for a 5-item batch (1×L0 + 1×L1)')
+    expect(q.getStatus()).to.deep.equal({failed: 0, pending: 0, processed: N, processing: false})
+
+    // Every file gets its abstract.md and overview.md written
+    const fileChecks = Array.from({length: N}, async (_, i) => {
+      const abstractPath = join(tmpDir, `f${i}.abstract.md`)
+      const overviewPath = join(tmpDir, `f${i}.overview.md`)
+      const [abstractText, overviewText] = await Promise.all([
+        fs.readFile(abstractPath, 'utf8'),
+        fs.readFile(overviewPath, 'utf8'),
+      ])
+      expect(abstractText).to.equal('generated text')
+      expect(overviewText).to.equal('generated text')
+    })
+    await Promise.all(fileChecks)
+  })
+
+  it('splits oversized backlogs into multiple batches', async () => {
+    const successfulGenerator = makeSuccessfulGenerator(sandbox)
+    const q = new AbstractGenerationQueue(tmpDir)
+    q.setGenerator(successfulGenerator)
+
+    const N = 7 // > BATCH_SIZE_CAP=5 → expect 2 batches (5 + 2)
+    for (let i = 0; i < N; i++) {
+      q.enqueue({contextPath: join(tmpDir, `f${i}.md`), fullContent: `content ${i}`})
+    }
+
+    await q.drain()
+
+    const stub = successfulGenerator.generateContentStream as ReturnType<typeof sandbox.stub>
+    // 2 batches × 2 LLM calls each = 4 stream calls
+ 
expect(stub.callCount).to.equal(4, 'Expected 4 stream calls for 7 items split into batches of 5+2') + expect(q.getStatus().processed).to.equal(N) + }) + }) }) diff --git a/test/unit/infra/dream/dream-state-schema.test.ts b/test/unit/infra/dream/dream-state-schema.test.ts index 90951a0d6..d7f4d155e 100644 --- a/test/unit/infra/dream/dream-state-schema.test.ts +++ b/test/unit/infra/dream/dream-state-schema.test.ts @@ -1,6 +1,6 @@ import {expect} from 'chai' -import {DreamStateSchema, EMPTY_DREAM_STATE, PendingMergeSchema} from '../../../../src/server/infra/dream/dream-state-schema.js' +import {DreamStateSchema, EMPTY_DREAM_STATE, PendingMergeSchema, StaleSummaryEntrySchema} from '../../../../src/server/infra/dream/dream-state-schema.js' describe('dream-state-schema', () => { describe('DreamStateSchema', () => { @@ -17,6 +17,7 @@ describe('DreamStateSchema', () => { suggestedByDreamId: 'drm-1712736000000', }, ], + staleSummaryPaths: [], totalDreams: 3, version: 1, } @@ -120,8 +121,59 @@ describe('EMPTY_DREAM_STATE', () => { expect(EMPTY_DREAM_STATE.lastDreamAt).to.be.null expect(EMPTY_DREAM_STATE.lastDreamLogId).to.be.null expect(EMPTY_DREAM_STATE.pendingMerges).to.deep.equal([]) + expect(EMPTY_DREAM_STATE.staleSummaryPaths).to.deep.equal([]) expect(EMPTY_DREAM_STATE.totalDreams).to.equal(0) expect(EMPTY_DREAM_STATE.version).to.equal(1) }) }) + +describe('StaleSummaryEntrySchema', () => { + it('should parse a valid entry', () => { + const input = {enqueuedAt: 1_745_539_200_000, path: 'auth/jwt/token.md'} + const result = StaleSummaryEntrySchema.parse(input) + expect(result).to.deep.equal(input) + }) + + it('should reject a non-integer enqueuedAt', () => { + expect(() => StaleSummaryEntrySchema.parse({enqueuedAt: 1.5, path: 'a.md'})).to.throw() + }) + + it('should reject a negative enqueuedAt', () => { + expect(() => StaleSummaryEntrySchema.parse({enqueuedAt: -1, path: 'a.md'})).to.throw() + }) + + it('should reject a missing path', () => { + expect(() => 
StaleSummaryEntrySchema.parse({enqueuedAt: 0})).to.throw() + }) +}) + +describe('DreamStateSchema staleSummaryPaths', () => { + it('should default staleSummaryPaths to [] when missing', () => { + const input = { + curationsSinceDream: 0, + lastDreamAt: null, + lastDreamLogId: null, + totalDreams: 0, + version: 1, + } + const result = DreamStateSchema.parse(input) + expect(result.staleSummaryPaths).to.deep.equal([]) + }) + + it('should accept a populated staleSummaryPaths array', () => { + const input = { + curationsSinceDream: 0, + lastDreamAt: null, + lastDreamLogId: null, + staleSummaryPaths: [ + {enqueuedAt: 1_745_539_200_000, path: 'auth/jwt/token.md'}, + {enqueuedAt: 1_745_546_400_000, path: 'billing/webhooks/stripe.md'}, + ], + totalDreams: 0, + version: 1, + } + const result = DreamStateSchema.parse(input) + expect(result.staleSummaryPaths).to.have.lengthOf(2) + }) +}) }) diff --git a/test/unit/infra/dream/dream-state-service.test.ts b/test/unit/infra/dream/dream-state-service.test.ts index 52970ce55..e35e1d5cf 100644 --- a/test/unit/infra/dream/dream-state-service.test.ts +++ b/test/unit/infra/dream/dream-state-service.test.ts @@ -2,6 +2,7 @@ import {expect} from 'chai' import {mkdir, readFile, rm, writeFile} from 'node:fs/promises' import {tmpdir} from 'node:os' import {join} from 'node:path' +import {restore, spy} from 'sinon' import type {DreamState} from '../../../../src/server/infra/dream/dream-state-schema.js' @@ -13,6 +14,7 @@ function makeState(overrides: Partial = {}): DreamState { lastDreamAt: null, lastDreamLogId: null, pendingMerges: [], + staleSummaryPaths: [], totalDreams: 0, version: 1, ...overrides, @@ -30,6 +32,7 @@ describe('DreamStateService', () => { }) afterEach(async () => { + restore() await rm(tempDir, {force: true, recursive: true}) }) @@ -217,4 +220,184 @@ describe('DreamStateService', () => { expect(final.totalDreams).to.equal(5) }) }) + + // ========================================================================== + // 
enqueueStaleSummaryPaths — defer summary cascade + // ========================================================================== + + describe('enqueueStaleSummaryPaths', () => { + it('appends new paths to an empty queue', async () => { + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md', 'billing/webhooks/stripe.md']) + const state = await service.read() + expect(state.staleSummaryPaths.map((e) => e.path)).to.deep.equal([ + 'auth/jwt/token.md', + 'billing/webhooks/stripe.md', + ]) + }) + + it('stamps each entry with enqueuedAt at the moment of the call', async () => { + const before = Date.now() + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md']) + const after = Date.now() + + const state = await service.read() + expect(state.staleSummaryPaths).to.have.lengthOf(1) + const [entry] = state.staleSummaryPaths + expect(entry.enqueuedAt).to.be.at.least(before) + expect(entry.enqueuedAt).to.be.at.most(after) + }) + + it('dedups entries by path (keeps oldest enqueuedAt)', async () => { + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md']) + const firstState = await service.read() + const firstStamp = firstState.staleSummaryPaths[0].enqueuedAt + + // ensure the second call's Date.now() is strictly later + await new Promise((resolve) => { + setTimeout(resolve, 5) + }) + + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md', 'billing/webhooks/stripe.md']) + const secondState = await service.read() + + expect(secondState.staleSummaryPaths).to.have.lengthOf(2) + const tokenEntry = secondState.staleSummaryPaths.find((e) => e.path === 'auth/jwt/token.md') + expect(tokenEntry?.enqueuedAt, 'oldest enqueuedAt preserved on dedup').to.equal(firstStamp) + }) + + it('preserves other state fields when enqueuing', async () => { + await service.write(makeState({ + curationsSinceDream: 7, + totalDreams: 2, + })) + + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md']) + + const state = await service.read() + 
expect(state.curationsSinceDream).to.equal(7) + expect(state.totalDreams).to.equal(2) + expect(state.staleSummaryPaths).to.have.lengthOf(1) + }) + + it('is a no-op for an empty input array', async () => { + await service.enqueueStaleSummaryPaths([]) + const state = await service.read() + expect(state.staleSummaryPaths).to.deep.equal([]) + }) + + it('dedups within-batch duplicates so a single call cannot insert the same path twice', async () => { + // The contract is "dedup by path". A caller passing a non-unique array + // (e.g. multiple changedPaths within a single curate that round-trip + // through the same parent dir) must NOT produce duplicate queue entries. + await service.enqueueStaleSummaryPaths(['auth/jwt.md', 'auth/jwt.md', 'auth/jwt.md']) + + const state = await service.read() + expect(state.staleSummaryPaths).to.have.lengthOf(1) + expect(state.staleSummaryPaths[0].path).to.equal('auth/jwt.md') + }) + + it('does not lose entries when 10 enqueues run concurrently', async () => { + const paths = Array.from({length: 10}, (_, i) => `domain/topic-${i}.md`) + await Promise.all(paths.map((p) => service.enqueueStaleSummaryPaths([p]))) + const state = await service.read() + const stored = state.staleSummaryPaths.map((e) => e.path).sort() + expect(stored).to.deep.equal([...paths].sort()) + }) + }) + + // ========================================================================== + // drainStaleSummaryPaths — snapshot-and-clear pattern + // ========================================================================== + + describe('drainStaleSummaryPaths', () => { + it('returns the current snapshot of paths AND clears the queue atomically', async () => { + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md', 'billing/webhooks/stripe.md']) + + const snapshot = await service.drainStaleSummaryPaths() + expect(snapshot.sort()).to.deep.equal([ + 'auth/jwt/token.md', + 'billing/webhooks/stripe.md', + ]) + + // queue is empty after drain — the same RMW that read it 
cleared it + const state = await service.read() + expect(state.staleSummaryPaths).to.deep.equal([]) + }) + + it('returns an empty snapshot when the queue is empty', async () => { + const snapshot = await service.drainStaleSummaryPaths() + expect(snapshot).to.deep.equal([]) + }) + + it('does NOT issue a write when the queue is already empty', async () => { + // The early-return guard in drainStaleSummaryPaths returns the same + // state ref unchanged; update() then skips the disk write. Without + // this contract, every empty drain would tmpfile + rename for nothing. + const writeSpy = spy(service, 'write') + + await service.drainStaleSummaryPaths() + + expect(writeSpy.called).to.equal(false) + }) + + it('different-path enqueue during processing survives', async () => { + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md']) + const snapshot = await service.drainStaleSummaryPaths() + expect(snapshot).to.deep.equal(['auth/jwt/token.md']) + + // simulate a curate enqueue happening WHILE the dream is processing + await service.enqueueStaleSummaryPaths(['billing/webhooks/stripe.md']) + + const state = await service.read() + expect(state.staleSummaryPaths.map((e) => e.path)).to.deep.equal(['billing/webhooks/stripe.md']) + }) + + it('drain on an empty queue returns an empty snapshot and leaves enqueues untouched', async () => { + const snapshot = await service.drainStaleSummaryPaths() + expect(snapshot).to.deep.equal([]) + + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md']) + + const state = await service.read() + expect(state.staleSummaryPaths.map((e) => e.path)).to.deep.equal(['auth/jwt/token.md']) + }) + + it('preserves other state fields when draining', async () => { + await service.write(makeState({ + curationsSinceDream: 3, + totalDreams: 1, + })) + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md']) + await service.drainStaleSummaryPaths() + + const state = await service.read() + expect(state.curationsSinceDream).to.equal(3) + 
expect(state.totalDreams).to.equal(1) + expect(state.staleSummaryPaths).to.deep.equal([]) + }) + + it('preserves a same-path enqueue made after the drain (no race loss)', async () => { + // Repro of the race the reviewer flagged on PR #551: + // 1. Dream drains queue containing X. + // 2. Concurrent curate touches X again — enqueue should record it. + // 3. Dream finishes propagation. + // 4. The post-drain enqueue MUST survive so the next dream picks it up. + // Atomic drain (queue cleared upfront) makes the post-drain enqueue see + // an empty queue, so it always appends fresh. + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md']) + + // (1) Dream drains — entries removed atomically. + const snapshot = await service.drainStaleSummaryPaths() + expect(snapshot).to.deep.equal(['auth/jwt/token.md']) + + // (2) A curate touches the same path during dream propagation. + await service.enqueueStaleSummaryPaths(['auth/jwt/token.md']) + + // (3) Dream finishes — no clear() to call; entries already removed at (1). + + // (4) The path enqueued at (2) survives. + const state = await service.read() + expect(state.staleSummaryPaths.map((e) => e.path)).to.deep.equal(['auth/jwt/token.md']) + }) + }) }) diff --git a/test/unit/infra/dream/dream-trigger.test.ts b/test/unit/infra/dream/dream-trigger.test.ts index 7606dfb55..6c5465e5f 100644 --- a/test/unit/infra/dream/dream-trigger.test.ts +++ b/test/unit/infra/dream/dream-trigger.test.ts @@ -11,6 +11,7 @@ function makeState(overrides: Partial = {}): DreamState { lastDreamAt: new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(), lastDreamLogId: null, pendingMerges: [], + staleSummaryPaths: [], totalDreams: 0, version: 1, ...overrides, @@ -78,6 +79,37 @@ describe('DreamTrigger', () => { } }) + it('should bypass activity gate when stale-summary queue has work', async () => { + // ENG-2485: deferred summary cascade lives in staleSummaryPaths. 
If a + // low-activity project (1-2 curates) accumulates queued paths and the + // activity gate kept blocking, _index.md regeneration would never run. + const deps = makeDeps({ + state: makeState({ + curationsSinceDream: 1, + staleSummaryPaths: [{enqueuedAt: Date.now(), path: 'auth/jwt.md'}], + }), + }) + const trigger = new DreamTrigger(deps) + + const result = await trigger.tryStartDream('/project') + expect(result.eligible).to.be.true + }) + + it('should still fail activity gate when both curations AND queue are empty', async () => { + // Negative case for the bypass: empty queue + low activity means the + // activity gate should still block (nothing to drain, no work to do). + const deps = makeDeps({ + state: makeState({curationsSinceDream: 1, staleSummaryPaths: []}), + }) + const trigger = new DreamTrigger(deps) + + const result = await trigger.tryStartDream('/project') + expect(result.eligible).to.be.false + if (!result.eligible) { + expect(result.reason).to.include('activity') + } + }) + it('should fail when queue is not empty', async () => { const deps = makeDeps({queueLength: 3}) const trigger = new DreamTrigger(deps) @@ -264,6 +296,22 @@ describe('DreamTrigger', () => { } }) + it('should bypass activity gate when stale-summary queue has work', async () => { + // Symmetry with the tryStartDream bypass test — both methods delegate + // to checkGates1to3, so a future refactor of the shared path must keep + // this invariant on both call sites. 
+ const deps = makeDeps({ + state: makeState({ + curationsSinceDream: 1, + staleSummaryPaths: [{enqueuedAt: Date.now(), path: 'auth/jwt.md'}], + }), + }) + const trigger = new DreamTrigger(deps) + + const result = await trigger.checkEligibility('/project') + expect(result.eligible).to.be.true + }) + it('should fail when queue is not empty', async () => { const deps = makeDeps({queueLength: 3}) const trigger = new DreamTrigger(deps) diff --git a/test/unit/infra/executor/curate-executor.test.ts b/test/unit/infra/executor/curate-executor.test.ts index d9390ce7b..95d718075 100644 --- a/test/unit/infra/executor/curate-executor.test.ts +++ b/test/unit/infra/executor/curate-executor.test.ts @@ -45,24 +45,10 @@ import {FileValidationError} from '../../../../src/server/core/domain/errors/tas import {FileContextTreeManifestService} from '../../../../src/server/infra/context-tree/file-context-tree-manifest-service.js' import {FileContextTreeSnapshotService} from '../../../../src/server/infra/context-tree/file-context-tree-snapshot-service.js' import {FileContextTreeSummaryService} from '../../../../src/server/infra/context-tree/file-context-tree-summary-service.js' -import {DreamLockService} from '../../../../src/server/infra/dream/dream-lock-service.js' +import {DreamStateService} from '../../../../src/server/infra/dream/dream-state-service.js' import {CurateExecutor} from '../../../../src/server/infra/executor/curate-executor.js' -/** - * Default DreamLockService stubs so Phase 4 tests don't write real - * `dream.lock` files. Tests exercising the lock directly re-stub via restore. 
- */ -function stubDreamLockServiceDefaults(): void { - stub(DreamLockService.prototype, 'tryAcquire').resolves({acquired: true, priorMtime: 0}) - stub(DreamLockService.prototype, 'release').resolves() - stub(DreamLockService.prototype, 'rollback').resolves() -} - describe('CurateExecutor (regression)', () => { - beforeEach(() => { - stubDreamLockServiceDefaults() - }) - afterEach(() => { restore() }) @@ -292,8 +278,8 @@ describe('CurateExecutor (regression)', () => { }) }) - describe('summary propagation taskId threading (ENG-2100)', () => { - it('passes the curate operation taskId to propagateStaleness so summary LLM calls share one billing session', async () => { + describe('summary cascade deferral to dream (ENG-2485)', () => { + it('enqueues stale-summary paths to the dream queue and does NOT call propagateStaleness inline', async () => { const agent = { cancel: stub().resolves(false), createTaskSession: stub().resolves('session-id'), @@ -325,6 +311,10 @@ describe('CurateExecutor (regression)', () => { 'propagateStaleness', ).resolves([]) stub(FileContextTreeManifestService.prototype, 'buildManifest').resolves() + const enqueueStub = stub(DreamStateService.prototype, 'enqueueStaleSummaryPaths').resolves() + // incrementCurationCount is unrelated dream-state work that runs after the post-curation + // step; stub it so the test doesn't hit disk for the dream state file. + stub(DreamStateService.prototype, 'incrementCurationCount').resolves() const taskId = 'curate-op-uuid-1' const projectRoot = '/projects/myapp' @@ -336,16 +326,88 @@ describe('CurateExecutor (regression)', () => { taskId, }) - expect(propagateStalenessStub.calledOnce).to.be.true - // 4th arg must be the curate's taskId so the billing service groups - // summary regenerations into the same session as the parent operation. - expect(propagateStalenessStub.firstCall.args[3]).to.equal(taskId) + // ENG-2485 invariant: the LLM-bound propagateStaleness walk MUST NOT run + // on the curate hot path. 
It is deferred to the next dream cycle. + expect(propagateStalenessStub.called).to.equal(false) + + // The deferred work is captured in the dream queue: the changed paths from + // diffStates are enqueued for the next dream cycle to drain. + expect(enqueueStub.calledOnce).to.equal(true) + expect(enqueueStub.firstCall.args[0]).to.deep.equal(['auth/jwt.md']) + }) + }) + + describe('pre-pipelined recon (ENG-2530)', () => { + it('injects __recon_result_ as a sandbox variable and surfaces it in the prompt', async () => { + const taskId = '7a2b9e10-cdef-4321-8765-0abcdef01234' + const taskIdSafe = taskId.replaceAll('-', '_') + const expectedReconVar = `__recon_result_${taskIdSafe}` + + const setSandboxVariableOnSession = stub() + const executeOnSession = stub().resolves('ok') + + const agent = { + cancel: stub().resolves(false), + createTaskSession: stub().resolves('session-id'), + deleteSandboxVariable: stub(), + deleteSandboxVariableOnSession: stub(), + deleteSession: stub().resolves(true), + deleteTaskSession: stub().resolves(), + execute: stub().resolves(''), + executeOnSession, + generate: stub().resolves({content: '', toolCalls: [], usage: {inputTokens: 0, outputTokens: 0}}), + getSessionMetadata: stub().resolves(), + getState: stub().returns({currentIteration: 0, executionHistory: [], executionState: 'idle', toolCallsExecuted: 0}), + listPersistedSessions: stub().resolves([]), + reset: stub(), + setSandboxVariable: stub(), + setSandboxVariableOnSession, + start: stub().resolves(), + stream: stub().resolves({[Symbol.asyncIterator]: () => ({next: () => Promise.resolve({done: true, value: undefined})})}), + } as unknown as ICipherAgent + + // Stub the post-curate filesystem services so the test stays + // fully in-memory (mirrors the ENG-2485 test above). Without + // these, executeWithAgent attempts real I/O against /projects/myapp. 
+ stub(FileContextTreeSnapshotService.prototype, 'getCurrentState').resolves(new Map()) + stub(DreamStateService.prototype, 'incrementCurationCount').resolves() + + const executor = new CurateExecutor() + await executor.executeWithAgent(agent, { + clientCwd: '/projects/myapp', + content: 'plain text content for the curate to inspect', + taskId, + }) + + // (a) recon result was set on the task session under __recon_result_ + const reconCall = setSandboxVariableOnSession + .getCalls() + .find((c) => c.args[1] === expectedReconVar) + expect(reconCall, `no setSandboxVariableOnSession call with key ${expectedReconVar}`).to.not.equal(undefined) + + const reconValue = reconCall?.args[2] as Record | undefined + expect(reconValue).to.have.property('suggestedMode') + expect(reconValue).to.have.property('suggestedChunkCount') + expect(reconValue).to.have.property('meta') + const meta = reconValue?.meta as Record + expect(meta).to.have.property('charCount') + expect(meta).to.have.property('lineCount') + expect(meta).to.have.property('messageCount') + + // (b) prompt instructs the agent that recon is pre-computed and to skip the call + expect(executeOnSession.calledOnce).to.equal(true) + const promptArg = executeOnSession.firstCall.args[1] as string + expect(promptArg).to.include('Recon already computed in') + expect(promptArg).to.include(expectedReconVar) + expect(promptArg).to.include('Do NOT call tools.curation.recon') }) }) describe('runAgentBody / finalize split', () => { // runAgentBody must return the response BEFORE Phase 4 runs so the daemon // can fire `task:completed` early and queue finalize for background work. + // Under cascade-defer (ENG-2485), Phase 4 is enqueueStaleSummaryPaths + + // buildManifest — it must NEVER call propagateStaleness inline. 
it('returns the agent response without running Phase 4 first', async () => { const agent = buildSplitTestAgent() @@ -358,7 +420,9 @@ describe('CurateExecutor (regression)', () => { FileContextTreeSummaryService.prototype, 'propagateStaleness', ).resolves([]) - stub(FileContextTreeManifestService.prototype, 'buildManifest').resolves() + const buildManifestStub = stub(FileContextTreeManifestService.prototype, 'buildManifest').resolves() + const enqueueStub = stub(DreamStateService.prototype, 'enqueueStaleSummaryPaths').resolves() + stub(DreamStateService.prototype, 'incrementCurationCount').resolves() const executor = new CurateExecutor() const {finalize, response} = await executor.runAgentBody(agent, { @@ -370,12 +434,16 @@ describe('CurateExecutor (regression)', () => { // Phase 4 must NOT have run yet — response was returned immediately. expect(response).to.equal('curated') - expect(propagateStalenessStub.called).to.be.false + expect(enqueueStub.called).to.be.false + expect(buildManifestStub.called).to.be.false expect((agent.deleteTaskSession as ReturnType).called).to.be.false - // finalize() actually runs Phase 4 + // finalize() runs Phase 4: enqueue + manifest rebuild + session cleanup. + // ENG-2485 invariant: propagateStaleness MUST NOT run on the curate path. 
await finalize() - expect(propagateStalenessStub.calledOnce).to.be.true + expect(enqueueStub.calledOnce).to.be.true + expect(buildManifestStub.calledOnce).to.be.true + expect(propagateStalenessStub.called).to.be.false expect((agent.deleteTaskSession as ReturnType).calledOnce).to.be.true }) @@ -402,10 +470,10 @@ describe('CurateExecutor (regression)', () => { .resolves(new Map()) .onSecondCall() .resolves(new Map([['auth/jwt.md', {hash: 'h', size: 1}]])) - const propagateStalenessStub = stub( - FileContextTreeSummaryService.prototype, - 'propagateStaleness', - ).resolves([]) + stub(FileContextTreeSummaryService.prototype, 'propagateStaleness').resolves([]) + stub(FileContextTreeManifestService.prototype, 'buildManifest').resolves() + const enqueueStub = stub(DreamStateService.prototype, 'enqueueStaleSummaryPaths').resolves() + stub(DreamStateService.prototype, 'incrementCurationCount').resolves() const executor = new CurateExecutor() const result = await executor.executeWithAgent(agent, { @@ -416,107 +484,9 @@ describe('CurateExecutor (regression)', () => { }) expect(result).to.equal('curated') - // Wrapper awaits finalize internally — Phase 4 ran by the time we get here. - expect(propagateStalenessStub.calledOnce).to.be.true - }) - }) - - describe('dream-lock coordination in Phase 4', () => { - // Detached Phase 4 races with idle-triggered dream on `_index.md` / - // `_manifest.json`. Curate's finalize must hold the dream lock around - // propagateStaleness + buildManifest to prevent interleaving. - - it('acquires the dream lock before propagation and releases on success', async () => { - // Restore default stubs so we can observe the real call sequence. 
- restore() - const tryAcquire = stub(DreamLockService.prototype, 'tryAcquire').resolves({acquired: true, priorMtime: 1234}) - const release = stub(DreamLockService.prototype, 'release').resolves() - const rollback = stub(DreamLockService.prototype, 'rollback').resolves() - - const agent = buildSplitTestAgent() - stub(FileContextTreeSnapshotService.prototype, 'getCurrentState') - .onFirstCall() - .resolves(new Map()) - .onSecondCall() - .resolves(new Map([['auth/jwt.md', {hash: 'h', size: 1}]])) - const propagateStaleness = stub(FileContextTreeSummaryService.prototype, 'propagateStaleness').resolves([]) - - const executor = new CurateExecutor() - const {finalize} = await executor.runAgentBody(agent, { - clientCwd: '/p', - content: 'x', - projectRoot: '/p', - taskId: 't-lock-success', - }) - await finalize() - - expect(tryAcquire.calledOnce).to.be.true - expect(propagateStaleness.calledOnce).to.be.true - // Lock-then-propagate, then release on success (no rollback). - expect(tryAcquire.calledBefore(propagateStaleness)).to.be.true - expect(release.calledOnce).to.be.true - expect(rollback.called).to.be.false - }) - - it('skips propagation when the lock is held (dream is running)', async () => { - restore() - const tryAcquire = stub(DreamLockService.prototype, 'tryAcquire').resolves({acquired: false}) - const release = stub(DreamLockService.prototype, 'release').resolves() - const rollback = stub(DreamLockService.prototype, 'rollback').resolves() - - const agent = buildSplitTestAgent() - // Snapshot is reachable so without the lock check, propagation would run. 
- stub(FileContextTreeSnapshotService.prototype, 'getCurrentState') - .onFirstCall() - .resolves(new Map()) - .onSecondCall() - .resolves(new Map([['auth/jwt.md', {hash: 'h', size: 1}]])) - const propagateStaleness = stub(FileContextTreeSummaryService.prototype, 'propagateStaleness').resolves([]) - - const executor = new CurateExecutor() - const {finalize} = await executor.runAgentBody(agent, { - clientCwd: '/p', - content: 'x', - projectRoot: '/p', - taskId: 't-lock-held', - }) - await finalize() - - // Lock was checked; propagation skipped; nothing to release/rollback. - expect(tryAcquire.calledOnce).to.be.true - expect(propagateStaleness.called).to.be.false - expect(release.called).to.be.false - expect(rollback.called).to.be.false - }) - - it('rolls back the lock (preserves prior mtime) when propagation throws', async () => { - restore() - const priorMtime = 9999 - stub(DreamLockService.prototype, 'tryAcquire').resolves({acquired: true, priorMtime}) - const release = stub(DreamLockService.prototype, 'release').resolves() - const rollback = stub(DreamLockService.prototype, 'rollback').resolves() - - const agent = buildSplitTestAgent() - stub(FileContextTreeSnapshotService.prototype, 'getCurrentState') - .onFirstCall() - .resolves(new Map()) - .onSecondCall() - .resolves(new Map([['auth/jwt.md', {hash: 'h', size: 1}]])) - stub(FileContextTreeSummaryService.prototype, 'propagateStaleness').rejects(new Error('boom')) - - const executor = new CurateExecutor() - const {finalize} = await executor.runAgentBody(agent, { - clientCwd: '/p', - content: 'x', - projectRoot: '/p', - taskId: 't-lock-fail', - }) - - // Phase 4 is fail-open: finalize must not throw even though propagation did. - await finalize() - - expect(release.called).to.be.false - expect(rollback.calledOnceWithExactly(priorMtime)).to.be.true + // Wrapper awaits finalize internally — cascade-defer enqueue ran by the + // time we get here. 
+ expect(enqueueStub.calledOnce).to.be.true }) }) }) diff --git a/test/unit/infra/executor/dream-executor.test.ts b/test/unit/infra/executor/dream-executor.test.ts index b1ffa4833..b62576b26 100644 --- a/test/unit/infra/executor/dream-executor.test.ts +++ b/test/unit/infra/executor/dream-executor.test.ts @@ -49,7 +49,7 @@ function makePartialRunExecutor(args: { } describe('DreamExecutor', () => { - let dreamStateService: {read: SinonStub; update: SinonStub; write: SinonStub} + let dreamStateService: {drainStaleSummaryPaths: SinonStub; enqueueStaleSummaryPaths: SinonStub; read: SinonStub; update: SinonStub; write: SinonStub} let dreamLogStore: {getNextId: SinonStub; save: SinonStub} let dreamLockService: {release: SinonStub; rollback: SinonStub} let curateLogStore: {getNextId: SinonStub; list: SinonStub; save: SinonStub} @@ -64,7 +64,12 @@ describe('DreamExecutor', () => { beforeEach(() => { dreamStateService = { - read: stub().resolves({...EMPTY_DREAM_STATE, pendingMerges: []}), + // Default drain: empty queue. Tests that exercise the queue override. + drainStaleSummaryPaths: stub().resolves([]), + // Default enqueue: no-op stub. Used by the executor's catch block to + // re-enqueue a drained snapshot if propagation fails. + enqueueStaleSummaryPaths: stub().resolves(), + read: stub().resolves({...EMPTY_DREAM_STATE, pendingMerges: [], staleSummaryPaths: []}), // Default update implementation: read → updater → write, mirroring the real // service so tests that count write.callCount stay valid without changes. 
update: stub().callsFake(async (updater: (state: import('../../../../src/server/infra/dream/dream-state-schema.js').DreamState) => import('../../../../src/server/infra/dream/dream-state-schema.js').DreamState) => { @@ -516,6 +521,123 @@ describe('DreamExecutor', () => { } }) + // ========================================================================== + // Stale-summary queue: drain + re-enqueue on propagation failure + // ========================================================================== + + it('propagates over A ∪ B union of drained queue and snapshot diff (happy path)', async () => { + // The merge at dream-executor.ts is the central correctness invariant of this + // PR — anything in EITHER the queue (A) OR dream's own diff (B) must be + // propagated, exactly once per path. This test pins that invariant. + dreamStateService.drainStaleSummaryPaths.resolves(['queue/path.md']) + + // Real temp project so snapshotService.getCurrentState succeeds. We override + // runOperations to write a new file between pre and post snapshots, so the + // snapshot diff produces a non-empty list — that becomes the B half of A ∪ B. + const projectRoot = mkdtempSync(join(tmpdir(), 'brv-dream-merge-')) + const contextTreeDir = join(projectRoot, '.brv', 'context-tree') + mkdirSync(contextTreeDir, {recursive: true}) + const captured: string[][] = [] + + class MergeTestExecutor extends DreamExecutor { + protected override async runOperations(): Promise { + // Mutate the tree so postState differs from preState by 'diff/added.md'. 
+ mkdirSync(join(contextTreeDir, 'diff'), {recursive: true})
+ writeFileSync(join(contextTreeDir, 'diff', 'added.md'), '# new from dream')
+ }
+
+ protected override async runStaleSummaryPropagation(opts: {
+ agent: ICipherAgent
+ paths: string[]
+ projectRoot: string
+ }): Promise<void> {
+ captured.push([...opts.paths].sort())
+ }
+ }
+
+ try {
+ const executor = new MergeTestExecutor(deps)
+ await executor.executeWithAgent(agent, {...defaultOptions, projectRoot})
+ } finally {
+ rmSync(projectRoot, {force: true, recursive: true})
+ }
+
+ expect(captured).to.have.lengthOf(1)
+ expect(captured[0]).to.deep.equal(['diff/added.md', 'queue/path.md'])
+ expect(dreamStateService.enqueueStaleSummaryPaths.callCount).to.equal(0)
+ })
+
+ it('dedups paths that appear in both the queue and the snapshot diff (single regeneration)', async () => {
+ dreamStateService.drainStaleSummaryPaths.resolves(['shared/path.md'])
+
+ const projectRoot = mkdtempSync(join(tmpdir(), 'brv-dream-merge-dedup-'))
+ const contextTreeDir = join(projectRoot, '.brv', 'context-tree')
+ mkdirSync(contextTreeDir, {recursive: true})
+ const captured: string[][] = []
+
+ class MergeTestExecutor extends DreamExecutor {
+ protected override async runOperations(): Promise<void> {
+ // Write the SAME path the queue contains — the merge must dedup. 
+ mkdirSync(join(contextTreeDir, 'shared'), {recursive: true})
+ writeFileSync(join(contextTreeDir, 'shared', 'path.md'), '# also touched by dream')
+ }
+
+ protected override async runStaleSummaryPropagation(opts: {
+ agent: ICipherAgent
+ paths: string[]
+ projectRoot: string
+ }): Promise<void> {
+ captured.push([...opts.paths].sort())
+ }
+ }
+
+ try {
+ const executor = new MergeTestExecutor(deps)
+ await executor.executeWithAgent(agent, {...defaultOptions, projectRoot})
+ } finally {
+ rmSync(projectRoot, {force: true, recursive: true})
+ }
+
+ expect(captured).to.have.lengthOf(1)
+ expect(captured[0]).to.deep.equal(['shared/path.md'])
+ })
+
+ it('re-enqueues drained snapshot when post-dream propagation throws', async () => {
+ // Atomic drain removes entries upfront. If propagation fails, the catch
+ // block must re-enqueue so the snapshot is not lost.
+ dreamStateService.drainStaleSummaryPaths.resolves([
+ 'auth/jwt/token.md',
+ 'billing/webhooks/stripe.md',
+ ])
+
+ // Force the propagation block to throw by making the snapshot service fail.
+ // The dream-executor wraps Step 5 in try/catch so the dream itself completes.
+ const projectRoot = mkdtempSync(join(tmpdir(), 'brv-dream-reenqueue-'))
+ try {
+ const executor = new DreamExecutor(deps)
+ // executeWithAgent uses a real FileContextTreeSnapshotService bound to projectRoot.
+ // The directory exists but has no .brv/context-tree, so getCurrentState throws —
+ // exercising the catch block that should re-enqueue the drained snapshot. 
+ await executor.executeWithAgent(agent, {...defaultOptions, projectRoot})
+ } finally {
+ rmSync(projectRoot, {force: true, recursive: true})
+ }
+
+ expect(dreamStateService.enqueueStaleSummaryPaths.calledOnce).to.equal(true)
+ expect(dreamStateService.enqueueStaleSummaryPaths.firstCall.args[0]).to.deep.equal([
+ 'auth/jwt/token.md',
+ 'billing/webhooks/stripe.md',
+ ])
+ })
+
+ it('does not call enqueue when drain returns an empty snapshot (no work to retry)', async () => {
+ // Default drain stub returns [] — no snapshot to preserve on failure.
+ const executor = new DreamExecutor(deps)
+ await executor.executeWithAgent(agent, defaultOptions)
+
+ expect(dreamStateService.enqueueStaleSummaryPaths.callCount).to.equal(0)
+ })
+
 // ==========================================================================
 // Partial / error log preservation (ENG-2126 fix #2)
 // ==========================================================================
diff --git a/test/unit/infra/llm/internal-llm-service.test.ts b/test/unit/infra/llm/internal-llm-service.test.ts
index 26a2afd34..10fa11938 100644
--- a/test/unit/infra/llm/internal-llm-service.test.ts
+++ b/test/unit/infra/llm/internal-llm-service.test.ts
@@ -788,8 +788,13 @@ describe('AgentLLMService', () => {
 await service.completeTask('Analyze this image', {imageData})
- // Verify imageData was passed to addUserMessage
- expect(addUserMessageStub.calledWith('Analyze this image', imageData)).to.be.true
+ // Verify imageData was passed to addUserMessage. The first argument now
+ // includes a `<dateTime>` prefix injected at iteration 0 to keep the
+ // system prefix byte-stable for prompt caching, so we match by suffix.
+ expect(addUserMessageStub.calledOnce).to.be.true
+ const [firstArg, secondArg] = addUserMessageStub.firstCall.args as [string, typeof imageData]
+ expect(firstArg).to.match(/.*<\/dateTime>\n\nAnalyze this image$/s)
+ expect(secondArg).to.equal(imageData)
 })
 })
 })