Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/trace_load_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import pyperf

from hta.common.trace import parse_trace_file, Trace
from hta.common.trace_collection import parse_trace_file, TraceCollection
from hta.common.trace_parser import set_default_trace_parsing_backend

from hta.configs.config import logger
Expand Down Expand Up @@ -40,7 +40,7 @@ def load_and_parse_trace(
range_it = range(loops)
t0 = pyperf.perf_counter()
for _ in range_it:
trace = Trace(trace_dir=trace_dir)
trace = TraceCollection(trace_dir=trace_dir)
trace.parse_traces(max_ranks=max_ranks, use_multiprocessing=use_multiprocessing)
return pyperf.perf_counter() - t0

Expand Down
27 changes: 15 additions & 12 deletions examples/symbol_table_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import pandas as pd
import plotly.express as px

from hta.common.trace import Trace
from hta.common.trace_collection import TraceCollection

path_to_hta = "~/HolisticTraceAnalysis"
trace_dir: str = path_to_hta + "/tests/data/vision_transformer"
Expand All @@ -32,20 +32,22 @@ def set_pandas_display_options():
pd.set_option("display.float_format", "{:.2f}".format)


def demo_statistics(trace: Trace, rank: int, k: Optional[int] = None) -> pd.DataFrame:
def demo_statistics(
trace: TraceCollection, rank: int, k: Optional[int] = None
) -> pd.DataFrame:
"""
Show the first k items of the kernels by duration in a specific rank's trace.
<rank>

Args:
trace: a Trace instance.
trace: a TraceCollection instance.
rank: the rank to be analyzed.
k: how many items to show in the output; If None, then show all items.

Returns:
The resulting dataframe from this analysis.
"""
df = trace.get_trace(rank)
df = trace.get_trace_df(rank)
sym_id_map = trace.symbol_table.get_sym_id_map()
sym_table = trace.symbol_table.get_sym_table()

Expand Down Expand Up @@ -98,24 +100,25 @@ def demo_visualization(df: pd.DataFrame, title: str, visualize: bool = False) ->
logging.info(f"{title}\n{df}\n")


def load_trace(trace_dir, max_ranks) -> Trace:
trace = Trace(trace_dir=trace_dir)
def load_trace(trace_dir, max_ranks) -> TraceCollection:
trace = TraceCollection(trace_dir=trace_dir)
trace.parse_traces(max_ranks=max_ranks, use_multiprocessing=True)
return trace


def run_demo(
trace_dir: str,
max_ranks: int,
preloaded_trace: Optional[Trace] = None,
preloaded_trace: Optional[TraceCollection] = None,
):
"""_summary_

Args:
trace_dir (str): the directory containing the traces to be analyzed
max_ranks (int): maximum number of ranks to be analyzed
preloaded_trace (Optional[Trace], optional): a preloaded trace. Defaults to None.
preloaded_trace (Optional[TraceCollection], optional): a preloaded collection of traces.
Defaults to None.
"""
# load the trace
if preloaded_trace is None:
Expand Down Expand Up @@ -148,13 +151,13 @@ def run_demo(
break
logging.info("\n===End of Symbol to ID Map\n")

df = demo_trace.get_trace(0)
df = demo_trace.get_trace_df(0)
logging.info(f"\n===Data Frame of Rank-0===\ntype={type(df)}\n")
logging.info(f"\n{df}\n")
logging.info("\n===End of Data Frame\n")

logging.info(f"===Data Frame Info===\ntype={type(df)}\n")
demo_trace.get_trace(0).info()
demo_trace.get_trace_df(0).info()

logging.info("\n===Kernel Statistics===\n")
top_k: int = 10
Expand All @@ -171,9 +174,9 @@ def run_demo(
)


def trace_info(trace: Trace):
def trace_info(trace: TraceCollection):
rank = next(iter(trace.traces))
df = trace.get_trace(rank)
df = trace.get_trace_df(rank)
logging.info(f"\n===Dataframe of Rank {rank}")
df.info()

Expand Down
43 changes: 23 additions & 20 deletions hta/analyzers/breakdown_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from hta.common.singletrace import Trace
from hta.common.trace_filter import GPUKernelFilter
from hta.common.trace_symbol_table import decode_symbol_id_to_symbol_name

Expand All @@ -23,7 +24,7 @@
# The import below must stay under the "if TYPE_CHECKING" guard: without the guard there is a
# circular dependency with trace_analysis.py, which causes mypy to fail.
if TYPE_CHECKING:
from hta.common.trace import Trace
from hta.common.trace_collection import TraceCollection

# This configures the threshold under which we consider gaps between
# kernels to be due to realistic delays in launching back-to-back kernels on the GPU
Expand All @@ -36,7 +37,7 @@ def __init__(self):
@classmethod
def get_gpu_kernel_breakdown(
cls,
t: "Trace",
t: "TraceCollection",
visualize: bool = True,
duration_ratio: float = 0.8,
num_kernels: int = 10,
Expand Down Expand Up @@ -75,8 +76,8 @@ def get_gpu_kernel_breakdown(
kernel_type_to_analysis.append(KernelType.MEMORY.name)

kernel_per_rank: Dict[str, Dict] = defaultdict(dict)
for rank, trace_df in t.traces.items():
gpu_kernels = trace_df[trace_df["stream"].ne(-1)].copy()
for rank, trace in t.traces.items():
gpu_kernels = trace.df[trace.df["stream"].ne(-1)].copy()
gpu_kernels["kernel_type"] = gpu_kernels[["name"]].apply(
lambda x: get_kernel_type(sym_table[x["name"]]), axis=1
)
Expand Down Expand Up @@ -216,13 +217,13 @@ def get_gpu_kernel_breakdown(
def _get_gpu_kernel_interval_dataframe(
cls,
trace_df: pd.DataFrame,
t: "Trace",
t: "TraceCollection",
) -> pd.DataFrame:
"""Obtains all GPU kernels in the trace dataframe and assigns them
an interval index that can be used for analyzing overlap.
@args: trace_df (pd.DataFrame) : trace df for specific rank
Please make sure this includes "end" column.
@args: t (Trace) : trace object
@args: t (TraceCollection) : object representing collection of traces.

Returns: pd.DataFrame with GPU kernels subset with an interval index
of [start, end) intervals.
Expand All @@ -238,13 +239,13 @@ def _get_gpu_kernel_interval_dataframe(
def _get_gpu_user_anno_interval_dataframe(
cls,
trace_df: pd.DataFrame,
t: "Trace",
t: "TraceCollection",
) -> Optional[pd.DataFrame]:
"""Obtains all GPU user annotations in the trace dataframe and assigns them
an interval index that can be used for analyzing overlap.
@args: trace_df (pd.DataFrame) : trace df for specific rank
Please make sure this includes "end" column.
@args: t (Trace) : trace object
@args: t (TraceCollection) : object representing collection of traces.

Returns: pd.DataFrame with GPU user annotations subset with an interval index
of [start, end) intervals.
Expand Down Expand Up @@ -309,7 +310,7 @@ def _associate_gpu_kernels_with_user_annotations(
@classmethod
def get_gpu_kernels_with_user_annotations(
cls,
t: "Trace",
t: "TraceCollection",
rank: int,
expand_names: bool = True,
shortern_names: bool = True,
Expand All @@ -318,7 +319,7 @@ def get_gpu_kernels_with_user_annotations(
GPU user annotation. If the kernel overlaps with multiple user annotations,
we will pick the lowest/leaf annotation in the stack to attribute to.
Read more in get_gpu_kernels_with_user_annotations in hta/trace_analysis.py."""
trace_df = t.get_trace(rank)
trace_df = t.get_trace_df(rank)
trace_df["end"] = trace_df["ts"] + trace_df["dur"]
trace_df["user_annotation"] = -1

Expand All @@ -345,7 +346,7 @@ def get_gpu_kernels_with_user_annotations(
@classmethod
def get_gpu_user_annotation_breakdown(
cls,
t: "Trace",
t: "TraceCollection",
use_gpu_annotation: bool = True,
visualize: bool = True,
duration_ratio: float = 0.8,
Expand Down Expand Up @@ -401,8 +402,8 @@ def get_gpu_user_annotation_breakdown(

kernel_per_rank: Dict[int, pd.DataFrame] = {}

for rank, trace_df in t.traces.items():
gpu_user_annotation_kernels = trace_df[trace_df["cat"].eq(idx)].copy()
for rank, trace in t.traces.items():
gpu_user_annotation_kernels = trace.df[trace.df["cat"].eq(idx)].copy()
t.symbol_table.add_symbols_to_trace_df(gpu_user_annotation_kernels, "name")
logger.info(
f"rank = {rank}, num {annotation}s = {len(gpu_user_annotation_kernels)}"
Expand Down Expand Up @@ -639,15 +640,17 @@ def _get_idle_time_for_kernels(cls, kernels_df: pd.DataFrame) -> Tuple[int, int]
return kernel_time - kernel_run_time, kernel_time

@classmethod
def get_temporal_breakdown(cls, t: "Trace", visualize: bool = True) -> pd.DataFrame:
def get_temporal_breakdown(
cls, t: "TraceCollection", visualize: bool = True
) -> pd.DataFrame:
"""
Temporal breakdown implementation. See `get_temporal_breakdown` in `trace_analysis.py` for details.
"""
sym_table = t.symbol_table.get_sym_table()

def idle_time_per_rank(trace_df: pd.DataFrame) -> Tuple[int, int, int, int]:
def idle_time_per_rank(trace: Trace) -> Tuple[int, int, int, int]:
"""returns idle_time (us) , compute_time (us), non_compute_time (us), total_time (us)"""
gpu_kernels = trace_df[trace_df["stream"].ne(-1)].copy()
gpu_kernels = trace.df[trace.df["stream"].ne(-1)].copy()
idle_time, kernel_time = cls._get_idle_time_for_kernels(gpu_kernels)

gpu_kernels["kernel_type"] = gpu_kernels[["name"]].apply(
Expand All @@ -670,10 +673,10 @@ def idle_time_per_rank(trace_df: pd.DataFrame) -> Tuple[int, int, int, int]:
return idle_time, compute_time, non_compute_time, kernel_time

result: Dict[str, List[float]] = defaultdict(list)
for rank, trace_df in t.traces.items():
for rank, trace in t.traces.items():
result["rank"].append(rank)
idle_time, compute_time, non_compute_time, kernel_time = idle_time_per_rank(
trace_df
trace
)
result["idle_time(us)"].append(idle_time)
result["compute_time(us)"].append(compute_time)
Expand Down Expand Up @@ -802,7 +805,7 @@ def _analyze_idle_time_for_stream(
@classmethod
def get_idle_time_breakdown(
cls,
t: "Trace",
t: "TraceCollection",
consecutive_kernel_delay: int,
rank: int = 0,
streams: Optional[List[int]] = None,
Expand All @@ -824,7 +827,7 @@ def get_idle_time_breakdown(
and median of idle intervals between kernels on a CUDA stream, also broken down by
the idleness category (default = False).
"""
trace_df: pd.DataFrame = t.get_trace(rank)
trace_df: pd.DataFrame = t.get_trace_df(rank)

# Need to filter out events with `cuda_sync` category
kernel_cats = [
Expand Down
15 changes: 9 additions & 6 deletions hta/analyzers/communication_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,33 @@
import pandas as pd
import plotly.express as px

from hta.common.singletrace import Trace
from hta.utils.utils import get_kernel_type, KernelType, merge_kernel_intervals

# The import below must stay under the "if TYPE_CHECKING" guard: without the guard there is a
# circular dependency with trace_analysis.py, which causes mypy to fail.
if TYPE_CHECKING:
from hta.trace import Trace
from hta.common.trace_collection import TraceCollection


class CommunicationAnalysis:
def __init__(self):
pass

@classmethod
def get_comm_comp_overlap(cls, t: "Trace", visualize: bool = True) -> pd.DataFrame:
def get_comm_comp_overlap(
cls, t: "TraceCollection", visualize: bool = True
) -> pd.DataFrame:
"""
Communication analysis implementation. See `get_comm_comp_overlap` in `trace_analysis.py` for details.
"""
sym_table = t.symbol_table.get_sym_table()

def get_comm_comp_overlap_value(trace_df: pd.DataFrame) -> float:
def get_comm_comp_overlap_value(trace: Trace) -> float:
"""
Compute the overlap percentage between communication and computation kernels for one rank.
"""
gpu_kernels = trace_df[trace_df["stream"].ne(-1)].copy()
gpu_kernels = trace.df[trace.df["stream"].ne(-1)].copy()
gpu_kernels["kernel_type"] = gpu_kernels[["name"]].apply(
lambda x: get_kernel_type(sym_table[x["name"]]), axis=1
)
Expand Down Expand Up @@ -75,10 +78,10 @@ def get_comm_comp_overlap_value(trace_df: pd.DataFrame) -> float:
).sum()

result: Dict[str, List[float]] = defaultdict(list)
for rank, trace_df in t.traces.items():
for rank, trace in t.traces.items():
result["rank"].append(rank)
result["comp_comm_overlap_ratio"].append(
get_comm_comp_overlap_value(trace_df)
get_comm_comp_overlap_value(trace)
)
result_df = pd.DataFrame(result)
result_df["comp_comm_overlap_pctg"] = round(
Expand Down
Loading