From 156c76ed77dc0d9b2dc3c988264d44763e8334aa Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 10 Sep 2020 10:41:55 +0300 Subject: [PATCH] subscribe-test: remove dependency on Node's EventEmitter (#2796) --- .../__tests__/eventEmitterAsyncIterator.js | 64 ------- ...cIterator-test.js => simplePubSub-test.js} | 23 ++- src/subscription/__tests__/simplePubSub.js | 70 ++++++++ src/subscription/__tests__/subscribe-test.js | 164 ++++++++---------- 4 files changed, 151 insertions(+), 170 deletions(-) delete mode 100644 src/subscription/__tests__/eventEmitterAsyncIterator.js rename src/subscription/__tests__/{eventEmitterAsyncIterator-test.js => simplePubSub-test.js} (63%) create mode 100644 src/subscription/__tests__/simplePubSub.js diff --git a/src/subscription/__tests__/eventEmitterAsyncIterator.js b/src/subscription/__tests__/eventEmitterAsyncIterator.js deleted file mode 100644 index 2455411e5e..0000000000 --- a/src/subscription/__tests__/eventEmitterAsyncIterator.js +++ /dev/null @@ -1,64 +0,0 @@ -import type EventEmitter from 'events'; - -/** - * Create an AsyncIterator from an EventEmitter. Useful for mocking a - * PubSub system for tests. - */ -export default function eventEmitterAsyncIterator( - eventEmitter: EventEmitter, - eventName: string, -): AsyncGenerator { - const pullQueue = []; - const pushQueue = []; - let listening = true; - eventEmitter.addListener(eventName, pushValue); - - function pushValue(event: mixed) { - if (pullQueue.length !== 0) { - pullQueue.shift()({ value: event, done: false }); - } else { - pushQueue.push(event); - } - } - - function pullValue() { - return new Promise((resolve) => { - if (pushQueue.length !== 0) { - resolve({ value: pushQueue.shift(), done: false }); - } else { - pullQueue.push(resolve); - } - }); - } - - function emptyQueue() { - if (listening) { - listening = false; - eventEmitter.removeListener(eventName, pushValue); - for (const resolve of pullQueue) { - resolve({ value: undefined, done: true }); - } - pullQueue.length = 0; - pushQueue.length = 0; - } - } - - /* TODO: Flow doesn't support symbols as keys: - https://github.com/facebook/flow/issues/3258 */ - return ({ - next() { - return listening ? pullValue() : this.return(); - }, - return() { - emptyQueue(); - return Promise.resolve({ value: undefined, done: true }); - }, - throw(error: mixed) { - emptyQueue(); - return Promise.reject(error); - }, - [Symbol.asyncIterator]() { - return this; - }, - }: $FlowFixMe); -} diff --git a/src/subscription/__tests__/eventEmitterAsyncIterator-test.js b/src/subscription/__tests__/simplePubSub-test.js similarity index 63% rename from src/subscription/__tests__/eventEmitterAsyncIterator-test.js rename to src/subscription/__tests__/simplePubSub-test.js index 3c475534f9..d92339687b 100644 --- a/src/subscription/__tests__/eventEmitterAsyncIterator-test.js +++ b/src/subscription/__tests__/simplePubSub-test.js @@ -1,19 +1,16 @@ -import EventEmitter from 'events'; - import { expect } from 'chai'; import { describe, it } from 'mocha'; -import eventEmitterAsyncIterator from './eventEmitterAsyncIterator'; +import SimplePubSub from './simplePubSub'; -describe('eventEmitterAsyncIterator', () => { +describe('SimplePubSub', () => { it('subscribe async-iterator mock', async () => { - // Create an AsyncGenerator from an EventEmitter - const emitter = new EventEmitter(); - const iterator = eventEmitterAsyncIterator(emitter, 'publish'); + const pubsub = new SimplePubSub(); + const iterator = pubsub.getSubscriber(); // Queue up publishes - expect(emitter.emit('publish', 'Apple')).to.equal(true); - expect(emitter.emit('publish', 'Banana')).to.equal(true); + expect(pubsub.emit('Apple')).to.equal(true); + expect(pubsub.emit('Banana')).to.equal(true); // Read payloads expect(await iterator.next()).to.deep.equal({ @@ -30,8 +27,8 @@ describe('eventEmitterAsyncIterator', () => { const i4 = iterator.next().then((x) => x); // Publish - expect(emitter.emit('publish', 'Coconut')).to.equal(true); - expect(emitter.emit('publish', 'Durian')).to.equal(true); + expect(pubsub.emit('Coconut')).to.equal(true); + expect(pubsub.emit('Durian')).to.equal(true); // Await out of order to get correct results expect(await i4).to.deep.equal({ done: false, value: 'Durian' }); @@ -40,11 +37,11 @@ describe('eventEmitterAsyncIterator', () => { // Read ahead const i5 = iterator.next().then((x) => x); - // Terminate emitter + // Terminate queue await iterator.return(); // Publish is not caught after terminate - expect(emitter.emit('publish', 'Fig')).to.equal(false); + expect(pubsub.emit('Fig')).to.equal(false); // Find that cancelled read-ahead got a "done" result expect(await i5).to.deep.equal({ done: true, value: undefined }); diff --git a/src/subscription/__tests__/simplePubSub.js b/src/subscription/__tests__/simplePubSub.js new file mode 100644 index 0000000000..e12c93d0b9 --- /dev/null +++ b/src/subscription/__tests__/simplePubSub.js @@ -0,0 +1,70 @@ +/** + * Create an AsyncIterator from an EventEmitter. Useful for mocking a + * PubSub system for tests. + */ +export default class SimplePubSub { + _subscribers: Set<(T) => void>; + + constructor() { + this._subscribers = new Set(); + } + + emit(event: T): boolean { + for (const subscriber of this._subscribers) { + subscriber(event); + } + return this._subscribers.size > 0; + } + + getSubscriber(transform?: (T) => R): AsyncGenerator { + const pullQueue = []; + const pushQueue = []; + let listening = true; + this._subscribers.add(pushValue); + + const emptyQueue = () => { + listening = false; + this._subscribers.delete(pushValue); + for (const resolve of pullQueue) { + resolve({ value: undefined, done: true }); + } + pullQueue.length = 0; + pushQueue.length = 0; + }; + + /* TODO: Flow doesn't support symbols as keys: + https://github.com/facebook/flow/issues/3258 */ + return ({ + next() { + if (!listening) { + return Promise.resolve({ value: undefined, done: true }); + } + + if (pushQueue.length > 0) { + return Promise.resolve({ value: pushQueue.shift(), done: false }); + } + return new Promise((resolve) => pullQueue.push(resolve)); + }, + return() { + emptyQueue(); + return Promise.resolve({ value: undefined, done: true }); + }, + throw(error: mixed) { + emptyQueue(); + return Promise.reject(error); + }, + [Symbol.asyncIterator]() { + return this; + }, + }: any); + + function pushValue(event: T): void { + const value = transform != null ? transform(event) : event; + if (pullQueue.length > 0) { + pullQueue.shift()({ value, done: false }); + } else { + pushQueue.push(value); + } + } + } +} diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index 8dac86928b..5df245c3e7 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -1,5 +1,3 @@ -import EventEmitter from 'events'; - import { expect } from 'chai'; import { describe, it } from 'mocha'; @@ -19,7 +17,14 @@ import { GraphQLInt, GraphQLString, GraphQLBoolean } from '../../type/scalars'; import { createSourceEventStream, subscribe } from '../subscribe'; -import eventEmitterAsyncIterator from './eventEmitterAsyncIterator'; +import SimplePubSub from './simplePubSub'; + +type Email = {| + from: string, + subject: string, + message: string, + unread: boolean, +|}; const EmailType = new GraphQLObjectType({ name: 'Email', @@ -100,41 +105,35 @@ const defaultSubscriptionAST = parse(` } `); -async function createSubscription( - pubsub: EventEmitter, +function createSubscription( + pubsub: SimplePubSub, schema: GraphQLSchema = emailSchema, document: DocumentNode = defaultSubscriptionAST, ) { + const emails = [ + { + from: 'joe@graphql.org', + subject: 'Hello', + message: 'Hello World', + unread: false, + }, + ]; + const data = { - inbox: { - emails: [ - { - from: 'joe@graphql.org', - subject: 'Hello', - message: 'Hello World', - unread: false, + inbox: { emails }, + importantEmail: pubsub.getSubscriber((newEmail) => { + emails.push(newEmail); + + return { + importantEmail: { + email: newEmail, + inbox: data.inbox, }, - ], - }, - importantEmail() { - return eventEmitterAsyncIterator(pubsub, 'importantEmail'); - }, + }; + }), }; - function sendImportantEmail(newEmail: mixed) { - data.inbox.emails.push(newEmail); - // Returns true if the event was consumed by a subscriber. - return pubsub.emit('importantEmail', { - importantEmail: { - email: newEmail, - inbox: data.inbox, - }, - }); - } - - // `subscribe` returns Promise - const subscription = await subscribe({ schema, document, rootValue: data }); - return { sendImportantEmail, subscription }; + return subscribe({ schema, document, rootValue: data }); } async function expectPromiseToThrow( @@ -174,7 +173,7 @@ describe('Subscription Initialization Phase', () => { }); it('accepts multiple subscription fields defined in schema', async () => { - const pubsub = new EventEmitter(); + const pubsub = new SimplePubSub(); const SubscriptionTypeMultiple = new GraphQLObjectType({ name: 'Subscription', fields: { @@ -188,13 +187,10 @@ describe('Subscription Initialization Phase', () => { subscription: SubscriptionTypeMultiple, }); - const { subscription, sendImportantEmail } = await createSubscription( - pubsub, - testSchema, - ); + const subscription = await createSubscription(pubsub, testSchema); invariant(isAsyncIterable(subscription)); - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Alright', message: 'Tests are good', @@ -205,7 +201,7 @@ describe('Subscription Initialization Phase', () => { }); it('accepts type definition with sync subscribe function', async () => { - const pubsub = new EventEmitter(); + const pubsub = new SimplePubSub(); const schema = new GraphQLSchema({ query: QueryType, subscription: new GraphQLObjectType({ @@ -213,8 +209,7 @@ describe('Subscription Initialization Phase', () => { fields: { importantEmail: { type: GraphQLString, - subscribe: () => - eventEmitterAsyncIterator(pubsub, 'importantEmail'), + subscribe: () => pubsub.getSubscriber(), }, }, }), @@ -230,15 +225,13 @@ describe('Subscription Initialization Phase', () => { }); invariant(isAsyncIterable(subscription)); - pubsub.emit('importantEmail', { - importantEmail: {}, - }); + pubsub.emit({ importantEmail: {} }); await subscription.next(); }); it('accepts type definition with async subscribe function', async () => { - const pubsub = new EventEmitter(); + const pubsub = new SimplePubSub(); const schema = new GraphQLSchema({ query: QueryType, subscription: new GraphQLObjectType({ @@ -248,7 +241,7 @@ describe('Subscription Initialization Phase', () => { type: GraphQLString, subscribe: async () => { await resolveOnNextTick(); - return eventEmitterAsyncIterator(pubsub, 'importantEmail'); + return pubsub.getSubscriber(); }, }, }, @@ -267,10 +260,7 @@ describe('Subscription Initialization Phase', () => { expect(subscription).to.have.property('next'); - pubsub.emit('importantEmail', { - importantEmail: {}, - }); - + pubsub.emit({ importantEmail: {} }); await subscription.next(); }); @@ -285,7 +275,7 @@ describe('Subscription Initialization Phase', () => { type: EmailEventType, subscribe() { didResolveImportantEmail = true; - return eventEmitterAsyncIterator(new EventEmitter(), 'event'); + return new SimplePubSub().getSubscriber(); }, }, nonImportantEmail: { @@ -293,7 +283,7 @@ describe('Subscription Initialization Phase', () => { // istanbul ignore next (Shouldn't be called) subscribe() { didResolveNonImportantEmail = true; - return eventEmitterAsyncIterator(new EventEmitter(), 'event'); + return new SimplePubSub().getSubscriber(); }, }, }, @@ -365,9 +355,9 @@ describe('Subscription Initialization Phase', () => { } `); - const pubsub = new EventEmitter(); + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub, emailSchema, ast); - const { subscription } = await createSubscription(pubsub, emailSchema, ast); expect(subscription).to.deep.equal({ errors: [ { @@ -403,7 +393,7 @@ describe('Subscription Initialization Phase', () => { }), }); - const pubsub = new EventEmitter(); + const pubsub = new SimplePubSub(); await expectPromiseToThrow( () => createSubscription(pubsub, invalidEmailSchema), @@ -544,24 +534,22 @@ describe('Subscription Initialization Phase', () => { }); }); -// Once a subscription returns a valid AsyncIterator, it can still yield -// errors. +// Once a subscription returns a valid AsyncIterator, it can still yield errors. describe('Subscription Publish Phase', () => { it('produces a payload for multiple subscribe in same subscription', async () => { - const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = await createSubscription( - pubsub, - ); + const pubsub = new SimplePubSub(); + + const subscription = await createSubscription(pubsub); invariant(isAsyncIterable(subscription)); - const secondSubscription = (await createSubscription(pubsub)).subscription; + const secondSubscription = await createSubscription(pubsub); invariant(isAsyncIterable(secondSubscription)); const payload1 = subscription.next(); const payload2 = secondSubscription.next(); expect( - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Alright', message: 'Tests are good', @@ -592,10 +580,8 @@ describe('Subscription Publish Phase', () => { }); it('produces a payload per subscription event', async () => { - const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = await createSubscription( - pubsub, - ); + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub); invariant(isAsyncIterable(subscription)); // Wait for the next subscription payload. @@ -603,7 +589,7 @@ describe('Subscription Publish Phase', () => { // A new email arrives! expect( - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Alright', message: 'Tests are good', @@ -632,7 +618,7 @@ describe('Subscription Publish Phase', () => { // Another new email arrives, before subscription.next() is called. expect( - sendImportantEmail({ + pubsub.emit({ from: 'hyo@graphql.org', subject: 'Tools', message: 'I <3 making things', @@ -667,7 +653,7 @@ describe('Subscription Publish Phase', () => { // Which may result in disconnecting upstream services as well. expect( - sendImportantEmail({ + pubsub.emit({ from: 'adam@graphql.org', subject: 'Important', message: 'Read me please', @@ -683,17 +669,15 @@ describe('Subscription Publish Phase', () => { }); it('produces a payload when there are multiple events', async () => { - const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = await createSubscription( - pubsub, - ); + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub); invariant(isAsyncIterable(subscription)); let payload = subscription.next(); // A new email arrives! expect( - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Alright', message: 'Tests are good', @@ -723,7 +707,7 @@ describe('Subscription Publish Phase', () => { // A new email arrives! expect( - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Alright 2', message: 'Tests are good 2', @@ -751,17 +735,15 @@ describe('Subscription Publish Phase', () => { }); it('should not trigger when subscription is already done', async () => { - const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = await createSubscription( - pubsub, - ); + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub); invariant(isAsyncIterable(subscription)); let payload = subscription.next(); // A new email arrives! expect( - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Alright', message: 'Tests are good', @@ -792,7 +774,7 @@ describe('Subscription Publish Phase', () => { // A new email arrives! expect( - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Alright 2', message: 'Tests are good 2', @@ -807,17 +789,15 @@ describe('Subscription Publish Phase', () => { }); it('should not trigger when subscription is thrown', async () => { - const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = await createSubscription( - pubsub, - ); + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub); invariant(isAsyncIterable(subscription)); let payload = subscription.next(); // A new email arrives! expect( - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Alright', message: 'Tests are good', @@ -856,7 +836,7 @@ describe('Subscription Publish Phase', () => { // A new email arrives! expect( - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Alright 2', message: 'Tests are good 2', @@ -871,17 +851,15 @@ describe('Subscription Publish Phase', () => { }); it('event order is correct for multiple publishes', async () => { - const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = await createSubscription( - pubsub, - ); + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub); invariant(isAsyncIterable(subscription)); let payload = subscription.next(); // A new email arrives! expect( - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Message', message: 'Tests are good', @@ -891,7 +869,7 @@ describe('Subscription Publish Phase', () => { // A new email arrives! expect( - sendImportantEmail({ + pubsub.emit({ from: 'yuzhi@graphql.org', subject: 'Message 2', message: 'Tests are good 2',