Skip to content

Commit cea388b

Browse files
committed
Start integrating Rust extension
1 parent 7f2780a commit cea388b

File tree

6 files changed

+286
-3
lines changed

6 files changed

+286
-3
lines changed

packages/powersync_core/lib/src/sync/bucket_storage.dart

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,22 @@ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
365365
});
366366
}
367367

368+
Future<String> control(String op, [Object? payload]) async {
369+
return await writeTransaction(
370+
(tx) async {
371+
final [row] =
372+
await tx.execute('SELECT powersync_control(?, ?)', [op, payload]);
373+
return row.columnAt(0) as String;
374+
},
375+
// We flush when powersync_control yields an instruction to do so.
376+
flush: false,
377+
);
378+
}
379+
380+
Future<void> flushFileSystem() async {
381+
// Noop outside of web.
382+
}
383+
368384
/// Note: The asynchronous nature of this is due to this needing a global
369385
/// lock. The actual database operations are still synchronous, and it
370386
/// is assumed that multiple functions on this instance won't be called
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import 'sync_status.dart';
2+
3+
/// An internal instruction emitted by the sync client in the core extension in
4+
/// response to the Dart SDK passing sync data into the extension.
5+
sealed class Instruction {
6+
factory Instruction.fromJson(Map<String, Object?> json) {
7+
return switch (json) {
8+
{'LogLine': final logLine} =>
9+
LogLine.fromJson(logLine as Map<String, Object?>),
10+
{'UpdateSyncStatus': final updateStatus} =>
11+
UpdateSyncStatus.fromJson(updateStatus as Map<String, Object?>),
12+
{'EstablishSyncStream': final establish} =>
13+
EstablishSyncStream.fromJson(establish as Map<String, Object?>),
14+
{'FetchCredentials': final creds} =>
15+
FetchCredentials.fromJson(creds as Map<String, Object?>),
16+
{'CloseSyncStream': _} => const CloseSyncStream(),
17+
{'FlushFileSystem': _} => const FlushFileSystem(),
18+
{'DidCompleteSync': _} => const DidCompleteSync(),
19+
_ => UnknownSyncInstruction(json)
20+
};
21+
}
22+
}
23+
24+
final class LogLine implements Instruction {
25+
final String severity;
26+
final String line;
27+
28+
LogLine({required this.severity, required this.line});
29+
30+
factory LogLine.fromJson(Map<String, Object?> json) {
31+
return LogLine(
32+
severity: json['severity'] as String,
33+
line: json['line'] as String,
34+
);
35+
}
36+
}
37+
38+
final class EstablishSyncStream implements Instruction {
39+
final Map<String, Object?> request;
40+
41+
EstablishSyncStream(this.request);
42+
43+
factory EstablishSyncStream.fromJson(Map<String, Object?> json) {
44+
return EstablishSyncStream(json['request'] as Map<String, Object?>);
45+
}
46+
}
47+
48+
final class UpdateSyncStatus implements Instruction {
49+
final CoreSyncStatus status;
50+
51+
UpdateSyncStatus({required this.status});
52+
53+
factory UpdateSyncStatus.fromJson(Map<String, Object?> json) {
54+
return UpdateSyncStatus(
55+
status:
56+
CoreSyncStatus.fromJson(json['status'] as Map<String, Object?>));
57+
}
58+
}
59+
60+
final class CoreSyncStatus {
61+
final bool connected;
62+
final bool connecting;
63+
final List<SyncPriorityStatus> priorityStatus;
64+
final DownloadProgress? downloading;
65+
66+
CoreSyncStatus({
67+
required this.connected,
68+
required this.connecting,
69+
required this.priorityStatus,
70+
required this.downloading,
71+
});
72+
73+
factory CoreSyncStatus.fromJson(Map<String, Object?> json) {
74+
return CoreSyncStatus(
75+
connected: json['connected'] as bool,
76+
connecting: json['connecting'] as bool,
77+
priorityStatus: [
78+
for (final entry in json['priority_status'] as List)
79+
_priorityStatusFromJson(entry as Map<String, Object?>)
80+
],
81+
downloading: switch (json['downloading']) {
82+
null => null,
83+
final raw as Map<String, Object?> => DownloadProgress.fromJson(raw),
84+
},
85+
);
86+
}
87+
88+
static SyncPriorityStatus _priorityStatusFromJson(Map<String, Object?> json) {
89+
return (
90+
priority: BucketPriority(json['priority'] as int),
91+
hasSynced: json['has_synced'] as bool?,
92+
lastSyncedAt: switch (json['last_synced_at']) {
93+
null => null,
94+
final lastSyncedAt as int =>
95+
DateTime.fromMillisecondsSinceEpoch(lastSyncedAt * 1000),
96+
},
97+
);
98+
}
99+
}
100+
101+
final class DownloadProgress {
102+
final Map<String, BucketProgress> progress;
103+
104+
DownloadProgress(this.progress);
105+
106+
factory DownloadProgress.fromJson(Map<String, Object?> line) {
107+
return DownloadProgress(line.map((k, v) =>
108+
MapEntry(k, _bucketProgressFromJson(v as Map<String, Object?>))));
109+
}
110+
111+
static BucketProgress _bucketProgressFromJson(Map<String, Object?> json) {
112+
return (
113+
priority: BucketPriority(json['priority'] as int),
114+
atLast: json['at_last'] as int,
115+
sinceLast: json['since_last'] as int,
116+
targetCount: json['target_count'] as int,
117+
);
118+
}
119+
}
120+
121+
final class FetchCredentials implements Instruction {
122+
final bool didExpire;
123+
124+
FetchCredentials(this.didExpire);
125+
126+
factory FetchCredentials.fromJson(Map<String, Object?> line) {
127+
return FetchCredentials(line['did_expire'] as bool);
128+
}
129+
}
130+
131+
final class CloseSyncStream implements Instruction {
132+
const CloseSyncStream();
133+
}
134+
135+
final class FlushFileSystem implements Instruction {
136+
const FlushFileSystem();
137+
}
138+
139+
final class DidCompleteSync implements Instruction {
140+
const DidCompleteSync();
141+
}
142+
143+
final class UnknownSyncInstruction implements Instruction {
144+
final Map<String, Object?> source;
145+
146+
UnknownSyncInstruction(this.source);
147+
}

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import 'dart:async';
22

