Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Implement #844 (bulk delete jobs) #1215

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions pyiron_base/database/filetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def add_item_dict(self, par_dict):
).reset_index(drop=True)
return int(par_dict_merged["id"])

def delete_item(self, item_id):
def _delete_item(self, item_id):
"""
Delete Item from database

Expand Down Expand Up @@ -314,11 +314,12 @@ def get_items_dict(self, item_dict, return_all_columns=True):
else:
return [{"id": i} for i in df_dict["id"].values()]

def get_jobs(self, project=None, recursive=True, columns=None):
def _get_jobs(self, user, sql_query, project=None, recursive=True, columns=None):
"""
Get jobs as dictionary from filetable

Args:
user/sql_query: ignored for compat with IsDatabase
project (str/ None): path to the project
recursive (boolean): recursively iterate over all sub projects
columns (list/ None): list of columns to return
Expand Down Expand Up @@ -350,19 +351,6 @@ def get_jobs(self, project=None, recursive=True, columns=None):
].tolist() # ToDo: Check difference of tolist and to_list
return dictionary

def get_job_ids(self, project=None, recursive=True):
"""
Get job IDs from filetable

Args:
project (str/ None): path to the project
recursive (boolean): recursively iterate over all sub projects

Returns:
list/ None: list of job IDs
"""
return self.get_jobs(project=project, recursive=recursive, columns=["id"])["id"]

def get_job_id(self, job_specifier, project=None):
"""
Get job ID from filetable
Expand Down
16 changes: 15 additions & 1 deletion pyiron_base/database/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import numpy as np
import re
import os
from typing import Union, Iterable
from datetime import datetime
from pyiron_base.utils.deprecate import deprecate
import pandas
Expand Down Expand Up @@ -739,7 +740,7 @@ def _item_update(self, par_dict, item_id):
else:
raise PermissionError("Not avilable in viewer mode.")

def delete_item(self, item_id):
def _delete_item(self, item_id):
"""
Delete Item from database

Expand All @@ -761,6 +762,19 @@ def delete_item(self, item_id):
else:
raise PermissionError("Not avilable in viewer mode.")

# IsDatabase impl'
def _get_jobs(self, sql_query, user, project_path, recursive=True, columns=None):
df = self.job_table(
sql_query=sql_query,
user=user,
project_path=project_path,
recursive=recursive,
columns=columns,
)
if len(df) == 0:
return {key: list() for key in columns}
return df.to_dict(orient="list")

# Shortcut
def get_item_by_id(self, item_id):
"""
Expand Down
70 changes: 70 additions & 0 deletions pyiron_base/database/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,29 @@ def _items_update(self, par_dict, item_ids):
for i_id in item_ids:
self._item_update(par_dict=par_dict, item_id=i_id)

@abstractmethod
def _delete_item(self, item_id):
pass

def _delete_items(self, item_ids):
raise NotImplementedError()

def delete_item(self, item_id: typing.Union[int, typing.Iterable[int]]):
"""
Delete database entry for job with given id.

Args:
item_id (int, iterable): job id to delete or iterable there of
"""
if not isinstance(item_id, Iterable):
self._delete_item(item_id)
else:
try:
self._delete_items(item_id)
except NotImplementedError:
for i in item_id:
self._delete_item(i)

def set_job_status(self, status, job_id):
"""
Set status of a job or multiple jobs if job_id is iterable.
Expand Down Expand Up @@ -300,3 +323,50 @@ def get_db_columns(self):
'totalcputime']
"""
return self.get_table_headings()

@abstractmethod
def _get_jobs(self, sql_query, user, project_path, recursive=True, columns=None):
pass

def get_jobs(self, sql_query, user, project_path, recursive=True, columns=None):
"""
Internal function to return the jobs as dictionary rather than a pandas.Dataframe

Args:
sql_query (str): SQL query to enter a more specific request
user (str): username of the user whoes user space should be searched
project_path (str): root_path - this is in contrast to the project_path in GenericPath
recursive (bool): search subprojects [True/False]
columns (list): by default only the columns ['id', 'project'] are selected, but the user can select a subset
of ['id', 'status', 'chemicalformula', 'job', 'subjob', 'project', 'projectpath', 'timestart',
'timestop', 'totalcputime', 'computer', 'hamilton', 'hamversion', 'parentid', 'masterid']

