Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/shared-sync-cloneable-abort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@powersync/web": patch
---

Use a cloneable abort reason when closing shared sync client ports to avoid DataCloneError across Comlink.
6 changes: 2 additions & 4 deletions packages/web/src/worker/sync/SharedSyncImplementation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {
AbortOperation,
BaseObserver,
ConnectionManager,
createLogger,
Expand Down Expand Up @@ -313,11 +312,10 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
* The port might currently be in use. Any active functions might
* not resolve. Abort them here.
*/
const abortReason = 'Closing pending requests after client port is removed';
[this.fetchCredentialsController, this.uploadDataController].forEach((abortController) => {
if (abortController?.activePort == port) {
abortController!.controller.abort(
new AbortOperation('Closing pending requests after client port is removed')
);
abortController!.controller.abort(abortReason);
}
});

Expand Down
96 changes: 96 additions & 0 deletions packages/web/tests/shared_sync_abort_reason.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import * as Comlink from 'comlink';
import { afterEach, describe, expect, it, vi } from 'vitest';
import { SharedSyncImplementation, type WrappedSyncPort } from '../src/worker/sync/SharedSyncImplementation';
import type { AbstractSharedSyncClientProvider } from '../src/worker/sync/AbstractSharedSyncClientProvider';

describe('Shared sync abort reasons', { sequential: true }, () => {
afterEach(() => {
vi.restoreAllMocks();
});

it('uses a cloneable abort reason when closing a client port', async () => {
const implementation = new SharedSyncImplementation();
const abortController = new AbortController();
const { port1 } = new MessageChannel();

const clientProvider = {
fetchCredentials: vi.fn(async () => null),
invalidateCredentials: vi.fn(),
uploadCrud: vi.fn(async () => {}),
statusChanged: vi.fn(),
getDBWorkerPort: vi.fn(async () => port1),
trace: vi.fn(),
debug: vi.fn(),
info: vi.fn(),
log: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
time: vi.fn(),
timeEnd: vi.fn(),
[Comlink.releaseProxy]: vi.fn()
} as unknown as Comlink.Remote<AbstractSharedSyncClientProvider>;

const wrappedPort = {
port: port1,
clientProvider,
currentSubscriptions: [],
closeListeners: []
} satisfies WrappedSyncPort;

(implementation as any).ports.push(wrappedPort);
(implementation as any).fetchCredentialsController = {
controller: abortController,
activePort: wrappedPort
};

const abortSpy = vi.spyOn(abortController, 'abort');
await implementation.removePort(wrappedPort);

expect(abortSpy).toHaveBeenCalled();
const reason = abortSpy.mock.calls[0]?.[0];
expect(typeof reason).toBe('string');
});

it('uses a cloneable abort reason when closing a client port with uploads in-flight', async () => {
const implementation = new SharedSyncImplementation();
const abortController = new AbortController();
const { port1 } = new MessageChannel();

const clientProvider = {
fetchCredentials: vi.fn(async () => null),
invalidateCredentials: vi.fn(),
uploadCrud: vi.fn(async () => {}),
statusChanged: vi.fn(),
getDBWorkerPort: vi.fn(async () => port1),
trace: vi.fn(),
debug: vi.fn(),
info: vi.fn(),
log: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
time: vi.fn(),
timeEnd: vi.fn(),
[Comlink.releaseProxy]: vi.fn()
} as unknown as Comlink.Remote<AbstractSharedSyncClientProvider>;

const wrappedPort = {
port: port1,
clientProvider,
currentSubscriptions: [],
closeListeners: []
} satisfies WrappedSyncPort;

(implementation as any).ports.push(wrappedPort);
(implementation as any).uploadDataController = {
controller: abortController,
activePort: wrappedPort
};

const abortSpy = vi.spyOn(abortController, 'abort');
await implementation.removePort(wrappedPort);

expect(abortSpy).toHaveBeenCalled();
const reason = abortSpy.mock.calls[0]?.[0];
expect(typeof reason).toBe('string');
});
});