Skip to content

Commit

Permalink
More work for dealing with prometheus data
Browse files Browse the repository at this point in the history
  • Loading branch information
chriswk committed Feb 5, 2025
1 parent 43591b9 commit 6e19fe1
Show file tree
Hide file tree
Showing 11 changed files with 426 additions and 27 deletions.
6 changes: 6 additions & 0 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ pub struct InternalBackstageArgs {
/// Used to show tokens used to refresh feature caches, but also tokens already validated/invalidated against upstream
#[clap(long, env, global = true)]
pub disable_tokens_endpoint: bool,

/// Disables /internal-backstage/instancedata endpoint
///
/// Used to show instance data for the edge instance.
#[clap(long, env, global = true)]
pub disable_instance_data_endpoint: bool,
}

#[derive(Args, Debug, Clone)]
Expand Down
30 changes: 28 additions & 2 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::RwLock;

use crate::cli::{EdgeArgs, EdgeMode};
use crate::error::EdgeError;
use crate::feature_cache::FeatureCache;
Expand All @@ -7,14 +9,16 @@ use crate::filters::{
use crate::http::broadcaster::Broadcaster;
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::metrics::client_metrics::MetricsCache;
use crate::metrics::edge_metrics::EdgeInstanceData;
use crate::tokens::cache_key;
use crate::types::{
self, BatchMetricsRequestBody, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters,
};
use actix_web::web::{self, Data, Json, Query};
use actix_web::web::{self, post, Data, Json, Query};
use actix_web::Responder;
use actix_web::{get, post, HttpRequest, HttpResponse};
use dashmap::DashMap;
use tracing::{info, instrument};
use unleash_types::client_features::{ClientFeature, ClientFeatures};
use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia};

Expand Down Expand Up @@ -272,6 +276,27 @@ pub async fn post_bulk_metrics(
);
Ok(HttpResponse::Accepted().finish())
}

// Accepts instance data reported by a downstream Edge instance and queues it
// in the shared `connected_instances` list.
//
// Responds with 202 Accepted once the report has been queued. The `EdgeToken`
// argument is not read in the body; it is only taken as an extractor.
//
// Plain `//` comments are used on purpose so that the text is not picked up
// by the `utoipa::path` macro as part of the generated API description.
#[utoipa::path(context_path = "/api/client", responses((status = 202, description = "Accepted Instance data"), (status = 403, description = "Was not allowed to post instance data")), request_body = EdgeInstanceData, security(
("Authorization" = [])
)
)]
#[post("/metrics/edge")]
#[instrument(skip(_edge_token, instance_data, connected_instances))]
pub async fn post_edge_instance_data(
    _edge_token: EdgeToken,
    instance_data: Json<EdgeInstanceData>,
    connected_instances: Data<RwLock<Vec<EdgeInstanceData>>>,
) -> EdgeResult<HttpResponse> {
    let instance_data = instance_data.into_inner();
    // The full payload is only interesting when debugging; keep it out of the
    // info-level log, which is written on every request.
    tracing::debug!("Accepted {instance_data:?}");
    let pending = {
        // A poisoned lock only means another thread panicked while holding it;
        // the Vec itself is still usable, so recover instead of panicking here.
        let mut instances = connected_instances
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        instances.push(instance_data);
        instances.len()
    };
    // Log the size rather than Debug-formatting the whole accumulated list.
    info!("Queued downstream instance data ({pending} reports pending)");
    Ok(HttpResponse::Accepted().finish())
}

pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
let client_scope = web::scope("/client")
.wrap(crate::middleware::as_async_middleware::as_async_middleware(
Expand All @@ -282,7 +307,8 @@ pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
.service(register)
.service(metrics)
.service(post_bulk_metrics)
.service(stream_features);
.service(stream_features)
.service(post_edge_instance_data);

cfg.service(client_scope);
}
Expand Down
47 changes: 47 additions & 0 deletions server/src/http/instance_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::sync::{Arc, RwLock};

use prometheus::Registry;
use tracing::{debug, info, trace};

use crate::metrics::edge_metrics::EdgeInstanceData;

use super::refresher::feature_refresher::FeatureRefresher;

pub async fn send_instance_data(
feature_refresher: Arc<FeatureRefresher>,
prometheus_registry: &Registry,
our_instance_data: Arc<EdgeInstanceData>,
downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
) {
loop {
trace!("Looping instance data sending");
let mut observed_data = our_instance_data.observe(prometheus_registry);
{
let downstream_instance_data = downstream_instance_data.read().unwrap().clone();
for downstream in downstream_instance_data {
observed_data = observed_data.add_downstream(downstream);
}
}
{
downstream_instance_data.write().unwrap().clear();
}
let status = feature_refresher
.unleash_client
.send_instance_data(
observed_data,
&feature_refresher
.tokens_to_refresh
.iter()
.next()
.map(|t| t.value().clone())
.map(|t| t.token.token.clone())
.expect("No token to refresh, cowardly panic'ing"),
)
.await;
match status {
Ok(_) => info!("Posted instance data"),
Err(_) => info!("Failed to post instance data"),
}
tokio::time::sleep(std::time::Duration::from_secs(15)).await;
}
}
3 changes: 2 additions & 1 deletion server/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
pub mod background_send_metrics;
pub mod broadcaster;
pub(crate) mod headers;
pub mod unleash_client;
pub mod instance_data;
pub mod refresher;
pub mod unleash_client;
83 changes: 70 additions & 13 deletions server/src/http/unleash_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::http::headers::{
UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER,
};
use crate::metrics::client_metrics::MetricsBatch;
use crate::metrics::edge_metrics::EdgeInstanceData;
use crate::tls::build_upstream_certificate;
use crate::types::{
ClientFeaturesDeltaResponse, ClientFeaturesResponse, EdgeResult, EdgeToken,
Expand Down Expand Up @@ -55,6 +56,20 @@ lazy_static! {
vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0]
)
.unwrap();
// Histogram of client-metrics upload timings, labelled by `status_code`.
// Explicit bucket bounds run from 1.0 to 1000.0; the help text states the
// unit as milliseconds.
pub static ref METRICS_UPLOAD: HistogramVec = register_histogram_vec!(
"client_metrics_upload",
"Timings for uploading client metrics in milliseconds",
&["status_code"],
vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0]
)
.unwrap();
// Histogram of Edge instance-data upload timings, labelled by `status_code`.
// Explicit bucket bounds run from 1.0 to 1000.0; the help text states the
// unit as milliseconds.
pub static ref INSTANCE_DATA_UPLOAD: HistogramVec = register_histogram_vec!(
"instance_data_upload",
"Timings for uploading Edge instance data in milliseconds",
&["status_code"],
vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0]
)
.unwrap();
pub static ref CLIENT_FEATURE_FETCH_FAILURES: IntGaugeVec = register_int_gauge_vec!(
Opts::new(
"client_feature_fetch_failures",
Expand Down Expand Up @@ -243,7 +258,6 @@ impl UnleashClient {

#[cfg(test)]
pub fn new_insecure(server_url: &str) -> Result<Self, EdgeError> {

Ok(Self {
urls: UnleashUrls::from_str(server_url)?,
backing_client: new_reqwest_client(
Expand Down Expand Up @@ -518,6 +532,7 @@ impl UnleashClient {
token: &str,
) -> EdgeResult<()> {
trace!("Sending metrics to bulk endpoint");
let started_at = Utc::now();
let result = self
.backing_client
.post(self.urls.client_bulk_metrics_url.to_string())
Expand All @@ -529,6 +544,48 @@ impl UnleashClient {
info!("Failed to send metrics to /api/client/metrics/bulk endpoint {e:?}");
EdgeError::EdgeMetricsError
})?;
let ended = Utc::now();
METRICS_UPLOAD
.with_label_values(&[result.status().as_str()])
.observe(ended.signed_duration_since(started_at).num_milliseconds() as f64);
if result.status().is_success() {
Ok(())
} else {
match result.status() {
StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError(
result.status(),
result.json().await.ok(),
)),
_ => Err(EdgeMetricsRequestError(result.status(), None)),
}
}
}

pub async fn send_instance_data(
&self,
instance_data: EdgeInstanceData,
token: &str,
) -> EdgeResult<()> {
let started_at = Utc::now();
let result = self
.backing_client
.post(self.urls.edge_instance_data_url.to_string())
.headers(self.header_map(Some(token.into())))
.json(&instance_data)
.send()
.await
.map_err(|e| {
info!("Failed to send instance data: {e:?}");
EdgeError::EdgeMetricsError
})?;
let ended_at = Utc::now();
INSTANCE_DATA_UPLOAD
.with_label_values(&[result.status().as_str()])
.observe(
ended_at
.signed_duration_since(started_at)
.num_milliseconds() as f64,
);
if result.status().is_success() {
Ok(())
} else {
Expand Down Expand Up @@ -609,17 +666,6 @@ mod tests {
use std::path::PathBuf;
use std::str::FromStr;

use actix_http::{body::MessageBody, HttpService, TlsAcceptorConfig};
use actix_http_test::{test_server, TestServer};
use actix_middleware_etag::Etag;
use actix_service::map_config;
use actix_web::{
dev::{AppConfig, ServiceRequest, ServiceResponse},
http::header::EntityTag,
web, App, HttpResponse,
};
use chrono::Duration;
use unleash_types::client_features::{ClientFeature, ClientFeatures};
use crate::cli::ClientIdentity;
use crate::http::unleash_client::new_reqwest_client;
use crate::{
Expand All @@ -631,8 +677,19 @@ mod tests {
ValidateTokensRequest,
},
};
use actix_http::{body::MessageBody, HttpService, TlsAcceptorConfig};
use actix_http_test::{test_server, TestServer};
use actix_middleware_etag::Etag;
use actix_service::map_config;
use actix_web::{
dev::{AppConfig, ServiceRequest, ServiceResponse},
http::header::EntityTag,
web, App, HttpResponse,
};
use chrono::Duration;
use unleash_types::client_features::{ClientFeature, ClientFeatures};

use super::{EdgeTokens, UnleashClient, ClientMetaInformation};
use super::{ClientMetaInformation, EdgeTokens, UnleashClient};

impl ClientFeaturesRequest {
pub(crate) fn new(api_key: String, etag: Option<String>) -> Self {
Expand Down
34 changes: 28 additions & 6 deletions server/src/internal_backstage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ use serde::{Deserialize, Serialize};
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::ClientApplication;

use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::metrics::actix_web_metrics::PrometheusMetricsHandler;
use crate::metrics::client_metrics::MetricsCache;
use crate::types::{BuildInfo, EdgeJsonResult, EdgeToken, TokenInfo, TokenRefresh};
use crate::types::{ClientMetric, MetricsInfo, Status};
use crate::{auth::token_validator::TokenValidator, cli::InternalBackstageArgs};
use crate::{error::EdgeError, feature_cache::FeatureCache};
use crate::{
http::refresher::feature_refresher::FeatureRefresher, metrics::edge_metrics::EdgeInstanceData,
};

#[derive(Debug, Serialize, Deserialize)]
pub struct EdgeStatus {
Expand Down Expand Up @@ -146,24 +148,44 @@ pub async fn features(
Ok(Json(features))
}

/// JSON payload returned by the `/instancedata` internal-backstage endpoint.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DebugEdgeInstanceData {
/// Copy of the instance data registered for this Edge instance.
pub this_instance: EdgeInstanceData,
/// Snapshot of the instance data currently held in the shared downstream list.
pub connected_instances: Vec<EdgeInstanceData>,
}

/// Debug endpoint that reports this instance's data together with a snapshot
/// of the downstream instance data currently held in shared application state.
///
/// Both values are cloned out of their `web::Data` wrappers; the read lock on
/// the downstream list is only held while the snapshot is being cloned.
#[get("/instancedata")]
pub async fn instance_data(
    this_instance: web::Data<EdgeInstanceData>,
    downstream_instance_data: web::Data<std::sync::RwLock<Vec<EdgeInstanceData>>>,
) -> EdgeJsonResult<DebugEdgeInstanceData> {
    let own = this_instance.get_ref().clone();
    let downstream = downstream_instance_data.read().unwrap().clone();
    let payload = DebugEdgeInstanceData {
        this_instance: own,
        connected_instances: downstream,
    };
    Ok(Json(payload))
}

/// Registers the internal-backstage services on `cfg`.
///
/// `health`, `info` and `ready` are always registered. `tokens`, the
/// `/metrics` resource (served by `metrics_handler`), `metrics_batch`,
/// `features` and `instance_data` are each registered only when the matching
/// `disable_*_endpoint` flag on the args is false.
// NOTE(review): this span comes from a diff view and interleaves the lines
// before and after the `internal_backtage_args` -> `internal_backstage_args`
// rename; only one line of each pair belongs to the real function.
pub fn configure_internal_backstage(
cfg: &mut web::ServiceConfig,
metrics_handler: PrometheusMetricsHandler,
internal_backtage_args: InternalBackstageArgs,
internal_backstage_args: InternalBackstageArgs,
) {
cfg.service(health).service(info).service(ready);
if !internal_backtage_args.disable_tokens_endpoint {
if !internal_backstage_args.disable_tokens_endpoint {
cfg.service(tokens);
}
if !internal_backtage_args.disable_metrics_endpoint {
if !internal_backstage_args.disable_metrics_endpoint {
cfg.service(web::resource("/metrics").route(web::get().to(metrics_handler)));
}
if !internal_backtage_args.disable_metrics_batch_endpoint {
if !internal_backstage_args.disable_metrics_batch_endpoint {
cfg.service(metrics_batch);
}
if !internal_backtage_args.disable_features_endpoint {
if !internal_backstage_args.disable_features_endpoint {
cfg.service(features);
}
if !internal_backstage_args.disable_instance_data_endpoint {
cfg.service(instance_data);
}
}

#[cfg(test)]
Expand Down
20 changes: 16 additions & 4 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ use unleash_edge::{internal_backstage, tls};
#[cfg(not(tarpaulin_include))]
#[actix_web::main]
async fn main() -> Result<(), anyhow::Error> {
use std::sync::RwLock;

use unleash_edge::{
http::{broadcaster::Broadcaster, unleash_client::ClientMetaInformation},
metrics::metrics_pusher,
metrics::{edge_metrics::EdgeInstanceData, metrics_pusher},
};

let args = CliArgs::parse();
let disable_all_endpoint = args.disable_all_endpoint;
if args.markdown_help {
Expand Down Expand Up @@ -62,6 +63,8 @@ async fn main() -> Result<(), anyhow::Error> {
instance_id: args.clone().instance_id,
};
let app_name = args.app_name.clone();
let our_instance_data_for_app_context = Arc::new(EdgeInstanceData::new(&app_name));
let our_instance_data = our_instance_data_for_app_context.clone();
let instance_id = args.instance_id.clone();
let custom_headers = match args.mode {
cli::EdgeMode::Edge(ref edge) => edge.custom_client_headers.clone(),
Expand Down Expand Up @@ -89,6 +92,9 @@ async fn main() -> Result<(), anyhow::Error> {
let openapi = openapi::ApiDoc::openapi();
let refresher_for_app_data = feature_refresher.clone();
let prom_registry_for_write = metrics_handler.registry.clone();
let instances_observed_for_app_context: Arc<RwLock<Vec<EdgeInstanceData>>> =
Arc::new(RwLock::new(Vec::new()));
let downstream_instance_data = instances_observed_for_app_context.clone();

let broadcaster = Broadcaster::new(features_cache.clone());

Expand All @@ -111,7 +117,9 @@ async fn main() -> Result<(), anyhow::Error> {
.app_data(web::Data::from(token_cache.clone()))
.app_data(web::Data::from(features_cache.clone()))
.app_data(web::Data::from(engine_cache.clone()))
.app_data(web::Data::from(broadcaster.clone()));
.app_data(web::Data::from(broadcaster.clone()))
.app_data(web::Data::from(our_instance_data_for_app_context.clone()))
.app_data(web::Data::from(instances_observed_for_app_context.clone()));

app = match token_validator.clone() {
Some(v) => app.app_data(web::Data::from(v)),
Expand Down Expand Up @@ -173,6 +181,7 @@ async fn main() -> Result<(), anyhow::Error> {
let custom_headers = custom_headers.clone();
tokio::spawn(async move {
let _ = refresher_for_background
.clone()
.start_streaming_features_background_task(
ClientMetaInformation {
app_name,
Expand Down Expand Up @@ -209,9 +218,12 @@ async fn main() -> Result<(), anyhow::Error> {
_ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => {
tracing::info!("Token validator validation of startup tokens was unexpectedly shut down");
}
_ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => {
_ = metrics_pusher::prometheus_remote_write(prom_registry_for_write.clone(), edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => {
tracing::info!("Prometheus push unexpectedly shut down");
}
_ = unleash_edge::http::instance_data::send_instance_data(refresher.clone(), &prom_registry_for_write, our_instance_data.clone(), downstream_instance_data.clone()) => {
tracing::info!("Instance data pusher unexpectedly quit");
}
}
}
cli::EdgeMode::Offline(offline_args) if offline_args.reload_interval > 0 => {
Expand Down
Loading

0 comments on commit 6e19fe1

Please sign in to comment.