Skip to content

Commit 0f55ef2

Browse files
authored
Followers support for secondary indexes (#17965)
1 parent 7a070ce commit 0f55ef2

File tree

16 files changed

+439
-138
lines changed

16 files changed

+439
-138
lines changed

ydb/core/kqp/provider/yql_kikimr_exec.cpp

+29-12
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,28 @@ namespace {
699699
return true;
700700
}
701701

702+
// Converts the string literal carried by `setting` into `readReplicasSettings`.
//
// Returns true when ConvertReadReplicasSettingsToProto succeeds. On failure an
// issue is added to `ctx`, positioned at the literal and carrying the status
// code and text reported by the converter, and false is returned.
[[nodiscard]] bool ParseReadReplicasSettings(
    Ydb::Table::ReadReplicasSettings& readReplicasSettings,
    const TCoNameValueTuple& setting,
    TExprContext& ctx
) {
    // Resolve the literal atom once: it supplies both the text to parse and
    // the source position used for error reporting.
    const auto literalAtom = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>();
    const TString rawSettings(literalAtom.Value());

    Ydb::StatusIds::StatusCode status;
    TString message;
    if (ConvertReadReplicasSettingsToProto(rawSettings, readReplicasSettings, status, message)) {
        return true;
    }

    ctx.AddError(YqlIssue(
        ctx.GetPosition(literalAtom.Pos()),
        NYql::YqlStatusFromYdbStatus(status),
        message));
    return false;
}
723+
702724
bool ParseAsyncReplicationSettingsBase(
703725
TReplicationSettingsBase& dstSettings, const TCoNameValueTupleList& srcSettings, TExprContext& ctx, TPositionHandle pos,
704726
const TString& objectName = "replication"
@@ -1783,17 +1805,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
17831805
}
17841806

17851807
} else if (name == "readReplicasSettings") {
1786-
const auto replicasSettings = TString(
1787-
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
1788-
);
1789-
Ydb::StatusIds::StatusCode code;
1790-
TString errText;
1791-
if (!ConvertReadReplicasSettingsToProto(replicasSettings,
1792-
*alterTableRequest.mutable_set_read_replicas_settings(), code, errText)) {
1793-
1794-
ctx.AddError(YqlIssue(ctx.GetPosition(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Pos()),
1795-
NYql::YqlStatusFromYdbStatus(code),
1796-
errText));
1808+
if (!ParseReadReplicasSettings(*alterTableRequest.mutable_set_read_replicas_settings(), setting, ctx)) {
17971809
return SyncError();
17981810
}
17991811
} else if (name == "setTtlSettings") {
@@ -1944,12 +1956,17 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
19441956
} else if (settingName == "tableSettings") {
19451957
auto tableSettings = indexSetting.Value().Cast<TCoNameValueTupleList>();
19461958
for (const auto& tableSetting : tableSettings) {
1947-
if (IsPartitioningSetting(tableSetting.Name().Value())) {
1959+
const auto name = tableSetting.Name().Value();
1960+
if (IsPartitioningSetting(name)) {
19481961
if (!ParsePartitioningSettings(
19491962
*alterTableRequest.mutable_alter_partitioning_settings(), tableSetting, ctx
19501963
)) {
19511964
return SyncError();
19521965
}
1966+
} else if (name == "readReplicasSettings") {
1967+
if (!ParseReadReplicasSettings(*alterTableRequest.mutable_set_read_replicas_settings(), tableSetting, ctx)) {
1968+
return SyncError();
1969+
}
19531970
} else {
19541971
ctx.AddError(
19551972
TIssue(ctx.GetPosition(tableSetting.Name().Pos()),

ydb/core/kqp/ut/common/kqp_ut_common.cpp

+30
Original file line numberDiff line numberDiff line change
@@ -1514,6 +1514,36 @@ void WaitForZeroReadIterators(Tests::TServer& server, const TString& path) {
15141514
UNIT_ASSERT_C(iterators == 0, "Unable to wait for proper read iterator count, it looks like cancelation doesn`t work (" << iterators << ")");
15151515
}
15161516

1517+
// Sums the cumulative app counter named `counterName` over every shard of the
// table at `path`.
//
// Each shard is sent a TEvTablet::TEvGetCounters request through a pipe and the
// reply is awaited before moving on to the next shard. Fails the current test
// if the table has no shards or a reply cannot be grabbed.
int GetCumulativeCounterValue(Tests::TServer& server, const TString& path, const TString& counterName) {
    TTestActorRuntime* const testRuntime = server.GetRuntime();
    auto edgeActor = testRuntime->AllocateEdgeActor();
    auto shards = GetTableShards(&server, edgeActor, path);
    UNIT_ASSERT_C(shards.size() > 0, "Table: " << path << " has no shards");

    int total = 0;
    for (auto shardId : shards) {
        testRuntime->SendToPipe(shardId, edgeActor, new TEvTablet::TEvGetCounters, 0, GetPipeConfigWithRetries());

        auto ev = testRuntime->GrabEdgeEvent<TEvTablet::TEvGetCountersResponse>(edgeActor);
        UNIT_ASSERT(ev);

        const NKikimrTabletBase::TEvGetCountersResponse& record = ev->Get()->Record;
        const auto& cumulativeCounters = record.GetTabletCounters().GetAppCounters().GetCumulativeCounters();
        for (const auto& entry : cumulativeCounters) {
            if (entry.GetName() != counterName) {
                continue;
            }
            total += entry.GetValue();
        }
    }

    return total;
}
1546+
15171547
TTableId ResolveTableId(Tests::TServer* server, TActorId sender, const TString& path) {
15181548
auto response = Navigate(*server->GetRuntime(), sender, path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
15191549
return response->ResultSet.at(0).TableId;

ydb/core/kqp/ut/common/kqp_ut_common.h

+1
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ TVector<ui64> GetColumnTableShards(Tests::TServer* server, TActorId sender, cons
391391

392392
void WaitForZeroSessions(const NKqp::TKqpCounters& counters);
393393
void WaitForZeroReadIterators(Tests::TServer& server, const TString& path);
394+
// Returns the sum, over all shards of the table at `path`, of the cumulative
// app counter whose name equals `counterName`.
int GetCumulativeCounterValue(Tests::TServer& server, const TString& path, const TString& counterName);
394395

395396
void WaitForCompaction(Tests::TServer* server, const TString& path, bool compactBorrowed = false);
396397

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

+80
Original file line numberDiff line numberDiff line change
@@ -2372,6 +2372,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
23722372
SELECT * FROM FollowersKv WHERE Key = 21;
23732373
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
23742374
AssertSuccessResult(result);
2375+
UNIT_ASSERT_UNEQUAL(0, GetCumulativeCounterValue(
2376+
kikimr.GetTestServer(),
2377+
"/Root/FollowersKv",
2378+
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2379+
));
23752380

23762381
CompareYson(R"(
23772382
[
@@ -2385,6 +2390,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
23852390
SELECT * FROM FollowersKv WHERE Value != "One" ORDER BY Key;
23862391
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
23872392
AssertSuccessResult(result);
2393+
UNIT_ASSERT_UNEQUAL(0, GetCumulativeCounterValue(
2394+
kikimr.GetTestServer(),
2395+
"/Root/FollowersKv",
2396+
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2397+
));
23882398

23892399
CompareYson(R"(
23902400
[
@@ -2400,6 +2410,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
24002410
SELECT * FROM TwoShard WHERE Key = 2;
24012411
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
24022412
AssertSuccessResult(result);
2413+
UNIT_ASSERT_EQUAL(0, GetCumulativeCounterValue(
2414+
kikimr.GetTestServer(),
2415+
"/Root/TwoShard",
2416+
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2417+
));
24032418

24042419
CompareYson(R"(
24052420
[
@@ -2420,6 +2435,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
24202435
[[4000000001u];["BigOne"];[-1]]
24212436
]
24222437
)", FormatResultSetYson(result.GetResultSet(0)));
2438+
UNIT_ASSERT_EQUAL(0, GetCumulativeCounterValue(
2439+
kikimr.GetTestServer(),
2440+
"/Root/TwoShard",
2441+
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2442+
));
24232443
}
24242444

24252445
Y_UNIT_TEST(StaleRO_Immediate) {
@@ -2444,6 +2464,66 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
24442464
)", FormatResultSetYson(result.GetResultSet(0)));
24452465
}
24462466

2467+
// Twin test (EnableFollowers = true/false): performs a StaleRO read through a
// secondary index view and checks both the returned rows and the value of the
// "DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime" cumulative counter on
// the index implementation table.
Y_UNIT_TEST_TWIN(StaleRO_IndexFollowers, EnableFollowers) {
2468+
auto kikimr = DefaultKikimrRunner();
2469+
auto db = kikimr.GetTableClient();
2470+
auto session = db.CreateSession().GetValueSync().GetSession();
2471+
2472+
// Create the table and add a covering global sync index `idx` on (Key, Order).
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
2473+
--!syntax_v1
2474+
CREATE TABLE `KeySubkey` (
2475+
Key Uint64,
2476+
Subkey Uint64,
2477+
Value String,
2478+
Order Uint32,
2479+
PRIMARY KEY (Key, Subkey)
2480+
);
2481+
2482+
ALTER TABLE `KeySubkey` ADD INDEX `idx` GLOBAL SYNC ON (`Key`, `Order`) COVER (`Value`);
2483+
)").GetValueSync());
2484+
2485+
// Only the "followers enabled" twin sets read replicas settings on the index.
if constexpr (EnableFollowers) {
2486+
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
2487+
--!syntax_v1
2488+
ALTER TABLE `KeySubkey` ALTER INDEX `idx` SET READ_REPLICAS_SETTINGS "PER_AZ:1";
2489+
)").GetValueSync());
2490+
}
2491+
2492+
// Populate the table with four rows.
AssertSuccessResult(session.ExecuteDataQuery(R"(
2493+
--!syntax_v1
2494+
2495+
REPLACE INTO `KeySubkey` (`Key`, `Subkey`, `Value`, `Order`) VALUES
2496+
(1u, 2u, "One", 7u),
2497+
(1u, 3u, "Two", 4u),
2498+
(21u, 8u, "Three", 1u),
2499+
(31u, 0u, "Four", 8u);
2500+
)", TTxControl::BeginTx().CommitTx()).GetValueSync());
2501+
2502+
// Read through the index view in a StaleRO transaction.
auto result = session.ExecuteDataQuery(R"(
2503+
--!syntax_v1
2504+
SELECT * FROM `KeySubkey` VIEW `idx` WHERE Key = 1 ORDER BY `Order`;
2505+
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2506+
AssertSuccessResult(result);
2507+
2508+
// Counter summed over the shards of the index implementation table.
// NOTE(review): presumably a non-zero value is used here as evidence that
// follower replicas were active for this table — confirm against DataShard.
const auto FollowerCpuTime = GetCumulativeCounterValue(
2509+
kikimr.GetTestServer(),
2510+
"/Root/KeySubkey/idx/indexImplTable",
2511+
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2512+
);
2513+
// Expect a non-zero counter only in the twin that configured read replicas.
if constexpr (EnableFollowers) {
2514+
UNIT_ASSERT_UNEQUAL(0, FollowerCpuTime);
2515+
} else {
2516+
UNIT_ASSERT_EQUAL(0, FollowerCpuTime);
2517+
}
2518+
2519+
// The two rows with Key = 1 must come back ordered by `Order`.
CompareYson(R"(
2520+
[
2521+
[[1u];[4u];[3u];["Two"]];
2522+
[[1u];[7u];[2u];["One"]];
2523+
]
2524+
)", FormatResultSetYson(result.GetResultSet(0)));
2525+
}
2526+
24472527
Y_UNIT_TEST(ReadRangeWithParams) {
24482528
auto kikimr = DefaultKikimrRunner();
24492529
auto db = kikimr.GetTableClient();

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

+39-7
Original file line numberDiff line numberDiff line change
@@ -2802,19 +2802,30 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
28022802
AlterTableAddIndex(EIndexTypeSql::GlobalVectorKMeansTree);
28032803
}
28042804

2805-
Y_UNIT_TEST(AlterTableAlterIndex) {
2805+
Y_UNIT_TEST_TWIN(AlterTableAlterIndex, UseQueryService) {
28062806
TKikimrRunner kikimr;
2807+
auto queryClient = kikimr.GetQueryClient();
28072808
auto db = kikimr.GetTableClient();
28082809
auto session = db.CreateSession().GetValueSync().GetSession();
28092810
CreateSampleTablesWithIndex(session);
28102811

2812+
const auto executeGeneric = [&queryClient, &session](const TString& query) -> TStatus {
2813+
if constexpr (UseQueryService) {
2814+
Y_UNUSED(session);
2815+
return queryClient.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2816+
} else {
2817+
Y_UNUSED(queryClient);
2818+
return session.ExecuteSchemeQuery(query).ExtractValueSync();
2819+
}
2820+
};
2821+
28112822
constexpr int minPartitionsCount = 10;
28122823
{
2813-
auto result = session.ExecuteSchemeQuery(Sprintf(R"(
2824+
const auto result = executeGeneric(Sprintf(R"(
28142825
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX Index SET AUTO_PARTITIONING_MIN_PARTITIONS_COUNT %d;
28152826
)", minPartitionsCount
28162827
)
2817-
).ExtractValueSync();
2828+
);
28182829
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
28192830
}
28202831
{
@@ -2826,18 +2837,39 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
28262837

28272838
constexpr int partitionSizeMb = 555;
28282839
{
2829-
auto result = session.ExecuteSchemeQuery(Sprintf(R"(
2840+
const auto result = executeGeneric(Sprintf(R"(
28302841
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX Index SET AUTO_PARTITIONING_PARTITION_SIZE_MB %d;
2831-
)", partitionSizeMb)
2832-
).ExtractValueSync();
2833-
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
2842+
)", partitionSizeMb
2843+
)
2844+
);
2845+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
28342846
}
28352847
{
28362848
auto describe = session.DescribeTable("/Root/SecondaryKeys/Index/indexImplTable").GetValueSync();
28372849
UNIT_ASSERT_C(describe.IsSuccess(), describe.GetIssues().ToString());
28382850
auto indexDesc = describe.GetTableDescription();
28392851
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPartitioningSettings().GetPartitionSizeMb(), partitionSizeMb);
28402852
}
2853+
2854+
constexpr TStringBuf readReplicasModeAsString = "PER_AZ";
2855+
constexpr auto readReplicasMode = NYdb::NTable::TReadReplicasSettings::EMode::PerAz;
2856+
constexpr ui64 readReplicasCount = 1;
2857+
{
2858+
const auto result = executeGeneric(Sprintf(R"(
2859+
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX Index SET READ_REPLICAS_SETTINGS "%s:%)" PRIu64 R"(";
2860+
)", readReplicasModeAsString.data(), readReplicasCount
2861+
)
2862+
);
2863+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
2864+
}
2865+
{
2866+
auto describe = session.DescribeTable("/Root/SecondaryKeys/Index/indexImplTable").GetValueSync();
2867+
UNIT_ASSERT_C(describe.IsSuccess(), describe.GetIssues().ToString());
2868+
auto indexDesc = describe.GetTableDescription();
2869+
UNIT_ASSERT(indexDesc.GetReadReplicasSettings());
2870+
UNIT_ASSERT(indexDesc.GetReadReplicasSettings()->GetMode() == readReplicasMode);
2871+
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetReadReplicasSettings()->GetReadReplicasCount(), readReplicasCount);
2872+
}
28412873
}
28422874

28432875
void CreateTestTableWithVectorIndex(TSession& session) {

ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ TVector<ISubOperation::TPtr> CreateConsistentAlterTable(TOperationId id, const T
751751
if (!(IsAdministrator(AppData(), context.UserToken.Get()) && !AppData()->AdministrationAllowedSIDs.empty())
752752
&& (!CheckAllowedFields(alter, {"Name", "PathId", "PartitionConfig", "ReplicationConfig", "IncrementalBackupConfig"})
753753
|| (alter.HasPartitionConfig()
754-
&& !CheckAllowedFields(alter.GetPartitionConfig(), {"PartitioningPolicy"})
754+
&& !CheckAllowedFields(alter.GetPartitionConfig(), {"PartitioningPolicy", "FollowerCount", "FollowerGroups"})
755755
)
756756
)
757757
) {

ydb/core/tx/schemeshard/schemeshard_utils.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,10 @@ NKikimrSchemeOp::TPartitionConfig PartitionConfigForIndexes(
136136
if (baseTablePartitionConfig.HasKeepSnapshotTimeout()) {
137137
result.SetKeepSnapshotTimeout(baseTablePartitionConfig.GetKeepSnapshotTimeout());
138138
}
139+
if (indexTableDesc.GetPartitionConfig().FollowerGroupsSize()) {
140+
result.MutableFollowerGroups()->CopyFrom(indexTableDesc.GetPartitionConfig().GetFollowerGroups());
141+
}
139142
// skip repeated NKikimrStorageSettings.TStorageRoom StorageRooms = 17;
140-
// skip optional NKikimrHive.TFollowerGroup FollowerGroup = 23;
141143

142144
return result;
143145
}

ydb/core/ydb_convert/table_description.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,7 @@ void FillGlobalIndexSettings(Ydb::Table::GlobalIndexSettings& settings,
10151015
}
10161016

10171017
FillPartitioningSettingsImpl(settings, indexImplTableDescription);
1018+
FillReadReplicasSettings(settings, indexImplTableDescription);
10181019
}
10191020

10201021
template <typename TYdbProto>
@@ -1633,6 +1634,11 @@ void FillReadReplicasSettings(Ydb::Table::CreateTableRequest& out,
16331634
FillReadReplicasSettingsImpl(out, in);
16341635
}
16351636

1637+
// Overload for Ydb::Table::GlobalIndexSettings: forwards `out` and `in`
// unchanged to FillReadReplicasSettingsImpl.
void FillReadReplicasSettings(Ydb::Table::GlobalIndexSettings& out,
1638+
const NKikimrSchemeOp::TTableDescription& in) {
1639+
FillReadReplicasSettingsImpl(out, in);
1640+
}
1641+
16361642
bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out,
16371643
const Ydb::Table::CreateTableRequest& in, const TTableProfiles& profiles,
16381644
Ydb::StatusIds::StatusCode& status, TString& error, bool indexedTable)

ydb/core/ydb_convert/table_description.h

+2
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ void FillReadReplicasSettings(Ydb::Table::DescribeTableResult& out,
141141
const NKikimrSchemeOp::TTableDescription& in);
142142
void FillReadReplicasSettings(Ydb::Table::CreateTableRequest& out,
143143
const NKikimrSchemeOp::TTableDescription& in);
144+
// Overload whose output is a Ydb::Table::GlobalIndexSettings message; the
// input is the table description the settings are taken from.
void FillReadReplicasSettings(Ydb::Table::GlobalIndexSettings& out,
145+
const NKikimrSchemeOp::TTableDescription& in);
144146

145147
// in
146148
bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out,

ydb/core/ydb_convert/table_settings.cpp

+26-1
Original file line numberDiff line numberDiff line change
@@ -403,9 +403,10 @@ bool FillIndexTablePartitioning(
403403
) {
404404
auto fillIndexPartitioning = [&](const Ydb::Table::GlobalIndexSettings& settings, std::vector<NKikimrSchemeOp::TTableDescription>& indexImplTableDescriptions) {
405405
auto& indexImplTableDescription = indexImplTableDescriptions.emplace_back();
406+
auto& partitionConfig = *indexImplTableDescription.MutablePartitionConfig();
406407

407408
if (settings.has_partitioning_settings()) {
408-
if (!FillPartitioningPolicy(*indexImplTableDescription.MutablePartitionConfig(), settings, code, error)) {
409+
if (!FillPartitioningPolicy(partitionConfig, settings, code, error)) {
409410
return false;
410411
}
411412
}
@@ -414,6 +415,30 @@ bool FillIndexTablePartitioning(
414415
return false;
415416
}
416417
}
418+
if (settings.has_read_replicas_settings()) {
419+
const auto& readReplicasSettings = settings.read_replicas_settings();
420+
switch (readReplicasSettings.settings_case()) {
421+
case Ydb::Table::ReadReplicasSettings::kPerAzReadReplicasCount:
422+
{
423+
auto& followerGroup = *partitionConfig.AddFollowerGroups();
424+
followerGroup.SetFollowerCount(readReplicasSettings.per_az_read_replicas_count());
425+
followerGroup.SetRequireAllDataCenters(true);
426+
followerGroup.SetFollowerCountPerDataCenter(true);
427+
break;
428+
}
429+
case Ydb::Table::ReadReplicasSettings::kAnyAzReadReplicasCount:
430+
{
431+
auto& followerGroup = *partitionConfig.AddFollowerGroups();
432+
followerGroup.SetFollowerCount(readReplicasSettings.any_az_read_replicas_count());
433+
followerGroup.SetRequireAllDataCenters(false);
434+
break;
435+
}
436+
default:
437+
code = Ydb::StatusIds::BAD_REQUEST;
438+
error = TStringBuilder() << "Unknown read_replicas_settings type";
439+
return false;
440+
}
441+
}
417442
return true;
418443
};
419444

0 commit comments

Comments
 (0)