diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt index 33f9216f8b..ba0f5feaf0 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt @@ -1,3 +1,15 @@ +Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions +Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions.ConfluentKafkaInstrumentedConsumerBuilderOptions() -> void +Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions.EnableMetrics.get -> bool +Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions.EnableMetrics.set -> void +Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions.EnableTraces.get -> bool +Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions.EnableTraces.set -> void +Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions +Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions.ConfluentKafkaInstrumentedProducerBuilderOptions() -> void +Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions.EnableMetrics.get -> bool +Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions.EnableMetrics.set -> void +Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions.EnableTraces.get -> bool +Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions.EnableTraces.set -> void Confluent.Kafka.InstrumentedConsumerBuilder Confluent.Kafka.InstrumentedConsumerBuilder.InstrumentedConsumerBuilder(System.Collections.Generic.IEnumerable>! config) -> void Confluent.Kafka.InstrumentedProducerBuilder @@ -11,10 +23,12 @@ OpenTelemetry.Trace.TracerProviderBuilderExtensions override Confluent.Kafka.InstrumentedConsumerBuilder.Build() -> Confluent.Kafka.IConsumer! override Confluent.Kafka.InstrumentedProducerBuilder.Build() -> Confluent.Kafka.IProducer! static Confluent.Kafka.OpenTelemetryConsumerBuilderExtensions.AsInstrumentedConsumerBuilder(this Confluent.Kafka.ConsumerBuilder! consumerBuilder) -> Confluent.Kafka.InstrumentedConsumerBuilder! +static Confluent.Kafka.OpenTelemetryConsumerBuilderExtensions.AsInstrumentedConsumerBuilder(this Confluent.Kafka.ConsumerBuilder! consumerBuilder, Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions? options) -> Confluent.Kafka.InstrumentedConsumerBuilder! static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.ConsumeAndProcessMessageAsync(this Confluent.Kafka.IConsumer! consumer, Confluent.Kafka.OpenTelemetryConsumeAndProcessMessageHandler! handler) -> System.Threading.Tasks.ValueTask?> static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.ConsumeAndProcessMessageAsync(this Confluent.Kafka.IConsumer! consumer, Confluent.Kafka.OpenTelemetryConsumeAndProcessMessageHandler! handler, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask?> static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.TryExtractPropagationContext(this Confluent.Kafka.ConsumeResult! consumeResult, out OpenTelemetry.Context.Propagation.PropagationContext propagationContext) -> bool static Confluent.Kafka.OpenTelemetryProducerBuilderExtensions.AsInstrumentedProducerBuilder(this Confluent.Kafka.ProducerBuilder! producerBuilder) -> Confluent.Kafka.InstrumentedProducerBuilder! +static Confluent.Kafka.OpenTelemetryProducerBuilderExtensions.AsInstrumentedProducerBuilder(this Confluent.Kafka.ProducerBuilder! producerBuilder, Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions? options) -> Confluent.Kafka.InstrumentedProducerBuilder! static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder! static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, Confluent.Kafka.InstrumentedConsumerBuilder! consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder! static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name) -> OpenTelemetry.Metrics.MeterProviderBuilder! diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md b/src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md index 538cfcfb55..e72b56c8b8 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md @@ -5,6 +5,10 @@ * Updated OpenTelemetry core component version(s) to `1.16.0`. ([#4487](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/4487)) +* Supported manual configuration of traces and metrics for Kafka consumers + and producers created outside a DI container. + ([#4545](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/4545)) + ## 0.1.0-alpha.7 Released 2026-May-29 diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentedConsumerBuilderOptions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentedConsumerBuilderOptions.cs new file mode 100644 index 0000000000..cb696f4c0c --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentedConsumerBuilderOptions.cs @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace Confluent.Kafka; + +/// +/// Options for configuring telemetry on a +/// when creating an instrumented producer in code. +/// +public sealed class ConfluentKafkaInstrumentedConsumerBuilderOptions +{ + /// + /// Gets or sets a value indicating whether metrics should be enabled for the consumer. + /// + public bool EnableMetrics { get; set; } + + /// + /// Gets or sets a value indicating whether tracing should be enabled for the consumer. + /// + public bool EnableTraces { get; set; } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentedProducerBuilderOptions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentedProducerBuilderOptions.cs new file mode 100644 index 0000000000..c68b2d1ba0 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentedProducerBuilderOptions.cs @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace Confluent.Kafka; + +/// +/// Options for configuring telemetry on a +/// when creating an instrumented producer in code. +/// +public sealed class ConfluentKafkaInstrumentedProducerBuilderOptions +{ + /// + /// Gets or sets a value indicating whether metrics should be enabled for the producer. + /// + public bool EnableMetrics { get; set; } + + /// + /// Gets or sets a value indicating whether tracing should be enabled for the producer. + /// + public bool EnableTraces { get; set; } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumerBuilderExtensions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumerBuilderExtensions.cs index e26de22b62..33e665ebcd 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumerBuilderExtensions.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumerBuilderExtensions.cs @@ -10,6 +10,32 @@ namespace Confluent.Kafka; /// public static class OpenTelemetryConsumerBuilderExtensions { + /// + /// Converts a to an + /// with explicitly configured telemetry options. + /// + /// Type of the key. + /// Type of the value. + /// The instance. + /// The optional to use. + /// An instance. +#if NET + [System.Diagnostics.CodeAnalysis.RequiresUnreferencedCode("Use 'InstrumentedConsumerBuilder' constructor to avoid reflection.")] +#endif + public static InstrumentedConsumerBuilder AsInstrumentedConsumerBuilder( + this ConsumerBuilder consumerBuilder, + ConfluentKafkaInstrumentedConsumerBuilderOptions? options) + { + var instrumentedConsumerBuilder = consumerBuilder.AsInstrumentedConsumerBuilder(); + if (options != null) + { + instrumentedConsumerBuilder.EnableMetrics = options.EnableMetrics; + instrumentedConsumerBuilder.EnableTraces = options.EnableTraces; + } + + return instrumentedConsumerBuilder; + } + /// /// Converts a to an . /// diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryProducerBuilderExtensions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryProducerBuilderExtensions.cs index f2610c63ed..92d051e18a 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryProducerBuilderExtensions.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryProducerBuilderExtensions.cs @@ -10,6 +10,32 @@ namespace Confluent.Kafka; /// public static class OpenTelemetryProducerBuilderExtensions { + /// + /// Converts to + /// with explicitly configured telemetry options. + /// + /// Type of the key. + /// Type of the value. + /// The instance. + /// The to use. + /// An instance. +#if NET + [System.Diagnostics.CodeAnalysis.RequiresUnreferencedCode("Use 'InstrumentedProducerBuilder' constructor to avoid reflection.")] +#endif + public static InstrumentedProducerBuilder AsInstrumentedProducerBuilder( + this ProducerBuilder producerBuilder, + ConfluentKafkaInstrumentedProducerBuilderOptions? options) + { + var instrumentedProducerBuilder = producerBuilder.AsInstrumentedProducerBuilder(); + if (options != null) + { + instrumentedProducerBuilder.EnableMetrics = options.EnableMetrics; + instrumentedProducerBuilder.EnableTraces = options.EnableTraces; + } + + return instrumentedProducerBuilder; + } + /// /// Converts to . /// diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md b/src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md index a21100e67e..e3b4d7fe63 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md @@ -89,9 +89,22 @@ configured exporters. To extend an already built `ConsumerBuilder` or `ProducerBuilder` -instance with OpenTelemetry instrumentation, you can use the `AsInstrumentedConsumerBuilder` +instance with OpenTelemetry instrumentation, you can use the +`AsInstrumentedConsumerBuilder` and `AsInstrumentedProducerBuilder` extension methods. +> [!IMPORTANT] +> When you create dynamic producers or consumers outside a DI container, +> OpenTelemetry instrumentation (metrics and traces) is disabled by default. +> You must explicitly pass configuration options to enable it. +> If you do not use the standard DI registration methods (such as +`.AddKafkaProducerInstrumentation()` +> or `.AddKafkaConsumerInstrumentation()`), you must also manually call +`.AddSource("OpenTelemetry.Instrumentation.ConfluentKafka")` on your +> TracerProviderBuilder +> and `.AddMeter("OpenTelemetry.Instrumentation.ConfluentKafka")` on your +> MeterProviderBuilder so that the providers can listen to the emitted signals. + ### Example for `ConsumerBuilder` ```csharp @@ -112,8 +125,15 @@ consumerBuilder.SetErrorHandler((consumer, error) => Console.WriteLine($"Error: consumerBuilder.SetLogHandler((consumer, logMessage) => Console.WriteLine($"Log: {logMessage.Message}")); consumerBuilder.SetStatisticsHandler((consumer, statistics) => Console.WriteLine($"Statistics: {statistics}")); -// Convert to InstrumentedConsumerBuilder -var instrumentedConsumerBuilder = consumerBuilder.AsInstrumentedConsumerBuilder(); +// Explicitly enable OpenTelemetry features for standalone usage +var telemetryOptions = new ConfluentKafkaInstrumentedConsumerBuilderOptions +{ + EnableTraces = true, + EnableMetrics = true, +}; + +// Convert to InstrumentedConsumerBuilder with options +var instrumentedConsumerBuilder = consumerBuilder.AsInstrumentedConsumerBuilder(telemetryOptions); // Build the consumer var consumer = instrumentedConsumerBuilder.Build(); @@ -137,8 +157,15 @@ producerBuilder.SetErrorHandler((producer, error) => Console.WriteLine($"Error: producerBuilder.SetLogHandler((producer, logMessage) => Console.WriteLine($"Log: {logMessage.Message}")); producerBuilder.SetStatisticsHandler((producer, statistics) => Console.WriteLine($"Statistics: {statistics}")); -// Convert to InstrumentedProducerBuilder -var instrumentedProducerBuilder = producerBuilder.AsInstrumentedProducerBuilder(); +// Explicitly enable OpenTelemetry features for standalone usage +var telemetryOptions = new ConfluentKafkaInstrumentedProducerBuilderOptions +{ + EnableTraces = true, + EnableMetrics = true, +}; + +// Convert to InstrumentedProducerBuilder with options +var instrumentedProducerBuilder = producerBuilder.AsInstrumentedProducerBuilder(telemetryOptions); // Build the producer var producer = instrumentedProducerBuilder.Build(); diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryConsumerBuilderExtensionsTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryConsumerBuilderExtensionsTests.cs index 27b4301bac..32627bcdba 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryConsumerBuilderExtensionsTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryConsumerBuilderExtensionsTests.cs @@ -46,7 +46,8 @@ public void ShouldConvertConsumerBuilderToInstrumentedConsumerBuilder() Assert.Equal(PartitionsLostHandler, instrumentedConsumerBuilder.GetInternalPartitionsLostHandler()); Assert.Equal(keyDeserializer, instrumentedConsumerBuilder.GetInternalKeyDeserializer()); Assert.Equal(valueDeserializer, instrumentedConsumerBuilder.GetInternalValueDeserializer()); - return; + Assert.False(instrumentedConsumerBuilder.EnableMetrics); + Assert.False(instrumentedConsumerBuilder.EnableTraces); void ErrorHandler(IConsumer consumer, Error error) { @@ -123,7 +124,6 @@ public void ShouldConvertUserDefinedConsumerBuilderToInstrumentedConsumerBuilder Assert.Equal(PartitionsLostHandler, instrumentedConsumerBuilder.GetInternalPartitionsLostHandler()); Assert.Equal(keyDeserializer, instrumentedConsumerBuilder.GetInternalKeyDeserializer()); Assert.Equal(valueDeserializer, instrumentedConsumerBuilder.GetInternalValueDeserializer()); - return; void ErrorHandler(IConsumer consumer, Error error) { @@ -161,6 +161,54 @@ IEnumerable PartitionsLostHandler(IConsumer> + { + new("bootstrap.servers", "localhost:9092"), + }; + + var consumerBuilder = new ConsumerBuilder(config); + + var options = new ConfluentKafkaInstrumentedConsumerBuilderOptions + { + EnableMetrics = enableMetrics, + EnableTraces = enableTraces, + }; + + // Act + var instrumentedConsumerBuilder = consumerBuilder.AsInstrumentedConsumerBuilder(options); + + // Assert + Assert.Equal(enableMetrics, instrumentedConsumerBuilder.EnableMetrics); + Assert.Equal(enableTraces, instrumentedConsumerBuilder.EnableTraces); + } + + [Fact] + public void ShouldNotThrowNullReferenceExceptionsIfOptionsIsNull() + { + // Arrange + var config = new List> + { + new("bootstrap.servers", "localhost:9092"), + }; + + var consumerBuilder = new ConsumerBuilder(config); + + // Act + var instrumentedBuilder = consumerBuilder.AsInstrumentedConsumerBuilder(null); + + // Assert + Assert.False(instrumentedBuilder.EnableMetrics); + Assert.False(instrumentedBuilder.EnableTraces); + } + private class CustomConsumerBuilder(IEnumerable> config) : ConsumerBuilder(config); } diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryProducerBuilderExtensionsTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryProducerBuilderExtensionsTests.cs index f64ad88077..f7a343e1ac 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryProducerBuilderExtensionsTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetryProducerBuilderExtensionsTests.cs @@ -38,7 +38,8 @@ public void ShouldConvertToInstrumentedProducerBuilder() Assert.Equal(OAuthBearerTokenRefreshHandler, instrumentedProducerBuilder.GetInternalOAuthBearerTokenRefreshHandler()); Assert.Equal(keySerializer, instrumentedProducerBuilder.GetInternalKeySerializer()); Assert.Equal(valueSerializer, instrumentedProducerBuilder.GetInternalValueSerializer()); - return; + Assert.False(instrumentedProducerBuilder.EnableMetrics); + Assert.False(instrumentedProducerBuilder.EnableTraces); void ErrorHandler(IProducer producer, Error error) { @@ -88,7 +89,6 @@ public void ShouldConvertUserDefinedProducerBuilderToInstrumentedProducerBuilder Assert.Equal(OAuthBearerTokenRefreshHandler, instrumentedProducerBuilder.GetInternalOAuthBearerTokenRefreshHandler()); Assert.Equal(keySerializer, instrumentedProducerBuilder.GetInternalKeySerializer()); Assert.Equal(valueSerializer, instrumentedProducerBuilder.GetInternalValueSerializer()); - return; void ErrorHandler(IProducer producer, Error error) { @@ -107,6 +107,46 @@ void OAuthBearerTokenRefreshHandler(IProducer producer, string o } } + [Theory] + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, false)] + [InlineData(true, true)] + public void ShouldSetTracingPropertiesIfOptionsIsDefined(bool enableMetrics, bool enableTraces) + { + // Arrange + var config = new ProducerConfig(); + var builder = new ProducerBuilder(config); + + var options = new ConfluentKafkaInstrumentedProducerBuilderOptions + { + EnableMetrics = enableMetrics, + EnableTraces = enableTraces, + }; + + // Act + var instrumentedBuilder = builder.AsInstrumentedProducerBuilder(options); + + // Assert + Assert.Equal(enableMetrics, instrumentedBuilder.EnableMetrics); + Assert.Equal(enableTraces, instrumentedBuilder.EnableTraces); + } + + [Fact] + public void ShouldNotThrowNullReferenceExceptionsIfOptionsIsNull() + { + // Arrange + var config = new ProducerConfig(); + var builder = new ProducerBuilder(config); + + // Act + var instrumentedBuilder = builder.AsInstrumentedProducerBuilder(null); + + // Assert + Assert.False(instrumentedBuilder.EnableMetrics); + Assert.False(instrumentedBuilder.EnableTraces); + } + private class CustomProducerBuilder(IEnumerable> config) : ProducerBuilder(config); }