Skip to content

Commit 171568c

Browse files
committed
Build correct index
1 parent 36a4662 commit 171568c

File tree

4 files changed

+63
-45
lines changed

4 files changed

+63
-45
lines changed

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import {
22
BucketDescription,
33
BucketPriority,
44
BucketSource,
5-
BucketSourceType,
65
RequestedStream,
76
RequestJwtPayload,
87
RequestParameters,
@@ -60,6 +59,17 @@ export class BucketChecksumState {
6059
*/
6160
private lastChecksums: util.ChecksumMap | null = null;
6261
private lastWriteCheckpoint: bigint | null = null;
62+
/**
63+
* Once we've sent the first full checkpoint line including all {@link util.Checkpoint.streams} that the user is
64+
* subscribed to, we keep an index of the stream names to their index in that array.
65+
*
66+
* This is used to compress the representation of buckets in `checkpoint` and `checkpoint_diff` lines: For buckets
67+
* that are part of sync rules or default streams, we need to include the name of the defining sync rule or definition
68+
* yielding that bucket (so that clients can track progress for default streams).
69+
* But instead of sending the name for each bucket, we use the fact that it's part of the streams array and only send
70+
* their index, reducing the size of those messages.
71+
*/
72+
private streamNameToIndex: Map<string, number> | null = null;
6373

6474
private readonly parameterState: BucketParameterState;
6575

@@ -111,9 +121,7 @@ export class BucketChecksumState {
111121
const { buckets: allBuckets, updatedBuckets } = update;
112122

113123
/** Set of all buckets in this checkpoint. */
114-
const bucketDescriptionMap = new Map(
115-
allBuckets.map((b) => [b.bucket, this.parameterState.translateResolvedBucket(b)])
116-
);
124+
const bucketDescriptionMap = new Map(allBuckets.map((b) => [b.bucket, b]));
117125

118126
if (bucketDescriptionMap.size > this.context.maxBuckets) {
119127
throw new ServiceError(
@@ -171,6 +179,7 @@ export class BucketChecksumState {
171179
// TODO: If updatedBuckets is present, we can use that to more efficiently calculate a diff,
172180
// and avoid any unnecessary loops through the entire list of buckets.
173181
const diff = util.checksumsDiff(this.lastChecksums, checksumMap);
182+
const streamNameToIndex = this.streamNameToIndex!;
174183

175184
if (
176185
this.lastWriteCheckpoint == writeCheckpoint &&
@@ -195,7 +204,7 @@ export class BucketChecksumState {
195204

196205
const updatedBucketDescriptions = diff.updatedBuckets.map((e) => ({
197206
...e,
198-
...bucketDescriptionMap.get(e.bucket)!
207+
...this.parameterState.translateResolvedBucket(bucketDescriptionMap.get(e.bucket)!, streamNameToIndex)
199208
}));
200209
bucketsToFetch = [...generateBucketsToFetch].map((b) => {
201210
return {
@@ -236,8 +245,13 @@ export class BucketChecksumState {
236245
bucketsToFetch = allBuckets.map((b) => ({ bucket: b.bucket, priority: b.priority }));
237246

238247
const subscriptions: util.StreamDescription[] = [];
248+
const streamNameToIndex = new Map<string, number>();
249+
this.streamNameToIndex = streamNameToIndex;
250+
239251
for (const source of this.parameterState.syncRules.bucketSources) {
240252
if (this.parameterState.isSubscribedToStream(source)) {
253+
streamNameToIndex.set(source.name, subscriptions.length);
254+
241255
subscriptions.push({
242256
name: source.name,
243257
is_default: source.subscribedToByDefault
@@ -251,7 +265,7 @@ export class BucketChecksumState {
251265
write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined,
252266
buckets: [...checksumMap.values()].map((e) => ({
253267
...e,
254-
...bucketDescriptionMap.get(e.bucket)!
268+
...this.parameterState.translateResolvedBucket(bucketDescriptionMap.get(e.bucket)!, streamNameToIndex)
255269
})),
256270
streams: subscriptions
257271
}
@@ -426,8 +440,11 @@ export class BucketParameterState {
426440
/**
427441
* Translates an internal sync-rules {@link ResolvedBucket} instance to the public
428442
* {@link util.ClientBucketDescription}.
443+
*
444+
* @param lookupIndex A map from stream names to their index in {@link util.Checkpoint.streams}. These are used to
445+
* reference default buckets by their stream index instead of duplicating the name on wire.
429446
*/
430-
translateResolvedBucket(description: ResolvedBucket): util.ClientBucketDescription {
447+
translateResolvedBucket(description: ResolvedBucket, lookupIndex: Map<string, number>): util.ClientBucketDescription {
431448
// If the client is overriding the priority of any stream that yields this bucket, sync the bucket with that
432449
// priority.
433450
let priorityOverride: BucketPriority | null = null;
@@ -449,7 +466,8 @@ export class BucketParameterState {
449466
priority: priorityOverride ?? description.priority,
450467
subscriptions: description.inclusion_reasons.map((reason) => {
451468
if (reason == 'default') {
452-
return { default: 0 }; // TODO
469+
const stream = description.definition;
470+
return { default: lookupIndex.get(stream)! };
453471
} else {
454472
return reason.subscription;
455473
}

packages/sync-rules/test/src/parameter_queries.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,11 @@ describe('parameter queries', () => {
8484
// We _do_ need to care about the bucket string representation.
8585
expect(
8686
query.resolveBucketDescriptions([{ int1: 314, float1: 3.14, float2: 314 }], normalizeTokenParameters({}))
87-
).toEqual([{ bucket: 'mybucket[314,3.14,314]', definition: 'mybucket', priority: 3 }]);
87+
).toEqual([{ bucket: 'mybucket[314,3.14,314]', priority: 3 }]);
8888

8989
expect(
9090
query.resolveBucketDescriptions([{ int1: 314n, float1: 3.14, float2: 314 }], normalizeTokenParameters({}))
91-
).toEqual([{ bucket: 'mybucket[314,3.14,314]', definition: 'mybucket', priority: 3 }]);
91+
).toEqual([{ bucket: 'mybucket[314,3.14,314]', priority: 3 }]);
9292
});
9393

9494
test('plain token_parameter (baseline)', () => {
@@ -365,7 +365,7 @@ describe('parameter queries', () => {
365365
[{ user_id: 'user1' }],
366366
normalizeTokenParameters({ user_id: 'user1', is_admin: true })
367367
)
368-
).toEqual([{ bucket: 'mybucket["user1",1]', definition: 'mybucket', priority: 3 }]);
368+
).toEqual([{ bucket: 'mybucket["user1",1]', priority: 3 }]);
369369
});
370370

371371
test('case-sensitive parameter queries (1)', () => {

packages/sync-rules/test/src/static_parameter_queries.test.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ describe('static parameter queries', () => {
1010
expect(query.errors).toEqual([]);
1111
expect(query.bucketParameters!).toEqual(['user_id']);
1212
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([
13-
{ bucket: 'mybucket["user1"]', definition: 'mybucket', priority: 3 }
13+
{ bucket: 'mybucket["user1"]', priority: 3 }
1414
]);
1515
});
1616

@@ -20,7 +20,7 @@ describe('static parameter queries', () => {
2020
expect(query.errors).toEqual([]);
2121
expect(query.bucketParameters!).toEqual([]);
2222
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([
23-
{ bucket: 'mybucket[]', definition: 'mybucket', priority: 3 }
23+
{ bucket: 'mybucket[]', priority: 3 }
2424
]);
2525
});
2626

@@ -29,7 +29,7 @@ describe('static parameter queries', () => {
2929
const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery;
3030
expect(query.errors).toEqual([]);
3131
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1', is_admin: true }))).toEqual([
32-
{ bucket: 'mybucket["user1"]', definition: 'mybucket', priority: 3 }
32+
{ bucket: 'mybucket["user1"]', priority: 3 }
3333
]);
3434
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1', is_admin: false }))).toEqual(
3535
[]
@@ -41,7 +41,7 @@ describe('static parameter queries', () => {
4141
const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery;
4242
expect(query.errors).toEqual([]);
4343
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([
44-
{ bucket: 'mybucket["USER1"]', definition: 'mybucket', priority: 3 }
44+
{ bucket: 'mybucket["USER1"]', priority: 3 }
4545
]);
4646
expect(query.bucketParameters!).toEqual(['upper_id']);
4747
});
@@ -51,7 +51,7 @@ describe('static parameter queries', () => {
5151
const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery;
5252
expect(query.errors).toEqual([]);
5353
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ role: 'admin' }))).toEqual([
54-
{ bucket: 'mybucket[]', definition: 'mybucket', priority: 3 }
54+
{ bucket: 'mybucket[]', priority: 3 }
5555
]);
5656
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ role: 'user' }))).toEqual([]);
5757
});
@@ -61,7 +61,7 @@ describe('static parameter queries', () => {
6161
const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery;
6262
expect(query.errors).toEqual([]);
6363
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ id1: 't1', id2: 't1' }))).toEqual([
64-
{ bucket: 'mybucket[]', definition: 'mybucket', priority: 3 }
64+
{ bucket: 'mybucket[]', priority: 3 }
6565
]);
6666
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ id1: 't1', id2: 't2' }))).toEqual([]);
6767
});
@@ -80,7 +80,7 @@ describe('static parameter queries', () => {
8080
expect(query.errors).toEqual([]);
8181

8282
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({}, { org_id: 'test' }))).toEqual([
83-
{ bucket: 'mybucket["test"]', definition: 'mybucket', priority: 3 }
83+
{ bucket: 'mybucket["test"]', priority: 3 }
8484
]);
8585
});
8686

@@ -91,7 +91,7 @@ describe('static parameter queries', () => {
9191
expect(query.bucketParameters).toEqual(['user_id']);
9292

9393
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([
94-
{ bucket: 'mybucket["user1"]', definition: 'mybucket', priority: 3 }
94+
{ bucket: 'mybucket["user1"]', priority: 3 }
9595
]);
9696
});
9797

@@ -102,7 +102,7 @@ describe('static parameter queries', () => {
102102
expect(query.bucketParameters).toEqual(['user_id']);
103103

104104
expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([
105-
{ bucket: 'mybucket["user1"]', definition: 'mybucket', priority: 3 }
105+
{ bucket: 'mybucket["user1"]', priority: 3 }
106106
]);
107107
});
108108

@@ -111,7 +111,7 @@ describe('static parameter queries', () => {
111111
const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery;
112112
expect(query.errors).toEqual([]);
113113
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([
114-
{ bucket: 'mybucket[]', definition: 'mybucket', priority: 3 }
114+
{ bucket: 'mybucket[]', priority: 3 }
115115
]);
116116
});
117117

@@ -120,7 +120,7 @@ describe('static parameter queries', () => {
120120
const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery;
121121
expect(query.errors).toEqual([]);
122122
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([
123-
{ bucket: 'mybucket[]', definition: 'mybucket', priority: 3 }
123+
{ bucket: 'mybucket[]', priority: 3 }
124124
]);
125125
});
126126

@@ -136,7 +136,7 @@ describe('static parameter queries', () => {
136136
const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery;
137137
expect(query.errors).toEqual([]);
138138
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([
139-
{ bucket: 'mybucket[]', definition: 'mybucket', priority: 3 }
139+
{ bucket: 'mybucket[]', priority: 3 }
140140
]);
141141
});
142142

@@ -147,10 +147,10 @@ describe('static parameter queries', () => {
147147
expect(query.errors).toEqual([]);
148148
expect(
149149
query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'read:users'] }, {}))
150-
).toEqual([{ bucket: 'mybucket[1]', definition: 'mybucket', priority: 3 }]);
150+
).toEqual([{ bucket: 'mybucket[1]', priority: 3 }]);
151151
expect(
152152
query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'write:users'] }, {}))
153-
).toEqual([{ bucket: 'mybucket[0]', definition: 'mybucket', priority: 3 }]);
153+
).toEqual([{ bucket: 'mybucket[0]', priority: 3 }]);
154154
});
155155

