diff --git a/cloud/blockstore/libs/storage/core/disk_counters.h b/cloud/blockstore/libs/storage/core/disk_counters.h
index c6c7ff668d6..54a148d67ec 100644
--- a/cloud/blockstore/libs/storage/core/disk_counters.h
+++ b/cloud/blockstore/libs/storage/core/disk_counters.h
@@ -397,6 +397,9 @@ struct THistogramRequestCounters
         EPublishingPolicy::Repl,
         HistCounterOptions};
     TLowResCounter ConfirmBlobs{EPublishingPolicy::Repl, HistCounterOptions};
+    TLowResCounter DeleteUnconfirmedBlobs{
+        EPublishingPolicy::Repl,
+        HistCounterOptions};
 
     // BlobStorage based with kind and size
     TLowResCounter WriteBlob{
@@ -423,6 +426,7 @@ struct THistogramRequestCounters
         MakeMeta<&THistogramRequestCounters::AddConfirmedBlobs>(),
         MakeMeta<&THistogramRequestCounters::AddUnconfirmedBlobs>(),
         MakeMeta<&THistogramRequestCounters::ConfirmBlobs>(),
+        MakeMeta<&THistogramRequestCounters::DeleteUnconfirmedBlobs>(),
 
         MakeMeta<&THistogramRequestCounters::WriteBlob>(),
         MakeMeta<&THistogramRequestCounters::ReadBlob>(),
diff --git a/cloud/blockstore/libs/storage/partition/part_actor.cpp b/cloud/blockstore/libs/storage/partition/part_actor.cpp
index 8371d23953a..ff3789f7fea 100644
--- a/cloud/blockstore/libs/storage/partition/part_actor.cpp
+++ b/cloud/blockstore/libs/storage/partition/part_actor.cpp
@@ -1033,6 +1033,7 @@ STFUNC(TPartitionActor::StateWork)
         IgnoreFunc(TEvPartitionPrivate::TEvFlushResponse);
         IgnoreFunc(TEvPartitionCommonPrivate::TEvTrimFreshLogResponse);
         IgnoreFunc(TEvPartitionPrivate::TEvAddConfirmedBlobsResponse);
+        IgnoreFunc(TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsResponse);
 
         // Wakeup function should handle wakeup event taking into account that
         // there is wakeup event scheduled during boot stage with
@@ -1094,6 +1095,7 @@ STFUNC(TPartitionActor::StateZombie)
         IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildBlockCountResponse);
         IgnoreFunc(TEvPartitionPrivate::TEvFlushResponse);
         IgnoreFunc(TEvPartitionCommonPrivate::TEvTrimFreshLogResponse);
+        IgnoreFunc(TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsResponse);
 
         IgnoreFunc(TEvHiveProxy::TEvReassignTabletResponse);
 
