Skip to content

Refactor import s3 state proto so that it can contain not only checksum state, but other info too #16024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions ydb/core/backup/common/checksum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ class TSHA256 : public IChecksum {
return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH));
}

TChecksumState GetState() const override {
TChecksumState state;
void GetState(TS3DownloadState& state) const override {
auto& sha256State = *state.MutableSha256State();

for (ui32 h : Context.h) {
Expand All @@ -49,11 +48,9 @@ class TSHA256 : public IChecksum {
}
sha256State.SetNum(Context.num);
sha256State.SetMdLen(Context.md_len);

return state;
}

void Continue(const TChecksumState& state) override {
void Continue(const TS3DownloadState& state) override {
const auto& sha256State = state.GetSha256State();
SHA256_Init(&Context);
FillArrayFromProto(Context.h, sha256State.GetH());
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/backup/common/checksum.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace NKikimr::NBackup {

using NKikimrBackup::TChecksumState;
using NKikimrBackup::TS3DownloadState;

class IChecksum {
public:
Expand All @@ -17,8 +17,8 @@ class IChecksum {
virtual void AddData(TStringBuf data) = 0;
virtual TString Finalize() = 0;

virtual TChecksumState GetState() const = 0;
virtual void Continue(const TChecksumState& state) = 0;
virtual void GetState(TS3DownloadState& state) const = 0;
virtual void Continue(const TS3DownloadState& state) = 0;
};

IChecksum* CreateChecksum();
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/protos/checksum.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ message TSha256State {
optional uint32 MdLen = 6;
}

// Used to serialize the intermediate state of a checksum.
message TChecksumState {
oneof state {
// Used to serialize the intermediate state of S3 import process:
// - checksum calculation
// - encrypted file reader
message TS3DownloadState {
oneof ChecksumState {
TSha256State Sha256State = 1;
}
}
9 changes: 6 additions & 3 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,10 @@ class TDataShard
struct ProcessedBytes : Column<4, NScheme::NTypeIds::Uint64> {};
struct WrittenBytes : Column<5, NScheme::NTypeIds::Uint64> {};
struct WrittenRows : Column<6, NScheme::NTypeIds::Uint64> {};
struct ChecksumState : Column<7, NScheme::NTypeIds::String> { using Type = NKikimrBackup::TChecksumState; };
// Column that stores proto with state of particular S3 download.
// For historical reasons its name is ChecksumState,
// but it stores not only state of checksum calculation.
struct ChecksumState : Column<7, NScheme::NTypeIds::String> { using Type = NKikimrBackup::TS3DownloadState; };

using TKey = TableKey<TxId>;
using TColumns = TableColumns<
Expand Down Expand Up @@ -3391,14 +3394,14 @@ class TDataShard
}
for (const auto& pi : SysTablesPartOwners) {
ev->Record.AddSysTablesPartOwners(pi);
}
}
}

ev->Record.MutableTableStats()->SetImmediateTxCompleted(TabletCounters->Cumulative()[COUNTER_PREPARE_IMMEDIATE].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_IMMEDIATE].Get());
ev->Record.MutableTableStats()->SetPlannedTxCompleted(TabletCounters->Cumulative()[COUNTER_PLANNED_TX_COMPLETE].Get());
ev->Record.MutableTableStats()->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_PREPARE_OVERLOADED].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOADED].Get());
ev->Record.MutableTableStats()->SetTxRejectedBySpace(
TabletCounters->Cumulative()[COUNTER_PREPARE_OUT_OF_SPACE].Get()
TabletCounters->Cumulative()[COUNTER_PREPARE_OUT_OF_SPACE].Get()
+ TabletCounters->Cumulative()[COUNTER_PREPARE_DISK_SPACE_EXHAUSTED].Get()
+ TabletCounters->Cumulative()[COUNTER_WRITE_OUT_OF_SPACE].Get()
+ TabletCounters->Cumulative()[COUNTER_WRITE_DISK_SPACE_EXHAUSTED].Get()
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard_s3_download.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ struct TS3Download {
ui64 ProcessedBytes = 0;
ui64 WrittenBytes = 0;
ui64 WrittenRows = 0;
NKikimrBackup::TChecksumState ChecksumState;
NKikimrBackup::TS3DownloadState DownloadState;

void Out(IOutputStream& out) const {
out << "{"
<< " DataETag: " << DataETag
<< " ProcessedBytes: " << ProcessedBytes
<< " WrittenBytes: " << WrittenBytes
<< " WrittenRows: " << WrittenRows
<< " ChecksumState: " << ChecksumState.ShortDebugString()
<< " DownloadState: " << DownloadState.ShortDebugString()
<< " }";
}
};
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard_s3_downloads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ bool TS3DownloadsManager::Load(NIceDb::TNiceDb& db) {
info.WrittenRows = rowset.GetValueOrDefault<Schema::S3Downloads::WrittenRows>(0);

if (rowset.HaveValue<Schema::S3Downloads::ChecksumState>()) {
info.ChecksumState = rowset.GetValue<Schema::S3Downloads::ChecksumState>();
info.DownloadState = rowset.GetValue<Schema::S3Downloads::ChecksumState>();
}

if (!rowset.Next()) {
Expand Down Expand Up @@ -61,7 +61,7 @@ const TS3Download& TS3DownloadsManager::Store(NIceDb::TNiceDb& db, ui64 txId, co
NIceDb::TUpdate<Schema::S3Downloads::ProcessedBytes>(newInfo.ProcessedBytes),
NIceDb::TUpdate<Schema::S3Downloads::WrittenBytes>(newInfo.WrittenBytes),
NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(newInfo.WrittenRows),
NIceDb::TUpdate<Schema::S3Downloads::ChecksumState>(newInfo.ChecksumState)
NIceDb::TUpdate<Schema::S3Downloads::DownloadState>(newInfo.DownloadState)
);

return info;
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/tx/datashard/import_s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,12 +421,12 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
}
}

TChecksumState GetChecksumState() const {
TChecksumState checksumState;
TS3DownloadState GetDownloadState() const {
TS3DownloadState downloadState;
if (Checksum) {
checksumState = Checksum->GetState();
Checksum->GetState(downloadState);
}
return checksumState;
return downloadState;
}

void Handle(TEvDataShard::TEvS3DownloadInfo::TPtr& ev) {
Expand All @@ -435,7 +435,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
const auto& info = ev->Get()->Info;
if (!info.DataETag) {
Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(TxId, {
ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState()
ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetDownloadState()
}));
return;
}
Expand All @@ -456,7 +456,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
WrittenBytes = info.WrittenBytes;
WrittenRows = info.WrittenRows;
if (Checksum) {
Checksum->Continue(info.ChecksumState);
Checksum->Continue(info.DownloadState);
}

if (!ContentLength || ProcessedBytes >= ContentLength) {
Expand Down Expand Up @@ -615,7 +615,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
<< ", size# " << record->ByteSizeLong());

Send(DataShard, new TEvDataShard::TEvS3UploadRowsRequest(TxId, record, {
ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState()
ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetDownloadState()
}));
}

Expand Down
Loading