156156
test('IN for permissions in request.jwt() (2)', function () {
@@ -160,7 +160,7 @@ describe('static parameter queries', () => {
160160
expect(query.errors).toEqual([]);
161161
expect(
162162
query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'read:users'] }, {}))
163-
).toEqual([{ bucket: 'mybucket[]', definition: 'mybucket', priority: 3 }]);
163+
).toEqual([{ bucket: 'mybucket[]', priority: 3 }]);
164164
expect(
165165
query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'write:users'] }, {}))
166166
).toEqual([]);
@@ -171,7 +171,7 @@ describe('static parameter queries', () => {
171171
const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery;
172172
expect(query.errors).toEqual([]);
173173
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '', role: 'superuser' }, {}))).toEqual([
174-
{ bucket: 'mybucket[]', definition: 'mybucket', priority: 3 }
174+
{ bucket: 'mybucket[]', priority: 3 }
175175
]);
176176
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '', role: 'superadmin' }, {}))).toEqual([]);
177177
});

packages/sync-rules/test/src/table_valued_function_queries.test.ts

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ describe('table-valued function queries', () => {
1919
expect(query.bucketParameters).toEqual(['v']);
2020

2121
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([
22-
{ bucket: 'mybucket[1]', definition: 'mybucket', priority: 3 },
23-
{ bucket: 'mybucket[2]', definition: 'mybucket', priority: 3 },
24-
{ bucket: 'mybucket[3]', definition: 'mybucket', priority: 3 }
22+
{ bucket: 'mybucket[1]', priority: 3 },
23+
{ bucket: 'mybucket[2]', priority: 3 },
24+
{ bucket: 'mybucket[3]', priority: 3 }
2525
]);
2626
});
2727

