Skip to content

Don't wait for TEvReadSetAck from non-existent tablets (#17913) #18005

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2893,9 +2893,41 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActo
return;
}

if (ev->Get()->Dead) {
AckReadSetsToTablet(ev->Get()->TabletId, ctx);
return;
}

RestartPipe(ev->Get()->TabletId, ctx);
}

// Marks the read sets addressed to `tabletId` as acknowledged for every transaction
// bound to that tablet, then lets those transactions make progress.
//
// Invoked from the TEvClientConnected handler when the event carries the Dead flag,
// i.e. when no TEvReadSetAck is going to arrive from that tablet.
void TPersQueue::AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx)
{
    THashSet<TDistributedTransaction*> affected;

    for (ui64 boundTxId : GetBindedTxs(tabletId)) {
        if (auto* boundTx = GetTransaction(ctx, boundTxId)) {
            // Account for the ack on behalf of the tablet and drop the pipe binding.
            boundTx->OnReadSetAck(tabletId);
            boundTx->UnbindMsgsFromPipe(tabletId);

            affected.insert(boundTx);
        }
    }

    // Nothing was touched, so there is nothing to execute or write.
    if (!affected.empty()) {
        for (auto* boundTx : affected) {
            TryExecuteTxs(ctx, *boundTx);
        }

        TryWriteTxs(ctx);
    }
}

void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvTabletPipe::TEvClientDestroyed");
Expand All @@ -2911,7 +2943,7 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActo

void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
{
for (auto& txId: GetBindedTxs(tabletId)) {
for (ui64 txId : GetBindedTxs(tabletId)) {
auto* tx = GetTransaction(ctx, txId);
if (!tx) {
continue;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {

void AddPendingEvent(IEventHandle* ev);
void ProcessPendingEvents();

void AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx);
};


Expand Down
9 changes: 7 additions & 2 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,13 @@ void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event
Y_ABORT_UNLESS(event.HasStep() && (Step == event.GetStep()));
Y_ABORT_UNLESS(event.HasTxId() && (TxId == event.GetTxId()));

if (PredicateRecipients.contains(event.GetTabletConsumer())) {
PredicateRecipients[event.GetTabletConsumer()] = true;
OnReadSetAck(event.GetTabletConsumer());
}

void TDistributedTransaction::OnReadSetAck(ui64 tabletId)
{
if (PredicateRecipients.contains(tabletId)) {
PredicateRecipients[tabletId] = true;
++PredicateAcksCount;

PQ_LOG_D("Predicate acks " << PredicateAcksCount << "/" << PredicateRecipients.size());
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct TDistributedTransaction {
const TActorId& sender,
std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack);
void OnReadSetAck(const NKikimrTx::TEvReadSetAck& event);
void OnReadSetAck(ui64 tabletId);
void OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event);

using EDecision = NKikimrTx::TReadSetData::EDecision;
Expand Down
90 changes: 90 additions & 0 deletions ydb/core/persqueue/ut/pqtablet_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
void InterceptSaveTxState(TAutoPtr<IEventHandle>& event);
void SendSaveTxState(TAutoPtr<IEventHandle>& event);

void WaitForTheTransactionToBeDeleted(ui64 txId);

//
// TODO(abcdef): Send+Wait primitives are needed to test repeated calls
//
Expand Down Expand Up @@ -1082,6 +1084,39 @@ void TPQTabletFixture::SendSaveTxState(TAutoPtr<IEventHandle>& event)
Ctx->Runtime->Send(event);
}

// Polls the tablet's key-value storage until the key of transaction `txId` is no longer found.
//
// Each attempt reads the single-key range [key, key] without data:
//   * OK     - the key is still there; sleep (simulated) for 300 ms and try again;
//   * NODATA - the key is gone; return;
//   * any other status - try again immediately.
// The test fails after 200 unsuccessful attempts.
void TPQTabletFixture::WaitForTheTransactionToBeDeleted(ui64 txId)
{
    const TString txKey = GetTxKey(txId);

    for (size_t attempt = 0; attempt != 200; ++attempt) {
        auto readRequest = std::make_unique<TEvKeyValue::TEvRequest>();
        readRequest->Record.SetCookie(12345);

        auto* rangeCmd = readRequest->Record.AddCmdReadRange();
        rangeCmd->SetIncludeData(false);

        auto* keyRange = rangeCmd->MutableRange();
        keyRange->SetFrom(txKey);
        keyRange->SetTo(txKey);
        keyRange->SetIncludeFrom(true);
        keyRange->SetIncludeTo(true);

        SendToPipe(Ctx->Edge, readRequest.release());

        auto reply = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>();
        UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);

        const auto& rangeResult = reply->Record.GetReadRangeResult(0);

        if (rangeResult.GetStatus() == NKikimrProto::NODATA) {
            // The transaction key has been removed.
            return;
        }

        if (rangeResult.GetStatus() == static_cast<ui32>(NKikimrProto::OK)) {
            // The key still exists; give the tablet some time before the next attempt.
            Ctx->Runtime->SimulateSleep(TDuration::MilliSeconds(300));
        }
    }

    UNIT_FAIL("Too many attempts");
}

Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture)
{
TestParallelTransactions("consumer", "consumer");
Expand Down Expand Up @@ -2017,6 +2052,61 @@ Y_UNIT_TEST_F(TEvReadSet_Is_Not_Sent_Ahead_Of_Time, TPQTabletFixture)
WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId});
}

