Skip to content

Commit 54dc8dc

Browse files
The PQ tablet loses its TEvReadSet (#17842) (#17852)
1 parent c6401b7 commit 54dc8dc

File tree

3 files changed

+156
-18
lines changed

3 files changed

+156
-18
lines changed

ydb/core/persqueue/pq_impl.cpp

+52-10
Original file line numberDiff line numberDiff line change
@@ -3139,6 +3139,11 @@ void TPersQueue::SetTxInFlyCounter()
31393139

31403140
void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx)
31413141
{
3142+
if (!InitCompleted) {
3143+
AddPendingEvent(ev.Release());
3144+
return;
3145+
}
3146+
31423147
PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal");
31433148

31443149
NKikimrPQ::TEvCancelTransactionProposal& event = ev->Get()->Record;
@@ -3155,6 +3160,11 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co
31553160

31563161
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
31573162
{
3163+
if (!InitCompleted) {
3164+
AddPendingEvent(ev.Release());
3165+
return;
3166+
}
3167+
31583168
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
31593169
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << event.ShortDebugString());
31603170

@@ -3334,6 +3344,11 @@ void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransa
33343344

33353345
void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx)
33363346
{
3347+
if (!InitCompleted) {
3348+
AddPendingEvent(ev.Release());
3349+
return;
3350+
}
3351+
33373352
PQ_LOG_D("Handle TEvTxProcessing::TEvPlanStep " << ev->Get()->Record.ShortDebugString());
33383353

33393354
EvPlanStepQueue.emplace_back(ev->Sender, ev->Release().Release());
@@ -3343,6 +3358,11 @@ void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorCont
33433358

33443359
void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorContext& ctx)
33453360
{
3361+
if (!InitCompleted) {
3362+
AddPendingEvent(ev.Release());
3363+
return;
3364+
}
3365+
33463366
PQ_LOG_D("Handle TEvTxProcessing::TEvReadSet " << ev->Get()->Record.ShortDebugString());
33473367

33483368
NKikimrTx::TEvReadSet& event = ev->Get()->Record;
@@ -3354,7 +3374,8 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte
33543374
}
33553375

