Skip to content
57 changes: 55 additions & 2 deletions internal/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,65 @@ import (
)

const (
markerStart = "---KELOS_OUTPUTS_START---"
markerEnd = "---KELOS_OUTPUTS_END---"
markerStart = "---KELOS_OUTPUTS_START---"
markerEnd = "---KELOS_OUTPUTS_END---"

agentOutputFile = "/tmp/agent-output.jsonl"
commandTimeout = 30 * time.Second
)

// ParseOutputs extracts output lines from log data between the
// ---KELOS_OUTPUTS_START--- and ---KELOS_OUTPUTS_END--- markers.
func ParseOutputs(logData string) []string {
startIdx := strings.Index(logData, markerStart)
if startIdx == -1 {
return nil
}
endIdx := strings.Index(logData, markerEnd)
if endIdx == -1 || endIdx <= startIdx {
return nil
}

between := logData[startIdx+len(markerStart) : endIdx]
between = strings.TrimSpace(between)
if between == "" {
return nil
}

lines := strings.Split(between, "\n")
var result []string
for _, line := range lines {
line = strings.TrimSpace(line)
if line != "" {
result = append(result, line)
}
}
if len(result) == 0 {
return nil
}
return result
}

// ResultsFromOutputs builds a key-value map from output lines in "key: value" format.
// Lines that do not contain ": " are skipped. If duplicate keys exist, the last value wins.
func ResultsFromOutputs(outputs []string) map[string]string {
if len(outputs) == 0 {
return nil
}
var result map[string]string
for _, line := range outputs {
key, value, ok := strings.Cut(line, ": ")
if !ok || key == "" {
continue
}
if result == nil {
result = make(map[string]string)
}
result[key] = value
}
return result
}

// Run captures deterministic outputs (branch, commit, PRs, token usage) from
// the workspace and emits them between markers to stdout. Returns 0 on success.
func Run() int {
Expand Down
52 changes: 3 additions & 49 deletions internal/controller/output_parser.go
Original file line number Diff line number Diff line change
@@ -1,60 +1,14 @@
package controller

import "strings"

const (
outputStartMarker = "---KELOS_OUTPUTS_START---"
outputEndMarker = "---KELOS_OUTPUTS_END---"
)
import "github.com/kelos-dev/kelos/internal/capture"

// ParseOutputs extracts output lines from log data between the
// ---KELOS_OUTPUTS_START--- and ---KELOS_OUTPUTS_END--- markers.
func ParseOutputs(logData string) []string {
startIdx := strings.Index(logData, outputStartMarker)
if startIdx == -1 {
return nil
}
endIdx := strings.Index(logData, outputEndMarker)
if endIdx == -1 || endIdx <= startIdx {
return nil
}

between := logData[startIdx+len(outputStartMarker) : endIdx]
between = strings.TrimSpace(between)
if between == "" {
return nil
}

lines := strings.Split(between, "\n")
var result []string
for _, line := range lines {
line = strings.TrimSpace(line)
if line != "" {
result = append(result, line)
}
}
if len(result) == 0 {
return nil
}
return result
return capture.ParseOutputs(logData)
}

// ResultsFromOutputs builds a key-value map from output lines in "key: value" format.
// Lines that do not contain ": " are skipped. If duplicate keys exist, the last value wins.
func ResultsFromOutputs(outputs []string) map[string]string {
if len(outputs) == 0 {
return nil
}
var result map[string]string
for _, line := range outputs {
key, value, ok := strings.Cut(line, ": ")
if !ok || key == "" {
continue
}
if result == nil {
result = make(map[string]string)
}
result[key] = value
}
return result
return capture.ResultsFromOutputs(outputs)
}
115 changes: 110 additions & 5 deletions internal/sessionrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package sessionrunner

import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
"strconv"
Expand All @@ -30,6 +32,7 @@ import (
"k8s.io/client-go/rest"

kelosv1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1"
"github.com/kelos-dev/kelos/internal/capture"
kelosversioned "github.com/kelos-dev/kelos/pkg/generated/clientset/versioned"
)

Expand Down Expand Up @@ -210,17 +213,39 @@ func (r *Runner) processTask(ctx context.Context, taskName string) error {
return fmt.Errorf("failed to set task status to running: %w", err)
}

startTime := metav1.Now()
var outputs []string
var results map[string]string

defer func() {
if err := r.updateTaskStatus(ctx, taskName, &startTime, outputs, results); err != nil {
fmt.Printf("Error updating task status: %v\n", err)
}
}()
Comment thread
j-bennet marked this conversation as resolved.

// Reset workspace.
if err := r.workspace.Reset(ctx, task.Spec.Branch); err != nil {
return fmt.Errorf("workspace reset failed: %w", err)
}

// Invoke the agent entrypoint.
return r.runAgent(ctx, task)
// Invoke the agent entrypoint and capture outputs.
agentOutput, agentErr := r.runAgent(ctx, task)

// Parse outputs.
outputs = capture.ParseOutputs(agentOutput)
results = capture.ResultsFromOutputs(outputs)

return agentErr
Comment thread
j-bennet marked this conversation as resolved.
}

