Skip to content

Commit 4834130

Browse files
ivanmorozov333 and ivanmorozov333 authored
remove deprecated snapshot (#17987)
Co-authored-by: ivanmorozov333 <[email protected]>
1 parent 1353f57 commit 4834130

13 files changed

+141
-39
lines changed

ydb/core/tx/columnshard/engines/db_wrapper.cpp

+8-11
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRe
6565
using IndexColumns = NColumnShard::Schema::IndexColumns;
6666
auto removeSnapshot = portion.GetRemoveSnapshotOptional();
6767
db.Table<IndexColumns>()
68-
.Key(0, 0, row.ColumnId, portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(),
69-
portion.GetPortionId(), row.Chunk)
68+
.Key(0, 0, row.ColumnId, 1, 1, portion.GetPortionId(), row.Chunk)
7069
.Update(NIceDb::TUpdate<IndexColumns::XPlanStep>(removeSnapshot ? removeSnapshot->GetPlanStep() : 0),
7170
NIceDb::TUpdate<IndexColumns::XTxId>(removeSnapshot ? removeSnapshot->GetTxId() : 0),
7271
NIceDb::TUpdate<IndexColumns::Blob>(portion.GetBlobId(row.GetBlobRange().GetBlobIdxVerified()).SerializeBinary()),
@@ -96,7 +95,7 @@ void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRe
9695
if (AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) {
9796
using IndexColumns = NColumnShard::Schema::IndexColumns;
9897
db.Table<IndexColumns>()
99-
.Key(0, 0, row.ColumnId, portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(),
98+
.Key(0, 0, row.ColumnId, 1, 1,
10099
portion.GetPortionId(), row.Chunk)
101100
.Delete();
102101
}
@@ -162,8 +161,6 @@ bool TDbWrapper::LoadPortions(const std::optional<TInternalPathId> pathId,
162161
portion->SetShardingVersion(rowset.template GetValue<IndexPortions::ShardingVersion>());
163162
}
164163
portion->SetRemoveSnapshot(rowset.template GetValue<IndexPortions::XPlanStep>(), rowset.template GetValue<IndexPortions::XTxId>());
165-
portion->SetMinSnapshotDeprecated(TSnapshot(
166-
rowset.template GetValue<IndexPortions::MinSnapshotPlanStep>(), rowset.template GetValue<IndexPortions::MinSnapshotTxId>()));
167164

168165
NKikimrTxColumnShard::TIndexPortionMeta metaProto;
169166
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>();
@@ -211,8 +208,8 @@ void TDbWrapper::EraseIndex(const TPortionInfo& portion, const TIndexChunk& row)
211208
db.Table<IndexIndexes>().Key(portion.GetPathId().GetRawValue(), portion.GetPortionId(), row.GetIndexId(), 0).Delete();
212209
}
213210

214-
bool TDbWrapper::LoadIndexes(
215-
const std::optional<TInternalPathId> pathId, const std::function<void(const TInternalPathId pathId, const ui64 portionId, TIndexChunkLoadContext&&)>& callback) {
211+
bool TDbWrapper::LoadIndexes(const std::optional<TInternalPathId> pathId,
212+
const std::function<void(const TInternalPathId pathId, const ui64 portionId, TIndexChunkLoadContext&&)>& callback) {
216213
NIceDb::TNiceDb db(Database);
217214
using IndexIndexes = NColumnShard::Schema::IndexIndexes;
218215
const auto pred = [&](auto& rowset) {
@@ -222,8 +219,8 @@ bool TDbWrapper::LoadIndexes(
222219

223220
while (!rowset.EndOfSet()) {
224221
NOlap::TIndexChunkLoadContext chunkLoadContext(rowset, DsGroupSelector);
225-
callback(TInternalPathId::FromRawValue(rowset.template GetValue<IndexIndexes::PathId>()), rowset.template GetValue<IndexIndexes::PortionId>(),
226-
std::move(chunkLoadContext));
222+
callback(TInternalPathId::FromRawValue(rowset.template GetValue<IndexIndexes::PathId>()),
223+
rowset.template GetValue<IndexIndexes::PortionId>(), std::move(chunkLoadContext));
227224

228225
if (!rowset.Next()) {
229226
return false;
@@ -263,8 +260,8 @@ TConclusion<THashMap<TInternalPathId, std::map<NOlap::TSnapshot, TGranuleShardin
263260
snapshot.DeserializeFromString(rowset.GetValue<Schema::ShardingInfo::Snapshot>()).Validate();
264261
NSharding::TGranuleShardingLogicContainer logic;
265262
logic.DeserializeFromString(rowset.GetValue<Schema::ShardingInfo::Logic>()).Validate();
266-
TGranuleShardingInfo gShardingInfo(
267-
logic, snapshot, rowset.GetValue<Schema::ShardingInfo::VersionId>(), TInternalPathId::FromRawValue(rowset.GetValue<Schema::ShardingInfo::PathId>()));
263+
TGranuleShardingInfo gShardingInfo(logic, snapshot, rowset.GetValue<Schema::ShardingInfo::VersionId>(),
264+
TInternalPathId::FromRawValue(rowset.GetValue<Schema::ShardingInfo::PathId>()));
268265
AFL_VERIFY(result[gShardingInfo.GetPathId()].emplace(gShardingInfo.GetSinceSnapshot(), gShardingInfo).second);
269266

270267
if (!rowset.Next()) {

ydb/core/tx/columnshard/engines/portions/compacted.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ void TCompactedPortionInfo::DoSaveMetaToDatabase(NIceDb::TNiceDb& db) const {
1717
NIceDb::TUpdate<IndexPortions::CommitTxId>(0), NIceDb::TUpdate<IndexPortions::InsertWriteId>(0),
1818
NIceDb::TUpdate<IndexPortions::XPlanStep>(removeSnapshot ? removeSnapshot->GetPlanStep() : 0),
1919
NIceDb::TUpdate<IndexPortions::XTxId>(removeSnapshot ? removeSnapshot->GetTxId() : 0),
20-
NIceDb::TUpdate<IndexPortions::MinSnapshotPlanStep>(GetMinSnapshotDeprecated().GetPlanStep()),
21-
NIceDb::TUpdate<IndexPortions::MinSnapshotTxId>(GetMinSnapshotDeprecated().GetTxId()),
20+
NIceDb::TUpdate<IndexPortions::MinSnapshotPlanStep>(1),
21+
NIceDb::TUpdate<IndexPortions::MinSnapshotTxId>(1),
2222
NIceDb::TUpdate<IndexPortions::Metadata>(metaProto.SerializeAsString()));
2323
}
2424

ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp

-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ std::shared_ptr<TPortionInfo> TPortionInfoConstructor::Build() {
2727
result->PathId = PathId;
2828
result->PortionId = GetPortionIdVerified();
2929

30-
AFL_VERIFY(MinSnapshotDeprecated);
31-
result->MinSnapshotDeprecated = *MinSnapshotDeprecated;
3230
if (RemoveSnapshot) {
3331
AFL_VERIFY(RemoveSnapshot->Valid());
3432
result->RemoveSnapshot = *RemoveSnapshot;

ydb/core/tx/columnshard/engines/portions/constructor_portion.h

-12
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ class TPortionInfoConstructor {
2222

2323
TPortionMetaConstructor MetaConstructor;
2424

25-
std::optional<TSnapshot> MinSnapshotDeprecated;
2625
std::optional<TSnapshot> RemoveSnapshot;
2726
std::optional<ui64> SchemaVersion;
2827
std::optional<ui64> ShardingVersion;
@@ -33,7 +32,6 @@ class TPortionInfoConstructor {
3332
TPortionInfoConstructor(TPortionInfo&& portion)
3433
: PathId(portion.GetPathId())
3534
, PortionId(portion.GetPortionId())
36-
, MinSnapshotDeprecated(portion.GetMinSnapshotDeprecated())
3735
, RemoveSnapshot(portion.GetRemoveSnapshotOptional())
3836
, SchemaVersion(portion.GetSchemaVersionVerified())
3937
, ShardingVersion(portion.GetShardingVersionOptional()) {
@@ -55,7 +53,6 @@ class TPortionInfoConstructor {
5553
TPortionInfoConstructor(const TPortionInfo& portion, const bool withMetadata, const bool withMetadataBlobs)
5654
: PathId(portion.GetPathId())
5755
, PortionId(portion.GetPortionId())
58-
, MinSnapshotDeprecated(portion.GetMinSnapshotDeprecated())
5956
, RemoveSnapshot(portion.GetRemoveSnapshotOptional())
6057
, SchemaVersion(portion.GetSchemaVersionVerified())
6158
, ShardingVersion(portion.GetShardingVersionOptional()) {
@@ -114,17 +111,8 @@ class TPortionInfoConstructor {
114111
AFL_VERIFY(PathId);
115112
}
116113

117-
const TSnapshot& GetMinSnapshotDeprecatedVerified() const {
118-
AFL_VERIFY(!!MinSnapshotDeprecated);
119-
return *MinSnapshotDeprecated;
120-
}
121-
122114
std::shared_ptr<ISnapshotSchema> GetSchema(const TVersionedIndex& index) const;
123115

124-
void SetMinSnapshotDeprecated(const TSnapshot& snap) {
125-
MinSnapshotDeprecated = snap;
126-
}
127-
128116
void SetSchemaVersion(const ui64 version) {
129117
AFL_VERIFY(version);
130118
SchemaVersion = version;

ydb/core/tx/columnshard/engines/portions/portion_info.cpp

-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@ TString TPortionInfo::DebugString(const bool withDetails) const {
2323
sb << "(portion_id:" << PortionId << ";"
2424
<< "path_id:" << PathId << ";records_count:" << GetRecordsCount()
2525
<< ";"
26-
"min_schema_snapshot:("
27-
<< MinSnapshotDeprecated.DebugString()
28-
<< ");"
2926
"schema_version:"
3027
<< SchemaVersion
3128
<< ";"

ydb/core/tx/columnshard/engines/portions/portion_info.h

-5
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ class TPortionInfo {
9090

9191
TInternalPathId PathId;
9292
ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId
93-
TSnapshot MinSnapshotDeprecated = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion}
9493
TSnapshot RemoveSnapshot = TSnapshot::Zero();
9594
ui64 SchemaVersion = 0;
9695
std::optional<ui64> ShardingVersion;
@@ -118,10 +117,6 @@ class TPortionInfo {
118117
virtual EPortionType GetPortionType() const = 0;
119118
virtual bool IsCommitted() const = 0;
120119

121-
const TSnapshot& GetMinSnapshotDeprecated() const {
122-
return MinSnapshotDeprecated;
123-
}
124-
125120
ui64 GetMemorySize() const {
126121
return sizeof(TPortionInfo) + Meta.GetMemorySize() - sizeof(TPortionMeta);
127122
}

ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ std::optional<TWritePortionInfoWithBlobsResult> TReadPortionInfoWithBlobs::SyncP
117117
}
118118

119119
TPortionAccessorConstructor constructor = TPortionAccessorConstructor::BuildForRewriteBlobs(source.PortionInfo.GetPortionInfo());
120-
constructor.MutablePortionConstructor().SetMinSnapshotDeprecated(TSnapshot(0, 0));
121120
constructor.MutablePortionConstructor().SetSchemaVersion(to->GetVersion());
122121
constructor.MutablePortionConstructor().MutableMeta().ResetTierName(targetTier);
123122

ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ TWritePortionInfoWithBlobsConstructor TWritePortionInfoWithBlobsConstructor::Bui
3939
return TPortionAccessorConstructor(std::make_unique<TCompactedPortionInfoConstructor>(granule));
4040
}
4141
}();
42-
constructor.MutablePortionConstructor().SetMinSnapshotDeprecated(TSnapshot(0, 0));
4342
constructor.MutablePortionConstructor().SetSchemaVersion(schemaVersion);
4443
return BuildByBlobs(std::move(chunks), inplaceChunks, std::move(constructor), operators);
4544
}

ydb/core/tx/columnshard/engines/portions/written.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ void TWrittenPortionInfo::DoSaveMetaToDatabase(NIceDb::TNiceDb& db) const {
2020
NIceDb::TUpdate<IndexPortions::InsertWriteId>((ui64)*InsertWriteId),
2121
NIceDb::TUpdate<IndexPortions::XPlanStep>(removeSnapshot ? removeSnapshot->GetPlanStep() : 0),
2222
NIceDb::TUpdate<IndexPortions::XTxId>(removeSnapshot ? removeSnapshot->GetTxId() : 0),
23-
NIceDb::TUpdate<IndexPortions::MinSnapshotPlanStep>(GetMinSnapshotDeprecated().GetPlanStep()),
24-
NIceDb::TUpdate<IndexPortions::MinSnapshotTxId>(GetMinSnapshotDeprecated().GetTxId()),
23+
NIceDb::TUpdate<IndexPortions::MinSnapshotPlanStep>(1),
24+
NIceDb::TUpdate<IndexPortions::MinSnapshotTxId>(1),
2525
NIceDb::TUpdate<IndexPortions::Metadata>(metaProto.SerializeAsString()));
2626
}
2727

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ enum class ENormalizerSequentialId : ui32 {
6969
DeprecatedRestoreV1Chunks_V1,
7070
RestoreV1Chunks_V2,
7171
RestoreV2Chunks,
72+
CleanDeprecatedSnapshot,
7273

7374
MAX
7475
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
#include "clean_deprecated_snapshot.h"
2+
3+
#include <ydb/core/protos/config.pb.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
6+
namespace NKikimr::NOlap::NCleanDeprecatedSnapshot {
7+
8+
std::optional<std::vector<TColumnChunkLoadContext>> GetChunksToRewrite(
9+
NTabletFlatExecutor::TTransactionContext& txc, NColumnShard::TBlobGroupSelector& dsGroupSelector) {
10+
using namespace NColumnShard;
11+
NIceDb::TNiceDb db(txc.DB);
12+
if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
13+
return std::nullopt;
14+
}
15+
16+
std::vector<TColumnChunkLoadContext> chunksToRewrite;
17+
auto rowset = db.Table<Schema::IndexColumns>().Select();
18+
if (!rowset.IsReady()) {
19+
return std::nullopt;
20+
}
21+
while (!rowset.EndOfSet()) {
22+
TColumnChunkLoadContext chunk(rowset, &dsGroupSelector);
23+
if (chunk.GetMinSnapshotDeprecated().GetPlanStep() != 1 || chunk.GetMinSnapshotDeprecated().GetTxId() != 1) {
24+
chunksToRewrite.emplace_back(chunk);
25+
}
26+
if (!rowset.Next()) {
27+
return std::nullopt;
28+
}
29+
}
30+
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("tasks_for_rewrite", chunksToRewrite.size());
31+
return chunksToRewrite;
32+
}
33+
34+
class TChanges: public INormalizerChanges {
35+
private:
36+
const std::vector<TColumnChunkLoadContext> Chunks;
37+
38+
public:
39+
TChanges(std::vector<TColumnChunkLoadContext>&& chunks)
40+
: Chunks(std::move(chunks)) {
41+
}
42+
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
43+
using namespace NColumnShard;
44+
NIceDb::TNiceDb db(txc.DB);
45+
for (const auto& i : Chunks) {
46+
db.Table<Schema::IndexColumns>()
47+
.Key(0, 0, i.GetAddress().GetColumnId(), i.GetMinSnapshotDeprecated().GetPlanStep(), i.GetMinSnapshotDeprecated().GetTxId(),
48+
i.GetPortionId(), i.GetAddress().GetChunkIdx())
49+
.Delete();
50+
db.Table<Schema::IndexColumns>()
51+
.Key(0, 0, i.GetAddress().GetColumnId(), 1, 1, i.GetPortionId(), i.GetAddress().GetChunkIdx())
52+
.Update(NIceDb::TUpdate<Schema::IndexColumns::XPlanStep>(i.GetRemoveSnapshot().GetPlanStep()),
53+
NIceDb::TUpdate<Schema::IndexColumns::XTxId>(i.GetRemoveSnapshot().GetTxId()),
54+
NIceDb::TUpdate<Schema::IndexColumns::Blob>(i.GetBlobRange().BlobId.SerializeBinary()),
55+
NIceDb::TUpdate<Schema::IndexColumns::Metadata>(i.GetMetaProto().SerializeAsString()),
56+
NIceDb::TUpdate<Schema::IndexColumns::Offset>(i.GetBlobRange().Offset),
57+
NIceDb::TUpdate<Schema::IndexColumns::Size>(i.GetBlobRange().Size),
58+
NIceDb::TUpdate<Schema::IndexColumns::PathId>(i.GetPathId().GetRawValue()));
59+
}
60+
ACFL_WARN("normalizer", "TCleanDeprecatedSnapshotNormalizer")("message", TStringBuilder() << Chunks.size() << " portions rewrited");
61+
return true;
62+
}
63+
64+
ui64 GetSize() const override {
65+
return Chunks.size();
66+
}
67+
};
68+
69+
TConclusion<std::vector<INormalizerTask::TPtr>> TCleanDeprecatedSnapshotNormalizer::DoInit(
70+
const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
71+
using namespace NColumnShard;
72+
auto batchesToDelete = GetChunksToRewrite(txc, DsGroupSelector);
73+
if (!batchesToDelete) {
74+
return TConclusionStatus::Fail("Not ready");
75+
}
76+
77+
std::vector<INormalizerTask::TPtr> result;
78+
std::vector<TColumnChunkLoadContext> chunks;
79+
for (auto&& b : *batchesToDelete) {
80+
chunks.emplace_back(b);
81+
if (chunks.size() == 1000) {
82+
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(chunks))));
83+
chunks.clear();
84+
}
85+
}
86+
if (chunks.size()) {
87+
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(chunks))));
88+
}
89+
return result;
90+
}
91+
92+
} // namespace NKikimr::NOlap::NCleanDeprecatedSnapshot
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
4+
5+
namespace NKikimr::NOlap::NCleanDeprecatedSnapshot {
6+
7+
class TCleanDeprecatedSnapshotNormalizer : public TNormalizationController::INormalizerComponent {
8+
private:
9+
using TBase = TNormalizationController::INormalizerComponent;
10+
static TString ClassName() {
11+
return "CleanDeprecatedSnapshot";
12+
}
13+
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TCleanDeprecatedSnapshotNormalizer>(ClassName());
14+
15+
NColumnShard::TBlobGroupSelector DsGroupSelector;
16+
17+
public:
18+
TCleanDeprecatedSnapshotNormalizer(const TNormalizationController::TInitContext& info)
19+
: TBase(info)
20+
, DsGroupSelector(info.GetStorageInfo()) {
21+
}
22+
23+
std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
24+
return ENormalizerSequentialId::CleanDeprecatedSnapshot;
25+
}
26+
27+
TString GetClassName() const override {
28+
return ClassName();
29+
}
30+
31+
TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
32+
};
33+
34+
} // namespace NKikimr::NOlap::NCleanDeprecatedSnapshot

ydb/core/tx/columnshard/normalizer/portion/ya.make

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ SRCS(
1212
GLOBAL restore_v1_chunks.cpp
1313
GLOBAL restore_v2_chunks.cpp
1414
GLOBAL leaked_blobs.cpp
15+
GLOBAL clean_deprecated_snapshot.cpp
16+
1517
)
1618

1719
PEERDIR(

0 commit comments

Comments
 (0)