1+ import csv
2+ import io
3+ from typing import List , Dict , Any
4+ from fastapi import APIRouter , UploadFile , File , Depends , HTTPException
5+ from sqlmodel import Session
6+ from langfuse import Langfuse
7+
8+ from app .api .deps import get_current_user_org , get_db
9+ from app .core import logging
10+ from app .models import UserOrganization , UserProjectOrg
11+ from app .utils import APIResponse
12+ from app .crud .credentials import get_provider_credential
13+
# Module-level logger and the router exposing the evaluation endpoints.
logger = logging.getLogger(__name__)
router = APIRouter(tags=["evaluation"])
16+
# Upper bound on CSV rows ingested per upload request.
MAX_DATASET_ROWS = 30


@router.post("/evaluation/upload-dataset")
async def upload_dataset(
    dataset_name: str,
    file: UploadFile = File(...),
    _session: Session = Depends(get_db),
    _current_user: UserOrganization = Depends(get_current_user_org),
):
    """
    Upload a CSV dataset to Langfuse for evaluation.

    The CSV file must contain 'input' and 'expected_output' columns.
    Only the first MAX_DATASET_ROWS (30) rows are ingested; rows that
    fail to upload individually are skipped, not fatal.

    Returns:
        APIResponse: success with the number of rows processed and the
        dataset id, or a failure response describing the problem.
    """
    # `filename` may be None on multipart uploads; also accept ".CSV" etc.
    if not file.filename or not file.filename.lower().endswith(".csv"):
        return APIResponse.failure_response(error="Only CSV files are supported")

    # Langfuse credentials are stored per organization/project.
    langfuse_credentials = get_provider_credential(
        session=_session,
        org_id=_current_user.organization_id,
        provider="langfuse",
        project_id=_current_user.project_id,
    )
    if not langfuse_credentials:
        return APIResponse.failure_response(
            error="Langfuse credentials not configured for this organization."
        )

    try:
        # Construct the client inside the try block so the uploaded file is
        # always closed (see finally) even if the credentials are malformed.
        langfuse = Langfuse(
            public_key=langfuse_credentials["public_key"],
            secret_key=langfuse_credentials["secret_key"],
            host=langfuse_credentials["host"],
        )

        contents = await file.read()
        logger.info("Read %d bytes from file", len(contents))

        # Decode once at the boundary; a non-UTF-8 upload surfaces as a
        # failure response via the outer except handler.
        # NOTE: raw CSV content and row values are deliberately not logged —
        # uploaded datasets may contain sensitive data.
        csv_content = contents.decode("utf-8")
        reader = csv.DictReader(io.StringIO(csv_content))

        if not reader.fieldnames:
            return APIResponse.failure_response(
                error="CSV file is empty or has no headers"
            )

        logger.info("CSV headers found: %s", reader.fieldnames)

        if not {"input", "expected_output"}.issubset(reader.fieldnames):
            return APIResponse.failure_response(
                error="CSV must contain 'input' and 'expected_output' columns"
            )

        try:
            dataset = langfuse.create_dataset(name=dataset_name)
            logger.info("Created dataset with ID: %s", dataset.id)
        except Exception as e:
            logger.error("Error creating dataset: %s", e)
            return APIResponse.failure_response(
                error=f"Failed to create dataset: {str(e)}"
            )

        rows_processed = 0
        for row in reader:
            if rows_processed >= MAX_DATASET_ROWS:
                break
            try:
                item = langfuse.create_dataset_item(
                    dataset_name=dataset_name,
                    input=row["input"],
                    expected_output=row["expected_output"],
                )
                logger.info("Created dataset item with ID: %s", item.id)
                rows_processed += 1
            except Exception as e:
                # Skip bad rows but keep ingesting the remainder.
                logger.error(
                    "Error processing row %d: %s", rows_processed + 1, e
                )
                continue

        if rows_processed == 0:
            return APIResponse.failure_response(
                error="No rows were successfully processed"
            )

        logger.info("Successfully processed %d rows", rows_processed)

        return APIResponse.success_response(
            data={
                "message": (
                    f"Successfully uploaded {rows_processed} rows "
                    f"to dataset '{dataset_name}'"
                ),
                "rows_processed": rows_processed,
                "dataset_id": getattr(dataset, "id", None),
            }
        )

    except Exception as e:
        logger.error("Error uploading dataset: %s", e)
        return APIResponse.failure_response(error=str(e))
    finally:
        await file.close()