Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ build-backend = "poetry_dynamic_versioning.backend"
[tool.poetry.dependencies]
torch = ">=2.3.0"
packaging = "*"
python = ">=3.10"
python = ">=3.10,<4.0"
psutil = ">=6.0.0"
pyyaml = "*"
numpy = ">=1.26"
Expand All @@ -52,7 +52,7 @@ langchain-core = { version = ">=0.3.51,<0.4.0", optional = true }
langchain-openai = { version = ">=0.3.0,<1.0.0", optional = true }
mcp = { version = ">=1.15.0", optional = true }
setproctitle = { version = ">=1.3.0", optional = true }
logsage = { version = ">=0.1.7", optional = true }
logsage = { version = ">=0.1.8", optional = true }
fastapi = { version = ">=0.100.0", optional = true }
uvicorn = { version = ">=0.20.0", extras = ["standard"], optional = true }
pydantic = { version = ">=2.0.0", optional = true }
Expand Down
94 changes: 81 additions & 13 deletions src/nvidia_resiliency_ext/attribution/analyzer/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ async def submit(
self._schedule_progressive_analysis(result.normalized_path, user, job_id)
elif intent == ANALYSIS_INTENT_TERMINAL:
self._schedule_terminal_analysis(result.normalized_path)
elif intent == ANALYSIS_INTENT_TRACK_ONLY and self._log.is_streaming_logs:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the changes to this file are needed.

ANALYSIS_INTENT_PROGRESSIVE: marks the start of a cycle
ANALYSIS_INTENT_TERMINAL: marks the end of a cycle

submit(..., analysis_intent="progressive")
-> self._log.submit(...) # tracks job/submission
-> _schedule_progressive_analysis(...)
-> _start_progressive_analysis(...)
-> self._log.start_progressive_analysis(...)
-> LogAnalyzerRunner.start_progressive_analysis(...)
-> MCP/lib progressive-start adapter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See:
async def run(self, _arguments: Mapping[str, Any]) -> dict[str, str | None]:
"""Return status metadata without running terminal attribution."""
payload = ProgressiveStartResult(
status=PROGRESSIVE_STATUS_UNSUPPORTED,
message="LogSage progressive start API is not configured",
).as_payload()
payload["module"] = MODULE_LOG_ANALYZER_PROGRESSIVE_START
return payload

This is what gets invoked after the MCP. It is currently stubbed (it returns PROGRESSIVE_STATUS_UNSUPPORTED) and needs to be hooked up to the LogSage start handler.

self._schedule_start_analysis(result.normalized_path)

return result

Expand Down Expand Up @@ -375,7 +377,7 @@ async def _start_progressive_analysis(

async def _run_terminal_analysis(self, normalized_path: str) -> None:
"""Run the normal result-gathering path after an explicit terminal signal."""
result = await self.analyze(normalized_path)
result = await self.analyze(normalized_path, job_stage="end")
if isinstance(result, LogAnalyzerError):
logger.warning(
"Terminal analysis failed for %s: %s: %s",
Expand All @@ -388,48 +390,104 @@ async def _run_terminal_analysis(self, normalized_path: str) -> None:

def _schedule_terminal_analysis(self, normalized_path: str) -> None:
"""Start final analysis without delaying POST /logs terminal responses."""
self._schedule_stage_analysis(
normalized_path,
self._run_terminal_analysis,
stage_label="terminal",
)

async def _run_start_analysis(self, normalized_path: str) -> None:
"""Kick off streaming LogSage at job start.

Bypasses the coalescer: ``analyze_logs_rt_start`` is a long-running reader
that accumulates per-path state but produces no cacheable attribution
payload; coalescing here would deadlock the slot until the streamer exits,
preventing the eventual terminal analysis from running.
"""
try:
await self._log.fetch_log_result(normalized_path, job_stage="start")
except Exception as e:
logger.warning(
"Start analysis failed for %s: %s: %s",
normalized_path,
type(e).__name__,
e,
)
else:
logger.debug("Start analysis completed for %s", normalized_path)

def _schedule_start_analysis(self, normalized_path: str) -> None:
"""Kick off streaming LogSage start without delaying POST /logs responses."""
self._schedule_stage_analysis(
normalized_path,
self._run_start_analysis,
stage_label="start",
)

def _schedule_stage_analysis(
self,
normalized_path: str,
coro_factory: Any,
*,
stage_label: str,
) -> None:
"""Schedule a fire-and-forget per-stage analysis task on the running loop."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
if self._main_loop is None or self._main_loop.is_closed():
logger.error(
"Event loop not set — skipping terminal analysis start for %s",
"Event loop not set — skipping %s analysis start for %s",
stage_label,
normalized_path,
)
return
future = asyncio.run_coroutine_threadsafe(
self._run_terminal_analysis(normalized_path),
coro_factory(normalized_path),
self._main_loop,
)
future.add_done_callback(
lambda done: self._log_terminal_analysis_failure(normalized_path, done)
lambda done: self._log_stage_analysis_failure(
normalized_path, done, stage_label=stage_label
)
)
else:
task = loop.create_task(self._run_terminal_analysis(normalized_path))
task = loop.create_task(coro_factory(normalized_path))
task.add_done_callback(
lambda done: self._log_terminal_analysis_failure(normalized_path, done)
lambda done: self._log_stage_analysis_failure(
normalized_path, done, stage_label=stage_label
)
)
logger.debug("Scheduled terminal analysis start for %s", normalized_path)
logger.debug("Scheduled %s analysis start for %s", stage_label, normalized_path)

@staticmethod
def _log_terminal_analysis_failure(normalized_path: str, done: Any) -> None:
def _log_stage_analysis_failure(normalized_path: str, done: Any, *, stage_label: str) -> None:
try:
done.result()
except (asyncio.CancelledError, concurrent.futures.CancelledError):
logger.debug("Terminal analysis task cancelled for %s", normalized_path)
logger.debug(
"%s analysis task cancelled for %s", stage_label.capitalize(), normalized_path
)
except Exception as e:
logger.warning(
"Terminal analysis task failed for %s: %s: %s",
"%s analysis task failed for %s: %s: %s",
stage_label.capitalize(),
normalized_path,
type(e).__name__,
e,
)

@staticmethod
def _log_terminal_analysis_failure(normalized_path: str, done: Any) -> None:
Analyzer._log_stage_analysis_failure(normalized_path, done, stage_label="terminal")

async def analyze(
self,
log_path: str,
file: Optional[str] = None,
wl_restart: Optional[int] = None,
*,
job_stage: Optional[str] = None,
) -> LogAnalyzerOutcome:
"""
Analyze a log file using LLM.
Expand Down Expand Up @@ -492,7 +550,10 @@ async def analyze(
f"job.job_id={getattr(job, 'job_id', 'N/A') if job else 'N/A'}"
)
coalesced_raw = await self._coalescer.get_or_compute(
validated, lambda: self._run_llm_analysis(validated, user=user, job_id=job_id)
validated,
lambda: self._run_llm_analysis(
validated, user=user, job_id=job_id, job_stage=job_stage
),
)
bundle = coalesced_from_cache(coalesced_raw)
fr_dump = bundle.fr_dump_path
Expand Down Expand Up @@ -591,10 +652,17 @@ def read_file_preview(
# ─── Internal methods ───

async def _run_llm_analysis(
self, path: str, user: str = "unknown", job_id: Optional[str] = None
self,
path: str,
user: str = "unknown",
job_id: Optional[str] = None,
*,
job_stage: Optional[str] = None,
) -> LogAnalysisCoalesced:
"""On cache miss: delegate to :meth:`LogAnalyzer.run_attribution_for_path`."""
return await self._log.run_attribution_for_path(path, user=user, job_id=job_id)
return await self._log.run_attribution_for_path(
path, user=user, job_id=job_id, job_stage=job_stage
)

async def _analyze_splitlog_mode(
self,
Expand Down
Loading
Loading