Skip to content

Commit b19710a

Browse files
authored
Merge pull request #29061 from Lazin/ct/compute-data-threshold-across-shards
[CORE-15334] ct: Introduce new scheduling algorithm for lower latency
2 parents feda4ed + 1b0f1b7 commit b19710a

17 files changed

+1784
-421
lines changed

src/v/cloud_topics/level_zero/batcher/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ redpanda_cc_library(
6565
"//src/v/cloud_topics/level_zero/pipeline:write_request",
6666
"//src/v/config",
6767
"//src/v/model",
68+
"//src/v/ssx:semaphore",
6869
"//src/v/utils:retry_chain_node",
6970
"//src/v/utils:uuid",
7071
"@abseil-cpp//absl/container:btree",

src/v/cloud_topics/level_zero/batcher/aggregator.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ iobuf aggregator<Clock>::get_stream() {
9090
iobuf concat;
9191
for (auto& p : _aggregated) {
9292
if (p->ref != nullptr) {
93-
concat.append(std::move(p->ref->data_chunk.payload));
93+
concat.append_fragments(std::move(p->ref->data_chunk.payload));
9494
}
9595
}
9696
return concat;

src/v/cloud_topics/level_zero/batcher/batcher.cc

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ batcher<Clock>::batcher(
5151
, _rtc(_as)
5252
, _logger(cd_log, _rtc)
5353
, _stage(std::move(stage))
54-
, _probe(config::shard_local_cfg().disable_metrics()) {}
54+
, _probe(config::shard_local_cfg().disable_metrics())
55+
, _upload_sem(
56+
config::shard_local_cfg().cloud_storage_max_connections(), "l0/batcher") {
57+
}
5558

5659
template<class Clock>
5760
ss::future<> batcher<Clock>::start() {
@@ -254,29 +257,56 @@ ss::future<> batcher<Clock>::bg_controller_loop() {
254257
co_return;
255258
}
256259

260+
// Acquire semaphore units to limit concurrent background fibers.
261+
// This blocks until a slot is available.
262+
auto units_fut = co_await ss::coroutine::as_future(
263+
ss::get_units(_upload_sem, 1, _as));
264+
257265
auto list = _stage.pull_write_requests(
258-
10_MiB); // TODO: use configuration parameter
266+
config::shard_local_cfg()
267+
.cloud_topics_produce_batching_size_threshold(),
268+
config::shard_local_cfg()
269+
.cloud_topics_produce_cardinality_threshold());
270+
271+
bool complete = list.complete;
272+
273+
if (units_fut.failed()) {
274+
vlog(
275+
_logger.info,
276+
"Batcher upload loop is shutting down: {}",
277+
units_fut.get_exception());
278+
co_return;
279+
}
280+
auto units = std::move(units_fut.get());
259281

260282
// We can spawn the work in the background without worrying about memory
261-
// usage because the pipeline tracks the memory usage for us and will
262-
// stop accepting new write requests if we go over the limit.
263-
ssx::spawn_with_gate(_gate, [this, list = std::move(list)]() mutable {
264-
return run_once(std::move(list))
265-
.then([this](std::expected<std::monostate, errc> res) {
266-
if (!res.has_value()) {
267-
if (res.error() == errc::shutting_down) {
268-
vlog(
269-
_logger.info,
270-
"Batcher upload loop is shutting down");
271-
} else {
272-
vlog(
273-
_logger.info,
274-
"Batcher upload loop error: {}",
275-
res.error());
276-
}
277-
}
278-
});
279-
});
283+
// usage because each background fiber holds the units acquired above.
284+
ssx::spawn_with_gate(
285+
_gate,
286+
[this, list = std::move(list), units = std::move(units)]() mutable {
287+
return run_once(std::move(list))
288+
.then([this](std::expected<std::monostate, errc> res) {
289+
if (!res.has_value()) {
290+
if (res.error() == errc::shutting_down) {
291+
vlog(
292+
_logger.info,
293+
"Batcher upload loop is shutting down");
294+
} else {
295+
vlog(
296+
_logger.info,
297+
"Batcher upload loop error: {}",
298+
res.error());
299+
}
300+
}
301+
})
302+
.finally([u = std::move(units)] {});
303+
});
304+
305+
// The work is spawned in the background so we can grab data for the
306+
// next L0 object. If complete==true, all pending requests were pulled,
307+
// so wait for more. If complete==false, there are more pending
308+
// requests.
309+
more_work = !complete;
280310
}
281311
}
282312

src/v/cloud_topics/level_zero/batcher/batcher.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "config/property.h"
2525
#include "model/fundamental.h"
2626
#include "model/record_batch_reader.h"
27+
#include "ssx/semaphore.h"
2728
#include "utils/retry_chain_node.h"
2829
#include "utils/uuid.h"
2930

@@ -121,5 +122,8 @@ class batcher {
121122
write_pipeline<Clock>::stage _stage;
122123

123124
batcher_probe _probe;
125+
126+
// Limit the number of concurrent background fibers running run_once
127+
ssx::named_semaphore<Clock> _upload_sem;
124128
};
125129
} // namespace cloud_topics::l0

