Skip to content

Commit 647a27c

Browse files
craig[bot]msbutler
andcommitted
Merge #144415
144415: crosscluster/logical: enable sql writer to run on tables with partial indexes r=jeffswenson a=msbutler Epic: none Release note (ops change): LDR can now replicate on tables with partial indexes when using the default on sql writer. Co-authored-by: Michael Butler <[email protected]>
2 parents ad84a5e + e9f2403 commit 647a27c

File tree

2 files changed

+53
-124
lines changed

2 files changed

+53
-124
lines changed

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 46 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -519,16 +519,8 @@ func TestLogicalStreamIngestionAdvancePTS(t *testing.T) {
519519
defer log.Scope(t).Close(t)
520520

521521
ctx := context.Background()
522-
clusterArgs := base.TestClusterArgs{
523-
ServerArgs: base.TestServerArgs{
524-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
525-
Knobs: base.TestingKnobs{
526-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
527-
},
528-
},
529-
}
530522

531-
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, clusterArgs, 1)
523+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
532524
defer server.Stopper().Stop(ctx)
533525

534526
dbA.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
@@ -779,16 +771,8 @@ func TestFilterRangefeedInReplicationStream(t *testing.T) {
779771
skip.UnderRace(t, "multi cluster/node config exhausts hardware")
780772

781773
ctx := context.Background()
782-
clusterArgs := base.TestClusterArgs{
783-
ServerArgs: base.TestServerArgs{
784-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
785-
Knobs: base.TestingKnobs{
786-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
787-
},
788-
},
789-
}
790774

791-
server, s, dbs, _ := setupServerWithNumDBs(t, ctx, clusterArgs, 1, 3)
775+
server, s, dbs, _ := setupServerWithNumDBs(t, ctx, testClusterBaseClusterArgs, 1, 3)
792776
defer server.Stopper().Stop(ctx)
793777

794778
dbA, dbB, dbC := dbs[0], dbs[1], dbs[2]
@@ -1154,18 +1138,9 @@ func TestLogicalJobResiliency(t *testing.T) {
11541138

11551139
skip.WithIssue(t, 131184)
11561140

1157-
clusterArgs := base.TestClusterArgs{
1158-
ServerArgs: base.TestServerArgs{
1159-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
1160-
Knobs: base.TestingKnobs{
1161-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
1162-
},
1163-
},
1164-
}
1165-
11661141
ctx := context.Background()
11671142

1168-
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, clusterArgs, 3)
1143+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 3)
11691144
defer server.Stopper().Stop(ctx)
11701145

