Skip to content

Commit 8734297

Browse files
author
Mouli Mukherjee
committed
temp refactor
1 parent ff22e57 commit 8734297

File tree

14 files changed

+112
-111
lines changed

14 files changed

+112
-111
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter/src/catalog.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::time::Duration;
1717
use futures::Future;
1818
use itertools::Itertools;
1919
use mz_adapter_types::connection::ConnectionId;
20+
use mz_storage_types::connections::aws::AwsPrincipalContext;
2021
use tokio::sync::mpsc::UnboundedSender;
2122
use tokio::sync::MutexGuard;
2223
use tracing::{info, trace};
@@ -89,7 +90,7 @@ use mz_storage_types::sources::Timeline;
8990
use mz_transform::dataflow::DataflowMetainfo;
9091

9192
pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
92-
pub use crate::catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap, Config, StateConfig};
93+
pub use crate::catalog::config::{ClusterReplicaSizeMap, Config, StateConfig};
9394
pub use crate::catalog::open::BuiltinMigrationMetadata;
9495
pub use crate::catalog::state::CatalogState;
9596
use crate::command::CatalogDump;
@@ -4220,6 +4221,10 @@ impl SessionCatalog for ConnCatalog<'_> {
42204221
self.state.aws_privatelink_availability_zones.clone()
42214222
}
42224223

4224+
fn aws_principal_context(&self) -> Option<AwsPrincipalContext> {
4225+
self.state.aws_principal_context.clone()
4226+
}
4227+
42234228
fn system_vars(&self) -> &SystemVars {
42244229
&self.state.system_configuration
42254230
}

src/adapter/src/catalog/builtin_table_updates.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use mz_sql::catalog::{CatalogCluster, CatalogDatabase, CatalogSchema, CatalogTyp
4646
use mz_sql::func::FuncImplCatalogDetails;
4747
use mz_sql::names::{CommentObjectId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier};
4848
use mz_sql_parser::ast::display::AstDisplay;
49-
use mz_storage_types::connections::aws::{AwsAuth, AwsConfig};
49+
use mz_storage_types::connections::aws::{AwsAuth, AwsConfig, AwsPrincipalContext};
5050
use mz_storage_types::connections::inline::ReferencedConnection;
5151
use mz_storage_types::connections::{KafkaConnection, StringOrSecret};
5252
use mz_storage_types::sinks::{KafkaSinkConnection, StorageSinkConnection};
@@ -55,8 +55,8 @@ use mz_storage_types::sources::{
5555
};
5656

5757
use crate::catalog::{
58-
AwsPrincipalContext, CatalogItem, CatalogState, ClusterVariant, Connection, DataSourceDesc,
59-
Database, DefaultPrivilegeObject, Func, Index, MaterializedView, Sink, Type, View,
58+
CatalogItem, CatalogState, ClusterVariant, Connection, DataSourceDesc, Database,
59+
DefaultPrivilegeObject, Func, Index, MaterializedView, Sink, Type, View,
6060
};
6161
use crate::coord::ConnMeta;
6262
use crate::subscribe::ActiveSubscribe;

src/adapter/src/catalog/config.rs

+1-60
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@ use std::time::Duration;
1515
use bytesize::ByteSize;
1616
use mz_build_info::BuildInfo;
1717
use mz_catalog;
18-
use mz_cloud_resources::AwsExternalIdPrefix;
1918
use mz_controller::clusters::ReplicaAllocation;
2019
use mz_orchestrator::MemoryLimit;
2120
use mz_ore::cast::CastFrom;
2221
use mz_ore::metrics::MetricsRegistry;
23-
use mz_repr::GlobalId;
2422
use mz_secrets::SecretsReader;
2523
use mz_sql::catalog::EnvironmentId;
2624
use mz_sql::session::vars::ConnectionCounter;
25+
use mz_storage_types::connections::aws::AwsPrincipalContext;
2726
use serde::{Deserialize, Serialize};
2827

2928
use crate::config::SystemParameterSyncConfig;
@@ -223,61 +222,3 @@ impl Default for ClusterReplicaSizeMap {
223222
Self(inner)
224223
}
225224
}
226-
227-
/// Context used to generate an AWS Principal.
228-
///
229-
/// In the case of AWS PrivateLink connections, Materialize will connect to the
230-
/// VPC endpoint as the AWS Principal generated via this context.
231-
#[derive(Debug, Clone, Serialize)]
232-
pub struct AwsPrincipalContext {
233-
pub aws_account_id: String,
234-
pub aws_external_id_prefix: AwsExternalIdPrefix,
235-
}
236-
237-
impl AwsPrincipalContext {
238-
pub fn to_privatelink_principal_string(&self, aws_external_id_suffix: GlobalId) -> String {
239-
format!(
240-
"arn:aws:iam::{}:role/mz_{}_{}",
241-
self.aws_account_id, self.aws_external_id_prefix, aws_external_id_suffix
242-
)
243-
}
244-
245-
pub fn to_aws_connection_principal_string(&self) -> String {
246-
format!(
247-
"arn:aws:iam::{}:role/MaterializeConnection",
248-
self.aws_account_id
249-
)
250-
}
251-
252-
pub fn to_aws_connection_external_id(&self, aws_external_id_suffix: GlobalId) -> String {
253-
format!(
254-
"mz_{}_{}",
255-
self.aws_external_id_prefix, aws_external_id_suffix
256-
)
257-
}
258-
259-
pub fn to_aws_example_trust_policy(
260-
&self,
261-
aws_external_id_suffix: GlobalId,
262-
) -> serde_json::Value {
263-
serde_json::json!(
264-
{
265-
"Version": "2012-10-17",
266-
"Statement": [
267-
{
268-
"Effect": "Allow",
269-
"Principal": {
270-
"AWS": self.to_aws_connection_principal_string()
271-
},
272-
"Action": "sts:AssumeRole",
273-
"Condition": {
274-
"StringEquals": {
275-
"sts:ExternalId": self.to_aws_connection_external_id(aws_external_id_suffix)
276-
}
277-
}
278-
}
279-
]
280-
}
281-
)
282-
}
283-
}

