diff --git a/.changeset/shared-sync-cloneable-abort.md b/.changeset/shared-sync-cloneable-abort.md new file mode 100644 index 000000000..2e2ffd3e6 --- /dev/null +++ b/.changeset/shared-sync-cloneable-abort.md @@ -0,0 +1,5 @@ +--- +"@powersync/web": patch +--- + +Use a cloneable abort reason when closing shared sync client ports to avoid DataCloneError across Comlink. diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 5c4f5b683..c3979a3fb 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -1,5 +1,4 @@ import { - AbortOperation, BaseObserver, ConnectionManager, createLogger, @@ -313,11 +312,10 @@ export class SharedSyncImplementation extends BaseObserver { if (abortController?.activePort == port) { - abortController!.controller.abort( - new AbortOperation('Closing pending requests after client port is removed') - ); + abortController!.controller.abort(abortReason); } }); diff --git a/packages/web/tests/shared_sync_abort_reason.test.ts b/packages/web/tests/shared_sync_abort_reason.test.ts new file mode 100644 index 000000000..0ca5ab339 --- /dev/null +++ b/packages/web/tests/shared_sync_abort_reason.test.ts @@ -0,0 +1,96 @@ +import * as Comlink from 'comlink'; +import { afterEach, describe, expect, it, vi } from 'vitest'; +import { SharedSyncImplementation, type WrappedSyncPort } from '../src/worker/sync/SharedSyncImplementation'; +import type { AbstractSharedSyncClientProvider } from '../src/worker/sync/AbstractSharedSyncClientProvider'; + +describe('Shared sync abort reasons', { sequential: true }, () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('uses a cloneable abort reason when closing a client port', async () => { + const implementation = new SharedSyncImplementation(); + const abortController = new AbortController(); + const { port1 } = new MessageChannel(); + + const clientProvider = { + fetchCredentials: vi.fn(async () => null), + invalidateCredentials: vi.fn(), + uploadCrud: vi.fn(async () => {}), + statusChanged: vi.fn(), + getDBWorkerPort: vi.fn(async () => port1), + trace: vi.fn(), + debug: vi.fn(), + info: vi.fn(), + log: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + time: vi.fn(), + timeEnd: vi.fn(), + [Comlink.releaseProxy]: vi.fn() + } as unknown as Comlink.Remote; + + const wrappedPort = { + port: port1, + clientProvider, + currentSubscriptions: [], + closeListeners: [] + } satisfies WrappedSyncPort; + + (implementation as any).ports.push(wrappedPort); + (implementation as any).fetchCredentialsController = { + controller: abortController, + activePort: wrappedPort + }; + + const abortSpy = vi.spyOn(abortController, 'abort'); + await implementation.removePort(wrappedPort); + + expect(abortSpy).toHaveBeenCalled(); + const reason = abortSpy.mock.calls[0]?.[0]; + expect(typeof reason).toBe('string'); + }); + + it('uses a cloneable abort reason when closing a client port with uploads in-flight', async () => { + const implementation = new SharedSyncImplementation(); + const abortController = new AbortController(); + const { port1 } = new MessageChannel(); + + const clientProvider = { + fetchCredentials: vi.fn(async () => null), + invalidateCredentials: vi.fn(), + uploadCrud: vi.fn(async () => {}), + statusChanged: vi.fn(), + getDBWorkerPort: vi.fn(async () => port1), + trace: vi.fn(), + debug: vi.fn(), + info: vi.fn(), + log: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + time: vi.fn(), + timeEnd: vi.fn(), + [Comlink.releaseProxy]: vi.fn() + } as unknown as Comlink.Remote; + + const wrappedPort = { + port: port1, + clientProvider, + currentSubscriptions: [], + closeListeners: [] + } satisfies WrappedSyncPort; + + (implementation as any).ports.push(wrappedPort); + (implementation as any).uploadDataController = { + controller: abortController, + activePort: wrappedPort + }; + + const abortSpy = vi.spyOn(abortController, 'abort'); + await implementation.removePort(wrappedPort); + + expect(abortSpy).toHaveBeenCalled(); + const reason = abortSpy.mock.calls[0]?.[0]; + expect(typeof reason).toBe('string'); + }); +});