Skip to content

Commit 3bbb970

Browse files
committed
subscription: implement ClientFilterType and ClientFilterFlags
1 parent 45031ab commit 3bbb970

File tree

11 files changed

+217
-44
lines changed

11 files changed

+217
-44
lines changed

Cargo.lock

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/src/subscriptions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ async fn edit_filter(subscription: &mut SubscriptionData, matches: &ArgMatches)
617617
warn!("'{}' filter has been set without principals making this subscription apply to nothing.", op)
618618
}
619619

620-
subscription.set_client_filter(Some(ClientFilter::new(op, princs)));
620+
subscription.set_client_filter(Some(ClientFilter::new_legacy(op, princs)));
621621
return Ok(());
622622
}
623623

common/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ deadpool-sqlite = "0.5.0"
2626
openssl = "0.10.66"
2727
postgres-openssl = "0.5.0"
2828
strum = { version = "0.26.1", features = ["derive"] }
29+
bitflags = { version = "2.6.0", features = ["serde"] }
30+
glob = "0.3.1"
2931

3032
[dev-dependencies]
3133
tempfile = "3.14.0"
@@ -68,4 +70,4 @@ assets = [
6870
{ source = "../openwec.conf.sample.toml", dest = "/usr/share/doc/openwec/", mode = "0644", doc = true },
6971
{ source = "../README.md", dest = "/usr/share/doc/openwec/", mode = "0644", doc = true },
7072
{ source = "../doc/*", dest = "/usr/share/doc/openwec/doc/", mode = "0644", doc = true },
71-
]
73+
]

