@@ -1051,3 +1051,49 @@ async def test_hiredis(df_factory):
     server.start()
     client = base_redis.Redis(port=server.port, protocol=3, cache_config=CacheConfig())
     client.ping()
+
+
+@dfly_args({"proactor_threads": 1})
+async def test_pipeline_cache_size(df_factory):
+    server = df_factory.create(proactor_threads=1)
+    server.start()
+
+    # Start 1 client.
+    good_client = server.client()
+
+    await good_client.execute_command("set foo bar")
+
+    bad_actor_client = server.client()
+
+    info = await bad_actor_client.info()
+
+    # Cache is empty.
+    assert info["pipeline_cache_bytes"] == 0
+    assert info["dispatch_queue_bytes"] == 0
+
+    async def push_pipeline(bad_actor_client):
+        # Fill the cache.
+        p = bad_actor_client.pipeline(transaction=True)
+        for i in range(1):
+            p.lpush(str(i), "V")
+        await p.execute()
+
+    await push_pipeline(bad_actor_client)
+    info = await good_client.info()
+
+    old_pipeline_cache_bytes = info["pipeline_cache_bytes"]
+    assert old_pipeline_cache_bytes > 0
+    assert info["dispatch_queue_bytes"] == 0
+
+    # Whoops, total pipeline_cache_bytes hasn't changed. If a workload accumulates a lot of
+    # pipeline_cache_bytes because it recycled too many messages, those bytes won't gradually
+    # be released while even one command (one connection out of `n` connections) dispatches
+    # async. A single async dispatch is enough to keep the pipeline cache from shrinking.
+    for i in range(30):
+        await push_pipeline(bad_actor_client)
+        await good_client.execute_command(f"set foo{i} bar")
+
+    info = await good_client.info()
+
+    # Pipeline cache bytes remained constant :(
+    assert old_pipeline_cache_bytes == info["pipeline_cache_bytes"]
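For context, here is a minimal standalone sketch of the behavior the new test exercises: one "bad actor" connection recycles reply messages into the pipeline cache while a second connection observes the `pipeline_cache_bytes` and `dispatch_queue_bytes` INFO counters. This is illustrative only and assumes a Dragonfly server already listening on localhost:6379 and redis-py 5+; the client setup and prints are not part of the test suite, which instead uses the `df_factory` fixture.

```python
import asyncio

import redis.asyncio as redis


async def main():
    # One connection fills the pipeline cache; the other reads INFO,
    # mirroring bad_actor_client / good_client in the test above.
    bad_actor = redis.Redis(port=6379)
    observer = redis.Redis(port=6379)

    info = await observer.info()
    print("before:", info["pipeline_cache_bytes"], info["dispatch_queue_bytes"])

    # Same shape as the test's push_pipeline(): a tiny transactional
    # pipeline whose reply messages get recycled into the cache.
    p = bad_actor.pipeline(transaction=True)
    p.lpush("0", "V")
    await p.execute()

    info = await observer.info()
    print("after:", info["pipeline_cache_bytes"], info["dispatch_queue_bytes"])

    await bad_actor.aclose()
    await observer.aclose()


asyncio.run(main())
```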