Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS Connection Implementation #23282

Merged
merged 8 commits into from
Dec 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
@@ -381,18 +381,18 @@ steps:
composition: zippy
args: [--scenario=AlterConnectionWithKafkaSources, --actions=10000, --max-execution-time=30m]

- group: Secrets
key: secrets
- group: AWS
key: aws
steps:
- id: secrets-aws-secrets-manager
label: "Secrets AWS"
- id: aws
label: AWS
timeout_in_minutes: 30
agents:
queue: linux-x86_64
artifact_paths: junit_*.xml
plugins:
- ./ci/plugins/mzcompose:
composition: secrets-aws-secrets-manager
composition: aws

- id: secrets-local-file
label: "Secrets Local File"
2 changes: 1 addition & 1 deletion doc/developer/design/20231110_aws_connections.md
Original file line number Diff line number Diff line change
@@ -127,7 +127,7 @@ CREATE TABLE mz_internal.mz_aws_connections (
-- The value of the `AWS ACCESS KEY ID` option, if specified as a string.
-- `NULL` otherwise.
access_key_id text,
-- The value of the `AWS ACCESS KEY ID` option, if specified as a secret.
-- The ID of the `AWS ACCESS KEY ID` secret in `mz_secrets`, if specified as a secret.
-- `NULL` otherwise.
access_key_id_secret_id text,
-- The value of the `ASSUME ROLE ARN` option, if specified. `NULL` otherwise.
1 change: 1 addition & 0 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
@@ -1222,6 +1222,7 @@ The `mz_scheduling_parks_histogram` view describes a histogram of [dataflow] wor

<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_activity_log_redacted -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_aggregates -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_aws_connections -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_dataflow_operator_reachability -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_dataflow_operator_reachability_per_worker -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_dataflow_operator_reachability_raw -->
64 changes: 64 additions & 0 deletions misc/python/materialize/checks/all_checks/aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from __future__ import annotations

from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent


# Platform check for AWS connections: creates two connections, alters each of
# them, and then confirms that they can still be referenced.
# NOTE(review): `externally_idempotent(False)` presumably marks this check as
# not safe to repeat against the same external state -- confirm against the
# decorator's definition.
@externally_idempotent(False)
class AwsConnection(Check):
# Enables the `enable_aws_connection` and `enable_connection_validation_syntax`
# system parameters as `mz_system` (the command is guarded by `version>=8000`),
# then creates an assume-role connection, a secret, and a credentials
# connection whose secret access key references that secret.
def initialize(self) -> Testdrive:
return Testdrive(
dedent(
"""
$[version>=8000] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_aws_connection = true
ALTER SYSTEM SET enable_connection_validation_syntax = true

> CREATE CONNECTION aws_assume_role
TO AWS (ASSUME ROLE ARN 'assume-role', ASSUME ROLE SESSION NAME 'session-name');

> CREATE SECRET aws_secret_access_key as '...';

> CREATE CONNECTION aws_credentials
TO AWS (ACCESS KEY ID = 'access_key', SECRET ACCESS KEY = SECRET aws_secret_access_key);
"""
)
)

# Returns one Testdrive step per script below: the first changes the role ARN
# of the assume-role connection, the second changes the access key ID of the
# credentials connection.
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(dedent(s))
for s in [
"""
> ALTER CONNECTION aws_assume_role SET (ASSUME ROLE ARN 'assume-role-2');
""",
"""
> ALTER CONNECTION aws_credentials SET (ACCESS KEY ID 'access_key_2');
""",
]
]

