Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/core/disk_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,9 @@ struct THistogramRequestCounters
EPublishingPolicy::Repl,
HistCounterOptions};
TLowResCounter ConfirmBlobs{EPublishingPolicy::Repl, HistCounterOptions};
TLowResCounter DeleteUnconfirmedBlobs{
EPublishingPolicy::Repl,
HistCounterOptions};

// BlobStorage based with kind and size
TLowResCounter WriteBlob{
Expand All @@ -423,6 +426,7 @@ struct THistogramRequestCounters
MakeMeta<&THistogramRequestCounters::AddConfirmedBlobs>(),
MakeMeta<&THistogramRequestCounters::AddUnconfirmedBlobs>(),
MakeMeta<&THistogramRequestCounters::ConfirmBlobs>(),
MakeMeta<&THistogramRequestCounters::DeleteUnconfirmedBlobs>(),

MakeMeta<&THistogramRequestCounters::WriteBlob>(),
MakeMeta<&THistogramRequestCounters::ReadBlob>(),
Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@ STFUNC(TPartitionActor::StateWork)
IgnoreFunc(TEvPartitionPrivate::TEvFlushResponse);
IgnoreFunc(TEvPartitionCommonPrivate::TEvTrimFreshLogResponse);
IgnoreFunc(TEvPartitionPrivate::TEvAddConfirmedBlobsResponse);
IgnoreFunc(TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsResponse);

// Wakeup function should handle wakeup event taking into account that
// there is wakeup event scheduled during boot stage with
Expand Down Expand Up @@ -1094,6 +1095,7 @@ STFUNC(TPartitionActor::StateZombie)
IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildBlockCountResponse);
IgnoreFunc(TEvPartitionPrivate::TEvFlushResponse);
IgnoreFunc(TEvPartitionCommonPrivate::TEvTrimFreshLogResponse);
IgnoreFunc(TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsResponse);

IgnoreFunc(TEvHiveProxy::TEvReassignTabletResponse);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include "part_actor.h"

#include <cloud/blockstore/libs/kikimr/helpers.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/storage/core/public.h>

#include <cloud/storage/core/libs/tablet/blob_id.h>

#include <contrib/ydb/library/actors/core/actor_bootstrapped.h>
#include <contrib/ydb/library/actors/core/hfunc.h>

namespace NCloud::NBlockStore::NStorage::NPartition {

using namespace NActors;
using namespace NKikimr;
using namespace NKikimr::NTabletFlatExecutor;

////////////////////////////////////////////////////////////////////////////////

void TPartitionActor::HandleDeleteUnconfirmedBlobs(
const TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsRequest::TPtr& ev,
const TActorContext& ctx)
{
auto* msg = ev->Get();

auto requestInfo =
CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext);

TRequestScope timer(*requestInfo);

LWTRACK(
RequestReceived_Partition,
requestInfo->CallContext->LWOrbit,
"DeleteUnconfirmedBlobs",
requestInfo->CallContext->RequestId);

AddTransaction<TEvPartitionPrivate::TDeleteUnconfirmedBlobsMethod>(
*requestInfo);

ExecuteTx(
ctx,
CreateTx<TDeleteUnconfirmedBlobs>(
requestInfo,
msg->CommitId));
}

bool TPartitionActor::PrepareDeleteUnconfirmedBlobs(
const TActorContext& ctx,
TTransactionContext& tx,
TTxPartition::TDeleteUnconfirmedBlobs& args)
{
Y_UNUSED(ctx);
Y_UNUSED(tx);
Y_UNUSED(args);

return true;
}

void TPartitionActor::ExecuteDeleteUnconfirmedBlobs(
const TActorContext& ctx,
TTransactionContext& tx,
TTxPartition::TDeleteUnconfirmedBlobs& args)
{
Y_UNUSED(ctx);

TPartitionDatabase db(tx.DB);

State->DeleteUnconfirmedBlobs(db, args.CommitId);

State->GetGarbageQueue().ReleaseBarrier(args.CommitId);
State->GetCommitQueue().ReleaseBarrier(args.CommitId);
}

void TPartitionActor::CompleteDeleteUnconfirmedBlobs(
const TActorContext& ctx,
TTxPartition::TDeleteUnconfirmedBlobs& args)
{
TRequestScope timer(*args.RequestInfo);

auto response = std::make_unique<
TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsResponse>();
response->ExecCycles = args.RequestInfo->GetExecCycles();

LWTRACK(
ResponseSent_Partition,
args.RequestInfo->CallContext->LWOrbit,
"DeleteUnconfirmedBlobs",
args.RequestInfo->CallContext->RequestId);

NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
RemoveTransaction(*args.RequestInfo);

auto time =
CyclesToDurationSafe(args.RequestInfo->GetTotalCycles()).MicroSeconds();
PartCounters->RequestCounters.DeleteUnconfirmedBlobs.AddRequest(time);

ProcessCommitQueue(ctx);
}

} // namespace NCloud::NBlockStore::NStorage::NPartition
29 changes: 23 additions & 6 deletions cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "part_actor.h"

