5 changes: 5 additions & 0 deletions SETUP.md
@@ -40,6 +40,11 @@ Create `.env` in the project root:
```bash
cat > .env << 'EOF'
GROQ_API_KEY=your_groq_key_here
# Required for the realtime demo (constella-realtime); not used by the legacy
# push-to-talk demo (constella-demo) or the eval harness.
CARTESIA_API_KEY=your_cartesia_key_here
# Optional: override the default Cartesia voice (UUID from play.cartesia.ai/voices)
# CARTESIA_VOICE_ID=
ANTHROPIC_API_KEY=optional_for_llm_judge_eval
HF_HOME=/Users/hgz/.cache/huggingface
EOF
311 changes: 311 additions & 0 deletions constella/demo/realtime.py
@@ -0,0 +1,311 @@
"""Realtime voice demo — auto-commit on silence via fastrtc + Cartesia.

Per-turn pipeline:
mic → fastrtc VAD → Groq Whisper → run_turn() → Cartesia streaming TTS → speaker

UI is a Gradio Blocks layout: mic + text input + examples on the left, Ana's
reply + running transcript on the right, specialist verdict JSON at the bottom.

Usage:
constella-realtime # localhost:7860
constella-realtime --port 8080
"""
from __future__ import annotations

import argparse
import json
import logging
import os
from pathlib import Path
from typing import Literal

import gradio as gr
import numpy as np

from constella.orchestrator import run_turn
from constella.realtime.audio import cartesia_pcm_to_numpy, numpy_to_wav_tempfile
from constella.realtime.tts import SAMPLE_RATE as TTS_SAMPLE_RATE, stream_tts
from constella.schemas import ConversationState, PatientContext

log = logging.getLogger(__name__)

DEFAULT_PATIENT = (
    Path(__file__).resolve().parents[1] / "eval" / "scenarios" / "patient_maria.json"
)

EXAMPLE_LINES = [
    "Hi, yes this is Maria.",
    "I checked my sugar this morning and it was 142.",
    "Hola, soy María. ¿Cómo está usted?",
    "Pues mi blood sugar esta mañana estaba en 165, no muy mal.",
    "Sí, tomé el metformin con breakfast pero el insulin lo olvidé last night.",
    "Tengo un dolor fuerte en el pecho y me duele el brazo izquierdo.",
]

_SPANISH_FUNCTION_WORDS = frozenset({
    "hola", "está", "sí", "bueno", "pero", "porque", "cómo", "qué",
    "dónde", "entonces", "muy", "mucho", "también", "señora", "usted",
    "soy", "eres", "tomó", "gracias", "bien", "tiene", "desde", "hospital",
    "medicamento", "azúcar", "insulina", "doctor", "doctora", "ahora",
    "poco", "ayer", "mañana", "preocupes", "quiero", "puedes",
})


def _load_patient() -> PatientContext:
    return PatientContext.model_validate_json(DEFAULT_PATIENT.read_text())


def _detect_reply_language(text: str) -> Literal["en", "es"]:
    """Heuristic language detection on Ana's reply text.

    We can't use the language specialist's verdict as the TTS hint because it
    runs on the PATIENT's utterance — Ana may reply in a different language
    (her prompt tells her to match the patient's register, which often means
    Spanish even when the patient said a greeting in English). Cartesia needs
    the language of the TEXT IT IS SPEAKING, which is Ana's reply.
    """
    if any(c in text for c in "ñ¿¡"):
        return "es"
    lower = text.lower()
    if any(c in lower for c in "áéíóú"):
        return "es"
    words = set(lower.split())
    if len(words & _SPANISH_FUNCTION_WORDS) >= 2:
        return "es"
    return "en"


def _transcribe(wav_path: Path) -> str:
    """Groq Whisper — a batch API, but fastrtc only calls us once per VAD
    pause, so the clip is already short (typically 2-5 s)."""
    from openai import OpenAI

    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        raise RuntimeError("GROQ_API_KEY required for ASR in the realtime demo.")
    client = OpenAI(api_key=api_key, base_url="https://api.groq.com/openai/v1")
    with open(wav_path, "rb") as f:
        resp = client.audio.transcriptions.create(
            model="whisper-large-v3-turbo",
            file=f,
            response_format="text",
        )
    return str(resp).strip()
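
# Groq's endpoint is OpenAI-compatible, which is why the stock `openai` client
# works unmodified here — only base_url and the Groq-hosted model name differ.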


def _format_transcript(state: ConversationState) -> str:
    if not state.history:
        return "_(start of call)_"
    lines = []
    for turn in state.history[-20:]:
        who = "**María:**" if turn.speaker == "patient" else "**Nurse Ana:**"
        lines.append(f"{who} {turn.text}")
    return "\n\n".join(lines)


def _build_verdict(result) -> str:
    verdict = {
        "action": result.action.kind,
        "latency_ms": {
            "primary": round(result.primary_latency_ms),
            "specialists_parallel": round(result.specialist_latency_ms),
            "total": round(result.total_latency_ms),
        },
        "language": result.language.model_dump() if result.language else None,
        "medication": result.medication.model_dump() if result.medication else None,
        "labs": result.labs.model_dump() if result.labs else None,
        "escalation": result.escalation.model_dump() if result.escalation else None,
    }
    return json.dumps(verdict, indent=2, default=str)
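
# Shape of the rendered verdict (field values illustrative; the specialist
# payloads depend on their Pydantic schemas):
#   {
#     "action": "...",
#     "latency_ms": {"primary": 412, "specialists_parallel": 388, "total": 801},
#     "language": {...},
#     "medication": null,
#     "labs": {...},
#     "escalation": null
#   }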


# Module-level conversation state — single-user demo. Keeping it here (rather
# than in gr.State) is deliberate: threading state through AdditionalOutputs
# triggers a state_change event on every turn, which re-enters the
# on_additional_outputs handler and silently drops subsequent UI updates.
# For multi-user prod we'd key this by fastrtc session_id.
_state: ConversationState | None = None
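
# A minimal session-keyed sketch for the multi-user case (hypothetical — how
# the fastrtc session id reaches the handler is assumed, not shown here):
#
#   _states: dict[str, ConversationState] = {}
#
#   def _state_for(session_id: str) -> ConversationState:
#       return _states.setdefault(
#           session_id, ConversationState(patient=_load_patient())
#       )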


def _ensure_state() -> ConversationState:
    global _state
    if _state is None:
        _state = ConversationState(patient=_load_patient())
    return _state


def _voice_handler(audio, _webrtc_value):
    """fastrtc handler — runs on every VAD pause.

    Signature note: fastrtc prepends a `"__webrtc_value__"` placeholder when
    the WebRTC component's value is passed as a string (see
    fastrtc/tracks.py:set_args). After the audio replacement, the args become
    (audio_tuple, webrtc_value). No other inputs are wired to the stream, so
    the handler takes exactly two positional args.
    """
    from fastrtc import AdditionalOutputs
    global _state

    state = _ensure_state()
    sample_rate, np_audio = audio
    wav_path = numpy_to_wav_tempfile(sample_rate, np_audio)
    try:
        patient_text = _transcribe(wav_path)
    finally:
        wav_path.unlink(missing_ok=True)

    if not patient_text:
        log.info("empty transcription, skipping turn")
        return

    log.info("patient: %s", patient_text)
    result = run_turn(state, patient_text)
    _state = result.state
    log.info(
        "nurse [%s, %.0f ms]: %s",
        result.action.kind,
        result.total_latency_ms,
        result.nurse_text,
    )

    reply_lang = _detect_reply_language(result.nurse_text)

    # Push the text panels first so the UI updates before audio starts playing.
    yield AdditionalOutputs(
        result.nurse_text,
        _build_verdict(result),
        _format_transcript(result.state),
    )

    for pcm_chunk in stream_tts(result.nurse_text, language=reply_lang):
        yield cartesia_pcm_to_numpy(pcm_chunk, TTS_SAMPLE_RATE)


def _text_handler(patient_text: str):
    """Text-only fallback path: no audio playback, just text updates."""
    global _state
    state = _ensure_state()
    if not patient_text or not patient_text.strip():
        return "", "{}", _format_transcript(state)

    result = run_turn(state, patient_text.strip())
    _state = result.state
    return (
        result.nurse_text,
        _build_verdict(result),
        _format_transcript(result.state),
    )


def build_ui() -> gr.Blocks:
    from fastrtc import ReplyOnPause, WebRTC

    with gr.Blocks(title="Constella — bilingual constellation voice agent") as demo:
        gr.Markdown(
            "# Constella\n"
            "You are **María González** — a 58-year-old diabetic patient, just out of hospital. "
            "**Nurse Ana** will follow up. Speak or type. English, Spanish, or both mid-sentence.\n\n"
            "> **Voice path:** click **Record** and speak; Ana answers the moment you pause. \n"
            "> **Text path:** type below and press Enter."
        )

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### You (María)")
                webrtc = WebRTC(
                    mode="send-receive",
                    modality="audio",
                    label="Speak",
                    full_screen=False,
                    height=240,
                    button_labels={
                        "start": "Record",
                        "stop": "Stop",
                        "waiting": "Ana is replying…",
                    },
                )
                text_in = gr.Textbox(
                    placeholder="or type here...",
                    label="Text",
                    lines=2,
                )
                send_btn = gr.Button("Send text", variant="secondary", size="sm")
                gr.Examples(
                    examples=[[ex] for ex in EXAMPLE_LINES],
                    inputs=[text_in],
                    label="Try these",
                )

            with gr.Column(scale=1):
                gr.Markdown("### Nurse Ana")
                nurse_text = gr.Textbox(
                    label="Latest reply",
                    lines=4,
                    interactive=False,
                )
                transcript = gr.Markdown(
                    value="_(start of call)_",
                    label="Conversation transcript",
                )

        verdict_out = gr.Code(
            label="What the 4 specialists saw (action + latency breakdown)",
            language="json",
            lines=18,
        )

        # Voice path — audio in, audio out, text/verdict/transcript as additional outputs.
        webrtc.stream(
            fn=ReplyOnPause(_voice_handler),
            inputs=[webrtc],
            outputs=[webrtc],
            time_limit=60,
        )
        webrtc.on_additional_outputs(
            lambda nt, v, t: (nt, v, t),
            outputs=[nurse_text, verdict_out, transcript],
        )

        # Text path (no audio).
        text_in.submit(
            _text_handler,
            inputs=[text_in],
            outputs=[nurse_text, verdict_out, transcript],
        )
        send_btn.click(
            _text_handler,
            inputs=[text_in],
            outputs=[nurse_text, verdict_out, transcript],
        )

    return demo


def main() -> int:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    provider_override = os.environ.get("CONSTELLA_PROVIDER", "").lower()
    if os.environ.get("GROQ_API_KEY") and provider_override != "openrouter":
        log.warning(
            "Using Groq for LLM inference. Free-tier rate limits (6000 TPM on 8B) "
            "will cause multi-second backoff on realtime bursts — Constella fires 5 "
            "concurrent calls per turn. If you see latency > 3 s per turn, either "
            "set CONSTELLA_PROVIDER=openrouter in .env or upgrade to Groq Dev tier "
            "at https://console.groq.com/settings/billing."
        )
    if not os.environ.get("CARTESIA_API_KEY"):
        log.error(
            "CARTESIA_API_KEY is not set. The voice path will fail on the first "
            "TTS call. Add it to .env and restart."
        )
    p = argparse.ArgumentParser()
    p.add_argument("--port", type=int, default=7860)
    args = p.parse_args()
    build_ui().launch(server_port=args.port)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())