diff --git a/src/main/java/io/lettuce/core/BaseConnectionFuture.java b/src/main/java/io/lettuce/core/BaseConnectionFuture.java new file mode 100644 index 000000000..96313c400 --- /dev/null +++ b/src/main/java/io/lettuce/core/BaseConnectionFuture.java @@ -0,0 +1,311 @@ +package io.lettuce.core; + +import java.util.concurrent.*; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Base class for connection futures that provides protection against event loop deadlocks. + *

+ * This class ensures that all callbacks (thenApply, thenAccept, etc.) execute on a separate thread pool rather than on Netty + * event loop threads. This prevents deadlocks when users call blocking sync operations inside callbacks. + *

+ * Example of the problem this solves: + * + *

+ * {@code
+ * // DANGEROUS with plain CompletableFuture - can deadlock!
+ * future.thenApply(conn -> conn.sync().ping());
+ *
+ * // SAFE with BaseConnectionFuture - always runs on separate thread
+ * future.thenApply(conn -> conn.sync().ping());
+ * }
+ * 
+ * + * @param Connection type + * @author Ali Takavci + * @since 7.4 + */ +public abstract class BaseConnectionFuture implements CompletionStage, Future { + + protected final CompletableFuture delegate; + + protected final Executor defaultExecutor; + + /** + * Create a new {@link BaseConnectionFuture} wrapping the given delegate future. + * + * @param delegate the underlying CompletableFuture + */ + protected BaseConnectionFuture(CompletableFuture delegate) { + this(delegate, ForkJoinPool.commonPool()); + } + + /** + * Create a new {@link BaseConnectionFuture} wrapping the given delegate future with a custom executor. + * + * @param delegate the underlying CompletableFuture + * @param defaultExecutor the executor to use for async callbacks + */ + protected BaseConnectionFuture(CompletableFuture delegate, Executor defaultExecutor) { + this.delegate = delegate; + this.defaultExecutor = defaultExecutor; + } + + /** + * Subclasses must implement this to wrap a new CompletableFuture in the appropriate subclass type. + * + * @param future the future to wrap + * @param the new type + * @return the wrapped future + */ + protected abstract CompletionStage wrap(CompletableFuture future); + + // ========================================================================= + // Future interface methods + // ========================================================================= + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } + + // ========================================================================= + // CompletionStage methods - ALL force async execution + // ========================================================================= + + @Override + public CompletionStage thenApply(Function fn) { + // Force async execution to prevent event loop blocking + return wrap(delegate.thenApplyAsync(fn, defaultExecutor)); + } + + @Override + public CompletionStage thenApplyAsync(Function fn) { + return wrap(delegate.thenApplyAsync(fn, defaultExecutor)); + } + + @Override + public CompletionStage thenApplyAsync(Function fn, Executor executor) { + return wrap(delegate.thenApplyAsync(fn, executor)); + } + + @Override + public CompletionStage thenAccept(Consumer action) { + // Force async execution to prevent event loop blocking + return wrap(delegate.thenAcceptAsync(action, defaultExecutor)); + } + + @Override + public CompletionStage thenAcceptAsync(Consumer action) { + return wrap(delegate.thenAcceptAsync(action, defaultExecutor)); + } + + @Override + public CompletionStage thenAcceptAsync(Consumer action, Executor executor) { + return wrap(delegate.thenAcceptAsync(action, executor)); + } + + @Override + public CompletionStage thenRun(Runnable action) { + // Force async execution to prevent event loop blocking + return wrap(delegate.thenRunAsync(action, defaultExecutor)); + } + + @Override + public CompletionStage thenRunAsync(Runnable action) { + return wrap(delegate.thenRunAsync(action, defaultExecutor)); + } + + @Override + public CompletionStage thenRunAsync(Runnable action, Executor executor) { + return wrap(delegate.thenRunAsync(action, executor)); + } + + @Override + public CompletionStage thenCombine(CompletionStage other, + BiFunction fn) { + // Force async execution to prevent event loop blocking + return wrap(delegate.thenCombineAsync(other, fn, defaultExecutor)); + } + + @Override + public CompletionStage thenCombineAsync(CompletionStage other, + BiFunction fn) { + return wrap(delegate.thenCombineAsync(other, fn, defaultExecutor)); + } + + @Override + public CompletionStage thenCombineAsync(CompletionStage other, + BiFunction fn, Executor executor) { + return wrap(delegate.thenCombineAsync(other, fn, executor)); + } + + @Override + public CompletionStage thenAcceptBoth(CompletionStage other, + BiConsumer action) { + // Force async execution to prevent event loop blocking + return wrap(delegate.thenAcceptBothAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage thenAcceptBothAsync(CompletionStage other, + BiConsumer action) { + return wrap(delegate.thenAcceptBothAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage thenAcceptBothAsync(CompletionStage other, + BiConsumer action, Executor executor) { + return wrap(delegate.thenAcceptBothAsync(other, action, executor)); + } + + @Override + public CompletionStage runAfterBoth(CompletionStage other, Runnable action) { + // Force async execution to prevent event loop blocking + return wrap(delegate.runAfterBothAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action) { + return wrap(delegate.runAfterBothAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { + return wrap(delegate.runAfterBothAsync(other, action, executor)); + } + + @Override + public CompletionStage applyToEither(CompletionStage other, Function fn) { + // Force async execution to prevent event loop blocking + return wrap(delegate.applyToEitherAsync(other, fn, defaultExecutor)); + } + + @Override + public CompletionStage applyToEitherAsync(CompletionStage other, Function fn) { + return wrap(delegate.applyToEitherAsync(other, fn, defaultExecutor)); + } + + @Override + public CompletionStage applyToEitherAsync(CompletionStage other, Function fn, + Executor executor) { + return wrap(delegate.applyToEitherAsync(other, fn, executor)); + } + + @Override + public CompletionStage acceptEither(CompletionStage other, Consumer action) { + // Force async execution to prevent event loop blocking + return wrap(delegate.acceptEitherAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action) { + return wrap(delegate.acceptEitherAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action, + Executor executor) { + return wrap(delegate.acceptEitherAsync(other, action, executor)); + } + + @Override + public CompletionStage runAfterEither(CompletionStage other, Runnable action) { + // Force async execution to prevent event loop blocking + return wrap(delegate.runAfterEitherAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action) { + return wrap(delegate.runAfterEitherAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { + return wrap(delegate.runAfterEitherAsync(other, action, executor)); + } + + @Override + public CompletionStage thenCompose(Function> fn) { + // Force async execution to prevent event loop blocking + return wrap(delegate.thenComposeAsync(fn, defaultExecutor)); + } + + @Override + public CompletionStage thenComposeAsync(Function> fn) { + return wrap(delegate.thenComposeAsync(fn, defaultExecutor)); + } + + @Override + public CompletionStage thenComposeAsync(Function> fn, Executor executor) { + return wrap(delegate.thenComposeAsync(fn, executor)); + } + + @Override + public CompletionStage whenComplete(BiConsumer action) { + // Force async execution to prevent event loop blocking + return wrap(delegate.whenCompleteAsync(action, defaultExecutor)); + } + + @Override + public CompletionStage whenCompleteAsync(BiConsumer action) { + return wrap(delegate.whenCompleteAsync(action, defaultExecutor)); + } + + @Override + public CompletionStage whenCompleteAsync(BiConsumer action, Executor executor) { + return wrap(delegate.whenCompleteAsync(action, executor)); + } + + @Override + public CompletionStage handle(BiFunction fn) { + // Force async execution to prevent event loop blocking + return wrap(delegate.handleAsync(fn, defaultExecutor)); + } + + @Override + public CompletionStage handleAsync(BiFunction fn) { + return wrap(delegate.handleAsync(fn, defaultExecutor)); + } + + @Override + public CompletionStage handleAsync(BiFunction fn, Executor executor) { + return wrap(delegate.handleAsync(fn, executor)); + } + + @Override + public CompletionStage exceptionally(Function fn) { + // exceptionally doesn't have an async variant, but it's typically safe + // as it only runs on exception and doesn't block + return wrap(delegate.exceptionally(fn)); + } + + @Override + public CompletableFuture toCompletableFuture() { + return delegate.toCompletableFuture(); + } + +} diff --git a/src/main/java/io/lettuce/core/failover/MultiDbAsyncConnectionBuilder.java b/src/main/java/io/lettuce/core/failover/MultiDbAsyncConnectionBuilder.java new file mode 100644 index 000000000..1d752e1c8 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/MultiDbAsyncConnectionBuilder.java @@ -0,0 +1,393 @@ +package io.lettuce.core.failover; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import io.lettuce.core.ConnectionFuture; +import io.lettuce.core.Delegating; +import io.lettuce.core.RedisChannelWriter; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.RedisURI; +import io.lettuce.core.StatefulRedisConnectionImpl; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; +import io.lettuce.core.failover.health.HealthCheck; +import io.lettuce.core.failover.health.HealthCheckStrategy; +import io.lettuce.core.failover.health.HealthCheckStrategySupplier; +import io.lettuce.core.failover.health.HealthStatus; +import io.lettuce.core.failover.health.HealthStatusManager; +import io.lettuce.core.resource.ClientResources; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * Manages asynchronous connection creation and health checking for multiple Redis databases. This class handles the complex + * async logic of establishing connections to multiple databases, waiting for health checks, and filtering out unhealthy or + * failed connections. + * + * @param Key type + * @param Value type + * @author Ali Takavci + * @since 7.4 + */ +class MultiDbAsyncConnectionBuilder { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultiDbAsyncConnectionBuilder.class); + + private final HealthStatusManager healthStatusManager; + + private final ClientResources resources; + + private final MultiDbClientImpl client; + + /** + * Creates a new MultiDbAsyncConnectionBuilder. + * + * @param healthStatusManager the health status manager to use for tracking database health + * @param resources the client resources + * @param client the MultiDbClient instance for creating connections + */ + MultiDbAsyncConnectionBuilder(HealthStatusManager healthStatusManager, ClientResources resources, + MultiDbClientImpl client) { + this.healthStatusManager = healthStatusManager; + this.resources = resources; + this.client = client; + } + + /** + * Asynchronously creates connections to all configured databases and waits for initial health checks. + * + * @param databaseConfigs the database configurations + * @param codec the Redis codec to use + * @param multiDbConnectionFactory function to create the final multi-db connection wrapper + * @return a CompletableFuture that completes with the multi-db connection + */ + CompletableFuture> connectAsync(Map databaseConfigs, + RedisCodec codec, MultiDbConnectionFactory multiDbConnectionFactory) { + + CompletableFuture> connectionFuture = new CompletableFuture<>(); + + // Create async database connections for all configured endpoints + Map>>> databaseFutures = new ConcurrentHashMap<>( + databaseConfigs.size()); + + for (Map.Entry entry : databaseConfigs.entrySet()) { + RedisURI uri = entry.getKey(); + DatabaseConfig config = entry.getValue(); + + CompletableFuture>> dbFuture = createRedisDatabaseAsync(config, + codec); + databaseFutures.put(uri, dbFuture); + } + + // Wait for all database connections to complete (both successful and failed) + CompletableFuture allDatabasesFuture = CompletableFuture + .allOf(databaseFutures.values().toArray(new CompletableFuture[0])); + + // Handle completion with partial success support + allDatabasesFuture.handle((v, throwable) -> { + + CompletableFuture>>> result = handleDatabaseFutures( + databaseFutures); + + result.handle((healthyDatabaseMap, exc) -> { + if (exc != null) { + connectionFuture.completeExceptionally(exc); + } else if (healthyDatabaseMap != null && !healthyDatabaseMap.isEmpty()) { + // Create the multi-db connection wrapper with only healthy and open databases + connectionFuture.complete(multiDbConnectionFactory.create(healthyDatabaseMap, codec, healthStatusManager)); + } else { + // this should not happen ever + connectionFuture.completeExceptionally(new IllegalStateException( + "Healthy database map is empty without any errors propagated! This should be investigated.")); + } + return null; + }); + return null; + }); + + return connectionFuture; + } + + /** + * Asynchronously creates a Redis database connection with circuit breaker and health check support. + * + * @param config the database configuration + * @param codec the codec to use for encoding/decoding + * @return a CompletableFuture that completes with the created database + */ + CompletableFuture>> createRedisDatabaseAsync(DatabaseConfig config, + RedisCodec codec) { + + RedisURI uri = config.getRedisURI(); + client.setOptions(config.getClientOptions()); + + try { + // Use the async connect method from RedisClient + ConnectionFuture> connectionFuture = client.connectAsync(codec, uri); + // Reset options immediately after connectAsync() call + client.resetOptions(); + + return connectionFuture.toCompletableFuture().thenApply(connection -> { + try { + + HealthCheck healthCheck = null; + if (HealthCheckStrategySupplier.NO_HEALTH_CHECK != config.getHealthCheckStrategySupplier()) { + HealthCheckStrategy hcStrategy = config.getHealthCheckStrategySupplier().get(config.getRedisURI(), + new DatabaseRawConnectionFactoryImpl(config.getClientOptions(), client)); + healthCheck = healthStatusManager.add(uri, hcStrategy); + } + + DatabaseEndpoint databaseEndpoint = extractDatabaseEndpoint(connection); + CircuitBreakerImpl circuitBreaker = new CircuitBreakerImpl(config.getCircuitBreakerConfig()); + databaseEndpoint.bind(circuitBreaker); + + RedisDatabaseImpl> database = new RedisDatabaseImpl<>(config, connection, + databaseEndpoint, circuitBreaker, healthCheck); + if (logger.isInfoEnabled()) { + logger.info("Created database: {} with CircuitBreaker {} and HealthCheck {}", database.getId(), + circuitBreaker.getId(), healthCheck != null ? healthCheck.getEndpoint() : "N/A"); + } + return database; + } catch (Exception e) { + // If database setup fails, close the connection + connection.closeAsync(); + throw e; + } + }); + } catch (Exception e) { + client.resetOptions(); + CompletableFuture>> failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(e); + return failedFuture; + } + } + + /** + * Handles the completion of individual database futures, collecting successful connections and closing failed ones. Also + * waits for initial health checks to complete and filters out unhealthy databases and closed connections. + * + * @param databaseFutures the map of database futures + * @return a CompletableFuture that completes with the map of healthy databases + */ + CompletableFuture>>> handleDatabaseFutures( + Map>>> databaseFutures) { + + CompletableFuture>>> healthyDatabasesFuture = new CompletableFuture<>(); + try { + + // Collect successfully created databases, skip failed ones + Map>> databases = collectDatabasesWithEstablishedConnections( + databaseFutures); + + // Wait for initial health checks asynchronously + CompletableFuture> healthCheckFuture = collectHealthStatuses(databases); + + healthCheckFuture.whenComplete((healthStatuses, ht) -> { + if (ht != null) { + // Health check failed, close all connections + databases.values().forEach(db -> db.getConnection().closeAsync()); + healthyDatabasesFuture.completeExceptionally(ht); + return; + } + + healthyDatabasesFuture.complete(databases); + }); + + } catch (Exception e) { + // Unexpected error during connection creation + logger.error("Unexpected error during async connection", e); + databaseFutures.values().forEach(dbFuture -> { + if (dbFuture.isDone() && !dbFuture.isCompletedExceptionally()) { + dbFuture.thenAccept(db -> db.getConnection().closeAsync()); + } + }); + healthyDatabasesFuture.completeExceptionally(e); + + } + + return healthyDatabasesFuture; + } + + /** + * Collects all databases that have successfully established connections. If no database has successfully connected, throws + * a {@link RedisConnectionException} exception with all failures as suppressed exceptions. If at least one database has + * successfully connected, logs a warning for each failed connection and proceeds. + * + * @param databaseFutures the map of database futures + * @return a map of successfully connected databases + */ + Map>> collectDatabasesWithEstablishedConnections( + Map>>> databaseFutures) { + // Collect successfully created databases, skip failed ones + Map>> databases = new ConcurrentHashMap<>(); + List failures = new ArrayList<>(); + + for (Map.Entry>>> entry : databaseFutures + .entrySet()) { + CompletableFuture>> dbFuture = entry.getValue(); + + try { + RedisDatabaseImpl> database = dbFuture.get(); + databases.put(entry.getKey(), database); + } catch (Exception e) { + // Failed to get database, record the failure + failures.add(e); + logger.warn("Failed to create database connection for {}: {}", entry.getKey(), e.getMessage()); + } + } + + // Check if we have at least one successful connection + if (databases.isEmpty()) { + // All connections failed - fail the entire operation + logger.error("All database connections failed. Total failures: {}", failures.size()); + + // Create a composite exception with all failures + RedisConnectionException compositeException = new RedisConnectionException( + "Failed to connect to any database. All " + databaseFutures.size() + " connection(s) failed."); + failures.forEach(compositeException::addSuppressed); + throw compositeException; + } else { + // We have at least one successful connection - proceed + logger.info("Successfully connected to {} out of {} database(s)", databases.size(), databaseFutures.size()); + if (!failures.isEmpty()) { + logger.info("{} database connection(s) failed but proceeding with successful ones", failures.size()); + } + + } + return databases; + } + + /** + * Asynchronously waits for all successfully connected databases to have their health status determined. Once all health + * statuses are known, selects the most weighted healthy database. + * + * @param databases the map of databases to check + * @return a CompletableFuture that completes with a map of health statuses when all are determined + */ + CompletableFuture> collectHealthStatuses( + Map>> databases) { + + logger.info("Waiting for health status of {} successfully connected databases", databases.size()); + + StatusTracker statusTracker = new StatusTracker(healthStatusManager, resources); + + // Create a list of futures, one for each database's health status + List>> healthCheckFutures = new ArrayList<>(); + + for (Map.Entry>> entry : databases.entrySet()) { + RedisURI endpoint = entry.getKey(); + RedisDatabaseImpl> database = entry.getValue(); + + CompletableFuture healthFuture; + + // Check if health checks are enabled for this database + if (database.getHealthCheck() != null) { + logger.info("Health checks enabled for {}, waiting for result", endpoint); + // Wait asynchronously for this database's health status to be determined + healthFuture = statusTracker.waitForHealthStatusAsync(endpoint); + } else { + // No health check configured - assume healthy + logger.info("No health check configured for database {}, defaulting to HEALTHY", endpoint); + healthFuture = CompletableFuture.completedFuture(HealthStatus.HEALTHY); + } + + // Transform to include the endpoint with the health status + CompletableFuture> entryFuture = healthFuture + .thenApply(status -> new AbstractMap.SimpleEntry<>(endpoint, status)); + + healthCheckFutures.add(entryFuture); + } + + // Wait for all health checks to complete + CompletableFuture allHealthChecksFuture = CompletableFuture + .allOf(healthCheckFutures.toArray(new CompletableFuture[0])); + + // Once all health checks are done, collect and return the health statuses + return allHealthChecksFuture.thenApply(v -> { + // Collect all health statuses + Map healthStatuses = new HashMap<>(); + for (CompletableFuture> future : healthCheckFutures) { + Map.Entry entry = future.join(); + healthStatuses.put(entry.getKey(), entry.getValue()); + logger.info("Database {} health status: {}", entry.getKey(), entry.getValue()); + } + + // Find the most weighted healthy database + RedisDatabaseImpl> selectedDatabase = databases.entrySet().stream() + .filter(entry -> healthStatuses.get(entry.getKey()).isHealthy()) + .max(Comparator.comparing(entry -> entry.getValue().getWeight())).map(Map.Entry::getValue).orElse(null); + + if (selectedDatabase == null) { + throw new RedisConnectionException( + "All configured databases are unhealthy. Health statuses: " + healthStatuses); + } + + logger.info("Selected database with weight {} as initial active database", selectedDatabase.getWeight()); + return healthStatuses; + }); + } + + /** + * Extracts the database endpoint from a Redis connection. + * + * @param connection the Redis connection + * @return the database endpoint + */ + private DatabaseEndpoint extractDatabaseEndpoint(StatefulRedisConnection connection) { + RedisChannelWriter writer = ((StatefulRedisConnectionImpl) connection).getChannelWriter(); + if (writer instanceof Delegating) { + writer = (RedisChannelWriter) ((Delegating) writer).unwrap(); + } + return (DatabaseEndpoint) writer; + } + + /** + * Factory interface for creating multi-database connections. + * + * @param Key type + * @param Value type + */ + @FunctionalInterface + interface MultiDbConnectionFactory { + + StatefulRedisMultiDbConnection create( + Map>> healthyDatabaseMap, RedisCodec codec, + HealthStatusManager healthStatusManager); + + } + + /** + * Implementation of DatabaseRawConnectionFactory that uses the MultiDbClient. + */ + private static class DatabaseRawConnectionFactoryImpl implements DatabaseRawConnectionFactory { + + private final io.lettuce.core.ClientOptions clientOptions; + + private final MultiDbClientImpl client; + + public DatabaseRawConnectionFactoryImpl(io.lettuce.core.ClientOptions clientOptions, MultiDbClientImpl client) { + this.clientOptions = clientOptions; + this.client = client; + } + + @Override + public StatefulRedisConnection connectToDatabase(RedisURI endpoint) { + client.setOptions(clientOptions); + try { + return client.connect(endpoint); + } finally { + client.resetOptions(); + } + } + + } + +} diff --git a/src/main/java/io/lettuce/core/failover/MultiDbClient.java b/src/main/java/io/lettuce/core/failover/MultiDbClient.java index 6ab430cc8..05404258d 100644 --- a/src/main/java/io/lettuce/core/failover/MultiDbClient.java +++ b/src/main/java/io/lettuce/core/failover/MultiDbClient.java @@ -42,7 +42,8 @@ public static MultiDbClient create(ClientResources resources, Collection getRedisURIs(); /** - * Open a new connection to a Redis server. Use the supplied {@link RedisCodec codec} to encode/decode keys and values. + * Open a new multi database connection to a Redis server. Use the supplied {@link RedisCodec codec} to encode/decode keys + * and values. * * @param codec Use this codec to encode/decode keys and values, must not be {@code null} * @param Key type @@ -52,15 +53,15 @@ public static MultiDbClient create(ClientResources resources, Collection StatefulRedisMultiDbConnection connect(RedisCodec codec); /** - * Open a new connection to a Redis server that treats keys and values as UTF-8 strings. + * Open a new multi database connection to a Redis server that treats keys and values as UTF-8 strings. * * @return A new stateful Redis connection */ StatefulRedisMultiDbConnection connect(); /** - * Open a new pub/sub connection to a Redis server. Use the supplied {@link RedisCodec codec} to encode/decode keys and - * values. + * Open a new multi database pub/sub connection to a Redis server. Use the supplied {@link RedisCodec codec} to + * encode/decode keys and values. * * @param codec Use this codec to encode/decode keys and values, must not be {@code null} * @param Key type @@ -70,10 +71,39 @@ public static MultiDbClient create(ClientResources resources, Collection StatefulRedisMultiDbPubSubConnection connectPubSub(RedisCodec codec); /** - * Open a new pub/sub connection to a Redis server that treats keys and values as UTF-8 strings. + * Open a new multi database pub/sub connection to a Redis server that treats keys and values as UTF-8 strings. * * @return A new stateful pub/sub connection */ StatefulRedisMultiDbPubSubConnection connectPubSub(); + /** + * Open asynchronously a new multi database connection to a Redis server. "Use the default {@link RedisCodec codec} + * StringCodec.UTF8 to encode/decode keys and values. + *

+ * The returned {@link MultiDbConnectionFuture} ensures that all callbacks (thenApply, thenAccept, etc.) execute on a + * separate thread pool rather than on Netty event loop threads, preventing deadlocks when calling blocking sync operations + * inside callbacks. + * + * @return a {@link MultiDbConnectionFuture} that is notified with the connection progress. + * @since 7.4 + */ + public MultiDbConnectionFuture connectAsync(); + + /** + * Open asynchronously a new multi database connection to a Redis server. Use the supplied {@link RedisCodec codec} to + * encode/decode keys and values. + *

+ * The returned {@link MultiDbConnectionFuture} ensures that all callbacks (thenApply, thenAccept, etc.) execute on a + * separate thread pool rather than on Netty event loop threads, preventing deadlocks when calling blocking sync operations + * inside callbacks. + * + * @param codec Use this codec to encode/decode keys and values, must not be {@code null} + * @param Key type + * @param Value type + * @return a {@link MultiDbConnectionFuture} that is notified with the connection progress. + * @since 7.4 + */ + public MultiDbConnectionFuture connectAsync(RedisCodec codec); + } diff --git a/src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java b/src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java index 4d5c14dad..89a1cfa8b 100644 --- a/src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java +++ b/src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java @@ -4,6 +4,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -105,6 +106,17 @@ void resetOptions() { this.localClientOptions.remove(); } + /** + * Open a new connection to a Redis server. Use the supplied {@link RedisCodec codec} to encode/decode keys and values. This + * method is synchronous and will block until all database connections are established. It also waits for the initial health + * checks to complete starting from most weighted database, ensuring that at least one database is healthy before returning + * to use in the order of their weights. + * + * @param codec Use this codec to encode/decode keys and values, must not be {@code null} + * @param Key type + * @param Value type + * @return A new stateful Redis connection + */ public StatefulRedisMultiDbConnection connect(RedisCodec codec) { if (codec == null) { @@ -127,13 +139,12 @@ public StatefulRedisMultiDbConnection connect(RedisCodec code databases.put(uri, database); } - StatusTracker statusTracker = new StatusTracker(healthStatusManager); + StatusTracker statusTracker = new StatusTracker(healthStatusManager, getResources()); // Wait for health checks to complete if configured waitForInitialHealthyDatabase(statusTracker, databases); // Provide a connection factory for dynamic database addition - return new StatefulRedisMultiDbConnectionImpl, K, V>(databases, getResources(), codec, - this::createRedisDatabase, healthStatusManager); + return createMultiDbConnection(databases, codec, healthStatusManager); } protected HealthStatusManager createHealthStatusManager() { @@ -169,6 +180,82 @@ private RedisDatabaseImpl> createRedisDatab } } + // ASYNC CONNECT + /** + * Asynchronously open a new connection to a Redis server. Use the supplied {@link RedisCodec codec} to encode/decode keys + * and values. This method is asynchronous and returns a {@link MultiDbConnectionFuture} that completes when all database + * connections are established and initial health checks (if configured) have completed. + *

+ * The returned {@link MultiDbConnectionFuture} ensures that all callbacks (thenApply, thenAccept, etc.) execute on a + * separate thread pool rather than on Netty event loop threads, preventing deadlocks when calling blocking sync operations + * inside callbacks. + * + * @param codec Use this codec to encode/decode keys and values, must not be {@code null} + * @param Key type + * @param Value type + * @return A new stateful Redis connection future + */ + @Override + public MultiDbConnectionFuture connectAsync() { + HealthStatusManager healthStatusManager = createHealthStatusManager(); + MultiDbAsyncConnectionBuilder builder = new MultiDbAsyncConnectionBuilder<>(healthStatusManager, + getResources(), this); + + CompletableFuture> future = builder.connectAsync(databaseConfigs, + newStringStringCodec(), this::createMultiDbConnection); + + return MultiDbConnectionFuture.from(future, getResources().eventExecutorGroup()); + } + + /** + * Asynchronously open a new connection to a Redis server. Use the supplied {@link RedisCodec codec} to encode/decode keys + * and values. This method is asynchronous and returns a {@link MultiDbConnectionFuture} that completes when all database + * connections are established and initial health checks (if configured) have completed. + *

+ * The returned {@link MultiDbConnectionFuture} ensures that all callbacks (thenApply, thenAccept, etc.) execute on a + * separate thread pool rather than on Netty event loop threads, preventing deadlocks when calling blocking sync operations + * inside callbacks. + * + * @param codec Use this codec to encode/decode keys and values, must not be {@code null} + * @param Key type + * @param Value type + * @return A new stateful Redis connection future + */ + @Override + public MultiDbConnectionFuture connectAsync(RedisCodec codec) { + if (codec == null) { + throw new IllegalArgumentException("codec must not be null"); + } + + HealthStatusManager healthStatusManager = createHealthStatusManager(); + MultiDbAsyncConnectionBuilder builder = new MultiDbAsyncConnectionBuilder<>(healthStatusManager, getResources(), + this); + + CompletableFuture> future = builder.connectAsync(databaseConfigs, codec, + this::createMultiDbConnection); + + return MultiDbConnectionFuture.from(future, getResources().eventExecutorGroup()); + } + + /** + * Creates a new {@link StatefulRedisMultiDbConnection} instance with the provided healthy database map. + * + * @param healthyDatabaseMap the map of healthy databases + * @param codec the Redis codec + * @param healthStatusManager the health status manager + * @param Key type + * @param Value type + * @return a new multi-database connection + */ + protected StatefulRedisMultiDbConnection createMultiDbConnection( + Map>> healthyDatabaseMap, RedisCodec codec, + HealthStatusManager healthStatusManager) { + + return new StatefulRedisMultiDbConnectionImpl, K, V>(healthyDatabaseMap, getResources(), + codec, this::createRedisDatabase, healthStatusManager); + } + // END OF ASYNC CONNECT + /** * Open a new connection to a Redis server that treats keys and values as UTF-8 strings. * @@ -197,7 +284,7 @@ public StatefulRedisMultiDbPubSubConnection connectPubSub(RedisCode databases.put(uri, database); } - StatusTracker statusTracker = new StatusTracker(healthStatusManager); + StatusTracker statusTracker = new StatusTracker(healthStatusManager, getResources()); // Wait for health checks to complete if configured waitForInitialHealthyDatabase(statusTracker, databases); diff --git a/src/main/java/io/lettuce/core/failover/MultiDbConnectionFuture.java b/src/main/java/io/lettuce/core/failover/MultiDbConnectionFuture.java new file mode 100644 index 000000000..dc4788d4a --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/MultiDbConnectionFuture.java @@ -0,0 +1,85 @@ +package io.lettuce.core.failover; + +import java.util.concurrent.*; + +import io.lettuce.core.BaseConnectionFuture; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; + +/** + * A {@code MultiDbConnectionFuture} represents the result of an asynchronous multi-database connection initialization. + *

+ * This future wrapper ensures that all callbacks (thenApply, thenAccept, etc.) execute on a separate thread pool rather than on + * Netty event loop threads. This prevents deadlocks when users call blocking sync operations inside callbacks. + *

+ * Example of the problem this solves: + * + *

+ * {@code
+ * // DANGEROUS with plain CompletableFuture - can deadlock!
+ * future.thenApply(conn -> conn.sync().ping());
+ *
+ * // SAFE with MultiDbConnectionFuture - always runs on separate thread
+ * future.thenApply(conn -> conn.sync().ping());
+ * }
+ * 
+ * + * @param Key type + * @param Value type + * @author Ali Takavci + * @since 7.4 + */ +public class MultiDbConnectionFuture extends BaseConnectionFuture> { + + /** + * Create a new {@link MultiDbConnectionFuture} wrapping the given delegate future. + * + * @param delegate the underlying CompletableFuture + */ + public MultiDbConnectionFuture(CompletableFuture> delegate) { + super(delegate); + } + + /** + * Create a new {@link MultiDbConnectionFuture} wrapping the given delegate future with a custom executor. + * + * @param delegate the underlying CompletableFuture + * @param defaultExecutor the executor to use for async callbacks + */ + public MultiDbConnectionFuture(CompletableFuture> delegate, Executor defaultExecutor) { + super(delegate, defaultExecutor); + } + + /** + * Create a {@link MultiDbConnectionFuture} from a {@link CompletableFuture}. + * + * @param future the CompletableFuture to wrap + * @param Key type + * @param Value type + * @return the wrapped future + */ + public static MultiDbConnectionFuture from(CompletableFuture> future) { + return new MultiDbConnectionFuture<>(future); + } + + /** + * Create a {@link MultiDbConnectionFuture} from a {@link CompletableFuture} with a custom executor. + * + * @param future the CompletableFuture to wrap + * @param executor the executor to use for async callbacks + * @param Key type + * @param Value type + * @return the wrapped future + */ + public static MultiDbConnectionFuture from(CompletableFuture> future, + Executor executor) { + return new MultiDbConnectionFuture<>(future, executor); + } + + @Override + protected CompletionStage wrap(CompletableFuture future) { + // We can't preserve K,V type parameters when wrapping arbitrary U types, + // so we just return the CompletableFuture directly + return future; + } + +} diff --git a/src/main/java/io/lettuce/core/failover/RedisDatabaseImpl.java b/src/main/java/io/lettuce/core/failover/RedisDatabaseImpl.java index e24827b77..64cdc565f 100644 --- a/src/main/java/io/lettuce/core/failover/RedisDatabaseImpl.java +++ b/src/main/java/io/lettuce/core/failover/RedisDatabaseImpl.java @@ -61,7 +61,7 @@ public float getWeight() { return weight; } - public C getConnection() { + C getConnection() { return connection; } diff --git a/src/main/java/io/lettuce/core/failover/StatusTracker.java b/src/main/java/io/lettuce/core/failover/StatusTracker.java index 933932723..bd09e0000 100644 --- a/src/main/java/io/lettuce/core/failover/StatusTracker.java +++ b/src/main/java/io/lettuce/core/failover/StatusTracker.java @@ -6,9 +6,13 @@ import io.lettuce.core.failover.health.HealthStatusChangeEvent; import io.lettuce.core.failover.health.HealthStatusListener; import io.lettuce.core.failover.health.HealthStatusManager; +import io.lettuce.core.resource.ClientResources; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -21,14 +25,17 @@ class StatusTracker { private final HealthStatusManager healthStatusManager; - public StatusTracker(HealthStatusManager healthStatusManager) { + private final ScheduledExecutorService scheduler; + + public StatusTracker(HealthStatusManager healthStatusManager, ClientResources clientResources) { this.healthStatusManager = healthStatusManager; + this.scheduler = clientResources.eventExecutorGroup(); } /** * Waits for a specific endpoint's health status to be determined (not UNKNOWN). Uses event-driven approach with * CountDownLatch to avoid polling. - * + * * @param endpoint the endpoint to wait for * @return the determined health status (HEALTHY or UNHEALTHY) * @throws RedisConnectionException if interrupted while waiting or if a timeout occurs @@ -84,4 +91,80 @@ public void onStatusChange(HealthStatusChangeEvent event) { } } + /** + * Asynchronously waits for a specific endpoint's health status to be determined (not UNKNOWN). Uses event-driven approach + * with CompletableFuture to avoid blocking. + * + * @param endpoint the endpoint to wait for + * @return CompletableFuture that completes with the determined health status (HEALTHY or UNHEALTHY) + */ + public CompletableFuture waitForHealthStatusAsync(RedisURI endpoint) { + // First check if status is already determined + HealthStatus currentStatus = healthStatusManager.getHealthStatus(endpoint); + if (currentStatus != HealthStatus.UNKNOWN) { + return CompletableFuture.completedFuture(currentStatus); + } + + // Create a CompletableFuture to return + CompletableFuture future = new CompletableFuture<>(); + AtomicBoolean listenerRemoved = new AtomicBoolean(false); + + // Create a temporary listener for this specific endpoint + HealthStatusListener tempListener = new HealthStatusListener() { + + @Override + public void onStatusChange(HealthStatusChangeEvent event) { + if (event.getEndpoint().equals(endpoint) && event.getNewStatus() != HealthStatus.UNKNOWN) { + // Complete the future with the new status + if (future.complete(event.getNewStatus())) { + // Successfully completed, clean up listener + if (listenerRemoved.compareAndSet(false, true)) { + healthStatusManager.unregisterListener(endpoint, this); + } + } + } + } + + }; + + // Register the temporary listener + healthStatusManager.registerListener(endpoint, tempListener); + + // Double-check status after registering listener (race condition protection) + currentStatus = healthStatusManager.getHealthStatus(endpoint); + if (currentStatus != HealthStatus.UNKNOWN) { + // Status already determined, complete immediately + if (listenerRemoved.compareAndSet(false, true)) { + healthStatusManager.unregisterListener(endpoint, tempListener); + } + future.complete(currentStatus); + return future; + } + + // Set up timeout manually + long timeoutMs = healthStatusManager.getMaxWaitFor(endpoint); + + scheduler.schedule(() -> { + // Try to complete exceptionally with timeout + if (future.completeExceptionally( + new RedisConnectionException("Timeout while waiting for health check result for " + endpoint))) { + // Successfully completed with timeout, clean up listener + if (listenerRemoved.compareAndSet(false, true)) { + healthStatusManager.unregisterListener(endpoint, tempListener); + } + } + scheduler.shutdown(); + }, timeoutMs, TimeUnit.MILLISECONDS); + + // Clean up scheduler when future completes (either successfully or exceptionally) + future.whenComplete((status, throwable) -> { + // Ensure listener is removed + if (listenerRemoved.compareAndSet(false, true)) { + healthStatusManager.unregisterListener(endpoint, tempListener); + } + }); + + return future; + } + } diff --git a/src/test/java/io/lettuce/core/failover/CircuitBreakerMetricsIntegrationTests.java b/src/test/java/io/lettuce/core/failover/CircuitBreakerMetricsIntegrationTests.java index 3c5416c47..1ae044c1b 100644 --- a/src/test/java/io/lettuce/core/failover/CircuitBreakerMetricsIntegrationTests.java +++ b/src/test/java/io/lettuce/core/failover/CircuitBreakerMetricsIntegrationTests.java @@ -1,9 +1,9 @@ package io.lettuce.core.failover; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static io.lettuce.TestTags.INTEGRATION_TEST; import java.util.List; import java.util.stream.Collectors; @@ -19,8 +19,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import com.google.gson.internal.reflect.ReflectionHelper; - import io.lettuce.core.RedisURI; import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; import io.lettuce.test.LettuceExtension; @@ -33,7 +31,7 @@ * @since 7.1 */ @ExtendWith(LettuceExtension.class) -@Tag("integration") +@Tag(INTEGRATION_TEST) class CircuitBreakerMetricsIntegrationTests extends MultiDbTestSupport { @Inject diff --git a/src/test/java/io/lettuce/core/failover/DatabaseCommandTrackerUnitTests.java b/src/test/java/io/lettuce/core/failover/DatabaseCommandTrackerUnitTests.java index 28118bdc7..5e1279063 100644 --- a/src/test/java/io/lettuce/core/failover/DatabaseCommandTrackerUnitTests.java +++ b/src/test/java/io/lettuce/core/failover/DatabaseCommandTrackerUnitTests.java @@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; +import static io.lettuce.TestTags.UNIT_TEST; import java.util.ArrayList; import java.util.Collection; @@ -55,7 +56,7 @@ * @author Ali Takavci * @since 7.4 */ -@Tag("unit") +@Tag(UNIT_TEST) class DatabaseCommandTrackerUnitTests { private DatabaseCommandTracker.CommandWriter mockWriter; @@ -84,7 +85,7 @@ private CircuitBreakerConfig getCBConfig(float failureRateThreshold, int minimum @Nested @DisplayName("Write Delegation Tests") - @Tag("unit") + @Tag(UNIT_TEST) class WriteDelegationTests { @Test @@ -161,7 +162,7 @@ void shouldDelegateBatchCommandWriteWhenCircuitBreakerIsClosed() { @Nested @DisplayName("Circuit Breaker Open State Tests") - @Tag("unit") + @Tag(UNIT_TEST) class CircuitBreakerOpenStateTests { @Test @@ -229,7 +230,7 @@ void shouldCompleteAllCommandsExceptionallyWhenCircuitBreakerIsOpenForBatchWrite @Nested @DisplayName("Timeout Exception Tracking Tests") - @Tag("unit") + @Tag(UNIT_TEST) class TimeoutExceptionTrackingTests { @Test @@ -304,7 +305,7 @@ void shouldNotRecordNonTimeoutExceptionsViaOnCompleteCallback() { @Nested @DisplayName("Channel Registration Tests") - @Tag("unit") + @Tag(UNIT_TEST) class ChannelRegistrationTests { @Test @@ -359,7 +360,7 @@ void shouldRemoveHandlerFromPipelineWhenChannelIsReset() { @Nested @DisplayName("Exception Handling Tests") - @Tag("unit") + @Tag(UNIT_TEST) class ExceptionHandlingTests { @Test @@ -409,7 +410,7 @@ void shouldRecordExceptionThrownDuringBatchWrite() { @Nested @DisplayName("Success Tracking Tests") - @Tag("unit") + @Tag(UNIT_TEST) class SuccessTrackingTests { @Test @@ -440,7 +441,7 @@ void shouldNotRecordSuccessViaOnCompleteCallback() { @Nested @DisplayName("Callback Attachment Tests") - @Tag("unit") + @Tag(UNIT_TEST) class CallbackAttachmentTests { @Test diff --git a/src/test/java/io/lettuce/core/failover/DatabaseEndpointCallbackTests.java b/src/test/java/io/lettuce/core/failover/DatabaseEndpointCallbackTests.java index f4d858579..b0914647e 100644 --- a/src/test/java/io/lettuce/core/failover/DatabaseEndpointCallbackTests.java +++ b/src/test/java/io/lettuce/core/failover/DatabaseEndpointCallbackTests.java @@ -3,6 +3,8 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; import static org.awaitility.Awaitility.await; +import static io.lettuce.TestTags.UNIT_TEST; + import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.time.Duration; @@ -39,7 +41,7 @@ * * @author Ali Takavci */ -@Tag("unit") +@Tag(UNIT_TEST) class DatabaseEndpointCallbackTests { private ClientResources clientResources; @@ -73,7 +75,7 @@ private CircuitBreakerConfig getCBConfig(float failureRateThreshold, int minimum @Nested @DisplayName("Timeout Exception Tracking Tests") - @Tag("unit") + @Tag(UNIT_TEST) class TimeoutExceptionTrackingTests { @Test @@ -160,7 +162,7 @@ void shouldNotTrackSuccessViaCallback() { @Nested @DisplayName("Failover Behavior and Generation Tracking Tests") - @Tag("unit") + @Tag(UNIT_TEST) class FailoverBehaviorTests { @Test @@ -251,7 +253,7 @@ void shouldHandleMultipleTimeoutExceptionsCompletingAtDifferentTimes() { @Nested @DisplayName("Failover Behavior and Generation Tracking Tests") - @Tag("unit") + @Tag(UNIT_TEST) class FailoverBehaviorAndGenerationTrackingTests { @Test @@ -427,7 +429,7 @@ void shouldHandleConcurrentTimeoutExceptionsDuringFailover() throws Exception { @Nested @DisplayName("Concurrent Timeout Exception Tests") - @Tag("unit") + @Tag(UNIT_TEST) class ConcurrentTimeoutExceptionTests { @Test @@ -522,7 +524,7 @@ void shouldHandleRaceBetweenCompletionAndStateChange() throws Exception { @Nested @DisplayName("Edge Cases and Special Scenarios") - @Tag("unit") + @Tag(UNIT_TEST) class EdgeCaseTests { @Test diff --git a/src/test/java/io/lettuce/core/failover/DatabasePubSubEndpointTrackerTests.java b/src/test/java/io/lettuce/core/failover/DatabasePubSubEndpointTrackerTests.java index 079fc6c95..aeb365c97 100644 --- a/src/test/java/io/lettuce/core/failover/DatabasePubSubEndpointTrackerTests.java +++ b/src/test/java/io/lettuce/core/failover/DatabasePubSubEndpointTrackerTests.java @@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; +import static io.lettuce.TestTags.UNIT_TEST; import java.util.ArrayList; import java.util.Collection; @@ -56,7 +57,7 @@ * @author Ali Takavci * @since 7.4 */ -@Tag("unit") +@Tag(UNIT_TEST) class DatabasePubSubEndpointTrackerTests { private ClientResources clientResources; @@ -90,7 +91,7 @@ private CircuitBreakerConfig getCBConfig(float failureRateThreshold, int minimum @Nested @DisplayName("DatabaseCommandTracker Integration Tests") - @Tag("unit") + @Tag(UNIT_TEST) class DatabaseCommandTrackerIntegrationTests { @Test @@ -159,7 +160,7 @@ public void onComplete(java.util.function.BiConsumer @Nested @DisplayName("Circuit Breaker Integration Tests") - @Tag("unit") + @Tag(UNIT_TEST) class CircuitBreakerIntegrationTests { @Test @@ -257,7 +258,7 @@ void shouldCompleteAllPubSubCommandsInBatchExceptionallyWhenCircuitBreakerIsOpen @Nested @DisplayName("Channel Lifecycle Tests") - @Tag("unit") + @Tag(UNIT_TEST) class ChannelLifecycleTests { @Test diff --git a/src/test/java/io/lettuce/core/failover/HealthCheckIntegrationTests.java b/src/test/java/io/lettuce/core/failover/HealthCheckIntegrationTests.java index 839116e46..43ca78d95 100644 --- a/src/test/java/io/lettuce/core/failover/HealthCheckIntegrationTests.java +++ b/src/test/java/io/lettuce/core/failover/HealthCheckIntegrationTests.java @@ -33,6 +33,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.with; +import static io.lettuce.TestTags.INTEGRATION_TEST;; /** * Integration tests for health check functionality in MultiDbClient. @@ -41,7 +42,7 @@ * @since 7.1 */ @ExtendWith(LettuceExtension.class) -@Tag("integration") +@Tag(INTEGRATION_TEST) @DisplayName("HealthCheck Integration Tests") public class HealthCheckIntegrationTests extends MultiDbTestSupport { @@ -85,7 +86,7 @@ void extractRunIds() { @Nested @DisplayName("Health Check Configuration") - @Tag("integration") + @Tag(INTEGRATION_TEST) class HealthCheckConfigurationIntegrationTests { @Test @@ -305,7 +306,7 @@ void shouldConfigureHealthCheckIntervalAndTimeout() { @Nested @DisplayName("Health Check Lifecycle") - @Tag("integration") + @Tag(INTEGRATION_TEST) class HealthCheckLifecycleIntegrationTests { @Test @@ -450,7 +451,7 @@ void shouldTransitionFromUnknownToHealthy() { @Nested @DisplayName("Failover Integration") - @Tag("integration") + @Tag(INTEGRATION_TEST) class FailoverIntegrationTests { @Test diff --git a/src/test/java/io/lettuce/core/failover/MultiDbAsyncConnectionBuilderUnitTests.java b/src/test/java/io/lettuce/core/failover/MultiDbAsyncConnectionBuilderUnitTests.java new file mode 100644 index 000000000..3921962ef --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/MultiDbAsyncConnectionBuilderUnitTests.java @@ -0,0 +1,284 @@ +package io.lettuce.core.failover; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static io.lettuce.TestTags.UNIT_TEST; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.failover.health.HealthCheck; +import io.lettuce.core.failover.health.HealthStatus; +import io.lettuce.core.failover.health.HealthStatusManager; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.resource.DefaultClientResources; + +/** + * Unit tests for {@link MultiDbAsyncConnectionBuilder}. + * + * @author Ali Takavci + * @since 7.4 + */ +@Tag(UNIT_TEST) +class MultiDbAsyncConnectionBuilderUnitTests { + + @Mock + private HealthStatusManager healthStatusManager; + + @Mock + private MultiDbClientImpl client; + + @Mock + private StatefulRedisConnection mockConnection1; + + @Mock + private StatefulRedisConnection mockConnection2; + + @Mock + private HealthCheck healthCheck; + + private ClientResources resources; + + private MultiDbAsyncConnectionBuilder builder; + + private RedisURI uri1; + + private RedisURI uri2; + + private DatabaseConfig config1; + + private DatabaseConfig config2; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + resources = DefaultClientResources.create(); + + uri1 = RedisURI.create("redis://localhost:6379"); + uri2 = RedisURI.create("redis://localhost:6380"); + + config1 = DatabaseConfig.builder(uri1).weight(1.0f).clientOptions(ClientOptions.create()).build(); + + config2 = DatabaseConfig.builder(uri2).weight(0.5f).clientOptions(ClientOptions.create()).build(); + + builder = new MultiDbAsyncConnectionBuilder<>(healthStatusManager, resources, client); + } + + @Test + void constructorShouldInitializeFields() { + assertThat(builder).isNotNull(); + } + + @Test + void collectDatabasesWithEstablishedConnectionsShouldReturnSuccessfulConnections() { + // Given + Map>>> databaseFutures = new ConcurrentHashMap<>(); + + RedisDatabaseImpl> db1 = createMockDatabase(config1, mockConnection1); + RedisDatabaseImpl> db2 = createMockDatabase(config2, mockConnection2); + + databaseFutures.put(uri1, CompletableFuture.completedFuture(db1)); + databaseFutures.put(uri2, CompletableFuture.completedFuture(db2)); + + // When + Map>> result = builder + .collectDatabasesWithEstablishedConnections(databaseFutures); + + // Then + assertThat(result).hasSize(2); + assertThat(result).containsKeys(uri1, uri2); + assertThat(result.get(uri1)).isEqualTo(db1); + assertThat(result.get(uri2)).isEqualTo(db2); + } + + @Test + void collectDatabasesWithEstablishedConnectionsShouldHandlePartialFailure() { + // Given + Map>>> databaseFutures = new ConcurrentHashMap<>(); + + RedisDatabaseImpl> db1 = createMockDatabase(config1, mockConnection1); + + CompletableFuture>> failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(new RedisConnectionException("Connection failed")); + + databaseFutures.put(uri1, CompletableFuture.completedFuture(db1)); + databaseFutures.put(uri2, failedFuture); + + // When + Map>> result = builder + .collectDatabasesWithEstablishedConnections(databaseFutures); + + // Then + assertThat(result).hasSize(1); + assertThat(result).containsKey(uri1); + assertThat(result.get(uri1)).isEqualTo(db1); + } + + @Test + void collectDatabasesWithEstablishedConnectionsShouldThrowWhenAllFail() { + // Given + Map>>> databaseFutures = new ConcurrentHashMap<>(); + + CompletableFuture>> failedFuture1 = new CompletableFuture<>(); + failedFuture1.completeExceptionally(new RedisConnectionException("Connection 1 failed")); + + CompletableFuture>> failedFuture2 = new CompletableFuture<>(); + failedFuture2.completeExceptionally(new RedisConnectionException("Connection 2 failed")); + + databaseFutures.put(uri1, failedFuture1); + databaseFutures.put(uri2, failedFuture2); + + // When/Then + assertThatThrownBy(() -> builder.collectDatabasesWithEstablishedConnections(databaseFutures)) + .isInstanceOf(RedisConnectionException.class).hasMessageContaining("Failed to connect to any database") + .hasMessageContaining("2 connection(s) failed"); + } + + @Test + void collectHealthStatusesShouldWaitForAllHealthChecks() throws Exception { + // Given + RedisDatabaseImpl> db1 = createMockDatabaseWithHealthCheck(config1, + mockConnection1); + RedisDatabaseImpl> db2 = createMockDatabaseWithHealthCheck(config2, + mockConnection2); + + Map>> databases = new HashMap<>(); + databases.put(uri1, db1); + databases.put(uri2, db2); + + // Mock health status manager to return health statuses + when(healthStatusManager.getHealthStatus(uri1)).thenReturn(HealthStatus.HEALTHY); + when(healthStatusManager.getHealthStatus(uri2)).thenReturn(HealthStatus.HEALTHY); + + // When + CompletableFuture> future = builder.collectHealthStatuses(databases); + + // Then + assertThat(future).isNotNull(); + // Note: This test requires actual async execution which is hard to test in unit tests + // Integration tests should cover the full async flow + } + + @Test + void collectHealthStatusesShouldHandleDatabasesWithoutHealthChecks() throws Exception { + // Given + RedisDatabaseImpl> db1 = createMockDatabase(config1, mockConnection1); + RedisDatabaseImpl> db2 = createMockDatabase(config2, mockConnection2); + + Map>> databases = new HashMap<>(); + databases.put(uri1, db1); + databases.put(uri2, db2); + + // When + CompletableFuture> future = builder.collectHealthStatuses(databases); + + // Then + assertThat(future).isNotNull(); + Map result = future.get(); + assertThat(result).hasSize(2); + assertThat(result.get(uri1)).isEqualTo(HealthStatus.HEALTHY); + assertThat(result.get(uri2)).isEqualTo(HealthStatus.HEALTHY); + } + + @Test + void collectHealthStatusesShouldReturnUnhealthyStatuses() throws Exception { + // Given + RedisDatabaseImpl> db1 = createMockDatabase(config1, mockConnection1); + RedisDatabaseImpl> db2 = createMockDatabase(config2, mockConnection2); + + Map>> databases = new HashMap<>(); + databases.put(uri1, db1); + databases.put(uri2, db2); + + // When + CompletableFuture> future = builder.collectHealthStatuses(databases); + + // Then + assertThat(future).isNotNull(); + Map result = future.get(); + assertThat(result).hasSize(2); + // Without health checks, databases are assumed HEALTHY + assertThat(result.get(uri1)).isEqualTo(HealthStatus.HEALTHY); + assertThat(result.get(uri2)).isEqualTo(HealthStatus.HEALTHY); + } + + @Test + void handleDatabaseFuturesShouldCloseConnectionsOnHealthCheckFailure() { + // Given + Map>>> databaseFutures = new ConcurrentHashMap<>(); + + RedisDatabaseImpl> db1 = createMockDatabase(config1, mockConnection1); + RedisDatabaseImpl> db2 = createMockDatabase(config2, mockConnection2); + + databaseFutures.put(uri1, CompletableFuture.completedFuture(db1)); + databaseFutures.put(uri2, CompletableFuture.completedFuture(db2)); + + when(mockConnection1.isOpen()).thenReturn(true); + when(mockConnection2.isOpen()).thenReturn(true); + + // Mock health status manager to throw exception + when(healthStatusManager.getHealthStatus(any())).thenThrow(new RuntimeException("Health check failed")); + + // When + CompletableFuture>>> future = builder + .handleDatabaseFutures(databaseFutures); + + // Then - connections should be closed on error + // Note: Full verification requires integration test due to async nature + assertThat(future).isNotNull(); + } + + @Test + void handleDatabaseFuturesShouldHandleUnexpectedExceptions() { + // Given + Map>>> databaseFutures = new ConcurrentHashMap<>(); + + // Create a future that will throw when accessed + CompletableFuture>> badFuture = new CompletableFuture<>(); + badFuture.completeExceptionally(new RuntimeException("Unexpected error")); + + databaseFutures.put(uri1, badFuture); + + // When + CompletableFuture>>> future = builder + .handleDatabaseFutures(databaseFutures); + + // Then + assertThatThrownBy(() -> future.get()).isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(RedisConnectionException.class); + } + + // Helper method to create mock database + private RedisDatabaseImpl> createMockDatabase(DatabaseConfig config, + StatefulRedisConnection connection) { + DatabaseEndpoint endpoint = mock(DatabaseEndpoint.class); + CircuitBreakerImpl circuitBreaker = mock(CircuitBreakerImpl.class); + return new RedisDatabaseImpl<>(config, connection, endpoint, circuitBreaker, null); + } + + // Helper method to create mock database with health check + private RedisDatabaseImpl> createMockDatabaseWithHealthCheck(DatabaseConfig config, + StatefulRedisConnection connection) { + DatabaseEndpoint endpoint = mock(DatabaseEndpoint.class); + CircuitBreakerImpl circuitBreaker = mock(CircuitBreakerImpl.class); + return new RedisDatabaseImpl<>(config, connection, endpoint, circuitBreaker, healthCheck); + } + +} diff --git a/src/test/java/io/lettuce/core/failover/MultiDbClientConnectAsyncIntegrationTests.java b/src/test/java/io/lettuce/core/failover/MultiDbClientConnectAsyncIntegrationTests.java new file mode 100644 index 000000000..c2500405a --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/MultiDbClientConnectAsyncIntegrationTests.java @@ -0,0 +1,547 @@ +package io.lettuce.core.failover; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static io.lettuce.core.codec.StringCodec.UTF8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; + +import javax.inject.Inject; + +import org.awaitility.Durations; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.lettuce.core.RedisURI; +import io.lettuce.core.codec.ByteArrayCodec; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; +import io.lettuce.test.LettuceExtension; +import io.lettuce.test.TestFutures; + +/** + * Integration tests for {@link MultiDbClient#connectAsync(RedisCodec)} method. + * + * @author Ali Takavci + * @since 7.4 + */ +@ExtendWith(LettuceExtension.class) +@Tag(INTEGRATION_TEST) +class MultiDbClientConnectAsyncIntegrationTests extends MultiDbTestSupport { + + @SuppressWarnings("rawtypes") + private final Queue connections = new ConcurrentLinkedQueue<>(); + + @Inject + MultiDbClientConnectAsyncIntegrationTests(MultiDbClient client) { + super(client); + } + + @BeforeEach + void setUp() { + directClient1.connect().sync().flushall(); + directClient2.connect().sync().flushall(); + } + + @SuppressWarnings("rawtypes") + @AfterEach + void tearDown() { + while (!connections.isEmpty()) { + try { + ((StatefulRedisMultiDbConnection) connections.poll().get(2, TimeUnit.SECONDS)).closeAsync(); + } catch (Exception e) { + // Ignore + } + } + } + + @Test + void connectAsyncWithoutCodec() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(); + connections.add(future); + + assertThat((Object) future).isNotNull(); + assertThat((Object) future).isInstanceOf(MultiDbConnectionFuture.class); + + StatefulRedisMultiDbConnection connection = future.get(10, TimeUnit.SECONDS); + + assertThat(connection).isNotNull(); + assertThat(connection.isOpen()).isTrue(); + assertThat(connection.getTimeout()).isEqualTo(RedisURI.DEFAULT_TIMEOUT_DURATION); + } + + @Test + void connectAsyncWithByteArrayCodec() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(ByteArrayCodec.INSTANCE); + connections.add(future); + + assertThat((Object) future).isNotNull(); + + StatefulRedisMultiDbConnection byteConnection = future.get(10, TimeUnit.SECONDS); + + assertThat(byteConnection).isNotNull(); + assertThat(byteConnection.isOpen()).isTrue(); + } + + @Test + void connectAsyncShouldRejectNullCodec() { + assertThatThrownBy(() -> multiDbClient.connectAsync(null)).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("codec must not be null"); + } + + @Test + void connectAsyncShouldCompleteSuccessfully() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + connections.add(future); + + // Wait for completion + StatefulRedisMultiDbConnection connection = future.get(10, TimeUnit.SECONDS); + + assertThat(connection).isNotNull(); + assertThat(connection.isOpen()).isTrue(); + assertThat(connection.getCurrentEndpoint()).isNotNull(); + } + + @Test + void connectAsyncShouldAllowCommandExecution() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + connections.add(future); + + StatefulRedisMultiDbConnection connection = future.get(10, TimeUnit.SECONDS); + + // Execute a command + String result = TestFutures.getOrTimeout(connection.async().set("key1", "value1")); + assertThat(result).isEqualTo("OK"); + + String value = TestFutures.getOrTimeout(connection.async().get("key1")); + assertThat(value).isEqualTo("value1"); + } + + @Test + void connectAsyncShouldSupportDatabaseSwitching() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + connections.add(future); + + StatefulRedisMultiDbConnection connection = future.get(10, TimeUnit.SECONDS); + + // Set a key on the current database + TestFutures.awaitOrTimeout(connection.async().set("key1", "value1")); + + RedisURI currentEndpoint = connection.getCurrentEndpoint(); + RedisURI otherEndpoint = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .filter(uri -> !uri.equals(currentEndpoint)).findFirst().get(); + + // Switch to the other database + connection.switchTo(otherEndpoint); + + // Key should not exist on the other database + String value = TestFutures.getOrTimeout(connection.async().get("key1")); + assertThat(value).isNull(); + + // Set a different value on the other database + TestFutures.awaitOrTimeout(connection.async().set("key1", "value2")); + + // Switch back to the original database + connection.switchTo(currentEndpoint); + + // Original value should still be there + value = TestFutures.getOrTimeout(connection.async().get("key1")); + assertThat(value).isEqualTo("value1"); + } + + @Test + void connectAsyncShouldWaitForHealthChecks() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + connections.add(future); + + StatefulRedisMultiDbConnection connection = future.get(10, TimeUnit.SECONDS); + + // Verify that at least one database is healthy + await().atMost(Durations.TWO_SECONDS).pollInterval(Durations.ONE_HUNDRED_MILLISECONDS).untilAsserted(() -> { + boolean anyHealthy = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .anyMatch(uri -> connection.isHealthy(uri)); + assertThat(anyHealthy).isTrue(); + }); + } + + @Test + void connectAsyncShouldSupportMultipleConnections() throws Exception { + MultiDbConnectionFuture future1 = multiDbClient.connectAsync(UTF8); + MultiDbConnectionFuture future2 = multiDbClient.connectAsync(UTF8); + connections.add(future1); + connections.add(future2); + + StatefulRedisMultiDbConnection connection = future1.get(10, TimeUnit.SECONDS); + StatefulRedisMultiDbConnection connection2 = future2.get(10, TimeUnit.SECONDS); + + assertThat(connection).isNotNull(); + assertThat(connection2).isNotNull(); + assertThat(connection).isNotSameAs(connection2); + + assertThat(connection.isOpen()).isTrue(); + assertThat(connection2.isOpen()).isTrue(); + } + + @Test + void connectAsyncShouldHandleCompletionStageOperations() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + connections.add(future); + + CompletableFuture resultFuture = future.toCompletableFuture().thenApply(conn -> { + return "connected"; + }); + + String result = resultFuture.get(10, TimeUnit.SECONDS); + assertThat(result).isEqualTo("connected"); + StatefulRedisMultiDbConnection connection = future.get(); + assertThat(connection).isNotNull(); + assertThat(connection.isOpen()).isTrue(); + } + + @Test + void connectAsyncShouldSupportWhenComplete() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + connections.add(future); + + CompletableFuture completionTracker = new CompletableFuture<>(); + + future.whenComplete((conn, throwable) -> { + if (throwable == null) { + completionTracker.complete(true); + } else { + completionTracker.completeExceptionally(throwable); + } + }); + + Boolean completed = completionTracker.get(10, TimeUnit.SECONDS); + assertThat(completed).isTrue(); + StatefulRedisMultiDbConnection connection = future.get(); + assertThat(connection).isNotNull(); + assertThat(connection.isOpen()).isTrue(); + } + + @Test + void connectAsyncShouldAllowSyncOperations() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + connections.add(future); + + StatefulRedisMultiDbConnection connection = future.get(10, TimeUnit.SECONDS); + + // Use sync API + String result = connection.sync().set("syncKey", "syncValue"); + assertThat(result).isEqualTo("OK"); + + String value = connection.sync().get("syncKey"); + assertThat(value).isEqualTo("syncValue"); + } + + @Test + void connectAsyncShouldAllowReactiveOperations() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + connections.add(future); + + StatefulRedisMultiDbConnection connection = future.get(10, TimeUnit.SECONDS); + + // Use reactive API + String result = connection.reactive().set("reactiveKey", "reactiveValue").block(Duration.ofSeconds(5)); + assertThat(result).isEqualTo("OK"); + + String value = connection.reactive().get("reactiveKey").block(Duration.ofSeconds(5)); + assertThat(value).isEqualTo("reactiveValue"); + } + + @Test + void connectAsyncShouldProvideAllEndpoints() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + connections.add(future); + + StatefulRedisMultiDbConnection connection = future.get(10, TimeUnit.SECONDS); + + Iterable endpoints = connection.getEndpoints(); + assertThat(endpoints).isNotNull(); + + long count = StreamSupport.stream(endpoints.spliterator(), false).count(); + // We have 3 databases configured in MultiDbTestSupport (DB1, DB2, DB3) + // Only databases that successfully connect AND remain open are included + assertThat(count).isGreaterThanOrEqualTo(2).isLessThanOrEqualTo(3); + } + + @Test + void connectAsyncShouldProvideAccessToAllConnectedDatabases() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + + StatefulRedisMultiDbConnection connection = future.get(10, TimeUnit.SECONDS); + + // Verify we can access all connected database endpoints + Iterable endpoints = connection.getEndpoints(); + assertThat(endpoints).isNotNull(); + + long count = StreamSupport.stream(endpoints.spliterator(), false).count(); + assertThat(count).isGreaterThan(0); + + // Verify each endpoint is accessible + for (RedisURI endpoint : endpoints) { + assertThat(endpoint).isNotNull(); + assertThat(endpoint.getHost()).isNotEmpty(); + assertThat(endpoint.getPort()).isGreaterThan(0); + } + } + + /** + * Edge case: Test that connection handles rapid successive calls correctly. + */ + @Test + void connectAsyncShouldHandleRapidSuccessiveCalls() throws Exception { + // Create multiple connections rapidly + MultiDbConnectionFuture future1 = multiDbClient.connectAsync(UTF8); + MultiDbConnectionFuture future2 = multiDbClient.connectAsync(UTF8); + MultiDbConnectionFuture future3 = multiDbClient.connectAsync(UTF8); + connections.add(future1); + connections.add(future2); + connections.add(future3); + + // All should complete successfully + StatefulRedisMultiDbConnection conn1 = future1.get(10, TimeUnit.SECONDS); + StatefulRedisMultiDbConnection conn2 = future2.get(10, TimeUnit.SECONDS); + StatefulRedisMultiDbConnection conn3 = future3.get(10, TimeUnit.SECONDS); + + assertThat(conn1).isNotNull(); + assertThat(conn2).isNotNull(); + assertThat(conn3).isNotNull(); + + assertThat(conn1.isOpen()).isTrue(); + assertThat(conn2.isOpen()).isTrue(); + assertThat(conn3.isOpen()).isTrue(); + + } + + /** + * Edge case: Test that connection can be closed immediately after creation. + */ + @Test + void connectAsyncShouldAllowImmediateClose() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + + StatefulRedisMultiDbConnection conn = future.get(10, TimeUnit.SECONDS); + + // Close immediately + conn.close(); + + // Verify it's closed + assertThat(conn.isOpen()).isFalse(); + } + + /** + * Edge case: Test async close operation. + */ + @Test + void connectAsyncShouldSupportAsyncClose() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + + StatefulRedisMultiDbConnection conn = future.get(10, TimeUnit.SECONDS); + + // Close asynchronously + CompletableFuture closeFuture = conn.closeAsync(); + closeFuture.get(5, TimeUnit.SECONDS); + + // Verify it's closed + assertThat(conn.isOpen()).isFalse(); + } + + /** + * Edge case: Test connection with all databases having the same weight. + */ + @Test + void connectAsyncShouldHandleEqualWeights() throws Exception { + // Create a client with equal weights + DatabaseConfig db1 = DatabaseConfig.builder(MultiDbTestSupport.URI1).weight(1.0f).build(); + DatabaseConfig db2 = DatabaseConfig.builder(MultiDbTestSupport.URI2).weight(1.0f).build(); + + MultiDbClient equalWeightClient = MultiDbClient.create(java.util.Arrays.asList(db1, db2)); + + try { + MultiDbConnectionFuture future = equalWeightClient.connectAsync(UTF8); + + StatefulRedisMultiDbConnection conn = future.get(10, TimeUnit.SECONDS); + + assertThat(conn).isNotNull(); + assertThat(conn.isOpen()).isTrue(); + + // Verify we can execute commands + String result = conn.sync().ping(); + assertThat(result).isEqualTo("PONG"); + + } finally { + equalWeightClient.shutdown(); + } + } + + /** + * Test that connectAsync completes successfully even with partial database failures. + */ + @Test + void connectAsyncShouldSucceedWithPartialFailures() throws Exception { + // Create a client with one valid and one invalid endpoint + DatabaseConfig validDb = DatabaseConfig.builder(MultiDbTestSupport.URI1).weight(1.0f).build(); + DatabaseConfig invalidDb = DatabaseConfig.builder(RedisURI.create("redis://localhost:9999")).weight(0.5f).build(); + + MultiDbClient partialClient = MultiDbClient.create(java.util.Arrays.asList(validDb, invalidDb)); + + try { + MultiDbConnectionFuture future = partialClient.connectAsync(UTF8); + connections.add(future); + + StatefulRedisMultiDbConnection conn = future.get(15, TimeUnit.SECONDS); + + assertThat(conn).isNotNull(); + assertThat(conn.isOpen()).isTrue(); + + // Should be able to execute commands on the valid database + String result = conn.sync().ping(); + assertThat(result).isEqualTo("PONG"); + + } finally { + partialClient.shutdown(); + } + } + + /** + * Test that connectAsync fails when all databases are unreachable. + */ + @Test + void connectAsyncShouldFailWhenAllDatabasesUnreachable() { + // Create a client with only invalid endpoints + DatabaseConfig invalidDb1 = DatabaseConfig.builder(RedisURI.create("redis://localhost:9998")).weight(1.0f).build(); + DatabaseConfig invalidDb2 = DatabaseConfig.builder(RedisURI.create("redis://localhost:9999")).weight(0.5f).build(); + + MultiDbClient failClient = MultiDbClient.create(java.util.Arrays.asList(invalidDb1, invalidDb2)); + + try { + MultiDbConnectionFuture future = failClient.connectAsync(UTF8); + + assertThatThrownBy(() -> future.get(15, TimeUnit.SECONDS)).isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(io.lettuce.core.RedisConnectionException.class) + .hasMessageContaining("Failed to connect to any database"); + } finally { + failClient.shutdown(); + } + } + + /** + * Test that connectAsync with null codec throws IllegalArgumentException. + */ + @Test + void connectAsyncShouldThrowOnNullCodec() { + assertThatThrownBy(() -> multiDbClient.connectAsync(null)).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("codec must not be null"); + } + + /** + * Test that connectAsync can be called multiple times and creates independent connections. + */ + @Test + void connectAsyncShouldAllowMultipleConnections() throws Exception { + MultiDbConnectionFuture future1 = multiDbClient.connectAsync(UTF8); + MultiDbConnectionFuture future2 = multiDbClient.connectAsync(UTF8); + connections.add(future1); + connections.add(future2); + + StatefulRedisMultiDbConnection conn1 = future1.get(10, TimeUnit.SECONDS); + StatefulRedisMultiDbConnection conn2 = future2.get(10, TimeUnit.SECONDS); + + assertThat(conn1).isNotNull(); + assertThat(conn2).isNotNull(); + assertThat(conn1).isNotSameAs(conn2); + + assertThat(conn1.isOpen()).isTrue(); + assertThat(conn2.isOpen()).isTrue(); + + // Both should work independently + assertThat(conn1.sync().ping()).isEqualTo("PONG"); + assertThat(conn2.sync().ping()).isEqualTo("PONG"); + + } + + /** + * Test that connectAsync with ByteArrayCodec works correctly. + */ + @Test + void connectAsyncShouldWorkWithByteArrayCodec() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(ByteArrayCodec.INSTANCE); + connections.add(future); + + StatefulRedisMultiDbConnection conn = future.get(10, TimeUnit.SECONDS); + + assertThat(conn).isNotNull(); + assertThat(conn.isOpen()).isTrue(); + + // Test basic operations with byte arrays + byte[] key = "testkey".getBytes(); + byte[] value = "testvalue".getBytes(); + + conn.sync().set(key, value); + byte[] retrieved = conn.sync().get(key); + + assertThat(retrieved).isEqualTo(value); + + } + + /** + * Test that connectAsync future can be composed with other futures. + *

+ * This test verifies that MultiDbConnectionFuture protects users from deadlocks by ensuring callbacks always run on a + * separate thread, even when using thenApply() (which normally runs on the completing thread). + */ + @Test + void connectAsyncFutureShouldBeComposable() throws Exception { + MultiDbConnectionFuture future = multiDbClient.connectAsync(UTF8); + connections.add(future); + + // Safe to use thenApply() with blocking sync calls because MultiDbConnectionFuture + // automatically executes callbacks on a separate thread + CompletableFuture pingFuture = future.thenApply(conn -> { + return conn.sync().ping(); + }).toCompletableFuture(); + + String result = pingFuture.get(10, TimeUnit.SECONDS); + assertThat(result).isEqualTo("PONG"); + future.get().close(); + } + + /** + * Test that connectAsync future handles exceptions properly in composition. + */ + @Test + void connectAsyncFutureShouldHandleExceptionsInComposition() { + // Create a client with invalid endpoint + DatabaseConfig invalidDb = DatabaseConfig.builder(RedisURI.create("redis://localhost:9999")).weight(1.0f).build(); + MultiDbClient failClient = MultiDbClient.create(java.util.Arrays.asList(invalidDb)); + + try { + MultiDbConnectionFuture future = failClient.connectAsync(UTF8); + + CompletableFuture composedFuture = future.toCompletableFuture().thenApply(conn -> conn.sync().ping()) + .exceptionally(throwable -> "ERROR: " + throwable.getMessage()); + + String result = composedFuture.get(15, TimeUnit.SECONDS); + assertThat(result).startsWith("ERROR:"); + } catch (Exception e) { + // Expected + } finally { + failClient.shutdown(); + } + } + +} diff --git a/src/test/java/io/lettuce/core/failover/MultiDbClientThreadLocalOptionsIntegrationTests.java b/src/test/java/io/lettuce/core/failover/MultiDbClientThreadLocalOptionsIntegrationTests.java new file mode 100644 index 000000000..7feaee7c2 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/MultiDbClientThreadLocalOptionsIntegrationTests.java @@ -0,0 +1,349 @@ +package io.lettuce.core.failover; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static org.assertj.core.api.Assertions.*; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.awaitility.Durations; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisURI; +import io.lettuce.core.SocketOptions; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; +import io.lettuce.core.failover.health.HealthCheckStrategySupplier; +import io.lettuce.test.LettuceExtension; +import io.lettuce.test.ReflectionTestUtils; +import io.lettuce.test.settings.TestSettings; + +/** + * Tests to verify ThreadLocal ClientOptions behavior in MultiDbClientImpl, particularly in async contexts where ThreadLocal can + * cause race conditions. + * + * @author Ali Takavci + */ +@ExtendWith(LettuceExtension.class) +@Tag(INTEGRATION_TEST) +class MultiDbClientThreadLocalOptionsIntegrationTests { + + private static final RedisURI URI1 = RedisURI.Builder.redis(TestSettings.host(), TestSettings.port()).build(); + + private static final RedisURI URI2 = RedisURI.Builder.redis(TestSettings.host(), TestSettings.port() + 1).build(); + + private MultiDbClient client; + + private StatefulRedisMultiDbConnection connection; + + @BeforeEach + void setUp() { + // Will be created per test with specific options + } + + @AfterEach + void tearDown() { + if (connection != null && connection.isOpen()) { + connection.close(); + } + if (client != null) { + client.shutdown(); + } + } + + /** + * Test that different databases can have different ClientOptions in sync connect. This should work because sync connect + * uses ThreadLocal correctly (set -> use -> reset on same thread). + */ + @Test + void syncConnectShouldRespectPerDatabaseOptions() { + // Create two databases with DISTINCT options that we can verify + ClientOptions options1 = ClientOptions.builder().autoReconnect(true) + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(5)).build()).build(); + + ClientOptions options2 = ClientOptions.builder().autoReconnect(false) + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).build()).build(); + + DatabaseConfig db1 = DatabaseConfig.builder(URI1).weight(1.0f).clientOptions(options1).build(); + + DatabaseConfig db2 = DatabaseConfig.builder(URI2).weight(0.5f).clientOptions(options2).build(); + + client = MultiDbClient.create(Arrays.asList(db1, db2)); + + // This should work - sync connect uses ThreadLocal correctly + connection = client.connect(StringCodec.UTF8); + + assertThat(connection).isNotNull(); + assertThat(connection.isOpen()).isTrue(); + + // Verify that each database connection has the correct options + StatefulRedisConnection conn1 = ((RedisDatabaseImpl) connection.getDatabase(URI1)).getConnection(); + StatefulRedisConnection conn2 = ((RedisDatabaseImpl) connection.getDatabase(URI2)).getConnection(); + + // Verify database 1 has options1 + assertThat(conn1.getOptions().isAutoReconnect()).isTrue(); + assertThat(conn1.getOptions().getSocketOptions().getConnectTimeout()).isEqualTo(Duration.ofSeconds(5)); + + // Verify database 2 has options2 + assertThat(conn2.getOptions().isAutoReconnect()).isFalse(); + assertThat(conn2.getOptions().getSocketOptions().getConnectTimeout()).isEqualTo(Duration.ofSeconds(10)); + } + + /** + * Test that async connect with different ClientOptions per database properly applies options to each database connection. + */ + @Test + void asyncConnectWithDifferentOptionsPerDatabase() throws Exception { + // Create two databases with DISTINCT options that we can verify + ClientOptions options1 = ClientOptions.builder().autoReconnect(true) + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(5)).build()).build(); + + ClientOptions options2 = ClientOptions.builder().autoReconnect(false) + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).build()).build(); + + DatabaseConfig db1 = DatabaseConfig.builder(URI1).weight(1.0f).clientOptions(options1).build(); + + DatabaseConfig db2 = DatabaseConfig.builder(URI2).weight(0.5f).clientOptions(options2).build(); + + client = MultiDbClient.create(Arrays.asList(db1, db2)); + + // Connect asynchronously + MultiDbConnectionFuture future = client.connectAsync(StringCodec.UTF8); + + connection = future.get(10, TimeUnit.SECONDS); + + assertThat(connection).isNotNull(); + assertThat(connection.isOpen()).isTrue(); + + // Verify that each database connection has the correct options + StatefulRedisConnection conn1 = ((RedisDatabaseImpl) connection.getDatabase(URI1)).getConnection(); + StatefulRedisConnection conn2 = ((RedisDatabaseImpl) connection.getDatabase(URI2)).getConnection(); + + // Verify database 1 has options1 + assertThat(conn1.getOptions().isAutoReconnect()).isTrue(); + assertThat(conn1.getOptions().getSocketOptions().getConnectTimeout()).isEqualTo(Duration.ofSeconds(5)); + + // Verify database 2 has options2 + assertThat(conn2.getOptions().isAutoReconnect()).isFalse(); + assertThat(conn2.getOptions().getSocketOptions().getConnectTimeout()).isEqualTo(Duration.ofSeconds(10)); + } + + /** + * Test that verifies multiple parallel async connections each get the correct per-database options. This test ensures that + * ThreadLocal options don't leak between different connection attempts. + */ + @Test + void parallelAsyncConnectsShouldNotShareThreadLocalOptions() throws Exception { + // Create databases with distinct options + ClientOptions options1 = ClientOptions.builder().autoReconnect(true) + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(3)).build()).build(); + + ClientOptions options2 = ClientOptions.builder().autoReconnect(false) + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(7)).build()).build(); + + DatabaseConfig db1 = DatabaseConfig.builder(URI1).weight(1.0f).clientOptions(options1).build(); + + DatabaseConfig db2 = DatabaseConfig.builder(URI2).weight(0.5f).clientOptions(options2).build(); + + client = MultiDbClient.create(Arrays.asList(db1, db2)); + + // Create multiple connections in parallel to increase chance of race condition if it exists + int numConnections = 5; + List>> futures = new ArrayList<>(); + + for (int i = 0; i < numConnections; i++) { + MultiDbConnectionFuture future = client.connectAsync(StringCodec.UTF8); + futures.add(future.toCompletableFuture()); + } + + // Wait for all to complete + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS); + + // Verify all connections have correct per-database options + for (CompletableFuture> future : futures) { + StatefulRedisMultiDbConnection conn = future.get(); + assertThat(conn).isNotNull(); + assertThat(conn.isOpen()).isTrue(); + + // Verify that each database connection has the correct options + StatefulRedisConnection conn1 = ((RedisDatabaseImpl) conn.getDatabase(URI1)).getConnection(); + StatefulRedisConnection conn2 = ((RedisDatabaseImpl) conn.getDatabase(URI2)).getConnection(); + + // Verify database 1 has options1 (autoReconnect=true, connectTimeout=3s) + assertThat(conn1.getOptions().isAutoReconnect()).isTrue(); + assertThat(conn1.getOptions().getSocketOptions().getConnectTimeout()).isEqualTo(Duration.ofSeconds(3)); + + // Verify database 2 has options2 (autoReconnect=false, connectTimeout=7s) + assertThat(conn2.getOptions().isAutoReconnect()).isFalse(); + assertThat(conn2.getOptions().getSocketOptions().getConnectTimeout()).isEqualTo(Duration.ofSeconds(7)); + + conn.close(); + } + } + + private static class OperationInfo { + + @SuppressWarnings("unused") + final String operation; + + final Thread thread; + + final ClientOptions options; + + public OperationInfo(String operation, Thread thread, ClientOptions options) { + this.operation = operation; + this.thread = thread; + this.options = options; + } + + } + + private static class ThreadLocalWrapper extends ThreadLocal { + + ConcurrentLinkedQueue operations = new ConcurrentLinkedQueue<>(); + + @Override + public void remove() { + operations.add(new OperationInfo("remove", Thread.currentThread(), super.get())); + super.remove(); + } + + @Override + public void set(ClientOptions value) { + operations.add(new OperationInfo("set", Thread.currentThread(), value)); + super.set(value); + } + + } + + /** + * Test that verifies ThreadLocal is properly cleaned up after sync connect. This should pass because sync connect uses + * try-finally to clean up ThreadLocal. + */ + @Test + void syncConnectShouldCleanUpThreadLocal() { + // Create databases with distinct options + ClientOptions options1 = ClientOptions.builder().autoReconnect(true) + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(3)).build()).build(); + + ClientOptions options2 = ClientOptions.builder().autoReconnect(false) + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(7)).build()).build(); + + DatabaseConfig db1 = DatabaseConfig.builder(URI1).weight(1.0f).clientOptions(options1) + .healthCheckStrategySupplier(HealthCheckStrategySupplier.NO_HEALTH_CHECK).build(); + + DatabaseConfig db2 = DatabaseConfig.builder(URI2).weight(0.5f).clientOptions(options2) + .healthCheckStrategySupplier(HealthCheckStrategySupplier.NO_HEALTH_CHECK).build(); + + client = MultiDbClient.create(Arrays.asList(db1, db2)); + + // Wrap ThreadLocal to track set/remove operations + ThreadLocalWrapper wrapper = new ThreadLocalWrapper(); + + ReflectionTestUtils.setField(client, "localClientOptions", wrapper); + + // Connect + connection = client.connect(StringCodec.UTF8); + assertThat(connection).isNotNull(); + + StatefulRedisMultiDbConnection connection2 = client.connect(StringCodec.UTF8); + assertThat(connection2).isNotNull(); + connection2.close(); + + // there should be one set and one corresponding remove for each database in connectAsync + assertThat(wrapper.operations).hasSize(8); + assertThat(wrapper.operations).extracting("operation").containsExactly("set", "remove", "set", "remove", "set", + "remove", "set", "remove"); + // it contains 4 sets with non null options object and the exact same values should show up in 4 removes with exact same + // threads and options instances. + // so each pair received from queue should be matching with threads and options objects + for (int i = 0; i < 4; i++) { + OperationInfo set = wrapper.operations.poll(); + OperationInfo remove = wrapper.operations.poll(); + assertThat(set.options).isNotNull(); + assertThat(set.thread).isEqualTo(remove.thread); + assertThat(set.options).isSameAs(remove.options); + } + + } + + /** + * Test that verifies ThreadLocal set/reset pairs are completed synchronously on the calling thread when connectAsync() + * returns. This test verifies: 1) For multiple databases, each gets its own set/reset pair on the calling thread 2) All + * pairs are completed before connectAsync() returns 3) Async callbacks run on a different thread (event loop) and have no + * timing relationship with resetOptions() 4) Despite immediate reset, correct options are applied to each database + */ + @Test + void asyncConnectShouldCompleteThreadLocalOpsOnCallingThread() throws Exception { + // Create two databases with distinct options + ClientOptions options1 = ClientOptions.builder().autoReconnect(true) + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(8)).build()).build(); + + ClientOptions options2 = ClientOptions.builder().autoReconnect(false) + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(12)).build()).build(); + + DatabaseConfig db1 = DatabaseConfig.builder(URI1).weight(1.0f).clientOptions(options1) + .healthCheckStrategySupplier(HealthCheckStrategySupplier.NO_HEALTH_CHECK).build(); + + DatabaseConfig db2 = DatabaseConfig.builder(URI2).weight(0.5f).clientOptions(options2) + .healthCheckStrategySupplier(HealthCheckStrategySupplier.NO_HEALTH_CHECK).build(); + + client = MultiDbClient.create(Arrays.asList(db1, db2)); + + // Track ThreadLocal operations and async callback thread + ThreadLocalWrapper wrapper = new ThreadLocalWrapper(); + AtomicReference asyncCallbackThread = new AtomicReference<>(); + + ReflectionTestUtils.setField(client, "localClientOptions", wrapper); + + MultiDbConnectionFuture future = client.connectAsync(StringCodec.UTF8); + + // Capture the async callback thread + future.whenComplete((conn, throwable) -> { + asyncCallbackThread.set(Thread.currentThread()); + }); + + // Once connectAsync() returns, all set/reset pairs should be complete + assertThat(wrapper.operations).hasSizeGreaterThanOrEqualTo(4); // At least 2 sets and 2 removes + assertThat(wrapper.operations).extracting("operation").contains("set", "remove", "set", "remove"); + + // Verify all ThreadLocal operations happened on the calling thread + for (OperationInfo op : wrapper.operations) { + assertThat(op.thread).isEqualTo(Thread.currentThread()); + } + + connection = future.get(10, TimeUnit.SECONDS); + + assertThat(connection).isNotNull(); + assertThat(connection.isOpen()).isTrue(); + + // Verify the async callback runs on a different thread (event loop thread) + await().atMost(Durations.ONE_SECOND).pollInterval(Durations.ONE_HUNDRED_MILLISECONDS) + .untilAsserted(() -> assertThat(asyncCallbackThread.get()).isNotNull()); + assertThat(asyncCallbackThread.get()).isNotEqualTo(Thread.currentThread()); + + // Verify that the connection has the correct options despite resetOptions() being called immediately + StatefulRedisConnection conn1 = ((RedisDatabaseImpl) connection.getDatabase(URI1)).getConnection(); + StatefulRedisConnection conn2 = ((RedisDatabaseImpl) connection.getDatabase(URI2)).getConnection(); + + assertThat(conn1.getOptions().isAutoReconnect()).isTrue(); + assertThat(conn1.getOptions().getSocketOptions().getConnectTimeout()).isEqualTo(Duration.ofSeconds(8)); + + assertThat(conn2.getOptions().isAutoReconnect()).isFalse(); + assertThat(conn2.getOptions().getSocketOptions().getConnectTimeout()).isEqualTo(Duration.ofSeconds(12)); + } + +} diff --git a/src/test/java/io/lettuce/core/failover/MultiDbOutboundHandlerUnitTests.java b/src/test/java/io/lettuce/core/failover/MultiDbOutboundHandlerUnitTests.java index 9768df063..932c4ffd3 100644 --- a/src/test/java/io/lettuce/core/failover/MultiDbOutboundHandlerUnitTests.java +++ b/src/test/java/io/lettuce/core/failover/MultiDbOutboundHandlerUnitTests.java @@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import static io.lettuce.TestTags.UNIT_TEST; import java.io.IOException; import java.util.ArrayList; @@ -53,7 +54,7 @@ * @author Ali Takavci * @since 7.4 */ -@Tag("unit") +@Tag(UNIT_TEST) class MultiDbOutboundHandlerUnitTests { private MultiDbOutboundHandler handler; diff --git a/src/test/java/io/lettuce/core/failover/MultiDbTestSupport.java b/src/test/java/io/lettuce/core/failover/MultiDbTestSupport.java index c09d61ee4..941518409 100644 --- a/src/test/java/io/lettuce/core/failover/MultiDbTestSupport.java +++ b/src/test/java/io/lettuce/core/failover/MultiDbTestSupport.java @@ -62,11 +62,11 @@ public void tearDownMultiDb() { directClient3.shutdown(); } - public static final RedisURI URI1 = RedisURI.create(TestSettings.host(), TestSettings.port()); + public static final RedisURI URI1 = RedisURI.create(TestSettings.host(), TestSettings.port(10)); - public static final RedisURI URI2 = RedisURI.create(TestSettings.host(), TestSettings.port(1)); + public static final RedisURI URI2 = RedisURI.create(TestSettings.host(), TestSettings.port(11)); - public static final RedisURI URI3 = RedisURI.create(TestSettings.host(), TestSettings.port(5)); + public static final RedisURI URI3 = RedisURI.create(TestSettings.host(), TestSettings.port(12)); /* * DBs configured with Disable health checks for testing CB detected failures without interference from health checks diff --git a/src/test/java/io/lettuce/core/failover/RedisMultiDbClientUnitTests.java b/src/test/java/io/lettuce/core/failover/RedisMultiDbClientUnitTests.java index bd54d5d59..d8d52695d 100644 --- a/src/test/java/io/lettuce/core/failover/RedisMultiDbClientUnitTests.java +++ b/src/test/java/io/lettuce/core/failover/RedisMultiDbClientUnitTests.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.Closeable; import java.lang.reflect.Field; @@ -98,4 +99,16 @@ void shutdownShutsDownResourcesAfterChannels() throws Exception { assertThat(future).isDone(); } + @Test + void connectAsyncShouldRejectNullCodec() { + MultiDbClient client = MultiDbClient.create(MultiDbTestSupport.DBs); + + try { + assertThatThrownBy(() -> client.connectAsync(null)).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("codec must not be null"); + } finally { + client.shutdown(); + } + } + } diff --git a/src/test/java/io/lettuce/core/failover/StatefulMultiDbConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/failover/StatefulMultiDbConnectionIntegrationTests.java index d271470ec..8433d0457 100644 --- a/src/test/java/io/lettuce/core/failover/StatefulMultiDbConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/failover/StatefulMultiDbConnectionIntegrationTests.java @@ -496,12 +496,14 @@ void shouldHandleConcurrentAddsAndRemovesOnMultipleUris() throws Exception { StatefulRedisMultiDbConnection connection = multiDbClient.connect(); int initialEndpoints = StreamSupport.stream(connection.getEndpoints().spliterator(), false).collect(Collectors.toList()) .size(); + int portOffset = StreamSupport.stream(connection.getEndpoints().spliterator(), false).mapToInt(u -> u.getPort()).max() + .orElse(0) + 1; // Create multiple URIs RedisURI[] uris = new RedisURI[5]; AtomicInteger[] addCounts = new AtomicInteger[5]; AtomicInteger[] removeCount = new AtomicInteger[5]; for (int i = 0; i < 5; i++) { - uris[i] = RedisURI.Builder.redis(TestSettings.host(), TestSettings.port(6 + i)) + uris[i] = RedisURI.Builder.redis(TestSettings.host(), TestSettings.port(portOffset + i)) .withPassword(TestSettings.password()).build(); addCounts[i] = new AtomicInteger(0); removeCount[i] = new AtomicInteger(0); diff --git a/src/test/java/io/lettuce/core/failover/health/PingStrategyIntegrationTests.java b/src/test/java/io/lettuce/core/failover/health/PingStrategyIntegrationTests.java index 1ecafa865..1c4f8fd4f 100644 --- a/src/test/java/io/lettuce/core/failover/health/PingStrategyIntegrationTests.java +++ b/src/test/java/io/lettuce/core/failover/health/PingStrategyIntegrationTests.java @@ -26,6 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static io.lettuce.TestTags.INTEGRATION_TEST; /** * Integration tests for {@link PingStrategy} using Toxiproxy for network failure simulation. @@ -34,7 +35,7 @@ * @since 7.1 */ @ExtendWith(LettuceExtension.class) -@Tag("integration") +@Tag(INTEGRATION_TEST) @DisplayName("PingStrategy Integration Tests") public class PingStrategyIntegrationTests extends MultiDbTestSupport { diff --git a/src/test/java/io/lettuce/core/failover/metrics/LockFreeSlidingTimeWindowMetricsUnitTests.java b/src/test/java/io/lettuce/core/failover/metrics/LockFreeSlidingTimeWindowMetricsUnitTests.java index e820179bf..b2251c0c0 100644 --- a/src/test/java/io/lettuce/core/failover/metrics/LockFreeSlidingTimeWindowMetricsUnitTests.java +++ b/src/test/java/io/lettuce/core/failover/metrics/LockFreeSlidingTimeWindowMetricsUnitTests.java @@ -13,6 +13,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.offset; +import static io.lettuce.TestTags.UNIT_TEST; /** * Unit tests for lock-free sliding window metrics implementation. @@ -20,7 +21,7 @@ * @author Ali Takavci * @since 7.1 */ -@Tag("unit") +@Tag(UNIT_TEST) @DisplayName("Lock-Free Sliding Window Metrics") class LockFreeSlidingTimeWindowMetricsUnitTests { diff --git a/src/test/resources/docker-env/docker-compose.yml b/src/test/resources/docker-env/docker-compose.yml index cafcbeb98..1f5b9e652 100644 --- a/src/test/resources/docker-env/docker-compose.yml +++ b/src/test/resources/docker-env/docker-compose.yml @@ -21,12 +21,15 @@ services: container_name: redis-failover environment: - PORT=6487 - - NODES=2 + - NODES=5 volumes: - ${REDIS_ENV_WORK_DIR}/redis-standalone-multi/work:/redis/work:rw ports: - "6487:6487" - "6488:6488" + - "6489:6489" + - "6490:6490" + - "6491:6491" networks: - redis-network