// tailBufferSize is the maximum bytes retained from agent stdout for output
// marker parsing. The markers are always emitted at the end of the run by
// kelos-capture, so only the tail is needed.
const tailBufferSize = 256 * 1024

// runAgent invokes the agent entrypoint with the task prompt.
func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) error {
// It returns the tail of captured stdout (for output parsing) and any execution error.
func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) (string, error) {
entrypoint := "/kelos_entrypoint.sh"

// Set branch env var if present.
Expand All @@ -229,13 +254,45 @@ func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) error {
env = append(env, fmt.Sprintf("KELOS_BRANCH=%s", task.Spec.Branch))
}

tail := newTailWriter(tailBufferSize)
cmd := exec.CommandContext(ctx, entrypoint, task.Spec.Prompt)
cmd.Dir = "/workspace/repo"
cmd.Stdout = os.Stdout
cmd.Stdout = io.MultiWriter(os.Stdout, tail)
cmd.Stderr = os.Stderr
cmd.Env = env

return cmd.Run()
err := cmd.Run()
return tail.String(), err
}

// updateTaskStatus writes completion timestamps and any captured outputs to the
// Task status. It retries on conflict since the SessionReconciler may write
// concurrently.
func (r *Runner) updateTaskStatus(ctx context.Context, taskName string, startTime *metav1.Time, outputs []string, results map[string]string) error {
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
task, err := r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).Get(ctx, taskName, metav1.GetOptions{})
if err != nil {
return err
}
if task.Status.StartTime == nil {
task.Status.StartTime = startTime
}
now := metav1.Now()
task.Status.CompletionTime = &now
if len(outputs) > 0 {
task.Status.Outputs = outputs
task.Status.Results = results
}
_, err = r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).UpdateStatus(ctx, task, metav1.UpdateOptions{})
if err == nil {
return nil
}
if !apierrors.IsConflict(err) {
return err
}
}
return fmt.Errorf("failed to update task status after %d retries", maxRetries)
}

// setTaskStatus sets the kelos.dev/task-status annotation on the pod.
Expand Down Expand Up @@ -266,3 +323,51 @@ func (r *Runner) setAnnotation(ctx context.Context, key, value string) error {
}
return fmt.Errorf("failed to set annotation %s after %d retries", key, maxRetries)
}

// tailWriter is a fixed-size ring buffer that retains only the last N bytes
// written to it. This bounds memory usage when capturing verbose agent output.
type tailWriter struct {
buf []byte
size int
pos int
full bool
}

func newTailWriter(size int) *tailWriter {
return &tailWriter{buf: make([]byte, size), size: size}
}

func (tw *tailWriter) Write(p []byte) (int, error) {
n := len(p)
if n >= tw.size {
copy(tw.buf, p[n-tw.size:])
tw.pos = 0
tw.full = true
return n, nil
}
space := tw.size - tw.pos
if n <= space {
copy(tw.buf[tw.pos:], p)
tw.pos += n
} else {
copy(tw.buf[tw.pos:], p[:space])
copy(tw.buf, p[space:])
tw.pos = n - space
tw.full = true
}
if tw.pos == tw.size {
tw.pos = 0
tw.full = true
}
return n, nil
}

func (tw *tailWriter) String() string {
if !tw.full {
return string(tw.buf[:tw.pos])
}
var b bytes.Buffer
b.Write(tw.buf[tw.pos:])
b.Write(tw.buf[:tw.pos])
return b.String()
}
Loading
Loading