diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 21d1441e..a205e7c9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -115,10 +115,16 @@ jobs: cd web-ui npm run type-check - - name: Run tests + - name: Run tests with coverage run: | cd web-ui - npm test -- --run + npm test -- --run --coverage + + - name: Upload coverage report + uses: actions/upload-artifact@v4 + with: + name: web-ui-coverage + path: web-ui/coverage # --------------------------------------------------------- # E2E tests (depends on server tests + web-ui) diff --git a/server/app/api_routes/routes/jobs.py b/server/app/api_routes/routes/jobs.py index 93824774..2eecd29f 100644 --- a/server/app/api_routes/routes/jobs.py +++ b/server/app/api_routes/routes/jobs.py @@ -22,7 +22,6 @@ from app.models.job import Job, JobStatus from app.schemas.job import JobListItem, JobListResponse, JobResultsResponse from app.services.storage.factory import get_storage_service -from app.services.video_summary_service import derive_video_summary from app.settings import settings logger = logging.getLogger(__name__) @@ -231,21 +230,24 @@ async def get_job(job_id: UUID, db: Session = Depends(get_db)) -> JobResultsResp # Clean Break: All completed jobs return result_url + summary # No more inline results for any job type + # v0.16.8: Summary comes from job.summary (pre-computed by worker) - # Load results from storage to derive summary + # Verify results file exists try: - results_path = job.output_path - file_path = storage.load_file(results_path) - with open(file_path, "r") as f: - results = json.load(f) + storage.load_file(job.output_path) except FileNotFoundError as err: raise HTTPException(status_code=404, detail="Results file not found") from err - except json.JSONDecodeError as err: - raise HTTPException(status_code=500, detail="Invalid results file") from err # Return result_url and summary for all completed jobs result_url = storage.get_signed_url(job.output_path) - summary = derive_video_summary(results) + # v0.16.8: Use pre-computed summary from job.summary (set by worker) + # v0.16.9: Guard against malformed JSON (same as list_jobs) + summary = None + if job.summary: + try: + summary = json.loads(job.summary) + except json.JSONDecodeError: + logger.warning("Invalid summary JSON for job %s", job_id) return JobResultsResponse( job_id=job.job_id, status=job.status.value, diff --git a/server/app/workers/worker.py b/server/app/workers/worker.py index 9d1758f3..642c4eca 100644 --- a/server/app/workers/worker.py +++ b/server/app/workers/worker.py @@ -676,11 +676,16 @@ def _finalize_job(self, job_id: str, meta: dict, results: dict) -> None: if job.job_type in ("video", "video_multi"): job.progress = 100 # Discussion #354: Pre-compute summary for /v1/jobs hot path - summary_dict = derive_video_summary(output_data) - job.summary = json.dumps(summary_dict) + # v0.16.8: Use plugin-provided summary if available (decoupled from server) + # v0.16.9: Only use video summary fallback for video jobs + summary_dict = output_data.get("summary") + if not summary_dict and job.job_type in ("video", "video_multi"): + # Fallback for video plugins that don't provide summary + summary_dict = derive_video_summary(output_data) + job.summary = json.dumps(summary_dict) if summary_dict is not None else None db.commit() send_job_completed(str(job.job_id)) - logger.info(f"Job {job.job_id} completed successfully via Ray") + logger.info(f"Job {job_id} completed successfully via Ray") finally: db.close() @@ -987,8 +992,13 @@ def cb(current_frame: int, total: int = total_frames) -> None: if job.job_type in ("video", "video_multi"): job.progress = 100 # Discussion #354: Pre-compute summary for /v1/jobs hot path - summary_dict = derive_video_summary(output_data) - job.summary = json.dumps(summary_dict) + # v0.16.8: Use plugin-provided summary if available (decoupled from server) + # v0.16.9: Only use video summary fallback for video jobs + summary_dict = output_data.get("summary") + if not summary_dict and job.job_type in ("video", "video_multi"): + # Fallback for video plugins that don't provide summary + summary_dict = derive_video_summary(output_data) + job.summary = json.dumps(summary_dict) if summary_dict is not None else None db.commit() # Notify WebSocket subscribers diff --git a/server/tests/api/routes/test_jobs_unified.py b/server/tests/api/routes/test_jobs_unified.py index 208f7be7..50482a1a 100644 --- a/server/tests/api/routes/test_jobs_unified.py +++ b/server/tests/api/routes/test_jobs_unified.py @@ -116,8 +116,9 @@ def test_get_job_completed(client, session, storage): """Test GET /v1/jobs/{id} for completed job with results. Clean Break: Completed jobs return result_url, not inline results. + v0.16.8: Summary is pre-computed by worker and stored in job.summary. """ - # Create a completed job + # Create a completed job with pre-computed summary (set by worker) job_id = uuid4() output_path = f"image/output/{job_id}.json" job = Job( @@ -127,6 +128,8 @@ def test_get_job_completed(client, session, storage): input_path="image/input/test.png", output_path=output_path, job_type="image", + # v0.16.8: Pre-computed summary from worker (plugin provides this) + summary=json.dumps({"text_length": 14, "word_count": 2}), ) session.add(job) session.commit() @@ -180,8 +183,9 @@ def test_get_job_image_type(client, session, storage): """Test GET /v1/jobs/{id} for image job. Clean Break: Image jobs also use result_url for consistency. + v0.16.8: Summary is pre-computed by worker and stored in job.summary. """ - # Create a completed image job + # Create a completed image job with pre-computed summary job_id = uuid4() output_path = f"image/output/{job_id}.json" job = Job( @@ -191,6 +195,8 @@ def test_get_job_image_type(client, session, storage): input_path="image/input/test.png", output_path=output_path, job_type="image", + # v0.16.8: Pre-computed summary from worker + summary=json.dumps({"text_length": 10, "word_count": 2}), ) session.add(job) session.commit() @@ -215,8 +221,9 @@ def test_get_job_video_type(client, session, storage): """Test GET /v1/jobs/{id} for video job. Issue #350: Video jobs return result_url instead of inline results. + v0.16.8: Summary is pre-computed by worker and stored in job.summary. """ - # Create a completed video job + # Create a completed video job with pre-computed summary job_id = uuid4() output_path = f"video/output/{job_id}.json" job = Job( @@ -226,6 +233,8 @@ def test_get_job_video_type(client, session, storage): input_path="video/input/test.mp4", output_path=output_path, job_type="video", + # v0.16.8: Pre-computed summary from worker + summary=json.dumps({"frame_count": 1, "detection_count": 0, "classes": []}), ) session.add(job) session.commit() @@ -268,27 +277,37 @@ def test_get_job_results_file_not_found(client, session): def test_get_job_results_invalid_json(client, session, storage): - """Test GET /v1/jobs/{id} when results file contains invalid JSON.""" - # Create a completed job with invalid JSON results + """Test GET /v1/jobs/{id} when results file contains invalid JSON. + + v0.16.8: API no longer parses the JSON file for summary - it comes from + job.summary (pre-computed by worker). API just checks file existence. + Invalid JSON would have been caught by the worker when saving results. + """ + # Create a completed job with pre-computed summary job_id = uuid4() + output_path = f"image/output/{job_id}.json" job = Job( job_id=job_id, status=JobStatus.completed, plugin_id="ocr", input_path="image/input/test.png", - output_path="image/output/invalid.json", + output_path=output_path, job_type="image", + summary=json.dumps({"text_length": 0, "word_count": 0}), ) session.add(job) session.commit() - # Create invalid JSON file - storage.save_file(BytesIO(b"invalid json"), "image/output/invalid.json") + # Create a file (even with invalid JSON - API doesn't parse it) + storage.save_file(BytesIO(b"invalid json"), output_path) response = client.get(f"/v1/jobs/{job_id}") - assert response.status_code == 500 - assert "invalid results file" in response.json()["detail"].lower() + # v0.16.8: API returns 200 because file exists, summary comes from DB + assert response.status_code == 200 + data = response.json() + assert data["result_url"] is not None + assert data["summary"] is not None # Issue #350: Artifact Pattern - video jobs return result_url, not results @@ -298,6 +317,7 @@ def test_get_job_video_returns_result_url(client, session, storage): """Test GET /v1/jobs/{id} returns result_url for video jobs. Issue #350: Video jobs should return a URL for lazy loading. + v0.16.8: Summary is pre-computed by worker and stored in job.summary. """ job_id = uuid4() output_path = f"video/output/{job_id}.json" @@ -308,6 +328,10 @@ def test_get_job_video_returns_result_url(client, session, storage): input_path="video/input/test.mp4", output_path=output_path, job_type="video", + # v0.16.8: Pre-computed summary from worker + summary=json.dumps( + {"frame_count": 100, "detection_count": 200, "classes": ["player", "ball"]} + ), ) session.add(job) session.commit() @@ -338,6 +362,7 @@ def test_get_job_video_includes_summary(client, session, storage): """Test GET /v1/jobs/{id} includes summary for video jobs. Issue #350: Summary contains derived metadata. + v0.16.8: Summary is pre-computed by worker and stored in job.summary. """ job_id = uuid4() output_path = f"video/output/{job_id}.json" @@ -348,6 +373,10 @@ def test_get_job_video_includes_summary(client, session, storage): input_path="video/input/test.mp4", output_path=output_path, job_type="video", + # v0.16.8: Pre-computed summary from worker (matches test assertions) + summary=json.dumps( + {"frame_count": 50, "detection_count": 100, "classes": ["player", "ball"]} + ), ) session.add(job) session.commit() diff --git a/server/tests/app/workers/test_worker.py b/server/tests/app/workers/test_worker.py index 2586322d..7342fc93 100644 --- a/server/tests/app/workers/test_worker.py +++ b/server/tests/app/workers/test_worker.py @@ -629,13 +629,19 @@ def test_worker_stores_summary_on_completion(test_engine, session): mock_plugin_service.get_plugin_manifest.return_value = { "tools": [{"id": "detect", "input_types": ["video"]}] } - # Return video results with frames and detections + # Return video results with frames, detections, and plugin-computed summary + # Plugin (yolo-tracker) computes summary via compute_video_summary() mock_plugin_service.run_plugin_tool.return_value = { "total_frames": 10, "frames": [ {"frame_idx": i, "detections": [{"class": "person"}, {"class": "car"}]} for i in range(10) ], + "summary": { + "frame_count": 10, + "detection_count": 20, + "classes": ["person", "car"], + }, } # Run worker @@ -651,11 +657,12 @@ def test_worker_stores_summary_on_completion(test_engine, session): # Discussion #354: Summary should be stored assert updated_job.summary is not None, "Worker should compute and store summary" - # Verify summary content + # Verify summary content (from plugin-provided summary) summary = json.loads(updated_job.summary) assert "frame_count" in summary assert "detection_count" in summary assert "classes" in summary - # 10 frames * 2 detections each = 20 total + # Verify plugin's summary is used (not recomputed) + assert summary["frame_count"] == 10 assert summary["detection_count"] == 20 - assert set(summary["classes"]) == {"person", "car"} + assert summary["classes"] == ["person", "car"] diff --git a/web-ui/src/App.polling.test.tsx b/web-ui/src/App.polling.test.tsx new file mode 100644 index 00000000..61bcee70 --- /dev/null +++ b/web-ui/src/App.polling.test.tsx @@ -0,0 +1,263 @@ +/** + * Tests for unified job polling in App.tsx + * + * Guards against: + * - Duplicate polling (two pollers racing) + * - Stale state overwrites + * - Polling continuing after terminal state + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { render, screen, fireEvent, act } from "@testing-library/react"; +import App from "./App"; + +// Mock WebSocket +vi.mock("./hooks/useWebSocket", () => ({ + useWebSocket: vi.fn(() => ({ + isConnected: true, + isConnecting: false, + connectionStatus: "connected" as const, + attempt: 0, + error: null, + errorInfo: null, + sendFrame: vi.fn(), + switchPlugin: vi.fn(), + disconnect: vi.fn(), + reconnect: vi.fn(), + latestResult: null, + stats: { framesProcessed: 0, avgProcessingTime: 0 }, + })), +})); + +// Mock API client +const mockGetPlugins = vi.fn(); +const mockGetPluginManifest = vi.fn(); +const mockGetJob = vi.fn(); +const mockListJobs = vi.fn(); + +vi.mock("./api/client", () => ({ + apiClient: { + getPlugins: () => mockGetPlugins(), + getPluginManifest: (id: string) => mockGetPluginManifest(id), + getJob: (id: string) => mockGetJob(id), + listJobs: () => mockListJobs(), + }, +})); + +// Mock JobList to expose onJobSelect callback +vi.mock("./components/JobList", () => ({ + JobList: ({ onJobSelect }: { onJobSelect: (job: unknown) => void }) => ( +
+ + +
+ ), +})); + +describe("App - Unified Job Polling", () => { + let setIntervalSpy: ReturnType; + + const mockPlugin = { + name: "test-plugin", + description: "Test Plugin", + version: "1.0.0", + }; + + const mockManifest = { + id: "test-plugin", + name: "Test Plugin", + version: "1.0.0", + tools: [ + { + id: "detect_objects", + title: "Detect Objects", + capabilities: ["object_detection"], + }, + ], + }; + + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + setIntervalSpy = vi.spyOn(window, "setInterval"); + + mockGetPlugins.mockResolvedValue([mockPlugin]); + mockGetPluginManifest.mockResolvedValue(mockManifest); + mockListJobs.mockResolvedValue([]); + mockGetJob.mockResolvedValue({ + job_id: "job-pending-123", + status: "pending", + created_at: new Date().toISOString(), + }); + }); + + afterEach(() => { + vi.restoreAllMocks(); + vi.useRealTimers(); + }); + + it("should create only ONE polling interval for selectedJob", async () => { + await act(async () => { + render(); + }); + + // Go to Jobs view + const jobsTab = screen.getByRole("button", { name: /^Jobs$/i }); + await act(async () => { + fireEvent.click(jobsTab); + }); + + // Clear any intervals from initial render + setIntervalSpy.mockClear(); + + // Select a pending job + const selectJobButton = screen.getByTestId("select-pending-job"); + await act(async () => { + fireEvent.click(selectJobButton); + }); + + // Advance timers to allow polling to start + await act(async () => { + vi.advanceTimersByTime(100); + }); + + // Should have created exactly ONE interval for polling + const intervalCalls = setIntervalSpy.mock.calls.length; + expect(intervalCalls).toBeLessThanOrEqual(1); + + // Advance time and verify polling happens + await act(async () => { + vi.advanceTimersByTime(1000); + }); + + // getJob should be called once after first poll + expect(mockGetJob).toHaveBeenCalledWith("job-pending-123"); + }); + + it("should NOT poll when job is already completed", async () => { + await act(async () => { + render(); + }); + + // Go to Jobs view + const jobsTab = screen.getByRole("button", { name: /^Jobs$/i }); + await act(async () => { + fireEvent.click(jobsTab); + }); + + // Clear any previous calls from mount + mockGetJob.mockClear(); + + // Select a completed job + const selectJobButton = screen.getByTestId("select-completed-job"); + await act(async () => { + fireEvent.click(selectJobButton); + }); + + // Advance time - should NOT poll for completed job + await act(async () => { + vi.advanceTimersByTime(3000); + }); + + // getJob should NOT be called for completed job + expect(mockGetJob).not.toHaveBeenCalled(); + }); + + it("should NOT create polling interval for uploadResult", async () => { + await act(async () => { + render(); + }); + + // Advance timers to let initial effects settle + await act(async () => { + vi.advanceTimersByTime(100); + }); + + // Record intervals before any job selection + const intervalsBeforeSelect = setIntervalSpy.mock.calls.length; + + // The key assertion: there should be NO dedicated uploadResult poller + // The only poller should be for selectedJob + // This test documents that uploadResult should NOT have its own poller + + // Navigate around to trigger any potential effects + const tabs = ["Stream", "Upload", "Jobs"]; + for (const tabName of tabs) { + const tab = screen.getByRole("button", { name: new RegExp(`^${tabName}$`, "i") }); + await act(async () => { + fireEvent.click(tab); + }); + await act(async () => { + vi.advanceTimersByTime(100); + }); + } + + // The interval count should not grow significantly from view changes + const intervalsAfterNav = setIntervalSpy.mock.calls.length; + expect(intervalsAfterNav - intervalsBeforeSelect).toBeLessThan(5); + }); + + it("should stop polling when job reaches terminal state", async () => { + // This test verifies that polling stops after a job completes + // We mock getJob to return completed on first call, then verify no further calls + + mockGetJob.mockResolvedValue({ + job_id: "job-pending-123", + status: "completed", + created_at: new Date().toISOString(), + }); + + await act(async () => { + render(); + }); + + // Go to Jobs view + const jobsTab = screen.getByRole("button", { name: /^Jobs$/i }); + await act(async () => { + fireEvent.click(jobsTab); + }); + + // Select pending job + const selectJobButton = screen.getByTestId("select-pending-job"); + await act(async () => { + fireEvent.click(selectJobButton); + }); + + // Clear mock calls + mockGetJob.mockClear(); + + // Advance time - should poll once, get "completed", then stop + await act(async () => { + vi.advanceTimersByTime(1000); + }); + + // First poll happens + expect(mockGetJob).toHaveBeenCalledTimes(1); + + // Advance more time - should NOT poll again because status is now "completed" + await act(async () => { + vi.advanceTimersByTime(5000); + }); + + // Should still be 1 (no additional polls) + expect(mockGetJob).toHaveBeenCalledTimes(1); + }); +}); diff --git a/web-ui/src/App.tsx b/web-ui/src/App.tsx index ad962616..625febac 100644 --- a/web-ui/src/App.tsx +++ b/web-ui/src/App.tsx @@ -216,48 +216,60 @@ function App() { }, [selectedPlugin]); // ------------------------------------------------------------------------- - // v0.10.1: Job Polling - Poll selectedJob for progress updates + // v0.10.1: Job Polling - Single authoritative poller for job state // Discussion #234: Stop polling when job reaches completed/failed status + // FIX: Only ONE poller - serialized async polling prevents race conditions // ------------------------------------------------------------------------- useEffect(() => { - if (!selectedJob?.job_id) return; - + const jobId = selectedJob?.job_id; + if (!jobId) return; // Stop polling if job already reached terminal state if (selectedJob?.status === "completed" || selectedJob?.status === "failed") return; - const interval = setInterval(async () => { + let cancelled = false; + let inFlight = false; + let timeoutId: ReturnType | undefined; + + const poll = async () => { + if (cancelled || inFlight) return; + inFlight = true; try { - const job = await apiClient.getJob(selectedJob.job_id); - setSelectedJob(job); + const job = await apiClient.getJob(jobId); + const isTerminal = job.status === "completed" || job.status === "failed"; + if (!cancelled) { + setSelectedJob((current) => + current?.job_id === jobId ? job : current + ); + } + if (!cancelled && !isTerminal) { + timeoutId = window.setTimeout(poll, 1000); + } } catch (err) { - console.error("Job polling failed:", err); + if (!cancelled) { + console.error("Job polling failed:", err); + timeoutId = window.setTimeout(poll, 1000); + } + } finally { + inFlight = false; } - }, 1000); + }; - return () => clearInterval(interval); + timeoutId = window.setTimeout(poll, 1000); + return () => { + cancelled = true; + if (timeoutId !== undefined) window.clearTimeout(timeoutId); + }; }, [selectedJob?.job_id, selectedJob?.status]); // ------------------------------------------------------------------------- - // v0.10.2: Poll uploadResult for progress updates (Upload / Video Upload) - // Discussion #234: Stop polling when job reaches completed/failed status + // v0.10.2: UploadResult Polling (DISABLED) + // Upload jobs are now handled by the main selectedJob poller above + // This eliminates duplicate polling and race conditions // ------------------------------------------------------------------------- useEffect(() => { - if (!uploadResult?.job_id) return; - - // Stop polling if job already reached terminal state - if (uploadResult?.status === "completed" || uploadResult?.status === "failed") return; - - const interval = setInterval(async () => { - try { - const job = await apiClient.getJob(uploadResult.job_id); - setUploadResult(job); - } catch (err) { - console.error("Upload job polling failed:", err); - } - }, 1000); - - return () => clearInterval(interval); - }, [uploadResult?.job_id, uploadResult?.status]); + // No-op: uploadResult no longer has its own polling loop + return; + }, [uploadResult]); // ------------------------------------------------------------------------- // v0.10.1: Unlock tools when job completes or fails