Skip to content

Commit d6289c6

Browse files
committed
crates/agent: authorization API uses DB snapshots
Update the API to fetch on-demand snapshots from the DB of authorization state, bounding both the frequency with which snapshots may be taken and also for how long a stale snapshot may be used. A request will either succeed against a current snapshot (happy fast path), or will fail. If it fails, the caller is asked to retry after a given delay, and meanwhile the API will have typically started to refresh the current snapshot. If the request continues to fail against a snapshot which as taken _after_ the issued-at time of the request token, only then is the error considered permanently failed. This means that requests are cheap, evaluted only by the agent-api, but we *also* don't have caching artifacts that could result in false errors when tasks or collections are being created or changed. Instead, we incur a minor delay while a sufficiently-recent snapshot is taken. A final change is that we now prefix match on shard ID templates and journal name templates, rather than munging both into a more-approximate prefix match over catalog names.
1 parent bf7ae34 commit d6289c6

File tree

5 files changed

+425
-175
lines changed

5 files changed

+425
-175
lines changed

crates/agent-sql/src/data_plane.rs

-67
Original file line numberDiff line numberDiff line change
@@ -68,70 +68,3 @@ pub async fn fetch_data_planes(
6868

6969
Ok(r.into_iter().collect())
7070
}
71-
72-
pub async fn fetch_data_plane_by_task_and_fqdn(
73-
pool: &sqlx::PgPool,
74-
task_shard: &str,
75-
task_data_plane_fqdn: &str,
76-
) -> sqlx::Result<Option<tables::DataPlane>> {
77-
sqlx::query_as!(
78-
tables::DataPlane,
79-
r#"
80-
select
81-
d.id as "control_id: Id",
82-
d.data_plane_name,
83-
d.data_plane_fqdn,
84-
false as "is_default!: bool",
85-
d.hmac_keys,
86-
d.broker_address,
87-
d.reactor_address,
88-
d.ops_logs_name as "ops_logs_name: models::Collection",
89-
d.ops_stats_name as "ops_stats_name: models::Collection"
90-
from data_planes d
91-
join live_specs t on t.data_plane_id = d.id
92-
where d.data_plane_fqdn = $2 and starts_with($1::text, t.catalog_name)
93-
"#,
94-
task_shard,
95-
task_data_plane_fqdn,
96-
)
97-
.fetch_optional(pool)
98-
.await
99-
}
100-
101-
pub async fn verify_task_authorization(
102-
pool: &sqlx::PgPool,
103-
task_shard: &str,
104-
journal_name_or_prefix: &str,
105-
required_role: &str,
106-
) -> sqlx::Result<Option<(String, models::Collection, models::Id, bool)>> {
107-
let r = sqlx::query!(
108-
r#"
109-
select
110-
t.catalog_name as "task_name: String",
111-
c.catalog_name as "collection_name: models::Collection",
112-
c.data_plane_id as "collection_data_plane_id: models::Id",
113-
exists(
114-
select 1
115-
from internal.task_roles($1, $3::text::grant_capability) r
116-
where starts_with($2, r.role_prefix)
117-
) as "authorized!: bool"
118-
from live_specs t, live_specs c
119-
where starts_with($1, t.catalog_name)
120-
and starts_with($2, c.catalog_name)
121-
"#,
122-
task_shard,
123-
journal_name_or_prefix,
124-
required_role,
125-
)
126-
.fetch_optional(pool)
127-
.await?;
128-
129-
Ok(r.map(|r| {
130-
(
131-
r.task_name,
132-
r.collection_name,
133-
r.collection_data_plane_id,
134-
r.authorized,
135-
)
136-
}))
137-
}

0 commit comments

Comments
 (0)