Skip to content
117 changes: 117 additions & 0 deletions backend/app/alembic/versions/050_add_project_id_to_job_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""add project id to job table

Revision ID: 050
Revises: 049
Create Date: 2026-04-07 14:23:00.938901

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel.sql.sqltypes
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "050"
down_revision = "049"
branch_labels = None
depends_on = None

# Reusable definition of the PostgreSQL "chainstatus" enum type.
# create_type=False tells SQLAlchemy NOT to emit CREATE TYPE implicitly when
# the type is referenced in alter_column; upgrade()/downgrade() manage the
# type's lifecycle explicitly instead.
chain_status_enum = postgresql.ENUM(
    "PENDING",
    "RUNNING",
    "FAILED",
    "COMPLETED",
    name="chainstatus",
    create_type=False,
)


def upgrade():
    """Add job.project_id, refresh llm_call comments, and convert
    llm_chain.status from VARCHAR to the chainstatus enum."""
    # Create the enum explicitly (the column definition uses create_type=False).
    # checkfirst=True makes this re-runnable after a partial failure — e.g. the
    # type was created but a later statement in this migration failed — and
    # mirrors the DROP TYPE IF EXISTS used in downgrade().
    chain_status_enum.create(op.get_bind(), checkfirst=True)
    op.add_column(
        "job",
        sa.Column(
            "project_id",
            sa.Integer(),
            nullable=True,
            comment="Project ID of the job's project",
        ),
    )
    # Comment-only updates on llm_call columns.
    op.alter_column(
        "llm_call",
        "chain_id",
        existing_type=sa.UUID(),
        comment="Reference to the parent chain (NULL for standalone llm_call requests)",
        existing_comment="Reference to the parent chain (NULL for standalone /llm/call requests)",
        existing_nullable=True,
    )
    op.alter_column(
        "llm_call",
        "input_type",
        existing_type=sa.VARCHAR(),
        comment="Input type: text, audio, image, pdf, multimodal",
        existing_comment="Input type: text, audio, image",
        existing_nullable=False,
    )
    # The old VARCHAR default ('pending') cannot survive the type change, so
    # drop it first, convert the column, then install the enum-typed default.
    op.execute("ALTER TABLE llm_chain ALTER COLUMN status DROP DEFAULT")
    op.alter_column(
        "llm_chain",
        "status",
        existing_type=sa.VARCHAR(),
        type_=chain_status_enum,
        existing_comment="Chain execution status (pending, running, failed, completed)",
        existing_nullable=False,
        # Existing rows store lowercase strings; uppercase them to match the
        # enum labels during the cast.
        postgresql_using="UPPER(status)::chainstatus",
    )
    op.execute(
        "ALTER TABLE llm_chain ALTER COLUMN status SET DEFAULT 'PENDING'::chainstatus"
    )
    op.alter_column(
        "llm_chain",
        "error",
        existing_type=sa.TEXT(),
        type_=sqlmodel.sql.sqltypes.AutoString(),
        existing_comment="Error message if the chain execution failed",
        existing_nullable=True,
    )


def downgrade():
    """Reverse upgrade(): restore VARCHAR status, original column comments,
    and drop job.project_id. Statements deliberately run in reverse order of
    upgrade()."""
    op.alter_column(
        "llm_chain",
        "error",
        existing_type=sqlmodel.sql.sqltypes.AutoString(),
        type_=sa.TEXT(),
        existing_comment="Error message if the chain execution failed",
        existing_nullable=True,
    )
    # Drop the enum-typed default before converting the column back to VARCHAR.
    op.execute("ALTER TABLE llm_chain ALTER COLUMN status DROP DEFAULT")
    op.alter_column(
        "llm_chain",
        "status",
        existing_type=sa.Enum(
            "PENDING", "RUNNING", "FAILED", "COMPLETED", name="chainstatus"
        ),
        type_=sa.VARCHAR(),
        existing_comment="Chain execution status (pending, running, failed, completed)",
        existing_nullable=False,
    )
    # Restore the original lowercase VARCHAR default, then remove the enum
    # type now that nothing references it.
    op.execute("ALTER TABLE llm_chain ALTER COLUMN status SET DEFAULT 'pending'")
    op.execute("DROP TYPE IF EXISTS chainstatus")
    # Comment-only rollbacks on llm_call columns.
    op.alter_column(
        "llm_call",
        "input_type",
        existing_type=sa.VARCHAR(),
        comment="Input type: text, audio, image",
        existing_comment="Input type: text, audio, image, pdf, multimodal",
        existing_nullable=False,
    )
    op.alter_column(
        "llm_call",
        "chain_id",
        existing_type=sa.UUID(),
        comment="Reference to the parent chain (NULL for standalone /llm/call requests)",
        existing_comment="Reference to the parent chain (NULL for standalone llm_call requests)",
        existing_nullable=True,
    )
    op.drop_column("job", "project_id")
