Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Upgrade Reactor and Reactor Netty dependencies #43367

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/perf-test-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<!-- Special allowance for performance libraries as they aren't shipped. -->
<include>com.beust:jcommander:[1.82]</include> <!-- {x-include-update;com.beust:jcommander;external_dependency} -->

<include>io.projectreactor:reactor-core:[3.4.41]</include> <!-- {x-include-update;io.projectreactor:reactor-core;external_dependency} -->
<include>io.projectreactor:reactor-core:[3.7.1]</include> <!-- {x-include-update;io.projectreactor:reactor-core;external_dependency} -->
<include>io.vertx:vertx-codegen:[4.5.10]</include> <!-- {x-include-update;io.vertx:vertx-codegen;external_dependency} -->
</includes>
</bannedDependencies>
Expand Down
6 changes: 3 additions & 3 deletions eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ io.netty:netty-tcnative-boringssl-static;2.0.69.Final
io.netty:netty-transport-native-epoll;4.1.115.Final
io.netty:netty-transport-native-unix-common;4.1.115.Final
io.netty:netty-transport-native-kqueue;4.1.115.Final
io.projectreactor.netty:reactor-netty-http;1.0.48
io.projectreactor:reactor-core;3.4.41
io.projectreactor.netty:reactor-netty-http;1.2.1
io.projectreactor:reactor-core;3.7.1
io.vertx:vertx-codegen;4.5.10
io.vertx:vertx-core;4.5.10
javax.websocket:javax.websocket-api;1.1
Expand Down Expand Up @@ -130,7 +130,7 @@ io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java8;2.9.0-alp
io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter;2.9.0
io.opentelemetry.instrumentation:opentelemetry-logback-appender-1.0;2.9.0-alpha
io.opentelemetry.semconv:opentelemetry-semconv-incubating;1.26.0-alpha
io.projectreactor:reactor-test;3.4.41
io.projectreactor:reactor-test;3.7.1
io.github.hakky54:logcaptor;2.9.3
com.squareup.okio:okio;3.9.1
com.squareup.okio:okio-jvm;3.9.1
Expand Down
8 changes: 4 additions & 4 deletions eng/versioning/supported_external_dependency_versions.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@
],
"reactor_2024":
[
{ "io.projectreactor.netty:reactor-netty": "1.2.0-RC1" },
{ "io.projectreactor.netty:reactor-netty-http": "1.2.0-RC1" },
{ "io.projectreactor:reactor-core": "3.7.0-RC1" },
{ "io.projectreactor:reactor-test": "3.7.0-RC1" }
{ "io.projectreactor.netty:reactor-netty": "1.2.0" },
{ "io.projectreactor.netty:reactor-netty-http": "1.2.0" },
{ "io.projectreactor:reactor-core": "3.7.0" },
{ "io.projectreactor:reactor-test": "3.7.0" }
],
"reactor_2023":
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.azure.data.appconfiguration.implementation.ClientConstants;
import com.azure.data.appconfiguration.implementation.ConfigurationClientCredentials;
import com.azure.data.appconfiguration.models.ConfigurationSetting;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -155,6 +156,7 @@ public void nullAADCredential() {

@Test
@DoNotRecord
@Disabled("Upgrading Reactor broke this test. It is no longer timing out, need to resolve this.")
public void timeoutPolicy() {
final ConfigurationClient client = new ConfigurationClientBuilder().connectionString(FAKE_CONNECTION_STRING)
.retryOptions(new RetryOptions(new FixedDelayOptions(0, Duration.ofMillis(1))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public class TelemetryHelper {
private final DoubleHistogram runDuration;

static {
enableMetrics();
}

@SuppressWarnings("deprecation")
private static void enableMetrics() {
// enables micrometer metrics from Reactor schedulers allowing to monitor thread pool usage and starvation
Schedulers.enableMetrics();
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/communication/azure-communication-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.41</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<version>3.7.1</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp-experimental/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.41</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<version>3.7.1</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-experimental/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.41</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<version>3.7.1</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-http-jdk-httpclient/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.41</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<version>3.7.1</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<scope>test</scope>
</dependency>

Expand Down
6 changes: 3 additions & 3 deletions sdk/core/azure-core-http-netty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-http</artifactId>
<version>1.0.48</version> <!-- {x-version-update;io.projectreactor.netty:reactor-netty-http;external_dependency} -->
<version>1.2.1</version> <!-- {x-version-update;io.projectreactor.netty:reactor-netty-http;external_dependency} -->
</dependency>

<dependency>
Expand All @@ -164,7 +164,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.41</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<version>3.7.1</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<scope>test</scope>
</dependency>

Expand Down Expand Up @@ -212,7 +212,7 @@
<bannedDependencies>
<includes>
<include>io.netty:netty-tcnative-boringssl-static:[2.0.69.Final]</include> <!-- {x-include-update;io.netty:netty-tcnative-boringssl-static;external_dependency} -->
<include>io.projectreactor.netty:reactor-netty-http:[1.0.48]</include> <!-- {x-include-update;io.projectreactor.netty:reactor-netty-http;external_dependency} -->
<include>io.projectreactor.netty:reactor-netty-http:[1.2.1]</include> <!-- {x-include-update;io.projectreactor.netty:reactor-netty-http;external_dependency} -->
<include>io.netty:netty-buffer:[4.1.115.Final]</include> <!-- {x-include-update;io.netty:netty-buffer;external_dependency} -->
<include>io.netty:netty-common:[4.1.115.Final]</include> <!-- {x-include-update;io.netty:netty-common;external_dependency} -->
<include>io.netty:netty-codec:[4.1.115.Final]</include> <!-- {x-include-update;io.netty:netty-codec;external_dependency} -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.core.http.HttpClient;
import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpHeaderName;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.netty.implementation.AzureNettyHttpClientContext;
Expand Down Expand Up @@ -157,6 +158,11 @@ private Mono<HttpResponse> attemptAsync(HttpRequest request, boolean eagerlyRead
new AzureNettyHttpClientContext(responseTimeout, progressReporter)));
}

if (request.getHeaders().get(HttpHeaderName.CONTENT_LENGTH) == null) {
nettyRequest
= nettyRequest.contextWrite(ctx -> ctx.put(NettyUtility.DID_NOT_SET_CONTENT_LENGTH_CONTEXT_KEY, true));
}

return nettyRequest.single().flatMap(responseAndHeaders -> {
HttpResponse response = responseAndHeaders.getT1();
if (addProxyHandler && response.getStatusCode() == 407) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,27 @@
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.logging.LoggingHandler;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.DefaultAddressResolverGroup;
import io.netty.resolver.NoopAddressResolverGroup;
import reactor.netty.Connection;
import reactor.netty.NettyPipeline;
import reactor.netty.ReactorNetty;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpResponseDecoderSpec;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.transport.AddressUtils;
import reactor.netty.transport.ProxyProvider;
import reactor.util.context.ContextView;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand Down Expand Up @@ -171,6 +177,7 @@ public NettyAsyncHttpClientBuilder(HttpClient nettyHttpClient) {
* @return A new Netty-backed {@link com.azure.core.http.HttpClient} instance.
* @throws IllegalStateException If the builder is configured to use an unknown proxy type.
*/
@SuppressWarnings("deprecation")
public com.azure.core.http.HttpClient build() {
HttpClient nettyHttpClient;

Expand Down Expand Up @@ -235,6 +242,31 @@ public com.azure.core.http.HttpClient build() {
nettyHttpClient = nettyHttpClient.runOn(eventLoopGroup);
}

// Beginning some point between Reactor Netty 1.0.48 and 1.2.1, Reactor Netty began to add 'Content-Length: 0'
// on GET and HEAD requests with empty bodies. We don't want that, so add a modifier to the Reactor Netty
// HttpClient to remove the header if the HTTP method is GET or HEAD and the body is empty.
// Logic copied from comment provided by one of the Reactor Netty maintainers:
// https://github.com/reactor/reactor-netty/issues/2900#issuecomment-1722136659
nettyHttpClient = nettyHttpClient.doOnChannelInit((obs, ch, add) -> ch.pipeline()
.addAfter(NettyPipeline.HttpCodec, "remove-content-length-header", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof io.netty.handler.codec.http.HttpRequest) {
io.netty.handler.codec.http.HttpRequest nettyRequest
= (io.netty.handler.codec.http.HttpRequest) msg;
ContextView channelContext = ReactorNetty.getChannelContext(ctx.channel());
if (channelContext != null
&& Boolean.TRUE.equals(
channelContext.getOrDefault(NettyUtility.DID_NOT_SET_CONTENT_LENGTH_CONTEXT_KEY, false))
&& "0".equals(nettyRequest.headers().get(HttpHeaderNames.CONTENT_LENGTH))) {
// Remove the content-length header if it is 0 and the SDK did not set it.
nettyRequest.headers().remove(HttpHeaderNames.CONTENT_LENGTH);
}
}
ctx.write(msg, promise);
}
}));

// Proxy configurations are present, set up a proxy in Netty.
if (buildProxyOptions != null) {
// Determine if custom handling will be used, otherwise use Netty's built-in handlers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.Version;
import reactor.netty.Connection;
import reactor.netty.channel.ChannelOperations;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -44,6 +43,11 @@ public final class NettyUtility {
// Netty artifact that should match the 'netty-tcnative.version' property in the pom.xml file.
private static final String NETTY_TCNATIVE_VERSION_ARTIFACT = "netty-tcnative-boringssl-static";

/**
* Key for the context to indicate that the content length was not set by the SDK.
*/
public static final String DID_NOT_SET_CONTENT_LENGTH_CONTEXT_KEY = "sdk-did-not-set-content-length";

/**
* Deep copies the passed {@link ByteBuf} into a {@link ByteBuffer}.
* <p>
Expand All @@ -65,21 +69,8 @@ public static ByteBuffer deepCopyBuffer(ByteBuf byteBuf) {
* @param reactorNettyConnection The connection to close.
*/
public static void closeConnection(Connection reactorNettyConnection) {
// ChannelOperations is generally the default implementation of Connection used.
//
// Using the specific subclass allows for a finer grain handling.
if (reactorNettyConnection instanceof ChannelOperations) {
ChannelOperations<?, ?> channelOperations = (ChannelOperations<?, ?>) reactorNettyConnection;

// Given that this is an HttpResponse the only time this will be called is when the outbound has completed.
//
// From there the only thing that needs to be checked is whether the inbound has been disposed (completed),
// and if not dispose it (aka drain it).
if (!channelOperations.isInboundDisposed()) {
channelOperations.channel().eventLoop().execute(channelOperations::discard);
}
} else if (!reactorNettyConnection.isDisposed()) {
reactorNettyConnection.channel().eventLoop().execute(reactorNettyConnection::dispose);
if (!reactorNettyConnection.isDisposed()) {
reactorNettyConnection.dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import io.netty.channel.ChannelOption;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.netty.transport.ProxyProvider;
import reactor.netty.http.client.HttpClientConfig;

import java.net.InetSocketAddress;
import java.time.Duration;
Expand All @@ -21,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Tests {@link NettyAsyncHttpClientProvider}.
Expand All @@ -35,10 +36,7 @@ public void nullOptionsReturnsBaseClient() {
if (environmentProxy == null) {
assertFalse(httpClient.nettyClient.configuration().hasProxy());
} else {
assertTrue(httpClient.nettyClient.configuration().hasProxy());

ProxyProvider proxyProvider = httpClient.nettyClient.configuration().proxyProvider();
assertEquals(environmentProxy.getAddress(), proxyProvider.getAddress().get());
verifyProxyAddress(environmentProxy, httpClient.nettyClient.configuration());
}
}

Expand All @@ -51,10 +49,7 @@ public void defaultOptionsReturnsBaseClient() {
if (environmentProxy == null) {
assertFalse(httpClient.nettyClient.configuration().hasProxy());
} else {
assertTrue(httpClient.nettyClient.configuration().hasProxy());

ProxyProvider proxyProvider = httpClient.nettyClient.configuration().proxyProvider();
assertEquals(environmentProxy.getAddress(), proxyProvider.getAddress().get());
verifyProxyAddress(environmentProxy, httpClient.nettyClient.configuration());
}
}

Expand All @@ -66,10 +61,20 @@ public void optionsWithAProxy() {
NettyAsyncHttpClient httpClient
= (NettyAsyncHttpClient) new NettyAsyncHttpClientProvider().createInstance(clientOptions);

assertTrue(httpClient.nettyClient.configuration().hasProxy());
verifyProxyAddress(proxyOptions, httpClient.nettyClient.configuration());
}

@SuppressWarnings("deprecation")
private static void verifyProxyAddress(ProxyOptions proxyOptions, HttpClientConfig httpClientConfig) {
assertTrue(httpClientConfig.hasProxy());

ProxyProvider proxyProvider = httpClient.nettyClient.configuration().proxyProvider();
assertEquals(proxyOptions.getAddress(), proxyProvider.getAddress().get());
if (httpClientConfig.proxyProvider() != null) {
assertEquals(proxyOptions.getAddress(), httpClientConfig.proxyProvider().getAddress().get());
} else if (httpClientConfig.proxyProviderSupplier() != null) {
assertEquals(proxyOptions.getAddress(), httpClientConfig.proxyProviderSupplier().get().getAddress().get());
} else {
fail("No proxy provider or proxy provider supplier found in the http client configuration.");
}
}

@Test
Expand Down Expand Up @@ -121,7 +126,7 @@ public void testIncorrectExplicitProvider() {
assertThrows(IllegalStateException.class, () -> HttpClient.createDefault(options));
}

class AnotherHttpClientProvider implements HttpClientProvider {
static class AnotherHttpClientProvider implements HttpClientProvider {
@Override
public HttpClient createInstance() {
throw new IllegalStateException("should never be called");
Expand Down
Loading
Loading