Skip to content

Commit 105c60a

Browse files
committed
Message spilling too
1 parent 90aaffb commit 105c60a

File tree

2 files changed

+12
-4
lines changed

2 files changed

+12
-4
lines changed

cpp/src/shuffler/shuffler.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,14 @@ std::size_t postbox_spilling(
138138
continue;
139139
}
140140
// We extract the chunk, spilled it, and insert it back into the PostBox.
141-
auto chunk = postbox.extract(pid, cid);
142-
chunk.set_data_buffer(br->move(chunk.release_data_buffer(), host_reservation));
143-
postbox.insert(std::move(chunk));
144-
RAPIDSMPF_NVTX_MARKER("postbox_spilling::chunk_spilled_bytes", size);
141+
{
142+
RAPIDSMPF_NVTX_SCOPED_RANGE("postbox_spilling::chunk", size);
143+
auto chunk = postbox.extract(pid, cid);
144+
chunk.set_data_buffer(
145+
br->move(chunk.release_data_buffer(), host_reservation)
146+
);
147+
postbox.insert(std::move(chunk));
148+
}
145149
if ((total_spilled += size) >= amount) {
146150
break;
147151
}

cpp/src/streaming/core/context.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <utility>
88

99
#include <rapidsmpf/buffer/resource.hpp>
10+
#include <rapidsmpf/nvtx.hpp>
1011
#include <rapidsmpf/streaming/core/context.hpp>
1112
#include <rapidsmpf/utils.hpp>
1213

@@ -33,6 +34,7 @@ std::size_t spill_messages(
3334
std::shared_ptr<BufferResource> br,
3435
std::size_t amount
3536
) {
37+
RAPIDSMPF_NVTX_SCOPED_RANGE("spill_messages::amount", amount);
3638
// Recall that std::map is sorted by key by default, so iteration follows the
3739
// order in which messages were inserted into `spillable_messages`.
3840
std::map<SpillableMessages::MessageId, ContentDescription> cds =
@@ -45,9 +47,11 @@ std::size_t spill_messages(
4547
break;
4648
}
4749
if (cd.spillable()) {
50+
RAPIDSMPF_NVTX_SCOPED_RANGE("spill_messages::spill", id);
4851
total_spilled += spillable_messages.spill(id, br.get());
4952
}
5053
}
54+
RAPIDSMPF_NVTX_MARKER("spill_messages::total_spilled_bytes", total_spilled);
5155
return total_spilled;
5256
}
5357
} // namespace

0 commit comments

Comments
 (0)