diff --git a/.gitignore b/.gitignore index 4d944e4..f84c97f 100644 --- a/.gitignore +++ b/.gitignore @@ -65,3 +65,7 @@ venv.bak/ # sphinx docs docs/build/ +.python-version +*.ipynb +*.gz +*.json \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cc6f70..4fb20c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ Versioning](https://semver.org/spec/v2.0.0.html). ## Unreleased #### Added - +- Improved critical path visualization by filtering out non-essential intermediate nodes, for clearer and more readable trace overlays. #### Changed #### Fixed diff --git a/docs/source/features/optimized_critical_path_visualization.rst b/docs/source/features/optimized_critical_path_visualization.rst new file mode 100644 index 0000000..6ca4fc7 --- /dev/null +++ b/docs/source/features/optimized_critical_path_visualization.rst @@ -0,0 +1,23 @@ +Optimized Critical Path Visualization +===================================== + +A new visualization mode has been added to improve the clarity of critical path overlays. + +This mode filters out non-essential intermediate nodes (except those at the start and end of the path). + +This reduces clutter and helps focus on key performance events. + +Usage +----- + +Call the method :meth:`TraceAnalysis.overlay_critical_change_path_analysis` with appropriate arguments to generate the cleaner trace overlay. + +This enhancement aids in debugging and performance analysis in complex heterogeneous workloads. 
@classmethod
def overlay_critical_change_path_analysis(
    cls,
    t: "Trace",
    rank: int,
    critical_path_graph: CPGraph,
    output_dir: str,
) -> str:
    """
    Write a copy of one rank's trace that highlights the turning points of
    the critical path.

    The method:
    - tags every raw event on the critical path with ``args["critical"] = 1``;
    - keeps the first and last critical events plus the events where the
      thread id (``tid``) changes, tagging them with
      ``args["critical_change"] = 1``;
    - drops the existing flow/async events (phases ``s``, ``t``, ``f``, ``b``,
      ``e``) from the trace and appends one chain of flow events named
      ``critical_change_path`` that links the kept events.

    Args:
        t: Trace object that provides the raw trace and the symbol table.
        rank: Rank of the trace file to overlay.
        critical_path_graph: The CPGraph containing the critical path info.
        output_dir: Directory in which the overlaid trace is written. It is
            created when it does not exist.

    Returns:
        The path of the generated trace file, or an empty string when
        ``output_dir`` exists but is not a directory, or when the trace of
        this rank contains no critical path event.
    """
    sync_event_names = frozenset(
        {"cudaStreamSynchronize", "cudaDeviceSynchronize", "cudaEventSynchronize"}
    )

    sym_index = t.symbol_table.get_sym_id_map()

    path = Path(output_dir).expanduser()
    if path.exists() and not path.is_dir():
        logger.error(f"{output_dir} is not a directory.")
        return ""
    path.mkdir(parents=True, exist_ok=True)

    output_file = os.path.join(
        str(path), "overlaid_critical_path_" + t.trace_files[rank].split("/")[-1]
    )
    overlaid_trace = t.get_raw_trace_for_one_rank(rank=rank)
    raw_events = overlaid_trace["traceEvents"]

    # Tag every raw event whose position is part of the critical path.
    critical_indices = critical_path_graph.critical_path_events_set
    candidate_events = []
    for ev_idx, event in enumerate(raw_events):
        if ev_idx in critical_indices:
            event.setdefault("args", {})["critical"] = 1
            candidate_events.append(event)

    if not candidate_events:
        # Without any critical event there is no start/end to anchor the overlay.
        logger.warning("No critical path events found for rank %s.", rank)
        return ""

    def sort_key(event: Dict[str, Any]):
        # Synchronization calls are ordered by their end time (ts + dur);
        # every other event is ordered by its start time.
        if event.get("name") in sync_event_names:
            return event["ts"] + event["dur"]
        return event["ts"]

    candidate_events.sort(key=sort_key)

    # The first and last events of the path are always kept, whatever their
    # category.
    start_event = candidate_events[0]
    end_event = candidate_events[-1]
    end_event.setdefault("args", {})["critical_change"] = 1
    start_event.setdefault("args", {})["critical_change"] = 1

    # cpu_op events only matter at the two ends of the path, which were
    # captured above, so they are dropped from the turning point search.
    candidate_events = [
        event for event in candidate_events if event.get("cat") != "cpu_op"
    ]

    critical_change_events = [start_event]

    def keep_turning_point(event: Dict[str, Any]) -> None:
        # Never keep the same event twice in a row: a repeated entry would
        # later produce an edge from an event to itself.
        if critical_change_events[-1] is event:
            return
        event.setdefault("args", {})["critical_change"] = 1
        critical_change_events.append(event)

    # When the next event runs on a different thread than the most recently
    # kept event, keep both the current event (last one before the switch)
    # and the next event (first one after the switch).
    for event, next_event in zip(candidate_events, candidate_events[1:]):
        if critical_change_events[-1]["tid"] != next_event["tid"]:
            keep_turning_point(event)
            keep_turning_point(next_event)
    if candidate_events:
        keep_turning_point(candidate_events[-1])
    keep_turning_point(end_event)

    def get_flow_event(
        nid: int,
        event: Dict[str, Any],
        edge: CPEdge,
        flow_id: int,
        is_start: bool,
    ):
        node_is_start = critical_path_graph.node_list[nid].is_start
        # This helps with showing the arrows in chrome trace: the arrow is
        # anchored at the start of the event when either the graph node or
        # the caller asks for it, and at the end of the event otherwise.
        if node_is_start or is_start:
            ts = event["ts"]
        else:
            ts = event["ts"] + event["dur"]
        return Trace.flow_event(
            id=flow_id,
            pid=event["pid"],
            tid=event["tid"],
            ts=ts,
            is_start=is_start,
            name="critical_change_path",
            cat=str(edge.type.value),
            args={
                "weight": int(edge.weight),
                "critical": True,
                "last_event": event["name"],
                # The key says "is_end", so report the negation of the
                # node's is_start flag.
                "is_end": not node_is_start,
            },
        )

    # Map (correlation, cat) pairs of the graph's trace data frame to the
    # data frame index; when a pair occurs several times the last row wins.
    events_df = critical_path_graph.trace_df
    key_to_index = {
        (correlation, cat): idx
        for idx, correlation, cat in zip(
            events_df.index, events_df["correlation"], events_df["cat"]
        )
    }

    flow_events = []
    # All steps share one flow id; the phases assigned below ("s", "t", "f")
    # turn them into the start, the intermediate steps and the end of a flow.
    flow_id = 1
    last_node = None
    last_event = None
    for event in critical_change_events:
        if event.get("cat") == "cpu_op":
            # NOTE(review): this picks the first row with a matching name,
            # which may not be this exact event when the name repeats.
            matches = events_df[
                events_df["name"] == sym_index.get(event["name"])
            ].index
            event_id = matches[0] if len(matches) > 0 else None
        else:
            key = (
                event.get("args", {}).get("correlation", 0),
                sym_index.get(event.get("cat")),
            )
            event_id = key_to_index.get(key)
        if event_id is None:
            logger.warning(f"Cannot map event to graph node: {event}")
            continue

        start_node, end_node = critical_path_graph.get_nodes_for_event(event_id)
        if start_node is None or end_node is None:
            # An edge cannot be built without both nodes; skip this event.
            logger.warning(
                "start or end node is None for event index %s", event_id
            )
            continue

        if last_node is not None:
            # NOTE(review): presumably this also registers the edge on the
            # graph as a side effect - confirm against CPGraph.
            edge = critical_path_graph._add_edge_helper(
                last_node, start_node, CPEdgeType.OPERATOR_KERNEL
            )
            # Events without a stream that are not synchronization calls are
            # anchored at their start; all other events at their end.
            is_start = (
                last_event.get("args", {}).get("stream", -1) == -1
                and last_event["name"] not in sync_event_names
            )
            flow_evt = get_flow_event(
                edge.begin, last_event, edge, flow_id, is_start=is_start
            )
            flow_evt["ph"] = "t"
            flow_events.append(flow_evt)
        last_node = end_node
        last_event = event

    if flow_events:
        flow_events[0]["ph"] = "s"
        flow_events[-1]["ph"] = "f"

    # Drop the flow and async events already in the trace so that only the
    # new critical change path arrows remain.
    removed_phases = ("s", "t", "f", "b", "e")
    overlaid_trace["traceEvents"] = [
        event
        for event in overlaid_trace["traceEvents"]
        if event.get("ph") not in removed_phases
    ]
    overlaid_trace["traceEvents"].extend(flow_events)
    t.write_raw_trace(output_file, overlaid_trace)
    return output_file