# Runs `VALIDATE CONNECTION` against both connections and expects each
# statement to fail; `regex:.*` accepts any error text.
def validate(self) -> Testdrive:
# We just check that the connections are still safe to reference.
# The error is inconsistent depending on the way the check is being run.
return Testdrive(
dedent(
"""
! VALIDATE CONNECTION aws_assume_role;
regex:.*

! VALIDATE CONNECTION aws_credentials;
regex:.*
"""
)
)
2 changes: 2 additions & 0 deletions misc/python/materialize/cli/ci_logged_errors_detect.py
Original file line number Diff line number Diff line change
@@ -94,6 +94,8 @@
| (platform-checks|legacy-upgrade|upgrade-matrix|feature-benchmark)-materialized-.* \| .*cannot\ load\ unknown\ system\ parameter\ from\ catalog\ storage
# For platform-checks upgrade tests
| cannot\ load\ unknown\ system\ parameter\ from\ catalog\ storage(\ to\ set\ (default|configured)\ parameter)?\ name=enable_dangerous_functions
| internal\ error:\ no\ AWS\ external\ ID\ prefix\ configured
| failed\ writing\ row\ to\ mz_aws_connections.*no\ AWS\ external\ ID\ prefix\ configured
)
""",
re.VERBOSE | re.MULTILINE,
90 changes: 79 additions & 11 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
@@ -13,10 +13,11 @@ use bytesize::ByteSize;
use chrono::{DateTime, Utc};
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
use mz_catalog::builtin::{
MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES,
MZ_CLUSTERS, MZ_CLUSTER_LINKS, MZ_CLUSTER_REPLICAS, MZ_CLUSTER_REPLICA_METRICS,
MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICA_STATUSES, MZ_COLUMNS, MZ_COMMENTS, MZ_CONNECTIONS,
MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_INDEXES, MZ_INDEX_COLUMNS,
MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTERS, MZ_CLUSTER_LINKS,
MZ_CLUSTER_REPLICAS, MZ_CLUSTER_REPLICA_METRICS, MZ_CLUSTER_REPLICA_SIZES,
MZ_CLUSTER_REPLICA_STATUSES, MZ_COLUMNS, MZ_COMMENTS, MZ_CONNECTIONS, MZ_DATABASES,
MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_INDEXES, MZ_INDEX_COLUMNS,
MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCES,
MZ_LIST_TYPES, MZ_MAP_TYPES, MZ_MATERIALIZED_VIEWS, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES, MZ_ROLES, MZ_ROLE_MEMBERS, MZ_SCHEMAS, MZ_SECRETS,
@@ -46,8 +47,9 @@ use mz_sql::catalog::{CatalogCluster, CatalogDatabase, CatalogSchema, CatalogTyp
use mz_sql::func::FuncImplCatalogDetails;
use mz_sql::names::{CommentObjectId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier};
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::KafkaConnection;
use mz_storage_types::connections::{KafkaConnection, StringOrSecret};
use mz_storage_types::sinks::{KafkaSinkConnection, StorageSinkConnection};
use mz_storage_types::sources::{
GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection,
@@ -636,26 +638,35 @@ impl CatalogState {
diff,
));
} else {
tracing::error!("does this even happen?");
tracing::error!(%id, "missing SSH public key; cannot write row to mz_ssh_connections table");
}
}
mz_storage_types::connections::Connection::Kafka(ref kafka) => {
updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
}
mz_storage_types::connections::Connection::Aws(ref aws_config) => {
match self.pack_aws_connection_update(id, aws_config, diff) {
Ok(update) => {
updates.push(update);
}
Err(e) => {
tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
}
}
}
mz_storage_types::connections::Connection::AwsPrivatelink(_) => {
if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
updates.extend(self.pack_aws_privatelink_connection_update(
updates.push(self.pack_aws_privatelink_connection_update(
id,
aws_principal_context,
diff,
));
} else {
tracing::error!("Missing AWS principal context, cannot write to mz_aws_privatelink_connections table");
tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
}
}
mz_storage_types::connections::Connection::Csr(_)
| mz_storage_types::connections::Connection::Postgres(_)
| mz_storage_types::connections::Connection::Aws(_) => (),
| mz_storage_types::connections::Connection::Postgres(_) => (),
};
updates
}
@@ -714,12 +725,69 @@ impl CatalogState {
connection_id: GlobalId,
aws_principal_context: &AwsPrincipalContext,
diff: Diff,
) -> Result<BuiltinTableUpdate, Error> {
) -> BuiltinTableUpdate {
let id = self.resolve_builtin_table(&MZ_AWS_PRIVATELINK_CONNECTIONS);
let row = Row::pack_slice(&[
Datum::String(&connection_id.to_string()),
Datum::String(&aws_principal_context.to_principal_string(connection_id)),
]);
BuiltinTableUpdate { id, row, diff }
}

/// Builds the `mz_aws_connections` builtin table update for one AWS
/// connection.
///
/// The row holds the connection ID, the optional endpoint and region, and the
/// columns specific to the connection's authentication variant; columns that
/// do not apply to that variant are packed as `NULL`.
///
/// # Errors
///
/// Returns an error if computing the external ID or the example trust policy
/// for an assume-role connection fails.
pub fn pack_aws_connection_update(
&self,
connection_id: GlobalId,
aws_config: &AwsConnection,
diff: Diff,
) -> Result<BuiltinTableUpdate, anyhow::Error> {
let id = self.resolve_builtin_table(&MZ_AWS_CONNECTIONS);

// Every auth-specific column starts out as `None` (packed as `NULL`) and is
// filled in only by the matching `AwsAuth` arm below.
let mut access_key_id = None;
let mut access_key_id_secret_id = None;
let mut assume_role_arn = None;
let mut assume_role_session_name = None;
let mut principal = None;
let mut external_id = None;
let mut example_trust_policy = None;
match &aws_config.auth {
// Credentials: record the access key ID either as a literal string or as
// the ID of the secret that holds it, never both.
AwsAuth::Credentials(credentials) => match &credentials.access_key_id {
StringOrSecret::String(access_key) => access_key_id = Some(access_key.as_str()),
StringOrSecret::Secret(secret_id) => {
access_key_id_secret_id = Some(secret_id.to_string())
}
},
AwsAuth::AssumeRole(assume_role) => {
assume_role_arn = Some(assume_role.arn.as_str());
assume_role_session_name = assume_role.session_name.as_deref();
// The principal is the connection role ARN from the connection context,
// if one is configured.
principal = self
.config
.connection_context
.aws_connection_role_arn
.as_deref();
// Both of the following computations are fallible; a failure aborts the
// whole update via `?`.
external_id =
Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
example_trust_policy = {
let policy = assume_role
.example_trust_policy(&self.config.connection_context, connection_id)?;
let policy = Jsonb::from_serde_json(policy).expect("valid json");
Some(policy.into_row())
};
}
}

// Datums are packed in the column order: id, endpoint, region,
// access_key_id, access_key_id_secret_id, assume_role_arn,
// assume_role_session_name, principal, external_id, example_trust_policy.
let row = Row::pack_slice(&[
Datum::String(&connection_id.to_string()),
Datum::from(aws_config.endpoint.as_deref()),
Datum::from(aws_config.region.as_deref()),
Datum::from(access_key_id),
Datum::from(access_key_id_secret_id.as_deref()),
Datum::from(assume_role_arn),
Datum::from(assume_role_session_name),
Datum::from(principal),
Datum::from(external_id.as_deref()),
Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guswynn I golfed this a little more to get rid of the holder entirely.

]);

Ok(BuiltinTableUpdate { id, row, diff })
}

2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
@@ -43,6 +43,8 @@ breaking:
- persist-types/src/stats.proto
# reason: does currently not require backward-compatibility
- storage-client/src/client.proto
# reason: currently does not require backward-compatibility
- storage-types/src/connections/aws.proto
lint:
use:
- DEFAULT
22 changes: 22 additions & 0 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
@@ -2646,6 +2646,27 @@ pub static MZ_AWS_PRIVATELINK_CONNECTIONS: Lazy<BuiltinTable> = Lazy::new(|| Bui
sensitivity: DataSensitivity::Public,
});

/// Builtin table `mz_aws_connections` in the internal schema, describing AWS
/// connections.
///
/// `id` is the only non-nullable column. It is followed by eight nullable
/// text columns and a nullable JSONB `example_trust_policy` column.
pub static MZ_AWS_CONNECTIONS: Lazy<BuiltinTable> = Lazy::new(|| {
    // Nullable text columns, in the order they follow `id` in the relation.
    let nullable_text_columns = [
        "endpoint",
        "region",
        "access_key_id",
        "access_key_id_secret_id",
        "assume_role_arn",
        "assume_role_session_name",
        "principal",
        "external_id",
    ];

    let id_only = RelationDesc::empty().with_column("id", ScalarType::String.nullable(false));
    let desc = nullable_text_columns
        .iter()
        .fold(id_only, |desc, column| {
            desc.with_column(*column, ScalarType::String.nullable(true))
        })
        .with_column("example_trust_policy", ScalarType::Jsonb.nullable(true));

    BuiltinTable {
        name: "mz_aws_connections",
        schema: MZ_INTERNAL_SCHEMA,
        desc,
        is_retained_metrics_object: false,
        sensitivity: DataSensitivity::Public,
    }
});

pub static MZ_CLUSTER_REPLICA_METRICS: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
name: "mz_cluster_replica_metrics",
// TODO[btv] - make this public once we work out whether and how to fuse it with
@@ -6185,6 +6206,7 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::Table(&MZ_STORAGE_USAGE_BY_SHARD),
Builtin::Table(&MZ_EGRESS_IPS),
Builtin::Table(&MZ_AWS_PRIVATELINK_CONNECTIONS),
Builtin::Table(&MZ_AWS_CONNECTIONS),
Builtin::Table(&MZ_SUBSCRIPTIONS),
Builtin::Table(&MZ_SESSIONS),
Builtin::Table(&MZ_DEFAULT_PRIVILEGES),
6 changes: 6 additions & 0 deletions src/clusterd/src/bin/clusterd.rs
Original file line number Diff line number Diff line change
@@ -174,6 +174,11 @@ struct Args {
#[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", parse(from_str = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable))]
aws_external_id_prefix: Option<AwsExternalIdPrefix>,

/// The ARN for a Materialize-controlled role to assume before assuming
/// a customer's requested role for an AWS connection.
#[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
aws_connection_role_arn: Option<String>,

// === Process orchestrator options. ===
/// Where to write a PID lock file.
///
@@ -333,6 +338,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
args.environment_id,
&args.tracing.startup_log_filter,
args.aws_external_id_prefix,
args.aws_connection_role_arn,
secrets_reader,
None,
),
7 changes: 7 additions & 0 deletions src/controller/src/clusters.rs
Original file line number Diff line number Diff line change
@@ -587,6 +587,7 @@ where
};
let environment_id = self.connection_context().environment_id.clone();
let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
let persist_pubsub_url = self.persist_pubsub_url.clone();
let persist_txn_tables = self.persist_txn_tables;
let secrets_args = self.secrets_args.to_flags();
@@ -620,6 +621,12 @@ where
aws_external_id_prefix
));
}
if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
args.push(format!(
"--aws-connection-role-arn={}",
aws_connection_role_arn
));
}
if let Some(memory_limit) = location.allocation.memory_limit {
args.push(format!(
"--announce-memory-limit={}",
1 change: 1 addition & 0 deletions src/environmentd/src/bin/environmentd/main.rs
Original file line number Diff line number Diff line change
@@ -892,6 +892,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
args.environment_id.to_string(),
&args.tracing.startup_log_filter,
args.aws_external_id_prefix,
args.aws_connection_role_arn,
secrets_reader,
cloud_resource_reader,
);
1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ Array
As
Asc
Assert
Assume
At
Auction
Authority
6 changes: 4 additions & 2 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
@@ -669,6 +669,8 @@ impl_display_t!(DbzTxMetadataOption);
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
AccessKeyId,
AssumeRoleArn,
AssumeRoleSessionName,
AvailabilityZones,
AwsPrivatelink,
Broker,
@@ -680,7 +682,6 @@ pub enum ConnectionOptionName {
Port,
ProgressTopic,
Region,
RoleArn,
SaslMechanisms,
SaslPassword,
SaslUsername,
@@ -712,7 +713,8 @@ impl AstDisplay for ConnectionOptionName {
ConnectionOptionName::Port => "PORT",
ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
ConnectionOptionName::Region => "REGION",
ConnectionOptionName::RoleArn => "ROLE ARN",
ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
ConnectionOptionName::SaslPassword => "SASL PASSWORD",
ConnectionOptionName::SaslUsername => "SASL USERNAME",
Loading