Skip to content

Commit 284eca2

Browse files
committed
using Circuit breaker and ThreadPoolExecutor
1 parent 81535a3 commit 284eca2

File tree

1 file changed

+114
-65
lines changed

1 file changed

+114
-65
lines changed

backend/app/crud/evaluations/langfuse.py

Lines changed: 114 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -410,71 +410,97 @@ def fetch_trace_scores_from_langfuse(
410410
f"[fetch_trace_scores_from_langfuse] Found traces | count={len(trace_ids)}"
411411
)
412412

413-
# 3. Fetch trace details with scores for each trace
413+
# 3. Fetch trace details with scores concurrently
414414
traces: list[TraceData] = []
415415
# Track score aggregations by name: {name: {"data_type": str, "values": list}}
416416
score_aggregations: dict[str, dict[str, Any]] = {}
417417

418-
for trace_id in trace_ids:
419-
try:
420-
trace = langfuse.api.trace.get(trace_id)
421-
trace_data: TraceData = {
422-
"trace_id": trace_id,
423-
"question": "",
424-
"llm_answer": "",
425-
"ground_truth_answer": "",
426-
"question_id": "",
427-
"scores": [],
428-
}
418+
# Circuit breaker: abort early if too many consecutive failures
419+
max_consecutive_failures = 5
420+
consecutive_failures = 0
421+
total_failures = 0
422+
423+
def _fetch_single_trace(trace_id: str) -> TraceData | None:
424+
"""Fetch a single trace from Langfuse and extract its data."""
425+
trace = langfuse.api.trace.get(trace_id)
426+
trace_data: TraceData = {
427+
"trace_id": trace_id,
428+
"question": "",
429+
"llm_answer": "",
430+
"ground_truth_answer": "",
431+
"question_id": "",
432+
"scores": [],
433+
}
434+
435+
# Get question from input
436+
if trace.input:
437+
if isinstance(trace.input, dict):
438+
trace_data["question"] = trace.input.get("question", "")
439+
elif isinstance(trace.input, str):
440+
trace_data["question"] = trace.input
441+
442+
# Get answer from output
443+
if trace.output:
444+
if isinstance(trace.output, dict):
445+
trace_data["llm_answer"] = trace.output.get("answer", "")
446+
elif isinstance(trace.output, str):
447+
trace_data["llm_answer"] = trace.output
448+
449+
# Get ground truth and question_id from metadata
450+
if trace.metadata and isinstance(trace.metadata, dict):
451+
trace_data["ground_truth_answer"] = trace.metadata.get(
452+
"ground_truth", ""
453+
)
454+
trace_data["question_id"] = trace.metadata.get("question_id", "")
455+
456+
# Add scores from this trace
457+
if trace.scores:
458+
for score in trace.scores:
459+
score_name = score.name
460+
score_value = score.value
461+
score_comment = score.comment
462+
# Get data_type from Langfuse score, default to NUMERIC
463+
data_type = getattr(score, "data_type", None) or "NUMERIC"
464+
465+
# Build score entry for trace
466+
# Round numeric values to 2 decimal places
467+
if data_type != "CATEGORICAL" and isinstance(
468+
score_value, (int, float)
469+
):
470+
score_value = round(float(score_value), 2)
471+
472+
score_entry: TraceScore = {
473+
"name": score_name,
474+
"value": score_value,
475+
"data_type": data_type,
476+
}
477+
if score_comment:
478+
score_entry["comment"] = score_comment
429479

