diff --git a/Makefile b/Makefile index 72bd3a69..4ac50259 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # Image configuration REGISTRY ?= ghcr.io/kelos-dev VERSION ?= latest -IMAGE_DIRS ?= cmd/kelos-controller cmd/kelos-spawner cmd/ghproxy cmd/kelos-webhook-server claude-code codex gemini opencode cursor +IMAGE_DIRS ?= cmd/kelos-controller cmd/kelos-spawner cmd/ghproxy cmd/kelos-webhook-server claude-code codex gemini opencode cursor cmd/kelos-slack-server LOCAL_ARCH ?= $(shell go env GOARCH) # Version injection for the kelos CLI – only set ldflags when an explicit diff --git a/api/v1alpha1/taskspawner_types.go b/api/v1alpha1/taskspawner_types.go index 40f05833..d0ff6bc0 100644 --- a/api/v1alpha1/taskspawner_types.go +++ b/api/v1alpha1/taskspawner_types.go @@ -51,6 +51,12 @@ type When struct { // the HMAC secret is read from the _WEBHOOK_SECRET env var. // +optional GenericWebhook *GenericWebhook `json:"webhook,omitempty"` + + // Slack discovers work items from Slack messages via Socket Mode. + // The centralized kelos-slack-server connects to Slack via an outbound + // WebSocket (no ingress required) and routes messages to matching agents. + // +optional + Slack *Slack `json:"slack,omitempty"` } // Cron triggers task spawning on a cron schedule. @@ -483,6 +489,50 @@ type GenericWebhookFilter struct { Pattern string `json:"pattern,omitempty"` } +// Slack triggers task spawning from Slack messages via the centralized +// kelos-slack-server. The server connects to Slack via Socket Mode (outbound +// WebSocket — no ingress required) and routes messages to matching +// TaskSpawners. Authentication tokens (SLACK_BOT_TOKEN, SLACK_APP_TOKEN) +// are configured on the server, not per-TaskSpawner. +// +// The bot must be invited to each channel it should listen in; the Channels +// field is a post-delivery filter, not a privacy scope. +// +// Bot mention (@bot) is implicitly required by default. The handler knows its +// own bot user ID from the Slack auth response. When Triggers are configured, +// each trigger's regex pattern is AND'd with the implicit mention requirement +// (unless MentionOptional is set). Multiple triggers use OR semantics. +// Empty triggers = every bot mention fires. +type Slack struct { + // Channels optionally restricts which Slack channels the bot listens in. + // Values are channel IDs (e.g., "C0123456789"). When empty, the bot + // listens in every channel it has been invited to. + // +optional + // +kubebuilder:validation:MaxItems=64 + // +kubebuilder:validation:items:Pattern=`^[CG][A-Z0-9]{8,}$` + Channels []string `json:"channels,omitempty"` + + // Triggers define regex patterns that must match the message text. + // Bot mention is implicitly required unless MentionOptional is set. + // Multiple triggers use OR semantics. When empty, every bot mention fires. + // +optional + // +kubebuilder:validation:MaxItems=8 + Triggers []SlackTrigger `json:"triggers,omitempty"` +} + +// SlackTrigger defines a regex pattern trigger for Slack messages. +type SlackTrigger struct { + // Pattern is a Go RE2 regex matched against message text (unanchored). + // +optional + // +kubebuilder:validation:MaxLength=256 + Pattern string `json:"pattern,omitempty"` + + // MentionOptional, when true, fires the trigger on pattern match alone + // without requiring a bot @-mention. + // +optional + MentionOptional *bool `json:"mentionOptional,omitempty"` +} + // TaskTemplateMetadata holds optional labels and annotations for spawned Tasks. type TaskTemplateMetadata struct { // Labels are merged into the spawned Task's labels. Values support Go diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index f207f2a0..c1f35740 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -718,6 +718,53 @@ func (in *SkillsShSpec) DeepCopy() *SkillsShSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Slack) DeepCopyInto(out *Slack) { + *out = *in + if in.Channels != nil { + in, out := &in.Channels, &out.Channels + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Triggers != nil { + in, out := &in.Triggers, &out.Triggers + *out = make([]SlackTrigger, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Slack. +func (in *Slack) DeepCopy() *Slack { + if in == nil { + return nil + } + out := new(Slack) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SlackTrigger) DeepCopyInto(out *SlackTrigger) { + *out = *in + if in.MentionOptional != nil { + in, out := &in.MentionOptional, &out.MentionOptional + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SlackTrigger. +func (in *SlackTrigger) DeepCopy() *SlackTrigger { + if in == nil { + return nil + } + out := new(SlackTrigger) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Task) DeepCopyInto(out *Task) { *out = *in @@ -1083,6 +1130,11 @@ func (in *When) DeepCopyInto(out *When) { *out = new(GenericWebhook) (*in).DeepCopyInto(*out) } + if in.Slack != nil { + in, out := &in.Slack, &out.Slack + *out = new(Slack) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new When. diff --git a/cmd/kelos-slack-server/Dockerfile b/cmd/kelos-slack-server/Dockerfile new file mode 100644 index 00000000..930629fc --- /dev/null +++ b/cmd/kelos-slack-server/Dockerfile @@ -0,0 +1,5 @@ +FROM gcr.io/distroless/static:nonroot +WORKDIR / +COPY bin/kelos-slack-server . +USER 65532:65532 +ENTRYPOINT ["/kelos-slack-server"] diff --git a/cmd/kelos-slack-server/main.go b/cmd/kelos-slack-server/main.go new file mode 100644 index 00000000..cada5c43 --- /dev/null +++ b/cmd/kelos-slack-server/main.go @@ -0,0 +1,126 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + kelosv1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1" + "github.com/kelos-dev/kelos/internal/logging" + kelosslack "github.com/kelos-dev/kelos/internal/slack" +) + +var ( + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") +) + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(kelosv1alpha1.AddToScheme(scheme)) +} + +func main() { + var ( + metricsAddr string + probeAddr string + enableLeaderElection bool + ) + + flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") + flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") + flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager.") + + opts, applyVerbosity := logging.SetupZapOptions(flag.CommandLine) + flag.Parse() + + if err := applyVerbosity(); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } + + ctrl.SetLogger(zap.New(zap.UseFlagOptions(opts))) + + botToken := os.Getenv("SLACK_BOT_TOKEN") + appToken := os.Getenv("SLACK_APP_TOKEN") + if botToken == "" || appToken == "" { + setupLog.Error(fmt.Errorf("missing tokens"), "SLACK_BOT_TOKEN and SLACK_APP_TOKEN environment variables are required") + os.Exit(1) + } + + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme, + Metrics: metricsserver.Options{BindAddress: metricsAddr}, + HealthProbeBindAddress: probeAddr, + LeaderElection: enableLeaderElection, + LeaderElectionID: "kelos-slack-server", + }) + if err != nil { + setupLog.Error(err, "Unable to start manager") + os.Exit(1) + } + + ctx := ctrl.SetupSignalHandler() + + // Create the Slack handler + handler, err := kelosslack.NewSlackHandler( + ctx, + mgr.GetClient(), + botToken, + appToken, + ctrl.Log.WithName("slack"), + ) + if err != nil { + setupLog.Error(err, "Unable to create Slack handler") + os.Exit(1) + } + + // Register Socket Mode listener as a leader-elected runnable so that only + // one replica opens the single-connection Socket Mode WebSocket. + if err := mgr.Add(&slackRunnable{handler: handler}); err != nil { + setupLog.Error(err, "Unable to register Slack handler with manager") + os.Exit(1) + } + + // Health checks + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { + setupLog.Error(err, "Unable to set up health check") + os.Exit(1) + } + if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + setupLog.Error(err, "Unable to set up ready check") + os.Exit(1) + } + + setupLog.Info("Starting manager") + if err := mgr.Start(ctx); err != nil { + setupLog.Error(err, "Problem running manager") + os.Exit(1) + } +} + +// slackRunnable wraps the SlackHandler as a leader-elected manager.Runnable. +// This ensures only the leader replica opens the Socket Mode connection. +type slackRunnable struct { + handler *kelosslack.SlackHandler +} + +func (r *slackRunnable) Start(ctx context.Context) error { + setupLog.Info("Starting Slack Socket Mode listener") + err := r.handler.Start(ctx) + if err != nil && ctx.Err() == nil { + return err + } + return nil +} + +func (r *slackRunnable) NeedLeaderElection() bool { return true } diff --git a/go.mod b/go.mod index 88ff6373..5dc83b79 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_model v0.6.2 github.com/robfig/cron/v3 v3.0.1 + github.com/slack-go/slack v0.20.0 github.com/spf13/cobra v1.10.2 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.0 @@ -62,6 +63,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect github.com/google/renameio/v2 v2.0.0 // indirect + github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/huandu/xstrings v1.5.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/go.sum b/go.sum index 54b2ea51..538059dc 100644 --- a/go.sum +++ b/go.sum @@ -79,6 +79,8 @@ github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7 github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/go-test/deep v1.1.1 h1:0r/53hagsehfO4bzD2Pgr/+RgHqhmf+k1Bpse2cTu1U= +github.com/go-test/deep v1.1.1/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/gobuffalo/flect v1.0.3 h1:xeWBM2nui+qnVvNM4S3foBhCAL2XgPU+a7FdpelbTq4= github.com/gobuffalo/flect v1.0.3/go.mod h1:A5msMlrHtLqh9umBSnvabjsMrCcCpAyzglnDvkbYKHs= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= @@ -111,6 +113,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/yamlfmt v0.21.0 h1:9FKApQkDpMKgBjwLFytBHUCgqnQgxaQnci0uiESfbzs= github.com/google/yamlfmt v0.21.0/go.mod h1:q6FYExB+Ueu7jZDjKECJk+EaeDXJzJ6Ne0dxx69GWfI= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= @@ -195,6 +199,8 @@ github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 h1:KRzFb2m7YtdldCEkzs6KqmJw4nqEV github.com/santhosh-tekuri/jsonschema/v6 v6.0.2/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/slack-go/slack v0.20.0 h1:gbDdbee8+Z2o+DWx05Spq3GzbrLLleiRwHUKs+hZLSU= +github.com/slack-go/slack v0.20.0/go.mod h1:K81UmCivcYd/5Jmz8vLBfuyoZ3B4rQC2GHVXHteXiAE= github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs= github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= diff --git a/internal/controller/taskspawner_controller.go b/internal/controller/taskspawner_controller.go index 7d5a8d13..bfe92a6b 100644 --- a/internal/controller/taskspawner_controller.go +++ b/internal/controller/taskspawner_controller.go @@ -58,8 +58,10 @@ func isCronBased(ts *kelosv1alpha1.TaskSpawner) bool { } // isWebhookBased returns true if the TaskSpawner is webhook-driven. +// Slack uses Socket Mode (outbound WebSocket) handled by the centralized +// kelos-slack-server, so it follows the same no-deployment pattern. func isWebhookBased(ts *kelosv1alpha1.TaskSpawner) bool { - return ts.Spec.When.GitHubWebhook != nil || ts.Spec.When.LinearWebhook != nil || ts.Spec.When.GenericWebhook != nil + return ts.Spec.When.GitHubWebhook != nil || ts.Spec.When.LinearWebhook != nil || ts.Spec.When.GenericWebhook != nil || ts.Spec.When.Slack != nil } // Reconcile handles TaskSpawner reconciliation. diff --git a/internal/controller/taskspawner_controller_test.go b/internal/controller/taskspawner_controller_test.go index 47823ab8..3f86bba8 100644 --- a/internal/controller/taskspawner_controller_test.go +++ b/internal/controller/taskspawner_controller_test.go @@ -91,6 +91,19 @@ func TestIsWebhookBased(t *testing.T) { }, want: true, }, + { + name: "Slack TaskSpawner", + ts: &kelosv1alpha1.TaskSpawner{ + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{ + Channels: []string{"C0123456789"}, + }, + }, + }, + }, + want: true, + }, } for _, tt := range tests { diff --git a/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml b/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml index ac5cfc36..a03e2801 100644 --- a/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml +++ b/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml @@ -1003,6 +1003,45 @@ spec: required: - types type: object + slack: + description: |- + Slack discovers work items from Slack messages via Socket Mode. + The centralized kelos-slack-server connects to Slack via an outbound + WebSocket (no ingress required) and routes messages to matching agents. + properties: + channels: + description: |- + Channels optionally restricts which Slack channels the bot listens in. + Values are channel IDs (e.g., "C0123456789"). When empty, the bot + listens in every channel it has been invited to. + items: + pattern: ^[CG][A-Z0-9]{8,}$ + type: string + maxItems: 64 + type: array + triggers: + description: |- + Triggers define regex patterns that must match the message text. + Bot mention is implicitly required unless MentionOptional is set. + Multiple triggers use OR semantics. When empty, every bot mention fires. + items: + description: SlackTrigger defines a regex pattern trigger + for Slack messages. + properties: + mentionOptional: + description: |- + MentionOptional, when true, fires the trigger on pattern match alone + without requiring a bot @-mention. + type: boolean + pattern: + description: Pattern is a Go RE2 regex matched against + message text (unanchored). + maxLength: 256 + type: string + type: object + maxItems: 8 + type: array + type: object webhook: description: |- GenericWebhook triggers task spawning from arbitrary HTTP POST payloads. diff --git a/internal/manifests/charts/kelos/templates/rbac.yaml b/internal/manifests/charts/kelos/templates/rbac.yaml index 015da94e..f51399fe 100644 --- a/internal/manifests/charts/kelos/templates/rbac.yaml +++ b/internal/manifests/charts/kelos/templates/rbac.yaml @@ -275,3 +275,65 @@ subjects: name: kelos-webhook namespace: {{ .Release.Namespace }} {{- end }} +{{- if .Values.slackServer.enabled }} +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: kelos-slack-server-role +rules: + - apiGroups: + - kelos.dev + resources: + - taskspawners + verbs: + - get + - list + - watch + - apiGroups: + - kelos.dev + resources: + - tasks + verbs: + - create + - get + - list + - apiGroups: + - kelos.dev + resources: + - agentconfigs + - workspaces + verbs: + - get + - list + - watch + - apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - list + - update + - apiGroups: + - "" + resources: + - events + verbs: + - create + - patch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: kelos-slack-server-rolebinding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: kelos-slack-server-role +subjects: + - kind: ServiceAccount + name: kelos-slack-server + namespace: {{ .Release.Namespace }} +{{- end }} diff --git a/internal/manifests/charts/kelos/templates/serviceaccount.yaml b/internal/manifests/charts/kelos/templates/serviceaccount.yaml index 70925126..0733e14d 100644 --- a/internal/manifests/charts/kelos/templates/serviceaccount.yaml +++ b/internal/manifests/charts/kelos/templates/serviceaccount.yaml @@ -12,3 +12,11 @@ metadata: name: kelos-webhook namespace: {{ .Release.Namespace }} {{- end }} +{{- if .Values.slackServer.enabled }} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: kelos-slack-server + namespace: {{ .Release.Namespace }} +{{- end }} diff --git a/internal/manifests/charts/kelos/templates/slack-server.yaml b/internal/manifests/charts/kelos/templates/slack-server.yaml new file mode 100644 index 00000000..b8e826b3 --- /dev/null +++ b/internal/manifests/charts/kelos/templates/slack-server.yaml @@ -0,0 +1,78 @@ +{{- if .Values.slackServer.enabled }} +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kelos-slack-server + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: kelos + app.kubernetes.io/component: slack-server +spec: + replicas: {{ .Values.slackServer.replicas }} + selector: + matchLabels: + app.kubernetes.io/name: kelos + app.kubernetes.io/component: slack-server + template: + metadata: + labels: + app.kubernetes.io/name: kelos + app.kubernetes.io/component: slack-server + spec: + serviceAccountName: kelos-slack-server + securityContext: + runAsNonRoot: true + containers: + - name: slack-server + image: {{ .Values.slackServer.image }}{{- if .Values.image.tag }}:{{ .Values.image.tag }}{{- end }} + {{- if .Values.image.pullPolicy }} + imagePullPolicy: {{ .Values.image.pullPolicy }} + {{- end }} + args: + - --leader-elect + - --metrics-bind-address=:8080 + - --health-probe-bind-address=:8081 + env: + - name: SLACK_BOT_TOKEN + valueFrom: + secretKeyRef: + name: {{ .Values.slackServer.secretName }} + key: SLACK_BOT_TOKEN + - name: SLACK_APP_TOKEN + valueFrom: + secretKeyRef: + name: {{ .Values.slackServer.secretName }} + key: SLACK_APP_TOKEN + ports: + - name: metrics + containerPort: 8080 + protocol: TCP + - name: health + containerPort: 8081 + protocol: TCP + livenessProbe: + httpGet: + path: /healthz + port: health + initialDelaySeconds: 15 + periodSeconds: 20 + readinessProbe: + httpGet: + path: /readyz + port: health + initialDelaySeconds: 5 + periodSeconds: 10 + {{- if .Values.slackServer.resources }} + resources: + {{- toYaml .Values.slackServer.resources | nindent 12 }} + {{- end }} + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + seccompProfile: + type: RuntimeDefault + capabilities: + drop: + - "ALL" +{{- end }} diff --git a/internal/manifests/charts/kelos/values.yaml b/internal/manifests/charts/kelos/values.yaml index 138557c2..2e6b5a53 100644 --- a/internal/manifests/charts/kelos/values.yaml +++ b/internal/manifests/charts/kelos/values.yaml @@ -75,3 +75,15 @@ webhookServer: tls: enabled: false certificateRefs: [] +slackServer: + enabled: false + image: ghcr.io/kelos-dev/kelos-slack-server + replicas: 1 + secretName: "" + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 10m + memory: 64Mi diff --git a/internal/manifests/install-crd.yaml b/internal/manifests/install-crd.yaml index 88caed34..dca09bc8 100644 --- a/internal/manifests/install-crd.yaml +++ b/internal/manifests/install-crd.yaml @@ -1690,6 +1690,45 @@ spec: required: - types type: object + slack: + description: |- + Slack discovers work items from Slack messages via Socket Mode. + The centralized kelos-slack-server connects to Slack via an outbound + WebSocket (no ingress required) and routes messages to matching agents. + properties: + channels: + description: |- + Channels optionally restricts which Slack channels the bot listens in. + Values are channel IDs (e.g., "C0123456789"). When empty, the bot + listens in every channel it has been invited to. + items: + pattern: ^[CG][A-Z0-9]{8,}$ + type: string + maxItems: 64 + type: array + triggers: + description: |- + Triggers define regex patterns that must match the message text. + Bot mention is implicitly required unless MentionOptional is set. + Multiple triggers use OR semantics. When empty, every bot mention fires. + items: + description: SlackTrigger defines a regex pattern trigger + for Slack messages. + properties: + mentionOptional: + description: |- + MentionOptional, when true, fires the trigger on pattern match alone + without requiring a bot @-mention. + type: boolean + pattern: + description: Pattern is a Go RE2 regex matched against + message text (unanchored). + maxLength: 256 + type: string + type: object + maxItems: 8 + type: array + type: object webhook: description: |- GenericWebhook triggers task spawning from arbitrary HTTP POST payloads. diff --git a/internal/slack/filter.go b/internal/slack/filter.go new file mode 100644 index 00000000..f4970a78 --- /dev/null +++ b/internal/slack/filter.go @@ -0,0 +1,143 @@ +package slack + +import ( + "fmt" + "regexp" + "strings" + "sync" + + "github.com/kelos-dev/kelos/api/v1alpha1" +) + +// SlackMessageData holds the parsed fields from a Slack message or slash +// command needed for matching and task creation. +type SlackMessageData struct { + // UserID is the Slack user ID of the message author. + UserID string + // ChannelID is the Slack channel ID where the message was posted. + ChannelID string + // UserName is the display name of the message author. + UserName string + // Text is the raw message text. + Text string + // ThreadTS is the parent message timestamp when this is a thread reply. + ThreadTS string + // Timestamp is the message's own timestamp (used as ID and thread_ts for replies). + Timestamp string + // Permalink is the Slack permalink URL for the message. + Permalink string + // Body is the processed message body (trigger prefix stripped, or full thread context). + Body string + // HasThreadContext indicates that Body contains full thread context + // rather than the raw message text. + HasThreadContext bool + // IsSlashCommand indicates this came from a slash command rather than a message event. + IsSlashCommand bool + // SlashCommandID is the composite ID for slash commands (channelID:command:triggerID). + SlashCommandID string +} + +var triggerRegexpCache sync.Map + +func getOrCompileTriggerRegexp(pattern string) (*regexp.Regexp, error) { + if cached, ok := triggerRegexpCache.Load(pattern); ok { + return cached.(*regexp.Regexp), nil + } + re, err := regexp.Compile(pattern) + if err != nil { + return nil, err + } + triggerRegexpCache.Store(pattern, re) + return re, nil +} + +// MatchesSpawner checks whether a Slack message matches the given TaskSpawner's +// Slack configuration (channels, bot mention, and trigger patterns). +func MatchesSpawner(slackCfg *v1alpha1.Slack, msg *SlackMessageData, botUserID string) bool { + if slackCfg == nil { + return false + } + if !matchesChannel(msg.ChannelID, slackCfg.Channels) { + return false + } + // Slash commands bypass mention and trigger filters. + if msg.IsSlashCommand { + return true + } + // No triggers: fire on any bot mention. + if len(slackCfg.Triggers) == 0 { + return hasBotMention(msg.Text, botUserID) + } + // With triggers: OR across triggers. + return matchesTriggers(msg.Text, slackCfg.Triggers, botUserID) +} + +// ExtractSlackWorkItem builds the template variables map from a Slack message +// for use with taskbuilder.BuildTask. The keys match the standard template +// variables available in promptTemplate and branch. +func ExtractSlackWorkItem(msg *SlackMessageData) map[string]interface{} { + id := msg.Timestamp + if msg.IsSlashCommand { + id = msg.SlashCommandID + } + + title := msg.Text + if idx := strings.Index(title, "\n"); idx != -1 { + title = title[:idx] + } + + return map[string]interface{}{ + "ID": id, + "Title": title, + "Body": msg.Body, + "URL": msg.Permalink, + "Kind": "SlackMessage", + } +} + +// matchesChannel returns true if channelID is in the allowed list, +// or if the allowed list is empty (all channels permitted). +func matchesChannel(channelID string, allowed []string) bool { + if len(allowed) == 0 { + return true + } + for _, id := range allowed { + if id == channelID { + return true + } + } + return false +} + +// hasBotMention returns true if the message text contains an @-mention of +// the bot user ID. Slack encodes mentions as <@USER_ID> or <@USER_ID|name>. +func hasBotMention(text string, botUserID string) bool { + if botUserID == "" { + return false + } + return strings.Contains(text, fmt.Sprintf("<@%s>", botUserID)) || + strings.Contains(text, fmt.Sprintf("<@%s|", botUserID)) +} + +// matchesTriggers evaluates trigger patterns against message text with OR +// semantics. Each trigger requires pattern match AND bot mention, unless +// MentionOptional is true on that trigger. +func matchesTriggers(text string, triggers []v1alpha1.SlackTrigger, botUserID string) bool { + mentioned := hasBotMention(text, botUserID) + for _, t := range triggers { + re, err := getOrCompileTriggerRegexp(t.Pattern) + if err != nil { + continue + } + if !re.MatchString(text) { + continue + } + if t.MentionOptional != nil && *t.MentionOptional { + return true + } + if mentioned { + return true + } + } + return false +} diff --git a/internal/slack/filter_test.go b/internal/slack/filter_test.go new file mode 100644 index 00000000..46e3316c --- /dev/null +++ b/internal/slack/filter_test.go @@ -0,0 +1,440 @@ +package slack + +import ( + "testing" + + "github.com/kelos-dev/kelos/api/v1alpha1" +) + +func boolPtr(b bool) *bool { return &b } + +func TestMatchesSpawner(t *testing.T) { + tests := []struct { + name string + slackCfg *v1alpha1.Slack + msg *SlackMessageData + botUserID string + want bool + }{ + { + name: "nil slack config", + slackCfg: nil, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1"}, + botUserID: "UBOT1", + want: false, + }, + { + name: "empty config with bot mention matches", + slackCfg: &v1alpha1.Slack{}, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "hey <@UBOT1> help"}, + botUserID: "UBOT1", + want: true, + }, + { + name: "empty config without bot mention rejects", + slackCfg: &v1alpha1.Slack{}, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "hey help"}, + botUserID: "UBOT1", + want: false, + }, + { + name: "channel filter matches", + slackCfg: &v1alpha1.Slack{ + Channels: []string{"C1", "C2"}, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "<@UBOT1> hi"}, + botUserID: "UBOT1", + want: true, + }, + { + name: "channel filter rejects", + slackCfg: &v1alpha1.Slack{ + Channels: []string{"C2", "C3"}, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "<@UBOT1> hi"}, + botUserID: "UBOT1", + want: false, + }, + { + name: "trigger with pattern and mention matches", + slackCfg: &v1alpha1.Slack{ + Triggers: []v1alpha1.SlackTrigger{ + {Pattern: "fix.*bug"}, + }, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "<@UBOT1> fix the bug"}, + botUserID: "UBOT1", + want: true, + }, + { + name: "trigger with pattern match but no mention rejects", + slackCfg: &v1alpha1.Slack{ + Triggers: []v1alpha1.SlackTrigger{ + {Pattern: "fix.*bug"}, + }, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "fix the bug"}, + botUserID: "UBOT1", + want: false, + }, + { + name: "trigger with mention but pattern does not match rejects", + slackCfg: &v1alpha1.Slack{ + Triggers: []v1alpha1.SlackTrigger{ + {Pattern: "deploy"}, + }, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "<@UBOT1> fix the bug"}, + botUserID: "UBOT1", + want: false, + }, + { + name: "trigger with mentionOptional fires on pattern alone", + slackCfg: &v1alpha1.Slack{ + Triggers: []v1alpha1.SlackTrigger{ + {Pattern: "fix.*bug", MentionOptional: boolPtr(true)}, + }, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "fix the bug"}, + botUserID: "UBOT1", + want: true, + }, + { + name: "trigger with mentionOptional=false requires mention", + slackCfg: &v1alpha1.Slack{ + Triggers: []v1alpha1.SlackTrigger{ + {Pattern: "fix.*bug", MentionOptional: boolPtr(false)}, + }, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "fix the bug"}, + botUserID: "UBOT1", + want: false, + }, + { + name: "multiple triggers OR semantics first misses second hits", + slackCfg: &v1alpha1.Slack{ + Triggers: []v1alpha1.SlackTrigger{ + {Pattern: "deploy"}, + {Pattern: "fix"}, + }, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "<@UBOT1> fix the bug"}, + botUserID: "UBOT1", + want: true, + }, + { + name: "multiple triggers none match", + slackCfg: &v1alpha1.Slack{ + Triggers: []v1alpha1.SlackTrigger{ + {Pattern: "deploy"}, + {Pattern: "rollback"}, + }, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "<@UBOT1> fix the bug"}, + botUserID: "UBOT1", + want: false, + }, + { + name: "slash command bypasses mention and triggers", + slackCfg: &v1alpha1.Slack{ + Triggers: []v1alpha1.SlackTrigger{ + {Pattern: "deploy"}, + }, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "fix this", IsSlashCommand: true}, + botUserID: "UBOT1", + want: true, + }, + { + name: "thread reply with bot mention matches", + slackCfg: &v1alpha1.Slack{}, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "<@UBOT1> follow up", ThreadTS: "1234567890.123456"}, + botUserID: "UBOT1", + want: true, + }, + { + name: "thread reply without bot mention rejects", + slackCfg: &v1alpha1.Slack{}, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "follow up", ThreadTS: "1234567890.123456"}, + botUserID: "UBOT1", + want: false, + }, + { + name: "channel filter passes but no mention rejects", + slackCfg: &v1alpha1.Slack{ + Channels: []string{"C1"}, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "hello"}, + botUserID: "UBOT1", + want: false, + }, + { + name: "invalid trigger regex is skipped", + slackCfg: &v1alpha1.Slack{ + Triggers: []v1alpha1.SlackTrigger{ + {Pattern: "[invalid"}, + {Pattern: "fix"}, + }, + }, + msg: &SlackMessageData{UserID: "U1", ChannelID: "C1", Text: "<@UBOT1> fix it"}, + botUserID: "UBOT1", + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := MatchesSpawner(tt.slackCfg, tt.msg, tt.botUserID) + if got != tt.want { + t.Errorf("MatchesSpawner() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestExtractSlackWorkItem(t *testing.T) { + t.Run("regular message", func(t *testing.T) { + msg := &SlackMessageData{ + UserID: "U123", + UserName: "Alice", + Text: "fix the login page", + Body: "fix the login page", + Timestamp: "1234567890.123456", + Permalink: "https://slack.com/archives/C1/p1234567890123456", + } + + vars := ExtractSlackWorkItem(msg) + + if vars["ID"] != "1234567890.123456" { + t.Errorf("ID = %v, want %v", vars["ID"], "1234567890.123456") + } + if vars["Title"] != "fix the login page" { + t.Errorf("Title = %v, want %v", vars["Title"], "fix the login page") + } + if vars["Body"] != "fix the login page" { + t.Errorf("Body = %v, want %v", vars["Body"], "fix the login page") + } + if vars["URL"] != "https://slack.com/archives/C1/p1234567890123456" { + t.Errorf("URL = %v, want %v", vars["URL"], msg.Permalink) + } + if vars["Kind"] != "SlackMessage" { + t.Errorf("Kind = %v, want %v", vars["Kind"], "SlackMessage") + } + }) + + t.Run("slash command uses composite ID", func(t *testing.T) { + msg := &SlackMessageData{ + UserID: "U123", + UserName: "Alice", + Text: "do something", + Body: "do something", + IsSlashCommand: true, + SlashCommandID: "C1:/kelos:trigger123", + } + + vars := ExtractSlackWorkItem(msg) + + if vars["ID"] != "C1:/kelos:trigger123" { + t.Errorf("ID = %v, want %v", vars["ID"], "C1:/kelos:trigger123") + } + }) + + t.Run("multi-line message uses first line as title", func(t *testing.T) { + msg := &SlackMessageData{ + UserID: "U123", + UserName: "Alice", + Text: "fix the login page\nmore details here\nand more", + Body: "fix the login page\nmore details here\nand more", + Timestamp: "1234567890.123456", + } + + vars := ExtractSlackWorkItem(msg) + + if vars["Title"] != "fix the login page" { + t.Errorf("Title = %v, want %v", vars["Title"], "fix the login page") + } + }) +} + +func TestShouldProcess(t *testing.T) { + tests := []struct { + name string + userID string + subtype string + hasContent bool + selfUserID string + want bool + }{ + { + name: "normal message", + userID: "U1", + hasContent: true, + selfUserID: "UBOT", + want: true, + }, + { + name: "self message filtered", + userID: "UBOT", + hasContent: true, + selfUserID: "UBOT", + want: false, + }, + { + name: "bot_message subtype filtered", + userID: "U1", + subtype: "bot_message", + hasContent: true, + selfUserID: "UBOT", + want: false, + }, + { + name: "message_changed subtype filtered", + userID: "U1", + subtype: "message_changed", + hasContent: true, + selfUserID: "UBOT", + want: false, + }, + { + name: "message_deleted subtype filtered", + userID: "U1", + subtype: "message_deleted", + hasContent: true, + selfUserID: "UBOT", + want: false, + }, + { + name: "message_replied subtype filtered", + userID: "U1", + subtype: "message_replied", + hasContent: true, + selfUserID: "UBOT", + want: false, + }, + { + name: "no content filtered", + userID: "U1", + hasContent: false, + selfUserID: "UBOT", + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := shouldProcess(tt.userID, tt.subtype, tt.hasContent, tt.selfUserID) + if got != tt.want { + t.Errorf("shouldProcess() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestMatchesChannel(t *testing.T) { + tests := []struct { + name string + channelID string + allowed []string + want bool + }{ + {"empty allowed list matches all", "C1", nil, true}, + {"in allowed list", "C1", []string{"C1", "C2"}, true}, + {"not in allowed list", "C3", []string{"C1", "C2"}, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := matchesChannel(tt.channelID, tt.allowed); got != tt.want { + t.Errorf("matchesChannel() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestHasBotMention(t *testing.T) { + tests := []struct { + name string + text string + botUserID string + want bool + }{ + {"mention present", "hey <@UBOT1> fix", "UBOT1", true}, + {"mention with display name", "hey <@UBOT1|kelos-bot> fix", "UBOT1", true}, + {"mention absent", "hey fix this", "UBOT1", false}, + {"empty bot user ID", "hey <@UBOT1> fix", "", false}, + {"partial ID does not match", "hey <@UBOT10> fix", "UBOT1", false}, + {"mention without angle brackets", "hey @UBOT1 fix", "UBOT1", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := hasBotMention(tt.text, tt.botUserID); got != tt.want { + t.Errorf("hasBotMention() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestMatchesTriggers(t *testing.T) { + tests := []struct { + name string + text string + triggers []v1alpha1.SlackTrigger + botUserID string + want bool + }{ + { + name: "pattern matches with mention", + text: "<@UBOT1> deploy prod", + triggers: []v1alpha1.SlackTrigger{{Pattern: "deploy"}}, + botUserID: "UBOT1", + want: true, + }, + { + name: "pattern matches without mention requires mention", + text: "deploy prod", + triggers: []v1alpha1.SlackTrigger{{Pattern: "deploy"}}, + botUserID: "UBOT1", + want: false, + }, + { + name: "mentionOptional allows pattern only", + text: "deploy prod", + triggers: []v1alpha1.SlackTrigger{{Pattern: "deploy", MentionOptional: boolPtr(true)}}, + botUserID: "UBOT1", + want: true, + }, + { + name: "pattern does not match", + text: "<@UBOT1> rollback", + triggers: []v1alpha1.SlackTrigger{{Pattern: "deploy"}}, + botUserID: "UBOT1", + want: false, + }, + { + name: "OR semantics across triggers", + text: "<@UBOT1> rollback", + triggers: []v1alpha1.SlackTrigger{ + {Pattern: "deploy"}, + {Pattern: "rollback"}, + }, + botUserID: "UBOT1", + want: true, + }, + { + name: "invalid regex skipped", + text: "<@UBOT1> fix it", + triggers: []v1alpha1.SlackTrigger{{Pattern: "[invalid"}, {Pattern: "fix"}}, + botUserID: "UBOT1", + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := matchesTriggers(tt.text, tt.triggers, tt.botUserID); got != tt.want { + t.Errorf("matchesTriggers() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/slack/handler.go b/internal/slack/handler.go new file mode 100644 index 00000000..42ff93a9 --- /dev/null +++ b/internal/slack/handler.go @@ -0,0 +1,399 @@ +package slack + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + stdlog "log" + "os" + "strings" + "time" + + "github.com/go-logr/logr" + goslack "github.com/slack-go/slack" + "github.com/slack-go/slack/slackevents" + "github.com/slack-go/slack/socketmode" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kelos-dev/kelos/api/v1alpha1" + "github.com/kelos-dev/kelos/internal/taskbuilder" +) + +const ( + // AnnotationSlackReporting indicates that Slack reporting is enabled + // for this Task. + AnnotationSlackReporting = "kelos.dev/slack-reporting" + + // AnnotationSlackChannel records the Slack channel ID where the + // originating message was posted. + AnnotationSlackChannel = "kelos.dev/slack-channel" + + // AnnotationSlackThreadTS records the originating message timestamp, + // used as thread_ts for posting replies. + AnnotationSlackThreadTS = "kelos.dev/slack-thread-ts" +) + +const enrichCallTimeout = 5 * time.Second + +// SlackHandler handles Slack messages via Socket Mode and routes them to +// matching TaskSpawners. It is the centralized equivalent of the per-TaskSpawner +// SlackSource that previously ran in each spawner pod. +type SlackHandler struct { + client client.Client + log logr.Logger + taskBuilder *taskbuilder.TaskBuilder + api *goslack.Client + sm *socketmode.Client + botUserID string + cancel context.CancelFunc +} + +// NewSlackHandler creates a new handler. Call Start to begin listening. +func NewSlackHandler(ctx context.Context, cl client.Client, botToken, appToken string, log logr.Logger) (*SlackHandler, error) { + api := goslack.New(botToken, goslack.OptionAppLevelToken(appToken)) + + authResp, err := api.AuthTestContext(ctx) + if err != nil { + return nil, fmt.Errorf("Slack auth test failed: %w", err) + } + + tb, err := taskbuilder.NewTaskBuilder(cl) + if err != nil { + return nil, fmt.Errorf("Creating task builder: %w", err) + } + + log.Info("Authenticated with Slack", "botUserID", authResp.UserID) + + return &SlackHandler{ + client: cl, + log: log, + taskBuilder: tb, + api: api, + sm: newSocketModeClient(api), + botUserID: authResp.UserID, + }, nil +} + +// Start connects to Slack via Socket Mode and begins listening for events. +// It blocks until the context is cancelled. +func (h *SlackHandler) Start(ctx context.Context) error { + bgCtx, cancel := context.WithCancel(ctx) + h.cancel = cancel + + go func() { + if err := h.sm.RunContext(bgCtx); err != nil { + h.log.Error(err, "Socket Mode connection closed with error") + } else { + h.log.Info("Socket Mode connection closed cleanly") + } + }() + + for { + select { + case <-bgCtx.Done(): + return bgCtx.Err() + case evt, ok := <-h.sm.Events: + if !ok { + h.log.Info("Socket Mode events channel closed, exiting listener") + return fmt.Errorf("Socket Mode events channel closed unexpectedly") + } + switch evt.Type { + case socketmode.EventTypeEventsAPI: + h.handleEventsAPI(bgCtx, evt) + case socketmode.EventTypeSlashCommand: + h.handleSlashCommand(bgCtx, evt) + default: + h.log.V(1).Info("Unhandled Socket Mode event type", "type", evt.Type) + } + } + } +} + +// Stop shuts down the Socket Mode listener. +func (h *SlackHandler) Stop() { + if h.cancel != nil { + h.cancel() + } +} + +func (h *SlackHandler) handleEventsAPI(ctx context.Context, evt socketmode.Event) { + eventsAPIEvent, ok := evt.Data.(slackevents.EventsAPIEvent) + if !ok { + h.sm.Ack(*evt.Request) + return + } + h.sm.Ack(*evt.Request) + + innerEvent, ok := eventsAPIEvent.InnerEvent.Data.(*slackevents.MessageEvent) + if !ok { + return + } + + hasContent := innerEvent.Text != "" || + (innerEvent.Message != nil && len(innerEvent.Message.Attachments) > 0) + if !shouldProcess(innerEvent.User, innerEvent.SubType, hasContent, h.botUserID) { + h.log.V(1).Info("Message filtered by shouldProcess", + "user", innerEvent.User, "subtype", innerEvent.SubType, "channel", innerEvent.Channel) + return + } + + // Enrich message with user info, permalink, channel name + msg := h.enrichMessage(ctx, innerEvent) + + // For thread replies, fetch full thread context so the agent sees + // the entire conversation. Spawner filters (mention + triggers) + // decide whether to process the message. + if innerEvent.ThreadTimeStamp != "" { + body, err := FetchThreadContext(ctx, h.api, innerEvent.Channel, innerEvent.ThreadTimeStamp, h.botUserID) + if err != nil { + h.log.Error(err, "Failed to fetch thread context", "channel", innerEvent.Channel, "threadTS", innerEvent.ThreadTimeStamp) + return + } + msg.Body = body + msg.HasThreadContext = true + } + + h.routeMessage(ctx, msg) +} + +func (h *SlackHandler) handleSlashCommand(ctx context.Context, evt socketmode.Event) { + cmd, ok := evt.Data.(goslack.SlashCommand) + if !ok { + h.sm.Ack(*evt.Request) + return + } + h.sm.Ack(*evt.Request) + + if cmd.UserID == h.botUserID { + return + } + + body := strings.TrimSpace(cmd.Text) + if body == "" { + return + } + + msg := &SlackMessageData{ + UserID: cmd.UserID, + ChannelID: cmd.ChannelID, + UserName: cmd.UserName, + Text: cmd.Text, + Body: body, + IsSlashCommand: true, + SlashCommandID: fmt.Sprintf("%s:%s:%s", cmd.ChannelID, cmd.Command, cmd.TriggerID), + } + + h.routeMessage(ctx, msg) +} + +// routeMessage finds all matching TaskSpawners and creates tasks for each. +func (h *SlackHandler) routeMessage(ctx context.Context, msg *SlackMessageData) { + spawners, err := h.getMatchingSpawners(ctx) + if err != nil { + h.log.Error(err, "Failed to get matching spawners") + return + } + + if len(spawners) == 0 { + h.log.V(1).Info("No matching TaskSpawners for Slack message", "channel", msg.ChannelID) + return + } + + for _, spawner := range spawners { + spawnerLog := h.log.WithValues("spawner", spawner.Name, "namespace", spawner.Namespace) + + // Check if suspended + if spawner.Spec.Suspend != nil && *spawner.Spec.Suspend { + spawnerLog.V(1).Info("Skipping suspended TaskSpawner") + continue + } + + // Check max concurrency + if spawner.Spec.MaxConcurrency != nil && *spawner.Spec.MaxConcurrency > 0 { + if int32(spawner.Status.ActiveTasks) >= *spawner.Spec.MaxConcurrency { + spawnerLog.Info("Max concurrency reached, dropping message", + "activeTasks", spawner.Status.ActiveTasks, + "maxConcurrency", *spawner.Spec.MaxConcurrency) + continue + } + } + + slackCfg := spawner.Spec.When.Slack + + // Check channel, mention, and trigger filters + if !MatchesSpawner(slackCfg, msg, h.botUserID) { + spawnerLog.V(1).Info("Message did not match spawner filters", + "channel", msg.ChannelID, "triggerCount", len(slackCfg.Triggers)) + continue + } + + taskMsg := *msg + + spawnerLog.Info("Message matches TaskSpawner — creating task", "channel", msg.ChannelID, "user", msg.UserID) + + if err := h.createTask(ctx, spawner, &taskMsg); err != nil { + spawnerLog.Error(err, "Failed to create task") + continue + } + } +} + +// getMatchingSpawners returns all TaskSpawners that have a Slack source configured. +func (h *SlackHandler) getMatchingSpawners(ctx context.Context) ([]*v1alpha1.TaskSpawner, error) { + var spawnerList v1alpha1.TaskSpawnerList + if err := h.client.List(ctx, &spawnerList, &client.ListOptions{}); err != nil { + return nil, err + } + + var matching []*v1alpha1.TaskSpawner + for i := range spawnerList.Items { + spawner := &spawnerList.Items[i] + if spawner.Spec.When.Slack != nil { + matching = append(matching, spawner) + } + } + + return matching, nil +} + +// createTask creates a Task for the given TaskSpawner from a Slack message. +func (h *SlackHandler) createTask(ctx context.Context, spawner *v1alpha1.TaskSpawner, msg *SlackMessageData) error { + templateVars := ExtractSlackWorkItem(msg) + + // Build unique task name using a hash of the message identifier + hashInput := fmt.Sprintf("%s-%s", msg.ChannelID, msg.Timestamp) + if msg.IsSlashCommand { + hashInput = msg.SlashCommandID + } + sum := sha256.Sum256([]byte(hashInput)) + shortHash := hex.EncodeToString(sum[:])[:12] + taskName := fmt.Sprintf("%s-slack-%s", spawner.Name, shortHash) + if len(taskName) > 63 { + taskName = strings.TrimRight(taskName[:63], "-.") + } + + // Resolve GVK for owner reference + gvks, _, err := h.client.Scheme().ObjectKinds(spawner) + if err != nil || len(gvks) == 0 { + return fmt.Errorf("Failed to get GVK for TaskSpawner: %w", err) + } + gvk := gvks[0] + + task, err := h.taskBuilder.BuildTask( + taskName, + spawner.Namespace, + &spawner.Spec.TaskTemplate, + templateVars, + &taskbuilder.SpawnerRef{ + Name: spawner.Name, + UID: string(spawner.UID), + APIVersion: gvk.GroupVersion().String(), + Kind: gvk.Kind, + }, + ) + if err != nil { + return fmt.Errorf("Building task: %w", err) + } + + // Add Slack reporting annotations + if task.Annotations == nil { + task.Annotations = make(map[string]string) + } + task.Annotations[AnnotationSlackReporting] = "enabled" + task.Annotations[AnnotationSlackChannel] = msg.ChannelID + + // Only set thread_ts for real message timestamps (not slash command composite IDs). + // Slash commands intentionally skip status reporting — there is no thread to reply to. + if !msg.IsSlashCommand { + threadTS := msg.Timestamp + if msg.ThreadTS != "" { + threadTS = msg.ThreadTS + } + task.Annotations[AnnotationSlackThreadTS] = threadTS + } + + if err := h.client.Create(ctx, task); err != nil { + if apierrors.IsAlreadyExists(err) { + h.log.Info("Task already exists, skipping", "task", taskName) + return nil + } + return fmt.Errorf("Creating task: %w", err) + } + + h.log.Info("Created task from Slack message", "task", taskName, "spawner", spawner.Name) + return nil +} + +// enrichMessage builds a SlackMessageData from a raw Slack message event, +// enriching it with user info and permalink. +func (h *SlackHandler) enrichMessage(ctx context.Context, event *slackevents.MessageEvent) *SlackMessageData { + userName := event.User + userCtx, userCancel := context.WithTimeout(ctx, enrichCallTimeout) + defer userCancel() + if info, err := h.api.GetUserInfoContext(userCtx, event.User); err == nil { + userName = info.RealName + if userName == "" { + userName = info.Name + } + } + + permalink := "" + linkCtx, linkCancel := context.WithTimeout(ctx, enrichCallTimeout) + defer linkCancel() + if link, err := h.api.GetPermalinkContext(linkCtx, &goslack.PermalinkParameters{ + Channel: event.Channel, + Ts: event.TimeStamp, + }); err == nil { + permalink = link + } + + body := event.Text + if event.Message != nil && len(event.Message.Attachments) > 0 { + if attachText := formatAttachments(event.Message.Attachments); attachText != "" { + if body != "" { + body = body + "\n" + attachText + } else { + body = attachText + } + } + } + + return &SlackMessageData{ + UserID: event.User, + ChannelID: event.Channel, + UserName: userName, + Text: event.Text, + Body: body, + ThreadTS: event.ThreadTimeStamp, + Timestamp: event.TimeStamp, + Permalink: permalink, + } +} + +// newSocketModeClient creates a Socket Mode client with an stderr logger. +// Set SLACK_SOCKET_DEBUG=1 to enable verbose WebSocket frame logging. +func newSocketModeClient(api *goslack.Client) *socketmode.Client { + opts := []socketmode.Option{ + socketmode.OptionLog(stdlog.New(os.Stderr, "socketmode: ", stdlog.LstdFlags|stdlog.Lshortfile)), + } + if os.Getenv("SLACK_SOCKET_DEBUG") == "1" { + opts = append(opts, socketmode.OptionDebug(true)) + } + return socketmode.New(api, opts...) +} + +// shouldProcess decides whether a Slack message should be processed. +// It filters out bot messages, self-messages, and message subtypes we don't handle. +// hasContent should be true when the message has text or attachments. +func shouldProcess(userID, subtype string, hasContent bool, selfUserID string) bool { + if userID == selfUserID { + return false + } + switch subtype { + case "bot_message", "message_changed", "message_deleted", "message_replied": + return false + } + return hasContent +} diff --git a/internal/slack/handler_test.go b/internal/slack/handler_test.go new file mode 100644 index 00000000..bd06638e --- /dev/null +++ b/internal/slack/handler_test.go @@ -0,0 +1,179 @@ +package slack + +import ( + "context" + "testing" + + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/kelos-dev/kelos/api/v1alpha1" + "github.com/kelos-dev/kelos/internal/taskbuilder" +) + +// TestRouteMessageThreadContextBody verifies that routeMessage preserves the +// thread context body for thread replies (HasThreadContext=true) and uses the +// trigger-processed body for top-level messages. +func TestRouteMessageThreadContextBody(t *testing.T) { + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(v1alpha1.AddToScheme(scheme)) + + spawner := &v1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-spawner", + Namespace: "default", + UID: "spawner-uid", + }, + Spec: v1alpha1.TaskSpawnerSpec{ + When: v1alpha1.When{ + Slack: &v1alpha1.Slack{}, + }, + TaskTemplate: v1alpha1.TaskTemplate{ + Type: "claude-code", + Credentials: v1alpha1.Credentials{ + Type: v1alpha1.CredentialTypeNone, + }, + PromptTemplate: "{{.Body}}", + }, + }, + } + + tests := []struct { + name string + msg *SlackMessageData + wantBody string + }{ + { + name: "top-level message uses raw text as body", + msg: &SlackMessageData{ + UserID: "U1", + ChannelID: "C1", + Text: "<@UBOT> fix the bug", + Body: "<@UBOT> fix the bug", + Timestamp: "1111111111.111111", + }, + wantBody: "<@UBOT> fix the bug", + }, + { + name: "top-level message with attachments preserves full body", + msg: &SlackMessageData{ + UserID: "U1", + ChannelID: "C1", + Text: "<@UBOT> fix the bug", + Body: "<@UBOT> fix the bug\n[Attachment: error log]\nStackTrace: panic at line 42", + Timestamp: "3333333333.333333", + }, + wantBody: "<@UBOT> fix the bug\n[Attachment: error log]\nStackTrace: panic at line 42", + }, + { + name: "thread reply with context preserves thread body", + msg: &SlackMessageData{ + UserID: "U1", + ChannelID: "C1", + Text: "<@UBOT> can you take a look", + Body: "Slack thread conversation:\n\nUser: original question\n\nUser: <@UBOT> can you take a look\n", + ThreadTS: "1111111111.000000", + Timestamp: "2222222222.222222", + HasThreadContext: true, + }, + // HasThreadContext=true means the thread body is preserved as-is + wantBody: "Slack thread conversation:\n\nUser: original question\n\nUser: <@UBOT> can you take a look\n", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cl := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(spawner.DeepCopy()). + Build() + + tb, err := taskbuilder.NewTaskBuilder(cl) + if err != nil { + t.Fatalf("NewTaskBuilder: %v", err) + } + + h := &SlackHandler{ + client: cl, + log: logr.Discard(), + taskBuilder: tb, + botUserID: "UBOT", + } + + h.routeMessage(context.Background(), tt.msg) + + // Verify a task was created with the expected body + var tasks v1alpha1.TaskList + if err := cl.List(context.Background(), &tasks); err != nil { + t.Fatalf("List tasks: %v", err) + } + if len(tasks.Items) != 1 { + t.Fatalf("Expected 1 task, got %d", len(tasks.Items)) + } + if tasks.Items[0].Spec.Prompt != tt.wantBody { + t.Errorf("Task prompt = %q, want %q", tasks.Items[0].Spec.Prompt, tt.wantBody) + } + }) + } +} + +func TestCreateTaskAlreadyExists(t *testing.T) { + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(v1alpha1.AddToScheme(scheme)) + + spawner := &v1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-spawner", + Namespace: "default", + UID: "spawner-uid", + }, + Spec: v1alpha1.TaskSpawnerSpec{ + TaskTemplate: v1alpha1.TaskTemplate{ + Type: "claude-code", + Credentials: v1alpha1.Credentials{ + Type: v1alpha1.CredentialTypeNone, + }, + PromptTemplate: "{{.Body}}", + }, + }, + } + + msg := &SlackMessageData{ + UserID: "U123", + ChannelID: "C456", + Text: "hello", + Body: "hello", + Timestamp: "1234567890.123456", + } + + tb, err := taskbuilder.NewTaskBuilder(nil) + if err != nil { + t.Fatalf("NewTaskBuilder: %v", err) + } + + cl := fake.NewClientBuilder(). + WithScheme(scheme). + Build() + + h := &SlackHandler{ + client: cl, + log: logr.Discard(), + taskBuilder: tb, + } + + // First call should succeed + if err := h.createTask(context.Background(), spawner, msg); err != nil { + t.Fatalf("First createTask() error: %v", err) + } + + // Second call with same message should not return an error (AlreadyExists is handled) + if err := h.createTask(context.Background(), spawner, msg); err != nil { + t.Fatalf("Second createTask() should not error on AlreadyExists, got: %v", err) + } +} diff --git a/internal/slack/thread.go b/internal/slack/thread.go new file mode 100644 index 00000000..df12baba --- /dev/null +++ b/internal/slack/thread.go @@ -0,0 +1,93 @@ +package slack + +import ( + "context" + "fmt" + "strings" + "time" + + goslack "github.com/slack-go/slack" +) + +const threadFetchTimeout = 10 * time.Second + +// BotParticipated returns true if any message in the thread was sent by the +// given bot user ID. This prevents processing thread replies in conversations +// the bot never participated in. +func BotParticipated(msgs []goslack.Message, botUserID string) bool { + for _, m := range msgs { + if m.User == botUserID { + return true + } + } + return false +} + +// FormatThreadContext formats a Slack thread's messages into a readable +// conversation string for use as a follow-up task prompt. Messages from the +// bot itself are labeled as "Agent" while all others use "User". +func FormatThreadContext(msgs []goslack.Message, botUserID string) string { + var b strings.Builder + b.WriteString("Slack thread conversation:\n") + for _, m := range msgs { + attachText := formatAttachments(m.Attachments) + if m.Text == "" && attachText == "" { + continue + } + role := "User" + if m.User == botUserID || m.BotID != "" { + role = "Agent" + } + switch { + case m.Text != "" && attachText != "": + fmt.Fprintf(&b, "\n%s: %s\n%s\n", role, m.Text, attachText) + case m.Text != "": + fmt.Fprintf(&b, "\n%s: %s\n", role, m.Text) + default: + fmt.Fprintf(&b, "\n%s: [attachment]\n%s\n", role, attachText) + } + } + return b.String() +} + +// formatAttachments extracts text content from Slack message attachments +// (forwarded messages, unfurls, etc.) and returns a formatted string. +// Returns empty string if there are no text-bearing attachments. +func formatAttachments(attachments []goslack.Attachment) string { + var parts []string + for _, a := range attachments { + var lines []string + if a.Pretext != "" { + lines = append(lines, a.Pretext) + } + if a.Text != "" { + lines = append(lines, "> "+strings.ReplaceAll(a.Text, "\n", "\n> ")) + } + if a.Fallback != "" && a.Text == "" { + lines = append(lines, "> "+strings.ReplaceAll(a.Fallback, "\n", "\n> ")) + } + if len(lines) > 0 { + parts = append(parts, strings.Join(lines, "\n")) + } + } + return strings.Join(parts, "\n") +} + +// FetchThreadContext fetches the full thread history and returns formatted +// context. The caller decides whether to process the message — this function +// always returns the thread body when the API call succeeds. +func FetchThreadContext(ctx context.Context, api *goslack.Client, channelID, threadTS, botUserID string) (string, error) { + threadCtx, cancel := context.WithTimeout(ctx, threadFetchTimeout) + defer cancel() + + msgs, _, _, err := api.GetConversationRepliesContext(threadCtx, + &goslack.GetConversationRepliesParameters{ + ChannelID: channelID, + Timestamp: threadTS, + }) + if err != nil { + return "", fmt.Errorf("fetching thread replies: %w", err) + } + + return FormatThreadContext(msgs, botUserID), nil +} diff --git a/internal/slack/thread_test.go b/internal/slack/thread_test.go new file mode 100644 index 00000000..4123d546 --- /dev/null +++ b/internal/slack/thread_test.go @@ -0,0 +1,238 @@ +package slack + +import ( + "strings" + "testing" + + goslack "github.com/slack-go/slack" +) + +func TestBotParticipated(t *testing.T) { + tests := []struct { + name string + msgs []goslack.Message + botUserID string + want bool + }{ + { + name: "empty thread", + msgs: nil, + botUserID: "UBOT", + want: false, + }, + { + name: "bot not in thread", + msgs: []goslack.Message{ + {Msg: goslack.Msg{User: "U1", Text: "hello"}}, + {Msg: goslack.Msg{User: "U2", Text: "world"}}, + }, + botUserID: "UBOT", + want: false, + }, + { + name: "bot in thread", + msgs: []goslack.Message{ + {Msg: goslack.Msg{User: "U1", Text: "hello"}}, + {Msg: goslack.Msg{User: "UBOT", Text: "I can help"}}, + }, + botUserID: "UBOT", + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := BotParticipated(tt.msgs, tt.botUserID); got != tt.want { + t.Errorf("BotParticipated() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestFormatThreadContext(t *testing.T) { + msgs := []goslack.Message{ + {Msg: goslack.Msg{User: "U1", Text: "fix the bug please"}}, + {Msg: goslack.Msg{User: "UBOT", Text: "Working on it"}}, + {Msg: goslack.Msg{User: "U1", Text: "any updates?"}}, + {Msg: goslack.Msg{Text: ""}}, // empty text, should be skipped + } + + result := FormatThreadContext(msgs, "UBOT") + + if !strings.HasPrefix(result, "Slack thread conversation:\n") { + t.Error("Expected thread context to start with header") + } + if !strings.Contains(result, "User: fix the bug please") { + t.Error("Expected user message to be labeled as User") + } + if !strings.Contains(result, "Agent: Working on it") { + t.Error("Expected bot message to be labeled as Agent") + } + if !strings.Contains(result, "User: any updates?") { + t.Error("Expected follow-up user message") + } + // Empty text message should be skipped + if strings.Count(result, "User:") != 2 || strings.Count(result, "Agent:") != 1 { + t.Errorf("Unexpected message count in output:\n%s", result) + } +} + +func TestFormatThreadContext_BotIDMessage(t *testing.T) { + msgs := []goslack.Message{ + {Msg: goslack.Msg{User: "U1", Text: "hello"}}, + {Msg: goslack.Msg{BotID: "B123", Text: "Bot response"}}, + } + + result := FormatThreadContext(msgs, "UBOT") + + if !strings.Contains(result, "Agent: Bot response") { + t.Error("Expected message with BotID to be labeled as Agent") + } +} + +func TestFormatThreadContext_WithAttachments(t *testing.T) { + msgs := []goslack.Message{ + {Msg: goslack.Msg{ + User: "U1", + Text: "This looks new in 379.0", + Attachments: []goslack.Attachment{ + {Text: "Original forwarded message content"}, + }, + }}, + {Msg: goslack.Msg{User: "U1", Text: "<@UBOT> investigate this"}}, + } + + result := FormatThreadContext(msgs, "UBOT") + + if !strings.Contains(result, "User: This looks new in 379.0") { + t.Error("Expected user text to appear") + } + if !strings.Contains(result, "> Original forwarded message content") { + t.Errorf("Expected attachment text to appear blockquoted, got:\n%s", result) + } +} + +func TestFormatThreadContext_AttachmentOnly(t *testing.T) { + msgs := []goslack.Message{ + {Msg: goslack.Msg{ + User: "U1", + Attachments: []goslack.Attachment{ + {Text: "Forwarded without commentary"}, + }, + }}, + {Msg: goslack.Msg{User: "U1", Text: "<@UBOT> look at this"}}, + } + + result := FormatThreadContext(msgs, "UBOT") + + if !strings.Contains(result, "User: [attachment]") { + t.Errorf("Expected attachment-only message to have [attachment] label, got:\n%s", result) + } + if !strings.Contains(result, "> Forwarded without commentary") { + t.Errorf("Expected attachment text to appear, got:\n%s", result) + } +} + +func TestFormatThreadContext_AttachmentWithPretext(t *testing.T) { + msgs := []goslack.Message{ + {Msg: goslack.Msg{ + User: "U1", + Text: "Check this out", + Attachments: []goslack.Attachment{ + {Pretext: "Message from #general", Text: "The actual content"}, + }, + }}, + } + + result := FormatThreadContext(msgs, "UBOT") + + if !strings.Contains(result, "User: Check this out") { + t.Error("Expected user text") + } + if !strings.Contains(result, "Message from #general") { + t.Errorf("Expected pretext to appear, got:\n%s", result) + } + if !strings.Contains(result, "> The actual content") { + t.Errorf("Expected attachment text blockquoted, got:\n%s", result) + } +} + +func TestFormatThreadContext_FallbackOnly(t *testing.T) { + msgs := []goslack.Message{ + {Msg: goslack.Msg{ + User: "U1", + Attachments: []goslack.Attachment{ + {Fallback: "Fallback text for unsupported attachment"}, + }, + }}, + } + + result := FormatThreadContext(msgs, "UBOT") + + if !strings.Contains(result, "> Fallback text for unsupported attachment") { + t.Errorf("Expected fallback text when no primary text, got:\n%s", result) + } +} + +func TestFormatAttachments(t *testing.T) { + t.Run("empty attachments", func(t *testing.T) { + result := formatAttachments(nil) + if result != "" { + t.Errorf("Expected empty string for nil attachments, got: %q", result) + } + }) + + t.Run("attachment with text only", func(t *testing.T) { + result := formatAttachments([]goslack.Attachment{ + {Text: "hello world"}, + }) + if result != "> hello world" { + t.Errorf("Expected blockquoted text, got: %q", result) + } + }) + + t.Run("attachment with multiline text", func(t *testing.T) { + result := formatAttachments([]goslack.Attachment{ + {Text: "line one\nline two"}, + }) + if result != "> line one\n> line two" { + t.Errorf("Expected each line blockquoted, got: %q", result) + } + }) + + t.Run("fallback ignored when text present", func(t *testing.T) { + result := formatAttachments([]goslack.Attachment{ + {Text: "primary", Fallback: "fallback"}, + }) + if strings.Contains(result, "fallback") { + t.Errorf("Fallback should not appear when text is present, got: %q", result) + } + }) + + t.Run("fallback used when no text", func(t *testing.T) { + result := formatAttachments([]goslack.Attachment{ + {Fallback: "fallback only"}, + }) + if result != "> fallback only" { + t.Errorf("Expected fallback to be used, got: %q", result) + } + }) + + t.Run("multiline fallback blockquoted", func(t *testing.T) { + result := formatAttachments([]goslack.Attachment{ + {Fallback: "line one\nline two"}, + }) + if result != "> line one\n> line two" { + t.Errorf("Expected each fallback line blockquoted, got: %q", result) + } + }) + + t.Run("attachment with no text fields", func(t *testing.T) { + result := formatAttachments([]goslack.Attachment{ + {Color: "danger"}, + }) + if result != "" { + t.Errorf("Expected empty string for non-text attachment, got: %q", result) + } + }) +}