Skip to content

Commit a6070b7

Browse files
authored
Fix weight consumption problem with XDC (#16026)
1 parent a730758 commit a6070b7

File tree

4 files changed

+23
-24
lines changed

4 files changed

+23
-24
lines changed

Diff for: ydb/library/actors/interconnect/interconnect_channel.cpp

+9-14
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
LWTRACE_USING(ACTORLIB_PROVIDER);
1212

1313
namespace NActors {
14-
bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
14+
bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event) {
1515
const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr2);
1616
if (task.GetInternalFreeAmount() < amount) {
1717
return false;
@@ -48,8 +48,6 @@ namespace NActors {
4848
// append them to the packet
4949
task.Write<false>(&part, sizeof(part));
5050
task.Write<false>(&descr, sizeof(descr));
51-
52-
*weightConsumed += amount;
5351
OutputQueueSize -= sizeof(TEventDescr2);
5452
Metrics->UpdateOutputChannelEvents(ChannelId);
5553

@@ -63,7 +61,7 @@ namespace NActors {
6361
}
6462
}
6563

66-
bool TEventOutputChannel::FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed) {
64+
bool TEventOutputChannel::FeedBuf(TTcpPacketOutTask& task, ui64 serial) {
6765
for (;;) {
6866
Y_ABORT_UNLESS(!Queue.empty());
6967
TEventHolder& event = Queue.front();
@@ -101,15 +99,15 @@ namespace NActors {
10199
break;
102100

103101
case EState::BODY:
104-
if (FeedPayload(task, event, weightConsumed)) {
102+
if (FeedPayload(task, event)) {
105103
State = EState::DESCRIPTOR;
106104
} else {
107105
return false;
108106
}
109107
break;
110108

111109
case EState::DESCRIPTOR:
112-
if (!FeedDescriptor(task, event, weightConsumed)) {
110+
if (!FeedDescriptor(task, event)) {
113111
return false;
114112
}
115113
event.Serial = serial;
@@ -230,7 +228,7 @@ namespace NActors {
230228
return complete;
231229
}
232230

233-
bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
231+
bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event) {
234232
for (;;) {
235233
// calculate inline or external part size (it may cover a few sections, not just single one)
236234
while (!PartLenRemain) {
@@ -255,8 +253,8 @@ namespace NActors {
255253

256254
// serialize bytes
257255
const auto complete = IsPartInline
258-
? FeedInlinePayload(task, event, weightConsumed)
259-
: FeedExternalPayload(task, event, weightConsumed);
256+
? FeedInlinePayload(task, event)
257+
: FeedExternalPayload(task, event);
260258
if (!complete) { // no space to serialize
261259
return false;
262260
} else if (*complete) { // event serialized
@@ -265,7 +263,7 @@ namespace NActors {
265263
}
266264
}
267265

268-
std::optional<bool> TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
266+
std::optional<bool> TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event) {
269267
if (task.GetInternalFreeAmount() <= sizeof(TChannelPart)) {
270268
return std::nullopt;
271269
}
@@ -284,13 +282,12 @@ namespace NActors {
284282
};
285283

286284
task.WriteBookmark(std::move(partBookmark), &part, sizeof(part));
287-
*weightConsumed += sizeof(TChannelPart) + part.Size;
288285
OutputQueueSize -= part.Size;
289286

290287
return complete;
291288
}
292289

293-
std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
290+
std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event) {
294291
const size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32));
295292
if (task.GetInternalFreeAmount() < partSize || task.GetExternalFreeAmount() == 0) {
296293
return std::nullopt;
@@ -330,8 +327,6 @@ namespace NActors {
330327
}
331328

332329
task.WriteBookmark(std::move(partBookmark), buffer, partSize);
333-
334-
*weightConsumed += partSize + bytesSerialized;
335330
OutputQueueSize -= bytesSerialized;
336331

337332
return complete;

Diff for: ydb/library/actors/interconnect/interconnect_channel.h

+5-5
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ namespace NActors {
8787

8888
void DropConfirmed(ui64 confirm);
8989

90-
bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed);
90+
bool FeedBuf(TTcpPacketOutTask& task, ui64 serial);
9191

9292
bool IsEmpty() const {
9393
return Queue.empty();
@@ -142,11 +142,11 @@ namespace NActors {
142142
template<bool External>
143143
bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized);
144144

145-
bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
146-
std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
147-
std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
145+
bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event);
146+
std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event);
147+
std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event);
148148

149-
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
149+
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event);
150150

151151
void AccountTraffic() {
152152
if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) {

Diff for: ydb/library/actors/interconnect/interconnect_tcp_session.cpp

+6-3
Original file line numberDiff line numberDiff line change
@@ -953,8 +953,11 @@ namespace NActors {
953953

954954
// generate some data within this channel
955955
const ui64 netBefore = channel->GetBufferedAmountOfData();
956-
ui64 gross = 0;
957-
const bool eventDone = channel->FeedBuf(task, serial, &gross);
956+
const ui32 grossBefore = task.GetDataSize();
957+
const bool eventDone = channel->FeedBuf(task, serial);
958+
const ui32 grossAfter = task.GetDataSize();
959+
Y_DEBUG_ABORT_UNLESS(grossBefore <= grossAfter);
960+
const ui32 gross = grossAfter - grossBefore;
958961
channel->UnaccountedTraffic += gross;
959962
const ui64 netAfter = channel->GetBufferedAmountOfData();
960963
Y_DEBUG_ABORT_UNLESS(netAfter <= netBefore); // net amount should shrink
@@ -964,7 +967,7 @@ namespace NActors {
964967
TotalOutputQueueSize -= net;
965968
Proxy->Metrics->SubOutputBuffersTotalSize(net);
966969
bytesGenerated += gross;
967-
Y_DEBUG_ABORT_UNLESS(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross);
970+
Y_DEBUG_ABORT_UNLESS(gross || !net, "net# %" PRIu64 " gross# %" PRIu32, net, gross);
968971

969972
// return it back to queue or delete, depending on whether this channel is still working or not
970973
ChannelScheduler->FinishPick(gross, EqualizeCounter);

Diff for: ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
5959
while (numEvents) {
6060
TEventOutputChannel *channel = scheduler.PickChannelWithLeastConsumedWeight();
6161
ui32 before = task.GetDataSize();
62-
ui64 weightConsumed = 0;
63-
numEvents -= channel->FeedBuf(task, 0, &weightConsumed);
62+
ui64 weightConsumed = task.GetDataSize();
63+
numEvents -= channel->FeedBuf(task, 0);
64+
weightConsumed = task.GetDataSize() - weightConsumed;
6465
ui32 after = task.GetDataSize();
6566
Y_ABORT_UNLESS(after >= before);
6667
scheduler.FinishPick(weightConsumed, 0);

0 commit comments

Comments
 (0)