Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/shared-sync-lock-cleanup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@powersync/web": patch
---

Ensure shared sync releases db-locks on close/pagehide and cleans up pending lock requests.
37 changes: 26 additions & 11 deletions packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ export class LockedAsyncDatabaseAdapter
* This is only required for the long-lived shared IndexedDB connections.
*/
this.requiresHolds = (this._config as ResolvedWASQLiteOpenFactoryOptions).vfs == WASQLiteVFS.IDBBatchAtomicVFS;

if (this._db instanceof WorkerWrappedAsyncDatabaseConnection) {
this._db.onClose(() => {
this.abortPendingLockRequests('Closed');
});
}
}

getConfiguration(): ResolvedWebSQLOpenOptions {
Expand Down Expand Up @@ -174,7 +180,7 @@ export class LockedAsyncDatabaseAdapter
if (dispose) {
dispose();
}
this.pendingAbortControllers.forEach((controller) => controller.abort('Closed'));
this.abortPendingLockRequests('Closed');
await this.baseDB?.close?.();
this.closed = true;
}
Expand Down Expand Up @@ -232,14 +238,15 @@ export class LockedAsyncDatabaseAdapter
}, timeoutMs)
: null;

return getNavigatorLocks().request(
`db-lock-${this._dbIdentifier}`,
{ signal: abortController.signal },
async () => {
this.pendingAbortControllers.delete(abortController);
if (timoutId) {
clearTimeout(timoutId);
}
const clearTimeoutIfSet = () => {
if (timoutId) {
clearTimeout(timoutId);
}
};

return getNavigatorLocks()
.request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, async () => {
clearTimeoutIfSet();
const holdId = this.requiresHolds ? await this.baseDB.markHold() : null;
try {
return await callback();
Expand All @@ -248,8 +255,16 @@ export class LockedAsyncDatabaseAdapter
await this.baseDB.releaseHold(holdId);
}
}
}
);
})
.finally(() => {
clearTimeoutIfSet();
this.pendingAbortControllers.delete(abortController);
});
}

private abortPendingLockRequests(reason: string) {
this.pendingAbortControllers.forEach((controller) => controller.abort(reason));
this.pendingAbortControllers.clear();
}

async readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions | undefined): Promise<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
{
protected lockAbortController = new AbortController();
protected notifyRemoteClosed: AbortController | undefined;
private finalized = false;
private closeListeners = new Set<() => void>();

constructor(protected options: WrappedWorkerConnectionOptions<Config>) {
if (options.remoteCanCloseUnexpectedly) {
this.notifyRemoteClosed = new AbortController();
}
if (options.onClose) {
this.closeListeners.add(options.onClose);
}
}

protected get baseConnection() {
Expand Down Expand Up @@ -160,14 +165,44 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
return this.baseConnection.registerOnTableChange(Comlink.proxy(callback));
}

onClose(callback: () => void): () => void {
this.closeListeners.add(callback);
return () => this.closeListeners.delete(callback);
}

private finalizeClose(): void {
if (this.finalized) {
return;
}
this.finalized = true;
// Ensure cleanup is idempotent if close is triggered from multiple paths.
this.notifyRemoteClosed?.abort();
try {
this.options.remote[Comlink.releaseProxy]();
} catch {
// Proxy can already be released on teardown.
}
this.closeListeners.forEach((listener) => {
try {
listener();
} catch {
// Avoid throwing during cleanup.
}
});
}

forceClose(): void {
this.lockAbortController.abort();
this.finalizeClose();
}

async close(): Promise<void> {
// Abort any pending lock requests.
this.lockAbortController.abort();
try {
await this.withRemote(() => this.baseConnection.close());
} finally {
this.options.remote[Comlink.releaseProxy]();
this.options.onClose?.();
this.finalizeClose();
}
}

Expand Down
33 changes: 28 additions & 5 deletions packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
protected isInitialized: Promise<void>;
protected dbAdapter: WebDBAdapter;
private abortOnClose = new AbortController();
private pagehideHandler?: (event: PageTransitionEvent) => void;
private pagehideTriggered = false;
private closeSignalPromise: Promise<void>;

constructor(options: SharedWebStreamingSyncImplementationOptions) {
super(options);
Expand Down Expand Up @@ -191,18 +194,35 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
*/
Comlink.expose(this.clientProvider, this.messagePort);

this.closeSignalPromise = new Promise<void>((resolve) => {
const signal = this.abortOnClose.signal;
if (signal.aborted) {
resolve();
return;
}
const onAbort = () => resolve();
signal.addEventListener('abort', onAbort, { once: true });
});

// Request a random lock until this client is disposed. The name of the lock is sent to the shared worker, which
// will also attempt to acquire it. Since the lock is returned when the tab is closed, this allows the share worker
// to free resources associated with this tab.
getNavigatorLocks().request(`tab-close-signal-${crypto.randomUUID()}`, async (lock) => {
if (!this.abortOnClose.signal.aborted) {
if (!this.abortOnClose.signal.aborted && !this.pagehideTriggered) {
this.syncManager.addLockBasedCloseSignal(lock!.name);

await new Promise<void>((r) => {
this.abortOnClose.signal.onabort = () => r();
});
}
await this.closeSignalPromise;
});

this.pagehideHandler = (event) => {
if (!event.persisted) {
this.pagehideTriggered = true;
if (!this.abortOnClose.signal.aborted) {
this.abortOnClose.abort();
}
}
};
window.addEventListener('pagehide', this.pagehideHandler);
}

/**
Expand Down Expand Up @@ -250,6 +270,9 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
this.messagePort.postMessage(closeMessagePayload);
});
this.abortOnClose.abort();
if (this.pagehideHandler) {
window.removeEventListener('pagehide', this.pagehideHandler);
}

// Release the proxy
this.syncManager[Comlink.releaseProxy]();
Expand Down
2 changes: 1 addition & 1 deletion packages/web/src/worker/sync/SharedSyncImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,8 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
});
lastClient.closeListeners.push(async () => {
this.logger.info('Aborting open connection because associated tab closed.');
await wrapped.close().catch((ex) => this.logger.warn('error closing database connection', ex));
wrapped.markRemoteClosed();
await wrapped.close().catch((ex) => this.logger.warn('error closing database connection', ex));
});

return wrapped;
Expand Down
114 changes: 114 additions & 0 deletions packages/web/tests/force_close_db_lock_abort.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import * as Comlink from 'comlink';
import { describe, expect, it, vi } from 'vitest';
import { LockedAsyncDatabaseAdapter } from '../src/db/adapters/LockedAsyncDatabaseAdapter';
import { WorkerWrappedAsyncDatabaseConnection } from '../src/db/adapters/WorkerWrappedAsyncDatabaseConnection';
import type { AsyncDatabaseConnection, OpenAsyncDatabaseConnection } from '../src/db/adapters/AsyncDatabaseConnection';
import {
DEFAULT_WEB_SQL_FLAGS,
TemporaryStorageOption,
type ResolvedWebSQLOpenOptions
} from '../src/db/adapters/web-sql-flags';

const baseConfig: ResolvedWebSQLOpenOptions = {
dbFilename: 'crm.sqlite',
flags: DEFAULT_WEB_SQL_FLAGS,
temporaryStorage: TemporaryStorageOption.MEMORY,
cacheSizeKb: 1
};

const baseConnection: AsyncDatabaseConnection = {
init: async () => {},
close: async () => {},
markHold: async () => 'hold',
releaseHold: async () => {},
isAutoCommit: async () => true,
execute: async () => ({ rows: { _array: [], length: 0 }, rowsAffected: 0, insertId: 0 }),
executeRaw: async () => [],
executeBatch: async () => ({ rows: { _array: [], length: 0 }, rowsAffected: 0, insertId: 0 }),
registerOnTableChange: async () => () => {},
getConfig: async () => baseConfig
};

describe('forceClose db-lock abort', () => {
it('aborts pending db-lock when forceClose happens after lock request', async () => {
let lockRequestedResolve!: () => void;
const lockRequested = new Promise<void>((resolve) => {
lockRequestedResolve = resolve;
});

const mockLocks = {
request: ((...args: any[]) => {
const [name, optionsOrCallback, callback] = args;
const lockCallback = typeof optionsOrCallback === 'function' ? optionsOrCallback : callback;
const signal =
typeof optionsOrCallback === 'object' && optionsOrCallback ? optionsOrCallback.signal : undefined;
return new Promise((resolve, reject) => {
const onAbort = () => {
reject(new DOMException('Aborted', 'AbortError'));
};
if (signal) {
signal.addEventListener('abort', onAbort);
}
Promise.resolve()
.then(async () => {
const callbackPromise = lockCallback?.({ name, mode: 'exclusive' });
lockRequestedResolve();
return callbackPromise;
})
.then(resolve)
.catch(reject)
.finally(() => {
if (signal) {
signal.removeEventListener('abort', onAbort);
}
});
});
}) as LockManager['request'],
query: vi.fn()
} as LockManager;

const locksSpy = vi.spyOn(navigator, 'locks', 'get').mockReturnValue(mockLocks);

const hangingExecute = vi.fn(() => new Promise<never>(() => {}));
const connection: AsyncDatabaseConnection = { ...baseConnection, execute: hangingExecute };
const remote = {
[Comlink.releaseProxy]: () => {}
} as Comlink.Remote<OpenAsyncDatabaseConnection<ResolvedWebSQLOpenOptions>>;
let wrapped: WorkerWrappedAsyncDatabaseConnection | null = null;

const adapter = new LockedAsyncDatabaseAdapter({
name: 'crm.sqlite',
openConnection: async () => {
wrapped = new WorkerWrappedAsyncDatabaseConnection({
baseConnection: connection,
identifier: 'crm.sqlite',
remoteCanCloseUnexpectedly: false,
remote
});
return wrapped;
}
});

const executePromise = adapter.execute('select 1');
const executeOutcome = executePromise.then(
() => ({ status: 'resolved' as const }),
(error) => ({ status: 'rejected' as const, error })
);

await lockRequested;
wrapped!.forceClose();

const outcome = await Promise.race([
executeOutcome,
new Promise<{ status: 'timeout' }>((resolve) => setTimeout(() => resolve({ status: 'timeout' }), 150))
]);

locksSpy.mockRestore();

expect(outcome.status).not.toBe('timeout');
expect(outcome.status).toBe('rejected');
if (outcome.status === 'rejected') {
expect(outcome.error?.name).toBe('AbortError');
}
});
});
Loading