2 changes: 1 addition & 1 deletion src/v/cloud_topics/BUILD
@@ -55,6 +55,7 @@ redpanda_cc_library(
"//src/v/cloud_topics/level_one/frontend_reader/tests:__pkg__",
"//src/v/cloud_topics/level_zero/frontend_reader:__pkg__",
"//src/v/cloud_topics/reconciler:__pkg__",
"//src/v/cloud_topics/tests:__pkg__",
"//src/v/kafka/data:__pkg__",
"//src/v/redpanda:__pkg__",
],
@@ -126,7 +127,6 @@ redpanda_cc_library(
],
implementation_deps = [
"//src/v/base",
"//src/v/cloud_io:cache",
"//src/v/cloud_io:remote",
"//src/v/cloud_topics:cluster_services_interface",
"//src/v/cloud_topics:data_plane_api",
1 change: 0 additions & 1 deletion src/v/cloud_topics/app.cc
@@ -48,7 +48,6 @@ ss::future<> app::construct(
data_plane = co_await make_data_plane(
ssx::sformat("{}::data_plane", _logger_name),
remote,
cloud_cache,
bucket,
storage,
&controller->get_cluster_epoch_generator());
33 changes: 33 additions & 0 deletions src/v/cloud_topics/batch_cache/BUILD
@@ -3,6 +3,8 @@ load("//bazel:build.bzl", "redpanda_cc_library")
package(default_visibility = [
"//src/v/cloud_topics:__pkg__",
"//src/v/cloud_topics/batch_cache/tests:__pkg__",
"//src/v/cloud_topics/level_zero/reader:__pkg__",
"//src/v/cloud_topics/level_zero/reader/tests:__pkg__",
])

redpanda_cc_library(
@@ -22,6 +24,35 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "hydrated_cache_api",
hdrs = [
"hydrated_cache_api.h",
],
deps = [
"//src/v/bytes:iobuf",
"//src/v/cloud_topics:types",
"//src/v/model",
],
)

redpanda_cc_library(
name = "hydrated_object_index",
srcs = [
"hydrated_object_index.cc",
],
hdrs = [
"hydrated_object_index.h",
],
deps = [
"//src/v/cloud_topics:types",
"//src/v/model",
"//src/v/storage:batch_cache",
"@abseil-cpp//absl/container:btree",
"@abseil-cpp//absl/container:flat_hash_map",
],
)

redpanda_cc_library(
name = "batch_cache",
srcs = [
@@ -31,6 +62,8 @@ redpanda_cc_library(
"batch_cache.h",
],
deps = [
":hydrated_cache_api",
":hydrated_object_index",
":probe",
"//src/v/base",
"//src/v/bytes",
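To make the new hydrated_cache_api target above easier to follow, here is a rough sketch of the shape such a header could take, inferred from the partition_hydrated_cache methods implemented in batch_cache.cc below. The pure-virtual layout, include paths, and comments are assumptions, not the actual contents of hydrated_cache_api.h.

// Hypothetical sketch (not from this PR) of what hydrated_cache_api.h could
// declare, reconstructed from the partition_hydrated_cache methods implemented
// in batch_cache.cc below; include paths are approximate.
#pragma once

#include "bytes/iobuf.h"
#include "cloud_topics/types.h"
#include "model/fundamental.h"

#include <optional>

namespace cloud_topics {

// Per-partition view over locally hydrated (downloaded) object byte ranges.
struct partition_hydrated_cache_api {
    virtual ~partition_hydrated_cache_api() = default;

    // True if the byte range [byte_offset, byte_offset + size) of object `id`
    // is currently cached for this partition.
    virtual bool is_cached(
      const model::topic_id_partition& tidp,
      const object_id& id,
      first_byte_offset_t byte_offset,
      byte_range_size_t size) const
      = 0;

    // Returns the cached bytes for the range, or std::nullopt on a miss.
    virtual std::optional<iobuf> get(
      const model::topic_id_partition& tidp,
      const object_id& id,
      first_byte_offset_t byte_offset,
      byte_range_size_t size)
      = 0;

    // Stores a freshly hydrated byte range for the partition.
    virtual void put(
      const model::topic_id_partition& tidp,
      const object_id& id,
      first_byte_offset_t byte_offset,
      iobuf payload)
      = 0;

    // Drops cached ranges that belong to epochs older than invalidated_epoch.
    virtual void truncate_hydrated(
      const model::topic_id_partition& tidp, cluster_epoch invalidated_epoch)
      = 0;
};

} // namespace cloud_topics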
226 changes: 222 additions & 4 deletions src/v/cloud_topics/batch_cache/batch_cache.cc
@@ -11,12 +11,14 @@
#include "cloud_topics/batch_cache/batch_cache.h"

#include "config/configuration.h"
#include "model/record.h"
#include "ssx/future-util.h"
#include "storage/batch_cache.h"
#include "storage/log_manager.h"
#include "storage/ntp_config.h"

#include <seastar/core/preempt.hh>
#include <seastar/core/shard_id.hh>

#include <chrono>

@@ -33,11 +35,13 @@ batch_cache::batch_cache(
: batch_cache(&log_manager.local().log_mgr(), gc_interval) {}

ss::future<> batch_cache::start() {
    // Set up the materialized index cleanup timer
_cleanup_timer.set_callback([this] {
auto gh = _gate.hold();
ssx::spawn_with_gate(_gate, [this] { return cleanup_index_entries(); });
});
_cleanup_timer.arm(_gc_interval);

return ss::now();
}

@@ -74,7 +78,7 @@ void batch_cache::put(
}
}
it->second->put(b, storage::batch_cache::is_dirty_entry::no);
_probe.register_put(b.size_bytes());
_probe.register_materialized_put(b.size_bytes());
}

std::optional<model::record_batch>
@@ -97,13 +101,13 @@ batch_cache::get(const model::topic_id_partition& tidp, model::offset o) {
rb->base_offset(),
rb->last_offset(),
o);
_probe.register_get(rb->size_bytes());
_probe.register_materialized_get(rb->size_bytes());
} else {
_probe.register_miss();
_probe.register_materialized_miss();
}
return rb;
}
_probe.register_miss();
_probe.register_materialized_miss();
return std::nullopt;
}

@@ -112,6 +116,8 @@ ss::future<> batch_cache::cleanup_index_entries() {
// the index may no longer reference any live entries. If this
// is the case we need to delete the batch_cache_index from the
// '_index' collection to avoid accumulating orphaned entries.

// Clean up materialized batch index
auto it = _index.begin();
while (it != _index.end()) {
if (it->second->empty()) {
@@ -125,7 +131,219 @@
it = _index.lower_bound(next);
}
}

// Clean up empty per-partition hydrated indices
auto hydrated_it = _partition_hydrated.begin();
while (hydrated_it != _partition_hydrated.end()) {
bool should_remove = false;
if (
hydrated_it->second.translation_map
&& hydrated_it->second.translation_map->extent_count() == 0
&& hydrated_it->second.translation_map->epoch_count() == 0) {
should_remove = true;
}
if (should_remove) {
hydrated_it = _partition_hydrated.erase(hydrated_it);
} else {
++hydrated_it;
}
if (ss::need_preempt() && hydrated_it != _partition_hydrated.end()) {
model::topic_id_partition next = hydrated_it->first;
co_await ss::yield();
hydrated_it = _partition_hydrated.lower_bound(next);
}
}

_cleanup_timer.arm(_gc_interval);
}

// partition_hydrated_cache implementation

partition_hydrated_cache::partition_hydrated_cache(batch_cache& cache)
: _cache(cache) {}

bool partition_hydrated_cache::is_cached(
const model::topic_id_partition& tidp,
const object_id& id,
first_byte_offset_t byte_offset,
byte_range_size_t size) const {
return _cache.is_cached_hydrated_internal(tidp, id, byte_offset, size);
}

std::optional<iobuf> partition_hydrated_cache::get(
const model::topic_id_partition& tidp,
const object_id& id,
first_byte_offset_t byte_offset,
byte_range_size_t size) {
return _cache.get_hydrated_internal(tidp, id, byte_offset, size);
}

void partition_hydrated_cache::put(
const model::topic_id_partition& tidp,
const object_id& id,
first_byte_offset_t byte_offset,
iobuf payload) {
_cache.put_hydrated_internal(tidp, id, byte_offset, std::move(payload));
}

void partition_hydrated_cache::truncate_hydrated(
const model::topic_id_partition& tidp, cluster_epoch invalidated_epoch) {
_cache.truncate_hydrated_internal(tidp, invalidated_epoch);
}

// batch_cache per-partition hydrated cache methods

partition_hydrated_cache_api* batch_cache::get_partition_hydrated_cache() {
if (!_partition_hydrated_cache) {
_partition_hydrated_cache = std::make_unique<partition_hydrated_cache>(
*this);
}
return _partition_hydrated_cache.get();
}

partition_hydrated_state& batch_cache::ensure_partition_hydrated_state(
const model::topic_id_partition& tidp) {
auto it = _partition_hydrated.find(tidp);
if (it != _partition_hydrated.end()) {
return it->second;
}

partition_hydrated_state state;

if (_lm != nullptr) {
// Create a factory function that creates batch_cache_index instances
batch_cache_index_factory factory = [this]() {
auto cache_ix = _lm->create_cache(storage::with_cache::yes);
if (cache_ix.has_value()) {
return std::make_unique<storage::batch_cache_index>(
std::move(*cache_ix));
}
return storage::batch_cache_index_ptr{};
};
state.translation_map = std::make_unique<partition_hydrated_index>(
std::move(factory));
}

auto [new_it, _] = _partition_hydrated.emplace(tidp, std::move(state));
return new_it->second;
}

bool batch_cache::is_cached_hydrated_internal(
const model::topic_id_partition& tidp,
const object_id& id,
first_byte_offset_t byte_offset,
byte_range_size_t size) const {
auto it = _partition_hydrated.find(tidp);
if (it == _partition_hydrated.end() || !it->second.translation_map) {
return false;
}
return it->second.translation_map->has_extent(id, byte_offset, size());
}

std::optional<iobuf> batch_cache::get_hydrated_internal(
const model::topic_id_partition& tidp,
const object_id& id,
first_byte_offset_t byte_offset,
byte_range_size_t size) {
if (_lm == nullptr) {
return std::nullopt;
}
_gate.check();

auto it = _partition_hydrated.find(tidp);
if (it == _partition_hydrated.end() || !it->second.translation_map) {
_probe.register_hydrated_miss();
return std::nullopt;
}

auto& state = it->second;

// Use get_extent which handles subset queries and data retrieval
auto data = state.translation_map->get_extent(id, byte_offset, size());
if (!data.has_value()) {
_probe.register_hydrated_miss();
return std::nullopt;
}

_probe.register_hydrated_get(data->size_bytes());
return data;
}

void batch_cache::put_hydrated_internal(
const model::topic_id_partition& tidp,
const object_id& id,
first_byte_offset_t byte_offset,
iobuf payload) {
if (_lm == nullptr) {
return;
}
_gate.check();

auto& state = ensure_partition_hydrated_state(tidp);
if (!state.translation_map) {
return;
}

// put_extent allocates a synthetic offset and creates epoch index if needed
auto synthetic = state.translation_map->put_extent(
id, byte_offset, payload.size_bytes());

if (!synthetic.has_value()) {
// Caching is disabled or failed to create index
return;
}

// Get the batch_cache_index for this epoch
auto* index = state.translation_map->get_batch_cache_index(id.epoch);
if (index == nullptr) {
return;
}

// Check if already stored in the index
if (index->get(*synthetic).has_value()) {
return;
}

// Create a fake record batch to store the payload
// We use the synthetic offset as the base_offset
// The term is set to a placeholder value since this is hydrated (not
// materialized) data
model::record_batch_header header{
.size_bytes = static_cast<int32_t>(
model::packed_record_batch_header_size + payload.size_bytes()),
.base_offset = *synthetic,
.type = model::record_batch_type::raft_data,
.crc = 0,
.attrs = model::record_batch_attributes{},
.last_offset_delta = 0,
.first_timestamp = model::timestamp::now(),
.max_timestamp = model::timestamp::now(),
.producer_id = -1,
.producer_epoch = -1,
.base_sequence = -1,
.record_count = 1,
.ctx = model::record_batch_header::context(
model::term_id{0}, ss::this_shard_id()),
};

auto batch = model::record_batch(header, std::move(payload));
index->put(batch, storage::batch_cache::is_dirty_entry::no);
_probe.register_hydrated_put(batch.size_bytes());
}

void batch_cache::truncate_hydrated_internal(
const model::topic_id_partition& tidp, cluster_epoch invalidated_epoch) {
auto it = _partition_hydrated.find(tidp);
if (it == _partition_hydrated.end() || !it->second.translation_map) {
return;
}

auto& state = it->second;

// Truncate entries from epochs older than the invalidated epoch.
// This removes the per-epoch batch_cache_index entries and truncates
// their underlying batch_cache data.
state.translation_map->truncate_epoch(invalidated_epoch);
}

} // namespace cloud_topics
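For context, here is a minimal caller-side sketch of how a reader might drive the per-partition hydrated cache added in this file. Only get_partition_hydrated_cache(), get(), and put() come from the diff; read_range and fetch_range_from_remote are hypothetical placeholders, not part of this PR.

#include "cloud_topics/batch_cache/batch_cache.h"

#include <seastar/core/coroutine.hh>

// Placeholder for however the real reader downloads a missing byte range.
seastar::future<iobuf> fetch_range_from_remote(
  const cloud_topics::object_id&,
  cloud_topics::first_byte_offset_t,
  cloud_topics::byte_range_size_t);

// Hypothetical reader-side helper showing the intended fast/slow paths.
seastar::future<iobuf> read_range(
  cloud_topics::batch_cache& cache,
  const model::topic_id_partition& tidp,
  const cloud_topics::object_id& id,
  cloud_topics::first_byte_offset_t offset,
  cloud_topics::byte_range_size_t size) {
    auto* hydrated = cache.get_partition_hydrated_cache();

    // Fast path: the range was hydrated earlier and is still resident in the
    // underlying storage::batch_cache.
    if (auto cached = hydrated->get(tidp, id, offset, size)) {
        co_return std::move(*cached);
    }

    // Slow path: download the range, then publish it so later reads of the
    // same partition can hit the cache.
    iobuf payload = co_await fetch_range_from_remote(id, offset, size);
    hydrated->put(tidp, id, offset, payload.copy());
    co_return payload;
}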