Skip to content

Commit f951fe8

Browse files
committed
ct/l1: add end_of_stream flag to extent_metadata_response
This flag serves as a strong indicator that we have iterated over all the extents requested in an offset range, rather than having an empty extent list represent this case. Also change the handling of the extent response in the `db_domain_manager` to return an empty list of extents with `end_of_stream=true` instead of an error when no extents are found in the requested range. The edge case of `extents.empty()` and `end_of_stream=false` potentially resulting in an infinite loop in `extent_metadata_reader` is also handled by ensuring that a request for `0` extents results in a single extent being returned with `end_of_stream=false`.
1 parent da64410 commit f951fe8

File tree

9 files changed

+87
-26
lines changed

9 files changed

+87
-26
lines changed

src/v/cloud_topics/level_one/domain/db_domain_manager.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -843,11 +843,14 @@ db_domain_manager::get_extent_metadata(rpc::get_extent_metadata_request req) {
843843
}
844844
if (!extents_result->has_value()) {
845845
co_return rpc::get_extent_metadata_reply{
846-
.ec = rpc::errc::out_of_range,
846+
.ec = rpc::errc::ok,
847+
.extents = {},
848+
.end_of_stream = true,
847849
};
848850
}
849851

850852
chunked_vector<rpc::extent_metadata> extents;
853+
bool end_of_stream = true;
851854
auto gen = (*extents_result)->get_rows();
852855
while (auto row_opt = co_await gen()) {
853856
const auto& row = row_opt->get();
@@ -866,13 +869,15 @@ db_domain_manager::get_extent_metadata(rpc::get_extent_metadata_request req) {
866869
.max_timestamp = extent.val.max_timestamp,
867870
});
868871
if (extents.size() >= req.max_num_extents) {
872+
end_of_stream = false;
869873
break;
870874
}
871875
}
872876

873877
co_return rpc::get_extent_metadata_reply{
874878
.ec = rpc::errc::ok,
875879
.extents = std::move(extents),
880+
.end_of_stream = end_of_stream,
876881
};
877882
}
878883

src/v/cloud_topics/level_one/domain/simple_domain_manager.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,8 @@ simple_domain_manager::get_extent_metadata(
638638
}
639639
co_return rpc::get_extent_metadata_reply{
640640
.ec = rpc::errc::ok,
641-
.extents = meta_to_rpc_extent_metadata(std::move(get_res->extents))};
641+
.extents = meta_to_rpc_extent_metadata(std::move(get_res->extents)),
642+
.end_of_stream = get_res->end_of_stream};
642643
}
643644

644645
ss::future<rpc::flush_domain_reply>

src/v/cloud_topics/level_one/domain/tests/db_domain_manager_test.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -777,10 +777,11 @@ TEST_F(DbDomainManagerTest, TestRestoreWithConcurrentReads) {
777777
.max_num_extents = std::numeric_limits<size_t>::max()})
778778
.then([&num_reads](auto reply) {
779779
++num_reads;
780-
if (reply.ec != l1_rpc::errc::out_of_range) {
781-
EXPECT_EQ(reply.ec, l1_rpc::errc::ok);
780+
if (reply.ec == l1_rpc::errc::ok) {
782781
auto num_extents = reply.extents.size();
783-
EXPECT_TRUE(num_extents == 3)
782+
// Readers can see either 0 or 3 extents, depending on
783+
// when their read arrives.
784+
EXPECT_TRUE(num_extents == 0 || num_extents == 3)
784785
<< "Unexpected number of extents: " << num_extents;
785786
}
786787
});

src/v/cloud_topics/level_one/metastore/extent_metadata_reader.cc

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,23 @@ extent_metadata_reader::forward_generator() {
6161
} else {
6262
// Yield extents.
6363
auto extents = std::move(extent_md_res->extents);
64+
bool end_of_stream = extent_md_res->end_of_stream;
6465

65-
if (extents.empty()) {
66-
break;
67-
}
66+
dassert(
67+
end_of_stream || !extents.empty(),
68+
"end_of_stream=false requires non-empty extents");
6869

6970
for (const auto& extent : extents) {
7071
co_yield extent;
7172
}
7273

73-
next_offset = kafka::next_offset(extents.back().last_offset);
74+
if (end_of_stream) {
75+
break;
76+
}
77+
78+
if (!extents.empty()) {
79+
next_offset = kafka::next_offset(extents.back().last_offset);
80+
}
7481
}
7582
}
7683
}
@@ -94,16 +101,23 @@ extent_metadata_reader::backward_generator() {
94101
} else {
95102
// Yield extents.
96103
auto extents = std::move(extent_md_res->extents);
104+
bool end_of_stream = extent_md_res->end_of_stream;
97105

98-
if (extents.empty()) {
99-
break;
100-
}
106+
dassert(
107+
end_of_stream || !extents.empty(),
108+
"end_of_stream=false requires non-empty extents");
101109

102110
for (const auto& extent : extents) {
103111
co_yield extent;
104112
}
105113

106-
next_offset = kafka::prev_offset(extents.back().base_offset);
114+
if (end_of_stream) {
115+
break;
116+
}
117+
118+
if (!extents.empty()) {
119+
next_offset = kafka::prev_offset(extents.back().base_offset);
120+
}
107121
}
108122
}
109123
}

