Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 99 additions & 10 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
AfterPubSubConnectionInstantiationEvent,
AfterSingleConnectionInstantiationEvent,
ClientType,
EventDispatcher,
EventDispatcher, AfterCommandExecutionEvent,
)
from redis.exceptions import (
ConnectionError,
Expand Down Expand Up @@ -478,7 +478,8 @@ def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
between the client and server.
"""
return Pipeline(
self.connection_pool, self.response_callbacks, transaction, shard_hint
self.connection_pool, self.response_callbacks, transaction, shard_hint,
event_dispatcher=self._event_dispatcher
)

def transaction(
Expand Down Expand Up @@ -662,16 +663,42 @@ def _execute_command(self, *args, **options):
command_name = args[0]
conn = self.connection or pool.get_connection()

# Start timing for observability
start_time = time.monotonic()

if self._single_connection_client:
self.single_connection_lock.acquire()
try:
return conn.retry.call_with_retry(
result = conn.retry.call_with_retry(
lambda: self._send_command_parse_response(
conn, command_name, *args, **options
),
lambda _: self._close_connection(conn),
)

self._event_dispatcher.dispatch(
AfterCommandExecutionEvent(
command_name=command_name,
duration_seconds=time.monotonic() - start_time,
server_address=conn.host,
server_port=conn.port,
db_namespace=str(conn.db),
)
)
return result
except Exception as e:
self._event_dispatcher.dispatch(
AfterCommandExecutionEvent(
command_name=command_name,
duration_seconds=time.monotonic() - start_time,
server_address=conn.host,
server_port=conn.port,
db_namespace=str(conn.db),
error=e,
)
)
raise

finally:
if conn and conn.should_reconnect():
self._close_connection(conn)
Expand Down Expand Up @@ -1385,6 +1412,7 @@ def __init__(
response_callbacks,
transaction,
shard_hint,
event_dispatcher: EventDispatcher
):
self.connection_pool = connection_pool
self.connection: Optional[Connection] = None
Expand All @@ -1395,6 +1423,7 @@ def __init__(
self.command_stack = []
self.scripts: Set[Script] = set()
self.explicit_transaction = False
self._event_dispatcher = event_dispatcher

def __enter__(self) -> "Pipeline":
return self
Expand Down Expand Up @@ -1501,12 +1530,41 @@ def immediate_execute_command(self, *args, **options):
conn = self.connection_pool.get_connection()
self.connection = conn

return conn.retry.call_with_retry(
lambda: self._send_command_parse_response(
conn, command_name, *args, **options
),
lambda error: self._disconnect_reset_raise_on_watching(conn, error),
)
# Start timing for observability
start_time = time.monotonic()

try:
response = conn.retry.call_with_retry(
lambda: self._send_command_parse_response(
conn, command_name, *args, **options
),
lambda error: self._disconnect_reset_raise_on_watching(conn, error),
)

self._event_dispatcher.dispatch(
AfterCommandExecutionEvent(
command_name=command_name,
duration_seconds=time.monotonic() - start_time,
server_address=conn.host,
server_port=conn.port,
db_namespace=str(conn.db),
)
)

return response
except Exception as e:
self._event_dispatcher.dispatch(
AfterCommandExecutionEvent(
command_name=command_name,
duration_seconds=time.monotonic() - start_time,
server_address=conn.host,
server_port=conn.port,
db_namespace=str(conn.db),
error=e,
)
)
raise


def pipeline_execute_command(self, *args, **options) -> "Pipeline":
"""
Expand Down Expand Up @@ -1679,8 +1737,10 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
self.load_scripts()
if self.transaction or self.explicit_transaction:
execute = self._execute_transaction
operation_name = "MULTI"
else:
execute = self._execute_pipeline
operation_name = "PIPELINE"

conn = self.connection
if not conn:
Expand All @@ -1689,11 +1749,40 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
# back to the pool after we're done
self.connection = conn

# Start timing for observability
start_time = time.monotonic()

try:
return conn.retry.call_with_retry(
response = conn.retry.call_with_retry(
lambda: execute(conn, stack, raise_on_error),
lambda error: self._disconnect_raise_on_watching(conn, error),
)

self._event_dispatcher.dispatch(
AfterCommandExecutionEvent(
command_name=operation_name,
duration_seconds=time.monotonic() - start_time,
server_address=conn.host,
server_port=conn.port,
db_namespace=str(conn.db),
batch_size=len(stack),
)
)
return response
except Exception as e:
self._event_dispatcher.dispatch(
AfterCommandExecutionEvent(
command_name=operation_name,
duration_seconds=time.monotonic() - start_time,
server_address=conn.host,
server_port=conn.port,
db_namespace=str(conn.db),
error=e,
batch_size=len(stack),
)
)
raise

finally:
# in reset() the connection is disconnected before returned to the pool if
# it is marked for reconnect.
Expand Down
Loading
Loading