Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds a topic manifest uploader for cloud topics, enabling topic manifest uploads when there are topic property changes or leadership changes on partition 0. The implementation follows the existing tiered storage pattern and reuses code by extracting cloud_storage components into separate Bazel libraries.
Changes:
- Adds
topic_manifest_upload_managerto handle topic manifest uploads following partition 0 leadership - Refactors cloud_storage components into separate Bazel libraries for better reusability
- Extracts
topic_path_providerfromremote_path_providerto share path computation logic
Reviewed changes
Copilot reviewed 27 out of 27 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| src/v/cloud_topics/topic_manifest_upload_manager.h | Defines the manager interface for handling topic manifest uploads |
| src/v/cloud_topics/topic_manifest_upload_manager.cc | Implements upload loop with retry logic and leadership tracking |
| src/v/cloud_topics/tests/topic_manifest_uploader_test.cc | Adds integration tests for manifest upload scenarios |
| src/v/cloud_storage/topic_manifest_uploader.h | Defines the uploader component with error handling |
| src/v/cloud_storage/topic_manifest_uploader.cc | Implements manifest upload with proper exception handling |
| src/v/cloud_storage/topic_path_provider.h | Extracts topic-level path computation into reusable component |
| src/v/cloud_storage/topic_path_provider.cc | Implements topic path generation logic |
| src/v/cloud_storage/remote_path_provider.h | Refactors to inherit from topic_path_provider |
| src/v/cloud_storage/remote_path_provider.cc | Updates to use inherited topic path methods |
| src/v/cloud_topics/manager/manager.h | Adds callback registration for property changes |
| src/v/cloud_topics/manager/manager.cc | Implements property change notification handling |
| src/v/cloud_topics/app.h | Adds topic_manifest_upload_mgr service |
| src/v/cloud_topics/app.cc | Wires up the upload manager with notifications |
| src/v/cluster/archival/ntp_archiver_service.cc | Updates to use new topic_manifest_uploader |
| src/v/cloud_storage/BUILD | Splits monolithic library into granular components |
| src/v/cloud_topics/BUILD | Adds dependencies for new upload manager |
6bdb3de to
696bcbe
Compare
CI test resultstest results on build#79988
|
Lazin
left a comment
There was a problem hiding this comment.
I think we shouldn't introduce dependency on cloud_storage in cloud_topics. I think that the code reuse should be done by moving more general things like the remote_path_provider to cloud_io.
src/v/cloud_topics/BUILD
Outdated
| ":cluster_services_interface", | ||
| ":data_plane_impl", | ||
| ":topic_manifest_upload_manager", | ||
| "//src/v/cloud_storage:remote", |
There was a problem hiding this comment.
Let's not make cloud-topics depend on tiered-storage.
There was a problem hiding this comment.
I agree it's an unfortunate dependency, but it felt necessary if we want to tick the manifest uploads probe. Those feel like they don't belong in cloud_io, given cloud_io is meant to be RP-agnostic
There was a problem hiding this comment.
Another alternative is to have the uploader live in cloud_storage or somesuch, given there isn't really anything specific to cloud_topics here either, just the fact that we're only uploading for cloud_topics partitions.
EDIT: i take it back, we are relying on the cloud_topics manager for the leadership/properties notifications...
|
Maybe it makes sense to add a service that uploads topic manifests to the |
| // Manages topic manifest uploads for cloud topics partitions. | ||
| // Starts an upload loop when a partition becomes leader, stops when leadership | ||
| // is lost, and re-uploads when properties change. | ||
| class topic_manifest_upload_manager { |
There was a problem hiding this comment.
can this be generalized to both TS and CT?
There was a problem hiding this comment.
having two places that upload topic manifests adds complexity
There was a problem hiding this comment.
Yes, this generalizes to TS as well. I think longer term we can probably use this code for tiered storage too, but I would prefer to not block CT changes on changes to TS
| ss::future<> topic_manifest_upload_manager::reset_or_signal_loop( | ||
| model::topic_id_partition tidp, | ||
| ss::optimized_optional<ss::lw_shared_ptr<cluster::partition>> partition) { | ||
| auto it = _loops.find(tidp); |
There was a problem hiding this comment.
The thing missing here is update on topic configuration change. In the TS this is a completely separate code path triggered by the cluster::partition. When the topic config changes it triggers the restart of the ntp_archiver which in turn triggers the reupload of the topic manifest. This service is not doing this part.
There was a problem hiding this comment.
Note that this is called from on_leadership_or_properties_change, which is triggered when there is a topic properties change
|
Chatted a bit with Evgeny about this and decided that to avoid the dependency on cloud_storage, i'll add a new topic_manifest_io that doesn't depend on cloud_storage::remote |
ecee06f to
cddae3d
Compare
|
Force pushes (compare):
|
cddae3d to
638f079
Compare
|
Ah i need to rebase to get some changes to ssx::actor |
da2e579 to
8034b6e
Compare
I'll have an upcoming change that will upload topic manifests outside of cloud_storage. To that end, this pulls out the path provider utilities into their own bazel library.
The underlying serializable state for topic manifests (for topics from versions of Redpanda that serialize topic manifests with serde) can be pulled out and reused by others who want to upload topic manifests (e.g. cloud topics), without having to depend on all the base_manifest and legacy parsing code in topic_manifest.cc. This commit pulls this out into its own tiny header.
I intend on adding a loop for uploading topic manifests for cloud topics. For tiered storage, this currently happens in the NTP archiver. This commit duplicates some logic that is in the archiver for uploading into its own little library. I'm opting to not touch the NTP archiver, just since I don't want changes to cloud_topics to depend on changes to cloud_storage and vice versa.
This adds an upload loop for topic manifests that runs only on cloud topics' partitions 0. This functionality matches what is in the tiered storage NTP archiver, which does the same thing in its main upload loop on partition 0. Note that there isn't really anything cloud topics specific in this loop -- we could arguably use this same loop for tiered storage too. That is left as future work.
Test for the recently added topic manifest uploader, that uploads a topic manifest from partition 0 when there are leadership transfers or topic property changes.
Review feedback. Reduces complexity a bit.
8034b6e to
0bbcc9a
Compare
This adds a topic manifest uploader for topic manifests. The topic manifest uploads follow cloud topic partition 0 leadership, and happen when there is a topic properties change. This follows the behavior of tiered storage topic manifest uploads, which happen in the NTP archiver of partition 0.
While they won't be used for WCR, topic manifests will be used for read replicas and topic mounting.
I wanted to reuse a good amount of code in tiered storage for topic manifests uploads and download. To that end, a good chunk of this PR is just pulling cloud_storage things into their own bazel libraries.
Backports Required
Release Notes