-
Notifications
You must be signed in to change notification settings - Fork 10
Refactor Collection Creation for no delay #243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
47a9b89
f3494ec
e30f5d8
60c1afd
c0ae5a3
8a2b80f
e15ed29
e961d10
56c5166
230d6f1
c89ead5
f08dbfe
0a8f0e0
803c534
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| """Add organization_id, project_id, status, and updated_at to Collection | ||
|
|
||
| Revision ID: 75b5156a28fd | ||
| Revises: 8757b005d681 | ||
| Create Date: 2025-06-19 15:38:02.609786 | ||
|
|
||
| """ | ||
| from alembic import op | ||
| import sqlalchemy as sa | ||
| import sqlmodel.sql.sqltypes | ||
|
|
||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision = "75b5156a28fd" | ||
| down_revision = "8757b005d681" | ||
| branch_labels = None | ||
| depends_on = None | ||
|
|
||
|
|
||
def upgrade():
    """Apply revision 75b5156a28fd to the ``collection`` table.

    Adds the ``organization_id``, ``project_id``, ``status`` and
    ``updated_at`` columns, relaxes ``llm_service_id`` / ``llm_service_name``
    to nullable, and links the new id columns to ``organization`` and
    ``project`` with foreign keys.
    """
    # NOTE(review): organization_id and updated_at are added as NOT NULL with
    # no server_default; presumably this only succeeds when ``collection`` has
    # no rows at migration time -- confirm, or backfill before enforcing.
    op.add_column(
        "collection", sa.Column("organization_id", sa.Integer(), nullable=False)
    )
    op.add_column("collection", sa.Column("project_id", sa.Integer(), nullable=True))
    op.add_column(
        "collection",
        sa.Column("status", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
    )
    op.add_column("collection", sa.Column("updated_at", sa.DateTime(), nullable=False))

    # These two columns already exist; they are not being re-added, only
    # altered from non-nullable to nullable.
    op.alter_column(
        "collection", "llm_service_id", existing_type=sa.VARCHAR(), nullable=True
    )
    op.alter_column(
        "collection", "llm_service_name", existing_type=sa.VARCHAR(), nullable=True
    )

    # Name the foreign keys explicitly so a downgrade can drop them by name.
    # The names follow PostgreSQL's default ``<table>_<column>_fkey`` pattern,
    # i.e. what an unnamed constraint would be called on that backend.
    # NOTE(review): assumes no MetaData naming_convention is relied upon for
    # these constraints -- confirm.
    op.create_foreign_key(
        "collection_project_id_fkey",
        "collection",
        "project",
        ["project_id"],
        ["id"],
        ondelete="SET NULL",
    )
    op.create_foreign_key(
        "collection_organization_id_fkey",
        "collection",
        "organization",
        ["organization_id"],
        ["id"],
        ondelete="CASCADE",
    )
|
|
||
|
|
||
def downgrade():
    """Revert revision 75b5156a28fd on the ``collection`` table.

    Drops the two foreign keys, restores NOT NULL on ``llm_service_name`` /
    ``llm_service_id``, and removes the ``updated_at``, ``status``,
    ``project_id`` and ``organization_id`` columns.
    """
    # A constraint can only be dropped by name; passing None gives Alembic
    # nothing to emit. The names below are PostgreSQL's defaults for unnamed
    # foreign keys (``<table>_<column>_fkey``).
    # NOTE(review): assumes a PostgreSQL backend and no custom naming
    # convention -- confirm against the deployed schema.
    op.drop_constraint(
        "collection_organization_id_fkey", "collection", type_="foreignkey"
    )
    op.drop_constraint("collection_project_id_fkey", "collection", type_="foreignkey")

    # NOTE(review): restoring NOT NULL presumably fails if any row has a NULL
    # llm_service_name / llm_service_id at downgrade time -- confirm.
    op.alter_column(
        "collection", "llm_service_name", existing_type=sa.VARCHAR(), nullable=False
    )
    op.alter_column(
        "collection", "llm_service_id", existing_type=sa.VARCHAR(), nullable=False
    )

    op.drop_column("collection", "updated_at")
    op.drop_column("collection", "status")
    op.drop_column("collection", "project_id")
    op.drop_column("collection", "organization_id")
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -107,6 +107,30 @@ def get_current_user_org( | |
| CurrentUserOrg = Annotated[UserOrganization, Depends(get_current_user_org)] | ||
|
|
||
|
|
||
def get_current_user_org_project(
    current_user: CurrentUser, session: SessionDep, request: Request
) -> UserProjectOrg:
    """Build a ``UserProjectOrg`` for the current user.

    The organization and project ids are taken from the API-key record that
    matches the request's ``X-API-KEY`` header. When the header is absent or
    no record is found for it, both ids are left as ``None``.
    """
    organization_id = None
    project_id = None

    header_key = request.headers.get("X-API-KEY")
    record = get_api_key_by_value(session, header_key) if header_key else None

    if record:
        # Validate the organization before its id is copied onto the result.
        validate_organization(session, record.organization_id)
        organization_id = record.organization_id
        project_id = record.project_id

    user_fields = current_user.model_dump()
    return UserProjectOrg(
        **user_fields,
        organization_id=organization_id,
        project_id=project_id,
    )
|
|
||
|
|
||
| CurrentUserOrgproject = Annotated[UserProjectOrg, Depends(get_current_user_org_project)] | ||
nishika26 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| def get_current_active_superuser(current_user: CurrentUser) -> User: | ||
| if not current_user.is_superuser: | ||
| raise HTTPException( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| import inspect | ||
| import logging | ||
| import time | ||
| import warnings | ||
| from uuid import UUID, uuid4 | ||
| from typing import Any, List, Optional | ||
|
|
@@ -11,7 +12,7 @@ | |
| from pydantic import BaseModel, Field, HttpUrl | ||
| from sqlalchemy.exc import NoResultFound, MultipleResultsFound, SQLAlchemyError | ||
|
|
||
| from app.api.deps import CurrentUser, SessionDep | ||
| from app.api.deps import CurrentUser, SessionDep, CurrentUserOrgproject | ||
| from app.core.cloud import AmazonCloudStorage | ||
| from app.core.config import settings | ||
| from app.core.util import now, raise_from_unknown, post_callback | ||
|
|
@@ -173,59 +174,76 @@ | |
| request: CreationRequest, | ||
| payload: ResponsePayload, | ||
| ): | ||
| start_time = time.time() | ||
| client = OpenAI(api_key=settings.OPENAI_API_KEY) | ||
| if request.callback_url is None: | ||
| callback = SilentCallback(payload) | ||
| else: | ||
| callback = WebHookCallback(request.callback_url, payload) | ||
|
|
||
| # | ||
| # Create the assistant and vector store | ||
| # | ||
| callback = ( | ||
| SilentCallback(payload) | ||
| if request.callback_url is None | ||
| else WebHookCallback(request.callback_url, payload) | ||
| ) | ||
|
|
||
| vector_store_crud = OpenAIVectorStoreCrud(client) | ||
| try: | ||
| vector_store = vector_store_crud.create() | ||
| except OpenAIError as err: | ||
| callback.fail(str(err)) | ||
| callback.fail(f"Vector store creation failed: {err}") | ||
| return | ||
|
|
||
| storage = AmazonCloudStorage(current_user) | ||
| document_crud = DocumentCrud(session, current_user.id) | ||
| assistant_crud = OpenAIAssistantCrud(client) | ||
|
|
||
| docs = request(document_crud) | ||
| kwargs = dict(request.extract_super_type(AssistantOptions)) | ||
| try: | ||
| updates = vector_store_crud.update(vector_store.id, storage, docs) | ||
| documents = list(updates) | ||
| assistant = assistant_crud.create(vector_store.id, **kwargs) | ||
| except Exception as err: # blanket to handle SQL and OpenAI errors | ||
| logging.error(f"File Search setup error: {err} ({type(err).__name__})") | ||
| docs = list(request(document_crud)) | ||
| flat_docs = [doc for sublist in docs for doc in sublist] | ||
| except Exception as err: | ||
| logging.error(f"[Document Fetch Error] {err}") | ||
| callback.fail(f"Document fetch failed: {err}") | ||
| return | ||
|
|
||
| # Step 3: Collect file metadata | ||
| file_exts = {doc.fname.split(".")[-1] for doc in flat_docs if "." in doc.fname} | ||
| file_sizes_kb = [ | ||
| storage.get_file_size_kb(doc.object_store_url) for doc in flat_docs | ||
| ] | ||
|
|
||
| try: | ||
| vector_store_crud.update(vector_store.id, storage, iter(docs)) | ||
| assistant = assistant_crud.create( | ||
| vector_store.id, **dict(request.extract_super_type(AssistantOptions)) | ||
| ) | ||
|
||
| except Exception as err: | ||
| logging.error(f"[Assistant/Vector Update Error] {err}") | ||
| vector_store_crud.delete(vector_store.id) | ||
| callback.fail(str(err)) | ||
| return | ||
|
|
||
| # | ||
| # Store the results | ||
| # | ||
|
|
||
| collection_crud = CollectionCrud(session, current_user.id) | ||
| collection = Collection( | ||
| id=UUID(payload.key), | ||
| llm_service_id=assistant.id, | ||
| llm_service_name=request.model, | ||
| ) | ||
| try: | ||
| collection_crud.create(collection, documents) | ||
| collection = collection_crud.read_one(UUID(payload.key)) | ||
| collection.llm_service_id = assistant.id | ||
| collection.llm_service_name = request.model | ||
| collection.status = "Successful" | ||
| collection.updated_at = now() | ||
|
|
||
| if flat_docs: | ||
| logging.info( | ||
| f"[DocumentCollection] Linking {len(flat_docs)} documents to collection {collection.id}" | ||
| ) | ||
| DocumentCollectionCrud(session).create(collection, flat_docs) | ||
|
|
||
| collection_crud._update(collection) | ||
| except SQLAlchemyError as err: | ||
| _backout(assistant_crud, assistant.id) | ||
| logging.error(f"[Collection Save Error] {err}") | ||
| callback.fail(str(err)) | ||
| return | ||
|
|
||
| # | ||
| # Send back successful response | ||
| # | ||
| elapsed = time.time() - start_time | ||
| logging.info( | ||
| f"Collection created: {collection.id} | Time: {elapsed:.2f}s | " | ||
| f"Files: {len(flat_docs)} | Sizes: {file_sizes_kb} KB | Types: {list(file_exts)}" | ||
| ) | ||
|
|
||
| callback.success(collection.model_dump(mode="json")) | ||
|
|
||
|
|
@@ -236,14 +254,26 @@ | |
| ) | ||
| def create_collection( | ||
| session: SessionDep, | ||
| current_user: CurrentUser, | ||
| current_user: CurrentUserOrgproject, | ||
| request: CreationRequest, | ||
| background_tasks: BackgroundTasks, | ||
| ): | ||
| this = inspect.currentframe() | ||
| route = router.url_path_for(this.f_code.co_name) | ||
| payload = ResponsePayload("processing", route) | ||
|
|
||
| collection = Collection( | ||
| id=UUID(payload.key), | ||
| owner_id=current_user.id, | ||
| organization_id=current_user.organization_id, | ||
| project_id=current_user.project_id, | ||
| status="processing", | ||
| ) | ||
|
|
||
| collection_crud = CollectionCrud(session, current_user.id) | ||
| collection_crud.create(collection) | ||
|
|
||
| # 2. Launch background task | ||
| background_tasks.add_task( | ||
| do_create_collection, | ||
| session, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,27 +1,46 @@ | ||
| from uuid import UUID, uuid4 | ||
| from datetime import datetime | ||
| from typing import Optional | ||
|
|
||
| from sqlmodel import Field, Relationship, SQLModel | ||
|
|
||
| from app.core.util import now | ||
| from .user import User | ||
| from .organization import Organization | ||
| from .project import Project | ||
| from enum import Enum | ||
|
|
||
|
|
||
class Collection(SQLModel, table=True):
    """Table model for a collection and its ownership and LLM-service metadata.

    Each row belongs to a user and an organization, optionally to a project,
    and records the LLM service backing it together with a free-form status
    string and creation/update/deletion timestamps.
    """

    id: UUID = Field(default_factory=uuid4, primary_key=True)

    # Owning user; ON DELETE CASCADE.
    owner_id: int = Field(
        foreign_key="user.id",
        nullable=False,
        ondelete="CASCADE",
    )

    # Owning organization; ON DELETE CASCADE.
    organization_id: int = Field(
        foreign_key="organization.id",
        nullable=False,
        ondelete="CASCADE",
    )

    # Optional project. The column is nullable and is set to NULL when the
    # project is deleted, so the annotation and default must allow None.
    project_id: Optional[int] = Field(
        default=None,
        foreign_key="project.id",
        nullable=True,
        ondelete="SET NULL",
    )

    # LLM service identifiers; nullable, default None.
    llm_service_id: Optional[str] = Field(default=None, nullable=True)
    llm_service_name: Optional[str] = Field(default=None, nullable=True)

    # Free-form processing state (e.g. "processing", "Successful").
    status: Optional[str] = None

    created_at: datetime = Field(default_factory=now)
    updated_at: datetime = Field(default_factory=now)
    deleted_at: Optional[datetime] = None

    owner: User = Relationship(back_populates="collections")
    organization: Organization = Relationship(back_populates="collections")
    project: Project = Relationship(back_populates="collections")
Uh oh!
There was an error while loading. Please reload this page.