From 26dbeeb64bba7b1bffb5fcd7df7b2d6fc3ce7955 Mon Sep 17 00:00:00 2001
From: Ben Kirwin
Date: Wed, 22 Jan 2025 13:02:08 -0500
Subject: [PATCH 1/5] Bound the number of concurrent warm requests

---
 src/storage-controller/src/lib.rs | 35 +++++++++++++++++--------------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs
index b4efeca3bee51..0088b82ee4ff2 100644
--- a/src/storage-controller/src/lib.rs
+++ b/src/storage-controller/src/lib.rs
@@ -21,7 +21,6 @@ use async_trait::async_trait;
 use chrono::{DateTime, Utc};
 use derivative::Derivative;
 use differential_dataflow::lattice::Lattice;
-use futures::stream::FuturesUnordered;
 use futures::FutureExt;
 use futures::StreamExt;
 use itertools::Itertools;
@@ -238,23 +237,27 @@ fn warm_persist_state_in_background(
     client: PersistClient,
     shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
 ) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
+    /// Bound the number of shards that we warm at a single time, to limit our overall resource use.
+    const MAX_CONCURRENT_WARMS: usize = 16;
     let logic = async move {
-        let fetchers = FuturesUnordered::new();
-        for shard_id in shard_ids {
-            let client = client.clone();
-            fetchers.push(async move {
-                client
-                    .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, Diff>(
-                        shard_id,
-                        Arc::new(RelationDesc::empty()),
-                        Arc::new(UnitSchema),
-                        true,
-                        Diagnostics::from_purpose("warm persist load state"),
-                    )
-                    .await
+        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
+            .map(|shard_id| {
+                let client = client.clone();
+                async move {
+                    client
+                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, Diff>(
+                            shard_id,
+                            Arc::new(RelationDesc::empty()),
+                            Arc::new(UnitSchema),
+                            true,
+                            Diagnostics::from_purpose("warm persist load state"),
+                        )
+                        .await
+                }
             })
-        }
-        let fetchers = fetchers.collect::<Vec<_>>().await;
+            .buffer_unordered(MAX_CONCURRENT_WARMS)
+            .collect()
+            .await;
         let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
         fetchers
     };

From 04c90faf7d78453c6a7eed8ebdc212042b356e22 Mon Sep 17 00:00:00 2001
From: Ben Kirwin
Date: Wed, 22 Jan 2025 13:02:42 -0500
Subject: [PATCH 2/5] Typos and minor cleanups

---
 src/storage-client/src/storage_collections.rs | 2 +-
 src/storage-controller/src/lib.rs             | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs
index 2511a4f4e35d5..053b65c44ee0b 100644
--- a/src/storage-client/src/storage_collections.rs
+++ b/src/storage-client/src/storage_collections.rs
@@ -1545,7 +1545,7 @@ where
                 snapshot
                     .into_iter()
                     .map(|(row, diff)| {
-                        assert!(diff == 1, "snapshot doesn't accumulate to set");
+                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                         row
                     })
                     .collect()

diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs
index 0088b82ee4ff2..14729d5e7c644 100644
--- a/src/storage-controller/src/lib.rs
+++ b/src/storage-controller/src/lib.rs
@@ -1489,7 +1489,7 @@ where
             },
         };

-        // Fetch the client for this exports's cluster.
+        // Fetch the client for this export's cluster.
         let instance = self
             .instances
             .get_mut(&new_description.instance_id)

From af7048a14b5beb973592369f998d300f5405dedb Mon Sep 17 00:00:00 2001
From: Ben Kirwin
Date: Wed, 22 Jan 2025 13:40:17 -0500
Subject: [PATCH 3/5] Remove unused per-sink status collection id

Status is no longer reported by the sink itself, so the id is no longer
needed at the protocol level.
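
A note on the concurrency pattern in patch 1: `StreamExt::buffer_unordered`
gives the warm task an explicit ceiling on in-flight futures, which an
unbounded `FuturesUnordered` loop does not. The sketch below is a minimal,
self-contained illustration of that pattern, not the controller code itself;
the `warm` function and the `0..100` id range are stand-ins for the persist
fetcher calls, and it assumes the `futures` and `tokio` crates as
dependencies.

```rust
use futures::stream::{self, StreamExt};

/// Cap on in-flight requests, mirroring MAX_CONCURRENT_WARMS above.
const MAX_CONCURRENT: usize = 16;

// Stand-in for the per-shard work; in the patch this is `create_batch_fetcher`.
async fn warm(shard_id: u64) -> u64 {
    tokio::task::yield_now().await;
    shard_id
}

#[tokio::main]
async fn main() {
    // `map` turns the stream of ids into a stream of futures, and
    // `buffer_unordered` polls at most MAX_CONCURRENT of them at a time,
    // yielding results in completion order.
    let warmed: Vec<u64> = stream::iter(0u64..100)
        .map(warm)
        .buffer_unordered(MAX_CONCURRENT)
        .collect()
        .await;

    assert_eq!(warmed.len(), 100);
}
```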
---
 src/adapter/src/coord/ddl.rs             |  6 -----
 src/adapter/src/coord/sequencer/inner.rs |  6 -----
 src/storage-controller/src/history.rs    |  1 -
 src/storage-controller/src/lib.rs        | 29 ------------------------
 src/storage-types/src/sinks.proto        |  3 +--
 src/storage-types/src/sinks.rs           |  8 -------
 6 files changed, 1 insertion(+), 52 deletions(-)

diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs
index 6e35aaf5fb2dd..5a285902678eb 100644
--- a/src/adapter/src/coord/ddl.rs
+++ b/src/adapter/src/coord/ddl.rs
@@ -1289,11 +1289,6 @@ impl Coordinator {
         // Validate `sink.from` is in fact a storage collection
         self.controller.storage.check_exists(sink.from)?;

-        let status_id = self
-            .catalog()
-            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SINK_STATUS_HISTORY);
-        let status_id = Some(self.catalog().get_entry(&status_id).latest_global_id());
-
         // The AsOf is used to determine at what time to snapshot reading from
         // the persist collection. This is primarily relevant when we do _not_
         // want to include the snapshot in the sink.
@@ -1334,7 +1329,6 @@ impl Coordinator {
             as_of,
             with_snapshot: sink.with_snapshot,
             version: sink.version,
-            status_id,
             from_storage_metadata: (),
         };

diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs
index 4c134b3caf1da..113cf32f1ef99 100644
--- a/src/adapter/src/coord/sequencer/inner.rs
+++ b/src/adapter/src/coord/sequencer/inner.rs
@@ -3559,11 +3559,6 @@ impl Coordinator {
             }
         }

-        let status_id = self
-            .catalog()
-            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SINK_STATUS_HISTORY);
-        let status_id = Some(self.catalog().get_entry(&status_id).latest_global_id());
-
         let from_entry = self.catalog().get_entry_by_global_id(&sink.from);
         let storage_sink_desc = StorageSinkDesc {
             from: sink.from,
@@ -3580,7 +3575,6 @@ impl Coordinator {
             with_snapshot,
             version: sink.version,
             partition_strategy: sink.partition_strategy,
-            status_id,
             from_storage_metadata: (),
         };

diff --git a/src/storage-controller/src/history.rs b/src/storage-controller/src/history.rs
index e154daea283a1..607296172d775 100644
--- a/src/storage-controller/src/history.rs
+++ b/src/storage-controller/src/history.rs
@@ -390,7 +390,6 @@ mod tests {
             version: Default::default(),
             envelope: SinkEnvelope::Upsert,
             as_of: Antichain::from_elem(0),
-            status_id: Default::default(),
             from_storage_metadata: CollectionMetadata {
                 persist_location: PersistLocation {
                     blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),

diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs
index 14729d5e7c644..7263d4c3ffcab 100644
--- a/src/storage-controller/src/lib.rs
+++ b/src/storage-controller/src/lib.rs
@@ -1468,11 +1468,6 @@ where
         };
         *cur_export = new_export;

-        let status_id = match new_description.sink.status_id.clone() {
-            Some(id) => Some(self.storage_collections.collection_metadata(id)?.data_shard),
-            None => None,
-        };
-
         let cmd = RunSinkCommand {
             id,
             description: StorageSinkDesc {
@@ -1483,7 +1478,6 @@ where
                 as_of: new_description.sink.as_of,
                 version: new_description.sink.version,
                 partition_strategy: new_description.sink.partition_strategy,
-                status_id,
                 from_storage_metadata,
                 with_snapshot: new_description.sink.with_snapshot,
             },
@@ -1536,17 +1530,6 @@ where
             .storage_collections
             .collection_metadata(new_export_description.sink.from)?;

-        let status_id =
-            if let Some(status_collection_id) = new_export_description.sink.status_id {
-                Some(
-                    self.storage_collections
-                        .collection_metadata(status_collection_id)?
-                        .data_shard,
-                )
-            } else {
-                None
-            };
-
         let cmd = RunSinkCommand {
             id,
             description: StorageSinkDesc {
@@ -1568,7 +1551,6 @@ where
                 // read holds are held for the correct amount of time.
                 // TODO(petrosagg): change the controller to explicitly track dataflow executions
                 as_of: as_of.to_owned(),
-                status_id,
                 from_storage_metadata,
             },
         };
@@ -3176,16 +3158,6 @@ where
             .storage_collections
             .collection_metadata(description.sink.from)?;

-        let status_id = if let Some(status_collection_id) = description.sink.status_id {
-            Some(
-                self.storage_collections
-                    .collection_metadata(status_collection_id)?
-                    .data_shard,
-            )
-        } else {
-            None
-        };
-
         let cmd = RunSinkCommand {
             id,
             description: StorageSinkDesc {
@@ -3196,7 +3168,6 @@ where
                 as_of: description.sink.as_of.clone(),
                 version: description.sink.version,
                 partition_strategy: description.sink.partition_strategy.clone(),
-                status_id,
                 from_storage_metadata,
                 with_snapshot: description.sink.with_snapshot,
             },

diff --git a/src/storage-types/src/sinks.proto b/src/storage-types/src/sinks.proto
index a943f9a739f66..236c67d8a8256 100644
--- a/src/storage-types/src/sinks.proto
+++ b/src/storage-types/src/sinks.proto
@@ -23,13 +23,12 @@ import "storage-types/src/connections.proto";
 import "storage-types/src/controller.proto";

 message ProtoStorageSinkDesc {
-  reserved 5, 8, 9, 10;
+  reserved 5, 7, 8, 9, 10;
   mz_repr.global_id.ProtoGlobalId from = 1;
   mz_repr.relation_and_scalar.ProtoRelationDesc from_desc = 2;
   ProtoStorageSinkConnection connection = 3;
   optional ProtoSinkEnvelope envelope = 4;
   optional mz_storage_types.controller.ProtoCollectionMetadata from_storage_metadata = 6;
-  optional string status_id = 7;
   mz_repr.antichain.ProtoU64Antichain as_of = 11;
   bool with_snapshot = 12;
   uint64 version = 13;

diff --git a/src/storage-types/src/sinks.rs b/src/storage-types/src/sinks.rs
index 7f7c613361e55..d64423a325346 100644
--- a/src/storage-types/src/sinks.rs
+++ b/src/storage-types/src/sinks.rs
@@ -49,7 +49,6 @@ pub struct StorageSinkDesc<S: StorageSinkDescFillState, T = mz_repr::Timestamp>
     pub version: u64,
     pub envelope: SinkEnvelope,
     pub as_of: Antichain<T>,
-    pub status_id: Option<<S as StorageSinkDescFillState>::StatusId>,
     pub from_storage_metadata: <S as StorageSinkDescFillState>::StorageMetadata,
 }
@@ -79,7 +78,6 @@ impl Arbitrary for StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
             any::<StorageSinkConnection>(),
             any::<SinkEnvelope>(),
             any::<Vec<Timestamp>>(),
-            any::<Option<ShardId>>(),
             any::<CollectionMetadata>(),
             any::<SinkPartitionStrategy>(),
             any::<bool>(),
@@ -164,7 +160,6 @@ impl Arbitrary for StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
                     connection,
                     envelope,
                     as_of,
-                    status_id,
                     from_storage_metadata,
                     partition_strategy,
                     with_snapshot,
@@ -177,7 +172,6 @@ impl Arbitrary for StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
                         envelope,
                         version,
                         as_of: Antichain::from_iter(as_of),
-                        status_id,
                         from_storage_metadata,
                         partition_strategy,
                         with_snapshot,
@@ -196,7 +190,6 @@ impl RustType<ProtoStorageSinkDesc> for StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
             envelope: Some(self.envelope.into_proto()),
             as_of: Some(self.as_of.into_proto()),
             from_storage_metadata: Some(self.from_storage_metadata.into_proto()),
-            status_id: self.status_id.into_proto(),
             with_snapshot: self.with_snapshot,
             version: self.version,
             partition_strategy: Some(self.partition_strategy.into_proto()),

From  Mon Sep 17 00:00:00 2001
From: Ben Kirwin
Date: Wed, 22 Jan 2025 13:46:30 -0500
Subject: [PATCH 4/5] Remove a now-unused trait

---
 src/storage-client/src/client.rs      |  6 ++---
 src/storage-client/src/controller.rs  |  4 ++--
 src/storage-controller/src/history.rs |  6 ++---
 src/storage-types/src/sinks.rs        | 32 +++++----------------------
 src/storage/src/internal_control.rs   |  4 ++--
 src/storage/src/render.rs             |  4 ++--
 src/storage/src/render/sinks.rs       |  9 ++++----
 src/storage/src/sink/kafka.rs         |  8 +++----
 src/storage/src/storage_state.rs      |  4 ++--
 9 files changed, 29 insertions(+), 48 deletions(-)

diff --git a/src/storage-client/src/client.rs b/src/storage-client/src/client.rs
index f653269d20df0..27818e6c968ec 100644
--- a/src/storage-client/src/client.rs
+++ b/src/storage-client/src/client.rs
@@ -30,7 +30,7 @@ use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
 use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
 use mz_storage_types::parameters::StorageParameters;
-use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
+use mz_storage_types::sinks::StorageSinkDesc;
 use mz_storage_types::sources::IngestionDescription;
 use mz_timely_util::progress::any_antichain;
 use proptest::prelude::{any, Arbitrary};
@@ -260,7 +260,7 @@ impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 pub struct RunSinkCommand<T> {
     pub id: GlobalId,
-    pub description: StorageSinkDesc<MetadataFilled, T>,
+    pub description: StorageSinkDesc<CollectionMetadata, T>,
 }

 impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
@@ -270,7 +270,7 @@ impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
     fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
         (
             any::<GlobalId>(),
-            any::<StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>>(),
+            any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
         )
             .prop_map(|(id, description)| Self { id, description })
             .boxed()

diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs
index 25e28ce8b11d1..d8970294310cf 100644
--- a/src/storage-client/src/controller.rs
+++ b/src/storage-client/src/controller.rs
@@ -41,7 +41,7 @@ use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
 use mz_storage_types::parameters::StorageParameters;
 use mz_storage_types::read_holds::ReadHold;
 use mz_storage_types::read_policy::ReadPolicy;
-use mz_storage_types::sinks::{MetadataUnfilled, StorageSinkConnection, StorageSinkDesc};
+use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
 use mz_storage_types::sources::{
     GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
     SourceExportDetails, Timeline,
@@ -157,7 +157,7 @@ impl<T> CollectionDescription<T> {

 #[derive(Clone, Debug, Eq, PartialEq)]
 pub struct ExportDescription<T = mz_repr::Timestamp> {
-    pub sink: StorageSinkDesc<MetadataUnfilled, T>,
+    pub sink: StorageSinkDesc<(), T>,
     /// The ID of the instance in which to install the export.
     pub instance_id: StorageInstanceId,
 }

diff --git a/src/storage-controller/src/history.rs b/src/storage-controller/src/history.rs
index 607296172d775..2e5b3a143fef7 100644
--- a/src/storage-controller/src/history.rs
+++ b/src/storage-controller/src/history.rs
@@ -238,8 +238,8 @@ mod tests {
     use mz_storage_types::instances::StorageInstanceId;
     use mz_storage_types::sinks::{
         KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
-        KafkaSinkFormatType, MetadataFilled, SinkEnvelope, SinkPartitionStrategy,
-        StorageSinkConnection, StorageSinkDesc,
+        KafkaSinkFormatType, SinkEnvelope, SinkPartitionStrategy, StorageSinkConnection,
+        StorageSinkDesc,
     };
     use mz_storage_types::sources::load_generator::{
         LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
@@ -342,7 +342,7 @@ mod tests {
         }
     }

-    fn sink_description() -> StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
+    fn sink_description() -> StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp> {
         StorageSinkDesc {
             from: GlobalId::System(1),
             from_desc: RelationDesc::new(

diff --git a/src/storage-types/src/sinks.rs b/src/storage-types/src/sinks.rs
index d64423a325346..b66209f4dc1cc 100644
--- a/src/storage-types/src/sinks.rs
+++ b/src/storage-types/src/sinks.rs
@@ -15,7 +15,6 @@ use std::time::Duration;

 use mz_dyncfg::ConfigSet;
 use mz_expr::MirScalarExpr;
-use mz_persist_types::ShardId;
 use mz_pgcopy::CopyFormatParams;
 use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
 use mz_repr::bytes::ByteSize;
@@ -40,7 +39,7 @@ pub mod s3_oneshot_sink;

 /// A sink for updates to a relational collection.
 #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
-pub struct StorageSinkDesc<S: StorageSinkDescFillState, T = mz_repr::Timestamp> {
+pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
     pub from: GlobalId,
     pub from_desc: RelationDesc,
     pub connection: StorageSinkConnection,
@@ -49,11 +48,11 @@ pub struct StorageSinkDesc<S: StorageSinkDescFillState, T = mz_repr::Timestamp>
     pub version: u64,
     pub envelope: SinkEnvelope,
     pub as_of: Antichain<T>,
-    pub from_storage_metadata: <S as StorageSinkDescFillState>::StorageMetadata,
+    pub from_storage_metadata: S,
 }

-impl<S: StorageSinkDescFillState + PartialEq, T: PartialEq + TimelyTimestamp>
-    AlterCompatible for StorageSinkDesc<S, T>
+impl<S: Debug + PartialEq, T: PartialEq + TimelyTimestamp> AlterCompatible
+    for StorageSinkDesc<S, T>
 {
     /// Determines if `self` is compatible with another `StorageSinkDesc`, in
     /// such a way that it is possible to turn `self` into `other` through a
@@ -118,26 +117,7 @@ impl<S: StorageSinkDescFillState + PartialEq, T: PartialEq + TimelyTimestamp>
     }
 }

-pub trait StorageSinkDescFillState {
-    type StatusId: Debug + Clone + Serialize + for<'a> Deserialize<'a> + Eq + PartialEq;
-    type StorageMetadata: Debug + Clone + Serialize + for<'a> Deserialize<'a> + Eq + PartialEq;
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
-pub struct MetadataUnfilled;
-impl StorageSinkDescFillState for MetadataUnfilled {
-    type StatusId = GlobalId;
-    type StorageMetadata = ();
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
-pub struct MetadataFilled;
-impl StorageSinkDescFillState for MetadataFilled {
-    type StatusId = ShardId;
-    type StorageMetadata = CollectionMetadata;
-}
-
-impl Arbitrary for StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
+impl Arbitrary for StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp> {
     type Strategy = BoxedStrategy<Self>;
     type Parameters = ();

@@ -190,7 +170,7 @@ impl Arbitrary for StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
     }
 }

-impl RustType<ProtoStorageSinkDesc> for StorageSinkDesc<MetadataFilled, mz_repr::Timestamp> {
+impl RustType<ProtoStorageSinkDesc> for StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp> {
     fn into_proto(&self) -> ProtoStorageSinkDesc {
         ProtoStorageSinkDesc {
             connection: Some(self.connection.into_proto()),

diff --git a/src/storage/src/internal_control.rs b/src/storage/src/internal_control.rs
index b4b0fcbe662a0..8599017081ab7 100644
--- a/src/storage/src/internal_control.rs
+++ b/src/storage/src/internal_control.rs
@@ -14,7 +14,7 @@ use mz_rocksdb::config::SharedWriteBufferManager;
 use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
 use mz_storage_types::parameters::StorageParameters;
-use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
+use mz_storage_types::sinks::StorageSinkDesc;
 use mz_storage_types::sources::IngestionDescription;
 use serde::{Deserialize, Serialize};
 use timely::communication::Allocate;
@@ -98,7 +98,7 @@ pub enum InternalStorageCommand {
     /// Render a sink dataflow.
     RunSinkDataflow(
         GlobalId,
-        StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
+        StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
     ),
     /// Drop all state and operators for a dataflow. This is a vec because some
     /// dataflows have their state spread over multiple IDs (i.e. sources that
diff --git a/src/storage/src/render.rs b/src/storage/src/render.rs
index d936fb6dda442..0b4e54fc6e184 100644
--- a/src/storage/src/render.rs
+++ b/src/storage/src/render.rs
@@ -206,7 +206,7 @@ use mz_repr::{GlobalId, Row};
 use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::dyncfgs;
 use mz_storage_types::oneshot_sources::{OneshotIngestionDescription, OneshotIngestionRequest};
-use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
+use mz_storage_types::sinks::StorageSinkDesc;
 use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceConnection};
 use mz_timely_util::antichain::AntichainExt;
 use timely::communication::Allocate;
@@ -425,7 +425,7 @@ pub fn build_export_dataflow<A: Allocate>(
     timely_worker: &mut TimelyWorker<A>,
     storage_state: &mut StorageState,
     id: GlobalId,
-    description: StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
+    description: StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
 ) {
     let worker_logging = timely_worker.log_register().get("timely").map(Into::into);
     let debug_name = id.to_string();

diff --git a/src/storage/src/render/sinks.rs b/src/storage/src/render/sinks.rs
index f5351b6fdfd55..bd0e399b09c63 100644
--- a/src/storage/src/render/sinks.rs
+++ b/src/storage/src/render/sinks.rs
@@ -23,8 +23,9 @@ use mz_interchange::envelopes::combine_at_timestamp;
 use mz_persist_client::operators::shard_source::SnapshotMode;
 use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
 use mz_storage_operators::persist_source;
+use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::errors::DataflowError;
-use mz_storage_types::sinks::{MetadataFilled, StorageSinkConnection, StorageSinkDesc};
+use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
 use mz_timely_util::builder_async::PressOnDropButton;
 use timely::dataflow::operators::Leave;
 use timely::dataflow::scopes::Child;
@@ -42,7 +43,7 @@ pub(crate) fn render_sink<'g, G: Scope>(
     scope: &mut Child<'g, G, mz_repr::Timestamp>,
     storage_state: &mut StorageState,
     sink_id: GlobalId,
-    sink: &StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
+    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
 ) -> (Stream<G, ()>, Vec<PressOnDropButton>) {
     let sink_render = get_sink_render_for(&sink.connection);

@@ -104,7 +105,7 @@ pub(crate) fn render_sink<'g, G: Scope>(
 /// `DiffPair`s.
 fn zip_into_diff_pairs<G>(
     sink_id: GlobalId,
-    sink: &StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
+    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
     sink_render: &dyn SinkRender<G>,
     collection: Collection<G, Row, Diff>,
 ) -> Collection<G, (Option<Row>, DiffPair<Row>), Diff>
@@ -211,7 +212,7 @@ where
     fn render_sink(
         &self,
         storage_state: &mut StorageState,
-        sink: &StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
+        sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
         sink_id: GlobalId,
         sinked_collection: Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
         err_collection: Collection<G, DataflowError, Diff>,

diff --git a/src/storage/src/sink/kafka.rs b/src/storage/src/sink/kafka.rs
index 74c6ebd6822bb..8da4b73218b00 100644
--- a/src/storage/src/sink/kafka.rs
+++ b/src/storage/src/sink/kafka.rs
@@ -101,11 +101,11 @@ use mz_ore::vec::VecExt;
 use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
 use mz_storage_client::sink::progress_key::ProgressKey;
 use mz_storage_types::configuration::StorageConfiguration;
+use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::dyncfgs::KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS;
 use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
 use mz_storage_types::sinks::{
-    KafkaSinkConnection, KafkaSinkFormatType, MetadataFilled, SinkEnvelope, SinkPartitionStrategy,
-    StorageSinkDesc,
+    KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, SinkPartitionStrategy, StorageSinkDesc,
 };
 use mz_timely_util::antichain::AntichainExt;
 use mz_timely_util::builder_async::{
@@ -147,7 +147,7 @@ impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for KafkaSinkConnection {
     fn render_sink(
         &self,
         storage_state: &mut StorageState,
-        sink: &StorageSinkDesc<MetadataFilled, Timestamp>,
+        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
         sink_id: GlobalId,
         input: Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
         // TODO(benesch): errors should stream out through the sink,
@@ -615,7 +615,7 @@ fn sink_collection<G: Scope<Timestamp = Timestamp>>(
     connection: KafkaSinkConnection,
     partition_strategy: SinkPartitionStrategy,
     storage_configuration: StorageConfiguration,
-    sink: &StorageSinkDesc<MetadataFilled, Timestamp>,
+    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
     metrics: KafkaSinkMetrics,
     statistics: SinkStatistics,
     write_frontier: Rc<RefCell<Antichain<Timestamp>>>,

diff --git a/src/storage/src/storage_state.rs b/src/storage/src/storage_state.rs
index 114544f22a02a..879a83320144b 100644
--- a/src/storage/src/storage_state.rs
+++ b/src/storage/src/storage_state.rs
@@ -97,7 +97,7 @@ use mz_storage_types::configuration::StorageConfiguration;
 use mz_storage_types::connections::ConnectionContext;
 use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::oneshot_sources::OneshotIngestionDescription;
-use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
+use mz_storage_types::sinks::StorageSinkDesc;
 use mz_storage_types::sources::IngestionDescription;
 use mz_storage_types::AlterCompatible;
 use mz_timely_util::builder_async::PressOnDropButton;
@@ -276,7 +276,7 @@ pub struct StorageState {
     /// Descriptions of each installed ingestion.
     pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
     /// Descriptions of each installed export.
-    pub exports: BTreeMap<GlobalId, StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>>,
+    pub exports: BTreeMap<GlobalId, StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>,
     /// Descriptions of oneshot ingestions that are currently running.
     pub oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotIngestionDescription<Timestamp>>,
     /// Undocumented

From a6179d79be0389a671826371b0cf3f628a6c11a6 Mon Sep 17 00:00:00 2001
From: Ben Kirwin
Date: Wed, 22 Jan 2025 17:33:29 -0500
Subject: [PATCH 5/5] Remove unused StorageCollections method

It's never called.
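
For context: the method removed below was a provided (defaulted) trait
method that only forwarded to the bootstrap variant with an empty exclusion
set. A rough sketch of that shape, using hypothetical names rather than the
real `StorageCollections` API:

```rust
use std::collections::BTreeSet;

trait Collections {
    // The required method that every implementor defines.
    fn create_for_bootstrap(&self, ids: Vec<u64>, migrated: &BTreeSet<u64>) -> usize;

    // A provided convenience wrapper, analogous to the removed
    // `create_collections`: it only delegates with an empty exclusion set.
    fn create(&self, ids: Vec<u64>) -> usize {
        self.create_for_bootstrap(ids, &BTreeSet::new())
    }
}

struct Registry;

impl Collections for Registry {
    fn create_for_bootstrap(&self, ids: Vec<u64>, migrated: &BTreeSet<u64>) -> usize {
        // Count everything that isn't excluded by the migrated set.
        ids.into_iter().filter(|id| !migrated.contains(id)).count()
    }
}

fn main() {
    // Nothing here calls the provided `create`, so deleting it from the
    // trait would not break this code; implementors never had to define it.
    let n = Registry.create_for_bootstrap(vec![1, 2, 3], &BTreeSet::new());
    assert_eq!(n, 3);
}
```

Deleting an unused provided method is safe for implementors, who never had
to define it; only callers could break, and there are none.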
---
 src/storage-client/src/storage_collections.rs | 18 +-----------------
 1 file changed, 1 insertion(+), 17 deletions(-)

diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs
index 053b65c44ee0b..9f0460965fc14 100644
--- a/src/storage-client/src/storage_collections.rs
+++ b/src/storage-client/src/storage_collections.rs
@@ -228,22 +228,6 @@ pub trait StorageCollections: Debug {
     /// but hold off on that initially.) Callers must provide a Some if any of
     /// the collections is a table. A None may be given if none of the
     /// collections are a table (i.e. all materialized views, sources, etc).
-    async fn create_collections(
-        &self,
-        storage_metadata: &StorageMetadata,
-        register_ts: Option<Self::Timestamp>,
-        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
-    ) -> Result<(), StorageError<Self::Timestamp>> {
-        self.create_collections_for_bootstrap(
-            storage_metadata,
-            register_ts,
-            collections,
-            &BTreeSet::new(),
-        )
-        .await
-    }
-
-    /// Like [`Self::create_collections`], except used specifically for bootstrap.
     ///
     /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
     /// from the txn-wal sub-system.
     async fn create_collections_for_bootstrap(
@@ -467,7 +451,7 @@ where
     /// reconcile it with the previous state using
     /// [StorageCollections::initialize_state],
     /// [StorageCollections::prepare_state], and
-    /// [StorageCollections::create_collections].
+    /// [StorageCollections::create_collections_for_bootstrap].
     pub async fn new(
         persist_location: PersistLocation,
         persist_clients: Arc<PersistClientCache>,
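
A closing note on the trait removed in patch 4: the series swaps a
type-state trait (associated types selecting filled vs. unfilled metadata)
for a plain generic parameter, with `()` as the unfilled state and the
concrete metadata type as the filled one. A minimal sketch of that pattern
under hypothetical names, with `String` standing in for `CollectionMetadata`:

```rust
// One struct, generic over its metadata slot: `()` plays the role of the
// old MetadataUnfilled marker, and a concrete type the role of MetadataFilled.
#[derive(Debug, Clone, PartialEq)]
struct SinkDesc<S> {
    from: u64,
    version: u64,
    storage_metadata: S,
}

impl SinkDesc<()> {
    // Filling in metadata is a type-changing update from SinkDesc<()> to SinkDesc<S>.
    fn fill<S>(self, metadata: S) -> SinkDesc<S> {
        SinkDesc {
            from: self.from,
            version: self.version,
            storage_metadata: metadata,
        }
    }
}

fn main() {
    // Controller side: the metadata is not known yet.
    let unfilled = SinkDesc { from: 1, version: 0, storage_metadata: () };

    // Worker side: the slot is filled with concrete metadata.
    let filled: SinkDesc<String> = unfilled.fill("persist shard metadata".to_string());
    assert_eq!(filled.storage_metadata, "persist shard metadata");
}
```

Compared with the fill-state trait, this keeps the filled/unfilled
distinction in the type system while deleting the marker types and the
associated-type plumbing.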