src/v/cloud_topics/level_one/metastore/metastore.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,10 @@ class metastore {
454454

455455
struct extent_metadata_response {
456456
extent_metadata_vec extents{};
457+
// True when no more extents exist beyond this response (end of
458+
// iteration). False when more extents may exist (hit max_num_extents
459+
// limit).
460+
bool end_of_stream{true};
457461
};
458462

459463
// Returns a number of extents in the offset range `[start, end]`

src/v/cloud_topics/level_one/metastore/replicated_metastore.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,6 +834,7 @@ replicated_metastore::get_extent_metadata_forwards(
834834

835835
metastore::extent_metadata_response resp;
836836
resp.extents = rpc_to_meta_extent_metadata(std::move(reply.extents));
837+
resp.end_of_stream = reply.end_of_stream;
837838

838839
co_return resp;
839840
}
@@ -869,6 +870,7 @@ replicated_metastore::get_extent_metadata_backwards(
869870

870871
metastore::extent_metadata_response resp;
871872
resp.extents = rpc_to_meta_extent_metadata(std::move(reply.extents));
873+
resp.end_of_stream = reply.end_of_stream;
872874

873875
co_return resp;
874876
}

src/v/cloud_topics/level_one/metastore/rpc_types.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,10 +360,13 @@ struct get_extent_metadata_reply
360360
get_extent_metadata_reply,
361361
serde::version<0>,
362362
serde::compat_version<0>> {
363-
auto serde_fields() { return std::tie(ec, extents); }
363+
auto serde_fields() { return std::tie(ec, extents, end_of_stream); }
364364

365365
errc ec;
366366
chunked_vector<extent_metadata> extents;
367+
// True when no more extents exist beyond this response (end of iteration).
368+
// False when more extents may exist (hit max_num_extents limit).
369+
bool end_of_stream{true};
367370
};
368371
struct get_extent_metadata_request
369372
: serde::envelope<

src/v/cloud_topics/level_one/metastore/simple_metastore.cc

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,7 @@ simple_metastore::get_extent_metadata_forwards(
764764
const auto& prt = prt_ref->get();
765765

766766
extent_metadata_vec extents;
767+
bool end_of_stream = true;
767768

768769
auto min_it = std::ranges::lower_bound(
769770
prt.extents, min_offset, std::less<>{}, &extent::last_offset);
@@ -773,17 +774,19 @@ simple_metastore::get_extent_metadata_forwards(
773774
break;
774775
}
775776

776-
if (extents.size() >= max_num_extents) {
777-
break;
778-
}
779-
780777
extents.push_back(
781778
{.base_offset = extent.base_offset,
782779
.last_offset = extent.last_offset,
783780
.max_timestamp = extent.max_timestamp});
781+
782+
if (extents.size() >= max_num_extents) {
783+
end_of_stream = false;
784+
break;
785+
}
784786
}
785787

786-
return extent_metadata_response{.extents = std::move(extents)};
788+
return extent_metadata_response{
789+
.extents = std::move(extents), .end_of_stream = end_of_stream};
787790
}
788791

789792
ss::future<std::expected<metastore::extent_metadata_response, metastore::errc>>
@@ -813,6 +816,7 @@ simple_metastore::get_extent_metadata_backwards(
813816
const auto& prt = prt_ref->get();
814817

815818
extent_metadata_vec extents;
819+
bool end_of_stream = true;
816820

817821
auto max_it = std::ranges::lower_bound(
818822
prt.extents, max_offset, std::less<>{}, &extent::last_offset);
@@ -833,17 +837,19 @@ simple_metastore::get_extent_metadata_backwards(
833837
break;
834838
}
835839

836-
if (extents.size() >= max_num_extents) {
837-
break;
838-
}
839-
840840
extents.push_back(
841841
{.base_offset = extent.base_offset,
842842
.last_offset = extent.last_offset,
843843
.max_timestamp = extent.max_timestamp});
844+
845+
if (extents.size() >= max_num_extents) {
846+
end_of_stream = false;
847+
break;
848+
}
844849
}
845850

846-
return extent_metadata_response{.extents = std::move(extents)};
851+
return extent_metadata_response{
852+
.extents = std::move(extents), .end_of_stream = end_of_stream};
847853
}
848854

849855
} // namespace cloud_topics::l1

0 commit comments

Comments
 (0)