Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9ce17b5
initial logs
lalitb Oct 14, 2025
984acfc
fix
lalitb Oct 14, 2025
4a15d01
fix
lalitb Oct 14, 2025
d28885b
format
lalitb Oct 14, 2025
4d70368
fix
lalitb Oct 14, 2025
86d2ae0
fix
lalitb Oct 14, 2025
1156ece
fmt
lalitb Oct 14, 2025
956261d
Merge branch 'main' into add-geneva-logs
utpilla Oct 16, 2025
2daa2bd
Update opentelemetry-exporter-geneva/geneva-uploader/src/config_servi…
lalitb Oct 17, 2025
01fa7b4
Update opentelemetry-exporter-geneva/geneva-uploader/src/config_servi…
lalitb Oct 17, 2025
0a6ed0b
Update opentelemetry-exporter-geneva/geneva-uploader/src/config_servi…
lalitb Oct 17, 2025
5f880dc
Update opentelemetry-exporter-geneva/geneva-uploader/src/config_servi…
lalitb Oct 17, 2025
61019a8
Update opentelemetry-exporter-geneva/geneva-uploader/src/config_servi…
lalitb Oct 17, 2025
9d4ba4f
Update opentelemetry-exporter-geneva/geneva-uploader/src/config_servi…
lalitb Oct 17, 2025
b84ef98
Update opentelemetry-exporter-geneva/geneva-uploader/src/config_servi…
lalitb Oct 17, 2025
e6f094a
Update opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
lalitb Oct 17, 2025
86af6df
Update opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
lalitb Oct 17, 2025
e04d0a2
Update opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
lalitb Oct 17, 2025
be5fafa
Update opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
lalitb Oct 17, 2025
4370e3f
Update opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
lalitb Oct 17, 2025
bce88b7
Update opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
lalitb Oct 17, 2025
2220a2f
fix
lalitb Oct 17, 2025
d9e33f8
fix info/debug usage
lalitb Oct 17, 2025
6650e1f
fix the attribute order
lalitb Oct 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ lz4_flex = { version = "0.11", features = ["safe-encode"], default-features = fa
# Azure Identity dependencies - using public crates.io versions
azure_identity = "0.27.0"
azure_core = "0.27.0"
tracing = "0.1"

[features]
self_signed_certs = [] # Empty by default for security
Expand Down
101 changes: 91 additions & 10 deletions opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::payload_encoder::otlp_encoder::OtlpEncoder;
use opentelemetry_proto::tonic::logs::v1::ResourceLogs;
use opentelemetry_proto::tonic::trace::v1::ResourceSpans;
use std::sync::Arc;
use tracing::{debug, info};

/// Public batch type (already LZ4 chunked compressed).
/// Produced by `OtlpEncoder::encode_log_batch` and returned to callers.
Expand Down Expand Up @@ -44,13 +45,25 @@ pub struct GenevaClient {

impl GenevaClient {
pub fn new(cfg: GenevaClientConfig) -> Result<Self, String> {
info!(
name: "client.new",
target: "geneva-uploader",
"Initializing GenevaClient with endpoint={}, namespace={}, account={}",
cfg.endpoint, cfg.namespace, cfg.account
);

// Validate MSI resource presence for managed identity variants
match cfg.auth_method {
AuthMethod::SystemManagedIdentity
| AuthMethod::UserManagedIdentity { .. }
| AuthMethod::UserManagedIdentityByObjectId { .. }
| AuthMethod::UserManagedIdentityByResourceId { .. } => {
if cfg.msi_resource.is_none() {
debug!(
name: "client.new.validate_msi_resource",
target: "geneva-uploader",
"Validation failed: msi_resource must be provided for managed identity auth"
);
return Err(
"msi_resource must be provided for managed identity auth".to_string()
);
Expand All @@ -71,10 +84,15 @@ impl GenevaClient {
auth_method: cfg.auth_method,
msi_resource: cfg.msi_resource,
};
let config_client = Arc::new(
GenevaConfigClient::new(config_client_config)
.map_err(|e| format!("GenevaConfigClient init failed: {e}"))?,
);
let config_client =
Arc::new(GenevaConfigClient::new(config_client_config).map_err(|e| {
debug!(
name: "client.new.config_client_init",
target: "geneva-uploader",
"GenevaConfigClient init failed: {}", e
Copy link

Copilot AI Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The debug! macro should use structured logging format instead of string formatting. Use error = %e in the macro arguments and remove the {} placeholder from the message.

Copilot uses AI. Check for mistakes.
);
format!("GenevaConfigClient init failed: {e}")
})?);

let source_identity = format!(
"Tenant={}/Role={}/RoleInstance={}",
Expand All @@ -95,8 +113,21 @@ impl GenevaClient {
config_version: config_version.clone(),
};

let uploader = GenevaUploader::from_config_client(config_client, uploader_config)
.map_err(|e| format!("GenevaUploader init failed: {e}"))?;
let uploader =
GenevaUploader::from_config_client(config_client, uploader_config).map_err(|e| {
debug!(
name: "client.new.uploader_init",
target: "geneva-uploader",
"GenevaUploader init failed: {}", e
);
format!("GenevaUploader init failed: {e}")
})?;

info!(
name: "client.new.complete",
target: "geneva-uploader",
"GenevaClient initialized successfully"
);

Ok(Self {
uploader: Arc::new(uploader),
Expand All @@ -110,38 +141,88 @@ impl GenevaClient {
&self,
logs: &[ResourceLogs],
) -> Result<Vec<EncodedBatch>, String> {
debug!(
name: "client.encode_and_compress_logs",
target: "geneva-uploader",
"Encoding and compressing {} resource logs", logs.len()
);

let log_iter = logs
.iter()
.flat_map(|resource_log| resource_log.scope_logs.iter())
.flat_map(|scope_log| scope_log.log_records.iter());

self.encoder
.encode_log_batch(log_iter, &self.metadata)
.map_err(|e| format!("Compression failed: {e}"))
.map_err(|e| {
debug!(
name: "client.encode_and_compress_logs.error",
target: "geneva-uploader",
"Log compression failed: {}", e
);
format!("Compression failed: {e}")
})
}

/// Encode OTLP spans into LZ4 chunked compressed batches.
pub fn encode_and_compress_spans(
&self,
spans: &[ResourceSpans],
) -> Result<Vec<EncodedBatch>, String> {
debug!(
name: "client.encode_and_compress_spans",
target: "geneva-uploader",
"Encoding and compressing {} resource spans", spans.len()
);

let span_iter = spans
.iter()
.flat_map(|resource_span| resource_span.scope_spans.iter())
.flat_map(|scope_span| scope_span.spans.iter());

self.encoder
.encode_span_batch(span_iter, &self.metadata)
.map_err(|e| format!("Compression failed: {e}"))
.map_err(|e| {
debug!(
name: "client.encode_and_compress_spans.error",
target: "geneva-uploader",
"Span compression failed: {}", e
);
format!("Compression failed: {e}")
})
}

/// Upload a single compressed batch.
/// This allows for granular control over uploads, including custom retry logic for individual batches.
pub async fn upload_batch(&self, batch: &EncodedBatch) -> Result<(), String> {
debug!(
name: "client.upload_batch",
target: "geneva-uploader",
event_name = %batch.event_name,
size = batch.data.len(),
"Uploading batch"
);

self.uploader
.upload(batch.data.clone(), &batch.event_name, &batch.metadata)
.await
.map(|_| ())
.map_err(|e| format!("Geneva upload failed: {e} Event: {}", batch.event_name))
.map(|_| {
debug!(
name: "client.upload_batch.success",
target: "geneva-uploader",
event_name = %batch.event_name,
"Successfully uploaded batch"
);
})
.map_err(|e| {
debug!(
name: "client.upload_batch.error",
target: "geneva-uploader",
event_name = %batch.event_name,
error = %e,
"Geneva upload failed"
);
format!("Geneva upload failed: {e} Event: {}", batch.event_name)
})
}
}
Loading
Loading