Skip to content

Commit 1c6e02a

Browse files
jkahujaclaude
andcommitted
feat: add Slack status feedback as thread replies
When a Slack message triggers a Task, the bot now posts thread replies to the originating message as the Task progresses: - "Working on your request..." when accepted - "Done!" when succeeded - "Failed." when failed Implementation: - SlackReporter posts/updates thread replies via slack-go - SlackTaskReporter mirrors GitHubTaskReporter pattern using Slack-specific annotations for channel, thread_ts, reply_ts - sourceAnnotations() stamps Slack metadata on Tasks at creation - reportingEnabled() returns true for Slack sources (always-on) - buildWorkItem() now carries channel ID for annotation stamping Towards AIE-17 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6bcf85c commit 1c6e02a

8 files changed

Lines changed: 457 additions & 29 deletions

File tree

cmd/kelos-spawner/main.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,23 @@ func runReportingCycle(ctx context.Context, cl client.Client, key types.Namespac
186186
return nil
187187
}
188188

189+
func runSlackReportingCycle(ctx context.Context, cl client.Client, key types.NamespacedName, reporter *reporting.SlackTaskReporter) error {
190+
var taskList kelosv1alpha1.TaskList
191+
if err := cl.List(ctx, &taskList,
192+
client.InNamespace(key.Namespace),
193+
client.MatchingLabels{"kelos.dev/taskspawner": key.Name},
194+
); err != nil {
195+
return fmt.Errorf("listing tasks for Slack reporting: %w", err)
196+
}
197+
198+
for i := range taskList.Items {
199+
if err := reporter.ReportTaskStatus(ctx, &taskList.Items[i]); err != nil {
200+
ctrl.Log.WithName("spawner").Error(err, "Reporting Slack task status", "task", taskList.Items[i].Name)
201+
}
202+
}
203+
return nil
204+
}
205+
189206
func runCycle(ctx context.Context, cl client.Client, key types.NamespacedName, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers string, httpClient *http.Client) error {
190207
start := time.Now()
191208
err := runCycleCore(ctx, cl, key, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers, httpClient)
@@ -526,10 +543,18 @@ func renderTaskTemplateMetadata(ts *kelosv1alpha1.TaskSpawner, item source.WorkI
526543
return labels, annotations, nil
527544
}
528545

529-
// sourceAnnotations returns annotations that stamp GitHub source metadata
530-
// onto a spawned Task. These annotations enable downstream consumers (such
531-
// as the reporting watcher) to identify the originating issue or PR.
546+
// sourceAnnotations returns annotations that stamp source metadata onto a
547+
// spawned Task. These annotations enable downstream consumers (such as the
548+
// reporting watcher) to identify the originating issue, PR, or Slack message.
532549
func sourceAnnotations(ts *kelosv1alpha1.TaskSpawner, item source.WorkItem) map[string]string {
550+
if ts.Spec.When.Slack != nil && len(item.Labels) >= 2 {
551+
return map[string]string{
552+
reporting.AnnotationSlackReporting: "enabled",
553+
reporting.AnnotationSlackChannel: item.Labels[1],
554+
reporting.AnnotationSlackThreadTS: item.ID,
555+
}
556+
}
557+
533558
if ts.Spec.When.GitHubIssues == nil && ts.Spec.When.GitHubPullRequests == nil {
534559
return nil
535560
}
@@ -551,9 +576,12 @@ func sourceAnnotations(ts *kelosv1alpha1.TaskSpawner, item source.WorkItem) map[
551576
return annotations
552577
}
553578

554-
// reportingEnabled returns true when GitHub reporting is configured and enabled
579+
// reportingEnabled returns true when reporting is configured and enabled
555580
// on the TaskSpawner.
556581
func reportingEnabled(ts *kelosv1alpha1.TaskSpawner) bool {
582+
if ts.Spec.When.Slack != nil {
583+
return true
584+
}
557585
if ts.Spec.When.GitHubIssues != nil && ts.Spec.When.GitHubIssues.Reporting != nil {
558586
return ts.Spec.When.GitHubIssues.Reporting.Enabled
559587
}

cmd/kelos-spawner/reconciler.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"os"
78
"time"
89

910
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -82,25 +83,36 @@ func runOnce(ctx context.Context, cl client.Client, key types.NamespacedName, cf
8283
}
8384

8485
if reportingEnabled(&ts) {
85-
token, err := readGitHubToken(cfg.GitHubTokenFile)
86-
if err != nil {
87-
return 0, fmt.Errorf("reading GitHub token for reporting: %w", err)
88-
}
86+
if ts.Spec.When.Slack != nil {
87+
botToken := os.Getenv("SLACK_BOT_TOKEN")
88+
slackReporter := &reporting.SlackTaskReporter{
89+
Client: cl,
90+
Reporter: &reporting.SlackReporter{BotToken: botToken},
91+
}
92+
if err := runSlackReportingCycle(ctx, cl, key, slackReporter); err != nil {
93+
return 0, err
94+
}
95+
} else {
96+
token, err := readGitHubToken(cfg.GitHubTokenFile)
97+
if err != nil {
98+
return 0, fmt.Errorf("reading GitHub token for reporting: %w", err)
99+
}
89100

90-
// Reporting always uses the direct API base URL (writes bypass the proxy).
91-
reporter := &reporting.TaskReporter{
92-
Client: cl,
93-
Reporter: &reporting.GitHubReporter{
94-
Owner: cfg.GitHubOwner,
95-
Repo: cfg.GitHubRepo,
96-
Token: token,
97-
TokenFile: cfg.GitHubTokenFile,
98-
BaseURL: cfg.GitHubAPIBaseURL,
99-
Client: cfg.HTTPClient,
100-
},
101-
}
102-
if err := runReportingCycle(ctx, cl, key, reporter); err != nil {
103-
return 0, err
101+
// Reporting always uses the direct API base URL (writes bypass the proxy).
102+
reporter := &reporting.TaskReporter{
103+
Client: cl,
104+
Reporter: &reporting.GitHubReporter{
105+
Owner: cfg.GitHubOwner,
106+
Repo: cfg.GitHubRepo,
107+
Token: token,
108+
TokenFile: cfg.GitHubTokenFile,
109+
BaseURL: cfg.GitHubAPIBaseURL,
110+
Client: cfg.HTTPClient,
111+
},
112+
}
113+
if err := runReportingCycle(ctx, cl, key, reporter); err != nil {
114+
return 0, err
115+
}
104116
}
105117
}
106118

internal/reporting/slack.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package reporting
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/slack-go/slack"
8+
)
9+
10+
// SlackReporter posts and updates thread replies in Slack channels.
11+
type SlackReporter struct {
12+
// BotToken is the Bot User OAuth Token (xoxb-...).
13+
BotToken string
14+
}
15+
16+
// PostThreadReply posts a new message as a thread reply and returns the
17+
// reply's message timestamp.
18+
func (r *SlackReporter) PostThreadReply(ctx context.Context, channel, threadTS, text string) (string, error) {
19+
api := slack.New(r.BotToken)
20+
_, ts, err := api.PostMessageContext(ctx, channel,
21+
slack.MsgOptionText(text, false),
22+
slack.MsgOptionTS(threadTS),
23+
)
24+
if err != nil {
25+
return "", fmt.Errorf("posting Slack thread reply: %w", err)
26+
}
27+
return ts, nil
28+
}
29+
30+
// UpdateMessage updates an existing Slack message in place.
31+
func (r *SlackReporter) UpdateMessage(ctx context.Context, channel, messageTS, text string) error {
32+
api := slack.New(r.BotToken)
33+
_, _, _, err := api.UpdateMessageContext(ctx, channel, messageTS,
34+
slack.MsgOptionText(text, false),
35+
)
36+
if err != nil {
37+
return fmt.Errorf("updating Slack message: %w", err)
38+
}
39+
return nil
40+
}
41+
42+
// FormatSlackAccepted returns the thread reply text for an accepted task.
43+
func FormatSlackAccepted(taskName string) string {
44+
return fmt.Sprintf("Working on your request... (Task: %s)", taskName)
45+
}
46+
47+
// FormatSlackSucceeded returns the thread reply text for a succeeded task.
48+
func FormatSlackSucceeded(taskName string) string {
49+
return fmt.Sprintf("Done! (Task: %s)", taskName)
50+
}
51+
52+
// FormatSlackFailed returns the thread reply text for a failed task.
53+
func FormatSlackFailed(taskName string) string {
54+
return fmt.Sprintf("Failed. (Task: %s)", taskName)
55+
}

internal/reporting/slack_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package reporting
2+
3+
import (
4+
"context"
5+
"testing"
6+
)
7+
8+
func TestFormatSlackMessages(t *testing.T) {
9+
tests := []struct {
10+
name string
11+
fn func(string) string
12+
taskName string
13+
want string
14+
}{
15+
{
16+
name: "accepted",
17+
fn: FormatSlackAccepted,
18+
taskName: "spawner-1234567890.123456",
19+
want: "Working on your request... (Task: spawner-1234567890.123456)",
20+
},
21+
{
22+
name: "succeeded",
23+
fn: FormatSlackSucceeded,
24+
taskName: "spawner-1234567890.123456",
25+
want: "Done! (Task: spawner-1234567890.123456)",
26+
},
27+
{
28+
name: "failed",
29+
fn: FormatSlackFailed,
30+
taskName: "spawner-1234567890.123456",
31+
want: "Failed. (Task: spawner-1234567890.123456)",
32+
},
33+
}
34+
35+
for _, tt := range tests {
36+
t.Run(tt.name, func(t *testing.T) {
37+
got := tt.fn(tt.taskName)
38+
if got != tt.want {
39+
t.Errorf("got %q, want %q", got, tt.want)
40+
}
41+
})
42+
}
43+
}
44+
45+
func TestSlackReporterConstruction(t *testing.T) {
46+
reporter := &SlackReporter{BotToken: "xoxb-test-token"}
47+
if reporter.BotToken != "xoxb-test-token" {
48+
t.Errorf("BotToken = %q, want %q", reporter.BotToken, "xoxb-test-token")
49+
}
50+
}
51+
52+
func TestSlackReporter_PostThreadReplyError(t *testing.T) {
53+
reporter := &SlackReporter{BotToken: "xoxb-invalid"}
54+
_, err := reporter.PostThreadReply(context.Background(), "C123", "1234.5678", "test")
55+
if err == nil {
56+
t.Error("expected error with invalid token, got nil")
57+
}
58+
}
59+
60+
func TestSlackReporter_UpdateMessageError(t *testing.T) {
61+
reporter := &SlackReporter{BotToken: "xoxb-invalid"}
62+
err := reporter.UpdateMessage(context.Background(), "C123", "1234.5678", "test")
63+
if err == nil {
64+
t.Error("expected error with invalid token, got nil")
65+
}
66+
}

internal/reporting/watcher.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,26 @@ const (
3232
// AnnotationGitHubReportPhase records the last Task phase that was
3333
// reported to GitHub, preventing duplicate API calls on re-list.
3434
AnnotationGitHubReportPhase = "kelos.dev/github-report-phase"
35+
36+
// AnnotationSlackReporting indicates that Slack reporting is enabled
37+
// for this Task.
38+
AnnotationSlackReporting = "kelos.dev/slack-reporting"
39+
40+
// AnnotationSlackChannel records the Slack channel ID where the
41+
// originating message was posted.
42+
AnnotationSlackChannel = "kelos.dev/slack-channel"
43+
44+
// AnnotationSlackThreadTS records the originating message timestamp,
45+
// used as thread_ts for posting replies.
46+
AnnotationSlackThreadTS = "kelos.dev/slack-thread-ts"
47+
48+
// AnnotationSlackReplyTS stores the message timestamp of the status
49+
// reply so subsequent updates edit the same message.
50+
AnnotationSlackReplyTS = "kelos.dev/slack-reply-ts"
51+
52+
// AnnotationSlackReportPhase records the last Task phase that was
53+
// reported to Slack, preventing duplicate API calls on re-list.
54+
AnnotationSlackReportPhase = "kelos.dev/slack-report-phase"
3555
)
3656

3757
// TaskReporter watches Tasks and reports status changes to GitHub.
@@ -156,3 +176,107 @@ func (tr *TaskReporter) persistReportingState(ctx context.Context, task *kelosv1
156176

157177
return nil
158178
}
179+
180+
// SlackTaskReporter watches Tasks and reports status changes to Slack
181+
// as thread replies on the originating message.
182+
type SlackTaskReporter struct {
183+
Client client.Client
184+
Reporter *SlackReporter
185+
}
186+
187+
// ReportTaskStatus checks a Task's current phase against its last reported
188+
// phase and creates or updates the Slack thread reply as needed.
189+
func (tr *SlackTaskReporter) ReportTaskStatus(ctx context.Context, task *kelosv1alpha1.Task) error {
190+
log := ctrl.Log.WithName("slack-reporter")
191+
192+
annotations := task.Annotations
193+
if annotations == nil {
194+
return nil
195+
}
196+
197+
if annotations[AnnotationSlackReporting] != "enabled" {
198+
return nil
199+
}
200+
201+
channel := annotations[AnnotationSlackChannel]
202+
threadTS := annotations[AnnotationSlackThreadTS]
203+
if channel == "" || threadTS == "" {
204+
return nil
205+
}
206+
207+
var desiredPhase string
208+
switch task.Status.Phase {
209+
case kelosv1alpha1.TaskPhasePending, kelosv1alpha1.TaskPhaseRunning, kelosv1alpha1.TaskPhaseWaiting:
210+
desiredPhase = "accepted"
211+
case kelosv1alpha1.TaskPhaseSucceeded:
212+
desiredPhase = "succeeded"
213+
case kelosv1alpha1.TaskPhaseFailed:
214+
desiredPhase = "failed"
215+
default:
216+
return nil
217+
}
218+
219+
if annotations[AnnotationSlackReportPhase] == desiredPhase {
220+
return nil
221+
}
222+
223+
var body string
224+
switch desiredPhase {
225+
case "accepted":
226+
body = FormatSlackAccepted(task.Name)
227+
case "succeeded":
228+
body = FormatSlackSucceeded(task.Name)
229+
case "failed":
230+
body = FormatSlackFailed(task.Name)
231+
}
232+
233+
replyTS := annotations[AnnotationSlackReplyTS]
234+
if replyTS == "" {
235+
log.Info("Posting Slack thread reply", "task", task.Name, "channel", channel, "phase", desiredPhase)
236+
newTS, err := tr.Reporter.PostThreadReply(ctx, channel, threadTS, body)
237+
if err != nil {
238+
return fmt.Errorf("posting Slack reply for task %s: %w", task.Name, err)
239+
}
240+
replyTS = newTS
241+
} else {
242+
log.Info("Updating Slack thread reply", "task", task.Name, "channel", channel, "phase", desiredPhase)
243+
if err := tr.Reporter.UpdateMessage(ctx, channel, replyTS, body); err != nil {
244+
return fmt.Errorf("updating Slack reply for task %s: %w", task.Name, err)
245+
}
246+
}
247+
248+
if err := tr.persistSlackReportingState(ctx, task, replyTS, desiredPhase); err != nil {
249+
return err
250+
}
251+
252+
return nil
253+
}
254+
255+
func (tr *SlackTaskReporter) persistSlackReportingState(ctx context.Context, task *kelosv1alpha1.Task, replyTS, desiredPhase string) error {
256+
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
257+
var current kelosv1alpha1.Task
258+
if err := tr.Client.Get(ctx, client.ObjectKeyFromObject(task), &current); err != nil {
259+
return err
260+
}
261+
262+
if current.Annotations == nil {
263+
current.Annotations = make(map[string]string)
264+
}
265+
current.Annotations[AnnotationSlackReplyTS] = replyTS
266+
current.Annotations[AnnotationSlackReportPhase] = desiredPhase
267+
268+
if err := tr.Client.Update(ctx, &current); err != nil {
269+
return err
270+
}
271+
272+
task.Annotations = current.Annotations
273+
return nil
274+
}); err != nil {
275+
if apierrors.IsNotFound(err) {
276+
return fmt.Errorf("persisting Slack reporting annotations on task %s: task no longer exists", task.Name)
277+
}
278+
return fmt.Errorf("persisting Slack reporting annotations on task %s: %w", task.Name, err)
279+
}
280+
281+
return nil
282+
}

0 commit comments

Comments
 (0)