Skip to content

Remap path #15991

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
AFL_VERIFY(ShardIds.size() == ShardsCount)("count", ShardIds.size())("ids", JoinSeq(",", ShardIds));
std::set<NColumnShard::TInternalPathId> pathIdsSet;
for (auto&& i : ShardIds) {
auto pathIds = Controller->GetPathIds(i);
const auto pathIdTranslator = Controller->GetPathIdTranslator(i);
auto pathIds = pathIdTranslator->GetInternalPathIds();
pathIdsSet.insert(pathIds.begin(), pathIds.end());
}
PathIds = std::vector<NColumnShard::TInternalPathId>(pathIdsSet.begin(), pathIdsSet.end());
Expand Down Expand Up @@ -502,17 +503,17 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
// tester.CheckCount();
// }

Y_UNIT_TEST(SplitEmpty) {
TAsyncReshardingTest tester;
// Y_UNIT_TEST(SplitEmpty) {
// TAsyncReshardingTest tester;

tester.CheckCount();
// tester.CheckCount();

tester.StartResharding("SPLIT");
// tester.StartResharding("SPLIT");

tester.CheckCount();
tester.WaitResharding();
tester.CheckCount();
}
// tester.CheckCount();
// tester.WaitResharding();
// tester.CheckCount();
// }

// Y_UNIT_TEST(ChangeSchemaAndSplit) {
// TAsyncReshardingTest tester;
Expand Down
27 changes: 12 additions & 15 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,28 +246,25 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

TAutoPtr<IEventHandle> handle;

