From ca49eb919958cd08df1bf51d1da7da1b3e830cde Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 7 May 2026 21:05:46 -0500 Subject: [PATCH 1/9] fix: capture agent outputs in session runner for persistent mode The session runner was not capturing agent stdout to extract output markers (---KELOS_OUTPUTS_START/END---). This meant Task status never received outputs/results, so Slack reporting had nothing to post back to the thread. Now runAgent uses io.MultiWriter to tee stdout into a buffer, parses the output markers after the agent exits, and writes outputs/results to the Task status via the Kelos API. Co-Authored-By: Claude Opus 4.6 --- internal/sessionrunner/runner.go | 98 +++++++++++++++++++++++++-- internal/sessionrunner/runner_test.go | 78 +++++++++++++++++++++ 2 files changed, 171 insertions(+), 5 deletions(-) diff --git a/internal/sessionrunner/runner.go b/internal/sessionrunner/runner.go index 6687d4a7..44dcd547 100644 --- a/internal/sessionrunner/runner.go +++ b/internal/sessionrunner/runner.go @@ -17,11 +17,14 @@ limitations under the License. package sessionrunner import ( + "bytes" "context" "fmt" + "io" "os" "os/exec" "strconv" + "strings" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -215,12 +218,23 @@ func (r *Runner) processTask(ctx context.Context, taskName string) error { return fmt.Errorf("workspace reset failed: %w", err) } - // Invoke the agent entrypoint. - return r.runAgent(ctx, task) + // Invoke the agent entrypoint and capture outputs. + agentOutput, agentErr := r.runAgent(ctx, task) + + // Parse and persist outputs to Task status regardless of success/failure. + if outputs := parseOutputs(agentOutput); len(outputs) > 0 { + results := resultsFromOutputs(outputs) + if err := r.updateTaskOutputs(ctx, taskName, outputs, results); err != nil { + fmt.Printf("Error updating task outputs: %v\n", err) + } + } + + return agentErr } // runAgent invokes the agent entrypoint with the task prompt. -func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) error { +// It returns the captured stdout content and any execution error. +func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) (string, error) { entrypoint := "/kelos_entrypoint.sh" // Set branch env var if present. @@ -229,13 +243,32 @@ func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) error { env = append(env, fmt.Sprintf("KELOS_BRANCH=%s", task.Spec.Branch)) } + var buf bytes.Buffer cmd := exec.CommandContext(ctx, entrypoint, task.Spec.Prompt) cmd.Dir = "/workspace/repo" - cmd.Stdout = os.Stdout + cmd.Stdout = io.MultiWriter(os.Stdout, &buf) cmd.Stderr = os.Stderr cmd.Env = env - return cmd.Run() + err := cmd.Run() + return buf.String(), err +} + +// updateTaskOutputs writes captured outputs and results to the Task status. +func (r *Runner) updateTaskOutputs(ctx context.Context, taskName string, outputs []string, results map[string]string) error { + task, err := r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).Get(ctx, taskName, metav1.GetOptions{}) + if err != nil { + return err + } + task.Status.Outputs = outputs + task.Status.Results = results + now := metav1.Now() + task.Status.CompletionTime = &now + if task.Status.StartTime == nil { + task.Status.StartTime = &now + } + _, err = r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).UpdateStatus(ctx, task, metav1.UpdateOptions{}) + return err } // setTaskStatus sets the kelos.dev/task-status annotation on the pod. @@ -266,3 +299,58 @@ func (r *Runner) setAnnotation(ctx context.Context, key, value string) error { } return fmt.Errorf("failed to set annotation %s after %d retries", key, maxRetries) } + +const ( + outputStartMarker = "---KELOS_OUTPUTS_START---" + outputEndMarker = "---KELOS_OUTPUTS_END---" +) + +// parseOutputs extracts output lines from log data between markers. +func parseOutputs(logData string) []string { + startIdx := strings.Index(logData, outputStartMarker) + if startIdx == -1 { + return nil + } + endIdx := strings.Index(logData, outputEndMarker) + if endIdx == -1 || endIdx <= startIdx { + return nil + } + + between := logData[startIdx+len(outputStartMarker) : endIdx] + between = strings.TrimSpace(between) + if between == "" { + return nil + } + + lines := strings.Split(between, "\n") + var result []string + for _, line := range lines { + line = strings.TrimSpace(line) + if line != "" { + result = append(result, line) + } + } + if len(result) == 0 { + return nil + } + return result +} + +// resultsFromOutputs builds a key-value map from output lines in "key: value" format. +func resultsFromOutputs(outputs []string) map[string]string { + if len(outputs) == 0 { + return nil + } + var result map[string]string + for _, line := range outputs { + key, value, ok := strings.Cut(line, ": ") + if !ok || key == "" { + continue + } + if result == nil { + result = make(map[string]string) + } + result[key] = value + } + return result +} diff --git a/internal/sessionrunner/runner_test.go b/internal/sessionrunner/runner_test.go index 378b31d4..fbc9dcb9 100644 --- a/internal/sessionrunner/runner_test.go +++ b/internal/sessionrunner/runner_test.go @@ -83,3 +83,81 @@ func TestConfigFromEnv_InvalidMaxTasks(t *testing.T) { t.Errorf("MaxTasksPerSession: expected 0 on invalid input, got %d", cfg.MaxTasksPerSession) } } + +func TestParseOutputs(t *testing.T) { + tests := []struct { + name string + input string + expect []string + }{ + { + name: "no markers", + input: "some random log output\n", + expect: nil, + }, + { + name: "empty between markers", + input: "---KELOS_OUTPUTS_START---\n---KELOS_OUTPUTS_END---\n", + expect: nil, + }, + { + name: "single output", + input: "log line\n---KELOS_OUTPUTS_START---\nbranch: main\n---KELOS_OUTPUTS_END---\n", + expect: []string{"branch: main"}, + }, + { + name: "multiple outputs", + input: "---KELOS_OUTPUTS_START---\nbranch: feat\ncommit: abc123\nresponse: dGVzdA==\n---KELOS_OUTPUTS_END---\n", + expect: []string{"branch: feat", "commit: abc123", "response: dGVzdA=="}, + }, + { + name: "start without end", + input: "---KELOS_OUTPUTS_START---\nbranch: main\n", + expect: nil, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := parseOutputs(tc.input) + if tc.expect == nil { + if got != nil { + t.Errorf("expected nil, got %v", got) + } + return + } + if len(got) != len(tc.expect) { + t.Fatalf("expected %d outputs, got %d: %v", len(tc.expect), len(got), got) + } + for i, want := range tc.expect { + if got[i] != want { + t.Errorf("output[%d]: expected %q, got %q", i, want, got[i]) + } + } + }) + } +} + +func TestResultsFromOutputs(t *testing.T) { + outputs := []string{"branch: main", "commit: abc123", "cost-usd: 0.05"} + results := resultsFromOutputs(outputs) + + if results["branch"] != "main" { + t.Errorf("branch: expected 'main', got %q", results["branch"]) + } + if results["commit"] != "abc123" { + t.Errorf("commit: expected 'abc123', got %q", results["commit"]) + } + if results["cost-usd"] != "0.05" { + t.Errorf("cost-usd: expected '0.05', got %q", results["cost-usd"]) + } +} + +func TestResultsFromOutputs_Empty(t *testing.T) { + if got := resultsFromOutputs(nil); got != nil { + t.Errorf("expected nil, got %v", got) + } + if got := resultsFromOutputs([]string{}); got != nil { + t.Errorf("expected nil, got %v", got) + } +} From 2a99cd8e8df781482cc625557ad8941057909214 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 7 May 2026 21:17:16 -0500 Subject: [PATCH 2/9] test: add integration tests for persistent execution mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests the full SessionReconciler lifecycle through envtest: - Task with execution-mode label transitions to Queued (not Job) - SessionReconciler assigns task to available session pod - Annotation-based protocol: running → succeeded/failed transitions - Task outputs/results written by session runner are preserved - Requeue behavior when no session pod is available Also registers SessionReconciler in the integration test suite. Co-Authored-By: Claude Opus 4.6 --- test/integration/session_test.go | 335 +++++++++++++++++++++++++++++++ test/integration/suite_test.go | 7 + 2 files changed, 342 insertions(+) create mode 100644 test/integration/session_test.go diff --git a/test/integration/session_test.go b/test/integration/session_test.go new file mode 100644 index 00000000..7d45c5f7 --- /dev/null +++ b/test/integration/session_test.go @@ -0,0 +1,335 @@ +package integration + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + kelosv1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1" + "github.com/kelos-dev/kelos/internal/controller" +) + +var _ = Describe("Persistent Execution Mode", func() { + const ( + timeout = time.Second * 10 + interval = time.Millisecond * 250 + ) + + Context("Task lifecycle through session pod", func() { + var ( + ns *corev1.Namespace + task *kelosv1alpha1.Task + pod *corev1.Pod + taskKey types.NamespacedName + podKey types.NamespacedName + ) + + BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-persistent-" + randomSuffix(), + }, + } + Expect(k8sClient.Create(ctx, ns)).Should(Succeed()) + + task = &kelosv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "persistent-task", + Namespace: ns.Name, + Labels: map[string]string{ + controller.LabelExecutionMode: string(kelosv1alpha1.ExecutionModePersistent), + "kelos.dev/taskspawner": "my-spawner", + }, + }, + Spec: kelosv1alpha1.TaskSpec{ + Type: "claude-code", + Prompt: "Say hello", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeNone, + }, + }, + } + taskKey = types.NamespacedName{Name: task.Name, Namespace: ns.Name} + + pod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "session-my-spawner-0", + Namespace: ns.Name, + Labels: map[string]string{ + "kelos.dev/taskspawner": "my-spawner", + "kelos.dev/component": controller.SessionComponentLabel, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "agent", Image: "busybox"}, + }, + }, + } + podKey = types.NamespacedName{Name: pod.Name, Namespace: ns.Name} + }) + + AfterEach(func() { + Expect(k8sClient.Delete(ctx, ns)).Should(Succeed()) + }) + + It("should transition task through Queued → Pending → Running → Succeeded", func() { + By("Creating a session pod") + Expect(k8sClient.Create(ctx, pod)).Should(Succeed()) + + By("Setting pod status to Running") + Eventually(func() error { + if err := k8sClient.Get(ctx, podKey, pod); err != nil { + return err + } + pod.Status.Phase = corev1.PodRunning + return k8sClient.Status().Update(ctx, pod) + }, timeout, interval).Should(Succeed()) + + By("Creating a persistent-mode task") + Expect(k8sClient.Create(ctx, task)).Should(Succeed()) + + By("Verifying the task transitions to Queued (no Job created)") + Eventually(func() kelosv1alpha1.TaskPhase { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(kelosv1alpha1.TaskPhaseQueued)) + + By("Verifying no Job was created") + Consistently(func() bool { + var jobList client.ObjectList + _ = jobList + // Just verify via task - if a Job existed, phase would be Pending via TaskReconciler + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return false + } + return t.Status.JobName == "" + }, 2*time.Second, interval).Should(BeTrue()) + + By("Verifying the SessionReconciler assigns the task to the pod") + Eventually(func() kelosv1alpha1.TaskPhase { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(kelosv1alpha1.TaskPhasePending)) + + By("Verifying pod has the assignment annotation") + Eventually(func() string { + var p corev1.Pod + if err := k8sClient.Get(ctx, podKey, &p); err != nil { + return "" + } + return p.Annotations[controller.AnnotationAssignedTask] + }, timeout, interval).Should(Equal("persistent-task")) + + By("Verifying task has sessionPodName set") + var updatedTask kelosv1alpha1.Task + Expect(k8sClient.Get(ctx, taskKey, &updatedTask)).Should(Succeed()) + Expect(updatedTask.Status.SessionPodName).To(Equal(pod.Name)) + + By("Simulating session runner setting status to running") + Eventually(func() error { + if err := k8sClient.Get(ctx, podKey, pod); err != nil { + return err + } + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + pod.Annotations[controller.AnnotationTaskStatus] = "running" + return k8sClient.Update(ctx, pod) + }, timeout, interval).Should(Succeed()) + + By("Verifying task transitions to Running") + Eventually(func() kelosv1alpha1.TaskPhase { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(kelosv1alpha1.TaskPhaseRunning)) + + By("Simulating session runner setting status to succeeded") + Eventually(func() error { + if err := k8sClient.Get(ctx, podKey, pod); err != nil { + return err + } + pod.Annotations[controller.AnnotationTaskStatus] = "succeeded" + return k8sClient.Update(ctx, pod) + }, timeout, interval).Should(Succeed()) + + By("Verifying task transitions to Succeeded") + Eventually(func() kelosv1alpha1.TaskPhase { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(kelosv1alpha1.TaskPhaseSucceeded)) + + By("Verifying pod assignment was cleared") + Eventually(func() bool { + var p corev1.Pod + if err := k8sClient.Get(ctx, podKey, &p); err != nil { + return false + } + _, exists := p.Annotations[controller.AnnotationAssignedTask] + return !exists + }, timeout, interval).Should(BeTrue()) + }) + + It("should mark task failed when session runner reports failure", func() { + By("Creating a session pod in Running state") + Expect(k8sClient.Create(ctx, pod)).Should(Succeed()) + Eventually(func() error { + if err := k8sClient.Get(ctx, podKey, pod); err != nil { + return err + } + pod.Status.Phase = corev1.PodRunning + return k8sClient.Status().Update(ctx, pod) + }, timeout, interval).Should(Succeed()) + + By("Creating a persistent-mode task") + Expect(k8sClient.Create(ctx, task)).Should(Succeed()) + + By("Waiting for task to be assigned") + Eventually(func() kelosv1alpha1.TaskPhase { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(kelosv1alpha1.TaskPhasePending)) + + By("Simulating session runner reporting failure") + Eventually(func() error { + if err := k8sClient.Get(ctx, podKey, pod); err != nil { + return err + } + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + pod.Annotations[controller.AnnotationTaskStatus] = "failed" + return k8sClient.Update(ctx, pod) + }, timeout, interval).Should(Succeed()) + + By("Verifying task transitions to Failed") + Eventually(func() kelosv1alpha1.TaskPhase { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(kelosv1alpha1.TaskPhaseFailed)) + }) + + It("should preserve task outputs written by session runner", func() { + By("Creating a session pod in Running state") + Expect(k8sClient.Create(ctx, pod)).Should(Succeed()) + Eventually(func() error { + if err := k8sClient.Get(ctx, podKey, pod); err != nil { + return err + } + pod.Status.Phase = corev1.PodRunning + return k8sClient.Status().Update(ctx, pod) + }, timeout, interval).Should(Succeed()) + + By("Creating a persistent-mode task") + Expect(k8sClient.Create(ctx, task)).Should(Succeed()) + + By("Waiting for task to be assigned") + Eventually(func() kelosv1alpha1.TaskPhase { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(kelosv1alpha1.TaskPhasePending)) + + By("Simulating session runner writing outputs to task status") + Eventually(func() error { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return err + } + t.Status.Outputs = []string{ + "branch: feat/hello", + "commit: abc123", + "response: SGVsbG8gd29ybGQ=", + } + t.Status.Results = map[string]string{ + "branch": "feat/hello", + "commit": "abc123", + "response": "SGVsbG8gd29ybGQ=", + } + return k8sClient.Status().Update(ctx, &t) + }, timeout, interval).Should(Succeed()) + + By("Simulating session runner setting status to succeeded") + Eventually(func() error { + if err := k8sClient.Get(ctx, podKey, pod); err != nil { + return err + } + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + pod.Annotations[controller.AnnotationTaskStatus] = "succeeded" + return k8sClient.Update(ctx, pod) + }, timeout, interval).Should(Succeed()) + + By("Verifying task reaches Succeeded with outputs preserved") + Eventually(func() kelosv1alpha1.TaskPhase { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(kelosv1alpha1.TaskPhaseSucceeded)) + + By("Verifying outputs and results are still present on the task") + var finalTask kelosv1alpha1.Task + Expect(k8sClient.Get(ctx, taskKey, &finalTask)).Should(Succeed()) + Expect(finalTask.Status.Outputs).To(ContainElement("branch: feat/hello")) + Expect(finalTask.Status.Outputs).To(ContainElement("response: SGVsbG8gd29ybGQ=")) + Expect(finalTask.Status.Results).To(HaveKeyWithValue("branch", "feat/hello")) + Expect(finalTask.Status.Results).To(HaveKeyWithValue("response", "SGVsbG8gd29ybGQ=")) + }) + + It("should requeue when no session pod is available", func() { + By("Creating a persistent-mode task without a session pod") + Expect(k8sClient.Create(ctx, task)).Should(Succeed()) + + By("Verifying task reaches Queued phase") + Eventually(func() kelosv1alpha1.TaskPhase { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(kelosv1alpha1.TaskPhaseQueued)) + + By("Verifying task stays in Queued (not assigned)") + Consistently(func() kelosv1alpha1.TaskPhase { + var t kelosv1alpha1.Task + if err := k8sClient.Get(ctx, taskKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, 3*time.Second, interval).Should(Equal(kelosv1alpha1.TaskPhaseQueued)) + }) + }) +}) + +func randomSuffix() string { + return time.Now().Format("150405") + "-" + metav1.Now().Format("000000000")[:4] +} diff --git a/test/integration/suite_test.go b/test/integration/suite_test.go index 73d58536..b297d3c0 100644 --- a/test/integration/suite_test.go +++ b/test/integration/suite_test.go @@ -90,6 +90,13 @@ var _ = BeforeSuite(func() { }).SetupWithManager(mgr) Expect(err).NotTo(HaveOccurred()) + err = (&controller.SessionReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("kelos-controller"), + }).SetupWithManager(mgr) + Expect(err).NotTo(HaveOccurred()) + err = (&controller.TaskSpawnerReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), From 65cf0712c3d48d11334748fb19e13a02f20c3eb8 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 7 May 2026 21:25:59 -0500 Subject: [PATCH 3/9] fix: address review feedback on session runner output capture - Add retry-on-conflict to updateTaskStatus (renamed from updateTaskOutputs) to avoid silently dropping outputs when the SessionReconciler writes concurrently - Always set CompletionTime/StartTime regardless of whether output markers are present, so downstream consumers (TTL, Slack reporter) don't see the task as perpetually in-progress - Replace unbounded bytes.Buffer with a 256KB ring buffer (tailWriter) to cap memory usage from verbose agents Co-Authored-By: Claude Opus 4.6 --- internal/sessionrunner/runner.go | 113 ++++++++++++++++++++------ internal/sessionrunner/runner_test.go | 56 +++++++++++++ 2 files changed, 145 insertions(+), 24 deletions(-) diff --git a/internal/sessionrunner/runner.go b/internal/sessionrunner/runner.go index 44dcd547..286fc983 100644 --- a/internal/sessionrunner/runner.go +++ b/internal/sessionrunner/runner.go @@ -221,19 +221,23 @@ func (r *Runner) processTask(ctx context.Context, taskName string) error { // Invoke the agent entrypoint and capture outputs. agentOutput, agentErr := r.runAgent(ctx, task) - // Parse and persist outputs to Task status regardless of success/failure. - if outputs := parseOutputs(agentOutput); len(outputs) > 0 { - results := resultsFromOutputs(outputs) - if err := r.updateTaskOutputs(ctx, taskName, outputs, results); err != nil { - fmt.Printf("Error updating task outputs: %v\n", err) - } + // Parse outputs and persist to Task status. + outputs := parseOutputs(agentOutput) + results := resultsFromOutputs(outputs) + if err := r.updateTaskStatus(ctx, taskName, outputs, results); err != nil { + fmt.Printf("Error updating task status: %v\n", err) } return agentErr } +// tailBufferSize is the maximum bytes retained from agent stdout for output +// marker parsing. The markers are always emitted at the end of the run by +// kelos-capture, so only the tail is needed. +const tailBufferSize = 256 * 1024 + // runAgent invokes the agent entrypoint with the task prompt. -// It returns the captured stdout content and any execution error. +// It returns the tail of captured stdout (for output parsing) and any execution error. func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) (string, error) { entrypoint := "/kelos_entrypoint.sh" @@ -243,32 +247,45 @@ func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) (string env = append(env, fmt.Sprintf("KELOS_BRANCH=%s", task.Spec.Branch)) } - var buf bytes.Buffer + tail := newTailWriter(tailBufferSize) cmd := exec.CommandContext(ctx, entrypoint, task.Spec.Prompt) cmd.Dir = "/workspace/repo" - cmd.Stdout = io.MultiWriter(os.Stdout, &buf) + cmd.Stdout = io.MultiWriter(os.Stdout, tail) cmd.Stderr = os.Stderr cmd.Env = env err := cmd.Run() - return buf.String(), err + return tail.String(), err } -// updateTaskOutputs writes captured outputs and results to the Task status. -func (r *Runner) updateTaskOutputs(ctx context.Context, taskName string, outputs []string, results map[string]string) error { - task, err := r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).Get(ctx, taskName, metav1.GetOptions{}) - if err != nil { - return err - } - task.Status.Outputs = outputs - task.Status.Results = results - now := metav1.Now() - task.Status.CompletionTime = &now - if task.Status.StartTime == nil { - task.Status.StartTime = &now +// updateTaskStatus writes completion timestamps and any captured outputs to the +// Task status. It retries on conflict since the SessionReconciler may write +// concurrently. +func (r *Runner) updateTaskStatus(ctx context.Context, taskName string, outputs []string, results map[string]string) error { + const maxRetries = 3 + for attempt := 0; attempt < maxRetries; attempt++ { + task, err := r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).Get(ctx, taskName, metav1.GetOptions{}) + if err != nil { + return err + } + now := metav1.Now() + if task.Status.StartTime == nil { + task.Status.StartTime = &now + } + task.Status.CompletionTime = &now + if len(outputs) > 0 { + task.Status.Outputs = outputs + task.Status.Results = results + } + _, err = r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).UpdateStatus(ctx, task, metav1.UpdateOptions{}) + if err == nil { + return nil + } + if !apierrors.IsConflict(err) { + return err + } } - _, err = r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).UpdateStatus(ctx, task, metav1.UpdateOptions{}) - return err + return fmt.Errorf("failed to update task status after %d retries", maxRetries) } // setTaskStatus sets the kelos.dev/task-status annotation on the pod. @@ -300,6 +317,54 @@ func (r *Runner) setAnnotation(ctx context.Context, key, value string) error { return fmt.Errorf("failed to set annotation %s after %d retries", key, maxRetries) } +// tailWriter is a fixed-size ring buffer that retains only the last N bytes +// written to it. This bounds memory usage when capturing verbose agent output. +type tailWriter struct { + buf []byte + size int + pos int + full bool +} + +func newTailWriter(size int) *tailWriter { + return &tailWriter{buf: make([]byte, size), size: size} +} + +func (tw *tailWriter) Write(p []byte) (int, error) { + n := len(p) + if n >= tw.size { + copy(tw.buf, p[n-tw.size:]) + tw.pos = 0 + tw.full = true + return n, nil + } + space := tw.size - tw.pos + if n <= space { + copy(tw.buf[tw.pos:], p) + tw.pos += n + } else { + copy(tw.buf[tw.pos:], p[:space]) + copy(tw.buf, p[space:]) + tw.pos = n - space + tw.full = true + } + if tw.pos == tw.size { + tw.pos = 0 + tw.full = true + } + return n, nil +} + +func (tw *tailWriter) String() string { + if !tw.full { + return string(tw.buf[:tw.pos]) + } + var b bytes.Buffer + b.Write(tw.buf[tw.pos:]) + b.Write(tw.buf[:tw.pos]) + return b.String() +} + const ( outputStartMarker = "---KELOS_OUTPUTS_START---" outputEndMarker = "---KELOS_OUTPUTS_END---" diff --git a/internal/sessionrunner/runner_test.go b/internal/sessionrunner/runner_test.go index fbc9dcb9..f2993abd 100644 --- a/internal/sessionrunner/runner_test.go +++ b/internal/sessionrunner/runner_test.go @@ -161,3 +161,59 @@ func TestResultsFromOutputs_Empty(t *testing.T) { t.Errorf("expected nil, got %v", got) } } + +func TestTailWriter_SmallWrite(t *testing.T) { + tw := newTailWriter(100) + tw.Write([]byte("hello")) + if got := tw.String(); got != "hello" { + t.Errorf("expected 'hello', got %q", got) + } +} + +func TestTailWriter_ExactFit(t *testing.T) { + tw := newTailWriter(5) + tw.Write([]byte("hello")) + if got := tw.String(); got != "hello" { + t.Errorf("expected 'hello', got %q", got) + } +} + +func TestTailWriter_Overflow(t *testing.T) { + tw := newTailWriter(5) + tw.Write([]byte("hello world")) + if got := tw.String(); got != "world" { + t.Errorf("expected 'world', got %q", got) + } +} + +func TestTailWriter_MultipleWrites(t *testing.T) { + tw := newTailWriter(10) + tw.Write([]byte("aaaa")) + tw.Write([]byte("bbbb")) + tw.Write([]byte("cccc")) + got := tw.String() + // Total written: 12 bytes ("aaaabbbbcccc"), buffer is 10, so last 10 = "aabbbbcccc" + want := "aabbbbcccc" + if got != want { + t.Errorf("expected %q, got %q", want, got) + } +} + +func TestTailWriter_PreservesOutputMarkers(t *testing.T) { + tw := newTailWriter(256) + // Write a bunch of noise first + for i := 0; i < 100; i++ { + tw.Write([]byte("noise line that should be evicted\n")) + } + // Then write the markers at the end + tw.Write([]byte("---KELOS_OUTPUTS_START---\nbranch: main\ncommit: abc\n---KELOS_OUTPUTS_END---\n")) + + got := tw.String() + outputs := parseOutputs(got) + if len(outputs) != 2 { + t.Fatalf("expected 2 outputs, got %d from tail: %q", len(outputs), got[max(0, len(got)-200):]) + } + if outputs[0] != "branch: main" { + t.Errorf("output[0]: expected 'branch: main', got %q", outputs[0]) + } +} From e9948d92a78e8809ae4bd6f4d8189d895af63567 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 7 May 2026 21:34:16 -0500 Subject: [PATCH 4/9] fix: integration test race conditions - Accept either Queued or Pending when verifying initial phase, since the SessionReconciler can assign the task before the first poll - Use crypto/rand for namespace suffixes to avoid collisions when tests run within the same second Co-Authored-By: Claude Opus 4.6 --- test/integration/session_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/test/integration/session_test.go b/test/integration/session_test.go index 7d45c5f7..ed1ec6fa 100644 --- a/test/integration/session_test.go +++ b/test/integration/session_test.go @@ -1,6 +1,8 @@ package integration import ( + "crypto/rand" + "fmt" "time" . "github.com/onsi/ginkgo/v2" @@ -8,7 +10,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" kelosv1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1" "github.com/kelos-dev/kelos/internal/controller" @@ -94,20 +95,17 @@ var _ = Describe("Persistent Execution Mode", func() { By("Creating a persistent-mode task") Expect(k8sClient.Create(ctx, task)).Should(Succeed()) - By("Verifying the task transitions to Queued (no Job created)") - Eventually(func() kelosv1alpha1.TaskPhase { + By("Verifying the task transitions to Queued or Pending (no Job created)") + Eventually(func() bool { var t kelosv1alpha1.Task if err := k8sClient.Get(ctx, taskKey, &t); err != nil { - return "" + return false } - return t.Status.Phase - }, timeout, interval).Should(Equal(kelosv1alpha1.TaskPhaseQueued)) + return t.Status.Phase == kelosv1alpha1.TaskPhaseQueued || t.Status.Phase == kelosv1alpha1.TaskPhasePending + }, timeout, interval).Should(BeTrue()) By("Verifying no Job was created") Consistently(func() bool { - var jobList client.ObjectList - _ = jobList - // Just verify via task - if a Job existed, phase would be Pending via TaskReconciler var t kelosv1alpha1.Task if err := k8sClient.Get(ctx, taskKey, &t); err != nil { return false @@ -331,5 +329,7 @@ var _ = Describe("Persistent Execution Mode", func() { }) func randomSuffix() string { - return time.Now().Format("150405") + "-" + metav1.Now().Format("000000000")[:4] + b := make([]byte, 4) + rand.Read(b) + return fmt.Sprintf("%x", b) } From 9fb5ec5ae0f217ba3ed72599191a08e0f890f41e Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 7 May 2026 21:43:30 -0500 Subject: [PATCH 5/9] refactor: consolidate output parsing into internal/capture Move ParseOutputs and ResultsFromOutputs into internal/capture as the single source of truth. The controller delegates to capture, and the session runner imports directly instead of duplicating the logic. Co-Authored-By: Claude Opus 4.6 --- internal/capture/capture.go | 61 ++++++++++++++++++++++++++- internal/controller/output_parser.go | 52 ++--------------------- internal/sessionrunner/runner.go | 61 ++------------------------- internal/sessionrunner/runner_test.go | 12 +++--- 4 files changed, 72 insertions(+), 114 deletions(-) diff --git a/internal/capture/capture.go b/internal/capture/capture.go index bef5887c..e7ae5ccd 100644 --- a/internal/capture/capture.go +++ b/internal/capture/capture.go @@ -14,12 +14,69 @@ import ( ) const ( - markerStart = "---KELOS_OUTPUTS_START---" - markerEnd = "---KELOS_OUTPUTS_END---" + // MarkerStart is the sentinel line that begins the outputs block. + MarkerStart = "---KELOS_OUTPUTS_START---" + // MarkerEnd is the sentinel line that ends the outputs block. + MarkerEnd = "---KELOS_OUTPUTS_END---" + + markerStart = MarkerStart + markerEnd = MarkerEnd agentOutputFile = "/tmp/agent-output.jsonl" commandTimeout = 30 * time.Second ) +// ParseOutputs extracts output lines from log data between the +// ---KELOS_OUTPUTS_START--- and ---KELOS_OUTPUTS_END--- markers. +func ParseOutputs(logData string) []string { + startIdx := strings.Index(logData, MarkerStart) + if startIdx == -1 { + return nil + } + endIdx := strings.Index(logData, MarkerEnd) + if endIdx == -1 || endIdx <= startIdx { + return nil + } + + between := logData[startIdx+len(MarkerStart) : endIdx] + between = strings.TrimSpace(between) + if between == "" { + return nil + } + + lines := strings.Split(between, "\n") + var result []string + for _, line := range lines { + line = strings.TrimSpace(line) + if line != "" { + result = append(result, line) + } + } + if len(result) == 0 { + return nil + } + return result +} + +// ResultsFromOutputs builds a key-value map from output lines in "key: value" format. +// Lines that do not contain ": " are skipped. If duplicate keys exist, the last value wins. +func ResultsFromOutputs(outputs []string) map[string]string { + if len(outputs) == 0 { + return nil + } + var result map[string]string + for _, line := range outputs { + key, value, ok := strings.Cut(line, ": ") + if !ok || key == "" { + continue + } + if result == nil { + result = make(map[string]string) + } + result[key] = value + } + return result +} + // Run captures deterministic outputs (branch, commit, PRs, token usage) from // the workspace and emits them between markers to stdout. Returns 0 on success. func Run() int { diff --git a/internal/controller/output_parser.go b/internal/controller/output_parser.go index a32c2422..daa4e0da 100644 --- a/internal/controller/output_parser.go +++ b/internal/controller/output_parser.go @@ -1,60 +1,14 @@ package controller -import "strings" - -const ( - outputStartMarker = "---KELOS_OUTPUTS_START---" - outputEndMarker = "---KELOS_OUTPUTS_END---" -) +import "github.com/kelos-dev/kelos/internal/capture" // ParseOutputs extracts output lines from log data between the // ---KELOS_OUTPUTS_START--- and ---KELOS_OUTPUTS_END--- markers. func ParseOutputs(logData string) []string { - startIdx := strings.Index(logData, outputStartMarker) - if startIdx == -1 { - return nil - } - endIdx := strings.Index(logData, outputEndMarker) - if endIdx == -1 || endIdx <= startIdx { - return nil - } - - between := logData[startIdx+len(outputStartMarker) : endIdx] - between = strings.TrimSpace(between) - if between == "" { - return nil - } - - lines := strings.Split(between, "\n") - var result []string - for _, line := range lines { - line = strings.TrimSpace(line) - if line != "" { - result = append(result, line) - } - } - if len(result) == 0 { - return nil - } - return result + return capture.ParseOutputs(logData) } // ResultsFromOutputs builds a key-value map from output lines in "key: value" format. -// Lines that do not contain ": " are skipped. If duplicate keys exist, the last value wins. func ResultsFromOutputs(outputs []string) map[string]string { - if len(outputs) == 0 { - return nil - } - var result map[string]string - for _, line := range outputs { - key, value, ok := strings.Cut(line, ": ") - if !ok || key == "" { - continue - } - if result == nil { - result = make(map[string]string) - } - result[key] = value - } - return result + return capture.ResultsFromOutputs(outputs) } diff --git a/internal/sessionrunner/runner.go b/internal/sessionrunner/runner.go index 286fc983..91db3e85 100644 --- a/internal/sessionrunner/runner.go +++ b/internal/sessionrunner/runner.go @@ -24,7 +24,6 @@ import ( "os" "os/exec" "strconv" - "strings" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -33,6 +32,7 @@ import ( "k8s.io/client-go/rest" kelosv1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1" + "github.com/kelos-dev/kelos/internal/capture" kelosversioned "github.com/kelos-dev/kelos/pkg/generated/clientset/versioned" ) @@ -222,8 +222,8 @@ func (r *Runner) processTask(ctx context.Context, taskName string) error { agentOutput, agentErr := r.runAgent(ctx, task) // Parse outputs and persist to Task status. - outputs := parseOutputs(agentOutput) - results := resultsFromOutputs(outputs) + outputs := capture.ParseOutputs(agentOutput) + results := capture.ResultsFromOutputs(outputs) if err := r.updateTaskStatus(ctx, taskName, outputs, results); err != nil { fmt.Printf("Error updating task status: %v\n", err) } @@ -364,58 +364,3 @@ func (tw *tailWriter) String() string { b.Write(tw.buf[:tw.pos]) return b.String() } - -const ( - outputStartMarker = "---KELOS_OUTPUTS_START---" - outputEndMarker = "---KELOS_OUTPUTS_END---" -) - -// parseOutputs extracts output lines from log data between markers. -func parseOutputs(logData string) []string { - startIdx := strings.Index(logData, outputStartMarker) - if startIdx == -1 { - return nil - } - endIdx := strings.Index(logData, outputEndMarker) - if endIdx == -1 || endIdx <= startIdx { - return nil - } - - between := logData[startIdx+len(outputStartMarker) : endIdx] - between = strings.TrimSpace(between) - if between == "" { - return nil - } - - lines := strings.Split(between, "\n") - var result []string - for _, line := range lines { - line = strings.TrimSpace(line) - if line != "" { - result = append(result, line) - } - } - if len(result) == 0 { - return nil - } - return result -} - -// resultsFromOutputs builds a key-value map from output lines in "key: value" format. -func resultsFromOutputs(outputs []string) map[string]string { - if len(outputs) == 0 { - return nil - } - var result map[string]string - for _, line := range outputs { - key, value, ok := strings.Cut(line, ": ") - if !ok || key == "" { - continue - } - if result == nil { - result = make(map[string]string) - } - result[key] = value - } - return result -} diff --git a/internal/sessionrunner/runner_test.go b/internal/sessionrunner/runner_test.go index f2993abd..60dd2971 100644 --- a/internal/sessionrunner/runner_test.go +++ b/internal/sessionrunner/runner_test.go @@ -4,6 +4,8 @@ import ( "os" "testing" "time" + + "github.com/kelos-dev/kelos/internal/capture" ) func TestConfigFromEnv_Defaults(t *testing.T) { @@ -119,7 +121,7 @@ func TestParseOutputs(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - got := parseOutputs(tc.input) + got := capture.ParseOutputs(tc.input) if tc.expect == nil { if got != nil { t.Errorf("expected nil, got %v", got) @@ -140,7 +142,7 @@ func TestParseOutputs(t *testing.T) { func TestResultsFromOutputs(t *testing.T) { outputs := []string{"branch: main", "commit: abc123", "cost-usd: 0.05"} - results := resultsFromOutputs(outputs) + results := capture.ResultsFromOutputs(outputs) if results["branch"] != "main" { t.Errorf("branch: expected 'main', got %q", results["branch"]) @@ -154,10 +156,10 @@ func TestResultsFromOutputs(t *testing.T) { } func TestResultsFromOutputs_Empty(t *testing.T) { - if got := resultsFromOutputs(nil); got != nil { + if got := capture.ResultsFromOutputs(nil); got != nil { t.Errorf("expected nil, got %v", got) } - if got := resultsFromOutputs([]string{}); got != nil { + if got := capture.ResultsFromOutputs([]string{}); got != nil { t.Errorf("expected nil, got %v", got) } } @@ -209,7 +211,7 @@ func TestTailWriter_PreservesOutputMarkers(t *testing.T) { tw.Write([]byte("---KELOS_OUTPUTS_START---\nbranch: main\ncommit: abc\n---KELOS_OUTPUTS_END---\n")) got := tw.String() - outputs := parseOutputs(got) + outputs := capture.ParseOutputs(got) if len(outputs) != 2 { t.Fatalf("expected 2 outputs, got %d from tail: %q", len(outputs), got[max(0, len(got)-200):]) } From 3edf5fe346d371f26e4e28939a2a120081283167 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 7 May 2026 21:57:52 -0500 Subject: [PATCH 6/9] fix: capture StartTime before agent execution begins Capture the start timestamp at the top of processTask (before runAgent) and pass it to updateTaskStatus as a fallback. Previously metav1.Now() was captured only at completion time and shared for both StartTime and CompletionTime, producing zero-duration metrics for fast-completing tasks. Co-Authored-By: Claude Opus 4.6 --- internal/sessionrunner/runner.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/sessionrunner/runner.go b/internal/sessionrunner/runner.go index 91db3e85..f39d08af 100644 --- a/internal/sessionrunner/runner.go +++ b/internal/sessionrunner/runner.go @@ -213,6 +213,8 @@ func (r *Runner) processTask(ctx context.Context, taskName string) error { return fmt.Errorf("failed to set task status to running: %w", err) } + startTime := metav1.Now() + // Reset workspace. if err := r.workspace.Reset(ctx, task.Spec.Branch); err != nil { return fmt.Errorf("workspace reset failed: %w", err) @@ -224,7 +226,7 @@ func (r *Runner) processTask(ctx context.Context, taskName string) error { // Parse outputs and persist to Task status. outputs := capture.ParseOutputs(agentOutput) results := capture.ResultsFromOutputs(outputs) - if err := r.updateTaskStatus(ctx, taskName, outputs, results); err != nil { + if err := r.updateTaskStatus(ctx, taskName, &startTime, outputs, results); err != nil { fmt.Printf("Error updating task status: %v\n", err) } @@ -261,17 +263,17 @@ func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) (string // updateTaskStatus writes completion timestamps and any captured outputs to the // Task status. It retries on conflict since the SessionReconciler may write // concurrently. -func (r *Runner) updateTaskStatus(ctx context.Context, taskName string, outputs []string, results map[string]string) error { +func (r *Runner) updateTaskStatus(ctx context.Context, taskName string, startTime *metav1.Time, outputs []string, results map[string]string) error { const maxRetries = 3 for attempt := 0; attempt < maxRetries; attempt++ { task, err := r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).Get(ctx, taskName, metav1.GetOptions{}) if err != nil { return err } - now := metav1.Now() if task.Status.StartTime == nil { - task.Status.StartTime = &now + task.Status.StartTime = startTime } + now := metav1.Now() task.Status.CompletionTime = &now if len(outputs) > 0 { task.Status.Outputs = outputs From 9b662e3e6daf6e4a851db714a7283651deffa066 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 7 May 2026 22:17:29 -0500 Subject: [PATCH 7/9] fix: address review feedback on marker aliases and CompletionTime - Remove unnecessary markerStart/markerEnd unexported aliases in internal/capture; use the exported constants directly - Use defer in processTask to guarantee updateTaskStatus is called (and CompletionTime written) even when workspace reset fails early Co-Authored-By: Claude Opus 4.6 --- internal/capture/capture.go | 6 ++---- internal/sessionrunner/runner.go | 17 +++++++++++------ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/internal/capture/capture.go b/internal/capture/capture.go index e7ae5ccd..70c5a281 100644 --- a/internal/capture/capture.go +++ b/internal/capture/capture.go @@ -19,8 +19,6 @@ const ( // MarkerEnd is the sentinel line that ends the outputs block. MarkerEnd = "---KELOS_OUTPUTS_END---" - markerStart = MarkerStart - markerEnd = MarkerEnd agentOutputFile = "/tmp/agent-output.jsonl" commandTimeout = 30 * time.Second ) @@ -84,11 +82,11 @@ func Run() int { if len(outputs) == 0 { return 0 } - fmt.Println(markerStart) + fmt.Println(MarkerStart) for _, line := range outputs { fmt.Println(line) } - fmt.Println(markerEnd) + fmt.Println(MarkerEnd) return 0 } diff --git a/internal/sessionrunner/runner.go b/internal/sessionrunner/runner.go index f39d08af..4a3e89f8 100644 --- a/internal/sessionrunner/runner.go +++ b/internal/sessionrunner/runner.go @@ -214,6 +214,14 @@ func (r *Runner) processTask(ctx context.Context, taskName string) error { } startTime := metav1.Now() + var outputs []string + var results map[string]string + + defer func() { + if err := r.updateTaskStatus(ctx, taskName, &startTime, outputs, results); err != nil { + fmt.Printf("Error updating task status: %v\n", err) + } + }() // Reset workspace. if err := r.workspace.Reset(ctx, task.Spec.Branch); err != nil { @@ -223,12 +231,9 @@ func (r *Runner) processTask(ctx context.Context, taskName string) error { // Invoke the agent entrypoint and capture outputs. agentOutput, agentErr := r.runAgent(ctx, task) - // Parse outputs and persist to Task status. - outputs := capture.ParseOutputs(agentOutput) - results := capture.ResultsFromOutputs(outputs) - if err := r.updateTaskStatus(ctx, taskName, &startTime, outputs, results); err != nil { - fmt.Printf("Error updating task status: %v\n", err) - } + // Parse outputs. + outputs = capture.ParseOutputs(agentOutput) + results = capture.ResultsFromOutputs(outputs) return agentErr } From 7f85c78f5ccecd34065ea0bebf4f65cae149244c Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 7 May 2026 22:21:16 -0500 Subject: [PATCH 8/9] fix: unexport marker constants in internal/capture No external package references them directly; keep the API surface minimal. Co-Authored-By: Claude Opus 4.6 --- internal/capture/capture.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/internal/capture/capture.go b/internal/capture/capture.go index 70c5a281..9e25b3f5 100644 --- a/internal/capture/capture.go +++ b/internal/capture/capture.go @@ -14,10 +14,8 @@ import ( ) const ( - // MarkerStart is the sentinel line that begins the outputs block. - MarkerStart = "---KELOS_OUTPUTS_START---" - // MarkerEnd is the sentinel line that ends the outputs block. - MarkerEnd = "---KELOS_OUTPUTS_END---" + markerStart = "---KELOS_OUTPUTS_START---" + markerEnd = "---KELOS_OUTPUTS_END---" agentOutputFile = "/tmp/agent-output.jsonl" commandTimeout = 30 * time.Second @@ -26,16 +24,16 @@ const ( // ParseOutputs extracts output lines from log data between the // ---KELOS_OUTPUTS_START--- and ---KELOS_OUTPUTS_END--- markers. func ParseOutputs(logData string) []string { - startIdx := strings.Index(logData, MarkerStart) + startIdx := strings.Index(logData, markerStart) if startIdx == -1 { return nil } - endIdx := strings.Index(logData, MarkerEnd) + endIdx := strings.Index(logData, markerEnd) if endIdx == -1 || endIdx <= startIdx { return nil } - between := logData[startIdx+len(MarkerStart) : endIdx] + between := logData[startIdx+len(markerStart) : endIdx] between = strings.TrimSpace(between) if between == "" { return nil @@ -82,11 +80,11 @@ func Run() int { if len(outputs) == 0 { return 0 } - fmt.Println(MarkerStart) + fmt.Println(markerStart) for _, line := range outputs { fmt.Println(line) } - fmt.Println(MarkerEnd) + fmt.Println(markerEnd) return 0 } From c7e6b574a8e9a8d8af1d2ee8b8b06892331c0255 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 7 May 2026 22:30:31 -0500 Subject: [PATCH 9/9] fix: use independent context for deferred status update Use context.Background with a 10s timeout in the deferred updateTaskStatus call so that CompletionTime is written even when the parent context is cancelled (e.g. SIGTERM during agent execution). Co-Authored-By: Claude Opus 4.6 --- internal/sessionrunner/runner.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/sessionrunner/runner.go b/internal/sessionrunner/runner.go index 4a3e89f8..4310b9a9 100644 --- a/internal/sessionrunner/runner.go +++ b/internal/sessionrunner/runner.go @@ -218,7 +218,9 @@ func (r *Runner) processTask(ctx context.Context, taskName string) error { var results map[string]string defer func() { - if err := r.updateTaskStatus(ctx, taskName, &startTime, outputs, results); err != nil { + statusCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := r.updateTaskStatus(statusCtx, taskName, &startTime, outputs, results); err != nil { fmt.Printf("Error updating task status: %v\n", err) } }()