Skip to content

Commit 7e9b044

Browse files
Merge pull request #18007 from Alek5andr-Kotov/LOGBROKER-9650d
Don't wait for TEvReadSetAck from non-existent tablets (#17913)
2 parents 54dc8dc + a5bdb5a commit 7e9b044

File tree

5 files changed

+133
-3
lines changed

5 files changed

+133
-3
lines changed

ydb/core/persqueue/pq_impl.cpp

+33-1
Original file line numberDiff line numberDiff line change
@@ -2902,9 +2902,41 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActo
29022902
return;
29032903
}
29042904

2905+
if (ev->Get()->Dead) {
2906+
AckReadSetsToTablet(ev->Get()->TabletId, ctx);
2907+
return;
2908+
}
2909+
29052910
RestartPipe(ev->Get()->TabletId, ctx);
29062911
}
29072912

2913+
void TPersQueue::AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx)
2914+
{
2915+
THashSet<TDistributedTransaction*> txs;
2916+
2917+
for (ui64 txId : GetBindedTxs(tabletId)) {
2918+
auto* tx = GetTransaction(ctx, txId);
2919+
if (!tx) {
2920+
continue;
2921+
}
2922+
2923+
tx->OnReadSetAck(tabletId);
2924+
tx->UnbindMsgsFromPipe(tabletId);
2925+
2926+
txs.insert(tx);
2927+
}
2928+
2929+
if (txs.empty()) {
2930+
return;
2931+
}
2932+
2933+
for (auto* tx : txs) {
2934+
TryExecuteTxs(ctx, *tx);
2935+
}
2936+
2937+
TryWriteTxs(ctx);
2938+
}
2939+
29082940
void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx)
29092941
{
29102942
PQ_LOG_D("Handle TEvTabletPipe::TEvClientDestroyed");
@@ -2917,7 +2949,7 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActo
29172949

29182950
void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
29192951
{
2920-
for (auto& txId: GetBindedTxs(tabletId)) {
2952+
for (ui64 txId : GetBindedTxs(tabletId)) {
29212953
auto* tx = GetTransaction(ctx, txId);
29222954
if (!tx) {
29232955
continue;

ydb/core/persqueue/pq_impl.h

+2
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
566566

567567
void AddPendingEvent(IEventHandle* ev);
568568
void ProcessPendingEvents();
569+
570+
void AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx);
569571
};
570572

571573

ydb/core/persqueue/transaction.cpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,13 @@ void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event
312312
Y_ABORT_UNLESS(event.HasStep() && (Step == event.GetStep()));
313313
Y_ABORT_UNLESS(event.HasTxId() && (TxId == event.GetTxId()));
314314

315-
if (PredicateRecipients.contains(event.GetTabletConsumer())) {
316-
PredicateRecipients[event.GetTabletConsumer()] = true;
315+
OnReadSetAck(event.GetTabletConsumer());
316+
}
317+
318+
void TDistributedTransaction::OnReadSetAck(ui64 tabletId)
319+
{
320+
if (PredicateRecipients.contains(tabletId)) {
321+
PredicateRecipients[tabletId] = true;
317322
++PredicateAcksCount;
318323

319324
PQ_LOG_D("Predicate acks " << PredicateAcksCount << "/" << PredicateRecipients.size());

ydb/core/persqueue/transaction.h

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ struct TDistributedTransaction {
3434
const TActorId& sender,
3535
std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack);
3636
void OnReadSetAck(const NKikimrTx::TEvReadSetAck& event);
37+
void OnReadSetAck(ui64 tabletId);
3738
void OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event);
3839

3940
using EDecision = NKikimrTx::TReadSetData::EDecision;

ydb/core/persqueue/ut/pqtablet_ut.cpp

+90
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
245245
void InterceptSaveTxState(TAutoPtr<IEventHandle>& event);
246246
void SendSaveTxState(TAutoPtr<IEventHandle>& event);
247247

248+
void WaitForTheTransactionToBeDeleted(ui64 txId);
249+
248250
//
249251
// TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
250252
//
@@ -1082,6 +1084,39 @@ void TPQTabletFixture::SendSaveTxState(TAutoPtr<IEventHandle>& event)
10821084
Ctx->Runtime->Send(event);
10831085
}
10841086

1087+
void TPQTabletFixture::WaitForTheTransactionToBeDeleted(ui64 txId)
1088+
{
1089+
const TString key = GetTxKey(txId);
1090+
1091+
for (size_t i = 0; i < 200; ++i) {
1092+
auto request = std::make_unique<TEvKeyValue::TEvRequest>();
1093+
request->Record.SetCookie(12345);
1094+
auto cmd = request->Record.AddCmdReadRange();
1095+
auto range = cmd->MutableRange();
1096+
range->SetFrom(key);
1097+
range->SetIncludeFrom(true);
1098+
range->SetTo(key);
1099+
range->SetIncludeTo(true);
1100+
cmd->SetIncludeData(false);
1101+
SendToPipe(Ctx->Edge, request.release());
1102+
1103+
auto response = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>();
1104+
UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
1105+
1106+
const auto& result = response->Record.GetReadRangeResult(0);
1107+
if (result.GetStatus() == static_cast<ui32>(NKikimrProto::OK)) {
1108+
Ctx->Runtime->SimulateSleep(TDuration::MilliSeconds(300));
1109+
continue;
1110+
}
1111+
1112+
if (result.GetStatus() == NKikimrProto::NODATA) {
1113+
return;
1114+
}
1115+
}
1116+
1117+
UNIT_FAIL("Too many attempts");
1118+
}
1119+
10851120
Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture)
10861121
{
10871122
TestParallelTransactions("consumer", "consumer");
@@ -2017,6 +2052,61 @@ Y_UNIT_TEST_F(TEvReadSet_Is_Not_Sent_Ahead_Of_Time, TPQTabletFixture)
20172052
WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId});
20182053
}
20192054