10 changes: 10 additions & 0 deletions backend/app/api/docs/llm/get_llm_call.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Retrieve the status and results of an LLM call job by job ID.

This endpoint allows you to poll for the status and results of an asynchronous LLM call job that was previously initiated via the POST `/llm/call` endpoint.


### Notes

- This endpoint returns both the job status AND the actual LLM response when complete
- LLM responses are also delivered asynchronously via the callback URL (if provided)
- Jobs can be queried at any time after creation
108 changes: 100 additions & 8 deletions backend/app/api/routes/llm.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
import logging
from uuid import UUID

from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException

from app.api.deps import AuthContextDep, SessionDep
from app.api.permissions import Permission, require_permission
from app.models import LLMCallRequest, LLMCallResponse, Message
from app.crud.jobs import JobCrud
from app.crud.llm import get_llm_calls_by_job_id
from app.models import (
LLMCallRequest,
LLMCallResponse,
LLMJobImmediatePublic,
LLMJobPublic,
JobStatus,
)
from app.models.llm.response import LLMResponse, Usage
from app.services.llm.jobs import start_job
from app.utils import APIResponse, validate_callback_url, load_description

Expand Down Expand Up @@ -34,7 +44,7 @@ def llm_callback_notification(body: APIResponse[LLMCallResponse]):
@router.post(
"/llm/call",
description=load_description("llm/llm_call.md"),
response_model=APIResponse[Message],
response_model=APIResponse[LLMJobImmediatePublic],
callbacks=llm_callback_router.routes,
dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))],
)
Expand All @@ -43,22 +53,104 @@ def llm_call(
):
"""
Endpoint to initiate an LLM call as a background job.
Returns job information for polling.
"""
project_id = _current_user.project_.id
organization_id = _current_user.organization_.id

if request.callback_url:
validate_callback_url(str(request.callback_url))

start_job(
job_id = start_job(
db=session,
request=request,
project_id=project_id,
organization_id=organization_id,
)

return APIResponse.success_response(
data=Message(
message=f"Your response is being generated and will be delivered via callback."
),
# Fetch job details to return immediate response
job_crud = JobCrud(session=session)
job = job_crud.get(job_id=job_id, project_id=project_id)

if not job:
raise HTTPException(status_code=404, detail="Job not found")

if request.callback_url:
message = "Your response is being generated and will be delivered via callback."
else:
message = "Your response is being generated"

job_response = LLMJobImmediatePublic(
job_id=job.id,
status=job.status.value,
message=message,
job_inserted_at=job.created_at,
job_updated_at=job.updated_at,
)

return APIResponse.success_response(data=job_response)


@router.get(
    "/llm/call/{job_id}",
    description=load_description("llm/get_llm_call.md"),
    response_model=APIResponse[LLMJobPublic],
    dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))],
)
def get_llm_call_status(
    _current_user: AuthContextDep,
    session: SessionDep,
    job_id: UUID,
) -> APIResponse[LLMJobPublic]:
    """
    Poll for LLM call job status and results.

    Returns the job's current status, with the nested LLM response attached
    once the job has completed successfully.

    Raises:
        HTTPException 404: job does not exist or is not visible to the
            caller's project.
        HTTPException 500: a successful job's LLM call has no recorded usage.
    """

    project_id = _current_user.project_.id

    job_crud = JobCrud(session=session)
    job = job_crud.get(job_id=job_id, project_id=project_id)

    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    llm_call_response = None
    # Fix: compare enum member to enum member. The previous
    # `job.status.value == JobStatus.SUCCESS` only holds when JobStatus mixes
    # in str; direct member comparison is correct regardless of the mixin.
    if job.status == JobStatus.SUCCESS:
        llm_calls = get_llm_calls_by_job_id(
            session=session, job_id=job_id, project_id=project_id
        )

        if llm_calls:
            # A plain /llm/call job produces exactly one llm_call row; chains
            # are not served by this endpoint yet, so take the first entry.
            llm_call = llm_calls[0]

            llm_response = LLMResponse(
                provider_response_id=llm_call.provider_response_id or "",
                conversation_id=llm_call.conversation_id,
                provider=llm_call.provider,
                model=llm_call.model,
                output=llm_call.content,
            )

            if not llm_call.usage:
                # A successful job should always have usage recorded; a
                # missing record indicates server-side data inconsistency.
                raise HTTPException(
                    status_code=500,
                    detail="Completed LLM job is missing usage data",
                )

            llm_call_response = LLMCallResponse(
                response=llm_response,
                usage=Usage(**llm_call.usage),
                provider_raw_response=None,
            )

    job_response = LLMJobPublic(
        job_id=job.id,
        status=job.status.value,
        llm_response=llm_call_response,
        error_message=job.error_message,
    )

    return APIResponse.success_response(data=job_response)
