diff --git a/.changeset/shared-sync-lock-cleanup.md b/.changeset/shared-sync-lock-cleanup.md new file mode 100644 index 000000000..a39a6c32e --- /dev/null +++ b/.changeset/shared-sync-lock-cleanup.md @@ -0,0 +1,5 @@ +--- +"@powersync/web": patch +--- + +Ensure shared sync releases db-locks on close/pagehide and cleans up pending lock requests. diff --git a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts index 4a46e3a9c..b59f707c3 100644 --- a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts +++ b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts @@ -115,6 +115,12 @@ export class LockedAsyncDatabaseAdapter * This is only required for the long-lived shared IndexedDB connections. */ this.requiresHolds = (this._config as ResolvedWASQLiteOpenFactoryOptions).vfs == WASQLiteVFS.IDBBatchAtomicVFS; + + if (this._db instanceof WorkerWrappedAsyncDatabaseConnection) { + this._db.onClose(() => { + this.abortPendingLockRequests('Closed'); + }); + } } getConfiguration(): ResolvedWebSQLOpenOptions { @@ -174,7 +180,7 @@ export class LockedAsyncDatabaseAdapter if (dispose) { dispose(); } - this.pendingAbortControllers.forEach((controller) => controller.abort('Closed')); + this.abortPendingLockRequests('Closed'); await this.baseDB?.close?.(); this.closed = true; } @@ -232,14 +238,15 @@ export class LockedAsyncDatabaseAdapter }, timeoutMs) : null; - return getNavigatorLocks().request( - `db-lock-${this._dbIdentifier}`, - { signal: abortController.signal }, - async () => { - this.pendingAbortControllers.delete(abortController); - if (timoutId) { - clearTimeout(timoutId); - } + const clearTimeoutIfSet = () => { + if (timoutId) { + clearTimeout(timoutId); + } + }; + + return getNavigatorLocks() + .request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, async () => { + clearTimeoutIfSet(); const holdId = this.requiresHolds ? await this.baseDB.markHold() : null; try { return await callback(); @@ -248,8 +255,16 @@ export class LockedAsyncDatabaseAdapter await this.baseDB.releaseHold(holdId); } } - } - ); + }) + .finally(() => { + clearTimeoutIfSet(); + this.pendingAbortControllers.delete(abortController); + }); + } + + private abortPendingLockRequests(reason: string) { + this.pendingAbortControllers.forEach((controller) => controller.abort(reason)); + this.pendingAbortControllers.clear(); } async readTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions | undefined): Promise { diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index e03bf8aa8..68add7a47 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -32,11 +32,16 @@ export class WorkerWrappedAsyncDatabaseConnection void>(); constructor(protected options: WrappedWorkerConnectionOptions) { if (options.remoteCanCloseUnexpectedly) { this.notifyRemoteClosed = new AbortController(); } + if (options.onClose) { + this.closeListeners.add(options.onClose); + } } protected get baseConnection() { @@ -160,14 +165,44 @@ export class WorkerWrappedAsyncDatabaseConnection void): () => void { + this.closeListeners.add(callback); + return () => this.closeListeners.delete(callback); + } + + private finalizeClose(): void { + if (this.finalized) { + return; + } + this.finalized = true; + // Ensure cleanup is idempotent if close is triggered from multiple paths. + this.notifyRemoteClosed?.abort(); + try { + this.options.remote[Comlink.releaseProxy](); + } catch { + // Proxy can already be released on teardown. + } + this.closeListeners.forEach((listener) => { + try { + listener(); + } catch { + // Avoid throwing during cleanup. + } + }); + } + + forceClose(): void { + this.lockAbortController.abort(); + this.finalizeClose(); + } + async close(): Promise { // Abort any pending lock requests. this.lockAbortController.abort(); try { await this.withRemote(() => this.baseConnection.close()); } finally { - this.options.remote[Comlink.releaseProxy](); - this.options.onClose?.(); + this.finalizeClose(); } } diff --git a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts index 66d3b51ed..6f25bddbc 100644 --- a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -109,6 +109,9 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem protected isInitialized: Promise; protected dbAdapter: WebDBAdapter; private abortOnClose = new AbortController(); + private pagehideHandler?: (event: PageTransitionEvent) => void; + private pagehideTriggered = false; + private closeSignalPromise: Promise; constructor(options: SharedWebStreamingSyncImplementationOptions) { super(options); @@ -191,18 +194,35 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem */ Comlink.expose(this.clientProvider, this.messagePort); + this.closeSignalPromise = new Promise((resolve) => { + const signal = this.abortOnClose.signal; + if (signal.aborted) { + resolve(); + return; + } + const onAbort = () => resolve(); + signal.addEventListener('abort', onAbort, { once: true }); + }); + // Request a random lock until this client is disposed. The name of the lock is sent to the shared worker, which // will also attempt to acquire it. Since the lock is returned when the tab is closed, this allows the share worker // to free resources associated with this tab. getNavigatorLocks().request(`tab-close-signal-${crypto.randomUUID()}`, async (lock) => { - if (!this.abortOnClose.signal.aborted) { + if (!this.abortOnClose.signal.aborted && !this.pagehideTriggered) { this.syncManager.addLockBasedCloseSignal(lock!.name); - - await new Promise((r) => { - this.abortOnClose.signal.onabort = () => r(); - }); } + await this.closeSignalPromise; }); + + this.pagehideHandler = (event) => { + if (!event.persisted) { + this.pagehideTriggered = true; + if (!this.abortOnClose.signal.aborted) { + this.abortOnClose.abort(); + } + } + }; + window.addEventListener('pagehide', this.pagehideHandler); } /** @@ -250,6 +270,9 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem this.messagePort.postMessage(closeMessagePayload); }); this.abortOnClose.abort(); + if (this.pagehideHandler) { + window.removeEventListener('pagehide', this.pagehideHandler); + } // Release the proxy this.syncManager[Comlink.releaseProxy](); diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 5c4f5b683..fbfa09bf3 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -487,8 +487,8 @@ export class SharedSyncImplementation extends BaseObserver { this.logger.info('Aborting open connection because associated tab closed.'); - await wrapped.close().catch((ex) => this.logger.warn('error closing database connection', ex)); wrapped.markRemoteClosed(); + await wrapped.close().catch((ex) => this.logger.warn('error closing database connection', ex)); }); return wrapped; diff --git a/packages/web/tests/force_close_db_lock_abort.test.ts b/packages/web/tests/force_close_db_lock_abort.test.ts new file mode 100644 index 000000000..6a173e224 --- /dev/null +++ b/packages/web/tests/force_close_db_lock_abort.test.ts @@ -0,0 +1,114 @@ +import * as Comlink from 'comlink'; +import { describe, expect, it, vi } from 'vitest'; +import { LockedAsyncDatabaseAdapter } from '../src/db/adapters/LockedAsyncDatabaseAdapter'; +import { WorkerWrappedAsyncDatabaseConnection } from '../src/db/adapters/WorkerWrappedAsyncDatabaseConnection'; +import type { AsyncDatabaseConnection, OpenAsyncDatabaseConnection } from '../src/db/adapters/AsyncDatabaseConnection'; +import { + DEFAULT_WEB_SQL_FLAGS, + TemporaryStorageOption, + type ResolvedWebSQLOpenOptions +} from '../src/db/adapters/web-sql-flags'; + +const baseConfig: ResolvedWebSQLOpenOptions = { + dbFilename: 'crm.sqlite', + flags: DEFAULT_WEB_SQL_FLAGS, + temporaryStorage: TemporaryStorageOption.MEMORY, + cacheSizeKb: 1 +}; + +const baseConnection: AsyncDatabaseConnection = { + init: async () => {}, + close: async () => {}, + markHold: async () => 'hold', + releaseHold: async () => {}, + isAutoCommit: async () => true, + execute: async () => ({ rows: { _array: [], length: 0 }, rowsAffected: 0, insertId: 0 }), + executeRaw: async () => [], + executeBatch: async () => ({ rows: { _array: [], length: 0 }, rowsAffected: 0, insertId: 0 }), + registerOnTableChange: async () => () => {}, + getConfig: async () => baseConfig +}; + +describe('forceClose db-lock abort', () => { + it('aborts pending db-lock when forceClose happens after lock request', async () => { + let lockRequestedResolve!: () => void; + const lockRequested = new Promise((resolve) => { + lockRequestedResolve = resolve; + }); + + const mockLocks = { + request: ((...args: any[]) => { + const [name, optionsOrCallback, callback] = args; + const lockCallback = typeof optionsOrCallback === 'function' ? optionsOrCallback : callback; + const signal = + typeof optionsOrCallback === 'object' && optionsOrCallback ? optionsOrCallback.signal : undefined; + return new Promise((resolve, reject) => { + const onAbort = () => { + reject(new DOMException('Aborted', 'AbortError')); + }; + if (signal) { + signal.addEventListener('abort', onAbort); + } + Promise.resolve() + .then(async () => { + const callbackPromise = lockCallback?.({ name, mode: 'exclusive' }); + lockRequestedResolve(); + return callbackPromise; + }) + .then(resolve) + .catch(reject) + .finally(() => { + if (signal) { + signal.removeEventListener('abort', onAbort); + } + }); + }); + }) as LockManager['request'], + query: vi.fn() + } as LockManager; + + const locksSpy = vi.spyOn(navigator, 'locks', 'get').mockReturnValue(mockLocks); + + const hangingExecute = vi.fn(() => new Promise(() => {})); + const connection: AsyncDatabaseConnection = { ...baseConnection, execute: hangingExecute }; + const remote = { + [Comlink.releaseProxy]: () => {} + } as Comlink.Remote>; + let wrapped: WorkerWrappedAsyncDatabaseConnection | null = null; + + const adapter = new LockedAsyncDatabaseAdapter({ + name: 'crm.sqlite', + openConnection: async () => { + wrapped = new WorkerWrappedAsyncDatabaseConnection({ + baseConnection: connection, + identifier: 'crm.sqlite', + remoteCanCloseUnexpectedly: false, + remote + }); + return wrapped; + } + }); + + const executePromise = adapter.execute('select 1'); + const executeOutcome = executePromise.then( + () => ({ status: 'resolved' as const }), + (error) => ({ status: 'rejected' as const, error }) + ); + + await lockRequested; + wrapped!.forceClose(); + + const outcome = await Promise.race([ + executeOutcome, + new Promise<{ status: 'timeout' }>((resolve) => setTimeout(() => resolve({ status: 'timeout' }), 150)) + ]); + + locksSpy.mockRestore(); + + expect(outcome.status).not.toBe('timeout'); + expect(outcome.status).toBe('rejected'); + if (outcome.status === 'rejected') { + expect(outcome.error?.name).toBe('AbortError'); + } + }); +}); diff --git a/packages/web/tests/shared_sync_db_lock_cleanup.test.ts b/packages/web/tests/shared_sync_db_lock_cleanup.test.ts new file mode 100644 index 000000000..6963588a4 --- /dev/null +++ b/packages/web/tests/shared_sync_db_lock_cleanup.test.ts @@ -0,0 +1,163 @@ +import * as Comlink from 'comlink'; +import { afterEach, describe, expect, it, vi } from 'vitest'; +import type { DBAdapter } from '@powersync/common'; +import type { AsyncDatabaseConnection } from '../src/db/adapters/AsyncDatabaseConnection'; +import { + DEFAULT_WEB_SQL_FLAGS, + TemporaryStorageOption, + type ResolvedWebSQLOpenOptions +} from '../src/db/adapters/web-sql-flags'; +import { SharedSyncImplementation, type WrappedSyncPort } from '../src/worker/sync/SharedSyncImplementation'; +import type { AbstractSharedSyncClientProvider } from '../src/worker/sync/AbstractSharedSyncClientProvider'; + +vi.mock('comlink', async () => { + const actual = await vi.importActual('comlink'); + return { + ...actual, + wrap: vi.fn() + }; +}); + +describe('Shared sync db-lock cleanup', { sequential: true }, () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('releases db-lock when the dedicated worker dies mid-operation', async () => { + let lockRequestedResolve!: (name: string) => void; + const lockRequested = new Promise((resolve) => { + lockRequestedResolve = resolve; + }); + + const activeLocks = new Set(); + const mockLocks = { + request: ((...args: any[]) => { + const [name, optionsOrCallback, callback] = args; + const lockCallback = typeof optionsOrCallback === 'function' ? optionsOrCallback : callback; + const signal = + typeof optionsOrCallback === 'object' && optionsOrCallback ? optionsOrCallback.signal : undefined; + activeLocks.add(name); + lockRequestedResolve(name); + return new Promise((resolve, reject) => { + const onAbort = () => { + activeLocks.delete(name); + reject(new DOMException('Aborted', 'AbortError')); + }; + if (signal) { + signal.addEventListener('abort', onAbort); + } + Promise.resolve() + .then(() => lockCallback?.({ name, mode: 'exclusive' })) + .then(resolve) + .catch(reject) + .finally(() => { + if (signal) { + signal.removeEventListener('abort', onAbort); + } + activeLocks.delete(name); + }); + }); + }) as LockManager['request'], + query: vi.fn() + } as LockManager; + + vi.spyOn(navigator, 'locks', 'get').mockReturnValue(mockLocks); + + let executeCalledResolve!: () => void; + const executeCalled = new Promise((resolve) => { + executeCalledResolve = resolve; + }); + + const hangingExecute = vi.fn(() => { + executeCalledResolve(); + return new Promise(() => {}); + }); + const hangingClose = vi.fn(() => new Promise(() => {})); + + const baseConfig: ResolvedWebSQLOpenOptions = { + dbFilename: 'crm.sqlite', + flags: DEFAULT_WEB_SQL_FLAGS, + temporaryStorage: TemporaryStorageOption.MEMORY, + cacheSizeKb: 1 + }; + + const baseConnection: AsyncDatabaseConnection = { + init: async () => {}, + close: hangingClose, + markHold: async () => 'hold', + releaseHold: async () => {}, + isAutoCommit: async () => true, + execute: hangingExecute, + executeRaw: async () => [], + executeBatch: async () => ({ rows: { _array: [], length: 0 }, rowsAffected: 0, insertId: 0 }), + registerOnTableChange: async () => () => {}, + getConfig: async () => baseConfig + }; + + const openFn = vi.fn(async () => baseConnection) as unknown as Comlink.Remote< + (...args: unknown[]) => Promise + >; + (openFn as any)[Comlink.releaseProxy] = vi.fn(); + const wrapMock = Comlink.wrap as unknown as ReturnType; + wrapMock.mockReturnValue(openFn); + + const implementation = new SharedSyncImplementation(); + const { port1 } = new MessageChannel(); + + const clientProvider = { + fetchCredentials: vi.fn(async () => null), + invalidateCredentials: vi.fn(), + uploadCrud: vi.fn(async () => {}), + statusChanged: vi.fn(), + getDBWorkerPort: vi.fn(async () => port1), + trace: vi.fn(), + debug: vi.fn(), + info: vi.fn(), + log: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + time: vi.fn(), + timeEnd: vi.fn(), + [Comlink.releaseProxy]: vi.fn() + } as unknown as Comlink.Remote; + + const port: WrappedSyncPort = { + port: port1, + clientProvider, + currentSubscriptions: [], + closeListeners: [] + }; + + (implementation as unknown as { ports: WrappedSyncPort[] }).ports.push(port); + (implementation as unknown as { syncParams: unknown }).syncParams = { + streamOptions: {}, + dbParams: baseConfig + }; + + await (implementation as unknown as { openInternalDB: () => Promise }).openInternalDB(); + + const adapter = port.db as DBAdapter; + const executePromise = adapter.execute('select 1'); + const executeOutcome = executePromise.then( + () => ({ status: 'resolved' as const }), + (error) => ({ status: 'rejected' as const, error }) + ); + + const lockName = await lockRequested; + await executeCalled; + expect(lockName).toBe('db-lock-crm.sqlite'); + expect(activeLocks.has(lockName)).toBe(true); + + const closeListener = port.closeListeners[0]; + expect(closeListener).toBeDefined(); + void Promise.resolve(closeListener?.()).catch(() => {}); + + const outcome = await Promise.race([ + executeOutcome, + new Promise<{ status: 'timeout' }>((resolve) => setTimeout(() => resolve({ status: 'timeout' }), 150)) + ]); + + expect(outcome.status).not.toBe('timeout'); + await vi.waitFor(() => expect(activeLocks.size).toBe(0), { timeout: 50 }); + }); +}); diff --git a/packages/web/tests/shared_sync_pagehide.test.ts b/packages/web/tests/shared_sync_pagehide.test.ts new file mode 100644 index 000000000..4690349bf --- /dev/null +++ b/packages/web/tests/shared_sync_pagehide.test.ts @@ -0,0 +1,98 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; +import type { BucketStorageAdapter } from '@powersync/common'; +import { SharedWebStreamingSyncImplementation, WebRemote } from '@powersync/web'; +import type { WebDBAdapter } from '../src/db/adapters/WebDBAdapter'; +import { TestConnector } from './utils/MockStreamOpenFactory'; + +vi.mock('comlink', async () => { + const actual = await vi.importActual('comlink'); + return { + ...actual, + wrap: () => ({ + setLogLevel: vi.fn(), + setParams: vi.fn(() => Promise.resolve()), + addLockBasedCloseSignal: vi.fn(), + triggerCrudUpload: vi.fn(), + connect: vi.fn(), + disconnect: vi.fn(), + updateSubscriptions: vi.fn(), + getWriteCheckpoint: vi.fn(), + hasCompletedSync: vi.fn(), + _testUpdateAllStatuses: vi.fn(), + [actual.releaseProxy]: vi.fn() + }), + expose: vi.fn() + }; +}); + +describe('Shared sync pagehide cleanup', { sequential: true }, () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('releases tab-close-signal lock on pagehide', async () => { + let lockRequestedResolve!: (name: string) => void; + const lockRequested = new Promise((resolve) => { + lockRequestedResolve = resolve; + }); + + const activeLocks = new Set(); + let lockCallbackStarted = false; + let lockCallbackFinished = false; + const mockLocks = { + request: ((...args: any[]) => { + const [name, optionsOrCallback, callback] = args; + const lockCallback = typeof optionsOrCallback === 'function' ? optionsOrCallback : callback; + activeLocks.add(name); + lockRequestedResolve(name); + return Promise.resolve() + .then(() => { + lockCallbackStarted = true; + return lockCallback?.({ name, mode: 'exclusive' }); + }) + .finally(() => { + lockCallbackFinished = true; + activeLocks.delete(name); + }); + }) as LockManager['request'], + query: vi.fn() + } as LockManager; + + vi.spyOn(navigator, 'locks', 'get').mockReturnValue(mockLocks); + + const adapter = {} as unknown as BucketStorageAdapter; + const dbAdapter = { + getConfiguration: () => ({}), + shareConnection: async () => ({ identifier: 'crm.sqlite', port: {} as MessagePort }) + } as unknown as WebDBAdapter; + const remote = new WebRemote(new TestConnector()); + + new SharedWebStreamingSyncImplementation({ + adapter, + remote, + uploadCrud: async () => {}, + identifier: 'crm.sqlite', + crudUploadThrottleMs: 1000, + retryDelayMs: 1000, + db: dbAdapter, + subscriptions: [], + sync: { + worker: () => ({ port: {} as MessagePort } as SharedWorker) + } + }); + + const lockName = await lockRequested; + expect(activeLocks.has(lockName)).toBe(true); + + const pagehide = + typeof PageTransitionEvent === 'function' + ? new PageTransitionEvent('pagehide', { persisted: false }) + : Object.assign(new Event('pagehide'), { persisted: false }); + window.dispatchEvent(pagehide); + + await vi.waitFor(() => expect(lockCallbackStarted).toBe(true)); + await vi.waitFor(() => expect(lockCallbackFinished).toBe(true)); + + expect(activeLocks.size).toBe(0); + }); +});