size_t shard = 0;
std::set<NColumnShard::TInternalPathId> pathids;
std::optional<NColumnShard::TSchemeShardLocalPathId> schemeShardLocalPathId;
for (auto&& i : csController->GetShardActualIds()) {
Cerr << ">>> shard actual id: " << i << Endl;
for (auto&& j : csController->GetPathIds(i)) {
Cerr << ">>> path id: " << j << Endl;
pathids.insert(j);
}
if (++shard == 3) {
break;
const auto pathIds = csController->GetPathIdTranslator(i)->GetSchemeShardLocalPathIds();
UNIT_ASSERT(pathIds.size() == 1);
if (schemeShardLocalPathId.has_value()) {
UNIT_ASSERT(schemeShardLocalPathId == *pathIds.begin());
} else {
schemeShardLocalPathId = *pathIds.begin();
}
}

UNIT_ASSERT(pathids.size() == 1);
const auto& pathId = *pathids.begin();
UNIT_ASSERT(schemeShardLocalPathId.has_value());

shard = 0;
for (auto&& i : csController->GetShardActualIds()) {
size_t shard = 0;
for (const auto& [tabletId, pathIdTranslator]: csController->GetActiveTablets()) {
auto request = std::make_unique<NStat::TEvStatistics::TEvStatisticsRequest>();
request->Record.MutableTable()->MutablePathId()->SetLocalId(pathId.GetRawValue());

runtime->Send(MakePipePerNodeCacheID(false), sender, new TEvPipeCache::TEvForward(request.release(), i, false));
request->Record.MutableTable()->MutablePathId()->SetLocalId(schemeShardLocalPathId->GetRawValue());
runtime->Send(MakePipePerNodeCacheID(false), sender, new TEvPipeCache::TEvForward(request.release(), static_cast<ui64>(tabletId), false));
if (++shard == 3) {
break;
}
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/kqp/ut/olap/tiering_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,13 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) {
olapHelper.CreateTestOlapTable();
tieringHelper.WriteSampleData();
csController->WaitCompactions(TDuration::Seconds(5));
THashSet<NColumnShard::TInternalPathId> pathsToLock{
NColumnShard::TInternalPathId::FromRawValue(0),
NColumnShard::TInternalPathId::FromRawValue(1),
NColumnShard::TInternalPathId::FromRawValue(2),
NColumnShard::TInternalPathId::FromRawValue(3),
NColumnShard::TInternalPathId::FromRawValue(4),
NColumnShard::TInternalPathId::FromRawValue(5),
THashSet<NColumnShard::TInternalPathId> pathsToLock;
for (const auto& [_, pathIdTranslator]: csController->GetActiveTablets()) {
for (size_t i = 0; i != 6; ++i) {
if (auto internalPathId = pathIdTranslator->ResolveInternalPathId(NColumnShard::TSchemeShardLocalPathId::FromRawValue(i))) {
pathsToLock.insert(*internalPathId);
}
}
};

csController->RegisterLock("table", std::make_shared<NOlap::NDataLocks::TListTablesLock>("table", std::move(pathsToLock), NOlap::NDataLocks::ELockCategory::Compaction));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
if (!operationIds.emplace(operation->GetWriteId()).second) {
continue;
}
AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetPathId().InternalPathId, minReadSnapshot));
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
operation->OnWriteFinish(txc, {}, true);
Expand All @@ -75,7 +75,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
lock.SetDataShard(Self->TabletID());
lock.SetGeneration(info.GetGeneration());
lock.SetCounter(info.GetInternalGenerationCounter());
lock.SetPathId(writeMeta.GetTableId().GetRawValue());
writeMeta.GetPathId().SchemeShardLocalPathId.ToProto(lock);
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock);
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
}
Expand Down Expand Up @@ -103,11 +103,11 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
continue;
}
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
pathIds.emplace(op->GetPathId());
pathIds.emplace(op->GetPathId().InternalPathId);
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
if (op->GetBehaviour() != EOperationBehaviour::NoTxWrite || Self->GetOperationsManager().HasReadLocks(writeMeta.GetTableId())) {
if (op->GetBehaviour() != EOperationBehaviour::NoTxWrite || Self->GetOperationsManager().HasReadLocks(writeMeta.GetPathId().InternalPathId)) {
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(
writeMeta.GetTableId(), writeResult.GetPKBatchVerified(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
writeMeta.GetPathId().InternalPathId, writeResult.GetPKBatchVerified(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) {
const auto minReadSnapshot = Self->GetMinReadSnapshot();
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteMeta();
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetPathId().InternalPathId, minReadSnapshot));
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
AFL_VERIFY(!writeMeta.HasLongTxId());
Expand Down Expand Up @@ -90,7 +90,7 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) {
lock.SetDataShard(Self->TabletID());
lock.SetGeneration(info.GetGeneration());
lock.SetCounter(info.GetInternalGenerationCounter());
lock.SetPathId(writeMeta.GetTableId().GetRawValue());
writeMeta.GetPathId().SchemeShardLocalPathId.ToProto(lock);
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock);
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
}
Expand Down Expand Up @@ -120,8 +120,8 @@ void TTxWrite::DoComplete(const TActorContext& ctx) {
AFL_VERIFY(!writeMeta.HasLongTxId());
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
if (op->GetBehaviour() != EOperationBehaviour::NoTxWrite || Self->GetOperationsManager().HasReadLocks(writeMeta.GetTableId())) {
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetTableId(),
if (op->GetBehaviour() != EOperationBehaviour::NoTxWrite || Self->GetOperationsManager().HasReadLocks(writeMeta.GetPathId().InternalPathId)) {
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetPathId().InternalPathId,
buffer.GetAggregations()[i]->GetRecordBatch(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
}
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void TColumnShard::TrySwitchToWork(const TActorContext& ctx) {
Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit());
EnqueueBackgroundActivities();
BackgroundSessionsManager->Start();
ctx.Send(SelfId(), new NActors::TEvents::TEvWakeup());
ctx.Send(SelfId(), new NActors::TEvents::TEvWakeup());
ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup());
ctx.Send(SelfId(), new TEvPrivate::TEvPingSnapshotsUsage());
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
Expand Down Expand Up @@ -411,10 +411,11 @@ void TColumnShard::FillColumnTableStats(const TActorContext& ctx, std::unique_pt
TTableStatsBuilder tableStatsBuilder(Counters, Executor());

LOG_S_DEBUG("There are stats for " << tables.size() << " tables");
for (const auto& [pathId, _] : tables) {
for (const auto& [internalPathId, table] : tables) {
const auto& schemeShardLocalPathId = table.GetPathId().SchemeShardLocalPathId;
auto* periodicTableStats = ev->Record.AddTables();
periodicTableStats->SetDatashardId(TabletID());
periodicTableStats->SetTableLocalId(pathId.GetRawValue());
periodicTableStats->SetTableLocalId(schemeShardLocalPathId.GetRawValue());

periodicTableStats->SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready
periodicTableStats->SetGeneration(Executor()->Generation());
Expand All @@ -426,9 +427,9 @@ void TColumnShard::FillColumnTableStats(const TActorContext& ctx, std::unique_pt
resourceMetrics->Fill(*periodicTableStats->MutableTabletMetrics());
}

tableStatsBuilder.FillTableStats(pathId, *(periodicTableStats->MutableTableStats()));
tableStatsBuilder.FillTableStats(internalPathId, *(periodicTableStats->MutableTableStats()));

LOG_S_TRACE("Add stats for table, tableLocalID=" << pathId);
LOG_S_TRACE("Add stats for table, tableLocalID=" << schemeShardLocalPathId);
}
}

Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ namespace TEvColumnShard {

struct TEvInternalScan: public TEventLocal<TEvInternalScan, EvInternalScan> {
private:
YDB_READONLY_DEF(NColumnShard::TInternalPathId, PathId);
YDB_READONLY_DEF(NColumnShard::TUnifiedPathId, PathId);
YDB_READONLY(NOlap::TSnapshot, Snapshot, NOlap::TSnapshot::Zero());
YDB_READONLY_DEF(std::optional<ui64>, LockId);
YDB_ACCESSOR(bool, Reverse, false);
Expand All @@ -114,7 +114,7 @@ namespace TEvColumnShard {
ColumnIds.emplace_back(id);
}

TEvInternalScan(const NColumnShard::TInternalPathId pathId, const NOlap::TSnapshot& snapshot, const std::optional<ui64> lockId)
TEvInternalScan(const NColumnShard::TUnifiedPathId pathId, const NOlap::TSnapshot& snapshot, const std::optional<ui64> lockId)
: PathId(pathId)
, Snapshot(snapshot)
, LockId(lockId)
Expand Down Expand Up @@ -242,11 +242,11 @@ namespace TEvColumnShard {
struct TEvWrite : public TEventPB<TEvWrite, NKikimrTxColumnShard::TEvWrite, TEvColumnShard::EvWrite> {
TEvWrite() = default;

TEvWrite(const TActorId& source, const NLongTxService::TLongTxId& longTxId, ui64 tableId,
TEvWrite(const TActorId& source, const NLongTxService::TLongTxId& longTxId, NColumnShard::TSchemeShardLocalPathId tableId,
const TString& dedupId, const TString& data, const ui32 writePartId,
const NEvWrite::EModificationType modificationType) {
ActorIdToProto(source, Record.MutableSource());
Record.SetTableId(tableId);
tableId.ToProto(Record);
Record.SetDedupId(dedupId);
Record.SetData(data);
Record.SetWritePartId(writePartId);
Expand Down Expand Up @@ -279,7 +279,7 @@ namespace TEvColumnShard {
Record.SetOrigin(origin);
Record.SetTxInitiator(0);
Record.SetWriteId(writeId);
Record.SetTableId(writeMeta.GetTableId().GetRawValue());
writeMeta.GetPathId().SchemeShardLocalPathId.ToProto(Record);
Record.SetDedupId(writeMeta.GetDedupId());
Record.SetStatus(status);
}
Expand Down
19 changes: 18 additions & 1 deletion ydb/core/tx/columnshard/columnshard__scan.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "columnshard__scan.h"
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include "columnshard.h"
#include "columnshard_impl.h"
#include "engines/reader/sys_view/abstract/policy.h"
#include "engines/reader/transaction/tx_scan.h"
#include "engines/reader/transaction/tx_internal_scan.h"

Expand Down Expand Up @@ -30,7 +32,22 @@ void TColumnShard::Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContex
return;
}

Counters.GetColumnTablesCounters()->GetPathIdCounter(TInternalPathId::FromRawValue(record.GetLocalPathId()))->OnReadEvent();
const auto schemeShardLocalPath = TSchemeShardLocalPathId::FromProto(record);
auto internalPathId = TablesManager.ResolveInternalPathId(schemeShardLocalPath);
if (!internalPathId && NOlap::NReader::NSysView::NAbstract::ISysViewPolicy::BuildByPath(record.GetTablePath())) {
internalPathId = TInternalPathId::FromRawValue(schemeShardLocalPath.GetRawValue()); //TODO register ColumnStore in tablesmanager
}
if (!internalPathId) {
const auto& request = ev->Get()->Record;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

какой-то не нужный блок переопределения переменных, которые в дальнейшем 1 раз используются

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

поправил

auto error = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(request.GetGeneration(), TabletID());
error->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST,TStringBuilder() << "table: " << request.GetTablePath() << "not found");
NYql::IssueToMessage(issue, error->Record.MutableIssues()->Add());

ctx.Send(ev->Sender, error.Release());
return;
}
Counters.GetColumnTablesCounters()->GetPathIdCounter(*internalPathId)->OnReadEvent();
ScanTxInFlight.insert({txId, TAppData::TimeProvider->Now()});
Counters.GetTabletCounters()->SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size());
Execute(new NOlap::NReader::TTxScan(this, ev), ctx);
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,11 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
}

AFL_VERIFY(HasIndex());
const auto& schemeShardLocalPathId = TSchemeShardLocalPathId::FromProto(record.GetTable().GetPathId());
const auto& internalPathId = TablesManager.ResolveInternalPathId(schemeShardLocalPathId);
AFL_VERIFY(internalPathId);
auto index = GetIndexAs<NOlap::TColumnEngineForLogs>();
auto spg = index.GetGranuleOptional(TInternalPathId::FromRawValue(record.GetTable().GetPathId().GetLocalId()));
auto spg = index.GetGranuleOptional(*internalPathId);
AFL_VERIFY(spg);

std::set<ui32> columnTagsRequested;
Expand Down
16 changes: 9 additions & 7 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
Y_ABORT("invalid function usage");
}

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())(
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetPathId())(
"reason", overloadReason);

ctx.Send(writeMeta.GetSource(), event.release(), 0, cookie);
Expand Down Expand Up @@ -162,7 +162,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
} else {
const TMonotonic now = TMonotonic::Now();
Counters.OnWritePutBlobsSuccess(now - writeMeta.GetWriteStartInstant(), aggr->GetRows());
LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId()
LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetPathId()
<< (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "")
<< " at tablet " << TabletID());
}
Expand Down Expand Up @@ -438,14 +438,16 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

const auto pathId = TInternalPathId::FromRawValue(operation.GetTableId().GetTableId());

if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
const auto schemeShardLocalPathId = TSchemeShardLocalPathId::FromProto(operation.GetTableId());
const auto& internalPathId = TablesManager.ResolveInternalPathId(schemeShardLocalPathId);
AFL_VERIFY(internalPathId);
const auto& pathId = TUnifiedPathId{*internalPathId, schemeShardLocalPathId};
if (!TablesManager.IsReadyForStartWrite(*internalPathId, false)) {
sendError("table not writable", NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR);
return;
}

Counters.GetColumnTablesCounters()->GetPathIdCounter(pathId)->OnWriteEvent();
Counters.GetColumnTablesCounters()->GetPathIdCounter(*internalPathId)->OnWriteEvent();

auto arrowData = std::make_shared<TArrowData>(schema);
if (!arrowData->Parse(operation, NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
Expand All @@ -462,7 +464,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
if (outOfSpace) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded")("source", "dataevent");
}
auto overloadStatus = outOfSpace ? EOverloadStatus::Disk : CheckOverloadedImmediate(pathId);
auto overloadStatus = outOfSpace ? EOverloadStatus::Disk : CheckOverloadedImmediate(*internalPathId);
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
Expand Down
Loading
Loading