From 9305c044ae2cb40269f3823710b234bb8bc59c06 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 10 Sep 2020 02:47:48 +0300 Subject: [PATCH] subscribe: correct Flow definitions (#2795) --- .../eventEmitterAsyncIterator-test.js | 3 +- .../__tests__/eventEmitterAsyncIterator.js | 6 +-- src/subscription/__tests__/subscribe-test.js | 40 ++++++++++++------- src/subscription/mapAsyncIterator.js | 4 +- src/subscription/subscribe.js | 4 +- 5 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/subscription/__tests__/eventEmitterAsyncIterator-test.js b/src/subscription/__tests__/eventEmitterAsyncIterator-test.js index 3632c89b42..3c475534f9 100644 --- a/src/subscription/__tests__/eventEmitterAsyncIterator-test.js +++ b/src/subscription/__tests__/eventEmitterAsyncIterator-test.js @@ -7,7 +7,7 @@ import eventEmitterAsyncIterator from './eventEmitterAsyncIterator'; describe('eventEmitterAsyncIterator', () => { it('subscribe async-iterator mock', async () => { - // Create an AsyncIterator from an EventEmitter + // Create an AsyncGenerator from an EventEmitter const emitter = new EventEmitter(); const iterator = eventEmitterAsyncIterator(emitter, 'publish'); @@ -41,7 +41,6 @@ describe('eventEmitterAsyncIterator', () => { const i5 = iterator.next().then((x) => x); // Terminate emitter - // $FlowFixMe[prop-missing] await iterator.return(); // Publish is not caught after terminate diff --git a/src/subscription/__tests__/eventEmitterAsyncIterator.js b/src/subscription/__tests__/eventEmitterAsyncIterator.js index 47f5ff49b0..2455411e5e 100644 --- a/src/subscription/__tests__/eventEmitterAsyncIterator.js +++ b/src/subscription/__tests__/eventEmitterAsyncIterator.js @@ -7,7 +7,7 @@ import type EventEmitter from 'events'; export default function eventEmitterAsyncIterator( eventEmitter: EventEmitter, eventName: string, -): AsyncIterator { +): AsyncGenerator { const pullQueue = []; const pushQueue = []; let listening = true; @@ -44,7 +44,7 @@ export default function eventEmitterAsyncIterator( } /* TODO: Flow doesn't support symbols as keys: - https://github.com/facebook/flow/issues/3258 */ + https://github.com/facebook/flow/issues/3258 */ return ({ next() { return listening ? pullValue() : this.return(); @@ -60,5 +60,5 @@ export default function eventEmitterAsyncIterator( [Symbol.asyncIterator]() { return this; }, - }: any); + }: $FlowFixMe); } diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index 13b7a00361..8dac86928b 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -5,6 +5,9 @@ import { describe, it } from 'mocha'; import resolveOnNextTick from '../../__testUtils__/resolveOnNextTick'; +import invariant from '../../jsutils/invariant'; +import isAsyncIterable from '../../jsutils/isAsyncIterable'; + import type { DocumentNode } from '../../language/ast'; import { parse } from '../../language/parser'; @@ -130,11 +133,8 @@ async function createSubscription( } // `subscribe` returns Promise - return { - sendImportantEmail, - // $FlowFixMe[incompatible-call] - subscription: await subscribe({ schema, document, rootValue: data }), - }; + const subscription = await subscribe({ schema, document, rootValue: data }); + return { sendImportantEmail, subscription }; } async function expectPromiseToThrow( @@ -192,6 +192,7 @@ describe('Subscription Initialization Phase', () => { pubsub, testSchema, ); + invariant(isAsyncIterable(subscription)); sendImportantEmail({ from: 'yuzhi@graphql.org', @@ -219,7 +220,6 @@ describe('Subscription Initialization Phase', () => { }), }); - // $FlowFixMe[incompatible-call] const subscription = await subscribe({ schema, document: parse(` @@ -228,6 +228,7 @@ describe('Subscription Initialization Phase', () => { } `), }); + invariant(isAsyncIterable(subscription)); pubsub.emit('importantEmail', { importantEmail: {}, @@ -254,7 +255,6 @@ describe('Subscription Initialization Phase', () => { }), }); - // $FlowFixMe[incompatible-call] const subscription = await subscribe({ schema, document: parse(` @@ -263,6 +263,7 @@ describe('Subscription Initialization Phase', () => { } `), }); + invariant(isAsyncIterable(subscription)); expect(subscription).to.have.property('next'); @@ -303,7 +304,6 @@ describe('Subscription Initialization Phase', () => { subscription: SubscriptionTypeMultiple, }); - // $FlowFixMe[incompatible-call] const subscription = await subscribe({ schema, document: parse(` @@ -313,6 +313,7 @@ describe('Subscription Initialization Phase', () => { } `), }); + invariant(isAsyncIterable(subscription)); subscription.next(); // Ask for a result, but ignore it. @@ -367,7 +368,6 @@ describe('Subscription Initialization Phase', () => { const pubsub = new EventEmitter(); const { subscription } = await createSubscription(pubsub, emailSchema, ast); - expect(subscription).to.deep.equal({ errors: [ { @@ -552,10 +552,13 @@ describe('Subscription Publish Phase', () => { const { sendImportantEmail, subscription } = await createSubscription( pubsub, ); - const second = await createSubscription(pubsub); + invariant(isAsyncIterable(subscription)); + + const secondSubscription = (await createSubscription(pubsub)).subscription; + invariant(isAsyncIterable(secondSubscription)); const payload1 = subscription.next(); - const payload2 = second.subscription.next(); + const payload2 = secondSubscription.next(); expect( sendImportantEmail({ @@ -593,6 +596,7 @@ describe('Subscription Publish Phase', () => { const { sendImportantEmail, subscription } = await createSubscription( pubsub, ); + invariant(isAsyncIterable(subscription)); // Wait for the next subscription payload. const payload = subscription.next(); @@ -683,6 +687,8 @@ describe('Subscription Publish Phase', () => { const { sendImportantEmail, subscription } = await createSubscription( pubsub, ); + invariant(isAsyncIterable(subscription)); + let payload = subscription.next(); // A new email arrives! @@ -749,6 +755,8 @@ describe('Subscription Publish Phase', () => { const { sendImportantEmail, subscription } = await createSubscription( pubsub, ); + invariant(isAsyncIterable(subscription)); + let payload = subscription.next(); // A new email arrives! @@ -803,6 +811,8 @@ describe('Subscription Publish Phase', () => { const { sendImportantEmail, subscription } = await createSubscription( pubsub, ); + invariant(isAsyncIterable(subscription)); + let payload = subscription.next(); // A new email arrives! @@ -865,6 +875,8 @@ describe('Subscription Publish Phase', () => { const { sendImportantEmail, subscription } = await createSubscription( pubsub, ); + invariant(isAsyncIterable(subscription)); + let payload = subscription.next(); // A new email arrives! @@ -941,7 +953,6 @@ describe('Subscription Publish Phase', () => { }, ); - // $FlowFixMe[incompatible-call] const subscription = await subscribe({ schema: erroringEmailSchema, document: parse(` @@ -954,6 +965,7 @@ describe('Subscription Publish Phase', () => { } `), }); + invariant(isAsyncIterable(subscription)); const payload1 = await subscription.next(); expect(payload1).to.deep.equal({ @@ -1013,7 +1025,6 @@ describe('Subscription Publish Phase', () => { (email) => email, ); - // $FlowFixMe[incompatible-call] const subscription = await subscribe({ schema: erroringEmailSchema, document: parse(` @@ -1026,6 +1037,7 @@ describe('Subscription Publish Phase', () => { } `), }); + invariant(isAsyncIterable(subscription)); const payload1 = await subscription.next(); expect(payload1).to.deep.equal({ @@ -1067,7 +1079,6 @@ describe('Subscription Publish Phase', () => { (email) => email, ); - // $FlowFixMe[incompatible-call] const subscription = await subscribe({ schema: erroringEmailSchema, document: parse(` @@ -1080,6 +1091,7 @@ describe('Subscription Publish Phase', () => { } `), }); + invariant(isAsyncIterable(subscription)); const payload1 = await subscription.next(); expect(payload1).to.deep.equal({ diff --git a/src/subscription/mapAsyncIterator.js b/src/subscription/mapAsyncIterator.js index b6247f5d0b..8ab691d391 100644 --- a/src/subscription/mapAsyncIterator.js +++ b/src/subscription/mapAsyncIterator.js @@ -7,7 +7,7 @@ import type { PromiseOrValue } from '../jsutils/PromiseOrValue'; * which produces values mapped via calling the callback function. */ export default function mapAsyncIterator( - iterable: AsyncIterable, + iterable: AsyncIterable | AsyncGenerator, callback: (T) => PromiseOrValue, rejectCallback?: (any) => PromiseOrValue, ): AsyncGenerator { @@ -58,7 +58,7 @@ export default function mapAsyncIterator( [SYMBOL_ASYNC_ITERATOR]() { return this; }, - }: any); + }: $FlowFixMe); } function asyncMapValue( diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js index 9a46de1968..3a20d23ab1 100644 --- a/src/subscription/subscribe.js +++ b/src/subscription/subscribe.js @@ -60,7 +60,7 @@ export type SubscriptionArgs = {| declare function subscribe( SubscriptionArgs, ..._: [] -): Promise | ExecutionResult>; +): Promise | ExecutionResult>; /* eslint-disable no-redeclare */ declare function subscribe( schema: GraphQLSchema, @@ -112,7 +112,7 @@ function reportGraphQLError(error: mixed): ExecutionResult { function subscribeImpl( args: SubscriptionArgs, -): Promise | ExecutionResult> { +): Promise | ExecutionResult> { const { schema, document,