@@ -1051,3 +1051,60 @@ async def test_hiredis(df_factory):
10511051 server .start ()
10521052 client = base_redis .Redis (port = server .port , protocol = 3 , cache_config = CacheConfig ())
10531053 client .ping ()
1054+
1055+
1056+ @dfly_args ({"proactor_threads" : 1 })
1057+ async def test_pipeline_cache_size (df_factory ):
1058+ server = df_factory .create (proactor_threads = 1 )
1059+ server .start ()
1060+
1061+ # Start 1 client.
1062+ good_client = server .client ()
1063+
1064+ await good_client .execute_command ("set foo bar" )
1065+
1066+ bad_actor_client = server .client ()
1067+
1068+ info = await bad_actor_client .info ()
1069+
1070+ # Cache is empty.
1071+ assert info ["pipeline_cache_bytes" ] == 0
1072+ assert info ["dispatch_queue_bytes" ] == 0
1073+
1074+ async def push_pipeline (bad_actor_client ):
1075+ # Fill cache.
1076+ p = bad_actor_client .pipeline (transaction = True )
1077+ for i in range (1 ):
1078+ p .lpush (str (i ), "V" )
1079+ await p .execute ()
1080+
1081+ await push_pipeline (bad_actor_client )
1082+ info = await good_client .info ()
1083+
1084+ old_pipeline_cache_bytes = info ["pipeline_cache_bytes" ]
1085+ assert old_pipeline_cache_bytes > 0
1086+ assert info ["dispatch_queue_bytes" ] == 0
1087+
1088+ # Whoops, total pipeline_cache_bytes haven't changed. If a workload aggregates a bunch
1089+ # pipeline_cache_bytes because it recycled too many messages, they won't gradually be released
1090+ # if one command (one connection out of `n` connections) dispatches async. Only 1 command out of
1091+ # n connections must be dispatched async and the pipeline won't gradually be relesed.
1092+ for i in range (30 ):
1093+ await push_pipeline (bad_actor_client )
1094+ await good_client .execute_command (f"set foo{ i } bar" )
1095+
1096+ info = await good_client .info ()
1097+
1098+ # Pipeline cache bytes remained constant :(
1099+ assert old_pipeline_cache_bytes == info ["pipeline_cache_bytes" ]
1100+ assert info ["dispatch_queue_bytes" ] == 0
1101+
1102+ # Now drain it
1103+ for i in range (30 ):
1104+ await good_client .execute_command (f"set foo{ i } bar" )
1105+
1106+ info = await good_client .info ()
1107+
1108+ # Drained
1109+ assert info ["pipeline_cache_bytes" ] == 0
1110+ assert info ["dispatch_queue_bytes" ] == 0
0 commit comments