src/adapter/src/catalog/state.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::time::{Duration, Instant};
1717
use anyhow::bail;
1818
use itertools::Itertools;
1919
use mz_adapter_types::connection::ConnectionId;
20+
use mz_storage_types::connections::aws::AwsPrincipalContext;
2021
use serde::Serialize;
2122
use tokio::sync::mpsc;
2223
use tracing::info;
@@ -72,7 +73,7 @@ use mz_storage_types::connections::inline::{
7273
};
7374
use mz_transform::Optimizer;
7475

75-
use crate::catalog::{AwsPrincipalContext, BuiltinTableUpdate, ClusterReplicaSizeMap, ConnCatalog};
76+
use crate::catalog::{BuiltinTableUpdate, ClusterReplicaSizeMap, ConnCatalog};
7677
use crate::coord::ConnMeta;
7778
use crate::optimize::{self, Optimize};
7879
use crate::session::Session;

src/adapter/src/coord.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ use mz_sql::rbac::UnauthorizedError;
112112
use mz_sql::session::user::{RoleMetadata, User};
113113
use mz_sql::session::vars::{self, ConnectionCounter};
114114
use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther};
115+
use mz_storage_types::connections::aws::AwsPrincipalContext;
115116
use mz_storage_types::connections::inline::IntoInlineConnection;
116117
use mz_storage_types::connections::ConnectionContext;
117118
use mz_storage_types::sources::Timeline;
@@ -127,7 +128,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
127128
use uuid::Uuid;
128129

129130
use crate::catalog::{
130-
self, AwsPrincipalContext, BuiltinMigrationMetadata, BuiltinTableUpdate, Catalog, CatalogItem,
131+
self, BuiltinMigrationMetadata, BuiltinTableUpdate, Catalog, CatalogItem,
131132
ClusterReplicaSizeMap, Connection, DataSourceDesc, Source,
132133
};
133134
use crate::client::{Client, Handle};

src/clusterd/src/bin/clusterd.rs

-1
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
321321
ConnectionContext::from_cli_args(
322322
&args.tracing.startup_log_filter,
323323
args.aws_external_id,
324-
None, // TODO(mouli): fix passing to clusterd
325324
secrets_reader,
326325
None,
327326
),

src/environmentd/src/bin/environmentd/main.rs

-1
Original file line numberDiff line numberDiff line change
@@ -981,7 +981,6 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
981981
connection_context: ConnectionContext::from_cli_args(
982982
&args.tracing.startup_log_filter,
983983
args.aws_external_id_prefix,
984-
args.aws_account_id.clone(),
985984
secrets_reader,
986985
cloud_resource_reader,
987986
),

