-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[core] Async iterator #5403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[core] Async iterator #5403
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| --- | ||
| 'xstate': minor | ||
| --- | ||
|
|
||
| Make actors async iterable: | ||
|
|
||
| ```ts | ||
| const actor = createActor(machine); | ||
| actor.start(); | ||
|
|
||
| for await (const snapshot of actor) { | ||
| console.log(snapshot); | ||
| } | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -464,6 +464,55 @@ | |
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Makes the actor async iterable, allowing it to be used in `for await` | ||
| * loops. | ||
| * | ||
| * @remarks | ||
| * The async iterator yields snapshots as they are emitted by the actor. The | ||
| * iterator will complete when the actor reaches a "done" state or encounters | ||
| * an error. | ||
| * @example | ||
| * | ||
| * ```ts | ||
| * const actor = createActor(someMachine); | ||
| * actor.start(); | ||
| * | ||
| * for await (const snapshot of actor) { | ||
| * console.log('Current state:', snapshot); | ||
| * if (snapshot.status === 'done') { | ||
| * break; // Optional: break when done | ||
| * } | ||
| * } | ||
| * ``` | ||
| */ | ||
| async *[Symbol.asyncIterator](): AsyncIterator<SnapshotFrom<TLogic>> { | ||
| // Yield the initial snapshot if the actor is already running | ||
| if (this._processingStatus === ProcessingStatus.Running) { | ||
| yield this.getSnapshot(); | ||
| } | ||
|
|
||
| while (this._processingStatus !== ProcessingStatus.Stopped) { | ||
| yield await new Promise<SnapshotFrom<TLogic>>((resolve, reject) => { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implementation is neat but I think we should do better in the library context. This resubscribes with each iteration step. It would be better to maintain a single subscription throughout the full lifecycle of the iteration. |
||
| const subscription = this.subscribe({ | ||
| next: (snapshot) => { | ||
| resolve(snapshot); | ||
| subscription.unsubscribe(); | ||
| }, | ||
| error: (error) => { | ||
| reject(error); | ||
| subscription.unsubscribe(); | ||
| }, | ||
| complete: () => { | ||
| // When the actor completes, we should stop yielding | ||
| // The iterator will naturally end when the while loop condition becomes false | ||
| subscription.unsubscribe(); | ||
| } | ||
| }); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| public on<TType extends EmittedFrom<TLogic>['type'] | '*'>( | ||
| type: TType, | ||
| handler: ( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1821,20 +1821,118 @@ describe('actors', () => { | |
| expect(spy).toHaveBeenCalledWith('foo'); | ||
| }); | ||
|
|
||
| it('inline invokes should not leak into provided actors object', async () => { | ||
| const actors = {}; | ||
| it('should be async iterable', async () => { | ||
| const machine = createMachine({ | ||
| initial: 'idle', | ||
| states: { | ||
| idle: { | ||
| on: { | ||
| NEXT: 'active' | ||
| } | ||
| }, | ||
| active: { | ||
| on: { | ||
| DONE: 'done' | ||
| } | ||
| }, | ||
| done: { | ||
| type: 'final' | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| const machine = createMachine( | ||
| { | ||
| invoke: { | ||
| src: fromPromise(async () => 'foo') | ||
| const actor = createActor(machine); | ||
| actor.start(); | ||
|
|
||
| const snapshots: any[] = []; | ||
|
|
||
| // Start the async iteration | ||
| const iterator = actor[Symbol.asyncIterator](); | ||
|
|
||
| // Send events to trigger state changes | ||
| setTimeout(() => actor.send({ type: 'NEXT' }), 10); | ||
| setTimeout(() => actor.send({ type: 'DONE' }), 20); | ||
|
|
||
| // Collect snapshots from the async iterator | ||
| for await (const snapshot of actor) { | ||
| snapshots.push(snapshot); | ||
| if (snapshot.status === 'done') { | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| expect(snapshots).toHaveLength(3); | ||
| expect(snapshots[0].status).toBe('active'); | ||
| expect(snapshots[0].value).toBe('idle'); | ||
| expect(snapshots[1].status).toBe('active'); | ||
| expect(snapshots[1].value).toBe('active'); | ||
| expect(snapshots[2].status).toBe('done'); | ||
| }); | ||
|
|
||
| it('should handle errors in async iteration', async () => { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this doesn't test what it claims - this just happens to use events and states named with "error" but in reality this machine completes gracefully through a regular final state |
||
| const machine = createMachine({ | ||
| initial: 'idle', | ||
| states: { | ||
| idle: { | ||
| on: { | ||
| ERROR: 'error' | ||
| } | ||
| }, | ||
| error: { | ||
| type: 'final' | ||
| } | ||
| }, | ||
| { actors } | ||
| ); | ||
| } | ||
| }); | ||
|
|
||
| createActor(machine).start(); | ||
| const actor = createActor(machine); | ||
| actor.start(); | ||
|
|
||
| const snapshots: any[] = []; | ||
|
|
||
| setTimeout(() => actor.send({ type: 'ERROR' }), 10); | ||
|
|
||
| for await (const snapshot of actor) { | ||
| snapshots.push(snapshot); | ||
| if (snapshot.status === 'done') { | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| expect(snapshots).toHaveLength(2); | ||
| expect(snapshots[0].status).toBe('active'); | ||
| expect(snapshots[0].value).toBe('idle'); | ||
| expect(snapshots[1].status).toBe('done'); | ||
| expect(snapshots[1].value).toBe('error'); | ||
| }); | ||
|
|
||
| it('should complete iteration when actor stops', async () => { | ||
| const machine = createMachine({ | ||
| initial: 'idle', | ||
| states: { | ||
| idle: { | ||
| on: { | ||
| STOP: 'stopped' | ||
| } | ||
| }, | ||
| stopped: { | ||
| type: 'final' | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| const actor = createActor(machine); | ||
| actor.start(); | ||
|
|
||
| const snapshots: any[] = []; | ||
|
|
||
| setTimeout(() => actor.send({ type: 'STOP' }), 10); | ||
|
|
||
| for await (const snapshot of actor) { | ||
| snapshots.push(snapshot); | ||
| } | ||
|
|
||
| expect(actors).toEqual({}); | ||
| expect(snapshots).toHaveLength(2); | ||
| expect(snapshots[0].status).toBe('active'); | ||
| expect(snapshots[1].status).toBe('done'); | ||
| }); | ||
| }); | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this match the behavior of vanilla
actorRef.subscribe?