
Commit 7272420

fix(realtime): gracefully recover from ECONNRESET errors when sending stream data from tasks to the server
Parent: 885d2d3

9 files changed: 460 additions, 38 deletions

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 9 additions & 1 deletion
@@ -16,7 +16,15 @@ export async function action({ request, params }: ActionFunctionArgs) {
     return new Response("No body provided", { status: 400 });
   }
 
-  return relayRealtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
+  const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
+  const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+
+  return relayRealtimeStreams.ingestData(
+    request.body,
+    $params.runId,
+    $params.streamId,
+    resumeFromChunkNumber
+  );
 }
 
 export const loader = createLoaderApiRoute(
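
Both ingestion routes now read an optional X-Resume-From-Chunk header and pass it through to the ingestor, so a client retrying after a dropped connection can tell the server where its chunk numbering should restart. As an illustration only (the client-side change is not among the files shown here; the URL shape, auth scheme, and helper name are assumptions), a resumed upload could look like this:

// Hypothetical client-side retry after a dropped connection (illustrative only).
// `nextChunk` is the index of the first chunk the server has not confirmed yet.
async function resumeStreamUpload(
  apiUrl: string,
  apiKey: string,
  runId: string,
  streamId: string,
  nextChunk: number,
  remainingChunks: ReadableStream<Uint8Array>
): Promise<Response> {
  const init = {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiKey}`,
      "X-Resume-From-Chunk": String(nextChunk),
    },
    body: remainingChunks,
    // Node's fetch (undici) requires half-duplex mode for streaming request bodies.
    duplex: "half",
  } as RequestInit;

  return fetch(`${apiUrl}/realtime/v1/streams/${runId}/${streamId}`, init);
}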

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts

Lines changed: 78 additions & 7 deletions
@@ -1,7 +1,10 @@
 import { z } from "zod";
 import { $replica } from "~/db.server";
 import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
-import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
+import {
+  createActionApiRoute,
+  createLoaderApiRoute,
+} from "~/services/routeBuilders/apiBuilder.server";
 
 const ParamsSchema = z.object({
   runId: z.string(),
@@ -14,10 +17,6 @@ const { action } = createActionApiRoute(
     params: ParamsSchema,
   },
   async ({ request, params, authentication }) => {
-    if (!request.body) {
-      return new Response("No body provided", { status: 400 });
-    }
-
     const run = await $replica.taskRun.findFirst({
       where: {
         friendlyId: params.runId,
@@ -54,8 +53,80 @@ const { action } = createActionApiRoute(
       return new Response("Target not found", { status: 404 });
     }
 
-    return relayRealtimeStreams.ingestData(request.body, targetId, params.streamId);
+    if (!request.body) {
+      return new Response("No body provided", { status: 400 });
+    }
+
+    const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
+    const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+
+    return relayRealtimeStreams.ingestData(
+      request.body,
+      targetId,
+      params.streamId,
+      resumeFromChunkNumber
+    );
+  }
+);
+
+const loader = createLoaderApiRoute(
+  {
+    params: ParamsSchema,
+    allowJWT: false,
+    corsStrategy: "none",
+    findResource: async (params, authentication) => {
+      return $replica.taskRun.findFirst({
+        where: {
+          friendlyId: params.runId,
+          runtimeEnvironmentId: authentication.environment.id,
+        },
+        select: {
+          id: true,
+          friendlyId: true,
+          parentTaskRun: {
+            select: {
+              friendlyId: true,
+            },
+          },
+          rootTaskRun: {
+            select: {
+              friendlyId: true,
+            },
+          },
+        },
+      });
+    },
+  },
+  async ({ request, params, resource: run }) => {
+    if (!run) {
+      return new Response("Run not found", { status: 404 });
+    }
+
+    const targetId =
+      params.target === "self"
+        ? run.friendlyId
+        : params.target === "parent"
+        ? run.parentTaskRun?.friendlyId
+        : run.rootTaskRun?.friendlyId;
+
+    if (!targetId) {
+      return new Response("Target not found", { status: 404 });
+    }
+
+    // Handle HEAD request to get last chunk index
+    if (request.method !== "HEAD") {
+      return new Response("Only HEAD requests are allowed for this endpoint", { status: 405 });
+    }
+
+    const lastChunkIndex = await relayRealtimeStreams.getLastChunkIndex(targetId, params.streamId);
+
+    return new Response(null, {
+      status: 200,
+      headers: {
+        "X-Last-Chunk-Index": lastChunkIndex.toString(),
+      },
+    });
   }
 );
 
-export { action };
+export { action, loader };
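
The new loader turns the same route into a HEAD endpoint that reports the index of the last chunk the server has persisted, so a task that hits an ECONNRESET mid-upload can ask where it left off before retrying. A rough sketch of that probe, assuming the route is exposed at /realtime/v1/streams/:runId/:target/:streamId with bearer auth and that target takes the values handled above (the helper name is made up for illustration):

// Hypothetical client-side probe (illustrative only).
async function getLastPersistedChunk(
  apiUrl: string,
  apiKey: string,
  runId: string,
  target: "self" | "parent" | "root",
  streamId: string
): Promise<number> {
  const response = await fetch(
    `${apiUrl}/realtime/v1/streams/${runId}/${target}/${streamId}`,
    {
      method: "HEAD",
      headers: { Authorization: `Bearer ${apiKey}` },
    }
  );

  // The server returns -1 when it has not persisted any chunks for this stream,
  // in which case the client should resend from chunk 0.
  const header = response.headers.get("X-Last-Chunk-Index");
  return header ? parseInt(header, 10) : -1;
}

// The next chunk to send is one past the last one the server confirmed:
// const resumeFrom = (await getLastPersistedChunk(...)) + 1;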

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 85 additions & 10 deletions
@@ -55,18 +55,36 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
           lastId = id;
 
           if (fields && fields.length >= 2) {
-            if (fields[1] === END_SENTINEL && i === entries.length - 1) {
-              controller.close();
-              return;
+            // Extract the data field from the Redis entry
+            // Fields format: ["field1", "value1", "field2", "value2", ...]
+            let data: string | null = null;
+
+            for (let j = 0; j < fields.length; j += 2) {
+              if (fields[j] === "data") {
+                data = fields[j + 1];
+                break;
+              }
             }
 
-            if (fields[1] !== END_SENTINEL) {
-              controller.enqueue(fields[1]);
+            // Handle legacy entries that don't have field names (just data at index 1)
+            if (data === null && fields.length >= 2) {
+              data = fields[1];
             }
 
-            if (signal.aborted) {
-              controller.close();
-              return;
+            if (data) {
+              if (data === END_SENTINEL && i === entries.length - 1) {
+                controller.close();
+                return;
+              }
+
+              if (data !== END_SENTINEL) {
+                controller.enqueue(data);
+              }
+
+              if (signal.aborted) {
+                controller.close();
+                return;
+              }
             }
           }
         }
@@ -127,10 +145,14 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
   async ingestData(
     stream: ReadableStream<Uint8Array>,
     runId: string,
-    streamId: string
+    streamId: string,
+    resumeFromChunk?: number
  ): Promise<Response> {
     const redis = new Redis(this.options.redis ?? {});
     const streamKey = `stream:${runId}:${streamId}`;
+    const startChunk = resumeFromChunk ?? 0;
+    // Start counting from the resume point, not from 0
+    let currentChunkIndex = startChunk;
 
     async function cleanup() {
       try {
@@ -151,9 +173,12 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
           break;
         }
 
-        logger.debug("[RedisRealtimeStreams][ingestData] Reading data", {
+        // Write each chunk with its index
+        logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
           streamKey,
           runId,
+          chunkIndex: currentChunkIndex,
+          resumeFromChunk: startChunk,
           value,
         });
 
@@ -163,9 +188,13 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
          "~",
          String(env.REALTIME_STREAM_MAX_LENGTH),
          "*",
+          "chunkIndex",
+          currentChunkIndex.toString(),
          "data",
          value
        );
+
+        currentChunkIndex++;
       }
 
       // Send the END_SENTINEL and set TTL with a pipeline.
@@ -200,4 +229,50 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
       await cleanup();
     }
   }
+
+  async getLastChunkIndex(runId: string, streamId: string): Promise<number> {
+    const redis = new Redis(this.options.redis ?? {});
+    const streamKey = `stream:${runId}:${streamId}`;
+
+    try {
+      // Get the last entry from the stream using XREVRANGE
+      const entries = await redis.xrevrange(streamKey, "+", "-", "COUNT", 1);
+
+      if (!entries || entries.length === 0) {
+        // No entries in stream, return -1 to indicate no chunks received
+        return -1;
+      }
+
+      const [_id, fields] = entries[0];
+
+      // Find the chunkIndex field
+      for (let i = 0; i < fields.length; i += 2) {
+        if (fields[i] === "chunkIndex") {
+          const chunkIndex = parseInt(fields[i + 1], 10);
+          logger.debug("[RedisRealtimeStreams][getLastChunkIndex] Found last chunk", {
+            streamKey,
+            chunkIndex,
+          });
+          return chunkIndex;
+        }
+      }
+
+      // If no chunkIndex field found (legacy entries), return -1
+      logger.warn("[RedisRealtimeStreams][getLastChunkIndex] No chunkIndex found in entry", {
+        streamKey,
+      });
+      return -1;
+    } catch (error) {
+      logger.error("[RedisRealtimeStreams][getLastChunkIndex] Error getting last chunk:", {
+        error,
+        streamKey,
+      });
+      // Return -1 to indicate we don't know what the server has
+      return -1;
+    } finally {
+      await redis.quit().catch((err) => {
+        logger.error("[RedisRealtimeStreams][getLastChunkIndex] Error in cleanup:", { err });
+      });
+    }
+  }
 }
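
With this change every XADD entry carries an explicit chunkIndex field alongside data, which is what makes getLastChunkIndex possible. A small standalone sketch of the same write-then-read-back pattern against ioredis (local Redis, made-up key and values; it mirrors rather than reuses the class above, and omits the MAXLEN trimming and END_SENTINEL handling):

import Redis from "ioredis";

async function demoLastChunkIndex() {
  const redis = new Redis(); // assumes a Redis instance on localhost:6379
  const streamKey = "stream:run_demo:demo-stream";

  // Write three chunks the way ingestData now does: each entry stores its
  // chunkIndex next to its data payload.
  for (let chunkIndex = 0; chunkIndex < 3; chunkIndex++) {
    await redis.xadd(streamKey, "*", "chunkIndex", chunkIndex.toString(), "data", `chunk-${chunkIndex}`);
  }

  // Read back only the newest entry and pull chunkIndex out of the flat
  // [field, value, field, value, ...] array that ioredis returns.
  const entries = await redis.xrevrange(streamKey, "+", "-", "COUNT", 1);
  let lastChunkIndex = -1; // -1 = nothing persisted (or a legacy entry without chunkIndex)

  if (entries.length > 0) {
    const [, fields] = entries[0];
    for (let i = 0; i < fields.length; i += 2) {
      if (fields[i] === "chunkIndex") {
        lastChunkIndex = parseInt(fields[i + 1], 10);
      }
    }
  }

  console.log(lastChunkIndex); // 2

  await redis.quit();
}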

apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts

Lines changed: 18 additions & 3 deletions
@@ -134,19 +134,29 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
   async ingestData(
     stream: ReadableStream<Uint8Array>,
     runId: string,
-    streamId: string
+    streamId: string,
+    resumeFromChunk?: number
   ): Promise<Response> {
     const [localStream, fallbackStream] = stream.tee();
 
-    logger.debug("[RelayRealtimeStreams][ingestData] Ingesting data", { runId, streamId });
+    logger.debug("[RelayRealtimeStreams][ingestData] Ingesting data", {
+      runId,
+      streamId,
+      resumeFromChunk,
+    });
 
     // Handle local buffering asynchronously and catch errors
     this.handleLocalIngestion(localStream, runId, streamId).catch((err) => {
       logger.error("[RelayRealtimeStreams][ingestData] Error in local ingestion:", { err });
     });
 
     // Forward to the fallback ingestor asynchronously and catch errors
-    return this.options.fallbackIngestor.ingestData(fallbackStream, runId, streamId);
+    return this.options.fallbackIngestor.ingestData(
+      fallbackStream,
+      runId,
+      streamId,
+      resumeFromChunk
+    );
   }
 
   /**
@@ -237,6 +247,11 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
     });
   }
 
+  async getLastChunkIndex(runId: string, streamId: string): Promise<number> {
+    // Relay doesn't store chunks, forward to fallback
+    return this.options.fallbackIngestor.getLastChunkIndex(runId, streamId);
+  }
+
   // Don't forget to clear interval on shutdown if needed
   close() {
     clearInterval(this.cleanupInterval);
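
Because the relay layer only handles local buffering and does not store chunks itself, chunk accounting stays with the Redis-backed fallback ingestor: both the new resumeFromChunk argument and getLastChunkIndex are straight pass-throughs here.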

apps/webapp/app/services/realtime/types.ts

Lines changed: 4 additions & 1 deletion
@@ -5,8 +5,11 @@ export interface StreamIngestor {
   ingestData(
     stream: ReadableStream<Uint8Array>,
     runId: string,
-    streamId: string
+    streamId: string,
+    resumeFromChunk?: number
   ): Promise<Response>;
+
+  getLastChunkIndex(runId: string, streamId: string): Promise<number>;
 }
 
 // Interface for stream response

docker/config/toxiproxy.json

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+[
+  {
+    "name": "trigger_webapp_local",
+    "listen": "[::]:30303",
+    "upstream": "host.docker.internal:3030",
+    "enabled": true
+  }
+]
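
This proxy listens on port 30303 inside the compose network and forwards to the webapp running on the host at port 3030, so connection faults can be injected between a task and the server during local testing.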

docker/docker-compose.yml

Lines changed: 13 additions & 0 deletions
@@ -141,6 +141,19 @@ services:
     networks:
       - app_network
 
+  toxiproxy:
+    container_name: toxiproxy
+    image: ghcr.io/shopify/toxiproxy:latest
+    restart: always
+    volumes:
+      - ./config/toxiproxy.json:/config/toxiproxy.json
+    ports:
+      - "30303:30303" # Proxied webapp port
+      - "8474:8474" # Toxiproxy API port
+    networks:
+      - app_network
+    command: ["-host", "0.0.0.0", "-config", "/config/toxiproxy.json"]
+
   # otel-collector:
   #   container_name: otel-collector
   #   image: otel/opentelemetry-collector-contrib:latest
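
With the proxy in place, a task client pointed at http://localhost:30303 instead of the webapp's port 3030 has its traffic routed through Toxiproxy, where faults can be toggled via the admin API on port 8474, matching the ECONNRESET scenario this commit recovers from. A rough sketch of injecting and clearing a connection-reset fault from a test script (the toxic name is arbitrary, and the reset_peer toxic type plus its attributes are an assumption about the Toxiproxy version in use):

// Inject a connection-reset fault on the proxied webapp, run the test, then remove it.
const TOXIPROXY_API = "http://localhost:8474";
const PROXY_NAME = "trigger_webapp_local";

async function addResetToxic(): Promise<void> {
  await fetch(`${TOXIPROXY_API}/proxies/${PROXY_NAME}/toxics`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      name: "reset_webapp", // arbitrary label for this toxic
      type: "reset_peer",   // tears the connection down with a TCP RST, surfacing as ECONNRESET
      stream: "downstream", // direction is illustrative; a RST kills the whole connection either way
      toxicity: 1.0,
      attributes: { timeout: 0 }, // reset immediately rather than after a delay
    }),
  });
}

async function removeResetToxic(): Promise<void> {
  await fetch(`${TOXIPROXY_API}/proxies/${PROXY_NAME}/toxics/reset_webapp`, {
    method: "DELETE",
  });
}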
