From e8524639843a0f0a9ae96de0ca2955168a1c9c5b Mon Sep 17 00:00:00 2001 From: Lionell Pack Date: Thu, 1 Feb 2024 16:57:17 +1100 Subject: [PATCH 1/2] Add timeout to delta chunk send operation. Add a metric for how long this operation takes so we can reduce the timeout once we have more data. --- .../optout/partner/OptOutPartnerEndpoint.java | 111 ++++++++++-------- .../uid2/optout/web/RetryingWebClient.java | 9 +- .../optout/web/RetryingWebClientTest.java | 46 ++++++++ 3 files changed, 115 insertions(+), 51 deletions(-) diff --git a/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java b/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java index c669ab7..7e798bc 100644 --- a/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java +++ b/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java @@ -5,9 +5,11 @@ import com.uid2.shared.Utils; import com.uid2.shared.optout.OptOutEntry; import com.uid2.shared.optout.OptOutUtils; +import io.micrometer.core.instrument.Metrics; import io.netty.handler.codec.http.HttpMethod; import io.vertx.core.Future; import io.vertx.core.Vertx; +import io.micrometer.core.instrument.Timer; import org.apache.http.client.utils.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,6 +18,7 @@ import java.net.URISyntaxException; import java.net.http.HttpRequest; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; public class OptOutPartnerEndpoint implements IOptOutPartnerEndpoint { @@ -30,9 +33,14 @@ public class OptOutPartnerEndpoint implements IOptOutPartnerEndpoint { private final EndpointConfig config; private final RetryingWebClient retryingClient; + private final Timer timer; public OptOutPartnerEndpoint(Vertx vertx, EndpointConfig config) { this.config = config; + this.timer = Timer.builder("uid2.optout.deltasend_successfulchunktime_ms") + .description("Timer for each HTTP connection that successfully transfers part of a delta to a partner") + .tag("remote_partner", this.name()) + .register(Metrics.globalRegistry); this.retryingClient = new RetryingWebClient(vertx, config.url(), config.method(), config.retryCount(), config.retryBackoffMs()); } @@ -43,58 +51,61 @@ public String name() { @Override public Future send(OptOutEntry entry) { + long startTimeMs = System.currentTimeMillis(); return this.retryingClient.send( - (URI uri, HttpMethod method) -> { - URIBuilder uriBuilder = new URIBuilder(uri); - - for (String queryParam : config.queryParams()) { - int indexOfEqualSign = queryParam.indexOf('='); - String paramName = queryParam.substring(0, indexOfEqualSign); - String paramValue = queryParam.substring(indexOfEqualSign + 1); - String replacedValue = replaceValueReferences(entry, paramValue); - - uriBuilder.addParameter(paramName, replacedValue); - } - - URI uriWithParams; - try { - uriWithParams = uriBuilder.build(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - - HttpRequest.Builder builder = HttpRequest.newBuilder() - .uri(uriWithParams) - .method(method.toString(), HttpRequest.BodyPublishers.noBody()); - - for (String additionalHeader : this.config.additionalHeaders()) { - int indexOfColonSign = additionalHeader.indexOf(':'); - String headerName = additionalHeader.substring(0, indexOfColonSign); - String headerValue = additionalHeader.substring(indexOfColonSign + 1); - String replacedValue = replaceValueReferences(entry, headerValue); - builder.header(headerName, replacedValue); - } - - LOGGER.info("replaying optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); - - return builder.build(); - }, - resp -> { - if (resp == null) { - throw new RuntimeException("response is null"); - } - - if (SUCCESS_STATUS_CODES.contains(resp.statusCode())) { - return true; - } - - LOGGER.info("received non-200 response: " + resp.statusCode() + "-" + resp.body() + " for optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); - if (RETRYABLE_STATUS_CODES.contains(resp.statusCode())) { - return false; - } else { - throw new UnexpectedStatusCodeException(resp.statusCode()); + (URI uri, HttpMethod method) -> { + URIBuilder uriBuilder = new URIBuilder(uri); + + for (String queryParam : config.queryParams()) { + int indexOfEqualSign = queryParam.indexOf('='); + String paramName = queryParam.substring(0, indexOfEqualSign); + String paramValue = queryParam.substring(indexOfEqualSign + 1); + String replacedValue = replaceValueReferences(entry, paramValue); + + uriBuilder.addParameter(paramName, replacedValue); + } + + URI uriWithParams; + try { + uriWithParams = uriBuilder.build(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + HttpRequest.Builder builder = HttpRequest.newBuilder() + .uri(uriWithParams) + .method(method.toString(), HttpRequest.BodyPublishers.noBody()); + + for (String additionalHeader : this.config.additionalHeaders()) { + int indexOfColonSign = additionalHeader.indexOf(':'); + String headerName = additionalHeader.substring(0, indexOfColonSign); + String headerValue = additionalHeader.substring(indexOfColonSign + 1); + String replacedValue = replaceValueReferences(entry, headerValue); + builder.header(headerName, replacedValue); + } + + LOGGER.info("replaying optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); + + return builder.build(); + }, + resp -> { + if (resp == null) { + throw new RuntimeException("response is null"); + } + + if (SUCCESS_STATUS_CODES.contains(resp.statusCode())) { + long finishTimeMs = System.currentTimeMillis(); + timer.record(finishTimeMs - startTimeMs, TimeUnit.MILLISECONDS); + return true; + } + + LOGGER.info("received non-200 response: " + resp.statusCode() + "-" + resp.body() + " for optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); + if (RETRYABLE_STATUS_CODES.contains(resp.statusCode())) { + return false; + } else { + throw new UnexpectedStatusCodeException(resp.statusCode()); + } } - } ); } diff --git a/src/main/java/com/uid2/optout/web/RetryingWebClient.java b/src/main/java/com/uid2/optout/web/RetryingWebClient.java index b69a3cf..d7dba0c 100644 --- a/src/main/java/com/uid2/optout/web/RetryingWebClient.java +++ b/src/main/java/com/uid2/optout/web/RetryingWebClient.java @@ -12,6 +12,7 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.BiFunction; @@ -19,15 +20,20 @@ public class RetryingWebClient { private static final Logger LOGGER = LoggerFactory.getLogger(RetryingWebClient.class); private final URI uri; private final HttpMethod method; + private final long resultTimeoutMs; private final int retryCount; private final int retryBackoffMs; private final HttpClient httpClient; private Vertx vertx; public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs) { + this(vertx, uri, method, retryCount, retryBackoffMs, 5*60*1000); + } + public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs, long resultTimeoutMs) { this.vertx = vertx; this.uri = URI.create(uri); this.method = method; + this.resultTimeoutMs = resultTimeoutMs; this.httpClient = HttpClient.newHttpClient(); this.retryCount = retryCount; @@ -42,7 +48,8 @@ public Future send(BiFunction requestCreator Promise promise = Promise.promise(); HttpRequest request = requestCreator.apply(this.uri, this.method); - CompletableFuture> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()); + CompletableFuture> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .orTimeout(this.resultTimeoutMs, TimeUnit.MILLISECONDS); asyncResponse.thenAccept(response -> { try { diff --git a/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java b/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java index 94c6695..aa87619 100644 --- a/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java +++ b/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java @@ -1,7 +1,11 @@ package com.uid2.optout.web; import io.netty.handler.codec.http.HttpMethod; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.After; @@ -11,8 +15,11 @@ import java.net.URI; import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.util.Random; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; @RunWith(VertxUnitRunner.class) public class RetryingWebClientTest { @@ -34,6 +41,13 @@ public void setup(TestContext ctx) { // pick a random code and respond with it int statusCode = Integer.valueOf(statusCodes[rand.nextInt(statusCodes.length)]); req.response().setStatusCode(statusCode).end(); + } else if (subPath.startsWith("delayed")) { + vertx.setTimer(1000, id -> { + try { + req.response().setStatusCode(200).end(); + } + catch (Exception ex) {} + }); } else { int statusCode = Integer.valueOf(subPath); req.response().setStatusCode(statusCode).end(); @@ -175,4 +189,36 @@ private void expectImmediateFailure_withNonRetryErrors(TestContext ctx, HttpMeth })); } } + + public Function assertStatusCodeFactory(TestContext ctx, int code) { + return result -> { + ctx.assertEquals(code, result.statusCode()); + return code == result.statusCode(); + }; + } + public Handler> ensureAsyncExceptionFactory(TestContext ctx, Class exceptionClass) { + return ctx.asyncAssertFailure(cause -> { + ctx.assertTrue(cause.getClass() == exceptionClass, "Expected a " + exceptionClass.toString() + " but got a " + cause); + }); + } + + @Test + public void longRequest_longerTimeout_expectSuccess(TestContext ctx) { + testDelayedResponse(ctx, assertStatusCodeFactory(ctx, 200), 1500) + .onComplete(ctx.asyncAssertSuccess()); + } + + @Test + public void longRequest_shorterTimeout_expectFailure(TestContext ctx) { + testDelayedResponse(ctx, req -> true, 500) + .onComplete(ensureAsyncExceptionFactory(ctx, TimeoutException.class)); + } + + private Future testDelayedResponse(TestContext ctx, Function assertion, int resultTimeoutMs) { + Async async = ctx.async(); + + RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/delayed", HttpMethod.GET, 0, 0, resultTimeoutMs); + return c.send((URI uri, HttpMethod method) -> HttpRequest.newBuilder().uri(uri).method(method.toString(), HttpRequest.BodyPublishers.noBody()).build(), assertion) + .andThen(r -> async.complete()); + } } From 32659583c432453bc44b7b2f45e2a1b29a56b929 Mon Sep 17 00:00:00 2001 From: Lionell Pack Date: Thu, 1 Feb 2024 16:59:30 +1100 Subject: [PATCH 2/2] Make PR easier to read with a whitespace tweak. --- .../optout/partner/OptOutPartnerEndpoint.java | 104 +++++++++--------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java b/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java index 7e798bc..17f5eef 100644 --- a/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java +++ b/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java @@ -53,59 +53,59 @@ public String name() { public Future send(OptOutEntry entry) { long startTimeMs = System.currentTimeMillis(); return this.retryingClient.send( - (URI uri, HttpMethod method) -> { - URIBuilder uriBuilder = new URIBuilder(uri); - - for (String queryParam : config.queryParams()) { - int indexOfEqualSign = queryParam.indexOf('='); - String paramName = queryParam.substring(0, indexOfEqualSign); - String paramValue = queryParam.substring(indexOfEqualSign + 1); - String replacedValue = replaceValueReferences(entry, paramValue); - - uriBuilder.addParameter(paramName, replacedValue); - } - - URI uriWithParams; - try { - uriWithParams = uriBuilder.build(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - - HttpRequest.Builder builder = HttpRequest.newBuilder() - .uri(uriWithParams) - .method(method.toString(), HttpRequest.BodyPublishers.noBody()); - - for (String additionalHeader : this.config.additionalHeaders()) { - int indexOfColonSign = additionalHeader.indexOf(':'); - String headerName = additionalHeader.substring(0, indexOfColonSign); - String headerValue = additionalHeader.substring(indexOfColonSign + 1); - String replacedValue = replaceValueReferences(entry, headerValue); - builder.header(headerName, replacedValue); - } - - LOGGER.info("replaying optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); - - return builder.build(); - }, - resp -> { - if (resp == null) { - throw new RuntimeException("response is null"); - } - - if (SUCCESS_STATUS_CODES.contains(resp.statusCode())) { - long finishTimeMs = System.currentTimeMillis(); - timer.record(finishTimeMs - startTimeMs, TimeUnit.MILLISECONDS); - return true; - } - - LOGGER.info("received non-200 response: " + resp.statusCode() + "-" + resp.body() + " for optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); - if (RETRYABLE_STATUS_CODES.contains(resp.statusCode())) { - return false; - } else { - throw new UnexpectedStatusCodeException(resp.statusCode()); - } + (URI uri, HttpMethod method) -> { + URIBuilder uriBuilder = new URIBuilder(uri); + + for (String queryParam : config.queryParams()) { + int indexOfEqualSign = queryParam.indexOf('='); + String paramName = queryParam.substring(0, indexOfEqualSign); + String paramValue = queryParam.substring(indexOfEqualSign + 1); + String replacedValue = replaceValueReferences(entry, paramValue); + + uriBuilder.addParameter(paramName, replacedValue); + } + + URI uriWithParams; + try { + uriWithParams = uriBuilder.build(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + HttpRequest.Builder builder = HttpRequest.newBuilder() + .uri(uriWithParams) + .method(method.toString(), HttpRequest.BodyPublishers.noBody()); + + for (String additionalHeader : this.config.additionalHeaders()) { + int indexOfColonSign = additionalHeader.indexOf(':'); + String headerName = additionalHeader.substring(0, indexOfColonSign); + String headerValue = additionalHeader.substring(indexOfColonSign + 1); + String replacedValue = replaceValueReferences(entry, headerValue); + builder.header(headerName, replacedValue); + } + + LOGGER.info("replaying optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); + + return builder.build(); + }, + resp -> { + if (resp == null) { + throw new RuntimeException("response is null"); + } + + if (SUCCESS_STATUS_CODES.contains(resp.statusCode())) { + long finishTimeMs = System.currentTimeMillis(); + timer.record(finishTimeMs - startTimeMs, TimeUnit.MILLISECONDS); + return true; + } + + LOGGER.info("received non-200 response: " + resp.statusCode() + "-" + resp.body() + " for optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); + if (RETRYABLE_STATUS_CODES.contains(resp.statusCode())) { + return false; + } else { + throw new UnexpectedStatusCodeException(resp.statusCode()); } + } ); }