Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions proto/redpanda/core/admin/internal/cloud_topics/v1/metastore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ service MetastoreService {
authz: SUPERUSER
};
}

// Get the size in bytes of a topic partition in the Kafka namespace. If
// the metastore contains no data for this NTP then a not found error will
// be thrown.
rpc GetSize(GetSizeRequest) returns (GetSizeResponse) {
option (pbgen.rpc) = {
authz: SUPERUSER
};
}
}

// GetOffsetsRequest is the request for looking up the offsets in the metastore
Expand All @@ -77,3 +86,16 @@ message Offsets {
// the exclusive upper bound of what is stored in the metastore.
int64 next_offset = 2;
}

// GetSizeRequest is the request for looking up the size in bytes in the
// metastore for a topic partition.
message GetSizeRequest {
// The topic partition to lookup the size for.
common.v1.TopicPartition partition = 1;
}

// GetSizeResponse is the response for looking up the size in the metastore.
message GetSizeResponse {
// The size in bytes of this partition in the metastore.
uint64 size_bytes = 1;
}
25 changes: 25 additions & 0 deletions src/v/cloud_topics/level_one/domain/db_domain_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,31 @@ db_domain_manager::get_offsets(rpc::get_offsets_request req) {
};
}

ss::future<rpc::get_size_reply>
db_domain_manager::get_size(rpc::get_size_request req) {
auto gl_res = co_await gate_and_open_reads();
if (!gl_res.has_value()) {
co_return rpc::get_size_reply{.ec = gl_res.error()};
}
auto reader = state_reader(db_->db().create_snapshot());
auto metadata_res = co_await reader.get_metadata(req.tp);
if (!metadata_res.has_value()) {
co_return rpc::get_size_reply{
.ec = log_and_convert(metadata_res.error(), "Error getting metadata"),
};
}
if (!metadata_res->has_value()) {
co_return rpc::get_size_reply{
.ec = rpc::errc::missing_ntp,
};
}
const auto& metadata = **metadata_res;
co_return rpc::get_size_reply{
.ec = rpc::errc::ok,
.size = metadata.size,
};
}

