diff --git a/internal/llm/langchain.go b/internal/llm/langchain.go index 3bd2848a..2411185e 100644 --- a/internal/llm/langchain.go +++ b/internal/llm/langchain.go @@ -244,7 +244,12 @@ Thought:{{.agent_scratchpad}} `, historyBuilder.String())), ) - e := agents.NewExecutor(ag, agents.WithMaxIterations(maxAgentIterations)) + e := agents.NewExecutor(ag, + agents.WithMaxIterations(maxAgentIterations), + agents.WithParserErrorHandler(agents.NewParserErrorHandler(func(s string) string { + return "Your previous response could not be parsed. Remember: if you want to use a tool, respond ONLY with the Action/Action Input format. If you want to give a final answer, respond ONLY with the AI: format. Do not mix both. Your unparseable response was: " + s + })), + ) call, err := e.Call(ctx, map[string]any{ "input": prompt, diff --git a/internal/mcp/client.go b/internal/mcp/client.go index 6c674924..e6587cb2 100644 --- a/internal/mcp/client.go +++ b/internal/mcp/client.go @@ -13,6 +13,7 @@ import ( "time" "github.com/mark3labs/mcp-go/client" + mcptransport "github.com/mark3labs/mcp-go/client/transport" "github.com/mark3labs/mcp-go/mcp" customErrors "github.com/tuannvm/slack-mcp-client/internal/common/errors" @@ -93,7 +94,12 @@ func NewClient(transport, addressOrCommand string, serverName string, args []str return nil, customErrors.WrapMCPError(err, "client_start", fmt.Sprintf("Failed to start MCP client for %s", addressOrCommand)) } case "http": - mcpClient, err = client.NewStreamableHttpClient(addressOrCommand) + // Convert resolvedHeaders map to map[string]string for streamable HTTP transport + httpHeaders := make(map[string]string) + for k, v := range resolvedHeaders { + httpHeaders[k] = v + } + mcpClient, err = client.NewStreamableHttpClient(addressOrCommand, mcptransport.WithHTTPHeaders(httpHeaders)) if err != nil { return nil, customErrors.WrapMCPError(err, "client_creation", fmt.Sprintf("Failed to create MCP client for %s", addressOrCommand)) } diff --git a/internal/mcp/mcpTool.go b/internal/mcp/mcpTool.go index 411d9d24..cbe532cc 100644 --- a/internal/mcp/mcpTool.go +++ b/internal/mcp/mcpTool.go @@ -32,7 +32,9 @@ func (t *ToolInfo) Call(ctx context.Context, input string) (string, error) { var args map[string]interface{} err := json.Unmarshal([]byte(input), &args) if err != nil { - return "", fmt.Errorf("failed to unmarshal input: %w", err) + // Return parse error as a tool result so the LLM can see what happened + // and retry with valid JSON, rather than crashing the agent loop. + return fmt.Sprintf("[Tool Error] Invalid JSON input for %s: %v. Your Action Input must be a single-line valid JSON object with no trailing text.", t.Name(), err), nil } isError := "false" @@ -47,7 +49,10 @@ func (t *ToolInfo) Call(ctx context.Context, input string) (string, error) { res, err := t.Client.CallTool(ctx, t.Name(), args) if err != nil { isError = "true" - return "", fmt.Errorf("while calling tool %s: %w", t.Name(), err) + // Return error as a tool result instead of a Go error so the LLM agent + // can see what happened and retry or use a different approach, rather + // than crashing the entire agent loop. + return fmt.Sprintf("[Tool Error] %s failed: %v", t.Name(), err), nil } return res, nil diff --git a/internal/slack/agentCallbackHandler.go b/internal/slack/agentCallbackHandler.go index f51615b2..a84e8c12 100644 --- a/internal/slack/agentCallbackHandler.go +++ b/internal/slack/agentCallbackHandler.go @@ -2,6 +2,8 @@ package slackbot import ( "context" + "strings" + "github.com/tmc/langchaingo/callbacks" ) @@ -13,9 +15,32 @@ type agentCallbackHandler struct { } func (handler *agentCallbackHandler) HandleChainEnd(_ context.Context, outputs map[string]any) { - if text, ok := outputs["text"]; ok { - if textStr, ok := text.(string); ok { - handler.sendMessage(textStr) - } + text, ok := outputs["text"] + if !ok { + return + } + textStr, ok := text.(string) + if !ok { + return + } + + // Only send the final answer to the user. + // Intermediate agent steps contain "Action:" and "Action Input:" lines + // which are internal reasoning and should not be shown. + // The final answer uses the format: + // Thought: Do I need to use a tool? No + // AI: + if strings.Contains(textStr, "Action:") && strings.Contains(textStr, "Action Input:") { + // This is an intermediate tool-calling step — suppress it + return + } + + // Strip the ReAct scaffolding prefix from the final answer + if idx := strings.Index(textStr, "AI:"); idx != -1 { + textStr = strings.TrimSpace(textStr[idx+3:]) + } + + if textStr != "" { + handler.sendMessage(textStr) } } diff --git a/internal/slack/client.go b/internal/slack/client.go index e3ccbe00..e10d8e1e 100644 --- a/internal/slack/client.go +++ b/internal/slack/client.go @@ -6,10 +6,14 @@ import ( "context" "encoding/json" "fmt" + "io" + "math/rand" + "net/http" "os" "strings" "time" + "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" "github.com/slack-go/slack/socketmode" @@ -37,6 +41,36 @@ type Client struct { historyLimit int discoveredTools map[string]mcp.ToolInfo tracingHandler observability.TracingHandler + activeThreads map[string]bool // channel:threadTS keys where bot has been mentioned +} + +// thinkingMessages is the pool of random "working on it" indicators. +var thinkingMessages = []string{ + "On it...", + "Digging in...", + "One sec...", + "Let me check...", + "Pulling that up...", + "Looking into it...", + "Crunching...", + "Searching...", + "Firing up the neurons...", + "Brb, asking the data...", +} + +// randomThinkingMessage returns a random thinking indicator. +func randomThinkingMessage() string { + return thinkingMessages[rand.Intn(len(thinkingMessages))] +} + +// isThinkingMessage checks if a message is any of the known thinking indicators. +func isThinkingMessage(text string) bool { + for _, msg := range thinkingMessages { + if text == msg { + return true + } + } + return false } // Message represents a message in the conversation history @@ -201,6 +235,7 @@ func NewClient(userFrontend UserFrontend, stdLogger *logging.Logger, mcpClients historyLimit: cfg.Slack.MessageHistory, // Store configured number of messages per channel discoveredTools: discoveredTools, tracingHandler: tracingHandler, + activeThreads: make(map[string]bool), }, nil } @@ -229,6 +264,14 @@ func (c *Client) handleEvents() { c.logger.Warn("Connection failed. Retrying...") case socketmode.EventTypeConnected: c.logger.Info("Connected to Slack!") + case socketmode.EventTypeSlashCommand: + cmd, ok := evt.Data.(slack.SlashCommand) + if !ok { + c.logger.WarnKV("Ignored unexpected SlashCommand event type", "type", fmt.Sprintf("%T", evt.Data)) + continue + } + c.logger.InfoKV("Received slash command", "command", cmd.Command, "user", cmd.UserID, "channel", cmd.ChannelID) + go c.handleSlashCommand(cmd, evt) case socketmode.EventTypeEventsAPI: eventsAPIEvent, ok := evt.Data.(slackevents.EventsAPIEvent) if !ok { @@ -245,6 +288,55 @@ func (c *Client) handleEvents() { c.logger.Info("Slack event channel closed.") } +// handleSlashCommand handles incoming slash commands via Socket Mode. +func (c *Client) handleSlashCommand(cmd slack.SlashCommand, evt socketmode.Event) { + switch cmd.Command { + case "/usage": + // Fetch status from the sidecar status-server running on localhost:8081 + resp, err := http.Get("http://localhost:8081/slack/status") + if err != nil { + c.logger.ErrorKV("Failed to reach status server", "error", err) + c.userFrontend.Ack(*evt.Request, map[string]interface{}{ + "response_type": "ephemeral", + "text": fmt.Sprintf("❌ Status server unreachable: %v", err), + }) + return + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + c.logger.ErrorKV("Failed to read status response", "error", err) + c.userFrontend.Ack(*evt.Request, map[string]interface{}{ + "response_type": "ephemeral", + "text": "❌ Failed to read status response", + }) + return + } + + // Parse the Block Kit JSON from the status server + var statusResponse map[string]interface{} + if err := json.Unmarshal(body, &statusResponse); err != nil { + c.logger.ErrorKV("Failed to parse status response", "error", err) + c.userFrontend.Ack(*evt.Request, map[string]interface{}{ + "response_type": "ephemeral", + "text": "❌ Failed to parse status response", + }) + return + } + + c.logger.Info("Returning status response to Slack") + c.userFrontend.Ack(*evt.Request, statusResponse) + + default: + c.logger.WarnKV("Unknown slash command", "command", cmd.Command) + c.userFrontend.Ack(*evt.Request, map[string]interface{}{ + "response_type": "ephemeral", + "text": fmt.Sprintf("Unknown command: %s", cmd.Command), + }) + } +} + // handleEventMessage processes specific EventsAPI messages. func (c *Client) handleEventMessage(event slackevents.EventsAPIEvent) { switch event.Type { @@ -264,6 +356,12 @@ func (c *Client) handleEventMessage(event slackevents.EventsAPIEvent) { if parentTS == "" { parentTS = ev.TimeStamp // Use the original message timestamp if no thread } + + // Track this thread so the bot stays engaged for follow-up messages + threadKey := fmt.Sprintf("%s:%s", ev.Channel, parentTS) + c.activeThreads[threadKey] = true + c.logger.InfoKV("Tracking active thread", "channel", ev.Channel, "threadTS", parentTS) + // Use handleUserPrompt for app mentions too, for consistency go c.handleUserPrompt(strings.TrimSpace(messageText), ev.Channel, parentTS, ev.TimeStamp, profile) @@ -273,8 +371,19 @@ func (c *Client) handleEventMessage(event slackevents.EventsAPIEvent) { isNotEdited := ev.SubType != "message_changed" isBot := ev.BotID != "" || ev.SubType == "bot_message" - if isDirectMessage && isValidUser && isNotEdited && !isBot { - c.logger.InfoKV("Received direct message in channel", "channel", ev.Channel, "user", ev.User, "text", ev.Text, "ThreadTS", ev.ThreadTimeStamp) + // Check if this is a reply in a thread the bot is already participating in + isActiveThread := false + if ev.ThreadTimeStamp != "" { + threadKey := fmt.Sprintf("%s:%s", ev.Channel, ev.ThreadTimeStamp) + isActiveThread = c.activeThreads[threadKey] + } + + if (isDirectMessage || isActiveThread) && isValidUser && isNotEdited && !isBot { + if isActiveThread { + c.logger.InfoKV("Received thread follow-up (no @mention needed)", "channel", ev.Channel, "user", ev.User, "text", ev.Text, "ThreadTS", ev.ThreadTimeStamp) + } else { + c.logger.InfoKV("Received direct message in channel", "channel", ev.Channel, "user", ev.User, "text", ev.Text, "ThreadTS", ev.ThreadTimeStamp) + } profile, err := c.userFrontend.GetUserInfo(ev.User) if err != nil { c.logger.WarnKV("Failed to get user info", "user", ev.User, "error", err) @@ -449,8 +558,8 @@ func (c *Client) handleUserPrompt(userPrompt, channelID, threadTS string, timest c.addToHistory(channelID, threadTS, timestamp, "user", userPrompt, profile.userId, profile.realName, profile.email) // Add user message to history - // Show a temporary "typing" indicator - c.userFrontend.SendMessage(channelID, threadTS, c.cfg.Slack.ThinkingMessage) + // Show a temporary "typing" indicator (randomized) + c.userFrontend.SendMessage(channelID, threadTS, randomThinkingMessage()) if !c.cfg.LLM.UseAgent { // Prepare the final prompt with custom prompt as system instruction diff --git a/internal/slack/userFrontend.go b/internal/slack/userFrontend.go index 49c6553a..fba35ab3 100644 --- a/internal/slack/userFrontend.go +++ b/internal/slack/userFrontend.go @@ -170,7 +170,7 @@ func (slackClient *SlackClient) SendMessage(channelID, threadTS, text string) { history, err := slackClient.GetThreadReplies(channelID, threadTS) if err == nil && history != nil { for _, msg := range history { - if slackClient.IsBotUser(msg.User) && msg.Text == slackClient.thinkingMessage { + if slackClient.IsBotUser(msg.User) && isThinkingMessage(msg.Text) { _, _, err := slackClient.DeleteMessage(channelID, msg.Timestamp) if err != nil { slackClient.logger.ErrorKV("Error deleting typing indicator message", "error", err)