ct: Remove disk cache from L0 data layer #29464
Lazin wants to merge 12 commits into redpanda-data:dev
Conversation
Pull request overview
This PR removes the cloud storage disk cache from the L0 data layer read path and replaces it with a memory-based record batch cache for storing hydrated L0 objects.
Changes:
- Replaced disk-based cache operations with memory-based hydrated cache using the batch_cache infrastructure
- Added per-partition hydrated cache tracking with epoch-based organization
- Modified L0 object format to include a footer mapping partition data locations
- Updated all L0 read/write request handling to include topic_id and topic_id_partition parameters
Reviewed changes
Copilot reviewed 67 out of 67 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| src/v/cloud_topics/batch_cache/hydrated_cache_api.h | New API interface for per-partition hydrated L0 object caching |
| src/v/cloud_topics/batch_cache/hydrated_object_index.h | New index structure mapping L0 extents to synthetic offsets per epoch |
| src/v/cloud_topics/batch_cache/batch_cache.cc | Implementation of hydrated cache operations using batch_cache |
| src/v/cloud_topics/level_zero/common/object.h | New L0 object footer structure for partition data locations |
| src/v/cloud_topics/level_zero/batcher/aggregator.cc | Updated to write partition data with footer to L0 objects |
| src/v/cloud_topics/level_zero/reader/materialized_extent_reader.cc | Refactored to download full L0 objects and cache all partitions |
| src/v/cloud_topics/level_zero/pipeline/write_request.h | Added topic_id field to write requests |
| src/v/cloud_topics/level_zero/pipeline/read_request.h | Added topic_id_partition field to read requests |
| src/v/cloud_topics/data_plane_impl.cc | Removed cloud_io::cache dependency |
| Multiple test files | Updated to pass topic_id/topic_id_partition parameters |
CI test results: test results on build#79862 · test results on build#79907
```diff
 template<class Clock>
-iobuf aggregator<Clock>::get_stream() {
+ss::future<iobuf> aggregator<Clock>::get_stream(l0::footer& footer_out) {
```
This can return iobuf. Doesn't have to be async.
```cpp
// Serialize the footer and append it to the payload
iobuf footer_buf;
co_await serde::write_async(footer_buf, footer_out.copy());
```
No need to use write_async. It is possible to serialize using serde::to_iobuf.
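A minimal sketch of the suggested replacement, assuming `footer` is serde-serializable as the diff above implies:

```cpp
// Serialize the footer synchronously and append it to the payload.
// serde::to_iobuf returns the buffer directly, so no co_await is needed
// and the enclosing function no longer has to be a coroutine.
iobuf footer_buf = serde::to_iobuf(footer_out.copy());
```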
```diff
 template<class Clock>
-aggregator<Clock>::L0_object aggregator<Clock>::prepare(object_id id) {
+ss::future<typename aggregator<Clock>::L0_object>
```
This should return an object of type L0_object instead of the future.
```diff
-iobuf get_stream();
+/// Returns the payload with footer appended and populates the footer
+/// struct.
+ss::future<iobuf> get_stream(l0::footer& footer_out);
```
Should return iobuf. Fundamentally, there is nothing in the implementation that makes this function async; all the data is available in memory.
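A hedged sketch of the synchronous shape this suggests; the payload assembly is elided and `make_payload` is a hypothetical placeholder, not the PR's code:

```cpp
// Sketch: all data is already in memory, so the footer can be appended
// to the payload and returned without involving a future.
template<class Clock>
iobuf aggregator<Clock>::get_stream(l0::footer& footer_out) {
    iobuf payload = make_payload(footer_out); // hypothetical helper
    payload.append(serde::to_iobuf(footer_out.copy()));
    return payload;
}
```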
```cpp
//   result = co_await l0::footer::read(std::move(missing));
//   return std::get<l0::footer>(result);
// ```
static ss::future<std::variant<footer, size_t>> read(iobuf);
```
Shouldn't be async. The footer should be possible to serialize/deserialize using from_iobuf/to_iobuf. The size of the footer is limited (cardinality is capped at 1000 by default), so it's impossible to hit reactor stalls.
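A minimal sketch of the synchronous API shape this suggests, assuming `footer` participates in serde (the method names are illustrative):

```cpp
// The footer's cardinality is capped (1000 entries by default), so
// decoding it synchronously cannot stall the reactor.
static footer from_iobuf(iobuf b) {
    return serde::from_iobuf<footer>(std::move(b));
}
iobuf to_iobuf() const {
    return serde::to_iobuf(copy());
}
```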
```cpp
virtual ~partition_hydrated_cache_api() = default;

/// Check if hydrated data for this extent is cached for the partition.
virtual bool has(
```
```cpp
    return &new_it->second;
}

std::optional<model::offset> partition_hydrated_index::put_extent(
```
The batch_cache_probe should be updated here as well as in batch_cache.cc.
```cpp
  const object_id& id, first_byte_offset_t byte_offset) const;

/// Check if this specific extent is cached.
bool has_extent(const object_id& id, first_byte_offset_t byte_offset) const;
```
The partition_hydrated_index should work slightly differently. The read path can add the full extent to the cache, but it will later ask for the extent with the same id and a byte_offset that is greater than or equal to the one used to add the entry (see the sketch after the example).
Example:
- put: id=Foo, byte_offset=0, size_bytes=1024
- has_extent: id=Foo, byte_offset=100, size_bytes=200 -> returns true
- get: id=Foo, byte_offset=100, size_bytes=200 -> returns the actual byte range (full offset is 0 + 100, size = 200)
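A small sketch of the containment rule described above; `cached_extent` and its field names are hypothetical, not the PR's types:

```cpp
#include <cstddef>

// Hypothetical view of a cached extent covering bytes [base, base + size).
struct cached_extent {
    size_t base; // byte_offset passed to put_extent
    size_t size; // size_bytes of the cached payload
};

// A query for [off, off + len) hits iff it is fully contained in the
// extent, so {base=0, size=1024} satisfies off=100, len=200.
bool contains(const cached_extent& e, size_t off, size_t len) {
    return off >= e.base && off + len <= e.base + e.size;
}

// On a hit, get reads len bytes starting at (off - e.base) within the
// cached payload, matching "full offset is 0 + 100, size = 200" above.
```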
```cpp
/// Get the batch_cache_index for a specific epoch.
/// Returns nullptr if the epoch doesn't exist.
storage::batch_cache_index*
get_batch_cache_index(cluster_epoch epoch) const;
```
This shouldn't be exposed. There should be a get_extent method that wraps together get_synthetic_offset and get_batch_cache_index. It should take into account that the caller may query for a subset of the extent (the full extent is [1000, 2000] and the query asks for [1200, 1400]).
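A rough sketch of the wrapper shape this implies; `extent_hit` and the exact signature are assumptions, not the PR's actual API:

```cpp
// Hypothetical lookup result: where to read, in which epoch's index.
struct extent_hit {
    model::offset synthetic_offset;    // resolved via get_synthetic_offset
    storage::batch_cache_index* index; // resolved via get_batch_cache_index
};

// Wraps both lookups and handles sub-range queries: for a cached extent
// [1000, 2000], a query for [1200, 1400] should hit and be offset by 200
// into the cached payload.
std::optional<extent_hit>
get_extent(const object_id& id, first_byte_offset_t byte_offset, size_t size_bytes);
```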
Commits
- The index is a translation layer used to store hydrated but not materialized batches in the record batch cache. Signed-off-by: Evgeny Lazin <[email protected]>
- Add hydrated L0 object caching to the batch_cache. Signed-off-by: Evgeny Lazin <[email protected]>
- Use hydrated object cache in the read path. When the object is downloaded by the read path, its footer is analyzed and payloads that belong to different partitions are disseminated to the corresponding caches. The dependency on the cloud storage cache is removed. Signed-off-by: Evgeny Lazin <[email protected]>
- The cloud storage cache is no longer used anywhere in the L0. Signed-off-by: Evgeny Lazin <[email protected]>
Force-pushed from 9a7d457 to 3f03104.
Subj.
The PR removes the cloud storage cache from the read path. Instead, it uses the record batch cache to store hydrated L0 objects.
Backports Required
Release Notes