Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions src/helpers/__tests__/events.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { EventEmitter } from 'node:events';
import { onceWhen } from '../events';

describe('onceWhen tests', () => {
test('should resolve when event is emitted and predicate matches', async () => {
const emitter = new EventEmitter<{
myTestEvent: [eventNumber: number, msg: string];
}>();

setTimeout(() => {
for (let i = 0; i <= 5; i++) {
emitter.emit('myTestEvent', i, `Message ${i}`);
}
}, 10); // Emit after a delay

const [eventNumber, msg] = await onceWhen(emitter, 'myTestEvent', (eventNumber, msg) => {
return eventNumber === 5;
});
expect(eventNumber).toBe(5);
expect(msg).toBe('Message 5');

// Expect that the event listener was removed after onceWhen is finished
expect(emitter.eventNames()).toStrictEqual([]);
});

test('should reject if aborted immediately', async () => {
const emitter = new EventEmitter<{
myTestEvent: [eventNumber: number];
}>();
const controller = new AbortController();
const abortReason = new Error('Test aborted');
controller.abort(abortReason);
await expect(
onceWhen(emitter, 'myTestEvent', () => true, { signal: controller.signal })
).rejects.toThrow(abortReason);

// Expect that the event listener was removed after onceWhen is finished
expect(emitter.eventNames()).toStrictEqual([]);
});

test('should reject if aborted before event is emitted', async () => {
const emitter = new EventEmitter<{
myTestEvent: [eventNumber: number];
}>();
const controller = new AbortController();
const abortReason = new Error('Test aborted');
// controller.abort(abortReason);
setTimeout(() => {
for (let i = 0; i <= 5; i++) {
emitter.emit('myTestEvent', i);
if (i === 3) {
controller.abort(abortReason); // Abort after emitting some events
}
}
}, 10); // Emit after a delay

let lastEventNumberSeen = 0;
await expect(
onceWhen(
emitter,
'myTestEvent',
eventNumber => {
lastEventNumberSeen = eventNumber;
return false;
},
{ signal: controller.signal }
)
).rejects.toThrow(abortReason);

// Check that we saw events before the abort
expect(lastEventNumberSeen).toBe(3);

// Expect that the event listener was removed after onceWhen is finished
expect(emitter.eventNames()).toStrictEqual([]);
});

test('should resolve if event is emitted before abort', async () => {
const emitter = new EventEmitter<{
myTestEvent: [eventNumber: number];
}>();
const controller = new AbortController();

setTimeout(() => {
for (let i = 0; i <= 5; i++) {
emitter.emit('myTestEvent', i);
}
controller.abort(); // Abort after emitting all events
}, 10); // Emit after a delay

const [eventNumber] = await onceWhen(emitter, 'myTestEvent', eventNumber => eventNumber === 5, {
signal: controller.signal,
});
expect(eventNumber).toBe(5);

// Expect that the event listener was removed after onceWhen is finished
expect(emitter.eventNames()).toStrictEqual([]);
});

test('should reject if predict function throws', async () => {
const emitter = new EventEmitter<{
myTestEvent: [eventNumber: number];
}>();
setTimeout(() => {
for (let i = 0; i <= 5; i++) {
emitter.emit('myTestEvent', i);
}
}, 10);

let lastEventNumberSeen = 0;
const predictFunctionError = new Error('Predict function error');
await expect(
onceWhen(emitter, 'myTestEvent', eventNumber => {
lastEventNumberSeen = eventNumber;
if (eventNumber === 3) {
throw predictFunctionError;
}
return false;
})
).rejects.toThrow(predictFunctionError);
expect(lastEventNumberSeen).toBe(3);

// Expect that the event listener was removed after onceWhen is finished
expect(emitter.eventNames()).toStrictEqual([]);
});

test('abort signal test', async () => {
const emitter = new EventEmitter<{ myEvent: [id: number, msg: string] }>();
const signal = AbortSignal.timeout(10);
setTimeout(() => emitter.emit('myEvent', 1, 'Hello'), 1000);
const whenPromise = onceWhen(emitter, 'myEvent', id => id === 1, { signal });
// This rejects because the signal is aborted before the event is emitted
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
await expect(whenPromise).rejects.toThrow(signal.reason);
});
});
103 changes: 103 additions & 0 deletions src/helpers/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { EventEmitter, addAbortListener } from 'node:events';

