diff --git a/common/src/main/java/tech/ydb/common/retry/ErrorPolicy.java b/common/src/main/java/tech/ydb/common/retry/ErrorPolicy.java deleted file mode 100644 index daaad9a97..000000000 --- a/common/src/main/java/tech/ydb/common/retry/ErrorPolicy.java +++ /dev/null @@ -1,30 +0,0 @@ -package tech.ydb.common.retry; - -/** - * Recipes should use the configured error policy to decide how to retry - * errors like unsuccessful {@link tech.ydb.core.StatusCode}. - * - * @author Aleksandr Gorshenin - * @param Type of errors to check - */ -public interface ErrorPolicy { - - /** - * Returns true if the given value should be retried - * - * @param value value to check - * @return true if value is retryable - */ - boolean isRetryable(T value); - - /** - * Returns true if the given exception should be retried - * Usually exceptions are never retried, but some policies can implement more difficult logic - * - * @param ex exception to check - * @return true if exception is retryable - */ - default boolean isRetryable(Exception ex) { - return false; - } -} diff --git a/common/src/main/java/tech/ydb/common/retry/ExponentialBackoffRetry.java b/common/src/main/java/tech/ydb/common/retry/ExponentialBackoffRetry.java index e6d14ba47..392783efd 100644 --- a/common/src/main/java/tech/ydb/common/retry/ExponentialBackoffRetry.java +++ b/common/src/main/java/tech/ydb/common/retry/ExponentialBackoffRetry.java @@ -7,11 +7,11 @@ * * @author Aleksandr Gorshenin */ -public abstract class ExponentialBackoffRetry implements RetryPolicy { +public class ExponentialBackoffRetry implements RetryPolicy { private final long backoffMs; private final int backoffCeiling; - protected ExponentialBackoffRetry(long backoffMs, int backoffCeiling) { + public ExponentialBackoffRetry(long backoffMs, int backoffCeiling) { this.backoffMs = backoffMs; this.backoffCeiling = backoffCeiling; } @@ -22,6 +22,11 @@ protected long backoffTimeMillis(int retryNumber) { return delay + 
ThreadLocalRandom.current().nextLong(delay); } + @Override + public long nextRetryMs(int retryCount, long elapsedTimeMs) { + return backoffTimeMillis(retryCount); + } + /** * Return current base of backoff delays * @return backoff base duration in milliseconds @@ -37,4 +42,5 @@ public long getBackoffMillis() { public int getBackoffCeiling() { return backoffCeiling; } + } diff --git a/common/src/main/java/tech/ydb/common/retry/RetryConfig.java b/common/src/main/java/tech/ydb/common/retry/RetryConfig.java new file mode 100644 index 000000000..bb80fc8c0 --- /dev/null +++ b/common/src/main/java/tech/ydb/common/retry/RetryConfig.java @@ -0,0 +1,104 @@ +package tech.ydb.common.retry; + + +import tech.ydb.core.Status; +import tech.ydb.core.UnexpectedResultException; + +/** + * Recipes should use the retry configuration to decide how to retry + * errors like unsuccessful {@link tech.ydb.core.Status}. + * + * @author Aleksandr Gorshenin + */ +@FunctionalInterface +public interface RetryConfig { + + /** + * Returns retry policy for the given {@link Status} and {@code null} if that status is not retryable + * + * @param status status to check + * @return policy of retries or {@code null} if the status is not retryable + */ + RetryPolicy getStatusRetryPolicy(Status status); + + /** + * Returns retry policy for the given exception and {@code null} if that exception is not retryable + * + * @param th exception to check + * @return policy of retries or {@code null} if the exception is not retryable + */ + default RetryPolicy getThrowableRetryPolicy(Throwable th) { + for (Throwable ex = th; ex != null; ex = ex.getCause()) { + if (ex instanceof UnexpectedResultException) { + return getStatusRetryPolicy(((UnexpectedResultException) ex).getStatus()); + } + } + return null; + } + + /** + * Infinity retries with default exponential delay.
This policy does not retry conditionally + * retryable errors, so it can be used for both idempotent and non-idempotent operations + * + * @return retry configuration object + */ + static RetryConfig retryForever() { + return newConfig().retryForever(); + } + + /** + * Retries until the specified elapsed milliseconds expire.
This policy does not retry + * conditionally retryable errors, so it can be used for both idempotent and non-idempotent operations + * @param maxElapsedMs maximum timeout for retries + * @return retry configuration object + */ + static RetryConfig retryUntilElapsed(long maxElapsedMs) { + return newConfig().retryUntilElapsed(maxElapsedMs); + } + + /** + * Infinite retries with the default exponential delay.
This policy also retries conditionally + * retryable errors, so it can be used ONLY for idempotent operations + * @return retry configuration object + */ + static RetryConfig idempotentRetryForever() { + return newConfig().retryConditionallyRetryableErrors(true).retryForever(); + } + + /** + * Retries until the specified elapsed milliseconds expire.
This policy also retries + * conditionally retryable errors, so it can be used ONLY for idempotent operations + * @param maxElapsedMs maximum timeout for retries + * @return retry configuration object + */ + static RetryConfig idempotentRetryUntilElapsed(long maxElapsedMs) { + return newConfig().retryConditionallyRetryableErrors(true).retryUntilElapsed(maxElapsedMs); + } + + /** + * Configuration with retries disabled. Any error is considered non-retryable + * @return retry configuration object + */ + static RetryConfig noRetries() { + return (Status status) -> null; + } + + /** + * Create a new custom configuration of retries + * @return retry configuration builder + */ + static Builder newConfig() { + return new YdbRetryBuilder(); + } + + interface Builder { + Builder retryConditionallyRetryableErrors(boolean retry); + Builder retryNotFound(boolean retry); + Builder withSlowBackoff(long backoff, int ceiling); + Builder withFastBackoff(long backoff, int ceiling); + + RetryConfig retryForever(); + RetryConfig retryNTimes(int maxRetries); + RetryConfig retryUntilElapsed(long maxElapsedMs); + } +} diff --git a/common/src/main/java/tech/ydb/common/retry/RetryPolicy.java b/common/src/main/java/tech/ydb/common/retry/RetryPolicy.java index 83a3010e2..9095482a2 100644 --- a/common/src/main/java/tech/ydb/common/retry/RetryPolicy.java +++ b/common/src/main/java/tech/ydb/common/retry/RetryPolicy.java @@ -5,6 +5,7 @@ * * @author Aleksandr Gorshenin */ +@FunctionalInterface public interface RetryPolicy { /** * Called when an operation is failed for some reason to determine if it should be retried. 
diff --git a/common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java b/common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java new file mode 100644 index 000000000..dffedf382 --- /dev/null +++ b/common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java @@ -0,0 +1,69 @@ +package tech.ydb.common.retry; + +/** + * + * @author Aleksandr Gorshenin + */ +class YdbRetryBuilder implements RetryConfig.Builder { + private boolean idempotent = false; + private boolean retryNotFound = false; + + private long fastBackoff = 5; + private int fastCeiling = 10; + + private long slowBackoff = 500; + private int slowCeiling = 6; + + @Override + public YdbRetryBuilder retryConditionallyRetryableErrors(boolean retry) { + this.idempotent = retry; + return this; + } + + @Override + public YdbRetryBuilder retryNotFound(boolean retry) { + this.retryNotFound = retry; + return this; + } + + @Override + public YdbRetryBuilder withSlowBackoff(long backoff, int ceiling) { + this.slowBackoff = backoff; + this.slowCeiling = ceiling; + return this; + } + + @Override + public YdbRetryBuilder withFastBackoff(long backoff, int ceiling) { + this.fastBackoff = backoff; + this.fastCeiling = ceiling; + return this; + } + + @Override + public RetryConfig retryForever() { + return new YdbRetryConfig(idempotent, retryNotFound, + (int retryCount, long elapsedTimeMs) -> 0, + new ExponentialBackoffRetry(fastBackoff, fastCeiling), + new ExponentialBackoffRetry(slowBackoff, slowCeiling) + ); + } + + @Override + public RetryConfig retryNTimes(int maxRetries) { + return new YdbRetryConfig(idempotent, retryNotFound, + (int retryCount, long elapsedTimeMs) -> retryCount >= maxRetries ? 
-1 : 0, + new RetryNTimes(maxRetries, fastBackoff, fastCeiling), + new RetryNTimes(maxRetries, slowBackoff, slowCeiling) + ); + } + + @Override + public RetryConfig retryUntilElapsed(long maxElapsedMs) { + return new YdbRetryConfig(idempotent, retryNotFound, + (int retryCount, long elapsedTimeMs) -> elapsedTimeMs > maxElapsedMs ? -1 : 0, + new RetryUntilElapsed(maxElapsedMs, fastBackoff, fastCeiling), + new RetryUntilElapsed(maxElapsedMs, slowBackoff, slowCeiling) + ); + } +} diff --git a/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java b/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java new file mode 100644 index 000000000..754a2e32b --- /dev/null +++ b/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java @@ -0,0 +1,62 @@ +package tech.ydb.common.retry; + +import tech.ydb.core.Status; + +/** + * + * @author Aleksandr Gorshenin + */ +class YdbRetryConfig implements RetryConfig { + private final boolean retryConditionally; + private final boolean retryNotFound; + private final RetryPolicy immediatelly; + private final RetryPolicy fast; + private final RetryPolicy slow; + + YdbRetryConfig(boolean conditionally, boolean notFound, RetryPolicy instant, RetryPolicy fast, RetryPolicy slow) { + this.retryConditionally = conditionally; + this.retryNotFound = notFound; + this.immediatelly = instant; + this.fast = fast; + this.slow = slow; + } + + @Override + public RetryPolicy getStatusRetryPolicy(Status status) { + if (status == null) { + return null; + } + + switch (status.getCode()) { + // Instant retry + case BAD_SESSION: + case SESSION_BUSY: + return immediatelly; + + // Fast backoff + case ABORTED: + case UNDETERMINED: + return fast; + + // Slow backoff + case OVERLOADED: + case CLIENT_RESOURCE_EXHAUSTED: + return slow; + + // Conditionally retryable statuses + case CLIENT_CANCELLED: + case CLIENT_INTERNAL_ERROR: + case TRANSPORT_UNAVAILABLE: + case UNAVAILABLE: + return retryConditionally ? 
fast : null; + + // Not found has special flag for retries + case NOT_FOUND: + return retryNotFound ? fast : null; + + // All other codes are not retryable + default: + return null; + } + } +} diff --git a/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java b/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java new file mode 100644 index 000000000..93774eef6 --- /dev/null +++ b/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java @@ -0,0 +1,198 @@ +package tech.ydb.common.retry; + +import org.junit.Assert; +import org.junit.Test; + +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; + +/** + * + * @author Aleksandr Gorshenin + */ +public class RetryConfigTest { + private void assertDuration(long from, long to, long ms) { + Assert.assertTrue("time " + ms + " must be great than " + from, from <= ms); + Assert.assertTrue("time " + ms + " must be lower than " + to, to >= ms); + } + + @Test + public void nullStatusesTest() { + RetryConfig config = RetryConfig.retryForever(); + + Assert.assertNull(config.getThrowableRetryPolicy(null)); + Assert.assertNull(config.getStatusRetryPolicy(null)); + } + + @Test + public void throwableRetriesTest() { + RetryConfig config = RetryConfig.retryUntilElapsed(1000); + + Assert.assertNull(config.getThrowableRetryPolicy(new RuntimeException("test message"))); + Assert.assertNull(config.getThrowableRetryPolicy(new Exception("1", new RuntimeException("2")))); + + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.NOT_FOUND)); + + Assert.assertEquals(immediatelly, config.getThrowableRetryPolicy( + new UnexpectedResultException("base", Status.of(StatusCode.BAD_SESSION))) + ); + Assert.assertEquals(immediatelly, config.getThrowableRetryPolicy(new Exception("base", + new UnexpectedResultException("cause", Status.of(StatusCode.SESSION_BUSY))) 
+ )); + Assert.assertEquals(fast, config.getThrowableRetryPolicy(new Exception("base", + new UnexpectedResultException("cause", Status.of(StatusCode.NOT_FOUND))) + )); + } + + @Test + public void noRetryPolicyTest() { + RetryConfig config = RetryConfig.noRetries(); + // unretrayable + for (StatusCode code: StatusCode.values()) { + Assert.assertNull(config.getStatusRetryPolicy(Status.of(code))); + } + } + + @Test + public void nonIdempotentRetryPolicyTest() { + RetryConfig config = RetryConfig.retryForever(); + + // unretrayable + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.SCHEME_ERROR))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.ALREADY_EXISTS))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.UNAUTHORIZED))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.UNAVAILABLE))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.TRANSPORT_UNAVAILABLE))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_CANCELLED))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_INTERNAL_ERROR))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.NOT_FOUND))); + + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); + Assert.assertNotNull(immediatelly); + Assert.assertEquals(immediatelly, config.getStatusRetryPolicy(Status.of(StatusCode.SESSION_BUSY))); + + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); + Assert.assertNotNull(fast); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.UNDETERMINED))); + + RetryPolicy slow = config.getStatusRetryPolicy(Status.of(StatusCode.OVERLOADED)); + Assert.assertNotNull(slow); + Assert.assertEquals(slow, config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_RESOURCE_EXHAUSTED))); + } + + @Test + public void idempotentRetryPolicyTest() { + RetryConfig config = 
RetryConfig.idempotentRetryForever(); + + // unretrayable + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.SCHEME_ERROR))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.ALREADY_EXISTS))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.UNAUTHORIZED))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.NOT_FOUND))); + + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); + Assert.assertNotNull(immediatelly); + Assert.assertEquals(immediatelly, config.getStatusRetryPolicy(Status.of(StatusCode.SESSION_BUSY))); + + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); + Assert.assertNotNull(fast); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.UNDETERMINED))); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.UNAVAILABLE))); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.TRANSPORT_UNAVAILABLE))); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_CANCELLED))); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_INTERNAL_ERROR))); + + RetryPolicy slow = config.getStatusRetryPolicy(Status.of(StatusCode.OVERLOADED)); + Assert.assertNotNull(slow); + Assert.assertEquals(slow, config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_RESOURCE_EXHAUSTED))); + } + + @Test + public void notFoundRetryPolicyTest() { + RetryConfig config = RetryConfig.newConfig().retryNotFound(true).retryForever(); + + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.NOT_FOUND))); + } + + @Test + public void foreverRetryTest() { + RetryConfig config = RetryConfig.newConfig().withSlowBackoff(100, 5).withFastBackoff(10, 10).retryForever(); + + RetryPolicy immediatelly = 
config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, Integer.MAX_VALUE)); + Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); + + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); + assertDuration(10, 20, fast.nextRetryMs(0, 0)); + assertDuration(10, 20, fast.nextRetryMs(0, Integer.MAX_VALUE)); + assertDuration(10240, 20480, fast.nextRetryMs(Integer.MAX_VALUE, 0)); + assertDuration(10240, 20480, fast.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); + + RetryPolicy slow = config.getStatusRetryPolicy(Status.of(StatusCode.OVERLOADED)); + assertDuration(100, 200, slow.nextRetryMs(0, 0)); + assertDuration(100, 200, slow.nextRetryMs(0, Integer.MAX_VALUE)); + assertDuration(3200, 6400, slow.nextRetryMs(Integer.MAX_VALUE, 0)); + assertDuration(3200, 6400, slow.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); + } + + @Test + public void untilElapsedRetryTest() { + RetryConfig config = RetryConfig.idempotentRetryUntilElapsed(5000); + + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, 5000)); + Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, 5000)); + Assert.assertEquals(-1, immediatelly.nextRetryMs(0, 5001)); + Assert.assertEquals(-1, immediatelly.nextRetryMs(Integer.MAX_VALUE, 5001)); + + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); + assertDuration(5, 10, fast.nextRetryMs(0, 0)); + Assert.assertEquals(3, fast.nextRetryMs(0, 4997)); + Assert.assertEquals(5000, fast.nextRetryMs(Integer.MAX_VALUE, 0)); + Assert.assertEquals(1, 
fast.nextRetryMs(Integer.MAX_VALUE, 4999)); + Assert.assertEquals(-1, fast.nextRetryMs(Integer.MAX_VALUE, 5000)); + + RetryPolicy slow = config.getStatusRetryPolicy(Status.of(StatusCode.OVERLOADED)); + assertDuration(500, 1000, slow.nextRetryMs(0, 0)); + Assert.assertEquals(3, slow.nextRetryMs(0, 4997)); + Assert.assertEquals(5000, slow.nextRetryMs(Integer.MAX_VALUE, 0)); + Assert.assertEquals(1, slow.nextRetryMs(Integer.MAX_VALUE, 4999)); + Assert.assertEquals(-1, slow.nextRetryMs(Integer.MAX_VALUE, 5000)); + } + + @Test + public void nTimesRetryTest() { + RetryConfig config = RetryConfig.newConfig().retryNTimes(8); + + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, Integer.MAX_VALUE)); + Assert.assertEquals(0, immediatelly.nextRetryMs(7, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(7, Integer.MAX_VALUE)); + Assert.assertEquals(-1, immediatelly.nextRetryMs(8, 0)); + Assert.assertEquals(-1, immediatelly.nextRetryMs(8, Integer.MAX_VALUE)); + + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); + assertDuration(5, 10, fast.nextRetryMs(0, 0)); + assertDuration(5, 10, fast.nextRetryMs(0, Integer.MAX_VALUE)); + assertDuration(5 * 128, 5 * 256, fast.nextRetryMs(7, 0)); + assertDuration(5 * 128, 5 * 256, fast.nextRetryMs(7, Integer.MAX_VALUE)); + Assert.assertEquals(-1, fast.nextRetryMs(8, 0)); + Assert.assertEquals(-1, fast.nextRetryMs(8, Integer.MAX_VALUE)); + + RetryPolicy slow = config.getStatusRetryPolicy(Status.of(StatusCode.OVERLOADED)); + assertDuration(500, 1000, slow.nextRetryMs(0, 0)); + assertDuration(500, 1000, slow.nextRetryMs(0, Integer.MAX_VALUE)); + assertDuration(500 * 64, 500 * 128, slow.nextRetryMs(7, 0)); + assertDuration(500 * 64, 500 * 128, slow.nextRetryMs(7, Integer.MAX_VALUE)); + Assert.assertEquals(-1, slow.nextRetryMs(8, 0)); + Assert.assertEquals(-1, 
slow.nextRetryMs(8, Integer.MAX_VALUE)); + } +} diff --git a/common/src/test/java/tech/ydb/common/retry/RetryPoliciesTest.java b/common/src/test/java/tech/ydb/common/retry/RetryPoliciesTest.java index bf5baa0a8..cf127a259 100644 --- a/common/src/test/java/tech/ydb/common/retry/RetryPoliciesTest.java +++ b/common/src/test/java/tech/ydb/common/retry/RetryPoliciesTest.java @@ -1,9 +1,5 @@ package tech.ydb.common.retry; -import tech.ydb.common.retry.RetryNTimes; -import tech.ydb.common.retry.RetryUntilElapsed; -import tech.ydb.common.retry.RetryForever; - import org.junit.Assert; import org.junit.Test; @@ -99,6 +95,22 @@ public void untilElapsedTest() { Assert.assertEquals(-1, policy.nextRetryMs(7, 2500)); } + @Test + public void foreverElapsedTest() { + ExponentialBackoffRetry policy = new ExponentialBackoffRetry(50, 3); + + assertDuration(50, 100, policy.nextRetryMs(0, 0)); + assertDuration(50, 100, policy.nextRetryMs(0, Integer.MAX_VALUE)); + assertDuration(100, 200, policy.nextRetryMs(1, 75)); + assertDuration(200, 400, policy.nextRetryMs(2, 225)); + assertDuration(400, 800, policy.nextRetryMs(3, 525)); + assertDuration(400, 800, policy.nextRetryMs(4, 1125)); + assertDuration(400, 800, policy.nextRetryMs(5, 1725)); + assertDuration(400, 800, policy.nextRetryMs(Integer.MAX_VALUE, 0)); + assertDuration(400, 800, policy.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); + } + + @Test public void updateElapsedTest() { RetryUntilElapsed policy = new RetryUntilElapsed(2500, 50, 3); diff --git a/common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java b/common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java new file mode 100644 index 000000000..37cd1a1dd --- /dev/null +++ b/common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java @@ -0,0 +1,46 @@ +package tech.ydb.common.transaction.impl; + +import org.junit.Assert; +import org.junit.Test; + +import tech.ydb.common.transaction.TxMode; + +/** + * 
+ * @author Aleksandr Gorshenin + */ +public class YdbTransactionImplTest { + private class MockTx extends YdbTransactionImpl { + + public MockTx(TxMode txMode, String txId) { + super(txMode, txId); + } + + @Override + public String getSessionId() { + return "MOCK"; + } + } + + @Test + public void baseTest() { + MockTx tx = new MockTx(TxMode.SNAPSHOT_RO, "test-id"); + + Assert.assertEquals("test-id", tx.getId()); + Assert.assertEquals("MOCK", tx.getSessionId()); + Assert.assertEquals(TxMode.SNAPSHOT_RO, tx.getTxMode()); + Assert.assertTrue(tx.isActive()); + Assert.assertFalse(tx.getStatusFuture().isDone()); + } + + @Test + public void nullTest() { + MockTx tx = new MockTx(TxMode.NONE, null); + + Assert.assertNull(tx.getId()); + Assert.assertEquals("MOCK", tx.getSessionId()); + Assert.assertEquals(TxMode.NONE, tx.getTxMode()); + Assert.assertFalse(tx.isActive()); + Assert.assertFalse(tx.getStatusFuture().isDone()); + } +} diff --git a/topic/pom.xml b/topic/pom.xml index 98652283c..5c6acd117 100644 --- a/topic/pom.xml +++ b/topic/pom.xml @@ -40,14 +40,21 @@ zstd-jni 1.5.2-5 + junit junit test + + org.mockito + mockito-core + test + tech.ydb.test ydb-junit4-support + test org.apache.logging.log4j @@ -55,4 +62,26 @@ test + + + + jdk8-build + + 1.8 + + + + + 4.11.0 + + + + + org.mockito + mockito-inline + ${mockito.version} + + + + diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java index d57352869..7cf1e980d 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java @@ -4,25 +4,34 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import 
java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; import org.slf4j.Logger; +import tech.ydb.common.retry.ExponentialBackoffRetry; +import tech.ydb.common.retry.RetryConfig; +import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; /** * @author Nikolay Perfilov */ public abstract class GrpcStreamRetrier { - // TODO: add retry policy - private static final int MAX_RECONNECT_COUNT = 0; // Inf - private static final int EXP_BACKOFF_BASE_MS = 256; - private static final int EXP_BACKOFF_CEILING_MS = 40000; // 40 sec (max delays would be 40-80 sec) - private static final int EXP_BACKOFF_MAX_POWER = 7; + public static final RetryConfig RETRY_ALL = new RetryConfig() { + @Override + public RetryPolicy getStatusRetryPolicy(Status status) { + return RETRY_ALL_POLICY; + } + + @Override + public RetryPolicy getThrowableRetryPolicy(Throwable th) { + return RETRY_ALL_POLICY; + } + }; + + private static final RetryPolicy RETRY_ALL_POLICY = new ExponentialBackoffRetry(256, 7); + private static final int ID_LENGTH = 6; private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890" .toCharArray(); @@ -30,18 +39,20 @@ public abstract class GrpcStreamRetrier { protected final String id; protected final AtomicBoolean isReconnecting = new AtomicBoolean(false); protected final AtomicBoolean isStopped = new AtomicBoolean(false); - protected final AtomicInteger reconnectCounter = new AtomicInteger(0); + private final Logger logger; private final ScheduledExecutorService scheduler; - private final BiConsumer errorsHandler; + private final RetryConfig retryConfig; + private volatile int retryCount; + private volatile long retryStartedAt; - protected GrpcStreamRetrier(ScheduledExecutorService scheduler, BiConsumer errorsHandler) { + protected GrpcStreamRetrier(Logger logger, RetryConfig retryConfig, ScheduledExecutorService scheduler) { + this.logger = logger; + this.retryConfig = retryConfig; 
this.scheduler = scheduler; this.id = generateRandomId(ID_LENGTH); - this.errorsHandler = errorsHandler; } - protected abstract Logger getLogger(); protected abstract String getStreamName(); protected abstract void onStreamReconnect(); protected abstract void onShutdown(String reason); @@ -54,47 +65,33 @@ protected static String generateRandomId(int length) { .toString(); } - private void tryScheduleReconnect() { - int currentReconnectCounter = reconnectCounter.get() + 1; - if (MAX_RECONNECT_COUNT > 0 && currentReconnectCounter > MAX_RECONNECT_COUNT) { - if (isStopped.compareAndSet(false, true)) { - String errorMessage = "[" + id + "] Maximum retry count (" + MAX_RECONNECT_COUNT - + ") exceeded. Shutting down " + getStreamName(); - getLogger().error(errorMessage); - shutdownImpl(errorMessage); - return; - } else { - getLogger().info("[{}] Maximum retry count ({}}) exceeded. Need to shutdown {} but it's already " + - "shut down.", id, MAX_RECONNECT_COUNT, getStreamName()); - } - } - if (isReconnecting.compareAndSet(false, true)) { - reconnectCounter.set(currentReconnectCounter); - int delayMs = currentReconnectCounter <= EXP_BACKOFF_MAX_POWER - ? EXP_BACKOFF_BASE_MS * (1 << currentReconnectCounter) - : EXP_BACKOFF_CEILING_MS; - // Add jitter - delayMs = delayMs + ThreadLocalRandom.current().nextInt(delayMs); - getLogger().warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, currentReconnectCounter, - getStreamName(), delayMs); - try { - scheduler.schedule(this::reconnect, delayMs, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException exception) { - String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. 
" + - "Shutting down " + getStreamName(); - getLogger().error(errorMessage); - shutdownImpl(errorMessage); - } - } else { - getLogger().info("[{}] should reconnect {} stream, but reconnect is already in progress", id, + private void tryScheduleReconnect(long delay) { + if (!isReconnecting.compareAndSet(false, true)) { + logger.info("[{}] should reconnect {} stream, but reconnect is already in progress", id, getStreamName()); + return; + } + + logger.warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, retryCount, getStreamName(), delay); + try { + scheduler.schedule(this::reconnect, delay, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException exception) { + String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " + + "Shutting down " + getStreamName(); + logger.error(errorMessage); + shutdownImpl(errorMessage); } } + protected void resetRetries() { + retryStartedAt = -1; + retryCount = 0; + } + void reconnect() { - getLogger().info("[{}] {} reconnect #{} started", id, getStreamName(), reconnectCounter.get()); + logger.info("[{}] {} reconnect #{} started", id, getStreamName(), retryCount); if (!isReconnecting.compareAndSet(true, false)) { - getLogger().warn("Couldn't reset reconnect flag. Shouldn't happen"); + logger.warn("Couldn't reset reconnect flag. Shouldn't happen"); } onStreamReconnect(); } @@ -104,7 +101,7 @@ protected CompletableFuture shutdownImpl() { } protected CompletableFuture shutdownImpl(String reason) { - getLogger().info("[{}] Shutting down {}" + logger.info("[{}] Shutting down {}" + (reason == null || reason.isEmpty() ? 
"" : " with reason: " + reason), id, getStreamName()); isStopped.set(true); return CompletableFuture.runAsync(() -> { @@ -113,32 +110,54 @@ protected CompletableFuture shutdownImpl(String reason) { } protected void onSessionClosed(Status status, Throwable th) { - getLogger().info("[{}] onSessionClosed called", id); + logger.info("[{}] onSessionClosed called", id); + RetryPolicy retryPolicy; if (th != null) { - getLogger().error("[{}] Exception in {} stream session: ", id, getStreamName(), th); + logger.error("[{}] Exception in {} stream session: ", id, getStreamName(), th); + retryPolicy = retryConfig.getThrowableRetryPolicy(th); } else { if (status.isSuccess()) { if (isStopped.get()) { - getLogger().info("[{}] {} stream session closed successfully", id, getStreamName()); + logger.info("[{}] {} stream session closed successfully", id, getStreamName()); return; } else { - getLogger().warn("[{}] {} stream session was closed on working {}", id, getStreamName(), - getStreamName()); + logger.warn("[{}] {} stream session was closed on working {}", id, getStreamName(), + getStreamName()); } } else { - getLogger().warn("[{}] Error in {} stream session: {}", id, getStreamName(), status); + logger.warn("[{}] Error in {} stream session: {}", id, getStreamName(), status); } + retryPolicy = retryConfig.getStatusRetryPolicy(status); + } + + if (isStopped.get()) { + logger.info("[{}] {} is already stopped, no need to schedule reconnect", id, getStreamName()); + return; } - if (errorsHandler != null) { - errorsHandler.accept(status, th); + if (retryPolicy != null) { + if (retryCount < 1) { + retryStartedAt = System.currentTimeMillis(); + } + long delay = retryPolicy.nextRetryMs(retryCount + 1, System.currentTimeMillis() - retryStartedAt); + if (delay >= 0) { + retryCount++; + tryScheduleReconnect(delay); + return; + } } - if (!isStopped.get()) { - tryScheduleReconnect(); - } else { - getLogger().info("[{}] {} is already stopped, no need to schedule reconnect", id, getStreamName()); + long elapsedMs 
= retryStartedAt > 0 ? System.currentTimeMillis() - retryStartedAt : 0; + if (!isStopped.compareAndSet(false, true)) { + logger.warn("[{}] Stopped after {} retries and {} ms elapsed. But {} is already shut down.", + id, retryCount, elapsedMs, getStreamName()); + return; } + + String errorMessage = "[" + id + "] Stopped after " + retryCount + " retries and " + elapsedMs + + " ms elapsed. Shutting down " + getStreamName(); + logger.error(errorMessage); + shutdownImpl(errorMessage); } } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 984134e06..a0694e5c7 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier { private final String consumerName; public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { - super(topicRpc.getScheduler(), settings.getErrorsHandler()); + super(logger, settings.getRetryConfig(), topicRpc.getScheduler()); this.topicRpc = topicRpc; this.settings = settings; this.session = new ReadSessionImpl(); @@ -88,11 +88,6 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { logger.info(message.toString()); } - @Override - protected Logger getLogger() { - return logger; - } - @Override protected String getStreamName() { return "Reader"; @@ -515,7 +510,7 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) { } logger.debug("[{}] processMessage called", streamId); if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { - reconnectCounter.set(0); + resetRetries(); } else { Status status = Status.of(StatusCode.fromProto(message.getStatus()), Issue.fromPb(message.getIssuesList())); diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java index 
f27b719ed..88843cb65 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java @@ -9,7 +9,10 @@ import com.google.common.collect.ImmutableList; +import tech.ydb.common.retry.RetryConfig; +import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; +import tech.ydb.topic.impl.GrpcStreamRetrier; /** * @author Nikolay Perfilov @@ -20,17 +23,17 @@ public class ReaderSettings { private final String consumerName; private final String readerName; private final List topics; + private final RetryConfig retryConfig; private final long maxMemoryUsageBytes; private final Executor decompressionExecutor; - private final BiConsumer errorsHandler; private ReaderSettings(Builder builder) { this.consumerName = builder.consumerName; this.readerName = builder.readerName; this.topics = ImmutableList.copyOf(builder.topics); + this.retryConfig = builder.retryConfig; this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes; this.decompressionExecutor = builder.decompressionExecutor; - this.errorsHandler = builder.errorsHandler; } public String getConsumerName() { @@ -42,12 +45,17 @@ public String getReaderName() { return readerName; } + public RetryConfig getRetryConfig() { + return retryConfig; + } + public List getTopics() { return topics; } + @Deprecated public BiConsumer getErrorsHandler() { - return errorsHandler; + return null; } public long getMaxMemoryUsageBytes() { @@ -70,9 +78,9 @@ public static class Builder { private boolean readWithoutConsumer = false; private String readerName = null; private List topics = new ArrayList<>(); + private RetryConfig retryConfig = GrpcStreamRetrier.RETRY_ALL; private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT; private Executor decompressionExecutor = null; - private BiConsumer errorsHandler = null; public Builder setConsumerName(String consumerName) { this.consumerName = consumerName; @@ -91,6 +99,7 @@ public Builder withoutConsumer() 
{ /** * Set reader name for debug purposes + * @param readerName name of reader * @return settings builder */ public Builder setReaderName(String readerName) { @@ -108,13 +117,42 @@ public Builder setTopics(List topics) { return this; } + /** + * Set {@link RetryConfig} to define behavior of the stream internal retries + * @param config retry mode + * @return settings builder + */ + public Builder setRetryConfig(RetryConfig config) { + this.retryConfig = config; + return this; + } + public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) { this.maxMemoryUsageBytes = maxMemoryUsageBytes; return this; } + /** + * @param handler + * @return builder + * @deprecated use {@link Builder#setRetryConfig(tech.ydb.common.retry.RetryConfig)} instead + */ + @Deprecated public Builder setErrorsHandler(BiConsumer handler) { - this.errorsHandler = handler; + final RetryConfig currentConfig = retryConfig; + retryConfig = new RetryConfig() { + @Override + public RetryPolicy getStatusRetryPolicy(Status status) { + handler.accept(status, null); + return currentConfig.getStatusRetryPolicy(status); + } + + @Override + public RetryPolicy getThrowableRetryPolicy(Throwable th) { + handler.accept(null, th); + return currentConfig.getThrowableRetryPolicy(th); + } + }; return this; } diff --git a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java index 7f160c4d6..6a59468d0 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java @@ -2,8 +2,11 @@ import java.util.function.BiConsumer; +import tech.ydb.common.retry.RetryConfig; +import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; import tech.ydb.topic.description.Codec; +import tech.ydb.topic.impl.GrpcStreamRetrier; /** * @author Nikolay Perfilov @@ -17,9 +20,9 @@ public class WriterSettings { private final String messageGroupId; private final Long 
partitionId; private final Codec codec; + private final RetryConfig retryConfig; private final long maxSendBufferMemorySize; private final int maxSendBufferMessagesCount; - private final BiConsumer errorsHandler; private WriterSettings(Builder builder) { this.topicPath = builder.topicPath; @@ -27,9 +30,9 @@ private WriterSettings(Builder builder) { this.messageGroupId = builder.messageGroupId; this.partitionId = builder.partitionId; this.codec = builder.codec; + this.retryConfig = builder.retryConfig; this.maxSendBufferMemorySize = builder.maxSendBufferMemorySize; this.maxSendBufferMessagesCount = builder.maxSendBufferMessagesCount; - this.errorsHandler = builder.errorsHandler; } public static Builder newBuilder() { @@ -48,8 +51,9 @@ public String getMessageGroupId() { return messageGroupId; } + @Deprecated public BiConsumer getErrorsHandler() { - return errorsHandler; + return null; } public Long getPartitionId() { @@ -60,6 +64,10 @@ public Codec getCodec() { return codec; } + public RetryConfig getRetryConfig() { + return retryConfig; + } + public long getMaxSendBufferMemorySize() { return maxSendBufferMemorySize; } @@ -77,9 +85,9 @@ public static class Builder { private String messageGroupId = null; private Long partitionId = null; private Codec codec = Codec.GZIP; + private RetryConfig retryConfig = GrpcStreamRetrier.RETRY_ALL; private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT; private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT; - private BiConsumer errorsHandler = null; /** * Set path to a topic to write to @@ -135,6 +143,16 @@ public Builder setCodec(Codec codec) { return this; } + /** + * Set {@link RetryConfig} to define behavior of the stream internal retries + * @param config retry mode + * @return settings builder + */ + public Builder setRetryConfig(RetryConfig config) { + this.retryConfig = config; + return this; + } + /** * Set memory usage limit for send buffer. 
* Writer will not accept new messages if memory usage exceeds this limit. @@ -158,8 +176,27 @@ public Builder setMaxSendBufferMessagesCount(int maxMessagesCount) { return this; } + /** + * @param handler + * @return builder + * @deprecated use {@link Builder#setRetryConfig(tech.ydb.common.retry.RetryConfig)} instead + */ + @Deprecated public Builder setErrorsHandler(BiConsumer handler) { - this.errorsHandler = handler; + final RetryConfig currentConfig = retryConfig; + retryConfig = new RetryConfig() { + @Override + public RetryPolicy getStatusRetryPolicy(Status status) { + handler.accept(status, null); + return currentConfig.getStatusRetryPolicy(status); + } + + @Override + public RetryPolicy getThrowableRetryPolicy(Throwable th) { + handler.accept(null, th); + return currentConfig.getThrowableRetryPolicy(th); + } + }; return this; } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 49a7e7ebd..41529d97e 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier { private CompletableFuture lastAcceptedMessageFuture; public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) { - super(topicRpc.getScheduler(), settings.getErrorsHandler()); + super(logger, settings.getRetryConfig(), topicRpc.getScheduler()); this.topicRpc = topicRpc; this.settings = settings; this.session = new WriteSessionImpl(); @@ -81,11 +81,6 @@ public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressi logger.info(message); } - @Override - protected Logger getLogger() { - return logger; - } - @Override protected String getStreamName() { return "Writer"; @@ -479,7 +474,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) private void 
processMessage(YdbTopic.StreamWriteMessage.FromServer message) { logger.debug("[{}] processMessage called", streamId); if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { - reconnectCounter.set(0); + resetRetries(); } else { Status status = Status.of(StatusCode.fromProto(message.getStatus()), Issue.fromPb(message.getIssuesList())); diff --git a/topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java b/topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java new file mode 100644 index 000000000..8358aafd2 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java @@ -0,0 +1,240 @@ +package tech.ydb.topic.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import org.junit.Assert; +import org.junit.Before; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.stubbing.OngoingStubbing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.grpc.GrpcReadWriteStream; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.proto.StatusCodesProtos; +import tech.ydb.proto.topic.YdbTopic; +import tech.ydb.proto.topic.v1.TopicServiceGrpc; +import tech.ydb.topic.TopicClient; + +/** + * + * @author Aleksandr Gorshenin + */ +public class BaseMockedTest { + private static final Logger logger = LoggerFactory.getLogger(BaseMockedTest.class); + + private interface WriteStream extends + GrpcReadWriteStream { + } + + private final GrpcTransport transport = 
Mockito.mock(GrpcTransport.class); + private final ScheduledExecutorService scheduler = Mockito.mock(ScheduledExecutorService.class); + private final ScheduledFuture emptyFuture = Mockito.mock(ScheduledFuture.class); + private final WriteStream writeStream = Mockito.mock(WriteStream.class); + private final SchedulerAssert schedulerHelper = new SchedulerAssert(); + + protected final TopicClient client = TopicClient.newClient(transport) + .setCompressionExecutor(Runnable::run) // Disable compression in separate executors + .build(); + + private volatile MockedWriteStream streamMock = null; + + @Before + public void beforeEach() { + streamMock = null; + + Mockito.when(transport.getScheduler()).thenReturn(scheduler); + Mockito.when(transport.readWriteStreamCall(Mockito.eq(TopicServiceGrpc.getStreamWriteMethod()), Mockito.any())) + .thenReturn(writeStream); + + // Every writeStream.start updates mockedWriteStream + Mockito.when(writeStream.start(Mockito.any())).thenAnswer(defaultStreamMockAnswer()); + + // Every writeStream.sendNext adds the message from client to mockedWriteStream.sent list + Mockito.doAnswer((Answer) (InvocationOnMock iom) -> { + streamMock.sent.add(iom.getArgument(0, YdbTopic.StreamWriteMessage.FromClient.class)); + return null; + }).when(writeStream).sendNext(Mockito.any()); + + Mockito.when(scheduler.schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any())) + .thenAnswer((InvocationOnMock iom) -> { + logger.debug("mock scheduled task"); + schedulerHelper.tasks.add(iom.getArgument(0, Runnable.class)); + return emptyFuture; + }); + } + + protected MockedWriteStream currentStream() { + return streamMock; + } + + protected SchedulerAssert getScheduler() { + return schedulerHelper; + } + + protected OngoingStubbing> mockStreams() { + return Mockito.when(writeStream.start(Mockito.any())); + } + + protected Answer> defaultStreamMockAnswer() { + return (InvocationOnMock iom) -> { + streamMock = new MockedWriteStream(iom.getArgument(0)); + return 
streamMock.streamFuture; + }; + } + + protected Answer> errorStreamMockAnswer(StatusCode code) { + return (iom) -> { + streamMock = null; + return CompletableFuture.completedFuture(Status.of(code)); + }; + } + + protected static class SchedulerAssert { + private final Queue tasks = new ConcurrentLinkedQueue<>(); + + public SchedulerAssert hasNoTasks() { + Assert.assertTrue(tasks.isEmpty()); + return this; + } + + public SchedulerAssert hasTasks(int count) { + Assert.assertEquals(count, tasks.size()); + return this; + } + + public SchedulerAssert executeNextTasks(int count) { + Assert.assertTrue(count <= tasks.size()); + + CompletableFuture.runAsync(() -> { + logger.debug("execute {} scheduled tasks", count); + for (int idx = 0; idx < count; idx++) { + tasks.poll().run(); + } + }).join(); + return this; + } + } + + protected static class MockedWriteStream { + private final GrpcReadWriteStream.Observer observer; + private final CompletableFuture streamFuture = new CompletableFuture<>(); + private final List sent = new ArrayList<>(); + private volatile int sentIdx = 0; + + public MockedWriteStream(GrpcReadStream.Observer observer) { + this.observer = observer; + } + + public void complete(Status status) { + streamFuture.complete(status); + } + + public void complete(Throwable th) { + streamFuture.completeExceptionally(th); + } + + public void hasNoNewMessages() { + Assert.assertTrue(sentIdx >= sent.size()); + } + + public Checker nextMsg() { + Assert.assertTrue(sentIdx < sent.size()); + return new Checker(sent.get(sentIdx++)); + } + + public void responseErrorBadRequest() { + YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.BAD_REQUEST) + .build(); + observer.onNext(msg); + } + + public void responseErrorSchemeError() { + YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder() + 
.setStatus(StatusCodesProtos.StatusIds.StatusCode.SCHEME_ERROR) + .build(); + observer.onNext(msg); + } + + public void responseInit(long lastSeqNo) { + responseInit(lastSeqNo, 123, "mocked", new int[] { 0, 1, 2}); + } + + public void responseInit(long lastSeqNo, long partitionId, String sessionId, int[] codecs) { + YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setInitResponse(YdbTopic.StreamWriteMessage.InitResponse.newBuilder() + .setLastSeqNo(lastSeqNo) + .setPartitionId(partitionId) + .setSessionId(sessionId) + .setSupportedCodecs(YdbTopic.SupportedCodecs.newBuilder() + .addAllCodecs(IntStream.of(codecs).boxed().collect(Collectors.toList()))) + ).build(); + observer.onNext(msg); + } + + public void responseWriteWritten(long firstSeqNo, int messagesCount) { + List acks = LongStream + .range(firstSeqNo, firstSeqNo + messagesCount) + .mapToObj(seqNo -> YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.newBuilder() + .setSeqNo(seqNo) + .setWritten(YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Written.newBuilder()) + .build()) + .collect(Collectors.toList()); + + YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse.newBuilder().addAllAcks(acks)) + .build(); + observer.onNext(msg); + } + + protected class Checker { + private final YdbTopic.StreamWriteMessage.FromClient msg; + + public Checker(YdbTopic.StreamWriteMessage.FromClient msg) { + this.msg = msg; + } + + public Checker isInit() { + Assert.assertTrue("next msg must be init request", msg.hasInitRequest()); + return this; + } + + public Checker hasInitPath(String path) { + Assert.assertEquals("invalid init request path", path, msg.getInitRequest().getPath()); + return this; + } + + public Checker isWrite() { + 
Assert.assertTrue("next msg must be write request", msg.hasWriteRequest()); + return this; + } + + public Checker hasWrite(int codec, long... seqnums) { + Assert.assertEquals("invalid write codec", codec, msg.getWriteRequest().getCodec()); + Assert.assertEquals("invalid messages count", seqnums.length, msg.getWriteRequest().getMessagesCount()); + for (int idx = 0; idx < seqnums.length; idx++) { + Assert.assertEquals("invalid msg seqNo " + idx, seqnums[idx], + msg.getWriteRequest().getMessages(idx).getSeqNo()); + } + return this; + } + } + } +} diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java new file mode 100644 index 000000000..10c105e82 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java @@ -0,0 +1,455 @@ +package tech.ydb.topic.impl; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Assert; +import org.junit.Test; + +import tech.ydb.common.retry.RetryConfig; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; +import tech.ydb.topic.settings.WriterSettings; +import tech.ydb.topic.write.AsyncWriter; +import tech.ydb.topic.write.InitResult; +import tech.ydb.topic.write.Message; +import tech.ydb.topic.write.QueueOverflowException; +import tech.ydb.topic.write.SyncWriter; +import tech.ydb.topic.write.WriteAck; + +/** + * + * @author Aleksandr Gorshenin + */ +public class TopicRetriesTest extends BaseMockedTest { + + @Test + public void writerDefaultRetryTest() throws InterruptedException, ExecutionException, TimeoutException { + mockStreams() + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)) + .then(defaultStreamMockAnswer()) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(defaultStreamMockAnswer()); // and 
repeat + + SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .build()); + writer.init(); + + // Retry #1 - TRANSPORT_UNAVAILABLE + Assert.assertNull(currentStream()); + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(0); + + writer.send(Message.of("test-message".getBytes())); + stream1.nextMsg().isWrite().hasWrite(2, 1); + stream1.responseWriteWritten(1, 1); + + stream1.complete(Status.of(StatusCode.SUCCESS)); + + // Retry #2 - Stream is closed by server + getScheduler().hasTasks(1).executeNextTasks(1); + + // Retry #3 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream2 = currentStream(); + Assert.assertNotEquals(stream1, stream2); + + stream2.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream2.hasNoNewMessages(); + stream2.responseErrorBadRequest(); + + // Retry #4 - Stream send bad request + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream3 = currentStream(); + Assert.assertNotEquals(stream2, stream3); + + stream3.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream3.hasNoNewMessages(); + stream3.responseInit(1); + + writer.send(Message.of("other-message".getBytes())); + stream3.nextMsg().isWrite().hasWrite(2, 2); + stream3.responseWriteWritten(2, 1); + + writer.flush(); + writer.shutdown(1, TimeUnit.SECONDS); + stream3.complete(Status.SUCCESS); + } + + @Test + public void writerNoRetryNetworkErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + mockStreams() + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)); + + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.noRetries()) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); 
+ + // No stream and no retries in scheduler + Assert.assertNull(currentStream()); + getScheduler().hasNoTasks(); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown(1, TimeUnit.SECONDS); + } + + @Test + public void writerNoRetryStreamCloseTest() throws InterruptedException, ExecutionException, TimeoutException { + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.noRetries()) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(0); + + // Even successful completing closes writer + stream1.complete(Status.SUCCESS); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown(1, TimeUnit.SECONDS); + } + + @Test + public void writerNoRetryStreamErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.noRetries()) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(0); + + stream1.responseErrorBadRequest(); + stream1.complete(Status.SUCCESS); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + 
writer.shutdown(1, TimeUnit.SECONDS); + } + + @Test + public void writerIdempotentRetryTest() throws InterruptedException, ExecutionException, TimeoutException { + mockStreams() + .then(defaultStreamMockAnswer()) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(defaultStreamMockAnswer()); // and repeat + + SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setProducerId("test-id") + .setRetryConfig(RetryConfig.idempotentRetryForever()) + .build()); + writer.init(); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(10); + + writer.send(Message.of("test-message".getBytes())); + stream1.nextMsg().isWrite().hasWrite(2, 11); + stream1.responseWriteWritten(11, 1); + + stream1.complete(new RuntimeException("io exception", + new UnexpectedResultException("inner", Status.of(StatusCode.CLIENT_INTERNAL_ERROR))) + ); + + // Retry #1 - Stream is closed by runtime exception + getScheduler().hasTasks(1).executeNextTasks(1); + + // Retry #2 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + // Retry #3 - TRANSPORT_UNAVAILABLE + getScheduler().hasTasks(1).executeNextTasks(1); + // Retry #4 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream2 = currentStream(); + Assert.assertNotEquals(stream1, stream2); + + stream2.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream2.hasNoNewMessages(); + stream2.responseInit(12); + + writer.send(Message.of("other-message".getBytes())); + stream2.nextMsg().isWrite().hasWrite(2, 13); + stream2.responseWriteWritten(13, 1); + + writer.flush(); + writer.shutdown(1, TimeUnit.SECONDS); + stream2.complete(Status.SUCCESS); + } + + @Test + public void asyncWriterDefaultRetryTest() throws QueueOverflowException 
{ + mockStreams() + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)) + .then(defaultStreamMockAnswer()) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(defaultStreamMockAnswer()); // and repeat + + AsyncWriter writer = client.createAsyncWriter(WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setProducerId("producer-id") + .build()); + CompletableFuture initFuture = writer.init(); + + // Retry #1 - TRANSPORT_UNAVAILABLE + Assert.assertNull(currentStream()); + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(3); + + Assert.assertTrue(initFuture.isDone()); + Assert.assertEquals(3, initFuture.join().getSeqNo()); + + CompletableFuture m1Future = writer.send(Message.of("test-message".getBytes())); + CompletableFuture m2Future = writer.send(Message.of("test-message2".getBytes())); + + stream1.nextMsg().isWrite().hasWrite(2, 4); // m1 + stream1.nextMsg().isWrite().hasWrite(2, 5); // m2 + + Assert.assertFalse(m1Future.isDone()); + Assert.assertFalse(m2Future.isDone()); + + stream1.responseWriteWritten(4, 1); // ack for m1 + + Assert.assertTrue(m1Future.isDone()); + Assert.assertEquals(4, m1Future.join().getSeqNo()); + Assert.assertFalse(m2Future.isDone()); + + stream1.complete(Status.of(StatusCode.BAD_SESSION)); + + // Retry #2 - Stream is closed by server + getScheduler().hasTasks(1).executeNextTasks(1); + + // Retry #3 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream2 = currentStream(); + Assert.assertNotEquals(stream1, stream2); + + stream2.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream2.hasNoNewMessages(); + stream2.responseInit(4); + + stream2.nextMsg().isWrite().hasWrite(2, 5); // m2 + + CompletableFuture m3Future = writer.send(Message.of("other-message3".getBytes())); + + stream2.responseErrorSchemeError(); + + // 
Retry #4 - Stream sent scheme error + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream3 = currentStream(); + Assert.assertNotEquals(stream2, stream3); + + stream3.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream3.hasNoNewMessages(); + stream3.responseInit(4); + + stream3.nextMsg().isWrite().hasWrite(2, 5, 6); // m2 & m3 + + Assert.assertFalse(m2Future.isDone()); + Assert.assertFalse(m3Future.isDone()); + + stream3.responseWriteWritten(5, 2); + + Assert.assertTrue(m2Future.isDone()); + Assert.assertEquals(5, m2Future.join().getSeqNo()); + Assert.assertTrue(m3Future.isDone()); + Assert.assertEquals(6, m3Future.join().getSeqNo()); + + writer.shutdown(); + stream3.complete(Status.SUCCESS); + } + + @Test + public void asyncWriterIdempotentRetryTest() throws QueueOverflowException { + mockStreams() + .then(defaultStreamMockAnswer()) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(defaultStreamMockAnswer()); // and repeat + + AsyncWriter writer = client.createAsyncWriter(WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.idempotentRetryForever()) + .build()); + + CompletableFuture initFuture = writer.init(); + CompletableFuture m1Future = writer.send(Message.of("test-message".getBytes())); + CompletableFuture m2Future = writer.send(Message.of("test-message2".getBytes())); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(30); + + Assert.assertTrue(initFuture.isDone()); + Assert.assertEquals(30, initFuture.join().getSeqNo()); + + stream1.nextMsg().isWrite().hasWrite(2, 31, 32); // m1 & m2 + + Assert.assertFalse(m1Future.isDone()); + Assert.assertFalse(m2Future.isDone()); + + stream1.responseWriteWritten(31, 1); + + Assert.assertTrue(m1Future.isDone()); + 
Assert.assertEquals(31, m1Future.join().getSeqNo()); + Assert.assertFalse(m2Future.isDone()); + + stream1.complete(Status.of(StatusCode.ABORTED)); + + // Retry #1 - ABORTED + getScheduler().hasTasks(1).executeNextTasks(1); + // Retry #2 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + // Retry #3 - TRANSPORT_UNAVAILABLE + getScheduler().hasTasks(1).executeNextTasks(1); + // Retry #4 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream2 = currentStream(); + stream2.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream2.hasNoNewMessages(); + stream2.responseInit(31); + + stream2.nextMsg().isWrite().hasWrite(2, 32); // m2 + + CompletableFuture m3Future = writer.send(Message.of("other-message".getBytes())); + + stream2.nextMsg().isWrite().hasWrite(2, 33); + stream2.responseWriteWritten(32, 2); + + Assert.assertTrue(m2Future.isDone()); + Assert.assertTrue(m3Future.isDone()); + Assert.assertEquals(32, m2Future.join().getSeqNo()); + Assert.assertEquals(33, m3Future.join().getSeqNo()); + + writer.shutdown().join(); + stream2.complete(Status.SUCCESS); + } + + @Test + public void asyncDisabledRetryStreamCloseTest() throws QueueOverflowException { + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.noRetries()) + .build(); + + AsyncWriter writer = client.createAsyncWriter(settings); + CompletableFuture initFuture = writer.init(); + CompletableFuture messageFuture = writer.send(Message.of("test".getBytes())); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + + Assert.assertFalse(initFuture.isDone()); + Assert.assertFalse(messageFuture.isDone()); + + // Even successful completing closes writer + stream1.complete(Status.SUCCESS); + + Assert.assertTrue(initFuture.isCompletedExceptionally()); + Assert.assertTrue(messageFuture.isCompletedExceptionally()); + + 
RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown().join(); + } + + @Test + public void asyncDisabledRetryStreamErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.noRetries()) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(0); + + stream1.responseErrorBadRequest(); + stream1.complete(Status.SUCCESS); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown(1, TimeUnit.SECONDS); + } + + @Test + public void asyncDisabledRetryNetworkErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + mockStreams() + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)); + + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.noRetries()) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); +// writer. 
+ + // No stream and no retries in scheduler + Assert.assertNull(currentStream()); + getScheduler().hasNoTasks(); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown(1, TimeUnit.SECONDS); + } +} diff --git a/topic/src/test/resources/log4j2.xml b/topic/src/test/resources/log4j2.xml index c799da309..c59b5d2aa 100644 --- a/topic/src/test/resources/log4j2.xml +++ b/topic/src/test/resources/log4j2.xml @@ -2,7 +2,7 @@ - +