Skip to content

Commit e9f2403

Browse files
committed
crosscluster/logical: enable sql writer to run on tables with partial indexes
Epic: none Release note (ops change): LDR can now replicate on tables with partial indexes when using the default on sql writer.
1 parent d3c9cb4 commit e9f2403

File tree

2 files changed

+41
-11
lines changed

2 files changed

+41
-11
lines changed

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2391,6 +2391,40 @@ func TestMismatchColIDs(t *testing.T) {
23912391
jobutils.WaitForJobToPause(t, sqlA, jobID)
23922392
}
23932393

2394+
func TestPartialIndexes(t *testing.T) {
2395+
defer leaktest.AfterTest(t)()
2396+
skip.UnderDeadlock(t)
2397+
defer log.Scope(t).Close(t)
2398+
2399+
ctx := context.Background()
2400+
2401+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
2402+
defer server.Stopper().Stop(ctx)
2403+
2404+
dbA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'sql'")
2405+
2406+
dbBURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"))
2407+
2408+
dbA.Exec(t, "CREATE TABLE foo (pk INT PRIMARY KEY, pi INT, payload STRING)")
2409+
dbA.Exec(t, "CREATE INDEX idx ON foo (pi) WHERE pk > 0")
2410+
dbB.Exec(t, "CREATE TABLE b.foo (pk INT PRIMARY KEY, pi INT, payload STRING)")
2411+
dbB.Exec(t, "CREATE INDEX idx ON b.foo (pi) WHERE pk < 0")
2412+
dbB.Exec(t, "INSERT INTO foo VALUES (1, 2, 'hello')")
2413+
dbB.Exec(t, "INSERT INTO foo VALUES (-1, -2, 'world')")
2414+
2415+
var jobAID jobspb.JobID
2416+
dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo", dbBURL.String()).Scan(&jobAID)
2417+
now := s.Clock().Now()
2418+
WaitUntilReplicatedTime(t, now, dbA, jobAID)
2419+
2420+
dbA.CheckQueryResultsRetry(t, "SELECT * FROM foo WHERE pi = 2", [][]string{{"1", "2", "hello"}})
2421+
dbA.CheckQueryResultsRetry(t, "SELECT * FROM foo WHERE pi = -2", [][]string{{"-1", "-2", "world"}})
2422+
2423+
// Check for partial indexes when using legacy kv writer.
2424+
dbA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'legacy-kv'")
2425+
dbA.ExpectErr(t, "cannot create logical replication stream: table foo has a partial index idx", "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo", dbBURL.String())
2426+
}
2427+
23942428
// TestLogicalReplicationCreationChecks verifies that we check that the table
23952429
// schemas are compatible when creating the replication stream.
23962430
func TestLogicalReplicationCreationChecks(t *testing.T) {
@@ -2442,15 +2476,9 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
24422476
dbA.Exec(t, "ALTER TABLE tab ALTER PRIMARY KEY USING COLUMNS (pk, composite_col)")
24432477
expectErr(t, "tab", `cannot create logical replication stream: table tab has a primary key column \(composite_col\) with composite encoding`)
24442478
replicationtestutils.WaitForAllProducerJobsToFail(t, dbB)
2445-
2446-
// Check for partial indexes.
24472479
dbA.Exec(t, "ALTER TABLE tab ALTER PRIMARY KEY USING COLUMNS (pk)")
2448-
dbA.Exec(t, "CREATE INDEX partial_idx ON tab(composite_col) WHERE pk > 0")
2449-
expectErr(t, "tab", "cannot create logical replication stream: table tab has a partial index partial_idx")
2450-
replicationtestutils.WaitForAllProducerJobsToFail(t, dbB)
24512480

24522481
// Check for hash sharded indexes.
2453-
dbA.Exec(t, "DROP INDEX partial_idx")
24542482
dbA.Exec(t, "CREATE INDEX hash_idx ON tab(pk) USING HASH WITH (bucket_count = 4)")
24552483
dbB.Exec(t, "CREATE INDEX hash_idx ON b.tab(pk) USING HASH WITH (bucket_count = 4)")
24562484
expectErr(t, "tab", "tab has a virtual computed column crdb_internal_pk_shard_4 that is a key of index hash_idx")

pkg/sql/catalog/tabledesc/logical_replication_helpers.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func CheckLogicalReplicationCompatibility(
3939
return pgerror.Wrapf(err, pgcode.InvalidTableDefinition, cannotLDRMsg)
4040
}
4141

42-
if err := checkExpressionEvaluation(dst); err != nil {
42+
if err := checkExpressionEvaluation(dst, requireKvWriterCompatible); err != nil {
4343
return pgerror.Wrapf(err, pgcode.InvalidTableDefinition, cannotLDRMsg)
4444
}
4545
if err := checkUniqueWithoutIndex(dst); err != nil {
@@ -87,11 +87,13 @@ func checkOutboundReferences(dst *descpb.TableDescriptor) error {
8787
// expressions. The writer expects to receive the full set of columns, even the
8888
// computed ones, along with a list of columns that we've already determined
8989
// should be updated.
90-
func checkExpressionEvaluation(dst *descpb.TableDescriptor) error {
90+
func checkExpressionEvaluation(dst *descpb.TableDescriptor, requireKVCompatible bool) error {
9191
// Disallow partial indexes.
92-
for _, idx := range dst.Indexes {
93-
if idx.IsPartial() {
94-
return errors.Newf("table %s has a partial index %s", dst.Name, idx.Name)
92+
if requireKVCompatible {
93+
for _, idx := range dst.Indexes {
94+
if idx.IsPartial() {
95+
return errors.Newf("table %s has a partial index %s", dst.Name, idx.Name)
96+
}
9597
}
9698
}
9799

0 commit comments

Comments
 (0)