Skip to content

Commit 8c5d1ad

Browse files
committed
Added stadalone client metrics export
1 parent f1a33dd commit 8c5d1ad

File tree

9 files changed

+943
-43
lines changed

9 files changed

+943
-43
lines changed

redis/client.py

Lines changed: 99 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
AfterPubSubConnectionInstantiationEvent,
4646
AfterSingleConnectionInstantiationEvent,
4747
ClientType,
48-
EventDispatcher,
48+
EventDispatcher, AfterCommandExecutionEvent,
4949
)
5050
from redis.exceptions import (
5151
ConnectionError,
@@ -478,7 +478,8 @@ def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
478478
between the client and server.
479479
"""
480480
return Pipeline(
481-
self.connection_pool, self.response_callbacks, transaction, shard_hint
481+
self.connection_pool, self.response_callbacks, transaction, shard_hint,
482+
event_dispatcher=self._event_dispatcher
482483
)
483484

484485
def transaction(
@@ -662,16 +663,42 @@ def _execute_command(self, *args, **options):
662663
command_name = args[0]
663664
conn = self.connection or pool.get_connection()
664665

666+
# Start timing for observability
667+
start_time = time.monotonic()
668+
665669
if self._single_connection_client:
666670
self.single_connection_lock.acquire()
667671
try:
668-
return conn.retry.call_with_retry(
672+
result = conn.retry.call_with_retry(
669673
lambda: self._send_command_parse_response(
670674
conn, command_name, *args, **options
671675
),
672676
lambda _: self._close_connection(conn),
673677
)
674678

679+
self._event_dispatcher.dispatch(
680+
AfterCommandExecutionEvent(
681+
command_name=command_name,
682+
duration_seconds=time.monotonic() - start_time,
683+
server_address=conn.host,
684+
server_port=conn.port,
685+
db_namespace=str(conn.db),
686+
)
687+
)
688+
return result
689+
except Exception as e:
690+
self._event_dispatcher.dispatch(
691+
AfterCommandExecutionEvent(
692+
command_name=command_name,
693+
duration_seconds=time.monotonic() - start_time,
694+
server_address=conn.host,
695+
server_port=conn.port,
696+
db_namespace=str(conn.db),
697+
error=e,
698+
)
699+
)
700+
raise
701+
675702
finally:
676703
if conn and conn.should_reconnect():
677704
self._close_connection(conn)
@@ -1385,6 +1412,7 @@ def __init__(
13851412
response_callbacks,
13861413
transaction,
13871414
shard_hint,
1415+
event_dispatcher: EventDispatcher
13881416
):
13891417
self.connection_pool = connection_pool
13901418
self.connection: Optional[Connection] = None
@@ -1395,6 +1423,7 @@ def __init__(
13951423
self.command_stack = []
13961424
self.scripts: Set[Script] = set()
13971425
self.explicit_transaction = False
1426+
self._event_dispatcher = event_dispatcher
13981427

13991428
def __enter__(self) -> "Pipeline":
14001429
return self
@@ -1501,12 +1530,41 @@ def immediate_execute_command(self, *args, **options):
15011530
conn = self.connection_pool.get_connection()
15021531
self.connection = conn
15031532

1504-
return conn.retry.call_with_retry(
1505-
lambda: self._send_command_parse_response(
1506-
conn, command_name, *args, **options
1507-
),
1508-
lambda error: self._disconnect_reset_raise_on_watching(conn, error),
1509-
)
1533+
# Start timing for observability
1534+
start_time = time.monotonic()
1535+
1536+
try:
1537+
response = conn.retry.call_with_retry(
1538+
lambda: self._send_command_parse_response(
1539+
conn, command_name, *args, **options
1540+
),
1541+
lambda error: self._disconnect_reset_raise_on_watching(conn, error),
1542+
)
1543+
1544+
self._event_dispatcher.dispatch(
1545+
AfterCommandExecutionEvent(
1546+
command_name=command_name,
1547+
duration_seconds=time.monotonic() - start_time,
1548+
server_address=conn.host,
1549+
server_port=conn.port,
1550+
db_namespace=str(conn.db),
1551+
)
1552+
)
1553+
1554+
return response
1555+
except Exception as e:
1556+
self._event_dispatcher.dispatch(
1557+
AfterCommandExecutionEvent(
1558+
command_name=command_name,
1559+
duration_seconds=time.monotonic() - start_time,
1560+
server_address=conn.host,
1561+
server_port=conn.port,
1562+
db_namespace=str(conn.db),
1563+
error=e,
1564+
)
1565+
)
1566+
raise
1567+
15101568

15111569
def pipeline_execute_command(self, *args, **options) -> "Pipeline":
15121570
"""
@@ -1679,8 +1737,10 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
16791737
self.load_scripts()
16801738
if self.transaction or self.explicit_transaction:
16811739
execute = self._execute_transaction
1740+
operation_name = "MULTI"
16821741
else:
16831742
execute = self._execute_pipeline
1743+
operation_name = "PIPELINE"
16841744

16851745
conn = self.connection
16861746
if not conn:
@@ -1689,11 +1749,40 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
16891749
# back to the pool after we're done
16901750
self.connection = conn
16911751

1752+
# Start timing for observability
1753+
start_time = time.monotonic()
1754+
16921755
try:
1693-
return conn.retry.call_with_retry(
1756+
response = conn.retry.call_with_retry(
16941757
lambda: execute(conn, stack, raise_on_error),
16951758
lambda error: self._disconnect_raise_on_watching(conn, error),
16961759
)
1760+
1761+
self._event_dispatcher.dispatch(
1762+
AfterCommandExecutionEvent(
1763+
command_name=operation_name,
1764+
duration_seconds=time.monotonic() - start_time,
1765+
server_address=conn.host,
1766+
server_port=conn.port,
1767+
db_namespace=str(conn.db),
1768+
batch_size=len(stack),
1769+
)
1770+
)
1771+
return response
1772+
except Exception as e:
1773+
self._event_dispatcher.dispatch(
1774+
AfterCommandExecutionEvent(
1775+
command_name=operation_name,
1776+
duration_seconds=time.monotonic() - start_time,
1777+
server_address=conn.host,
1778+
server_port=conn.port,
1779+
db_namespace=str(conn.db),
1780+
error=e,
1781+
batch_size=len(stack),
1782+
)
1783+
)
1784+
raise
1785+
16971786
finally:
16981787
# in reset() the connection is disconnected before returned to the pool if
16991788
# it is marked for reconnect.

redis/event.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import asyncio
22
import threading
33
from abc import ABC, abstractmethod
4+
from dataclasses import dataclass
45
from enum import Enum
56
from typing import Dict, List, Optional, Type, Union
67

78
from redis.auth.token import TokenInterface
89
from redis.credentials import CredentialProvider, StreamingCredentialProvider
10+
from redis.observability.recorder import record_operation_duration
911

1012

1113
class EventListenerInterface(ABC):
@@ -90,6 +92,7 @@ def __init__(
9092
],
9193
AfterPubSubConnectionInstantiationEvent: [RegisterReAuthForPubSub()],
9294
AfterAsyncClusterInstantiationEvent: [RegisterReAuthForAsyncClusterNodes()],
95+
AfterCommandExecutionEvent: [ExportOperationDurationMetric()],
9396
AsyncAfterConnectionReleasedEvent: [
9497
AsyncReAuthConnectionListener(),
9598
],
@@ -295,6 +298,19 @@ def commands(self) -> tuple:
295298
def exception(self) -> Exception:
296299
return self._exception
297300

301+
@dataclass
302+
class AfterCommandExecutionEvent:
303+
"""
304+
Event fired after command execution.
305+
"""
306+
command_name: str
307+
duration_seconds: float
308+
server_address: Optional[str] = None
309+
server_port: Optional[int] = None
310+
db_namespace: Optional[str] = None
311+
error: Optional[Exception] = None
312+
is_blocking: Optional[bool] = None
313+
batch_size: Optional[int] = None
298314

299315
class AsyncOnCommandsFailEvent(OnCommandsFailEvent):
300316
pass
@@ -466,3 +482,19 @@ def _raise_on_error(self, error: Exception):
466482

467483
async def _raise_on_error_async(self, error: Exception):
468484
raise EventException(error, self._event)
485+
486+
class ExportOperationDurationMetric(EventListenerInterface):
487+
"""
488+
Listener that exports operation duration metric after command execution.
489+
"""
490+
def listen(self, event: AfterCommandExecutionEvent):
491+
record_operation_duration(
492+
command_name=event.command_name,
493+
duration_seconds=event.duration_seconds,
494+
server_address=event.server_address,
495+
server_port=event.server_port,
496+
db_namespace=event.db_namespace,
497+
error=event.error,
498+
is_blocking=event.is_blocking,
499+
batch_size=event.batch_size,
500+
)

redis/exceptions.py

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,52 @@
1+
from enum import Enum
2+
13
"Core exceptions raised by the Redis client"
24

35

6+
class ExceptionType(Enum):
7+
NETWORK = 'network'
8+
TLS = 'tls'
9+
AUTH = 'auth'
10+
SERVER = 'server'
11+
12+
413
class RedisError(Exception):
5-
pass
14+
def __init__(self, *args):
15+
super().__init__(*args)
16+
self.error_type = ExceptionType.SERVER
17+
18+
def __repr__(self):
19+
return f"{self.error_type.value}:{self.__class__.__name__}"
620

721

822
class ConnectionError(RedisError):
9-
pass
23+
def __init__(self, *args):
24+
super().__init__(*args)
25+
self.error_type = ExceptionType.NETWORK
1026

1127

1228
class TimeoutError(RedisError):
13-
pass
29+
def __init__(self, *args):
30+
super().__init__(*args)
31+
self.error_type = ExceptionType.NETWORK
1432

1533

1634
class AuthenticationError(ConnectionError):
17-
pass
35+
def __init__(self, *args):
36+
super().__init__(*args)
37+
self.error_type = ExceptionType.AUTH
1838

1939

2040
class AuthorizationError(ConnectionError):
21-
pass
41+
def __init__(self, *args):
42+
super().__init__(*args)
43+
self.error_type = ExceptionType.AUTH
2244

2345

2446
class BusyLoadingError(ConnectionError):
25-
pass
47+
def __init__(self, *args):
48+
super().__init__(*args)
49+
self.error_type = ExceptionType.NETWORK
2650

2751

2852
class InvalidResponse(RedisError):
@@ -70,7 +94,9 @@ class ReadOnlyError(ResponseError):
7094

7195

7296
class NoPermissionError(ResponseError):
73-
pass
97+
def __init__(self, *args):
98+
super().__init__(*args)
99+
self.error_type = ExceptionType.AUTH
74100

75101

76102
class ModuleError(ResponseError):
@@ -84,6 +110,7 @@ class LockError(RedisError, ValueError):
84110
# This was originally chosen to behave like threading.Lock.
85111

86112
def __init__(self, message=None, lock_name=None):
113+
super().__init__(message)
87114
self.message = message
88115
self.lock_name = lock_name
89116

@@ -106,15 +133,22 @@ class AuthenticationWrongNumberOfArgsError(ResponseError):
106133
were sent to the AUTH command
107134
"""
108135

109-
pass
136+
def __init__(self, *args):
137+
super().__init__(*args)
138+
self.error_type = ExceptionType.AUTH
110139

111140

112141
class RedisClusterException(Exception):
113142
"""
114143
Base exception for the RedisCluster client
115144
"""
116145

117-
pass
146+
def __init__(self, *args):
147+
super().__init__(*args)
148+
self.error_type = ExceptionType.SERVER
149+
150+
def __repr__(self):
151+
return f"{self.error_type.value}:{self.__class__.__name__}"
118152

119153

120154
class ClusterError(RedisError):
@@ -123,7 +157,9 @@ class ClusterError(RedisError):
123157
command execution TTL
124158
"""
125159

126-
pass
160+
def __init__(self, *args):
161+
super().__init__(*args)
162+
self.error_type = ExceptionType.SERVER
127163

128164

129165
class ClusterDownError(ClusterError, ResponseError):
@@ -140,6 +176,7 @@ class ClusterDownError(ClusterError, ResponseError):
140176
def __init__(self, resp):
141177
self.args = (resp,)
142178
self.message = resp
179+
self.error_type = ExceptionType.SERVER
143180

144181

145182
class AskError(ResponseError):
@@ -160,6 +197,7 @@ class AskError(ResponseError):
160197

161198
def __init__(self, resp):
162199
"""should only redirect to master node"""
200+
super().__init__(resp)
163201
self.args = (resp,)
164202
self.message = resp
165203
slot_id, new_node = resp.split(" ")
@@ -176,7 +214,7 @@ class TryAgainError(ResponseError):
176214
"""
177215

178216
def __init__(self, *args, **kwargs):
179-
pass
217+
super().__init__(*args)
180218

181219

182220
class ClusterCrossSlotError(ResponseError):
@@ -188,6 +226,10 @@ class ClusterCrossSlotError(ResponseError):
188226

189227
message = "Keys in request don't hash to the same slot"
190228

229+
def __init__(self, *args):
230+
super().__init__(*args)
231+
self.error_type = ExceptionType.SERVER
232+
191233

192234
class MovedError(AskError):
193235
"""

0 commit comments

Comments
 (0)