Skip to content

Commit 1b35916

Browse files
committed
introduce TLocalPathId
1 parent 5a17a28 commit 1b35916

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+530
-188
lines changed

ydb/core/protos/tx_columnshard.proto

+1
Original file line numberDiff line numberDiff line change
@@ -348,4 +348,5 @@ message TInternalOperationData {
348348
optional uint32 ModificationType = 2;
349349
optional uint64 PathId = 3;
350350
optional bool WritePortions = 4;
351+
optional uint64 LocalPathId = 5;
351352
}

ydb/core/sys_view/common/schema.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ class TSystemViewResolver : public ISystemViewResolver {
290290
RegisterColumnTableSystemView<Schema::PrimaryIndexPortionStats>(TablePrimaryIndexPortionStatsName);
291291
RegisterColumnTableSystemView<Schema::PrimaryIndexGranuleStats>(TablePrimaryIndexGranuleStatsName);
292292
RegisterColumnTableSystemView<Schema::PrimaryIndexOptimizerStats>(TablePrimaryIndexOptimizerStatsName);
293+
RegisterColumnTableSystemView<Schema::TablePathIdMapping>(TablePathIdMappingName);
293294

294295
RegisterSystemView<Schema::TopPartitions>(TopPartitions1MinuteName);
295296
RegisterSystemView<Schema::TopPartitions>(TopPartitions1HourName);

ydb/core/sys_view/common/schema.h

+10
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ constexpr TStringBuf TablePrimaryIndexStatsName = "primary_index_stats";
4242
constexpr TStringBuf TablePrimaryIndexPortionStatsName = "primary_index_portion_stats";
4343
constexpr TStringBuf TablePrimaryIndexGranuleStatsName = "primary_index_granule_stats";
4444
constexpr TStringBuf TablePrimaryIndexOptimizerStatsName = "primary_index_optimizer_stats";
45+
constexpr TStringBuf TablePathIdMappingName = "table_path_id_mapping";
4546

4647
constexpr TStringBuf TopPartitions1MinuteName = "top_partitions_one_minute";
4748
constexpr TStringBuf TopPartitions1HourName = "top_partitions_one_hour";
@@ -757,6 +758,15 @@ struct Schema : NIceDb::Schema {
757758
QueryCpuLimitPercentPerNode,
758759
QueryMemoryLimitPercentPerNode>;
759760
};
761+
struct TablePathIdMapping : Table<23> {
762+
struct InternalPathId : Column<1, NScheme::NTypeIds::Uint64> {};
763+
struct TabletId : Column<2, NScheme::NTypeIds::Uint64> {};
764+
struct LocalPathId: Column<3, NScheme::NTypeIds::Uint64> {};
765+
766+
using TKey = TableKey<InternalPathId, TabletId>;
767+
using TColumns = TableColumns<InternalPathId, TabletId, LocalPathId>;
768+
};
769+
760770
};
761771

