
channel — Channel Interface, Implementations, and Session Routing

Package path: offdev/micro-agent-go/internal/channel, offdev/micro-agent-go/internal/channel/cli, offdev/micro-agent-go/internal/channel/telegram, offdev/micro-agent-go/internal/channel/cron, offdev/micro-agent-go/internal/channel/http
Last updated: 2026-04-12


Overview

A Channel is any input/output surface that can exchange messages with the agent runtime. Adding a new channel (for example Slack, Discord, or gRPC) requires implementing the Channel interface and registering the implementation in internal/app.

All channels fan messages into a shared goroutine pool in internal/app. The SessionRouter prevents cross-channel session collisions by prefixing session keys with the channel ID.


Channel Interface

type StreamChunk struct {
    Content  string
    Thinking bool  // true = reasoning (display only); false = main reply
}

type Channel interface {
    ID() string
    Receive(ctx context.Context) (*InboundMessage, error)
    Send(ctx context.Context, msg *OutboundMessage) error
    StreamChunks(ctx context.Context, chunks <-chan StreamChunk) error
    Close() error
}

InboundMessage / OutboundMessage

type Attachment struct {
    Filename string // original basename when known
    MIME     string // reported content type when known (may be empty)
    Text     string // UTF-8 text extracted for documents; empty when only Image is set
    Image    []byte // raw JPEG/PNG/GIF/WebP for vision models (optional)
}

type InboundMessage struct {
    SessionID   string
    Content     string
    Attachments []Attachment // optional; see “Document and image attachments” below
}

type OutboundMessage struct {
    SessionID string
    Content   string
    ParseMode string // optional channel-specific formatting; Telegram ignores it and chooses parse_mode from content per message
}

UserTextForModel(m *InboundMessage) builds the text transcript for the user turn: non-empty Content first, then each image attachment as ### Attached image: <name> (<mime>), then each document as ### Attached file: <name> (<mime>)\n\n<text>. Empty document bodies are omitted.

InboundHasUserInput(m) is true when Content, any attachment Text, or any attachment Image is non-empty. The multi-channel loop in internal/app skips a turn only when this is false (so image-only turns are accepted).

UserCoreMessage(m) returns a core.Message with RoleUser, Content from UserTextForModel, and Images populated from attachment Image bytes for the LLM provider (OpenAI-style multimodal content with image_url data URLs).
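The transcript rules above can be sketched as follows. This is a minimal sketch with local copies of the types; the blank-line separator between sections and the lowercase helper names are assumptions, and the real UserTextForModel and InboundHasUserInput in internal/channel may differ in detail.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the channel types so the sketch compiles on its own.
type Attachment struct {
	Filename string
	MIME     string
	Text     string
	Image    []byte
}

type InboundMessage struct {
	SessionID   string
	Content     string
	Attachments []Attachment
}

// userTextForModel follows the documented order: Content, then one header
// per image attachment, then each document with its text. Joining the
// sections with a blank line is an assumption of this sketch.
func userTextForModel(m *InboundMessage) string {
	var parts []string
	if m.Content != "" {
		parts = append(parts, m.Content)
	}
	for _, a := range m.Attachments {
		if len(a.Image) > 0 {
			parts = append(parts, fmt.Sprintf("### Attached image: %s (%s)", a.Filename, a.MIME))
		}
	}
	for _, a := range m.Attachments {
		if a.Text != "" { // empty document bodies are omitted
			parts = append(parts, fmt.Sprintf("### Attached file: %s (%s)\n\n%s", a.Filename, a.MIME, a.Text))
		}
	}
	return strings.Join(parts, "\n\n")
}

// inboundHasUserInput reports whether the turn carries any text or image,
// so image-only turns are accepted.
func inboundHasUserInput(m *InboundMessage) bool {
	if m.Content != "" {
		return true
	}
	for _, a := range m.Attachments {
		if a.Text != "" || len(a.Image) > 0 {
			return true
		}
	}
	return false
}
```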


Document and image attachments

Channels may supply document attachments (text extracted from files such as .txt, .pdf, and common source/markup extensions) and/or image attachments (raw raster bytes for vision). Extraction lives in internal/attachments (ExtractText for text; ImageMIME for JPEG/PNG/GIF/WebP sniffing and image/* hints). Raw input is capped at 10 MiB per file (attachments.MaxDocumentBytes).

CLI: Queue paths with /attach <path> (quote paths that contain spaces). The next non-command line sends that text together with all successfully loaded pending attachments; files that fail to load are reported on stderr and skipped. Pending paths are cleared after that send. Image files are sent as vision input (not OCR).

Telegram: For an inbound message.photo, the channel takes the largest PhotoSize, resolves it with getFile, downloads the bytes, and attaches them as a vision image (the same underlying file-object flow that InputMediaPhoto uses when sending photos). For an inbound message.document, a supported image is likewise treated as vision input; any other file is passed through ExtractText. The optional caption (and the message text, if present) is merged into Content. If the download or extraction fails, the channel enqueues a normal user message explaining the error instead of dropping the update.
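The photo path uses three Telegram Bot API pieces: the PhotoSize array, getFile, and the file download URL. The sketch below picks the largest size by pixel area (an assumption; the channel may simply take the last entry) and builds the two URLs; the HTTP calls themselves are omitted.

```go
package main

import (
	"fmt"
	"net/url"
)

// PhotoSize holds the fields of Telegram's PhotoSize object used here.
type PhotoSize struct {
	FileID   string `json:"file_id"`
	Width    int    `json:"width"`
	Height   int    `json:"height"`
	FileSize int    `json:"file_size,omitempty"`
}

// largestPhoto picks the size with the most pixels. Telegram lists sizes
// smallest-first in practice, but comparing area avoids relying on order.
func largestPhoto(sizes []PhotoSize) (PhotoSize, bool) {
	if len(sizes) == 0 {
		return PhotoSize{}, false
	}
	best := sizes[0]
	for _, s := range sizes[1:] {
		if s.Width*s.Height > best.Width*best.Height {
			best = s
		}
	}
	return best, true
}

// getFileURL builds the getFile call that resolves a file_id to a file_path.
func getFileURL(token, fileID string) string {
	return fmt.Sprintf("https://api.telegram.org/bot%s/getFile?file_id=%s", token, url.QueryEscape(fileID))
}

// downloadURL builds the download URL from the file_path returned by getFile.
func downloadURL(token, filePath string) string {
	return fmt.Sprintf("https://api.telegram.org/file/bot%s/%s", token, filePath)
}
```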

HTTP: POST /api/chat accepts either JSON or multipart/form-data:

  • JSON (default for requests without file bodies): session_id, message (may be empty if attachments is non-empty), optional attachments: [{ "filename", "data" (standard base64), "mime_type" }].
  • Multipart: fields session_id, message (optional if at least one file is present), and one or more file parts named file. Total upload size is bounded by the channel’s multipart limit (8 MiB).

The embedded UI sends multipart when the user selects files; otherwise it sends JSON. Optional auth: when HTTP_CHANNEL_TOKEN is set, requests must carry it in the X-UA-Token header; the page can store the token in localStorage.ua_http_token.


SessionRouter (internal/channel)

SessionRouter maps (channelID, sessionID) pairs to compound session keys, preventing cross-channel session collisions when multiple channels share a single SessionStore.

router := &channel.SessionRouter{}
tgKey := router.Key("telegram", "12345678") // "telegram:12345678"
cliKey := router.Key("cli", "cli")          // "cli:cli"

The runMultiChannel loop in internal/app calls router.Key(ch.ID(), msg.SessionID) for every inbound message before looking up or creating the conversation in the SessionStore.


CLI Channel (internal/channel/cli)

A readline-based interactive terminal channel. Single session ID ("cli").

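ch, err := cli.New("ua> ")
if err != nil {
    return err
}
defer ch.Close()

msg, err := ch.Receive(ctx) // blocks until the user hits Enter; returns context.Canceled on ^C / EOF
if err != nil {
    return err
}
ch.Send(ctx, &channel.OutboundMessage{SessionID: msg.SessionID, Content: "Hello!"})
ch.StreamChunks(ctx, chunkChan) // chunkChan is a <-chan channel.StreamChunk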

History: Input history is persisted to ~/.micro-agent/history using readline's history-file support.

Interrupt: ^C returns context.Canceled from Receive. EOF does the same.

StreamChunks: Writes each chunk to stdout as it arrives. When StreamChunk.Thinking is true, text is dim grey (256-color 240); otherwise light grey (244). Two line breaks are inserted after the thinking segment ends and before the main reply. Thinking is display-only and is not stored in session history. After a run completes, internal/app skips Send for the CLI channel because the reply was already streamed.
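A sketch of the rendering rule, using 256-color ANSI escapes (ESC[38;5;Nm) for the two greys. Resetting the color after every chunk and the renderAll helper are choices made for this sketch; context handling is left out for brevity.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

type StreamChunk struct {
	Content  string
	Thinking bool
}

const (
	thinkingColor = "\x1b[38;5;240m" // dim grey for reasoning
	replyColor    = "\x1b[38;5;244m" // light grey for the main reply
	resetColor    = "\x1b[0m"
)

// renderChunks writes chunks as they arrive and inserts two line breaks
// when a thinking segment gives way to the main reply.
func renderChunks(w io.Writer, chunks <-chan StreamChunk) error {
	inThinking := false
	for c := range chunks {
		if inThinking && !c.Thinking {
			if _, err := io.WriteString(w, "\n\n"); err != nil {
				return err
			}
		}
		inThinking = c.Thinking
		color := replyColor
		if c.Thinking {
			color = thinkingColor
		}
		if _, err := fmt.Fprint(w, color, c.Content, resetColor); err != nil {
			return err
		}
	}
	return nil
}

// renderAll renders a fixed slice of chunks to a string.
func renderAll(chunks []StreamChunk) string {
	ch := make(chan StreamChunk, len(chunks))
	for _, c := range chunks {
		ch <- c
	}
	close(ch)
	var sb strings.Builder
	_ = renderChunks(&sb, ch) // strings.Builder writes never fail
	return sb.String()
}
```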

Attachments: Use /attach <path> to queue files for the next message (see “Document and image attachments”).


Telegram Channel (internal/channel/telegram)

A long-polling Telegram Bot API channel. Session IDs are the numeric Telegram chat ID as a string.

ch, err := telegram.New(os.Getenv("TELEGRAM_BOT_TOKEN"))
if err != nil {
    return err
}
defer ch.Close()

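Polling: A background goroutine calls getUpdates with timeout=30 (long-polling). Its dedicated http.Client has a 35-second timeout, 5 seconds longer than the long-poll window, so a poll that legitimately waits the full 30 seconds is not cancelled prematurely. On transient errors the goroutine waits 5 seconds before retrying. After each successful response the update offset is advanced past the highest update_id received, which confirms those updates to Telegram and prevents re-delivery.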

Session ID: Set to strconv.FormatInt(chat.id, 10) from the incoming update.

Send: Calls POST https://api.telegram.org/bot<token>/sendMessage. OutboundMessage.ParseMode is ignored; instead, parse_mode is chosen per message from the content:

  • HTML if the content contains HTML tags;
  • otherwise Markdown if Markdown patterns are present;
  • otherwise plain text.

Telegram's legacy Markdown uses a single * for bold and does not support ATX headings (##), so content that contains **bold** or #/## headings is HTML-escaped, converted to HTML (e.g. <b>…</b>), and sent with parse_mode HTML so it renders correctly. In all other cases the content is sent as-is. Content longer than Telegram's limit is truncated to 4096 Unicode code points.
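A sketch of two of the Send rules: code-point truncation and the **bold** / heading conversion to HTML. The regular expressions are assumptions about what counts as "**bold**" and "#/## headings"; the channel's actual detection heuristics are not documented.

```go
package main

import (
	"html"
	"regexp"
)

const telegramMaxRunes = 4096

// truncateRunes cuts s to at most n Unicode code points without splitting one.
func truncateRunes(s string, n int) string {
	r := []rune(s)
	if len(r) <= n {
		return s
	}
	return string(r[:n])
}

var (
	boldRE    = regexp.MustCompile(`\*\*(.+?)\*\*`)
	headingRE = regexp.MustCompile(`(?m)^#{1,2} +(.+)$`)
)

// needsHTMLConversion reports content that legacy Markdown cannot render.
func needsHTMLConversion(s string) bool {
	return boldRE.MatchString(s) || headingRE.MatchString(s)
}

// toTelegramHTML escapes s for HTML, then turns **bold** and #/## headings
// into <b>...</b>. Escaping first keeps user text from being read as tags;
// html.EscapeString leaves * and # alone, so the patterns survive it.
func toTelegramHTML(s string) string {
	s = html.EscapeString(s)
	s = headingRE.ReplaceAllString(s, "<b>$1</b>")
	return boldRE.ReplaceAllString(s, "<b>$1</b>")
}
```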

SendTyping: Calls sendChatAction with action typing for the given session (chat) ID.

StreamChunks: Drains the chunk stream and, while StreamChunk.Thinking is true, sends "typing" chat actions at most once every 4 seconds; the streamed text itself is not posted to the chat. The session ID must be provided via context.WithValue(ctx, telegram.SessionIDKey, sessionID) so that typing is sent to the correct chat. The app wires Telegram this way and feeds deltas (including thinking) into the chunk channel; the final assistant reply is always delivered via Send after the run completes.

Buffer: Inbound updates are buffered (capacity 64). When the buffer is full, updates are dropped rather than blocking the poll loop.
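Dropping instead of blocking is the usual select-with-default pattern in Go. A minimal sketch, with a trimmed local InboundMessage and a hypothetical tryEnqueue helper:

```go
package main

// InboundMessage is a trimmed local copy of channel.InboundMessage.
type InboundMessage struct {
	SessionID string
	Content   string
}

// tryEnqueue hands msg to the buffered inbox without blocking. When the
// buffer (capacity 64 in the Telegram channel) is full it returns false
// and the caller drops the update, so the poll loop never stalls.
func tryEnqueue(inbox chan<- *InboundMessage, msg *InboundMessage) bool {
	select {
	case inbox <- msg:
		return true
	default:
		return false
	}
}
```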

Documents: User-sent files (message.document) are downloaded and become text attachments, or image attachments when the file is a supported image (see “Document and image attachments”). Updates that contain only text are handled unchanged.

Close: Cancels the polling goroutine context; all outstanding Receive calls unblock.

Environment variables:

  • TELEGRAM_BOT_TOKEN: bot token; required to enable the channel


Cron / Heartbeat Channel (internal/channel/cron)

A periodic heartbeat channel for autonomous, headless agent operation. Fires a synthetic InboundMessage at a fixed interval.

ch, err := cron.New(
    "/path/to/mem.db",   // SQLite DB path for tick state persistence
    heartbeatContent,    // message content emitted on each tick
    time.Minute,         // tick interval; 0 defaults to 1 minute
)
if err != nil {
    return err
}
defer ch.Close()

Session ID: Fixed as "cron". With SessionRouter, the full key is "cron:cron".

Single-flight guard: If the previous tick's handler goroutine is still running when the next tick fires, the new tick is dropped rather than spawning a concurrent handler. Uses sync.Mutex.
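One way to express the single-flight guard with a sync.Mutex is TryLock (Go 1.18+). This is a sketch under that assumption; the document only says a mutex is used, not how.

```go
package main

import "sync"

// tickGuard drops a tick while the previous handler is still running.
type tickGuard struct {
	mu sync.Mutex
}

// run starts handler in a goroutine unless one is already in flight and
// reports whether it started. Unlocking from the handler goroutine is
// allowed: a Go Mutex is not tied to the goroutine that locked it.
func (g *tickGuard) run(handler func()) bool {
	if !g.mu.TryLock() {
		return false // previous tick still running: drop this one
	}
	go func() {
		defer g.mu.Unlock()
		handler()
	}()
	return true
}
```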

Persistence: Tick state is stored in a cron_ticks SQLite table:

CREATE TABLE cron_ticks (
    schedule  TEXT    PRIMARY KEY,
    last_tick INTEGER NOT NULL DEFAULT 0
);

On each tick, the channel truncates the current time to the interval boundary (tickTime) and checks whether last_tick >= tickTime. If so, that interval has already been handled and the tick is skipped. This prevents the same interval from being executed again after a process restart.

Send / StreamChunks: Send is a no-op; StreamChunks drains the chunk channel without writing anywhere (the cron channel has no human recipient for streamed text).

Configuration: Channel-related options are resolved by internal/config from nested JSON plus environment variables (see docs/config.md); environment variables override file values. internal/app reads the following when building the cron channel:

  • CRON_ENABLED: true to enable the cron channel (default: false)
  • HEARTBEAT_FILE: path to the heartbeat content file (default: HEARTBEAT.md in cwd)
  • CRON_INTERVAL: tick interval as a Go duration string, e.g. "5m" (default: "1m")

Heartbeat content: When HEARTBEAT_FILE is set (or HEARTBEAT.md exists in cwd), its content is used as the tick message. Otherwise a built-in fallback prompt is used.


HTTP Channel + Embedded UI (internal/channel/http)

An HTTP channel that serves an embedded chat page and accepts chat turns via POST /api/chat. Session IDs are client-provided (browser stores a UUID in localStorage).

ch, err := httpchannel.New("127.0.0.1:8765", "")
if err != nil {
    return err
}
defer ch.Close()

Endpoints:

  • GET /: embedded chat UI
  • POST /api/chat: accepts one chat turn (JSON or multipart; see below) and streams the response as SSE

POST /api/chat — JSON (Content-Type: application/json, body limit 1 MiB):

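Either message or attachments (or both) must be non-empty. Because base64 inflates data by a factor of 4/3, the 1 MiB body limit leaves room for at most about 768 KiB of raw attachment bytes in total (less the JSON overhead); larger files need the multipart form.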

{
  "session_id": "client-generated-uuid",
  "message": "Summarize this.",
  "attachments": [
    {
      "filename": "notes.txt",
      "data": "<base64-encoded raw file bytes>",
      "mime_type": "text/plain"
    }
  ]
}

POST /api/chat — multipart (Content-Type: multipart/form-data, memory limit 8 MiB):

  • session_id (required)
  • message (optional if at least one file is uploaded)
  • file — repeat the part name for multiple files

Streaming protocol (SSE):

  • event: thinking + data: ... for reasoning chunks
  • event: delta + data: ... for assistant reply chunks
  • event: error + data: ... on turn failures/timeouts
  • event: done when the turn is complete

Session routing: internal/app sets context.WithValue(ctx, httpchannel.SessionIDKey, msg.SessionID) so StreamChunks and Send can write to the correct in-flight request.

Single-flight per session: POST /api/chat uses a per-session mutex; only one in-flight turn is allowed for the same session_id.
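A per-session mutex can be kept in a map guarded by its own mutex. Whether the handler waits for the running turn or rejects the second request is not documented; the sketch shows the rejecting variant with TryLock, while get(id).Lock() would give the waiting one. Entries are never evicted in this sketch.

```go
package main

import "sync"

// sessionLocks hands out one mutex per session_id.
type sessionLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

// get returns the mutex for id, creating it on first use.
func (s *sessionLocks) get(id string) *sync.Mutex {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.locks == nil {
		s.locks = make(map[string]*sync.Mutex)
	}
	m, ok := s.locks[id]
	if !ok {
		m = &sync.Mutex{}
		s.locks[id] = m
	}
	return m
}

// tryLock returns an unlock func, or nil if a turn for id is already running.
func (s *sessionLocks) tryLock(id string) func() {
	m := s.get(id)
	if !m.TryLock() {
		return nil
	}
	return m.Unlock
}
```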

Final Send behavior: As with the CLI, visible HTTP output is streamed through StreamChunks, so the final assistant reply is not sent again via Send, which would duplicate the text. Send is still called to finalize the turn and to emit event: error when needed.

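Security: The channel is meant to bind to loopback (127.0.0.1), as in the example above. Without HTTP_CHANNEL_TOKEN the endpoint is unauthenticated, so anyone who can reach the port can drive the agent. If HTTP_CHANNEL_TOKEN is set, every request must include a matching X-UA-Token header.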


Runtime Modes

Mode is determined in cmd/ua (parseMode) and passed to internal/app.Run. Interactive vs daemon controls whether the CLI channel is started and where log output goes.

Interactive (default when stdin is a TTY, or --interactive)

Starts the CLI channel plus any configured background channels (Telegram, cron, HTTP). All channels fan messages into the shared goroutine pool.

Daemon (--daemon, or when stdin is not a TTY)

No CLI channel is started. Log output goes to both os.Stderr and the configured log file simultaneously via io.MultiWriter. Designed for systemd or container deployments.
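The mode rules can be sketched as a pure function plus the daemon log setup. The flag precedence when both --interactive and --daemon are given is an assumption (daemon wins here), as is the log file's open mode; the real parseMode lives in cmd/ua.

```go
package main

import (
	"io"
	"log"
	"os"
)

type Mode int

const (
	ModeInteractive Mode = iota
	ModeDaemon
)

// chooseMode: explicit flags win over TTY detection.
func chooseMode(interactiveFlag, daemonFlag, stdinIsTTY bool) Mode {
	switch {
	case daemonFlag:
		return ModeDaemon
	case interactiveFlag:
		return ModeInteractive
	case stdinIsTTY:
		return ModeInteractive
	default:
		return ModeDaemon
	}
}

// stdinIsTerminal treats a character-device stdin as a TTY (stdlib only).
func stdinIsTerminal() bool {
	fi, err := os.Stdin.Stat()
	if err != nil {
		return false
	}
	return fi.Mode()&os.ModeCharDevice != 0
}

// configureDaemonLogging sends log output to stderr and logPath at once.
// The caller closes the returned file on shutdown.
func configureDaemonLogging(logPath string) (io.Closer, error) {
	f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		return nil, err
	}
	log.SetOutput(io.MultiWriter(os.Stderr, f))
	return f, nil
}
```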

Concurrency Model

Each channel runs its own Receive goroutine in internal/app. Messages are forwarded to a shared chan inboundEnvelope (buffered at workerPoolSize * 2). A semaphore limits concurrent agent invocations to workerPoolSize = 8. Context cancellation propagates into all goroutines for clean shutdown on SIGINT/SIGTERM.


Adding a Custom Channel

type MyChannel struct { /* ... */ }

func (c *MyChannel) ID() string { return "my-channel" }
func (c *MyChannel) Receive(ctx context.Context) (*channel.InboundMessage, error) { /* ... */ }
func (c *MyChannel) Send(ctx context.Context, msg *channel.OutboundMessage) error { /* ... */ }
func (c *MyChannel) StreamChunks(ctx context.Context, chunks <-chan channel.StreamChunk) error { /* ... */ }
func (c *MyChannel) Close() error { /* ... */ }

Register it in internal/app/app.go's Run() (or buildChannels) alongside the existing channels.