Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1456,27 +1456,30 @@ def _update_moved_slots(self) -> None:
e.host, e.port, PRIMARY, **self.connection_kwargs
)
self.set_nodes(self.nodes_cache, {redirected_node.name: redirected_node})
if redirected_node in self.slots_cache[e.slot_id]:
slot_nodes = self.slots_cache[e.slot_id]
if redirected_node not in slot_nodes:
# The new slot owner is a new server, or a server from a different
# shard. We need to remove all current nodes from the slot's list
# (including replications) and add just the new node.
self.slots_cache[e.slot_id] = [redirected_node]
elif redirected_node is not slot_nodes[0]:
# The MOVED error resulted from a failover, and the new slot owner
# had previously been a replica.
old_primary = self.slots_cache[e.slot_id][0]
old_primary = slot_nodes[0]
# Update the old primary to be a replica and add it to the end of
# the slot's node list
old_primary.server_type = REPLICA
self.slots_cache[e.slot_id].append(old_primary)
slot_nodes.append(old_primary)
# Remove the old replica, which is now a primary, from the slot's
# node list
self.slots_cache[e.slot_id].remove(redirected_node)
slot_nodes.remove(redirected_node)
# Override the old primary with the new one
self.slots_cache[e.slot_id][0] = redirected_node
slot_nodes[0] = redirected_node
if self.default_node == old_primary:
# Update the default node with the new primary
self.default_node = redirected_node
else:
# The new slot owner is a new server, or a server from a different
# shard. We need to remove all current nodes from the slot's list
# (including replications) and add just the new node.
self.slots_cache[e.slot_id] = [redirected_node]
# else: circular MOVED to current primary -> no-op

# Reset moved_exception
self._moved_exception = None

Expand Down
23 changes: 13 additions & 10 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1693,27 +1693,30 @@ def _update_moved_slots(self):
# This is a new node, we will add it to the nodes cache
redirected_node = ClusterNode(e.host, e.port, PRIMARY)
self.nodes_cache[redirected_node.name] = redirected_node
if redirected_node in self.slots_cache[e.slot_id]:
slot_nodes = self.slots_cache[e.slot_id]
if redirected_node not in slot_nodes:
# The new slot owner is a new server, or a server from a different
# shard. We need to remove all current nodes from the slot's list
# (including replications) and add just the new node.
self.slots_cache[e.slot_id] = [redirected_node]
elif redirected_node is not slot_nodes[0]:
# The MOVED error resulted from a failover, and the new slot owner
# had previously been a replica.
old_primary = self.slots_cache[e.slot_id][0]
old_primary = slot_nodes[0]
# Update the old primary to be a replica and add it to the end of
# the slot's node list
old_primary.server_type = REPLICA
self.slots_cache[e.slot_id].append(old_primary)
slot_nodes.append(old_primary)
# Remove the old replica, which is now a primary, from the slot's
# node list
self.slots_cache[e.slot_id].remove(redirected_node)
slot_nodes.remove(redirected_node)
# Override the old primary with the new one
self.slots_cache[e.slot_id][0] = redirected_node
slot_nodes[0] = redirected_node
if self.default_node == old_primary:
# Update the default node with the new primary
self.default_node = redirected_node
else:
# The new slot owner is a new server, or a server from a different
# shard. We need to remove all current nodes from the slot's list
# (including replications) and add just the new node.
self.slots_cache[e.slot_id] = [redirected_node]
# else: circular MOVED to current primary -> no-op

# Reset moved_exception
self._moved_exception = None

Expand Down
33 changes: 28 additions & 5 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,19 +233,25 @@ def mock_all_nodes_resp(rc: RedisCluster, response: Any) -> RedisCluster:


async def moved_redirection_helper(
create_redis: Callable[..., RedisCluster], failover: bool = False
create_redis: Callable[..., RedisCluster],
failover: bool = False,
circular_moved=False,
) -> None:
"""
Test that the client handles MOVED response after a failover.
Redirection after a failover means that the redirection address is of a
replica that was promoted to a primary.
Test that the client correctly handles MOVED responses in the following scenarios:
1. Slot migration to a different shard (failover=False, circular_moved=False) —
a standard slot move between shards.
2. Failover event (failover=True, circular_moved=False) —
the redirect target is a replica that has just been promoted to primary.
3. Circular MOVED (failover=False, circular_moved=True) —
the redirect points to a node already known to be the primary of its shard.

At first call it should return a MOVED ResponseError that will point
the client to the next server it should talk to.

Verify that:
1. it tries to talk to the redirected node
2. it updates the slot's primary to the redirected node
2. it updates the slot's primary to the redirected node, if required

For a failover, also verify:
3. the redirected node's server type updated to 'primary'
Expand All @@ -261,6 +267,8 @@ async def moved_redirection_helper(
warnings.warn("Skipping this test since it requires to have a replica")
return
redirect_node = rc.nodes_manager.slots_cache[slot][1]
elif circular_moved:
redirect_node = prev_primary
else:
# Use one of the primaries to be the redirected node
redirect_node = rc.get_primaries()[0]
Expand All @@ -287,6 +295,10 @@ def ok_response(self, *args, **options):
if failover:
assert rc.get_node(host=r_host, port=r_port).server_type == PRIMARY
assert prev_primary.server_type == REPLICA
elif circular_moved:
fetched_node = rc.get_node(host=r_host, port=r_port)
assert fetched_node == prev_primary
assert fetched_node.server_type == PRIMARY


class TestRedisClusterObj:
Expand Down Expand Up @@ -613,6 +625,17 @@ async def test_moved_redirection_after_failover(
"""
await moved_redirection_helper(create_redis, failover=True)

async def test_moved_redirection_circular_moved(
self, create_redis: Callable[..., RedisCluster]
) -> None:
"""
Verify that the client does not update its slot map when receiving a circular MOVED response
(i.e., a MOVED redirect pointing back to the same node), and retries again the same node.
"""
await moved_redirection_helper(
create_redis, failover=False, circular_moved=True
)

async def test_refresh_using_specific_nodes(
self, create_redis: Callable[..., RedisCluster]
) -> None:
Expand Down
29 changes: 23 additions & 6 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,22 @@ def find_node_ip_based_on_port(cluster_client, port):
return node.host


def moved_redirection_helper(request, failover=False):
def moved_redirection_helper(request, failover=False, circular_moved=False):
"""
Test that the client handles MOVED response after a failover.
Redirection after a failover means that the redirection address is of a
replica that was promoted to a primary.
Test that the client correctly handles MOVED responses in the following scenarios:
1. Slot migration to a different shard (failover=False, circular_moved=False) —
a standard slot move between shards.
2. Failover event (failover=True, circular_moved=False) —
the redirect target is a replica that has just been promoted to primary.
3. Circular MOVED (failover=False, circular_moved=True) —
the redirect points to a node already known to be the primary of its shard.

At first call it should return a MOVED ResponseError that will point
the client to the next server it should talk to.

Verify that:
1. it tries to talk to the redirected node
2. it updates the slot's primary to the redirected node
2. it updates the slot's primary to the redirected node, if required

For a failover, also verify:
3. the redirected node's server type updated to 'primary'
Expand All @@ -300,8 +304,10 @@ def moved_redirection_helper(request, failover=False):
warnings.warn("Skipping this test since it requires to have a replica")
return
redirect_node = rc.nodes_manager.slots_cache[slot][1]
elif circular_moved:
redirect_node = prev_primary
else:
# Use one of the primaries to be the redirected node
# Use one of the other primaries to be the redirected node
redirect_node = rc.get_primaries()[0]
r_host = redirect_node.host
r_port = redirect_node.port
Expand All @@ -324,6 +330,10 @@ def ok_response(connection, *args, **options):
if failover:
assert rc.get_node(host=r_host, port=r_port).server_type == PRIMARY
assert prev_primary.server_type == REPLICA
elif circular_moved:
fetched_node = rc.get_node(host=r_host, port=r_port)
assert fetched_node == prev_primary
assert fetched_node.server_type == PRIMARY


@pytest.mark.onlycluster
Expand Down Expand Up @@ -547,6 +557,13 @@ def test_moved_redirection_after_failover(self, request):
"""
moved_redirection_helper(request, failover=True)

def test_moved_redirection_circular_moved(self, request):
"""
Verify that the client does not update its slot map when receiving a circular MOVED response
(i.e., a MOVED redirect pointing back to the same node), and retries again the same node.
"""
moved_redirection_helper(request, failover=False, circular_moved=True)

def test_refresh_using_specific_nodes(self, request):
"""
Test making calls on specific nodes when the cluster has failed over to
Expand Down