// Scenario: the recipient of TEvReadSet has already completed the transaction and has been deleted.
//
// The event filter swallows every TEvReadSet whose source is the PQ tablet and, instead of letting
// a TEvReadSetAck come back, sends the original sender a TEvClientConnected with the Dead flag set.
// The test then checks that the transaction is still deleted.
Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
{
    const ui64 txId = 67890;
    const ui64 mockTabletId = MakeTabletID(false, 22222);

    TTestActorRuntimeBase::TEventFilter previousFilter;

    auto replaceReadSetWithDeadPipe = [&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) -> bool {
        auto* readSet = event->CastAsLocal<TEvTxProcessing::TEvReadSet>();
        if (!readSet) {
            return false;
        }

        // Only read sets sent by the PQ tablet under test are intercepted.
        if (readSet->Record.GetTabletSource() != Ctx->TabletId) {
            return false;
        }

        runtime.Send(event->Sender,
                     Ctx->Edge,
                     new TEvTabletPipe::TEvClientConnected(mockTabletId,
                                                           NKikimrProto::ERROR,
                                                           event->Sender,
                                                           TActorId(),
                                                           true,
                                                           true, // Dead
                                                           0));

        // "Forget" the original TEvReadSet.
        return true;
    };

    previousFilter = Ctx->Runtime->SetEventFilter(replaceReadSetWithDeadPipe);

    NHelpers::TPQTabletMock* mockTablet = CreatePQTabletMock(mockTabletId);
    PQTabletPrepare({.partitions=1}, {}, *Ctx);

    SendProposeTransactionRequest({.TxId=txId,
                                  .Senders={mockTabletId}, .Receivers={mockTabletId},
                                  .TxOps={
                                  {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
                                  }});
    WaitProposeTransactionResponse({.TxId=txId,
                                   .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});

    SendPlanStep({.Step=100, .TxIds={txId}});

    // The mock delivers its TEvReadSet so that the PQ tablet is able to complete the transaction.
    mockTablet->SendReadSet(*Ctx->Runtime,
                            {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});

    WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});

    // The PQ tablet gets TEvClientConnected with the Dead flag in place of TEvReadSetAck, so the
    // transaction is expected to stop waiting for read-set acks and be deleted.
    WaitForTheTransactionToBeDeleted(txId);
}

}

}
Loading