diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py index 1e148458de..09506ab23b 100644 --- a/backend/btrixcloud/orgs.py +++ b/backend/btrixcloud/orgs.py @@ -8,114 +8,112 @@ import math import os import time - -from uuid import UUID, uuid4 +from calendar import c from tempfile import NamedTemporaryFile - from typing import ( - Awaitable, - Optional, TYPE_CHECKING, - Dict, + Any, + AsyncGenerator, + Awaitable, Callable, + Dict, List, Literal, - AsyncGenerator, - Any, + Optional, ) +from uuid import UUID, uuid4 +import json_stream +from aiostream import stream +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import StreamingResponse from motor.motor_asyncio import AsyncIOMotorDatabase from pydantic import ValidationError from pymongo import ReturnDocument from pymongo.collation import Collation from pymongo.errors import AutoReconnect, DuplicateKeyError -from fastapi import APIRouter, Depends, HTTPException, Request -from fastapi.responses import StreamingResponse -import json_stream -from aiostream import stream - from .models import ( - SUCCESSFUL_STATES, + ACTIVE, + MAX_BROWSER_WINDOWS, + MAX_CRAWL_SCALE, + PAUSED_PAYMENT_FAILED, + REASON_PAUSED, RUNNING_STATES, + SUCCESSFUL_STATES, WAITING_STATES, + AddedResponse, + AddedResponseId, + AddToOrgRequest, BaseCrawl, + Collection, + ConfigRevision, + Crawl, + CrawlConfig, + CrawlConfigDefaults, + DeleteCrawlList, + DeletedResponseId, + InvitePending, + InviteToOrgRequest, + OrgAcceptInviteResponse, Organization, - PlansResponse, - StorageRef, + OrgCreate, + OrgDeleteInviteResponse, + OrgImportResponse, + OrgInviteResponse, + OrgMetrics, + OrgOut, + OrgOutExport, + OrgProxies, + OrgPublicProfileUpdate, OrgQuotas, OrgQuotasIn, OrgQuotaUpdate, - OrgReadOnlyUpdate, OrgReadOnlyOnCancel, - OrgMetrics, + OrgReadOnlyUpdate, + OrgSlugsResponse, OrgWebhookUrls, - OrgCreate, - OrgProxies, - Subscription, - SubscriptionUpdate, - SubscriptionCancel, - RenameOrg, - UpdateRole, - RemovePendingInvite, - RemoveFromOrg, - AddToOrgRequest, - InvitePending, - InviteToOrgRequest, - UserRole, - User, + PageWithAllQA, PaginatedInvitePendingResponse, PaginatedOrgOutResponse, - CrawlConfig, - Crawl, - CrawlConfigDefaults, - UploadedCrawl, - ConfigRevision, + PlansResponse, Profile, - Collection, - OrgOut, - OrgOutExport, - PageWithAllQA, - DeleteCrawlList, - PAUSED_PAYMENT_FAILED, - REASON_PAUSED, - ACTIVE, - DeletedResponseId, - UpdatedResponse, - AddedResponse, - AddedResponseId, - SuccessResponseId, - OrgInviteResponse, - OrgAcceptInviteResponse, - OrgDeleteInviteResponse, RemovedResponse, - OrgSlugsResponse, - OrgImportResponse, - OrgPublicProfileUpdate, - MAX_BROWSER_WINDOWS, - MAX_CRAWL_SCALE, + RemoveFromOrg, + RemovePendingInvite, + RenameOrg, + StorageRef, + Subscription, + SubscriptionCancel, + SubscriptionUpdate, + SuccessResponseId, + UpdatedResponse, + UpdateRole, + UploadedCrawl, + User, + UserRole, ) from .pagination import DEFAULT_PAGE_SIZE, paginated_format from .utils import ( + JSONSerializer, + browser_windows_from_scale, dt_now, - slug_from_name, - validate_slug, get_duplicate_key_error_field, + slug_from_name, validate_language_code, - JSONSerializer, - browser_windows_from_scale, + validate_slug, ) if TYPE_CHECKING: - from .invites import InviteOps + from .background_jobs import BackgroundJobOps from .basecrawls import BaseCrawlOps from .colls import CollectionOps + from .crawlmanager import CrawlManager + from .file_uploads import FileUploadOps + from .invites import InviteOps + from .pages import PageOps from .profiles import ProfileOps from .users import UserManager - from .background_jobs import BackgroundJobOps - from .pages import PageOps - from .file_uploads import FileUploadOps - from .crawlmanager import CrawlManager else: InviteOps = BaseCrawlOps = ProfileOps = CollectionOps = object BackgroundJobOps = UserManager = PageOps = FileUploadOps = CrawlManager = object @@ -628,74 +626,131 @@ async def update_quotas( quotas.context = None - previous_extra_mins = ( - org.quotas.extraExecMinutes - if (org.quotas and org.quotas.extraExecMinutes) - else 0 - ) - previous_gifted_mins = ( - org.quotas.giftedExecMinutes - if (org.quotas and org.quotas.giftedExecMinutes) - else 0 - ) - - if mode == "add": - increment_update: dict[str, Any] = { - "$inc": {}, + update: list[dict[str, Any]] = [ + { + "$set": { + "quotaUpdates": { + "$concatArrays": [ + "$quotaUpdates", + [{"modified": dt_now(), "update": {}}], + ] + }, + } } + ] - for field, value in quotas.model_dump( - exclude_unset=True, exclude_defaults=True, exclude_none=True - ).items(): - if field == "context" or value is None: - continue - inc = max(value, -org.quotas.model_dump().get(field, 0)) - increment_update["$inc"][f"quotas.{field}"] = inc + computed_quotas = {} - updated_org = await self.orgs.find_one_and_update( - {"_id": org.id}, - increment_update, - projection={"quotas": True}, - return_document=ReturnDocument.AFTER, - ) - quotas = OrgQuotasIn(**updated_org["quotas"]) - - update: dict[str, dict[str, dict[str, Any] | int]] = { - "$push": { - "quotaUpdates": OrgQuotaUpdate( - modified=dt_now(), - update=OrgQuotas( - **quotas.model_dump( - exclude_unset=True, exclude_defaults=True, exclude_none=True - ) - ), - context=context, - ).model_dump() - }, - "$inc": {}, - "$set": {}, - } - - if mode == "set": + if mode == "add": + update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0][ + "context" + ] = context + for field, value in quotas.model_dump().items(): + if field == "context": + continue + if value is None: + # set value of field in pushed update to current value in `quotas` + update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0]["update"][ + field + ] = f"$quotas.{field}" + computed_quotas[field] = f"$quotas.{field}" + continue + new_value = { + "$add": [ + { + "$cond": { + "if": { + "$gt": [ + {"$multiply": [f"$quotas.{field}", -1]}, + value, + ] + }, + "then": {"$multiply": [f"$quotas.{field}", -1]}, + "else": value, + } + }, + f"$quotas.{field}", + ] + } + # set value of field in pushed update to current value in quotas + increment + update[0]["$set"][f"quotas.{field}"] = new_value + update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0]["update"][ + field + ] = new_value + computed_quotas[field] = new_value + + elif mode == "set": increment_update = quotas.model_dump( exclude_unset=True, exclude_defaults=True, exclude_none=True ) - update["$set"]["quotas"] = increment_update + for field, value in increment_update.items(): + update[0]["$set"][f"quotas.{field}"] = value + computed_quotas[field] = value + update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0] = OrgQuotaUpdate( + modified=dt_now(), + update=OrgQuotas( + **quotas.model_dump( + exclude_unset=True, exclude_defaults=True, exclude_none=True + ) + ), + context=context, + ).model_dump() # Inc org available fields for extra/gifted execution time as needed - if quotas.extraExecMinutes is not None: - extra_secs_diff = (quotas.extraExecMinutes - previous_extra_mins) * 60 - if org.extraExecSecondsAvailable + extra_secs_diff <= 0: - update["$set"]["extraExecSecondsAvailable"] = 0 - else: - update["$inc"]["extraExecSecondsAvailable"] = extra_secs_diff + for extra_or_gifted in ["extra", "gifted"]: + previous_mins = { + "$cond": { + "if": { + "$or": [ + {"$ne": [f"$quotas.{extra_or_gifted}ExecMinutes", 0]}, + {"$ne": [f"$quotas.{extra_or_gifted}ExecMinutes", None]}, + ] + }, + "then": f"$quotas.{extra_or_gifted}ExecMinutes", + "else": 0, + } + } - if quotas.giftedExecMinutes is not None: - gifted_secs_diff = (quotas.giftedExecMinutes - previous_gifted_mins) * 60 - if org.giftedExecSecondsAvailable + gifted_secs_diff <= 0: - update["$set"]["giftedExecSecondsAvailable"] = 0 - else: - update["$inc"]["giftedExecSecondsAvailable"] = gifted_secs_diff + secs_diff = { + "$multiply": [ + { + "$subtract": [ + computed_quotas[f"{extra_or_gifted}ExecMinutes"], + previous_mins, + ] + }, + 60, + ] + } + + update[0]["$set"][f"{extra_or_gifted}ExecSecondsAvailable"] = { + "$cond": { + "if": {"$ne": [f"${extra_or_gifted}ExecSecondsAvailable", None]}, + "then": { + "$cond": { + "if": { + "$lte": [ + { + "$add": [ + f"${extra_or_gifted}ExecSecondsAvailable", + secs_diff, + ] + }, + 0, + ] + }, + "then": 0, + "else": { + "$add": [ + f"${extra_or_gifted}ExecSecondsAvailable", + secs_diff, + ] + }, + } + }, + "else": f"${extra_or_gifted}ExecSecondsAvailable", + } + } await self.orgs.find_one_and_update({"_id": org.id}, update)