ss::future<rpc::get_compaction_info_reply>
db_domain_manager::do_get_compaction_info(
const gate_read_lock&,
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_topics/level_one/domain/db_domain_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class db_domain_manager final : public domain_manager {
ss::future<rpc::get_offsets_reply>
get_offsets(rpc::get_offsets_request) override;

ss::future<rpc::get_size_reply> get_size(rpc::get_size_request) override;

ss::future<rpc::get_compaction_info_reply>
get_compaction_info(rpc::get_compaction_info_request) override;

Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_topics/level_one/domain/domain_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class domain_manager {
virtual ss::future<rpc::get_offsets_reply>
get_offsets(rpc::get_offsets_request) = 0;

virtual ss::future<rpc::get_size_reply> get_size(rpc::get_size_request) = 0;

virtual ss::future<rpc::get_compaction_info_reply>
get_compaction_info(rpc::get_compaction_info_request) = 0;

Expand Down
27 changes: 27 additions & 0 deletions src/v/cloud_topics/level_one/domain/simple_domain_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,33 @@ simple_domain_manager::get_offsets(rpc::get_offsets_request req) {
};
}

ss::future<rpc::get_size_reply>
simple_domain_manager::get_size(rpc::get_size_request req) {
auto gate = maybe_gate();
if (!gate.has_value()) {
co_return rpc::get_size_reply{
.ec = rpc::errc::not_leader,
};
}
auto sync_res = co_await stm_->sync(10s);
if (!sync_res.has_value()) {
co_return rpc::get_size_reply{
.ec = convert_stm_errc(sync_res.error()),
};
}
auto& stm_state = stm_->state();
auto get_res = simple_metastore::get_size(stm_state, req.tp);
if (!get_res.has_value()) {
co_return rpc::get_size_reply{
.ec = convert_metastore_errc(get_res.error()),
};
}
co_return rpc::get_size_reply{
.ec = rpc::errc::ok,
.size = get_res->size,
};
}

rpc::get_compaction_info_reply simple_domain_manager::do_get_compaction_info(
const state& stm_state, rpc::get_compaction_info_request req) {
auto get_res = simple_metastore::get_compaction_info(
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_topics/level_one/domain/simple_domain_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class simple_domain_manager final : public domain_manager {
ss::future<rpc::get_offsets_reply>
get_offsets(rpc::get_offsets_request) override;

ss::future<rpc::get_size_reply> get_size(rpc::get_size_request) override;

ss::future<rpc::get_compaction_info_reply>
get_compaction_info(rpc::get_compaction_info_request) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,3 +880,66 @@ TEST_F(DbDomainManagerTest, TestGarbageCollectionAfterRemoveTopic) {
RPTEST_REQUIRE_EVENTUALLY(
30s, [&] { return all_objects_missing(object_ids); });
}

TEST_F(DbDomainManagerTest, TestGetSizeBasic) {
auto tp = make_tp();
// Add 3 objects, each with an extent of size 512 bytes.
{
l1_rpc::add_objects_request req;
req.new_objects = make_new_objects(tp, kafka::offset(0), 3, 10);
req.new_terms = make_terms(tp, kafka::offset(0), model::term_id(1));
auto reply = initial_manager->add_objects(std::move(req)).get();
ASSERT_EQ(reply.ec, l1_rpc::errc::ok);
}

// Query size - should be 3 * 512 = 1536 bytes.
l1_rpc::get_size_request size_req{.tp = tp};
auto size_reply = initial_manager->get_size(std::move(size_req)).get();
ASSERT_EQ(size_reply.ec, l1_rpc::errc::ok);
ASSERT_EQ(size_reply.size, 3 * 512);
}

TEST_F(DbDomainManagerTest, TestGetSizeAfterReplace) {
auto tp = make_tp();
// Add 5 objects, each with an extent of size 512 bytes.
for (int i = 0; i < 5; ++i) {
l1_rpc::add_objects_request req;
req.new_objects = make_new_objects(tp, kafka::offset(i), 1, 1);
req.new_terms = make_terms(tp, kafka::offset(i), model::term_id(1));
auto reply = initial_manager->add_objects(std::move(req)).get();
ASSERT_EQ(reply.ec, l1_rpc::errc::ok);
}

// Initial size should be 5 * 512 = 2560 bytes.
{
l1_rpc::get_size_request size_req{.tp = tp};
auto size_reply = initial_manager->get_size(std::move(size_req)).get();
ASSERT_EQ(size_reply.ec, l1_rpc::errc::ok);
ASSERT_EQ(size_reply.size, 5 * 512);
}

// Replace all 5 extents with 1 extent (also 512 bytes).
l1_rpc::replace_objects_request replace_req{
.metastore_partition = model::partition_id(0),
.new_objects = make_new_objects(tp, kafka::offset(0), 1, 5),
};
auto replace_reply
= initial_manager->replace_objects(std::move(replace_req)).get();
ASSERT_EQ(replace_reply.ec, l1_rpc::errc::ok);

// Size should now be 1 * 512 = 512 bytes.
{
l1_rpc::get_size_request size_req{.tp = tp};
auto size_reply = initial_manager->get_size(std::move(size_req)).get();
ASSERT_EQ(size_reply.ec, l1_rpc::errc::ok);
ASSERT_EQ(size_reply.size, 512);
}
}

TEST_F(DbDomainManagerTest, TestGetSizeMissingPartition) {
auto tp = make_tp();
// Query size for a partition that doesn't exist.
l1_rpc::get_size_request size_req{.tp = tp};
auto size_reply = initial_manager->get_size(std::move(size_req)).get();
ASSERT_EQ(size_reply.ec, l1_rpc::errc::missing_ntp);
}
38 changes: 38 additions & 0 deletions src/v/cloud_topics/level_one/metastore/leader_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ ss::future<rpc::get_offsets_reply> do_get_offsets(
co_return co_await domain_mgr->get_offsets(std::move(req));
}

ss::future<rpc::get_size_reply> do_get_size(
domain_supervisor& domain_supervisor,
const model::ntp& ntp,
rpc::get_size_request req) {
auto domain_mgr = domain_supervisor.get(ntp);
if (!domain_mgr) {
co_return rpc::get_size_reply{.ec = rpc::errc::not_leader};
}
co_return co_await domain_mgr->get_size(std::move(req));
}

ss::future<rpc::get_compaction_info_reply> do_get_compaction_info(
domain_supervisor& domain_supervisor,
const model::ntp& ntp,
Expand Down Expand Up @@ -340,6 +351,13 @@ template ss::future<rpc::get_offsets_reply> leader_router::process<
&leader_router::get_offsets_locally,
&leader_router::client::get_offsets>(rpc::get_offsets_request, bool);

template ss::future<rpc::get_size_reply>
leader_router::remote_dispatch<&leader_router::client::get_size>(
rpc::get_size_request, model::node_id);
template ss::future<rpc::get_size_reply> leader_router::process<
&leader_router::get_size_locally,
&leader_router::client::get_size>(rpc::get_size_request, bool);

template ss::future<rpc::get_compaction_info_reply>
leader_router::remote_dispatch<&leader_router::client::get_compaction_info>(
rpc::get_compaction_info_request, model::node_id);
Expand Down Expand Up @@ -565,6 +583,26 @@ ss::future<rpc::get_offsets_reply> leader_router::get_offsets(
&client::get_offsets>(std::move(request), bool(local_only_exec));
}

ss::future<rpc::get_size_reply> leader_router::get_size_locally(
rpc::get_size_request request,
const model::ntp& metastore_ntp,
ss::shard_id shard) {
co_return co_await container().invoke_on(
shard,
[metastore_ntp, req = std::move(request)](leader_router& fe) mutable {
return do_get_size(
*(fe._domain_supervisor), metastore_ntp, std::move(req));
});
}

ss::future<rpc::get_size_reply> leader_router::get_size(
rpc::get_size_request request, local_only local_only_exec) {
auto holder = _gate.hold();
co_return co_await process<
&leader_router::get_size_locally,
&client::get_size>(std::move(request), bool(local_only_exec));
}

ss::future<rpc::get_compaction_info_reply>
leader_router::get_compaction_info_locally(
rpc::get_compaction_info_request request,
Expand Down
6 changes: 6 additions & 0 deletions src/v/cloud_topics/level_one/metastore/leader_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class leader_router : public ss::peering_sharded_service<leader_router> {
ss::future<rpc::get_offsets_reply>
get_offsets(rpc::get_offsets_request, local_only = local_only::no);

ss::future<rpc::get_size_reply>
get_size(rpc::get_size_request, local_only = local_only::no);

ss::future<rpc::get_compaction_info_reply> get_compaction_info(
rpc::get_compaction_info_request, local_only = local_only::no);

Expand Down Expand Up @@ -165,6 +168,9 @@ class leader_router : public ss::peering_sharded_service<leader_router> {
ss::future<rpc::get_offsets_reply> get_offsets_locally(
rpc::get_offsets_request, const model::ntp& metastore_ntp, ss::shard_id);

ss::future<rpc::get_size_reply> get_size_locally(
rpc::get_size_request, const model::ntp& metastore_ntp, ss::shard_id);

ss::future<rpc::get_compaction_info_reply> get_compaction_info_locally(
rpc::get_compaction_info_request,
const model::ntp& metastore_ntp,
Expand Down
Loading
Loading