diff --git a/cloud/blockstore/libs/storage/partition/part_actor_deleteunconfirmedblobs.cpp b/cloud/blockstore/libs/storage/partition/part_actor_deleteunconfirmedblobs.cpp
new file mode 100644
index 00000000000..7c47079f478
--- /dev/null
+++ b/cloud/blockstore/libs/storage/partition/part_actor_deleteunconfirmedblobs.cpp
@@ -0,0 +1,104 @@
+#include "part_actor.h"
+
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+
+namespace NCloud::NBlockStore::NStorage::NPartition {
+
+using namespace NActors;
+using namespace NKikimr;
+using namespace NKikimr::NTabletFlatExecutor;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Starts a tablet transaction that deletes the unconfirmed blobs stored for
+// the commit id carried by the request.
+void TPartitionActor::HandleDeleteUnconfirmedBlobs(
+    const TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsRequest::TPtr& ev,
+    const TActorContext& ctx)
+{
+    auto* msg = ev->Get();
+
+    auto requestInfo =
+        CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext);
+
+    TRequestScope timer(*requestInfo);
+
+    LWTRACK(
+        RequestReceived_Partition,
+        requestInfo->CallContext->LWOrbit,
+        "DeleteUnconfirmedBlobs",
+        requestInfo->CallContext->RequestId);
+
+    AddTransaction(
+        *requestInfo);
+
+    ExecuteTx(
+        ctx,
+        CreateTx(
+            requestInfo,
+            msg->CommitId));
+}
+
+bool TPartitionActor::PrepareDeleteUnconfirmedBlobs(
+    const TActorContext& ctx,
+    TTransactionContext& tx,
+    TTxPartition::TDeleteUnconfirmedBlobs& args)
+{
+    Y_UNUSED(ctx);
+    Y_UNUSED(tx);
+    Y_UNUSED(args);
+
+    return true;
+}
+
+void TPartitionActor::ExecuteDeleteUnconfirmedBlobs(
+    const TActorContext& ctx,
+    TTransactionContext& tx,
+    TTxPartition::TDeleteUnconfirmedBlobs& args)
+{
+    Y_UNUSED(ctx);
+
+    TPartitionDatabase db(tx.DB);
+
+    State->DeleteUnconfirmedBlobs(db, args.CommitId);
+
+    // The write path keeps the commit & garbage queue barriers of this commit
+    // acquired until its obsolete blobs are deleted, so release them here.
+    State->GetGarbageQueue().ReleaseBarrier(args.CommitId);
+    State->GetCommitQueue().ReleaseBarrier(args.CommitId);
+}
+
+void TPartitionActor::CompleteDeleteUnconfirmedBlobs(
+    const TActorContext& ctx,
+    TTxPartition::TDeleteUnconfirmedBlobs& args)
+{
+    TRequestScope timer(*args.RequestInfo);
+
+    auto response = std::make_unique<
+        TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsResponse>();
+    response->ExecCycles = args.RequestInfo->GetExecCycles();
+
+    LWTRACK(
+        ResponseSent_Partition,
+        args.RequestInfo->CallContext->LWOrbit,
+        "DeleteUnconfirmedBlobs",
+        args.RequestInfo->CallContext->RequestId);
+
+    NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
+    RemoveTransaction(*args.RequestInfo);
+
+    auto time =
+        CyclesToDurationSafe(args.RequestInfo->GetTotalCycles()).MicroSeconds();
+    PartCounters->RequestCounters.DeleteUnconfirmedBlobs.AddRequest(time);
+
+    ProcessCommitQueue(ctx);
+}
+
+}   // namespace NCloud::NBlockStore::NStorage::NPartition
diff --git a/cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp b/cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp
index f1dc80ac491..7e4fd69643a 100644
--- a/cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp
+++ b/cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp
@@ -1,5 +1,6 @@
 #include "part_actor.h"
 
+#include
 #include
 #include
 #include
@@ -347,13 +348,29 @@ void TPartitionActor::HandleWriteBlocksCompleted(
         ProfileLog->Write(std::move(record));
     }
 
