Implement custom storage for orgs #2093

Open · wants to merge 69 commits into main

Commits (69)
6253106
Add back custom storage endpoints
tw4l Sep 20, 2024
4b88b32
Flush out tests for setting custom storage
tw4l Sep 20, 2024
0a90002
Fix test issue with bucket not existing for now
tw4l Sep 20, 2024
5b2a31c
Add additional tests
tw4l Sep 23, 2024
ac11a24
Fix custom storage so it works as expected
tw4l Sep 24, 2024
7b2ac32
Actually unset custom replica storage before deleting
tw4l Sep 24, 2024
26d5cc4
Add TODO where custom storage deletion is failing
tw4l Sep 24, 2024
3c2dbf8
Fix check for whether storage label is in use
tw4l Sep 24, 2024
6d75066
Remove todo on endpoint that's fine
tw4l Sep 24, 2024
8eb4d99
Add todos re: tasks necessary to change storage
tw4l Sep 24, 2024
89f1064
Check that no crawls are running before updating storage
tw4l Sep 25, 2024
f9c81e5
Start adding post-storage update logic
tw4l Sep 25, 2024
371cf27
WIP: Add background job to copy old s3 bucket to new
tw4l Sep 25, 2024
b8609cd
WIP: Start adding logic to handle replica location updates
tw4l Sep 25, 2024
9b6c3ae
Add additional note
tw4l Sep 25, 2024
587952c
Fix argument
tw4l Sep 25, 2024
ede158e
Fix another argument
tw4l Sep 25, 2024
5627130
Fixups
tw4l Sep 25, 2024
7eb0f95
Fix linting
tw4l Sep 25, 2024
dc87351
More linting fixes
tw4l Sep 25, 2024
f1045f6
Refactor, seperate storage and replicas updates
tw4l Sep 26, 2024
2a746b0
More refactoring
tw4l Sep 26, 2024
03060dd
Make post-update task methods private
tw4l Sep 26, 2024
dd58cdb
Check if any bg jobs running before changing storage
tw4l Sep 26, 2024
8ef2b2d
Check bg job finished as well
tw4l Sep 26, 2024
8c12e26
Fixups
tw4l Sep 26, 2024
92d08b2
Storage update improvements
tw4l Sep 26, 2024
824a439
Fixup
tw4l Sep 26, 2024
ce01439
Remove TODO
tw4l Sep 26, 2024
3949a86
Remove another todo
tw4l Sep 26, 2024
9e0c8df
More fixups
tw4l Sep 26, 2024
e8fa3bb
Add provider to s3storage for rclone
tw4l Sep 26, 2024
ba9007f
Fix typo
tw4l Sep 26, 2024
4ba7e56
Make API endpoints that change storage superuser-only for now
tw4l Sep 30, 2024
6e9a0c6
Add typing for init_storages_api, import Callable
tw4l Sep 30, 2024
85d34d1
Add missing User import
tw4l Sep 30, 2024
7ea417a
Fix StorageOps in operator main
tw4l Oct 1, 2024
d41d5c8
Always use oid prefix in s3 storage
tw4l Oct 1, 2024
87ee14a
Post-rebase fixups and remove create bucket fallback
tw4l Oct 10, 2024
cf8a0d5
Create extra test buckets in CI
tw4l Oct 15, 2024
0c85a63
Add test for non-verified custom storage
tw4l Oct 15, 2024
6f7b09a
Refactor to move updates to FastAPI background tasks
tw4l Oct 15, 2024
336fac8
Include default replicas in /storage response if no org replicas
tw4l Oct 15, 2024
9879119
Fix unsetting of presigned URLs
tw4l Oct 16, 2024
f2fcba6
Add --progress flag to rclone copy command
tw4l Oct 16, 2024
6351ebd
Increase ttl seconds after finished for testing on dev
tw4l Oct 17, 2024
e17a750
Ensure there are no double slashes between bucket name and oid
tw4l Oct 17, 2024
e3b53e6
Increase memory limit/request for copy job to 500Mi
tw4l Oct 17, 2024
a3a905a
Reduce copy job ttlSecondsAfterFinished to 60
tw4l Oct 17, 2024
fa9315f
Add storage tag to API endpoints
tw4l Oct 17, 2024
706c31f
Add flags to rclone to reduce memory usage, set limit to 350Mi
tw4l Oct 17, 2024
aab19f9
Fix positional operator in storage ref update
tw4l Oct 17, 2024
dcfb1b7
One more positional operator fix
tw4l Oct 17, 2024
e9c2aa5
Update docstrings and comments
tw4l Oct 17, 2024
35827e6
Make all-storages response valid JSON with response model
tw4l Oct 17, 2024
3597d01
Add admin docs for storage
tw4l Oct 17, 2024
e361526
Fix API endpoint path in docs example
tw4l Oct 17, 2024
29fbe6f
Docs typo fix
tw4l Oct 17, 2024
adf5fc3
Add provider field note
tw4l Oct 17, 2024
6e0b547
Docs language cleanup
tw4l Oct 17, 2024
b4b1443
Check /all-storages in backend tests
tw4l Oct 17, 2024
63ca2b9
Add API endpoint for background job progress
tw4l Oct 18, 2024
de52804
Fix linting
tw4l Oct 18, 2024
0e2debb
Format post-rebase with Black
tw4l Dec 3, 2024
731526a
Format with Black
tw4l Jan 24, 2025
92ecc3f
Fix linting
tw4l Jan 24, 2025
ff37866
Fix linking
tw4l Feb 24, 2025
bb2291d
Cast job to CopyBucketJob
tw4l Feb 24, 2025
9c3b053
Fix type errors from rebase
tw4l Feb 24, 2025

Files changed

5 changes: 5 additions & 0 deletions .github/workflows/k3d-ci.yaml
@@ -116,6 +116,11 @@ jobs:
- name: Wait for all pods to be ready
run: kubectl wait --for=condition=ready pod --all --timeout=240s

- name: Create Extra Test Buckets
run: |
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-primary &&
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-replica

- name: Run Tests
timeout-minutes: 30
run: pytest -vv ./backend/test/test_*.py
5 changes: 5 additions & 0 deletions .github/workflows/microk8s-ci.yaml
@@ -66,6 +66,11 @@ jobs:
- name: Wait for all pods to be ready
run: sudo microk8s kubectl wait --for=condition=ready pod --all --timeout=240s

- name: Create Extra Test Buckets
run: |
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-primary &&
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-replica

- name: Run Tests
run: pytest -vv ./backend/test/test_*.py
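
The "Create Extra Test Buckets" steps above pre-create the custom primary and replica buckets by writing directories straight into MinIO's data path, which MinIO's filesystem backend exposes as buckets. A hedged way to double-check them from the test host (endpoint, credentials, and port-forward are placeholders, not values from this PR):

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",  # assumes local-minio is port-forwarded
    aws_access_key_id="<access-key>",      # placeholder credentials
    aws_secret_access_key="<secret-key>",
)
for bucket in ("custom-primary", "custom-replica"):
    s3.head_bucket(Bucket=bucket)  # raises ClientError if the bucket is missing
print("both extra test buckets exist")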

181 changes: 169 additions & 12 deletions backend/btrixcloud/background_jobs.py
@@ -16,20 +16,22 @@
from .models import (
BaseFile,
Organization,
-BackgroundJob,
BgJobType,
CreateReplicaJob,
DeleteReplicaJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
ReAddOrgPagesJob,
OptimizePagesJob,
CopyBucketJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
User,
SuccessResponse,
SuccessResponseId,
JobProgress,
+BackgroundJob,
)
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .utils import dt_now
@@ -43,7 +45,7 @@


# ============================================================================
-# pylint: disable=too-many-instance-attributes
+# pylint: disable=too-many-instance-attributes, too-many-lines, too-many-return-statements, too-many-public-methods
class BackgroundJobOps:
"""k8s background job management"""

@@ -56,7 +58,7 @@ class BackgroundJobOps:

migration_jobs_scale: int

-# pylint: disable=too-many-locals, too-many-arguments, invalid-name
+# pylint: disable=too-many-locals, too-many-arguments, too-many-positional-arguments, invalid-name

def __init__(self, mdb, email, user_manager, org_ops, crawl_manager, storage_ops):
self.jobs = mdb["jobs"]
@@ -302,7 +304,7 @@ async def create_delete_org_job(
self,
org: Organization,
existing_job_id: Optional[str] = None,
-) -> Optional[str]:
+) -> str:
"""Create background job to delete org and its data"""

try:
@@ -339,7 +341,7 @@ async def create_delete_org_job(
except Exception as exc:
# pylint: disable=raise-missing-from
print(f"warning: delete org job could not be started: {exc}")
-return None
+return ""

async def create_recalculate_org_stats_job(
self,
@@ -473,6 +475,73 @@ async def create_optimize_crawl_pages_job(
print(f"warning: optimize pages job could not be started: {exc}")
return None

async def create_copy_bucket_job(
self,
org: Organization,
prev_storage_ref: StorageRef,
new_storage_ref: StorageRef,
existing_job_id: Optional[str] = None,
) -> str:
"""Start background job to copy entire s3 bucket and return job id"""
prev_storage = self.storage_ops.get_org_storage_by_ref(org, prev_storage_ref)
prev_endpoint, prev_bucket = self.strip_bucket(prev_storage.endpoint_url)

new_storage = self.storage_ops.get_org_storage_by_ref(org, new_storage_ref)
new_endpoint, new_bucket = self.strip_bucket(new_storage.endpoint_url)

# Ensure buckets terminate with trailing slash
prev_bucket = os.path.join(prev_bucket, "")
new_bucket = os.path.join(new_bucket, "")

job_type = BgJobType.COPY_BUCKET.value

try:
job_id = await self.crawl_manager.run_copy_bucket_job(
oid=str(org.id),
job_type=job_type,
prev_storage=prev_storage_ref,
prev_endpoint=prev_endpoint,
prev_bucket=prev_bucket,
new_storage=new_storage_ref,
new_endpoint=new_endpoint,
new_bucket=new_bucket,
job_id_prefix=f"{job_type}-{org.id}",
existing_job_id=existing_job_id,
)
if existing_job_id:
copy_job = await self.get_background_job(existing_job_id, org.id)
previous_attempt = {
"started": copy_job.started,
"finished": copy_job.finished,
}
if copy_job.previousAttempts:
copy_job.previousAttempts.append(previous_attempt)
else:
copy_job.previousAttempts = [previous_attempt]
copy_job.started = dt_now()
copy_job.finished = None
copy_job.success = None
else:
copy_job = CopyBucketJob(
id=job_id,
oid=org.id,
started=dt_now(),
prev_storage=prev_storage_ref,
new_storage=new_storage_ref,
)

await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": copy_job.to_dict()}, upsert=True
)

return job_id
# pylint: disable=broad-exception-caught
except Exception as exc:
print(
f"warning: copy bucket job could not be started for org {org.id}: {exc}"
)
return ""

async def job_finished(
self,
job_id: str,
@@ -498,6 +567,9 @@ async def job_finished(
await self.handle_delete_replica_job_finished(
cast(DeleteReplicaJob, job)
)
if job_type == BgJobType.COPY_BUCKET and job.oid:
org = await self.org_ops.get_org_by_id(job.oid)
await self.org_ops.update_read_only(org, False)
else:
print(
f"Background job {job.id} failed, sending email to superuser",
@@ -528,6 +600,9 @@ async def get_background_job(
DeleteReplicaJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
CopyBucketJob,
ReAddOrgPagesJob,
OptimizePagesJob,
]:
@@ -544,33 +619,84 @@

def _get_job_by_type_from_data(self, data: dict[str, object]):
"""convert dict to propert background job type"""
if data["type"] == BgJobType.CREATE_REPLICA:
if data["type"] == BgJobType.CREATE_REPLICA.value:
return CreateReplicaJob.from_dict(data)

if data["type"] == BgJobType.DELETE_REPLICA:
if data["type"] == BgJobType.DELETE_REPLICA.value:
return DeleteReplicaJob.from_dict(data)

if data["type"] == BgJobType.RECALCULATE_ORG_STATS:
if data["type"] == BgJobType.RECALCULATE_ORG_STATS.value:
return RecalculateOrgStatsJob.from_dict(data)

if data["type"] == BgJobType.READD_ORG_PAGES:
if data["type"] == BgJobType.READD_ORG_PAGES.value:
return ReAddOrgPagesJob.from_dict(data)

if data["type"] == BgJobType.OPTIMIZE_PAGES:
if data["type"] == BgJobType.OPTIMIZE_PAGES.value:
return OptimizePagesJob.from_dict(data)

if data["type"] == BgJobType.COPY_BUCKET.value:
return CopyBucketJob.from_dict(data)

return DeleteOrgJob.from_dict(data)
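
The switch above from comparing data["type"] against the enum member to comparing against its .value matters because the dict comes straight from Mongo, where the type field is a plain string. A minimal sketch of the pitfall, assuming BgJobType is a plain Enum (the member value here is invented for illustration):

from enum import Enum

class BgJobType(Enum):
    CREATE_REPLICA = "create-replica"  # hypothetical value

data = {"type": "create-replica"}  # as deserialized from Mongo
print(data["type"] == BgJobType.CREATE_REPLICA)        # False: str vs Enum member
print(data["type"] == BgJobType.CREATE_REPLICA.value)  # True: str vs str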

async def get_job_progress(self, job_id: str) -> JobProgress:
"""Return progress of background job for supported types"""
job = await self.get_background_job(job_id)

if job.type != BgJobType.COPY_BUCKET:
raise HTTPException(status_code=403, detail="job_type_not_supported")

if job.success is False:
raise HTTPException(status_code=400, detail="job_failed")

if job.finished:
return JobProgress(percentage=1.0)

log_tail = await self.crawl_manager.tail_background_job(job_id)
if not log_tail:
raise HTTPException(status_code=400, detail="job_log_not_available")

lines = log_tail.splitlines()
reversed_lines = list(reversed(lines))

progress = JobProgress(percentage=0.0)

# Parse lines in reverse order until we find one with latest stats
for line in reversed_lines:
try:
if "ETA" not in line:
continue

stats_groups = line.split(",")
for group in stats_groups:
group = group.strip()
if "%" in group:
progress.percentage = float(group.strip("%")) / 100
if "ETA" in group:
eta_str = group.strip("ETA ")
# Split on whitespace to remove the byte mark rclone sometimes
# adds to the end of the stats line
eta_list = eta_str.split(" ")
progress.eta = eta_list[0]

break
# pylint: disable=bare-except
except:
continue

return progress
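
For reference, the loop above scans rclone's periodic stats output for the newest line carrying a percentage and an ETA. A self-contained sketch against one plausible stats line (the format is assumed from rclone copy --progress output, not taken from this PR):

line = "Transferred:   512.000 MiB / 1.024 GiB, 50%, 10.667 MiB/s, ETA 48s"

percentage, eta = 0.0, None
for group in line.split(","):
    group = group.strip()
    if "%" in group:
        percentage = float(group.strip("%")) / 100  # "50%" -> 0.5
    if "ETA" in group:
        # keep only the first whitespace-separated token, mirroring the
        # byte-mark workaround in the code above
        eta = group.strip("ETA ").split(" ")[0]  # "ETA 48s" -> "48s"

print(percentage, eta)  # 0.5 48s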

async def list_background_jobs(
self,
org: Optional[Organization] = None,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
running: Optional[bool] = None,
job_type: Optional[str] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[int] = -1,
-) -> Tuple[List[BackgroundJob], int]:
+) -> Tuple[List[Union[CreateReplicaJob, DeleteReplicaJob, CopyBucketJob]], int]:
"""List all background jobs"""
# pylint: disable=duplicate-code
# Zero-index page for query
@@ -585,6 +711,12 @@ async def list_background_jobs(
if success in (True, False):
query["success"] = success

if running:
query["success"] = None

if running is False:
query["success"] = {"$in": [True, False]}

if job_type:
query["type"] = job_type

@@ -676,6 +808,7 @@ async def retry_org_background_job(
self, job: BackgroundJob, org: Organization
) -> Dict[str, Union[bool, Optional[str]]]:
"""Retry background job specific to one org"""
# pylint: disable=too-many-return-statements
if job.type == BgJobType.CREATE_REPLICA:
job = cast(CreateReplicaJob, job)
file = await self.get_replica_job_file(job, org)
@@ -736,6 +869,16 @@ async def retry_org_background_job(
)
return {"success": True}

if job.type == BgJobType.COPY_BUCKET:
job = cast(CopyBucketJob, job)
await self.create_copy_bucket_job(
org,
job.prev_storage,
job.new_storage,
existing_job_id=job.id,
)
return {"success": True}

return {"success": False}

async def retry_failed_org_background_jobs(
Expand Down Expand Up @@ -773,7 +916,7 @@ async def retry_all_failed_background_jobs(


# ============================================================================
-# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
+# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme, too-many-positional-arguments
def init_background_jobs_api(
app, mdb, email, user_manager, org_ops, crawl_manager, storage_ops, user_dep
):
@@ -800,6 +943,18 @@ async def get_org_background_job(
"""Retrieve information for background job"""
return await ops.get_background_job(job_id, org.id)

@router.get(
"/{job_id}/progress",
response_model=JobProgress,
)
async def get_job_progress(
job_id: str,
# pylint: disable=unused-argument
org: Organization = Depends(org_crawl_dep),
):
"""Return progress information for background job"""
return await ops.get_job_progress(job_id)
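
A hedged example of polling the new progress endpoint. The route shape is inferred from the org-scoped jobs router above; the host, org id, job id, and token are placeholders:

import requests

resp = requests.get(
    "https://app.example.com/api/orgs/<oid>/jobs/<job_id>/progress",
    headers={"Authorization": "Bearer <token>"},
)
resp.raise_for_status()
print(resp.json())  # e.g. {"percentage": 0.42, "eta": "3m10s"}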

@app.get("/orgs/all/jobs/{job_id}", response_model=AnyJob, tags=["jobs"])
async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
"""Get background job from any org"""
@@ -894,6 +1049,7 @@ async def list_background_jobs(
pageSize: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
running: Optional[bool] = None,
jobType: Optional[str] = None,
sortBy: Optional[str] = None,
sortDirection: Optional[int] = -1,
Expand All @@ -904,6 +1060,7 @@ async def list_background_jobs(
page_size=pageSize,
page=page,
success=success,
running=running,
job_type=jobType,
sort_by=sortBy,
sort_direction=sortDirection,
2 changes: 1 addition & 1 deletion backend/btrixcloud/basecrawls.py
@@ -50,7 +50,7 @@


# ============================================================================
-# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines, too-many-branches
+# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines, too-many-branches, too-many-positional-arguments
class BaseCrawlOps:
"""operations that apply to all crawls"""

1 change: 1 addition & 0 deletions backend/btrixcloud/colls.py
@@ -65,6 +65,7 @@


# ============================================================================
# pylint: disable=too-many-positional-arguments
class CollectionOps:
"""ops for working with named collections of crawls"""

4 changes: 2 additions & 2 deletions backend/btrixcloud/crawlconfigs.py
@@ -73,7 +73,7 @@
class CrawlConfigOps:
"""Crawl Config Operations"""

-# pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods
+# pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods, too-many-positional-arguments

user_manager: UserManager
org_ops: OrgOps
@@ -1105,7 +1105,7 @@ async def stats_recompute_all(crawl_configs, crawls, cid: UUID):


# ============================================================================
-# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments
+# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments,too-many-positional-arguments
def init_crawl_config_api(
app,
dbclient,