Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions src/v/cloud_storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "topic_manifest_state",
hdrs = [
"topic_manifest_state.h",
],
visibility = [
"//visibility:public",
],
deps = [
"//src/v/cluster:topic_configuration",
"//src/v/model",
"//src/v/serde",
],
)

redpanda_cc_library(
name = "topic_mount_manifest_path",
srcs = [
Expand All @@ -30,6 +45,47 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "topic_path_utils",
srcs = [
"topic_path_utils.cc",
],
hdrs = [
"topic_path_utils.h",
],
implementation_deps = [
"//src/v/hashing:xx",
],
visibility = ["//visibility:public"],
deps = [
":remote_label",
"//src/v/base",
"//src/v/container:chunked_vector",
"//src/v/model",
"@fmt",
"@seastar",
],
)

redpanda_cc_library(
name = "topic_path_provider",
srcs = [
"topic_path_provider.cc",
],
hdrs = [
"topic_path_provider.h",
],
implementation_deps = [
":topic_path_utils",
],
visibility = ["//visibility:public"],
deps = [
":remote_label",
"//src/v/model",
"@seastar",
],
)

redpanda_cc_library(
name = "cloud_storage",
srcs = [
Expand Down Expand Up @@ -72,7 +128,6 @@ redpanda_cc_library(
"topic_manifest_downloader.cc",
"topic_mount_handler.cc",
"topic_mount_manifest.cc",
"topic_path_utils.cc",
"tx_range_manifest.cc",
],
hdrs = [
Expand Down Expand Up @@ -118,10 +173,10 @@ redpanda_cc_library(
"topic_manifest_downloader.h",
"topic_mount_handler.h",
"topic_mount_manifest.h",
"topic_path_utils.h",
"tx_range_manifest.h",
],
implementation_deps = [
":topic_manifest_state",
"//src/v/json",
"@rapidjson",
],
Expand All @@ -131,6 +186,8 @@ redpanda_cc_library(
":remote_label",
":segment_meta_cstore",
":topic_mount_manifest_path",
":topic_path_provider",
":topic_path_utils",
":types",
"//src/v/base",
"//src/v/bytes:iobuf",
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class remote_segment;
class partition_manifest;
class topic_mount_manifest;
class topic_manifest;
class topic_path_provider;
class partition_probe;
class async_manifest_view;

Expand Down
61 changes: 12 additions & 49 deletions src/v/cloud_storage/remote_path_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "cloud_storage/spillover_manifest.h"
#include "cloud_storage/topic_mount_manifest.h"
#include "cloud_storage/topic_mount_manifest_path.h"
#include "cloud_storage/topic_path_utils.h"
#include "cloud_storage/types.h"
#include "model/fundamental.h"

Expand All @@ -26,48 +25,21 @@ namespace cloud_storage {
remote_path_provider::remote_path_provider(
std::optional<remote_label> label,
std::optional<model::topic_namespace> topic_namespace_override)
: label_(label)
, _topic_namespace_override(std::move(topic_namespace_override)) {}
: topic_path_provider(std::move(label), std::move(topic_namespace_override)) {
}

remote_path_provider remote_path_provider::copy() const {
remote_path_provider ret(label_, _topic_namespace_override);
remote_path_provider ret(label_, topic_namespace_override_);
return ret;
}

ss::sstring remote_path_provider::topic_manifest_prefix(
const model::topic_namespace& topic) const {
const auto& tp_ns = _topic_namespace_override.value_or(topic);
if (label_.has_value()) {
return labeled_topic_manifest_prefix(*label_, tp_ns);
}
return prefixed_topic_manifest_prefix(tp_ns);
}

ss::sstring remote_path_provider::topic_manifest_path(
const model::topic_namespace& topic, model::initial_revision_id rev) const {
const auto& tp_ns = _topic_namespace_override.value_or(topic);
if (label_.has_value()) {
return labeled_topic_manifest_path(*label_, tp_ns, rev);
}
return prefixed_topic_manifest_bin_path(tp_ns);
}

std::optional<ss::sstring> remote_path_provider::topic_manifest_path_json(
const model::topic_namespace& topic) const {
if (label_.has_value()) {
return std::nullopt;
}
const auto& tp_ns = _topic_namespace_override.value_or(topic);
return prefixed_topic_manifest_json_path(tp_ns);
}

ss::sstring remote_path_provider::partition_manifest_prefix(
const model::ntp& ntp, model::initial_revision_id rev) const {
std::optional<model::ntp> ntp_override;
if (_topic_namespace_override.has_value()) {
if (topic_namespace_override_.has_value()) {
ntp_override = model::ntp(
_topic_namespace_override->ns,
_topic_namespace_override->tp,
topic_namespace_override_->ns,
topic_namespace_override_->tp,
ntp.tp.partition);
}
const auto& maybe_overridden_ntp = ntp_override.value_or(ntp);
Expand Down Expand Up @@ -100,10 +72,10 @@ std::optional<ss::sstring> remote_path_provider::partition_manifest_path_json(
return std::nullopt;
}
std::optional<model::ntp> ntp_override;
if (_topic_namespace_override.has_value()) {
if (topic_namespace_override_.has_value()) {
ntp_override = model::ntp(
_topic_namespace_override->ns,
_topic_namespace_override->tp,
topic_namespace_override_->ns,
topic_namespace_override_->tp,
ntp.tp.partition);
}
const auto& maybe_overridden_ntp = ntp_override.value_or(ntp);
Expand Down Expand Up @@ -136,10 +108,10 @@ ss::sstring remote_path_provider::segment_path(
const auto segment_name = partition_manifest::generate_remote_segment_name(
segment);
std::optional<model::ntp> ntp_override;
if (_topic_namespace_override.has_value()) {
if (topic_namespace_override_.has_value()) {
ntp_override = model::ntp(
_topic_namespace_override->ns,
_topic_namespace_override->tp,
topic_namespace_override_->ns,
topic_namespace_override_->tp,
ntp.tp.partition);
}
const auto& maybe_overridden_ntp = ntp_override.value_or(ntp);
Expand All @@ -161,13 +133,4 @@ ss::sstring remote_path_provider::segment_path(
manifest.get_ntp(), manifest.get_revision_id(), segment);
}

ss::sstring remote_path_provider::topic_lifecycle_marker_path(
const model::topic_namespace& topic, model::initial_revision_id rev) const {
const auto& tp_ns = _topic_namespace_override.value_or(topic);
if (label_.has_value()) {
return labeled_topic_lifecycle_marker_path(*label_, tp_ns, rev);
}
return prefixed_topic_lifecycle_marker_path(tp_ns, rev);
}

} // namespace cloud_storage
26 changes: 4 additions & 22 deletions src/v/cloud_storage/remote_path_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#pragma once

#include "cloud_storage/fwd.h"
#include "cloud_storage/remote_label.h"
#include "cloud_storage/topic_path_provider.h"
#include "cloud_storage/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"
Expand All @@ -20,7 +20,9 @@

namespace cloud_storage {

class remote_path_provider {
/// Provides path computation for all cloud storage objects.
/// Inherits topic path methods from topic_path_provider.
class remote_path_provider : public topic_path_provider {
public:
// Discourage accidental copies to encourage referencing of a single path
// provider (e.g. the one owned by the archival STM).
Expand All @@ -44,17 +46,6 @@ class remote_path_provider {
// For use in copy() and in coroutines.
remote_path_provider(remote_path_provider&&) = default;

// Prefix of the topic manifest path. This can be used to filter objects to
// find topic manifests.
ss::sstring
topic_manifest_prefix(const model::topic_namespace& topic) const;

// Topic manifest path.
ss::sstring topic_manifest_path(
const model::topic_namespace& topic, model::initial_revision_id) const;
std::optional<ss::sstring>
topic_manifest_path_json(const model::topic_namespace& topic) const;

// Prefix of the partition manifest path. This can be used to filter
// objects to find partition or spillover manifests.
ss::sstring partition_manifest_prefix(
Expand Down Expand Up @@ -92,15 +83,6 @@ class remote_path_provider {
const model::ntp& ntp,
model::initial_revision_id rev,
const segment_meta& segment) const;

// Topic lifecycle marker path.
ss::sstring topic_lifecycle_marker_path(
const model::topic_namespace& topic,
model::initial_revision_id rev) const;

private:
std::optional<remote_label> label_;
std::optional<model::topic_namespace> _topic_namespace_override;
};

} // namespace cloud_storage
3 changes: 3 additions & 0 deletions src/v/cloud_storage/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ redpanda_cc_gtest(
deps = [
"//src/v/cloud_storage",
"//src/v/cloud_storage:topic_mount_manifest_path",
"//src/v/cloud_storage:topic_path_utils",
"//src/v/cloud_storage:types",
"//src/v/model",
"//src/v/model/tests:random",
Expand Down Expand Up @@ -303,6 +304,7 @@ redpanda_cc_btest(
"//src/v/bytes:iostream",
"//src/v/bytes:streambuf",
"//src/v/cloud_storage",
"//src/v/cloud_storage:topic_path_utils",
"//src/v/cloud_storage:types",
"//src/v/cluster",
"//src/v/model",
Expand Down Expand Up @@ -509,6 +511,7 @@ redpanda_cc_gtest(
"//src/v/cloud_io/tests:s3_imposter",
"//src/v/cloud_io/tests:scoped_remote",
"//src/v/cloud_storage",
"//src/v/cloud_storage:topic_path_utils",
"//src/v/cloud_storage:types",
"//src/v/cloud_storage_clients",
"//src/v/model",
Expand Down
28 changes: 3 additions & 25 deletions src/v/cloud_storage/topic_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
#include "bytes/iostream.h"
#include "bytes/streambuf.h"
#include "cloud_storage/logger.h"
#include "cloud_storage/remote_path_provider.h"
#include "cloud_storage/topic_manifest_state.h"
#include "cloud_storage/topic_path_provider.h"
#include "cloud_storage/types.h"
#include "json/encodings.h"
#include "json/istreamwrapper.h"
Expand All @@ -36,29 +37,6 @@
#include <stdexcept>
#include <string_view>

namespace {
// topic manifest state is a serde-friendly representation of
// topic_manifest. it will allow to evolve the manifest without pushing
// fields to topic_properties, if the need arises
struct topic_manifest_state
: public serde::envelope<
topic_manifest_state,
serde::version<0>,
serde::compat_version<0>> {
cluster::topic_configuration cfg;
// note: initial_revision will be used to initialize
// cfg.properties.remote_topic_properties.initial_revision, but keep it
// separate here to mirror the old behavior.

model::initial_revision_id initial_revision;

auto serde_fields() { return std::tie(cfg, initial_revision); }

bool operator==(const topic_manifest_state&) const = default;
};

} // namespace

namespace cloud_storage {

/// JSON parsing handler for topic manifest written by legacy
Expand Down Expand Up @@ -448,7 +426,7 @@ ss::sstring topic_manifest::display_name() const {
}

remote_manifest_path topic_manifest::get_manifest_path(
const remote_path_provider& path_provider) const {
const topic_path_provider& path_provider) const {
vassert(_topic_config, "Topic config is not set");
return remote_manifest_path{
path_provider.topic_manifest_path(_topic_config->tp_ns, _rev)};
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/topic_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "cloud_storage/base_manifest.h"
#include "cloud_storage/fwd.h"
#include "cloud_storage/topic_path_provider.h"
#include "cloud_storage/types.h"
#include "cluster/topic_configuration.h"

Expand Down Expand Up @@ -59,7 +60,7 @@ class topic_manifest final : public base_manifest {
ss::future<iobuf> serialize_buf() const override;

/// Manifest object name in S3
remote_manifest_path get_manifest_path(const remote_path_provider&) const;
remote_manifest_path get_manifest_path(const topic_path_provider&) const;

manifest_type get_manifest_type() const override {
return manifest_type::topic;
Expand Down
32 changes: 32 additions & 0 deletions src/v/cloud_storage/topic_manifest_state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#pragma once

#include "cluster/topic_configuration.h"
#include "model/fundamental.h"
#include "serde/envelope.h"

namespace cloud_storage {

// A serde-friendly representation of topic_manifest.
struct topic_manifest_state
: public serde::envelope<
topic_manifest_state,
serde::version<0>,
serde::compat_version<0>> {
auto serde_fields() { return std::tie(cfg, initial_revision); }
bool operator==(const topic_manifest_state&) const = default;

cluster::topic_configuration cfg;
model::initial_revision_id initial_revision;
};

} // namespace cloud_storage
Loading