Skip to content

Commit e26ec2c

Browse files
Merge pull request #18005 from Alek5andr-Kotov/LOGBROKER-9650b
Don't wait for TEvReadSetAck from non-existent tablets (#17913)
2 parents eb22692 + 842b11d commit e26ec2c

File tree

5 files changed

+133
-3
lines changed

5 files changed

+133
-3
lines changed

ydb/core/persqueue/pq_impl.cpp

+33-1
Original file line numberDiff line numberDiff line change
@@ -2893,9 +2893,41 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActo
28932893
return;
28942894
}
28952895

2896+
if (ev->Get()->Dead) {
2897+
AckReadSetsToTablet(ev->Get()->TabletId, ctx);
2898+
return;
2899+
}
2900+
28962901
RestartPipe(ev->Get()->TabletId, ctx);
28972902
}
28982903

2904+
void TPersQueue::AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx)
2905+
{
2906+
THashSet<TDistributedTransaction*> txs;
2907+
2908+
for (ui64 txId : GetBindedTxs(tabletId)) {
2909+
auto* tx = GetTransaction(ctx, txId);
2910+
if (!tx) {
2911+
continue;
2912+
}
2913+
2914+
tx->OnReadSetAck(tabletId);
2915+
tx->UnbindMsgsFromPipe(tabletId);
2916+
2917+
txs.insert(tx);
2918+
}
2919+
2920+
if (txs.empty()) {
2921+
return;
2922+
}
2923+
2924+
for (auto* tx : txs) {
2925+
TryExecuteTxs(ctx, *tx);
2926+
}
2927+
2928+
TryWriteTxs(ctx);
2929+
}
2930+
28992931
void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx)
29002932
{
29012933
PQ_LOG_D("Handle TEvTabletPipe::TEvClientDestroyed");
@@ -2911,7 +2943,7 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActo
29112943

29122944
void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
29132945
{
2914-
for (auto& txId: GetBindedTxs(tabletId)) {
2946+
for (ui64 txId : GetBindedTxs(tabletId)) {
29152947
auto* tx = GetTransaction(ctx, txId);
29162948
if (!tx) {
29172949
continue;

ydb/core/persqueue/pq_impl.h

+2
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
560560

561561
void AddPendingEvent(IEventHandle* ev);
562562
void ProcessPendingEvents();
563+
564+
void AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx);
563565
};
564566

565567

ydb/core/persqueue/transaction.cpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,13 @@ void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event
284284
Y_ABORT_UNLESS(event.HasStep() && (Step == event.GetStep()));
285285
Y_ABORT_UNLESS(event.HasTxId() && (TxId == event.GetTxId()));
286286

287-
if (PredicateRecipients.contains(event.GetTabletConsumer())) {
288-
PredicateRecipients[event.GetTabletConsumer()] = true;
287+
OnReadSetAck(event.GetTabletConsumer());
288+
}
289+
290+
void TDistributedTransaction::OnReadSetAck(ui64 tabletId)
291+
{
292+
if (PredicateRecipients.contains(tabletId)) {
293+
PredicateRecipients[tabletId] = true;
289294
++PredicateAcksCount;
290295

291296
PQ_LOG_D("Predicate acks " << PredicateAcksCount << "/" << PredicateRecipients.size());

ydb/core/persqueue/transaction.h

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ struct TDistributedTransaction {
3434
const TActorId& sender,
3535
std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack);
3636
void OnReadSetAck(const NKikimrTx::TEvReadSetAck& event);
37+
void OnReadSetAck(ui64 tabletId);
3738
void OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event);
3839

3940
using EDecision = NKikimrTx::TReadSetData::EDecision;

ydb/core/persqueue/ut/pqtablet_ut.cpp

+90
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
245245
void InterceptSaveTxState(TAutoPtr<IEventHandle>& event);
246246
void SendSaveTxState(TAutoPtr<IEventHandle>& event);
247247

248+
void WaitForTheTransactionToBeDeleted(ui64 txId);
249+
248250
//
249251
// TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
250252
//
@@ -1082,6 +1084,39 @@ void TPQTabletFixture::SendSaveTxState(TAutoPtr<IEventHandle>& event)
10821084
Ctx->Runtime->Send(event);
10831085
}
10841086

1087+
void TPQTabletFixture::WaitForTheTransactionToBeDeleted(ui64 txId)
1088+
{
1089+
const TString key = GetTxKey(txId);
1090+
1091+
for (size_t i = 0; i < 200; ++i) {
1092+
auto request = std::make_unique<TEvKeyValue::TEvRequest>();
1093+
request->Record.SetCookie(12345);
1094+
auto cmd = request->Record.AddCmdReadRange();
1095+
auto range = cmd->MutableRange();
1096+
range->SetFrom(key);
1097+
range->SetIncludeFrom(true);
1098+
range->SetTo(key);
1099+
range->SetIncludeTo(true);
1100+
cmd->SetIncludeData(false);
1101+
SendToPipe(Ctx->Edge, request.release());
1102+
1103+
auto response = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>();
1104+
UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
1105+
1106+
const auto& result = response->Record.GetReadRangeResult(0);
1107+
if (result.GetStatus() == static_cast<ui32>(NKikimrProto::OK)) {
1108+
Ctx->Runtime->SimulateSleep(TDuration::MilliSeconds(300));
1109+
continue;
1110+
}
1111+
1112+
if (result.GetStatus() == NKikimrProto::NODATA) {
1113+
return;
1114+
}
1115+
}
1116+
1117+
UNIT_FAIL("Too many attempts");
1118+
}
1119+
10851120
Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture)
10861121
{
10871122
TestParallelTransactions("consumer", "consumer");
@@ -2017,6 +2052,61 @@ Y_UNIT_TEST_F(TEvReadSet_Is_Not_Sent_Ahead_Of_Time, TPQTabletFixture)
20172052
WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId});
20182053
}
20192054

