Skip to content

Commit 4fc6cce

Browse files
committed
backfill: handle transaction retry for vector index backfill
Previously, when backfilling a vector index, a transaction retry would cause backfill failure because the vector index backfill writer would overwrite the values read by the backfill reader with what it wanted to push down to KV so as to avoid new allocations. This worked well so long as there were no transaction retries but would fail to re-encode the index entry on a retry because the writer lost access to the unquantized vector. The quantized vector cannot be reused on a transaction retry because fixups may cause the target partition to change. This patch creates a scratch rowenc.IndexEntry in the backfill helper to store the input vector entry. Before attempting to write the entry, we copy the input IndexEntry to the scratch entry and use that to re-encode the vector, which is still modified in place to limit new allocations. Additionally, this patch switches the writer from using CPut() to CPutAllowingIfNotExists() so that if the backfill job restarts, we don't see the partially written index and fail due to duplicate keys. Informs: #143107 Release note: None
1 parent 65a4ba7 commit 4fc6cce

File tree

5 files changed

+159
-14
lines changed

5 files changed

+159
-14
lines changed

Diff for: pkg/sql/backfill/BUILD.bazel

+11
Original file line numberDiff line numberDiff line change
@@ -60,19 +60,30 @@ go_library(
6060
go_test(
6161
name = "backfill_test",
6262
srcs = [
63+
"backfill_test.go",
6364
"index_backfiller_cols_test.go",
6465
"mvcc_index_merger_test.go",
6566
],
6667
embed = [":backfill"],
6768
deps = [
69+
"//pkg/base",
6870
"//pkg/keys",
6971
"//pkg/kv",
7072
"//pkg/roachpb",
73+
"//pkg/security/securityassets",
74+
"//pkg/security/securitytest",
75+
"//pkg/server",
7176
"//pkg/sql/catalog",
7277
"//pkg/sql/catalog/catenumpb",
7378
"//pkg/sql/catalog/descpb",
7479
"//pkg/sql/catalog/tabledesc",
80+
"//pkg/sql/execinfra",
7581
"//pkg/sql/sem/catid",
82+
"//pkg/testutils/serverutils",
83+
"//pkg/testutils/sqlutils",
84+
"//pkg/util/leaktest",
85+
"//pkg/util/log",
86+
"//pkg/util/randutil",
7687
"@com_github_cockroachdb_errors//:errors",
7788
"@com_github_stretchr_testify//require",
7889
],

Diff for: pkg/sql/backfill/backfill.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -485,17 +485,19 @@ type VectorIndexHelper struct {
485485
// context of the provided transaction. This lookup then gives the leaf partition
486486
// where the index entry is to be inserted and the unquantized vector can then be
487487
// re-encoded to get the properly quantized vector with the new partition's
488-
// centroid.
488+
// centroid. The new key is then returned in the outputEntry. The inputEntry is
489+
// not overwritten in case the transaction has to be retried.
489490
func (vih *VectorIndexHelper) ReEncodeVector(
490-
ctx context.Context, txn *kv.Txn, keyBytes []byte, entry *rowenc.IndexEntry,
491+
ctx context.Context, txn *kv.Txn, inputEntry rowenc.IndexEntry, outputEntry *rowenc.IndexEntry,
491492
) (*rowenc.IndexEntry, error) {
493+
keyBytes := inputEntry.Key[len(vih.indexPrefix):]
492494
key, err := vecencoding.DecodeVectorKey(keyBytes, vih.numPrefixCols)
493495
if err != nil {
494496
return &rowenc.IndexEntry{}, err
495497
}
496498

497499
// Decode vector and suffix bytes from the entry value.
498-
valueBytes, err := entry.Value.GetBytes()
500+
valueBytes, err := inputEntry.Value.GetBytes()
499501
if err != nil {
500502
return &rowenc.IndexEntry{}, err
501503
}
@@ -517,16 +519,16 @@ func (vih *VectorIndexHelper) ReEncodeVector(
517519
panic("expected encoded vector to be of type DBytes")
518520
}
519521

520-
entry.Key = entry.Key[:0]
521-
entry.Key = append(entry.Key, vih.indexPrefix...)
522-
entry.Key = key.Encode(entry.Key)
522+
outputEntry.Key = append(outputEntry.Key[:0], vih.indexPrefix...)
523+
outputEntry.Key = key.Encode(outputEntry.Key)
524+
523525
entryValueLen := vecencoding.EncodedVectorValueLen(quantizedVector.UnsafeBytes(), suffix)
524-
buf := entry.Value.AllocBytes(entryValueLen)[:0]
526+
buf := outputEntry.Value.AllocBytes(entryValueLen)[:0]
525527
vecencoding.EncodeVectorValue(buf, quantizedVector.UnsafeBytes(), suffix)
526528

527-
// entry.Family is left unchanged from the entry we received, originally encoded by rowenc.EncodeSecondaryIndexes()
529+
outputEntry.Family = inputEntry.Family
528530

529-
return entry, nil
531+
return outputEntry, nil
530532
}
531533

532534
// IndexBackfiller is capable of backfilling all the added index.

Diff for: pkg/sql/backfill/backfill_test.go

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package backfill_test
2+
3+
import (
4+
"context"
5+
"os"
6+
"testing"
7+
8+
"github.com/cockroachdb/cockroach/pkg/base"
9+
"github.com/cockroachdb/cockroach/pkg/security/securityassets"
10+
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
11+
"github.com/cockroachdb/cockroach/pkg/server"
12+
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
13+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
14+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
15+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
17+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
18+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
19+
"github.com/stretchr/testify/require"
20+
)
21+
22+
func TestMain(m *testing.M) {
23+
securityassets.SetLoader(securitytest.EmbeddedAssets)
24+
randutil.SeedForTests()
25+
serverutils.InitTestServerFactory(server.TestServerFactory)
26+
27+
os.Exit(m.Run())
28+
}
29+
30+
// testRetryError implements pgerror.ClientVisibleRetryError
31+
type testRetryError struct{}
32+
33+
func (e *testRetryError) Error() string {
34+
return "test retry error"
35+
}
36+
37+
func (e *testRetryError) ClientVisibleRetryError() {}
38+
39+
func TestVectorColumnAndIndexBackfill(t *testing.T) {
40+
defer leaktest.AfterTest(t)()
41+
defer log.Scope(t).Close(t)
42+
43+
// Track whether we've injected an error
44+
var errorState struct {
45+
mu syncutil.Mutex
46+
hasErrored bool
47+
}
48+
49+
ctx := context.Background()
50+
srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{
51+
Knobs: base.TestingKnobs{
52+
DistSQL: &execinfra.TestingKnobs{
53+
// Inject a retriable error on the first call to the vector index backfiller.
54+
VectorIndexBackfillTxnError: func() error {
55+
errorState.mu.Lock()
56+
defer errorState.mu.Unlock()
57+
if !errorState.hasErrored {
58+
errorState.hasErrored = true
59+
return &testRetryError{}
60+
}
61+
return nil
62+
},
63+
},
64+
},
65+
})
66+
defer srv.Stopper().Stop(ctx)
67+
sqlDB := sqlutils.MakeSQLRunner(db)
68+
69+
// Create a table with a vector column
70+
sqlDB.Exec(t, `
71+
CREATE TABLE vectors (
72+
id INT PRIMARY KEY,
73+
vec VECTOR(3)
74+
)
75+
`)
76+
77+
// Insert 200 rows with random vector data
78+
sqlDB.Exec(t, `
79+
INSERT INTO vectors (id, vec)
80+
SELECT
81+
generate_series(1, 200) as id,
82+
ARRAY[random(), random(), random()]::vector(3) as vec
83+
`)
84+
85+
// Create a vector index on the vector column
86+
sqlDB.Exec(t, `
87+
CREATE VECTOR INDEX vec_idx ON vectors (vec)
88+
`)
89+
90+
// Test vector similarity search and see that the backfiller got at
91+
// least some of the vectors in there.
92+
var matchCount int
93+
sqlDB.QueryRow(t, `
94+
SELECT count(*)
95+
FROM (
96+
SELECT id
97+
FROM vectors@vec_idx
98+
ORDER BY vec <-> ARRAY[0.5, 0.5, 0.5]::vector(3)
99+
LIMIT 200
100+
)
101+
`).Scan(&matchCount)
102+
// There's some non-determinism here where we may not find all 200 vectors.
103+
// I chose 190 as a low water mark to prevent test flakes, but it should really
104+
// be 200 in most cases.
105+
require.Greater(t, matchCount, 190, "Expected to find at least 190 similar vectors")
106+
}

Diff for: pkg/sql/execinfra/server_config.go

+4
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,10 @@ type TestingKnobs struct {
259259
// indexes.
260260
IndexBackfillProgressReportInterval time.Duration
261261

262+
// VectorIndexBackfillTxnError is called during vector index entry backfill to
263+
// simulate a transaction error.
264+
VectorIndexBackfillTxnError func() error
265+
262266
// ForceDiskSpill forces any processors/operators that can fall back to disk
263267
// to fall back to disk immediately.
264268
//

Diff for: pkg/sql/rowexec/indexbackfiller.go

+27-5
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,9 @@ func (ib *indexBackfiller) constructIndexEntries(
182182
}
183183

184184
func (ib *indexBackfiller) maybeReencodeVectorIndexEntry(
185-
ctx context.Context, indexEntry *rowenc.IndexEntry,
185+
ctx context.Context, tmpEntry *rowenc.IndexEntry, indexEntry *rowenc.IndexEntry,
186186
) (bool, error) {
187-
indexID, keyBytes, err := rowenc.DecodeIndexKeyPrefix(ib.flowCtx.EvalCtx.Codec, ib.desc.GetID(), indexEntry.Key)
187+
indexID, _, err := rowenc.DecodeIndexKeyPrefix(ib.flowCtx.EvalCtx.Codec, ib.desc.GetID(), indexEntry.Key)
188188
if err != nil {
189189
return false, err
190190
}
@@ -194,14 +194,35 @@ func (ib *indexBackfiller) maybeReencodeVectorIndexEntry(
194194
return false, nil
195195
}
196196

197+
// Initialize the tmpEntry. This will store the input entry that we are encoding
198+
// so that we can overwrite the indexEntry we were given with the output
199+
// encoding. This allows us to preserve the initial template indexEntry across
200+
// transaction retries.
201+
if tmpEntry.Key == nil {
202+
tmpEntry.Key = make(roachpb.Key, len(indexEntry.Key))
203+
tmpEntry.Value.RawBytes = make([]byte, len(indexEntry.Value.RawBytes))
204+
}
205+
tmpEntry.Key = append(tmpEntry.Key[:0], indexEntry.Key...)
206+
tmpEntry.Value.RawBytes = append(tmpEntry.Value.RawBytes[:0], indexEntry.Value.RawBytes...)
207+
tmpEntry.Family = indexEntry.Family
208+
197209
err = ib.flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
198-
entry, err := vih.ReEncodeVector(ctx, txn.KV(), keyBytes, indexEntry)
210+
entry, err := vih.ReEncodeVector(ctx, txn.KV(), *tmpEntry, indexEntry)
199211
if err != nil {
200212
return err
201213
}
202214

215+
if ib.flowCtx.Cfg.TestingKnobs.VectorIndexBackfillTxnError != nil {
216+
err = ib.flowCtx.Cfg.TestingKnobs.VectorIndexBackfillTxnError()
217+
if err != nil {
218+
return err
219+
}
220+
}
221+
203222
b := txn.KV().NewBatch()
204-
b.CPut(entry.Key, &entry.Value, nil)
223+
// If the backfill job was interrupted and restarted, we may redo work, so allow
224+
// that we may see duplicate keys here.
225+
b.CPutAllowingIfNotExists(entry.Key, &entry.Value, entry.Value.TagAndDataBytes())
205226
return txn.KV().Run(ctx, b)
206227
})
207228
if err != nil {
@@ -289,6 +310,7 @@ func (ib *indexBackfiller) ingestIndexEntries(
289310
g.GoCtx(func(ctx context.Context) error {
290311
defer close(stopProgress)
291312

313+
var vectorInputEntry rowenc.IndexEntry
292314
for indexBatch := range indexEntryCh {
293315
for _, indexEntry := range indexBatch.indexEntries {
294316
// If there is at least one vector index being written, we need to check to see
@@ -298,7 +320,7 @@ func (ib *indexBackfiller) ingestIndexEntries(
298320
// TODO(mw5h): batch up multiple index entries into a single batch.
299321
// As is, we insert a single vector per batch, which is very slow.
300322
if len(ib.VectorIndexes) > 0 {
301-
isVectorIndex, err := ib.maybeReencodeVectorIndexEntry(ctx, &indexEntry)
323+
isVectorIndex, err := ib.maybeReencodeVectorIndexEntry(ctx, &vectorInputEntry, &indexEntry)
302324
if err != nil {
303325
return err
304326
} else if isVectorIndex {

0 commit comments

Comments
 (0)