From 7846da977026b00a00d08ca817f1ed767ff772d0 Mon Sep 17 00:00:00 2001 From: Lars Schneidenbach Date: Tue, 25 Nov 2025 19:22:21 -0500 Subject: [PATCH 1/3] updated checks for problems with hw timestamps Signed-off-by: Lars Schneidenbach --- src/aiu_trace_analyzer/pipeline/timesync.py | 37 +++++++++++++++------ 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/src/aiu_trace_analyzer/pipeline/timesync.py b/src/aiu_trace_analyzer/pipeline/timesync.py index 338038d..f1dde0a 100644 --- a/src/aiu_trace_analyzer/pipeline/timesync.py +++ b/src/aiu_trace_analyzer/pipeline/timesync.py @@ -60,9 +60,9 @@ def _get_DTS_rela_to_TS1_in_us(event: TraceEvent, freq: float) -> list[float]: return converted -def _get_DTS_rela_to_TS5_in_us(event: TraceEvent, freq: float) -> list[float]: +def _get_DTS_rela_to_TSRef_in_us(event: TraceEvent, freq: float, ref_idx: int) -> list[float]: converted = _conv_DTS_to_array_in_us(event, freq) - converted = [converted[i] - converted[4] for i in range(5)] + converted = [converted[i] - converted[ref_idx] for i in range(5)] return converted @@ -73,12 +73,23 @@ def _convert_cycle_timestamps(event: TraceEvent, freq: float) -> list[float]: Let T_ts5 = T_flex_end Let T_ts[i] = T_ts5 - (TS5-TS[i])/Freq, for i in 1-4 + THIS FN HAS A NASTY SIDE EFFECT OF UPDATING event["ts"] and event["dur"] !!! ''' - wall_clock_t5 = event["ts"] + event["dur"] - converted = _get_DTS_rela_to_TS5_in_us(event, freq) - converted = [wall_clock_t5 + converted[i] for i in range(5)] - event["dur"] = converted[4] - converted[0] - event["ts"] = wall_clock_t5 - event["dur"] + ref_idx = 4 + if event["name"].endswith("Cmpt Prep"): + ref_idx = 2 + if event["name"].endswith(" DmaI"): + ref_idx = 1 + if event["name"].endswith("Cmpt Exec"): + ref_idx = 3 + + wall_clock_tref = event["ts"] + event["dur"] + converted = _get_DTS_rela_to_TSRef_in_us(event, freq, ref_idx) + converted = [wall_clock_tref + converted[i] for i in range(5)] + event["args"]["orig_ts"] = (event["ts"], event["dur"]) + event["dur"] = converted[ref_idx] - converted[0] + event["ts"] = wall_clock_tref - event["dur"] + assert event["ts"] >= 0.0, f"new event ts < 0.0 {event}, {converted}, {wall_clock_tref}" aiulog.log(aiulog.TRACE, "TS CONV: _convert_cycle_timestamps", event["ts"], event["dur"], converted) return converted @@ -95,12 +106,16 @@ def _align_hts_to_beg(event: TraceEvent, freq: float) -> list[float]: def _align_hts_by_type(op_id: int, event: TraceEvent, freq: float) -> list[float]: + """ + THIS FN HAS A NASTY SIDE EFFECT OF UPDATING event["ts"] and event["dur"] !!! + """ converted = _get_DTS_rela_to_TS1_in_us(event, freq) dts_intervals = [converted[i+1] - converted[i] for i in range(4)] hts_end = event["ts"] + event["dur"] event["dur"] = dts_intervals[op_id] event["ts"] = hts_end - event["dur"] + assert event["ts"] >= 0.0, f"new event ts < 0.0 {event}, {hts_end}, {dts_intervals}" converted = [(converted[i] - converted[op_id + 1] + hts_end) for i in range(5)] @@ -135,9 +150,9 @@ def cycle_count_to_wallclock(event: TraceEvent, _: AbstractContext, config: dict if event["ph"] == "X" and "args" in event and "TS1" in event["args"]: event["args"]["ts_all"] = _convert_cycle_timestamps(event, config["soc_frequency"]) assert event["ts"] >= get_cycle_ts_as_clock(1, event["args"]["ts_all"]), \ - "TS1 is projected before the event timestamp. Please check the frequency setting." + f"TS1 is projected before the event timestamp. Please check the frequency setting. {event}" assert event["ts"] + event["dur"] <= get_cycle_ts_as_clock(5, event["args"]["ts_all"]), \ - "TS5 is projected past the end of the event. Please check the frequency setting." + f"TS5 is projected past the end of the event. Please check the frequency setting. {event}" last = event["args"]["ts_all"][0] for i, t in enumerate(event["args"]["ts_all"][1:]): @@ -208,7 +223,9 @@ def cycle_count_conversion_cleanup(event: TraceEvent, _: AbstractContext) -> lis if "args" in event and "ts_all" in event["args"]: event["args"].pop("ts_all") if "args" in event and "jobhash" in event["args"]: - event["args"].pop("jobhash") + jobhash = event["args"].pop("jobhash") + if "jobname" in event["args"]: + event["args"]["jobname"] += f"({jobhash})" return [event] From 07b487395f38e5204619dfdce77210466e8490d7 Mon Sep 17 00:00:00 2001 From: Lars Schneidenbach Date: Tue, 25 Nov 2025 19:23:51 -0500 Subject: [PATCH 2/3] 32bit cycle epoch: keep track of earliest ts before applying overflow count Signed-off-by: Lars Schneidenbach --- src/aiu_trace_analyzer/core/acelyzer.py | 3 +- src/aiu_trace_analyzer/pipeline/normalize.py | 40 +++++++++++++++++++- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/src/aiu_trace_analyzer/core/acelyzer.py b/src/aiu_trace_analyzer/core/acelyzer.py index eed6555..48f4973 100644 --- a/src/aiu_trace_analyzer/core/acelyzer.py +++ b/src/aiu_trace_analyzer/core/acelyzer.py @@ -412,7 +412,8 @@ def register_processing_functions(self, process.register_stage(callback=event_pipe.normalize_phase1, context=normalize_ctx) if args.flex_ts_fix: process.register_stage(callback=event_pipe.frequency_align_collect, context=frequency_align_ctx) - process.register_stage(callback=event_pipe.pipeline_barrier, context=event_pipe._main_barrier_context) + process.register_stage(callback=event_pipe.pipeline_barrier, context=event_pipe._main_barrier_context) + if args.flex_ts_fix: process.register_stage(callback=event_pipe.frequency_align_apply, context=frequency_align_ctx) process.register_stage(callback=event_pipe.normalize_phase2, context=normalize_ctx) diff --git a/src/aiu_trace_analyzer/pipeline/normalize.py b/src/aiu_trace_analyzer/pipeline/normalize.py index 3ccaf0a..4e12369 100644 --- a/src/aiu_trace_analyzer/pipeline/normalize.py +++ b/src/aiu_trace_analyzer/pipeline/normalize.py @@ -71,6 +71,14 @@ def __init__(self, soc_frequency: float, ignore_crit: bool = False, text="OVC: local_correction fix has missed a spot in TS-sequence of {d[count]} event(s).", data={"count": 0}, is_error=True + ), + TraceWarning( + name="epoch_start", + text="OVC: reference epoch had to be updated {d[count]} times with newly detected earlier timestamps. " + "This indicates events with 0.0-timestamps or out-of-order input events. " + "(pid={d[pid]}, job={d[job]}, start={d[epoch_start]})", + data={"count": 0, "pid": set([]), "job": set([]), "epoch_start": 0.0}, + update_fn={"count": int.__add__, "pid": set.union, "job": set.union, "epoch_start": lambda x, y: min(x,y)} ) ]) self.soc_frequency = soc_frequency @@ -156,7 +164,7 @@ def event_filtered(self, event: TraceEvent) -> bool: return True return False - def get_overflow_count(self, qid, job: str, ts: float, cycle: int) -> tuple[float, float, float]: + def init_reference_overflow(self, qid, job: str, ts: float, cycle: int) -> float: # potential start ts of epoch assuming cycle->wallclk mapping is correct epoch_start = ts - cycle / self.soc_frequency @@ -164,6 +172,29 @@ def get_overflow_count(self, qid, job: str, ts: float, cycle: int) -> tuple[floa if qid not in self.queues: self.queues[qid] = {"0": (epoch_start, ts, cycle)} # store epoch0 for this job aiulog.log(aiulog.INFO, "OVC: Reference Epoch for", qid, job, ts-epoch_start, epoch_start) + + return epoch_start + + def update_reference_overflow(self, qid, job: str, ts: float, cycle: int) -> None: + epoch_start = self.init_reference_overflow(qid, job, ts, cycle) + + # check for potential earlier reference epoch to prevent negative ts/overflow counts + if self.queues[qid]["0"][0] > epoch_start: + self.queues[qid] = {"0": (epoch_start, ts, cycle)} + self.warnings["epoch_start"].update({ + "count": 1, + "pid": [qid], + "job": [job], + "epoch_start": epoch_start + }) + aiulog.log( + aiulog.DEBUG, + "OVC: Detected new earliest Reference Epoch for", qid, job, ts-epoch_start, epoch_start, + "This indicates unstable input timestamps.") + + def get_overflow_count(self, qid, job: str, ts: float, cycle: int) -> tuple[float, float, float]: + epoch_start = self.init_reference_overflow(qid, job, ts, cycle) + # ts distance to reference point time_since_epoch0 = ts - self.queues[qid]["0"][0] @@ -215,6 +246,13 @@ def tsx_32bit_local_correction(self, event: TraceEvent) -> dict: if event["dur"] > self.OVERFLOW_TIME_SPAN_US: self.warnings["long_dur"].update() + qid = self.queue_hash(event) + self.update_reference_overflow( + qid, + str(event["args"]["jobhash"]), + event["ts"], + int(event["args"]["TS1"])) + if "Cmpt Exec" not in event["name"]: return args From c6815a4dba2f247518631c0ae16adc29282ccf8e Mon Sep 17 00:00:00 2001 From: Lars Schneidenbach Date: Mon, 1 Dec 2025 13:19:32 -0500 Subject: [PATCH 3/3] simplified usage of min-function Signed-off-by: Lars Schneidenbach --- src/aiu_trace_analyzer/pipeline/normalize.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/aiu_trace_analyzer/pipeline/normalize.py b/src/aiu_trace_analyzer/pipeline/normalize.py index 4e12369..62aad72 100644 --- a/src/aiu_trace_analyzer/pipeline/normalize.py +++ b/src/aiu_trace_analyzer/pipeline/normalize.py @@ -78,7 +78,11 @@ def __init__(self, soc_frequency: float, ignore_crit: bool = False, "This indicates events with 0.0-timestamps or out-of-order input events. " "(pid={d[pid]}, job={d[job]}, start={d[epoch_start]})", data={"count": 0, "pid": set([]), "job": set([]), "epoch_start": 0.0}, - update_fn={"count": int.__add__, "pid": set.union, "job": set.union, "epoch_start": lambda x, y: min(x,y)} + update_fn={ + "count": int.__add__, + "pid": set.union, + "job": set.union, + "epoch_start": min} ) ]) self.soc_frequency = soc_frequency