Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 114 additions & 56 deletions sotodlib/site_pipeline/jobdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,55 +300,101 @@ def get_jobs(self,
[session.expunge(j) for j in jobs]
return jobs

def lock(self, job_id, owner=None, force=False):
"""Lock a Job record by id. If the Job is already locked, a
JobLockedError is raised.
def lock(self, job_ids, owner=None, force=False):
"""Lock one or more Job records by id. If a Job is already locked,
a JobLockedError is raised.

Returns a Job object that has been expunged from the database
Returns the Job objects that have been expunged from the database
session. The object attributes can be modified, but won't be
written back to the database unless the object is merged into
a new session.

"""
if owner is None:
owner = self._lockstr()

now = time.time()

if isinstance(job_ids, int):
job_ids = [job_ids]

job_ids = [(j.id if isinstance(j, Job) else j) for j in job_ids]

with self.session_scope() as session:
q = session.query(Job)
if force:
q = q.filter(sqy.and_(Job.id == job_id))
else:
q = q.filter(sqy.and_(Job.id == job_id,
Job.lock == None)) # noqa: E711
n = q.update({Job.lock: time.time(), Job.lock_owner: owner})
q = session.query(Job).filter(Job.id.in_(job_ids))

if not force:
q = q.filter(Job.lock == None)

n = q.update(
{Job.lock: now, Job.lock_owner: owner},
synchronize_session=False
)

session.commit()

with self.session_scope() as session:
job = session.get(Job, job_id)
session.expunge(job)
jobs = (
session.query(Job)
.filter(Job.id.in_(job_ids))
.all()
)

for job in jobs:
session.expunge(job)

if n == 0 or job.lock_owner != owner:
locked_jobs = [j for j in jobs if j.lock_owner == owner]

if n != len(job_ids) or len(locked_jobs) != len(job_ids):
raise JobLockedError()

return job
return locked_jobs[0] if len(locked_jobs) == 1 else locked_jobs

def unlock(self, jobs, merge=True):
if not isinstance(jobs, (list, tuple, set)):
jobs = [jobs]

job_ids = []
job_objs = []

for j in jobs:
if isinstance(j, Job):
job_ids.append(j.id)
job_objs.append(j)
else:
job_ids.append(j)

def unlock(self, job, merge=True):
if not merge or isinstance(job, int):
if isinstance(job, Job):
job = job.id
if not merge or not job_objs:
with self.session_scope() as session:
session.query(Job).filter(Job.id == job).update(
{Job.lock: None, Job.lock_owner: None})
session.query(Job).filter(Job.id.in_(job_ids)).update(
{Job.lock: None, Job.lock_owner: None},
synchronize_session=False
)
session.commit()

else:
with self.session_scope() as session:
j1 = session.query(Job).filter(Job.id == job.id).one()
if j1.lock_owner is None:
raise JobNotLockedError()
if j1.lock_owner != job.lock_owner:
raise JobNotOwnedError()
job.lock = None
job.lock_owner = None
session.merge(job)
db_jobs = (
session.query(Job)
.filter(Job.id.in_(job_ids))
.all()
)

db_map = {j.id: j for j in db_jobs}

for job in job_objs:
j1 = db_map[job.id]

if j1.lock_owner is None:
raise JobNotLockedError()

if j1.lock_owner != job.lock_owner:
raise JobNotOwnedError()

job.lock = None
job.lock_owner = None
session.merge(job)

session.commit()

def clear_locks(self, jobs=None):
Expand All @@ -364,18 +410,22 @@ def clear_locks(self, jobs=None):
q = q.filter(Job.id == j)
q.update({Job.lock: None, Job.lock_owner: None})

def remove_job(self, job_id, check_locked=False):
def remove_job(self, job_ids, check_locked=False):
if isinstance(job_ids, (int, Job)):
job_ids = [job_ids]

job_ids = [j.id if isinstance(j, Job) else j for j in job_ids]

with self.session_scope() as session:
q = session.query(Job).filter(Job.id.in_(job_ids))

if check_locked:
q = session.query(Job).filter(
sqy.and_(Job.id == job_id,
Job.lock == None)) # noqa: E711
else:
q = session.query(Job).filter(Job.id == job_id)
q = q.filter(Job.lock == None) # noqa: E711

n = q.delete()
n = q.delete(synchronize_session=False)
session.commit()
if n == 0:

if n != len(job_ids):
raise JobLockedError()

@contextmanager
Expand Down Expand Up @@ -407,30 +457,38 @@ def locked(self, jobs, count=None, owner=None):
"""
if owner is None:
owner = self._lockstr()

if isinstance(jobs, (int, Job)):
jobs = [jobs]

job_ids = [j.id if isinstance(j, Job) else j for j in jobs]

locked = []
for job in jobs:
if len(locked) >= (1 if count is None else count):
break
if isinstance(job, Job):
job = job.id
try:
j = self.lock(job)
except JobLockedError:
continue
locked.append(j)

try:
if count is None:
if len(locked):
yield locked[0]
else:
yield None
else:
yield locked
with self.session_scope() as session:
unlocked_ids = {
j.id for j in session.query(Job.id)
.filter(Job.id.in_(job_ids), Job.lock == None)
}

selected = []
limit = count if count is not None else 1

for jid in job_ids:
if jid in unlocked_ids:
selected.append(jid)
if len(selected) >= limit:
break

if selected:
locked = self.lock(selected, owner=owner)

yield locked

finally:
for j in locked:
self.unlock(j)
if locked:
self.unlock(locked)

def get_resource(self, jclass, n=None, jstate='open', tags={}):
jobs = self.get_jobs(jclass, jstate=jstate, tags=tags)
Expand Down
71 changes: 57 additions & 14 deletions sotodlib/site_pipeline/multilayer_preprocess_tod.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"],
exist_ok=True)