33
import 'package:collection/collection.dart';
4+
import 'package:powersync_core/src/sync/instruction.dart';
45

56
import 'sync_status.dart';
67
import 'bucket_storage.dart';
@@ -79,6 +80,20 @@ final class MutableSyncStatus {
7980
}
8081
}
8182

83+
void applyFromCore(CoreSyncStatus status) {
84+
connected = status.connected;
85+
connecting = status.connecting;
86+
downloading = status.downloading != null;
87+
priorityStatusEntries = status.priorityStatus;
88+
downloadProgress = switch (status.downloading) {
89+
null => null,
90+
final downloading => InternalSyncDownloadProgress(downloading.progress),
91+
};
92+
lastSyncedAt = status.priorityStatus
93+
.firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority)
94+
?.lastSyncedAt;
95+
}
96+
8297
SyncStatus immutableSnapshot() {
8398
return SyncStatus(
8499
connected: connected,

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import 'dart:async';
22
import 'dart:convert' as convert;
3+
import 'dart:typed_data';
34

45
import 'package:http/http.dart' as http;
56
import 'package:logging/logging.dart';
7+
import 'package:logging/logging.dart';
68
import 'package:meta/meta.dart';
79
import 'package:powersync_core/src/abort_controller.dart';
810
import 'package:powersync_core/src/exceptions.dart';
@@ -13,7 +15,7 @@ import 'package:sqlite_async/mutex.dart';
1315

1416
import 'bucket_storage.dart';
1517
import '../crud.dart';
16-
18+
import 'instruction.dart';
1719
import 'internal_connector.dart';
1820
import 'mutable_sync_status.dart';
1921
import 'stream_utils.dart';
@@ -137,7 +139,12 @@ class StreamingSyncImplementation implements StreamingSync {
137139
}
138140
// Protect sync iterations with exclusivity (if a valid Mutex is provided)
139141
await syncMutex.lock(() {
140-
return _streamingSyncIteration();
142+
switch (options.source.syncImplementation) {
143+
case SyncClientImplementation.dart:
144+
return _dartStreamingSyncIteration();
145+
case SyncClientImplementation.rust:
146+
return _rustStreamingSyncIteration();
147+
}
141148
}, timeout: _retryDelay);
142149
} catch (e, stacktrace) {
143150
if (aborted && e is http.ClientException) {
@@ -238,6 +245,7 @@ class StreamingSyncImplementation implements StreamingSync {
238245
}
239246

240247
assert(identical(_activeCrudUpload, completer));
248+
_nonLineSyncEvents.add(const UploadCompleted());
241249
_activeCrudUpload = null;
242250
completer.complete();
243251
});
@@ -281,6 +289,10 @@ class StreamingSyncImplementation implements StreamingSync {
281289
});
282290
}
283291

