 import os
 import pathlib
 import re
-
+import datetime as dt
 import redis
-from redisbench_admin.run.metrics import collect_redis_metrics
-from redisbench_admin.run_remote.run_remote import export_redis_metrics
 
 from redis_benchmarks_specification.__common__.timeseries import (
     timeseries_test_sucess_flow,
+    push_data_to_redistimeseries,
+    get_project_ts_tags,
 )
 
 
 def execute_init_commands(benchmark_config, r, dbconfig_keyname="dbconfig"):
     cmds = None
+    lua_scripts = None
     res = 0
     if dbconfig_keyname in benchmark_config:
         for k, v in benchmark_config[dbconfig_keyname].items():
             if "init_commands" in k:
                 cmds = v
+            elif "init_lua" in k:
+                lua_scripts = v
 
     if type(cmds) == str:
         cmds = [cmds]
@@ -37,7 +40,7 @@ def execute_init_commands(benchmark_config, r, dbconfig_keyname="dbconfig"):
                 quoting=csv.QUOTE_ALL,
                 skipinitialspace=True,
             ):
-                if lines[0] != " " and len(lines[0]) > 0:
+                if len(lines) > 0 and lines[0] != " " and len(lines[0]) > 0:
                     cols.append(lines[0])
             cmd = cols
             is_array = True
@@ -57,6 +60,21 @@ def execute_init_commands(benchmark_config, r, dbconfig_keyname="dbconfig"):
                 )
             )
 
+    # Process init_lua scripts
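+    # Scripts are sent as-is via EVAL with numkeys=0, so KEYS[] is empty and any
+    # keys must be referenced directly in the Lua body; failures are logged and
+    # skipped so a bad script does not abort the remaining scripts.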
+    if lua_scripts is not None:
+        if type(lua_scripts) == str:
+            lua_scripts = [lua_scripts]
+
+        for lua_script in lua_scripts:
+            try:
+                logging.info(
+                    "Executing Lua script (length: {} chars)".format(len(lua_script))
+                )
+                # Execute the Lua script using EVAL command with 0 keys
+                stdout = r.execute_command("EVAL", lua_script, 0)
+                logging.info("Lua script result: {}".format(stdout))
+                res = res + 1
+            except Exception as e:
+                logging.error("Lua script execution failed: {}".format(e))
+
     return res
 
 
@@ -119,6 +137,210 @@ def extract_testsuites(args):
     return testsuite_spec_files
 
 
+
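+# Derive per-command labels from a flattened commandstats/latencystats metric
+# name. For example, with prefix "commandstats_cmdstat_", the metric name
+# "commandstats_cmdstat_get_calls_shard_2" yields command="get", metric="calls"
+# and shard="2" (shard defaults to "1" when no "_shard_" suffix is present).
+# The derived labels are written into variant_labels_dict in place.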
+def commandstats_latencystats_process_name(
+    metric_name, prefix, setup_name, variant_labels_dict
+):
+    if prefix in metric_name:
+        command_and_metric_and_shard = metric_name[len(prefix) :]
+        command = (
+            command_and_metric_and_shard[0]
+            + command_and_metric_and_shard[1:].split("_", 1)[0]
+        )
+        metric_and_shard = command_and_metric_and_shard[1:].split("_", 1)[1]
+        metric = metric_and_shard
+        shard = "1"
+        if "_shard_" in metric_and_shard:
+            metric = metric_and_shard.split("_shard_")[0]
+            shard = metric_and_shard.split("_shard_")[1]
+        variant_labels_dict["metric"] = metric
+        variant_labels_dict["command"] = command
+        variant_labels_dict["command_and_metric"] = "{} - {}".format(command, metric)
+        variant_labels_dict["command_and_metric_and_setup"] = "{} - {} - {}".format(
+            command, metric, setup_name
+        )
+        variant_labels_dict["command_and_setup"] = "{} - {}".format(command, setup_name)
+        variant_labels_dict["shard"] = shard
+        variant_labels_dict["metric_and_shard"] = metric_and_shard
+
+        version = None
+        branch = None
+        if "version" in variant_labels_dict:
+            version = variant_labels_dict["version"]
+        if "branch" in variant_labels_dict:
+            branch = variant_labels_dict["branch"]
+
+        if version is not None:
+            variant_labels_dict["command_and_metric_and_version"] = (
+                "{} - {} - {}".format(command, metric, version)
+            )
+            variant_labels_dict["command_and_metric_and_setup_and_version"] = (
+                "{} - {} - {} - {}".format(command, metric, setup_name, version)
+            )
+
+        if branch is not None:
+            variant_labels_dict["command_and_metric_and_branch"] = (
+                "{} - {} - {}".format(command, metric, branch)
+            )
+            variant_labels_dict["command_and_metric_and_setup_and_branch"] = (
+                "{} - {} - {} - {}".format(command, metric, setup_name, branch)
+            )
+
+
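+# Collect INFO for the requested sections from every Redis connection.
+# Top-level numeric fields are summed across shards; nested dictionaries
+# (e.g. commandstats entries) are flattened to "<key>_<field>" and, when more
+# than one connection is passed, suffixed with "_shard_<n>". Returns the
+# collection timestamp in ms, the raw per-connection INFO replies, and a
+# flattened "<section>_<key>" -> value dictionary.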
+def collect_redis_metrics(
+    redis_conns,
+    sections=["memory", "cpu", "commandstats", "latencystats"],
+    section_filter=None,
+):
+    start_time = dt.datetime.utcnow()
+    start_time_ms = int((start_time - dt.datetime(1970, 1, 1)).total_seconds() * 1000)
+    res = []
+    overall = {}
+    multi_shard = False
+    if len(redis_conns) > 1:
+        multi_shard = True
+    for conn_n, conn in enumerate(redis_conns):
+        conn_res = {}
+        for section in sections:
+            info = conn.info(section)
+            conn_res[section] = info
+            if section not in overall:
+                overall[section] = {}
+            for k, v in info.items():
+                collect = True
+                if section_filter is not None:
+                    if section in section_filter:
+                        if k not in section_filter[section]:
+                            collect = False
+                if collect and (type(v) is float or type(v) is int):
+                    if k not in overall[section]:
+                        overall[section][k] = 0
+                    overall[section][k] += v
+                if collect and type(v) is dict:
+                    for inner_k, inner_v in v.items():
+                        if type(inner_v) is float or type(inner_v) is int:
+                            final_str_k = "{}_{}".format(k, inner_k)
+                            if multi_shard:
+                                final_str_k += "_shard_{}".format(conn_n + 1)
+                            if final_str_k not in overall[section]:
+                                overall[section][final_str_k] = inner_v
+
+        res.append(conn_res)
+
+    kv_overall = {}
+    for sec, kv_detail in overall.items():
+        for k, metric_value in kv_detail.items():
+            metric_name = "{}_{}".format(sec, k)
+            kv_overall[metric_name] = metric_value
+
+    return start_time_ms, res, kv_overall
+
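+
+# Export the end-of-benchmark server-side metrics to RedisTimeSeries. One time
+# series is created per (variant, metric) pair, keyed as
+# ci.benchmarks.redis/<triggering_env>/<org>/<repo>/<test_name>/<variant>/benchmark_end/<setup_name>/<metric_name>
+# and labelled via get_project_ts_tags(). Returns the number of datapoint
+# errors and inserts reported by push_data_to_redistimeseries().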
+def export_redis_metrics(
+    artifact_version,
+    end_time_ms,
+    overall_end_time_metrics,
+    rts,
+    setup_name,
+    setup_type,
+    test_name,
+    tf_github_branch,
+    tf_github_org,
+    tf_github_repo,
+    tf_triggering_env,
+    metadata_dict=None,
+    expire_ms=0,
+    git_hash=None,
+    running_platform=None,
+):
+    datapoint_errors = 0
+    datapoint_inserts = 0
+    sprefix = (
+        "ci.benchmarks.redis/"
+        + "{triggering_env}/{github_org}/{github_repo}".format(
+            triggering_env=tf_triggering_env,
+            github_org=tf_github_org,
+            github_repo=tf_github_repo,
+        )
+    )
+    logging.info(
+        "Adding a total of {} server side metrics collected at the end of benchmark".format(
+            len(list(overall_end_time_metrics.items()))
+        )
+    )
+    timeseries_dict = {}
+    by_variants = {}
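+    # Each metric is exported once per available variant (branch, git hash,
+    # version) so the resulting series can be filtered by any of them.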
+    if tf_github_branch is not None and tf_github_branch != "":
+        by_variants["by.branch/{}".format(tf_github_branch)] = {
+            "branch": tf_github_branch
+        }
+    if git_hash is not None and git_hash != "":
+        by_variants["by.hash/{}".format(git_hash)] = {
+            "hash": git_hash
+        }
+    if artifact_version is not None and artifact_version != "":
+        by_variants["by.version/{}".format(artifact_version)] = {
+            "version": artifact_version
+        }
+    for (
+        by_variant,
+        variant_labels_dict,
+    ) in by_variants.items():
+        for (
+            metric_name,
+            metric_value,
+        ) in overall_end_time_metrics.items():
+            tsname_metric = "{}/{}/{}/benchmark_end/{}/{}".format(
+                sprefix,
+                test_name,
+                by_variant,
+                setup_name,
+                metric_name,
+            )
+
+            logging.debug(
+                "Adding a redis server side metric collected at the end of benchmark."
+                + " metric_name={} metric_value={} time-series name: {}".format(
+                    metric_name,
+                    metric_value,
+                    tsname_metric,
+                )
+            )
+            variant_labels_dict["metric"] = metric_name
+            commandstats_latencystats_process_name(
+                metric_name, "commandstats_cmdstat_", setup_name, variant_labels_dict
+            )
+            commandstats_latencystats_process_name(
+                metric_name,
+                "latencystats_latency_percentiles_usec_",
+                setup_name,
+                variant_labels_dict,
+            )
+
+            variant_labels_dict["test_name"] = test_name
+            if metadata_dict is not None:
+                variant_labels_dict.update(metadata_dict)
+
+            timeseries_dict[tsname_metric] = {
+                "labels": get_project_ts_tags(
+                    tf_github_org,
+                    tf_github_repo,
+                    setup_name,
+                    setup_type,
+                    tf_triggering_env,
+                    variant_labels_dict,
+                    None,
+                    running_platform,
+                ),
+                "data": {end_time_ms: metric_value},
+            }
+    i_errors, i_inserts = push_data_to_redistimeseries(rts, timeseries_dict, expire_ms)
+    datapoint_errors = datapoint_errors + i_errors
+    datapoint_inserts = datapoint_inserts + i_inserts
+    return datapoint_errors, datapoint_inserts
+
+
+
+
 def reset_commandstats(redis_conns):
     for pos, redis_conn in enumerate(redis_conns):
         logging.info("Resetting commandstats for shard {}".format(pos))
@@ -248,13 +470,16 @@ def exporter_datasink_common(
     deployment_type_and_name = f"{setup_type}_AND_{setup_name}"
     deployment_type_and_name_and_version = f"{setup_type}_AND_{setup_name}_AND_{git_version}"
 
-    # Add to deployment-specific set
-    deployment_set_key = f"ci.benchmarks.redis/{tf_triggering_env}/{deployment_type_and_name_and_version}:set"
-    datasink_conn.sadd(deployment_set_key, test_name)
+    # Add to deployment-specific set (only if datasink connection is available)
+    if datasink_conn is not None:
+        deployment_set_key = f"ci.benchmarks.redis/{tf_triggering_env}/{deployment_type_and_name_and_version}:set"
+        datasink_conn.sadd(deployment_set_key, test_name)
 
-    # Add to testcases set
-    testcases_set_key = f"ci.benchmarks.redis/{tf_triggering_env}/testcases:set"
-    datasink_conn.sadd(testcases_set_key, test_name)
+        # Add to testcases set
+        testcases_set_key = f"ci.benchmarks.redis/{tf_triggering_env}/testcases:set"
+        datasink_conn.sadd(testcases_set_key, test_name)
+    else:
+        logging.debug("Datasink connection not available, skipping set operations")
 
     # Add metadata fields to timeseries metadata
     metadata["deployment_type_AND_deployment_name"] = deployment_type_and_name