diff --git a/api/v1alpha1/taskspawner_types.go b/api/v1alpha1/taskspawner_types.go index 5fdfc200..2e88ed10 100644 --- a/api/v1alpha1/taskspawner_types.go +++ b/api/v1alpha1/taskspawner_types.go @@ -519,6 +519,102 @@ type GenericWebhookFilter struct { Pattern string `json:"pattern,omitempty"` } +// ContextSource declares an external HTTP endpoint to query before task +// creation. The response body (optionally filtered via JSONPath) is made +// available as a .Context.NAME template variable. +type ContextSource struct { + // Name identifies this context source. The fetched value is available + // as .Context.NAME in promptTemplate, branch, and metadata templates. + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + // +kubebuilder:validation:MaxLength=64 + // +kubebuilder:validation:Pattern=`^[a-zA-Z][a-zA-Z0-9_]*$` + Name string `json:"name"` + + // URL is the HTTP(S) endpoint to fetch. Supports Go text/template + // variables from the work item (e.g., "https://api.example.com/items/{{.Number}}"). + // HTTPS is required unless AllowInsecure is set. + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + URL string `json:"url"` + + // Method is the HTTP method to use. Defaults to GET. + // +kubebuilder:validation:Enum=GET;POST + // +kubebuilder:default=GET + // +optional + Method string `json:"method,omitempty"` + + // Headers are static HTTP headers to include in the request. + // Values support Go text/template variables from the work item. + // +optional + Headers map[string]string `json:"headers,omitempty"` + + // HeadersFrom references Secrets whose data keys map to HTTP header + // values. These are merged with inline Headers; HeadersFrom values + // take precedence on conflict. + // +optional + HeadersFrom []HeaderFromSecret `json:"headersFrom,omitempty"` + + // Body is a Go text/template for POST request bodies. + // +optional + Body string `json:"body,omitempty"` + + // JSONPathFilter is a JSONPath expression applied to the JSON response + // body (e.g., "$.data.value"). When set, only the extracted value is + // stored as the context variable. When empty, the entire response body + // is stored as a string. Uses the same JSONPath syntax as generic + // webhook fieldMapping. + // +optional + JSONPathFilter string `json:"jsonPathFilter,omitempty"` + + // AllowInsecure permits plain HTTP (non-TLS) URLs. Defaults to false. + // +optional + AllowInsecure bool `json:"allowInsecure,omitempty"` + + // TimeoutSeconds is the per-request timeout. Defaults to 10. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=60 + // +kubebuilder:default=10 + // +optional + TimeoutSeconds *int32 `json:"timeoutSeconds,omitempty"` + + // MaxResponseBytes limits the response body size read from the + // endpoint. Prevents oversized responses from inflating prompts. + // Defaults to 32768 (32 KiB). + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=131072 + // +kubebuilder:default=32768 + // +optional + MaxResponseBytes *int32 `json:"maxResponseBytes,omitempty"` + + // Required when true causes task creation to be skipped for this work + // item if the context source fetch fails. When false (default), a + // failed fetch produces an empty string for the context variable and + // logs a warning. + // +optional + Required bool `json:"required,omitempty"` +} + +// HeaderFromSecret maps a single HTTP header to a value stored in a +// Kubernetes Secret. +type HeaderFromSecret struct { + // Header is the HTTP header name (e.g., "Authorization"). + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + Header string `json:"header"` + + // SecretRef references the Secret containing the header value. + // The Secret must be in the same namespace as the TaskSpawner. + // +kubebuilder:validation:Required + SecretRef SecretReference `json:"secretRef"` + + // Key is the data key within the Secret whose value is used as the + // header value. + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + Key string `json:"key"` +} + // TaskTemplateMetadata holds optional labels and annotations for spawned Tasks. type TaskTemplateMetadata struct { // Labels are merged into the spawned Task's labels. Values support Go @@ -593,6 +689,7 @@ type TaskTemplate struct { // GitHub webhook sources: {{.Event}}, {{.Action}}, {{.Sender}}, {{.Ref}}, {{.Repository}}, {{.Payload}} (full payload access) // Linear webhook sources: {{.Type}}, {{.Action}}, {{.State}}, {{.Labels}}, {{.IssueID}}, {{.Payload}} // Cron sources: {{.Time}}, {{.Schedule}} + // When contextSources are configured: .Context.NAME for each source // +optional Branch string `json:"branch,omitempty"` @@ -603,6 +700,7 @@ type TaskTemplate struct { // GitHub webhook sources: {{.Event}}, {{.Action}}, {{.Sender}}, {{.Ref}}, {{.Repository}}, {{.Payload}} (full payload access) // Linear webhook sources: {{.Type}}, {{.Action}}, {{.State}}, {{.Labels}}, {{.IssueID}}, {{.Payload}} // Cron sources: {{.Time}}, {{.Schedule}} + // When contextSources are configured: .Context.NAME for each source // +optional PromptTemplate string `json:"promptTemplate,omitempty"` @@ -625,6 +723,15 @@ type TaskTemplate struct { // +optional Metadata *TaskTemplateMetadata `json:"metadata,omitempty"` + // ContextSources declares external HTTP endpoints to query before task + // creation. Each source's response is available as .Context.NAME + // in promptTemplate, branch, and metadata templates. Sources are + // fetched in parallel during the discovery cycle. + // +optional + // +kubebuilder:validation:MaxItems=8 + // +kubebuilder:validation:XValidation:rule="self.all(a, self.exists_one(b, b.name == a.name))",message="contextSources names must be unique" + ContextSources []ContextSource `json:"contextSources,omitempty"` + // UpstreamRepo is the upstream repository in "owner/repo" format. // When set, spawned Tasks inherit this value and inject // KELOS_UPSTREAM_REPO into the agent container. This is typically diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index ebe09fee..e904cfba 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -148,6 +148,43 @@ func (in *AgentDefinition) DeepCopy() *AgentDefinition { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ContextSource) DeepCopyInto(out *ContextSource) { + *out = *in + if in.Headers != nil { + in, out := &in.Headers, &out.Headers + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.HeadersFrom != nil { + in, out := &in.HeadersFrom, &out.HeadersFrom + *out = make([]HeaderFromSecret, len(*in)) + copy(*out, *in) + } + if in.TimeoutSeconds != nil { + in, out := &in.TimeoutSeconds, &out.TimeoutSeconds + *out = new(int32) + **out = **in + } + if in.MaxResponseBytes != nil { + in, out := &in.MaxResponseBytes, &out.MaxResponseBytes + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContextSource. +func (in *ContextSource) DeepCopy() *ContextSource { + if in == nil { + return nil + } + out := new(ContextSource) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Credentials) DeepCopyInto(out *Credentials) { *out = *in @@ -509,6 +546,22 @@ func (in *GitRemote) DeepCopy() *GitRemote { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *HeaderFromSecret) DeepCopyInto(out *HeaderFromSecret) { + *out = *in + out.SecretRef = in.SecretRef +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HeaderFromSecret. +func (in *HeaderFromSecret) DeepCopy() *HeaderFromSecret { + if in == nil { + return nil + } + out := new(HeaderFromSecret) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Jira) DeepCopyInto(out *Jira) { *out = *in @@ -1078,6 +1131,13 @@ func (in *TaskTemplate) DeepCopyInto(out *TaskTemplate) { *out = new(TaskTemplateMetadata) (*in).DeepCopyInto(*out) } + if in.ContextSources != nil { + in, out := &in.ContextSources, &out.ContextSources + *out = make([]ContextSource, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskTemplate. diff --git a/cmd/kelos-spawner/main.go b/cmd/kelos-spawner/main.go index 8b6587ff..cab92f84 100644 --- a/cmd/kelos-spawner/main.go +++ b/cmd/kelos-spawner/main.go @@ -24,6 +24,7 @@ import ( metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" kelosv1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1" + "github.com/kelos-dev/kelos/internal/contextfetch" "github.com/kelos-dev/kelos/internal/githubapp" "github.com/kelos-dev/kelos/internal/logging" "github.com/kelos-dev/kelos/internal/reporting" @@ -342,6 +343,16 @@ func runCycleWithSourceCore(ctx context.Context, cl client.Client, key types.Nam maxTotalTasks = int(*ts.Spec.MaxTotalTasks) } + var contextFetcher *contextfetch.Fetcher + if len(ts.Spec.TaskTemplate.ContextSources) > 0 { + contextFetcher = &contextfetch.Fetcher{ + Client: cl, + HTTPClient: http.DefaultClient, + Namespace: ts.Namespace, + Logger: log, + } + } + newTasksCreated := 0 for _, item := range newItems { // Enforce max concurrency limit @@ -360,6 +371,16 @@ func runCycleWithSourceCore(ctx context.Context, cl client.Client, key types.Nam templateVars := source.WorkItemToTemplateVars(item) + // Enrich with external context sources + if contextFetcher != nil { + contextData, err := contextFetcher.FetchAll(ctx, ts.Spec.TaskTemplate.ContextSources, templateVars) + if err != nil { + log.Error(err, "Fetching context sources", "item", item.ID) + continue + } + templateVars["Context"] = contextData + } + tb, err := taskbuilder.NewTaskBuilder(cl) if err != nil { log.Error(err, "creating task builder", "item", item.ID) diff --git a/cmd/kelos-spawner/main_test.go b/cmd/kelos-spawner/main_test.go index 6d732651..f2114e42 100644 --- a/cmd/kelos-spawner/main_test.go +++ b/cmd/kelos-spawner/main_test.go @@ -29,6 +29,7 @@ import ( kelosv1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1" "github.com/kelos-dev/kelos/internal/reporting" "github.com/kelos-dev/kelos/internal/source" + "strings" ) var noToken = func(context.Context) (string, error) { return "", nil } @@ -2518,3 +2519,181 @@ func TestRunOnce_ReturnsSourcePollInterval(t *testing.T) { t.Fatalf("Interval = %v, want %v", interval, 15*time.Second) } } + +func TestRunCycleWithSource_ContextSources(t *testing.T) { + // Serve a fake external API that returns context data. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"severity":"critical","affected_users":42}`)) + })) + defer srv.Close() + + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ctx-spawner", + Namespace: "default", + UID: "ctx-spawner-uid", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + GitHubIssues: &kelosv1alpha1.GitHubIssues{}, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeOAuth, + SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"}, + }, + WorkspaceRef: &kelosv1alpha1.WorkspaceReference{Name: "test-ws"}, + PromptTemplate: `Fix bug #{{.Number}}: {{.Title}} +{{- if .Context.errorInfo}} + +Error context: {{.Context.errorInfo}} +{{- end}}`, + ContextSources: []kelosv1alpha1.ContextSource{{ + Name: "errorInfo", + URL: srv.URL + "/errors/{{.Number}}", + AllowInsecure: true, + }}, + }, + }, + } + + cl, key := setupTest(t, ts) + src := &fakeSource{ + items: []source.WorkItem{ + {ID: "10", Number: 10, Title: "Connection timeout"}, + }, + } + + if err := runCycleWithSource(context.Background(), cl, key, src); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var taskList kelosv1alpha1.TaskList + if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { + t.Fatalf("Listing tasks: %v", err) + } + if len(taskList.Items) != 1 { + t.Fatalf("Expected 1 task, got %d", len(taskList.Items)) + } + + prompt := taskList.Items[0].Spec.Prompt + if !strings.Contains(prompt, "Connection timeout") { + t.Errorf("Prompt should contain title, got: %s", prompt) + } + if !strings.Contains(prompt, `{"severity":"critical","affected_users":42}`) { + t.Errorf("Prompt should contain context source data, got: %s", prompt) + } +} + +func TestRunCycleWithSource_ContextSources_OptionalFailure(t *testing.T) { + // Serve a fake external API that always fails. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ctx-opt-spawner", + Namespace: "default", + UID: "ctx-opt-spawner-uid", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + GitHubIssues: &kelosv1alpha1.GitHubIssues{}, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeOAuth, + SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"}, + }, + WorkspaceRef: &kelosv1alpha1.WorkspaceReference{Name: "test-ws"}, + PromptTemplate: `Fix #{{.Number}}`, + ContextSources: []kelosv1alpha1.ContextSource{{ + Name: "optional", + URL: srv.URL, + Required: false, + AllowInsecure: true, + }}, + }, + }, + } + + cl, key := setupTest(t, ts) + src := &fakeSource{ + items: []source.WorkItem{ + {ID: "20", Number: 20, Title: "Bug"}, + }, + } + + // Task should still be created despite context source failure. + if err := runCycleWithSource(context.Background(), cl, key, src); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var taskList kelosv1alpha1.TaskList + if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { + t.Fatalf("Listing tasks: %v", err) + } + if len(taskList.Items) != 1 { + t.Fatalf("Expected 1 task, got %d", len(taskList.Items)) + } +} + +func TestRunCycleWithSource_ContextSources_RequiredFailure(t *testing.T) { + // Serve a fake external API that always fails. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ctx-req-spawner", + Namespace: "default", + UID: "ctx-req-spawner-uid", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + GitHubIssues: &kelosv1alpha1.GitHubIssues{}, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeOAuth, + SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"}, + }, + WorkspaceRef: &kelosv1alpha1.WorkspaceReference{Name: "test-ws"}, + PromptTemplate: `Fix #{{.Number}}`, + ContextSources: []kelosv1alpha1.ContextSource{{ + Name: "required", + URL: srv.URL, + Required: true, + AllowInsecure: true, + }}, + }, + }, + } + + cl, key := setupTest(t, ts) + src := &fakeSource{ + items: []source.WorkItem{ + {ID: "30", Number: 30, Title: "Bug"}, + }, + } + + // Cycle should succeed (error is per-item, not cycle-level) but no task created. + if err := runCycleWithSource(context.Background(), cl, key, src); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var taskList kelosv1alpha1.TaskList + if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { + t.Fatalf("Listing tasks: %v", err) + } + if len(taskList.Items) != 0 { + t.Fatalf("Expected 0 tasks (required context source failed), got %d", len(taskList.Items)) + } +} diff --git a/docs/integration.md b/docs/integration.md index c4f4cb37..6ed80259 100644 --- a/docs/integration.md +++ b/docs/integration.md @@ -383,7 +383,7 @@ All `promptTemplate` and `branch` fields support Go `text/template` syntax. Avai | `{{.ID}}` | Issue number (string) | PR number (string) | Issue/PR number or commit ID | Issue key (e.g., `ENG-42`) | Linear resource ID | Mapped `id` field (required) | Date-time string | | `{{.Number}}` | Issue number (int) | PR number (int) | Issue/PR number | `0` | Empty | Empty | `0` | | `{{.Title}}` | Issue title | PR title | Issue/PR title | Issue summary | Resource title | Mapped `title` field (if present) | Trigger time (RFC3339) | -| `{{.Body}}` | Issue body | PR body | Issue/PR/comment body | Issue description | Empty | Mapped `body` field (if present) | Empty | +| `{{.Body}}` | Issue body | PR body | Issue/PR body | Issue description | Empty | Mapped `body` field (if present) | Empty | | `{{.URL}}` | Issue URL | PR URL | Issue/PR URL | Issue URL | Empty | Mapped `url` field (if present) | Empty | | `{{.Labels}}` | Comma-separated | Comma-separated | Empty | Comma-separated | Comma-separated | Empty | Empty | | `{{.Comments}}` | Issue comments | PR comments | Empty | Issue comments | Empty | Empty | Empty | @@ -402,6 +402,8 @@ All `promptTemplate` and `branch` fields support Go `text/template` syntax. Avai | `{{.Type}}` | Empty | Empty | Empty | Empty | Resource type (e.g., `"Issue"`, `"Comment"`) | Empty | Empty | | `{{.State}}` | Empty | Empty | Empty | Empty | Workflow state (e.g., `"Todo"`, `"In Progress"`) | Empty | Empty | | `{{.IssueID}}` | Empty | Empty | Empty | Empty | Parent issue ID (Comment events only) | Empty | Empty | +| `{{.CommentBody}}` | Empty | Empty | Comment/review body (`issue_comment`, `pull_request_review`, `pull_request_review_comment`) | Empty | Empty | Empty | Empty | +| `{{.CommentURL}}` | Empty | Empty | Comment/review HTML URL (`issue_comment`, `pull_request_review`, `pull_request_review_comment`) | Empty | Empty | Empty | Empty | | `{{.Time}}` | Empty | Empty | Empty | Empty | Empty | Empty | Trigger time (RFC3339) | > **Generic Webhook only:** any additional keys you declare in `fieldMapping` are also exposed as top-level variables. For example, `fieldMapping: {severity: "$.level"}` makes `{{.severity}}` available in templates. diff --git a/docs/reference.md b/docs/reference.md index 0c561248..ab0e597a 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -207,7 +207,7 @@ The `promptTemplate` field uses Go `text/template` syntax. Available variables d | `{{.ID}}` | Unique identifier | Issue/PR number as string (e.g., `"42"`) | Pull request number as string | Issue/PR number or commit ID | Linear resource ID | Mapped `id` field (required) | Date-time string (e.g., `"20260207-0900"`) | | `{{.Number}}` | Issue or PR number | Issue/PR number (e.g., `42`) | Pull request number | Issue/PR number (when available) | Empty | Empty | `0` | | `{{.Title}}` | Title of the work item | Issue/PR title | Pull request title | Issue/PR title or "Push to <branch>" | Resource title | Mapped `title` field (if present) | Trigger time (RFC3339) | -| `{{.Body}}` | Body text | Issue/PR body | Pull request body | Issue/PR/comment body | Empty | Mapped `body` field (if present) | Empty | +| `{{.Body}}` | Body text | Issue/PR body | Pull request body | Issue/PR body | Empty | Mapped `body` field (if present) | Empty | | `{{.URL}}` | URL to the source item | GitHub HTML URL | GitHub PR URL | Issue/PR HTML URL | Empty | Mapped `url` field (if present) | Empty | | `{{.Labels}}` | Comma-separated labels | Issue/PR labels | Pull request labels | Empty | Issue labels | Empty | Empty | | `{{.Comments}}` | Concatenated comments | Issue/PR comments | PR conversation comments | Empty | Empty | Empty | Empty | @@ -226,6 +226,8 @@ The `promptTemplate` field uses Go `text/template` syntax. Available variables d | `{{.Type}}` | Resource type | Empty | Empty | Empty | Resource type (e.g., `"Issue"`, `"Comment"`) | Empty | Empty | | `{{.State}}` | Workflow state | Empty | Empty | Empty | Current state name (e.g., `"Todo"`, `"In Progress"`) | Empty | Empty | | `{{.IssueID}}` | Parent issue ID | Empty | Empty | Empty | Parent issue ID (Comment events only) | Empty | Empty | +| `{{.CommentBody}}` | Comment or review body | Empty | Empty | Comment/review body (`issue_comment`, `pull_request_review`, `pull_request_review_comment` events) | Empty | Empty | Empty | +| `{{.CommentURL}}` | Comment or review URL | Empty | Empty | Comment/review HTML URL (`issue_comment`, `pull_request_review`, `pull_request_review_comment` events) | Empty | Empty | Empty | | `{{.Time}}` | Trigger time (RFC3339) | Empty | Empty | Empty | Empty | Empty | Cron tick time (e.g., `"2026-02-07T09:00:00Z"`) | | `{{.Schedule}}` | Cron schedule expression | Empty | Empty | Empty | Empty | Empty | Schedule string (e.g., `"0 * * * *"`) | diff --git a/internal/contextfetch/fetcher.go b/internal/contextfetch/fetcher.go new file mode 100644 index 00000000..169bbd38 --- /dev/null +++ b/internal/contextfetch/fetcher.go @@ -0,0 +1,249 @@ +package contextfetch + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "sync" + "text/template" + "time" + + "github.com/PaesslerAG/jsonpath" + "github.com/go-logr/logr" + "golang.org/x/sync/errgroup" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + v1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1" +) + +const ( + defaultTimeoutSeconds = 10 + defaultMaxResponseBytes = 32768 + defaultMethod = "GET" +) + +// Fetcher fetches external context sources and returns their results as +// template variables. +type Fetcher struct { + Client client.Client + HTTPClient *http.Client + Namespace string + Logger logr.Logger +} + +// FetchAll fetches all context sources in parallel and returns a map +// suitable for injection as templateVars["Context"]. The templateVars +// parameter provides work item variables for rendering URL and header +// templates. +func (f *Fetcher) FetchAll(ctx context.Context, sources []v1alpha1.ContextSource, templateVars map[string]interface{}) (map[string]interface{}, error) { + var mu sync.Mutex + result := make(map[string]interface{}, len(sources)) + + g, gctx := errgroup.WithContext(ctx) + for _, src := range sources { + g.Go(func() error { + val, err := f.fetchOne(gctx, src, templateVars) + if err != nil { + if src.Required { + return fmt.Errorf("required context source %q: %w", src.Name, err) + } + if gctx.Err() != nil { + // Context was cancelled (likely by a required source failing); + // skip logging to avoid misleading per-source error noise. + mu.Lock() + result[src.Name] = "" + mu.Unlock() + return nil + } + f.Logger.Error(err, "Context source fetch failed, using empty value", "source", src.Name) + mu.Lock() + result[src.Name] = "" + mu.Unlock() + return nil + } + mu.Lock() + result[src.Name] = val + mu.Unlock() + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + return result, nil +} + +func (f *Fetcher) fetchOne(ctx context.Context, src v1alpha1.ContextSource, templateVars map[string]interface{}) (string, error) { + // Render URL template + renderedURL, err := renderTemplateStr("url", src.URL, templateVars) + if err != nil { + return "", fmt.Errorf("rendering URL template: %w", err) + } + + // Validate URL scheme + if err := validateURLScheme(renderedURL, src.AllowInsecure); err != nil { + return "", err + } + + // Resolve headers + headers, err := f.resolveHeaders(ctx, src, templateVars) + if err != nil { + return "", fmt.Errorf("resolving headers: %w", err) + } + + // Build request body for POST + var bodyReader io.Reader + if src.Body != "" { + rendered, err := renderTemplateStr("body", src.Body, templateVars) + if err != nil { + return "", fmt.Errorf("rendering body template: %w", err) + } + bodyReader = strings.NewReader(rendered) + } + + method := src.Method + if method == "" { + method = defaultMethod + } + + timeoutSec := defaultTimeoutSeconds + if src.TimeoutSeconds != nil { + timeoutSec = int(*src.TimeoutSeconds) + } + + maxBytes := int64(defaultMaxResponseBytes) + if src.MaxResponseBytes != nil { + maxBytes = int64(*src.MaxResponseBytes) + } + + reqCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(reqCtx, method, renderedURL, bodyReader) + if err != nil { + return "", fmt.Errorf("creating HTTP request: %w", err) + } + for k, v := range headers { + req.Header.Set(k, v) + } + if method == "POST" && bodyReader != nil && req.Header.Get("Content-Type") == "" { + req.Header.Set("Content-Type", "application/json") + } + + resp, err := f.HTTPClient.Do(req) + if err != nil { + return "", fmt.Errorf("HTTP request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return "", fmt.Errorf("HTTP %d from %s", resp.StatusCode, renderedURL) + } + + body, err := io.ReadAll(io.LimitReader(resp.Body, maxBytes+1)) + if err != nil { + return "", fmt.Errorf("reading response body: %w", err) + } + if int64(len(body)) > maxBytes { + return "", fmt.Errorf("response body exceeds maxResponseBytes (%d)", maxBytes) + } + + // Apply JSONPath filter if configured + if src.JSONPathFilter != "" { + return applyJSONPathFilter(body, src.JSONPathFilter) + } + + return string(body), nil +} + +func (f *Fetcher) resolveHeaders(ctx context.Context, src v1alpha1.ContextSource, templateVars map[string]interface{}) (map[string]string, error) { + headers := make(map[string]string, len(src.Headers)+len(src.HeadersFrom)) + + // Render static headers (support template variables) + for k, v := range src.Headers { + rendered, err := renderTemplateStr("header-"+k, v, templateVars) + if err != nil { + return nil, fmt.Errorf("rendering header %q: %w", k, err) + } + headers[k] = rendered + } + + // Resolve headers from Secrets + for _, hfs := range src.HeadersFrom { + secret := &corev1.Secret{} + key := client.ObjectKey{Name: hfs.SecretRef.Name, Namespace: f.Namespace} + if err := f.Client.Get(ctx, key, secret); err != nil { + return nil, fmt.Errorf("reading Secret %q for header %q: %w", hfs.SecretRef.Name, hfs.Header, err) + } + val, ok := secret.Data[hfs.Key] + if !ok { + return nil, fmt.Errorf("key %q not found in Secret %q", hfs.Key, hfs.SecretRef.Name) + } + headers[hfs.Header] = string(val) + } + + return headers, nil +} + +func validateURLScheme(rawURL string, allowInsecure bool) error { + parsed, err := url.Parse(rawURL) + if err != nil { + return fmt.Errorf("invalid URL %q: %w", rawURL, err) + } + switch parsed.Scheme { + case "https": + return nil + case "http": + if allowInsecure { + return nil + } + return fmt.Errorf("HTTP URLs are not allowed without allowInsecure: %s", rawURL) + default: + return fmt.Errorf("unsupported URL scheme %q (only http/https are allowed)", parsed.Scheme) + } +} + +func applyJSONPathFilter(body []byte, expr string) (string, error) { + var parsed interface{} + if err := json.Unmarshal(body, &parsed); err != nil { + return "", fmt.Errorf("parsing JSON response for JSONPath filter: %w", err) + } + + val, err := jsonpath.Get(expr, parsed) + if err != nil { + return "", fmt.Errorf("evaluating JSONPath %q: %w", expr, err) + } + + // Marshal complex values back to JSON; scalars use fmt. + switch v := val.(type) { + case string: + return v, nil + case float64, bool, nil: + return fmt.Sprintf("%v", v), nil + default: + b, err := json.Marshal(v) + if err != nil { + return "", fmt.Errorf("marshaling JSONPath result: %w", err) + } + return string(b), nil + } +} + +func renderTemplateStr(name, tmplStr string, vars map[string]interface{}) (string, error) { + tmpl, err := template.New(name).Option("missingkey=error").Parse(tmplStr) + if err != nil { + return "", err + } + var buf bytes.Buffer + if err := tmpl.Execute(&buf, vars); err != nil { + return "", err + } + return buf.String(), nil +} diff --git a/internal/contextfetch/fetcher_test.go b/internal/contextfetch/fetcher_test.go new file mode 100644 index 00000000..fb99c966 --- /dev/null +++ b/internal/contextfetch/fetcher_test.go @@ -0,0 +1,590 @@ +package contextfetch + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + v1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1" +) + +func newTestScheme() *runtime.Scheme { + s := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(s)) + utilruntime.Must(v1alpha1.AddToScheme(s)) + return s +} + +func int32Ptr(v int32) *int32 { return &v } + +func newFetcher(opts ...func(*Fetcher)) *Fetcher { + f := &Fetcher{ + HTTPClient: http.DefaultClient, + Namespace: "default", + Logger: zap.New(zap.UseDevMode(true)), + } + for _, o := range opts { + o(f) + } + return f +} + +func TestFetchAll_BasicGET(t *testing.T) { + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + t.Errorf("expected GET, got %s", r.Method) + } + w.Write([]byte(`{"status":"ok"}`)) + })) + defer srv.Close() + + f := newFetcher(func(f *Fetcher) { f.HTTPClient = srv.Client() }) + sources := []v1alpha1.ContextSource{{ + Name: "test", + URL: srv.URL, + AllowInsecure: true, + }} + vars := map[string]interface{}{"Number": 42} + + result, err := f.FetchAll(context.Background(), sources, vars) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result["test"]; got != `{"status":"ok"}` { + t.Errorf("expected {\"status\":\"ok\"}, got %v", got) + } +} + +func TestFetchAll_POSTWithBody(t *testing.T) { + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + w.Write([]byte("created")) + })) + defer srv.Close() + + f := newFetcher(func(f *Fetcher) { f.HTTPClient = srv.Client() }) + sources := []v1alpha1.ContextSource{{ + Name: "post", + URL: srv.URL, + Method: "POST", + Body: `{"id":{{.Number}}}`, + AllowInsecure: true, + }} + vars := map[string]interface{}{"Number": 42} + + result, err := f.FetchAll(context.Background(), sources, vars) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result["post"]; got != "created" { + t.Errorf("expected 'created', got %v", got) + } +} + +func TestFetchAll_URLTemplateRendering(t *testing.T) { + var requestedPath string + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestedPath = r.URL.Path + w.Write([]byte("ok")) + })) + defer srv.Close() + + f := newFetcher(func(f *Fetcher) { f.HTTPClient = srv.Client() }) + sources := []v1alpha1.ContextSource{{ + Name: "url", + URL: srv.URL + "/items/{{.Number}}", + AllowInsecure: true, + }} + vars := map[string]interface{}{"Number": 99} + + _, err := f.FetchAll(context.Background(), sources, vars) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if requestedPath != "/items/99" { + t.Errorf("expected path /items/99, got %s", requestedPath) + } +} + +func TestFetchAll_JSONPathFilter(t *testing.T) { + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewEncoder(w).Encode(map[string]interface{}{ + "data": map[string]interface{}{ + "value": "extracted", + }, + }) + })) + defer srv.Close() + + f := newFetcher(func(f *Fetcher) { f.HTTPClient = srv.Client() }) + sources := []v1alpha1.ContextSource{{ + Name: "filtered", + URL: srv.URL, + JSONPathFilter: "$.data.value", + AllowInsecure: true, + }} + vars := map[string]interface{}{} + + result, err := f.FetchAll(context.Background(), sources, vars) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result["filtered"]; got != "extracted" { + t.Errorf("expected 'extracted', got %v", got) + } +} + +func TestFetchAll_JSONPathFilter_ComplexValue(t *testing.T) { + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewEncoder(w).Encode(map[string]interface{}{ + "items": []interface{}{"a", "b"}, + }) + })) + defer srv.Close() + + f := newFetcher(func(f *Fetcher) { f.HTTPClient = srv.Client() }) + sources := []v1alpha1.ContextSource{{ + Name: "arr", + URL: srv.URL, + JSONPathFilter: "$.items", + AllowInsecure: true, + }} + + result, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result["arr"]; got != `["a","b"]` { + t.Errorf("expected [\"a\",\"b\"], got %v", got) + } +} + +func TestFetchAll_HeadersFromSecret(t *testing.T) { + var gotAuth string + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("Authorization") + w.Write([]byte("ok")) + })) + defer srv.Close() + + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "my-secret", Namespace: "default"}, + Data: map[string][]byte{"token": []byte("Bearer s3cret")}, + } + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).WithObjects(secret).Build() + + f := newFetcher(func(f *Fetcher) { + f.HTTPClient = srv.Client() + f.Client = cl + }) + sources := []v1alpha1.ContextSource{{ + Name: "auth", + URL: srv.URL, + HeadersFrom: []v1alpha1.HeaderFromSecret{{ + Header: "Authorization", + SecretRef: v1alpha1.SecretReference{Name: "my-secret"}, + Key: "token", + }}, + AllowInsecure: true, + }} + + _, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if gotAuth != "Bearer s3cret" { + t.Errorf("expected 'Bearer s3cret', got %q", gotAuth) + } +} + +func TestFetchAll_StaticHeaders(t *testing.T) { + var gotAccept string + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAccept = r.Header.Get("Accept") + w.Write([]byte("ok")) + })) + defer srv.Close() + + f := newFetcher(func(f *Fetcher) { f.HTTPClient = srv.Client() }) + sources := []v1alpha1.ContextSource{{ + Name: "hdrs", + URL: srv.URL, + Headers: map[string]string{"Accept": "application/json"}, + AllowInsecure: true, + }} + + _, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if gotAccept != "application/json" { + t.Errorf("expected application/json, got %q", gotAccept) + } +} + +func TestFetchAll_HTTPSRequired(t *testing.T) { + f := newFetcher() + sources := []v1alpha1.ContextSource{{ + Name: "insecure", + URL: "http://example.com/data", + Required: true, + }} + + _, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err == nil { + t.Fatal("expected error for HTTP URL without allowInsecure") + } + if !strings.Contains(err.Error(), "HTTP URLs are not allowed") { + t.Errorf("unexpected error: %v", err) + } +} + +func TestFetchAll_HTTPAllowInsecure(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("ok")) + })) + defer srv.Close() + + f := newFetcher() + sources := []v1alpha1.ContextSource{{ + Name: "insecure", + URL: srv.URL, + AllowInsecure: true, + }} + + result, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result["insecure"]; got != "ok" { + t.Errorf("expected 'ok', got %v", got) + } +} + +func TestFetchAll_RequiredSourceFails(t *testing.T) { + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + f := newFetcher(func(f *Fetcher) { f.HTTPClient = srv.Client() }) + sources := []v1alpha1.ContextSource{{ + Name: "req", + URL: srv.URL, + Required: true, + AllowInsecure: true, + }} + + _, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err == nil { + t.Fatal("expected error for required source failure") + } + if !strings.Contains(err.Error(), "required context source") { + t.Errorf("unexpected error: %v", err) + } +} + +func TestFetchAll_OptionalSourceFails(t *testing.T) { + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + f := newFetcher(func(f *Fetcher) { f.HTTPClient = srv.Client() }) + sources := []v1alpha1.ContextSource{{ + Name: "opt", + URL: srv.URL, + Required: false, + AllowInsecure: true, + }} + + result, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result["opt"]; got != "" { + t.Errorf("expected empty string for failed optional source, got %v", got) + } +} + +func TestFetchAll_OptionalSourceCancelledByRequired(t *testing.T) { + // When a required source fails, errgroup cancels the context. In-flight + // optional sources should NOT log "Context source fetch failed" — verify + // by capturing log output. + slowSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Slow enough that the required source fails first. + time.Sleep(2 * time.Second) + w.Write([]byte("slow")) + })) + defer slowSrv.Close() + + failSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer failSrv.Close() + + var logBuf strings.Builder + logger := zap.New(zap.WriteTo(&logBuf), zap.UseDevMode(true)) + + f := newFetcher(func(f *Fetcher) { f.Logger = logger }) + sources := []v1alpha1.ContextSource{ + {Name: "optional-slow", URL: slowSrv.URL, Required: false, AllowInsecure: true, TimeoutSeconds: int32Ptr(5)}, + {Name: "required-fail", URL: failSrv.URL, Required: true, AllowInsecure: true}, + } + + _, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err == nil { + t.Fatal("Expected error from required source failure") + } + if strings.Contains(logBuf.String(), "Context source fetch failed") { + t.Error("Optional source logged misleading 'fetch failed' when it was actually cancelled by required source failure") + } +} + +func TestFetchAll_Timeout(t *testing.T) { + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(2 * time.Second) + w.Write([]byte("late")) + })) + defer srv.Close() + + f := newFetcher(func(f *Fetcher) { f.HTTPClient = srv.Client() }) + sources := []v1alpha1.ContextSource{{ + Name: "slow", + URL: srv.URL, + Required: true, + TimeoutSeconds: int32Ptr(1), + AllowInsecure: true, + }} + + _, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err == nil { + t.Fatal("expected timeout error") + } +} + +func TestFetchAll_ResponseSizeLimit(t *testing.T) { + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Write more than 64 bytes + w.Write([]byte(strings.Repeat("x", 100))) + })) + defer srv.Close() + + f := newFetcher(func(f *Fetcher) { f.HTTPClient = srv.Client() }) + sources := []v1alpha1.ContextSource{{ + Name: "big", + URL: srv.URL, + Required: true, + MaxResponseBytes: int32Ptr(64), + AllowInsecure: true, + }} + + _, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err == nil { + t.Fatal("expected error for oversized response") + } + if !strings.Contains(err.Error(), "maxResponseBytes") { + t.Errorf("unexpected error: %v", err) + } +} + +func TestFetchAll_MultipleSources(t *testing.T) { + srv1 := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("data1")) + })) + defer srv1.Close() + + srv2 := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("data2")) + })) + defer srv2.Close() + + // TLS tests need individual clients; use AllowInsecure + plain HTTP instead + plainSrv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("data1")) + })) + defer plainSrv1.Close() + + plainSrv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("data2")) + })) + defer plainSrv2.Close() + + f := newFetcher() + sources := []v1alpha1.ContextSource{ + {Name: "src1", URL: plainSrv1.URL, AllowInsecure: true}, + {Name: "src2", URL: plainSrv2.URL, AllowInsecure: true}, + } + + result, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result["src1"]; got != "data1" { + t.Errorf("src1: expected 'data1', got %v", got) + } + if got := result["src2"]; got != "data2" { + t.Errorf("src2: expected 'data2', got %v", got) + } +} + +func TestFetchAll_SecretNotFound(t *testing.T) { + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).Build() + + f := newFetcher(func(f *Fetcher) { f.Client = cl }) + sources := []v1alpha1.ContextSource{{ + Name: "missing", + URL: "https://example.com", + HeadersFrom: []v1alpha1.HeaderFromSecret{{ + Header: "Authorization", + SecretRef: v1alpha1.SecretReference{Name: "nonexistent"}, + Key: "token", + }}, + Required: true, + }} + + _, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err == nil { + t.Fatal("expected error for missing Secret") + } + if !strings.Contains(err.Error(), "nonexistent") { + t.Errorf("unexpected error: %v", err) + } +} + +func TestFetchAll_SecretKeyNotFound(t *testing.T) { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "my-secret", Namespace: "default"}, + Data: map[string][]byte{"other-key": []byte("val")}, + } + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).WithObjects(secret).Build() + + f := newFetcher(func(f *Fetcher) { f.Client = cl }) + sources := []v1alpha1.ContextSource{{ + Name: "badkey", + URL: "https://example.com", + HeadersFrom: []v1alpha1.HeaderFromSecret{{ + Header: "Authorization", + SecretRef: v1alpha1.SecretReference{Name: "my-secret"}, + Key: "token", + }}, + Required: true, + }} + + _, err := f.FetchAll(context.Background(), sources, map[string]interface{}{}) + if err == nil { + t.Fatal("expected error for missing key in Secret") + } + if !strings.Contains(err.Error(), "token") { + t.Errorf("unexpected error: %v", err) + } +} + +func TestFetchAll_MissingTemplateVariable(t *testing.T) { + f := newFetcher() + sources := []v1alpha1.ContextSource{{ + Name: "bad", + URL: "https://api.example.com/items/{{.MissingVar}}", + Required: true, + AllowInsecure: true, + }} + vars := map[string]interface{}{"Number": 42} + + _, err := f.FetchAll(context.Background(), sources, vars) + if err == nil { + t.Fatal("Expected error for missing template variable") + } + if !strings.Contains(err.Error(), "MissingVar") { + t.Errorf("Expected error to mention missing variable, got: %v", err) + } +} + +func TestValidateURLScheme(t *testing.T) { + tests := []struct { + name string + url string + allowInsecure bool + wantErr bool + }{ + {"https allowed", "https://example.com", false, false}, + {"http blocked", "http://example.com", false, true}, + {"http allowed with flag", "http://example.com", true, false}, + {"ftp blocked", "ftp://example.com", false, true}, + {"ftp blocked even with flag", "ftp://example.com", true, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateURLScheme(tt.url, tt.allowInsecure) + if (err != nil) != tt.wantErr { + t.Errorf("validateURLScheme(%q, %v) error = %v, wantErr %v", tt.url, tt.allowInsecure, err, tt.wantErr) + } + }) + } +} + +func TestApplyJSONPathFilter(t *testing.T) { + tests := []struct { + name string + body string + expr string + want string + wantErr bool + }{ + { + name: "string value", + body: `{"a":"b"}`, + expr: "$.a", + want: "b", + }, + { + name: "numeric value", + body: `{"a":42}`, + expr: "$.a", + want: "42", + }, + { + name: "nested object", + body: `{"a":{"b":"c"}}`, + expr: "$.a", + want: `{"b":"c"}`, + }, + { + name: "missing field", + body: `{"a":"b"}`, + expr: "$.missing", + wantErr: true, + }, + { + name: "invalid json", + body: `not json`, + expr: "$.a", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := applyJSONPathFilter([]byte(tt.body), tt.expr) + if (err != nil) != tt.wantErr { + t.Errorf("error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && got != tt.want { + t.Errorf("got %q, want %q", got, tt.want) + } + }) + } +} diff --git a/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml b/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml index 7968455b..932c4e73 100644 --- a/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml +++ b/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml @@ -139,7 +139,138 @@ spec: GitHub webhook sources: {{ "{{.Event}}" }}, {{ "{{.Action}}" }}, {{ "{{.Sender}}" }}, {{ "{{.Ref}}" }}, {{ "{{.Repository}}" }}, {{ "{{.Payload}}" }} (full payload access) Linear webhook sources: {{ "{{.Type}}" }}, {{ "{{.Action}}" }}, {{ "{{.State}}" }}, {{ "{{.Labels}}" }}, {{ "{{.IssueID}}" }}, {{ "{{.Payload}}" }} Cron sources: {{ "{{.Time}}" }}, {{ "{{.Schedule}}" }} + When contextSources are configured: .Context.NAME for each source type: string + contextSources: + description: |- + ContextSources declares external HTTP endpoints to query before task + creation. Each source's response is available as .Context.NAME + in promptTemplate, branch, and metadata templates. Sources are + fetched in parallel during the discovery cycle. + items: + description: |- + ContextSource declares an external HTTP endpoint to query before task + creation. The response body (optionally filtered via JSONPath) is made + available as a .Context.NAME template variable. + properties: + allowInsecure: + description: AllowInsecure permits plain HTTP (non-TLS) + URLs. Defaults to false. + type: boolean + body: + description: Body is a Go text/template for POST request + bodies. + type: string + headers: + additionalProperties: + type: string + description: |- + Headers are static HTTP headers to include in the request. + Values support Go text/template variables from the work item. + type: object + headersFrom: + description: |- + HeadersFrom references Secrets whose data keys map to HTTP header + values. These are merged with inline Headers; HeadersFrom values + take precedence on conflict. + items: + description: |- + HeaderFromSecret maps a single HTTP header to a value stored in a + Kubernetes Secret. + properties: + header: + description: Header is the HTTP header name (e.g., + "Authorization"). + minLength: 1 + type: string + key: + description: |- + Key is the data key within the Secret whose value is used as the + header value. + minLength: 1 + type: string + secretRef: + description: |- + SecretRef references the Secret containing the header value. + The Secret must be in the same namespace as the TaskSpawner. + properties: + name: + description: Name is the name of the secret. + type: string + required: + - name + type: object + required: + - header + - key + - secretRef + type: object + type: array + jsonPathFilter: + description: |- + JSONPathFilter is a JSONPath expression applied to the JSON response + body (e.g., "$.data.value"). When set, only the extracted value is + stored as the context variable. When empty, the entire response body + is stored as a string. Uses the same JSONPath syntax as generic + webhook fieldMapping. + type: string + maxResponseBytes: + default: 32768 + description: |- + MaxResponseBytes limits the response body size read from the + endpoint. Prevents oversized responses from inflating prompts. + Defaults to 32768 (32 KiB). + format: int32 + maximum: 131072 + minimum: 1 + type: integer + method: + default: GET + description: Method is the HTTP method to use. Defaults + to GET. + enum: + - GET + - POST + type: string + name: + description: |- + Name identifies this context source. The fetched value is available + as .Context.NAME in promptTemplate, branch, and metadata templates. + maxLength: 64 + minLength: 1 + pattern: ^[a-zA-Z][a-zA-Z0-9_]*$ + type: string + required: + description: |- + Required when true causes task creation to be skipped for this work + item if the context source fetch fails. When false (default), a + failed fetch produces an empty string for the context variable and + logs a warning. + type: boolean + timeoutSeconds: + default: 10 + description: TimeoutSeconds is the per-request timeout. + Defaults to 10. + format: int32 + maximum: 60 + minimum: 1 + type: integer + url: + description: |- + URL is the HTTP(S) endpoint to fetch. Supports Go text/template + variables from the work item (e.g., "https://api.example.com/items/{{ "{{.Number}}" }}"). + HTTPS is required unless AllowInsecure is set. + minLength: 1 + type: string + required: + - name + - url + type: object + maxItems: 8 + type: array + x-kubernetes-validations: + - message: contextSources names must be unique + rule: self.all(a, self.exists_one(b, b.name == a.name)) credentials: description: Credentials specifies how to authenticate with the agent. @@ -2893,6 +3024,7 @@ spec: GitHub webhook sources: {{ "{{.Event}}" }}, {{ "{{.Action}}" }}, {{ "{{.Sender}}" }}, {{ "{{.Ref}}" }}, {{ "{{.Repository}}" }}, {{ "{{.Payload}}" }} (full payload access) Linear webhook sources: {{ "{{.Type}}" }}, {{ "{{.Action}}" }}, {{ "{{.State}}" }}, {{ "{{.Labels}}" }}, {{ "{{.IssueID}}" }}, {{ "{{.Payload}}" }} Cron sources: {{ "{{.Time}}" }}, {{ "{{.Schedule}}" }} + When contextSources are configured: .Context.NAME for each source type: string ttlSecondsAfterFinished: description: |- diff --git a/internal/manifests/charts/kelos/templates/rbac.yaml b/internal/manifests/charts/kelos/templates/rbac.yaml index 4bd7de4d..e19b3729 100644 --- a/internal/manifests/charts/kelos/templates/rbac.yaml +++ b/internal/manifests/charts/kelos/templates/rbac.yaml @@ -136,6 +136,12 @@ kind: ClusterRole metadata: name: kelos-spawner-role rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get - apiGroups: - kelos.dev resources: @@ -236,6 +242,12 @@ kind: ClusterRole metadata: name: kelos-webhook-role rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get - apiGroups: - kelos.dev resources: diff --git a/internal/manifests/install-crd.yaml b/internal/manifests/install-crd.yaml index eb600622..47e3c48f 100644 --- a/internal/manifests/install-crd.yaml +++ b/internal/manifests/install-crd.yaml @@ -3258,7 +3258,138 @@ spec: GitHub webhook sources: {{.Event}}, {{.Action}}, {{.Sender}}, {{.Ref}}, {{.Repository}}, {{.Payload}} (full payload access) Linear webhook sources: {{.Type}}, {{.Action}}, {{.State}}, {{.Labels}}, {{.IssueID}}, {{.Payload}} Cron sources: {{.Time}}, {{.Schedule}} + When contextSources are configured: .Context.NAME for each source type: string + contextSources: + description: |- + ContextSources declares external HTTP endpoints to query before task + creation. Each source's response is available as .Context.NAME + in promptTemplate, branch, and metadata templates. Sources are + fetched in parallel during the discovery cycle. + items: + description: |- + ContextSource declares an external HTTP endpoint to query before task + creation. The response body (optionally filtered via JSONPath) is made + available as a .Context.NAME template variable. + properties: + allowInsecure: + description: AllowInsecure permits plain HTTP (non-TLS) + URLs. Defaults to false. + type: boolean + body: + description: Body is a Go text/template for POST request + bodies. + type: string + headers: + additionalProperties: + type: string + description: |- + Headers are static HTTP headers to include in the request. + Values support Go text/template variables from the work item. + type: object + headersFrom: + description: |- + HeadersFrom references Secrets whose data keys map to HTTP header + values. These are merged with inline Headers; HeadersFrom values + take precedence on conflict. + items: + description: |- + HeaderFromSecret maps a single HTTP header to a value stored in a + Kubernetes Secret. + properties: + header: + description: Header is the HTTP header name (e.g., + "Authorization"). + minLength: 1 + type: string + key: + description: |- + Key is the data key within the Secret whose value is used as the + header value. + minLength: 1 + type: string + secretRef: + description: |- + SecretRef references the Secret containing the header value. + The Secret must be in the same namespace as the TaskSpawner. + properties: + name: + description: Name is the name of the secret. + type: string + required: + - name + type: object + required: + - header + - key + - secretRef + type: object + type: array + jsonPathFilter: + description: |- + JSONPathFilter is a JSONPath expression applied to the JSON response + body (e.g., "$.data.value"). When set, only the extracted value is + stored as the context variable. When empty, the entire response body + is stored as a string. Uses the same JSONPath syntax as generic + webhook fieldMapping. + type: string + maxResponseBytes: + default: 32768 + description: |- + MaxResponseBytes limits the response body size read from the + endpoint. Prevents oversized responses from inflating prompts. + Defaults to 32768 (32 KiB). + format: int32 + maximum: 131072 + minimum: 1 + type: integer + method: + default: GET + description: Method is the HTTP method to use. Defaults + to GET. + enum: + - GET + - POST + type: string + name: + description: |- + Name identifies this context source. The fetched value is available + as .Context.NAME in promptTemplate, branch, and metadata templates. + maxLength: 64 + minLength: 1 + pattern: ^[a-zA-Z][a-zA-Z0-9_]*$ + type: string + required: + description: |- + Required when true causes task creation to be skipped for this work + item if the context source fetch fails. When false (default), a + failed fetch produces an empty string for the context variable and + logs a warning. + type: boolean + timeoutSeconds: + default: 10 + description: TimeoutSeconds is the per-request timeout. + Defaults to 10. + format: int32 + maximum: 60 + minimum: 1 + type: integer + url: + description: |- + URL is the HTTP(S) endpoint to fetch. Supports Go text/template + variables from the work item (e.g., "https://api.example.com/items/{{.Number}}"). + HTTPS is required unless AllowInsecure is set. + minLength: 1 + type: string + required: + - name + - url + type: object + maxItems: 8 + type: array + x-kubernetes-validations: + - message: contextSources names must be unique + rule: self.all(a, self.exists_one(b, b.name == a.name)) credentials: description: Credentials specifies how to authenticate with the agent. @@ -6012,6 +6143,7 @@ spec: GitHub webhook sources: {{.Event}}, {{.Action}}, {{.Sender}}, {{.Ref}}, {{.Repository}}, {{.Payload}} (full payload access) Linear webhook sources: {{.Type}}, {{.Action}}, {{.State}}, {{.Labels}}, {{.IssueID}}, {{.Payload}} Cron sources: {{.Time}}, {{.Schedule}} + When contextSources are configured: .Context.NAME for each source type: string ttlSecondsAfterFinished: description: |- diff --git a/internal/webhook/github_filter.go b/internal/webhook/github_filter.go index 4d01bb6e..070a283a 100644 --- a/internal/webhook/github_filter.go +++ b/internal/webhook/github_filter.go @@ -35,6 +35,10 @@ type GitHubEventData struct { Body string URL string Branch string + // Comment-specific fields for issue_comment, pull_request_review, + // and pull_request_review_comment events. + CommentBody string + CommentURL string // ChangedFiles lists file paths modified by the event. // For push events, populated from the payload. For PR events, lazily // fetched from the GitHub API when a webhook filter uses FilePatterns. @@ -132,6 +136,10 @@ func ParseGitHubWebhook(eventType string, payload []byte) (*GitHubEventData, err case *github.IssueCommentEvent: data.Action = e.GetAction() data.Sender = e.GetSender().GetLogin() + if comment := e.GetComment(); comment != nil { + data.CommentBody = comment.GetBody() + data.CommentURL = comment.GetHTMLURL() + } if issue := e.GetIssue(); issue != nil { data.ID = fmt.Sprintf("%d", issue.GetNumber()) data.Title = issue.GetTitle() @@ -150,6 +158,10 @@ func ParseGitHubWebhook(eventType string, payload []byte) (*GitHubEventData, err case *github.PullRequestReviewEvent: data.Action = e.GetAction() data.Sender = e.GetSender().GetLogin() + if review := e.GetReview(); review != nil { + data.CommentBody = review.GetBody() + data.CommentURL = review.GetHTMLURL() + } if pr := e.GetPullRequest(); pr != nil { data.ID = fmt.Sprintf("%d", pr.GetNumber()) data.Title = pr.GetTitle() @@ -164,6 +176,10 @@ func ParseGitHubWebhook(eventType string, payload []byte) (*GitHubEventData, err case *github.PullRequestReviewCommentEvent: data.Action = e.GetAction() data.Sender = e.GetSender().GetLogin() + if comment := e.GetComment(); comment != nil { + data.CommentBody = comment.GetBody() + data.CommentURL = comment.GetHTMLURL() + } if pr := e.GetPullRequest(); pr != nil { data.ID = fmt.Sprintf("%d", pr.GetNumber()) data.Title = pr.GetTitle() @@ -458,6 +474,12 @@ func ExtractGitHubWorkItem(eventData *GitHubEventData) map[string]interface{} { if eventData.Branch != "" { vars["Branch"] = eventData.Branch } + if eventData.CommentBody != "" { + vars["CommentBody"] = eventData.CommentBody + } + if eventData.CommentURL != "" { + vars["CommentURL"] = eventData.CommentURL + } return vars } diff --git a/internal/webhook/github_filter_test.go b/internal/webhook/github_filter_test.go index 96b0dacc..01d85fa4 100644 --- a/internal/webhook/github_filter_test.go +++ b/internal/webhook/github_filter_test.go @@ -1282,6 +1282,129 @@ func TestParseGitHubWebhook_IssueCommentOnPR_ExtractsPullRequestAPIURL(t *testin } } +func TestParseGitHubWebhook_IssueComment_ExtractsCommentFields(t *testing.T) { + payload := `{ + "action": "created", + "sender": {"login": "testuser"}, + "repository": {"full_name": "org/repo", "name": "repo", "owner": {"login": "org"}}, + "issue": { + "number": 42, + "title": "Test PR", + "body": "PR body", + "html_url": "https://github.com/org/repo/pull/42", + "state": "open" + }, + "comment": { + "body": "/review please", + "html_url": "https://github.com/org/repo/pull/42#issuecomment-123", + "user": {"login": "commenter"} + } + }` + + got, err := ParseGitHubWebhook("issue_comment", []byte(payload)) + if err != nil { + t.Fatalf("ParseGitHubWebhook() error = %v", err) + } + if got.CommentBody != "/review please" { + t.Errorf("CommentBody = %q, want %q", got.CommentBody, "/review please") + } + if got.CommentURL != "https://github.com/org/repo/pull/42#issuecomment-123" { + t.Errorf("CommentURL = %q, want %q", got.CommentURL, "https://github.com/org/repo/pull/42#issuecomment-123") + } +} + +func TestParseGitHubWebhook_PullRequestReviewComment_ExtractsCommentFields(t *testing.T) { + payload := `{ + "action": "created", + "sender": {"login": "testuser"}, + "repository": {"full_name": "org/repo", "name": "repo", "owner": {"login": "org"}}, + "pull_request": { + "number": 99, + "title": "Fix bug", + "body": "Fixes the bug", + "html_url": "https://github.com/org/repo/pull/99", + "head": {"ref": "fix-branch"} + }, + "comment": { + "body": "nit: rename this variable", + "html_url": "https://github.com/org/repo/pull/99#discussion_r456", + "user": {"login": "reviewer"} + } + }` + + got, err := ParseGitHubWebhook("pull_request_review_comment", []byte(payload)) + if err != nil { + t.Fatalf("ParseGitHubWebhook() error = %v", err) + } + if got.CommentBody != "nit: rename this variable" { + t.Errorf("CommentBody = %q, want %q", got.CommentBody, "nit: rename this variable") + } + if got.CommentURL != "https://github.com/org/repo/pull/99#discussion_r456" { + t.Errorf("CommentURL = %q, want %q", got.CommentURL, "https://github.com/org/repo/pull/99#discussion_r456") + } +} + +func TestParseGitHubWebhook_PullRequestReview_ExtractsCommentFields(t *testing.T) { + payload := `{ + "action": "submitted", + "sender": {"login": "testuser"}, + "repository": {"full_name": "org/repo", "name": "repo", "owner": {"login": "org"}}, + "pull_request": { + "number": 50, + "title": "Add feature", + "body": "New feature", + "html_url": "https://github.com/org/repo/pull/50", + "head": {"ref": "feat-branch"} + }, + "review": { + "body": "LGTM with minor comments", + "html_url": "https://github.com/org/repo/pull/50#pullrequestreview-789", + "user": {"login": "lead-reviewer"} + } + }` + + got, err := ParseGitHubWebhook("pull_request_review", []byte(payload)) + if err != nil { + t.Fatalf("ParseGitHubWebhook() error = %v", err) + } + if got.CommentBody != "LGTM with minor comments" { + t.Errorf("CommentBody = %q, want %q", got.CommentBody, "LGTM with minor comments") + } + if got.CommentURL != "https://github.com/org/repo/pull/50#pullrequestreview-789" { + t.Errorf("CommentURL = %q, want %q", got.CommentURL, "https://github.com/org/repo/pull/50#pullrequestreview-789") + } +} + +func TestExtractGitHubWorkItemCommentFields(t *testing.T) { + eventData := &GitHubEventData{ + Event: "pull_request_review_comment", + CommentBody: "nit: rename this", + CommentURL: "https://github.com/org/repo/pull/99#discussion_r456", + } + + vars := ExtractGitHubWorkItem(eventData) + if vars["CommentBody"] != "nit: rename this" { + t.Errorf("CommentBody = %v, want %q", vars["CommentBody"], "nit: rename this") + } + if vars["CommentURL"] != "https://github.com/org/repo/pull/99#discussion_r456" { + t.Errorf("CommentURL = %v, want %q", vars["CommentURL"], "https://github.com/org/repo/pull/99#discussion_r456") + } +} + +func TestExtractGitHubWorkItemNoCommentFields(t *testing.T) { + eventData := &GitHubEventData{ + Event: "push", + } + + vars := ExtractGitHubWorkItem(eventData) + if _, ok := vars["CommentBody"]; ok { + t.Error("CommentBody should not be set for non-comment events") + } + if _, ok := vars["CommentURL"]; ok { + t.Error("CommentURL should not be set for non-comment events") + } +} + func TestParseGitHubWebhook_IssueCommentOnIssue_NoPullRequestAPIURL(t *testing.T) { payload := `{ "action": "created", diff --git a/internal/webhook/handler.go b/internal/webhook/handler.go index 718d4183..15b0775c 100644 --- a/internal/webhook/handler.go +++ b/internal/webhook/handler.go @@ -17,6 +17,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kelos-dev/kelos/api/v1alpha1" + "github.com/kelos-dev/kelos/internal/contextfetch" "github.com/kelos-dev/kelos/internal/reporting" "github.com/kelos-dev/kelos/internal/taskbuilder" ) @@ -528,6 +529,21 @@ func (h *WebhookHandler) createTask(ctx context.Context, spawner *v1alpha1.TaskS log.Info("Extracted template variables", "ID", templateVars["ID"], "Title", templateVars["Title"], "Action", templateVars["Action"]) + // Enrich with external context sources + if len(spawner.Spec.TaskTemplate.ContextSources) > 0 { + fetcher := &contextfetch.Fetcher{ + Client: h.client, + HTTPClient: http.DefaultClient, + Namespace: spawner.Namespace, + Logger: log, + } + contextData, err := fetcher.FetchAll(ctx, spawner.Spec.TaskTemplate.ContextSources, templateVars) + if err != nil { + return fmt.Errorf("fetching context sources: %w", err) + } + templateVars["Context"] = contextData + } + // Build unique task name using a hash of the delivery ID to avoid collisions. // Hashing gives uniform 12-hex-char suffix regardless of input length, // avoiding the collision risk of simple prefix truncation.