Skip to content

Support assuming an intermediate role when using the AWS-MSK-IAM SASL mechanism #67

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/topicctl/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func getPreRun(cmd *cobra.Command, args []string) error {

func getRun(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())
sess, _ := session.NewSession()

adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/topicctl/subcmd/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func replPreRun(cmd *cobra.Command, args []string) error {

func replRun(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())
sess, _ := session.NewSession()

adminClient, err := replConfig.shared.getAdminClient(ctx, sess, true)
if err != nil {
Expand Down
48 changes: 30 additions & 18 deletions cmd/topicctl/subcmd/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ import (
)

type sharedOptions struct {
brokerAddr string
clusterConfig string
expandEnv bool
saslMechanism string
saslPassword string
saslUsername string
tlsCACert string
tlsCert string
tlsEnabled bool
tlsKey string
tlsSkipVerify bool
tlsServerName string
zkAddr string
zkPrefix string
brokerAddr string
clusterConfig string
expandEnv bool
saslMechanism string
saslPassword string
saslUsername string
saslAssumeRole string
tlsCACert string
tlsCert string
tlsEnabled bool
tlsKey string
tlsSkipVerify bool
tlsServerName string
zkAddr string
zkPrefix string
}

func (s sharedOptions) validate() error {
Expand Down Expand Up @@ -95,6 +96,10 @@ func (s sharedOptions) validate() error {
(s.saslUsername != "" || s.saslPassword != "") {
log.Warn("Username and password are ignored if using SASL AWS-MSK-IAM")
}

if saslMechanism != admin.SASLMechanismAWSMSKIAM && s.saslAssumeRole != "" {
log.Warn("AssumeRole is ignored unless using SASL AWS-MSK-IAM")
}
}

return err
Expand Down Expand Up @@ -150,10 +155,11 @@ func (s sharedOptions) getAdminClient(
SkipVerify: s.tlsSkipVerify,
},
SASL: admin.SASLConfig{
Enabled: saslEnabled,
Mechanism: saslMechanism,
Password: s.saslPassword,
Username: s.saslUsername,
Enabled: saslEnabled,
Mechanism: saslMechanism,
Password: s.saslPassword,
Username: s.saslUsername,
AssumeRole: s.saslAssumeRole,
},
},
ReadOnly: readOnly,
Expand Down Expand Up @@ -211,6 +217,12 @@ func addSharedFlags(cmd *cobra.Command, options *sharedOptions) {
os.Getenv("TOPICCTL_SASL_USERNAME"),
"SASL username if using SASL; will override value set in cluster config",
)
cmd.Flags().StringVar(
&options.saslAssumeRole,
"sasl-assume-role",
"",
"Intermediate role to assume if using SASL AWS-MSK-IAM",
)
cmd.Flags().StringVar(
&options.tlsCACert,
"tls-ca-cert",
Expand Down
20 changes: 15 additions & 5 deletions pkg/admin/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -48,10 +50,11 @@ type TLSConfig struct {

// SASLConfig stores the SASL-related configuration for a connection.
type SASLConfig struct {
Enabled bool
Mechanism SASLMechanism
Username string
Password string
Enabled bool
Mechanism SASLMechanism
Username string
Password string
AssumeRole string
}

// Connector is a wrapper around the low-level, kafka-go dialer and client.
Expand All @@ -75,7 +78,14 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
switch config.SASL.Mechanism {
case SASLMechanismAWSMSKIAM:
sess := session.Must(session.NewSession())
signer := sigv4.NewSigner(sess.Config.Credentials)
var creds *credentials.Credentials
if config.SASL.AssumeRole != "" {
creds = stscreds.NewCredentials(sess, config.SASL.AssumeRole)
} else {
creds = sess.Config.Credentials
}

signer := sigv4.NewSigner(creds)
region := aws.StringValue(sess.Config.Region)

mechanismClient = &aws_msk_iam.Mechanism{
Expand Down
16 changes: 12 additions & 4 deletions pkg/config/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ type SASLConfig struct {

// Password is the SASL password. Ignored if mechanism is AWS-MSK-IAM.
Password string `json:"password"`

// Intermediate role ARN to assume. Only used if mechanism is AWS-MSK-IAM.
AssumeRole string `json:"assumeRole"`
}

// Validate evaluates whether the cluster config is valid.
Expand Down Expand Up @@ -165,6 +168,10 @@ func (c ClusterConfig) Validate() error {
(c.Spec.SASL.Username != "" || c.Spec.SASL.Password != "") {
log.Warn("Username and password are ignored if using SASL AWS-MSK-IAM")
}

if saslMechanism != admin.SASLMechanismAWSMSKIAM && c.Spec.SASL.AssumeRole != "" {
log.Warn("AssumeRole is ignored unless using SASL AWS-MSK-IAM")
}
}

return err
Expand Down Expand Up @@ -231,10 +238,11 @@ func (c ClusterConfig) NewAdminClient(
SkipVerify: c.Spec.TLS.SkipVerify,
},
SASL: admin.SASLConfig{
Enabled: c.Spec.SASL.Enabled,
Mechanism: saslMechanism,
Username: saslUsername,
Password: saslPassword,
Enabled: c.Spec.SASL.Enabled,
Mechanism: saslMechanism,
Username: saslUsername,
Password: saslPassword,
AssumeRole: c.Spec.SASL.AssumeRole,
},
},
ExpectedClusterID: c.Spec.ClusterID,
Expand Down