Skip to content

Commit abb1399

Browse files
authored
Merge pull request #29509 from WillemKauf/extent_metadata_fetch
`ct/l1`: add `end_of_stream` flag to `extent_metadata_response`
2 parents c27373a + f951fe8 commit abb1399

File tree

9 files changed

+87
-26
lines changed

9 files changed

+87
-26
lines changed

src/v/cloud_topics/level_one/domain/db_domain_manager.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -843,11 +843,14 @@ db_domain_manager::get_extent_metadata(rpc::get_extent_metadata_request req) {
843843
}
844844
if (!extents_result->has_value()) {
845845
co_return rpc::get_extent_metadata_reply{
846-
.ec = rpc::errc::out_of_range,
846+
.ec = rpc::errc::ok,
847+
.extents = {},
848+
.end_of_stream = true,
847849
};
848850
}
849851

850852
chunked_vector<rpc::extent_metadata> extents;
853+
bool end_of_stream = true;
851854
auto gen = (*extents_result)->get_rows();
852855
while (auto row_opt = co_await gen()) {
853856
const auto& row = row_opt->get();
@@ -866,13 +869,15 @@ db_domain_manager::get_extent_metadata(rpc::get_extent_metadata_request req) {
866869
.max_timestamp = extent.val.max_timestamp,
867870
});
868871
if (extents.size() >= req.max_num_extents) {
872+
end_of_stream = false;
869873
break;
870874
}
871875
}
872876

873877
co_return rpc::get_extent_metadata_reply{
874878
.ec = rpc::errc::ok,
875879
.extents = std::move(extents),
880+
.end_of_stream = end_of_stream,
876881
};
877882
}
878883

src/v/cloud_topics/level_one/domain/simple_domain_manager.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,8 @@ simple_domain_manager::get_extent_metadata(
638638
}
639639
co_return rpc::get_extent_metadata_reply{
640640
.ec = rpc::errc::ok,
641-
.extents = meta_to_rpc_extent_metadata(std::move(get_res->extents))};
641+
.extents = meta_to_rpc_extent_metadata(std::move(get_res->extents)),
642+
.end_of_stream = get_res->end_of_stream};
642643
}
643644

644645
ss::future<rpc::flush_domain_reply>

src/v/cloud_topics/level_one/domain/tests/db_domain_manager_test.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -777,10 +777,11 @@ TEST_F(DbDomainManagerTest, TestRestoreWithConcurrentReads) {
777777
.max_num_extents = std::numeric_limits<size_t>::max()})
778778
.then([&num_reads](auto reply) {
779779
++num_reads;
780-
if (reply.ec != l1_rpc::errc::out_of_range) {
781-
EXPECT_EQ(reply.ec, l1_rpc::errc::ok);
780+
if (reply.ec == l1_rpc::errc::ok) {
782781
auto num_extents = reply.extents.size();
783-
EXPECT_TRUE(num_extents == 3)
782+
// Readers can see either 0 or 3 extents, depending on
783+
// when their read arrives.
784+
EXPECT_TRUE(num_extents == 0 || num_extents == 3)
784785
<< "Unexpected number of extents: " << num_extents;
785786
}
786787
});

src/v/cloud_topics/level_one/metastore/extent_metadata_reader.cc

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,23 @@ extent_metadata_reader::forward_generator() {
6161
} else {
6262
// Yield extents.
6363
auto extents = std::move(extent_md_res->extents);
64+
bool end_of_stream = extent_md_res->end_of_stream;
6465

65-
if (extents.empty()) {
66-
break;
67-
}
66+
dassert(
67+
end_of_stream || !extents.empty(),
68+
"end_of_stream=false requires non-empty extents");
6869

6970
for (const auto& extent : extents) {
7071
co_yield extent;
7172
}
7273

73-
next_offset = kafka::next_offset(extents.back().last_offset);
74+
if (end_of_stream) {
75+
break;
76+
}
77+
78+
if (!extents.empty()) {
79+
next_offset = kafka::next_offset(extents.back().last_offset);
80+
}
7481
}
7582
}
7683
}
@@ -94,16 +101,23 @@ extent_metadata_reader::backward_generator() {
94101
} else {
95102
// Yield extents.
96103
auto extents = std::move(extent_md_res->extents);
104+
bool end_of_stream = extent_md_res->end_of_stream;
97105

98-
if (extents.empty()) {
99-
break;
100-
}
106+
dassert(
107+
end_of_stream || !extents.empty(),
108+
"end_of_stream=false requires non-empty extents");
101109

102110
for (const auto& extent : extents) {
103111
co_yield extent;
104112
}
105113

106-
next_offset = kafka::prev_offset(extents.back().base_offset);
114+
if (end_of_stream) {
115+
break;
116+
}
117+
118+
if (!extents.empty()) {
119+
next_offset = kafka::prev_offset(extents.back().base_offset);
120+
}
107121
}
108122
}
109123
}

