Deterministic multi-provider mock LLM server for testing. Streams SSE responses in real OpenAI, Claude, and Gemini API formats, driven entirely by fixtures. Zero runtime dependencies — built on Node.js builtins only.
Supports both streaming (SSE) and non-streaming JSON responses across OpenAI (Chat Completions + Responses), Anthropic Claude (Messages), and Google Gemini (GenerateContent) APIs. Text completions, tool calls, and error injection. Point any process at it via OPENAI_BASE_URL, ANTHROPIC_BASE_URL, or Gemini base URL and get reproducible, instant responses.
npm install @copilotkit/llmockMSW (Mock Service Worker) is a popular API mocking library, but it solves a different problem.
The key difference is architecture. llmock runs a real HTTP server on a port. MSW patches http/https/fetch modules inside a single Node.js process. MSW can only intercept requests from the process that calls server.listen() — child processes, separate services, and workers are unaffected.
This matters for E2E tests where multiple processes make LLM API calls:
Playwright test runner (Node)
└─ controls browser → Next.js app (separate process)
└─ OPENAI_BASE_URL → llmock :5555
├─ Mastra agent workers
├─ LangGraph workers
└─ CopilotKit runtime
MSW can't intercept any of those calls. llmock can — it's a real server on a real port.
Use llmock when:
- Multiple processes need to hit the same mock (E2E tests, agent frameworks, microservices)
- You want multi-provider SSE format out of the box (OpenAI, Claude, Gemini)
- You prefer defining fixtures as JSON files rather than code
- You need a standalone CLI server
Use MSW when:
- All API calls originate from a single Node.js process (unit tests, SDK client tests)
- You're mocking many different APIs, not just OpenAI
- You want in-process interception without running a server
| Capability | llmock | MSW |
|---|---|---|
| Cross-process interception | Yes (real server) | No (in-process only) |
| OpenAI Chat Completions SSE | Built-in | Manual — build data: {json}\n\n + [DONE] yourself |
| OpenAI Responses API SSE | Built-in | Manual — MSW's sse() sends data: events, not OpenAI's event: format |
| Claude Messages API SSE | Built-in | Manual — build event:/data: SSE yourself |
| Gemini streaming | Built-in | Manual — build data: SSE yourself |
| WebSocket APIs | Built-in | No |
| Fixture file loading (JSON) | Yes | No — handlers are code-only |
| Request journal / inspection | Yes | No — track requests manually |
| Non-streaming responses | Yes | Yes |
| Error injection (one-shot) | Yes | Yes (via server.use()) |
| CLI for standalone use | Yes | No |
| Zero dependencies | Yes | No (~300KB) |
import { LLMock } from "@copilotkit/llmock";
const mock = new LLMock({ port: 5555 });
mock.onMessage("hello", { content: "Hi there!" });
const url = await mock.start();
// Point your OpenAI client at `url` instead of https://api.openai.com
// ... run your tests ...
await mock.stop();Real-world patterns from using llmock in Playwright E2E tests with CopilotKit, Mastra, LangGraph, and Agno agent frameworks.
Start the mock server once for the entire test suite. All child processes (Next.js, agent workers) inherit the URL via environment variable.
// e2e/llmock-setup.ts
import { LLMock } from "@copilotkit/llmock";
import * as path from "node:path";
let mockServer: LLMock | null = null;
export async function setupLLMock(): Promise<void> {
mockServer = new LLMock({ port: 5555 });
// Load JSON fixtures from a directory
mockServer.loadFixtureDir(path.join(__dirname, "fixtures", "openai"));
const url = await mockServer.start();
// Child processes use this to find the mock
process.env.LLMOCK_URL = `${url}/v1`;
}
export async function teardownLLMock(): Promise<void> {
if (mockServer) {
await mockServer.stop();
mockServer = null;
}
}The Next.js app (or any other service) just needs:
OPENAI_BASE_URL=http://localhost:5555/v1
OPENAI_API_KEY=mock-key
# Or for Anthropic Claude:
ANTHROPIC_BASE_URL=http://localhost:5555/v1
# Or for Google Gemini — point at the base URL:
# http://localhost:5555/v1betaDefine fixtures as JSON — one file per feature, loaded with loadFixtureFile or loadFixtureDir.
Text responses — match on a substring of the last user message:
{
"fixtures": [
{
"match": { "userMessage": "stock price of AAPL" },
"response": { "content": "The current stock price of Apple Inc. (AAPL) is $150.25." }
},
{
"match": { "userMessage": "capital of France" },
"response": { "content": "The capital of France is Paris." }
}
]
}Tool call responses — the agent framework receives these as tool calls and executes them:
{
"fixtures": [
{
"match": { "userMessage": "one step with eggs" },
"response": {
"toolCalls": [
{
"name": "generate_task_steps",
"arguments": "{\"steps\":[{\"description\":\"Crack eggs into bowl\",\"status\":\"enabled\"},{\"description\":\"Preheat oven to 350F\",\"status\":\"enabled\"}]}"
}
]
}
},
{
"match": { "userMessage": "background color to blue" },
"response": {
"toolCalls": [
{
"name": "change_background",
"arguments": "{\"background\":\"blue\"}"
}
]
}
}
]
}Fixtures are evaluated first-match-wins. When two fixtures could match the same message, load the more specific one first:
// Load HITL fixtures first — "one step with eggs" is more specific than
// "plan to make brownies" which also appears in the HITL user message
mockServer.loadFixtureFile(path.join(FIXTURES_DIR, "human-in-the-loop.json"));
// Then load everything else — earlier matches take priority
mockServer.loadFixtureDir(FIXTURES_DIR);When substring matching isn't enough — for example, when the last user message is the same across multiple requests but the system prompt differs — use predicates:
// Supervisor agent: same user message every time, but system prompt
// contains state flags like "Flights found: false"
mockServer.addFixture({
match: {
predicate: (req) => {
const sysMsg = req.messages.find((m) => m.role === "system");
return sysMsg?.content?.includes("Flights found: false") ?? false;
},
},
response: {
toolCalls: [
{
name: "supervisor_response",
arguments: '{"answer":"Let me find flights for you!","next_agent":"flights_agent"}',
},
],
},
});
mockServer.addFixture({
match: {
predicate: (req) => {
const sys = req.messages.find((m) => m.role === "system")?.content ?? "";
return sys.includes("Flights found: true") && sys.includes("Hotels found: false");
},
},
response: {
toolCalls: [
{
name: "supervisor_response",
arguments: '{"answer":"Now let me find hotels.","next_agent":"hotels_agent"}',
},
],
},
});After a tool executes, the next request contains a role: "tool" message with the result. Add a catch-all for these so the conversation can continue:
const toolResultFixture = {
match: {
predicate: (req) => {
const last = req.messages[req.messages.length - 1];
return last?.role === "tool";
},
},
response: { content: "Done! I've completed that for you." },
};
mockServer.addFixture(toolResultFixture);
// Move it to the front so it matches before substring-based fixtures
// (the last user message hasn't changed, so substring fixtures would
// match the same fixture again otherwise)
const fixtures = (mockServer as any).fixtures;
const idx = fixtures.indexOf(toolResultFixture);
if (idx > 0) {
fixtures.splice(idx, 1);
fixtures.unshift(toolResultFixture);
}Append a catch-all last to handle any request that doesn't match a specific fixture, preventing 404s from crashing the test:
mockServer.addFixture({
match: { predicate: () => true },
response: { content: "I understand. How can I help you with that?" },
});Create a new mock server instance.
| Option | Type | Default | Description |
|---|---|---|---|
port |
number |
0 (random) |
Port to listen on |
host |
string |
"127.0.0.1" |
Host to bind to |
latency |
number |
0 |
Default ms delay between SSE chunks |
chunkSize |
number |
20 |
Default characters per SSE chunk |
Static factory — creates an instance and starts it in one call. Returns Promise<LLMock>.
| Method | Returns | Description |
|---|---|---|
start() |
Promise<string> |
Start the server, returns the base URL |
stop() |
Promise<void> |
Stop the server |
url |
string |
Base URL (throws if not started) |
baseUrl |
string |
Alias for url |
port |
number |
Listening port (throws if not started) |
All registration methods return this for chaining.
Register a fixture with full control over match criteria.
mock.on({ userMessage: /weather/i, model: "gpt-4" }, { content: "It's sunny!" }, { latency: 50 });Shorthand — matches on the last user message.
mock.onMessage("hello", { content: "Hi!" });
mock.onMessage(/greet/i, { content: "Hey there!" });Shorthand — matches when the request contains a tool with the given name.
mock.onToolCall("get_weather", {
toolCalls: [{ name: "get_weather", arguments: '{"location":"SF"}' }],
});Shorthand — matches when a tool result message has the given tool_call_id.
mock.onToolResult("call_abc123", { content: "Temperature is 72F" });Add raw Fixture objects directly (appended to the end of the list).
Insert a fixture at the front of the list so it matches before all existing fixtures. Useful for catch-all predicates that must fire before substring-based fixtures.
mock.prependFixture({
match: { predicate: (req) => req.messages.at(-1)?.role === "tool" },
response: { content: "Done!" },
});Returns a readonly Fixture[] view of all registered fixtures. Useful for
debugging and logging fixture statistics without accessing private internals.
const fixtures = mock.getFixtures();
console.log(`${fixtures.length} fixtures loaded`);Load fixtures from JSON files on disk. See Fixture Files above.
Remove all registered fixtures.
Queue a one-shot error for the very next request. The error fires once, then auto-removes itself.
mock.nextRequestError(429, {
message: "Rate limited",
type: "rate_limit_error",
});
// Next request → 429 error
// Subsequent requests → normal fixture matchingEvery request to all API endpoints (/v1/chat/completions, /v1/responses, /v1/messages, and Gemini endpoints) is recorded in a journal.
| Method | Returns | Description |
|---|---|---|
getRequests() |
JournalEntry[] |
All recorded requests |
getLastRequest() |
JournalEntry | null |
Most recent request |
clearRequests() |
void |
Clear the journal |
journal |
Journal |
Direct access to the journal instance |
await fetch(mock.url + "/v1/chat/completions", { ... });
const last = mock.getLastRequest();
expect(last?.body.messages).toContainEqual({
role: "user",
content: "hello",
});The server also exposes journal data over HTTP (useful in CLI mode):
GET /v1/_requests— returns all journal entries as JSON. Supports?limit=N.DELETE /v1/_requests— clears the journal. Returns 204.
Clear all fixtures and the journal in one call. Works before or after the server is started.
afterEach(() => {
mock.reset();
});Fixtures are evaluated in registration order (first match wins). A fixture matches when all specified fields match the incoming request (AND logic).
| Field | Type | Matches on |
|---|---|---|
userMessage |
string | RegExp |
Content of the last role: "user" message |
toolName |
string |
Name of a tool in the request's tools array |
toolCallId |
string |
tool_call_id on a role: "tool" message |
model |
string | RegExp |
The model field in the request |
predicate |
(req) => boolean |
Arbitrary matching function |
{
content: "Hello world";
}Streams as SSE chunks, splitting content by chunkSize. With stream: false, returns a standard chat.completion JSON object.
{
toolCalls: [{ name: "get_weather", arguments: '{"location":"SF"}' }];
}{
error: { message: "Rate limited", type: "rate_limit_error" },
status: 429
}The server handles:
- POST
/v1/chat/completions— OpenAI Chat Completions API (streaming and non-streaming) - POST
/v1/responses— OpenAI Responses API (streaming and non-streaming) - POST
/v1/messages— Anthropic Claude Messages API (streaming and non-streaming) - POST
/v1beta/models/{model}:generateContent— Google Gemini (non-streaming) - POST
/v1beta/models/{model}:streamGenerateContent— Google Gemini (streaming)
WebSocket endpoints:
- WS
/v1/responses— OpenAI Responses API over WebSocket - WS
/v1/realtime— OpenAI Realtime API (text + tool calls) - WS
/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent— Gemini Live
All endpoints share the same fixture pool — the same fixtures work across all providers. Requests are translated to a common format internally for fixture matching.
The same fixtures that drive HTTP responses also work over WebSocket transport. llmock implements RFC 6455 WebSocket framing with zero external dependencies — connect, send events, and receive streaming responses in real provider formats.
Only text and tool call paths are supported over WebSocket. Audio, video, and binary frames are not implemented.
Connect to ws://localhost:5555/v1/responses and send a response.create event. The server streams back the same events as OpenAI's real WebSocket Responses API:
Connect to ws://localhost:5555/v1/realtime. The Realtime API uses a session-based protocol — configure the session, add conversation items, then request a response:
// → Configure session:
{ "type": "session.update", "session": { "modalities": ["text"], "model": "gpt-4o-realtime" } }
// → Add a user message:
{
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [{ "type": "input_text", "text": "What is the capital of France?" }]
}
}
// → Request a response:
{ "type": "response.create" }
// ← Server streams:
// {"type": "response.created", ...}
// {"type": "response.text.delta", "delta": "The"}
// {"type": "response.text.delta", "delta": " capital"}
// ...
// {"type": "response.text.done", ...}
// {"type": "response.done", ...}Connect to ws://localhost:5555/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent. Gemini Live uses a setup/content/response flow:
// → Setup message (must be first):
{ "setup": { "model": "models/gemini-2.0-flash-live", "generationConfig": { "responseModalities": ["TEXT"] } } }
// → Send user content:
{ "clientContent": { "turns": [{ "role": "user", "parts": [{ "text": "Hello" }] }], "turnComplete": true } }
// ← Server streams:
// {"setupComplete": {}}
// {"serverContent": {"modelTurnComplete": false, "parts": [{"text": "Hello"}]}}
// {"serverContent": {"modelTurnComplete": true}}The package includes a standalone server binary:
llmock [options]| Option | Short | Default | Description |
|---|---|---|---|
--port |
-p |
4010 |
Port to listen on |
--host |
-h |
127.0.0.1 |
Host to bind to |
--fixtures |
-f |
./fixtures |
Path to fixtures directory or file |
--latency |
-l |
0 |
Latency between SSE chunks (ms) |
--chunk-size |
-c |
20 |
Characters per SSE chunk |
--help |
Show help |
# Start with bundled example fixtures
llmock
# Custom fixtures on a specific port
llmock -p 8080 -f ./my-fixtures
# Simulate slow responses
llmock --latency 100 --chunk-size 5If you need the raw HTTP server without the LLMock wrapper:
import { createServer } from "@copilotkit/llmock";
const fixtures = [{ match: { userMessage: "hi" }, response: { content: "Hello!" } }];
const { server, journal, url } = await createServer(fixtures, { port: 0 });
// ... use it ...
server.close();mock.on({ userMessage: "slow" }, { content: "Finally..." }, { latency: 200, chunkSize: 5 });llmock ships with a Claude Code skill that teaches your AI assistant how to write fixtures correctly — match fields, response types, agent loop patterns, gotchas, and debugging techniques. Available as the /write-fixtures slash command.
# Add the marketplace (one time)
/plugin marketplace add CopilotKit/llmock
# Install the plugin
/plugin install llmock@copilotkit-toolsThe skill appears as /llmock:write-fixtures.
claude --plugin-dir ./node_modules/@copilotkit/llmockSame result, no marketplace needed. Good for trying it out.
claude --add-dir ./node_modules/@copilotkit/llmockThe skill appears as /write-fixtures for the session.
mkdir -p .claude/commands
cp node_modules/@copilotkit/llmock/.claude/commands/write-fixtures.md .claude/commands/Permanently available as /write-fixtures in your project. Commit to share with your team.
Areas where llmock could grow, and explicit non-goals for the current scope.
- Audio and multimodal: OpenAI Realtime API audio buffers, voice activity detection, and audio transcription are not implemented. Gemini Live audio/video input and output are similarly out of scope. Only text and tool call paths are supported over WebSocket.
- Binary WebSocket frames: Only text frames are processed; binary frames are silently ignored.
- WebSocket compression:
permessage-deflateis not supported. - Session persistence: Realtime and Gemini Live sessions exist only for the lifetime of a single WebSocket connection. There is no cross-connection session resumption.
- Request metadata in predicates: Predicate functions receive only the
ChatCompletionRequest, not HTTP headers, method, or URL. - Multi-turn conversation state: Fixtures are stateless — there is no built-in way to sequence responses across multiple requests in a conversation.
- Validation on load: Fixture files are not schema-validated at load time; malformed fixtures surface as runtime errors.
- Inheritance and aliasing: No
$reforextendsmechanism for fixture reuse across files.
- Live API conformance: The
api-conformancetests validate response format structure but do not run against real LLM APIs. A subset of tests that hit actual OpenAI/Anthropic/Gemini endpoints (gated behind API keys) would catch format drift as providers evolve their APIs. - Token counts: Usage fields are always zero across all providers.
- Vision/image content: Image content parts are not handled by any provider.
--watchmode: No file-watching to auto-reload fixtures on change.--log-level: No configurable log verbosity.--validate-on-load: No flag to validate fixture schemas at startup.
CopilotKit uses llmock across its test suite to verify AI agent behavior across multiple LLM providers without hitting real APIs. The tests cover streaming text, tool calls, and multi-turn conversations across both v1 and v2 runtimes.
See the CopilotKit test suite for real-world examples of llmock in action.
MIT