Skip to content

[CC] Attribution inline processing stop repeated failures#341

Open
helisha91 wants to merge 6 commits into
mainfrom
attribution_inline_processing_stop_repeated_failures
Open

[CC] Attribution inline processing stop repeated failures#341
helisha91 wants to merge 6 commits into
mainfrom
attribution_inline_processing_stop_repeated_failures

Conversation

@helisha91

Copy link
Copy Markdown
Contributor

LogSage-0.1.8
Inline log processing + Stateful repeated failures stop job
@namitdhameja I used Claude Code

@helisha91 helisha91 requested a review from namitdhameja May 21, 2026 12:54
@greptile-apps

greptile-apps Bot commented May 21, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR adds inline streaming log processing to NVRxLogAnalyzer — a start phase that polls a log file incrementally and a terminal end phase that finalizes attribution — plus a stateful repeated-failure guard that issues a STOP after repeated_amount consecutive identical attributions across job cycles.

  • Streaming dispatch: _analyze_logs_rt_dispatch routes to analyze_logs_rt_start (long-running polling loop) or analyze_logs_rt_end (final attribution) based on job_stage; engine.py wires up _schedule_start_analysis as a fire-and-forget task on ANALYSIS_INTENT_TRACK_ONLY signals.
  • Repeated-failure guard: cycle_counter_dict tracks consecutive matching attributions per path; when the count reaches repeated_amount, auto_resume is overridden to STOP - DONT RESTART IMMEDIATE.
  • Test harness: three new scripts drive the writer/analyzer pair over four cycles, explicitly working around several known bugs in the production code path.

Confidence Score: 2/5

The streaming path has multiple runtime-breaking bugs that prevent it from working correctly in production; the feature is not functional as written.

The streaming mode's start phase acquires _log_analysis_lock and then runs a long-running polling loop, holding the lock for its entire duration. The terminal/end analysis, scheduled concurrently, also needs the same lock and will wait indefinitely — the job can never produce a final attribution in streaming mode. On top of this, the four bugs documented in previous review rounds (trailing-comma 1-tuples on all four scalar config attributes, uninitialized job_inline_data_dict entries, blocking time.sleep() inside an async method, and UnboundLocalError for last_attribution_dict_chunk) remain unaddressed. The integration test explicitly patches around each of these, confirming they are present in the production path.

src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py and src/nvidia_resiliency_ext/attribution/orchestration/log_analyzer.py require the most attention.

Important Files Changed

Filename Overview
src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py Adds inline streaming log analysis (analyze_logs_rt_start / analyze_logs_rt_end / _analyze_logs_rt_dispatch) and repeated-failure STOP logic; contains multiple known runtime-breaking bugs (trailing commas producing 1-tuples, uninitialized job_inline_data_dict, blocking time.sleep in async, UnboundLocalError for last_attribution_dict_chunk) plus a dead attribution_list variable and an inconsistent in membership test for FinishedStatus.
src/nvidia_resiliency_ext/attribution/orchestration/log_analyzer.py Adds streaming start/end stage support (job_stage forwarding, is_streaming_logs property, _streaming_kwargs); the _log_analysis_lock wrapping analyzer.run() is held for the entire long-running start phase, deadlocking any concurrent end/terminal analysis call.
src/nvidia_resiliency_ext/attribution/analyzer/engine.py Refactors _schedule_terminal_analysis into a generic _schedule_stage_analysis helper, adds _schedule_start_analysis/_run_start_analysis for streaming mode, and threads job_stage through analyze/_run_llm_analysis; the refactor is clean and the logic looks correct at this layer.
src/nvidia_resiliency_ext/attribution/orchestration/config.py Updates default LLM model and base URL, adds is_streaming_logs field to LogSageExecutionConfig; straightforward additions with no issues.
tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py New integration test script for the dispatch path; explicitly documents and patches around four known production bugs (1-tuple config, uninitialized dicts, blocking sleep) to make the test runnable — each workaround marks a defect in the production code path.
pyproject.toml Bumps logsage dependency to >=0.1.8 and adds Python <4.0 upper bound; correct and expected for the new LogSageCycleFields and other imports.

Sequence Diagram

sequenceDiagram
    participant API as POST /logs
    participant Engine as Analyzer
    participant Lock as _log_analysis_lock
    participant StartPhase as analyze_logs_rt_start
    participant EndPhase as analyze_logs_rt_end

    API->>Engine: "TRACK_ONLY intent (streaming=True)"
    Engine->>Engine: _schedule_start_analysis(path)
    Engine-->>StartPhase: fire-and-forget task

    StartPhase->>Lock: acquire lock
    Note over StartPhase,Lock: Lock held for entire polling loop (minutes)

    loop every chunks_per_time minutes
        StartPhase->>StartPhase: read new log lines
        StartPhase->>StartPhase: _retry_return_application_errors_rt()
        StartPhase->>StartPhase: time.sleep() blocks event loop
    end

    API->>Engine: TERMINAL intent
    Engine->>Engine: _schedule_terminal_analysis(path)
    Engine-->>EndPhase: fire-and-forget task

    EndPhase->>Lock: try acquire lock
    Note over EndPhase,Lock: DEADLOCK - StartPhase still holds lock
    EndPhase--xEndPhase: blocked forever - no attribution produced
