Skip to content

Commit 52f7757

Browse files
committed
[Blobstore] delete stalled uncofirmed blobs
1 parent c3df175 commit 52f7757

File tree

9 files changed

+241
-5
lines changed

9 files changed

+241
-5
lines changed

cloud/blockstore/libs/storage/core/disk_counters.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,9 @@ struct THistogramRequestCounters
397397
EPublishingPolicy::Repl,
398398
HistCounterOptions};
399399
TLowResCounter ConfirmBlobs{EPublishingPolicy::Repl, HistCounterOptions};
400+
TLowResCounter DeleteStalledUnconfirmedBlobs{
401+
EPublishingPolicy::Repl,
402+
HistCounterOptions};
400403

401404
// BlobStorage based with kind and size
402405
TLowResCounter WriteBlob{
@@ -423,6 +426,7 @@ struct THistogramRequestCounters
423426
MakeMeta<&THistogramRequestCounters::AddConfirmedBlobs>(),
424427
MakeMeta<&THistogramRequestCounters::AddUnconfirmedBlobs>(),
425428
MakeMeta<&THistogramRequestCounters::ConfirmBlobs>(),
429+
MakeMeta<&THistogramRequestCounters::DeleteStalledUnconfirmedBlobs>(),
426430

427431
MakeMeta<&THistogramRequestCounters::WriteBlob>(),
428432
MakeMeta<&THistogramRequestCounters::ReadBlob>(),

cloud/blockstore/libs/storage/partition/part_actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ class TPartitionActor final
225225
void EnqueueProcessWriteQueueIfNeeded(const NActors::TActorContext& ctx);
226226
void EnqueueTrimFreshLogIfNeeded(const NActors::TActorContext& ctx);
227227
void EnqueueAddConfirmedBlobsIfNeeded(const NActors::TActorContext& ctx);
228+
void EnqueueDeleteStalledUnconfirmedBlobsIfNeeded(const NActors::TActorContext& ctx);
228229

229230
void UpdateStats(const NProto::TPartitionStats& update);
230231
void UpdateActorStats(const NActors::TActorContext& ctx);
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
#include "part_actor.h"
2+
3+
#include <cloud/blockstore/libs/kikimr/helpers.h>
4+
#include <cloud/blockstore/libs/service/request_helpers.h>
5+
#include <cloud/blockstore/libs/storage/core/public.h>
6+
7+
#include <cloud/storage/core/libs/common/alloc.h>
8+
#include <cloud/storage/core/libs/tablet/blob_id.h>
9+
10+
#include <contrib/ydb/library/actors/core/actor_bootstrapped.h>
11+
#include <contrib/ydb/library/actors/core/hfunc.h>
12+
13+
namespace NCloud::NBlockStore::NStorage::NPartition {
14+
15+
using namespace NActors;
16+
using namespace NKikimr;
17+
using namespace NKikimr::NTabletFlatExecutor;
18+
19+
////////////////////////////////////////////////////////////////////////////////
20+
21+
void TPartitionActor::EnqueueDeleteStalledUnconfirmedBlobsIfNeeded(
22+
const TActorContext& ctx)
23+
{
24+
if (State->GetStalledUnconfirmedBlobsState().Status !=
25+
EOperationStatus::Idle)
26+
{
27+
// already enqueued
28+
return;
29+
}
30+
31+
if (State->GetStalledUnconfirmedBlobs().empty()) {
32+
// not ready
33+
return;
34+
}
35+
36+
State->GetStalledUnconfirmedBlobsState().SetStatus(
37+
EOperationStatus::Enqueued);
38+
39+
auto request = std::make_unique<
40+
TEvPartitionPrivate::TEvDeleteStalledUnconfirmedBlobsRequest>(
41+
MakeIntrusive<TCallContext>(CreateRequestId()));
42+
43+
LOG_DEBUG(
44+
ctx,
45+
TBlockStoreComponents::PARTITION,
46+
"%s DeleteStalledUnconfirmedBlobs request sent: %lu",
47+
LogTitle.GetWithTime().c_str(),
48+
request->CallContext->RequestId);
49+
50+
NCloud::Send(ctx, SelfId(), std::move(request));
51+
}
52+
53+
void TPartitionActor::HandleDeleteStalledUnconfirmedBlobs(
54+
const TEvPartitionPrivate::TEvDeleteStalledUnconfirmedBlobsRequest::TPtr&
55+
ev,
56+
const TActorContext& ctx)
57+
{
58+
auto* msg = ev->Get();
59+
60+
auto requestInfo =
61+
CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext);
62+
63+
TRequestScope timer(*requestInfo);
64+
65+
LWTRACK(
66+
RequestReceived_Partition,
67+
requestInfo->CallContext->LWOrbit,
68+
"DeleteStalledUnconfirmedBlobs",
69+
requestInfo->CallContext->RequestId);
70+
71+
State->GetStalledUnconfirmedBlobsState().SetStatus(
72+
EOperationStatus::Started);
73+
74+
AddTransaction<TEvPartitionPrivate::TDeleteStalledUnconfirmedBlobsMethod>(
75+
*requestInfo);
76+
77+
ExecuteTx(ctx, CreateTx<TDeleteStalledUnconfirmedBlobs>(requestInfo));
78+
}
79+
80+
bool TPartitionActor::PrepareDeleteStalledUnconfirmedBlobs(
81+
const TActorContext& ctx,
82+
TTransactionContext& tx,
83+
TTxPartition::TDeleteStalledUnconfirmedBlobs& args)
84+
{
85+
Y_UNUSED(ctx);
86+
Y_UNUSED(tx);
87+
Y_UNUSED(args);
88+
89+
return true;
90+
}
91+
92+
void TPartitionActor::ExecuteDeleteStalledUnconfirmedBlobs(
93+
const TActorContext& ctx,
94+
TTransactionContext& tx,
95+
TTxPartition::TDeleteStalledUnconfirmedBlobs& args)
96+
{
97+
Y_UNUSED(ctx);
98+
Y_UNUSED(args);
99+
100+
TPartitionDatabase db(tx.DB);
101+
102+
const auto& stalledBlobs = State->GetStalledUnconfirmedBlobs();
103+
104+
for (const auto& [commitId, blobs]: stalledBlobs) {
105+
for (const auto& blob: blobs) {
106+
auto blobId = MakePartialBlobId(commitId, blob.UniqueId);
107+
db.DeleteUnconfirmedBlob(blobId);
108+
}
109+
110+
// Release barriers for this commitId
111+
State->GetGarbageQueue().ReleaseBarrier(commitId);
112+
State->GetCommitQueue().ReleaseBarrier(commitId);
113+
}
114+
115+
// Clear the stalled blobs structure
116+
State->ClearStalledUnconfirmedBlobs();
117+
}
118+
119+
void TPartitionActor::CompleteDeleteStalledUnconfirmedBlobs(
120+
const TActorContext& ctx,
121+
TTxPartition::TDeleteStalledUnconfirmedBlobs& args)
122+
{
123+
TRequestScope timer(*args.RequestInfo);
124+
125+
State->GetStalledUnconfirmedBlobsState().SetStatus(EOperationStatus::Idle);
126+
127+
RemoveTransaction(*args.RequestInfo);
128+
129+
UpdateCPUUsageStat(ctx.Now(), args.RequestInfo->GetExecCycles());
130+
auto time =
131+
CyclesToDurationSafe(args.RequestInfo->GetTotalCycles()).MicroSeconds();
132+
PartCounters->RequestCounters.DeleteStalledUnconfirmedBlobs.AddRequest(
133+
time);
134+
135+
// Barriers were released in Execute, so process commit queue
136+
ProcessCommitQueue(ctx);
137+
}
138+
139+
} // namespace NCloud::NBlockStore::NStorage::NPartition

cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -347,13 +347,18 @@ void TPartitionActor::HandleWriteBlocksCompleted(
347347
ProfileLog->Write(std::move(record));
348348
}
349349

350-
if (msg->UnconfirmedBlobsAdded && !HasError(msg->GetError())) {
351-
// blobs are confirmed, but AddBlobs request will be executed
352-
// (for this commit) later
353-
State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
350+
if (msg->UnconfirmedBlobsAdded) {
351+
if (!HasError(msg->GetError())) {
352+
// blobs are confirmed, but AddBlobs request will be executed
353+
// (for this commit) later
354+
State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
355+
} else {
356+
// blobs are stalled, so they need to be deleted
357+
State->BlobsStalled(commitId, std::move(msg->BlobsToConfirm));
358+
}
354359
Y_DEBUG_ABORT_UNLESS(msg->CollectGarbageBarrierAcquired);
355360
// commit & garbage queue barriers will be released when confirmed
356-
// blobs are added
361+
// blobs are added or when stalled blobs are deleted
357362
} else {
358363
LOG_TRACE(
359364
ctx,
@@ -387,6 +392,7 @@ void TPartitionActor::HandleWriteBlocksCompleted(
387392
ProcessCommitQueue(ctx);
388393
EnqueueFlushIfNeeded(ctx);
389394
EnqueueAddConfirmedBlobsIfNeeded(ctx);
395+
EnqueueDeleteStalledUnconfirmedBlobsIfNeeded(ctx);
390396
}
391397

392398
} // namespace NCloud::NBlockStore::NStorage::NPartition

cloud/blockstore/libs/storage/partition/part_events_private.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ using TFlushedCommitIds = TVector<TFlushedCommitId>;
179179
xxx(PatchBlob, __VA_ARGS__) \
180180
xxx(AddConfirmedBlobs, __VA_ARGS__) \
181181
xxx(AddUnconfirmedBlobs, __VA_ARGS__) \
182+
xxx(DeleteStalledUnconfirmedBlobs, __VA_ARGS__) \
182183
// BLOCKSTORE_PARTITION_REQUESTS_PRIVATE
183184

184185
////////////////////////////////////////////////////////////////////////////////
@@ -674,6 +675,18 @@ struct TEvPartitionPrivate
674675
{
675676
};
676677

678+
//
679+
// DeleteStalledUnconfirmedBlobs
680+
//
681+
682+
struct TDeleteStalledUnconfirmedBlobsRequest
683+
{
684+
};
685+
686+
struct TDeleteStalledUnconfirmedBlobsResponse
687+
{
688+
};
689+
677690
//
678691
// AddUnconfirmedBlobs
679692
//

cloud/blockstore/libs/storage/partition/part_state.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,23 @@ void TPartitionState::BlobsConfirmed(
760760
UnconfirmedBlobCount -= blobCount;
761761
}
762762

763+
void TPartitionState::BlobsStalled(ui64 commitId, TVector<TBlobToConfirm> blobs)
764+
{
765+
auto it = UnconfirmedBlobs.find(commitId);
766+
Y_DEBUG_ABORT_UNLESS(it != UnconfirmedBlobs.end());
767+
768+
auto& dstBlobs = it->second;
769+
const auto blobCount = dstBlobs.size();
770+
Y_DEBUG_ABORT_UNLESS(blobs.empty() || blobCount == blobs.size());
771+
772+
StalledUnconfirmedBlobs[commitId] = std::move(dstBlobs);
773+
StalledUnconfirmedBlobCount += blobCount;
774+
775+
UnconfirmedBlobs.erase(it);
776+
Y_DEBUG_ABORT_UNLESS(UnconfirmedBlobCount >= blobCount);
777+
UnconfirmedBlobCount -= blobCount;
778+
}
779+
763780
void TPartitionState::ConfirmBlobs(
764781
TPartitionDatabase& db,
765782
const TVector<TPartialBlobId>& unrecoverableBlobs)

cloud/blockstore/libs/storage/partition/part_state.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,6 +1331,10 @@ class TPartitionState
13311331
// not yet been added to the index
13321332
TCommitIdToBlobsToConfirm ConfirmedBlobs;
13331333
ui32 ConfirmedBlobCount = 0;
1334+
// contains entries from UnconfirmedBlobs that failed to write and need to
1335+
// be deleted
1336+
TCommitIdToBlobsToConfirm StalledUnconfirmedBlobs;
1337+
ui32 StalledUnconfirmedBlobCount = 0;
13341338

13351339
public:
13361340
const TCommitIdToBlobsToConfirm& GetUnconfirmedBlobs() const
@@ -1353,6 +1357,22 @@ class TPartitionState
13531357
return ConfirmedBlobCount;
13541358
}
13551359

1360+
const TCommitIdToBlobsToConfirm& GetStalledUnconfirmedBlobs() const
1361+
{
1362+
return StalledUnconfirmedBlobs;
1363+
}
1364+
1365+
ui32 GetStalledUnconfirmedBlobCount() const
1366+
{
1367+
return StalledUnconfirmedBlobCount;
1368+
}
1369+
1370+
void ClearStalledUnconfirmedBlobs()
1371+
{
1372+
StalledUnconfirmedBlobs.clear();
1373+
StalledUnconfirmedBlobCount = 0;
1374+
}
1375+
13561376
bool OverlapsUnconfirmedBlobs(
13571377
ui64 lowCommitId,
13581378
ui64 highCommitId,
@@ -1374,6 +1394,8 @@ class TPartitionState
13741394

13751395
void BlobsConfirmed(ui64 commitId, TVector<TBlobToConfirm> blobs);
13761396

1397+
void BlobsStalled(ui64 commitId, TVector<TBlobToConfirm> blobs);
1398+
13771399
//
13781400
// WriteBlob
13791401
//
@@ -1400,6 +1422,19 @@ class TPartitionState
14001422
return AddConfirmedBlobsState;
14011423
}
14021424

1425+
//
1426+
// StalledUnconfirmedBlobs
1427+
//
1428+
1429+
private:
1430+
TOperationState StalledUnconfirmedBlobsState;
1431+
1432+
public:
1433+
TOperationState& GetStalledUnconfirmedBlobsState()
1434+
{
1435+
return StalledUnconfirmedBlobsState;
1436+
}
1437+
14031438
//
14041439
// ConfirmBlobs
14051440
//

cloud/blockstore/libs/storage/partition/part_tx.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition {
6262
xxx(ScanDiskBatch, __VA_ARGS__) \
6363
xxx(AddUnconfirmedBlobs, __VA_ARGS__) \
6464
xxx(ConfirmBlobs, __VA_ARGS__) \
65+
xxx(DeleteStalledUnconfirmedBlobs, __VA_ARGS__) \
6566
xxx(LoadCompactionMapChunk, __VA_ARGS__) \
6667
// BLOCKSTORE_PARTITION_TRANSACTIONS
6768

@@ -678,6 +679,25 @@ struct TTxPartition
678679
}
679680
};
680681

682+
//
683+
// DeleteStalledUnconfirmedBlobs
684+
//
685+
686+
struct TDeleteStalledUnconfirmedBlobs
687+
{
688+
const TRequestInfoPtr RequestInfo;
689+
690+
TDeleteStalledUnconfirmedBlobs(
691+
TRequestInfoPtr requestInfo)
692+
: RequestInfo(std::move(requestInfo))
693+
{}
694+
695+
void Clear()
696+
{
697+
// nothing to do
698+
}
699+
};
700+
681701
//
682702
// CreateCheckpoint
683703
//

cloud/blockstore/libs/storage/partition/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ SRCS(
3434
part_actor_patchblob.cpp
3535
part_actor_readblob.cpp
3636
part_actor_readblocks.cpp
37+
part_actor_deletestalledunconfirmedblobs.cpp
3738
part_actor_scan_disk.cpp
3839
part_actor_statpartition.cpp
3940
part_actor_stats.cpp

0 commit comments

Comments
 (0)