From 5f8fe65404c907de470b8322c3c176f808d23eec Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Thu, 6 Jul 2023 10:55:23 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Computational=20backend:=20set=20co?= =?UTF-8?q?ntainer=20limits=20as=20labels=20=E2=9A=A0=EF=B8=8F=20(devops?= =?UTF-8?q?=20checks=20on=20grafana=20dashboards!)=20(#4453)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/models_library/docker.py | 185 ++++++++++++++-- .../models_library/service_settings_labels.py | 75 ++++--- packages/models-library/tests/test_docker.py | 42 ++-- .../utils/rabbitmq.py | 6 +- services/autoscaling/tests/unit/conftest.py | 10 +- .../autoscaling/tests/unit/test_models.py | 16 +- .../tests/unit/test_utils_rabbitmq.py | 22 +- .../schemas/dynamic_services/service.py | 20 +- .../modules/dask_client.py | 4 +- .../dynamic_sidecar/docker_api/__init__.py | 4 - .../dynamic_sidecar/docker_api/_core.py | 203 +++++++++--------- .../dynamic_sidecar/docker_compose_specs.py | 44 +++- .../docker_service_specs/__init__.py | 7 +- .../docker_service_specs/proxy.py | 37 ++-- .../docker_service_specs/settings.py | 66 ++++-- .../docker_service_specs/sidecar.py | 66 +++--- .../scheduler/_core/_events.py | 19 +- .../simcore_service_director_v2/utils/dask.py | 44 ++-- .../tests/unit/test_modules_dask_client.py | 24 ++- ...es_dynamic_sidecar_docker_compose_specs.py | 33 ++- ...test_modules_dynamic_sidecar_docker_api.py | 40 ---- ...es_dynamic_sidecar_docker_service_specs.py | 103 +++++---- .../tests/unit/with_dbs/test_utils_dask.py | 27 ++- .../src/simcore_service_director/producer.py | 131 +++++++---- services/director/tests/test_producer.py | 10 +- 25 files changed, 763 insertions(+), 475 deletions(-) diff --git a/packages/models-library/src/models_library/docker.py b/packages/models-library/src/models_library/docker.py index c0f54d9de04..dcc7e487f54 100644 --- a/packages/models-library/src/models_library/docker.py +++ b/packages/models-library/src/models_library/docker.py @@ -1,11 +1,21 @@ +import contextlib import re +from typing import Any, ClassVar, Final from models_library.generated_models.docker_rest_api import Task from models_library.products import ProductName from models_library.projects import ProjectID from models_library.projects_nodes import NodeID from models_library.users import UserID -from pydantic import BaseModel, ConstrainedStr, Field +from pydantic import ( + BaseModel, + ByteSize, + ConstrainedStr, + Field, + ValidationError, + parse_obj_as, + root_validator, +) from .basic_regex import DOCKER_GENERIC_TAG_KEY_RE, DOCKER_LABEL_KEY_REGEX @@ -21,25 +31,102 @@ class DockerGenericTag(ConstrainedStr): regex: re.Pattern[str] | None = DOCKER_GENERIC_TAG_KEY_RE -class SimcoreServiceDockerLabelKeys(BaseModel): - # NOTE: in a next PR, this should be moved to packages models-library and used - # all over, and aliases should use io.simcore.service.* - # https://github.com/ITISFoundation/osparc-simcore/issues/3638 +_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX: Final[str] = "io.simcore.runtime." +_BACKWARDS_COMPATIBILITY_SIMCORE_RUNTIME_DOCKER_LABELS_MAP: Final[dict[str, str]] = { + "node_id": f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}node-id", + "product_name": f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}product-name", + "project_id": f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}project-id", + "simcore_user_agent": f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}simcore-user-agent", + "study_id": f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}project-id", + "user_id": f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}user-id", + "uuid": f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}node-id", + "mem_limit": f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}memory-limit", + "swarm_stack_name": f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}swarm-stack-name", +} +_UNDEFINED_LABEL_VALUE_STR: Final[str] = "undefined" +_UNDEFINED_LABEL_VALUE_INT: Final[str] = "0" - user_id: UserID = Field(..., alias="user_id") - project_id: ProjectID = Field(..., alias="study_id") - node_id: NodeID = Field(..., alias="uuid") - product_name: ProductName = "opsarc" - simcore_user_agent: str = "" +def to_simcore_runtime_docker_label_key(key: str) -> DockerLabelKey: + return DockerLabelKey( + f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}{key.replace('_', '-').lower()}" + ) - def to_docker_labels(self) -> dict[str, str]: + +class StandardSimcoreDockerLabels(BaseModel): + """ + Represents the standard label on oSparc created containers (not yet services) + In order to create this object in code, please use construct() method! + """ + + user_id: UserID = Field(..., alias=f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}user-id") + project_id: ProjectID = Field( + ..., alias=f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}project-id" + ) + node_id: NodeID = Field(..., alias=f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}node-id") + + product_name: ProductName = Field( + ..., alias=f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}product-name" + ) + simcore_user_agent: str = Field( + ..., alias=f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}simcore-user-agent" + ) + + swarm_stack_name: str = Field( + ..., alias=f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}swarm-stack-name" + ) + + memory_limit: ByteSize = Field( + ..., alias=f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}memory-limit" + ) + cpu_limit: float = Field( + ..., alias=f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}cpu-limit" + ) + + @root_validator(pre=True) + @classmethod + def _backwards_compatibility(cls, values: dict[str, Any]) -> dict[str, Any]: + # NOTE: this is necessary for dy-sidecar and legacy service until they are adjusted + if mapped_values := { + _BACKWARDS_COMPATIBILITY_SIMCORE_RUNTIME_DOCKER_LABELS_MAP[k]: v + for k, v in values.items() + if k in _BACKWARDS_COMPATIBILITY_SIMCORE_RUNTIME_DOCKER_LABELS_MAP + }: + # these values were sometimes omitted, so let's provide some defaults + for key in ["product-name", "simcore-user-agent", "swarm-stack-name"]: + mapped_values.setdefault( + f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}{key}", + _UNDEFINED_LABEL_VALUE_STR, + ) + + mapped_values.setdefault( + f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}memory-limit", + _UNDEFINED_LABEL_VALUE_INT, + ) + + def _convert_nano_cpus_to_cpus(nano_cpu: str) -> str: + with contextlib.suppress(ValidationError): + return f"{parse_obj_as(float, nano_cpu) / (1.0*10**9):.2f}" + return _UNDEFINED_LABEL_VALUE_INT + + mapped_values.setdefault( + f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}cpu-limit", + _convert_nano_cpus_to_cpus( + values.get("nano_cpus_limit", _UNDEFINED_LABEL_VALUE_INT) + ), + ) + return mapped_values + return values + + def to_simcore_runtime_docker_labels(self) -> dict[DockerLabelKey, str]: """returns a dictionary of strings as required by docker""" - std_export = self.dict(by_alias=True) - return {k: f"{v}" for k, v in sorted(std_export.items())} + return { + to_simcore_runtime_docker_label_key(k): f"{v}" + for k, v in sorted(self.dict().items()) + } @classmethod - def from_docker_task(cls, docker_task: Task) -> "SimcoreServiceDockerLabelKeys": + def from_docker_task(cls, docker_task: Task) -> "StandardSimcoreDockerLabels": assert docker_task.Spec # nosec assert docker_task.Spec.ContainerSpec # nosec task_labels = docker_task.Spec.ContainerSpec.Labels or {} @@ -47,3 +134,73 @@ def from_docker_task(cls, docker_task: Task) -> "SimcoreServiceDockerLabelKeys": class Config: allow_population_by_field_name = True + schema_extra: ClassVar[dict[str, Any]] = { + "examples": [ + # legacy service labels + { + "study_id": "29f393fc-1410-47b3-b4b9-61dfce21a2a6", + "swarm_stack_name": "devel-simcore", + "user_id": "5", + "uuid": "1f963626-66e1-43f1-a777-33955c08b909", + }, + # legacy container labels + { + "mem_limit": "1073741824", + "nano_cpus_limit": "4000000000", + "node_id": "1f963626-66e1-43f1-a777-33955c08b909", + "simcore_user_agent": "puppeteer", + "study_id": "29f393fc-1410-47b3-b4b9-61dfce21a2a6", + "swarm_stack_name": "devel-simcore", + "user_id": "5", + }, + # dy-sidecar service labels + { + "study_id": "29f393fc-1410-47b3-b4b9-61dfce21a2a6", + "swarm_stack_name": "devel-simcore", + "user_id": "5", + "uuid": "1f963626-66e1-43f1-a777-33955c08b909", + }, + # dy-sidecar container labels + { + "mem_limit": "1073741824", + "nano_cpus_limit": "4000000000", + "study_id": "29f393fc-1410-47b3-b4b9-61dfce21a2a6", + "user_id": "5", + "uuid": "1f963626-66e1-43f1-a777-33955c08b909", + }, + # dy-proxy service labels + { + "dynamic-type": "dynamic-sidecar", + "study_id": "29f393fc-1410-47b3-b4b9-61dfce21a2a6", + "swarm_stack_name": "devel-simcore", + "type": "dependency-v2", + "user_id": "5", + "uuid": "1f963626-66e1-43f1-a777-33955c08b909", + }, + # dy-proxy container labels + { + "study_id": "29f393fc-1410-47b3-b4b9-61dfce21a2a6", + "user_id": "5", + "uuid": "1f963626-66e1-43f1-a777-33955c08b909", + }, + # dy-sidecar user-services labels + { + "product_name": "osparc", + "simcore_user_agent": "puppeteer", + "study_id": "29f393fc-1410-47b3-b4b9-61dfce21a2a6", + "user_id": "5", + "uuid": "1f963626-66e1-43f1-a777-33955c08b909", + }, + # modern both dynamic-sidecar services and computational services + { + "io.simcore.runtime.cpu-limit": "2.4", + "io.simcore.runtime.memory-limit": "1073741824", + "io.simcore.runtime.node-id": "1f963626-66e1-43f1-a777-33955c08b909", + "io.simcore.runtime.product-name": "osparc", + "io.simcore.runtime.project-id": "29f393fc-1410-47b3-b4b9-61dfce21a2a6", + "io.simcore.runtime.simcore-user-agent": "puppeteer", + "io.simcore.runtime.swarm-stack-name": "devel-osparc", + "io.simcore.runtime.user-id": "5", + }, + ] + } diff --git a/packages/models-library/src/models_library/service_settings_labels.py b/packages/models-library/src/models_library/service_settings_labels.py index f3ab6cd763e..8e5df399288 100644 --- a/packages/models-library/src/models_library/service_settings_labels.py +++ b/packages/models-library/src/models_library/service_settings_labels.py @@ -2,10 +2,11 @@ import json import re +from collections.abc import Generator from enum import Enum from functools import cached_property from pathlib import Path -from typing import Any, Final, Iterator, Literal, TypeAlias +from typing import Any, ClassVar, Final, Literal, TypeAlias from pydantic import ( BaseModel, @@ -57,7 +58,7 @@ class ContainerSpec(BaseModel): ) class Config(_BaseConfig): - schema_extra = { + schema_extra: ClassVar[dict[str, Any]] = { "examples": [ {"Command": ["executable"]}, {"Command": ["executable", "subcommand"]}, @@ -102,7 +103,7 @@ def ensure_backwards_compatible_setting_type(cls, v): return v class Config(_BaseConfig): - schema_extra = { + schema_extra: ClassVar[dict[str, Any]] = { "examples": [ # constraints { @@ -138,8 +139,8 @@ class Config(_BaseConfig): "value": [ { "ReadOnly": True, - "Source": "/tmp/.X11-unix", # nosec - "Target": "/tmp/.X11-unix", # nosec + "Source": "/tmp/.X11-unix", # nosec # noqa: S108 + "Target": "/tmp/.X11-unix", # nosec # noqa: S108 "Type": "bind", } ], @@ -201,27 +202,24 @@ def validate_volume_limits(cls, v, values) -> str | None: try: parse_obj_as(ByteSize, size_str) except ValidationError as e: - raise ValueError( - f"Provided size='{size_str}' contains invalid charactes: {str(e)}" - ) from e + msg = f"Provided size='{size_str}' contains invalid charactes: {e!s}" + raise ValueError(msg) from e inputs_path: Path | None = values.get("inputs_path") outputs_path: Path | None = values.get("outputs_path") state_paths: list[Path] | None = values.get("state_paths") path = Path(path_str) if not ( - path == inputs_path - or path == outputs_path + path in (inputs_path, outputs_path) or (state_paths is not None and path in state_paths) ): - raise ValueError( - f"{path=} not found in {inputs_path=}, {outputs_path=}, {state_paths=}" - ) + msg = f"path={path!r} not found in inputs_path={inputs_path!r}, outputs_path={outputs_path!r}, state_paths={state_paths!r}" + raise ValueError(msg) return v class Config(_BaseConfig): - schema_extra = { + schema_extra: ClassVar[dict[str, Any]] = { "examples": [ { "outputs_path": "/tmp/outputs", # nosec @@ -276,7 +274,8 @@ def lower_less_than_upper(cls, v, values) -> PortInt: upper = v lower: PortInt | None = values.get("lower") if lower is None or lower >= upper: - raise ValueError(f"Condition not satisfied: {lower=} < {upper=}") + msg = f"Condition not satisfied: lower={lower!r} < upper={upper!r}" + raise ValueError(msg) return v @@ -288,7 +287,7 @@ class DNSResolver(BaseModel): class Config(_BaseConfig): extra = Extra.allow - schema_extra = { + schema_extra: ClassVar[dict[str, Any]] = { "examples": [ {"address": "1.1.1.1", "port": 53}, # NOSONAR {"address": "ns1.example.com", "port": 53}, @@ -308,9 +307,9 @@ class NATRule(BaseModel): description="specify a DNS resolver address and port", ) - def iter_tcp_ports(self) -> Iterator[PortInt]: + def iter_tcp_ports(self) -> Generator[PortInt, None, None]: for port in self.tcp_ports: - if type(port) == _PortRange: + if isinstance(port, _PortRange): yield from range(port.lower, port.upper + 1) else: yield port @@ -383,13 +382,11 @@ def needs_dynamic_sidecar(self) -> bool: def compose_spec_requires_container_http_entry(cls, v, values) -> str | None: v = None if v == "" else v if v is None and values.get("compose_spec") is not None: - raise ValueError( - "Field `container_http_entry` must be defined but is missing" - ) + msg = "Field `container_http_entry` must be defined but is missing" + raise ValueError(msg) if v is not None and values.get("compose_spec") is None: - raise ValueError( - "`container_http_entry` not allowed if `compose_spec` is missing" - ) + msg = "`container_http_entry` not allowed if `compose_spec` is missing" + raise ValueError(msg) return v @validator("containers_allowed_outgoing_permit_list") @@ -402,16 +399,14 @@ def _containers_allowed_outgoing_permit_list_in_compose_spec(cls, v, values): if compose_spec is None: keys = set(v.keys()) if len(keys) != 1 or DEFAULT_SINGLE_SERVICE_NAME not in keys: - raise ValueError( - f"Expected only one entry '{DEFAULT_SINGLE_SERVICE_NAME}' not '{keys.pop()}'" - ) + err_msg = f"Expected only one entry '{DEFAULT_SINGLE_SERVICE_NAME}' not '{keys.pop()}'" + raise ValueError(err_msg) else: containers_in_compose_spec = set(compose_spec["services"].keys()) - for container in v.keys(): + for container in v: if container not in containers_in_compose_spec: - raise ValueError( - f"Trying to permit list {container=} which was not found in {compose_spec=}" - ) + err_msg = f"Trying to permit list {container=} which was not found in {compose_spec=}" + raise ValueError(err_msg) return v @@ -424,14 +419,16 @@ def _containers_allowed_outgoing_internet_in_compose_spec(cls, v, values): compose_spec: dict | None = values.get("compose_spec") if compose_spec is None: if {DEFAULT_SINGLE_SERVICE_NAME} != v: - raise ValueError( + err_msg = ( f"Expected only 1 entry '{DEFAULT_SINGLE_SERVICE_NAME}' not '{v}'" ) + raise ValueError(err_msg) else: containers_in_compose_spec = set(compose_spec["services"].keys()) for container in v: if container not in containers_in_compose_spec: - raise ValueError(f"{container=} not found in {compose_spec=}") + err_msg = f"{container=} not found in {compose_spec=}" + raise ValueError(err_msg) return v @root_validator @@ -442,9 +439,10 @@ def not_allowed_in_both_specs(cls, values): "containers_allowed_outgoing_permit_list", } if match_keys & set(values.keys()) != match_keys: - raise ValueError( + err_msg = ( f"Expected the following keys {match_keys} to be present {values=}" ) + raise ValueError(err_msg) containers_allowed_outgoing_internet = values[ "containers_allowed_outgoing_internet" @@ -462,11 +460,12 @@ def not_allowed_in_both_specs(cls, values): containers_allowed_outgoing_permit_list.keys() ) if len(common_containers) > 0: - raise ValueError( + err_msg = ( f"Not allowed {common_containers=} detected between " "`containers-allowed-outgoing-permit-list` and " "`containers-allowed-outgoing-internet`." ) + raise ValueError(err_msg) return values @@ -490,7 +489,7 @@ class SimcoreServiceLabels(DynamicSidecarServiceLabels): spec will be generated before starting the service. """ - settings: Json[SimcoreServiceSettingsLabel] = Field( + settings: Json[SimcoreServiceSettingsLabel] = Field( # type: ignore default_factory=dict, alias="simcore.service.settings", description=( @@ -502,7 +501,7 @@ class SimcoreServiceLabels(DynamicSidecarServiceLabels): class Config(_BaseConfig): extra = Extra.allow - schema_extra = { + schema_extra: ClassVar[dict[str, Any]] = { "examples": [ # WARNING: do not change order. Used in tests! # legacy service @@ -546,7 +545,7 @@ class Config(_BaseConfig): "init": True, "environment": ["DISPLAY=${DISPLAY}"], "volumes": [ - "/tmp/.X11-unix:/tmp/.X11-unix" # nosec + "/tmp/.X11-unix:/tmp/.X11-unix" # nosec # noqa: S108 ], }, }, diff --git a/packages/models-library/tests/test_docker.py b/packages/models-library/tests/test_docker.py index 066502c03a0..c5f55adce1c 100644 --- a/packages/models-library/tests/test_docker.py +++ b/packages/models-library/tests/test_docker.py @@ -8,9 +8,10 @@ import pytest from faker import Faker from models_library.docker import ( + _SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX, DockerGenericTag, DockerLabelKey, - SimcoreServiceDockerLabelKeys, + StandardSimcoreDockerLabels, ) from pydantic import ValidationError, parse_obj_as @@ -107,31 +108,20 @@ def test_docker_generic_tag(image_name: str, valid: bool): @pytest.mark.parametrize( "obj_data", - [ - pytest.param( - { - "user_id": _faker.pyint(), - "project_id": _faker.uuid4(), - "node_id": _faker.uuid4(), - }, - id="parse_existing_service_labels", - ), - pytest.param( - { - "user_id": _faker.pyint(), - "project_id": _faker.uuid4(), - "node_id": _faker.uuid4(), - "product": "test_p", - "simcore_user_agent": "a-test-puppet", - }, - id="parse_new_service_labels", - ), - ], + StandardSimcoreDockerLabels.Config.schema_extra["examples"], + ids=str, ) def test_simcore_service_docker_label_keys(obj_data: dict[str, Any]): - simcore_service_docker_label_keys = SimcoreServiceDockerLabelKeys.parse_obj( - obj_data + simcore_service_docker_label_keys = StandardSimcoreDockerLabels.parse_obj(obj_data) + exported_dict = simcore_service_docker_label_keys.to_simcore_runtime_docker_labels() + assert all( + isinstance(v, str) for v in exported_dict.values() + ), "docker labels must be strings!" + assert all( + key.startswith(_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX) for key in exported_dict + ) + re_imported_docker_label_keys = parse_obj_as( + StandardSimcoreDockerLabels, exported_dict ) - exported_dict = simcore_service_docker_label_keys.to_docker_labels() - assert all(isinstance(v, str) for v in exported_dict.values()) - assert parse_obj_as(SimcoreServiceDockerLabelKeys, exported_dict) + assert re_imported_docker_label_keys + assert simcore_service_docker_label_keys == re_imported_docker_label_keys diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py b/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py index 5e5a11fef4a..591733c7554 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py @@ -3,7 +3,7 @@ import logging from fastapi import FastAPI -from models_library.docker import SimcoreServiceDockerLabelKeys +from models_library.docker import StandardSimcoreDockerLabels from models_library.generated_models.docker_rest_api import Task from models_library.rabbitmq_messages import ( LoggerRabbitMessage, @@ -42,7 +42,7 @@ async def progress_tasks_message( async def post_task_progress_message(app: FastAPI, task: Task, progress: float) -> None: with log_catch(logger, reraise=False): - simcore_label_keys = SimcoreServiceDockerLabelKeys.from_docker_task(task) + simcore_label_keys = StandardSimcoreDockerLabels.from_docker_task(task) message = ProgressRabbitMessageNode.construct( node_id=simcore_label_keys.node_id, user_id=simcore_label_keys.user_id, @@ -55,7 +55,7 @@ async def post_task_progress_message(app: FastAPI, task: Task, progress: float) async def post_task_log_message(app: FastAPI, task: Task, log: str, level: int) -> None: with log_catch(logger, reraise=False): - simcore_label_keys = SimcoreServiceDockerLabelKeys.from_docker_task(task) + simcore_label_keys = StandardSimcoreDockerLabels.from_docker_task(task) message = LoggerRabbitMessage.construct( node_id=simcore_label_keys.node_id, user_id=simcore_label_keys.user_id, diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 0ce5651b043..8eab3f9db9e 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -21,7 +21,7 @@ from faker import Faker from fakeredis.aioredis import FakeRedis from fastapi import FastAPI -from models_library.docker import DockerLabelKey, SimcoreServiceDockerLabelKeys +from models_library.docker import DockerLabelKey, StandardSimcoreDockerLabels from models_library.generated_models.docker_rest_api import ( Availability, Node, @@ -287,13 +287,13 @@ async def create_service( docker_swarm: None, faker: Faker, ) -> AsyncIterator[ - Callable[[dict[str, Any], dict[str, str] | None], Awaitable[Service]] + Callable[[dict[str, Any], dict[DockerLabelKey, str] | None], Awaitable[Service]] ]: created_services = [] async def _creator( task_template: dict[str, Any], - labels: dict[str, str] | None = None, + labels: dict[DockerLabelKey, str] | None = None, wait_for_service_state="running", ) -> Service: service_name = f"pytest_{faker.pystr()}" @@ -613,8 +613,8 @@ def host_memory_total() -> ByteSize: @pytest.fixture def osparc_docker_label_keys( faker: Faker, -) -> SimcoreServiceDockerLabelKeys: - return SimcoreServiceDockerLabelKeys.parse_obj( +) -> StandardSimcoreDockerLabels: + return StandardSimcoreDockerLabels.parse_obj( dict(user_id=faker.pyint(), project_id=faker.uuid4(), node_id=faker.uuid4()) ) diff --git a/services/autoscaling/tests/unit/test_models.py b/services/autoscaling/tests/unit/test_models.py index 77b31779ccc..fdb9362591a 100644 --- a/services/autoscaling/tests/unit/test_models.py +++ b/services/autoscaling/tests/unit/test_models.py @@ -7,7 +7,7 @@ import aiodocker import pytest -from models_library.docker import SimcoreServiceDockerLabelKeys +from models_library.docker import DockerLabelKey, StandardSimcoreDockerLabels from models_library.generated_models.docker_rest_api import Service, Task from pydantic import ByteSize, ValidationError, parse_obj_as from simcore_service_autoscaling.models import Resources @@ -107,17 +107,21 @@ async def test_get_simcore_service_docker_labels_from_task_with_missing_labels_r assert service_tasks assert len(service_tasks) == 1 with pytest.raises(ValidationError): - SimcoreServiceDockerLabelKeys.from_docker_task(service_tasks[0]) + StandardSimcoreDockerLabels.from_docker_task(service_tasks[0]) async def test_get_simcore_service_docker_labels( async_docker_client: aiodocker.Docker, - create_service: Callable[[dict[str, Any], dict[str, str], str], Awaitable[Service]], + create_service: Callable[ + [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service] + ], task_template: dict[str, Any], - osparc_docker_label_keys: SimcoreServiceDockerLabelKeys, + osparc_docker_label_keys: StandardSimcoreDockerLabels, ): service_with_labels = await create_service( - task_template, osparc_docker_label_keys.to_docker_labels(), "running" + task_template, + osparc_docker_label_keys.to_simcore_runtime_docker_labels(), + "running", ) assert service_with_labels.Spec service_tasks = parse_obj_as( @@ -128,7 +132,7 @@ async def test_get_simcore_service_docker_labels( ) assert service_tasks assert len(service_tasks) == 1 - task_ownership = SimcoreServiceDockerLabelKeys.from_docker_task(service_tasks[0]) + task_ownership = StandardSimcoreDockerLabels.from_docker_task(service_tasks[0]) assert task_ownership assert task_ownership.user_id == osparc_docker_label_keys.user_id assert task_ownership.project_id == osparc_docker_label_keys.project_id diff --git a/services/autoscaling/tests/unit/test_utils_rabbitmq.py b/services/autoscaling/tests/unit/test_utils_rabbitmq.py index e7042d68032..e5350a393b0 100644 --- a/services/autoscaling/tests/unit/test_utils_rabbitmq.py +++ b/services/autoscaling/tests/unit/test_utils_rabbitmq.py @@ -9,7 +9,7 @@ import aiodocker from faker import Faker from fastapi import FastAPI -from models_library.docker import DockerLabelKey, SimcoreServiceDockerLabelKeys +from models_library.docker import DockerLabelKey, StandardSimcoreDockerLabels from models_library.generated_models.docker_rest_api import Service, Task from models_library.rabbitmq_messages import ( LoggerRabbitMessage, @@ -54,9 +54,11 @@ async def test_post_task_log_message( rabbitmq_client: Callable[[str], RabbitMQClient], mocker: MockerFixture, async_docker_client: aiodocker.Docker, - create_service: Callable[[dict[str, Any], dict[str, str], str], Awaitable[Service]], + create_service: Callable[ + [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service] + ], task_template: dict[str, Any], - osparc_docker_label_keys: SimcoreServiceDockerLabelKeys, + osparc_docker_label_keys: StandardSimcoreDockerLabels, faker: Faker, ): mocked_message_handler = mocker.AsyncMock(return_value=True) @@ -68,7 +70,9 @@ async def test_post_task_log_message( ) service_with_labels = await create_service( - task_template, osparc_docker_label_keys.to_docker_labels(), "running" + task_template, + osparc_docker_label_keys.to_simcore_runtime_docker_labels(), + "running", ) assert service_with_labels.Spec service_tasks = parse_obj_as( @@ -140,9 +144,11 @@ async def test_post_task_progress_message( rabbitmq_client: Callable[[str], RabbitMQClient], mocker: MockerFixture, async_docker_client: aiodocker.Docker, - create_service: Callable[[dict[str, Any], dict[str, str], str], Awaitable[Service]], + create_service: Callable[ + [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service] + ], task_template: dict[str, Any], - osparc_docker_label_keys: SimcoreServiceDockerLabelKeys, + osparc_docker_label_keys: StandardSimcoreDockerLabels, faker: Faker, ): mocked_message_handler = mocker.AsyncMock(return_value=True) @@ -154,7 +160,9 @@ async def test_post_task_progress_message( ) service_with_labels = await create_service( - task_template, osparc_docker_label_keys.to_docker_labels(), "running" + task_template, + osparc_docker_label_keys.to_simcore_runtime_docker_labels(), + "running", ) assert service_with_labels.Spec service_tasks = parse_obj_as( diff --git a/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/service.py b/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/service.py index 6368b479f7b..fbd32f78d88 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/service.py +++ b/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/service.py @@ -1,7 +1,7 @@ from enum import Enum, unique from functools import cached_property, lru_cache, total_ordering from pathlib import Path -from typing import Any +from typing import Any, ClassVar from models_library.basic_types import PortInt from models_library.projects import ProjectID @@ -10,20 +10,6 @@ from models_library.users import UserID from pydantic import BaseModel, Field -from ....meta import API_VTAG - - -@unique -class ServiceType(Enum): - """ - Used to filter out services spawned by this service in the stack. - The version was added to distinguish from the ones spawned by director-v0 - These values are attached to the dynamic-sidecar and its relative proxy. - """ - - MAIN = f"main-{API_VTAG}" - DEPENDENCY = f"dependency-{API_VTAG}" - class CommonServiceDetails(BaseModel): key: DynamicServiceKey = Field( @@ -56,7 +42,7 @@ class ServiceDetails(CommonServiceDetails): class Config: allow_population_by_field_name = True - schema_extra: dict[str, Any] = { + schema_extra: ClassVar[dict[str, Any]] = { "example": { "key": "simcore/services/dynamic/3dviewer", "version": "2.4.5", @@ -172,7 +158,7 @@ def from_scheduler_data( class Config(ServiceDetails.Config): keep_untouched = (cached_property,) - schema_extra = { + schema_extra: ClassVar[dict[str, Any]] = { "examples": [ { "boot_type": "V0", diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index 4a76d121637..3142a22d3b5 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -303,12 +303,12 @@ def _comp_sidecar_fct( # noqa: PLR0913 node_id, file_link_type=self.tasks_file_link_type, ) - task_labels = await compute_task_labels( - self.app, + task_labels = compute_task_labels( user_id=user_id, project_id=project_id, node_id=node_id, metadata=metadata, + node_requirements=node_image.node_requirements, ) task_envs = await compute_task_envs( self.app, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/__init__.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/__init__.py index 2fd7fa99031..99cab1542c5 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/__init__.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/__init__.py @@ -9,10 +9,8 @@ get_or_create_networks_ids, get_projects_networks_containers, get_swarm_network, - inspect_service, is_dynamic_sidecar_stack_missing, is_sidecar_running, - list_dynamic_sidecar_services, remove_dynamic_sidecar_network, remove_dynamic_sidecar_stack, try_to_remove_network, @@ -31,10 +29,8 @@ "get_or_create_networks_ids", "get_projects_networks_containers", "get_swarm_network", - "inspect_service", "is_sidecar_running", "is_dynamic_sidecar_stack_missing", - "list_dynamic_sidecar_services", "remove_dynamic_sidecar_network", "remove_dynamic_sidecar_stack", "remove_pending_volume_removal_services", diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py index c667c7d2cea..6d5e133cdf7 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py @@ -1,17 +1,19 @@ import json import logging -from typing import Any, Mapping +import warnings +from collections.abc import Mapping +from typing import Any, Final import aiodocker from aiodocker.utils import clean_filters, clean_map -from fastapi import status from fastapi.encoders import jsonable_encoder from models_library.aiodocker_api import AioDockerServiceSpec +from models_library.docker import to_simcore_runtime_docker_label_key from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID -from models_library.users import UserID from servicelib.json_serialization import json_dumps from servicelib.utils import logged_gather +from starlette import status from tenacity import TryAgain, retry from tenacity._asyncio import AsyncRetrying from tenacity.retry import retry_if_exception_type @@ -23,12 +25,7 @@ DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL, DYNAMIC_SIDECAR_SERVICE_PREFIX, ) -from ....models.schemas.dynamic_services import ( - SchedulerData, - ServiceId, - ServiceState, - ServiceType, -) +from ....models.schemas.dynamic_services import SchedulerData, ServiceId, ServiceState from ....models.schemas.dynamic_services.scheduler import NetworkId from ....utils.dict_utils import get_leaf_key_paths, nested_update from ..docker_states import TASK_STATES_RUNNING, extract_task_state @@ -57,10 +54,8 @@ async def get_swarm_network(dynamic_sidecar_settings: DynamicSidecarSettings) -> x for x in all_networks if "swarm" in x["Scope"] and network_name in x["Name"] ] if not networks or len(networks) > 1: - raise DynamicSidecarError( - f"Swarm network name (searching for '*{network_name}*') is not configured." - f"Found following networks: {networks}" - ) + msg = f"Swarm network name (searching for '*{network_name}*') is not configured.Found following networks: {networks}" + raise DynamicSidecarError(msg) return networks[0] @@ -74,7 +69,7 @@ async def create_network(network_config: dict[str, Any]) -> NetworkId: network_name = network_config["Name"] # make sure the current error being trapped is network dose not exit if f"network with name {network_name} already exists" not in str(e): - raise e + raise # Fetch network name if network already exists. # The environment is trashed because there seems to be an issue @@ -89,9 +84,8 @@ async def create_network(network_config: dict[str, Any]) -> NetworkId: # finally raise an error if a network cannot be spawned # pylint: disable=raise-missing-from - raise DynamicSidecarError( - f"Could not create or recover a network ID for {network_config}" - ) + msg = f"Could not create or recover a network ID for {network_config}" + raise DynamicSidecarError(msg) from e async def create_service_and_get_id( @@ -113,33 +107,22 @@ async def create_service_and_get_id( ) if "ID" not in service_start_result: - raise DynamicSidecarError( - f"Error while starting service: {str(service_start_result)}" - ) + msg = f"Error while starting service: {service_start_result!s}" + raise DynamicSidecarError(msg) service_id: ServiceId = service_start_result["ID"] return service_id -async def inspect_service(service_id: str) -> dict[str, Any]: - async with docker_client() as client: - inspect_result: dict[str, Any] = await client.services.inspect(service_id) - return inspect_result - - async def get_dynamic_sidecars_to_observe( dynamic_sidecar_settings: DynamicSidecarSettings, ) -> list[SchedulerData]: """called when scheduler is started to discover new services to observe""" async with docker_client() as client: - running_dynamic_sidecar_services: list[ - Mapping[str, Any] - ] = await client.services.list( - filters={ - "label": [ - f"swarm_stack_name={dynamic_sidecar_settings.SWARM_STACK_NAME}" - ], - "name": [f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}"], - } + running_dynamic_sidecar_services = await _list_docker_services( + client, + node_id=None, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, + return_only_sidecars=True, ) return [ SchedulerData.from_service_inspect(x) for x in running_dynamic_sidecar_services @@ -149,22 +132,22 @@ async def get_dynamic_sidecars_to_observe( async def _get_service_latest_task(service_id: str) -> Mapping[str, Any]: try: async with docker_client() as client: - running_services = await client.tasks.list( + service_associated_tasks = await client.tasks.list( filters={"service": f"{service_id}"} ) - if not running_services: - raise DockerServiceNotFoundError(service_id=service_id) + if not service_associated_tasks: + raise DockerServiceNotFoundError(service_id=service_id) # noqa: TRY301 # The service might have more then one task because the # previous might have died out. # Only interested in the latest task as only one task per # service will be running. - sorted_tasks = sorted(running_services, key=lambda task: task["UpdatedAt"]) # type: ignore + sorted_tasks = sorted(service_associated_tasks, key=lambda task: task["UpdatedAt"]) # type: ignore last_task: Mapping[str, Any] = sorted_tasks[-1] return last_task except GenericDockerError as err: - if err.original_exception.status == 404: + if err.original_exception.status == status.HTTP_404_NOT_FOUND: raise DockerServiceNotFoundError(service_id=service_id) from err raise @@ -208,10 +191,8 @@ async def _get_task_data_when_service_running(service_id: str) -> Mapping[str, A docker_node_id: None | str = task.get("NodeID", None) if not docker_node_id: - raise DynamicSidecarError( - f"Could not find an assigned NodeID for service_id={service_id}. " - f"Last task inspect result: {task}" - ) + msg = f"Could not find an assigned NodeID for service_id={service_id}. Last task inspect result: {task}" + raise DynamicSidecarError(msg) return docker_node_id @@ -222,58 +203,90 @@ async def get_dynamic_sidecar_state(service_id: str) -> tuple[ServiceState, str] return service_state, message -async def _get_dynamic_sidecar_stack_services( - node_uuid: NodeID, dynamic_sidecar_settings: DynamicSidecarSettings -) -> list[Mapping]: - filters = { - "label": [ - f"swarm_stack_name={dynamic_sidecar_settings.SWARM_STACK_NAME}", - f"uuid={node_uuid}", - ] - } - async with docker_client() as client: - list_result: list[Mapping] = await client.services.list(filters=filters) - return list_result - - async def is_dynamic_sidecar_stack_missing( node_uuid: NodeID, dynamic_sidecar_settings: DynamicSidecarSettings ) -> bool: """Check if the proxy and the dynamic-sidecar are absent""" - stack_services = await _get_dynamic_sidecar_stack_services( - node_uuid, dynamic_sidecar_settings - ) + async with docker_client() as client: + stack_services = await _list_docker_services( + client, + node_id=node_uuid, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, + return_only_sidecars=False, + ) return len(stack_services) == 0 +_NUM_SIDECAR_STACK_SERVICES: Final[int] = 2 + + async def are_sidecar_and_proxy_services_present( node_uuid: NodeID, dynamic_sidecar_settings: DynamicSidecarSettings ) -> bool: """ The dynamic-sidecar stack always expects to have 2 running services """ - stack_services = await _get_dynamic_sidecar_stack_services( - node_uuid, dynamic_sidecar_settings - ) - if len(stack_services) != 2: + async with docker_client() as client: + stack_services = await _list_docker_services( + client, + node_id=node_uuid, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, + return_only_sidecars=False, + ) + if len(stack_services) != _NUM_SIDECAR_STACK_SERVICES: return False return True +async def _list_docker_services( + client: aiodocker.docker.Docker, + *, + node_id: NodeID | None, + swarm_stack_name: str, + return_only_sidecars: bool, +) -> list[Mapping]: + # NOTE: this is here for backward compatibility when first deploying this change. + # shall be removed after 1-2 releases without issues + # backwards compatibility part + + def _make_filters(*, backwards_compatible: bool) -> Mapping[str, Any]: + filters = { + "label": [ + f"{'swarm_stack_name' if backwards_compatible else to_simcore_runtime_docker_label_key('swarm_stack_name')}={swarm_stack_name}", + ], + } + if node_id: + filters["label"].append( + f"{'uuid' if backwards_compatible else to_simcore_runtime_docker_label_key('node_id')}={node_id}" + ) + if return_only_sidecars: + filters["name"] = [f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}"] + return filters + + warnings.warn( + "After PR#4453 [https://github.com/ITISFoundation/osparc-simcore/pull/4453] reaches" + " production, the backwards compatible code may be removed", + stacklevel=2, + ) + services_list: list[Mapping] = await client.services.list( + filters=_make_filters(backwards_compatible=True) + ) + await client.services.list(filters=_make_filters(backwards_compatible=False)) + return services_list + + async def remove_dynamic_sidecar_stack( node_uuid: NodeID, dynamic_sidecar_settings: DynamicSidecarSettings ) -> None: """Removes all services from the stack, in theory there should only be 2 services""" async with docker_client() as client: - services_to_remove = await client.services.list( - filters={ - "label": [ - f"swarm_stack_name={dynamic_sidecar_settings.SWARM_STACK_NAME}", - f"uuid={node_uuid}", - ] - } + services_to_remove = await _list_docker_services( + client, + node_id=node_uuid, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, + return_only_sidecars=False, ) + if services_to_remove: await logged_gather( *( @@ -299,41 +312,15 @@ async def remove_dynamic_sidecar_network(network_name: str) -> bool: return False -async def list_dynamic_sidecar_services( - dynamic_sidecar_settings: DynamicSidecarSettings, - user_id: UserID | None = None, - project_id: ProjectID | None = None, -) -> list[dict[str, Any]]: - service_filters = { - "label": [ - f"swarm_stack_name={dynamic_sidecar_settings.SWARM_STACK_NAME}", - ], - "name": [f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}"], - } - if user_id is not None: - service_filters["label"].append(f"user_id={user_id}") - if project_id is not None: - service_filters["label"].append(f"study_id={project_id}") - - async with docker_client() as client: - list_result: list[dict[str, Any]] = await client.services.list( - filters=service_filters - ) - return list_result - - async def is_sidecar_running( node_uuid: NodeID, dynamic_sidecar_settings: DynamicSidecarSettings ) -> bool: async with docker_client() as client: - sidecar_service_list = await client.services.list( - filters={ - "label": [ - f"swarm_stack_name={dynamic_sidecar_settings.SWARM_STACK_NAME}", - f"type={ServiceType.MAIN.value}", - f"uuid={node_uuid}", - ] - } + sidecar_service_list = await _list_docker_services( + client, + node_id=node_uuid, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, + return_only_sidecars=True, ) if len(sidecar_service_list) != 1: return False @@ -388,7 +375,7 @@ async def _get_id_from_name(client, network_name: str) -> str: *[_get_id_from_name(client, network) for network in networks] ) - return dict(zip(networks, networks_ids)) + return dict(zip(networks, networks_ids, strict=True)) async def get_projects_networks_containers( @@ -402,7 +389,9 @@ async def get_projects_networks_containers( params = {"filters": clean_filters({"label": [f"project_id={project_id}"]})} filtered_networks = ( # pylint:disable=protected-access - await client.networks.docker._query_json("networks", params=params) + await client.networks.docker._query_json( # noqa: SLF001 + "networks", params=params + ) ) if not filtered_networks: @@ -465,7 +454,7 @@ async def _update_service_spec( include=get_leaf_key_paths(update_in_service_spec), ) - await client._query_json( # pylint: disable=protected-access + await client._query_json( # pylint: disable=protected-access # noqa: SLF001 f"services/{service_id}/update", method="POST", data=json_dumps(clean_map(updated_spec)), @@ -477,7 +466,7 @@ async def _update_service_spec( and "out of sequence" in e.message ): raise TryAgain() from e - raise e + raise async def update_scheduler_data_label(scheduler_data: SchedulerData) -> None: diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py index edb7ead13ff..d1fe61fd984 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py @@ -1,9 +1,9 @@ import logging from copy import deepcopy -from typing import Any, Optional +from typing import Any, Optional, TypedDict from fastapi.applications import FastAPI -from models_library.docker import SimcoreServiceDockerLabelKeys +from models_library.docker import DockerGenericTag, StandardSimcoreDockerLabels from models_library.products import ProductName from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID @@ -21,6 +21,7 @@ ) from models_library.users import UserID from models_library.utils.docker_compose import replace_env_vars_in_compose_spec +from pydantic import ByteSize from servicelib.json_serialization import json_dumps from servicelib.resources import CPU_RESOURCE_LIMIT_KEY, MEM_RESOURCE_LIMIT_KEY from settings_library.docker_registry import RegistrySettings @@ -118,10 +119,16 @@ def _update_paths_mappings( service_content["environment"] = _environment_section.export_as_list(env_vars) +class _AssignedLimits(TypedDict): + cpu: float + memory: int + + def _update_resource_limits_and_reservations( service_resources: ServiceResourcesDict, service_spec: ComposeSpecLabelDict -) -> None: +) -> dict[DockerGenericTag, _AssignedLimits]: # example: '2.3' -> 2 ; '3.7' -> 3 + assigned_limits = {} docker_compose_major_version: int = int(service_spec["version"].split(".")[0]) for spec_service_key, spec in service_spec["services"].items(): if spec_service_key not in service_resources: @@ -186,6 +193,11 @@ def _update_resource_limits_and_reservations( environment.extend(resource_limits) spec["environment"] = environment + assigned_limits[spec_service_key] = _AssignedLimits( + cpu=nano_cpu_limits, memory=int(memory.limit) + ) + return assigned_limits + def _strip_service_quotas(service_spec: ComposeSpecLabelDict): """ @@ -203,18 +215,29 @@ def _update_container_labels( node_id: NodeID, simcore_user_agent: str, product_name: ProductName, + swarm_stack_name: str, + assigned_limits: dict[DockerGenericTag, _AssignedLimits], ) -> None: - for spec in service_spec["services"].values(): + default_limits = _AssignedLimits(memory=0, cpu=0) + for spec_service_key, spec in service_spec["services"].items(): labels: list[str] = spec.setdefault("labels", []) + container_limits: _AssignedLimits = assigned_limits.get( + spec_service_key, default_limits + ) - label_keys = SimcoreServiceDockerLabelKeys( + label_keys = StandardSimcoreDockerLabels.construct( user_id=user_id, - study_id=project_id, - uuid=node_id, + project_id=project_id, + node_id=node_id, simcore_user_agent=simcore_user_agent, product_name=product_name, + swarm_stack_name=swarm_stack_name, + memory_limit=ByteSize(container_limits["memory"]), + cpu_limit=container_limits["cpu"], ) - docker_labels = [f"{k}={v}" for k, v in label_keys.to_docker_labels().items()] + docker_labels = [ + f"{k}={v}" for k, v in label_keys.to_simcore_runtime_docker_labels().items() + ] for docker_label in docker_labels: if docker_label not in labels: @@ -240,6 +263,7 @@ def assemble_spec( project_id: ProjectID, node_id: NodeID, simcore_user_agent: str, + swarm_stack_name: str, ) -> str: """ returns a docker-compose spec used by @@ -285,7 +309,7 @@ def assemble_spec( _update_paths_mappings(service_spec, paths_mapping) - _update_resource_limits_and_reservations( + assigned_limits = _update_resource_limits_and_reservations( service_resources=service_resources, service_spec=service_spec ) @@ -308,6 +332,8 @@ def assemble_spec( node_id=node_id, product_name=product_name, simcore_user_agent=simcore_user_agent, + swarm_stack_name=swarm_stack_name, + assigned_limits=assigned_limits, ) stringified_service_spec: str = replace_env_vars_in_compose_spec( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/__init__.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/__init__.py index 712f9b47d7c..4c831825053 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/__init__.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/__init__.py @@ -1,12 +1,9 @@ from .proxy import get_dynamic_proxy_spec from .settings import merge_settings_before_use, update_service_params_from_settings -from .sidecar import ( - extract_service_port_from_compose_start_spec, - get_dynamic_sidecar_spec, -) +from .sidecar import extract_service_port_service_settings, get_dynamic_sidecar_spec __all__: tuple[str, ...] = ( - "extract_service_port_from_compose_start_spec", + "extract_service_port_service_settings", "get_dynamic_proxy_spec", "get_dynamic_sidecar_spec", "merge_settings_before_use", diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/proxy.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/proxy.py index 90d92dc532c..6f9d90542c3 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/proxy.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/proxy.py @@ -1,14 +1,16 @@ from typing import Any +from models_library.docker import StandardSimcoreDockerLabels from models_library.services_resources import ( CPU_10_PERCENT, CPU_100_PERCENT, MEMORY_50MB, MEMORY_250MB, ) +from pydantic import ByteSize from ....core.settings import DynamicSidecarProxySettings, DynamicSidecarSettings -from ....models.schemas.dynamic_services import SchedulerData, ServiceType +from ....models.schemas.dynamic_services import SchedulerData from ._constants import DOCKER_CONTAINER_SPEC_RESTART_POLICY_DEFAULTS @@ -61,7 +63,6 @@ def get_dynamic_proxy_spec( "labels": { # TODO: let's use a pydantic model with descriptions "io.simcore.zone": f"{dynamic_sidecar_settings.TRAEFIK_SIMCORE_ZONE}", - "swarm_stack_name": dynamic_sidecar_settings.SWARM_STACK_NAME, "traefik.docker.network": swarm_network_name, "traefik.enable": "true", f"traefik.http.middlewares.{scheduler_data.proxy_service_name}-security-headers.headers.customresponseheaders.Content-Security-Policy": f"frame-ancestors {scheduler_data.request_dns} {scheduler_data.node_uuid}.services.{scheduler_data.request_dns}", @@ -79,12 +80,18 @@ def get_dynamic_proxy_spec( f"traefik.http.routers.{scheduler_data.proxy_service_name}.priority": "10", f"traefik.http.routers.{scheduler_data.proxy_service_name}.rule": f"hostregexp(`{scheduler_data.node_uuid}.services.{{host:.+}}`)", f"traefik.http.routers.{scheduler_data.proxy_service_name}.middlewares": f"{dynamic_sidecar_settings.SWARM_STACK_NAME}_gzip@docker, {scheduler_data.proxy_service_name}-security-headers", - "type": ServiceType.DEPENDENCY.value, "dynamic_type": "dynamic-sidecar", # tagged as dynamic service - "study_id": f"{scheduler_data.project_id}", - "user_id": f"{scheduler_data.user_id}", - "uuid": f"{scheduler_data.node_uuid}", # needed for removal when project is closed - }, + } + | StandardSimcoreDockerLabels( + user_id=scheduler_data.user_id, + project_id=scheduler_data.project_id, + node_id=scheduler_data.node_uuid, + product_name=scheduler_data.product_name, + simcore_user_agent=scheduler_data.request_simcore_user_agent, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, + memory_limit=ByteSize(MEMORY_50MB), + cpu_limit=float(CPU_10_PERCENT) / 1e9, + ).to_simcore_runtime_docker_labels(), "name": scheduler_data.proxy_service_name, "networks": [swarm_network_id, dynamic_sidecar_network_id], "task_template": { @@ -93,12 +100,16 @@ def get_dynamic_proxy_spec( "Hosts": [], "Image": f"caddy:{proxy_settings.DYNAMIC_SIDECAR_CADDY_VERSION}", "Init": True, - "Labels": { - # NOTE: these labels get on the tasks and that is also useful to trace - "study_id": f"{scheduler_data.project_id}", - "user_id": f"{scheduler_data.user_id}", - "uuid": f"{scheduler_data.node_uuid}", - }, + "Labels": StandardSimcoreDockerLabels( + user_id=scheduler_data.user_id, + project_id=scheduler_data.project_id, + node_id=scheduler_data.node_uuid, + product_name=scheduler_data.product_name, + simcore_user_agent=scheduler_data.request_simcore_user_agent, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, + memory_limit=ByteSize(MEMORY_50MB), + cpu_limit=float(CPU_10_PERCENT) / 1e9, + ).to_simcore_runtime_docker_labels(), "Command": [ "sh", "-c", diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/settings.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/settings.py index de9c88e1ae7..be929c010db 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/settings.py @@ -3,7 +3,9 @@ from collections import deque from typing import Any, cast +from models_library.basic_types import PortInt from models_library.boot_options import BootOption, EnvVarKey +from models_library.docker import to_simcore_runtime_docker_label_key from models_library.service_settings_labels import ( ComposeSpecLabelDict, SimcoreServiceLabels, @@ -72,8 +74,29 @@ def _parse_env_settings(settings: list[str]) -> dict: return envs +def extract_service_port_from_settings( + labels_service_settings: SimcoreServiceSettingsLabel, +) -> PortInt: + param: SimcoreServiceSettingLabelEntry + for param in labels_service_settings: + # publishing port on the ingress network. + if param.name == "ports" and param.setting_type == "int": # backward comp + return PortInt(param.value) + # REST-API compatible + if ( + param.setting_type == "EndpointSpec" + and "Ports" in param.value + and ( + isinstance(param.value["Ports"], list) + and "TargetPort" in param.value["Ports"][0] + ) + ): + return PortInt(param.value["Ports"][0]["TargetPort"]) + msg = "service port not found!" + raise ValueError(msg) + + # pylint: disable=too-many-branches -# TODO: PC->ANE: i tend to agree with pylint, perhaps we can refactor this together def update_service_params_from_settings( labels_service_settings: SimcoreServiceSettingsLabel, create_service_params: dict[str, Any], @@ -105,22 +128,6 @@ def update_service_params_from_settings( # NOTE: The Docker REST API reads Reservation when actually it's Reservations create_service_params["task_template"]["Resources"].update(param.value) - # publishing port on the ingress network. - elif param.name == "ports" and param.setting_type == "int": # backward comp - create_service_params["labels"]["port"] = create_service_params["labels"][ - "service_port" - ] = str(param.value) - # REST-API compatible - elif param.setting_type == "EndpointSpec": - if "Ports" in param.value: - if ( - isinstance(param.value["Ports"], list) - and "TargetPort" in param.value["Ports"][0] - ): - create_service_params["labels"]["port"] = create_service_params[ - "labels" - ]["service_port"] = str(param.value["Ports"][0]["TargetPort"]) - # placement constraints elif param.name == "constraints": # python-API compatible create_service_params["task_template"]["Placement"][ @@ -148,11 +155,28 @@ def update_service_params_from_settings( ].extend(mount_settings) container_spec = create_service_params["task_template"]["ContainerSpec"] - # set labels for CPU and Memory limits - container_spec["Labels"]["nano_cpus_limit"] = str( - create_service_params["task_template"]["Resources"]["Limits"]["NanoCPUs"] + # set labels for CPU and Memory limits, for both service and container labels + # NOTE: cpu-limit is a float not NanoCPUs!! + container_spec["Labels"][ + f"{to_simcore_runtime_docker_label_key('cpu-limit')}" + ] = str( + float(create_service_params["task_template"]["Resources"]["Limits"]["NanoCPUs"]) + / (1 * 10**9) + ) + create_service_params["labels"][ + f"{to_simcore_runtime_docker_label_key('cpu-limit')}" + ] = str( + float(create_service_params["task_template"]["Resources"]["Limits"]["NanoCPUs"]) + / (1 * 10**9) + ) + container_spec["Labels"][ + f"{to_simcore_runtime_docker_label_key('memory-limit')}" + ] = str( + create_service_params["task_template"]["Resources"]["Limits"]["MemoryBytes"] ) - container_spec["Labels"]["mem_limit"] = str( + create_service_params["labels"][ + f"{to_simcore_runtime_docker_label_key('memory-limit')}" + ] = str( create_service_params["task_template"]["Resources"]["Limits"]["MemoryBytes"] ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py index ea582cb2784..889bc864310 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py @@ -2,27 +2,33 @@ from copy import deepcopy from models_library.aiodocker_api import AioDockerServiceSpec -from models_library.basic_types import BootModeEnum +from models_library.basic_types import BootModeEnum, PortInt +from models_library.docker import ( + StandardSimcoreDockerLabels, + to_simcore_runtime_docker_label_key, +) from models_library.service_settings_labels import SimcoreServiceSettingsLabel -from pydantic import parse_obj_as +from pydantic import ByteSize from servicelib.json_serialization import json_dumps from ....core.settings import AppSettings, DynamicSidecarSettings from ....models.schemas.constants import DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL -from ....models.schemas.dynamic_services import SchedulerData, ServiceType +from ....models.schemas.dynamic_services import SchedulerData from .._namespace import get_compose_namespace from ..volumes import DynamicSidecarVolumesPathsResolver from ._constants import DOCKER_CONTAINER_SPEC_RESTART_POLICY_DEFAULTS -from .settings import update_service_params_from_settings +from .settings import ( + extract_service_port_from_settings, + update_service_params_from_settings, +) log = logging.getLogger(__name__) -def extract_service_port_from_compose_start_spec( - create_service_params: AioDockerServiceSpec, -) -> int: - assert create_service_params.Labels # nosec - return parse_obj_as(int, create_service_params.Labels["service_port"]) +def extract_service_port_service_settings( + settings: SimcoreServiceSettingsLabel, +) -> PortInt: + return extract_service_port_from_settings(settings) def _get_environment_variables( @@ -240,18 +246,22 @@ def get_dynamic_sidecar_spec( create_service_params = { "endpoint_spec": {"Ports": ports} if ports else {}, "labels": { - "type": ServiceType.MAIN.value, # required to be listed as an interactive service and be properly cleaned up - "user_id": f"{scheduler_data.user_id}", - "port": f"{dynamic_sidecar_settings.DYNAMIC_SIDECAR_PORT}", - "study_id": f"{scheduler_data.project_id}", - # the following are used for scheduling - "uuid": f"{scheduler_data.node_uuid}", # also needed for removal when project is closed - "swarm_stack_name": dynamic_sidecar_settings.SWARM_STACK_NAME, # required for listing services with uuid DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL: scheduler_data.as_label_data(), - "service_image": dynamic_sidecar_settings.DYNAMIC_SIDECAR_IMAGE, - "key": scheduler_data.key, - "version": scheduler_data.version, - }, + to_simcore_runtime_docker_label_key("service_key"): scheduler_data.key, + to_simcore_runtime_docker_label_key( + "service_version" + ): scheduler_data.version, + } + | StandardSimcoreDockerLabels( + user_id=scheduler_data.user_id, + project_id=scheduler_data.project_id, + node_id=scheduler_data.node_uuid, + product_name=scheduler_data.product_name, + simcore_user_agent=scheduler_data.request_simcore_user_agent, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, + memory_limit=ByteSize(0), # this should get overwritten + cpu_limit=0, # this should get overwritten + ).to_simcore_runtime_docker_labels(), "name": scheduler_data.service_name, "networks": [{"Target": swarm_network_id}], "task_template": { @@ -268,12 +278,16 @@ def get_dynamic_sidecar_spec( "CapabilityAdd": [ "CAP_LINUX_IMMUTABLE", ], - "Labels": { - # NOTE: these labels get on the tasks and that is also useful to trace - "user_id": f"{scheduler_data.user_id}", - "study_id": f"{scheduler_data.project_id}", - "uuid": f"{scheduler_data.node_uuid}", - }, + "Labels": StandardSimcoreDockerLabels( + user_id=scheduler_data.user_id, + project_id=scheduler_data.project_id, + node_id=scheduler_data.node_uuid, + product_name=scheduler_data.product_name, + simcore_user_agent=scheduler_data.request_simcore_user_agent, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, + memory_limit=ByteSize(0), # this should get overwritten + cpu_limit=0, # this should get overwritten + ).to_simcore_runtime_docker_labels(), "Mounts": mounts, "Secrets": [ { diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py index 068d22003ba..d870f64a403 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py @@ -58,7 +58,7 @@ ) from ...docker_compose_specs import assemble_spec from ...docker_service_specs import ( - extract_service_port_from_compose_start_spec, + extract_service_port_service_settings, get_dynamic_proxy_spec, get_dynamic_sidecar_spec, merge_settings_before_use, @@ -250,9 +250,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: # update service_port and assign it to the status # needed by CreateUserServices action - scheduler_data.service_port = extract_service_port_from_compose_start_spec( - dynamic_sidecar_service_final_spec - ) + scheduler_data.service_port = extract_service_port_service_settings(settings) proxy_settings: DynamicSidecarProxySettings = ( dynamic_sidecar_settings.DYNAMIC_SIDECAR_PROXY_SETTINGS @@ -376,10 +374,11 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: # Extra containers (utilities like forward proxies) can also be present here, # these also are expected to be created or running. - containers_with_error: list[DockerContainerInspect] = [] - for container_inspect in scheduler_data.dynamic_sidecar.containers_inspect: - if container_inspect.status not in _EXPECTED_STATUSES: - containers_with_error.append(container_inspect) + containers_with_error: list[DockerContainerInspect] = [ + container_inspect + for container_inspect in scheduler_data.dynamic_sidecar.containers_inspect + if container_inspect.status not in _EXPECTED_STATUSES + ] if len(containers_with_error) > 0: raise UnexpectedContainerStatusError( @@ -443,7 +442,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: or scheduler_data.dynamic_sidecar.swarm_network_name is None or scheduler_data.proxy_admin_api_port is None ): - raise ValueError( + msg = ( "Did not expect None for any of the following: " f"{scheduler_data.dynamic_sidecar.dynamic_sidecar_id=} " f"{scheduler_data.dynamic_sidecar.dynamic_sidecar_network_id=} " @@ -451,6 +450,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: f"{scheduler_data.dynamic_sidecar.swarm_network_name=} " f"{scheduler_data.proxy_admin_api_port=}" ) + raise ValueError(msg) # Starts dynamic SIDECAR ------------------------------------- # creates a docker compose spec given the service key and tag @@ -489,6 +489,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: project_id=scheduler_data.project_id, node_id=scheduler_data.node_uuid, simcore_user_agent=scheduler_data.request_simcore_user_agent, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, ) logger.debug( diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index 67be3ac0b42..ee2542d2f2b 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -15,10 +15,13 @@ TaskOutputData, TaskOutputDataSchema, ) -from dask_task_models_library.container_tasks.protocol import ContainerEnvsDict +from dask_task_models_library.container_tasks.protocol import ( + ContainerEnvsDict, + ContainerLabelsDict, +) from fastapi import FastAPI from models_library.clusters import ClusterID -from models_library.docker import SimcoreServiceDockerLabelKeys +from models_library.docker import StandardSimcoreDockerLabels from models_library.errors import ErrorDict from models_library.projects import ProjectID, ProjectIDStr from models_library.projects_nodes_io import NodeID, NodeIDStr @@ -293,35 +296,34 @@ async def compute_service_log_file_upload_link( _UNDEFINED_METADATA: Final[str] = "undefined-label" -async def compute_task_labels( - app: FastAPI, +def compute_task_labels( *, user_id: UserID, project_id: ProjectID, node_id: NodeID, metadata: MetadataDict, -) -> dict[str, str]: + node_requirements: NodeRequirements, +) -> ContainerLabelsDict: product_name = metadata.get("product_name", _UNDEFINED_METADATA) - standard_simcore_labels = SimcoreServiceDockerLabelKeys( - user_id=user_id, - study_id=project_id, - uuid=node_id, - product_name=product_name, - simcore_user_agent=metadata.get("simcore_user_agent", _UNDEFINED_METADATA), - ).to_docker_labels() - all_labels = standard_simcore_labels | { - k: f"{v}" - for k, v in metadata.items() - if k not in ["product_name", "simcore_user_agent"] - } - return await resolve_and_substitute_session_variables_in_specs( - app, - all_labels, + standard_simcore_labels = StandardSimcoreDockerLabels.construct( user_id=user_id, - product_name=product_name, project_id=project_id, node_id=node_id, + product_name=product_name, + simcore_user_agent=metadata.get("simcore_user_agent", _UNDEFINED_METADATA), + swarm_stack_name=_UNDEFINED_METADATA, # NOTE: there is currently no need for this label in the comp backend + memory_limit=node_requirements.ram, + cpu_limit=node_requirements.cpu, + ).to_simcore_runtime_docker_labels() + all_labels = standard_simcore_labels | parse_obj_as( + ContainerLabelsDict, + { + k.lower(): f"{v}" + for k, v in metadata.items() + if k not in ["product_name", "simcore_user_agent"] + }, ) + return all_labels async def compute_task_envs( diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index 908d324708f..c4d83b3a24d 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -45,6 +45,7 @@ from fastapi.applications import FastAPI from models_library.api_schemas_storage import LinkType from models_library.clusters import ClusterID, NoAuthentication, SimpleAuthentication +from models_library.docker import to_simcore_runtime_docker_label_key from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState @@ -471,7 +472,14 @@ def comp_run_metadata(faker: Faker) -> MetadataDict: @pytest.fixture def task_labels(comp_run_metadata: MetadataDict) -> ContainerLabelsDict: - return comp_run_metadata + return parse_obj_as( + ContainerLabelsDict, + { + k.replace("_", "-").lower(): v + for k, v in comp_run_metadata.items() + if k not in ["product_name", "simcore_user_agent"] + }, + ) async def test_send_computation_task( @@ -523,6 +531,9 @@ def fake_sidecar_fct( # NOTE: We pass another fct so it can run in our localy created dask cluster # NOTE2: since there is only 1 task here, it's ok to pass the nodeID + assert image_params.fake_tasks[node_id].node_requirements is not None + assert image_params.fake_tasks[node_id].node_requirements.cpu + assert image_params.fake_tasks[node_id].node_requirements.ram node_id_to_job_ids = await dask_client.send_computation_tasks( user_id=user_id, project_id=project_id, @@ -535,9 +546,14 @@ def fake_sidecar_fct( expected_envs={}, expected_labels=task_labels | { - "user_id": f"{user_id}", - "study_id": f"{project_id}", - "uuid": f"{node_id}", + f"{to_simcore_runtime_docker_label_key('user-id')}": f"{user_id}", + f"{to_simcore_runtime_docker_label_key('project-id')}": f"{project_id}", + f"{to_simcore_runtime_docker_label_key('node-id')}": f"{node_id}", + f"{to_simcore_runtime_docker_label_key('cpu-limit')}": f"{image_params.fake_tasks[node_id].node_requirements.cpu}", + f"{to_simcore_runtime_docker_label_key('memory-limit')}": f"{image_params.fake_tasks[node_id].node_requirements.ram}", + f"{to_simcore_runtime_docker_label_key('product-name')}": f"{comp_run_metadata['product_name']}", + f"{to_simcore_runtime_docker_label_key('simcore-user-agent')}": f"{comp_run_metadata['simcore_user_agent']}", + f"{to_simcore_runtime_docker_label_key('swarm-stack-name')}": "undefined-label", }, # type: ignore ), metadata=comp_run_metadata, diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_compose_specs.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_compose_specs.py index 7be49039865..5b1da143d28 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_compose_specs.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_compose_specs.py @@ -8,6 +8,7 @@ import pytest import yaml +from models_library.docker import to_simcore_runtime_docker_label_key from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.service_settings_labels import ( @@ -213,14 +214,23 @@ def test_regression_service_has_no_reservations(): NODE_ID: NodeID = uuid4() SIMCORE_USER_AGENT: str = "a-puppet" PRODUCT_NAME: str = "osparc" +SWARM_STACK_NAME: str = "mystackname" +CPU_LIMIT: float = 4.0 +RAM_LIMIT: int = 1233112423423 -EXPECTED_LABELS: list[str] = [ - f"product_name={PRODUCT_NAME}", - f"simcore_user_agent={SIMCORE_USER_AGENT}", - f"study_id={PROJECT_ID}", - f"user_id={USER_ID}", - f"uuid={NODE_ID}", -] + +EXPECTED_LABELS: list[str] = sorted( + [ + f"{to_simcore_runtime_docker_label_key('product-name')}={PRODUCT_NAME}", + f"{to_simcore_runtime_docker_label_key('simcore-user-agent')}={SIMCORE_USER_AGENT}", + f"{to_simcore_runtime_docker_label_key('project-id')}={PROJECT_ID}", + f"{to_simcore_runtime_docker_label_key('user-id')}={USER_ID}", + f"{to_simcore_runtime_docker_label_key('node-id')}={NODE_ID}", + f"{to_simcore_runtime_docker_label_key('swarm-stack-name')}={SWARM_STACK_NAME}", + f"{to_simcore_runtime_docker_label_key('cpu-limit')}=0", + f"{to_simcore_runtime_docker_label_key('memory-limit')}=0", + ] +) @pytest.mark.parametrize( @@ -247,6 +257,13 @@ async def test_update_container_labels( service_spec: dict[str, Any], expected_result: dict[str, Any] ): docker_compose_specs._update_container_labels( - service_spec, USER_ID, PROJECT_ID, NODE_ID, SIMCORE_USER_AGENT, PRODUCT_NAME + service_spec, + USER_ID, + PROJECT_ID, + NODE_ID, + SIMCORE_USER_AGENT, + PRODUCT_NAME, + SWARM_STACK_NAME, + {}, ) assert service_spec == expected_result diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py index 0d739136fd1..8769cd4bdf4 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py @@ -30,7 +30,6 @@ from simcore_service_director_v2.models.schemas.dynamic_services import ( SchedulerData, ServiceState, - ServiceType, ) from simcore_service_director_v2.models.schemas.dynamic_services.scheduler import ( DockerContainerInspect, @@ -237,7 +236,6 @@ def dynamic_sidecar_stack_specs( }, "labels": { "swarm_stack_name": f"{dynamic_sidecar_settings.SWARM_STACK_NAME}", - "type": f"{ServiceType.DEPENDENCY.value}", "uuid": f"{node_uuid}", "user_id": f"{user_id}", "study_id": f"{project_id}", @@ -250,7 +248,6 @@ def dynamic_sidecar_stack_specs( }, "labels": { "swarm_stack_name": f"{dynamic_sidecar_settings.SWARM_STACK_NAME}", - "type": f"{ServiceType.MAIN.value}", "uuid": f"{node_uuid}", "user_id": f"{user_id}", "study_id": f"{project_id}", @@ -448,24 +445,6 @@ async def test_create_service( assert service_id -async def test_inspect_service( - service_spec: dict[str, Any], - cleanup_test_service_name: None, - docker_swarm: None, -): - service_id = await docker_api.create_service_and_get_id(service_spec) - assert service_id - - service_inspect = await docker_api.inspect_service(service_id) - - assert service_inspect["Spec"]["Labels"] == service_spec["labels"] - assert service_inspect["Spec"]["Name"] == service_spec["name"] - assert ( - service_inspect["Spec"]["TaskTemplate"]["ContainerSpec"]["Image"] - == service_spec["task_template"]["ContainerSpec"]["Image"] - ) - - async def test_services_to_observe_exist( dynamic_sidecar_service_name: str, dynamic_sidecar_service_spec: dict[str, Any], @@ -656,25 +635,6 @@ async def test_remove_dynamic_sidecar_network_fails( assert delete_result is False -async def test_list_dynamic_sidecar_services( - user_id: UserID, - project_id: ProjectID, - dynamic_sidecar_settings: DynamicSidecarSettings, - dynamic_sidecar_stack_specs: list[dict[str, Any]], - cleanup_dynamic_sidecar_stack: None, - docker_swarm: None, -): - # start 2 fake services to emulate the dynamic-sidecar stack - for dynamic_sidecar_stack in dynamic_sidecar_stack_specs: - service_id = await docker_api.create_service_and_get_id(dynamic_sidecar_stack) - assert service_id - - services = await docker_api.list_dynamic_sidecar_services( - dynamic_sidecar_settings, user_id=user_id, project_id=project_id - ) - assert len(services) == 1 - - async def test_is_sidecar_running( node_uuid: UUID, dynamic_sidecar_settings: DynamicSidecarSettings, diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py index 013041c7ec8..ab25be8e8c1 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py @@ -4,12 +4,13 @@ import json -from typing import Any, cast +from typing import Any, Mapping, cast import pytest import respx from fastapi import FastAPI from models_library.aiodocker_api import AioDockerServiceSpec +from models_library.docker import to_simcore_runtime_docker_label_key from models_library.service_settings_labels import ( SimcoreServiceLabels, SimcoreServiceSettingsLabel, @@ -132,10 +133,10 @@ def expected_dynamic_sidecar_spec( "key": "simcore/services/dynamic/3dviewer", "node_uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", "paths_mapping": { - "inputs_path": "/tmp/inputs", - "outputs_path": "/tmp/outputs", - "state_exclude": ["/tmp/strip_me/*", "*.py"], - "state_paths": ["/tmp/save_1", "/tmp_save_2"], + "inputs_path": "/tmp/inputs", # noqa: S108 + "outputs_path": "/tmp/outputs", # noqa: S108 + "state_exclude": ["/tmp/strip_me/*", "*.py"], # noqa: S108 + "state_paths": ["/tmp/save_1", "/tmp_save_2"], # noqa: S108 }, "product_name": osparc_product_name, "project_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", @@ -160,16 +161,16 @@ def expected_dynamic_sidecar_spec( "version": "2.4.5", } ).as_label_data(), - "key": "simcore/services/dynamic/3dviewer", - "version": "2.4.5", - "port": "8888", - "service_image": "local/dynamic-sidecar:MOCK", - "service_port": "8888", - "study_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", - "swarm_stack_name": "test_swarm_name", - "type": "main-v2", - "user_id": "234", - "uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", + f"{to_simcore_runtime_docker_label_key('service-key')}": "simcore/services/dynamic/3dviewer", + f"{to_simcore_runtime_docker_label_key('service-version')}": "2.4.5", + f"{to_simcore_runtime_docker_label_key('memory-limit')}": "8589934592", + f"{to_simcore_runtime_docker_label_key('cpu-limit')}": "4.0", + f"{to_simcore_runtime_docker_label_key('project-id')}": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", + f"{to_simcore_runtime_docker_label_key('user-id')}": "234", + f"{to_simcore_runtime_docker_label_key('node-id')}": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", + f"{to_simcore_runtime_docker_label_key('product-name')}": "osparc", + f"{to_simcore_runtime_docker_label_key('simcore-user-agent')}": "python/test", + f"{to_simcore_runtime_docker_label_key('swarm-stack-name')}": "test_swarm_name", }, "name": "dy-sidecar_75c7f3f4-18f9-4678-8610-54a2ade78eaa", "networks": [{"Target": "mocked_swarm_network_id"}], @@ -179,12 +180,14 @@ def expected_dynamic_sidecar_spec( "DYNAMIC_SIDECAR_COMPOSE_NAMESPACE": "dy-sidecar_75c7f3f4-18f9-4678-8610-54a2ade78eaa", "DY_SIDECAR_NODE_ID": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", "DY_SIDECAR_RUN_ID": f"{run_id}", - "DY_SIDECAR_PATH_INPUTS": "/tmp/inputs", - "DY_SIDECAR_PATH_OUTPUTS": "/tmp/outputs", + "DY_SIDECAR_PATH_INPUTS": "/tmp/inputs", # noqa: S108 + "DY_SIDECAR_PATH_OUTPUTS": "/tmp/outputs", # noqa: S108 "DY_SIDECAR_PROJECT_ID": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", - "DY_SIDECAR_STATE_EXCLUDE": json_dumps({"*.py", "/tmp/strip_me/*"}), + "DY_SIDECAR_STATE_EXCLUDE": json_dumps( + {"*.py", "/tmp/strip_me/*"} # noqa: S108 + ), "DY_SIDECAR_STATE_PATHS": json_dumps( - ["/tmp/save_1", "/tmp_save_2"] + ["/tmp/save_1", "/tmp_save_2"] # noqa: S108 ), "DY_SIDECAR_USER_ID": "234", "DY_SIDECAR_USER_SERVICES_HAVE_INTERNET_ACCESS": "False", @@ -225,11 +228,14 @@ def expected_dynamic_sidecar_spec( "Image": "local/dynamic-sidecar:MOCK", "Init": True, "Labels": { - "mem_limit": "8589934592", - "nano_cpus_limit": "4000000000", - "study_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", - "user_id": "234", - "uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", + f"{to_simcore_runtime_docker_label_key('memory-limit')}": "8589934592", + f"{to_simcore_runtime_docker_label_key('cpu-limit')}": "4.0", + f"{to_simcore_runtime_docker_label_key('project-id')}": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", + f"{to_simcore_runtime_docker_label_key('user-id')}": "234", + f"{to_simcore_runtime_docker_label_key('node-id')}": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", + f"{to_simcore_runtime_docker_label_key('product-name')}": "osparc", + f"{to_simcore_runtime_docker_label_key('simcore-user-agent')}": "python/test", + f"{to_simcore_runtime_docker_label_key('swarm-stack-name')}": "test_swarm_name", }, "Mounts": [ { @@ -315,8 +321,8 @@ def expected_dynamic_sidecar_spec( }, { "ReadOnly": True, - "Source": "/tmp/.X11-unix", - "Target": "/tmp/.X11-unix", + "Source": "/tmp/.X11-unix", # noqa: S108 + "Target": "/tmp/.X11-unix", # noqa: S108 "Type": "bind", }, ], @@ -382,8 +388,7 @@ def _dict(model: BaseModel) -> dict[str, Any]: allow_internet_access=False, ) - # NOTE: - exclude_keys = { + exclude_keys: Mapping[int | str, Any] = { "Labels": True, "TaskTemplate": {"ContainerSpec": {"Env": True}}, } @@ -391,28 +396,46 @@ def _dict(model: BaseModel) -> dict[str, Any]: # NOTE: some flakiness here # state_exclude is a set and does not preserve order # when dumping to json it gets converted to a list + assert dynamic_sidecar_spec.TaskTemplate + assert dynamic_sidecar_spec.TaskTemplate.ContainerSpec + assert dynamic_sidecar_spec.TaskTemplate.ContainerSpec.Env + assert dynamic_sidecar_spec.TaskTemplate.ContainerSpec.Env[ + "DY_SIDECAR_STATE_EXCLUDE" + ] + dynamic_sidecar_spec.TaskTemplate.ContainerSpec.Env[ "DY_SIDECAR_STATE_EXCLUDE" - ] = sorted( - dynamic_sidecar_spec.TaskTemplate.ContainerSpec.Env[ - "DY_SIDECAR_STATE_EXCLUDE" - ] + ] = json.dumps( + sorted( + json.loads( + dynamic_sidecar_spec.TaskTemplate.ContainerSpec.Env[ + "DY_SIDECAR_STATE_EXCLUDE" + ] + ) + ) ) + assert expected_dynamic_sidecar_spec_model.TaskTemplate.ContainerSpec.Env[ + "DY_SIDECAR_STATE_EXCLUDE" + ] expected_dynamic_sidecar_spec_model.TaskTemplate.ContainerSpec.Env[ "DY_SIDECAR_STATE_EXCLUDE" - ] = sorted( - expected_dynamic_sidecar_spec_model.TaskTemplate.ContainerSpec.Env[ - "DY_SIDECAR_STATE_EXCLUDE" - ] + ] = json.dumps( + sorted( + json.loads( + expected_dynamic_sidecar_spec_model.TaskTemplate.ContainerSpec.Env[ + "DY_SIDECAR_STATE_EXCLUDE" + ] + ) + ) ) assert dynamic_sidecar_spec.dict( exclude=exclude_keys ) == expected_dynamic_sidecar_spec_model.dict(exclude=exclude_keys) - - assert ( - dynamic_sidecar_spec.Labels.keys() - == expected_dynamic_sidecar_spec_model.Labels.keys() + assert dynamic_sidecar_spec.Labels + assert expected_dynamic_sidecar_spec_model.Labels + assert sorted(dynamic_sidecar_spec.Labels.keys()) == sorted( + expected_dynamic_sidecar_spec_model.Labels.keys() ) assert ( diff --git a/services/director-v2/tests/unit/with_dbs/test_utils_dask.py b/services/director-v2/tests/unit/with_dbs/test_utils_dask.py index 615d0725f87..76ca32c566b 100644 --- a/services/director-v2/tests/unit/with_dbs/test_utils_dask.py +++ b/services/director-v2/tests/unit/with_dbs/test_utils_dask.py @@ -30,6 +30,7 @@ from fastapi import FastAPI from models_library.api_schemas_storage import FileUploadLinks, FileUploadSchema from models_library.clusters import ClusterID +from models_library.docker import to_simcore_runtime_docker_label_key from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID, SimCoreFileLink, SimcoreS3FileID from models_library.users import UserID @@ -565,18 +566,18 @@ async def test_check_if_cluster_is_able_to_run_pipeline( ( {}, { - "product_name": _UNDEFINED_METADATA, - "simcore_user_agent": _UNDEFINED_METADATA, + f"{to_simcore_runtime_docker_label_key('product-name')}": _UNDEFINED_METADATA, + f"{to_simcore_runtime_docker_label_key('simcore-user-agent')}": _UNDEFINED_METADATA, }, ), ( { - "product_name": "the awesome osparc", + f"{to_simcore_runtime_docker_label_key('product-name')}": "the awesome osparc", "some-crazy-additional-label": "with awesome value", }, { - "product_name": "the awesome osparc", - "simcore_user_agent": _UNDEFINED_METADATA, + f"{to_simcore_runtime_docker_label_key('product-name')}": "the awesome osparc", + f"{to_simcore_runtime_docker_label_key('simcore-user-agent')}": _UNDEFINED_METADATA, "some-crazy-additional-label": "with awesome value", }, ), @@ -592,17 +593,23 @@ async def test_compute_task_labels( expected_additional_task_labels: ContainerLabelsDict, initialized_app: FastAPI, ): - task_labels = await compute_task_labels( - initialized_app, + sleeper_task = published_project.tasks[1] + assert sleeper_task.image + assert sleeper_task.image.node_requirements + task_labels = compute_task_labels( user_id=user_id, project_id=project_id, node_id=node_id, metadata=run_metadata, + node_requirements=sleeper_task.image.node_requirements, ) expected_task_labels = { - "user_id": f"{user_id}", - "study_id": f"{project_id}", - "uuid": f"{node_id}", + f"{to_simcore_runtime_docker_label_key('user-id')}": f"{user_id}", + f"{to_simcore_runtime_docker_label_key('project-id')}": f"{project_id}", + f"{to_simcore_runtime_docker_label_key('node-id')}": f"{node_id}", + f"{to_simcore_runtime_docker_label_key('swarm-stack-name')}": f"{_UNDEFINED_METADATA}", + f"{to_simcore_runtime_docker_label_key('cpu-limit')}": f"{sleeper_task.image.node_requirements.cpu}", + f"{to_simcore_runtime_docker_label_key('memory-limit')}": f"{sleeper_task.image.node_requirements.ram}", } | expected_additional_task_labels assert task_labels == expected_task_labels diff --git a/services/director/src/simcore_service_director/producer.py b/services/director/src/simcore_service_director/producer.py index 642122f9f48..9d3db4d03a8 100644 --- a/services/director/src/simcore_service_director/producer.py +++ b/services/director/src/simcore_service_director/producer.py @@ -62,7 +62,9 @@ async def _check_node_uuid_available( try: # not filtering by "swarm_stack_name" label because it's safer list_of_running_services_w_uuid = await client.services.list( - filters={"label": "uuid=" + node_uuid} + filters={ + "label": f"{_to_simcore_runtime_docker_label_key('node_id')}={node_uuid}" + } ) except aiodocker.exceptions.DockerError as err: log.exception("Error while retrieving services list") @@ -129,6 +131,13 @@ async def _read_service_settings( return settings +_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX: str = "io.simcore.runtime." + + +def _to_simcore_runtime_docker_label_key(key: str) -> str: + return f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}{key.replace('_', '-').lower()}" + + # pylint: disable=too-many-branches async def _create_docker_service_params( app: web.Application, @@ -165,11 +174,20 @@ async def _create_docker_service_params( "Hosts": get_system_extra_hosts_raw(config.EXTRA_HOSTS_SUFFIX), "Init": True, "Labels": { - "user_id": user_id, - "study_id": project_id, - "node_id": node_uuid, - "swarm_stack_name": config.SWARM_STACK_NAME, - "simcore_user_agent": request_simcore_user_agent, + _to_simcore_runtime_docker_label_key("user_id"): user_id, + _to_simcore_runtime_docker_label_key("project_id"): project_id, + _to_simcore_runtime_docker_label_key("node_id"): node_uuid, + _to_simcore_runtime_docker_label_key( + "swarm_stack_name" + ): config.SWARM_STACK_NAME, + _to_simcore_runtime_docker_label_key( + "simcore_user_agent" + ): request_simcore_user_agent, + _to_simcore_runtime_docker_label_key( + "product_name" + ): "osparc", # fixed no legacy available in other products + _to_simcore_runtime_docker_label_key("cpu_limit"): "0", + _to_simcore_runtime_docker_label_key("memory_limit"): "0", }, "Mounts": [], } @@ -226,11 +244,23 @@ async def _create_docker_service_params( }, "endpoint_spec": {"Mode": "dnsrr"}, "labels": { - "uuid": node_uuid, - "study_id": project_id, - "user_id": user_id, - "type": "main" if main_service else "dependency", - "swarm_stack_name": config.SWARM_STACK_NAME, + _to_simcore_runtime_docker_label_key("user_id"): user_id, + _to_simcore_runtime_docker_label_key("project_id"): project_id, + _to_simcore_runtime_docker_label_key("node_id"): node_uuid, + _to_simcore_runtime_docker_label_key( + "swarm_stack_name" + ): config.SWARM_STACK_NAME, + _to_simcore_runtime_docker_label_key( + "simcore_user_agent" + ): request_simcore_user_agent, + _to_simcore_runtime_docker_label_key( + "product_name" + ): "osparc", # fixed no legacy available in other products + _to_simcore_runtime_docker_label_key("cpu_limit"): "0", + _to_simcore_runtime_docker_label_key("memory_limit"): "0", + _to_simcore_runtime_docker_label_key("type"): "main" + if main_service + else "dependency", "io.simcore.zone": f"{config.TRAEFIK_SIMCORE_ZONE}", "traefik.enable": "true" if main_service else "false", f"traefik.http.services.{service_name}.loadbalancer.server.port": "8080", @@ -305,9 +335,13 @@ async def _create_docker_service_params( # publishing port on the ingress network. elif param["name"] == "ports" and param["type"] == "int": # backward comp - docker_params["labels"]["port"] = docker_params["labels"][ + docker_params["labels"][ + _to_simcore_runtime_docker_label_key("port") + ] = docker_params["labels"][ f"traefik.http.services.{service_name}.loadbalancer.server.port" - ] = str(param["value"]) + ] = str( + param["value"] + ) # REST-API compatible elif param["type"] == "EndpointSpec": if "Ports" in param["value"]: @@ -315,9 +349,13 @@ async def _create_docker_service_params( isinstance(param["value"]["Ports"], list) and "TargetPort" in param["value"]["Ports"][0] ): - docker_params["labels"]["port"] = docker_params["labels"][ + docker_params["labels"][ + _to_simcore_runtime_docker_label_key("port") + ] = docker_params["labels"][ f"traefik.http.services.{service_name}.loadbalancer.server.port" - ] = str(param["value"]["Ports"][0]["TargetPort"]) + ] = str( + param["value"]["Ports"][0]["TargetPort"] + ) # placement constraints elif param["name"] == "constraints": # python-API compatible @@ -357,8 +395,16 @@ async def _create_docker_service_params( mem_limit = str( docker_params["task_template"]["Resources"]["Limits"]["MemoryBytes"] ) - container_spec["Labels"]["nano_cpus_limit"] = nano_cpus_limit - container_spec["Labels"]["mem_limit"] = mem_limit + docker_params["labels"][ + _to_simcore_runtime_docker_label_key("cpu_limit") + ] = container_spec["Labels"][ + _to_simcore_runtime_docker_label_key("cpu_limit") + ] = f"{float(nano_cpus_limit) / 1e9}" + docker_params["labels"][ + _to_simcore_runtime_docker_label_key("memory_limit") + ] = container_spec["Labels"][ + _to_simcore_runtime_docker_label_key("memory_limit") + ] = mem_limit # and make the container aware of them via env variables resource_limits = { @@ -428,8 +474,10 @@ async def _get_docker_image_port_mapping( target_port = target_ports[0] else: # if empty no port is published but there might still be an internal port defined - if "port" in service["Spec"]["Labels"]: - target_port = int(service["Spec"]["Labels"]["port"]) + if _to_simcore_runtime_docker_label_key("port") in service["Spec"]["Labels"]: + target_port = int( + service["Spec"]["Labels"][_to_simcore_runtime_docker_label_key("port")] + ) return published_port, target_port @@ -454,7 +502,7 @@ async def _pass_port_to_service( port, route, ) - service_url = "http://" + service_name + "/" + route # NOSONAR + service_url = "http://" + service_name + "/" + route # NOSONAR query_string = { "hostname": str(config.PUBLISHED_HOST_NAME), "port": str(port), @@ -481,7 +529,7 @@ async def _create_overlay_network_in_swarm( network_config = { "Name": network_name, "Driver": "overlay", - "Labels": {"uuid": node_uuid}, + "Labels": {_to_simcore_runtime_docker_label_key("node_id"): node_uuid}, } docker_network = await client.networks.create(network_config) log.debug( @@ -508,8 +556,9 @@ async def _remove_overlay_network_of_swarm( x for x in (await client.networks.list()) if x["Labels"] - and "uuid" in x["Labels"] - and x["Labels"]["uuid"] == node_uuid + and _to_simcore_runtime_docker_label_key("node_id") in x["Labels"] + and x["Labels"][_to_simcore_runtime_docker_label_key("node_id")] + == node_uuid ] log.debug("Found %s networks with uuid %s", len(networks), node_uuid) # remove any network in the list (should be only one) @@ -534,7 +583,6 @@ async def _get_service_state( log.debug("Getting service %s state", service_name) tasks = await client.tasks.list(filters={"service": service_name}) - # wait for tasks task_started_time = datetime.utcnow() while (datetime.utcnow() - task_started_time) < timedelta(seconds=20): @@ -945,9 +993,13 @@ async def _get_node_details( service_basepath = results[1] service_state, service_msg = results[2] service_name = service["Spec"]["Name"] - service_uuid = service["Spec"]["Labels"]["uuid"] - user_id = service["Spec"]["Labels"]["user_id"] - project_id = service["Spec"]["Labels"]["study_id"] + service_uuid = service["Spec"]["Labels"][ + _to_simcore_runtime_docker_label_key("node_id") + ] + user_id = service["Spec"]["Labels"][_to_simcore_runtime_docker_label_key("user_id")] + project_id = service["Spec"]["Labels"][ + _to_simcore_runtime_docker_label_key("project_id") + ] # get the published port published_port, target_port = await _get_docker_image_port_mapping(service) @@ -973,11 +1025,18 @@ async def get_services_details( ) -> List[Dict]: async with docker_utils.docker_client() as client: # pylint: disable=not-async-context-manager try: - filters = ["type=main", f"swarm_stack_name={config.SWARM_STACK_NAME}"] + filters = [ + f"{_to_simcore_runtime_docker_label_key('type')}=main", + f"{_to_simcore_runtime_docker_label_key('swarm_stack_name')}={config.SWARM_STACK_NAME}", + ] if user_id: - filters.append("user_id=" + user_id) + filters.append( + f"{_to_simcore_runtime_docker_label_key('user_id')}=" + user_id + ) if study_id: - filters.append("study_id=" + study_id) + filters.append( + f"{_to_simcore_runtime_docker_label_key('project_id')}=" + study_id + ) list_running_services = await client.services.list( filters={"label": filters} ) @@ -1004,9 +1063,9 @@ async def get_service_details(app: web.Application, node_uuid: str) -> Dict: list_running_services_with_uuid = await client.services.list( filters={ "label": [ - f"uuid={node_uuid}", - "type=main", - f"swarm_stack_name={config.SWARM_STACK_NAME}", + f"{_to_simcore_runtime_docker_label_key('node_id')}={node_uuid}", + f"{_to_simcore_runtime_docker_label_key('type')}=main", + f"{_to_simcore_runtime_docker_label_key('swarm_stack_name')}={config.SWARM_STACK_NAME}", ] } ) @@ -1040,7 +1099,7 @@ async def get_service_details(app: web.Application, node_uuid: str) -> Dict: async def _save_service_state(service_host_name: str, session: aiohttp.ClientSession): response: ClientResponse async with session.post( - url=f"http://{service_host_name}/state", # NOSONAR + url=f"http://{service_host_name}/state", # NOSONAR timeout=ServicesCommonSettings().director_dynamic_service_save_timeout, ) as response: try: @@ -1086,8 +1145,8 @@ async def stop_service(app: web.Application, node_uuid: str, save_state: bool) - list_running_services_with_uuid = await client.services.list( filters={ "label": [ - f"uuid={node_uuid}", - f"swarm_stack_name={config.SWARM_STACK_NAME}", + f"{_to_simcore_runtime_docker_label_key('node_id')}={node_uuid}", + f"{_to_simcore_runtime_docker_label_key('swarm_stack_name')}={config.SWARM_STACK_NAME}", ] } ) diff --git a/services/director/tests/test_producer.py b/services/director/tests/test_producer.py index 84811e23462..e8fcc4a6fdb 100644 --- a/services/director/tests/test_producer.py +++ b/services/director/tests/test_producer.py @@ -174,7 +174,7 @@ async def test_service_assigned_env_variables( for service in started_services: service_uuid = service["service_uuid"] list_of_services = client.services.list( - filters={"label": "uuid=" + service_uuid} + filters={"label": f"io.simcore.runtime.node-id={service_uuid}"} ) assert len(list_of_services) == 1 docker_service = list_of_services[0] @@ -219,7 +219,9 @@ async def test_interactive_service_published_port(docker_network, run_services): client = docker.from_env() service_uuid = service["service_uuid"] - list_of_services = client.services.list(filters={"label": "uuid=" + service_uuid}) + list_of_services = client.services.list( + filters={"label": f"io.simcore.runtime.node-id={service_uuid}"} + ) assert len(list_of_services) == 1 docker_service = list_of_services[0] @@ -265,7 +267,7 @@ async def test_interactive_service_in_correct_network( client = docker.from_env() service_uuid = service["service_uuid"] list_of_services = client.services.list( - filters={"label": "uuid=" + service_uuid} + filters={"label": f"io.simcore.runtime.node-id={service_uuid}"} ) assert list_of_services assert len(list_of_services) == 1 @@ -285,7 +287,7 @@ async def test_dependent_services_have_common_network(docker_network, run_servic client = docker.from_env() service_uuid = service["service_uuid"] list_of_services = client.services.list( - filters={"label": "uuid=" + service_uuid} + filters={"label": f"io.simcore.runtime.node-id={service_uuid}"} ) # there is one dependency per service assert len(list_of_services) == 2