Skip to content

Commit c1a3e08

Browse files
committed
Added support for cluster client
1 parent 8c5d1ad commit c1a3e08

File tree

5 files changed

+856
-12
lines changed

5 files changed

+856
-12
lines changed

redis/cluster.py

Lines changed: 188 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
AfterPooledConnectionsInstantiationEvent,
3030
AfterPubSubConnectionInstantiationEvent,
3131
ClientType,
32-
EventDispatcher,
32+
EventDispatcher, AfterCommandExecutionEvent,
3333
)
3434
from redis.exceptions import (
3535
AskError,
@@ -984,6 +984,7 @@ def pipeline(self, transaction=None, shard_hint=None):
984984
retry=self.retry,
985985
lock=self._lock,
986986
transaction=transaction,
987+
event_dispatcher=self._event_dispatcher
987988
)
988989

989990
def lock(
@@ -1367,6 +1368,9 @@ def _execute_command(self, target_node, *args, **kwargs):
13671368
moved = False
13681369
ttl = int(self.RedisClusterRequestTTL)
13691370

1371+
# Start timing for observability
1372+
start_time = time.monotonic()
1373+
13701374
while ttl > 0:
13711375
ttl -= 1
13721376
try:
@@ -1401,14 +1405,32 @@ def _execute_command(self, target_node, *args, **kwargs):
14011405
response = self.cluster_response_callbacks[command](
14021406
response, **kwargs
14031407
)
1408+
1409+
self._emit_after_command_execution_event(
1410+
command_name=command,
1411+
duration_seconds=time.monotonic() - start_time,
1412+
connection=connection,
1413+
)
14041414
return response
1405-
except AuthenticationError:
1415+
except AuthenticationError as e:
1416+
self._emit_after_command_execution_event(
1417+
command_name=command,
1418+
duration_seconds=time.monotonic() - start_time,
1419+
connection=connection,
1420+
error=e,
1421+
)
14061422
raise
1407-
except MaxConnectionsError:
1423+
except MaxConnectionsError as e:
14081424
# MaxConnectionsError indicates client-side resource exhaustion
14091425
# (too many connections in the pool), not a node failure.
14101426
# Don't treat this as a node failure - just re-raise the error
14111427
# without reinitializing the cluster.
1428+
self._emit_after_command_execution_event(
1429+
command_name=command,
1430+
duration_seconds=time.monotonic() - start_time,
1431+
connection=connection,
1432+
error=e,
1433+
)
14121434
raise
14131435
except (ConnectionError, TimeoutError) as e:
14141436
# ConnectionError can also be raised if we couldn't get a
@@ -1423,6 +1445,12 @@ def _execute_command(self, target_node, *args, **kwargs):
14231445
# Reset the cluster node's connection
14241446
target_node.redis_connection = None
14251447
self.nodes_manager.initialize()
1448+
self._emit_after_command_execution_event(
1449+
command_name=command,
1450+
duration_seconds=time.monotonic() - start_time,
1451+
connection=connection,
1452+
error=e,
1453+
)
14261454
raise e
14271455
except MovedError as e:
14281456
# First, we will try to patch the slots/nodes cache with the
@@ -1441,13 +1469,33 @@ def _execute_command(self, target_node, *args, **kwargs):
14411469
else:
14421470
self.nodes_manager.update_moved_exception(e)
14431471
moved = True
1444-
except TryAgainError:
1472+
self._emit_after_command_execution_event(
1473+
command_name=command,
1474+
duration_seconds=time.monotonic() - start_time,
1475+
connection=connection,
1476+
error=e,
1477+
)
1478+
except TryAgainError as e:
14451479
if ttl < self.RedisClusterRequestTTL / 2:
14461480
time.sleep(0.05)
1481+
1482+
self._emit_after_command_execution_event(
1483+
command_name=command,
1484+
duration_seconds=time.monotonic() - start_time,
1485+
connection=connection,
1486+
error=e,
1487+
)
14471488
except AskError as e:
14481489
redirect_addr = get_node_name(host=e.host, port=e.port)
14491490
asking = True
1450-
except (ClusterDownError, SlotNotCoveredError):
1491+
1492+
self._emit_after_command_execution_event(
1493+
command_name=command,
1494+
duration_seconds=time.monotonic() - start_time,
1495+
connection=connection,
1496+
error=e,
1497+
)
1498+
except (ClusterDownError, SlotNotCoveredError) as e:
14511499
# ClusterDownError can occur during a failover and to get
14521500
# self-healed, we will try to reinitialize the cluster layout
14531501
# and retry executing the command
@@ -1459,19 +1507,60 @@ def _execute_command(self, target_node, *args, **kwargs):
14591507

14601508
time.sleep(0.25)
14611509
self.nodes_manager.initialize()
1510+
1511+
self._emit_after_command_execution_event(
1512+
command_name=command,
1513+
duration_seconds=time.monotonic() - start_time,
1514+
connection=connection,
1515+
error=e,
1516+
)
14621517
raise
1463-
except ResponseError:
1518+
except ResponseError as e:
1519+
self._emit_after_command_execution_event(
1520+
command_name=command,
1521+
duration_seconds=time.monotonic() - start_time,
1522+
connection=connection,
1523+
error=e,
1524+
)
14641525
raise
14651526
except Exception as e:
14661527
if connection:
14671528
connection.disconnect()
1529+
1530+
self._emit_after_command_execution_event(
1531+
command_name=command,
1532+
duration_seconds=time.monotonic() - start_time,
1533+
connection=connection,
1534+
error=e,
1535+
)
14681536
raise e
14691537
finally:
14701538
if connection is not None:
14711539
redis_node.connection_pool.release(connection)
14721540

14731541
raise ClusterError("TTL exhausted.")
14741542

1543+
def _emit_after_command_execution_event(
1544+
self,
1545+
command_name: str,
1546+
duration_seconds: float,
1547+
connection: Connection,
1548+
error=None
1549+
):
1550+
"""
1551+
Triggers AfterCommandExecutionEvent emit.
1552+
"""
1553+
self._event_dispatcher.dispatch(
1554+
AfterCommandExecutionEvent(
1555+
command_name=command_name,
1556+
duration_seconds=duration_seconds,
1557+
server_address=connection.host,
1558+
server_port=connection.port,
1559+
db_namespace=str(connection.db),
1560+
error=error,
1561+
)
1562+
)
1563+
14751564
def close(self) -> None:
14761565
try:
14771566
with self._lock:
@@ -2326,6 +2415,7 @@ def __init__(
23262415
lock=None,
23272416
transaction=False,
23282417
policy_resolver: PolicyResolver = StaticPolicyResolver(),
2418+
event_dispatcher: Optional["EventDispatcher"] = None,
23292419
**kwargs,
23302420
):
23312421
""" """
@@ -2395,6 +2485,11 @@ def __init__(
23952485

23962486
self._policy_resolver = policy_resolver
23972487

2488+
if event_dispatcher is None:
2489+
self._event_dispatcher = EventDispatcher()
2490+
else:
2491+
self._event_dispatcher = event_dispatcher
2492+
23982493
def __repr__(self):
23992494
""" """
24002495
return f"{type(self).__name__}"
@@ -2889,14 +2984,24 @@ def __init__(self, pipe: ClusterPipeline):
28892984
def execute_command(self, *args, **kwargs):
28902985
return self.pipeline_execute_command(*args, **kwargs)
28912986

2892-
def _raise_first_error(self, stack):
2987+
def _raise_first_error(self, stack, start_time):
28932988
"""
28942989
Raise the first exception on the stack
28952990
"""
28962991
for c in stack:
28972992
r = c.result
28982993
if isinstance(r, Exception):
28992994
self.annotate_exception(r, c.position + 1, c.args)
2995+
2996+
self._pipe._event_dispatcher.dispatch(
2997+
AfterCommandExecutionEvent(
2998+
command_name="PIPELINE",
2999+
duration_seconds=time.monotonic() - start_time,
3000+
batch_size=len(stack),
3001+
error=r,
3002+
)
3003+
)
3004+
29003005
raise r
29013006

29023007
def execute(self, raise_on_error: bool = True) -> List[Any]:
@@ -3076,13 +3181,28 @@ def _send_cluster_commands(
30763181
# so that we can read them from different sockets as they come back.
30773182
# we dont' multiplex on the sockets as they come available,
30783183
# but that shouldn't make too much difference.
3184+
3185+
# Start timing for observability
3186+
start_time = time.monotonic()
3187+
30793188
try:
30803189
node_commands = nodes.values()
30813190
for n in node_commands:
30823191
n.write()
30833192

30843193
for n in node_commands:
30853194
n.read()
3195+
3196+
self._pipe._event_dispatcher.dispatch(
3197+
AfterCommandExecutionEvent(
3198+
command_name="PIPELINE",
3199+
duration_seconds=time.monotonic() - start_time,
3200+
server_address=n.connection.host,
3201+
server_port=n.connection.port,
3202+
db_namespace=str(n.connection.db),
3203+
batch_size=len(n.commands),
3204+
)
3205+
)
30863206
finally:
30873207
# release all of the redis connections we allocated earlier
30883208
# back into the connection pool.
@@ -3168,7 +3288,7 @@ def _send_cluster_commands(
31683288
response.append(c.result)
31693289

31703290
if raise_on_error:
3171-
self._raise_first_error(stack)
3291+
self._raise_first_error(stack, start_time)
31723292

31733293
return response
31743294

@@ -3373,9 +3493,38 @@ def _immediate_execute_command(self, *args, **options):
33733493

33743494
def _get_connection_and_send_command(self, *args, **options):
33753495
redis_node, connection = self._get_client_and_connection_for_transaction()
3376-
return self._send_command_parse_response(
3377-
connection, redis_node, args[0], *args, **options
3378-
)
3496+
3497+
# Start timing for observability
3498+
start_time = time.monotonic()
3499+
3500+
try:
3501+
response = self._send_command_parse_response(
3502+
connection, redis_node, args[0], *args, **options
3503+
)
3504+
3505+
self._pipe._event_dispatcher.dispatch(
3506+
AfterCommandExecutionEvent(
3507+
command_name=args[0],
3508+
duration_seconds=time.monotonic() - start_time,
3509+
server_address=connection.host,
3510+
server_port=connection.port,
3511+
db_namespace=str(connection.db),
3512+
)
3513+
)
3514+
3515+
return response
3516+
except Exception as e:
3517+
self._pipe._event_dispatcher.dispatch(
3518+
AfterCommandExecutionEvent(
3519+
command_name=args[0],
3520+
duration_seconds=time.monotonic() - start_time,
3521+
server_address=connection.host,
3522+
server_port=connection.port,
3523+
db_namespace=str(connection.db),
3524+
error=e,
3525+
)
3526+
)
3527+
raise
33793528

33803529
def _send_command_parse_response(
33813530
self, conn, redis_node: Redis, command_name, *args, **options
@@ -3413,13 +3562,24 @@ def _reinitialize_on_error(self, error):
34133562

34143563
self._executing = False
34153564

3416-
def _raise_first_error(self, responses, stack):
3565+
def _raise_first_error(self, responses, stack, start_time):
34173566
"""
34183567
Raise the first exception on the stack
34193568
"""
34203569
for r, cmd in zip(responses, stack):
34213570
if isinstance(r, Exception):
34223571
self.annotate_exception(r, cmd.position + 1, cmd.args)
3572+
3573+
self._pipe._event_dispatcher.dispatch(
3574+
AfterCommandExecutionEvent(
3575+
command_name='TRANSACTION',
3576+
duration_seconds=time.monotonic() - start_time,
3577+
server_address=self._transaction_connection.host,
3578+
server_port=self._transaction_connection.port,
3579+
db_namespace=str(self._transaction_connection.db),
3580+
)
3581+
)
3582+
34233583
raise r
34243584

34253585
def execute(self, raise_on_error: bool = True) -> List[Any]:
@@ -3456,6 +3616,10 @@ def _execute_transaction(
34563616
)
34573617
commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
34583618
packed_commands = connection.pack_commands(commands)
3619+
3620+
# Start timing for observability
3621+
start_time = time.monotonic()
3622+
34593623
connection.send_packed_command(packed_commands)
34603624
errors = []
34613625

@@ -3500,6 +3664,17 @@ def _execute_transaction(
35003664

35013665
self._executing = False
35023666

3667+
self._pipe._event_dispatcher.dispatch(
3668+
AfterCommandExecutionEvent(
3669+
command_name='TRANSACTION',
3670+
duration_seconds=time.monotonic() - start_time,
3671+
server_address=connection.host,
3672+
server_port=connection.port,
3673+
db_namespace=str(connection.db),
3674+
batch_size=len(self._command_queue),
3675+
)
3676+
)
3677+
35033678
# EXEC clears any watched keys
35043679
self._watching = False
35053680

@@ -3523,6 +3698,7 @@ def _execute_transaction(
35233698
self._raise_first_error(
35243699
response,
35253700
self._command_queue,
3701+
start_time,
35263702
)
35273703

35283704
# We have to run response callbacks manually

redis/event.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ class AfterCommandExecutionEvent:
311311
error: Optional[Exception] = None
312312
is_blocking: Optional[bool] = None
313313
batch_size: Optional[int] = None
314+
retry_attempts: Optional[int] = None
314315

315316
class AsyncOnCommandsFailEvent(OnCommandsFailEvent):
316317
pass
@@ -497,4 +498,5 @@ def listen(self, event: AfterCommandExecutionEvent):
497498
error=event.error,
498499
is_blocking=event.is_blocking,
499500
batch_size=event.batch_size,
501+
retry_attempts=event.retry_attempts,
500502
)

redis/observability/recorder.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ def record_operation_duration(
3939
error: Optional[Exception] = None,
4040
is_blocking: Optional[bool] = None,
4141
batch_size: Optional[int] = None,
42+
retry_attempts: Optional[int] = None,
4243
) -> None:
4344
"""
4445
Record a Redis command execution duration.
@@ -55,6 +56,7 @@ def record_operation_duration(
5556
error: Exception if command failed, None if successful
5657
is_blocking: Whether the operation is a blocking command
5758
batch_size: Number of commands in batch (for pipelines/transactions)
59+
retry_attempts: Number of retry attempts made
5860
5961
Example:
6062
>>> start = time.monotonic()
@@ -89,6 +91,7 @@ def record_operation_duration(
8991
network_peer_port=server_port,
9092
is_blocking=is_blocking,
9193
batch_size=batch_size,
94+
retry_attempts=retry_attempts,
9295
)
9396
# except Exception:
9497
# # Don't let metric recording errors break Redis operations

0 commit comments

Comments
 (0)