Loading

Reviews (2): Last reviewed commit: "attribution_inline_processing_stop_repea..." | Re-trigger Greptile

Comment on lines +535 to +538
self.repeated_amount = int(self._init_config.get("repeated_amount", 3)),
self.stop_accumulating_count = int(self._init_config.get("stop_accumulating_count", 3)),
self.logs_minutes_before_job_end = int(self._init_config.get("logs_minutes_before_job_end", 20)),
self.chunks_per_time = int(self._init_config.get("chunks_per_time", 5)),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Trailing commas turn all four scalar config attributes into 1-tuples. self.repeated_amount becomes (3,), self.stop_accumulating_count becomes (3,), etc. Every downstream use then throws TypeError: empty_logs_stop -= 1 on line 628, time.sleep(self.chunks_per_time*60) on line 645, int(self.logs_minutes_before_job_end / self.chunks_per_time) on line 705, and self.cycle_counter_dict[...] == self.repeated_amount - 1 on line 800 all fail at runtime. The integration test even works around this by manually patching these attributes after construction, which confirms the production code path is broken.

Suggested change
self.repeated_amount = int(self._init_config.get("repeated_amount", 3)),
self.stop_accumulating_count = int(self._init_config.get("stop_accumulating_count", 3)),
self.logs_minutes_before_job_end = int(self._init_config.get("logs_minutes_before_job_end", 20)),
self.chunks_per_time = int(self._init_config.get("chunks_per_time", 5)),
self.repeated_amount = int(self._init_config.get("repeated_amount", 3))
self.stop_accumulating_count = int(self._init_config.get("stop_accumulating_count", 3))
self.logs_minutes_before_job_end = int(self._init_config.get("logs_minutes_before_job_end", 20))
self.chunks_per_time = int(self._init_config.get("chunks_per_time", 5))

llm, app_data, True)
attribution_list.append(attribution_raw_chunk)

