Skip to content

KIKIMR-21403: add followers support for secondary indexes #17965

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 6, 2025
41 changes: 29 additions & 12 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,28 @@ namespace {
return true;
}

// Parses the textual read-replicas setting carried by `setting` and stores the
// result in `readReplicasSettings`.
//
// The setting value is expected to be a data constructor whose literal is an
// atom holding the settings string. Conversion is delegated to
// ConvertReadReplicasSettingsToProto; when it fails, an issue positioned at the
// atom is added to `ctx` (with the YDB status mapped to a YQL status) and false
// is returned. Returns true on success.
[[nodiscard]] bool ParseReadReplicasSettings(
    Ydb::Table::ReadReplicasSettings& readReplicasSettings,
    const TCoNameValueTuple& setting,
    TExprContext& ctx
) {
    // Resolve the atom once: it supplies both the text to parse and the error position.
    const auto literal = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>();
    const TString rawSettings(literal.Value());

    Ydb::StatusIds::StatusCode status;
    TString message;
    if (ConvertReadReplicasSettingsToProto(rawSettings, readReplicasSettings, status, message)) {
        return true;
    }

    ctx.AddError(YqlIssue(
        ctx.GetPosition(literal.Pos()),
        NYql::YqlStatusFromYdbStatus(status),
        message));
    return false;
}

