Skip to content

replayio/durable-streams

 
 

Repository files navigation

Memento polaroid icon

Durable Streams

The open protocol for real-time sync to client applications

HTTP-based durable streams for streaming data reliably to web browsers, mobile apps, and native clients with offset-based resumability.

Durable Streams provides a simple, production-proven protocol for creating and consuming ordered, replayable data streams with support for catch-up reads and live tailing.

Tip

Read the Annoucing Durable Streams post on the Electric blog.

Architecture Overview

flowchart TB
    subgraph Clients["Client Applications"]
        Web["Web Browser"]
        Mobile["Mobile App"]
        Native["Native Client"]
    end

    subgraph Protocol["Durable Streams Protocol"]
        HTTP["HTTP API"]
        Offsets["Offset-based\nResumption"]
        Cache["CDN\nCaching"]
    end

    subgraph Server["Durable Streams Server"]
        Store["Durable\nStorage"]
        Registry["Stream\nRegistry"]
    end

    Web --> HTTP
    Mobile --> HTTP
    Native --> HTTP
    HTTP --> Offsets
    Offsets --> Cache
    Cache --> Store
    Store --> Registry
Loading

The Missing Primitive

Modern applications frequently need ordered, durable sequences of data that can be replayed from arbitrary points and tailed in real time. Common patterns include:

  • AI conversation streaming - Stream LLM token responses with resume capability across reconnections
  • Agentic apps - Stream tool outputs and progress events with replay and clean reconnect semantics
  • Database synchronization - Stream database changes to web, mobile, and native clients
  • Collaborative editing - Sync CRDTs and operational transforms across devices
  • Real-time updates - Push application state to clients with guaranteed delivery
  • Event sourcing - Build event-sourced architectures with client-side replay
  • Workflow execution - Stream workflow state changes with full history

While durable streams exist throughout backend infrastructure (database WALs, Kafka topics, event stores), they aren't available as a first-class primitive for client applications. There's no simple, HTTP-based durable stream that sits alongside databases and object storage as a standard cloud primitive.

WebSocket and SSE connections are easy to start, but they're fragile in practice: tabs get suspended, networks flap, devices switch, pages refresh. When that happens, you either lose in-flight data or build a bespoke backend storage and client resume protocol on top.

AI products make this painfully visible. Token streaming is the UI for chat and copilots, and agentic apps stream progress events, tool outputs, and partial results over long-running sessions. When the stream fails, the product fails—even if the model did the right thing.

Durable Streams addresses this gap. It's a minimal HTTP-based protocol for durable, offset-based streaming designed for client applications across all platforms: web browsers, mobile apps, native clients, IoT devices, and edge workers. Based on 1.5 years of production use at Electric for real-time Postgres sync, reliably delivering millions of state changes every day.

