Skip to content

Commit 88923e9

Browse files
committed
Use in-memory sync tests
1 parent 1421bb6 commit 88923e9

File tree

4 files changed

+104
-66
lines changed

4 files changed

+104
-66
lines changed

packages/powersync_core/lib/src/sync/instruction.dart

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,19 @@ final class CoreSyncStatus {
9999
}
100100

101101
final class DownloadProgress {
102-
final Map<String, BucketProgress> progress;
102+
final Map<String, BucketProgress> buckets;
103103

104-
DownloadProgress(this.progress);
104+
DownloadProgress(this.buckets);
105105

106106
factory DownloadProgress.fromJson(Map<String, Object?> line) {
107-
return DownloadProgress(line.map((k, v) =>
108-
MapEntry(k, _bucketProgressFromJson(v as Map<String, Object?>))));
107+
final rawBuckets = line['buckets'] as Map<String, Object?>;
108+
109+
return DownloadProgress(rawBuckets.map((k, v) {
110+
return MapEntry(
111+
k,
112+
_bucketProgressFromJson(v as Map<String, Object?>),
113+
);
114+
}));
109115
}
110116

111117
static BucketProgress _bucketProgressFromJson(Map<String, Object?> json) {

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ final class MutableSyncStatus {
8787
priorityStatusEntries = status.priorityStatus;
8888
downloadProgress = switch (status.downloading) {
8989
null => null,
90-
final downloading => InternalSyncDownloadProgress(downloading.progress),
90+
final downloading => InternalSyncDownloadProgress(downloading.buckets),
9191
};
9292
lastSyncedAt = status.priorityStatus
9393
.firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority)

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ class StreamingSyncImplementation implements StreamingSync {
139139
// Protect sync iterations with exclusivity (if a valid Mutex is provided)
140140
await syncMutex.lock(() {
141141
switch (options.source.syncImplementation) {
142+
// ignore: deprecated_member_use_from_same_package
142143
case SyncClientImplementation.dart:
143144
return _dartStreamingSyncIteration();
144145
case SyncClientImplementation.rust:
@@ -568,7 +569,7 @@ String _syncErrorMessage(Object? error) {
568569
} else if (error is PowerSyncProtocolException) {
569570
return 'Protocol error';
570571
} else {
571-
return '${error.runtimeType}';
572+
return '${error.runtimeType}: $error';
572573
}
573574
}
574575

@@ -592,7 +593,7 @@ final class _ActiveRustStreamingIteration {
592593
assert(_completedStream.isCompleted, 'Should have started streaming');
593594
await _completedStream.future;
594595
} finally {
595-
_isActive = true;
596+
_isActive = false;
596597
_completedUploads?.cancel();
597598
await _stop();
598599
}
@@ -608,6 +609,10 @@ final class _ActiveRustStreamingIteration {
608609

609610
loop:
610611
await for (final event in events) {
612+
if (!_isActive || sync.aborted) {
613+
break;
614+
}
615+
611616
switch (event) {
612617
case ReceivedLine(line: final Uint8List line):
613618
await _control('line_binary', line);
@@ -623,7 +628,9 @@ final class _ActiveRustStreamingIteration {
623628
}
624629
}
625630

626-
Future<void> _stop() => _control('stop');
631+
Future<void> _stop() {
632+
return _control('stop');
633+
}
627634

628635
Future<void> _control(String operation, [Object? payload]) async {
629636
final rawResponse = await sync.adapter.control(operation, payload);
@@ -662,7 +669,10 @@ final class _ActiveRustStreamingIteration {
662669
});
663670
}
664671
case CloseSyncStream():
665-
sync._nonLineSyncEvents.add(const AbortCurrentIteration());
672+
if (!sync.aborted) {
673+
_isActive = false;
674+
sync._nonLineSyncEvents.add(const AbortCurrentIteration());
675+
}
666676
case FlushFileSystem():
667677
await sync.adapter.flushFileSystem();
668678
case DidCompleteSync():

packages/powersync_core/test/in_memory_sync_test.dart

Lines changed: 79 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,26 @@ import 'utils/in_memory_http.dart';
1616
import 'utils/test_utils_impl.dart';
1717

1818
void main() {
19+
_declareTests(
20+
'dart sync client',
21+
SyncOptions(
22+
// ignore: deprecated_member_use_from_same_package
23+
syncImplementation: SyncClientImplementation.dart,
24+
),
25+
);
26+
27+
_declareTests(
28+
'rust sync client',
29+
SyncOptions(
30+
syncImplementation: SyncClientImplementation.rust,
31+
),
32+
);
33+
}
34+
35+
void _declareTests(String name, SyncOptions options) {
1936
final ignoredLogger = Logger.detached('powersync.test')..level = Level.OFF;
2037

21-
group('in-memory sync tests', () {
38+
group(name, () {
2239
late final testUtils = TestUtils();
2340

2441
late TestPowerSyncFactory factory;
@@ -44,6 +61,7 @@ void main() {
4461
expiresAt: DateTime.now(),
4562
);
4663
}, uploadData: (db) => uploadData(db)),
64+
options: options,
4765
);
4866
}
4967

@@ -107,6 +125,7 @@ void main() {
107125
});
108126
await expectLater(
109127
status, emits(isSyncStatus(downloading: false, hasSynced: true)));
128+
await syncClient.abort();
110129

111130
final independentDb = factory.wrapRaw(raw, logger: ignoredLogger);
112131
addTearDown(independentDb.close);
@@ -122,65 +141,68 @@ void main() {
122141
isTrue);
123142
});
124143

125-
test('can save independent buckets in same transaction', () async {
126-
final status = await waitForConnection();
127-
128-
syncService.addLine({
129-
'checkpoint': Checkpoint(
130-
lastOpId: '0',
131-
writeCheckpoint: null,
132-
checksums: [
133-
BucketChecksum(bucket: 'a', checksum: 0, priority: 3),
134-
BucketChecksum(bucket: 'b', checksum: 0, priority: 3),
135-
],
136-
)
137-
});
138-
await expectLater(status, emits(isSyncStatus(downloading: true)));
139-
140-
var commits = 0;
141-
raw.commits.listen((_) => commits++);
144+
// ignore: deprecated_member_use_from_same_package
145+
if (options.syncImplementation == SyncClientImplementation.dart) {
146+
test('can save independent buckets in same transaction', () async {
147+
final status = await waitForConnection();
142148

143-
syncService
144-
..addLine({
145-
'data': {
146-
'bucket': 'a',
147-
'data': <Map<String, Object?>>[
148-
{
149-
'op_id': '1',
150-
'op': 'PUT',
151-
'object_type': 'a',
152-
'object_id': '1',
153-
'checksum': 0,
154-
'data': {},
155-
}
156-
],
157-
}
158-
})
159-
..addLine({
160-
'data': {
161-
'bucket': 'b',
162-
'data': <Map<String, Object?>>[
163-
{
164-
'op_id': '2',
165-
'op': 'PUT',
166-
'object_type': 'b',
167-
'object_id': '1',
168-
'checksum': 0,
169-
'data': {},
170-
}
149+
syncService.addLine({
150+
'checkpoint': Checkpoint(
151+
lastOpId: '0',
152+
writeCheckpoint: null,
153+
checksums: [
154+
BucketChecksum(bucket: 'a', checksum: 0, priority: 3),
155+
BucketChecksum(bucket: 'b', checksum: 0, priority: 3),
171156
],
172-
}
157+
)
173158
});
159+
await expectLater(status, emits(isSyncStatus(downloading: true)));
174160

175-
// Wait for the operations to be inserted.
176-
while (raw.select('SELECT * FROM ps_oplog;').length < 2) {
177-
await pumpEventQueue();
178-
}
161+
var commits = 0;
162+
raw.commits.listen((_) => commits++);
179163

180-
// The two buckets should have been inserted in a single transaction
181-
// because the messages were received in quick succession.
182-
expect(commits, 1);
183-
});
164+
syncService
165+
..addLine({
166+
'data': {
167+
'bucket': 'a',
168+
'data': <Map<String, Object?>>[
169+
{
170+
'op_id': '1',
171+
'op': 'PUT',
172+
'object_type': 'a',
173+
'object_id': '1',
174+
'checksum': 0,
175+
'data': {},
176+
}
177+
],
178+
}
179+
})
180+
..addLine({
181+
'data': {
182+
'bucket': 'b',
183+
'data': <Map<String, Object?>>[
184+
{
185+
'op_id': '2',
186+
'op': 'PUT',
187+
'object_type': 'b',
188+
'object_id': '1',
189+
'checksum': 0,
190+
'data': {},
191+
}
192+
],
193+
}
194+
});
195+
196+
// Wait for the operations to be inserted.
197+
while (raw.select('SELECT * FROM ps_oplog;').length < 2) {
198+
await pumpEventQueue();
199+
}
200+
201+
// The two buckets should have been inserted in a single transaction
202+
// because the messages were received in quick succession.
203+
expect(commits, 1);
204+
});
205+
}
184206

185207
group('partial sync', () {
186208
test('updates sync state incrementally', () async {
@@ -281,6 +303,7 @@ void main() {
281303
});
282304
await database.waitForFirstSync(priority: BucketPriority(1));
283305
expect(database.currentStatus.hasSynced, isFalse);
306+
await syncClient.abort();
284307

285308
final independentDb = factory.wrapRaw(raw, logger: ignoredLogger);
286309
addTearDown(independentDb.close);
@@ -485,7 +508,7 @@ void main() {
485508
}) async {
486509
await expectLater(
487510
status,
488-
emits(isSyncStatus(
511+
emitsThrough(isSyncStatus(
489512
downloading: true,
490513
downloadProgress: isSyncDownloadProgress(
491514
progress: total,
@@ -644,7 +667,6 @@ void main() {
644667
await checkProgress(progress(8, 8), progress(10, 14));
645668

646669
addCheckpointComplete(0);
647-
await checkProgress(progress(8, 8), progress(10, 14));
648670

649671
addDataLine('b', 4);
650672
await checkProgress(progress(8, 8), progress(14, 14));

0 commit comments

Comments
 (0)