762772
bool MaybeSystemViewPath(const TVector<TString>& path);

ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
5757
if (!operationIds.emplace(operation->GetWriteId()).second) {
5858
continue;
5959
}
60-
AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetPathId(), minReadSnapshot));
60+
AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetPathId().InternalPathId, minReadSnapshot));
6161
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
6262
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
6363
operation->OnWriteFinish(txc, {}, true);
@@ -75,7 +75,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
7575
lock.SetDataShard(Self->TabletID());
7676
lock.SetGeneration(info.GetGeneration());
7777
lock.SetCounter(info.GetInternalGenerationCounter());
78-
lock.SetPathId(writeMeta.GetPathId().GetInternalPathIdValue());
78+
lock.SetPathId(writeMeta.GetPathId().InternalPathId.GetInternalPathIdValue());
7979
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock);
8080
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
8181
}
@@ -102,11 +102,11 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
102102
}
103103
const auto& writeMeta = writeResult.GetWriteMeta();
104104
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
105-
pathIds.emplace(op->GetPathId());
105+
pathIds.emplace(op->GetPathId().InternalPathId);
106106
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
107-
if (op->GetBehaviour() != EOperationBehaviour::NoTxWrite || Self->GetOperationsManager().HasReadLocks(writeMeta.GetPathId())) {
107+
if (op->GetBehaviour() != EOperationBehaviour::NoTxWrite || Self->GetOperationsManager().HasReadLocks(writeMeta.GetPathId().InternalPathId)) {
108108
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(
109-
writeMeta.GetPathId(), writeResult.GetPKBatchVerified(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
109+
writeMeta.GetPathId().InternalPathId, writeResult.GetPKBatchVerified(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
110110
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
111111
}
112112
}

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) {
4343
const auto minReadSnapshot = Self->GetMinReadSnapshot();
4444
for (auto&& aggr : buffer.GetAggregations()) {
4545
const auto& writeMeta = aggr->GetWriteMeta();
46-
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetPathId(), minReadSnapshot));
46+
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetPathId().InternalPathId, minReadSnapshot));
4747
txc.DB.NoMoreReadsForTx();
4848
TWriteOperation::TPtr operation;
4949
if (writeMeta.HasLongTxId()) {
@@ -105,14 +105,14 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) {
105105
lock.SetDataShard(Self->TabletID());
106106
lock.SetGeneration(info.GetGeneration());
107107
lock.SetCounter(info.GetInternalGenerationCounter());
108-
lock.SetPathId(writeMeta.GetPathId().GetInternalPathIdValue());
108+
lock.SetPathId(writeMeta.GetPathId().InternalPathId.GetInternalPathIdValue());
109109
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock);
110110
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
111111
}
112112
} else {
113113
Y_ABORT_UNLESS(aggr->GetInsertWriteIds().size() == 1);
114114
auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(
115-
Self->TabletID(), writeMeta, (ui64)aggr->GetInsertWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
115+
Self->TabletID(), writeMeta.GetPathId().LocalPathId, writeMeta.GetDedupId(), (ui64)aggr->GetInsertWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
116116
Results.emplace_back(std::move(ev), writeMeta.GetSource(), 0);
117117
}
118118
}
@@ -141,8 +141,8 @@ void TTxWrite::DoComplete(const TActorContext& ctx) {
141141
if (!writeMeta.HasLongTxId()) {
142142
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
143143
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
144-
if (op->GetBehaviour() != EOperationBehaviour::NoTxWrite || Self->GetOperationsManager().HasReadLocks(writeMeta.GetPathId())) {
145-
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetPathId(),
144+
if (op->GetBehaviour() != EOperationBehaviour::NoTxWrite || Self->GetOperationsManager().HasReadLocks(writeMeta.GetPathId().InternalPathId)) {
145+
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetPathId().InternalPathId,
146146
buffer.GetAggregations()[i]->GetRecordBatch(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
147147
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
148148
}

ydb/core/tx/columnshard/columnshard.cpp

+21-20
Original file line numberDiff line numberDiff line change
@@ -406,28 +406,29 @@ void TColumnShard::FillOlapStats(const TActorContext& ctx, std::unique_ptr<TEvDa
406406
}
407407

408408
void TColumnShard::FillColumnTableStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev) {
409-
auto tables = TablesManager.GetTables();
410409
TTableStatsBuilder tableStatsBuilder(Counters, Executor());
411-
LOG_S_DEBUG("There are stats for " << tables.size() << " tables");
412-
for (const auto& [pathId, _] : tables) {
413-
auto* periodicTableStats = ev->Record.AddTables();
414-
periodicTableStats->SetDatashardId(TabletID());
415-
periodicTableStats->SetTableLocalId(pathId.GetInternalPathIdValue());
416-
417-
periodicTableStats->SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready
418-
periodicTableStats->SetGeneration(Executor()->Generation());
419-
periodicTableStats->SetRound(StatsReportRound++);
420-
periodicTableStats->SetNodeId(ctx.SelfID.NodeId());
421-
periodicTableStats->SetStartTime(StartTime().MilliSeconds());
422-
423-
if (auto* resourceMetrics = Executor()->GetResourceMetrics()) {
424-
resourceMetrics->Fill(*periodicTableStats->MutableTabletMetrics());
410+
LOG_S_DEBUG("There are stats for " << TablesManager.GetTableCount() << " tables");
411+
TablesManager.ForEachPathId(
412+
[&](const NColumnShard::TInternalPathId pathId) {
413+
auto* periodicTableStats = ev->Record.AddTables();
414+
periodicTableStats->SetDatashardId(TabletID());
415+
periodicTableStats->SetTableLocalId(pathId.GetInternalPathIdValue());
416+
417+
periodicTableStats->SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready
418+
periodicTableStats->SetGeneration(Executor()->Generation());
419+
periodicTableStats->SetRound(StatsReportRound++);
420+
periodicTableStats->SetNodeId(ctx.SelfID.NodeId());
421+
periodicTableStats->SetStartTime(StartTime().MilliSeconds());
422+
423+
if (auto* resourceMetrics = Executor()->GetResourceMetrics()) {
424+
resourceMetrics->Fill(*periodicTableStats->MutableTabletMetrics());
425+
}
426+
427+
tableStatsBuilder.FillTableStats(pathId, *(periodicTableStats->MutableTableStats()));
428+
429+
LOG_S_TRACE("Add stats for table, tableLocalID=" << pathId);
425430
}
426-
427-
tableStatsBuilder.FillTableStats(pathId, *(periodicTableStats->MutableTableStats()));
428-
429-
LOG_S_TRACE("Add stats for table, tableLocalID=" << pathId);
430-
}
431+
);
431432
}
432433

433434
void TColumnShard::SendPeriodicStats() {

ydb/core/tx/columnshard/columnshard.h

+3-8
Original file line numberDiff line numberDiff line change
@@ -270,17 +270,12 @@ namespace TEvColumnShard {
270270
struct TEvWriteResult : public TEventPB<TEvWriteResult, NKikimrTxColumnShard::TEvWriteResult, TEvColumnShard::EvWriteResult> {
271271
TEvWriteResult() = default;
272272

273-
TEvWriteResult(ui64 origin, const NEvWrite::TWriteMeta& writeMeta, ui32 status)
274-
: TEvWriteResult(origin, writeMeta, writeMeta.GetWriteId(), status)
275-
{
276-
}
277-
278-
TEvWriteResult(ui64 origin, const NEvWrite::TWriteMeta& writeMeta, const i64 writeId, ui32 status) {
273+
TEvWriteResult(ui64 origin, const NColumnShard::TLocalPathId& localPathId, TString dedupId, const i64 writeId, ui32 status) {
279274
Record.SetOrigin(origin);
280275
Record.SetTxInitiator(0);
281276
Record.SetWriteId(writeId);
282-
Record.SetTableId(writeMeta.GetPathId().GetInternalPathIdValue());
283-
Record.SetDedupId(writeMeta.GetDedupId());
277+
Record.SetTableId(localPathId.GetLocalPathIdValue());
278+
Record.SetDedupId(dedupId);
284279
Record.SetStatus(status);
285280
}
286281

ydb/core/tx/columnshard/columnshard__scan.cpp

+21-3
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,29 @@ void TColumnShard::Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContex
3131
return;
3232
}
3333

34-
const auto pathId = TInternalPathId::FromInternalPathIdValue(record.GetLocalPathId());
35-
Counters.GetColumnTablesCounters()->GetPathIdCounter(pathId)->OnReadEvent();
34+
const auto localPathId = TLocalPathId::FromLocalPathIdValue(record.GetLocalPathId());
35+
const auto internalPathId = TablesManager.ResolveInternalPathId(localPathId);
36+
if (!internalPathId) {
37+
//TODO FIXME
38+
const auto& request = ev->Get()->Record;
39+
const TString table = request.GetTablePath();
40+
const ui32 scanGen = request.GetGeneration();
41+
const auto scanComputeActor = ev->Sender;
42+
43+
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(scanGen, TabletID());
44+
ev->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
45+
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST,
46+
TStringBuilder() << "table not found");
47+
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
48+
49+
ctx.Send(scanComputeActor, ev.Release());
50+
return;
51+
}
52+
53+
Counters.GetColumnTablesCounters()->GetPathIdCounter(*internalPathId)->OnReadEvent();
3654
ScanTxInFlight.insert({txId, TAppData::TimeProvider->Now()});
3755
Counters.GetTabletCounters()->SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size());
38-
Execute(new NOlap::NReader::TTxScan(this, ev), ctx);
56+
Execute(new NOlap::NReader::TTxScan(this, ev, *internalPathId), ctx);
3957
}
4058

4159
void TColumnShard::Handle(TEvColumnShard::TEvInternalScan::TPtr& ev, const TActorContext& ctx) {

ydb/core/tx/columnshard/columnshard__statistics.cpp

+9-1
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,18 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
285285
Send(ev->Sender, response.release(), 0, ev->Cookie);
286286
return;
287287
}
288+
auto internalPathId = TablesManager.ResolveInternalPathId(TLocalPathId::FromLocalPathIdValue(record.GetTable().GetPathId().GetLocalId()));
289+
if (!internalPathId) {
290+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "Statistic request for unknown LocalPathId");
291+
respRecord.SetStatus(NKikimrStat::TEvStatisticsResponse::STATUS_ERROR);
292+
293+
Send(ev->Sender, response.release(), 0, ev->Cookie);
294+
return;
295+
}
288296

289297
AFL_VERIFY(HasIndex());
290298
auto index = GetIndexAs<NOlap::TColumnEngineForLogs>();
291-
auto spg = index.GetGranuleOptional(TInternalPathId::FromInternalPathIdValue(record.GetTable().GetPathId().GetLocalId()));
299+
auto spg = index.GetGranuleOptional(*internalPathId);
292300
AFL_VERIFY(spg);
293301

294302
std::set<ui32> columnTagsRequested;

0 commit comments

Comments
 (0)