Skip to content

Commit 52a7733

Browse files
authored
Merge pull request #586 from powersync-ja/fix-missing-names-filter
Fix missing filter when setting `last_op`
2 parents 5bd9e84 + e9b90bb commit 52a7733

File tree

4 files changed

+66
-19
lines changed

4 files changed

+66
-19
lines changed

.changeset/polite-news-sneeze.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Fix applying bucket state around partial syncs.

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,9 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
154154
return { ready: false, checkpointValid: false, checkpointFailures: r.checkpointFailures };
155155
}
156156

157-
const buckets = checkpoint.buckets;
157+
let buckets = checkpoint.buckets;
158158
if (priority !== undefined) {
159-
buckets.filter((b) => hasMatchingPriority(priority, b));
159+
buckets = buckets.filter((b) => hasMatchingPriority(priority, b));
160160
}
161161
const bucketNames = buckets.map((b) => b.bucket);
162162
await this.writeTransaction(async (tx) => {

packages/node/tests/sync.test.ts

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ describe('Sync', () => {
6161
mockSyncServiceTest('without priorities', async ({ syncService }) => {
6262
const database = await syncService.createDatabase();
6363
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
64-
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
64+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
6565

6666
syncService.pushLine({
6767
checkpoint: {
@@ -96,7 +96,7 @@ describe('Sync', () => {
9696
mockSyncServiceTest('interrupted sync', async ({ syncService }) => {
9797
let database = await syncService.createDatabase();
9898
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
99-
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
99+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
100100

101101
syncService.pushLine({
102102
checkpoint: {
@@ -111,12 +111,12 @@ describe('Sync', () => {
111111

112112
// Close this database before sending the checkpoint...
113113
await database.close();
114-
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(0));
114+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
115115

116116
// And open a new one
117117
database = await syncService.createDatabase();
118118
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
119-
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
119+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
120120

121121
// Send same checkpoint again
122122
syncService.pushLine({
@@ -135,7 +135,7 @@ describe('Sync', () => {
135135
mockSyncServiceTest('interrupted sync with new checkpoint', async ({ syncService }) => {
136136
let database = await syncService.createDatabase();
137137
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
138-
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
138+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
139139

140140
syncService.pushLine({
141141
checkpoint: {
@@ -150,10 +150,10 @@ describe('Sync', () => {
150150

151151
// Re-open database
152152
await database.close();
153-
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(0));
153+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
154154
database = await syncService.createDatabase();
155155
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
156-
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
156+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
157157

158158
// Send checkpoint with new data
159159
syncService.pushLine({
@@ -171,7 +171,7 @@ describe('Sync', () => {
171171
mockSyncServiceTest('different priorities', async ({ syncService }) => {
172172
let database = await syncService.createDatabase();
173173
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
174-
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
174+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
175175

176176
syncService.pushLine({
177177
checkpoint: {
@@ -219,6 +219,39 @@ describe('Sync', () => {
219219
pushCheckpointComplete(syncService);
220220
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
221221
});
222+
223+
mockSyncServiceTest('uses correct state when reconnecting', async ({syncService}) => {
224+
let database = await syncService.createDatabase();
225+
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
226+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
227+
228+
syncService.pushLine({
229+
checkpoint: {
230+
last_op_id: '10',
231+
buckets: [
232+
bucket('a', 5, {priority: 0}),
233+
bucket('b', 5, {priority: 3}),
234+
]
235+
}
236+
});
237+
238+
// Sync priority 0 completely, start with rest
239+
pushDataLine(syncService, 'a', 5);
240+
pushDataLine(syncService, 'b', 1);
241+
pushCheckpointComplete(syncService, 0);
242+
await database.waitForFirstSync({priority: 0});
243+
244+
await database.close();
245+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
246+
database = await syncService.createDatabase();
247+
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
248+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
249+
250+
expect(syncService.connectedListeners[0].buckets).toStrictEqual([
251+
{"name": "a", "after": "10"},
252+
{"name": "b", "after": "6"},
253+
]);
254+
});
222255
});
223256
});
224257

packages/node/tests/utils.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import path from 'node:path';
44
import { onTestFinished, test } from 'vitest';
55
import {
66
AbstractPowerSyncDatabase,
7-
AbstractRemoteOptions,
87
column,
98
NodePowerSyncDatabaseOptions,
109
PowerSyncBackendConnector,
@@ -76,20 +75,30 @@ export const databaseTest = tempDirectoryTest.extend<{ database: PowerSyncDataba
7675
// TODO: Unify this with the test setup for the web SDK.
7776
export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockSyncService }>({
7877
syncService: async ({ tmpdir }, use) => {
79-
const listeners: ReadableStreamDefaultController<StreamingSyncLine>[] = [];
78+
interface Listener {
79+
request: any,
80+
stream: ReadableStreamDefaultController<StreamingSyncLine>,
81+
}
82+
83+
const listeners: Listener[] = [];
8084

8185
const inMemoryFetch: typeof fetch = async (info, init?) => {
8286
const request = new Request(info, init);
8387
if (request.url.endsWith('/sync/stream')) {
84-
let thisController: ReadableStreamDefaultController<StreamingSyncLine> | null = null;
88+
const body = await request.json();
89+
let listener: Listener | null = null;
8590

8691
const syncLines = new ReadableStream<StreamingSyncLine>({
8792
start(controller) {
88-
thisController = controller;
89-
listeners.push(controller);
93+
listener = {
94+
request: body,
95+
stream: controller,
96+
};
97+
98+
listeners.push(listener);
9099
},
91100
cancel() {
92-
listeners.splice(listeners.indexOf(thisController!), 1);
101+
listeners.splice(listeners.indexOf(listener!), 1);
93102
}
94103
});
95104

@@ -120,11 +129,11 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockS
120129

121130
await use({
122131
get connectedListeners() {
123-
return listeners.length;
132+
return listeners.map((e) => e.request);
124133
},
125134
pushLine(line) {
126135
for (const listener of listeners) {
127-
listener.enqueue(line);
136+
listener.stream.enqueue(line);
128137
}
129138
},
130139
createDatabase: newConnection
@@ -134,7 +143,7 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockS
134143

135144
export interface MockSyncService {
136145
pushLine: (line: StreamingSyncLine) => void;
137-
connectedListeners: number;
146+
connectedListeners: any[];
138147
createDatabase: () => Promise<PowerSyncDatabase>;
139148
}
140149

0 commit comments

Comments
 (0)