Skip to content
98 changes: 93 additions & 5 deletions internal/sessionrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ limitations under the License.
package sessionrunner

import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -215,12 +218,23 @@ func (r *Runner) processTask(ctx context.Context, taskName string) error {
return fmt.Errorf("workspace reset failed: %w", err)
}

// Invoke the agent entrypoint.
return r.runAgent(ctx, task)
// Invoke the agent entrypoint and capture outputs.
agentOutput, agentErr := r.runAgent(ctx, task)

// Parse and persist outputs to Task status regardless of success/failure.
if outputs := parseOutputs(agentOutput); len(outputs) > 0 {
results := resultsFromOutputs(outputs)
if err := r.updateTaskOutputs(ctx, taskName, outputs, results); err != nil {
fmt.Printf("Error updating task outputs: %v\n", err)
}
}

return agentErr
Comment thread
j-bennet marked this conversation as resolved.
}

// runAgent invokes the agent entrypoint with the task prompt.
func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) error {
// It returns the captured stdout content and any execution error.
func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) (string, error) {
entrypoint := "/kelos_entrypoint.sh"

// Set branch env var if present.
Expand All @@ -229,13 +243,32 @@ func (r *Runner) runAgent(ctx context.Context, task *kelosv1alpha1.Task) error {
env = append(env, fmt.Sprintf("KELOS_BRANCH=%s", task.Spec.Branch))
}

var buf bytes.Buffer
cmd := exec.CommandContext(ctx, entrypoint, task.Spec.Prompt)
cmd.Dir = "/workspace/repo"
cmd.Stdout = os.Stdout
cmd.Stdout = io.MultiWriter(os.Stdout, &buf)
cmd.Stderr = os.Stderr
cmd.Env = env

return cmd.Run()
err := cmd.Run()
return buf.String(), err
Comment thread
j-bennet marked this conversation as resolved.
Outdated
}

// updateTaskOutputs writes captured outputs and results to the Task status.
func (r *Runner) updateTaskOutputs(ctx context.Context, taskName string, outputs []string, results map[string]string) error {
task, err := r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).Get(ctx, taskName, metav1.GetOptions{})
if err != nil {
return err
}
task.Status.Outputs = outputs
task.Status.Results = results
now := metav1.Now()
task.Status.CompletionTime = &now
if task.Status.StartTime == nil {
task.Status.StartTime = &now
Comment thread
j-bennet marked this conversation as resolved.
Outdated
}
_, err = r.kelosClient.ApiV1alpha1().Tasks(r.config.PodNamespace).UpdateStatus(ctx, task, metav1.UpdateOptions{})
return err
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Outdated
}

// setTaskStatus sets the kelos.dev/task-status annotation on the pod.
Expand Down Expand Up @@ -266,3 +299,58 @@ func (r *Runner) setAnnotation(ctx context.Context, key, value string) error {
}
return fmt.Errorf("failed to set annotation %s after %d retries", key, maxRetries)
}

const (
outputStartMarker = "---KELOS_OUTPUTS_START---"
outputEndMarker = "---KELOS_OUTPUTS_END---"
)

// parseOutputs extracts output lines from log data between markers.
func parseOutputs(logData string) []string {
startIdx := strings.Index(logData, outputStartMarker)
if startIdx == -1 {
return nil
}
endIdx := strings.Index(logData, outputEndMarker)
if endIdx == -1 || endIdx <= startIdx {
return nil
}

between := logData[startIdx+len(outputStartMarker) : endIdx]
between = strings.TrimSpace(between)
if between == "" {
return nil
}

lines := strings.Split(between, "\n")
var result []string
for _, line := range lines {
line = strings.TrimSpace(line)
if line != "" {
result = append(result, line)
}
}
if len(result) == 0 {
return nil
}
return result
}

// resultsFromOutputs builds a key-value map from output lines in "key: value" format.
func resultsFromOutputs(outputs []string) map[string]string {
if len(outputs) == 0 {
return nil
}
var result map[string]string
for _, line := range outputs {
key, value, ok := strings.Cut(line, ": ")
if !ok || key == "" {
continue
}
if result == nil {
result = make(map[string]string)
}
result[key] = value
}
return result
}
78 changes: 78 additions & 0 deletions internal/sessionrunner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,81 @@ func TestConfigFromEnv_InvalidMaxTasks(t *testing.T) {
t.Errorf("MaxTasksPerSession: expected 0 on invalid input, got %d", cfg.MaxTasksPerSession)
}
}

func TestParseOutputs(t *testing.T) {
tests := []struct {
name string
input string
expect []string
}{
{
name: "no markers",
input: "some random log output\n",
expect: nil,
},
{
name: "empty between markers",
input: "---KELOS_OUTPUTS_START---\n---KELOS_OUTPUTS_END---\n",
expect: nil,
},
{
name: "single output",
input: "log line\n---KELOS_OUTPUTS_START---\nbranch: main\n---KELOS_OUTPUTS_END---\n",
expect: []string{"branch: main"},
},
{
name: "multiple outputs",
input: "---KELOS_OUTPUTS_START---\nbranch: feat\ncommit: abc123\nresponse: dGVzdA==\n---KELOS_OUTPUTS_END---\n",
expect: []string{"branch: feat", "commit: abc123", "response: dGVzdA=="},
},
{
name: "start without end",
input: "---KELOS_OUTPUTS_START---\nbranch: main\n",
expect: nil,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := parseOutputs(tc.input)
if tc.expect == nil {
if got != nil {
t.Errorf("expected nil, got %v", got)
}
return
}
if len(got) != len(tc.expect) {
t.Fatalf("expected %d outputs, got %d: %v", len(tc.expect), len(got), got)
}
for i, want := range tc.expect {
if got[i] != want {
t.Errorf("output[%d]: expected %q, got %q", i, want, got[i])
}
}
})
}
}

func TestResultsFromOutputs(t *testing.T) {
outputs := []string{"branch: main", "commit: abc123", "cost-usd: 0.05"}
results := resultsFromOutputs(outputs)

if results["branch"] != "main" {
t.Errorf("branch: expected 'main', got %q", results["branch"])
}
if results["commit"] != "abc123" {
t.Errorf("commit: expected 'abc123', got %q", results["commit"])
}
if results["cost-usd"] != "0.05" {
t.Errorf("cost-usd: expected '0.05', got %q", results["cost-usd"])
}
}

func TestResultsFromOutputs_Empty(t *testing.T) {
if got := resultsFromOutputs(nil); got != nil {
t.Errorf("expected nil, got %v", got)
}
if got := resultsFromOutputs([]string{}); got != nil {
t.Errorf("expected nil, got %v", got)
}
}
Loading