Skip to content

Commit

Permalink
refactor: Move Relay & Manage to use config providers. (#1081)
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Jan 28, 2025
1 parent 180f93d commit 61cc0d2
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 71 deletions.
1 change: 1 addition & 0 deletions crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ impl Pail {
relay::Relay {
xds_port,
mds_port,
locality: None,
provider: Some(Providers::File { path }),
}
.run(RunArgs {
Expand Down
2 changes: 1 addition & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl Cli {
manager.run(locality, config, ready, shutdown_rx).await
}
(Commands::Relay(relay), Admin::Relay(ready)) => {
relay.run(config, ready, shutdown_rx).await
relay.run(locality, config, ready, shutdown_rx).await
}
_ => unreachable!(),
}
Expand Down
2 changes: 2 additions & 0 deletions src/cli/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ impl Default for Relay {
impl Relay {
pub async fn run(
self,
locality: Option<crate::net::endpoint::Locality>,
config: Arc<Config>,
ready: Ready,
shutdown_rx: crate::signal::ShutdownRx,
) -> crate::Result<()> {
relay::Relay {
xds_port: self.xds_port,
mds_port: self.mds_port,
locality,
provider: self.providers,
}
.run(crate::components::RunArgs {
Expand Down
21 changes: 14 additions & 7 deletions src/components/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,20 @@ impl Manage {
.modify(|map| map.update_unlocated_endpoints(locality.clone()));
}

let provider_task = self.provider.spawn(
config.clone(),
ready.provider_is_healthy.clone(),
self.locality,
self.address_selector,
false,
);
let provider_task = match self.provider {
Providers::Agones {
config_namespace,
gameservers_namespace,
} => crate::config::providersv2::Providers::default()
.k8s()
.k8s_namespace(config_namespace.unwrap_or_default())
.agones()
.agones_namespace(gameservers_namespace),
Providers::File { path } => crate::config::providersv2::Providers::default()
.fs()
.fs_path(path),
}
.spawn_providers(&config, ready.provider_is_healthy.clone(), self.locality);

let _relay_stream = if !self.relay_servers.is_empty() {
tracing::info!("connecting to relay server");
Expand Down
77 changes: 14 additions & 63 deletions src/components/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl Ready {
pub struct Relay {
pub xds_port: u16,
pub mds_port: u16,
pub locality: Option<crate::net::endpoint::Locality>,
pub provider: Option<Providers>,
}

Expand All @@ -59,71 +60,21 @@ impl Relay {
mut shutdown_rx,
}: RunArgs<Ready>,
) -> crate::Result<()> {
let _provider_task = self.provider.map(|provider| {
let config = config.clone();
let provider_is_healthy = ready.provider_is_healthy.clone();
let _provider_task = match self.provider {
Some(Providers::Agones {
config_namespace, ..
}) => crate::config::providersv2::Providers::default()
.k8s()
.k8s_namespace(config_namespace.unwrap_or_default())
.spawn_providers(&config, ready.provider_is_healthy.clone(), self.locality),

match provider {
Providers::Agones {
config_namespace, ..
} => {
let config_namespace = config_namespace.unwrap_or_else(|| "default".into());
let fut = Providers::task(provider_is_healthy.clone(), move || {
let config = config.clone();
let config_namespace = config_namespace.clone();
let provider_is_healthy = provider_is_healthy.clone();
async move {
let client = tokio::time::timeout(
std::time::Duration::from_secs(5),
kube::Client::try_default(),
)
.await??;
Some(Providers::File { path }) => crate::config::providersv2::Providers::default()
.fs()
.fs_path(path)
.spawn_providers(&config, ready.provider_is_healthy.clone(), self.locality),

let configmap_reflector =
crate::config::providers::k8s::update_filters_from_configmap(
client.clone(),
&config_namespace,
config.clone(),
);

use tokio_stream::StreamExt;
tokio::pin!(configmap_reflector);

loop {
match configmap_reflector.next().await {
Some(Ok(_)) => {
provider_is_healthy.store(true, Ordering::SeqCst);
}
Some(Err(error)) => {
provider_is_healthy.store(false, Ordering::SeqCst);
return Err(error);
}
None => {
provider_is_healthy.store(false, Ordering::SeqCst);
break;
}
}
}

tracing::info!("configmap stream ending");
Ok(())
}
});

tokio::spawn(fut)
}
Providers::File { path } => {
tokio::spawn(Providers::task(provider_is_healthy.clone(), move || {
let config = config.clone();
let path = path.clone();
let provider_is_healthy = provider_is_healthy.clone();
async move {
crate::config::watch::fs(config, provider_is_healthy, path, None).await
}
}))
}
}
});
None => tokio::spawn(std::future::pending()),
};

crate::cli::Service::default()
.xds()
Expand Down

0 comments on commit 61cc0d2

Please sign in to comment.