Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 105 additions & 2 deletions backend/app/crud/evaluations/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,81 @@
create_langfuse_dataset_run,
update_traces_with_cosine_scores,
)
from app.crud.job import get_batch_job
from app.crud.job import get_batch_job, update_batch_job
from app.models import EvaluationRun
from app.models.batch_job import BatchJob, BatchJobUpdate
from app.utils import get_langfuse_client, get_openai_client

logger = logging.getLogger(__name__)


def _extract_batch_error_message(
    provider: OpenAIBatchProvider,
    error_file_id: str,
    batch_job: BatchJob,
    session: Session,
) -> str:
    """
    Download the error file from OpenAI, parse JSONL entries, and extract
    the most common error message. Updates batch_job.error_message.

    Args:
        provider: OpenAI batch provider instance
        error_file_id: OpenAI error file ID
        batch_job: BatchJob to update with error message
        session: Database session

    Returns:
        Human-readable error message with the top error and counts
    """
    # Local import keeps the module's top-level import block untouched.
    from collections import Counter

    try:
        error_content = provider.download_file(error_file_id)

        # Tally each distinct error message across the JSONL entries.
        # Malformed lines are skipped rather than failing the whole
        # extraction — a partial tally is still useful.
        error_counts: Counter[str] = Counter()
        for line in error_content.strip().splitlines():
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue
            message = (
                entry.get("response", {})
                .get("body", {})
                .get("error", {})
                .get("message", "Unknown error")
            )
            error_counts[message] += 1

        if error_counts:
            # most_common is stable, so ties resolve to the first-seen
            # message — same tie-breaking as max() over insertion order.
            top_error, top_count = error_counts.most_common(1)[0]
            total = sum(error_counts.values())
            error_msg = f"{top_error} ({top_count}/{total} requests)"
        else:
            error_msg = "Batch completed with errors but could not parse error file"

        # Persist the extracted message on the batch job record.
        update_batch_job(
            session=session,
            batch_job=batch_job,
            batch_job_update=BatchJobUpdate(error_message=error_msg),
        )

        logger.info(
            f"[_extract_batch_error_message] Extracted error | batch_job_id={batch_job.id} | {error_msg}"
        )

        return error_msg

    except Exception as e:
        # Best-effort: a failure here must not mask the underlying batch
        # failure, so log and fall back to a generic message that still
        # points at the error file for manual inspection.
        logger.error(
            f"[_extract_batch_error_message] Failed to extract errors | batch_job_id={batch_job.id} | {e}",
            exc_info=True,
        )
        return (
            f"Batch completed with all requests failed (error_file_id: {error_file_id})"
        )


def parse_evaluation_output(
raw_results: list[dict[str, Any]], dataset_items: list[dict[str, Any]]
) -> list[dict[str, Any]]:
Expand Down Expand Up @@ -560,14 +628,49 @@ async def check_and_process_evaluation(

# IMPORTANT: Poll OpenAI to get the latest status before checking
provider = OpenAIBatchProvider(client=openai_client)
poll_batch_status(session=session, provider=provider, batch_job=batch_job)
status_result = poll_batch_status(
session=session, provider=provider, batch_job=batch_job
)

# Refresh batch_job to get the updated provider_status
session.refresh(batch_job)
provider_status = batch_job.provider_status

# Handle different provider statuses
if provider_status == "completed":
# Check if batch completed but all requests failed
# (output_file_id is absent, error_file_id is present)
if not batch_job.provider_output_file_id and status_result.get(
"error_file_id"
):
error_msg = _extract_batch_error_message(
provider=provider,
error_file_id=status_result["error_file_id"],
batch_job=batch_job,
session=session,
)

eval_run = update_evaluation_run(
session=session,
eval_run=eval_run,
status="failed",
error_message=error_msg,
)

logger.error(
f"[check_and_process_evaluation] {log_prefix} Batch completed with all requests failed | {error_msg}"
)

return {
"run_id": eval_run.id,
"run_name": eval_run.run_name,
"previous_status": previous_status,
"current_status": "failed",
"provider_status": provider_status,
"action": "failed",
"error": error_msg,
}

# Process the completed evaluation
await process_completed_evaluation(
eval_run=eval_run,
Expand Down
Loading
Loading