Skip to content

Commit 91c8450

Browse files
committed
chore: Stop writing and close reader when consumer closes writer
1 parent 7f8aa18 commit 91c8450

8 files changed

+1030
-728
lines changed

example/pipeline.ttl

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
@prefix rml: <https://w3id.org/conn/rml#>.
88
@prefix ql: <http://semweb.mmlab.be/ns/ql#>.
99

10-
<> owl:imports <../node_modules/@ajuvercr/js-runner/ontology.ttl>.
11-
<> owl:imports <../node_modules/@ajuvercr/js-runner/channels/file.ttl>.
12-
<> owl:imports <../node_modules/@ajuvercr/js-runner/channels/http.ttl>.
10+
<> owl:imports <../node_modules/@rdfc/js-runner/ontology.ttl>.
11+
<> owl:imports <../node_modules/@rdfc/js-runner/channels/file.ttl>.
12+
<> owl:imports <../node_modules/@rdfc/js-runner/channels/http.ttl>.
1313
<> owl:imports <../processor.ttl>.
1414
[ ] a js:JsChannel;
1515
:writer <json/writer>.

lib/client.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { FetchedPage, Fetcher, longPromise, resetPromise } from "./pageFetcher";
2020
import { Manager } from "./memberManager";
2121
import { OrderedStrategy, StrategyEvents, UnorderedStrategy } from "./strategy";
2222
import debug from "debug";
23-
import type { Writer } from "@ajuvercr/js-runner";
23+
import type { Writer } from "@rdfc/js-runner";
2424

2525
// import { ReadableStream } from "stream/web";
2626
export { intoConfig } from "./config";
@@ -468,8 +468,14 @@ export async function processor(
468468
client.on("fragment", () => console.error("Fragment!"));
469469
}
470470

471+
const reader = client.stream({ highWaterMark: 10 }).getReader();
472+
473+
writer.on("end", async () => {
474+
await reader.cancel();
475+
console.log("Writer closed, so closing reader as well.");
476+
});
477+
471478
return async () => {
472-
const reader = client.stream({ highWaterMark: 10 }).getReader();
473479
let el = await reader.read();
474480
const seen = new Set();
475481
while (el) {

0 commit comments

Comments
 (0)