Skip to content

Commit f7f035f

Browse files
committed
job manager: next step in making df in run_jobs optional
to make it deprecated eventually Also improve test coverage of new approach related to PR #636
1 parent 08fed55 commit f7f035f

File tree

2 files changed

+54
-7
lines changed

2 files changed

+54
-7
lines changed

openeo/extra/job_management.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ def get_by_status(self, statuses: List[str], max=None) -> pd.DataFrame:
9090
"""
9191
...
9292

93+
94+
def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
95+
raise NotImplementedError
96+
97+
9398
class MultiBackendJobManager:
9499
"""
95100
Tracker for multiple jobs on multiple backends.
@@ -366,8 +371,8 @@ def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):
366371

367372
def run_jobs(
368373
self,
369-
df: Optional[pd.DataFrame],
370-
start_job: Callable[[], BatchJob],
374+
df: Optional[pd.DataFrame] = None,
375+
start_job: Callable[[], BatchJob] = _start_job_default,
371376
job_db: Union[str, Path, JobDatabaseInterface, None] = None,
372377
**kwargs,
373378
):
@@ -450,6 +455,7 @@ def run_jobs(
450455
# Resume from existing db
451456
_log.info(f"Resuming `run_jobs` from existing {job_db}")
452457
elif df is not None:
458+
# TODO: start showing deprecation warnings for this usage pattern?
453459
df = self._normalize_df(df)
454460
job_db.persist(df)
455461

tests/extra/test_job_management.py

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,11 @@ def sleep_mock(self):
7979
with mock.patch("time.sleep") as sleep:
8080
yield sleep
8181

82-
def test_basic(self, tmp_path, requests_mock, sleep_mock):
83-
manager = self.create_basic_mocked_manager(requests_mock, tmp_path)
82+
def test_basic_legacy(self, tmp_path, requests_mock, sleep_mock):
83+
"""
84+
Legacy `run_jobs()` usage with explicit dataframe and output file
85+
"""
86+
manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
8487

8588
df = pd.DataFrame(
8689
{
@@ -108,8 +111,45 @@ def start_job(row, connection, **kwargs):
108111
metadata_path = manager.get_job_metadata_path(job_id="job-2022")
109112
assert metadata_path.exists()
110113

114+
def test_basic(self, tmp_path, requests_mock, sleep_mock):
115+
"""
116+
`run_jobs()` usage with a `CsvJobDatabase`
117+
(and no explicit dataframe or output file)
118+
"""
119+
manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
120+
121+
df = pd.DataFrame(
122+
{
123+
"year": [2018, 2019, 2020, 2021, 2022],
124+
# Use simple points in WKT format to test conversion to the geometry dtype
125+
"geometry": ["POINT (1 2)"] * 5,
126+
}
127+
)
128+
output_file = tmp_path / "jobs.csv"
129+
130+
def start_job(row, connection, **kwargs):
131+
year = int(row["year"])
132+
return BatchJob(job_id=f"job-{year}", connection=connection)
133+
134+
job_db = CsvJobDatabase(output_file)
135+
# TODO #636 avoid this cumbersome pattern using private _normalize_df API
136+
job_db.persist(manager._normalize_df(df))
137+
138+
manager.run_jobs(job_db=job_db, start_job=start_job)
139+
assert sleep_mock.call_count > 10
140+
141+
result = pd.read_csv(output_file)
142+
assert len(result) == 5
143+
assert set(result.status) == {"finished"}
144+
assert set(result.backend_name) == {"foo", "bar"}
145+
146+
# We expect that the job metadata was saved, so verify that it exists.
147+
# Checking for one of the jobs is enough.
148+
metadata_path = manager.get_job_metadata_path(job_id="job-2022")
149+
assert metadata_path.exists()
150+
111151
def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
112-
manager = self.create_basic_mocked_manager(requests_mock, tmp_path)
152+
manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
113153

114154
df = pd.DataFrame(
115155
{
@@ -125,7 +165,7 @@ def start_job(row, connection, **kwargs):
125165
return BatchJob(job_id=f"job-{year}", connection=connection)
126166

127167
job_db = CsvJobDatabase(output_file)
128-
# TODO: avoid private _normalize_df API
168+
# TODO #636 avoid this cumbersome pattern using private _normalize_df API
129169
job_db.persist(manager._normalize_df(df))
130170

131171
manager.start_job_thread(start_job=start_job, job_db=job_db)
@@ -144,7 +184,8 @@ def start_job(row, connection, **kwargs):
144184
metadata_path = manager.get_job_metadata_path(job_id="job-2022")
145185
assert metadata_path.exists()
146186

147-
def create_basic_mocked_manager(self, requests_mock, tmp_path):
187+
def _create_basic_mocked_manager(self, requests_mock, tmp_path):
188+
# TODO: separate aspects of job manager and dummy backends
148189
requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
149190
requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})
150191

0 commit comments

Comments
 (0)