2055+
Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
2056+
{
2057+
const ui64 txId = 67890;
2058+
const ui64 mockTabletId = MakeTabletID(false, 22222);
2059+
2060+
// We are simulating a situation where the recipient of TEvReadSet has already completed a transaction
2061+
// and has been deleted.
2062+
//
2063+
// To do this, we "forget" the TEvReadSet from the PQ tablet and send TEvClientConnected with the Dead flag
2064+
// instead of TEvReadSetAck.
2065+
TTestActorRuntimeBase::TEventFilter prev;
2066+
auto filter = [&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) -> bool {
2067+
if (auto* msg = event->CastAsLocal<TEvTxProcessing::TEvReadSet>()) {
2068+
const auto& r = msg->Record;
2069+
if (r.GetTabletSource() == Ctx->TabletId) {
2070+
runtime.Send(event->Sender,
2071+
Ctx->Edge,
2072+
new TEvTabletPipe::TEvClientConnected(mockTabletId,
2073+
NKikimrProto::ERROR,
2074+
event->Sender,
2075+
TActorId(),
2076+
true,
2077+
true, // Dead
2078+
0));
2079+
return true;
2080+
}
2081+
}
2082+
return false;
2083+
};
2084+
prev = Ctx->Runtime->SetEventFilter(filter);
2085+
2086+
NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId);
2087+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
2088+
2089+
SendProposeTransactionRequest({.TxId=txId,
2090+
.Senders={mockTabletId}, .Receivers={mockTabletId},
2091+
.TxOps={
2092+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
2093+
}});
2094+
WaitProposeTransactionResponse({.TxId=txId,
2095+
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
2096+
2097+
SendPlanStep({.Step=100, .TxIds={txId}});
2098+
2099+
// We are sending a TEvReadSet so that the PQ tablet can complete the transaction.
2100+
tablet->SendReadSet(*Ctx->Runtime,
2101+
{.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
2102+
2103+
WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
2104+
2105+
// Instead of TEvReadSetAck, the PQ tablet will receive TEvClientConnected with the Dead flag. The transaction
2106+
// will switch from the WAIT_RS_AKS state to the DELETING state.
2107+
WaitForTheTransactionToBeDeleted(txId);
2108+
}
2109+
20202110
}
20212111

20222112
}

0 commit comments

Comments
 (0)