Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/core/disk_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,9 @@ struct THistogramRequestCounters
EPublishingPolicy::Repl,
HistCounterOptions};
TLowResCounter ConfirmBlobs{EPublishingPolicy::Repl, HistCounterOptions};
TLowResCounter DeleteObsoleteUnconfirmedBlobs{
EPublishingPolicy::Repl,
HistCounterOptions};

// BlobStorage based with kind and size
TLowResCounter WriteBlob{
Expand All @@ -423,6 +426,7 @@ struct THistogramRequestCounters
MakeMeta<&THistogramRequestCounters::AddConfirmedBlobs>(),
MakeMeta<&THistogramRequestCounters::AddUnconfirmedBlobs>(),
MakeMeta<&THistogramRequestCounters::ConfirmBlobs>(),
MakeMeta<&THistogramRequestCounters::DeleteObsoleteUnconfirmedBlobs>(),

MakeMeta<&THistogramRequestCounters::WriteBlob>(),
MakeMeta<&THistogramRequestCounters::ReadBlob>(),
Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@ STFUNC(TPartitionActor::StateWork)
IgnoreFunc(TEvPartitionPrivate::TEvFlushResponse);
IgnoreFunc(TEvPartitionCommonPrivate::TEvTrimFreshLogResponse);
IgnoreFunc(TEvPartitionPrivate::TEvAddConfirmedBlobsResponse);
IgnoreFunc(TEvPartitionPrivate::TEvDeleteObsoleteUnconfirmedBlobsResponse);

// Wakeup function should handle wakeup event taking into account that
// there is wakeup event scheduled during boot stage with
Expand Down Expand Up @@ -1094,6 +1095,7 @@ STFUNC(TPartitionActor::StateZombie)
IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildBlockCountResponse);
IgnoreFunc(TEvPartitionPrivate::TEvFlushResponse);
IgnoreFunc(TEvPartitionCommonPrivate::TEvTrimFreshLogResponse);
IgnoreFunc(TEvPartitionPrivate::TEvDeleteObsoleteUnconfirmedBlobsResponse);

IgnoreFunc(TEvHiveProxy::TEvReassignTabletResponse);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#include "part_actor.h"

#include <cloud/blockstore/libs/kikimr/helpers.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/storage/core/public.h>

#include <cloud/storage/core/libs/tablet/blob_id.h>

#include <contrib/ydb/library/actors/core/actor_bootstrapped.h>
#include <contrib/ydb/library/actors/core/hfunc.h>

namespace NCloud::NBlockStore::NStorage::NPartition {

using namespace NActors;
using namespace NKikimr;
using namespace NKikimr::NTabletFlatExecutor;

////////////////////////////////////////////////////////////////////////////////

void TPartitionActor::HandleDeleteObsoleteUnconfirmedBlobs(
const TEvPartitionPrivate::TEvDeleteObsoleteUnconfirmedBlobsRequest::TPtr&
ev,
const TActorContext& ctx)
{
auto* msg = ev->Get();

auto requestInfo =
CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext);

TRequestScope timer(*requestInfo);

LWTRACK(
RequestReceived_Partition,
requestInfo->CallContext->LWOrbit,
"DeleteObsoleteUnconfirmedBlobs",
requestInfo->CallContext->RequestId);

AddTransaction<TEvPartitionPrivate::TDeleteObsoleteUnconfirmedBlobsMethod>(
*requestInfo);

ExecuteTx(
ctx,
CreateTx<TDeleteObsoleteUnconfirmedBlobs>(
requestInfo,
msg->CommitId,
std::move(msg->Blobs)));
}

bool TPartitionActor::PrepareDeleteObsoleteUnconfirmedBlobs(
const TActorContext& ctx,
TTransactionContext& tx,
TTxPartition::TDeleteObsoleteUnconfirmedBlobs& args)
{
Y_UNUSED(ctx);
Y_UNUSED(tx);
Y_UNUSED(args);

return true;
}

