Skip to content

Commit 22393b7

Browse files
authored
feat: add streaming HTTP transport support (#87)
- Add --transport CLI flag to select between sse and streamable-http - Add MCP_TRANSPORT environment variable support for transport selection - Create StreamableHTTPServer wrapper in pkg/mcp/server.go - Update server startup logic to use selected transport - Add documentation for transport configuration This enables MKP to work with environments like ToolHive that require HTTP-based communication. The default transport remains SSE for backward compatibility. Fixes #82
1 parent 7e1a038 commit 22393b7

File tree

3 files changed

+80
-6
lines changed

3 files changed

+80
-6
lines changed

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,28 @@ The resource URIs follow these formats:
367367

368368
### Configuration
369369

370+
#### Transport Protocol
371+
372+
MKP supports two transport protocols for the MCP server:
373+
374+
- **SSE (Server-Sent Events)**: The default transport protocol, suitable for most use cases
375+
- **Streamable HTTP**: A streaming HTTP transport that supports both direct HTTP responses and SSE streams, useful for environments like ToolHive that require HTTP-based communication
376+
377+
You can configure the transport protocol using either a CLI flag or an environment variable:
378+
379+
```bash
380+
# Using CLI flag
381+
./build/mkp-server --transport=streamable-http
382+
383+
# Using environment variable
384+
MCP_TRANSPORT=streamable-http ./build/mkp-server
385+
386+
# Default (SSE)
387+
./build/mkp-server
388+
```
389+
390+
The `MCP_TRANSPORT` environment variable is automatically set by ToolHive when running MKP in that environment.
391+
370392
#### Controlling Resource Discovery
371393

372394
By default, MKP serves all Kubernetes resources as MCP resources, which provides

cmd/server/main.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,20 @@ import (
99
"os"
1010
"os/signal"
1111
"strconv"
12+
"strings"
1213
"syscall"
1314
"time"
1415

1516
"github.com/StacklokLabs/mkp/pkg/k8s"
1617
"github.com/StacklokLabs/mkp/pkg/mcp"
1718
)
1819

20+
const (
21+
// Transport types
22+
transportSSE = "sse"
23+
transportStreamableHTTP = "streamable-http"
24+
)
25+
1926
func main() {
2027
// Parse command line flags
2128
kubeconfig := flag.String("kubeconfig", "", "Path to kubeconfig file. If not provided, in-cluster config will be used")
@@ -28,6 +35,8 @@ func main() {
2835
"Interval to periodically re-read the kubeconfig (e.g., 5m for 5 minutes). If 0, no refresh will be performed")
2936
enableRateLimiting := flag.Bool("enable-rate-limiting", true,
3037
"Whether to enable rate limiting for tool calls. When false, no rate limiting will be applied")
38+
transport := flag.String("transport", getDefaultTransport(),
39+
"Transport protocol to use: 'sse' or 'streamable-http'. Can also be set via MCP_TRANSPORT environment variable")
3140

3241
flag.Parse()
3342

@@ -74,16 +83,30 @@ func main() {
7483
// Create MCP server using the helper function
7584
mcpServer := mcp.CreateServer(k8sClient, config)
7685

77-
// Create SSE server
78-
sseServer := mcp.CreateSSEServer(mcpServer)
86+
// Create and start the appropriate transport server
87+
var transportServer interface {
88+
Start(string) error
89+
Shutdown(context.Context) error
90+
}
91+
92+
switch strings.ToLower(*transport) {
93+
case transportStreamableHTTP:
94+
log.Println("Using streamable-http transport")
95+
transportServer = mcp.CreateStreamableHTTPServer(mcpServer)
96+
case transportSSE:
97+
log.Println("Using SSE transport")
98+
transportServer = mcp.CreateSSEServer(mcpServer)
99+
default:
100+
log.Fatalf("Invalid transport: %s. Must be 'sse' or 'streamable-http'", *transport)
101+
}
79102

80103
// Channel to receive server errors
81104
serverErrCh := make(chan error, 1)
82105

83106
// Start the server in a goroutine
84107
go func() {
85-
log.Printf("Starting MCP server on %s", *addr)
86-
if err := sseServer.Start(*addr); err != nil {
108+
log.Printf("Starting MCP server on %s with %s transport", *addr, *transport)
109+
if err := transportServer.Start(*addr); err != nil {
87110
log.Printf("Server error: %v", err)
88111
serverErrCh <- err
89112
}
@@ -106,8 +129,8 @@ func main() {
106129
go func() {
107130
log.Println("Initiating server shutdown...")
108131

109-
// Stop the SSE server
110-
err := sseServer.Shutdown(shutdownCtx)
132+
// Stop the transport server
133+
err := transportServer.Shutdown(shutdownCtx)
111134
if err != nil {
112135
log.Printf("Error during shutdown: %v", err)
113136
}
@@ -166,3 +189,27 @@ func getDefaultAddress() string {
166189

167190
return fmt.Sprintf(":%d", port)
168191
}
192+
193+
// getDefaultTransport returns the transport to use based on MCP_TRANSPORT environment variable.
194+
// If the environment variable is not set, returns "sse".
195+
// Valid values are "sse" and "streamable-http".
196+
func getDefaultTransport() string {
197+
defaultTransport := transportSSE
198+
199+
transportEnv := os.Getenv("MCP_TRANSPORT")
200+
if transportEnv == "" {
201+
return defaultTransport
202+
}
203+
204+
// Normalize the transport value
205+
transport := strings.ToLower(strings.TrimSpace(transportEnv))
206+
207+
// Validate the transport value
208+
if transport != transportSSE && transport != transportStreamableHTTP {
209+
log.Printf("Invalid MCP_TRANSPORT: %s, using default: %s",
210+
transportEnv, defaultTransport)
211+
return defaultTransport
212+
}
213+
214+
return transport
215+
}

pkg/mcp/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,8 @@ func StopServer() {
145145
func CreateSSEServer(mcpServer *server.MCPServer) *server.SSEServer {
146146
return server.NewSSEServer(mcpServer)
147147
}
148+
149+
// CreateStreamableHTTPServer creates a new StreamableHTTP server for the MCP server
150+
func CreateStreamableHTTPServer(mcpServer *server.MCPServer) *server.StreamableHTTPServer {
151+
return server.NewStreamableHTTPServer(mcpServer)
152+
}

0 commit comments

Comments
 (0)