From 842b11d44b2507ed42cd758436fa6cc4886a0058 Mon Sep 17 00:00:00 2001 From: Alek5andr-Kotov Date: Mon, 5 May 2025 12:32:53 +0300 Subject: [PATCH] Don't wait for TEvReadSetAck from non-existent tablets (#17913) --- ydb/core/persqueue/pq_impl.cpp | 34 +++++++++- ydb/core/persqueue/pq_impl.h | 2 + ydb/core/persqueue/transaction.cpp | 9 ++- ydb/core/persqueue/transaction.h | 1 + ydb/core/persqueue/ut/pqtablet_ut.cpp | 90 +++++++++++++++++++++++++++ 5 files changed, 133 insertions(+), 3 deletions(-) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 67114c133bb1..b9f5365c2add 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -2893,9 +2893,41 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActo return; } + if (ev->Get()->Dead) { + AckReadSetsToTablet(ev->Get()->TabletId, ctx); + return; + } + RestartPipe(ev->Get()->TabletId, ctx); } +void TPersQueue::AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx) +{ + THashSet txs; + + for (ui64 txId : GetBindedTxs(tabletId)) { + auto* tx = GetTransaction(ctx, txId); + if (!tx) { + continue; + } + + tx->OnReadSetAck(tabletId); + tx->UnbindMsgsFromPipe(tabletId); + + txs.insert(tx); + } + + if (txs.empty()) { + return; + } + + for (auto* tx : txs) { + TryExecuteTxs(ctx, *tx); + } + + TryWriteTxs(ctx); +} + void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvTabletPipe::TEvClientDestroyed"); @@ -2911,7 +2943,7 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActo void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx) { - for (auto& txId: GetBindedTxs(tabletId)) { + for (ui64 txId : GetBindedTxs(tabletId)) { auto* tx = GetTransaction(ctx, txId); if (!tx) { continue; diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index b3d4559d79a1..2a03f48abab0 100644 --- a/ydb/core/persqueue/pq_impl.h +++ 
b/ydb/core/persqueue/pq_impl.h @@ -560,6 +560,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void AddPendingEvent(IEventHandle* ev); void ProcessPendingEvents(); + + void AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx); }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 7bdc7e891d01..af1ce5acfa7a 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -284,8 +284,13 @@ void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event Y_ABORT_UNLESS(event.HasStep() && (Step == event.GetStep())); Y_ABORT_UNLESS(event.HasTxId() && (TxId == event.GetTxId())); - if (PredicateRecipients.contains(event.GetTabletConsumer())) { - PredicateRecipients[event.GetTabletConsumer()] = true; + OnReadSetAck(event.GetTabletConsumer()); +} + +void TDistributedTransaction::OnReadSetAck(ui64 tabletId) +{ + if (PredicateRecipients.contains(tabletId)) { + PredicateRecipients[tabletId] = true; ++PredicateAcksCount; PQ_LOG_D("Predicate acks " << PredicateAcksCount << "/" << PredicateRecipients.size()); diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index d3cf152c9c24..a34baa75ddd1 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -34,6 +34,7 @@ struct TDistributedTransaction { const TActorId& sender, std::unique_ptr ack); void OnReadSetAck(const NKikimrTx::TEvReadSetAck& event); + void OnReadSetAck(ui64 tabletId); void OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event); using EDecision = NKikimrTx::TReadSetData::EDecision; diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 3769ad2a2390..3543b308214d 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -245,6 +245,8 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture { void InterceptSaveTxState(TAutoPtr& event); void SendSaveTxState(TAutoPtr& event); + void 
WaitForTheTransactionToBeDeleted(ui64 txId); + // // TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait // @@ -1082,6 +1084,39 @@ void TPQTabletFixture::SendSaveTxState(TAutoPtr& event) Ctx->Runtime->Send(event); } +void TPQTabletFixture::WaitForTheTransactionToBeDeleted(ui64 txId) +{ + const TString key = GetTxKey(txId); + + for (size_t i = 0; i < 200; ++i) { + auto request = std::make_unique(); + request->Record.SetCookie(12345); + auto cmd = request->Record.AddCmdReadRange(); + auto range = cmd->MutableRange(); + range->SetFrom(key); + range->SetIncludeFrom(true); + range->SetTo(key); + range->SetIncludeTo(true); + cmd->SetIncludeData(false); + SendToPipe(Ctx->Edge, request.release()); + + auto response = Ctx->Runtime->GrabEdgeEvent(); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); + + const auto& result = response->Record.GetReadRangeResult(0); + if (result.GetStatus() == static_cast(NKikimrProto::OK)) { + Ctx->Runtime->SimulateSleep(TDuration::MilliSeconds(300)); + continue; + } + + if (result.GetStatus() == NKikimrProto::NODATA) { + return; + } + } + + UNIT_FAIL("Too many attempts"); +} + Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture) { TestParallelTransactions("consumer", "consumer"); @@ -2017,6 +2052,61 @@ Y_UNIT_TEST_F(TEvReadSet_Is_Not_Sent_Ahead_Of_Time, TPQTabletFixture) WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId}); } +Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture) +{ + const ui64 txId = 67890; + const ui64 mockTabletId = MakeTabletID(false, 22222); + + // We are simulating a situation where the recipient of TEvReadSet has already completed a transaction + // and has been deleted. + // + // To do this, we "forget" the TEvReadSet from the PQ tablet and send TEvClientConnected with the Dead flag + // instead of TEvReadSetAck. 
+ TTestActorRuntimeBase::TEventFilter prev; + auto filter = [&](TTestActorRuntimeBase& runtime, TAutoPtr& event) -> bool { + if (auto* msg = event->CastAsLocal()) { + const auto& r = msg->Record; + if (r.GetTabletSource() == Ctx->TabletId) { + runtime.Send(event->Sender, + Ctx->Edge, + new TEvTabletPipe::TEvClientConnected(mockTabletId, + NKikimrProto::ERROR, + event->Sender, + TActorId(), + true, + true, // Dead + 0)); + return true; + } + } + return false; + }; + prev = Ctx->Runtime->SetEventFilter(filter); + + NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId); + PQTabletPrepare({.partitions=1}, {}, *Ctx); + + SendProposeTransactionRequest({.TxId=txId, + .Senders={mockTabletId}, .Receivers={mockTabletId}, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + }}); + WaitProposeTransactionResponse({.TxId=txId, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + SendPlanStep({.Step=100, .TxIds={txId}}); + + // We are sending a TEvReadSet so that the PQ tablet can complete the transaction. + tablet->SendReadSet(*Ctx->Runtime, + {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT}); + + WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); + + // Instead of TEvReadSetAck, the PQ tablet will receive TEvClientConnected with the Dead flag. The transaction + // will switch from the WAIT_RS_ACKS state to the DELETING state. + WaitForTheTransactionToBeDeleted(txId); +} + } }