Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,7 @@ venv.bak/

# sphinx docs
docs/build/
.python-version
*.ipynb
*.gz
*.json
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
#### Added

- Improved critical path visualization by filtering out non-essential intermediate nodes, for clearer and more readable trace overlays.
#### Changed

#### Fixed
Expand Down
23 changes: 23 additions & 0 deletions docs/source/features/optimized_critical_path_visualization.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Optimized Critical Path Visualization
=====================================

A new visualization mode has been added to improve the clarity of critical path overlays.

This mode filters out non-essential intermediate nodes (the events at the start and end of the path are always kept).

This reduces clutter and helps focus on key performance events.

Usage
-----

Call the method :meth:`TraceAnalysis.overlay_critical_change_path_analysis` with appropriate arguments to generate the cleaner trace overlay.

This enhancement aids in debugging and performance analysis in complex heterogeneous workloads.

Example::

    analyzer.overlay_critical_change_path_analysis(
        rank=0,
        critical_path_graph=cp_graph,
        output_dir="path/to/output"
    )
152 changes: 152 additions & 0 deletions hta/analyzers/critical_path_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2050,3 +2050,155 @@ def get_flow_event(
t.write_raw_trace(output_file, overlaid_trace)

return output_file
@classmethod
def overlay_critical_change_path_analysis(
    cls,
    t: "Trace",
    rank: int,
    critical_path_graph: CPGraph,
    output_dir: str,
) -> str:
    """
    Overlay a new trace that highlights critical change events in the critical path.

    This includes:
    - Marking every raw event on the critical path with ``args["critical"] = 1``
    - Marking the first and last critical events, plus the events around
      every change of thread (TID), with ``args["critical_change"] = 1``
    - Creating one chain of flow arrows between those marked events
    - Dropping every pre-existing event whose ``ph`` is one of
      ``s``, ``t``, ``f``, ``b`` or ``e`` from the written trace

    Args:
        t: Trace object
        rank: Rank of trace file
        critical_path_graph: The CPGraph containing path info
        output_dir: Where to write the overlaid trace

    Returns:
        The path of the generated trace file, or an empty string when
        ``output_dir`` exists but is not a directory, or when no raw event
        of this rank belongs to the critical path.
    """
    sym_index = t.symbol_table.get_sym_id_map()
    path = Path(output_dir).expanduser()
    if not path.exists():
        os.makedirs(path)
    elif not path.is_dir():
        logger.error(f"{output_dir} is not a directory.")
        return ""

    output_file = os.path.join(
        str(path), "overlaid_critical_path_" + t.trace_files[rank].split("/")[-1]
    )
    overlaid_trace = t.get_raw_trace_for_one_rank(rank=rank)
    raw_events = overlaid_trace["traceEvents"]

    # Synchronization calls get special treatment both when ordering the
    # critical events and when anchoring flow arrows.
    sync_names = (
        "cudaStreamSynchronize",
        "cudaDeviceSynchronize",
        "cudaEventSynchronize",
    )

    def mark(event: Dict[str, Any], flag: str) -> None:
        # Tolerate raw events that carry no "args" dict at all.
        event.setdefault("args", {})[flag] = 1

    def sort_key(event: Dict[str, Any]):
        # Synchronization events are ordered by their end time (ts + dur),
        # every other event by its start time.
        if event.get("name") in sync_names:
            return event["ts"] + event["dur"]
        return event["ts"]

    # Collect the raw events that are on the critical path and mark them.
    candidate_events = []
    for ev_idx, event in enumerate(raw_events):
        if ev_idx in critical_path_graph.critical_path_events_set:
            mark(event, "critical")
            candidate_events.append(event)

    if not candidate_events:
        # Without any critical event there is no start/end to anchor on.
        logger.warning(
            f"No critical path events found for rank {rank}; nothing to overlay."
        )
        return ""

    candidate_events.sort(key=sort_key)

    # The first and last events are always kept, whatever their category.
    start_event = candidate_events[0]
    end_event = candidate_events[-1]
    mark(end_event, "critical_change")
    mark(start_event, "critical_change")

    # cpu_op events are only of interest as the very first/last event, so
    # they are removed before looking for turning points.
    candidate_events = [
        event for event in candidate_events if event["cat"] != "cpu_op"
    ]

    # Walk the remaining events and keep the ones around each thread change.
    critical_change_events = [start_event]
    final_pos = len(candidate_events) - 1
    for pos, event in enumerate(candidate_events):
        # Always read the most recently kept event, so the comparison is
        # never made against a stale or unassigned value.
        last_change_event = critical_change_events[-1]
        if pos < final_pos:
            next_event = candidate_events[pos + 1]
            if last_change_event["tid"] != next_event["tid"]:
                if last_change_event != event:
                    critical_change_events.append(event)
                    mark(event, "critical_change")
                critical_change_events.append(next_event)
                mark(next_event, "critical_change")
        elif last_change_event != event:
            critical_change_events.append(event)
            mark(event, "critical_change")
    # The end event is appended unconditionally: flow events are emitted at
    # the *previous* kept event, so this trailing entry is what produces the
    # final flow event of the chain.
    critical_change_events.append(end_event)

    def get_flow_event(
        nid: int,
        event: Dict[str, Any],
        edge: CPEdge,
        flow_id: int,
        is_start: bool,
    ):
        # This helps with showing the arrows in chrome trace: the arrow is
        # anchored at the event start when requested or when the graph node
        # is a start node, otherwise at the event end.
        node = critical_path_graph.node_list[nid]
        if is_start or node.is_start:
            ts = event["ts"]
        else:
            ts = event["ts"] + event["dur"]
        return Trace.flow_event(
            id=flow_id,
            pid=event["pid"],
            tid=event["tid"],
            ts=ts,
            is_start=is_start,
            name="critical_change_path",
            cat=str(edge.type.value),
            args={
                "weight": int(edge.weight),
                "critical": True,
                "last_event": event["name"],
                # NOTE(review): "is_end" is filled from node.is_start, kept
                # exactly as before - confirm which of the two is intended.
                "is_end": node.is_start,
            },
        )

    # Map (correlation, cat) of each trace_df row to its index. Later rows
    # win on duplicate keys, matching a row-by-row dict build.
    events_df = critical_path_graph.trace_df
    key_to_index = dict(
        zip(
            zip(events_df["correlation"], events_df["cat"]),
            events_df.index,
        )
    )

    flow_events = []
    last_node = None
    last_event = None
    # Every flow event shares this id so they form one s -> t -> ... -> f chain.
    flow_id = 1
    for event in critical_change_events:
        if event["cat"] == "cpu_op":
            # cpu_op events are matched by name only; the first row wins.
            matches = events_df[
                events_df["name"] == sym_index.get(event["name"])
            ].index
            event_id = matches[0] if len(matches) > 0 else None
        else:
            key = (
                event["args"].get("correlation", 0),
                sym_index.get(event["cat"]),
            )
            event_id = key_to_index.get(key)
        if event_id is None:
            logger.warning(f"Cannot map event to graph node: {event}")
            continue

        start_node, end_node = critical_path_graph.get_nodes_for_event(event_id)
        if start_node is None or end_node is None:
            # An edge cannot be built from a missing node; skip this event
            # instead of handing None to the graph.
            logger.warning(f"start or end node is None for event: {event}")
            continue

        if last_node is not None:
            # NOTE(review): _add_edge_helper is a private CPGraph helper and
            # may also register the edge on the graph - confirm that this
            # side effect is acceptable for an overlay.
            edge = critical_path_graph._add_edge_helper(
                last_node, start_node, CPEdgeType.OPERATOR_KERNEL
            )
            anchor_at_start = (
                last_event["args"].get("stream", -1) == -1
                and last_event["name"] not in sync_names
            )
            flow_evt = get_flow_event(
                edge.begin, last_event, edge, flow_id, is_start=anchor_at_start
            )
            flow_evt["ph"] = "t"
            flow_events.append(flow_evt)
        last_node = end_node
        last_event = event

    if flow_events:
        # The chain starts with an "s" phase and ends with an "f" phase.
        flow_events[0]["ph"] = "s"
        flow_events[-1]["ph"] = "f"

    # Drop events with these phases so only the new chain remains.
    dropped_phases = {"s", "t", "f", "b", "e"}
    overlaid_trace["traceEvents"] = [
        event
        for event in overlaid_trace["traceEvents"]
        if event.get("ph") not in dropped_phases
    ]
    overlaid_trace["traceEvents"].extend(flow_events)
    t.write_raw_trace(output_file, overlaid_trace)
    return output_file
13 changes: 13 additions & 0 deletions hta/trace_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,3 +765,16 @@ def overlay_critical_path_analysis(
only_show_critical_events,
show_all_edges,
)

def overlay_critical_change_path_analysis(
    self,
    rank: int,
    critical_path_graph: CPGraph,
    output_dir: str,
) -> str:
    """
    Write an overlaid trace that highlights the critical change events.

    Thin wrapper that forwards this object's trace (``self.t``) together
    with the given arguments to
    ``CriticalPathAnalysis.overlay_critical_change_path_analysis``.

    Args:
        rank: Rank of the trace file to overlay
        critical_path_graph: The CPGraph containing the critical path info
        output_dir: Directory the overlaid trace is written to

    Returns:
        The string returned by the underlying analyzer call.
    """
    trace = self.t
    overlaid_path = CriticalPathAnalysis.overlay_critical_change_path_analysis(
        t=trace,
        rank=rank,
        critical_path_graph=critical_path_graph,
        output_dir=output_dir,
    )
    return overlaid_path