Skip to content

Commit

Permalink
subscribe: correct Flow definitions (graphql#2795)
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanGoncharov authored Sep 9, 2020
1 parent 35f6df8 commit 9305c04
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 23 deletions.
3 changes: 1 addition & 2 deletions src/subscription/__tests__/eventEmitterAsyncIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import eventEmitterAsyncIterator from './eventEmitterAsyncIterator';

describe('eventEmitterAsyncIterator', () => {
it('subscribe async-iterator mock', async () => {
// Create an AsyncIterator from an EventEmitter
// Create an AsyncGenerator from an EventEmitter
const emitter = new EventEmitter();
const iterator = eventEmitterAsyncIterator(emitter, 'publish');

Expand Down Expand Up @@ -41,7 +41,6 @@ describe('eventEmitterAsyncIterator', () => {
const i5 = iterator.next().then((x) => x);

// Terminate emitter
// $FlowFixMe[prop-missing]
await iterator.return();

// Publish is not caught after terminate
Expand Down
6 changes: 3 additions & 3 deletions src/subscription/__tests__/eventEmitterAsyncIterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type EventEmitter from 'events';
export default function eventEmitterAsyncIterator(
eventEmitter: EventEmitter,
eventName: string,
): AsyncIterator<mixed> {
): AsyncGenerator<mixed, void, void> {
const pullQueue = [];
const pushQueue = [];
let listening = true;
Expand Down Expand Up @@ -44,7 +44,7 @@ export default function eventEmitterAsyncIterator(
}

/* TODO: Flow doesn't support symbols as keys:
https://github.com/facebook/flow/issues/3258 */
https://github.com/facebook/flow/issues/3258 */
return ({
next() {
return listening ? pullValue() : this.return();
Expand All @@ -60,5 +60,5 @@ export default function eventEmitterAsyncIterator(
[Symbol.asyncIterator]() {
return this;
},
}: any);
}: $FlowFixMe);
}
40 changes: 26 additions & 14 deletions src/subscription/__tests__/subscribe-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import { describe, it } from 'mocha';

import resolveOnNextTick from '../../__testUtils__/resolveOnNextTick';

import invariant from '../../jsutils/invariant';
import isAsyncIterable from '../../jsutils/isAsyncIterable';

import type { DocumentNode } from '../../language/ast';
import { parse } from '../../language/parser';

Expand Down Expand Up @@ -130,11 +133,8 @@ async function createSubscription(
}

// `subscribe` returns Promise<AsyncIterator | ExecutionResult>
return {
sendImportantEmail,
// $FlowFixMe[incompatible-call]
subscription: await subscribe({ schema, document, rootValue: data }),
};
const subscription = await subscribe({ schema, document, rootValue: data });
return { sendImportantEmail, subscription };
}

async function expectPromiseToThrow(
Expand Down Expand Up @@ -192,6 +192,7 @@ describe('Subscription Initialization Phase', () => {
pubsub,
testSchema,
);
invariant(isAsyncIterable(subscription));

sendImportantEmail({
from: '[email protected]',
Expand Down Expand Up @@ -219,7 +220,6 @@ describe('Subscription Initialization Phase', () => {
}),
});

// $FlowFixMe[incompatible-call]
const subscription = await subscribe({
schema,
document: parse(`
Expand All @@ -228,6 +228,7 @@ describe('Subscription Initialization Phase', () => {
}
`),
});
invariant(isAsyncIterable(subscription));

pubsub.emit('importantEmail', {
importantEmail: {},
Expand All @@ -254,7 +255,6 @@ describe('Subscription Initialization Phase', () => {
}),
});

// $FlowFixMe[incompatible-call]
const subscription = await subscribe({
schema,
document: parse(`
Expand All @@ -263,6 +263,7 @@ describe('Subscription Initialization Phase', () => {
}
`),
});
invariant(isAsyncIterable(subscription));

expect(subscription).to.have.property('next');

Expand Down Expand Up @@ -303,7 +304,6 @@ describe('Subscription Initialization Phase', () => {
subscription: SubscriptionTypeMultiple,
});

// $FlowFixMe[incompatible-call]
const subscription = await subscribe({
schema,
document: parse(`
Expand All @@ -313,6 +313,7 @@ describe('Subscription Initialization Phase', () => {
}
`),
});
invariant(isAsyncIterable(subscription));

subscription.next(); // Ask for a result, but ignore it.

Expand Down Expand Up @@ -367,7 +368,6 @@ describe('Subscription Initialization Phase', () => {
const pubsub = new EventEmitter();

const { subscription } = await createSubscription(pubsub, emailSchema, ast);

expect(subscription).to.deep.equal({
errors: [
{
Expand Down Expand Up @@ -552,10 +552,13 @@ describe('Subscription Publish Phase', () => {
const { sendImportantEmail, subscription } = await createSubscription(
pubsub,
);
const second = await createSubscription(pubsub);
invariant(isAsyncIterable(subscription));

const secondSubscription = (await createSubscription(pubsub)).subscription;
invariant(isAsyncIterable(secondSubscription));

const payload1 = subscription.next();
const payload2 = second.subscription.next();
const payload2 = secondSubscription.next();

expect(
sendImportantEmail({
Expand Down Expand Up @@ -593,6 +596,7 @@ describe('Subscription Publish Phase', () => {
const { sendImportantEmail, subscription } = await createSubscription(
pubsub,
);
invariant(isAsyncIterable(subscription));

// Wait for the next subscription payload.
const payload = subscription.next();
Expand Down Expand Up @@ -683,6 +687,8 @@ describe('Subscription Publish Phase', () => {
const { sendImportantEmail, subscription } = await createSubscription(
pubsub,
);
invariant(isAsyncIterable(subscription));

let payload = subscription.next();

// A new email arrives!
Expand Down Expand Up @@ -749,6 +755,8 @@ describe('Subscription Publish Phase', () => {
const { sendImportantEmail, subscription } = await createSubscription(
pubsub,
);
invariant(isAsyncIterable(subscription));

let payload = subscription.next();

// A new email arrives!
Expand Down Expand Up @@ -803,6 +811,8 @@ describe('Subscription Publish Phase', () => {
const { sendImportantEmail, subscription } = await createSubscription(
pubsub,
);
invariant(isAsyncIterable(subscription));

let payload = subscription.next();

// A new email arrives!
Expand Down Expand Up @@ -865,6 +875,8 @@ describe('Subscription Publish Phase', () => {
const { sendImportantEmail, subscription } = await createSubscription(
pubsub,
);
invariant(isAsyncIterable(subscription));

let payload = subscription.next();

// A new email arrives!
Expand Down Expand Up @@ -941,7 +953,6 @@ describe('Subscription Publish Phase', () => {
},
);

// $FlowFixMe[incompatible-call]
const subscription = await subscribe({
schema: erroringEmailSchema,
document: parse(`
Expand All @@ -954,6 +965,7 @@ describe('Subscription Publish Phase', () => {
}
`),
});
invariant(isAsyncIterable(subscription));

const payload1 = await subscription.next();
expect(payload1).to.deep.equal({
Expand Down Expand Up @@ -1013,7 +1025,6 @@ describe('Subscription Publish Phase', () => {
(email) => email,
);

// $FlowFixMe[incompatible-call]
const subscription = await subscribe({
schema: erroringEmailSchema,
document: parse(`
Expand All @@ -1026,6 +1037,7 @@ describe('Subscription Publish Phase', () => {
}
`),
});
invariant(isAsyncIterable(subscription));

const payload1 = await subscription.next();
expect(payload1).to.deep.equal({
Expand Down Expand Up @@ -1067,7 +1079,6 @@ describe('Subscription Publish Phase', () => {
(email) => email,
);

// $FlowFixMe[incompatible-call]
const subscription = await subscribe({
schema: erroringEmailSchema,
document: parse(`
Expand All @@ -1080,6 +1091,7 @@ describe('Subscription Publish Phase', () => {
}
`),
});
invariant(isAsyncIterable(subscription));

const payload1 = await subscription.next();
expect(payload1).to.deep.equal({
Expand Down
4 changes: 2 additions & 2 deletions src/subscription/mapAsyncIterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { PromiseOrValue } from '../jsutils/PromiseOrValue';
* which produces values mapped via calling the callback function.
*/
export default function mapAsyncIterator<T, U>(
iterable: AsyncIterable<T>,
iterable: AsyncIterable<T> | AsyncGenerator<T, void, void>,
callback: (T) => PromiseOrValue<U>,
rejectCallback?: (any) => PromiseOrValue<U>,
): AsyncGenerator<U, void, void> {
Expand Down Expand Up @@ -58,7 +58,7 @@ export default function mapAsyncIterator<T, U>(
[SYMBOL_ASYNC_ITERATOR]() {
return this;
},
}: any);
}: $FlowFixMe);
}

function asyncMapValue<T, U>(
Expand Down
4 changes: 2 additions & 2 deletions src/subscription/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export type SubscriptionArgs = {|
declare function subscribe(
SubscriptionArgs,
..._: []
): Promise<AsyncIterator<ExecutionResult> | ExecutionResult>;
): Promise<AsyncGenerator<ExecutionResult, void, void> | ExecutionResult>;
/* eslint-disable no-redeclare */
declare function subscribe(
schema: GraphQLSchema,
Expand Down Expand Up @@ -112,7 +112,7 @@ function reportGraphQLError(error: mixed): ExecutionResult {

function subscribeImpl(
args: SubscriptionArgs,
): Promise<AsyncIterator<ExecutionResult> | ExecutionResult> {
): Promise<AsyncGenerator<ExecutionResult, void, void> | ExecutionResult> {
const {
schema,
document,
Expand Down

0 comments on commit 9305c04

Please sign in to comment.