Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions
Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions.ConfluentKafkaInstrumentedConsumerBuilderOptions() -> void
Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions.EnableMetrics.get -> bool
Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions.EnableMetrics.set -> void
Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions.EnableTraces.get -> bool
Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions.EnableTraces.set -> void
Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions
Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions.ConfluentKafkaInstrumentedProducerBuilderOptions() -> void
Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions.EnableMetrics.get -> bool
Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions.EnableMetrics.set -> void
Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions.EnableTraces.get -> bool
Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions.EnableTraces.set -> void
Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>
Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>.InstrumentedConsumerBuilder(System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<string!, string!>>! config) -> void
Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>
Expand All @@ -11,10 +23,12 @@ OpenTelemetry.Trace.TracerProviderBuilderExtensions
override Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>.Build() -> Confluent.Kafka.IConsumer<TKey, TValue>!
override Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>.Build() -> Confluent.Kafka.IProducer<TKey, TValue>!
static Confluent.Kafka.OpenTelemetryConsumerBuilderExtensions.AsInstrumentedConsumerBuilder<TKey, TValue>(this Confluent.Kafka.ConsumerBuilder<TKey, TValue>! consumerBuilder) -> Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>!
static Confluent.Kafka.OpenTelemetryConsumerBuilderExtensions.AsInstrumentedConsumerBuilder<TKey, TValue>(this Confluent.Kafka.ConsumerBuilder<TKey, TValue>! consumerBuilder, Confluent.Kafka.ConfluentKafkaInstrumentedConsumerBuilderOptions? options) -> Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>!
static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.ConsumeAndProcessMessageAsync<TKey, TValue>(this Confluent.Kafka.IConsumer<TKey, TValue>! consumer, Confluent.Kafka.OpenTelemetryConsumeAndProcessMessageHandler<TKey, TValue>! handler) -> System.Threading.Tasks.ValueTask<Confluent.Kafka.ConsumeResult<TKey, TValue>?>
static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.ConsumeAndProcessMessageAsync<TKey, TValue>(this Confluent.Kafka.IConsumer<TKey, TValue>! consumer, Confluent.Kafka.OpenTelemetryConsumeAndProcessMessageHandler<TKey, TValue>! handler, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask<Confluent.Kafka.ConsumeResult<TKey, TValue>?>
static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.TryExtractPropagationContext<TKey, TValue>(this Confluent.Kafka.ConsumeResult<TKey, TValue>! consumeResult, out OpenTelemetry.Context.Propagation.PropagationContext propagationContext) -> bool
static Confluent.Kafka.OpenTelemetryProducerBuilderExtensions.AsInstrumentedProducerBuilder<TKey, TValue>(this Confluent.Kafka.ProducerBuilder<TKey, TValue>! producerBuilder) -> Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>!
static Confluent.Kafka.OpenTelemetryProducerBuilderExtensions.AsInstrumentedProducerBuilder<TKey, TValue>(this Confluent.Kafka.ProducerBuilder<TKey, TValue>! producerBuilder, Confluent.Kafka.ConfluentKafkaInstrumentedProducerBuilderOptions? options) -> Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>! consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name) -> OpenTelemetry.Metrics.MeterProviderBuilder!
Expand Down
4 changes: 4 additions & 0 deletions src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
* Updated OpenTelemetry core component version(s) to `1.16.0`.
([#4487](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/4487))

* Supported manual configuration of traces and metrics for Kafka consumers
and producers created outside a DI container.
([#4545](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/4545))

## 0.1.0-alpha.7

Released 2026-May-29
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace Confluent.Kafka;

/// <summary>
/// Options for configuring telemetry on a <see cref="InstrumentedConsumerBuilder{TKey, TValue}"/>
/// when creating an instrumented producer in code.
/// </summary>
public sealed class ConfluentKafkaInstrumentedConsumerBuilderOptions
{
/// <summary>
/// Gets or sets a value indicating whether metrics should be enabled for the consumer.
/// </summary>
public bool EnableMetrics { get; set; }

/// <summary>
/// Gets or sets a value indicating whether tracing should be enabled for the consumer.
/// </summary>
public bool EnableTraces { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace Confluent.Kafka;

/// <summary>
/// Options for configuring telemetry on a <see cref="InstrumentedProducerBuilder{TKey, TValue}"/>
/// when creating an instrumented producer in code.
/// </summary>
public sealed class ConfluentKafkaInstrumentedProducerBuilderOptions
{
/// <summary>
/// Gets or sets a value indicating whether metrics should be enabled for the producer.
/// </summary>
public bool EnableMetrics { get; set; }

/// <summary>
/// Gets or sets a value indicating whether tracing should be enabled for the producer.
/// </summary>
public bool EnableTraces { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,32 @@ namespace Confluent.Kafka;
/// </summary>
public static class OpenTelemetryConsumerBuilderExtensions
{
/// <summary>
/// Converts a <see cref="ConsumerBuilder{TKey, TValue}"/> to an <see cref="InstrumentedConsumerBuilder{TKey,TValue}"/>
/// with explicitly configured telemetry options.
/// </summary>
/// <typeparam name="TKey">Type of the key.</typeparam>
/// <typeparam name="TValue">Type of the value.</typeparam>
/// <param name="consumerBuilder">The <see cref="ConsumerBuilder{TKey, TValue}"/> instance.</param>
/// <param name="options">The optional <see cref="ConfluentKafkaInstrumentedConsumerBuilderOptions"/> to use.</param>
/// <returns>An <see cref="InstrumentedConsumerBuilder{TKey, TValue}"/> instance.</returns>
#if NET
[System.Diagnostics.CodeAnalysis.RequiresUnreferencedCode("Use 'InstrumentedConsumerBuilder<TKey, TValue>' constructor to avoid reflection.")]
#endif
public static InstrumentedConsumerBuilder<TKey, TValue> AsInstrumentedConsumerBuilder<TKey, TValue>(
this ConsumerBuilder<TKey, TValue> consumerBuilder,
ConfluentKafkaInstrumentedConsumerBuilderOptions? options)
{
var instrumentedConsumerBuilder = consumerBuilder.AsInstrumentedConsumerBuilder();
if (options != null)
{
instrumentedConsumerBuilder.EnableMetrics = options.EnableMetrics;
instrumentedConsumerBuilder.EnableTraces = options.EnableTraces;
}

return instrumentedConsumerBuilder;
}

/// <summary>
/// Converts a <see cref="ConsumerBuilder{TKey, TValue}"/> to an <see cref="InstrumentedConsumerBuilder{TKey,TValue}"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,32 @@ namespace Confluent.Kafka;
/// </summary>
public static class OpenTelemetryProducerBuilderExtensions
{
/// <summary>
/// Converts <see cref="ProducerBuilder{TKey,TValue}"/> to <see cref="InstrumentedProducerBuilder{TKey,TValue}"/>
/// with explicitly configured telemetry options.
/// </summary>
/// <typeparam name="TKey">Type of the key.</typeparam>
/// <typeparam name="TValue">Type of the value.</typeparam>
/// <param name="producerBuilder">The <see cref="ProducerBuilder{TKey, TValue}"/> instance.</param>
/// <param name="options">The <see cref="ConfluentKafkaInstrumentedProducerBuilderOptions"/> to use.</param>
/// <returns>An <see cref="InstrumentedProducerBuilder{TKey, TValue}"/> instance.</returns>
#if NET
[System.Diagnostics.CodeAnalysis.RequiresUnreferencedCode("Use 'InstrumentedProducerBuilder<TKey, TValue>' constructor to avoid reflection.")]
#endif
public static InstrumentedProducerBuilder<TKey, TValue> AsInstrumentedProducerBuilder<TKey, TValue>(
this ProducerBuilder<TKey, TValue> producerBuilder,
ConfluentKafkaInstrumentedProducerBuilderOptions? options)
{
var instrumentedProducerBuilder = producerBuilder.AsInstrumentedProducerBuilder();
if (options != null)
{
instrumentedProducerBuilder.EnableMetrics = options.EnableMetrics;
instrumentedProducerBuilder.EnableTraces = options.EnableTraces;
}

return instrumentedProducerBuilder;
}

/// <summary>
/// Converts <see cref="ProducerBuilder{TKey,TValue}"/> to <see cref="InstrumentedProducerBuilder{TKey,TValue}"/>.
/// </summary>
Expand Down
37 changes: 32 additions & 5 deletions src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,22 @@ configured exporters.

To extend an already built `ConsumerBuilder<TKey, TValue>`
or `ProducerBuilder<TKey, TValue>`
instance with OpenTelemetry instrumentation, you can use the `AsInstrumentedConsumerBuilder`
instance with OpenTelemetry instrumentation, you can use the
`AsInstrumentedConsumerBuilder`
and `AsInstrumentedProducerBuilder` extension methods.

> [!IMPORTANT]
> When you create dynamic producers or consumers outside a DI container,
> OpenTelemetry instrumentation (metrics and traces) is disabled by default.
> You must explicitly pass configuration options to enable it.
> If you do not use the standard DI registration methods (such as
`.AddKafkaProducerInstrumentation()`
> or `.AddKafkaConsumerInstrumentation()`), you must also manually call
`.AddSource("OpenTelemetry.Instrumentation.ConfluentKafka")` on your
> TracerProviderBuilder
> and `.AddMeter("OpenTelemetry.Instrumentation.ConfluentKafka")` on your
> MeterProviderBuilder so that the providers can listen to the emitted signals.

### Example for `ConsumerBuilder<TKey, TValue>`

```csharp
Expand All @@ -112,8 +125,15 @@ consumerBuilder.SetErrorHandler((consumer, error) => Console.WriteLine($"Error:
consumerBuilder.SetLogHandler((consumer, logMessage) => Console.WriteLine($"Log: {logMessage.Message}"));
consumerBuilder.SetStatisticsHandler((consumer, statistics) => Console.WriteLine($"Statistics: {statistics}"));

// Convert to InstrumentedConsumerBuilder
var instrumentedConsumerBuilder = consumerBuilder.AsInstrumentedConsumerBuilder();
// Explicitly enable OpenTelemetry features for standalone usage
var telemetryOptions = new ConfluentKafkaInstrumentedConsumerBuilderOptions
{
EnableTraces = true,
EnableMetrics = true,
};

// Convert to InstrumentedConsumerBuilder with options
var instrumentedConsumerBuilder = consumerBuilder.AsInstrumentedConsumerBuilder(telemetryOptions);

// Build the consumer
var consumer = instrumentedConsumerBuilder.Build();
Expand All @@ -137,8 +157,15 @@ producerBuilder.SetErrorHandler((producer, error) => Console.WriteLine($"Error:
producerBuilder.SetLogHandler((producer, logMessage) => Console.WriteLine($"Log: {logMessage.Message}"));
producerBuilder.SetStatisticsHandler((producer, statistics) => Console.WriteLine($"Statistics: {statistics}"));

// Convert to InstrumentedProducerBuilder
var instrumentedProducerBuilder = producerBuilder.AsInstrumentedProducerBuilder();
// Explicitly enable OpenTelemetry features for standalone usage
var telemetryOptions = new ConfluentKafkaInstrumentedProducerBuilderOptions
{
EnableTraces = true,
EnableMetrics = true,
};

// Convert to InstrumentedProducerBuilder with options
var instrumentedProducerBuilder = producerBuilder.AsInstrumentedProducerBuilder(telemetryOptions);

// Build the producer
var producer = instrumentedProducerBuilder.Build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public void ShouldConvertConsumerBuilderToInstrumentedConsumerBuilder()
Assert.Equal(PartitionsLostHandler, instrumentedConsumerBuilder.GetInternalPartitionsLostHandler());
Assert.Equal(keyDeserializer, instrumentedConsumerBuilder.GetInternalKeyDeserializer());
Assert.Equal(valueDeserializer, instrumentedConsumerBuilder.GetInternalValueDeserializer());
return;
Assert.False(instrumentedConsumerBuilder.EnableMetrics);
Assert.False(instrumentedConsumerBuilder.EnableTraces);

void ErrorHandler(IConsumer<string, string> consumer, Error error)
{
Expand Down Expand Up @@ -123,7 +124,6 @@ public void ShouldConvertUserDefinedConsumerBuilderToInstrumentedConsumerBuilder
Assert.Equal(PartitionsLostHandler, instrumentedConsumerBuilder.GetInternalPartitionsLostHandler());
Assert.Equal(keyDeserializer, instrumentedConsumerBuilder.GetInternalKeyDeserializer());
Assert.Equal(valueDeserializer, instrumentedConsumerBuilder.GetInternalValueDeserializer());
return;

void ErrorHandler(IConsumer<string, string> consumer, Error error)
{
Expand Down Expand Up @@ -161,6 +161,54 @@ IEnumerable<TopicPartitionOffset> PartitionsLostHandler(IConsumer<string, string
}
}

[Theory]
[InlineData(false, false)]
[InlineData(false, true)]
[InlineData(true, false)]
[InlineData(true, true)]
public void ShouldSetTracingPropertiesIfOptionsIsDefined(bool enableMetrics, bool enableTraces)
{
// Arrange
var config = new List<KeyValuePair<string, string>>
{
new("bootstrap.servers", "localhost:9092"),
};

var consumerBuilder = new ConsumerBuilder<string, string>(config);

var options = new ConfluentKafkaInstrumentedConsumerBuilderOptions
{
EnableMetrics = enableMetrics,
EnableTraces = enableTraces,
};

// Act
var instrumentedConsumerBuilder = consumerBuilder.AsInstrumentedConsumerBuilder(options);

// Assert
Assert.Equal(enableMetrics, instrumentedConsumerBuilder.EnableMetrics);
Assert.Equal(enableTraces, instrumentedConsumerBuilder.EnableTraces);
}

[Fact]
public void ShouldNotThrowNullReferenceExceptionsIfOptionsIsNull()
{
// Arrange
var config = new List<KeyValuePair<string, string>>
{
new("bootstrap.servers", "localhost:9092"),
};

var consumerBuilder = new ConsumerBuilder<string, string>(config);

// Act
var instrumentedBuilder = consumerBuilder.AsInstrumentedConsumerBuilder(null);

// Assert
Assert.False(instrumentedBuilder.EnableMetrics);
Assert.False(instrumentedBuilder.EnableTraces);
}

private class CustomConsumerBuilder<TKey, TValue>(IEnumerable<KeyValuePair<string, string>> config)
: ConsumerBuilder<TKey, TValue>(config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public void ShouldConvertToInstrumentedProducerBuilder()
Assert.Equal(OAuthBearerTokenRefreshHandler, instrumentedProducerBuilder.GetInternalOAuthBearerTokenRefreshHandler());
Assert.Equal(keySerializer, instrumentedProducerBuilder.GetInternalKeySerializer());
Assert.Equal(valueSerializer, instrumentedProducerBuilder.GetInternalValueSerializer());
return;
Assert.False(instrumentedProducerBuilder.EnableMetrics);
Assert.False(instrumentedProducerBuilder.EnableTraces);

void ErrorHandler(IProducer<string, string> producer, Error error)
{
Expand Down Expand Up @@ -88,7 +89,6 @@ public void ShouldConvertUserDefinedProducerBuilderToInstrumentedProducerBuilder
Assert.Equal(OAuthBearerTokenRefreshHandler, instrumentedProducerBuilder.GetInternalOAuthBearerTokenRefreshHandler());
Assert.Equal(keySerializer, instrumentedProducerBuilder.GetInternalKeySerializer());
Assert.Equal(valueSerializer, instrumentedProducerBuilder.GetInternalValueSerializer());
return;

void ErrorHandler(IProducer<string, string> producer, Error error)
{
Expand All @@ -107,6 +107,46 @@ void OAuthBearerTokenRefreshHandler(IProducer<string, string> producer, string o
}
}

[Theory]
[InlineData(false, false)]
[InlineData(false, true)]
[InlineData(true, false)]
[InlineData(true, true)]
public void ShouldSetTracingPropertiesIfOptionsIsDefined(bool enableMetrics, bool enableTraces)
{
// Arrange
var config = new ProducerConfig();
var builder = new ProducerBuilder<string, string>(config);

var options = new ConfluentKafkaInstrumentedProducerBuilderOptions
{
EnableMetrics = enableMetrics,
EnableTraces = enableTraces,
};

// Act
var instrumentedBuilder = builder.AsInstrumentedProducerBuilder(options);

// Assert
Assert.Equal(enableMetrics, instrumentedBuilder.EnableMetrics);
Assert.Equal(enableTraces, instrumentedBuilder.EnableTraces);
}

[Fact]
public void ShouldNotThrowNullReferenceExceptionsIfOptionsIsNull()
{
// Arrange
var config = new ProducerConfig();
var builder = new ProducerBuilder<string, string>(config);

// Act
var instrumentedBuilder = builder.AsInstrumentedProducerBuilder(null);

// Assert
Assert.False(instrumentedBuilder.EnableMetrics);
Assert.False(instrumentedBuilder.EnableTraces);
}

private class CustomProducerBuilder<TKey, TValue>(IEnumerable<KeyValuePair<string, string>> config)
: ProducerBuilder<TKey, TValue>(config);
}