diff --git a/Cargo.lock b/Cargo.lock index 5ce990b2..d5d28877 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4763,9 +4763,9 @@ dependencies = [ [[package]] name = "unleash-yggdrasil" -version = "0.16.1" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34dd3238d299dd71e59ed901b67e5d371d8c3e66ae1945fe31da7d876feb613f" +checksum = "b8c059a19bd116c26bdc3b2acccb09ac2f9142615db9505606b367d120d455f4" dependencies = [ "chrono", "convert_case 0.6.0", diff --git a/server/Cargo.toml b/server/Cargo.toml index 7e5a88cc..e9a75c61 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -96,7 +96,7 @@ tracing = { version = "0.1.41", features = ["log"] } tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter"] } ulid = "1.1.4" unleash-types = { version = "0.15.5", features = ["openapi", "hashes"] } -unleash-yggdrasil = { version = "0.16.1" } +unleash-yggdrasil = { version = "0.17.0" } utoipa = { version = "5.3.1", features = ["actix_extras", "chrono"] } utoipa-swagger-ui = { version = "9.0.0", features = ["actix-web"] } [dev-dependencies] diff --git a/server/src/builder.rs b/server/src/builder.rs index 23c36d1b..6a6e0395 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -8,7 +8,7 @@ use dashmap::DashMap; use reqwest::Url; use tracing::{debug, error, warn}; use unleash_types::client_features::ClientFeatures; -use unleash_yggdrasil::EngineState; +use unleash_yggdrasil::{EngineState, UpdateMessage}; use crate::cli::RedisMode; use crate::feature_cache::FeatureCache; @@ -70,7 +70,7 @@ async fn hydrate_from_persistent_storage(cache: CacheContainer, storage: Arc, + ) -> anyhow::Result<()> { + use anyhow::Context; + + let refreshes = self.get_tokens_due_for_refresh(); + for refresh in refreshes { + let token = refresh.token.clone(); + let streaming_url = self.unleash_client.urls.client_features_stream_url.as_str(); + + let mut es_client_builder = eventsource_client::ClientBuilder::for_url(streaming_url) + .context("Failed to create EventSource client for streaming")? + .header("Authorization", &token.token)? + .header(UNLEASH_APPNAME_HEADER, &client_meta_information.app_name)? + .header( + UNLEASH_INSTANCE_ID_HEADER, + &client_meta_information.instance_id, + )? + .header( + UNLEASH_CLIENT_SPEC_HEADER, + unleash_yggdrasil::SUPPORTED_SPEC_VERSION, + )?; + + for (key, value) in custom_headers.clone() { + es_client_builder = es_client_builder.header(&key, &value)?; + } + + let es_client = es_client_builder + .reconnect( + eventsource_client::ReconnectOptions::reconnect(true) + .retry_initial(true) + .delay(Duration::from_secs(5)) + .delay_max(Duration::from_secs(30)) + .backoff_factor(2) + .build(), + ) + .build(); + + let refresher = self.clone(); + + tokio::spawn(async move { + let mut stream = es_client + .stream() + .map_ok(move |sse| { + let token = token.clone(); + let refresher = refresher.clone(); + async move { + match sse { + // The first time we're connecting to Unleash. + eventsource_client::SSE::Event(event) + if event.event_type == "unleash-connected" => + { + debug!( + "Connected to unleash! Populating my flag cache now.", + ); + + match serde_json::from_str(&event.data) { + Ok(delta) => { refresher.handle_client_features_delta_updated(&token, delta, None).await; } + Err(e) => { warn!("Could not parse features response to internal representation: {e:?}"); + } + } + } + // Unleash has updated features for us. + eventsource_client::SSE::Event(event) + if event.event_type == "unleash-updated" => + { + debug!( + "Got an unleash updated event. Updating cache.", + ); + + match serde_json::from_str(&event.data) { + Ok(delta) => { refresher.handle_client_features_delta_updated(&token, delta, None).await; } + Err(e) => { warn!("Could not parse features response to internal representation: {e:?}"); + } + } + } + eventsource_client::SSE::Event(event) => { + info!( + "Got an SSE event that I wasn't expecting: {:#?}", + event + ); + } + eventsource_client::SSE::Connected(_) => { + debug!("SSE Connection established"); + } + eventsource_client::SSE::Comment(_) => { + // purposefully left blank. + }, + } + } + }) + .map_err(|e| warn!("Error in SSE stream: {:?}", e)); + + loop { + match stream.try_next().await { + Ok(Some(handler)) => handler.await, + Ok(None) => { + info!("SSE stream ended? Handler was None, anyway. Reconnecting."); + } + Err(e) => { + info!("SSE stream error: {e:?}. Reconnecting"); + } + } + } + }); + } + Ok(()) + } } #[cfg(test)] @@ -277,4 +393,6 @@ mod tests { }) .await } + + } diff --git a/server/src/http/refresher/feature_refresher.rs b/server/src/http/refresher/feature_refresher.rs index 406d1f69..d643d321 100644 --- a/server/src/http/refresher/feature_refresher.rs +++ b/server/src/http/refresher/feature_refresher.rs @@ -11,7 +11,7 @@ use reqwest::StatusCode; use tracing::{debug, info, warn}; use unleash_types::client_features::{ClientFeatures, DeltaEvent}; use unleash_types::client_metrics::{ClientApplication, MetricsMetadata}; -use unleash_yggdrasil::EngineState; +use unleash_yggdrasil::{EngineState, UpdateMessage}; use crate::error::{EdgeError, FeatureError}; use crate::feature_cache::FeatureCache; @@ -495,7 +495,7 @@ impl FeatureRefresher { .and_modify(|engine| { if let Some(f) = self.features_cache.get(&key) { let mut new_state = EngineState::default(); - let warnings = new_state.take_state(f.clone()); + let warnings = new_state.take_state(UpdateMessage::FullResponse(f.clone())); if let Some(warnings) = warnings { warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); }; @@ -506,7 +506,7 @@ impl FeatureRefresher { .or_insert_with(|| { let mut new_state = EngineState::default(); - let warnings = new_state.take_state(features); + let warnings = new_state.take_state(UpdateMessage::FullResponse(features)); if let Some(warnings) = warnings { warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); }; @@ -623,7 +623,7 @@ mod tests { use dashmap::DashMap; use reqwest::Url; use unleash_types::client_features::ClientFeature; - use unleash_yggdrasil::EngineState; + use unleash_yggdrasil::{EngineState, UpdateMessage}; use crate::feature_cache::{update_projects_from_feature_update, FeatureCache}; use crate::filters::{project_filter, FeatureFilterSet}; @@ -1043,7 +1043,7 @@ mod tests { let example_features = features_from_disk("../examples/features.json"); let cache_key = cache_key(&token); let mut engine_state = EngineState::default(); - let warnings = engine_state.take_state(example_features.clone()); + let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone())); upstream_features_cache.insert(cache_key.clone(), example_features.clone()); upstream_engine_cache.insert(cache_key.clone(), engine_state); let mut server = client_api_test_server( @@ -1099,7 +1099,7 @@ mod tests { let example_features = features_from_disk("../examples/features.json"); let cache_key = cache_key(&valid_token); let mut engine_state = EngineState::default(); - let warnings = engine_state.take_state(example_features.clone()); + let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone())); upstream_features_cache.insert(cache_key.clone(), example_features.clone()); upstream_engine_cache.insert(cache_key.clone(), engine_state); let server = client_api_test_server( @@ -1144,7 +1144,7 @@ mod tests { let example_features = features_from_disk("../examples/hostedexample.json"); let cache_key = cache_key(&dx_token); let mut engine_state = EngineState::default(); - let warnings = engine_state.take_state(example_features.clone()); + let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone())); upstream_features_cache.insert(cache_key.clone(), example_features.clone()); upstream_engine_cache.insert(cache_key.clone(), engine_state); let server = client_api_test_server( @@ -1194,7 +1194,7 @@ mod tests { let cache_key = cache_key(&dx_token); upstream_features_cache.insert(cache_key.clone(), example_features.clone()); let mut engine_state = EngineState::default(); - let warnings = engine_state.take_state(example_features.clone()); + let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone())); upstream_engine_cache.insert(cache_key, engine_state); let server = client_api_test_server( upstream_token_cache, @@ -1256,7 +1256,7 @@ mod tests { let cache_key = cache_key(&dx_token); upstream_features_cache.insert(cache_key.clone(), example_features.clone()); let mut engine_state = EngineState::default(); - let warnings = engine_state.take_state(example_features.clone()); + let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone())); upstream_engine_cache.insert(cache_key, engine_state); let server = client_api_test_server( upstream_token_cache, @@ -1364,7 +1364,7 @@ mod tests { let cache_key = cache_key(&eg_token); upstream_features_cache.insert(cache_key.clone(), example_features.clone()); let mut engine_state = EngineState::default(); - let warnings = engine_state.take_state(example_features.clone()); + let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone())); upstream_engine_cache.insert(cache_key.clone(), engine_state); let server = client_api_test_server( upstream_token_cache, diff --git a/server/src/main.rs b/server/src/main.rs index 321c659c..b48e21a0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -167,17 +167,31 @@ async fn main() -> Result<(), anyhow::Error> { if edge.streaming { let app_name = app_name.clone(); let custom_headers = custom_headers.clone(); - tokio::spawn(async move { - let _ = refresher_for_background - .start_streaming_features_background_task( - ClientMetaInformation { - app_name, - instance_id, - }, - custom_headers, - ) - .await; - }); + if edge.delta { + tokio::spawn(async move { + let _ = refresher_for_background + .start_streaming_delta_background_task( + ClientMetaInformation { + app_name, + instance_id, + }, + custom_headers, + ) + .await; + }); + } else { + tokio::spawn(async move { + let _ = refresher_for_background + .start_streaming_features_background_task( + ClientMetaInformation { + app_name, + instance_id, + }, + custom_headers, + ) + .await; + }); + } } let refresher = feature_refresher.clone().unwrap(); diff --git a/server/src/offline/offline_hotload.rs b/server/src/offline/offline_hotload.rs index 95ab63da..cd77f8fa 100644 --- a/server/src/offline/offline_hotload.rs +++ b/server/src/offline/offline_hotload.rs @@ -14,7 +14,7 @@ use tracing::warn; use unleash_types::client_features::{ ClientFeature, ClientFeatures, Strategy, Variant, WeightType, }; -use unleash_yggdrasil::EngineState; +use unleash_yggdrasil::{EngineState, UpdateMessage}; use crate::{cli::OfflineArgs, error::EdgeError, feature_cache::FeatureCache, types::EdgeToken}; @@ -69,7 +69,7 @@ pub(crate) fn load_offline_engine_cache( client_features.clone(), ); let mut engine = EngineState::default(); - let warnings = engine.take_state(client_features); + let warnings = engine.take_state(UpdateMessage::FullResponse(client_features)); engine_cache.insert(crate::tokens::cache_key(edge_token), engine); if let Some(warnings) = warnings { warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");