src/sql/src/catalog.rs

+4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use mz_repr::explain::ExprHumanizer;
3131
use mz_repr::role_id::RoleId;
3232
use mz_repr::{ColumnName, GlobalId, RelationDesc};
3333
use mz_sql_parser::ast::{Expr, Ident, QualifiedReplica, UnresolvedItemName};
34+
use mz_storage_types::connections::aws::AwsPrincipalContext;
3435
use mz_storage_types::connections::inline::{ConnectionResolver, ReferencedConnection};
3536
use mz_storage_types::connections::Connection;
3637
use mz_storage_types::sources::SourceDesc;
@@ -275,6 +276,9 @@ pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionR
275276
/// Returns the set of supported AWS PrivateLink availability zone ids.
276277
fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>>;
277278

279+
/// Returns the AWS context for using AWS connections
280+
fn aws_principal_context(&self) -> Option<AwsPrincipalContext>;
281+
278282
/// Returns system vars
279283
fn system_vars(&self) -> &SystemVars;
280284

src/sql/src/plan/statement/ddl/connection.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,18 @@ impl ConnectionOptionExtracted {
184184

185185
let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
186186
(Some(arn), session_name) => {
187-
Some(AwsAuth::AssumeRole(AwsAssumeRole { arn, session_name }))
187+
let aws_principal_context =
188+
scx.catalog.aws_principal_context().ok_or_else(|| {
189+
sql_err!("AWS principal context should have been provided")
190+
})?;
191+
Some(AwsAuth::AssumeRole(AwsAssumeRole {
192+
arn,
193+
session_name,
194+
mz_principal: aws_principal_context
195+
.to_aws_connection_principal_string(),
196+
external_id_prefix: aws_principal_context.aws_external_id_prefix.to_string(),
197+
}))
188198
}
189-
190199
_ => None,
191200
};
192201

src/storage-types/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ prost = { version = "0.11.3", features = ["no-recursion-limit"] }
4545
rdkafka = { version = "0.29.0", features = ["cmake-build", "ssl-vendored", "libz-static", "zstd"] }
4646
scopeguard = "1.1.0"
4747
serde = { version = "1.0.152", features = ["derive"] }
48+
serde_json = "1.0.89"
4849
thiserror = "1.0.37"
4950
timely = { version = "0.12.0", default-features = false, features = ["bincode"] }
5051
tokio = { version = "1.24.2", features = ["fs", "rt", "sync", "test-util", "time"] }

src/storage-types/src/connections.rs

-5
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ pub struct ConnectionContext {
122122
pub librdkafka_log_level: tracing::Level,
123123
/// A prefix for an external ID to use for all AWS AssumeRole operations.
124124
pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
125-
/// The AWS principal for Materialize.
126-
pub aws_principal: Option<String>,
127125
/// A secrets reader.
128126
pub secrets_reader: Arc<dyn SecretsReader>,
129127
/// A cloud resource reader, if supported in this configuration.
@@ -143,7 +141,6 @@ impl ConnectionContext {
143141
pub fn from_cli_args(
144142
startup_log_level: &CloneableEnvFilter,
145143
aws_external_id_prefix: Option<AwsExternalIdPrefix>,
146-
aws_principal: Option<String>,
147144
secrets_reader: Arc<dyn SecretsReader>,
148145
cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
149146
) -> ConnectionContext {
@@ -153,7 +150,6 @@ impl ConnectionContext {
153150
"librdkafka",
154151
),
155152
aws_external_id_prefix,
156-
aws_principal,
157153
secrets_reader,
158154
cloud_resource_reader,
159155
ssh_tunnel_manager: SshTunnelManager::default(),
@@ -165,7 +161,6 @@ impl ConnectionContext {
165161
ConnectionContext {
166162
librdkafka_log_level: tracing::Level::INFO,
167163
aws_external_id_prefix: None,
168-
aws_principal: None,
169164
secrets_reader,
170165
cloud_resource_reader: None,
171166
ssh_tunnel_manager: SshTunnelManager::default(),

src/storage-types/src/connections/aws.proto

+2
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ message ProtoAwsCredentials {
3636
message ProtoAwsAssumeRole {
3737
string arn = 1;
3838
optional string session_name = 2;
39+
string mz_principal = 3;
40+
string external_id_prefix = 4;
3941
}

0 commit comments

Comments
 (0)