[storage] Add a progress shard for sinks and skip the snapshot when possible #31152

Merged: 10 commits, Feb 11, 2025
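No PR description survives in this extract, so for orientation only: the diffs below register every sink as a storage collection backed by a progress shard (described by `KAFKA_PROGRESS_DESC` for Kafka sinks), which lets a restarting sink consult its previously committed frontier instead of always re-emitting a snapshot. The sketch below is a hedged reconstruction of that decision, not code from this PR; the function name, the `u64` timestamps, and the exact predicate are assumptions inferred from the PR title and from the `as_of`/`with_snapshot` fields visible in the diffs.

```rust
use timely::progress::Antichain;

/// Illustrative reconstruction only: a restarting sink can skip its snapshot
/// when the frontier recovered from its progress shard shows that everything
/// at the sink's `as_of` has already been committed downstream.
fn can_skip_snapshot(committed_upper: &Antichain<u64>, as_of: &Antichain<u64>) -> bool {
    // "Strictly beyond the as_of" mirrors the frontier comparison asserted in
    // the ALTER SINK changes further down:
    // `write_frontier.iter().all(|t| as_of.less_than(t))`.
    committed_upper.iter().all(|t| as_of.less_than(t))
}

fn main() {
    let as_of = Antichain::from_elem(5u64);
    assert!(can_skip_snapshot(&Antichain::from_elem(7), &as_of)); // already past the as_of
    assert!(!can_skip_snapshot(&Antichain::from_elem(5), &as_of)); // snapshot still needed
}
```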
src/adapter/src/catalog/transact.rs (4 changes: 3 additions & 1 deletion)
@@ -1058,8 +1058,10 @@ impl Catalog {
CatalogItem::ContinualTask(ct) => {
storage_collections_to_create.insert(ct.global_id());
}
CatalogItem::Sink(sink) => {
storage_collections_to_create.insert(sink.global_id());
}
CatalogItem::Log(_)
| CatalogItem::Sink(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
src/adapter/src/coord.rs (51 changes: 46 additions & 5 deletions)
@@ -146,12 +146,13 @@ use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::ExplainStage;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource};
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::S3SinkFormat;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::Timeline;
use mz_timestamp_oracle::postgres_oracle::{
PostgresTimestampOracle, PostgresTimestampOracleConfig,
@@ -2075,9 +2076,11 @@ impl Coordinator {
self.ship_dataflow(df_desc, mview.cluster_id, None).await;
}
CatalogItem::Sink(sink) => {
self.create_storage_export(sink.global_id(), sink)
.await
.unwrap_or_terminate("cannot fail to create exports");
policies_to_set
.entry(CompactionWindow::Default)
.or_insert_with(Default::default)
.storage_ids
.insert(sink.global_id());
}
CatalogItem::Connection(catalog_connection) => {
if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
@@ -2731,6 +2734,44 @@ impl Coordinator {
collections.push((ct.global_id(), collection_desc));
}
}
CatalogItem::Sink(sink) => {
let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
let from_desc = storage_sink_from_entry
.desc(&self.catalog().resolve_full_name(
storage_sink_from_entry.name(),
storage_sink_from_entry.conn_id(),
))
.expect("sinks can only be built on items with descs")
.into_owned();
let collection_desc = CollectionDescription {
// TODO(sinks): make generic once we have more than one sink type.
desc: KAFKA_PROGRESS_DESC.clone(),
data_source: DataSource::Sink {
desc: ExportDescription {
sink: StorageSinkDesc {
from: sink.from,
from_desc,
connection: sink
.connection
.clone()
.into_inline_connection(self.catalog().state()),
partition_strategy: sink.partition_strategy.clone(),
envelope: sink.envelope,
as_of: Antichain::from_elem(Timestamp::minimum()),
with_snapshot: sink.with_snapshot,
version: sink.version,
from_storage_metadata: (),
to_storage_metadata: (),
},
instance_id: sink.cluster_id,
},
},
since: None,
status_collection_id: None,
timeline: None,
};
collections.push((sink.global_id, collection_desc));
}
_ => (),
}
}
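To make the bootstrap hunk above easier to follow, here is a deliberately simplified, hypothetical mirror of the controller types it touches; the real `CollectionDescription`, `DataSource`, and `ExportDescription` in `mz_storage_client::controller` carry more fields (the relation desc, `since`, `status_collection_id`, `timeline`, and the full `StorageSinkDesc`) than this sketch shows.

```rust
// Hypothetical, pared-down stand-ins for the controller types used above;
// field names follow the diff, but the real definitions are richer.
struct ExportDescription {
    sink_from: u64,   // stands in for the GlobalId of the collection the sink reads
    instance_id: u64, // stands in for the cluster that hosts the sink dataflow
}

enum DataSource {
    /// New with this PR: the sink is itself a storage collection whose shard
    /// (KAFKA_PROGRESS_DESC for Kafka sinks) records the sink's progress.
    Sink { desc: ExportDescription },
}

struct CollectionDescription {
    data_source: DataSource,
}

/// During bootstrap, every catalog sink now contributes one collection to
/// create, alongside tables, sources, and continual tasks.
fn sink_collections(sinks: &[(u64, u64, u64)]) -> Vec<(u64, CollectionDescription)> {
    sinks
        .iter()
        .map(|&(global_id, from, cluster)| {
            let desc = ExportDescription { sink_from: from, instance_id: cluster };
            (global_id, CollectionDescription { data_source: DataSource::Sink { desc } })
        })
        .collect()
}
```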
src/adapter/src/coord/ddl.rs (34 changes: 24 additions & 10 deletions)
@@ -46,10 +46,11 @@ use mz_sql::session::vars::{
MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
MAX_SOURCES, MAX_TABLES,
};
use mz_storage_client::controller::ExportDescription;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::GenericSourceConnection;
use serde_json::json;
use tracing::{event, info_span, warn, Instrument, Level};
@@ -1056,9 +1057,10 @@ impl Coordinator {
}

pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
let storage_metadata = self.catalog.state().storage_metadata();
self.controller
.storage
.drop_sinks(sink_gids)
.drop_sinks(storage_metadata, sink_gids)
.unwrap_or_terminate("cannot fail to drop sinks");
}

@@ -1318,7 +1320,7 @@ impl Coordinator {
storage_sink_from_entry.name(),
storage_sink_from_entry.conn_id(),
))
.expect("indexes can only be built on items with descs")
.expect("sinks can only be built on items with descs")
.into_owned(),
connection: sink
.connection
@@ -1330,18 +1332,30 @@ impl Coordinator {
with_snapshot: sink.with_snapshot,
version: sink.version,
from_storage_metadata: (),
to_storage_metadata: (),
};

let res = self
.controller
.storage
.create_exports(vec![(
id,
ExportDescription {
let collection_desc = CollectionDescription {
// TODO(sinks): make generic once we have more than one sink type.
desc: KAFKA_PROGRESS_DESC.clone(),
data_source: DataSource::Sink {
desc: ExportDescription {
sink: storage_sink_desc,
instance_id: sink.cluster_id,
},
)])
},
since: None,
status_collection_id: None,
timeline: None,
};
let collections = vec![(id, collection_desc)];

// Create the collections.
let storage_metadata = self.catalog.state().storage_metadata();
let res = self
.controller
.storage
.create_collections(storage_metadata, None, collections)
.await;

// Drop read holds after the export has been created, at which point
src/adapter/src/coord/sequencer/inner.rs (55 changes: 42 additions & 13 deletions)
@@ -21,15 +21,22 @@ use futures::{future, Future};
use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::memory::objects::{
CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_controller_types::ReplicaId;
use mz_expr::{
CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, spawn, JoinHandle};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::vec::VecExt;
use mz_ore::{assert_none, instrument};
use mz_persist_client::stats::SnapshotPartStats;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::json::json_string;
@@ -39,6 +46,7 @@ use mz_repr::{
CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
};
use mz_sql::ast::AlterSourceAddSubsourceOption;
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
@@ -52,17 +60,7 @@ use mz_sql::names::{
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport};
use mz_storage_types::sinks::StorageSinkDesc;
use smallvec::SmallVec;
use timely::progress::Timestamp as TimelyTimestamp;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::memory::objects::{
CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_ore::cast::CastFrom;
use mz_ore::{assert_none, instrument};
use mz_persist_client::stats::SnapshotPartStats;
use mz_sql::ast::AlterSourceAddSubsourceOption;
use mz_sql::plan::{
AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
@@ -90,9 +88,11 @@ use mz_storage_types::AlterCompatible;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
use mz_transform::EmptyStatisticsOracle;
use smallvec::SmallVec;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use tokio::sync::{oneshot, watch};
use tracing::{warn, Instrument, Span};
use tracing::{info, warn, Instrument, Span};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
@@ -1349,6 +1349,9 @@ impl Coordinator {
.await
.unwrap_or_terminate("cannot fail to create exports");

self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
.await;

ctx.retire(Ok(ExecuteResponse::CreatedSink))
}

@@ -3441,7 +3444,7 @@ impl Coordinator {
ctx: ExecuteContext,
plan: plan::AlterSinkPlan,
) {
// 1. Put a read hold on the new relation
// Put a read hold on the new relation
let id_bundle = crate::CollectionIdBundle {
storage_ids: BTreeSet::from_iter([plan.sink.from]),
compute_ids: BTreeMap::new(),
@@ -3483,6 +3486,15 @@ impl Coordinator {
}
};

info!(
"preparing alter sink for {}: frontiers={:?} export={:?}",
plan.global_id,
self.controller
.storage_collections
.collections_frontiers(vec![plan.global_id, plan.sink.from]),
self.controller.storage.export(plan.global_id)
);

// Now we must wait for the sink to make enough progress such that there is overlap between
// the new `from` collection's read hold and the sink's write frontier.
self.install_storage_watch_set(
@@ -3510,6 +3522,17 @@ impl Coordinator {
return;
}
}
{
let plan = &ctx.plan;
info!(
"finishing alter sink for {}: frontiers={:?} export={:?}",
plan.global_id,
self.controller
.storage_collections
.collections_frontiers(vec![plan.global_id, plan.sink.from]),
self.controller.storage.export(plan.global_id)
);
}

let plan::AlterSinkPlan {
item_id,
@@ -3527,7 +3550,12 @@ impl Coordinator {
.expect("sink known to exist")
.write_frontier;
let as_of = ctx.read_hold.least_valid_read();
assert!(write_frontier.iter().all(|t| as_of.less_than(t)));
assert!(
write_frontier.iter().all(|t| as_of.less_than(t)),
"{:?} should be strictly less than {:?}",
&*as_of,
&**write_frontier
);

let catalog_sink = Sink {
create_sql: sink.create_sql,
@@ -3576,6 +3604,7 @@ impl Coordinator {
version: sink.version,
partition_strategy: sink.partition_strategy,
from_storage_metadata: (),
to_storage_metadata: (),
};

self.controller
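The `ALTER SINK` hunks above wait until the sink's write frontier overlaps the read hold taken on the new `from` collection before finishing the swap; the strengthened assertion then double-checks that the chosen `as_of` is strictly less than the write frontier. A minimal sketch of that condition, assuming `u64` timestamps and an illustrative function name (the real coordinator installs a storage watch set rather than polling):

```rust
use timely::progress::Antichain;

/// Illustrative only: decide whether the ALTER SINK handoff may finish. The
/// swap is safe once the sink's write frontier is strictly beyond the as_of
/// implied by the read hold on the new `from` collection.
fn handoff_as_of(
    least_valid_read: &Antichain<u64>,
    write_frontier: &Antichain<u64>,
) -> Option<Antichain<u64>> {
    if write_frontier.iter().all(|t| least_valid_read.less_than(t)) {
        // Safe to cut over: output below the new as_of has already been
        // written by the old sink instance.
        Some(least_valid_read.clone())
    } else {
        // Not yet: keep waiting for the sink to make progress.
        None
    }
}
```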
src/adapter/src/util.rs (2 changes: 1 addition & 1 deletion)
@@ -373,7 +373,7 @@ impl<T> ShouldTerminateGracefully for StorageError<T> {
| StorageError::ReadBeforeSince(_)
| StorageError::InvalidUppers(_)
| StorageError::InvalidUsage(_)
| StorageError::SourceIdReused(_)
| StorageError::CollectionIdReused(_)
| StorageError::SinkIdReused(_)
| StorageError::IdentifierMissing(_)
| StorageError::IdentifierInvalid(_)
src/catalog/src/memory/objects.rs (2 changes: 1 addition & 1 deletion)
@@ -1614,9 +1614,9 @@ impl CatalogItem {
CatalogItem::Table(_)
| CatalogItem::Source(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Sink(_)
| CatalogItem::ContinualTask(_) => true,
CatalogItem::Log(_)
| CatalogItem::Sink(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)