Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract sender parameters to config carrier class #7151

Merged
merged 1 commit into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,18 @@ public GrpcExporter<T> build() {
GrpcSenderProvider grpcSenderProvider = resolveGrpcSenderProvider();
GrpcSender<T> grpcSender =
grpcSenderProvider.createSender(
endpoint,
grpcEndpointPath,
compressor,
timeoutNanos,
connectTimeoutNanos,
headerSupplier,
grpcChannel,
grpcStubFactory,
retryPolicy,
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager());
GrpcSenderConfig.create(
endpoint,
grpcEndpointPath,
compressor,
timeoutNanos,
connectTimeoutNanos,
headerSupplier,
grpcChannel,
grpcStubFactory,
retryPolicy,
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager()));
LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName());

return new GrpcExporter<>(exporterName, type, grpcSender, meterProviderSupplier);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.grpc;

import com.google.auto.value.AutoValue;
import io.grpc.Channel;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
@AutoValue
@Immutable
public abstract class GrpcSenderConfig<T extends Marshaler> {

@SuppressWarnings("TooManyParameters")
public static <T extends Marshaler> GrpcSenderConfig<T> create(
URI endpoint,
String endpointPath,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable Object managedChannel,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
return new AutoValue_GrpcSenderConfig<>(
endpoint,
endpointPath,
compressor,
timeoutNanos,
connectTimeoutNanos,
headersSupplier,
managedChannel,
stubFactory,
retryPolicy,
sslContext,
trustManager);
}

public abstract URI getEndpoint();

public abstract String getEndpointPath();

@Nullable
public abstract Compressor getCompressor();

public abstract long getTimeoutNanos();

public abstract long getConnectTimeoutNanos();

public abstract Supplier<Map<String, List<String>>> getHeadersSupplier();

@Nullable
public abstract Object getManagedChannel();

public abstract Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>>
getStubFactory();

@Nullable
public abstract RetryPolicy getRetryPolicy();

@Nullable
public abstract SSLContext getSslContext();

@Nullable
public abstract X509TrustManager getTrustManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,7 @@

package io.opentelemetry.exporter.internal.grpc;

import io.grpc.Channel;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;

/**
* A service provider interface (SPI) for providing {@link GrpcSender}s backed by different client
Expand All @@ -27,18 +16,6 @@
*/
public interface GrpcSenderProvider {

/** Returns a {@link GrpcSender} configured with the provided parameters. */
@SuppressWarnings("TooManyParameters")
<T extends Marshaler> GrpcSender<T> createSender(
URI endpoint,
String endpointPath,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable Object managedChannel,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager);
/** Returns a {@link GrpcSender} configured with the provided config. */
<T extends Marshaler> GrpcSender<T> createSender(GrpcSenderConfig<T> grpcSenderConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,18 @@ public HttpExporter<T> build() {
HttpSenderProvider httpSenderProvider = resolveHttpSenderProvider();
HttpSender httpSender =
httpSenderProvider.createSender(
endpoint,
compressor,
exportAsJson,
exportAsJson ? "application/json" : "application/x-protobuf",
timeoutNanos,
connectTimeoutNanos,
headerSupplier,
proxyOptions,
retryPolicy,
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager());
HttpSenderConfig.create(
endpoint,
compressor,
exportAsJson,
exportAsJson ? "application/json" : "application/x-protobuf",
timeoutNanos,
connectTimeoutNanos,
headerSupplier,
proxyOptions,
retryPolicy,
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager()));
LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName());

return new HttpExporter<>(exporterName, type, httpSender, meterProviderSupplier, exportAsJson);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.http;

import com.google.auto.value.AutoValue;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.sdk.common.export.ProxyOptions;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
@AutoValue
@Immutable
public abstract class HttpSenderConfig {

@SuppressWarnings("TooManyParameters")
public static HttpSenderConfig create(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headerSupplier,
@Nullable ProxyOptions proxyOptions,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
return new AutoValue_HttpSenderConfig(
endpoint,
compressor,
exportAsJson,
contentType,
timeoutNanos,
connectTimeoutNanos,
headerSupplier,
proxyOptions,
retryPolicy,
sslContext,
trustManager);
}

public abstract String getEndpoint();

@Nullable
public abstract Compressor getCompressor();

public abstract boolean getExportAsJson();

public abstract String getContentType();

public abstract long getTimeoutNanos();

public abstract long getConnectTimeoutNanos();

public abstract Supplier<Map<String, List<String>>> getHeadersSupplier();

@Nullable
public abstract ProxyOptions getProxyOptions();

@Nullable
public abstract RetryPolicy getRetryPolicy();

@Nullable
public abstract SSLContext getSslContext();

@Nullable
public abstract X509TrustManager getTrustManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,6 @@

package io.opentelemetry.exporter.internal.http;

import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.sdk.common.export.ProxyOptions;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;

/**
* A service provider interface (SPI) for providing {@link HttpSender}s backed by different HTTP
* client libraries.
Expand All @@ -24,18 +14,6 @@
*/
public interface HttpSenderProvider {

/** Returns a {@link HttpSender} configured with the provided parameters. */
@SuppressWarnings("TooManyParameters")
HttpSender createSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeout,
Supplier<Map<String, List<String>>> headerSupplier,
@Nullable ProxyOptions proxyOptions,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager);
/** Returns a {@link HttpSender} configured with the provided config. */
HttpSender createSender(HttpSenderConfig httpSenderConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,15 @@
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderConfig;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;

/**
* {@link GrpcSender} SPI implementation for {@link UpstreamGrpcSender}.
Expand All @@ -36,27 +31,17 @@
public class UpstreamGrpcSenderProvider implements GrpcSenderProvider {

@Override
public <T extends Marshaler> GrpcSender<T> createSender(
URI endpoint,
String endpointPath,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable Object managedChannel,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
public <T extends Marshaler> GrpcSender<T> createSender(GrpcSenderConfig<T> grpcSenderConfig) {
boolean shutdownChannel = false;
Object managedChannel = grpcSenderConfig.getManagedChannel();
if (managedChannel == null) {
// Shutdown the channel as part of the exporter shutdown sequence if
shutdownChannel = true;
managedChannel = minimalFallbackManagedChannel(endpoint);
managedChannel = minimalFallbackManagedChannel(grpcSenderConfig.getEndpoint());
}

String authorityOverride = null;
Map<String, List<String>> headers = headersSupplier.get();
Map<String, List<String>> headers = grpcSenderConfig.getHeadersSupplier().get();
if (headers != null) {
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
if (entry.getKey().equals("host") && !entry.getValue().isEmpty()) {
Expand All @@ -66,6 +51,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
}

String compression = Codec.Identity.NONE.getMessageEncoding();
Compressor compressor = grpcSenderConfig.getCompressor();
if (compressor != null) {
CompressorRegistry.getDefaultInstance()
.register(
Expand All @@ -84,12 +70,17 @@ public OutputStream compress(OutputStream os) throws IOException {
}

MarshalerServiceStub<T, ?, ?> stub =
stubFactory
grpcSenderConfig
.getStubFactory()
.get()
.apply((Channel) managedChannel, authorityOverride)
.withCompression(compression);

return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier);
return new UpstreamGrpcSender<>(
stub,
shutdownChannel,
grpcSenderConfig.getTimeoutNanos(),
grpcSenderConfig.getHeadersSupplier());
}

/**
Expand Down
Loading
Loading