Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions backend/app/api/routes/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
logger = logging.getLogger(__name__)
router = APIRouter(tags=["threads"])

from pydantic import BaseModel
from typing import Optional


def send_callback(callback_url: str, data: dict):
"""Send results to the callback URL (synchronously)."""
"""Send results to the callback URL (synchron ously)."""
try:
session = requests.Session()
# uncomment this to run locally without SSL
Expand Down Expand Up @@ -222,6 +225,8 @@ async def threads(
_current_user: UserOrganization = Depends(get_current_user_org),
):
"""Asynchronous endpoint that processes requests in background."""

# Fetch OpenAI credentials (required)
credentials = get_provider_credential(
session=_session,
org_id=_current_user.organization_id,
Expand All @@ -234,22 +239,22 @@ async def threads(
)
client = OpenAI(api_key=credentials["api_key"])

# Fetch Langfuse credentials (optional)
langfuse_credentials = get_provider_credential(
session=_session,
org_id=_current_user.organization_id,
provider="langfuse",
project_id=request.get("project_id"),
)
if not langfuse_credentials:
return APIResponse.failure_response(
error="LANGFUSE keys not configured for this organization."

# If Langfuse credentials exist, configure Langfuse
if langfuse_credentials:
langfuse_context.configure(
secret_key=langfuse_credentials["secret_key"],
public_key=langfuse_credentials["public_key"],
host=langfuse_credentials["host"],
)

langfuse_context.configure(
secret_key=langfuse_credentials["secret_key"],
public_key=langfuse_credentials["public_key"],
host=langfuse_credentials["host"],
)
# Validate thread
is_valid, error_message = validate_thread(client, request.get("thread_id"))
if not is_valid:
Expand Down Expand Up @@ -350,15 +355,29 @@ async def start_thread(
"""
Create a new OpenAI thread for the given question and start polling in the background.
"""
prompt = request["question"]
client = OpenAI(api_key=settings.OPENAI_API_KEY)
# Fetch OpenAI credentials (required)
openai_credentials = get_provider_credential(
session=db,
org_id=_current_user.organization_id,
provider="openai",
project_id=request.get("project_id"),
)

if not openai_credentials or "api_key" not in openai_credentials:
return APIResponse.failure_response(
error="OpenAI API key not configured for this organization."
)

client = OpenAI(api_key=openai_credentials["api_key"])
# Setup thread
is_success, error = setup_thread(client, request)
if not is_success:
return APIResponse.failure_response(error=error)

thread_id = request["thread_id"]
prompt = request["question"]

# Insert thread data into the database
upsert_thread_result(
db,
OpenAIThreadCreate(
Expand All @@ -370,6 +389,7 @@ async def start_thread(
),
)

# Schedule background task
background_tasks.add_task(poll_run_and_prepare_response, request, client, db)

return APIResponse.success_response(
Expand Down