diff --git a/tests/closest_location.py b/tests/closest_location.py
new file mode 100644
index 00000000..6197452a
--- /dev/null
+++ b/tests/closest_location.py
@@ -0,0 +1,24 @@
+from ruamel.yaml import YAML
+import pathlib
+from math import cos, asin, sqrt
+
+def distance(lat1, lon1, lat2, lon2):
+    p = 0.017453292519943295  # pi / 180
+    hav = 0.5 - cos((lat2-lat1)*p)/2 + cos(lat1*p)*cos(lat2*p) * (1-cos((lon2-lon1)*p)) / 2
+    return 12742 * asin(sqrt(hav))  # haversine distance in km (12742 km ~ Earth's diameter)
+
+def closest_destination(destinations, objectstore_loc_path, selected_object_store):
+    yaml = YAML(typ='safe')
+    objectstore_file = pathlib.Path(objectstore_loc_path)
+    objectstore = yaml.load(objectstore_file)[selected_object_store]
+    min_dist = 999999.99  # sentinel larger than any distance on Earth (km)
+    for dest in destinations:
+        d_lat = dest.context['latitude']
+        d_lon = dest.context['longitude']
+        o_lat = objectstore['latitude']
+        o_lon = objectstore['longitude']
+        dist = distance(o_lat, o_lon, d_lat, d_lon)
+        if dist < min_dist:
+            min_dist = dist
+            closest_dest = dest
+    return [closest_dest]
\ No newline at end of file
diff --git a/tests/fixtures/job_conf_scenario_locations.yml b/tests/fixtures/job_conf_scenario_locations.yml
new file mode 100644
index 00000000..09a64464
--- /dev/null
+++ b/tests/fixtures/job_conf_scenario_locations.yml
@@ -0,0 +1,33 @@
+runners:
+  local:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+  slurm:
+    load: galaxy.jobs.runners.drmaa:DRMAAJobRunner
+  general_pulsar_1:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+  general_pulsar_2:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+  highmem_pulsar_1:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+  highmem_pulsar_2:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+  training_slurm:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+  training_pulsar:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+  condor:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+
+handling:
+  assign:
+    - db-skip-locked
+
+execution:
+  environments:
+    tpv_dispatcher:
+      runner: dynamic
+      type: python
+      function: map_tool_to_destination
+      rules_module: tpv.rules
+      tpv_config_files:
+        - config/tpv_rules.yml
diff --git a/tests/fixtures/object_store_locations.yml b/tests/fixtures/object_store_locations.yml
new file mode 100644
index 00000000..b135c1c1
--- /dev/null
+++ b/tests/fixtures/object_store_locations.yml
@@ -0,0 +1,18 @@
+object_store_italy_S3_01:
+  latitude: 50.0689816
+  longitude: 19.9070188
+object_store_poland:
+  latitude: 51.9189046
+  longitude: 19.1343786
+object_store_belgium:
+  latitude: 50.5010789
+  longitude: 4.4764595
+object_store_germany:
+  latitude: 51.1642292
+  longitude: 10.4541194
+object_store_france:
+  latitude: 46.71109
+  longitude: 1.7191036
+object_store_australia:
+  latitude: -26.4390917
+  longitude: 133.281323
\ No newline at end of file
diff --git a/tests/fixtures/scenario-locations-api.yml b/tests/fixtures/scenario-locations-api.yml
new file mode 100644
index 00000000..ea091266
--- /dev/null
+++ b/tests/fixtures/scenario-locations-api.yml
@@ -0,0 +1,159 @@
+global:
+  default_inherits: default
+
+tools:
+  default:
+    cores: 2
+    mem: 8
+    env: {}
+    scheduling:
+      require: []
+      prefer:
+        - general
+      accept:
+      reject:
+        - offline
+    rules: []
+    rank: |
+      import requests
+      import json
+      import pathlib
+      from ruamel.yaml import YAML
+      from tpv.commands.test import mock_galaxy
+      from tpv.core.helpers import get_dataset_attributes, concurrent_job_count_for_tool
+      from galaxy import model
+
+      request_data = {}
+
+      # static objectstore info
+      # NOTE: currently object store info is stored in a yaml
+      objectstore_loc_path = "tests/fixtures/object_store_locations.yml"
+      yaml = YAML(typ='safe')
+      objectstore_file = pathlib.Path(objectstore_loc_path)
+      objectstore_dict = yaml.load(objectstore_file)
+      request_data["static_objectstores_info"] = objectstore_dict
+
+      # dataset info
+      request_data["static_dataset_info"] = helpers.get_dataset_attributes(job.input_datasets)
+
+      # static job info
+      if not entity.gpus:
+          gpus = 0
+      request_data["static_job_info"] = {"tool_id": tool.id, "mem": mem, "cores": cores, "gpus": gpus}
+
+      # current destination info
+      dest_info = []
+      for dest in candidate_destinations:
+          dest_dict = {"id": dest.id}
+          dest_dict.update(dest.context)
+          dest_dict["queued_job_count"] = 5  # model.Query(model.Job).filter(model.Job.state == "queued", model.Job.destination_id == dest.dest_name).count()
+          dest_dict["running_job_count"] = 5  # model.Query(model.Job).filter(model.Job.state == "running", model.Job.destination_id == dest.dest_name).count()
+          dest_info.append(dest_dict)
+
+      request_data["current_dest_info"] = dest_info
+
+      print("REQUEST:")
+      print(request_data)
+
+      # Define the URL of the FastAPI application
+      api_url = "http://localhost:8000/process_data"
+
+      # Send a POST request to the API endpoint
+      response = requests.post(api_url, json=request_data)
+      print(response.json())
+
+      # Check if the request was successful (status code 200)
+      if response.status_code == 200:
+          result = response.json()
+          sorted_destination_ids = result["sorted_destinations"]
+      else:
+          print(f"Request failed with status code {response.status_code}: {response.text}")
+
+      sorted_candidate_destinations = sorted(candidate_destinations, key=lambda x: sorted_destination_ids.index(x.id))
+      sorted_candidate_destinations
+  trinity:
+    cores: 2
+    mem: cores * 4
+    gpus: 0
+    env: {}
+    scheduling:
+      require: []
+      prefer:
+        - pulsar
+      accept:
+      reject:
+        - offline
+
+
+roles:
+  ga_admins:
+    scheduling:
+      require:
+        []
+
+
+destinations:
+  pulsar_italy:
+    runner: general_pulsar_1
+    max_accepted_cores: 8
+    max_accepted_mem: 32
+    scheduling:
+      accept:
+        - general
+      require:
+        - pulsar
+    context:
+      latitude: 50.0689816
+      longitude: 19.9070188
+  slurm_poland:
+    runner: slurm
+    max_accepted_cores: 16
+    max_accepted_mem: 64
+    scheduling:
+      accept:
+        - slurm
+    context:
+      latitude: 51.9189046
+      longitude: 19.1343786
+  condor_belgium:
+    runner: condor
+    max_accepted_cores: 16
+    max_accepted_mem: 64
+    scheduling:
+      accept:
+        - condor
+    context:
+      latitude: 50.5010789
+      longitude: 4.4764595
+  slurm_germany:
+    runner: slurm
+    max_accepted_cores: 16
+    max_accepted_mem: 64
+    scheduling:
+      accept:
+        - slurm
+    context:
+      latitude: 51.1642292
+      longitude: 10.4541192
+  condor_france:
+    runner: condor
+    max_accepted_cores: 16
+    max_accepted_mem: 64
+    scheduling:
+      accept:
+        - condor
+    context:
+      latitude: 46.71109
+      longitude: 1.7191036
+  pulsar_australia:
+    runner: general_pulsar_1
+    max_accepted_cores: 8
+    max_accepted_mem: 32
+    scheduling:
+      accept:
+        - general
+      require:
+        - pulsar
+    context:
+      latitude: -26.4390917
+      longitude: 133.281323
diff --git a/tests/fixtures/scenario-locations.yml b/tests/fixtures/scenario-locations.yml
new file mode 100644
index 00000000..1603ac8d
--- /dev/null
+++ b/tests/fixtures/scenario-locations.yml
@@ -0,0 +1,107 @@
+global:
+  default_inherits: default
+
+tools:
+  default:
+    cores: 2
+    mem: 8
+    env: {}
+    scheduling:
+      require: []
+      prefer:
+        - general
+      accept:
+      reject:
+        - offline
+    rules: []
+    rank: |
+      from tests.closest_location import closest_destination
+      objectstore_loc_path = "tests/fixtures/object_store_locations.yml"
+      selected_object_store = "object_store_australia"
+      final_destinations = closest_destination(candidate_destinations, objectstore_loc_path, selected_object_store)
+      final_destinations
+  trinity:
+    cores: 2
+    mem: cores * 4
+    env: {}
+    scheduling:
+      require: []
+      prefer:
+        - pulsar
+      accept:
+      reject:
+        - offline
+
+
+roles:
+  ga_admins:
+    scheduling:
+      require:
+        []
+
+
+destinations:
+  pulsar_italy:
+    runner: general_pulsar_1
+    max_accepted_cores: 8
+    max_accepted_mem: 32
+    scheduling:
+      accept:
+        - general
+      require:
+        - pulsar
+    context:
+      latitude: 50.0689816
+      longitude: 19.9070188
+  slurm_poland:
+    runner: slurm
+    max_accepted_cores: 16
+    max_accepted_mem: 64
+    scheduling:
+      accept:
+        - slurm
+    context:
+      latitude: 51.9189046
+      longitude: 19.1343786
+  condor_belgium:
+    runner: condor
+    max_accepted_cores: 16
+    max_accepted_mem: 64
+    scheduling:
+      accept:
+        - condor
+    context:
+      latitude: 50.5010789
+      longitude: 4.4764595
+  slurm_germany:
+    runner: slurm
+    max_accepted_cores: 16
+    max_accepted_mem: 64
+    scheduling:
+      accept:
+        - slurm
+    context:
+      latitude: 51.1642292
+      longitude: 10.4541192
+  condor_france:
+    runner: condor
+    max_accepted_cores: 16
+    max_accepted_mem: 64
+    scheduling:
+      accept:
+        - condor
+    context:
+      latitude: 46.71109
+      longitude: 1.7191036
+  pulsar_australia:
+    runner: general_pulsar_1
+    max_accepted_cores: 8
+    max_accepted_mem: 32
+    scheduling:
+      accept:
+        - general
+      require:
+        - pulsar
+    context:
+      latitude: -26.4390917
+      longitude: 133.281323
diff --git a/tests/test_scenarios_locations.py b/tests/test_scenarios_locations.py
new file mode 100644
index 00000000..f6ebf9ba
--- /dev/null
+++ b/tests/test_scenarios_locations.py
@@ -0,0 +1,95 @@
+import os
+import time
+import tempfile
+import pathlib
+import responses
+import shutil
+import unittest
+from galaxy.jobs.mapper import JobMappingException
+from tpv.rules import gateway
+from tpv.commands.test import mock_galaxy
+from tpv.core.helpers import get_dataset_attributes
+
+class TestScenarios(unittest.TestCase):
+
+    @staticmethod
+    def _map_to_destination(tool, user, datasets=[], tpv_config_path=None, job_conf=None, app=None):
+        if job_conf:
+            galaxy_app = mock_galaxy.App(job_conf=os.path.join(os.path.dirname(__file__), job_conf))
+        elif app:
+            galaxy_app = app
+        else:
+            galaxy_app = mock_galaxy.App(job_conf=os.path.join(os.path.dirname(__file__), 'fixtures/job_conf.yml'))
+        job = mock_galaxy.Job()
+        for d in datasets:
+            job.add_input_dataset(d)
+        tpv_config = tpv_config_path or os.path.join(os.path.dirname(__file__), 'fixtures/mapping-rules.yml')
+        gateway.ACTIVE_DESTINATION_MAPPER = None
+        return gateway.map_tool_to_destination(galaxy_app, job, tool, user, tpv_config_files=[tpv_config])
+
+    def test_scenario_esg_group_user(self):
+        """
+        Map a trinity job for pulsar-hm2-user with the location-based rank function and check that
+        the destination closest to the selected object store (object_store_australia) is chosen.
+        """
+
+        tool = mock_galaxy.Tool('trinity')
+        user = mock_galaxy.User('pulsar-hm2-user', 'pulsar-hm2-user@unimelb.edu.au', roles=["ga_admins"])
+        datasets = [mock_galaxy.DatasetAssociation("input", mock_galaxy.Dataset("input.fastq",
+                                                                                file_size=1000*1024**3,
+                                                                                object_store_id="files1"))]
+        rules_file = os.path.join(os.path.dirname(__file__), 'fixtures/scenario-locations.yml')
+        destination = self._map_to_destination(tool, user, datasets=datasets, tpv_config_path=rules_file,
+                                               job_conf='fixtures/job_conf_scenario_locations.yml')
+        self.assertEqual(destination.id, "pulsar_australia")
+
+    def test_scenario_esg_group_user_api(self):
+        """
+        Same scenario as above, but destination ranking is delegated to an external API (see
+        scenario-locations-api.yml), which must be reachable at http://localhost:8000/process_data.
+        """
+
+        def create_job(app, destination):
+            sa_session = app.model.context
+
+            u = app.model.User(email="highmemuser@unimelb.edu.au", password="password")
+            j = app.model.Job()
+            j.user = u
+            j.tool_id = "trinity"
+            j.state = "running"
+            j.destination_id = destination
+
+            sa_session.add(j)
+            sa_session.flush()
+
+        def create_job_state_history(app, destination, state):
+            sa_session = app.model.context
+
+            u = app.model.User(email="highmemuser@unimelb.edu.au", password="password")
+            j = app.model.Job()
+            j.user = u
+            j.tool_id = "trinity"
+            j.state = state
+            j.destination_id = destination
+
+            sa_session.add(j)
+            sa_session.flush()
+
+        app = mock_galaxy.App(
+            job_conf=os.path.join(os.path.dirname(__file__), 'fixtures/job_conf_scenario_locations.yml'),
+            create_model=True)
+        create_job(app, "highmem_pulsar_1")
+        create_job(app, "highmem_pulsar_2")
+        create_job(app, "highmem_pulsar_1")
+        create_job(app, "highmem_pulsar_2")
+
+        tool = mock_galaxy.Tool('trinity')
+        user = mock_galaxy.User('pulsar-hm2-user', 'pulsar-hm2-user@unimelb.edu.au', roles=["ga_admins"])
+        datasets = [mock_galaxy.DatasetAssociation("input", mock_galaxy.Dataset("input.fastq",
+                                                                                file_size=1000*1024**3,
+                                                                                object_store_id="object_store_australia"))]
+        rules_file = os.path.join(os.path.dirname(__file__), 'fixtures/scenario-locations-api.yml')
+        destination = self._map_to_destination(tool, user, datasets=datasets, tpv_config_path=rules_file,
+                                               job_conf='fixtures/job_conf_scenario_locations.yml', app=app)
+        self.assertEqual(destination.id, "pulsar_australia")
+
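Note: the rank block in scenario-locations-api.yml assumes an external service at http://localhost:8000/process_data that accepts the posted request_data and answers with a JSON object containing a sorted_destinations list of destination ids. That service is not part of this change; the following is only a minimal sketch of the expected contract, where FastAPI, the module name sketch_process_data_api.py, and the hard-coded target object store are illustrative assumptions.

# sketch_process_data_api.py -- NOT part of this change. Minimal illustration of the
# request/response contract used by the rank block in scenario-locations-api.yml.
# Assumptions: FastAPI is available; the target object store is hard-coded here,
# whereas a real service would derive it from the posted payload.
from math import asin, cos, sqrt

from fastapi import FastAPI

app = FastAPI()


def distance(lat1, lon1, lat2, lon2):
    # same haversine approximation as tests/closest_location.py; returns kilometres
    p = 0.017453292519943295  # pi / 180
    hav = 0.5 - cos((lat2 - lat1) * p) / 2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
    return 12742 * asin(sqrt(hav))


@app.post("/process_data")
def process_data(request_data: dict):
    # request_data carries the keys built in the rank block: static_objectstores_info,
    # static_dataset_info, static_job_info and current_dest_info.
    object_stores = request_data["static_objectstores_info"]
    target = object_stores["object_store_australia"]  # assumption: fixed target object store
    destinations = request_data["current_dest_info"]
    ranked = sorted(
        destinations,
        key=lambda d: distance(target["latitude"], target["longitude"], d["latitude"], d["longitude"]),
    )
    # the rank expression only consumes the ordered list of destination ids
    return {"sorted_destinations": [d["id"] for d in ranked]}

Such a sketch could be served with uvicorn (for example: uvicorn sketch_process_data_api:app --port 8000) while test_scenario_esg_group_user_api runs; with destinations ranked by proximity to the Australian object store, the test should resolve pulsar_australia just like the purely local rank function does.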