From f64e10d188b79676c7cc390279e2b211f63c13ed Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Thu, 17 Jul 2025 11:21:09 +0300 Subject: [PATCH 1/7] Added support for Pub/Sub in MultiDBClient --- redis/client.py | 4 +- redis/multidb/client.py | 129 +++++++++++++++++++++++++++++- redis/multidb/command_executor.py | 84 +++++++++++++++++-- redis/multidb/event.py | 49 ++++++++++++ 4 files changed, 256 insertions(+), 10 deletions(-) diff --git a/redis/client.py b/redis/client.py index 060fc29493..adb57d404e 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1217,6 +1217,7 @@ def run_in_thread( sleep_time: float = 0.0, daemon: bool = False, exception_handler: Optional[Callable] = None, + pubsub = None ) -> "PubSubWorkerThread": for channel, handler in self.channels.items(): if handler is None: @@ -1230,8 +1231,9 @@ def run_in_thread( f"Shard Channel: '{s_channel}' has no handler registered" ) + pubsub = self if pubsub is None else pubsub thread = PubSubWorkerThread( - self, sleep_time, daemon=daemon, exception_handler=exception_handler + pubsub, sleep_time, daemon=daemon, exception_handler=exception_handler ) thread.start() return thread diff --git a/redis/multidb/client.py b/redis/multidb/client.py index ed6883b110..b70784a892 100644 --- a/redis/multidb/client.py +++ b/redis/multidb/client.py @@ -1,8 +1,9 @@ import threading import socket -from typing import List, Any, Callable +from typing import List, Any, Callable, Optional from redis.background import BackgroundScheduler +from redis.client import PubSubWorkerThread from redis.exceptions import ConnectionError, TimeoutError from redis.commands import RedisModuleCommands, CoreCommands, SentinelCommands from redis.multidb.command_executor import DefaultCommandExecutor @@ -193,6 +194,17 @@ def transaction(self, func: Callable[["Pipeline"], None], *watches, **options): return self.command_executor.execute_transaction(func, *watches, *options) + def pubsub(self, **kwargs): + """ + Return a Publish/Subscribe object. With this object, you can + subscribe to channels and listen for messages that get published to + them. + """ + if not self.initialized: + self.initialize() + + return PubSub(self, **kwargs) + def _check_db_health(self, database: AbstractDatabase) -> None: """ Runs health checks on the given database until first failure. @@ -296,3 +308,118 @@ def execute(self) -> List[Any]: self._client.initialize() return self._client.command_executor.execute_pipeline(tuple(self._command_stack)) + +class PubSub: + """ + PubSub object for multi database client. + """ + def __init__(self, client: MultiDBClient, **kwargs): + self._client = client + self._client.command_executor.pubsub(**kwargs) + + def __enter__(self) -> "PubSub": + return self + + def __exit__(self, exc_type, exc_value, traceback) -> None: + self.reset() + + def __del__(self) -> None: + try: + # if this object went out of scope prior to shutting down + # subscriptions, close the connection manually before + # returning it to the connection pool + self.reset() + except Exception: + pass + + def reset(self) -> None: + pass + + def close(self) -> None: + self.reset() + + @property + def subscribed(self) -> bool: + return self._client.command_executor.active_pubsub.subscribed + + def psubscribe(self, *args, **kwargs): + """ + Subscribe to channel patterns. Patterns supplied as keyword arguments + expect a pattern name as the key and a callable as the value. A + pattern's callable will be invoked automatically when a message is + received on that pattern rather than producing a message via + ``listen()``. + """ + return self._client.command_executor.execute_pubsub_method('psubscribe', *args, **kwargs) + + def punsubscribe(self, *args): + """ + Unsubscribe from the supplied patterns. If empty, unsubscribe from + all patterns. + """ + return self._client.command_executor.execute_pubsub_method('punsubscribe', *args) + + def subscribe(self, *args, **kwargs): + """ + Subscribe to channels. Channels supplied as keyword arguments expect + a channel name as the key and a callable as the value. A channel's + callable will be invoked automatically when a message is received on + that channel rather than producing a message via ``listen()`` or + ``get_message()``. + """ + return self._client.command_executor.execute_pubsub_method('subscribe', *args, **kwargs) + + def unsubscribe(self, *args): + """ + Unsubscribe from the supplied channels. If empty, unsubscribe from + all channels + """ + return self._client.command_executor.execute_pubsub_method('unsubscribe', *args) + + def ssubscribe(self, *args, **kwargs): + """ + Subscribes the client to the specified shard channels. + Channels supplied as keyword arguments expect a channel name as the key + and a callable as the value. A channel's callable will be invoked automatically + when a message is received on that channel rather than producing a message via + ``listen()`` or ``get_sharded_message()``. + """ + return self._client.command_executor.execute_pubsub_method('ssubscribe', *args, **kwargs) + + def sunsubscribe(self, *args): + """ + Unsubscribe from the supplied shard_channels. If empty, unsubscribe from + all shard_channels + """ + return self._client.command_executor.execute_pubsub_method('sunsubscribe', *args) + + def get_message( + self, ignore_subscribe_messages: bool = False, timeout: float = 0.0 + ): + """ + Get the next message if one is available, otherwise None. + + If timeout is specified, the system will wait for `timeout` seconds + before returning. Timeout should be specified as a floating point + number, or None, to wait indefinitely. + """ + return self._client.command_executor.execute_pubsub_method( + 'get_message', + ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout + ) + + get_sharded_message = get_message + + def run_in_thread( + self, + sleep_time: float = 0.0, + daemon: bool = False, + exception_handler: Optional[Callable] = None, + ) -> "PubSubWorkerThread": + return self._client.command_executor.execute_pubsub_run_in_thread( + sleep_time=sleep_time, + daemon=daemon, + exception_handler=exception_handler, + pubsub=self + ) + diff --git a/redis/multidb/command_executor.py b/redis/multidb/command_executor.py index 3b600a9ebb..918f2f5fb0 100644 --- a/redis/multidb/command_executor.py +++ b/redis/multidb/command_executor.py @@ -3,13 +3,13 @@ from datetime import datetime, timedelta from typing import List, Union, Optional, Callable -from redis.client import Pipeline +from redis.client import Pipeline, PubSub, PubSubWorkerThread from redis.exceptions import ConnectionError, TimeoutError from redis.event import EventDispatcherInterface, OnCommandsFailEvent from redis.multidb.config import DEFAULT_AUTO_FALLBACK_INTERVAL from redis.multidb.database import Database, AbstractDatabase, Databases from redis.multidb.circuit import State as CBState -from redis.multidb.event import RegisterCommandFailure +from redis.multidb.event import RegisterCommandFailure, ActiveDatabaseChanged, ResubscribeOnActiveDatabaseChanged from redis.multidb.failover import FailoverStrategy from redis.multidb.failure_detector import FailureDetector @@ -35,7 +35,7 @@ def databases(self) -> Databases: @property @abstractmethod - def active_database(self) -> Union[Database, None]: + def active_database(self) -> Optional[Database]: """Returns currently active database.""" pass @@ -45,6 +45,23 @@ def active_database(self, database: AbstractDatabase) -> None: """Sets currently active database.""" pass + @abstractmethod + def pubsub(self, **kwargs): + """Initializes a PubSub object on a currently active database""" + pass + + @property + @abstractmethod + def active_pubsub(self) -> Optional[PubSub]: + """Returns currently active pubsub.""" + pass + + @active_pubsub.setter + @abstractmethod + def active_pubsub(self, pubsub: PubSub) -> None: + """Sets currently active pubsub.""" + pass + @property @abstractmethod def failover_strategy(self) -> FailoverStrategy: @@ -93,7 +110,9 @@ def __init__( self._event_dispatcher = event_dispatcher self._auto_fallback_interval = auto_fallback_interval self._next_fallback_attempt: datetime - self._active_database: Union[Database, None] = None + self._active_database: Optional[Database] = None + self._active_pubsub: Optional[PubSub] = None + self._active_pubsub_kwargs = {} self._setup_event_dispatcher() self._schedule_next_fallback() @@ -114,8 +133,22 @@ def active_database(self) -> Optional[AbstractDatabase]: @active_database.setter def active_database(self, database: AbstractDatabase) -> None: + old_active = self._active_database self._active_database = database + if old_active is not None and old_active is not database: + self._event_dispatcher.dispatch( + ActiveDatabaseChanged(old_active, self._active_database, self, **self._active_pubsub_kwargs) + ) + + @property + def active_pubsub(self) -> Optional[PubSub]: + return self._active_pubsub + + @active_pubsub.setter + def active_pubsub(self, pubsub: PubSub) -> None: + self._active_pubsub = pubsub + @property def failover_strategy(self) -> FailoverStrategy: return self._failover_strategy @@ -157,6 +190,39 @@ def callback(database): return self._execute_with_failure_detection(callback) + def pubsub(self, **kwargs): + def callback(database): + if self._active_pubsub is None: + self._active_pubsub = database.client.pubsub(**kwargs) + self._active_pubsub_kwargs = kwargs + return None + + return self._execute_with_failure_detection(callback) + + def execute_pubsub_method(self, method_name: str, *args, **kwargs): + """ + Executes given method on active pub/sub. + """ + def callback(database): + method = getattr(self.active_pubsub, method_name) + return method(*args, **kwargs) + + return self._execute_with_failure_detection(callback, *args) + + def execute_pubsub_run_in_thread( + self, + pubsub, + sleep_time: float = 0.0, + daemon: bool = False, + exception_handler: Optional[Callable] = None, + ) -> "PubSubWorkerThread": + def callback(database): + return self._active_pubsub.run_in_thread( + sleep_time, daemon=daemon, exception_handler=exception_handler, pubsub=pubsub + ) + + return self._execute_with_failure_detection(callback) + def _execute_with_failure_detection(self, callback: Callable, cmds: tuple = ()): """ Execute a commands execution callback with failure detection. @@ -169,7 +235,7 @@ def _execute_with_failure_detection(self, callback: Callable, cmds: tuple = ()): and self._next_fallback_attempt <= datetime.now() ) ): - self._active_database = self._failover_strategy.database + self.active_database = self._failover_strategy.database self._schedule_next_fallback() try: @@ -189,9 +255,11 @@ def _schedule_next_fallback(self) -> None: def _setup_event_dispatcher(self): """ - Registers command failure event listener. + Registers necessary listeners. """ - event_listener = RegisterCommandFailure(self._failure_detectors, self._databases) + failure_listener = RegisterCommandFailure(self._failure_detectors, self._databases) + resubscribe_listener = ResubscribeOnActiveDatabaseChanged() self._event_dispatcher.register_listeners({ - OnCommandsFailEvent: [event_listener], + OnCommandsFailEvent: [failure_listener], + ActiveDatabaseChanged: [resubscribe_listener], }) \ No newline at end of file diff --git a/redis/multidb/event.py b/redis/multidb/event.py index 08b633b512..b9f19724f4 100644 --- a/redis/multidb/event.py +++ b/redis/multidb/event.py @@ -2,8 +2,57 @@ from redis.event import EventListenerInterface, OnCommandsFailEvent from redis.multidb.config import Databases +from redis.multidb.database import AbstractDatabase from redis.multidb.failure_detector import FailureDetector +class ActiveDatabaseChanged: + """ + Event fired when an active database has been changed. + """ + def __init__( + self, + old_database: AbstractDatabase, + new_database: AbstractDatabase, + command_executor, + **kwargs + ): + self._old_database = old_database + self._new_database = new_database + self._command_executor = command_executor + self._kwargs = kwargs + + @property + def old_database(self) -> AbstractDatabase: + return self._old_database + + @property + def new_database(self) -> AbstractDatabase: + return self._new_database + + @property + def command_executor(self): + return self._command_executor + + @property + def kwargs(self): + return self._kwargs + +class ResubscribeOnActiveDatabaseChanged(EventListenerInterface): + """ + Re-subscribe currently active pub/sub to a new active database. + """ + def listen(self, event: ActiveDatabaseChanged): + old_pubsub = event.command_executor.active_pubsub + + if old_pubsub is not None: + # Re-assign old channels and patterns so they will be automatically subscribed on connection. + new_pubsub = event.new_database.client.pubsub(**event.kwargs) + new_pubsub.channels = old_pubsub.channels + new_pubsub.patterns = old_pubsub.patterns + new_pubsub.shard_channels = old_pubsub.shard_channels + new_pubsub.on_connect(None) + event.command_executor.active_pubsub = new_pubsub + old_pubsub.close() class RegisterCommandFailure(EventListenerInterface): """ From 2de5d09c3b9f555168eab0b3dd282f59d5a2b627 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Wed, 30 Jul 2025 14:38:36 +0300 Subject: [PATCH 2/7] Added scenario tests for Pub/Sub --- tests/test_scenario/conftest.py | 19 +++- tests/test_scenario/test_active_active.py | 111 ++++++++++++++++++++-- 2 files changed, 120 insertions(+), 10 deletions(-) diff --git a/tests/test_scenario/conftest.py b/tests/test_scenario/conftest.py index 8ae7441e98..bc230dbc6d 100644 --- a/tests/test_scenario/conftest.py +++ b/tests/test_scenario/conftest.py @@ -4,12 +4,20 @@ import pytest from redis.backoff import NoBackoff +from redis.event import EventDispatcher, EventListenerInterface from redis.multidb.client import MultiDBClient from redis.multidb.config import DatabaseConfig, MultiDbConfig, DEFAULT_HEALTH_CHECK_INTERVAL, \ DEFAULT_FAILURES_THRESHOLD +from redis.multidb.event import ActiveDatabaseChanged from redis.retry import Retry from tests.test_scenario.fault_injector_client import FaultInjectorClient +class CheckActiveDatabaseChangedListener(EventListenerInterface): + def __init__(self): + self.is_changed_flag = False + + def listen(self, event: ActiveDatabaseChanged): + self.is_changed_flag = True def get_endpoint_config(endpoint_name: str): endpoints_config = os.getenv("REDIS_ENDPOINTS_CONFIG_PATH", None) @@ -33,13 +41,18 @@ def fault_injector_client(): return FaultInjectorClient(url) @pytest.fixture() -def r_multi_db(request) -> MultiDBClient: +def r_multi_db(request) -> tuple[MultiDBClient, CheckActiveDatabaseChangedListener]: endpoint_config = get_endpoint_config('re-active-active') username = endpoint_config.get('username', None) password = endpoint_config.get('password', None) failure_threshold = request.param.get('failure_threshold', DEFAULT_FAILURES_THRESHOLD) command_retry = request.param.get('command_retry', Retry(NoBackoff(), retries=3)) health_check_interval = request.param.get('health_check_interval', DEFAULT_HEALTH_CHECK_INTERVAL) + event_dispatcher = EventDispatcher() + listener = CheckActiveDatabaseChangedListener() + event_dispatcher.register_listeners({ + ActiveDatabaseChanged: [listener], + }) db_configs = [] db_config = DatabaseConfig( @@ -64,12 +77,12 @@ def r_multi_db(request) -> MultiDBClient: ) db_configs.append(db_config1) - config = MultiDbConfig( databases_config=db_configs, command_retry=command_retry, failure_threshold=failure_threshold, health_check_interval=health_check_interval, + event_dispatcher=event_dispatcher, ) - return MultiDBClient(config) \ No newline at end of file + return MultiDBClient(config), listener \ No newline at end of file diff --git a/tests/test_scenario/test_active_active.py b/tests/test_scenario/test_active_active.py index 2b9bfc7e74..14f4dee375 100644 --- a/tests/test_scenario/test_active_active.py +++ b/tests/test_scenario/test_active_active.py @@ -1,13 +1,11 @@ +import json import logging import threading from time import sleep import pytest -from redis.backoff import NoBackoff from redis.client import Pipeline -from redis.exceptions import ConnectionError -from redis.retry import Retry from tests.test_scenario.conftest import get_endpoint_config from tests.test_scenario.fault_injector_client import ActionRequest, ActionType @@ -17,7 +15,7 @@ def trigger_network_failure_action(fault_injector_client, event: threading.Event endpoint_config = get_endpoint_config('re-active-active') action_request = ActionRequest( action_type=ActionType.NETWORK_FAILURE, - parameters={"bdb_id": endpoint_config['bdb_id'], "delay": 1, "cluster_index": 0} + parameters={"bdb_id": endpoint_config['bdb_id'], "delay": 2, "cluster_index": 0} ) result = fault_injector_client.trigger_action(action_request) @@ -34,10 +32,9 @@ def trigger_network_failure_action(fault_injector_client, event: threading.Event logger.info(f"Action completed. Status: {status_result['status']}") class TestActiveActiveStandalone: - def teardown_method(self, method): # Timeout so the cluster could recover from network failure. - sleep(3) + sleep(4) @pytest.mark.parametrize( "r_multi_db", @@ -54,6 +51,8 @@ def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_injector args=(fault_injector_client,event) ) + r_multi_db, listener = r_multi_db + # Client initialized on the first command. r_multi_db.set('key', 'value') thread.start() @@ -68,6 +67,8 @@ def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_injector assert r_multi_db.get('key') == 'value' sleep(0.1) + assert listener.is_changed_flag == True + @pytest.mark.parametrize( "r_multi_db", [ @@ -83,6 +84,8 @@ def test_context_manager_pipeline_failover_to_another_db(self, r_multi_db, fault args=(fault_injector_client,event) ) + r_multi_db, listener = r_multi_db + # Client initialized on first pipe execution. with r_multi_db.pipeline() as pipe: pipe.set('{hash}key1', 'value1') @@ -119,6 +122,8 @@ def test_context_manager_pipeline_failover_to_another_db(self, r_multi_db, fault assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] sleep(0.1) + assert listener.is_changed_flag == True + @pytest.mark.parametrize( "r_multi_db", [ @@ -134,6 +139,8 @@ def test_chaining_pipeline_failover_to_another_db(self, r_multi_db, fault_inject args=(fault_injector_client,event) ) + r_multi_db, listener = r_multi_db + # Client initialized on first pipe execution. pipe = r_multi_db.pipeline() pipe.set('{hash}key1', 'value1') @@ -168,6 +175,8 @@ def test_chaining_pipeline_failover_to_another_db(self, r_multi_db, fault_inject assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] sleep(0.1) + assert listener.is_changed_flag == True + @pytest.mark.parametrize( "r_multi_db", [ @@ -183,6 +192,8 @@ def test_transaction_failover_to_another_db(self, r_multi_db, fault_injector_cli args=(fault_injector_client,event) ) + r_multi_db, listener = r_multi_db + def callback(pipe: Pipeline): pipe.set('{hash}key1', 'value1') pipe.set('{hash}key2', 'value2') @@ -203,4 +214,90 @@ def callback(pipe: Pipeline): # Execute pipeline after network failure for _ in range(3): r_multi_db.transaction(callback) - sleep(0.1) \ No newline at end of file + sleep(0.1) + + assert listener.is_changed_flag == True + + @pytest.mark.parametrize( + "r_multi_db", + [ + {"failure_threshold": 2} + ], + indirect=True + ) + def test_pubsub_failover_to_another_db(self, r_multi_db, fault_injector_client): + event = threading.Event() + thread = threading.Thread( + target=trigger_network_failure_action, + daemon=True, + args=(fault_injector_client,event) + ) + + r_multi_db, listener = r_multi_db + data = json.dumps({'message': 'test'}) + + def handler(message): + assert message['data'] == data + + pubsub = r_multi_db.pubsub() + + # Assign a handler and run in a separate thread. + pubsub.subscribe(**{'test-channel': handler}) + pubsub_thread = pubsub.run_in_thread(sleep_time=0.1, daemon=True) + thread.start() + + # Execute pipeline before network failure + while not event.is_set(): + r_multi_db.publish('test-channel', data) + sleep(0.1) + + # Execute pipeline after network failure + for _ in range(3): + r_multi_db.publish('test-channel', data) + sleep(0.1) + + pubsub_thread.stop() + + assert listener.is_changed_flag == True + + @pytest.mark.parametrize( + "r_multi_db", + [ + {"failure_threshold": 2} + ], + indirect=True + ) + def test_sharded_pubsub_failover_to_another_db(self, r_multi_db, fault_injector_client): + event = threading.Event() + thread = threading.Thread( + target=trigger_network_failure_action, + daemon=True, + args=(fault_injector_client,event) + ) + + r_multi_db, listener = r_multi_db + data = json.dumps({'message': 'test'}) + + def handler(message): + assert message['data'] == data + + pubsub = r_multi_db.pubsub() + + # Assign a handler and run in a separate thread. + pubsub.ssubscribe(**{'test-channel': handler}) + pubsub_thread = pubsub.run_in_thread(sleep_time=0.1, daemon=True) + thread.start() + + # Execute pipeline before network failure + while not event.is_set(): + r_multi_db.spublish('test-channel', data) + sleep(0.1) + + # Execute pipeline after network failure + for _ in range(3): + r_multi_db.spublish('test-channel', data) + sleep(0.1) + + pubsub_thread.stop() + + assert listener.is_changed_flag == True \ No newline at end of file From a7f03c0328053432619300a6f7e0f8c29119a482 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Wed, 30 Jul 2025 14:58:00 +0300 Subject: [PATCH 3/7] Updated healthcheck retry --- redis/multidb/healthcheck.py | 1 + 1 file changed, 1 insertion(+) diff --git a/redis/multidb/healthcheck.py b/redis/multidb/healthcheck.py index a96b9cf815..1396a1e997 100644 --- a/redis/multidb/healthcheck.py +++ b/redis/multidb/healthcheck.py @@ -21,6 +21,7 @@ def __init__( retry: Retry, ) -> None: self._retry = retry + self._retry.update_supported_errors([ConnectionRefusedError]) @property def retry(self) -> Retry: From 0505e0a1b9c3fc23dd8cace9f40cf4b9de0d3413 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Wed, 30 Jul 2025 15:19:15 +0300 Subject: [PATCH 4/7] Increased timeout to avoid unprepared state before tests --- tests/test_scenario/test_active_active.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_scenario/test_active_active.py b/tests/test_scenario/test_active_active.py index 14f4dee375..5aa98c23ec 100644 --- a/tests/test_scenario/test_active_active.py +++ b/tests/test_scenario/test_active_active.py @@ -34,7 +34,7 @@ def trigger_network_failure_action(fault_injector_client, event: threading.Event class TestActiveActiveStandalone: def teardown_method(self, method): # Timeout so the cluster could recover from network failure. - sleep(4) + sleep(5) @pytest.mark.parametrize( "r_multi_db", From a7a7b6dab1750e09a9ac8ab196a956d3f0d0f62b Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Thu, 31 Jul 2025 10:50:20 +0300 Subject: [PATCH 5/7] Added backoff retry and changed timeouts --- tests/test_scenario/conftest.py | 4 ++-- tests/test_scenario/test_active_active.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_scenario/conftest.py b/tests/test_scenario/conftest.py index bc230dbc6d..7d2332f041 100644 --- a/tests/test_scenario/conftest.py +++ b/tests/test_scenario/conftest.py @@ -3,7 +3,7 @@ import pytest -from redis.backoff import NoBackoff +from redis.backoff import NoBackoff, ExponentialBackoff from redis.event import EventDispatcher, EventListenerInterface from redis.multidb.client import MultiDBClient from redis.multidb.config import DatabaseConfig, MultiDbConfig, DEFAULT_HEALTH_CHECK_INTERVAL, \ @@ -46,7 +46,7 @@ def r_multi_db(request) -> tuple[MultiDBClient, CheckActiveDatabaseChangedListen username = endpoint_config.get('username', None) password = endpoint_config.get('password', None) failure_threshold = request.param.get('failure_threshold', DEFAULT_FAILURES_THRESHOLD) - command_retry = request.param.get('command_retry', Retry(NoBackoff(), retries=3)) + command_retry = request.param.get('command_retry', Retry(ExponentialBackoff(cap=0.5, base=0.05), retries=3)) health_check_interval = request.param.get('health_check_interval', DEFAULT_HEALTH_CHECK_INTERVAL) event_dispatcher = EventDispatcher() listener = CheckActiveDatabaseChangedListener() diff --git a/tests/test_scenario/test_active_active.py b/tests/test_scenario/test_active_active.py index 5aa98c23ec..09d156ce53 100644 --- a/tests/test_scenario/test_active_active.py +++ b/tests/test_scenario/test_active_active.py @@ -34,7 +34,7 @@ def trigger_network_failure_action(fault_injector_client, event: threading.Event class TestActiveActiveStandalone: def teardown_method(self, method): # Timeout so the cluster could recover from network failure. - sleep(5) + sleep(3) @pytest.mark.parametrize( "r_multi_db", From faa18ae86ff2996835419fa62ee8b766b74b7ac4 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Thu, 31 Jul 2025 11:19:34 +0300 Subject: [PATCH 6/7] Added retry for healthchecks to avoid fluctuations --- tests/test_scenario/conftest.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_scenario/conftest.py b/tests/test_scenario/conftest.py index 7d2332f041..30e1f011c0 100644 --- a/tests/test_scenario/conftest.py +++ b/tests/test_scenario/conftest.py @@ -9,6 +9,7 @@ from redis.multidb.config import DatabaseConfig, MultiDbConfig, DEFAULT_HEALTH_CHECK_INTERVAL, \ DEFAULT_FAILURES_THRESHOLD from redis.multidb.event import ActiveDatabaseChanged +from redis.multidb.healthcheck import EchoHealthCheck from redis.retry import Retry from tests.test_scenario.fault_injector_client import FaultInjectorClient @@ -47,6 +48,7 @@ def r_multi_db(request) -> tuple[MultiDBClient, CheckActiveDatabaseChangedListen password = endpoint_config.get('password', None) failure_threshold = request.param.get('failure_threshold', DEFAULT_FAILURES_THRESHOLD) command_retry = request.param.get('command_retry', Retry(ExponentialBackoff(cap=0.5, base=0.05), retries=3)) + health_checks = [EchoHealthCheck(command_retry)] health_check_interval = request.param.get('health_check_interval', DEFAULT_HEALTH_CHECK_INTERVAL) event_dispatcher = EventDispatcher() listener = CheckActiveDatabaseChangedListener() @@ -79,6 +81,7 @@ def r_multi_db(request) -> tuple[MultiDBClient, CheckActiveDatabaseChangedListen config = MultiDbConfig( databases_config=db_configs, + health_checks=health_checks, command_retry=command_retry, failure_threshold=failure_threshold, health_check_interval=health_check_interval, From 7d5e9579e1a62e830439a0ce77ed813a1b4c7f85 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Thu, 31 Jul 2025 11:33:03 +0300 Subject: [PATCH 7/7] Changed retry configuration for healthchecks --- tests/test_scenario/conftest.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_scenario/conftest.py b/tests/test_scenario/conftest.py index 30e1f011c0..486dc948f1 100644 --- a/tests/test_scenario/conftest.py +++ b/tests/test_scenario/conftest.py @@ -48,7 +48,10 @@ def r_multi_db(request) -> tuple[MultiDBClient, CheckActiveDatabaseChangedListen password = endpoint_config.get('password', None) failure_threshold = request.param.get('failure_threshold', DEFAULT_FAILURES_THRESHOLD) command_retry = request.param.get('command_retry', Retry(ExponentialBackoff(cap=0.5, base=0.05), retries=3)) - health_checks = [EchoHealthCheck(command_retry)] + + # Retry configuration different for health checks as initial health check require more time in case + # if infrastructure wasn't restored from the previous test. + health_checks = [EchoHealthCheck(Retry(ExponentialBackoff(cap=5, base=0.5), retries=3))] health_check_interval = request.param.get('health_check_interval', DEFAULT_HEALTH_CHECK_INTERVAL) event_dispatcher = EventDispatcher() listener = CheckActiveDatabaseChangedListener()