void TPartitionActor::ExecuteDeleteObsoleteUnconfirmedBlobs(
const TActorContext& ctx,
TTransactionContext& tx,
TTxPartition::TDeleteObsoleteUnconfirmedBlobs& args)
{
Y_UNUSED(ctx);

TPartitionDatabase db(tx.DB);

State->DeleteUnconfirmedBlobs(db, args.CommitId, args.Blobs);

State->GetGarbageQueue().ReleaseBarrier(args.CommitId);
State->GetCommitQueue().ReleaseBarrier(args.CommitId);
}

void TPartitionActor::CompleteDeleteObsoleteUnconfirmedBlobs(
const TActorContext& ctx,
TTxPartition::TDeleteObsoleteUnconfirmedBlobs& args)
{
TRequestScope timer(*args.RequestInfo);

auto response = std::make_unique<
TEvPartitionPrivate::TEvDeleteObsoleteUnconfirmedBlobsResponse>();
response->ExecCycles = args.RequestInfo->GetExecCycles();

LWTRACK(
ResponseSent_Partition,
args.RequestInfo->CallContext->LWOrbit,
"DeleteObsoleteUnconfirmedBlobs",
args.RequestInfo->CallContext->RequestId);

NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
RemoveTransaction(*args.RequestInfo);

auto time =
CyclesToDurationSafe(args.RequestInfo->GetTotalCycles()).MicroSeconds();
PartCounters->RequestCounters.DeleteObsoleteUnconfirmedBlobs.AddRequest(
time);

ProcessCommitQueue(ctx);
}

} // namespace NCloud::NBlockStore::NStorage::NPartition
21 changes: 16 additions & 5 deletions cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "part_actor.h"

#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/common/iovector.h>
#include <cloud/blockstore/libs/diagnostics/block_digest.h>
#include <cloud/blockstore/libs/diagnostics/critical_events.h>
Expand Down Expand Up @@ -347,13 +348,23 @@ void TPartitionActor::HandleWriteBlocksCompleted(
ProfileLog->Write(std::move(record));
}

