Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Version identifier exposed by this module as ``__version__``.
__version__ = "0.1.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .scheduler import main

# Run the scheduler's ``main`` only when this module is executed directly,
# never as a side effect of being imported.
if __name__ == "__main__":
    main()
252 changes: 252 additions & 0 deletions shared_scripts/accession_sketcher/accession_sketcher/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
import os
import sqlite3
import threading
import time
from typing import Optional, Tuple, List, Dict, Any

# DDL run by ``DB.__init__`` on every start-up, one ``;``-separated statement
# at a time. Every CREATE uses IF NOT EXISTS, so re-running it against an
# existing database is harmless.
#   * ``files`` holds one row per (subdir, filename) pair; that pair is
#     enforced unique by ``idx_unique_file`` and is the conflict target of
#     the upsert in ``DB.upsert_file``.
#   * ``status`` defaults to 'PENDING'; ``idx_status`` backs the
#     status-filtered queries used when claiming and counting work.
#   * ``updated_at`` / ``created_at`` are REAL columns filled with
#     ``time.time()`` values by the ``DB`` methods.
SCHEMA = """
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subdir TEXT NOT NULL,
filename TEXT NOT NULL,
url TEXT NOT NULL,
size INTEGER,
mtime TEXT,
status TEXT NOT NULL DEFAULT 'PENDING',
tries INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
out_path TEXT,
updated_at REAL,
created_at REAL
);
CREATE INDEX IF NOT EXISTS idx_status ON files(status);
CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_file ON files(subdir, filename);
"""

