# Mike/ait token streaming OpenAI sdk #3074
Merged: mschristensen merged 9 commits into AIT-token-streaming-OpenAI-SDK from mike/AIT-token-streaming-OpenAI-SDK on Jan 7, 2026.

Commits (9):
- `85109fc` fixup! Fixup based on style review comments (mschristensen)
- `2fd0330` fixup! Fixup based on style review comments (mschristensen)
- `b7116ee` fixup! Fixup based on style review comments (mschristensen)
- `aac77ad` ait/guides: refactor openai message per token (mschristensen)
- `cc361c8` ait/guides: rename openai sdk guide (mschristensen)
- `1c06919` fixup! ait/guides: rename openai sdk guide (mschristensen)
- `53b0736` fixup! ait/guides: rename openai sdk guide (mschristensen)
- `878d248` fixup! ait/guides: rename openai sdk guide (mschristensen)
- `ba71b9a` fixup! ait/guides: rename openai sdk guide (mschristensen)
`src/pages/docs/guides/ai-transport/openai-message-per-token.mdx` (381 additions, 0 deletions)
---
title: "Guide: Stream OpenAI responses using the message-per-token pattern"
meta_description: "Stream tokens from the OpenAI Responses API over Ably in realtime."
meta_keywords: "AI, token streaming, OpenAI, Responses API, AI transport, Ably, realtime"
---
This guide shows you how to stream AI responses from OpenAI's [Responses API](https://platform.openai.com/docs/api-reference/responses) over Ably using the [message-per-token pattern](/docs/ai-transport/features/token-streaming/message-per-token). Specifically, it implements the [explicit start/stop events approach](/docs/ai-transport/features/token-streaming/message-per-token#explicit-events), which publishes each response token as an individual message, along with explicit lifecycle events that signal when responses begin and end.

Using Ably to distribute tokens from the OpenAI SDK lets you broadcast AI responses to thousands of concurrent subscribers with reliable delivery and ordering guarantees, so every client receives the complete response stream in order. This approach decouples your AI inference from client connections, enabling you to scale agents independently and handle reconnections gracefully.
<Aside data-type="note">
To discover other approaches to token streaming, including the [message-per-response](/docs/ai-transport/features/token-streaming/message-per-response) pattern, see the [token streaming](/docs/ai-transport/features/token-streaming) documentation.
</Aside>

## Prerequisites <a id="prerequisites"/>

To follow this guide, you need:
- Node.js 20 or higher
- An OpenAI API key
- An Ably API key

Useful links:
- [OpenAI developer quickstart](https://platform.openai.com/docs/quickstart)
- [Ably JavaScript SDK getting started](/docs/getting-started/javascript)

Create a new NPM package, which will contain the publisher and subscriber code:

<Code>
```shell
mkdir ably-openai-example && cd ably-openai-example
npm init -y
```
</Code>

Install the required packages using NPM:

<Code>
```shell
npm install openai@^4 ably@^2
```
</Code>
<Aside data-type="note">
This guide uses version 4.x of the OpenAI SDK. Some details of interacting with the SDK may differ if you use a different major version.
</Aside>
Export your OpenAI API key as an environment variable; the OpenAI SDK will read it later in the guide:

<Code>
```shell
export OPENAI_API_KEY="your_api_key_here"
```
</Code>
## Step 1: Get a streamed response from OpenAI <a id="step-1"/>

Initialize an OpenAI client and use the [Responses API](https://platform.openai.com/docs/api-reference/responses) to stream model output as a series of events.

Create a new file `publisher.mjs` with the following contents:

<Code>
```javascript
import OpenAI from 'openai';

// Initialize OpenAI client
const openai = new OpenAI();

// Process each streaming event
function processEvent(event) {
  console.log(JSON.stringify(event));
  // This function is updated in the next sections
}

// Create streaming response from OpenAI
async function streamOpenAIResponse(prompt) {
  const stream = await openai.responses.create({
    model: "gpt-5",
    input: prompt,
    stream: true,
  });

  // Iterate through streaming events
  for await (const event of stream) {
    processEvent(event);
  }
}

// Usage example
streamOpenAIResponse("Tell me a short joke");
```
</Code>
### Understand OpenAI streaming events <a id="understand-streaming-events"/>

OpenAI's Responses API [streams](https://platform.openai.com/docs/guides/streaming-responses) model output as a series of events when you set `stream: true`. Each streamed event includes a `type` property which describes the [event type](https://platform.openai.com/docs/api-reference/responses-streaming). A complete text response can be constructed from the following event types:

- [`response.created`](https://platform.openai.com/docs/api-reference/responses-streaming/response/created): Signals the start of a response. Contains `response.id` to correlate subsequent events.

- [`response.output_item.added`](https://platform.openai.com/docs/api-reference/responses-streaming/response/output_item/added): Indicates a new output item. If `item.type === "message"`, the item contains model response text; other types may be specified, such as `"reasoning"` for internal reasoning tokens. The `item.id` is included in all subsequent events relating to this item, enabling you to selectively stream tokens for specific items. The `output_index` indicates the position of this item in the response's output array.

- [`response.content_part.added`](https://platform.openai.com/docs/api-reference/responses-streaming/response/content_part/added): Indicates a new content part within an output item. Use `item_id`, `output_index`, and `content_index` to correlate tokens.

- [`response.output_text.delta`](https://platform.openai.com/docs/api-reference/responses-streaming/response/output_text/delta): Contains a single token in the `delta` field. The `item_id` identifies which output item the token belongs to. The `output_index` and `content_index` identify the specific content part.

- [`response.content_part.done`](https://platform.openai.com/docs/api-reference/responses-streaming/response/content_part/done): Signals completion of a content part. Contains the complete `part` object with full text, along with `item_id`, `output_index`, and `content_index`.

- [`response.output_item.done`](https://platform.openai.com/docs/api-reference/responses-streaming/response/output_item/done): Signals completion of an output item. Contains the complete `item` object and `output_index`.

- [`response.completed`](https://platform.openai.com/docs/api-reference/responses-streaming/response/completed): Signals the end of the response.
The following example shows the event sequence received when streaming a response:

<Code>
```json
// 1. Response starts
{"type":"response.created","response":{"id":"resp_abc123","status":"in_progress"}}

// 2. First output item (reasoning) is added
{"type":"response.output_item.added","output_index":0,"item":{"id":"rs_456","type":"reasoning"}}
{"type":"response.output_item.done","output_index":0,"item":{"id":"rs_456","type":"reasoning"}}

// 3. Second output item (message) is added
{"type":"response.output_item.added","output_index":1,"item":{"id":"msg_789","type":"message"}}
{"type":"response.content_part.added","item_id":"msg_789","output_index":1,"content_index":0}

// 4. Text tokens stream in as delta events
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":"Why"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" don"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":"'t"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" scientists"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" trust"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" atoms"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":"?"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" Because"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" they"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" make"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" up"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" everything"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":"."}

// 5. Content part and output item complete
{"type":"response.content_part.done","item_id":"msg_789","output_index":1,"content_index":0,"part":{"type":"output_text","text":"Why don't scientists trust atoms? Because they make up everything."}}
{"type":"response.output_item.done","output_index":1,"item":{"id":"msg_789","type":"message","status":"completed","content":[{"type":"output_text","text":"Why don't scientists trust atoms? Because they make up everything."}]}}

// 6. Response completes
{"type":"response.completed","response":{"id":"resp_abc123","status":"completed"}}
```
</Code>

<Aside data-type="note">
This is only an illustrative example for a simple "text in, text out" use case and may not reflect the exact sequence of events that you observe from the OpenAI API. It also does not describe response generation errors or refusals. For complete details on all event types and their properties, see [OpenAI Streaming events](https://platform.openai.com/docs/api-reference/responses-streaming/response).
</Aside>
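To check your understanding of the event flow, the following minimal sketch (not part of this guide's files) reconstructs the full response text locally by accumulating `delta` values; the next step replaces this local accumulation with publishes to Ably:

<Code>
```javascript
// Minimal local sketch: accumulate output_text deltas into the full response
let text = '';

function processEvent(event) {
  if (event.type === 'response.output_text.delta') {
    text += event.delta; // append this token
  } else if (event.type === 'response.completed') {
    console.log('\nFull response:', text);
  }
}
```
</Code>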
## Step 2: Publish streaming events to Ably <a id="step-2"/>

Publish OpenAI streaming events to Ably to distribute them to subscribers reliably and at scale.

This implementation follows the [explicit start/stop events pattern](/docs/ai-transport/features/token-streaming/message-per-token#explicit-events), which provides clear response boundaries.

### Initialize the Ably client <a id="initialize-ably"/>

Add the Ably client initialization to your `publisher.mjs` file:
<Code>
```javascript
import Ably from 'ably';
import OpenAI from 'openai';

// Initialize OpenAI client
const openai = new OpenAI();

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({ key: '{{API_KEY}}' });

// Create a channel for publishing streamed AI responses
const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}');
```
</Code>

The Ably Realtime client maintains a persistent connection to the Ably service, which allows you to publish tokens at high message rates with low latency.
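During development it can be useful to confirm that the connection is established. The Ably client emits connection state events you can listen for:

<Code>
```javascript
// Optional: log connection state changes for diagnostics
realtime.connection.on('connected', () => {
  console.log('Connected to Ably');
});
realtime.connection.on('disconnected', () => {
  console.log('Disconnected from Ably; the SDK retries automatically');
});
```
</Code>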
### Map OpenAI streaming events to Ably messages <a id="map-events"/>

Decide how to map [OpenAI streaming events](#understand-streaming-events) to Ably messages. Any mapping strategy that suits your application's needs will work. This guide uses the following pattern as an example:

- `start`: Signals the beginning of a response
- `token`: Contains the incremental text content for each delta
- `stop`: Signals the completion of a response
<Aside data-type="note">
This implementation assumes each response contains a single `message` type output item. It filters out reasoning tokens and other non-`message` output items, and processes responses sequentially. For production use cases with concurrent responses, or with multiple output items or content parts, consider tracking state per response ID, as sketched below.
</Aside>
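One way to do that (a sketch only, not part of this guide's files) is to hold the per-response state in a closure instead of module-level variables, so a single process can stream several responses concurrently:

<Code>
```javascript
// Hypothetical sketch: per-response state scoped to each call
async function streamOpenAIResponse(prompt) {
  let responseId = null;    // captured per response, not module-level
  let messageItemId = null;

  const stream = await openai.responses.create({
    model: 'gpt-5',
    input: prompt,
    stream: true,
  });

  for await (const event of stream) {
    // Handle events exactly as in processEvent() below, but read and
    // write the responseId/messageItemId captured by this closure
  }
}
```
</Code>

For the single-response examples in this guide, module-level state keeps the code simple.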
Update the `processEvent()` function in your `publisher.mjs` file to publish events to Ably:
<Code>
```javascript
// Track state across events
let responseId = null;
let messageItemId = null;

// Process each streaming event and publish to Ably
function processEvent(event) {
  switch (event.type) {
    case 'response.created':
      // Capture response ID when response starts
      responseId = event.response.id;

      // Publish start event
      channel.publish({
        name: 'start',
        extras: {
          headers: { responseId }
        }
      });
      break;

    case 'response.output_item.added':
      // Capture message item ID when a message output item is added
      if (event.item.type === 'message') {
        messageItemId = event.item.id;
      }
      break;

    case 'response.output_text.delta':
      // Publish tokens from message output items only
      if (event.item_id === messageItemId) {
        channel.publish({
          name: 'token',
          data: event.delta,
          extras: {
            headers: { responseId }
          }
        });
      }
      break;

    case 'response.completed':
      // Publish stop event when response completes
      channel.publish({
        name: 'stop',
        extras: {
          headers: { responseId }
        }
      });
      break;
  }
}
```
</Code>
This implementation:

- Publishes a `start` event when the response begins, including the `responseId` in message `extras`
- Filters for `response.output_text.delta` events from `message` type output items
- Publishes each token with a message `name` of `token` and includes the `responseId` in message `extras`
- Publishes a `stop` event when the response completes
<Aside data-type="note">
Ably messages are published without `await` to maximize throughput. Ably maintains message ordering even without awaiting each publish. For more information, see [Publishing tokens](/docs/ai-transport/features/token-streaming/message-per-token#publishing).
</Aside>
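If you want to observe publish failures without awaiting each call, one approach (a sketch; the right error-handling strategy is application-specific) is to attach a rejection handler to the promise each publish returns:

<Code>
```javascript
// Sketch: log failed publishes without blocking the token stream
channel.publish({
  name: 'token',
  data: event.delta,
  extras: { headers: { responseId } }
}).catch((err) => {
  console.error('Failed to publish token:', err);
});
```
</Code>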
Run the publisher to stream tokens to Ably:

<Code>
```shell
node publisher.mjs
```
</Code>
## Step 3: Subscribe to streaming tokens <a id="step-3"/>

Create a subscriber that receives the streaming tokens from Ably and reconstructs the response.

Create a new file `subscriber.mjs` with the following contents:

<Code>
```javascript
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({ key: '{{API_KEY}}' });

// Get the same channel used by the publisher
const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}');

// Track responses by ID
const responses = new Map();

// Handle response start
await channel.subscribe('start', (message) => {
  const responseId = message.extras?.headers?.responseId;
  console.log('\n[Response started]', responseId);
  responses.set(responseId, '');
});

// Handle tokens
await channel.subscribe('token', (message) => {
  const responseId = message.extras?.headers?.responseId;
  const token = message.data;

  // Append token to response
  const currentText = responses.get(responseId) || '';
  responses.set(responseId, currentText + token);

  // Display token as it arrives
  process.stdout.write(token);
});

// Handle response stop
await channel.subscribe('stop', (message) => {
  const responseId = message.extras?.headers?.responseId;
  const finalText = responses.get(responseId) ?? '';
  console.log('\n[Response completed]', responseId, `(${finalText.length} chars)`);
});

console.log('Subscriber ready, waiting for tokens...');
```
</Code>
Run the subscriber in a separate terminal:

<Code>
```shell
node subscriber.mjs
```
</Code>

With the subscriber running, run the publisher in another terminal. The tokens stream in realtime as they are generated by the OpenAI model.
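You should see output along these lines in the subscriber terminal (the exact IDs and joke will vary):

<Code>
```text
Subscriber ready, waiting for tokens...

[Response started] resp_abc123
Why don't scientists trust atoms? Because they make up everything.
[Response completed] resp_abc123 (66 chars)
```
</Code>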
## Step 4: Stream with multiple publishers and subscribers <a id="step-4"/>

Ably's [channel-oriented sessions](/docs/ai-transport/features/sessions-identity#connection-oriented-vs-channel-oriented-sessions) enable multiple AI agents to publish responses, and multiple users to receive them, on a single channel simultaneously. Ably handles message delivery to all participants, eliminating the need to implement routing logic or manage state synchronization across connections.
### Broadcasting to multiple subscribers <a id="broadcasting"/>

Each subscriber receives the complete stream of tokens independently, enabling you to build collaborative experiences or multi-device applications.

Run a subscriber in multiple separate terminals:

<Code>
```shell
# Terminal 1
node subscriber.mjs

# Terminal 2
node subscriber.mjs

# Terminal 3
node subscriber.mjs
```
</Code>

All subscribers receive the same stream of tokens in realtime.
### Publishing concurrent responses <a id="multiple-publishers"/>

The implementation uses the `responseId` in message `extras` to correlate tokens with their originating response. This enables multiple publishers to stream different responses concurrently on the same channel, with each subscriber correctly tracking all responses independently.
To demonstrate this, run a publisher in multiple separate terminals:

<Code>
```shell
# Terminal 1
node publisher.mjs

# Terminal 2
node publisher.mjs

# Terminal 3
node publisher.mjs
```
</Code>

All running subscribers receive tokens from all responses concurrently. Each subscriber correctly reconstructs each response separately, using the `responseId` to correlate tokens.
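To make the interleaving easier to observe, you could vary the prompt per publisher. One small variation (hypothetical, not part of the files above) is to read the prompt from the command line in `publisher.mjs`:

<Code>
```javascript
// Hypothetical variation: take the prompt from argv, with a fallback
const prompt = process.argv[2] ?? 'Tell me a short joke';
streamOpenAIResponse(prompt);
```
</Code>

Each terminal can then run, for example, `node publisher.mjs "Tell me a fun fact"`.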
## Next steps

- Learn about [client hydration strategies](/docs/ai-transport/features/token-streaming/message-per-token#hydration) for handling late joiners and reconnections
- Understand [sessions and identity](/docs/ai-transport/features/sessions-identity) in AI-enabled applications
- Explore the [message-per-response pattern](/docs/ai-transport/features/token-streaming/message-per-response) for storing complete AI responses as single messages in history
Review comment: How do we recommend checking for error responses from the publish?

Reply: There's a separate ticket to address this: https://ably.atlassian.net/browse/AIT-238