bool ParseAsyncReplicationSettingsBase(
TReplicationSettingsBase& dstSettings, const TCoNameValueTupleList& srcSettings, TExprContext& ctx, TPositionHandle pos,
const TString& objectName = "replication"
Expand Down Expand Up @@ -1783,17 +1805,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
}

} else if (name == "readReplicasSettings") {
const auto replicasSettings = TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
Ydb::StatusIds::StatusCode code;
TString errText;
if (!ConvertReadReplicasSettingsToProto(replicasSettings,
*alterTableRequest.mutable_set_read_replicas_settings(), code, errText)) {

ctx.AddError(YqlIssue(ctx.GetPosition(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Pos()),
NYql::YqlStatusFromYdbStatus(code),
errText));
if (!ParseReadReplicasSettings(*alterTableRequest.mutable_set_read_replicas_settings(), setting, ctx)) {
return SyncError();
}
} else if (name == "setTtlSettings") {
Expand Down Expand Up @@ -1944,12 +1956,17 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
} else if (settingName == "tableSettings") {
auto tableSettings = indexSetting.Value().Cast<TCoNameValueTupleList>();
for (const auto& tableSetting : tableSettings) {
if (IsPartitioningSetting(tableSetting.Name().Value())) {
const auto name = tableSetting.Name().Value();
if (IsPartitioningSetting(name)) {
if (!ParsePartitioningSettings(
*alterTableRequest.mutable_alter_partitioning_settings(), tableSetting, ctx
)) {
return SyncError();
}
} else if (name == "readReplicasSettings") {
if (!ParseReadReplicasSettings(*alterTableRequest.mutable_set_read_replicas_settings(), tableSetting, ctx)) {
return SyncError();
}
} else {
ctx.AddError(
TIssue(ctx.GetPosition(tableSetting.Name().Pos()),
Expand Down
30 changes: 30 additions & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,36 @@ void WaitForZeroReadIterators(Tests::TServer& server, const TString& path) {
UNIT_ASSERT_C(iterators == 0, "Unable to wait for proper read iterator count, it looks like cancelation doesn`t work (" << iterators << ")");
}

// Sums the cumulative app counter named `counterName` over every shard of the
// table at `path`.
//
// Each shard returned by GetTableShards is asked for its counters via
// TEvTablet::TEvGetCounters, and the reply is awaited before moving on to the
// next shard. Asserts that the table has at least one shard and that every
// request is answered. Counters with other names are ignored, so the result is
// 0 when no shard reports a counter with the requested name.
int GetCumulativeCounterValue(Tests::TServer& server, const TString& path, const TString& counterName) {
    TTestActorRuntime* const runtime = server.GetRuntime();
    const auto sender = runtime->AllocateEdgeActor();

    const auto shards = GetTableShards(&server, sender, path);
    UNIT_ASSERT_C(shards.size() > 0, "Table: " << path << " has no shards");

    int total = 0;
    for (const auto shardId : shards) {
        runtime->SendToPipe(shardId, sender, new TEvTablet::TEvGetCounters, 0, GetPipeConfigWithRetries());

        auto ev = runtime->GrabEdgeEvent<TEvTablet::TEvGetCountersResponse>(sender);
        UNIT_ASSERT(ev);

        const auto& cumulative = ev->Get()->Record.GetTabletCounters().GetAppCounters().GetCumulativeCounters();
        for (const auto& counter : cumulative) {
            if (counter.GetName() != counterName) {
                continue;
            }
            total += counter.GetValue();
        }
    }

    return total;
}

TTableId ResolveTableId(Tests::TServer* server, TActorId sender, const TString& path) {
auto response = Navigate(*server->GetRuntime(), sender, path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
return response->ResultSet.at(0).TableId;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ TVector<ui64> GetColumnTableShards(Tests::TServer* server, TActorId sender, cons

void WaitForZeroSessions(const NKqp::TKqpCounters& counters);
void WaitForZeroReadIterators(Tests::TServer& server, const TString& path);
// Returns the sum, over all shards of the table at `path`, of the cumulative
// app counter whose name equals `counterName` (0 if no shard reports it).
int GetCumulativeCounterValue(Tests::TServer& server, const TString& path, const TString& counterName);

void WaitForCompaction(Tests::TServer* server, const TString& path, bool compactBorrowed = false);

Expand Down
80 changes: 80 additions & 0 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2372,6 +2372,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
SELECT * FROM FollowersKv WHERE Key = 21;
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
AssertSuccessResult(result);
UNIT_ASSERT_UNEQUAL(0, GetCumulativeCounterValue(
kikimr.GetTestServer(),
"/Root/FollowersKv",
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
));

CompareYson(R"(
[
Expand All @@ -2385,6 +2390,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
SELECT * FROM FollowersKv WHERE Value != "One" ORDER BY Key;
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
AssertSuccessResult(result);
UNIT_ASSERT_UNEQUAL(0, GetCumulativeCounterValue(
kikimr.GetTestServer(),
"/Root/FollowersKv",
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
));

CompareYson(R"(
[
Expand All @@ -2400,6 +2410,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
SELECT * FROM TwoShard WHERE Key = 2;
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
AssertSuccessResult(result);
UNIT_ASSERT_EQUAL(0, GetCumulativeCounterValue(
kikimr.GetTestServer(),
"/Root/TwoShard",
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
));

CompareYson(R"(
[
Expand All @@ -2420,6 +2435,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
[[4000000001u];["BigOne"];[-1]]
]
)", FormatResultSetYson(result.GetResultSet(0)));
UNIT_ASSERT_EQUAL(0, GetCumulativeCounterValue(
kikimr.GetTestServer(),
"/Root/TwoShard",
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
));
}

Y_UNIT_TEST(StaleRO_Immediate) {
Expand All @@ -2444,6 +2464,66 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
)", FormatResultSetYson(result.GetResultSet(0)));
}

// Twin test (EnableFollowers = true/false): runs a StaleRO read through a
// secondary index view and checks a follower-related counter on the index
// implementation table.
Y_UNIT_TEST_TWIN(StaleRO_IndexFollowers, EnableFollowers) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

// Create the table and add a synchronous global index `idx` on (Key, Order) covering Value.
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE `KeySubkey` (
Key Uint64,
Subkey Uint64,
Value String,
Order Uint32,
PRIMARY KEY (Key, Subkey)
);

ALTER TABLE `KeySubkey` ADD INDEX `idx` GLOBAL SYNC ON (`Key`, `Order`) COVER (`Value`);
)").GetValueSync());

// Only the "followers enabled" variant sets read replicas on the index.
if constexpr (EnableFollowers) {
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
--!syntax_v1
ALTER TABLE `KeySubkey` ALTER INDEX `idx` SET READ_REPLICAS_SETTINGS "PER_AZ:1";
)").GetValueSync());
}

// Populate the table; two of the four rows have Key = 1.
AssertSuccessResult(session.ExecuteDataQuery(R"(
--!syntax_v1

REPLACE INTO `KeySubkey` (`Key`, `Subkey`, `Value`, `Order`) VALUES
(1u, 2u, "One", 7u),
(1u, 3u, "Two", 4u),
(21u, 8u, "Three", 1u),
(31u, 0u, "Four", 8u);
)", TTxControl::BeginTx().CommitTx()).GetValueSync());

// Read through the index view in a StaleRO transaction.
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
SELECT * FROM `KeySubkey` VIEW `idx` WHERE Key = 1 ORDER BY `Order`;
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
AssertSuccessResult(result);

// Counter summed over the shards of the index implementation table.
// NOTE(review): a non-zero value presumably means a follower took part in the read — confirm.
const auto FollowerCpuTime = GetCumulativeCounterValue(
kikimr.GetTestServer(),
"/Root/KeySubkey/idx/indexImplTable",
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
);
// Expect a non-zero counter only when read replicas were configured on the index.
if constexpr (EnableFollowers) {
UNIT_ASSERT_UNEQUAL(0, FollowerCpuTime);
} else {
UNIT_ASSERT_EQUAL(0, FollowerCpuTime);
}

// Both variants must return the same rows, ordered by `Order`.
CompareYson(R"(
[
[[1u];[4u];[3u];["Two"]];
[[1u];[7u];[2u];["One"]];
]
)", FormatResultSetYson(result.GetResultSet(0)));
}

Y_UNIT_TEST(ReadRangeWithParams) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetTableClient();
Expand Down
46 changes: 39 additions & 7 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2802,19 +2802,30 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
AlterTableAddIndex(EIndexTypeSql::GlobalVectorKMeansTree);
}

Y_UNIT_TEST(AlterTableAlterIndex) {
Y_UNIT_TEST_TWIN(AlterTableAlterIndex, UseQueryService) {
TKikimrRunner kikimr;
auto queryClient = kikimr.GetQueryClient();
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
CreateSampleTablesWithIndex(session);

const auto executeGeneric = [&queryClient, &session](const TString& query) -> TStatus {
if constexpr (UseQueryService) {
Y_UNUSED(session);
return queryClient.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
} else {
Y_UNUSED(queryClient);
return session.ExecuteSchemeQuery(query).ExtractValueSync();
}
};

constexpr int minPartitionsCount = 10;
{
auto result = session.ExecuteSchemeQuery(Sprintf(R"(
const auto result = executeGeneric(Sprintf(R"(
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX Index SET AUTO_PARTITIONING_MIN_PARTITIONS_COUNT %d;
)", minPartitionsCount
)
).ExtractValueSync();
);
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
{
Expand All @@ -2826,18 +2837,39 @@ Y_UNIT_TEST_SUITE(KqpScheme) {

constexpr int partitionSizeMb = 555;
{
auto result = session.ExecuteSchemeQuery(Sprintf(R"(
const auto result = executeGeneric(Sprintf(R"(
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX Index SET AUTO_PARTITIONING_PARTITION_SIZE_MB %d;
)", partitionSizeMb)
).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
)", partitionSizeMb
)
);
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
{
auto describe = session.DescribeTable("/Root/SecondaryKeys/Index/indexImplTable").GetValueSync();
UNIT_ASSERT_C(describe.IsSuccess(), describe.GetIssues().ToString());
auto indexDesc = describe.GetTableDescription();
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPartitioningSettings().GetPartitionSizeMb(), partitionSizeMb);
}

constexpr TStringBuf readReplicasModeAsString = "PER_AZ";
constexpr auto readReplicasMode = NYdb::NTable::TReadReplicasSettings::EMode::PerAz;
constexpr ui64 readReplicasCount = 1;
{
const auto result = executeGeneric(Sprintf(R"(
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX Index SET READ_REPLICAS_SETTINGS "%s:%)" PRIu64 R"(";
)", readReplicasModeAsString.data(), readReplicasCount
)
);
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
{
auto describe = session.DescribeTable("/Root/SecondaryKeys/Index/indexImplTable").GetValueSync();
UNIT_ASSERT_C(describe.IsSuccess(), describe.GetIssues().ToString());
auto indexDesc = describe.GetTableDescription();
UNIT_ASSERT(indexDesc.GetReadReplicasSettings());
UNIT_ASSERT(indexDesc.GetReadReplicasSettings()->GetMode() == readReplicasMode);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetReadReplicasSettings()->GetReadReplicasCount(), readReplicasCount);
}
}

Y_UNIT_TEST(AlterTableAlterVectorIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ TVector<ISubOperation::TPtr> CreateConsistentAlterTable(TOperationId id, const T
if (!(IsAdministrator(AppData(), context.UserToken.Get()) && !AppData()->AdministrationAllowedSIDs.empty())
&& (!CheckAllowedFields(alter, {"Name", "PathId", "PartitionConfig", "ReplicationConfig", "IncrementalBackupConfig"})
|| (alter.HasPartitionConfig()
&& !CheckAllowedFields(alter.GetPartitionConfig(), {"PartitioningPolicy"})
&& !CheckAllowedFields(alter.GetPartitionConfig(), {"PartitioningPolicy", "FollowerCount", "FollowerGroups"})
)
)
) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ NKikimrSchemeOp::TPartitionConfig PartitionConfigForIndexes(
if (baseTablePartitionConfig.HasKeepSnapshotTimeout()) {
result.SetKeepSnapshotTimeout(baseTablePartitionConfig.GetKeepSnapshotTimeout());
}
if (indexTableDesc.GetPartitionConfig().FollowerGroupsSize()) {
result.MutableFollowerGroups()->CopyFrom(indexTableDesc.GetPartitionConfig().GetFollowerGroups());
}
// skip repeated NKikimrStorageSettings.TStorageRoom StorageRooms = 17;
// skip optional NKikimrHive.TFollowerGroup FollowerGroup = 23;

return result;
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/ydb_convert/table_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ void FillGlobalIndexSettings(Ydb::Table::GlobalIndexSettings& settings,
}

FillPartitioningSettingsImpl(settings, indexImplTableDescription);
FillReadReplicasSettings(settings, indexImplTableDescription);
}

template <typename TYdbProto>
Expand Down Expand Up @@ -1633,6 +1634,11 @@ void FillReadReplicasSettings(Ydb::Table::CreateTableRequest& out,
FillReadReplicasSettingsImpl(out, in);
}

// Overload for global index settings: forwards `out` and `in` unchanged to
// FillReadReplicasSettingsImpl, which does the actual filling.
void FillReadReplicasSettings(Ydb::Table::GlobalIndexSettings& out,
const NKikimrSchemeOp::TTableDescription& in) {
FillReadReplicasSettingsImpl(out, in);
}

bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out,
const Ydb::Table::CreateTableRequest& in, const TTableProfiles& profiles,
Ydb::StatusIds::StatusCode& status, TString& error, bool indexedTable)
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/ydb_convert/table_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ void FillReadReplicasSettings(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in);
void FillReadReplicasSettings(Ydb::Table::CreateTableRequest& out,
const NKikimrSchemeOp::TTableDescription& in);
void FillReadReplicasSettings(Ydb::Table::GlobalIndexSettings& out,
const NKikimrSchemeOp::TTableDescription& in);

// in
bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out,
Expand Down
27 changes: 26 additions & 1 deletion ydb/core/ydb_convert/table_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,10 @@ bool FillIndexTablePartitioning(
) {
auto fillIndexPartitioning = [&](const Ydb::Table::GlobalIndexSettings& settings, std::vector<NKikimrSchemeOp::TTableDescription>& indexImplTableDescriptions) {
auto& indexImplTableDescription = indexImplTableDescriptions.emplace_back();
auto& partitionConfig = *indexImplTableDescription.MutablePartitionConfig();

if (settings.has_partitioning_settings()) {
if (!FillPartitioningPolicy(*indexImplTableDescription.MutablePartitionConfig(), settings, code, error)) {
if (!FillPartitioningPolicy(partitionConfig, settings, code, error)) {
return false;
}
}
Expand All @@ -414,6 +415,30 @@ bool FillIndexTablePartitioning(
return false;
}
}
if (settings.has_read_replicas_settings()) {
const auto& readReplicasSettings = settings.read_replicas_settings();
switch (readReplicasSettings.settings_case()) {
case Ydb::Table::ReadReplicasSettings::kPerAzReadReplicasCount:
{
auto& followerGroup = *partitionConfig.AddFollowerGroups();
followerGroup.SetFollowerCount(readReplicasSettings.per_az_read_replicas_count());
followerGroup.SetRequireAllDataCenters(true);
followerGroup.SetFollowerCountPerDataCenter(true);
break;
}
case Ydb::Table::ReadReplicasSettings::kAnyAzReadReplicasCount:
{
auto& followerGroup = *partitionConfig.AddFollowerGroups();
followerGroup.SetFollowerCount(readReplicasSettings.any_az_read_replicas_count());
followerGroup.SetRequireAllDataCenters(false);
break;
}
default:
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Unknown read_replicas_settings type";
return false;
}
}
return true;
};

Expand Down
Loading
Loading