14 changes: 5 additions & 9 deletions server/app/api_routes/routes/jobs.py
@@ -22,7 +22,6 @@
from app.models.job import Job, JobStatus
from app.schemas.job import JobListItem, JobListResponse, JobResultsResponse
from app.services.storage.factory import get_storage_service
from app.services.video_summary_service import derive_video_summary
from app.settings import settings

logger = logging.getLogger(__name__)
@@ -231,21 +230,18 @@ async def get_job(job_id: UUID, db: Session = Depends(get_db)) -> JobResultsResponse

# Clean Break: All completed jobs return result_url + summary
# No more inline results for any job type
# v0.16.8: Summary comes from job.summary (pre-computed by worker)

# Load results from storage to derive summary
# Verify results file exists
try:
results_path = job.output_path
file_path = storage.load_file(results_path)
with open(file_path, "r") as f:
results = json.load(f)
storage.load_file(job.output_path)
except FileNotFoundError as err:
raise HTTPException(status_code=404, detail="Results file not found") from err
except json.JSONDecodeError as err:
raise HTTPException(status_code=500, detail="Invalid results file") from err

# Return result_url and summary for all completed jobs
result_url = storage.get_signed_url(job.output_path)
Comment on lines +235 to 242
Contributor

⚠️ Potential issue | 🟠 Major

Avoid load_file() just to prove the artefact exists.

With the S3 backend, load_file() downloads the whole object to a temp file. Here that path is discarded, so every GET /v1/jobs/{job_id} can now trigger a full download and leak a temp file on the polling path. Please use a lightweight exists/HEAD-style storage call instead, or make the signed-URL step the only storage round-trip here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/app/api_routes/routes/jobs.py` around lines 235-242: the code calls
storage.load_file(job.output_path) solely to verify the artifact exists, which
with the S3 backend downloads the full object. Replace this with a lightweight
existence/HEAD check (e.g., storage.exists or storage.head), or remove the
existence call and rely on storage.get_signed_url(job.output_path) to surface
errors, and update the exception handling to raise
HTTPException(status_code=404, ...) when the lightweight check indicates a
missing artifact (reference storage.load_file, storage.get_signed_url, and
job.output_path to locate the change).
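For illustration, a minimal sketch of the suggested shape (assuming the storage service exposes an exists()-style check; this diff does not confirm such a method, so the actual name may differ):

# Hypothetical: `storage.exists()` stands in for whatever lightweight
# existence/HEAD check the storage backend actually provides.
if not storage.exists(job.output_path):
    raise HTTPException(status_code=404, detail="Results file not found")

# Signed-URL generation is then the only storage round-trip on this path.
result_url = storage.get_signed_url(job.output_path)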

summary = derive_video_summary(results)
# v0.16.8: Use pre-computed summary from job.summary (set by worker)
summary = json.loads(job.summary) if job.summary else None
Comment on lines +243 to +244
Contributor

⚠️ Potential issue | 🟠 Major

Handle malformed job.summary the same way as list_jobs().

This json.loads() is unguarded, so a bad row now turns the detail endpoint into a 500 even though the list endpoint already falls back to summary=None on JSONDecodeError.

🛡️ Small hardening patch
-    summary = json.loads(job.summary) if job.summary else None
+    try:
+        summary = json.loads(job.summary) if job.summary else None
+    except json.JSONDecodeError:
+        summary = None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/app/api_routes/routes/jobs.py` around lines 243-244: the detail
endpoint currently does an unguarded json.loads(job.summary), which raises on
malformed JSON. Wrap that call (where summary = json.loads(job.summary) if
job.summary else None) in a try/except matching the behavior of list_jobs():
catch json.JSONDecodeError (and optionally TypeError if needed) and set summary
= None on error so a bad row does not raise a 500.

return JobResultsResponse(
job_id=job.job_id,
status=job.status.value,
14 changes: 11 additions & 3 deletions server/app/workers/worker.py
@@ -676,11 +676,15 @@ def _finalize_job(self, job_id: str, meta: dict, results: dict) -> None:
if job.job_type in ("video", "video_multi"):
job.progress = 100
# Discussion #354: Pre-compute summary for /v1/jobs hot path
summary_dict = derive_video_summary(output_data)
# v0.16.8: Use plugin-provided summary if available (decoupled from server)
summary_dict = output_data.get("summary")
if not summary_dict:
# Fallback for plugins that don't provide summary
summary_dict = derive_video_summary(output_data)
job.summary = json.dumps(summary_dict)
Comment on lines +679 to 684
Contributor

⚠️ Potential issue | 🟠 Major

Only derive a fallback summary for video jobs.

derive_video_summary() is video-specific, but this block now runs for every completed job. For a legacy image plugin that does not emit summary, or one that deliberately returns {}, we'll persist a bogus {"frame_count": 0, ...} payload into job.summary, and /v1/jobs/{job_id} will expose that as if it were real image metadata.

🔧 Minimal fix
-            summary_dict = output_data.get("summary")
-            if not summary_dict:
-                # Fallback for plugins that don't provide summary
-                summary_dict = derive_video_summary(output_data)
-            job.summary = json.dumps(summary_dict)
+            summary_dict = output_data.get("summary")
+            if summary_dict is None and job.job_type in ("video", "video_multi"):
+                # Fallback for legacy video plugins only
+                summary_dict = derive_video_summary(output_data)
+            job.summary = json.dumps(summary_dict) if summary_dict is not None else None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-            # v0.16.8: Use plugin-provided summary if available (decoupled from server)
-            summary_dict = output_data.get("summary")
-            if not summary_dict:
-                # Fallback for plugins that don't provide summary
-                summary_dict = derive_video_summary(output_data)
-            job.summary = json.dumps(summary_dict)
+            # v0.16.8: Use plugin-provided summary if available (decoupled from server)
+            summary_dict = output_data.get("summary")
+            if summary_dict is None and job.job_type in ("video", "video_multi"):
+                # Fallback for legacy video plugins only
+                summary_dict = derive_video_summary(output_data)
+            job.summary = json.dumps(summary_dict) if summary_dict is not None else None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/app/workers/worker.py` around lines 679-684: the current logic calls
derive_video_summary() for every job when output_data lacks a summary; change it
to only run the video-specific fallback for video jobs by guarding the call with
a job-type check (e.g., if job.type == "video" or job.kind == "video" per your
model), so: read summary_dict = output_data.get("summary"); if not summary_dict
and the job indicates video, then set summary_dict =
derive_video_summary(output_data); otherwise leave summary_dict as-is (including
an empty dict) and then set job.summary = json.dumps(summary_dict); ensure you
reference the existing variables/functions summary_dict, output_data,
derive_video_summary, and job.summary when making this change.

db.commit()
send_job_completed(str(job.job_id))
logger.info(f"Job {job.job_id} completed successfully via Ray")
logger.info(f"Job {job_id} completed successfully via Ray")
finally:
db.close()

@@ -987,7 +991,11 @@ def cb(current_frame: int, total: int = total_frames) -> None:
if job.job_type in ("video", "video_multi"):
job.progress = 100
# Discussion #354: Pre-compute summary for /v1/jobs hot path
summary_dict = derive_video_summary(output_data)
# v0.16.8: Use plugin-provided summary if available (decoupled from server)
summary_dict = output_data.get("summary")
if not summary_dict:
# Fallback for plugins that don't provide summary
summary_dict = derive_video_summary(output_data)
job.summary = json.dumps(summary_dict)
db.commit()

44 changes: 36 additions & 8 deletions server/tests/api/routes/test_jobs_unified.py
@@ -116,8 +116,9 @@ def test_get_job_completed(client, session, storage):
"""Test GET /v1/jobs/{id} for completed job with results.

Clean Break: Completed jobs return result_url, not inline results.
v0.16.8: Summary is pre-computed by worker and stored in job.summary.
"""
# Create a completed job
# Create a completed job with pre-computed summary (set by worker)
job_id = uuid4()
output_path = f"image/output/{job_id}.json"
job = Job(
@@ -127,6 +128,8 @@
input_path="image/input/test.png",
output_path=output_path,
job_type="image",
# v0.16.8: Pre-computed summary from worker (plugin provides this)
summary=json.dumps({"text_length": 14, "word_count": 2}),
)
session.add(job)
session.commit()
@@ -180,8 +183,9 @@ def test_get_job_image_type(client, session, storage):
"""Test GET /v1/jobs/{id} for image job.

Clean Break: Image jobs also use result_url for consistency.
v0.16.8: Summary is pre-computed by worker and stored in job.summary.
"""
# Create a completed image job
# Create a completed image job with pre-computed summary
job_id = uuid4()
output_path = f"image/output/{job_id}.json"
job = Job(
@@ -191,6 +195,8 @@
input_path="image/input/test.png",
output_path=output_path,
job_type="image",
# v0.16.8: Pre-computed summary from worker
summary=json.dumps({"text_length": 10, "word_count": 2}),
)
session.add(job)
session.commit()
Expand All @@ -215,8 +221,9 @@ def test_get_job_video_type(client, session, storage):
"""Test GET /v1/jobs/{id} for video job.

Issue #350: Video jobs return result_url instead of inline results.
v0.16.8: Summary is pre-computed by worker and stored in job.summary.
"""
# Create a completed video job
# Create a completed video job with pre-computed summary
job_id = uuid4()
output_path = f"video/output/{job_id}.json"
job = Job(
@@ -226,6 +233,8 @@
input_path="video/input/test.mp4",
output_path=output_path,
job_type="video",
# v0.16.8: Pre-computed summary from worker
summary=json.dumps({"frame_count": 1, "detection_count": 0, "classes": []}),
)
session.add(job)
session.commit()
@@ -268,8 +277,13 @@ def test_get_job_results_file_not_found(client, session):


def test_get_job_results_invalid_json(client, session, storage):
"""Test GET /v1/jobs/{id} when results file contains invalid JSON."""
# Create a completed job with invalid JSON results
"""Test GET /v1/jobs/{id} when results file contains invalid JSON.

v0.16.8: API no longer parses the JSON file for summary - it comes from
job.summary (pre-computed by worker). API just checks file existence.
Invalid JSON would have been caught by the worker when saving results.
"""
# Create a completed job with pre-computed summary
job_id = uuid4()
job = Job(
job_id=job_id,
@@ -278,17 +292,21 @@ def test_get_job_results_invalid_json(client, session, storage):
input_path="image/input/test.png",
output_path="image/output/invalid.json",
job_type="image",
summary=json.dumps({"text_length": 0, "word_count": 0}),
)
session.add(job)
session.commit()

# Create invalid JSON file
# Create a file (even with invalid JSON - API doesn't parse it)
storage.save_file(BytesIO(b"invalid json"), "image/output/invalid.json")

response = client.get(f"/v1/jobs/{job_id}")

assert response.status_code == 500
assert "invalid results file" in response.json()["detail"].lower()
# v0.16.8: API returns 200 because file exists, summary comes from DB
assert response.status_code == 200
data = response.json()
assert data["result_url"] is not None
assert data["summary"] is not None


# Issue #350: Artifact Pattern - video jobs return result_url, not results
@@ -298,6 +316,7 @@ def test_get_job_video_returns_result_url(client, session, storage):
"""Test GET /v1/jobs/{id} returns result_url for video jobs.

Issue #350: Video jobs should return a URL for lazy loading.
v0.16.8: Summary is pre-computed by worker and stored in job.summary.
"""
job_id = uuid4()
output_path = f"video/output/{job_id}.json"
@@ -308,6 +327,10 @@
input_path="video/input/test.mp4",
output_path=output_path,
job_type="video",
# v0.16.8: Pre-computed summary from worker
summary=json.dumps(
{"frame_count": 100, "detection_count": 200, "classes": ["player", "ball"]}
),
)
session.add(job)
session.commit()
@@ -338,6 +361,7 @@ def test_get_job_video_includes_summary(client, session, storage):
"""Test GET /v1/jobs/{id} includes summary for video jobs.

Issue #350: Summary contains derived metadata.
v0.16.8: Summary is pre-computed by worker and stored in job.summary.
"""
job_id = uuid4()
output_path = f"video/output/{job_id}.json"
@@ -348,6 +372,10 @@
input_path="video/input/test.mp4",
output_path=output_path,
job_type="video",
# v0.16.8: Pre-computed summary from worker (matches test assertions)
summary=json.dumps(
{"frame_count": 50, "detection_count": 100, "classes": ["player", "ball"]}
),
)
session.add(job)
session.commit()
15 changes: 11 additions & 4 deletions server/tests/app/workers/test_worker.py
@@ -629,13 +629,19 @@ def test_worker_stores_summary_on_completion(test_engine, session):
mock_plugin_service.get_plugin_manifest.return_value = {
"tools": [{"id": "detect", "input_types": ["video"]}]
}
# Return video results with frames and detections
# Return video results with frames, detections, and plugin-computed summary
# Plugin (yolo-tracker) computes summary via compute_video_summary()
mock_plugin_service.run_plugin_tool.return_value = {
"total_frames": 10,
"frames": [
{"frame_idx": i, "detections": [{"class": "person"}, {"class": "car"}]}
for i in range(10)
],
"summary": {
"frame_count": 10,
"detection_count": 20,
"classes": ["person", "car"],
},
}

# Run worker
@@ -651,11 +657,12 @@
# Discussion #354: Summary should be stored
assert updated_job.summary is not None, "Worker should compute and store summary"

# Verify summary content
# Verify summary content (from plugin-provided summary)
summary = json.loads(updated_job.summary)
assert "frame_count" in summary
assert "detection_count" in summary
assert "classes" in summary
# 10 frames * 2 detections each = 20 total
# Verify plugin's summary is used (not recomputed)
assert summary["frame_count"] == 10
assert summary["detection_count"] == 20
assert set(summary["classes"]) == {"person", "car"}
assert summary["classes"] == ["person", "car"]