refactor 'submit_job_handler' to allow submit from pre-created job rather than parsing request from scratch
fmigneault committed Oct 15, 2024
1 parent d5a086b commit f29c010
Showing 10 changed files with 160 additions and 68 deletions.
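
The gist of the refactor: request parsing and Celery dispatch are now two separate steps, so a pre-created 'Job' can be submitted directly through the new 'submit_job_dispatch_task' (as 'trigger_job_execution' now does, see weaver/wps_restapi/jobs/jobs.py below). A minimal sketch of the resulting call pattern; the process name, URLs, and settings contents are hypothetical placeholders, not code from this commit:

```python
from weaver.database import get_db
from weaver.processes.execution import submit_job_dispatch_task
from weaver.store.base import StoreJobs


def submit_precreated_job(settings, request_headers):
    """Sketch: persist a Job first, then dispatch it to the Celery worker separately."""
    store = get_db(settings).get_store(StoreJobs)
    job = store.save_job(
        task_id="tmp",  # placeholder task reference, mirroring the updated tests
        process="jsonarray2netcdf",  # hypothetical process
        inputs={"input": {"href": "https://example.com/fake.json"}},
        execute_async=True,
    )
    job.wps_url = "https://example.com/weaver/wps"  # new 'Job.wps_url' property from this commit
    job = store.update_job(job)
    # dispatch resolves the execution mode, Celery submission, and HTTP response from the Job itself
    return submit_job_dispatch_task(job, container=settings, headers=request_headers)
```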
10 changes: 6 additions & 4 deletions tests/functional/test_celery.py
@@ -50,10 +50,12 @@ def test_celery_registry_resolution():
settings = get_settings_from_testapp(webapp)
wps_url = get_wps_url(settings)
job_store = get_db(settings).get_store("jobs")
job1 = job_store.save_job(task_id="tmp", process="jsonarray2netcdf",
inputs={"input": {"href": "http://random-dont-care.com/fake.json"}})
job2 = job_store.save_job(task_id="tmp", process="jsonarray2netcdf",
inputs={"input": {"href": "http://random-dont-care.com/fake.json"}})
job1 = job_store.save_job(
task_id="tmp", process="jsonarray2netcdf", inputs={"input": {"href": "http://random-dont-care.com/fake.json"}}
)
job2 = job_store.save_job(
task_id="tmp", process="jsonarray2netcdf", inputs={"input": {"href": "http://random-dont-care.com/fake.json"}}
)

with contextlib.ExitStack() as stack:
celery_mongo_broker = f"""mongodb://{settings["mongodb.host"]}:{settings["mongodb.port"]}/celery-test"""
5 changes: 3 additions & 2 deletions tests/functional/test_cli.py
@@ -770,8 +770,9 @@ def test_jobs_search_multi_status(self):
class TestWeaverCLI(TestWeaverClientBase):
def setUp(self):
super(TestWeaverCLI, self).setUp()
job = self.job_store.save_job(task_id="12345678-1111-2222-3333-111122223333", process="fake-process",
access=Visibility.PUBLIC)
job = self.job_store.save_job(
task_id="12345678-1111-2222-3333-111122223333", process="fake-process", access=Visibility.PUBLIC
)
job.status = Status.SUCCEEDED
self.test_job = self.job_store.update_job(job)

52 changes: 26 additions & 26 deletions tests/functional/test_wps_package.py

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions tests/wps_restapi/test_jobs.py
@@ -179,8 +179,10 @@ def make_job(self,
): # type: (...) -> Job
if isinstance(created, str):
created = date_parser.parse(created)
job = self.job_store.save_job(task_id=task_id, process=process, service=service, is_workflow=False,
execute_async=True, user_id=user_id, access=access, created=created)
job = self.job_store.save_job(
task_id=task_id, process=process, service=service, is_workflow=False, execute_async=True, user_id=user_id,
access=access, created=created
)
job.status = status
if status != Status.ACCEPTED:
job.started = job.created + datetime.timedelta(seconds=offset if offset is not None else 0)
38 changes: 38 additions & 0 deletions weaver/datatype.py
@@ -859,6 +859,25 @@ def wps_id(self, wps_id):
raise TypeError(f"Type 'str' or 'UUID' is required for '{self.__name__}.wps_id'")
self["wps_id"] = wps_id

@property
def wps_url(self):
# type: () -> Optional[str]
"""
Service URL reference for :term:`WPS` interface.

.. seealso::
- :attr:`Process.processEndpointWPS1`
- :attr:`Service.url`
"""
return self.get("wps_url", None)

@wps_url.setter
def wps_url(self, service):
# type: (Optional[str]) -> None
if not isinstance(service, str):
raise TypeError(f"Type 'str' is required for '{self.__name__}.wps_url'")
self["wps_url"] = service

@property
def service(self):
# type: () -> Optional[str]
@@ -1071,6 +1090,23 @@ def execution_mode(self, mode):
raise ValueError(f"Invalid value for '{self.__name__}.execution_mode'. Must be one of {modes}")
self["execution_mode"] = mode

@property
def execution_wait(self):
# type: () -> Optional[int]
"""
Execution time (in seconds) to wait for a synchronous response.
"""
if not self.execute_sync:
return None
return self.get("execution_wait")

@execution_wait.setter
def execution_wait(self, wait):
# type: (Optional[int]) -> None
if wait is not None and not isinstance(wait, int):
raise ValueError(f"Invalid value for '{self.__name__}.execution_wait'. Must be None or an integer.")
self["execution_wait"] = wait

@property
def execution_response(self):
# type: () -> AnyExecuteResponse
@@ -1533,6 +1569,7 @@ def params(self):
"id": self.id,
"task_id": self.task_id,
"wps_id": self.wps_id,
"wps_url": self.wps_url,
"service": self.service,
"process": self.process,
"inputs": self.inputs,
@@ -1544,6 +1581,7 @@ def params(self):
"execution_response": self.execution_response,
"execution_return": self.execution_return,
"execution_mode": self.execution_mode,
"execution_wait": self.execution_wait,
"is_workflow": self.is_workflow,
"created": self.created,
"started": self.started,
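
A short illustration of the two 'Job' properties added above. It assumes a 'Job' can be instantiated with minimal fields (it is a dict-based datatype); all values are placeholders:

```python
from weaver.datatype import Job

job = Job(task_id="tmp", process="fake-process")  # assumed minimal construction

job.wps_url = "https://example.com/wps"  # stored under the "wps_url" key
try:
    job.wps_url = 1234  # setter enforces 'str' and raises otherwise
except TypeError:
    print("wps_url requires a string")

job.execution_wait = 30  # seconds to wait for a synchronous response
# The getter reports the wait only for synchronous execution;
# an asynchronous job returns None even if a value was stored.
print(job.execution_wait)
```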
84 changes: 61 additions & 23 deletions weaver/processes/execution.py
@@ -91,6 +91,7 @@
AnyProcessRef,
AnyResponseType,
AnyServiceRef,
AnySettingsContainer,
AnyViewResponse,
AnyValueType,
CeleryResult,
@@ -754,7 +755,7 @@ def submit_job(request, reference, tags=None, process_id=None):

def submit_job_handler(payload, # type: ProcessExecution
settings, # type: SettingsType
service_url, # type: str
wps_url, # type: str
provider=None, # type: Optional[AnyServiceRef]
process=None, # type: AnyProcessRef
is_workflow=False, # type: bool
@@ -767,9 +768,11 @@ def submit_job_handler(payload, # type: ProcessExecution
context=None, # type: Optional[str]
): # type: (...) -> AnyResponseType
"""
Submits the job to the Celery worker with provided parameters.
Parses parameters that define the submitted :term:`Job`, and responds accordingly with the selected execution mode.
Assumes that parameters have been pre-fetched and validated, except for the :paramref:`payload`.
Assumes that parameters have been pre-fetched and validated, except for the :paramref:`payload` containing the
desired inputs and outputs from the :term:`Job`. The selected execution mode looks up the various combinations
of headers and body parameters available across :term:`API` implementations and revisions.
"""
json_body = validate_job_schema(payload)
db = get_db(settings)
@@ -820,58 +823,93 @@
store = db.get_store(StoreJobs) # type: StoreJobs
job = store.save_job(task_id=job_status, process=process, service=provider_id,
inputs=job_inputs, outputs=job_outputs, is_workflow=is_workflow, is_local=is_local,
execute_async=is_execute_async, execute_response=exec_resp, execute_return=exec_return,
execute_async=is_execute_async, execute_wait=wait,
execute_response=exec_resp, execute_return=exec_return,
custom_tags=tags, user_id=user, access=visibility, context=context, subscribers=subscribers,
accept_type=accept_type, accept_language=language)
job.save_log(logger=LOGGER, message=job_message, status=job_status, progress=0)

job.wps_url = wps_url
job = store.update_job(job)
location_url = job.status_url(settings)

return submit_job_dispatch_task(job, headers=req_headers, container=settings)


def submit_job_dispatch_task(
job, # type: Job
*, # force named keyword arguments after
container, # type: AnySettingsContainer
headers=None, # type: AnyHeadersContainer
): # type: (...) -> AnyResponseType
"""
Submits the :term:`Job` to the :mod:`celery` worker with provided parameters.

Assumes that parameters have been pre-fetched, validated, and can be resolved from the :term:`Job`.
"""
db = get_db(container)
store = db.get_store(StoreJobs)

location_url = job.status_url(container)
resp_headers = {"Location": location_url}
resp_headers.update(applied)
req_headers = copy.deepcopy(headers or {})

task_result = None # type: Optional[CeleryResult]
job_pending_created = job.status == Status.CREATED
if not job_pending_created:
wps_url = clean_ows_url(service_url)
wps_url = clean_ows_url(job.wps_url)
task_result = execute_process.delay(job_id=job.id, wps_url=wps_url, headers=headers)
LOGGER.debug("Celery pending task [%s] for job [%s].", task_result.id, job.id)
if not job_pending_created and not is_execute_async:
LOGGER.debug("Celery task requested as sync if it completes before (wait=%ss)", wait)

execute_sync = not job_pending_created and not job.execute_async
if execute_sync:
LOGGER.debug("Celery task requested as sync if it completes before (wait=%ss)", job.execution_wait)
try:
task_result.wait(timeout=wait)
task_result.wait(timeout=job.execution_wait)
except CeleryTaskTimeoutError:
pass
if task_result.ready():
job = store.fetch_by_id(job.id)
# when sync is successful, it must return the results direct instead of status info
# see: https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response
if job.status == Status.SUCCEEDED:
_, _, sync_applied = parse_prefer_header_execute_mode(req_headers, [ExecuteControlOption.SYNC])
if sync_applied:
resp_headers.update(sync_applied)
return get_job_results_response(
job,
request_headers=req_headers,
response_headers=resp_headers,
container=settings,
container=container,
)
# otherwise return the error status
body = job.json(container=settings)
body = job.json(container=container)
body["location"] = location_url
resp = get_job_submission_response(body, resp_headers, error=True)
return resp
else:
LOGGER.debug("Celery task requested as sync took too long to complete (wait=%ss). Continue in async.", wait)
# sync not respected, therefore must drop it
# since both could be provided as alternative preferences, drop only async with limited subset
prefer = get_header("Preference-Applied", headers, pop=True)
_, _, async_applied = parse_prefer_header_execute_mode({"Prefer": prefer}, [ExecuteControlOption.ASYNC])
if async_applied:
resp_headers.update(async_applied)
job.save_log(
logger=LOGGER,
level=logging.WARNING,
message=(
f"Job requested as synchronous execution took too long to complete (wait={job.execution_wait}s). "
"Will resume with asynchronous execution."
)
)
job = store.update_job(job)
execute_sync = False

if not execute_sync:
# either sync was not respected, therefore must drop it, or it was not requested at all
# since both could be provided as alternative preferences, drop only sync with limited subset
_, _, async_applied = parse_prefer_header_execute_mode(req_headers, [ExecuteControlOption.ASYNC])
if async_applied:
resp_headers.update(async_applied)

LOGGER.debug("Celery task submitted to run async.")
body = {
"jobID": job.id,
"processID": job.process,
"providerID": provider_id, # dropped by validator if not applicable
"status": map_status(job_status),
"providerID": job.service, # dropped by validator if not applicable
"status": map_status(job.status),
"location": location_url, # for convenience/backward compatibility, but official is Location *header*
}
resp_headers = update_preference_applied_return_header(job, req_headers, resp_headers)
@@ -893,7 +931,7 @@ def update_job_parameters(job, request):
def validate_job_json(request):
# type: (Request) -> JSON
"""
Validates that the request contains valid :term:`JSON` conctens, but not ncessary valid against expected schema.
Validates that the request contains valid :term:`JSON` contents, but not necessary valid against expected schema.
.. seealso::
:func:`validate_job_schema`
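
The dispatch function above follows a common Celery pattern: submit the task, wait up to 'execution_wait' seconds for a synchronous result, and fall back to asynchronous monitoring on timeout. A standalone sketch of that pattern; the function name and return shape are illustrative, not part of this commit:

```python
from celery.exceptions import TimeoutError as CeleryTaskTimeoutError


def run_sync_with_async_fallback(task, wait_seconds, **task_kwargs):
    """Try a synchronous wait on a Celery task; continue asynchronously on timeout."""
    result = task.delay(**task_kwargs)  # submit to the Celery worker
    try:
        result.wait(timeout=wait_seconds)  # block until completion or timeout
    except CeleryTaskTimeoutError:
        pass  # took too long: leave the task running and report async
    if result.ready():
        return {"mode": "sync", "result": result.get()}  # completed within the wait
    return {"mode": "async", "task_id": result.id}  # caller polls the status later
```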
1 change: 1 addition & 0 deletions weaver/store/base.py
@@ -175,6 +175,7 @@ def save_job(self,
is_workflow=False, # type: bool
is_local=False, # type: bool
execute_async=True, # type: bool
execute_wait=None, # type: Optional[int]
execute_response=None, # type: Optional[AnyExecuteResponse]
execute_return=None, # type: Optional[AnyExecuteReturnPreference]
custom_tags=None, # type: Optional[List[str]]
21 changes: 13 additions & 8 deletions weaver/store/mongodb.py
@@ -4,7 +4,7 @@
import copy
import logging
import uuid
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

import pymongo
from pymongo.collation import Collation
@@ -791,6 +791,7 @@ def save_job(self,
is_workflow=False, # type: bool
is_local=False, # type: bool
execute_async=True, # type: bool
execute_wait=None, # type: Optional[int]
execute_response=None, # type: Optional[AnyExecuteResponse]
execute_return=None, # type: Optional[AnyExecuteReturnPreference]
custom_tags=None, # type: Optional[List[str]]
@@ -812,10 +813,11 @@
tags.append(ProcessType.WORKFLOW)
else:
tags.append(ProcessType.APPLICATION)
if execute_async:
tags.append(ExecuteMode.ASYNC)
if execute_async in [None, False] and execute_wait:
execute_mode = ExecuteMode.SYNC
else:
tags.append(ExecuteMode.SYNC)
execute_mode = ExecuteMode.ASYNC
tags.append(execute_mode)
if not access:
access = Visibility.PRIVATE

@@ -829,7 +831,8 @@
"inputs": inputs,
"outputs": outputs,
"status": map_status(Status.ACCEPTED),
"execute_async": execute_async,
"execution_mode": execute_mode,
"execution_wait": execute_wait,
"execution_response": execute_response,
"execution_return": execute_return,
"is_workflow": is_workflow,
@@ -1047,6 +1050,7 @@ def _find_jobs_grouped(self, pipeline, group_categories):
items = found[0]["items"]
# convert to Job object where applicable, since pipeline result contains (category, jobs, count)
items = [{k: (v if k != "jobs" else [Job(j) for j in v]) for k, v in i.items()} for i in items]
items = cast("JobGroupCategory", items)
if has_provider:
for group_result in items:
group_service = group_result["category"].pop("service", None)
@@ -1147,13 +1151,14 @@ def _apply_status_filter(status):
statuses = set()
for _status in status:
if _status in StatusCategory:
category_status = JOB_STATUS_CATEGORIES[StatusCategory[_status]]
statuses = statuses.union(category_status)
status_cat = StatusCategory.get(_status)
category_statuses = JOB_STATUS_CATEGORIES[status_cat]
statuses = statuses.union(category_statuses)
else:
statuses.add(_status)
search_filters["status"] = {"$in": list(statuses)} # type: ignore
elif status:
search_filters["status"] = status[0]
search_filters["status"] = str(status[0])
return search_filters

@staticmethod
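
The new 'execute_mode' resolution in 'save_job' tags a job as synchronous only when asynchronous execution was not requested and a wait duration is given; every other combination stays asynchronous. Restated as a self-contained helper for illustration (not code from this commit):

```python
from typing import Optional

from weaver.execute import ExecuteMode


def resolve_execute_mode(execute_async, execute_wait):
    # type: (Optional[bool], Optional[int]) -> str
    """Mirror of the mode-tagging logic in 'save_job' (illustrative restatement)."""
    if execute_async in [None, False] and execute_wait:
        return ExecuteMode.SYNC
    return ExecuteMode.ASYNC


assert resolve_execute_mode(False, 30) == ExecuteMode.SYNC     # sync requested with a wait
assert resolve_execute_mode(None, 30) == ExecuteMode.SYNC      # unspecified mode, wait given
assert resolve_execute_mode(True, 30) == ExecuteMode.ASYNC     # async always wins
assert resolve_execute_mode(False, None) == ExecuteMode.ASYNC  # no wait, default async
```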
10 changes: 8 additions & 2 deletions weaver/wps_restapi/jobs/jobs.py
@@ -26,7 +26,13 @@
)
from weaver.processes.constants import JobInputsOutputsSchema
from weaver.processes.convert import convert_input_values_schema, convert_output_params_schema
from weaver.processes.execution import submit_job, submit_job_dispatch_wps, submit_job_handler, update_job_parameters
from weaver.processes.execution import (
submit_job,
submit_job_dispatch_task,
submit_job_dispatch_wps,
submit_job_handler,
update_job_parameters
)
from weaver.processes.utils import get_process
from weaver.processes.wps_package import mask_process_inputs
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory
@@ -285,7 +291,7 @@ def trigger_job_execution(request):
raise_job_bad_status_locked(job, request)
# FIXME: reuse job, adjust function or map parameters from attributes
# FIXME: alt 202 code for accepted on async when triggered this way
return submit_job_handler(request, job)
return submit_job_dispatch_task(job, container=request)


@sd.provider_job_service.get(
1 change: 0 additions & 1 deletion weaver/wps_restapi/jobs/utils.py
@@ -1101,7 +1101,6 @@ def get_job_submission_response(body, headers, error=False):
"Execution should begin when resources are available."
)
body = sd.CreatedJobStatusSchema().deserialize(body)
headers.setdefault("Location", body["location"])
return HTTPCreated(json=body, headerlist=headers)


Expand Down