@@ -32,9 +32,9 @@ describe('table-valued function queries', () => {
3232
expect(query.bucketParameters).toEqual(['v']);
3333

3434
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([
35-
{ bucket: 'mybucket[1]', definition: 'mybucket', priority: 3 },
36-
{ bucket: 'mybucket[2]', definition: 'mybucket', priority: 3 },
37-
{ bucket: 'mybucket[3]', definition: 'mybucket', priority: 3 }
35+
{ bucket: 'mybucket[1]', priority: 3 },
36+
{ bucket: 'mybucket[2]', priority: 3 },
37+
{ bucket: 'mybucket[3]', priority: 3 }
3838
]);
3939
});
4040

@@ -88,9 +88,9 @@ describe('table-valued function queries', () => {
8888
expect(query.bucketParameters).toEqual(['value']);
8989

9090
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([
91-
{ bucket: 'mybucket["a"]', definition: 'mybucket', priority: 3 },
92-
{ bucket: 'mybucket["b"]', definition: 'mybucket', priority: 3 },
93-
{ bucket: 'mybucket["c"]', definition: 'mybucket', priority: 3 }
91+
{ bucket: 'mybucket["a"]', priority: 3 },
92+
{ bucket: 'mybucket["b"]', priority: 3 },
93+
{ bucket: 'mybucket["c"]', priority: 3 }
9494
]);
9595
});
9696