-    if (msg->AddingUnconfirmedBlobsRequested && !HasError(msg->GetError())) {
-        // blobs are confirmed, but AddBlobs request will be executed
-        // (for this commit) later
-        State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
-        Y_DEBUG_ABORT_UNLESS(msg->CollectGarbageBarrierAcquired);
+    if (msg->AddingUnconfirmedBlobsRequested) {
+        if (HasError(msg->GetError())) {
+            // blobs are obsolete, delete them directly
+            auto request = std::make_unique<
+                TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsRequest>(
+                MakeIntrusive(CreateRequestId()),
+                commitId);
+            NCloud::Send(ctx, SelfId(), std::move(request));
+        } else {
+            // blobs are confirmed, but AddBlobs request will be executed
+            // (for this commit) later
+            State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
+        }
+        STORAGE_VERIFY(
+            msg->CollectGarbageBarrierAcquired,
+            TWellKnownEntityTypes::TABLET,
+            TabletID());
+        STORAGE_VERIFY(
+            !msg->TrimFreshLogBarrierAcquired,
+            TWellKnownEntityTypes::TABLET,
+            TabletID());
         // commit & garbage queue barriers will be released when confirmed
-        // blobs are added
+        // blobs are added or when obsolete blobs are deleted
     } else {
         LOG_TRACE(
             ctx,
diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h
index 2697432c436..9430cf6d735 100644
--- a/cloud/blockstore/libs/storage/partition/part_events_private.h
+++ b/cloud/blockstore/libs/storage/partition/part_events_private.h
@@ -179,6 +179,7 @@ using TFlushedCommitIds = TVector;
     xxx(PatchBlob,                 __VA_ARGS__)                                \
     xxx(AddConfirmedBlobs,         __VA_ARGS__)                                \
     xxx(AddUnconfirmedBlobs,       __VA_ARGS__)                                \
+    xxx(DeleteUnconfirmedBlobs,    __VA_ARGS__)                                \
 // BLOCKSTORE_PARTITION_REQUESTS_PRIVATE
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -698,6 +699,26 @@ struct TEvPartitionPrivate
         ui64 ExecCycles = 0;
     };
 
+    //
+    // DeleteUnconfirmedBlobs
+    //
+
+    struct TDeleteUnconfirmedBlobsRequest
+    {
+        ui64 CommitId = 0;
+
+        TDeleteUnconfirmedBlobsRequest() = default;
+
+        explicit TDeleteUnconfirmedBlobsRequest(ui64 commitId)
+            : CommitId(commitId)
+        {}
+    };
+
+    struct TDeleteUnconfirmedBlobsResponse
+    {
+        ui64 ExecCycles = 0;
+    };
+
     //
     // OperationCompleted
     //
diff --git a/cloud/blockstore/libs/storage/partition/part_state.cpp b/cloud/blockstore/libs/storage/partition/part_state.cpp
index 019d101a619..707cfb02066 100644
--- a/cloud/blockstore/libs/storage/partition/part_state.cpp
+++ b/cloud/blockstore/libs/storage/partition/part_state.cpp
@@ -706,6 +706,25 @@ void TPartitionState::WriteUnconfirmedBlob(
     UnconfirmedBlobCount++;
 }
 
+void TPartitionState::DeleteUnconfirmedBlobs(
+    TPartitionDatabase& db,
+    ui64 commitId)
+{
+    auto it = UnconfirmedBlobs.find(commitId);
+    if (it != UnconfirmedBlobs.end()) {
+        const auto& blobs = it->second;
+        for (const auto& blob: blobs) {
+            auto blobId = MakePartialBlobId(commitId, blob.UniqueId);
+            db.DeleteUnconfirmedBlob(blobId);
+        }
+
+        const auto blobCount = blobs.size();
+        UnconfirmedBlobs.erase(it);
+        Y_DEBUG_ABORT_UNLESS(UnconfirmedBlobCount >= blobCount);
+        UnconfirmedBlobCount -= blobCount;
+    }
+}
+
 void TPartitionState::ConfirmedBlobsAdded(
     TPartitionDatabase& db,
     ui64 commitId)
diff --git a/cloud/blockstore/libs/storage/partition/part_state.h b/cloud/blockstore/libs/storage/partition/part_state.h
index e6c32cf59aa..2e25cdfde01 100644
--- a/cloud/blockstore/libs/storage/partition/part_state.h
+++ b/cloud/blockstore/libs/storage/partition/part_state.h
@@ -1370,6 +1370,10 @@ class TPartitionState
         ui64 commitId,
         const TBlobToConfirm& blob);
 
+    void DeleteUnconfirmedBlobs(
+        TPartitionDatabase& db,
+        ui64 commitId);
+
     void ConfirmedBlobsAdded(TPartitionDatabase& db, ui64 commitId);
 
     void BlobsConfirmed(ui64 commitId, TVector blobs);
diff --git a/cloud/blockstore/libs/storage/partition/part_tx.h b/cloud/blockstore/libs/storage/partition/part_tx.h
index b3c3f290c86..f72487d64b7 100644
--- a/cloud/blockstore/libs/storage/partition/part_tx.h
+++ b/cloud/blockstore/libs/storage/partition/part_tx.h
@@ -62,6 +62,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition {
     xxx(ScanDiskBatch,             __VA_ARGS__)                                \
     xxx(AddUnconfirmedBlobs,       __VA_ARGS__)                                \
     xxx(ConfirmBlobs,              __VA_ARGS__)                                \
+    xxx(DeleteUnconfirmedBlobs,    __VA_ARGS__)                                \
     xxx(LoadCompactionMapChunk,    __VA_ARGS__)                                \
 // BLOCKSTORE_PARTITION_TRANSACTIONS
 
@@ -678,6 +679,28 @@ struct TTxPartition
         }
     };
 
+    //
+    // DeleteUnconfirmedBlobs
+    //
+
+    struct TDeleteUnconfirmedBlobs
+    {
+        const TRequestInfoPtr RequestInfo;
+        const ui64 CommitId;
+
+        TDeleteUnconfirmedBlobs(
+                TRequestInfoPtr requestInfo,
+                ui64 commitId)
+            : RequestInfo(std::move(requestInfo))
+            , CommitId(commitId)
+        {}
+
+        void Clear()
+        {
+            // nothing to do
+        }
+    };
+
     //
     // CreateCheckpoint
     //
diff --git a/cloud/blockstore/libs/storage/partition/part_ut.cpp b/cloud/blockstore/libs/storage/partition/part_ut.cpp
index 6afdd6edb80..f27dc1b8784 100644
--- a/cloud/blockstore/libs/storage/partition/part_ut.cpp
+++ b/cloud/blockstore/libs/storage/partition/part_ut.cpp
@@ -158,67 +158,73 @@ class TDummyActor final
 
 ////////////////////////////////////////////////////////////////////////////////
 
-class TEventExecutionOrderController
+// Builds an event filter that postpones a dependent event until its
+// prerequisite event has been seen for the same recipient. The object must
+// outlive the filter returned by operator(), because that filter captures
+// `this`.
+class TEventExecutionOrderFilter
 {
 public:
-    TEventExecutionOrderController(
+    TEventExecutionOrderFilter(
         TTestActorRuntimeBase& runtime,
-        const TVector>& eventOrders,
-        TTestActorRuntimeBase::TEventFilter baseFilter = nullptr)
+        const TVector>& eventOrders)
+        : Runtime(runtime)
     {
         for (const auto& [prerequisiteEvent, dependentEvent]: eventOrders) {
             EventDependencies[dependentEvent] = prerequisiteEvent;
         }
+    }
 
-        runtime.SetEventFilter(
-            [this, baseFilter, &runtime](
-                TTestActorRuntimeBase& rt,
-                TAutoPtr& ev) -> bool
-            {
-                Y_ABORT_UNLESS(ev);
-
-                TActorId recipient = ev->GetRecipientRewrite();
-                ui32 eventType = ev->GetTypeRewrite();
-
-                bool baseFilterResult = baseFilter ? baseFilter(rt, ev) : false;
-
-                auto prerequisiteEventIt = EventDependencies.find(eventType);
-                if (prerequisiteEventIt != EventDependencies.end()) {
-                    ui32 prerequisiteEvent = prerequisiteEventIt->second;
-                    auto& processedEventsByRecipient =
-                        ProcessedEvents[recipient];
-                    if (!processedEventsByRecipient.contains(
-                            prerequisiteEvent)) {
-                        DelayedEvents[recipient][prerequisiteEvent] =
-                            std::move(ev);
-                        return true;
-                    }
-                }
+    TTestActorRuntimeBase::TEventFilter operator()(
+        TTestActorRuntimeBase::TEventFilter baseFilter = nullptr)
+    {
+        return [this, baseFilter](
+                   TTestActorRuntimeBase& rt,
+                   TAutoPtr& ev) -> bool
+        {
+            Y_ABORT_UNLESS(ev);
 
-                auto& delayedEventsByRecipient = DelayedEvents[recipient];
-                auto delayedEventIt = delayedEventsByRecipient.find(eventType);
-                if (delayedEventIt != delayedEventsByRecipient.end()) {
-                    ProcessedEvents[recipient].insert(eventType);
-                    TAutoPtr delayedEvent =
-                        std::move(delayedEventIt->second);
-                    delayedEventsByRecipient.erase(delayedEventIt);
-
-                    // Remove the recipient from the map if there are no more
-                    // delayed events
-                    if (delayedEventsByRecipient.empty()) {
-                        DelayedEvents.erase(recipient);
-                    }
+            TActorId recipient = ev->GetRecipientRewrite();
+            ui32 eventType = ev->GetTypeRewrite();
 
-                    runtime.Schedule(
-                        delayedEvent,
-                        TDuration::MilliSeconds(100));
+            bool baseFilterResult = baseFilter ? baseFilter(rt, ev) : false;
+
+            auto prerequisiteEventIt = EventDependencies.find(eventType);
+            if (prerequisiteEventIt != EventDependencies.end()) {
+                ui32 prerequisiteEvent = prerequisiteEventIt->second;
+                auto& processedEventsByRecipient = ProcessedEvents[recipient];
+                if (!processedEventsByRecipient.contains(prerequisiteEvent)) {
+                    DelayedEvents[recipient][prerequisiteEvent] = std::move(ev);
+                    return true;
                 }
+            }
 
-                return baseFilterResult;
-            });
+            // Record every event that gets here as processed, not only those
+            // that release a delayed one: otherwise a prerequisite arriving
+            // first is never recorded and its dependent is delayed forever.
+            ProcessedEvents[recipient].insert(eventType);
+
+            auto& delayedEventsByRecipient = DelayedEvents[recipient];
+            auto delayedEventIt = delayedEventsByRecipient.find(eventType);
+            if (delayedEventIt != delayedEventsByRecipient.end()) {
+                TAutoPtr delayedEvent =
+                    std::move(delayedEventIt->second);
+                delayedEventsByRecipient.erase(delayedEventIt);
+
+                // Remove the recipient from the map if there are no more
+                // delayed events
+                if (delayedEventsByRecipient.empty()) {
+                    DelayedEvents.erase(recipient);
+                }
+
+                Runtime.Schedule(delayedEvent, TDuration::MilliSeconds(100));
+            }
+
+            return baseFilterResult;
+        };
     }
 
 private:
+    TTestActorRuntimeBase& Runtime;
     THashMap EventDependencies;
     THashMap> ProcessedEvents;
     // Map of delayed events by recipient and event type
@@ -13107,12 +13113,12 @@ Y_UNIT_TEST_SUITE(TPartitionTest)
 
         // Set up event order controller to ensure that
         // AddUnconfirmedBlobsResponse is processed before WriteBlobResponse
-        TEventExecutionOrderController orderController(
+        TEventExecutionOrderFilter orderFilter(
             *runtime,
             TVector>{
                 {TEvPartitionPrivate::EvAddUnconfirmedBlobsResponse,
-                 TEvPartitionPrivate::EvWriteBlobResponse}},
-            rejectWriteBlobFilter);
+                 TEvPartitionPrivate::EvWriteBlobResponse}});
+        runtime->SetEventFilter(orderFilter(rejectWriteBlobFilter));
 
         TPartitionClient partition(*runtime);
         partition.WaitReady();
@@ -13124,11 +13130,8 @@ Y_UNIT_TEST_SUITE(TPartitionTest)
         {
             auto response = partition.StatPartition();
             const auto& stats = response->Record.GetStats();
-            // Without proper error handling it crashes in BlobsConfirmed due to
-            // checksum verification, so execution never reaches this point. If
-            // we disable verification, we hit the `1 != 0` check — meaning we
-            // end up confirming an E_REJECTED blob without a checksum.
-            UNIT_ASSERT_VALUES_EQUAL(1, stats.GetUnconfirmedBlobCount());
+            // In case of errors we just delete obsolete unconfirmed blobs
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetUnconfirmedBlobCount());
         }
     }
 
@@ -13201,7 +13204,7 @@ Y_UNIT_TEST_SUITE(TPartitionTest)
         {
             auto response = partition.StatPartition();
             const auto& stats = response->Record.GetStats();
-            UNIT_ASSERT_VALUES_EQUAL(2, stats.GetUnconfirmedBlobCount());
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetUnconfirmedBlobCount());
         }
 
         rejectWrite = false;
@@ -13220,6 +13223,102 @@ Y_UNIT_TEST_SUITE(TPartitionTest)
         UNIT_ASSERT(!barriers.empty());
     }
 
+    Y_UNIT_TEST(ShouldCleanupUnconfirmedBlobsAfterErrorOnWriteBlob)
+    {
+        auto config = DefaultConfig();
+        config.SetWriteBlobThreshold(1);
+        config.SetAddingUnconfirmedBlobsEnabled(true);
+        auto runtime = PrepareTestActorRuntime(
+            config,
+            4096,
+            {},
+            {.MediaKind = NCloud::NProto::STORAGE_MEDIA_HYBRID});
+
+        bool shouldRejectDeleteUnconfirmedBlobsRequest = true;
+        // Event filter that makes every WriteBlobResponse fail with E_REJECTED
+        // and, while the flag is set, drops DeleteUnconfirmedBlobsRequest
+        TTestActorRuntimeBase::TEventFilter rejectionFilter =
+            [&](TTestActorRuntimeBase&, TAutoPtr& ev)
+        {
+            switch (ev->GetTypeRewrite()) {
+                case TEvPartitionPrivate::EvWriteBlobResponse: {
+                    auto* msg =
+                        ev->Get();
+                    auto& e = const_cast(msg->Error);
+                    e.SetCode(E_REJECTED);
+                    return false;
+                }
+                case TEvPartitionPrivate::
+                    EvDeleteUnconfirmedBlobsRequest: {
+                    return shouldRejectDeleteUnconfirmedBlobsRequest;
+                }
+            }
+            return false;
+        };
+
+        // Set up event order filter to ensure that
+        // AddUnconfirmedBlobsResponse is processed before WriteBlobResponse
+        TEventExecutionOrderFilter addWriteBlobOrderFilter(
+            *runtime,
+            TVector>{
+                {TEvPartitionPrivate::EvAddUnconfirmedBlobsResponse,
+                 TEvPartitionPrivate::EvWriteBlobResponse}});
+        runtime->SetEventFilter(addWriteBlobOrderFilter(rejectionFilter));
+
+        TPartitionClient partition(*runtime);
+        partition.WaitReady();
+
+        partition.SendWriteBlocksRequest(TBlockRange32::WithLength(10, 1), 1);
+
+        runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(1, stats.GetUnconfirmedBlobCount());
+        }
+
+        // Set up event order filter to ensure that
+        // WriteBlobResponse is processed before AddUnconfirmedBlobsResponse
+        TEventExecutionOrderFilter writeAddBlobOrderFilter(
+            *runtime,
+            TVector>{
+                {TEvPartitionPrivate::EvWriteBlobResponse,
+                 TEvPartitionPrivate::EvAddUnconfirmedBlobsResponse}});
+        runtime->SetEventFilter(writeAddBlobOrderFilter(rejectionFilter));
+
+        partition.SendWriteBlocksRequest(TBlockRange32::WithLength(11, 1), 1);
+
+        runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(2, stats.GetUnconfirmedBlobCount());
+        }
+
+        partition.RebootTablet();
+        partition.WaitReady();
+
+        runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetUnconfirmedBlobCount());
+        }
+
+        shouldRejectDeleteUnconfirmedBlobsRequest = false;
+
+        partition.SendWriteBlocksRequest(TBlockRange32::WithLength(12, 1), 1);
+
+        runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetUnconfirmedBlobCount());
+        }
+    }
+
     Y_UNIT_TEST(ShouldSendPartitionStatistics)
     {
         auto config = DefaultConfig();
diff --git a/cloud/blockstore/libs/storage/partition/ya.make b/cloud/blockstore/libs/storage/partition/ya.make
index 64223a2fe58..6872e2dd7e4 100644
--- a/cloud/blockstore/libs/storage/partition/ya.make
+++ b/cloud/blockstore/libs/storage/partition/ya.make
@@ -16,6 +16,7 @@ SRCS(
     part_actor_compactrange.cpp
     part_actor_confirmblobs.cpp
     part_actor_deletegarbage.cpp
+    part_actor_deleteunconfirmedblobs.cpp
     part_actor_describeblocks.cpp
     part_actor_flush.cpp
     part_actor_getusedblocks.cpp