storage protocol and rendering changes
Browse files Browse the repository at this point in the history
* add new StorageCommand::RunOneshotIngestion
* add new StorageResponse::StagedBatches
* add build_oneshot_ingestion_dataflow function which calls render(...) from the previous commit
ParkMyCar committed Jan 6, 2025
1 parent dd246d5 commit 7667ddf
Showing 11 changed files with 424 additions and 5 deletions.
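Taken together, these changes add a oneshot-ingestion round trip to the storage protocol: the controller sends a RunOneshotIngestion command, the cluster renders the ingestion dataflow and stages batches in Persist, and the results come back in a StagedBatches response. Below is a minimal sketch of that flow against the GenericClient trait the storage client already uses; the run_oneshot wrapper and the handling of the results are hypothetical glue, not code from this commit.

use mz_service::client::GenericClient;
use mz_storage_client::client::{RunOneshotIngestionCommand, StorageCommand, StorageResponse};

// Hypothetical driver: issue one oneshot ingestion and wait for its batches.
async fn run_oneshot<C>(
    client: &mut C,
    cmd: RunOneshotIngestionCommand,
) -> Result<(), anyhow::Error>
where
    C: GenericClient<StorageCommand, StorageResponse>,
{
    client.send(StorageCommand::RunOneshotIngestion(cmd)).await?;
    // The cluster replies once per collection with every staged batch, or an
    // error string for each batch that failed to stage.
    while let Some(response) = client.recv().await? {
        if let StorageResponse::StagedBatches(batches) = response {
            for (collection_id, results) in batches {
                for result in results {
                    match result {
                        Ok(batch) => tracing::info!(%collection_id, ?batch, "batch staged"),
                        Err(err) => tracing::warn!(%collection_id, %err, "staging failed"),
                    }
                }
            }
            break;
        }
    }
    Ok(())
}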
1 change: 1 addition & 0 deletions src/storage-client/BUILD.bazel
@@ -131,6 +131,7 @@ filegroup(
"src/client.proto",
"src/statistics.proto",
"//src/cluster-client:all_protos",
"//src/persist-client:all_protos",
"//src/proto:all_protos",
"//src/repr:all_protos",
"//src/storage-types:all_protos",
1 change: 1 addition & 0 deletions src/storage-client/Cargo.toml
@@ -44,6 +44,7 @@ rdkafka = { version = "0.29.0", features = [
] }
serde = { version = "1.0.152", features = ["derive"] }
serde_json = { version = "1.0.125" }
smallvec = { version = "1.10.0", features = ["serde", "union"] }
static_assertions = "1.1"
timely = "0.15.1"
tokio = { version = "1.38.0", features = [
28 changes: 28 additions & 0 deletions src/storage-client/src/client.proto
@@ -15,11 +15,14 @@ package mz_storage_client.client;

import "cluster-client/src/client.proto";
import "google/protobuf/empty.proto";
import "persist-client/src/batch.proto";
import "proto/src/chrono.proto";
import "proto/src/proto.proto";
import "repr/src/antichain.proto";
import "repr/src/global_id.proto";
import "storage-client/src/statistics.proto";
import "storage-types/src/controller.proto";
import "storage-types/src/oneshot_sources.proto";
import "storage-types/src/parameters.proto";
import "storage-types/src/sinks.proto";
import "storage-types/src/sources.proto";
@@ -45,6 +48,13 @@ message ProtoRunIngestionCommand {
mz_storage_types.sources.ProtoIngestionDescription description = 2;
}

message ProtoRunOneshotIngestionCommand {
mz_repr.global_id.ProtoGlobalId ingestion_id = 1;
mz_repr.global_id.ProtoGlobalId collection_id = 2;
mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 3;
mz_storage_types.oneshot_sources.ProtoOneshotIngestionRequest request = 4;
}

message ProtoCreateSources {
repeated ProtoRunIngestionCommand sources = 1;
}
@@ -84,6 +94,7 @@ message ProtoStorageCommand {
google.protobuf.Empty allow_writes = 7;
ProtoRunSinks run_sinks = 4;
mz_storage_types.parameters.ProtoStorageParameters update_configuration = 5;
ProtoRunOneshotIngestionCommand oneshot_ingestion = 10;
}
}

@@ -121,10 +132,27 @@ message ProtoStorageResponse {
repeated mz_repr.global_id.ProtoGlobalId ids = 1;
}

message ProtoStagedBatches {
message BatchResult {
oneof value {
mz_persist_client.batch.ProtoBatch batch = 1;
string error = 2;
}
}

message Inner {
mz_repr.global_id.ProtoGlobalId id = 1;
repeated BatchResult batches = 2;
}

repeated Inner batches = 1;
}

oneof kind {
ProtoFrontierUppersKind frontier_uppers = 1;
ProtoDroppedIds dropped_ids = 2;
ProtoStatisticsUpdates stats = 3;
ProtoStatusUpdates status_updates = 4;
ProtoStagedBatches staged_batches = 5;
}
}
149 changes: 146 additions & 3 deletions src/storage-client/src/client.rs
@@ -21,18 +21,22 @@ use std::iter;
use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
use mz_ore::assert_none;
use mz_persist_client::batch::ProtoBatch;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{Diff, GlobalId, Row};
use mz_service::client::{GenericClient, Partitionable, PartitionedState};
use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
use mz_storage_types::sources::IngestionDescription;
use mz_timely_util::progress::any_antichain;
use proptest::prelude::{any, Arbitrary};
use proptest::strategy::{BoxedStrategy, Strategy, Union};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use timely::progress::frontier::{Antichain, MutableAntichain};
use timely::PartialOrder;
use tonic::{Request, Status as TonicStatus, Streaming};
@@ -119,6 +123,7 @@ pub enum StorageCommand<T = mz_repr::Timestamp> {
UpdateConfiguration(StorageParameters),
/// Run the enumerated sources, each associated with its identifier.
RunIngestions(Vec<RunIngestionCommand>),
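    /// Run a oneshot ingestion dataflow, staging batches for the given collection.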
RunOneshotIngestion(RunOneshotIngestionCommand),
/// Enable compaction in storage-managed collections.
///
/// Each entry in the vector names a collection and provides a frontier after which
@@ -137,7 +142,7 @@ impl<T> StorageCommand<T> {
| AllowWrites
| UpdateConfiguration(_)
| AllowCompaction(_) => false,
RunIngestions(_) | RunSinks(_) => true,
RunIngestions(_) | RunSinks(_) | RunOneshotIngestion(_) => true,
}
}
}
@@ -184,6 +189,47 @@ impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
}
}

/// A command that starts a oneshot ingestion, staging batches for the given collection.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestionCommand {
/// The ID of the ingestion dataflow.
pub ingestion_id: GlobalId,
/// The ID of collection we'll stage batches for.
pub collection_id: GlobalId,
/// Metadata for the collection we'll stage batches for.
pub collection_meta: CollectionMetadata,
/// Details for the oneshot ingestion.
pub request: OneshotIngestionRequest,
}

impl RustType<ProtoRunOneshotIngestionCommand> for RunOneshotIngestionCommand {
fn into_proto(&self) -> ProtoRunOneshotIngestionCommand {
ProtoRunOneshotIngestionCommand {
ingestion_id: Some(self.ingestion_id.into_proto()),
collection_id: Some(self.collection_id.into_proto()),
storage_metadata: Some(self.collection_meta.into_proto()),
request: Some(self.request.into_proto()),
}
}

fn from_proto(proto: ProtoRunOneshotIngestionCommand) -> Result<Self, TryFromProtoError> {
Ok(RunOneshotIngestionCommand {
ingestion_id: proto
.ingestion_id
.into_rust_if_some("ProtoRunOneshotIngestionCommand::ingestion_id")?,
collection_id: proto
.collection_id
.into_rust_if_some("ProtoRunOneshotIngestionCommand::collection_id")?,
collection_meta: proto
.storage_metadata
.into_rust_if_some("ProtoRunOneshotIngestionCommand::storage_metadata")?,
request: proto
.request
.into_rust_if_some("ProtoRunOneshotIngestionCommand::request")?,
})
}
}

impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
fn into_proto(&self) -> ProtoRunSinkCommand {
ProtoRunSinkCommand {
@@ -246,6 +292,9 @@ impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
StorageCommand::RunIngestions(sources) => CreateSources(ProtoCreateSources {
sources: sources.into_proto(),
}),
StorageCommand::RunOneshotIngestion(oneshot) => {
OneshotIngestion(oneshot.into_proto())
}
StorageCommand::RunSinks(sinks) => RunSinks(ProtoRunSinks {
sinks: sinks.into_proto(),
}),
@@ -277,6 +326,9 @@ impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
Some(RunSinks(ProtoRunSinks { sinks })) => {
Ok(StorageCommand::RunSinks(sinks.into_rust()?))
}
Some(OneshotIngestion(oneshot)) => {
Ok(StorageCommand::RunOneshotIngestion(oneshot.into_rust()?))
}
None => Err(TryFromProtoError::missing_field(
"ProtoStorageCommand::kind",
)),
@@ -543,6 +595,8 @@ pub enum StorageResponse<T = mz_repr::Timestamp> {
FrontierUppers(Vec<(GlobalId, Antichain<T>)>),
/// Punctuation indicates that no more responses will be transmitted for the specified ids
DroppedIds(BTreeSet<GlobalId>),
/// Batches that have been staged in Persist and may later be linked into a shard.
StagedBatches(BTreeMap<GlobalId, Vec<Result<ProtoBatch, String>>>),

/// A list of statistics updates, currently only for sources.
StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
@@ -554,7 +608,9 @@ impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
fn into_proto(&self) -> ProtoStorageResponse {
use proto_storage_response::Kind::*;
use proto_storage_response::{ProtoDroppedIds, ProtoStatisticsUpdates, ProtoStatusUpdates};
use proto_storage_response::{
ProtoDroppedIds, ProtoStagedBatches, ProtoStatisticsUpdates, ProtoStatusUpdates,
};
ProtoStorageResponse {
kind: Some(match self {
StorageResponse::FrontierUppers(traces) => FrontierUppers(traces.into_proto()),
@@ -576,6 +632,29 @@ impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
StorageResponse::StatusUpdates(updates) => StatusUpdates(ProtoStatusUpdates {
updates: updates.into_proto(),
}),
StorageResponse::StagedBatches(staged) => {
let batches = staged
.into_iter()
.map(|(collection_id, batches)| {
let batches = batches
.into_iter()
.map(|result| {
use proto_storage_response::proto_staged_batches::batch_result::Value;
let value = match result {
Ok(batch) => Value::Batch(batch.clone()),
Err(err) => Value::Error(err.clone()),
};
proto_storage_response::proto_staged_batches::BatchResult { value: Some(value) }
})
.collect();
proto_storage_response::proto_staged_batches::Inner {
id: Some(collection_id.into_proto()),
batches,
}
})
.collect();
StagedBatches(ProtoStagedBatches { batches })
}
}),
}
}
@@ -605,6 +684,35 @@ impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
Some(StatusUpdates(ProtoStatusUpdates { updates })) => {
Ok(StorageResponse::StatusUpdates(updates.into_rust()?))
}
Some(StagedBatches(staged)) => {
let batches: BTreeMap<_, _> = staged
.batches
.into_iter()
.map(|inner| {
let id = inner
.id
.into_rust_if_some("ProtoStagedBatches::Inner::id")?;

let mut batches = Vec::with_capacity(inner.batches.len());
for maybe_batch in inner.batches {
use proto_storage_response::proto_staged_batches::batch_result::Value;

let value = maybe_batch.value.ok_or_else(|| {
TryFromProtoError::missing_field("BatchResult::value")
})?;
let batch = match value {
Value::Batch(batch) => Ok(batch),
Value::Error(err) => Err(err),
};
batches.push(batch);
}

Ok::<_, TryFromProtoError>((id, batches))
})
.collect::<Result<_, _>>()?;

Ok(StorageResponse::StagedBatches(batches))
}
None => Err(TryFromProtoError::missing_field(
"ProtoStorageResponse::kind",
)),
@@ -638,6 +746,8 @@ pub struct PartitionedStorageState<T> {
/// Upper frontiers for sources and sinks, both unioned across all partitions and from each
/// individual partition.
uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
/// Staged batches from oneshot sources that will get appended by `environmentd`.
oneshot_source_responses: BTreeMap<GlobalId, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}

impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
@@ -651,6 +761,7 @@ where
PartitionedStorageState {
parts,
uppers: BTreeMap::new(),
oneshot_source_responses: BTreeMap::new(),
}
}
}
@@ -681,7 +792,8 @@ where
StorageCommand::InitializationComplete
| StorageCommand::AllowWrites
| StorageCommand::UpdateConfiguration(_)
| StorageCommand::AllowCompaction(_) => {}
| StorageCommand::AllowCompaction(_)
| StorageCommand::RunOneshotIngestion(_) => {}
};
}

@@ -805,6 +917,37 @@ where
StorageResponse::StatusUpdates(updates) => {
Some(Ok(StorageResponse::StatusUpdates(updates)))
}
StorageResponse::StagedBatches(batches) => {
let mut finished_batches = BTreeMap::new();

for (collection_id, batches) in batches {
tracing::info!(%shard_id, %collection_id, "got batch");

let entry = self
.oneshot_source_responses
.entry(collection_id)
.or_default();
let novel = entry.insert(shard_id, batches);
assert_none!(novel, "Duplicate oneshot source response");

// Check if we've received responses from all shards.
if entry.len() == self.parts {
let entry = self
.oneshot_source_responses
.remove(&collection_id)
.expect("checked above");
let all_batches: Vec<_> = entry.into_values().flatten().collect();

finished_batches.insert(collection_id, all_batches);
}
}

if !finished_batches.is_empty() {
Some(Ok(StorageResponse::StagedBatches(finished_batches)))
} else {
None
}
}
}
}
}
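One subtlety in the StagedBatches arm above: each shard (worker process) stages its own batches, so the partitioned client buffers responses per shard and only forwards a merged StagedBatches response once all self.parts shards have reported for a collection. The standalone sketch below replays that rule with stand-in types (string IDs instead of GlobalId, &str instead of ProtoBatch); it is an illustration, not code from this commit.

use std::collections::BTreeMap;

fn main() {
    let parts = 2; // number of shards in the cluster
    let mut pending: BTreeMap<&str, BTreeMap<usize, Vec<Result<&str, String>>>> =
        BTreeMap::new();

    // Shard 0 reports its batches for collection "u1": nothing is emitted yet.
    pending.entry("u1").or_default().insert(0, vec![Ok("batch-a")]);
    assert!(pending["u1"].len() < parts);

    // Shard 1 reports: the collection is now complete, so flatten and emit.
    pending.entry("u1").or_default().insert(1, vec![Ok("batch-b")]);
    if pending["u1"].len() == parts {
        let merged: Vec<_> = pending
            .remove("u1")
            .expect("just checked")
            .into_values()
            .flatten()
            .collect();
        assert_eq!(merged, vec![Ok("batch-a"), Ok("batch-b")]);
    }
}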
12 changes: 12 additions & 0 deletions src/storage-client/src/controller.rs
@@ -32,6 +32,7 @@ use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::WallclockLagMetrics;
use mz_cluster_client::ReplicaId;
use mz_ore::collections::CollectionExt;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_types::schema::SchemaId;
@@ -41,6 +42,7 @@ use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::{ReadHold, ReadHoldError};
use mz_storage_types::read_policy::ReadPolicy;
@@ -508,6 +510,16 @@ pub trait StorageController: Debug {
exports: Vec<(GlobalId, ExportDescription<Self::Timestamp>)>,
) -> Result<(), StorageError<Self::Timestamp>>;

/// Create a oneshot ingestion.
async fn create_oneshot_ingestion(
&mut self,
ingestion_id: GlobalId,
collection_id: GlobalId,
instance_id: StorageInstanceId,
request: OneshotIngestionRequest,
result_tx: OneshotResultCallback<ProtoBatch>,
) -> Result<(), StorageError<Self::Timestamp>>;

/// Alter the sink identified by the given id to match the provided `ExportDescription`.
async fn alter_export(
&mut self,
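A hypothetical call site for the new create_oneshot_ingestion method is sketched below. It assumes OneshotResultCallback<ProtoBatch> is a boxed FnOnce over Vec<Result<ProtoBatch, String>>, mirroring the shape of the StagedBatches response; in practice environmentd would link the successful batches into the collection's shard rather than just logging them.

use mz_persist_client::batch::ProtoBatch;
use mz_repr::GlobalId;
use mz_storage_client::controller::StorageController;
use mz_storage_types::controller::StorageError;
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;

async fn start_oneshot<C: StorageController>(
    controller: &mut C,
    ingestion_id: GlobalId,
    collection_id: GlobalId,
    instance_id: StorageInstanceId,
    request: OneshotIngestionRequest,
) -> Result<(), StorageError<C::Timestamp>> {
    controller
        .create_oneshot_ingestion(
            ingestion_id,
            collection_id,
            instance_id,
            request,
            // Assumed callback shape: invoked once with all staged batches.
            Box::new(|results: Vec<Result<ProtoBatch, String>>| {
                for result in results {
                    match result {
                        Ok(batch) => tracing::info!(?batch, "batch ready to link"),
                        Err(err) => tracing::warn!(%err, "oneshot ingestion error"),
                    }
                }
            }),
        )
        .await
}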