# jobdb
jobdb_path = configs_proc.get("jobdb", None)
jobdb_path = configs_proc["jobdb"].get("path", None)
if jobdb_path is not None:
jdb = JobManager(sqlite_file=jobdb_path)
# get init jobs
Expand Down Expand Up @@ -404,6 +404,25 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"],
batch_size_init = configs_init['archive'].get('batch_size', 1)
batch_size_proc = configs_proc['archive'].get('batch_size', 1)

if jobdb_path is not None:
# batch updates to JobDb
jdb_batch_size = configs['jobdb'].get('batch_size', 1)
batched_job_count = 0
batched_job_fields = []

# get jobs up front since it is faster
init_jobs = jdb.get_jobs(jclass="init", jstate=JState.open)
tags_to_job_init = {
frozenset({k: v for k, v in j.tags.items() if k != 'error'}.items()): j
for j in init_jobs
}

proc_jobs = jdb.get_jobs(jclass="proc", jstate=JState.open)
tags_to_job_proc = {
frozenset({k: v for k, v in j.tags.items() if k != 'error'}.items()): j
for j in proc_jobs
}

pb_name = f"pb_{str(int(time.time()))}.txt"
with open(pb_name, 'w') as f:
with MultiDbBatchManager(
Expand Down Expand Up @@ -439,26 +458,50 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"],
configs_proc, logger, overwrite,
db_manager=db_mgr_proc)


# update jobdb
if jobdb_path is not None:
tags = {}
tags["obs:obs_id"] = obs_id
for gb, g in zip(group_by, group):
tags['dets:' + gb] = g
jobs = jdb.get_jobs(jstate=JState.open, tags=tags)
# get both init and proc
init_job = tags_to_job_init.get(frozenset(tags.items()), None)
proc_job = tags_to_job_proc.get(frozenset(tags.items()), None)

if errors[0] is not None:
jstate = JState.failed
jerror = errors[0]
else:
jstate = JState.done
jerror = None

for job in [init_job, proc_job]:
if job is not None:
batched_job_fields.append(
{
"job": job,
"error": jerror,
"jstate": jstate,
}
)

batched_job_count += 1

if (batched_job_count >= jdb_batch_size) or (len(futures) == 0):
jobs = [j['job'] for j in batched_job_fields]
job_idx = 0
with jdb.locked(jobs, count=len(jobs)) as j:
for job in j:
job.mark_visited()
job.jstate = batched_job_fields[job_idx]["jstate"]
for _t in job._tags:
if _t.key == "error":
_t.value = batched_job_fields[job_idx]["error"]
job_idx += 1
batched_job_count = 0
batched_job_fields = []

for job in jobs:
# init layer state will be JState.done if already run
if job.jstate == JState.open:
with jdb.locked(job) as j:
j.mark_visited()
if errors[0] is not None:
j.jstate = JState.failed
for _t in j._tags:
if _t.key == "error":
_t.value = errors[0]
else:
j.jstate = JState.done
if raise_error:
n_obs_fail = 0
n_groups_fail = 0
Expand Down
Loading