Skip to content

Commit

Permalink
Use UUIDs for oneshot ingestion dataflows instead of GlobalId
Browse files Browse the repository at this point in the history
  • Loading branch information
ParkMyCar committed Jan 17, 2025
1 parent 95c5ec4 commit 51fad2c
Show file tree
Hide file tree
Showing 11 changed files with 21 additions and 16 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/adapter/src/coord/sequencer/inner/copy_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use mz_storage_client::client::TableData;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use smallvec::SmallVec;
use url::Url;
use uuid::Uuid;

use crate::coord::sequencer::inner::return_if_err;
use crate::coord::{Coordinator, TargetCluster};
Expand Down Expand Up @@ -76,8 +77,9 @@ impl Coordinator {
return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
};

// Generate a unique UUID for our ingestion.
let ingestion_id = Uuid::new_v4();
let collection_id = dest_table.global_id_writes();
let (_, ingestion_id) = self.transient_id_gen.allocate_id();
let request = OneshotIngestionRequest {
source: mz_storage_types::oneshot_sources::ContentSource::Http { url },
format: mz_storage_types::oneshot_sources::ContentFormat::Csv,
Expand Down
1 change: 1 addition & 0 deletions src/storage-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ tokio = { version = "1.38.0", features = [
tokio-stream = "0.1.11"
tonic = "0.12.1"
tracing = "0.1.37"
uuid = "1.2.2"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

[build-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions src/storage-client/src/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ message ProtoRunIngestionCommand {
}

message ProtoRunOneshotIngestionCommand {
mz_repr.global_id.ProtoGlobalId ingestion_id = 1;
mz_proto.ProtoU128 ingestion_id = 1;
mz_repr.global_id.ProtoGlobalId collection_id = 2;
mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 3;
mz_storage_types.oneshot_sources.ProtoOneshotIngestionRequest request = 4;
Expand Down Expand Up @@ -141,7 +141,7 @@ message ProtoStorageResponse {
}

message Inner {
mz_repr.global_id.ProtoGlobalId id = 1;
mz_proto.ProtoU128 id = 1;
repeated BatchResult batches = 2;
}

Expand Down
7 changes: 4 additions & 3 deletions src/storage-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestionCommand {
/// The ID of the ingestion dataflow.
pub ingestion_id: GlobalId,
pub ingestion_id: uuid::Uuid,
/// The ID of collection we'll stage batches for.
pub collection_id: GlobalId,
/// Metadata for the collection we'll stage batches for.
Expand Down Expand Up @@ -604,7 +604,7 @@ pub enum StorageResponse<T = mz_repr::Timestamp> {
/// Punctuation indicates that no more responses will be transmitted for the specified ids
DroppedIds(BTreeSet<GlobalId>),
/// Batches that have been staged in Persist and maybe will be linked into a shard.
StagedBatches(BTreeMap<GlobalId, Vec<Result<ProtoBatch, String>>>),
StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),

/// A list of statistics updates, currently only for sources.
StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
Expand Down Expand Up @@ -755,7 +755,8 @@ pub struct PartitionedStorageState<T> {
/// individual partition.
uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
/// Staged batches from oneshot sources that will get appended by `environmentd`.
oneshot_source_responses: BTreeMap<GlobalId, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
oneshot_source_responses:
BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}

impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
Expand Down
2 changes: 1 addition & 1 deletion src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ pub trait StorageController: Debug {
/// Create a oneshot ingestion.
async fn create_oneshot_ingestion(
&mut self,
ingestion_id: GlobalId,
ingestion_id: uuid::Uuid,
collection_id: GlobalId,
instance_id: StorageInstanceId,
request: OneshotIngestionRequest,
Expand Down
1 change: 1 addition & 0 deletions src/storage-controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time
tokio-postgres = { version = "0.7.8", features = ["serde"] }
tokio-stream = "0.1.11"
tracing = "0.1.37"
uuid = "1.2.2"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

[package.metadata.cargo-udeps.ignore]
Expand Down
10 changes: 4 additions & 6 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + Tim
pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
/// Closures that can be used to send responses from oneshot ingestions.
#[derivative(Debug = "ignore")]
pending_oneshot_ingestions: BTreeMap<GlobalId, OneshotResultCallback<ProtoBatch>>,
pending_oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotResultCallback<ProtoBatch>>,

/// Interface for managed collections
pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
Expand Down Expand Up @@ -1387,7 +1387,7 @@ where
/// Create a oneshot ingestion.
async fn create_oneshot_ingestion(
&mut self,
ingestion_id: GlobalId,
ingestion_id: uuid::Uuid,
collection_id: GlobalId,
instance_id: StorageInstanceId,
request: OneshotIngestionRequest,
Expand All @@ -1400,10 +1400,8 @@ where
.collection_metadata
.clone();
let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
StorageError::ExportInstanceMissing {
storage_instance_id: instance_id,
export_id: ingestion_id,
}
// TODO(cf2): Refine this error.
StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
})?;
let oneshot_cmd = RunOneshotIngestionCommand {
ingestion_id,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/internal_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub enum InternalStorageCommand {
/// batches in Persist, that can later be appended to the shard.
RunOneshotIngestion {
/// ID of the running dataflow that is doing the ingestion.
ingestion_id: GlobalId,
ingestion_id: uuid::Uuid,
/// ID of the collection we'll create batches for.
collection_id: GlobalId,
/// Metadata of the collection we'll create batches for.
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ pub fn build_export_dataflow<A: Allocate>(
pub(crate) fn build_oneshot_ingestion_dataflow<A: Allocate>(
timely_worker: &mut TimelyWorker<A>,
storage_state: &mut StorageState,
ingestion_id: GlobalId,
ingestion_id: uuid::Uuid,
collection_id: GlobalId,
collection_meta: CollectionMetadata,
description: OneshotIngestionRequest,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/storage_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ pub struct StorageState {
/// Descriptions of each installed export.
pub exports: BTreeMap<GlobalId, StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>>,
/// Descriptions of oneshot ingestions that are currently running.
pub oneshot_ingestions: BTreeMap<GlobalId, OneshotIngestionDescription<ProtoBatch>>,
pub oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotIngestionDescription<ProtoBatch>>,
/// Undocumented
pub now: NowFn,
/// Index of the associated timely dataflow worker.
Expand Down

0 comments on commit 51fad2c

Please sign in to comment.