Skip to content

Commit 306cf43

Browse files
unify watched query listeners and subscriptions to consistent API
1 parent 7afab68 commit 306cf43

File tree

22 files changed

+229
-116
lines changed

22 files changed

+229
-116
lines changed

demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ export const SystemProvider = ({ children }: { children: React.ReactNode }) => {
7272
});
7373

7474
// This updates a cache in order to display results instantly on page load.
75-
listsQuery.subscribe({
75+
listsQuery.registerListener({
7676
onData: (data) => {
7777
// Store the data in localStorage for instant caching
7878
localStorage.setItem('listscache', JSON.stringify(data));

demos/yjs-react-supabase-text-collab/powersync.yaml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,6 @@ replication:
2323
# Multiple connection support is on the roadmap
2424
connections:
2525
- type: postgresql
26-
# The PowerSync server container can access the Postgres DB via the DB's service name.
27-
# In this case the hostname is pg-db
28-
29-
# The connection URI or individual parameters can be specified.
30-
# Individual params take precedence over URI params
3126
uri: postgresql://postgres:postgres@supabase_db_yjs-react-supabase-text-collab:5432/postgres
3227

3328
# SSL settings

demos/yjs-react-supabase-text-collab/src/library/powersync/PowerSyncYjsProvider.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ export class PowerSyncYjsProvider extends ObservableV2<PowerSyncYjsEvents> {
7272

7373
let synced = false;
7474

75-
updateQuery.subscribe({
75+
updateQuery.registerListener({
7676
onData: async (diff) => {
7777
for (const added of diff.added) {
7878
Y.applyUpdateV2(doc, b64ToUint8Array(added.update_b64));

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
921921
const builderFactory = WatchedQueryBuilderMap[mode];
922922
if (!builderFactory) {
923923
throw new Error(
924-
`Unsupported watch mode: ${mode}. Please specify on of [${Object.values(IncrementalWatchMode).join(', ')}]`
924+
`Unsupported watch mode: ${mode}. Please specify one of [${Object.values(IncrementalWatchMode).join(', ')}]`
925925
);
926926
}
927927
return builderFactory(this) as WatchedQueryBuilderMap[Mode];
@@ -964,7 +964,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
964964
}
965965
});
966966

967-
const dispose = watchedQuery.subscribe({
967+
const dispose = watchedQuery.registerListener({
968968
onData: (data) => {
969969
if (!data) {
970970
// This should not happen. We only use null for the initial data.

packages/common/src/client/watched/GetAllQuery.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ export type GetAllQueryOptions<RowType = unknown> = {
99
sql: string;
1010
parameters?: ReadonlyArray<unknown>;
1111
/**
12-
* Optional transformer function to convert raw rows into the desired RowType.
12+
* Optional mapper function to convert raw rows into the desired RowType.
1313
* @example
14-
* ```typescript
15-
* (rawRow: Record<string, unknown>) => ({
16-
* id: rawRow.id as string,
14+
* ```javascript
15+
* (rawRow) => ({
16+
* id: rawRow.id,
1717
* created_at: new Date(rawRow.created_at),
1818
* })
1919
* ```
2020
*/
21-
transformer?: (rawRow: Record<string, unknown>) => RowType;
21+
mapper?: (rawRow: Record<string, unknown>) => RowType;
2222
};
2323

2424
/**
@@ -38,8 +38,8 @@ export class GetAllQuery<RowType = unknown> implements WatchCompatibleQuery<RowT
3838
const { db } = options;
3939
const { sql, parameters = [] } = this.compile();
4040
const rawResult = await db.getAll<unknown>(sql, [...parameters]);
41-
if (this.options.transformer) {
42-
return rawResult.map(this.options.transformer);
41+
if (this.options.mapper) {
42+
return rawResult.map(this.options.mapper);
4343
}
4444
return rawResult as RowType[];
4545
}

packages/common/src/client/watched/WatchedQuery.ts

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { CompiledQuery } from '../../types/types.js';
2-
import { BaseListener, BaseObserverInterface } from '../../utils/BaseObserver.js';
2+
import { BaseListener } from '../../utils/BaseObserver.js';
3+
import { MetaBaseObserverInterface } from '../../utils/MetaBaseObserver.js';
34
import { AbstractPowerSyncDatabase } from '../AbstractPowerSyncDatabase.js';
45

6+
/**
7+
* State for {@link WatchedQuery} instances.
8+
*/
59
export interface WatchedQueryState<Data> {
610
/**
711
* Indicates the initial loading state (hard loading).
@@ -57,43 +61,34 @@ export interface WatchedQueryOptions {
5761
reportFetching?: boolean;
5862
}
5963

60-
export enum WatchedQuerySubscriptionEvent {
64+
export enum WatchedQueryListenerEvent {
6165
ON_DATA = 'onData',
6266
ON_ERROR = 'onError',
63-
ON_STATE_CHANGE = 'onStateChange'
64-
}
65-
66-
export interface WatchedQuerySubscription<Data> {
67-
[WatchedQuerySubscriptionEvent.ON_DATA]?: (data: Data) => void | Promise<void>;
68-
[WatchedQuerySubscriptionEvent.ON_ERROR]?: (error: Error) => void | Promise<void>;
69-
[WatchedQuerySubscriptionEvent.ON_STATE_CHANGE]?: (state: WatchedQueryState<Data>) => void | Promise<void>;
67+
ON_STATE_CHANGE = 'onStateChange',
68+
CLOSED = 'closed'
7069
}
7170

72-
export type SubscriptionCounts = Record<WatchedQuerySubscriptionEvent, number> & {
73-
total: number;
74-
};
75-
76-
export interface WatchedQueryListener extends BaseListener {
77-
closed: () => void;
78-
subscriptionsChanged: (counts: SubscriptionCounts) => void;
71+
export interface WatchedQueryListener<Data> extends BaseListener {
72+
[WatchedQueryListenerEvent.ON_DATA]?: (data: Data) => void | Promise<void>;
73+
[WatchedQueryListenerEvent.ON_ERROR]?: (error: Error) => void | Promise<void>;
74+
[WatchedQueryListenerEvent.ON_STATE_CHANGE]?: (state: WatchedQueryState<Data>) => void | Promise<void>;
75+
[WatchedQueryListenerEvent.CLOSED]?: () => void | Promise<void>;
7976
}
8077

8178
export interface WatchedQuery<Data = unknown, Settings extends WatchedQueryOptions = WatchedQueryOptions>
82-
extends BaseObserverInterface<WatchedQueryListener> {
79+
extends MetaBaseObserverInterface<WatchedQueryListener<Data>> {
8380
/**
8481
* Current state of the watched query.
8582
*/
8683
readonly state: WatchedQueryState<Data>;
8784

8885
readonly closed: boolean;
8986

90-
readonly subscriptionCounts: SubscriptionCounts;
91-
9287
/**
9388
* Subscribe to watched query events.
9489
* @returns A function to unsubscribe from the events.
9590
*/
96-
subscribe(subscription: WatchedQuerySubscription<Data>): () => void;
91+
registerListener(listener: WatchedQueryListener<Data>): () => void;
9792

9893
/**
9994
* Updates the underlying query options.

packages/common/src/client/watched/processors/AbstractQueryProcessor.ts

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
11
import { AbstractPowerSyncDatabase } from '../../../client/AbstractPowerSyncDatabase.js';
2-
import { BaseObserver } from '../../../utils/BaseObserver.js';
3-
import {
4-
SubscriptionCounts,
5-
WatchedQuery,
6-
WatchedQueryListener,
7-
WatchedQueryOptions,
8-
WatchedQueryState,
9-
WatchedQuerySubscription,
10-
WatchedQuerySubscriptionEvent
11-
} from '../WatchedQuery.js';
2+
import { MetaBaseObserver } from '../../../utils/MetaBaseObserver.js';
3+
import { WatchedQuery, WatchedQueryListener, WatchedQueryOptions, WatchedQueryState } from '../WatchedQuery.js';
124

135
/**
146
* @internal
@@ -27,7 +19,7 @@ export interface LinkQueryOptions<Data, Settings extends WatchedQueryOptions = W
2719
settings: Settings;
2820
}
2921

30-
type WatchedQueryProcessorListener<Data> = WatchedQuerySubscription<Data> & WatchedQueryListener;
22+
type WatchedQueryProcessorListener<Data> = WatchedQueryListener<Data>;
3123

3224
/**
3325
* Performs underlying watching and yields a stream of results.
@@ -37,7 +29,7 @@ export abstract class AbstractQueryProcessor<
3729
Data = unknown[],
3830
Settings extends WatchedQueryOptions = WatchedQueryOptions
3931
>
40-
extends BaseObserver<WatchedQueryProcessorListener<Data>>
32+
extends MetaBaseObserver<WatchedQueryProcessorListener<Data>>
4133
implements WatchedQuery<Data, Settings>
4234
{
4335
readonly state: WatchedQueryState<Data>;
@@ -51,15 +43,6 @@ export abstract class AbstractQueryProcessor<
5143
return this._closed;
5244
}
5345

54-
get subscriptionCounts() {
55-
const listenersArray = Array.from(this.listeners);
56-
return Object.values(WatchedQuerySubscriptionEvent).reduce((totals: Partial<SubscriptionCounts>, key) => {
57-
totals[key] = listenersArray.filter((l) => !!l[key]).length;
58-
totals.total = (totals.total ?? 0) + totals[key];
59-
return totals;
60-
}, {}) as SubscriptionCounts;
61-
}
62-
6346
constructor(protected options: AbstractQueryProcessorOptions<Data, Settings>) {
6447
super();
6548
this.abortController = new AbortController();
@@ -156,18 +139,6 @@ export abstract class AbstractQueryProcessor<
156139
});
157140
}
158141

159-
subscribe(subscription: WatchedQuerySubscription<Data>): () => void {
160-
// hook in to subscription events in order to report changes
161-
const baseDispose = this.registerListener({ ...subscription });
162-
163-
this.iterateListeners((l) => l.subscriptionsChanged?.(this.subscriptionCounts));
164-
165-
return () => {
166-
baseDispose();
167-
this.iterateListeners((l) => l.subscriptionsChanged?.(this.subscriptionCounts));
168-
};
169-
}
170-
171142
async close() {
172143
await this.initialized;
173144
this.abortController.abort();

packages/common/src/client/watched/processors/ComparisonWatchedQueryBuilder.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,28 @@ import { WatchedQueryBuilder } from '../WatchedQueryBuilder.js';
44
import { WatchedQueryComparator } from './comparators.js';
55
import { ComparisonWatchedQuerySettings, OnChangeQueryProcessor } from './OnChangeQueryProcessor.js';
66

7+
/**
8+
* Options for building incrementally watched queries that compare the result set.
9+
* It uses a comparator to determine if the result set has changed since the last update.
10+
* If the result set has changed, it emits the new result set.
11+
*/
712
export interface ComparisonWatchProcessorOptions<DataType> {
813
comparator?: WatchedQueryComparator<DataType>;
914
watch: ComparisonWatchedQuerySettings<DataType>;
1015
}
1116

17+
/**
18+
* Default implementation of the {@link WatchedQueryComparator} for watched queries.
19+
* It uses JSON stringification to compare the entire result set.
20+
* Array based results should use {@link ArrayComparator} for more efficient item comparison.
21+
*/
22+
export const DEFAULT_WATCHED_QUERY_COMPARATOR: WatchedQueryComparator<any> = {
23+
checkEquality: (a, b) => JSON.stringify(a) === JSON.stringify(b)
24+
};
25+
26+
/**
27+
* Builds an incrementally watched query that emits results after comparing the result set for changes.
28+
*/
1229
export class ComparisonWatchedQueryBuilder implements WatchedQueryBuilder {
1330
constructor(protected db: AbstractPowerSyncDatabase) {}
1431

@@ -41,9 +58,7 @@ export class ComparisonWatchedQueryBuilder implements WatchedQueryBuilder {
4158
): WatchedQuery<DataType, ComparisonWatchedQuerySettings<DataType>> {
4259
return new OnChangeQueryProcessor({
4360
db: this.db,
44-
comparator: options.comparator ?? {
45-
checkEquality: (a, b) => JSON.stringify(a) == JSON.stringify(b)
46-
},
61+
comparator: options.comparator ?? DEFAULT_WATCHED_QUERY_COMPARATOR,
4762
watchOptions: options.watch,
4863
placeholderData: options.watch.placeholderData
4964
});

packages/common/src/client/watched/processors/DifferentialQueryProcessor.ts

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,44 @@
11
import { WatchCompatibleQuery, WatchedQuery, WatchedQueryOptions, WatchedQueryState } from '../WatchedQuery.js';
22
import { AbstractQueryProcessor, AbstractQueryProcessorOptions, LinkQueryOptions } from './AbstractQueryProcessor.js';
33

4-
export interface Differential<RowType> {
4+
/**
5+
* Represents an updated row in a differential watched query.
6+
* It contains both the current and previous state of the row.
7+
*/
8+
export interface WatchedQueryRowDifferential<RowType> {
59
current: RowType;
610
previous: RowType;
711
}
812

13+
/**
14+
* Represents the result of a watched query that has been differentiated.
15+
* {@link WatchedQueryState.data} is of the {@link WatchedQueryDifferential} form when using the {@link IncrementalWatchMode.DIFFERENTIAL} mode.
16+
*/
917
export interface WatchedQueryDifferential<RowType> {
1018
added: RowType[];
1119
all: RowType[];
1220
removed: RowType[];
13-
updated: Differential<RowType>[];
21+
updated: WatchedQueryRowDifferential<RowType>[];
1422
unchanged: RowType[];
1523
}
1624

17-
export interface Differentiator<RowType> {
25+
/**
26+
* Differentiator for incremental watched queries which allows to identify and compare items in the result set.
27+
*/
28+
export interface WatchedQueryDifferentiator<RowType> {
29+
/**
30+
* Unique identifier for the item.
31+
*/
1832
identify: (item: RowType) => string;
33+
/**
34+
* Generates a key for comparing items with matching identifiers.
35+
*/
1936
compareBy: (item: RowType) => string;
2037
}
2138

39+
/**
40+
* Settings for incremental watched queries using the {@link IncrementalWatchMode.DIFFERENTIAL} mode.
41+
*/
2242
export interface DifferentialWatchedQuerySettings<RowType> extends WatchedQueryOptions {
2343
/**
2444
* The query here must return an array of items that can be differentiated.
@@ -37,11 +57,15 @@ export interface DifferentialWatchedQuerySettings<RowType> extends WatchedQueryO
3757
*/
3858
export interface DifferentialQueryProcessorOptions<RowType>
3959
extends AbstractQueryProcessorOptions<WatchedQueryDifferential<RowType>, DifferentialWatchedQuerySettings<RowType>> {
40-
differentiator: Differentiator<RowType>;
60+
differentiator: WatchedQueryDifferentiator<RowType>;
4161
}
4262

4363
type DataHashMap<RowType> = Map<string, { hash: string; item: RowType }>;
4464

65+
/**
66+
* An empty differential result set.
67+
* This is used as the initial state for differential incrementally watched queries.
68+
*/
4569
export const EMPTY_DIFFERENTIAL = {
4670
added: [],
4771
all: [],

packages/common/src/client/watched/processors/DifferentialWatchedQueryBuilder.ts

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,38 @@ import { WatchedQueryBuilder } from '../WatchedQueryBuilder.js';
44
import {
55
DifferentialQueryProcessor,
66
DifferentialWatchedQuerySettings,
7-
Differentiator,
87
EMPTY_DIFFERENTIAL,
9-
WatchedQueryDifferential
8+
WatchedQueryDifferential,
9+
WatchedQueryDifferentiator
1010
} from './DifferentialQueryProcessor.js';
1111

12+
/**
13+
* Options for creating an incrementally watched query that emits differential results.
14+
*
15+
*/
1216
export type DifferentialWatchedQueryBuilderOptions<RowType> = {
13-
differentiator?: Differentiator<RowType>;
17+
differentiator?: WatchedQueryDifferentiator<RowType>;
1418
watch: DifferentialWatchedQuerySettings<RowType>;
1519
};
1620

21+
/**
22+
* Default implementation of the {@link Differentiator} for watched queries.
23+
* It identifies items by their `id` property if available, otherwise it uses JSON stringification
24+
* of the entire item for identification and comparison.
25+
*/
26+
export const DEFAULT_WATCHED_QUERY_DIFFERENTIATOR: WatchedQueryDifferentiator<any> = {
27+
identify: (item) => {
28+
if (item && typeof item == 'object' && typeof item['id'] == 'string') {
29+
return item['id'];
30+
}
31+
return JSON.stringify(item);
32+
},
33+
compareBy: (item) => JSON.stringify(item)
34+
};
35+
36+
/**
37+
* Builds a watched query which emits differential results based on the provided differentiator.
38+
*/
1739
export class DifferentialWatchedQueryBuilder implements WatchedQueryBuilder {
1840
constructor(protected db: AbstractPowerSyncDatabase) {}
1941

@@ -39,7 +61,7 @@ export class DifferentialWatchedQueryBuilder implements WatchedQueryBuilder {
3961
* FROM
4062
* assets
4163
* ',
42-
* transformer: (raw) => {
64+
* mapper: (raw) => {
4365
* return {
4466
* id: raw.id as string,
4567
* make: raw.make as string
@@ -55,15 +77,7 @@ export class DifferentialWatchedQueryBuilder implements WatchedQueryBuilder {
5577
): WatchedQuery<WatchedQueryDifferential<RowType>, DifferentialWatchedQuerySettings<RowType>> {
5678
return new DifferentialQueryProcessor({
5779
db: this.db,
58-
differentiator: options.differentiator ?? {
59-
identify: (item: RowType) => {
60-
if (item && typeof item == 'object' && typeof item['id'] == 'string') {
61-
return item['id'];
62-
}
63-
return JSON.stringify(item);
64-
},
65-
compareBy: (item: RowType) => JSON.stringify(item)
66-
},
80+
differentiator: options.differentiator ?? DEFAULT_WATCHED_QUERY_DIFFERENTIATOR,
6781
watchOptions: options.watch,
6882
placeholderData: options.watch.placeholderData ?? EMPTY_DIFFERENTIAL
6983
});

packages/common/src/client/watched/processors/comparators.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export type ArrayComparatorOptions<ItemType> = {
1313
};
1414

1515
/**
16-
* Compares array results of watched queries.
16+
* Compares array results of watched queries for incrementally watched queries created in the {@link IncrementalWatchMode.COMPARISON} mode.
1717
*/
1818
export class ArrayComparator<ItemType> implements WatchedQueryComparator<ItemType[]> {
1919
constructor(protected options: ArrayComparatorOptions<ItemType>) {}

0 commit comments

Comments
 (0)