Skip to content

Commit f416231

Browse files
authored
Fix bug with orderBy on duplicate values (#713)
* Unit test that reproduces bug with duplicate values in orderBy
* Let requestLimitedSnapshot load all duplicate values
* Do not filter changes > max value because we won't load them later since the collection will notice they were already in the collection
* Changeset
* Fix filter function in test
* Fix status subscription
* Remove sleeps now that the bug with subscription is fixed
* Remove obsolete todo
* Do not track loading promise on initial state load
1 parent e0c4e2d commit f416231

File tree

8 files changed

+641
-127
lines changed

8 files changed

+641
-127
lines changed

.changeset/sharp-streets-repair.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Fix bug with orderBy that caused queries to skip duplicate values and/or stall on duplicate values.

packages/db/src/collection/changes.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ export class CollectionChangesManager<
108108
})
109109

110110
if (options.includeInitialState) {
111-
subscription.requestSnapshot()
111+
subscription.requestSnapshot({ trackLoadSubsetPromise: false })
112112
}
113113

114114
// Add to batched listeners

packages/db/src/collection/subscription.ts

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ensureIndexForExpression } from "../indexes/auto-index.js"
2-
import { and, gt, lt } from "../query/builder/functions.js"
2+
import { and, eq, gt, lt } from "../query/builder/functions.js"
33
import { Value } from "../query/ir.js"
44
import { EventEmitter } from "../event-emitter.js"
55
import {
@@ -20,6 +20,7 @@ import type { CollectionImpl } from "./index.js"
2020
type RequestSnapshotOptions = {
2121
where?: BasicExpression<boolean>
2222
optimizedOnly?: boolean
23+
trackLoadSubsetPromise?: boolean
2324
}
2425

2526
type RequestLimitedSnapshotOptions = {
@@ -197,7 +198,10 @@ export class CollectionSubscription
197198
subscription: this,
198199
})
199200

200-
this.trackLoadSubsetPromise(syncResult)
201+
const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true
202+
if (trackLoadSubsetPromise) {
203+
this.trackLoadSubsetPromise(syncResult)
204+
}
201205

202206
// Also load data immediately from the collection
203207
const snapshot = this.collection.currentStateAsChanges(stateOpts)
@@ -218,10 +222,12 @@ export class CollectionSubscription
218222
}
219223

