Skip to content
This repository was archived by the owner on Apr 12, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion internal/llm/langchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,12 @@ Thought:{{.agent_scratchpad}}
`, historyBuilder.String())),
)

e := agents.NewExecutor(ag, agents.WithMaxIterations(maxAgentIterations))
e := agents.NewExecutor(ag,
agents.WithMaxIterations(maxAgentIterations),
agents.WithParserErrorHandler(agents.NewParserErrorHandler(func(s string) string {
return "Your previous response could not be parsed. Remember: if you want to use a tool, respond ONLY with the Action/Action Input format. If you want to give a final answer, respond ONLY with the AI: format. Do not mix both. Your unparseable response was: " + s
})),
)

call, err := e.Call(ctx, map[string]any{
"input": prompt,
Expand Down
8 changes: 7 additions & 1 deletion internal/mcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/mark3labs/mcp-go/client"
mcptransport "github.com/mark3labs/mcp-go/client/transport"
"github.com/mark3labs/mcp-go/mcp"

customErrors "github.com/tuannvm/slack-mcp-client/internal/common/errors"
Expand Down Expand Up @@ -93,7 +94,12 @@ func NewClient(transport, addressOrCommand string, serverName string, args []str
return nil, customErrors.WrapMCPError(err, "client_start", fmt.Sprintf("Failed to start MCP client for %s", addressOrCommand))
}
case "http":
mcpClient, err = client.NewStreamableHttpClient(addressOrCommand)
// Convert resolvedHeaders map to map[string]string for streamable HTTP transport
httpHeaders := make(map[string]string)
for k, v := range resolvedHeaders {
httpHeaders[k] = v
}
mcpClient, err = client.NewStreamableHttpClient(addressOrCommand, mcptransport.WithHTTPHeaders(httpHeaders))
if err != nil {
return nil, customErrors.WrapMCPError(err, "client_creation", fmt.Sprintf("Failed to create MCP client for %s", addressOrCommand))
}
Expand Down
9 changes: 7 additions & 2 deletions internal/mcp/mcpTool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func (t *ToolInfo) Call(ctx context.Context, input string) (string, error) {
var args map[string]interface{}
err := json.Unmarshal([]byte(input), &args)
if err != nil {
return "", fmt.Errorf("failed to unmarshal input: %w", err)
// Return parse error as a tool result so the LLM can see what happened
// and retry with valid JSON, rather than crashing the agent loop.
return fmt.Sprintf("[Tool Error] Invalid JSON input for %s: %v. Your Action Input must be a single-line valid JSON object with no trailing text.", t.Name(), err), nil
}

isError := "false"
Expand All @@ -47,7 +49,10 @@ func (t *ToolInfo) Call(ctx context.Context, input string) (string, error) {
res, err := t.Client.CallTool(ctx, t.Name(), args)
if err != nil {
isError = "true"
return "", fmt.Errorf("while calling tool %s: %w", t.Name(), err)
// Return error as a tool result instead of a Go error so the LLM agent
// can see what happened and retry or use a different approach, rather
// than crashing the entire agent loop.
return fmt.Sprintf("[Tool Error] %s failed: %v", t.Name(), err), nil
}

return res, nil
Expand Down
33 changes: 29 additions & 4 deletions internal/slack/agentCallbackHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package slackbot

import (
"context"
"strings"

"github.com/tmc/langchaingo/callbacks"
)

Expand All @@ -13,9 +15,32 @@ type agentCallbackHandler struct {
}

func (handler *agentCallbackHandler) HandleChainEnd(_ context.Context, outputs map[string]any) {
if text, ok := outputs["text"]; ok {
if textStr, ok := text.(string); ok {
handler.sendMessage(textStr)
}
text, ok := outputs["text"]
if !ok {
return
}
textStr, ok := text.(string)
if !ok {
return
}

// Only send the final answer to the user.
// Intermediate agent steps contain "Action:" and "Action Input:" lines
// which are internal reasoning and should not be shown.
// The final answer uses the format:
// Thought: Do I need to use a tool? No
// AI: <actual response>
if strings.Contains(textStr, "Action:") && strings.Contains(textStr, "Action Input:") {
// This is an intermediate tool-calling step — suppress it
return
}

// Strip the ReAct scaffolding prefix from the final answer
if idx := strings.Index(textStr, "AI:"); idx != -1 {
textStr = strings.TrimSpace(textStr[idx+3:])
}

if textStr != "" {
handler.sendMessage(textStr)
}
}
117 changes: 113 additions & 4 deletions internal/slack/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"os"
"strings"
"time"

"github.com/slack-go/slack"
"github.com/slack-go/slack/slackevents"
"github.com/slack-go/slack/socketmode"

Expand Down Expand Up @@ -37,6 +41,36 @@
historyLimit int
discoveredTools map[string]mcp.ToolInfo
tracingHandler observability.TracingHandler
activeThreads map[string]bool // channel:threadTS keys where bot has been mentioned
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Unbounded growth of activeThreads map — entries are added but never evicted.

Every AppMentionEvent inserts a key into activeThreads, but there is no expiration, size cap, or cleanup path. For a long-running bot in an active workspace this map will grow indefinitely, leaking memory.

Consider one of:

  • A TTL-based eviction (e.g., remove entries older than N hours using a timestamp value instead of bool).
  • A bounded LRU / ring buffer.
  • Periodic pruning in a background goroutine.
Example: store timestamps and add a simple pruning helper
-	activeThreads   map[string]bool // channel:threadTS keys where bot has been mentioned
+	activeThreads   map[string]time.Time // channel:threadTS → last-activity time
-		c.activeThreads[threadKey] = true
+		c.activeThreads[threadKey] = time.Now()
-			isActiveThread = c.activeThreads[threadKey]
+			if ts, ok := c.activeThreads[threadKey]; ok && time.Since(ts) < 24*time.Hour {
+				isActiveThread = true
+			}

And periodically (or lazily) prune stale entries.

Also applies to: 269-273

🤖 Prompt for AI Agents
In `@internal/slack/client.go` at line 40, activeThreads currently uses
map[string]bool and is never evicted, causing unbounded growth; change it to
store timestamps (e.g., map[string]time.Time or map[string]int64) instead of
bool, update the code paths that insert keys (the AppMentionEvent handling where
entries are added) to write time.Now(), and add a pruning strategy — either a
simple TTL-based prune helper that removes entries older than N (called
periodically via a background goroutine or lazily on writes/reads), or replace
with a bounded LRU structure; ensure all access to activeThreads is protected by
the existing mutex (or add one) when reading/writing to avoid races.

}

// thinkingMessages is the pool of random "working on it" indicators.
var thinkingMessages = []string{
"On it...",
"Digging in...",
"One sec...",
"Let me check...",
"Pulling that up...",
"Looking into it...",
"Crunching...",
"Searching...",
"Firing up the neurons...",
"Brb, asking the data...",
}

// randomThinkingMessage returns a random thinking indicator.
func randomThinkingMessage() string {
return thinkingMessages[rand.Intn(len(thinkingMessages))]
}

// isThinkingMessage checks if a message is any of the known thinking indicators.
func isThinkingMessage(text string) bool {
for _, msg := range thinkingMessages {
if text == msg {
return true
}
}
return false
}

// Message represents a message in the conversation history
Expand Down Expand Up @@ -201,6 +235,7 @@
historyLimit: cfg.Slack.MessageHistory, // Store configured number of messages per channel
discoveredTools: discoveredTools,
tracingHandler: tracingHandler,
activeThreads: make(map[string]bool),
}, nil
}

Expand Down Expand Up @@ -229,6 +264,14 @@
c.logger.Warn("Connection failed. Retrying...")
case socketmode.EventTypeConnected:
c.logger.Info("Connected to Slack!")
case socketmode.EventTypeSlashCommand:
cmd, ok := evt.Data.(slack.SlashCommand)
if !ok {
c.logger.WarnKV("Ignored unexpected SlashCommand event type", "type", fmt.Sprintf("%T", evt.Data))
continue
}
c.logger.InfoKV("Received slash command", "command", cmd.Command, "user", cmd.UserID, "channel", cmd.ChannelID)
go c.handleSlashCommand(cmd, evt)
case socketmode.EventTypeEventsAPI:
eventsAPIEvent, ok := evt.Data.(slackevents.EventsAPIEvent)
if !ok {
Expand All @@ -245,6 +288,55 @@
c.logger.Info("Slack event channel closed.")
}

// handleSlashCommand handles incoming slash commands via Socket Mode.
func (c *Client) handleSlashCommand(cmd slack.SlashCommand, evt socketmode.Event) {
switch cmd.Command {
case "/usage":
// Fetch status from the sidecar status-server running on localhost:8081
resp, err := http.Get("http://localhost:8081/slack/status")
if err != nil {
c.logger.ErrorKV("Failed to reach status server", "error", err)
c.userFrontend.Ack(*evt.Request, map[string]interface{}{
"response_type": "ephemeral",
"text": fmt.Sprintf("❌ Status server unreachable: %v", err),
})
return
}
defer resp.Body.Close()

Check failure on line 305 in internal/slack/client.go

View workflow job for this annotation

GitHub Actions / Code Quality

Error return value of `resp.Body.Close` is not checked (errcheck)

body, err := io.ReadAll(resp.Body)
if err != nil {
c.logger.ErrorKV("Failed to read status response", "error", err)
c.userFrontend.Ack(*evt.Request, map[string]interface{}{
"response_type": "ephemeral",
"text": "❌ Failed to read status response",
})
return
}

// Parse the Block Kit JSON from the status server
var statusResponse map[string]interface{}
if err := json.Unmarshal(body, &statusResponse); err != nil {
c.logger.ErrorKV("Failed to parse status response", "error", err)
c.userFrontend.Ack(*evt.Request, map[string]interface{}{
"response_type": "ephemeral",
"text": "❌ Failed to parse status response",
})
return
}

c.logger.Info("Returning status response to Slack")
c.userFrontend.Ack(*evt.Request, statusResponse)

default:
c.logger.WarnKV("Unknown slash command", "command", cmd.Command)
c.userFrontend.Ack(*evt.Request, map[string]interface{}{
"response_type": "ephemeral",
"text": fmt.Sprintf("Unknown command: %s", cmd.Command),
})
}
}

// handleEventMessage processes specific EventsAPI messages.
func (c *Client) handleEventMessage(event slackevents.EventsAPIEvent) {
switch event.Type {
Expand All @@ -264,6 +356,12 @@
if parentTS == "" {
parentTS = ev.TimeStamp // Use the original message timestamp if no thread
}

// Track this thread so the bot stays engaged for follow-up messages
threadKey := fmt.Sprintf("%s:%s", ev.Channel, parentTS)
c.activeThreads[threadKey] = true
c.logger.InfoKV("Tracking active thread", "channel", ev.Channel, "threadTS", parentTS)

// Use handleUserPrompt for app mentions too, for consistency
go c.handleUserPrompt(strings.TrimSpace(messageText), ev.Channel, parentTS, ev.TimeStamp, profile)

Expand All @@ -273,8 +371,19 @@
isNotEdited := ev.SubType != "message_changed"
isBot := ev.BotID != "" || ev.SubType == "bot_message"

if isDirectMessage && isValidUser && isNotEdited && !isBot {
c.logger.InfoKV("Received direct message in channel", "channel", ev.Channel, "user", ev.User, "text", ev.Text, "ThreadTS", ev.ThreadTimeStamp)
// Check if this is a reply in a thread the bot is already participating in
isActiveThread := false
if ev.ThreadTimeStamp != "" {
threadKey := fmt.Sprintf("%s:%s", ev.Channel, ev.ThreadTimeStamp)
isActiveThread = c.activeThreads[threadKey]
}

if (isDirectMessage || isActiveThread) && isValidUser && isNotEdited && !isBot {
if isActiveThread {
c.logger.InfoKV("Received thread follow-up (no @mention needed)", "channel", ev.Channel, "user", ev.User, "text", ev.Text, "ThreadTS", ev.ThreadTimeStamp)
} else {
c.logger.InfoKV("Received direct message in channel", "channel", ev.Channel, "user", ev.User, "text", ev.Text, "ThreadTS", ev.ThreadTimeStamp)
}
profile, err := c.userFrontend.GetUserInfo(ev.User)
if err != nil {
c.logger.WarnKV("Failed to get user info", "user", ev.User, "error", err)
Expand Down Expand Up @@ -449,8 +558,8 @@

c.addToHistory(channelID, threadTS, timestamp, "user", userPrompt, profile.userId, profile.realName, profile.email) // Add user message to history

// Show a temporary "typing" indicator
c.userFrontend.SendMessage(channelID, threadTS, c.cfg.Slack.ThinkingMessage)
// Show a temporary "typing" indicator (randomized)
c.userFrontend.SendMessage(channelID, threadTS, randomThinkingMessage())

if !c.cfg.LLM.UseAgent {
// Prepare the final prompt with custom prompt as system instruction
Expand Down
2 changes: 1 addition & 1 deletion internal/slack/userFrontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (slackClient *SlackClient) SendMessage(channelID, threadTS, text string) {
history, err := slackClient.GetThreadReplies(channelID, threadTS)
if err == nil && history != nil {
for _, msg := range history {
if slackClient.IsBotUser(msg.User) && msg.Text == slackClient.thinkingMessage {
if slackClient.IsBotUser(msg.User) && isThinkingMessage(msg.Text) {
_, _, err := slackClient.DeleteMessage(channelID, msg.Timestamp)
if err != nil {
slackClient.logger.ErrorKV("Error deleting typing indicator message", "error", err)
Expand Down
Loading