#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/common/iovector.h>
#include <cloud/blockstore/libs/diagnostics/block_digest.h>
#include <cloud/blockstore/libs/diagnostics/critical_events.h>
Expand Down Expand Up @@ -347,13 +348,29 @@ void TPartitionActor::HandleWriteBlocksCompleted(
ProfileLog->Write(std::move(record));
}

if (msg->AddingUnconfirmedBlobsRequested && !HasError(msg->GetError())) {
// blobs are confirmed, but AddBlobs request will be executed
// (for this commit) later
State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
Y_DEBUG_ABORT_UNLESS(msg->CollectGarbageBarrierAcquired);
if (msg->AddingUnconfirmedBlobsRequested) {
if (HasError(msg->GetError())) {
// blobs are obsolete, delete them directly
auto request = std::make_unique<
TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsRequest>(
MakeIntrusive<TCallContext>(CreateRequestId()),
commitId);
NCloud::Send(ctx, SelfId(), std::move(request));
} else {
// blobs are confirmed, but AddBlobs request will be executed
// (for this commit) later
State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
}
STORAGE_VERIFY(
msg->CollectGarbageBarrierAcquired,
TWellKnownEntityTypes::TABLET,
TabletID());
STORAGE_VERIFY(
!msg->TrimFreshLogBarrierAcquired,
TWellKnownEntityTypes::TABLET,
TabletID());
// commit & garbage queue barriers will be released when confirmed
// blobs are added
// blobs are added or when obsolete blobs are deleted
} else {
LOG_TRACE(
ctx,
Expand Down
21 changes: 21 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_events_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ using TFlushedCommitIds = TVector<TFlushedCommitId>;
xxx(PatchBlob, __VA_ARGS__) \
xxx(AddConfirmedBlobs, __VA_ARGS__) \
xxx(AddUnconfirmedBlobs, __VA_ARGS__) \
xxx(DeleteUnconfirmedBlobs, __VA_ARGS__) \
// BLOCKSTORE_PARTITION_REQUESTS_PRIVATE

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -698,6 +699,26 @@ struct TEvPartitionPrivate
ui64 ExecCycles = 0;
};

//
// DeleteUnconfirmedBlobs
//

struct TDeleteUnconfirmedBlobsRequest
{
ui64 CommitId = 0;

TDeleteUnconfirmedBlobsRequest() = default;

explicit TDeleteUnconfirmedBlobsRequest(ui64 commitId)
: CommitId(commitId)
{}
};

struct TDeleteUnconfirmedBlobsResponse
{
ui64 ExecCycles = 0;
};

//
// OperationCompleted
//
Expand Down
19 changes: 19 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,25 @@ void TPartitionState::WriteUnconfirmedBlob(
UnconfirmedBlobCount++;
}

void TPartitionState::DeleteUnconfirmedBlobs(
TPartitionDatabase& db,
ui64 commitId)
{
auto it = UnconfirmedBlobs.find(commitId);
if (it != UnconfirmedBlobs.end()) {
const auto& blobs = it->second;
for (const auto& blob: blobs) {
auto blobId = MakePartialBlobId(commitId, blob.UniqueId);
db.DeleteUnconfirmedBlob(blobId);
}

const auto blobCount = blobs.size();
UnconfirmedBlobs.erase(it);
Y_DEBUG_ABORT_UNLESS(UnconfirmedBlobCount >= blobCount);
UnconfirmedBlobCount -= blobCount;
}
}

void TPartitionState::ConfirmedBlobsAdded(
TPartitionDatabase& db,
ui64 commitId)
Expand Down
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,10 @@ class TPartitionState
ui64 commitId,
const TBlobToConfirm& blob);

void DeleteUnconfirmedBlobs(
TPartitionDatabase& db,
ui64 commitId);

void ConfirmedBlobsAdded(TPartitionDatabase& db, ui64 commitId);

void BlobsConfirmed(ui64 commitId, TVector<TBlobToConfirm> blobs);
Expand Down
23 changes: 23 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition {
xxx(ScanDiskBatch, __VA_ARGS__) \
xxx(AddUnconfirmedBlobs, __VA_ARGS__) \
xxx(ConfirmBlobs, __VA_ARGS__) \
xxx(DeleteUnconfirmedBlobs, __VA_ARGS__) \
xxx(LoadCompactionMapChunk, __VA_ARGS__) \
// BLOCKSTORE_PARTITION_TRANSACTIONS

Expand Down Expand Up @@ -678,6 +679,28 @@ struct TTxPartition
}
};

//
// DeleteUnconfirmedBlobs
//

struct TDeleteUnconfirmedBlobs
{
const TRequestInfoPtr RequestInfo;
const ui64 CommitId;

TDeleteUnconfirmedBlobs(
TRequestInfoPtr requestInfo,
ui64 commitId)
: RequestInfo(std::move(requestInfo))
, CommitId(commitId)
{}

void Clear()
{
// nothing to do
}
};

//
// CreateCheckpoint
//
Expand Down
Loading
Loading