Returns:
dict: columns are used as keys and point to a list of the corresponding values
"""
if columns is None:
columns = ["id", "project"]
return self._get_jobs(
sql_query, user, project_path, recursive, columns
)

def get_job_ids(self, sql_query, user, project_path, recursive=True):
"""
Return the job IDs matching a specific query

Args:
database (DatabaseAccess): Database object
sql_query (str): SQL query to enter a more specific request
user (str): username of the user whoes user space should be searched
project_path (str): root_path - this is in contrast to the project_path in GenericPath
recursive (bool): search subprojects [True/False]

Returns:
list: a list of job IDs
"""
return self.get_jobs(
sql_query=sql_query,
user=user,
project_path=project_path,
recursive=recursive,
)["id"]
64 changes: 0 additions & 64 deletions pyiron_base/database/jobtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,70 +19,6 @@
__status__ = "production"
__date__ = "Sep 1, 2017"


def get_jobs(database, sql_query, user, project_path, recursive=True, columns=None):
"""
Internal function to return the jobs as dictionary rather than a pandas.Dataframe

Args:
database (DatabaseAccess): Database object
sql_query (str): SQL query to enter a more specific request
user (str): username of the user whoes user space should be searched
project_path (str): root_path - this is in contrast to the project_path in GenericPath
recursive (bool): search subprojects [True/False]
columns (list): by default only the columns ['id', 'project'] are selected, but the user can select a subset
of ['id', 'status', 'chemicalformula', 'job', 'subjob', 'project', 'projectpath', 'timestart',
'timestop', 'totalcputime', 'computer', 'hamilton', 'hamversion', 'parentid', 'masterid']

Returns:
dict: columns are used as keys and point to a list of the corresponding values
"""
if not isinstance(database, FileTable):
if columns is None:
columns = ["id", "project"]
df = database.job_table(
sql_query=sql_query,
user=user,
project_path=project_path,
recursive=recursive,
columns=columns,
)
if len(df) == 0:
return {key: list() for key in columns}
return df.to_dict(orient="list")
else:
return database.get_jobs(
project=project_path, recursive=recursive, columns=columns
)


def get_job_ids(database, sql_query, user, project_path, recursive=True):
"""
Return the job IDs matching a specific query

Args:
database (DatabaseAccess): Database object
sql_query (str): SQL query to enter a more specific request
user (str): username of the user whoes user space should be searched
project_path (str): root_path - this is in contrast to the project_path in GenericPath
recursive (bool): search subprojects [True/False]

Returns:
list: a list of job IDs
"""
if not isinstance(database, FileTable):
return get_jobs(
database=database,
sql_query=sql_query,
user=user,
project_path=project_path,
recursive=recursive,
columns=["id"],
)["id"]
else:
return database.get_job_ids(project=project_path, recursive=recursive)


def get_child_ids(database, sql_query, user, project_path, job_specifier, status=None):
"""
Get the childs for a specific job
Expand Down
27 changes: 27 additions & 0 deletions pyiron_base/jobs/job/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from inspect import isclass
import os
import posixpath
import shutil
import warnings

from h5io_browser.base import _read_hdf, _write_hdf
Expand Down Expand Up @@ -1532,6 +1533,32 @@ def _reload_update_master(self, project, master_id):
self._logger.info("busy master: {} {}".format(master_id, self.get_job_id()))
del self

@staticmethod
def _bulk_remove_jobs(project, job_df, progress):
"""
"""
def del_files(row):
if progress is not None:
progress.update(1)
project_path, project, job = row[["projectpath", "project", "job"]]
if project_path is not None:
base_path = os.path.join(project_path, project, job)
else:
base_path = os.path.join(project, job)
if os.path.isfile(base_path + ".h5"):
os.remove(base_path + ".h5")
shutil.rmtree(base_path + "_hdf5", ignore_errors=True)

