diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index d70569bb95..daee830bbb 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1456,27 +1456,30 @@ def _update_moved_slots(self) -> None: e.host, e.port, PRIMARY, **self.connection_kwargs ) self.set_nodes(self.nodes_cache, {redirected_node.name: redirected_node}) - if redirected_node in self.slots_cache[e.slot_id]: + slot_nodes = self.slots_cache[e.slot_id] + if redirected_node not in slot_nodes: + # The new slot owner is a new server, or a server from a different + # shard. We need to remove all current nodes from the slot's list + # (including replications) and add just the new node. + self.slots_cache[e.slot_id] = [redirected_node] + elif redirected_node is not slot_nodes[0]: # The MOVED error resulted from a failover, and the new slot owner # had previously been a replica. - old_primary = self.slots_cache[e.slot_id][0] + old_primary = slot_nodes[0] # Update the old primary to be a replica and add it to the end of # the slot's node list old_primary.server_type = REPLICA - self.slots_cache[e.slot_id].append(old_primary) + slot_nodes.append(old_primary) # Remove the old replica, which is now a primary, from the slot's # node list - self.slots_cache[e.slot_id].remove(redirected_node) + slot_nodes.remove(redirected_node) # Override the old primary with the new one - self.slots_cache[e.slot_id][0] = redirected_node + slot_nodes[0] = redirected_node if self.default_node == old_primary: # Update the default node with the new primary self.default_node = redirected_node - else: - # The new slot owner is a new server, or a server from a different - # shard. We need to remove all current nodes from the slot's list - # (including replications) and add just the new node. 
- self.slots_cache[e.slot_id] = [redirected_node] + # else: circular MOVED to current primary -> no-op + # Reset moved_exception self._moved_exception = None diff --git a/redis/cluster.py b/redis/cluster.py index dabac841db..9d23e18a2b 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1693,27 +1693,30 @@ def _update_moved_slots(self): # This is a new node, we will add it to the nodes cache redirected_node = ClusterNode(e.host, e.port, PRIMARY) self.nodes_cache[redirected_node.name] = redirected_node - if redirected_node in self.slots_cache[e.slot_id]: + slot_nodes = self.slots_cache[e.slot_id] + if redirected_node not in slot_nodes: + # The new slot owner is a new server, or a server from a different + # shard. We need to remove all current nodes from the slot's list + # (including replications) and add just the new node. + self.slots_cache[e.slot_id] = [redirected_node] + elif redirected_node is not slot_nodes[0]: # The MOVED error resulted from a failover, and the new slot owner # had previously been a replica. - old_primary = self.slots_cache[e.slot_id][0] + old_primary = slot_nodes[0] # Update the old primary to be a replica and add it to the end of # the slot's node list old_primary.server_type = REPLICA - self.slots_cache[e.slot_id].append(old_primary) + slot_nodes.append(old_primary) # Remove the old replica, which is now a primary, from the slot's # node list - self.slots_cache[e.slot_id].remove(redirected_node) + slot_nodes.remove(redirected_node) # Override the old primary with the new one - self.slots_cache[e.slot_id][0] = redirected_node + slot_nodes[0] = redirected_node if self.default_node == old_primary: # Update the default node with the new primary self.default_node = redirected_node - else: - # The new slot owner is a new server, or a server from a different - # shard. We need to remove all current nodes from the slot's list - # (including replications) and add just the new node. 
- self.slots_cache[e.slot_id] = [redirected_node] + # else: circular MOVED to current primary -> no-op + # Reset moved_exception self._moved_exception = None diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 25f487fe4c..8ca0bb8541 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -233,19 +233,25 @@ def mock_all_nodes_resp(rc: RedisCluster, response: Any) -> RedisCluster: async def moved_redirection_helper( - create_redis: Callable[..., RedisCluster], failover: bool = False + create_redis: Callable[..., RedisCluster], + failover: bool = False, + circular_moved=False, ) -> None: """ - Test that the client handles MOVED response after a failover. - Redirection after a failover means that the redirection address is of a - replica that was promoted to a primary. + Test that the client correctly handles MOVED responses in the following scenarios: + 1. Slot migration to a different shard (failover=False, circular_moved=False) — + a standard slot move between shards. + 2. Failover event (failover=True, circular_moved=False) — + the redirect target is a replica that has just been promoted to primary. + 3. Circular MOVED (failover=False, circular_moved=True) — + the redirect points to a node already known to be the primary of its shard. At first call it should return a MOVED ResponseError that will point the client to the next server it should talk to. Verify that: 1. it tries to talk to the redirected node - 2. it updates the slot's primary to the redirected node + 2. it updates the slot's primary to the redirected node, if required For a failover, also verify: 3. 
the redirected node's server type updated to 'primary' @@ -261,6 +267,8 @@ async def moved_redirection_helper( warnings.warn("Skipping this test since it requires to have a replica") return redirect_node = rc.nodes_manager.slots_cache[slot][1] + elif circular_moved: + redirect_node = prev_primary else: # Use one of the primaries to be the redirected node redirect_node = rc.get_primaries()[0] @@ -287,6 +295,10 @@ def ok_response(self, *args, **options): if failover: assert rc.get_node(host=r_host, port=r_port).server_type == PRIMARY assert prev_primary.server_type == REPLICA + elif circular_moved: + fetched_node = rc.get_node(host=r_host, port=r_port) + assert fetched_node == prev_primary + assert fetched_node.server_type == PRIMARY class TestRedisClusterObj: @@ -613,6 +625,17 @@ async def test_moved_redirection_after_failover( """ await moved_redirection_helper(create_redis, failover=True) + async def test_moved_redirection_circular_moved( + self, create_redis: Callable[..., RedisCluster] + ) -> None: + """ + Verify that the client does not update its slot map when receiving a circular MOVED response + (i.e., a MOVED redirect pointing back to the same node), and retries the same node. + """ + await moved_redirection_helper( + create_redis, failover=False, circular_moved=True + ) + async def test_refresh_using_specific_nodes( self, create_redis: Callable[..., RedisCluster] ) -> None: diff --git a/tests/test_cluster.py b/tests/test_cluster.py index a6cfcd2d94..e4d6c1b6cf 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -273,18 +273,22 @@ def find_node_ip_based_on_port(cluster_client, port): return node.host -def moved_redirection_helper(request, failover=False): +def moved_redirection_helper(request, failover=False, circular_moved=False): """ - Test that the client handles MOVED response after a failover. - Redirection after a failover means that the redirection address is of a - replica that was promoted to a primary. 
+ Test that the client correctly handles MOVED responses in the following scenarios: + 1. Slot migration to a different shard (failover=False, circular_moved=False) — + a standard slot move between shards. + 2. Failover event (failover=True, circular_moved=False) — + the redirect target is a replica that has just been promoted to primary. + 3. Circular MOVED (failover=False, circular_moved=True) — + the redirect points to a node already known to be the primary of its shard. At first call it should return a MOVED ResponseError that will point the client to the next server it should talk to. Verify that: 1. it tries to talk to the redirected node - 2. it updates the slot's primary to the redirected node + 2. it updates the slot's primary to the redirected node, if required For a failover, also verify: 3. the redirected node's server type updated to 'primary' @@ -300,8 +304,10 @@ def moved_redirection_helper(request, failover=False): warnings.warn("Skipping this test since it requires to have a replica") return redirect_node = rc.nodes_manager.slots_cache[slot][1] + elif circular_moved: + redirect_node = prev_primary else: - # Use one of the primaries to be the redirected node + # Use one of the other primaries to be the redirected node redirect_node = rc.get_primaries()[0] r_host = redirect_node.host r_port = redirect_node.port @@ -324,6 +330,10 @@ def ok_response(connection, *args, **options): if failover: assert rc.get_node(host=r_host, port=r_port).server_type == PRIMARY assert prev_primary.server_type == REPLICA + elif circular_moved: + fetched_node = rc.get_node(host=r_host, port=r_port) + assert fetched_node == prev_primary + assert fetched_node.server_type == PRIMARY @pytest.mark.onlycluster @@ -547,6 +557,13 @@ def test_moved_redirection_after_failover(self, request): """ moved_redirection_helper(request, failover=True) + def test_moved_redirection_circular_moved(self, request): + """ + Verify that the client does not update its slot map when receiving a 
circular MOVED response + (i.e., a MOVED redirect pointing back to the same node), and retries the same node. + """ + moved_redirection_helper(request, failover=False, circular_moved=True) + def test_refresh_using_specific_nodes(self, request): """ Test making calls on specific nodes when the cluster has failed over to