25 changes: 18 additions & 7 deletions backend/app/crud/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ class JobCrud:
def __init__(self, session: Session):
    # Database session used for all job persistence operations.
    self.session = session

def create(self, job_type: JobType, trace_id: str | None = None) -> Job:
new_job = Job(
job_type=job_type,
trace_id=trace_id,
)
def create(
self,
job_type: JobType,
trace_id: str | None = None,
project_id: int | None = None,
) -> Job:
new_job = Job(job_type=job_type, trace_id=trace_id, project_id=project_id)
self.session.add(new_job)
self.session.commit()
self.session.refresh(new_job)
Expand All @@ -38,5 +40,14 @@ def update(self, job_id: UUID, job_update: JobUpdate) -> Job:

return job

def get(self, job_id: UUID) -> Job | None:
return self.session.get(Job, job_id)
def get(self, job_id: UUID, project_id: int | None = None) -> Job | None:
    """Fetch a job by primary key, optionally scoped to a project.

    When ``project_id`` is given, a job owned by a *different* project is
    treated as not found. Jobs with no ``project_id`` of their own (legacy
    rows) remain visible to every caller.
    """
    job = self.session.get(Job, job_id)
    if job is None:
        return None
    # Visible when no scoping was requested, the job is unscoped, or the
    # scopes match — otherwise hide it from the caller.
    visible = (
        project_id is None
        or job.project_id is None
        or job.project_id == project_id
    )
    return job if visible else None
11 changes: 6 additions & 5 deletions backend/app/crud/llm.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
import base64
import json
from uuid import UUID
from typing import Any, Literal

from uuid import UUID
from sqlmodel import Session, select

from app.core.util import now
import base64
import json
from app.models.llm import LlmCall, LLMCallRequest, ConfigBlob
from app.models.llm.request import (
TextInput,
Expand Down Expand Up @@ -234,13 +235,13 @@ def get_llm_call_by_id(


def get_llm_calls_by_job_id(
session: Session,
job_id: UUID,
session: Session, job_id: UUID, project_id: int
) -> list[LlmCall]:
statement = (
select(LlmCall)
.where(
LlmCall.job_id == job_id,
LlmCall.project_id == project_id,
LlmCall.deleted_at.is_(None),
)
.order_by(LlmCall.created_at.desc())
Expand Down
3 changes: 3 additions & 0 deletions backend/app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@
LLMChainRequest,
LLMChainResponse,
LlmChain,
LLMJobBasePublic,
LLMJobImmediatePublic,
LLMJobPublic,
)

from .message import Message
Expand Down
5 changes: 5 additions & 0 deletions backend/app/models/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ class Job(SQLModel, table=True):
description="Tracing ID for correlating logs and traces.",
sa_column_kwargs={"comment": "Tracing ID for correlating logs and traces"},
)
project_id: int | None = Field(
default=None,
description="Project ID of the project the job belongs to.",
sa_column_kwargs={"comment": "Project ID of the job's project"},
)
error_message: str | None = Field(
default=None,
description="Error details if the job fails.",
Expand Down
3 changes: 3 additions & 0 deletions backend/app/models/llm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@
AudioOutput,
LLMChainResponse,
IntermediateChainResponse,
LLMJobBasePublic,
LLMJobImmediatePublic,
LLMJobPublic,
)
Loading
Loading