project.db.delete_item(job_df.id)
# if job doesn't match subjob, it's stored inside another job's HDF5 file and will be delete from there.
n_total = len(job_df)
job_df = job_df.query("job == subjob.str.slice(1, None)")
# sub jobs won't have their files deleted, so advance progress bar on its own
n_sub = n_total - len(job_df)
if progress is not None:
progress.update(n_sub)
job_df.apply(del_files, axis="columns")

def _get_executor(self, max_workers=None):
if self._executor_type is None:
raise ValueError(
Expand Down
7 changes: 7 additions & 0 deletions pyiron_base/jobs/master/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,13 @@ def _init_child_job(self, parent):
"""
self.ref_job = parent

@staticmethod
def _bulk_remove_jobs(project, job_df, progress):
for job_id in job_df["id"]:
job = project.load(job_id)
job.child_project.remove(enable=True)

super()._bulk_remove_jobs(project, job_df, progress)

def get_function_from_string(function_str):
"""
Expand Down
34 changes: 20 additions & 14 deletions pyiron_base/project/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
from pyiron_base.database.filetable import FileTable
from pyiron_base.state import state
from pyiron_base.database.jobtable import (
get_job_ids,
get_job_id,
get_jobs,
set_job_status,
get_child_ids,
get_job_working_directory,
Expand Down Expand Up @@ -508,8 +506,7 @@ def get_jobs(self, recursive=True, columns=None):
Returns:
dict: columns are used as keys and point to a list of the corresponding values
"""
return get_jobs(
database=self.db,
return self.db.get_jobs(
sql_query=self.sql_query,
user=self.user,
project_path=self.project_path,
Expand All @@ -527,8 +524,7 @@ def get_job_ids(self, recursive=True):
Returns:
list: a list of job IDs
"""
return get_job_ids(
database=self.db,
return self.db.get_job_ids(
sql_query=self.sql_query,
user=self.user,
project_path=self.project_path,
Expand Down Expand Up @@ -1700,15 +1696,25 @@ def _remove_jobs_helper(self, recursive=False, progress=True):
if not isinstance(recursive, bool):
raise ValueError("recursive must be a boolean")
if self.db.view_mode:
raise EnvironmentError("copy_to: is not available in Viewermode !")
job_id_lst = self.get_job_ids(recursive=recursive)
job_id_progress = tqdm(job_id_lst) if progress else job_id_lst
for job_id in job_id_progress:
raise RuntimeError("copy_to: is not available in Viewermode !")
job_df = self.job_table(
recursive=recursive,
columns=["id", "hamilton", "parentid", "masterid", "projectpath", "project", "job", "subjob"]
)
job_id_lst = job_df["id"]
parents = set(job_df.parentid.dropna())
masters = set(job_df.masterid.dropna())
if not (parents.issubset(job_id_lst) and masters.issubset(job_id_lst)):
assert False, "Somehow sort out out of project jobs"

progress = tqdm(total=len(job_id_lst)) if progress else None
for hamilton, sub_df in job_df.groupby("hamilton"):
try:
self.remove_job(job_specifier=job_id)
state.logger.debug("Remove job with ID {0} ".format(job_id))
except (IndexError, Exception):
state.logger.debug("Could not remove job with ID {0} ".format(job_id))
job_class = JobType.convert_str_to_class(JOB_CLASS_DICT, hamilton)
except ValueError: # if job class is not registered in JOB_CLASS_DICT use generic routine
from pyiron_base.jobs.job.generic import GenericJob
job_class = GenericJob
job_class._bulk_remove_jobs(self, sub_df, progress)

def _remove_files(self, pattern="*"):
"""
Expand Down
6 changes: 2 additions & 4 deletions tests/database/test_database_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,8 @@ def test_delete_item(self):
par_dict = self.add_items("BO")
key = par_dict["id"]
self.database.delete_item(key)
self.assertRaises(
Exception, self.database.delete_item, [key]
) # use only str or int
# self.assertRaises(Exception, self.database.get_item_by_id, key) # ensure item does not exist anymore
self.assertRaises(Exception, self.database.get_item_by_id, key,
"Item still exists after delete_item!")

def test_get_item_by_id(self):
"""
Expand Down
Loading