220224
/**
221-
* Sends a snapshot that is limited to the first `limit` rows that fulfill the `where` clause and are bigger than `minValue`.
225+
* Sends a snapshot of the rows that fulfill the `where` clause and are bigger than or equal to `minValue`.
222226
* Requires a range index to be set with `setOrderByIndex` prior to calling this method.
223227
* It uses that range index to load the items in the order of the index.
224-
* Note: it does not send keys that have already been sent before.
228+
* Note 1: it may load more rows than the provided LIMIT because it loads all values equal to `minValue` + limit values greater than `minValue`.
229+
* This is needed to ensure that it does not accidentally skip duplicate values when the limit falls in the middle of some duplicated values.
230+
* Note 2: it does not send keys that have already been sent before.
225231
*/
226232
requestLimitedSnapshot({
227233
orderBy,
@@ -257,12 +263,49 @@ export class CollectionSubscription
257263

258264
let biggestObservedValue = minValue
259265
const changes: Array<ChangeMessage<any, string | number>> = []
260-
let keys: Array<string | number> = index.take(limit, minValue, filterFn)
266+
267+
// If we have a minValue we need to handle the case
268+
// where there might be duplicate values equal to minValue that we need to include
269+
// because we can have data like this: [1, 2, 3, 3, 3, 4, 5]
270+
// so if minValue is 3 then the previous snapshot may not have included all 3s
271+
// e.g. if it was offset 0 and limit 3 it would only have loaded [1, 2, 3], i.e. only the first of the three 3s
272+
// so we load all rows equal to minValue first, to be sure we don't skip any duplicate values
273+
let keys: Array<string | number> = []
274+
if (minValue !== undefined) {
275+
// First, get all items with the same value as minValue
276+
const { expression } = orderBy[0]!
277+
const allRowsWithMinValue = this.collection.currentStateAsChanges({
278+
where: eq(expression, new Value(minValue)),
279+
})
280+
281+
if (allRowsWithMinValue) {
282+
const keysWithMinValue = allRowsWithMinValue
283+
.map((change) => change.key)
284+
.filter((key) => !this.sentKeys.has(key) && filterFn(key))
285+
286+
// Add items with the minValue first
287+
keys.push(...keysWithMinValue)
288+
289+
// Then get items greater than minValue
290+
const keysGreaterThanMin = index.take(
291+
limit - keys.length,
292+
minValue,
293+
filterFn
294+
)
295+
keys.push(...keysGreaterThanMin)
296+
} else {
297+
keys = index.take(limit, minValue, filterFn)
298+
}
299+
} else {
300+
keys = index.take(limit, minValue, filterFn)
301+
}
261302

262303
const valuesNeeded = () => Math.max(limit - changes.length, 0)
263304
const collectionExhausted = () => keys.length === 0
264305

265306
while (valuesNeeded() > 0 && !collectionExhausted()) {
307+
const insertedKeys = new Set<string | number>() // Track keys we add to `changes` in this iteration
308+
266309
for (const key of keys) {
267310
const value = this.collection.get(key)!
268311
changes.push({
@@ -271,6 +314,7 @@ export class CollectionSubscription
271314
value,
272315
})
273316
biggestObservedValue = value
317+
insertedKeys.add(key) // Track this key
274318
}
275319

276320
keys = index.take(valuesNeeded(), biggestObservedValue, filterFn)
@@ -296,9 +340,41 @@ export class CollectionSubscription
296340
subscription: this,
297341
})
298342

299-
this.trackLoadSubsetPromise(syncResult)
343+
// Make parallel loadSubset calls for values equal to minValue and values greater than minValue
344+
const promises: Array<Promise<void>> = []
345+
346+
// First promise: load all values equal to minValue
347+
if (typeof minValue !== `undefined`) {
348+
const { expression } = orderBy[0]!
349+
const exactValueFilter = eq(expression, new Value(minValue))
350+
351+
const equalValueResult = this.collection._sync.loadSubset({
352+
where: exactValueFilter,
353+
subscription: this,
354+
})
355+
356+
if (equalValueResult instanceof Promise) {
357+
promises.push(equalValueResult)
358+
}
359+
}
360+
361+
// Second promise: load values greater than minValue
362+
if (syncResult instanceof Promise) {
363+
promises.push(syncResult)
364+
}
365+
366+
// Track the combined promise
367+
if (promises.length > 0) {
368+
const combinedPromise = Promise.all(promises).then(() => {})
369+
this.trackLoadSubsetPromise(combinedPromise)
370+
} else {
371+
this.trackLoadSubsetPromise(syncResult)
372+
}
300373
}
301374