@@ -109,9 +109,9 @@ describe('table-valued function queries', () => {
109109
expect(query.bucketParameters).toEqual(['value']);
110110

111111
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([
112-
{ bucket: 'mybucket[1]', definition: 'mybucket', priority: 3 },
113-
{ bucket: 'mybucket[2]', definition: 'mybucket', priority: 3 },
114-
{ bucket: 'mybucket[3]', definition: 'mybucket', priority: 3 }
112+
{ bucket: 'mybucket[1]', priority: 3 },
113+
{ bucket: 'mybucket[2]', priority: 3 },
114+
{ bucket: 'mybucket[3]', priority: 3 }
115115
]);
116116
});
117117

@@ -130,9 +130,9 @@ describe('table-valued function queries', () => {
130130
expect(query.bucketParameters).toEqual(['value']);
131131

132132
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([
133-
{ bucket: 'mybucket[1]', definition: 'mybucket', priority: 3 },
134-
{ bucket: 'mybucket[2]', definition: 'mybucket', priority: 3 },
135-
{ bucket: 'mybucket[3]', definition: 'mybucket', priority: 3 }
133+
{ bucket: 'mybucket[1]', priority: 3 },
134+
{ bucket: 'mybucket[2]', priority: 3 },
135+
{ bucket: 'mybucket[3]', priority: 3 }
136136
]);
137137
});
138138

@@ -151,8 +151,8 @@ describe('table-valued function queries', () => {
151151
expect(query.bucketParameters).toEqual(['v']);
152152

153153
expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([
154-
{ bucket: 'mybucket[2]', definition: 'mybucket', priority: 3 },
155-
{ bucket: 'mybucket[3]', definition: 'mybucket', priority: 3 }
154+
{ bucket: 'mybucket[2]', priority: 3 },
155+
{ bucket: 'mybucket[3]', priority: 3 }
156156
]);
157157
});
158158

@@ -184,7 +184,7 @@ describe('table-valued function queries', () => {
184184
{}
185185
)
186186
)
187-
).toEqual([{ bucket: 'mybucket[1]', definition: 'mybucket', priority: 3 }]);
187+
).toEqual([{ bucket: 'mybucket[1]', priority: 3 }]);
188188
});
189189

190190
describe('dangerous queries', function () {

0 commit comments

Comments
 (0)