2055+
Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
2056+
{
2057+
const ui64 txId = 67890;
2058+
const ui64 mockTabletId = MakeTabletID(false, 22222);
2059+
2060+
// We are simulating a situation where the recipient of TEvReadSet has already completed a transaction
2061+
// and has been deleted.
2062+
//
2063+
// To do this, we "forget" the TEvReadSet from the PQ tablet and send TEvClientConnected with the Dead flag
2064+
// instead of TEvReadSetAck.
2065+
TTestActorRuntimeBase::TEventFilter prev;
2066+
auto filter = [&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) -> bool {
2067+
if (auto* msg = event->CastAsLocal<TEvTxProcessing::TEvReadSet>()) {
2068+
const auto& r = msg->Record;
2069+
if (r.GetTabletSource() == Ctx->TabletId) {
2070+
runtime.Send(event->Sender,
2071+
Ctx->Edge,
2072+
new TEvTabletPipe::TEvClientConnected(mockTabletId,
2073+
NKikimrProto::ERROR,
2074+
event->Sender,
2075+
TActorId(),
2076+
true,
2077+
true, // Dead
2078+
0));
2079+
return true;
2080+
}
2081+
}
2082+
return false;
2083+
};
2084+
prev = Ctx->Runtime->SetEventFilter(filter);
2085+
2086+
NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId);
2087+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
2088+
2089+
SendProposeTransactionRequest({.TxId=txId,
2090+
.Senders={mockTabletId}, .Receivers={mockTabletId},
2091+
.TxOps={
2092+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
2093+
}});
2094+
WaitProposeTransactionResponse({.TxId=txId,
2095+
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
2096+
2097+
SendPlanStep({.Step=100, .TxIds={txId}});
2098+
2099+
// We are sending a TEvReadSet so that the PQ tablet can complete the transaction.
2100+
tablet->SendReadSet(*Ctx->Runtime,
2101+
{.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
2102+
2103+
WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
2104+
2105+
// Instead of TEvReadSetAck, the PQ tablet will receive TEvClientConnected with the Dead flag. The transaction
2106+
// will switch from the WAIT_RS_AKS state to the DELETING state.
2107+
WaitForTheTransactionToBeDeleted(txId);
2108+
}
2109+
20202110
}
20212111

20222112
}

0 commit comments

Comments
 (0)