common/src/database/mod.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ pub mod tests {
196196
.set_ignore_channel_error(false)
197197
.set_client_filter(Some(ClientFilter::from(
198198
"Only".to_string(),
199+
"KerberosPrinc".to_string(),
200+
None,
199201
Some("couscous,boulette".to_string()),
200202
)?))
201203
.set_outputs(vec![
@@ -232,7 +234,7 @@ pub mod tests {
232234
);
233235
assert_eq!(
234236
tata.client_filter().unwrap().targets(),
235-
&HashSet::from(["couscous".to_string(), "boulette".to_string()])
237+
HashSet::from(["couscous", "boulette"])
236238
);
237239

238240
assert_eq!(
@@ -298,10 +300,10 @@ pub mod tests {
298300
);
299301
assert_eq!(
300302
tata2.client_filter().unwrap().targets(),
301-
&HashSet::from([
302-
"couscous".to_string(),
303-
"boulette".to_string(),
304-
"semoule".to_string()
303+
HashSet::from([
304+
"couscous",
305+
"boulette",
306+
"semoule"
305307
])
306308
);
307309
assert_eq!(tata2.is_active_for("couscous"), true);
@@ -329,7 +331,7 @@ pub mod tests {
329331
);
330332
assert_eq!(
331333
tata2_clone.client_filter().unwrap().targets(),
332-
&HashSet::from(["boulette".to_string(), "semoule".to_string()])
334+
HashSet::from(["boulette", "semoule"])
333335
);
334336

335337
assert_eq!(tata2_clone.is_active_for("couscous"), true);

common/src/database/postgres.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,11 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
216216
let client_filter_op: Option<String> = row.try_get("client_filter_op")?;
217217

218218
let client_filter = match client_filter_op {
219-
Some(op) => Some(ClientFilter::from(op, row.try_get("client_filter_value")?)?),
219+
Some(op) => {
220+
let client_filter_type: Option<_> = row.try_get("client_filter_type").unwrap();
221+
let client_filter_type = client_filter_type.unwrap_or("KerberosPrinc".to_owned());
222+
Some(ClientFilter::from(op, client_filter_type, row.try_get("client_filter_flags")?, row.try_get("client_filter_value")?)?)
223+
},
220224
None => None
221225
};
222226

@@ -628,6 +632,8 @@ impl Database for PostgresDatabase {
628632

629633
let max_envelope_size: i32 = subscription.max_envelope_size().try_into()?;
630634
let client_filter_op: Option<String> = subscription.client_filter().map(|f| f.operation().to_string());
635+
let client_filter_type = subscription.client_filter().map(|f| f.kind().to_string());
636+
let client_filter_flags = subscription.client_filter().map(|f| f.flags().to_string());
631637
let client_filter_value = subscription.client_filter().and_then(|f| f.targets_to_opt_string());
632638

633639
let count = self
@@ -638,9 +644,9 @@ impl Database for PostgresDatabase {
638644
r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query,
639645
heartbeat_interval, connection_retry_count, connection_retry_interval,
640646
max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format,
641-
ignore_channel_error, client_filter_op, client_filter_value, outputs, locale,
647+
ignore_channel_error, client_filter_op, client_filter_type, client_filter_flags, client_filter_value, outputs, locale,
642648
data_locale)
643-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)
649+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
644650
ON CONFLICT (uuid) DO UPDATE SET
645651
version = excluded.version,
646652
revision = excluded.revision,
@@ -658,6 +664,8 @@ impl Database for PostgresDatabase {
658664
content_format = excluded.content_format,
659665
ignore_channel_error = excluded.ignore_channel_error,
660666
client_filter_op = excluded.client_filter_op,
667+
client_filter_type = excluded.client_filter_type,
668+
client_filter_flags = excluded.client_filter_flags,
661669
client_filter_value = excluded.client_filter_value,
662670
outputs = excluded.outputs,
663671
locale = excluded.locale,
@@ -680,6 +688,8 @@ impl Database for PostgresDatabase {
680688
&subscription.content_format().to_string(),
681689
&subscription.ignore_channel_error(),
682690
&client_filter_op,
691+
&client_filter_type,
692+
&client_filter_flags,
683693
&client_filter_value,
684694
&serde_json::to_string(subscription.outputs())?.as_str(),
685695
&subscription.locale(),

common/src/database/schema/postgres/_014_alter_client_filter_in_subscriptions.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@ impl PostgresMigration for AlterClientFilterInSubscriptionsTable {
1616
async fn up(&self, tx: &mut Transaction) -> Result<()> {
1717
tx.execute("ALTER TABLE subscriptions RENAME COLUMN princs_filter_op TO client_filter_op", &[]).await?;
1818
tx.execute("ALTER TABLE subscriptions RENAME COLUMN princs_filter_value TO client_filter_value", &[]).await?;
19+
tx.execute("ALTER TABLE subscriptions ADD COLUMN client_filter_type TEXT", &[]).await?;
20+
tx.execute("ALTER TABLE subscriptions ADD COLUMN client_filter_flags TEXT", &[]).await?;
1921
Ok(())
2022
}
2123

2224
async fn down(&self, tx: &mut Transaction) -> Result<()> {
2325
tx.execute("ALTER TABLE subscriptions RENAME COLUMN client_filter_op TO princs_filter_op", &[]).await?;
2426
tx.execute("ALTER TABLE subscriptions RENAME COLUMN client_filter_value TO princs_filter_value", &[]).await?;
27+
tx.execute("ALTER TABLE subscriptions DROP COLUMN client_filter_type", &[]).await?;
28+
tx.execute("ALTER TABLE subscriptions DROP COLUMN client_filter_flags", &[]).await?;
2529
Ok(())
2630
}
2731
}

common/src/database/schema/sqlite/_014_alter_client_filter_in_subscriptions.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ impl SQLiteMigration for AlterClientFilterInSubscriptionsTable {
1717
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
1818
conn.execute("ALTER TABLE subscriptions RENAME COLUMN princs_filter_value TO client_filter_value", [])
1919
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
20+
conn.execute("ALTER TABLE subscriptions ADD COLUMN client_filter_type TEXT", [])
21+
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
22+
conn.execute("ALTER TABLE subscriptions ADD COLUMN client_filter_flags TEXT", [])
23+
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
2024
Ok(())
2125
}
2226

@@ -25,6 +29,10 @@ impl SQLiteMigration for AlterClientFilterInSubscriptionsTable {
2529
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
2630
conn.execute("ALTER TABLE subscriptions RENAME COLUMN client_filter_value TO princs_filter_value", [])
2731
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
32+
conn.execute("ALTER TABLE subscriptions DROP COLUMN client_filter_type", [])
33+
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
34+
conn.execute("ALTER TABLE subscriptions DROP COLUMN client_filter_flags", [])
35+
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
2836
Ok(())
2937
}
3038
}

common/src/database/sqlite.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,11 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
169169
let client_filter_op: Option<String> = row.get("client_filter_op")?;
170170

171171
let client_filter = match client_filter_op {
172-
Some(op) => Some(ClientFilter::from(op, row.get("client_filter_value")?)?),
172+
Some(op) => {
173+
let client_filter_type: Option<_> = row.get("client_filter_type")?;
174+
let client_filter_type = client_filter_type.unwrap_or("KerberosPrinc".to_owned());
175+
Some(ClientFilter::from(op, client_filter_type, row.get("client_filter_flags")?, row.get("client_filter_value")?)?)
176+
},
173177
None => None
174178
};
175179

@@ -553,6 +557,8 @@ impl Database for SQLiteDatabase {
553557
async fn store_subscription(&self, subscription: &SubscriptionData) -> Result<()> {
554558
let subscription = subscription.clone();
555559
let client_filter_op: Option<String> = subscription.client_filter().map(|f| f.operation().to_string());
560+
let client_filter_type = subscription.client_filter().map(|f| f.kind().to_string());
561+
let client_filter_flags = subscription.client_filter().map(|f| f.flags().to_string());
556562
let client_filter_value = subscription.client_filter().and_then(|f| f.targets_to_opt_string());
557563

558564
let count = self
@@ -564,12 +570,12 @@ impl Database for SQLiteDatabase {
564570
r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query,
565571
heartbeat_interval, connection_retry_count, connection_retry_interval,
566572
max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format,
567-
ignore_channel_error, client_filter_op, client_filter_value, outputs, locale,
573+
ignore_channel_error, client_filter_op, client_filter_type, client_filter_flags, client_filter_value, outputs, locale,
568574
data_locale)
569575
VALUES (:uuid, :version, :revision, :name, :uri, :query,
570576
:heartbeat_interval, :connection_retry_count, :connection_retry_interval,
571577
:max_time, :max_elements, :max_envelope_size, :enabled, :read_existing_events, :content_format,
572-
:ignore_channel_error, :client_filter_op, :client_filter_value, :outputs,
578+
:ignore_channel_error, :client_filter_op, :client_filter_type, :client_filter_flags, :client_filter_value, :outputs,
573579
:locale, :data_locale)
574580
ON CONFLICT (uuid) DO UPDATE SET
575581
version = excluded.version,
@@ -588,6 +594,8 @@ impl Database for SQLiteDatabase {
588594
content_format = excluded.content_format,
589595
ignore_channel_error = excluded.ignore_channel_error,
590596
client_filter_op = excluded.client_filter_op,
597+
client_filter_type = excluded.client_filter_type,
598+
client_filter_flags = excluded.client_filter_flags,
591599
client_filter_value = excluded.client_filter_value,
592600
outputs = excluded.outputs,
593601
locale = excluded.locale,
@@ -610,6 +618,8 @@ impl Database for SQLiteDatabase {
610618
":content_format": subscription.content_format().to_string(),
611619
":ignore_channel_error": subscription.ignore_channel_error(),
612620
":client_filter_op": client_filter_op,
621+
":client_filter_type": client_filter_type,
622+
":client_filter_flags": client_filter_flags,
613623
":client_filter_value": client_filter_value,
614624
":outputs": serde_json::to_string(subscription.outputs())?,
615625
":locale": subscription.locale(),

common/src/models/config.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ impl From<ClientFilterOperation> for crate::subscription::ClientFilterOperation
178178
#[serde(deny_unknown_fields)]
179179
struct ClientFilter {
180180
pub operation: ClientFilterOperation,
181+
#[serde(rename = "type", default)]
182+
pub kind: crate::subscription::ClientFilterType,
183+
#[serde(default)]
184+
pub flags: crate::subscription::ClientFilterFlags,
181185
#[serde(alias = "cert_subjects", alias = "princs")]
182186
pub targets: HashSet<String>,
183187
}
@@ -186,7 +190,7 @@ impl TryFrom<ClientFilter> for crate::subscription::ClientFilter {
186190
type Error = anyhow::Error;
187191

188192
fn try_from(value: ClientFilter) -> std::prelude::v1::Result<Self, Self::Error> {
189-
Ok(crate::subscription::ClientFilter::new(value.operation.into(), value.targets))
193+
crate::subscription::ClientFilter::try_new(value.operation.into(), value.kind, value.flags, value.targets)
190194
}
191195
}
192196

@@ -506,7 +510,9 @@ path = "/whatever/you/{ip}/want/{principal}/{ip:2}/{node}/end"
506510
let mut targets = HashSet::new();
507511
targets.insert("[email protected]".to_string());
508512
targets.insert("[email protected]".to_string());
509-
let filter = crate::subscription::ClientFilter::new(crate::subscription::ClientFilterOperation::Only, targets);
513+
let kind = crate::subscription::ClientFilterType::KerberosPrinc;
514+
let flags = crate::subscription::ClientFilterFlags::empty();
515+
let filter = crate::subscription::ClientFilter::try_new(crate::subscription::ClientFilterOperation::Only, kind, flags, targets)?;
510516

511517
expected.set_client_filter(Some(filter));
512518

common/src/models/export.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ mod v1 {
202202
return None;
203203
};
204204

205-
Some(crate::subscription::ClientFilter::new(op.into(), self.princs))
205+
Some(crate::subscription::ClientFilter::new_legacy(op.into(), self.princs))
206206
}
207207
}
208208

@@ -549,15 +549,15 @@ pub mod v2 {
549549
return None;
550550
};
551551

552-
Some(crate::subscription::ClientFilter::new(op.into(), self.princs))
552+
Some(crate::subscription::ClientFilter::new_legacy(op.into(), self.princs))
553553
}
554554
}
555555

556556
impl From<Option<crate::subscription::ClientFilter>> for PrincsFilter {
557557
fn from(value: Option<crate::subscription::ClientFilter>) -> Self {
558558
Self {
559559
operation: value.as_ref().and_then(|f| Some(f.operation().clone().into())),
560-
princs: value.map_or(HashSet::new(), |f| f.targets().clone()),
560+
princs: value.map_or(HashSet::new(), |f| f.targets().iter().cloned().map(String::from).collect()),
561561
}
562562
}
563563
}
@@ -712,10 +712,12 @@ mod tests {
712712
.set_max_elements(Some(100))
713713
.set_read_existing_events(false)
714714
.set_uri(Some("toto".to_string()))
715-
.set_client_filter(Some(crate::subscription::ClientFilter::new(
715+
.set_client_filter(Some(crate::subscription::ClientFilter::try_new(
716716
crate::subscription::ClientFilterOperation::Except,
717+
crate::subscription::ClientFilterType::KerberosPrinc,
718+
crate::subscription::ClientFilterFlags::CaseSensitive,
717719
targets,
718-
)))
720+
)?))
719721
.set_outputs(vec![crate::subscription::SubscriptionOutput::new(
720722
crate::subscription::SubscriptionOutputFormat::Json,
721723
crate::subscription::SubscriptionOutputDriver::Tcp(

0 commit comments

Comments
 (0)