// This is a workaround for Node.js versions that do not support Symbol.dispose
const DisposeSymbol: typeof Symbol.dispose = Symbol.dispose ?? Symbol.for('nodejs.dispose');

/**
* Creates a Promise that resolves when the specified `eventName` is emitted by the `EventEmitter`
* and the provided predicate returns `true` for the emitted arguments.
*
* Similar to [`events.once`](https://nodejs.org/api/events.html#eventsonceemitter-name-options),
* but includes support for a predicate function to filter events. Only events for which
* the predicate returns `true` will cause the Promise to resolve.
*
* The resolved value is an array of the arguments emitted with the event.
*
* Supports typed `EventEmitter`s and optional cancellation via `AbortSignal`.
*
* @example
* ```ts
* import { EventEmitter } from 'node:events';
*
* const emitter = new EventEmitter<{
* myEvent: [id: number, msg: string];
* }>();
*
* setTimeout(() => {
* for (let i = 0; i <= 5; i++) {
* emitter.emit('myEvent', i, `Message ${i}`);
* }
* }, 100);
*
* const [id, msg] = await onceWhen(emitter, 'myEvent', (id, msg) => id === 3);
*
* // outputs: "Received event with id: 3, message: Message 3"
* console.log(`Received event with id: ${id}, message: ${msg}`);
* ```
*
* @example
* ```ts
* import { EventEmitter } from 'node:events';
*
* const emitter = new EventEmitter<{ myEvent: [id: number, msg: string] }>();
*
* const signal = AbortSignal.timeout(10);
*
* setTimeout(() => emitter.emit('myEvent', 1, 'Hello'), 1000);
*
* const whenPromise = onceWhen(emitter, 'myEvent', id => id === 1, { signal });
*
* // This rejects because the signal is aborted before the event is emitted
* await expect(whenPromise).rejects.toThrow(signal.reason);
* ```
*/
export function onceWhen<
EventMap extends Record<string, any[]> = Record<string, any[]>,
K extends Extract<keyof EventMap, string> = Extract<keyof EventMap, string>
>(
emitter: EventEmitter<EventMap>,
eventName: K,
predicate: (...args: EventMap[K]) => boolean,
options?: { signal?: AbortSignal }
): Promise<EventMap[K]> {
return new Promise((resolve, reject) => {
// Immediate abort check
if (options?.signal?.aborted) {
reject((options.signal.reason as Error) ?? new Error('Aborted'));
return;
}

// Cleanup helper: remove both the event listener and the abort listener
const cleanup = () => {
// eslint-disable-next-line @typescript-eslint/no-use-before-define
(emitter as EventEmitter).off(eventName, listener);
// eslint-disable-next-line @typescript-eslint/no-use-before-define
disposable?.[DisposeSymbol]();
};

// Abort handler
const onAbort = () => {
cleanup();
reject((options?.signal?.reason as Error) ?? new Error('Aborted'));
};

// Our event listener that checks the predicate
const listener = (...args: EventMap[K]) => {
try {
if (predicate(...args)) {
cleanup();
resolve(args);
}
} catch (err) {
cleanup();
reject(err as Error);
return;
}
};

// Install the AbortSignal listener via Node’s helper
const disposable = options?.signal ? addAbortListener(options.signal, onAbort) : undefined;

(emitter as EventEmitter).on(eventName, listener);
});
}
1 change: 1 addition & 0 deletions src/helpers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export * from './iterators';
export * from './time';
export * from './values';
export * from './is-debugging';
export * from './events';
export { WorkerThreadManager } from './worker-thread-manager';
export type { WorkerPoolModuleInterface } from './worker-thread-manager';