diff --git a/ydb/library/actors/interconnect/interconnect_channel.cpp b/ydb/library/actors/interconnect/interconnect_channel.cpp index 8e0975372f54..633cbc9c9752 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.cpp +++ b/ydb/library/actors/interconnect/interconnect_channel.cpp @@ -11,7 +11,7 @@ LWTRACE_USING(ACTORLIB_PROVIDER); namespace NActors { - bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event) { const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr2); if (task.GetInternalFreeAmount() < amount) { return false; @@ -48,8 +48,6 @@ namespace NActors { // append them to the packet task.Write(&part, sizeof(part)); task.Write(&descr, sizeof(descr)); - - *weightConsumed += amount; OutputQueueSize -= sizeof(TEventDescr2); Metrics->UpdateOutputChannelEvents(ChannelId); @@ -63,7 +61,7 @@ namespace NActors { } } - bool TEventOutputChannel::FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed) { + bool TEventOutputChannel::FeedBuf(TTcpPacketOutTask& task, ui64 serial) { for (;;) { Y_ABORT_UNLESS(!Queue.empty()); TEventHolder& event = Queue.front(); @@ -101,7 +99,7 @@ namespace NActors { break; case EState::BODY: - if (FeedPayload(task, event, weightConsumed)) { + if (FeedPayload(task, event)) { State = EState::DESCRIPTOR; } else { return false; @@ -109,7 +107,7 @@ namespace NActors { break; case EState::DESCRIPTOR: - if (!FeedDescriptor(task, event, weightConsumed)) { + if (!FeedDescriptor(task, event)) { return false; } event.Serial = serial; @@ -230,7 +228,7 @@ namespace NActors { return complete; } - bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event) { for (;;) { // calculate inline or external part size (it may cover a few sections, not just single one) while (!PartLenRemain) { @@ -255,8 +253,8 @@ namespace NActors { // serialize bytes const auto complete = IsPartInline - ? FeedInlinePayload(task, event, weightConsumed) - : FeedExternalPayload(task, event, weightConsumed); + ? FeedInlinePayload(task, event) + : FeedExternalPayload(task, event); if (!complete) { // no space to serialize return false; } else if (*complete) { // event serialized @@ -265,7 +263,7 @@ namespace NActors { } } - std::optional TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + std::optional TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event) { if (task.GetInternalFreeAmount() <= sizeof(TChannelPart)) { return std::nullopt; } @@ -284,13 +282,12 @@ namespace NActors { }; task.WriteBookmark(std::move(partBookmark), &part, sizeof(part)); - *weightConsumed += sizeof(TChannelPart) + part.Size; OutputQueueSize -= part.Size; return complete; } - std::optional TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + std::optional TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event) { const size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32)); if (task.GetInternalFreeAmount() < partSize || task.GetExternalFreeAmount() == 0) { return std::nullopt; @@ -330,8 +327,6 @@ namespace NActors { } task.WriteBookmark(std::move(partBookmark), buffer, partSize); - - *weightConsumed += partSize + bytesSerialized; OutputQueueSize -= bytesSerialized; return complete; diff --git a/ydb/library/actors/interconnect/interconnect_channel.h b/ydb/library/actors/interconnect/interconnect_channel.h index d038c727d903..a70a48bfaca5 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.h +++ b/ydb/library/actors/interconnect/interconnect_channel.h @@ -87,7 +87,7 @@ namespace NActors { void DropConfirmed(ui64 confirm); - bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed); + bool FeedBuf(TTcpPacketOutTask& task, ui64 serial); bool IsEmpty() const { return Queue.empty(); @@ -142,11 +142,11 @@ namespace NActors { template bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized); - bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); - std::optional FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); - std::optional FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event); + std::optional FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event); + std::optional FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event); - bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event); void AccountTraffic() { if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) { diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp index ce43da1ff760..12f0f542c320 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp @@ -953,8 +953,11 @@ namespace NActors { // generate some data within this channel const ui64 netBefore = channel->GetBufferedAmountOfData(); - ui64 gross = 0; - const bool eventDone = channel->FeedBuf(task, serial, &gross); + const ui32 grossBefore = task.GetDataSize(); + const bool eventDone = channel->FeedBuf(task, serial); + const ui32 grossAfter = task.GetDataSize(); + Y_DEBUG_ABORT_UNLESS(grossBefore <= grossAfter); + const ui32 gross = grossAfter - grossBefore; channel->UnaccountedTraffic += gross; const ui64 netAfter = channel->GetBufferedAmountOfData(); Y_DEBUG_ABORT_UNLESS(netAfter <= netBefore); // net amount should shrink @@ -964,7 +967,7 @@ namespace NActors { TotalOutputQueueSize -= net; Proxy->Metrics->SubOutputBuffersTotalSize(net); bytesGenerated += gross; - Y_DEBUG_ABORT_UNLESS(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross); + Y_DEBUG_ABORT_UNLESS(gross || !net, "net# %" PRIu64 " gross# %" PRIu32, net, gross); // return it back to queue or delete, depending on whether this channel is still working or not ChannelScheduler->FinishPick(gross, EqualizeCounter); diff --git a/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp b/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp index 2d9b0db26374..9a459b1675f7 100644 --- a/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -59,8 +59,9 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { while (numEvents) { TEventOutputChannel *channel = scheduler.PickChannelWithLeastConsumedWeight(); ui32 before = task.GetDataSize(); - ui64 weightConsumed = 0; - numEvents -= channel->FeedBuf(task, 0, &weightConsumed); + ui64 weightConsumed = task.GetDataSize(); + numEvents -= channel->FeedBuf(task, 0); + weightConsumed = task.GetDataSize() - weightConsumed; ui32 after = task.GetDataSize(); Y_ABORT_UNLESS(after >= before); scheduler.FinishPick(weightConsumed, 0);