292+
Future<void> _rustStreamingSyncIteration() async {
293+
await _ActiveRustStreamingIteration(this).syncIteration();
294+
}
295+
284296
Future<(List<BucketRequest>, Map<String, BucketDescription?>)>
285297
_collectLocalBucketState() async {
286298
final bucketEntries = await adapter.getBucketStates();
@@ -295,7 +307,7 @@ class StreamingSyncImplementation implements StreamingSync {
295307
return (initialRequests, localDescriptions);
296308
}
297309

298-
Future<void> _streamingSyncIteration() async {
310+
Future<void> _dartStreamingSyncIteration() async {
299311
var (bucketRequests, bucketMap) = await _collectLocalBucketState();
300312
if (aborted) {
301313
return;
@@ -565,6 +577,91 @@ typedef BucketDescription = ({
565577
int priority,
566578
});
567579

580+
final class _ActiveRustStreamingIteration {
581+
final StreamingSyncImplementation sync;
582+
583+
StreamSubscription<void>? _completedUploads;
584+
final Completer<void> _completedStream = Completer();
585+
586+
_ActiveRustStreamingIteration(this.sync);
587+
588+
Future<void> syncIteration() async {
589+
try {
590+
await _control('start', convert.json.encode(sync.options.params));
591+
assert(_completedStream.isCompleted, 'Should have started streaming');
592+
await _completedStream.future;
593+
} finally {
594+
_completedUploads?.cancel();
595+
await _stop();
596+
}
597+
}
598+
599+
Stream<ReceivedLine> _receiveLines(Object? data) {
600+
return sync._rawStreamingSyncRequest(data).map(ReceivedLine.new);
601+
}
602+
603+
Future<void> _handleLines(EstablishSyncStream request) async {
604+
final events = addBroadcast(
605+
_receiveLines(request.request), sync._nonLineSyncEvents.stream);
606+
607+
listen:
608+
await for (final event in events) {
609+
switch (event) {
610+
case ReceivedLine(line: final Uint8List line):
611+
await _control('line_binary', line);
612+
case ReceivedLine(line: final line as String):
613+
await _control('line_text', line);
614+
case UploadCompleted():
615+
await _control('completed_upload');
616+
case TokenRefreshComplete():
617+
await _control('refreshed_token');
618+
case AbortRequested():
619+
break listen;
620+
}
621+
}
622+
}
623+
624+
Future<void> _stop() => _control('stop');
625+
626+
Future<void> _control(String operation, [Object? payload]) async {
627+
final rawResponse = await sync.adapter.control(operation, payload);
628+
final instructions = convert.json.decode(rawResponse) as List;
629+
630+
for (final instruction in instructions) {
631+
await _handleInstruction(
632+
Instruction.fromJson(instruction as Map<String, Object?>));
633+
}
634+
}
635+
636+
Future<void> _handleInstruction(Instruction instruction) async {
637+
switch (instruction) {
638+
case LogLine(:final severity, :final line):
639+
sync.logger.log(
640+
switch (severity) {
641+
'DEBUG' => Level.FINE,
642+
'INFO' => Level.INFO,
643+
_ => Level.WARNING,
644+
},
645+
line);
646+
case EstablishSyncStream():
647+
_completedStream.complete(_handleLines(instruction));
648+
case UpdateSyncStatus(:final status):
649+
sync._state.updateStatus((m) => m.applyFromCore(status));
650+
case FetchCredentials():
651+
// TODO: Handle this case.
652+
throw UnimplementedError();
653+
case CloseSyncStream():
654+
sync._nonLineSyncEvents.add(AbortRequested());
655+
case FlushFileSystem():
656+
await sync.adapter.flushFileSystem();
657+
case DidCompleteSync():
658+
sync._state.updateStatus((m) => m.downloadError = null);
659+
case UnknownSyncInstruction(:final source):
660+
sync.logger.warning('Unknown instruction: $source');
661+
}
662+
}
663+
}
664+
568665
sealed class SyncEvent {}
569666

570667
final class ReceivedLine implements SyncEvent {

packages/powersync_core/lib/src/sync/sync_status.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ extension type const BucketPriority._(int priorityNumber) {
189189
/// A [Comparator] instance suitable for comparing [BucketPriority] values.
190190
static int comparator(BucketPriority a, BucketPriority b) =>
191191
-a.priorityNumber.compareTo(b.priorityNumber);
192+
193+
/// The priority used by PowerSync to indicate that a full sync was completed.
194+
static const fullSyncPriority = BucketPriority._(2147483647);
192195
}
193196

194197
/// Partial information about the synchronization status for buckets within a

packages/powersync_core/lib/src/web/web_bucket_storage.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,9 @@ class WebBucketStorage extends BucketStorage {
1717
return _webDb.writeTransaction(callback,
1818
lockTimeout: lockTimeout, flush: flush);
1919
}
20+
21+
@override
22+
Future<void> flushFileSystem() {
23+
return _webDb.flush();
24+
}
2025
}

0 commit comments

Comments
 (0)