Skip to content

Commit 9656154

Browse files
committed
Remove Rust-specific parts
1 parent e964975 commit 9656154

File tree

8 files changed

+54
-319
lines changed

8 files changed

+54
-319
lines changed

packages/powersync_core/lib/src/sync/instruction.dart

Lines changed: 0 additions & 147 deletions
This file was deleted.

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import 'dart:async';
22

33
import 'package:collection/collection.dart';
4-
import 'package:powersync_core/src/sync/instruction.dart';
54

65
import 'sync_status.dart';
76
import 'bucket_storage.dart';
@@ -80,20 +79,6 @@ final class MutableSyncStatus {
8079
}
8180
}
8281

83-
void applyFromCore(CoreSyncStatus status) {
84-
connected = status.connected;
85-
connecting = status.connecting;
86-
downloading = status.downloading != null;
87-
priorityStatusEntries = status.priorityStatus;
88-
downloadProgress = switch (status.downloading) {
89-
null => null,
90-
final downloading => InternalSyncDownloadProgress(downloading.progress),
91-
};
92-
lastSyncedAt = status.priorityStatus
93-
.firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority)
94-
?.lastSyncedAt;
95-
}
96-
9782
SyncStatus immutableSnapshot() {
9883
return SyncStatus(
9984
connected: connected,

packages/powersync_core/lib/src/sync/options.dart

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,10 @@ final class SyncOptions {
2424
/// When set to null, PowerSync defaults to a delay of 5 seconds.
2525
final Duration? retryDelay;
2626

27-
/// The client implementation to use.
28-
///
29-
/// This allows switching between the existing [SyncClientImplementation.dart]
30-
/// implementation and a newer one ([SyncClientImplementation.rust]).
31-
///
32-
/// Note that not setting this field to the default value is experimental.
33-
final SyncClientImplementation syncImplementation;
34-
3527
const SyncOptions({
3628
this.crudThrottleTime,
3729
this.retryDelay,
3830
this.params,
39-
this.syncImplementation = SyncClientImplementation.dart,
4031
});
4132
}
4233

@@ -54,37 +45,13 @@ extension type ResolvedSyncOptions(SyncOptions source) {
5445
crudThrottleTime: other.crudThrottleTime ?? crudThrottleTime,
5546
retryDelay: other.retryDelay ?? retryDelay,
5647
params: other.params ?? params,
57-
syncImplementation: other.syncImplementation,
5848
);
5949

6050
final didChange = !_mapEquality.equals(other.params, params) ||
6151
other.crudThrottleTime != crudThrottleTime ||
62-
other.retryDelay != retryDelay ||
63-
other.syncImplementation != source.syncImplementation;
52+
other.retryDelay != retryDelay;
6453
return (ResolvedSyncOptions(newOptions), didChange);
6554
}
6655

6756
static const _mapEquality = MapEquality<String, dynamic>();
6857
}
69-
70-
/// Supported sync client implementations.
71-
///
72-
/// Not using the default implementation (currently [dart], but this may change
73-
/// in the future) is experimental.
74-
@experimental
75-
enum SyncClientImplementation {
76-
/// Decode and handle data received from the sync service in Dart.
77-
///
78-
/// This is the default option.
79-
dart,
80-
81-
/// An _experimental_ implementation of the sync client that is written in
82-
/// Rust and shared across the PowerSync SDKs.
83-
///
84-
/// Since this client decodes sync lines in Rust instead of parsing them in
85-
/// Dart, it can be more performant than the the default [dart]
86-
/// implementation. Since this option has not seen as much real-world testing,
87-
/// it is marked as __experimental__ at the moment!
88-
@experimental
89-
rust,
90-
}

packages/powersync_core/lib/src/sync/stream_utils.dart

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import 'dart:convert' as convert;
44

55
/// Inject a broadcast stream into a normal stream.
66
Stream<T> addBroadcast<T>(Stream<T> a, Stream<T> broadcast) {
7+
assert(broadcast.isBroadcast);
78
return mergeStreams([a, broadcast]);
89
}
910

@@ -63,17 +64,27 @@ Stream<T> mergeStreams<T>(List<Stream<T>> streams) {
6364
return controller.stream;
6465
}
6566

66-
/// Given a stream of lines, parse each line as JSON.
67-
Stream<Object?> mapJson(Stream<String> lineInput) {
68-
final jsonInput = lineInput.transform(
69-
StreamTransformer.fromHandlers(handleError: (error, stackTrace, sink) {
70-
/// On Web if the connection is closed, this error will throw, but
71-
/// the stream is never closed. This closes the stream on error.
72-
sink.close();
73-
}, handleData: (String data, EventSink<dynamic> sink) {
74-
sink.add(convert.jsonDecode(data));
75-
}));
76-
return jsonInput;
67+
extension ByteStreamToLines on Stream<List<int>> {
68+
/// Decodes this stream using UTF8 and then splits the text stream by
69+
/// newlines.
70+
Stream<String> get lines {
71+
final textInput = transform(convert.utf8.decoder);
72+
return textInput.transform(const convert.LineSplitter());
73+
}
74+
}
75+
76+
extension StreamToJson on Stream<String> {
77+
Stream<Object?> get parseJson {
78+
final jsonInput = transform(
79+
StreamTransformer.fromHandlers(handleError: (error, stackTrace, sink) {
80+
/// On Web if the connection is closed, this error will throw, but
81+
/// the stream is never closed. This closes the stream on error.
82+
sink.close();
83+
}, handleData: (String data, EventSink<dynamic> sink) {
84+
sink.add(convert.jsonDecode(data));
85+
}));
86+
return jsonInput;
87+
}
7788
}
7889

7990
void pauseAll(List<StreamSubscription<void>> subscriptions) {

0 commit comments

Comments
 (0)