33563376
if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->PredicatesReceived.contains(event.GetTabletProducer())) {
3357-
if (tx->State >= NKikimrPQ::TTransaction::EXECUTED) {
3377+
if ((tx->State > NKikimrPQ::TTransaction::EXECUTED) ||
3378+
((tx->State == NKikimrPQ::TTransaction::EXECUTED) && !tx->WriteInProgress)) {
33583379
if (ack) {
33593380
PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer());
33603381
ctx.Send(ev->Sender, ack.release());
@@ -3731,15 +3752,17 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
37313752
void TPersQueue::ProcessWriteTxs(const TActorContext& ctx,
37323753
NKikimrClient::TKeyValueRequest& request)
37333754
{
3734-
Y_ABORT_UNLESS(!WriteTxsInProgress);
3755+
Y_ABORT_UNLESS(!WriteTxsInProgress, "PQ %" PRIu64, TabletID());
37353756

37363757
for (auto& [txId, state] : WriteTxs) {
3758+
// There may be cases when in one iteration of a record we change the state of a transaction and delete it
37373759
auto tx = GetTransaction(ctx, txId);
3738-
Y_ABORT_UNLESS(tx);
3739-
3740-
tx->AddCmdWrite(request, state);
3760+
if (tx) {
3761+
PQ_LOG_D("write key for TxId " << txId);
3762+
tx->AddCmdWrite(request, state);
37413763

3742-
ChangedTxs.emplace(tx->Step, txId);
3764+
ChangedTxs.emplace(tx->Step, txId);
3765+
}
37433766
}
37443767

37453768
WriteTxs.clear();
@@ -3748,9 +3771,7 @@ void TPersQueue::ProcessWriteTxs(const TActorContext& ctx,
37483771
void TPersQueue::ProcessDeleteTxs(const TActorContext& ctx,
37493772
NKikimrClient::TKeyValueRequest& request)
37503773
{
3751-
Y_ABORT_UNLESS(!WriteTxsInProgress,
3752-
"PQ %" PRIu64,
3753-
TabletID());
3774+
Y_ABORT_UNLESS(!WriteTxsInProgress, "PQ %" PRIu64, TabletID());
37543775

37553776
for (ui64 txId : DeleteTxs) {
37563777
PQ_LOG_D("delete key for TxId " << txId);
@@ -4858,8 +4879,9 @@ void TPersQueue::DeleteSupportivePartitions(const TActorContext& ctx)
48584879
void TPersQueue::OnInitComplete(const TActorContext& ctx)
48594880
{
48604881
SignalTabletActive(ctx);
4861-
TryStartTransaction(ctx);
48624882
InitCompleted = true;
4883+
ProcessPendingEvents();
4884+
TryStartTransaction(ctx);
48634885
}
48644886

48654887
ui64 TPersQueue::GetAllowedStep() const
@@ -4895,6 +4917,11 @@ void TPersQueue::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
48954917

48964918
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, const TActorContext &ctx)
48974919
{
4920+
if (!InitCompleted) {
4921+
AddPendingEvent(ev.Release());
4922+
return;
4923+
}
4924+
48984925
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransactionAttach " << ev->Get()->Record.ShortDebugString());
48994926

49004927
const ui64 txId = ev->Get()->Record.GetTxId();
@@ -5105,6 +5132,21 @@ ui64 TPersQueue::GetGeneration() {
51055132
return *TabletGeneration;
51065133
}
51075134

5135+
void TPersQueue::AddPendingEvent(IEventHandle* ev)
5136+
{
5137+
PendingEvents.emplace_back(ev);
5138+
}
5139+
5140+
void TPersQueue::ProcessPendingEvents()
5141+
{
5142+
auto events = std::move(PendingEvents);
5143+
PendingEvents.clear();
5144+
5145+
for (auto& ev : events) {
5146+
HandleHook(ev);
5147+
}
5148+
}
5149+
51085150
bool TPersQueue::HandleHook(STFUNC_SIG)
51095151
{
51105152
SetActivityType(NKikimrServices::TActivity::PERSQUEUE_ACTOR);

ydb/core/persqueue/pq_impl.h

+5
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,11 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
561561
void ResendEvReadSetToReceiversForState(const TActorContext& ctx, NKikimrPQ::TTransaction::EState state);
562562

563563
void DeleteSupportivePartitions(const TActorContext& ctx);
564+
565+
TDeque<TAutoPtr<IEventHandle>> PendingEvents;
566+
567+
void AddPendingEvent(IEventHandle* ev);
568+
void ProcessPendingEvents();
564569
};
565570

566571

ydb/core/persqueue/ut/pqtablet_ut.cpp

+99-8
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
197197

198198
void WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher);
199199
void SendReadSetAck(NHelpers::TPQTabletMock& tablet);
200+
void WaitForNoReadSetAck(NHelpers::TPQTabletMock& tablet);
200201

201202
void SendDropTablet(const TDropTabletParams& params);
202203
void WaitDropTabletReply(const TDropTabletReplyMatcher& matcher);
@@ -206,7 +207,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
206207

207208
void SendCancelTransactionProposal(const TCancelTransactionProposalParams& params);
208209

209-
void StartPQWriteTxsObserver();
210+
void StartPQWriteTxsObserver(TAutoPtr<IEventHandle>* ev = nullptr);
210211
void WaitForPQWriteTxs();
211212

212213
template <class T> void WaitForEvent(size_t count);
@@ -215,7 +216,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
215216

216217
void TestWaitingForTEvReadSet(size_t senders, size_t receivers);
217218

218-
void StartPQWriteObserver(bool& flag, unsigned cookie);
219+
void StartPQWriteObserver(bool& flag, unsigned cookie, TAutoPtr<IEventHandle>* ev = nullptr);
219220
void WaitForPQWriteComplete(bool& flag);
220221

221222
bool FoundPQWriteState = false;
@@ -241,6 +242,9 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
241242
void WaitForTxState(ui64 txId, NKikimrPQ::TTransaction::EState state);
242243
void WaitForExecStep(ui64 step);
243244

245+
void InterceptSaveTxState(TAutoPtr<IEventHandle>& event);
246+
void SendSaveTxState(TAutoPtr<IEventHandle>& event);
247+
244248
//
245249
// TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
246250
//
@@ -522,6 +526,17 @@ void TPQTabletFixture::WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TRe
522526
}
523527
}
524528

529+
void TPQTabletFixture::WaitForNoReadSetAck(NHelpers::TPQTabletMock& tablet)
530+
{
531+
TDispatchOptions options;
532+
options.CustomFinalCondition = [&]() {
533+
return tablet.ReadSetAck.Defined();
534+
};
535+
Ctx->Runtime->DispatchEvents(options, TDuration::Seconds(2));
536+
537+
UNIT_ASSERT(!tablet.ReadSetAck.Defined());
538+
}
539+
525540
void TPQTabletFixture::SendDropTablet(const TDropTabletParams& params)
526541
{
527542
auto event = MakeHolder<TEvPersQueue::TEvDropTablet>();
@@ -745,18 +760,21 @@ void TPQTabletFixture::WaitWriteResponse(const TWriteResponseMatcher& matcher)
745760
Ctx->Runtime->SetObserverFunc(prev);
746761
}
747762

748-
void TPQTabletFixture::StartPQWriteObserver(bool& flag, unsigned cookie)
763+
void TPQTabletFixture::StartPQWriteObserver(bool& flag, unsigned cookie, TAutoPtr<IEventHandle>* ev)
749764
{
750765
flag = false;
751766

752-
auto observer = [&flag, cookie](TAutoPtr<IEventHandle>& event) {
767+
auto observer = [&flag, cookie, ev](TAutoPtr<IEventHandle>& event) {
753768
if (auto* kvResponse = event->CastAsLocal<TEvKeyValue::TEvResponse>()) {
754-
if (kvResponse->Record.HasCookie()) {
755-
}
756769
if ((event->Sender == event->Recipient) &&
757770
kvResponse->Record.HasCookie() &&
758771
(kvResponse->Record.GetCookie() == cookie)) {
759772
flag = true;
773+
774+
if (ev) {
775+
*ev = event;
776+
return TTestActorRuntimeBase::EEventAction::DROP;
777+
}
760778
}
761779
}
762780

@@ -793,9 +811,9 @@ void TPQTabletFixture::SendCancelTransactionProposal(const TCancelTransactionPro
793811
event.Release());
794812
}
795813

796-
void TPQTabletFixture::StartPQWriteTxsObserver()
814+
void TPQTabletFixture::StartPQWriteTxsObserver(TAutoPtr<IEventHandle>* event)
797815
{
798-
StartPQWriteObserver(FoundPQWriteTxs, 5); // TPersQueue::WRITE_TX_COOKIE
816+
StartPQWriteObserver(FoundPQWriteTxs, 5, event); // TPersQueue::WRITE_TX_COOKIE
799817
}
800818

801819
void TPQTabletFixture::WaitForPQWriteTxs()
@@ -1030,6 +1048,40 @@ void TPQTabletFixture::WaitForExecStep(ui64 step)
10301048
UNIT_FAIL("expected execution step " << step);
10311049
}
10321050

1051+
void TPQTabletFixture::InterceptSaveTxState(TAutoPtr<IEventHandle>& ev)
1052+
{
1053+
bool found = false;
1054+
1055+
TTestActorRuntimeBase::TEventFilter prev;
1056+
auto filter = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) -> bool {
1057+
if (auto* msg = event->CastAsLocal<TEvKeyValue::TEvRequest>()) {
1058+
if (msg->Record.HasCookie() && (msg->Record.GetCookie() == 5)) { // WRITE_TX_COOKIE
1059+
ev = event;
1060+
found = true;
1061+
return true;
1062+
}
1063+
}
1064+
1065+
return false;
1066+
};
1067+
prev = Ctx->Runtime->SetEventFilter(filter);
1068+
1069+
TDispatchOptions options;
1070+
options.CustomFinalCondition = [&found]() {
1071+
return found;
1072+
};
1073+
1074+
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
1075+
UNIT_ASSERT(found);
1076+
1077+
Ctx->Runtime->SetEventFilter(prev);
1078+
}
1079+
1080+
void TPQTabletFixture::SendSaveTxState(TAutoPtr<IEventHandle>& event)
1081+
{
1082+
Ctx->Runtime->Send(event);
1083+
}
1084+
10331085
Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture)
10341086
{
10351087
TestParallelTransactions("consumer", "consumer");
@@ -1926,6 +1978,45 @@ Y_UNIT_TEST_F(After_Restarting_The_Tablet_Sends_A_TEvReadSet_For_Transactions_In
19261978
WaitReadSetEx(*tablet, {.Step=100, .TxId=txId_1, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Count=2});
19271979
}
19281980

1981+
Y_UNIT_TEST_F(TEvReadSet_Is_Not_Sent_Ahead_Of_Time, TPQTabletFixture)
1982+
{
1983+
const ui64 txId = 67890;
1984+
const ui64 mockTabletId = 22222;
1985+
1986+
NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId);
1987+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
1988+
1989+
SendProposeTransactionRequest({.TxId=txId,
1990+
.Senders={mockTabletId}, .Receivers={mockTabletId},
1991+
.TxOps={
1992+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
1993+
}});
1994+
WaitProposeTransactionResponse({.TxId=txId,
1995+
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
1996+
1997+
SendPlanStep({.Step=100, .TxIds={txId}});
1998+
1999+
WaitForCalcPredicateResult();
2000+
2001+
tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
2002+
2003+
//WaitProposeTransactionResponse({.TxId=txId,
2004+
// .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
2005+
2006+
TAutoPtr<IEventHandle> kvRequest;
2007+
InterceptSaveTxState(kvRequest);
2008+
2009+
tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
2010+
2011+
WaitForNoReadSetAck(*tablet);
2012+
2013+
SendSaveTxState(kvRequest);
2014+
2015+
WaitForTxState(txId, NKikimrPQ::TTransaction::EXECUTED);
2016+
2017+
WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId});
2018+
}
2019+
19292020
}
19302021

19312022
}

0 commit comments

Comments
 (0)