
Commit 0a7c9af

ikreymer authored and tw4l committed
refactor to add 'pendingSize' to crawl which unambiguously stores the pending, un-uploaded size (#3013)

- use pending size to determine if quota is reached
- request pause to be set before assuming the paused state
- ensure data is actually committed before shutting down pods (in case of any edge cases)
- clear the paused flag in redis after crawler pods shut down
- add OpCrawlStats to avoid adding an unnecessary profile_update to the public API

This assumes changes in the crawler to support: clearing size after WACZ upload, and ensuring upload happens if a pod starts while the crawl is paused.

Co-authored-by: Tessa Walsh <[email protected]>
1 parent 246470e commit 0a7c9af
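In short: instead of deriving un-uploaded bytes as "total size minus already-uploaded WACZ size", each crawl now carries a single pendingSize counter, and the storage-quota check charges an org for committed storage plus all pending bytes. A minimal sketch of that check (names such as bytes_stored, storage_quota, and pending_sizes are illustrative, not the backend's actual API):

def storage_quota_reached(
    bytes_stored: int, storage_quota: int, pending_sizes: list[int]
) -> bool:
    """Quota is hit once committed storage plus all pending,
    un-uploaded crawl data would reach the org's quota."""
    if not storage_quota:  # 0 / unset means unlimited
        return False
    return bytes_stored + sum(pending_sizes) >= storage_quota

# e.g. 9 GB committed, 10 GB quota, two active crawls holding
# 600 MB + 500 MB of not-yet-uploaded data -> quota reached, pause
assert storage_quota_reached(
    9_000_000_000, 10_000_000_000, [600_000_000, 500_000_000]
)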

File tree

4 files changed (+90, -95 lines)

backend/btrixcloud/crawls.py

Lines changed: 7 additions & 19 deletions

@@ -370,21 +370,7 @@ async def get_active_crawls_size(self, oid: UUID) -> int:
         cursor = self.crawls.aggregate(
             [
                 {"$match": {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}},
-                {"$group": {"_id": None, "totalSum": {"$sum": "$stats.size"}}},
-            ]
-        )
-        results = await cursor.to_list(length=1)
-        if not results:
-            return 0
-
-        return results[0].get("totalSum") or 0
-
-    async def get_active_crawls_uploaded_wacz_size(self, oid: UUID) -> int:
-        """get size of all waczs already uploaded for running/paused crawls"""
-        cursor = self.crawls.aggregate(
-            [
-                {"$match": {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}},
-                {"$group": {"_id": None, "totalSum": {"$sum": "$fileSize"}}},
+                {"$group": {"_id": None, "totalSum": {"$sum": "$pendingSize"}}},
             ]
         )
         results = await cursor.to_list(length=1)

@@ -669,14 +655,16 @@ async def update_crawl_state_if_allowed(
         return res is not None

     async def update_running_crawl_stats(
-        self, crawl_id: str, is_qa: bool, stats: CrawlStats
+        self, crawl_id: str, is_qa: bool, stats: CrawlStats, pending_size: int
     ) -> bool:
         """update running crawl stats"""
         prefix = "" if not is_qa else "qa."
         query = {"_id": crawl_id, "type": "crawl", f"{prefix}state": "running"}
-        res = await self.crawls.find_one_and_update(
-            query, {"$set": {f"{prefix}stats": stats.dict()}}
-        )
+        update: dict[str, dict | int] = {f"{prefix}stats": stats.dict()}
+        if not is_qa:
+            update["pendingSize"] = pending_size
+
+        res = await self.crawls.find_one_and_update(query, {"$set": update})
         return res is not None

     async def inc_crawl_exec_time(
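With get_active_crawls_uploaded_wacz_size gone, a single aggregation now answers "how many bytes are held by active crawls but not yet uploaded". A standalone sketch of the same pipeline, runnable with motor (the connection URL, database and collection names are placeholders, and the state list is abbreviated):

from uuid import UUID

from motor.motor_asyncio import AsyncIOMotorClient

# illustrative subset; the backend uses models.RUNNING_AND_WAITING_STATES
ACTIVE_STATES = ["starting", "waiting_capacity", "running", "uploading-wacz"]


async def sum_pending_size(mongo_url: str, oid: UUID) -> int:
    """Sum pendingSize (un-uploaded bytes) across an org's active crawls."""
    crawls = AsyncIOMotorClient(mongo_url)["btrix"]["crawls"]
    cursor = crawls.aggregate(
        [
            {"$match": {"state": {"$in": ACTIVE_STATES}, "oid": oid}},
            # one result document per org with the summed pending bytes
            {"$group": {"_id": None, "totalSum": {"$sum": "$pendingSize"}}},
        ]
    )
    results = await cursor.to_list(length=1)
    if not results:
        return 0
    return results[0].get("totalSum") or 0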

backend/btrixcloud/models.py

Lines changed: 3 additions & 2 deletions

@@ -304,8 +304,6 @@ class CrawlStats(BaseModel):
     done: int = 0
     size: int = 0

-    profile_update: Optional[str] = ""
-

 # ============================================================================

@@ -907,6 +905,7 @@ class CrawlOut(BaseMongoModel):

     fileSize: int = 0
     fileCount: int = 0
+    pendingSize: int = 0

     tags: Optional[List[str]] = []

@@ -1091,6 +1090,8 @@ class Crawl(BaseCrawl, CrawlConfigCore):
     qa: Optional[QARun] = None
     qaFinished: Optional[Dict[str, QARun]] = {}

+    pendingSize: int = 0
+

 # ============================================================================
 class CrawlCompleteIn(BaseModel):
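The removed profile_update moves out of the public CrawlStats and into the operator-only OpCrawlStats added in backend/btrixcloud/operator/models.py (the fourth changed file; its diff is not shown on this page). A guess at the rough shape, assuming OpCrawlStats simply extends the public model:

from typing import Optional

from pydantic import BaseModel


class CrawlStats(BaseModel):
    """public stats, as returned by the API; no internal fields"""

    found: int = 0
    done: int = 0
    size: int = 0


class OpCrawlStats(CrawlStats):
    """hypothetical sketch: operator-internal stats, keeping the
    profile_update signal out of the public API"""

    profile_update: Optional[str] = ""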

backend/btrixcloud/operator/crawls.py

Lines changed: 64 additions & 73 deletions

@@ -22,19 +22,16 @@
     TYPE_RUNNING_STATES,
     TYPE_ALL_CRAWL_STATES,
     TYPE_PAUSED_STATES,
-    AUTO_PAUSED_STATES,
     RUNNING_STATES,
     WAITING_STATES,
     RUNNING_AND_STARTING_ONLY,
     RUNNING_AND_WAITING_STATES,
     SUCCESSFUL_STATES,
     FAILED_STATES,
     PAUSED_STATES,
-    CrawlStats,
     CrawlFile,
     CrawlCompleteIn,
     StorageRef,
-    Organization,
 )

 from btrixcloud.utils import (

@@ -48,6 +45,7 @@
 from .models import (
     CrawlSpec,
     CrawlStatus,
+    OpCrawlStats,
     StopReason,
     MCBaseRequest,
     MCSyncData,
@@ -398,7 +396,13 @@ async def sync_crawls(self, data: MCSyncData):
         if status.pagesFound < status.desiredScale:
             status.desiredScale = max(1, status.pagesFound)

-        is_paused = bool(crawl.paused_at) and status.state in PAUSED_STATES
+        # pause and shut down pods only once size is <= 4096 (empty dir),
+        # paused_at is set, and state is a valid paused state
+        is_paused = (
+            bool(crawl.paused_at)
+            and status.sizePending <= 4096
+            and status.state in PAUSED_STATES
+        )

         for i in range(0, status.desiredScale):
             if status.pagesFound < i * num_browsers_per_pod:
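The gate reads as a small predicate: 4096 bytes is what an empty directory typically reports on ext4-like filesystems, so reaching it means the crawler has uploaded everything it had pending. A condensed sketch (the state values are the paused states used elsewhere in this diff):

from datetime import datetime
from typing import Optional

PAUSED_STATES = (
    "paused",
    "paused_storage_quota_reached",
    "paused_time_quota_reached",
    "paused_org_readonly",
)

EMPTY_DIR_SIZE = 4096  # size an empty dir reports on ext4-like filesystems


def should_shut_down_pods(
    paused_at: Optional[datetime], size_pending: int, state: str
) -> bool:
    # tear pods down only after the pause was requested, every pending
    # byte has been uploaded, and the state transition actually happened
    return (
        bool(paused_at)
        and size_pending <= EMPTY_DIR_SIZE
        and state in PAUSED_STATES
    )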
@@ -686,7 +690,7 @@ async def set_state(
         crawl: CrawlSpec,
         allowed_from: Sequence[TYPE_ALL_CRAWL_STATES],
         finished: Optional[datetime] = None,
-        stats: Optional[CrawlStats] = None,
+        stats: Optional[OpCrawlStats] = None,
     ):
         """set status state and update db, if changed
         if allowed_from passed in, can only transition from allowed_from state,

@@ -837,7 +841,7 @@ async def fail_crawl(
         crawl: CrawlSpec,
         status: CrawlStatus,
         pods: dict,
-        stats: CrawlStats,
+        stats: OpCrawlStats,
         redis: Redis,
     ) -> bool:
         """Mark crawl as failed, log crawl state and print crawl logs, if possible"""

@@ -980,6 +984,10 @@ async def sync_crawl_state(
         )

         if not crawler_running and redis:
+            # clear paused key now so can resume
+            if crawl.paused_at:
+                await redis.delete(f"{crawl.id}:paused")
+
             # if crawler is not running for REDIS_TTL seconds, also stop redis
             # but not right away in case crawler pod is just restarting.
             # avoids keeping redis pods around while no crawler pods are up
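Ordering matters here: the {crawl_id}:paused marker is removed only once no crawler pods remain, so pods still shutting down can observe it and finish committing their data first. A standalone sketch of that check using redis-py's asyncio client (the URL parameter and key naming follow conventions visible in this diff):

from redis import asyncio as aioredis


async def clear_paused_flag_after_shutdown(
    redis_url: str, crawl_id: str, crawler_running: bool, paused_at
) -> None:
    """Drop the paused marker only after every crawler pod has exited."""
    if crawler_running or not paused_at:
        return  # pods still up (or not paused): leave the flag for them to see
    redis = aioredis.from_url(redis_url, decode_responses=True)
    try:
        await redis.delete(f"{crawl_id}:paused")
    finally:
        await redis.aclose()  # redis-py >= 5; use close() on 4.x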
@@ -1006,12 +1014,12 @@ async def sync_crawl_state(
             status.lastActiveTime = date_to_str(dt_now())

         file_done = await redis.rpop(self.done_key)
+
         while file_done:
             msg = json.loads(file_done)
             # add completed file
             if msg.get("filename"):
                 await self.add_file_to_crawl(msg, crawl, redis)
-                await redis.incr("filesAdded")

             # get next file done
             file_done = await redis.rpop(self.done_key)
@@ -1381,7 +1389,7 @@ def get_log_line(self, message, details):
         }
         return json.dumps(err)

-    async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis):
+    async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis) -> int:
         """Handle finished CrawlFile to db"""

         filecomplete = CrawlCompleteIn(**cc_data)

@@ -1398,6 +1406,11 @@ async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis):
         )

         await redis.incr("filesAddedSize", filecomplete.size)
+        await redis.incr("filesAdded")
+
+        # sizes = await redis.hkeys(f"{crawl.id}:size")
+        # for size in sizes:
+        #     await redis.hmset(f"{crawl.id}:size", {size: 0 for size in sizes})

         await self.crawl_ops.add_crawl_file(
             crawl.db_crawl_id, crawl.is_qa, crawl_file, filecomplete.size

@@ -1407,7 +1420,7 @@ async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis):

         # no replicas for QA for now
         if crawl.is_qa:
-            return True
+            return filecomplete.size

         try:
             await self.background_job_ops.create_replica_jobs(

@@ -1417,7 +1430,7 @@ async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis):
         except Exception as exc:
             print("Replicate Exception", exc, flush=True)

-        return True
+        return filecomplete.size

     async def is_crawl_stopping(
         self, crawl: CrawlSpec, status: CrawlStatus
@@ -1446,8 +1459,7 @@ async def is_crawl_stopping(

         # pause crawl if org is set read-only
         if org.readOnly:
-            await self.pause_crawl(crawl, org)
-            return "paused_org_readonly"
+            return self.request_pause_crawl("paused_org_readonly", crawl)

         # pause crawl if storage quota is reached
         if org.quotas.storageQuota:

@@ -1457,44 +1469,35 @@ async def is_crawl_stopping(
             active_crawls_total_size = await self.crawl_ops.get_active_crawls_size(
                 crawl.oid
             )
-            print(f"Active crawls total size: {active_crawls_total_size}", flush=True)
-            already_uploaded_size = (
-                await self.crawl_ops.get_active_crawls_uploaded_wacz_size(crawl.oid)
-            )
-            print(
-                f"Active crawls already uploaded size: {already_uploaded_size}",
-                flush=True,
-            )
-            active_crawls_not_uploaded_size = (
-                active_crawls_total_size - already_uploaded_size
-            )
-            print(
-                f"Active crawls not yet uploaded size: {active_crawls_not_uploaded_size}",
-                flush=True,
-            )
-            if self.org_ops.storage_quota_reached(org, active_crawls_not_uploaded_size):
-                await self.pause_crawl(crawl, org)
-                return "paused_storage_quota_reached"
+
+            if self.org_ops.storage_quota_reached(org, active_crawls_total_size):
+                return self.request_pause_crawl("paused_storage_quota_reached", crawl)

         # pause crawl if execution time quota is reached
         if self.org_ops.exec_mins_quota_reached(org):
-            await self.pause_crawl(crawl, org)
-            return "paused_time_quota_reached"
+            return self.request_pause_crawl("paused_time_quota_reached", crawl)

         if crawl.paused_at and status.stopReason not in PAUSED_STATES:
             return "paused"

         return None

-    async def pause_crawl(self, crawl: CrawlSpec, org: Organization):
-        """Pause crawl and update crawl spec"""
-        paused_at = dt_now()
-        await self.crawl_ops.pause_crawl(crawl.id, org, pause=True, paused_at=paused_at)
-        crawl.paused_at = paused_at
+    def request_pause_crawl(
+        self, reason: StopReason, crawl: CrawlSpec
+    ) -> Optional[StopReason]:
+        """Request crawl to be paused asynchronously, equivalent of user clicking 'pause' button
+        if crawl is paused, then use the specified reason instead of default paused state
+        """
+        if crawl.paused_at:
+            return reason
+
+        print(f"request pause for {reason}")
+        self.run_task(self.crawl_ops.pause_crawl(crawl.id, crawl.org, pause=True))
+        return None

     async def get_redis_crawl_stats(
         self, redis: Redis, crawl_id: str
-    ) -> tuple[CrawlStats, dict[str, Any]]:
+    ) -> tuple[OpCrawlStats, dict[str, Any]]:
         """get page stats"""
         try:
             # crawler >0.9.0, done key is a value
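Where the old pause_crawl awaited the DB write and mutated the crawl spec inline, request_pause_crawl is two-phase: the first reconcile fires off the DB update and returns no stop reason, and a later reconcile, once paused_at is visible on the spec, returns the real reason. A standalone sketch of that pattern (a bare asyncio task stands in for the operator's run_task helper, and the DB call is stubbed):

import asyncio
from datetime import datetime, timezone
from typing import Optional


async def set_paused_in_db(crawl_id: str) -> None:
    """Stub for crawl_ops.pause_crawl(...); would set paused_at in Mongo."""


def request_pause(
    reason: str, crawl_id: str, paused_at: Optional[datetime]
) -> Optional[str]:
    if paused_at:
        # a later pass: pause already recorded, report why we paused
        return reason
    # first pass: fire-and-forget the DB update; the crawl keeps running
    # until paused_at shows up on a subsequent reconcile
    asyncio.get_running_loop().create_task(set_paused_in_db(crawl_id))
    return None


async def demo() -> None:
    assert request_pause("paused_time_quota_reached", "crawl1", None) is None
    await asyncio.sleep(0)  # let the scheduled task run
    now = datetime.now(timezone.utc)
    assert request_pause("paused_time_quota_reached", "crawl1", now) is not None


asyncio.run(demo())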
@@ -1514,7 +1517,7 @@ async def get_redis_crawl_stats(

         profile_update = await redis.get(f"{crawl_id}:profileUploaded")

-        stats = CrawlStats(
+        stats = OpCrawlStats(
             found=pages_found,
             done=pages_done,
             size=archive_size,
@@ -1534,45 +1537,36 @@ async def update_crawl_state(
         results = await redis.hgetall(f"{crawl.id}:status")
         stats, sizes = await self.get_redis_crawl_stats(redis, crawl.id)

-        print(f"crawl.paused_at: {crawl.paused_at}", flush=True)
-        print(f"crawl.stopping: {crawl.stopping}", flush=True)
-        print(f"status.stopReason: {status.stopReason}", flush=True)
+        pending_size = stats.size

-        print(f"stats.size initial: {stats.size}", flush=True)
+        stats.size += status.filesAddedSize
+
+        total_size = stats.size
+
+        print(f"pending size: {pending_size}", flush=True)
         print(f"status.filesAdded: {status.filesAdded}", flush=True)
         print(f"status.filesAddedSize: {status.filesAddedSize}", flush=True)
-
-        # need to add size of previously completed WACZ files as well!
-        # TODO: Fix this so that it works as expected with pausing
-        # - The if clause here is close to a solution except it still results
-        #   in pauses after the first showing a smaller-than-expected size
-        #   because it no longer counts files added previous to resuming the crawl.
-        # - Kind of seems like what we need here is either a way of still adding
-        #   files added prior to the current pause without double-adding files
-        #   that are currently being uploaded.
-        # - Another way to do that might be to have the crawler decrement the size
-        #   of a crawl by the amount of WACZs that are uploaded, so that this here
-        #   in the operator can stay simpler?
-        if status.stopReason not in PAUSED_STATES:
-            stats.size += status.filesAddedSize
-            print(f"stats.size after adding filesAddedSize: {stats.size}", flush=True)
-        else:
-            print(
-                "not adding filesAddedSize to stats.size, crawl is pausing", flush=True
-            )
+        print(f"total: {total_size}", flush=True)
+        print(
+            f"org quota: {crawl.org.bytesStored + stats.size} <= {crawl.org.quotas.storageQuota}",
+            flush=True,
+        )

         # update status
         status.pagesDone = stats.done
         status.pagesFound = stats.found
-        status.size = stats.size
+
+        status.sizePending = pending_size
+        status.size = total_size
         status.sizeHuman = humanize.naturalsize(status.size)

         await self.crawl_ops.update_running_crawl_stats(
-            crawl.db_crawl_id, crawl.is_qa, stats
+            crawl.db_crawl_id, crawl.is_qa, stats, pending_size
         )

         for key, value in sizes.items():
             increase_storage = False
+            pod_info = None
             value = int(value)
             if value > 0 and status.podStatus:
                 pod_info = status.podStatus[key]
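The accounting in this hunk reduces to one identity: stats.size as read from Redis is the crawler's not-yet-uploaded data (now persisted as pendingSize), and status.filesAddedSize is the sum of WACZs already uploaded, so their sum is the crawl's total size. A worked example with made-up numbers:

# one reconcile pass, mid-crawl
pending_size = 250_000_000        # stats.size from Redis: on disk, not yet uploaded
files_added_size = 1_750_000_000  # WACZs uploaded so far (status.filesAddedSize)

total_size = pending_size + files_added_size  # 2_000_000_000 -> status.size

# after the crawler uploads a 250 MB WACZ and (per this PR's assumption)
# clears its size counter:
files_added_size += 250_000_000  # counted exactly once, via filesAddedSize
pending_size = 0                 # Redis counter reset by the crawler
# total remains 2_000_000_000: no double-counting across the upload boundary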
@@ -1588,11 +1582,11 @@ async def update_crawl_state(
                 increase_storage = True

             # out of storage
-            if pod_info.isNewExit and pod_info.exitCode == 3:
+            if pod_info and pod_info.isNewExit and pod_info.exitCode == 3:
                 pod_info.used.storage = pod_info.allocated.storage
                 increase_storage = True

-            if increase_storage:
+            if pod_info and increase_storage:
                 new_storage = math.ceil(
                     pod_info.used.storage * self.min_avail_storage_ratio / 1_000_000_000
                 )
@@ -1655,18 +1649,15 @@ async def update_crawl_state(
             else:
                 paused_state = "paused"

-            await redis.delete(f"{crawl.id}:paused")
+            # await redis.delete(f"{crawl.id}:paused")
             await self.set_state(
                 paused_state,
                 status,
                 crawl,
                 allowed_from=RUNNING_AND_WAITING_STATES,
             )

-            if (
-                paused_state in AUTO_PAUSED_STATES
-                and not status.autoPausedEmailsSent
-            ):
+            if paused_state != "paused" and not status.autoPausedEmailsSent:
                 await self.crawl_ops.notify_org_admins_of_auto_paused_crawl(
                     paused_reason=paused_state,
                     cid=crawl.cid,
@@ -1731,7 +1722,7 @@ async def mark_finished(
         crawl: CrawlSpec,
         status: CrawlStatus,
         state: TYPE_NON_RUNNING_STATES,
-        stats: Optional[CrawlStats] = None,
+        stats: Optional[OpCrawlStats] = None,
     ) -> bool:
         """mark crawl as finished, set finished timestamp and final state"""

@@ -1775,7 +1766,7 @@ async def do_crawl_finished_tasks(
         crawl: CrawlSpec,
         status: CrawlStatus,
         state: TYPE_NON_RUNNING_STATES,
-        stats: Optional[CrawlStats],
+        stats: Optional[OpCrawlStats],
     ) -> None:
         """Run tasks after crawl completes in asyncio.task coroutine."""
         await self.crawl_config_ops.stats_recompute_last(
