feat(asr): add SSE streaming endpoint with lead-budget backpressure #78

Draft
jedzill4 wants to merge 17 commits into release/v2.0.0 from feat/asr-sse-impl

jedzill4 commented Apr 21, 2026

Summary

  • Add POST /asr/transcribe/stream — an SSE endpoint that streams intermediate transcription snapshots as event: transcription frames, followed by a final event: done frame with the persisted result
  • Lead-budget backpressure pacer (_Backpressure) prevents 1011 "keepalive ping timeout" closes by capping how far ahead of the server's progress the client sends
  • Unified audio decode — single librosa.load call produces both the audio array and duration (_DecodedAudio), eliminating redundant decode in the streaming path
  • ASRStreamChunk (Pydantic BaseModel) carries paragraphs, current_time, and total_time per update
  • Offload librosa.load to worker thread (asyncio.to_thread) to avoid blocking the event loop
  • Backpressure applied consistently to both streaming and non-streaming paths
  • Mark existing POST /asr/transcribe as deprecated=True in OpenAPI
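
The lead-budget idea from the second bullet can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's `_Backpressure` class: the name `LeadBudgetPacer`, its methods, and the polling interval are hypothetical, and only the two default values mirror the settings in this PR.

```python
import asyncio
import time


class LeadBudgetPacer:
    """Hypothetical pacer: delays sends while the client is too far ahead."""

    def __init__(
        self,
        lead_budget_s: float = 20.0,
        initial_grace_s: float = 20.0,
        poll_s: float = 0.05,  # assumed polling interval, not from the PR
    ) -> None:
        self.lead_budget_s = lead_budget_s
        self.initial_grace_s = initial_grace_s
        self.poll_s = poll_s
        self.sent_s = 0.0    # seconds of audio sent so far
        self.server_s = 0.0  # latest progress reported by the server
        self._started = time.monotonic()

    def lead(self) -> float:
        """Seconds of audio sent but not yet processed by the server."""
        return self.sent_s - self.server_s

    def should_wait(self) -> bool:
        """True once the grace period is over and the lead exceeds the budget."""
        in_grace = time.monotonic() - self._started < self.initial_grace_s
        return not in_grace and self.lead() > self.lead_budget_s

    def on_server_progress(self, current_time_s: float) -> None:
        """Record server progress; it never moves backwards."""
        self.server_s = max(self.server_s, current_time_s)

    async def before_send(self, chunk_s: float) -> None:
        """Wait until there is budget, then account for the chunk being sent."""
        while self.should_wait():
            await asyncio.sleep(self.poll_s)
        self.sent_s += chunk_s
```

A sender would call `await pacer.before_send(chunk_seconds)` before each WebSocket send, and the receive loop would call `pacer.on_server_progress(...)` for every status message, so the sender resumes as soon as the server catches up.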

Stress Test Results (15:37 Spanish audio, model=tiny, GPU)

| N concurrent | Clients completed | 1011 failures | Wall-clock | Peak server lag |
|---|---|---|---|---|
| 1 | 1/1 | 0 | 938s (15:38) | |
| 2 | 2/2 | 0 | 971s (16:11) | ~23s |
| 3 | 2/3 | 1 (at 91%) | 977s (16:17) | 68s |
| 4 | 0/4 | 4 (at ~45%) | 525s (early fail) | 99s |

Pacer validated for N<=2. N=3+ failures are server-side (WhisperLiveKit's internal ASR_WS_PING_TIMEOUT=10s fires when GPU compute stalls its event loop), not a client-side pacing issue. Recommended: add a concurrency semaphore (limit 2) and/or increase WhisperLiveKit's ping timeout to 60-120s.
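
The recommended concurrency limit could look roughly like this. It is a sketch only: `run_limited`, `demo`, and `fake_transcription` are hypothetical names, and the sleep stands in for the real streaming call.

```python
import asyncio

MAX_CONCURRENT_TRANSCRIPTIONS = 2  # assumed limit, taken from the recommendation above


async def run_limited(semaphore: asyncio.Semaphore, job):
    """Run the coroutine function `job` once a semaphore slot is free."""
    async with semaphore:
        return await job()


async def demo(n_jobs: int = 4) -> int:
    """Start n_jobs fake transcriptions; return the peak number running at once."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_TRANSCRIPTIONS)
    running = 0
    peak = 0

    async def fake_transcription() -> None:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for the real streaming call
        running -= 1

    await asyncio.gather(
        *(run_limited(semaphore, fake_transcription) for _ in range(n_jobs))
    )
    return peak
```

With a limit of 2, extra requests queue behind the semaphore instead of competing for the GPU, which matches the N<=2 range the stress test validated.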

Settings

| Setting | Default | Purpose |
|---|---|---|
| TRANSCRIBE_WS_LEAD_BUDGET_SECONDS | 20.0 | Max seconds of audio ahead of server progress |
| TRANSCRIBE_WS_LEAD_INITIAL_GRACE_SECONDS | 20.0 | Wall-clock headroom before pacing kicks in |
| TRANSCRIBE_WS_PING_INTERVAL_SECONDS | 20 | Client-side WS ping interval |
| TRANSCRIBE_WS_PING_TIMEOUT_SECONDS | 60 | Client-side WS ping timeout |
| TRANSCRIBE_SSE_KEEPALIVE_SECONDS | 15 | SSE keepalive comment interval |
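
For illustration, the settings listed above could be read from environment variables as sketched below. `TranscribeSettings` and `load_settings` are hypothetical names; the project's actual `aymurai/settings.py` may load them differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class TranscribeSettings:
    """Hypothetical container; defaults copied from the settings above."""

    lead_budget_seconds: float = 20.0
    lead_initial_grace_seconds: float = 20.0
    ws_ping_interval_seconds: int = 20
    ws_ping_timeout_seconds: int = 60
    sse_keepalive_seconds: int = 15


def load_settings(env: Optional[Mapping[str, str]] = None) -> TranscribeSettings:
    """Build settings from environment variables, falling back to the defaults."""
    env = os.environ if env is None else env
    defaults = TranscribeSettings()

    def read(name, cast, default):
        raw = env.get(name)
        return default if raw is None else cast(raw)

    return TranscribeSettings(
        lead_budget_seconds=read(
            "TRANSCRIBE_WS_LEAD_BUDGET_SECONDS", float, defaults.lead_budget_seconds
        ),
        lead_initial_grace_seconds=read(
            "TRANSCRIBE_WS_LEAD_INITIAL_GRACE_SECONDS",
            float,
            defaults.lead_initial_grace_seconds,
        ),
        ws_ping_interval_seconds=read(
            "TRANSCRIBE_WS_PING_INTERVAL_SECONDS", int, defaults.ws_ping_interval_seconds
        ),
        ws_ping_timeout_seconds=read(
            "TRANSCRIBE_WS_PING_TIMEOUT_SECONDS", int, defaults.ws_ping_timeout_seconds
        ),
        sse_keepalive_seconds=read(
            "TRANSCRIBE_SSE_KEEPALIVE_SECONDS", int, defaults.sse_keepalive_seconds
        ),
    )
```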

Behaviour

  • Cache hit: emits a single event: done immediately
  • Cache miss: streams event: transcription per upstream snapshot → persists final result → emits event: done
  • Upstream failure (RuntimeError): emits event: error with code: UPSTREAM_SERVICE_ERROR; no DB write
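
The three frame types above share one wire format. The sketch below shows how such frames might be rendered and parsed; `format_sse` and `parse_sse` are hypothetical helpers (the PR's own formatters are `_format_transcription_event`, `_format_done_event`, and `_format_error_event`), and the payload keys are assumptions.

```python
from __future__ import annotations

import json


def format_sse(event: str, payload: dict) -> str:
    """Render one SSE frame: event name, one data line, blank-line terminator."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


def parse_sse(body: str) -> list[tuple[str, dict]]:
    """Parse frames produced by format_sse, skipping ':' comment frames."""
    events = []
    for frame in body.split("\n\n"):
        frame = frame.strip()
        if not frame or frame.startswith(":"):
            continue  # empty tail or keepalive comment
        name, data = "message", ""
        for line in frame.split("\n"):
            if line.startswith("event:"):
                name = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data += line[len("data:"):].strip()
        events.append((name, json.loads(data)))
    return events
```

A client that sees `event: error` can stop reading immediately, since per the behaviour above no `done` frame follows and nothing was persisted.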

Code Quality

  • All docstrings use Google-style format (consistent with codebase)
  • from __future__ import annotations in asr_client.py
  • %-format logging throughout (no f-string in logger calls)
  • No dead code (transcribe_audio_path removed)
  • No RST cross-references in docstrings
  • Error propagation consistent across streaming and non-streaming paths

Test Plan

  • 15 ASR-specific tests passing (pytest test/audio/test_asr_client_stream.py test/api/endpoints/routers/asr/test_transcribe.py)
  • Tests cover: streaming happy path with DB persistence, cache-hit path, upstream error path, ASRStreamChunk API, generator unit tests, OpenAPI deprecation flag, SSE formatters
  • Concurrent stress test validated N=1 and N=2 clean completion with real GPU inference

Commits on this branch (ASR-specific)

  1. bc57c25 feat(asr): add TRANSCRIBE_SSE_KEEPALIVE_SECONDS setting
  2. 7e556d0 refactor(asr): extract lines_to_paragraphs helper
  3. fcdfbc6 feat(asr): add transcribe_audio_bytes_stream async generator
  4. 0f1b1fa feat(asr): add SSE event formatting helpers
  5. f0c6f32 feat(asr): add POST /asr/transcribe/stream SSE endpoint
  6. 235f3c8 test(asr): verify SSE stream emits only done event on cache hit
  7. 04f1e01 test(asr): verify deprecated flag on POST /asr/transcribe
  8. 4aec6af test(asr): verify SSE stream emits error event on upstream failure
  9. dd78a17 feat(asr): add lead-budget backpressure pacer to prevent 1011 keepalive timeouts
  10. 78e6174 refactor(asr): clean up dead code, unify decode, improve consistency
  11. a2a342b fix(asr): preserve RuntimeError messages and normalize docstrings

sourcery-ai Bot commented Apr 21, 2026

Reviewer's Guide

Implements a new SSE-based streaming transcription endpoint backed by a WebSocket-driven async generator, refactors shared ASR paragraph mapping logic, wires in keepalive and error handling, and adds tests plus configuration to support async streaming behaviour while marking the legacy endpoint as deprecated.

Class diagram for updated ASR client and transcribe router helpers

classDiagram
    class ASRClientModule {
        <<module>>
        +lines_to_paragraphs(lines list_WLKMessageTranscriptionLine) list_ASRParagraph
        +transcribe_audio_bytes_stream(payload bytes) AsyncGenerator_list_ASRParagraph_None
        +_stream_and_signal_end(payload bytes, websocket websockets_ClientConnection) None
        +_stream_audio_bytes(payload bytes, websocket websockets_ClientConnection) int
        +transcribe_audio_bytes(payload bytes) WLKMessageStatus_or_None
    }

    class TranscribeRouterModule {
        <<module>>
        +_format_transcription_event(document_id UUID, paragraphs list_ASRParagraph) str
        +_format_done_event(document_id UUID, paragraphs list_ASRParagraph) str
        +_format_error_event(detail str, code str) str
        +transcribe(file UploadFile, use_cache bool, ws_uri str, session Session) ASRDocument
        +transcribe_stream(file UploadFile, use_cache bool, ws_uri str, session Session) StreamingResponse
    }

    class ASRParagraph {
        +speaker_no int
        +start float
        +end float
        +text str
    }

    class WLKMessageTranscriptionLine {
        +speaker int
        +start float
        +end float
        +text str
    }

    class DatabaseAPI {
        +audio_transcription_get(transcription_id UUID, session Session) AudioTranscription_or_None
        +audio_transcription_create_or_update(transcription_id UUID, name str, transcription list_ASRParagraph, session Session) None
    }

    class Settings {
        +TRANSCRIBE_WS_URI str_or_None
        +TRANSCRIBE_SSE_KEEPALIVE_SECONDS int
    }

    ASRClientModule ..> WLKMessageTranscriptionLine : uses
    ASRClientModule ..> ASRParagraph : maps_to
    ASRClientModule ..> Settings : reads

    TranscribeRouterModule ..> ASRClientModule : calls_transcribe_audio_bytes_stream
    TranscribeRouterModule ..> DatabaseAPI : uses
    TranscribeRouterModule ..> ASRParagraph : serializes
    TranscribeRouterModule ..> Settings : reads_keepalive

Flow diagram for cache_hit_vs_streaming_and_error_paths

flowchart TD
    A["Start POST /asr/transcribe/stream"] --> B["Read file bytes and compute document_id"]
    B --> C{"use_cache is True?"}

    C -- No --> F["Initialize keepalive (if interval > 0)"]
    C -- Yes --> D["audio_transcription_get(document_id)"]
    D --> E{"Cached transcription found?"}
    E -- Yes --> E1["Yield SSE done event with cached ASRDocument"]
    E1 --> Z["End"]
    E -- No --> F

    F --> G["Call transcribe_audio_bytes_stream(payload)"]
    G --> H["Loop: wait for next snapshot or keepalive"]

    H --> I{"Next snapshot received?"}
    I -- Yes --> J["Update last_snapshot and yield SSE transcription event"]
    J --> H

    I -- No (keepalive) --> K["Yield keepalive comment frame"]
    K --> H

    H --> L{"Stream finished?"}
    L -- No --> H
    L -- Yes --> M["Persist last_snapshot via audio_transcription_create_or_update"]
    M --> N["Yield SSE done event with final ASRDocument"]
    N --> Z

    H --> X{"RuntimeError from upstream?"}
    X -- Yes --> Y["Yield SSE error event (code UPSTREAM_SERVICE_ERROR)\nSkip DB write"]
    Y --> Z

File-Level Changes

Change: Introduce async WebSocket-backed streaming transcription generator and helper for mapping WS lines to ASRParagraphs.
Details:
  • Add lines_to_paragraphs helper to convert WLKMessageTranscriptionLine objects into ASRParagraph instances.
  • Implement transcribe_audio_bytes_stream async generator that connects to the upstream WebSocket, yields paragraph snapshots per active_transcription status, handles ready_to_stop and connection closure, and cleans up a background streaming task on cancellation or errors.
  • Add _stream_and_signal_end helper to stream audio bytes then send an empty end-of-stream marker over the WebSocket, with logging and error handling improvements.
Files:
  • aymurai/audio/asr_client.py
  • test/audio/test_asr_client_stream.py

Change: Add SSE streaming endpoint for ASR transcription with cache integration, keepalive, and structured event formatting.
Details:
  • Introduce _format_transcription_event, _format_done_event, and _format_error_event helpers that render ASRDocument payloads and error payloads into SSE wire format frames.
  • Add POST /asr/transcribe/stream endpoint that reads uploaded audio, performs cache lookup and early-return on hits, otherwise streams snapshots from transcribe_audio_bytes_stream as transcription events, persists the final snapshot, and emits a done event.
  • Implement SSE keepalive mechanism using an asyncio task and queue to periodically emit comment frames, coordinated with upstream snapshots via asyncio.wait, including robust cleanup of keepalive tasks and the upstream async iterator.
  • Ensure DB write only occurs after successful streaming, and skip persistence when an upstream RuntimeError or unexpected exception occurs, instead emitting an error SSE event with a structured payload.
Files:
  • aymurai/api/endpoints/routers/asr/transcribe.py
  • test/api/endpoints/routers/asr/test_transcribe.py

Change: Deprecate the existing non-streaming ASR endpoint and wire project configuration for async tests and SSE keepalive interval.
Details:
  • Mark the existing POST /asr/transcribe endpoint as deprecated=True in the OpenAPI schema while retaining its implementation.
  • Add TRANSCRIBE_SSE_KEEPALIVE_SECONDS setting with a default of 15 seconds to control SSE keepalive interval.
  • Update test dependencies to include pytest-asyncio and configure pytest asyncio_mode=auto to support async generator tests.
Files:
  • aymurai/api/endpoints/routers/asr/transcribe.py
  • aymurai/settings.py
  • pyproject.toml
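
The keepalive behaviour described above can be approximated with a simpler variant. The guide describes a task plus queue coordinated through `asyncio.wait`; the sketch below swaps that for a timeout on `asyncio.wait` over the pending upstream item. `with_keepalive` and `KEEPALIVE_FRAME` are hypothetical names, not the PR's code.

```python
import asyncio
from typing import AsyncIterator

KEEPALIVE_FRAME = ": keepalive\n\n"  # SSE comment frame, ignored by clients


async def with_keepalive(
    source: AsyncIterator[str], interval_s: float
) -> AsyncIterator[str]:
    """Yield items from `source`, inserting a keepalive when it stays silent."""
    iterator = source.__aiter__()
    pending = asyncio.ensure_future(iterator.__anext__())
    try:
        while True:
            done, _ = await asyncio.wait({pending}, timeout=interval_s)
            if not done:
                # Upstream produced nothing within the interval.
                yield KEEPALIVE_FRAME
                continue
            try:
                item = pending.result()
            except StopAsyncIteration:
                return  # upstream finished
            yield item
            pending = asyncio.ensure_future(iterator.__anext__())
    finally:
        # No-op if already finished; otherwise stops the outstanding read.
        pending.cancel()
```

Because the outstanding `__anext__` call is kept alive across timeouts, a slow snapshot is never dropped; it is simply preceded by one or more comment frames.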

sourcery-ai Bot left a comment

Hey - I've found 6 issues, and left some high level feedback:

  • In transcribe_stream, the ws_uri: str = Depends(get_transcribe_ws_uri) dependency is never used because transcribe_audio_bytes_stream reads from settings.TRANSCRIBE_WS_URI directly; either pass ws_uri into the streaming client or remove the dependency to avoid confusion and dead code.
  • The new lines_to_paragraphs helper omits the speaker_id field that was previously set in _transcribe_audio_bytes_with_error_handling; if consumers rely on this field, consider preserving or explicitly deprecating it to avoid subtle behavioral changes.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `transcribe_stream`, the `ws_uri: str = Depends(get_transcribe_ws_uri)` dependency is never used because `transcribe_audio_bytes_stream` reads from `settings.TRANSCRIBE_WS_URI` directly; either pass `ws_uri` into the streaming client or remove the dependency to avoid confusion and dead code.
- The new `lines_to_paragraphs` helper omits the `speaker_id` field that was previously set in `_transcribe_audio_bytes_with_error_handling`; if consumers rely on this field, consider preserving or explicitly deprecating it to avoid subtle behavioral changes.

## Individual Comments

### Comment 1
<location path="aymurai/audio/asr_client.py" line_range="44-51" />
<code_context>
     if not status:
         raise AymuraiAPIException(detail="No transcription result received")

-    return [
-        ASRParagraph(
-            speaker_no=line.speaker,
</code_context>
<issue_to_address>
**issue (bug_risk):** lines_to_paragraphs drops the speaker_id field that the previous mapping included

The old implementation populated both speaker_no and a derived speaker_id (e.g., f"speaker-{line.speaker}"). The new helper only sets speaker_no, start, end, and text, which changes the response shape. If any callers rely on speaker_id (e.g., for labeling or grouping), this could break them. If that field is still required, please add it back in lines_to_paragraphs or generate it consistently elsewhere.
</issue_to_address>

### Comment 2
<location path="aymurai/api/endpoints/routers/asr/transcribe.py" line_range="195-197" />
<code_context>
+            )
+            if cached is not None:
+                logger.debug(f"Audio transcription DB hit for {filename}")
+                cached_paragraphs = [
+                    ASRParagraph.model_validate(p)
+                    for p in (cached.validation or cached.transcription)
+                ]
+                yield _format_done_event(document_id, cached_paragraphs)
</code_context>
<issue_to_address>
**issue (bug_risk):** Potential crash if both cached.validation and cached.transcription are None

In the cache-hit branch, `(cached.validation or cached.transcription)` can evaluate to `None` if both are unset/false-y, causing a `TypeError` when iterated. If this state is possible (e.g., partially written rows), add a guard (e.g., default to `[]` or validate that at least one is populated) and handle the invalid cache state (skip cache or emit an error SSE).
</issue_to_address>

### Comment 3
<location path="aymurai/api/endpoints/routers/asr/transcribe.py" line_range="215-218" />
<code_context>
+        if interval > 0:
+            keepalive_task = asyncio.create_task(_keepalive_pump())
+
+        last_snapshot: list[ASRParagraph] = []
+        stream_iter = transcribe_audio_bytes_stream(data).__aiter__()
+
+        try:
</code_context>
<issue_to_address>
**suggestion:** Iterator creation errors from transcribe_audio_bytes_stream bypass SSE-style error reporting

If `transcribe_audio_bytes_stream` fails during iterator creation (e.g., bad `TRANSCRIBE_WS_URI`), the exception escapes before the `try/finally`, so the client gets a plain 500 instead of an SSE error event. To keep SSE behavior consistent, wrap `stream_iter` construction in a `try/except` that yields an error via `_format_error_event` and then returns, mirroring the error handling used inside the loop.

```suggestion
        last_snapshot: list[ASRParagraph] = []
        try:
            stream_iter = transcribe_audio_bytes_stream(data).__aiter__()
        except Exception as exc:
            # Report setup failures as an SSE error frame instead of a plain 500.
            yield _format_error_event(detail=str(exc), code="INTERNAL_ERROR")
            return

        try:
```
</issue_to_address>

### Comment 4
<location path="test/api/endpoints/routers/asr/test_transcribe.py" line_range="344-272" />
<code_context>
+def test_should_emit_error_event_when_upstream_fails_mid_stream(
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test case for the generic INTERNAL_ERROR SSE branch when an unexpected exception is raised during streaming.

This test only covers the `RuntimeError` → `UPSTREAM_SERVICE_ERROR` path. The generic `except Exception: ... code="INTERNAL_ERROR"` branch remains untested. Please add a similar test where the patched `transcribe_audio_bytes_stream` raises a non-`RuntimeError` (e.g. `ValueError("boom")`), and assert that the final SSE event is `event: error` with `code: "INTERNAL_ERROR"`, and that no `AudioTranscription` record is written to the DB.

Suggested implementation:

```python
def test_should_emit_error_event_when_upstream_fails_mid_stream(
    asr_test_client,
    make_wav_bytes,
):
    client, engine = asr_test_client
    audio_bytes = make_wav_bytes(freq_hz=550)
    document_id = data_to_uuid(audio_bytes)

    async def failing_generator(_payload):
        yield [
            ASRParagraph(
    # existing test body continues here ...


def test_should_emit_internal_error_event_when_unexpected_exception_mid_stream(
    asr_test_client,
    make_wav_bytes,
    session,
):
    """
    Ensure that a non-RuntimeError raised during streaming results in an
    INTERNAL_ERROR SSE and that no AudioTranscription row is persisted.
    """
    from app.models import AudioTranscription  # adjust import if already present elsewhere

    client, engine = asr_test_client
    audio_bytes = make_wav_bytes(freq_hz=550)
    document_id = data_to_uuid(audio_bytes)

    async def failing_generator(_payload):
        # Simulate an unexpected exception in the upstream stream
        yield [
            ASRParagraph(
                text="partial text before failure",
                start=0.0,
                end=1.0,
                speaker_no=0,  # ASRParagraph exposes speaker_no, not speaker
            )
        ]
        raise ValueError("boom")

    # Patch the engine to use the failing generator
    engine.transcribe_audio_bytes_stream = failing_generator

    headers = {
        "content-type": "audio/wav",
        "x-document-id": str(document_id),
    }

    # Consume the SSE stream
    with client.stream(
        "POST",
        "/asr/transcribe/stream",
        headers=headers,
        content=audio_bytes,
    ) as response:
        assert response.status_code == 200

        # Collect raw SSE events
        raw_events = []
        current_event_lines = []

        for line in response.iter_lines():
            if not line:
                # End of one SSE event
                if current_event_lines:
                    raw_events.append("\n".join(current_event_lines))
                    current_event_lines = []
                continue
            # Decode bytes -> str if necessary
            if isinstance(line, bytes):
                line = line.decode("utf-8")
            current_event_lines.append(line)

        if current_event_lines:
            raw_events.append("\n".join(current_event_lines))

    # The last event should be an error with INTERNAL_ERROR code
    assert raw_events, "Expected at least one SSE event"
    last_event = raw_events[-1]
    assert last_event.startswith("event: error")

    # Find the data line and parse as JSON
    data_line = next(
        (l for l in last_event.splitlines() if l.startswith("data: ")),
        None,
    )
    assert data_line is not None, f"No data line in SSE event: {last_event}"
    import json

    payload = json.loads(data_line.removeprefix("data: ").strip())
    assert payload.get("code") == "INTERNAL_ERROR"

    # Ensure no AudioTranscription was persisted for this document
    transcription = (
        session.query(AudioTranscription)
        .filter(AudioTranscription.document_id == document_id)
        .one_or_none()
    )
    assert transcription is None

```

1. Ensure that a `session` (SQLAlchemy session) fixture is available in this test module; if it has a different name (e.g. `db_session`), update the test signature and usage accordingly.
2. If `AudioTranscription` is already imported at the top of the file, remove the local import inside the test to avoid duplication.
3. If there is already a helper to parse SSE events in this test module (e.g. `parse_sse_events(response)`), replace the manual SSE parsing block with that helper to match existing conventions.
4. Verify that the SSE error payload shape matches your implementation (key names like `code`, `message`, etc.). Adjust the `payload.get("code")` assertion or additional assertions to align with the actual error payload structure.
5. If the endpoint path or headers differ in this project (e.g. using `"/v1/asr/transcribe/stream"` or different document-id header), update the URL and header keys to match the existing tests in this file.
</issue_to_address>

### Comment 5
<location path="test/api/endpoints/routers/asr/test_transcribe.py" line_range="191-199" />
<code_context>
+
+
+# MARK: POST Transcribe Stream
+def _parse_sse_events(body: str) -> list[tuple[str, str]]:
+    """Parse SSE wire format into (event_name, data) tuples. Ignores comments."""
+    events: list[tuple[str, str]] = []
+    for chunk in body.split("\n\n"):
+        chunk = chunk.strip()
+        if not chunk or chunk.startswith(":"):
+            continue
+        event_name = "message"
+        data_lines: list[str] = []
+        for line in chunk.split("\n"):
+            if line.startswith("event:"):
+                event_name = line[len("event:") :].strip()
+            elif line.startswith("data:"):
+                data_lines.append(line[len("data:") :].strip())
+        events.append((event_name, "\n".join(data_lines)))
+    return events
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding an explicit test to verify that keepalive comments are actually sent and are ignored by the SSE parser.

The parser correctly ignores comment lines (e.g. `: keepalive`), but there’s no test confirming that keepalives are actually emitted when `TRANSCRIBE_SSE_KEEPALIVE_SECONDS > 0`. To cover this, you could add a test that sets `TRANSCRIBE_SSE_KEEPALIVE_SECONDS` to a very small value, patches `transcribe_audio_bytes_stream` to yield one snapshot and then block long enough for a keepalive to be sent, and asserts that `": keepalive"` appears in `response.text` while `_parse_sse_events` still only returns the expected transcription/done events.

```suggestion
def test_should_format_error_event_with_code_and_detail():
    frame = _format_error_event(detail="boom", code="UPSTREAM_SERVICE_ERROR")

    assert frame.startswith("event: error\n")
    assert '"code": "UPSTREAM_SERVICE_ERROR"' in frame
    assert '"detail": "boom"' in frame


def test_parse_sse_events_ignores_keepalive_comments():
    """Ensure keepalive comment frames are present in the raw stream but ignored by the parser."""
    # Simulate a stream with interleaved keepalive comments and 2 real events
    raw_sse = (
        ": keepalive\n\n"
        "event: transcription\n"
        'data: {"paragraph_index": 0, "text": "hola"}\n\n'
        ": keepalive\n\n"
        "event: done\n"
        "data: {}\n\n"
        ": keepalive\n\n"
    )

    # Sanity check: the raw payload must contain keepalive comments
    assert ": keepalive" in raw_sse

    events = _parse_sse_events(raw_sse)

    # Only the non-comment events should be returned by the parser
    assert events == [
        ("transcription", '{"paragraph_index": 0, "text": "hola"}'),
        ("done", "{}"),
    ]


# MARK: POST Transcribe Stream
```
</issue_to_address>

### Comment 6
<location path="test/audio/test_asr_client_stream.py" line_range="100-109" />
<code_context>
+@pytest.mark.asyncio
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test to cover websocket receive failures and ensure they surface as RuntimeError from transcribe_audio_bytes_stream.

Right now we only exercise the happy path (snapshots + `ready_to_stop`), but the generator’s contract around `websocket.recv()` failures isn’t tested. Please add a test that uses a FakeWS whose `recv()` raises `websockets.exceptions.WebSocketException`, and, with the existing patches to `websockets.connect`, `_stream_audio_bytes`, and `settings.TRANSCRIBE_WS_URI`, assert that iterating `transcribe_audio_bytes_stream` raises `RuntimeError("Transcription service websocket error")`. This will verify the behaviour that the SSE endpoint tests depend on.

Please add the following test function to `test/audio/test_asr_client_stream.py` (for example, immediately after `test_should_yield_paragraphs_per_status_when_streaming`), and adjust any module paths to match your project:

```python
@pytest.mark.asyncio
async def test_transcribe_audio_bytes_stream_raises_runtime_error_on_ws_recv_failure(
    mocker, settings
):
    # Fake websocket whose recv() always fails
    failing_ws = FakeWS(incoming_messages=[])
    async def _failing_recv():
        raise websockets.exceptions.WebSocketException("boom")
    failing_ws.recv = _failing_recv

    # Patch websockets.connect to return our failing websocket
    mocker.patch("audio.asr_client_stream.websockets.connect", return_value=failing_ws)

    # Patch the websocket URI to avoid real network usage
    settings.TRANSCRIBE_WS_URI = "wss://example.test/transcribe"

    # Patch _stream_audio_bytes to yield some dummy audio so the client
    # attempts to call websocket.recv()
    async def _dummy_stream_audio_bytes(*args, **kwargs):
        yield b"\x00\x01"

    mocker.patch("audio.asr_client_stream._stream_audio_bytes", side_effect=_dummy_stream_audio_bytes)

    # Call the generator and assert that websocket recv failures surface
    # as RuntimeError("Transcription service websocket error")
    from audio.asr_client_stream import transcribe_audio_bytes_stream

    async def _consume():
        async for _chunk in transcribe_audio_bytes_stream(b"fake-audio"):
            pass

    with pytest.raises(RuntimeError) as excinfo:
        await _consume()

    assert str(excinfo.value) == "Transcription service websocket error"
```

You may need to adapt:

1. The import path `"audio.asr_client_stream"` in the `mocker.patch` calls and the `from audio.asr_client_stream import transcribe_audio_bytes_stream` line to match the real module where `transcribe_audio_bytes_stream`, `_stream_audio_bytes`, and the `websockets.connect` call are defined.
2. The fixture arguments (`mocker`, `settings`) to match your existing test fixtures (e.g., if you use a different name or source for settings).
3. The `TRANSCRIBE_WS_URI` attribute name or location if your settings module exposes it differently.

The key behaviour to preserve is:
- `FakeWS.recv()` raising `websockets.exceptions.WebSocketException`
- The generator `transcribe_audio_bytes_stream` being consumed in a loop
- The test asserting that this results in `RuntimeError("Transcription service websocket error")`.
</issue_to_address>


Comment on lines +44 to +51
    return [
        ASRParagraph(
            speaker_no=line.speaker,
            start=line.start,
            end=line.end,
            text=line.text,
        )
        for line in lines

issue (bug_risk): lines_to_paragraphs drops the speaker_id field that the previous mapping included

The old implementation populated both speaker_no and a derived speaker_id (e.g., f"speaker-{line.speaker}"). The new helper only sets speaker_no, start, end, and text, which changes the response shape. If any callers rely on speaker_id (e.g., for labeling or grouping), this could break them. If that field is still required, please add it back in lines_to_paragraphs or generate it consistently elsewhere.
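
One way to restore the field is sketched below. The dataclasses are stand-ins for `WLKMessageTranscriptionLine` and `ASRParagraph`, and the `speaker-{n}` format is taken from the example in this comment rather than confirmed against the old code.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Line:
    """Stand-in for WLKMessageTranscriptionLine."""

    speaker: int
    start: float
    end: float
    text: str


@dataclass
class Paragraph:
    """Stand-in for ASRParagraph, with the speaker_id field restored."""

    speaker_no: int
    speaker_id: str
    start: float
    end: float
    text: str


def lines_to_paragraphs(lines: list[Line]) -> list[Paragraph]:
    """Map upstream lines to paragraphs, deriving speaker_id from speaker."""
    return [
        Paragraph(
            speaker_no=line.speaker,
            speaker_id=f"speaker-{line.speaker}",  # assumed format
            start=line.start,
            end=line.end,
            text=line.text,
        )
        for line in lines
    ]
```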

Comment on lines +195 to +197
    cached_paragraphs = [
        ASRParagraph.model_validate(p)
        for p in (cached.validation or cached.transcription)

issue (bug_risk): Potential crash if both cached.validation and cached.transcription are None

In the cache-hit branch, (cached.validation or cached.transcription) can evaluate to None if both are unset/false-y, causing a TypeError when iterated. If this state is possible (e.g., partially written rows), add a guard (e.g., default to [] or validate that at least one is populated) and handle the invalid cache state (skip cache or emit an error SSE).
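
A guard along these lines would avoid the `TypeError`. It is a sketch: `usable_cached_rows` is a hypothetical helper, and treating an empty cache row as a miss (rather than emitting an error event) is one of the two options mentioned above.

```python
from typing import Any, Optional, Sequence


def usable_cached_rows(
    validation: Optional[Sequence[Any]],
    transcription: Optional[Sequence[Any]],
) -> Optional[Sequence[Any]]:
    """Return the rows to serve from cache, or None when the entry is unusable.

    Prefers validated rows over the raw transcription, as the original
    expression `cached.validation or cached.transcription` does.
    """
    if validation:
        return validation
    if transcription:
        return transcription
    return None  # caller should fall through to the cache-miss path
```

The endpoint would then only take the cache-hit branch when the helper returns a non-`None` value, and otherwise continue into the streaming path.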

Comment on lines +215 to +218
    last_snapshot: list[ASRParagraph] = []
    stream_iter = transcribe_audio_bytes_stream(data).__aiter__()

    try:

suggestion: Iterator creation errors from transcribe_audio_bytes_stream bypass SSE-style error reporting

If transcribe_audio_bytes_stream fails during iterator creation (e.g., bad TRANSCRIBE_WS_URI), the exception escapes before the try/finally, so the client gets a plain 500 instead of an SSE error event. To keep SSE behavior consistent, wrap stream_iter construction in a try/except that yields an error via _format_error_event and then returns, mirroring the error handling used inside the loop.

Suggested change
-        last_snapshot: list[ASRParagraph] = []
-        stream_iter = transcribe_audio_bytes_stream(data).__aiter__()
-        try:
+        last_snapshot: list[ASRParagraph] = []
+        try:
+            stream_iter = transcribe_audio_bytes_stream(data).__aiter__()
+        except Exception as exc:
+            # Report setup failures as an SSE error frame instead of a plain 500.
+            yield _format_error_event(detail=str(exc), code="INTERNAL_ERROR")
+            return
+        try:


    # Final result persisted
    with Session(engine) as session:
        record = session.get(AudioTranscription, document_id)

suggestion (testing): Add a test case for the generic INTERNAL_ERROR SSE branch when an unexpected exception is raised during streaming.

This test only covers the RuntimeError → UPSTREAM_SERVICE_ERROR path. The generic except Exception: ... code="INTERNAL_ERROR" branch remains untested. Please add a similar test where the patched transcribe_audio_bytes_stream raises a non-RuntimeError (e.g. ValueError("boom")), and assert that the final SSE event is event: error with code: "INTERNAL_ERROR", and that no AudioTranscription record is written to the DB.

Suggested implementation:

def test_should_emit_error_event_when_upstream_fails_mid_stream(
    asr_test_client,
    make_wav_bytes,
):
    client, engine = asr_test_client
    audio_bytes = make_wav_bytes(freq_hz=550)
    document_id = data_to_uuid(audio_bytes)

    async def failing_generator(_payload):
        yield [
            ASRParagraph(
    # existing test body continues here ...


def test_should_emit_internal_error_event_when_unexpected_exception_mid_stream(
    asr_test_client,
    make_wav_bytes,
    session,
):
    """
    Ensure that a non-RuntimeError raised during streaming results in an
    INTERNAL_ERROR SSE and that no AudioTranscription row is persisted.
    """
    from app.models import AudioTranscription  # adjust import if already present elsewhere

    client, engine = asr_test_client
    audio_bytes = make_wav_bytes(freq_hz=550)
    document_id = data_to_uuid(audio_bytes)

    async def failing_generator(_payload):
        # Simulate an unexpected exception in the upstream stream
        yield [
            ASRParagraph(
                text="partial text before failure",
                start=0.0,
                end=1.0,
                speaker=None,
            )
        ]
        raise ValueError("boom")

    # Patch the engine to use the failing generator
    engine.transcribe_audio_bytes_stream = failing_generator

    headers = {
        "content-type": "audio/wav",
        "x-document-id": str(document_id),
    }

    # Consume the SSE stream
    with client.stream(
        "POST",
        "/asr/transcribe/stream",
        headers=headers,
        content=audio_bytes,
    ) as response:
        assert response.status_code == 200

        # Collect raw SSE events
        raw_events = []
        current_event_lines = []

        for line in response.iter_lines():
            if not line:
                # End of one SSE event
                if current_event_lines:
                    raw_events.append("\n".join(current_event_lines))
                    current_event_lines = []
                continue
            # Decode bytes -> str if necessary
            if isinstance(line, bytes):
                line = line.decode("utf-8")
            current_event_lines.append(line)

        if current_event_lines:
            raw_events.append("\n".join(current_event_lines))

    # The last event should be an error with INTERNAL_ERROR code
    assert raw_events, "Expected at least one SSE event"
    last_event = raw_events[-1]
    assert last_event.startswith("event: error")

    # Find the data line and parse as JSON
    data_line = next(
        (ln for ln in last_event.splitlines() if ln.startswith("data: ")),
        None,
    )
    assert data_line is not None, f"No data line in SSE event: {last_event}"
    import json

    payload = json.loads(data_line.removeprefix("data: ").strip())
    assert payload.get("code") == "INTERNAL_ERROR"

    # Ensure no AudioTranscription was persisted for this document
    transcription = (
        session.query(AudioTranscription)
        .filter(AudioTranscription.document_id == document_id)
        .one_or_none()
    )
    assert transcription is None
  1. Ensure that a session (SQLAlchemy session) fixture is available in this test module; if it has a different name (e.g. db_session), update the test signature and usage accordingly.
  2. If AudioTranscription is already imported at the top of the file, remove the local import inside the test to avoid duplication.
  3. If there is already a helper to parse SSE events in this test module (e.g. parse_sse_events(response)), replace the manual SSE parsing block with that helper to match existing conventions.
  4. Verify that the SSE error payload shape matches your implementation (key names like code, message, etc.). Adjust the payload.get("code") assertion or additional assertions to align with the actual error payload structure.
  5. If the endpoint path or headers differ in this project (e.g. using "/v1/asr/transcribe/stream" or different document-id header), update the URL and header keys to match the existing tests in this file.
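
If the test module has no SSE parsing helper yet, one might look roughly like the sketch below. It is an assumption-laden illustration rather than the project's helper: the name `parse_sse_events` and the `(event, data)` tuple return shape are hypothetical, and only the `event`, `data`, and comment (`:`) line types are handled.

```python
def parse_sse_events(raw: str) -> list[tuple[str, str]]:
    """Parse raw SSE text into (event_name, data) tuples, skipping comments."""
    events: list[tuple[str, str]] = []
    # Frames are separated by a blank line.
    for block in raw.split("\n\n"):
        event_name = "message"  # default event type when no "event:" field is present
        data_lines: list[str] = []
        for line in block.splitlines():
            # Lines starting with ":" are comments (for example keepalives).
            if not line or line.startswith(":"):
                continue
            field, _, value = line.partition(":")
            value = value.removeprefix(" ")  # a single leading space is not part of the value
            if field == "event":
                event_name = value
            elif field == "data":
                data_lines.append(value)
        # Blocks without any data line (such as comment-only blocks) produce no event.
        if data_lines:
            events.append((event_name, "\n".join(data_lines)))
    return events
```

With a helper of this shape, the manual line-collecting loop in the test above could be reduced to a single call on `response.text`.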

Comment on lines +191 to +199
def test_should_format_error_event_with_code_and_detail():
    frame = _format_error_event(detail="boom", code="UPSTREAM_SERVICE_ERROR")

    assert frame.startswith("event: error\n")
    assert '"code": "UPSTREAM_SERVICE_ERROR"' in frame
    assert '"detail": "boom"' in frame


# MARK: POST Transcribe Stream

suggestion (testing): Consider adding an explicit test to verify that keepalive comments are actually sent and are ignored by the SSE parser.

The parser correctly ignores comment lines (e.g. : keepalive), but there’s no test confirming that keepalives are actually emitted when TRANSCRIBE_SSE_KEEPALIVE_SECONDS > 0. To cover this, you could add a test that sets TRANSCRIBE_SSE_KEEPALIVE_SECONDS to a very small value, patches transcribe_audio_bytes_stream to yield one snapshot and then block long enough for a keepalive to be sent, and asserts that ": keepalive" appears in response.text while _parse_sse_events still only returns the expected transcription/done events.

Suggested change
-def test_should_format_error_event_with_code_and_detail():
-    frame = _format_error_event(detail="boom", code="UPSTREAM_SERVICE_ERROR")
-    assert frame.startswith("event: error\n")
-    assert '"code": "UPSTREAM_SERVICE_ERROR"' in frame
-    assert '"detail": "boom"' in frame
-# MARK: POST Transcribe Stream
+def test_should_format_error_event_with_code_and_detail():
+    frame = _format_error_event(detail="boom", code="UPSTREAM_SERVICE_ERROR")
+    assert frame.startswith("event: error\n")
+    assert '"code": "UPSTREAM_SERVICE_ERROR"' in frame
+    assert '"detail": "boom"' in frame
+def test_parse_sse_events_ignores_keepalive_comments():
+    """Ensure keepalive comment frames are present in the raw stream but ignored by the parser."""
+    # Simulate a stream with interleaved keepalive comments and 2 real events
+    raw_sse = (
+        ": keepalive\n\n"
+        "event: transcription\n"
+        'data: {"paragraph_index": 0, "text": "hola"}\n\n'
+        ": keepalive\n\n"
+        "event: done\n"
+        "data: {}\n\n"
+        ": keepalive\n\n"
+    )
+    # Sanity check: the raw payload must contain keepalive comments
+    assert ": keepalive" in raw_sse
+    events = _parse_sse_events(raw_sse)
+    # Only the non-comment events should be returned by the parser
+    assert events == [
+        ("transcription", '{"paragraph_index": 0, "text": "hola"}'),
+        ("done", "{}"),
+    ]
+# MARK: POST Transcribe Stream
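
The emitting side of the keepalive behaviour discussed in this comment could be sketched as follows. This is a hypothetical illustration, not the endpoint's implementation: `with_keepalive` is an invented name, and `interval` stands in for TRANSCRIBE_SSE_KEEPALIVE_SECONDS. It wraps any async iterator of SSE frames and emits a `: keepalive` comment whenever the source stays silent for longer than the interval.

```python
import asyncio
from collections.abc import AsyncIterator


async def with_keepalive(
    frames: AsyncIterator[str], interval: float
) -> AsyncIterator[str]:
    """Yield frames from the source, inserting keepalive comments while it is idle."""
    iterator = frames.__aiter__()
    # Keep exactly one pending __anext__() call alive across timeouts so that
    # a timeout never cancels (and therefore never loses) an in-flight frame.
    pending = asyncio.ensure_future(iterator.__anext__())
    try:
        while True:
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                # The source produced nothing within the interval.
                yield ": keepalive\n\n"
                continue
            try:
                frame = pending.result()
            except StopAsyncIteration:
                return  # source exhausted
            yield frame
            pending = asyncio.ensure_future(iterator.__anext__())
    finally:
        # No-op if the future already finished; cancels it if the consumer left early.
        pending.cancel()
```

A test along the lines suggested above could feed this wrapper a source that yields one snapshot and then sleeps for several intervals, and assert that the raw output contains `: keepalive` while the parsed events do not.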

Comment on lines +100 to +109
@pytest.mark.asyncio
async def test_should_yield_paragraphs_per_status_when_streaming():
    fake_ws = FakeWS(
        incoming_messages=[
            _active_msg(0, "hola", 0.0, 1.0),
            _active_msg(0, "hola mundo", 0.0, 2.0),
            _ready_to_stop_msg(),
        ]
    )


suggestion (testing): Add a test to cover websocket receive failures and ensure they surface as RuntimeError from transcribe_audio_bytes_stream.

Right now we only exercise the happy path (snapshots + ready_to_stop), but the generator’s contract around websocket.recv() failures isn’t tested. Please add a test that uses a FakeWS whose recv() raises websockets.exceptions.WebSocketException, and, with the existing patches to websockets.connect, _stream_audio_bytes, and settings.TRANSCRIBE_WS_URI, assert that iterating transcribe_audio_bytes_stream raises RuntimeError("Transcription service websocket error"). This will verify the behaviour that the SSE endpoint tests depend on.

Suggested implementation:

Please add the following test function to test/audio/test_asr_client_stream.py (for example, immediately after test_should_yield_paragraphs_per_status_when_streaming), and adjust any module paths to match your project:

@pytest.mark.asyncio
async def test_transcribe_audio_bytes_stream_raises_runtime_error_on_ws_recv_failure(
    mocker, settings
):
    # Fake websocket whose recv() always fails
    failing_ws = FakeWS(incoming_messages=[])
    async def _failing_recv():
        raise websockets.exceptions.WebSocketException("boom")
    failing_ws.recv = _failing_recv

    # Patch websockets.connect to return our failing websocket
    mocker.patch("audio.asr_client_stream.websockets.connect", return_value=failing_ws)

    # Patch the websocket URI to avoid real network usage
    settings.TRANSCRIBE_WS_URI = "wss://example.test/transcribe"

    # Patch _stream_audio_bytes to yield some dummy audio so the client
    # attempts to call websocket.recv()
    async def _dummy_stream_audio_bytes(*args, **kwargs):
        yield b"\x00\x01"

    mocker.patch("audio.asr_client_stream._stream_audio_bytes", side_effect=_dummy_stream_audio_bytes)

    # Call the generator and assert that websocket recv failures surface
    # as RuntimeError("Transcription service websocket error")
    from audio.asr_client_stream import transcribe_audio_bytes_stream

    async def _consume():
        async for _chunk in transcribe_audio_bytes_stream(b"fake-audio"):
            pass

    with pytest.raises(RuntimeError) as excinfo:
        await _consume()

    assert str(excinfo.value) == "Transcription service websocket error"

You may need to adapt:

  1. The import path "audio.asr_client_stream" in the mocker.patch calls and the from audio.asr_client_stream import transcribe_audio_bytes_stream line to match the real module where transcribe_audio_bytes_stream, _stream_audio_bytes, and the websockets.connect call are defined.
  2. The fixture arguments (mocker, settings) to match your existing test fixtures (e.g., if you use a different name or source for settings).
  3. The TRANSCRIBE_WS_URI attribute name or location if your settings module exposes it differently.

The key behaviour to preserve is:

  • FakeWS.recv() raising websockets.exceptions.WebSocketException
  • The generator transcribe_audio_bytes_stream being consumed in a loop
  • The test asserting that this results in RuntimeError("Transcription service websocket error").
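
The contract under test can also be illustrated without the websockets dependency. The sketch below is a simplified stand-in, not the module's code: `FakeWebSocketError` replaces `websockets.exceptions.WebSocketException`, `FailingWS` and `iter_ws_messages` are hypothetical names, and the `{"type": "ready_to_stop"}` message shape is an assumption about the upstream protocol.

```python
import asyncio
import json
from collections.abc import AsyncIterator


class FakeWebSocketError(Exception):
    """Stand-in for websockets.exceptions.WebSocketException (keeps the sketch dependency-free)."""


class FailingWS:
    """Hypothetical fake whose recv() returns queued messages, then raises."""

    def __init__(self, messages: list[str]) -> None:
        self._messages = list(messages)

    async def recv(self) -> str:
        if self._messages:
            return self._messages.pop(0)
        raise FakeWebSocketError("boom")


async def iter_ws_messages(ws) -> AsyncIterator[dict]:
    """Yield parsed messages; translate transport failures into RuntimeError."""
    while True:
        try:
            raw = await ws.recv()
        except FakeWebSocketError as exc:
            # Surface a stable, descriptive error type to callers.
            raise RuntimeError("Transcription service websocket error") from exc
        message = json.loads(raw)
        # Assumed end-of-stream marker sent by the server.
        if message.get("type") == "ready_to_stop":
            return
        yield message


async def consume(ws) -> list[dict]:
    """Drain the message iterator into a list."""
    return [message async for message in iter_ws_messages(ws)]
```

Chaining with `from exc` keeps the original transport exception available on `__cause__`, which helps when logging close codes or reasons.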

@jedzill4 marked this pull request as draft April 22, 2026 00:32
…ve timeouts

The upstream WhisperLiveKit server uses a bounded WS frame queue and
processes audio at real-time pace. Firehosing audio saturates the
queue, blocks the frame reader, and causes ping/pong control frames
to time out with 1011 close.

Add _Backpressure lead-budget pacer that caps how far ahead of the
server's reported progress the client is allowed to send. Progress
is the max of server_current_time and (wallclock_elapsed - grace).

Also:
- Add ASRStreamChunk NamedTuple with current_time/total_time fields
- Expose current_time/total_time in SSE transcription and done events
- Offload audio decode to worker thread (asyncio.to_thread) to avoid
  blocking the event loop during librosa.load
- Add TRANSCRIBE_WS_LEAD_BUDGET_SECONDS and INITIAL_GRACE settings
- Add TRANSCRIBE_WS_PING_INTERVAL/TIMEOUT settings (20s/60s defaults)
- Improve WebSocket error logging with close code/reason details

Stress tested: N=1 and N=2 concurrent 15min transcriptions complete
cleanly. N=3+ exposes server-side GPU saturation (WhisperLiveKit's
own ping timeout fires when event loop stalls), which requires a
concurrency gate or server-side tuning -- not a client fix.
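
The lead-budget rule described in this commit message can be sketched as a small pure-Python class. This is an illustrative reconstruction from the description only, not the `_Backpressure` implementation: the class name `LeadBudgetPacer` and its method names are hypothetical, and the defaults mirror the 20.0 s values listed for the two lead settings.

```python
from dataclasses import dataclass


@dataclass
class LeadBudgetPacer:
    """Caps how far ahead of server progress the client may send audio."""

    lead_budget_seconds: float = 20.0
    initial_grace_seconds: float = 20.0
    server_current_time: float = 0.0  # latest progress reported by the server

    def progress(self, wallclock_elapsed: float) -> float:
        # Progress is the max of the server's reported position and the
        # wall-clock time elapsed beyond the initial grace period.
        return max(
            self.server_current_time,
            wallclock_elapsed - self.initial_grace_seconds,
        )

    def lead(self, sent_audio_seconds: float, wallclock_elapsed: float) -> float:
        # How many seconds of audio the client is ahead of estimated progress.
        return sent_audio_seconds - self.progress(wallclock_elapsed)

    def may_send(self, sent_audio_seconds: float, wallclock_elapsed: float) -> bool:
        # Sending is allowed while the lead stays within the budget.
        return self.lead(sent_audio_seconds, wallclock_elapsed) <= self.lead_budget_seconds
```

In a sender loop, a caller would presumably sleep briefly and re-check whenever `may_send` returns `False`, updating `server_current_time` from each server status message.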
- Remove dead transcribe_audio_path() function (never called)
- Add _Backpressure to non-streaming transcribe_audio_bytes() for
  consistent 1011 protection across both code paths
- Unify double librosa.load decode into single _DecodedAudio struct
  (eliminates redundant audio decode in streaming path)
- Convert ASRStreamChunk from NamedTuple to BaseModel for consistency
  with the rest of the codebase
- Add from __future__ import annotations to asr_client.py
- Fix _Backpressure docstring to use standard Google-style sections
- Fix _describe_ws_exception docstring to remove RST :class: refs
- Fix f-string logger calls to %-format in transcribe.py
- Propagate WebSocket errors in _receive_updates instead of silently
  swallowing them (consistent with streaming path)
- Clean stale ASR_WS_PING_INTERVAL/TIMEOUT vars from local .env
- Add except RuntimeError: raise to transcribe_audio_bytes so descriptive
  error messages from _receive_updates pass through instead of being
  wrapped in generic 'Unexpected error during transcription'
- Normalize all docstrings in asr_client.py and transcribe.py to
  consistent Google-style (remove verbose type annotations like
  '(bytes)', '(str)', use concise return descriptions)
- Document ws_uri Depends parameter purpose in transcribe() docstring
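
The "decode once, off the event loop" idea from these commits can be sketched with the standard library. Note the substitution: the PR uses `librosa.load`, whereas this sketch swaps in the stdlib `wave` module so it runs without extra dependencies. `DecodedAudio`, `decode_wav`, `decode_off_loop`, and `make_silent_wav` are hypothetical names, not the PR's `_DecodedAudio`.

```python
import asyncio
import io
import wave
from dataclasses import dataclass


@dataclass(frozen=True)
class DecodedAudio:
    """Result of a single decode pass: samples plus derived duration."""

    frames: bytes
    sample_rate: int
    duration_seconds: float


def decode_wav(data: bytes) -> DecodedAudio:
    # Blocking decode; one pass yields both the samples and the duration,
    # so no second decode is needed to compute total_time.
    with wave.open(io.BytesIO(data), "rb") as wav:
        sample_rate = wav.getframerate()
        n_frames = wav.getnframes()
        frames = wav.readframes(n_frames)
    return DecodedAudio(frames, sample_rate, n_frames / sample_rate)


async def decode_off_loop(data: bytes) -> DecodedAudio:
    # Run the blocking decode in a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(decode_wav, data)


def make_silent_wav(seconds: float, sample_rate: int = 16000) -> bytes:
    """Build a mono 16-bit silent WAV payload for demonstration."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav:
        wav.setnchannels(1)
        wav.setsampwidth(2)
        wav.setframerate(sample_rate)
        wav.writeframes(b"\x00\x00" * int(seconds * sample_rate))
    return buffer.getvalue()
```

For example, `asyncio.run(decode_off_loop(make_silent_wav(0.5)))` returns a `DecodedAudio` whose `duration_seconds` is computed from the same pass that produced the samples.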
@jedzill4 changed the title from "feat(asr): add POST /asr/transcribe/stream SSE streaming endpoint" to "feat(asr): add SSE streaming endpoint with lead-budget backpressure" on Apr 22, 2026
- Extract _format_sse_event() from near-identical _format_transcription_event
  and _format_done_event (single parametrized helper)
- Extract _iter_ws_messages() async generator to deduplicate the
  recv-parse-match loop shared between _receive_updates and
  transcribe_audio_bytes_stream
- Add test: non-RuntimeError during streaming emits INTERNAL_ERROR
- Add test: progress fields (current_time/total_time) in done event
- Add test: SSE keepalive comments fire during slow transcription
- Add test: non-streaming websocket RuntimeError → 502
- Add test: non-streaming None result → 500

20 tests passing (up from 15).
The lead-budget _Backpressure class and its plumbing were superseded by a simple 0.01s inter-chunk sleep that yields to the event loop. Drop the dead class, unused settings (TRANSCRIBE_WS_LEAD_BUDGET_SECONDS, TRANSCRIBE_WS_LEAD_INITIAL_GRACE_SECONDS), stale docstrings, and the test's obsolete _backpressure kwarg. Expose the sleep duration as TRANSCRIBE_WS_CHUNK_SLEEP_SECONDS.
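
A minimal sketch of the inter-chunk sleep that replaced the pacer might look like the following. It is written from the commit description alone: `send_in_chunks`, `send`, and `chunk_size` are hypothetical names, and `chunk_sleep_seconds` stands in for TRANSCRIBE_WS_CHUNK_SLEEP_SECONDS with the 0.01 s value mentioned above.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def send_in_chunks(
    send: Callable[[bytes], Awaitable[None]],
    audio: bytes,
    chunk_size: int,
    chunk_sleep_seconds: float = 0.01,
) -> int:
    """Send audio in fixed-size chunks, yielding to the event loop between chunks."""
    sent_chunks = 0
    for offset in range(0, len(audio), chunk_size):
        await send(audio[offset : offset + chunk_size])
        sent_chunks += 1
        # The short sleep hands control back to the event loop so other
        # tasks (for example a receive loop or ping handling) can run.
        await asyncio.sleep(chunk_sleep_seconds)
    return sent_chunks
```

Compared with the lead-budget approach, this trades adaptive pacing for simplicity: the send rate is fixed by the chunk size and sleep duration rather than by server-reported progress.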