Skip to content

Commit 3258134

Browse files
committed
Make the stream client more resilient and robust, including implementing resumable streams. We also will now send invisible "ping" packets to keep connected clients alive when there are no real data packets to send, which will be especially helpful to older clients
1 parent 849550e commit 3258134

27 files changed

+1429
-324
lines changed

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
4-
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
54
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
65
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
76

@@ -18,7 +17,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
1817

1918
// Handle HEAD request to get last chunk index for this client
2019
if (request.method === "HEAD") {
21-
const lastChunkIndex = await relayRealtimeStreams.getLastChunkIndex(
20+
const lastChunkIndex = await v1RealtimeStreams.getLastChunkIndex(
2221
$params.runId,
2322
$params.streamId,
2423
clientId
@@ -39,7 +38,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
3938
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
4039
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
4140

42-
return relayRealtimeStreams.ingestData(
41+
return v1RealtimeStreams.ingestData(
4342
request.body,
4443
$params.runId,
4544
$params.streamId,
@@ -80,11 +79,15 @@ export const loader = createLoaderApiRoute(
8079
},
8180
},
8281
async ({ params, request, resource: run, authentication }) => {
82+
// Get Last-Event-ID header for resuming from a specific position
83+
const lastEventId = request.headers.get("Last-Event-ID") || undefined;
84+
8385
return v1RealtimeStreams.streamResponse(
8486
request,
8587
run.friendlyId,
8688
params.streamId,
87-
request.signal
89+
request.signal,
90+
lastEventId
8891
);
8992
}
9093
);

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ export type RealtimeStreamsOptions = {
1414
// Legacy constant for backward compatibility (no longer written, but still recognized when reading)
1515
const END_SENTINEL = "<<CLOSE_STREAM>>";
1616

17+
// Internal types for stream pipeline
18+
type StreamChunk =
19+
| { type: "ping" }
20+
| { type: "data"; redisId: string; data: string }
21+
| { type: "legacy-data"; redisId: string; data: string };
22+
1723
// Class implementing both interfaces
1824
export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
1925
private logger: Logger;
@@ -28,22 +34,40 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
2834
request: Request,
2935
runId: string,
3036
streamId: string,
31-
signal: AbortSignal
37+
signal: AbortSignal,
38+
lastEventId?: string
3239
): Promise<Response> {
3340
const redis = new Redis(this.options.redis ?? {});
3441
const streamKey = `stream:${runId}:${streamId}`;
3542
let isCleanedUp = false;
3643

37-
const stream = new ReadableStream({
44+
const stream = new ReadableStream<StreamChunk>({
3845
start: async (controller) => {
39-
let lastId = "0";
46+
// Start from lastEventId if provided, otherwise from beginning
47+
let lastId = lastEventId || "0";
4048
let retryCount = 0;
4149
const maxRetries = 3;
4250
let lastDataTime = Date.now();
51+
let lastEnqueueTime = Date.now();
4352
const blockTimeMs = 5000;
53+
const pingIntervalMs = 10000; // 10 seconds
54+
55+
if (lastEventId) {
56+
this.logger.debug("[RealtimeStreams][streamResponse] Resuming from lastEventId", {
57+
streamKey,
58+
lastEventId,
59+
});
60+
}
4461

4562
try {
4663
while (!signal.aborted) {
64+
// Check if we need to send a ping
65+
const timeSinceLastEnqueue = Date.now() - lastEnqueueTime;
66+
if (timeSinceLastEnqueue >= pingIntervalMs) {
67+
controller.enqueue({ type: "ping" });
68+
lastEnqueueTime = Date.now();
69+
}
70+
4771
try {
4872
const messages = await redis.xread(
4973
"COUNT",
@@ -88,9 +112,16 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
88112
continue;
89113
}
90114

91-
controller.enqueue(data);
115+
// Enqueue structured chunk with Redis stream ID
116+
controller.enqueue({
117+
type: "data",
118+
redisId: id,
119+
data,
120+
});
121+
92122
foundData = true;
93123
lastDataTime = Date.now();
124+
lastEnqueueTime = Date.now();
94125

95126
if (signal.aborted) {
96127
controller.close();
@@ -161,12 +192,31 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
161192
await cleanup();
162193
},
163194
})
164-
.pipeThrough(new LineTransformStream())
165195
.pipeThrough(
166-
new TransformStream({
196+
// Transform 1: Split data content by newlines, preserving metadata
197+
new TransformStream<StreamChunk, StreamChunk & { line?: string }>({
198+
transform(chunk, controller) {
199+
if (chunk.type === "ping") {
200+
controller.enqueue(chunk);
201+
} else if (chunk.type === "data" || chunk.type === "legacy-data") {
202+
// Split data by newlines, emit separate chunks with same metadata
203+
const lines = chunk.data.split("\n").filter((line) => line.trim().length > 0);
204+
for (const line of lines) {
205+
controller.enqueue({ ...chunk, line });
206+
}
207+
}
208+
},
209+
})
210+
)
211+
.pipeThrough(
212+
// Transform 2: Format as SSE
213+
new TransformStream<StreamChunk & { line?: string }, string>({
167214
transform(chunk, controller) {
168-
for (const line of chunk) {
169-
controller.enqueue(`data: ${line}\n\n`);
215+
if (chunk.type === "ping") {
216+
controller.enqueue(`: ping\n\n`);
217+
} else if ((chunk.type === "data" || chunk.type === "legacy-data") && chunk.line) {
218+
// Use Redis stream ID as SSE event ID
219+
controller.enqueue(`id: ${chunk.redisId}\ndata: ${chunk.line}\n\n`);
170220
}
171221
},
172222
})

0 commit comments

Comments
 (0)