-
Notifications
You must be signed in to change notification settings - Fork 7
Open
Description
This is not as robust as yours, but I thought this implementation using Web Streams was interesting enough to share:
/**
 * Creates a pipeline builder backed by Web Streams.
 *
 * Calling the returned `builder(fn)` wraps `fn` in a stage object. A stage is
 * only registered on the pipeline when it is coerced to a primitive (for
 * example by being an operand of `|`), so `a | b | c` registers the stages in
 * left-to-right order. A stage that is never coerced is not part of the
 * pipeline, and because stages are kept in a `Set`, coercing the same stage
 * object more than once registers it only once.
 *
 * @returns {((fn: Function) => object) & { run: (items: Iterable<any> | AsyncIterable<any>) => Promise<any[]> }}
 *   The builder function, with a `run` method that pushes `items` through
 *   every registered stage and resolves with the collected outputs.
 */
function pipe() {
  // A Set iterates in insertion order, i.e. the order stages were coerced.
  const pipeline = new Set();

  /**
   * Wraps `fn` (sync or async, one value in, one value out) as a stage.
   *
   * @param {Function} fn - Mapping applied to every value that flows through.
   * @returns {object} Stage exposing `toStream()` and `Symbol.toPrimitive`.
   * @throws {TypeError} If `fn` is not a function.
   */
  const builder = (fn) => {
    if (typeof fn !== 'function') {
      throw new TypeError('pipe stage must be a function');
    }
    const unit = {
      // A fresh TransformStream per run, so the pipeline can be run repeatedly.
      toStream: () =>
        new TransformStream({
          transform: async (data, controller) => {
            controller.enqueue(await fn(data));
          },
        }),
      // Coercion is the registration hook; the returned 0 is only there so
      // the surrounding operator expression evaluates to a number.
      [Symbol.toPrimitive]: () => {
        pipeline.add(unit);
        return 0;
      },
    };
    return unit;
  };

  // Drains `stream` into an array using a reader, which does not depend on
  // `Array.fromAsync` or on ReadableStream being async-iterable.
  const collect = async (stream) => {
    const reader = stream.getReader();
    const results = [];
    for (;;) {
      const { done, value } = await reader.read();
      if (done) {
        return results;
      }
      results.push(value);
    }
  };

  /**
   * Runs `items` through every registered stage, in registration order.
   *
   * @param {Iterable<any> | AsyncIterable<any>} items - Input values.
   * @returns {Promise<any[]>} Outputs of the final stage, in input order.
   * @throws {TypeError} Synchronously, if `items` is not iterable.
   */
  builder.run = (items) => {
    const iterator =
      items[Symbol.asyncIterator]?.() ?? items[Symbol.iterator]();

    // Pull-based source: one item is read per pull, so the input is consumed
    // lazily instead of being buffered up front.
    let stream = new ReadableStream({
      async pull(controller) {
        const { done, value } = await iterator.next();
        if (done) {
          controller.close();
        } else {
          controller.enqueue(value);
        }
      },
      // Give the iterator a chance to clean up if a downstream stage fails.
      async cancel(reason) {
        await iterator.return?.(reason);
      },
    });

    for (const unit of pipeline) {
      stream = stream.pipeThrough(unit.toStream());
    }
    return collect(stream);
  };

  return builder;
}
// Usage example: five "+1" stages, so every input value comes out 5 larger.
{
  const $ = pipe();

  // Each `|` coerces its operands to primitives, which is what registers the
  // stages on `$`; the numeric result of the expression itself is discarded.
  $(x => x + 1)
    | $(x => x + 1)
    | $(x => x + 1)
    | $(x => x + 1)
    | $(x => x + 1);

  // The same pipeline can be run more than once.
  console.log(await $.run([0, 10, 20, 30, 50]));
  console.log(await $.run([100, 110, 120, 130, 150]));
}
Metadata
Assignees
Labels
No labels