if (msg->AddingUnconfirmedBlobsRequested && !HasError(msg->GetError())) {
// blobs are confirmed, but AddBlobs request will be executed
// (for this commit) later
State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
if (msg->AddingUnconfirmedBlobsRequested) {
if (HasError(msg->GetError())) {
// blobs are obsolete, delete them directly
auto request = std::make_unique<
TEvPartitionPrivate::TEvDeleteObsoleteUnconfirmedBlobsRequest>(
MakeIntrusive<TCallContext>(CreateRequestId()),
commitId,
std::move(msg->BlobsToConfirm));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

может просто удалим все блобы соответствующего коммита?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

по идее это должно быть редкое событие, наверное можно прям тут ExecuteTx, переделаю

Copy link
Collaborator

@SvartMetal SvartMetal Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

я имел ввиду, не нужно ли удалить все блобы соответствующего коммита, то есть, может быть не нужно передавать BlobsToConfirm?

кажется стоит так делать ради атомарности

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

да, так будет более правильно, но в данном случае результат будет всегда тот же самый, поправил

NCloud::Send(ctx, SelfId(), std::move(request));
} else {
// blobs are confirmed, but AddBlobs request will be executed
// (for this commit) later
State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
}
Y_DEBUG_ABORT_UNLESS(msg->CollectGarbageBarrierAcquired);
// commit & garbage queue barriers will be released when confirmed
// blobs are added
// blobs are added or when obsolete blobs are deleted
} else {
LOG_TRACE(
ctx,
Expand Down
25 changes: 25 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_events_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ using TFlushedCommitIds = TVector<TFlushedCommitId>;
xxx(PatchBlob, __VA_ARGS__) \
xxx(AddConfirmedBlobs, __VA_ARGS__) \
xxx(AddUnconfirmedBlobs, __VA_ARGS__) \
xxx(DeleteObsoleteUnconfirmedBlobs, __VA_ARGS__) \
// BLOCKSTORE_PARTITION_REQUESTS_PRIVATE

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -674,6 +675,30 @@ struct TEvPartitionPrivate
{
};

//
// DeleteObsoleteUnconfirmedBlobs
//

struct TDeleteObsoleteUnconfirmedBlobsRequest
{
ui64 CommitId = 0;
TVector<TBlobToConfirm> Blobs;

TDeleteObsoleteUnconfirmedBlobsRequest() = default;

TDeleteObsoleteUnconfirmedBlobsRequest(
ui64 commitId,
TVector<TBlobToConfirm> blobs)
: CommitId(commitId)
, Blobs(std::move(blobs))
{}
};

struct TDeleteObsoleteUnconfirmedBlobsResponse
{
ui64 ExecCycles = 0;
};

//
// AddUnconfirmedBlobs
//
Expand Down
20 changes: 20 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,25 @@ void TPartitionState::WriteUnconfirmedBlob(
UnconfirmedBlobCount++;
}

void TPartitionState::DeleteUnconfirmedBlobs(
TPartitionDatabase& db,
ui64 commitId,
const TVector<TBlobToConfirm>& blobs)
{
for (const auto& blob: blobs) {
auto blobId = MakePartialBlobId(commitId, blob.UniqueId);
db.DeleteUnconfirmedBlob(blobId);
}

auto it = UnconfirmedBlobs.find(commitId);
if (it != UnconfirmedBlobs.end()) {
const auto blobCount = it->second.size();
UnconfirmedBlobs.erase(it);
Y_DEBUG_ABORT_UNLESS(UnconfirmedBlobCount >= blobCount);
UnconfirmedBlobCount -= blobCount;
}
}

void TPartitionState::ConfirmedBlobsAdded(
TPartitionDatabase& db,
ui64 commitId)
Expand Down Expand Up @@ -760,6 +779,7 @@ void TPartitionState::BlobsConfirmed(
UnconfirmedBlobCount -= blobCount;
}


void TPartitionState::ConfirmBlobs(
TPartitionDatabase& db,
const TVector<TPartialBlobId>& unrecoverableBlobs)
Expand Down
5 changes: 5 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,11 @@ class TPartitionState
ui64 commitId,
const TBlobToConfirm& blob);

void DeleteUnconfirmedBlobs(
TPartitionDatabase& db,
ui64 commitId,
const TVector<TBlobToConfirm>& blobs);

void ConfirmedBlobsAdded(TPartitionDatabase& db, ui64 commitId);

void BlobsConfirmed(ui64 commitId, TVector<TBlobToConfirm> blobs);
Expand Down
26 changes: 26 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition {
xxx(ScanDiskBatch, __VA_ARGS__) \
xxx(AddUnconfirmedBlobs, __VA_ARGS__) \
xxx(ConfirmBlobs, __VA_ARGS__) \
xxx(DeleteObsoleteUnconfirmedBlobs, __VA_ARGS__) \
xxx(LoadCompactionMapChunk, __VA_ARGS__) \
// BLOCKSTORE_PARTITION_TRANSACTIONS

Expand Down Expand Up @@ -678,6 +679,31 @@ struct TTxPartition
}
};

//
// DeleteObsoleteUnconfirmedBlobs
//

struct TDeleteObsoleteUnconfirmedBlobs
{
const TRequestInfoPtr RequestInfo;
const ui64 CommitId;
const TVector<TBlobToConfirm> Blobs;

TDeleteObsoleteUnconfirmedBlobs(
TRequestInfoPtr requestInfo,
ui64 commitId,
TVector<TBlobToConfirm> blobs)
: RequestInfo(std::move(requestInfo))
, CommitId(commitId)
, Blobs(std::move(blobs))
{}

void Clear()
{
// nothing to do
}
};

//
// CreateCheckpoint
//
Expand Down
Loading
Loading