src/v/cloud_topics/level_one/metastore/metastore.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,10 @@ class metastore {
454454

455455
struct extent_metadata_response {
456456
extent_metadata_vec extents{};
457+
// True when no more extents exist beyond this response (end of
458+
// iteration). False when more extents may exist (hit max_num_extents
459+
// limit).
460+
bool end_of_stream{true};
457461
};
458462

459463
// Returns a number of extents in the offset range `[start, end]`

src/v/cloud_topics/level_one/metastore/replicated_metastore.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,6 +834,7 @@ replicated_metastore::get_extent_metadata_forwards(
834834

835835
metastore::extent_metadata_response resp;
836836
resp.extents = rpc_to_meta_extent_metadata(std::move(reply.extents));
837+
resp.end_of_stream = reply.end_of_stream;
837838

838839
co_return resp;
839840
}
@@ -869,6 +870,7 @@ replicated_metastore::get_extent_metadata_backwards(
869870

870871
metastore::extent_metadata_response resp;
871872
resp.extents = rpc_to_meta_extent_metadata(std::move(reply.extents));
873+
resp.end_of_stream = reply.end_of_stream;
872874

873875
co_return resp;
874876
}

src/v/cloud_topics/level_one/metastore/rpc_types.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,10 +360,13 @@ struct get_extent_metadata_reply
360360
get_extent_metadata_reply,
361361
serde::version<0>,
362362
serde::compat_version<0>> {
363-
auto serde_fields() { return std::tie(ec, extents); }
363+
auto serde_fields() { return std::tie(ec, extents, end_of_stream); }
364364

365365
errc ec;
366366
chunked_vector<extent_metadata> extents;
367+
// True when no more extents exist beyond this response (end of iteration).
368+
// False when more extents may exist (hit max_num_extents limit).
369+
bool end_of_stream{true};
367370
};
368371
struct get_extent_metadata_request
369372
: serde::envelope<

src/v/cloud_topics/level_one/metastore/simple_metastore.cc

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,7 @@ simple_metastore::get_extent_metadata_forwards(
764764
const auto& prt = prt_ref->get();
765765

766766
extent_metadata_vec extents;
767+
bool end_of_stream = true;
767768

768769
auto min_it = std::ranges::lower_bound(
769770
prt.extents, min_offset, std::less<>{}, &extent::last_offset);
@@ -773,17 +774,19 @@ simple_metastore::get_extent_metadata_forwards(
773774
break;
774775
}
775776

776-
if (extents.size() >= max_num_extents) {
777-
break;
778-
}
779-
780777
extents.push_back(
781778
{.base_offset = extent.base_offset,
782779
.last_offset = extent.last_offset,
783780
.max_timestamp = extent.max_timestamp});
781+
782+
if (extents.size() >= max_num_extents) {
783+
end_of_stream = false;
784+
break;
785+
}
784786
}
785787

786-
return extent_metadata_response{.extents = std::move(extents)};
788+
return extent_metadata_response{
789+
.extents = std::move(extents), .end_of_stream = end_of_stream};
787790
}
788791

789792
ss::future<std::expected<metastore::extent_metadata_response, metastore::errc>>
@@ -813,6 +816,7 @@ simple_metastore::get_extent_metadata_backwards(
813816
const auto& prt = prt_ref->get();
814817

815818
extent_metadata_vec extents;
819+
bool end_of_stream = true;
816820

817821
auto max_it = std::ranges::lower_bound(
818822
prt.extents, max_offset, std::less<>{}, &extent::last_offset);
@@ -833,17 +837,19 @@ simple_metastore::get_extent_metadata_backwards(
833837
break;
834838
}
835839

836-
if (extents.size() >= max_num_extents) {
837-
break;
838-
}
839-
840840
extents.push_back(
841841
{.base_offset = extent.base_offset,
842842
.last_offset = extent.last_offset,
843843
.max_timestamp = extent.max_timestamp});
844+
845+
if (extents.size() >= max_num_extents) {
846+
end_of_stream = false;
847+
break;
848+
}
844849
}
845850

846-
return extent_metadata_response{.extents = std::move(extents)};
851+
return extent_metadata_response{
852+
.extents = std::move(extents), .end_of_stream = end_of_stream};
847853
}
848854

849855
} // namespace cloud_topics::l1

0 commit comments

Comments
 (0)