src/v/cloud_topics/level_zero/common/level_zero_probe.cc

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -211,31 +211,16 @@ void write_request_scheduler_probe::setup_internal_metrics(bool disable) {
211211
_metrics.add_group(
212212
prometheus_sanitize::metrics_name("cloud_topics_write_request_scheduler"),
213213
{sm::make_counter(
214-
"data_threshold_requests",
215-
[this] { return _data_threshold_requests; },
214+
"scheduler_requests",
215+
[this] { return _scheduler_requests; },
216216
sm::description(
217-
"Number of write requests scheduled by data threshold policy."),
217+
"Number of write requests scheduled by the scheduler."),
218218
labels),
219219

220220
sm::make_counter(
221-
"data_threshold_bytes",
222-
[this] { return _data_threshold_bytes; },
223-
sm::description(
224-
"Total number of bytes scheduled by data threshold policy."),
225-
labels),
226-
227-
sm::make_counter(
228-
"time_fallback_requests",
229-
[this] { return _time_fallback_requests; },
230-
sm::description(
231-
"Number of write requests scheduled by time based fallback policy."),
232-
labels),
233-
234-
sm::make_counter(
235-
"time_fallback_bytes",
236-
[this] { return _time_fallback_bytes; },
237-
sm::description(
238-
"Total number of bytes scheduled by time based fallback policy."),
221+
"scheduler_bytes",
222+
[this] { return _scheduler_bytes; },
223+
sm::description("Total number of bytes scheduled by the scheduler."),
239224
labels),
240225

241226
sm::make_counter(
@@ -261,6 +246,12 @@ void write_request_scheduler_probe::setup_internal_metrics(bool disable) {
261246
"rx_bytes_xshard",
262247
[this] { return _rx_bytes_xshard; },
263248
sm::description("Total number of bytes received from another shard."),
249+
labels),
250+
251+
sm::make_gauge(
252+
"active_groups",
253+
[this] { return _active_groups; },
254+
sm::description("Number of active upload groups in the scheduler."),
264255
labels)});
265256
}
266257
batcher_probe::batcher_probe(bool disable) { setup_internal_metrics(disable); }

