Skip to content

Commit

Permalink
feat: edge to support delta streaming (#716)
Browse files Browse the repository at this point in the history
* feat: edge to support delta streaming
  • Loading branch information
sjaanus authored Feb 7, 2025
1 parent 610b832 commit 5d66cb8
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 29 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ tracing = { version = "0.1.41", features = ["log"] }
tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter"] }
ulid = "1.1.4"
unleash-types = { version = "0.15.5", features = ["openapi", "hashes"] }
unleash-yggdrasil = { version = "0.16.1" }
unleash-yggdrasil = { version = "0.17.0" }
utoipa = { version = "5.3.1", features = ["actix_extras", "chrono"] }
utoipa-swagger-ui = { version = "9.0.0", features = ["actix-web"] }
[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use dashmap::DashMap;
use reqwest::Url;
use tracing::{debug, error, warn};
use unleash_types::client_features::ClientFeatures;
use unleash_yggdrasil::EngineState;
use unleash_yggdrasil::{EngineState, UpdateMessage};

use crate::cli::RedisMode;
use crate::feature_cache::FeatureCache;
Expand Down Expand Up @@ -70,7 +70,7 @@ async fn hydrate_from_persistent_storage(cache: CacheContainer, storage: Arc<dyn
features_cache.insert(key.clone(), features.clone());
let mut engine_state = EngineState::default();

let warnings = engine_state.take_state(features);
let warnings = engine_state.take_state(UpdateMessage::FullResponse(features));
if let Some(warnings) = warnings {
warn!("Failed to hydrate features for {key:?}: {warnings:?}");
}
Expand Down
120 changes: 119 additions & 1 deletion server/src/http/refresher/delta_refresher.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use std::time::Duration;
use eventsource_client::Client;
use actix_web::http::header::EntityTag;
use reqwest::StatusCode;
use futures::TryStreamExt;
use tracing::{debug, info, warn};
use unleash_types::client_features::{ClientFeaturesDelta};
use unleash_yggdrasil::EngineState;

use crate::error::{EdgeError, FeatureError};
use crate::http::headers::{UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER};
use crate::types::{ClientFeaturesDeltaResponse, ClientFeaturesRequest, EdgeToken, TokenRefresh};
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::http::unleash_client::ClientMetaInformation;
use crate::tokens::cache_key;

impl FeatureRefresher {
Expand All @@ -19,7 +24,7 @@ impl FeatureRefresher {
let updated_len = delta.events.len();

debug!(
"Got updated client features delta. Updating features with {etag:?}, events count {updated_len}"
"Got updated client features delta. Updating features with etag {etag:?}, events count {updated_len}"
);

let key = cache_key(refresh_token);
Expand Down Expand Up @@ -110,6 +115,117 @@ impl FeatureRefresher {
}
}
}

pub async fn start_streaming_delta_background_task(
&self,
client_meta_information: ClientMetaInformation,
custom_headers: Vec<(String, String)>,
) -> anyhow::Result<()> {
use anyhow::Context;

let refreshes = self.get_tokens_due_for_refresh();
for refresh in refreshes {
let token = refresh.token.clone();
let streaming_url = self.unleash_client.urls.client_features_stream_url.as_str();

let mut es_client_builder = eventsource_client::ClientBuilder::for_url(streaming_url)
.context("Failed to create EventSource client for streaming")?
.header("Authorization", &token.token)?
.header(UNLEASH_APPNAME_HEADER, &client_meta_information.app_name)?
.header(
UNLEASH_INSTANCE_ID_HEADER,
&client_meta_information.instance_id,
)?
.header(
UNLEASH_CLIENT_SPEC_HEADER,
unleash_yggdrasil::SUPPORTED_SPEC_VERSION,
)?;

for (key, value) in custom_headers.clone() {
es_client_builder = es_client_builder.header(&key, &value)?;
}

let es_client = es_client_builder
.reconnect(
eventsource_client::ReconnectOptions::reconnect(true)
.retry_initial(true)
.delay(Duration::from_secs(5))
.delay_max(Duration::from_secs(30))
.backoff_factor(2)
.build(),
)
.build();

let refresher = self.clone();

tokio::spawn(async move {
let mut stream = es_client
.stream()
.map_ok(move |sse| {
let token = token.clone();
let refresher = refresher.clone();
async move {
match sse {
// The first time we're connecting to Unleash.
eventsource_client::SSE::Event(event)
if event.event_type == "unleash-connected" =>
{
debug!(
"Connected to unleash! Populating my flag cache now.",
);

match serde_json::from_str(&event.data) {
Ok(delta) => { refresher.handle_client_features_delta_updated(&token, delta, None).await; }
Err(e) => { warn!("Could not parse features response to internal representation: {e:?}");
}
}
}
// Unleash has updated features for us.
eventsource_client::SSE::Event(event)
if event.event_type == "unleash-updated" =>
{
debug!(
"Got an unleash updated event. Updating cache.",
);

match serde_json::from_str(&event.data) {
Ok(delta) => { refresher.handle_client_features_delta_updated(&token, delta, None).await; }
Err(e) => { warn!("Could not parse features response to internal representation: {e:?}");
}
}
}
eventsource_client::SSE::Event(event) => {
info!(
"Got an SSE event that I wasn't expecting: {:#?}",
event
);
}
eventsource_client::SSE::Connected(_) => {
debug!("SSE Connection established");
}
eventsource_client::SSE::Comment(_) => {
// purposefully left blank.
},
}
}
})
.map_err(|e| warn!("Error in SSE stream: {:?}", e));

loop {
match stream.try_next().await {
Ok(Some(handler)) => handler.await,
Ok(None) => {
info!("SSE stream ended? Handler was None, anyway. Reconnecting.");
}
Err(e) => {
info!("SSE stream error: {e:?}. Reconnecting");
}
}
}
});
}
Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -277,4 +393,6 @@ mod tests {
})
.await
}


}
20 changes: 10 additions & 10 deletions server/src/http/refresher/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reqwest::StatusCode;
use tracing::{debug, info, warn};
use unleash_types::client_features::{ClientFeatures, DeltaEvent};
use unleash_types::client_metrics::{ClientApplication, MetricsMetadata};
use unleash_yggdrasil::EngineState;
use unleash_yggdrasil::{EngineState, UpdateMessage};

use crate::error::{EdgeError, FeatureError};
use crate::feature_cache::FeatureCache;
Expand Down Expand Up @@ -495,7 +495,7 @@ impl FeatureRefresher {
.and_modify(|engine| {
if let Some(f) = self.features_cache.get(&key) {
let mut new_state = EngineState::default();
let warnings = new_state.take_state(f.clone());
let warnings = new_state.take_state(UpdateMessage::FullResponse(f.clone()));
if let Some(warnings) = warnings {
warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
};
Expand All @@ -506,7 +506,7 @@ impl FeatureRefresher {
.or_insert_with(|| {
let mut new_state = EngineState::default();

let warnings = new_state.take_state(features);
let warnings = new_state.take_state(UpdateMessage::FullResponse(features));
if let Some(warnings) = warnings {
warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
};
Expand Down Expand Up @@ -623,7 +623,7 @@ mod tests {
use dashmap::DashMap;
use reqwest::Url;
use unleash_types::client_features::ClientFeature;
use unleash_yggdrasil::EngineState;
use unleash_yggdrasil::{EngineState, UpdateMessage};

use crate::feature_cache::{update_projects_from_feature_update, FeatureCache};
use crate::filters::{project_filter, FeatureFilterSet};
Expand Down Expand Up @@ -1043,7 +1043,7 @@ mod tests {
let example_features = features_from_disk("../examples/features.json");
let cache_key = cache_key(&token);
let mut engine_state = EngineState::default();
let warnings = engine_state.take_state(example_features.clone());
let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone()));
upstream_features_cache.insert(cache_key.clone(), example_features.clone());
upstream_engine_cache.insert(cache_key.clone(), engine_state);
let mut server = client_api_test_server(
Expand Down Expand Up @@ -1099,7 +1099,7 @@ mod tests {
let example_features = features_from_disk("../examples/features.json");
let cache_key = cache_key(&valid_token);
let mut engine_state = EngineState::default();
let warnings = engine_state.take_state(example_features.clone());
let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone()));
upstream_features_cache.insert(cache_key.clone(), example_features.clone());
upstream_engine_cache.insert(cache_key.clone(), engine_state);
let server = client_api_test_server(
Expand Down Expand Up @@ -1144,7 +1144,7 @@ mod tests {
let example_features = features_from_disk("../examples/hostedexample.json");
let cache_key = cache_key(&dx_token);
let mut engine_state = EngineState::default();
let warnings = engine_state.take_state(example_features.clone());
let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone()));
upstream_features_cache.insert(cache_key.clone(), example_features.clone());
upstream_engine_cache.insert(cache_key.clone(), engine_state);
let server = client_api_test_server(
Expand Down Expand Up @@ -1194,7 +1194,7 @@ mod tests {
let cache_key = cache_key(&dx_token);
upstream_features_cache.insert(cache_key.clone(), example_features.clone());
let mut engine_state = EngineState::default();
let warnings = engine_state.take_state(example_features.clone());
let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone()));
upstream_engine_cache.insert(cache_key, engine_state);
let server = client_api_test_server(
upstream_token_cache,
Expand Down Expand Up @@ -1256,7 +1256,7 @@ mod tests {
let cache_key = cache_key(&dx_token);
upstream_features_cache.insert(cache_key.clone(), example_features.clone());
let mut engine_state = EngineState::default();
let warnings = engine_state.take_state(example_features.clone());
let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone()));
upstream_engine_cache.insert(cache_key, engine_state);
let server = client_api_test_server(
upstream_token_cache,
Expand Down Expand Up @@ -1364,7 +1364,7 @@ mod tests {
let cache_key = cache_key(&eg_token);
upstream_features_cache.insert(cache_key.clone(), example_features.clone());
let mut engine_state = EngineState::default();
let warnings = engine_state.take_state(example_features.clone());
let warnings = engine_state.take_state(UpdateMessage::FullResponse(example_features.clone()));
upstream_engine_cache.insert(cache_key.clone(), engine_state);
let server = client_api_test_server(
upstream_token_cache,
Expand Down
36 changes: 25 additions & 11 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,31 @@ async fn main() -> Result<(), anyhow::Error> {
if edge.streaming {
let app_name = app_name.clone();
let custom_headers = custom_headers.clone();
tokio::spawn(async move {
let _ = refresher_for_background
.start_streaming_features_background_task(
ClientMetaInformation {
app_name,
instance_id,
},
custom_headers,
)
.await;
});
if edge.delta {
tokio::spawn(async move {
let _ = refresher_for_background
.start_streaming_delta_background_task(
ClientMetaInformation {
app_name,
instance_id,
},
custom_headers,
)
.await;
});
} else {
tokio::spawn(async move {
let _ = refresher_for_background
.start_streaming_features_background_task(
ClientMetaInformation {
app_name,
instance_id,
},
custom_headers,
)
.await;
});
}
}

let refresher = feature_refresher.clone().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions server/src/offline/offline_hotload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::warn;
use unleash_types::client_features::{
ClientFeature, ClientFeatures, Strategy, Variant, WeightType,
};
use unleash_yggdrasil::EngineState;
use unleash_yggdrasil::{EngineState, UpdateMessage};

use crate::{cli::OfflineArgs, error::EdgeError, feature_cache::FeatureCache, types::EdgeToken};

Expand Down Expand Up @@ -69,7 +69,7 @@ pub(crate) fn load_offline_engine_cache(
client_features.clone(),
);
let mut engine = EngineState::default();
let warnings = engine.take_state(client_features);
let warnings = engine.take_state(UpdateMessage::FullResponse(client_features));
engine_cache.insert(crate::tokens::cache_key(edge_token), engine);
if let Some(warnings) = warnings {
warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
Expand Down

0 comments on commit 5d66cb8

Please sign in to comment.