430-
# Get question from input
431-
if trace.input:
432-
if isinstance(trace.input, dict):
433-
trace_data["question"] = trace.input.get("question", "")
434-
elif isinstance(trace.input, str):
435-
trace_data["question"] = trace.input
436-
437-
# Get answer from output
438-
if trace.output:
439-
if isinstance(trace.output, dict):
440-
trace_data["llm_answer"] = trace.output.get("answer", "")
441-
elif isinstance(trace.output, str):
442-
trace_data["llm_answer"] = trace.output
443-
444-
# Get ground truth and question_id from metadata
445-
if trace.metadata and isinstance(trace.metadata, dict):
446-
trace_data["ground_truth_answer"] = trace.metadata.get(
447-
"ground_truth", ""
448-
)
449-
trace_data["question_id"] = trace.metadata.get("question_id", "")
450-
451-
# Add scores from this trace
452-
if trace.scores:
453-
for score in trace.scores:
454-
score_name = score.name
455-
score_value = score.value
456-
score_comment = score.comment
457-
# Get data_type from Langfuse score, default to NUMERIC
458-
data_type = getattr(score, "data_type", None) or "NUMERIC"
459-
460-
# Build score entry for trace
461-
# Round numeric values to 2 decimal places
462-
if data_type != "CATEGORICAL" and isinstance(
463-
score_value, (int, float)
464-
):
465-
score_value = round(float(score_value), 2)
466-
467-
score_entry: TraceScore = {
468-
"name": score_name,
469-
"value": score_value,
470-
"data_type": data_type,
471-
}
472-
if score_comment:
473-
score_entry["comment"] = score_comment
480+
trace_data["scores"].append(score_entry)
481+
482+
return trace_data
483+
484+
max_workers = min(5, max(1, len(trace_ids)))
485+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
486+
future_to_trace_id = {
487+
executor.submit(_fetch_single_trace, tid): tid for tid in trace_ids
488+
}
489+
490+
for future in as_completed(future_to_trace_id):
491+
trace_id = future_to_trace_id[future]
492+
try:
493+
trace_data = future.result()
494+
if trace_data is None:
495+
continue
474496

475-
trace_data["scores"].append(score_entry)
497+
consecutive_failures = 0
476498

477-
# Aggregate for summary calculation
499+
# Aggregate scores for summary calculation
500+
for score_entry in trace_data["scores"]:
501+
score_name = score_entry["name"]
502+
score_value = score_entry["value"]
503+
data_type = score_entry["data_type"]
478504
if score_value is not None:
479505
if score_name not in score_aggregations:
480506
score_aggregations[score_name] = {
@@ -483,14 +509,37 @@ def fetch_trace_scores_from_langfuse(
483509
}
484510
score_aggregations[score_name]["values"].append(score_value)
485511

486-
traces.append(trace_data)
512+
traces.append(trace_data)
487513

488-
except Exception as e:
489-
logger.warning(
490-
f"[fetch_trace_scores_from_langfuse] Failed to fetch trace | "
491-
f"trace_id={trace_id} | error={e}"
492-
)
493-
continue
514+
except Exception as e:
515+
consecutive_failures += 1
516+
total_failures += 1
517+
logger.warning(
518+
f"[fetch_trace_scores_from_langfuse] Failed to fetch trace | "
519+
f"trace_id={trace_id} | error={e}"
520+
)
521+
522+
if consecutive_failures >= max_consecutive_failures:
523+
# Cancel remaining futures
524+
for f in future_to_trace_id:
525+
f.cancel()
526+
logger.error(
527+
f"[fetch_trace_scores_from_langfuse] Circuit breaker triggered | "
528+
f"consecutive_failures={consecutive_failures} | "
529+
f"total_failures={total_failures} | "
530+
f"total_traces={len(trace_ids)}"
531+
)
532+
raise RuntimeError(
533+
f"Langfuse API unavailable: {consecutive_failures} consecutive "
534+
f"trace fetches failed. Aborting to prevent prolonged outage."
535+
)
536+
537+
# If more than half of traces failed, treat as a Langfuse outage
538+
if total_failures > 0 and total_failures > len(trace_ids) // 2:
539+
raise RuntimeError(
540+
f"Langfuse API degraded: {total_failures}/{len(trace_ids)} trace "
541+
f"fetches failed. Try again later."
542+
)
494543

495544
# 4. Calculate summary scores for all scores that have at least one value
496545
summary_scores = []

0 commit comments

Comments
 (0)