src/v/cloud_topics/level_zero/common/level_zero_probe.h

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,9 @@ class write_request_scheduler_probe {
9797
public:
9898
explicit write_request_scheduler_probe(bool disable);
9999

100-
void register_data_threshold(size_t bytes) {
101-
_data_threshold_requests += 1;
102-
_data_threshold_bytes += bytes;
103-
}
104-
105-
void register_time_fallback(size_t bytes) {
106-
_time_fallback_requests += 1;
107-
_time_fallback_bytes += bytes;
100+
void register_request(size_t bytes) {
101+
_scheduler_requests += 1;
102+
_scheduler_bytes += bytes;
108103
}
109104

110105
void register_send_xshard(size_t bytes) {
@@ -117,23 +112,23 @@ class write_request_scheduler_probe {
117112
_rx_bytes_xshard += bytes;
118113
}
119114

115+
void set_active_groups(uint64_t count) { _active_groups = count; }
116+
120117
private:
121118
void setup_internal_metrics(bool disable);
122119

123-
/// Number of write requests and total bytes scheduled by data threshold
124-
/// policy.
125-
uint64_t _data_threshold_requests{0};
126-
uint64_t _data_threshold_bytes{0};
127-
/// Number of write requests and total bytes scheduled by time based
128-
/// fallback policy.
129-
uint64_t _time_fallback_requests{0};
130-
uint64_t _time_fallback_bytes{0};
120+
/// Number of write requests and total bytes scheduled
121+
uint64_t _scheduler_requests{0};
122+
uint64_t _scheduler_bytes{0};
131123
/// Number of requests and total bytes proxied to another shard
132124
uint64_t _tx_requests_xshard{0};
133125
uint64_t _tx_bytes_xshard{0};
134126
/// Number of requests and total bytes received from another shard
135127
uint64_t _rx_requests_xshard{0};
136128
uint64_t _rx_bytes_xshard{0};
129+
/// Number of active upload groups
130+
uint64_t _active_groups{0};
131+
137132
metrics::internal_metric_groups _metrics;
138133
};
139134

src/v/cloud_topics/level_zero/pipeline/base_pipeline.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,12 @@ class base_pipeline {
191191
return _stages.next_stage(s);
192192
}
193193

194+
/// Return next stage index without checking if stage is registered.
195+
/// This is useful for accessing pre-allocated resources.
196+
int next_stage_index(pipeline_stage s) const {
197+
return _stages.next_stage_index(s);
198+
}
199+
194200
/// Resolve every pending write that matches the predicate with an error.
195201
template<typename Pred>
196202
void remove_requests(Pred pred, errc error, std::string_view reason) {

src/v/cloud_topics/level_zero/pipeline/pipeline_stage.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,20 @@ pipeline_stage pipeline_stage_container::register_pipeline_stage() noexcept {
5050
return pipeline_stage(&_stages.at(_registered++));
5151
}
5252

53+
int pipeline_stage_container::next_stage_index(pipeline_stage old) const {
54+
if (old == unassigned_pipeline_stage) {
55+
// First stage is index 0
56+
return 0;
57+
}
58+
auto old_ix = old()->get_numeric_id();
59+
auto next_ix = old_ix + 1;
60+
// Return -1 if we would exceed the allocated stages
61+
if (static_cast<size_t>(next_ix) >= _stages.size()) {
62+
return -1;
63+
}
64+
return next_ix;
65+
}
66+
5367
} // namespace cloud_topics::l0
5468

5569
auto fmt::formatter<cloud_topics::l0::pipeline_stage>::format(

src/v/cloud_topics/level_zero/pipeline/pipeline_stage.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@ class pipeline_stage_container {
4848
pipeline_stage first_stage() const;
4949
pipeline_stage register_pipeline_stage() noexcept;
5050

51+
/// Get the numeric index of the next stage after the given stage.
52+
/// Unlike next_stage(), this method does not check if the next stage
53+
/// is registered. It returns the index even if the stage hasn't been
54+
/// registered yet. This is useful for accessing pre-allocated resources
55+
/// (like counters) that are indexed by stage number.
56+
/// \param old The current pipeline stage
57+
/// \return The index of the next stage (0 if old is unassigned), or -1 if
/// the next index would exceed the number of allocated stages
58+
int next_stage_index(pipeline_stage old) const;
59+
5160
private:
5261
std::vector<pipeline_stage_id> _stages;
5362
size_t _registered{0};

src/v/cloud_topics/level_zero/pipeline/write_pipeline.cc

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,20 @@ void write_pipeline<Clock>::stage::signal_next_stage() {
269269
_parent->signal(_parent->next_stage(_ps));
270270
}
271271

272+
template<class Clock>
273+
void write_pipeline<Clock>::stage::enqueue_foreign_request(
274+
write_request<Clock>& req, bool signal) {
275+
// Foreign requests are proxied from another shard where their bytes
276+
// were already accounted for. We place them directly at the next stage
277+
// without any byte accounting.
278+
auto next = _parent->next_stage(_ps);
279+
req.stage = next;
280+
_parent->get_pending().push_back(req);
281+
if (signal) {
282+
_parent->signal(next);
283+
}
284+
}
285+
272286
template<class Clock>
273287
write_pipeline<Clock>::write_requests_list
274288
write_pipeline<Clock>::stage::pull_write_requests(
@@ -374,14 +388,32 @@ size_t write_pipeline<Clock>::stage_bytes(pipeline_stage s) const {
374388
if (s == unassigned_pipeline_stage) {
375389
return 0;
376390
}
377-
return _stage_bytes[static_cast<size_t>(s()->get_numeric_id())];
391+
return _stage_bytes[static_cast<size_t>(s()->get_numeric_id())].count;
392+
}
393+
394+
template<class Clock>
395+
const std::atomic<size_t>*
396+
write_pipeline<Clock>::stage_bytes_ref(pipeline_stage s) const {
397+
if (s == unassigned_pipeline_stage) {
398+
return nullptr;
399+
}
400+
return &_stage_bytes[static_cast<size_t>(s()->get_numeric_id())].count;
401+
}
402+
403+
template<class Clock>
404+
const std::atomic<size_t>*
405+
write_pipeline<Clock>::stage_bytes_ref_by_index(int index) const {
406+
if (index < 0 || static_cast<size_t>(index) >= _stage_bytes.size()) {
407+
return nullptr;
408+
}
409+
return &_stage_bytes[static_cast<size_t>(index)].count;
378410
}
379411

380412
template<class Clock>
381413
size_t write_pipeline<Clock>::current_size() const {
382414
size_t total = 0;
383-
for (auto bytes : _stage_bytes) {
384-
total += bytes;
415+
for (const auto& bytes : _stage_bytes) {
416+
total += bytes.count.load(std::memory_order_relaxed);
385417
}
386418
return total;
387419
}

0 commit comments

Comments
 (0)