11711146
dbBURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"))
@@ -1252,17 +1227,7 @@ func TestMultipleSourcesIntoSingleDest(t *testing.T) {
12521227

12531228
ctx := context.Background()
12541229

1255-
clusterArgs := base.TestClusterArgs{
1256-
ServerArgs: base.TestServerArgs{
1257-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
1258-
Knobs: base.TestingKnobs{
1259-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
1260-
DistSQL: &execinfra.TestingKnobs{},
1261-
},
1262-
},
1263-
}
1264-
1265-
server, s, runners, dbNames := setupServerWithNumDBs(t, ctx, clusterArgs, 1, 3)
1230+
server, s, runners, dbNames := setupServerWithNumDBs(t, ctx, testClusterBaseClusterArgs, 1, 3)
12661231
defer server.Stopper().Stop(ctx)
12671232

12681233
PGURLs := GetPGURLs(t, s, dbNames)
@@ -1315,15 +1280,6 @@ func TestThreeWayReplication(t *testing.T) {
13151280

13161281
ctx := context.Background()
13171282

1318-
clusterArgs := base.TestClusterArgs{
1319-
ServerArgs: base.TestServerArgs{
1320-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
1321-
Knobs: base.TestingKnobs{
1322-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
1323-
},
1324-
},
1325-
}
1326-
13271283
verifyExpectedRowAllServers := func(
13281284
t *testing.T, runners []*sqlutils.SQLRunner, expectedRows [][]string, dbNames []string,
13291285
) {
@@ -1349,7 +1305,7 @@ func TestThreeWayReplication(t *testing.T) {
13491305
}
13501306

13511307
numDBs := 3
1352-
server, s, runners, dbNames := setupServerWithNumDBs(t, ctx, clusterArgs, 1, numDBs)
1308+
server, s, runners, dbNames := setupServerWithNumDBs(t, ctx, testClusterBaseClusterArgs, 1, numDBs)
13531309
defer server.Stopper().Stop(ctx)
13541310

13551311
PGURLs := GetPGURLs(t, s, dbNames)
@@ -1415,17 +1371,8 @@ func TestTombstoneUpdate(t *testing.T) {
14151371

14161372
ctx := context.Background()
14171373

1418-
clusterArgs := base.TestClusterArgs{
1419-
ServerArgs: base.TestServerArgs{
1420-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
1421-
Knobs: base.TestingKnobs{
1422-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
1423-
},
1424-
},
1425-
}
1426-
14271374
numDBs := 3
1428-
server, s, runners, dbNames := setupServerWithNumDBs(t, ctx, clusterArgs, 1, numDBs)
1375+
server, s, runners, dbNames := setupServerWithNumDBs(t, ctx, testClusterBaseClusterArgs, 1, numDBs)
14291376
defer server.Stopper().Stop(ctx)
14301377
PGURLs := GetPGURLs(t, s, dbNames)
14311378

@@ -1477,16 +1424,7 @@ func TestForeignKeyConstraints(t *testing.T) {
14771424

14781425
ctx := context.Background()
14791426

1480-
clusterArgs := base.TestClusterArgs{
1481-
ServerArgs: base.TestServerArgs{
1482-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
1483-
Knobs: base.TestingKnobs{
1484-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
1485-
},
1486-
},
1487-
}
1488-
1489-
server, s, dbA, _ := setupLogicalTestServer(t, ctx, clusterArgs, 1)
1427+
server, s, dbA, _ := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
14901428
defer server.Stopper().Stop(ctx)
14911429

14921430
dbBURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"))
@@ -1782,14 +1720,7 @@ func TestLogicalStreamIngestionJobWithFallbackUDF(t *testing.T) {
17821720
skip.WithIssue(t, 129569, "flakey test")
17831721

17841722
ctx := context.Background()
1785-
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, base.TestClusterArgs{
1786-
ServerArgs: base.TestServerArgs{
1787-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
1788-
Knobs: base.TestingKnobs{
1789-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
1790-
},
1791-
},
1792-
}, 1)
1723+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
17931724
defer server.Stopper().Stop(ctx)
17941725

17951726
lwwFunc := `CREATE OR REPLACE FUNCTION repl_apply(action STRING, proposed tab, existing tab, prev tab, existing_mvcc_timestamp DECIMAL, existing_origin_timestamp DECIMAL, proposed_mvcc_timestamp DECIMAL)
@@ -2072,17 +2003,9 @@ func TestUserPrivileges(t *testing.T) {
20722003
defer log.Scope(t).Close(t)
20732004

20742005
ctx := context.Background()
2075-
clusterArgs := base.TestClusterArgs{
2076-
ServerArgs: base.TestServerArgs{
2077-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
2078-
Knobs: base.TestingKnobs{
2079-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
2080-
},
2081-
},
2082-
}
20832006
rng, _ := randutil.NewPseudoRand()
20842007

2085-
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, clusterArgs, 1)
2008+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
20862009
defer server.Stopper().Stop(ctx)
20872010

20882011
dbBURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"))
@@ -2224,16 +2147,8 @@ func TestLogicalReplicationSchemaChanges(t *testing.T) {
22242147
defer log.Scope(t).Close(t)
22252148

22262149
ctx := context.Background()
2227-
clusterArgs := base.TestClusterArgs{
2228-
ServerArgs: base.TestServerArgs{
2229-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
2230-
Knobs: base.TestingKnobs{
2231-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
2232-
},
2233-
},
2234-
}
22352150

2236-
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, clusterArgs, 1)
2151+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
22372152
defer server.Stopper().Stop(ctx)
22382153

22392154
// Add some stuff to tables in prep for schema change testing.
@@ -2312,16 +2227,8 @@ func TestUserDefinedTypes(t *testing.T) {
23122227
skip.UnderDuress(t, "this needs to be multi-node but that tends to be too slow for duressed builds")
23132228

23142229
ctx := context.Background()
2315-
clusterArgs := base.TestClusterArgs{
2316-
ServerArgs: base.TestServerArgs{
2317-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
2318-
Knobs: base.TestingKnobs{
2319-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
2320-
},
2321-
},
2322-
}
23232230

2324-
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, clusterArgs, 3)
2231+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 3)
23252232
defer server.Stopper().Stop(ctx)
23262233

23272234
dbBURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"))
@@ -2484,6 +2391,40 @@ func TestMismatchColIDs(t *testing.T) {
24842391
jobutils.WaitForJobToPause(t, sqlA, jobID)
24852392
}
24862393

2394+
func TestPartialIndexes(t *testing.T) {
2395+
defer leaktest.AfterTest(t)()
2396+
skip.UnderDeadlock(t)
2397+
defer log.Scope(t).Close(t)
2398+
2399+
ctx := context.Background()
2400+
2401+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
2402+
defer server.Stopper().Stop(ctx)
2403+
2404+
dbA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'sql'")
2405+
2406+
dbBURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"))
2407+
2408+
dbA.Exec(t, "CREATE TABLE foo (pk INT PRIMARY KEY, pi INT, payload STRING)")
2409+
dbA.Exec(t, "CREATE INDEX idx ON foo (pi) WHERE pk > 0")
2410+
dbB.Exec(t, "CREATE TABLE b.foo (pk INT PRIMARY KEY, pi INT, payload STRING)")
2411+
dbB.Exec(t, "CREATE INDEX idx ON b.foo (pi) WHERE pk < 0")
2412+
dbB.Exec(t, "INSERT INTO foo VALUES (1, 2, 'hello')")
2413+
dbB.Exec(t, "INSERT INTO foo VALUES (-1, -2, 'world')")
2414+
2415+
var jobAID jobspb.JobID
2416+
dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo", dbBURL.String()).Scan(&jobAID)
2417+
now := s.Clock().Now()
2418+
WaitUntilReplicatedTime(t, now, dbA, jobAID)
2419+
2420+
dbA.CheckQueryResultsRetry(t, "SELECT * FROM foo WHERE pi = 2", [][]string{{"1", "2", "hello"}})
2421+
dbA.CheckQueryResultsRetry(t, "SELECT * FROM foo WHERE pi = -2", [][]string{{"-1", "-2", "world"}})
2422+
2423+
// Check for partial indexes when using legacy kv writer.
2424+
dbA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'legacy-kv'")
2425+
dbA.ExpectErr(t, "cannot create logical replication stream: table foo has a partial index idx", "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo", dbBURL.String())
2426+
}
2427+
24872428
// TestLogicalReplicationCreationChecks verifies that we check that the table
24882429
// schemas are compatible when creating the replication stream.
24892430
func TestLogicalReplicationCreationChecks(t *testing.T) {
@@ -2492,16 +2433,8 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
24922433
defer log.Scope(t).Close(t)
24932434

24942435
ctx := context.Background()
2495-
clusterArgs := base.TestClusterArgs{
2496-
ServerArgs: base.TestServerArgs{
2497-
DefaultTestTenant: base.TestControlsTenantsExplicitly,
2498-
Knobs: base.TestingKnobs{
2499-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
2500-
},
2501-
},
2502-
}
25032436

2504-
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, clusterArgs, 1)
2437+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
25052438
defer server.Stopper().Stop(ctx)
25062439

25072440
dbBURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"))
@@ -2543,15 +2476,9 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
25432476
dbA.Exec(t, "ALTER TABLE tab ALTER PRIMARY KEY USING COLUMNS (pk, composite_col)")
25442477
expectErr(t, "tab", `cannot create logical replication stream: table tab has a primary key column \(composite_col\) with composite encoding`)
25452478
replicationtestutils.WaitForAllProducerJobsToFail(t, dbB)
2546-
2547-
// Check for partial indexes.
25482479
dbA.Exec(t, "ALTER TABLE tab ALTER PRIMARY KEY USING COLUMNS (pk)")
2549-
dbA.Exec(t, "CREATE INDEX partial_idx ON tab(composite_col) WHERE pk > 0")
2550-
expectErr(t, "tab", "cannot create logical replication stream: table tab has a partial index partial_idx")
2551-
replicationtestutils.WaitForAllProducerJobsToFail(t, dbB)
25522480

25532481
// Check for hash sharded indexes.
2554-
dbA.Exec(t, "DROP INDEX partial_idx")
25552482
dbA.Exec(t, "CREATE INDEX hash_idx ON tab(pk) USING HASH WITH (bucket_count = 4)")
25562483
dbB.Exec(t, "CREATE INDEX hash_idx ON b.tab(pk) USING HASH WITH (bucket_count = 4)")
25572484
expectErr(t, "tab", "tab has a virtual computed column crdb_internal_pk_shard_4 that is a key of index hash_idx")

pkg/sql/catalog/tabledesc/logical_replication_helpers.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func CheckLogicalReplicationCompatibility(
3939
return pgerror.Wrapf(err, pgcode.InvalidTableDefinition, cannotLDRMsg)
4040
}
4141

42-
if err := checkExpressionEvaluation(dst); err != nil {
42+
if err := checkExpressionEvaluation(dst, requireKvWriterCompatible); err != nil {
4343
return pgerror.Wrapf(err, pgcode.InvalidTableDefinition, cannotLDRMsg)
4444
}
4545
if err := checkUniqueWithoutIndex(dst); err != nil {
@@ -87,11 +87,13 @@ func checkOutboundReferences(dst *descpb.TableDescriptor) error {
8787
// expressions. The writer expects to receive the full set of columns, even the
8888
// computed ones, along with a list of columns that we've already determined
8989
// should be updated.
90-
func checkExpressionEvaluation(dst *descpb.TableDescriptor) error {
90+
func checkExpressionEvaluation(dst *descpb.TableDescriptor, requireKVCompatible bool) error {
9191
// Disallow partial indexes.
92-
for _, idx := range dst.Indexes {
93-
if idx.IsPartial() {
94-
return errors.Newf("table %s has a partial index %s", dst.Name, idx.Name)
92+
if requireKVCompatible {
93+
for _, idx := range dst.Indexes {
94+
if idx.IsPartial() {
95+
return errors.Newf("table %s has a partial index %s", dst.Name, idx.Name)
96+
}
9597
}
9698
}
9799

0 commit comments

Comments
 (0)