class DB:
    """SQLite-backed queue of files and their processing status.

    A single connection (opened with ``check_same_thread=False``) is shared by
    all callers, and every method serialises access to it through
    ``self._lock``. Methods that write also run inside ``with self.conn`` so
    their changes are committed on success and rolled back if an exception
    escapes.
    """

    # Number of values bound per ``IN (...)`` query in ``existing_filenames``;
    # 999 is SQLite's historical default cap on host parameters per statement.
    _MAX_SQL_VARS = 999

    def __init__(self, path: str):
        """Open (creating if necessary) the database at *path* and apply SCHEMA.

        Args:
            path: Location of the SQLite file. Its parent directory is created
                when the path has one.
        """
        self.path = path
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        self._lock = threading.Lock()
        self.conn = sqlite3.connect(self.path, check_same_thread=False, timeout=60.0)
        with self.conn:
            # SCHEMA contains no ';' inside a statement, so a plain split is
            # enough to feed the statements to execute() one at a time.
            for stmt in SCHEMA.strip().split(";"):
                if stmt.strip():
                    self.conn.execute(stmt)

    def close(self) -> None:
        """Close the shared connection; the instance must not be used afterwards."""
        with self._lock:
            self.conn.close()

    def _claim_rows(
        self,
        candidate_ids: List[int],
        from_status: str,
        now: float,
    ) -> List[Tuple[int, str, str, str]]:
        """Flip each candidate from *from_status* to DOWNLOADING and return the winners.

        The caller must already hold ``self._lock`` and be inside
        ``with self.conn``. The ``AND status=?`` guard plus the rowcount check
        mean a row whose status changed since it was selected is skipped
        rather than claimed twice.
        """
        claimed: List[Tuple[int, str, str, str]] = []
        for fid in candidate_ids:
            cur = self.conn.execute(
                "UPDATE files SET status='DOWNLOADING', updated_at=? "
                "WHERE id=? AND status=?",
                (now, fid, from_status),
            )
            if cur.rowcount == 1:
                claimed.append(
                    self.conn.execute(
                        "SELECT id, subdir, filename, url FROM files WHERE id=?",
                        (fid,),
                    ).fetchone()
                )
        return claimed

    def claim_next(
        self,
        error_cooldown_seconds: int = 3600,
        error_max_total_tries: int = 20,
    ) -> Optional[Tuple[int, str, str, str]]:
        """Atomically claim work: PENDING first; else eligible ERROR (aged & below cap).

        Returns:
            ``(id, subdir, filename, url)`` of the row that was switched to
            DOWNLOADING, or ``None`` when nothing could be claimed.
        """
        batch = self.claim_batch(
            1,
            error_cooldown_seconds=error_cooldown_seconds,
            error_max_total_tries=error_max_total_tries,
        )
        return batch[0] if batch else None

    def claim_batch(
        self,
        limit: int,
        error_cooldown_seconds: int = 3600,
        error_max_total_tries: int = 20,
    ) -> List[Tuple[int, str, str, str]]:
        """Claim up to *limit* rows, switching them to DOWNLOADING.

        PENDING rows are taken first, lowest id first. Only when no PENDING
        row could be claimed are ERROR rows considered: those with
        ``tries < error_max_total_tries`` whose ``updated_at`` is NULL or at
        least *error_cooldown_seconds* old, oldest first. The two kinds are
        never mixed in one batch.

        Returns:
            A list of ``(id, subdir, filename, url)`` tuples; empty when
            *limit* is not positive or nothing is claimable.
        """
        if limit <= 0:
            return []
        now = time.time()
        cutoff = now - error_cooldown_seconds
        with self._lock, self.conn:
            pending_ids = [
                row[0]
                for row in self.conn.execute(
                    "SELECT id FROM files WHERE status='PENDING' ORDER BY id LIMIT ?",
                    (limit,),
                ).fetchall()
            ]
            claimed = self._claim_rows(pending_ids, "PENDING", now)
            if claimed:
                return claimed

            # SQLite sorts NULL before every other value in ascending order,
            # so never-updated rows come first without needing the
            # ``NULLS FIRST`` syntax (which only exists in SQLite >= 3.30).
            retry_ids = [
                row[0]
                for row in self.conn.execute(
                    "SELECT id FROM files "
                    "WHERE status='ERROR' AND tries < ? "
                    "AND (updated_at IS NULL OR updated_at <= ?) "
                    "ORDER BY updated_at, id LIMIT ?",
                    (error_max_total_tries, cutoff, limit),
                ).fetchall()
            ]
            return self._claim_rows(retry_ids, "ERROR", now)

    def reset_stuck(self, stale_seconds: int = 3600):
        """Return DOWNLOADING/SKETCHING rows untouched for *stale_seconds* to PENDING.

        Rows whose ``updated_at`` is NULL are treated as stale as well.
        """
        now = time.time()
        threshold = now - stale_seconds
        with self._lock, self.conn:
            self.conn.execute(
                "UPDATE files SET status='PENDING', updated_at=? "
                "WHERE status IN ('DOWNLOADING','SKETCHING') AND (updated_at IS NULL OR updated_at < ?)",
                (now, threshold),
            )

    def upsert_file(
        self,
        subdir: str,
        filename: str,
        url: str,
        size: Optional[int],
        mtime: Optional[str],
    ):
        """Insert a file as PENDING, or refresh url/size/mtime of an existing row.

        On a ``(subdir, filename)`` conflict the existing row keeps its
        status, tries, last_error, out_path and created_at.
        """
        ts = time.time()
        with self._lock, self.conn:
            # NOTE(review): the conflict branch also refreshes updated_at,
            # which is the timestamp claim_batch and reset_stuck compare
            # against; re-upserting a row therefore postpones its ERROR retry
            # cooldown and stale detection. Confirm this is intended.
            self.conn.execute(
                """INSERT INTO files (subdir, filename, url, size, mtime, status, updated_at, created_at)
VALUES (?, ?, ?, ?, ?, 'PENDING', ?, ?)
ON CONFLICT(subdir, filename) DO UPDATE SET url=excluded.url, size=excluded.size, mtime=excluded.mtime, updated_at=excluded.updated_at""",
                (subdir, filename, url, size, mtime, ts, ts),
            )

    def mark_status(
        self,
        file_id: int,
        status: str,
        out_path: Optional[str] = None,
        error: Optional[str] = None,
        inc_tries: bool = False,
    ):
        """Set the status of one row and stamp ``updated_at``.

        ``out_path`` and ``last_error`` are always overwritten, so leaving
        *out_path* or *error* at ``None`` clears the stored value.

        Args:
            file_id: Primary key of the row to update.
            status: New status value.
            out_path: Value stored in ``out_path``.
            error: Value stored in ``last_error``.
            inc_tries: When true, also increment the ``tries`` counter.
        """
        ts = time.time()
        # Only this fixed fragment is interpolated; all values stay bound.
        tries_clause = "tries=tries+1, " if inc_tries else ""
        with self._lock, self.conn:
            self.conn.execute(
                "UPDATE files SET status=?, out_path=?, last_error=?, "
                f"{tries_clause}updated_at=? WHERE id=?",
                (status, out_path, error, ts, file_id),
            )

    def update_size(self, file_id: int, size: Optional[int]):
        """Store *size* for a row and stamp ``updated_at``; a ``None`` size is a no-op."""
        if size is None:
            return
        ts = time.time()
        with self._lock, self.conn:
            self.conn.execute(
                "UPDATE files SET size=?, updated_at=? WHERE id=?",
                (size, ts, file_id),
            )

    def get_tries(self, file_id: int) -> int:
        """Return the ``tries`` counter of a row, or 0 when the id does not exist."""
        with self._lock:
            row = self.conn.execute(
                "SELECT tries FROM files WHERE id=?", (file_id,)
            ).fetchone()
        return int(row[0]) if row else 0

    def existing_filenames(self, filenames: List[str]) -> set[str]:
        """Return the subset of *filenames* already present in the table.

        Matching is on ``filename`` alone, whatever the row's ``subdir``.
        Lookups are chunked so no statement binds more than
        ``_MAX_SQL_VARS`` parameters.
        """
        if not filenames:
            return set()
        existing: set[str] = set()
        step = self._MAX_SQL_VARS
        with self._lock:
            for start in range(0, len(filenames), step):
                chunk = filenames[start:start + step]
                placeholders = ",".join("?" for _ in chunk)
                cur = self.conn.execute(
                    f"SELECT filename FROM files WHERE filename IN ({placeholders})",
                    chunk,
                )
                existing.update(row[0] for row in cur.fetchall())
        return existing

    def stats(self) -> Dict[str, Any]:
        """Return ``{"total": n, "by_status": {status: count}}`` for the table."""
        with self._lock:
            cur = self.conn.execute("SELECT status, COUNT(*) FROM files GROUP BY status")
            by_status = {row[0]: row[1] for row in cur.fetchall()}
        # ``status`` is NOT NULL, so the groups cover every row; deriving the
        # total from them keeps both figures consistent with a single read.
        return {"total": sum(by_status.values()), "by_status": by_status}

    def count_claimable(self, error_cooldown_seconds: int = 3600, error_max_total_tries: int = 20) -> int:
        """Count rows a claim could pick up: PENDING plus retry-eligible ERROR rows.

        ERROR rows are counted under the same rule ``claim_batch`` uses
        (``tries`` below the cap and ``updated_at`` NULL or past the cooldown).
        """
        cutoff = time.time() - error_cooldown_seconds
        with self._lock:
            pending = self.conn.execute(
                "SELECT COUNT(*) FROM files WHERE status='PENDING'"
            ).fetchone()[0]
            retryable = self.conn.execute(
                "SELECT COUNT(*) FROM files WHERE status='ERROR' AND tries < ? AND (updated_at IS NULL OR updated_at <= ?)",
                (error_max_total_tries, cutoff),
            ).fetchone()[0]
        return pending + retryable

    def iter_pending(self, batch_size: int = 1000):
        """Yield ``(id, subdir, filename, url)`` for PENDING rows in id order.

        Rows are fetched *batch_size* at a time and the lock is released
        before anything is yielded, so the consumer may call other methods of
        this object while iterating. Paging resumes after the last id seen
        instead of using OFFSET: rows that stop being PENDING during the
        iteration therefore cannot shift later rows out of view.
        """
        # AUTOINCREMENT ids start at 1, so 0 sits below every stored row.
        last_id = 0
        while True:
            with self._lock:
                rows = self.conn.execute(
                    "SELECT id, subdir, filename, url FROM files "
                    "WHERE status='PENDING' AND id > ? ORDER BY id LIMIT ?",
                    (last_id, batch_size),
                ).fetchall()
            if not rows:
                break
            for row in rows:
                yield row
            last_id = rows[-1][0]
4 changes: 4 additions & 0 deletions shared_scripts/accession_sketcher/accession_sketcher/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .scheduler import main

# Run the scheduler's ``main`` only when this module is executed directly,
# never as a side effect of being imported.
if __name__ == "__main__":
    main()
Loading