[storage] Minor controller cleanups and refactoring #31149

Merged · 5 commits · Jan 24, 2025
6 changes: 0 additions & 6 deletions src/adapter/src/coord/ddl.rs
@@ -1289,11 +1289,6 @@ impl Coordinator {
         // Validate `sink.from` is in fact a storage collection
         self.controller.storage.check_exists(sink.from)?;
 
-        let status_id = self
-            .catalog()
-            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SINK_STATUS_HISTORY);
-        let status_id = Some(self.catalog().get_entry(&status_id).latest_global_id());
-
         // The AsOf is used to determine at what time to snapshot reading from
         // the persist collection. This is primarily relevant when we do _not_
         // want to include the snapshot in the sink.
@@ -1334,7 +1329,6 @@
             as_of,
             with_snapshot: sink.with_snapshot,
             version: sink.version,
-            status_id,
             from_storage_metadata: (),
         };
6 changes: 0 additions & 6 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -3559,11 +3559,6 @@ impl Coordinator {
             }
         }
 
-        let status_id = self
-            .catalog()
-            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SINK_STATUS_HISTORY);
-        let status_id = Some(self.catalog().get_entry(&status_id).latest_global_id());
-
         let from_entry = self.catalog().get_entry_by_global_id(&sink.from);
         let storage_sink_desc = StorageSinkDesc {
             from: sink.from,
@@ -3580,7 +3575,6 @@
             with_snapshot,
             version: sink.version,
             partition_strategy: sink.partition_strategy,
-            status_id,
             from_storage_metadata: (),
         };
6 changes: 3 additions & 3 deletions src/storage-client/src/client.rs
@@ -30,7 +30,7 @@ use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream
 use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
 use mz_storage_types::parameters::StorageParameters;
-use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
+use mz_storage_types::sinks::StorageSinkDesc;
 use mz_storage_types::sources::IngestionDescription;
 use mz_timely_util::progress::any_antichain;
 use proptest::prelude::{any, Arbitrary};
@@ -260,7 +260,7 @@ impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 pub struct RunSinkCommand<T> {
     pub id: GlobalId,
-    pub description: StorageSinkDesc<MetadataFilled, T>,
+    pub description: StorageSinkDesc<CollectionMetadata, T>,
 }
 
 impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
@@ -270,7 +270,7 @@
     fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
         (
             any::<GlobalId>(),
-            any::<StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>>(),
+            any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
         )
         .prop_map(|(id, description)| Self { id, description })
         .boxed()
4 changes: 2 additions & 2 deletions src/storage-client/src/controller.rs
@@ -41,7 +41,7 @@ use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCa
 use mz_storage_types::parameters::StorageParameters;
 use mz_storage_types::read_holds::ReadHold;
 use mz_storage_types::read_policy::ReadPolicy;
-use mz_storage_types::sinks::{MetadataUnfilled, StorageSinkConnection, StorageSinkDesc};
+use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
 use mz_storage_types::sources::{
     GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
     SourceExportDetails, Timeline,
@@ -157,7 +157,7 @@ impl<T> CollectionDescription<T> {
 
 #[derive(Clone, Debug, Eq, PartialEq)]
 pub struct ExportDescription<T = mz_repr::Timestamp> {
-    pub sink: StorageSinkDesc<MetadataUnfilled, T>,
+    pub sink: StorageSinkDesc<(), T>,
     /// The ID of the instance in which to install the export.
     pub instance_id: StorageInstanceId,
 }
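Context for the type changes in `client.rs` and `controller.rs` above: `StorageSinkDesc` is generic over its storage-metadata slot, and the PR drops the dedicated `MetadataUnfilled`/`MetadataFilled` marker types in favor of the types they stood for: `()` for a controller-side description whose metadata is not yet resolved, and `CollectionMetadata` for a filled, cluster-bound one. A minimal sketch of the pattern, with hypothetical simplified types (not the actual Materialize definitions):

```rust
// Sketch only: `SinkDesc` and `Metadata` are stand-ins for StorageSinkDesc
// and CollectionMetadata; the real types carry many more fields.
#[derive(Debug)]
struct SinkDesc<S> {
    from: u64,
    // `S` is `()` while metadata is unresolved, and a concrete metadata
    // struct once the controller has filled it in.
    from_storage_metadata: S,
}

#[derive(Debug)]
struct Metadata {
    data_shard: String,
}

// Filling in metadata changes the type, so an "unfilled" description cannot
// be passed where a filled one is expected; no marker types are required.
fn fill(desc: SinkDesc<()>, metadata: Metadata) -> SinkDesc<Metadata> {
    SinkDesc {
        from: desc.from,
        from_storage_metadata: metadata,
    }
}

fn main() {
    let unfilled = SinkDesc { from: 1, from_storage_metadata: () };
    let filled = fill(unfilled, Metadata { data_shard: "shard-123".into() });
    println!("{filled:?}");
}
```

The compile-time guarantee is the same one the marker types provided, with two fewer names to maintain.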
20 changes: 2 additions & 18 deletions src/storage-client/src/storage_collections.rs
@@ -228,22 +228,6 @@ pub trait StorageCollections: Debug {
     /// but hold off on that initially.) Callers must provide a Some if any of
     /// the collections is a table. A None may be given if none of the
     /// collections are a table (i.e. all materialized views, sources, etc).
-    async fn create_collections(
-        &self,
-        storage_metadata: &StorageMetadata,
-        register_ts: Option<Self::Timestamp>,
-        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
-    ) -> Result<(), StorageError<Self::Timestamp>> {
-        self.create_collections_for_bootstrap(
-            storage_metadata,
-            register_ts,
-            collections,
-            &BTreeSet::new(),
-        )
-        .await
-    }
-
-    /// Like [`Self::create_collections`], except used specifically for bootstrap.
     ///
     /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
     /// from the txn-wal sub-system.
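With the `create_collections` convenience wrapper removed, call sites presumably invoke the bootstrap variant directly and pass an empty migration set, which is exactly what the removed default implementation did. A hypothetical sketch with stub types (not the real trait):

```rust
use std::collections::BTreeSet;

// Hypothetical stand-ins for the Materialize types; sketch only.
type GlobalId = u64;
type Timestamp = u64;

struct StorageCollections;

impl StorageCollections {
    // The surviving method: callers now name the set of migrated
    // collections explicitly; an empty set reproduces the old
    // `create_collections` behavior.
    fn create_collections_for_bootstrap(
        &self,
        register_ts: Option<Timestamp>,
        collections: Vec<GlobalId>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) {
        let _ = (register_ts, collections, migrated_storage_collections);
    }
}

fn main() {
    let sc = StorageCollections;
    // A former `create_collections(...)` call site becomes:
    sc.create_collections_for_bootstrap(Some(0), vec![1, 2], &BTreeSet::new());
}
```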
@@ -467,7 +451,7 @@
     /// reconcile it with the previous state using
     /// [StorageCollections::initialize_state],
     /// [StorageCollections::prepare_state], and
-    /// [StorageCollections::create_collections].
+    /// [StorageCollections::create_collections_for_bootstrap].
     pub async fn new(
         persist_location: PersistLocation,
         persist_clients: Arc<PersistClientCache>,
@@ -1545,7 +1529,7 @@
         snapshot
             .into_iter()
             .map(|(row, diff)| {
-                assert!(diff == 1, "snapshot doesn't accumulate to set");
+                assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                 row
             })
             .collect()
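The assertion swap above is behavior-preserving but improves diagnostics: on failure, `assert_eq!` prints both operands alongside the message, so the offending `diff` value shows up in the panic output. A tiny illustration:

```rust
fn main() {
    let diff = 1;
    // assert!(diff == 1, "msg") panics with only "msg";
    // assert_eq! additionally reports `left: <diff>` and `right: 1`.
    assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
    println!("snapshot ok");
}
```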
7 changes: 3 additions & 4 deletions src/storage-controller/src/history.rs
@@ -238,8 +238,8 @@ mod tests {
     use mz_storage_types::instances::StorageInstanceId;
     use mz_storage_types::sinks::{
         KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
-        KafkaSinkFormatType, MetadataFilled, SinkEnvelope, SinkPartitionStrategy,
-        StorageSinkConnection, StorageSinkDesc,
+        KafkaSinkFormatType, SinkEnvelope, SinkPartitionStrategy, StorageSinkConnection,
+        StorageSinkDesc,
     };
     use mz_storage_types::sources::load_generator::{
         LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
@@ -342,7 +342,7 @@ mod tests {
         }
     }
 
-    fn sink_description() -> StorageSinkDesc<MetadataFilled, u64> {
+    fn sink_description() -> StorageSinkDesc<CollectionMetadata, u64> {
         StorageSinkDesc {
             from: GlobalId::System(1),
             from_desc: RelationDesc::new(
@@ -390,7 +390,6 @@
             version: Default::default(),
             envelope: SinkEnvelope::Upsert,
             as_of: Antichain::from_elem(0),
-            status_id: Default::default(),
             from_storage_metadata: CollectionMetadata {
                 persist_location: PersistLocation {
                     blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
66 changes: 20 additions & 46 deletions src/storage-controller/src/lib.rs
@@ -21,7 +21,6 @@ use async_trait::async_trait;
 use chrono::{DateTime, Utc};
 use derivative::Derivative;
 use differential_dataflow::lattice::Lattice;
-use futures::stream::FuturesUnordered;
 use futures::FutureExt;
 use futures::StreamExt;
 use itertools::Itertools;
@@ -238,23 +237,27 @@ fn warm_persist_state_in_background(
     client: PersistClient,
     shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
 ) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
+    /// Bound the number of shards that we warm at a single time, to limit our overall resource use.
+    const MAX_CONCURRENT_WARMS: usize = 16;
     let logic = async move {
-        let fetchers = FuturesUnordered::new();
-        for shard_id in shard_ids {
-            let client = client.clone();
-            fetchers.push(async move {
-                client
-                    .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, Diff>(
-                        shard_id,
-                        Arc::new(RelationDesc::empty()),
-                        Arc::new(UnitSchema),
-                        true,
-                        Diagnostics::from_purpose("warm persist load state"),
-                    )
-                    .await
-            })
-        }
-        let fetchers = fetchers.collect::<Vec<_>>().await;
+        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
+            .map(|shard_id| {
+                let client = client.clone();
+                async move {
+                    client
+                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, Diff>(
+                            shard_id,
+                            Arc::new(RelationDesc::empty()),
+                            Arc::new(UnitSchema),
+                            true,
+                            Diagnostics::from_purpose("warm persist load state"),
+                        )
+                        .await
+                }
+            })
+            .buffer_unordered(MAX_CONCURRENT_WARMS)
+            .collect()
+            .await;
         let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
         fetchers
     };
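The refactor above replaces a hand-rolled `FuturesUnordered` loop, which admits a future for every shard before any of them run, with `buffer_unordered`, which keeps at most `MAX_CONCURRENT_WARMS` fetcher creations in flight. A standalone sketch of the pattern, assuming `tokio`, `tokio-stream`, and `futures` as dependencies and a stub in place of `create_batch_fetcher`:

```rust
use futures::StreamExt; // provides map, buffer_unordered, and collect on streams

// Stub for the per-shard work; the real code calls
// `client.create_batch_fetcher(..)` here.
async fn warm_shard(shard_id: u64) -> u64 {
    shard_id
}

#[tokio::main]
async fn main() {
    // Bound the number of shards warmed at once, to limit resource use.
    const MAX_CONCURRENT_WARMS: usize = 16;

    // Unlike pushing every future into a FuturesUnordered up front,
    // buffer_unordered pulls from the input lazily, polls at most
    // MAX_CONCURRENT_WARMS futures at a time, and yields results in
    // completion order.
    let warmed: Vec<u64> = tokio_stream::iter(0..100u64)
        .map(warm_shard)
        .buffer_unordered(MAX_CONCURRENT_WARMS)
        .collect()
        .await;

    assert_eq!(warmed.len(), 100);
}
```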
@@ -1465,11 +1468,6 @@
             };
             *cur_export = new_export;
 
-            let status_id = match new_description.sink.status_id.clone() {
-                Some(id) => Some(self.storage_collections.collection_metadata(id)?.data_shard),
-                None => None,
-            };
-
             let cmd = RunSinkCommand {
                 id,
                 description: StorageSinkDesc {
@@ -1480,13 +1478,12 @@
                     as_of: new_description.sink.as_of,
                     version: new_description.sink.version,
                     partition_strategy: new_description.sink.partition_strategy,
-                    status_id,
                     from_storage_metadata,
                     with_snapshot: new_description.sink.with_snapshot,
                 },
             };
 
-            // Fetch the client for this exports's cluster.
+            // Fetch the client for this export's cluster.
             let instance = self
                 .instances
                 .get_mut(&new_description.instance_id)
@@ -1533,17 +1530,6 @@
                 .storage_collections
                 .collection_metadata(new_export_description.sink.from)?;
 
-            let status_id =
-                if let Some(status_collection_id) = new_export_description.sink.status_id {
-                    Some(
-                        self.storage_collections
-                            .collection_metadata(status_collection_id)?
-                            .data_shard,
-                    )
-                } else {
-                    None
-                };
-
             let cmd = RunSinkCommand {
                 id,
                 description: StorageSinkDesc {
@@ -1565,7 +1551,6 @@
                     // read holds are held for the correct amount of time.
                     // TODO(petrosagg): change the controller to explicitly track dataflow executions
                     as_of: as_of.to_owned(),
-                    status_id,
                     from_storage_metadata,
                 },
             };
@@ -3173,16 +3158,6 @@
             .storage_collections
             .collection_metadata(description.sink.from)?;
 
-        let status_id = if let Some(status_collection_id) = description.sink.status_id {
-            Some(
-                self.storage_collections
-                    .collection_metadata(status_collection_id)?
-                    .data_shard,
-            )
-        } else {
-            None
-        };
-
         let cmd = RunSinkCommand {
             id,
             description: StorageSinkDesc {
@@ -3193,7 +3168,6 @@
                 as_of: description.sink.as_of.clone(),
                 version: description.sink.version,
                 partition_strategy: description.sink.partition_strategy.clone(),
-                status_id,
                 from_storage_metadata,
                 with_snapshot: description.sink.with_snapshot,
             },
3 changes: 1 addition & 2 deletions src/storage-types/src/sinks.proto
@@ -23,13 +23,12 @@ import "storage-types/src/connections.proto";
 import "storage-types/src/controller.proto";
 
 message ProtoStorageSinkDesc {
-    reserved 5, 8, 9, 10;
+    reserved 5, 7, 8, 9, 10;
     mz_repr.global_id.ProtoGlobalId from = 1;
     mz_repr.relation_and_scalar.ProtoRelationDesc from_desc = 2;
     ProtoStorageSinkConnection connection = 3;
     optional ProtoSinkEnvelope envelope = 4;
     optional mz_storage_types.controller.ProtoCollectionMetadata from_storage_metadata = 6;
-    optional string status_id = 7;
     mz_repr.antichain.ProtoU64Antichain as_of = 11;
     bool with_snapshot = 12;
     uint64 version = 13;