diff --git a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml index 729745d2..989df797 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml +++ b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml @@ -28,6 +28,7 @@ lz4_flex = { version = "0.11", features = ["safe-encode"], default-features = fa # Azure Identity dependencies - using public crates.io versions azure_identity = "0.27.0" azure_core = "0.27.0" +tracing = "0.1" [features] self_signed_certs = [] # Empty by default for security diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs index 8994e06b..6a66e537 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs @@ -7,6 +7,7 @@ use crate::payload_encoder::otlp_encoder::OtlpEncoder; use opentelemetry_proto::tonic::logs::v1::ResourceLogs; use opentelemetry_proto::tonic::trace::v1::ResourceSpans; use std::sync::Arc; +use tracing::{debug, info}; /// Public batch type (already LZ4 chunked compressed). /// Produced by `OtlpEncoder::encode_log_batch` and returned to callers. @@ -44,6 +45,15 @@ pub struct GenevaClient { impl GenevaClient { pub fn new(cfg: GenevaClientConfig) -> Result { + info!( + name: "client.new", + target: "geneva-uploader", + endpoint = %cfg.endpoint, + namespace = %cfg.namespace, + account = %cfg.account, + "Initializing GenevaClient" + ); + // Validate MSI resource presence for managed identity variants match cfg.auth_method { AuthMethod::SystemManagedIdentity @@ -51,6 +61,11 @@ impl GenevaClient { | AuthMethod::UserManagedIdentityByObjectId { .. } | AuthMethod::UserManagedIdentityByResourceId { .. } => { if cfg.msi_resource.is_none() { + debug!( + name: "client.new.validate_msi_resource", + target: "geneva-uploader", + "Validation failed: msi_resource must be provided for managed identity auth" + ); return Err( "msi_resource must be provided for managed identity auth".to_string() ); @@ -71,10 +86,16 @@ impl GenevaClient { auth_method: cfg.auth_method, msi_resource: cfg.msi_resource, }; - let config_client = Arc::new( - GenevaConfigClient::new(config_client_config) - .map_err(|e| format!("GenevaConfigClient init failed: {e}"))?, - ); + let config_client = + Arc::new(GenevaConfigClient::new(config_client_config).map_err(|e| { + debug!( + name: "client.new.config_client_init", + target: "geneva-uploader", + error = %e, + "GenevaConfigClient init failed" + ); + format!("GenevaConfigClient init failed: {e}") + })?); let source_identity = format!( "Tenant={}/Role={}/RoleInstance={}", @@ -95,8 +116,22 @@ impl GenevaClient { config_version: config_version.clone(), }; - let uploader = GenevaUploader::from_config_client(config_client, uploader_config) - .map_err(|e| format!("GenevaUploader init failed: {e}"))?; + let uploader = + GenevaUploader::from_config_client(config_client, uploader_config).map_err(|e| { + debug!( + name: "client.new.uploader_init", + target: "geneva-uploader", + error = %e, + "GenevaUploader init failed" + ); + format!("GenevaUploader init failed: {e}") + })?; + + info!( + name: "client.new.complete", + target: "geneva-uploader", + "GenevaClient initialized successfully" + ); Ok(Self { uploader: Arc::new(uploader), @@ -110,6 +145,13 @@ impl GenevaClient { &self, logs: &[ResourceLogs], ) -> Result, String> { + debug!( + name: "client.encode_and_compress_logs", + target: "geneva-uploader", + resource_logs_count = logs.len(), + "Encoding and compressing resource logs" + ); + let log_iter = logs .iter() .flat_map(|resource_log| resource_log.scope_logs.iter()) @@ -117,7 +159,15 @@ impl GenevaClient { self.encoder .encode_log_batch(log_iter, &self.metadata) - .map_err(|e| format!("Compression failed: {e}")) + .map_err(|e| { + debug!( + name: "client.encode_and_compress_logs.error", + target: "geneva-uploader", + error = %e, + "Log compression failed" + ); + format!("Compression failed: {e}") + }) } /// Encode OTLP spans into LZ4 chunked compressed batches. @@ -125,6 +175,13 @@ impl GenevaClient { &self, spans: &[ResourceSpans], ) -> Result, String> { + debug!( + name: "client.encode_and_compress_spans", + target: "geneva-uploader", + resource_spans_count = spans.len(), + "Encoding and compressing resource spans" + ); + let span_iter = spans .iter() .flat_map(|resource_span| resource_span.scope_spans.iter()) @@ -132,16 +189,48 @@ impl GenevaClient { self.encoder .encode_span_batch(span_iter, &self.metadata) - .map_err(|e| format!("Compression failed: {e}")) + .map_err(|e| { + debug!( + name: "client.encode_and_compress_spans.error", + target: "geneva-uploader", + error = %e, + "Span compression failed" + ); + format!("Compression failed: {e}") + }) } /// Upload a single compressed batch. /// This allows for granular control over uploads, including custom retry logic for individual batches. pub async fn upload_batch(&self, batch: &EncodedBatch) -> Result<(), String> { + debug!( + name: "client.upload_batch", + target: "geneva-uploader", + event_name = %batch.event_name, + size = batch.data.len(), + "Uploading batch" + ); + self.uploader .upload(batch.data.clone(), &batch.event_name, &batch.metadata) .await - .map(|_| ()) - .map_err(|e| format!("Geneva upload failed: {e} Event: {}", batch.event_name)) + .map(|_| { + debug!( + name: "client.upload_batch.success", + target: "geneva-uploader", + event_name = %batch.event_name, + "Successfully uploaded batch" + ); + }) + .map_err(|e| { + debug!( + name: "client.upload_batch.error", + target: "geneva-uploader", + event_name = %batch.event_name, + error = %e, + "Geneva upload failed" + ); + format!("Geneva upload failed: {e} Event: {}", batch.event_name) + }) } } diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs index 55dd2124..474f9930 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs @@ -8,6 +8,7 @@ use reqwest::{ use serde::Deserialize; use std::time::Duration; use thiserror::Error; +use tracing::{debug, info}; use uuid::Uuid; use chrono::{DateTime, Utc}; @@ -252,6 +253,15 @@ impl GenevaConfigClient { /// * `GenevaConfigClientError::AuthMethodNotImplemented` - If the specified authentication method is not yet supported #[allow(dead_code)] pub(crate) fn new(config: GenevaConfigClientConfig) -> Result { + info!( + name: "config_client.new", + target: "geneva-uploader", + endpoint = %config.endpoint, + account = %config.account, + namespace = %config.namespace, + "Initializing GenevaConfigClient" + ); + let agent_identity = "GenevaUploader"; let agent_version = "0.1"; @@ -264,30 +274,73 @@ impl GenevaConfigClient { // TODO: Certificate auth would be removed in favor of managed identity., // This is for testing, so we can use self-signed certs, and password in plain text. AuthMethod::Certificate { path, password } => { + info!( + name: "config_client.new.certificate_auth", + target: "geneva-uploader", + "Using Certificate authentication" + ); // Read the PKCS#12 file - let p12_bytes = fs::read(path) - .map_err(|e| GenevaConfigClientError::Certificate(e.to_string()))?; - let identity = Identity::from_pkcs12(&p12_bytes, password) - .map_err(|e| GenevaConfigClientError::Certificate(e.to_string()))?; + let p12_bytes = fs::read(path).map_err(|e| { + debug!( + name: "config_client.new.certificate_read_error", + target: "geneva-uploader", + error = %e, + "Failed to read certificate file" + ); + GenevaConfigClientError::Certificate(e.to_string()) + })?; + let identity = Identity::from_pkcs12(&p12_bytes, password).map_err(|e| { + debug!( + name: "config_client.new.certificate_parse_error", + target: "geneva-uploader", + error = %e, + "Failed to parse PKCS#12 certificate" + ); + GenevaConfigClientError::Certificate(e.to_string()) + })?; //TODO - use use_native_tls instead of preconfigured_tls once we no longer need self-signed certs // and TLS 1.2 as the exclusive protocol. let tls_connector = configure_tls_connector(native_tls::TlsConnector::builder(), identity) .build() - .map_err(|e| GenevaConfigClientError::Certificate(e.to_string()))?; + .map_err(|e| { + debug!( + name: "config_client.new.tls_connector_error", + target: "geneva-uploader", + error = %e, + "Failed to build TLS connector" + ); + GenevaConfigClientError::Certificate(e.to_string()) + })?; client_builder = client_builder.use_preconfigured_tls(tls_connector); } AuthMethod::WorkloadIdentity { .. } => { + info!( + name: "config_client.new.workload_identity_auth", + target: "geneva-uploader", + "Using Workload Identity authentication" + ); // No special HTTP client configuration needed for Workload Identity // Authentication is done via Bearer token in request headers } AuthMethod::SystemManagedIdentity | AuthMethod::UserManagedIdentity { .. } | AuthMethod::UserManagedIdentityByObjectId { .. } - | AuthMethod::UserManagedIdentityByResourceId { .. } => { /* no special HTTP client changes needed */ + | AuthMethod::UserManagedIdentityByResourceId { .. } => { + info!( + name: "config_client.new.managed_identity_auth", + target: "geneva-uploader", + "Using Managed Identity authentication" + ); + /* no special HTTP client changes needed */ } #[cfg(feature = "mock_auth")] AuthMethod::MockAuth => { + debug!( + name: "config_client.new.mock_auth_warning", + target: "geneva-uploader", + "WARNING: Using MockAuth for GenevaConfigClient. This should only be used in tests!" + ); // Mock authentication for testing purposes, no actual auth needed // Just use the default client builder eprintln!("WARNING: Using MockAuth for GenevaConfigClient. This should only be used in tests!"); @@ -361,14 +414,26 @@ impl GenevaConfigClient { /// - AZURE_CLIENT_ID and AZURE_TENANT_ID must be set explicitly in the pod spec /// - AZURE_FEDERATED_TOKEN_FILE is auto-injected by the workload identity webhook async fn get_workload_identity_token(&self) -> Result { - let resource = - match &self.config.auth_method { - AuthMethod::WorkloadIdentity { resource } => resource, - _ => return Err(GenevaConfigClientError::WorkloadIdentityAuth( + debug!( + name: "config_client.get_workload_identity_token", + target: "geneva-uploader", + "Acquiring Workload Identity token" + ); + + let resource = match &self.config.auth_method { + AuthMethod::WorkloadIdentity { resource } => resource, + _ => { + debug!( + name: "config_client.get_workload_identity_token.invalid_auth_method", + target: "geneva-uploader", + "get_workload_identity_token called but auth method is not WorkloadIdentity" + ); + return Err(GenevaConfigClientError::WorkloadIdentityAuth( "get_workload_identity_token called but auth method is not WorkloadIdentity" .to_string(), - )), - }; + )); + } + }; // TODO: Extract scope generation logic into helper function shared with get_msi_token() let base = resource.trim_end_matches("/.default").trim_end_matches('/'); @@ -382,6 +447,12 @@ impl GenevaConfigClient { // Pass None to let azure_identity crate read AZURE_CLIENT_ID, AZURE_TENANT_ID, // and AZURE_FEDERATED_TOKEN_FILE from environment variables automatically let credential = WorkloadIdentityCredential::new(None).map_err(|e| { + debug!( + name: "config_client.get_workload_identity_token.create_credential_error", + target: "geneva-uploader", + error = %e, + "Failed to create WorkloadIdentityCredential" + ); GenevaConfigClientError::WorkloadIdentityAuth(format!( "Failed to create WorkloadIdentityCredential. Ensure AZURE_CLIENT_ID, AZURE_TENANT_ID, and AZURE_FEDERATED_TOKEN_FILE environment variables are set: {e}" )) @@ -391,12 +462,26 @@ impl GenevaConfigClient { for scope in &scope_candidates { //TODO - It looks like the get_token API accepts a slice of &str match credential.get_token(&[scope.as_str()], None).await { - Ok(token) => return Ok(token.token.secret().to_string()), + Ok(token) => { + info!( + name: "config_client.get_workload_identity_token.success", + target: "geneva-uploader", + "Successfully acquired Workload Identity token" + ); + return Ok(token.token.secret().to_string()); + } Err(e) => last_err = Some(e.to_string()), } } let detail = last_err.unwrap_or_else(|| "no error detail".into()); + debug!( + name: "config_client.get_workload_identity_token.failed", + target: "geneva-uploader", + scopes = %scope_candidates.join(", "), + error = %detail, + "Workload Identity token acquisition failed" + ); Err(GenevaConfigClientError::WorkloadIdentityAuth(format!( "Workload Identity token acquisition failed. Scopes tried: {scopes}. Last error: {detail}", scopes = scope_candidates.join(", ") @@ -405,7 +490,18 @@ impl GenevaConfigClient { /// Get MSI token for GCS authentication async fn get_msi_token(&self) -> Result { + debug!( + name: "config_client.get_msi_token", + target: "geneva-uploader", + "Acquiring Managed Identity token" + ); + let resource = self.config.msi_resource.as_ref().ok_or_else(|| { + debug!( + name: "config_client.get_msi_token.missing_msi_resource", + target: "geneva-uploader", + "msi_resource not set in config (required for Managed Identity auth)" + ); GenevaConfigClientError::MsiAuth( "msi_resource not set in config (required for Managed Identity auth)".to_string(), ) @@ -444,18 +540,38 @@ impl GenevaConfigClient { ..Default::default() }; let credential = ManagedIdentityCredential::new(Some(options)).map_err(|e| { + debug!( + name: "config_client.get_msi_token.create_credential_error", + target: "geneva-uploader", + error = %e, + "Failed to create MSI credential" + ); GenevaConfigClientError::MsiAuth(format!("Failed to create MSI credential: {e}")) })?; let mut last_err: Option = None; for scope in &scope_candidates { match credential.get_token(&[scope.as_str()], None).await { - Ok(token) => return Ok(token.token.secret().to_string()), + Ok(token) => { + info!( + name: "config_client.get_msi_token.success", + target: "geneva-uploader", + "Successfully acquired Managed Identity token" + ); + return Ok(token.token.secret().to_string()); + } Err(e) => last_err = Some(e.to_string()), } } let detail = last_err.unwrap_or_else(|| "no error detail".into()); + debug!( + name: "config_client.get_msi_token.failed", + target: "geneva-uploader", + scopes = %scope_candidates.join(", "), + error = %detail, + "Managed Identity token acquisition failed" + ); Err(GenevaConfigClientError::MsiAuth(format!( "Managed Identity token acquisition failed. Scopes tried: {scopes}. Last error: {detail}. IMDS fallback intentionally disabled.", scopes = scope_candidates.join(", ") @@ -509,16 +625,34 @@ impl GenevaConfigClient { pub(crate) async fn get_ingestion_info( &self, ) -> Result<(IngestionGatewayInfo, MonikerInfo, String)> { + debug!( + name: "config_client.get_ingestion_info", + target: "geneva-uploader", + "Getting ingestion info (checking cache first)" + ); + // First, try to read from cache (shared read access) if let Ok(guard) = self.cached_data.read() { if let Some(cached_data) = guard.as_ref() { let expiry = cached_data.token_expiry; if expiry > Utc::now() + chrono::Duration::minutes(5) { + debug!( + name: "config_client.get_ingestion_info.cache_hit", + target: "geneva-uploader", + expiry = %expiry, + "Using cached ingestion info" + ); return Ok(( cached_data.auth_info.0.clone(), cached_data.auth_info.1.clone(), cached_data.token_endpoint.clone(), )); + } else { + debug!( + name: "config_client.get_ingestion_info.cache_expired", + target: "geneva-uploader", + "Cached token expired or expiring soon, fetching fresh data" + ); } } } @@ -579,14 +713,33 @@ impl GenevaConfigClient { /// Internal method that actually fetches data from Geneva Config Service async fn fetch_ingestion_info(&self) -> Result<(IngestionGatewayInfo, MonikerInfo)> { + info!( + name: "config_client.fetch_ingestion_info", + target: "geneva-uploader", + "Fetching fresh ingestion info from Geneva Config Service" + ); + let tag_id = Uuid::new_v4().to_string(); // TODO: consider cheaper counter if perf-critical let mut url = String::with_capacity(self.precomputed_url_prefix.len() + 50); write!(&mut url, "{}&TagId={tag_id}", self.precomputed_url_prefix).map_err(|e| { + debug!( + name: "config_client.fetch_ingestion_info.write_url_error", + target: "geneva-uploader", + error = %e, + "Failed to write URL" + ); GenevaConfigClientError::InternalError(format!("Failed to write URL: {e}")) })?; let req_id = Uuid::new_v4().to_string(); + debug!( + name: "config_client.fetch_ingestion_info.request", + target: "geneva-uploader", + request_id = %req_id, + "Sending config request with request_id" + ); + let mut request = self.http_client.get(&url); request = request.header("x-ms-client-request-id", req_id); @@ -612,31 +765,77 @@ impl GenevaConfigClient { // Send HTTP request let response = match request.send().await { Ok(resp) => resp, - Err(e) => return Err(GenevaConfigClientError::Http(e)), + Err(e) => { + debug!( + name: "config_client.fetch_ingestion_info.http_error", + target: "geneva-uploader", + error = %e, + "Config service HTTP request failed" + ); + return Err(GenevaConfigClientError::Http(e)); + } }; let status = response.status(); let body = response.text().await?; if status.is_success() { - let parsed = serde_json::from_str::(&body).map_err(|e| { - GenevaConfigClientError::AuthInfoNotFound(format!("Failed to parse response: {e}")) - })?; + debug!( + name: "config_client.fetch_ingestion_info.response", + target: "geneva-uploader", + "Config service returned success status" + ); + + let parsed: GenevaResponse = match serde_json::from_str::(&body) { + Ok(p) => p, + Err(e) => { + debug!( + name: "config_client.fetch_ingestion_info.parse_error", + target: "geneva-uploader", + error = %e, + "Failed to parse config service response" + ); + return Err(GenevaConfigClientError::AuthInfoNotFound(format!( + "Failed to parse response: {e}" + ))); + } + }; for account in parsed.storage_account_keys { if account.is_primary_moniker && account.account_moniker_name.contains("diag") { + // Move (not clone) the strings out of the StorageAccountKey; no extra allocation + let account_moniker_name = account.account_moniker_name; + let account_group_name = account.account_group_name; let moniker_info = MonikerInfo { - name: account.account_moniker_name, - account_group: account.account_group_name, + name: account_moniker_name, + account_group: account_group_name, }; + info!( + name: "config_client.fetch_ingestion_info.success", + target: "geneva-uploader", + moniker = %moniker_info.name, + "Successfully retrieved ingestion info" + ); return Ok((parsed.ingestion_gateway_info, moniker_info)); } } + debug!( + name: "config_client.fetch_ingestion_info.no_diag_moniker", + target: "geneva-uploader", + "No primary diag moniker found in storage accounts" + ); Err(GenevaConfigClientError::MonikerNotFound( "No primary diag moniker found in storage accounts".to_string(), )) } else { + debug!( + name: "config_client.fetch_ingestion_info.error_status", + target: "geneva-uploader", + status = status.as_u16(), + body = %body, + "Config service returned error" + ); Err(GenevaConfigClientError::RequestFailed { status: status.as_u16(), message: body, diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs index 141fbf63..e530a63f 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs @@ -9,6 +9,7 @@ use std::fmt::Write; use std::sync::Arc; use std::time::Duration; use thiserror::Error; +use tracing::debug; use url::form_urlencoded::byte_serialize; use uuid::Uuid; @@ -214,6 +215,14 @@ impl GenevaUploader { event_name: &str, metadata: &BatchMetadata, ) -> Result { + debug!( + name: "uploader.upload", + target: "geneva-uploader", + event_name = %event_name, + size = data.len(), + "Starting upload" + ); + // Always get fresh auth info let (auth_info, moniker_info, monitoring_endpoint) = self.config_client.get_ingestion_info().await?; @@ -230,6 +239,15 @@ impl GenevaUploader { auth_info.endpoint.trim_end_matches('/'), upload_uri ); + + debug!( + name: "uploader.upload.post", + target: "geneva-uploader", + event_name = %event_name, + moniker = %moniker_info.name, + "Posting to ingestion gateway" + ); + // Send the upload request let response = self .http_client @@ -245,11 +263,34 @@ impl GenevaUploader { let body = response.text().await?; if status == reqwest::StatusCode::ACCEPTED { - let ingest_response: IngestionResponse = - serde_json::from_str(&body).map_err(GenevaUploaderError::SerdeJson)?; + let ingest_response: IngestionResponse = serde_json::from_str(&body).map_err(|e| { + debug!( + name: "uploader.upload.parse_error", + target: "geneva-uploader", + error = %e, + "Failed to parse ingestion response" + ); + GenevaUploaderError::SerdeJson(e) + })?; + + debug!( + name: "uploader.upload.success", + target: "geneva-uploader", + event_name = %event_name, + ticket = %ingest_response.ticket, + "Upload successful" + ); Ok(ingest_response) } else { + debug!( + name: "uploader.upload.failed", + target: "geneva-uploader", + event_name = %event_name, + status = status.as_u16(), + body = %body, + "Upload failed" + ); Err(GenevaUploaderError::UploadFailed { status: status.as_u16(), message: body, diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs index f783efee..af07c86f 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs @@ -10,6 +10,7 @@ use opentelemetry_proto::tonic::logs::v1::LogRecord; use opentelemetry_proto::tonic::trace::v1::Span; use std::borrow::Cow; use std::sync::Arc; +use tracing::debug; const FIELD_ENV_NAME: &str = "env_name"; const FIELD_ENV_VER: &str = "env_ver"; @@ -152,6 +153,9 @@ impl OtlpEncoder { let schema_ids_string = batch_data.format_schema_ids(); batch_data.metadata.schema_ids = schema_ids_string; + let schemas_count = batch_data.schemas.len(); + let events_count = batch_data.events.len(); + let blob = CentralBlob { version: 1, format: 2, @@ -160,8 +164,28 @@ impl OtlpEncoder { events: batch_data.events, }; let uncompressed = blob.to_bytes(); - let compressed = lz4_chunked_compression(&uncompressed) - .map_err(|e| format!("compression failed: {e}"))?; + let compressed = lz4_chunked_compression(&uncompressed).map_err(|e| { + debug!( + name: "encoder.encode_log_batch.compress_error", + target: "geneva-uploader", + event_name = %batch_event_name, + error = %e, + "LZ4 compression failed" + ); + format!("compression failed: {e}") + })?; + + debug!( + name: "encoder.encode_log_batch", + target: "geneva-uploader", + event_name = %batch_event_name, + schemas = schemas_count, + events = events_count, + uncompressed_size = uncompressed.len(), + compressed_size = compressed.len(), + "Encoded log batch" + ); + blobs.push(EncodedBatch { event_name: batch_event_name.to_string(), data: compressed, @@ -266,6 +290,9 @@ impl OtlpEncoder { schema_ids: schema_ids_string, }; + let schemas_count = schemas.len(); + let events_count = events.len(); + let blob = CentralBlob { version: 1, format: 2, @@ -275,8 +302,26 @@ impl OtlpEncoder { }; let uncompressed = blob.to_bytes(); - let compressed = lz4_chunked_compression(&uncompressed) - .map_err(|e| format!("compression failed: {e}"))?; + let compressed = lz4_chunked_compression(&uncompressed).map_err(|e| { + debug!( + name: "encoder.encode_span_batch.compress_error", + target: "geneva-uploader", + error = %e, + "LZ4 compression failed for spans" + ); + format!("compression failed: {e}") + })?; + + debug!( + name: "encoder.encode_span_batch", + target: "geneva-uploader", + event_name = EVENT_NAME, + schemas = schemas_count, + spans = events_count, + uncompressed_size = uncompressed.len(), + compressed_size = compressed.len(), + "Encoded span batch" + ); Ok(vec![EncodedBatch { event_name: EVENT_NAME.to_string(), diff --git a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs index b2ae12ef..6763edb4 100644 --- a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs +++ b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs @@ -1,4 +1,30 @@ //! run with `$ cargo run --example basic +//! +//! # Geneva Uploader Internal Logs +//! +//! By default, this example enables DEBUG level logs for geneva-uploader, showing all internal +//! operations including initialization, auth, encoding, compression, and uploads. +//! +//! ## Default behavior (no RUST_LOG needed) +//! ```bash +//! cargo run --example basic +//! ``` +//! This shows DEBUG level logs from geneva-uploader. +//! +//! ## Override to INFO level (initialization, auth token acquisition, GCS config only) +//! ```bash +//! RUST_LOG=geneva-uploader=info cargo run --example basic +//! ``` +//! +//! ## Disable geneva-uploader logs +//! ```bash +//! RUST_LOG=geneva-uploader=off cargo run --example basic +//! ``` +//! +//! ## Filter out noisy dependencies while keeping geneva-uploader at DEBUG +//! ```bash +//! RUST_LOG=hyper=off,reqwest=off cargo run --example basic +//! ``` use geneva_uploader::client::{GenevaClient, GenevaClientConfig}; use geneva_uploader::AuthMethod; @@ -102,13 +128,32 @@ async fn main() { .add_directive("reqwest=off".parse().unwrap()); let otel_layer = layer::OpenTelemetryTracingBridge::new(&provider).with_filter(filter_otel); - // Create a new tracing::Fmt layer to print the logs to stdout. It has a - // default filter of `info` level and above, and `debug` and above for logs - // from OpenTelemetry crates. The filter levels can be customized as needed. - let filter_fmt = EnvFilter::new("info") + // Create a new tracing::Fmt layer to print the logs to stdout. + // Default filter: info level for most logs, debug level for opentelemetry, hyper, reqwest, and geneva-uploader. + // Users can override these defaults with RUST_LOG (later directives override earlier ones). + // Examples: + // cargo run --example basic # Uses defaults (geneva-uploader=debug) + // RUST_LOG=geneva-uploader=info cargo run --example basic # Override to info level + // RUST_LOG=geneva-uploader=off cargo run --example basic # Disable geneva-uploader logs + // RUST_LOG=hyper=off,reqwest=off cargo run --example basic # Quiet noisy deps, keep geneva-uploader=debug + let mut filter_fmt = EnvFilter::new("info") + .add_directive("opentelemetry=debug".parse().unwrap()) .add_directive("hyper=debug".parse().unwrap()) .add_directive("reqwest=debug".parse().unwrap()) - .add_directive("opentelemetry=debug".parse().unwrap()); + .add_directive("geneva-uploader=debug".parse().unwrap()); + + if let Ok(spec) = std::env::var("RUST_LOG") { + for part in spec.split(',') { + let p = part.trim(); + if p.is_empty() { + continue; + } + if let Ok(d) = p.parse() { + filter_fmt = filter_fmt.add_directive(d); + } + } + } + let fmt_layer = tracing_subscriber::fmt::layer() .with_thread_names(true) .with_filter(filter_fmt); diff --git a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic_msi_test.rs b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic_msi_test.rs index 9012f691..4d296317 100644 --- a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic_msi_test.rs +++ b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic_msi_test.rs @@ -151,7 +151,8 @@ async fn main() { let filter_fmt = EnvFilter::new("info") .add_directive("hyper=debug".parse().unwrap()) .add_directive("reqwest=debug".parse().unwrap()) - .add_directive("opentelemetry=debug".parse().unwrap()); + .add_directive("opentelemetry=debug".parse().unwrap()) + .add_directive("geneva-uploader=debug".parse().unwrap()); let fmt_layer = tracing_subscriber::fmt::layer() .with_thread_names(true) .with_filter(filter_fmt); diff --git a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic_workload_identity_test.rs b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic_workload_identity_test.rs index 6777cfc4..6c4ec918 100644 --- a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic_workload_identity_test.rs +++ b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic_workload_identity_test.rs @@ -130,7 +130,8 @@ async fn main() { let filter_fmt = EnvFilter::new("info") .add_directive("hyper=debug".parse().unwrap()) .add_directive("reqwest=debug".parse().unwrap()) - .add_directive("opentelemetry=debug".parse().unwrap()); + .add_directive("opentelemetry=debug".parse().unwrap()) + .add_directive("geneva-uploader=debug".parse().unwrap()); let fmt_layer = tracing_subscriber::fmt::layer() .with_thread_names(true) .with_filter(filter_fmt);