diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/build.gradle.kts b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/build.gradle.kts index b6f3ce7536d6..883d01381831 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/build.gradle.kts +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/build.gradle.kts @@ -29,8 +29,11 @@ testing { suites { val version35Test by registering(JvmTestSuite::class) { dependencies { - implementation("org.hsqldb:hsqldb:2.3.4") + // this only exists to make Intellij happy since it doesn't (currently at least) understand our + // inclusion of this artifact inside :testing-common + compileOnly(project.dependencies.project(":testing:armeria-shaded-for-testing", configuration = "shadow")) + implementation("org.hsqldb:hsqldb:2.3.4") compileOnly("io.vertx:vertx-codegen:$vertxVersion") implementation("io.vertx:vertx-web:$vertxVersion") implementation("io.vertx:vertx-rx-java2:$vertxVersion") @@ -42,8 +45,11 @@ testing { val latestDepTest by registering(JvmTestSuite::class) { dependencies { - implementation("org.hsqldb:hsqldb:2.3.4") + // this only exists to make Intellij happy since it doesn't (currently at least) understand our + // inclusion of this artifact inside :testing-common + compileOnly(project.dependencies.project(":testing:armeria-shaded-for-testing", configuration = "shadow")) + implementation("org.hsqldb:hsqldb:2.3.4") implementation("io.vertx:vertx-web:latest.release") implementation("io.vertx:vertx-rx-java2:latest.release") implementation("io.vertx:vertx-web-client:latest.release") diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/VertxReactivePropagationTest.groovy b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/VertxReactivePropagationTest.groovy deleted file mode 100644 index bcb75491eefc..000000000000 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/VertxReactivePropagationTest.groovy +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.api.GlobalOpenTelemetry -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.SpanKind -import io.opentelemetry.context.Context -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.semconv.ClientAttributes -import io.opentelemetry.semconv.HttpAttributes -import io.opentelemetry.semconv.NetworkAttributes -import io.opentelemetry.semconv.ServerAttributes -import io.opentelemetry.semconv.UrlAttributes -import io.opentelemetry.semconv.UserAgentAttributes -import io.opentelemetry.semconv.incubating.DbIncubatingAttributes -import io.opentelemetry.testing.internal.armeria.client.WebClient -import io.opentelemetry.testing.internal.armeria.common.HttpRequest -import io.opentelemetry.testing.internal.armeria.common.HttpRequestBuilder -import io.vertx.reactivex.core.Vertx -import spock.lang.Shared - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors - -import static VertxReactiveWebServer.TEST_REQUEST_ID_ATTRIBUTE -import static VertxReactiveWebServer.TEST_REQUEST_ID_PARAMETER -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.SERVER -import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableDatabaseSemconv -import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS - -class VertxReactivePropagationTest extends AgentInstrumentationSpecification { - @Shared - WebClient client - - @Shared - int port - - @Shared - Vertx server - - def setupSpec() { - port = PortUtils.findOpenPort() - server = VertxReactiveWebServer.start(port) - client = WebClient.of("h1c://localhost:${port}") - } - - def cleanupSpec() { - server.close() - } - - //Verifies that context is correctly propagated and sql query span has correct parent. - //Tests io.opentelemetry.javaagent.instrumentation.vertx.reactive.VertxRxInstrumentation - @SuppressWarnings("deprecation") // TODO DbIncubatingAttributes.DB_CONNECTION_STRING deprecation - def "should propagate context over vert.x rx-java framework"() { - setup: - def response = client.get("/listProducts").aggregate().join() - - expect: - response.status().code() == SUCCESS.status - - and: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "GET /listProducts" - kind SERVER - hasNoParent() - attributes { - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - "$NetworkAttributes.NETWORK_PEER_PORT" Long - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" Long - "$ClientAttributes.CLIENT_ADDRESS" "127.0.0.1" - "$UrlAttributes.URL_PATH" "/listProducts" - "$HttpAttributes.HTTP_REQUEST_METHOD" "GET" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_SCHEME" "http" - "$UserAgentAttributes.USER_AGENT_ORIGINAL" String - "$HttpAttributes.HTTP_ROUTE" "/listProducts" - } - } - span(1) { - name "handleListProducts" - kind SpanKind.INTERNAL - childOf span(0) - } - span(2) { - name "listProducts" - kind SpanKind.INTERNAL - childOf span(1) - } - span(3) { - name "SELECT test.products" - kind CLIENT - childOf span(2) - attributes { - "$DbIncubatingAttributes.DB_SYSTEM" "hsqldb" - "${maybeStable(DbIncubatingAttributes.DB_NAME)}" "test" - "$DbIncubatingAttributes.DB_USER" emitStableDatabaseSemconv() ? null : "SA" - "$DbIncubatingAttributes.DB_CONNECTION_STRING" emitStableDatabaseSemconv() ? null : "hsqldb:mem:" - "${maybeStable(DbIncubatingAttributes.DB_STATEMENT)}" "SELECT id, name, price, weight FROM products" - "${maybeStable(DbIncubatingAttributes.DB_OPERATION)}" "SELECT" - "${maybeStable(DbIncubatingAttributes.DB_SQL_TABLE)}" "products" - } - } - } - } - } - - @SuppressWarnings("deprecation") // TODO DbIncubatingAttributes.DB_CONNECTION_STRING deprecation - def "should propagate context correctly over vert.x rx-java framework with high concurrency"() { - setup: - int count = 100 - def baseUrl = "/listProducts" - def latch = new CountDownLatch(1) - - def pool = Executors.newFixedThreadPool(8) - def propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() - def setter = { HttpRequestBuilder carrier, String name, String value -> - carrier.header(name, value) - } - - when: - count.times { index -> - def job = { - latch.await() - runWithSpan("client " + index) { - HttpRequestBuilder builder = HttpRequest.builder() - .get("${baseUrl}?${TEST_REQUEST_ID_PARAMETER}=${index}") - Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index) - propagator.inject(Context.current(), builder, setter) - client.execute(builder.build()).aggregate().join() - } - } - pool.submit(job) - } - - latch.countDown() - - then: - assertTraces(count) { - (0..count - 1).each { - trace(it, 5) { - def rootSpan = it.span(0) - def requestId = Long.valueOf(rootSpan.name.substring("client ".length())) - - span(0) { - name "client $requestId" - kind SpanKind.INTERNAL - hasNoParent() - attributes { - "${TEST_REQUEST_ID_ATTRIBUTE}" requestId - } - } - span(1) { - name "GET /listProducts" - kind SERVER - childOf(span(0)) - attributes { - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - "$NetworkAttributes.NETWORK_PEER_PORT" Long - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" Long - "$ClientAttributes.CLIENT_ADDRESS" "127.0.0.1" - "$UrlAttributes.URL_PATH" baseUrl - "$UrlAttributes.URL_QUERY" "$TEST_REQUEST_ID_PARAMETER=$requestId" - "$HttpAttributes.HTTP_REQUEST_METHOD" "GET" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_SCHEME" "http" - "$UserAgentAttributes.USER_AGENT_ORIGINAL" String - "$HttpAttributes.HTTP_ROUTE" "/listProducts" - "${TEST_REQUEST_ID_ATTRIBUTE}" requestId - } - } - span(2) { - name "handleListProducts" - kind SpanKind.INTERNAL - childOf(span(1)) - attributes { - "${TEST_REQUEST_ID_ATTRIBUTE}" requestId - } - } - span(3) { - name "listProducts" - kind SpanKind.INTERNAL - childOf(span(2)) - attributes { - "${TEST_REQUEST_ID_ATTRIBUTE}" requestId - } - } - span(4) { - name "SELECT test.products" - kind CLIENT - childOf(span(3)) - attributes { - "$DbIncubatingAttributes.DB_SYSTEM" "hsqldb" - "${maybeStable(DbIncubatingAttributes.DB_NAME)}" "test" - "$DbIncubatingAttributes.DB_USER" emitStableDatabaseSemconv() ? null : "SA" - "$DbIncubatingAttributes.DB_CONNECTION_STRING" emitStableDatabaseSemconv() ? null : "hsqldb:mem:" - "${maybeStable(DbIncubatingAttributes.DB_STATEMENT)}" "SELECT id AS request$requestId, name, price, weight FROM products" - "${maybeStable(DbIncubatingAttributes.DB_OPERATION)}" "SELECT" - "${maybeStable(DbIncubatingAttributes.DB_SQL_TABLE)}" "products" - } - } - } - } - } - - cleanup: - pool.shutdownNow() - } -} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy deleted file mode 100644 index f646d0b14c08..000000000000 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package client - -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.base.HttpClientTest -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult -import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection -import io.vertx.circuitbreaker.CircuitBreakerOptions -import io.vertx.core.AsyncResult -import io.vertx.core.VertxOptions -import io.vertx.core.http.HttpMethod -import io.vertx.ext.web.client.WebClientOptions -import io.vertx.reactivex.circuitbreaker.CircuitBreaker -import io.vertx.reactivex.core.Vertx -import io.vertx.reactivex.ext.web.client.HttpRequest -import io.vertx.reactivex.ext.web.client.WebClient -import spock.lang.Shared - -import java.util.concurrent.CompletableFuture -import java.util.function.Consumer - -class VertxRxCircuitBreakerWebClientTest extends HttpClientTest> implements AgentTestTrait { - - @Shared - Vertx vertx = Vertx.vertx(new VertxOptions()) - @Shared - def clientOptions = new WebClientOptions().setConnectTimeout(CONNECT_TIMEOUT_MS) - @Shared - WebClient client = WebClient.create(vertx, clientOptions) - @Shared - CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, - new CircuitBreakerOptions() - .setTimeout(-1) // Disable the timeout otherwise it makes each test take this long. - ) - - @Override - HttpRequest buildRequest(String method, URI uri, Map headers) { - def request = client.requestAbs(HttpMethod.valueOf(method), "$uri") - headers.each { request.putHeader(it.key, it.value) } - return request - } - - @Override - int sendRequest(HttpRequest request, String method, URI uri, Map headers) { - // VertxRx doesn't seem to provide a synchronous API at all for circuit breaker. Bridge through - // a callback. - CompletableFuture future = new CompletableFuture<>() - sendRequestWithCallback(request) { - if (it.succeeded()) { - future.complete(it.result().statusCode()) - } else { - future.completeExceptionally(it.cause()) - } - } - return future.get() - } - - void sendRequestWithCallback(HttpRequest request, Consumer consumer) { - breaker.execute({ command -> - request.rxSend().subscribe({ response -> - command.complete(response) - }, { throwable -> - command.fail(throwable) - }) - }, { - consumer.accept(it) - }) - } - - @Override - void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map headers, HttpClientResult requestResult) { - sendRequestWithCallback(request) { - if (it.succeeded()) { - requestResult.complete(it.result().statusCode()) - } else { - requestResult.complete(it.cause()) - } - } - } - - @Override - String expectedClientSpanName(URI uri, String method) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "http://192.0.2.1/": // non routable address - return "CONNECT" - default: - return super.expectedClientSpanName(uri, method) - } - } - - @Override - Set> httpAttributes(URI uri) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "http://192.0.2.1/": // non routable address - return [] - } - return super.httpAttributes(uri) - } - - @Override - boolean testRedirects() { - false - } - - @Override - boolean testHttps() { - false - } - - @Override - boolean testReadTimeout() { - false - } - - @Override - SingleConnection createSingleConnection(String host, int port) { - return new VertxRxCircuitBreakerSingleConnection(host, port, breaker) - } -} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxWebClientTest.groovy b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxWebClientTest.groovy deleted file mode 100644 index 4abac2d6218a..000000000000 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxWebClientTest.groovy +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package client - -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.base.HttpClientTest -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult -import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection -import io.vertx.core.VertxOptions -import io.vertx.core.http.HttpMethod -import io.vertx.ext.web.client.WebClientOptions -import io.vertx.reactivex.core.Vertx -import io.vertx.reactivex.core.buffer.Buffer -import io.vertx.reactivex.ext.web.client.HttpRequest -import io.vertx.reactivex.ext.web.client.HttpResponse -import io.vertx.reactivex.ext.web.client.WebClient -import spock.lang.Shared - -class VertxRxWebClientTest extends HttpClientTest> implements AgentTestTrait { - - @Shared - Vertx vertx = Vertx.vertx(new VertxOptions()) - @Shared - def clientOptions = new WebClientOptions().setConnectTimeout(CONNECT_TIMEOUT_MS) - @Shared - WebClient client = WebClient.create(vertx, clientOptions) - - @Override - HttpRequest buildRequest(String method, URI uri, Map headers) { - def request = client.requestAbs(HttpMethod.valueOf(method), "$uri") - headers.each { request.putHeader(it.key, it.value) } - return request - } - - @Override - int sendRequest(HttpRequest request, String method, URI uri, Map headers) { - return request.rxSend().blockingGet().statusCode() - } - - @Override - void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map headers, HttpClientResult requestResult) { - request.rxSend() - .subscribe(new io.reactivex.functions.Consumer>() { - @Override - void accept(HttpResponse httpResponse) throws Exception { - requestResult.complete(httpResponse.statusCode()) - } - }, new io.reactivex.functions.Consumer() { - @Override - void accept(Throwable throwable) throws Exception { - requestResult.complete(throwable) - } - }) - } - - @Override - String expectedClientSpanName(URI uri, String method) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "http://192.0.2.1/": // non routable address - return "CONNECT" - default: - return super.expectedClientSpanName(uri, method) - } - } - - @Override - Throwable clientSpanError(URI uri, Throwable exception) { - if (exception.class == RuntimeException) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "http://192.0.2.1/": // non routable address - exception = exception.getCause() - } - } - return exception - } - - @Override - Set> httpAttributes(URI uri) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "http://192.0.2.1/": // non routable address - return [] - } - return super.httpAttributes(uri) - } - - @Override - boolean testRedirects() { - false - } - - @Override - boolean testHttps() { - false - } - - @Override - boolean testReadTimeout() { - false - } - - @Override - SingleConnection createSingleConnection(String host, int port) { - return new VertxRxSingleConnection(host, port) - } -} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy deleted file mode 100644 index 50f2899b41ff..000000000000 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package server - -import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint -import io.vertx.circuitbreaker.CircuitBreakerOptions -import io.vertx.core.Promise -import io.vertx.reactivex.circuitbreaker.CircuitBreaker -import io.vertx.reactivex.core.AbstractVerticle -import io.vertx.reactivex.ext.web.Router - -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.PATH_PARAM -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS - -class VertxRxCircuitBreakerHttpServerTest extends VertxRxHttpServerTest { - - @Override - protected Class verticle() { - return VertxRxCircuitBreakerWebTestServer - } - - static class VertxRxCircuitBreakerWebTestServer extends AbstractVerticle { - - @Override - void start(Promise startPromise) { - int port = config().getInteger(CONFIG_HTTP_SERVER_PORT) - Router router = Router.router(super.@vertx) - CircuitBreaker breaker = - CircuitBreaker.create( - "my-circuit-breaker", - super.@vertx, - new CircuitBreakerOptions() - .setTimeout(-1) // Disable the timeout otherwise it makes each test take this long. - ) - - router.route(SUCCESS.path).handler { ctx -> - breaker.execute({ future -> - future.complete(SUCCESS) - }, { it -> - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status).end(endpoint.body) - } - }) - } - router.route(INDEXED_CHILD.path).handler { ctx -> - breaker.execute({ future -> - future.complete(INDEXED_CHILD) - }, { it -> - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - endpoint.collectSpanAttributes { ctx.request().params().get(it) } - ctx.response().setStatusCode(endpoint.status).end() - } - }) - } - router.route(QUERY_PARAM.path).handler { ctx -> - breaker.execute({ future -> - future.complete(QUERY_PARAM) - }, { it -> - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status).end(ctx.request().query()) - } - }) - } - router.route(REDIRECT.path).handler { ctx -> - breaker.execute({ future -> - future.complete(REDIRECT) - }, { - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status).putHeader("location", endpoint.body).end() - } - }) - } - router.route(ERROR.path).handler { ctx -> - breaker.execute({ future -> - future.complete(ERROR) - }, { - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status).end(endpoint.body) - } - }) - } - router.route(EXCEPTION.path).handler { ctx -> - breaker.execute({ future -> - future.fail(new Exception(EXCEPTION.body)) - }, { - try { - def cause = it.cause() - controller(EXCEPTION) { - throw cause - } - } catch (Exception ex) { - ctx.response().setStatusCode(EXCEPTION.status).end(ex.message) - } - }) - } - router.route("/path/:id/param").handler { ctx -> - breaker.execute({ future -> - future.complete(PATH_PARAM) - }, { - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status).end(ctx.request().getParam("id")) - } - }) - } - router.route(CAPTURE_HEADERS.path).handler { ctx -> - breaker.execute({ future -> - future.complete(CAPTURE_HEADERS) - }, { - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status) - .putHeader("X-Test-Response", ctx.request().getHeader("X-Test-Request")) - .end(endpoint.body) - } - }) - } - - - super.@vertx.createHttpServer() - .requestHandler(router) - .listen(port) { startPromise.complete() } - } - } - - @Override - boolean hasExceptionOnServerSpan(ServerEndpoint endpoint) { - return endpoint != EXCEPTION && super.hasExceptionOnServerSpan(endpoint) - } - - @Override - boolean testHttpPipelining() { - false - } -} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxHttpServerTest.groovy b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxHttpServerTest.groovy deleted file mode 100644 index a8f6ad7e7fa7..000000000000 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxHttpServerTest.groovy +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package server - -import io.opentelemetry.instrumentation.api.internal.HttpConstants -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.base.HttpServerTest -import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint -import io.vertx.core.DeploymentOptions -import io.vertx.core.Promise -import io.vertx.core.Vertx -import io.vertx.core.VertxOptions -import io.vertx.core.json.JsonObject -import io.vertx.reactivex.core.AbstractVerticle -import io.vertx.reactivex.ext.web.Router - -import java.util.concurrent.CompletableFuture -import java.util.concurrent.TimeUnit - -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.PATH_PARAM -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS - -class VertxRxHttpServerTest extends HttpServerTest implements AgentTestTrait { - public static final String CONFIG_HTTP_SERVER_PORT = "http.server.port" - - @Override - Vertx startServer(int port) { - Vertx server = Vertx.vertx(new VertxOptions() - // Useful for debugging: - // .setBlockedThreadCheckInterval(Integer.MAX_VALUE) - ) - CompletableFuture future = new CompletableFuture<>() - server.deployVerticle(verticle().getName(), - new DeploymentOptions() - .setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port)) - .setInstances(3)) { res -> - if (!res.succeeded()) { - throw new IllegalStateException("Cannot deploy server Verticle", res.cause()) - } - future.complete(null) - } - - future.get(30, TimeUnit.SECONDS) - return server - } - - @Override - void stopServer(Vertx server) { - server.close() - } - - @Override - boolean testPathParam() { - return true - } - - @Override - boolean verifyServerSpanEndTime() { - // server spans are ended inside of the controller spans - return false - } - - @Override - String expectedHttpRoute(ServerEndpoint endpoint, String method) { - if (method == HttpConstants._OTHER) { - return getContextPath() + endpoint.path - } - return super.expectedHttpRoute(endpoint, method) - } - - protected Class verticle() { - return VertxReactiveWebServer - } - - static class VertxReactiveWebServer extends AbstractVerticle { - - @Override - void start(Promise startPromise) { - int port = config().getInteger(CONFIG_HTTP_SERVER_PORT) - Router router = Router.router(super.@vertx) - - router.route(SUCCESS.path).handler { ctx -> - controller(SUCCESS) { - ctx.response().setStatusCode(SUCCESS.status).end(SUCCESS.body) - } - } - router.route(INDEXED_CHILD.path).handler { ctx -> - controller(INDEXED_CHILD) { - INDEXED_CHILD.collectSpanAttributes { ctx.request().params().get(it) } - ctx.response().setStatusCode(INDEXED_CHILD.status).end() - } - } - router.route(QUERY_PARAM.path).handler { ctx -> - controller(QUERY_PARAM) { - ctx.response().setStatusCode(QUERY_PARAM.status).end(ctx.request().query()) - } - } - router.route(REDIRECT.path).handler { ctx -> - controller(REDIRECT) { - ctx.response().setStatusCode(REDIRECT.status).putHeader("location", REDIRECT.body).end() - } - } - router.route(ERROR.path).handler { ctx -> - controller(ERROR) { - ctx.response().setStatusCode(ERROR.status).end(ERROR.body) - } - } - router.route(EXCEPTION.path).handler { ctx -> - controller(EXCEPTION) { - throw new Exception(EXCEPTION.body) - } - } - router.route("/path/:id/param").handler { ctx -> - controller(PATH_PARAM) { - ctx.response().setStatusCode(PATH_PARAM.status).end(ctx.request().getParam("id")) - } - } - router.route(CAPTURE_HEADERS.path).handler { ctx -> - controller(CAPTURE_HEADERS) { - ctx.response().setStatusCode(CAPTURE_HEADERS.status) - .putHeader("X-Test-Response", ctx.request().getHeader("X-Test-Request")) - .end(CAPTURE_HEADERS.body) - } - } - - super.@vertx.createHttpServer() - .requestHandler(router) - .listen(port) { startPromise.complete() } - } - } -} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/client/VertxRxCircuitBreakerSingleConnection.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java similarity index 94% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/client/VertxRxCircuitBreakerSingleConnection.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java index fc1020e2ebbc..4ea115fc571d 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/client/VertxRxCircuitBreakerSingleConnection.java +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package client; +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; import io.vertx.core.AsyncResult; import io.vertx.reactivex.circuitbreaker.CircuitBreaker; diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java new file mode 100644 index 000000000000..5922237f2a2d --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java @@ -0,0 +1,138 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.vertx.circuitbreaker.CircuitBreakerOptions; +import io.vertx.core.AsyncResult; +import io.vertx.core.VertxOptions; +import io.vertx.core.http.HttpMethod; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.reactivex.circuitbreaker.CircuitBreaker; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.core.buffer.Buffer; +import io.vertx.reactivex.ext.web.client.HttpRequest; +import io.vertx.reactivex.ext.web.client.HttpResponse; +import io.vertx.reactivex.ext.web.client.WebClient; +import java.net.URI; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxRxCircuitBreakerWebClientTest extends AbstractHttpClientTest> { + + @RegisterExtension + static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent(); + + private final Vertx vertx = Vertx.vertx(new VertxOptions()); + private final WebClient httpClient = buildClient(vertx); + private final CircuitBreaker breaker = + CircuitBreaker.create( + "my-circuit-breaker", + vertx, + new CircuitBreakerOptions() + .setTimeout(-1) // Disable the timeout otherwise it makes each test take this long. + ); + + private static WebClient buildClient(Vertx vertx) { + WebClientOptions clientOptions = + new WebClientOptions().setConnectTimeout(Math.toIntExact(CONNECTION_TIMEOUT.toMillis())); + return WebClient.create(vertx, clientOptions); + } + + @Override + public HttpRequest buildRequest(String method, URI uri, Map headers) { + HttpRequest request = httpClient.requestAbs(HttpMethod.valueOf(method), uri.toString()); + headers.forEach(request::putHeader); + return request; + } + + @Override + public int sendRequest( + HttpRequest request, String method, URI uri, Map headers) + throws ExecutionException, InterruptedException { + // VertxRx doesn't seem to provide a synchronous API at all for circuit breaker. Bridge through + // a callback. + CompletableFuture future = new CompletableFuture<>(); + sendRequestWithCallback( + request, + result -> { + if (result.succeeded()) { + future.complete(result.result().statusCode()); + } else { + future.completeExceptionally(result.cause()); + } + }); + + return future.get(); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + private void sendRequestWithCallback( + HttpRequest request, Consumer>> consumer) { + breaker.execute( + command -> request.rxSend().subscribe(command::complete, command::fail), consumer::accept); + } + + @Override + public void sendRequestWithCallback( + HttpRequest request, + String method, + URI uri, + Map headers, + HttpClientResult requestResult) { + sendRequestWithCallback( + request, + result -> { + if (result.succeeded()) { + requestResult.complete(result.result().statusCode()); + } else { + requestResult.complete(result.cause()); + } + }); + } + + @Override + protected void configure(HttpClientTestOptions.Builder optionsBuilder) { + optionsBuilder.disableTestRedirects(); + optionsBuilder.disableTestHttps(); + optionsBuilder.disableTestReadTimeout(); + optionsBuilder.setHttpAttributes(VertxRxCircuitBreakerWebClientTest::getHttpAttributes); + optionsBuilder.setExpectedClientSpanNameMapper( + VertxRxCircuitBreakerWebClientTest::expectedClientSpanName); + optionsBuilder.setSingleConnectionFactory( + (host, port) -> new VertxRxCircuitBreakerSingleConnection(host, port, breaker)); + } + + private static Set> getHttpAttributes(URI uri) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + return Collections.emptySet(); + default: + return HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES; + } + } + + private static String expectedClientSpanName(URI uri, String method) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + return "CONNECT"; + default: + return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method); + } + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/client/VertxRxSingleConnection.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java similarity index 96% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/client/VertxRxSingleConnection.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java index 8620eb2b18c0..2a5bc1fe2f69 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/client/VertxRxSingleConnection.java +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package client; +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection; import io.vertx.core.VertxOptions; diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java new file mode 100644 index 000000000000..0c4175abf7b4 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java @@ -0,0 +1,116 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.reactivex.functions.Consumer; +import io.vertx.core.VertxOptions; +import io.vertx.core.http.HttpMethod; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.core.buffer.Buffer; +import io.vertx.reactivex.ext.web.client.HttpRequest; +import io.vertx.reactivex.ext.web.client.HttpResponse; +import io.vertx.reactivex.ext.web.client.WebClient; +import java.net.URI; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxRxWebClientTest extends AbstractHttpClientTest> { + + @RegisterExtension + static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent(); + + private final WebClient httpClient = buildClient(); + + private static WebClient buildClient() { + Vertx vertx = Vertx.vertx(new VertxOptions()); + WebClientOptions clientOptions = + new WebClientOptions().setConnectTimeout(Math.toIntExact(CONNECTION_TIMEOUT.toMillis())); + return WebClient.create(vertx, clientOptions); + } + + @Override + public HttpRequest buildRequest(String method, URI uri, Map headers) { + HttpRequest request = httpClient.requestAbs(HttpMethod.valueOf(method), uri.toString()); + headers.forEach(request::putHeader); + return request; + } + + @Override + public int sendRequest( + HttpRequest request, String method, URI uri, Map headers) { + return request.rxSend().blockingGet().statusCode(); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @Override + public void sendRequestWithCallback( + HttpRequest request, + String method, + URI uri, + Map headers, + HttpClientResult requestResult) { + request + .rxSend() + .subscribe( + (Consumer>) + httpResponse -> requestResult.complete(httpResponse.statusCode()), + requestResult::complete); + } + + @Override + protected void configure(HttpClientTestOptions.Builder optionsBuilder) { + optionsBuilder.disableTestRedirects(); + optionsBuilder.disableTestHttps(); + optionsBuilder.disableTestReadTimeout(); + optionsBuilder.setHttpAttributes(VertxRxWebClientTest::getHttpAttributes); + optionsBuilder.setClientSpanErrorMapper(VertxRxWebClientTest::clientSpanError); + optionsBuilder.setExpectedClientSpanNameMapper(VertxRxWebClientTest::expectedClientSpanName); + optionsBuilder.setSingleConnectionFactory(VertxRxSingleConnection::new); + } + + private static Set> getHttpAttributes(URI uri) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + return Collections.emptySet(); + default: + return HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES; + } + } + + private static Throwable clientSpanError(URI uri, Throwable exception) { + if (exception.getClass() == RuntimeException.class) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + exception = exception.getCause(); + break; + default: + break; + } + } + return exception; + } + + private static String expectedClientSpanName(URI uri, String method) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + return "CONNECT"; + default: + return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method); + } + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java new file mode 100644 index 000000000000..b2b03f85bbfb --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java @@ -0,0 +1,108 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.PATH_PARAM; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; + +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; + +abstract class AbstractVertxRxVerticle extends AbstractVerticle { + + abstract void handle(RoutingContext ctx, ServerEndpoint endpoint, Runnable action); + + void configure(Router router) { + router + .route(SUCCESS.getPath()) + .handler( + ctx -> + handle( + ctx, + SUCCESS, + () -> + ctx.response().setStatusCode(SUCCESS.getStatus()).end(SUCCESS.getBody()))); + + router + .route(INDEXED_CHILD.getPath()) + .handler( + ctx -> + handle( + ctx, + INDEXED_CHILD, + () -> { + INDEXED_CHILD.collectSpanAttributes( + parameter -> ctx.request().params().get(parameter)); + ctx.response().setStatusCode(INDEXED_CHILD.getStatus()).end(); + })); + + router + .route(QUERY_PARAM.getPath()) + .handler( + ctx -> + handle( + ctx, + QUERY_PARAM, + () -> + ctx.response() + .setStatusCode(QUERY_PARAM.getStatus()) + .end(ctx.request().query()))); + + router + .route(REDIRECT.getPath()) + .handler( + ctx -> + handle( + ctx, + REDIRECT, + () -> + ctx.response() + .setStatusCode(REDIRECT.getStatus()) + .putHeader("location", REDIRECT.getBody()) + .end())); + + router + .route(ERROR.getPath()) + .handler( + ctx -> + handle( + ctx, + ERROR, + () -> ctx.response().setStatusCode(ERROR.getStatus()).end(ERROR.getBody()))); + + router + .route("/path/:id/param") + .handler( + ctx -> + handle( + ctx, + PATH_PARAM, + () -> + ctx.response() + .setStatusCode(PATH_PARAM.getStatus()) + .end(ctx.request().getParam("id")))); + + router + .route(CAPTURE_HEADERS.getPath()) + .handler( + ctx -> + handle( + ctx, + CAPTURE_HEADERS, + () -> + ctx.response() + .setStatusCode(CAPTURE_HEADERS.getStatus()) + .putHeader("X-Test-Response", ctx.request().getHeader("X-Test-Request")) + .end(CAPTURE_HEADERS.getBody()))); + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java new file mode 100644 index 000000000000..441120ded812 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java @@ -0,0 +1,246 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableDatabaseSemconv; +import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; +import static io.opentelemetry.javaagent.instrumentation.vertx.reactive.server.VertxReactiveWebServer.TEST_REQUEST_ID_ATTRIBUTE; +import static io.opentelemetry.javaagent.instrumentation.vertx.reactive.server.VertxReactiveWebServer.TEST_REQUEST_ID_PARAMETER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.ClientAttributes.CLIENT_ADDRESS; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_ROUTE; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PEER_ADDRESS; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PEER_PORT; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.UrlAttributes.URL_PATH; +import static io.opentelemetry.semconv.UrlAttributes.URL_QUERY; +import static io.opentelemetry.semconv.UrlAttributes.URL_SCHEME; +import static io.opentelemetry.semconv.UserAgentAttributes.USER_AGENT_ORIGINAL; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_CONNECTION_STRING; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_NAME; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_OPERATION; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SQL_TABLE; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_STATEMENT; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SYSTEM; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_USER; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.TraceAssert; +import io.opentelemetry.testing.internal.armeria.client.WebClient; +import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse; +import io.opentelemetry.testing.internal.armeria.common.HttpRequest; +import io.opentelemetry.testing.internal.armeria.common.HttpRequestBuilder; +import io.vertx.reactivex.core.Vertx; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxReactivePropagationTest { + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + private static WebClient client; + private static int port; + private static Vertx server; + + @BeforeAll + static void setUp() throws ExecutionException, InterruptedException, TimeoutException { + port = PortUtils.findOpenPort(); + server = VertxReactiveWebServer.start(port); + client = WebClient.of("h1c://localhost:" + port); + } + + @AfterAll + static void cleanUp() { + server.close(); + } + + // Verifies that context is correctly propagated and sql query span has correct parent. + // Tests io.opentelemetry.javaagent.instrumentation.vertx.reactive.VertxRxInstrumentation + @SuppressWarnings("deprecation") // uses deprecated db semconv + @Test + void contextPropagation() { + AggregatedHttpResponse response = client.get("/listProducts").aggregate().join(); + assertThat(response.status().code()).isEqualTo(SUCCESS.getStatus()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("GET /listProducts") + .hasKind(SpanKind.SERVER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + equalTo(NETWORK_PEER_ADDRESS, "127.0.0.1"), + satisfies(NETWORK_PEER_PORT, val -> val.isInstanceOf(Long.class)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, port), + equalTo(CLIENT_ADDRESS, "127.0.0.1"), + equalTo(URL_PATH, "/listProducts"), + equalTo(HTTP_REQUEST_METHOD, "GET"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(URL_SCHEME, "http"), + satisfies(USER_AGENT_ORIGINAL, val -> val.isInstanceOf(String.class)), + equalTo(HTTP_ROUTE, "/listProducts")), + span -> + span.hasName("handleListProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("listProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)), + span -> + span.hasName("SELECT test.products") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + equalTo(DB_SYSTEM, "hsqldb"), + equalTo(maybeStable(DB_NAME), "test"), + equalTo(DB_USER, emitStableDatabaseSemconv() ? null : "SA"), + equalTo( + DB_CONNECTION_STRING, + emitStableDatabaseSemconv() ? null : "hsqldb:mem:"), + equalTo( + maybeStable(DB_STATEMENT), + "SELECT id, name, price, weight FROM products"), + equalTo(maybeStable(DB_OPERATION), "SELECT"), + equalTo(maybeStable(DB_SQL_TABLE), "products")))); + } + + @SuppressWarnings("deprecation") // uses deprecated db semconv + @Test + void highConcurrency() { + int count = 100; + String baseUrl = "/listProducts"; + CountDownLatch latch = new CountDownLatch(1); + + ExecutorService pool = Executors.newFixedThreadPool(8); + cleanup.deferCleanup(pool::shutdownNow); + TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + TextMapSetter setter = + (carrier, name, value) -> carrier.header(name, value); + + for (int i = 0; i < count; i++) { + int index = i; + pool.submit( + () -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + testing.runWithSpan( + "client " + index, + () -> { + HttpRequestBuilder builder = + HttpRequest.builder() + .get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index); + Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index); + propagator.inject(Context.current(), builder, setter); + client.execute(builder.build()).aggregate().join(); + }); + }); + } + + latch.countDown(); + + List> assertions = new ArrayList<>(); + for (int i = 0; i < count; i++) { + assertions.add( + trace -> { + long requestId = + Long.parseLong(trace.getSpan(0).getName().substring("client ".length())); + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("client " + requestId) + .hasKind(SpanKind.INTERNAL) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("GET /listProducts") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + equalTo(NETWORK_PEER_ADDRESS, "127.0.0.1"), + satisfies(NETWORK_PEER_PORT, val -> val.isInstanceOf(Long.class)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, port), + equalTo(CLIENT_ADDRESS, "127.0.0.1"), + equalTo(URL_PATH, baseUrl), + equalTo(URL_QUERY, TEST_REQUEST_ID_PARAMETER + "=" + requestId), + equalTo(HTTP_REQUEST_METHOD, "GET"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(URL_SCHEME, "http"), + satisfies(USER_AGENT_ORIGINAL, val -> val.isInstanceOf(String.class)), + equalTo(HTTP_ROUTE, "/listProducts"), + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("handleListProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("listProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("SELECT test.products") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(3)) + .hasAttributesSatisfyingExactly( + equalTo(DB_SYSTEM, "hsqldb"), + equalTo(maybeStable(DB_NAME), "test"), + equalTo(DB_USER, emitStableDatabaseSemconv() ? null : "SA"), + equalTo( + DB_CONNECTION_STRING, + emitStableDatabaseSemconv() ? null : "hsqldb:mem:"), + equalTo( + maybeStable(DB_STATEMENT), + "SELECT id AS request" + + requestId + + ", name, price, weight FROM products"), + equalTo(maybeStable(DB_OPERATION), "SELECT"), + equalTo(maybeStable(DB_SQL_TABLE), "products"))); + }); + } + testing.waitAndAssertTraces(assertions); + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/VertxReactiveWebServer.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java similarity index 98% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/VertxReactiveWebServer.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java index 77cef04c0aad..321fae4215cc 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/VertxReactiveWebServer.java +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java @@ -3,6 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; import io.opentelemetry.api.GlobalOpenTelemetry; diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java new file mode 100644 index 000000000000..ef2e0de1dee7 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java @@ -0,0 +1,88 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION; + +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions; +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.vertx.circuitbreaker.CircuitBreakerOptions; +import io.vertx.core.Promise; +import io.vertx.reactivex.circuitbreaker.CircuitBreaker; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; + +class VertxRxCircuitBreakerHttpServerTest extends VertxRxHttpServerTest { + + @Override + protected void configure(HttpServerTestOptions options) { + super.configure(options); + + options.setTestHttpPipelining(false); + options.setHasExceptionOnServerSpan(endpoint -> endpoint != EXCEPTION); + } + + @Override + protected Class verticle() { + return VertxRxCircuitBreakerWebTestServer.class; + } + + public static class VertxRxCircuitBreakerWebTestServer extends AbstractVertxRxVerticle { + CircuitBreaker breaker; + + @Override + void handle(RoutingContext ctx, ServerEndpoint endpoint, Runnable action) { + breaker.execute( + future -> future.complete(endpoint), + result -> { + if (result.failed()) { + throw new IllegalStateException(result.cause()); + } + controller(endpoint, action::run); + }); + } + + @Override + public void start(Promise startPromise) { + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT); + Router router = Router.router(vertx); + breaker = + CircuitBreaker.create( + "my-circuit-breaker", + vertx, + // Disable the timeout otherwise it makes each test take this long. + new CircuitBreakerOptions().setTimeout(-1)); + + configure(router); + router + .route(EXCEPTION.getPath()) + .handler( + ctx -> + breaker.execute( + future -> future.fail(new IllegalStateException(EXCEPTION.getBody())), + result -> { + try { + Throwable cause = result.cause(); + controller( + EXCEPTION, + () -> { + throw cause; + }); + } catch (Throwable throwable) { + ctx.response() + .setStatusCode(EXCEPTION.getStatus()) + .end(throwable.getMessage()); + } + })); + + vertx + .createHttpServer() + .requestHandler(router) + .listen(port, httpServerAsyncResult -> startPromise.complete()); + } + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java new file mode 100644 index 000000000000..4c4393f7b299 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java @@ -0,0 +1,113 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION; + +import io.opentelemetry.instrumentation.api.internal.HttpConstants; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions; +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.json.JsonObject; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxRxHttpServerTest extends AbstractHttpServerTest { + static final String CONFIG_HTTP_SERVER_PORT = "http.server.port"; + + @RegisterExtension + static final InstrumentationExtension testing = HttpServerInstrumentationExtension.forAgent(); + + @Override + protected Vertx setupServer() throws Exception { + Vertx server = + Vertx.vertx( + new VertxOptions() + // Useful for debugging: + // .setBlockedThreadCheckInterval(Integer.MAX_VALUE) + ); + CompletableFuture future = new CompletableFuture<>(); + server.deployVerticle( + verticle().getName(), + new DeploymentOptions() + .setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port)) + .setInstances(3), + result -> { + if (!result.succeeded()) { + throw new IllegalStateException("Cannot deploy server Verticle", result.cause()); + } + future.complete(null); + }); + + future.get(30, TimeUnit.SECONDS); + return server; + } + + @Override + protected void stopServer(Vertx vertx) { + vertx.close(); + } + + @Override + protected void configure(HttpServerTestOptions options) { + super.configure(options); + + options.setTestPathParam(true); + // server spans are ended inside the controller spans + options.setVerifyServerSpanEndTime(false); + options.setExpectedHttpRoute( + (endpoint, method) -> { + if (HttpConstants._OTHER.equals(method)) { + return getContextPath() + endpoint.getPath(); + } + return expectedHttpRoute(endpoint, method); + }); + } + + protected Class verticle() { + return VertxReactiveWebServer.class; + } + + public static class VertxReactiveWebServer extends AbstractVertxRxVerticle { + @Override + void handle(RoutingContext ctx, ServerEndpoint endpoint, Runnable action) { + controller(endpoint, action::run); + } + + @Override + public void start(Promise startFuture) { + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT); + Router router = Router.router(vertx); + + configure(router); + router + .route(EXCEPTION.getPath()) + .handler( + ctx -> + handle( + ctx, + EXCEPTION, + () -> { + throw new IllegalStateException(EXCEPTION.getBody()); + })); + + vertx + .createHttpServer() + .requestHandler(router) + .listen(port, httpServerAsyncResult -> startFuture.complete()); + } + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/VertxReactivePropagationTest.groovy b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/VertxReactivePropagationTest.groovy deleted file mode 100644 index 51cf2101432f..000000000000 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/VertxReactivePropagationTest.groovy +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.api.GlobalOpenTelemetry -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.SpanKind -import io.opentelemetry.context.Context -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.semconv.ClientAttributes -import io.opentelemetry.semconv.HttpAttributes -import io.opentelemetry.semconv.NetworkAttributes -import io.opentelemetry.semconv.ServerAttributes -import io.opentelemetry.semconv.UrlAttributes -import io.opentelemetry.semconv.UserAgentAttributes -import io.opentelemetry.semconv.incubating.DbIncubatingAttributes -import io.opentelemetry.testing.internal.armeria.client.WebClient -import io.opentelemetry.testing.internal.armeria.common.HttpRequest -import io.opentelemetry.testing.internal.armeria.common.HttpRequestBuilder -import io.vertx.reactivex.core.Vertx -import spock.lang.Shared - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors - -import static VertxReactiveWebServer.TEST_REQUEST_ID_ATTRIBUTE -import static VertxReactiveWebServer.TEST_REQUEST_ID_PARAMETER -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.SERVER -import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableDatabaseSemconv -import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS - -class VertxReactivePropagationTest extends AgentInstrumentationSpecification { - @Shared - WebClient client - - @Shared - int port - - @Shared - Vertx server - - def setupSpec() { - port = PortUtils.findOpenPort() - server = VertxReactiveWebServer.start(port) - client = WebClient.of("h1c://localhost:${port}") - } - - def cleanupSpec() { - server.close() - } - - //Verifies that context is correctly propagated and sql query span has correct parent. - //Tests io.opentelemetry.javaagent.instrumentation.vertx.reactive.VertxRxInstrumentation - @SuppressWarnings("deprecation") // TODO DbIncubatingAttributes.DB_CONNECTION_STRING deprecation - def "should propagate context over vert.x rx-java framework"() { - setup: - def response = client.get("/listProducts").aggregate().join() - - expect: - response.status().code() == SUCCESS.status - - and: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "GET /listProducts" - kind SERVER - hasNoParent() - attributes { - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - "$NetworkAttributes.NETWORK_PEER_PORT" Long - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" Long - "$ClientAttributes.CLIENT_ADDRESS" "127.0.0.1" - "$UrlAttributes.URL_PATH" "/listProducts" - "$HttpAttributes.HTTP_REQUEST_METHOD" "GET" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_SCHEME" "http" - "$UserAgentAttributes.USER_AGENT_ORIGINAL" String - "$HttpAttributes.HTTP_ROUTE" "/listProducts" - } - } - span(1) { - name "handleListProducts" - kind SpanKind.INTERNAL - childOf span(0) - } - span(2) { - name "listProducts" - kind SpanKind.INTERNAL - childOf span(1) - } - span(3) { - name "SELECT test.products" - kind CLIENT - childOf span(2) - attributes { - "$DbIncubatingAttributes.DB_SYSTEM" "hsqldb" - "${maybeStable(DbIncubatingAttributes.DB_NAME)}" "test" - "$DbIncubatingAttributes.DB_USER" emitStableDatabaseSemconv() ? null : "SA" - "$DbIncubatingAttributes.DB_CONNECTION_STRING" emitStableDatabaseSemconv() ? null : "hsqldb:mem:" - "${maybeStable(DbIncubatingAttributes.DB_STATEMENT)}" "SELECT id, name, price, weight FROM products" - "${maybeStable(DbIncubatingAttributes.DB_OPERATION)}" "SELECT" - "${maybeStable(DbIncubatingAttributes.DB_SQL_TABLE)}" "products" - } - } - } - } - } - - def "should propagate context correctly over vert.x rx-java framework with high concurrency"() { - setup: - int count = 100 - def baseUrl = "/listProducts" - def latch = new CountDownLatch(1) - - def pool = Executors.newFixedThreadPool(8) - def propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() - def setter = { HttpRequestBuilder carrier, String name, String value -> - carrier.header(name, value) - } - - when: - count.times { index -> - def job = { - latch.await() - runWithSpan("client " + index) { - HttpRequestBuilder builder = HttpRequest.builder() - .get("${baseUrl}?${TEST_REQUEST_ID_PARAMETER}=${index}") - Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index) - propagator.inject(Context.current(), builder, setter) - client.execute(builder.build()).aggregate().join() - } - } - pool.submit(job) - } - - latch.countDown() - - then: - assertTraces(count) { - (0..count - 1).each { - trace(it, 5) { - def rootSpan = it.span(0) - def requestId = Long.valueOf(rootSpan.name.substring("client ".length())) - - span(0) { - name "client $requestId" - kind SpanKind.INTERNAL - hasNoParent() - attributes { - "${TEST_REQUEST_ID_ATTRIBUTE}" requestId - } - } - span(1) { - name "GET /listProducts" - kind SERVER - childOf(span(0)) - attributes { - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - "$NetworkAttributes.NETWORK_PEER_PORT" Long - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" Long - "$ClientAttributes.CLIENT_ADDRESS" "127.0.0.1" - "$UrlAttributes.URL_PATH" baseUrl - "$UrlAttributes.URL_QUERY" "$TEST_REQUEST_ID_PARAMETER=$requestId" - "$HttpAttributes.HTTP_REQUEST_METHOD" "GET" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_SCHEME" "http" - "$UserAgentAttributes.USER_AGENT_ORIGINAL" String - "$HttpAttributes.HTTP_ROUTE" "/listProducts" - "${TEST_REQUEST_ID_ATTRIBUTE}" requestId - } - } - span(2) { - name "handleListProducts" - kind SpanKind.INTERNAL - childOf(span(1)) - attributes { - "${TEST_REQUEST_ID_ATTRIBUTE}" requestId - } - } - span(3) { - name "listProducts" - kind SpanKind.INTERNAL - childOf(span(2)) - attributes { - "${TEST_REQUEST_ID_ATTRIBUTE}" requestId - } - } - span(4) { - name "SELECT test.products" - kind CLIENT - childOf(span(3)) - attributes { - "$DbIncubatingAttributes.DB_SYSTEM" "hsqldb" - "${maybeStable(DbIncubatingAttributes.DB_NAME)}" "test" - "$DbIncubatingAttributes.DB_USER" emitStableDatabaseSemconv() ? null : "SA" - "$DbIncubatingAttributes.DB_CONNECTION_STRING" emitStableDatabaseSemconv() ? null : "hsqldb:mem:" - "${maybeStable(DbIncubatingAttributes.DB_STATEMENT)}" "SELECT id AS request$requestId, name, price, weight FROM products" - "${maybeStable(DbIncubatingAttributes.DB_OPERATION)}" "SELECT" - "${maybeStable(DbIncubatingAttributes.DB_SQL_TABLE)}" "products" - } - } - } - } - } - - cleanup: - pool.shutdownNow() - } -} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy deleted file mode 100644 index e7168361c0e4..000000000000 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package client - -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.base.HttpClientTest -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult -import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection -import io.opentelemetry.semconv.ServerAttributes -import io.opentelemetry.semconv.NetworkAttributes -import io.vertx.circuitbreaker.CircuitBreakerOptions -import io.vertx.core.AsyncResult -import io.vertx.core.VertxOptions -import io.vertx.core.http.HttpMethod -import io.vertx.ext.web.client.WebClientOptions -import io.vertx.reactivex.circuitbreaker.CircuitBreaker -import io.vertx.reactivex.core.Vertx -import io.vertx.reactivex.ext.web.client.HttpRequest -import io.vertx.reactivex.ext.web.client.WebClient -import spock.lang.Shared - -import java.util.concurrent.CompletableFuture -import java.util.function.Consumer - -class VertxRxCircuitBreakerWebClientTest extends HttpClientTest> implements AgentTestTrait { - - @Shared - Vertx vertx = Vertx.vertx(new VertxOptions()) - @Shared - def clientOptions = new WebClientOptions().setConnectTimeout(CONNECT_TIMEOUT_MS) - @Shared - WebClient client = WebClient.create(vertx, clientOptions) - @Shared - CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, - new CircuitBreakerOptions() - .setTimeout(-1) // Disable the timeout otherwise it makes each test take this long. - ) - - @Override - HttpRequest buildRequest(String method, URI uri, Map headers) { - def request = client.requestAbs(HttpMethod.valueOf(method), "$uri") - headers.each { request.putHeader(it.key, it.value) } - return request - } - - @Override - int sendRequest(HttpRequest request, String method, URI uri, Map headers) { - // VertxRx doesn't seem to provide a synchronous API at all for circuit breaker. Bridge through - // a callback. - CompletableFuture future = new CompletableFuture<>() - sendRequestWithCallback(request) { - if (it.succeeded()) { - future.complete(it.result().statusCode()) - } else { - future.completeExceptionally(it.cause()) - } - } - return future.get() - } - - void sendRequestWithCallback(HttpRequest request, Consumer consumer) { - breaker.executeCommand({ command -> - request.rxSend().subscribe({ response -> - command.complete(response) - }, { throwable -> - command.fail(throwable) - }) - }, { - consumer.accept(it) - }) - } - - @Override - void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map headers, HttpClientResult requestResult) { - sendRequestWithCallback(request) { - if (it.succeeded()) { - requestResult.complete(it.result().statusCode()) - } else { - requestResult.complete(it.cause()) - } - } - } - - @Override - boolean testRedirects() { - false - } - - @Override - boolean testHttps() { - false - } - - @Override - boolean testReadTimeout() { - false - } - - @Override - boolean testNonStandardHttpMethod() { - false - } - - @Override - Set> httpAttributes(URI uri) { - def attributes = super.httpAttributes(uri) - attributes.remove(NetworkAttributes.NETWORK_PROTOCOL_VERSION) - attributes.remove(ServerAttributes.SERVER_ADDRESS) - attributes.remove(ServerAttributes.SERVER_PORT) - return attributes - } - - @Override - SingleConnection createSingleConnection(String host, int port) { - return new VertxRxCircuitBreakerSingleConnection(host, port, breaker) - } -} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/client/VertxRxWebClientTest.groovy b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/client/VertxRxWebClientTest.groovy deleted file mode 100644 index 695687584e85..000000000000 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/client/VertxRxWebClientTest.groovy +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package client - -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.base.HttpClientTest -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult -import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection -import io.opentelemetry.semconv.ServerAttributes -import io.opentelemetry.semconv.NetworkAttributes -import io.vertx.core.VertxOptions -import io.vertx.core.http.HttpMethod -import io.vertx.ext.web.client.WebClientOptions -import io.vertx.reactivex.core.Vertx -import io.vertx.reactivex.core.buffer.Buffer -import io.vertx.reactivex.ext.web.client.HttpRequest -import io.vertx.reactivex.ext.web.client.HttpResponse -import io.vertx.reactivex.ext.web.client.WebClient -import spock.lang.Shared - -class VertxRxWebClientTest extends HttpClientTest> implements AgentTestTrait { - - @Shared - Vertx vertx = Vertx.vertx(new VertxOptions()) - @Shared - def clientOptions = new WebClientOptions().setConnectTimeout(CONNECT_TIMEOUT_MS) - @Shared - WebClient client = WebClient.create(vertx, clientOptions) - - @Override - HttpRequest buildRequest(String method, URI uri, Map headers) { - def request = client.requestAbs(HttpMethod.valueOf(method), "$uri") - headers.each { request.putHeader(it.key, it.value) } - return request - } - - @Override - int sendRequest(HttpRequest request, String method, URI uri, Map headers) { - return request.rxSend().blockingGet().statusCode() - } - - @Override - void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map headers, HttpClientResult requestResult) { - request.rxSend() - .subscribe(new io.reactivex.functions.Consumer>() { - @Override - void accept(HttpResponse httpResponse) throws Exception { - requestResult.complete(httpResponse.statusCode()) - } - }, new io.reactivex.functions.Consumer() { - @Override - void accept(Throwable throwable) throws Exception { - requestResult.complete(throwable) - } - }) - } - - @Override - Throwable clientSpanError(URI uri, Throwable exception) { - if (exception.class == RuntimeException) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "http://192.0.2.1/": // non routable address - exception = exception.getCause() - } - } - return exception - } - - @Override - boolean testRedirects() { - false - } - - @Override - boolean testHttps() { - false - } - - @Override - boolean testReadTimeout() { - false - } - - @Override - boolean testNonStandardHttpMethod() { - false - } - - @Override - Set> httpAttributes(URI uri) { - def attributes = super.httpAttributes(uri) - attributes.remove(NetworkAttributes.NETWORK_PROTOCOL_VERSION) - attributes.remove(ServerAttributes.SERVER_ADDRESS) - attributes.remove(ServerAttributes.SERVER_PORT) - return attributes - } - - @Override - SingleConnection createSingleConnection(String host, int port) { - return new VertxRxSingleConnection(host, port) - } -} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy deleted file mode 100644 index d031ceae0dc6..000000000000 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package server - -import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint -import io.vertx.circuitbreaker.CircuitBreakerOptions -import io.vertx.core.Future -import io.vertx.reactivex.circuitbreaker.CircuitBreaker -import io.vertx.reactivex.core.AbstractVerticle -import io.vertx.reactivex.ext.web.Router - -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.PATH_PARAM -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS - -class VertxRxCircuitBreakerHttpServerTest extends VertxRxHttpServerTest { - - @Override - protected Class verticle() { - return VertxRxCircuitBreakerWebTestServer - } - - static class VertxRxCircuitBreakerWebTestServer extends AbstractVerticle { - - @Override - void start(final Future startFuture) { - int port = config().getInteger(CONFIG_HTTP_SERVER_PORT) - Router router = Router.router(super.@vertx) - CircuitBreaker breaker = - CircuitBreaker.create( - "my-circuit-breaker", - super.@vertx, - new CircuitBreakerOptions() - .setTimeout(-1) // Disable the timeout otherwise it makes each test take this long. - ) - - router.route(SUCCESS.path).handler { ctx -> - breaker.executeCommand({ future -> - future.complete(SUCCESS) - }, { it -> - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status).end(endpoint.body) - } - }) - } - router.route(INDEXED_CHILD.path).handler { ctx -> - breaker.executeCommand({ future -> - future.complete(INDEXED_CHILD) - }, { it -> - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - endpoint.collectSpanAttributes { ctx.request().params().get(it) } - ctx.response().setStatusCode(endpoint.status).end() - } - }) - } - router.route(QUERY_PARAM.path).handler { ctx -> - breaker.executeCommand({ future -> - future.complete(QUERY_PARAM) - }, { it -> - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status).end(ctx.request().query()) - } - }) - } - router.route(REDIRECT.path).handler { ctx -> - breaker.executeCommand({ future -> - future.complete(REDIRECT) - }, { - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status).putHeader("location", endpoint.body).end() - } - }) - } - router.route(ERROR.path).handler { ctx -> - breaker.executeCommand({ future -> - future.complete(ERROR) - }, { - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status).end(endpoint.body) - } - }) - } - router.route(EXCEPTION.path).handler { ctx -> - breaker.executeCommand({ future -> - future.fail(new Exception(EXCEPTION.body)) - }, { - try { - def cause = it.cause() - controller(EXCEPTION) { - throw cause - } - } catch (Exception ex) { - ctx.response().setStatusCode(EXCEPTION.status).end(ex.message) - } - }) - } - router.route("/path/:id/param").handler { ctx -> - breaker.executeCommand({ future -> - future.complete(PATH_PARAM) - }, { - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status).end(ctx.request().getParam("id")) - } - }) - } - router.route(CAPTURE_HEADERS.path).handler { ctx -> - breaker.executeCommand({ future -> - future.complete(CAPTURE_HEADERS) - }, { - if (it.failed()) { - throw it.cause() - } - ServerEndpoint endpoint = it.result() - controller(endpoint) { - ctx.response().setStatusCode(endpoint.status) - .putHeader("X-Test-Response", ctx.request().getHeader("X-Test-Request")) - .end(endpoint.body) - } - }) - } - - super.@vertx.createHttpServer() - .requestHandler { router.accept(it) } - .listen(port) { startFuture.complete() } - } - } - - @Override - boolean hasExceptionOnServerSpan(ServerEndpoint endpoint) { - return endpoint != EXCEPTION && super.hasExceptionOnServerSpan(endpoint) - } - - @Override - boolean testHttpPipelining() { - false - } -} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/server/VertxRxHttpServerTest.groovy b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/server/VertxRxHttpServerTest.groovy deleted file mode 100644 index c6f8077c21e7..000000000000 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/groovy/server/VertxRxHttpServerTest.groovy +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package server - -import io.opentelemetry.instrumentation.api.internal.HttpConstants -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.base.HttpServerTest -import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint -import io.vertx.core.DeploymentOptions -import io.vertx.core.Future -import io.vertx.core.Vertx -import io.vertx.core.VertxOptions -import io.vertx.core.json.JsonObject -import io.vertx.reactivex.core.AbstractVerticle -import io.vertx.reactivex.ext.web.Router - -import java.util.concurrent.CompletableFuture -import java.util.concurrent.TimeUnit - -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.PATH_PARAM -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS - -class VertxRxHttpServerTest extends HttpServerTest implements AgentTestTrait { - public static final String CONFIG_HTTP_SERVER_PORT = "http.server.port" - - @Override - Vertx startServer(int port) { - Vertx server = Vertx.vertx(new VertxOptions() - // Useful for debugging: - // .setBlockedThreadCheckInterval(Integer.MAX_VALUE) - .setClusterPort(port)) - CompletableFuture future = new CompletableFuture<>() - server.deployVerticle(verticle().getName(), - new DeploymentOptions() - .setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port)) - .setInstances(3)) { res -> - if (!res.succeeded()) { - throw new IllegalStateException("Cannot deploy server Verticle", res.cause()) - } - future.complete(null) - } - - future.get(30, TimeUnit.SECONDS) - return server - } - - @Override - void stopServer(Vertx server) { - server.close() - } - - @Override - boolean testPathParam() { - return true - } - - @Override - boolean verifyServerSpanEndTime() { - // server spans are ended inside of the controller spans - return false - } - - @Override - String expectedHttpRoute(ServerEndpoint endpoint, String method) { - if (method == HttpConstants._OTHER) { - return getContextPath() + endpoint.path - } - return super.expectedHttpRoute(endpoint, method) - } - - protected Class verticle() { - return VertxReactiveWebServer - } - - static class VertxReactiveWebServer extends AbstractVerticle { - - @Override - void start(final Future startFuture) { - int port = config().getInteger(CONFIG_HTTP_SERVER_PORT) - Router router = Router.router(super.@vertx) - - router.route(SUCCESS.path).handler { ctx -> - controller(SUCCESS) { - ctx.response().setStatusCode(SUCCESS.status).end(SUCCESS.body) - } - } - router.route(INDEXED_CHILD.path).handler { ctx -> - controller(INDEXED_CHILD) { - INDEXED_CHILD.collectSpanAttributes { ctx.request().params().get(it) } - ctx.response().setStatusCode(INDEXED_CHILD.status).end() - } - } - router.route(QUERY_PARAM.path).handler { ctx -> - controller(QUERY_PARAM) { - ctx.response().setStatusCode(QUERY_PARAM.status).end(ctx.request().query()) - } - } - router.route(REDIRECT.path).handler { ctx -> - controller(REDIRECT) { - ctx.response().setStatusCode(REDIRECT.status).putHeader("location", REDIRECT.body).end() - } - } - router.route(ERROR.path).handler { ctx -> - controller(ERROR) { - ctx.response().setStatusCode(ERROR.status).end(ERROR.body) - } - } - router.route(EXCEPTION.path).handler { ctx -> - controller(EXCEPTION) { - throw new Exception(EXCEPTION.body) - } - } - router.route("/path/:id/param").handler { ctx -> - controller(PATH_PARAM) { - ctx.response().setStatusCode(PATH_PARAM.status).end(ctx.request().getParam("id")) - } - } - router.route(CAPTURE_HEADERS.path).handler { ctx -> - controller(CAPTURE_HEADERS) { - ctx.response().setStatusCode(CAPTURE_HEADERS.status) - .putHeader("X-Test-Response", ctx.request().getHeader("X-Test-Request")) - .end(CAPTURE_HEADERS.body) - } - } - - super.@vertx.createHttpServer() - .requestHandler { router.accept(it) } - .listen(port) { startFuture.complete() } - } - } -} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/client/VertxRxCircuitBreakerSingleConnection.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java similarity index 89% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/client/VertxRxCircuitBreakerSingleConnection.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java index 005ef2cf46b6..f0ca03669631 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/client/VertxRxCircuitBreakerSingleConnection.java +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package client; +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; import io.vertx.core.AsyncResult; import io.vertx.reactivex.circuitbreaker.CircuitBreaker; @@ -12,7 +12,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; -public class VertxRxCircuitBreakerSingleConnection extends VertxRxSingleConnection { +class VertxRxCircuitBreakerSingleConnection extends VertxRxSingleConnection { private final CircuitBreaker breaker; public VertxRxCircuitBreakerSingleConnection(String host, int port, CircuitBreaker breaker) { diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java new file mode 100644 index 000000000000..60bb28c86b5e --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java @@ -0,0 +1,129 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; + +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.vertx.circuitbreaker.CircuitBreakerOptions; +import io.vertx.core.AsyncResult; +import io.vertx.core.VertxOptions; +import io.vertx.core.http.HttpMethod; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.reactivex.circuitbreaker.CircuitBreaker; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.core.buffer.Buffer; +import io.vertx.reactivex.ext.web.client.HttpRequest; +import io.vertx.reactivex.ext.web.client.HttpResponse; +import io.vertx.reactivex.ext.web.client.WebClient; +import java.net.URI; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxRxCircuitBreakerWebClientTest extends AbstractHttpClientTest> { + + @RegisterExtension + static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent(); + + private final Vertx vertx = Vertx.vertx(new VertxOptions()); + private final WebClient httpClient = buildClient(vertx); + private final CircuitBreaker breaker = + CircuitBreaker.create( + "my-circuit-breaker", + vertx, + new CircuitBreakerOptions() + .setTimeout(-1) // Disable the timeout otherwise it makes each test take this long. + ); + + private static WebClient buildClient(Vertx vertx) { + WebClientOptions clientOptions = + new WebClientOptions().setConnectTimeout(Math.toIntExact(CONNECTION_TIMEOUT.toMillis())); + return WebClient.create(vertx, clientOptions); + } + + @Override + public HttpRequest buildRequest(String method, URI uri, Map headers) { + HttpRequest request = httpClient.requestAbs(HttpMethod.valueOf(method), uri.toString()); + headers.forEach(request::putHeader); + return request; + } + + @Override + public int sendRequest( + HttpRequest request, String method, URI uri, Map headers) + throws ExecutionException, InterruptedException { + // VertxRx doesn't seem to provide a synchronous API at all for circuit breaker. Bridge through + // a callback. + CompletableFuture future = new CompletableFuture<>(); + sendRequestWithCallback( + request, + result -> { + if (result.succeeded()) { + future.complete(result.result().statusCode()); + } else { + future.completeExceptionally(result.cause()); + } + }); + + return future.get(); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + private void sendRequestWithCallback( + HttpRequest request, Consumer>> consumer) { + breaker.executeCommand( + command -> request.rxSend().subscribe(command::complete, command::fail), consumer::accept); + } + + @Override + public void sendRequestWithCallback( + HttpRequest request, + String method, + URI uri, + Map headers, + HttpClientResult requestResult) { + sendRequestWithCallback( + request, + result -> { + if (result.succeeded()) { + requestResult.complete(result.result().statusCode()); + } else { + requestResult.complete(result.cause()); + } + }); + } + + @Override + protected void configure(HttpClientTestOptions.Builder optionsBuilder) { + optionsBuilder.disableTestRedirects(); + optionsBuilder.disableTestHttps(); + optionsBuilder.disableTestReadTimeout(); + optionsBuilder.disableTestNonStandardHttpMethod(); + optionsBuilder.setHttpAttributes(VertxRxCircuitBreakerWebClientTest::getHttpAttributes); + optionsBuilder.setSingleConnectionFactory( + (host, port) -> new VertxRxCircuitBreakerSingleConnection(host, port, breaker)); + } + + private static Set> getHttpAttributes(URI uri) { + Set> attributes = new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES); + attributes.remove(NETWORK_PROTOCOL_VERSION); + attributes.remove(SERVER_ADDRESS); + attributes.remove(SERVER_PORT); + return attributes; + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/client/VertxRxSingleConnection.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java similarity index 93% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/client/VertxRxSingleConnection.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java index 8620eb2b18c0..c317b4e8ba0c 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/client/VertxRxSingleConnection.java +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package client; +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection; import io.vertx.core.VertxOptions; @@ -17,7 +17,7 @@ import java.util.Map; import java.util.Objects; -public class VertxRxSingleConnection implements SingleConnection { +class VertxRxSingleConnection implements SingleConnection { private final WebClient webClient; private final String host; private final int port; diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java new file mode 100644 index 000000000000..84ea81b89be5 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java @@ -0,0 +1,108 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; + +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.reactivex.functions.Consumer; +import io.vertx.core.VertxOptions; +import io.vertx.core.http.HttpMethod; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.core.buffer.Buffer; +import io.vertx.reactivex.ext.web.client.HttpRequest; +import io.vertx.reactivex.ext.web.client.HttpResponse; +import io.vertx.reactivex.ext.web.client.WebClient; +import java.net.URI; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxRxWebClientTest extends AbstractHttpClientTest> { + + @RegisterExtension + static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent(); + + private final WebClient httpClient = buildClient(); + + private static WebClient buildClient() { + Vertx vertx = Vertx.vertx(new VertxOptions()); + WebClientOptions clientOptions = + new WebClientOptions().setConnectTimeout(Math.toIntExact(CONNECTION_TIMEOUT.toMillis())); + return WebClient.create(vertx, clientOptions); + } + + @Override + public HttpRequest buildRequest(String method, URI uri, Map headers) { + HttpRequest request = httpClient.requestAbs(HttpMethod.valueOf(method), uri.toString()); + headers.forEach(request::putHeader); + return request; + } + + @Override + public int sendRequest( + HttpRequest request, String method, URI uri, Map headers) { + return request.rxSend().blockingGet().statusCode(); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @Override + public void sendRequestWithCallback( + HttpRequest request, + String method, + URI uri, + Map headers, + HttpClientResult requestResult) { + request + .rxSend() + .subscribe( + (Consumer>) + httpResponse -> requestResult.complete(httpResponse.statusCode()), + requestResult::complete); + } + + @Override + protected void configure(HttpClientTestOptions.Builder optionsBuilder) { + optionsBuilder.disableTestRedirects(); + optionsBuilder.disableTestHttps(); + optionsBuilder.disableTestReadTimeout(); + optionsBuilder.disableTestNonStandardHttpMethod(); + optionsBuilder.setHttpAttributes(VertxRxWebClientTest::getHttpAttributes); + optionsBuilder.setClientSpanErrorMapper(VertxRxWebClientTest::clientSpanError); + optionsBuilder.setSingleConnectionFactory(VertxRxSingleConnection::new); + } + + private static Set> getHttpAttributes(URI uri) { + Set> attributes = new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES); + attributes.remove(NETWORK_PROTOCOL_VERSION); + attributes.remove(SERVER_ADDRESS); + attributes.remove(SERVER_PORT); + return attributes; + } + + private static Throwable clientSpanError(URI uri, Throwable exception) { + if (exception.getClass() == RuntimeException.class) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + exception = exception.getCause(); + break; + default: + break; + } + } + return exception; + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java new file mode 100644 index 000000000000..b2b03f85bbfb --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java @@ -0,0 +1,108 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.PATH_PARAM; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; + +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; + +abstract class AbstractVertxRxVerticle extends AbstractVerticle { + + abstract void handle(RoutingContext ctx, ServerEndpoint endpoint, Runnable action); + + void configure(Router router) { + router + .route(SUCCESS.getPath()) + .handler( + ctx -> + handle( + ctx, + SUCCESS, + () -> + ctx.response().setStatusCode(SUCCESS.getStatus()).end(SUCCESS.getBody()))); + + router + .route(INDEXED_CHILD.getPath()) + .handler( + ctx -> + handle( + ctx, + INDEXED_CHILD, + () -> { + INDEXED_CHILD.collectSpanAttributes( + parameter -> ctx.request().params().get(parameter)); + ctx.response().setStatusCode(INDEXED_CHILD.getStatus()).end(); + })); + + router + .route(QUERY_PARAM.getPath()) + .handler( + ctx -> + handle( + ctx, + QUERY_PARAM, + () -> + ctx.response() + .setStatusCode(QUERY_PARAM.getStatus()) + .end(ctx.request().query()))); + + router + .route(REDIRECT.getPath()) + .handler( + ctx -> + handle( + ctx, + REDIRECT, + () -> + ctx.response() + .setStatusCode(REDIRECT.getStatus()) + .putHeader("location", REDIRECT.getBody()) + .end())); + + router + .route(ERROR.getPath()) + .handler( + ctx -> + handle( + ctx, + ERROR, + () -> ctx.response().setStatusCode(ERROR.getStatus()).end(ERROR.getBody()))); + + router + .route("/path/:id/param") + .handler( + ctx -> + handle( + ctx, + PATH_PARAM, + () -> + ctx.response() + .setStatusCode(PATH_PARAM.getStatus()) + .end(ctx.request().getParam("id")))); + + router + .route(CAPTURE_HEADERS.getPath()) + .handler( + ctx -> + handle( + ctx, + CAPTURE_HEADERS, + () -> + ctx.response() + .setStatusCode(CAPTURE_HEADERS.getStatus()) + .putHeader("X-Test-Response", ctx.request().getHeader("X-Test-Request")) + .end(CAPTURE_HEADERS.getBody()))); + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java new file mode 100644 index 000000000000..441120ded812 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java @@ -0,0 +1,246 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableDatabaseSemconv; +import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; +import static io.opentelemetry.javaagent.instrumentation.vertx.reactive.server.VertxReactiveWebServer.TEST_REQUEST_ID_ATTRIBUTE; +import static io.opentelemetry.javaagent.instrumentation.vertx.reactive.server.VertxReactiveWebServer.TEST_REQUEST_ID_PARAMETER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.ClientAttributes.CLIENT_ADDRESS; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_ROUTE; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PEER_ADDRESS; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PEER_PORT; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.UrlAttributes.URL_PATH; +import static io.opentelemetry.semconv.UrlAttributes.URL_QUERY; +import static io.opentelemetry.semconv.UrlAttributes.URL_SCHEME; +import static io.opentelemetry.semconv.UserAgentAttributes.USER_AGENT_ORIGINAL; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_CONNECTION_STRING; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_NAME; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_OPERATION; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SQL_TABLE; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_STATEMENT; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SYSTEM; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_USER; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.TraceAssert; +import io.opentelemetry.testing.internal.armeria.client.WebClient; +import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse; +import io.opentelemetry.testing.internal.armeria.common.HttpRequest; +import io.opentelemetry.testing.internal.armeria.common.HttpRequestBuilder; +import io.vertx.reactivex.core.Vertx; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxReactivePropagationTest { + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + private static WebClient client; + private static int port; + private static Vertx server; + + @BeforeAll + static void setUp() throws ExecutionException, InterruptedException, TimeoutException { + port = PortUtils.findOpenPort(); + server = VertxReactiveWebServer.start(port); + client = WebClient.of("h1c://localhost:" + port); + } + + @AfterAll + static void cleanUp() { + server.close(); + } + + // Verifies that context is correctly propagated and sql query span has correct parent. + // Tests io.opentelemetry.javaagent.instrumentation.vertx.reactive.VertxRxInstrumentation + @SuppressWarnings("deprecation") // uses deprecated db semconv + @Test + void contextPropagation() { + AggregatedHttpResponse response = client.get("/listProducts").aggregate().join(); + assertThat(response.status().code()).isEqualTo(SUCCESS.getStatus()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("GET /listProducts") + .hasKind(SpanKind.SERVER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + equalTo(NETWORK_PEER_ADDRESS, "127.0.0.1"), + satisfies(NETWORK_PEER_PORT, val -> val.isInstanceOf(Long.class)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, port), + equalTo(CLIENT_ADDRESS, "127.0.0.1"), + equalTo(URL_PATH, "/listProducts"), + equalTo(HTTP_REQUEST_METHOD, "GET"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(URL_SCHEME, "http"), + satisfies(USER_AGENT_ORIGINAL, val -> val.isInstanceOf(String.class)), + equalTo(HTTP_ROUTE, "/listProducts")), + span -> + span.hasName("handleListProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("listProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)), + span -> + span.hasName("SELECT test.products") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + equalTo(DB_SYSTEM, "hsqldb"), + equalTo(maybeStable(DB_NAME), "test"), + equalTo(DB_USER, emitStableDatabaseSemconv() ? null : "SA"), + equalTo( + DB_CONNECTION_STRING, + emitStableDatabaseSemconv() ? null : "hsqldb:mem:"), + equalTo( + maybeStable(DB_STATEMENT), + "SELECT id, name, price, weight FROM products"), + equalTo(maybeStable(DB_OPERATION), "SELECT"), + equalTo(maybeStable(DB_SQL_TABLE), "products")))); + } + + @SuppressWarnings("deprecation") // uses deprecated db semconv + @Test + void highConcurrency() { + int count = 100; + String baseUrl = "/listProducts"; + CountDownLatch latch = new CountDownLatch(1); + + ExecutorService pool = Executors.newFixedThreadPool(8); + cleanup.deferCleanup(pool::shutdownNow); + TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + TextMapSetter setter = + (carrier, name, value) -> carrier.header(name, value); + + for (int i = 0; i < count; i++) { + int index = i; + pool.submit( + () -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + testing.runWithSpan( + "client " + index, + () -> { + HttpRequestBuilder builder = + HttpRequest.builder() + .get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index); + Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index); + propagator.inject(Context.current(), builder, setter); + client.execute(builder.build()).aggregate().join(); + }); + }); + } + + latch.countDown(); + + List> assertions = new ArrayList<>(); + for (int i = 0; i < count; i++) { + assertions.add( + trace -> { + long requestId = + Long.parseLong(trace.getSpan(0).getName().substring("client ".length())); + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("client " + requestId) + .hasKind(SpanKind.INTERNAL) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("GET /listProducts") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + equalTo(NETWORK_PEER_ADDRESS, "127.0.0.1"), + satisfies(NETWORK_PEER_PORT, val -> val.isInstanceOf(Long.class)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, port), + equalTo(CLIENT_ADDRESS, "127.0.0.1"), + equalTo(URL_PATH, baseUrl), + equalTo(URL_QUERY, TEST_REQUEST_ID_PARAMETER + "=" + requestId), + equalTo(HTTP_REQUEST_METHOD, "GET"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(URL_SCHEME, "http"), + satisfies(USER_AGENT_ORIGINAL, val -> val.isInstanceOf(String.class)), + equalTo(HTTP_ROUTE, "/listProducts"), + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("handleListProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("listProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("SELECT test.products") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(3)) + .hasAttributesSatisfyingExactly( + equalTo(DB_SYSTEM, "hsqldb"), + equalTo(maybeStable(DB_NAME), "test"), + equalTo(DB_USER, emitStableDatabaseSemconv() ? null : "SA"), + equalTo( + DB_CONNECTION_STRING, + emitStableDatabaseSemconv() ? null : "hsqldb:mem:"), + equalTo( + maybeStable(DB_STATEMENT), + "SELECT id AS request" + + requestId + + ", name, price, weight FROM products"), + equalTo(maybeStable(DB_OPERATION), "SELECT"), + equalTo(maybeStable(DB_SQL_TABLE), "products"))); + }); + } + testing.waitAndAssertTraces(assertions); + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/VertxReactiveWebServer.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java similarity index 98% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/VertxReactiveWebServer.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java index 1a8124699053..66f8334bad61 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/VertxReactiveWebServer.java +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java @@ -3,6 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; import io.opentelemetry.api.GlobalOpenTelemetry; diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java new file mode 100644 index 000000000000..ee2b7ed1fc3a --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java @@ -0,0 +1,88 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION; + +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions; +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.vertx.circuitbreaker.CircuitBreakerOptions; +import io.vertx.core.Future; +import io.vertx.reactivex.circuitbreaker.CircuitBreaker; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; + +class VertxRxCircuitBreakerHttpServerTest extends VertxRxHttpServerTest { + + @Override + protected void configure(HttpServerTestOptions options) { + super.configure(options); + + options.setTestHttpPipelining(false); + options.setHasExceptionOnServerSpan(endpoint -> endpoint != EXCEPTION); + } + + @Override + protected Class verticle() { + return VertxRxCircuitBreakerWebTestServer.class; + } + + public static class VertxRxCircuitBreakerWebTestServer extends AbstractVertxRxVerticle { + CircuitBreaker breaker; + + @Override + void handle(RoutingContext ctx, ServerEndpoint endpoint, Runnable action) { + breaker.executeCommand( + future -> future.complete(endpoint), + result -> { + if (result.failed()) { + throw new IllegalStateException(result.cause()); + } + controller(endpoint, action::run); + }); + } + + @Override + public void start(Future startFuture) { + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT); + Router router = Router.router(vertx); + breaker = + CircuitBreaker.create( + "my-circuit-breaker", + vertx, + // Disable the timeout otherwise it makes each test take this long. + new CircuitBreakerOptions().setTimeout(-1)); + + configure(router); + router + .route(EXCEPTION.getPath()) + .handler( + ctx -> + breaker.executeCommand( + future -> future.fail(new IllegalStateException(EXCEPTION.getBody())), + result -> { + try { + Throwable cause = result.cause(); + controller( + EXCEPTION, + () -> { + throw cause; + }); + } catch (Throwable throwable) { + ctx.response() + .setStatusCode(EXCEPTION.getStatus()) + .end(throwable.getMessage()); + } + })); + + vertx + .createHttpServer() + .requestHandler(router::accept) + .listen(port, httpServerAsyncResult -> startFuture.complete()); + } + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java new file mode 100644 index 000000000000..287065c300dc --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java @@ -0,0 +1,113 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION; + +import io.opentelemetry.instrumentation.api.internal.HttpConstants; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions; +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.json.JsonObject; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxRxHttpServerTest extends AbstractHttpServerTest { + static final String CONFIG_HTTP_SERVER_PORT = "http.server.port"; + + @RegisterExtension + static final InstrumentationExtension testing = HttpServerInstrumentationExtension.forAgent(); + + @Override + protected Vertx setupServer() throws Exception { + Vertx server = + Vertx.vertx( + new VertxOptions() + // Useful for debugging: + // .setBlockedThreadCheckInterval(Integer.MAX_VALUE) + .setClusterPort(port)); + CompletableFuture future = new CompletableFuture<>(); + server.deployVerticle( + verticle().getName(), + new DeploymentOptions() + .setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port)) + .setInstances(3), + result -> { + if (!result.succeeded()) { + throw new IllegalStateException("Cannot deploy server Verticle", result.cause()); + } + future.complete(null); + }); + + future.get(30, TimeUnit.SECONDS); + return server; + } + + @Override + protected void stopServer(Vertx vertx) { + vertx.close(); + } + + @Override + protected void configure(HttpServerTestOptions options) { + super.configure(options); + + options.setTestPathParam(true); + // server spans are ended inside the controller spans + options.setVerifyServerSpanEndTime(false); + options.setExpectedHttpRoute( + (endpoint, method) -> { + if (HttpConstants._OTHER.equals(method)) { + return getContextPath() + endpoint.getPath(); + } + return expectedHttpRoute(endpoint, method); + }); + } + + protected Class verticle() { + return VertxReactiveWebServer.class; + } + + public static class VertxReactiveWebServer extends AbstractVertxRxVerticle { + @Override + void handle(RoutingContext ctx, ServerEndpoint endpoint, Runnable action) { + controller(endpoint, action::run); + } + + @Override + public void start(Future startFuture) { + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT); + Router router = Router.router(vertx); + + configure(router); + router + .route(EXCEPTION.getPath()) + .handler( + ctx -> + handle( + ctx, + EXCEPTION, + () -> { + throw new IllegalStateException(EXCEPTION.getBody()); + })); + + vertx + .createHttpServer() + .requestHandler(router::accept) + .listen(port, httpServerAsyncResult -> startFuture.complete()); + } + } +}