The protocol provides:

  • Universal - Works anywhere HTTP works: web browsers, mobile apps, native clients, IoT devices, edge workers
  • Simple - Built on standard HTTP with no custom protocols
  • Resumable - Offset-based reads let you resume from any point
  • Real-time - Long-poll and SSE modes for live tailing with catch-up from any offset
  • Economical - HTTP-native design leverages CDN infrastructure for efficient scaling
  • Flexible - Content-type agnostic byte streams
  • Composable - Build higher-level abstractions on top (like Electric's real-time Postgres sync engine)

Packages

This monorepo contains:

Package Description
@durable-streams/client TypeScript read-only client (smaller bundle)
@durable-streams/writer TypeScript read/write client (includes create/append/delete)
@durable-streams/server Node.js reference server implementation
@durable-streams/react React hooks for server-side state with real-time sync
@durable-streams/vite Vite plugin for SSR hydration and reducer extraction
@durable-streams/cli Command-line tool
@durable-streams/test-ui Visual web interface for testing and exploring streams
@durable-streams/conformance-tests Protocol compliance test suite
@durable-streams/benchmarks Performance benchmarking suite

Try It Out Locally

524000540-460eb79d-3970-4882-b39a-50bfd9d4c63d

Run the local server and use either the web-based Test UI or the command-line CLI:

Option 1: Test UI

# Clone and install
git clone https://github.com/durable-streams/durable-streams.git
cd durable-streams
pnpm install

# Terminal 1: Start the local server
pnpm start:dev

# Terminal 2: Launch the Test UI
cd packages/test-ui
pnpm dev

Open http://localhost:3000 to:

  • Create and manage streams with different content types (text/plain, application/json, binary)
  • Write messages with keyboard shortcuts
  • Monitor real-time stream updates
  • View the stream registry to see all active streams
  • Inspect stream metadata and content-type rendering

See the Test UI README for details.

Option 2: CLI

# Clone and install
git clone https://github.com/durable-streams/durable-streams.git
cd durable-streams
pnpm install

# Terminal 1: Start the local server
pnpm start:dev

# Terminal 2: Link the CLI globally (one-time setup)
pnpm link:dev

# Set the server URL
export STREAM_URL=http://localhost:8787

# Create a stream
durable-stream-dev create my-stream

# Terminal 3: Start reading (will show data as it arrives)
durable-stream-dev read my-stream

# Back in Terminal 2: Write data and watch it appear in Terminal 3
durable-stream-dev write my-stream "Hello, world!"
durable-stream-dev write my-stream "More data..."
echo "Piped content!" | durable-stream-dev write my-stream

See the CLI README for details.

The Test UI and CLI share the same __registry__ system stream, so streams created in one are visible in the other.

Quick Start

Read-only client (typical browser/mobile usage)

For applications that only need to read from streams:

npm install @durable-streams/client
import { DurableStream } from "@durable-streams/client"

const stream = new DurableStream({
  url: "https://your-server.com/v1/stream/my-stream",
})

// Read existing data from stream (returns immediately)
const result = await stream.read({ live: false })
console.log(new TextDecoder().decode(result.data))

Read/write client

For applications that need to create and write to streams:

npm install @durable-streams/writer
import { DurableStream } from "@durable-streams/writer"

// Create a new stream
const stream = await DurableStream.create({
  url: "https://your-server.com/v1/stream/my-stream",
  contentType: "application/json",
})

// Append data
await stream.append(JSON.stringify({ event: "user.created", userId: "123" }))
await stream.append(JSON.stringify({ event: "user.updated", userId: "123" }))

// Writer also includes all read operations
const result = await stream.read({ live: false })

Resume from an offset

// Read and save the offset
const result = await stream.read({ live: false })
const savedOffset = result.offset // Save this for later

// Resume from saved offset (catch-up mode returns immediately)
const resumed = await stream.read({ offset: savedOffset, live: false })

React Integration

The @durable-streams/react package provides React hooks for building real-time, synchronized applications with server-side state.

How It Works

sequenceDiagram
    participant Client as React Client
    participant Server as Durable Streams Server
    participant Store as Stream Storage

    Client->>Server: Subscribe to stream
    Server->>Store: Read from offset
    Store-->>Server: Events
    Server-->>Client: Initial state + offset

    Note over Client,Server: Real-time sync begins

    Client->>Server: dispatch(action)
    Server->>Store: Append event
    Store-->>Server: New offset
    Server-->>Client: Updated state
    Server-->>Client: Broadcast to other clients
Loading

Installation

npm install @durable-streams/react

useServerState - Simple State Sync

Like React's useState, but synchronized across all clients:

import { PartyProvider, useServerState } from "@durable-streams/react"

function App() {
  return (
    <PartyProvider serverUrl="http://localhost:8787/v1/stream">
      <Counter />
    </PartyProvider>
  )
}

function Counter() {
  // State is shared across all connected clients!
  const [count, setCount] = useServerState(0, "my-counter")

  return (
    <div>
      <span>{count}</span>
      <button onClick={() => setCount((c) => c + 1)}>+</button>
      <button onClick={() => setCount((c) => c - 1)}>-</button>
    </div>
  )
}

useServerReducer - Complex State with Actions

Like React's useReducer, but with server-side persistence and sync:

import { useServerReducer } from "@durable-streams/react"

type Action =
  | { type: "add"; text: string }
  | { type: "toggle"; id: number }
  | { type: "remove"; id: number }

interface State {
  todos: Array<{ id: number; text: string; done: boolean }>
  nextId: number
}

function todoReducer(state: State, action: Action): State {
  switch (action.type) {
    case "add":
      return {
        ...state,
        todos: [
          ...state.todos,
          { id: state.nextId, text: action.text, done: false },
        ],
        nextId: state.nextId + 1,
      }
    case "toggle":
      return {
        ...state,
        todos: state.todos.map((t) =>
          t.id === action.id ? { ...t, done: !t.done } : t
        ),
      }
    case "remove":
      return {
        ...state,
        todos: state.todos.filter((t) => t.id !== action.id),
      }
  }
}

function TodoList() {
  const [state, dispatch] = useServerReducer(
    todoReducer,
    { todos: [], nextId: 1 },
    "my-todos"
  )

  return (
    <div>
      {state.todos.map((todo) => (
        <div key={todo.id}>
          <input
            type="checkbox"
            checked={todo.done}
            onChange={() => dispatch({ type: "toggle", id: todo.id })}
          />
          <span>{todo.text}</span>
          <button onClick={() => dispatch({ type: "remove", id: todo.id })}>
            Delete
          </button>
        </div>
      ))}
      <button
        onClick={() => {
          const text = prompt("New todo:")
          if (text) dispatch({ type: "add", text })
        }}
      >
        Add Todo
      </button>
    </div>
  )
}

serverAtom - Jotai-style Atoms

Define state once, use anywhere:

import { serverAtom, useServerAtom } from "@durable-streams/react"

// Define the atom
const counterAtom = serverAtom({
  key: "global-counter",
  default: 0,
})

// Use in any component - all instances share the same state
function CounterDisplay() {
  const [count] = useServerAtom(counterAtom)
  return <span>Count: {count}</span>
}

function CounterButton() {
  const [, setCount] = useServerAtom(counterAtom)
  return <button onClick={() => setCount((c) => c + 1)}>Increment</button>
}

SSR Hydration with TanStack Start

For server-side rendering, the @durable-streams/react package provides utilities for SSR hydration with incremental sync caching.

SSR Data Flow

flowchart TB
    subgraph SSR["Server-Side Rendering"]
        Loader["Route Loader"]
        Cache["SSR State Cache"]
        Compute["Compute State\n(run reducer locally)"]
    end

    subgraph DS["Durable Streams Server"]
        Events["GET /events\n(with fromOffset)"]
    end

    subgraph Client["Client Hydration"]
        Hydrate["HydrationBoundary"]
        Hooks["useServerState\nuseServerReducer"]
        Subscribe["Subscribe from\nhydrated offset"]
    end

    Loader --> Cache
    Cache -->|"Cache hit"| Compute
    Cache -->|"Cache miss"| Events
    Events -->|"New events only"| Compute
    Compute --> Hydrate
    Hydrate --> Hooks
    Hooks --> Subscribe
Loading

Setup with TanStack Start

// routes/demo.tsx
import { createFileRoute } from "@tanstack/react-router"
import { createServerFn } from "@tanstack/react-start"
import {
  HydrationBoundary,
  PartyProvider,
  loadMultipleStreamsWithReducers,
  setStateReducer,
  useServerState,
  useServerReducer,
} from "@durable-streams/react"

// Define your reducer (shared between SSR and client)
function todoReducer(state, action) {
  // ... reducer logic
}

// SSR loader - runs on server, computes state locally
const loadDemoData = createServerFn({ method: "GET" }).handler(async () => {
  const serverUrl = "http://localhost:8787/v1/stream"

  // Fetch events and compute state with local reducers
  // Caching is automatic - subsequent SSR requests only fetch new events!
  return loadMultipleStreamsWithReducers(serverUrl, [
    {
      path: "demo/counter",
      reducer: setStateReducer, // Built-in for useServerState
      initialState: 0,
    },
    {
      path: "demo/todos",
      reducer: todoReducer, // Your custom reducer
      initialState: { todos: [], nextId: 1 },
      usePayloadAsAction: true, // Required for useServerReducer
    },
  ])
})

// Route component
function DemoPage() {
  const data = Route.useLoaderData()

  return (
    <PartyProvider serverUrl="http://localhost:8787/v1/stream">
      <HydrationBoundary data={data}>
        <Counter />
        <TodoList />
      </HydrationBoundary>
    </PartyProvider>
  )
}

export const Route = createFileRoute("/demo")({
  loader: loadDemoData,
  component: DemoPage,
})

Incremental Sync Caching

The SSR loader automatically caches computed state. On subsequent requests:

First Request:
┌────────────────────────────────────────────────────────┐
│  1. Check cache → empty                                │
│  2. Fetch ALL events from server                       │
│  3. Run reducer to compute state                       │
│  4. Cache state + offset                               │
│  5. Return to client                                   │
└────────────────────────────────────────────────────────┘

Subsequent Requests (no new events):
┌────────────────────────────────────────────────────────┐
│  1. Check cache → found at offset X                    │
│  2. Fetch events with fromOffset=X                     │
│  3. Server returns 0 events                            │
│  4. Return cached state directly ← FAST!               │
└────────────────────────────────────────────────────────┘

Subsequent Requests (with new events):
┌────────────────────────────────────────────────────────┐
│  1. Check cache → found at offset X                    │
│  2. Fetch events with fromOffset=X                     │
│  3. Server returns only NEW events                     │
│  4. Apply new events to cached state                   │
│  5. Update cache, return to client                     │
└────────────────────────────────────────────────────────┘

Cache Management

import { ssrStateCache } from "@durable-streams/react"

// View cache stats
console.log(ssrStateCache.stats())
// { size: 3, paths: ["demo/counter", "demo/todos", ...] }

// Clear all cached state
ssrStateCache.clear()

// Clear specific stream
ssrStateCache.delete("demo/counter")

// Disable caching for specific streams
loadStreamWithReducer({
  serverUrl: "...",
  path: "volatile/stream",
  reducer: myReducer,
  initialState: {},
  useCache: false, // Disable caching
})

Protocol in 60 Seconds

Here's the protocol in action with raw HTTP:

Create a stream:

curl -X PUT https://your-server.com/v1/stream/my-stream \
  -H "Content-Type: application/json"

Append data:

curl -X POST https://your-server.com/v1/stream/my-stream \
  -H "Content-Type: application/json" \
  -d '{"event":"user.created","userId":"123"}'

# Server returns:
# Stream-Next-Offset: abc123xyz

Read from beginning:

curl "https://your-server.com/v1/stream/my-stream?offset=-1"

# Server returns:
# Stream-Next-Offset: abc123xyz
# Cache-Control: public, max-age=60
# [response body with data]

Resume from offset:

curl "https://your-server.com/v1/stream/my-stream?offset=abc123xyz"

Live tail (long-poll):

curl "https://your-server.com/v1/stream/my-stream?offset=abc123xyz&live=long-poll"
# Waits for new data, returns when available or times out

The key headers:

  • Stream-Next-Offset - Resume point for next read (exactly-once delivery)
  • Cache-Control - Enables CDN/browser caching for historical reads
  • Content-Type - Set at stream creation, preserved for all reads

Message Framing

Durable Streams operates in two modes for handling message boundaries:

Byte Stream Mode (Default)

By default, Durable Streams is a raw byte stream with no message boundaries. When you append data, it's concatenated directly. Each read returns all bytes from your offset to the current end of the stream, but these boundaries don't align with application-level "messages."

// Append multiple messages
await stream.append("hello")
await stream.append("world")

// Read from beginning - returns all data concatenated
const result = await stream.read({ live: false })
// result.data = "helloworld" (complete stream from offset to end)

// If more data arrives and you read again from the returned offset
await stream.append("!")
const next = await stream.read({ offset: result.offset })
// next.data = "!" (complete new data from last offset to new end)

You must implement your own framing. For example, newline-delimited JSON (NDJSON):

// Write with newlines
await stream.append(JSON.stringify({ event: "user.created" }) + "\n")
await stream.append(JSON.stringify({ event: "user.updated" }) + "\n")

// Parse line by line
const text = new TextDecoder().decode(result.data)
const messages = text.split("\n").filter(Boolean).map(JSON.parse)

JSON Mode

When creating a stream with contentType: "application/json", the server guarantees message boundaries. Each read returns a complete JSON array of the messages appended since the last offset.

// Create a JSON-mode stream
const stream = await DurableStream.create({
  url: "https://your-server.com/v1/stream/my-stream",
  contentType: "application/json",
})

// Append individual JSON values
await stream.append({ event: "user.created", userId: "123" })
await stream.append({ event: "user.updated", userId: "123" })

// Read returns parsed JSON array
for await (const message of stream.json({ live: false })) {
  console.log(message)
  // { event: "user.created", userId: "123" }
  // { event: "user.updated", userId: "123" }
}

In JSON mode:

  • Each append() stores one message
  • Supports all JSON types: objects, arrays, strings, numbers, booleans, null
  • Message boundaries are preserved across reads
  • Reads return JSON arrays of all messages
  • Ideal for structured event streams

Offset Semantics

flowchart LR
    Start["-1"\nStream Start] --> A["offset_001"]
    A --> B["offset_002"]
    B --> C["offset_003"]
    C --> D["offset_004"]
    D --> End["Current\nEnd"]

    style Start fill:#e1f5fe
    style End fill:#c8e6c9
Loading

Offsets are opaque tokens that identify positions within a stream:

  • Opaque strings - Treat as black boxes; don't parse or construct them
  • Lexicographically sortable - You can compare offsets to determine ordering
  • "-1" means start - Use offset: "-1" to read from the beginning
  • Server-generated - Always use the offset value returned in responses
// Start from beginning (catch-up mode)
const result = await stream.read({ offset: "-1", live: false })

// Resume from last position (always use returned offset)
const next = await stream.read({ offset: result.offset, live: false })

The only special offset value is "-1" for stream start. All other offsets are opaque strings returned by the server—never construct or parse them yourself.

Protocol

Durable Streams is built on a simple HTTP-based protocol. See PROTOCOL.md for the complete specification.

Core operations:

Method Endpoint Description
PUT /stream/{path} Create a new stream
POST /stream/{path} Append bytes to a stream
GET /stream/{path}?offset=X Read from a stream (catch-up)
GET /stream/{path}?offset=X&live=long-poll Live tail (long-poll)
GET /stream/{path}?offset=X&live=sse Live tail (Server-Sent Events)
DELETE /stream/{path} Delete a stream
HEAD /stream/{path} Get stream metadata

Key features:

  • Exactly-once delivery guarantee with offset-based resumption
  • Opaque, lexicographically sortable offsets for resumption
  • Optional sequence numbers for writer coordination
  • TTL and expiry time support
  • Content-type preservation
  • CDN-friendly caching and request collapsing

CDN Caching

Historical reads (catch-up from known offsets) are fully cacheable at CDNs and in browsers:

# Request
GET /v1/stream/my-stream?offset=abc123

# Response
HTTP/1.1 200 OK
Cache-Control: public, max-age=60, stale-while-revalidate=300
ETag: "stream-id:abc123:xyz789"
Stream-Next-Offset: xyz789
Content-Type: application/json

[response body]

How it works:

  • Offset-based URLs - Same offset = same data, perfect for caching
  • Cache-Control - Historical data cached for 60s, stale content served during revalidation
  • ETag - Efficient revalidation for unchanged data
  • Request collapsing - Multiple clients requesting same offset collapsed to single upstream request

Performance

Durable Streams is built for production scale:

  • Low latency - Sub-15ms end-to-end delivery in production deployments
  • High concurrency - Tested with millions of concurrent clients subscribed to a single stream without degradation
  • Minimal overhead - The protocol itself adds minimal overhead; throughput scales with your infrastructure
  • Horizontal scaling - Offset-based design enables aggressive caching at CDN edges, so read-heavy workloads (common in sync and AI scenarios) scale horizontally without overwhelming origin servers

Relationship to Backend Streaming Systems

Backend streaming systems like Kafka, RabbitMQ, and Kinesis excel at server-to-server messaging and backend event processing. Durable Streams complements these systems by solving a different problem: reliably streaming data to client applications.

flowchart LR
    subgraph Backend["Backend Systems"]
        Kafka["Kafka"]
        DB["Database"]
        Queue["Message Queue"]
    end

    subgraph App["Application Server"]
        Auth["Authorization"]
        Transform["Data Shaping"]
        Filter["Filtering"]
    end

    subgraph DS["Durable Streams"]
        Protocol["HTTP Protocol"]
        CDN["CDN Caching"]
    end

    subgraph Clients["Client Applications"]
        Web["Web"]
        Mobile["Mobile"]
        Native["Native"]
    end

    Kafka --> Auth
    DB --> Auth
    Queue --> Auth
    Auth --> Transform
    Transform --> Filter
    Filter --> Protocol
    Protocol --> CDN
    CDN --> Web
    CDN --> Mobile
    CDN --> Native
Loading

The challenges of streaming to clients are distinct from server-to-server streaming:

  • Client diversity - Supporting web browsers, mobile apps, native clients, each with different capabilities and constraints
  • Network unreliability - Clients disconnect constantly (backgrounded tabs, network switches, page refreshes)
  • Resumability requirements - Clients need to pick up exactly where they left off without data loss
  • Economics - Per-connection costs make dedicated connections to millions of clients prohibitive
  • Protocol compatibility - Kafka/AMQP protocols don't run in browsers or on most mobile platforms
  • Data shaping and authorization - Backend streams typically contain raw, unfiltered events; client streams need per-user filtering, transformation, and authorization applied

Complementary architecture:

Your application server consumes from backend streaming systems, applies authorization logic, shapes data for specific clients, and fans out via Durable Streams. This separation allows:

  • Backend systems to optimize for throughput, partitioning, and server-to-server reliability
  • Application servers to enforce authorization boundaries and transform data
  • Durable Streams to optimize for HTTP compatibility, CDN leverage, and client resumability
  • Each layer to use protocols suited to its environment

Why Not SSE or WebSockets?

Server-Sent Events (SSE) and WebSockets provide real-time communication, but both share fundamental limitations for durable streaming:

Feature SSE WebSockets Durable Streams
Durability No No Yes
Resumption Manual Manual Built-in
Catch-up Manual Manual Built-in
Caching Poor None CDN-friendly
Binary No Yes Yes
Stateless No No Yes

Durable Streams addresses all of these:

  • Durable storage - Data persists across server restarts and client disconnections
  • Standardized resumption - Opaque, lexicographically sortable offsets with defined semantics
  • Unified catch-up and live protocol - Same offset-based API for historical and real-time data
  • Simple protocol - Standard HTTP methods and headers, no custom framing required
  • Caching-friendly - Offset-based requests enable efficient CDN and browser caching
  • Stateless servers - No connection state to manage; clients track their own offsets
  • Binary support - Content-type agnostic byte streams
  • Conformance tests - Standardized test suite ensures consistent behavior

Durable Streams can use SSE as a transport mechanism (via live=sse mode) while providing the missing durability layer on top.

Building Your Own Implementation

The protocol is designed to support implementations in any language or platform. A conforming server implementation requires:

  1. HTTP API - Implement the protocol operations (PUT, POST, GET, DELETE, HEAD) as defined in PROTOCOL.md
  2. Durable storage - Persist stream data with offset tracking (in-memory, file-based, database, object storage, etc.)
  3. Offset management - Generate opaque, lexicographically sortable offset tokens

Client implementations need only support standard HTTP requests and offset tracking.

We encourage implementations in other languages and environments (Go, Rust, Python, Java, C#, Swift, Kotlin, etc.). Use the conformance test suite to verify protocol compliance:

import { runConformanceTests } from "@durable-streams/conformance-tests"

runConformanceTests({
  baseUrl: "http://localhost:8787",
})

Node.js Reference Server

npm install @durable-streams/server
import { createDurableStreamServer } from "@durable-streams/server"

const server = createDurableStreamServer({
  port: 8787,
  // In-memory storage (for development)
  // Add file-backed storage for production
})

await server.start()

See @durable-streams/server for more details.

CLI Tool

npm install -g @durable-streams/cli

# Set the server URL (defaults to http://localhost:8787)
export STREAM_URL=https://your-server.com
# Create a stream
durable-stream create my-stream

# Write to a stream
echo "hello world" | durable-stream write my-stream

# Read from a stream
durable-stream read my-stream

# Delete a stream
durable-stream delete my-stream

Use Cases

Database Sync

Stream database changes to web and mobile clients for real-time synchronization:

// Server: stream database changes
for (const change of db.changes()) {
  await stream.append(JSON.stringify(change))
}

// Client: receive and apply changes (works in browsers, React Native, native apps)
const result = await stream.read({ offset: lastSeenOffset })
const changes = parseChanges(result.data)
changes.forEach(applyChange)

Event Sourcing

Build event-sourced systems with durable event logs:

// Append events
await stream.append(JSON.stringify({ type: "OrderCreated", orderId: "123" }))
await stream.append(JSON.stringify({ type: "OrderPaid", orderId: "123" }))

// Replay from beginning (catch-up mode for full replay)
const result = await stream.read({ offset: "-1", live: false })
const events = parseEvents(result.data)
const state = events.reduce(applyEvent, initialState)

AI Conversation Streaming

Stream LLM responses with full conversation history accessible across devices:

// Stream AI response chunks
for await (const token of llm.stream(prompt)) {
  await stream.append(token)
}

// Client can resume from any point (switch devices, refresh page, reconnect)
const result = await stream.read({ offset: lastSeenOffset })
renderTokens(new TextDecoder().decode(result.data))

Testing Your Implementation

Use the conformance test suite to verify your server implements the protocol correctly:

import { runConformanceTests } from "@durable-streams/conformance-tests"

runConformanceTests({
  baseUrl: "http://localhost:8787",
})

Benchmarking

Measure your server's performance:

import { runBenchmarks } from "@durable-streams/benchmarks"

runBenchmarks({
  baseUrl: "http://localhost:8787",
  environment: "local",
})

Contributing

We welcome contributions! This project follows the Contributor Covenant code of conduct.

Development

# Clone the repository
git clone https://github.com/durable-streams/durable-streams.git
cd durable-streams

# Install dependencies
pnpm install

# Build all packages
pnpm build

# Run tests
pnpm test:run

# Lint and format
pnpm lint:fix
pnpm format

Changesets

We use changesets for version management:

# Add a changeset
pnpm changeset

# Version packages (done by CI)
pnpm changeset:version

# Publish (done by CI)
pnpm changeset:publish

License

Apache 2.0 - see LICENSE

Links


Status: Early development - API subject to change

About

The open protocol for real-time sync to client applications

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • TypeScript 96.5%
  • CSS 2.0%
  • JavaScript 1.4%
  • HTML 0.1%