-
Notifications
You must be signed in to change notification settings - Fork 834
feat(runs): populate RunDetails.executed_by from forwarded auth headers #7547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 7 commits
8a4c1c5
32b10ea
5236f86
0a8dbb5
9598171
4f9befc
125d7a1
e7dfbf1
1d901bd
fdc0ea0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
-- Add created_by to actions: the subject of the identity that created the run,
-- captured from the auth headers the load balancer forwards (it enforces auth).
-- NULL for runs created without an authenticated identity.
--
-- The column is TEXT rather than VARCHAR(255): the subject's length is decided by
-- the identity provider, and a value longer than a fixed limit would make the
-- INSERT (and therefore run creation) fail.
-- IF NOT EXISTS keeps the migration safe to re-run.
ALTER TABLE actions ADD COLUMN IF NOT EXISTS created_by TEXT;
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| -- Add executed_by to actions: the serialized common.EnrichedIdentity of the run's | ||
| -- creator, captured from the OIDC claims the load balancer forwards (subject plus | ||
| -- name/email when present). created_by keeps the bare subject for querying; this | ||
| -- column carries the full identity surfaced as ActionMetadata.executed_by. | ||
| -- The column is nullable (no NOT NULL / DEFAULT), so existing rows simply hold NULL, | ||
| -- and IF NOT EXISTS makes the statement a no-op when the column is already present. | ||
| ALTER TABLE actions ADD COLUMN IF NOT EXISTS executed_by BYTEA; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,6 +52,16 @@ type Action struct { | |
| // Who initiated this run(web, CLI, scheduler, etc.) | ||
| RunSource string `db:"run_source" json:"run_source,omitempty"` | ||
|
|
||
| // CreatedBy is the OIDC subject of the identity that created this run, captured | ||
| // from the auth headers the load balancer forwards. Kept for querying/filtering. | ||
| // NULL for runs created without an authenticated identity. | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Clarified the field comment: |
||
| CreatedBy sql.NullString `db:"created_by" json:"created_by,omitempty"` | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Intentional: |
||
|
|
||
| // ExecutedBy is the serialized common.EnrichedIdentity of the run's creator | ||
| // (subject plus name/email when the forwarded OIDC claims include them). | ||
| // Surfaced directly as ActionMetadata.executed_by. NULL for unauthenticated runs. | ||
| ExecutedBy []byte `db:"executed_by" json:"executed_by,omitempty"` | ||
|
|
||
| // Trigger fields — only set for runs created via RUN_SOURCE_SCHEDULE_TRIGGER. | ||
| TriggerTaskName sql.NullString `db:"trigger_task_name"` | ||
| TriggerName sql.NullString `db:"trigger_name"` | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| package service | ||
|
|
||
| import ( | ||
| "encoding/base64" | ||
| "encoding/json" | ||
| "net/http" | ||
| "strings" | ||
|
|
||
| "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/common" | ||
| ) | ||
|
|
||
// Request headers (and the Bearer scheme prefix) consulted when reconstructing the
// caller's identity from what the load balancer forwards.
const (
	// albDataHeader: set by ALB authenticate-oidc on the browser/cookie path. Its
	// value is a signed JWT whose payload carries sub, email, given_name and
	// family_name.
	albDataHeader = "X-Amzn-Oidc-Data"

	// albIdentityHeader: also set by ALB authenticate-oidc; holds the OIDC subject
	// (`sub`) on its own. Consulted as a fallback when the data header is absent.
	albIdentityHeader = "X-Amzn-Oidc-Identity"

	// authorizationHeader: carries the Bearer token on the JWT-validation path
	// (SDK/CLI); the load balancer validates it and passes it through unchanged.
	authorizationHeader = "Authorization"

	// bearerPrefix is the scheme prefix, including the separating space, that
	// precedes the token in the Authorization header value.
	bearerPrefix = "Bearer "
)
|
|
||
// oidcClaims holds the handful of OIDC claims that are surfaced as the executing
// identity. It is decoded from JSON (a JWT payload or a userinfo response); claims
// missing from the document leave the corresponding field empty.
type oidcClaims struct {
	// Sub is the subject identifier.
	Sub string `json:"sub"`
	// Email is the user's e-mail address.
	Email string `json:"email"`
	// GivenName is the user's first name.
	GivenName string `json:"given_name"`
	// FamilyName is the user's last name.
	FamilyName string `json:"family_name"`
}
|
|
||
| // identityFromHeaders builds the EnrichedIdentity of the caller from the auth headers | ||
| // the load balancer forwards. Auth is enforced upstream (e.g. ALB OIDC / JWT | ||
| // validation), so the claims are trusted and only decoded here — not re-verified. | ||
| // Returns nil when no authenticated identity is present. | ||
| func identityFromHeaders(h http.Header) *common.EnrichedIdentity { | ||
|
Comment on lines
+33
to
+37
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in |
||
| // authenticate-oidc (browser/cookie) path: full claims in the signed data JWT. | ||
| if id := identityFromJWT(h.Get(albDataHeader)); id != nil { | ||
| return id | ||
| } | ||
| // Same path, subject only — when the data header is unavailable. | ||
| if sub := strings.TrimSpace(h.Get(albIdentityHeader)); sub != "" { | ||
| return subjectOnlyIdentity(sub) | ||
| } | ||
| // JWT (SDK/CLI) path: decode the forwarded Bearer token's claims. | ||
| if authz := h.Get(authorizationHeader); len(authz) > len(bearerPrefix) && | ||
| strings.EqualFold(authz[:len(bearerPrefix)], bearerPrefix) { | ||
| return identityFromJWT(strings.TrimSpace(authz[len(bearerPrefix):])) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // identityFromJWT decodes a JWT's claims payload (without verifying the signature — | ||
| // the load balancer already validated it) into an EnrichedIdentity. Returns nil on | ||
| // any malformed input or when no subject claim is present. | ||
| func identityFromJWT(token string) *common.EnrichedIdentity { | ||
| parts := strings.Split(token, ".") | ||
| if len(parts) != 3 { | ||
| return nil | ||
| } | ||
| payload, err := decodeJWTSegment(parts[1]) | ||
| if err != nil { | ||
| return nil | ||
| } | ||
| var c oidcClaims | ||
| if err := json.Unmarshal(payload, &c); err != nil || c.Sub == "" { | ||
| return nil | ||
| } | ||
| id := subjectOnlyIdentity(c.Sub) | ||
| if c.Email != "" || c.GivenName != "" || c.FamilyName != "" { | ||
| id.GetUser().Spec = &common.UserSpec{ | ||
| FirstName: c.GivenName, | ||
| LastName: c.FamilyName, | ||
| Email: c.Email, | ||
| } | ||
| } | ||
| return id | ||
| } | ||
|
|
||
// decodeJWTSegment base64url-decodes one JWT segment. The unpadded encoding the JWT
// spec mandates is tried first; if that fails, the segment is padded with '=' up to
// a multiple of four characters and decoded with the padded alphabet instead. The
// second attempt exists for issuers that emit padded segments (notably AWS ALB's
// x-amzn-oidc-data), which strict unpadded decoding would reject and thereby drop
// the claims. The error returned is the one from the padded attempt.
func decodeJWTSegment(seg string) ([]byte, error) {
	decoded, rawErr := base64.RawURLEncoding.DecodeString(seg)
	if rawErr == nil {
		return decoded, nil
	}

	padded := seg
	if remainder := len(seg) % 4; remainder > 0 {
		padded = seg + strings.Repeat("=", 4-remainder)
	}
	return base64.URLEncoding.DecodeString(padded)
}
|
|
||
| // subjectOnlyIdentity builds a minimal EnrichedIdentity carrying just the subject. | ||
| // Mirrors the cloud transformer fallback; used when only the subject is available. | ||
| func subjectOnlyIdentity(subject string) *common.EnrichedIdentity { | ||
| if subject == "" { | ||
| return nil | ||
| } | ||
| return &common.EnrichedIdentity{ | ||
| Principal: &common.EnrichedIdentity_User{ | ||
| User: &common.User{ | ||
| Id: &common.UserIdentifier{Subject: subject}, | ||
| }, | ||
| }, | ||
| } | ||
| } | ||
|
|
||
| // identitySubject returns the subject of an EnrichedIdentity, or "" if absent. | ||
| func identitySubject(id *common.EnrichedIdentity) string { | ||
| return id.GetUser().GetId().GetSubject() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,200 @@ | ||
| package service | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "net/http" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/flyteorg/flyte/v2/flytestdlib/logger" | ||
| "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/common" | ||
| ) | ||
|
|
||
// Tunables for the userinfo-based identity enrichment.
const (
	// userinfoHTTPTimeout is the timeout set on the enricher's HTTP client, which
	// serves both the discovery and the userinfo requests.
	userinfoHTTPTimeout = 3 * time.Second

	// identityCacheTTL is how long fetched claims stay valid in the per-subject cache.
	identityCacheTTL = 10 * time.Minute

	// oidcDiscoveryPath is appended to the authorization-server base URL to locate
	// the OIDC discovery document.
	oidcDiscoveryPath = "/.well-known/openid-configuration"
)
|
|
||
| // identityEnricher fills in a caller's profile (email, first/last name) by calling | ||
| // the OIDC userinfo endpoint with their access token. It is needed on the Bearer | ||
| // path, where the access token carries only the subject — the profile claims live | ||
| // in userinfo, not the token. Results are cached by subject. Every failure mode is | ||
| // best-effort: the caller's unenriched (subject-only) identity is returned instead. | ||
| type identityEnricher struct { | ||
| authServerBaseURL string | ||
| httpClient *http.Client | ||
|
|
||
| mu sync.Mutex | ||
| userinfoURL string // resolved lazily from OIDC discovery, then cached | ||
| cache map[string]cachedClaims | ||
| } | ||
|
|
||
| type cachedClaims struct { | ||
| claims *oidcClaims | ||
| expires time.Time | ||
| } | ||
|
|
||
| // newIdentityEnricher returns an enricher for the given OAuth2 authorization-server | ||
| // base URL (e.g. https://signin.example.com/oauth2/default), or nil when unset — | ||
| // in which case enrich is a no-op and identities stay subject-only. | ||
| func newIdentityEnricher(authServerBaseURL string) *identityEnricher { | ||
| if authServerBaseURL == "" { | ||
| return nil | ||
| } | ||
| return &identityEnricher{ | ||
| authServerBaseURL: strings.TrimRight(authServerBaseURL, "/"), | ||
| httpClient: &http.Client{Timeout: userinfoHTTPTimeout}, | ||
| cache: map[string]cachedClaims{}, | ||
| } | ||
| } | ||
|
|
||
| // enrich fills any profile fields (email, first/last name) missing from base with | ||
| // userinfo claims fetched using the access token. Fields already present on base | ||
| // (e.g. from x-amzn-oidc-data) are authoritative and kept. userinfo is queried only | ||
| // when the profile is incomplete and not cached. base is returned unchanged on any | ||
| // miss or error — enrichment never blocks or fails run creation. | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed: the comment now states that |
||
| func (e *identityEnricher) enrich(ctx context.Context, accessToken string, base *common.EnrichedIdentity) *common.EnrichedIdentity { | ||
| if e == nil || base.GetUser() == nil { | ||
| return base | ||
| } | ||
| subject := base.GetUser().GetId().GetSubject() | ||
| if subject == "" { | ||
| return base | ||
| } | ||
| // Serve a previously fetched set of claims without another userinfo round-trip. | ||
| if cached := e.cachedFor(subject); cached != nil { | ||
| return mergeClaims(base, cached) | ||
| } | ||
| if isCompleteProfile(base) || accessToken == "" { | ||
| return base | ||
| } | ||
| claims, err := e.fetchUserinfo(ctx, accessToken) | ||
| if err != nil { | ||
| logger.Warnf(ctx, "identity enrichment: userinfo fetch failed for subject %q: %v", subject, err) | ||
| return base | ||
| } | ||
| e.store(subject, claims) | ||
| return mergeClaims(base, claims) | ||
|
Comment on lines
+77
to
+89
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in |
||
| } | ||
|
|
||
| func (e *identityEnricher) cachedFor(subject string) *oidcClaims { | ||
| e.mu.Lock() | ||
| defer e.mu.Unlock() | ||
| if c, ok := e.cache[subject]; ok && time.Now().Before(c.expires) { | ||
| return c.claims | ||
| } | ||
| return nil | ||
| } | ||
|
Comment on lines
+92
to
+105
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in
Comment on lines
+92
to
+105
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in |
||
|
|
||
| func (e *identityEnricher) store(subject string, claims *oidcClaims) { | ||
| e.mu.Lock() | ||
| defer e.mu.Unlock() | ||
| e.cache[subject] = cachedClaims{claims: claims, expires: time.Now().Add(identityCacheTTL)} | ||
| } | ||
|
Comment on lines
+107
to
+118
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| func (e *identityEnricher) fetchUserinfo(ctx context.Context, accessToken string) (*oidcClaims, error) { | ||
| url, err := e.resolveUserinfoURL(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| req.Header.Set(authorizationHeader, bearerPrefix+accessToken) | ||
| resp, err := e.httpClient.Do(req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer func() { _ = resp.Body.Close() }() | ||
| if resp.StatusCode != http.StatusOK { | ||
| return nil, fmt.Errorf("userinfo returned status %d", resp.StatusCode) | ||
| } | ||
| var c oidcClaims | ||
| if err := json.NewDecoder(resp.Body).Decode(&c); err != nil { | ||
| return nil, fmt.Errorf("decode userinfo: %w", err) | ||
| } | ||
| return &c, nil | ||
| } | ||
|
|
||
| // resolveUserinfoURL reads userinfo_endpoint from the OIDC discovery document once, | ||
| // then caches it for the life of the process. | ||
| func (e *identityEnricher) resolveUserinfoURL(ctx context.Context) (string, error) { | ||
| e.mu.Lock() | ||
| cached := e.userinfoURL | ||
| e.mu.Unlock() | ||
| if cached != "" { | ||
| return cached, nil | ||
| } | ||
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, e.authServerBaseURL+oidcDiscoveryPath, nil) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| resp, err := e.httpClient.Do(req) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| defer func() { _ = resp.Body.Close() }() | ||
| if resp.StatusCode != http.StatusOK { | ||
| return "", fmt.Errorf("oidc discovery returned status %d", resp.StatusCode) | ||
| } | ||
| var doc struct { | ||
| UserinfoEndpoint string `json:"userinfo_endpoint"` | ||
| } | ||
| if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil { | ||
| return "", fmt.Errorf("decode oidc discovery: %w", err) | ||
| } | ||
| if doc.UserinfoEndpoint == "" { | ||
| return "", fmt.Errorf("oidc discovery has no userinfo_endpoint") | ||
| } | ||
| e.mu.Lock() | ||
| e.userinfoURL = doc.UserinfoEndpoint | ||
| e.mu.Unlock() | ||
| return doc.UserinfoEndpoint, nil | ||
| } | ||
|
|
||
| // isCompleteProfile reports whether the identity already carries every profile | ||
| // field, in which case no userinfo lookup is needed. | ||
| func isCompleteProfile(id *common.EnrichedIdentity) bool { | ||
| s := id.GetUser().GetSpec() | ||
| return s.GetFirstName() != "" && s.GetLastName() != "" && s.GetEmail() != "" | ||
| } | ||
|
|
||
| // mergeClaims fills only the profile fields missing from base with the fetched | ||
| // claims; fields already set on base are kept (header-provided values win). | ||
| func mergeClaims(base *common.EnrichedIdentity, c *oidcClaims) *common.EnrichedIdentity { | ||
| if c == nil || base.GetUser() == nil { | ||
| return base | ||
| } | ||
| s := base.GetUser().GetSpec() | ||
| first, last, email := s.GetFirstName(), s.GetLastName(), s.GetEmail() | ||
| if first == "" { | ||
| first = c.GivenName | ||
| } | ||
| if last == "" { | ||
| last = c.FamilyName | ||
| } | ||
| if email == "" { | ||
| email = c.Email | ||
| } | ||
| if first != "" || last != "" || email != "" { | ||
| base.GetUser().Spec = &common.UserSpec{FirstName: first, LastName: last, Email: email} | ||
| } | ||
| return base | ||
| } | ||
|
|
||
| // accessTokenFromHeaders returns the caller's Bearer access token (SDK/JWT path), | ||
| // which the load balancer has validated. The cookie path is not enriched this way: | ||
| // its forwarded access token is short-lived and already expired by request time, so | ||
| // it relies on the claims the proxy injects into x-amzn-oidc-data instead. | ||
| func accessTokenFromHeaders(h http.Header) string { | ||
| if authz := h.Get(authorizationHeader); len(authz) > len(bearerPrefix) && | ||
| strings.EqualFold(authz[:len(bearerPrefix)], bearerPrefix) { | ||
| return strings.TrimSpace(authz[len(bearerPrefix):]) | ||
| } | ||
| return "" | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed: the column is now `TEXT` (the migration comment notes the OIDC `sub` length is IdP-dependent and can exceed 255).