diff --git a/server/src/cli.rs b/server/src/cli.rs
index 1220e2a7..024de87a 100644
--- a/server/src/cli.rs
+++ b/server/src/cli.rs
@@ -314,6 +314,12 @@ pub struct InternalBackstageArgs {
     /// Used to show tokens used to refresh feature caches, but also tokens already validated/invalidated against upstream
     #[clap(long, env, global = true)]
     pub disable_tokens_endpoint: bool,
+
+    /// Disables /internal-backstage/instancedata endpoint
+    ///
+    /// Used to show instance data for the edge instance.
+    #[clap(long, env, global = true)]
+    pub disable_instance_data_endpoint: bool,
 }
 
 #[derive(Args, Debug, Clone)]
diff --git a/server/src/client_api.rs b/server/src/client_api.rs
index 8b331110..899baf9f 100644
--- a/server/src/client_api.rs
+++ b/server/src/client_api.rs
@@ -7,6 +7,7 @@ use crate::filters::{
 use crate::http::broadcaster::Broadcaster;
 use crate::http::refresher::feature_refresher::FeatureRefresher;
 use crate::metrics::client_metrics::MetricsCache;
+use crate::metrics::edge_metrics::EdgeInstanceData;
 use crate::tokens::cache_key;
 use crate::types::{
     self, BatchMetricsRequestBody, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters,
@@ -15,6 +16,8 @@ use actix_web::web::{self, Data, Json, Query};
 use actix_web::Responder;
 use actix_web::{get, post, HttpRequest, HttpResponse};
 use dashmap::DashMap;
+use tokio::sync::RwLock;
+use tracing::{info, instrument};
 use unleash_types::client_features::{ClientFeature, ClientFeatures};
 use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia};
 
@@ -272,6 +275,26 @@ pub async fn post_bulk_metrics(
     );
     Ok(HttpResponse::Accepted().finish())
 }
+
+#[utoipa::path(context_path = "/api/client", responses((status = 202, description = "Accepted Instance data"), (status = 403, description = "Was not allowed to post instance data")), request_body = EdgeInstanceData, security(
+("Authorization" = [])
+)
+)]
+#[post("/metrics/edge")]
+#[instrument(skip(_edge_token, instance_data, connected_instances))]
+pub async fn post_edge_instance_data(
+    _edge_token: EdgeToken,
+    instance_data: Json<EdgeInstanceData>,
+    connected_instances: Data<RwLock<Vec<EdgeInstanceData>>>,
+) -> EdgeResult<HttpResponse> {
+    info!("Accepted {instance_data:?}");
+    connected_instances
+        .write()
+        .await
+        .push(instance_data.into_inner());
+    Ok(HttpResponse::Accepted().finish())
+}
+
 pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
     let client_scope = web::scope("/client")
         .wrap(crate::middleware::as_async_middleware::as_async_middleware(
@@ -282,7 +305,8 @@ pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
         .service(register)
         .service(metrics)
         .service(post_bulk_metrics)
-        .service(stream_features);
+        .service(stream_features)
+        .service(post_edge_instance_data);
 
     cfg.service(client_scope);
 }
@@ -1358,7 +1382,6 @@ mod tests {
         let features_cache = Arc::new(FeatureCache::default());
         let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
         let token_header = TokenHeader::from_str("NeedsToBeTested").unwrap();
-        println!("token_header: {:?}", token_header);
         let app = test::init_service(
             App::new()
                 .app_data(Data::from(features_cache.clone()))
diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs
new file mode 100644
index 00000000..5aba48f1
--- /dev/null
+++ b/server/src/http/feature_refresher.rs
@@ -0,0 +1,1593 @@
+use std::collections::HashSet;
+use std::{sync::Arc, time::Duration};
+
+use actix_web::http::header::EntityTag;
+use chrono::Utc;
+use dashmap::DashMap;
+use eventsource_client::Client;
+use futures::TryStreamExt;
+use reqwest::StatusCode;
+use tracing::{debug, info,
warn}; +use unleash_types::client_features::ClientFeatures; +use unleash_types::client_metrics::{ClientApplication, MetricsMetadata}; +use unleash_yggdrasil::EngineState; + +use crate::error::{EdgeError, FeatureError}; +use crate::feature_cache::FeatureCache; +use crate::filters::{filter_client_features, FeatureFilterSet}; +use crate::http::headers::{ + UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, +}; +use crate::types::{build, EdgeResult, TokenType, TokenValidationStatus}; +use crate::{ + persistence::EdgePersistence, + tokens::{cache_key, simplify}, + types::{ClientFeaturesRequest, ClientFeaturesResponse, EdgeToken, TokenRefresh}, +}; + +use super::unleash_client::{ClientMetaInformation, UnleashClient}; + +fn frontend_token_is_covered_by_tokens( + frontend_token: &EdgeToken, + tokens_to_refresh: Arc>, +) -> bool { + tokens_to_refresh.iter().any(|client_token| { + client_token + .token + .same_environment_and_broader_or_equal_project_access(frontend_token) + }) +} + +#[derive(Clone)] +pub struct FeatureRefresher { + pub unleash_client: Arc, + pub tokens_to_refresh: Arc>, + pub features_cache: Arc, + pub engine_cache: Arc>, + pub refresh_interval: chrono::Duration, + pub persistence: Option>, + pub strict: bool, + pub streaming: bool, + pub client_meta_information: ClientMetaInformation, +} + +impl Default for FeatureRefresher { + fn default() -> Self { + Self { + refresh_interval: chrono::Duration::seconds(10), + unleash_client: Default::default(), + tokens_to_refresh: Arc::new(DashMap::default()), + features_cache: Arc::new(Default::default()), + engine_cache: Default::default(), + persistence: None, + strict: true, + streaming: false, + client_meta_information: Default::default(), + } + } +} + +fn client_application_from_token_and_name( + token: EdgeToken, + refresh_interval: i64, + client_meta_information: ClientMetaInformation, +) -> ClientApplication { + ClientApplication { + app_name: client_meta_information.app_name, + connect_via: None, + environment: token.environment, + instance_id: Some(client_meta_information.instance_id), + interval: refresh_interval as u32, + started: Utc::now(), + strategies: vec![], + metadata: MetricsMetadata { + platform_name: None, + platform_version: None, + sdk_version: Some(format!("unleash-edge:{}", build::PKG_VERSION)), + yggdrasil_version: None, + }, + } +} + +#[derive(Eq, PartialEq)] +pub enum FeatureRefresherMode { + Dynamic, + Streaming, + Strict, +} + +pub struct FeatureRefreshConfig { + features_refresh_interval: chrono::Duration, + mode: FeatureRefresherMode, + client_meta_information: ClientMetaInformation, +} + +impl FeatureRefreshConfig { + pub fn new( + features_refresh_interval: chrono::Duration, + mode: FeatureRefresherMode, + client_meta_information: ClientMetaInformation, + ) -> Self { + Self { + features_refresh_interval, + mode, + client_meta_information, + } + } +} + +impl FeatureRefresher { + pub fn new( + unleash_client: Arc, + features_cache: Arc, + engines: Arc>, + persistence: Option>, + config: FeatureRefreshConfig, + ) -> Self { + FeatureRefresher { + unleash_client, + tokens_to_refresh: Arc::new(DashMap::default()), + features_cache, + engine_cache: engines, + refresh_interval: config.features_refresh_interval, + persistence, + strict: config.mode != FeatureRefresherMode::Dynamic, + streaming: config.mode == FeatureRefresherMode::Streaming, + client_meta_information: config.client_meta_information, + } + } + + pub fn with_client(client: Arc) -> Self { + Self { + unleash_client: client, + 
..Default::default() + } + } + + pub(crate) fn get_tokens_due_for_refresh(&self) -> Vec { + self.tokens_to_refresh + .iter() + .map(|e| e.value().clone()) + .filter(|token| { + token + .next_refresh + .map(|refresh| Utc::now() > refresh) + .unwrap_or(true) + }) + .collect() + } + + pub(crate) fn get_tokens_never_refreshed(&self) -> Vec { + self.tokens_to_refresh + .iter() + .map(|e| e.value().clone()) + .filter(|token| token.last_refreshed.is_none() && token.last_check.is_none()) + .collect() + } + + pub(crate) fn token_is_subsumed(&self, token: &EdgeToken) -> bool { + self.tokens_to_refresh + .iter() + .filter(|r| r.token.environment == token.environment) + .any(|t| t.token.subsumes(token)) + } + + pub(crate) fn frontend_token_is_covered_by_client_token( + &self, + frontend_token: &EdgeToken, + ) -> bool { + frontend_token_is_covered_by_tokens(frontend_token, self.tokens_to_refresh.clone()) + } + + /// This method no longer returns any data. Its responsibility lies in adding the token to our + /// list of tokens to perform refreshes for, as well as calling out to hydrate tokens that we haven't seen before. + /// Other tokens will be refreshed due to the scheduled task that refreshes tokens that haven been refreshed in ${refresh_interval} seconds + pub(crate) async fn register_and_hydrate_token(&self, token: &EdgeToken) { + self.register_token_for_refresh(token.clone(), None).await; + self.hydrate_new_tokens().await; + } + + pub(crate) async fn create_client_token_for_fe_token( + &self, + token: EdgeToken, + ) -> EdgeResult<()> { + if token.status == TokenValidationStatus::Validated + && token.token_type == Some(TokenType::Frontend) + { + if !self.frontend_token_is_covered_by_client_token(&token) { + warn!("The frontend token access is not covered by our current client tokens"); + Err(EdgeError::EdgeTokenError) + } else { + debug!("It is already covered by an existing client token. Doing nothing"); + Ok(()) + } + } else { + debug!("Token is not validated or is not a frontend token. Doing nothing"); + Ok(()) + } + } + + pub(crate) async fn features_for_filter( + &self, + token: EdgeToken, + filters: &FeatureFilterSet, + ) -> EdgeResult { + match self.get_features_by_filter(&token, filters) { + Some(features) if self.token_is_subsumed(&token) => Ok(features), + _ => { + if self.strict { + debug!("Strict behavior: Token is not subsumed by any registered tokens. Returning error"); + Err(EdgeError::InvalidTokenWithStrictBehavior) + } else { + debug!( + "Dynamic behavior: Had never seen this environment. Configuring fetcher" + ); + self.register_and_hydrate_token(&token).await; + self.get_features_by_filter(&token, filters).ok_or_else(|| { + EdgeError::ClientHydrationFailed( + "Failed to get features by filter after registering and hydrating token (This is very likely an error in Edge. 
Please report this!)" + .into(), + ) + }) + } + } + } + } + + fn get_features_by_filter( + &self, + token: &EdgeToken, + filters: &FeatureFilterSet, + ) -> Option { + self.features_cache + .get(&cache_key(token)) + .map(|client_features| filter_client_features(&client_features, filters)) + } + + /// + /// Registers a token for refresh, the token will be discarded if it can be subsumed by another previously registered token + pub async fn register_token_for_refresh(&self, token: EdgeToken, etag: Option) { + if !self.tokens_to_refresh.contains_key(&token.token) { + self.unleash_client + .register_as_client( + token.token.clone(), + client_application_from_token_and_name( + token.clone(), + self.refresh_interval.num_seconds(), + self.client_meta_information.clone(), + ), + ) + .await + .unwrap_or_default(); + let mut registered_tokens: Vec = + self.tokens_to_refresh.iter().map(|t| t.clone()).collect(); + registered_tokens.push(TokenRefresh::new(token.clone(), etag)); + let minimum = simplify(®istered_tokens); + let mut keys = HashSet::new(); + for refreshes in minimum { + keys.insert(refreshes.token.token.clone()); + self.tokens_to_refresh + .insert(refreshes.token.token.clone(), refreshes.clone()); + } + self.tokens_to_refresh.retain(|key, _| keys.contains(key)); + } + } + + /// This is where we set up a listener per token. + pub async fn start_streaming_features_background_task( + &self, + client_meta_information: ClientMetaInformation, + custom_headers: Vec<(String, String)>, + ) -> anyhow::Result<()> { + use anyhow::Context; + + let refreshes = self.get_tokens_due_for_refresh(); + for refresh in refreshes { + let token = refresh.token.clone(); + let streaming_url = self.unleash_client.urls.client_features_stream_url.as_str(); + + let mut es_client_builder = eventsource_client::ClientBuilder::for_url(streaming_url) + .context("Failed to create EventSource client for streaming")? + .header("Authorization", &token.token)? + .header(UNLEASH_APPNAME_HEADER, &client_meta_information.app_name)? + .header( + UNLEASH_INSTANCE_ID_HEADER, + &client_meta_information.instance_id, + )? + .header( + UNLEASH_CLIENT_SPEC_HEADER, + unleash_yggdrasil::SUPPORTED_SPEC_VERSION, + )?; + + for (key, value) in custom_headers.clone() { + es_client_builder = es_client_builder.header(&key, &value)?; + } + + let es_client = es_client_builder + .reconnect( + eventsource_client::ReconnectOptions::reconnect(true) + .retry_initial(true) + .delay(Duration::from_secs(5)) + .delay_max(Duration::from_secs(30)) + .backoff_factor(2) + .build(), + ) + .build(); + + let refresher = self.clone(); + + tokio::spawn(async move { + let mut stream = es_client + .stream() + .map_ok(move |sse| { + let token = token.clone(); + let refresher = refresher.clone(); + async move { + match sse { + // The first time we're connecting to Unleash. + eventsource_client::SSE::Event(event) + if event.event_type == "unleash-connected" => + { + debug!( + "Connected to unleash! Populating my flag cache now.", + ); + + match serde_json::from_str(&event.data) { + Ok(features) => { refresher.handle_client_features_updated(&token, features, None).await; } + Err(e) => { warn!("Could not parse features response to internal representation: {e:?}"); + } + } + } + // Unleash has updated features for us. + eventsource_client::SSE::Event(event) + if event.event_type == "unleash-updated" => + { + debug!( + "Got an unleash updated event. 
Updating cache.", + ); + + match serde_json::from_str(&event.data) { + Ok(features) => { refresher.handle_client_features_updated(&token, features, None).await; } + Err(e) => { warn!("Could not parse features response to internal representation: {e:?}"); + } + } + } + eventsource_client::SSE::Event(event) => { + info!( + "Got an SSE event that I wasn't expecting: {:#?}", + event + ); + } + eventsource_client::SSE::Connected(_) => { + debug!("SSE Connection established"); + } + eventsource_client::SSE::Comment(_) => { + // purposefully left blank. + }, + } + } + }) + .map_err(|e| warn!("Error in SSE stream: {:?}", e)); + + loop { + match stream.try_next().await { + Ok(Some(handler)) => handler.await, + Ok(None) => { + info!("SSE stream ended? Handler was None, anyway. Reconnecting."); + } + Err(e) => { + info!("SSE stream error: {e:?}. Reconnecting"); + } + } + } + }); + } + Ok(()) + } + + pub async fn start_refresh_features_background_task(&self) { + if self.streaming { + loop { + tokio::time::sleep(Duration::from_secs(3600)).await; + } + } else { + loop { + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(5)) => { + self.refresh_features().await; + } + } + } + } + } + + pub async fn hydrate_new_tokens(&self) { + let hydrations = self.get_tokens_never_refreshed(); + for hydration in hydrations { + self.refresh_single(hydration).await; + } + } + pub async fn refresh_features(&self) { + let refreshes = self.get_tokens_due_for_refresh(); + for refresh in refreshes { + self.refresh_single(refresh).await; + } + } + + async fn handle_client_features_updated( + &self, + refresh_token: &EdgeToken, + features: ClientFeatures, + etag: Option, + ) { + debug!("Got updated client features. Updating features with {etag:?}"); + let key = cache_key(refresh_token); + self.update_last_refresh(refresh_token, etag, features.features.len()); + self.features_cache + .modify(key.clone(), refresh_token, features.clone()); + self.engine_cache + .entry(key.clone()) + .and_modify(|engine| { + if let Some(f) = self.features_cache.get(&key) { + let mut new_state = EngineState::default(); + let warnings = new_state.take_state(f.clone()); + if let Some(warnings) = warnings { + warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); + }; + *engine = new_state; + + } + }) + .or_insert_with(|| { + let mut new_state = EngineState::default(); + + let warnings = new_state.take_state(features); + if let Some(warnings) = warnings { + warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); + }; + new_state + }); + } + + pub async fn refresh_single(&self, refresh: TokenRefresh) { + let features_result = self + .unleash_client + .get_client_features(ClientFeaturesRequest { + api_key: refresh.token.token.clone(), + etag: refresh.etag, + }) + .await; + + match features_result { + Ok(feature_response) => match feature_response { + ClientFeaturesResponse::NoUpdate(tag) => { + debug!("No update needed. 
Will update last check time with {tag}"); + self.update_last_check(&refresh.token.clone()); + } + ClientFeaturesResponse::Updated(features, etag) => { + self.handle_client_features_updated(&refresh.token, features, etag) + .await + } + }, + Err(e) => { + match e { + EdgeError::ClientFeaturesFetchError(fe) => { + match fe { + FeatureError::Retriable(status_code) => match status_code { + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => { + info!("Upstream is having some problems, increasing my waiting period"); + self.backoff(&refresh.token); + } + StatusCode::TOO_MANY_REQUESTS => { + info!("Got told that upstream is receiving too many requests"); + self.backoff(&refresh.token); + } + _ => { + info!("Couldn't refresh features, but will retry next go") + } + }, + FeatureError::AccessDenied => { + info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks"); + self.tokens_to_refresh.remove(&refresh.token.token); + if !self.tokens_to_refresh.iter().any(|e| { + e.value().token.environment == refresh.token.environment + }) { + let cache_key = cache_key(&refresh.token); + // No tokens left that access the environment of our current refresh. Deleting client features and engine cache + self.features_cache.remove(&cache_key); + self.engine_cache.remove(&cache_key); + } + } + FeatureError::NotFound => { + info!("Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again"); + self.backoff(&refresh.token); + } + } + } + EdgeError::ClientCacheError => { + info!("Couldn't refresh features, but will retry next go") + } + _ => info!("Couldn't refresh features: {e:?}. Will retry next pass"), + } + } + } + } + pub fn backoff(&self, token: &EdgeToken) { + self.tokens_to_refresh + .alter(&token.token, |_k, old_refresh| { + old_refresh.backoff(&self.refresh_interval) + }); + } + pub fn update_last_check(&self, token: &EdgeToken) { + self.tokens_to_refresh + .alter(&token.token, |_k, old_refresh| { + old_refresh.successful_check(&self.refresh_interval) + }); + } + + pub fn update_last_refresh( + &self, + token: &EdgeToken, + etag: Option, + feature_count: usize, + ) { + self.tokens_to_refresh + .alter(&token.token, |_k, old_refresh| { + old_refresh.successful_refresh(&self.refresh_interval, etag, feature_count) + }); + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + use std::sync::Arc; + + use actix_http::HttpService; + use actix_http_test::{test_server, TestServer}; + use actix_service::map_config; + use actix_web::dev::AppConfig; + use actix_web::http::header::EntityTag; + use actix_web::{web, App}; + use chrono::{Duration, Utc}; + use dashmap::DashMap; + use reqwest::Url; + use unleash_types::client_features::ClientFeature; + use unleash_yggdrasil::EngineState; + + use crate::feature_cache::{update_projects_from_feature_update, FeatureCache}; + use crate::filters::{project_filter, FeatureFilterSet}; + use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation}; + use crate::tests::features_from_disk; + use crate::tokens::cache_key; + use crate::types::TokenValidationStatus::Validated; + use crate::types::{TokenType, TokenValidationStatus}; + use crate::{ + http::unleash_client::UnleashClient, + types::{EdgeToken, TokenRefresh}, + }; + + use super::{frontend_token_is_covered_by_tokens, FeatureRefresher}; + + impl PartialEq for TokenRefresh { + fn eq(&self, other: &Self) -> bool { + self.token == other.token + && self.etag 
== other.etag + && self.last_refreshed == other.last_refreshed + && self.last_check == other.last_check + } + } + + fn create_test_client() -> UnleashClient { + let http_client = new_reqwest_client( + false, + None, + None, + Duration::seconds(5), + Duration::seconds(5), + ClientMetaInformation::test_config(), + ) + .expect("Failed to create client"); + + UnleashClient::from_url( + Url::parse("http://localhost:4242").unwrap(), + "Authorization".to_string(), + http_client, + ) + } + + #[tokio::test] + pub async fn registering_token_for_refresh_works() { + let unleash_client = create_test_client(); + let features_cache = Arc::new(FeatureCache::default()); + let engine_cache = Arc::new(DashMap::default()); + + let duration = Duration::seconds(5); + let feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache, + engine_cache, + refresh_interval: duration, + ..Default::default() + }; + let token = + EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap(); + feature_refresher + .register_token_for_refresh(token, None) + .await; + + assert_eq!(feature_refresher.tokens_to_refresh.len(), 1); + } + + #[tokio::test] + pub async fn registering_multiple_tokens_with_same_environment_reduces_tokens_to_valid_minimal_set( + ) { + let unleash_client = create_test_client(); + let features_cache = Arc::new(FeatureCache::default()); + let engine_cache = Arc::new(DashMap::default()); + + let duration = Duration::seconds(5); + let feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache, + engine_cache, + refresh_interval: duration, + ..Default::default() + }; + let token1 = + EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap(); + let token2 = + EdgeToken::try_from("*:development.zyxwvutsrqponmlkjihgfedcba".to_string()).unwrap(); + feature_refresher + .register_token_for_refresh(token1, None) + .await; + feature_refresher + .register_token_for_refresh(token2, None) + .await; + + assert_eq!(feature_refresher.tokens_to_refresh.len(), 1); + } + + #[tokio::test] + pub async fn registering_multiple_non_overlapping_tokens_will_keep_all() { + let unleash_client = create_test_client(); + let features_cache = Arc::new(FeatureCache::default()); + let engine_cache = Arc::new(DashMap::default()); + let duration = Duration::seconds(5); + let feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache, + engine_cache, + refresh_interval: duration, + ..Default::default() + }; + let project_a_token = + EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + let project_b_token = + EdgeToken::try_from("projectb:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + let project_c_token = + EdgeToken::try_from("projectc:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + feature_refresher + .register_token_for_refresh(project_a_token, None) + .await; + feature_refresher + .register_token_for_refresh(project_b_token, None) + .await; + feature_refresher + .register_token_for_refresh(project_c_token, None) + .await; + + assert_eq!(feature_refresher.tokens_to_refresh.len(), 3); + } + + #[tokio::test] + pub async fn registering_wildcard_project_token_only_keeps_the_wildcard() { + let unleash_client = create_test_client(); + let features_cache = Arc::new(FeatureCache::default()); + let engine_cache = Arc::new(DashMap::default()); + let duration = Duration::seconds(5); + let 
feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache, + engine_cache, + refresh_interval: duration, + ..Default::default() + }; + let project_a_token = + EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + let project_b_token = + EdgeToken::try_from("projectb:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + let project_c_token = + EdgeToken::try_from("projectc:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + let wildcard_token = + EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap(); + + feature_refresher + .register_token_for_refresh(project_a_token, None) + .await; + feature_refresher + .register_token_for_refresh(project_b_token, None) + .await; + feature_refresher + .register_token_for_refresh(project_c_token, None) + .await; + feature_refresher + .register_token_for_refresh(wildcard_token, None) + .await; + + assert_eq!(feature_refresher.tokens_to_refresh.len(), 1); + assert!(feature_refresher + .tokens_to_refresh + .contains_key("*:development.abcdefghijklmnopqrstuvwxyz")) + } + + #[tokio::test] + pub async fn registering_tokens_with_multiple_projects_overwrites_single_tokens() { + let unleash_client = create_test_client(); + let features_cache = Arc::new(FeatureCache::default()); + let engine_cache = Arc::new(DashMap::default()); + let duration = Duration::seconds(5); + let feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache, + engine_cache, + refresh_interval: duration, + ..Default::default() + }; + let project_a_token = + EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + let project_b_token = + EdgeToken::try_from("projectb:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + let project_c_token = + EdgeToken::try_from("projectc:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + let mut project_a_and_c_token = + EdgeToken::try_from("[]:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap(); + project_a_and_c_token.projects = vec!["projecta".into(), "projectc".into()]; + + feature_refresher + .register_token_for_refresh(project_a_token, None) + .await; + feature_refresher + .register_token_for_refresh(project_b_token, None) + .await; + feature_refresher + .register_token_for_refresh(project_c_token, None) + .await; + feature_refresher + .register_token_for_refresh(project_a_and_c_token, None) + .await; + + assert_eq!(feature_refresher.tokens_to_refresh.len(), 2); + assert!(feature_refresher + .tokens_to_refresh + .contains_key("[]:development.abcdefghijklmnopqrstuvwxyz")); + assert!(feature_refresher + .tokens_to_refresh + .contains_key("projectb:development.abcdefghijklmnopqrstuvwxyz")); + } + + #[tokio::test] + pub async fn registering_a_token_that_is_already_subsumed_does_nothing() { + let unleash_client = create_test_client(); + let features_cache = Arc::new(FeatureCache::default()); + let engine_cache = Arc::new(DashMap::default()); + + let duration = Duration::seconds(5); + let feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache, + engine_cache, + refresh_interval: duration, + ..Default::default() + }; + let star_token = + EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap(); + let project_a_token = + EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + + 
feature_refresher + .register_token_for_refresh(star_token, None) + .await; + feature_refresher + .register_token_for_refresh(project_a_token, None) + .await; + + assert_eq!(feature_refresher.tokens_to_refresh.len(), 1); + assert!(feature_refresher + .tokens_to_refresh + .contains_key("*:development.abcdefghijklmnopqrstuvwxyz")); + } + + #[tokio::test] + pub async fn simplification_only_happens_in_same_environment() { + let unleash_client = create_test_client(); + let features_cache = Arc::new(FeatureCache::default()); + let engine_cache = Arc::new(DashMap::default()); + + let duration = Duration::seconds(5); + let feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache, + engine_cache, + refresh_interval: duration, + ..Default::default() + }; + let project_a_token = + EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string()) + .unwrap(); + let production_wildcard_token = + EdgeToken::try_from("*:production.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap(); + feature_refresher + .register_token_for_refresh(project_a_token, None) + .await; + feature_refresher + .register_token_for_refresh(production_wildcard_token, None) + .await; + assert_eq!(feature_refresher.tokens_to_refresh.len(), 2); + } + + #[tokio::test] + pub async fn is_able_to_only_fetch_for_tokens_due_to_refresh() { + let unleash_client = create_test_client(); + let features_cache = Arc::new(FeatureCache::default()); + let engine_cache = Arc::new(DashMap::default()); + + let duration = Duration::seconds(5); + let feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache, + engine_cache, + refresh_interval: duration, + ..Default::default() + }; + let no_etag_due_for_refresh_token = + EdgeToken::try_from("projecta:development.no_etag_due_for_refresh_token".to_string()) + .unwrap(); + let no_etag_so_is_due_for_refresh = TokenRefresh { + token: no_etag_due_for_refresh_token, + etag: None, + next_refresh: None, + last_refreshed: None, + last_check: None, + failure_count: 0, + last_feature_count: None, + }; + let etag_and_last_refreshed_token = + EdgeToken::try_from("projectb:development.etag_and_last_refreshed_token".to_string()) + .unwrap(); + let etag_and_last_refreshed_less_than_duration_ago = TokenRefresh { + token: etag_and_last_refreshed_token, + etag: Some(EntityTag::new_weak("abcde".into())), + next_refresh: Some(Utc::now() + Duration::seconds(10)), + last_refreshed: Some(Utc::now()), + last_check: Some(Utc::now()), + failure_count: 0, + last_feature_count: None, + }; + let etag_but_old_token = + EdgeToken::try_from("projectb:development.etag_but_old_token".to_string()).unwrap(); + + let ten_seconds_ago = Utc::now() - Duration::seconds(10); + let etag_but_last_refreshed_ten_seconds_ago = TokenRefresh { + token: etag_but_old_token, + etag: Some(EntityTag::new_weak("abcde".into())), + next_refresh: None, + last_refreshed: Some(ten_seconds_ago), + last_check: Some(ten_seconds_ago), + failure_count: 0, + last_feature_count: None, + }; + feature_refresher.tokens_to_refresh.insert( + etag_but_last_refreshed_ten_seconds_ago.token.token.clone(), + etag_but_last_refreshed_ten_seconds_ago.clone(), + ); + feature_refresher.tokens_to_refresh.insert( + etag_and_last_refreshed_less_than_duration_ago + .token + .token + .clone(), + etag_and_last_refreshed_less_than_duration_ago, + ); + feature_refresher.tokens_to_refresh.insert( + no_etag_so_is_due_for_refresh.token.token.clone(), + no_etag_so_is_due_for_refresh.clone(), + ); + let 
tokens_to_refresh = feature_refresher.get_tokens_due_for_refresh(); + assert_eq!(tokens_to_refresh.len(), 2); + assert!(tokens_to_refresh.contains(&etag_but_last_refreshed_ten_seconds_ago)); + assert!(tokens_to_refresh.contains(&no_etag_so_is_due_for_refresh)); + } + + async fn client_api_test_server( + upstream_token_cache: Arc>, + upstream_features_cache: Arc, + upstream_engine_cache: Arc>, + ) -> TestServer { + test_server(move || { + HttpService::new(map_config( + App::new() + .app_data(web::Data::from(upstream_features_cache.clone())) + .app_data(web::Data::from(upstream_engine_cache.clone())) + .app_data(web::Data::from(upstream_token_cache.clone())) + .service(web::scope("/api").configure(crate::client_api::configure_client_api)), + |_| AppConfig::default(), + )) + .tcp() + }) + .await + } + #[tokio::test] + pub async fn getting_403_when_refreshing_features_will_remove_token() { + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); + let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let server = client_api_test_server( + upstream_token_cache, + upstream_features_cache, + upstream_engine_cache, + ) + .await; + let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); + let features_cache: Arc = Arc::new(FeatureCache::default()); + let engine_cache: Arc> = Arc::new(DashMap::default()); + let feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache, + engine_cache, + refresh_interval: Duration::seconds(60), + ..Default::default() + }; + let mut token = EdgeToken::try_from("*:development.secret123".to_string()).unwrap(); + token.status = Validated; + token.token_type = Some(TokenType::Client); + feature_refresher + .register_token_for_refresh(token, None) + .await; + assert!(!feature_refresher.tokens_to_refresh.is_empty()); + feature_refresher.refresh_features().await; + assert!(feature_refresher.tokens_to_refresh.is_empty()); + assert!(feature_refresher.features_cache.is_empty()); + assert!(feature_refresher.engine_cache.is_empty()); + } + + #[tokio::test] + pub async fn getting_404_removes_tokens_from_token_to_refresh_but_not_its_features() { + let mut token = EdgeToken::try_from("*:development.secret123".to_string()).unwrap(); + token.status = Validated; + token.token_type = Some(TokenType::Client); + let token_cache = DashMap::default(); + token_cache.insert(token.token.clone(), token.clone()); + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); + let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_token_cache: Arc> = Arc::new(token_cache); + let example_features = features_from_disk("../examples/features.json"); + let cache_key = cache_key(&token); + let mut engine_state = EngineState::default(); + let warnings = engine_state.take_state(example_features.clone()); + upstream_features_cache.insert(cache_key.clone(), example_features.clone()); + upstream_engine_cache.insert(cache_key.clone(), engine_state); + let mut server = client_api_test_server( + upstream_token_cache, + upstream_features_cache, + upstream_engine_cache, + ) + .await; + let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); + let features_cache: Arc = Arc::new(FeatureCache::default()); + let engine_cache: Arc> = Arc::new(DashMap::default()); + let feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache, + engine_cache, + 
refresh_interval: Duration::milliseconds(1), + ..Default::default() + }; + feature_refresher + .register_token_for_refresh(token, None) + .await; + assert!(!feature_refresher.tokens_to_refresh.is_empty()); + feature_refresher.refresh_features().await; + assert!(!feature_refresher.tokens_to_refresh.is_empty()); + assert!(!feature_refresher.features_cache.is_empty()); + assert!(!feature_refresher.engine_cache.is_empty()); + server.stop().await; + tokio::time::sleep(std::time::Duration::from_millis(5)).await; // To ensure our refresh is due + feature_refresher.refresh_features().await; + assert_eq!( + feature_refresher + .tokens_to_refresh + .get("*:development.secret123") + .unwrap() + .failure_count, + 1 + ); + assert!(!feature_refresher.features_cache.is_empty()); + assert!(!feature_refresher.engine_cache.is_empty()); + assert!(warnings.is_none()); + } + + #[tokio::test] + pub async fn when_we_have_a_cache_and_token_gets_removed_caches_are_emptied() { + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); + let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let token_cache_to_modify = upstream_token_cache.clone(); + let mut valid_token = EdgeToken::try_from("*:development.secret123".to_string()).unwrap(); + valid_token.token_type = Some(TokenType::Client); + valid_token.status = Validated; + upstream_token_cache.insert(valid_token.token.clone(), valid_token.clone()); + let example_features = features_from_disk("../examples/features.json"); + let cache_key = cache_key(&valid_token); + let mut engine_state = EngineState::default(); + let warnings = engine_state.take_state(example_features.clone()); + upstream_features_cache.insert(cache_key.clone(), example_features.clone()); + upstream_engine_cache.insert(cache_key.clone(), engine_state); + let server = client_api_test_server( + upstream_token_cache, + upstream_features_cache, + upstream_engine_cache, + ) + .await; + let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); + let mut feature_refresher = FeatureRefresher::with_client(Arc::new(unleash_client)); + feature_refresher.refresh_interval = Duration::seconds(0); + feature_refresher + .register_token_for_refresh(valid_token.clone(), None) + .await; + assert!(!feature_refresher.tokens_to_refresh.is_empty()); + feature_refresher.refresh_features().await; + assert!(!feature_refresher.features_cache.is_empty()); + assert!(!feature_refresher.engine_cache.is_empty()); + token_cache_to_modify.remove(&valid_token.token); + feature_refresher.refresh_features().await; + assert!(feature_refresher.tokens_to_refresh.is_empty()); + assert!(feature_refresher.features_cache.is_empty()); + assert!(feature_refresher.engine_cache.is_empty()); + assert!(warnings.is_none()); + } + + #[tokio::test] + pub async fn removing_one_of_multiple_keys_from_same_environment_does_not_remove_feature_and_engine_caches( + ) { + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); + let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let token_cache_to_modify = upstream_token_cache.clone(); + let mut dx_token = EdgeToken::try_from("dx:development.secret123".to_string()).unwrap(); + dx_token.token_type = Some(TokenType::Client); + dx_token.status = Validated; + upstream_token_cache.insert(dx_token.token.clone(), dx_token.clone()); + let mut eg_token = 
EdgeToken::try_from("eg:development.secret123".to_string()).unwrap(); + eg_token.token_type = Some(TokenType::Client); + eg_token.status = Validated; + upstream_token_cache.insert(eg_token.token.clone(), eg_token.clone()); + let example_features = features_from_disk("../examples/hostedexample.json"); + let cache_key = cache_key(&dx_token); + let mut engine_state = EngineState::default(); + let warnings = engine_state.take_state(example_features.clone()); + upstream_features_cache.insert(cache_key.clone(), example_features.clone()); + upstream_engine_cache.insert(cache_key.clone(), engine_state); + let server = client_api_test_server( + upstream_token_cache, + upstream_features_cache, + upstream_engine_cache, + ) + .await; + let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); + let mut feature_refresher = FeatureRefresher::with_client(Arc::new(unleash_client)); + feature_refresher.refresh_interval = Duration::seconds(0); + feature_refresher + .register_token_for_refresh(dx_token.clone(), None) + .await; + feature_refresher + .register_token_for_refresh(eg_token.clone(), None) + .await; + assert_eq!(feature_refresher.tokens_to_refresh.len(), 2); + assert_eq!(feature_refresher.features_cache.len(), 0); + assert_eq!(feature_refresher.engine_cache.len(), 0); + feature_refresher.refresh_features().await; + assert_eq!(feature_refresher.features_cache.len(), 1); + assert_eq!(feature_refresher.engine_cache.len(), 1); + token_cache_to_modify.remove(&dx_token.token); + feature_refresher.refresh_features().await; + assert_eq!(feature_refresher.tokens_to_refresh.len(), 1); + assert_eq!(feature_refresher.features_cache.len(), 1); + assert_eq!(feature_refresher.engine_cache.len(), 1); + assert!(warnings.is_none()); + } + + #[tokio::test] + pub async fn fetching_two_projects_from_same_environment_should_get_features_for_both_when_dynamic( + ) { + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); + let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let mut dx_token = EdgeToken::try_from("dx:development.secret123".to_string()).unwrap(); + dx_token.token_type = Some(TokenType::Client); + dx_token.status = Validated; + upstream_token_cache.insert(dx_token.token.clone(), dx_token.clone()); + let mut eg_token = EdgeToken::try_from("eg:development.secret123".to_string()).unwrap(); + eg_token.token_type = Some(TokenType::Client); + eg_token.status = Validated; + upstream_token_cache.insert(eg_token.token.clone(), eg_token.clone()); + let example_features = features_from_disk("../examples/hostedexample.json"); + let cache_key = cache_key(&dx_token); + upstream_features_cache.insert(cache_key.clone(), example_features.clone()); + let mut engine_state = EngineState::default(); + let warnings = engine_state.take_state(example_features.clone()); + upstream_engine_cache.insert(cache_key, engine_state); + let server = client_api_test_server( + upstream_token_cache, + upstream_features_cache, + upstream_engine_cache, + ) + .await; + let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); + let mut feature_refresher = FeatureRefresher::with_client(Arc::new(unleash_client)); + feature_refresher.strict = false; + feature_refresher.refresh_interval = Duration::seconds(0); + let dx_features = feature_refresher + .features_for_filter( + dx_token.clone(), + &FeatureFilterSet::from(project_filter(&dx_token)), + ) + .await + .expect("No dx features"); + assert!(dx_features + 
.features + .iter() + .all(|f| f.project == Some("dx".into()))); + assert_eq!(dx_features.features.len(), 16); + let eg_features = feature_refresher + .features_for_filter( + eg_token.clone(), + &FeatureFilterSet::from(project_filter(&eg_token)), + ) + .await + .expect("Could not get eg features"); + assert_eq!(eg_features.features.len(), 7); + assert!(eg_features + .features + .iter() + .all(|f| f.project == Some("eg".into()))); + assert!(warnings.is_none()); + } + + #[tokio::test] + pub async fn should_get_data_for_multi_project_token_even_if_we_have_data_for_one_of_the_projects_when_dynamic( + ) { + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); + let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let mut dx_token = EdgeToken::from_str("dx:development.secret123").unwrap(); + dx_token.token_type = Some(TokenType::Client); + dx_token.status = Validated; + upstream_token_cache.insert(dx_token.token.clone(), dx_token.clone()); + let mut multitoken = EdgeToken::from_str("[]:development.secret321").unwrap(); + multitoken.token_type = Some(TokenType::Client); + multitoken.status = Validated; + multitoken.projects = vec!["dx".into(), "eg".into()]; + upstream_token_cache.insert(multitoken.token.clone(), multitoken.clone()); + let mut eg_token = EdgeToken::from_str("eg:development.devsecret").unwrap(); + eg_token.token_type = Some(TokenType::Client); + eg_token.status = Validated; + upstream_token_cache.insert(eg_token.token.clone(), eg_token.clone()); + let example_features = features_from_disk("../examples/hostedexample.json"); + let cache_key = cache_key(&dx_token); + upstream_features_cache.insert(cache_key.clone(), example_features.clone()); + let mut engine_state = EngineState::default(); + let warnings = engine_state.take_state(example_features.clone()); + upstream_engine_cache.insert(cache_key, engine_state); + let server = client_api_test_server( + upstream_token_cache, + upstream_features_cache, + upstream_engine_cache, + ) + .await; + let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); + let mut feature_refresher = FeatureRefresher::with_client(Arc::new(unleash_client)); + feature_refresher.strict = false; + feature_refresher.refresh_interval = Duration::seconds(0); + let dx_features = feature_refresher + .features_for_filter( + dx_token.clone(), + &FeatureFilterSet::from(project_filter(&dx_token)), + ) + .await + .expect("No dx features found"); + assert_eq!(dx_features.features.len(), 16); + let unleash_cloud_features = feature_refresher + .features_for_filter( + multitoken.clone(), + &FeatureFilterSet::from(project_filter(&multitoken)), + ) + .await + .expect("No multi features"); + assert_eq!( + unleash_cloud_features + .features + .iter() + .filter(|f| f.project == Some("dx".into())) + .count(), + 16 + ); + assert_eq!( + unleash_cloud_features + .features + .iter() + .filter(|f| f.project == Some("eg".into())) + .count(), + 7 + ); + let eg_features = feature_refresher + .features_for_filter( + eg_token.clone(), + &FeatureFilterSet::from(project_filter(&eg_token)), + ) + .await + .expect("No eg_token features"); + assert_eq!( + eg_features + .features + .iter() + .filter(|f| f.project == Some("eg".into())) + .count(), + 7 + ); + assert!(warnings.is_none()); + } + + #[test] + fn front_end_token_is_properly_covered_by_current_tokens() { + let fe_token = EdgeToken { + projects: vec!["a".into(), "b".into()], + environment: Some("development".into()), + 
..Default::default() + }; + + let wildcard_token = EdgeToken { + projects: vec!["*".into()], + environment: Some("development".into()), + ..Default::default() + }; + + let current_tokens = DashMap::new(); + let token_refresh = TokenRefresh { + token: wildcard_token.clone(), + etag: None, + next_refresh: None, + last_refreshed: None, + last_check: None, + failure_count: 0, + last_feature_count: None, + }; + + current_tokens.insert(wildcard_token.token, token_refresh); + + let current_tokens_arc = Arc::new(current_tokens); + assert!(frontend_token_is_covered_by_tokens( + &fe_token, + current_tokens_arc + )); + } + + #[tokio::test] + async fn refetching_data_when_feature_is_archived_should_remove_archived_feature() { + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); + let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let mut eg_token = EdgeToken::from_str("eg:development.devsecret").unwrap(); + eg_token.token_type = Some(TokenType::Client); + eg_token.status = Validated; + upstream_token_cache.insert(eg_token.token.clone(), eg_token.clone()); + let example_features = features_from_disk("../examples/hostedexample.json"); + let cache_key = cache_key(&eg_token); + upstream_features_cache.insert(cache_key.clone(), example_features.clone()); + let mut engine_state = EngineState::default(); + let warnings = engine_state.take_state(example_features.clone()); + upstream_engine_cache.insert(cache_key.clone(), engine_state); + let server = client_api_test_server( + upstream_token_cache, + upstream_features_cache.clone(), + upstream_engine_cache, + ) + .await; + let features_cache: Arc = Arc::new(FeatureCache::default()); + let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); + let feature_refresher = FeatureRefresher { + unleash_client: Arc::new(unleash_client), + features_cache: features_cache.clone(), + refresh_interval: Duration::seconds(0), + ..Default::default() + }; + + let _ = feature_refresher + .register_and_hydrate_token(&eg_token) + .await; + + // Now, let's say that all features are archived in upstream + let empty_features = features_from_disk("../examples/empty-features.json"); + upstream_features_cache.insert(cache_key.clone(), empty_features); + + feature_refresher.refresh_features().await; + // Since our response was empty, our theory is that there should be no features here now. 
+ assert!(!features_cache + .get(&cache_key) + .unwrap() + .features + .iter() + .any(|f| f.project == Some("eg".into()))); + assert!(warnings.is_none()); + } + + #[test] + pub fn an_update_with_one_feature_removed_from_one_project_removes_the_feature_from_the_feature_list( + ) { + let features = features_from_disk("../examples/hostedexample.json").features; + let mut dx_data: Vec = features_from_disk("../examples/hostedexample.json") + .features + .iter() + .filter(|f| f.project == Some("dx".into())) + .cloned() + .collect(); + dx_data.remove(0); + let mut token = EdgeToken::from_str("[]:development.somesecret").unwrap(); + token.status = TokenValidationStatus::Validated; + token.projects = vec![String::from("dx")]; + + let updated = update_projects_from_feature_update(&token, &features, &dx_data); + assert_ne!( + features + .iter() + .filter(|p| p.project == Some("dx".into())) + .count(), + updated + .iter() + .filter(|p| p.project == Some("dx".into())) + .count() + ); + assert_eq!( + features + .iter() + .filter(|p| p.project == Some("eg".into())) + .count(), + updated + .iter() + .filter(|p| p.project == Some("eg".into())) + .count() + ); + } + + #[test] + pub fn project_state_from_update_should_overwrite_project_state_in_known_state() { + let features = features_from_disk("../examples/hostedexample.json").features; + let mut dx_data: Vec = features + .iter() + .filter(|f| f.project == Some("dx".into())) + .cloned() + .collect(); + dx_data.remove(0); + let mut eg_data: Vec = features + .iter() + .filter(|f| f.project == Some("eg".into())) + .cloned() + .collect(); + eg_data.remove(0); + dx_data.extend(eg_data); + let edge_token = EdgeToken { + token: "".to_string(), + token_type: Some(TokenType::Client), + environment: None, + projects: vec![String::from("dx"), String::from("eg")], + status: TokenValidationStatus::Validated, + }; + let update = update_projects_from_feature_update(&edge_token, &features, &dx_data); + assert_eq!(features.len() - update.len(), 2); // We've removed two elements + } + + #[test] + pub fn if_project_is_removed_but_token_has_access_to_project_update_should_remove_cached_project( + ) { + let features = features_from_disk("../examples/hostedexample.json").features; + let edge_token = EdgeToken { + token: "".to_string(), + token_type: Some(TokenType::Client), + environment: None, + projects: vec![String::from("dx"), String::from("eg")], + status: TokenValidationStatus::Validated, + }; + let eg_data: Vec = features + .iter() + .filter(|f| f.project == Some("eg".into())) + .cloned() + .collect(); + let update = update_projects_from_feature_update(&edge_token, &features, &eg_data); + assert!(!update.iter().any(|p| p.project == Some(String::from("dx")))); + } + #[test] + pub fn if_token_does_not_have_access_to_project_no_update_happens_to_project() { + let features = features_from_disk("../examples/hostedexample.json").features; + let edge_token = EdgeToken { + token: "".to_string(), + token_type: Some(TokenType::Client), + environment: None, + projects: vec![String::from("dx"), String::from("eg")], + status: TokenValidationStatus::Validated, + }; + let eg_data: Vec = features + .iter() + .filter(|f| f.project == Some("eg".into())) + .cloned() + .collect(); + let update = update_projects_from_feature_update(&edge_token, &features, &eg_data); + assert_eq!( + update + .iter() + .filter(|p| p.project == Some(String::from("unleash-cloud"))) + .count(), + 1 + ); + } + + #[test] + pub fn if_token_is_wildcard_our_entire_cache_is_replaced_by_update() { + let features = vec![ 
+ ClientFeature { + name: "my.first.toggle.in.default".to_string(), + feature_type: Some("release".into()), + description: None, + created_at: None, + last_seen_at: None, + enabled: true, + stale: None, + impression_data: None, + project: Some("default".into()), + strategies: None, + variants: None, + dependencies: None, + }, + ClientFeature { + name: "my.second.toggle.in.testproject".to_string(), + feature_type: Some("release".into()), + description: None, + created_at: None, + last_seen_at: None, + enabled: false, + stale: None, + impression_data: None, + project: Some("testproject".into()), + strategies: None, + variants: None, + dependencies: None, + }, + ]; + let edge_token = EdgeToken { + token: "".to_string(), + token_type: Some(TokenType::Client), + environment: None, + projects: vec![String::from("*")], + status: TokenValidationStatus::Validated, + }; + let update: Vec = features + .clone() + .iter() + .filter(|t| t.project == Some("default".into())) + .cloned() + .collect(); + let updated = update_projects_from_feature_update(&edge_token, &features, &update); + assert_eq!(updated.len(), 1); + assert!(updated.iter().all(|f| f.project == Some("default".into()))) + } + + #[test] + pub fn token_with_access_to_different_project_than_exists_in_cache_should_never_delete_features_from_other_projects( + ) { + // Added after customer issue in May '24 when tokens unrelated to projects in cache with no actual features connected to them removed existing features in cache for unrelated projects + let features = vec![ + ClientFeature { + name: "my.first.toggle.in.default".to_string(), + feature_type: Some("release".into()), + description: None, + created_at: None, + last_seen_at: None, + enabled: true, + stale: None, + impression_data: None, + project: Some("testproject".into()), + strategies: None, + variants: None, + dependencies: None, + }, + ClientFeature { + name: "my.second.toggle.in.testproject".to_string(), + feature_type: Some("release".into()), + description: None, + created_at: None, + last_seen_at: None, + enabled: false, + stale: None, + impression_data: None, + project: Some("testproject".into()), + strategies: None, + variants: None, + dependencies: None, + }, + ]; + let empty_features = vec![]; + let unrelated_token_to_existing_features = EdgeToken { + token: "someotherproject:dev.myextralongsecretstringwithfeatures".to_string(), + token_type: Some(TokenType::Client), + environment: Some("dev".into()), + projects: vec![String::from("someother")], + status: TokenValidationStatus::Validated, + }; + let updated = update_projects_from_feature_update( + &unrelated_token_to_existing_features, + &features, + &empty_features, + ); + assert_eq!(updated.len(), 2); + } + #[test] + pub fn token_with_access_to_both_a_different_project_than_exists_in_cache_and_the_cached_project_should_delete_features_from_both_projects( + ) { + // Added after customer issue in May '24 when tokens unrelated to projects in cache with no actual features connected to them removed existing features in cache for unrelated projects + let features = vec![ + ClientFeature { + name: "my.first.toggle.in.default".to_string(), + feature_type: Some("release".into()), + description: None, + created_at: None, + last_seen_at: None, + enabled: true, + stale: None, + impression_data: None, + project: Some("testproject".into()), + strategies: None, + variants: None, + dependencies: None, + }, + ClientFeature { + name: "my.second.toggle.in.testproject".to_string(), + feature_type: Some("release".into()), + description: None, + 
created_at: None, + last_seen_at: None, + enabled: false, + stale: None, + impression_data: None, + project: Some("testproject".into()), + strategies: None, + variants: None, + dependencies: None, + }, + ]; + let empty_features = vec![]; + let token_with_access_to_both_empty_and_full_project = EdgeToken { + token: "[]:dev.myextralongsecretstringwithfeatures".to_string(), + token_type: Some(TokenType::Client), + environment: Some("dev".into()), + projects: vec![String::from("testproject"), String::from("someother")], + status: TokenValidationStatus::Validated, + }; + let updated = update_projects_from_feature_update( + &token_with_access_to_both_empty_and_full_project, + &features, + &empty_features, + ); + assert_eq!(updated.len(), 0); + } +} diff --git a/server/src/http/instance_data.rs b/server/src/http/instance_data.rs new file mode 100644 index 00000000..e36b426a --- /dev/null +++ b/server/src/http/instance_data.rs @@ -0,0 +1,109 @@ +use reqwest::{StatusCode, Url}; +use std::sync::Arc; +use chrono::Duration; +use tokio::sync::RwLock; + +use prometheus::Registry; +use tracing::{debug, trace, warn}; +use crate::cli::{CliArgs, EdgeMode}; +use crate::error::EdgeError; +use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation, UnleashClient}; +use crate::metrics::edge_metrics::EdgeInstanceData; + + +#[derive(Debug, Clone)] +pub struct InstanceDataSender { + pub unleash_client: Arc, + pub token: String, +} + +impl InstanceDataSender { + pub fn from_args(args: CliArgs, instance_data: Arc) -> Result, EdgeError> { + match args.mode { + EdgeMode::Edge(edge_args) => { + let instance_id = instance_data.identifier.clone(); + Ok(edge_args.tokens.first().map(|token| { + let client_meta_information = ClientMetaInformation { + app_name: args.app_name, + instance_id + }; + let http_client = new_reqwest_client( + edge_args.skip_ssl_verification, + edge_args.client_identity.clone(), + edge_args.upstream_certificate_file.clone(), + Duration::seconds(edge_args.upstream_request_timeout), + Duration::seconds(edge_args.upstream_socket_timeout), + client_meta_information.clone(), + ).expect("Could not construct reqwest client for posting observability data"); + let unleash_client = Url::parse(&edge_args.upstream_url.clone()) + .map(|url| { + UnleashClient::from_url(url, args.token_header.token_header.clone(), http_client) + }) + .map(|c| c.with_custom_client_headers(edge_args.custom_client_headers.clone())) + .map(Arc::new) + .map_err(|_| EdgeError::InvalidServerUrl(edge_args.upstream_url.clone())).expect("Could not construct UnleashClient"); + Self { + unleash_client, + token: token.clone(), + } + })) + }, + _ => Ok(None) + } + } +} + +pub async fn send_instance_data( + instance_data_sender: Option, + prometheus_registry: Registry, + our_instance_data: Arc, + downstream_instance_data: Arc>>, +) { + let mut do_the_work = true; + loop { + let mut empty = true; + tokio::time::sleep(std::time::Duration::from_secs(15)).await; + if let Some(instance_data_sender) = instance_data_sender.clone() { + trace!("Looping instance data sending"); + let observed_data = our_instance_data.observe( + &prometheus_registry, + downstream_instance_data.read().await.clone(), + ); + if do_the_work { + let status = instance_data_sender.unleash_client + .send_instance_data( + observed_data, + &instance_data_sender.token, + ) + .await; + match status { + Ok(_) => {} + Err(e) => match e { + EdgeError::EdgeMetricsRequestError(status, _message) => { + warn!("Failed to post instance data with status {status}"); + if status 
== StatusCode::NOT_FOUND { + debug!("Upstream edge metrics not found, clearing our data about downstream instances to avoid growing to infinity (and beyond!)."); + empty = true; + do_the_work = false; + } else if status == StatusCode::FORBIDDEN { + warn!("Upstream edge metrics rejected our data, clearing our data about downstream instances to avoid growing to infinity (and beyond!)"); + empty = true; + do_the_work = false; + } + } + _ => { + warn!("Failed to post instance data due to unknown error {e:?}"); + empty = false; + } + }, + } + } + } else { + debug!("Did not have something to send instance data to"); + empty = true; // Emptying here, since we don't have anywhere to send the data to, to avoid growing memory + } + if empty { + downstream_instance_data.write().await.clear(); + } + } +} diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index 35178d25..d6f5e8c0 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -2,5 +2,6 @@ pub mod background_send_metrics; pub mod broadcaster; pub(crate) mod headers; +pub mod instance_data; pub mod refresher; pub mod unleash_client; diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs index 641c5bda..5b346f8b 100644 --- a/server/src/http/unleash_client.rs +++ b/server/src/http/unleash_client.rs @@ -12,7 +12,7 @@ use reqwest::header::{HeaderMap, HeaderName}; use reqwest::{header, Client}; use reqwest::{ClientBuilder, Identity, RequestBuilder, StatusCode, Url}; use serde::{Deserialize, Serialize}; -use tracing::error; +use tracing::{debug, error}; use tracing::{info, trace, warn}; use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta}; use unleash_types::client_metrics::ClientApplication; @@ -24,6 +24,7 @@ use crate::http::headers::{ UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, }; use crate::metrics::client_metrics::MetricsBatch; +use crate::metrics::edge_metrics::EdgeInstanceData; use crate::tls::build_upstream_certificate; use crate::types::{ ClientFeaturesDeltaResponse, ClientFeaturesResponse, EdgeResult, EdgeToken, @@ -55,6 +56,20 @@ lazy_static! 
{ vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0] ) .unwrap(); + pub static ref METRICS_UPLOAD: HistogramVec = register_histogram_vec!( + "client_metrics_upload", + "Timings for uploading client metrics in milliseconds", + &["status_code"], + vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0] + ) + .unwrap(); + pub static ref INSTANCE_DATA_UPLOAD: HistogramVec = register_histogram_vec!( + "instance_data_upload", + "Timings for uploading Edge instance data in milliseconds", + &["status_code"], + vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0] + ) + .unwrap(); pub static ref CLIENT_FEATURE_FETCH_FAILURES: IntGaugeVec = register_int_gauge_vec!( Opts::new( "client_feature_fetch_failures", @@ -517,6 +532,7 @@ impl UnleashClient { token: &str, ) -> EdgeResult<()> { trace!("Sending metrics to bulk endpoint"); + let started_at = Utc::now(); let result = self .backing_client .post(self.urls.client_bulk_metrics_url.to_string()) @@ -528,6 +544,10 @@ impl UnleashClient { info!("Failed to send metrics to /api/client/metrics/bulk endpoint {e:?}"); EdgeError::EdgeMetricsError })?; + let ended = Utc::now(); + METRICS_UPLOAD + .with_label_values(&[result.status().as_str()]) + .observe(ended.signed_duration_since(started_at).num_milliseconds() as f64); if result.status().is_success() { Ok(()) } else { @@ -541,6 +561,48 @@ impl UnleashClient { } } + #[tracing::instrument(skip(self, instance_data, token))] + pub async fn send_instance_data( + &self, + instance_data: EdgeInstanceData, + token: &str, + ) -> EdgeResult<()> { + let started_at = Utc::now(); + let result = self + .backing_client + .post(self.urls.edge_instance_data_url.to_string()) + .headers(self.header_map(Some(token.into()))) + .timeout(Duration::seconds(3).to_std().unwrap()) + .json(&instance_data) + .send() + .await + .map_err(|e| { + info!("Failed to send instance data: {e:?}"); + EdgeError::EdgeMetricsError + })?; + let ended_at = Utc::now(); + INSTANCE_DATA_UPLOAD + .with_label_values(&[result.status().as_str()]) + .observe( + ended_at + .signed_duration_since(started_at) + .num_milliseconds() as f64, + ); + let r = if result.status().is_success() { + Ok(()) + } else { + match result.status() { + StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError( + result.status(), + result.json().await.ok(), + )), + _ => Err(EdgeMetricsRequestError(result.status(), None)), + } + }; + debug!("Sent instance data to upstream server"); + r + } + pub async fn validate_tokens( &self, request: ValidateTokensRequest, diff --git a/server/src/internal_backstage.rs b/server/src/internal_backstage.rs index ed55f703..97507701 100644 --- a/server/src/internal_backstage.rs +++ b/server/src/internal_backstage.rs @@ -7,16 +7,19 @@ use actix_web::{ use dashmap::DashMap; use iter_tools::Itertools; use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; use unleash_types::client_features::ClientFeatures; use unleash_types::client_metrics::ClientApplication; -use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::metrics::actix_web_metrics::PrometheusMetricsHandler; use crate::metrics::client_metrics::MetricsCache; use crate::types::{BuildInfo, EdgeJsonResult, EdgeToken, TokenInfo, TokenRefresh}; use crate::types::{ClientMetric, MetricsInfo, Status}; use crate::{auth::token_validator::TokenValidator, cli::InternalBackstageArgs}; use crate::{error::EdgeError, feature_cache::FeatureCache}; +use crate::{ + http::refresher::feature_refresher::FeatureRefresher, 
+    metrics::edge_metrics::EdgeInstanceData,
+};
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct EdgeStatus {
@@ -146,24 +149,44 @@ pub async fn features(
     Ok(Json(features))
 }
 
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct DebugEdgeInstanceData {
+    pub this_instance: EdgeInstanceData,
+    pub connected_instances: Vec<EdgeInstanceData>,
+}
+
+#[get("/instancedata")]
+pub async fn instance_data(
+    this_instance: web::Data<EdgeInstanceData>,
+    downstream_instance_data: web::Data<RwLock<Vec<EdgeInstanceData>>>,
+) -> EdgeJsonResult<DebugEdgeInstanceData> {
+    Ok(Json(DebugEdgeInstanceData {
+        this_instance: this_instance.get_ref().clone(),
+        connected_instances: downstream_instance_data.read().await.clone(),
+    }))
+}
+
 pub fn configure_internal_backstage(
     cfg: &mut web::ServiceConfig,
     metrics_handler: PrometheusMetricsHandler,
-    internal_backtage_args: InternalBackstageArgs,
+    internal_backstage_args: InternalBackstageArgs,
 ) {
     cfg.service(health).service(info).service(ready);
-    if !internal_backtage_args.disable_tokens_endpoint {
+    if !internal_backstage_args.disable_tokens_endpoint {
         cfg.service(tokens);
     }
-    if !internal_backtage_args.disable_metrics_endpoint {
+    if !internal_backstage_args.disable_metrics_endpoint {
         cfg.service(web::resource("/metrics").route(web::get().to(metrics_handler)));
     }
-    if !internal_backtage_args.disable_metrics_batch_endpoint {
+    if !internal_backstage_args.disable_metrics_batch_endpoint {
         cfg.service(metrics_batch);
     }
-    if !internal_backtage_args.disable_features_endpoint {
+    if !internal_backstage_args.disable_features_endpoint {
         cfg.service(features);
     }
+    if !internal_backstage_args.disable_instance_data_endpoint {
+        cfg.service(instance_data);
+    }
 }
 
 #[cfg(test)]
diff --git a/server/src/main.rs b/server/src/main.rs
index b48e21a0..23542b33 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -1,11 +1,11 @@
-use std::sync::Arc;
-
 use actix_middleware_etag::Etag;
 use actix_web::middleware::Logger;
 use actix_web::{web, App, HttpServer};
 use clap::Parser;
 use dashmap::DashMap;
 use futures::future::join_all;
+use std::sync::Arc;
+use tokio::sync::RwLock;
 use unleash_types::client_features::ClientFeatures;
 use unleash_types::client_metrics::ConnectVia;
 use utoipa::OpenApi;
@@ -16,22 +16,22 @@ use unleash_edge::builder::build_caches_and_refreshers;
 use unleash_edge::cli::{CliArgs, EdgeMode};
 use unleash_edge::feature_cache::FeatureCache;
 use unleash_edge::http::background_send_metrics::send_metrics_one_shot;
+use unleash_edge::http::broadcaster::Broadcaster;
 use unleash_edge::http::refresher::feature_refresher::FeatureRefresher;
 use unleash_edge::metrics::client_metrics::MetricsCache;
+use unleash_edge::metrics::edge_metrics::EdgeInstanceData;
 use unleash_edge::offline::offline_hotload;
 use unleash_edge::persistence::{persist_data, EdgePersistence};
 use unleash_edge::types::{EdgeToken, TokenValidationStatus};
-use unleash_edge::{cli, client_api, frontend_api, health_checker, openapi, ready_checker};
+use unleash_edge::{client_api, frontend_api, health_checker, openapi, ready_checker};
 use unleash_edge::{edge_api, prom_metrics};
 use unleash_edge::{internal_backstage, tls};
+use unleash_edge::http::instance_data::InstanceDataSender;
 
 #[cfg(not(tarpaulin_include))]
 #[actix_web::main]
 async fn main() -> Result<(), anyhow::Error> {
-    use unleash_edge::{
-        http::{broadcaster::Broadcaster, unleash_client::ClientMetaInformation},
-        metrics::metrics_pusher,
-    };
+    use unleash_edge::{http::unleash_client::ClientMetaInformation, metrics::metrics_pusher};
 
     let args = CliArgs::parse();
     let disable_all_endpoint = args.disable_all_endpoint;
@@ -62,9 +62,11 @@ async fn main() -> Result<(), anyhow::Error> {
         instance_id: args.clone().instance_id,
     };
     let app_name = args.app_name.clone();
+    let our_instance_data_for_app_context = Arc::new(EdgeInstanceData::new(&app_name));
+    let our_instance_data = our_instance_data_for_app_context.clone();
     let instance_id = args.instance_id.clone();
     let custom_headers = match args.mode {
-        cli::EdgeMode::Edge(ref edge) => edge.custom_client_headers.clone(),
+        EdgeMode::Edge(ref edge) => edge.custom_client_headers.clone(),
         _ => vec![],
     };
@@ -75,8 +77,9 @@ async fn main() -> Result<(), anyhow::Error> {
         token_validator,
         feature_refresher,
         persistence,
-    ) = build_caches_and_refreshers(args).await.unwrap();
+    ) = build_caches_and_refreshers(args.clone()).await.unwrap();
 
+    let instance_data_sender = InstanceDataSender::from_args(args.clone(), our_instance_data.clone());
     let token_validator_schedule = token_validator.clone();
     let lazy_feature_cache = features_cache.clone();
     let lazy_token_cache = token_cache.clone();
@@ -89,6 +92,9 @@ async fn main() -> Result<(), anyhow::Error> {
     let openapi = openapi::ApiDoc::openapi();
     let refresher_for_app_data = feature_refresher.clone();
     let prom_registry_for_write = metrics_handler.registry.clone();
+    let instances_observed_for_app_context: Arc<RwLock<Vec<EdgeInstanceData>>> =
+        Arc::new(RwLock::new(Vec::new()));
+    let downstream_instance_data = instances_observed_for_app_context.clone();
 
     let broadcaster = Broadcaster::new(features_cache.clone());
@@ -107,7 +113,9 @@ async fn main() -> Result<(), anyhow::Error> {
             .app_data(web::Data::from(token_cache.clone()))
             .app_data(web::Data::from(features_cache.clone()))
             .app_data(web::Data::from(engine_cache.clone()))
-            .app_data(web::Data::from(broadcaster.clone()));
+            .app_data(web::Data::from(broadcaster.clone()))
+            .app_data(web::Data::from(our_instance_data_for_app_context.clone()))
+            .app_data(web::Data::from(instances_observed_for_app_context.clone()));
 
         app = match token_validator.clone() {
             Some(v) => app.app_data(web::Data::from(v)),
@@ -162,7 +170,7 @@ async fn main() -> Result<(), anyhow::Error> {
     .client_request_timeout(std::time::Duration::from_secs(request_timeout));
 
     match schedule_args.mode {
-        cli::EdgeMode::Edge(edge) => {
+        EdgeMode::Edge(edge) => {
             let refresher_for_background = feature_refresher.clone().unwrap();
             if edge.streaming {
                 let app_name = app_name.clone();
@@ -200,45 +208,48 @@ async fn main() -> Result<(), anyhow::Error> {
             tokio::select! {
                 _ = server.run() => {
-                    tracing::info!("Actix is shutting down. Persisting data");
+                    info!("Actix is shutting down. 
Persisting data"); clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; - tracing::info!("Actix was shutdown properly"); + info!("Actix was shutdown properly"); }, _ = refresher.start_refresh_features_background_task() => { - tracing::info!("Feature refresher unexpectedly shut down"); + info!("Feature refresher unexpectedly shut down"); } _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { - tracing::info!("Metrics poster unexpectedly shut down"); + info!("Metrics poster unexpectedly shut down"); } _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { - tracing::info!("Persister was unexpectedly shut down"); + info!("Persister was unexpectedly shut down"); } _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { - tracing::info!("Token validator validation of known tokens was unexpectedly shut down"); + info!("Token validator validation of known tokens was unexpectedly shut down"); + } + _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher.clone()) => { + info!("Token validator validation of startup tokens was unexpectedly shut down"); } - _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { - tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); + _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write.clone(), edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { + info!("Prometheus push unexpectedly shut down"); } - _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { - tracing::info!("Prometheus push unexpectedly shut down"); + _ = unleash_edge::http::instance_data::send_instance_data(instance_data_sender?, prom_registry_for_write.clone(), our_instance_data.clone(), downstream_instance_data.clone()) => { + info!("Instance data pusher unexpectedly quit"); } } } - cli::EdgeMode::Offline(offline_args) if offline_args.reload_interval > 0 => { + EdgeMode::Offline(offline_args) if offline_args.reload_interval > 0 => { tokio::select! { _ = offline_hotload::start_hotload_loop(lazy_feature_cache, lazy_engine_cache, offline_args) => { - tracing::info!("Hotloader unexpectedly shut down."); + info!("Hotloader unexpectedly shut down."); }, _ = server.run() => { - tracing::info!("Actix is shutting down. No pending tasks."); + info!("Actix is shutting down. No pending tasks."); }, } } _ => tokio::select! { _ = server.run() => { - tracing::info!("Actix is shutting down. Persisting data"); + info!("Actix is shutting down. 
Persisting data");
                     clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await;
-                    tracing::info!("Actix was shutdown properly");
+                    info!("Actix was shutdown properly");
                 }
             },
@@ -273,7 +284,7 @@ async fn clean_shutdown(
     ])
     .await;
     if res.iter().all(|save| save.is_ok()) {
-        tracing::info!("Successfully persisted data to storage backend");
+        info!("Successfully persisted data to storage backend");
     } else {
         res.iter()
             .filter(|save| save.is_err())
diff --git a/server/src/metrics/actix_web_metrics.rs b/server/src/metrics/actix_web_metrics.rs
index 5d11da10..522ff36a 100644
--- a/server/src/metrics/actix_web_metrics.rs
+++ b/server/src/metrics/actix_web_metrics.rs
@@ -121,6 +121,10 @@ impl Metrics {
             .f64_histogram(HTTP_SERVER_DURATION)
             .with_description("HTTP inbound request duration per route")
             .with_unit("ms")
+            .with_boundaries(vec![
+                1.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 750.0,
+                1000.0, 1500.0, 2000.0,
+            ])
             .build();
         let http_server_request_size = meter
diff --git a/server/src/metrics/edge_metrics.rs b/server/src/metrics/edge_metrics.rs
new file mode 100644
index 00000000..72244ee2
--- /dev/null
+++ b/server/src/metrics/edge_metrics.rs
@@ -0,0 +1,319 @@
+use ahash::{HashMap, HashSet};
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use tracing::info;
+use ulid::Ulid;
+use utoipa::ToSchema;
+
+use crate::types::BuildInfo;
+
+#[derive(Debug, Default, Clone, Copy, Deserialize, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct LatencyMetrics {
+    pub avg: f64,
+    pub count: f64,
+    pub p99: f64,
+}
+
+impl LatencyMetrics {
+    pub fn new() -> Self {
+        Self {
+            avg: 0.0,
+            count: 0.0,
+            p99: 0.0,
+        }
+    }
+}
+
+#[derive(Debug, Default, Clone, Copy, Deserialize, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessMetrics {
+    pub cpu_usage: f64,
+    pub memory_usage: f64,
+}
+
+#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct InstanceTraffic {
+    pub get: HashMap<String, LatencyMetrics>,
+    pub post: HashMap<String, LatencyMetrics>,
+    pub access_denied: HashMap<String, LatencyMetrics>,
+}
+
+#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct UpstreamLatency {
+    pub features: LatencyMetrics,
+    pub metrics: LatencyMetrics,
+    pub edge: LatencyMetrics,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct EdgeInstanceData {
+    pub identifier: String,
+    pub app_name: String,
+    pub region: Option<String>,
+    pub edge_version: String,
+    pub process_metrics: Option<ProcessMetrics>,
+    pub started: DateTime<Utc>,
+    pub traffic: InstanceTraffic,
+    pub latency_upstream: UpstreamLatency,
+    pub connected_streaming_clients: u64,
+    pub connected_edges: Vec<EdgeInstanceData>,
+}
+
+impl EdgeInstanceData {
+    pub fn new(app_name: &str) -> Self {
+        let build_info = BuildInfo::default();
+        Self {
+            identifier: Ulid::new().to_string(),
+            app_name: app_name.to_string(),
+            region: std::env::var("AWS_REGION").ok(),
+            edge_version: build_info.package_version.clone(),
+            process_metrics: None,
+            started: Utc::now(),
+            traffic: InstanceTraffic::default(),
+            latency_upstream: UpstreamLatency::default(),
+            connected_edges: Vec::new(),
+            connected_streaming_clients: 0,
+        }
+    }
+
+    pub fn observe(
+        &self,
+        registry: &prometheus::Registry,
+        connected_instances: Vec<EdgeInstanceData>,
+    ) -> Self {
+        let mut observed = self.clone();
+        let mut cpu_seconds = 0;
+        let mut resident_memory = 0;
+        let mut
desired_urls = HashSet::default(); + desired_urls.insert("/api/client/features"); + desired_urls.insert("/api/client/metrics"); + desired_urls.insert("/api/client/metrics/bulk"); + desired_urls.insert("/api/client/metrics/edge"); + desired_urls.insert("/api/frontend"); + desired_urls.insert("/api/proxy"); + let mut get_requests = HashMap::default(); + let mut post_requests = HashMap::default(); + let mut access_denied = HashMap::default(); + + for family in registry.gather().iter() { + match family.get_name() { + "http_server_duration_milliseconds" => { + family + .get_metric() + .iter() + .filter(|m| { + m.has_histogram() + && m.get_label().iter().any(|l| { + l.get_name() == "url_path" + && desired_urls.contains(l.get_value()) + }) + && m.get_label().iter().any(|l| { + l.get_name() == "http_response_status_code" + && l.get_value() == "200" + || l.get_value() == "403" + }) + }) + .for_each(|m| { + let labels = m.get_label(); + + let path = labels + .iter() + .find(|l| l.get_name() == "url_path") + .unwrap() + .get_value(); + let method = labels + .iter() + .find(|l| l.get_name() == "http_request_method") + .unwrap() + .get_value(); + let status = labels + .iter() + .find(|l| l.get_name() == "http_response_status_code") + .unwrap() + .get_value(); + let latency = if status != "200" { + access_denied + .entry(path.to_string()) + .or_insert(LatencyMetrics::new()) + } else if method == "GET" { + get_requests + .entry(path.to_string()) + .or_insert(LatencyMetrics::new()) + } else { + post_requests + .entry(path.to_string()) + .or_insert(LatencyMetrics::new()) + }; + let total = m.get_histogram().get_sample_sum() * 1000.0; // convert to ms + let count = m.get_histogram().get_sample_count() as f64; + let p99 = get_percentile( + 99, + m.get_histogram().get_sample_count(), + m.get_histogram().get_bucket(), + ); + *latency = LatencyMetrics { + avg: if count == 0.0 { 0.0 } else { round_to_3_decimals(total / count) }, + count, + p99, + }; + }); + } + "process_cpu_seconds_total" => { + if let Some(cpu_second_metric) = family.get_metric().last() { + cpu_seconds = cpu_second_metric.get_counter().get_value() as u64; + } + } + "process_resident_memory_bytes" => { + if let Some(resident_memory_metric) = family.get_metric().last() { + resident_memory = resident_memory_metric.get_gauge().get_value() as u64; + } + } + "client_metrics_upload" => { + info!("{:?}", family.get_metric()); + if let Some(metrics_upload_metric) = family.get_metric().last() { + let count = metrics_upload_metric.get_histogram().get_sample_count(); + let p99 = get_percentile( + 99, + count, + metrics_upload_metric.get_histogram().get_bucket(), + ); + observed.latency_upstream.metrics = LatencyMetrics { + avg: round_to_3_decimals(metrics_upload_metric.get_histogram().get_sample_sum() + / count as f64), + count: count as f64, + p99, + } + } + } + "instance_data_upload" => { + if let Some(instance_data_upload_metric) = family.get_metric().last() { + let count = instance_data_upload_metric + .get_histogram() + .get_sample_count(); + let p99 = get_percentile( + 99, + count, + instance_data_upload_metric.get_histogram().get_bucket(), + ); + observed.latency_upstream.edge = LatencyMetrics { + avg: round_to_3_decimals(instance_data_upload_metric.get_histogram().get_sample_sum() + / count as f64), + count: count as f64, + p99, + } + } + } + "client_feature_fetch" => { + if let Some(feature_fetch_metric) = family.get_metric().last() { + let count = feature_fetch_metric.get_histogram().get_sample_count(); + let p99 = get_percentile( + 99, + count, 
+ feature_fetch_metric.get_histogram().get_bucket(), + ); + observed.latency_upstream.features = LatencyMetrics { + avg: round_to_3_decimals(feature_fetch_metric.get_histogram().get_sample_sum() + / count as f64), + count: count as f64, + p99, + } + } + } + "connected_streaming_clients" => { + if let Some(connected_streaming_clients) = family.get_metric().last() { + observed.connected_streaming_clients = + connected_streaming_clients.get_gauge().get_value() as u64; + } + } + _ => {} + } + } + observed.traffic = InstanceTraffic { + get: get_requests, + post: post_requests, + access_denied, + }; + observed.process_metrics = Some(ProcessMetrics { + cpu_usage: cpu_seconds as f64, + memory_usage: resident_memory as f64, + }); + for connected_instance in connected_instances { + observed.connected_edges.push(connected_instance.clone()); + } + observed + } +} + +fn get_percentile(percentile: u64, count: u64, buckets: &[prometheus::proto::Bucket]) -> f64 { + let target = (percentile as f64 / 100.0) * count as f64; + let mut previous_upper_bound = 0.0; + let mut previous_count = 0; + for bucket in buckets { + if bucket.get_cumulative_count() as f64 >= target { + let nth_count = bucket.get_cumulative_count() - previous_count; + let observation_in_range = target - previous_count as f64; + return round_to_3_decimals(previous_upper_bound + + ((observation_in_range / nth_count as f64) + * (bucket.get_upper_bound() - previous_upper_bound))); + } + previous_upper_bound = bucket.get_upper_bound(); + previous_count = bucket.get_cumulative_count(); + } + 0.0 +} + +fn round_to_3_decimals(number: f64) -> f64 { + (number * 1000.0).round() / 1000.0 +} + +#[cfg(test)] +mod tests { + + #[test] + pub fn can_find_p99_of_a_range() { + let mut one_ms = prometheus::proto::Bucket::new(); + one_ms.set_cumulative_count(1000); + one_ms.set_upper_bound(1.0); + let mut five_ms = prometheus::proto::Bucket::new(); + five_ms.set_cumulative_count(2000); + five_ms.set_upper_bound(5.0); + let mut ten_ms = prometheus::proto::Bucket::new(); + ten_ms.set_cumulative_count(3000); + ten_ms.set_upper_bound(10.0); + let mut twenty_ms = prometheus::proto::Bucket::new(); + twenty_ms.set_cumulative_count(4000); + twenty_ms.set_upper_bound(20.0); + let mut fifty_ms = prometheus::proto::Bucket::new(); + fifty_ms.set_cumulative_count(5000); + fifty_ms.set_upper_bound(50.0); + let buckets = vec![one_ms, five_ms, ten_ms, twenty_ms, fifty_ms]; + let result = super::get_percentile(99, 5000, &buckets); + assert_eq!(result, 48.5); + } + + #[test] + pub fn can_find_p50_of_a_range() { + let mut one_ms = prometheus::proto::Bucket::new(); + one_ms.set_cumulative_count(1000); + one_ms.set_upper_bound(1.0); + let mut five_ms = prometheus::proto::Bucket::new(); + five_ms.set_cumulative_count(2000); + five_ms.set_upper_bound(5.0); + let mut ten_ms = prometheus::proto::Bucket::new(); + ten_ms.set_cumulative_count(3000); + ten_ms.set_upper_bound(10.0); + let mut twenty_ms = prometheus::proto::Bucket::new(); + twenty_ms.set_cumulative_count(4000); + twenty_ms.set_upper_bound(20.0); + let mut fifty_ms = prometheus::proto::Bucket::new(); + fifty_ms.set_cumulative_count(5000); + fifty_ms.set_upper_bound(50.0); + let buckets = vec![one_ms, five_ms, ten_ms, twenty_ms, fifty_ms]; + let result = super::get_percentile(50, 5000, &buckets); + assert_eq!(result, 7.5); + } +} diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs index 887fb965..e3759dfd 100644 --- a/server/src/metrics/mod.rs +++ b/server/src/metrics/mod.rs @@ -6,6 +6,7 @@ use tracing::trace; 
pub mod actix_web_metrics; pub mod client_metrics; +pub mod edge_metrics; pub mod metrics_pusher; pub mod route_formatter; diff --git a/server/src/prom_metrics.rs b/server/src/prom_metrics.rs index 028357e1..d1c25533 100644 --- a/server/src/prom_metrics.rs +++ b/server/src/prom_metrics.rs @@ -148,6 +148,16 @@ fn register_custom_metrics(registry: &prometheus::Registry) { crate::http::broadcaster::CONNECTED_STREAMING_CLIENTS.clone(), )) .unwrap(); + registry + .register(Box::new( + crate::http::unleash_client::METRICS_UPLOAD.clone(), + )) + .unwrap(); + registry + .register(Box::new( + crate::http::unleash_client::INSTANCE_DATA_UPLOAD.clone(), + )) + .unwrap(); } #[cfg(test)] diff --git a/server/src/urls.rs b/server/src/urls.rs index 494d09c4..56a2e8d1 100644 --- a/server/src/urls.rs +++ b/server/src/urls.rs @@ -17,6 +17,7 @@ pub struct UnleashUrls { pub edge_api_url: Url, pub edge_validate_url: Url, pub edge_metrics_url: Url, + pub edge_instance_data_url: Url, pub new_api_token_url: Url, pub client_features_stream_url: Url, } @@ -100,6 +101,11 @@ impl UnleashUrls { .path_segments_mut() .expect("Could not create /api/client/metrics/bulk") .push("bulk"); + let mut edge_instance_data_url = client_metrics_url.clone(); + edge_instance_data_url + .path_segments_mut() + .expect("Could not create /api/client/metrics/instance-data") + .push("edge"); UnleashUrls { base_url, api_url, @@ -114,6 +120,7 @@ impl UnleashUrls { edge_metrics_url, new_api_token_url, client_features_stream_url, + edge_instance_data_url, } } }
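Note on the percentile estimation exercised by the edge_metrics tests above: get_percentile walks the cumulative histogram buckets and linearly interpolates inside the first bucket whose cumulative count reaches the target rank. A minimal, self-contained sketch of the same idea (not part of the diff; the helper name and bucket representation are illustrative only, mirroring the p99/p50 tests):

// Standalone illustration of bucket interpolation. Buckets are (upper_bound_ms, cumulative_count),
// 1000 observations per bucket with upper bounds 1, 5, 10, 20 and 50 ms, as in the tests.
fn round3(x: f64) -> f64 {
    (x * 1000.0).round() / 1000.0
}

fn percentile_from_buckets(percentile: u64, count: u64, buckets: &[(f64, u64)]) -> f64 {
    // Target rank: p99 of 5000 observations -> 4950
    let target = (percentile as f64 / 100.0) * count as f64;
    let (mut prev_bound, mut prev_count) = (0.0_f64, 0_u64);
    for &(upper_bound, cumulative) in buckets {
        if cumulative as f64 >= target {
            let in_bucket = (cumulative - prev_count) as f64; // observations in this bucket
            let into_bucket = target - prev_count as f64;     // how far the target reaches into it
            // Interpolate linearly between the previous and current upper bound
            return round3(prev_bound + (into_bucket / in_bucket) * (upper_bound - prev_bound));
        }
        prev_bound = upper_bound;
        prev_count = cumulative;
    }
    0.0
}

fn main() {
    let buckets = [(1.0, 1000), (5.0, 2000), (10.0, 3000), (20.0, 4000), (50.0, 5000)];
    // p99: rank 4950 lands 950/1000 of the way into the 20-50 ms bucket: 20 + 0.95 * 30 = 48.5
    assert_eq!(percentile_from_buckets(99, 5000, &buckets), 48.5);
    // p50: rank 2500 lands 500/1000 of the way into the 5-10 ms bucket: 5 + 0.5 * 5 = 7.5
    assert_eq!(percentile_from_buckets(50, 5000, &buckets), 7.5);
    println!("p99 = {}, p50 = {}", 48.5, 7.5);
}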
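To inspect what an Edge instance will report upstream, the new /internal-backstage/instancedata endpoint (served unless --disable-instance-data-endpoint is set) returns this instance's EdgeInstanceData together with whatever downstream Edges have posted to /api/client/metrics/edge. A hypothetical smoke-test client, assuming an instance listening on localhost:3063 and tokio, reqwest (with the json feature) and serde_json as dependencies:

// Hypothetical example, not part of the diff: fetch the debug endpoint and pretty-print the JSON.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let body: serde_json::Value =
        reqwest::get("http://localhost:3063/internal-backstage/instancedata")
            .await?
            .json()
            .await?;
    // The payload combines this instance's own data with the instance data
    // reported by connected downstream Edge instances.
    println!("{}", serde_json::to_string_pretty(&body)?);
    Ok(())
}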