diff --git a/src/helpers/__tests__/events.test.ts b/src/helpers/__tests__/events.test.ts new file mode 100644 index 0000000..b77770a --- /dev/null +++ b/src/helpers/__tests__/events.test.ts @@ -0,0 +1,135 @@ +import { EventEmitter } from 'node:events'; +import { onceWhen } from '../events'; + +describe('onceWhen tests', () => { + test('should resolve when event is emitted and predicate matches', async () => { + const emitter = new EventEmitter<{ + myTestEvent: [eventNumber: number, msg: string]; + }>(); + + setTimeout(() => { + for (let i = 0; i <= 5; i++) { + emitter.emit('myTestEvent', i, `Message ${i}`); + } + }, 10); // Emit after a delay + + const [eventNumber, msg] = await onceWhen(emitter, 'myTestEvent', (eventNumber, msg) => { + return eventNumber === 5; + }); + expect(eventNumber).toBe(5); + expect(msg).toBe('Message 5'); + + // Expect that the event listener was removed after onceWhen is finished + expect(emitter.eventNames()).toStrictEqual([]); + }); + + test('should reject if aborted immediately', async () => { + const emitter = new EventEmitter<{ + myTestEvent: [eventNumber: number]; + }>(); + const controller = new AbortController(); + const abortReason = new Error('Test aborted'); + controller.abort(abortReason); + await expect( + onceWhen(emitter, 'myTestEvent', () => true, { signal: controller.signal }) + ).rejects.toThrow(abortReason); + + // Expect that the event listener was removed after onceWhen is finished + expect(emitter.eventNames()).toStrictEqual([]); + }); + + test('should reject if aborted before event is emitted', async () => { + const emitter = new EventEmitter<{ + myTestEvent: [eventNumber: number]; + }>(); + const controller = new AbortController(); + const abortReason = new Error('Test aborted'); + // controller.abort(abortReason); + setTimeout(() => { + for (let i = 0; i <= 5; i++) { + emitter.emit('myTestEvent', i); + if (i === 3) { + controller.abort(abortReason); // Abort after emitting some events + } + } + }, 10); // Emit after a delay + + let lastEventNumberSeen = 0; + await expect( + onceWhen( + emitter, + 'myTestEvent', + eventNumber => { + lastEventNumberSeen = eventNumber; + return false; + }, + { signal: controller.signal } + ) + ).rejects.toThrow(abortReason); + + // Check that we saw events before the abort + expect(lastEventNumberSeen).toBe(3); + + // Expect that the event listener was removed after onceWhen is finished + expect(emitter.eventNames()).toStrictEqual([]); + }); + + test('should resolve if event is emitted before abort', async () => { + const emitter = new EventEmitter<{ + myTestEvent: [eventNumber: number]; + }>(); + const controller = new AbortController(); + + setTimeout(() => { + for (let i = 0; i <= 5; i++) { + emitter.emit('myTestEvent', i); + } + controller.abort(); // Abort after emitting all events + }, 10); // Emit after a delay + + const [eventNumber] = await onceWhen(emitter, 'myTestEvent', eventNumber => eventNumber === 5, { + signal: controller.signal, + }); + expect(eventNumber).toBe(5); + + // Expect that the event listener was removed after onceWhen is finished + expect(emitter.eventNames()).toStrictEqual([]); + }); + + test('should reject if predict function throws', async () => { + const emitter = new EventEmitter<{ + myTestEvent: [eventNumber: number]; + }>(); + setTimeout(() => { + for (let i = 0; i <= 5; i++) { + emitter.emit('myTestEvent', i); + } + }, 10); + + let lastEventNumberSeen = 0; + const predictFunctionError = new Error('Predict function error'); + await expect( + onceWhen(emitter, 'myTestEvent', eventNumber => { + lastEventNumberSeen = eventNumber; + if (eventNumber === 3) { + throw predictFunctionError; + } + return false; + }) + ).rejects.toThrow(predictFunctionError); + expect(lastEventNumberSeen).toBe(3); + + // Expect that the event listener was removed after onceWhen is finished + expect(emitter.eventNames()).toStrictEqual([]); + }); + + test('abort signal test', async () => { + const emitter = new EventEmitter<{ myEvent: [id: number, msg: string] }>(); + const signal = AbortSignal.timeout(10); + setTimeout(() => emitter.emit('myEvent', 1, 'Hello'), 1000); + const whenPromise = onceWhen(emitter, 'myEvent', id => id === 1, { signal }); + // This rejects because the signal is aborted before the event is emitted + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + await expect(whenPromise).rejects.toThrow(signal.reason); + }); +}); diff --git a/src/helpers/events.ts b/src/helpers/events.ts new file mode 100644 index 0000000..8bd8621 --- /dev/null +++ b/src/helpers/events.ts @@ -0,0 +1,103 @@ +import { EventEmitter, addAbortListener } from 'node:events'; + +// This is a workaround for Node.js versions that do not support Symbol.dispose +const DisposeSymbol: typeof Symbol.dispose = Symbol.dispose ?? Symbol.for('nodejs.dispose'); + +/** + * Creates a Promise that resolves when the specified `eventName` is emitted by the `EventEmitter` + * and the provided predicate returns `true` for the emitted arguments. + * + * Similar to [`events.once`](https://nodejs.org/api/events.html#eventsonceemitter-name-options), + * but includes support for a predicate function to filter events. Only events for which + * the predicate returns `true` will cause the Promise to resolve. + * + * The resolved value is an array of the arguments emitted with the event. + * + * Supports typed `EventEmitter`s and optional cancellation via `AbortSignal`. + * + * @example + * ```ts + * import { EventEmitter } from 'node:events'; + * + * const emitter = new EventEmitter<{ + * myEvent: [id: number, msg: string]; + * }>(); + * + * setTimeout(() => { + * for (let i = 0; i <= 5; i++) { + * emitter.emit('myEvent', i, `Message ${i}`); + * } + * }, 100); + * + * const [id, msg] = await onceWhen(emitter, 'myEvent', (id, msg) => id === 3); + * + * // outputs: "Received event with id: 3, message: Message 3" + * console.log(`Received event with id: ${id}, message: ${msg}`); + * ``` + * + * @example + * ```ts + * import { EventEmitter } from 'node:events'; + * + * const emitter = new EventEmitter<{ myEvent: [id: number, msg: string] }>(); + * + * const signal = AbortSignal.timeout(10); + * + * setTimeout(() => emitter.emit('myEvent', 1, 'Hello'), 1000); + * + * const whenPromise = onceWhen(emitter, 'myEvent', id => id === 1, { signal }); + * + * // This rejects because the signal is aborted before the event is emitted + * await expect(whenPromise).rejects.toThrow(signal.reason); + * ``` + */ +export function onceWhen< + EventMap extends Record = Record, + K extends Extract = Extract +>( + emitter: EventEmitter, + eventName: K, + predicate: (...args: EventMap[K]) => boolean, + options?: { signal?: AbortSignal } +): Promise { + return new Promise((resolve, reject) => { + // Immediate abort check + if (options?.signal?.aborted) { + reject((options.signal.reason as Error) ?? new Error('Aborted')); + return; + } + + // Cleanup helper: remove both the event listener and the abort listener + const cleanup = () => { + // eslint-disable-next-line @typescript-eslint/no-use-before-define + (emitter as EventEmitter).off(eventName, listener); + // eslint-disable-next-line @typescript-eslint/no-use-before-define + disposable?.[DisposeSymbol](); + }; + + // Abort handler + const onAbort = () => { + cleanup(); + reject((options?.signal?.reason as Error) ?? new Error('Aborted')); + }; + + // Our event listener that checks the predicate + const listener = (...args: EventMap[K]) => { + try { + if (predicate(...args)) { + cleanup(); + resolve(args); + } + } catch (err) { + cleanup(); + reject(err as Error); + return; + } + }; + + // Install the AbortSignal listener via Node’s helper + const disposable = options?.signal ? addAbortListener(options.signal, onAbort) : undefined; + + (emitter as EventEmitter).on(eventName, listener); + }); +} diff --git a/src/helpers/index.ts b/src/helpers/index.ts index ebda5e1..06c3e1a 100644 --- a/src/helpers/index.ts +++ b/src/helpers/index.ts @@ -2,5 +2,6 @@ export * from './iterators'; export * from './time'; export * from './values'; export * from './is-debugging'; +export * from './events'; export { WorkerThreadManager } from './worker-thread-manager'; export type { WorkerPoolModuleInterface } from './worker-thread-manager';