self.job_inline_data_dict[path].append((file_offset, new_lines, chunk_data, application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 self.job_inline_data_dict[path] is never initialized before analyze_logs_rt_start appends to it, so the first call raises KeyError. The test script explicitly documents and works around this by calling analyzer.job_inline_data_dict.setdefault(path, []) before dispatching, confirming the production path is broken.

Suggested change
self.job_inline_data_dict[path].append((file_offset, new_lines, chunk_data, application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk))
self.job_inline_data_dict.setdefault(path, []).append((file_offset, new_lines, chunk_data, application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk))


self.job_inline_data_dict[path].append((file_offset, new_lines, chunk_data, application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk))

time.sleep(self.chunks_per_time*60)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 time.sleep() inside an async method blocks the entire asyncio event loop for chunks_per_time * 60 seconds per iteration. While this task is sleeping, no other coroutine on the loop (including the eventual terminal/end-stage analysis) can make progress. This should be await asyncio.sleep(self.chunks_per_time * 60) to yield control cooperatively.

Comment on lines +751 to +779
if last_with_errors.application_errors_list_full:
application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk = get_attribution(
llm, last_with_errors, True)
last_attribution_dict_chunk = attribution_dict_chunk
last_attribution_raw_chunk = attribution_raw_chunk
last_application_log_chunk = application_log
last_hw_category_chunk = hw_category_chunk

if last_with_errors is None:
history = self.job_inline_data_dict.get(path, [])
last_with_errors = (history[-1][2] if len(history) >= 1 else None)

last_with_errors.checkpoint_saved = any([item[2].checkpoint_saved for item in self.job_inline_data_dict[path]])

last_with_errors = finished_validation(llm, last_with_errors)

logger.info("error extraction latency: %s", time.time() - s_time)
application_errors_full = [error[0] for error in last_with_errors.application_errors_list_full]

self.temporal_cache_dict.pop(path, None)

if (
len(last_with_errors.application_errors_list_full) == 0
or last_with_errors.finished == FinishedStatus.APPLICATION_DONE
):
attribution_finished = attribution_from_finished_status(last_with_errors, last_with_errors.application_errors_list_unique)
return attribution_finished

attribution_output = last_attribution_dict_chunk["attribution"]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 last_attribution_dict_chunk (and the three sibling last_* variables) are only assigned inside the if last_with_errors.application_errors_list_full: guard on line 751. If finished_validation on line 765 transitions finished to a non-APPLICATION_DONE terminal state while leaving the errors list non-empty, the early-return gate on line 772 is not taken and line 779 hits an UnboundLocalError. Initializing all four variables to None before the if block makes the path safe.

from logsage.auto_resume_policy.attribution_classes import ApplicationData, LRUCache
from logsage.auto_resume_policy.error_attribution import get_proposed_solution_cat
from logsage.auto_resume_policy.error_extraction import return_application_errors
from logsage.auto_resume_policy.attribution_classes import *

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Wildcard star-import pulls every name from attribution_classes into this module's namespace. Listing the used symbols (FinishedStatus, AutoResumeAction, Attribution, ErrorAttribution, ApplicationData, LRUCache) explicitly prevents accidental name shadowing and is consistent with how other logsage symbols are imported in this file.

Suggested change
from logsage.auto_resume_policy.attribution_classes import *
from logsage.auto_resume_policy.attribution_classes import (
ApplicationData,
Attribution,
AutoResumeAction,
ErrorAttribution,
FinishedStatus,
LRUCache,
)

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment on lines +759 to +761
if last_with_errors is None:
history = self.job_inline_data_dict.get(path, [])
last_with_errors = (history[-1][2] if len(history) >= 1 else None)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 last_with_errors is assigned from chunk_data on line 713, which is always the return value of _retry_return_application_errors_rt. That function always returns an ApplicationData instance (never None), so the if last_with_errors is None: branch can never execute and should be removed.

Comment on lines 182 to 184
async with self._log_analysis_lock:
result = await analyzer.run(run_kwargs)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 _log_analysis_lock deadlocks the end/terminal analysis in streaming mode

When streaming is enabled, _run_start_analysis calls fetch_log_result(path, job_stage="start"), which reaches _fetch_log_result_lib and acquires _log_analysis_lock before awaiting analyzer.run(run_kwargs). The run() call dispatches to analyze_logs_rt_start, a long-running polling loop; the lock is therefore held for the entire duration of the start phase (minutes or longer). When the terminal/end analysis fires concurrently and also calls _fetch_log_result_lib, it tries to acquire the same lock and blocks indefinitely — neither coroutine can make progress. The docstring on _run_start_analysis correctly identifies that the coalescer must be bypassed, but the underlying _log_analysis_lock wrapping analyzer.run() was not addressed, so the deadlock remains on the streaming path.

self._schedule_progressive_analysis(result.normalized_path, user, job_id)
elif intent == ANALYSIS_INTENT_TERMINAL:
self._schedule_terminal_analysis(result.normalized_path)
elif intent == ANALYSIS_INTENT_TRACK_ONLY and self._log.is_streaming_logs:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that the changes to this file is needed.

ANALYSIS_INTENT_PROGRESSIVE : this is the start of a cycle
ANALYSIS_INTENT_TERMINAL : this is the end of a cycle

submit(..., analysis_intent="progressive")
-> self._log.submit(...) # tracks job/submission
-> _schedule_progressive_analysis(...)
-> _start_progressive_analysis(...)
-> self._log.start_progressive_analysis(...)
-> LogAnalyzerRunner.start_progressive_analysis(...)
-> MCP/lib progressive-start adapter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See:
async def run(self, _arguments: Mapping[str, Any]) -> dict[str, str | None]:
"""Return status metadata without running terminal attribution."""
payload = ProgressiveStartResult(
status=PROGRESSIVE_STATUS_UNSUPPORTED,
message="LogSage progressive start API is not configured",
).as_payload()
payload["module"] = MODULE_LOG_ANALYZER_PROGRESSIVE_START
return payload

This is where is invoked after the MCP, currently it is stubbed (returns PROGRESSIVE_STATUS_UNSUPPORTED); and need to be hooked up with logsage start handler

DEFAULT_LLM_MODEL = os.environ.get("NVRX_LLM_MODEL", "nvidia/nemotron-3-super-120b-a12b")
DEFAULT_LLM_BASE_URL = os.environ.get("NVRX_LLM_BASE_URL", "https://integrate.api.nvidia.com/v1")
DEFAULT_LLM_MODEL = os.environ.get("NVRX_LLM_MODEL", "nvidia/qwen/qwen-235b")
DEFAULT_LLM_BASE_URL = os.environ.get("NVRX_LLM_BASE_URL", "https://inference-api.nvidia.com/v1")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are nvidia internal and not usable from a public repo.

# Internal NVIDIA users can override to inference.nvidia.com via NVRX_LLM_BASE_URL.
DEFAULT_LLM_MODEL = os.environ.get("NVRX_LLM_MODEL", "nvidia/nemotron-3-super-120b-a12b")
DEFAULT_LLM_BASE_URL = os.environ.get("NVRX_LLM_BASE_URL", "https://integrate.api.nvidia.com/v1")
DEFAULT_LLM_MODEL = os.environ.get("NVRX_LLM_MODEL", "nvidia/qwen/qwen-235b")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The public repo should still have default as nvidia model; for deployment this is overriden.

return self._lib_log_analyzer

async def _fetch_log_result_lib(self, path: str) -> Dict[str, Any]:
def _streaming_kwargs(self) -> Dict[str, Any]:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not needed; the request already has:
ANALYSIS_INTENT_PROGRESSIVE / ANALYSIS_INTENT_TERMINAL (for FT)
OR
only ANALYSIS_INTENT_TERMINAL (for service)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@namitdhameja I couldn’t find it. In which PR does it exist?

)
]

async def analyze_logs_rt_start(self) -> list[ApplicationData]:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the changes here. But if analyze_logs_rt_start is the start of log cycle then can tie it with ProgressiveLogAnalysisStartTool:run

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants