import networkx as nx
import pandas as pd
import numpy as np
import itertools
import os

from sklearn.decomposition import PCA
# Function from the SciPy library used to calculate the cosine distance between embedding vectors.
from scipy.spatial.distance import cosine
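# For reference, `cosine` returns a distance, not a similarity:
# e.g. cosine([1, 0], [1, 0]) == 0.0 (identical direction), cosine([1, 0], [0, 1]) == 1.0 (orthogonal).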

G = nx.Graph()
#=======================================================================================================================
# CONFIGURATION
#=======================================================================================================================

dataset_path = 'Dataset'
daily_subsequence_length = 96   # samples per day (assuming 15-minute sampling: 96 * 15 min = 24 h)
weekly_window_size = 672        # samples per week (7 * 96)
all_weekly_embeddings = {}
column = "total_power_max"

#=======================================================================================================================
# PHYSICAL RACK NODE CREATION (ADD MANUALLY)
#=======================================================================================================================
# Adding each rack as a rack-type node

G.add_node(33, type='rack', x=20, y=10)
G.add_node(34, type='rack', x=19, y=10)
G.add_node(35, type='rack', x=18, y=10)

G.add_node(18, type='rack', x=20, y=6)
G.add_node(19, type='rack', x=19, y=6)
G.add_node(20, type='rack', x=18, y=6)

G.add_node(0, type='rack', x=21, y=2)
G.add_node(1, type='rack', x=20, y=2)
G.add_node(2, type='rack', x=19, y=2)
G.add_node(3, type='rack', x=18, y=2)

#=======================================================================================================================
# PHYSICAL SERVER NODE CREATION
#=======================================================================================================================

# Adding each parquet file as a server-type node
# Loop through each folder in the dataset_path directory
for rack_id_str in os.listdir(dataset_path):
    rack_path = os.path.join(dataset_path, rack_id_str)

    # Make sure it's actually a folder
    if os.path.isdir(rack_path):
        # Get a list of all parquet files inside the rack folder
        parquet_files = [f for f in os.listdir(rack_path) if f.endswith('.parquet')]

        # Sort the files numerically based on their name
        parquet_files.sort(key=lambda name: int(name.split('.')[0]))

        # Loop through the sorted files to create a server node for each
        for z_index, filename in enumerate(parquet_files):
            rack_id = int(rack_id_str)
            server_id = f"{rack_id}-{z_index}"

            G.add_node(server_id, type='server', rack_id=rack_id, z_index=z_index)

#=======================================================================================================================
# PHYSICAL EDGE CREATION
#=======================================================================================================================

#-----------------------------------------------------------------------------------------------------------------------
# SERVER VERTICAL NEIGHBOUR EDGE CREATION
#-----------------------------------------------------------------------------------------------------------------------

# Adding edges between server nodes that sit directly above/below one another in the same rack,
# plus a PART_OF edge linking each server to its rack
for server_id, attributes in G.nodes(data=True):
    if attributes.get('type') == 'server':
        rack_id = attributes.get('rack_id')
        z_index = attributes.get('z_index')
        G.add_edge(server_id, rack_id, type='PART_OF')
        if z_index > 0:
            server_below_id = f"{rack_id}-{z_index - 1}"
            G.add_edge(server_id, server_below_id, type='ABOVE')

#-----------------------------------------------------------------------------------------------------------------------
# RACK ADJACENT TO RACK EDGE CREATION
#-----------------------------------------------------------------------------------------------------------------------

# Connect racks that sit next to each other in the same row (same y, x differs by 1)
rack_nodes = [node for node, attr in G.nodes(data=True) if attr.get('type') == "rack"]
for rack1, rack2 in itertools.combinations(rack_nodes, 2):
    x1, y1 = G.nodes[rack1]['x'], G.nodes[rack1]['y']
    x2, y2 = G.nodes[rack2]['x'], G.nodes[rack2]['y']
    if y1 == y2 and abs(x1 - x2) == 1:
        G.add_edge(rack1, rack2, type='ADJACENT_TO')

#-----------------------------------------------------------------------------------------------------------------------
# SAME LEVEL SERVER EDGE CREATION
#-----------------------------------------------------------------------------------------------------------------------

# Index servers by rack and height so same-level servers in adjacent racks can be linked
servers_by_rack = {}
for server_id, attr in G.nodes(data=True):
    if attr.get('type') == 'server':
        rack_id = attr.get('rack_id')
        z_index = attr.get('z_index')
        if rack_id not in servers_by_rack:
            servers_by_rack[rack_id] = {}
        servers_by_rack[rack_id][z_index] = server_id

for rack1, rack2, attr in G.edges(data=True):
    if attr.get('type') == 'ADJACENT_TO':
        servers_in_rack1 = servers_by_rack.get(rack1, {})
        servers_in_rack2 = servers_by_rack.get(rack2, {})
        common_z_indexes = set(servers_in_rack1.keys()) & set(servers_in_rack2.keys())
        for z in common_z_indexes:
            G.add_edge(servers_in_rack1[z], servers_in_rack2[z], type='SAME_LEVEL_AS')
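
# Optional sanity check of the physical topology (illustrative only, not part of the pipeline).
# Uncomment to print how many nodes and edges of each type were created.
# from collections import Counter
# print(Counter(attr.get('type') for _, attr in G.nodes(data=True)))
# print(Counter(attr.get('type') for _, _, attr in G.edges(data=True)))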

#=======================================================================================================================
# TIME SERIES DATA
#=======================================================================================================================

def generate_embedding_from_series(series_data, subsequence_length):
    # Turns one window of a time series into a single 2-element embedding vector
    if len(series_data) < subsequence_length:
        return None

    length = subsequence_length
    convolution_size = length // 3
    if convolution_size == 0:
        return None

    # Build the first projection vector: moving-window sums over the first subsequence
    initial_P = []
    first_subseq = series_data[:length]
    for j in range(length - convolution_size + 1):
        window_sum = np.sum(first_subseq[j:j + convolution_size])
        initial_P.append(window_sum)
    initial_P = np.array(initial_P)

    # Slide the subsequence across the series, updating the projection vector incrementally
    Proj = [initial_P]
    for i in range(1, len(series_data) - length + 1):
        new_window_sum = np.sum(series_data[i + length - convolution_size: i + length])
        new_P_vector = np.empty_like(Proj[-1])
        new_P_vector[:-1] = Proj[-1][1:]
        new_P_vector[-1] = new_window_sum
        Proj.append(new_P_vector)
    Proj = np.array(Proj)

    if Proj.shape[0] < 3 or Proj.shape[1] < 3:
        return None

    # Reduce the projection matrix to 3 dimensions
    pca = PCA(n_components=3)
    Proj_reduced = pca.fit_transform(Proj)

    # Build a reference vector from the series' value range and project it into the same PCA space
    max_val, min_val = np.max(series_data), np.min(series_data)
    input_value = (max_val - min_val) * convolution_size
    ref_input_vector = np.full((1, Proj.shape[1]), input_value)
    v_ref = pca.transform(ref_input_vector)[0]

    def _calculate_scalar_angle(v1, v2):
        v1_norm = v1 / np.linalg.norm(v1)
        v2_norm = v2 / np.linalg.norm(v2)
        dot_product = np.dot(v1_norm, v2_norm)
        return np.arccos(np.clip(dot_product, -1.0, 1.0))

    # Angles between the reference vector and each coordinate axis
    u_x, u_y, u_z = np.array([1, 0, 0]), np.array([0, 1, 0]), np.array([0, 0, 1])
    phi_x, phi_y, phi_z = _calculate_scalar_angle(u_x, v_ref), _calculate_scalar_angle(u_y, v_ref), _calculate_scalar_angle(u_z, v_ref)

    def _get_rotation_matrix_x(a): c, s = np.cos(a), np.sin(a); return np.array([[1, 0, 0], [0, c, -s], [0, s, c]])
    def _get_rotation_matrix_y(a): c, s = np.cos(a), np.sin(a); return np.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
    def _get_rotation_matrix_z(a): c, s = np.cos(a), np.sin(a); return np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])

    # Rotate the reduced projections using the angles to the reference vector,
    # then drop the first component and keep only (ry, rz)
    R_ux, R_uy, R_uz = _get_rotation_matrix_x(phi_x), _get_rotation_matrix_y(phi_y), _get_rotation_matrix_z(phi_z)
    current_SProj_3d = Proj_reduced @ R_ux.T @ R_uy.T @ R_uz.T
    SProj = current_SProj_3d[:, 1:]

    # np.mean(SProj, axis=0) averages the ry and rz values over the whole window,
    # so final_embedding is a 2-element array: [average_ry, average_rz]
    final_embedding = np.mean(SProj, axis=0)
    return final_embedding
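
# Quick optional sanity check (hypothetical demo data, not part of the pipeline): a synthetic
# sine wave stands in for one week of power readings and should yield a 2-element embedding
# vector [avg_ry, avg_rz]. Uncomment to run.
# _demo_series = 100 + 10 * np.sin(np.linspace(0, 14 * np.pi, weekly_window_size))
# _demo_embedding = generate_embedding_from_series(_demo_series, daily_subsequence_length)
# print("Demo embedding:", _demo_embedding)  # expected: an array of shape (2,)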

#=======================================================================================================================
# MAIN LOOP TO PROCESS ALL SERVERS
#=======================================================================================================================
print("Starting embedding generation...")
"""
This loop goes through each rack folder and each server's Parquet file within it. For each server, it breaks the
server's entire power-usage time series into weekly chunks and calls 'generate_embedding_from_series' on each chunk
to convert it into a single vector that represents that week's behaviour. Finally, it stores all these vectors in a
dictionary organised by week number and server ID.
"""

# Start a loop that iterates through every item inside the 'Dataset' directory.
# 'os.listdir' returns a list of all item names, and 'rack_id_str' will be the folder name, e.g. '0', '1', '18'.
for rack_id_str in os.listdir(dataset_path):

    # Create a path to the item.
    # 'os.path.join' combines path parts, e.g. 'Dataset' + '0' -> 'Dataset/0'.
    rack_path = os.path.join(dataset_path, rack_id_str)

    # Check if the current item is a directory
    if os.path.isdir(rack_path):

        # Get a list of all files inside the current rack folder that end with '.parquet'.
        parquet_files = [f for f in os.listdir(rack_path) if f.endswith('.parquet')]

        # Sort the list of filenames numerically, not alphabetically.
        # key=lambda name: specifies a custom sorting rule.
        # name.split('.')[0] takes a filename like '10.parquet', splits it at '.', and keeps the first part ('10').
        # int(...) converts that part to an integer.
        parquet_files.sort(key=lambda name: int(name.split('.')[0]))

        # Start a new loop that goes through the sorted list of Parquet files for the current rack.
        # 'enumerate' gives us the index, which we use as the z_index.
        for z_index, filename in enumerate(parquet_files):

            # Convert the rack ID from a string (e.g. '0') to an integer (0).
            rack_id = int(rack_id_str)

            # Create a unique ID for the server
            server_id = f"{rack_id}-{z_index}"
            print(f"Processing server: {server_id}")

            # Create the full path to the specific server's Parquet file.
            file_path = os.path.join(rack_path, filename)

            # Use the pandas library to read the Parquet file into a DataFrame
            df = pd.read_parquet(file_path)

            # Prepare the time-series data for analysis.
            # df[column] selects just the 'total_power_max' column from the DataFrame.
            # .dropna() removes any rows with missing (NaN) values.
            # .values converts the data from a pandas Series into a NumPy array for faster processing.
            series = df[column].dropna().values

            # Start the innermost loop. This slides a window across the server's full time series.
            # 'range(start, stop, step)' starts at index 0 and jumps forward by 'weekly_window_size' each time.
            for i in range(0, len(series), weekly_window_size):

                # Calculate the week number.
                week_index = i // weekly_window_size

                # Extract the data for the current week using array slicing.
                weekly_data_chunk = series[i: i + weekly_window_size]

                # Call our function to turn this week's data into a single embedding vector.
                embedding = generate_embedding_from_series(weekly_data_chunk, daily_subsequence_length)

                if embedding is not None:

                    # Check if we have seen this week_index before.
                    if week_index not in all_weekly_embeddings:
                        # If not, create a new empty dictionary for it.
                        all_weekly_embeddings[week_index] = {}

                    # Store the calculated embedding in our nested dictionary.
                    # The structure is: {week_index: {server_id: embedding_vector}}
                    all_weekly_embeddings[week_index][server_id] = embedding

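# At this point, all_weekly_embeddings[week_index][server_id] holds one [avg_ry, avg_rz] vector per server per week.
# Optional summary of what was collected (illustrative only, uncomment to run):
# print(f"Weeks with embeddings: {len(all_weekly_embeddings)}")
# print(f"Servers embedded in week 0: {len(all_weekly_embeddings.get(0, {}))}")
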
#=======================================================================================================================
# WEEKLY SIMILARITY GRAPH CREATION
#=======================================================================================================================
"""
Builds a series of weekly "similarity graphs" from the server embeddings generated above. For each week, it creates a
new graph where an edge is drawn between any two servers whose behaviour was at least 95% similar. The final output is
a dictionary of these weekly graphs.
"""

print("\nBuilding weekly similarity graphs...")

# This will store our final graphs, one for each week.
weekly_graphs = {}
similarity_threshold = 0.95  # Connect servers if they are 95% similar or more.

# Loop through each week that we have embeddings for
for week_index, embeddings_dict in all_weekly_embeddings.items():

    # Create a new, empty graph for this week's similarity data.
    G_week = nx.Graph()

    # Get a list of all servers that have an embedding for this week.
    server_ids = list(embeddings_dict.keys())

    # Compare every server with every other server for this week.
    for server1, server2 in itertools.combinations(server_ids, 2):
        embedding1 = embeddings_dict[server1]
        embedding2 = embeddings_dict[server2]

        # Calculate cosine similarity. The SciPy function returns a distance (0=identical, 2=opposite),
        # so we subtract it from 1 to get a similarity (1=identical, -1=opposite).
        cosine_similarity = 1 - cosine(embedding1, embedding2)

        # If the servers are very similar, add a 'SIMILAR_TO' edge
        if cosine_similarity >= similarity_threshold:
            G_week.add_node(server1)  # Ensure nodes exist before adding the edge
            G_week.add_node(server2)
            G_week.add_edge(server1, server2, type='SIMILAR_TO', weight=cosine_similarity)

    weekly_graphs[week_index] = G_week
    print(
        f" - Week {week_index}: Created graph with {G_week.number_of_nodes()} nodes and {G_week.number_of_edges()} similarity edges.")

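# Example of how one of these weekly graphs could be inspected (illustrative only;
# week 0 is an assumed key and may be absent if the data covers less than one week). Uncomment to run:
# if 0 in weekly_graphs:
#     g0 = weekly_graphs[0]
#     strongest = max(g0.edges(data=True), key=lambda e: e[2]['weight'], default=None)
#     print("Strongest similarity edge in week 0:", strongest)
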
#=======================================================================================================================
# PRINT SPECIFIC WEEK
#=======================================================================================================================
target_week = 90

if target_week in all_weekly_embeddings:
    print(f"\n--- Displaying all server embeddings for Week {target_week} ---")

    # Get the inner dictionary containing all server data for that specific week
    embeddings_for_target_week = all_weekly_embeddings[target_week]

    # Loop through each server and its embedding in that week's dictionary
    # .items() gives us both the key (server_id) and the value (embedding)
    for server_id, embedding in embeddings_for_target_week.items():
        # Print the server ID and its calculated embedding vector
        print(f"  Server: {server_id}, Embedding: {embedding}")

else:
    print(f"\nWeek {target_week} was not found in the dataset.")