Skip to content

Commit d03eadc

Browse files
authored
Wait for the race condition of parsing before closing the readable stream controller (#435)
1 parent 51d5265 commit d03eadc

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

packages/api/ai/plan-parser.mts

+10-5
Original file line numberDiff line numberDiff line change
@@ -169,25 +169,28 @@ export function getPackagesToInstall(plan: Plan): string[] {
169169
)
170170
.flatMap((action) => action.packages);
171171
}
172-
173172
export async function streamParsePlan(
174173
stream: AsyncIterable<string>,
175174
app: DBAppType,
176175
_query: string,
177176
planId: string,
178177
) {
179178
let parser: StreamingXMLParser;
179+
const parsePromises: Promise<void>[] = [];
180180

181181
return new ReadableStream({
182182
async pull(controller) {
183183
if (parser === undefined) {
184184
parser = new StreamingXMLParser({
185185
async onTag(tag) {
186186
if (tag.name === 'planDescription' || tag.name === 'action') {
187-
const chunk = await toStreamingChunk(app, tag, planId);
188-
if (chunk) {
189-
controller.enqueue(JSON.stringify(chunk) + '\n');
190-
}
187+
const promise = (async () => {
188+
const chunk = await toStreamingChunk(app, tag, planId);
189+
if (chunk) {
190+
controller.enqueue(JSON.stringify(chunk) + '\n');
191+
}
192+
})();
193+
parsePromises.push(promise);
191194
}
192195
},
193196
});
@@ -197,6 +200,8 @@ export async function streamParsePlan(
197200
for await (const chunk of stream) {
198201
parser.parse(chunk);
199202
}
203+
// Wait for all pending parse operations to complete before closing
204+
await Promise.all(parsePromises);
200205
controller.close();
201206
} catch (error) {
202207
console.error(error);

0 commit comments

Comments
 (0)