Skip to content

Commit 7f2780a

Browse files
authored
Merge pull request #281 from powersync-ja/sync-client-refactor
Refactor sync client and options
2 parents f5de535 + 1749b3c commit 7f2780a

22 files changed

+562
-314
lines changed

demos/benchmarks/lib/powersync.dart

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,15 @@ Future<String> getDatabasePath() async {
8888

8989
var currentConnector = BackendConnector();
9090

91+
const options = SyncOptions(
92+
params: {'size_bucket': AppConfig.sizeBucket},
93+
crudThrottleTime: Duration(milliseconds: 1),
94+
);
95+
9196
Future<void> resync() async {
9297
await db.disconnectAndClear();
9398
timer.start(db);
94-
db.connect(
95-
connector: currentConnector,
96-
params: {'size_bucket': AppConfig.sizeBucket},
97-
crudThrottleTime: const Duration(milliseconds: 1));
99+
db.connect(connector: currentConnector, options: options);
98100
}
99101

100102
Future<void> openDatabase() async {
@@ -106,8 +108,5 @@ Future<void> openDatabase() async {
106108
BenchmarkItem.updateItemBenchmarks();
107109

108110
timer.start(db);
109-
db.connect(
110-
connector: currentConnector,
111-
params: {'size_bucket': AppConfig.sizeBucket},
112-
crudThrottleTime: const Duration(milliseconds: 1));
111+
db.connect(connector: currentConnector, options: options);
113112
}

packages/powersync_core/lib/powersync_core.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export 'src/exceptions.dart';
1010
export 'src/log.dart';
1111
export 'src/open_factory.dart';
1212
export 'src/schema.dart';
13+
export 'src/sync/options.dart' hide ResolvedSyncOptions;
1314
export 'src/sync/sync_status.dart'
1415
hide BucketProgress, InternalSyncDownloadProgress;
1516
export 'src/uuid.dart';

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import 'package:powersync_core/src/log_internal.dart';
1515
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
1616
import 'package:powersync_core/src/open_factory/native/native_open_factory.dart';
1717
import 'package:powersync_core/src/schema.dart';
18+
import 'package:powersync_core/src/sync/internal_connector.dart';
19+
import 'package:powersync_core/src/sync/options.dart';
1820
import 'package:powersync_core/src/sync/streaming_sync.dart';
1921
import 'package:powersync_core/src/sync/sync_status.dart';
2022
import 'package:sqlite_async/sqlite3_common.dart';
@@ -118,10 +120,9 @@ class PowerSyncDatabaseImpl
118120
@internal
119121
Future<void> connectInternal({
120122
required PowerSyncBackendConnector connector,
121-
required Duration crudThrottleTime,
123+
required SyncOptions options,
122124
required AbortController abort,
123125
required Zone asyncWorkZone,
124-
Map<String, dynamic>? params,
125126
}) async {
126127
final dbRef = database.isolateConnectionFactory();
127128

@@ -134,6 +135,7 @@ class PowerSyncDatabaseImpl
134135
SendPort? initPort;
135136
final hasInitPort = Completer<void>();
136137
final receivedIsolateExit = Completer<void>();
138+
final resolved = ResolvedSyncOptions(options);
137139

138140
Future<void> waitForShutdown() async {
139141
// Only complete the abortion signal after the isolate shuts down. This
@@ -161,22 +163,27 @@ class PowerSyncDatabaseImpl
161163
Future<void> handleMessage(Object? data) async {
162164
if (data is List) {
163165
String action = data[0] as String;
164-
if (action == "getCredentials") {
166+
if (action == "getCredentialsCached") {
165167
await (data[1] as PortCompleter).handle(() async {
166168
final token = await connector.getCredentialsCached();
167169
logger.fine('Credentials: $token');
168170
return token;
169171
});
170-
} else if (action == "invalidateCredentials") {
172+
} else if (action == "prefetchCredentials") {
171173
logger.fine('Refreshing credentials');
174+
final invalidate = data[2] as bool;
175+
172176
await (data[1] as PortCompleter).handle(() async {
173-
await connector.prefetchCredentials();
177+
if (invalidate) {
178+
connector.invalidateCredentials();
179+
}
180+
return await connector.prefetchCredentials();
174181
});
175182
} else if (action == 'init') {
176183
final port = initPort = data[1] as SendPort;
177184
hasInitPort.complete();
178-
var crudStream =
179-
database.onChange(['ps_crud'], throttle: crudThrottleTime);
185+
var crudStream = database
186+
.onChange(['ps_crud'], throttle: resolved.crudThrottleTime);
180187
crudUpdateSubscription = crudStream.listen((event) {
181188
port.send(['update']);
182189
});
@@ -238,8 +245,7 @@ class PowerSyncDatabaseImpl
238245
_PowerSyncDatabaseIsolateArgs(
239246
receiveMessages.sendPort,
240247
dbRef,
241-
retryDelay,
242-
clientParams,
248+
resolved,
243249
crudMutex.shared,
244250
syncMutex.shared,
245251
),
@@ -282,16 +288,14 @@ class PowerSyncDatabaseImpl
282288
class _PowerSyncDatabaseIsolateArgs {
283289
final SendPort sPort;
284290
final IsolateConnectionFactory dbRef;
285-
final Duration retryDelay;
286-
final Map<String, dynamic>? parameters;
291+
final ResolvedSyncOptions options;
287292
final SerializedMutex crudMutex;
288293
final SerializedMutex syncMutex;
289294

290295
_PowerSyncDatabaseIsolateArgs(
291296
this.sPort,
292297
this.dbRef,
293-
this.retryDelay,
294-
this.parameters,
298+
this.options,
295299
this.crudMutex,
296300
this.syncMutex,
297301
);
@@ -362,15 +366,16 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
362366
sPort.send(['log', copy]);
363367
});
364368

365-
Future<PowerSyncCredentials?> loadCredentials() async {
369+
Future<PowerSyncCredentials?> getCredentialsCached() async {
366370
final r = IsolateResult<PowerSyncCredentials?>();
367-
sPort.send(['getCredentials', r.completer]);
371+
sPort.send(['getCredentialsCached', r.completer]);
368372
return r.future;
369373
}
370374

371-
Future<void> invalidateCredentials() async {
372-
final r = IsolateResult<void>();
373-
sPort.send(['invalidateCredentials', r.completer]);
375+
Future<PowerSyncCredentials?> prefetchCredentials(
376+
{required bool invalidate}) async {
377+
final r = IsolateResult<PowerSyncCredentials?>();
378+
sPort.send(['prefetchCredentials', r.completer, invalidate]);
374379
return r.future;
375380
}
376381

@@ -388,13 +393,14 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
388393
final storage = BucketStorage(connection);
389394
final sync = StreamingSyncImplementation(
390395
adapter: storage,
391-
credentialsCallback: loadCredentials,
392-
invalidCredentialsCallback: invalidateCredentials,
393-
uploadCrud: uploadCrud,
396+
connector: InternalConnector(
397+
getCredentialsCached: getCredentialsCached,
398+
prefetchCredentials: prefetchCredentials,
399+
uploadCrud: uploadCrud,
400+
),
394401
crudUpdateTriggerStream: crudUpdateController.stream,
395-
retryDelay: args.retryDelay,
402+
options: args.options,
396403
client: http.Client(),
397-
syncParameters: args.parameters,
398404
crudMutex: crudMutex,
399405
syncMutex: syncMutex,
400406
);
@@ -429,6 +435,6 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
429435
// This should be rare - any uncaught error is a bug. And in most cases,
430436
// it should occur after the database is already open.
431437
await shutdown();
432-
throw error;
438+
Error.throwWithStackTrace(error, stack);
433439
});
434440
}

packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import 'package:powersync_core/sqlite_async.dart';
66
import 'package:powersync_core/src/abort_controller.dart';
77
import 'package:powersync_core/src/database/powersync_db_mixin.dart';
88
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
9+
import '../sync/options.dart';
910
import 'powersync_database.dart';
1011

1112
import '../connector.dart';
@@ -110,12 +111,12 @@ class PowerSyncDatabaseImpl
110111

111112
@override
112113
@internal
113-
Future<void> connectInternal(
114-
{required PowerSyncBackendConnector connector,
115-
required Duration crudThrottleTime,
116-
required AbortController abort,
117-
required Zone asyncWorkZone,
118-
Map<String, dynamic>? params}) {
114+
Future<void> connectInternal({
115+
required PowerSyncBackendConnector connector,
116+
required AbortController abort,
117+
required Zone asyncWorkZone,
118+
required SyncOptions options,
119+
}) {
119120
throw UnimplementedError();
120121
}
121122

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import 'package:powersync_core/src/powersync_update_notification.dart';
1313
import 'package:powersync_core/src/schema.dart';
1414
import 'package:powersync_core/src/schema_logic.dart';
1515
import 'package:powersync_core/src/schema_logic.dart' as schema_logic;
16+
import 'package:powersync_core/src/sync/options.dart';
1617
import 'package:powersync_core/src/sync/sync_status.dart';
1718

1819
mixin PowerSyncDatabaseMixin implements SqliteConnection {
@@ -37,6 +38,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
3738
/// Use [attachedLogger] to propagate logs to [Logger.root] for custom logging.
3839
Logger get logger;
3940

41+
@Deprecated("This field is unused, pass params to connect() instead")
4042
Map<String, dynamic>? clientParams;
4143

4244
/// Current connection status.
@@ -72,6 +74,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
7274
/// Delay between retrying failed requests.
7375
/// Defaults to 5 seconds.
7476
/// Only has an effect if changed before calling [connect].
77+
@Deprecated('Set option when calling connect() instead')
7578
Duration retryDelay = const Duration(seconds: 5);
7679

7780
@protected
@@ -269,17 +272,31 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
269272
///
270273
/// The connection is automatically re-opened if it fails for any reason.
271274
///
275+
/// To set sync parameters used in your sync rules (if any), use
276+
/// [SyncOptions.params]. [SyncOptions] can also be used to tune the behavior
277+
/// of the sync client, see that class for more information.
278+
///
272279
/// Status changes are reported on [statusStream].
273280
Future<void> connect({
274281
required PowerSyncBackendConnector connector,
275-
Duration crudThrottleTime = const Duration(milliseconds: 10),
282+
SyncOptions? options,
283+
@Deprecated('Use SyncOptions.crudThrottleTime instead')
284+
Duration? crudThrottleTime,
276285
Map<String, dynamic>? params,
277286
}) async {
278287
// The initialization process acquires a sync connect lock (through
279288
// updateSchema), so ensure the database is ready before we try to acquire
280289
// the lock for the connection.
281290
await initialize();
282291

292+
final resolvedOptions = SyncOptions(
293+
crudThrottleTime: options?.crudThrottleTime ?? crudThrottleTime,
294+
// ignore: deprecated_member_use_from_same_package
295+
retryDelay: options?.retryDelay ?? retryDelay,
296+
params: options?.params ?? params,
297+
);
298+
299+
// ignore: deprecated_member_use_from_same_package
283300
clientParams = params;
284301
var thisConnectAborter = AbortController();
285302
final zone = Zone.current;
@@ -294,8 +311,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
294311

295312
await connectInternal(
296313
connector: connector,
297-
crudThrottleTime: crudThrottleTime,
298-
params: params,
314+
options: resolvedOptions,
299315
abort: thisConnectAborter,
300316
// Run follow-up async tasks in the parent zone, a new one is introduced
301317
// while we hold the lock (and async tasks won't hold the sync lock).
@@ -342,17 +358,13 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
342358
/// [connect] method and should not be called elsewhere.
343359
/// This method will only be called internally when no other sync client is
344360
/// active, so the method should not call [disconnect] itself.
345-
///
346-
/// The [crudThrottleTime] is the throttle time between CRUD operations, it
347-
/// defaults to 10 milliseconds in [connect].
348361
@protected
349362
@internal
350363
Future<void> connectInternal({
351364
required PowerSyncBackendConnector connector,
352-
required Duration crudThrottleTime,
365+
required SyncOptions options,
353366
required AbortController abort,
354367
required Zone asyncWorkZone,
355-
Map<String, dynamic>? params,
356368
});
357369

358370
/// Close the sync connection.

packages/powersync_core/lib/src/database/web/web_powersync_database.dart

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import 'package:powersync_core/src/log.dart';
1111
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
1212
import 'package:powersync_core/src/open_factory/web/web_open_factory.dart';
1313
import 'package:powersync_core/src/schema.dart';
14+
import 'package:powersync_core/src/sync/internal_connector.dart';
1415
import 'package:powersync_core/src/sync/streaming_sync.dart';
1516
import 'package:sqlite_async/sqlite_async.dart';
1617

18+
import '../../sync/options.dart';
1719
import '../../web/sync_controller.dart';
1820

1921
/// A PowerSync managed database.
@@ -114,13 +116,11 @@ class PowerSyncDatabaseImpl
114116
@internal
115117
Future<void> connectInternal({
116118
required PowerSyncBackendConnector connector,
117-
required Duration crudThrottleTime,
118119
required AbortController abort,
119120
required Zone asyncWorkZone,
120-
Map<String, dynamic>? params,
121+
required SyncOptions options,
121122
}) async {
122-
final crudStream =
123-
database.onChange(['ps_crud'], throttle: crudThrottleTime);
123+
final resolved = ResolvedSyncOptions(options);
124124

125125
final storage = BucketStorage(database);
126126
StreamingSync sync;
@@ -130,25 +130,23 @@ class PowerSyncDatabaseImpl
130130
sync = await SyncWorkerHandle.start(
131131
database: this,
132132
connector: connector,
133-
crudThrottleTimeMs: crudThrottleTime.inMilliseconds,
133+
options: options,
134134
workerUri: Uri.base.resolve('/powersync_sync.worker.js'),
135-
syncParams: params,
136135
);
137136
} catch (e) {
138137
logger.warning(
139138
'Could not use shared worker for synchronization, falling back to locks.',
140139
e,
141140
);
141+
final crudStream =
142+
database.onChange(['ps_crud'], throttle: resolved.crudThrottleTime);
142143

143144
sync = StreamingSyncImplementation(
144145
adapter: storage,
145-
credentialsCallback: connector.getCredentialsCached,
146-
invalidCredentialsCallback: connector.prefetchCredentials,
147-
uploadCrud: () => connector.uploadData(this),
146+
connector: InternalConnector.wrap(connector, this),
148147
crudUpdateTriggerStream: crudStream,
149-
retryDelay: Duration(seconds: 3),
148+
options: resolved,
150149
client: BrowserClient(),
151-
syncParameters: params,
152150
// Only allows 1 sync implementation to run at a time per database
153151
// This should be global (across tabs) when using Navigator locks.
154152
identifier: database.openFactory.path,

0 commit comments

Comments
 (0)