Skip to content

Commit 1421bb6

Browse files
committed
Add new sync implementation
1 parent cea388b commit 1421bb6

File tree

4 files changed

+61
-9
lines changed

4 files changed

+61
-9
lines changed

packages/powersync_core/lib/src/sync/options.dart

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,43 @@ final class SyncOptions {
2424
/// When set to null, PowerSync defaults to a delay of 5 seconds.
2525
final Duration? retryDelay;
2626

27+
/// The [SyncClientImplementation] to use.
28+
final SyncClientImplementation syncImplementation;
29+
2730
const SyncOptions({
2831
this.crudThrottleTime,
2932
this.retryDelay,
3033
this.params,
34+
this.syncImplementation = SyncClientImplementation.defaultClient,
3135
});
3236
}
3337

38+
/// The PowerSync SDK offers two different implementations for receiving sync
39+
/// lines: One handling most logic in Dart, and a newer one offloading that work
40+
/// to the native PowerSync extension.
41+
enum SyncClientImplementation {
42+
/// A sync implementation that decodes and handles sync lines in Dart.
43+
@Deprecated(
44+
"Don't use SyncClientImplementation.dart directly, "
45+
"use SyncClientImplementation.defaultClient instead.",
46+
)
47+
dart,
48+
49+
/// An experimental sync implementation that parses and handles sync lines in
50+
/// the native PowerSync core extensions.
51+
///
52+
/// This implementation can be more performant than the Dart implementation,
53+
/// and supports receiving sync lines in a more efficient format.
54+
///
55+
/// Note that this option is currently experimental.
56+
@experimental
57+
rust;
58+
59+
/// The default sync client implementation to use.
60+
// ignore: deprecated_member_use_from_same_package
61+
static const defaultClient = dart;
62+
}
63+
3464
@internal
3565
extension type ResolvedSyncOptions(SyncOptions source) {
3666
Duration get crudThrottleTime =>

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import 'dart:typed_data';
44

55
import 'package:http/http.dart' as http;
66
import 'package:logging/logging.dart';
7-
import 'package:logging/logging.dart';
87
import 'package:meta/meta.dart';
98
import 'package:powersync_core/src/abort_controller.dart';
109
import 'package:powersync_core/src/exceptions.dart';
@@ -444,6 +443,7 @@ class StreamingSyncImplementation implements StreamingSync {
444443
case UploadCompleted():
445444
// Only relevant for the Rust sync implementation.
446445
break;
446+
case AbortCurrentIteration():
447447
case TokenRefreshComplete():
448448
// We have a new token, so stop the iteration.
449449
haveInvalidated = true;
@@ -579,6 +579,7 @@ typedef BucketDescription = ({
579579

580580
final class _ActiveRustStreamingIteration {
581581
final StreamingSyncImplementation sync;
582+
var _isActive = true;
582583

583584
StreamSubscription<void>? _completedUploads;
584585
final Completer<void> _completedStream = Completer();
@@ -591,6 +592,7 @@ final class _ActiveRustStreamingIteration {
591592
assert(_completedStream.isCompleted, 'Should have started streaming');
592593
await _completedStream.future;
593594
} finally {
595+
_isActive = true;
594596
_completedUploads?.cancel();
595597
await _stop();
596598
}
@@ -604,7 +606,7 @@ final class _ActiveRustStreamingIteration {
604606
final events = addBroadcast(
605607
_receiveLines(request.request), sync._nonLineSyncEvents.stream);
606608

607-
listen:
609+
loop:
608610
await for (final event in events) {
609611
switch (event) {
610612
case ReceivedLine(line: final Uint8List line):
@@ -613,10 +615,10 @@ final class _ActiveRustStreamingIteration {
613615
await _control('line_text', line);
614616
case UploadCompleted():
615617
await _control('completed_upload');
618+
case AbortCurrentIteration():
619+
break loop;
616620
case TokenRefreshComplete():
617621
await _control('refreshed_token');
618-
case AbortRequested():
619-
break listen;
620622
}
621623
}
622624
}
@@ -647,11 +649,20 @@ final class _ActiveRustStreamingIteration {
647649
_completedStream.complete(_handleLines(instruction));
648650
case UpdateSyncStatus(:final status):
649651
sync._state.updateStatus((m) => m.applyFromCore(status));
650-
case FetchCredentials():
651-
// TODO: Handle this case.
652-
throw UnimplementedError();
652+
case FetchCredentials(:final didExpire):
653+
if (didExpire) {
654+
await sync.connector.prefetchCredentials(invalidate: true);
655+
} else {
656+
sync.connector.prefetchCredentials().then((_) {
657+
if (_isActive && !sync.aborted) {
658+
sync._nonLineSyncEvents.add(const TokenRefreshComplete());
659+
}
660+
}, onError: (Object e, StackTrace s) {
661+
sync.logger.warning('Could not prefetch credentials', e, s);
662+
});
663+
}
653664
case CloseSyncStream():
654-
sync._nonLineSyncEvents.add(AbortRequested());
665+
sync._nonLineSyncEvents.add(const AbortCurrentIteration());
655666
case FlushFileSystem():
656667
await sync.adapter.flushFileSystem();
657668
case DidCompleteSync():
@@ -677,3 +688,7 @@ final class UploadCompleted implements SyncEvent {
677688
final class TokenRefreshComplete implements SyncEvent {
678689
const TokenRefreshComplete();
679690
}
691+
692+
final class AbortCurrentIteration implements SyncEvent {
693+
const AbortCurrentIteration();
694+
}

packages/powersync_core/lib/src/web/sync_worker.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ class _ConnectedClient {
8080
final encodedParams =>
8181
jsonDecode(encodedParams) as Map<String, Object?>,
8282
},
83+
syncImplementation: switch (request.implementationName) {
84+
null => SyncClientImplementation.defaultClient,
85+
final name => SyncClientImplementation.values.byName(name),
86+
},
8387
);
8488

8589
_runner = _worker.referenceSyncTask(

packages/powersync_core/lib/src/web/sync_worker_protocol.dart

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,16 @@ extension type StartSynchronization._(JSObject _) implements JSObject {
6969
required String databaseName,
7070
required int crudThrottleTimeMs,
7171
required int requestId,
72-
required int? retryDelayMs,
72+
required int retryDelayMs,
73+
required String implementationName,
7374
String? syncParamsEncoded,
7475
});
7576

7677
external String get databaseName;
7778
external int get requestId;
7879
external int get crudThrottleTimeMs;
7980
external int? get retryDelayMs;
81+
external String? get implementationName;
8082
external String? get syncParamsEncoded;
8183
}
8284

@@ -417,6 +419,7 @@ final class WorkerCommunicationChannel {
417419
crudThrottleTimeMs: options.crudThrottleTime.inMilliseconds,
418420
retryDelayMs: options.retryDelay.inMilliseconds,
419421
requestId: id,
422+
implementationName: options.source.syncImplementation.name,
420423
syncParamsEncoded: switch (options.source.params) {
421424
null => null,
422425
final params => jsonEncode(params),

0 commit comments

Comments
 (0)