375+
// TODO: also add a similar test that checks that the rows can also be loaded through the collection's loadSubset function
376+
// and that this path also works properly (i.e. does not skip duplicate values)
377+
302378
/**
303379
* Filters and flips changes for keys that have not been sent yet.
304380
* Deletes are filtered out for keys that have not been sent yet.

packages/db/src/query/compiler/order-by.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,11 @@ export function processOrderBy(
179179
orderByOptimizationInfo
180180

181181
setSizeCallback = (getSize: () => number) => {
182-
optimizableOrderByCollections[followRefCollection.id] = {
183-
...optimizableOrderByCollections[followRefCollection.id]!,
184-
dataNeeded: () => {
182+
optimizableOrderByCollections[followRefCollection.id]![`dataNeeded`] =
183+
() => {
185184
const size = getSize()
186185
return Math.max(0, orderByOptimizationInfo!.limit - size)
187-
},
188-
}
186+
}
189187
}
190188
}
191189
}

packages/db/src/query/live/collection-subscriber.ts

Lines changed: 26 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -73,29 +73,33 @@ export class CollectionSubscriber<
7373
)
7474
}
7575

76+
const trackLoadPromise = () => {
77+
// Guard against duplicate transitions
78+
if (!this.subscriptionLoadingPromises.has(subscription)) {
79+
let resolve: () => void
80+
const promise = new Promise<void>((res) => {
81+
resolve = res
82+
})
83+
84+
this.subscriptionLoadingPromises.set(subscription, {
85+
resolve: resolve!,
86+
})
87+
this.collectionConfigBuilder.liveQueryCollection!._sync.trackLoadPromise(
88+
promise
89+
)
90+
}
91+
}
92+
93+
// It can be that we are not yet subscribed when the first `loadSubset` call happens (i.e. the initial query).
94+
// So we also check the status here and if it's `loadingSubset` then we track the load promise
95+
if (subscription.status === `loadingSubset`) {
96+
trackLoadPromise()
97+
}
98+
7699
// Subscribe to subscription status changes to propagate loading state
77100
const statusUnsubscribe = subscription.on(`status:change`, (event) => {
78-
// TODO: For now we are setting this loading state whenever the subscription
79-
// status changes to 'loadingSubset'. But we have discussed it only happening
80-
// when the the live query has it's offset/limit changed, and that triggers the
81-
// subscription to request a snapshot. This will require more work to implement,
82-
// and builds on https://github.com/TanStack/db/pull/663 which this PR
83-
// does not yet depend on.
84101
if (event.status === `loadingSubset`) {
85-
// Guard against duplicate transitions
86-
if (!this.subscriptionLoadingPromises.has(subscription)) {
87-
let resolve: () => void
88-
const promise = new Promise<void>((res) => {
89-
resolve = res
90-
})
91-
92-
this.subscriptionLoadingPromises.set(subscription, {
93-
resolve: resolve!,
94-
})
95-
this.collectionConfigBuilder.liveQueryCollection!._sync.trackLoadPromise(
96-
promise
97-
)
98-
}
102+
trackLoadPromise()
99103
} else {
100104
// status is 'ready'
101105
const deferred = this.subscriptionLoadingPromises.get(subscription)
@@ -176,30 +180,14 @@ export class CollectionSubscriber<
176180
whereExpression: BasicExpression<boolean> | undefined,
177181
orderByInfo: OrderByOptimizationInfo
178182
) {
179-
const { orderBy, offset, limit, comparator, dataNeeded, index } =
180-
orderByInfo
183+
const { orderBy, offset, limit, index } = orderByInfo
181184

182185
const sendChangesInRange = (
183186
changes: Iterable<ChangeMessage<any, string | number>>
184187
) => {
185188
// Split live updates into a delete of the old value and an insert of the new value
186-
// and filter out changes that are bigger than the biggest value we've sent so far
187-
// because they can't affect the topK (and if later we need more data, we will dynamically load more data)
188189
const splittedChanges = splitUpdates(changes)
189-
let filteredChanges = splittedChanges
190-
if (dataNeeded && dataNeeded() === 0) {
191-
// If the topK is full [..., maxSentValue] then we do not need to send changes > maxSentValue
192-
// because they can never make it into the topK.
193-
// However, if the topK isn't full yet, we need to also send changes > maxSentValue
194-
// because they will make it into the topK
195-
filteredChanges = filterChangesSmallerOrEqualToMax(
196-
splittedChanges,
197-
comparator,
198-
this.biggest
199-
)
200-
}
201-
202-
this.sendChangesToPipelineWithTracking(filteredChanges, subscription)
190+
this.sendChangesToPipelineWithTracking(splittedChanges, subscription)
203191
}
204192

205193
// Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far
@@ -395,37 +383,3 @@ function* splitUpdates<
395383
}
396384
}
397385
}
398-
399-
function* filterChanges<
400-
T extends object = Record<string, unknown>,
401-
TKey extends string | number = string | number,
402-
>(
403-
changes: Iterable<ChangeMessage<T, TKey>>,
404-
f: (change: ChangeMessage<T, TKey>) => boolean
405-
): Generator<ChangeMessage<T, TKey>> {
406-
for (const change of changes) {
407-
if (f(change)) {
408-
yield change
409-
}
410-
}
411-
}
412-
413-
/**
414-
* Filters changes to only include those that are smaller or equal to the provided max value
415-
* @param changes - Iterable of changes to filter
416-
* @param comparator - Comparator function to use for filtering
417-
* @param maxValue - Range to filter changes within (range boundaries are exclusive)
418-
* @returns Iterable of changes that fall within the range
419-
*/
420-
function* filterChangesSmallerOrEqualToMax<
421-
T extends object = Record<string, unknown>,
422-
TKey extends string | number = string | number,
423-
>(
424-
changes: Iterable<ChangeMessage<T, TKey>>,
425-
comparator: (a: any, b: any) => number,
426-
maxValue: any
427-
): Generator<ChangeMessage<T, TKey>> {
428-
yield* filterChanges(changes, (change) => {
429-
return !maxValue || comparator(change.value, maxValue) <= 0
430-
})
431-
}

0 commit comments

Comments
 (0)